001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.pool;
019    
020    import java.util.HashMap;
021    import java.util.Map;
022    
023    import javax.jms.Connection;
024    import javax.jms.ConnectionConsumer;
025    import javax.jms.ConnectionMetaData;
026    import javax.jms.Destination;
027    import javax.jms.ExceptionListener;
028    import javax.jms.JMSException;
029    import javax.jms.Queue;
030    import javax.jms.QueueConnection;
031    import javax.jms.QueueSession;
032    import javax.jms.ServerSessionPool;
033    import javax.jms.Session;
034    import javax.jms.Topic;
035    import javax.jms.TopicConnection;
036    import javax.jms.TopicSession;
037    
038    import org.activemq.ActiveMQConnection;
039    import org.activemq.ActiveMQSession;
040    import org.activemq.AlreadyClosedException;
041    
042    /**
043     * Represents a proxy {@link Connection} which is-a
044     * {@link TopicConnection} and {@link QueueConnection} which is pooled and on {@link #close()}
045     * will return itself to the sessionPool.
046     *
047     * @version $Revision: 1.1.1.1 $
048     */
049    public class PooledConnection implements TopicConnection, QueueConnection {
050    
051        private ActiveMQConnection connection;
052        private Map cache;
053        private boolean stopped;
054    
055        public PooledConnection(ActiveMQConnection connection) {
056            this(connection, new HashMap());
057        }
058    
059        public PooledConnection(ActiveMQConnection connection, Map cache) {
060            this.connection = connection;
061            this.cache = cache;
062        }
063    
064        /**
065         * Factory method to create a new instance.
066         */
067        public PooledConnection newInstance() {
068            return new PooledConnection(connection, cache);
069        }
070    
071        public void close() throws JMSException {
072            connection = null;
073        }
074    
075        public void start() throws JMSException {
076            // TODO should we start connections first before pooling them?
077            getConnection().start();
078        }
079    
080        public void stop() throws JMSException {
081            stopped = true;
082        }
083    
084        public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
085            return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
086        }
087    
088        public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
089            return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
090        }
091    
092        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i) throws JMSException {
093            return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
094        }
095    
096        public String getClientID() throws JMSException {
097            return getConnection().getClientID();
098        }
099    
100        public ExceptionListener getExceptionListener() throws JMSException {
101            return getConnection().getExceptionListener();
102        }
103    
104        public ConnectionMetaData getMetaData() throws JMSException {
105            return getConnection().getMetaData();
106        }
107    
108        public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
109            getConnection().setExceptionListener(exceptionListener);
110        }
111    
112        public void setClientID(String clientID) throws JMSException {
113            getConnection().setClientID(clientID);
114        }
115    
116        public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
117            return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
118        }
119    
120    
121        // Session factory methods
122        //-------------------------------------------------------------------------
123        public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
124            return (QueueSession) createSession(transacted, ackMode);
125        }
126    
127        public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
128            return (TopicSession) createSession(transacted, ackMode);
129        }
130    
131        public Session createSession(boolean transacted, int ackMode) throws JMSException {
132            SessionKey key = new SessionKey(transacted, ackMode);
133            SessionPool pool = (SessionPool) cache.get(key);
134            if (pool == null) {
135                pool = new SessionPool(getConnection(), key);
136                cache.put(key, pool);
137            }
138            return pool.borrowSession();
139        }
140    
141    
142        // Implementation methods
143        //-------------------------------------------------------------------------
144        protected ActiveMQConnection getConnection() throws JMSException {
145            if (stopped || connection == null) {
146                throw new AlreadyClosedException();
147            }
148            return connection;
149        }
150    
151        protected ActiveMQSession createSession(SessionKey key) throws JMSException {
152            return (ActiveMQSession) getConnection().createSession(key.isTransacted(), key.getAckMode());
153        }
154    
155    }