001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * Copyright 2004 Hiram Chirino
005     * 
006     * Licensed under the Apache License, Version 2.0 (the "License"); 
007     * you may not use this file except in compliance with the License. 
008     * You may obtain a copy of the License at 
009     * 
010     * http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS, 
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
015     * See the License for the specific language governing permissions and 
016     * limitations under the License. 
017     * 
018     **/
019    package org.activemq.io.impl;
020    
021    import java.io.ByteArrayOutputStream;
022    import java.io.DataInput;
023    import java.io.DataInputStream;
024    import java.io.DataOutput;
025    import java.io.DataOutputStream;
026    import java.io.IOException;
027    import java.io.ObjectStreamException;
028    
029    import org.activeio.PacketData;
030    import org.activeio.adapter.PacketByteArrayOutputStream;
031    import org.activeio.adapter.PacketInputStream;
032    import org.activemq.io.WireFormat;
033    import org.activemq.message.CachedValue;
034    import org.activemq.message.Packet;
035    
036    /**
037     * Provides a stateless implementation of AbstractDefaultWireFormat.  Safe for use by multiple threads and incurs no locking overhead.
038     * 
039     * @version $Revision: 1.1.1.1 $
040     */
041    public class StatelessDefaultWireFormat extends AbstractDefaultWireFormat {
042    
043        private static final long serialVersionUID = -2648674156081593006L;
044    
045        public Packet writePacket(Packet packet, DataOutput dataOut) throws IOException {
046            
047            PacketWriter writer = getWriter(packet);
048            if (writer != null) {
049                
050                PacketByteArrayOutputStream internalBytesOut = new PacketByteArrayOutputStream( 50+ (packet.getMemoryUsage()==0 ? 1024 : packet.getMemoryUsage()) );
051                DataOutputStream internalDataOut = new DataOutputStream(internalBytesOut);            
052                writer.writePacket(packet, internalDataOut);
053                internalDataOut.close();
054                
055                org.activeio.Packet p = internalBytesOut.getPacket();
056                int count = p.remaining();
057    
058                dataOut.writeByte(packet.getPacketType());
059                dataOut.writeInt(count);
060                packet.setMemoryUsage(count);
061                p.writeTo(dataOut);
062                
063            }
064            return null;
065        }
066    
067        /**
068         * Write a Packet to a PacketByteArrayOutputStream
069         * 
070         * @param packet
071         * @param dataOut
072         * @return a response packet - or null
073         * @throws IOException
074         */
075        public org.activeio.Packet writePacket(Packet packet, PacketByteArrayOutputStream paos) throws IOException {
076            PacketWriter writer = getWriter(packet);
077            if (writer != null) {
078                
079                // We may not be writing to the start of the PAOS.
080                int startPosition = paos.position();        
081                // Skip space for the headers.
082                paos.skip(5);
083                // Stream the data.
084                DataOutputStream data = new DataOutputStream(paos);
085                writer.writePacket(packet, data);
086                data.close();
087                org.activeio.Packet rc = paos.getPacket();
088                
089                int count = rc.remaining()-(startPosition+5);
090                packet.setMemoryUsage(count);
091                
092                // Now write the headers to the packet.
093                
094                rc.position(startPosition);
095                PacketData pd = new PacketData(rc);
096                pd.writeByte(packet.getPacketType());
097                pd.writeInt(count);
098                rc.rewind();
099                return rc;
100            }
101            return null;
102        }
103     
104        /**
105         * A helper method which converts a packet into a byte array Overrides the WireFormat to make use of the internal
106         * BytesOutputStream
107         * 
108         * @param packet
109         * @return a byte array representing the packet using some wire protocol
110         * @throws IOException
111         */
112        public byte[] toBytes(Packet packet) throws IOException {
113            
114            byte[] data = null;
115            PacketWriter writer = getWriter(packet);
116            if (writer != null) {
117                
118                // Try to guess the right size.            
119                ByteArrayOutputStream internalBytesOut = new ByteArrayOutputStream( 50+ (packet.getMemoryUsage()==0 ? 1024 : packet.getMemoryUsage()) );
120                DataOutputStream internalDataOut = new DataOutputStream(internalBytesOut);
121                
122                internalBytesOut.reset();
123                internalDataOut.writeByte(packet.getPacketType());
124                internalDataOut.writeInt(-1);//the length
125                writer.writePacket(packet, internalDataOut);
126                internalDataOut.flush();
127                data = internalBytesOut.toByteArray();
128                // lets subtract the header offset from the length
129                int length = data.length - 5;
130                packet.setMemoryUsage(length);
131                //write in the length to the data
132                data[1] = (byte) ((length >>> 24) & 0xFF);
133                data[2] = (byte) ((length >>> 16) & 0xFF);
134                data[3] = (byte) ((length >>> 8) & 0xFF);
135                data[4] = (byte) ((length >>> 0) & 0xFF);
136            }
137            return data;
138        }
139        
140        protected Packet readPacket(DataInput dataIn, PacketReader reader) throws IOException {
141            Packet packet = reader.createPacket();
142            int length = dataIn.readInt();
143            packet.setMemoryUsage(length);
144            reader.buildPacket(packet, dataIn);
145            return packet;
146        }
147    
148        /**
149         * @param dataIn
150         * @return
151         * @throws IOException
152         */
153        public Packet readPacket(org.activeio.Packet dataIn) throws IOException {
154            return readPacket(new DataInputStream(new PacketInputStream(dataIn)));
155        }
156    
157        protected void handleCachedValue(CachedValue cv) {
158            throw new IllegalStateException("Value caching is not supported.");
159        }
160    
161        public Object getValueFromReadCache(short key) {
162            throw new IllegalStateException("Value caching is not supported.");
163        }
164    
165        short getWriteCachedKey(Object value) {
166            throw new IllegalStateException("Value caching is not supported.");
167        }
168        
169        public boolean isCachingEnabled() {
170            return false;
171        }
172    
173        public WireFormat copy() {
174            return new StatelessDefaultWireFormat();
175        }
176        
177        private Object readResolve() throws ObjectStreamException {
178            return new DefaultWireFormat();
179        }
180    
181    }