001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.service.impl; 019 020 import org.activemq.service.QueueList; 021 import org.activemq.service.QueueListEntry; 022 import org.activemq.util.FastInputStream; 023 import org.activemq.util.FastOutputStream; 024 import org.activemq.util.JMSExceptionHelper; 025 026 import javax.jms.JMSException; 027 import java.io.ByteArrayInputStream; 028 import java.io.ByteArrayOutputStream; 029 import java.io.DataInputStream; 030 import java.io.DataOutputStream; 031 import java.io.IOException; 032 import java.io.Serializable; 033 034 /** 035 * A base class which is useful for implementation inheritence when implementing 036 * a persistent QueueList 037 * 038 * @version $Revision: 1.1.1.1 $ 039 */ 040 public abstract class QueueListSupport implements QueueList { 041 protected static final Long HEAD_KEY = new Long(0); 042 043 public static class Header implements Serializable { 044 private static final long serialVersionUID = 64734383295040L; 045 046 public Long headKey; 047 public Long tailKey; 048 public long lastKeyValue; 049 public int size; 050 051 public byte[] asBytes() throws IOException { 052 ByteArrayOutputStream buffer = new ByteArrayOutputStream(); 053 DataOutputStream out = new DataOutputStream(buffer); 054 out.writeLong(unwrapLong(headKey)); 055 out.writeLong(unwrapLong(tailKey)); 056 out.writeLong(lastKeyValue); 057 out.writeInt(size); 058 return buffer.toByteArray(); 059 } 060 061 public void fromBytes(byte[] data) throws IOException { 062 DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); 063 this.headKey = wrapLong(in.readLong()); 064 this.tailKey = wrapLong(in.readLong()); 065 this.lastKeyValue = in.readLong(); 066 this.size = in.readInt(); 067 } 068 } 069 070 public static class Node implements Serializable, QueueListEntry { 071 private static final long serialVersionUID = 4609474001468609536L; 072 073 public Long previousKey; 074 public Long nextKey; 075 public Object value; 076 077 // not stored, but cached when read from the table 078 public transient Long key; 079 080 public Object getElement() { 081 return value; 082 } 083 084 public byte[] asBytes() throws IOException { 085 FastOutputStream buffer = new FastOutputStream(); 086 DataOutputStream out = new DataOutputStream(buffer); 087 out.writeLong(unwrapLong(previousKey)); 088 out.writeLong(unwrapLong(nextKey)); 089 out.writeUTF((String) value); 090 return buffer.toByteArray(); 091 } 092 093 public void fromBytes(byte[] data) throws IOException { 094 DataInputStream in = new DataInputStream(new FastInputStream(data)); 095 this.previousKey = wrapLong(in.readLong()); 096 this.nextKey = wrapLong(in.readLong()); 097 this.value = in.readUTF(); 098 } 099 } 100 101 public Object getFirst() throws JMSException { 102 try { 103 Long key = getHeader().headKey; 104 if (key != null) { 105 Node node = getNode(key); 106 if (node != null) { 107 return node.getElement(); 108 } 109 } 110 return null; 111 } 112 catch (IOException e) { 113 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e); 114 } 115 } 116 117 public Object getLast() throws JMSException { 118 try { 119 Long key = getHeader().tailKey; 120 if (key != null) { 121 Node node = getNode(key); 122 if (node != null) { 123 return node.getElement(); 124 } 125 } 126 return null; 127 } 128 catch (IOException e) { 129 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e); 130 } 131 } 132 133 public Object removeFirst() throws JMSException { 134 try { 135 Header header = getHeader(); 136 Long key = header.headKey; 137 if (key != null) { 138 Node node = getNode(key); 139 if (node != null) { 140 doRemoveNode(node); 141 header.headKey = node.nextKey; 142 --header.size; 143 updateHeader(header); 144 return node.getElement(); 145 } 146 } 147 return null; 148 } 149 catch (IOException e) { 150 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e); 151 } 152 } 153 154 public Object removeLast() throws JMSException { 155 try { 156 Header header = getHeader(); 157 Long key = header.tailKey; 158 if (key != null) { 159 Node node = getNode(key); 160 if (node != null) { 161 doRemoveNode(node); 162 header.tailKey = node.previousKey; 163 --header.size; 164 updateHeader(header); 165 return node.getElement(); 166 } 167 } 168 return null; 169 } 170 catch (IOException e) { 171 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e); 172 } 173 } 174 175 public QueueListEntry addFirst(Object value) throws JMSException { 176 try { 177 Header header = getHeader(); 178 Node node = createNode(); 179 node.value = value; 180 Long nextKey = header.headKey; 181 node.nextKey = nextKey; 182 Long key = createKey(header); 183 node.key = key; 184 updateNode(node); 185 updateNextNode(nextKey, key); 186 header.headKey = key; 187 if (header.tailKey == null) { 188 header.tailKey = key; 189 } 190 header.size++; 191 updateHeader(header); 192 return node; 193 } 194 catch (IOException e) { 195 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e); 196 } 197 } 198 199 public QueueListEntry addLast(Object value) throws JMSException { 200 try { 201 Header header = getHeader(); 202 return doAddLast(value, header); 203 } 204 catch (IOException e) { 205 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e); 206 } 207 } 208 209 public boolean contains(Object value) throws JMSException { 210 return indexOf(value) != -1; 211 } 212 213 public int size() throws JMSException { 214 try { 215 return getHeader().size; 216 } 217 catch (IOException e) { 218 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e); 219 } 220 } 221 222 public boolean isEmpty() throws JMSException { 223 int size = size(); 224 return size == 0; 225 } 226 227 public QueueListEntry add(Object value) throws JMSException { 228 return addLast(value); 229 } 230 231 public Object get(int index) throws JMSException { 232 try { 233 Node node = getNode(index); 234 if (node != null) { 235 return node.value; 236 } 237 return null; 238 } 239 catch (IOException e) { 240 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e); 241 } 242 } 243 244 public Object set(int index, Object element) throws JMSException { 245 try { 246 Node node = getNode(index); 247 if (node != null) { 248 Object previous = node.value; 249 node.value = element; 250 updateNode(node); 251 return previous; 252 } 253 return null; 254 } 255 catch (IOException e) { 256 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e); 257 } 258 } 259 260 public void add(int index, Object element) throws JMSException { 261 if (index == 0) { 262 addFirst(element); 263 } 264 else { 265 try { 266 Header header = getHeader(); 267 Node nextNode = getNode(header, index); 268 doAddBefore(header, nextNode, element); 269 } 270 catch (IOException e) { 271 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e); 272 } 273 } 274 } 275 276 public Object remove(int index) throws JMSException { 277 try { 278 Node node = getNode(index); 279 if (node != null) { 280 removeNode(node); 281 } 282 return null; 283 } 284 catch (IOException e) { 285 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e); 286 } 287 } 288 289 public int indexOf(Object value) throws JMSException { 290 try { 291 Header header = getHeader(); 292 Long key = header.headKey; 293 for (int i = 0; key != null; i++) { 294 Node node = getNode(key); 295 if (node == null) { 296 break; 297 } 298 if (value == node.value || (value != null && value.equals(node.value))) { 299 return i; 300 } 301 key = node.nextKey; 302 } 303 return -1; 304 } 305 catch (IOException e) { 306 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e); 307 } 308 } 309 310 public int lastIndexOf(Object value) throws JMSException { 311 try { 312 Header header = getHeader(); 313 Long key = header.tailKey; 314 for (int i = header.size - 1; key != null; i--) { 315 Node node = getNode(key); 316 if (node == null) { 317 break; 318 } 319 if (value == node.value || (value != null && value.equals(node.value))) { 320 return i; 321 } 322 key = node.previousKey; 323 } 324 return -1; 325 } 326 catch (IOException e) { 327 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e); 328 } 329 } 330 331 public QueueListEntry getFirstEntry() throws JMSException { 332 try { 333 return getNode(getHeader().headKey); 334 } 335 catch (IOException e) { 336 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e); 337 } 338 } 339 340 public QueueListEntry getLastEntry() throws JMSException { 341 try { 342 return getNode(getHeader().tailKey); 343 } 344 catch (IOException e) { 345 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e); 346 } 347 } 348 349 public QueueListEntry getNextEntry(QueueListEntry entry) throws JMSException { 350 Node node = (Node) entry; 351 try { 352 return getNode(node.nextKey); 353 } 354 catch (IOException e) { 355 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e); 356 } 357 } 358 359 public QueueListEntry getPrevEntry(QueueListEntry entry) throws JMSException { 360 Node node = (Node) entry; 361 try { 362 return getNode(node.previousKey); 363 } 364 catch (IOException e) { 365 throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e); 366 } 367 } 368 369 public QueueListEntry addBefore(Object value, QueueListEntry entry) throws JMSException { 370 try { 371 return doAddBefore(getHeader(), (Node) entry, value); 372 } 373 catch (IOException e) { 374 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e); 375 } 376 } 377 378 public void remove(QueueListEntry entry) throws JMSException { 379 try { 380 removeNode((Node) entry); 381 } 382 catch (IOException e) { 383 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e); 384 } 385 } 386 387 public Object[] toArray() throws JMSException { 388 try { 389 Header header = getHeader(); 390 int size = header.size; 391 if (size == 0) { 392 return EMPTY_ARRAY; 393 } 394 Long key = header.headKey; 395 Object[] answer = new Object[size]; 396 for (int i = 0; i < size && key != null; i++) { 397 Node node = getNode(key); 398 if (node != null) { 399 answer[i] = node.value; 400 key = node.nextKey; 401 } 402 } 403 return answer; 404 } 405 catch (IOException e) { 406 throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e); 407 } 408 } 409 410 public void rotate() throws JMSException { 411 // TODO could tune this by just swizzling pointers 412 Object obj = removeFirst(); 413 if (obj != null) { 414 addLast(obj); 415 } 416 } 417 418 protected Long createKey(Header header) throws IOException, JMSException { 419 long value = header.lastKeyValue; 420 while (true) { 421 if (value == Long.MAX_VALUE) { 422 value = 1; 423 } 424 else { 425 value++; 426 } 427 Long key = new Long(value); 428 if (getNode(key) == null) { 429 header.lastKeyValue = value; 430 return key; 431 } 432 } 433 } 434 435 protected boolean removeNode(Node node) throws IOException, JMSException { 436 Header header = null; 437 boolean updatedPrevious = false; 438 if (node.previousKey != null) { 439 // lets point the previous node to our next node 440 Node previousNode = getNode(node.previousKey); 441 if (previousNode != null) { 442 previousNode.nextKey = node.nextKey; 443 updateNode(previousNode); 444 updatedPrevious = true; 445 } 446 } 447 if (!updatedPrevious) { 448 // lets update the head record 449 header = getHeader(); 450 header.headKey = node.nextKey; 451 } 452 453 boolean updatedNext = false; 454 if (node.nextKey != null) { 455 // lets point the next node to our previous node 456 Node nextNode = getNode(node.nextKey); 457 if (nextNode != null) { 458 nextNode.previousKey = node.previousKey; 459 updateNode(nextNode); 460 updatedNext = true; 461 } 462 } 463 if (!updatedNext) { 464 // lets update the tail record 465 header = getHeader(); 466 header.tailKey = node.previousKey; 467 } 468 return true; 469 } 470 471 /** 472 * Looks up the header object, lazily creating one if the current table is empty 473 * 474 * @return 475 * @throws java.io.IOException 476 */ 477 protected abstract Header getHeader() throws IOException, JMSException; 478 479 /** 480 * Writes the header back to disk after its been changed 481 * 482 * @param header 483 * @throws java.io.IOException 484 */ 485 protected abstract void updateHeader(Header header) throws IOException, JMSException; 486 487 /** 488 * Updates the node 489 * 490 * @param node 491 * @throws java.io.IOException 492 */ 493 protected abstract void updateNode(Node node) throws IOException, JMSException; 494 495 protected abstract Node getNode(Long key) throws IOException, JMSException; 496 497 protected Node getNode(int index) throws IOException, JMSException { 498 Header header = getHeader(); 499 return getNode(header, index); 500 } 501 502 protected Node getNode(Header header, int index) throws IOException, JMSException { 503 Node node = null; 504 int middle = header.size / 2; 505 if (index > middle) { 506 // lets navigate backwards 507 Long key = header.tailKey; 508 for (int i = header.size; i > index && key != null; i--) { 509 node = getNode(key); 510 if (node != null) { 511 key = node.previousKey; 512 } 513 } 514 } 515 else { 516 Long key = header.headKey; 517 for (int i = 0; i <= index && key != null; i++) { 518 node = getNode(key); 519 if (node != null) { 520 key = node.nextKey; 521 } 522 } 523 } 524 return node; 525 } 526 527 protected Node doAddLast(Object value, Header header) throws IOException, JMSException { 528 Node node = createNode(); 529 Long key = createKey(header); 530 node.key = key; 531 node.value = value; 532 Long previousKey = header.tailKey; 533 node.previousKey = previousKey; 534 updateNode(node); 535 updatePreviousNode(previousKey, key); 536 header.tailKey = key; 537 if (header.headKey == null) { 538 header.headKey = key; 539 } 540 header.size++; 541 updateHeader(header); 542 return node; 543 } 544 545 protected void updateNextNode(Long nextKey, Long key) throws IOException, JMSException { 546 if (nextKey != null) { 547 Node nextNode = getNode(nextKey); 548 if (nextNode == null) { 549 throw new IOException("Missing node for key: " + nextKey); 550 } 551 nextNode.previousKey = key; 552 updateNode(nextNode); 553 } 554 } 555 556 protected void updatePreviousNode(Long previousKey, Long key) throws IOException, JMSException { 557 if (previousKey != null) { 558 Node previousNode = getNode(previousKey); 559 if (previousNode == null) { 560 throw new IOException("Missing previous node for key: " + previousKey); 561 } 562 previousNode.nextKey = key; 563 updateNode(previousNode); 564 } 565 } 566 567 protected Node doAddBefore(Header header, Node nextNode, Object element) throws JMSException, IOException { 568 if (nextNode == null) { 569 return doAddLast(element, header); 570 } 571 else { 572 // lets add before this nextNode 573 Long key = createKey(header); 574 Node node = createNode(); 575 node.value = element; 576 node.key = key; 577 Long previousKey = nextNode.previousKey; 578 node.previousKey = previousKey; 579 node.nextKey = nextNode.key; 580 nextNode.previousKey = key; 581 header.size++; 582 583 updateNode(node); 584 updateNode(nextNode); 585 updatePreviousNode(previousKey, key); 586 updateHeader(header); 587 return node; 588 } 589 } 590 591 protected abstract void doRemoveNode(Node node) throws IOException, JMSException; 592 593 protected static Long wrapLong(long value) { 594 // lets use 0 for null 595 if (value == 0) { 596 return null; 597 } 598 // TODO use a cache? 599 return new Long(value); 600 } 601 602 protected static long unwrapLong(Long key) { 603 if (key != null) { 604 return key.longValue(); 605 } 606 // lets use 0 for null 607 return 0; 608 } 609 610 protected Node createNode() { 611 return new Node(); 612 } 613 }