001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.transport.multicast;
020    import java.io.IOException;
021    import java.io.Serializable;
022    import java.net.URI;
023    import java.net.URISyntaxException;
024    import java.util.Iterator;
025    import java.util.Map;
026    import javax.jms.JMSException;
027    import org.apache.commons.logging.Log;
028    import org.apache.commons.logging.LogFactory;
029    import org.activemq.io.impl.DefaultWireFormat;
030    import org.activemq.message.ActiveMQMessage;
031    import org.activemq.message.ActiveMQObjectMessage;
032    import org.activemq.message.Packet;
033    import org.activemq.message.PacketListener;
034    import org.activemq.transport.DiscoveryAgentSupport;
035    import org.activemq.transport.DiscoveryEvent;
036    import org.activemq.util.IdGenerator;
037    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
038    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
039    import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
040    
041    
042    /**
043     * An agent used to discover other instances of a service
044     * 
045     * @version $Revision: 1.1.1.1 $
046     */
047    public class MulticastDiscoveryAgent extends DiscoveryAgentSupport implements PacketListener, Runnable {
048        private static final Log log = LogFactory.getLog(MulticastDiscoveryAgent.class);
049        /**
050         * default URI used for discovery
051         */
052        public static final String DEFAULT_DISCOVERY_URI = "multicast://224.1.2.3:6066";
053    //    private static final String KEEP_ALIVE_TYPE = "KEEP_ALIVE";
054        private static final String SERVICE_TYPE = "SERVICE";
055        private static final String ALIVE_TYPE = "ALIVE_TYPE";
056        private static final String SERVICE_NAME = "SERVICE_NAME";
057        private static final String CHANNEL_NAME = "CHANNEL_NAME";
058        private static final long DEFAULT_KEEP_ALIVE_TIMEOUT = 5000;
059        private static final int DEFAULT_TIMEOUT_COUNT = 2;
060        private ConcurrentHashMap services;
061        private ConcurrentHashMap keepAliveMap;
062        private SynchronizedBoolean started;
063        private MulticastTransportChannel channel;
064        private Thread runner;
065        private IdGenerator idGen;
066        private String localId;
067        private URI uri;
068        private int timeoutCount;
069        private long keepAliveTimeout;
070        private long timeoutExpiration;
071        //private ActiveMQMessage keepAliveMessage;
072        private ActiveMQObjectMessage serviceMessage;
073        private String serviceName = "";
074        private int timeToLive = 1;
075        private String channelName = "defaultChannel";
076    
077        /**
078         * Construct a discovery agent that uses multicast
079         * 
080         * @param channelName
081         * @throws JMSException
082         */
083        public MulticastDiscoveryAgent(String channelName) throws JMSException {
084            init();
085            this.channelName = channelName;
086            try {
087                setUri(new URI(DEFAULT_DISCOVERY_URI));
088            }
089            catch (URISyntaxException e) {
090                JMSException jmsEx = new JMSException("URI Syntax exception: " + e.getMessage());
091                jmsEx.setLinkedException(e);
092                throw jmsEx;
093            }
094        }
095    
096        public MulticastDiscoveryAgent(URI uri) {
097            init();
098            this.uri = uri;
099        }
100    
101        private void init() {
102            this.started = new SynchronizedBoolean(false);
103            this.services = new ConcurrentHashMap();
104            this.keepAliveMap = new ConcurrentHashMap();
105            this.idGen = new IdGenerator();
106            this.localId = idGen.generateId();
107            this.keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT;
108            this.timeoutCount = DEFAULT_TIMEOUT_COUNT;
109            this.timeoutExpiration = this.keepAliveTimeout * timeoutCount;
110        }
111    
112    
113        /**
114         * @return Returns the keepAliveTimeout.
115         */
116        public long getKeepAliveTimeout() {
117            return keepAliveTimeout;
118        }
119    
120        /**
121         * @param keepAliveTimeout The keepAliveTimeout to set.
122         */
123        public void setKeepAliveTimeout(long keepAliveTimeout) {
124            this.keepAliveTimeout = keepAliveTimeout;
125        }
126    
127        /**
128         * @return Returns the timeoutCount.
129         */
130        public int getTimeoutCount() {
131            return timeoutCount;
132        }
133    
134        /**
135         * @param timeoutCount The timeoutCount to set.
136         */
137        public void setTimeoutCount(int timeoutCount) {
138            this.timeoutCount = timeoutCount;
139        }
140    
141        /**
142         * @return Returns the localId.
143         */
144        public String getLocalId() {
145            return localId;
146        }
147    
148        /**
149         * @param localId The localId to set.
150         */
151        public void setLocalId(String localId) {
152            this.localId = localId;
153        }
154    
155        /**
156         * @return Returns the uri.
157         */
158        public URI getUri() {
159            return uri;
160        }
161    
162        /**
163         * @param uri The uri to set.
164         */
165        public void setUri(URI uri) {
166            this.uri = uri;
167        }
168    
169        /**
170         * @return the timeToLive of multicast packets used for discovery
171         */
172        public int getTimeToLive() {
173            return this.timeToLive;
174        }
175    
176        /**
177         * @param timeToLive The timeToLive for multicast packets used in discovery.
178         * @throws IOException
179         */
180        public void setTimeToLive(int timeToLive) throws IOException {
181            this.timeToLive = timeToLive;
182            if (channel != null) {
183                channel.setTimeToLive(timeToLive);
184            }
185        }
186    
187        /**
188         * @return Returns the channelName.
189         */
190        public String getChannelName() {
191            return channelName;
192        }
193    
194        /**
195         * @param channelName The channelName to set.
196         */
197        public void setChannelName(String channelName) {
198            this.channelName = channelName;
199        }
200    
201        /**
202         * @return a pretty print of this instance
203         */
204        public String toString() {
205            return "MulticastDiscoveryAgent:" + serviceName;
206        }
207    
208        /**
209         * @return the number of active services, including self
210         */
211        public int getServicesCount() {
212            return (this.serviceMessage != null ? 1 : 0) + services.size();
213        }
214    
215        /**
216         * Register a service for other discover nodes
217         * 
218         * @param name
219         * @param details
220         * @throws JMSException
221         */
222        public void registerService(String name, Map details) throws JMSException {
223            if (this.serviceMessage != null){
224                //notify the old service has stopped
225                this.serviceMessage.setBooleanProperty(ALIVE_TYPE, false);
226                sendService();
227            }
228            this.serviceName = name;
229            this.serviceMessage = new ActiveMQObjectMessage();
230            this.serviceMessage.setJMSType(SERVICE_TYPE);
231            this.serviceMessage.setStringProperty(SERVICE_NAME, name);
232            this.serviceMessage.setStringProperty(CHANNEL_NAME, channelName);
233            this.serviceMessage.setBooleanProperty(ALIVE_TYPE, true);
234            this.serviceMessage.setObject((Serializable) details);
235            sendService();
236        }
237    
238        /**
239         * start this discovery agent
240         * 
241         * @throws JMSException
242         */
243        public void start() throws JMSException {
244            if (started.commit(false, true)) {
245                this.timeoutExpiration = this.keepAliveTimeout * timeoutCount;
246                channel = new MulticastTransportChannel(new DefaultWireFormat(), uri);
247    
248                channel.setClientID(localId);
249                channel.setPacketListener(this);
250                try {
251                    channel.setTimeToLive(getTimeToLive());
252                }
253                catch (IOException e) {
254                    JMSException jmsEx = new JMSException("Set time to live failed");
255                    jmsEx.setLinkedException(e);
256                    throw jmsEx;
257                }
258                log.info("Starting multicast discovery agent on URI: " + uri + " with clientID: " + channel.getClientID());
259    
260                channel.start();
261                runner = new Thread(this);
262                runner.setName(toString());
263                runner.setDaemon(true);
264                runner.setPriority(Thread.MAX_PRIORITY);
265                runner.start();
266                sendService();
267                fireServiceStarted(serviceMessage);
268            }
269        }
270    
271        /**
272         * stop this discovery agent
273         * 
274         * @throws JMSException
275         */
276        public void stop() throws JMSException {
277            boolean doStop = false;
278            synchronized (started) {
279                doStop = started.get();
280                if (doStop) {
281                    if (this.serviceMessage != null){
282                        //notify the old service has stopped
283                        this.serviceMessage.setBooleanProperty(ALIVE_TYPE, false);
284                        sendService();
285                    }
286                    channel.stop();
287                    started.set(false);
288                }
289            }
290            if (doStop) {
291                fireServiceStopped(serviceMessage);
292            }
293        }
294    
295        /**
296         * send a keep alive message
297         */
298        public void run() {
299            try {
300                int count = 0;
301                while (started.get()) {
302                    sendService();
303                    log.debug(serviceName + " sent keep alive");
304                    if (++count >= timeoutCount) {
305                        count = 0;
306                        checkNodesAlive();
307                    }
308                    Thread.sleep(getKeepAliveTimeout());
309                }
310            }
311            catch (Throwable e) {
312                log.error(toString() + " run failed", e);
313            }
314        }
315    
316        /**
317         * Consume multicast packets
318         * 
319         * @param packet
320         */
321        public void consume(Packet packet) {
322            try {
323                if (packet != null && packet.isJMSMessage()) {
324                    ActiveMQMessage msg = (ActiveMQMessage) packet;
325                    String receivedChannelName = msg.getStringProperty(CHANNEL_NAME);
326                    if (receivedChannelName != null && receivedChannelName.equals(channelName)) {
327                        String type = msg.getJMSType();
328                        if (type != null) {
329                            if (type.equals(SERVICE_TYPE)) {
330                                processService(msg);
331                            }
332                            else {
333                                log.warn(toString() + " received Message of unknown type: " + type);
334                            }
335                        }
336                        else {
337                            log.error(toString() + " message type is null");
338                        }
339                    }
340                    else {
341                        if (log.isDebugEnabled()) {
342                            log.debug("Discarded discovery message for channel: " + receivedChannelName + " in channel: " + channelName);
343                        }
344                    }
345                }
346                else {
347                    log.warn(toString() + " received unexpected packet: " + packet);
348                }
349            }
350            catch (Throwable e) {
351                log.error(toString() + " couldn't process packet: " + packet, e);
352            }
353        }
354    
355        
356    
357        private void sendService() throws JMSException {
358            if (started.get() && channel != null && !channel.isPendingStop() && serviceMessage != null) {
359                channel.asyncSend(serviceMessage);
360            }
361        }
362    
363       
364        private void processService(ActiveMQMessage message) throws JMSException {
365            if (message != null) {
366                ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) message;
367                String name = objMsg.getStringProperty(SERVICE_NAME);
368    
369                if (log.isDebugEnabled()) {
370                    log.debug("Service message received for: " + name);
371                }
372                addService(name);
373                ActiveMQObjectMessage oldMsg = (ActiveMQObjectMessage) services.get(name);
374                services.put(name, objMsg);
375                if (oldMsg == null) {
376                    fireServiceStarted(objMsg);
377                    //send out that we are here!
378                    sendService();
379                }
380                if (message.getBooleanProperty(ALIVE_TYPE)) {
381                    addService(name);
382                }
383                else {
384                    removeService(name);
385                }
386            }
387        }
388    
389        private void fireServiceStarted(ActiveMQObjectMessage message) throws JMSException {
390            if (message != null) {
391                String name = message.getStringProperty(SERVICE_NAME);
392                Map map = (Map) message.getObject();
393                DiscoveryEvent event = new DiscoveryEvent(this, name, map);
394                fireAddService(event);
395            }
396        }
397    
398        private void fireServiceStopped(ActiveMQObjectMessage message) throws JMSException {
399            if (message != null) {
400                String name = message.getStringProperty(SERVICE_NAME);
401                Map map = (Map) message.getObject();
402                DiscoveryEvent event = new DiscoveryEvent(this, name, map);
403                fireRemoveService(event);
404            }
405        }
406    
407        private void addService(String name) {
408            long timestamp = System.currentTimeMillis();
409            SynchronizedLong activeTime = (SynchronizedLong) keepAliveMap.get(name);
410            if (activeTime == null) {
411                activeTime = new SynchronizedLong(0);
412                keepAliveMap.put(name, activeTime);
413            }
414            activeTime.set(timestamp);
415        }
416    
417        private void removeService(String name) throws JMSException {
418            keepAliveMap.remove(name);
419            ActiveMQObjectMessage message = (ActiveMQObjectMessage) services.remove(name);
420            if (message != null) {
421                fireServiceStopped(message);
422            }
423        }
424    
425        private void checkNodesAlive() throws JMSException {
426            long timestamp = System.currentTimeMillis();
427            long timeout = timestamp - timeoutExpiration;
428            for (Iterator i = keepAliveMap.entrySet().iterator();i.hasNext();) {
429                Map.Entry entry = (Map.Entry) i.next();
430                SynchronizedLong activeTime = (SynchronizedLong) entry.getValue();
431                if (activeTime.get() < timeout) {
432                    String name = entry.getKey().toString();
433                    removeService(name);
434                    log.warn(serviceName + " Expiring node: " + name);
435                }
436            }
437        }
438    }