001    /**
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.io.impl;
019    
020    import org.activemq.message.Packet;
021    import org.activemq.message.AbstractPacket;
022    import org.activemq.message.ActiveMQDestination;
023    import org.activemq.util.SerializationHelper;
024    import org.activemq.util.BitArray;
025    
026    import java.io.IOException;
027    import java.io.DataOutput;
028    import java.io.ByteArrayOutputStream;
029    import java.io.DataOutputStream;
030    import java.io.DataInput;
031    
032    /**
033     * @version $Revision: 1.1 $
034     */
035    public abstract class AbstractPacketMarshaller extends AbstractPacketReader implements PacketWriter {
036        protected int wireFormatVersion = DefaultWireFormat.WIRE_FORMAT_VERSION;
037    
038    
039        /**
040         * simple helper method to ensure null strings are catered for
041         *
042         * @param str
043         * @param dataOut
044         * @throws IOException
045         */
046        protected void writeUTF(String str, DataOutput dataOut) throws IOException {
047            if (str == null) {
048                str = "";
049            }
050            dataOut.writeUTF(str);
051        }
052    
053        /**
054         * Reads a destination from the input stream
055         */
056        protected ActiveMQDestination readDestination(DataInput dataIn) throws IOException {
057            return ActiveMQDestination.readFromStream(dataIn);
058        }
059    
060        /**
061         * Writes the given destination to the stream
062         */
063        protected void writeDestination(ActiveMQDestination destination, DataOutput dataOut) throws IOException {
064             ActiveMQDestination.writeToStream(destination, dataOut);
065        }
066    
067        /**
068         * @param packet
069         * @return true if this PacketWriter can write this type of Packet
070         */
071        public boolean canWrite(Packet packet) {
072            return packet.getPacketType() == this.getPacketType();
073        }
074    
075        /**
076         * Simple (but inefficent) utility method to write an object on to a stream
077         *
078         * @param object
079         * @param dataOut
080         * @throws IOException
081         */
082        protected void writeObject(Object object, DataOutput dataOut) throws IOException {
083            if (object != null) {
084                byte[] data = SerializationHelper.writeObject(object);
085                dataOut.writeInt(data.length);
086                dataOut.write(data);
087            }
088            else {
089                dataOut.writeInt(0);
090            }
091        }
092    
093        /**
094         * Serializes a Packet int a byte array
095         *
096         * @param packet
097         * @return the byte[]
098         * @throws IOException
099         */
100        public byte[] writePacketToByteArray(Packet packet) throws IOException {
101            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
102            DataOutputStream dataOut = new DataOutputStream(bytesOut);
103            writePacket(packet, dataOut);
104            dataOut.flush();
105            return bytesOut.toByteArray();
106        }
107    
108        /**
109         * Write a Packet instance to data output stream
110         *
111         * @param p  the instance to be seralized
112         * @param dataOut the output stream
113         * @throws IOException thrown if an error occurs
114         */
115        public void writePacket(Packet p, DataOutput dataOut) throws IOException {
116            AbstractPacket packet = (AbstractPacket)p;
117            dataOut.writeShort(packet.getId());
118            BitArray ba = packet.getBitArray();
119            ba.set(AbstractPacket.RECEIPT_REQUIRED_INDEX, packet.isReceiptRequired());
120            Object[] visited = packet.getBrokersVisited();
121            boolean writeVisited = visited != null && visited.length > 0;
122            ba.set(AbstractPacket.BROKERS_VISITED_INDEX,writeVisited);
123            ba.writeToStream(dataOut);
124            if (writeVisited){
125                dataOut.writeShort(visited.length);
126                for(int i =0; i < visited.length; i++){
127                    dataOut.writeUTF(visited[i].toString());
128                }
129            }
130        }
131    
132        /**
133         * Set the wire format version
134         * @param version
135         */
136        public void setWireFormatVersion(int version){
137            this.wireFormatVersion = version;
138        }
139    
140        /**
141         * @return the wire format version
142         */
143        public int getWireFormatVersion(){
144            return wireFormatVersion;
145        }
146    
147    }