001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.advisories;
020    import java.util.Iterator;
021    import java.util.List;
022    import java.util.Map;
023    import java.util.Set;
024    import javax.jms.Connection;
025    import javax.jms.Destination;
026    import javax.jms.JMSException;
027    import javax.jms.Message;
028    import javax.jms.MessageConsumer;
029    import javax.jms.MessageListener;
030    import javax.jms.ObjectMessage;
031    import javax.jms.Session;
032    import org.apache.commons.logging.Log;
033    import org.apache.commons.logging.LogFactory;
034    import org.activemq.message.ActiveMQDestination;
035    import org.activemq.message.ConsumerInfo;
036    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
037    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
038    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
039    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
040    
041    /**
042     * A helper class for listening for MessageConsumer advisories
043     * 
044     * @version $Revision: 1.1.1.1 $
045     */
046    public class ConsumerAdvisor implements MessageListener {
047        private static final Log log = LogFactory.getLog(ConsumerAdvisor.class);
048        private Connection connection;
049        private ActiveMQDestination destination;
050        private Session session;
051        private List listeners = new CopyOnWriteArrayList();
052        private SynchronizedBoolean started = new SynchronizedBoolean(false);
053        private Map activeSubscribers = new ConcurrentHashMap();
054    
055        /**
056         * Construct a ConsumerAdvisor
057         * 
058         * @param connection
059         * @param destination the destination to listen for Consumer events
060         * @throws JMSException
061         */
062        public ConsumerAdvisor(Connection connection, Destination destination) throws JMSException {
063            this.connection = connection;
064            this.destination = ActiveMQDestination.transformDestination(destination);
065        }
066    
067        /**
068         * start listening for advisories
069         * 
070         * @throws JMSException
071         */
072        public void start() throws JMSException {
073            if (started.commit(false, true)) {
074                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
075                MessageConsumer consumer = session.createConsumer(destination.getTopicForConsumerAdvisory());
076                consumer.setMessageListener(this);
077            }
078        }
079    
080        /**
081         * stop listening for advisories
082         * 
083         * @throws JMSException
084         */
085        public void stop() throws JMSException {
086            if (started.commit(true, false)) {
087                if (session != null) {
088                    session.close();
089                }
090            }
091        }
092    
093        /**
094         * Add a listener
095         * 
096         * @param l
097         */
098        public void addListener(ConsumerAdvisoryEventListener l) {
099            listeners.add(l);
100        }
101    
102        /**
103         * Remove a listener
104         * 
105         * @param l
106         */
107        public void removeListener(ConsumerAdvisoryEventListener l) {
108            listeners.remove(l);
109        }
110    
111        /**
112         * returns true if there is an active subscriber for the destination
113         * 
114         * @param destination
115         * @return true if a subscriber for the destination
116         */
117        public boolean isActive(Destination destination) {
118            return activeSubscribers.containsKey(destination);
119        }
120    
121        /**
122         * return a set of active ConsumerInfo's for a particular destination
123         * @param destination
124         * @return the set of currently active ConsumerInfo objects
125         */
126        public Set activeConsumers(Destination destination) {
127            Set set = (Set) activeSubscribers.get(destination);
128            return set != null ? set : new CopyOnWriteArraySet();
129        }
130    
131        /**
132         * OnMessage() implementation
133         * 
134         * @param msg
135         */
136        public void onMessage(Message msg) {
137            if (msg instanceof ObjectMessage) {
138                try {
139                    ConsumerInfo info = (ConsumerInfo) ((ObjectMessage) msg).getObject();
140                    updateActiveConsumers(info);
141                    ConsumerAdvisoryEvent event = new ConsumerAdvisoryEvent(info);
142                    fireEvent(event);
143                }
144                catch (JMSException e) {
145                    log.error("Failed to process message: " + msg);
146                }
147            }
148        }
149    
150        private void fireEvent(ConsumerAdvisoryEvent event) {
151            for (Iterator i = listeners.iterator();i.hasNext();) {
152                ConsumerAdvisoryEventListener l = (ConsumerAdvisoryEventListener) i.next();
153                l.onEvent(event);
154            }
155        }
156    
157        private void updateActiveConsumers(ConsumerInfo info) {
158            Set set = (Set) activeSubscribers.get(info.getDestination());
159            if (info.isStarted()) {
160                if (set == null) {
161                    set = new CopyOnWriteArraySet();
162                    activeSubscribers.put(info.getDestination(), set);
163                }
164                set.add(info);
165            }
166            else {
167                if (set != null) {
168                    set.remove(info);
169                    if (set.isEmpty()) {
170                        activeSubscribers.remove(info.getDestination());
171                    }
172                }
173            }
174        }
175    }