Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-fault-tolerance</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-container-image-docker</artifactId>
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/io/cryostat/reports/HeapDumpFormData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright The Cryostat Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cryostat.reports;

import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.PartType;
import org.jboss.resteasy.reactive.RestForm;
import org.jboss.resteasy.reactive.multipart.FileUpload;

public class HeapDumpFormData {
@RestForm
@PartType(MediaType.APPLICATION_OCTET_STREAM)
public FileUpload file;

@RestForm
@PartType(MediaType.TEXT_PLAIN)
public String heapDumpId;

@RestForm
@PartType(MediaType.TEXT_PLAIN)
public String jvmId;
}
36 changes: 36 additions & 0 deletions src/main/java/io/cryostat/reports/PresignedHeapDumpFormData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright The Cryostat Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cryostat.reports;

import java.net.URI;

import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.PartType;
import org.jboss.resteasy.reactive.RestForm;

public class PresignedHeapDumpFormData {
@RestForm
@PartType(MediaType.TEXT_PLAIN)
public URI uri;

@RestForm
@PartType(MediaType.TEXT_PLAIN)
public String heapDumpId;

@RestForm
@PartType(MediaType.TEXT_PLAIN)
public String jvmId;
}
12 changes: 12 additions & 0 deletions src/main/java/io/cryostat/reports/Producers.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

import io.cryostat.core.diagnostic.HeapDumpReportGenerator;
import io.cryostat.core.reports.InterruptibleReportGenerator;
import io.cryostat.core.util.RuleFilterParser;
import io.cryostat.libcryostat.sys.FileSystem;
Expand All @@ -40,6 +41,17 @@ InterruptibleReportGenerator produceReportGenerator() {
singleThread ? Executors.newSingleThreadExecutor() : ForkJoinPool.commonPool());
}

@Produces
// RequestScoped so that each individual report generation request has its own interruptible
// generator with an independent task queueing thread which dispatches to the shared common pool
@RequestScoped
HeapDumpReportGenerator produceHeapDumpReportGenerator() {
boolean singleThread =
Runtime.getRuntime().availableProcessors() < 2
|| Boolean.getBoolean(ReportResource.SINGLETHREAD_PROPERTY);
return new HeapDumpReportGenerator(singleThread ? Executors.newSingleThreadExecutor() : ForkJoinPool.commonPool());
}

@Produces
@ApplicationScoped
RuleFilterParser produceRuleFilterParser() {
Expand Down
127 changes: 108 additions & 19 deletions src/main/java/io/cryostat/reports/ReportResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.ProtocolException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
Expand All @@ -43,6 +44,8 @@
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;

import io.cryostat.core.diagnostic.HeapDumpAnalysis;
import io.cryostat.core.diagnostic.HeapDumpReportGenerator;
import io.cryostat.core.reports.InterruptibleReportGenerator;
import io.cryostat.core.reports.InterruptibleReportGenerator.AnalysisResult;
import io.cryostat.core.util.RuleFilterParser;
Expand All @@ -68,6 +71,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.jboss.logging.Logger;
import org.jboss.resteasy.reactive.multipart.FileUpload;
import org.openjdk.jmc.common.io.IOToolkit;
Expand All @@ -85,6 +89,9 @@ public class ReportResource {
@ConfigProperty(name = "io.cryostat.reports.timeout", defaultValue = "29000")
String timeoutMs;

@ConfigProperty(name = "io.cryostat.reports.heap-dump-memory-limit", defaultValue = "0")
int heapDumpMemoryLimit;

@ConfigProperty(name = "cryostat.storage.auth-method")
Optional<String> storageAuthMethod;

Expand All @@ -107,6 +114,7 @@ public class ReportResource {
Optional<java.nio.file.Path> storageCertPath;

@Inject InterruptibleReportGenerator generator;
@Inject HeapDumpReportGenerator heapDumpGenerator;
@Inject RuleFilterParser rfp;
@Inject FileSystem fs;
@Inject ObjectMapper mapper;
Expand All @@ -127,6 +135,7 @@ void onStart(@Observes StartupEvent ev) {
@Produces(MediaType.TEXT_PLAIN)
public void healthCheck() {}

@Bulkhead
@Blocking
@Path("remote_report")
@Produces(MediaType.APPLICATION_JSON)
Expand All @@ -151,6 +160,67 @@ public String getReportFromPresigned(RoutingContext ctx, @BeanParam PresignedFor

logger.debugv("Attempting to download presigned recording from {0}", form.uri);
HttpURLConnection httpConn = (HttpURLConnection) form.uri.toURL().openConnection();
try (var stream = getPresignedObjectStream(httpConn)) {

Predicate<IRule> predicate = rfp.parse(form.filter);
Future<Map<String, AnalysisResult>> evalMapFuture = null;

evalMapFuture = generator.generateEvalMapInterruptibly(stream, predicate);
long elapsed = System.nanoTime() - start;
ctxHelper(ctx, evalMapFuture);
return mapper.writeValueAsString(
evalMapFuture.get(timeout - elapsed, TimeUnit.NANOSECONDS));
} catch (ExecutionException | InterruptedException e) {
logger.error(e);
throw new InternalServerErrorException(e);
} catch (TimeoutException e) {
logger.error(e);
throw new ServerErrorException(Response.Status.GATEWAY_TIMEOUT, e);
} catch (Exception e) {
logger.error(e);
throw e;
} finally {
httpConn.disconnect();
}
}

@Blocking
@Bulkhead
@Path("report")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.MULTIPART_FORM_DATA)
@POST
public String getReport(RoutingContext ctx, @BeanParam RecordingFormData form)
throws IOException {
FileUpload upload = form.file;

Pair<java.nio.file.Path, Pair<Long, Long>> uploadResult = handleUpload(upload);
java.nio.file.Path file = uploadResult.getLeft();
long timeout = TimeUnit.MILLISECONDS.toNanos(Long.parseLong(timeoutMs));
long start = uploadResult.getRight().getLeft();
long elapsed = uploadResult.getRight().getRight();

if (StringUtils.isNotBlank(form.filter)) {
logger.debugv("Received request with filter: {0}", form.filter);
}
Predicate<IRule> predicate = rfp.parse(form.filter);
Future<Map<String, AnalysisResult>> evalMapFuture = null;

try (var stream = fs.newInputStream(file)) {
evalMapFuture = generator.generateEvalMapInterruptibly(stream, predicate);
ctxHelper(ctx, evalMapFuture);
return mapper.writeValueAsString(
evalMapFuture.get(timeout - elapsed, TimeUnit.NANOSECONDS));
} catch (ExecutionException | InterruptedException e) {
throw new InternalServerErrorException(e);
} catch (TimeoutException e) {
throw new ServerErrorException(Response.Status.GATEWAY_TIMEOUT, e);
} finally {
cleanupHelper(evalMapFuture, file, upload.fileName(), start);
}
}

private InputStream getPresignedObjectStream(HttpURLConnection httpConn) throws IOException, ProtocolException {
httpConn.setRequestMethod("GET");
if (httpConn instanceof HttpsURLConnection) {
Comment thread
Josh-Matsuoka marked this conversation as resolved.
HttpsURLConnection httpsConn = (HttpsURLConnection) httpConn;
Expand Down Expand Up @@ -194,18 +264,40 @@ public String getReportFromPresigned(RoutingContext ctx, @BeanParam PresignedFor
"Authorization",
String.format("%s %s", storageAuthMethod.get(), storageAuth.get()));
}

assertContentLength(httpConn.getContentLengthLong());
try (var stream = httpConn.getInputStream()) {

Predicate<IRule> predicate = rfp.parse(form.filter);
Future<Map<String, AnalysisResult>> evalMapFuture = null;
return httpConn.getInputStream();
}

evalMapFuture = generator.generateEvalMapInterruptibly(stream, predicate);
@Blocking
@Bulkhead
@Path("heapdump/remote_report")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.MULTIPART_FORM_DATA)
@POST
public String getHeapDumpReportFromPresigned(RoutingContext ctx, @BeanParam PresignedHeapDumpFormData form)
throws IOException, URISyntaxException {
// TODO queue these requests so we don't overload ourselves, in particular by reading
// multiple Heap Dump files into memory at once for analysis. We should process these serially
// from the queue. If we are getting overloaded then our response time to each subsequent
// request will continue to grow unbounded, so at some point we should stop accepting
// requests when the queue is too long.
// Since this is a @Blocking method that runs on a worker thread pool, can we implement this
// serial queueing behaviour by simply synchronizing on a shared singleton resource ex. the
// generator instance?
// A better long-term solution would be to use a shared messaging queue between Cryostat and
// the report generators, so that Cryostat can put up a URL for a presigned recording to be
// processed and a free report generator can claim that work item and then post back the
// report response
long timeout = TimeUnit.MILLISECONDS.toNanos(Long.parseLong(timeoutMs));
long start = System.nanoTime();
HttpURLConnection httpConn = (HttpURLConnection) form.uri.toURL().openConnection();
try (var stream = getPresignedObjectStream(httpConn)) {
Future<HeapDumpAnalysis> evalFuture = null;
evalFuture = heapDumpGenerator.generate(stream, heapDumpMemoryLimit);
long elapsed = System.nanoTime() - start;
ctxHelper(ctx, evalMapFuture);
ctxHelper(ctx, evalFuture);
return mapper.writeValueAsString(
evalMapFuture.get(timeout - elapsed, TimeUnit.NANOSECONDS));
evalFuture.get(timeout - elapsed, TimeUnit.NANOSECONDS));
} catch (ExecutionException | InterruptedException e) {
logger.error(e);
throw new InternalServerErrorException(e);
Expand All @@ -221,11 +313,12 @@ public String getReportFromPresigned(RoutingContext ctx, @BeanParam PresignedFor
}

@Blocking
@Path("report")
@Bulkhead
@Path("heapdump/report")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.MULTIPART_FORM_DATA)
@POST
public String getReport(RoutingContext ctx, @BeanParam RecordingFormData form)
public String getHeapDumpReport(RoutingContext ctx, @BeanParam HeapDumpFormData form)
throws IOException {
FileUpload upload = form.file;

Expand All @@ -235,23 +328,19 @@ public String getReport(RoutingContext ctx, @BeanParam RecordingFormData form)
long start = uploadResult.getRight().getLeft();
long elapsed = uploadResult.getRight().getRight();

if (StringUtils.isNotBlank(form.filter)) {
logger.debugv("Received request with filter: {0}", form.filter);
}
Predicate<IRule> predicate = rfp.parse(form.filter);
Future<Map<String, AnalysisResult>> evalMapFuture = null;
Future<HeapDumpAnalysis> evalFuture = null;

try (var stream = fs.newInputStream(file)) {
evalMapFuture = generator.generateEvalMapInterruptibly(stream, predicate);
ctxHelper(ctx, evalMapFuture);
evalFuture = heapDumpGenerator.generate(stream, heapDumpMemoryLimit);
ctxHelper(ctx, evalFuture);
return mapper.writeValueAsString(
evalMapFuture.get(timeout - elapsed, TimeUnit.NANOSECONDS));
evalFuture.get(timeout - elapsed, TimeUnit.NANOSECONDS));
} catch (ExecutionException | InterruptedException e) {
throw new InternalServerErrorException(e);
} catch (TimeoutException e) {
throw new ServerErrorException(Response.Status.GATEWAY_TIMEOUT, e);
} finally {
cleanupHelper(evalMapFuture, file, upload.fileName(), start);
cleanupHelper(evalFuture, file, upload.fileName(), start);
}
}

Expand Down
Loading