001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.service.impl; 019 020 import java.util.Map; 021 022 import javax.transaction.xa.XAException; 023 import javax.transaction.xa.XAResource; 024 025 import org.apache.commons.logging.Log; 026 import org.apache.commons.logging.LogFactory; 027 import org.activemq.message.ActiveMQXid; 028 import org.activemq.store.TransactionStore; 029 030 /** 031 * @version $Revision: 1.1.1.1 $ 032 */ 033 public class XATransactionCommand extends AbstractTransaction { 034 private static final Log log = LogFactory.getLog(TransactionManagerImpl.class); 035 036 private ActiveMQXid xid; 037 private transient Map xaTxs; 038 private transient TransactionStore transactionStore; 039 040 public XATransactionCommand(ActiveMQXid xid, Map xaTxs, TransactionStore transactionStore) { 041 this.xid = xid; 042 this.xaTxs = xaTxs; 043 this.transactionStore = transactionStore; 044 } 045 046 047 /** 048 * Called after the transaction command has been recovered from disk 049 * 050 * @param xaTxs 051 * @param preparedTransactions 052 */ 053 public void initialise(Map xaTxs, TransactionStore preparedTransactions) { 054 this.xaTxs = xaTxs; 055 this.transactionStore = preparedTransactions; 056 } 057 058 public void commit(boolean onePhase) throws XAException { 059 if(log.isDebugEnabled()) 060 log.debug("XA Transaction commit: "+xid); 061 062 switch (getState()) { 063 case START_STATE: 064 // 1 phase commit, no work done. 065 checkForPreparedState(onePhase); 066 setStateFinished(); 067 break; 068 case IN_USE_STATE: 069 // 1 phase commit, work done. 070 checkForPreparedState(onePhase); 071 doPrePrepare(); 072 setStateFinished(); 073 transactionStore.commit(getTransactionId(), false); 074 doPostCommit(); 075 break; 076 case PREPARED_STATE: 077 // 2 phase commit, work done. 078 // We would record commit here. 079 setStateFinished(); 080 transactionStore.commit(getTransactionId(), true); 081 doPostCommit(); 082 break; 083 default: 084 illegalStateTransition("commit"); 085 } 086 } 087 088 private void illegalStateTransition(String callName) throws XAException { 089 XAException xae = new XAException("Cannot call " + callName + " now."); 090 xae.errorCode = XAException.XAER_PROTO; 091 throw xae; 092 } 093 094 private void checkForPreparedState(boolean onePhase) throws XAException { 095 if (!onePhase) { 096 XAException xae = new XAException("Cannot do 2 phase commit if the transaction has not been prepared."); 097 xae.errorCode = XAException.XAER_PROTO; 098 throw xae; 099 } 100 } 101 102 private void doPrePrepare() throws XAException { 103 try { 104 prePrepare(); 105 } catch (XAException e) { 106 throw e; 107 } catch (Throwable e) { 108 log.warn("PRE-PREPARE FAILED: ", e); 109 rollback(); 110 XAException xae = new XAException("PRE-PREPARE FAILED: Transaction rolled back."); 111 xae.errorCode = XAException.XA_RBOTHER; 112 xae.initCause(e); 113 throw xae; 114 } 115 } 116 117 private void doPostCommit() throws XAException { 118 try { 119 postCommit(); 120 } 121 catch (Throwable e) { 122 // I guess this could happen. Post commit task failed 123 // to execute properly. 124 log.warn("POST COMMIT FAILED: ", e); 125 XAException xae = new XAException("POST COMMIT FAILED"); 126 xae.errorCode = XAException.XAER_RMERR; 127 xae.initCause(e); 128 throw xae; 129 } 130 } 131 132 public void rollback() throws XAException { 133 134 if(log.isDebugEnabled()) 135 log.debug("XA Transaction rollback: "+xid); 136 137 switch (getState()) { 138 case START_STATE: 139 // 1 phase rollback no work done. 140 setStateFinished(); 141 break; 142 case IN_USE_STATE: 143 // 1 phase rollback work done. 144 setStateFinished(); 145 transactionStore.rollback(getTransactionId()); 146 doPostRollback(); 147 break; 148 case PREPARED_STATE: 149 // 2 phase rollback work done. 150 setStateFinished(); 151 transactionStore.rollback(getTransactionId()); 152 doPostRollback(); 153 break; 154 } 155 156 } 157 158 private void doPostRollback() throws XAException { 159 try { 160 postRollback(); 161 } 162 catch (Throwable e) { 163 // I guess this could happen. Post commit task failed 164 // to execute properly. 165 log.warn("POST ROLLBACK FAILED: ", e); 166 XAException xae = new XAException("POST ROLLBACK FAILED"); 167 xae.errorCode = XAException.XAER_RMERR; 168 xae.initCause(e); 169 throw xae; 170 } 171 } 172 173 public int prepare() throws XAException { 174 if(log.isDebugEnabled()) 175 log.debug("XA Transaction prepare: "+xid); 176 177 switch (getState()) { 178 case START_STATE: 179 // No work done.. no commit/rollback needed. 180 setStateFinished(); 181 return XAResource.XA_RDONLY; 182 case IN_USE_STATE: 183 // We would record prepare here. 184 doPrePrepare(); 185 setState(AbstractTransaction.PREPARED_STATE); 186 transactionStore.prepare(getTransactionId()); 187 return XAResource.XA_OK; 188 default : 189 illegalStateTransition("prepare"); 190 return XAResource.XA_RDONLY; 191 } 192 } 193 194 private void setStateFinished() { 195 setState(AbstractTransaction.FINISHED_STATE); 196 xaTxs.remove(xid); 197 } 198 199 public boolean isXaTransacted() { 200 return true; 201 } 202 203 public Object getTransactionId() { 204 return xid; 205 } 206 }