1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package net.sourceforge.quexec.packet.chars.stream;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.assertTrue;
25
26 import java.util.concurrent.TimeoutException;
27
28 import net.sourceforge.quexec.testutil.AbstractMultithreadedTest;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.junit.Ignore;
33 import org.junit.Test;
34
35
36
37
38
39 public abstract class AbstractCharPacketStreamTest
40 extends AbstractMultithreadedTest {
41
42 private static final Log log = LogFactory.getLog(AbstractCharPacketStreamTest.class);
43
44 private static final String msg = "Hello!";
45
46 private CharPacketInputStream inputStream;
47
48 private CharPacketOutputStream outputStream;
49
50 protected final void initStreamsBeforeEveryTest() throws Exception {
51 this.inputStream = getInputStream();
52 this.outputStream = getOutputStream();
53 }
54
55 protected abstract CharPacketInputStream getInputStream();
56
57 protected abstract CharPacketOutputStream getOutputStream();
58
59 @Test
60 public void emptyTransmission()
61 throws InterruptedException, TimeoutException {
62 sequentialSendReadCheck(0);
63 }
64
65 @Test
66 public void singlePacketTransmission()
67 throws InterruptedException, TimeoutException {
68 sequentialSendReadCheck(1);
69 }
70
71 @Test
72 public void severalPacketsTransmission() throws
73 InterruptedException, TimeoutException {
74 sequentialSendReadCheck(10);
75 }
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106 @Ignore
107 @Test(timeout = 20000)
108 public void parallelTransmission() throws Exception {
109 log.debug("Test parallelTransmission begins for class " + getClass().getName());
110 parallelSendReadCheck(50);
111 }
112
113 private void sequentialSendReadCheck(int numMsg)
114 throws InterruptedException, TimeoutException {
115 assertFalse(this.inputStream.isFinished());
116 assertNull(this.inputStream.tryReadPacket());
117
118 for (int i = 0 ; i < numMsg; i++) {
119 this.outputStream.sendPacket(msg + i);
120 }
121 this.outputStream.finish();
122
123 for (int i = 0 ; i < numMsg; i++) {
124 assertFalse(this.inputStream.isFinished());
125 assertEquals(msg + i, this.inputStream.readPacket());
126 }
127
128 int retry;
129 for (retry = 0; retry < 100; retry++) {
130
131
132
133 if (this.inputStream.isFinished()) {
134 break;
135 }
136 Thread.sleep(10);
137 }
138 assertFalse(retry == 100);
139 }
140
141 private void parallelSendReadCheck(final int numMsg)
142 throws InterruptedException {
143 Task sender = new Task() {
144 public void doIt() {
145 for (int i = 0 ; i < numMsg; i++) {
146 outputStream.sendPacket(msg + i);
147 }
148 outputStream.finish();
149 };
150 };
151
152 Task receiver = new Task() {
153 public void doIt() throws Throwable {
154 for (int i = 0 ; i < numMsg; i++) {
155 assertFalse(inputStream.isFinished());
156 assertEquals(msg + i, inputStream.readPacket());
157 }
158 while (!inputStream.isFinished()) {
159 Thread.sleep(10);
160 }
161 assertTrue(inputStream.isFinished());
162 }
163 };
164
165 scheduleTask(sender, "Sender");
166 scheduleTask(receiver, "Receiver");
167
168 runAll();
169 }
170 }