001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.service.impl; 019 020 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 021 import org.apache.commons.logging.Log; 022 import org.apache.commons.logging.LogFactory; 023 import org.activemq.broker.BrokerClient; 024 import org.activemq.message.ActiveMQMessage; 025 import org.activemq.service.MessageContainerManager; 026 import org.activemq.service.Service; 027 import org.activemq.service.Subscription; 028 029 import javax.jms.JMSException; 030 import java.util.Map; 031 import java.util.Iterator; 032 033 /** 034 * A Dispatcher that polls for updates for active Message Consumers 035 * 036 * @version $Revision: 1.1.1.1 $ 037 */ 038 public class DispatchWorker implements Runnable, Service { 039 private static final Log log = LogFactory.getLog(DispatchWorker.class); 040 private static final int POLL_TIMEOUT = 250; 041 042 private Map subscriptions = new ConcurrentHashMap(1000, 0.75f); 043 private Object lock = new Object(); 044 private boolean active = true; 045 private boolean started = false; 046 private MessageContainerManager containerManager; 047 048 /** 049 * Register the MessageContainerManager for the Dispatcher 050 * 051 * @param mcm 052 */ 053 public void register(MessageContainerManager mcm) { 054 this.containerManager = mcm; 055 } 056 057 /** 058 * Called to indicate that there is work to do on a Subscription this will wake up a Dispatch Worker if it is 059 * waiting for messages to dispatch 060 */ 061 public void wakeup() { 062 synchronized (lock) { 063 active = true; 064 lock.notifyAll(); 065 } 066 } 067 068 /** 069 * Add an active subscription 070 * 071 * @param client 072 * @param sub 073 */ 074 public void addActiveSubscription(BrokerClient client, Subscription sub) { 075 if (log.isDebugEnabled()) { 076 log.info("Adding subscription: " + sub + " to client: " + client); 077 } 078 subscriptions.put(sub, client); 079 } 080 081 /** 082 * remove an active subscription 083 * 084 * @param client 085 * @param sub 086 */ 087 public void removeActiveSubscription(BrokerClient client, Subscription sub) { 088 if (log.isDebugEnabled()) { 089 log.info("Removing subscription: " + sub + " from client: " + client); 090 } 091 subscriptions.remove(sub); 092 } 093 094 /** 095 * dispatch messages to active Consumers 096 * 097 * @see java.lang.Runnable#run() 098 */ 099 public void run() { 100 while (started) { 101 doPoll(); 102 boolean dispatched = false; 103 try { 104 // our collection will not throw concurrent modification exception 105 for (Iterator iter = subscriptions.keySet().iterator(); iter.hasNext();) { 106 Subscription sub = (Subscription) iter.next(); 107 if (sub != null && sub.isReadyToDispatch()) { 108 dispatched = dispatchMessages(sub, dispatched); 109 } 110 } 111 } 112 catch (JMSException jmsEx) { 113 log.error("Could not dispatch to Subscription: " + jmsEx, jmsEx); 114 } 115 if (!dispatched) { 116 synchronized (lock) { 117 active = false; 118 if (!active && started) { 119 try { 120 lock.wait(POLL_TIMEOUT); 121 } 122 catch (InterruptedException e) { 123 } 124 } 125 } 126 } 127 } 128 } 129 130 131 /** 132 * start the DispatchWorker 133 * 134 * @see org.activemq.service.Service#start() 135 */ 136 public void start() { 137 started = true; 138 } 139 140 /** 141 * stop the DispatchWorker 142 * 143 * @see org.activemq.service.Service#stop() 144 */ 145 public void stop() { 146 started = false; 147 } 148 149 150 // Implementation methods 151 //------------------------------------------------------------------------- 152 153 protected boolean dispatchMessages(Subscription subscription, boolean dispatched) throws JMSException { 154 ActiveMQMessage[] msgs = subscription.getMessagesToDispatch(); 155 if (msgs != null && msgs.length > 0) { 156 BrokerClient client = (BrokerClient) subscriptions.get(subscription); 157 if (client == null) { 158 log.warn("Null client for subscription: " + subscription); 159 } 160 else { 161 for (int i = 0; i < msgs.length; i++) { 162 ActiveMQMessage msg = msgs[i].shallowCopy(); 163 164 if (log.isDebugEnabled()) { 165 log.debug("Dispatching message: " + msg); 166 } 167 int[] consumerNos = new int[1]; 168 consumerNos[0] = subscription.getConsumerNumber(); 169 msg.setConsumerNos(consumerNos); 170 client.dispatch(msg); 171 dispatched = true; 172 } 173 } 174 } 175 return dispatched; 176 } 177 178 protected void doPoll() { 179 if (containerManager != null && started) { 180 try { 181 containerManager.poll(); 182 } 183 catch (JMSException e) { 184 log.error("Error polling from the ContainerManager: ", e); 185 } 186 } 187 } 188 }