001 /* 002 * Copyright (c) 2005 Your Corporation. All Rights Reserved. 003 */ 004 package org.activemq.transport.stomp; 005 006 import EDU.oswego.cs.dl.util.concurrent.Channel; 007 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 008 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList; 009 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 010 import org.activemq.io.WireFormat; 011 import org.activemq.message.ActiveMQDestination; 012 import org.activemq.message.ActiveMQTextMessage; 013 import org.activemq.message.ConnectionInfo; 014 import org.activemq.message.ConsumerInfo; 015 import org.activemq.message.Packet; 016 import org.activemq.message.Receipt; 017 import org.activemq.message.SessionInfo; 018 import org.activemq.message.ActiveMQBytesMessage; 019 import org.activemq.util.IdGenerator; 020 021 import javax.jms.JMSException; 022 import javax.jms.Session; 023 import java.io.BufferedReader; 024 import java.io.DataInput; 025 import java.io.DataInputStream; 026 import java.io.DataOutput; 027 import java.io.DataOutputStream; 028 import java.io.IOException; 029 import java.io.InputStreamReader; 030 import java.net.DatagramPacket; 031 import java.net.ProtocolException; 032 import java.util.List; 033 import java.util.Map; 034 import java.util.Properties; 035 036 /** 037 * Implements the TTMP protocol. 038 */ 039 public class StompWireFormat implements WireFormat 040 { 041 042 static final IdGenerator PACKET_IDS = new IdGenerator(); 043 static final IdGenerator clientIds = new IdGenerator(); 044 045 private CommandParser commandParser = new CommandParser(this); 046 private HeaderParser headerParser = new HeaderParser(); 047 048 private DataInputStream in; 049 050 private String clientId; 051 052 private Channel pendingReadPackets = new LinkedQueue(); 053 private Channel pendingWriteFrames = new LinkedQueue(); 054 private List receiptListeners = new CopyOnWriteArrayList(); 055 private short sessionId; 056 private Map subscriptions = new ConcurrentHashMap(); 057 private List ackListeners = new CopyOnWriteArrayList(); 058 private final Map transactions = new ConcurrentHashMap(); 059 060 061 062 void addReceiptListener(ReceiptListener listener) 063 { 064 receiptListeners.add(listener); 065 } 066 067 068 public Packet readPacket(DataInput in) throws IOException 069 { 070 Packet pending = (Packet) AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.HelperWithReturn() 071 { 072 public Object cycle() throws InterruptedException 073 { 074 return pendingReadPackets.poll(0); 075 } 076 }); 077 if (pending != null) 078 { 079 return pending; 080 } 081 082 try 083 { 084 return commandParser.parse(in); 085 } 086 catch (ProtocolException e) 087 { 088 sendError(e.getMessage()); 089 return FlushPacket.PACKET; 090 } 091 } 092 093 public Packet writePacket(final Packet packet, final DataOutput out) throws IOException, JMSException 094 { 095 flushPendingFrames(out); 096 097 // It may have just been a flush request. 098 if( packet == null ) 099 return null; 100 101 if (packet.getPacketType() == Packet.RECEIPT_INFO) 102 { 103 assert(packet instanceof Receipt); 104 Receipt receipt = (Receipt) packet; 105 for (int i = 0; i < receiptListeners.size(); i++) 106 { 107 ReceiptListener listener = (ReceiptListener) receiptListeners.get(i); 108 if (listener.onReceipt(receipt, out)) 109 { 110 receiptListeners.remove(listener); 111 return null; 112 } 113 } 114 } 115 116 if (packet.getPacketType() == Packet.ACTIVEMQ_TEXT_MESSAGE) 117 { 118 assert(packet instanceof ActiveMQTextMessage); 119 ActiveMQTextMessage msg = (ActiveMQTextMessage) packet; 120 Subscription sub = (Subscription) subscriptions.get(msg.getJMSDestination()); 121 sub.receive(msg, out); 122 } 123 else if (packet.getPacketType() == Packet.ACTIVEMQ_BYTES_MESSAGE) 124 { 125 assert(packet instanceof ActiveMQBytesMessage); 126 ActiveMQBytesMessage msg = (ActiveMQBytesMessage) packet; 127 Subscription sub = (Subscription) subscriptions.get(msg.getJMSDestination()); 128 sub.receive(msg, out); 129 } 130 return null; 131 } 132 133 private void flushPendingFrames(final DataOutput out) throws IOException 134 { 135 boolean interrupted = false; 136 do 137 { 138 try 139 { 140 byte[] frame = (byte[]) pendingWriteFrames.poll(0); 141 if (frame == null) return; 142 out.write(frame); 143 } 144 catch (InterruptedException e) 145 { 146 interrupted = true; 147 } 148 } 149 while (interrupted); 150 } 151 152 private void sendError(final String message) 153 { 154 // System.err.println("sending error [" + message + "]"); 155 AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() 156 { 157 public void cycle() throws InterruptedException 158 { 159 pendingWriteFrames.put(new FrameBuilder(Stomp.Responses.ERROR) 160 .addHeader(Stomp.Headers.Error.MESSAGE, message) 161 .toFrame()); 162 } 163 }); 164 } 165 166 /** 167 * some transports may register their streams (e.g. Tcp) 168 * 169 * @param dataOut 170 * @param dataIn 171 */ 172 public void registerTransportStreams(DataOutputStream dataOut, DataInputStream dataIn) 173 { 174 this.in = dataIn; 175 } 176 177 /** 178 * Some wire formats require a handshake at start-up 179 * 180 * @throws java.io.IOException 181 */ 182 public void initiateServerSideProtocol() throws IOException 183 { 184 BufferedReader in = new BufferedReader(new InputStreamReader(this.in)); 185 String first_line = in.readLine(); 186 if (!first_line.startsWith(Stomp.Commands.CONNECT)) 187 { 188 throw new IOException("First line does not begin with with " + Stomp.Commands.CONNECT); 189 } 190 191 Properties headers = headerParser.parse(in); 192 //if (!headers.containsKey(TTMP.Headers.Connect.LOGIN)) 193 // System.err.println("Required header [" + TTMP.Headers.Connect.LOGIN + "] missing"); 194 //if (!headers.containsKey(TTMP.Headers.Connect.PASSCODE)) 195 // System.err.println("Required header [" + TTMP.Headers.Connect.PASSCODE + "] missing"); 196 197 // allow anyone to login for now 198 199 String login = headers.getProperty(Stomp.Headers.Connect.LOGIN); 200 String passcode = headers.getProperty(Stomp.Headers.Connect.PASSCODE); 201 202 // skip to end of the packet 203 while (in.read() != 0) {} 204 final ConnectionInfo info = new ConnectionInfo(); 205 final Short packet_id = new Short(PACKET_IDS.getNextShortSequence()); 206 clientId = clientIds.generateId(); 207 commandParser.setClientId(clientId); 208 209 info.setClientId(clientId); 210 info.setReceiptRequired(true); 211 info.setClientVersion(Integer.toString(getCurrentWireFormatVersion())); 212 info.setClosed(false); 213 info.setHostName("ttmp.fake.host.name"); 214 info.setId(packet_id.shortValue()); 215 info.setUserName(login); 216 info.setPassword(passcode); 217 info.setStartTime(System.currentTimeMillis()); 218 info.setStarted(true); 219 220 AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() 221 { 222 public void cycle() throws InterruptedException 223 { 224 pendingReadPackets.put(info); 225 } 226 }); 227 228 addReceiptListener(new ReceiptListener() 229 { 230 public boolean onReceipt(Receipt receipt, DataOutput out) 231 { 232 if (receipt.getCorrelationId() != packet_id.shortValue()) return false; 233 final Short session_packet_id = new Short(PACKET_IDS.getNextShortSequence()); 234 sessionId = clientIds.getNextShortSequence(); 235 236 final SessionInfo info = new SessionInfo(); 237 info.setStartTime(System.currentTimeMillis()); 238 info.setId(session_packet_id.shortValue()); 239 info.setClientId(clientId); 240 info.setSessionId(sessionId); 241 info.setStarted(true); 242 info.setSessionMode(Session.AUTO_ACKNOWLEDGE); 243 info.setReceiptRequired(true); 244 245 AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() 246 { 247 public void cycle() throws InterruptedException 248 { 249 pendingReadPackets.put(info); 250 } 251 }); 252 253 addReceiptListener(new ReceiptListener() 254 { 255 public boolean onReceipt(Receipt receipt, DataOutput out) throws IOException 256 { 257 if (receipt.getCorrelationId() != session_packet_id.shortValue()) return false; 258 StringBuffer buffer = new StringBuffer(); 259 buffer.append(Stomp.Responses.CONNECTED).append(Stomp.NEWLINE); 260 buffer.append(Stomp.Headers.Connected.SESSION) 261 .append(Stomp.Headers.SEPERATOR) 262 .append(clientId) 263 .append(Stomp.NEWLINE) 264 .append(Stomp.NEWLINE); 265 buffer.append(Stomp.NULL); 266 out.writeBytes(buffer.toString()); 267 return true; 268 } 269 }); 270 271 return true; 272 } 273 }); 274 } 275 276 /** 277 * Creates a new copy of this wire format so it can be used in another thread/context 278 */ 279 public WireFormat copy() 280 { 281 return new StompWireFormat(); 282 } 283 284 /* Stuff below here is leaky stuff we don't actually need */ 285 286 /** 287 * Some wire formats require a handshake at start-up 288 * 289 * @throws java.io.IOException 290 */ 291 public void initiateClientSideProtocol() throws IOException 292 { 293 throw new UnsupportedOperationException("Not yet implemented!"); 294 } 295 296 /** 297 * Can this wireformat process packets of this version 298 * 299 * @param version the version number to test 300 * @return true if can accept the version 301 */ 302 public boolean canProcessWireFormatVersion(int version) 303 { 304 return version == getCurrentWireFormatVersion(); 305 } 306 307 /** 308 * @return the current version of this wire format 309 */ 310 public int getCurrentWireFormatVersion() 311 { 312 return 1; 313 } 314 315 /** 316 * @return Returns the enableCaching. 317 */ 318 public boolean isCachingEnabled() 319 { 320 return false; 321 } 322 323 /** 324 * @param enableCaching The enableCaching to set. 325 */ 326 public void setCachingEnabled(boolean enableCaching) 327 { 328 // never 329 } 330 331 /** 332 * some wire formats will implement their own fragementation 333 * 334 * @return true unless a wire format supports it's own fragmentation 335 */ 336 public boolean doesSupportMessageFragmentation() 337 { 338 return false; 339 } 340 341 /** 342 * Some wire formats will not be able to understand compressed messages 343 * 344 * @return true unless a wire format cannot understand compression 345 */ 346 public boolean doesSupportMessageCompression() 347 { 348 return false; 349 } 350 351 /** 352 * Writes the given package to a new datagram 353 * 354 * @param channelID is the unique channel ID 355 * @param packet is the packet to write 356 * @return 357 * @throws java.io.IOException 358 * @throws javax.jms.JMSException 359 */ 360 public DatagramPacket writePacket(String channelID, Packet packet) throws IOException, JMSException 361 { 362 throw new UnsupportedOperationException("Will not be implemented"); 363 } 364 365 /** 366 * Reads the packet from the given byte[] 367 * 368 * @param bytes 369 * @param offset 370 * @param length 371 * @return 372 * @throws java.io.IOException 373 */ 374 public Packet fromBytes(byte[] bytes, int offset, int length) throws IOException 375 { 376 throw new UnsupportedOperationException("Will not be implemented"); 377 } 378 379 /** 380 * Reads the packet from the given byte[] 381 * 382 * @param bytes 383 * @return 384 * @throws java.io.IOException 385 */ 386 public Packet fromBytes(byte[] bytes) throws IOException 387 { 388 throw new UnsupportedOperationException("Will not be implemented"); 389 } 390 391 /** 392 * A helper method which converts a packet into a byte array 393 * 394 * @param packet 395 * @return a byte array representing the packet using some wire protocol 396 * @throws java.io.IOException 397 * @throws javax.jms.JMSException 398 */ 399 public byte[] toBytes(Packet packet) throws IOException, JMSException 400 { 401 throw new UnsupportedOperationException("Will not be implemented"); 402 } 403 404 /** 405 * A helper method for working with sockets where the first byte is read 406 * first, then the rest of the message is read. 407 * <p/> 408 * Its common when dealing with sockets to have different timeout semantics 409 * until the first non-zero byte is read of a message, after which 410 * time a zero timeout is used. 411 * 412 * @param firstByte the first byte of the packet 413 * @param in the rest of the packet 414 * @return 415 * @throws java.io.IOException 416 */ 417 public Packet readPacket(int firstByte, DataInput in) throws IOException 418 { 419 throw new UnsupportedOperationException("Will not be implemented"); 420 } 421 422 /** 423 * Read a packet from a Datagram packet from the given channelID. If the 424 * packet is from the same channel ID as it was sent then we have a 425 * loop-back so discard the packet 426 * 427 * @param channelID is the unique channel ID 428 * @param dpacket 429 * @return the packet read from the datagram or null if it should be 430 * discarded 431 * @throws java.io.IOException 432 */ 433 public Packet readPacket(String channelID, DatagramPacket dpacket) throws IOException 434 { 435 throw new UnsupportedOperationException("Will not be implemented"); 436 } 437 438 void clearTransactionId(String user_tx_id) 439 { 440 this.transactions.remove(user_tx_id); 441 } 442 443 String getClientId() 444 { 445 return this.clientId; 446 } 447 448 public short getSessionId() 449 { 450 return sessionId; 451 } 452 453 public void addSubscription(Subscription s) 454 { 455 if (subscriptions.containsKey(s.getDestination())) 456 { 457 Subscription old = (Subscription) subscriptions.get(s.getDestination()); 458 ConsumerInfo p = old.close(); 459 enqueuePacket(p); 460 subscriptions.put(s.getDestination(), s); 461 } 462 else 463 { 464 subscriptions.put(s.getDestination(), s); 465 } 466 } 467 468 public void enqueuePacket(final Packet ack) 469 { 470 AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() 471 { 472 public void cycle() throws InterruptedException 473 { 474 pendingReadPackets.put(ack); 475 } 476 }); 477 } 478 479 public Subscription getSubscriptionFor(ActiveMQDestination destination) 480 { 481 return (Subscription) subscriptions.get(destination); 482 } 483 484 public void addAckListener(AckListener listener) 485 { 486 this.ackListeners.add(listener); 487 } 488 489 public List getAckListeners() 490 { 491 return ackListeners; 492 } 493 494 public String getTransactionId(String key) 495 { 496 return (String) transactions.get(key); 497 } 498 499 public void registerTransactionId(String user_tx_id, String tx_id) 500 { 501 transactions.put(user_tx_id, tx_id); 502 } 503 }