diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/Store.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/Store.java index 73c030f15e35..88f8874f6dd0 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/Store.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/Store.java @@ -29,6 +29,7 @@ import io.cdap.cdap.common.ConflictException; import io.cdap.cdap.common.NotFoundException; import io.cdap.cdap.common.ProgramNotFoundException; +import io.cdap.cdap.internal.app.store.AppSummary; import io.cdap.cdap.internal.app.store.ApplicationMeta; import io.cdap.cdap.internal.app.store.RunRecordDetail; import io.cdap.cdap.internal.app.store.WorkflowTable; @@ -495,6 +496,17 @@ void updateApplicationSourceControlMeta(Map up boolean scanApplications(ScanApplicationsRequest request, int txBatchSize, BiConsumer consumer); + /** + * Scans for application summaries according to the parameters passed in request. + * + * @param request parameters defining filters and sorting + * @param txBatchSize maximum number of applications to scan in one transaction + * @param consumer a {@link Consumer} to consume each application summary being scanned + * @return if limit was reached (true) or all items were scanned before reaching the limit (false) + */ + boolean scanApplicationSummaries(ScanApplicationsRequest request, int txBatchSize, + Consumer consumer); + /** * Returns a Map of {@link ApplicationMeta} for the given set of {@link ApplicationId}. * diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/ProgramLifecycleHttpHandler.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/ProgramLifecycleHttpHandler.java index 46c7535a50ac..e6259a98b422 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/ProgramLifecycleHttpHandler.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/ProgramLifecycleHttpHandler.java @@ -16,7 +16,10 @@ package io.cdap.cdap.gateway.handlers; +import com.google.common.base.Splitter; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; @@ -26,6 +29,8 @@ import io.cdap.cdap.api.app.ApplicationSpecification; import io.cdap.cdap.api.service.ServiceUnavailableException; import io.cdap.cdap.app.mapreduce.MRJobInfoFetcher; +import io.cdap.cdap.app.store.ApplicationFilter; +import io.cdap.cdap.app.store.ScanApplicationsRequest; import io.cdap.cdap.app.store.Store; import io.cdap.cdap.common.BadRequestException; import io.cdap.cdap.common.ConflictException; @@ -60,10 +65,13 @@ import io.cdap.cdap.proto.RunRecord; import io.cdap.cdap.proto.id.ApplicationId; import io.cdap.cdap.proto.id.ApplicationReference; +import io.cdap.cdap.proto.id.EntityId; +import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.proto.id.ProgramId; import io.cdap.cdap.proto.id.ProgramReference; import io.cdap.cdap.proto.id.ProgramRunId; import io.cdap.cdap.security.spi.authorization.UnauthorizedException; +import io.cdap.cdap.spi.data.SortOrder; import io.cdap.http.HttpResponder; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpRequest; @@ -71,11 +79,15 @@ import java.io.IOException; import java.lang.reflect.Type; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.ws.rs.DefaultValue; @@ -1064,6 +1076,108 @@ public void getAllWorkers(HttpRequest request, HttpResponder responder, ProgramType.WORKER))); } + /** + * Returns a summary of pipelines in the namespace, including metadata, run status, and run + * counts. + * + *

This endpoint is optimized to avoid full deserialization of application specifications. + * + * @param request the HTTP request + * @param responder the HTTP responder used to send the JSON response + * @param namespaceId the ID of the namespace to scan + * @param artifactName optional comma-separated list of artifact names to filter by + * @param artifactVersion optional artifact version to filter by + * @param pageToken optional page token for pagination + * @param pageSize optional page size, defaults to 25 + * @param orderBy optional sort order + * @param nameFilter optional filter for application name + * @param nameFilterType optional type of name filter (e.g., EQUALS, CONTAINS) + * @param latestOnly optional flag to return only the latest version, defaults to true + * @param sortCreationTime optional flag to sort by creation time + * @throws Exception if any error occurs during execution + */ + @GET + @Path("/apps/summary") + public void getPipelineSummaries(HttpRequest request, HttpResponder responder, + @PathParam("namespace-id") String namespaceId, + @QueryParam("artifactName") String artifactName, + @QueryParam("artifactVersion") String artifactVersion, + @QueryParam("pageToken") String pageToken, + @QueryParam("pageSize") @DefaultValue("25") Integer pageSize, + @QueryParam("orderBy") SortOrder orderBy, + @QueryParam("nameFilter") String nameFilter, + @QueryParam("nameFilterType") NameFilterType nameFilterType, + @QueryParam("latestOnly") @DefaultValue("true") Boolean latestOnly, + @QueryParam("sortCreationTime") Boolean sortCreationTime) throws Exception { + + ScanApplicationsRequest scanRequest = getSummaryScanRequest(namespaceId, artifactName, + artifactVersion, + pageToken, pageSize, orderBy, nameFilter, nameFilterType, latestOnly, sortCreationTime); + + responder.sendJson(HttpResponseStatus.OK, + ProgramHandlerUtil.toJson(lifecycleService.getAppSummaries(scanRequest))); + } + + private ScanApplicationsRequest getSummaryScanRequest(String namespaceId, String artifactName, + String artifactVersion, String pageToken, + Integer pageSize, SortOrder orderBy, String nameFilter, + NameFilterType nameFilterType, Boolean latestOnly, Boolean sortCreationTime) { + Set names = new HashSet<>(); + if (!Strings.isNullOrEmpty(artifactName)) { + for (String name : Splitter.on(',').omitEmptyStrings().trimResults() + .split(Objects.requireNonNull(artifactName))) { + names.add(name); + } + } + + ScanApplicationsRequest.Builder builder = ScanApplicationsRequest.builder(); + builder.setNamespaceId(new NamespaceId(namespaceId)); + if (pageSize != null) { + builder.setLimit(pageSize); + } + if (latestOnly != null) { + builder.setLatestOnly(latestOnly); + } + if (orderBy != null) { + builder.setSortOrder(orderBy); + } + if (sortCreationTime != null) { + builder.setSortCreationTime(sortCreationTime); + } + if (!Strings.isNullOrEmpty(pageToken)) { + builder.setScanFrom(ApplicationId.fromIdParts(Iterables.concat( + Collections.singleton(namespaceId), + Arrays.asList(EntityId.IDSTRING_PART_SEPARATOR_PATTERN.split(pageToken.trim()))))); + } + + List filters = new ArrayList<>(); + if (!names.isEmpty()) { + filters.add(new ApplicationFilter.ArtifactNamesInFilter(names)); + } + if (!Strings.isNullOrEmpty(artifactVersion)) { + filters.add(new ApplicationFilter.ArtifactVersionFilter(artifactVersion.trim())); + } + if (!Strings.isNullOrEmpty(nameFilter)) { + String cleanNameFilter = nameFilter.trim(); + NameFilterType type = nameFilterType != null ? nameFilterType : NameFilterType.CONTAINS; + + switch (type) { + case EQUALS: + builder.setApplicationReference(new ApplicationReference(namespaceId, cleanNameFilter)); + break; + case EQUALS_IGNORE_CASE: + filters.add(new ApplicationFilter.ApplicationIdEqualsFilter(cleanNameFilter)); + break; + case CONTAINS: + default: + filters.add(new ApplicationFilter.ApplicationIdContainsFilter(cleanNameFilter)); + break; + } + } + builder.addFilters(filters); + return builder.build(); + } + /** * Return the availability (i.e. discoverable registration) status of a service. */ diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramLifecycleService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramLifecycleService.java index 8ee093574b07..94d491aa2a62 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramLifecycleService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramLifecycleService.java @@ -69,6 +69,10 @@ import io.cdap.cdap.proto.ProgramType; import io.cdap.cdap.proto.RunCountResult; import io.cdap.cdap.proto.RunRecord; +import io.cdap.cdap.proto.AppSummaryRecord; +import io.cdap.cdap.proto.AppSummaryResponse; +import io.cdap.cdap.api.artifact.ArtifactSummary; +import io.cdap.cdap.internal.app.store.AppSummary; import io.cdap.cdap.proto.id.ApplicationId; import io.cdap.cdap.proto.id.ApplicationReference; import io.cdap.cdap.proto.id.EntityId; @@ -100,6 +104,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -144,6 +149,7 @@ public class ProgramLifecycleService { private final int maxConcurrentLaunching; private final int defaultStopTimeoutSecs; private final int batchSize; + private final int summaryBatchSize; private final boolean userProgramLaunchDisabled; @@ -170,6 +176,7 @@ public class ProgramLifecycleService { this.userProgramLaunchDisabled = cConf.getBoolean( Constants.AppFabric.USER_PROGRAM_LAUNCH_DISABLED, false); this.batchSize = cConf.getInt(Constants.AppFabric.STREAMING_BATCH_SIZE); + this.summaryBatchSize = cConf.getInt(Constants.AppFabric.SUMMARY_STREAMING_BATCH_SIZE); this.profileService = profileService; this.preferencesService = preferencesService; this.provisionerNotifier = provisionerNotifier; @@ -177,6 +184,127 @@ public class ProgramLifecycleService { this.flowControlService = flowControlService; } + /** + * Returns a summary of applications in the namespace, including metadata, run status, and run + * counts. This method is optimized to avoid full deserialization of application specifications. + * + * @param request the scan request + * @return the response with pipelines and next page token + */ + public AppSummaryResponse getAppSummaries(final ScanApplicationsRequest request) + throws Exception { + List appSummaries = new ArrayList<>(); + store.scanApplicationSummaries(request, summaryBatchSize, appSummaries::add); + if (appSummaries.isEmpty()) { + return new AppSummaryResponse(Collections.emptyList(), null); + } + + List visibleAppSummaries = filterVisibleSummaries(appSummaries); + List visibleProgramReferences = visibleAppSummaries.stream() + .map(AppSummary::getPrimaryProgram) + .filter(Objects::nonNull) + .map(ProgramId::getProgramReference) + .collect(Collectors.toList()); + + Map runCounts = fetchRunCounts(visibleProgramReferences); + Map latestRuns = fetchLatestRuns(visibleProgramReferences); + List pipelines = createResponseRecords(visibleAppSummaries, runCounts, + latestRuns); + + String nextPageToken = generateNextPageToken(appSummaries); + return new AppSummaryResponse(pipelines, nextPageToken); + } + + private List filterVisibleSummaries(List appSummaries) { + Set allProgramIds = appSummaries.stream() + .map(AppSummary::getPrimaryProgram) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + Set visiblePrograms = allProgramIds.isEmpty() + ? Collections.emptySet() + : accessEnforcer.isVisible(allProgramIds, authenticationContext.getPrincipal()); + + return appSummaries.stream() + .filter(summary -> { + ProgramId programId = summary.getPrimaryProgram(); + return programId == null || visiblePrograms.contains(programId); + }) + .collect(Collectors.toList()); + } + + private Map fetchRunCounts(List programRefs) + throws Exception { + Map runCounts = new HashMap<>(); + if (programRefs.isEmpty()) { + return runCounts; + } + + for (RunCountResult countResult : getProgramTotalRunCounts(programRefs)) { + if (countResult.getException() == null) { + runCounts.put(countResult.getProgramReference(), countResult.getCount()); + } + } + return runCounts; + } + + private Map fetchLatestRuns(List programRefs) + throws Exception { + Map latestRuns = new HashMap<>(); + if (programRefs.isEmpty()) { + return latestRuns; + } + + List histories = getRunRecords( + programRefs, ProgramRunStatus.ALL, 0, Long.MAX_VALUE, 1); + for (ProgramHistory history : histories) { + if (history.getException() == null && !history.getRuns().isEmpty()) { + latestRuns.put(history.getProgramId().getProgramReference(), history.getRuns().get(0)); + } + } + return latestRuns; + } + + private List createResponseRecords( + List visibleAppSummaries, + Map runCounts, + Map latestRuns) { + return visibleAppSummaries.stream().map(summary -> { + ProgramId programId = summary.getPrimaryProgram(); + ArtifactSummary artifactSummary = ArtifactSummary.from(summary.getArtifactId()); + if (programId == null) { + return new AppSummaryRecord( + summary.getAppId().getApplication(), + summary.getAppId().getVersion(), + summary.getDescription(), + artifactSummary, + 0L, + null + ); + } + + ProgramReference programRef = programId.getProgramReference(); + return new AppSummaryRecord( + summary.getAppId().getApplication(), + summary.getAppId().getVersion(), + summary.getDescription(), + artifactSummary, + runCounts.getOrDefault(programRef, 0L), + latestRuns.get(programRef) + ); + }).collect(Collectors.toList()); + } + + private String generateNextPageToken(List appSummaries) { + ProgramId lastProgramId = appSummaries.get(appSummaries.size() - 1).getPrimaryProgram(); + if (lastProgramId == null) { + return null; + } + + return lastProgramId.getApplication() + + EntityId.IDSTRING_PART_SEPARATOR + + lastProgramId.getVersion(); + } + /** * Returns the status of the latest version program. * diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java index b610fba5b08b..f3c9caf36ad7 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java @@ -3134,10 +3134,22 @@ public static final class Cursor { } } - private static final class AppScanEntry implements Map.Entry { + static final class AppScanEntry implements Map.Entry { private static final String SPEC_KEY = "spec"; private static final String ARTIFACT_ID_KEY = "artifactId"; + private static final String DESCRIPTION_KEY = "description"; + private static final String WORKFLOWS_KEY = "workflows"; + private static final String SPARKS_KEY = "sparks"; + private static final String MAPREDUCE_KEY = "mapReduces"; + private static final String SERVICES_KEY = "services"; + private static final String WORKERS_KEY = "workers"; + private static final Map KEY_TO_TYPE_MAP = ImmutableMap.of( + WORKFLOWS_KEY, ProgramType.WORKFLOW, + SPARKS_KEY, ProgramType.SPARK, + MAPREDUCE_KEY, ProgramType.MAPREDUCE, + SERVICES_KEY, ProgramType.SERVICE, + WORKERS_KEY, ProgramType.WORKER); private final ApplicationId appId; private final String rawAppMeta; @@ -3150,6 +3162,7 @@ private static final class AppScanEntry implements Map.Entry getArtifactId() { return Optional.empty(); } + /** + * Returns the {@link AppSummary} by parsing the raw application metadata JSON string. This + * method uses a streaming parser to avoid full deserialization of the application + * specification. + * + * @return the {@link AppSummary} if parsing is successful + * @throws IllegalStateException if parsing fails + */ + public Optional getAppSummary() { + if (appSummary != null) { + return Optional.of(appSummary); + } + + ArtifactId parsedArtifactId = artifactId; + String description = null; + ProgramId primaryProgram = null; + + try (JsonReader reader = new JsonReader(new StringReader(rawAppMeta))) { + reader.beginObject(); + advanceToSpec(reader); + while (reader.hasNext()) { + String name = reader.nextName(); + ProgramType matchedProgramType = KEY_TO_TYPE_MAP.get(name); + + if (ARTIFACT_ID_KEY.equals(name)) { + parsedArtifactId = GSON.fromJson(reader, ArtifactId.class); + artifactId = parsedArtifactId; + } else if (DESCRIPTION_KEY.equals(name)) { + description = reader.nextString(); + } else if (matchedProgramType != null) { + primaryProgram = extractPrimaryProgram(reader, matchedProgramType, primaryProgram); + } else { + reader.skipValue(); + } + + if (isParseComplete(parsedArtifactId, description, primaryProgram)) { + break; + } + } + } catch (IOException | IllegalStateException e) { + throw new IllegalStateException("Failed to parse app summary from app meta", e); + } + + appSummary = new AppSummary(appId, parsedArtifactId, description, primaryProgram); + return Optional.of(appSummary); + } + + private void advanceToSpec(JsonReader reader) throws IOException { + while (reader.hasNext()) { + if (SPEC_KEY.equals(reader.nextName())) { + reader.beginObject(); + return; + } + reader.skipValue(); + } + } + + private ProgramId extractPrimaryProgram(JsonReader reader, ProgramType type, + ProgramId currentPrimary) + throws IOException { + reader.beginObject(); + ProgramId result = currentPrimary; + while (reader.hasNext()) { + String programName = reader.nextName(); + if (result == null) { + result = appId.program(type, programName); + } + reader.skipValue(); + } + + reader.endObject(); + return result; + } + + private boolean isParseComplete(ArtifactId artifactId, String description, + ProgramId programId) { + return artifactId != null && description != null && programId != null; + } + @Override public ApplicationMeta setValue(ApplicationMeta value) { throw new UnsupportedOperationException(); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppSummary.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppSummary.java new file mode 100644 index 000000000000..7e1e902b41e4 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppSummary.java @@ -0,0 +1,76 @@ +/* + * Copyright © 2026 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.store; + +import io.cdap.cdap.api.artifact.ArtifactId; +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.ProgramId; + +/** + * A summary of an application, containing its artifact ID, description, and primary program. + */ +public class AppSummary { + + private final ApplicationId appId; + private final ArtifactId artifactId; + private final String description; + private final ProgramId primaryProgram; + + /** + * Constructs an instance of {@link AppSummary}. + * + * @param appId the application ID + * @param artifactId the artifact ID of the application + * @param description the description of the application + * @param primaryProgram the primary program of the application, or null if none + */ + public AppSummary(ApplicationId appId, ArtifactId artifactId, String description, + ProgramId primaryProgram) { + this.appId = appId; + this.artifactId = artifactId; + this.description = description; + this.primaryProgram = primaryProgram; + } + + /** + * Returns the artifact ID of the application. + */ + public ArtifactId getArtifactId() { + return artifactId; + } + + /** + * Returns the description of the application. + */ + public String getDescription() { + return description; + } + + /** + * Returns the primary program of the application. + */ + public ProgramId getPrimaryProgram() { + return primaryProgram; + } + + /** + * Returns the application ID. + */ + public ApplicationId getAppId() { + return appId; + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/DefaultStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/DefaultStore.java index 57fb3c102cef..e27447b70c10 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/DefaultStore.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/DefaultStore.java @@ -89,6 +89,8 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.function.Function; +import java.util.function.BiFunction; import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.twill.api.RunId; @@ -810,7 +812,29 @@ public void scanApplications(int txBatchSize, @Override public boolean scanApplications(ScanApplicationsRequest request, int txBatchSize, BiConsumer consumer) { + return scanApplicationsInternal(request, txBatchSize, + Optional::of, + entry -> consumer.accept(entry.getKey(), entry.getValue()), + (req, batch) -> scanApplicationsWithReorder(req, batch, consumer)); + } + + @Override + public boolean scanApplicationSummaries(ScanApplicationsRequest request, int txBatchSize, + Consumer consumer) { + return scanApplicationsInternal(request, txBatchSize, + entry -> ((AppMetadataStore.AppScanEntry) entry).getAppSummary(), + consumer, + (req, batch) -> { + // TODO: Implement in-memory reordering for DESC sort if needed. + throw new UnsupportedOperationException( + "DESC sort order is not supported yet for summaries"); + }); + } + private boolean scanApplicationsInternal(ScanApplicationsRequest request, int txBatchSize, + Function, Optional> extractor, + Consumer consumer, + BiFunction fallback) { AtomicReference requestRef = new AtomicReference<>(request); AtomicReference lastKey = new AtomicReference<>(); AtomicInteger currentLimit = new AtomicInteger(request.getLimit()); @@ -823,7 +847,7 @@ public boolean scanApplications(ScanApplicationsRequest request, int txBatchSize getAppMetadataStore(context).scanApplications(requestRef.get(), entry -> { lastKey.set(entry.getKey()); currentLimit.decrementAndGet(); - consumer.accept(entry.getKey(), entry.getValue()); + extractor.apply(entry).ifPresent(consumer); return count.incrementAndGet() < txBatchSize && currentLimit.get() > 0; }); }); @@ -831,7 +855,7 @@ public boolean scanApplications(ScanApplicationsRequest request, int txBatchSize if (requestRef.get().getSortOrder() != SortOrder.DESC || count.get() != 0) { throw e; } - return scanApplicationsWithReorder(requestRef.get(), txBatchSize, consumer); + return fallback.apply(requestRef.get(), txBatchSize); } if (lastKey.get() == null) { diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/ProgramLifecycleHttpHandlerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/ProgramLifecycleHttpHandlerTest.java index e88cc83c14f9..e4940a4ae425 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/ProgramLifecycleHttpHandlerTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/ProgramLifecycleHttpHandlerTest.java @@ -72,6 +72,8 @@ import io.cdap.cdap.proto.ProgramType; import io.cdap.cdap.proto.ProtoConstraint; import io.cdap.cdap.proto.ProtoTrigger; +import io.cdap.cdap.proto.AppSummaryRecord; +import io.cdap.cdap.proto.AppSummaryResponse; import io.cdap.cdap.proto.RunRecord; import io.cdap.cdap.proto.ScheduleDetail; import io.cdap.cdap.proto.ServiceInstances; @@ -1994,6 +1996,109 @@ private void verifyRuntimeArgs(String url) throws Exception { Assert.assertEquals(0, argsRead.size()); } + @Test + public void testPipelineSummary() throws Exception { + deploy(AppWithWorkflow.class, 200, Constants.Gateway.API_VERSION_3_TOKEN, TEST_NAMESPACE1); + ApplicationDetail appDetails = getAppDetails(TEST_NAMESPACE1, AppWithWorkflow.NAME); + ProgramId programId = new NamespaceId(TEST_NAMESPACE1) + .app(AppWithWorkflow.NAME, appDetails.getAppVersion()) + .workflow(AppWithWorkflow.SampleWorkflow.NAME); + + String path = "apps/summary"; + HttpResponse response = doGet(getVersionedApiPath(path, TEST_NAMESPACE1)); + Assert.assertEquals(200, response.getResponseCode()); + + AppSummaryResponse summaryResponse = GSON.fromJson(response.getResponseBodyAsString(), + AppSummaryResponse.class); + Assert.assertNotNull(summaryResponse); + Assert.assertFalse(summaryResponse.getApps().isEmpty()); + + AppSummaryRecord appRecord = summaryResponse.getApps().stream() + .filter(r -> r.getName().equals(AppWithWorkflow.NAME)) + .findFirst().orElse(null); + Assert.assertNotNull(appRecord); + Assert.assertEquals(AppWithWorkflow.NAME, appRecord.getName()); + Assert.assertEquals(0L, appRecord.getTotalRunCount()); + Assert.assertNull(appRecord.getLatestRun()); + + startProgram(programId, 200); + waitState(programId, RUNNING); + + stopProgram(Id.Program.fromEntityId(programId), 200); + waitState(programId, STOPPED); + + response = doGet(getVersionedApiPath(path, TEST_NAMESPACE1)); + Assert.assertEquals(200, response.getResponseCode()); + summaryResponse = GSON.fromJson(response.getResponseBodyAsString(), AppSummaryResponse.class); + appRecord = summaryResponse.getApps().stream() + .filter(r -> r.getName().equals(AppWithWorkflow.NAME)) + .findFirst().orElse(null); + Assert.assertNotNull(appRecord); + Assert.assertTrue(appRecord.getTotalRunCount() >= 1); + Assert.assertNotNull(appRecord.getLatestRun()); + + path = "apps/summary?nameFilter=" + AppWithWorkflow.NAME; + response = doGet(getVersionedApiPath(path, TEST_NAMESPACE1)); + Assert.assertEquals(200, response.getResponseCode()); + summaryResponse = GSON.fromJson(response.getResponseBodyAsString(), AppSummaryResponse.class); + Assert.assertEquals(1, summaryResponse.getApps().size()); + Assert.assertEquals(AppWithWorkflow.NAME, summaryResponse.getApps().get(0).getName()); + + path = "apps/summary?pageSize=1"; + response = doGet(getVersionedApiPath(path, TEST_NAMESPACE1)); + Assert.assertEquals(200, response.getResponseCode()); + summaryResponse = GSON.fromJson(response.getResponseBodyAsString(), AppSummaryResponse.class); + Assert.assertEquals(1, summaryResponse.getApps().size()); + } + + @Test + public void testPipelineSummaryWithFilters() throws Exception { + Id.Artifact artifactId1 = Id.Artifact.from(Id.Namespace.from(TEST_NAMESPACE1), "art1", "1.0.0"); + addAppArtifact(artifactId1, AppWithWorkflow.class); + AppRequest request1 = new AppRequest<>(new ArtifactSummary("art1", "1.0.0"), null); + ApplicationId appId1 = new ApplicationId(TEST_NAMESPACE1, "app1"); + Assert.assertEquals(200, deploy(appId1, request1).getResponseCode()); + + Id.Artifact artifactId2 = Id.Artifact.from(Id.Namespace.from(TEST_NAMESPACE1), "art2", "1.0.0"); + addAppArtifact(artifactId2, AppWithWorkflow.class); + AppRequest request2 = new AppRequest<>(new ArtifactSummary("art2", "1.0.0"), null); + ApplicationId appId2 = new ApplicationId(TEST_NAMESPACE1, "app2"); + Assert.assertEquals(200, deploy(appId2, request2).getResponseCode()); + + String path = "apps/summary?artifactName=art1"; + HttpResponse response = doGet(getVersionedApiPath(path, TEST_NAMESPACE1)); + Assert.assertEquals(200, response.getResponseCode()); + AppSummaryResponse summaryResponse = GSON.fromJson(response.getResponseBodyAsString(), + AppSummaryResponse.class); + Assert.assertEquals(1, summaryResponse.getApps().size()); + Assert.assertEquals("app1", summaryResponse.getApps().get(0).getName()); + + path = "apps/summary?artifactName=art1,art2"; + response = doGet(getVersionedApiPath(path, TEST_NAMESPACE1)); + Assert.assertEquals(200, response.getResponseCode()); + summaryResponse = GSON.fromJson(response.getResponseBodyAsString(), AppSummaryResponse.class); + Assert.assertEquals(2, summaryResponse.getApps().size()); + + path = "apps/summary?artifactVersion=1.0.0"; + response = doGet(getVersionedApiPath(path, TEST_NAMESPACE1)); + Assert.assertEquals(200, response.getResponseCode()); + summaryResponse = GSON.fromJson(response.getResponseBodyAsString(), AppSummaryResponse.class); + Assert.assertEquals(2, summaryResponse.getApps().size()); + + path = "apps/summary?nameFilter=APP1&nameFilterType=EQUALS_IGNORE_CASE"; + response = doGet(getVersionedApiPath(path, TEST_NAMESPACE1)); + Assert.assertEquals(200, response.getResponseCode()); + summaryResponse = GSON.fromJson(response.getResponseBodyAsString(), AppSummaryResponse.class); + Assert.assertEquals(1, summaryResponse.getApps().size()); + Assert.assertEquals("app1", summaryResponse.getApps().get(0).getName()); + + path = "apps/summary?nameFilter=APP1&nameFilterType=EQUALS"; + response = doGet(getVersionedApiPath(path, TEST_NAMESPACE1)); + Assert.assertEquals(200, response.getResponseCode()); + summaryResponse = GSON.fromJson(response.getResponseBodyAsString(), AppSummaryResponse.class); + Assert.assertEquals(0, summaryResponse.getApps().size()); + } + private int getResponseStatusCode(String json, int position) { return ((List) GSON.fromJson(json, new TypeToken>() { }.getType())) .get(position).getStatusCode(); diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/DefaultStoreTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/DefaultStoreTest.java index 29054c9b0c62..fe44e93e6d36 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/DefaultStoreTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/DefaultStoreTest.java @@ -44,6 +44,7 @@ import io.cdap.cdap.api.workflow.NodeStatus; import io.cdap.cdap.app.program.ProgramDescriptor; import io.cdap.cdap.app.runtime.ProgramController; +import io.cdap.cdap.app.store.ApplicationFilter; import io.cdap.cdap.app.store.ScanApplicationsRequest; import io.cdap.cdap.app.store.Store; import io.cdap.cdap.common.ApplicationNotFoundException; @@ -1114,6 +1115,101 @@ protected void testScanApplications(Store store) throws ConflictException { Assert.assertEquals(latestApps.subList(firstPageSize, latestApps.size()), restartApps); } + @Test + public void testScanApplicationSummaries() throws Exception { + testScanApplicationSummaries(store); + } + + protected void testScanApplicationSummaries(Store store) throws Exception { + ApplicationSpecification appSpec = Specifications.from(new AllProgramsApp()); + + int count = 10; + for (int i = 0; i < count; i++) { + String appName = "summaryTest" + i; + ApplicationMeta appMeta = new ApplicationMeta(appName, appSpec, + new ChangeDetail(null, null, null, + System.currentTimeMillis())); + store.addLatestApplication(new ApplicationId(NamespaceId.DEFAULT.getNamespace(), appName), + appMeta); + } + + List summaries = new ArrayList<>(); + store.scanApplicationSummaries(ScanApplicationsRequest.builder().build(), + 5, summaries::add); + + Assert.assertEquals(count, summaries.size()); + + // Verify fields of one summary + AppSummary summary = summaries.get(0); + Assert.assertNotNull(summary.getAppId()); + Assert.assertNotNull(summary.getArtifactId()); + + // Test pagination + List page1 = new ArrayList<>(); + boolean limitReached = store.scanApplicationSummaries( + ScanApplicationsRequest.builder().setLimit(5).build(), + 5, page1::add); + Assert.assertTrue(limitReached); + Assert.assertEquals(5, page1.size()); + + // Test sorting (should throw UnsupportedOperationException for DESC if not supported) + try { + List descSummaries = new ArrayList<>(); + store.scanApplicationSummaries( + ScanApplicationsRequest.builder().setSortOrder(SortOrder.DESC).build(), + 10, descSummaries::add); + + Assert.assertEquals(count, descSummaries.size()); + List ascNames = summaries.stream() + .map(s -> s.getAppId().getApplication()) + .collect(Collectors.toList()); + List descNames = descSummaries.stream() + .map(s -> s.getAppId().getApplication()) + .collect(Collectors.toList()); + Assert.assertEquals(Lists.reverse(ascNames), descNames); + } catch (UnsupportedOperationException e) { + // Expected if not supported + } + } + + @Test + public void testScanApplicationSummariesWithArtifactFilter() throws Exception { + testScanApplicationSummariesWithArtifactFilter(store); + } + + protected void testScanApplicationSummariesWithArtifactFilter(Store store) throws Exception { + ArtifactId artifactId1 = NamespaceId.DEFAULT.artifact("artifact1", "1.0").toApiArtifactId(); + ArtifactId artifactId2 = NamespaceId.DEFAULT.artifact("artifact2", "1.0").toApiArtifactId(); + + ApplicationSpecification spec1 = createDummyAppSpec("app1", ApplicationId.DEFAULT_VERSION, + artifactId1); + ApplicationSpecification spec2 = createDummyAppSpec("app2", ApplicationId.DEFAULT_VERSION, + artifactId2); + + ApplicationMeta appMeta1 = new ApplicationMeta("app1", spec1, + new ChangeDetail(null, null, null, + System.currentTimeMillis())); + ApplicationMeta appMeta2 = new ApplicationMeta("app2", spec2, + new ChangeDetail(null, null, null, + System.currentTimeMillis())); + + store.addLatestApplication(NamespaceId.DEFAULT.app("app1"), appMeta1); + store.addLatestApplication(NamespaceId.DEFAULT.app("app2"), appMeta2); + + List summaries = new ArrayList<>(); + ApplicationFilter filter = new ApplicationFilter.ArtifactNamesInFilter( + Collections.singleton("artifact1")); + + store.scanApplicationSummaries(ScanApplicationsRequest.builder() + .setNamespaceId(NamespaceId.DEFAULT) + .addFilters(Collections.singletonList(filter)) + .build(), + 5, summaries::add); + + Assert.assertEquals(1, summaries.size()); + Assert.assertEquals("app1", summaries.get(0).getAppId().getApplication()); + } + @Test public void testScanApplicationsWithNamespace() throws ConflictException { testScanApplicationsWithNamespace(store); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index c9213c6cde93..34b7bb9e356c 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -252,6 +252,7 @@ public static final class AppFabric { public static final String PROGRAM_JVM_OPTS_PREFIX = "app.program.jvm.opts."; public static final String BACKLOG_CONNECTIONS = "app.connection.backlog"; public static final String STREAMING_BATCH_SIZE = "app.streaming.batch.size"; + public static final String SUMMARY_STREAMING_BATCH_SIZE = "app.summary.streaming.batch.size"; public static final String EXEC_THREADS = "app.exec.threads"; public static final String BOSS_THREADS = "app.boss.threads"; public static final String WORKER_THREADS = "app.worker.threads"; diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index f3014da21223..43d8c55e08dc 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -1806,6 +1806,14 @@ + + app.summary.streaming.batch.size + 30 + + Batch size for scanning applications for the Summary API. + + + app.exec.threads 20 diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/AppSummaryRecord.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/AppSummaryRecord.java new file mode 100644 index 000000000000..071a8b5c4a28 --- /dev/null +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/AppSummaryRecord.java @@ -0,0 +1,98 @@ +/* + * Copyright © 2026 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.proto; + +import io.cdap.cdap.api.artifact.ArtifactSummary; +import javax.annotation.Nullable; + +/** + * A summary record of an application, containing its metadata, run count, and latest run. + */ +public class AppSummaryRecord { + + private final String name; + private final String version; + private final String description; + private final ArtifactSummary artifact; + private final long totalRunCount; + @Nullable + private final RunRecord latestRun; + + /** + * Constructs an instance of {@link AppSummaryRecord}. + * + * @param name the application name + * @param version the application version + * @param description the application description + * @param artifact the artifact summary + * @param totalRunCount total number of runs for this application + * @param latestRun the latest run record, or null if none + */ + public AppSummaryRecord(String name, String version, String description, + ArtifactSummary artifact, long totalRunCount, + @Nullable RunRecord latestRun) { + this.name = name; + this.version = version; + this.description = description; + this.artifact = artifact; + this.totalRunCount = totalRunCount; + this.latestRun = latestRun; + } + + /** + * Returns the application name. + */ + public String getName() { + return name; + } + + /** + * Returns the application version. + */ + public String getVersion() { + return version; + } + + /** + * Returns the application description. + */ + public String getDescription() { + return description; + } + + /** + * Returns the artifact summary. + */ + public ArtifactSummary getArtifact() { + return artifact; + } + + /** + * Returns the total run count. + */ + public long getTotalRunCount() { + return totalRunCount; + } + + /** + * Returns the latest run record, or null if there are no runs. + */ + @Nullable + public RunRecord getLatestRun() { + return latestRun; + } +} diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/AppSummaryResponse.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/AppSummaryResponse.java new file mode 100644 index 000000000000..8a37663c9663 --- /dev/null +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/AppSummaryResponse.java @@ -0,0 +1,53 @@ +/* + * Copyright © 2026 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.proto; + +import java.util.List; + +/** + * The response payload for the application summary endpoint. + */ +public class AppSummaryResponse { + + private final List apps; + private final String nextPageToken; + + /** + * Constructs an instance of {@link AppSummaryResponse}. + * + * @param apps list of application summaries + * @param nextPageToken token for the next page of results, or null if none + */ + public AppSummaryResponse(List apps, String nextPageToken) { + this.apps = apps; + this.nextPageToken = nextPageToken; + } + + /** + * Returns the list of application summaries. + */ + public List getApps() { + return apps; + } + + /** + * Returns the token for the next page of results, or null if there are no more pages. + */ + public String getNextPageToken() { + return nextPageToken; + } +}