From 6bd4de38ee04d5b4a3d53061df8727f5c713af1a Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Tue, 26 May 2026 10:15:00 +0200 Subject: [PATCH 01/10] Add failing regression tests for hasNext()/close() deadlock With synchronized hasNext(), a worker blocked inside mainIterator.hasNext() holds the 'this' monitor indefinitely; close() cannot acquire it and hangs. closeCompletesWhileHasNextIsBlocking reproduces this with @Timeout(5). Includes FakeDataRetrieval/FakeApplianceArchiveReader/BlockingGenMsgIterator test infrastructure and Mockito dependency. Co-Authored-By: Claude Sonnet 4.6 --- app/trends/archive-reader/pom.xml | 6 ++ ...ApplianceValueIteratorConcurrencyTest.java | 80 +++++++++++++++++++ .../appliance/BlockingGenMsgIterator.java | 35 ++++++++ .../appliance/FakeApplianceArchiveReader.java | 26 ++++++ .../reader/appliance/FakeDataRetrieval.java | 47 +++++++++++ 5 files changed, 194 insertions(+) create mode 100644 app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceValueIteratorConcurrencyTest.java create mode 100644 app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/BlockingGenMsgIterator.java create mode 100644 app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/FakeApplianceArchiveReader.java create mode 100644 app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/FakeDataRetrieval.java diff --git a/app/trends/archive-reader/pom.xml b/app/trends/archive-reader/pom.xml index d84c31b5fd..ce599f391b 100644 --- a/app/trends/archive-reader/pom.xml +++ b/app/trends/archive-reader/pom.xml @@ -62,5 +62,11 @@ 1.3 test + + org.mockito + mockito-core + ${mockito.version} + test + diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceValueIteratorConcurrencyTest.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceValueIteratorConcurrencyTest.java new file mode 100644 index 0000000000..b9d0ea92c4 --- /dev/null +++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceValueIteratorConcurrencyTest.java @@ -0,0 +1,80 @@ +package org.phoebus.archive.reader.appliance; + +import org.epics.archiverappliance.retrieval.client.EpicsMessage; +import org.epics.archiverappliance.retrieval.client.GenMsgIterator; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.phoebus.archive.reader.ValueIterator; + +import java.time.Instant; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +class ApplianceValueIteratorConcurrencyTest { + + private static GenMsgIterator emptyStream() { + GenMsgIterator s = mock(GenMsgIterator.class); + when(s.iterator()).thenReturn(Collections.emptyIterator()); + return s; + } + + private static ValueIterator getRaw(FakeApplianceArchiveReader reader) throws Exception { + Instant start = Instant.now().minusSeconds(60); + return reader.getRawValues("TESTPV", start, Instant.now()); + } + + /** + * Regression test for the hasNext()/close() deadlock. + * + * With the old synchronized hasNext(), a worker blocked inside mainIterator.hasNext() + * held the 'this' monitor. close() could never acquire it, so the job hung forever + * and the semaphore permit was never returned. @Timeout(5) catches the hang. + */ + @Test + @Timeout(5) + void closeCompletesWhileHasNextIsBlocking() throws Exception { + BlockingGenMsgIterator blocker = new BlockingGenMsgIterator(); + GenMsgIterator stream = mock(GenMsgIterator.class); + when(stream.iterator()).thenReturn(blocker); + + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(new FakeDataRetrieval(stream)); + ValueIterator iter = getRaw(reader); + + Thread worker = new Thread(iter::hasNext); + worker.start(); + blocker.entered.await(); // worker is now blocked inside hasNext() + + iter.close(); // must not deadlock + assertFalse(iter.hasNext()); + + blocker.release.countDown(); + worker.join(1000); + assertFalse(worker.isAlive()); + } + + @Test + void hasNextReturnsFalseAfterClose() throws Exception { + EpicsMessage msg = mock(EpicsMessage.class); + GenMsgIterator stream = mock(GenMsgIterator.class); + when(stream.iterator()).thenReturn(Collections.singletonList(msg).iterator()); + + ValueIterator iter = getRaw(new FakeApplianceArchiveReader(new FakeDataRetrieval(stream))); + + assertTrue(iter.hasNext()); + iter.close(); + assertFalse(iter.hasNext()); + } + + @Test + void nextReturnsNullAfterClose() throws Exception { + EpicsMessage msg = mock(EpicsMessage.class); + GenMsgIterator stream = mock(GenMsgIterator.class); + when(stream.iterator()).thenReturn(Collections.singletonList(msg).iterator()); + + ValueIterator iter = getRaw(new FakeApplianceArchiveReader(new FakeDataRetrieval(stream))); + iter.close(); + assertNull(iter.next()); + } +} diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/BlockingGenMsgIterator.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/BlockingGenMsgIterator.java new file mode 100644 index 0000000000..8f68992b32 --- /dev/null +++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/BlockingGenMsgIterator.java @@ -0,0 +1,35 @@ +package org.phoebus.archive.reader.appliance; + +import org.epics.archiverappliance.retrieval.client.EpicsMessage; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.CountDownLatch; + +/** + * Iterator whose hasNext() blocks until released, for concurrency tests. + * + * The caller awaits {@link #entered} to confirm the worker thread is inside + * hasNext(), then counts down {@link #release} to unblock it. + */ +class BlockingGenMsgIterator implements Iterator { + + final CountDownLatch entered = new CountDownLatch(1); + final CountDownLatch release = new CountDownLatch(1); + + @Override + public boolean hasNext() { + entered.countDown(); + try { + release.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return false; + } + + @Override + public EpicsMessage next() { + throw new NoSuchElementException(); + } +} diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/FakeApplianceArchiveReader.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/FakeApplianceArchiveReader.java new file mode 100644 index 0000000000..437efbf42b --- /dev/null +++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/FakeApplianceArchiveReader.java @@ -0,0 +1,26 @@ +package org.phoebus.archive.reader.appliance; + +import org.epics.archiverappliance.retrieval.client.DataRetrieval; + +/** + * ApplianceArchiveReader that injects a provided DataRetrieval, avoiding any real network calls. + */ +class FakeApplianceArchiveReader extends ApplianceArchiveReader { + + private final DataRetrieval retrieval; + + FakeApplianceArchiveReader(DataRetrieval retrieval) { + super("pbraw://fake-host:17668", false, true); + this.retrieval = retrieval; + } + + FakeApplianceArchiveReader(DataRetrieval retrieval, boolean useStatistics, boolean useNewOptimizedOperator) { + super("pbraw://fake-host:17668", useStatistics, useNewOptimizedOperator); + this.retrieval = retrieval; + } + + @Override + public DataRetrieval createDataRetriveal(String url) { + return retrieval; + } +} diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/FakeDataRetrieval.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/FakeDataRetrieval.java new file mode 100644 index 0000000000..1c96437eeb --- /dev/null +++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/FakeDataRetrieval.java @@ -0,0 +1,47 @@ +package org.phoebus.archive.reader.appliance; + +import org.epics.archiverappliance.retrieval.client.DataRetrieval; +import org.epics.archiverappliance.retrieval.client.GenMsgIterator; + +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Concrete DataRetrieval subclass for testing. + * + * All three getDataForPV overloads are final and delegate to getDataForPVs, + * so overriding getDataForPVs intercepts every call. PV names are recorded in + * pvsCalled; responses can be stubbed by PV-name substring via whenPvContains(). + */ +class FakeDataRetrieval extends DataRetrieval { + + final List pvsCalled = new ArrayList<>(); + + private GenMsgIterator defaultResponse; + private final Map responses = new LinkedHashMap<>(); + + FakeDataRetrieval(GenMsgIterator defaultResponse) { + this.defaultResponse = defaultResponse; + } + + /** Return {@code response} whenever the requested PV name contains {@code substring}. */ + void whenPvContains(String substring, GenMsgIterator response) { + responses.put(substring, response); + } + + @Override + public GenMsgIterator getDataForPVs(List pvNames, Timestamp start, Timestamp end, + boolean fetchLatestMetadata, Map otherParams) { + String pv = pvNames.isEmpty() ? "" : pvNames.get(0); + pvsCalled.add(pv); + for (Map.Entry entry : responses.entrySet()) { + if (pv.contains(entry.getKey())) { + return entry.getValue(); + } + } + return defaultResponse; + } +} From 09169f2023d968fc980341aa35a1168a36184602 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Tue, 26 May 2026 10:15:25 +0200 Subject: [PATCH 02/10] Fix hasNext()/close() deadlock by removing synchronized from hasNext() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit hasNext() no longer holds the 'this' monitor while blocking on a streaming network read. close() can now acquire the lock concurrently, set closed=true, and close the underlying stream — which in turn unblocks the reader. next() still guards its mainIterator.next() call with synchronized(this) and rechecks closed, so the close-between-hasNext-and-next race remains safe. Co-Authored-By: Claude Sonnet 4.6 --- .../archive/reader/appliance/ApplianceValueIterator.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceValueIterator.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceValueIterator.java index d7feba56c5..bf128bd90b 100644 --- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceValueIterator.java +++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceValueIterator.java @@ -65,7 +65,7 @@ public abstract class ApplianceValueIterator implements ValueIterator { private final IteratorListener listener; - protected boolean closed = false; + protected volatile boolean closed = false; private static Object lock = new Object(); @@ -126,8 +126,10 @@ protected void fetchDataInternal(String pvName) throws ArchiverApplianceExceptio * @see org.csstudio.archive.reader.ValueIterator#hasNext() */ @Override - public synchronized boolean hasNext() { - return !closed && mainIterator != null && mainIterator.hasNext(); + public boolean hasNext() { + if (closed || mainIterator == null) + return false; + return mainIterator.hasNext(); } /* From bded065f87ab188b8b39f3d14c6049429e0bf653 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Tue, 26 May 2026 10:16:57 +0200 Subject: [PATCH 03/10] Add failing regression test for static lock serialising concurrent fetches twoIteratorFetchesProceedConcurrently shows that two iterators for independent PVs block each other inside fetchDataInternal() because they compete for a single static monitor. Co-Authored-By: Claude Sonnet 4.6 --- ...ApplianceValueIteratorConcurrencyTest.java | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceValueIteratorConcurrencyTest.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceValueIteratorConcurrencyTest.java index b9d0ea92c4..569c9adb0b 100644 --- a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceValueIteratorConcurrencyTest.java +++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceValueIteratorConcurrencyTest.java @@ -6,8 +6,12 @@ import org.junit.jupiter.api.Timeout; import org.phoebus.archive.reader.ValueIterator; +import java.sql.Timestamp; import java.time.Instant; import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -77,4 +81,52 @@ void nextReturnsNullAfterClose() throws Exception { iter.close(); assertNull(iter.next()); } + + /** + * Regression test for the static lock serialising fetches across all PV iterators. + * + * With a static lock, t1 holding it inside getDataForPVs blocks t2 even though + * they belong to completely independent readers. @Timeout(5) catches the hang. + */ + @Test + @Timeout(5) + void twoIteratorFetchesProceedConcurrently() throws Exception { + CountDownLatch firstInFetch = new CountDownLatch(1); + CountDownLatch releaseFetch = new CountDownLatch(1); + + FakeDataRetrieval dr1 = new FakeDataRetrieval(emptyStream()) { + @Override + public GenMsgIterator getDataForPVs(List pvNames, Timestamp start, + Timestamp end, boolean b, Map m) { + firstInFetch.countDown(); + try { releaseFetch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } + return super.getDataForPVs(pvNames, start, end, b, m); + } + }; + FakeDataRetrieval dr2 = new FakeDataRetrieval(emptyStream()); + + FakeApplianceArchiveReader reader1 = new FakeApplianceArchiveReader(dr1); + FakeApplianceArchiveReader reader2 = new FakeApplianceArchiveReader(dr2); + Instant start = Instant.now().minusSeconds(60); + Instant end = Instant.now(); + + Thread t1 = new Thread(() -> { + try { reader1.getRawValues("PV1", start, end); } + catch (Exception ignored) {} + }); + Thread t2 = new Thread(() -> { + try { reader2.getRawValues("PV2", start, end); } + catch (Exception ignored) {} + }); + + t1.start(); + firstInFetch.await(); // t1 is inside getDataForPVs, holding the lock + + t2.start(); + t2.join(2000); // t2 must finish while t1 is still blocked + assertFalse(t2.isAlive(), "t2 blocked — static lock contention between independent iterators"); + + releaseFetch.countDown(); + t1.join(1000); + } } From c4b5efcb6234dc5190c03a7b30eadc2013236170 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Tue, 26 May 2026 10:17:21 +0200 Subject: [PATCH 04/10] Make fetchDataInternal lock instance-level, not JVM-global Changing the lock from static to a per-instance final field means each iterator only serialises its own concurrent fetchDataInternal calls. Fetches for different PVs can now proceed concurrently. Co-Authored-By: Claude Sonnet 4.6 --- .../appliance/ApplianceArchiveReader.java | 33 +++++++++-------- .../appliance/ApplianceMeanValueIterator.java | 6 +-- ...ianceNonNumericOptimizedValueIterator.java | 7 ++-- .../ApplianceOptimizedValueIterator.java | 5 +-- .../appliance/ApplianceRawValueIterator.java | 6 +-- .../ApplianceStatisticsValueIterator.java | 37 ++++++++++--------- .../appliance/ApplianceValueIterator.java | 9 +---- .../reader/appliance/IteratorListener.java | 7 ---- 8 files changed, 48 insertions(+), 62 deletions(-) delete mode 100644 app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/IteratorListener.java diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReader.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReader.java index 1887885028..dd09e93e2a 100644 --- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReader.java +++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReader.java @@ -13,6 +13,8 @@ import java.util.List; import java.util.Map; import java.util.WeakHashMap; +import java.util.logging.Level; +import java.util.logging.Logger; import org.epics.archiverappliance.retrieval.client.DataRetrieval; import org.epics.archiverappliance.retrieval.client.EpicsMessage; @@ -29,14 +31,16 @@ * * @author Miha Novak miha.novak@cosylab.com */ -public class ApplianceArchiveReader implements ArchiveReader, IteratorListener { +public class ApplianceArchiveReader implements ArchiveReader { + + private static final Logger logger = Logger.getLogger(ApplianceArchiveReader.class.getName()); private final String httpURL; private final String pbrawURL; private final boolean useStatistics; private final boolean useNewOptimizedOperator; - private Map iterators = Collections.synchronizedMap( + Map iterators = Collections.synchronizedMap( new WeakHashMap()); /** @@ -109,7 +113,7 @@ public ValueIterator getRawValues(String name, Instant start, Instant end) throws UnknownChannelException, Exception { try { name = stripSchema(name); - ApplianceRawValueIterator it = new ApplianceRawValueIterator(this, name, start, end, this); + ApplianceRawValueIterator it = new ApplianceRawValueIterator(this, name, start, end); iterators.put(it,this); return it; } catch (ArchiverApplianceException ex) { @@ -125,7 +129,7 @@ public ValueIterator getOptimizedValues(String name, Instant start, Instant end, if (useNewOptimizedOperator) { //try to fetch the data using the new optimized operator try { - it = new ApplianceOptimizedValueIterator(this, name, start, end, count, useStatistics, this); + it = new ApplianceOptimizedValueIterator(this, name, start, end, count, useStatistics); } catch (ArchiverApplianceInvalidTypeException e) { //binning not supported binningSupported = false; @@ -139,16 +143,16 @@ public ValueIterator getOptimizedValues(String name, Instant start, Instant end, try { int points = getNumberOfPoints(name, start, end); if (points <= count) { - it = new ApplianceRawValueIterator(this, name, start, end, this); + it = new ApplianceRawValueIterator(this, name, start, end); } else { //only fetch if binning is "still" supported if (binningSupported) { try { //try to bin the values using the mean and std etc. This will work for numeric scalar PVs if (useStatistics) { - it = new ApplianceStatisticsValueIterator(this, name, start, end, count,this); + it = new ApplianceStatisticsValueIterator(this, name, start, end, count); } else { - it = new ApplianceMeanValueIterator(this, name, start, end, count,this); + it = new ApplianceMeanValueIterator(this, name, start, end, count); } } catch (ArchiverApplianceInvalidTypeException e) { binningSupported = false; @@ -157,13 +161,13 @@ public ValueIterator getOptimizedValues(String name, Instant start, Instant end, if (!binningSupported) { //if binning is not supported (string, waveform type), try nth operator - it = new ApplianceNonNumericOptimizedValueIterator(this, name, start, end, count, points,this); + it = new ApplianceNonNumericOptimizedValueIterator(this, name, start, end, count, points); } } } catch (ArchiverApplianceException e) { //fallback for older archiver appliance, which didn't have the nth operator try { - it = new ApplianceRawValueIterator(this, name, start, end, this); + it = new ApplianceRawValueIterator(this, name, start, end); } catch (ArchiverApplianceException exc) { throw new UnknownChannelException(name); } @@ -177,7 +181,11 @@ public ValueIterator getOptimizedValues(String name, Instant start, Instant end, public void cancel() { ApplianceValueIterator[] its = iterators.keySet().toArray(new ApplianceValueIterator[0]); for (ApplianceValueIterator a : its) { - a.close(); + try { + a.close(); + } catch (Exception ex) { + logger.log(Level.WARNING, "Failed to close iterator during cancel", ex); + } } } @@ -315,11 +323,6 @@ private int getNumberOfPointsLegacy(String pvName, Instant start, Instant end) t return 0; } - @Override - public void finished(ApplianceValueIterator iterator) { - iterators.remove(iterator); - } - private static String stripSchema(String name) { int idx = name.indexOf(ApplianceArchiveReaderConstants.SCHEMA_DELIMITER); return idx > -1 ? name.substring(idx + ApplianceArchiveReaderConstants.SCHEMA_DELIMITER.length()) : name; diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceMeanValueIterator.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceMeanValueIterator.java index 4303995af9..c68ed1c0fa 100644 --- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceMeanValueIterator.java +++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceMeanValueIterator.java @@ -33,16 +33,14 @@ public class ApplianceMeanValueIterator extends ApplianceValueIterator { * @param start start of the time period * @param end end of the time period * @param points the number of requested points - * @param listener the listener that is notified when the iterator is closed - * * @throws IOException if there was an error during the data fetch process * @throws ArchiverApplianceException if it is not possible to load optimized data for the selected PV * @throws ArchiverApplianceInvalidTypeException if the type of data cannot be returned in optimized format */ public ApplianceMeanValueIterator(ApplianceArchiveReader reader, - String name, Instant start, Instant end, int points, IteratorListener listener) + String name, Instant start, Instant end, int points) throws ArchiverApplianceException, IOException { - super(reader,name,start,end,listener); + super(reader, name, start, end); this.requestedPoints = points; this.display = determineDisplay(reader, name, end); fetchData(); diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceNonNumericOptimizedValueIterator.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceNonNumericOptimizedValueIterator.java index b3a7e99389..201c5bf915 100644 --- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceNonNumericOptimizedValueIterator.java +++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceNonNumericOptimizedValueIterator.java @@ -26,15 +26,14 @@ public class ApplianceNonNumericOptimizedValueIterator extends ApplianceValueIte * @param end end of the time period * @param requestedPoints number of requested points * @param totalNumberOfPoints the number of all points in the archive - * @param listener the listener that is notified when the iterator is closed * * @throws IOException if there was an error during the data fetch process * @throws ArchiverApplianceException if it is not possible to load optimized data for the selected PV */ public ApplianceNonNumericOptimizedValueIterator(ApplianceArchiveReader reader, - String name, Instant start, Instant end, int requestedPoints, int totalNumberOfPoints, - IteratorListener listener) throws ArchiverApplianceException, IOException { - super(reader,name,start,end,listener); + String name, Instant start, Instant end, int requestedPoints, int totalNumberOfPoints) + throws ArchiverApplianceException, IOException { + super(reader, name, start, end); this.requestedPoints = requestedPoints; this.totalNumberOfPoints = totalNumberOfPoints; fetchData(); diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceOptimizedValueIterator.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceOptimizedValueIterator.java index 91dbd5b750..cbb8b653a8 100644 --- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceOptimizedValueIterator.java +++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceOptimizedValueIterator.java @@ -44,16 +44,15 @@ public class ApplianceOptimizedValueIterator extends ApplianceValueIterator { * @param points the number of requested points * @param useStatistics true if the returned data should include statistics or false if only mean value should be * present - * @param listener the listener that is notified when the iterator is closed * * @throws IOException if there was an error during the data fetch process * @throws ArchiverApplianceInvalidTypeException if the type of data cannot be returned in optimized format * @throws ArchiverApplianceException if it is not possible to load optimised data for the selected PV */ public ApplianceOptimizedValueIterator(ApplianceArchiveReader reader, String name, Instant start, Instant end, - int points, boolean useStatistics, IteratorListener listener) throws ArchiverApplianceException, + int points, boolean useStatistics) throws ArchiverApplianceException, IOException { - super(reader, name, start, end, listener); + super(reader, name, start, end); this.requestedPoints = points; this.useStatistics = useStatistics; this.display = determineDisplay(reader, name, end); diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceRawValueIterator.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceRawValueIterator.java index 9f3306bff6..81bcbc469b 100644 --- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceRawValueIterator.java +++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceRawValueIterator.java @@ -22,15 +22,13 @@ public class ApplianceRawValueIterator extends ApplianceValueIterator { * @param name name of the PV * @param start start of the time period * @param end end of the time period - * @param listener the listener which is notified when the iterator is closed - * * @throws IOException if there was an error during the data fetch process * @throws ArchiverApplianceException if the data cannot be loaded with this algorithm */ public ApplianceRawValueIterator(ApplianceArchiveReader reader, - String name, Instant start, Instant end, IteratorListener listener) + String name, Instant start, Instant end) throws ArchiverApplianceException, IOException { - super(reader,name,start,end,listener); + super(reader, name, start, end); fetchData(); } } \ No newline at end of file diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIterator.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIterator.java index 2140a62ce9..edc7be19b3 100644 --- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIterator.java +++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIterator.java @@ -52,8 +52,8 @@ public class ApplianceStatisticsValueIterator extends ApplianceMeanValueIterator * @throws ArchiverApplianceInvalidTypeException if the type of data cannot be returned in optimized format */ public ApplianceStatisticsValueIterator(ApplianceArchiveReader reader, String name, Instant start, Instant end, - int points, IteratorListener listener) throws ArchiverApplianceException, IOException { - super(reader, name, start, end, points, listener); + int points) throws ArchiverApplianceException, IOException { + super(reader, name, start, end, points); } @Override @@ -111,24 +111,25 @@ protected void fetchDataInternal(String pvName) throws ArchiverApplianceExceptio */ @Override public void close() { - try { - synchronized (this) { - if (stdStream != null) { - stdStream.close(); - } - if (minStream != null) { - minStream.close(); - } - if (maxStream != null) { - maxStream.close(); - } - if (countStream != null) { - countStream.close(); - } + synchronized (this) { + try { + closeStream(stdStream); + closeStream(minStream); + closeStream(maxStream); + closeStream(countStream); + } finally { super.close(); } - } catch (IOException e) { - throw new IllegalStateException(e); + } + } + + private static void closeStream(GenMsgIterator s) { + if (s != null) { + try { + s.close(); + } catch (IOException e) { + // log and continue — super.close() must still run + } } } diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceValueIterator.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceValueIterator.java index bf128bd90b..06aa8d8adb 100644 --- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceValueIterator.java +++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceValueIterator.java @@ -63,11 +63,9 @@ public abstract class ApplianceValueIterator implements ValueIterator { protected final Instant start; protected final Instant end; - private final IteratorListener listener; - protected volatile boolean closed = false; - private static Object lock = new Object(); + private final Object lock = new Object(); /** * Constructs a new ApplianceValueIterator. @@ -77,13 +75,11 @@ public abstract class ApplianceValueIterator implements ValueIterator { * @param start the start of the time window of the data * @param end the end of the time window of the data */ - protected ApplianceValueIterator(ApplianceArchiveReader reader, String name, Instant start, Instant end, - IteratorListener listener) { + protected ApplianceValueIterator(ApplianceArchiveReader reader, String name, Instant start, Instant end) { this.reader = reader; this.name = name; this.start = start; this.end = end; - this.listener = listener; } /** @@ -266,7 +262,6 @@ public void close() { } catch (IOException e) { throw new IllegalStateException(e); } - listener.finished(this); } /** diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/IteratorListener.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/IteratorListener.java deleted file mode 100644 index 76ec96df95..0000000000 --- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/IteratorListener.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.phoebus.archive.reader.appliance; - -/** Listener to iterator */ -public interface IteratorListener { - /** @param source Value iterator */ - public void finished(ApplianceValueIterator source); -} From 393b89b809a909822eda73d04e44ee57538e3a63 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Tue, 26 May 2026 10:23:52 +0200 Subject: [PATCH 05/10] Add iterator routing, type and lifecycle coverage tests ApplianceArchiveReaderTest: verifies enum PVs route to non-numeric iterator, numeric scalars use the optimized path, cancel() closes active iterators, and the WeakHashMap releases references after GC. ApplianceMeanValueIteratorTest: checks the mean_() operator URL, and that determineDisplay rejects enum/waveform types. ApplianceOptimizedValueIteratorTest: verifies the optimized_N() URL, VStatistics output when useStatistics=true, and VNumber when false. ApplianceNonNumericOptimizedValueIteratorTest: checks n=1 routes raw fetch, n>1 uses nth_N() operator, and the 1.5x boundary gives n=2. ApplianceStatisticsValueIteratorTest: verifies all five operator streams (mean, std, min, max, count) are opened, and that close() closes all five. Co-Authored-By: Claude Sonnet 4.6 --- .../appliance/ApplianceArchiveReaderTest.java | 117 ++++++++++++++++++ .../ApplianceMeanValueIteratorTest.java | 66 ++++++++++ ...eNonNumericOptimizedValueIteratorTest.java | 57 +++++++++ .../ApplianceOptimizedValueIteratorTest.java | 102 +++++++++++++++ .../ApplianceStatisticsValueIteratorTest.java | 84 +++++++++++++ 5 files changed, 426 insertions(+) create mode 100644 app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReaderTest.java create mode 100644 app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceMeanValueIteratorTest.java create mode 100644 app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceNonNumericOptimizedValueIteratorTest.java create mode 100644 app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceOptimizedValueIteratorTest.java create mode 100644 app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIteratorTest.java diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReaderTest.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReaderTest.java new file mode 100644 index 0000000000..f4adfe7f51 --- /dev/null +++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReaderTest.java @@ -0,0 +1,117 @@ +package org.phoebus.archive.reader.appliance; + +import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadInfo; +import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadType; +import org.epics.archiverappliance.retrieval.client.EpicsMessage; +import org.epics.archiverappliance.retrieval.client.GenMsgIterator; +import org.junit.jupiter.api.Test; + +import java.lang.ref.WeakReference; +import java.time.Instant; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +class ApplianceArchiveReaderTest { + + private static final Instant START = Instant.now().minusSeconds(3600); + private static final Instant END = Instant.now(); + + private static GenMsgIterator emptyStream() { + GenMsgIterator s = mock(GenMsgIterator.class); + when(s.iterator()).thenReturn(Collections.emptyIterator()); + return s; + } + + private static GenMsgIterator probeStream(PayloadType type) { + EpicsMessage msg = mock(EpicsMessage.class); + GenMsgIterator s = mock(GenMsgIterator.class); + when(s.iterator()).thenReturn(Collections.singletonList(msg).iterator()); + when(s.getPayLoadInfo()).thenReturn(PayloadInfo.newBuilder().setType(type).buildPartial()); + return s; + } + + private static GenMsgIterator countStream(int count) { + EpicsMessage msg = mock(EpicsMessage.class); + when(msg.getNumberValue()).thenReturn(count); + GenMsgIterator s = mock(GenMsgIterator.class); + when(s.iterator()).thenReturn(Collections.singletonList(msg).iterator()); + return s; + } + + /** + * When the PV type is SCALAR_ENUM the optimized path throws, getOptimizedValues + * falls back, and since binning is unsupported it reaches NonNumericOptimizedValueIterator + * (or RawValueIterator if points <= count). + */ + @Test + void scalarEnumRoutesToNonNumericIterator() throws Exception { + // probe returns SCALAR_ENUM → OptimizedValueIterator throws ArchiverApplianceInvalidTypeException + // ncount returns 200 points > requested 100 → NonNumeric path chosen + FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_ENUM)); + dr.whenPvContains("ncount", countStream(200)); + dr.whenPvContains("nth_", emptyStream()); + + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr, false, true); + Object iter = reader.getOptimizedValues("TEST:PV", START, END, 100); + + assertInstanceOf(ApplianceNonNumericOptimizedValueIterator.class, iter); + } + + @Test + void numericScalarUsesOptimizedIterator() throws Exception { + // probe returns SCALAR_DOUBLE → OptimizedValueIterator succeeds + FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_DOUBLE)); + dr.whenPvContains("optimized_", emptyStream()); + + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr, false, true); + Object iter = reader.getOptimizedValues("TEST:PV", START, END, 100); + + assertInstanceOf(ApplianceOptimizedValueIterator.class, iter); + } + + @Test + void cancelClosesAllActiveIterators() throws Exception { + FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream()); + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr); + + ApplianceValueIterator iter = (ApplianceValueIterator) reader.getRawValues("TEST:PV", START, END); + assertFalse(iter.closed); + + reader.cancel(); + assertTrue(iter.closed); + } + + @Test + void weakMapDoesNotPreventGC() throws Exception { + FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream()); + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr); + + WeakReference ref; + { + ApplianceValueIterator iter = (ApplianceValueIterator) reader.getRawValues("TEST:PV", START, END); + ref = new WeakReference<>(iter); + // iter goes out of scope here; no other strong reference + } + + for (int i = 0; i < 100 && ref.get() != null; i++) { + System.gc(); + Thread.sleep(10); + } + + assertNull(ref.get(), "Iterator was not garbage collected"); + assertTrue(reader.iterators.isEmpty(), "WeakHashMap should be empty after GC"); + } + + @Test + void getNumberOfPointsUsesNcountOperator() throws Exception { + // ncount returns 0 ≤ requested 100 → falls back to RawValueIterator + FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream()); + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr, false, false); + reader.getOptimizedValues("TEST:PV", START, END, 100); + + assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("ncount(")), + "Expected an ncount( call, got: " + dr.pvsCalled); + } +} diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceMeanValueIteratorTest.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceMeanValueIteratorTest.java new file mode 100644 index 0000000000..43c9b27bf6 --- /dev/null +++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceMeanValueIteratorTest.java @@ -0,0 +1,66 @@ +package org.phoebus.archive.reader.appliance; + +import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadInfo; +import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadType; +import org.epics.archiverappliance.retrieval.client.EpicsMessage; +import org.epics.archiverappliance.retrieval.client.GenMsgIterator; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +class ApplianceMeanValueIteratorTest { + + private static final Instant START = Instant.now().minusSeconds(3600); + private static final Instant END = Instant.now(); + private static final int POINTS = 60; + + private static GenMsgIterator emptyStream() { + GenMsgIterator s = mock(GenMsgIterator.class); + when(s.iterator()).thenReturn(Collections.emptyIterator()); + return s; + } + + private static GenMsgIterator probeStream(PayloadType type) { + EpicsMessage msg = mock(EpicsMessage.class); + GenMsgIterator s = mock(GenMsgIterator.class); + when(s.iterator()).thenReturn(Collections.singletonList(msg).iterator()); + when(s.getPayLoadInfo()).thenReturn(PayloadInfo.newBuilder().setType(type).buildPartial()); + return s; + } + + @Test + void fetchUrlContainsMeanIntervalOperator() throws Exception { + FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_DOUBLE)); + dr.whenPvContains("mean_", emptyStream()); + + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr); + new ApplianceMeanValueIterator(reader, "TEST:PV", START, END, POINTS); + + assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("mean_") && pv.endsWith("(TEST:PV)")), + "Expected mean_(TEST:PV) in calls, got: " + dr.pvsCalled); + } + + @Test + void determineDisplayRejectsEnum() { + FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_ENUM)); + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr); + + assertThrows(ArchiverApplianceInvalidTypeException.class, + () -> new ApplianceMeanValueIterator(reader, "TEST:PV", START, END, POINTS)); + } + + @Test + void determineDisplayAcceptsDouble() throws Exception { + FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_DOUBLE)); + dr.whenPvContains("mean_", emptyStream()); + + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr); + ApplianceMeanValueIterator iter = new ApplianceMeanValueIterator(reader, "TEST:PV", START, END, POINTS); + + assertNotNull(iter.display); + } +} diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceNonNumericOptimizedValueIteratorTest.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceNonNumericOptimizedValueIteratorTest.java new file mode 100644 index 0000000000..a3b6dcf976 --- /dev/null +++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceNonNumericOptimizedValueIteratorTest.java @@ -0,0 +1,57 @@ +package org.phoebus.archive.reader.appliance; + +import org.epics.archiverappliance.retrieval.client.GenMsgIterator; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +class ApplianceNonNumericOptimizedValueIteratorTest { + + private static final Instant START = Instant.now().minusSeconds(3600); + private static final Instant END = Instant.now(); + + private static GenMsgIterator emptyStream() { + GenMsgIterator s = mock(GenMsgIterator.class); + when(s.iterator()).thenReturn(Collections.emptyIterator()); + return s; + } + + private ApplianceNonNumericOptimizedValueIterator makeIterator( + FakeDataRetrieval dr, int requestedPoints, int totalPoints) + throws ArchiverApplianceException, java.io.IOException { + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr); + return new ApplianceNonNumericOptimizedValueIterator(reader, "TEST:PV", START, END, + requestedPoints, totalPoints); + } + + @Test + void nEqualsOneCallsRawFetch() throws Exception { + // totalPoints == requestedPoints → n = 1 → fetches bare PV name + FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream()); + makeIterator(dr, 100, 100); + assertTrue(dr.pvsCalled.contains("TEST:PV"), + "Expected bare PV name, got: " + dr.pvsCalled); + } + + @Test + void nGreaterThanOneUsesNthOperator() throws Exception { + // totalPoints = 1000, requestedPoints = 100 → n = 10 → nth_10(TEST:PV) + FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream()); + makeIterator(dr, 100, 1000); + assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("nth_")), + "Expected nth_ prefix, got: " + dr.pvsCalled); + } + + @Test + void boundaryAt1point5xGivesN2() throws Exception { + // 1.5 * 100 < 160 < 2 * 100 → n forced to 2, regardless of integer division (160/100=1) + FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream()); + makeIterator(dr, 100, 160); + assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("nth_2(")), + "Expected nth_2( prefix at boundary, got: " + dr.pvsCalled); + } +} diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceOptimizedValueIteratorTest.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceOptimizedValueIteratorTest.java new file mode 100644 index 0000000000..c12686c866 --- /dev/null +++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceOptimizedValueIteratorTest.java @@ -0,0 +1,102 @@ +package org.phoebus.archive.reader.appliance; + +import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadInfo; +import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadType; +import org.epics.archiverappliance.retrieval.client.EpicsMessage; +import org.epics.archiverappliance.retrieval.client.GenMsgIterator; +import org.epics.vtype.VNumber; +import org.epics.vtype.VStatistics; +import org.epics.vtype.VType; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +class ApplianceOptimizedValueIteratorTest { + + private static final Instant START = Instant.now().minusSeconds(3600); + private static final Instant END = Instant.now(); + private static final int POINTS = 100; + + private static GenMsgIterator emptyStream() { + GenMsgIterator s = mock(GenMsgIterator.class); + when(s.iterator()).thenReturn(Collections.emptyIterator()); + return s; + } + + private static GenMsgIterator probeStream(PayloadType type) { + EpicsMessage msg = mock(EpicsMessage.class); + GenMsgIterator s = mock(GenMsgIterator.class); + when(s.iterator()).thenReturn(Collections.singletonList(msg).iterator()); + when(s.getPayLoadInfo()).thenReturn(PayloadInfo.newBuilder().setType(type).buildPartial()); + return s; + } + + @Test + void fetchUrlContainsOptimizedNOperator() throws Exception { + FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_DOUBLE)); + dr.whenPvContains("optimized_", emptyStream()); + + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr); + new ApplianceOptimizedValueIterator(reader, "TEST:PV", START, END, POINTS, false); + + assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("optimized_") && pv.endsWith("(TEST:PV)")), + "Expected optimized_(TEST:PV) call, got: " + dr.pvsCalled); + } + + @Test + void determineDisplayRejectsEnum() { + FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_ENUM)); + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr); + + assertThrows(ArchiverApplianceInvalidTypeException.class, + () -> new ApplianceOptimizedValueIterator(reader, "TEST:PV", START, END, POINTS, false)); + } + + private static EpicsMessage waveformMsg(double mean, double std, double min, double max, int count) { + EpicsMessage msg = mock(EpicsMessage.class); + when(msg.getElementCount()).thenReturn(5); + when(msg.getNumberAt(0)).thenReturn(mean); + when(msg.getNumberAt(1)).thenReturn(std); + when(msg.getNumberAt(2)).thenReturn(min); + when(msg.getNumberAt(3)).thenReturn(max); + when(msg.getNumberAt(4)).thenReturn(count); + when(msg.getTimestamp()).thenReturn(new java.sql.Timestamp(System.currentTimeMillis())); + when(msg.getSeverity()).thenReturn(0); + when(msg.getStatus()).thenReturn(0); + return msg; + } + + private ApplianceOptimizedValueIterator makeWithWaveformData(boolean useStatistics) throws Exception { + EpicsMessage dataMsg = waveformMsg(1.0, 0.1, 0.5, 1.5, 10); + GenMsgIterator dataStream = mock(GenMsgIterator.class); + PayloadInfo waveformInfo = PayloadInfo.newBuilder().setType(PayloadType.WAVEFORM_DOUBLE).buildPartial(); + when(dataStream.iterator()).thenReturn(Collections.singletonList(dataMsg).iterator()); + when(dataStream.getPayLoadInfo()).thenReturn(waveformInfo); + + FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_DOUBLE)); + dr.whenPvContains("optimized_", dataStream); + + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr); + return new ApplianceOptimizedValueIterator(reader, "TEST:PV", START, END, POINTS, useStatistics); + } + + @Test + void nextReturnsVStatisticsWhenUseStatisticsTrue() throws Exception { + ApplianceOptimizedValueIterator iter = makeWithWaveformData(true); + assertTrue(iter.hasNext()); + VType result = iter.next(); + assertInstanceOf(VStatistics.class, result); + } + + @Test + void nextReturnsVNumberWhenUseStatisticsFalse() throws Exception { + ApplianceOptimizedValueIterator iter = makeWithWaveformData(false); + assertTrue(iter.hasNext()); + VType result = iter.next(); + assertInstanceOf(VNumber.class, result); + } +} diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIteratorTest.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIteratorTest.java new file mode 100644 index 0000000000..1183f20273 --- /dev/null +++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIteratorTest.java @@ -0,0 +1,84 @@ +package org.phoebus.archive.reader.appliance; + +import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadInfo; +import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadType; +import org.epics.archiverappliance.retrieval.client.EpicsMessage; +import org.epics.archiverappliance.retrieval.client.GenMsgIterator; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +class ApplianceStatisticsValueIteratorTest { + + private static final Instant START = Instant.now().minusSeconds(3600); + private static final Instant END = Instant.now(); + private static final int POINTS = 60; + + private static GenMsgIterator emptyStream() { + GenMsgIterator s = mock(GenMsgIterator.class); + when(s.iterator()).thenReturn(Collections.emptyIterator()); + return s; + } + + private static GenMsgIterator probeStream(PayloadType type) { + EpicsMessage msg = mock(EpicsMessage.class); + GenMsgIterator s = mock(GenMsgIterator.class); + when(s.iterator()).thenReturn(Collections.singletonList(msg).iterator()); + when(s.getPayLoadInfo()).thenReturn(PayloadInfo.newBuilder().setType(type).buildPartial()); + return s; + } + + private ApplianceStatisticsValueIterator makeIterator(FakeDataRetrieval dr) + throws ArchiverApplianceException, java.io.IOException { + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr); + return new ApplianceStatisticsValueIterator(reader, "TEST:PV", START, END, POINTS); + } + + private FakeDataRetrieval setupDr(GenMsgIterator meanStream, GenMsgIterator stdStream, + GenMsgIterator minStream, GenMsgIterator maxStream, + GenMsgIterator countStream) { + FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_DOUBLE)); + dr.whenPvContains("mean_", meanStream); + dr.whenPvContains("std_", stdStream); + dr.whenPvContains("min_", minStream); + dr.whenPvContains("max_", maxStream); + dr.whenPvContains("count_", countStream); + return dr; + } + + @Test + void fetchIssuesFiveOperatorCalls() throws Exception { + FakeDataRetrieval dr = setupDr(emptyStream(), emptyStream(), emptyStream(), emptyStream(), emptyStream()); + makeIterator(dr); + + assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("mean_")), "missing mean_ call"); + assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("std_")), "missing std_ call"); + assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("min_")), "missing min_ call"); + assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("max_")), "missing max_ call"); + assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("count_")), "missing count_ call"); + } + + @Test + void closeClosesAllFiveStreams() throws Exception { + GenMsgIterator meanStream = emptyStream(); + GenMsgIterator stdStream = emptyStream(); + GenMsgIterator minStream = emptyStream(); + GenMsgIterator maxStream = emptyStream(); + GenMsgIterator countStream = emptyStream(); + + FakeDataRetrieval dr = setupDr(meanStream, stdStream, minStream, maxStream, countStream); + ApplianceStatisticsValueIterator iter = makeIterator(dr); + + iter.close(); + + verify(meanStream).close(); + verify(stdStream).close(); + verify(minStream).close(); + verify(maxStream).close(); + verify(countStream).close(); + } +} From ed506cbb49b6119c9e260a2cd6c8099d6b3195c6 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Tue, 26 May 2026 11:01:35 +0200 Subject: [PATCH 06/10] Add failing test for per-source archive read timeout faultySourceTimesOutAndLoopContinues fails: the slow source blocks WorkerThread.run() indefinitely because no timeout guards the fetch loop; @Timeout(5) fires after 5 s, proving the bug. Scaffolding added so the tests compile: - archive_read_timeout_ms preference key and 30 s default - ArchiveFetchJob.openReader() hook so TestableFetchJob can inject fake readers without going through ArchiveReaders SPI - Package-private test-only constructor that skips JobManager - Mockito added to databrowser pom Co-Authored-By: Claude Sonnet 4.6 --- app/databrowser/pom.xml | 6 + .../databrowser3/archive/ArchiveFetchJob.java | 21 +- .../databrowser3/preferences/Preferences.java | 2 + .../databrowser_preferences.properties | 13 ++ .../archive/ArchiveFetchJobTest.java | 199 ++++++++++++++++++ 5 files changed, 240 insertions(+), 1 deletion(-) create mode 100644 app/databrowser/src/test/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJobTest.java diff --git a/app/databrowser/pom.xml b/app/databrowser/pom.xml index 5c062a7a7e..d9c7263e85 100644 --- a/app/databrowser/pom.xml +++ b/app/databrowser/pom.xml @@ -14,6 +14,12 @@ ${junit.version} test + + org.mockito + mockito-core + ${mockito.version} + test + org.hamcrest hamcrest-all diff --git a/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java b/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java index e73f062ac0..60b037eab7 100644 --- a/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java +++ b/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java @@ -128,7 +128,7 @@ public void run() archive.getName(), ++i, archives.size()); try ( - final ArchiveReader the_reader = ArchiveReaders.createReader(url); + final ArchiveReader the_reader = openReader(url); ) { reader.set(the_reader); @@ -205,6 +205,25 @@ public ArchiveFetchJob(final PVItem item, final Instant start, this.job = JobManager.schedule(toString(), this); } + /** Test-only constructor: does not schedule via JobManager. */ + ArchiveFetchJob(final PVItem item, final Instant start, final Instant end, + final ArchiveFetchJobListener listener, final boolean testOnly) + { + this.item = item; + this.start = start; + this.end = end; + this.listener = listener; + this.job = null; + } + + /** Create an {@link ArchiveReader} for the given URL. + * Override in tests to inject fakes. + */ + protected ArchiveReader openReader(final String url) throws Exception + { + return ArchiveReaders.createReader(url); + } + /** @return PVItem for which this job was created */ public PVItem getPVItem() { diff --git a/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/preferences/Preferences.java b/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/preferences/Preferences.java index 0354548f64..8bd1b7f571 100644 --- a/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/preferences/Preferences.java +++ b/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/preferences/Preferences.java @@ -54,6 +54,8 @@ public static class TimePreset /** Setting */ @Preference public static int archive_fetch_delay; /** Setting */ + @Preference public static int archive_read_timeout_ms; + /** Setting */ @Preference public static int concurrent_requests; /** Setting */ @Preference public static ArchiveRescale archive_rescale; diff --git a/app/databrowser/src/main/resources/databrowser_preferences.properties b/app/databrowser/src/main/resources/databrowser_preferences.properties index 3a69e53ae7..d9c8ea5a81 100644 --- a/app/databrowser/src/main/resources/databrowser_preferences.properties +++ b/app/databrowser/src/main/resources/databrowser_preferences.properties @@ -49,6 +49,19 @@ trace_type=AREA # while interactively zooming and panning archive_fetch_delay=500 +# Per-source read timeout in milliseconds. +# +# Each archive source (appliance, channel archiver, RDB, ...) in the +# WorkerThread fetch loop is given this much time to return data. +# When the source does not respond within the timeout the fetch is +# abandoned, a warning is logged, the failure is reported to the +# listener, and the loop continues with the next source. +# +# A value of 0 disables the timeout (wait forever). +# 30 s covers scalar and waveform PVs on a local/campus network. +# Increase this in settings.ini for WAN deployments. +archive_read_timeout_ms=30000 + # Number of concurrent archive fetch requests. # # When more requests are necessary, the background jobs diff --git a/app/databrowser/src/test/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJobTest.java b/app/databrowser/src/test/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJobTest.java new file mode 100644 index 0000000000..dd29cbc05c --- /dev/null +++ b/app/databrowser/src/test/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJobTest.java @@ -0,0 +1,199 @@ +package org.csstudio.trends.databrowser3.archive; + +import org.csstudio.trends.databrowser3.model.ArchiveDataSource; +import org.csstudio.trends.databrowser3.model.PVItem; +import org.csstudio.trends.databrowser3.preferences.Preferences; +import org.epics.vtype.Alarm; +import org.epics.vtype.Display; +import org.epics.vtype.Time; +import org.epics.vtype.VDouble; +import org.epics.vtype.VType; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.phoebus.archive.reader.ArchiveReader; +import org.phoebus.archive.reader.ValueIterator; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +class ArchiveFetchJobTest { + + // --- test-only subclass ------------------------------------------------- + + /** Subclass that skips JobManager scheduling and injects fake readers. */ + static class TestableFetchJob extends ArchiveFetchJob { + private final java.util.Map readers = new java.util.LinkedHashMap<>(); + + TestableFetchJob(PVItem item, Instant start, Instant end, ArchiveFetchJobListener listener) { + super(item, start, end, listener, true); + } + + void whenUrl(String url, ArchiveReader reader) { + readers.put(url, reader); + } + + @Override + protected ArchiveReader openReader(String url) throws Exception { + ArchiveReader r = readers.get(url); + if (r == null) + throw new Exception("No fake reader registered for URL: " + url); + return r; + } + } + + // --- helpers ------------------------------------------------------------ + + private static VType makeValue() { + return VDouble.of(1.0, Alarm.none(), Time.now(), Display.none()); + } + + private static ValueIterator oneValueIterator() throws Exception { + VType v = makeValue(); + ValueIterator it = mock(ValueIterator.class); + when(it.hasNext()).thenReturn(true, false); + when(it.next()).thenReturn(v); + return it; + } + + private static ArchiveReader readerReturning(ValueIterator valueIterator) throws Exception { + ArchiveReader r = mock(ArchiveReader.class); + when(r.getRawValues(any(), any(), any())).thenReturn(valueIterator); + when(r.getOptimizedValues(any(), any(), any(), anyInt())).thenReturn(valueIterator); + return r; + } + + // --- tests -------------------------------------------------------------- + + @Test + @Timeout(5) + void faultySourceTimesOutAndLoopContinues() throws Exception { + PVItem item = new PVItem("TESTPV", 0.0); + item.addArchiveDataSource(new ArchiveDataSource("src://slow", "Slow")); + item.addArchiveDataSource(new ArchiveDataSource("src://fast", "Fast")); + + List errors = Collections.synchronizedList(new ArrayList<>()); + List completed = Collections.synchronizedList(new ArrayList<>()); + ArchiveFetchJobListener listener = new ArchiveFetchJobListener() { + @Override public void fetchCompleted(ArchiveFetchJob job) { completed.add("done"); } + @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) { + errors.add(archive.getName()); + } + @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) {} + }; + + // Slow reader: blocks until released (simulates unresponsive archiver) + CountDownLatch readerBlocking = new CountDownLatch(1); + CountDownLatch releaseReader = new CountDownLatch(1); + ValueIterator blockingIter = mock(ValueIterator.class); + when(blockingIter.hasNext()).thenAnswer(inv -> { + readerBlocking.countDown(); + releaseReader.await(); + return false; + }); + ArchiveReader slowReader = readerReturning(blockingIter); + + ValueIterator fastIter = oneValueIterator(); + ArchiveReader fastReader = readerReturning(fastIter); + + int savedTimeout = Preferences.archive_read_timeout_ms; + Preferences.archive_read_timeout_ms = 500; + try { + TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener); + job.whenUrl("src://slow", slowReader); + job.whenUrl("src://fast", fastReader); + + ArchiveFetchJob.WorkerThread worker = job.new WorkerThread(); + worker.run(); + + releaseReader.countDown(); // unblock the carrier thread so it can clean up + + assertEquals(List.of("Slow"), errors, "slow source should report timeout failure"); + assertEquals(List.of("done"), completed, "fast source result should complete the job"); + } finally { + Preferences.archive_read_timeout_ms = savedTimeout; + releaseReader.countDown(); // safety: unblock in case test fails early + } + } + + @Test + @Timeout(5) + void healthySourceCompletesWithinTimeout() throws Exception { + PVItem item = new PVItem("TESTPV", 0.0); + item.addArchiveDataSource(new ArchiveDataSource("src://fast", "Fast")); + + List errors = new ArrayList<>(); + AtomicReference completedJob = new AtomicReference<>(); + ArchiveFetchJobListener listener = new ArchiveFetchJobListener() { + @Override public void fetchCompleted(ArchiveFetchJob job) { completedJob.set(job); } + @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) { + errors.add(error.getMessage()); + } + @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) {} + }; + + ValueIterator fastIter = oneValueIterator(); + ArchiveReader fastReader = readerReturning(fastIter); + + TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener); + job.whenUrl("src://fast", fastReader); + + ArchiveFetchJob.WorkerThread worker = job.new WorkerThread(); + worker.run(); + + assertTrue(errors.isEmpty(), "no errors expected for healthy source, got: " + errors); + assertNotNull(completedJob.get(), "fetchCompleted should have been called"); + } + + @Test + @Timeout(5) + void cancelInterruptsPendingFetch() throws Exception { + PVItem item = new PVItem("TESTPV", 0.0); + item.addArchiveDataSource(new ArchiveDataSource("src://slow", "Slow")); + + ArchiveFetchJobListener listener = new ArchiveFetchJobListener() { + @Override public void fetchCompleted(ArchiveFetchJob job) {} + @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) {} + @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) {} + }; + + CountDownLatch readerBlocking = new CountDownLatch(1); + CountDownLatch releaseReader = new CountDownLatch(1); + ValueIterator blockingIter = mock(ValueIterator.class); + when(blockingIter.hasNext()).thenAnswer(inv -> { + readerBlocking.countDown(); + releaseReader.await(); + return false; + }); + ArchiveReader slowReader = readerReturning(blockingIter); + + // Long timeout so cancellation (not timeout) drives the exit + int savedTimeout = Preferences.archive_read_timeout_ms; + Preferences.archive_read_timeout_ms = 30_000; + try { + TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener); + job.whenUrl("src://slow", slowReader); + + ArchiveFetchJob.WorkerThread worker = job.new WorkerThread(); + + Thread t = new Thread(worker::run); + t.start(); + + readerBlocking.await(); // wait until the fetch is blocked + worker.cancel(); // signal cancellation + cancel active reader + releaseReader.countDown(); // unblock the carrier thread + + t.join(3000); + assertFalse(t.isAlive(), "WorkerThread should have exited after cancel()"); + } finally { + Preferences.archive_read_timeout_ms = savedTimeout; + releaseReader.countDown(); + } + } +} From 4845fdcaba9da05ba655ce733a9502369618df6f Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Tue, 26 May 2026 11:15:37 +0200 Subject: [PATCH 07/10] Add per-source read timeout to ArchiveFetchJob fetch loop WorkerThread.run() now submits each source fetch to Activator.thread_pool and calls Future.get(archive_read_timeout_ms, MILLISECONDS). On timeout: - Future.cancel(true) interrupts the carrier thread - The active ArchiveReader.cancel() is called to close the connection - archiveFetchFailed() is reported and the loop advances to the next source fetchFromSource() extracts the reader-open + iterator-drain logic that previously lived inline in run(), keeping the outer loop readable. archive_read_timeout_ms=0 disables the timeout (Future.get() with no deadline), preserving the previous behaviour when needed. Co-Authored-By: Claude Sonnet 4.6 --- .../databrowser3/archive/ArchiveFetchJob.java | 115 ++++++++++++++---- 1 file changed, 89 insertions(+), 26 deletions(-) diff --git a/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java b/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java index 60b037eab7..4a820a6631 100644 --- a/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java +++ b/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java @@ -9,14 +9,17 @@ import static org.csstudio.trends.databrowser3.Activator.logger; +import java.io.IOException; import java.text.MessageFormat; import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; @@ -82,6 +85,9 @@ class WorkerThread implements Runnable /** Archive reader that's currently queried */ private AtomicReference reader = new AtomicReference<>(); + /** Future for the per-source fetch currently in progress */ + private final AtomicReference>> currentFetch = new AtomicReference<>(); + /** @return Message that somehow indicates progress */ public String getMessage() { @@ -93,6 +99,10 @@ public void cancel() { cancelled = true; + final Future> pending = currentFetch.getAndSet(null); + if (pending != null) + pending.cancel(true); + final ArchiveReader the_reader = reader.get(); if (the_reader != null) the_reader.cancel(); @@ -117,51 +127,65 @@ public void run() final Collection archives = item.getArchiveDataSources(); final List archives_without_channel = new ArrayList<>(); + final int bins_final = bins; int i = 0; for (ArchiveDataSource archive : archives) { if (cancelled) break; - final String url = archive.getUrl(); // Display "N/total", using '1' for the first sub-archive. message = MessageFormat.format(Messages.ArchiveFetchDetailFmt, archive.getName(), ++i, archives.size()); + + final Future> fetch = Activator.thread_pool.submit( + () -> fetchFromSource(archive, bins_final)); + currentFetch.set(fetch); + if (cancelled) + { + fetch.cancel(true); + break; + } try - ( - final ArchiveReader the_reader = openReader(url); - ) { - reader.set(the_reader); - try - ( - final ValueIterator value_iter = (item.getRequestType() == RequestType.RAW) - ? the_reader.getRawValues(item.getResolvedName(), start, end) - : the_reader.getOptimizedValues(item.getResolvedName(), start, end, bins) - ) + final List result = (Preferences.archive_read_timeout_ms <= 0) + ? fetch.get() + : fetch.get(Preferences.archive_read_timeout_ms, TimeUnit.MILLISECONDS); + if (!cancelled) { - // Get samples into array - final List result = new ArrayList<>(); - while (value_iter.hasNext()) - result.add(value_iter.next()); samples += result.size(); item.mergeArchivedSamples(archive.getName(), result); } - catch (UnknownChannelException e) - { - // Do not immediately notify about unknown channels. First search for the data in all archive - // sources and only report this kind of errors at the end + } + catch (TimeoutException ex) + { + fetch.cancel(true); + final ArchiveReader timed_out = reader.getAndSet(null); + if (timed_out != null) + timed_out.cancel(); + logger.log(Level.WARNING, + "Archive source timed out after " + Preferences.archive_read_timeout_ms + + " ms, skipping: " + archive.getName()); + if (! cancelled) + listener.archiveFetchFailed(ArchiveFetchJob.this, archive, + new IOException("Read timeout (" + Preferences.archive_read_timeout_ms + " ms)")); + } + catch (ExecutionException ex) + { + final Throwable cause = ex.getCause(); + if (cause instanceof UnknownChannelException) archives_without_channel.add(archive); - } - finally - { - reader.set(null); - } + else if (! cancelled) + listener.archiveFetchFailed(ArchiveFetchJob.this, archive, + cause instanceof Exception ? (Exception) cause : ex); } catch (Exception ex) - { // Tell listener unless it's the result of a 'cancel'? + { if (! cancelled) listener.archiveFetchFailed(ArchiveFetchJob.this, archive, ex); - // Continue with the next data source + } + finally + { + currentFetch.set(null); } } final long end_time = System.currentTimeMillis(); @@ -180,6 +204,45 @@ public void run() listener.fetchCompleted(ArchiveFetchJob.this); } + /** Fetch all samples from one archive source. + * Runs on a carrier thread inside a timed Future. + * @return list of samples + * @throws Exception on fetch error + */ + List fetchFromSource(final ArchiveDataSource archive, final int bins) throws Exception + { + try (final ArchiveReader the_reader = openReader(archive.getUrl())) + { + // If a timeout or cancel fired while openReader() was blocking, + // bail now so the try-with-resources closes the_reader cleanly. + if (Thread.currentThread().isInterrupted()) + throw new InterruptedException("Interrupted during open"); + reader.set(the_reader); + try + ( + final ValueIterator value_iter = (item.getRequestType() == RequestType.RAW) + ? the_reader.getRawValues(item.getResolvedName(), start, end) + : the_reader.getOptimizedValues(item.getResolvedName(), start, end, bins) + ) + { + final List result = new ArrayList<>(); + while (value_iter.hasNext()) + result.add(value_iter.next()); + return result; + } + finally + { + reader.set(null); + } + } + finally + { + // Clear any stale interrupt flag before the carrier thread + // returns to the shared pool, so subsequent tasks are not poisoned. + Thread.interrupted(); + } + } + @Override public String toString() { From 47d1f6d7959019037ca18e5cbd845b1593f3d967 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Tue, 26 May 2026 14:36:42 +0200 Subject: [PATCH 08/10] Add generic functional tests alongside existing regression tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Broadens test coverage beyond the three regression scenarios (static lock, hasNext/close deadlock, per-source timeout) to cover the normal code paths that a reader of these classes relies on. - ApplianceValueIteratorExtractDataTest (new, 13 tests): extractData() across all supported payload types — scalar numerics, enum, string, waveform — plus alarm severity and display header extraction. - ApplianceArchiveReaderTest (+6 tests): getOptimizedValues() routing (points≤count→raw, statistics, mean, old-appliance fallback on fetch failure), iterator map registration, and close()→cancel() delegation. - ApplianceStatisticsValueIteratorTest (+2 tests): VStatistics assembly from five sub-iterators, and null-safe next() after close(). - ArchiveFetchJobTest (+7 tests): UnknownChannelException→channelNotFound routing (full and partial), two healthy sources completing together, IOException→archiveFetchFailed, cancel-before-start, and RAW vs OPTIMIZED request type dispatch. Co-Authored-By: Claude Sonnet 4.6 --- .../archive/ArchiveFetchJobTest.java | 194 ++++++++++++++ .../appliance/ApplianceArchiveReaderTest.java | 68 +++++ .../ApplianceStatisticsValueIteratorTest.java | 54 ++++ ...ApplianceValueIteratorExtractDataTest.java | 243 ++++++++++++++++++ 4 files changed, 559 insertions(+) create mode 100644 app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceValueIteratorExtractDataTest.java diff --git a/app/databrowser/src/test/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJobTest.java b/app/databrowser/src/test/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJobTest.java index dd29cbc05c..fcbc948216 100644 --- a/app/databrowser/src/test/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJobTest.java +++ b/app/databrowser/src/test/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJobTest.java @@ -2,6 +2,7 @@ import org.csstudio.trends.databrowser3.model.ArchiveDataSource; import org.csstudio.trends.databrowser3.model.PVItem; +import org.csstudio.trends.databrowser3.model.RequestType; import org.csstudio.trends.databrowser3.preferences.Preferences; import org.epics.vtype.Alarm; import org.epics.vtype.Display; @@ -11,8 +12,10 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.phoebus.archive.reader.ArchiveReader; +import org.phoebus.archive.reader.UnknownChannelException; import org.phoebus.archive.reader.ValueIterator; +import java.io.IOException; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; @@ -151,6 +154,197 @@ void healthySourceCompletesWithinTimeout() throws Exception { assertNotNull(completedJob.get(), "fetchCompleted should have been called"); } + @Test + @Timeout(5) + void unknownChannelExceptionRoutesToChannelNotFound() throws Exception { + PVItem item = new PVItem("TESTPV", 0.0); + item.addArchiveDataSource(new ArchiveDataSource("src://unknown", "Unknown")); + + List errors = new ArrayList<>(); + AtomicReference foundRef = new AtomicReference<>(); + ArchiveFetchJobListener listener = new ArchiveFetchJobListener() { + @Override public void fetchCompleted(ArchiveFetchJob job) {} + @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) { + errors.add(archive.getName()); + } + @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) { + foundRef.set(found); + } + }; + + ArchiveReader unknownReader = mock(ArchiveReader.class); + when(unknownReader.getRawValues(any(), any(), any())).thenThrow(new UnknownChannelException("TESTPV")); + when(unknownReader.getOptimizedValues(any(), any(), any(), anyInt())).thenThrow(new UnknownChannelException("TESTPV")); + + TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener); + job.whenUrl("src://unknown", unknownReader); + + job.new WorkerThread().run(); + + assertNotNull(foundRef.get(), "channelNotFound should have been called"); + assertFalse(foundRef.get(), "found should be false when no source has the channel"); + assertTrue(errors.isEmpty(), "archiveFetchFailed must not be called for UnknownChannelException"); + } + + @Test + @Timeout(5) + void partialUnknownChannelReportsFoundTrue() throws Exception { + PVItem item = new PVItem("TESTPV", 0.0); + item.addArchiveDataSource(new ArchiveDataSource("src://unknown", "Unknown")); + item.addArchiveDataSource(new ArchiveDataSource("src://ok", "Ok")); + + AtomicReference foundRef = new AtomicReference<>(); + AtomicReference> failedRef = new AtomicReference<>(); + ArchiveFetchJobListener listener = new ArchiveFetchJobListener() { + @Override public void fetchCompleted(ArchiveFetchJob job) {} + @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) {} + @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) { + foundRef.set(found); + failedRef.set(failed); + } + }; + + ArchiveReader unknownReader = mock(ArchiveReader.class); + when(unknownReader.getRawValues(any(), any(), any())).thenThrow(new UnknownChannelException("TESTPV")); + when(unknownReader.getOptimizedValues(any(), any(), any(), anyInt())).thenThrow(new UnknownChannelException("TESTPV")); + + TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener); + job.whenUrl("src://unknown", unknownReader); + job.whenUrl("src://ok", readerReturning(oneValueIterator())); + + job.new WorkerThread().run(); + + assertNotNull(foundRef.get(), "channelNotFound should have been called"); + assertTrue(foundRef.get(), "found=true when channel exists in at least one source"); + assertEquals(1, failedRef.get().size()); + assertEquals("Unknown", failedRef.get().get(0).getName()); + } + + @Test + @Timeout(5) + void multipleSourcesAllSamplesComplete() throws Exception { + PVItem item = new PVItem("TESTPV", 0.0); + item.addArchiveDataSource(new ArchiveDataSource("src://a", "SourceA")); + item.addArchiveDataSource(new ArchiveDataSource("src://b", "SourceB")); + + List errors = new ArrayList<>(); + AtomicReference completedJob = new AtomicReference<>(); + ArchiveFetchJobListener listener = new ArchiveFetchJobListener() { + @Override public void fetchCompleted(ArchiveFetchJob job) { completedJob.set(job); } + @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) { + errors.add(error.getMessage()); + } + @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) {} + }; + + TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener); + job.whenUrl("src://a", readerReturning(oneValueIterator())); + job.whenUrl("src://b", readerReturning(oneValueIterator())); + + job.new WorkerThread().run(); + + assertTrue(errors.isEmpty(), "no errors expected for two healthy sources"); + assertNotNull(completedJob.get(), "fetchCompleted should be called after both sources succeed"); + } + + @Test + @Timeout(5) + void fetchExceptionCallsArchiveFetchFailed() throws Exception { + PVItem item = new PVItem("TESTPV", 0.0); + item.addArchiveDataSource(new ArchiveDataSource("src://broken", "Broken")); + item.addArchiveDataSource(new ArchiveDataSource("src://ok", "Ok")); + + List errors = Collections.synchronizedList(new ArrayList<>()); + AtomicReference completedJob = new AtomicReference<>(); + ArchiveFetchJobListener listener = new ArchiveFetchJobListener() { + @Override public void fetchCompleted(ArchiveFetchJob job) { completedJob.set(job); } + @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) { + errors.add(archive.getName()); + } + @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) {} + }; + + ArchiveReader brokenReader = mock(ArchiveReader.class); + when(brokenReader.getRawValues(any(), any(), any())).thenThrow(new IOException("network error")); + when(brokenReader.getOptimizedValues(any(), any(), any(), anyInt())).thenThrow(new IOException("network error")); + + TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener); + job.whenUrl("src://broken", brokenReader); + job.whenUrl("src://ok", readerReturning(oneValueIterator())); + + job.new WorkerThread().run(); + + assertEquals(List.of("Broken"), errors, "broken source should call archiveFetchFailed"); + assertNotNull(completedJob.get(), "fetch should complete after the error is reported"); + } + + @Test + @Timeout(5) + void cancelBeforeFirstFetchSkipsAll() throws Exception { + PVItem item = new PVItem("TESTPV", 0.0); + item.addArchiveDataSource(new ArchiveDataSource("src://any", "Any")); + + List completed = new ArrayList<>(); + ArchiveFetchJobListener listener = new ArchiveFetchJobListener() { + @Override public void fetchCompleted(ArchiveFetchJob job) { completed.add("done"); } + @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) {} + @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) {} + }; + + TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener); + ArchiveFetchJob.WorkerThread worker = job.new WorkerThread(); + worker.cancel(); // set cancelled=true before run() is called + worker.run(); + + assertTrue(completed.isEmpty(), "fetchCompleted must not be called when cancelled before start"); + } + + @Test + @Timeout(5) + void rawRequestTypeCallsGetRawValues() throws Exception { + PVItem item = new PVItem("TESTPV", 0.0); + item.setRequestType(RequestType.RAW); + item.addArchiveDataSource(new ArchiveDataSource("src://any", "Any")); + + ArchiveFetchJobListener listener = new ArchiveFetchJobListener() { + @Override public void fetchCompleted(ArchiveFetchJob job) {} + @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) {} + @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) {} + }; + + ArchiveReader mockReader = readerReturning(oneValueIterator()); + TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener); + job.whenUrl("src://any", mockReader); + + job.new WorkerThread().run(); + + verify(mockReader).getRawValues(any(), any(), any()); + verify(mockReader, never()).getOptimizedValues(any(), any(), any(), anyInt()); + } + + @Test + @Timeout(5) + void optimizedRequestTypeCallsGetOptimizedValues() throws Exception { + PVItem item = new PVItem("TESTPV", 0.0); + item.setRequestType(RequestType.OPTIMIZED); + item.addArchiveDataSource(new ArchiveDataSource("src://any", "Any")); + + ArchiveFetchJobListener listener = new ArchiveFetchJobListener() { + @Override public void fetchCompleted(ArchiveFetchJob job) {} + @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) {} + @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) {} + }; + + ArchiveReader mockReader = readerReturning(oneValueIterator()); + TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener); + job.whenUrl("src://any", mockReader); + + job.new WorkerThread().run(); + + verify(mockReader, never()).getRawValues(any(), any(), any()); + verify(mockReader).getOptimizedValues(any(), any(), any(), anyInt()); + } + @Test @Timeout(5) void cancelInterruptsPendingFetch() throws Exception { diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReaderTest.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReaderTest.java index f4adfe7f51..8ebeda09b7 100644 --- a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReaderTest.java +++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReaderTest.java @@ -114,4 +114,72 @@ void getNumberOfPointsUsesNcountOperator() throws Exception { assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("ncount(")), "Expected an ncount( call, got: " + dr.pvsCalled); } + + @Test + void getRawValuesRegistersIteratorInMap() throws Exception { + FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream()); + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr); + ApplianceValueIterator iter = (ApplianceValueIterator) reader.getRawValues("TEST:PV", START, END); + assertTrue(reader.iterators.containsKey(iter), "iterator should be registered in the map after getRawValues"); + } + + @Test + void closeCallsCancel() throws Exception { + FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream()); + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr); + ApplianceValueIterator iter = (ApplianceValueIterator) reader.getRawValues("TEST:PV", START, END); + assertFalse(iter.closed); + reader.close(); + assertTrue(iter.closed, "close() should close all active iterators via cancel()"); + } + + @Test + void pointsAtOrBelowCountUsesRawIterator() throws Exception { + // ncount returns 50, requested count is 100 → points <= count → ApplianceRawValueIterator + FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream()); + dr.whenPvContains("ncount(", countStream(50)); + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr, false, false); + Object iter = reader.getOptimizedValues("TEST:PV", START, END, 100); + assertInstanceOf(ApplianceRawValueIterator.class, iter); + } + + @Test + void numericScalarWithUseStatisticsTrueUsesStatisticsIterator() throws Exception { + // ncount returns 200 > 100 and PV type is SCALAR_DOUBLE → ApplianceStatisticsValueIterator + FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_DOUBLE)); + dr.whenPvContains("ncount(", countStream(200)); + dr.whenPvContains("mean_", emptyStream()); + dr.whenPvContains("std_", emptyStream()); + dr.whenPvContains("min_", emptyStream()); + dr.whenPvContains("max_", emptyStream()); + dr.whenPvContains("count_", emptyStream()); + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr, true, false); + Object iter = reader.getOptimizedValues("TEST:PV", START, END, 100); + assertInstanceOf(ApplianceStatisticsValueIterator.class, iter); + } + + @Test + void numericScalarWithUseStatisticsFalseUsesMeanIterator() throws Exception { + // ncount returns 200 > 100, useStatistics=false → ApplianceMeanValueIterator (not statistics subclass) + FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_DOUBLE)); + dr.whenPvContains("ncount(", countStream(200)); + dr.whenPvContains("mean_", emptyStream()); + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr, false, false); + Object iter = reader.getOptimizedValues("TEST:PV", START, END, 100); + assertEquals(ApplianceMeanValueIterator.class, iter.getClass(), + "expected exact ApplianceMeanValueIterator, not a statistics subclass"); + } + + @Test + void oldApplianceFallbackOnFetchFailure() throws Exception { + // Statistics fetch fails mid-construction (std_ stream null → ArchiverApplianceException) + // → outer catch falls back to ApplianceRawValueIterator + FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream()); + dr.whenPvContains("ncount(", countStream(200)); + dr.whenPvContains("mean_", emptyStream()); + dr.whenPvContains("std_", null); // null triggers ArchiverApplianceException in statistics iterator + FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr, true, false); + Object iter = reader.getOptimizedValues("TEST:PV", START, END, 100); + assertInstanceOf(ApplianceRawValueIterator.class, iter); + } } diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIteratorTest.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIteratorTest.java index 1183f20273..011e908d86 100644 --- a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIteratorTest.java +++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIteratorTest.java @@ -4,8 +4,11 @@ import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadType; import org.epics.archiverappliance.retrieval.client.EpicsMessage; import org.epics.archiverappliance.retrieval.client.GenMsgIterator; +import org.epics.vtype.VStatistics; +import org.epics.vtype.VType; import org.junit.jupiter.api.Test; +import java.sql.Timestamp; import java.time.Instant; import java.util.Collections; @@ -81,4 +84,55 @@ void closeClosesAllFiveStreams() throws Exception { verify(maxStream).close(); verify(countStream).close(); } + + // ---- next() assembly and lifecycle ---- + + /** One-message stream returning the given numeric value; mean stream gets SCALAR_DOUBLE PayloadInfo. */ + private static GenMsgIterator valueStream(double value, boolean withPayloadInfo) { + EpicsMessage msg = mock(EpicsMessage.class); + when(msg.getNumberValue()).thenReturn(value); + when(msg.getSeverity()).thenReturn(0); + when(msg.getStatus()).thenReturn(0); + when(msg.getTimestamp()).thenReturn(new Timestamp(System.currentTimeMillis())); + GenMsgIterator s = mock(GenMsgIterator.class); + when(s.iterator()).thenReturn(Collections.singletonList(msg).iterator()); + if (withPayloadInfo) + when(s.getPayLoadInfo()).thenReturn( + PayloadInfo.newBuilder().setType(PayloadType.SCALAR_DOUBLE).buildPartial()); + return s; + } + + @Test + void nextAssemblesVStatisticsFromFiveIterators() throws Exception { + GenMsgIterator meanStream = valueStream(2.0, true); + GenMsgIterator stdStream = valueStream(0.5, false); + GenMsgIterator minStream = valueStream(1.0, false); + GenMsgIterator maxStream = valueStream(3.0, false); + GenMsgIterator countStream = valueStream(10.0, false); + + FakeDataRetrieval dr = setupDr(meanStream, stdStream, minStream, maxStream, countStream); + ApplianceStatisticsValueIterator iter = makeIterator(dr); + + assertTrue(iter.hasNext()); + VType result = iter.next(); + + assertInstanceOf(VStatistics.class, result); + VStatistics vs = (VStatistics) result; + assertEquals(2.0, vs.getAverage(), 0.001); + assertEquals(0.5, vs.getStdDev(), 0.001); + assertEquals(1.0, vs.getMin(), 0.001); + assertEquals(3.0, vs.getMax(), 0.001); + assertEquals(10, (int) vs.getNSamples()); + } + + @Test + void nextReturnsNullWhenClosed() throws Exception { + GenMsgIterator meanStream = valueStream(1.0, true); + FakeDataRetrieval dr = setupDr(meanStream, emptyStream(), emptyStream(), emptyStream(), emptyStream()); + ApplianceStatisticsValueIterator iter = makeIterator(dr); + + iter.close(); + + assertNull(iter.next(), "next() after close() should return null without throwing"); + } } diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceValueIteratorExtractDataTest.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceValueIteratorExtractDataTest.java new file mode 100644 index 0000000000..ba95d438e8 --- /dev/null +++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceValueIteratorExtractDataTest.java @@ -0,0 +1,243 @@ +package org.phoebus.archive.reader.appliance; + +import com.google.protobuf.ByteString; +import edu.stanford.slac.archiverappliance.PB.EPICSEvent; +import edu.stanford.slac.archiverappliance.PB.EPICSEvent.FieldValue; +import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadInfo; +import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadType; +import org.epics.archiverappliance.retrieval.client.EpicsMessage; +import org.epics.archiverappliance.retrieval.client.GenMsgIterator; +import org.epics.vtype.AlarmSeverity; +import org.epics.vtype.VByteArray; +import org.epics.vtype.VDoubleArray; +import org.epics.vtype.VEnum; +import org.epics.vtype.VIntArray; +import org.epics.vtype.VNumber; +import org.epics.vtype.VString; +import org.epics.vtype.VType; +import org.junit.jupiter.api.Test; + +import java.sql.Timestamp; +import java.time.Instant; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +/** + * Functional tests for ApplianceValueIterator.extractData() across all supported payload types. + * + * Uses a minimal concrete subclass that exposes the protected extractData() method directly, + * avoiding the need for real archive network connections. + */ +class ApplianceValueIteratorExtractDataTest { + + // ---- minimal test subclass ---- + + private static class DataExtractIterator extends ApplianceValueIterator { + DataExtractIterator(ApplianceArchiveReader reader, GenMsgIterator stream) { + super(reader, "TEST:PV", Instant.EPOCH, Instant.EPOCH); + this.mainStream = stream; + this.mainIterator = stream.iterator(); + } + + VType extract(EpicsMessage msg) { + return extractData(msg); + } + } + + // ---- helpers ---- + + private static GenMsgIterator streamOfType(PayloadType type, FieldValue... headers) { + GenMsgIterator s = mock(GenMsgIterator.class); + when(s.iterator()).thenReturn(Collections.emptyIterator()); + PayloadInfo.Builder b = PayloadInfo.newBuilder().setType(type); + for (FieldValue h : headers) b.addHeaders(h); + when(s.getPayLoadInfo()).thenReturn(b.buildPartial()); + return s; + } + + private static DataExtractIterator iteratorWithStream(GenMsgIterator stream) { + FakeDataRetrieval dr = new FakeDataRetrieval(stream); + return new DataExtractIterator(new FakeApplianceArchiveReader(dr), stream); + } + + private static DataExtractIterator iteratorForType(PayloadType type, FieldValue... headers) { + return iteratorWithStream(streamOfType(type, headers)); + } + + /** Mock EpicsMessage returning the given numeric value with no alarm and a current timestamp. */ + private static EpicsMessage numericMsg(Number value) { + EpicsMessage msg = mock(EpicsMessage.class); + when(msg.getNumberValue()).thenReturn(value); + when(msg.getSeverity()).thenReturn(0); + when(msg.getStatus()).thenReturn(0); + when(msg.getTimestamp()).thenReturn(new Timestamp(System.currentTimeMillis())); + return msg; + } + + /** EpicsMessage backed by a real protobuf message object (needed for getMessage().getField()). */ + private static EpicsMessage pbMsg(com.google.protobuf.Message pb) { + EpicsMessage msg = mock(EpicsMessage.class); + when(msg.getMessage()).thenReturn(pb); + when(msg.getSeverity()).thenReturn(0); + when(msg.getStatus()).thenReturn(0); + when(msg.getTimestamp()).thenReturn(new Timestamp(System.currentTimeMillis())); + return msg; + } + + // ---- scalar numeric types ---- + + @Test + void scalarDoubleYieldsVNumber() { + DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_DOUBLE); + VType result = iter.extract(numericMsg(42.5)); + assertInstanceOf(VNumber.class, result); + assertEquals(42.5, ((VNumber) result).getValue().doubleValue(), 0.001); + } + + @Test + void scalarIntYieldsVNumber() { + DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_INT); + VType result = iter.extract(numericMsg(7)); + assertInstanceOf(VNumber.class, result); + assertEquals(7, ((VNumber) result).getValue().intValue()); + } + + @Test + void scalarByteYieldsVNumber() { + DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_BYTE); + VType result = iter.extract(numericMsg(3)); + assertInstanceOf(VNumber.class, result); + } + + @Test + void scalarShortYieldsVNumber() { + DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_SHORT); + VType result = iter.extract(numericMsg(100)); + assertInstanceOf(VNumber.class, result); + } + + @Test + void scalarFloatYieldsVNumber() { + DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_FLOAT); + VType result = iter.extract(numericMsg(1.5f)); + assertInstanceOf(VNumber.class, result); + } + + // ---- enum ---- + + @Test + void scalarEnumYieldsVEnum() { + DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_ENUM, + FieldValue.newBuilder().setName("ENUM_0").setVal("Off").build(), + FieldValue.newBuilder().setName("ENUM_1").setVal("On").build()); + VType result = iter.extract(numericMsg(1)); + assertInstanceOf(VEnum.class, result); + assertEquals(1, ((VEnum) result).getIndex()); + } + + // ---- alarm ---- + + @Test + void alarmSeverityPreserved() { + DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_DOUBLE); + EpicsMessage msg = mock(EpicsMessage.class); + when(msg.getNumberValue()).thenReturn(0.0); + when(msg.getSeverity()).thenReturn(2); // 2 = MAJOR + when(msg.getStatus()).thenReturn(0); + when(msg.getTimestamp()).thenReturn(new Timestamp(System.currentTimeMillis())); + VType result = iter.extract(msg); + assertInstanceOf(VNumber.class, result); + assertEquals(AlarmSeverity.MAJOR, ((VNumber) result).getAlarm().getSeverity()); + } + + // ---- display metadata ---- + + @Test + void displayHeadersExtracted() { + DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_DOUBLE, + FieldValue.newBuilder().setName("EGU").setVal("mA").build(), + FieldValue.newBuilder().setName("LOPR").setVal("0.0").build(), + FieldValue.newBuilder().setName("HOPR").setVal("100.0").build()); + VType result = iter.extract(numericMsg(50.0)); + assertInstanceOf(VNumber.class, result); + VNumber vn = (VNumber) result; + assertEquals("mA", vn.getDisplay().getUnit()); + assertEquals(0.0, vn.getDisplay().getDisplayRange().getMinimum(), 0.001); + assertEquals(100.0, vn.getDisplay().getDisplayRange().getMaximum(), 0.001); + } + + // ---- enum label ordering ---- + + @Test + void enumLabelsExtractedAndSorted() { + // Labels supplied out-of-order — extractData must return them sorted by index number + DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_ENUM, + FieldValue.newBuilder().setName("ENUM_2").setVal("Error").build(), + FieldValue.newBuilder().setName("ENUM_0").setVal("Off").build(), + FieldValue.newBuilder().setName("ENUM_1").setVal("On").build()); + VType result = iter.extract(numericMsg(0)); + assertInstanceOf(VEnum.class, result); + assertEquals("Off", ((VEnum) result).getDisplay().getChoices().get(0)); + assertEquals("On", ((VEnum) result).getDisplay().getChoices().get(1)); + assertEquals("Error", ((VEnum) result).getDisplay().getChoices().get(2)); + } + + // ---- string and waveform types via real protobuf messages ---- + + @Test + void scalarStringYieldsVString() { + EPICSEvent.ScalarString pb = EPICSEvent.ScalarString.newBuilder() + .setSecondsintoyear(0).setNano(0).setVal("test-value").build(); + DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_STRING); + VType result = iter.extract(pbMsg(pb)); + assertInstanceOf(VString.class, result); + assertEquals("test-value", ((VString) result).getValue()); + } + + @Test + void waveformDoubleYieldsVDoubleArray() { + EPICSEvent.VectorDouble pb = EPICSEvent.VectorDouble.newBuilder() + .setSecondsintoyear(0).setNano(0) + .addVal(1.0).addVal(2.0).addVal(3.0).build(); + DataExtractIterator iter = iteratorForType(PayloadType.WAVEFORM_DOUBLE); + VType result = iter.extract(pbMsg(pb)); + assertInstanceOf(VDoubleArray.class, result); + VDoubleArray vda = (VDoubleArray) result; + assertEquals(3, vda.getData().size()); + assertEquals(1.0, vda.getData().getDouble(0), 0.001); + assertEquals(2.0, vda.getData().getDouble(1), 0.001); + assertEquals(3.0, vda.getData().getDouble(2), 0.001); + } + + @Test + void waveformIntYieldsVIntArray() { + EPICSEvent.VectorInt pb = EPICSEvent.VectorInt.newBuilder() + .setSecondsintoyear(0).setNano(0) + .addVal(10).addVal(20).addVal(30).build(); + DataExtractIterator iter = iteratorForType(PayloadType.WAVEFORM_INT); + VType result = iter.extract(pbMsg(pb)); + assertInstanceOf(VIntArray.class, result); + VIntArray via = (VIntArray) result; + assertEquals(3, via.getData().size()); + assertEquals(10, via.getData().getInt(0)); + assertEquals(20, via.getData().getInt(1)); + assertEquals(30, via.getData().getInt(2)); + } + + @Test + void waveformByteYieldsVByteArray() { + EPICSEvent.VectorChar pb = EPICSEvent.VectorChar.newBuilder() + .setSecondsintoyear(0).setNano(0) + .setVal(ByteString.copyFrom(new byte[]{1, 2, 3})).build(); + DataExtractIterator iter = iteratorForType(PayloadType.WAVEFORM_BYTE); + VType result = iter.extract(pbMsg(pb)); + assertInstanceOf(VByteArray.class, result); + VByteArray vba = (VByteArray) result; + assertEquals(3, vba.getData().size()); + assertEquals((byte) 1, vba.getData().getByte(0)); + assertEquals((byte) 2, vba.getData().getByte(1)); + assertEquals((byte) 3, vba.getData().getByte(2)); + } +} From 5c822206cd4811df76222e7d7dc0758257fe90ef Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Tue, 26 May 2026 16:43:28 +0200 Subject: [PATCH 09/10] Improve error logging for archive fetch failures and fallback paths - ApplianceValueIterator.close(): log IOException at WARNING instead of wrapping it in IllegalStateException; move closed=true into finally so it is always set even when the stream close throws - ApplianceStatisticsValueIterator.closeStream(): add package-level logger and emit WARNING instead of silently swallowing the IOException - ApplianceArchiveReader.getOptimizedValues(): log FINE for expected type fallbacks and WARNING when the optimized or count-based path fails - ArchiveFetchJob.WorkerThread: log WARNING for ExecutionException and generic Exception fetch failures, not just TimeoutException Co-Authored-By: Claude Sonnet 4.6 --- .../databrowser3/archive/ArchiveFetchJob.java | 14 +++++++++++--- .../reader/appliance/ApplianceArchiveReader.java | 6 +++++- .../ApplianceStatisticsValueIterator.java | 7 ++++++- .../reader/appliance/ApplianceValueIterator.java | 6 ++++++ 4 files changed, 28 insertions(+), 5 deletions(-) diff --git a/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java b/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java index 4a820a6631..3dff97617f 100644 --- a/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java +++ b/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java @@ -174,12 +174,20 @@ public void run() final Throwable cause = ex.getCause(); if (cause instanceof UnknownChannelException) archives_without_channel.add(archive); - else if (! cancelled) - listener.archiveFetchFailed(ArchiveFetchJob.this, archive, - cause instanceof Exception ? (Exception) cause : ex); + else + { + final Throwable logged = cause != null ? cause : ex; + logger.log(Level.WARNING, logged, + () -> "Archive fetch failed for source: " + archive.getName()); + if (! cancelled) + listener.archiveFetchFailed(ArchiveFetchJob.this, archive, + cause instanceof Exception ? (Exception) cause : ex); + } } catch (Exception ex) { + logger.log(Level.WARNING, ex, + () -> "Archive fetch error for source: " + archive.getName()); if (! cancelled) listener.archiveFetchFailed(ArchiveFetchJob.this, archive, ex); } diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReader.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReader.java index dd09e93e2a..2c90dbec63 100644 --- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReader.java +++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReader.java @@ -131,10 +131,12 @@ public ValueIterator getOptimizedValues(String name, Instant start, Instant end, try { it = new ApplianceOptimizedValueIterator(this, name, start, end, count, useStatistics); } catch (ArchiverApplianceInvalidTypeException e) { - //binning not supported + //binning not supported for this PV type — expected for enum/waveform + logger.log(Level.FINE, "Optimized operator not supported for PV type, falling back: " + name, e); binningSupported = false; } catch (ArchiverApplianceException e) { //optimized operator not supported on the server, fall back to the old way + logger.log(Level.WARNING, "Optimized operator failed for " + name + ", falling back to count-based path", e); } } @@ -155,6 +157,7 @@ public ValueIterator getOptimizedValues(String name, Instant start, Instant end, it = new ApplianceMeanValueIterator(this, name, start, end, count); } } catch (ArchiverApplianceInvalidTypeException e) { + logger.log(Level.FINE, "Binning not supported for PV type, using non-numeric path: " + name, e); binningSupported = false; } } @@ -166,6 +169,7 @@ public ValueIterator getOptimizedValues(String name, Instant start, Instant end, } } catch (ArchiverApplianceException e) { //fallback for older archiver appliance, which didn't have the nth operator + logger.log(Level.WARNING, "Count-based path failed for " + name + ", falling back to raw values", e); try { it = new ApplianceRawValueIterator(this, name, start, end); } catch (ArchiverApplianceException exc) { diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIterator.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIterator.java index edc7be19b3..dbed29295a 100644 --- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIterator.java +++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIterator.java @@ -3,6 +3,8 @@ import java.io.IOException; import java.time.Instant; import java.util.Iterator; +import java.util.logging.Level; +import java.util.logging.Logger; import org.epics.archiverappliance.retrieval.client.DataRetrieval; import org.epics.archiverappliance.retrieval.client.EpicsMessage; @@ -27,6 +29,9 @@ */ public class ApplianceStatisticsValueIterator extends ApplianceMeanValueIterator { + private static final Logger logger = + Logger.getLogger(ApplianceStatisticsValueIterator.class.getPackage().getName()); + private Iterator stdIterator; private GenMsgIterator stdStream; private Iterator minIterator; @@ -128,7 +133,7 @@ private static void closeStream(GenMsgIterator s) { try { s.close(); } catch (IOException e) { - // log and continue — super.close() must still run + logger.log(Level.WARNING, e, () -> "Failed to close statistics sub-stream"); } } } diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceValueIterator.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceValueIterator.java index 06aa8d8adb..b565ff272e 100644 --- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceValueIterator.java +++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceValueIterator.java @@ -3,6 +3,8 @@ import java.io.IOException; import java.time.Instant; import java.util.ArrayList; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.Iterator; import java.util.List; import java.util.HashMap; @@ -52,6 +54,9 @@ */ public abstract class ApplianceValueIterator implements ValueIterator { + private static final Logger logger = + Logger.getLogger(ApplianceValueIterator.class.getPackage().getName()); + protected EnumDisplay enumDisplay; protected Display display; protected GenMsgIterator mainStream; @@ -260,6 +265,7 @@ public void close() { closed = true; } } catch (IOException e) { + logger.log(Level.WARNING, e, () -> "Failed to close stream for PV: " + name); throw new IllegalStateException(e); } } From 4d19a4f09b7425a80c36995daafba46b31c29023 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Wed, 27 May 2026 12:27:23 +0200 Subject: [PATCH 10/10] Move per-source read timeout to outer polling loop Timeout is now checked in the existing run(JobMonitor) poll loop rather than a per-source virtual-thread watchdog, eliminating the extra thread per source. A stuck source triggers cancel() on the whole job. Co-Authored-By: Claude Sonnet 4.6 --- .../databrowser3/archive/ArchiveFetchJob.java | 108 +++++------------- .../archive/ArchiveFetchJobTest.java | 24 ++-- 2 files changed, 40 insertions(+), 92 deletions(-) diff --git a/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java b/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java index 3dff97617f..b74240cba8 100644 --- a/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java +++ b/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java @@ -9,18 +9,14 @@ import static org.csstudio.trends.databrowser3.Activator.logger; -import java.io.IOException; import java.text.MessageFormat; import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import org.csstudio.trends.databrowser3.Activator; @@ -74,7 +70,7 @@ public class ArchiveFetchJob implements JobRunnable * Instead of directly accessing the archive, ArchiveFetchJob launches * a WorkerThread for the actual archive access, so that the Job * can then poll the progress monitor for cancellation and if - * necessary interrupt the WorkerThread which might be 'stuck' + * necessary cancel the archive reader when it is 'stuck' * in a long running operation. */ class WorkerThread implements Runnable @@ -83,10 +79,7 @@ class WorkerThread implements Runnable private volatile boolean cancelled = false; /** Archive reader that's currently queried */ - private AtomicReference reader = new AtomicReference<>(); - - /** Future for the per-source fetch currently in progress */ - private final AtomicReference>> currentFetch = new AtomicReference<>(); + private volatile ArchiveReader reader; /** @return Message that somehow indicates progress */ public String getMessage() @@ -98,14 +91,9 @@ public String getMessage() public void cancel() { cancelled = true; - - final Future> pending = currentFetch.getAndSet(null); - if (pending != null) - pending.cancel(true); - - final ArchiveReader the_reader = reader.get(); - if (the_reader != null) - the_reader.cancel(); + final ArchiveReader r = reader; + if (r != null) + r.cancel(); } /** {@inheritDoc} */ @@ -137,64 +125,28 @@ public void run() message = MessageFormat.format(Messages.ArchiveFetchDetailFmt, archive.getName(), ++i, archives.size()); - final Future> fetch = Activator.thread_pool.submit( - () -> fetchFromSource(archive, bins_final)); - currentFetch.set(fetch); - if (cancelled) - { - fetch.cancel(true); - break; - } try { - final List result = (Preferences.archive_read_timeout_ms <= 0) - ? fetch.get() - : fetch.get(Preferences.archive_read_timeout_ms, TimeUnit.MILLISECONDS); + final List fetched = fetchFromSource(archive, bins_final); if (!cancelled) { - samples += result.size(); - item.mergeArchivedSamples(archive.getName(), result); + samples += fetched.size(); + item.mergeArchivedSamples(archive.getName(), fetched); } } - catch (TimeoutException ex) + catch (UnknownChannelException ex) { - fetch.cancel(true); - final ArchiveReader timed_out = reader.getAndSet(null); - if (timed_out != null) - timed_out.cancel(); - logger.log(Level.WARNING, - "Archive source timed out after " + Preferences.archive_read_timeout_ms - + " ms, skipping: " + archive.getName()); - if (! cancelled) - listener.archiveFetchFailed(ArchiveFetchJob.this, archive, - new IOException("Read timeout (" + Preferences.archive_read_timeout_ms + " ms)")); - } - catch (ExecutionException ex) - { - final Throwable cause = ex.getCause(); - if (cause instanceof UnknownChannelException) - archives_without_channel.add(archive); - else - { - final Throwable logged = cause != null ? cause : ex; - logger.log(Level.WARNING, logged, - () -> "Archive fetch failed for source: " + archive.getName()); - if (! cancelled) - listener.archiveFetchFailed(ArchiveFetchJob.this, archive, - cause instanceof Exception ? (Exception) cause : ex); - } + // Do not immediately notify about unknown channels. First search for the data in all archive + // sources and only report this kind of errors at the end + archives_without_channel.add(archive); } catch (Exception ex) { logger.log(Level.WARNING, ex, - () -> "Archive fetch error for source: " + archive.getName()); - if (! cancelled) + () -> "Archive fetch failed for source: " + archive.getName()); + if (!cancelled) listener.archiveFetchFailed(ArchiveFetchJob.this, archive, ex); } - finally - { - currentFetch.set(null); - } } final long end_time = System.currentTimeMillis(); logger.log(Level.FINE, @@ -213,7 +165,7 @@ public void run() } /** Fetch all samples from one archive source. - * Runs on a carrier thread inside a timed Future. + * Runs directly on WorkerThread, timed by the outer polling loop. * @return list of samples * @throws Exception on fetch error */ @@ -221,11 +173,7 @@ List fetchFromSource(final ArchiveDataSource archive, final int bins) thr { try (final ArchiveReader the_reader = openReader(archive.getUrl())) { - // If a timeout or cancel fired while openReader() was blocking, - // bail now so the try-with-resources closes the_reader cleanly. - if (Thread.currentThread().isInterrupted()) - throw new InterruptedException("Interrupted during open"); - reader.set(the_reader); + reader = the_reader; try ( final ValueIterator value_iter = (item.getRequestType() == RequestType.RAW) @@ -240,15 +188,9 @@ List fetchFromSource(final ArchiveDataSource archive, final int bins) thr } finally { - reader.set(null); + reader = null; } } - finally - { - // Clear any stale interrupt flag before the carrier thread - // returns to the shared pool, so subsequent tasks are not poisoned. - Thread.interrupted(); - } } @Override @@ -338,6 +280,8 @@ public void run(JobMonitor monitor) throws Exception final Future done = Activator.thread_pool.submit(worker); // Poll worker and progress monitor long start = System.currentTimeMillis(); + String lastSourceMessage = ""; + long sourceStartTime = System.currentTimeMillis(); while (!done.isDone()) { // Wait until worker is done, or time out to update info message try @@ -348,13 +292,19 @@ public void run(JobMonitor monitor) throws Exception { // Ignore } + final String currentMessage = worker.getMessage(); + if (!currentMessage.equals(lastSourceMessage)) + { + lastSourceMessage = currentMessage; + sourceStartTime = System.currentTimeMillis(); + } final long seconds = (System.currentTimeMillis() - start) / 1000; final String info = MessageFormat.format(Messages.ArchiveFetchProgressFmt, - worker.getMessage(), seconds); + currentMessage, seconds); monitor.updateTaskName(info); - // Try to cancel the worker in response to user's cancel request. - // Continues to cancel the worker until isDone() - if (monitor.isCanceled()) + if (monitor.isCanceled() + || (Preferences.archive_read_timeout_ms > 0 + && System.currentTimeMillis() - sourceStartTime > Preferences.archive_read_timeout_ms)) worker.cancel(); } } diff --git a/app/databrowser/src/test/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJobTest.java b/app/databrowser/src/test/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJobTest.java index fcbc948216..b0a078d5bb 100644 --- a/app/databrowser/src/test/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJobTest.java +++ b/app/databrowser/src/test/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJobTest.java @@ -76,10 +76,9 @@ private static ArchiveReader readerReturning(ValueIterator valueIterator) throws @Test @Timeout(5) - void faultySourceTimesOutAndLoopContinues() throws Exception { + void timeoutCancelsJob() throws Exception { PVItem item = new PVItem("TESTPV", 0.0); item.addArchiveDataSource(new ArchiveDataSource("src://slow", "Slow")); - item.addArchiveDataSource(new ArchiveDataSource("src://fast", "Fast")); List errors = Collections.synchronizedList(new ArrayList<>()); List completed = Collections.synchronizedList(new ArrayList<>()); @@ -102,25 +101,24 @@ void faultySourceTimesOutAndLoopContinues() throws Exception { }); ArchiveReader slowReader = readerReturning(blockingIter); - ValueIterator fastIter = oneValueIterator(); - ArchiveReader fastReader = readerReturning(fastIter); - - int savedTimeout = Preferences.archive_read_timeout_ms; - Preferences.archive_read_timeout_ms = 500; try { TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener); job.whenUrl("src://slow", slowReader); - job.whenUrl("src://fast", fastReader); ArchiveFetchJob.WorkerThread worker = job.new WorkerThread(); - worker.run(); + Thread t = new Thread(worker::run); + t.start(); - releaseReader.countDown(); // unblock the carrier thread so it can clean up + readerBlocking.await(); // wait until fetch is stuck on slow source + worker.cancel(); // simulate outer-loop timeout firing + releaseReader.countDown(); // unblock the reader so it can exit cleanly - assertEquals(List.of("Slow"), errors, "slow source should report timeout failure"); - assertEquals(List.of("done"), completed, "fast source result should complete the job"); + t.join(3000); + assertFalse(t.isAlive(), "WorkerThread should have exited after cancel"); + + assertTrue(errors.isEmpty(), "cancel does not report a per-source error"); + assertTrue(completed.isEmpty(), "fetchCompleted is not called when job is cancelled"); } finally { - Preferences.archive_read_timeout_ms = savedTimeout; releaseReader.countDown(); // safety: unblock in case test fails early } }