001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq; 020 021 import javax.jms.ConnectionConsumer; 022 import javax.jms.IllegalStateException; 023 import javax.jms.JMSException; 024 import javax.jms.ServerSession; 025 import javax.jms.ServerSessionPool; 026 import javax.jms.Session; 027 028 import org.activemq.io.util.MemoryBoundedQueue; 029 import org.activemq.message.ActiveMQMessage; 030 import org.activemq.message.ConsumerInfo; 031 032 /** 033 * For application servers, <CODE>Connection</CODE> objects provide a special 034 * facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The 035 * messages it is to consume are specified by a <CODE>Destination</CODE> and a 036 * message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be 037 * given a <CODE>ServerSessionPool</CODE> to use for processing its messages. 038 * <p/> 039 * <P> 040 * Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a 041 * <CODE>ServerSession</CODE> from its pool, loads it with a single message, 042 * and starts it. As traffic picks up, messages can back up. If this happens, a 043 * <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE> 044 * with more than one message. This reduces the thread context switches and 045 * minimizes resource use at the expense of some serialization of message 046 * processing. 047 * 048 * @see javax.jms.Connection#createConnectionConsumer 049 * @see javax.jms.Connection#createDurableConnectionConsumer 050 * @see javax.jms.QueueConnection#createConnectionConsumer 051 * @see javax.jms.TopicConnection#createConnectionConsumer 052 * @see javax.jms.TopicConnection#createDurableConnectionConsumer 053 */ 054 055 public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQMessageDispatcher { 056 057 private ActiveMQConnection connection; 058 059 private ServerSessionPool sessionPool; 060 061 private ConsumerInfo consumerInfo; 062 063 private boolean closed; 064 065 protected MemoryBoundedQueue messageQueue; 066 067 /** 068 * Create a ConnectionConsumer 069 * 070 * @param theConnection 071 * @param theSessionPool 072 * @param theConsumerInfo 073 * @param theMaximumMessages 074 * @throws JMSException 075 */ 076 protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, ServerSessionPool theSessionPool, 077 ConsumerInfo theConsumerInfo, int theMaximumMessages) throws JMSException { 078 this.connection = theConnection; 079 this.sessionPool = theSessionPool; 080 this.consumerInfo = theConsumerInfo; 081 this.connection.addConnectionConsumer(this); 082 this.consumerInfo.setStarted(true); 083 this.consumerInfo.setPrefetchNumber(theMaximumMessages); 084 this.connection.syncSendPacket(this.consumerInfo); 085 086 String queueName = connection.clientID + ":" + theConsumerInfo.getConsumerName() + ":" 087 + theConsumerInfo.getConsumerNo(); 088 this.messageQueue = connection.getMemoryBoundedQueue(queueName); 089 } 090 091 /** 092 * Tests to see if the Message Dispatcher is a target for this message 093 * 094 * @param message 095 * the message to test 096 * @return true if the Message Dispatcher can dispatch the message 097 */ 098 public boolean isTarget(ActiveMQMessage message) { 099 return message.isConsumerTarget(this.consumerInfo.getConsumerNo()); 100 } 101 102 /** 103 * Dispatch an ActiveMQMessage 104 * 105 * @param message 106 */ 107 public void dispatch(ActiveMQMessage message) { 108 if (message.isConsumerTarget(this.consumerInfo.getConsumerNo())) { 109 message.setConsumerIdentifer(this.consumerInfo.getConsumerId()); 110 message.setTransientConsumed(!this.consumerInfo.isDurableTopic() 111 && !this.consumerInfo.getDestination().isQueue()); 112 try { 113 if (sessionPool != null) 114 dispatchToSession(message); 115 else 116 dispatchToQueue(message); 117 } catch (JMSException jmsEx) { 118 this.connection.handleAsyncException(jmsEx); 119 } 120 } 121 } 122 123 /** 124 * @param message 125 * @throws JMSException 126 */ 127 private void dispatchToQueue(ActiveMQMessage message) throws JMSException { 128 messageQueue.enqueue(message); 129 } 130 131 /** 132 * Receives the next message that arrives within the specified timeout 133 * interval. 134 * 135 * @throws JMSException 136 */ 137 public ActiveMQMessage receive(long timeout) throws JMSException { 138 try { 139 ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue(timeout); 140 return message; 141 } catch (InterruptedException ioe) { 142 return null; 143 } 144 } 145 146 /** 147 * @param message 148 * @throws JMSException 149 */ 150 private void dispatchToSession(ActiveMQMessage message) throws JMSException { 151 152 ServerSession serverSession = sessionPool.getServerSession(); 153 Session nestedSession = serverSession.getSession(); 154 ActiveMQSession session = null; 155 if (nestedSession instanceof ActiveMQSession) { 156 session = (ActiveMQSession) nestedSession; 157 } else if (nestedSession instanceof ActiveMQTopicSession) { 158 ActiveMQTopicSession topicSession = (ActiveMQTopicSession) nestedSession; 159 session = (ActiveMQSession) topicSession.getNext(); 160 } else if (nestedSession instanceof ActiveMQQueueSession) { 161 ActiveMQQueueSession queueSession = (ActiveMQQueueSession) nestedSession; 162 session = (ActiveMQSession) queueSession.getNext(); 163 } else { 164 throw new JMSException("Invalid instance of session obtained from server session." + 165 "The instance should be one of the following: ActiveMQSession, ActiveMQTopicSession, ActiveMQQueueSession. " + 166 "Found instance of " + nestedSession.getClass().getName()); 167 } 168 session.dispatch(message); 169 serverSession.start(); 170 } 171 172 /** 173 * Gets the server session pool associated with this connection consumer. 174 * 175 * @return the server session pool used by this connection consumer 176 * @throws JMSException 177 * if the JMS provider fails to get the server session pool 178 * associated with this consumer due to some internal error. 179 */ 180 181 public ServerSessionPool getServerSessionPool() throws JMSException { 182 if (closed) { 183 throw new IllegalStateException("The Connection Consumer is closed"); 184 } 185 return this.sessionPool; 186 } 187 188 /** 189 * Closes the connection consumer. <p/> 190 * <P> 191 * Since a provider may allocate some resources on behalf of a connection 192 * consumer outside the Java virtual machine, clients should close these 193 * resources when they are not needed. Relying on garbage collection to 194 * eventually reclaim these resources may not be timely enough. 195 * 196 * @throws JMSException 197 */ 198 199 public void close() throws JMSException { 200 if (!closed) { 201 closed = true; 202 this.consumerInfo.setStarted(false); 203 this.connection.asyncSendPacket(this.consumerInfo); 204 this.connection.removeConnectionConsumer(this); 205 } 206 207 } 208 }