001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq; 020 021 import java.io.IOException; 022 import java.util.Iterator; 023 import java.util.Map; 024 025 import javax.jms.Connection; 026 import javax.jms.ConnectionConsumer; 027 import javax.jms.ConnectionMetaData; 028 import javax.jms.DeliveryMode; 029 import javax.jms.Destination; 030 import javax.jms.ExceptionListener; 031 import javax.jms.IllegalStateException; 032 import javax.jms.JMSException; 033 import javax.jms.Queue; 034 import javax.jms.QueueConnection; 035 import javax.jms.QueueSession; 036 import javax.jms.ServerSessionPool; 037 import javax.jms.Session; 038 import javax.jms.Topic; 039 import javax.jms.TopicConnection; 040 import javax.jms.TopicSession; 041 import javax.jms.XAConnection; 042 043 import org.activemq.advisories.TempDestinationAdvisor; 044 import org.activemq.advisories.TempDestinationAdvisoryEvent; 045 import org.activemq.capacity.CapacityMonitorEvent; 046 import org.activemq.capacity.CapacityMonitorEventListener; 047 import org.activemq.filter.AndFilter; 048 import org.activemq.filter.Filter; 049 import org.activemq.filter.FilterFactory; 050 import org.activemq.filter.FilterFactoryImpl; 051 import org.activemq.filter.NoLocalFilter; 052 import org.activemq.io.util.ByteArray; 053 import org.activemq.io.util.ByteArrayCompression; 054 import 
org.activemq.io.util.ByteArrayFragmentation; 055 import org.activemq.io.util.MemoryBoundedObjectManager; 056 import org.activemq.io.util.MemoryBoundedQueue; 057 import org.activemq.io.util.MemoryBoundedQueueManager; 058 import org.activemq.management.JMSConnectionStatsImpl; 059 import org.activemq.management.JMSStatsImpl; 060 import org.activemq.management.StatsCapable; 061 import org.activemq.management.StatsImpl; 062 import org.activemq.message.ActiveMQDestination; 063 import org.activemq.message.ActiveMQMessage; 064 import org.activemq.message.ActiveMQObjectMessage; 065 import org.activemq.message.BrokerAdminCommand; 066 import org.activemq.message.CapacityInfo; 067 import org.activemq.message.CleanupConnectionInfo; 068 import org.activemq.message.ConnectionInfo; 069 import org.activemq.message.ConsumerInfo; 070 import org.activemq.message.Packet; 071 import org.activemq.message.PacketListener; 072 import org.activemq.message.ProducerInfo; 073 import org.activemq.message.Receipt; 074 import org.activemq.message.ResponseReceipt; 075 import org.activemq.message.SessionInfo; 076 import org.activemq.message.TransactionInfo; 077 import org.activemq.message.WireFormatInfo; 078 import org.activemq.message.XATransactionInfo; 079 import org.activemq.transport.TransportChannel; 080 import org.activemq.transport.TransportStatusEvent; 081 import org.activemq.transport.TransportStatusEventListener; 082 import org.activemq.util.IdGenerator; 083 import org.activemq.util.JMSExceptionHelper; 084 import org.apache.commons.logging.Log; 085 import org.apache.commons.logging.LogFactory; 086 087 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 088 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList; 089 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 090 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; 091 092 /** 093 * A <CODE>Connection</CODE> object is a client's active connection to its JMS 094 * provider. 
It typically allocates provider resources outside the Java virtual 095 * machine (JVM). 096 * <P> 097 * Connections support concurrent use. 098 * <P> 099 * A connection serves several purposes: 100 * <UL> 101 * <LI>It encapsulates an open connection with a JMS provider. It typically 102 * represents an open TCP/IP socket between a client and the service provider 103 * software. 104 * <LI>Its creation is where client authentication takes place. 105 * <LI>It can specify a unique client identifier. 106 * <LI>It provides a <CODE>ConnectionMetaData</CODE> object. 107 * <LI>It supports an optional <CODE>ExceptionListener</CODE> object. 108 * </UL> 109 * <P> 110 * Because the creation of a connection involves setting up authentication and 111 * communication, a connection is a relatively heavyweight object. Most clients 112 * will do all their messaging with a single connection. Other more advanced 113 * applications may use several connections. The JMS API does not architect a 114 * reason for using multiple connections; however, there may be operational 115 * reasons for doing so. 116 * <P> 117 * A JMS client typically creates a connection, one or more sessions, and a 118 * number of message producers and consumers. When a connection is created, it 119 * is in stopped mode. That means that no messages are being delivered. 120 * <P> 121 * It is typical to leave the connection in stopped mode until setup is complete 122 * (that is, until all message consumers have been created). At that point, the 123 * client calls the connection's <CODE>start</CODE> method, and messages begin 124 * arriving at the connection's consumers. This setup convention minimizes any 125 * client confusion that may result from asynchronous message delivery while the 126 * client is still in the process of setting itself up. 127 * <P> 128 * A connection can be started immediately, and the setup can be done 129 * afterwards. 
Clients that do this must be prepared to handle asynchronous 130 * message delivery while they are still in the process of setting up. 131 * <P> 132 * A message producer can send messages while a connection is stopped. <p/>This 133 * class is also a <CODE>TopicConnection</CODE>. A <CODE>TopicConnection</CODE> 134 * object is an active connection to a publish/subscribe JMS provider. A client 135 * uses a <CODE>TopicConnection</CODE> object to create one or more <CODE>TopicSession</CODE> 136 * objects for producing and consuming messages. 137 * <P> 138 * A <CODE>TopicConnection</CODE> can be used to create a <CODE>TopicSession</CODE>, 139 * from which specialized topic-related objects can be created. A more general, 140 * and recommended, approach is to use the <CODE>Connection </CODE> object. 141 * <P> 142 * <p/><p/>This class is also a <CODE>QueueConnection</CODE>. A <CODE>QueueConnection</CODE> 143 * object is an active connection to a point-to-point JMS provider. A client 144 * uses a <CODE>QueueConnection</CODE> object to create one or more <CODE>QueueSession</CODE> 145 * objects for producing and consuming messages. 146 * <P> 147 * A <CODE>QueueConnection</CODE> can be used to create a <CODE>QueueSession</CODE>, 148 * from which specialized queue-related objects can be created. A more general, 149 * and recommended, approach is to use the <CODE>Connection </CODE> object. 150 * <P> 151 * A <CODE>QueueConnection</CODE> cannot be used to create objects specific to 152 * the publish/subscribe domain. The <CODE>createDurableConnectionConsumer</CODE> 153 * method inherits from <CODE>Connection</CODE>, but must throw an <CODE>IllegalStateException</CODE> 154 * if used from <CODE>QueueConnection</CODE>. 
// * 155 * 156 * @version $Revision: 1.1.1.1 $ 157 * @see javax.jms.Connection 158 * @see javax.jms.ConnectionFactory 159 * @see javax.jms.QueueConnection 160 * @see javax.jms.TopicConnection 161 * @see javax.jms.TopicConnectionFactory 162 * @see javax.jms.QueueConnection 163 * @see javax.jms.QueueConnectionFactory 164 */ 165 public class ActiveMQConnection implements Connection, PacketListener, 166 ExceptionListener, TopicConnection, QueueConnection, StatsCapable, 167 CapacityMonitorEventListener, TransportStatusEventListener, Closeable { 168 169 /** 170 * Default UserName for the Connection 171 */ 172 public static final String DEFAULT_USER = "defaultUser"; 173 174 /** 175 * Default URL for the ActiveMQ Broker 176 */ 177 public static final String DEFAULT_BROKER_URL = "tcp://localhost:61616"; 178 179 /** 180 * Default client URL. If using a message broker in a hub(s)/spoke 181 * architecture - use the DEFAULT_BROKER_URL 182 * 183 * @see ActiveMQConnection#DEFAULT_BROKER_URL 184 */ 185 public static final String DEFAULT_URL = "peer://development"; 186 187 /** 188 * Default Password for the Connection 189 */ 190 public static final String DEFAULT_PASSWORD = "defaultPassword"; 191 192 private static final Log log = LogFactory.getLog(ActiveMQConnection.class); 193 194 private static final int DEFAULT_CONNECTION_MEMORY_LIMIT = 10 * 1024 * 1024; 195 196 // properties 197 private ActiveMQConnectionFactory factory; 198 199 private String userName; 200 201 private String password; 202 203 protected String clientID; 204 205 private int sendCloseTimeout = 2000; 206 207 private TransportChannel transportChannel; 208 209 private ExceptionListener exceptionListener; 210 211 private ActiveMQPrefetchPolicy prefetchPolicy; 212 213 private JMSStatsImpl factoryStats; 214 215 private MemoryBoundedObjectManager memoryManager; 216 217 private MemoryBoundedQueueManager boundedQueueManager; 218 219 protected IdGenerator handleIdGenerator; 220 221 private IdGenerator clientIdGenerator; 
222 223 protected IdGenerator packetIdGenerator; 224 225 private IdGenerator sessionIdGenerator; 226 227 private JMSConnectionStatsImpl stats; 228 229 // internal state 230 private CopyOnWriteArrayList sessions; 231 232 private CopyOnWriteArrayList messageDispatchers; 233 234 private CopyOnWriteArrayList connectionConsumers; 235 236 private SynchronizedInt consumerNumberGenerator; 237 238 private ActiveMQConnectionMetaData connectionMetaData; 239 240 private boolean closed; 241 242 private SynchronizedBoolean started; 243 244 private boolean clientIDSet; 245 246 private boolean isConnectionInfoSentToBroker; 247 248 private boolean isTransportOK; 249 250 private boolean startedTransport; 251 252 private long startTime; 253 254 private long flowControlSleepTime = 0; 255 private Object flowControlMutex = new Object(); 256 257 private boolean quickClose; 258 259 private boolean internalConnection;// used for notifying that the 260 // connection is used for networks etc. 261 262 private boolean userSpecifiedClientID; 263 264 /** 265 * Should we use an async send for persistent non transacted messages ? 
266 */ 267 protected boolean useAsyncSend = true; 268 269 private int sendConnectionInfoTimeout = 30000; 270 271 private boolean disableTimeStampsByDefault = false; 272 273 private boolean J2EEcompliant = true; 274 275 private boolean prepareMessageBodyOnSend = true; 276 277 private boolean copyMessageOnSend = true; 278 279 // compression and fragmentation variables 280 281 private boolean doMessageCompression = true; 282 283 private int messageCompressionLimit = ByteArrayCompression.DEFAULT_COMPRESSION_LIMIT;// data 284 // size 285 // above 286 // which 287 // compression 288 // will 289 // be 290 // used 291 292 private int messageCompressionLevel = ByteArrayCompression.DEFAULT_COMPRESSION_LEVEL; 293 294 private int messageCompressionStrategy = ByteArrayCompression.DEFAULT_COMPRESSION_STRATEGY;// default 295 // compression 296 // strategy 297 298 private boolean doMessageFragmentation = false; 299 300 private int messageFragmentationLimit = ByteArrayFragmentation.DEFAULT_FRAGMENTATION_LIMIT; 301 302 private boolean cachingEnabled = true; 303 304 private boolean optimizedMessageDispatch = false; 305 306 private CopyOnWriteArrayList transientConsumedRedeliverCache; 307 308 private FilterFactory filterFactory; 309 310 private Map tempDestinationMap; 311 312 private Map validDestinationsMap; 313 314 private String resourceManagerId; 315 //used for assembling message fragments 316 private final ConcurrentHashMap assemblies= new ConcurrentHashMap(); 317 private final ByteArrayFragmentation fragmentation = new ByteArrayFragmentation(); 318 319 /** 320 * A static helper method to create a new connection 321 * 322 * @return an ActiveMQConnection 323 * @throws JMSException 324 */ 325 public static ActiveMQConnection makeConnection() throws JMSException { 326 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); 327 return (ActiveMQConnection) factory.createConnection(); 328 } 329 330 /** 331 * A static helper method to create a new connection 332 * 333 * 
@param uri 334 * @return and ActiveMQConnection 335 * @throws JMSException 336 */ 337 public static ActiveMQConnection makeConnection(String uri) 338 throws JMSException { 339 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri); 340 return (ActiveMQConnection) factory.createConnection(); 341 } 342 343 /** 344 * A static helper method to create a new connection 345 * 346 * @param user 347 * @param password 348 * @param uri 349 * @return an ActiveMQConnection 350 * @throws JMSException 351 */ 352 public static ActiveMQConnection makeConnection(String user, 353 String password, String uri) throws JMSException { 354 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, 355 password, uri); 356 return (ActiveMQConnection) factory.createConnection(); 357 } 358 359 /** 360 * Constructs a connection from an existing TransportChannel and 361 * user/password. 362 * 363 * @param factory 364 * @param theUserName 365 * the users name 366 * @param thePassword 367 * the password 368 * @param transportChannel 369 * the transport channel to communicate with the server 370 * @throws JMSException 371 */ 372 public ActiveMQConnection(ActiveMQConnectionFactory factory, 373 String theUserName, String thePassword, 374 TransportChannel transportChannel) throws JMSException { 375 this(factory, theUserName, thePassword); 376 this.transportChannel = transportChannel; 377 this.transportChannel.setPacketListener(this); 378 this.transportChannel.setExceptionListener(this); 379 this.transportChannel.addTransportStatusEventListener(this); 380 this.isTransportOK = true; 381 } 382 383 protected ActiveMQConnection(ActiveMQConnectionFactory factory, 384 String theUserName, String thePassword) { 385 this.factory = factory; 386 this.userName = theUserName; 387 this.password = thePassword; 388 this.clientIdGenerator = new IdGenerator(); 389 this.packetIdGenerator = new IdGenerator(); 390 this.handleIdGenerator = new IdGenerator(); 391 this.sessionIdGenerator = new 
IdGenerator(); 392 this.consumerNumberGenerator = new SynchronizedInt(0); 393 this.sessions = new CopyOnWriteArrayList(); 394 this.messageDispatchers = new CopyOnWriteArrayList(); 395 this.connectionConsumers = new CopyOnWriteArrayList(); 396 this.connectionMetaData = new ActiveMQConnectionMetaData(); 397 this.started = new SynchronizedBoolean(false); 398 this.startTime = System.currentTimeMillis(); 399 this.prefetchPolicy = new ActiveMQPrefetchPolicy(); 400 this.memoryManager = new MemoryBoundedObjectManager(clientID, 401 DEFAULT_CONNECTION_MEMORY_LIMIT); 402 this.boundedQueueManager = new MemoryBoundedQueueManager(memoryManager); 403 this.memoryManager.addCapacityEventListener(this); 404 boolean transactional = this instanceof XAConnection; 405 factoryStats = factory.getFactoryStats(); 406 factoryStats.addConnection(this); 407 stats = new JMSConnectionStatsImpl(sessions, transactional); 408 this.transientConsumedRedeliverCache = new CopyOnWriteArrayList(); 409 this.tempDestinationMap = new ConcurrentHashMap(); 410 this.validDestinationsMap = new ConcurrentHashMap(); 411 factory.onConnectionCreate(this); 412 } 413 414 /** 415 * @return statistics for this Connection 416 */ 417 public StatsImpl getStats() { 418 return stats; 419 } 420 421 /** 422 * @return a number unique for this connection 423 */ 424 public JMSConnectionStatsImpl getConnectionStats() { 425 return stats; 426 } 427 428 /** 429 * Creates a <CODE>Session</CODE> object. 430 * 431 * @param transacted 432 * indicates whether the session is transacted 433 * @param acknowledgeMode 434 * indicates whether the consumer or the client will acknowledge 435 * any messages it receives; ignored if the session is 436 * transacted. Legal values are 437 * <code>Session.AUTO_ACKNOWLEDGE</code>, 438 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 439 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 
440 * @return a newly created session 441 * @throws JMSException 442 * if the <CODE>Connection</CODE> object fails to create a 443 * session due to some internal error or lack of support for the 444 * specific transaction and acknowledgement mode. 445 * @see Session#AUTO_ACKNOWLEDGE 446 * @see Session#CLIENT_ACKNOWLEDGE 447 * @see Session#DUPS_OK_ACKNOWLEDGE 448 * @since 1.1 449 */ 450 public Session createSession(boolean transacted, int acknowledgeMode) 451 throws JMSException { 452 checkClosed(); 453 sendConnectionInfoToBroker(); 454 return new ActiveMQSession( 455 this, 456 (transacted ? Session.SESSION_TRANSACTED 457 : (acknowledgeMode == Session.SESSION_TRANSACTED ? Session.AUTO_ACKNOWLEDGE 458 : acknowledgeMode))); 459 } 460 461 /** 462 * Creates a <CODE>Session</CODE> object. 463 * 464 * @param transacted 465 * indicates whether the session is transacted 466 * @param acknowledgeMode 467 * indicates whether the consumer or the client will acknowledge 468 * any messages it receives; ignored if the session is 469 * transacted. Legal values are 470 * <code>Session.AUTO_ACKNOWLEDGE</code>, 471 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 472 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 473 * @param optimizedDispatch 474 * @return a newly created session 475 * @throws JMSException 476 * if the <CODE>Connection</CODE> object fails to create a 477 * session due to some internal error or lack of support for the 478 * specific transaction and acknowledgement mode. 479 * @see Session#AUTO_ACKNOWLEDGE 480 * @see Session#CLIENT_ACKNOWLEDGE 481 * @see Session#DUPS_OK_ACKNOWLEDGE 482 * @since 1.1 483 */ 484 public Session createSession(boolean transacted, int acknowledgeMode, 485 boolean optimizedDispatch) throws JMSException { 486 checkClosed(); 487 sendConnectionInfoToBroker(); 488 return new ActiveMQSession(this, 489 (transacted ? 
Session.SESSION_TRANSACTED : acknowledgeMode), 490 optimizedDispatch); 491 } 492 493 /** 494 * Gets the client identifier for this connection. 495 * <P> 496 * This value is specific to the JMS provider. It is either preconfigured by 497 * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned 498 * dynamically by the application by calling the <code>setClientID</code> 499 * method. 500 * 501 * @return the unique client identifier 502 * @throws JMSException 503 * if the JMS provider fails to return the client ID for this 504 * connection due to some internal error. 505 */ 506 public String getClientID() throws JMSException { 507 checkClosed(); 508 return this.clientID; 509 } 510 511 /** 512 * Sets the client identifier for this connection. 513 * <P> 514 * The preferred way to assign a JMS client's client identifier is for it to 515 * be configured in a client-specific <CODE>ConnectionFactory</CODE> 516 * object and transparently assigned to the <CODE>Connection</CODE> object 517 * it creates. 518 * <P> 519 * Alternatively, a client can set a connection's client identifier using a 520 * provider-specific value. The facility to set a connection's client 521 * identifier explicitly is not a mechanism for overriding the identifier 522 * that has been administratively configured. It is provided for the case 523 * where no administratively specified identifier exists. If one does exist, 524 * an attempt to change it by setting it must throw an <CODE>IllegalStateException</CODE>. 525 * If a client sets the client identifier explicitly, it must do so 526 * immediately after it creates the connection and before any other action 527 * on the connection is taken. After this point, setting the client 528 * identifier is a programming error that should throw an <CODE>IllegalStateException</CODE>. 529 * <P> 530 * The purpose of the client identifier is to associate a connection and its 531 * objects with a state maintained on behalf of the client by a provider. 
532 * The only such state identified by the JMS API is that required to support 533 * durable subscriptions. 534 * <P> 535 * If another connection with the same <code>clientID</code> is already 536 * running when this method is called, the JMS provider should detect the 537 * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>. 538 * 539 * @param newClientID 540 * the unique client identifier 541 * @throws JMSException 542 * if the JMS provider fails to set the client ID for this 543 * connection due to some internal error. 544 * @throws javax.jms.InvalidClientIDException 545 * if the JMS client specifies an invalid or duplicate client 546 * ID. 547 * @throws javax.jms.IllegalStateException 548 * if the JMS client attempts to set a connection's client ID at 549 * the wrong time or when it has been administratively 550 * configured. 551 */ 552 public void setClientID(String newClientID) throws JMSException { 553 if (this.clientIDSet) { 554 throw new IllegalStateException("The clientID has already been set"); 555 } 556 if (this.isConnectionInfoSentToBroker) { 557 throw new IllegalStateException( 558 "Setting clientID on a used Connection is not allowed"); 559 } 560 checkClosed(); 561 this.clientID = newClientID; 562 this.userSpecifiedClientID = true; 563 ensureClientIDInitialised(); 564 } 565 566 /** 567 * Gets the metadata for this connection. 568 * 569 * @return the connection metadata 570 * @throws JMSException 571 * if the JMS provider fails to get the connection metadata for 572 * this connection. 573 * @see javax.jms.ConnectionMetaData 574 */ 575 public ConnectionMetaData getMetaData() throws JMSException { 576 checkClosed(); 577 return this.connectionMetaData; 578 } 579 580 /** 581 * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not 582 * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE> 583 * associated with it. 584 * 585 * @return the <CODE>ExceptionListener</CODE> for this connection, or 586 * null. 
if no <CODE>ExceptionListener</CODE> is associated with 587 * this connection. 588 * @throws JMSException 589 * if the JMS provider fails to get the <CODE>ExceptionListener</CODE> 590 * for this connection. 591 * @see javax.jms.Connection#setExceptionListener(ExceptionListener) 592 */ 593 public ExceptionListener getExceptionListener() throws JMSException { 594 checkClosed(); 595 return this.exceptionListener; 596 } 597 598 /** 599 * Sets an exception listener for this connection. 600 * <P> 601 * If a JMS provider detects a serious problem with a connection, it informs 602 * the connection's <CODE> ExceptionListener</CODE>, if one has been 603 * registered. It does this by calling the listener's <CODE>onException 604 * </CODE> method, passing it a <CODE>JMSException</CODE> object 605 * describing the problem. 606 * <P> 607 * An exception listener allows a client to be notified of a problem 608 * asynchronously. Some connections only consume messages, so they would 609 * have no other way to learn their connection has failed. 610 * <P> 611 * A connection serializes execution of its <CODE>ExceptionListener</CODE>. 612 * <P> 613 * A JMS provider should attempt to resolve connection problems itself 614 * before it notifies the client of them. 615 * 616 * @param listener 617 * the exception listener 618 * @throws JMSException 619 * if the JMS provider fails to set the exception listener for 620 * this connection. 621 */ 622 public void setExceptionListener(ExceptionListener listener) 623 throws JMSException { 624 checkClosed(); 625 this.exceptionListener = listener; 626 this.transportChannel.setExceptionListener(listener); 627 } 628 629 /** 630 * Starts (or restarts) a connection's delivery of incoming messages. A call 631 * to <CODE>start</CODE> on a connection that has already been started is 632 * ignored. 633 * 634 * @throws JMSException 635 * if the JMS provider fails to start message delivery due to 636 * some internal error. 
637 * @see javax.jms.Connection#stop() 638 */ 639 public void start() throws JMSException { 640 checkClosed(); 641 if (started.commit(false, true)) { 642 // We have a change in connection info to send. 643 // send the Connection info again 644 sendConnectionInfoToBroker(sendConnectionInfoTimeout, true, false); 645 for (Iterator i = sessions.iterator(); i.hasNext();) { 646 ActiveMQSession s = (ActiveMQSession) i.next(); 647 s.start(); 648 } 649 } 650 } 651 652 /** 653 * @return true if this Connection is started 654 */ 655 protected boolean isStarted() { 656 return started.get(); 657 } 658 659 /** 660 * Temporarily stops a connection's delivery of incoming messages. Delivery 661 * can be restarted using the connection's <CODE>start</CODE> method. When 662 * the connection is stopped, delivery to all the connection's message 663 * consumers is inhibited: synchronous receives block, and messages are not 664 * delivered to message listeners. 665 * <P> 666 * This call blocks until receives and/or message listeners in progress have 667 * completed. 668 * <P> 669 * Stopping a connection has no effect on its ability to send messages. A 670 * call to <CODE>stop</CODE> on a connection that has already been stopped 671 * is ignored. 672 * <P> 673 * A call to <CODE>stop</CODE> must not return until delivery of messages 674 * has paused. This means that a client can rely on the fact that none of 675 * its message listeners will be called and that all threads of control 676 * waiting for <CODE>receive</CODE> calls to return will not return with a 677 * message until the connection is restarted. The receive timers for a 678 * stopped connection continue to advance, so receives may time out while 679 * the connection is stopped. 680 * <P> 681 * If message listeners are running when <CODE>stop</CODE> is invoked, the 682 * <CODE>stop</CODE> call must wait until all of them have returned before 683 * it may return. 
While these message listeners are completing, they must 684 * have the full services of the connection available to them. 685 * 686 * @throws JMSException 687 * if the JMS provider fails to stop message delivery due to 688 * some internal error. 689 * @see javax.jms.Connection#start() 690 */ 691 public void stop() throws JMSException { 692 checkClosed(); 693 if (started.commit(true, false)) { 694 for (Iterator i = sessions.iterator(); i.hasNext();) { 695 ActiveMQSession s = (ActiveMQSession) i.next(); 696 s.stop(); 697 } 698 sendConnectionInfoToBroker(2000, true, false); 699 } 700 } 701 702 /** 703 * Closes the connection. 704 * <P> 705 * Since a provider typically allocates significant resources outside the 706 * JVM on behalf of a connection, clients should close these resources when 707 * they are not needed. Relying on garbage collection to eventually reclaim 708 * these resources may not be timely enough. 709 * <P> 710 * There is no need to close the sessions, producers, and consumers of a 711 * closed connection. 712 * <P> 713 * Closing a connection causes all temporary destinations to be deleted. 714 * <P> 715 * When this method is invoked, it should not return until message 716 * processing has been shut down in an orderly fashion. This means that all 717 * message listeners that may have been running have returned, and that all 718 * pending receives have returned. A close terminates all pending message 719 * receives on the connection's sessions' consumers. The receives may return 720 * with a message or with null, depending on whether there was a message 721 * available at the time of the close. If one or more of the connection's 722 * sessions' message listeners is processing a message at the time when 723 * connection <CODE>close</CODE> is invoked, all the facilities of the 724 * connection and its sessions must remain available to those listeners 725 * until they return control to the JMS provider. 
726 * <P> 727 * Closing a connection causes any of its sessions' transactions in progress 728 * to be rolled back. In the case where a session's work is coordinated by 729 * an external transaction manager, a session's <CODE>commit</CODE> and 730 * <CODE> rollback</CODE> methods are not used and the result of a closed 731 * session's work is determined later by the transaction manager. Closing a 732 * connection does NOT force an acknowledgment of client-acknowledged 733 * sessions. 734 * <P> 735 * Invoking the <CODE>acknowledge</CODE> method of a received message from 736 * a closed connection's session must throw an <CODE>IllegalStateException</CODE>. 737 * Closing a closed connection must NOT throw an exception. 738 * 739 * @throws JMSException 740 * if the JMS provider fails to close the connection due to some 741 * internal error. For example, a failure to release resources 742 * or to close a socket connection can cause this exception to 743 * be thrown. 744 */ 745 public void close() throws JMSException { 746 this.transportChannel.setPendingStop(true); 747 synchronized (this) { 748 if (!closed) { 749 memoryManager.removeCapacityEventListener(this); 750 try { 751 closeTemporaryDestinations(); 752 for (Iterator i = this.sessions.iterator(); i.hasNext();) { 753 ActiveMQSession s = (ActiveMQSession) i.next(); 754 s.close(); 755 } 756 for (Iterator i = this.connectionConsumers.iterator(); i 757 .hasNext();) { 758 ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i 759 .next(); 760 c.close(); 761 } 762 try { 763 sendConnectionInfoToBroker(sendCloseTimeout, true, true); 764 } catch (TimeoutExpiredException e) { 765 log 766 .warn("Failed to send close to broker, timeout expired of: " 767 + sendCloseTimeout + " millis"); 768 } 769 this.connectionConsumers.clear(); 770 this.messageDispatchers.clear(); 771 this.transportChannel.stop(); 772 } finally { 773 this.sessions.clear(); 774 started.set(false); 775 factory.onConnectionClose(this); 776 } 777 closed = 
true; 778 transientConsumedRedeliverCache.clear(); 779 validDestinationsMap.clear(); 780 factoryStats.removeConnection(this); 781 } 782 } 783 784 } 785 786 /** 787 * Tells the broker to terminate its VM. This can be used to cleanly terminate a broker running in 788 * a standalone java process. Server must have property enable.vm.shutdown=true defined 789 * to allow this to work. 790 */ 791 public void terminateBrokerVM() throws JMSException { 792 BrokerAdminCommand command = new BrokerAdminCommand(); 793 command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM); 794 asyncSendPacket(command); 795 } 796 797 /** 798 * simply throws an exception if the Connection is already closed 799 * 800 * @throws JMSException 801 */ 802 protected synchronized void checkClosed() throws JMSException { 803 if (!startedTransport) { 804 startedTransport = true; 805 this.transportChannel.setCachingEnabled(isCachingEnabled()); 806 if (useAsyncSend == false) { 807 this.transportChannel.setNoDelay(true); 808 } 809 810 this.transportChannel.setUsedInternally(internalConnection); 811 this.transportChannel.start(); 812 if (transportChannel.doesSupportWireFormatVersioning()) { 813 WireFormatInfo info = new WireFormatInfo(); 814 info.setVersion(transportChannel.getCurrentWireFormatVersion()); 815 this.asyncSendPacket(info); 816 } 817 } 818 if (this.closed) { 819 throw new ConnectionClosedException(); 820 } 821 } 822 823 /** 824 * Creates a connection consumer for this connection (optional operation). 825 * This is an expert facility not used by regular JMS clients. 826 * 827 * @param destination 828 * the destination to access 829 * @param messageSelector 830 * only messages with properties matching the message selector 831 * expression are delivered. A value of null or an empty string 832 * indicates that there is no message selector for the message 833 * consumer. 
834 * @param sessionPool 835 * the server session pool to associate with this connection 836 * consumer 837 * @param maxMessages 838 * the maximum number of messages that can be assigned to a 839 * server session at one time 840 * @return the connection consumer 841 * @throws JMSException 842 * if the <CODE>Connection</CODE> object fails to create a 843 * connection consumer due to some internal error or invalid 844 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>. 845 * @throws javax.jms.InvalidDestinationException 846 * if an invalid destination is specified. 847 * @throws javax.jms.InvalidSelectorException 848 * if the message selector is invalid. 849 * @see javax.jms.ConnectionConsumer 850 * @since 1.1 851 */ 852 public ConnectionConsumer createConnectionConsumer(Destination destination, 853 String messageSelector, ServerSessionPool sessionPool, 854 int maxMessages) throws JMSException { 855 checkClosed(); 856 ensureClientIDInitialised(); 857 ConsumerInfo info = new ConsumerInfo(); 858 info.setConsumerId(handleIdGenerator.generateId()); 859 info.setDestination(ActiveMQMessageTransformation 860 .transformDestination(destination)); 861 info.setSelector(messageSelector); 862 info.setConsumerNo(handleIdGenerator.getNextShortSequence()); 863 info.setClientId(clientID); 864 return new ActiveMQConnectionConsumer(this, sessionPool, info, 865 maxMessages); 866 } 867 868 /** 869 * Creates a connection consumer for this connection (optional operation). 870 * This is an expert facility not used by regular JMS clients. 871 * 872 * @param destination 873 * the destination to access 874 * @param messageSelector 875 * only messages with properties matching the message selector 876 * expression are delivered. A value of null or an empty string 877 * indicates that there is no message selector for the message 878 * consumer. 
879 * @param sessionPool 880 * the server session pool to associate with this connection 881 * consumer 882 * @param maxMessages 883 * the maximum number of messages that can be assigned to a 884 * server session at one time 885 * @param noLocal 886 * set true if you want to filter out messages published locally 887 * 888 * @return the connection consumer 889 * @throws JMSException 890 * if the <CODE>Connection</CODE> object fails to create a 891 * connection consumer due to some internal error or invalid 892 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>. 893 * @throws javax.jms.InvalidDestinationException 894 * if an invalid destination is specified. 895 * @throws javax.jms.InvalidSelectorException 896 * if the message selector is invalid. 897 * @see javax.jms.ConnectionConsumer 898 * @since 1.1 899 */ 900 public ConnectionConsumer createConnectionConsumer(Destination destination, 901 String messageSelector, ServerSessionPool sessionPool, 902 int maxMessages, boolean noLocal) throws JMSException { 903 904 checkClosed(); 905 ensureClientIDInitialised(); 906 ConsumerInfo info = new ConsumerInfo(); 907 info.setConsumerId(handleIdGenerator.generateId()); 908 info.setDestination(ActiveMQMessageTransformation 909 .transformDestination(destination)); 910 info.setSelector(messageSelector); 911 info.setConsumerNo(handleIdGenerator.getNextShortSequence()); 912 info.setNoLocal(noLocal); 913 info.setClientId(clientID); 914 return new ActiveMQConnectionConsumer(this, sessionPool, info, 915 maxMessages); 916 } 917 918 919 920 /** 921 * Create a durable connection consumer for this connection (optional 922 * operation). This is an expert facility not used by regular JMS clients. 923 * 924 * @param topic 925 * topic to access 926 * @param subscriptionName 927 * durable subscription name 928 * @param messageSelector 929 * only messages with properties matching the message selector 930 * expression are delivered. 
A value of null or an empty string 931 * indicates that there is no message selector for the message 932 * consumer. 933 * @param sessionPool 934 * the server session pool to associate with this durable 935 * connection consumer 936 * @param maxMessages 937 * the maximum number of messages that can be assigned to a 938 * server session at one time 939 * @return the durable connection consumer 940 * @throws JMSException 941 * if the <CODE>Connection</CODE> object fails to create a 942 * connection consumer due to some internal error or invalid 943 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>. 944 * @throws javax.jms.InvalidDestinationException 945 * if an invalid destination is specified. 946 * @throws javax.jms.InvalidSelectorException 947 * if the message selector is invalid. 948 * @see javax.jms.ConnectionConsumer 949 * @since 1.1 950 */ 951 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, 952 String subscriptionName, String messageSelector, 953 ServerSessionPool sessionPool, int maxMessages) throws JMSException { 954 checkClosed(); 955 ensureClientIDInitialised(); 956 ConsumerInfo info = new ConsumerInfo(); 957 info.setConsumerId(this.handleIdGenerator.generateId()); 958 info.setDestination(ActiveMQMessageTransformation 959 .transformDestination(topic)); 960 info.setSelector(messageSelector); 961 info.setConsumerName(subscriptionName); 962 info.setConsumerNo(handleIdGenerator.getNextShortSequence()); 963 info.setClientId(clientID); 964 return new ActiveMQConnectionConsumer(this, sessionPool, info, 965 maxMessages); 966 } 967 968 /** 969 * Create a durable connection consumer for this connection (optional 970 * operation). This is an expert facility not used by regular JMS clients. 971 * 972 * @param topic 973 * topic to access 974 * @param subscriptionName 975 * durable subscription name 976 * @param messageSelector 977 * only messages with properties matching the message selector 978 * expression are delivered. 
A value of null or an empty string 979 * indicates that there is no message selector for the message 980 * consumer. 981 * @param sessionPool 982 * the server session pool to associate with this durable 983 * connection consumer 984 * @param maxMessages 985 * the maximum number of messages that can be assigned to a 986 * server session at one time 987 * @param noLocal 988 * set true if you want to filter out messages published locally 989 * 990 * @return the durable connection consumer 991 * @throws JMSException 992 * if the <CODE>Connection</CODE> object fails to create a 993 * connection consumer due to some internal error or invalid 994 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>. 995 * @throws javax.jms.InvalidDestinationException 996 * if an invalid destination is specified. 997 * @throws javax.jms.InvalidSelectorException 998 * if the message selector is invalid. 999 * @see javax.jms.ConnectionConsumer 1000 * @since 1.1 1001 */ 1002 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, 1003 String subscriptionName, String messageSelector, 1004 ServerSessionPool sessionPool, int maxMessages, boolean noLocal) throws JMSException { 1005 checkClosed(); 1006 ensureClientIDInitialised(); 1007 ConsumerInfo info = new ConsumerInfo(); 1008 info.setConsumerId(this.handleIdGenerator.generateId()); 1009 info.setDestination(ActiveMQMessageTransformation 1010 .transformDestination(topic)); 1011 info.setSelector(messageSelector); 1012 info.setConsumerName(subscriptionName); 1013 info.setNoLocal(noLocal); 1014 info.setConsumerNo(handleIdGenerator.getNextShortSequence()); 1015 info.setClientId(clientID); 1016 return new ActiveMQConnectionConsumer(this, sessionPool, info, 1017 maxMessages); 1018 } 1019 1020 /** 1021 * Implementation of the PacketListener interface - consume a packet 1022 * 1023 * @param packet - 1024 * the Packet to consume 1025 * @see org.activemq.message.PacketListener#consume(org.activemq.message.Packet) 1026 
*/ 1027 public void consume(Packet packet) { 1028 if (!closed && packet != null) { 1029 if (packet.isJMSMessage()) { 1030 ActiveMQMessage message = (ActiveMQMessage) packet; 1031 message.setReadOnly(true); 1032 message.setConsumerIdentifer(clientID); 1033 1034 // lets check for expired messages which is only relevant for 1035 // multicast based stuff 1036 // as a pointcast based network should filter out this stuff 1037 if (transportChannel.isMulticast()) { 1038 long expiration = message.getJMSExpiration(); 1039 if (expiration > 0) { 1040 long timeStamp = System.currentTimeMillis(); 1041 if (timeStamp > expiration) { 1042 if (log.isDebugEnabled()) { 1043 log.debug("Discarding expired message: " + message); 1044 } 1045 return; 1046 } 1047 } 1048 } 1049 1050 try { 1051 message = assembleMessage(message); 1052 if( message !=null ) { 1053 int count = 0; 1054 for (Iterator i = this.messageDispatchers.iterator(); i.hasNext();) { 1055 ActiveMQMessageDispatcher dispatcher = (ActiveMQMessageDispatcher) i.next(); 1056 if (dispatcher.isTarget(message)) { 1057 if (count > 0) { 1058 // separate message for each Session etc. 
1059 message = message.deepCopy(); 1060 } 1061 dispatcher.dispatch(message); 1062 count++; 1063 } 1064 } 1065 } 1066 } catch (JMSException jmsEx) { 1067 handleAsyncException(jmsEx); 1068 } 1069 } else if (packet.getPacketType() == Packet.CAPACITY_INFO) { 1070 CapacityInfo info = (CapacityInfo) packet; 1071 synchronized(flowControlMutex) { 1072 flowControlSleepTime = info.getFlowControlTimeout(); 1073 } 1074 // System.out.println("SET FLOW TIMEOUT = " + 1075 // flowControlSleepTime + " FOR " + info); 1076 } else if (packet.getPacketType() == Packet.KEEP_ALIVE && packet.isReceiptRequired()) { 1077 Receipt receipt = new Receipt(); 1078 receipt.setCorrelationId(packet.getId()); 1079 receipt.setReceiptRequired(false); 1080 try { 1081 asyncSendPacket(receipt); 1082 } catch (JMSException jmsEx) { 1083 handleAsyncException(jmsEx); 1084 } 1085 } 1086 } 1087 } 1088 1089 private final ActiveMQMessage assembleMessage(ActiveMQMessage message) { 1090 ActiveMQMessage result = message; 1091 if (message != null && !isInternalConnection() && message.isMessagePart()) { 1092 if (message.getNumberOfParts() == 1) { 1093 //passed though from another session - i.e. 
1094 //a network or remote connection and now assembled 1095 message.resetMessagePart(); 1096 result = message; 1097 } 1098 else { 1099 result = null; 1100 String parentId = message.getParentMessageID(); 1101 ActiveMQMessage[] array = (ActiveMQMessage[]) assemblies.get(parentId); 1102 if (array == null) { 1103 array = new ActiveMQMessage[message.getNumberOfParts()]; 1104 assemblies.put(parentId, array); 1105 } 1106 array[message.getPartNumber()] = message; 1107 boolean complete = true; 1108 for (int i = 0;i < array.length;i++) { 1109 complete &= array[i] != null; 1110 } 1111 if (complete) { 1112 result = array[0]; 1113 ByteArray[] bas = new ByteArray[array.length]; 1114 try { 1115 for (int i = 0;i < bas.length;i++) { 1116 bas[i] = array[i].getBodyAsBytes(); 1117 if (i >= 1){ 1118 array[i].clearBody(); 1119 } 1120 } 1121 ByteArray ba = fragmentation.assemble(bas); 1122 result.setBodyAsBytes(ba); 1123 } 1124 catch (IOException ioe) { 1125 JMSException jmsEx = new JMSException("Failed to assemble fragment message: " + parentId); 1126 jmsEx.setLinkedException(ioe); 1127 onException(jmsEx); 1128 }catch(JMSException jmsEx){ 1129 onException(jmsEx); 1130 } 1131 } 1132 } 1133 } 1134 return result; 1135 } 1136 1137 /** 1138 * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) 1139 */ 1140 public void onException(JMSException jmsEx) { 1141 // Got an exception propagated up from the transport channel 1142 handleAsyncException(jmsEx); 1143 isTransportOK = false; 1144 try { 1145 close(); 1146 } catch (JMSException ex) { 1147 log.debug("Exception closing the connection", ex); 1148 } 1149 } 1150 1151 /** 1152 * Creates a <CODE>TopicSession</CODE> object. 1153 * 1154 * @param transacted 1155 * indicates whether the session is transacted 1156 * @param acknowledgeMode 1157 * indicates whether the consumer or the client will acknowledge 1158 * any messages it receives; ignored if the session is 1159 * transacted. 
Legal values are 1160 * <code>Session.AUTO_ACKNOWLEDGE</code>, 1161 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 1162 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 1163 * @return a newly created topic session 1164 * @throws JMSException 1165 * if the <CODE>TopicConnection</CODE> object fails to create 1166 * a session due to some internal error or lack of support for 1167 * the specific transaction and acknowledgement mode. 1168 * @see Session#AUTO_ACKNOWLEDGE 1169 * @see Session#CLIENT_ACKNOWLEDGE 1170 * @see Session#DUPS_OK_ACKNOWLEDGE 1171 */ 1172 public TopicSession createTopicSession(boolean transacted, 1173 int acknowledgeMode) throws JMSException { 1174 checkClosed(); 1175 sendConnectionInfoToBroker(); 1176 return new ActiveMQTopicSession((ActiveMQSession) createSession( 1177 transacted, acknowledgeMode)); 1178 } 1179 1180 /** 1181 * Creates a connection consumer for this connection (optional operation). 1182 * This is an expert facility not used by regular JMS clients. 1183 * 1184 * @param topic 1185 * the topic to access 1186 * @param messageSelector 1187 * only messages with properties matching the message selector 1188 * expression are delivered. A value of null or an empty string 1189 * indicates that there is no message selector for the message 1190 * consumer. 1191 * @param sessionPool 1192 * the server session pool to associate with this connection 1193 * consumer 1194 * @param maxMessages 1195 * the maximum number of messages that can be assigned to a 1196 * server session at one time 1197 * @return the connection consumer 1198 * @throws JMSException 1199 * if the <CODE>TopicConnection</CODE> object fails to create 1200 * a connection consumer due to some internal error or invalid 1201 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>. 1202 * @throws InvalidDestinationException 1203 * if an invalid topic is specified. 1204 * @throws InvalidSelectorException 1205 * if the message selector is invalid. 
1206 * @see javax.jms.ConnectionConsumer 1207 */ 1208 public ConnectionConsumer createConnectionConsumer(Topic topic, 1209 String messageSelector, ServerSessionPool sessionPool, 1210 int maxMessages) throws JMSException { 1211 checkClosed(); 1212 ensureClientIDInitialised(); 1213 ConsumerInfo info = new ConsumerInfo(); 1214 info.setConsumerId(this.handleIdGenerator.generateId()); 1215 info.setDestination(ActiveMQMessageTransformation 1216 .transformDestination(topic)); 1217 info.setSelector(messageSelector); 1218 info.setConsumerNo(handleIdGenerator.getNextShortSequence()); 1219 info.setClientId(clientID); 1220 return new ActiveMQConnectionConsumer(this, sessionPool, info, 1221 maxMessages); 1222 } 1223 1224 /** 1225 * Creates a <CODE>QueueSession</CODE> object. 1226 * 1227 * @param transacted 1228 * indicates whether the session is transacted 1229 * @param acknowledgeMode 1230 * indicates whether the consumer or the client will acknowledge 1231 * any messages it receives; ignored if the session is 1232 * transacted. Legal values are 1233 * <code>Session.AUTO_ACKNOWLEDGE</code>, 1234 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 1235 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 1236 * @return a newly created queue session 1237 * @throws JMSException 1238 * if the <CODE>QueueConnection</CODE> object fails to create 1239 * a session due to some internal error or lack of support for 1240 * the specific transaction and acknowledgement mode. 1241 * @see Session#AUTO_ACKNOWLEDGE 1242 * @see Session#CLIENT_ACKNOWLEDGE 1243 * @see Session#DUPS_OK_ACKNOWLEDGE 1244 */ 1245 public QueueSession createQueueSession(boolean transacted, 1246 int acknowledgeMode) throws JMSException { 1247 checkClosed(); 1248 sendConnectionInfoToBroker(); 1249 return new ActiveMQQueueSession((ActiveMQSession) createSession( 1250 transacted, acknowledgeMode)); 1251 } 1252 1253 /** 1254 * Creates a connection consumer for this connection (optional operation). 
1255 * This is an expert facility not used by regular JMS clients. 1256 * 1257 * @param queue 1258 * the queue to access 1259 * @param messageSelector 1260 * only messages with properties matching the message selector 1261 * expression are delivered. A value of null or an empty string 1262 * indicates that there is no message selector for the message 1263 * consumer. 1264 * @param sessionPool 1265 * the server session pool to associate with this connection 1266 * consumer 1267 * @param maxMessages 1268 * the maximum number of messages that can be assigned to a 1269 * server session at one time 1270 * @return the connection consumer 1271 * @throws JMSException 1272 * if the <CODE>QueueConnection</CODE> object fails to create 1273 * a connection consumer due to some internal error or invalid 1274 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>. 1275 * @throws InvalidDestinationException 1276 * if an invalid queue is specified. 1277 * @throws InvalidSelectorException 1278 * if the message selector is invalid. 1279 * @see javax.jms.ConnectionConsumer 1280 */ 1281 public ConnectionConsumer createConnectionConsumer(Queue queue, 1282 String messageSelector, ServerSessionPool sessionPool, 1283 int maxMessages) throws JMSException { 1284 checkClosed(); 1285 ensureClientIDInitialised(); 1286 ConsumerInfo info = new ConsumerInfo(); 1287 info.setConsumerId(this.handleIdGenerator.generateId()); 1288 info.setDestination(ActiveMQMessageTransformation 1289 .transformDestination(queue)); 1290 info.setSelector(messageSelector); 1291 info.setConsumerNo(handleIdGenerator.getNextShortSequence()); 1292 info.setClientId(clientID); 1293 return new ActiveMQConnectionConsumer(this, sessionPool, info, 1294 maxMessages); 1295 } 1296 1297 /** 1298 * Ensures that the clientID was manually specified and not auto-generated. 1299 * If the clientID was not specified this method will throw an exception. 
1300 * This method is used to ensure that the clientID + durableSubscriber name 1301 * are used correctly. 1302 * 1303 * @throws JMSException 1304 */ 1305 public void checkClientIDWasManuallySpecified() throws JMSException { 1306 if (!userSpecifiedClientID) { 1307 throw new JMSException( 1308 "You cannot create a durable subscriber without specifying a unique clientID on a Connection"); 1309 } 1310 } 1311 1312 /** 1313 * handle disconnect/reconnect events 1314 * 1315 * @param event 1316 */ 1317 public void statusChanged(TransportStatusEvent event) { 1318 log.info("channel status changed: " + event); 1319 if (event.getChannelStatus() == TransportStatusEvent.RECONNECTED) { 1320 isTransportOK = true; 1321 doReconnect(); 1322 1323 } else if (event.getChannelStatus() == TransportStatusEvent.DISCONNECTED) { 1324 isTransportOK = false; 1325 clearMessagesInProgress(); 1326 } 1327 } 1328 1329 /** 1330 * send a Packet through the Connection - for internal use only 1331 * 1332 * @param packet 1333 * @throws JMSException 1334 */ 1335 public void asyncSendPacket(Packet packet) throws JMSException { 1336 asyncSendPacket(packet, true); 1337 } 1338 1339 /** 1340 * send a Packet through the Connection - for internal use only 1341 * 1342 * @param packet 1343 * @param doSendWhileReconnecting 1344 * @throws JMSException 1345 */ 1346 public void asyncSendPacket(Packet packet, boolean doSendWhileReconnecting) 1347 throws JMSException { 1348 if (isTransportOK 1349 && !closed 1350 && (doSendWhileReconnecting || transportChannel 1351 .isTransportConnected())) { 1352 packet.setId(packetIdGenerator.getNextShortSequence()); 1353 packet.setReceiptRequired(false); 1354 synchronized(flowControlMutex) { 1355 if (packet.isJMSMessage() && flowControlSleepTime > 0) { 1356 try { 1357 Thread.sleep(flowControlSleepTime); 1358 } catch (InterruptedException e) { 1359 } 1360 } 1361 } 1362 this.transportChannel.asyncSend(packet); 1363 } 1364 } 1365 1366 /** 1367 * send a Packet through a Connection - for 
internal use only 1368 * 1369 * @param packet 1370 * @throws JMSException 1371 */ 1372 public void syncSendPacket(Packet packet) throws JMSException { 1373 syncSendPacket(packet, 0); 1374 } 1375 1376 /** 1377 * Send a packet through a Connection - for internal use only 1378 * 1379 * @param packet 1380 * @param timeout 1381 * @throws JMSException 1382 */ 1383 public void syncSendPacket(Packet packet, int timeout) throws JMSException { 1384 if (isTransportOK && !closed) { 1385 Receipt receipt; 1386 packet.setId(packetIdGenerator.getNextShortSequence()); 1387 packet.setReceiptRequired(true); 1388 receipt = this.transportChannel.send(packet, timeout); 1389 if (receipt != null) { 1390 if (receipt.isFailed()) { 1391 Throwable e = receipt.getException(); 1392 if (e != null) { 1393 throw JMSExceptionHelper.newJMSException(e); 1394 } 1395 throw new JMSException( 1396 "syncSendPacket failed with unknown exception"); 1397 } 1398 } 1399 } else { 1400 if (closed) { 1401 throw new ConnectionClosedException(); 1402 } else { 1403 throw new JMSException( 1404 "syncSendTimedOut: connection no longer OK"); 1405 } 1406 } 1407 } 1408 1409 public Receipt syncSendRequest(Packet packet) throws JMSException { 1410 checkClosed(); 1411 if (isTransportOK && !closed) { 1412 Receipt receipt; 1413 packet.setReceiptRequired(true); 1414 packet.setId(this.packetIdGenerator.getNextShortSequence()); 1415 1416 receipt = this.transportChannel.send(packet); 1417 if (receipt != null && receipt.isFailed()) { 1418 Throwable e = receipt.getException(); 1419 if (e != null) { 1420 throw (JMSException) new JMSException(e.getMessage()) 1421 .initCause(e); 1422 } 1423 throw new JMSException( 1424 "syncSendPacket failed with unknown exception"); 1425 } 1426 return receipt; 1427 } else { 1428 if (closed) { 1429 throw new ConnectionClosedException(); 1430 } else { 1431 throw new JMSException( 1432 "syncSendTimedOut: connection no longer OK"); 1433 } 1434 } 1435 } 1436 1437 // Properties 1438 // 
------------------------------------------------------------------------- 1439 1440 /** 1441 * @return Returns the prefetchPolicy. 1442 */ 1443 public ActiveMQPrefetchPolicy getPrefetchPolicy() { 1444 return prefetchPolicy; 1445 } 1446 1447 /** 1448 * @param prefetchPolicy 1449 * The prefetchPolicy to set. 1450 */ 1451 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { 1452 this.prefetchPolicy = prefetchPolicy; 1453 } 1454 1455 public int getSendCloseTimeout() { 1456 return sendCloseTimeout; 1457 } 1458 1459 public void setSendCloseTimeout(int sendCloseTimeout) { 1460 this.sendCloseTimeout = sendCloseTimeout; 1461 } 1462 1463 public int getSendConnectionInfoTimeout() { 1464 return sendConnectionInfoTimeout; 1465 } 1466 1467 public void setSendConnectionInfoTimeout(int sendConnectionInfoTimeout) { 1468 this.sendConnectionInfoTimeout = sendConnectionInfoTimeout; 1469 } 1470 1471 public TransportChannel getTransportChannel() { 1472 return transportChannel; 1473 } 1474 1475 /** 1476 * Returns the clientID of the connection, forcing one to be generated if 1477 * one has not yet been configured 1478 */ 1479 public String getInitializedClientID() throws JMSException { 1480 ensureClientIDInitialised(); 1481 return this.clientID; 1482 } 1483 1484 // Implementation methods 1485 // ------------------------------------------------------------------------- 1486 1487 /** 1488 * Used internally for adding Sessions to the Connection 1489 * 1490 * @param session 1491 * @throws JMSException 1492 */ 1493 protected void addSession(ActiveMQSession session) throws JMSException { 1494 this.sessions.add(session); 1495 addMessageDispatcher(session); 1496 if (started.get()) { 1497 session.start(); 1498 } 1499 SessionInfo info = createSessionInfo(session); 1500 info.setStarted(true); 1501 asyncSendPacket(info); 1502 } 1503 1504 /** 1505 * Used interanlly for removing Sessions from a Connection 1506 * 1507 * @param session 1508 * @throws JMSException 1509 */ 1510 protected 
void removeSession(ActiveMQSession session) throws JMSException { 1511 this.sessions.remove(session); 1512 removeMessageDispatcher(session); 1513 SessionInfo info = createSessionInfo(session); 1514 info.setStarted(false); 1515 asyncSendPacket(info, false); 1516 } 1517 1518 private SessionInfo createSessionInfo(ActiveMQSession session) { 1519 SessionInfo info = new SessionInfo(); 1520 info.setClientId(clientID); 1521 info.setSessionId(session.getSessionId()); 1522 info.setStartTime(session.getStartTime()); 1523 return info; 1524 } 1525 1526 /** 1527 * Add a ConnectionConsumer 1528 * 1529 * @param connectionConsumer 1530 * @throws JMSException 1531 */ 1532 protected void addConnectionConsumer( 1533 ActiveMQConnectionConsumer connectionConsumer) throws JMSException { 1534 this.connectionConsumers.add(connectionConsumer); 1535 addMessageDispatcher(connectionConsumer); 1536 } 1537 1538 /** 1539 * Remove a ConnectionConsumer 1540 * 1541 * @param connectionConsumer 1542 */ 1543 protected void removeConnectionConsumer( 1544 ActiveMQConnectionConsumer connectionConsumer) { 1545 this.connectionConsumers.add(connectionConsumer); 1546 removeMessageDispatcher(connectionConsumer); 1547 } 1548 1549 /** 1550 * Add a Message dispatcher to receive messages from the Broker 1551 * 1552 * @param messageDispatch 1553 * @throws JMSException 1554 * if an internal error 1555 */ 1556 protected void addMessageDispatcher( 1557 ActiveMQMessageDispatcher messageDispatch) throws JMSException { 1558 this.messageDispatchers.add(messageDispatch); 1559 } 1560 1561 /** 1562 * Remove a Message dispatcher 1563 * 1564 * @param messageDispatcher 1565 */ 1566 protected void removeMessageDispatcher( 1567 ActiveMQMessageDispatcher messageDispatcher) { 1568 this.messageDispatchers.remove(messageDispatcher); 1569 } 1570 1571 /** 1572 * Used for handling async exceptions 1573 * 1574 * @param jmsEx 1575 */ 1576 protected void handleAsyncException(JMSException jmsEx) { 1577 if (!closed) { 1578 if 
(this.exceptionListener != null) { 1579 this.exceptionListener.onException(jmsEx); 1580 } else { 1581 log.warn( 1582 "Async exception with no exception listener: " + jmsEx, 1583 jmsEx); 1584 } 1585 } 1586 } 1587 1588 protected void sendConnectionInfoToBroker() throws JMSException { 1589 sendConnectionInfoToBroker(sendConnectionInfoTimeout, closed, false); 1590 } 1591 1592 /** 1593 * Send the ConnectionInfo to the Broker 1594 * 1595 * @param timeout 1596 * @param isClosed 1597 * @throws JMSException 1598 */ 1599 protected void sendConnectionInfoToBroker(int timeout, boolean forceResend, 1600 boolean closing) throws JMSException { 1601 // Can we skip sending the ConnectionInfo packet?? 1602 if (isConnectionInfoSentToBroker && !forceResend) { 1603 return; 1604 } 1605 1606 fragmentation.setFragmentationLimit(getMessageFragmentationLimit()); 1607 1608 this.isConnectionInfoSentToBroker = true; 1609 ensureClientIDInitialised(); 1610 ConnectionInfo info = new ConnectionInfo(); 1611 info.setClientId(this.clientID); 1612 info.setHostName(IdGenerator.getHostName()); 1613 info.setUserName(userName); 1614 info.setPassword(password); 1615 info.setStartTime(startTime); 1616 info.setStarted(started.get()); 1617 info.setClosed(closed || closing); 1618 info.setClientVersion(connectionMetaData.getProviderVersion()); 1619 info.setWireFormatVersion(transportChannel 1620 .getCurrentWireFormatVersion()); 1621 if (info.getProperties() != null) { 1622 info.getProperties().setProperty(ConnectionInfo.NO_DELAY_PROPERTY, 1623 new Boolean(!useAsyncSend).toString()); 1624 } 1625 if (quickClose && info.isClosed()) { 1626 asyncSendPacket(info); 1627 } else { 1628 syncSendPacket(info, timeout); 1629 } 1630 } 1631 1632 /** 1633 * Set the maximum amount of memory this Connection should use for buffered 1634 * inbound messages 1635 * 1636 * @param newMemoryLimit 1637 * the new memory limit in bytes 1638 */ 1639 public void setConnectionMemoryLimit(int newMemoryLimit) { 1640 
memoryManager.setValueLimit(newMemoryLimit); 1641 } 1642 1643 /** 1644 * Get the current value for the maximum amount of memory this Connection 1645 * should use for buffered inbound messages 1646 * 1647 * @return the current limit in bytes 1648 */ 1649 public int getConnectionMemoryLimit() { 1650 return (int) memoryManager.getValueLimit(); 1651 } 1652 1653 /** 1654 * CapacityMonitorEventListener implementation called when the capacity of a 1655 * CapacityService changes 1656 * 1657 * @param event 1658 */ 1659 public void capacityChanged(CapacityMonitorEvent event) { 1660 // send the event to broker ... 1661 CapacityInfo info = new CapacityInfo(); 1662 info.setResourceName(event.getMonitorName()); 1663 info.setCapacity(event.getCapacity()); 1664 // System.out.println("Cap changed: " + event); 1665 try { 1666 asyncSendPacket(info, false); 1667 } catch (JMSException e) { 1668 JMSException jmsEx = new JMSException( 1669 "failed to send change in capacity"); 1670 jmsEx.setLinkedException(e); 1671 handleAsyncException(jmsEx); 1672 } 1673 } 1674 1675 /** 1676 * @return a number unique for this connection 1677 */ 1678 protected int getNextConsumerNumber() { 1679 return this.consumerNumberGenerator.increment(); 1680 } 1681 1682 protected short generateSessionId() { 1683 return this.sessionIdGenerator.getNextShortSequence(); 1684 } 1685 1686 private synchronized void ensureClientIDInitialised() { 1687 if (this.clientID == null || this.clientID.trim().equals("")) { 1688 this.clientID = this.clientIdGenerator.generateId(); 1689 } 1690 transportChannel.setClientID(clientID); 1691 this.clientIDSet = true; 1692 } 1693 1694 protected MemoryBoundedQueue getMemoryBoundedQueue(String name) { 1695 return boundedQueueManager.getMemoryBoundedQueue(name); 1696 } 1697 1698 protected void doReconnect() { 1699 try { 1700 // send the Connection info again 1701 sendConnectionInfoToBroker(sendConnectionInfoTimeout, true, false); 1702 for (Iterator iter = sessions.iterator(); iter.hasNext();) 
{ 1703 ActiveMQSession session = (ActiveMQSession) iter.next(); 1704 SessionInfo sessionInfo = createSessionInfo(session); 1705 sessionInfo.setStarted(true); 1706 asyncSendPacket(sessionInfo, false); 1707 // send consumers 1708 for (Iterator consumersIterator = session.consumers.iterator(); consumersIterator 1709 .hasNext();) { 1710 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) consumersIterator 1711 .next(); 1712 ConsumerInfo consumerInfo = session 1713 .createConsumerInfo(consumer); 1714 consumerInfo.setStarted(true); 1715 asyncSendPacket(consumerInfo, false); 1716 } 1717 // send producers 1718 for (Iterator producersIterator = session.producers.iterator(); producersIterator 1719 .hasNext();) { 1720 ActiveMQMessageProducer producer = (ActiveMQMessageProducer) producersIterator 1721 .next(); 1722 ProducerInfo producerInfo = session 1723 .createProducerInfo(producer); 1724 producerInfo.setStarted(true); 1725 asyncSendPacket(producerInfo, false); 1726 } 1727 // send the current capacity 1728 CapacityMonitorEvent event = memoryManager 1729 .generateCapacityMonitorEvent(); 1730 if (event != null) { 1731 capacityChanged(event); 1732 } 1733 } 1734 } catch (JMSException jmsEx) { 1735 log.error("Failed to do reconnection"); 1736 handleAsyncException(jmsEx); 1737 isTransportOK = false; 1738 } 1739 } 1740 1741 /** 1742 * @return Returns the useAsyncSend. 1743 */ 1744 public boolean isUseAsyncSend() { 1745 return useAsyncSend; 1746 } 1747 1748 /** 1749 * @param useAsyncSend 1750 * The useAsyncSend to set. 1751 */ 1752 public void setUseAsyncSend(boolean useAsyncSend) { 1753 this.useAsyncSend = useAsyncSend; 1754 } 1755 1756 /** 1757 * @return Returns the cachingEnabled. 1758 */ 1759 public boolean isCachingEnabled() { 1760 return cachingEnabled; 1761 } 1762 1763 /** 1764 * @param cachingEnabled 1765 * The cachingEnabled to set. 
1766 */ 1767 public void setCachingEnabled(boolean cachingEnabled) { 1768 this.cachingEnabled = cachingEnabled; 1769 } 1770 1771 /** 1772 * @return Returns the j2EEcompliant. 1773 */ 1774 public boolean isJ2EEcompliant() { 1775 return J2EEcompliant; 1776 } 1777 1778 /** 1779 * @param ecompliant 1780 * The j2EEcompliant to set. 1781 */ 1782 public void setJ2EEcompliant(boolean ecompliant) { 1783 J2EEcompliant = ecompliant; 1784 } 1785 1786 /** 1787 * @return Returns the internalConnection. 1788 */ 1789 public boolean isInternalConnection() { 1790 return internalConnection; 1791 } 1792 1793 /** 1794 * @param internalConnection 1795 * The internalConnection to set. 1796 */ 1797 public void setInternalConnection(boolean internalConnection) { 1798 this.internalConnection = internalConnection; 1799 } 1800 1801 /** 1802 * @return Returns the doMessageCompression. 1803 */ 1804 public boolean isDoMessageCompression() { 1805 return doMessageCompression 1806 && transportChannel.doesSupportMessageCompression(); 1807 } 1808 1809 /** 1810 * @param doMessageCompression 1811 * The doMessageCompression to set. 1812 */ 1813 public void setDoMessageCompression(boolean doMessageCompression) { 1814 this.doMessageCompression = doMessageCompression 1815 && transportChannel.doesSupportMessageCompression(); 1816 } 1817 1818 /** 1819 * @return Returns the doMessageFragmentation. 1820 */ 1821 public boolean isDoMessageFragmentation() { 1822 return doMessageFragmentation 1823 && transportChannel.doesSupportMessageFragmentation(); 1824 } 1825 1826 /** 1827 * @param doMessageFragmentation 1828 * The doMessageFragmentation to set. 1829 */ 1830 public void setDoMessageFragmentation(boolean doMessageFragmentation) { 1831 this.doMessageFragmentation = doMessageFragmentation 1832 && transportChannel.doesSupportMessageFragmentation(); 1833 } 1834 1835 /** 1836 * @return Returns the messageCompressionLevel. 
1837 */ 1838 public int getMessageCompressionLevel() { 1839 return messageCompressionLevel; 1840 } 1841 1842 /** 1843 * @param messageCompressionLevel 1844 * The messageCompressionLevel to set. 1845 */ 1846 public void setMessageCompressionLevel(int messageCompressionLevel) { 1847 this.messageCompressionLevel = messageCompressionLevel; 1848 } 1849 1850 /** 1851 * @return Returns the messageCompressionLimit. 1852 */ 1853 public int getMessageCompressionLimit() { 1854 return messageCompressionLimit; 1855 } 1856 1857 /** 1858 * @param messageCompressionLimit 1859 * The messageCompressionLimit to set. 1860 */ 1861 public void setMessageCompressionLimit(int messageCompressionLimit) { 1862 this.messageCompressionLimit = messageCompressionLimit; 1863 } 1864 1865 /** 1866 * @return Returns the messageCompressionStrategy. 1867 */ 1868 public int getMessageCompressionStrategy() { 1869 return messageCompressionStrategy; 1870 } 1871 1872 /** 1873 * @param messageCompressionStrategy 1874 * The messageCompressionStrategy to set. 1875 */ 1876 public void setMessageCompressionStrategy(int messageCompressionStrategy) { 1877 this.messageCompressionStrategy = messageCompressionStrategy; 1878 } 1879 1880 /** 1881 * @return Returns the messageFragmentationLimit. 1882 */ 1883 public int getMessageFragmentationLimit() { 1884 return messageFragmentationLimit; 1885 } 1886 1887 /** 1888 * @param messageFragmentationLimit 1889 * The messageFragmentationLimit to set. 1890 */ 1891 public void setMessageFragmentationLimit(int messageFragmentationLimit) { 1892 this.messageFragmentationLimit = messageFragmentationLimit; 1893 } 1894 1895 /** 1896 * @return Returns the disableTimeStampsByDefault. 1897 */ 1898 public boolean isDisableTimeStampsByDefault() { 1899 return disableTimeStampsByDefault; 1900 } 1901 1902 /** 1903 * @param disableTimeStampsByDefault 1904 * The disableTimeStampsByDefault to set. 
1905 */ 1906 public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) { 1907 this.disableTimeStampsByDefault = disableTimeStampsByDefault; 1908 } 1909 1910 /** 1911 * Causes pre-serialization of messages before send By default this is on 1912 * 1913 * @return Returns the prePrepareMessageOnSend. 1914 */ 1915 public boolean isPrepareMessageBodyOnSend() { 1916 return prepareMessageBodyOnSend; 1917 } 1918 1919 /** 1920 * Causes pre-serialization of messages before send By default this is on 1921 * 1922 * @param prePrepareMessageOnSend 1923 * The prePrepareMessageOnSend to set. 1924 */ 1925 public void setPrepareMessageBodyOnSend(boolean prePrepareMessageOnSend) { 1926 this.prepareMessageBodyOnSend = prePrepareMessageOnSend; 1927 } 1928 1929 /** 1930 * @return Returns the copyMessageOnSend. 1931 */ 1932 public boolean isCopyMessageOnSend() { 1933 return copyMessageOnSend; 1934 } 1935 1936 /** 1937 * @param copyMessageOnSend 1938 * The copyMessageOnSend to set. 1939 */ 1940 public void setCopyMessageOnSend(boolean copyMessageOnSend) { 1941 this.copyMessageOnSend = copyMessageOnSend; 1942 } 1943 1944 /** 1945 * @return Returns the quickClose. 1946 */ 1947 public boolean isQuickClose() { 1948 return quickClose; 1949 } 1950 1951 /** 1952 * @param quickClose 1953 * The quickClose to set. 1954 */ 1955 public void setQuickClose(boolean quickClose) { 1956 this.quickClose = quickClose; 1957 } 1958 1959 /** 1960 * @return Returns the optimizedMessageDispatch. 1961 */ 1962 public boolean isOptimizedMessageDispatch() { 1963 return optimizedMessageDispatch; 1964 } 1965 1966 /** 1967 * @param optimizedMessageDispatch 1968 * The optimizedMessageDispatch to set. 
1969 */ 1970 public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) { 1971 this.optimizedMessageDispatch = optimizedMessageDispatch; 1972 } 1973 1974 protected void clearMessagesInProgress() { 1975 for (Iterator i = sessions.iterator(); i.hasNext();) { 1976 ActiveMQSession session = (ActiveMQSession) i.next(); 1977 session.clearMessagesInProgress(); 1978 } 1979 } 1980 1981 /** 1982 * Tells the broker to destroy a destination. 1983 * 1984 * @param destination 1985 */ 1986 public void destroyDestination(ActiveMQDestination destination) 1987 throws JMSException { 1988 BrokerAdminCommand command = new BrokerAdminCommand(); 1989 command.setCommand(BrokerAdminCommand.DESTROY_DESTINATION); 1990 command.setDestination(destination); 1991 syncSendPacket(command); 1992 } 1993 1994 /** 1995 * Cleans up this connection so that it's state is as if the connection was 1996 * just created. This allows the Resource Adapter to clean up a connection 1997 * so that it can be reused without having to close and recreate the 1998 * connection. 
1999 * 2000 * @param sessionId 2001 */ 2002 public void cleanup() throws JMSException { 2003 2004 try { 2005 for (Iterator i = this.sessions.iterator(); i.hasNext();) { 2006 ActiveMQSession s = (ActiveMQSession) i.next(); 2007 s.close(); 2008 } 2009 for (Iterator i = this.connectionConsumers.iterator(); i.hasNext();) { 2010 ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i 2011 .next(); 2012 c.close(); 2013 } 2014 this.connectionConsumers.clear(); 2015 this.messageDispatchers.clear(); 2016 } finally { 2017 this.sessions.clear(); 2018 started.set(false); 2019 } 2020 2021 setExceptionListener(null); 2022 clientIDSet = false; 2023 isConnectionInfoSentToBroker = false; 2024 2025 CleanupConnectionInfo cleanupInfo = new CleanupConnectionInfo(); 2026 cleanupInfo.setClientId(getClientID()); 2027 asyncSendPacket(cleanupInfo); 2028 } 2029 2030 /** 2031 * Changes the associated username/password that is associated with this 2032 * connection. If the connection has been used, you must called cleanup() 2033 * before calling this method. 2034 * 2035 * @throws IllegalStateException 2036 * if the connection is in used. 
2037 * @param sessionId 2038 */ 2039 public void changeUserInfo(String theUserName, String thePassword) 2040 throws JMSException { 2041 if (isConnectionInfoSentToBroker) 2042 throw new IllegalStateException( 2043 "changeUserInfo used Connection is not allowed"); 2044 2045 this.userName = theUserName; 2046 this.password = thePassword; 2047 } 2048 2049 protected void addToTransientConsumedRedeliverCache(ActiveMQMessage message) { 2050 transientConsumedRedeliverCache.add(message); 2051 } 2052 2053 protected void replayTransientConsumedRedeliveredMessages( 2054 ActiveMQSession session, ActiveMQMessageConsumer consumer) 2055 throws JMSException { 2056 if (consumer.getDestination().isTopic() 2057 && !transientConsumedRedeliverCache.isEmpty()) { 2058 Filter filter = getFilterFactory().createFilter( 2059 consumer.getDestination(), consumer.getMessageSelector()); 2060 if (consumer.isNoLocal()) { 2061 filter = new AndFilter(filter, new NoLocalFilter(clientID)); 2062 } 2063 for (Iterator i = transientConsumedRedeliverCache.iterator(); i 2064 .hasNext();) { 2065 ActiveMQMessage message = (ActiveMQMessage) i.next(); 2066 if (filter.matches(message)) { 2067 transientConsumedRedeliverCache.remove(message); 2068 message.setMessageAcknowledge(session); 2069 message.setJMSRedelivered(true); 2070 message.setConsumerNos(new int[] { consumer 2071 .getConsumerNumber() }); 2072 consumer.processMessage(message); 2073 } 2074 } 2075 } 2076 } 2077 2078 private FilterFactory getFilterFactory() { 2079 if (filterFactory == null) { 2080 filterFactory = new FilterFactoryImpl(); 2081 } 2082 return filterFactory; 2083 } 2084 2085 protected void startTemporaryDestination(ActiveMQDestination dest) 2086 throws JMSException { 2087 if (dest != null && dest.isTemporary()) { 2088 TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) tempDestinationMap 2089 .get(dest); 2090 if (event == null) { 2091 event = new TempDestinationAdvisoryEvent(dest, true); 2092 tempDestinationMap.put(dest, 
event); 2093 ActiveMQObjectMessage msg = new ActiveMQObjectMessage(); 2094 msg.setObject(event); 2095 msg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); 2096 msg.setJMSDestination(dest.getTopicForTempAdvisory()); 2097 msg.setJMSMessageID("ID:" + dest.getPhysicalName() 2098 + " .started"); 2099 this.syncSendPacket(msg); 2100 } 2101 } 2102 } 2103 2104 protected void stopTemporaryDestination(ActiveMQDestination dest) 2105 throws JMSException { 2106 if (dest != null && dest.isTemporary()) { 2107 TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) tempDestinationMap 2108 .remove(dest); 2109 if (event != null) { 2110 event.setStarted(false); 2111 ActiveMQObjectMessage msg = new ActiveMQObjectMessage(); 2112 msg.setObject(event); 2113 msg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); 2114 msg.setJMSDestination(dest.getTopicForTempAdvisory()); 2115 msg.setJMSMessageID("ID:" + dest.getPhysicalName() 2116 + " .stopped"); 2117 this.syncSendPacket(msg); 2118 } 2119 } 2120 } 2121 2122 protected void closeTemporaryDestinations() throws JMSException { 2123 for (Iterator i = tempDestinationMap.keySet().iterator(); i.hasNext();) { 2124 ActiveMQDestination dest = (ActiveMQDestination) i.next(); 2125 stopTemporaryDestination(dest); 2126 } 2127 } 2128 2129 protected void startAdvisoryForTempDestination(Destination d) 2130 throws JMSException { 2131 if (d != null) { 2132 ActiveMQDestination dest = (ActiveMQDestination) ActiveMQMessageTransformation 2133 .transformDestination(d); 2134 if (dest.isTemporary()) { 2135 TempDestinationAdvisor test = (TempDestinationAdvisor) validDestinationsMap 2136 .get(dest); 2137 if (test == null) { 2138 test = new TempDestinationAdvisor(this, dest); 2139 test.start(); 2140 validDestinationsMap.put(dest, test); 2141 } 2142 } 2143 } 2144 } 2145 2146 protected void stopAdvisoryForTempDestination(ActiveMQDestination d) 2147 throws JMSException { 2148 if (d != null) { 2149 ActiveMQDestination dest = (ActiveMQDestination) 
ActiveMQMessageTransformation 2150 .transformDestination(d); 2151 if (dest.isTemporary()) { 2152 TempDestinationAdvisor test = (TempDestinationAdvisor) validDestinationsMap 2153 .remove(dest); 2154 if (test != null) { 2155 test.stop(); 2156 } 2157 } 2158 } 2159 } 2160 2161 protected final void validateDestination(ActiveMQDestination dest) 2162 throws JMSException { 2163 if (dest != null) { 2164 if (dest.isTemporary()) { 2165 TempDestinationAdvisor test = (TempDestinationAdvisor) validDestinationsMap 2166 .get(dest); 2167 if (dest.isDeleted() || test == null || !test.isActive(dest)) { 2168 throw new JMSException( 2169 "Cannot publish to a deleted Destination: " + dest); 2170 } 2171 } 2172 } 2173 } 2174 2175 /** 2176 * @return Returns the resourceManagerId. 2177 * @throws JMSException 2178 */ 2179 synchronized public String getResourceManagerId() throws JMSException { 2180 if (resourceManagerId == null) { 2181 resourceManagerId = determineResourceManagerId(); 2182 } 2183 return resourceManagerId; 2184 } 2185 2186 /** 2187 * Get's the resource manager id. 2188 */ 2189 private String determineResourceManagerId() throws JMSException { 2190 2191 XATransactionInfo info = new XATransactionInfo(); 2192 info.setType(TransactionInfo.GET_RM_ID); 2193 2194 ResponseReceipt receipt = (ResponseReceipt) syncSendRequest(info); 2195 String rmId = (String) receipt.getResult(); 2196 assert rmId != null; 2197 return rmId; 2198 } 2199 2200 public ByteArrayFragmentation getFragmentation() { 2201 return fragmentation; 2202 } 2203 2204 public ConcurrentHashMap getAssemblies() { 2205 return assemblies; 2206 } 2207 2208 2209 }