001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.broker.impl; 019 020 import org.activemq.broker.BrokerClient; 021 import org.activemq.broker.BrokerConnector; 022 import org.activemq.broker.BrokerContainer; 023 import org.activemq.io.WireFormat; 024 import org.activemq.message.ActiveMQMessage; 025 import org.activemq.message.ActiveMQXid; 026 import org.activemq.message.BrokerInfo; 027 import org.activemq.message.ConnectionInfo; 028 import org.activemq.message.ConsumerInfo; 029 import org.activemq.message.DurableUnsubscribe; 030 import org.activemq.message.MessageAck; 031 import org.activemq.message.ProducerInfo; 032 import org.activemq.message.SessionInfo; 033 import org.activemq.transport.TransportChannel; 034 import org.activemq.transport.TransportChannelListener; 035 import org.activemq.transport.TransportServerChannel; 036 import org.activemq.transport.TransportServerChannelProvider; 037 import org.apache.commons.logging.Log; 038 import org.apache.commons.logging.LogFactory; 039 040 import javax.jms.JMSException; 041 import javax.jms.JMSSecurityException; 042 import javax.transaction.xa.XAException; 043 import java.net.URI; 044 import java.net.URISyntaxException; 045 import java.util.Collections; 046 import java.util.HashMap; 047 import java.util.Map; 048 049 /** 050 * An implementation of the broker (the JMS server) 051 * 052 * @version $Revision: 1.1.1.1 $ 053 */ 054 public class BrokerConnectorImpl implements BrokerConnector, TransportChannelListener { 055 056 private TransportServerChannel serverChannel; 057 private Log log; 058 private BrokerContainer container; 059 private Map clients = Collections.synchronizedMap(new HashMap()); 060 061 /** 062 * Helper constructor for TCP protocol with the given bind address 063 * 064 * @param container 065 * @param bindAddress 066 * @throws JMSException 067 */ 068 public BrokerConnectorImpl(BrokerContainer container, String bindAddress, WireFormat wireFormat) throws JMSException { 069 this(container, createTransportServerChannel(wireFormat, bindAddress)); 070 } 071 072 /** 073 * @param container 074 * @param serverChannel 075 */ 076 public BrokerConnectorImpl(BrokerContainer container, TransportServerChannel serverChannel) { 077 this(container); 078 this.serverChannel = serverChannel; 079 serverChannel.setTransportChannelListener(this); 080 } 081 082 /** 083 * @param container 084 * @param serverChannel 085 */ 086 public BrokerConnectorImpl(BrokerContainer container) { 087 assert container != null; 088 this.log = LogFactory.getLog(getClass().getName()); 089 this.container = container; 090 this.container.addConnector(this); 091 092 } 093 094 /** 095 * @return infomation about the Broker 096 */ 097 public BrokerInfo getBrokerInfo() { 098 return container.getBroker().getBrokerInfo(); 099 } 100 101 /** 102 * Get a hint about the broker capacity for more messages 103 * 104 * @return percentage value (0-100) about how much capacity the 105 * broker has 106 */ 107 public int getBrokerCapacity() { 108 return container.getBroker().getRoundedCapacity(); 109 } 110 111 /** 112 * @return Get the server channel 113 */ 114 public TransportServerChannel getServerChannel() { 115 return serverChannel; 116 } 117 118 /** 119 * start the Broker 120 * 121 * @throws JMSException 122 */ 123 public void start() throws JMSException { 124 if (this.serverChannel != null){ 125 this.serverChannel.start(); 126 } 127 log.info("ActiveMQ connector started: " + serverChannel); 128 } 129 130 /** 131 * Stop the Broker 132 * 133 * @throws JMSException 134 */ 135 public void stop() throws JMSException { 136 this.container.removeConnector(this); 137 if (this.serverChannel != null){ 138 this.serverChannel.stop(); 139 } 140 log.info("ActiveMQ connector stopped: " + serverChannel); 141 } 142 143 /** 144 * Register a Broker Client 145 * 146 * @param client 147 * @param info contains infomation about the Connection this Client represents 148 * @throws JMSException 149 * @throws javax.jms.InvalidClientIDException 150 * if the JMS client specifies an invalid or duplicate client ID. 151 * @throws JMSSecurityException if client authentication fails due to an invalid user name or password. 152 */ 153 public void registerClient(BrokerClient client, ConnectionInfo info) throws JMSException { 154 this.container.registerConnection(client, info); 155 } 156 157 /** 158 * Deregister a Broker Client 159 * 160 * @param client 161 * @param info 162 * @throws JMSException if some internal error occurs 163 */ 164 public void deregisterClient(BrokerClient client, ConnectionInfo info) throws JMSException { 165 this.container.deregisterConnection(client, info); 166 } 167 168 /** 169 * Registers a MessageConsumer 170 * 171 * @param client 172 * @param info 173 * @throws JMSException 174 * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for 175 */ 176 public void registerMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException { 177 if (info.getDestination() == null) { 178 throw new JMSException("No Destination specified on consumerInfo for client: " + client + " info: " + info); 179 } 180 this.container.registerMessageConsumer(client, info); 181 182 } 183 184 /** 185 * De-register a MessageConsumer from the Broker 186 * 187 * @param client 188 * @param info 189 * @throws JMSException 190 */ 191 public void deregisterMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException { 192 this.container.deregisterMessageConsumer(client, info); 193 } 194 195 /** 196 * Registers a MessageProducer 197 * 198 * @param client 199 * @param info 200 * @throws JMSException 201 * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for 202 */ 203 public void registerMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException { 204 this.container.registerMessageProducer(client, info); 205 } 206 207 /** 208 * De-register a MessageProducer from the Broker 209 * 210 * @param client 211 * @param info 212 * @throws JMSException 213 */ 214 public void deregisterMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException { 215 this.container.deregisterMessageProducer(client, info); 216 } 217 218 /** 219 * Register a client-side Session (used for Monitoring) 220 * 221 * @param client 222 * @param info 223 * @throws JMSException 224 */ 225 public void registerSession(BrokerClient client, SessionInfo info) throws JMSException { 226 this.container.registerSession(client, info); 227 } 228 229 /** 230 * De-register a client-side Session from the Broker (used for monitoring) 231 * 232 * @param client 233 * @param info 234 * @throws JMSException 235 */ 236 public void deregisterSession(BrokerClient client, SessionInfo info) throws JMSException { 237 this.container.deregisterSession(client, info); 238 } 239 240 /** 241 * Start a transaction from the Client session 242 * 243 * @param client 244 * @param transactionId 245 * @throws JMSException 246 */ 247 public void startTransaction(BrokerClient client, String transactionId) throws JMSException { 248 this.container.startTransaction(client, transactionId); 249 } 250 251 /** 252 * Rollback a transacton 253 * 254 * @param client 255 * @param transactionId 256 * @throws JMSException 257 */ 258 public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException { 259 this.container.rollbackTransaction(client, transactionId); 260 } 261 262 /** 263 * Commit a transaction 264 * 265 * @param client 266 * @param transactionId 267 * @throws JMSException 268 */ 269 public void commitTransaction(BrokerClient client, String transactionId) throws JMSException { 270 this.container.commitTransaction(client, transactionId); 271 } 272 273 /** 274 * Send a non-transacted message to the Broker 275 * 276 * @param client 277 * @param message 278 * @throws JMSException 279 */ 280 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException { 281 this.container.sendMessage(client, message); 282 } 283 284 /** 285 * Acknowledge reciept of a message 286 * 287 * @param client 288 * @param ack 289 * @throws JMSException 290 */ 291 public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException { 292 this.container.acknowledgeMessage(client, ack); 293 } 294 295 /** 296 * Command to delete a durable topic subscription 297 * 298 * @param client 299 * @param ds 300 * @throws JMSException 301 */ 302 public void durableUnsubscribe(BrokerClient client, DurableUnsubscribe ds) throws JMSException { 303 this.container.durableUnsubscribe(client, ds); 304 } 305 306 307 /** 308 * @param channel - client to add 309 */ 310 public void addClient(TransportChannel channel) { 311 try { 312 BrokerClient client = new BrokerClientImpl(); 313 client.initialize(this, channel); 314 if (log.isDebugEnabled()) { 315 log.debug("Starting new client: " + client); 316 } 317 channel.setServerSide(true); 318 channel.start(); 319 clients.put(channel, client); 320 } 321 catch (JMSException e) { 322 log.error("Failed to add client due to: " + e, e); 323 } 324 } 325 326 /** 327 * @param channel - client to remove 328 */ 329 public void removeClient(TransportChannel channel) { 330 BrokerClient client = (BrokerClient) clients.remove(channel); 331 if (client != null) { 332 if (log.isDebugEnabled()) { 333 log.debug("Client leaving client: " + client); 334 } 335 336 // we may have already been closed, if not then lets simulate a normal shutdown 337 client.cleanUp(); 338 } 339 else { 340 // might have got a duplicate callback 341 log.warn("No such client for channel: " + channel); 342 } 343 } 344 345 /** 346 * @return the BrokerContainer for this Connector 347 */ 348 public BrokerContainer getBrokerContainer() { 349 return this.container; 350 } 351 352 /** 353 * Start an XA transaction. 354 * 355 * @see org.activemq.broker.BrokerConnector#startTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid) 356 */ 357 public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException { 358 this.container.startTransaction(client, xid); 359 } 360 361 /** 362 * Gets the prepared XA transactions. 363 * 364 * @see org.activemq.broker.BrokerConnector#getPreparedTransactions(org.activemq.broker.BrokerClient) 365 */ 366 public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException { 367 return this.container.getPreparedTransactions(client); 368 } 369 370 /** 371 * Prepare an XA transaction. 372 * 373 * @see org.activemq.broker.BrokerConnector#prepareTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid) 374 */ 375 public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException { 376 return this.container.prepareTransaction(client, xid); 377 } 378 379 /** 380 * Rollback an XA transaction. 381 * 382 * @see org.activemq.broker.BrokerConnector#rollbackTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid) 383 */ 384 public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException { 385 this.container.rollbackTransaction(client, xid); 386 } 387 388 /** 389 * Commit an XA transaction. 390 * 391 * @see org.activemq.broker.BrokerConnector#commitTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid, boolean) 392 */ 393 public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException { 394 this.container.commitTransaction(client, xid, onePhase); 395 } 396 397 /** 398 * @see org.activemq.broker.BrokerConnector#getResourceManagerId(org.activemq.broker.BrokerClient) 399 */ 400 public String getResourceManagerId(BrokerClient client) { 401 // TODO: I think we need to return a better (more unique) RM id. 402 return getBrokerInfo().getBrokerName(); 403 } 404 405 406 // Implementation methods 407 //------------------------------------------------------------------------- 408 /** 409 * Factory method ot create a transport channel 410 * 411 * @param bindAddress 412 * @return @throws JMSException 413 * @throws JMSException 414 */ 415 protected static TransportServerChannel createTransportServerChannel(WireFormat wireFormat, String bindAddress) throws JMSException { 416 URI url; 417 try { 418 url = new URI(bindAddress); 419 } 420 catch (URISyntaxException e) { 421 JMSException jmsEx = new JMSException("Badly formated bindAddress: " + e.getMessage()); 422 jmsEx.setLinkedException(e); 423 throw jmsEx; 424 } 425 return TransportServerChannelProvider.create(wireFormat, url); 426 } 427 428 }