001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq;
020    
021    import java.util.Iterator;
022    import java.util.List;
023    
024    import javax.jms.JMSException;
025    
026    import org.activemq.io.util.MemoryBoundedQueue;
027    import org.activemq.message.ActiveMQMessage;
028    
029    /**
030     * A utility class used by the Session for dispatching messages asynchronously to consumers
031     *
032     * @version $Revision: 1.1.1.1 $
033     * @see javax.jms.Session
034     */
035    public class ActiveMQSessionExecutor implements Runnable {
036        private ActiveMQSession session;
037        private MemoryBoundedQueue messageQueue;
038        private boolean closed;
039        private Thread runner;
040        private boolean dispatchedBySessionPool;
041        private boolean optimizedMessageDispatch;
042    
043        ActiveMQSessionExecutor(ActiveMQSession session, MemoryBoundedQueue queue) {
044            this.session = session;
045            this.messageQueue = queue;
046        }
047    
048        void setDispatchedBySessionPool(boolean value) {
049            dispatchedBySessionPool = value;
050        }
051        
052        /**
053         * @return Returns the optimizedMessageDispatch.
054         */
055        boolean isOptimizedMessageDispatch() {
056            return optimizedMessageDispatch;
057        }
058        /**
059         * @param optimizedMessageDispatch The optimizedMessageDispatch to set.
060         */
061        void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
062            this.optimizedMessageDispatch = optimizedMessageDispatch;
063        }
064    
065        void execute(ActiveMQMessage message) {
066            if (optimizedMessageDispatch && !dispatchedBySessionPool){
067                dispatch(message);
068            }else {
069                messageQueue.enqueue(message);
070            }
071           
072        }
073    
074        void executeFirst(ActiveMQMessage message) {
075            messageQueue.enqueueFirstNoBlock(message);
076        }
077    
078        boolean hasUncomsumedMessages() {
079            return !messageQueue.isEmpty();
080        }
081    
082        List getUnconsumedMessages() {
083            return messageQueue.getContents();
084        }
085        
086        /**
087         * implementation of Runnable
088         */
089        public void run() {
090            while (!closed && !dispatchedBySessionPool) {
091                ActiveMQMessage message = null;
092                try {
093                    message = (ActiveMQMessage) messageQueue.dequeue(100);
094                }
095                catch (InterruptedException ie) {
096                }
097                if (!closed) {
098                    if (message != null) {
099                        if (!dispatchedBySessionPool) {
100                            dispatch(message);
101                        }
102                        else {
103                            messageQueue.enqueueFirstNoBlock(message);
104                        }
105                    }
106                }
107            }
108        }
109        
110        void dispatch(ActiveMQMessage message){
111            for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
112                ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
113                if (message.isConsumerTarget(consumer.getConsumerNumber())) {
114                    try {
115                        consumer.processMessage(message.shallowCopy());
116                    }
117                    catch (JMSException e) {
118                        this.session.connection.handleAsyncException(e);
119                    }
120                }
121            }
122        }
123    
124        synchronized void start() {
125            messageQueue.start();
126            if (runner == null && (!dispatchedBySessionPool || optimizedMessageDispatch)) {
127                runner = new Thread(this, "JmsSessionDispatcher: " + session.getSessionId());
128                runner.setPriority(Thread.MAX_PRIORITY);
129                //runner.setDaemon(true);
130                runner.start();
131            }
132        }
133    
134        synchronized void stop() {
135            messageQueue.stop();
136        }
137    
138        synchronized void close() {
139            closed = true;
140            messageQueue.close();
141        }
142    
143        void clear() {
144            messageQueue.clear();
145        }
146    
147        ActiveMQMessage dequeueNoWait() {
148            try {
149                return (ActiveMQMessage) messageQueue.dequeueNoWait();
150            }
151            catch (InterruptedException ie) {
152                return null;
153            }
154        }
155        
156        protected void clearMessagesInProgress(){
157            messageQueue.clear();
158        }
159        
160        
161    }