1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package net.sourceforge.quexec.packet.chars.producer;
20
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertFalse;
24
25 import java.util.concurrent.TimeoutException;
26
27 import javax.annotation.Resource;
28 import javax.jms.ConnectionFactory;
29 import javax.jms.Destination;
30
31 import net.sourceforge.quexec.jms.AbstractJmsBasedTest;
32 import net.sourceforge.quexec.packet.chars.consumer.StreamCharPacketConsumer;
33 import net.sourceforge.quexec.packet.chars.stream.CharPacketInputStream;
34 import net.sourceforge.quexec.packet.chars.stream.CharPacketOutputStream;
35 import net.sourceforge.quexec.packet.chars.stream.JmsSendCharPacketStream;
36 import net.sourceforge.quexec.packet.chars.stream.QueueCharPacketStream;
37 import net.sourceforge.quexec.packet.chars.stream.StreamTestUtils;
38 import net.sourceforge.quexec.testutil.JmsTestUtils;
39
40 import org.junit.Before;
41 import org.junit.Test;
42 import org.junit.runner.RunWith;
43 import org.springframework.test.context.ContextConfiguration;
44 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
45
46 @RunWith(SpringJUnit4ClassRunner.class)
47 @ContextConfiguration(locations={"classpath:/spring/jmsBase-context.xml"})
48 public class JmsReceiveCharPacketProducerTest extends AbstractJmsBasedTest {
49
50
51
52 private static final String msg = "Hello!";
53
54 @Resource
55 private ConnectionFactory connectionFactory;
56
57 private CharPacketOutputStream sender;
58
59 private CharPacketProducer producer;
60
61 private CharPacketInputStream target;
62
63 @Before
64 public void setUp() throws Exception {
65 setUpMultithreading();
66
67 Destination dest = JmsTestUtils.getDynamicQueue();
68
69 JmsSendCharPacketStream sender = new JmsSendCharPacketStream();
70 sender.setConnectionFactory(connectionFactory);
71 sender.setDestination(dest);
72
73 JmsReceiveCharPacketProducer producer = new JmsReceiveCharPacketProducer();
74 producer.setConnectionFactory(connectionFactory);
75 producer.setDestination(dest);
76
77 StreamCharPacketConsumer consumer = new StreamCharPacketConsumer();
78 QueueCharPacketStream target = QueueCharPacketStream.newInstance();
79 consumer.setOutputStream(target);
80 producer.setConsumer(consumer);
81
82 sender.init();
83 target.init();
84 producer.init();
85
86 this.sender = sender;
87 this.producer = producer;
88 this.target = target;
89
90 this.producer.start();
91 }
92
93 @Test(timeout=1000)
94 public void emptyTransmission() throws InterruptedException {
95 assertFalse(this.target.isFinished());
96 this.sender.finish();
97 StreamTestUtils.checkStreamFinished(this.target);
98 }
99
100 @Test(timeout=1000)
101 public void singlePacketTransmission()
102 throws InterruptedException, TimeoutException {
103 assertFalse(this.target.isFinished());
104 final String singlePacket = "Hello!";
105 this.sender.sendPacket(singlePacket);
106 this.sender.finish();
107 assertEquals(singlePacket, this.target.readPacket());
108 StreamTestUtils.checkStreamFinished(this.target);
109 }
110
111 @Test(timeout=1000)
112 public void singlePacketTransmissionWithPolling()
113 throws InterruptedException {
114 assertFalse(this.target.isFinished());
115 final String singlePacket = "Hello!";
116 this.sender.sendPacket(singlePacket);
117 this.sender.finish();
118 String received;
119 while (true) {
120 received = this.target.tryReadPacket();
121 if (received != null) {
122 break;
123 }
124 Thread.sleep(100);
125 }
126 assertEquals(singlePacket, received);
127 StreamTestUtils.checkStreamFinished(this.target);
128 }
129
130 @Test(timeout=1000)
131 public void firstSendThenReceivePackets()
132 throws InterruptedException, TimeoutException {
133 assertFalse(this.target.isFinished());
134 final int numMsg = 10;
135 for (int i = 0; i < numMsg; i++) {
136 this.sender.sendPacket(msg + i);
137 }
138 this.sender.finish();
139 for (int i = 0; i < numMsg; i++) {
140 assertFalse(this.target.isFinished());
141 assertEquals(msg + i, this.target.readPacket());
142 }
143 StreamTestUtils.checkStreamFinished(this.target);
144 }
145
146 @Test(timeout=1000)
147 public void sendAndReceivePacketsInterleaved()
148 throws InterruptedException, TimeoutException {
149 assertFalse(this.target.isFinished());
150 final String msg = "Hello!";
151 final int numMsg = 10;
152 for (int i = 0; i < numMsg; i++) {
153 this.sender.sendPacket(msg + i);
154 assertFalse(this.target.isFinished());
155 assertEquals(msg + i, this.target.readPacket());
156 }
157 this.sender.finish();
158 StreamTestUtils.checkStreamFinished(this.target);
159 }
160
161 @Test(timeout=1000)
162 public void sendAndReceivePacketsParallel() throws InterruptedException {
163 final int numMsg = 10;
164 Task sendTask = new Task() {
165 public void doIt() {
166 for (int i = 0; i < numMsg; i++) {
167 sender.sendPacket(msg + i);
168 }
169 sender.finish();
170 }
171 };
172 Task recvTask = new Task() {
173 public void doIt() throws InterruptedException, TimeoutException {
174 assertFalse(target.isFinished());
175 for (int i = 0; i < numMsg; i++) {
176 assertFalse(target.isFinished());
177 assertEquals(msg + i, target.readPacket());
178 }
179 StreamTestUtils.checkStreamFinished(target);
180 }
181 };
182
183 scheduleTask(sendTask, "Sender");
184 scheduleTask(recvTask, "Receiver");
185
186 runAll();
187 }
188 }