001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.service.impl; 019 020 import javax.jms.JMSException; 021 022 import org.activemq.message.ActiveMQMessage; 023 import org.activemq.message.ActiveMQTopic; 024 import org.activemq.message.ConsumerInfo; 025 import org.activemq.message.MessageAck; 026 import org.activemq.service.MessageContainerAdmin; 027 import org.activemq.service.MessageIdentity; 028 import org.activemq.service.Subscription; 029 import org.activemq.service.TopicMessageContainer; 030 import org.activemq.store.RecoveryListener; 031 import org.activemq.store.TopicMessageStore; 032 033 /** 034 * A default implementation of a Durable Topic based 035 * {@link org.activemq.service.MessageContainer} 036 * which acts as an adapter between the {@link org.activemq.service.MessageContainerManager} 037 * requirements and those of the persistent {@link TopicMessageStore} implementations. 038 * 039 * @version $Revision: 1.1.1.1 $ 040 */ 041 public class DurableTopicMessageContainer implements TopicMessageContainer, MessageContainerAdmin { 042 043 private TopicMessageStore messageStore; 044 private String destinationName; 045 private MessageIdentity lastMessageIdentity; 046 private final DurableTopicMessageContainerManager manager; 047 048 public DurableTopicMessageContainer(DurableTopicMessageContainerManager manager, TopicMessageStore messageStore, String destinationName) { 049 this.manager = manager; 050 this.messageStore = messageStore; 051 this.destinationName = destinationName; 052 } 053 054 public String getDestinationName() { 055 return destinationName; 056 } 057 058 public void addMessage(ActiveMQMessage message) throws JMSException { 059 messageStore.addMessage(message); 060 lastMessageIdentity = message.getJMSMessageIdentity(); 061 } 062 063 public void delete(MessageIdentity messageID, MessageAck ack) throws JMSException { 064 // only called in MessagePointer and so shouldn't really delete 065 //messageStore.removeMessage(new MessageIdentity(messageID)); 066 } 067 068 public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException { 069 /** TODO: make more optimal implementation */ 070 return getMessage(messageIdentity) != null; 071 } 072 073 public ActiveMQMessage getMessage(MessageIdentity messageID) throws JMSException { 074 return messageStore.getMessage(messageID); 075 } 076 077 public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException { 078 messageStore.incrementMessageCount(messageIdentity); 079 } 080 081 public void unregisterMessageInterest(MessageIdentity ack) throws JMSException { 082 messageStore.decrementMessageCountAndMaybeDelete(ack); 083 } 084 085 086 public void setLastAcknowledgedMessageID(Subscription subscription, MessageIdentity messageIdentity) throws JMSException { 087 messageStore.setLastAcknowledgedMessageIdentity(subscription.getPersistentKey(), messageIdentity); 088 } 089 090 public void recoverSubscription(final Subscription subscription) throws JMSException { 091 messageStore.recoverSubscription(subscription.getPersistentKey(), lastMessageIdentity, new RecoveryListener() { 092 public void recoverMessage(MessageIdentity messageIdentity) throws JMSException { 093 subscription.addMessage(DurableTopicMessageContainer.this, getMessage(messageIdentity)); 094 } 095 }); 096 } 097 098 public void storeSubscription(ConsumerInfo info, Subscription subscription) throws JMSException { 099 messageStore.setSubscriberEntry(info, subscription.getSubscriptionEntry()); 100 } 101 102 public void start() throws JMSException { 103 lastMessageIdentity = messageStore.getLastestMessageIdentity(); 104 messageStore.start(); 105 } 106 107 public void stop() throws JMSException { 108 messageStore.stop(); 109 } 110 111 /** 112 * @see org.activemq.service.MessageContainer#getMessageContainerAdmin() 113 */ 114 public MessageContainerAdmin getMessageContainerAdmin() { 115 return this; 116 } 117 118 /** 119 * @see org.activemq.service.MessageContainerAdmin#empty() 120 */ 121 public void empty() throws JMSException { 122 if( manager.isConsumerActiveOnDestination(new ActiveMQTopic(destinationName)) ) { 123 messageStore.removeAllMessages(); 124 } else { 125 throw new JMSException("Cannot empty a topic while it is use."); 126 } 127 } 128 129 /** 130 * @see org.activemq.service.MessageContainer#isDeadLetterQueue() 131 */ 132 public boolean isDeadLetterQueue() { 133 return false; 134 } 135 136 public void deleteSubscription(String sub) throws JMSException { 137 messageStore.deleteSubscription(sub); 138 } 139 }