001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.io.impl; 020 import java.io.DataInput; 021 import java.io.IOException; 022 023 import org.activemq.message.AbstractPacket; 024 import org.activemq.message.ActiveMQDestination; 025 import org.activemq.message.ActiveMQXid; 026 import org.activemq.message.MessageAck; 027 import org.activemq.message.Packet; 028 import org.activemq.util.BitArray; 029 030 /** 031 * Reads a ConsumerInfo object from a Stream 032 */ 033 public class MessageAckReader extends AbstractPacketReader { 034 private AbstractDefaultWireFormat wireFormat; 035 036 MessageAckReader(AbstractDefaultWireFormat wf) { 037 this.wireFormat = wf; 038 } 039 040 MessageAckReader() { 041 } 042 043 /** 044 * Return the type of Packet 045 * 046 * @return integer representation of the type of Packet 047 */ 048 public int getPacketType() { 049 return Packet.ACTIVEMQ_MSG_ACK; 050 } 051 052 /** 053 * @return a new Packet instance 054 */ 055 public Packet createPacket() { 056 return new MessageAck(); 057 } 058 059 /** 060 * build a Packet instance from the data input stream 061 * 062 * @param packet A Packet object 063 * @param dataIn the data input stream to build the packet from 064 * @throws IOException 065 */ 066 public void buildPacket(Packet packet, DataInput dataIn) throws IOException { 067 MessageAck ack = (MessageAck) packet; 068 BitArray ba = ack.getBitArray(); 069 ba.readFromStream(dataIn); 070 boolean cachingEnabled = ba.get(MessageAck.CACHED_VALUES_INDEX); 071 ack.setMessageRead(ba.get(MessageAck.MESSAGE_READ_INDEX)); 072 ack.setPersistent(ba.get(MessageAck.PERSISTENT_INDEX)); 073 ack.setExpired(ba.get(MessageAck.EXPIRED_INDEX)); 074 if (ba.get(AbstractPacket.RECEIPT_REQUIRED_INDEX)) { 075 ack.setReceiptRequired(true); 076 ack.setId(dataIn.readShort()); 077 } 078 if (ba.get(MessageAck.EXTERNAL_MESSAGE_ID_INDEX)) { 079 ack.setExternalMessageId(true); 080 ack.setMessageID(dataIn.readUTF()); 081 } 082 else { 083 if (cachingEnabled) { 084 short key = dataIn.readShort(); 085 ack.setProducerKey((String) wireFormat.getValueFromReadCache(key)); 086 } 087 else { 088 ack.setProducerKey(dataIn.readUTF()); 089 } 090 if (ba.get(MessageAck.LONG_SEQUENCE_INDEX)) { 091 ack.setSequenceNumber(dataIn.readLong()); 092 } 093 else { 094 ack.setSequenceNumber(dataIn.readInt()); 095 } 096 } 097 if (ba.get(AbstractPacket.BROKERS_VISITED_INDEX)) { 098 int visitedLen = dataIn.readShort(); 099 for (int i = 0;i < visitedLen;i++) { 100 ack.addBrokerVisited(dataIn.readUTF()); 101 } 102 } 103 if (ba.get(MessageAck.TRANSACTION_ID_INDEX)) { 104 if (cachingEnabled) { 105 short key = dataIn.readShort(); 106 ack.setTransactionId(wireFormat.getValueFromReadCache(key)); 107 } else { 108 if (ba.get(MessageAck.XA_TRANS_INDEX)) { 109 ack.setTransactionId(ActiveMQXid.read(dataIn)); 110 } 111 else { 112 ack.setTransactionId(super.readUTF(dataIn)); 113 } 114 } 115 } 116 else { 117 ack.setTransactionId(null); 118 } 119 if (cachingEnabled) { 120 short key = dataIn.readShort(); 121 ack.setConsumerId((String) wireFormat.getValueFromReadCache(key)); 122 key = dataIn.readShort(); 123 ack.setDestination((ActiveMQDestination) wireFormat.getValueFromReadCache(key)); 124 } 125 else { 126 ack.setConsumerId(dataIn.readUTF()); 127 ack.setDestination(ActiveMQDestination.readFromStream(dataIn)); 128 } 129 } 130 }