001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq; 020 import java.io.IOException; 021 import java.io.Serializable; 022 import java.util.Iterator; 023 import java.util.LinkedList; 024 import java.util.List; 025 import java.util.ListIterator; 026 027 import javax.jms.BytesMessage; 028 import javax.jms.DeliveryMode; 029 import javax.jms.Destination; 030 import javax.jms.IllegalStateException; 031 import javax.jms.InvalidDestinationException; 032 import javax.jms.InvalidSelectorException; 033 import javax.jms.JMSException; 034 import javax.jms.MapMessage; 035 import javax.jms.Message; 036 import javax.jms.MessageConsumer; 037 import javax.jms.MessageListener; 038 import javax.jms.MessageProducer; 039 import javax.jms.ObjectMessage; 040 import javax.jms.Queue; 041 import javax.jms.QueueBrowser; 042 import javax.jms.QueueReceiver; 043 import javax.jms.QueueSender; 044 import javax.jms.QueueSession; 045 import javax.jms.Session; 046 import javax.jms.StreamMessage; 047 import javax.jms.TemporaryQueue; 048 import javax.jms.TemporaryTopic; 049 import javax.jms.TextMessage; 050 import javax.jms.Topic; 051 import javax.jms.TopicPublisher; 052 import javax.jms.TopicSession; 053 import javax.jms.TopicSubscriber; 054 import javax.jms.TransactionRolledBackException; 055 056 import org.activemq.io.util.ByteArray; 057 import 
org.activemq.io.util.ByteArrayCompression; 058 import org.activemq.io.util.ByteArrayFragmentation; 059 import org.activemq.management.JMSSessionStatsImpl; 060 import org.activemq.management.StatsCapable; 061 import org.activemq.management.StatsImpl; 062 import org.activemq.message.ActiveMQBytesMessage; 063 import org.activemq.message.ActiveMQDestination; 064 import org.activemq.message.ActiveMQMapMessage; 065 import org.activemq.message.ActiveMQMessage; 066 import org.activemq.message.ActiveMQObjectMessage; 067 import org.activemq.message.ActiveMQQueue; 068 import org.activemq.message.ActiveMQStreamMessage; 069 import org.activemq.message.ActiveMQTemporaryQueue; 070 import org.activemq.message.ActiveMQTemporaryTopic; 071 import org.activemq.message.ActiveMQTextMessage; 072 import org.activemq.message.ActiveMQTopic; 073 import org.activemq.message.ConsumerInfo; 074 import org.activemq.message.DurableUnsubscribe; 075 import org.activemq.message.MessageAck; 076 import org.activemq.message.MessageAcknowledge; 077 import org.activemq.message.ProducerInfo; 078 import org.activemq.service.impl.DefaultQueueList; 079 import org.activemq.util.IdGenerator; 080 import org.apache.commons.logging.Log; 081 import org.apache.commons.logging.LogFactory; 082 083 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList; 084 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 085 086 /** 087 * <P> 088 * A <CODE>Session</CODE> object is a single-threaded context for producing and consuming messages. Although it may 089 * allocate provider resources outside the Java virtual machine (JVM), it is considered a lightweight JMS object. 090 * <P> 091 * A session serves several purposes: 092 * <UL> 093 * <LI>It is a factory for its message producers and consumers. 094 * <LI>It supplies provider-optimized message factories. 095 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and <CODE>TemporaryQueues</CODE>. 
* <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> objects for those clients that need to
 * dynamically manipulate provider-specific destination names.
 * <LI>It supports a single series of transactions that combine work spanning its producers and consumers into atomic
 * units.
 * <LI>It defines a serial order for the messages it consumes and the messages it produces.
 * <LI>It retains messages it consumes until they have been acknowledged.
 * <LI>It serializes execution of message listeners registered with its message consumers.
 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
 * </UL>
 * <P>
 * A session can create and service multiple message producers and consumers.
 * <P>
 * One typical use is to have a thread block on a synchronous <CODE>MessageConsumer</CODE> until a message arrives.
 * The thread may then use one or more of the <CODE>Session</CODE>'s <CODE>MessageProducer</CODE>s.
 * <P>
 * If a client desires to have one thread produce messages while others consume them, the client should use a separate
 * session for its producing thread.
 * <P>
 * Once a connection has been started, any session with one or more registered message listeners is dedicated to the
 * thread of control that delivers messages to it. It is erroneous for client code to use this session or any of its
 * constituent objects from another thread of control. The only exception to this rule is the use of the session or
 * connection <CODE>close</CODE> method.
 * <P>
 * It should be easy for most clients to partition their work naturally into sessions. This model allows clients to
 * start simply and incrementally add message processing complexity as their need for concurrency grows.
 * <P>
 * The <CODE>close</CODE> method is the only session method that can be called while some other session method is
 * being executed in another thread.
124 * <P> 125 * A session may be specified as transacted. Each transacted session supports a single series of transactions. Each 126 * transaction groups a set of message sends and a set of message receives into an atomic unit of work. In effect, 127 * transactions organize a session's input message stream and output message stream into series of atomic units. When a 128 * transaction commits, its atomic unit of input is acknowledged and its associated atomic unit of output is sent. If a 129 * transaction rollback is done, the transaction's sent messages are destroyed and the session's input is automatically 130 * recovered. 131 * <P> 132 * The content of a transaction's input and output units is simply those messages that have been produced and consumed 133 * within the session's current transaction. 134 * <P> 135 * A transaction is completed using either its session's <CODE>commit</CODE> method or its session's <CODE>rollback 136 * </CODE> method. The completion of a session's current transaction automatically begins the next. The result is that a 137 * transacted session always has a current transaction within which its work is done. 138 * <P> 139 * The Java Transaction Service (JTS) or some other transaction monitor may be used to combine a session's transaction 140 * with transactions on other resources (databases, other JMS sessions, etc.). Since Java distributed transactions are 141 * controlled via the Java Transaction API (JTA), use of the session's <CODE>commit</CODE> and <CODE>rollback</CODE> 142 * methods in this context is prohibited. 143 * <P> 144 * The JMS API does not require support for JTA; however, it does define how a provider supplies this support. 145 * <P> 146 * Although it is also possible for a JMS client to handle distributed transactions directly, it is unlikely that many 147 * JMS clients will do this. 
Support for JTA in the JMS API is targeted at systems vendors who will be integrating the 148 * JMS API into their application server products. 149 * 150 * @version $Revision: 1.1.1.1 $ 151 * @see javax.jms.Session 152 * @see javax.jms.QueueSession 153 * @see javax.jms.TopicSession 154 * @see javax.jms.XASession 155 */ 156 public class ActiveMQSession 157 implements 158 Session, 159 QueueSession, 160 TopicSession, 161 ActiveMQMessageDispatcher, 162 MessageAcknowledge, 163 StatsCapable { 164 165 public static interface DeliveryListener { 166 public void beforeDelivery(ActiveMQSession session, Message msg); 167 public void afterDelivery(ActiveMQSession session, Message msg); 168 } 169 170 protected static final int CONSUMER_DISPATCH_UNSET = 1; 171 protected static final int CONSUMER_DISPATCH_ASYNC = 2; 172 protected static final int CONSUMER_DISPATCH_SYNC = 3; 173 private static final Log log = LogFactory.getLog(ActiveMQSession.class); 174 protected ActiveMQConnection connection; 175 protected int acknowledgeMode; 176 protected CopyOnWriteArrayList consumers; 177 protected CopyOnWriteArrayList producers; 178 private IdGenerator temporaryDestinationGenerator; 179 private MessageListener messageListener; 180 protected boolean closed; 181 private SynchronizedBoolean started; 182 private short sessionId; 183 private long startTime; 184 private DefaultQueueList deliveredMessages; 185 private ActiveMQSessionExecutor messageExecutor; 186 private JMSSessionStatsImpl stats; 187 private int consumerDispatchState; 188 private ByteArrayCompression compression; 189 private TransactionContext transactionContext; 190 private boolean internalSession; 191 private DeliveryListener deliveryListener; 192 193 /** 194 * Construct the Session 195 * 196 * @param theConnection 197 * @param theAcknowledgeMode n.b if transacted - the acknowledgeMode == Session.SESSION_TRANSACTED 198 * @throws JMSException on internal error 199 */ 200 protected ActiveMQSession(ActiveMQConnection theConnection, 
int theAcknowledgeMode) throws JMSException { 201 this(theConnection, theAcknowledgeMode,theConnection.isOptimizedMessageDispatch()); 202 } 203 204 /** 205 * Construct the Session 206 * 207 * @param theConnection 208 * @param theAcknowledgeMode n.b if transacted - the acknowledgeMode == Session.SESSION_TRANSACTED 209 * @param optimizedDispatch 210 * @throws JMSException on internal error 211 */ 212 protected ActiveMQSession(ActiveMQConnection theConnection, int theAcknowledgeMode,boolean optimizedDispatch) throws JMSException { 213 this.connection = theConnection; 214 this.acknowledgeMode = theAcknowledgeMode; 215 setTransactionContext(new TransactionContext(theConnection)); 216 this.consumers = new CopyOnWriteArrayList(); 217 this.producers = new CopyOnWriteArrayList(); 218 this.temporaryDestinationGenerator = new IdGenerator(); 219 this.started = new SynchronizedBoolean(false); 220 this.sessionId = connection.generateSessionId(); 221 this.startTime = System.currentTimeMillis(); 222 this.deliveredMessages = new DefaultQueueList(); 223 this.messageExecutor = new ActiveMQSessionExecutor(this, connection.getMemoryBoundedQueue("Session(" 224 + sessionId + ")")); 225 this.messageExecutor.setOptimizedMessageDispatch(optimizedDispatch); 226 connection.addSession(this); 227 stats = new JMSSessionStatsImpl(producers, consumers); 228 this.consumerDispatchState = CONSUMER_DISPATCH_UNSET; 229 this.compression = new ByteArrayCompression(); 230 this.compression.setCompressionLevel(theConnection.getMessageCompressionLevel()); 231 this.compression.setCompressionStrategy(theConnection.getMessageCompressionStrategy()); 232 this.compression.setCompressionLimit(theConnection.getMessageCompressionLimit()); 233 234 this.internalSession = theConnection.isInternalConnection(); 235 } 236 237 public void setTransactionContext(TransactionContext transactionContext) { 238 if( this.transactionContext!=null ) { 239 this.transactionContext.removeSession(this); 240 } 241 this.transactionContext 
= transactionContext; 242 this.transactionContext.addSession(this); 243 } 244 245 public TransactionContext getTransactionContext() { 246 return transactionContext; 247 } 248 249 public StatsImpl getStats() { 250 return stats; 251 } 252 253 public JMSSessionStatsImpl getSessionStats() { 254 return stats; 255 } 256 257 /** 258 * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE> object is used to send a message 259 * containing a stream of uninterpreted bytes. 260 * 261 * @return the an ActiveMQBytesMessage 262 * @throws JMSException if the JMS provider fails to create this message due to some internal error. 263 */ 264 public BytesMessage createBytesMessage() throws JMSException { 265 checkClosed(); 266 return new ActiveMQBytesMessage(); 267 } 268 269 /** 270 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> object is used to send a self-defining 271 * set of name-value pairs, where names are <CODE>String</CODE> objects and values are primitive values in the 272 * Java programming language. 273 * 274 * @return an ActiveMQMapMessage 275 * @throws JMSException if the JMS provider fails to create this message due to some internal error. 276 */ 277 public MapMessage createMapMessage() throws JMSException { 278 checkClosed(); 279 return new ActiveMQMapMessage(); 280 } 281 282 /** 283 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> interface is the root interface of all JMS 284 * messages. A <CODE>Message</CODE> object holds all the standard message header information. It can be sent when 285 * a message containing only header information is sufficient. 286 * 287 * @return an ActiveMQMessage 288 * @throws JMSException if the JMS provider fails to create this message due to some internal error. 289 */ 290 public Message createMessage() throws JMSException { 291 checkClosed(); 292 return new ActiveMQMessage(); 293 } 294 295 /** 296 * Creates an <CODE>ObjectMessage</CODE> object. 
An <CODE>ObjectMessage</CODE> object is used to send a message 297 * that contains a serializable Java object. 298 * 299 * @return an ActiveMQObjectMessage 300 * @throws JMSException if the JMS provider fails to create this message due to some internal error. 301 */ 302 public ObjectMessage createObjectMessage() throws JMSException { 303 checkClosed(); 304 return new ActiveMQObjectMessage(); 305 } 306 307 /** 308 * Creates an initialized <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE> object is used to 309 * send a message that contains a serializable Java object. 310 * 311 * @param object the object to use to initialize this message 312 * @return an ActiveMQObjectMessage 313 * @throws JMSException if the JMS provider fails to create this message due to some internal error. 314 */ 315 public ObjectMessage createObjectMessage(Serializable object) throws JMSException { 316 checkClosed(); 317 ActiveMQObjectMessage msg = new ActiveMQObjectMessage(); 318 msg.setObject(object); 319 return msg; 320 } 321 322 /** 323 * Creates a <CODE>StreamMessage</CODE> object. A <CODE>StreamMessage</CODE> object is used to send a 324 * self-defining stream of primitive values in the Java programming language. 325 * 326 * @return an ActiveMQStreamMessage 327 * @throws JMSException if the JMS provider fails to create this message due to some internal error. 328 */ 329 public StreamMessage createStreamMessage() throws JMSException { 330 checkClosed(); 331 return new ActiveMQStreamMessage(); 332 } 333 334 /** 335 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a message 336 * containing a <CODE>String</CODE> object. 337 * 338 * @return an ActiveMQTextMessage 339 * @throws JMSException if the JMS provider fails to create this message due to some internal error. 
340 */ 341 public TextMessage createTextMessage() throws JMSException { 342 checkClosed(); 343 return new ActiveMQTextMessage(); 344 } 345 346 /** 347 * Creates an initialized <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a 348 * message containing a <CODE>String</CODE>. 349 * 350 * @param text the string used to initialize this message 351 * @return an ActiveMQTextMessage 352 * @throws JMSException if the JMS provider fails to create this message due to some internal error. 353 */ 354 public TextMessage createTextMessage(String text) throws JMSException { 355 checkClosed(); 356 ActiveMQTextMessage msg = new ActiveMQTextMessage(); 357 msg.setText(text); 358 return msg; 359 } 360 361 /** 362 * Indicates whether the session is in transacted mode. 363 * 364 * @return true if the session is in transacted mode 365 * @throws JMSException if there is some internal error. 366 */ 367 public boolean getTransacted() throws JMSException { 368 checkClosed(); 369 return this.acknowledgeMode == Session.SESSION_TRANSACTED || transactionContext.isInXATransaction(); 370 } 371 372 /** 373 * Returns the acknowledgement mode of the session. The acknowledgement mode is set at the time that the session is 374 * created. If the session is transacted, the acknowledgement mode is ignored. 375 * 376 * @return If the session is not transacted, returns the current acknowledgement mode for the session. If the 377 * session is transacted, returns SESSION_TRANSACTED. 378 * @throws JMSException 379 * @see javax.jms.Connection#createSession(boolean,int) 380 * @since 1.1 exception JMSException if there is some internal error. 381 */ 382 public int getAcknowledgeMode() throws JMSException { 383 checkClosed(); 384 return this.acknowledgeMode; 385 } 386 387 /** 388 * Commits all messages done in this transaction and releases any locks currently held. 389 * 390 * @throws JMSException if the JMS provider fails to commit the transaction due to some internal error. 
391 * @throws TransactionRolledBackException if the transaction is rolled back due to some internal error during 392 * commit. 393 * @throws javax.jms.IllegalStateException if the method is not called by a transacted session. 394 */ 395 public void commit() throws JMSException { 396 checkClosed(); 397 if (!getTransacted()) { 398 throw new javax.jms.IllegalStateException("Not a transacted session"); 399 } 400 transactionContext.commit(); 401 } 402 403 /** 404 * Rolls back any messages done in this transaction and releases any locks currently held. 405 * 406 * @throws JMSException if the JMS provider fails to roll back the transaction due to some internal error. 407 * @throws javax.jms.IllegalStateException if the method is not called by a transacted session. 408 */ 409 public void rollback() throws JMSException { 410 checkClosed(); 411 if (!getTransacted()) { 412 throw new javax.jms.IllegalStateException("Not a transacted session"); 413 } 414 transactionContext.rollback(); 415 } 416 417 public void clearDeliveredMessages() { 418 deliveredMessages.clear(); 419 } 420 421 /** 422 * Closes the session. 423 * <P> 424 * Since a provider may allocate some resources on behalf of a session outside the JVM, clients should close the 425 * resources when they are not needed. Relying on garbage collection to eventually reclaim these resources may not 426 * be timely enough. 427 * <P> 428 * There is no need to close the producers and consumers of a closed session. 429 * <P> 430 * This call will block until a <CODE>receive</CODE> call or message listener in progress has completed. A blocked 431 * message consumer <CODE>receive</CODE> call returns <CODE>null</CODE> when this session is closed. 432 * <P> 433 * Closing a transacted session must roll back the transaction in progress. 434 * <P> 435 * This method is the only <CODE>Session</CODE> method that can be called concurrently. 
436 * <P> 437 * Invoking any other <CODE>Session</CODE> method on a closed session must throw a <CODE> 438 * JMSException.IllegalStateException</CODE>. Closing a closed session must <I>not </I> throw an exception. 439 * 440 * @throws JMSException if the JMS provider fails to close the session due to some internal error. 441 */ 442 public void close() throws JMSException { 443 if (!this.closed) { 444 if (getTransactionContext().isInLocalTransaction()) { 445 rollback(); 446 } 447 doClose(); 448 closed = true; 449 } 450 } 451 452 protected void doClose() throws JMSException { 453 doAcknowledge(true); 454 deliveredMessages.clear(); 455 for (Iterator i = consumers.iterator();i.hasNext();) { 456 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next(); 457 consumer.close(); 458 } 459 for (Iterator i = producers.iterator();i.hasNext();) { 460 ActiveMQMessageProducer producer = (ActiveMQMessageProducer) i.next(); 461 producer.close(); 462 } 463 consumers.clear(); 464 producers.clear(); 465 this.connection.removeSession(this); 466 this.transactionContext.removeSession(this); 467 messageExecutor.close(); 468 } 469 470 /** 471 * @throws IllegalStateException if the Session is closed 472 */ 473 protected void checkClosed() throws IllegalStateException { 474 if (this.closed) { 475 throw new IllegalStateException("The Session is closed"); 476 } 477 } 478 479 /** 480 * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message. 481 * <P> 482 * All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges all 483 * messages that have been delivered to the client. 
484 * <P> 485 * Restarting a session causes it to take the following actions: 486 * <UL> 487 * <LI>Stop message delivery 488 * <LI>Mark all messages that might have been delivered but not acknowledged as "redelivered" 489 * <LI>Restart the delivery sequence including all unacknowledged messages that had been previously delivered. 490 * Redelivered messages do not have to be delivered in exactly their original delivery order. 491 * </UL> 492 * 493 * @throws JMSException if the JMS provider fails to stop and restart message delivery due to some internal error. 494 * @throws IllegalStateException if the method is called by a transacted session. 495 */ 496 public void recover() throws JMSException { 497 checkClosed(); 498 if (getTransacted()) { 499 throw new IllegalStateException("This session is transacted"); 500 } 501 redeliverUnacknowledgedMessages(); 502 } 503 504 /** 505 * Returns the session's distinguished message listener (optional). 506 * 507 * @return the message listener associated with this session 508 * @throws JMSException if the JMS provider fails to get the message listener due to an internal error. 509 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener) 510 * @see javax.jms.ServerSessionPool 511 * @see javax.jms.ServerSession 512 */ 513 public MessageListener getMessageListener() throws JMSException { 514 checkClosed(); 515 return this.messageListener; 516 } 517 518 /** 519 * Sets the session's distinguished message listener (optional). 520 * <P> 521 * When the distinguished message listener is set, no other form of message receipt in the session can be used; 522 * however, all forms of sending messages are still supported. 523 * <P> 524 * This is an expert facility not used by regular JMS clients. 525 * 526 * @param listener the message listener to associate with this session 527 * @throws JMSException if the JMS provider fails to set the message listener due to an internal error. 
528 * @see javax.jms.Session#getMessageListener() 529 * @see javax.jms.ServerSessionPool 530 * @see javax.jms.ServerSession 531 */ 532 public void setMessageListener(MessageListener listener) throws JMSException { 533 checkClosed(); 534 this.messageListener = listener; 535 if (listener != null) { 536 messageExecutor.setDispatchedBySessionPool(true); 537 } 538 } 539 540 /** 541 * Optional operation, intended to be used only by Application Servers, not by ordinary JMS clients. 542 * 543 * @see javax.jms.ServerSession 544 */ 545 public void run() { 546 ActiveMQMessage message; 547 while ((message = messageExecutor.dequeueNoWait()) != null) { 548 if( deliveryListener!=null ) 549 deliveryListener.beforeDelivery(this, message); 550 beforeMessageDelivered(message); 551 deliver(message); 552 if( deliveryListener!=null ) 553 deliveryListener.afterDelivery(this, message); 554 } 555 } 556 557 /** 558 * Delivers a message to the messageListern 559 * @param message The message to deliver 560 */ 561 private void deliver(ActiveMQMessage message) { 562 if (!message.isExpired() && this.messageListener != null) { 563 try { 564 565 if( log.isDebugEnabled() ) { 566 log.debug("Message delivered to session message listener: "+message); 567 } 568 569 this.messageListener.onMessage(message); 570 this.afterMessageDelivered(true, message, true, false, true); 571 } 572 catch (Throwable t) { 573 log.info("Caught :" + t, t); 574 this.afterMessageDelivered(true, message, false, false, true); 575 } 576 } 577 else { 578 this.afterMessageDelivered(true, message, false, message.isExpired(), true); 579 } 580 } 581 582 /** 583 * Creates a <CODE>MessageProducer</CODE> to send messages to the specified destination. 584 * <P> 585 * A client uses a <CODE>MessageProducer</CODE> object to send messages to a destination. 
Since <CODE>Queue 586 * </CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the 587 * destination parameter to create a <CODE>MessageProducer</CODE> object. 588 * 589 * @param destination the <CODE>Destination</CODE> to send to, or null if this is a producer which does not have a 590 * specified destination. 591 * @return the MessageProducer 592 * @throws JMSException if the session fails to create a MessageProducer due to some internal error. 593 * @throws InvalidDestinationException if an invalid destination is specified. 594 * @since 1.1 595 */ 596 public MessageProducer createProducer(Destination destination) throws JMSException { 597 checkClosed(); 598 return new ActiveMQMessageProducer(this, ActiveMQMessageTransformation.transformDestination(destination)); 599 } 600 601 /** 602 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. Since <CODE>Queue</CODE> and <CODE> 603 * Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the destination parameter to 604 * create a <CODE>MessageConsumer</CODE>. 605 * 606 * @param destination the <CODE>Destination</CODE> to access. 607 * @return the MessageConsumer 608 * @throws JMSException if the session fails to create a consumer due to some internal error. 609 * @throws InvalidDestinationException if an invalid destination is specified. 610 * @since 1.1 611 */ 612 public MessageConsumer createConsumer(Destination destination) throws JMSException { 613 checkClosed(); 614 int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection 615 .getPrefetchPolicy().getQueuePrefetch(); 616 return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "", 617 "", this.connection.getNextConsumerNumber(), prefetch, false, false); 618 } 619 620 /** 621 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. 
Since <CODE> 622 * Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the 623 * destination parameter to create a <CODE>MessageConsumer</CODE>. 624 * <P> 625 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been sent to a destination. 626 * 627 * @param destination the <CODE>Destination</CODE> to access 628 * @param messageSelector only messages with properties matching the message selector expression are delivered. A 629 * value of null or an empty string indicates that there is no message selector for the message consumer. 630 * @return the MessageConsumer 631 * @throws JMSException if the session fails to create a MessageConsumer due to some internal error. 632 * @throws InvalidDestinationException if an invalid destination is specified. 633 * @throws InvalidSelectorException if the message selector is invalid. 634 * @since 1.1 635 */ 636 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { 637 checkClosed(); 638 int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection 639 .getPrefetchPolicy().getQueuePrefetch(); 640 return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "", 641 messageSelector, this.connection.getNextConsumerNumber(), prefetch, false, false); 642 } 643 644 /** 645 * Creates <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. This method can 646 * specify whether messages published by its own connection should be delivered to it, if the destination is a 647 * topic. 648 * <P> 649 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be 650 * used in the destination parameter to create a <CODE>MessageConsumer</CODE>. 
651 * <P> 652 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been published to a 653 * destination. 654 * <P> 655 * In some cases, a connection may both publish and subscribe to a topic. The consumer <CODE>NoLocal</CODE> 656 * attribute allows a consumer to inhibit the delivery of messages published by its own connection. The default 657 * value for this attribute is False. The <CODE>noLocal</CODE> value must be supported by destinations that are 658 * topics. 659 * 660 * @param destination the <CODE>Destination</CODE> to access 661 * @param messageSelector only messages with properties matching the message selector expression are delivered. A 662 * value of null or an empty string indicates that there is no message selector for the message consumer. 663 * @param NoLocal - if true, and the destination is a topic, inhibits the delivery of messages published by its own 664 * connection. The behavior for <CODE>NoLocal</CODE> is not specified if the destination is a queue. 665 * @return the MessageConsumer 666 * @throws JMSException if the session fails to create a MessageConsumer due to some internal error. 667 * @throws InvalidDestinationException if an invalid destination is specified. 668 * @throws InvalidSelectorException if the message selector is invalid. 669 * @since 1.1 670 */ 671 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) 672 throws JMSException { 673 checkClosed(); 674 int prefetch = connection.getPrefetchPolicy().getTopicPrefetch(); 675 return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "", 676 messageSelector, this.connection.getNextConsumerNumber(), prefetch, NoLocal, false); 677 } 678 679 /** 680 * Creates a queue identity given a <CODE>Queue</CODE> name. 681 * <P> 682 * This facility is provided for the rare cases where clients need to dynamically manipulate queue identity. 
It 683 * allows the creation of a queue identity with a provider-specific name. Clients that depend on this ability are 684 * not portable. 685 * <P> 686 * Note that this method is not for creating the physical queue. The physical creation of queues is an 687 * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary 688 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> method. 689 * 690 * @param queueName the name of this <CODE>Queue</CODE> 691 * @return a <CODE>Queue</CODE> with the given name 692 * @throws JMSException if the session fails to create a queue due to some internal error. 693 * @since 1.1 694 */ 695 public Queue createQueue(String queueName) throws JMSException { 696 checkClosed(); 697 return new ActiveMQQueue(queueName); 698 } 699 700 /** 701 * Creates a topic identity given a <CODE>Topic</CODE> name. 702 * <P> 703 * This facility is provided for the rare cases where clients need to dynamically manipulate topic identity. This 704 * allows the creation of a topic identity with a provider-specific name. Clients that depend on this ability are 705 * not portable. 706 * <P> 707 * Note that this method is not for creating the physical topic. The physical creation of topics is an 708 * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary 709 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> method. 710 * 711 * @param topicName the name of this <CODE>Topic</CODE> 712 * @return a <CODE>Topic</CODE> with the given name 713 * @throws JMSException if the session fails to create a topic due to some internal error. 714 * @since 1.1 715 */ 716 public Topic createTopic(String topicName) throws JMSException { 717 checkClosed(); 718 return new ActiveMQTopic(topicName); 719 } 720 721 /** 722 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue. 
723 * 724 * @param queue the <CODE>queue</CODE> to access 725 * @exception InvalidDestinationException if an invalid destination is specified 726 * @since 1.1 727 */ 728 /** 729 * Creates a durable subscriber to the specified topic. 730 * <P> 731 * If a client needs to receive all the messages published on a topic, including the ones published while the 732 * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of 733 * this durable subscription and insures that all messages from the topic's publishers are retained until they are 734 * acknowledged by this durable subscriber or they have expired. 735 * <P> 736 * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must 737 * specify a name that uniquely identifies (within client identifier) each durable subscription it creates. Only one 738 * session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription. 739 * <P> 740 * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with 741 * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to 742 * unsubscribing (deleting) the old one and creating a new one. 743 * <P> 744 * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE> 745 * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default 746 * value for this attribute is false. 747 * 748 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 749 * @param name the name used to identify this subscription 750 * @return the TopicSubscriber 751 * @throws JMSException if the session fails to create a subscriber due to some internal error. 752 * @throws InvalidDestinationException if an invalid topic is specified. 
753 * @since 1.1 754 */ 755 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { 756 checkClosed(); 757 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name, "", 758 this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getDurableTopicPrefetch(), 759 false, false); 760 } 761 762 /** 763 * Creates a durable subscriber to the specified topic, using a message selector and specifying whether messages 764 * published by its own connection should be delivered to it. 765 * <P> 766 * If a client needs to receive all the messages published on a topic, including the ones published while the 767 * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of 768 * this durable subscription and insures that all messages from the topic's publishers are retained until they are 769 * acknowledged by this durable subscriber or they have expired. 770 * <P> 771 * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must 772 * specify a name which uniquely identifies (within client identifier) each durable subscription it creates. Only 773 * one session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription. An 774 * inactive durable subscriber is one that exists but does not currently have a message consumer associated with it. 775 * <P> 776 * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with 777 * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to 778 * unsubscribing (deleting) the old one and creating a new one. 
779 * 780 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 781 * @param name the name used to identify this subscription 782 * @param messageSelector only messages with properties matching the message selector expression are delivered. A 783 * value of null or an empty string indicates that there is no message selector for the message consumer. 784 * @param noLocal if set, inhibits the delivery of messages published by its own connection 785 * @return the Queue Browser 786 * @throws JMSException if the session fails to create a subscriber due to some internal error. 787 * @throws InvalidDestinationException if an invalid topic is specified. 788 * @throws InvalidSelectorException if the message selector is invalid. 789 * @since 1.1 790 */ 791 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) 792 throws JMSException { 793 checkClosed(); 794 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name, 795 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy() 796 .getDurableTopicPrefetch(), noLocal, false); 797 } 798 799 /** 800 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue. 801 * 802 * @param queue the <CODE>queue</CODE> to access 803 * @return the Queue Browser 804 * @throws JMSException if the session fails to create a browser due to some internal error. 805 * @throws InvalidDestinationException if an invalid destination is specified 806 * @since 1.1 807 */ 808 public QueueBrowser createBrowser(Queue queue) throws JMSException { 809 checkClosed(); 810 return new ActiveMQQueueBrowser(this, (ActiveMQQueue) ActiveMQMessageTransformation.transformDestination(queue), "", 811 this.connection.getNextConsumerNumber()); 812 } 813 814 /** 815 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue using a message 816 * selector. 
817 * 818 * @param queue the <CODE>queue</CODE> to access 819 * @param messageSelector only messages with properties matching the message selector expression are delivered. A 820 * value of null or an empty string indicates that there is no message selector for the message consumer. 821 * @return the Queue Browser 822 * @throws JMSException if the session fails to create a browser due to some internal error. 823 * @throws InvalidDestinationException if an invalid destination is specified 824 * @throws InvalidSelectorException if the message selector is invalid. 825 * @since 1.1 826 */ 827 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { 828 checkClosed(); 829 return new ActiveMQQueueBrowser(this, (ActiveMQQueue) ActiveMQMessageTransformation.transformDestination(queue), 830 messageSelector, this.connection.getNextConsumerNumber()); 831 } 832 833 /** 834 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE> unless 835 * it is deleted earlier. 836 * 837 * @return a temporary queue identity 838 * @throws JMSException if the session fails to create a temporary queue due to some internal error. 839 * @since 1.1 840 */ 841 public TemporaryQueue createTemporaryQueue() throws JMSException { 842 checkClosed(); 843 String tempQueueName = "TemporaryQueue-" 844 + ActiveMQDestination.createTemporaryName(this.connection.getInitializedClientID()); 845 tempQueueName += this.temporaryDestinationGenerator.generateId(); 846 ActiveMQTemporaryQueue tempQueue = new ActiveMQTemporaryQueue(tempQueueName); 847 tempQueue.setSessionCreatedBy(this); 848 this.connection.startTemporaryDestination(tempQueue); 849 return tempQueue; 850 } 851 852 /** 853 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE> unless 854 * it is deleted earlier. 
855 * 856 * @return a temporary topic identity 857 * @throws JMSException if the session fails to create a temporary topic due to some internal error. 858 * @since 1.1 859 */ 860 public TemporaryTopic createTemporaryTopic() throws JMSException { 861 checkClosed(); 862 String tempTopicName = "TemporaryTopic-" 863 + ActiveMQDestination.createTemporaryName(this.connection.getInitializedClientID()); 864 tempTopicName += this.temporaryDestinationGenerator.generateId(); 865 ActiveMQTemporaryTopic tempTopic = new ActiveMQTemporaryTopic(tempTopicName); 866 tempTopic.setSessionCreatedBy(this); 867 this.connection.startTemporaryDestination(tempTopic); 868 return tempTopic; 869 } 870 871 /** 872 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue. 873 * 874 * @param queue the <CODE>Queue</CODE> to access 875 * @return @throws JMSException if the session fails to create a receiver due to some internal error. 876 * @throws JMSException 877 * @throws InvalidDestinationException if an invalid queue is specified. 878 */ 879 public QueueReceiver createReceiver(Queue queue) throws JMSException { 880 checkClosed(); 881 return new ActiveMQQueueReceiver(this, ActiveMQDestination.transformDestination(queue), "", this.connection 882 .getNextConsumerNumber(), this.connection.getPrefetchPolicy().getQueuePrefetch()); 883 } 884 885 /** 886 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue using a message 887 * selector. 888 * 889 * @param queue the <CODE>Queue</CODE> to access 890 * @param messageSelector only messages with properties matching the message selector expression are delivered. A 891 * value of null or an empty string indicates that there is no message selector for the message consumer. 892 * @return QueueReceiver 893 * @throws JMSException if the session fails to create a receiver due to some internal error. 894 * @throws InvalidDestinationException if an invalid queue is specified. 
895 * @throws InvalidSelectorException if the message selector is invalid. 896 */ 897 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { 898 checkClosed(); 899 return new ActiveMQQueueReceiver(this, ActiveMQMessageTransformation.transformDestination(queue), 900 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy() 901 .getQueuePrefetch()); 902 } 903 904 /** 905 * Creates a <CODE>QueueSender</CODE> object to send messages to the specified queue. 906 * 907 * @param queue the <CODE>Queue</CODE> to access, or null if this is an unidentified producer 908 * @return QueueSender 909 * @throws JMSException if the session fails to create a sender due to some internal error. 910 * @throws InvalidDestinationException if an invalid queue is specified. 911 */ 912 public QueueSender createSender(Queue queue) throws JMSException { 913 checkClosed(); 914 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue)); 915 } 916 917 /** 918 * Creates a nondurable subscriber to the specified topic. <p/> 919 * <P> 920 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic. 921 * <p/> 922 * <P> 923 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published 924 * while they are active. <p/> 925 * <P> 926 * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE> 927 * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default 928 * value for this attribute is false. 929 * 930 * @param topic the <CODE>Topic</CODE> to subscribe to 931 * @return TopicSubscriber 932 * @throws JMSException if the session fails to create a subscriber due to some internal error. 933 * @throws InvalidDestinationException if an invalid topic is specified. 
934 */ 935 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 936 checkClosed(); 937 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null, null, 938 this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getTopicPrefetch(), false, 939 false); 940 } 941 942 /** 943 * Creates a nondurable subscriber to the specified topic, using a message selector or specifying whether messages 944 * published by its own connection should be delivered to it. <p/> 945 * <P> 946 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic. 947 * <p/> 948 * <P> 949 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published 950 * while they are active. <p/> 951 * <P> 952 * Messages filtered out by a subscriber's message selector will never be delivered to the subscriber. From the 953 * subscriber's perspective, they do not exist. <p/> 954 * <P> 955 * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE> 956 * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default 957 * value for this attribute is false. 958 * 959 * @param topic the <CODE>Topic</CODE> to subscribe to 960 * @param messageSelector only messages with properties matching the message selector expression are delivered. A 961 * value of null or an empty string indicates that there is no message selector for the message consumer. 962 * @param noLocal if set, inhibits the delivery of messages published by its own connection 963 * @return TopicSubscriber 964 * @throws JMSException if the session fails to create a subscriber due to some internal error. 965 * @throws InvalidDestinationException if an invalid topic is specified. 966 * @throws InvalidSelectorException if the message selector is invalid. 
967 */ 968 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { 969 checkClosed(); 970 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null, 971 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy() 972 .getTopicPrefetch(), noLocal, false); 973 } 974 975 /** 976 * Creates a publisher for the specified topic. <p/> 977 * <P> 978 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages on a topic. Each time a client creates a 979 * <CODE>TopicPublisher</CODE> on a topic, it defines a new sequence of messages that have no ordering 980 * relationship with the messages it has previously sent. 981 * 982 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is an unidentified producer 983 * @return TopicPublisher 984 * @throws JMSException if the session fails to create a publisher due to some internal error. 985 * @throws InvalidDestinationException if an invalid topic is specified. 986 */ 987 public TopicPublisher createPublisher(Topic topic) throws JMSException { 988 checkClosed(); 989 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic)); 990 } 991 992 /** 993 * Unsubscribes a durable subscription that has been created by a client. 994 * <P> 995 * This method deletes the state being maintained on behalf of the subscriber by its provider. 996 * <P> 997 * It is erroneous for a client to delete a durable subscription while there is an active <CODE>MessageConsumer 998 * </CODE> or <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed message is part of a pending 999 * transaction or has not been acknowledged in the session. 1000 * 1001 * @param name the name used to identify this subscription 1002 * @throws JMSException if the session fails to unsubscribe to the durable subscription due to some internal error. 
1003 * @throws InvalidDestinationException if an invalid subscription name is specified. 1004 * @since 1.1 1005 */ 1006 public void unsubscribe(String name) throws JMSException { 1007 checkClosed(); 1008 DurableUnsubscribe ds = new DurableUnsubscribe(); 1009 ds.setClientId(this.connection.getClientID()); 1010 ds.setSubscriberName(name); 1011 this.connection.syncSendPacket(ds); 1012 } 1013 1014 /** 1015 * Tests to see if the Message Dispatcher is a target for this message 1016 * 1017 * @param message the message to test 1018 * @return true if the Message Dispatcher can dispatch the message 1019 */ 1020 public boolean isTarget(ActiveMQMessage message) { 1021 for (Iterator i = this.consumers.iterator();i.hasNext();) { 1022 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next(); 1023 if (message.isConsumerTarget(consumer.getConsumerNumber())) { 1024 return true; 1025 } 1026 } 1027 return false; 1028 } 1029 1030 /** 1031 * Dispatch an ActiveMQMessage 1032 * 1033 * @param message 1034 */ 1035 public void dispatch(ActiveMQMessage message) { 1036 message.setMessageAcknowledge(this); 1037 messageExecutor.execute(message); 1038 } 1039 1040 /** 1041 * Acknowledges all consumed messages of the session of this consumed message. 1042 * <P> 1043 * All consumed JMS messages support the <CODE>acknowledge</CODE> method for use when a client has specified that 1044 * its JMS session's consumed messages are to be explicitly acknowledged. By invoking <CODE>acknowledge</CODE> on 1045 * a consumed message, a client acknowledges all messages consumed by the session that the message was delivered to. 1046 * <P> 1047 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted sessions and sessions specified to use 1048 * implicit acknowledgement modes. 
1049 * <P> 1050 * A client may individually acknowledge each message as it is consumed, or it may choose to acknowledge messages as 1051 * an application-defined group (which is done by calling acknowledge on the last received message of the group, 1052 * thereby acknowledging all messages consumed by the session.) 1053 * <P> 1054 * Messages that have been received but not acknowledged may be redelivered. 1055 * @param caller - the message calling acknowledge on the session 1056 * 1057 * @throws JMSException if the JMS provider fails to acknowledge the messages due to some internal error. 1058 * @throws javax.jms.IllegalStateException if this method is called on a closed session. 1059 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE 1060 */ 1061 public void acknowledge(ActiveMQMessage caller) throws JMSException { 1062 checkClosed(); 1063 /** 1064 * Find the caller and ensure it is marked as consumed 1065 * This is to ensure acknowledge called by a 1066 * MessageListener works correctly 1067 */ 1068 ActiveMQMessage msg = (ActiveMQMessage)deliveredMessages.get(caller); 1069 if (msg != null){ 1070 msg.setMessageConsumed(true); 1071 } 1072 1073 doAcknowledge(false); 1074 } 1075 1076 protected void doAcknowledge(boolean isClosing) throws JMSException { 1077 if (!closed) { 1078 if (this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) { 1079 ActiveMQMessage msg = null; 1080 while((msg = (ActiveMQMessage)deliveredMessages.removeFirst())!=null){ 1081 boolean messageConsumed = isClosing ? 
false : msg.isMessageConsumed(); 1082 if (!msg.isTransientConsumed()){ 1083 sendMessageAck(msg, messageConsumed, false); 1084 }else { 1085 if (!messageConsumed){ 1086 connection.addToTransientConsumedRedeliverCache(msg); 1087 } 1088 } 1089 } 1090 deliveredMessages.clear(); 1091 } 1092 } 1093 } 1094 1095 protected void beforeMessageDelivered(ActiveMQMessage message) { 1096 if (message != null && !closed) { 1097 deliveredMessages.add(message); 1098 } 1099 } 1100 1101 protected void afterMessageDelivered(boolean sendAcknowledge, ActiveMQMessage message, boolean messageConsumed, 1102 boolean messageExpired, boolean beforeCalled) { 1103 if (message != null && !closed) { 1104 if ((isClientAcknowledge() && !messageExpired) || (isTransacted() && message.isTransientConsumed())) { 1105 message.setMessageConsumed(messageConsumed); 1106 if (!beforeCalled) { 1107 deliveredMessages.add(message); 1108 } 1109 } 1110 else { 1111 if (beforeCalled) { 1112 deliveredMessages.remove(message); 1113 } 1114 } 1115 //don't send acks for expired messages unless sendAcknowledge is set 1116 //the sendAcknowledge flag is set for all messages expect those destined 1117 //for transient Topic subscribers 1118 if (sendAcknowledge && !isClientAcknowledge()) { 1119 try { 1120 doStartTransaction(); 1121 sendMessageAck(message,messageConsumed,messageExpired); 1122 } 1123 catch (JMSException e) { 1124 log.warn("failed to notify Broker that message is delivered", e); 1125 } 1126 } 1127 } 1128 } 1129 1130 /** 1131 * remove a temporary destination 1132 * @param destination 1133 * @throws JMSException if active subscribers already exist 1134 */ 1135 public void removeTemporaryDestination(ActiveMQDestination destination) throws JMSException{ 1136 this.connection.stopTemporaryDestination(destination); 1137 } 1138 1139 private void sendMessageAck(ActiveMQMessage message, boolean messageConsumed, boolean messageExpired) 1140 throws JMSException { 1141 if (message.isMessagePart()) { 1142 ActiveMQMessage[] parts 
= (ActiveMQMessage[]) connection.getAssemblies().remove(message.getParentMessageID()); 1143 if (parts != null) { 1144 for (int i = 0;i < parts.length;i++) { 1145 parts[i].setConsumerIdentifer(message.getConsumerIdentifer()); 1146 doSendMessageAck(parts[i], messageConsumed, messageExpired); 1147 } 1148 } 1149 else { 1150 JMSException jmsEx = new JMSException("Could not find parts for fragemented message: " + message); 1151 connection.onException(jmsEx); 1152 } 1153 } 1154 else { 1155 doSendMessageAck(message, messageConsumed, messageExpired); 1156 } 1157 } 1158 1159 private void doSendMessageAck(ActiveMQMessage message, boolean messageConsumed, boolean messageExpired) 1160 throws JMSException { 1161 if (message != null && !message.isAdvisory()) { 1162 MessageAck ack = new MessageAck(); 1163 ack.setConsumerId(message.getConsumerIdentifer()); 1164 ack.setTransactionId(transactionContext.getTransactionId()); 1165 ack.setExternalMessageId(message.isExternalMessageId()); 1166 ack.setMessageID(message.getJMSMessageID()); 1167 ack.setSequenceNumber(message.getSequenceNumber()); 1168 ack.setProducerKey(message.getProducerKey()); 1169 ack.setMessageRead(messageConsumed); 1170 ack.setDestination(message.getJMSActiveMQDestination()); 1171 ack.setPersistent(message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT); 1172 ack.setExpired(messageExpired); 1173 ack.setSessionId(getSessionId()); 1174 this.connection.asyncSendPacket(ack); 1175 } 1176 } 1177 1178 /** 1179 * @param consumer 1180 * @throws JMSException 1181 */ 1182 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException { 1183 // ensure that the connection info is sent to the broker 1184 connection.sendConnectionInfoToBroker(); 1185 // lets add the stat 1186 if (consumer.isDurableSubscriber()) { 1187 stats.onCreateDurableSubscriber(); 1188 } 1189 ConsumerInfo info = createConsumerInfo(consumer); 1190 info.setStarted(true); 1191 //we add before notifying the server - as messages could 1192 //start 
to be dispatched before receipt from syncSend() 1193 //is returned 1194 this.consumers.add(consumer); 1195 if (started.get()){ 1196 connection.replayTransientConsumedRedeliveredMessages(this,consumer); 1197 } 1198 try { 1199 this.connection.syncSendPacket(info); 1200 } 1201 catch (JMSException jmsEx) { 1202 this.consumers.remove(consumer); 1203 throw jmsEx; 1204 } 1205 } 1206 1207 /** 1208 * @param consumer 1209 * @throws JMSException 1210 */ 1211 protected void removeConsumer(ActiveMQMessageConsumer consumer) throws JMSException { 1212 this.consumers.remove(consumer); 1213 // lets remove the stat 1214 if (consumer.isDurableSubscriber()) { 1215 stats.onRemoveDurableSubscriber(); 1216 } 1217 if (!closed) { 1218 ConsumerInfo info = createConsumerInfo(consumer); 1219 info.setStarted(false); 1220 this.connection.asyncSendPacket(info, false); 1221 } 1222 } 1223 1224 protected ConsumerInfo createConsumerInfo(ActiveMQMessageConsumer consumer) throws JMSException { 1225 ConsumerInfo info = new ConsumerInfo(); 1226 info.setConsumerId(consumer.consumerIdentifier); 1227 info.setClientId(connection.clientID); 1228 info.setSessionId(this.sessionId); 1229 info.setConsumerNo(consumer.consumerNumber); 1230 info.setPrefetchNumber(consumer.prefetchNumber); 1231 info.setDestination(consumer.destination); 1232 info.setNoLocal(consumer.noLocal); 1233 info.setBrowser(consumer.browser); 1234 info.setSelector(consumer.messageSelector); 1235 info.setStartTime(consumer.startTime); 1236 info.setConsumerName(consumer.consumerName); 1237 return info; 1238 } 1239 1240 /** 1241 * @param producer 1242 * @throws JMSException 1243 */ 1244 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException { 1245 // ensure that the connection info is sent to the broker 1246 connection.sendConnectionInfoToBroker(); 1247 //start listening for advisories if the destination is temporary 1248 this.connection.startAdvisoryForTempDestination(producer.defaultDestination); 1249 
producer.setProducerId(connection.handleIdGenerator.getNextShortSequence()); 1250 ProducerInfo info = createProducerInfo(producer); 1251 info.setStarted(true); 1252 this.connection.asyncSendPacket(info); 1253 this.producers.add(producer); 1254 } 1255 1256 /** 1257 * @param producer 1258 * @throws JMSException 1259 */ 1260 protected void removeProducer(ActiveMQMessageProducer producer) throws JMSException { 1261 this.producers.remove(producer); 1262 if (!closed) { 1263 this.connection.stopAdvisoryForTempDestination(producer.defaultDestination); 1264 ProducerInfo info = createProducerInfo(producer); 1265 info.setStarted(false); 1266 this.connection.asyncSendPacket(info, false); 1267 } 1268 } 1269 1270 protected ProducerInfo createProducerInfo(ActiveMQMessageProducer producer) throws JMSException { 1271 ProducerInfo info = new ProducerInfo(); 1272 info.setProducerId(producer.getProducerId()); 1273 info.setClientId(connection.clientID); 1274 info.setSessionId(this.sessionId); 1275 info.setDestination(producer.defaultDestination); 1276 info.setStartTime(producer.getStartTime()); 1277 return info; 1278 } 1279 1280 /** 1281 * Start this Session 1282 * @throws JMSException 1283 */ 1284 protected void start() throws JMSException { 1285 started.set(true); 1286 for (Iterator i = consumers.iterator(); i.hasNext();){ 1287 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i.next(); 1288 connection.replayTransientConsumedRedeliveredMessages(this,consumer); 1289 } 1290 messageExecutor.start(); 1291 } 1292 1293 /** 1294 * Stop this Session 1295 */ 1296 protected void stop() { 1297 started.set(false); 1298 messageExecutor.stop(); 1299 } 1300 1301 /** 1302 * @return Returns the sessionId. 1303 */ 1304 protected short getSessionId() { 1305 return sessionId; 1306 } 1307 1308 /** 1309 * @param sessionId The sessionId to set. 1310 */ 1311 protected void setSessionId(short sessionId) { 1312 this.sessionId = sessionId; 1313 } 1314 1315 /** 1316 * @return Returns the startTime. 
1317 */ 1318 protected long getStartTime() { 1319 return startTime; 1320 } 1321 1322 /** 1323 * @param startTime The startTime to set. 1324 */ 1325 protected void setStartTime(long startTime) { 1326 this.startTime = startTime; 1327 } 1328 1329 /** 1330 * send the message for dispatch by the broker 1331 * 1332 * @param producer 1333 * @param destination 1334 * @param message 1335 * @param deliveryMode 1336 * @param priority 1337 * @param timeToLive 1338 * @throws JMSException 1339 */ 1340 protected void send(ActiveMQMessageProducer producer, Destination destination, Message message, int deliveryMode, 1341 int priority, long timeToLive, boolean reuseMessageId) throws JMSException { 1342 checkClosed(); 1343 // ensure that the connection info is sent to the broker 1344 connection.sendConnectionInfoToBroker(); 1345 // tell the Broker we are about to start a new transaction 1346 doStartTransaction(); 1347 message.setJMSDestination(destination); 1348 message.setJMSDeliveryMode(deliveryMode); 1349 message.setJMSPriority(priority); 1350 long expiration = 0L; 1351 if (!producer.getDisableMessageTimestamp()) { 1352 long timeStamp = System.currentTimeMillis(); 1353 message.setJMSTimestamp(timeStamp); 1354 if (timeToLive > 0) { 1355 expiration = timeToLive + timeStamp; 1356 } 1357 } 1358 message.setJMSExpiration(expiration); 1359 String id = message.getJMSMessageID(); 1360 String producerKey = producer.getProducerMessageKey(); 1361 long sequenceNumber = producer.getIdGenerator().getNextSequence(); 1362 1363 if ((id == null || id.length() == 0) || !producer.getDisableMessageID() && !reuseMessageId) { 1364 message.setJMSMessageID(producerKey + sequenceNumber); 1365 } 1366 //transform to our own message format here 1367 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message); 1368 if (connection.isCopyMessageOnSend()){ 1369 msg = msg.shallowCopy(); 1370 } 1371 //clear identity - incase forwared on 1372 msg.setJMSMessageIdentity(null); 1373 
msg.setExternalMessageId(id != null); 1374 msg.setSequenceNumber(sequenceNumber); 1375 msg.setProducerKey(producerKey); 1376 msg.setTransactionId(transactionContext.getTransactionId()); 1377 msg.setJMSClientID(this.connection.clientID); 1378 msg.setMesssageHandle(producer.getProducerId()); 1379 //reset state as could be forwarded on 1380 msg.setJMSRedelivered(false); 1381 if (!connection.isInternalConnection()){ 1382 msg.clearBrokersVisited(); 1383 connection.validateDestination(msg.getJMSActiveMQDestination()); 1384 } 1385 1386 if (this.connection.isPrepareMessageBodyOnSend()){ 1387 msg.prepareMessageBody(); 1388 } 1389 //do message payload compression 1390 if (connection.isDoMessageCompression()){ 1391 try { 1392 msg.getBodyAsBytes(compression); 1393 } 1394 catch (IOException e) { 1395 JMSException jmsEx = new JMSException("Failed to compress message payload"); 1396 jmsEx.setLinkedException(e); 1397 throw jmsEx; 1398 } 1399 } 1400 boolean fragmentedMessage = connection.isDoMessageFragmentation(); 1401 if (fragmentedMessage && !msg.isMessagePart()){ 1402 try { 1403 ByteArrayFragmentation fragmentation = connection.getFragmentation(); 1404 fragmentedMessage = fragmentation.doFragmentation(msg.getBodyAsBytes()); 1405 if (fragmentedMessage){ 1406 ByteArray[] array = fragmentation.fragment(msg.getBodyAsBytes()); 1407 String parentMessageId = msg.getJMSMessageID(); 1408 for (int i = 0; i < array.length; i++){ 1409 ActiveMQMessage fragment = msg.shallowCopy(); 1410 fragment.setJMSMessageID(null); 1411 fragment.setMessagePart(true); 1412 fragment.setParentMessageID(parentMessageId); 1413 fragment.setNumberOfParts((short)array.length); 1414 fragment.setPartNumber((short)i); 1415 if (i != 0){ 1416 fragment.setSequenceNumber(producer.getIdGenerator().getNextSequence()); 1417 } 1418 fragment.setBodyAsBytes(array[i]); 1419 if (this.connection.isUseAsyncSend()) { 1420 this.connection.asyncSendPacket(fragment); 1421 } 1422 else { 1423 this.connection.syncSendPacket(fragment); 
1424 } 1425 1426 } 1427 } 1428 }catch (IOException e) { 1429 JMSException jmsEx = new JMSException("Failed to fragment message payload"); 1430 jmsEx.setLinkedException(e); 1431 throw jmsEx; 1432 } 1433 } 1434 if (log.isDebugEnabled()) { 1435 log.debug("Sending message: " + msg); 1436 } 1437 1438 if (!fragmentedMessage){ 1439 if (this.connection.isUseAsyncSend()) { 1440 this.connection.asyncSendPacket(msg); 1441 } 1442 else { 1443 this.connection.syncSendPacket(msg); 1444 } 1445 } 1446 } 1447 1448 /** 1449 * Send TransactionInfo to indicate transaction has started 1450 * 1451 * @throws JMSException if some internal error occurs 1452 */ 1453 protected void doStartTransaction() throws JMSException { 1454 if (getTransacted() && !transactionContext.isInXATransaction()) { 1455 transactionContext.begin(); 1456 } 1457 } 1458 1459 protected void setSessionConsumerDispatchState(int value) throws JMSException { 1460 if (consumerDispatchState != ActiveMQSession.CONSUMER_DISPATCH_UNSET && value != consumerDispatchState) { 1461 String errorStr = "Cannot mix consumer dispatching on a session - already: "; 1462 if (value == ActiveMQSession.CONSUMER_DISPATCH_SYNC) { 1463 errorStr += "synchronous"; 1464 } 1465 else { 1466 errorStr += "asynchronous"; 1467 } 1468 throw new IllegalStateException(errorStr); 1469 } 1470 consumerDispatchState = value; 1471 } 1472 1473 protected void redeliverUnacknowledgedMessages() { 1474 redeliverUnacknowledgedMessages(false); 1475 } 1476 1477 protected void redeliverUnacknowledgedMessages(boolean onlyDeliverTransientConsumed) { 1478 messageExecutor.stop(); 1479 LinkedList replay = new LinkedList(); 1480 Object obj = null; 1481 while ((obj = deliveredMessages.removeFirst()) != null) { 1482 replay.add(obj); 1483 } 1484 1485 deliveredMessages.clear(); 1486 if (!replay.isEmpty()) { 1487 for (ListIterator i = replay.listIterator(replay.size());i.hasPrevious();) { 1488 ActiveMQMessage msg = (ActiveMQMessage) i.previous(); 1489 if 
(!onlyDeliverTransientConsumed || msg.isTransientConsumed()) { 1490 msg.setJMSRedelivered(true); 1491 msg.incrementDeliveryCount(); 1492 messageExecutor.executeFirst(msg); 1493 } 1494 } 1495 } 1496 replay.clear(); 1497 messageExecutor.start(); 1498 } 1499 1500 protected void clearMessagesInProgress() { 1501 messageExecutor.clearMessagesInProgress(); 1502 for (Iterator i = consumers.iterator();i.hasNext();) { 1503 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next(); 1504 consumer.clearMessagesInProgress(); 1505 } 1506 } 1507 1508 public boolean hasUncomsumedMessages() { 1509 return messageExecutor.hasUncomsumedMessages(); 1510 } 1511 1512 public List getUnconsumedMessages() { 1513 return messageExecutor.getUnconsumedMessages(); 1514 } 1515 1516 public boolean isTransacted() { 1517 return this.acknowledgeMode == Session.SESSION_TRANSACTED; 1518 } 1519 1520 protected boolean isClientAcknowledge() { 1521 return this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE; 1522 } 1523 1524 /** 1525 * @return Returns the internalSession. 1526 */ 1527 public boolean isInternalSession() { 1528 return internalSession; 1529 } 1530 /** 1531 * @param internalSession The internalSession to set. 1532 */ 1533 public void setInternalSession(boolean internalSession) { 1534 this.internalSession = internalSession; 1535 } 1536 1537 public DeliveryListener getDeliveryListener() { 1538 return deliveryListener; 1539 } 1540 1541 1542 public void setDeliveryListener(DeliveryListener deliveryListener) { 1543 this.deliveryListener = deliveryListener; 1544 } 1545 1546 }