001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq; 019 020 import java.net.URI; 021 import java.net.URISyntaxException; 022 import java.util.ArrayList; 023 import java.util.Iterator; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.Properties; 027 028 import javax.jms.Connection; 029 import javax.jms.ConnectionFactory; 030 import javax.jms.JMSException; 031 import javax.jms.QueueConnection; 032 import javax.jms.QueueConnectionFactory; 033 import javax.jms.TopicConnection; 034 import javax.jms.TopicConnectionFactory; 035 import javax.naming.Context; 036 037 import org.activemq.broker.Broker; 038 import org.activemq.broker.BrokerConnector; 039 import org.activemq.broker.BrokerContainer; 040 import org.activemq.broker.BrokerContainerFactory; 041 import org.activemq.broker.BrokerContext; 042 import org.activemq.broker.impl.BrokerClientImpl; 043 import org.activemq.broker.impl.BrokerConnectorImpl; 044 import org.activemq.broker.impl.BrokerContainerFactoryImpl; 045 import org.activemq.io.WireFormat; 046 import org.activemq.io.WireFormatLoader; 047 import org.activemq.io.impl.DefaultWireFormat; 048 import org.activemq.io.util.ByteArrayCompression; 049 import org.activemq.io.util.ByteArrayFragmentation; 050 import org.activemq.jndi.JNDIBaseStorable; 051 import org.activemq.management.JMSStatsImpl; 052 import org.activemq.management.StatsCapable; 053 import org.activemq.management.StatsImpl; 054 import org.activemq.message.ActiveMQQueue; 055 import org.activemq.message.ActiveMQTopic; 056 import org.activemq.message.ConnectionInfo; 057 import org.activemq.message.ConsumerInfo; 058 import org.activemq.service.Service; 059 import org.activemq.transport.TransportChannel; 060 import org.activemq.transport.TransportChannelFactory; 061 import org.activemq.transport.TransportChannelListener; 062 import org.activemq.transport.TransportChannelProvider; 063 import org.activemq.transport.vm.VmTransportChannel; 064 import org.activemq.util.BeanUtils; 065 import org.activemq.util.IdGenerator; 066 import org.activemq.util.URIHelper; 067 import org.apache.commons.logging.Log; 068 import org.apache.commons.logging.LogFactory; 069 070 /** 071 * A ConnectionFactory is an an Administed object, and is used for creating 072 * Connections. 073 * <p/> 074 * This class also implements QueueConnectionFactory and TopicConnectionFactory and is an Administered object. 075 * You can use this connection to create both QueueConnections and TopicConnections. 076 * 077 * @version $Revision: 1.2 $ 078 * @see javax.jms.ConnectionFactory 079 */ 080 public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, Service, StatsCapable { 081 082 private static final Log log = LogFactory.getLog(ActiveMQConnectionFactory.class); 083 084 private BrokerContext brokerContext = BrokerContext.getInstance(); 085 private BrokerContainerFactory brokerContainerFactory; 086 protected BrokerContainer brokerContainer; 087 088 protected String userName; 089 protected String password; 090 protected String brokerURL; 091 protected String clientID; 092 protected String brokerName; 093 private boolean useEmbeddedBroker; 094 /** 095 * Should we use an async send for persistent non transacted messages ? 096 */ 097 protected boolean useAsyncSend = true; 098 protected boolean disableTimeStampsByDefault = false; 099 protected boolean J2EEcompliant = true; 100 101 /* The list of emebeded brokers that this object started */ 102 private List startedEmbeddedBrokers = new ArrayList(); 103 104 private JMSStatsImpl stats = new JMSStatsImpl(); 105 private WireFormat wireFormat = new DefaultWireFormat(); 106 private IdGenerator idGenerator = new IdGenerator(); 107 private int connectionCount; 108 private String brokerXmlConfig; 109 110 111 //compression and fragmentation variables 112 113 protected boolean doMessageCompression = true; 114 protected int messageCompressionLimit = ByteArrayCompression.DEFAULT_COMPRESSION_LIMIT;//data size above which compression will be used 115 protected int messageCompressionLevel = ByteArrayCompression.DEFAULT_COMPRESSION_LEVEL; 116 protected int messageCompressionStrategy = ByteArrayCompression.DEFAULT_COMPRESSION_STRATEGY;//default compression strategy 117 118 protected boolean doMessageFragmentation = false; 119 protected int messageFragmentationLimit = ByteArrayFragmentation.DEFAULT_FRAGMENTATION_LIMIT; 120 121 protected boolean cachingEnabled = true; 122 123 protected boolean prepareMessageBodyOnSend = true; //causes pre-serialization of messages 124 125 protected boolean quickClose = false; 126 127 protected boolean internalConnection = false;//connections are used internally - for networks etc. 128 129 protected boolean optimizedMessageDispatch = false;//set to true for better consumption for transient topics 130 131 protected boolean copyMessageOnSend = true;//set false for better throughput 132 133 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 134 135 /** 136 * Default Constructor for ActiveMQConnectionFactory 137 */ 138 public ActiveMQConnectionFactory() { 139 this( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_URL); 140 } 141 142 143 public ActiveMQConnectionFactory(String brokerURL) { 144 this(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, brokerURL); 145 } 146 147 public ActiveMQConnectionFactory(String userName, String password, String brokerURL) { 148 this.userName = userName; 149 this.password = password; 150 this.brokerURL = brokerURL; 151 152 if( brokerURL.indexOf("?")>= 0 ) { 153 String options = brokerURL.substring(brokerURL.indexOf("?")+1); 154 Map properties = URIHelper.parseQuery(options); 155 if (!properties.isEmpty()) { 156 BeanUtils.populate(this, properties); 157 } 158 } 159 } 160 161 /** 162 * Constructs a {@link ConnectionFactory} with an already configured and <b>started</b> {@link BrokerContainer} 163 * ready for use in embedded mode. 164 * 165 * @param container 166 */ 167 public ActiveMQConnectionFactory(BrokerContainer container) { 168 this(container, "vm://" + container.getBroker().getName()); 169 } 170 171 /** 172 * Constructs a {@link ConnectionFactory} with an already configured and <b>started</b> {@link BrokerContainer} 173 * ready for use in embedded mode and the brokerURL connection. 174 * 175 * @param container 176 */ 177 public ActiveMQConnectionFactory(BrokerContainer container, String brokerURL) { 178 this.brokerContainer = container; 179 this.useEmbeddedBroker = false; 180 this.brokerURL = brokerURL; 181 182 if( brokerURL.indexOf("?")>= 0 ) { 183 String options = brokerURL.substring(brokerURL.indexOf("?")+1); 184 Map properties = URIHelper.parseQuery(options); 185 if (!properties.isEmpty()) { 186 BeanUtils.populate(this, properties); 187 } 188 } 189 } 190 191 192 public StatsImpl getStats() { 193 return stats; 194 } 195 196 public JMSStatsImpl getFactoryStats() { 197 return stats; 198 } 199 200 /** 201 * @return Returns the brokerURL. 202 */ 203 public String getBrokerURL() { 204 return brokerURL; 205 } 206 207 /** 208 * @param brokerURL The brokerURL to set. 209 */ 210 public void setBrokerURL(String brokerURL) { 211 this.brokerURL = brokerURL; 212 } 213 214 /** 215 * @return Returns the clientID. 216 */ 217 public String getClientID() { 218 return clientID; 219 } 220 221 /** 222 * @param clientID The clientID to set. 223 */ 224 public void setClientID(String clientID) { 225 this.clientID = clientID; 226 } 227 228 /** 229 * @return Returns the password. 230 */ 231 public String getPassword() { 232 return password; 233 } 234 235 /** 236 * @param password The password to set. 237 */ 238 public void setPassword(String password) { 239 this.password = password; 240 } 241 242 /** 243 * @return Returns the userName. 244 */ 245 public String getUserName() { 246 return userName; 247 } 248 249 /** 250 * @param userName The userName to set. 251 */ 252 public void setUserName(String userName) { 253 this.userName = userName; 254 } 255 256 /** 257 * Is an embedded broker used by this connection factory 258 * 259 * @return true if an embedded broker will be used by this connection factory 260 */ 261 public boolean isUseEmbeddedBroker() { 262 return useEmbeddedBroker; 263 } 264 265 /** 266 * Allows embedded brokers to be associated with a connection factory 267 * 268 * @param useEmbeddedBroker 269 */ 270 public void setUseEmbeddedBroker(boolean useEmbeddedBroker) { 271 this.useEmbeddedBroker = useEmbeddedBroker; 272 } 273 274 /** 275 * The name of the broker to use if creating an embedded broker 276 * 277 * @return 278 */ 279 public String getBrokerName() { 280 if (brokerName == null) { 281 // lets auto-create a broker name 282 brokerName = idGenerator.generateId(); 283 } 284 return brokerName; 285 } 286 287 /** 288 * The name of the broker to use if creating an embedded broker 289 * 290 * @return 291 */ 292 public String getBrokerName(String url) { 293 if (brokerName == null) { 294 brokerName = url; 295 } 296 return brokerName; 297 } 298 299 public void setBrokerName(String brokerName) { 300 this.brokerName = brokerName; 301 } 302 303 /** 304 * @return Returns the useAsyncSend. 305 */ 306 public boolean isUseAsyncSend() { 307 return useAsyncSend; 308 } 309 310 /** 311 * @param useAsyncSend The useAsyncSend to set. 312 */ 313 public void setUseAsyncSend(boolean useAsyncSend) { 314 this.useAsyncSend = useAsyncSend; 315 } 316 317 public WireFormat getWireFormat() { 318 return wireFormat.copy();//need a separate instance - especially if wire format caching enabled 319 } 320 321 /** 322 * Allows the prefetch policy to be configured 323 * 324 * @return 325 */ 326 public ActiveMQPrefetchPolicy getPrefetchPolicy() { 327 return prefetchPolicy; 328 } 329 330 /** 331 * Sets the prefetch policy 332 * 333 * @param prefetchPolicy 334 */ 335 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { 336 this.prefetchPolicy = prefetchPolicy; 337 } 338 339 /** 340 * Set this flag for fast throughput! 341 * <P> 342 * Enables asynchronous sending of messages and disables timestamps by default 343 * </P> 344 * @param value - the flag to set 345 */ 346 public void setTurboBoost(boolean value){ 347 348 disableTimeStampsByDefault = value; 349 useAsyncSend = value; 350 cachingEnabled = value; 351 optimizedMessageDispatch = value; 352 prepareMessageBodyOnSend = !value; 353 copyMessageOnSend = !value; 354 } 355 356 /** 357 * @return true if turboBoost enabled 358 */ 359 public boolean isTurboBoost(){ 360 return disableTimeStampsByDefault && useAsyncSend && cachingEnabled; 361 } 362 363 /** 364 * @return Returns the optimizedMessageDispatch. 365 */ 366 public boolean isOptimizedMessageDispatch() { 367 return optimizedMessageDispatch; 368 } 369 /** 370 * @param optimizedMessageDispatch The optimizedMessageDispatch to set. 371 */ 372 public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) { 373 this.optimizedMessageDispatch = optimizedMessageDispatch; 374 } 375 /** 376 * @return Returns the disableTimeStampsByDefault. 377 */ 378 public boolean isDisableTimeStampsByDefault() { 379 return disableTimeStampsByDefault; 380 } 381 /** 382 * @param disableTimeStampsByDefault The disableTimeStampsByDefault to set. 383 */ 384 public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) { 385 this.disableTimeStampsByDefault = disableTimeStampsByDefault; 386 } 387 /** 388 * @return Returns the j2EEcompliant. 389 */ 390 public boolean isJ2EEcompliant() { 391 return J2EEcompliant; 392 } 393 /** 394 * @param ecompliant The j2EEcompliant to set. 395 */ 396 public void setJ2EEcompliant(boolean ecompliant) { 397 J2EEcompliant = ecompliant; 398 } 399 400 /** 401 * @return Returns the internalConnection. 402 */ 403 public boolean isInternalConnection() { 404 return internalConnection; 405 } 406 /** 407 * @param internalConnection The internalConnection to set. 408 */ 409 public void setInternalConnection(boolean internalConnection) { 410 this.internalConnection = internalConnection; 411 } 412 /** 413 * @return Returns the quickClose. 414 */ 415 public boolean isQuickClose() { 416 return quickClose; 417 } 418 /** 419 * @param quickClose The quickClose to set. 420 */ 421 public void setQuickClose(boolean quickClose) { 422 this.quickClose = quickClose; 423 } 424 /** 425 * @return Returns the doMessageCompression. 426 */ 427 public boolean isDoMessageCompression() { 428 return doMessageCompression; 429 } 430 /** 431 * @param doMessageCompression The doMessageCompression to set. 432 */ 433 public void setDoMessageCompression(boolean doMessageCompression) { 434 this.doMessageCompression = doMessageCompression; 435 } 436 /** 437 * @return Returns the doMessageFragmentation. 438 */ 439 public boolean isDoMessageFragmentation() { 440 return doMessageFragmentation; 441 } 442 /** 443 * @param doMessageFragmentation The doMessageFragmentation to set. 444 */ 445 public void setDoMessageFragmentation(boolean doMessageFragmentation) { 446 this.doMessageFragmentation = doMessageFragmentation; 447 } 448 /** 449 * @return Returns the messageCompressionLimit. 450 */ 451 public int getMessageCompressionLimit() { 452 return messageCompressionLimit; 453 } 454 /** 455 * @param messageCompressionLimit The messageCompressionLimit to set. 456 */ 457 public void setMessageCompressionLimit(int messageCompressionLimit) { 458 this.messageCompressionLimit = messageCompressionLimit; 459 } 460 /** 461 * @return Returns the messageCompressionStrategy. 462 */ 463 public int getMessageCompressionStrategy() { 464 return messageCompressionStrategy; 465 } 466 /** 467 * @param messageCompressionStrategy The messageCompressionStrategy to set. 468 */ 469 public void setMessageCompressionStrategy(int messageCompressionStrategy) { 470 this.messageCompressionStrategy = messageCompressionStrategy; 471 } 472 /** 473 * @return Returns the messageFragmentationLimit. 474 */ 475 public int getMessageFragmentationLimit() { 476 return messageFragmentationLimit; 477 } 478 /** 479 * @param messageFragmentationLimit The messageFragmentationLimit to set. 480 */ 481 public void setMessageFragmentationLimit(int messageFragmentationLimit) { 482 this.messageFragmentationLimit = messageFragmentationLimit; 483 } 484 485 /** 486 * @return Returns the cachingEnabled. 487 */ 488 public boolean isCachingEnabled() { 489 return cachingEnabled; 490 } 491 /** 492 * @param cachingEnabled The cachingEnabled to set. 493 */ 494 public void setCachingEnabled(boolean cachingEnabled) { 495 this.cachingEnabled = cachingEnabled; 496 } 497 /** 498 * Causes pre-serialization of messages before send 499 * By default this is on 500 * @return Returns the prePrepareMessageOnSend. 501 */ 502 public boolean isPrepareMessageBodyOnSend() { 503 return prepareMessageBodyOnSend; 504 } 505 /** 506 * Causes pre-serialization of messages before send 507 * By default this is on 508 * @param prePrepareMessageOnSend The prePrepareMessageOnSend to set. 509 */ 510 public void setPrepareMessageBodyOnSend(boolean prePrepareMessageOnSend) { 511 this.prepareMessageBodyOnSend = prePrepareMessageOnSend; 512 } 513 /** 514 * @return Returns the copyMessageOnSend. 515 */ 516 public boolean isCopyMessageOnSend() { 517 return copyMessageOnSend; 518 } 519 /** 520 * @param copyMessageOnSend The copyMessageOnSend to set. 521 */ 522 public void setCopyMessageOnSend(boolean copyMessageOnSend) { 523 this.copyMessageOnSend = copyMessageOnSend; 524 } 525 /** 526 * Allows a custom wire format to be used; otherwise the default Java wire format is used 527 * which is designed for minimum size and maximum speed on the Java platform 528 * 529 * @param wireFormat 530 */ 531 public void setWireFormat(WireFormat wireFormat) { 532 this.wireFormat = wireFormat; 533 } 534 535 /** 536 * set the WireFormat by name - e.g. 'default','amqpfast' etc. 537 * 538 * @param format 539 * @throws JMSException 540 */ 541 542 public void setWireFormat(String format) throws JMSException{ 543 this.wireFormat = WireFormatLoader.getWireFormat(format); 544 } 545 546 public String getBrokerXmlConfig() { 547 return brokerXmlConfig; 548 } 549 550 public BrokerContainer getBrokerContainer() { 551 return brokerContainer; 552 } 553 554 /** 555 * Sets the <a href="http://activemq.org/Xml+Configuration">XML configuration file</a> 556 * used to configure the ActiveMQ broker via Spring if using embedded mode. 557 * 558 * @param brokerXmlConfig is the filename which is assumed to be on the classpath unless a URL 559 * is specified. So a value of <code>foo/bar.xml</code> would be assumed to be on the classpath 560 * whereas <code>file:dir/file.xml</code> would use the file system. 561 * Any valid URL string is supported. 562 * @see #setUseEmbeddedBroker(boolean) 563 */ 564 public void setBrokerXmlConfig(String brokerXmlConfig) { 565 this.brokerXmlConfig = brokerXmlConfig; 566 } 567 568 public BrokerContainerFactory getBrokerContainerFactory() throws JMSException { 569 if (brokerContainerFactory == null) { 570 brokerContainerFactory = createBrokerContainerFactory(); 571 } 572 return brokerContainerFactory; 573 } 574 575 public void setBrokerContainerFactory(BrokerContainerFactory brokerContainerFactory) { 576 this.brokerContainerFactory = brokerContainerFactory; 577 } 578 579 /** 580 * Returns the context used to store broker containers and connectors which defaults 581 * to using the singleton 582 */ 583 public BrokerContext getBrokerContext() { 584 return brokerContext; 585 } 586 587 public void setBrokerContext(BrokerContext brokerContext) { 588 this.brokerContext = brokerContext; 589 } 590 591 /** 592 * Create a JMS Connection 593 * 594 * @return the JMS Connection 595 * @throws JMSException if an error occurs creating the Connection 596 */ 597 public Connection createConnection() throws JMSException { 598 return this.createConnection(this.userName, this.password); 599 } 600 601 /** 602 * @param userName 603 * @param password 604 * @return the Connection 605 * @throws JMSException if an error occurs creating the Connection 606 */ 607 public Connection createConnection(String userName, String password) throws JMSException { 608 ActiveMQConnection connection = new ActiveMQConnection(this, userName, password, createTransportChannel(this.brokerURL)); 609 connection.setCachingEnabled(isCachingEnabled()); 610 connection.setUseAsyncSend(isUseAsyncSend()); 611 connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault()); 612 connection.setJ2EEcompliant(isJ2EEcompliant()); 613 connection.setDoMessageCompression(isDoMessageCompression()); 614 connection.setMessageCompressionLevel(messageCompressionLevel); 615 connection.setMessageCompressionLimit(getMessageCompressionLimit()); 616 connection.setMessageCompressionStrategy(getMessageCompressionStrategy()); 617 connection.setDoMessageFragmentation(isDoMessageFragmentation()); 618 connection.setMessageFragmentationLimit(getMessageFragmentationLimit()); 619 connection.setPrepareMessageBodyOnSend(isPrepareMessageBodyOnSend()); 620 connection.setInternalConnection(isInternalConnection()); 621 connection.setQuickClose(isQuickClose()); 622 connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch()); 623 connection.setCopyMessageOnSend(isCopyMessageOnSend()); 624 connection.setPrefetchPolicy(getPrefetchPolicy()); 625 if (this.clientID != null && this.clientID.length() > 0) { 626 connection.setClientID(this.clientID); 627 } 628 return connection; 629 } 630 631 /** 632 * Create a JMS QueueConnection 633 * 634 * @return the JMS QueueConnection 635 * @throws JMSException if an error occurs creating the Connection 636 */ 637 public QueueConnection createQueueConnection() throws JMSException { 638 return this.createQueueConnection(this.userName, this.password); 639 } 640 641 /** 642 * @param userName 643 * @param password 644 * @return the QueueConnection 645 * @throws JMSException if an error occurs creating the Connection 646 */ 647 public QueueConnection createQueueConnection(String userName, String password) throws JMSException { 648 return (QueueConnection) createConnection(userName, password); 649 } 650 651 /** 652 * Create a JMS TopicConnection 653 * 654 * @return the JMS TopicConnection 655 * @throws JMSException if an error occurs creating the Connection 656 */ 657 public TopicConnection createTopicConnection() throws JMSException { 658 return this.createTopicConnection(this.userName, this.password); 659 } 660 661 /** 662 * @param userName 663 * @param password 664 * @return the TopicConnection 665 * @throws JMSException if an error occurs creating the Connection 666 */ 667 public TopicConnection createTopicConnection(String userName, String password) throws JMSException { 668 return (TopicConnection) createConnection(userName, password); 669 } 670 671 672 public void start() throws JMSException { 673 } 674 675 /** 676 * A hook to allow any embedded JMS Broker's to be closed down 677 * 678 * @throws JMSException 679 */ 680 public synchronized void stop() throws JMSException { 681 // Stop all embded brokers that we started. 682 for (Iterator iter = startedEmbeddedBrokers.iterator(); iter.hasNext();) { 683 String uri = (String) iter.next(); 684 brokerContext.deregisterConnector(uri); 685 } 686 if (brokerContainer != null) { 687 brokerContainer.stop(); 688 brokerContainer = null; 689 } 690 } 691 692 693 public Broker getEmbeddedBroker() throws JMSException { 694 if (isUseEmbeddedBroker()) { 695 return getContainer(getBrokerName(), getBrokerName()).getBroker(); 696 } 697 return null; 698 } 699 700 public static synchronized void registerBroker(String theURLString, BrokerConnector brokerConnector) { 701 BrokerContext.getInstance().registerConnector(theURLString, brokerConnector); 702 } 703 704 public static synchronized void unregisterBroker(String theURLString) { 705 BrokerContext.getInstance().deregisterConnector(theURLString); 706 } 707 708 709 // Implementation methods 710 //------------------------------------------------------------------------- 711 712 713 /** 714 * Set the properties that will represent the instance in JNDI 715 * 716 * @param props 717 */ 718 protected void buildFromProperties(Properties props) { 719 this.userName = props.getProperty("userName", this.userName); 720 this.password = props.getProperty("password", this.password); 721 String temp = props.getProperty(Context.PROVIDER_URL); 722 if (temp == null || temp.length() == 0) { 723 temp = props.getProperty("brokerURL"); 724 } 725 if (temp != null && temp.length() > 0) { 726 this.brokerURL = temp; 727 } 728 this.brokerName = props.getProperty("brokerName", this.brokerName); 729 this.clientID = props.getProperty("clientID"); 730 this.useAsyncSend = getBoolean(props, "useAsyncSend", true); 731 this.useEmbeddedBroker = getBoolean(props, "useEmbeddedBroker"); 732 this.brokerXmlConfig = props.getProperty("brokerXmlConfig", this.brokerXmlConfig); 733 this.J2EEcompliant = getBoolean(props,"J2EEcompliant",true); 734 if (props.containsKey("turboBoost")){ 735 this.setTurboBoost(getBoolean(props, "turboBoost")); 736 } 737 } 738 739 /** 740 * Initialize the instance from properties stored in JNDI 741 * 742 * @param props 743 */ 744 protected void populateProperties(Properties props) { 745 props.put("userName", this.userName); 746 props.put("password", this.password); 747 props.put("brokerURL", this.brokerURL); 748 props.put(Context.PROVIDER_URL, this.brokerURL); 749 props.put("brokerName", this.brokerName); 750 if (this.clientID != null) { 751 props.put("clientID", this.clientID); 752 } 753 props.put("useAsyncSend", (useAsyncSend) ? "true" : "false"); 754 props.put("useEmbeddedBroker", (useEmbeddedBroker) ? "true" : "false"); 755 props.put("J2EEcompliant", (this.J2EEcompliant) ? "true" : "false"); 756 props.put("turboBoost", (isTurboBoost()) ? "true" : "false"); 757 if (this.brokerXmlConfig != null) { 758 props.put("brokerXmlConfig", this.brokerXmlConfig); 759 } 760 } 761 762 /** 763 * Helper method to return the property value as a boolean flag 764 * 765 * @param props 766 * @param key 767 * @return 768 */ 769 protected boolean getBoolean(Properties props, String key) { 770 return getBoolean(props, key, false); 771 } 772 773 /** 774 * Helper method to return the property value as a boolean flag 775 * 776 * @param props 777 * @param key 778 * @param defaultValue 779 * @return 780 */ 781 protected boolean getBoolean(Properties props, String key, boolean defaultValue) { 782 String value = props.getProperty(key); 783 return value != null ? value.equalsIgnoreCase("true") : defaultValue; 784 } 785 786 protected BrokerContainerFactory createBrokerContainerFactory() throws JMSException { 787 if (brokerXmlConfig != null) { 788 return XmlConfigHelper.createBrokerContainerFactory(brokerXmlConfig); 789 } 790 return new BrokerContainerFactoryImpl(); 791 } 792 793 /** 794 * Factory method to create a TransportChannel from a URL 795 * @param theURLString 796 * @return the TransportChannel to use with the embedded broker 797 * @throws JMSException 798 */ 799 protected TransportChannel createTransportChannel(String theURLString) throws JMSException { 800 URI uri = createURI(theURLString); 801 TransportChannelFactory factory = TransportChannelProvider.getFactory(uri); 802 BrokerConnector brokerConnector = null; 803 boolean created = false; 804 TransportChannel transportChannel = null; 805 boolean embedServer = isUseEmbeddedBroker() || factory.requiresEmbeddedBroker(); 806 if (embedServer) { 807 synchronized (this) { 808 if (factory.requiresEmbeddedBroker()) { 809 transportChannel = factory.create(getWireFormat(), uri); 810 brokerConnector = transportChannel.getEmbeddedBrokerConnector(); 811 } 812 if (brokerConnector == null) { 813 brokerConnector = brokerContext.getConnectorByURL(theURLString); 814 if (brokerConnector == null) { 815 brokerConnector = createBrokerConnector(theURLString); 816 brokerContext.registerConnector(theURLString, brokerConnector); 817 startedEmbeddedBrokers.add(theURLString); 818 created = true; 819 } 820 } 821 else { 822 created = true; 823 } 824 } 825 } 826 if (transportChannel == null){ 827 transportChannel = factory.create(getWireFormat(), uri); 828 } 829 830 if (embedServer) { 831 return ensureServerIsAvailable(uri, transportChannel, brokerConnector, created); 832 } 833 return transportChannel; 834 } 835 836 protected synchronized BrokerContainer getContainer(String url, String name) throws JMSException { 837 if (brokerContainer == null) { 838 brokerContainer = brokerContext.getBrokerContainerByName(url, name, getBrokerContainerFactory()); 839 } 840 return brokerContainer; 841 } 842 843 protected BrokerConnector createBrokerConnector(String url) throws JMSException { 844 BrokerConnector brokerConnector; 845 brokerConnector = new BrokerConnectorImpl(getContainer(url, getBrokerName()), url, getWireFormat()); 846 brokerConnector.start(); 847 848 // lets wait a little for the server to startup 849 log.info("Embedded JMS Broker has started"); 850 try { 851 Thread.sleep(1000); 852 } 853 catch (InterruptedException e) { 854 log.warn("caught exception sleeping",e); 855 } 856 return brokerConnector; 857 } 858 859 860 protected TransportChannel ensureServerIsAvailable(URI remoteLocation, TransportChannel channel, BrokerConnector brokerConnector, boolean created) throws JMSException { 861 ensureVmServerIsAvailable(channel, brokerConnector); 862 if (channel.isMulticast()) { 863 return ensureMulticastChannelIsAvailable(remoteLocation, channel, brokerConnector, created); 864 } 865 return channel; 866 } 867 868 private void ensureVmServerIsAvailable(TransportChannel channel, BrokerConnector brokerConnector) throws JMSException { 869 if (channel instanceof VmTransportChannel && brokerConnector instanceof TransportChannelListener) { 870 VmTransportChannel answer = (VmTransportChannel) channel; 871 answer.connect(brokerConnector); 872 } 873 } 874 875 protected TransportChannel ensureMulticastChannelIsAvailable(URI remoteLocation, TransportChannel channel, BrokerConnector brokerConnector, boolean created) throws JMSException { 876 if (created) { 877 BrokerConnectorImpl brokerImpl = (BrokerConnectorImpl) brokerConnector; 878 879 BrokerClientImpl client = new BrokerClientImpl(); 880 client.initialize(brokerImpl, channel); 881 channel.start(); 882 String brokerClientID = createMulticastClientID(); 883 channel.setClientID(brokerClientID); 884 885 // lets spoof a consumer for topics which will replicate messages 886 // over the multicast transport 887 ConnectionInfo info = new ConnectionInfo(); 888 info.setHostName(IdGenerator.getHostName()); 889 info.setClientId(brokerClientID); 890 info.setStarted(true); 891 client.consumeConnectionInfo(info); 892 893 ConsumerInfo consumerInfo = new ConsumerInfo(); 894 consumerInfo.setDestination(new ActiveMQTopic(">")); 895 consumerInfo.setNoLocal(true); 896 consumerInfo.setClientId(brokerClientID); 897 consumerInfo.setConsumerId(idGenerator.generateId()); 898 consumerInfo.setStarted(true); 899 client.consumeConsumerInfo(consumerInfo); 900 901 consumerInfo = new ConsumerInfo(); 902 consumerInfo.setDestination(new ActiveMQQueue(">")); 903 consumerInfo.setNoLocal(true); 904 consumerInfo.setClientId(brokerClientID); 905 consumerInfo.setConsumerId(idGenerator.generateId()); 906 consumerInfo.setStarted(true); 907 client.consumeConsumerInfo(consumerInfo); 908 } 909 910 // now lets create a VM channel that the JMS client will use 911 // to connect to the embedded brokerConnector 912 URI localURI = createURI("vm", remoteLocation); 913 TransportChannel localChannel = TransportChannelProvider.create(getWireFormat(), localURI); 914 ensureVmServerIsAvailable(localChannel, brokerConnector); 915 return localChannel; 916 } 917 918 /** 919 * Creates the clientID for the multicast client (used to dispatch local 920 * messages over a multicast bus) 921 */ 922 protected String createMulticastClientID() { 923 return idGenerator.generateId(); 924 } 925 926 protected URI createURI(String protocol, URI uri) throws JMSException { 927 try { 928 return new URI(protocol, uri.getRawSchemeSpecificPart(), uri.getFragment()); 929 } 930 catch (URISyntaxException e) { 931 JMSException jmsEx = new JMSException("the URL string is badly formated:", e.getMessage()); 932 jmsEx.setLinkedException(e); 933 throw jmsEx; 934 935 } 936 } 937 938 protected URI createURI(String uri) throws JMSException { 939 try { 940 if (uri == null) { 941 throw new JMSException("The connection URI must be specified!"); 942 } 943 return new URI(uri); 944 } 945 catch (URISyntaxException e) { 946 e.printStackTrace(); 947 JMSException jmsEx = new JMSException("the URL string is badly formated:", e.getMessage()); 948 jmsEx.setLinkedException(e); 949 throw jmsEx; 950 951 } 952 } 953 954 /** 955 * Called when a connection is closed so that we can shut down any embedded brokers cleanly 956 * 957 * @param connection 958 */ 959 synchronized void onConnectionClose(ActiveMQConnection connection) throws JMSException { 960 if (--connectionCount <= 0) { 961 // close any broker if we've got one 962 stop(); 963 } 964 965 } 966 967 synchronized void onConnectionCreate(ActiveMQConnection connection) { 968 ++connectionCount; 969 } 970 971 }