/*
 * KeepAliveDaemon.java
 */
004    package org.activemq.transport.reliable;
005    
006    import java.util.Iterator;
007    
008    import javax.jms.JMSException;
009    
010    import org.apache.commons.logging.Log;
011    import org.apache.commons.logging.LogFactory;
012    import org.activemq.message.KeepAlive;
013    
014    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
015    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
016    
017    
018    /**
019     * KeepAliveDaemon keeps channels alive by sending KeepAlive packets on a
020     * specified interval. If the packets aren't don't get a receipt within a
021     * specified time, the channel will be forcefully disconnected.
022     */
023    public class KeepAliveDaemon implements Runnable {
024    
025            private static final Log log = LogFactory.getLog(KeepAliveDaemon.class);
026    
027            private static KeepAliveDaemon instance = null;
028    
029            private long checkInterval = 15000L;
030            private long lastCheck = 0;
031            private Object lock = new Object();
032    
033            private SynchronizedBoolean started = new SynchronizedBoolean(false);
034            private SynchronizedBoolean stopped = new SynchronizedBoolean(false);
035            private CopyOnWriteArraySet monitoredChannels = new CopyOnWriteArraySet();
036            private CopyOnWriteArraySet zombieChannelSuspects = new CopyOnWriteArraySet();
037    
038    
039            /**
040             * Constructs a new KeepAliveDaemon which will send KeepAlive packets
041             * throught the wrapped channel.
042             */
043            protected KeepAliveDaemon() {
044    
045            }
046    
047            /**
048             * Gets the current instance. Singletons implemented this way aren't popular
049             * these days, but it might be good here. :)
050             * 
051             * @return the daemon
052             */
053            public static synchronized KeepAliveDaemon getInstance() {
054                    if (instance == null)
055                            instance = new KeepAliveDaemon();
056                    return instance;
057            }
058    
059            public void addMonitoredChannel(ReliableTransportChannel channel) {
060                    if (channel.getKeepAliveTimeout() <= 0)
061                            return;
062                    log.debug("Adding channel " + channel);
063                    // Check that the timeout isn't lower than our check interval as
064                    // this would cause the channel to constantly be disconnected.
065                    // This check should perhaps be done whenever the channel changes it's
066                    // interval, but in practice, this will probably never happen.
067                    if (channel.getKeepAliveTimeout() / 2 < checkInterval) {
068                            setCheckInterval(channel.getKeepAliveTimeout() / 2);
069                            log.info("Adjusting check interval to " + checkInterval + " as channel " + channel.toString()
070                                            + " has lower timeout time than the current check interval.");
071                    }
072                    monitoredChannels.add(channel);
073            }
074    
075            public void removeMonitoredChannel(ReliableTransportChannel channel) {
076                    log.debug("Removing channel " + channel);
077                    monitoredChannels.remove(channel);
078            }
079    
080            /**
081             * Sets the number of milliseconds between keep-alive checks are done.
082             * 
083             * @param interval
084             *            the interval
085             */
086            public void setCheckInterval(long interval) {
087                    this.checkInterval = interval;
088                    if (started.and(!stopped.get())) {
089                            restart();
090                    }
091            }
092    
093            public long getCheckInterval() {
094                    return checkInterval;
095            }
096    
097            public long getLastCheckTime() {
098                    return lastCheck;
099            }
100    
101            public void start() {
102                    if (started.commit(false, true)) {
103                            log.debug("Scheduling keep-alive every " + checkInterval + " millisecond.");
104                            Thread t = new Thread(this);
105                            t.setName("KeepAliveDaemon");
106                            t.setDaemon(true);
107                            t.start();
108                    }
109            }
110    
111            public void stop() {
112                    if (stopped.commit(false, true)) {
113                            synchronized (lock) {
114                                    lock.notifyAll();
115                            }
116                            log.debug("Stopping keep-alive.");
117                    }
118            }
119    
120            public void restart() {
121                    log.debug("Restarting keep-alive.");
122                    stop();
123                    start();
124            }
125    
126            public void run() {
127                    lastCheck = System.currentTimeMillis() - checkInterval;
128                    while (!stopped.get()) {
129                            for (Iterator i = zombieChannelSuspects.iterator(); i.hasNext();) {
130                                    ReliableTransportChannel channel = (ReliableTransportChannel) i.next();
131                                    examineZombieSuspect(channel);
132                            }
133    
134                            for (Iterator i = monitoredChannels.iterator(); i.hasNext();) {
135                                    ReliableTransportChannel channel = (ReliableTransportChannel) i.next();
136                                    if (!zombieChannelSuspects.contains(channel))
137                                            examineChannel(channel);
138                            }
139                            lastCheck = System.currentTimeMillis();
140                            synchronized (lock) {
141                                    try {
142                                            lock.wait(checkInterval);
143                                    } catch (InterruptedException e) {
144    
145                                    }
146                            }
147                    }
148            }
149    
150            private void examineZombieSuspect(ReliableTransportChannel channel) {
151                    if ((channel.getLastReceiptTimestamp() + channel.getKeepAliveTimeout() * 2) < System.currentTimeMillis()) {
152                            // Timed out
153                            log.info("Forcing channel "
154                                            + channel
155                                            + " to disconnect since it "
156                                            + (channel.getLastReceiptTimestamp() == 0 ? "never has responded " : "hasn't responded since "
157                                                            + new java.util.Date(channel.getLastReceiptTimestamp())) + " and has a timeout of "
158                                            + channel.getKeepAliveTimeout());
159                            channel.forceDisconnect();
160                            // Remove it and wait for a reconnect
161                            zombieChannelSuspects.remove(channel);
162                    } else if ((channel.getLastReceiptTimestamp() + channel.getKeepAliveTimeout()) < System.currentTimeMillis()) {
163                            // Still a zombie suspect, but has not timed out
164                            log.debug("Still waiting for response from channel " + channel);
165                    } else {
166                            // It's alive again
167                            log.debug("Channel " + channel + " responded in time.");
168                            zombieChannelSuspects.remove(channel);
169                    }
170            }
171    
172            private void examineChannel(ReliableTransportChannel channel) {
173            // Is the channel stopping?
174            if (channel.isPendingStop()) {
175                removeMonitoredChannel(channel);
176            } else {
177                // Then it should be active 
178                // Keep pinging the channel periodically
179                if ((channel.getLastReceiptTimestamp() + channel.getKeepAliveTimeout()) < System.currentTimeMillis()) {
180                    log.debug("Sending keep-alive on channel " + channel.toString());
181                    KeepAlive packet = new KeepAlive();
182                    packet.setReceiptRequired(true);
183                    boolean wasConnected = channel.isTransportConnected();
184                    try {
185                        channel.asyncSendWithReceipt(packet);
186                        zombieChannelSuspects.add(channel);
187                    } catch (JMSException e) {
188                        // only report an error if the channel was connetected.
189                        if (wasConnected) {
190                            log.error("Error sending keep-alive to channel " + channel.toString()
191                                    + ". Treating as temporary problem.", e);
192                        } 
193                    }
194                }
195            }
196        }
197    }