001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.ra; 019 020 import java.io.PrintWriter; 021 import java.util.ArrayList; 022 import java.util.Iterator; 023 024 import javax.jms.Connection; 025 import javax.jms.ExceptionListener; 026 import javax.jms.JMSException; 027 import javax.resource.ResourceException; 028 import javax.resource.spi.ConnectionEvent; 029 import javax.resource.spi.ConnectionEventListener; 030 import javax.resource.spi.ConnectionRequestInfo; 031 import javax.resource.spi.LocalTransaction; 032 import javax.resource.spi.ManagedConnection; 033 import javax.resource.spi.ManagedConnectionMetaData; 034 import javax.security.auth.Subject; 035 import javax.transaction.xa.XAResource; 036 037 import org.apache.commons.logging.Log; 038 import org.apache.commons.logging.LogFactory; 039 import org.activemq.ActiveMQConnection; 040 import org.activemq.LocalTransactionEventListener; 041 import org.activemq.TransactionContext; 042 043 /** 044 * ActiveMQManagedConnection maps to real physical connection to the 045 * server. Since a ManagedConnection has to provide a transaction 046 * managment interface to the physical connection, and sessions 047 * are the objects implement transaction managment interfaces in 048 * the JMS API, this object also maps to a singe physical JMS session. 049 * <p/> 050 * The side-effect is that JMS connection the application gets 051 * will allways create the same session object. This is good if 052 * running in an app server since the sessions are elisted in the 053 * context transaction. This is bad if used outside of an app 054 * server since the user may be trying to create 2 different 055 * sessions to coordinate 2 different uow. 056 * 057 * @version $Revision: 1.1.1.1 $ 058 */ 059 public class ActiveMQManagedConnection implements ManagedConnection, ExceptionListener { // TODO: , DissociatableManagedConnection { 060 061 private static final Log log = LogFactory.getLog(ActiveMQManagedConnection.class); 062 063 private PrintWriter logWriter; 064 065 private final ActiveMQConnection physicalConnection; 066 private final TransactionContext transactionContext; 067 private final ArrayList proxyConnections = new ArrayList(); 068 private final ArrayList listeners = new ArrayList(); 069 private final LocalAndXATransaction localAndXATransaction; 070 071 private Subject subject; 072 private ActiveMQConnectionRequestInfo info; 073 private boolean destoryed; 074 075 public ActiveMQManagedConnection(Subject subject, ActiveMQConnection physicalConnection, ActiveMQConnectionRequestInfo info) throws ResourceException { 076 try { 077 this.subject = subject; 078 this.info = info; 079 this.physicalConnection = physicalConnection; 080 this.transactionContext = new TransactionContext(physicalConnection); 081 082 this.localAndXATransaction = new LocalAndXATransaction(transactionContext) { 083 public void setInManagedTx(boolean inManagedTx) throws JMSException { 084 super.setInManagedTx(inManagedTx); 085 Iterator iterator = proxyConnections.iterator(); 086 while (iterator.hasNext()) { 087 JMSConnectionProxy proxy = (JMSConnectionProxy) iterator.next(); 088 proxy.setUseSharedTxContext(inManagedTx); 089 } 090 } 091 }; 092 093 this.transactionContext.setLocalTransactionEventListener( new LocalTransactionEventListener() { 094 public void beginEvent() { 095 fireBeginEvent(); 096 } 097 public void commitEvent() { 098 fireCommitEvent(); 099 } 100 public void rollbackEvent() { 101 fireRollbackEvent(); 102 } 103 }); 104 105 physicalConnection.setExceptionListener(this); 106 } catch (JMSException e) { 107 throw new ResourceException("Could not create a new connection: "+e.getMessage(), e); 108 } 109 } 110 111 public boolean isInManagedTx() { 112 return localAndXATransaction.isInManagedTx(); 113 } 114 115 static public boolean matches(Object x, Object y) { 116 if (x == null ^ y == null) { 117 return false; 118 } 119 if (x != null && !x.equals(y)) { 120 return false; 121 } 122 return true; 123 } 124 125 public void associate(Subject subject, ActiveMQConnectionRequestInfo info) throws JMSException { 126 127 // Do we need to change the associated userid/password 128 if( !matches(info.getUserName(), this.info.getUserName()) || !matches(info.getPassword(), this.info.getPassword()) ) { 129 ((ActiveMQConnection)physicalConnection).changeUserInfo(info.getUserName(), info.getPassword()); 130 } 131 132 // Do we need to set the clientId? 133 if( info.getClientid()!=null && info.getClientid().length()>0 ) 134 physicalConnection.setClientID(info.getClientid()); 135 136 this.subject = subject; 137 this.info = info; 138 } 139 140 public Connection getPhysicalConnection() { 141 return physicalConnection; 142 } 143 144 private void fireBeginEvent() { 145 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, 146 ConnectionEvent.LOCAL_TRANSACTION_STARTED); 147 Iterator iterator = listeners.iterator(); 148 while (iterator.hasNext()) { 149 ConnectionEventListener l = (ConnectionEventListener) iterator.next(); 150 l.localTransactionStarted(event); 151 } 152 } 153 154 private void fireCommitEvent() { 155 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, 156 ConnectionEvent.LOCAL_TRANSACTION_COMMITTED); 157 Iterator iterator = listeners.iterator(); 158 while (iterator.hasNext()) { 159 ConnectionEventListener l = (ConnectionEventListener) iterator.next(); 160 l.localTransactionCommitted(event); 161 } 162 } 163 164 private void fireRollbackEvent() { 165 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, 166 ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK); 167 Iterator iterator = listeners.iterator(); 168 while (iterator.hasNext()) { 169 ConnectionEventListener l = (ConnectionEventListener) iterator.next(); 170 l.localTransactionRolledback(event); 171 } 172 } 173 174 private void fireCloseEvent(JMSConnectionProxy proxy) { 175 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, 176 ConnectionEvent.CONNECTION_CLOSED); 177 event.setConnectionHandle(proxy); 178 179 Iterator iterator = listeners.iterator(); 180 while (iterator.hasNext()) { 181 ConnectionEventListener l = (ConnectionEventListener) iterator.next(); 182 l.connectionClosed(event); 183 } 184 } 185 186 private void fireErrorOccurredEvent(Exception error) { 187 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, 188 ConnectionEvent.CONNECTION_ERROR_OCCURRED, error); 189 Iterator iterator = listeners.iterator(); 190 while (iterator.hasNext()) { 191 ConnectionEventListener l = (ConnectionEventListener) iterator.next(); 192 l.connectionErrorOccurred(event); 193 } 194 } 195 196 /** 197 * @see javax.resource.spi.ManagedConnection#getConnection(javax.security.auth.Subject, 198 * javax.resource.spi.ConnectionRequestInfo) 199 */ 200 public Object getConnection(Subject subject, ConnectionRequestInfo info) 201 throws ResourceException { 202 JMSConnectionProxy proxy = new JMSConnectionProxy(this); 203 proxyConnections.add(proxy); 204 return proxy; 205 } 206 207 private boolean isDestroyed() { 208 return destoryed; 209 } 210 211 /** 212 * Close down the physical connection to the server. 213 * 214 * @see javax.resource.spi.ManagedConnection#destroy() 215 */ 216 public void destroy() throws ResourceException { 217 // Have we allready been destroyed?? 218 if (isDestroyed()) { 219 return; 220 } 221 222 cleanup(); 223 224 try { 225 physicalConnection.close(); 226 destoryed = true; 227 } catch (JMSException e) { 228 log.info("Error occured during close of a JMS connection.", e); 229 } 230 } 231 232 /** 233 * Cleans up all proxy handles attached to this physical connection so that 234 * they cannot be used anymore. 235 * 236 * @see javax.resource.spi.ManagedConnection#cleanup() 237 */ 238 public void cleanup() throws ResourceException { 239 240 // Have we allready been destroyed?? 241 if (isDestroyed()) { 242 return; 243 } 244 245 Iterator iterator = proxyConnections.iterator(); 246 while (iterator.hasNext()) { 247 JMSConnectionProxy proxy = (JMSConnectionProxy) iterator.next(); 248 proxy.cleanup(); 249 } 250 proxyConnections.clear(); 251 252 try { 253 ((ActiveMQConnection)physicalConnection).cleanup(); 254 } catch (JMSException e) { 255 throw new ResourceException("Could cleanup the ActiveMQ connection: "+e, e); 256 } 257 258 } 259 260 /** 261 * @see javax.resource.spi.ManagedConnection#associateConnection(java.lang.Object) 262 */ 263 public void associateConnection(Object connection) throws ResourceException { 264 throw new ResourceException("Not supported."); 265 } 266 267 /** 268 * @see javax.resource.spi.ManagedConnection#addConnectionEventListener(javax.resource.spi.ConnectionEventListener) 269 */ 270 public void addConnectionEventListener(ConnectionEventListener listener) { 271 listeners.add(listener); 272 } 273 274 /** 275 * @see javax.resource.spi.ManagedConnection#removeConnectionEventListener(javax.resource.spi.ConnectionEventListener) 276 */ 277 public void removeConnectionEventListener(ConnectionEventListener listener) { 278 listeners.remove(listener); 279 } 280 281 /** 282 * @see javax.resource.spi.ManagedConnection#getXAResource() 283 */ 284 public XAResource getXAResource() throws ResourceException { 285 return localAndXATransaction; 286 } 287 288 /** 289 * @see javax.resource.spi.ManagedConnection#getLocalTransaction() 290 */ 291 public LocalTransaction getLocalTransaction() throws ResourceException { 292 return localAndXATransaction; 293 } 294 295 /** 296 * @see javax.resource.spi.ManagedConnection#getMetaData() 297 */ 298 public ManagedConnectionMetaData getMetaData() throws ResourceException { 299 return new ManagedConnectionMetaData() { 300 301 public String getEISProductName() throws ResourceException { 302 if (physicalConnection == null) { 303 throw new ResourceException("Not connected."); 304 } 305 try { 306 return physicalConnection.getMetaData().getJMSProviderName(); 307 } 308 catch (JMSException e) { 309 throw new ResourceException("Error accessing provider.", e); 310 } 311 } 312 313 public String getEISProductVersion() throws ResourceException { 314 if (physicalConnection == null) { 315 throw new ResourceException("Not connected."); 316 } 317 try { 318 return physicalConnection.getMetaData().getProviderVersion(); 319 } 320 catch (JMSException e) { 321 throw new ResourceException("Error accessing provider.", e); 322 } 323 } 324 325 public int getMaxConnections() throws ResourceException { 326 if (physicalConnection == null) { 327 throw new ResourceException("Not connected."); 328 } 329 return Integer.MAX_VALUE; 330 } 331 332 public String getUserName() throws ResourceException { 333 if (physicalConnection == null) { 334 throw new ResourceException("Not connected."); 335 } 336 try { 337 return physicalConnection.getClientID(); 338 } 339 catch (JMSException e) { 340 throw new ResourceException("Error accessing provider.", e); 341 } 342 } 343 }; 344 } 345 346 /** 347 * @see javax.resource.spi.ManagedConnection#setLogWriter(java.io.PrintWriter) 348 */ 349 public void setLogWriter(PrintWriter logWriter) throws ResourceException { 350 this.logWriter = logWriter; 351 } 352 353 /** 354 * @see javax.resource.spi.ManagedConnection#getLogWriter() 355 */ 356 public PrintWriter getLogWriter() throws ResourceException { 357 return logWriter; 358 } 359 360 /** 361 * @param subject 362 * @param info 363 * @return 364 */ 365 public boolean matches(Subject subject, ConnectionRequestInfo info) { 366 367 // Check to see if it is our info class 368 if (info == null) { 369 return false; 370 } 371 if (info.getClass() != ActiveMQConnectionRequestInfo.class) { 372 return false; 373 } 374 375 // Do the subjects match? 376 if (subject == null ^ this.subject == null) { 377 return false; 378 } 379 if (subject != null && !subject.equals(this.subject)) { 380 return false; 381 } 382 383 // Does the info match? 384 return info.equals(this.info); 385 } 386 387 /** 388 * When a proxy is closed this cleans up the proxy and notifys the 389 * ConnectionEventListeners that a connection closed. 390 * 391 * @param proxy 392 */ 393 public void proxyClosedEvent(JMSConnectionProxy proxy) { 394 proxyConnections.remove(proxy); 395 proxy.cleanup(); 396 fireCloseEvent(proxy); 397 } 398 399 public void onException(JMSException e) { 400 log.warn("Connection failed: "+e); 401 log.debug("Cause: ", e); 402 403 // Let any active proxy connections know that exception occured. 404 for (Iterator iter = proxyConnections.iterator(); iter.hasNext();) { 405 JMSConnectionProxy proxy = (JMSConnectionProxy) iter.next(); 406 proxy.onException(e); 407 } 408 // Let the container know that the error occured. 409 fireErrorOccurredEvent(e); 410 } 411 412 /** 413 * @return Returns the transactionContext. 414 */ 415 public TransactionContext getTransactionContext() { 416 return transactionContext; 417 } 418 419 }