001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.io.util; 020 021 import java.util.ArrayList; 022 import java.util.Iterator; 023 import java.util.List; 024 025 import javax.jms.JMSException; 026 027 import org.apache.commons.logging.Log; 028 import org.apache.commons.logging.LogFactory; 029 import org.activemq.service.QueueListEntry; 030 import org.activemq.service.impl.DefaultQueueList; 031 032 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong; 033 034 /** 035 * MemoryBoundedQueue is a queue bounded by memory usage for MemoryManageable s 036 * 037 * @version $Revision: 1.1.1.1 $ 038 */ 039 public class MemoryBoundedQueue implements MemoryBoundedObject { 040 041 private static final Log log = LogFactory.getLog(MemoryBoundedQueue.class); 042 043 private static final int OBJECT_OVERHEAD = 50; 044 protected static final int WAIT_TIMEOUT = 100; 045 046 private final MemoryBoundedQueueManager manager; 047 private final String name; 048 049 protected final Object outLock = new Object(); 050 protected final Object inLock = new Object(); 051 private final DefaultQueueList internalList = new DefaultQueueList(); 052 protected boolean stopped = false; 053 protected boolean closed = false; 054 private SynchronizedLong memoryUsedByThisQueue = new SynchronizedLong(0); 055 056 /** 057 * Constructor 058 * 059 * @param name 060 * @param manager 061 * @param name 062 */ 063 public MemoryBoundedQueue(MemoryBoundedQueueManager manager, String name) { 064 this.manager = manager; 065 this.name = name; 066 this.manager.add(this); 067 } 068 /** 069 * @return a pretty print of this queue 070 */ 071 public String toString() { 072 return "MemoryBoundedQueue{ size=" + size() + ", memory usage=" + memoryUsedByThisQueue + " }"; 073 } 074 075 /** 076 * @return the number of items held by this queue 077 */ 078 public int size() { 079 return internalList.size(); 080 } 081 082 /** 083 * @return an aproximation the memory used by this queue 084 */ 085 public long getLocalMemoryUsedByThisQueue() { 086 return memoryUsedByThisQueue.get(); 087 } 088 089 /** 090 * close and remove this queue from the MemoryBoundedQueueManager 091 */ 092 public void close() { 093 try { 094 clear(); 095 closed = true; 096 synchronized (outLock) { 097 outLock.notifyAll(); 098 } 099 synchronized (inLock) { 100 inLock.notifyAll(); 101 } 102 } 103 catch (Throwable e) { 104 e.printStackTrace(); 105 } 106 finally { 107 manager.remove(this); 108 } 109 } 110 111 /** 112 * Enqueue a MemoryManageable without checking memory usage limits 113 * 114 * @param packet 115 */ 116 public void enqueueNoBlock(MemoryManageable packet) { 117 if (!closed) { 118 internalList.add(packet); 119 incrementMemoryUsed(packet); 120 synchronized (outLock) { 121 outLock.notify(); 122 } 123 } 124 } 125 126 /** 127 * Enqueue a MemoryManageable to this queue 128 * 129 * @param packet 130 */ 131 public void enqueue(MemoryManageable packet) { 132 if (!manager.isMemoryLimitEnforced() || !manager.isFull()) { 133 enqueueNoBlock(packet); 134 } 135 else { 136 synchronized (inLock) { 137 try { 138 while (manager.isFull() && !closed) { 139 log.warn("Queue is full, waiting for it to be dequeued."); 140 inLock.wait(WAIT_TIMEOUT); 141 } 142 } 143 catch (InterruptedException ie) { 144 } 145 } 146 enqueueNoBlock(packet); 147 } 148 } 149 150 /** 151 * Enqueue a packet to the head of the queue with total disregard for memory constraints 152 * 153 * @param packet 154 */ 155 public void enqueueFirstNoBlock(MemoryManageable packet) { 156 if (!closed) { 157 internalList.addFirst(packet); 158 incrementMemoryUsed(packet); 159 synchronized (outLock) { 160 outLock.notify(); 161 } 162 } 163 } 164 165 /** 166 * Enqueue an array of packets to the head of the queue with total disregard for memory constraints 167 * 168 * @param packets 169 */ 170 public void enqueueAllFirstNoBlock(List packets) { 171 if (!closed) { 172 internalList.addAllFirst(packets); 173 Iterator iterator = packets.iterator(); 174 for (Iterator iter = packets.iterator(); iter.hasNext();) { 175 MemoryManageable packet = (MemoryManageable) iter.next(); 176 incrementMemoryUsed(packet); 177 } 178 synchronized (outLock) { 179 outLock.notify(); 180 } 181 } 182 } 183 184 /** 185 * Enqueue a MemoryManageable to the head of the queue 186 * 187 * @param packet 188 * @throws InterruptedException 189 */ 190 public void enqueueFirst(MemoryManageable packet) throws InterruptedException { 191 if (!manager.isMemoryLimitEnforced() || !manager.isFull()) { 192 enqueueFirstNoBlock(packet); 193 } 194 else { 195 synchronized (inLock) { 196 while (manager.isFull() && !closed) { 197 inLock.wait(WAIT_TIMEOUT); 198 } 199 } 200 enqueueFirstNoBlock(packet); 201 } 202 } 203 204 /** 205 * @return the first dequeued MemoryManageable or blocks until one is available 206 * @throws InterruptedException 207 */ 208 public MemoryManageable dequeue() throws InterruptedException { 209 MemoryManageable result = null; 210 synchronized (outLock) { 211 while (internalList.isEmpty() && !closed) { 212 outLock.wait(WAIT_TIMEOUT); 213 } 214 result = dequeueNoWait(); 215 } 216 return result; 217 } 218 219 /** 220 * Dequeues a MemoryManageable from the head of the queue 221 * 222 * @param timeInMillis time to wait for a MemoryManageable to be available 223 * @return the first MemoryManageable or null if none available within <I>timeInMillis </I> 224 * @throws InterruptedException 225 */ 226 public MemoryManageable dequeue(long timeInMillis) throws InterruptedException { 227 MemoryManageable result = null; 228 if (timeInMillis == 0) { 229 result = dequeue(); 230 } 231 else { 232 synchronized (outLock) { 233 // if timeInMillis is less than zero assume nowait 234 long waitTime = timeInMillis; 235 long start = (timeInMillis <= 0) ? 0 : System.currentTimeMillis(); 236 while (!closed) { 237 result = dequeueNoWait(); 238 if (result != null || waitTime <= 0) { 239 break; 240 } 241 else { 242 outLock.wait(waitTime); 243 waitTime = timeInMillis - (System.currentTimeMillis() - start); 244 } 245 } 246 } 247 } 248 return result; 249 } 250 251 /** 252 * dequeues a MemoryManageable from the head of the queue 253 * 254 * @return the MemoryManageable at the head of the queue or null, if none is available 255 * @throws InterruptedException 256 */ 257 public MemoryManageable dequeueNoWait() throws InterruptedException { 258 MemoryManageable packet = null; 259 synchronized (outLock) { 260 while (stopped && !closed) { 261 outLock.wait(WAIT_TIMEOUT); 262 } 263 } 264 packet = (MemoryManageable) internalList.removeFirst(); 265 decrementMemoryUsed(packet); 266 if (packet != null) { 267 synchronized (inLock) { 268 inLock.notify(); 269 } 270 } 271 return packet; 272 } 273 274 /** 275 * @return true if the queue is enabled for dequeing (default = true) 276 */ 277 public boolean isStarted() { 278 synchronized (outLock) { 279 return stopped == false; 280 } 281 } 282 283 /** 284 * disable dequeueing 285 */ 286 public void stop() { 287 synchronized (outLock) { 288 stopped = true; 289 } 290 } 291 292 /** 293 * enable dequeueing 294 */ 295 public void start() { 296 synchronized (outLock) { 297 stopped = false; 298 outLock.notifyAll(); 299 } 300 synchronized (inLock) { 301 inLock.notifyAll(); 302 } 303 } 304 305 /** 306 * Remove a packet from the queue 307 * 308 * @param packet 309 * @return true if the packet was found 310 */ 311 public boolean remove(MemoryManageable packet) { 312 boolean result = false; 313 if (!internalList.isEmpty()) { 314 result = internalList.remove(packet); 315 } 316 if (result) { 317 decrementMemoryUsed(packet); 318 } 319 synchronized (inLock) { 320 inLock.notify(); 321 } 322 return result; 323 } 324 325 /** 326 * Remove a MemoryManageable by it's id 327 * 328 * @param id 329 * @return 330 */ 331 public MemoryManageable remove(Object id) { 332 MemoryManageable result = null; 333 QueueListEntry entry = internalList.getFirstEntry(); 334 try { 335 while (entry != null) { 336 MemoryManageable p = (MemoryManageable) entry.getElement(); 337 if (p.getMemoryId().equals(id)) { 338 result = p; 339 remove(p); 340 break; 341 } 342 entry = internalList.getNextEntry(entry); 343 } 344 } 345 catch (JMSException jmsEx) { 346 jmsEx.printStackTrace(); 347 } 348 synchronized (inLock) { 349 inLock.notify(); 350 } 351 return result; 352 } 353 354 /** 355 * remove any MemoryManageable s in the queue 356 */ 357 public void clear() { 358 while (!internalList.isEmpty()) { 359 MemoryManageable packet = (MemoryManageable) internalList.removeFirst(); 360 decrementMemoryUsed(packet); 361 } 362 synchronized (inLock) { 363 inLock.notifyAll(); 364 } 365 } 366 367 /** 368 * @return true if the queue is empty 369 */ 370 public boolean isEmpty() { 371 return internalList.isEmpty(); 372 } 373 374 /** 375 * retrieve a MemoryManageable at an indexed position in the queue 376 * 377 * @param index 378 * @return 379 */ 380 public MemoryManageable get(int index) { 381 return (MemoryManageable) internalList.get(index); 382 } 383 384 /** 385 * Retrieve a shallow copy of the contents as a list 386 * 387 * @return a list containing the bounded queue contents 388 */ 389 public List getContents() { 390 Object[] array = internalList.toArray(); 391 List list = new ArrayList(); 392 for (int i = 0; i < array.length; i++) { 393 list.add(array[i]); 394 } 395 return list; 396 } 397 398 protected void incrementMemoryUsed(MemoryManageable packet) { 399 if (packet != null) { 400 int size = OBJECT_OVERHEAD; 401 if (packet != null) { 402 if (packet.incrementMemoryReferenceCount() == 1) { 403 size += packet.getMemoryUsage(); 404 } 405 } 406 memoryUsedByThisQueue.add(size); 407 manager.incrementMemoryUsed(size); 408 } 409 } 410 411 protected void decrementMemoryUsed(MemoryManageable packet) { 412 if (packet != null) { 413 int size = OBJECT_OVERHEAD; 414 if (packet != null) { 415 if ( packet.decrementMemoryReferenceCount() == 0) { 416 size += packet.getMemoryUsage(); 417 } 418 } 419 420 memoryUsedByThisQueue.subtract(size); 421 manager.decrementMemoryUsed(size); 422 } 423 } 424 /** 425 * @return Returns the name. 426 */ 427 public String getName() { 428 return name; 429 } 430 }