001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.service.boundedvm; 020 import java.util.Collections; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.LinkedList; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.Set; 027 028 import javax.jms.JMSException; 029 030 import org.apache.commons.logging.Log; 031 import org.apache.commons.logging.LogFactory; 032 import org.activemq.broker.BrokerClient; 033 import org.activemq.filter.AndFilter; 034 import org.activemq.filter.DestinationMap; 035 import org.activemq.filter.Filter; 036 import org.activemq.filter.FilterFactory; 037 import org.activemq.filter.FilterFactoryImpl; 038 import org.activemq.filter.NoLocalFilter; 039 import org.activemq.io.util.MemoryBoundedQueue; 040 import org.activemq.io.util.MemoryBoundedQueueManager; 041 import org.activemq.message.ActiveMQDestination; 042 import org.activemq.message.ActiveMQMessage; 043 import org.activemq.message.ConsumerInfo; 044 import org.activemq.message.MessageAck; 045 import org.activemq.service.DeadLetterPolicy; 046 import org.activemq.service.MessageContainer; 047 import org.activemq.service.MessageContainerManager; 048 import org.activemq.service.RedeliveryPolicy; 049 import org.activemq.service.TransactionManager; 050 import org.activemq.service.TransactionTask; 051 052 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 053 import EDU.oswego.cs.dl.util.concurrent.Executor; 054 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 055 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 056 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory; 057 058 /** 059 * A MessageContainerManager for transient queues 060 * 061 * @version $Revision: 1.1.1.1 $ 062 */ 063 /** 064 * A manager of MessageContainer instances 065 */ 066 public class TransientQueueBoundedMessageManager implements MessageContainerManager, Runnable { 067 private static final int DEFAULT_GARBAGE_COLLECTION_CAPACITY_LIMIT = 10; 068 private static final long DEFAULT_INACTIVE_TIMEOUT = 30 * 1000; 069 private static final Log log = LogFactory.getLog(TransientQueueBoundedMessageManager.class); 070 private MemoryBoundedQueueManager queueManager; 071 private ConcurrentHashMap containers; 072 private ConcurrentHashMap subscriptions; 073 private FilterFactory filterFactory; 074 private SynchronizedBoolean started; 075 private SynchronizedBoolean doingGarbageCollection; 076 private Map destinations; 077 private DestinationMap destinationMap; 078 private PooledExecutor threadPool; 079 private long inactiveTimeout; 080 private int garbageCoolectionCapacityLimit; 081 private RedeliveryPolicy redeliveryPolicy; 082 private DeadLetterPolicy deadLetterPolicy; 083 protected static class TransientQueueThreadFactory implements ThreadFactory { 084 /** 085 * @param command - command to run 086 * @return a new thread 087 */ 088 public Thread newThread(Runnable command) { 089 Thread result = new Thread(command); 090 result.setPriority(Thread.NORM_PRIORITY + 1); 091 result.setDaemon(true); 092 return result; 093 } 094 } 095 096 /** 097 * Constructor for TransientQueueBoundedMessageManager 098 * 099 * @param mgr 100 * @param redeliveryPolicy 101 * @param deadLetterPolicy 102 */ 103 public TransientQueueBoundedMessageManager(MemoryBoundedQueueManager mgr, RedeliveryPolicy redeliveryPolicy, 104 DeadLetterPolicy deadLetterPolicy) { 105 this.queueManager = mgr; 106 this.redeliveryPolicy = redeliveryPolicy; 107 this.deadLetterPolicy = deadLetterPolicy; 108 this.containers = new ConcurrentHashMap(); 109 this.destinationMap = new DestinationMap(); 110 this.destinations = new ConcurrentHashMap(); 111 this.subscriptions = new ConcurrentHashMap(); 112 this.filterFactory = new FilterFactoryImpl(); 113 this.started = new SynchronizedBoolean(false); 114 this.doingGarbageCollection = new SynchronizedBoolean(false); 115 this.threadPool = new PooledExecutor(); 116 this.threadPool.setThreadFactory(new TransientQueueThreadFactory()); 117 this.inactiveTimeout = DEFAULT_INACTIVE_TIMEOUT; 118 this.garbageCoolectionCapacityLimit = DEFAULT_GARBAGE_COLLECTION_CAPACITY_LIMIT; 119 } 120 121 /** 122 * @return Returns the garbageCoolectionCapacityLimit. 123 */ 124 public int getGarbageCoolectionCapacityLimit() { 125 return garbageCoolectionCapacityLimit; 126 } 127 128 /** 129 * @param garbageCoolectionCapacityLimit The garbageCoolectionCapacityLimit to set. 130 */ 131 public void setGarbageCoolectionCapacityLimit(int garbageCoolectionCapacityLimit) { 132 this.garbageCoolectionCapacityLimit = garbageCoolectionCapacityLimit; 133 } 134 135 /** 136 * @return Returns the inactiveTimeout. 137 */ 138 public long getInactiveTimeout() { 139 return inactiveTimeout; 140 } 141 142 /** 143 * @param inactiveTimeout The inactiveTimeout to set. 144 */ 145 public void setInactiveTimeout(long inactiveTimeout) { 146 this.inactiveTimeout = inactiveTimeout; 147 } 148 149 /** 150 * start the manager 151 * 152 * @throws JMSException 153 */ 154 public void start() throws JMSException { 155 if (started.commit(false, true)) { 156 for (Iterator i = containers.values().iterator();i.hasNext();) { 157 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next(); 158 container.start(); 159 } 160 try { 161 threadPool.execute(this);//start garbage collection 162 } 163 catch (InterruptedException e) { 164 JMSException jmsEx = new JMSException("Garbage collection interupted on start()"); 165 jmsEx.setLinkedException(e); 166 throw jmsEx; 167 } 168 } 169 } 170 171 /** 172 * stop the manager 173 * 174 * @throws JMSException 175 */ 176 public void stop() throws JMSException { 177 if (started.commit(true, false)) { 178 for (Iterator i = containers.values().iterator();i.hasNext();) { 179 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next(); 180 container.stop(); 181 } 182 threadPool.interruptAll(); 183 threadPool.shutdownNow(); 184 } 185 } 186 187 /** 188 * collect expired messages 189 */ 190 public void run() { 191 while (started.get()) { 192 doGarbageCollection(); 193 try { 194 Thread.sleep(2000); 195 } 196 catch (InterruptedException e) { 197 } 198 } 199 } 200 201 /** 202 * Add a consumer if appropiate 203 * 204 * @param client 205 * @param info 206 * @throws JMSException 207 */ 208 public synchronized void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException { 209 ActiveMQDestination destination = info.getDestination(); 210 if (destination.isQueue()) { 211 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) containers 212 .get(destination); 213 if (container == null) { 214 MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(client.toString()); 215 container = new TransientQueueBoundedMessageContainer(threadPool, queueManager, destination, 216 redeliveryPolicy, deadLetterPolicy); 217 addContainer(container); 218 if (started.get()) { 219 container.start(); 220 } 221 } 222 if (log.isDebugEnabled()) { 223 log.debug("Adding consumer: " + info); 224 } 225 226 TransientQueueSubscription ts = container.addConsumer(createFilter(info), info, client); 227 if (ts != null) { 228 subscriptions.put(info.getConsumerId(), ts); 229 } 230 String name = destination.getPhysicalName(); 231 if (!destinations.containsKey(name)) { 232 destinations.put(name, destination); 233 } 234 } 235 } 236 237 /** 238 * @param client 239 * @param info 240 * @throws JMSException 241 */ 242 public synchronized void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException { 243 ActiveMQDestination destination = info.getDestination(); 244 if (destination.isQueue()) { 245 for (Iterator i = containers.values().iterator();i.hasNext();) { 246 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next(); 247 if (container != null) { 248 container.removeConsumer(info); 249 } 250 } 251 subscriptions.remove(info.getConsumerId()); 252 } 253 } 254 255 /** 256 * Delete a durable subscriber 257 * 258 * @param clientId 259 * @param subscriberName 260 * @throws JMSException if the subscriber doesn't exist or is still active 261 */ 262 public void deleteSubscription(String clientId, String subscriberName) throws JMSException { 263 } 264 265 /** 266 * @param client 267 * @param message 268 * @throws JMSException 269 */ 270 public void sendMessage(final BrokerClient client, final ActiveMQMessage message) throws JMSException { 271 ActiveMQDestination destination = message.getJMSActiveMQDestination(); 272 if ( !destination.isQueue() || !message.isTemporary() ) 273 return; 274 275 // If there is no transaction.. then this executes directly. 276 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){ 277 public void execute() throws Throwable { 278 doSendMessage(client, message); 279 } 280 }); 281 } 282 283 /** 284 * @param client 285 * @param message 286 * @throws JMSException 287 */ 288 private void doSendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException { 289 290 ActiveMQDestination destination = message.getJMSActiveMQDestination(); 291 if (queueManager.getCurrentCapacity() <= garbageCoolectionCapacityLimit) { 292 doGarbageCollection(); 293 } 294 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) containers 295 .get(destination); 296 if (container == null) { 297 MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(client.toString()); 298 container = new TransientQueueBoundedMessageContainer(threadPool, queueManager, destination, 299 redeliveryPolicy, deadLetterPolicy); 300 addContainer(container); 301 if (started.get()) { 302 container.start(); 303 } 304 } 305 Set set = destinationMap.get(message.getJMSActiveMQDestination()); 306 for (Iterator i = set.iterator();i.hasNext();) { 307 container = (TransientQueueBoundedMessageContainer) i.next(); 308 container.enqueue(message); 309 } 310 } 311 312 /** 313 * @param client 314 * @param ack 315 * @throws JMSException 316 */ 317 public void acknowledgeMessage(final BrokerClient client, final MessageAck ack) throws JMSException { 318 if (!ack.getDestination().isQueue() || !ack.isTemporary()) 319 return; 320 321 final TransientQueueSubscription ts = (TransientQueueSubscription) subscriptions.get(ack.getConsumerId()); 322 if (ts == null) 323 return; 324 325 final ActiveMQMessage message = ts.acknowledgeMessage(ack.getMessageID()); 326 if (message == null ) 327 return; 328 329 if( ts.isBrowser() ) { 330 ts.addAckedMessage(message); 331 return; 332 } 333 334 if (!ack.isMessageRead() || ack.isExpired()) { 335 redeliverMessage(ts, ack, message); 336 } else if (TransactionManager.isCurrentTransaction()) { 337 // Hook in a callbacks on first acked message 338 if (!ts.hasAckedMessage()) { 339 TransactionManager.getContexTransaction().addPostRollbackTask(new TransactionTask() { 340 public void execute() throws Throwable { 341 342 List ackList = ts.listAckedMessages(); 343 HashMap redeliverMap = new HashMap(); 344 for (Iterator iter = ackList.iterator(); iter.hasNext();) { 345 346 ActiveMQMessage message = (ActiveMQMessage) iter.next(); 347 message.setJMSRedelivered(true); 348 if (message.incrementRedeliveryCount() > redeliveryPolicy.getMaximumRetryCount()) { 349 if (log.isDebugEnabled()){ 350 log.debug("Message: " + message + " has exceeded its retry count"); 351 } 352 deadLetterPolicy.sendToDeadLetter(message); 353 } 354 else if (ack.isExpired()) { 355 if (log.isDebugEnabled()){ 356 log.debug("Message: " + message + " has expired"); 357 } 358 deadLetterPolicy.sendToDeadLetter(message); 359 } 360 else { 361 Set containers = destinationMap.get(message.getJMSActiveMQDestination()); 362 for (Iterator i = containers.iterator();i.hasNext();) { 363 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next(); 364 LinkedList l = (LinkedList) redeliverMap.get(container); 365 if( l==null ) { 366 l = new LinkedList(); 367 redeliverMap.put(container, l); 368 } 369 l.add(message); 370 } 371 } 372 } 373 374 for (Iterator i = redeliverMap.keySet().iterator();i.hasNext();) { 375 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next(); 376 List l = (List) redeliverMap.get(container); 377 container.redeliver(l); 378 } 379 380 ts.removeAllAckedMessages(); 381 } 382 }); 383 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask() { 384 public void execute() throws Throwable { 385 ts.removeAllAckedMessages(); 386 } 387 }); 388 } 389 // We need to keep track of messages that were acked. If we 390 // rollback. 391 ts.addAckedMessage(message); 392 } 393 } 394 395 /** 396 * @param client 397 * @param ack 398 * @param message 399 * @throws JMSException 400 */ 401 private void redeliverMessage(TransientQueueSubscription ts, MessageAck ack, ActiveMQMessage message) throws JMSException { 402 message.setJMSRedelivered(true); 403 if (message.incrementRedeliveryCount() > redeliveryPolicy.getMaximumRetryCount()) { 404 if (log.isDebugEnabled()){ 405 log.debug("Message: " + message + " has exceeded its retry count"); 406 } 407 deadLetterPolicy.sendToDeadLetter(message); 408 } 409 else if (ack.isExpired()) { 410 if (log.isDebugEnabled()){ 411 log.debug("Message: " + message + " has expired"); 412 } 413 deadLetterPolicy.sendToDeadLetter(message); 414 } 415 else { 416 Set set = destinationMap.get(message.getJMSActiveMQDestination()); 417 for (Iterator i = set.iterator();i.hasNext();) { 418 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next(); 419 container.redeliver(message); 420 break; 421 } 422 } 423 } 424 425 /** 426 * @throws JMSException 427 */ 428 public void poll() throws JMSException { 429 } 430 431 /** 432 * @param physicalName 433 * @return MessageContainer 434 * @throws JMSException 435 */ 436 public MessageContainer getContainer(String physicalName) throws JMSException { 437 Object key = destinations.get(physicalName); 438 if (key != null) { 439 return (MessageContainer) containers.get(key); 440 } 441 return null; 442 } 443 444 /** 445 * @return a map of destinations 446 */ 447 public Map getDestinations() { 448 return Collections.unmodifiableMap(destinations); 449 } 450 451 /** 452 * Returns an unmodifiable map, indexed by String name, of all the {@link javax.jms.Destination} 453 * objects used by non-broker consumers directly connected to this container 454 * 455 * @return 456 */ 457 public Map getLocalDestinations() { 458 Map localDestinations = new HashMap(); 459 for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) { 460 TransientSubscription sub = (TransientSubscription) iter.next(); 461 if (sub.isLocalSubscription()) { 462 final ActiveMQDestination dest = sub.getDestination(); 463 localDestinations.put(dest.getPhysicalName(), dest); 464 } 465 } 466 return Collections.unmodifiableMap(localDestinations); 467 } 468 469 /** 470 * @return the DeadLetterPolicy for this Container Manager 471 */ 472 public DeadLetterPolicy getDeadLetterPolicy() { 473 return deadLetterPolicy; 474 } 475 476 /** 477 * Set the DeadLetterPolicy for this Container Manager 478 * 479 * @param policy 480 */ 481 public void setDeadLetterPolicy(DeadLetterPolicy policy) { 482 this.deadLetterPolicy = policy; 483 } 484 485 /** 486 * Create filter for a Consumer 487 * 488 * @param info 489 * @return the Fitler 490 * @throws javax.jms.JMSException 491 */ 492 protected Filter createFilter(ConsumerInfo info) throws JMSException { 493 Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector()); 494 if (info.isNoLocal()) { 495 filter = new AndFilter(filter, new NoLocalFilter(info.getClientId())); 496 } 497 return filter; 498 } 499 500 private void doGarbageCollection() { 501 if (doingGarbageCollection.commit(false, true)) { 502 if (queueManager.getCurrentCapacity() <= garbageCoolectionCapacityLimit) { 503 for (Iterator i = containers.values().iterator();i.hasNext();) { 504 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next(); 505 container.removeExpiredMessages(); 506 log.warn("memory limit low - forced to remove expired messages: " 507 + container.getDestinationName()); 508 } 509 } 510 //if still below the limit - clear queues with no subscribers 511 //which have been inactive for a while 512 if (queueManager.getCurrentCapacity() <= garbageCoolectionCapacityLimit) { 513 for (Iterator i = containers.values().iterator();i.hasNext();) { 514 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next(); 515 if (!container.isActive() 516 && (container.getIdleTimestamp() < (System.currentTimeMillis() - inactiveTimeout))) { 517 removeContainer(container); 518 log.warn("memory limit low - forced to remove inactive and idle queue: " 519 + container.getDestinationName()); 520 } 521 } 522 } 523 //if still now below limit - clear temporary queues with no subscribers 524 if (queueManager.getCurrentCapacity() <= garbageCoolectionCapacityLimit) { 525 for (Iterator i = containers.values().iterator();i.hasNext();) { 526 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next(); 527 if (!container.isActive() && container.getDestination().isTemporary()) { 528 removeContainer(container); 529 log.warn("memory limit low - forced to remove inactive temporary queue: " 530 + container.getDestinationName()); 531 } 532 } 533 } 534 //if still now below limit - clear inactive queues 535 if (queueManager.getCurrentCapacity() <= garbageCoolectionCapacityLimit) { 536 for (Iterator i = containers.values().iterator();i.hasNext();) { 537 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next(); 538 if (!container.isActive() && !container.isEmpty()) { 539 removeContainer(container); 540 log.warn("memory limit low - forced to remove inactive queue: " 541 + container.getDestinationName()); 542 } 543 } 544 } 545 doingGarbageCollection.set(false); 546 } 547 } 548 549 private synchronized void addContainer(TransientQueueBoundedMessageContainer container) { 550 containers.put(container.getDestination(), container); 551 destinationMap.put(container.getDestination(), container); 552 } 553 554 private synchronized void removeContainer(TransientQueueBoundedMessageContainer container) { 555 try { 556 container.close(); 557 log.info("closed inactive transient queue container: " + container.getDestinationName()); 558 } 559 catch (JMSException e) { 560 log.warn("failure closing container", e); 561 } 562 containers.remove(container.getDestination()); 563 destinationMap.remove(container.getDestination(), container); 564 } 565 566 protected Executor getThreadPool() { 567 return threadPool; 568 } 569 570 public void createMessageContainer(ActiveMQDestination dest) throws JMSException { 571 } 572 573 public void destroyMessageContainer(ActiveMQDestination dest) throws JMSException { 574 if ( !dest.isQueue() ) 575 return; 576 577 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) containers.remove(dest); 578 if( container != null ) { 579 container.stop(); 580 } 581 destinationMap.removeAll(dest); 582 } 583 584 public Map getMessageContainerAdmins() throws JMSException { 585 return Collections.EMPTY_MAP; 586 } 587 }