1   /*
2       This file is part of quExec.
3   
4       quExec is free software; you can redistribute it and/or modify
5       it under the terms of the GNU Lesser General Public License as published by
6       the Free Software Foundation; either version 2 of the License, or
7       (at your option) any later version.
8   
9       quExec is distributed in the hope that it will be useful,
10      but WITHOUT ANY WARRANTY; without even the implied warranty of
11      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12      GNU Lesser General Public License for more details.
13  
14      You should have received a copy of the GNU Lesser General Public License
15      along with quExec; if not, write to the Free Software
16      Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17  */
18  
19  package net.sourceforge.quexec.packet.chars.stream;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertNull;
24  import static org.junit.Assert.assertTrue;
25  
26  import java.util.concurrent.TimeoutException;
27  
28  import net.sourceforge.quexec.testutil.AbstractMultithreadedTest;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.junit.Ignore;
33  import org.junit.Test;
34  
35  /**
36   * @author schickin
37   *
38   */
39  public abstract class AbstractCharPacketStreamTest
40  extends AbstractMultithreadedTest {
41  	
42  	private static final Log log = LogFactory.getLog(AbstractCharPacketStreamTest.class);
43  	
44  	private static final String msg = "Hello!";
45  	
46  	private CharPacketInputStream inputStream;
47  	
48  	private CharPacketOutputStream outputStream;
49  
50  	protected final void initStreamsBeforeEveryTest() throws Exception {
51  		this.inputStream = getInputStream();
52  		this.outputStream = getOutputStream();
53  	}
54  
55  	protected abstract CharPacketInputStream getInputStream();
56  	
57  	protected abstract CharPacketOutputStream getOutputStream();
58  
59  	@Test
60  	public void emptyTransmission()
61  	throws InterruptedException, TimeoutException {
62  		sequentialSendReadCheck(0);
63  	}
64  	
65  	@Test
66  	public void singlePacketTransmission()
67  	throws InterruptedException, TimeoutException {
68  		sequentialSendReadCheck(1);
69  	}
70  	
71  	@Test
72  	public void severalPacketsTransmission() throws
73  	InterruptedException, TimeoutException {
74  		sequentialSendReadCheck(10);
75  	}
76  	
77  	// TODO: This test reports transient errors (e.g. Hello!8 expected but gut Hello!18)
78  	// Analysis showed so far that following things happen:
79  	//
80  	// 1. the underlying ActiveMQ connection fails with an exception:
81  	//
82  	//	5059 ERROR [net.sourceforge.quexec.packet.chars.stream.JmsCharPacketStreamTest] JMSException recorded
83  	//	javax.jms.JMSException: java.io.EOFException
84  	//	at net.sourceforge.quexec.packet.chars.stream.JmsCharPacketStreamTest$1.onException(JmsCharPacketStreamTest.java:69)
85  	//	at org.springframework.jms.connection.ChainedExceptionListener.onException(ChainedExceptionListener.java:60)
86  	//	at org.apache.activemq.ActiveMQConnection$4.run(ActiveMQConnection.java:1779)
87  	//	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
88  	//	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
89  	//	at java.lang.Thread.run(Thread.java:636)
90  	//	Caused by: java.io.EOFException
91  	//	at java.io.DataInputStream.readInt(DataInputStream.java:392)
92  	//	at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269)
93  	//	at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
94  	//	at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
95  	//	at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
96  	//	... 1 more
97  	//
98  	// 2. the Spring SingleConnectionFactory automatically reconnects
99  	//
100 	// 3. the recovery mechanism of ActiveMQ reports some duplicate messages in its
101 	// message store. seemingly, the recovery does not work correctly.
102 	//
103 	// Summary: the behavior seems to be caused by non-thread-safe logic within
104 	// the embedded ActiveMQ broker that is used for the unit tests
105 	
106 	@Ignore
107 	@Test(timeout = 20000)
108 	public void parallelTransmission() throws Exception {
109 		log.debug("Test parallelTransmission begins for class " + getClass().getName());
110 		parallelSendReadCheck(50);
111 	}
112 	
113 	private void sequentialSendReadCheck(int numMsg)
114 	throws InterruptedException, TimeoutException {
115 		assertFalse(this.inputStream.isFinished());
116 		assertNull(this.inputStream.tryReadPacket());
117 
118 		for (int i = 0 ; i < numMsg; i++) {
119 			this.outputStream.sendPacket(msg + i);
120 		}
121 		this.outputStream.finish();
122 		
123 		for (int i = 0 ; i < numMsg; i++) {
124 			assertFalse(this.inputStream.isFinished());
125 			assertEquals(msg + i, this.inputStream.readPacket());
126 		}
127 		
128 		int retry;
129 		for (retry = 0; retry < 100; retry++) {
130 			// since the call to isFinished is non-blocking, it may happen that
131 			// the finish-packet has not yet reached the inputStream object.
132 			// thus, we need to retry a couple of times.
133 			if (this.inputStream.isFinished()) {
134 				break;
135 			}
136 			Thread.sleep(10);
137 		}
138 		assertFalse(retry == 100);
139 	}
140 	
141 	private void parallelSendReadCheck(final int numMsg)
142 	throws InterruptedException {
143 		Task sender = new Task() {
144 			public void doIt() {
145 				for (int i = 0 ; i < numMsg; i++) {
146 					outputStream.sendPacket(msg + i);
147 				}
148 				outputStream.finish();
149 			};
150 		};
151 		
152 		Task receiver = new Task() {
153 			public void doIt() throws Throwable {
154 				for (int i = 0 ; i < numMsg; i++) {
155 					assertFalse(inputStream.isFinished());
156 					assertEquals(msg + i, inputStream.readPacket());
157 				}
158 				while (!inputStream.isFinished()) {
159 					Thread.sleep(10);
160 				}
161 				assertTrue(inputStream.isFinished());
162 			}
163 		};
164 
165 		scheduleTask(sender, "Sender");
166 		scheduleTask(receiver, "Receiver");
167 		
168 		runAll();
169 	}
170 }