001    /**
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.service.impl;
020    import java.util.ArrayList;
021    import java.util.List;
022    import javax.jms.JMSException;
023    import org.apache.commons.logging.Log;
024    import org.apache.commons.logging.LogFactory;
025    import org.activemq.broker.BrokerClient;
026    import org.activemq.broker.BrokerConnector;
027    import org.activemq.broker.BrokerContainer;
028    import org.activemq.broker.Broker;
029    import org.activemq.filter.Filter;
030    import org.activemq.message.ActiveMQDestination;
031    import org.activemq.message.ActiveMQMessage;
032    import org.activemq.message.BrokerInfo;
033    import org.activemq.message.ConsumerInfo;
034    import org.activemq.message.MessageAck;
035    import org.activemq.service.DeadLetterPolicy;
036    import org.activemq.service.Dispatcher;
037    import org.activemq.service.MessageContainer;
038    import org.activemq.service.MessageIdentity;
039    import org.activemq.service.QueueList;
040    import org.activemq.service.QueueListEntry;
041    import org.activemq.service.RedeliveryPolicy;
042    import org.activemq.service.SubscriberEntry;
043    import org.activemq.service.Subscription;
044    import org.activemq.service.TransactionManager;
045    import org.activemq.service.TransactionTask;
046    import org.activemq.security.SecurityAdapter;
047    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
048    import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
049    
050    /**
051     * A Subscription holds messages to be dispatched to a a Client Consumer
052     * 
053     * @version $Revision: 1.1.1.1 $
054     */
055    public class SubscriptionImpl implements Subscription {
056        private static final Log log = LogFactory.getLog(SubscriptionImpl.class);
057        private String clientId;
058        private String subscriberName;
059        private ActiveMQDestination destination;
060        private String selector;
061        private int prefetchLimit;
062        private boolean noLocal;
063        private int consumerNumber;
064        private String consumerId;
065        private boolean browser;
066        protected Dispatcher dispatch;
067        protected String brokerName;
068        protected String clusterName;
069        protected MessageIdentity lastMessageIdentity;
070        private Filter filter;
071        protected SynchronizedInt unconsumedMessagesDispatched = new SynchronizedInt(0);
072        protected QueueList messagePtrs = new DefaultQueueList();
073        private boolean usePrefetch = true;
074        private SubscriberEntry subscriberEntry;
075        private BrokerClient activeClient;
076        private RedeliveryPolicy redeliveryPolicy;
077        private DeadLetterPolicy deadLetterPolicy;
078        private SynchronizedBoolean active = new SynchronizedBoolean(false);
079        private Object lock = new Object();
080    
081        /**
082         * Create a Subscription object that holds messages to be dispatched to a Consumer
083         * 
084         * @param dispatcher
085         * @param client
086         * @param info
087         * @param filter
088         * @param redeliveryPolicy
089         * @param deadLetterPolicy
090         */
091        public SubscriptionImpl(Dispatcher dispatcher, BrokerClient client, ConsumerInfo info, Filter filter,
092                RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
093            this.dispatch = dispatcher;
094            this.filter = filter;
095            this.redeliveryPolicy = redeliveryPolicy;
096            this.deadLetterPolicy = deadLetterPolicy;
097            setActiveConsumer(client, info);
098        }
099    
100        /**
101         * Set the active consumer info
102         * 
103         * @param client
104         * @param info
105         */
106        public void setActiveConsumer(BrokerClient client, ConsumerInfo info) {
107            if (info != null) {
108                this.clientId = info.getClientId();
109                this.subscriberName = info.getConsumerName();
110                this.noLocal = info.isNoLocal();
111                this.destination = info.getDestination();
112                this.selector = info.getSelector();
113                this.prefetchLimit = info.getPrefetchNumber();
114                this.consumerNumber = info.getConsumerNo();
115                this.consumerId = info.getConsumerId();
116                this.browser = info.isBrowser();
117            }
118            this.activeClient = client;
119            if (client != null) {
120                BrokerConnector brokerConnector = client.getBrokerConnector();
121                if (brokerConnector != null) {
122                    BrokerInfo brokerInfo = brokerConnector.getBrokerInfo();
123                    if (brokerInfo != null) {
124                        brokerName = brokerInfo.getBrokerName();
125                        clusterName = brokerInfo.getClusterName();
126                    }
127                }
128            }
129        }
130    
131        /**
132         * @return pretty print of the Subscription
133         */
134        public String toString() {
135            String str = "SubscriptionImpl(" + super.hashCode() + ")[" + consumerId + "]" + clientId + ": "
136                    + subscriberName + " : " + destination;
137            return str;
138        }
139    
140        /**
141         * Called when the Subscription is discarded
142         * 
143         * @throws JMSException
144         */
145        public void clear() throws JMSException {
146            synchronized (lock) {
147                QueueListEntry entry = messagePtrs.getFirstEntry();
148                while (entry != null) {
149                    MessagePointer pointer = (MessagePointer) entry.getElement();
150                    pointer.clear();
151                    entry = messagePtrs.getNextEntry(entry);
152                }
153                messagePtrs.clear();
154            }
155        }
156    
157        /**
158         * Called when an active subscriber has closed. This resets all MessagePtrs
159         * 
160         * @throws JMSException
161         */
162        public void reset() throws JMSException {
163            synchronized (lock) {
164                QueueListEntry entry = messagePtrs.getFirstEntry();
165                while (entry != null) {
166                    MessagePointer pointer = (MessagePointer) entry.getElement();
167                    if (pointer.isDispatched() && !pointer.isDeleted()) {
168                        pointer.reset();
169                        pointer.setRedelivered(true);
170                    }
171                    else {
172                        break;
173                    }
174                    entry = messagePtrs.getNextEntry(entry);
175                }
176            }
177        }
178    
179        public BrokerClient getActiveClient() {
180            return activeClient;
181        }
182    
183        /**
184         * @return Returns the clientId.
185         */
186        public String getClientId() {
187            return clientId;
188        }
189    
190        /**
191         * @param clientId The clientId to set.
192         */
193        public void setClientId(String clientId) {
194            this.clientId = clientId;
195        }
196    
197        /**
198         * @return Returns the filter.
199         */
200        public Filter getFilter() {
201            return filter;
202        }
203    
204        /**
205         * @param filter The filter to set.
206         */
207        public void setFilter(Filter filter) {
208            this.filter = filter;
209        }
210    
211        public boolean isWildcard() {
212            return filter.isWildcard();
213        }
214    
215        public String getPersistentKey() {
216            // not required other than for persistent topic subscriptions
217            return null;
218        }
219    
220        public boolean isSameDurableSubscription(ConsumerInfo info) throws JMSException {
221            if (isDurableTopic()) {
222                return equal(clientId, info.getClientId()) && equal(subscriberName, info.getConsumerName());
223            }
224            return false;
225        }
226    
227        /**
228         * @return Returns the noLocal.
229         */
230        public boolean isNoLocal() {
231            return noLocal;
232        }
233    
234        /**
235         * @param noLocal The noLocal to set.
236         */
237        public void setNoLocal(boolean noLocal) {
238            this.noLocal = noLocal;
239        }
240    
241        /**
242         * @return Returns the subscriberName.
243         */
244        public String getSubscriberName() {
245            return subscriberName;
246        }
247    
248        /**
249         * @param subscriberName The subscriberName to set.
250         */
251        public void setSubscriberName(String subscriberName) {
252            this.subscriberName = subscriberName;
253        }
254    
255        public RedeliveryPolicy getRedeliveryPolicy() {
256            return redeliveryPolicy;
257        }
258    
259        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
260            this.redeliveryPolicy = redeliveryPolicy;
261        }
262    
263        /**
264         * determines if the Subscription is interested in the message
265         * 
266         * @param message
267         * @return true if this Subscription will accept the message
268         * @throws JMSException
269         */
270        public boolean isTarget(ActiveMQMessage message) throws JMSException {
271            boolean result = false;
272            if (message != null) {
273                if (activeClient == null || brokerName == null || clusterName == null
274                        || !activeClient.isClusteredConnection() || !message.isEntryCluster(clusterName)
275                        || message.isEntryBroker(brokerName)) {
276                    result = message.isDispatchedFromDLQ() || filter.matches(message);
277                    // lets check that we don't have no-local enabled
278                    if (noLocal && result) {
279                        if (clientIDsEqual(message)) {
280                            result = false;
281                        }
282                    }
283    
284                    if (result && !isAuthorizedForMessage(message)) {
285                        result = false;
286                    }
287                }
288            }
289            return result;
290        }
291    
292        /**
293         * If the Subscription is a target for the message, the subscription will add a reference to the message and
294         * register an interest in the message to the container
295         * 
296         * @param container
297         * @param message
298         * @throws JMSException
299         */
300        public void addMessage(MessageContainer container, ActiveMQMessage message) throws JMSException {
301            //log.info("###### Adding to subscription: " + this + " message: " + message);
302            if (log.isDebugEnabled()) {
303                log.debug("Adding to subscription: " + this + " message: " + message);
304            }
305            MessagePointer pointer = new MessagePointer(container, message);
306            synchronized (lock) {
307                messagePtrs.add(pointer);
308            }
309            dispatch.wakeup(this);
310            lastMessageIdentity = message.getJMSMessageIdentity();
311        }
312    
313        /**
314         * Indicates a message has been delivered to a MessageConsumer
315         * 
316         * @param ack
317         * @throws JMSException
318         */
319        public void messageConsumed(final MessageAck ack) throws JMSException {
320            //remove up to this message
321            int count = 0;
322            boolean found = false;
323            synchronized (lock) {
324                QueueListEntry entry = messagePtrs.getFirstEntry();
325                while (entry != null) {
326                    final MessagePointer pointer = (MessagePointer) entry.getElement();
327                    count++;
328                    // If in transaction: only consume the message acked.
329                    // If not in transaction: consume all previously delivered messages.
330                    if (!ack.isPartOfTransaction() || pointer.getMessageIdentity().equals(ack.getMessageIdentity())) {
331                        if ((ack.isExpired() || ack.isMessageRead()) && !browser) {
332                            pointer.delete(ack);//delete message from the container (if possible)
333                        }
334                        if (!ack.isMessageRead() && !browser) {
335                            // It was a NACK.
336                            pointer.reset();
337                            pointer.setRedelivered(true);
338                        }
339                        else {
340                            unconsumedMessagesDispatched.decrement();
341                            // We may have to undo the delivery..
342                            TransactionManager.getContexTransaction().addPostRollbackTask(new TransactionTask() {
343                                public void execute() throws Throwable {
344                                    unconsumedMessagesDispatched.increment();
345                                    pointer.reset();
346                                    pointer.setRedelivered(true);
347                                    dispatch.wakeup(SubscriptionImpl.this);
348                                }
349                            });
350                            final QueueListEntry theEntry = entry;
351                            TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask() {
352                                public void execute() throws Throwable {
353                                    messagePtrs.remove(theEntry);
354                                    if ((ack.isExpired() || ack.isMessageRead()) && !browser) {
355                                        if (ack.isExpired() && !pointer.getContainer().isDeadLetterQueue()) {
356                                            ActiveMQMessage msg = pointer.getContainer().getMessage(
357                                                    pointer.getMessageIdentity());
358                                            if (msg != null) {
359                                                deadLetterPolicy.sendToDeadLetter(msg);
360                                            }
361                                        }
362                                    }
363                                }
364                            });
365                        }
366                        if (pointer.getMessageIdentity().equals(ack.getMessageIdentity())) {
367                            found = true;
368                            break;
369                        }
370                    }
371                    entry = messagePtrs.getNextEntry(entry);
372                }
373            }
374            if (!found && log.isDebugEnabled()) {
375                log.debug("Did not find a matching message for identity: " + ack.getMessageIdentity());
376            }
377            dispatch.wakeup(this);
378        }
379    
380        /**
381         * Retrieve messages to dispatch
382         * 
383         * @return the messages to dispatch
384         * @throws JMSException
385         */
386        public ActiveMQMessage[] getMessagesToDispatch() throws JMSException {
387            if (usePrefetch) {
388                return getMessagesWithPrefetch();
389            }
390            List tmpList = new ArrayList();
391            synchronized (lock) {
392                QueueListEntry entry = messagePtrs.getFirstEntry();
393                while (entry != null) {
394                    MessagePointer pointer = (MessagePointer) entry.getElement();
395                    if (!pointer.isDispatched()) {
396                        ActiveMQMessage msg = pointer.getContainer().getMessage(pointer.getMessageIdentity());
397                        if (msg != null) {
398                            if (pointer.isDispatched() || pointer.isRedelivered()) {
399                                //already dispatched - so mark as redelivered
400                                msg.setJMSRedelivered(true);
401                                if (redeliveryPolicy.isBackOffMode()
402                                        && msg.getDeliveryCount() < redeliveryPolicy.getMaximumRetryCount()) {
403                                    long sleepTime = redeliveryPolicy.getInitialRedeliveryTimeout();
404                                    sleepTime *= (msg.getDeliveryCount() * redeliveryPolicy.getBackOffIncreaseRate());
405                                    try {
406                                        Thread.sleep(sleepTime);
407                                    }
408                                    catch (InterruptedException e) {
409                                    }
410                                }
411                                //incremenent delivery count
412                                msg.incrementDeliveryCount();
413                            }
414                            if (!pointer.getContainer().isDeadLetterQueue()
415                                    && (msg.isExpired() || msg.getDeliveryCount() >= redeliveryPolicy
416                                            .getMaximumRetryCount())) {
417                                if (msg.isExpired()) {
418                                    log.warn("Message: " + msg + " has expired");
419                                }
420                                else {
421                                    log.warn("Message: " + msg + " exceeded retry count: " + msg.getDeliveryCount());
422                                }
423                                deadLetterPolicy.sendToDeadLetter(msg);
424                                QueueListEntry discarded = entry;
425                                entry = messagePtrs.getPrevEntry(discarded);
426                                messagePtrs.remove(discarded);
427                            }
428                            else {
429                                pointer.setDispatched(true);
430                                msg.setDispatchedFromDLQ(pointer.getContainer().isDeadLetterQueue());
431                                tmpList.add(msg);
432                            }
433                        }
434                        else {
435                            //the message is probably expired
436                            log.info("Message probably expired: " + msg);
437                            QueueListEntry discarded = entry;
438                            entry = messagePtrs.getPrevEntry(discarded);
439                            messagePtrs.remove(discarded);
440                            if (msg != null) {
441                                deadLetterPolicy.sendToDeadLetter(msg);
442                            }
443                        }
444                    }
445                    entry = messagePtrs.getNextEntry(entry);
446                }
447            }
448            ActiveMQMessage[] messages = new ActiveMQMessage[tmpList.size()];
449            return (ActiveMQMessage[]) tmpList.toArray(messages);
450        }
451    
452        public SubscriberEntry getSubscriptionEntry() {
453            if (subscriberEntry == null) {
454                subscriberEntry = createSubscriptionEntry();
455            }
456            return subscriberEntry;
457        }
458    
459        public boolean isLocalSubscription() {
460            if (activeClient != null) {
461                return !(activeClient.isClusteredConnection() || activeClient.isBrokerConnection());
462            }
463            return true;
464        }
465    
466        // Implementation methods
467        //-------------------------------------------------------------------------
468        protected SubscriberEntry createSubscriptionEntry() {
469            SubscriberEntry answer = new SubscriberEntry();
470            answer.setClientID(clientId);
471            answer.setConsumerName(subscriberName);
472            answer.setDestination(destination.getPhysicalName());
473            answer.setSelector(selector);
474            return answer;
475        }
476    
477        protected ActiveMQMessage[] getMessagesWithPrefetch() throws JMSException {
478            
479            List tmpList = new ArrayList();
480            synchronized (lock) {
481                QueueListEntry entry = messagePtrs.getFirstEntry();
482                int count = 0;
483                boolean fragmentedMessages = false;
484                int maxNumberToDispatch = prefetchLimit - unconsumedMessagesDispatched.get();
485                while (entry != null && (count < maxNumberToDispatch || fragmentedMessages)) {
486                    MessagePointer pointer = (MessagePointer) entry.getElement();
487                    if (!pointer.isDispatched()) {
488                        ActiveMQMessage msg = pointer.getContainer().getMessage(pointer.getMessageIdentity());
489                        if (msg != null && !msg.isExpired()) {
490                            if (pointer.isDispatched() || pointer.isRedelivered()) {
491                                //already dispatched - so mark as redelivered
492                                msg.setJMSRedelivered(true);
493                            }
494                            pointer.setDispatched(true);
495                            tmpList.add(msg);
496                            fragmentedMessages = msg.isMessagePart() && !msg.isLastMessagePart();
497                            unconsumedMessagesDispatched.increment();
498                            count++;
499                        }
500                        else {
501                            //the message is probably expired
502                            log.info("Message probably expired: " + msg);
503                            QueueListEntry discarded = entry;
504                            entry = messagePtrs.getPrevEntry(discarded);
505                            messagePtrs.remove(discarded);
506                            if (msg != null) {
507                                deadLetterPolicy.sendToDeadLetter(msg);
508                            }
509                        }
510                    }
511                    entry = messagePtrs.getNextEntry(entry);
512                }
513            }
514            /**
515             * if (tmpList.isEmpty() && ! messagePtrs.isEmpty()) { System.out.println("### Nothing to dispatch but
516             * messagePtrs still has: " + messagePtrs.size() + " to dispatch, prefetchLimit: " + prefetchLimit + "
517             * unconsumedMessagesDispatched: " + unconsumedMessagesDispatched.get() + " maxNumberToDispatch: " +
518             * maxNumberToDispatch); MessagePointer first = (MessagePointer) messagePtrs.getFirst(); System.out.println("###
519             * First: " + first + " dispatched: " + first.isDispatched() + " id: " + first.getMessageIdentity()); } else {
520             * if (! tmpList.isEmpty()) { System.out.println("### dispatching: " + tmpList.size() + " items = " + tmpList); } }
521             */
522            ActiveMQMessage[] messages = new ActiveMQMessage[tmpList.size()];
523            return (ActiveMQMessage[]) tmpList.toArray(messages);
524        }
525    
526        /**
527         * Indicates the Subscription it's reached it's pre-fetch limit
528         * 
529         * @return true/false
530         * @throws JMSException
531         */
532        public boolean isAtPrefetchLimit() throws JMSException {
533            if (usePrefetch) {
534                int underlivedMessageCount = messagePtrs.size() - unconsumedMessagesDispatched.get();
535                return underlivedMessageCount >= prefetchLimit;
536            }
537            else {
538                return false;
539            }
540        }
541    
542        /**
543         * Indicates if this Subscription has more messages to send to the Consumer
544         * 
545         * @return true if more messages available to dispatch
546         */
547        public boolean isReadyToDispatch() throws JMSException {
548            /** TODO we may have dispatched messags inside messagePtrs */
549            boolean answer = active.get() && messagePtrs.size() > 0;
550            return answer;
551        }
552    
553        /**
554         * @return Returns the destination.
555         */
556        public ActiveMQDestination getDestination() {
557            return destination;
558        }
559    
560        /**
561         * @return Returns the selector.
562         */
563        public String getSelector() {
564            return selector;
565        }
566    
567        /**
568         * @return Returns the active.
569         */
570        public boolean isActive() {
571            return active.get();
572        }
573    
574        /**
575         * @param newActive The active to set.
576         * @throws JMSException
577         */
578        public void setActive(boolean newActive) throws JMSException {
579            synchronized (active.getLock()) {
580                active.set(newActive);
581            }
582            if (!newActive) {
583                reset();
584            }
585        }
586    
587        /**
588         * @return Returns the consumerNumber.
589         */
590        public int getConsumerNumber() {
591            return consumerNumber;
592        }
593    
594        /**
595         * @return the consumer Id for the active consumer
596         */
597        public String getConsumerId() {
598            return consumerId;
599        }
600    
601        /**
602         * Indicates the Subscriber is a Durable Subscriber
603         * 
604         * @return true if the subscriber is a durable topic
605         * @throws JMSException
606         */
607        public boolean isDurableTopic() throws JMSException {
608            return destination.isTopic() && subscriberName != null && subscriberName.length() > 0;
609        }
610    
611        /**
612         * Indicates the consumer is a browser only
613         * 
614         * @return true if a Browser
615         * @throws JMSException
616         */
617        public boolean isBrowser() throws JMSException {
618            return browser;
619        }
620    
621        public MessageIdentity getLastMessageIdentity() throws JMSException {
622            return lastMessageIdentity;
623        }
624    
625        public void setLastMessageIdentifier(MessageIdentity messageIdentity) throws JMSException {
626            this.lastMessageIdentity = messageIdentity;
627        }
628    
629        protected boolean clientIDsEqual(ActiveMQMessage message) {
630            String msgClientID = message.getJMSClientID();
631            String subClientID = clientId;
632            if (msgClientID == null || subClientID == null) {
633                return false;
634            }
635            else {
636                return msgClientID.equals(subClientID);
637            }
638        }
639    
640        protected static final boolean equal(Object left, Object right) {
641            return left == right || (left != null && right != null && left.equals(right));
642        }
643    
644    
645        /**
646         * Returns whether or not the consumer can receive the given message
647         */
648        protected boolean isAuthorizedForMessage(ActiveMQMessage message) {
649            // TODO we could maybe provide direct access to the security adapter
650            BrokerClient client = getActiveClient();
651            if (client != null) {
652                BrokerConnector connector = client.getBrokerConnector();
653                if (connector != null) {
654                    BrokerContainer container = connector.getBrokerContainer();
655                    if (container != null) {
656                        Broker broker = container.getBroker();
657                        if (broker != null) {
658                            SecurityAdapter securityAdapter = broker.getSecurityAdapter();
659                            if (securityAdapter != null) {
660                                return securityAdapter.authorizeReceive(client, message);
661                            }
662                        }
663                    }
664                }
665            }
666            return true;
667        }
668    }