001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.store.jdbc; 019 020 import java.io.IOException; 021 import java.sql.Connection; 022 import java.sql.SQLException; 023 024 import javax.jms.JMSException; 025 026 import org.activemq.io.WireFormat; 027 import org.activemq.message.ActiveMQMessage; 028 import org.activemq.message.MessageAck; 029 import org.activemq.service.MessageIdentity; 030 import org.activemq.store.MessageStore; 031 import org.activemq.store.RecoveryListener; 032 import org.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler; 033 import org.activemq.util.JMSExceptionHelper; 034 import org.activemq.util.LongSequenceGenerator; 035 036 /** 037 * @version $Revision: 1.1 $ 038 */ 039 public class JDBCMessageStore implements MessageStore { 040 041 protected final WireFormat wireFormat; 042 protected final String destinationName; 043 protected final LongSequenceGenerator sequenceGenerator; 044 protected final JDBCAdapter adapter; 045 protected final JDBCPersistenceAdapter persistenceAdapter; 046 047 public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, String destinationName) { 048 this.persistenceAdapter = persistenceAdapter; 049 this.adapter = adapter; 050 this.sequenceGenerator = adapter.getSequenceGenerator(); 051 this.wireFormat = wireFormat; 052 this.destinationName = destinationName; 053 } 054 055 public void addMessage(ActiveMQMessage message) throws JMSException { 056 057 // Serialize the Message.. 058 String messageID = message.getJMSMessageID(); 059 byte data[]; 060 try { 061 data = wireFormat.toBytes(message); 062 } catch (IOException e) { 063 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e); 064 } 065 066 long seq=sequenceGenerator.getNextSequenceId(); 067 068 // Get a connection and insert the message into the DB. 069 Connection c = null; 070 try { 071 c = persistenceAdapter.getConnection(); 072 adapter.doAddMessage(c, seq, messageID, destinationName, data, message.getJMSExpiration()); 073 } catch (SQLException e) { 074 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e); 075 } finally { 076 persistenceAdapter.returnConnection(c); 077 } 078 079 MessageIdentity answer = message.getJMSMessageIdentity(); 080 answer.setSequenceNumber(new Long(seq)); 081 } 082 083 084 public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException { 085 086 long id; 087 try { 088 id = getMessageSequenceId(identity); 089 } catch (JMSException e1) { 090 return null; 091 } 092 093 // Get a connection and pull the message out of the DB 094 Connection c = null; 095 try { 096 c = persistenceAdapter.getConnection(); 097 byte data[] = adapter.doGetMessage(c, id); 098 if( data==null ) 099 return null; 100 101 ActiveMQMessage answer = (ActiveMQMessage) wireFormat.fromBytes(data);; 102 answer.setJMSMessageID(identity.getMessageID()); 103 answer.setJMSMessageIdentity(identity); 104 return answer; 105 } catch (IOException e) { 106 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e); 107 } catch (SQLException e) { 108 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e); 109 } finally { 110 persistenceAdapter.returnConnection(c); 111 } 112 } 113 114 /** 115 * @param identity 116 * @return 117 * @throws JMSException 118 */ 119 protected long getMessageSequenceId(MessageIdentity identity) throws JMSException { 120 Object sequenceNumber = identity.getSequenceNumber(); 121 if (sequenceNumber != null && sequenceNumber.getClass() == Long.class) { 122 return ((Long) sequenceNumber).longValue(); 123 } else { 124 // Get a connection and pull the message out of the DB 125 Connection c = null; 126 try { 127 c = persistenceAdapter.getConnection(); 128 Long rc = adapter.getMessageSequenceId(c, identity.getMessageID()); 129 if (rc == null) 130 throw new JMSException("Could not locate message in database with message id: " 131 + identity.getMessageID()); 132 return rc.longValue(); 133 } catch (SQLException e) { 134 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() 135 + " in container: " + e, e); 136 } finally { 137 persistenceAdapter.returnConnection(c); 138 } 139 } 140 } 141 142 public void removeMessage(MessageAck ack) throws JMSException { 143 long seq = getMessageSequenceId(ack.getMessageIdentity()); 144 145 // Get a connection and remove the message from the DB 146 Connection c = null; 147 try { 148 c = persistenceAdapter.getConnection(); 149 adapter.doRemoveMessage(c, seq); 150 } catch (SQLException e) { 151 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + ack.getMessageID() + " in container: " + e, e); 152 } finally { 153 persistenceAdapter.returnConnection(c); 154 } 155 } 156 157 158 public void recover(final RecoveryListener listener) throws JMSException { 159 160 // Get all the Message ids out of the database. 161 Connection c = null; 162 try { 163 c = persistenceAdapter.getConnection(); 164 adapter.doRecover(c, destinationName, new MessageListResultHandler() { 165 public void onMessage(long seq, String messageID) throws JMSException { 166 listener.recoverMessage(new MessageIdentity(messageID, new Long(seq))); 167 } 168 }); 169 170 } catch (SQLException e) { 171 throw JMSExceptionHelper.newJMSException("Failed to recover container. Reason: " + e, e); 172 } finally { 173 persistenceAdapter.returnConnection(c); 174 } 175 } 176 177 public void start() throws JMSException { 178 } 179 180 public void stop() throws JMSException { 181 } 182 183 /** 184 * @see org.activemq.store.MessageStore#removeAllMessages() 185 */ 186 public void removeAllMessages() throws JMSException { 187 // Get a connection and remove the message from the DB 188 Connection c = null; 189 try { 190 c = persistenceAdapter.getConnection(); 191 adapter.doRemoveAllMessages(c, destinationName); 192 } catch (SQLException e) { 193 throw JMSExceptionHelper.newJMSException("Failed to broker remove all messages: " + e, e); 194 } finally { 195 persistenceAdapter.returnConnection(c); 196 } 197 } 198 }