Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/ledger/LedgerManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2415,12 +2415,13 @@ LedgerManagerImpl::applyThread(
{
for (auto const& txBundle : cluster)
{
// Apply timer
std::optional<medida::TimerContext> txTime;
// Apply timer; samples go into the thread's metrics batch and are
// published at ledger close.
std::optional<BatchedTimerScope> txTime;
if (!mApp.getConfig().DISABLE_SOROBAN_METRICS_FOR_TESTING)
{
txTime.emplace(
mApplyState.getMetrics().mTransactionApply.TimeScope());
txTime.emplace(getSorobanMetrics(),
&SorobanMetrics::ApplyMetricsBatch::mTxApplyNsecs);
}

Hash txSubSeed = subSha256(sorobanBasePrngSeed, txBundle.getTxNum());
Expand Down
153 changes: 153 additions & 0 deletions src/ledger/SorobanMetrics.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
#include "ledger/SorobanMetrics.h"
#include "util/MetricsRegistry.h"

#include <medida/histogram.h>
#include <medida/meter.h>
#include <medida/metrics_registry.h>
#include <medida/timer.h>
#include <unordered_map>

namespace stellar
{
Expand All @@ -26,6 +30,8 @@ SorobanMetrics::SorobanMetrics(MetricsRegistry& metrics)

/* tx-wide metrics */
, mTxSizeByte(metrics.NewHistogram({"soroban", "tx", "size-byte"}))
, mTransactionApply(metrics.NewTimer({"ledger", "transaction", "apply"}))
, mOperationApply(metrics.NewTimer({"ledger", "operation", "apply"}))
/* InvokeHostFunctionOp metrics */
, mHostFnOpReadEntry(
metrics.NewMeter({"soroban", "host-fn-op", "read-entry"}, "entry"))
Expand Down Expand Up @@ -202,9 +208,156 @@ SorobanMetrics::accumulateLedgerWriteByte(uint64_t writeByte)
mCounterLedgerWriteByte += writeByte;
}

SorobanMetrics::ApplyMetricsBatch&
SorobanMetrics::getApplyThreadBatch()
{
// One entry per (thread, SorobanMetrics instance): tests can run several
// applications in one process and threads outlive applications, so the
// weak_ptr is re-validated on every first-use (the registry owns the
// batches; a stale entry from a destroyed instance whose address got
// reused expires together with its registry).
Comment on lines +214 to +218
static thread_local std::unordered_map<SorobanMetrics const*,
std::weak_ptr<ApplyMetricsBatch>>
tlBatches;
auto& weak = tlBatches[this];
if (auto existing = weak.lock())
{
return *existing;
}
auto batch = std::make_shared<ApplyMetricsBatch>();
{
std::lock_guard<std::mutex> lock(mApplyBatchesMutex);
mApplyBatches.push_back(batch);
}
weak = batch;
return *batch;
}

void
SorobanMetrics::flushApplyMetricsBatches()
{
std::vector<std::shared_ptr<ApplyMetricsBatch>> batches;
{
std::lock_guard<std::mutex> lock(mApplyBatchesMutex);
batches = mApplyBatches;
}
if (batches.empty())
{
return;
}

ApplyMetricsBatch total;
auto take = [](uint64_t& v) {
auto res = v;
v = 0;
return res;
};
auto drain = [](std::vector<int64_t>& dst, std::vector<int64_t>& src) {
dst.insert(dst.end(), src.begin(), src.end());
src.clear();
};
for (auto const& b : batches)
{
std::lock_guard<std::mutex> lock(b->mMutex);
total.mHostFnOpReadEntry += take(b->mHostFnOpReadEntry);
total.mHostFnOpWriteEntry += take(b->mHostFnOpWriteEntry);
total.mHostFnOpReadKeyByte += take(b->mHostFnOpReadKeyByte);
total.mHostFnOpWriteKeyByte += take(b->mHostFnOpWriteKeyByte);
total.mHostFnOpReadLedgerByte += take(b->mHostFnOpReadLedgerByte);
total.mHostFnOpReadDataByte += take(b->mHostFnOpReadDataByte);
total.mHostFnOpReadCodeByte += take(b->mHostFnOpReadCodeByte);
total.mHostFnOpWriteLedgerByte += take(b->mHostFnOpWriteLedgerByte);
total.mHostFnOpWriteDataByte += take(b->mHostFnOpWriteDataByte);
total.mHostFnOpWriteCodeByte += take(b->mHostFnOpWriteCodeByte);
total.mHostFnOpEmitEvent += take(b->mHostFnOpEmitEvent);
total.mHostFnOpEmitEventByte += take(b->mHostFnOpEmitEventByte);
total.mHostFnOpCpuInsn += take(b->mHostFnOpCpuInsn);
total.mHostFnOpMemByte += take(b->mHostFnOpMemByte);
total.mHostFnOpCpuInsnExclVm += take(b->mHostFnOpCpuInsnExclVm);
total.mHostFnOpMaxRwKeyByte += take(b->mHostFnOpMaxRwKeyByte);
total.mHostFnOpMaxRwDataByte += take(b->mHostFnOpMaxRwDataByte);
total.mHostFnOpMaxRwCodeByte += take(b->mHostFnOpMaxRwCodeByte);
total.mHostFnOpMaxEmitEventByte += take(b->mHostFnOpMaxEmitEventByte);
total.mHostFnOpSuccess += take(b->mHostFnOpSuccess);
total.mHostFnOpFailure += take(b->mHostFnOpFailure);
total.mExtFpTtlOpReadLedgerByte += take(b->mExtFpTtlOpReadLedgerByte);
total.mRestoreFpOpReadLedgerByte += take(b->mRestoreFpOpReadLedgerByte);
total.mRestoreFpOpWriteLedgerByte +=
take(b->mRestoreFpOpWriteLedgerByte);

drain(total.mHostFnOpInvokeTimeNsecs, b->mHostFnOpInvokeTimeNsecs);
drain(total.mHostFnOpInvokeTimeNsecsExclVm,
b->mHostFnOpInvokeTimeNsecsExclVm);
drain(total.mHostFnOpInvokeTimeFsecsCpuInsnRatio,
b->mHostFnOpInvokeTimeFsecsCpuInsnRatio);
drain(total.mHostFnOpInvokeTimeFsecsCpuInsnRatioExclVm,
b->mHostFnOpInvokeTimeFsecsCpuInsnRatioExclVm);
drain(total.mHostFnOpDeclaredInsnsUsageRatio,
b->mHostFnOpDeclaredInsnsUsageRatio);
drain(total.mHostFnOpExecNsecs, b->mHostFnOpExecNsecs);
drain(total.mExtFpTtlOpExecNsecs, b->mExtFpTtlOpExecNsecs);
drain(total.mRestoreFpOpExecNsecs, b->mRestoreFpOpExecNsecs);
drain(total.mTxSizeByte, b->mTxSizeByte);
drain(total.mTxApplyNsecs, b->mTxApplyNsecs);
drain(total.mOpApplyNsecs, b->mOpApplyNsecs);
}

// Publish into the underlying medida metrics, one bulk call per metric.
// Zero meter increments are skipped (a Mark(0) does not change any
// observable value); empty sample batches are no-ops in UpdateMany.
auto markIf = [](medida::Meter& meter, uint64_t value) {
if (value != 0)
{
meter.Mark(value);
}
};
markIf(mHostFnOpReadEntry, total.mHostFnOpReadEntry);
markIf(mHostFnOpWriteEntry, total.mHostFnOpWriteEntry);
markIf(mHostFnOpReadKeyByte, total.mHostFnOpReadKeyByte);
markIf(mHostFnOpWriteKeyByte, total.mHostFnOpWriteKeyByte);
markIf(mHostFnOpReadLedgerByte, total.mHostFnOpReadLedgerByte);
markIf(mHostFnOpReadDataByte, total.mHostFnOpReadDataByte);
markIf(mHostFnOpReadCodeByte, total.mHostFnOpReadCodeByte);
markIf(mHostFnOpWriteLedgerByte, total.mHostFnOpWriteLedgerByte);
markIf(mHostFnOpWriteDataByte, total.mHostFnOpWriteDataByte);
markIf(mHostFnOpWriteCodeByte, total.mHostFnOpWriteCodeByte);
markIf(mHostFnOpEmitEvent, total.mHostFnOpEmitEvent);
markIf(mHostFnOpEmitEventByte, total.mHostFnOpEmitEventByte);
markIf(mHostFnOpCpuInsn, total.mHostFnOpCpuInsn);
markIf(mHostFnOpMemByte, total.mHostFnOpMemByte);
markIf(mHostFnOpCpuInsnExclVm, total.mHostFnOpCpuInsnExclVm);
markIf(mHostFnOpMaxRwKeyByte, total.mHostFnOpMaxRwKeyByte);
markIf(mHostFnOpMaxRwDataByte, total.mHostFnOpMaxRwDataByte);
markIf(mHostFnOpMaxRwCodeByte, total.mHostFnOpMaxRwCodeByte);
markIf(mHostFnOpMaxEmitEventByte, total.mHostFnOpMaxEmitEventByte);
markIf(mHostFnOpSuccess, total.mHostFnOpSuccess);
markIf(mHostFnOpFailure, total.mHostFnOpFailure);
markIf(mExtFpTtlOpReadLedgerByte, total.mExtFpTtlOpReadLedgerByte);
markIf(mRestoreFpOpReadLedgerByte, total.mRestoreFpOpReadLedgerByte);
markIf(mRestoreFpOpWriteLedgerByte, total.mRestoreFpOpWriteLedgerByte);

mHostFnOpInvokeTimeNsecs.UpdateMany(total.mHostFnOpInvokeTimeNsecs);
mHostFnOpInvokeTimeNsecsExclVm.UpdateMany(
total.mHostFnOpInvokeTimeNsecsExclVm);
mHostFnOpInvokeTimeFsecsCpuInsnRatio.UpdateMany(
total.mHostFnOpInvokeTimeFsecsCpuInsnRatio);
mHostFnOpInvokeTimeFsecsCpuInsnRatioExclVm.UpdateMany(
total.mHostFnOpInvokeTimeFsecsCpuInsnRatioExclVm);
mHostFnOpDeclaredInsnsUsageRatio.UpdateMany(
total.mHostFnOpDeclaredInsnsUsageRatio);
mHostFnOpExec.UpdateMany(total.mHostFnOpExecNsecs);
mExtFpTtlOpExec.UpdateMany(total.mExtFpTtlOpExecNsecs);
mRestoreFpOpExec.UpdateMany(total.mRestoreFpOpExecNsecs);
mTxSizeByte.UpdateMany(total.mTxSizeByte);
mTransactionApply.UpdateMany(total.mTxApplyNsecs);
mOperationApply.UpdateMany(total.mOpApplyNsecs);
}

void
SorobanMetrics::publishAndResetLedgerWideMetrics()
{
flushApplyMetricsBatches();

mLedgerTxCount.Update(mCounterLedgerTxCount);
mLedgerCpuInsn.Update(mCounterLedgerCpuInsn);
mLedgerTxsSizeByte.Update(mCounterLedgerTxsSizeByte);
Expand Down
134 changes: 134 additions & 0 deletions src/ledger/SorobanMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
// limits. It also performs aggregation of ledger-wide resource usage across
// different operations.
#include <atomic>
#include <chrono>
#include <cstdint>
#include <memory>
#include <mutex>
#include <vector>

namespace medida
{
Expand All @@ -24,6 +28,58 @@ class MetricsRegistry;

class SorobanMetrics
{
public:
// Accumulates apply-path metric updates from a single thread. Hot apply
// code records into its own thread's batch (brief, uncontended lock
// acquisition), and the batches are drained into the underlying
// process-wide medida metrics once per ledger on the main thread via
// publishAndResetLedgerWideMetrics(). This keeps shared metric state (its
// locks and cache lines) off the parallel apply threads.
struct ApplyMetricsBatch
{
std::mutex mMutex;

// Pending Meter increments (Marks summed since the last publish).
uint64_t mHostFnOpReadEntry{0};
uint64_t mHostFnOpWriteEntry{0};
uint64_t mHostFnOpReadKeyByte{0};
uint64_t mHostFnOpWriteKeyByte{0};
uint64_t mHostFnOpReadLedgerByte{0};
uint64_t mHostFnOpReadDataByte{0};
uint64_t mHostFnOpReadCodeByte{0};
uint64_t mHostFnOpWriteLedgerByte{0};
uint64_t mHostFnOpWriteDataByte{0};
uint64_t mHostFnOpWriteCodeByte{0};
uint64_t mHostFnOpEmitEvent{0};
uint64_t mHostFnOpEmitEventByte{0};
uint64_t mHostFnOpCpuInsn{0};
uint64_t mHostFnOpMemByte{0};
uint64_t mHostFnOpCpuInsnExclVm{0};
uint64_t mHostFnOpMaxRwKeyByte{0};
uint64_t mHostFnOpMaxRwDataByte{0};
uint64_t mHostFnOpMaxRwCodeByte{0};
uint64_t mHostFnOpMaxEmitEventByte{0};
uint64_t mHostFnOpSuccess{0};
uint64_t mHostFnOpFailure{0};
uint64_t mExtFpTtlOpReadLedgerByte{0};
uint64_t mRestoreFpOpReadLedgerByte{0};
uint64_t mRestoreFpOpWriteLedgerByte{0};

// Pending sample streams for percentile-bearing histograms/timers
// (timer samples are in nanoseconds).
std::vector<int64_t> mHostFnOpInvokeTimeNsecs;
std::vector<int64_t> mHostFnOpInvokeTimeNsecsExclVm;
std::vector<int64_t> mHostFnOpInvokeTimeFsecsCpuInsnRatio;
std::vector<int64_t> mHostFnOpInvokeTimeFsecsCpuInsnRatioExclVm;
std::vector<int64_t> mHostFnOpDeclaredInsnsUsageRatio;
std::vector<int64_t> mHostFnOpExecNsecs;
std::vector<int64_t> mExtFpTtlOpExecNsecs;
std::vector<int64_t> mRestoreFpOpExecNsecs;
std::vector<int64_t> mTxSizeByte;
std::vector<int64_t> mTxApplyNsecs;
std::vector<int64_t> mOpApplyNsecs;
};

private:
std::atomic<uint64_t> mCounterLedgerTxCount{0};
std::atomic<uint64_t> mCounterLedgerCpuInsn{0};
Expand All @@ -38,6 +94,13 @@ class SorobanMetrics
std::atomic<uint64_t> mLedgerInsnsExclVmCount{0};
std::atomic<uint64_t> mLedgerHostFnExecTimeNsecs{0};

// All per-thread batches handed out by getApplyThreadBatch(), drained on
// each publishAndResetLedgerWideMetrics() call.
std::mutex mApplyBatchesMutex;
std::vector<std::shared_ptr<ApplyMetricsBatch>> mApplyBatches;

void flushApplyMetricsBatches();

public:
// ledger-wide metrics
medida::Histogram& mLedgerTxCount;
Expand All @@ -53,6 +116,14 @@ class SorobanMetrics
// tx-wide metrics
medida::Histogram& mTxSizeByte;

// Cached references to the (op-kind-agnostic) "ledger.transaction.apply"
// and "ledger.operation.apply" timers: the parallel apply path records
// per-tx/per-op samples into its ApplyMetricsBatch and they are published
// into these at ledger close. These are the same timer instances the
// sequential apply path updates directly via registry lookups.
medida::Timer& mTransactionApply;
medida::Timer& mOperationApply;

// `InvokeHostFunctionOp` metrics
medida::Meter& mHostFnOpReadEntry;
medida::Meter& mHostFnOpWriteEntry;
Expand Down Expand Up @@ -127,6 +198,10 @@ class SorobanMetrics

SorobanMetrics(MetricsRegistry& metrics);

// Returns the calling thread's metrics batch for this SorobanMetrics
// instance, creating and registering it on first use.
ApplyMetricsBatch& getApplyThreadBatch();

void accumulateModelledCpuInsns(uint64_t insnsCount,
uint64_t insnsExclVmCount,
uint64_t execTimeNsecs);
Expand All @@ -140,4 +215,63 @@ class SorobanMetrics

void publishAndResetLedgerWideMetrics();
};

// Adds the wall-clock duration of its lifetime (in nanoseconds) to a
// caller-provided accumulator on destruction.
class ScopedNsecsTimer
{
public:
explicit ScopedNsecsTimer(uint64_t& target)
: mTarget(target), mStart(std::chrono::steady_clock::now())
{
}

ScopedNsecsTimer(ScopedNsecsTimer const&) = delete;
ScopedNsecsTimer& operator=(ScopedNsecsTimer const&) = delete;

~ScopedNsecsTimer()
{
mTarget += std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::steady_clock::now() - mStart)
.count();
}

private:
uint64_t& mTarget;
std::chrono::steady_clock::time_point mStart;
};

// Times its lifetime and records the elapsed nanoseconds as a sample in the
// given vector of the calling thread's ApplyMetricsBatch.
class BatchedTimerScope
{
public:
using SampleField =
std::vector<int64_t> SorobanMetrics::ApplyMetricsBatch::*;

BatchedTimerScope(SorobanMetrics& metrics, SampleField field)
: mMetrics(metrics)
, mField(field)
, mStart(std::chrono::steady_clock::now())
{
}

BatchedTimerScope(BatchedTimerScope const&) = delete;
BatchedTimerScope& operator=(BatchedTimerScope const&) = delete;

~BatchedTimerScope()
{
auto elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::steady_clock::now() - mStart)
.count();
auto& batch = mMetrics.getApplyThreadBatch();
std::lock_guard<std::mutex> lock(batch.mMutex);
(batch.*mField).push_back(elapsed);
}

private:
SorobanMetrics& mMetrics;
SampleField mField;
std::chrono::steady_clock::time_point mStart;
};
}
Loading
Loading