001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.web; 020 021 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 022 import org.apache.commons.logging.Log; 023 import org.apache.commons.logging.LogFactory; 024 import org.activemq.ActiveMQConnection; 025 import org.activemq.ActiveMQConnectionFactory; 026 import org.activemq.ActiveMQSession; 027 028 import javax.jms.ConnectionFactory; 029 import javax.jms.DeliveryMode; 030 import javax.jms.Destination; 031 import javax.jms.JMSException; 032 import javax.jms.Message; 033 import javax.jms.MessageConsumer; 034 import javax.jms.MessageProducer; 035 import javax.jms.Session; 036 import javax.jms.Topic; 037 import javax.servlet.ServletContext; 038 import javax.servlet.http.HttpSession; 039 import javax.servlet.http.HttpSessionActivationListener; 040 import javax.servlet.http.HttpSessionEvent; 041 import java.io.Externalizable; 042 import java.io.IOException; 043 import java.io.ObjectInput; 044 import java.io.ObjectOutput; 045 import java.util.HashMap; 046 import java.util.Map; 047 048 /** 049 * Represents a messaging client used from inside a web container 050 * typically stored inside a HttpSession 051 * 052 * @version $Revision: 1.1.1.1 $ 053 */ 054 public class WebClient implements HttpSessionActivationListener, Externalizable { 055 public static final String webClientAttribute = "org.activemq.webclient"; 056 public static final String connectionFactoryAttribute = "org.activemq.connectionFactory"; 057 public static final String queueConsumersAttribute = "org.activemq.queueConsumers"; 058 public static final String brokerUrlInitParam = "org.activemq.brokerURL"; 059 public static final String embeddedBrokerInitParam = "org.activemq.embeddedBroker"; 060 061 private static final Log log = LogFactory.getLog(WebClient.class); 062 063 private static transient ConnectionFactory factory; 064 private static transient Map queueConsumers; 065 066 private transient ServletContext context; 067 private transient ActiveMQConnection connection; 068 private transient ActiveMQSession session; 069 private transient MessageProducer producer; 070 private transient Map topicConsumers = new ConcurrentHashMap(); 071 private int deliveryMode = DeliveryMode.NON_PERSISTENT; 072 073 074 /** 075 * @return the web client for the current HTTP session or null if there is not a web client created yet 076 */ 077 public static WebClient getWebClient(HttpSession session) { 078 return (WebClient) session.getAttribute(webClientAttribute); 079 } 080 081 082 public static void initContext(ServletContext context) { 083 factory = initConnectionFactory(context); 084 if (factory == null) { 085 log.warn("No ConnectionFactory available in the ServletContext for: " + connectionFactoryAttribute); 086 factory = new ActiveMQConnectionFactory("vm://localhost"); 087 context.setAttribute(connectionFactoryAttribute, factory); 088 } 089 queueConsumers = initQueueConsumers(context); 090 } 091 092 /** 093 * Only called by serialization 094 */ 095 public WebClient() { 096 } 097 098 public WebClient(ServletContext context) { 099 this.context = context; 100 initContext(context); 101 } 102 103 104 public int getDeliveryMode() { 105 return deliveryMode; 106 } 107 108 109 public void setDeliveryMode(int deliveryMode) { 110 this.deliveryMode = deliveryMode; 111 } 112 113 114 public void start() throws JMSException { 115 } 116 117 public void stop() throws JMSException { 118 System.out.println("Closing the WebClient!!! " + this); 119 120 try { 121 connection.close(); 122 } 123 finally { 124 producer = null; 125 session = null; 126 connection = null; 127 topicConsumers.clear(); 128 } 129 } 130 131 public void writeExternal(ObjectOutput out) throws IOException { 132 } 133 134 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 135 topicConsumers = new HashMap(); 136 } 137 138 public void send(Destination destination, Message message) throws JMSException { 139 if (producer == null) { 140 producer = getSession().createProducer(null); 141 producer.setDeliveryMode(deliveryMode ); 142 } 143 log.info("Sending to destination: " + destination); 144 producer.send(destination, message); 145 log.info("Sent! message: " + message); 146 } 147 148 public Session getSession() throws JMSException { 149 if (session == null) { 150 session = createSession(); 151 } 152 return session; 153 } 154 155 public ActiveMQConnection getConnection() throws JMSException { 156 if (connection == null) { 157 connection = (ActiveMQConnection) factory.createConnection(); 158 connection.start(); 159 } 160 return connection; 161 } 162 163 public void sessionWillPassivate(HttpSessionEvent event) { 164 try { 165 stop(); 166 } 167 catch (JMSException e) { 168 log.warn("Could not close connection: " + e, e); 169 } 170 } 171 172 public void sessionDidActivate(HttpSessionEvent event) { 173 // lets update the connection factory from the servlet context 174 context = event.getSession().getServletContext(); 175 initContext(context); 176 } 177 178 public static Map initQueueConsumers(ServletContext context) { 179 Map answer = (Map) context.getAttribute(queueConsumersAttribute); 180 if (answer == null) { 181 answer = new HashMap(); 182 context.setAttribute(queueConsumersAttribute, answer); 183 } 184 return answer; 185 } 186 187 188 public static ConnectionFactory initConnectionFactory(ServletContext servletContext) { 189 ConnectionFactory connectionFactory = (ConnectionFactory) servletContext.getAttribute(connectionFactoryAttribute); 190 if (connectionFactory == null) { 191 String brokerURL = (String) servletContext.getInitParameter(brokerUrlInitParam); 192 193 servletContext.log("Value of: " + brokerUrlInitParam + " is: " + brokerURL); 194 195 if (brokerURL == null) { 196 brokerURL = "vm://localhost"; 197 } 198 199 boolean embeddedBroker = MessageServletSupport.asBoolean(servletContext.getInitParameter(embeddedBrokerInitParam)); 200 servletContext.log("Use embedded broker: " + embeddedBroker); 201 202 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL); 203 factory.setUseEmbeddedBroker(embeddedBroker); 204 205 connectionFactory = factory; 206 servletContext.setAttribute(connectionFactoryAttribute, connectionFactory); 207 } 208 return connectionFactory; 209 } 210 211 public synchronized MessageConsumer getConsumer(Destination destination) throws JMSException { 212 if (destination instanceof Topic) { 213 MessageConsumer consumer = (MessageConsumer) topicConsumers.get(destination); 214 if (consumer == null) { 215 consumer = getSession().createConsumer(destination); 216 topicConsumers.put(destination, consumer); 217 } 218 return consumer; 219 } 220 else { 221 synchronized (queueConsumers) { 222 SessionConsumerPair pair = (SessionConsumerPair) queueConsumers.get(destination); 223 if (pair == null) { 224 pair = createSessionConsumerPair(destination); 225 queueConsumers.put(destination, pair); 226 } 227 return pair.consumer; 228 } 229 } 230 } 231 232 protected ActiveMQSession createSession() throws JMSException { 233 return (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); 234 } 235 236 protected SessionConsumerPair createSessionConsumerPair(Destination destination) throws JMSException { 237 SessionConsumerPair answer = new SessionConsumerPair(); 238 answer.session = createSession(); 239 answer.consumer = answer.session.createConsumer(destination); 240 return answer; 241 } 242 243 protected static class SessionConsumerPair { 244 public Session session; 245 public MessageConsumer consumer; 246 } 247 }