001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.service.impl;
019    
020    import org.apache.commons.logging.Log;
021    import org.apache.commons.logging.LogFactory;
022    import org.activemq.broker.BrokerClient;
023    import org.activemq.filter.FilterFactory;
024    import org.activemq.filter.FilterFactoryImpl;
025    import org.activemq.message.ActiveMQDestination;
026    import org.activemq.message.ActiveMQMessage;
027    import org.activemq.message.ConsumerInfo;
028    import org.activemq.service.Dispatcher;
029    import org.activemq.service.MessageContainer;
030    import org.activemq.service.Subscription;
031    import org.activemq.service.SubscriptionContainer;
032    import org.activemq.service.RedeliveryPolicy;
033    import org.activemq.service.DeadLetterPolicy;
034    import org.activemq.service.TransactionManager;
035    import org.activemq.service.TransactionTask;
036    import org.activemq.store.PersistenceAdapter;
037    
038    import javax.jms.DeliveryMode;
039    import javax.jms.JMSException;
040    import java.util.Iterator;
041    import java.util.Set;
042    
043    /**
044     * A default implementation of a Broker of Topic messages for transient consumers
045     *
046     * @version $Revision: 1.1.1.1 $
047     */
048    public class TransientTopicMessageContainerManager extends DurableTopicMessageContainerManager {
049        private static final Log log = LogFactory.getLog(TransientTopicMessageContainerManager.class);
050    
051        public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter) {
052            this(persistenceAdapter, new SubscriptionContainerImpl(new RedeliveryPolicy(), new DeadLetterPolicy()), new FilterFactoryImpl(), new DispatcherImpl());
053        }
054    
055        public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
056            super(persistenceAdapter, subscriptionContainer, filterFactory, dispatcher);
057        }
058    
059        /**
060         * @param client
061         * @param info
062         * @throws javax.jms.JMSException
063         */
064        public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
065            if (info.getDestination().isTopic()) {
066                doAddMessageConsumer(client, info);
067            }
068        }
069    
070    
071        /**
072         * @param client
073         * @param info
074         * @throws javax.jms.JMSException
075         */
076        public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
077            Subscription sub = (Subscription) activeSubscriptions.remove(info.getConsumerId());
078            if (sub != null) {
079                sub.setActive(false);
080                dispatcher.removeActiveSubscription(client, sub);
081                subscriptionContainer.removeSubscription(info.getConsumerId());
082                sub.clear();
083            }
084        }
085    
086    
087        /**
088         * @param client
089         * @param message
090         * @throws javax.jms.JMSException
091         */
092        public void sendMessage(final BrokerClient client, final ActiveMQMessage message) throws JMSException {
093            final ActiveMQDestination destination = message.getJMSActiveMQDestination();
094            if (destination == null || !destination.isTopic()) {
095                return;
096            }
097            
098            TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){
099                public void execute() throws Throwable {
100                    doSendMessage(client, message, destination);
101                }
102            });     
103            
104        }
105    
106        /**
107         * @param client
108         * @param message
109         * @param destination
110         * @throws JMSException
111         */
112        private void doSendMessage(BrokerClient client, ActiveMQMessage message, ActiveMQDestination destination) throws JMSException {
113            MessageContainer container = null;
114            if (log.isDebugEnabled()) {
115                log.debug("Dispaching to " + subscriptionContainer + " subscriptions with message: " + message);
116            }
117            Set subscriptions = subscriptionContainer.getSubscriptions(destination);
118            for (Iterator i = subscriptions.iterator(); i.hasNext();) {
119                Subscription sub = (Subscription) i.next();
120                if (sub.isTarget(message) && (!sub.isDurableTopic() || message.getJMSDeliveryMode() == DeliveryMode.NON_PERSISTENT)) {
121                    if (container == null) {
122                        container = getContainer(message.getJMSDestination().toString());
123                        container.addMessage(message);
124                    }
125                    sub.addMessage(container, message);
126                }
127            }
128            updateSendStats(client, message);
129        }
130    
131        /**
132         * Delete a durable subscriber
133         *
134         * @param clientId
135         * @param subscriberName
136         * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
137         */
138        public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
139        }
140    }