001    /**
002     *
003     * Copyright 2004 Protique Ltd
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     * http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     *
017     **/
018    
019    
020    package org.activemq.io.impl;
021    import java.io.DataOutput;
022    import java.io.IOException;
023    
024    import org.activemq.message.AbstractPacket;
025    import org.activemq.message.ActiveMQDestination;
026    import org.activemq.message.ActiveMQXid;
027    import org.activemq.message.MessageAck;
028    import org.activemq.message.Packet;
029    import org.activemq.util.BitArray;
030    
031    /**
032     * Writes a ConsumerInfo object to a Stream
033     */
034    
035    public class MessageAckWriter extends AbstractPacketWriter {
036        private AbstractDefaultWireFormat wireFormat;
037        
038        MessageAckWriter(AbstractDefaultWireFormat wf){
039            this.wireFormat = wf;
040        }
041        
042        MessageAckWriter(){
043        }
044    
045        /**
046         * Return the type of Packet
047         *
048         * @return integer representation of the type of Packet
049         */
050    
051        public int getPacketType() {
052            return Packet.ACTIVEMQ_MSG_ACK;
053        }
054    
055        /**
056         * Write a Packet instance to data output stream
057         *
058         * @param packet  the instance to be seralized
059         * @param dataOut the output stream
060         * @throws IOException thrown if an error occurs
061         */
062    
063        public void writePacket(Packet packet, DataOutput dataOut) throws IOException {
064            MessageAck ack = (MessageAck) packet;
065            
066            boolean cachingEnabled = wireFormat != null ? wireFormat.isCachingEnabled() : false;
067            boolean longSequence = ack.getSequenceNumber() > Integer.MAX_VALUE;
068            
069           
070            Object[] visited = ack.getBrokersVisited();
071            boolean writeVisited = visited != null && visited.length > 0;
072            BitArray ba = ack.getBitArray();
073            ba.reset();
074            ba.set(AbstractPacket.RECEIPT_REQUIRED_INDEX, ack.isReceiptRequired());
075            ba.set(AbstractPacket.BROKERS_VISITED_INDEX,writeVisited);
076            ba.set(MessageAck.MESSAGE_READ_INDEX, ack.isMessageRead());
077            ba.set(MessageAck.TRANSACTION_ID_INDEX, ack.isPartOfTransaction());
078            ba.set(MessageAck.XA_TRANS_INDEX, ack.isXaTransacted());
079            ba.set(MessageAck.PERSISTENT_INDEX,ack.isPersistent());
080            ba.set(MessageAck.EXPIRED_INDEX,ack.isExpired());
081            ba.set(MessageAck.EXTERNAL_MESSAGE_ID_INDEX, ack.isExternalMessageId());
082            ba.set(MessageAck.CACHED_VALUES_INDEX,cachingEnabled);
083            ba.set(MessageAck.LONG_SEQUENCE_INDEX, longSequence);
084            ba.writeToStream(dataOut);
085            
086            if (ack.isReceiptRequired()){
087                dataOut.writeShort(ack.getId());
088            }
089            if (ack.isExternalMessageId()){
090                writeUTF(ack.getMessageID(),dataOut);
091            }else {
092                if (cachingEnabled){
093                    dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getProducerKey()));
094                }else{
095                    writeUTF(ack.getProducerKey(),dataOut);
096                }
097                if (longSequence){
098                    dataOut.writeLong(ack.getSequenceNumber());
099                }else {
100                    dataOut.writeInt((int)ack.getSequenceNumber());
101                }
102            }
103            if (writeVisited){
104                dataOut.writeShort(visited.length);
105                for(int i =0; i < visited.length; i++){
106                    final String brokerName = visited[i].toString();
107                    if (brokerName != null) {
108                        dataOut.writeUTF(brokerName);
109                    }
110                }
111            }
112             
113            if (ack.isPartOfTransaction()) {
114                if (cachingEnabled){
115                    dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getTransactionId()));
116                } else {
117                    if( ack.isXaTransacted()) {
118                        ActiveMQXid xid = (ActiveMQXid) ack.getTransactionId();
119                        xid.write(dataOut);
120                    } else {
121                        super.writeUTF((String) ack.getTransactionId(), dataOut);
122                    }            
123                }
124            }
125    
126            if (cachingEnabled){
127                dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getConsumerId()));
128                dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getDestination()));
129            }else {
130                super.writeUTF(ack.getConsumerId(), dataOut);
131                ActiveMQDestination.writeToStream((ActiveMQDestination) ack.getDestination(), dataOut);
132            }
133        }
134    
135    
136    }