001 /* 002 * KeepAliveThread.java 003 */ 004 package org.activemq.transport.reliable; 005 006 import java.util.Iterator; 007 008 import javax.jms.JMSException; 009 010 import org.apache.commons.logging.Log; 011 import org.apache.commons.logging.LogFactory; 012 import org.activemq.message.KeepAlive; 013 014 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet; 015 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 016 017 018 /** 019 * KeepAliveDaemon keeps channels alive by sending KeepAlive packets on a 020 * specified interval. If the packets aren't don't get a receipt within a 021 * specified time, the channel will be forcefully disconnected. 022 */ 023 public class KeepAliveDaemon implements Runnable { 024 025 private static final Log log = LogFactory.getLog(KeepAliveDaemon.class); 026 027 private static KeepAliveDaemon instance = null; 028 029 private long checkInterval = 15000L; 030 private long lastCheck = 0; 031 private Object lock = new Object(); 032 033 private SynchronizedBoolean started = new SynchronizedBoolean(false); 034 private SynchronizedBoolean stopped = new SynchronizedBoolean(false); 035 private CopyOnWriteArraySet monitoredChannels = new CopyOnWriteArraySet(); 036 private CopyOnWriteArraySet zombieChannelSuspects = new CopyOnWriteArraySet(); 037 038 039 /** 040 * Constructs a new KeepAliveDaemon which will send KeepAlive packets 041 * throught the wrapped channel. 042 */ 043 protected KeepAliveDaemon() { 044 045 } 046 047 /** 048 * Gets the current instance. Singletons implemented this way aren't popular 049 * these days, but it might be good here. :) 050 * 051 * @return the daemon 052 */ 053 public static synchronized KeepAliveDaemon getInstance() { 054 if (instance == null) 055 instance = new KeepAliveDaemon(); 056 return instance; 057 } 058 059 public void addMonitoredChannel(ReliableTransportChannel channel) { 060 if (channel.getKeepAliveTimeout() <= 0) 061 return; 062 log.debug("Adding channel " + channel); 063 // Check that the timeout isn't lower than our check interval as 064 // this would cause the channel to constantly be disconnected. 065 // This check should perhaps be done whenever the channel changes it's 066 // interval, but in practice, this will probably never happen. 067 if (channel.getKeepAliveTimeout() / 2 < checkInterval) { 068 setCheckInterval(channel.getKeepAliveTimeout() / 2); 069 log.info("Adjusting check interval to " + checkInterval + " as channel " + channel.toString() 070 + " has lower timeout time than the current check interval."); 071 } 072 monitoredChannels.add(channel); 073 } 074 075 public void removeMonitoredChannel(ReliableTransportChannel channel) { 076 log.debug("Removing channel " + channel); 077 monitoredChannels.remove(channel); 078 } 079 080 /** 081 * Sets the number of milliseconds between keep-alive checks are done. 082 * 083 * @param interval 084 * the interval 085 */ 086 public void setCheckInterval(long interval) { 087 this.checkInterval = interval; 088 if (started.and(!stopped.get())) { 089 restart(); 090 } 091 } 092 093 public long getCheckInterval() { 094 return checkInterval; 095 } 096 097 public long getLastCheckTime() { 098 return lastCheck; 099 } 100 101 public void start() { 102 if (started.commit(false, true)) { 103 log.debug("Scheduling keep-alive every " + checkInterval + " millisecond."); 104 Thread t = new Thread(this); 105 t.setName("KeepAliveDaemon"); 106 t.setDaemon(true); 107 t.start(); 108 } 109 } 110 111 public void stop() { 112 if (stopped.commit(false, true)) { 113 synchronized (lock) { 114 lock.notifyAll(); 115 } 116 log.debug("Stopping keep-alive."); 117 } 118 } 119 120 public void restart() { 121 log.debug("Restarting keep-alive."); 122 stop(); 123 start(); 124 } 125 126 public void run() { 127 lastCheck = System.currentTimeMillis() - checkInterval; 128 while (!stopped.get()) { 129 for (Iterator i = zombieChannelSuspects.iterator(); i.hasNext();) { 130 ReliableTransportChannel channel = (ReliableTransportChannel) i.next(); 131 examineZombieSuspect(channel); 132 } 133 134 for (Iterator i = monitoredChannels.iterator(); i.hasNext();) { 135 ReliableTransportChannel channel = (ReliableTransportChannel) i.next(); 136 if (!zombieChannelSuspects.contains(channel)) 137 examineChannel(channel); 138 } 139 lastCheck = System.currentTimeMillis(); 140 synchronized (lock) { 141 try { 142 lock.wait(checkInterval); 143 } catch (InterruptedException e) { 144 145 } 146 } 147 } 148 } 149 150 private void examineZombieSuspect(ReliableTransportChannel channel) { 151 if ((channel.getLastReceiptTimestamp() + channel.getKeepAliveTimeout() * 2) < System.currentTimeMillis()) { 152 // Timed out 153 log.info("Forcing channel " 154 + channel 155 + " to disconnect since it " 156 + (channel.getLastReceiptTimestamp() == 0 ? "never has responded " : "hasn't responded since " 157 + new java.util.Date(channel.getLastReceiptTimestamp())) + " and has a timeout of " 158 + channel.getKeepAliveTimeout()); 159 channel.forceDisconnect(); 160 // Remove it and wait for a reconnect 161 zombieChannelSuspects.remove(channel); 162 } else if ((channel.getLastReceiptTimestamp() + channel.getKeepAliveTimeout()) < System.currentTimeMillis()) { 163 // Still a zombie suspect, but has not timed out 164 log.debug("Still waiting for response from channel " + channel); 165 } else { 166 // It's alive again 167 log.debug("Channel " + channel + " responded in time."); 168 zombieChannelSuspects.remove(channel); 169 } 170 } 171 172 private void examineChannel(ReliableTransportChannel channel) { 173 // Is the channel stopping? 174 if (channel.isPendingStop()) { 175 removeMonitoredChannel(channel); 176 } else { 177 // Then it should be active 178 // Keep pinging the channel periodically 179 if ((channel.getLastReceiptTimestamp() + channel.getKeepAliveTimeout()) < System.currentTimeMillis()) { 180 log.debug("Sending keep-alive on channel " + channel.toString()); 181 KeepAlive packet = new KeepAlive(); 182 packet.setReceiptRequired(true); 183 boolean wasConnected = channel.isTransportConnected(); 184 try { 185 channel.asyncSendWithReceipt(packet); 186 zombieChannelSuspects.add(channel); 187 } catch (JMSException e) { 188 // only report an error if the channel was connetected. 189 if (wasConnected) { 190 log.error("Error sending keep-alive to channel " + channel.toString() 191 + ". Treating as temporary problem.", e); 192 } 193 } 194 } 195 } 196 } 197 }