001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.transport.http;
019    
020    import org.apache.commons.logging.Log;
021    import org.apache.commons.logging.LogFactory;
022    import org.activemq.io.TextWireFormat;
023    import org.activemq.message.Packet;
024    import org.activemq.util.Callback;
025    import org.activemq.util.ExceptionTemplate;
026    import org.activemq.util.JMSExceptionHelper;
027    
028    import javax.jms.JMSException;
029    import java.io.DataInputStream;
030    import java.io.IOException;
031    import java.io.OutputStreamWriter;
032    import java.io.Writer;
033    import java.net.HttpURLConnection;
034    import java.net.MalformedURLException;
035    import java.net.URL;
036    
037    /**
038     * @version $Revision$
039     */
040    public class HttpTransportChannel extends HttpTransportChannelSupport {
041        private static final Log log = LogFactory.getLog(HttpTransportChannel.class);
042        private URL url;
043        private HttpURLConnection sendConnection;
044        private HttpURLConnection receiveConnection;
045    
046    
047        public HttpTransportChannel(TextWireFormat wireFormat, String remoteUrl) throws MalformedURLException {
048            super(wireFormat, remoteUrl);
049            url = new URL(remoteUrl);
050        }
051    
052        public void asyncSend(Packet packet) throws JMSException {
053            try {
054                
055                HttpURLConnection connection = getSendConnection();
056                String text = getTextWireFormat().toString(packet);
057                Writer writer = new OutputStreamWriter(connection.getOutputStream());
058                writer.write(text);
059                writer.flush();
060                int answer = connection.getResponseCode();
061                if (answer != HttpURLConnection.HTTP_OK) {
062                    throw new JMSException("Failed to post packet: " + packet + " as response was: " + answer);
063                }
064            }
065            catch (IOException e) {
066                throw JMSExceptionHelper.newJMSException("Could not post packet: " + packet + " due to: " + e, e);
067            }
068        }
069    
070        public void stop() {
071            ExceptionTemplate template = new ExceptionTemplate();
072            if (sendConnection != null) {
073                template.run(new Callback() {
074                    public void execute() throws Throwable {
075                        sendConnection.disconnect();
076                    }
077                });
078            }
079            if (receiveConnection != null) {
080                template.run(new Callback() {
081                    public void execute() throws Throwable {
082                        receiveConnection.disconnect();
083                    }
084                });
085            }
086            super.stop();
087            Throwable firstException = template.getFirstException();
088            if (firstException != null) {
089                log.warn("Failed to shut down cleanly: " + firstException, firstException);
090            }
091        }
092    
093        public boolean isMulticast() {
094            return false;
095        }
096    
097        public void run() {
098            log.trace("HTTP GET consumer thread starting for clientID: " + getClientID());
099            String remoteUrl = getRemoteUrl();
100            while (!getClosed().get()) {
101                try {
102                    HttpURLConnection connection = getReceiveConnection();
103                    int answer = connection.getResponseCode();
104                    if (answer != HttpURLConnection.HTTP_OK) {
105                        if (answer == HttpURLConnection.HTTP_CLIENT_TIMEOUT) {
106                            log.trace("GET timed out");
107                        }
108                        else {
109                            log.warn("Failed to perform GET on: " + remoteUrl + " as response was: " + answer);
110                        }
111                    }
112                    else {
113                        Packet packet = getWireFormat().readPacket(new DataInputStream(connection.getInputStream()));
114                        //Packet packet = getWireFormat().fromString(connection.getContent().toString());
115                        if (packet == null) {
116                            log.warn("Received null packet from url: " + remoteUrl);
117                        }
118                        else {
119                            doConsumePacket(packet);
120                        }
121                    }
122                }
123                catch (Exception e) {
124                    if (!getClosed().get()) {
125                        log.warn("Failed to perform GET on: " + remoteUrl + " due to: " + e, e);
126                    }
127                    else {
128                        log.trace("Caught error after closed: " + e, e);
129                    }
130                }
131            }
132        }
133    
134    
135        // Implementation methods
136        //-------------------------------------------------------------------------
137        protected HttpURLConnection createSendConnection() throws IOException {
138            HttpURLConnection conn = (HttpURLConnection) getRemoteURL().openConnection();
139            conn.setDoOutput(true);
140            conn.setRequestMethod("POST");
141            configureConnection(conn);
142            conn.connect();
143            return conn;
144        }
145    
146        protected HttpURLConnection createReceiveConnection() throws IOException {
147            HttpURLConnection conn = (HttpURLConnection) getRemoteURL().openConnection();
148            conn.setDoOutput(false);
149            conn.setDoInput(true);
150            conn.setRequestMethod("GET");
151            configureConnection(conn);
152            conn.connect();
153            return conn;
154        }
155    
156        protected void configureConnection(HttpURLConnection connection) {
157            String clientID = getClientID();
158            if (clientID != null) {
159                connection.setRequestProperty("clientID", clientID);
160                //connection.addRequestProperty("clientID", clientID);
161            }
162        }
163        
164        protected URL getRemoteURL() {
165            return url;
166        }
167        
168        protected HttpURLConnection getSendConnection() throws IOException {
169            setSendConnection( createSendConnection() );
170            return sendConnection;
171        }
172        
173        protected HttpURLConnection getReceiveConnection() throws IOException {
174            setReceiveConnection( createReceiveConnection() );
175            return receiveConnection;
176        }
177        
178        protected void setSendConnection( HttpURLConnection conn ) {
179            if ( sendConnection != null ) {
180                    sendConnection.disconnect();
181            }
182            sendConnection = conn;
183        }
184        
185        protected void setReceiveConnection( HttpURLConnection conn ) {
186            if ( receiveConnection != null ) {
187                    receiveConnection.disconnect();
188            }
189            receiveConnection = conn;
190        }
191        
192            public void forceDisconnect() {
193                // TODO: implement me.
194                    throw new RuntimeException("Not yet Implemented.");
195            }
196        
197    }