/**
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/

package org.activemq.io.util;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import javax.jms.JMSException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.activemq.service.QueueListEntry;
import org.activemq.service.impl.DefaultQueueList;

/**
 * A prioritized version of the MemoryBoundedQueue supporting the 10 JMS priority levels
 * 0-9, 0 being the lowest and 9 being the highest.
 * <p>
 * Packets are kept in one internal list per priority level; dequeuing always drains the
 * highest non-empty level first. A packet whose reported priority falls outside 0-9 is
 * filed under the default priority (4).
 *
 * @author Ramzi Saba
 * @version $Revision: 1.1.1.1 $
 */
public class MemoryBoundedPrioritizedQueue extends MemoryBoundedQueue {

    private static final Log log = LogFactory.getLog(MemoryBoundedPrioritizedQueue.class);

    /** Lowest priority level accepted by this queue. */
    private static final int MIN_PRIORITY = 0;

    /** Highest priority level accepted by this queue. */
    private static final int MAX_PRIORITY = 9;

    /** Level used for packets that report a priority outside MIN_PRIORITY..MAX_PRIORITY. */
    private static final int DEFAULT_PRIORITY = 4;

    /** Number of distinct priority levels, i.e. the number of internal lists. */
    private static final int PRIORITY_LEVELS = MAX_PRIORITY - MIN_PRIORITY + 1;

    /** One list per priority level, indexed by priority. */
    private final DefaultQueueList[] prioritizedPackets = new DefaultQueueList[PRIORITY_LEVELS];

    /**
     * Constructor
     *
     * @param manager passed through to the MemoryBoundedQueue constructor
     * @param name passed through to the MemoryBoundedQueue constructor
     */
    public MemoryBoundedPrioritizedQueue(MemoryBoundedQueueManager manager, String name) {
        super(manager, name);
        for (int i = 0; i < prioritizedPackets.length; ++i) {
            this.prioritizedPackets[i] = new DefaultQueueList();
        }
    }

    /**
     * @return the number of items held by this queue, summed over all priority levels
     */
    public int size() {
        int size = 0;
        for (int i = 0; i < prioritizedPackets.length; ++i) {
            size += prioritizedPackets[i].size();
        }
        return size;
    }

    /**
     * Enqueue a MemoryManageable without checking memory usage limits. Does nothing if the
     * queue is closed.
     *
     * @param packet the packet to append to the list for its priority level
     */
    public void enqueueNoBlock(MemoryManageable packet) {
        if (!closed) {
            prioritizedPackets[getPacketPriority(packet)].add(packet);
            incrementMemoryUsed(packet);
            synchronized (outLock) {
                outLock.notify();
            }
        }
    }

    /**
     * Enqueue a packet to the head of the list for its priority level with total disregard
     * for memory constraints. Does nothing if the queue is closed.
     *
     * @param packet the packet to insert
     */
    public final void enqueueFirstNoBlock(MemoryManageable packet) {
        if (!closed) {
            prioritizedPackets[getPacketPriority(packet)].addFirst(packet);
            incrementMemoryUsed(packet);
            synchronized (outLock) {
                outLock.notify();
            }
        }
    }

    /**
     * Enqueue a list of packets to the head of the queue with total disregard for memory
     * constraints. Does nothing if the queue is closed.
     * <p>
     * NOTE(review): each packet is pushed individually with addFirst in list order, so within
     * a single priority level the packets presumably end up in reverse list order. Confirm
     * whether that matches the ordering callers expect.
     *
     * @param packets list of MemoryManageable instances to insert
     */
    public void enqueueAllFirstNoBlock(List packets) {
        if (!closed) {
            for (Iterator iter = packets.iterator(); iter.hasNext();) {
                MemoryManageable packet = (MemoryManageable) iter.next();
                prioritizedPackets[getPacketPriority(packet)].addFirst(packet);
                incrementMemoryUsed(packet);
            }
            synchronized (outLock) {
                outLock.notify();
            }
        }
    }

    /**
     * Waits on outLock, in WAIT_TIMEOUT slices, until the queue is non-empty or closed, then
     * delegates to {@link #dequeueNoWait()}.
     *
     * @return the dequeued MemoryManageable, or null if none was available (e.g. the queue
     *         was closed while empty)
     * @throws InterruptedException if interrupted while waiting
     */
    public MemoryManageable dequeue() throws InterruptedException {
        MemoryManageable result = null;
        synchronized (outLock) {
            while (isEmpty() && !closed) {
                outLock.wait(WAIT_TIMEOUT);
            }
            result = dequeueNoWait();
        }
        return result;
    }

    /**
     * Dequeues a MemoryManageable from the head of the highest non-empty priority level.
     * Although it does not wait for a packet to arrive, it does wait while the queue is
     * stopped and not closed.
     *
     * @return the MemoryManageable at the head of the queue or null, if none is available
     * @throws InterruptedException if interrupted while waiting for the queue to be restarted
     */
    public MemoryManageable dequeueNoWait() throws InterruptedException {
        MemoryManageable packet = null;
        synchronized (outLock) {
            while (stopped && !closed) {
                outLock.wait(WAIT_TIMEOUT);
            }
        }
        // Scan from the highest priority level down and take the first packet found.
        for (int i = prioritizedPackets.length - 1; i >= 0; --i) {
            packet = (MemoryManageable) prioritizedPackets[i].removeFirst();
            if (packet != null) {
                break;
            }
        }
        // Called even when nothing was dequeued (packet == null), exactly as before;
        // assumes decrementMemoryUsed tolerates null -- TODO confirm against MemoryBoundedQueue.
        decrementMemoryUsed(packet);
        if (packet != null) {
            synchronized (inLock) {
                inLock.notify();
            }
        }
        return packet;
    }

    /**
     * Remove a packet from the list for its priority level. inLock is notified whether or not
     * the packet was found.
     *
     * @param packet the packet to remove
     * @return true if the packet was found
     */
    public boolean remove(MemoryManageable packet) {
        boolean result = false;
        if (!isEmpty()) {
            result = prioritizedPackets[getPacketPriority(packet)].remove(packet);
        }
        if (result) {
            decrementMemoryUsed(packet);
        }
        synchronized (inLock) {
            inLock.notify();
        }
        return result;
    }

    /**
     * Remove a MemoryManageable by its id. Priority levels are scanned from lowest to highest
     * and the scan stops at the first packet whose memory id equals the given id.
     *
     * @param id the memory id to look for
     * @return the removed MemoryManageable, or null if no packet with that id was found
     */
    public MemoryManageable remove(Object id) {
        MemoryManageable result = null;
        // Stop scanning further levels as soon as a match has been found.
        for (int i = 0; i < prioritizedPackets.length && result == null; ++i) {
            QueueListEntry entry = prioritizedPackets[i].getFirstEntry();
            try {
                while (entry != null) {
                    MemoryManageable p = (MemoryManageable) entry.getElement();
                    if (p.getMemoryId().equals(id)) {
                        result = p;
                        remove(p);
                        break;
                    }
                    entry = prioritizedPackets[i].getNextEntry(entry);
                }
            }
            catch (JMSException jmsEx) {
                // Best effort: report the failure and carry on with the next priority level.
                log.error("Failed to scan priority level " + i + " while removing id: " + id, jmsEx);
            }
        }
        synchronized (inLock) {
            inLock.notify();
        }
        return result;
    }

    /**
     * Remove any MemoryManageables in the queue, releasing the memory accounted for each one,
     * then notify all threads waiting on inLock.
     */
    public void clear() {
        for (int i = 0; i < prioritizedPackets.length; ++i) {
            while (!prioritizedPackets[i].isEmpty()) {
                MemoryManageable packet = (MemoryManageable) prioritizedPackets[i].removeFirst();
                decrementMemoryUsed(packet);
            }
        }
        synchronized (inLock) {
            inLock.notifyAll();
        }
    }

    /**
     * @return true if every priority level is empty
     */
    public boolean isEmpty() {
        for (int i = 0; i < prioritizedPackets.length; ++i) {
            if (!prioritizedPackets[i].isEmpty()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Indexed access is not supported by the prioritized queue.
     *
     * @param index ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public MemoryManageable get(int index) {
        throw new UnsupportedOperationException("Cannot invoke this method on a MemoryBoundedPrioritizedQueue instance");
    }

    /**
     * Retrieve a shallow copy of the contents as a list, highest priority level first.
     *
     * @return a new list containing the bounded queue contents
     */
    public List getContents() {
        List list = new ArrayList();
        for (int level = prioritizedPackets.length - 1; level >= 0; --level) {
            Object[] array = prioritizedPackets[level].toArray();
            for (int i = 0; i < array.length; i++) {
                list.add(array[i]);
            }
        }
        return list;
    }

    /**
     * Maps a packet to the index of the internal list it belongs to.
     *
     * @param packet the packet whose priority is read
     * @return the packet's priority if it lies within MIN_PRIORITY..MAX_PRIORITY, otherwise
     *         DEFAULT_PRIORITY
     */
    private int getPacketPriority(MemoryManageable packet) {
        int priority = packet.getPriority();
        // Both bounds must hold; an out-of-range value would otherwise be used as an array index.
        if (priority >= MIN_PRIORITY && priority <= MAX_PRIORITY) {
            return priority;
        }
        return DEFAULT_PRIORITY;
    }
}