001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.transport.reliable; 020 import java.net.URI; 021 import java.util.List; 022 023 import javax.jms.ExceptionListener; 024 import javax.jms.JMSException; 025 026 import org.activemq.TimeoutExpiredException; 027 import org.activemq.UnsupportedWireFormatException; 028 import org.activemq.io.WireFormat; 029 import org.activemq.message.Packet; 030 import org.activemq.message.PacketListener; 031 import org.activemq.message.Receipt; 032 import org.activemq.message.ReceiptHolder; 033 import org.activemq.transport.TransportChannel; 034 import org.activemq.transport.TransportStatusEvent; 035 import org.activemq.transport.composite.CompositeTransportChannel; 036 import org.apache.commons.logging.Log; 037 import org.apache.commons.logging.LogFactory; 038 039 /** 040 * A Compsite implementation of a TransportChannel 041 * 042 * @version $Revision: 1.1.1.1 $ 043 */ 044 public class ReliableTransportChannel extends CompositeTransportChannel implements PacketListener, ExceptionListener { 045 private static final Log log = LogFactory.getLog(ReliableTransportChannel.class); 046 private Object lock = new Object(); 047 private long keepAliveTimeout = 60000L; 048 049 /** 050 * Construct this transport 051 * 052 * @param wireFormat 053 */ 054 public ReliableTransportChannel(WireFormat wireFormat) { 055 super(wireFormat); 056 setMaximumRetries(0); 057 setEstablishConnectionTimeout(0); 058 setFailureSleepTime(5000L); 059 } 060 061 /** 062 * Construct this transport 063 * 064 * @param wireFormat 065 * @param uris 066 */ 067 public ReliableTransportChannel(WireFormat wireFormat, List uris) { 068 super(wireFormat, uris); 069 setMaximumRetries(0); 070 setEstablishConnectionTimeout(0); 071 setFailureSleepTime(5000L); 072 } 073 074 /** 075 * @return pretty print for this 076 */ 077 public String toString() { 078 return "ReliableTransportChannel: " + (channel == null ? "No active channel" : channel.toString()); 079 } 080 081 /** 082 * Sets the number of milliseconds this channel can be idle after a keep-alive packet 083 * has been sent without being disconnected. 084 * 085 * @param timeoutInterval the timeout interval 086 */ 087 public void setKeepAliveTimeout(long timeoutInterval) { 088 this.keepAliveTimeout = timeoutInterval; 089 } 090 091 public long getKeepAliveTimeout() { 092 return keepAliveTimeout; 093 } 094 095 /** 096 * @param packet 097 * @param timeout 098 * @return receipt - or null 099 * @throws JMSException 100 */ 101 public Receipt send(Packet packet, int timeout) throws JMSException { 102 do { 103 TransportChannel tc = getEstablishedChannel(timeout); 104 if (tc != null) { 105 try { 106 return tc.send(packet, timeout); 107 } 108 catch (TimeoutExpiredException e) { 109 throw e; 110 } 111 catch (UnsupportedWireFormatException uwf) { 112 throw uwf; 113 } 114 catch (JMSException jmsEx) { 115 if (isPendingStop()) { 116 break; 117 } 118 doReconnect(tc, timeout); 119 } 120 } 121 } 122 while (!closed.get() && !isPendingStop()); 123 return null; 124 } 125 126 /** 127 * @param packet 128 * @throws JMSException 129 */ 130 public void asyncSend(Packet packet) throws JMSException { 131 long timeout = getEstablishConnectionTimeout(); 132 do { 133 TransportChannel tc = getEstablishedChannel(timeout); 134 if (tc != null) { 135 try { 136 tc.asyncSend(packet); 137 break; 138 } 139 catch (TimeoutExpiredException e) { 140 throw e; 141 } 142 catch (UnsupportedWireFormatException uwf) { 143 throw uwf; 144 } 145 catch (JMSException jmsEx) { 146 if (isPendingStop()) { 147 break; 148 } 149 doReconnect(tc, timeout); 150 } 151 } 152 } 153 while (!closed.get() && !isPendingStop()); 154 } 155 156 public ReceiptHolder asyncSendWithReceipt(Packet packet) throws JMSException { 157 long timeout = getEstablishConnectionTimeout(); 158 do { 159 TransportChannel tc = getEstablishedChannel(timeout); 160 if (tc != null) { 161 try { 162 return tc.asyncSendWithReceipt(packet); 163 } 164 catch (TimeoutExpiredException e) { 165 throw e; 166 } 167 catch (UnsupportedWireFormatException uwf) { 168 throw uwf; 169 } 170 catch (JMSException jmsEx) { 171 if (isPendingStop()) { 172 break; 173 } 174 doReconnect(tc, timeout); 175 } 176 } 177 } 178 while (!closed.get() && !isPendingStop()); 179 return null; 180 } 181 182 protected void configureChannel() { 183 channel.setPacketListener(this); 184 channel.setExceptionListener(this); 185 channel.addTransportStatusEventListener(this); 186 } 187 188 protected URI extractURI(List list) throws JMSException { 189 int idx = 0; 190 if (list.size() > 1) { 191 SMLCGRandom rand = new SMLCGRandom(); 192 do { 193 idx = (int) (rand.nextDouble() * list.size()); 194 } 195 while (idx < 0 || idx >= list.size()); 196 } 197 Object answer = list.remove(idx); 198 if (answer instanceof URI) { 199 return (URI) answer; 200 } 201 else { 202 log.error("#### got: " + answer + " of type: " + answer.getClass()); 203 return null; 204 } 205 } 206 207 /** 208 * consume a packet from the enbedded channel 209 * 210 * @param packet to consume 211 */ 212 public void consume(Packet packet) { 213 //do processing 214 //avoid a lock 215 PacketListener listener = getPacketListener(); 216 if (listener != null) { 217 listener.consume(packet); 218 } 219 } 220 221 /** 222 * handle exception from the embedded channel 223 * 224 * @param jmsEx 225 */ 226 public void onException(JMSException jmsEx) { 227 TransportChannel tc = this.channel; 228 if (jmsEx instanceof UnsupportedWireFormatException) { 229 fireException(jmsEx); 230 } 231 else { 232 try { 233 doReconnect(tc, getEstablishConnectionTimeout()); 234 } 235 catch (JMSException ex) { 236 ex.setLinkedException(jmsEx); 237 fireException(ex); 238 } 239 } 240 } 241 242 /** 243 * stop this channel 244 */ 245 public void stop() { 246 super.stop(); 247 fireStatusEvent(super.currentURI, TransportStatusEvent.STOPPED); 248 } 249 250 /** 251 * Fire a JMSException to the exception listener 252 * 253 * @param jmsEx 254 */ 255 protected void fireException(JMSException jmsEx) { 256 ExceptionListener listener = getExceptionListener(); 257 if (listener != null) { 258 listener.onException(jmsEx); 259 } 260 } 261 262 protected TransportChannel getEstablishedChannel(long timeout) throws JMSException { 263 if (!closed.get() && this.channel == null && !isPendingStop()) { 264 establishConnection(timeout); 265 } 266 return this.channel; 267 } 268 269 protected void doReconnect(TransportChannel currentChannel, long timeout) throws JMSException { 270 setTransportConnected(false); 271 if (!closed.get() && !isPendingStop()) { 272 synchronized (lock) { 273 //Loss of connectivity can be signalled from more than one 274 //thread - hence the check here - we want to avoid doing it more than once 275 if (this.channel == currentChannel) { 276 fireStatusEvent(super.currentURI, TransportStatusEvent.DISCONNECTED); 277 try { 278 establishConnection(timeout); 279 } 280 catch (JMSException jmsEx) { 281 fireStatusEvent(super.currentURI, TransportStatusEvent.FAILED); 282 throw jmsEx; 283 } 284 setTransportConnected(true); 285 fireStatusEvent(super.currentURI, TransportStatusEvent.RECONNECTED); 286 } 287 } 288 } 289 } 290 291 protected void doClose() { 292 if (!closed.get()) { 293 if (!isPendingStop()) { 294 setPendingStop(true); 295 stop(); 296 } 297 closed.set(true); 298 } 299 } 300 }