001    /**
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.service.impl;
019    
020    import java.util.Collections;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.Map;
024    import java.util.Set;
025    
026    import javax.jms.DeliveryMode;
027    import javax.jms.Destination;
028    import javax.jms.JMSException;
029    
030    import org.apache.commons.logging.Log;
031    import org.apache.commons.logging.LogFactory;
032    import org.activemq.broker.BrokerClient;
033    import org.activemq.filter.AndFilter;
034    import org.activemq.filter.DestinationMap;
035    import org.activemq.filter.Filter;
036    import org.activemq.filter.FilterFactory;
037    import org.activemq.filter.FilterFactoryImpl;
038    import org.activemq.filter.NoLocalFilter;
039    import org.activemq.message.ActiveMQDestination;
040    import org.activemq.message.ActiveMQMessage;
041    import org.activemq.message.ActiveMQQueue;
042    import org.activemq.message.ConsumerInfo;
043    import org.activemq.message.MessageAck;
044    import org.activemq.service.DeadLetterPolicy;
045    import org.activemq.service.Dispatcher;
046    import org.activemq.service.MessageContainer;
047    import org.activemq.service.QueueList;
048    import org.activemq.service.QueueListEntry;
049    import org.activemq.service.QueueMessageContainer;
050    import org.activemq.service.QueueMessageContainerManager;
051    import org.activemq.service.RedeliveryPolicy;
052    import org.activemq.service.Subscription;
053    import org.activemq.service.SubscriptionContainer;
054    import org.activemq.service.TransactionManager;
055    import org.activemq.service.TransactionTask;
056    import org.activemq.store.PersistenceAdapter;
057    
058    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
059    
060    /**
061     * A default Broker used for Queue messages
062     *
063     * @version $Revision: 1.1.1.1 $
064     */
065    public class DurableQueueMessageContainerManager extends MessageContainerManagerSupport implements QueueMessageContainerManager {
066        private static final Log log = LogFactory.getLog(DurableQueueMessageContainerManager.class);
067        private static final int MAX_MESSAGES_DISPATCHED_FROM_POLL = 50;
068    
069        private PersistenceAdapter persistenceAdapter;
070        protected SubscriptionContainer subscriptionContainer;
071        protected FilterFactory filterFactory;
072        protected Map activeSubscriptions = new ConcurrentHashMap();
073        protected Map browsers = new ConcurrentHashMap();
074        protected Map messagePartSubscribers = new ConcurrentHashMap();
075        protected DestinationMap destinationMap = new DestinationMap();
076        private Object subscriptionMutex = new Object();
077        
078       
079       
080    
081        public DurableQueueMessageContainerManager(PersistenceAdapter persistenceAdapter, RedeliveryPolicy redeliveryPolicy,DeadLetterPolicy deadLetterPolicy) {
082            this(persistenceAdapter, new SubscriptionContainerImpl(redeliveryPolicy,deadLetterPolicy), new FilterFactoryImpl(), new DispatcherImpl());
083        }
084    
085        public DurableQueueMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
086            super(dispatcher);
087            this.persistenceAdapter = persistenceAdapter;
088            this.subscriptionContainer = subscriptionContainer;
089            this.filterFactory = filterFactory;
090        }
091    
092        /**
093         * Answers true if this ContainerManager is interested in managing the destination.
094         * 
095             * @param destination
096         * @param b
097             * @return
098             */
099            private boolean isManagerFor(ActiveMQDestination destination) {
100                    return destination!=null && destination.isQueue() && !destination.isTemporary();
101            }
102    
103            /**
104         * Answers true if this ContainerManager is interested in handing a operation of 
105         * on the provided destination.  persistentOp is true when the opperation is persistent.
106         * 
107             * @param destination
108             * @param persistentOp
109         * @param b
110             * @return
111             */
112            private boolean isManagerFor(ActiveMQDestination destination, boolean persistentOp) {
113                    // We are going to handle both persistent and non persistent operations for now.
114                    return isManagerFor(destination) && persistentOp;
115            }
116    
117        /**
118         * Returns an unmodifiable map, indexed by String name, of all the {@link javax.jms.Destination}
119         * objects used by non-broker consumers directly connected to this container
120         *
121         * @return
122         */
123        public Map getLocalDestinations() {
124            Map localDestinations = new HashMap();
125            for (Iterator iter = subscriptionContainer.subscriptionIterator(); iter.hasNext();) {
126                Subscription sub = (Subscription) iter.next();
127                if (sub.isLocalSubscription()) {
128                    final ActiveMQDestination dest = sub.getDestination();
129                    localDestinations.put(dest.getPhysicalName(), dest);
130                }
131            }
132            return Collections.unmodifiableMap(localDestinations);
133        }
134    
135            /**
136         * @param client
137         * @param info
138         * @throws javax.jms.JMSException
139         */
140        public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
141            
142            // Are we not intrested in handling that destination?
143            if( !isManagerFor(info.getDestination()) ) {
144                    return;
145            }
146    
147            if (log.isDebugEnabled()) {
148                log.debug("Adding consumer: " + info);
149            }
150                    
151            //ensure a matching container exists for the destination
152            getContainer(info.getDestination().getPhysicalName());
153            
154            Subscription sub = subscriptionContainer.makeSubscription(dispatcher, client,info, createFilter(info));
155            dispatcher.addActiveSubscription(client, sub);
156            updateActiveSubscriptions(sub);
157    
158            // set active last in case we end up dispatching some messages
159            // while recovering
160            sub.setActive(true);
161        }
162    
163            /**
164         * @param client
165         * @param info
166         * @throws javax.jms.JMSException
167         */
168        public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
169            if (log.isDebugEnabled()) {
170                log.debug("Removing consumer: " + info);
171            }
172            if (info.getDestination() != null && info.getDestination().isQueue()) {
173                synchronized (subscriptionMutex) {
174                    Subscription sub = (Subscription) subscriptionContainer.removeSubscription(info.getConsumerId());
175                    if (sub != null) {
176                        sub.setActive(false);
177                        sub.clear();//resets entries in the QueueMessageContainer
178                        dispatcher.removeActiveSubscription(client, sub);
179                        //need to do wildcards for this - but for now use exact matches
180                        for (Iterator iter = messageContainers.values().iterator(); iter.hasNext();) {
181                            QueueMessageContainer container = (QueueMessageContainer) iter.next();
182                            //should change this for wild cards ...
183                            if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
184                                QueueList list = getSubscriptionList(container);
185                                list.remove(sub);
186                                if (list.isEmpty()) {
187                                    activeSubscriptions.remove(sub.getDestination().getPhysicalName());
188                                }
189                                list = getBrowserList(container);
190                                list.remove(sub);
191                                if (list.isEmpty()) {
192                                    browsers.remove(sub.getDestination().getPhysicalName());
193                                }
194                            }
195                        }
196                    }
197                }
198            }
199        }
200    
201        /**
202         * Delete a durable subscriber
203         *
204         * @param clientId
205         * @param subscriberName
206         * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
207         */
208        public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
209        }
210    
211        /**
212         * @param client
213         * @param message
214         * @throws javax.jms.JMSException
215         */
216        public void sendMessage(final BrokerClient client, final ActiveMQMessage message) throws JMSException {
217    
218            ActiveMQDestination dest = (ActiveMQDestination) message.getJMSDestination();
219            // Are we not intrested in handling that destination?
220            if( !isManagerFor(dest, message.getJMSDeliveryMode()==DeliveryMode.PERSISTENT) ) {
221                    return;
222            }
223            
224            if (log.isDebugEnabled()) {
225                log.debug("Dispaching message: " + message);
226            }
227            //ensure a matching container exists for the destination
228            getContainer(((ActiveMQDestination) message.getJMSDestination()).getPhysicalName());
229            Set set = destinationMap.get(message.getJMSActiveMQDestination());
230            for (Iterator i = set.iterator();i.hasNext();) {
231                QueueMessageContainer container = (QueueMessageContainer) i.next();
232                container.addMessage(message);
233                // Once transaction has completed.. dispatch the message.
234                TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){
235                    public void execute() throws Throwable {
236                        dispatcher.wakeup();
237                        updateSendStats(client, message);
238                    }
239                });         
240                
241            }
242        }
243    
244        /**
245         * Acknowledge a message as being read and consumed by the Consumer
246         *
247         * @param client
248         * @param ack
249         * @throws javax.jms.JMSException
250         */
251        public void acknowledgeMessage(final BrokerClient client, final MessageAck ack) throws JMSException {
252            // Are we not intrested in handling that destination?
253            if( !isManagerFor(ack.getDestination(), ack.isPersistent()) ) {
254                    return;
255            }
256            final Subscription sub = subscriptionContainer.getSubscription(ack.getConsumerId());
257            if (sub == null){
258                return;
259            }
260    
261            sub.messageConsumed(ack);
262            if (ack.isMessageRead()) {
263                updateAcknowledgeStats(client, sub);
264            }
265        }
266        
267        /**
268         * Poll for messages
269         *
270         * @throws javax.jms.JMSException
271         */
272        public void poll() throws JMSException {
273            synchronized (subscriptionMutex) {
274                for (Iterator iter = activeSubscriptions.keySet().iterator(); iter.hasNext();) {
275                    QueueMessageContainer container = (QueueMessageContainer) iter.next();
276    
277                    QueueList browserList = (QueueList) browsers.get(container);
278                    doPeek(container, browserList);
279                    QueueList list = (QueueList) activeSubscriptions.get(container);
280                    doPoll(container, list);
281                }
282            }
283        }
284    
285        public MessageContainer getContainer(String destinationName) throws JMSException {
286            MessageContainer container = (MessageContainer) messageContainers.get(destinationName);
287            if (container == null) {
288                synchronized (subscriptionMutex) {
289                    container = super.getContainer(destinationName);
290                }
291            }
292            return container;
293        }
294    
295        // Implementation methods
296        //-------------------------------------------------------------------------
297    
298        protected MessageContainer createContainer(String destinationName) throws JMSException {
299            QueueMessageContainer container = new DurableQueueMessageContainer(persistenceAdapter, persistenceAdapter.createQueueMessageStore(destinationName), destinationName);
300            
301            //Add any interested Subscriptions to the new Container
302            for (Iterator iter = subscriptionContainer.subscriptionIterator(); iter.hasNext();) {
303                Subscription sub = (Subscription) iter.next();
304                if (sub.isBrowser()) {
305                    updateBrowsers(container, sub);
306                }
307                else {
308                    updateActiveSubscriptions(container, sub);
309                }
310            }
311            
312            ActiveMQDestination key = new ActiveMQQueue(destinationName);
313            destinationMap.put(key, container);
314            return container;
315        }
316    
317        protected Destination createDestination(String destinationName) {
318            return new ActiveMQQueue(destinationName);
319        }
320    
321        private void doPeek(QueueMessageContainer container, QueueList browsers) throws JMSException {
322            if (browsers != null && browsers.size() > 0) {
323                for (int i = 0; i < browsers.size(); i++) {
324                    SubscriptionImpl sub = (SubscriptionImpl) browsers.get(i);
325                    int count = 0;
326                    ActiveMQMessage msg = null;
327                    do {
328                        msg = container.peekNext(sub.getLastMessageIdentity());
329                        if (msg != null) {
330                            if (sub.isTarget(msg)) {
331                                System.out.println("browser dispatch: "+msg.getJMSMessageID());
332                                sub.addMessage(container, msg);
333                                dispatcher.wakeup(sub);
334                            }
335                            else {
336                                sub.setLastMessageIdentifier(msg.getJMSMessageIdentity());
337                            }
338                        }
339                    }
340                    while (msg != null && !sub.isAtPrefetchLimit() && count++ < MAX_MESSAGES_DISPATCHED_FROM_POLL);
341                }
342            }
343        }
344    
345        private void doPoll(QueueMessageContainer container, QueueList subList) throws JMSException {
346            int count = 0;
347            ActiveMQMessage msg = null;
348            if (subList != null && subList.size() > 0) {
349                do {
350                    boolean dispatched = false;
351                    msg = container.poll();
352                    if (msg != null) {
353                        QueueListEntry entry = subList.getFirstEntry();
354                        boolean targeted = false;
355                        while (entry != null) {
356                            SubscriptionImpl sub = (SubscriptionImpl) entry.getElement();
357                            if (sub.isTarget(msg)) {
358                                targeted = true;
359                                if (msg.isMessagePart()){
360                                    SubscriptionImpl sameTarget = (SubscriptionImpl)messagePartSubscribers.get(msg.getParentMessageID());
361                                    if (sameTarget == null){
362                                        sameTarget = sub;
363                                        messagePartSubscribers.put(msg.getParentMessageID(), sameTarget);
364                                    }
365                                    sameTarget.addMessage(container,msg);
366                                    if (msg.isLastMessagePart()){
367                                        messagePartSubscribers.remove(msg.getParentMessageID());
368                                    }
369                                    dispatched = true;
370                                    dispatcher.wakeup(sameTarget);
371                                    break;
372                                }else if (!sub.isAtPrefetchLimit()) {
373                                    System.out.println("dispatching: "+msg.getJMSMessageID());
374                                    sub.addMessage(container, msg);
375                                    dispatched = true;
376                                    dispatcher.wakeup(sub);
377                                    subList.rotate(); //round-robin the list
378                                    break;
379                                }
380                            }
381                            entry = subList.getNextEntry(entry);
382                        }
383                        if (!dispatched) {
384                            if (targeted) { //ie. it can be selected by current active consumers - but they are at
385                                // pre-fectch
386                                // limit
387                                container.returnMessage(msg.getJMSMessageIdentity());
388                            }
389                            break;
390                        }
391                    }
392                }
393                while (msg != null && count++ < MAX_MESSAGES_DISPATCHED_FROM_POLL);
394            }
395        }
396    
397        private void updateActiveSubscriptions(Subscription subscription) throws JMSException {
398            //need to do wildcards for this - but for now use exact matches
399            synchronized (subscriptionMutex) {
400                boolean processedSubscriptionContainer = false;
401    
402                String subscriptionPhysicalName = subscription.getDestination().getPhysicalName();
403                for (Iterator iter = messageContainers.entrySet().iterator(); iter.hasNext();) {
404                    Map.Entry entry = (Map.Entry) iter.next();
405                    String destinationName = (String) entry.getKey();
406                    QueueMessageContainer container = (QueueMessageContainer) entry.getValue();
407    
408                    if (destinationName.equals(subscriptionPhysicalName)) {
409                        processedSubscriptionContainer = true;
410                    }
411                    processSubscription(subscription, container);
412                }
413                if (!processedSubscriptionContainer) {
414                    processSubscription(subscription, (QueueMessageContainer) getContainer(subscriptionPhysicalName));
415                }
416            }
417        }
418    
419        protected void processSubscription(Subscription subscription, QueueMessageContainer container) throws JMSException {
420            // TODO should change this for wild cards ...
421            if (subscription.isBrowser()) {
422                updateBrowsers(container, subscription);
423            }
424            else {
425                updateActiveSubscriptions(container, subscription);
426            }
427        }
428    
429        private void updateActiveSubscriptions(QueueMessageContainer container, Subscription sub) throws JMSException {
430            //need to do wildcards for this - but for now use exact matches
431            //should change this for wild cards ...
432            if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
433                container.reset();//reset container - flushing all filter out messages to new consumer
434                QueueList list = getSubscriptionList(container);
435                if (!list.contains(sub)) {
436                    list.add(sub);
437                }
438            }
439        }
440    
441        private QueueList getSubscriptionList(QueueMessageContainer container) {
442            QueueList list = (QueueList) activeSubscriptions.get(container);
443            if (list == null) {
444                list = new DefaultQueueList();
445                activeSubscriptions.put(container, list);
446            }
447            return list;
448        }
449    
450        private void updateBrowsers(QueueMessageContainer container, Subscription sub) throws JMSException {
451            //need to do wildcards for this - but for now use exact matches
452            //should change this for wild cards ...
453            if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
454                container.reset();//reset container - flushing all filter out messages to new consumer
455                QueueList list = getBrowserList(container);
456                if (!list.contains(sub)) {
457                    list.add(sub);
458                }
459            }
460        }
461    
462        private QueueList getBrowserList(QueueMessageContainer container) {
463            QueueList list = (QueueList) browsers.get(container);
464            if (list == null) {
465                list = new DefaultQueueList();
466                browsers.put(container, list);
467            }
468            return list;
469        }
470    
471        /**
472         * Create filter for a Consumer
473         *
474         * @param info
475         * @return the Fitler
476         * @throws javax.jms.JMSException
477         */
478        protected Filter createFilter(ConsumerInfo info) throws JMSException {
479            Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
480            if (info.isNoLocal()) {
481                filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
482            }
483            return filter;
484        }
485    
486        public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
487            // This container only does queues.
488            if(!dest.isQueue()) 
489                return;
490            super.createMessageContainer(dest);
491        }
492        
493        public synchronized void destroyMessageContainer(ActiveMQDestination dest) throws JMSException {
494            // This container only does queues.
495            if(!dest.isQueue()) 
496                return;
497            super.destroyMessageContainer(dest);
498            destinationMap.removeAll(dest);
499        }
500        
501        /**
502         * Add a message to a dead letter queue
503         * @param deadLetterName
504         * @param message
505         * @throws JMSException
506         */
507        public void sendToDeadLetterQueue(String deadLetterName,ActiveMQMessage message) throws JMSException{
508            QueueMessageContainer container = (QueueMessageContainer)getContainer(deadLetterName);
509            container.setDeadLetterQueue(true);
510            container.addMessage(message);
511            dispatcher.wakeup();
512        }
513    }