1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package net.sourceforge.quexec.testutil;
20
21 import java.util.concurrent.atomic.AtomicInteger;
22
23 import javax.jms.Destination;
24 import javax.jms.JMSException;
25 import javax.jms.Queue;
26 import javax.jms.QueueBrowser;
27 import javax.jms.Session;
28 import javax.naming.InitialContext;
29 import javax.naming.NamingException;
30
31 import org.apache.activemq.broker.BrokerService;
32 import org.apache.activemq.broker.TransportConnector;
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.springframework.jms.core.BrowserCallback;
36 import org.springframework.jms.core.JmsTemplate;
37 import org.springframework.jms.core.SessionCallback;
38
39
40
41
42
43
44
45
46
47 public abstract class JmsTestUtils {
48
49 private static final Log log = LogFactory.getLog(JmsTestUtils.class);
50
51 private static BrokerService broker;
52 private static TransportConnector tcpConnector;
53
54 private static final AtomicInteger dynamicQueueCount = new AtomicInteger(0);
55
56 private JmsTestUtils() {
57
58 }
59
60 public static void setUpEmbeddedJMSService() throws Exception {
61 log.info("starting ActiveMQ broker");
62
63 broker = new BrokerService();
64
65 broker.setBrokerName("testjms");
66 broker.setPersistent(false);
67 broker.setUseJmx(false);
68 tcpConnector = broker.addConnector("tcp://localhost:61616");
69
70 broker.deleteAllMessages();
71
72 broker.start();
73
74 log.info("ActiveMQ broker started");
75
76 }
77
78 public static void tearDownEmbeddedJMSService() throws Exception {
79 log.info("stopping ActiveMQ broker");
80
81 tcpConnector.stop();
82 broker.removeConnector(tcpConnector);
83
84 broker.stop();
85 broker.waitUntilStopped();
86
87 log.info("ActiveMQ broker stopped");
88 }
89
90
91
92
93
94
95
96
97
98 public static Queue getDynamicQueue() {
99 try {
100 return (Queue) new InitialContext().lookup(
101 "dynamicQueues/TestQueue" + dynamicQueueCount.getAndIncrement());
102 }
103 catch (NamingException e) {
104 throw new IllegalStateException("Resolution of dynamic test queue failed", e);
105 }
106 }
107
108
109
110
111
112
113
114
115
116
117
118 public static Queue getDynamicQueue(JmsTemplate jmsTempl) {
119 return (Queue) jmsTempl.execute(new SessionCallback() {
120 public Destination doInJms(Session sess) throws JMSException {
121 return sess.createQueue("testQueue" +
122 dynamicQueueCount.getAndIncrement());
123 }
124 });
125 }
126
127 public static void waitUntilQueueEmpty(JmsTemplate jmsTempl, Queue dest)
128 throws InterruptedException {
129 while (true) {
130 boolean isEmpty = (Boolean) jmsTempl.browse(dest, new BrowserCallback() {
131 public Object doInJms(Session sess, QueueBrowser browser) throws JMSException {
132 return new Boolean(!browser.getEnumeration().hasMoreElements());
133 }
134 });
135 if (isEmpty) {
136 break;
137 }
138 log.debug("Queue not yet empty. Sleeping for a while.");
139 Thread.sleep(10);
140 }
141 }
142 }