001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.advisories; 020 import java.util.Iterator; 021 import java.util.List; 022 import java.util.Set; 023 import javax.jms.Connection; 024 import javax.jms.Destination; 025 import javax.jms.JMSException; 026 import javax.jms.Message; 027 import javax.jms.MessageConsumer; 028 import javax.jms.MessageListener; 029 import javax.jms.ObjectMessage; 030 import javax.jms.Session; 031 import org.apache.commons.logging.Log; 032 import org.apache.commons.logging.LogFactory; 033 import org.activemq.ActiveMQConnection; 034 import org.activemq.ActiveMQSession; 035 import org.activemq.message.ActiveMQDestination; 036 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList; 037 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet; 038 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 039 040 /** 041 * A helper class for listening for TempDestination advisories 042 * 043 * @version $Revision: 1.1.1.1 $ 044 */ 045 public class TempDestinationAdvisor implements MessageListener { 046 private static final Log log = LogFactory.getLog(TempDestinationAdvisor.class); 047 private Connection connection; 048 private ActiveMQDestination destination; 049 private Session session; 050 private List listeners = new CopyOnWriteArrayList(); 051 private Set activeDestinations = new CopyOnWriteArraySet(); 052 private SynchronizedBoolean started = new SynchronizedBoolean(false); 053 private long startedAt; 054 055 /** 056 * Construct a TempDestinationAdvisor 057 * 058 * @param connection 059 * @param destination the destination to listen for TempDestination events 060 * @throws JMSException 061 */ 062 public TempDestinationAdvisor(Connection connection, Destination destination) throws JMSException { 063 this.connection = connection; 064 this.destination = ActiveMQDestination.transformDestination(destination); 065 } 066 067 /** 068 * start listening for advisories 069 * 070 * @throws JMSException 071 */ 072 public void start() throws JMSException { 073 if (started.commit(false, true)) { 074 if (connection instanceof ActiveMQConnection) { 075 session = ((ActiveMQConnection) connection).createSession(false, Session.AUTO_ACKNOWLEDGE, true); 076 ((ActiveMQSession) session).setInternalSession(true); 077 } 078 else { 079 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 080 } 081 MessageConsumer consumer = session.createConsumer(destination.getTopicForTempAdvisory()); 082 consumer.setMessageListener(this); 083 startedAt = System.currentTimeMillis(); 084 } 085 } 086 087 /** 088 * stop listening for advisories 089 * 090 * @throws JMSException 091 */ 092 public void stop() throws JMSException { 093 if (started.commit(true, false)) { 094 if (session != null) { 095 session.close(); 096 } 097 } 098 } 099 100 /** 101 * returns true if the temporary destination is active 102 * 103 * @param destination 104 * @return true if a subscriber for the destination 105 */ 106 public boolean isActive(Destination destination) { 107 boolean rtnval = false; 108 synchronized(this) 109 { 110 rtnval = activeDestinations.contains(destination); 111 if (rtnval == false && startedAt > 0) 112 { 113 // wait a while to see if the advisory event arrives (no longer than 5 seconds) 114 long waittime = 5000 - (System.currentTimeMillis() - startedAt); 115 startedAt = 0; 116 try { 117 wait(waittime); 118 } catch (Exception e) {} 119 rtnval = activeDestinations.contains(destination); 120 } 121 } 122 return rtnval; 123 } 124 125 /** 126 * Add a listener 127 * 128 * @param l 129 */ 130 public void addListener(TempDestinationAdvisoryEventListener l) { 131 listeners.add(l); 132 } 133 134 /** 135 * Remove a listener 136 * 137 * @param l 138 */ 139 public void removeListener(TempDestinationAdvisoryEventListener l) { 140 listeners.remove(l); 141 } 142 143 /** 144 * OnMessage() implementation 145 * 146 * @param msg 147 */ 148 public void onMessage(Message msg) { 149 if (msg instanceof ObjectMessage) { 150 try { 151 TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) ((ObjectMessage) msg).getObject(); 152 if (event.isStarted()) { 153 activeDestinations.add(event.getDestination()); 154 synchronized (this) { 155 notifyAll(); 156 } 157 } 158 else { 159 activeDestinations.remove(event.getDestination()); 160 } 161 fireEvent(event); 162 } 163 catch (JMSException e) { 164 log.error("Failed to process message: " + msg); 165 } 166 } 167 } 168 169 private void fireEvent(TempDestinationAdvisoryEvent event) { 170 for (Iterator i = listeners.iterator();i.hasNext();) { 171 TempDestinationAdvisoryEventListener l = (TempDestinationAdvisoryEventListener) i.next(); 172 l.onEvent(event); 173 } 174 } 175 }