001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * Copyright 2005 Hiram Chirino 005 * 006 * Licensed under the Apache License, Version 2.0 (the "License"); 007 * you may not use this file except in compliance with the License. 008 * You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 * 018 **/ 019 020 package org.activemq.service.boundedvm; 021 import java.util.Collections; 022 import java.util.HashMap; 023 import java.util.Iterator; 024 import java.util.LinkedList; 025 import java.util.List; 026 import java.util.Map; 027 import java.util.Set; 028 029 import javax.jms.JMSException; 030 031 import org.apache.commons.logging.Log; 032 import org.apache.commons.logging.LogFactory; 033 import org.activemq.broker.BrokerClient; 034 import org.activemq.filter.AndFilter; 035 import org.activemq.filter.DestinationMap; 036 import org.activemq.filter.Filter; 037 import org.activemq.filter.FilterFactory; 038 import org.activemq.filter.FilterFactoryImpl; 039 import org.activemq.filter.NoLocalFilter; 040 import org.activemq.io.util.MemoryBoundedQueueManager; 041 import org.activemq.message.ActiveMQDestination; 042 import org.activemq.message.ActiveMQMessage; 043 import org.activemq.message.ActiveMQQueue; 044 import org.activemq.message.ConsumerInfo; 045 import org.activemq.message.MessageAck; 046 import org.activemq.service.DeadLetterPolicy; 047 import org.activemq.service.MessageContainer; 048 import org.activemq.service.MessageContainerManager; 049 import org.activemq.service.RedeliveryPolicy; 050 import org.activemq.service.TransactionManager; 051 import org.activemq.service.TransactionTask; 052 import org.activemq.store.MessageStore; 053 import org.activemq.store.PersistenceAdapter; 054 055 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 056 import EDU.oswego.cs.dl.util.concurrent.Executor; 057 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 058 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 059 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory; 060 061 /** 062 * A MessageContainerManager for Durable queues 063 * 064 * @version $Revision: 1.1.1.1 $ 065 */ 066 public class DurableQueueBoundedMessageManager implements MessageContainerManager, Runnable { 067 private static final int DEFAULT_GARBAGE_COLLECTION_CAPACITY_LIMIT = 10; 068 private static final long DEFAULT_INACTIVE_TIMEOUT = 30 * 1000; 069 private static final Log log = LogFactory.getLog(DurableQueueBoundedMessageManager.class); 070 private MemoryBoundedQueueManager queueManager; 071 private ConcurrentHashMap containers; 072 private ConcurrentHashMap subscriptions; 073 private FilterFactory filterFactory; 074 private SynchronizedBoolean started; 075 private SynchronizedBoolean doingGarbageCollection; 076 private Map destinations; 077 private DestinationMap destinationMap; 078 private PooledExecutor threadPool; 079 private long inactiveTimeout; 080 private int garbageCoolectionCapacityLimit; 081 private RedeliveryPolicy redeliveryPolicy; 082 private DeadLetterPolicy deadLetterPolicy; 083 private final PersistenceAdapter persistenceAdapter; 084 085 protected static class DurableQueueThreadFactory implements ThreadFactory { 086 /** 087 * @param command - command to run 088 * @return a new thread 089 */ 090 public Thread newThread(Runnable command) { 091 Thread result = new Thread(command, "Durable Queue Worker"); 092 result.setPriority(Thread.NORM_PRIORITY + 1); 093 result.setDaemon(true); 094 return result; 095 } 096 } 097 098 /** 099 * Constructor for DurableQueueBoundedMessageManager 100 * 101 * @param mgr 102 * @param redeliveryPolicy 103 * @param deadLetterPolicy 104 */ 105 public DurableQueueBoundedMessageManager(PersistenceAdapter persistenceAdapter, MemoryBoundedQueueManager mgr, RedeliveryPolicy redeliveryPolicy, 106 DeadLetterPolicy deadLetterPolicy) { 107 this.persistenceAdapter = persistenceAdapter; 108 this.queueManager = mgr; 109 this.redeliveryPolicy = redeliveryPolicy; 110 this.deadLetterPolicy = deadLetterPolicy; 111 this.containers = new ConcurrentHashMap(); 112 this.destinationMap = new DestinationMap(); 113 this.destinations = new ConcurrentHashMap(); 114 this.subscriptions = new ConcurrentHashMap(); 115 this.filterFactory = new FilterFactoryImpl(); 116 this.started = new SynchronizedBoolean(false); 117 this.doingGarbageCollection = new SynchronizedBoolean(false); 118 this.threadPool = new PooledExecutor(); 119 this.threadPool.setThreadFactory(new DurableQueueThreadFactory()); 120 this.inactiveTimeout = DEFAULT_INACTIVE_TIMEOUT; 121 this.garbageCoolectionCapacityLimit = DEFAULT_GARBAGE_COLLECTION_CAPACITY_LIMIT; 122 } 123 124 /** 125 * @return Returns the garbageCoolectionCapacityLimit. 126 */ 127 public int getGarbageCoolectionCapacityLimit() { 128 return garbageCoolectionCapacityLimit; 129 } 130 131 /** 132 * @param garbageCoolectionCapacityLimit The garbageCoolectionCapacityLimit to set. 133 */ 134 public void setGarbageCoolectionCapacityLimit(int garbageCoolectionCapacityLimit) { 135 this.garbageCoolectionCapacityLimit = garbageCoolectionCapacityLimit; 136 } 137 138 /** 139 * @return Returns the inactiveTimeout. 140 */ 141 public long getInactiveTimeout() { 142 return inactiveTimeout; 143 } 144 145 /** 146 * @param inactiveTimeout The inactiveTimeout to set. 147 */ 148 public void setInactiveTimeout(long inactiveTimeout) { 149 this.inactiveTimeout = inactiveTimeout; 150 } 151 152 /** 153 * start the manager 154 * 155 * @throws JMSException 156 */ 157 public void start() throws JMSException { 158 if (started.commit(false, true)) { 159 for (Iterator i = containers.values().iterator();i.hasNext();) { 160 DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) i.next(); 161 container.start(); 162 } 163 try { 164 threadPool.execute(this);//start garbage collection 165 } 166 catch (InterruptedException e) { 167 JMSException jmsEx = new JMSException("Garbage collection interupted on start()"); 168 jmsEx.setLinkedException(e); 169 throw jmsEx; 170 } 171 } 172 } 173 174 /** 175 * stop the manager 176 * 177 * @throws JMSException 178 */ 179 public void stop() throws JMSException { 180 if (started.commit(true, false)) { 181 for (Iterator i = containers.values().iterator();i.hasNext();) { 182 DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) i.next(); 183 container.stop(); 184 } 185 threadPool.interruptAll(); 186 threadPool.shutdownNow(); 187 } 188 } 189 190 /** 191 * collect expired messages 192 */ 193 public void run() { 194 while (started.get()) { 195 doGarbageCollection(); 196 try { 197 Thread.sleep(2000); 198 } 199 catch (InterruptedException e) { 200 } 201 } 202 } 203 204 /** 205 * Add a consumer if appropiate 206 * 207 * @param client 208 * @param info 209 * @throws JMSException 210 */ 211 public synchronized void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException { 212 ActiveMQDestination destination = info.getDestination(); 213 if ( !destination.isQueue() || destination.isTemporary() ) 214 return; 215 216 DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) containers 217 .get(destination); 218 if (container == null) { 219 container = createContainer(destination,false); 220 } 221 if (log.isDebugEnabled()) { 222 log.debug("Adding consumer: " + info); 223 } 224 225 DurableQueueSubscription ts = container.addConsumer(createFilter(info), info, client); 226 if (ts != null) { 227 subscriptions.put(info.getConsumerId(), ts); 228 } 229 String name = destination.getPhysicalName(); 230 if (!destinations.containsKey(name)) { 231 destinations.put(name, destination); 232 } 233 } 234 235 /** 236 * @param client 237 * @param destination 238 * @param isDeadLetterQueue is this queue a dead letter queue 239 * @return the container 240 * @throws JMSException 241 */ 242 private DurableQueueBoundedMessageContainer createContainer(ActiveMQDestination destination, boolean isDeadLetterQueue) throws JMSException { 243 MessageStore messageStore = persistenceAdapter.createQueueMessageStore(destination.getPhysicalName()); 244 DurableQueueBoundedMessageContainer container = new DurableQueueBoundedMessageContainer(messageStore, threadPool, queueManager, destination, isDeadLetterQueue ? null : redeliveryPolicy, deadLetterPolicy); 245 addContainer(container); 246 if (started.get()) { 247 container.start(); 248 } 249 return container; 250 } 251 252 /** 253 * @param client 254 * @param info 255 * @throws JMSException 256 */ 257 public synchronized void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException { 258 ActiveMQDestination destination = info.getDestination(); 259 if ( !destination.isQueue() || destination.isTemporary() ) 260 return; 261 262 for (Iterator i = containers.values().iterator();i.hasNext();) { 263 DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) i.next(); 264 if (container != null) { 265 container.removeConsumer(info); 266 } 267 } 268 subscriptions.remove(info.getConsumerId()); 269 } 270 271 /** 272 * Delete a durable subscriber 273 * 274 * @param clientId 275 * @param subscriberName 276 * @throws JMSException if the subscriber doesn't exist or is still active 277 */ 278 public void deleteSubscription(String clientId, String subscriberName) throws JMSException { 279 } 280 281 /** 282 * @param client 283 * @param message 284 * @throws JMSException 285 */ 286 public void sendMessage(final BrokerClient client, final ActiveMQMessage message) throws JMSException { 287 ActiveMQDestination destination = message.getJMSActiveMQDestination(); 288 if (!destination.isQueue() || destination.isTemporary() || !message.isPersistent()) 289 return; 290 291 if (queueManager.getCurrentCapacity() <= garbageCoolectionCapacityLimit) { 292 doGarbageCollection(); 293 } 294 DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) containers.get(destination); 295 if (container == null) { 296 container = createContainer(destination, false); 297 } 298 299 Set set = destinationMap.get(message.getJMSActiveMQDestination()); 300 for (Iterator i = set.iterator();i.hasNext();) { 301 container = (DurableQueueBoundedMessageContainer) i.next(); 302 container.enqueue(message); 303 } 304 } 305 306 /** 307 * @param client 308 * @param ack 309 * @throws JMSException 310 */ 311 public void acknowledgeMessage(final BrokerClient client, final MessageAck ack) throws JMSException { 312 313 ActiveMQDestination destination = ack.getDestination(); 314 if (destination == null) { 315 log.warn("Ignoring acknowledgeMessage() on null destination: " + ack); 316 return; 317 } 318 if (!destination.isQueue() || destination.isTemporary() || !ack.isPersistent()) 319 return; 320 321 final DurableQueueSubscription ts = (DurableQueueSubscription) subscriptions.get(ack.getConsumerId()); 322 if (ts == null) 323 return; 324 325 final DurableMessagePointer messagePointer = ts.acknowledgeMessage(ack.getMessageID()); 326 if (messagePointer == null ) 327 return; 328 329 if( ts.isBrowser() ) { 330 ts.addAckedMessage(messagePointer); 331 return; 332 } 333 334 if (!ack.isMessageRead() || ack.isExpired()) { 335 redeliverMessage(ts, ack, messagePointer); 336 return; 337 } 338 339 // Let the message store ack the message. 340 messagePointer.getMessageStore().removeMessage(ack); 341 if (TransactionManager.isCurrentTransaction()) { 342 // Hook in a callback on first acked message 343 if (!ts.hasAckedMessage()) { 344 TransactionManager.getContexTransaction().addPostRollbackTask(new TransactionTask() { 345 public void execute() throws Throwable { 346 347 List ackList = ts.listAckedMessages(); 348 HashMap redeliverMap = new HashMap(); 349 //for (int x = ackList.size()-1; x >= 0 ; x--){ 350 for (int x = 0; x < ackList.size(); x++){ 351 DurableMessagePointer messagePointer = (DurableMessagePointer) ackList.get(x); 352 ActiveMQMessage message = messagePointer.getMessage(); 353 message.setJMSRedelivered(true); 354 if (message.incrementRedeliveryCount() > redeliveryPolicy.getMaximumRetryCount()) { 355 if (log.isDebugEnabled()){ 356 log.debug("Message: " + message + " has exceeded its retry count"); 357 } 358 // TODO: see if we can use the deadLetterPolicy of the container that dispatched the message. 359 deadLetterPolicy.sendToDeadLetter(message); 360 } 361 else if (ack.isExpired()) { 362 if (log.isDebugEnabled()){ 363 log.debug("Message: " + message + " has expired"); 364 } 365 // TODO: see if we can use the deadLetterPolicy of the container that dispatched the message. 366 deadLetterPolicy.sendToDeadLetter(message); 367 } 368 else { 369 Set containers = destinationMap.get(message.getJMSActiveMQDestination()); 370 for (Iterator i = containers.iterator();i.hasNext();) { 371 DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) i.next(); 372 LinkedList l = (LinkedList) redeliverMap.get(container); 373 if( l==null ) { 374 l = new LinkedList(); 375 redeliverMap.put(container, l); 376 } 377 l.add(messagePointer); 378 } 379 } 380 } 381 382 for (Iterator i = redeliverMap.keySet().iterator();i.hasNext();) { 383 DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) i.next(); 384 List l = (List) redeliverMap.get(container); 385 container.redeliver(l); 386 } 387 388 ts.removeAllAckedMessages(); 389 } 390 }); 391 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask() { 392 public void execute() throws Throwable { 393 ts.removeAllAckedMessages(); 394 } 395 }); 396 } 397 // We need to keep track of messages that were acked. If we 398 // rollback. 399 ts.addAckedMessage(messagePointer); 400 } 401 } 402 403 /** 404 * @param client 405 * @param ack 406 * @param message 407 * @throws JMSException 408 */ 409 private void redeliverMessage(DurableQueueSubscription ts, MessageAck ack, DurableMessagePointer message) throws JMSException { 410 message.getMessage().setJMSRedelivered(true); 411 if (message.incrementDeliveryCount() >= redeliveryPolicy.getMaximumRetryCount()) { 412 if (log.isDebugEnabled()){ 413 log.debug("Message: " + message + " has exceeded its retry count"); 414 } 415 deadLetterPolicy.sendToDeadLetter(message.getMessage()); 416 } 417 else if (ack.isExpired()) { 418 if (log.isDebugEnabled()){ 419 log.debug("Message: " + message + " has expired"); 420 } 421 deadLetterPolicy.sendToDeadLetter(message.getMessage()); 422 } 423 else { 424 Set set = destinationMap.get(message.getMessage().getJMSActiveMQDestination()); 425 for (Iterator i = set.iterator();i.hasNext();) { 426 DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) i.next(); 427 container.redeliver(message); 428 break; 429 } 430 } 431 } 432 433 /** 434 * @throws JMSException 435 */ 436 public void poll() throws JMSException { 437 } 438 439 /** 440 * @param physicalName 441 * @return MessageContainer 442 * @throws JMSException 443 */ 444 public MessageContainer getContainer(String physicalName) throws JMSException { 445 ActiveMQDestination key = (ActiveMQDestination) destinations.get(physicalName); 446 if (key != null) { 447 return (MessageContainer) containers.get(key); 448 } 449 return null; 450 } 451 452 /** 453 * @return a map of destinations 454 */ 455 public Map getDestinations() { 456 return Collections.unmodifiableMap(containers); 457 } 458 459 /** 460 * Returns an unmodifiable map, indexed by String name, of all the {@link javax.jms.Destination} 461 * objects used by non-broker consumers directly connected to this container 462 * 463 * @return 464 */ 465 public Map getLocalDestinations() { 466 Map localDestinations = new HashMap(); 467 for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) { 468 DurableSubscription sub = (DurableSubscription) iter.next(); 469 if (sub.isLocalSubscription()) { 470 final ActiveMQDestination dest = sub.getDestination(); 471 localDestinations.put(dest.getPhysicalName(), dest); 472 } 473 } 474 return Collections.unmodifiableMap(localDestinations); 475 } 476 477 /** 478 * @return the DeadLetterPolicy for this Container Manager 479 */ 480 public DeadLetterPolicy getDeadLetterPolicy() { 481 return deadLetterPolicy; 482 } 483 484 /** 485 * Set the DeadLetterPolicy for this Container Manager 486 * 487 * @param policy 488 */ 489 public void setDeadLetterPolicy(DeadLetterPolicy policy) { 490 this.deadLetterPolicy = policy; 491 } 492 493 /** 494 * Create filter for a Consumer 495 * 496 * @param info 497 * @return the Fitler 498 * @throws javax.jms.JMSException 499 */ 500 protected Filter createFilter(ConsumerInfo info) throws JMSException { 501 Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector()); 502 if (info.isNoLocal()) { 503 filter = new AndFilter(filter, new NoLocalFilter(info.getClientId())); 504 } 505 return filter; 506 } 507 508 private void doGarbageCollection() { 509 510 if (doingGarbageCollection.commit(true, false)) { 511 512 if (queueManager.getCurrentCapacity() <= garbageCoolectionCapacityLimit) { 513 for (Iterator i = containers.values().iterator();i.hasNext();) { 514 DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) i.next(); 515 container.removeExpiredMessages(); 516 log.warn("memory limit low - forced to remove expired messages: " 517 + container.getDestinationName()); 518 } 519 } 520 521 //if still below the limit - clear queues with no subscribers 522 //which have been inactive for a while 523 if (queueManager.getCurrentCapacity() <= garbageCoolectionCapacityLimit) { 524 for (Iterator i = containers.values().iterator();i.hasNext();) { 525 DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) i.next(); 526 if (!container.isActive() && (container.getIdleTimestamp() < (System.currentTimeMillis() - inactiveTimeout))) { 527 removeContainer(container); 528 log.warn("memory limit low - forced to remove inactive and idle queue: " 529 + container.getDestinationName()); 530 } 531 } 532 } 533 //if still now below limit - clear inactive queues 534 if (queueManager.getCurrentCapacity() <= garbageCoolectionCapacityLimit) { 535 for (Iterator i = containers.values().iterator();i.hasNext();) { 536 DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) i.next(); 537 if (!container.isActive() && !container.isEmpty()) { 538 removeContainer(container); 539 log.warn("memory limit low - forced to remove inactive queue: " 540 + container.getDestinationName()); 541 } 542 } 543 } 544 doingGarbageCollection.set(false); 545 } 546 } 547 548 private synchronized void addContainer(DurableQueueBoundedMessageContainer container) { 549 containers.put(container.getDestination(), container); 550 destinationMap.put(container.getDestination(), container); 551 } 552 553 private synchronized void removeContainer(DurableQueueBoundedMessageContainer container) { 554 try { 555 container.close(); 556 log.info("closed inactive Durable queue container: " + container.getDestinationName()); 557 } 558 catch (JMSException e) { 559 log.warn("failure closing container", e); 560 } 561 containers.remove(container.getDestination()); 562 destinationMap.remove(container.getDestination(), container); 563 } 564 565 protected Executor getThreadPool() { 566 return threadPool; 567 } 568 569 public void createMessageContainer(ActiveMQDestination dest) throws JMSException { 570 createContainer(dest, false); 571 } 572 573 public Map getMessageContainerAdmins() throws JMSException { 574 return Collections.unmodifiableMap(containers); 575 } 576 577 578 public void destroyMessageContainer(ActiveMQDestination dest) throws JMSException { 579 if ( !dest.isQueue() ) 580 return; 581 582 DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) containers.remove(dest); 583 if( container != null ) { 584 container.empty(); 585 container.stop(); 586 } 587 destinationMap.removeAll(dest); 588 } 589 590 public void sendToDeadLetterQueue(String deadLetterName, ActiveMQMessage message) throws JMSException { 591 592 ActiveMQQueue destination = new ActiveMQQueue(deadLetterName); 593 DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer) containers.get(destination); 594 if (container == null) { 595 container = createContainer(destination, true); 596 } 597 container.enqueue(message); 598 599 } 600 }