Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.nio.channels.InterruptedByTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
Expand All @@ -50,7 +51,10 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.runners.model.Statement;

@SuppressWarnings("serial")
public class ConcurrentExportTests {
Expand All @@ -61,29 +65,84 @@
return Matchers.instanceOf(InterruptedByTimeoutException.class);
}

// --- Debugging helpers ---

private static final long TEST_START_TIME = System.currentTimeMillis();

private static String timestamp() {
return String.format("[t=%5dms]", System.currentTimeMillis() - TEST_START_TIME);
}

private static void log(String msg) {
System.err.println(timestamp() + " [" + Thread.currentThread().getName() + "] " + msg);
}

private static void dumpAllThreads() {
Map<Thread, StackTraceElement[]> traces = Thread.getAllStackTraces();
traces.forEach((thread, stack) -> {
System.err.println(" Thread \"" + thread.getName() + "\" [" + thread.getState() + "]"
+ (thread.isInterrupted() ? " INTERRUPTED" : ""));
for (int i = 0; i < Math.min(stack.length, 8); i++) {
System.err.println(" at " + stack[i]);
}
});
}

/** Dumps all threads at 75% of TEST_TIMEOUT so the dump appears before JUnit kills the test. */
@Rule
public final TestRule watchdog = (base, description) -> new Statement() {
@Override
public void evaluate() throws Throwable {
Thread watchdogThread = new Thread(() -> {
try {
Thread.sleep(TEST_TIMEOUT * 3 / 4);

Check warning on line 98 in src/test/java/com/flowingcode/vaadin/addons/gridexporter/test/ConcurrentExportTests.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this use of "Thread.sleep()".

See more on https://sonarcloud.io/project/issues?id=FlowingCode_GridExporterAddon&issues=AZznuo_epv4q2krwRPla&open=AZznuo_epv4q2krwRPla&pullRequest=195
System.err.println(timestamp() + " *** WATCHDOG: \"" + description.getMethodName()
+ "\" appears stuck — thread dump:");
dumpAllThreads();
} catch (InterruptedException e) {
// test finished in time
}
}, "watchdog");
watchdogThread.setDaemon(true);
watchdogThread.start();
try {
base.evaluate();
} finally {
watchdogThread.interrupt();
}
}
};

// --- Inner writer with per-download ID and lifecycle logging ---

private class ConcurrentStreamResourceWriter
extends ConfigurableConcurrentStreamResourceWriter {

private final String id;
private boolean interruptedByTimeout;
private boolean accepted;
private boolean finished;

public ConcurrentStreamResourceWriter(StreamResourceWriter delegate) {
public ConcurrentStreamResourceWriter(String id, StreamResourceWriter delegate) {
super(delegate);
this.id = id;
}

@Override
protected void onTimeout() {
log("download-" + id + ": onTimeout() — semaphore not acquired within timeout");
interruptedByTimeout = true;
}

@Override
protected void onAccept() {
log("download-" + id + ": onAccept()");
accepted = true;
}

@Override
protected void onFinish() {
log("download-" + id + ": onFinish()");
finished = true;
}

Expand Down Expand Up @@ -139,42 +198,55 @@
boolean isAccepted();
}

private Thread newThread(Runnable target) {
Thread thread = new Thread(target);
private Thread newThread(Runnable target, String name) {
Thread thread = new Thread(target, name);
threads.add(thread);
return thread;
}

private MockDownload newDownload() {
private MockDownload newDownload(String id) {

CyclicBarrier barrier = this.barrier;
CountDownLatch latch = new CountDownLatch(1);

ConcurrentStreamResourceWriter writer =
new ConcurrentStreamResourceWriter((stream, session) -> {
new ConcurrentStreamResourceWriter(id, (stream, session) -> {
log("download-" + id + ": delegate running — counting down latch");
latch.countDown();
log("download-" + id + ": delegate entering barrier (waiting=" + barrier.getNumberWaiting() + ")");
await(barrier);
log("download-" + id + ": delegate exited barrier");
});

writer.setUi(new UI());
Exchanger<Throwable> exchanger = new Exchanger<>();

Thread thread = newThread(() -> {

log("download-" + id + ": thread started (interrupted=" + Thread.currentThread().isInterrupted() + ")");
Throwable throwable = null;
try {
writer.accept(NULL_OUTPUT_STREAM, createSession());
} catch (Throwable t) {
log("download-" + id + ": writer.accept() threw " + t.getClass().getSimpleName());
throwable = t;
}

latch.countDown();
if (latch.getCount() != 0) {
log("download-" + id + ": counting down latch (fallback) and entering exchanger");
latch.countDown();
} else {
log("download-" + id + ": entering exchanger");
}

try {
exchanger.exchange(throwable);
log("download-" + id + ": exchanger completed");
} catch (InterruptedException e) {
log("download-" + id + ": exchanger interrupted — thread will exit without exchanging!");
return;
}
});
}, "download-" + id);

return new MockDownload() {
@Override
Expand All @@ -191,10 +263,11 @@
thread.start();
}

try {
latch.await(1, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
sneakyThrow(e);
// Spin until the thread is either blocked at tryAcquire (TIMED_WAITING,
// meaning the UI has already been captured) or has already finished
// (latch counted down after accept() returned or threw).
while (latch.getCount() > 0 && thread.getState() != Thread.State.TIMED_WAITING) {
Thread.onSpinWait();
}

return this;
Expand Down Expand Up @@ -244,9 +317,12 @@
}

private static void await(CyclicBarrier barrier) {
log("await(barrier): entering (waiting=" + barrier.getNumberWaiting() + ", parties=" + barrier.getParties() + ")");
try {
barrier.await();
log("await(barrier): returned");
} catch (Exception e) {
log("await(barrier): threw " + e.getClass().getSimpleName());
sneakyThrow(e);
}
}
Expand Down Expand Up @@ -309,8 +385,8 @@
ConcurrentStreamResourceWriter.setLimit(Float.POSITIVE_INFINITY);
initializeCyclicBarrier(2);

var q1 = newDownload().await();
var q2 = newDownload().await();
var q1 = newDownload("q1").await();
var q2 = newDownload("q2").await();

assertThat(q1.get(), nullValue());
assertThat(q2.get(), nullValue());
Expand All @@ -322,8 +398,8 @@
ConcurrentStreamResourceWriter.setLimit(2);
initializeCyclicBarrier(2);

var q1 = newDownload().await();
var q2 = newDownload().await();
var q1 = newDownload("q1").await();
var q2 = newDownload("q2").await();

assertThat(q1.get(), nullValue());
assertThat(q2.get(), nullValue());
Expand All @@ -335,8 +411,8 @@
ConcurrentStreamResourceWriter.setLimit(1);
initializeCyclicBarrier(2);

var q1 = newDownload().await();
var q2 = newDownload().start();
var q1 = newDownload("q1").await();
var q2 = newDownload("q2").start();
assertThat(barrier.getNumberWaiting(), equalTo(1));
await(barrier);

Expand All @@ -352,9 +428,9 @@
ConcurrentStreamResourceWriter.setLimit(2);
initializeCyclicBarrier(3);

var q1 = newDownload().await();
var q2 = newDownload().await();
var q3 = newDownload().withCost(2).start();
var q1 = newDownload("q1").await();
var q2 = newDownload("q2").await();
var q3 = newDownload("q3").withCost(2).start();
assertThat(barrier.getNumberWaiting(), equalTo(2));
await(barrier);

Expand All @@ -369,9 +445,9 @@
ConcurrentStreamResourceWriter.setLimit(2);
initializeCyclicBarrier(2);

var q1 = newDownload().withCost(2).await();
var q2 = newDownload().start();
var q3 = newDownload().start();
var q1 = newDownload("q1").withCost(2).await();
var q2 = newDownload("q2").start();
var q3 = newDownload("q3").start();
assertThat(barrier.getNumberWaiting(), equalTo(1));
await(barrier);

Expand All @@ -385,10 +461,10 @@
public void testAcceptFinish() throws InterruptedException {
ConcurrentStreamResourceWriter.setLimit(2);
initializeCyclicBarrier(2);
var q1 = newDownload().await();
var q1 = newDownload("q1").await();
assertTrue("Download has not been accepted", q1.isAccepted());
assertFalse("Download has finished too early", q1.isFinished());
var q2 = newDownload().await();
var q2 = newDownload("q2").await();
assertTrue("Download has not been accepted", q2.isAccepted());
assertThat(q1.get(), nullValue());
assertThat(q2.get(), nullValue());
Expand All @@ -403,12 +479,12 @@

initializeCyclicBarrier(2);
CyclicBarrier b1 = barrier;
var q1 = newDownload().await();
var q1 = newDownload("q1").await();
assertTrue("Download has not been accepted", q1.isAccepted());
assertFalse("Download has finished too early", q1.isFinished());

initializeCyclicBarrier(2);
var q2 = newDownload().withTimeout(TEST_TIMEOUT).start();
var q2 = newDownload("q2").withTimeout(TEST_TIMEOUT).start();
assertTrue("Download has not been accepted", q1.isAccepted());
assertFalse("Download has finished too early", q1.isFinished());

Expand Down
Loading