/**
 *
 * Copyright 2004 Protique Ltd
 * Copyright 2005 Hiram Chirino
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/

package org.activemq.service.boundedvm;
import org.activemq.broker.BrokerClient;
import org.activemq.filter.Filter;
import org.activemq.io.util.MemoryBoundedQueue;
import org.activemq.io.util.MemoryBoundedQueueManager;
import org.activemq.io.util.MemoryManageable;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ConsumerInfo;
import org.activemq.service.DeadLetterPolicy;
import org.activemq.service.MessageContainerAdmin;
import org.activemq.service.MessageIdentity;
import org.activemq.service.QueueListEntry;
import org.activemq.service.RedeliveryPolicy;
import org.activemq.service.Service;
import org.activemq.service.TransactionManager;
import org.activemq.service.TransactionTask;
import org.activemq.service.impl.DefaultQueueList;
import org.activemq.store.MessageStore;
import org.activemq.store.RecoveryListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;

import javax.jms.JMSException;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * A MessageContainer for Durable queues.
 * <p>
 * Messages are written to a {@link MessageStore} and a pointer to each one is
 * held in a {@link MemoryBoundedQueue}. A single dispatcher task ({@link #run()}),
 * submitted to the supplied {@link Executor}, hands the queued pointers to the
 * registered {@link DurableQueueSubscription}s.
 * <p>
 * Locking: <code>dispatchMutex</code> serialises dispatcher runs and
 * <code>subscriptionsMutex</code> guards changes to the subscription list.
 *
 * @version $Revision: 1.1.1.1 $
 */
public class DurableQueueBoundedMessageContainer implements Service, Runnable, MessageContainerAdmin {

    private final MessageStore messageStore;
    private final MemoryBoundedQueueManager queueManager;
    private final ActiveMQDestination destination;
    private final Executor threadPool;
    private final DeadLetterPolicy deadLetterPolicy;
    private final Log log;
    private final MemoryBoundedQueue queue;

    private final DefaultQueueList subscriptions = new DefaultQueueList();
    private final SynchronizedBoolean started = new SynchronizedBoolean(false);
    private final SynchronizedBoolean running = new SynchronizedBoolean(false);
    private final Object dispatchMutex = new Object();
    private final Object subscriptionsMutex = new Object();

    // wall-clock time (ms) at which the last subscriber was removed; 0 once a consumer has been added
    private long idleTimestamp;

    /**
     * Construct this beast
     *
     * @param messageStore the store messages are written to and recovered from
     * @param threadPool executor used to run the dispatcher task
     * @param queueManager supplies the memory bounded queues used by this container and its subscriptions
     * @param destination the destination this container serves
     * @param redeliveryPolicy accepted for signature compatibility; not used by this class
     * @param deadLetterPolicy receives messages found to be expired during dispatch; may be null
     */
    public DurableQueueBoundedMessageContainer(MessageStore messageStore, Executor threadPool, MemoryBoundedQueueManager queueManager,
            ActiveMQDestination destination, RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
        this.messageStore = messageStore;
        this.threadPool = threadPool;
        this.queueManager = queueManager;
        this.destination = destination;
        this.deadLetterPolicy = deadLetterPolicy;

        this.queue = queueManager.getMemoryBoundedQueue("DURABLE_QUEUE:-" + destination.getPhysicalName());
        this.log = LogFactory.getLog("DurableQueueBoundedMessageContainer:- " + destination);
    }


    /**
     * @return true if there are subscribers waiting for messages
     */
    public boolean isActive() {
        return !subscriptions.isEmpty();
    }

    /**
     * @return true if no messages are enqueued
     */
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    /**
     * @return the timestamp (ms) at which the last active subscriber was removed,
     *         or 0 if it has been reset by a consumer being added
     */
    public long getIdleTimestamp() {
        return idleTimestamp;
    }



    /**
     * Add a consumer to dispatch messages to. If a subscription with an equal
     * {@link ConsumerInfo} is already registered it is returned unchanged.
     * <p>
     * The lookup and the registration happen under one hold of
     * <code>subscriptionsMutex</code>, so two concurrent calls with the same
     * ConsumerInfo cannot both register a subscription.
     *
     * @param filter
     * @param info
     * @param client
     * @return DurableQueueSubscription
     * @throws JMSException
     */
    public DurableQueueSubscription addConsumer(Filter filter, ConsumerInfo info, BrokerClient client)
            throws JMSException {
        synchronized (subscriptionsMutex) {
            DurableQueueSubscription ts = findMatch(info);
            if (ts == null) {
                MemoryBoundedQueue subQueue = queueManager.getMemoryBoundedQueue("DURABLE_SUB:-" + info.getConsumerId());
                MemoryBoundedQueue ackQueue = queueManager.getMemoryBoundedQueue("DURABLE_SUB_ACKED:-" + info.getConsumerId());
                ts = new DurableQueueSubscription(client, subQueue, ackQueue, filter, info);
                idleTimestamp = 0;
                subscriptions.add(ts);
                checkRunning();
            }
            return ts;
        }
    }

    /**
     * Remove a consumer. Its undelivered messages (and, for a browser, the
     * messages it had acked) are put back at the head of the container queue
     * before the subscription is closed.
     *
     * @param info
     * @throws JMSException
     */
    public void removeConsumer(ConsumerInfo info) throws JMSException {
        DurableQueueSubscription ts = null;
        synchronized (subscriptionsMutex) {
            ts = findMatch(info);
            if (ts != null) {
                subscriptions.remove(ts);
                if (subscriptions.isEmpty()) {
                    // nobody left to dispatch to: let the dispatcher loop wind down
                    running.commit(true, false);
                    idleTimestamp = System.currentTimeMillis();
                }
            }
        }
        if (ts != null) {

            // get unacknowledged messages and re-enqueue them
            // (walk backwards so that repeated enqueue-first keeps the original order)
            List list = ts.getUndeliveredMessages();
            for (int i = list.size() - 1; i >= 0; i--) {
                queue.enqueueFirstNoBlock((MemoryManageable) list.get(i));
            }

            // If it is a queue browser, then re-enqueue the browsed
            // messages.
            if (ts.isBrowser()) {
                list = ts.listAckedMessages();
                for (int i = list.size() - 1; i >= 0; i--) {
                    queue.enqueueFirstNoBlock((MemoryManageable) list.get(i));
                }
                ts.removeAllAckedMessages();
            }

            ts.close();
        }
    }

    /**
     * start working: starts the message store, re-queues every message the
     * store reports during recovery, then schedules the dispatcher if needed.
     * Does nothing if already started.
     *
     * @throws JMSException
     */
    public void start() throws JMSException {
        if (started.commit(false, true)) {
            messageStore.start();

            // Avoid recovery failing due to memory constraints.
            this.queueManager.setMemoryLimitEnforced(false);
            try {
                messageStore.recover(new RecoveryListener() {
                    public void recoverMessage(MessageIdentity messageIdentity) throws JMSException {
                        recoverMessageToBeDelivered(messageIdentity);
                    }
                });
            } finally {
                this.queueManager.setMemoryLimitEnforced(true);
            }

            checkRunning();
        }
    }

    /**
     * Loads the identified message from the store and enqueues a pointer to it.
     *
     * @param msgId identity of the message reported by the store during recovery
     * @throws JMSException
     */
    private void recoverMessageToBeDelivered(MessageIdentity msgId) throws JMSException {
        DurableMessagePointer pointer = new DurableMessagePointer(messageStore, getDestination(), messageStore.getMessage(msgId));
        queue.enqueue(pointer);
    }

    /**
     * enqueue a message for dispatching. Advisory messages are dispatched
     * immediately and are not written to the store; all other messages are
     * added to the store and queued by a post-commit task.
     *
     * @param message
     * @throws JMSException
     */
    public void enqueue(final ActiveMQMessage message) throws JMSException {
        final DurableMessagePointer pointer = new DurableMessagePointer(messageStore, getDestination(), message);
        if (message.isAdvisory()) {
            doAdvisoryDispatchMessage(pointer);
        }
        else {
            messageStore.addMessage(message);
            // If there is no transaction.. then this executes directly.
            TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask() {
                public void execute() throws Throwable {
                    queue.enqueue(pointer);
                    checkRunning();
                }
            });
        }
    }

    /**
     * Put a single message back at the head of the queue and make sure the
     * dispatcher is scheduled.
     *
     * @param message the pointer to redeliver
     */
    public void redeliver(DurableMessagePointer message) {
        queue.enqueueFirstNoBlock(message);
        checkRunning();
    }

    /**
     * Put a list of messages back at the head of the queue and make sure the
     * dispatcher is scheduled.
     *
     * @param messages the pointers to redeliver
     */
    public void redeliver(List messages) {
        queue.enqueueAllFirstNoBlock(messages);
        checkRunning();
    }

    /**
     * stop working: clears the started and running flags and empties the
     * in-memory queue (the message store is not touched here).
     */
    public void stop() {
        started.set(false);
        running.set(false);
        queue.clear();
    }

    /**
     * close down this container: stops it if started, then closes and drops
     * every subscription.
     *
     * @throws JMSException
     */
    public void close() throws JMSException {
        if (started.get()) {
            stop();
        }
        synchronized (subscriptionsMutex) {
            QueueListEntry entry = subscriptions.getFirstEntry();
            while (entry != null) {
                DurableQueueSubscription ts = (DurableQueueSubscription) entry.getElement();
                ts.close();
                entry = subscriptions.getNextEntry(entry);
            }
            subscriptions.clear();
        }
    }

    /**
     * do some dispatching.
     * <p>
     * Loops while the container is started and running. Each pass takes at most
     * one pointer from the queue and offers it to the subscriptions in list
     * order. Parts of a multi-part message are all sent to the subscription
     * chosen for the first part seen. Expired messages go to the dead letter
     * policy (when one is set). A message nobody accepted is put back: at the
     * head if some subscription targets it, otherwise at the tail. After a
     * stretch of idle passes with an empty queue the loop clears the running
     * flag and exits; {@link #checkRunning()} schedules it again when needed.
     */
    public void run() {
        // Only allow one thread at a time to dispatch.
        synchronized (dispatchMutex) {
            boolean dispatched = false;
            boolean targeted = false;
            DurableMessagePointer messagePointer = null;
            int notDispatchedCount = 0;
            int sleepTime = 250;
            int iterationsWithoutDispatchingBeforeStopping = 10000 / sleepTime;// ~10 seconds
            // parent message id -> subscription that received the earlier parts
            Map messageParts = new HashMap();
            try {
                while (started.get() && running.get()) {
                    dispatched = false;
                    targeted = false;
                    // Forget the pointer from the previous pass. If it was not dispatched it
                    // has already been put back on the queue; keeping the stale reference
                    // would re-enqueue it a second time whenever this pass finds no subscribers.
                    messagePointer = null;
                    synchronized (subscriptionsMutex) {
                        if (!subscriptions.isEmpty()) {
                            messagePointer = (DurableMessagePointer) queue.dequeue(sleepTime);
                            if (messagePointer != null) {
                                ActiveMQMessage message = messagePointer.getMessage();
                                if (!message.isExpired()) {

                                    QueueListEntry entry = subscriptions.getFirstEntry();
                                    while (entry != null) {
                                        DurableQueueSubscription ts = (DurableQueueSubscription) entry.getElement();
                                        if (ts.isTarget(message)) {
                                            targeted = true;
                                            if (message.isMessagePart()) {
                                                // keep every part of one parent message on the same subscription
                                                DurableQueueSubscription sameTarget = (DurableQueueSubscription) messageParts
                                                        .get(message.getParentMessageID());
                                                if (sameTarget == null) {
                                                    sameTarget = ts;
                                                    messageParts.put(message.getParentMessageID(), sameTarget);
                                                }
                                                sameTarget.doDispatch(messagePointer);
                                                if (message.isLastMessagePart()) {
                                                    messageParts.remove(message.getParentMessageID());
                                                }
                                                messagePointer = null;
                                                dispatched = true;
                                                notDispatchedCount = 0;
                                                break;
                                            } else if (ts.canAcceptMessages()) {
                                                ts.doDispatch(messagePointer);
                                                messagePointer = null;
                                                dispatched = true;
                                                notDispatchedCount = 0;
                                                // rotate so a different subscription is tried first next time
                                                subscriptions.rotate();
                                                break;
                                            }
                                        }
                                        entry = subscriptions.getNextEntry(entry);
                                    }

                                } else {
                                    // expire message
                                    if (log.isDebugEnabled()) {
                                        log.debug("expired message: " + messagePointer);
                                    }
                                    if (deadLetterPolicy != null) {
                                        deadLetterPolicy.sendToDeadLetter(messagePointer.getMessage());
                                    }
                                    messagePointer = null;
                                }
                            }
                        }
                    }
                    if (!dispatched) {
                        if (messagePointer != null) {
                            if (targeted) {
                                // a subscriber wants it but could not take it now - keep it at the head
                                queue.enqueueFirstNoBlock(messagePointer);
                            } else {
                                //no matching subscribers - dump to end and hope one shows up ...
                                queue.enqueueNoBlock(messagePointer);

                            }
                        }
                        if (running.get()) {
                            if (notDispatchedCount++ > iterationsWithoutDispatchingBeforeStopping
                                    && queue.isEmpty()) {
                                synchronized (running) {
                                    running.commit(true, false);
                                }
                            } else {
                                Thread.sleep(sleepTime);
                            }
                        }
                    }
                }
            } catch (InterruptedException ie) {
                //someone is stopping us from another thread
                // Clear the running flag so a later checkRunning() can schedule a fresh
                // dispatcher instead of believing this one is still alive, and restore
                // the interrupt status for the owner of this thread.
                running.set(false);
                Thread.currentThread().interrupt();
            } catch (Throwable e) {
                log.warn("stop dispatching", e);
                stop();
            }
        }
    }

    /**
     * Finds the registered subscription whose ConsumerInfo equals the one given.
     *
     * @param info the consumer to look for
     * @return the matching subscription, or null if there is none
     * @throws JMSException
     */
    private DurableQueueSubscription findMatch(ConsumerInfo info) throws JMSException {
        DurableQueueSubscription result = null;
        synchronized (subscriptionsMutex) {
            QueueListEntry entry = subscriptions.getFirstEntry();
            while (entry != null) {
                DurableQueueSubscription ts = (DurableQueueSubscription) entry.getElement();
                if (ts.getConsumerInfo().equals(info)) {
                    result = ts;
                    break;
                }
                entry = subscriptions.getNextEntry(entry);
            }
        }
        return result;
    }

    /**
     * @return the destination associated with this container
     */
    public ActiveMQDestination getDestination() {
        return destination;
    }

    /**
     * @return the destination name
     */
    public String getDestinationName() {
        return destination.getPhysicalName();
    }

    /**
     * Empties the in-memory queue; the message store is left untouched.
     */
    protected void clear() {
        queue.clear();
    }

    /**
     * Removes from the in-memory queue every message that is expired at the
     * time of the call. The message store is not updated (see TODO below).
     */
    protected void removeExpiredMessages() {
        long currentTime = System.currentTimeMillis();
        List list = queue.getContents();
        for (int i = 0; i < list.size(); i++) {
            DurableMessagePointer msgPointer = (DurableMessagePointer) list.get(i);
            ActiveMQMessage message = msgPointer.getMessage();
            if (message.isExpired(currentTime)) {
                // TODO: remove message from message store.
                queue.remove(msgPointer);
                if (log.isDebugEnabled()) {
                    log.debug("expired message: " + msgPointer);
                }
            }
        }
    }

    /**
     * Schedules the dispatcher on the thread pool when the container is
     * started, has at least one subscription and no dispatcher is currently
     * marked as running.
     * <p>
     * If the executor refuses the task by throwing InterruptedException the
     * running flag is rolled back, so that a later call can try again, and the
     * interrupt status of the calling thread is restored.
     */
    protected void checkRunning() {
        if (!running.get() && started.get() && !subscriptions.isEmpty()) {
            synchronized (running) {
                if (running.commit(false, true)) {
                    try {
                        threadPool.execute(this);
                    }
                    catch (InterruptedException e) {
                        // nothing was scheduled: undo the flag or dispatching would stall for good
                        running.set(false);
                        Thread.currentThread().interrupt();
                        log.error(this + " Couldn't start executing ", e);
                    }
                }
            }
        }
    }


    /**
     * @see org.activemq.service.MessageContainer#getMessageContainerAdmin()
     */
    public MessageContainerAdmin getMessageContainerAdmin() {
        return this;
    }

    /**
     * Removes every message from the store and from the in-memory queue.
     * Only allowed while there are no subscriptions.
     * NOTE(review): the subscription check is made without holding
     * subscriptionsMutex - confirm that is acceptable for callers.
     *
     * @throws JMSException if the container still has subscriptions
     * @see org.activemq.service.MessageContainerAdmin#empty()
     */
    public void empty() throws JMSException {
        if (subscriptions.isEmpty()) {
            messageStore.removeAllMessages();
            queue.clear();
        } else {
            throw new JMSException("Cannot empty a queue while it is use.");
        }
    }

    /**
     * Dispatch an Advisory Message to the first subscription that targets it.
     * Expired messages are ignored; a JMSException during dispatch is logged
     * and not propagated.
     *
     * @param messagePointer
     */
    private synchronized void doAdvisoryDispatchMessage(DurableMessagePointer messagePointer) {
        ActiveMQMessage message = messagePointer.getMessage();
        try {

            if (message.isAdvisory() && !message.isExpired()) {
                synchronized (subscriptionsMutex) {
                    QueueListEntry entry = subscriptions.getFirstEntry();
                    while (entry != null) {
                        DurableQueueSubscription ts = (DurableQueueSubscription) entry.getElement();
                        if (ts.isTarget(message)) {
                            ts.doDispatch(messagePointer);
                            break;
                        }
                        entry = subscriptions.getNextEntry(entry);
                    }
                }
            }
        } catch (JMSException jmsEx) {
            log.warn("Failed to dispatch advisory", jmsEx);
        }
    }

}