001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.io.util;
020    import java.io.File;
021    import java.io.IOException;
022    import java.util.List;
023    import javax.jms.JMSException;
024    import org.apache.commons.logging.Log;
025    import org.apache.commons.logging.LogFactory;
026    import org.activemq.io.WireFormat;
027    import org.activemq.io.impl.DefaultWireFormat;
028    import org.activemq.message.ActiveMQMessage;
029    import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
030    
031    /**
032     * Implements a controlled thread safe queue, with ActiveMQMessages being spooled to disk for reading asynchronously.
033     */
034    public class SpooledBoundedActiveMQMessageQueue {
035        private String name;
036        private DataContainer container;
037        private WireFormat wireFormat;
038        private long maxDataLength;
039        private boolean closed;
040        private boolean stopped;
041        private SynchronizedInt size = new SynchronizedInt(0);
042        private Object inLock = new Object();
043        private Object outLock = new Object();
044        private static int WAIT_TIMEOUT = 250;
045        private static final Log log = LogFactory.getLog(SpooledBoundedActiveMQMessageQueue.class);
046    
047        /**
048         * Constructor for SpooledBoundedActiveMQMessageQueue
049         * 
050         * @param dir
051         * @param name
052         * @param maxDataLength
053         * @param maxBlockSize
054         * @throws IOException
055         */
056        public SpooledBoundedActiveMQMessageQueue(File dir, String name, long maxDataLength, int maxBlockSize) throws IOException {
057            //ensure name can be used as a file name
058            char[] chars = name.toCharArray();
059            for (int i = 0;i < chars.length;i++) {
060                if (!Character.isLetterOrDigit(chars[i])) {
061                    chars[i] = '_';
062                }
063            }
064            this.name = new String(chars);
065            this.maxDataLength = maxDataLength;
066            this.wireFormat = new DefaultWireFormat();
067            this.container = new DataContainer(dir, this.name, maxBlockSize);
068            //as the DataContainer is temporary, clean-up any old files
069            this.container.deleteAll();
070        }
071    
072        /**
073         * Constructor for SpooledBoundedActiveMQMessageQueue
074         * 
075         * @param dir
076         * @param name
077         * @throws IOException
078         */
079        public SpooledBoundedActiveMQMessageQueue(File dir, String name) throws IOException {
080            this(dir, name, 1024 * 1024 * 64, 8192);
081        }
082    
083        /**
084         * Place a ActiveMQMessage at the head of the Queue
085         * 
086         * @param packet
087         * @throws JMSException
088         */
089        public void enqueue(ActiveMQMessage packet) throws JMSException {
090            if (!isFull()) {
091                enqueueNoBlock(packet);
092            }
093            else {
094                synchronized (inLock) {
095                    try {
096                        while (isFull()) {
097                            inLock.wait(WAIT_TIMEOUT);
098                        }
099                    }
100                    catch (InterruptedException ie) {
101                    }
102                }
103                enqueueNoBlock(packet);
104            }
105        }
106    
107        /**
108         * Enqueue a ActiveMQMessage without checking usage limits
109         * 
110         * @param packet
111         * @throws JMSException
112         */
113        public void enqueueNoBlock(ActiveMQMessage packet) throws JMSException {
114            byte[] data;
115            try {
116                data = wireFormat.toBytes(packet);
117                size.increment();
118                container.write(data);
119            }
120            catch (IOException e) {
121                JMSException jmsEx = new JMSException("enqueNoBlock failed: " + e.getMessage());
122                jmsEx.setLinkedException(e);
123                throw jmsEx;
124            }
125            synchronized (outLock) {
126                outLock.notify();
127            }
128        }
129    
130        /**
131         * @return the first dequeued ActiveMQMessage or blocks until one is available
132         * @throws JMSException
133         * @throws InterruptedException
134         */
135        public ActiveMQMessage dequeue() throws JMSException, InterruptedException {
136            ActiveMQMessage result = null;
137            synchronized (outLock) {
138                while ((result = dequeueNoWait()) == null) {
139                    outLock.wait(WAIT_TIMEOUT);
140                }
141            }
142            return result;
143        }
144    
145        /**
146         * @return the ActiveMQMessage from the head of the Queue or null if the Queue is empty
147         * @param timeInMillis maximum time to wait to dequeue a ActiveMQMessage
148         * @throws JMSException
149         * @throws InterruptedException
150         */
151        public ActiveMQMessage dequeue(long timeInMillis) throws JMSException, InterruptedException {
152            ActiveMQMessage result = dequeueNoWait();
153            if (result == null) {
154                synchronized (outLock) {
155                    outLock.wait(timeInMillis);
156                    result = dequeueNoWait();
157                }
158            }
159            return result;
160        }
161    
162        /**
163         * @return the ActiveMQMessage from the head of the Queue or null if the Queue is empty
164         * @throws JMSException
165         * @throws InterruptedException
166         */
167        public ActiveMQMessage dequeueNoWait() throws JMSException, InterruptedException {
168            ActiveMQMessage result = null;
169            if (stopped) {
170                synchronized (outLock) {
171                    while (stopped && !closed) {
172                        outLock.wait(WAIT_TIMEOUT);
173                    }
174                }
175            }
176            byte[] data;
177            try {
178                data = container.read();
179                if (data != null) {
180                    result = (ActiveMQMessage)wireFormat.fromBytes(data);
181                    size.decrement();
182                }
183            }
184            catch (IOException e) {
185                JMSException jmsEx = new JMSException("fromBytes failed");
186                jmsEx.setLinkedException(e);
187                jmsEx.initCause(e);
188                throw jmsEx;
189            }
190            if (result != null && !isFull()) {
191                synchronized (inLock) {
192                    inLock.notify();
193                }
194            }
195            return result;
196        }
197    
198        /**
199         * @return true if this queue has reached it's data length limit
200         */
201        public boolean isFull() {
202            return container.length() >= maxDataLength;
203        }
204    
205        /**
206         * close this queue
207         */
208        public void close() {
209            try {
210                closed = true;
211                container.close();
212            }
213            catch (IOException ioe) {
214                log.warn("Couldn't close queue", ioe);
215            }
216        }
217    
218        /**
219         * @return the name of this BoundedActiveMQMessageQueue
220         */
221        public String getName() {
222            return name;
223        }
224    
225        /**
226         * @return number of ActiveMQMessages held by this queue
227         */
228        public int size() {
229            return size.get();
230        }
231    
232        /**
233         * @return true if the queue is enabled for dequeing (default = true)
234         */
235        public boolean isStarted() {
236            return stopped == false;
237        }
238    
239        /**
240         * disable dequeueing
241         */
242        public void stop() {
243            synchronized (outLock) {
244                stopped = true;
245            }
246        }
247    
248        /**
249         * enable dequeueing
250         */
251        public void start() {
252            stopped = false;
253            synchronized (outLock) {
254                outLock.notifyAll();
255            }
256            synchronized (inLock) {
257                inLock.notifyAll();
258            }
259        }
260    
261        /**
262         * @return true if this queue is empty
263         */
264        public boolean isEmpty() {
265            return size.get() == 0;
266        }
267    
268        /**
269         * clear the queue
270         */
271        public void clear() {
272        }
273    
274        /**
275         * @return a copy of the contents
276         */
277        public List getContents() {
278            return null;
279        }
280    }