001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.tool; 020 021 import java.io.IOException; 022 import java.io.PrintWriter; 023 import java.util.ArrayList; 024 import java.util.Collections; 025 import java.util.Iterator; 026 import java.util.List; 027 import java.util.Random; 028 029 import javax.jms.BytesMessage; 030 import javax.jms.Connection; 031 import javax.jms.DeliveryMode; 032 import javax.jms.Destination; 033 import javax.jms.JMSException; 034 import javax.jms.Message; 035 import javax.jms.MessageConsumer; 036 import javax.jms.MessageProducer; 037 import javax.jms.Session; 038 039 import junit.framework.TestCase; 040 041 import org.activemq.ActiveMQConnectionFactory; 042 import org.activemq.message.ActiveMQQueue; 043 044 import EDU.oswego.cs.dl.util.concurrent.Latch; 045 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 046 import EDU.oswego.cs.dl.util.concurrent.WaitableInt; 047 048 /** 049 * @version $Revision$ 050 */ 051 public class AcidTestTool extends TestCase { 052 053 private Random random = new Random(); 054 private byte data[]; 055 private int workerCount = 10; 056 private PrintWriter statWriter; 057 058 // Worker configuration. 059 protected int recordSize = 1024; 060 protected int batchSize = 5; 061 protected int workerThinkTime = 500; 062 SynchronizedBoolean ignoreJMSErrors = new SynchronizedBoolean(false); 063 064 protected Destination target; 065 private ActiveMQConnectionFactory factory; 066 private Connection connection; 067 068 WaitableInt publishedBatches = new WaitableInt(0); 069 WaitableInt consumedBatches = new WaitableInt(0); 070 071 List errors = Collections.synchronizedList(new ArrayList()); 072 073 private interface Worker extends Runnable { 074 public boolean waitForExit(long i) throws InterruptedException; 075 } 076 077 private final class ProducerWorker implements Worker { 078 079 Session session; 080 private MessageProducer producer; 081 private BytesMessage message; 082 Latch doneLatch = new Latch(); 083 private final String workerId; 084 085 ProducerWorker(Session session, String workerId) throws JMSException { 086 this.session = session; 087 this.workerId = workerId; 088 producer = session.createProducer(target); 089 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 090 message = session.createBytesMessage(); 091 message.setStringProperty("workerId", workerId); 092 message.writeBytes(data); 093 } 094 095 public void run() { 096 try { 097 for( int batchId=0; true; batchId++ ) { 098 // System.out.println("Sending batch: "+workerId+" "+batchId); 099 for( int msgId=0; msgId < batchSize; msgId++ ) { 100 // Sleep some random amount of time less than workerThinkTime 101 try { 102 Thread.sleep(random.nextInt(workerThinkTime)); 103 } catch (InterruptedException e1) { 104 return; 105 } 106 107 message.setIntProperty("batch-id",batchId); 108 message.setIntProperty("msg-id",msgId); 109 110 111 producer.send(message); 112 } 113 session.commit(); 114 publishedBatches.increment(); 115 // System.out.println("Commited send batch: "+workerId+" "+batchId); 116 } 117 } catch (JMSException e) { 118 if( !ignoreJMSErrors.get() ) { 119 e.printStackTrace(); 120 errors.add(e); 121 } 122 return; 123 } catch (Throwable e) { 124 e.printStackTrace(); 125 errors.add(e); 126 return; 127 } finally { 128 System.out.println("Producer exiting."); 129 doneLatch.release(); 130 } 131 } 132 133 public boolean waitForExit(long i) throws InterruptedException { 134 return doneLatch.attempt(i); 135 } 136 } 137 138 private final class ConsumerWorker implements Worker { 139 140 Session session; 141 private MessageConsumer consumer; 142 private final long timeout; 143 Latch doneLatch = new Latch(); 144 private final String workerId; 145 146 ConsumerWorker(Session session, String workerId, long timeout) throws JMSException { 147 this.session = session; 148 this.workerId = workerId; 149 this.timeout = timeout; 150 consumer = session.createConsumer(target,"workerId='"+workerId+"'"); 151 } 152 153 public void run() { 154 155 try { 156 int batchId=0; 157 while( true ) { 158 for( int msgId=0; msgId < batchSize; msgId++ ) { 159 160 // Sleep some random amount of time less than workerThinkTime 161 try { 162 Thread.sleep(random.nextInt(workerThinkTime)); 163 } catch (InterruptedException e1) { 164 return; 165 } 166 167 Message message = consumer.receive(timeout); 168 if( msgId > 0 ) { 169 assertNotNull(message); 170 assertEquals(message.getIntProperty("batch-id"), batchId); 171 assertEquals(message.getIntProperty("msg-id"), msgId); 172 } else { 173 if( message==null ) { 174 System.out.println("At end of batch an don't have a next batch to process. done."); 175 return; 176 } 177 assertEquals(msgId, message.getIntProperty("msg-id") ); 178 batchId = message.getIntProperty("batch-id"); 179 // System.out.println("Receiving batch: "+workerId+" "+batchId); 180 } 181 182 } 183 session.commit(); 184 consumedBatches.increment(); 185 // System.out.println("Commited receive batch: "+workerId+" "+batchId); 186 } 187 } catch (JMSException e) { 188 if( !ignoreJMSErrors.get() ) { 189 e.printStackTrace(); 190 errors.add(e); 191 } 192 return; 193 } catch (Throwable e) { 194 e.printStackTrace(); 195 errors.add(e); 196 return; 197 } finally { 198 System.out.println("Consumer exiting."); 199 doneLatch.release(); 200 } 201 } 202 203 public boolean waitForExit(long i) throws InterruptedException { 204 return doneLatch.attempt(i); 205 } 206 } 207 208 /** 209 * @see junit.framework.TestCase#setUp() 210 */ 211 protected void setUp() throws Exception { 212 factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); 213 this.target = new ActiveMQQueue(getClass().getName()); 214 } 215 216 protected void tearDown() throws Exception { 217 if( connection!=null ) { 218 try { connection.close(); } catch (Throwable ignore) {} 219 connection = null; 220 } 221 } 222 223 /** 224 * @throws InterruptedException 225 * @throws JMSException 226 * @throws JMSException 227 * 228 */ 229 private void reconnect() throws InterruptedException, JMSException { 230 if( connection!=null ) { 231 try { connection.close(); } catch (Throwable ignore) {} 232 connection = null; 233 } 234 235 long reconnectDelay=1000; 236 JMSException lastError=null; 237 238 while( connection == null) { 239 if( reconnectDelay > 1000*10 ) { 240 reconnectDelay = 1000*10; 241 } 242 try { 243 connection = factory.createConnection(); 244 connection.start(); 245 } catch (JMSException e) { 246 lastError = e; 247 Thread.sleep(reconnectDelay); 248 reconnectDelay*=2; 249 } 250 } 251 } 252 253 /** 254 * @throws Throwable 255 * @throws IOException 256 * 257 */ 258 public void testAcidTransactions() throws Throwable { 259 260 System.out.println("Client threads write records using: Record Size: " + recordSize + ", Batch Size: " 261 + batchSize + ", Worker Think Time: " + workerThinkTime); 262 263 // Create the record and fill it with some values. 264 data = new byte[recordSize]; 265 for (int i = 0; i < data.length; i++) { 266 data[i] = (byte) i; 267 } 268 269 System.out.println("=============================================="); 270 System.out.println("===> Start the server now."); 271 System.out.println("=============================================="); 272 reconnect(); 273 274 System.out.println("Starting " + workerCount + " Workers..."); 275 ArrayList workers = new ArrayList(); 276 for( int i=0; i< workerCount; i++ ){ 277 String workerId = "worker-"+i; 278 279 Worker w = new ConsumerWorker(connection.createSession(true,Session.SESSION_TRANSACTED), workerId, 1000*5); 280 workers.add(w); 281 new Thread(w,"Consumer:"+workerId).start(); 282 283 w = new ProducerWorker(connection.createSession(true,Session.SESSION_TRANSACTED), workerId); 284 workers.add(w); 285 new Thread(w,"Producer:"+workerId).start(); 286 } 287 288 System.out.println("Waiting for "+(workerCount*10)+" batches to be delivered."); 289 290 // 291 // Wait for about 5 batches of messages per worker to be consumed before restart. 292 // 293 while( publishedBatches.get() < workerCount*5) { 294 System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get()); 295 Thread.sleep(1000); 296 } 297 298 System.out.println("=============================================="); 299 System.out.println("===> Server is under load now. Kill it!"); 300 System.out.println("=============================================="); 301 ignoreJMSErrors.set(true); 302 303 // Wait for all the workers to finish. 304 System.out.println("Waiting for all workers to exit due to server shutdown."); 305 for (Iterator iter = workers.iterator(); iter.hasNext();) { 306 Worker worker = (Worker) iter.next(); 307 while( !worker.waitForExit(1000) ) { 308 System.out.println("=============================================="); 309 System.out.println("===> Server is under load now. Kill it!"); 310 System.out.println("=============================================="); 311 System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get()); 312 } 313 } 314 workers.clear(); 315 316 // No errors should have occured so far. 317 if( errors.size()>0 ) 318 throw (Throwable) errors.get(0); 319 320 System.out.println("=============================================="); 321 System.out.println("===> Start the server now."); 322 System.out.println("=============================================="); 323 reconnect(); 324 325 System.out.println("Restarted."); 326 327 // Validate the all transactions were commited as a uow. Looking for partial commits. 328 for( int i=0; i< workerCount; i++ ){ 329 String workerId = "worker-"+i; 330 Worker w = new ConsumerWorker(connection.createSession(true,Session.SESSION_TRANSACTED), workerId, 5*1000); 331 workers.add(w); 332 new Thread(w, "Consumer:"+workerId).start(); 333 } 334 335 System.out.println("Waiting for restarted consumers to finish consuming all messages.."); 336 for (Iterator iter = workers.iterator(); iter.hasNext();) { 337 Worker worker = (Worker) iter.next(); 338 while( !worker.waitForExit(1000*5) ) { 339 System.out.println("Waiting for restarted consumers to finish consuming all messages.."); 340 System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get()); 341 } 342 } 343 workers.clear(); 344 345 System.out.println("Workers finished.."); 346 System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get()); 347 348 if( errors.size()>0 ) 349 throw (Throwable) errors.get(0); 350 351 } 352 353 public static void main(String[] args) { 354 try { 355 AcidTestTool tool = new AcidTestTool(); 356 tool.setUp(); 357 tool.testAcidTransactions(); 358 tool.tearDown(); 359 } catch (Throwable e) { 360 System.out.println("Test Failed: "+e.getMessage()); 361 e.printStackTrace(); 362 } 363 } 364 }