001    /**
002     *
003     * Copyright 2004 Protique Ltd
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     * http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     *
017     **/
018    package org.activemq.benchmark;
019    
020    import javax.jms.DeliveryMode;
021    import javax.jms.Destination;
022    import javax.jms.JMSException;
023    import javax.jms.Message;
024    import javax.jms.MessageProducer;
025    import javax.jms.Session;
026    import java.io.BufferedReader;
027    import java.io.File;
028    import java.io.FileReader;
029    import java.io.IOException;
030    
031    /**
032     * @author James Strachan
033     * @version $Revision$
034     */
035    public class Producer extends BenchmarkSupport {
036    
037        int loops = -1;
038        int loopSize = 1000;
039        private int messageSize = 1000;
040    
041        public static void main(String[] args) {
042            Producer tool = new Producer();
043            if (args.length > 0) {
044                tool.setUrl(args[0]);
045            }
046            if (args.length > 1) {
047                tool.setTopic(parseBoolean(args[1]));
048            }
049            if (args.length > 2) {
050                tool.setSubject(args[2]);
051            }
052            if (args.length > 3) {
053                tool.setDurable(parseBoolean(args[3]));
054            }
055            if (args.length > 4) {
056                tool.setMessageSize(Integer.parseInt(args[4]));
057            }
058            if (args.length > 5) {
059                tool.setConnectionCount(Integer.parseInt(args[5]));
060            }
061            try {
062                tool.run();
063            }
064            catch (Exception e) {
065                System.out.println("Caught: " + e);
066                e.printStackTrace();
067            }
068        }
069    
070        public Producer() {
071        }
072    
073        public void run() throws Exception {
074            start();
075            publish();
076        }
077    
078        // Properties
079        //-------------------------------------------------------------------------
080        public int getMessageSize() {
081            return messageSize;
082        }
083    
084        public void setMessageSize(int messageSize) {
085            this.messageSize = messageSize;
086        }
087    
088        public int getLoopSize() {
089            return loopSize;
090        }
091    
092        public void setLoopSize(int loopSize) {
093            this.loopSize = loopSize;
094        }
095    
096        // Implementation methods
097        //-------------------------------------------------------------------------
098    
099        protected void publish() throws Exception {
100            final String text = getMessage();
101    
102            System.out.println("Publishing to: " + subjects.length + " subject(s)");
103    
104            for (int i = 0; i < subjects.length; i++) {
105                final String subject = subjects[i];
106                Thread thread = new Thread() {
107                    public void run() {
108                        try {
109                            publish(text, subject);
110                        }
111                        catch (JMSException e) {
112                            System.out.println("Caught: " + e);
113                            e.printStackTrace();
114                        }
115                    }
116                };
117                thread.start();
118            }
119    
120        }
121    
122        protected String getMessage() {
123            StringBuffer buffer = new StringBuffer();
124            for (int i = 0; i < messageSize; i++) {
125                char ch = 'X';
126                buffer.append(ch);
127            }
128            return buffer.toString();
129        }
130    
131        protected void publish(String text, String subject) throws JMSException {
132            Session session = createSession();
133    
134            Destination destination = createDestination(session, subject);
135    
136            MessageProducer publisher = session.createProducer(destination);
137            if (isDurable()) {
138                publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
139            }
140            else {
141                publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
142            }
143    
144            System.out.println("Starting publisher on : " + destination + " of type: " + destination.getClass().getName());
145            System.out.println("Message length: " + text.length());
146    
147            if (loops <= 0) {
148                while (true) {
149                    publishLoop(session, publisher, text);
150                }
151            }
152            else {
153                for (int i = 0; i < loops; i++) {
154                    publishLoop(session, publisher, text);
155                }
156            }
157        }
158    
159        protected void publishLoop(Session session, MessageProducer publisher, String text) throws JMSException {
160            for (int i = 0; i < loopSize; i++) {
161                Message message = session.createTextMessage(text);
162    
163                publisher.send(message);
164                count(1);
165            }
166        }
167    
168        protected String loadFile(String file) throws IOException {
169            System.out.println("Loading file: " + file);
170    
171            StringBuffer buffer = new StringBuffer();
172            BufferedReader in = new BufferedReader(new FileReader(file));
173            while (true) {
174                String line = in.readLine();
175                if (line == null) {
176                    break;
177                }
178                buffer.append(line);
179                buffer.append(File.separator);
180            }
181            return buffer.toString();
182        }
183    }