diff --git a/pom.xml b/pom.xml index 9a46da9..ddefc2e 100644 --- a/pom.xml +++ b/pom.xml @@ -61,6 +61,10 @@ io.quarkus quarkus-rest-jackson + + io.quarkus + quarkus-smallrye-fault-tolerance + io.quarkus quarkus-container-image-docker diff --git a/src/main/java/io/cryostat/reports/HeapDumpFormData.java b/src/main/java/io/cryostat/reports/HeapDumpFormData.java new file mode 100644 index 0000000..c8e06bb --- /dev/null +++ b/src/main/java/io/cryostat/reports/HeapDumpFormData.java @@ -0,0 +1,35 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.reports; + +import jakarta.ws.rs.core.MediaType; +import org.jboss.resteasy.reactive.PartType; +import org.jboss.resteasy.reactive.RestForm; +import org.jboss.resteasy.reactive.multipart.FileUpload; + +public class HeapDumpFormData { + @RestForm + @PartType(MediaType.APPLICATION_OCTET_STREAM) + public FileUpload file; + + @RestForm + @PartType(MediaType.TEXT_PLAIN) + public String heapDumpId; + + @RestForm + @PartType(MediaType.TEXT_PLAIN) + public String jvmId; +} diff --git a/src/main/java/io/cryostat/reports/PresignedHeapDumpFormData.java b/src/main/java/io/cryostat/reports/PresignedHeapDumpFormData.java new file mode 100644 index 0000000..b669386 --- /dev/null +++ b/src/main/java/io/cryostat/reports/PresignedHeapDumpFormData.java @@ -0,0 +1,36 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.reports; + +import java.net.URI; + +import jakarta.ws.rs.core.MediaType; +import org.jboss.resteasy.reactive.PartType; +import org.jboss.resteasy.reactive.RestForm; + +public class PresignedHeapDumpFormData { + @RestForm + @PartType(MediaType.TEXT_PLAIN) + public URI uri; + + @RestForm + @PartType(MediaType.TEXT_PLAIN) + public String heapDumpId; + + @RestForm + @PartType(MediaType.TEXT_PLAIN) + public String jvmId; +} diff --git a/src/main/java/io/cryostat/reports/Producers.java b/src/main/java/io/cryostat/reports/Producers.java index 78bf8a4..0a9baaa 100644 --- a/src/main/java/io/cryostat/reports/Producers.java +++ b/src/main/java/io/cryostat/reports/Producers.java @@ -18,6 +18,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; +import io.cryostat.core.diagnostic.HeapDumpReportGenerator; import io.cryostat.core.reports.InterruptibleReportGenerator; import io.cryostat.core.util.RuleFilterParser; import io.cryostat.libcryostat.sys.FileSystem; @@ -40,6 +41,17 @@ InterruptibleReportGenerator produceReportGenerator() { singleThread ? Executors.newSingleThreadExecutor() : ForkJoinPool.commonPool()); } + @Produces + // RequestScoped so that each individual report generation request has its own interruptible + // generator with an independent task queueing thread which dispatches to the shared common pool + @RequestScoped + HeapDumpReportGenerator produceHeapDumpReportGenerator() { + boolean singleThread = + Runtime.getRuntime().availableProcessors() < 2 + || Boolean.getBoolean(ReportResource.SINGLETHREAD_PROPERTY); + return new HeapDumpReportGenerator(singleThread ? Executors.newSingleThreadExecutor() : ForkJoinPool.commonPool()); + } + @Produces @ApplicationScoped RuleFilterParser produceRuleFilterParser() { diff --git a/src/main/java/io/cryostat/reports/ReportResource.java b/src/main/java/io/cryostat/reports/ReportResource.java index 7df22ad..70bb006 100644 --- a/src/main/java/io/cryostat/reports/ReportResource.java +++ b/src/main/java/io/cryostat/reports/ReportResource.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.HttpURLConnection; +import java.net.ProtocolException; import java.net.URISyntaxException; import java.nio.file.Files; import java.nio.file.StandardCopyOption; @@ -43,6 +44,8 @@ import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509TrustManager; +import io.cryostat.core.diagnostic.HeapDumpAnalysis; +import io.cryostat.core.diagnostic.HeapDumpReportGenerator; import io.cryostat.core.reports.InterruptibleReportGenerator; import io.cryostat.core.reports.InterruptibleReportGenerator.AnalysisResult; import io.cryostat.core.util.RuleFilterParser; @@ -68,6 +71,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.eclipse.microprofile.faulttolerance.Bulkhead; import org.jboss.logging.Logger; import org.jboss.resteasy.reactive.multipart.FileUpload; import org.openjdk.jmc.common.io.IOToolkit; @@ -85,6 +89,9 @@ public class ReportResource { @ConfigProperty(name = "io.cryostat.reports.timeout", defaultValue = "29000") String timeoutMs; + @ConfigProperty(name = "io.cryostat.reports.heap-dump-memory-limit", defaultValue = "0") + int heapDumpMemoryLimit; + @ConfigProperty(name = "cryostat.storage.auth-method") Optional storageAuthMethod; @@ -107,6 +114,7 @@ public class ReportResource { Optional storageCertPath; @Inject InterruptibleReportGenerator generator; + @Inject HeapDumpReportGenerator heapDumpGenerator; @Inject RuleFilterParser rfp; @Inject FileSystem fs; @Inject ObjectMapper mapper; @@ -127,6 +135,7 @@ void onStart(@Observes StartupEvent ev) { @Produces(MediaType.TEXT_PLAIN) public void healthCheck() {} + @Bulkhead @Blocking @Path("remote_report") @Produces(MediaType.APPLICATION_JSON) @@ -151,6 +160,67 @@ public String getReportFromPresigned(RoutingContext ctx, @BeanParam PresignedFor logger.debugv("Attempting to download presigned recording from {0}", form.uri); HttpURLConnection httpConn = (HttpURLConnection) form.uri.toURL().openConnection(); + try (var stream = getPresignedObjectStream(httpConn)) { + + Predicate predicate = rfp.parse(form.filter); + Future> evalMapFuture = null; + + evalMapFuture = generator.generateEvalMapInterruptibly(stream, predicate); + long elapsed = System.nanoTime() - start; + ctxHelper(ctx, evalMapFuture); + return mapper.writeValueAsString( + evalMapFuture.get(timeout - elapsed, TimeUnit.NANOSECONDS)); + } catch (ExecutionException | InterruptedException e) { + logger.error(e); + throw new InternalServerErrorException(e); + } catch (TimeoutException e) { + logger.error(e); + throw new ServerErrorException(Response.Status.GATEWAY_TIMEOUT, e); + } catch (Exception e) { + logger.error(e); + throw e; + } finally { + httpConn.disconnect(); + } + } + + @Blocking + @Bulkhead + @Path("report") + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.MULTIPART_FORM_DATA) + @POST + public String getReport(RoutingContext ctx, @BeanParam RecordingFormData form) + throws IOException { + FileUpload upload = form.file; + + Pair> uploadResult = handleUpload(upload); + java.nio.file.Path file = uploadResult.getLeft(); + long timeout = TimeUnit.MILLISECONDS.toNanos(Long.parseLong(timeoutMs)); + long start = uploadResult.getRight().getLeft(); + long elapsed = uploadResult.getRight().getRight(); + + if (StringUtils.isNotBlank(form.filter)) { + logger.debugv("Received request with filter: {0}", form.filter); + } + Predicate predicate = rfp.parse(form.filter); + Future> evalMapFuture = null; + + try (var stream = fs.newInputStream(file)) { + evalMapFuture = generator.generateEvalMapInterruptibly(stream, predicate); + ctxHelper(ctx, evalMapFuture); + return mapper.writeValueAsString( + evalMapFuture.get(timeout - elapsed, TimeUnit.NANOSECONDS)); + } catch (ExecutionException | InterruptedException e) { + throw new InternalServerErrorException(e); + } catch (TimeoutException e) { + throw new ServerErrorException(Response.Status.GATEWAY_TIMEOUT, e); + } finally { + cleanupHelper(evalMapFuture, file, upload.fileName(), start); + } + } + + private InputStream getPresignedObjectStream(HttpURLConnection httpConn) throws IOException, ProtocolException { httpConn.setRequestMethod("GET"); if (httpConn instanceof HttpsURLConnection) { HttpsURLConnection httpsConn = (HttpsURLConnection) httpConn; @@ -194,18 +264,40 @@ public String getReportFromPresigned(RoutingContext ctx, @BeanParam PresignedFor "Authorization", String.format("%s %s", storageAuthMethod.get(), storageAuth.get())); } - assertContentLength(httpConn.getContentLengthLong()); - try (var stream = httpConn.getInputStream()) { - - Predicate predicate = rfp.parse(form.filter); - Future> evalMapFuture = null; + return httpConn.getInputStream(); + } - evalMapFuture = generator.generateEvalMapInterruptibly(stream, predicate); + @Blocking + @Bulkhead + @Path("heapdump/remote_report") + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.MULTIPART_FORM_DATA) + @POST + public String getHeapDumpReportFromPresigned(RoutingContext ctx, @BeanParam PresignedHeapDumpFormData form) + throws IOException, URISyntaxException { + // TODO queue these requests so we don't overload ourselves, in particular by reading + // multiple Heap Dump files into memory at once for analysis. We should process these serially + // from the queue. If we are getting overloaded then our response time to each subsequent + // request will continue to grow unbounded, so at some point we should stop accepting + // requests when the queue is too long. + // Since this is a @Blocking method that runs on a worker thread pool, can we implement this + // serial queueing behaviour by simply synchronizing on a shared singleton resource ex. the + // generator instance? + // A better long-term solution would be to use a shared messaging queue between Cryostat and + // the report generators, so that Cryostat can put up a URL for a presigned recording to be + // processed and a free report generator can claim that work item and then post back the + // report response + long timeout = TimeUnit.MILLISECONDS.toNanos(Long.parseLong(timeoutMs)); + long start = System.nanoTime(); + HttpURLConnection httpConn = (HttpURLConnection) form.uri.toURL().openConnection(); + try (var stream = getPresignedObjectStream(httpConn)) { + Future evalFuture = null; + evalFuture = heapDumpGenerator.generate(stream, heapDumpMemoryLimit); long elapsed = System.nanoTime() - start; - ctxHelper(ctx, evalMapFuture); + ctxHelper(ctx, evalFuture); return mapper.writeValueAsString( - evalMapFuture.get(timeout - elapsed, TimeUnit.NANOSECONDS)); + evalFuture.get(timeout - elapsed, TimeUnit.NANOSECONDS)); } catch (ExecutionException | InterruptedException e) { logger.error(e); throw new InternalServerErrorException(e); @@ -221,11 +313,12 @@ public String getReportFromPresigned(RoutingContext ctx, @BeanParam PresignedFor } @Blocking - @Path("report") + @Bulkhead + @Path("heapdump/report") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.MULTIPART_FORM_DATA) @POST - public String getReport(RoutingContext ctx, @BeanParam RecordingFormData form) + public String getHeapDumpReport(RoutingContext ctx, @BeanParam HeapDumpFormData form) throws IOException { FileUpload upload = form.file; @@ -235,23 +328,19 @@ public String getReport(RoutingContext ctx, @BeanParam RecordingFormData form) long start = uploadResult.getRight().getLeft(); long elapsed = uploadResult.getRight().getRight(); - if (StringUtils.isNotBlank(form.filter)) { - logger.debugv("Received request with filter: {0}", form.filter); - } - Predicate predicate = rfp.parse(form.filter); - Future> evalMapFuture = null; + Future evalFuture = null; try (var stream = fs.newInputStream(file)) { - evalMapFuture = generator.generateEvalMapInterruptibly(stream, predicate); - ctxHelper(ctx, evalMapFuture); + evalFuture = heapDumpGenerator.generate(stream, heapDumpMemoryLimit); + ctxHelper(ctx, evalFuture); return mapper.writeValueAsString( - evalMapFuture.get(timeout - elapsed, TimeUnit.NANOSECONDS)); + evalFuture.get(timeout - elapsed, TimeUnit.NANOSECONDS)); } catch (ExecutionException | InterruptedException e) { throw new InternalServerErrorException(e); } catch (TimeoutException e) { throw new ServerErrorException(Response.Status.GATEWAY_TIMEOUT, e); } finally { - cleanupHelper(evalMapFuture, file, upload.fileName(), start); + cleanupHelper(evalFuture, file, upload.fileName(), start); } }