1   /*
2       This file is part of quExec.
3   
4       quExec is free software; you can redistribute it and/or modify
5       it under the terms of the GNU Lesser General Public License as published by
6       the Free Software Foundation; either version 2 of the License, or
7       (at your option) any later version.
8   
9       quExec is distributed in the hope that it will be useful,
10      but WITHOUT ANY WARRANTY; without even the implied warranty of
11      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12      GNU Lesser General Public License for more details.
13  
14      You should have received a copy of the GNU Lesser General Public License
15      along with quExec; if not, write to the Free Software
16      Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17  */
18  
19  package net.sourceforge.quexec.packet.chars.producer;
20  
21  
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertFalse;
24  
25  import java.util.concurrent.TimeoutException;
26  
27  import javax.annotation.Resource;
28  import javax.jms.ConnectionFactory;
29  import javax.jms.Destination;
30  
31  import net.sourceforge.quexec.jms.AbstractJmsBasedTest;
32  import net.sourceforge.quexec.packet.chars.consumer.StreamCharPacketConsumer;
33  import net.sourceforge.quexec.packet.chars.stream.CharPacketInputStream;
34  import net.sourceforge.quexec.packet.chars.stream.CharPacketOutputStream;
35  import net.sourceforge.quexec.packet.chars.stream.JmsSendCharPacketStream;
36  import net.sourceforge.quexec.packet.chars.stream.QueueCharPacketStream;
37  import net.sourceforge.quexec.packet.chars.stream.StreamTestUtils;
38  import net.sourceforge.quexec.testutil.JmsTestUtils;
39  
40  import org.junit.Before;
41  import org.junit.Test;
42  import org.junit.runner.RunWith;
43  import org.springframework.test.context.ContextConfiguration;
44  import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
45  
46  @RunWith(SpringJUnit4ClassRunner.class)
47  @ContextConfiguration(locations={"classpath:/spring/jmsBase-context.xml"})
48  public class JmsReceiveCharPacketProducerTest extends AbstractJmsBasedTest {
49  	
50  //	private static final Log log = LogFactory.getLog(JmsReceiveCharPacketProducerTest.class);
51  	
52  	private static final String msg = "Hello!";
53  
54  	@Resource
55  	private ConnectionFactory connectionFactory;
56  	
57  	private CharPacketOutputStream sender;
58  	
59  	private CharPacketProducer producer;
60  	
61  	private CharPacketInputStream target;
62  
63  	@Before
64  	public void setUp() throws Exception {
65  		setUpMultithreading();
66  		
67  		Destination dest = JmsTestUtils.getDynamicQueue();
68  		
69  		JmsSendCharPacketStream sender = new JmsSendCharPacketStream();
70  		sender.setConnectionFactory(connectionFactory);
71  		sender.setDestination(dest);
72  
73  		JmsReceiveCharPacketProducer producer = new JmsReceiveCharPacketProducer();
74  		producer.setConnectionFactory(connectionFactory);
75  		producer.setDestination(dest);
76  
77  		StreamCharPacketConsumer consumer = new StreamCharPacketConsumer();
78  		QueueCharPacketStream target = QueueCharPacketStream.newInstance();
79  		consumer.setOutputStream(target);
80  		producer.setConsumer(consumer);
81  
82  		sender.init();
83  		target.init();
84  		producer.init();
85  		
86  		this.sender = sender;
87  		this.producer = producer;
88  		this.target = target;
89  
90  		this.producer.start();
91  	}
92  
93  	@Test(timeout=1000)
94  	public void emptyTransmission() throws InterruptedException {
95  		assertFalse(this.target.isFinished());
96  		this.sender.finish();
97  		StreamTestUtils.checkStreamFinished(this.target);
98  	}
99  	
100 	@Test(timeout=1000)
101 	public void singlePacketTransmission()
102 	throws InterruptedException, TimeoutException {
103 		assertFalse(this.target.isFinished());
104 		final String singlePacket = "Hello!";
105 		this.sender.sendPacket(singlePacket);
106 		this.sender.finish();
107 		assertEquals(singlePacket, this.target.readPacket());
108 		StreamTestUtils.checkStreamFinished(this.target);
109 	}
110 	
111 	@Test(timeout=1000)
112 	public void singlePacketTransmissionWithPolling()
113 	throws InterruptedException {
114 		assertFalse(this.target.isFinished());
115 		final String singlePacket = "Hello!";
116 		this.sender.sendPacket(singlePacket);
117 		this.sender.finish();
118 		String received;
119 		while (true) {
120 			received = this.target.tryReadPacket();
121 			if (received != null) {
122 				break;
123 			}
124 			Thread.sleep(100);
125 		}
126 		assertEquals(singlePacket, received);
127 		StreamTestUtils.checkStreamFinished(this.target);
128 	}
129 
130 	@Test(timeout=1000)
131 	public void firstSendThenReceivePackets()
132 	throws InterruptedException, TimeoutException {
133 		assertFalse(this.target.isFinished());
134 		final int numMsg = 10;
135 		for (int i = 0; i < numMsg; i++) {
136 			this.sender.sendPacket(msg + i);
137 		}
138 		this.sender.finish();
139 		for (int i = 0; i < numMsg; i++) {
140 			assertFalse(this.target.isFinished());
141 			assertEquals(msg + i, this.target.readPacket());
142 		}
143 		StreamTestUtils.checkStreamFinished(this.target);
144 	}
145 
146 	@Test(timeout=1000)
147 	public void sendAndReceivePacketsInterleaved()
148 	throws InterruptedException, TimeoutException {
149 		assertFalse(this.target.isFinished());
150 		final String msg = "Hello!";
151 		final int numMsg = 10;
152 		for (int i = 0; i < numMsg; i++) {
153 			this.sender.sendPacket(msg + i);
154 			assertFalse(this.target.isFinished());
155 			assertEquals(msg + i, this.target.readPacket());
156 		}
157 		this.sender.finish();
158 		StreamTestUtils.checkStreamFinished(this.target);
159 	}
160 
161 	@Test(timeout=1000)
162 	public void sendAndReceivePacketsParallel() throws InterruptedException {
163 		final int numMsg = 10;
164 		Task sendTask = new Task() {
165 			public void doIt() {
166 				for (int i = 0; i < numMsg; i++) {
167 					sender.sendPacket(msg + i);
168 				}
169 				sender.finish();
170 			}
171 		};
172 		Task recvTask = new Task() {
173 			public void doIt() throws InterruptedException, TimeoutException {
174 				assertFalse(target.isFinished());
175 				for (int i = 0; i < numMsg; i++) {
176 					assertFalse(target.isFinished());
177 					assertEquals(msg + i, target.readPacket());
178 				}
179 				StreamTestUtils.checkStreamFinished(target);
180 			}
181 		};
182 		
183 		scheduleTask(sendTask, "Sender");
184 		scheduleTask(recvTask, "Receiver");
185 		
186 		runAll();
187 	}
188 }