001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.service.impl;
019    
020    import javax.jms.JMSException;
021    
022    import org.activemq.message.ActiveMQMessage;
023    import org.activemq.message.ActiveMQTopic;
024    import org.activemq.message.ConsumerInfo;
025    import org.activemq.message.MessageAck;
026    import org.activemq.service.MessageContainerAdmin;
027    import org.activemq.service.MessageIdentity;
028    import org.activemq.service.Subscription;
029    import org.activemq.service.TopicMessageContainer;
030    import org.activemq.store.RecoveryListener;
031    import org.activemq.store.TopicMessageStore;
032    
033    /**
034     * A default implementation of a Durable Topic based
035     * {@link org.activemq.service.MessageContainer}
036     * which acts as an adapter between the {@link org.activemq.service.MessageContainerManager}
037     * requirements and those of the persistent {@link TopicMessageStore} implementations.
038     *
039     * @version $Revision: 1.1.1.1 $
040     */
041    public class DurableTopicMessageContainer implements TopicMessageContainer, MessageContainerAdmin {
042    
043        private TopicMessageStore messageStore;
044        private String destinationName;
045        private MessageIdentity lastMessageIdentity;
046        private final DurableTopicMessageContainerManager manager;
047    
048        public DurableTopicMessageContainer(DurableTopicMessageContainerManager manager, TopicMessageStore messageStore, String destinationName) {
049            this.manager = manager;
050            this.messageStore = messageStore;
051            this.destinationName = destinationName;
052        }
053    
054        public String getDestinationName() {
055            return destinationName;
056        }
057    
058        public void addMessage(ActiveMQMessage message) throws JMSException {
059            messageStore.addMessage(message);
060            lastMessageIdentity = message.getJMSMessageIdentity();
061        }
062    
063        public void delete(MessageIdentity messageID, MessageAck ack) throws JMSException {
064            // only called in MessagePointer and so shouldn't really delete
065            //messageStore.removeMessage(new MessageIdentity(messageID));
066        }
067    
068        public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
069            /** TODO: make more optimal implementation */
070            return getMessage(messageIdentity) != null;
071        }
072    
073        public ActiveMQMessage getMessage(MessageIdentity messageID) throws JMSException {
074            return messageStore.getMessage(messageID);
075        }
076    
077        public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException {
078            messageStore.incrementMessageCount(messageIdentity);
079        }
080    
081        public void unregisterMessageInterest(MessageIdentity ack) throws JMSException {
082            messageStore.decrementMessageCountAndMaybeDelete(ack);
083        }
084    
085    
086        public void setLastAcknowledgedMessageID(Subscription subscription, MessageIdentity messageIdentity) throws JMSException {
087            messageStore.setLastAcknowledgedMessageIdentity(subscription.getPersistentKey(), messageIdentity);
088        }
089    
090        public void recoverSubscription(final Subscription subscription) throws JMSException {
091            messageStore.recoverSubscription(subscription.getPersistentKey(), lastMessageIdentity, new RecoveryListener() {
092                public void recoverMessage(MessageIdentity messageIdentity) throws JMSException {
093                    subscription.addMessage(DurableTopicMessageContainer.this, getMessage(messageIdentity));
094                }
095            });
096        }
097    
098        public void storeSubscription(ConsumerInfo info, Subscription subscription) throws JMSException {
099            messageStore.setSubscriberEntry(info, subscription.getSubscriptionEntry());
100        }
101    
102        public void start() throws JMSException {
103            lastMessageIdentity = messageStore.getLastestMessageIdentity();
104            messageStore.start();
105        }
106    
107        public void stop() throws JMSException {
108            messageStore.stop();
109        }
110    
111        /**
112         * @see org.activemq.service.MessageContainer#getMessageContainerAdmin()
113         */
114        public MessageContainerAdmin getMessageContainerAdmin() {
115            return this;
116        }
117    
118        /**
119         * @see org.activemq.service.MessageContainerAdmin#empty()
120         */
121        public void empty() throws JMSException {
122            if( manager.isConsumerActiveOnDestination(new ActiveMQTopic(destinationName)) ) {
123                messageStore.removeAllMessages();
124            } else {
125                throw new JMSException("Cannot empty a topic while it is use.");
126            }
127        }
128    
129        /**
130         * @see org.activemq.service.MessageContainer#isDeadLetterQueue()
131         */
132        public boolean isDeadLetterQueue() {
133            return false;
134        }
135    
136        public void deleteSubscription(String sub) throws JMSException {
137            messageStore.deleteSubscription(sub);      
138        }
139    }