Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/archiver/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ export class Archiver {
// should be after ArchiveBlocksTask to handle restart cleanly
await this.statesArchiver.maybeArchiveState(finalized);

this.chain.regen.pruneOnFinalized(finalizedEpoch);
this.chain.pruneOnFinalized(finalizedEpoch);

// tasks rely on extended fork choice
const prunedBlocks = this.chain.forkChoice.prune(finalized.rootHex);
Expand Down
38 changes: 38 additions & 0 deletions packages/beacon-node/src/chain/balancesTreeCache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import {ListBasicTreeViewDU, UintNumberType} from "@chainsafe/ssz";
import {IBalancesTreeCache, CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {Metrics} from "../metrics/index.js";

const MAX_ITEMS = 2;

export class BalancesTreeCache implements IBalancesTreeCache {
private readonly unusedBalancesTrees: ListBasicTreeViewDU<UintNumberType>[] = [];

constructor(private readonly metrics: Metrics | null = null) {
if (metrics) {
metrics.balancesTreeCache.size.addCollect(() => {
metrics.balancesTreeCache.size.set(this.unusedBalancesTrees.length);
});
}
}

processUnusedState(state: CachedBeaconStateAllForks | undefined): void {
if (state === undefined) {
return;
}

this.unusedBalancesTrees.push(state.balances);
while (this.unusedBalancesTrees.length > MAX_ITEMS) {
this.unusedBalancesTrees.shift();
}
}

getUnusedBalances(): ListBasicTreeViewDU<UintNumberType> | undefined {
if (this.unusedBalancesTrees.length === 0) {
this.metrics?.balancesTreeCache.miss.inc();
return undefined;
}

this.metrics?.balancesTreeCache.hit.inc();
return this.unusedBalancesTrees.shift();
}
}
11 changes: 10 additions & 1 deletion packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,16 @@ export async function importBlock(

// This adds the state necessary to process the next block
// Some block event handlers require state being in state cache so need to do this before emitting EventType.block
this.regen.processState(blockRootHex, postState);
this.regen.processState(blockRootHex, postState).then((prunedStates) => {
if (prunedStates) {
for (const states of prunedStates.values()) {
// cp states on the same epoch shares the same balances seed tree so only need one of them
this.balancesTreeCache.processUnusedState(states[0]);
}
}
}).catch((e) => {
this.logger.error("Regen error to process state for block", {slot: blockSlot, root: blockRootHex}, e as Error);
});

this.metrics?.importBlock.bySource.inc({source});
this.logger.verbose("Added block to forkchoice and state cache", {slot: blockSlot, root: blockRootHex});
Expand Down
14 changes: 14 additions & 0 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ import {DbCPStateDatastore} from "./stateCache/datastore/db.js";
import {FileCPStateDatastore} from "./stateCache/datastore/file.js";
import {SyncCommitteeRewards, computeSyncCommitteeRewards} from "./rewards/syncCommitteeRewards.js";
import {AttestationsRewards, computeAttestationsRewards} from "./rewards/attestationsRewards.js";
import {BalancesTreeCache} from "./balancesTreeCache.js";

/**
* Arbitrary constants, blobs and payloads should be consumed immediately in the same slot
Expand Down Expand Up @@ -158,6 +159,7 @@ export class BeaconChain implements IBeaconChain {
readonly beaconProposerCache: BeaconProposerCache;
readonly checkpointBalancesCache: CheckpointBalancesCache;
readonly shufflingCache: ShufflingCache;
readonly balancesTreeCache: BalancesTreeCache;
/** Map keyed by executionPayload.blockHash of the block for those blobs */
readonly producedContentsCache = new Map<BlockHash, deneb.Contents>();

Expand Down Expand Up @@ -247,6 +249,7 @@ export class BeaconChain implements IBeaconChain {
this.beaconProposerCache = new BeaconProposerCache(opts);
this.checkpointBalancesCache = new CheckpointBalancesCache();
this.shufflingCache = new ShufflingCache(metrics, this.opts);
this.balancesTreeCache = new BalancesTreeCache(metrics);

// Restore state caches
// anchorState may already by a CachedBeaconState. If so, don't create the cache again, since deserializing all
Expand All @@ -260,6 +263,7 @@ export class BeaconChain implements IBeaconChain {
config,
pubkey2index: new PubkeyIndexMap(),
index2pubkey: [],
balancesTreeCache: this.balancesTreeCache,
});
this.shufflingCache.processState(cachedState, cachedState.epochCtx.previousShuffling.epoch);
this.shufflingCache.processState(cachedState, cachedState.epochCtx.currentShuffling.epoch);
Expand Down Expand Up @@ -863,6 +867,16 @@ export class BeaconChain implements IBeaconChain {
}
}

pruneOnFinalized(finalizedEpoch: Epoch): void {
const prunedStates = this.regen.pruneOnFinalized(finalizedEpoch);
if (prunedStates) {
// cp states on the same epoch shares the same balances seed tree so only need one of them
for (const states of prunedStates.values()) {
this.balancesTreeCache.processUnusedState(states[0]);
}
}
}

/**
* Regenerate state for attestation verification, this does not happen with default chain option of maxSkipSlots = 32 .
* However, need to handle just in case. Lodestar doesn't support multiple regen state requests for attestation verification
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ export interface IBeaconChain {
blockRef: BeaconBlock | BlindedBeaconBlock,
validatorIds?: (ValidatorIndex | string)[]
): Promise<SyncCommitteeRewards>;

pruneOnFinalized(finalizedEpoch: Epoch): void;
}

export type SSZObjectType =
Expand Down
22 changes: 16 additions & 6 deletions packages/beacon-node/src/chain/regen/queued.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,26 @@ export class QueuedStateRegenerator implements IStateRegenerator {
this.blockStateCache.prune(headStateRoot);
}

pruneOnFinalized(finalizedEpoch: number): void {
this.checkpointStateCache.pruneFinalized(finalizedEpoch);
pruneOnFinalized(finalizedEpoch: number): Map<Epoch, CachedBeaconStateAllForks[]> | null {
const prunedStates = this.checkpointStateCache.pruneFinalized(finalizedEpoch);
this.blockStateCache.deleteAllBeforeEpoch(finalizedEpoch);

return prunedStates;
}

processState(blockRootHex: RootHex, postState: CachedBeaconStateAllForks): void {
async processState(
blockRootHex: RootHex,
postState: CachedBeaconStateAllForks
): Promise<Map<Epoch, CachedBeaconStateAllForks[]> | null> {
this.blockStateCache.add(postState);
this.checkpointStateCache.processState(blockRootHex, postState).catch((e) => {
this.logger.debug("Error processing block state", {blockRootHex, slot: postState.slot}, e);
});
let prunedStates: Map<Epoch, CachedBeaconStateAllForks[]> | null = null;
try {
prunedStates = await this.checkpointStateCache.processState(blockRootHex, postState);
} catch (e) {
this.logger.debug("Error processing block state", {blockRootHex, slot: postState.slot}, e as Error);
}

return prunedStates;
}

addCheckpointState(cp: phase0.Checkpoint, item: CachedBeaconStateAllForks): void {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ export class InMemoryCheckpointStateCache implements CheckpointStateCache {
return this.getLatest(rootHex, maxEpoch, opts);
}

async processState(): Promise<number> {
async processState(): Promise<Map<Epoch, CachedBeaconStateAllForks[]> | null> {
// do nothing, this class does not support prunning
return 0;
return null;
}

get(cp: CheckpointHex, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
Expand Down Expand Up @@ -122,12 +122,17 @@ export class InMemoryCheckpointStateCache implements CheckpointStateCache {
return previousHits;
}

pruneFinalized(finalizedEpoch: Epoch): void {
pruneFinalized(finalizedEpoch: Epoch): Map<Epoch, CachedBeaconStateAllForks[]> {
const result = new Map<Epoch, CachedBeaconStateAllForks[]>();

for (const epoch of this.epochIndex.keys()) {
if (epoch < finalizedEpoch) {
this.deleteAllEpochItems(epoch);
const deletedStates = this.deleteAllEpochItems(epoch);
result.set(epoch, deletedStates);
}
}

return result;
}

prune(finalizedEpoch: Epoch, justifiedEpoch: Epoch): void {
Expand All @@ -153,11 +158,19 @@ export class InMemoryCheckpointStateCache implements CheckpointStateCache {
}
}

deleteAllEpochItems(epoch: Epoch): void {
deleteAllEpochItems(epoch: Epoch): CachedBeaconStateAllForks[] {
const states = [];
for (const rootHex of this.epochIndex.get(epoch) || []) {
this.cache.delete(toCheckpointKey({rootHex, epoch}));
const key = toCheckpointKey({rootHex, epoch});
const state = this.cache.get(key);
if (state) {
states.push(state);
}
this.cache.delete(key);
}
this.epochIndex.delete(epoch);

return states;
}

clear(): void {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,14 +421,17 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
/**
* Prune all checkpoint states before the provided finalized epoch.
*/
pruneFinalized(finalizedEpoch: Epoch): void {
pruneFinalized(finalizedEpoch: Epoch): Map<Epoch, CachedBeaconStateAllForks[]> | null {
for (const epoch of this.epochIndex.keys()) {
if (epoch < finalizedEpoch) {
this.deleteAllEpochItems(epoch).catch((e) =>
this.logger.debug("Error delete all epoch items", {epoch, finalizedEpoch}, e as Error)
);
}
}

// not likely to return anything in-memory state because we may persist states even before they are finalized
return null;
}

/**
Expand Down Expand Up @@ -481,12 +484,14 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
*
* As of Mar 2024, it takes <=350ms to persist a holesky state on fast server
*/
async processState(blockRootHex: RootHex, state: CachedBeaconStateAllForks): Promise<number> {
let persistCount = 0;
async processState(
blockRootHex: RootHex,
state: CachedBeaconStateAllForks
): Promise<Map<Epoch, CachedBeaconStateAllForks[]> | null> {
// it's important to sort the epochs in ascending order, in case of big reorg we always want to keep the most recent checkpoint states
const sortedEpochs = Array.from(this.epochIndex.keys()).sort((a, b) => a - b);
if (sortedEpochs.length <= this.maxEpochsInMemory) {
return 0;
return null;
}

const blockSlot = state.slot;
Expand All @@ -502,24 +507,19 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
// normally the block persist happens at 2/3 of slot 0 of epoch, if it's already late then just skip to allow other tasks to run
// there are plenty of chances in the same epoch to persist checkpoint states, also if block is late it could be reorged
this.logger.verbose("Skip persist checkpoint states", {blockSlot, root: blockRootHex});
return 0;
return null;
}

const persistEpochs = sortedEpochs.slice(0, sortedEpochs.length - this.maxEpochsInMemory);

const result = new Map<Epoch, CachedBeaconStateAllForks[]>();
for (const lowestEpoch of persistEpochs) {
// usually there is only 0 or 1 epoch to persist in this loop
persistCount += await this.processPastEpoch(blockRootHex, state, lowestEpoch);
const prunedStates = await this.processPastEpoch(blockRootHex, state, lowestEpoch);
result.set(lowestEpoch, prunedStates);
}

if (persistCount > 0) {
this.logger.verbose("Persisted checkpoint states", {
slot: blockSlot,
root: blockRootHex,
persistCount,
persistEpochs: persistEpochs.length,
});
}
return persistCount;
return result;
}

/**
Expand Down Expand Up @@ -648,13 +648,16 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
* Performance note:
* - In normal condition, we persist 1 checkpoint state per epoch.
* - In reorged condition, we may persist multiple (most likely 2) checkpoint states per epoch.
*
* Return the pruned states from memory
*/
private async processPastEpoch(
blockRootHex: RootHex,
state: CachedBeaconStateAllForks,
epoch: Epoch
): Promise<number> {
): Promise<CachedBeaconStateAllForks[]> {
let persistCount = 0;
const prunedStates: CachedBeaconStateAllForks[] = [];
const epochBoundarySlot = computeStartSlotAtEpoch(epoch);
const epochBoundaryRoot =
epochBoundarySlot === state.slot ? fromHexString(blockRootHex) : getBlockRootAtSlot(state, epochBoundarySlot);
Expand Down Expand Up @@ -735,10 +738,20 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
this.metrics?.statePruneFromMemoryCount.inc();
this.logger.verbose("Pruned checkpoint state from memory", logMeta);
}

prunedStates.push(state);
}
}

return persistCount;
if (persistCount > 0) {
this.logger.verbose("Persisted checkpoint states", {
stateSlot: state.slot,
blockRoot: blockRootHex,
persistCount,
});
}

return prunedStates;
}

/**
Expand Down
7 changes: 5 additions & 2 deletions packages/beacon-node/src/chain/stateCache/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ export interface CheckpointStateCache {
): Promise<CachedBeaconStateAllForks | null>;
updatePreComputedCheckpoint(rootHex: RootHex, epoch: Epoch): number | null;
prune(finalizedEpoch: Epoch, justifiedEpoch: Epoch): void;
pruneFinalized(finalizedEpoch: Epoch): void;
processState(blockRootHex: RootHex, state: CachedBeaconStateAllForks): Promise<number>;
pruneFinalized(finalizedEpoch: Epoch): Map<Epoch, CachedBeaconStateAllForks[]> | null;
processState(
blockRootHex: RootHex,
state: CachedBeaconStateAllForks
): Promise<Map<Epoch, CachedBeaconStateAllForks[]> | null>;
clear(): void;
dumpSummary(): routes.lodestar.StateCacheItem[];
/** Expose beacon states stored in cache. Use with caution */
Expand Down
15 changes: 15 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,21 @@ export function createLodestarMetrics(
}),
},

balancesTreeCache: {
size: register.gauge({
name: "lodestar_balances_tree_cache_size",
help: "Balances tree cache size",
}),
hit: register.gauge({
name: "lodestar_balances_tree_cache_hit_total",
help: "Total number of balances tree cache hits",
}),
miss: register.gauge({
name: "lodestar_balances_tree_cache_miss_total",
help: "Total number of balances tree cache misses",
}),
},

seenCache: {
aggregatedAttestations: {
superSetCheckTotal: register.histogram({
Expand Down
5 changes: 5 additions & 0 deletions packages/state-transition/src/cache/balancesTreeCache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import {UintNumberType, ListBasicTreeViewDU} from "@chainsafe/ssz";

export interface IBalancesTreeCache {
getUnusedBalances(): ListBasicTreeViewDU<UintNumberType> | undefined;
}
Loading