diff --git a/app/databrowser/pom.xml b/app/databrowser/pom.xml
index 5c062a7a7e..d9c7263e85 100644
--- a/app/databrowser/pom.xml
+++ b/app/databrowser/pom.xml
@@ -14,6 +14,12 @@
${junit.version}
test
+
+ org.mockito
+ mockito-core
+ ${mockito.version}
+ test
+
org.hamcrest
hamcrest-all
diff --git a/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java b/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java
index e73f062ac0..b74240cba8 100644
--- a/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java
+++ b/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java
@@ -17,7 +17,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import org.csstudio.trends.databrowser3.Activator;
@@ -71,7 +70,7 @@ public class ArchiveFetchJob implements JobRunnable
* Instead of directly accessing the archive, ArchiveFetchJob launches
* a WorkerThread for the actual archive access, so that the Job
* can then poll the progress monitor for cancellation and if
- * necessary interrupt the WorkerThread which might be 'stuck'
+ * necessary cancel the archive reader when it is 'stuck'
* in a long running operation.
*/
class WorkerThread implements Runnable
@@ -80,7 +79,7 @@ class WorkerThread implements Runnable
private volatile boolean cancelled = false;
/** Archive reader that's currently queried */
- private AtomicReference reader = new AtomicReference<>();
+ private volatile ArchiveReader reader;
/** @return Message that somehow indicates progress */
public String getMessage()
@@ -92,10 +91,9 @@ public String getMessage()
public void cancel()
{
cancelled = true;
-
- final ArchiveReader the_reader = reader.get();
- if (the_reader != null)
- the_reader.cancel();
+ final ArchiveReader r = reader;
+ if (r != null)
+ r.cancel();
}
/** {@inheritDoc} */
@@ -117,51 +115,37 @@ public void run()
final Collection archives = item.getArchiveDataSources();
final List archives_without_channel = new ArrayList<>();
+ final int bins_final = bins;
int i = 0;
for (ArchiveDataSource archive : archives)
{
if (cancelled)
break;
- final String url = archive.getUrl();
// Display "N/total", using '1' for the first sub-archive.
message = MessageFormat.format(Messages.ArchiveFetchDetailFmt,
archive.getName(), ++i, archives.size());
+
try
- (
- final ArchiveReader the_reader = ArchiveReaders.createReader(url);
- )
{
- reader.set(the_reader);
- try
- (
- final ValueIterator value_iter = (item.getRequestType() == RequestType.RAW)
- ? the_reader.getRawValues(item.getResolvedName(), start, end)
- : the_reader.getOptimizedValues(item.getResolvedName(), start, end, bins)
- )
+ final List fetched = fetchFromSource(archive, bins_final);
+ if (!cancelled)
{
- // Get samples into array
- final List result = new ArrayList<>();
- while (value_iter.hasNext())
- result.add(value_iter.next());
- samples += result.size();
- item.mergeArchivedSamples(archive.getName(), result);
- }
- catch (UnknownChannelException e)
- {
- // Do not immediately notify about unknown channels. First search for the data in all archive
- // sources and only report this kind of errors at the end
- archives_without_channel.add(archive);
- }
- finally
- {
- reader.set(null);
+ samples += fetched.size();
+ item.mergeArchivedSamples(archive.getName(), fetched);
}
}
+ catch (UnknownChannelException ex)
+ {
+ // Do not immediately notify about unknown channels. First search for the data in all archive
+ // sources and only report this kind of errors at the end
+ archives_without_channel.add(archive);
+ }
catch (Exception ex)
- { // Tell listener unless it's the result of a 'cancel'?
- if (! cancelled)
+ {
+ logger.log(Level.WARNING, ex,
+ () -> "Archive fetch failed for source: " + archive.getName());
+ if (!cancelled)
listener.archiveFetchFailed(ArchiveFetchJob.this, archive, ex);
- // Continue with the next data source
}
}
final long end_time = System.currentTimeMillis();
@@ -180,6 +164,35 @@ public void run()
listener.fetchCompleted(ArchiveFetchJob.this);
}
+ /** Fetch all samples from one archive source.
+ * Runs directly on WorkerThread, timed by the outer polling loop.
+ * @return list of samples
+ * @throws Exception on fetch error
+ */
+ List fetchFromSource(final ArchiveDataSource archive, final int bins) throws Exception
+ {
+ try (final ArchiveReader the_reader = openReader(archive.getUrl()))
+ {
+ reader = the_reader;
+ try
+ (
+ final ValueIterator value_iter = (item.getRequestType() == RequestType.RAW)
+ ? the_reader.getRawValues(item.getResolvedName(), start, end)
+ : the_reader.getOptimizedValues(item.getResolvedName(), start, end, bins)
+ )
+ {
+ final List result = new ArrayList<>();
+ while (value_iter.hasNext())
+ result.add(value_iter.next());
+ return result;
+ }
+ finally
+ {
+ reader = null;
+ }
+ }
+ }
+
@Override
public String toString()
{
@@ -205,6 +218,25 @@ public ArchiveFetchJob(final PVItem item, final Instant start,
this.job = JobManager.schedule(toString(), this);
}
+ /** Test-only constructor: does not schedule via JobManager. */
+ ArchiveFetchJob(final PVItem item, final Instant start, final Instant end,
+ final ArchiveFetchJobListener listener, final boolean testOnly)
+ {
+ this.item = item;
+ this.start = start;
+ this.end = end;
+ this.listener = listener;
+ this.job = null;
+ }
+
+ /** Create an {@link ArchiveReader} for the given URL.
+ * Override in tests to inject fakes.
+ */
+ protected ArchiveReader openReader(final String url) throws Exception
+ {
+ return ArchiveReaders.createReader(url);
+ }
+
/** @return PVItem for which this job was created */
public PVItem getPVItem()
{
@@ -248,6 +280,8 @@ public void run(JobMonitor monitor) throws Exception
final Future> done = Activator.thread_pool.submit(worker);
// Poll worker and progress monitor
long start = System.currentTimeMillis();
+ String lastSourceMessage = "";
+ long sourceStartTime = System.currentTimeMillis();
while (!done.isDone())
{ // Wait until worker is done, or time out to update info message
try
@@ -258,13 +292,19 @@ public void run(JobMonitor monitor) throws Exception
{
// Ignore
}
+ final String currentMessage = worker.getMessage();
+ if (!currentMessage.equals(lastSourceMessage))
+ {
+ lastSourceMessage = currentMessage;
+ sourceStartTime = System.currentTimeMillis();
+ }
final long seconds = (System.currentTimeMillis() - start) / 1000;
final String info = MessageFormat.format(Messages.ArchiveFetchProgressFmt,
- worker.getMessage(), seconds);
+ currentMessage, seconds);
monitor.updateTaskName(info);
- // Try to cancel the worker in response to user's cancel request.
- // Continues to cancel the worker until isDone()
- if (monitor.isCanceled())
+ if (monitor.isCanceled()
+ || (Preferences.archive_read_timeout_ms > 0
+ && System.currentTimeMillis() - sourceStartTime > Preferences.archive_read_timeout_ms))
worker.cancel();
}
}
diff --git a/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/preferences/Preferences.java b/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/preferences/Preferences.java
index 0354548f64..8bd1b7f571 100644
--- a/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/preferences/Preferences.java
+++ b/app/databrowser/src/main/java/org/csstudio/trends/databrowser3/preferences/Preferences.java
@@ -54,6 +54,8 @@ public static class TimePreset
/** Setting */
@Preference public static int archive_fetch_delay;
/** Setting */
+ @Preference public static int archive_read_timeout_ms;
+ /** Setting */
@Preference public static int concurrent_requests;
/** Setting */
@Preference public static ArchiveRescale archive_rescale;
diff --git a/app/databrowser/src/main/resources/databrowser_preferences.properties b/app/databrowser/src/main/resources/databrowser_preferences.properties
index 3a69e53ae7..d9c8ea5a81 100644
--- a/app/databrowser/src/main/resources/databrowser_preferences.properties
+++ b/app/databrowser/src/main/resources/databrowser_preferences.properties
@@ -49,6 +49,19 @@ trace_type=AREA
# while interactively zooming and panning
archive_fetch_delay=500
+# Per-source read timeout in milliseconds.
+#
+# Each archive source (appliance, channel archiver, RDB, ...) in the
+# WorkerThread fetch loop is given this much time to return data.
+# When the source does not respond within the timeout the fetch is
+# abandoned, a warning is logged, the failure is reported to the
+# listener, and the loop continues with the next source.
+#
+# A value of 0 disables the timeout (wait forever).
+# 30 s covers scalar and waveform PVs on a local/campus network.
+# Increase this in settings.ini for WAN deployments.
+archive_read_timeout_ms=30000
+
# Number of concurrent archive fetch requests.
#
# When more requests are necessary, the background jobs
diff --git a/app/databrowser/src/test/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJobTest.java b/app/databrowser/src/test/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJobTest.java
new file mode 100644
index 0000000000..b0a078d5bb
--- /dev/null
+++ b/app/databrowser/src/test/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJobTest.java
@@ -0,0 +1,391 @@
+package org.csstudio.trends.databrowser3.archive;
+
+import org.csstudio.trends.databrowser3.model.ArchiveDataSource;
+import org.csstudio.trends.databrowser3.model.PVItem;
+import org.csstudio.trends.databrowser3.model.RequestType;
+import org.csstudio.trends.databrowser3.preferences.Preferences;
+import org.epics.vtype.Alarm;
+import org.epics.vtype.Display;
+import org.epics.vtype.Time;
+import org.epics.vtype.VDouble;
+import org.epics.vtype.VType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.phoebus.archive.reader.ArchiveReader;
+import org.phoebus.archive.reader.UnknownChannelException;
+import org.phoebus.archive.reader.ValueIterator;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+class ArchiveFetchJobTest {
+
+ // --- test-only subclass -------------------------------------------------
+
+ /** Subclass that skips JobManager scheduling and injects fake readers. */
+ static class TestableFetchJob extends ArchiveFetchJob {
+ private final java.util.Map readers = new java.util.LinkedHashMap<>();
+
+ TestableFetchJob(PVItem item, Instant start, Instant end, ArchiveFetchJobListener listener) {
+ super(item, start, end, listener, true);
+ }
+
+ void whenUrl(String url, ArchiveReader reader) {
+ readers.put(url, reader);
+ }
+
+ @Override
+ protected ArchiveReader openReader(String url) throws Exception {
+ ArchiveReader r = readers.get(url);
+ if (r == null)
+ throw new Exception("No fake reader registered for URL: " + url);
+ return r;
+ }
+ }
+
+ // --- helpers ------------------------------------------------------------
+
+ private static VType makeValue() {
+ return VDouble.of(1.0, Alarm.none(), Time.now(), Display.none());
+ }
+
+ private static ValueIterator oneValueIterator() throws Exception {
+ VType v = makeValue();
+ ValueIterator it = mock(ValueIterator.class);
+ when(it.hasNext()).thenReturn(true, false);
+ when(it.next()).thenReturn(v);
+ return it;
+ }
+
+ private static ArchiveReader readerReturning(ValueIterator valueIterator) throws Exception {
+ ArchiveReader r = mock(ArchiveReader.class);
+ when(r.getRawValues(any(), any(), any())).thenReturn(valueIterator);
+ when(r.getOptimizedValues(any(), any(), any(), anyInt())).thenReturn(valueIterator);
+ return r;
+ }
+
+ // --- tests --------------------------------------------------------------
+
+ @Test
+ @Timeout(5)
+ void timeoutCancelsJob() throws Exception {
+ PVItem item = new PVItem("TESTPV", 0.0);
+ item.addArchiveDataSource(new ArchiveDataSource("src://slow", "Slow"));
+
+ List errors = Collections.synchronizedList(new ArrayList<>());
+ List completed = Collections.synchronizedList(new ArrayList<>());
+ ArchiveFetchJobListener listener = new ArchiveFetchJobListener() {
+ @Override public void fetchCompleted(ArchiveFetchJob job) { completed.add("done"); }
+ @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) {
+ errors.add(archive.getName());
+ }
+ @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) {}
+ };
+
+ // Slow reader: blocks until released (simulates unresponsive archiver)
+ CountDownLatch readerBlocking = new CountDownLatch(1);
+ CountDownLatch releaseReader = new CountDownLatch(1);
+ ValueIterator blockingIter = mock(ValueIterator.class);
+ when(blockingIter.hasNext()).thenAnswer(inv -> {
+ readerBlocking.countDown();
+ releaseReader.await();
+ return false;
+ });
+ ArchiveReader slowReader = readerReturning(blockingIter);
+
+ try {
+ TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener);
+ job.whenUrl("src://slow", slowReader);
+
+ ArchiveFetchJob.WorkerThread worker = job.new WorkerThread();
+ Thread t = new Thread(worker::run);
+ t.start();
+
+ readerBlocking.await(); // wait until fetch is stuck on slow source
+ worker.cancel(); // simulate outer-loop timeout firing
+ releaseReader.countDown(); // unblock the reader so it can exit cleanly
+
+ t.join(3000);
+ assertFalse(t.isAlive(), "WorkerThread should have exited after cancel");
+
+ assertTrue(errors.isEmpty(), "cancel does not report a per-source error");
+ assertTrue(completed.isEmpty(), "fetchCompleted is not called when job is cancelled");
+ } finally {
+ releaseReader.countDown(); // safety: unblock in case test fails early
+ }
+ }
+
+ @Test
+ @Timeout(5)
+ void healthySourceCompletesWithinTimeout() throws Exception {
+ PVItem item = new PVItem("TESTPV", 0.0);
+ item.addArchiveDataSource(new ArchiveDataSource("src://fast", "Fast"));
+
+ List errors = new ArrayList<>();
+ AtomicReference completedJob = new AtomicReference<>();
+ ArchiveFetchJobListener listener = new ArchiveFetchJobListener() {
+ @Override public void fetchCompleted(ArchiveFetchJob job) { completedJob.set(job); }
+ @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) {
+ errors.add(error.getMessage());
+ }
+ @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) {}
+ };
+
+ ValueIterator fastIter = oneValueIterator();
+ ArchiveReader fastReader = readerReturning(fastIter);
+
+ TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener);
+ job.whenUrl("src://fast", fastReader);
+
+ ArchiveFetchJob.WorkerThread worker = job.new WorkerThread();
+ worker.run();
+
+ assertTrue(errors.isEmpty(), "no errors expected for healthy source, got: " + errors);
+ assertNotNull(completedJob.get(), "fetchCompleted should have been called");
+ }
+
+ @Test
+ @Timeout(5)
+ void unknownChannelExceptionRoutesToChannelNotFound() throws Exception {
+ PVItem item = new PVItem("TESTPV", 0.0);
+ item.addArchiveDataSource(new ArchiveDataSource("src://unknown", "Unknown"));
+
+ List errors = new ArrayList<>();
+ AtomicReference foundRef = new AtomicReference<>();
+ ArchiveFetchJobListener listener = new ArchiveFetchJobListener() {
+ @Override public void fetchCompleted(ArchiveFetchJob job) {}
+ @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) {
+ errors.add(archive.getName());
+ }
+ @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) {
+ foundRef.set(found);
+ }
+ };
+
+ ArchiveReader unknownReader = mock(ArchiveReader.class);
+ when(unknownReader.getRawValues(any(), any(), any())).thenThrow(new UnknownChannelException("TESTPV"));
+ when(unknownReader.getOptimizedValues(any(), any(), any(), anyInt())).thenThrow(new UnknownChannelException("TESTPV"));
+
+ TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener);
+ job.whenUrl("src://unknown", unknownReader);
+
+ job.new WorkerThread().run();
+
+ assertNotNull(foundRef.get(), "channelNotFound should have been called");
+ assertFalse(foundRef.get(), "found should be false when no source has the channel");
+ assertTrue(errors.isEmpty(), "archiveFetchFailed must not be called for UnknownChannelException");
+ }
+
+ @Test
+ @Timeout(5)
+ void partialUnknownChannelReportsFoundTrue() throws Exception {
+ PVItem item = new PVItem("TESTPV", 0.0);
+ item.addArchiveDataSource(new ArchiveDataSource("src://unknown", "Unknown"));
+ item.addArchiveDataSource(new ArchiveDataSource("src://ok", "Ok"));
+
+ AtomicReference foundRef = new AtomicReference<>();
+ AtomicReference> failedRef = new AtomicReference<>();
+ ArchiveFetchJobListener listener = new ArchiveFetchJobListener() {
+ @Override public void fetchCompleted(ArchiveFetchJob job) {}
+ @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) {}
+ @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) {
+ foundRef.set(found);
+ failedRef.set(failed);
+ }
+ };
+
+ ArchiveReader unknownReader = mock(ArchiveReader.class);
+ when(unknownReader.getRawValues(any(), any(), any())).thenThrow(new UnknownChannelException("TESTPV"));
+ when(unknownReader.getOptimizedValues(any(), any(), any(), anyInt())).thenThrow(new UnknownChannelException("TESTPV"));
+
+ TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener);
+ job.whenUrl("src://unknown", unknownReader);
+ job.whenUrl("src://ok", readerReturning(oneValueIterator()));
+
+ job.new WorkerThread().run();
+
+ assertNotNull(foundRef.get(), "channelNotFound should have been called");
+ assertTrue(foundRef.get(), "found=true when channel exists in at least one source");
+ assertEquals(1, failedRef.get().size());
+ assertEquals("Unknown", failedRef.get().get(0).getName());
+ }
+
+ @Test
+ @Timeout(5)
+ void multipleSourcesAllSamplesComplete() throws Exception {
+ PVItem item = new PVItem("TESTPV", 0.0);
+ item.addArchiveDataSource(new ArchiveDataSource("src://a", "SourceA"));
+ item.addArchiveDataSource(new ArchiveDataSource("src://b", "SourceB"));
+
+ List errors = new ArrayList<>();
+ AtomicReference completedJob = new AtomicReference<>();
+ ArchiveFetchJobListener listener = new ArchiveFetchJobListener() {
+ @Override public void fetchCompleted(ArchiveFetchJob job) { completedJob.set(job); }
+ @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) {
+ errors.add(error.getMessage());
+ }
+ @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) {}
+ };
+
+ TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener);
+ job.whenUrl("src://a", readerReturning(oneValueIterator()));
+ job.whenUrl("src://b", readerReturning(oneValueIterator()));
+
+ job.new WorkerThread().run();
+
+ assertTrue(errors.isEmpty(), "no errors expected for two healthy sources");
+ assertNotNull(completedJob.get(), "fetchCompleted should be called after both sources succeed");
+ }
+
+ @Test
+ @Timeout(5)
+ void fetchExceptionCallsArchiveFetchFailed() throws Exception {
+ PVItem item = new PVItem("TESTPV", 0.0);
+ item.addArchiveDataSource(new ArchiveDataSource("src://broken", "Broken"));
+ item.addArchiveDataSource(new ArchiveDataSource("src://ok", "Ok"));
+
+ List errors = Collections.synchronizedList(new ArrayList<>());
+ AtomicReference completedJob = new AtomicReference<>();
+ ArchiveFetchJobListener listener = new ArchiveFetchJobListener() {
+ @Override public void fetchCompleted(ArchiveFetchJob job) { completedJob.set(job); }
+ @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) {
+ errors.add(archive.getName());
+ }
+ @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) {}
+ };
+
+ ArchiveReader brokenReader = mock(ArchiveReader.class);
+ when(brokenReader.getRawValues(any(), any(), any())).thenThrow(new IOException("network error"));
+ when(brokenReader.getOptimizedValues(any(), any(), any(), anyInt())).thenThrow(new IOException("network error"));
+
+ TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener);
+ job.whenUrl("src://broken", brokenReader);
+ job.whenUrl("src://ok", readerReturning(oneValueIterator()));
+
+ job.new WorkerThread().run();
+
+ assertEquals(List.of("Broken"), errors, "broken source should call archiveFetchFailed");
+ assertNotNull(completedJob.get(), "fetch should complete after the error is reported");
+ }
+
+ @Test
+ @Timeout(5)
+ void cancelBeforeFirstFetchSkipsAll() throws Exception {
+ PVItem item = new PVItem("TESTPV", 0.0);
+ item.addArchiveDataSource(new ArchiveDataSource("src://any", "Any"));
+
+ List completed = new ArrayList<>();
+ ArchiveFetchJobListener listener = new ArchiveFetchJobListener() {
+ @Override public void fetchCompleted(ArchiveFetchJob job) { completed.add("done"); }
+ @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) {}
+ @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) {}
+ };
+
+ TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener);
+ ArchiveFetchJob.WorkerThread worker = job.new WorkerThread();
+ worker.cancel(); // set cancelled=true before run() is called
+ worker.run();
+
+ assertTrue(completed.isEmpty(), "fetchCompleted must not be called when cancelled before start");
+ }
+
+ @Test
+ @Timeout(5)
+ void rawRequestTypeCallsGetRawValues() throws Exception {
+ PVItem item = new PVItem("TESTPV", 0.0);
+ item.setRequestType(RequestType.RAW);
+ item.addArchiveDataSource(new ArchiveDataSource("src://any", "Any"));
+
+ ArchiveFetchJobListener listener = new ArchiveFetchJobListener() {
+ @Override public void fetchCompleted(ArchiveFetchJob job) {}
+ @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) {}
+ @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) {}
+ };
+
+ ArchiveReader mockReader = readerReturning(oneValueIterator());
+ TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener);
+ job.whenUrl("src://any", mockReader);
+
+ job.new WorkerThread().run();
+
+ verify(mockReader).getRawValues(any(), any(), any());
+ verify(mockReader, never()).getOptimizedValues(any(), any(), any(), anyInt());
+ }
+
+ @Test
+ @Timeout(5)
+ void optimizedRequestTypeCallsGetOptimizedValues() throws Exception {
+ PVItem item = new PVItem("TESTPV", 0.0);
+ item.setRequestType(RequestType.OPTIMIZED);
+ item.addArchiveDataSource(new ArchiveDataSource("src://any", "Any"));
+
+ ArchiveFetchJobListener listener = new ArchiveFetchJobListener() {
+ @Override public void fetchCompleted(ArchiveFetchJob job) {}
+ @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) {}
+ @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) {}
+ };
+
+ ArchiveReader mockReader = readerReturning(oneValueIterator());
+ TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener);
+ job.whenUrl("src://any", mockReader);
+
+ job.new WorkerThread().run();
+
+ verify(mockReader, never()).getRawValues(any(), any(), any());
+ verify(mockReader).getOptimizedValues(any(), any(), any(), anyInt());
+ }
+
+ @Test
+ @Timeout(5)
+ void cancelInterruptsPendingFetch() throws Exception {
+ PVItem item = new PVItem("TESTPV", 0.0);
+ item.addArchiveDataSource(new ArchiveDataSource("src://slow", "Slow"));
+
+ ArchiveFetchJobListener listener = new ArchiveFetchJobListener() {
+ @Override public void fetchCompleted(ArchiveFetchJob job) {}
+ @Override public void archiveFetchFailed(ArchiveFetchJob job, ArchiveDataSource archive, Exception error) {}
+ @Override public void channelNotFound(ArchiveFetchJob job, boolean found, List failed) {}
+ };
+
+ CountDownLatch readerBlocking = new CountDownLatch(1);
+ CountDownLatch releaseReader = new CountDownLatch(1);
+ ValueIterator blockingIter = mock(ValueIterator.class);
+ when(blockingIter.hasNext()).thenAnswer(inv -> {
+ readerBlocking.countDown();
+ releaseReader.await();
+ return false;
+ });
+ ArchiveReader slowReader = readerReturning(blockingIter);
+
+ // Long timeout so cancellation (not timeout) drives the exit
+ int savedTimeout = Preferences.archive_read_timeout_ms;
+ Preferences.archive_read_timeout_ms = 30_000;
+ try {
+ TestableFetchJob job = new TestableFetchJob(item, Instant.now().minusSeconds(60), Instant.now(), listener);
+ job.whenUrl("src://slow", slowReader);
+
+ ArchiveFetchJob.WorkerThread worker = job.new WorkerThread();
+
+ Thread t = new Thread(worker::run);
+ t.start();
+
+ readerBlocking.await(); // wait until the fetch is blocked
+ worker.cancel(); // signal cancellation + cancel active reader
+ releaseReader.countDown(); // unblock the carrier thread
+
+ t.join(3000);
+ assertFalse(t.isAlive(), "WorkerThread should have exited after cancel()");
+ } finally {
+ Preferences.archive_read_timeout_ms = savedTimeout;
+ releaseReader.countDown();
+ }
+ }
+}
diff --git a/app/trends/archive-reader/pom.xml b/app/trends/archive-reader/pom.xml
index d84c31b5fd..ce599f391b 100644
--- a/app/trends/archive-reader/pom.xml
+++ b/app/trends/archive-reader/pom.xml
@@ -62,5 +62,11 @@
1.3
test
+
+ org.mockito
+ mockito-core
+ ${mockito.version}
+ test
+
diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReader.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReader.java
index 1887885028..2c90dbec63 100644
--- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReader.java
+++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReader.java
@@ -13,6 +13,8 @@
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.epics.archiverappliance.retrieval.client.DataRetrieval;
import org.epics.archiverappliance.retrieval.client.EpicsMessage;
@@ -29,14 +31,16 @@
*
* @author Miha Novak miha.novak@cosylab.com
*/
-public class ApplianceArchiveReader implements ArchiveReader, IteratorListener {
+public class ApplianceArchiveReader implements ArchiveReader {
+
+ private static final Logger logger = Logger.getLogger(ApplianceArchiveReader.class.getName());
private final String httpURL;
private final String pbrawURL;
private final boolean useStatistics;
private final boolean useNewOptimizedOperator;
- private Map iterators = Collections.synchronizedMap(
+ Map iterators = Collections.synchronizedMap(
new WeakHashMap());
/**
@@ -109,7 +113,7 @@ public ValueIterator getRawValues(String name, Instant start, Instant end)
throws UnknownChannelException, Exception {
try {
name = stripSchema(name);
- ApplianceRawValueIterator it = new ApplianceRawValueIterator(this, name, start, end, this);
+ ApplianceRawValueIterator it = new ApplianceRawValueIterator(this, name, start, end);
iterators.put(it,this);
return it;
} catch (ArchiverApplianceException ex) {
@@ -125,12 +129,14 @@ public ValueIterator getOptimizedValues(String name, Instant start, Instant end,
if (useNewOptimizedOperator) {
//try to fetch the data using the new optimized operator
try {
- it = new ApplianceOptimizedValueIterator(this, name, start, end, count, useStatistics, this);
+ it = new ApplianceOptimizedValueIterator(this, name, start, end, count, useStatistics);
} catch (ArchiverApplianceInvalidTypeException e) {
- //binning not supported
+ //binning not supported for this PV type — expected for enum/waveform
+ logger.log(Level.FINE, "Optimized operator not supported for PV type, falling back: " + name, e);
binningSupported = false;
} catch (ArchiverApplianceException e) {
//optimized operator not supported on the server, fall back to the old way
+ logger.log(Level.WARNING, "Optimized operator failed for " + name + ", falling back to count-based path", e);
}
}
@@ -139,31 +145,33 @@ public ValueIterator getOptimizedValues(String name, Instant start, Instant end,
try {
int points = getNumberOfPoints(name, start, end);
if (points <= count) {
- it = new ApplianceRawValueIterator(this, name, start, end, this);
+ it = new ApplianceRawValueIterator(this, name, start, end);
} else {
//only fetch if binning is "still" supported
if (binningSupported) {
try {
//try to bin the values using the mean and std etc. This will work for numeric scalar PVs
if (useStatistics) {
- it = new ApplianceStatisticsValueIterator(this, name, start, end, count,this);
+ it = new ApplianceStatisticsValueIterator(this, name, start, end, count);
} else {
- it = new ApplianceMeanValueIterator(this, name, start, end, count,this);
+ it = new ApplianceMeanValueIterator(this, name, start, end, count);
}
} catch (ArchiverApplianceInvalidTypeException e) {
+ logger.log(Level.FINE, "Binning not supported for PV type, using non-numeric path: " + name, e);
binningSupported = false;
}
}
if (!binningSupported) {
//if binning is not supported (string, waveform type), try nth operator
- it = new ApplianceNonNumericOptimizedValueIterator(this, name, start, end, count, points,this);
+ it = new ApplianceNonNumericOptimizedValueIterator(this, name, start, end, count, points);
}
}
} catch (ArchiverApplianceException e) {
//fallback for older archiver appliance, which didn't have the nth operator
+ logger.log(Level.WARNING, "Count-based path failed for " + name + ", falling back to raw values", e);
try {
- it = new ApplianceRawValueIterator(this, name, start, end, this);
+ it = new ApplianceRawValueIterator(this, name, start, end);
} catch (ArchiverApplianceException exc) {
throw new UnknownChannelException(name);
}
@@ -177,7 +185,11 @@ public ValueIterator getOptimizedValues(String name, Instant start, Instant end,
public void cancel() {
ApplianceValueIterator[] its = iterators.keySet().toArray(new ApplianceValueIterator[0]);
for (ApplianceValueIterator a : its) {
- a.close();
+ try {
+ a.close();
+ } catch (Exception ex) {
+ logger.log(Level.WARNING, "Failed to close iterator during cancel", ex);
+ }
}
}
@@ -315,11 +327,6 @@ private int getNumberOfPointsLegacy(String pvName, Instant start, Instant end) t
return 0;
}
- @Override
- public void finished(ApplianceValueIterator iterator) {
- iterators.remove(iterator);
- }
-
private static String stripSchema(String name) {
int idx = name.indexOf(ApplianceArchiveReaderConstants.SCHEMA_DELIMITER);
return idx > -1 ? name.substring(idx + ApplianceArchiveReaderConstants.SCHEMA_DELIMITER.length()) : name;
diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceMeanValueIterator.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceMeanValueIterator.java
index 4303995af9..c68ed1c0fa 100644
--- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceMeanValueIterator.java
+++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceMeanValueIterator.java
@@ -33,16 +33,14 @@ public class ApplianceMeanValueIterator extends ApplianceValueIterator {
* @param start start of the time period
* @param end end of the time period
* @param points the number of requested points
- * @param listener the listener that is notified when the iterator is closed
- *
* @throws IOException if there was an error during the data fetch process
* @throws ArchiverApplianceException if it is not possible to load optimized data for the selected PV
* @throws ArchiverApplianceInvalidTypeException if the type of data cannot be returned in optimized format
*/
public ApplianceMeanValueIterator(ApplianceArchiveReader reader,
- String name, Instant start, Instant end, int points, IteratorListener listener)
+ String name, Instant start, Instant end, int points)
throws ArchiverApplianceException, IOException {
- super(reader,name,start,end,listener);
+ super(reader, name, start, end);
this.requestedPoints = points;
this.display = determineDisplay(reader, name, end);
fetchData();
diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceNonNumericOptimizedValueIterator.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceNonNumericOptimizedValueIterator.java
index b3a7e99389..201c5bf915 100644
--- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceNonNumericOptimizedValueIterator.java
+++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceNonNumericOptimizedValueIterator.java
@@ -26,15 +26,14 @@ public class ApplianceNonNumericOptimizedValueIterator extends ApplianceValueIte
* @param end end of the time period
* @param requestedPoints number of requested points
* @param totalNumberOfPoints the number of all points in the archive
- * @param listener the listener that is notified when the iterator is closed
*
* @throws IOException if there was an error during the data fetch process
* @throws ArchiverApplianceException if it is not possible to load optimized data for the selected PV
*/
public ApplianceNonNumericOptimizedValueIterator(ApplianceArchiveReader reader,
- String name, Instant start, Instant end, int requestedPoints, int totalNumberOfPoints,
- IteratorListener listener) throws ArchiverApplianceException, IOException {
- super(reader,name,start,end,listener);
+ String name, Instant start, Instant end, int requestedPoints, int totalNumberOfPoints)
+ throws ArchiverApplianceException, IOException {
+ super(reader, name, start, end);
this.requestedPoints = requestedPoints;
this.totalNumberOfPoints = totalNumberOfPoints;
fetchData();
diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceOptimizedValueIterator.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceOptimizedValueIterator.java
index 91dbd5b750..cbb8b653a8 100644
--- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceOptimizedValueIterator.java
+++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceOptimizedValueIterator.java
@@ -44,16 +44,15 @@ public class ApplianceOptimizedValueIterator extends ApplianceValueIterator {
* @param points the number of requested points
* @param useStatistics true if the returned data should include statistics or false if only mean value should be
* present
- * @param listener the listener that is notified when the iterator is closed
*
* @throws IOException if there was an error during the data fetch process
* @throws ArchiverApplianceInvalidTypeException if the type of data cannot be returned in optimized format
* @throws ArchiverApplianceException if it is not possible to load optimised data for the selected PV
*/
public ApplianceOptimizedValueIterator(ApplianceArchiveReader reader, String name, Instant start, Instant end,
- int points, boolean useStatistics, IteratorListener listener) throws ArchiverApplianceException,
+ int points, boolean useStatistics) throws ArchiverApplianceException,
IOException {
- super(reader, name, start, end, listener);
+ super(reader, name, start, end);
this.requestedPoints = points;
this.useStatistics = useStatistics;
this.display = determineDisplay(reader, name, end);
diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceRawValueIterator.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceRawValueIterator.java
index 9f3306bff6..81bcbc469b 100644
--- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceRawValueIterator.java
+++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceRawValueIterator.java
@@ -22,15 +22,13 @@ public class ApplianceRawValueIterator extends ApplianceValueIterator {
* @param name name of the PV
* @param start start of the time period
* @param end end of the time period
- * @param listener the listener which is notified when the iterator is closed
- *
* @throws IOException if there was an error during the data fetch process
* @throws ArchiverApplianceException if the data cannot be loaded with this algorithm
*/
public ApplianceRawValueIterator(ApplianceArchiveReader reader,
- String name, Instant start, Instant end, IteratorListener listener)
+ String name, Instant start, Instant end)
throws ArchiverApplianceException, IOException {
- super(reader,name,start,end,listener);
+ super(reader, name, start, end);
fetchData();
}
}
\ No newline at end of file
diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIterator.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIterator.java
index 2140a62ce9..dbed29295a 100644
--- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIterator.java
+++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIterator.java
@@ -3,6 +3,8 @@
import java.io.IOException;
import java.time.Instant;
import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.epics.archiverappliance.retrieval.client.DataRetrieval;
import org.epics.archiverappliance.retrieval.client.EpicsMessage;
@@ -27,6 +29,9 @@
*/
public class ApplianceStatisticsValueIterator extends ApplianceMeanValueIterator {
+ private static final Logger logger =
+ Logger.getLogger(ApplianceStatisticsValueIterator.class.getPackage().getName());
+
private Iterator stdIterator;
private GenMsgIterator stdStream;
private Iterator minIterator;
@@ -52,8 +57,8 @@ public class ApplianceStatisticsValueIterator extends ApplianceMeanValueIterator
* @throws ArchiverApplianceInvalidTypeException if the type of data cannot be returned in optimized format
*/
public ApplianceStatisticsValueIterator(ApplianceArchiveReader reader, String name, Instant start, Instant end,
- int points, IteratorListener listener) throws ArchiverApplianceException, IOException {
- super(reader, name, start, end, points, listener);
+ int points) throws ArchiverApplianceException, IOException {
+ super(reader, name, start, end, points);
}
@Override
@@ -111,24 +116,25 @@ protected void fetchDataInternal(String pvName) throws ArchiverApplianceExceptio
*/
@Override
public void close() {
- try {
- synchronized (this) {
- if (stdStream != null) {
- stdStream.close();
- }
- if (minStream != null) {
- minStream.close();
- }
- if (maxStream != null) {
- maxStream.close();
- }
- if (countStream != null) {
- countStream.close();
- }
+ synchronized (this) {
+ try {
+ closeStream(stdStream);
+ closeStream(minStream);
+ closeStream(maxStream);
+ closeStream(countStream);
+ } finally {
super.close();
}
- } catch (IOException e) {
- throw new IllegalStateException(e);
+ }
+ }
+
+ private static void closeStream(GenMsgIterator s) {
+ if (s != null) {
+ try {
+ s.close();
+ } catch (IOException e) {
+ logger.log(Level.WARNING, e, () -> "Failed to close statistics sub-stream");
+ }
}
}
diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceValueIterator.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceValueIterator.java
index d7feba56c5..b565ff272e 100644
--- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceValueIterator.java
+++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/ApplianceValueIterator.java
@@ -3,6 +3,8 @@
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import java.util.Iterator;
import java.util.List;
import java.util.HashMap;
@@ -52,6 +54,9 @@
*/
public abstract class ApplianceValueIterator implements ValueIterator {
+ private static final Logger logger =
+ Logger.getLogger(ApplianceValueIterator.class.getPackage().getName());
+
protected EnumDisplay enumDisplay;
protected Display display;
protected GenMsgIterator mainStream;
@@ -63,11 +68,9 @@ public abstract class ApplianceValueIterator implements ValueIterator {
protected final Instant start;
protected final Instant end;
- private final IteratorListener listener;
-
- protected boolean closed = false;
+ protected volatile boolean closed = false;
- private static Object lock = new Object();
+ private final Object lock = new Object();
/**
* Constructs a new ApplianceValueIterator.
@@ -77,13 +80,11 @@ public abstract class ApplianceValueIterator implements ValueIterator {
* @param start the start of the time window of the data
* @param end the end of the time window of the data
*/
- protected ApplianceValueIterator(ApplianceArchiveReader reader, String name, Instant start, Instant end,
- IteratorListener listener) {
+ protected ApplianceValueIterator(ApplianceArchiveReader reader, String name, Instant start, Instant end) {
this.reader = reader;
this.name = name;
this.start = start;
this.end = end;
- this.listener = listener;
}
/**
@@ -126,8 +127,10 @@ protected void fetchDataInternal(String pvName) throws ArchiverApplianceExceptio
* @see org.csstudio.archive.reader.ValueIterator#hasNext()
*/
@Override
- public synchronized boolean hasNext() {
- return !closed && mainIterator != null && mainIterator.hasNext();
+ public boolean hasNext() {
+ if (closed || mainIterator == null)
+ return false;
+ return mainIterator.hasNext();
}
/*
@@ -262,9 +265,9 @@ public void close() {
closed = true;
}
} catch (IOException e) {
+ logger.log(Level.WARNING, e, () -> "Failed to close stream for PV: " + name);
throw new IllegalStateException(e);
}
- listener.finished(this);
}
/**
diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/IteratorListener.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/IteratorListener.java
deleted file mode 100644
index 76ec96df95..0000000000
--- a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/appliance/IteratorListener.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.phoebus.archive.reader.appliance;
-
-/** Listener to iterator */
-public interface IteratorListener {
- /** @param source Value iterator */
- public void finished(ApplianceValueIterator source);
-}
diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReaderTest.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReaderTest.java
new file mode 100644
index 0000000000..8ebeda09b7
--- /dev/null
+++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceArchiveReaderTest.java
@@ -0,0 +1,185 @@
+package org.phoebus.archive.reader.appliance;
+
+import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadInfo;
+import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadType;
+import org.epics.archiverappliance.retrieval.client.EpicsMessage;
+import org.epics.archiverappliance.retrieval.client.GenMsgIterator;
+import org.junit.jupiter.api.Test;
+
+import java.lang.ref.WeakReference;
+import java.time.Instant;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+class ApplianceArchiveReaderTest {
+
+ private static final Instant START = Instant.now().minusSeconds(3600);
+ private static final Instant END = Instant.now();
+
+ private static GenMsgIterator emptyStream() {
+ GenMsgIterator s = mock(GenMsgIterator.class);
+ when(s.iterator()).thenReturn(Collections.emptyIterator());
+ return s;
+ }
+
+ private static GenMsgIterator probeStream(PayloadType type) {
+ EpicsMessage msg = mock(EpicsMessage.class);
+ GenMsgIterator s = mock(GenMsgIterator.class);
+ when(s.iterator()).thenReturn(Collections.singletonList(msg).iterator());
+ when(s.getPayLoadInfo()).thenReturn(PayloadInfo.newBuilder().setType(type).buildPartial());
+ return s;
+ }
+
+ private static GenMsgIterator countStream(int count) {
+ EpicsMessage msg = mock(EpicsMessage.class);
+ when(msg.getNumberValue()).thenReturn(count);
+ GenMsgIterator s = mock(GenMsgIterator.class);
+ when(s.iterator()).thenReturn(Collections.singletonList(msg).iterator());
+ return s;
+ }
+
+ /**
+ * When the PV type is SCALAR_ENUM the optimized path throws, getOptimizedValues
+ * falls back, and since binning is unsupported it reaches NonNumericOptimizedValueIterator
+ * (or RawValueIterator if points <= count).
+ */
+ @Test
+ void scalarEnumRoutesToNonNumericIterator() throws Exception {
+ // probe returns SCALAR_ENUM → OptimizedValueIterator throws ArchiverApplianceInvalidTypeException
+ // ncount returns 200 points > requested 100 → NonNumeric path chosen
+ FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_ENUM));
+ dr.whenPvContains("ncount", countStream(200));
+ dr.whenPvContains("nth_", emptyStream());
+
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr, false, true);
+ Object iter = reader.getOptimizedValues("TEST:PV", START, END, 100);
+
+ assertInstanceOf(ApplianceNonNumericOptimizedValueIterator.class, iter);
+ }
+
+ @Test
+ void numericScalarUsesOptimizedIterator() throws Exception {
+ // probe returns SCALAR_DOUBLE → OptimizedValueIterator succeeds
+ FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_DOUBLE));
+ dr.whenPvContains("optimized_", emptyStream());
+
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr, false, true);
+ Object iter = reader.getOptimizedValues("TEST:PV", START, END, 100);
+
+ assertInstanceOf(ApplianceOptimizedValueIterator.class, iter);
+ }
+
+ @Test
+ void cancelClosesAllActiveIterators() throws Exception {
+ FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream());
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr);
+
+ ApplianceValueIterator iter = (ApplianceValueIterator) reader.getRawValues("TEST:PV", START, END);
+ assertFalse(iter.closed);
+
+ reader.cancel();
+ assertTrue(iter.closed);
+ }
+
+ @Test
+ void weakMapDoesNotPreventGC() throws Exception {
+ FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream());
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr);
+
+ WeakReference ref;
+ {
+ ApplianceValueIterator iter = (ApplianceValueIterator) reader.getRawValues("TEST:PV", START, END);
+ ref = new WeakReference<>(iter);
+ // iter goes out of scope here; no other strong reference
+ }
+
+ for (int i = 0; i < 100 && ref.get() != null; i++) {
+ System.gc();
+ Thread.sleep(10);
+ }
+
+ assertNull(ref.get(), "Iterator was not garbage collected");
+ assertTrue(reader.iterators.isEmpty(), "WeakHashMap should be empty after GC");
+ }
+
+ @Test
+ void getNumberOfPointsUsesNcountOperator() throws Exception {
+ // ncount returns 0 ≤ requested 100 → falls back to RawValueIterator
+ FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream());
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr, false, false);
+ reader.getOptimizedValues("TEST:PV", START, END, 100);
+
+ assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("ncount(")),
+ "Expected an ncount( call, got: " + dr.pvsCalled);
+ }
+
+ @Test
+ void getRawValuesRegistersIteratorInMap() throws Exception {
+ FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream());
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr);
+ ApplianceValueIterator iter = (ApplianceValueIterator) reader.getRawValues("TEST:PV", START, END);
+ assertTrue(reader.iterators.containsKey(iter), "iterator should be registered in the map after getRawValues");
+ }
+
+ @Test
+ void closeCallsCancel() throws Exception {
+ FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream());
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr);
+ ApplianceValueIterator iter = (ApplianceValueIterator) reader.getRawValues("TEST:PV", START, END);
+ assertFalse(iter.closed);
+ reader.close();
+ assertTrue(iter.closed, "close() should close all active iterators via cancel()");
+ }
+
+ @Test
+ void pointsAtOrBelowCountUsesRawIterator() throws Exception {
+ // ncount returns 50, requested count is 100 → points <= count → ApplianceRawValueIterator
+ FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream());
+ dr.whenPvContains("ncount(", countStream(50));
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr, false, false);
+ Object iter = reader.getOptimizedValues("TEST:PV", START, END, 100);
+ assertInstanceOf(ApplianceRawValueIterator.class, iter);
+ }
+
+ @Test
+ void numericScalarWithUseStatisticsTrueUsesStatisticsIterator() throws Exception {
+ // ncount returns 200 > 100 and PV type is SCALAR_DOUBLE → ApplianceStatisticsValueIterator
+ FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_DOUBLE));
+ dr.whenPvContains("ncount(", countStream(200));
+ dr.whenPvContains("mean_", emptyStream());
+ dr.whenPvContains("std_", emptyStream());
+ dr.whenPvContains("min_", emptyStream());
+ dr.whenPvContains("max_", emptyStream());
+ dr.whenPvContains("count_", emptyStream());
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr, true, false);
+ Object iter = reader.getOptimizedValues("TEST:PV", START, END, 100);
+ assertInstanceOf(ApplianceStatisticsValueIterator.class, iter);
+ }
+
+ @Test
+ void numericScalarWithUseStatisticsFalseUsesMeanIterator() throws Exception {
+ // ncount returns 200 > 100, useStatistics=false → ApplianceMeanValueIterator (not statistics subclass)
+ FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_DOUBLE));
+ dr.whenPvContains("ncount(", countStream(200));
+ dr.whenPvContains("mean_", emptyStream());
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr, false, false);
+ Object iter = reader.getOptimizedValues("TEST:PV", START, END, 100);
+ assertEquals(ApplianceMeanValueIterator.class, iter.getClass(),
+ "expected exact ApplianceMeanValueIterator, not a statistics subclass");
+ }
+
+ @Test
+ void oldApplianceFallbackOnFetchFailure() throws Exception {
+ // Statistics fetch fails mid-construction (std_ stream null → ArchiverApplianceException)
+ // → outer catch falls back to ApplianceRawValueIterator
+ FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream());
+ dr.whenPvContains("ncount(", countStream(200));
+ dr.whenPvContains("mean_", emptyStream());
+ dr.whenPvContains("std_", null); // null triggers ArchiverApplianceException in statistics iterator
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr, true, false);
+ Object iter = reader.getOptimizedValues("TEST:PV", START, END, 100);
+ assertInstanceOf(ApplianceRawValueIterator.class, iter);
+ }
+}
diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceMeanValueIteratorTest.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceMeanValueIteratorTest.java
new file mode 100644
index 0000000000..43c9b27bf6
--- /dev/null
+++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceMeanValueIteratorTest.java
@@ -0,0 +1,66 @@
+package org.phoebus.archive.reader.appliance;
+
+import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadInfo;
+import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadType;
+import org.epics.archiverappliance.retrieval.client.EpicsMessage;
+import org.epics.archiverappliance.retrieval.client.GenMsgIterator;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+class ApplianceMeanValueIteratorTest {
+
+ private static final Instant START = Instant.now().minusSeconds(3600);
+ private static final Instant END = Instant.now();
+ private static final int POINTS = 60;
+
+ private static GenMsgIterator emptyStream() {
+ GenMsgIterator s = mock(GenMsgIterator.class);
+ when(s.iterator()).thenReturn(Collections.emptyIterator());
+ return s;
+ }
+
+ private static GenMsgIterator probeStream(PayloadType type) {
+ EpicsMessage msg = mock(EpicsMessage.class);
+ GenMsgIterator s = mock(GenMsgIterator.class);
+ when(s.iterator()).thenReturn(Collections.singletonList(msg).iterator());
+ when(s.getPayLoadInfo()).thenReturn(PayloadInfo.newBuilder().setType(type).buildPartial());
+ return s;
+ }
+
+ @Test
+ void fetchUrlContainsMeanIntervalOperator() throws Exception {
+ FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_DOUBLE));
+ dr.whenPvContains("mean_", emptyStream());
+
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr);
+ new ApplianceMeanValueIterator(reader, "TEST:PV", START, END, POINTS);
+
+ assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("mean_") && pv.endsWith("(TEST:PV)")),
+ "Expected mean_(TEST:PV) in calls, got: " + dr.pvsCalled);
+ }
+
+ @Test
+ void determineDisplayRejectsEnum() {
+ FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_ENUM));
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr);
+
+ assertThrows(ArchiverApplianceInvalidTypeException.class,
+ () -> new ApplianceMeanValueIterator(reader, "TEST:PV", START, END, POINTS));
+ }
+
+ @Test
+ void determineDisplayAcceptsDouble() throws Exception {
+ FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_DOUBLE));
+ dr.whenPvContains("mean_", emptyStream());
+
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr);
+ ApplianceMeanValueIterator iter = new ApplianceMeanValueIterator(reader, "TEST:PV", START, END, POINTS);
+
+ assertNotNull(iter.display);
+ }
+}
diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceNonNumericOptimizedValueIteratorTest.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceNonNumericOptimizedValueIteratorTest.java
new file mode 100644
index 0000000000..a3b6dcf976
--- /dev/null
+++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceNonNumericOptimizedValueIteratorTest.java
@@ -0,0 +1,57 @@
+package org.phoebus.archive.reader.appliance;
+
+import org.epics.archiverappliance.retrieval.client.GenMsgIterator;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+class ApplianceNonNumericOptimizedValueIteratorTest {
+
+ private static final Instant START = Instant.now().minusSeconds(3600);
+ private static final Instant END = Instant.now();
+
+ private static GenMsgIterator emptyStream() {
+ GenMsgIterator s = mock(GenMsgIterator.class);
+ when(s.iterator()).thenReturn(Collections.emptyIterator());
+ return s;
+ }
+
+ private ApplianceNonNumericOptimizedValueIterator makeIterator(
+ FakeDataRetrieval dr, int requestedPoints, int totalPoints)
+ throws ArchiverApplianceException, java.io.IOException {
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr);
+ return new ApplianceNonNumericOptimizedValueIterator(reader, "TEST:PV", START, END,
+ requestedPoints, totalPoints);
+ }
+
+ @Test
+ void nEqualsOneCallsRawFetch() throws Exception {
+ // totalPoints == requestedPoints → n = 1 → fetches bare PV name
+ FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream());
+ makeIterator(dr, 100, 100);
+ assertTrue(dr.pvsCalled.contains("TEST:PV"),
+ "Expected bare PV name, got: " + dr.pvsCalled);
+ }
+
+ @Test
+ void nGreaterThanOneUsesNthOperator() throws Exception {
+ // totalPoints = 1000, requestedPoints = 100 → n = 10 → nth_10(TEST:PV)
+ FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream());
+ makeIterator(dr, 100, 1000);
+ assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("nth_")),
+ "Expected nth_ prefix, got: " + dr.pvsCalled);
+ }
+
+ @Test
+ void boundaryAt1point5xGivesN2() throws Exception {
+ // 1.5 * 100 < 160 < 2 * 100 → n forced to 2, regardless of integer division (160/100=1)
+ FakeDataRetrieval dr = new FakeDataRetrieval(emptyStream());
+ makeIterator(dr, 100, 160);
+ assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("nth_2(")),
+ "Expected nth_2( prefix at boundary, got: " + dr.pvsCalled);
+ }
+}
diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceOptimizedValueIteratorTest.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceOptimizedValueIteratorTest.java
new file mode 100644
index 0000000000..c12686c866
--- /dev/null
+++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceOptimizedValueIteratorTest.java
@@ -0,0 +1,102 @@
+package org.phoebus.archive.reader.appliance;
+
+import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadInfo;
+import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadType;
+import org.epics.archiverappliance.retrieval.client.EpicsMessage;
+import org.epics.archiverappliance.retrieval.client.GenMsgIterator;
+import org.epics.vtype.VNumber;
+import org.epics.vtype.VStatistics;
+import org.epics.vtype.VType;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+class ApplianceOptimizedValueIteratorTest {
+
+ private static final Instant START = Instant.now().minusSeconds(3600);
+ private static final Instant END = Instant.now();
+ private static final int POINTS = 100;
+
+ private static GenMsgIterator emptyStream() {
+ GenMsgIterator s = mock(GenMsgIterator.class);
+ when(s.iterator()).thenReturn(Collections.emptyIterator());
+ return s;
+ }
+
+ private static GenMsgIterator probeStream(PayloadType type) {
+ EpicsMessage msg = mock(EpicsMessage.class);
+ GenMsgIterator s = mock(GenMsgIterator.class);
+ when(s.iterator()).thenReturn(Collections.singletonList(msg).iterator());
+ when(s.getPayLoadInfo()).thenReturn(PayloadInfo.newBuilder().setType(type).buildPartial());
+ return s;
+ }
+
+ @Test
+ void fetchUrlContainsOptimizedNOperator() throws Exception {
+ FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_DOUBLE));
+ dr.whenPvContains("optimized_", emptyStream());
+
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr);
+ new ApplianceOptimizedValueIterator(reader, "TEST:PV", START, END, POINTS, false);
+
+ assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("optimized_") && pv.endsWith("(TEST:PV)")),
+ "Expected optimized_(TEST:PV) call, got: " + dr.pvsCalled);
+ }
+
+ @Test
+ void determineDisplayRejectsEnum() {
+ FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_ENUM));
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr);
+
+ assertThrows(ArchiverApplianceInvalidTypeException.class,
+ () -> new ApplianceOptimizedValueIterator(reader, "TEST:PV", START, END, POINTS, false));
+ }
+
+ private static EpicsMessage waveformMsg(double mean, double std, double min, double max, int count) {
+ EpicsMessage msg = mock(EpicsMessage.class);
+ when(msg.getElementCount()).thenReturn(5);
+ when(msg.getNumberAt(0)).thenReturn(mean);
+ when(msg.getNumberAt(1)).thenReturn(std);
+ when(msg.getNumberAt(2)).thenReturn(min);
+ when(msg.getNumberAt(3)).thenReturn(max);
+ when(msg.getNumberAt(4)).thenReturn(count);
+ when(msg.getTimestamp()).thenReturn(new java.sql.Timestamp(System.currentTimeMillis()));
+ when(msg.getSeverity()).thenReturn(0);
+ when(msg.getStatus()).thenReturn(0);
+ return msg;
+ }
+
+ private ApplianceOptimizedValueIterator makeWithWaveformData(boolean useStatistics) throws Exception {
+ EpicsMessage dataMsg = waveformMsg(1.0, 0.1, 0.5, 1.5, 10);
+ GenMsgIterator dataStream = mock(GenMsgIterator.class);
+ PayloadInfo waveformInfo = PayloadInfo.newBuilder().setType(PayloadType.WAVEFORM_DOUBLE).buildPartial();
+ when(dataStream.iterator()).thenReturn(Collections.singletonList(dataMsg).iterator());
+ when(dataStream.getPayLoadInfo()).thenReturn(waveformInfo);
+
+ FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_DOUBLE));
+ dr.whenPvContains("optimized_", dataStream);
+
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr);
+ return new ApplianceOptimizedValueIterator(reader, "TEST:PV", START, END, POINTS, useStatistics);
+ }
+
+ @Test
+ void nextReturnsVStatisticsWhenUseStatisticsTrue() throws Exception {
+ ApplianceOptimizedValueIterator iter = makeWithWaveformData(true);
+ assertTrue(iter.hasNext());
+ VType result = iter.next();
+ assertInstanceOf(VStatistics.class, result);
+ }
+
+ @Test
+ void nextReturnsVNumberWhenUseStatisticsFalse() throws Exception {
+ ApplianceOptimizedValueIterator iter = makeWithWaveformData(false);
+ assertTrue(iter.hasNext());
+ VType result = iter.next();
+ assertInstanceOf(VNumber.class, result);
+ }
+}
diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIteratorTest.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIteratorTest.java
new file mode 100644
index 0000000000..011e908d86
--- /dev/null
+++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceStatisticsValueIteratorTest.java
@@ -0,0 +1,138 @@
+package org.phoebus.archive.reader.appliance;
+
+import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadInfo;
+import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadType;
+import org.epics.archiverappliance.retrieval.client.EpicsMessage;
+import org.epics.archiverappliance.retrieval.client.GenMsgIterator;
+import org.epics.vtype.VStatistics;
+import org.epics.vtype.VType;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+class ApplianceStatisticsValueIteratorTest {
+
+ private static final Instant START = Instant.now().minusSeconds(3600);
+ private static final Instant END = Instant.now();
+ private static final int POINTS = 60;
+
+ private static GenMsgIterator emptyStream() {
+ GenMsgIterator s = mock(GenMsgIterator.class);
+ when(s.iterator()).thenReturn(Collections.emptyIterator());
+ return s;
+ }
+
+ private static GenMsgIterator probeStream(PayloadType type) {
+ EpicsMessage msg = mock(EpicsMessage.class);
+ GenMsgIterator s = mock(GenMsgIterator.class);
+ when(s.iterator()).thenReturn(Collections.singletonList(msg).iterator());
+ when(s.getPayLoadInfo()).thenReturn(PayloadInfo.newBuilder().setType(type).buildPartial());
+ return s;
+ }
+
+ private ApplianceStatisticsValueIterator makeIterator(FakeDataRetrieval dr)
+ throws ArchiverApplianceException, java.io.IOException {
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(dr);
+ return new ApplianceStatisticsValueIterator(reader, "TEST:PV", START, END, POINTS);
+ }
+
+ private FakeDataRetrieval setupDr(GenMsgIterator meanStream, GenMsgIterator stdStream,
+ GenMsgIterator minStream, GenMsgIterator maxStream,
+ GenMsgIterator countStream) {
+ FakeDataRetrieval dr = new FakeDataRetrieval(probeStream(PayloadType.SCALAR_DOUBLE));
+ dr.whenPvContains("mean_", meanStream);
+ dr.whenPvContains("std_", stdStream);
+ dr.whenPvContains("min_", minStream);
+ dr.whenPvContains("max_", maxStream);
+ dr.whenPvContains("count_", countStream);
+ return dr;
+ }
+
+ @Test
+ void fetchIssuesFiveOperatorCalls() throws Exception {
+ FakeDataRetrieval dr = setupDr(emptyStream(), emptyStream(), emptyStream(), emptyStream(), emptyStream());
+ makeIterator(dr);
+
+ assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("mean_")), "missing mean_ call");
+ assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("std_")), "missing std_ call");
+ assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("min_")), "missing min_ call");
+ assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("max_")), "missing max_ call");
+ assertTrue(dr.pvsCalled.stream().anyMatch(pv -> pv.startsWith("count_")), "missing count_ call");
+ }
+
+ @Test
+ void closeClosesAllFiveStreams() throws Exception {
+ GenMsgIterator meanStream = emptyStream();
+ GenMsgIterator stdStream = emptyStream();
+ GenMsgIterator minStream = emptyStream();
+ GenMsgIterator maxStream = emptyStream();
+ GenMsgIterator countStream = emptyStream();
+
+ FakeDataRetrieval dr = setupDr(meanStream, stdStream, minStream, maxStream, countStream);
+ ApplianceStatisticsValueIterator iter = makeIterator(dr);
+
+ iter.close();
+
+ verify(meanStream).close();
+ verify(stdStream).close();
+ verify(minStream).close();
+ verify(maxStream).close();
+ verify(countStream).close();
+ }
+
+ // ---- next() assembly and lifecycle ----
+
+ /** One-message stream returning the given numeric value; mean stream gets SCALAR_DOUBLE PayloadInfo. */
+ private static GenMsgIterator valueStream(double value, boolean withPayloadInfo) {
+ EpicsMessage msg = mock(EpicsMessage.class);
+ when(msg.getNumberValue()).thenReturn(value);
+ when(msg.getSeverity()).thenReturn(0);
+ when(msg.getStatus()).thenReturn(0);
+ when(msg.getTimestamp()).thenReturn(new Timestamp(System.currentTimeMillis()));
+ GenMsgIterator s = mock(GenMsgIterator.class);
+ when(s.iterator()).thenReturn(Collections.singletonList(msg).iterator());
+ if (withPayloadInfo)
+ when(s.getPayLoadInfo()).thenReturn(
+ PayloadInfo.newBuilder().setType(PayloadType.SCALAR_DOUBLE).buildPartial());
+ return s;
+ }
+
+ @Test
+ void nextAssemblesVStatisticsFromFiveIterators() throws Exception {
+ GenMsgIterator meanStream = valueStream(2.0, true);
+ GenMsgIterator stdStream = valueStream(0.5, false);
+ GenMsgIterator minStream = valueStream(1.0, false);
+ GenMsgIterator maxStream = valueStream(3.0, false);
+ GenMsgIterator countStream = valueStream(10.0, false);
+
+ FakeDataRetrieval dr = setupDr(meanStream, stdStream, minStream, maxStream, countStream);
+ ApplianceStatisticsValueIterator iter = makeIterator(dr);
+
+ assertTrue(iter.hasNext());
+ VType result = iter.next();
+
+ assertInstanceOf(VStatistics.class, result);
+ VStatistics vs = (VStatistics) result;
+ assertEquals(2.0, vs.getAverage(), 0.001);
+ assertEquals(0.5, vs.getStdDev(), 0.001);
+ assertEquals(1.0, vs.getMin(), 0.001);
+ assertEquals(3.0, vs.getMax(), 0.001);
+ assertEquals(10, (int) vs.getNSamples());
+ }
+
+ @Test
+ void nextReturnsNullWhenClosed() throws Exception {
+ GenMsgIterator meanStream = valueStream(1.0, true);
+ FakeDataRetrieval dr = setupDr(meanStream, emptyStream(), emptyStream(), emptyStream(), emptyStream());
+ ApplianceStatisticsValueIterator iter = makeIterator(dr);
+
+ iter.close();
+
+ assertNull(iter.next(), "next() after close() should return null without throwing");
+ }
+}
diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceValueIteratorConcurrencyTest.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceValueIteratorConcurrencyTest.java
new file mode 100644
index 0000000000..569c9adb0b
--- /dev/null
+++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceValueIteratorConcurrencyTest.java
@@ -0,0 +1,132 @@
+package org.phoebus.archive.reader.appliance;
+
+import org.epics.archiverappliance.retrieval.client.EpicsMessage;
+import org.epics.archiverappliance.retrieval.client.GenMsgIterator;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.phoebus.archive.reader.ValueIterator;
+
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+class ApplianceValueIteratorConcurrencyTest {
+
+ private static GenMsgIterator emptyStream() {
+ GenMsgIterator s = mock(GenMsgIterator.class);
+ when(s.iterator()).thenReturn(Collections.emptyIterator());
+ return s;
+ }
+
+ private static ValueIterator getRaw(FakeApplianceArchiveReader reader) throws Exception {
+ Instant start = Instant.now().minusSeconds(60);
+ return reader.getRawValues("TESTPV", start, Instant.now());
+ }
+
+ /**
+ * Regression test for the hasNext()/close() deadlock.
+ *
+ * With the old synchronized hasNext(), a worker blocked inside mainIterator.hasNext()
+ * held the 'this' monitor. close() could never acquire it, so the job hung forever
+ * and the semaphore permit was never returned. @Timeout(5) catches the hang.
+ */
+ @Test
+ @Timeout(5)
+ void closeCompletesWhileHasNextIsBlocking() throws Exception {
+ BlockingGenMsgIterator blocker = new BlockingGenMsgIterator();
+ GenMsgIterator stream = mock(GenMsgIterator.class);
+ when(stream.iterator()).thenReturn(blocker);
+
+ FakeApplianceArchiveReader reader = new FakeApplianceArchiveReader(new FakeDataRetrieval(stream));
+ ValueIterator iter = getRaw(reader);
+
+ Thread worker = new Thread(iter::hasNext);
+ worker.start();
+ blocker.entered.await(); // worker is now blocked inside hasNext()
+
+ iter.close(); // must not deadlock
+ assertFalse(iter.hasNext());
+
+ blocker.release.countDown();
+ worker.join(1000);
+ assertFalse(worker.isAlive());
+ }
+
+ @Test
+ void hasNextReturnsFalseAfterClose() throws Exception {
+ EpicsMessage msg = mock(EpicsMessage.class);
+ GenMsgIterator stream = mock(GenMsgIterator.class);
+ when(stream.iterator()).thenReturn(Collections.singletonList(msg).iterator());
+
+ ValueIterator iter = getRaw(new FakeApplianceArchiveReader(new FakeDataRetrieval(stream)));
+
+ assertTrue(iter.hasNext());
+ iter.close();
+ assertFalse(iter.hasNext());
+ }
+
+ @Test
+ void nextReturnsNullAfterClose() throws Exception {
+ EpicsMessage msg = mock(EpicsMessage.class);
+ GenMsgIterator stream = mock(GenMsgIterator.class);
+ when(stream.iterator()).thenReturn(Collections.singletonList(msg).iterator());
+
+ ValueIterator iter = getRaw(new FakeApplianceArchiveReader(new FakeDataRetrieval(stream)));
+ iter.close();
+ assertNull(iter.next());
+ }
+
+ /**
+ * Regression test for the static lock serialising fetches across all PV iterators.
+ *
+ * With a static lock, t1 holding it inside getDataForPVs blocks t2 even though
+ * they belong to completely independent readers. @Timeout(5) catches the hang.
+ */
+ @Test
+ @Timeout(5)
+ void twoIteratorFetchesProceedConcurrently() throws Exception {
+ CountDownLatch firstInFetch = new CountDownLatch(1);
+ CountDownLatch releaseFetch = new CountDownLatch(1);
+
+ FakeDataRetrieval dr1 = new FakeDataRetrieval(emptyStream()) {
+ @Override
+ public GenMsgIterator getDataForPVs(List pvNames, Timestamp start,
+ Timestamp end, boolean b, Map m) {
+ firstInFetch.countDown();
+ try { releaseFetch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
+ return super.getDataForPVs(pvNames, start, end, b, m);
+ }
+ };
+ FakeDataRetrieval dr2 = new FakeDataRetrieval(emptyStream());
+
+ FakeApplianceArchiveReader reader1 = new FakeApplianceArchiveReader(dr1);
+ FakeApplianceArchiveReader reader2 = new FakeApplianceArchiveReader(dr2);
+ Instant start = Instant.now().minusSeconds(60);
+ Instant end = Instant.now();
+
+ Thread t1 = new Thread(() -> {
+ try { reader1.getRawValues("PV1", start, end); }
+ catch (Exception ignored) {}
+ });
+ Thread t2 = new Thread(() -> {
+ try { reader2.getRawValues("PV2", start, end); }
+ catch (Exception ignored) {}
+ });
+
+ t1.start();
+ firstInFetch.await(); // t1 is inside getDataForPVs, holding the lock
+
+ t2.start();
+ t2.join(2000); // t2 must finish while t1 is still blocked
+ assertFalse(t2.isAlive(), "t2 blocked — static lock contention between independent iterators");
+
+ releaseFetch.countDown();
+ t1.join(1000);
+ }
+}
diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceValueIteratorExtractDataTest.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceValueIteratorExtractDataTest.java
new file mode 100644
index 0000000000..ba95d438e8
--- /dev/null
+++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/ApplianceValueIteratorExtractDataTest.java
@@ -0,0 +1,243 @@
+package org.phoebus.archive.reader.appliance;
+
+import com.google.protobuf.ByteString;
+import edu.stanford.slac.archiverappliance.PB.EPICSEvent;
+import edu.stanford.slac.archiverappliance.PB.EPICSEvent.FieldValue;
+import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadInfo;
+import edu.stanford.slac.archiverappliance.PB.EPICSEvent.PayloadType;
+import org.epics.archiverappliance.retrieval.client.EpicsMessage;
+import org.epics.archiverappliance.retrieval.client.GenMsgIterator;
+import org.epics.vtype.AlarmSeverity;
+import org.epics.vtype.VByteArray;
+import org.epics.vtype.VDoubleArray;
+import org.epics.vtype.VEnum;
+import org.epics.vtype.VIntArray;
+import org.epics.vtype.VNumber;
+import org.epics.vtype.VString;
+import org.epics.vtype.VType;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Functional tests for ApplianceValueIterator.extractData() across all supported payload types.
+ *
+ * Uses a minimal concrete subclass that exposes the protected extractData() method directly,
+ * avoiding the need for real archive network connections.
+ */
+class ApplianceValueIteratorExtractDataTest {
+
+ // ---- minimal test subclass ----
+
+ private static class DataExtractIterator extends ApplianceValueIterator {
+ DataExtractIterator(ApplianceArchiveReader reader, GenMsgIterator stream) {
+ super(reader, "TEST:PV", Instant.EPOCH, Instant.EPOCH);
+ this.mainStream = stream;
+ this.mainIterator = stream.iterator();
+ }
+
+ VType extract(EpicsMessage msg) {
+ return extractData(msg);
+ }
+ }
+
+ // ---- helpers ----
+
+ private static GenMsgIterator streamOfType(PayloadType type, FieldValue... headers) {
+ GenMsgIterator s = mock(GenMsgIterator.class);
+ when(s.iterator()).thenReturn(Collections.emptyIterator());
+ PayloadInfo.Builder b = PayloadInfo.newBuilder().setType(type);
+ for (FieldValue h : headers) b.addHeaders(h);
+ when(s.getPayLoadInfo()).thenReturn(b.buildPartial());
+ return s;
+ }
+
+ private static DataExtractIterator iteratorWithStream(GenMsgIterator stream) {
+ FakeDataRetrieval dr = new FakeDataRetrieval(stream);
+ return new DataExtractIterator(new FakeApplianceArchiveReader(dr), stream);
+ }
+
+ private static DataExtractIterator iteratorForType(PayloadType type, FieldValue... headers) {
+ return iteratorWithStream(streamOfType(type, headers));
+ }
+
+ /** Mock EpicsMessage returning the given numeric value with no alarm and a current timestamp. */
+ private static EpicsMessage numericMsg(Number value) {
+ EpicsMessage msg = mock(EpicsMessage.class);
+ when(msg.getNumberValue()).thenReturn(value);
+ when(msg.getSeverity()).thenReturn(0);
+ when(msg.getStatus()).thenReturn(0);
+ when(msg.getTimestamp()).thenReturn(new Timestamp(System.currentTimeMillis()));
+ return msg;
+ }
+
+ /** EpicsMessage backed by a real protobuf message object (needed for getMessage().getField()). */
+ private static EpicsMessage pbMsg(com.google.protobuf.Message pb) {
+ EpicsMessage msg = mock(EpicsMessage.class);
+ when(msg.getMessage()).thenReturn(pb);
+ when(msg.getSeverity()).thenReturn(0);
+ when(msg.getStatus()).thenReturn(0);
+ when(msg.getTimestamp()).thenReturn(new Timestamp(System.currentTimeMillis()));
+ return msg;
+ }
+
+ // ---- scalar numeric types ----
+
+ @Test
+ void scalarDoubleYieldsVNumber() {
+ DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_DOUBLE);
+ VType result = iter.extract(numericMsg(42.5));
+ assertInstanceOf(VNumber.class, result);
+ assertEquals(42.5, ((VNumber) result).getValue().doubleValue(), 0.001);
+ }
+
+ @Test
+ void scalarIntYieldsVNumber() {
+ DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_INT);
+ VType result = iter.extract(numericMsg(7));
+ assertInstanceOf(VNumber.class, result);
+ assertEquals(7, ((VNumber) result).getValue().intValue());
+ }
+
+ @Test
+ void scalarByteYieldsVNumber() {
+ DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_BYTE);
+ VType result = iter.extract(numericMsg(3));
+ assertInstanceOf(VNumber.class, result);
+ }
+
+ @Test
+ void scalarShortYieldsVNumber() {
+ DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_SHORT);
+ VType result = iter.extract(numericMsg(100));
+ assertInstanceOf(VNumber.class, result);
+ }
+
+ @Test
+ void scalarFloatYieldsVNumber() {
+ DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_FLOAT);
+ VType result = iter.extract(numericMsg(1.5f));
+ assertInstanceOf(VNumber.class, result);
+ }
+
+ // ---- enum ----
+
+ @Test
+ void scalarEnumYieldsVEnum() {
+ DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_ENUM,
+ FieldValue.newBuilder().setName("ENUM_0").setVal("Off").build(),
+ FieldValue.newBuilder().setName("ENUM_1").setVal("On").build());
+ VType result = iter.extract(numericMsg(1));
+ assertInstanceOf(VEnum.class, result);
+ assertEquals(1, ((VEnum) result).getIndex());
+ }
+
+ // ---- alarm ----
+
+ @Test
+ void alarmSeverityPreserved() {
+ DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_DOUBLE);
+ EpicsMessage msg = mock(EpicsMessage.class);
+ when(msg.getNumberValue()).thenReturn(0.0);
+ when(msg.getSeverity()).thenReturn(2); // 2 = MAJOR
+ when(msg.getStatus()).thenReturn(0);
+ when(msg.getTimestamp()).thenReturn(new Timestamp(System.currentTimeMillis()));
+ VType result = iter.extract(msg);
+ assertInstanceOf(VNumber.class, result);
+ assertEquals(AlarmSeverity.MAJOR, ((VNumber) result).getAlarm().getSeverity());
+ }
+
+ // ---- display metadata ----
+
+ @Test
+ void displayHeadersExtracted() {
+ DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_DOUBLE,
+ FieldValue.newBuilder().setName("EGU").setVal("mA").build(),
+ FieldValue.newBuilder().setName("LOPR").setVal("0.0").build(),
+ FieldValue.newBuilder().setName("HOPR").setVal("100.0").build());
+ VType result = iter.extract(numericMsg(50.0));
+ assertInstanceOf(VNumber.class, result);
+ VNumber vn = (VNumber) result;
+ assertEquals("mA", vn.getDisplay().getUnit());
+ assertEquals(0.0, vn.getDisplay().getDisplayRange().getMinimum(), 0.001);
+ assertEquals(100.0, vn.getDisplay().getDisplayRange().getMaximum(), 0.001);
+ }
+
+ // ---- enum label ordering ----
+
+ @Test
+ void enumLabelsExtractedAndSorted() {
+ // Labels supplied out-of-order — extractData must return them sorted by index number
+ DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_ENUM,
+ FieldValue.newBuilder().setName("ENUM_2").setVal("Error").build(),
+ FieldValue.newBuilder().setName("ENUM_0").setVal("Off").build(),
+ FieldValue.newBuilder().setName("ENUM_1").setVal("On").build());
+ VType result = iter.extract(numericMsg(0));
+ assertInstanceOf(VEnum.class, result);
+ assertEquals("Off", ((VEnum) result).getDisplay().getChoices().get(0));
+ assertEquals("On", ((VEnum) result).getDisplay().getChoices().get(1));
+ assertEquals("Error", ((VEnum) result).getDisplay().getChoices().get(2));
+ }
+
+ // ---- string and waveform types via real protobuf messages ----
+
+ @Test
+ void scalarStringYieldsVString() {
+ EPICSEvent.ScalarString pb = EPICSEvent.ScalarString.newBuilder()
+ .setSecondsintoyear(0).setNano(0).setVal("test-value").build();
+ DataExtractIterator iter = iteratorForType(PayloadType.SCALAR_STRING);
+ VType result = iter.extract(pbMsg(pb));
+ assertInstanceOf(VString.class, result);
+ assertEquals("test-value", ((VString) result).getValue());
+ }
+
+ @Test
+ void waveformDoubleYieldsVDoubleArray() {
+ EPICSEvent.VectorDouble pb = EPICSEvent.VectorDouble.newBuilder()
+ .setSecondsintoyear(0).setNano(0)
+ .addVal(1.0).addVal(2.0).addVal(3.0).build();
+ DataExtractIterator iter = iteratorForType(PayloadType.WAVEFORM_DOUBLE);
+ VType result = iter.extract(pbMsg(pb));
+ assertInstanceOf(VDoubleArray.class, result);
+ VDoubleArray vda = (VDoubleArray) result;
+ assertEquals(3, vda.getData().size());
+ assertEquals(1.0, vda.getData().getDouble(0), 0.001);
+ assertEquals(2.0, vda.getData().getDouble(1), 0.001);
+ assertEquals(3.0, vda.getData().getDouble(2), 0.001);
+ }
+
+ @Test
+ void waveformIntYieldsVIntArray() {
+ EPICSEvent.VectorInt pb = EPICSEvent.VectorInt.newBuilder()
+ .setSecondsintoyear(0).setNano(0)
+ .addVal(10).addVal(20).addVal(30).build();
+ DataExtractIterator iter = iteratorForType(PayloadType.WAVEFORM_INT);
+ VType result = iter.extract(pbMsg(pb));
+ assertInstanceOf(VIntArray.class, result);
+ VIntArray via = (VIntArray) result;
+ assertEquals(3, via.getData().size());
+ assertEquals(10, via.getData().getInt(0));
+ assertEquals(20, via.getData().getInt(1));
+ assertEquals(30, via.getData().getInt(2));
+ }
+
+ @Test
+ void waveformByteYieldsVByteArray() {
+ EPICSEvent.VectorChar pb = EPICSEvent.VectorChar.newBuilder()
+ .setSecondsintoyear(0).setNano(0)
+ .setVal(ByteString.copyFrom(new byte[]{1, 2, 3})).build();
+ DataExtractIterator iter = iteratorForType(PayloadType.WAVEFORM_BYTE);
+ VType result = iter.extract(pbMsg(pb));
+ assertInstanceOf(VByteArray.class, result);
+ VByteArray vba = (VByteArray) result;
+ assertEquals(3, vba.getData().size());
+ assertEquals((byte) 1, vba.getData().getByte(0));
+ assertEquals((byte) 2, vba.getData().getByte(1));
+ assertEquals((byte) 3, vba.getData().getByte(2));
+ }
+}
diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/BlockingGenMsgIterator.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/BlockingGenMsgIterator.java
new file mode 100644
index 0000000000..8f68992b32
--- /dev/null
+++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/BlockingGenMsgIterator.java
@@ -0,0 +1,35 @@
+package org.phoebus.archive.reader.appliance;
+
+import org.epics.archiverappliance.retrieval.client.EpicsMessage;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Iterator whose hasNext() blocks until released, for concurrency tests.
+ *
+ * The caller awaits {@link #entered} to confirm the worker thread is inside
+ * hasNext(), then counts down {@link #release} to unblock it.
+ */
+class BlockingGenMsgIterator implements Iterator {
+
+ final CountDownLatch entered = new CountDownLatch(1);
+ final CountDownLatch release = new CountDownLatch(1);
+
+ @Override
+ public boolean hasNext() {
+ entered.countDown();
+ try {
+ release.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return false;
+ }
+
+ @Override
+ public EpicsMessage next() {
+ throw new NoSuchElementException();
+ }
+}
diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/FakeApplianceArchiveReader.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/FakeApplianceArchiveReader.java
new file mode 100644
index 0000000000..437efbf42b
--- /dev/null
+++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/FakeApplianceArchiveReader.java
@@ -0,0 +1,26 @@
+package org.phoebus.archive.reader.appliance;
+
+import org.epics.archiverappliance.retrieval.client.DataRetrieval;
+
+/**
+ * ApplianceArchiveReader that injects a provided DataRetrieval, avoiding any real network calls.
+ */
+class FakeApplianceArchiveReader extends ApplianceArchiveReader {
+
+ private final DataRetrieval retrieval;
+
+ FakeApplianceArchiveReader(DataRetrieval retrieval) {
+ super("pbraw://fake-host:17668", false, true);
+ this.retrieval = retrieval;
+ }
+
+ FakeApplianceArchiveReader(DataRetrieval retrieval, boolean useStatistics, boolean useNewOptimizedOperator) {
+ super("pbraw://fake-host:17668", useStatistics, useNewOptimizedOperator);
+ this.retrieval = retrieval;
+ }
+
+ @Override
+ public DataRetrieval createDataRetriveal(String url) {
+ return retrieval;
+ }
+}
diff --git a/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/FakeDataRetrieval.java b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/FakeDataRetrieval.java
new file mode 100644
index 0000000000..1c96437eeb
--- /dev/null
+++ b/app/trends/archive-reader/src/test/java/org/phoebus/archive/reader/appliance/FakeDataRetrieval.java
@@ -0,0 +1,47 @@
+package org.phoebus.archive.reader.appliance;
+
+import org.epics.archiverappliance.retrieval.client.DataRetrieval;
+import org.epics.archiverappliance.retrieval.client.GenMsgIterator;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Concrete DataRetrieval subclass for testing.
+ *
+ * All three getDataForPV overloads are final and delegate to getDataForPVs,
+ * so overriding getDataForPVs intercepts every call. PV names are recorded in
+ * pvsCalled; responses can be stubbed by PV-name substring via whenPvContains().
+ */
+class FakeDataRetrieval extends DataRetrieval {
+
+ final List pvsCalled = new ArrayList<>();
+
+ private GenMsgIterator defaultResponse;
+ private final Map responses = new LinkedHashMap<>();
+
+ FakeDataRetrieval(GenMsgIterator defaultResponse) {
+ this.defaultResponse = defaultResponse;
+ }
+
+ /** Return {@code response} whenever the requested PV name contains {@code substring}. */
+ void whenPvContains(String substring, GenMsgIterator response) {
+ responses.put(substring, response);
+ }
+
+ @Override
+ public GenMsgIterator getDataForPVs(List pvNames, Timestamp start, Timestamp end,
+ boolean fetchLatestMetadata, Map otherParams) {
+ String pv = pvNames.isEmpty() ? "" : pvNames.get(0);
+ pvsCalled.add(pv);
+ for (Map.Entry entry : responses.entrySet()) {
+ if (pv.contains(entry.getKey())) {
+ return entry.getValue();
+ }
+ }
+ return defaultResponse;
+ }
+}