001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.store.vm;
019    
020    import java.util.Collections;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.LinkedHashMap;
024    import java.util.Map;
025    
026    import javax.jms.JMSException;
027    
028    import org.activemq.message.ActiveMQMessage;
029    import org.activemq.message.ConsumerInfo;
030    import org.activemq.service.MessageIdentity;
031    import org.activemq.service.SubscriberEntry;
032    import org.activemq.store.RecoveryListener;
033    import org.activemq.store.TopicMessageStore;
034    
035    /**
036     * @version $Revision: 1.1.1.1 $
037     */
038    public class VMTopicMessageStore extends VMMessageStore implements TopicMessageStore {
039        private static final Integer ONE = new Integer(1);
040    
041        private Map ackDatabase;
042        private Map messageCounts;
043        private Map subscriberDatabase;
044    
045        public VMTopicMessageStore() {
046            this(new LinkedHashMap(), makeMap(), makeMap(), makeMap());
047        }
048    
049        public VMTopicMessageStore(LinkedHashMap messageTable, Map subscriberDatabase, Map ackDatabase, Map messageCounts) {
050            super(messageTable);
051            this.subscriberDatabase = subscriberDatabase;
052            this.ackDatabase = ackDatabase;
053            this.messageCounts = messageCounts;
054        }
055    
056        public synchronized void incrementMessageCount(MessageIdentity messageId) throws JMSException {
057            Integer number = (Integer) messageCounts.get(messageId.getMessageID());
058            if (number == null) {
059                number = ONE;
060            }
061            else {
062                number = new Integer(number.intValue() + 1);
063            }
064            messageCounts.put(messageId.getMessageID(), number);
065        }
066    
067        public synchronized void decrementMessageCountAndMaybeDelete(MessageIdentity msgId) throws JMSException {
068            Integer number = (Integer) messageCounts.get(msgId.getMessageID());
069            if (number == null || number.intValue() <= 1) {
070                removeMessage(msgId);
071                if (number != null) {
072                    messageCounts.remove(msgId.getMessageID());
073                }
074            }
075            else {
076                messageCounts.put(msgId.getMessageID(), new Integer(number.intValue() - 1));
077                number = ONE;
078            }
079        }
080    
081        public void setLastAcknowledgedMessageIdentity(String subscription, MessageIdentity messageIdentity) throws JMSException {
082            ackDatabase.put(subscription, messageIdentity);
083        }
084    
085        public synchronized void recoverSubscription(String subscriptionId, MessageIdentity lastDispatchedMessage, RecoveryListener listener) throws JMSException {
086            //iterate through the message table and populate the subscriber
087            Map map = new HashMap(messageTable);
088            boolean alreadyAcked = true;
089            MessageIdentity lastAcked = (MessageIdentity)ackDatabase.get(subscriptionId);
090                    if( lastAcked==null )
091                            return;
092                    
093            for (Iterator i = map.values().iterator(); i.hasNext(); ){
094                ActiveMQMessage msg = (ActiveMQMessage)i.next();                    
095                if (!alreadyAcked){
096                    listener.recoverMessage(msg.getJMSMessageIdentity());
097                }
098                if (lastAcked.getMessageID().equals(msg.getJMSMessageID())){
099                    alreadyAcked = false;
100                }                   
101            }
102        }
103    
104        public MessageIdentity getLastestMessageIdentity() throws JMSException {
105            return super.lastMessageIdentity;
106        }
107    
108        public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
109            Object key = info.getConsumerKey();
110            return (SubscriberEntry) subscriberDatabase.get(key);
111        }
112    
113        public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
114            subscriberDatabase.put(info.getConsumerKey(), subscriberEntry);
115        }
116    
117        public void stop() throws JMSException {
118        }
119    
120        protected static Map makeMap() {
121            return Collections.synchronizedMap(new HashMap());
122        }
123    
124        public void deleteSubscription(String sub) {
125            ackDatabase.remove(sub);        
126        }
127    
128    }