/**
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/

package org.activemq.advisories;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQTopic;
import org.activemq.message.ConnectionInfo;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;

/**
 * A helper class for listening for connection advisories.
 * <p>
 * Once started, the advisor consumes messages from the connection advisory
 * topic, keeps a map of the connections that are currently open (keyed by
 * client id) and forwards every advisory to the registered
 * {@link ConnectionAdvisoryEventListener}s.
 * <p>
 * All access to the map of active connections is guarded by an internal
 * lock, so the query methods may be called from any thread while advisories
 * are being delivered.
 *
 * @version $Revision: 1.1.1.1 $
 */
public class ConnectionAdvisor implements MessageListener {
    private static final Log log = LogFactory.getLog(ConnectionAdvisor.class);
    private final Connection connection;
    private Session session;
    private final List listeners = new CopyOnWriteArrayList();
    /** clientId -> info of every connection currently known to be open; guarded by {@link #lock} */
    private final Map activeConnections = new HashMap();
    private final SynchronizedBoolean started = new SynchronizedBoolean(false);
    /** guards {@link #activeConnections} and is the monitor used by {@link #waitForActiveConnections(int, long)} */
    private final Object lock = new Object();

    /**
     * Construct a ConnectionAdvisor
     *
     * @param connection the connection used to create the advisory session when the advisor is started
     * @throws JMSException declared for backward compatibility; not thrown by this constructor
     */
    public ConnectionAdvisor(Connection connection) throws JMSException {
        this.connection = connection;
    }

    /**
     * Start listening for advisories. Calling this method on an advisor that
     * is already started has no effect.
     * <p>
     * If the session or the consumer cannot be created the advisor is rolled
     * back to the stopped state, so that a later call can try again.
     *
     * @throws JMSException if the session or the advisory consumer cannot be created
     */
    public void start() throws JMSException {
        if (started.commit(false, true)) {
            boolean success = false;
            try {
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                String advisoryName = ActiveMQDestination.CONNECTION_ADVISORY_PREFIX;
                Destination advisoryDestination = new ActiveMQTopic(advisoryName);
                MessageConsumer consumer = session.createConsumer(advisoryDestination);
                consumer.setMessageListener(this);
                success = true;
            }
            finally {
                if (!success) {
                    // do not leave a half-initialised advisor marked as started
                    Session failed = session;
                    session = null;
                    if (failed != null) {
                        try {
                            failed.close();
                        }
                        catch (JMSException e) {
                            log.debug("Failed to close session after unsuccessful start", e);
                        }
                    }
                    started.set(false);
                }
            }
        }
    }

    /**
     * Stop listening for advisories. Calling this method on an advisor that
     * is not started has no effect. Threads blocked in
     * {@link #waitForActiveConnections(int, long)} are woken up even if
     * closing the session fails.
     *
     * @throws JMSException if the session cannot be closed
     */
    public void stop() throws JMSException {
        if (started.commit(true, false)) {
            try {
                Session closing = session;
                session = null;
                if (closing != null) {
                    closing.close();
                }
            }
            finally {
                // waiters re-check the started flag and return
                synchronized (lock) {
                    lock.notifyAll();
                }
            }
        }
    }

    /**
     * Add a listener
     *
     * @param l the listener to be notified of every connection advisory
     */
    public void addListener(ConnectionAdvisoryEventListener l) {
        listeners.add(l);
    }

    /**
     * Remove a listener
     *
     * @param l the listener to remove
     */
    public void removeListener(ConnectionAdvisoryEventListener l) {
        listeners.remove(l);
    }

    /**
     * returns true if the connection is active
     *
     * @param clientId for the connection
     * @return true if the connection is active
     */
    public boolean isActive(String clientId) {
        synchronized (lock) {
            return activeConnections.containsKey(clientId);
        }
    }

    /**
     * Retrieve all current Connections
     *
     * @return a new Set holding the info object of every active connection;
     *         changes to the returned set do not affect the advisor
     */
    public Set getConnections() {
        Set set = new HashSet();
        synchronized (lock) {
            set.addAll(activeConnections.values());
        }
        return set;
    }

    /**
     * Waits until the number of active connections is equivalent to the
     * number supplied, or the timeout is exceeded. Returns early if the
     * advisor is stopped or the calling thread is interrupted; in the latter
     * case the thread's interrupt status is restored.
     *
     * @param number the number of active connections to wait for
     * @param timeout the maximum time to wait in milliseconds; zero or a negative value means do not wait
     * @return the number of activeConnections last observed, or 0 if the advisor is not started
     */
    public int waitForActiveConnections(int number, long timeout) {
        int result = 0;
        // if timeout is zero or negative assume nowait
        long waitTime = timeout;
        long start = (timeout <= 0) ? 0 : System.currentTimeMillis();
        synchronized (lock) {
            while (started.get()) {
                result = numberOfActiveConnections();
                if (result == number || waitTime <= 0) {
                    break;
                }
                try {
                    lock.wait(waitTime);
                }
                catch (InterruptedException e) {
                    log.debug("Interrupted", e);
                    // preserve the interrupt for the caller and give up waiting
                    Thread.currentThread().interrupt();
                    result = numberOfActiveConnections();
                    break;
                }
                // wait() may return early, so recompute what is left of the timeout
                waitTime = timeout - (System.currentTimeMillis() - start);
            }
        }
        return result;
    }

    /**
     * return the current number of active connections
     *
     * @return the number of connections currently recorded as active
     */
    public int numberOfActiveConnections() {
        synchronized (lock) {
            return activeConnections.size();
        }
    }

    /**
     * OnMessage() implementation. Messages that are not ObjectMessages are
     * ignored. For an ObjectMessage the carried ConnectionInfo is recorded as
     * active, or removed if it is marked as closed; waiting threads are then
     * woken up and the event is passed to the listeners.
     *
     * @param msg the advisory message
     */
    public void onMessage(Message msg) {
        if (msg instanceof ObjectMessage) {
            try {
                ConnectionInfo info = (ConnectionInfo) ((ObjectMessage) msg).getObject();

                ConnectionAdvisoryEvent event = new ConnectionAdvisoryEvent(info);
                synchronized (lock) {
                    if (!event.getInfo().isClosed()) {
                        activeConnections.put(event.getInfo().getClientId(), event.getInfo());
                    }
                    else {
                        activeConnections.remove(event.getInfo().getClientId());
                    }
                    // several threads may be waiting for different counts, so wake them all
                    lock.notifyAll();
                }
                // listeners are called outside the lock
                fireEvent(event);
            }
            catch (Throwable e) {
                log.error("Failed to process message: " + msg, e);
            }
        }
    }

    /**
     * Passes the event to every registered listener, in registration order.
     *
     * @param event the event to dispatch
     */
    private void fireEvent(ConnectionAdvisoryEvent event) {
        for (Iterator i = listeners.iterator(); i.hasNext();) {
            ConnectionAdvisoryEventListener l = (ConnectionAdvisoryEventListener) i.next();
            l.onEvent(event);
        }
    }
}