001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.io.util; 019 020 import java.util.HashMap; 021 import java.util.Iterator; 022 023 import org.activemq.message.ActiveMQMessage; 024 import org.activemq.service.QueueListEntry; 025 import org.activemq.service.impl.DefaultQueueList; 026 import org.activemq.store.cache.MessageCache; 027 028 /** 029 * A simple cache that stores messages in memory. Cache entries are evicted 030 * when the memoryManager starts to run short on memory (A LRU cache is used). 031 * 032 * @version $Revision: 1.1.1.1 $ 033 */ 034 public class MemoryBoundedMessageCache implements MessageCache, MemoryBoundedObject { 035 036 private static final int OBJECT_OVERHEAD = 50; 037 038 private final MemoryBoundedObjectManager memoryManager; 039 040 /** msgId -> LRUNode */ 041 private final HashMap messages = new HashMap(); 042 /** Ordered list of messageIds recently used at the front */ 043 private final DefaultQueueList lruList = new DefaultQueueList(); 044 045 private int memoryUsedByThisCache; 046 private float growthLimit = 0.75f; 047 private boolean closed; 048 049 /** Used associate the a Message to it's QueueListEntry in the lruList */ 050 private static class CacheNode { 051 ActiveMQMessage message; 052 QueueListEntry entry; 053 } 054 055 public MemoryBoundedMessageCache(MemoryBoundedObjectManager memoryManager) { 056 this.memoryManager = memoryManager; 057 this.memoryManager.add(this); 058 } 059 060 /** 061 * Gets a message that was previously <code>put</code> into this object. 062 * 063 * @param msgid 064 * @return null if the message was not previously put or if the message has expired out of the cache. 065 */ 066 synchronized public ActiveMQMessage get(String msgid) { 067 CacheNode rc = (CacheNode) messages.get(msgid); 068 if( rc != null ) { 069 // Move to front (the recently used part of the list). 070 lruList.remove(rc.entry); 071 rc.entry = lruList.addFirst(msgid); 072 return rc.message; 073 } 074 return null; 075 } 076 077 /** 078 * Puts a message into the cache. 079 * 080 * @param messageID 081 * @param message 082 */ 083 synchronized public void put(String messageID, ActiveMQMessage message) { 084 085 // Drop old messages until there is space. 086 while( isFull() && !messages.isEmpty() ) { 087 removeOldest(); 088 } 089 090 if( !isFull() ) { 091 incrementMemoryUsed(message); 092 CacheNode newNode = new CacheNode(); 093 newNode.message = message; 094 newNode.entry = lruList.addFirst(messageID); 095 CacheNode oldNode = (CacheNode) messages.put(messageID, newNode); 096 if( oldNode !=null ) { 097 lruList.remove(oldNode); 098 decrementMemoryUsed(oldNode.message); 099 } 100 } 101 } 102 103 private void removeOldest() { 104 String messageID = (String) lruList.removeLast(); 105 CacheNode node = (CacheNode) messages.remove(messageID); 106 decrementMemoryUsed(node.message); 107 } 108 109 private boolean isFull() { 110 return memoryManager.getPercentFull() > growthLimit; 111 } 112 113 /** 114 * Remvoes a message from the cache. 115 * 116 * @param messageID 117 */ 118 synchronized public void remove(String messageID) { 119 CacheNode node = (CacheNode) messages.remove(messageID); 120 if( node !=null ) { 121 lruList.remove(node.entry); 122 decrementMemoryUsed(node.message); 123 } 124 } 125 126 private void incrementMemoryUsed(ActiveMQMessage packet) { 127 if (packet != null) { 128 int size = OBJECT_OVERHEAD; 129 if (packet != null) { 130 if (packet.incrementMemoryReferenceCount() == 1) { 131 size += packet.getMemoryUsage(); 132 } 133 } 134 synchronized( this ) { 135 memoryUsedByThisCache += size; 136 } 137 memoryManager.incrementMemoryUsed(size); 138 } 139 } 140 141 private void decrementMemoryUsed(ActiveMQMessage packet) { 142 if (packet != null) { 143 int size = OBJECT_OVERHEAD; 144 if (packet != null) { 145 if (packet.decrementMemoryReferenceCount() == 0) { 146 size += packet.getMemoryUsage(); 147 } 148 } 149 150 synchronized( this ) { 151 memoryUsedByThisCache -= size; 152 } 153 memoryManager.decrementMemoryUsed(size); 154 } 155 } 156 157 /** 158 * @return returns the percentage of memory usage at which that cache will stop to grow. 159 */ 160 public float getGrowthLimit() { 161 return growthLimit; 162 } 163 164 /** 165 * @param growTillFence the percentage of memory usage at which that cache will stop to grow. 166 */ 167 public void setGrowthLimit(float growTillFence) { 168 this.growthLimit = growTillFence; 169 } 170 171 synchronized public void close() { 172 if( closed ) 173 return; 174 closed=true; 175 176 for (Iterator iter = messages.values().iterator(); iter.hasNext();) { 177 CacheNode node = (CacheNode) iter.next(); 178 decrementMemoryUsed(node.message); 179 } 180 messages.clear(); 181 lruList.clear(); 182 183 memoryManager.remove(this); 184 } 185 }