001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.store.journal; 020 021 import java.io.IOException; 022 import java.util.ArrayList; 023 import java.util.Iterator; 024 025 import javax.jms.JMSException; 026 import javax.transaction.xa.XAException; 027 028 import org.activeio.journal.RecordLocation; 029 import org.activemq.message.ActiveMQMessage; 030 import org.activemq.message.ActiveMQXid; 031 import org.activemq.message.MessageAck; 032 import org.activemq.store.TransactionStore; 033 import org.apache.derby.iapi.store.raw.xact.TransactionId; 034 035 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 036 037 /** 038 */ 039 public class JournalTransactionStore implements TransactionStore { 040 041 private final JournalPersistenceAdapter peristenceAdapter; 042 ConcurrentHashMap inflightTransactions = new ConcurrentHashMap(); 043 ConcurrentHashMap preparedTransactions = new ConcurrentHashMap(); 044 045 public static class TxOperation { 046 047 static final byte ADD_OPERATION_TYPE = 0; 048 static final byte REMOVE_OPERATION_TYPE = 1; 049 static final byte ACK_OPERATION_TYPE = 3; 050 051 public byte operationType; 052 public JournalMessageStore store; 053 public Object data; 054 055 public TxOperation(byte operationType, JournalMessageStore store, Object data) { 056 this.operationType=operationType; 057 this.store=store; 058 this.data=data; 059 } 060 061 } 062 /** 063 * Operations 064 * @version $Revision: 1.3 $ 065 */ 066 public static class Tx { 067 068 private final RecordLocation location; 069 private ArrayList operations = new ArrayList(); 070 071 public Tx(RecordLocation location) { 072 this.location=location; 073 } 074 075 public void add(JournalMessageStore store, ActiveMQMessage msg) { 076 operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg)); 077 } 078 079 public void add(JournalMessageStore store, MessageAck ack) { 080 operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack)); 081 } 082 083 public void add(JournalTopicMessageStore store, JournalAck ack) { 084 operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack)); 085 } 086 087 public ActiveMQMessage[] getMessages() { 088 ArrayList list = new ArrayList(); 089 for (Iterator iter = operations.iterator(); iter.hasNext();) { 090 TxOperation op = (TxOperation) iter.next(); 091 if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) { 092 list.add(op.data); 093 } 094 } 095 ActiveMQMessage rc[] = new ActiveMQMessage[list.size()]; 096 list.toArray(rc); 097 return rc; 098 } 099 100 public MessageAck[] getAcks() { 101 ArrayList list = new ArrayList(); 102 for (Iterator iter = operations.iterator(); iter.hasNext();) { 103 TxOperation op = (TxOperation) iter.next(); 104 if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) { 105 list.add(op.data); 106 } 107 } 108 MessageAck rc[] = new MessageAck[list.size()]; 109 list.toArray(rc); 110 return rc; 111 } 112 113 public ArrayList getOperations() { 114 return operations; 115 } 116 117 } 118 119 public interface AddMessageCommand { 120 ActiveMQMessage getMessage(); 121 122 void run() throws IOException; 123 } 124 125 public interface RemoveMessageCommand { 126 MessageAck getMessageAck(); 127 128 void run() throws IOException; 129 } 130 131 public JournalTransactionStore(JournalPersistenceAdapter adapter) { 132 this.peristenceAdapter = adapter; 133 } 134 135 /** 136 * @throws XAException 137 * @throws IOException 138 * @see org.activemq.store.TransactionStore#prepare(TransactionId) 139 */ 140 public void prepare(Object txid) throws XAException { 141 Tx tx = (Tx) inflightTransactions.remove(txid); 142 if (tx == null) 143 return; 144 peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.XA_PREPARE, txid, false), true); 145 preparedTransactions.put(txid, tx); 146 } 147 148 /** 149 * @throws IOException 150 * @see org.activemq.store.TransactionStore#prepare(TransactionId) 151 */ 152 public void replayPrepare(Object txid) throws IOException { 153 Tx tx = (Tx) inflightTransactions.remove(txid); 154 if (tx == null) 155 return; 156 preparedTransactions.put(txid, tx); 157 } 158 159 public Tx getTx(Object txid, RecordLocation location) { 160 Tx tx = (Tx) inflightTransactions.get(txid); 161 if (tx == null) { 162 tx = new Tx(location); 163 inflightTransactions.put(txid, tx); 164 } 165 return tx; 166 } 167 168 /** 169 * @throws XAException 170 * @throws XAException 171 * @see org.activemq.store.TransactionStore#commit(org.activemq.service.Transaction) 172 */ 173 public void commit(Object txid, boolean wasPrepared) throws XAException { 174 Tx tx; 175 if (wasPrepared) { 176 tx = (Tx) preparedTransactions.remove(txid); 177 } else { 178 tx = (Tx) inflightTransactions.remove(txid); 179 } 180 181 if (tx == null) 182 return; 183 184 if (txid.getClass() == ActiveMQXid.class ) { 185 peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.XA_COMMIT, txid, wasPrepared), 186 true); 187 } else { 188 peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.LOCAL_COMMIT, txid, wasPrepared), 189 true); 190 } 191 } 192 193 /** 194 * @throws XAException 195 * @see org.activemq.store.TransactionStore#commit(org.activemq.service.Transaction) 196 */ 197 public Tx replayCommit(Object txid, boolean wasPrepared) throws IOException { 198 if (wasPrepared) { 199 return (Tx) preparedTransactions.remove(txid); 200 } else { 201 return (Tx) inflightTransactions.remove(txid); 202 } 203 } 204 205 /** 206 * @throws XAException 207 * @throws IOException 208 * @see org.activemq.store.TransactionStore#rollback(TransactionId) 209 */ 210 public void rollback(Object txid) throws XAException { 211 212 Tx tx = (Tx) inflightTransactions.remove(txid); 213 if (tx != null) 214 tx = (Tx) preparedTransactions.remove(txid); 215 216 if (tx != null) { 217 if (txid.getClass() == ActiveMQXid.class ) { 218 peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.XA_ROLLBACK, txid, false), 219 true); 220 } else { 221 peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.LOCAL_ROLLBACK, txid, false), 222 true); 223 } 224 } 225 226 } 227 228 /** 229 * @throws IOException 230 * @see org.activemq.store.TransactionStore#rollback(TransactionId) 231 */ 232 public void replayRollback(Object txid) throws IOException { 233 Tx tx = (Tx) inflightTransactions.remove(txid); 234 if (tx != null) 235 tx = (Tx) preparedTransactions.remove(txid); 236 } 237 238 synchronized public void recover(RecoveryListener listener) throws XAException { 239 // All the inflight transactions get rolled back.. 240 inflightTransactions.clear(); 241 try { 242 for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) { 243 Object txid = (Object) iter.next(); 244 Tx tx = (Tx) preparedTransactions.get(txid); 245 try { 246 listener.recover((ActiveMQXid) txid,tx.getMessages(), tx.getAcks()); 247 } catch (JMSException e) { 248 throw (XAException)new XAException().initCause(e); 249 } 250 } 251 } finally { 252 } 253 } 254 255 /** 256 * @param message 257 * @throws IOException 258 */ 259 void addMessage(JournalMessageStore store, ActiveMQMessage message, RecordLocation location) { 260 Tx tx = getTx(message.getTransactionId(), location); 261 tx.add(store, message); 262 } 263 264 /** 265 * @param ack 266 * @throws IOException 267 */ 268 public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location) { 269 Tx tx = getTx(ack.getTransactionId(), location); 270 tx.add(store, ack); 271 } 272 273 274 public void acknowledge(JournalTopicMessageStore store, JournalAck ack, RecordLocation location) { 275 Tx tx = getTx(ack.getTransactionId(), location); 276 tx.add(store, ack); 277 } 278 279 280 public RecordLocation checkpoint() throws IOException { 281 282 // Nothing really to checkpoint.. since, we don't 283 // checkpoint tx operations in to long term store until they are committed. 284 285 // But we keep track of the first location of an operation 286 // that was associated with an active tx. The journal can not 287 // roll over active tx records. 288 RecordLocation rc = null; 289 for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) { 290 Tx tx = (Tx) iter.next(); 291 RecordLocation location = tx.location; 292 if (rc == null || rc.compareTo(location) < 0) { 293 rc = location; 294 } 295 } 296 for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) { 297 Tx tx = (Tx) iter.next(); 298 RecordLocation location = tx.location; 299 if (rc == null || rc.compareTo(location) < 0) { 300 rc = location; 301 } 302 } 303 return rc; 304 } 305 306 public void start() throws JMSException { 307 } 308 309 public void stop() throws JMSException { 310 } 311 312 313 }