001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.transport.reliable;
020    import java.net.URI;
021    import java.util.List;
022    
023    import javax.jms.ExceptionListener;
024    import javax.jms.JMSException;
025    
026    import org.activemq.TimeoutExpiredException;
027    import org.activemq.UnsupportedWireFormatException;
028    import org.activemq.io.WireFormat;
029    import org.activemq.message.Packet;
030    import org.activemq.message.PacketListener;
031    import org.activemq.message.Receipt;
032    import org.activemq.message.ReceiptHolder;
033    import org.activemq.transport.TransportChannel;
034    import org.activemq.transport.TransportStatusEvent;
035    import org.activemq.transport.composite.CompositeTransportChannel;
036    import org.apache.commons.logging.Log;
037    import org.apache.commons.logging.LogFactory;
038    
039    /**
040     * A Compsite implementation of a TransportChannel
041     * 
042     * @version $Revision: 1.1.1.1 $
043     */
044    public class ReliableTransportChannel extends CompositeTransportChannel implements PacketListener, ExceptionListener {
045        private static final Log log = LogFactory.getLog(ReliableTransportChannel.class);
046        private Object lock = new Object();
047        private long keepAliveTimeout = 60000L;
048    
049        /**
050         * Construct this transport
051         * 
052         * @param wireFormat
053         */
054        public ReliableTransportChannel(WireFormat wireFormat) {
055            super(wireFormat);
056            setMaximumRetries(0);
057            setEstablishConnectionTimeout(0);
058            setFailureSleepTime(5000L);
059        }
060    
061        /**
062         * Construct this transport
063         * 
064         * @param wireFormat
065         * @param uris
066         */
067        public ReliableTransportChannel(WireFormat wireFormat, List uris) {
068            super(wireFormat, uris);
069            setMaximumRetries(0);
070            setEstablishConnectionTimeout(0);
071            setFailureSleepTime(5000L);
072        }
073    
074        /**
075         * @return pretty print for this
076         */
077        public String toString() {
078            return "ReliableTransportChannel: " + (channel == null ? "No active channel" : channel.toString());
079        }
080    
081            /**
082             * Sets the number of milliseconds this channel can be idle after a keep-alive packet
083             * has been sent without being disconnected.
084             * 
085             * @param timeoutInterval the timeout interval
086             */
087            public void setKeepAliveTimeout(long timeoutInterval) {
088                    this.keepAliveTimeout = timeoutInterval;
089            }
090            
091            public long getKeepAliveTimeout() {
092                    return keepAliveTimeout;
093            }
094            
095        /**
096         * @param packet
097         * @param timeout
098         * @return receipt - or null
099         * @throws JMSException
100         */
101        public Receipt send(Packet packet, int timeout) throws JMSException {
102            do {
103                TransportChannel tc = getEstablishedChannel(timeout);
104                if (tc != null) {
105                    try {
106                        return tc.send(packet, timeout);
107                    }
108                    catch (TimeoutExpiredException e) {
109                        throw e;
110                    }
111                    catch (UnsupportedWireFormatException uwf) {
112                        throw uwf;
113                    }
114                    catch (JMSException jmsEx) {
115                        if (isPendingStop()) {
116                            break;
117                        }
118                        doReconnect(tc, timeout);
119                    }
120                }
121            }
122            while (!closed.get() && !isPendingStop());
123            return null;
124        }
125    
126        /**
127         * @param packet
128         * @throws JMSException
129         */
130        public void asyncSend(Packet packet) throws JMSException {
131            long timeout = getEstablishConnectionTimeout();
132            do {
133                TransportChannel tc = getEstablishedChannel(timeout);
134                if (tc != null) {
135                    try {
136                        tc.asyncSend(packet);
137                        break;
138                    }
139                    catch (TimeoutExpiredException e) {
140                        throw e;
141                    }
142                    catch (UnsupportedWireFormatException uwf) {
143                        throw uwf;
144                    }
145                    catch (JMSException jmsEx) {
146                        if (isPendingStop()) {
147                            break;
148                        }
149                        doReconnect(tc, timeout);
150                    }
151                }
152            }
153            while (!closed.get() && !isPendingStop());
154        }
155        
156        public ReceiptHolder asyncSendWithReceipt(Packet packet) throws JMSException {
157            long timeout = getEstablishConnectionTimeout();
158                    do {
159                TransportChannel tc = getEstablishedChannel(timeout);
160                if (tc != null) {
161                    try {
162                        return tc.asyncSendWithReceipt(packet);
163                    }
164                    catch (TimeoutExpiredException e) {
165                        throw e;
166                    }
167                    catch (UnsupportedWireFormatException uwf) {
168                        throw uwf;
169                    }
170                    catch (JMSException jmsEx) {
171                        if (isPendingStop()) {
172                            break;
173                        }
174                        doReconnect(tc, timeout);
175                    }
176                }
177            }
178            while (!closed.get() && !isPendingStop());
179            return null;
180        }
181        
182        protected void configureChannel() {
183            channel.setPacketListener(this);
184            channel.setExceptionListener(this);
185            channel.addTransportStatusEventListener(this);
186        }
187    
188        protected URI extractURI(List list) throws JMSException {
189            int idx = 0;
190            if (list.size() > 1) {
191                SMLCGRandom rand = new SMLCGRandom();
192                do {
193                    idx = (int) (rand.nextDouble() * list.size());
194                }
195                while (idx < 0 || idx >= list.size());
196            }
197            Object answer = list.remove(idx);
198            if (answer instanceof URI) {
199                return (URI) answer;
200            }
201            else {
202                log.error("#### got: " + answer + " of type: " + answer.getClass());
203                return null;
204            }
205        }
206    
207        /**
208         * consume a packet from the enbedded channel
209         * 
210         * @param packet to consume
211         */
212        public void consume(Packet packet) {
213            //do processing
214            //avoid a lock
215            PacketListener listener = getPacketListener();
216            if (listener != null) {
217                listener.consume(packet);
218            }
219        }
220    
221        /**
222         * handle exception from the embedded channel
223         * 
224         * @param jmsEx
225         */
226        public void onException(JMSException jmsEx) {
227            TransportChannel tc = this.channel;
228            if (jmsEx instanceof UnsupportedWireFormatException) {
229                fireException(jmsEx);
230            }
231            else {
232                try {
233                    doReconnect(tc, getEstablishConnectionTimeout());
234                }
235                catch (JMSException ex) {
236                    ex.setLinkedException(jmsEx);
237                    fireException(ex);
238                }
239            }
240        }
241    
242        /**
243         * stop this channel
244         */
245        public void stop() {
246            super.stop();
247            fireStatusEvent(super.currentURI, TransportStatusEvent.STOPPED);
248        }
249    
250        /**
251         * Fire a JMSException to the exception listener
252         * 
253         * @param jmsEx
254         */
255        protected void fireException(JMSException jmsEx) {
256            ExceptionListener listener = getExceptionListener();
257            if (listener != null) {
258                listener.onException(jmsEx);
259            }
260        }
261    
262        protected TransportChannel getEstablishedChannel(long timeout) throws JMSException {
263            if (!closed.get() && this.channel == null && !isPendingStop()) {
264                establishConnection(timeout);
265            }
266            return this.channel;
267        }
268    
269        protected void doReconnect(TransportChannel currentChannel, long timeout) throws JMSException {
270            setTransportConnected(false);
271            if (!closed.get() && !isPendingStop()) {
272                synchronized (lock) {
273                    //Loss of connectivity can be signalled from more than one
274                    //thread - hence the check here - we want to avoid doing it more than once
275                    if (this.channel == currentChannel) {
276                        fireStatusEvent(super.currentURI, TransportStatusEvent.DISCONNECTED);
277                        try {
278                            establishConnection(timeout);
279                        }
280                        catch (JMSException jmsEx) {
281                            fireStatusEvent(super.currentURI, TransportStatusEvent.FAILED);
282                            throw jmsEx;
283                        }
284                        setTransportConnected(true);
285                        fireStatusEvent(super.currentURI, TransportStatusEvent.RECONNECTED);
286                    }
287                }
288            }
289        }
290        
291        protected void doClose() {
292                if (!closed.get()) {
293                        if (!isPendingStop()) {
294                                setPendingStop(true);
295                                stop();
296                        }
297                closed.set(true);
298                }
299        }
300    }