001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.message; 020 021 import javax.jms.JMSException; 022 import javax.jms.MessageEOFException; 023 import javax.jms.MessageFormatException; 024 import javax.jms.MessageNotReadableException; 025 import javax.jms.MessageNotWriteableException; 026 import javax.jms.StreamMessage; 027 028 import org.activemq.io.util.ByteArray; 029 import org.activemq.io.util.ByteArrayCompression; 030 031 import java.io.ByteArrayInputStream; 032 import java.io.ByteArrayOutputStream; 033 import java.io.DataInputStream; 034 import java.io.DataOutputStream; 035 import java.io.EOFException; 036 import java.io.IOException; 037 038 /** 039 * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive 040 * types in the Java programming language. It is filled and read sequentially. 041 * It inherits from the <CODE>Message</CODE> interface 042 * and adds a stream message body. Its methods are based largely on those 043 * found in <CODE>java.io.DataInputStream</CODE> and 044 * <CODE>java.io.DataOutputStream</CODE>. 045 * <p/> 046 * <P>The primitive types can be read or written explicitly using methods 047 * for each type. They may also be read or written generically as objects. 048 * For instance, a call to <CODE>StreamMessage.writeInt(6)</CODE> is 049 * equivalent to <CODE>StreamMessage.writeObject(new Integer(6))</CODE>. 050 * Both forms are provided, because the explicit form is convenient for 051 * static programming, and the object form is needed when types are not known 052 * at compile time. 053 * <p/> 054 * <P>When the message is first created, and when <CODE>clearBody</CODE> 055 * is called, the body of the message is in write-only mode. After the 056 * first call to <CODE>reset</CODE> has been made, the message body is in 057 * read-only mode. 058 * After a message has been sent, the client that sent it can retain and 059 * modify it without affecting the message that has been sent. The same message 060 * object can be sent multiple times. 061 * When a message has been received, the provider has called 062 * <CODE>reset</CODE> so that the message body is in read-only mode for the client. 063 * <p/> 064 * <P>If <CODE>clearBody</CODE> is called on a message in read-only mode, 065 * the message body is cleared and the message body is in write-only mode. 066 * <p/> 067 * <P>If a client attempts to read a message in write-only mode, a 068 * <CODE>MessageNotReadableException</CODE> is thrown. 069 * <p/> 070 * <P>If a client attempts to write a message in read-only mode, a 071 * <CODE>MessageNotWriteableException</CODE> is thrown. 072 * <p/> 073 * <P><CODE>StreamMessage</CODE> objects support the following conversion 074 * table. The marked cases must be supported. The unmarked cases must throw a 075 * <CODE>JMSException</CODE>. The <CODE>String</CODE>-to-primitive conversions 076 * may throw a runtime exception if the primitive's <CODE>valueOf()</CODE> 077 * method does not accept it as a valid <CODE>String</CODE> representation of 078 * the primitive. 079 * <p/> 080 * <P>A value written as the row type can be read as the column type. 081 * <p/> 082 * <PRE> 083 * | | boolean byte short char int long float double String byte[] 084 * |---------------------------------------------------------------------- 085 * |boolean | X X 086 * |byte | X X X X X 087 * |short | X X X X 088 * |char | X X 089 * |int | X X X 090 * |long | X X 091 * |float | X X X 092 * |double | X X 093 * |String | X X X X X X X X 094 * |byte[] | X 095 * |---------------------------------------------------------------------- 096 * </PRE> 097 * <p/> 098 * <P>Attempting to read a null value as a primitive type must be treated 099 * as calling the primitive's corresponding <code>valueOf(String)</code> 100 * conversion method with a null value. Since <code>char</code> does not 101 * support a <code>String</code> conversion, attempting to read a null value 102 * as a <code>char</code> must throw a <code>NullPointerException</code>. 103 * 104 * @see javax.jms.Session#createStreamMessage() 105 * @see javax.jms.BytesMessage 106 * @see javax.jms.MapMessage 107 * @see javax.jms.Message 108 * @see javax.jms.ObjectMessage 109 * @see javax.jms.TextMessage 110 */ 111 112 public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMessage { 113 114 private DataOutputStream dataOut; 115 private ByteArrayOutputStream bytesOut; 116 private DataInputStream dataIn; 117 private int bytesToRead = -1; 118 119 120 /** 121 * Return the type of Packet 122 * 123 * @return integer representation of the type of Packet 124 */ 125 126 public int getPacketType() { 127 return ACTIVEMQ_STREAM_MESSAGE; 128 } 129 130 /** 131 * @return Returns a shallow copy of the message instance 132 * @throws JMSException 133 */ 134 135 public ActiveMQMessage shallowCopy() throws JMSException { 136 ActiveMQStreamMessage other = new ActiveMQStreamMessage(); 137 this.initializeOther(other); 138 try { 139 other.setBodyAsBytes(this.getBodyAsBytes()); 140 } 141 catch (IOException e) { 142 JMSException jmsEx = new JMSException("setBodyAsBytes() failed"); 143 jmsEx.setLinkedException(e); 144 throw jmsEx; 145 } 146 return other; 147 } 148 149 /** 150 * @return Returns a deep copy of the message - note the header fields are only shallow copied 151 * @throws JMSException 152 */ 153 154 public ActiveMQMessage deepCopy() throws JMSException { 155 ActiveMQStreamMessage other = new ActiveMQStreamMessage(); 156 this.initializeOther(other); 157 try { 158 if (this.getBodyAsBytes() != null) { 159 ByteArray data = this.getBodyAsBytes().copy(); 160 other.setBodyAsBytes(data); 161 } 162 } 163 catch (IOException e) { 164 JMSException jmsEx = new JMSException("setBodyAsBytes() failed"); 165 jmsEx.setLinkedException(e); 166 throw jmsEx; 167 } 168 return other; 169 } 170 171 172 /** 173 * Clears out the message body. Clearing a message's body does not clear 174 * its header values or property entries. 175 * <p/> 176 * <P>If this message body was read-only, calling this method leaves 177 * the message body in the same state as an empty body in a newly 178 * created message. 179 * 180 * @throws JMSException if the JMS provider fails to clear the message 181 * body due to some internal error. 182 */ 183 184 public void clearBody() throws JMSException { 185 super.clearBody(); 186 this.dataOut = null; 187 this.dataIn = null; 188 this.bytesOut = null; 189 } 190 191 /** 192 * Reads a <code>boolean</code> from the stream message. 193 * 194 * @return the <code>boolean</code> value read 195 * @throws JMSException if the JMS provider fails to read the message 196 * due to some internal error. 197 * @throws MessageEOFException if unexpected end of message stream has 198 * been reached. 199 * @throws MessageFormatException if this type conversion is invalid. 200 * @throws MessageNotReadableException if the message is in write-only 201 * mode. 202 */ 203 204 public boolean readBoolean() throws JMSException { 205 initializeReading(); 206 try { 207 if (this.dataIn.available() == 0) { 208 throw new MessageEOFException("reached end of data"); 209 } 210 211 this.dataIn.mark(10); 212 int type = this.dataIn.read(); 213 if (type == BOOLEAN) { 214 return this.dataIn.readBoolean(); 215 } 216 if (type == STRING) { 217 return Boolean.valueOf(this.dataIn.readUTF()).booleanValue(); 218 } 219 if (type == NULL) { 220 this.dataIn.reset(); 221 throw new NullPointerException("Cannot convert NULL value to boolean."); 222 } 223 else { 224 this.dataIn.reset(); 225 throw new MessageFormatException(" not a boolean type"); 226 } 227 } 228 catch (EOFException e) { 229 JMSException jmsEx = new MessageEOFException(e.getMessage()); 230 jmsEx.setLinkedException(e); 231 throw jmsEx; 232 } 233 catch (IOException e) { 234 JMSException jmsEx = new MessageFormatException(e.getMessage()); 235 jmsEx.setLinkedException(e); 236 throw jmsEx; 237 } 238 } 239 240 241 /** 242 * Reads a <code>byte</code> value from the stream message. 243 * 244 * @return the next byte from the stream message as a 8-bit 245 * <code>byte</code> 246 * @throws JMSException if the JMS provider fails to read the message 247 * due to some internal error. 248 * @throws MessageEOFException if unexpected end of message stream has 249 * been reached. 250 * @throws MessageFormatException if this type conversion is invalid. 251 * @throws MessageNotReadableException if the message is in write-only 252 * mode. 253 */ 254 255 public byte readByte() throws JMSException { 256 initializeReading(); 257 try { 258 if (this.dataIn.available() == 0) { 259 throw new MessageEOFException("reached end of data"); 260 } 261 262 this.dataIn.mark(10); 263 int type = this.dataIn.read(); 264 if (type == BYTE) { 265 return this.dataIn.readByte(); 266 } 267 if (type == STRING) { 268 return Byte.valueOf(this.dataIn.readUTF()).byteValue(); 269 } 270 if (type == NULL) { 271 this.dataIn.reset(); 272 throw new NullPointerException("Cannot convert NULL value to byte."); 273 } 274 else { 275 this.dataIn.reset(); 276 throw new MessageFormatException(" not a byte type"); 277 } 278 } 279 catch (NumberFormatException mfe) { 280 try { 281 this.dataIn.reset(); 282 } 283 catch (IOException ioe) { 284 JMSException jmsEx = new JMSException("reset failed"); 285 jmsEx.setLinkedException(ioe); 286 } 287 throw mfe; 288 289 } 290 catch (EOFException e) { 291 JMSException jmsEx = new MessageEOFException(e.getMessage()); 292 jmsEx.setLinkedException(e); 293 throw jmsEx; 294 } 295 catch (IOException e) { 296 JMSException jmsEx = new MessageFormatException(e.getMessage()); 297 jmsEx.setLinkedException(e); 298 throw jmsEx; 299 } 300 } 301 302 303 /** 304 * Reads a 16-bit integer from the stream message. 305 * 306 * @return a 16-bit integer from the stream message 307 * @throws JMSException if the JMS provider fails to read the message 308 * due to some internal error. 309 * @throws MessageEOFException if unexpected end of message stream has 310 * been reached. 311 * @throws MessageFormatException if this type conversion is invalid. 312 * @throws MessageNotReadableException if the message is in write-only 313 * mode. 314 */ 315 316 public short readShort() throws JMSException { 317 initializeReading(); 318 try { 319 if (this.dataIn.available() == 0) { 320 throw new MessageEOFException("reached end of data"); 321 } 322 323 this.dataIn.mark(17); 324 int type = this.dataIn.read(); 325 if (type == SHORT) { 326 return this.dataIn.readShort(); 327 } 328 if (type == BYTE) { 329 return this.dataIn.readByte(); 330 } 331 if (type == STRING) { 332 return Short.valueOf(this.dataIn.readUTF()).shortValue(); 333 } 334 if (type == NULL) { 335 this.dataIn.reset(); 336 throw new NullPointerException("Cannot convert NULL value to short."); 337 } 338 else { 339 this.dataIn.reset(); 340 throw new MessageFormatException(" not a short type"); 341 } 342 } 343 catch (NumberFormatException mfe) { 344 try { 345 this.dataIn.reset(); 346 } 347 catch (IOException ioe) { 348 JMSException jmsEx = new JMSException("reset failed"); 349 jmsEx.setLinkedException(ioe); 350 } 351 throw mfe; 352 353 } 354 catch (EOFException e) { 355 JMSException jmsEx = new MessageEOFException(e.getMessage()); 356 jmsEx.setLinkedException(e); 357 throw jmsEx; 358 } 359 catch (IOException e) { 360 JMSException jmsEx = new MessageFormatException(e.getMessage()); 361 jmsEx.setLinkedException(e); 362 throw jmsEx; 363 } 364 365 } 366 367 368 /** 369 * Reads a Unicode character value from the stream message. 370 * 371 * @return a Unicode character from the stream message 372 * @throws JMSException if the JMS provider fails to read the message 373 * due to some internal error. 374 * @throws MessageEOFException if unexpected end of message stream has 375 * been reached. 376 * @throws MessageFormatException if this type conversion is invalid 377 * @throws MessageNotReadableException if the message is in write-only 378 * mode. 379 */ 380 381 public char readChar() throws JMSException { 382 initializeReading(); 383 try { 384 if (this.dataIn.available() == 0) { 385 throw new MessageEOFException("reached end of data"); 386 } 387 388 this.dataIn.mark(17); 389 int type = this.dataIn.read(); 390 if (type == CHAR) { 391 return this.dataIn.readChar(); 392 } 393 if (type == NULL) { 394 this.dataIn.reset(); 395 throw new NullPointerException("Cannot convert NULL value to char."); 396 } else { 397 this.dataIn.reset(); 398 throw new MessageFormatException(" not a char type"); 399 } 400 } 401 catch (NumberFormatException mfe) { 402 try { 403 this.dataIn.reset(); 404 } 405 catch (IOException ioe) { 406 JMSException jmsEx = new JMSException("reset failed"); 407 jmsEx.setLinkedException(ioe); 408 } 409 throw mfe; 410 411 } 412 catch (EOFException e) { 413 JMSException jmsEx = new MessageEOFException(e.getMessage()); 414 jmsEx.setLinkedException(e); 415 throw jmsEx; 416 } 417 catch (IOException e) { 418 JMSException jmsEx = new MessageFormatException(e.getMessage()); 419 jmsEx.setLinkedException(e); 420 throw jmsEx; 421 } 422 } 423 424 425 /** 426 * Reads a 32-bit integer from the stream message. 427 * 428 * @return a 32-bit integer value from the stream message, interpreted 429 * as an <code>int</code> 430 * @throws JMSException if the JMS provider fails to read the message 431 * due to some internal error. 432 * @throws MessageEOFException if unexpected end of message stream has 433 * been reached. 434 * @throws MessageFormatException if this type conversion is invalid. 435 * @throws MessageNotReadableException if the message is in write-only 436 * mode. 437 */ 438 439 public int readInt() throws JMSException { 440 initializeReading(); 441 try { 442 if (this.dataIn.available() == 0) { 443 throw new MessageEOFException("reached end of data"); 444 } 445 446 this.dataIn.mark(33); 447 int type = this.dataIn.read(); 448 if (type == INT) { 449 return this.dataIn.readInt(); 450 } 451 if (type == SHORT) { 452 return this.dataIn.readShort(); 453 } 454 if (type == BYTE) { 455 return this.dataIn.readByte(); 456 } 457 if (type == STRING) { 458 return Integer.valueOf(this.dataIn.readUTF()).intValue(); 459 } 460 if (type == NULL) { 461 this.dataIn.reset(); 462 throw new NullPointerException("Cannot convert NULL value to int."); 463 } 464 else { 465 this.dataIn.reset(); 466 throw new MessageFormatException(" not an int type"); 467 } 468 } 469 catch (NumberFormatException mfe) { 470 try { 471 this.dataIn.reset(); 472 } 473 catch (IOException ioe) { 474 JMSException jmsEx = new JMSException("reset failed"); 475 jmsEx.setLinkedException(ioe); 476 } 477 throw mfe; 478 479 } 480 catch (EOFException e) { 481 JMSException jmsEx = new MessageEOFException(e.getMessage()); 482 jmsEx.setLinkedException(e); 483 throw jmsEx; 484 } 485 catch (IOException e) { 486 JMSException jmsEx = new MessageFormatException(e.getMessage()); 487 jmsEx.setLinkedException(e); 488 throw jmsEx; 489 } 490 } 491 492 493 /** 494 * Reads a 64-bit integer from the stream message. 495 * 496 * @return a 64-bit integer value from the stream message, interpreted as 497 * a <code>long</code> 498 * @throws JMSException if the JMS provider fails to read the message 499 * due to some internal error. 500 * @throws MessageEOFException if unexpected end of message stream has 501 * been reached. 502 * @throws MessageFormatException if this type conversion is invalid. 503 * @throws MessageNotReadableException if the message is in write-only 504 * mode. 505 */ 506 507 public long readLong() throws JMSException { 508 initializeReading(); 509 try { 510 if (this.dataIn.available() == 0) { 511 throw new MessageEOFException("reached end of data"); 512 } 513 514 this.dataIn.mark(65); 515 int type = this.dataIn.read(); 516 if (type == LONG) { 517 return this.dataIn.readLong(); 518 } 519 if (type == INT) { 520 return this.dataIn.readInt(); 521 } 522 if (type == SHORT) { 523 return this.dataIn.readShort(); 524 } 525 if (type == BYTE) { 526 return this.dataIn.readByte(); 527 } 528 if (type == STRING) { 529 return Long.valueOf(this.dataIn.readUTF()).longValue(); 530 } 531 if (type == NULL) { 532 this.dataIn.reset(); 533 throw new NullPointerException("Cannot convert NULL value to long."); 534 } 535 else { 536 this.dataIn.reset(); 537 throw new MessageFormatException(" not a long type"); 538 } 539 } 540 catch (NumberFormatException mfe) { 541 try { 542 this.dataIn.reset(); 543 } 544 catch (IOException ioe) { 545 JMSException jmsEx = new JMSException("reset failed"); 546 jmsEx.setLinkedException(ioe); 547 } 548 throw mfe; 549 550 } 551 catch (EOFException e) { 552 JMSException jmsEx = new MessageEOFException(e.getMessage()); 553 jmsEx.setLinkedException(e); 554 throw jmsEx; 555 } 556 catch (IOException e) { 557 JMSException jmsEx = new MessageFormatException(e.getMessage()); 558 jmsEx.setLinkedException(e); 559 throw jmsEx; 560 } 561 } 562 563 564 /** 565 * Reads a <code>float</code> from the stream message. 566 * 567 * @return a <code>float</code> value from the stream message 568 * @throws JMSException if the JMS provider fails to read the message 569 * due to some internal error. 570 * @throws MessageEOFException if unexpected end of message stream has 571 * been reached. 572 * @throws MessageFormatException if this type conversion is invalid. 573 * @throws MessageNotReadableException if the message is in write-only 574 * mode. 575 */ 576 577 public float readFloat() throws JMSException { 578 initializeReading(); 579 try { 580 if (this.dataIn.available() == 0) { 581 throw new MessageEOFException("reached end of data"); 582 } 583 584 this.dataIn.mark(33); 585 int type = this.dataIn.read(); 586 if (type == FLOAT) { 587 return this.dataIn.readFloat(); 588 } 589 if (type == STRING) { 590 return Float.valueOf(this.dataIn.readUTF()).floatValue(); 591 } 592 if (type == NULL) { 593 this.dataIn.reset(); 594 throw new NullPointerException("Cannot convert NULL value to float."); 595 } 596 else { 597 this.dataIn.reset(); 598 throw new MessageFormatException(" not a float type"); 599 } 600 } 601 catch (NumberFormatException mfe) { 602 try { 603 this.dataIn.reset(); 604 } 605 catch (IOException ioe) { 606 JMSException jmsEx = new JMSException("reset failed"); 607 jmsEx.setLinkedException(ioe); 608 } 609 throw mfe; 610 611 } 612 catch (EOFException e) { 613 JMSException jmsEx = new MessageEOFException(e.getMessage()); 614 jmsEx.setLinkedException(e); 615 throw jmsEx; 616 } 617 catch (IOException e) { 618 JMSException jmsEx = new MessageFormatException(e.getMessage()); 619 jmsEx.setLinkedException(e); 620 throw jmsEx; 621 } 622 } 623 624 625 /** 626 * Reads a <code>double</code> from the stream message. 627 * 628 * @return a <code>double</code> value from the stream message 629 * @throws JMSException if the JMS provider fails to read the message 630 * due to some internal error. 631 * @throws MessageEOFException if unexpected end of message stream has 632 * been reached. 633 * @throws MessageFormatException if this type conversion is invalid. 634 * @throws MessageNotReadableException if the message is in write-only 635 * mode. 636 */ 637 638 public double readDouble() throws JMSException { 639 initializeReading(); 640 try { 641 if (this.dataIn.available() == 0) { 642 throw new MessageEOFException("reached end of data"); 643 } 644 645 this.dataIn.mark(65); 646 int type = this.dataIn.read(); 647 if (type == DOUBLE) { 648 return this.dataIn.readDouble(); 649 } 650 if (type == FLOAT) { 651 return this.dataIn.readFloat(); 652 } 653 if (type == STRING) { 654 return Double.valueOf(this.dataIn.readUTF()).doubleValue(); 655 } 656 if (type == NULL) { 657 this.dataIn.reset(); 658 throw new NullPointerException("Cannot convert NULL value to double."); 659 } 660 else { 661 this.dataIn.reset(); 662 throw new MessageFormatException(" not a double type"); 663 } 664 } 665 catch (NumberFormatException mfe) { 666 try { 667 this.dataIn.reset(); 668 } 669 catch (IOException ioe) { 670 JMSException jmsEx = new JMSException("reset failed"); 671 jmsEx.setLinkedException(ioe); 672 } 673 throw mfe; 674 675 } 676 catch (EOFException e) { 677 JMSException jmsEx = new MessageEOFException(e.getMessage()); 678 jmsEx.setLinkedException(e); 679 throw jmsEx; 680 } 681 catch (IOException e) { 682 JMSException jmsEx = new MessageFormatException(e.getMessage()); 683 jmsEx.setLinkedException(e); 684 throw jmsEx; 685 } 686 } 687 688 689 /** 690 * Reads a <CODE>String</CODE> from the stream message. 691 * 692 * @return a Unicode string from the stream message 693 * @throws JMSException if the JMS provider fails to read the message 694 * due to some internal error. 695 * @throws MessageEOFException if unexpected end of message stream has 696 * been reached. 697 * @throws MessageFormatException if this type conversion is invalid. 698 * @throws MessageNotReadableException if the message is in write-only 699 * mode. 700 */ 701 702 public String readString() throws JMSException { 703 initializeReading(); 704 try { 705 if (this.dataIn.available() == 0) { 706 throw new MessageEOFException("reached end of data"); 707 } 708 709 this.dataIn.mark(65); 710 int type = this.dataIn.read(); 711 if (type == NULL) { 712 return null; 713 } 714 if (type == STRING) { 715 return this.dataIn.readUTF(); 716 } 717 if (type == LONG) { 718 return new Long(this.dataIn.readLong()).toString(); 719 } 720 if (type == INT) { 721 return new Integer(this.dataIn.readInt()).toString(); 722 } 723 if (type == SHORT) { 724 return new Short(this.dataIn.readShort()).toString(); 725 } 726 if (type == BYTE) { 727 return new Byte(this.dataIn.readByte()).toString(); 728 } 729 if (type == FLOAT) { 730 return new Float(this.dataIn.readFloat()).toString(); 731 } 732 if (type == DOUBLE) { 733 return new Double(this.dataIn.readDouble()).toString(); 734 } 735 if (type == BOOLEAN) { 736 return (this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE).toString(); 737 } 738 if (type == CHAR) { 739 return new Character(this.dataIn.readChar()).toString(); 740 } 741 else { 742 this.dataIn.reset(); 743 throw new MessageFormatException(" not a String type"); 744 } 745 } 746 catch (NumberFormatException mfe) { 747 try { 748 this.dataIn.reset(); 749 } 750 catch (IOException ioe) { 751 JMSException jmsEx = new JMSException("reset failed"); 752 jmsEx.setLinkedException(ioe); 753 } 754 throw mfe; 755 756 } 757 catch (EOFException e) { 758 JMSException jmsEx = new MessageEOFException(e.getMessage()); 759 jmsEx.setLinkedException(e); 760 throw jmsEx; 761 } 762 catch (IOException e) { 763 JMSException jmsEx = new MessageFormatException(e.getMessage()); 764 jmsEx.setLinkedException(e); 765 throw jmsEx; 766 } 767 } 768 769 770 /** 771 * Reads a byte array field from the stream message into the 772 * specified <CODE>byte[]</CODE> object (the read buffer). 773 * <p/> 774 * <P>To read the field value, <CODE>readBytes</CODE> should be 775 * successively called 776 * until it returns a value less than the length of the read buffer. 777 * The value of the bytes in the buffer following the last byte 778 * read is undefined. 779 * <p/> 780 * <P>If <CODE>readBytes</CODE> returns a value equal to the length of the 781 * buffer, a subsequent <CODE>readBytes</CODE> call must be made. If there 782 * are no more bytes to be read, this call returns -1. 783 * <p/> 784 * <P>If the byte array field value is null, <CODE>readBytes</CODE> 785 * returns -1. 786 * <p/> 787 * <P>If the byte array field value is empty, <CODE>readBytes</CODE> 788 * returns 0. 789 * <p/> 790 * <P>Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE> 791 * field value has been made, 792 * the full value of the field must be read before it is valid to read 793 * the next field. An attempt to read the next field before that has 794 * been done will throw a <CODE>MessageFormatException</CODE>. 795 * <p/> 796 * <P>To read the byte field value into a new <CODE>byte[]</CODE> object, 797 * use the <CODE>readObject</CODE> method. 798 * 799 * @param value the buffer into which the data is read 800 * @return the total number of bytes read into the buffer, or -1 if 801 * there is no more data because the end of the byte field has been 802 * reached 803 * @throws JMSException if the JMS provider fails to read the message 804 * due to some internal error. 805 * @throws MessageEOFException if unexpected end of message stream has 806 * been reached. 807 * @throws MessageFormatException if this type conversion is invalid. 808 * @throws MessageNotReadableException if the message is in write-only 809 * mode. 810 * @see #readObject() 811 */ 812 813 public int readBytes(byte[] value) throws JMSException { 814 initializeReading(); 815 try { 816 if (value == null) { 817 throw new NullPointerException(); 818 } 819 if (bytesToRead == 0) { 820 bytesToRead = -1; 821 return -1; 822 } 823 else if (bytesToRead > 0) { 824 if (value.length >= bytesToRead) { 825 bytesToRead = 0; 826 return dataIn.read(value, 0, bytesToRead); 827 } 828 else { 829 bytesToRead -= value.length; 830 return dataIn.read(value); 831 } 832 } 833 else { 834 if (this.dataIn.available() == 0) { 835 throw new MessageEOFException("reached end of data"); 836 } 837 if (this.dataIn.available() < 1) { 838 throw new MessageFormatException("Not enough data left to read value"); 839 } 840 this.dataIn.mark(value.length + 1); 841 int type = this.dataIn.read(); 842 if (this.dataIn.available() < 1) { 843 return -1; 844 } 845 if (type != BYTES) { 846 throw new MessageFormatException("Not a byte array"); 847 } 848 int len = this.dataIn.readInt(); 849 850 if (len >= value.length) { 851 bytesToRead = len - value.length; 852 return this.dataIn.read(value); 853 } 854 else { 855 bytesToRead = 0; 856 return this.dataIn.read(value, 0, len); 857 } 858 } 859 } 860 catch (EOFException e) { 861 JMSException jmsEx = new MessageEOFException(e.getMessage()); 862 jmsEx.setLinkedException(e); 863 throw jmsEx; 864 } 865 catch (IOException e) { 866 JMSException jmsEx = new MessageFormatException(e.getMessage()); 867 jmsEx.setLinkedException(e); 868 throw jmsEx; 869 } 870 } 871 872 873 /** 874 * Reads an object from the stream message. 875 * <p/> 876 * <P>This method can be used to return, in objectified format, 877 * an object in the Java programming language ("Java object") that has 878 * been written to the stream with the equivalent 879 * <CODE>writeObject</CODE> method call, or its equivalent primitive 880 * <CODE>write<I>type</I></CODE> method. 881 * <p/> 882 * <P>Note that byte values are returned as <CODE>byte[]</CODE>, not 883 * <CODE>Byte[]</CODE>. 884 * <p/> 885 * <P>An attempt to call <CODE>readObject</CODE> to read a byte field 886 * value into a new <CODE>byte[]</CODE> object before the full value of the 887 * byte field has been read will throw a 888 * <CODE>MessageFormatException</CODE>. 889 * 890 * @return a Java object from the stream message, in objectified 891 * format (for example, if the object was written as an <CODE>int</CODE>, 892 * an <CODE>Integer</CODE> is returned) 893 * @throws JMSException if the JMS provider fails to read the message 894 * due to some internal error. 895 * @throws MessageEOFException if unexpected end of message stream has 896 * been reached. 897 * @throws MessageFormatException if this type conversion is invalid. 898 * @throws MessageNotReadableException if the message is in write-only 899 * mode. 900 * @see #readBytes(byte[] value) 901 */ 902 903 public Object readObject() throws JMSException { 904 initializeReading(); 905 try { 906 if (this.dataIn.available() == 0) { 907 throw new MessageEOFException("reached end of data"); 908 } 909 910 this.dataIn.mark(65); 911 int type = this.dataIn.read(); 912 if (type == NULL) { 913 return null; 914 } 915 if (type == STRING) { 916 return this.dataIn.readUTF(); 917 } 918 if (type == LONG) { 919 return new Long(this.dataIn.readLong()); 920 } 921 if (type == INT) { 922 return new Integer(this.dataIn.readInt()); 923 } 924 if (type == SHORT) { 925 return new Short(this.dataIn.readShort()); 926 } 927 if (type == BYTE) { 928 return new Byte(this.dataIn.readByte()); 929 } 930 if (type == FLOAT) { 931 return new Float(this.dataIn.readFloat()); 932 } 933 if (type == DOUBLE) { 934 return new Double(this.dataIn.readDouble()); 935 } 936 if (type == BOOLEAN) { 937 return this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE; 938 } 939 if (type == CHAR) { 940 return new Character(this.dataIn.readChar()); 941 } 942 if (type == BYTES) { 943 int len = this.dataIn.readInt(); 944 byte[] value = new byte[len]; 945 this.dataIn.read(value); 946 return value; 947 } 948 else { 949 this.dataIn.reset(); 950 throw new MessageFormatException("unknown type"); 951 } 952 } 953 catch (NumberFormatException mfe) { 954 try { 955 this.dataIn.reset(); 956 } 957 catch (IOException ioe) { 958 JMSException jmsEx = new JMSException("reset failed"); 959 jmsEx.setLinkedException(ioe); 960 } 961 throw mfe; 962 963 } 964 catch (EOFException e) { 965 JMSException jmsEx = new MessageEOFException(e.getMessage()); 966 jmsEx.setLinkedException(e); 967 throw jmsEx; 968 } 969 catch (IOException e) { 970 JMSException jmsEx = new MessageFormatException(e.getMessage()); 971 jmsEx.setLinkedException(e); 972 throw jmsEx; 973 } 974 } 975 976 977 /** 978 * Writes a <code>boolean</code> to the stream message. 979 * The value <code>true</code> is written as the value 980 * <code>(byte)1</code>; the value <code>false</code> is written as 981 * the value <code>(byte)0</code>. 982 * 983 * @param value the <code>boolean</code> value to be written 984 * @throws JMSException if the JMS provider fails to write the message 985 * due to some internal error. 986 * @throws MessageNotWriteableException if the message is in read-only 987 * mode. 988 */ 989 990 public void writeBoolean(boolean value) throws JMSException { 991 initializeWriting(); 992 try { 993 this.dataOut.write(BOOLEAN); 994 this.dataOut.writeBoolean(value); 995 } 996 catch (IOException ioe) { 997 JMSException jmsEx = new JMSException(ioe.getMessage()); 998 jmsEx.setLinkedException(ioe); 999 throw jmsEx; 1000 } 1001 } 1002 1003 1004 /** 1005 * Writes a <code>byte</code> to the stream message. 1006 * 1007 * @param value the <code>byte</code> value to be written 1008 * @throws JMSException if the JMS provider fails to write the message 1009 * due to some internal error. 1010 * @throws MessageNotWriteableException if the message is in read-only 1011 * mode. 1012 */ 1013 1014 public void writeByte(byte value) throws JMSException { 1015 initializeWriting(); 1016 try { 1017 this.dataOut.write(BYTE); 1018 this.dataOut.writeByte(value); 1019 } 1020 catch (IOException ioe) { 1021 JMSException jmsEx = new JMSException(ioe.getMessage()); 1022 jmsEx.setLinkedException(ioe); 1023 throw jmsEx; 1024 } 1025 } 1026 1027 1028 /** 1029 * Writes a <code>short</code> to the stream message. 1030 * 1031 * @param value the <code>short</code> value to be written 1032 * @throws JMSException if the JMS provider fails to write the message 1033 * due to some internal error. 1034 * @throws MessageNotWriteableException if the message is in read-only 1035 * mode. 1036 */ 1037 1038 public void writeShort(short value) throws JMSException { 1039 initializeWriting(); 1040 try { 1041 this.dataOut.write(SHORT); 1042 this.dataOut.writeShort(value); 1043 } 1044 catch (IOException ioe) { 1045 JMSException jmsEx = new JMSException(ioe.getMessage()); 1046 jmsEx.setLinkedException(ioe); 1047 throw jmsEx; 1048 } 1049 } 1050 1051 1052 /** 1053 * Writes a <code>char</code> to the stream message. 1054 * 1055 * @param value the <code>char</code> value to be written 1056 * @throws JMSException if the JMS provider fails to write the message 1057 * due to some internal error. 1058 * @throws MessageNotWriteableException if the message is in read-only 1059 * mode. 1060 */ 1061 1062 public void writeChar(char value) throws JMSException { 1063 initializeWriting(); 1064 try { 1065 this.dataOut.write(CHAR); 1066 this.dataOut.writeChar(value); 1067 } 1068 catch (IOException ioe) { 1069 JMSException jmsEx = new JMSException(ioe.getMessage()); 1070 jmsEx.setLinkedException(ioe); 1071 throw jmsEx; 1072 } 1073 } 1074 1075 1076 /** 1077 * Writes an <code>int</code> to the stream message. 1078 * 1079 * @param value the <code>int</code> value to be written 1080 * @throws JMSException if the JMS provider fails to write the message 1081 * due to some internal error. 1082 * @throws MessageNotWriteableException if the message is in read-only 1083 * mode. 1084 */ 1085 1086 public void writeInt(int value) throws JMSException { 1087 initializeWriting(); 1088 try { 1089 this.dataOut.write(INT); 1090 this.dataOut.writeInt(value); 1091 } 1092 catch (IOException ioe) { 1093 JMSException jmsEx = new JMSException(ioe.getMessage()); 1094 jmsEx.setLinkedException(ioe); 1095 throw jmsEx; 1096 } 1097 } 1098 1099 1100 /** 1101 * Writes a <code>long</code> to the stream message. 1102 * 1103 * @param value the <code>long</code> value to be written 1104 * @throws JMSException if the JMS provider fails to write the message 1105 * due to some internal error. 1106 * @throws MessageNotWriteableException if the message is in read-only 1107 * mode. 1108 */ 1109 1110 public void writeLong(long value) throws JMSException { 1111 initializeWriting(); 1112 try { 1113 this.dataOut.write(LONG); 1114 this.dataOut.writeLong(value); 1115 } 1116 catch (IOException ioe) { 1117 JMSException jmsEx = new JMSException(ioe.getMessage()); 1118 jmsEx.setLinkedException(ioe); 1119 throw jmsEx; 1120 } 1121 } 1122 1123 1124 /** 1125 * Writes a <code>float</code> to the stream message. 1126 * 1127 * @param value the <code>float</code> value to be written 1128 * @throws JMSException if the JMS provider fails to write the message 1129 * due to some internal error. 1130 * @throws MessageNotWriteableException if the message is in read-only 1131 * mode. 1132 */ 1133 1134 public void writeFloat(float value) throws JMSException { 1135 initializeWriting(); 1136 try { 1137 this.dataOut.write(FLOAT); 1138 this.dataOut.writeFloat(value); 1139 } 1140 catch (IOException ioe) { 1141 JMSException jmsEx = new JMSException(ioe.getMessage()); 1142 jmsEx.setLinkedException(ioe); 1143 throw jmsEx; 1144 } 1145 } 1146 1147 1148 /** 1149 * Writes a <code>double</code> to the stream message. 1150 * 1151 * @param value the <code>double</code> value to be written 1152 * @throws JMSException if the JMS provider fails to write the message 1153 * due to some internal error. 1154 * @throws MessageNotWriteableException if the message is in read-only 1155 * mode. 1156 */ 1157 1158 public void writeDouble(double value) throws JMSException { 1159 initializeWriting(); 1160 try { 1161 this.dataOut.write(DOUBLE); 1162 this.dataOut.writeDouble(value); 1163 } 1164 catch (IOException ioe) { 1165 JMSException jmsEx = new JMSException(ioe.getMessage()); 1166 jmsEx.setLinkedException(ioe); 1167 throw jmsEx; 1168 } 1169 } 1170 1171 1172 /** 1173 * Writes a <code>String</code> to the stream message. 1174 * 1175 * @param value the <code>String</code> value to be written 1176 * @throws JMSException if the JMS provider fails to write the message 1177 * due to some internal error. 1178 * @throws MessageNotWriteableException if the message is in read-only 1179 * mode. 1180 */ 1181 1182 public void writeString(String value) throws JMSException { 1183 initializeWriting(); 1184 try { 1185 if (value == null) { 1186 this.dataOut.write(NULL); 1187 } 1188 else { 1189 this.dataOut.write(STRING); 1190 this.dataOut.writeUTF(value); 1191 } 1192 } 1193 catch (IOException ioe) { 1194 JMSException jmsEx = new JMSException(ioe.getMessage()); 1195 jmsEx.setLinkedException(ioe); 1196 throw jmsEx; 1197 } 1198 } 1199 1200 1201 /** 1202 * Writes a byte array field to the stream message. 1203 * <p/> 1204 * <P>The byte array <code>value</code> is written to the message 1205 * as a byte array field. Consecutively written byte array fields are 1206 * treated as two distinct fields when the fields are read. 1207 * 1208 * @param value the byte array value to be written 1209 * @throws JMSException if the JMS provider fails to write the message 1210 * due to some internal error. 1211 * @throws MessageNotWriteableException if the message is in read-only 1212 * mode. 1213 */ 1214 1215 public void writeBytes(byte[] value) throws JMSException { 1216 writeBytes(value, 0, value.length); 1217 } 1218 1219 1220 /** 1221 * Writes a portion of a byte array as a byte array field to the stream 1222 * message. 1223 * <p/> 1224 * <P>The a portion of the byte array <code>value</code> is written to the 1225 * message as a byte array field. Consecutively written byte 1226 * array fields are treated as two distinct fields when the fields are 1227 * read. 1228 * 1229 * @param value the byte array value to be written 1230 * @param offset the initial offset within the byte array 1231 * @param length the number of bytes to use 1232 * @throws JMSException if the JMS provider fails to write the message 1233 * due to some internal error. 1234 * @throws MessageNotWriteableException if the message is in read-only 1235 * mode. 1236 */ 1237 1238 public void writeBytes(byte[] value, int offset, int length) throws JMSException { 1239 initializeWriting(); 1240 try { 1241 this.dataOut.write(BYTES); 1242 this.dataOut.writeInt(length); 1243 this.dataOut.write(value, offset, length); 1244 } 1245 catch (IOException ioe) { 1246 JMSException jmsEx = new JMSException(ioe.getMessage()); 1247 jmsEx.setLinkedException(ioe); 1248 throw jmsEx; 1249 } 1250 } 1251 1252 1253 /** 1254 * Writes an object to the stream message. 1255 * <p/> 1256 * <P>This method works only for the objectified primitive 1257 * object types (<code>Integer</code>, <code>Double</code>, 1258 * <code>Long</code> ...), <code>String</code> objects, and byte 1259 * arrays. 1260 * 1261 * @param value the Java object to be written 1262 * @throws JMSException if the JMS provider fails to write the message 1263 * due to some internal error. 1264 * @throws MessageFormatException if the object is invalid. 1265 * @throws MessageNotWriteableException if the message is in read-only 1266 * mode. 1267 */ 1268 1269 public void writeObject(Object value) throws JMSException { 1270 initializeWriting(); 1271 if (value == null) { 1272 try { 1273 this.dataOut.write(NULL); 1274 } 1275 catch (IOException ioe) { 1276 JMSException jmsEx = new JMSException(ioe.getMessage()); 1277 jmsEx.setLinkedException(ioe); 1278 throw jmsEx; 1279 } 1280 } 1281 else if (value instanceof String) { 1282 writeString(value.toString()); 1283 } 1284 else if (value instanceof Character) { 1285 writeChar(((Character) value).charValue()); 1286 } 1287 else if (value instanceof Boolean) { 1288 writeBoolean(((Boolean) value).booleanValue()); 1289 } 1290 else if (value instanceof Byte) { 1291 writeByte(((Byte) value).byteValue()); 1292 } 1293 else if (value instanceof Short) { 1294 writeShort(((Short) value).shortValue()); 1295 } 1296 else if (value instanceof Integer) { 1297 writeInt(((Integer) value).intValue()); 1298 } 1299 else if (value instanceof Float) { 1300 writeFloat(((Float) value).floatValue()); 1301 } 1302 else if (value instanceof Double) { 1303 writeDouble(((Double) value).doubleValue()); 1304 } 1305 else if (value instanceof byte[]) { 1306 writeBytes((byte[]) value); 1307 } 1308 } 1309 1310 1311 /** 1312 * Puts the message body in read-only mode and repositions the stream of 1313 * bytes to the beginning. 1314 * 1315 * @throws JMSException if an internal error occurs 1316 */ 1317 1318 public void reset() throws JMSException { 1319 super.readOnlyMessage = true; 1320 if (this.dataOut != null) { 1321 try { 1322 this.dataOut.flush(); 1323 byte[] data = this.bytesOut.toByteArray(); 1324 super.setBodyAsBytes(data,0,data.length); 1325 dataOut.close(); 1326 } 1327 catch (IOException ioe) { 1328 JMSException jmsEx = new JMSException("reset failed: " + ioe.getMessage()); 1329 jmsEx.setLinkedException(ioe); 1330 throw jmsEx; 1331 } 1332 } 1333 this.bytesOut = null; 1334 this.dataIn = null; 1335 this.dataOut = null; 1336 } 1337 1338 /** 1339 * @param bodyAsBytes The bodyAsBytes to set. 1340 * @param offset 1341 * @param length 1342 */ 1343 public void setBodyAsBytes(byte[] bodyAsBytes,int offset, int length) { 1344 super.setBodyAsBytes(bodyAsBytes,offset,length); 1345 dataOut = null; 1346 dataIn = null; 1347 } 1348 1349 /** 1350 * @return Returns the data body 1351 * @throws IOException if an exception occurs retreiving the data 1352 */ 1353 public ByteArray getBodyAsBytes() throws IOException { 1354 if (this.dataOut != null) { 1355 this.dataOut.flush(); 1356 byte[] data = this.bytesOut.toByteArray(); 1357 super.setBodyAsBytes(data,0,data.length); 1358 dataOut.close(); 1359 dataOut = null; 1360 } 1361 return super.getBodyAsBytes(); 1362 } 1363 1364 private void initializeWriting() throws MessageNotWriteableException { 1365 if (super.readOnlyMessage) { 1366 throw new MessageNotWriteableException("This message is in read-only mode"); 1367 } 1368 if (this.dataOut == null) { 1369 this.bytesOut = new ByteArrayOutputStream(); 1370 this.dataOut = new DataOutputStream(this.bytesOut); 1371 } 1372 } 1373 1374 1375 private void initializeReading() throws MessageNotReadableException { 1376 if (!super.readOnlyMessage) { 1377 throw new MessageNotReadableException("This message is in write-only mode"); 1378 } 1379 try { 1380 ByteArray data = super.getBodyAsBytes(); 1381 if (this.dataIn == null && data != null) { 1382 if (ByteArrayCompression.isCompressed(data)){ 1383 ByteArrayCompression compression = new ByteArrayCompression(); 1384 data = compression.inflate(data); 1385 } 1386 ByteArrayInputStream bytesIn = new ByteArrayInputStream(data.getBuf(),data.getOffset(),data.getLength()); 1387 this.dataIn = new DataInputStream(bytesIn); 1388 } 1389 } 1390 catch (IOException e) { 1391 MessageNotReadableException mnr = new MessageNotReadableException("getBodyAsBytes failed"); 1392 mnr.setLinkedException(e); 1393 throw mnr; 1394 } 1395 } 1396 1397 public String toString() { 1398 return super.toString() + " ActiveMQStreamMessage{ " + 1399 "bytesOut = " + bytesOut + 1400 ", dataOut = " + dataOut + 1401 ", dataIn = " + dataIn + 1402 ", bytesToRead = " + bytesToRead + 1403 " }"; 1404 } 1405 }