001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.ra; 019 020 import java.lang.reflect.Method; 021 022 import javax.jms.JMSException; 023 import javax.jms.Message; 024 import javax.jms.MessageListener; 025 import javax.jms.MessageProducer; 026 import javax.jms.ServerSession; 027 import javax.jms.Session; 028 import javax.resource.spi.endpoint.MessageEndpoint; 029 import javax.resource.spi.work.Work; 030 import javax.resource.spi.work.WorkEvent; 031 import javax.resource.spi.work.WorkException; 032 import javax.resource.spi.work.WorkListener; 033 import javax.resource.spi.work.WorkManager; 034 035 import org.activemq.ActiveMQSession; 036 import org.activemq.ActiveMQSession.DeliveryListener; 037 import org.activemq.util.JMSExceptionHelper; 038 import org.apache.commons.logging.Log; 039 import org.apache.commons.logging.LogFactory; 040 041 /** 042 * @version $Revision: 1.1.1.1 $ 043 */ 044 public class ServerSessionImpl implements ServerSession, SessionAndProducer, Work, DeliveryListener { 045 046 public static final Method ON_MESSAGE_METHOD; 047 048 static { 049 try { 050 ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", new Class[]{Message.class}); 051 } 052 catch (Exception e) { 053 throw new ExceptionInInitializerError(e); 054 } 055 } 056 057 private static int nextLogId=0; 058 synchronized static private int getNextLogId() { 059 return nextLogId++; 060 } 061 062 private int serverSessionId = getNextLogId(); 063 private final Log log = LogFactory.getLog( ServerSessionImpl.class.getName()+":"+serverSessionId ); 064 065 private ActiveMQSession session; 066 private WorkManager workManager; 067 private MessageEndpoint endpoint; 068 private MessageProducer messageProducer; 069 private final ServerSessionPoolImpl pool; 070 071 private Object runControlMutex = new Object(); 072 private boolean runningFlag = false; 073 /** 074 * True if an error was detected that cause this session to be stale. When a session 075 * is stale, it should not be used again for proccessing. 076 */ 077 private boolean stale; 078 /** 079 * Does the TX commit need to be managed by the RA? 080 */ 081 private final boolean useRAManagedTx; 082 /** 083 * The maximum number of messages to batch 084 */ 085 private final int batchSize; 086 /** 087 * The current number of messages in the batch 088 */ 089 private int currentBatchSize; 090 091 public ServerSessionImpl(ServerSessionPoolImpl pool, ActiveMQSession session, WorkManager workManager, MessageEndpoint endpoint, boolean useRAManagedTx, int batchSize) throws JMSException { 092 this.pool = pool; 093 this.session = session; 094 this.workManager = workManager; 095 this.endpoint = endpoint; 096 this.useRAManagedTx = useRAManagedTx; 097 this.session.setMessageListener((MessageListener) endpoint); 098 this.session.setDeliveryListener(this); 099 this.batchSize = batchSize; 100 } 101 102 public Session getSession() throws JMSException { 103 return session; 104 } 105 106 public MessageProducer getMessageProducer() throws JMSException { 107 if (messageProducer == null) { 108 messageProducer = getSession().createProducer(null); 109 } 110 return messageProducer; 111 } 112 113 /** 114 * @see javax.jms.ServerSession#start() 115 */ 116 public void start() throws JMSException { 117 118 synchronized (runControlMutex) { 119 if (runningFlag) { 120 log.debug("Start request ignored, allready running."); 121 return; 122 } 123 runningFlag = true; 124 } 125 126 // We get here because we need to start a async worker. 127 log.debug("Starting run."); 128 try { 129 workManager.scheduleWork(this, WorkManager.INDEFINITE, null, 130 new WorkListener() { 131 //The work listener is useful only for debugging... 132 public void workAccepted(WorkEvent event) { 133 log.debug("Work accepted: " + event); 134 } 135 136 public void workRejected(WorkEvent event) { 137 log.debug("Work rejected: " + event); 138 } 139 140 public void workStarted(WorkEvent event) { 141 log.debug("Work started: " + event); 142 } 143 144 public void workCompleted(WorkEvent event) { 145 log.debug("Work completed: " + event); 146 } 147 148 }); 149 } 150 catch (WorkException e) { 151 throw (JMSException) new JMSException("Start failed: " + e).initCause(e); 152 } 153 } 154 155 /** 156 * @see java.lang.Runnable#run() 157 */ 158 synchronized public void run() { 159 log.debug("Running"); 160 while (true) { 161 log.debug("run loop start"); 162 try { 163 SessionAndProducerHelper.register(this); 164 currentBatchSize = 0; 165 session.run(); 166 } 167 catch (Throwable e) { 168 stale=true; 169 log.debug("Endpoint failed to process message.", e); 170 log.info("Endpoint failed to process message. Reason: " + e); 171 } 172 finally { 173 SessionAndProducerHelper.unregister(this); 174 log.debug("run loop end"); 175 synchronized (runControlMutex) { 176 // This endpoint may have gone stale due to error 177 if( stale) { 178 runningFlag = false; 179 pool.removeFromPool(this); 180 break; 181 } 182 if( !session.hasUncomsumedMessages() ) { 183 runningFlag = false; 184 pool.returnToPool(this); 185 break; 186 } 187 } 188 } 189 } 190 log.debug("Run finished"); 191 } 192 193 194 /** 195 * The ActiveMQSession's run method will call back to this method before 196 * dispactching a message to the MessageListener. 197 */ 198 public void beforeDelivery(ActiveMQSession session, Message msg) { 199 if (currentBatchSize == 0) { 200 try { 201 endpoint.beforeDelivery(ON_MESSAGE_METHOD); 202 } catch (Throwable e) { 203 throw new RuntimeException("Endpoint before delivery notification failure", e); 204 } 205 } 206 } 207 208 /** 209 * The ActiveMQSession's run method will call back to this method after 210 * dispactching a message to the MessageListener. 211 */ 212 public void afterDelivery(ActiveMQSession session, Message msg) { 213 if (++currentBatchSize >= batchSize || !session.hasUncomsumedMessages()) { 214 currentBatchSize = 0; 215 try { 216 endpoint.afterDelivery(); 217 } catch (Throwable e) { 218 throw new RuntimeException("Endpoint after delivery notification failure", e); 219 } finally { 220 if( session.getTransactionContext().isInLocalTransaction() ) { 221 if( !useRAManagedTx ) { 222 // Sanitiy Check: If the local transaction has not been commited.. 223 // Commit it now. 224 log.warn("Local transaction had not been commited. Commiting now."); 225 } 226 try { 227 session.commit(); 228 } catch (JMSException e) { 229 log.info("Commit failed:", e); 230 } 231 } 232 } 233 } 234 } 235 236 /** 237 * @see javax.resource.spi.work.Work#release() 238 */ 239 public void release() { 240 log.debug("release called"); 241 } 242 243 /** 244 * @see java.lang.Object#toString() 245 */ 246 public String toString() { 247 return "ServerSessionImpl:"+serverSessionId; 248 } 249 250 public void close() { 251 try { 252 endpoint.release(); 253 } catch (Throwable e) { 254 log.debug("Endpoint did not release properly: "+e,e); 255 } 256 try { 257 session.close(); 258 } catch (Throwable e) { 259 log.debug("Session did not close properly: "+e,e); 260 } 261 } 262 263 }