001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.service.boundedvm;
020    import java.util.List;
021    
022    import javax.jms.DeliveryMode;
023    import javax.jms.JMSException;
024    
025    import org.activemq.broker.BrokerClient;
026    import org.activemq.filter.Filter;
027    import org.activemq.io.util.MemoryBoundedQueue;
028    import org.activemq.message.ActiveMQMessage;
029    import org.activemq.message.ConsumerInfo;
030    
031    /**
032     * A holder for Transient Queue consumer info and message routing
033     * 
034     * @version $Revision: 1.1.1.1 $
035     */
036    public class TransientQueueSubscription extends TransientSubscription {
037    
038        private MemoryBoundedQueue dispatchedQueue;
039        private MemoryBoundedQueue ackedQueue; // Where messages go that are acked in a transaction
040    
041        /**
042         * Construct the TransientQueueSubscription
043         * 
044         * @param client
045         * @param dispatchedQueue
046         * @param ackQueue 
047         * @param filter
048         * @param info
049         */
050        public TransientQueueSubscription(BrokerClient client, MemoryBoundedQueue dispatchedQueue, MemoryBoundedQueue ackQueue, Filter filter,
051                ConsumerInfo info) {
052            super(filter, info, client);
053            this.dispatchedQueue = dispatchedQueue;
054                    this.ackedQueue = ackQueue;
055        }
056    
057        /**
058         * determines if the Subscription is interested in the message
059         * 
060         * @param message
061         * @return true if this Subscription will accept the message
062         * @throws JMSException
063         */
064        public boolean isTarget(ActiveMQMessage message) throws JMSException {
065            boolean result = false;
066            if (message != null) {
067                //make sure we don't loop messages around the cluster
068                if (!client.isClusteredConnection() || !message.isEntryCluster(clusterName)
069                        || message.isEntryBroker(brokerName)) {
070                    result = filter.matches(message)
071                            && (message.getJMSDeliveryMode() == DeliveryMode.NON_PERSISTENT || consumerInfo
072                                    .getDestination().isTemporary());
073                }
074            }
075            return result;
076        }
077    
078        /**
079         * @return true if the consumer has capacity for more messages
080         */
081        public boolean canAcceptMessages() {
082            return dispatchedQueue.size() <= consumerInfo.getPrefetchNumber();
083        }
084    
085        /**
086         * Dispatch a message to the Consumer
087         * 
088         * @param message
089         * @throws JMSException
090         */
091        public void doDispatch(ActiveMQMessage message) throws JMSException {
092            dispatchedQueue.enqueueNoBlock(message);
093            message = message.shallowCopy();
094            message.setConsumerNos(new int[]{consumerInfo.getConsumerNo()});
095            client.dispatch(message);
096        }
097            
098     
099        /**
100         * Acknowledge the receipt of a message by a consumer
101         * 
102         * @param id
103         * @return the removed ActiveMQMessage with the associated id
104         */
105        public ActiveMQMessage acknowledgeMessage(String id) {
106            ActiveMQMessage msg = (ActiveMQMessage) dispatchedQueue.remove(id);
107            return msg;
108        }
109    
110        /**
111         * @return all the unacknowledge messages
112         */
113        public List getUndeliveredMessages() {
114            return dispatchedQueue.getContents();
115        }
116    
117        /**
118         * close the subscription
119         */
120        public void close() {
121            super.close();
122            dispatchedQueue.close();
123            ackedQueue.close();
124        }
125            
126            /**
127         * Add an acked message.
128         */
129        public boolean hasAckedMessage() {
130            return !ackedQueue.isEmpty();
131        }
132    
133        /**
134         * Add an acked message.
135         * 
136         * @throws InterruptedException
137         */
138        public void addAckedMessage(ActiveMQMessage message) {
139            ackedQueue.enqueueNoBlock(message);
140        }
141    
142        /**
143         * Get a list of all the acked messages
144         */
145        public List listAckedMessages() {
146            return ackedQueue.getContents();
147        }
148    
149        /**
150         * Add an acked message.
151         */
152        public void removeAllAckedMessages() {
153            ackedQueue.clear();
154        }
155    
156            public boolean isBrowser() {
157                    return consumerInfo.isBrowser();
158            }
159    }