diff --git a/CHANGELOG.md b/CHANGELOG.md index 736045598c81..afef40a38fc7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +- Added temporal sharding support, allowing completed ledger ranges to be sealed into immutable, independently-addressable shards while the active shard continues accepting writes. Sharding is disabled by default for backward compatibility. New governance proposal actions: `seal_current_shard`, `set_shard_policy`. New `ccf.node.sealShard()`, `ccf.node.setShardPolicy()` JavaScript governance APIs. New `CCFConfig::Sharding` configuration section. Shard metadata is stored in public governance tables (`public:ccf.gov.shards.info`, `public:ccf.gov.shards.policy`). Auto-seal is supported based on seqno count or time thresholds. Sealed shard data is automatically archived to the first `ledger.read_only_directories` entry for shared access across nodes. - Added support for inline transaction receipt construction at commit time. Endpoint authors can use `build_receipt_for_committed_tx()` to construct a full `TxReceiptImpl` from the `CommittedTxInfo` passed to their `ConsensusCommittedEndpointFunction` callback. See the logging sample app (`/log/blocking/private/receipt`) for example usage (#7785). ### Changed diff --git a/CMakeLists.txt b/CMakeLists.txt index f87ea9aafb63..9d5f13a8263a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -767,6 +767,15 @@ if(BUILD_TESTS) PRIVATE ccf_kv ccf_endpoints ccf_tasks ) + add_unit_test( + sharding_test + ${CMAKE_CURRENT_SOURCE_DIR}/src/node/test/sharding.cpp + ) + target_link_libraries( + sharding_test + PRIVATE ccf_kv ccf_endpoints ccf_tasks + ) + add_unit_test( node_info_json_test ${CMAKE_CURRENT_SOURCE_DIR}/src/node/test/node_info_json.cpp diff --git a/doc/host_config_schema/cchost_config.json b/doc/host_config_schema/cchost_config.json index e0912c39fc51..edac13072e6e 100644 --- a/doc/host_config_schema/cchost_config.json +++ b/doc/host_config_schema/cchost_config.json @@ -569,6 +569,36 @@ "description": "This section includes configuration for periodic cleanup of old files (snapshots, ledger chunks)", "additionalProperties": false }, + "sharding": { + "type": "object", + "properties": { + "enabled": { + "type": "boolean", + "default": false, + "description": "Enable temporal sharding of the ledger." + }, + "auto_seal_after_seqno_count": { + "type": "integer", + "default": 0, + "minimum": 0, + "description": "Automatically seal the active shard after this many committed sequence numbers. 0 disables auto-seal by count." + }, + "auto_seal_after_duration_s": { + "type": "integer", + "default": 0, + "minimum": 0, + "description": "Automatically seal the active shard after this many seconds. 0 disables auto-seal by time." + }, + "max_active_shard_memory_mb": { + "type": "integer", + "default": 0, + "minimum": 0, + "description": "Advisory memory limit for the active shard in megabytes. 0 means unlimited." + } + }, + "description": "This section includes configuration for temporal sharding of the ledger", + "additionalProperties": false + }, "logging": { "type": "object", "properties": { diff --git a/doc/operations/index.rst b/doc/operations/index.rst index 5277eb57c7cb..6032c726663b 100644 --- a/doc/operations/index.rst +++ b/doc/operations/index.rst @@ -96,6 +96,13 @@ This section describes how :term:`Operators` manage the different nodes constitu --- + :fa:`layer-group` :doc:`sharding` + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + + Seal completed ledger ranges into immutable, independently-addressable shards. + + --- + :fa:`terminal` :doc:`operator_rpc_api` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -117,4 +124,5 @@ This section describes how :term:`Operators` manage the different nodes constitu platforms/index troubleshooting resource_usage + sharding operator_rpc_api diff --git a/doc/operations/sharding.rst b/doc/operations/sharding.rst new file mode 100644 index 000000000000..9037be8e353d --- /dev/null +++ b/doc/operations/sharding.rst @@ -0,0 +1,153 @@ +Temporal Sharding +================= + +Temporal sharding allows operators to seal completed time ranges of the ledger into immutable, independently-addressable shards. The active shard continues accepting writes while sealed shards are automatically archived to the shared read-only ledger directory for access by all nodes. + +Sharding is disabled by default and has no effect on existing single-shard behaviour. + +Concepts +-------- + +**Shard** + A contiguous range of ledger sequence numbers. At any time, exactly one shard is *active* (accepting writes). Older shards progress through *Sealing* and *Sealed* states. + +**Shard seal** + The process of closing the active shard at a committed sequence number, triggering a snapshot at the boundary, rekeying the ledger secret, and opening a new active shard. + +**Shard policy** + Governance-controlled thresholds that may trigger automatic sealing based on sequence number count or elapsed time. + +Prerequisites +------------- + +Sharding requires at least one entry in ``ledger.read_only_directories``. This is the shared mount path (e.g. a Kubernetes PVC) where all nodes look for committed ledger chunks. When a shard is sealed, its ledger chunks are hard-linked (or copied) into ``/shards//``, making sealed shard data automatically available to all nodes mounting the same volume. + +If ``ledger.read_only_directories`` is empty when sharding is enabled, the node logs an error and sharding is not activated. + +Configuration +------------- + +Sharding is configured in the node's JSON configuration file: + +.. code-block:: json + + { + "ledger": { + "directory": "ledger", + "read_only_directories": ["/shared/ledger"] + }, + "sharding": { + "enabled": true, + "auto_seal_after_seqno_count": 100000, + "auto_seal_after_duration_s": 3600, + "max_active_shard_memory_mb": 1024 + } + } + +.. list-table:: + :header-rows: 1 + :widths: 35 10 55 + + * - Field + - Default + - Description + * - ``enabled`` + - ``false`` + - Enable temporal sharding. + * - ``auto_seal_after_seqno_count`` + - ``0`` + - Automatically seal the active shard after this many committed sequence numbers. ``0`` disables auto-seal by count. + * - ``auto_seal_after_duration_s`` + - ``0`` + - Automatically seal the active shard after this many seconds. ``0`` disables auto-seal by time. + * - ``max_active_shard_memory_mb`` + - ``0`` + - Advisory memory limit for the active shard. ``0`` means unlimited. + +When sharding is enabled, the initial shard (shard 0) is created automatically when the service transitions to open. + +Governance Actions +------------------ + +Members can manage shards through governance proposals. + +``seal_current_shard`` +~~~~~~~~~~~~~~~~~~~~~~ + +Seals the currently active shard at the latest committed sequence number. This initiates a two-phase process: the shard is marked as **Sealing**, a snapshot is triggered at the boundary, and the ledger is rekeyed. Once the snapshot is committed asynchronously, the shard transitions to **Sealed** and its data is archived to the shared read-only ledger directory. + +.. code-block:: json + + { + "actions": [ + { + "name": "seal_current_shard" + } + ] + } + +``set_shard_policy`` +~~~~~~~~~~~~~~~~~~~~ + +Updates the shard policy. All fields are optional — unspecified fields default to ``0`` (disabled). + +.. code-block:: json + + { + "actions": [ + { + "name": "set_shard_policy", + "args": { + "auto_seal_after_seqno_count": 100000, + "auto_seal_after_duration_s": 3600, + "max_active_shard_memory_mb": 1024 + } + } + ] + } + +``migrate_shard`` has been removed — sealed shards are automatically archived to the first ``ledger.read_only_directories`` entry. + +KV Tables +--------- + +Shard metadata is stored in public governance tables: + +- ``public:ccf.gov.shards.info`` — Maps shard ID to ``ShardInfo`` (shard boundaries, status, snapshot seqno). +- ``public:ccf.gov.shards.policy`` — Singleton ``ShardPolicyInfo`` (auto-seal thresholds). + +Sealed Shard Storage +-------------------- + +When a shard is sealed, the primary node sends a ``ledger_shard_sealed`` message to the host process. The host hard-links (or copies, if cross-device) all committed ledger chunk files covering the shard's sequence number range from ``ledger.directory`` into: + +.. code-block:: text + + /shards// + +This means all nodes mounting the same shared volume automatically have access to sealed shard data for historical queries, without requiring explicit migration. + +Shard Lifecycle +--------------- + +1. **Active** — The shard is open and accepting writes. +2. **Sealing** — A seal has been initiated. A snapshot is being taken at the shard boundary and the ledger is being rekeyed. The shard remains in this state until the snapshot is committed. +3. **Sealed** — The snapshot has been committed. The shard is immutable and its data has been archived to the shared read-only directory. + +.. code-block:: text + + Active ──seal_current_shard──> Sealing ──snapshot committed (async)──> Sealed + +The transition from Sealing to Sealed is **asynchronous**: when the snapshotter commits the shard-seal snapshot, it fires a callback (``on_shard_seal_committed``) that updates the shard status in the KV and notifies the host to archive the ledger chunks. This ensures that sealed shard data is only archived after the boundary snapshot is durable. + +Auto-seal +--------- + +When ``auto_seal_after_seqno_count`` or ``auto_seal_after_duration_s`` is configured and sharding is enabled, the primary node periodically checks these thresholds. If either threshold is exceeded, the active shard is sealed automatically without requiring a governance proposal. + +Recovery +-------- + +During service recovery, the sharding state is restored from the KV store. The node identifies the current active shard and resumes from its start sequence number. Ledger secret chains are preserved across shard boundaries via the existing encrypted past ledger secret mechanism. + +If a shard is found in the **Sealing** state during recovery (i.e. the node crashed after initiating a seal but before the snapshot was committed), the seal can be retried via a new ``seal_current_shard`` proposal once the service is open. diff --git a/include/ccf/node/startup_config.h b/include/ccf/node/startup_config.h index 527e96461e61..ab21ca21a4e2 100644 --- a/include/ccf/node/startup_config.h +++ b/include/ccf/node/startup_config.h @@ -126,6 +126,17 @@ namespace ccf bool operator==(const FilesCleanup&) const = default; }; FilesCleanup files_cleanup = {}; + + struct Sharding + { + bool enabled = false; + size_t auto_seal_after_seqno_count = 0; + size_t auto_seal_after_duration_s = 0; + size_t max_active_shard_memory_mb = 0; + + bool operator==(const Sharding&) const = default; + }; + Sharding sharding = {}; }; struct RecoveryDecisionProtocolConfig diff --git a/samples/constitutions/default/actions.js b/samples/constitutions/default/actions.js index 667a044071ae..879e344d3516 100644 --- a/samples/constitutions/default/actions.js +++ b/samples/constitutions/default/actions.js @@ -1604,4 +1604,69 @@ const actions = new Map([ }, ), ], + [ + "seal_current_shard", + new Action( + function (args) { + checkNone(args); + }, + function (args) { + ccf.node.sealShard(); + }, + ), + ], + [ + "set_shard_policy", + new Action( + function (args) { + if (args.auto_seal_after_seqno_count !== undefined) { + checkType( + args.auto_seal_after_seqno_count, + "number", + "auto_seal_after_seqno_count", + ); + checkBounds( + args.auto_seal_after_seqno_count, + 0, + Number.MAX_SAFE_INTEGER, + "auto_seal_after_seqno_count", + ); + } + if (args.auto_seal_after_duration_s !== undefined) { + checkType( + args.auto_seal_after_duration_s, + "number", + "auto_seal_after_duration_s", + ); + checkBounds( + args.auto_seal_after_duration_s, + 0, + Number.MAX_SAFE_INTEGER, + "auto_seal_after_duration_s", + ); + } + if (args.max_active_shard_memory_mb !== undefined) { + checkType( + args.max_active_shard_memory_mb, + "number", + "max_active_shard_memory_mb", + ); + checkBounds( + args.max_active_shard_memory_mb, + 0, + Number.MAX_SAFE_INTEGER, + "max_active_shard_memory_mb", + ); + } + }, + function (args) { + const policy = { + auto_seal_after_seqno_count: args.auto_seal_after_seqno_count || 0, + auto_seal_after_duration_s: args.auto_seal_after_duration_s || 0, + max_active_shard_memory_mb: args.max_active_shard_memory_mb || 0, + }; + ccf.node.setShardPolicy(ccf.jsonCompatibleToBuf(policy)); + }, + ), + ], ]); diff --git a/samples/minimal_ccf/app/actions.js b/samples/minimal_ccf/app/actions.js index 5531ace40060..148c4ea79b3c 100644 --- a/samples/minimal_ccf/app/actions.js +++ b/samples/minimal_ccf/app/actions.js @@ -1511,4 +1511,69 @@ const actions = new Map([ function (args) {}, ), ], + [ + "seal_current_shard", + new Action( + function (args) { + checkNone(args); + }, + function (args) { + ccf.node.sealShard(); + }, + ), + ], + [ + "set_shard_policy", + new Action( + function (args) { + if (args.auto_seal_after_seqno_count !== undefined) { + checkType( + args.auto_seal_after_seqno_count, + "number", + "auto_seal_after_seqno_count", + ); + checkBounds( + args.auto_seal_after_seqno_count, + 0, + Number.MAX_SAFE_INTEGER, + "auto_seal_after_seqno_count", + ); + } + if (args.auto_seal_after_duration_s !== undefined) { + checkType( + args.auto_seal_after_duration_s, + "number", + "auto_seal_after_duration_s", + ); + checkBounds( + args.auto_seal_after_duration_s, + 0, + Number.MAX_SAFE_INTEGER, + "auto_seal_after_duration_s", + ); + } + if (args.max_active_shard_memory_mb !== undefined) { + checkType( + args.max_active_shard_memory_mb, + "number", + "max_active_shard_memory_mb", + ); + checkBounds( + args.max_active_shard_memory_mb, + 0, + Number.MAX_SAFE_INTEGER, + "max_active_shard_memory_mb", + ); + } + }, + function (args) { + const policy = { + auto_seal_after_seqno_count: args.auto_seal_after_seqno_count || 0, + auto_seal_after_duration_s: args.auto_seal_after_duration_s || 0, + max_active_shard_memory_mb: args.max_active_shard_memory_mb || 0, + }; + ccf.node.setShardPolicy(ccf.jsonCompatibleToBuf(policy)); + }, + ), + ], ]); diff --git a/src/common/configuration.h b/src/common/configuration.h index a46469bd33d1..cdfb4599b316 100644 --- a/src/common/configuration.h +++ b/src/common/configuration.h @@ -122,6 +122,15 @@ namespace ccf max_committed_ledger_chunks, interval); + DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCFConfig::Sharding); + DECLARE_JSON_REQUIRED_FIELDS(CCFConfig::Sharding); + DECLARE_JSON_OPTIONAL_FIELDS( + CCFConfig::Sharding, + enabled, + auto_seal_after_seqno_count, + auto_seal_after_duration_s, + max_active_shard_memory_mb); + DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCFConfig); DECLARE_JSON_REQUIRED_FIELDS(CCFConfig, network); DECLARE_JSON_OPTIONAL_FIELDS( @@ -135,6 +144,7 @@ namespace ccf attestation, snapshots, files_cleanup, + sharding, node_to_node_message_limit, historical_cache_soft_limit); diff --git a/src/consensus/ledger_enclave_types.h b/src/consensus/ledger_enclave_types.h index 6dff484391a0..00145742aaba 100644 --- a/src/consensus/ledger_enclave_types.h +++ b/src/consensus/ledger_enclave_types.h @@ -37,6 +37,9 @@ namespace consensus DEFINE_RINGBUFFER_MSG_TYPE(snapshot_allocate), DEFINE_RINGBUFFER_MSG_TYPE(snapshot_commit), + /// Notify host that a shard has been sealed. Enclave -> Host + DEFINE_RINGBUFFER_MSG_TYPE(ledger_shard_sealed), + /// Host -> Enclave DEFINE_RINGBUFFER_MSG_TYPE(snapshot_allocated), }; @@ -84,3 +87,9 @@ DECLARE_RINGBUFFER_MESSAGE_PAYLOAD( ::consensus::snapshot_commit, ::consensus::Index /* snapshot idx */, std::vector /* serialised receipt */); + +DECLARE_RINGBUFFER_MESSAGE_PAYLOAD( + ::consensus::ledger_shard_sealed, + uint64_t /* shard_id */, + ::consensus::Index /* seqno_start */, + ::consensus::Index /* seqno_end */); diff --git a/src/host/ledger.h b/src/host/ledger.h index fb2b821827e1..f636f1400efe 100644 --- a/src/host/ledger.h +++ b/src/host/ledger.h @@ -1692,6 +1692,104 @@ namespace asynchost purpose); } }); + + DISPATCHER_SET_MESSAGE_HANDLER( + disp, + ::consensus::ledger_shard_sealed, + [this](const uint8_t* data, size_t size) { + auto shard_id = serialized::read(data, size); + auto seqno_start = + serialized::read<::consensus::Index>(data, size); + auto seqno_end = serialized::read<::consensus::Index>(data, size); + handle_shard_sealed(shard_id, seqno_start, seqno_end); + }); + } + + private: + void handle_shard_sealed( + uint64_t shard_id, + ::consensus::Index seqno_start, + ::consensus::Index seqno_end) + { + if (read_ledger_dirs.empty()) + { + LOG_FAIL_FMT( + "Cannot archive sealed shard {}: no read-only ledger directories " + "configured", + shard_id); + return; + } + + const auto& target_dir = read_ledger_dirs.front(); + const auto shard_dir = + target_dir / "shards" / std::to_string(shard_id); + + try + { + fs::create_directories(shard_dir); + } + catch (const std::exception& e) + { + LOG_FAIL_FMT( + "Failed to create shard directory {}: {}", shard_dir.string(), e.what()); + return; + } + + // Hard-link committed ledger chunk files covering [seqno_start, + // seqno_end] from the primary ledger directory into the shard directory + size_t linked_count = 0; + for (auto const& f : fs::directory_iterator(ledger_dir)) + { + auto file_name = f.path().filename().string(); + if ( + !is_ledger_file_name_committed(file_name) || + is_ledger_file_name_ignored(file_name)) + { + continue; + } + + auto first_idx = get_start_idx_from_file_name(file_name); + auto chunk_last_idx = get_last_idx_from_file_name(file_name); + if (!chunk_last_idx.has_value()) + { + continue; + } + + // Include any chunk that overlaps with the shard's seqno range + if (chunk_last_idx.value() >= seqno_start && first_idx <= seqno_end) + { + const auto src = ledger_dir / file_name; + const auto dst = shard_dir / file_name; + + std::error_code ec; + fs::create_hard_link(src, dst, ec); + if (ec) + { + // Fall back to copy if hard link fails (e.g. cross-device) + fs::copy_file( + src, dst, fs::copy_options::skip_existing, ec); + if (ec) + { + LOG_FAIL_FMT( + "Failed to link/copy {} to {}: {}", + src.string(), + dst.string(), + ec.message()); + continue; + } + } + linked_count++; + } + } + + LOG_INFO_FMT( + "Shard {} sealed: archived {} ledger chunk(s) covering seqnos " + "[{}, {}] to {}", + shard_id, + linked_count, + seqno_start, + seqno_end, + shard_dir.string()); } }; } diff --git a/src/js/extensions/ccf/node.cpp b/src/js/extensions/ccf/node.cpp index 03ffe0f2654f..f34f8e9c2eb8 100644 --- a/src/js/extensions/ccf/node.cpp +++ b/src/js/extensions/ccf/node.cpp @@ -6,6 +6,7 @@ #include "ccf/js/core/context.h" #include "js/checks.h" #include "node/rpc/gov_logging.h" +#include "service/tables/shards.h" #include @@ -320,6 +321,115 @@ namespace ccf::js::extensions return ccf::js::core::constants::Undefined; } + + JSValue js_node_seal_shard( + JSContext* ctx, + [[maybe_unused]] JSValueConst this_val, + int argc, + [[maybe_unused]] JSValueConst* argv) + { + js::core::Context& jsctx = + *reinterpret_cast(JS_GetContextOpaque(ctx)); + if (argc != 0) + { + return JS_ThrowTypeError( + ctx, "Passed %d arguments but expected none", argc); + } + + auto* extension = jsctx.get_extension(); + if (extension == nullptr) + { + return JS_ThrowInternalError(ctx, "Failed to get extension object"); + } + + auto* gov_effects = extension->gov_effects; + if (gov_effects == nullptr) + { + return JS_ThrowInternalError( + ctx, "Failed to get governance effects object"); + } + + auto* tx_ptr = extension->tx; + if (tx_ptr == nullptr) + { + return JS_ThrowInternalError(ctx, "Failed to get tx object"); + } + + try + { + bool result = gov_effects->seal_shard(*tx_ptr); + if (!result) + { + return JS_ThrowInternalError(ctx, "Could not seal shard"); + } + } + catch (const std::exception& e) + { + GOV_FAIL_FMT("Unable to seal shard: {}", e.what()); + return JS_ThrowInternalError( + ctx, "Unable to seal shard: %s", e.what()); + } + + return ccf::js::core::constants::Undefined; + } + + JSValue js_node_set_shard_policy( + JSContext* ctx, + [[maybe_unused]] JSValueConst this_val, + int argc, + [[maybe_unused]] JSValueConst* argv) + { + js::core::Context& jsctx = + *reinterpret_cast(JS_GetContextOpaque(ctx)); + if (argc != 1) + { + return JS_ThrowTypeError( + ctx, "Passed %d arguments but expected one", argc); + } + + auto* extension = jsctx.get_extension(); + if (extension == nullptr) + { + return JS_ThrowInternalError(ctx, "Failed to get extension object"); + } + + auto* gov_effects = extension->gov_effects; + if (gov_effects == nullptr) + { + return JS_ThrowInternalError( + ctx, "Failed to get governance effects object"); + } + + auto* tx_ptr = extension->tx; + if (tx_ptr == nullptr) + { + return JS_ThrowInternalError(ctx, "Failed to get tx object"); + } + + try + { + size_t buf_sz = 0; + uint8_t* buf = JS_GetArrayBuffer(ctx, &buf_sz, argv[0]); + if (buf == nullptr) + { + return JS_ThrowTypeError( + ctx, "Shard policy argument is not an array buffer"); + } + + auto policy_json = + nlohmann::json::parse(buf, buf + buf_sz); + auto policy = policy_json.get(); + gov_effects->set_shard_policy(*tx_ptr, policy); + } + catch (const std::exception& e) + { + GOV_FAIL_FMT("Unable to set shard policy: {}", e.what()); + return JS_ThrowInternalError( + ctx, "Unable to set shard policy: %s", e.what()); + } + + return ccf::js::core::constants::Undefined; + } } void NodeExtension::install(js::core::Context& ctx) @@ -349,6 +459,12 @@ namespace ccf::js::extensions JS_CHECK_OR_THROW(node.set( "shuffleSealedShares", ctx.new_c_function(js_shuffle_sealed_shares, "shuffleSealedShares", 0))); + JS_CHECK_OR_THROW(node.set( + "sealShard", + ctx.new_c_function(js_node_seal_shard, "sealShard", 0))); + JS_CHECK_OR_THROW(node.set( + "setShardPolicy", + ctx.new_c_function(js_node_set_shard_policy, "setShardPolicy", 1))); auto ccf = ctx.get_or_create_global_property("ccf", ctx.new_obj()); JS_CHECK_OR_THROW(ccf.set("node", std::move(node))); diff --git a/src/js/extensions/ccf/node.h b/src/js/extensions/ccf/node.h index 9f6099053f1a..dac92ce50073 100644 --- a/src/js/extensions/ccf/node.h +++ b/src/js/extensions/ccf/node.h @@ -15,6 +15,8 @@ namespace ccf::js::extensions * - ccf.node.triggerRecoverySharesRefresh * - ccf.node.triggerLedgerChunk * - ccf.node.triggerSnapshot + * - ccf.node.sealShard + * - ccf.node.setShardPolicy * **/ class NodeExtension : public ExtensionInterface diff --git a/src/node/historical_queries.h b/src/node/historical_queries.h index 7bbc414019b2..f430217c23ac 100644 --- a/src/node/historical_queries.h +++ b/src/node/historical_queries.h @@ -13,6 +13,7 @@ #include "node/rpc/node_interface.h" #include "node/tx_receipt_impl.h" #include "service/tables/node_signature.h" +#include "service/tables/shards.h" #include #include @@ -111,6 +112,25 @@ namespace ccf::historical using LedgerEntry = std::vector; + std::optional get_shard_for_seqno(ccf::SeqNo seqno) + { + auto tx = source_store.create_read_only_tx(); + auto* shards = tx.ro(ccf::Tables::SHARDS); + std::optional result = std::nullopt; + + shards->foreach( + [&result, seqno](const uint64_t&, const ccf::ShardInfo& info) { + if (seqno >= info.seqno_start && seqno <= info.seqno_end) + { + result = info; + return false; + } + return true; + }); + + return result; + } + void update_earliest_known_ledger_secret() { if (earliest_secret_.secret == nullptr) diff --git a/src/node/ledger_secret.h b/src/node/ledger_secret.h index 49355e24407b..d16aa84b54d7 100644 --- a/src/node/ledger_secret.h +++ b/src/node/ledger_secret.h @@ -28,6 +28,7 @@ namespace ccf std::shared_ptr key; std::optional previous_secret_stored_version = std::nullopt; + std::optional shard_id = std::nullopt; ccf::crypto::HashBytes commit_secret; static ccf::crypto::HashBytes derive_commit_secret( @@ -79,7 +80,8 @@ namespace ccf DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(LedgerSecret); DECLARE_JSON_REQUIRED_FIELDS(LedgerSecret, raw_key); - DECLARE_JSON_OPTIONAL_FIELDS(LedgerSecret, previous_secret_stored_version); + DECLARE_JSON_OPTIONAL_FIELDS( + LedgerSecret, previous_secret_stored_version, shard_id); using LedgerSecretPtr = std::shared_ptr; diff --git a/src/node/ledger_secrets.h b/src/node/ledger_secrets.h index c7f0c13f8dd1..ff32fd726cbe 100644 --- a/src/node/ledger_secrets.h +++ b/src/node/ledger_secrets.h @@ -280,6 +280,26 @@ namespace ccf LOG_INFO_FMT("Added new ledger secret at seqno {}", version); } + void set_secret( + ccf::kv::Version version, + LedgerSecretPtr&& secret, + uint64_t shard_id) + { + secret->shard_id = shard_id; + set_secret(version, std::move(secret)); + } + + std::optional get_shard_id_for(ccf::kv::Version version) + { + std::lock_guard guard(lock); + auto ls = get_secret_for_version(version, true); + if (ls == nullptr) + { + return std::nullopt; + } + return ls->shard_id; + } + void rollback(ccf::kv::Version version) { std::lock_guard guard(lock); diff --git a/src/node/node_state.h b/src/node/node_state.h index 8678026e0142..493faebcaba2 100644 --- a/src/node/node_state.h +++ b/src/node/node_state.h @@ -54,6 +54,7 @@ #include "service/internal_tables_access.h" #include "service/tables/local_sealing.h" #include "service/tables/recovery_type.h" +#include "service/tables/shards.h" #include "share_manager.h" #include "snapshots/fetch.h" #include "snapshots/filenames.h" @@ -420,6 +421,27 @@ namespace ccf ::consensus::Index last_recovered_idx = 0; static const size_t recovery_batch_size = 100; + // + // Temporal sharding + // + uint64_t current_shard_id = 0; + ccf::kv::Version current_shard_seqno_start = 1; + bool shard_seal_in_progress = false; + std::chrono::milliseconds current_shard_elapsed{0}; + + // Tracks info needed to complete a seal after snapshot commits + struct PendingSealInfo + { + uint64_t shard_id = 0; + ccf::kv::Version seqno_start = 0; + ccf::kv::Version seqno_end = 0; + }; + std::optional pending_seal_info = std::nullopt; + + // Set by the snapshotter callback (which runs under the store maps lock, + // so we cannot do KV work there). Checked and cleared in tick(). + std::atomic seal_snapshot_committed{false}; + // // JWT key auto-refresh // @@ -1808,6 +1830,8 @@ namespace ccf throw std::logic_error("Service could not be opened"); } + initialize_sharding(tx); + // Trigger a snapshot (at next signature) to ensure we have a working // snapshot signed by the current (now new) service identity, in case // we need to recover soon again. @@ -2072,6 +2096,7 @@ namespace ccf InternalTablesAccess::open_service(tx); InternalTablesAccess::endorse_previous_identity( tx, *network.identity->get_key_pair()); + initialize_sharding(tx); trigger_snapshot(tx); return; } @@ -2133,6 +2158,17 @@ namespace ccf { const auto tx_id = consensus->get_committed_txid(); indexer->update_strategies(elapsed, {tx_id.first, tx_id.second}); + + // Check if a shard-seal snapshot was committed (flag set by + // the snapshotter callback) and complete the seal transition + if ( + seal_snapshot_committed.exchange(false) && + pending_seal_info.has_value()) + { + complete_shard_seal(pending_seal_info->shard_id); + } + + check_auto_seal(elapsed, tx_id.second); } n2n_channels->tick(elapsed); @@ -2322,6 +2358,320 @@ namespace ccf return true; } + bool seal_shard(ccf::kv::Tx& tx) override + { + std::lock_guard guard(lock); + sm.expect(NodeStartupState::partOfNetwork); + + if (!config.sharding.enabled) + { + LOG_FAIL_FMT("Cannot seal shard: sharding is not enabled"); + return false; + } + + const auto service_status = InternalTablesAccess::get_service_status(tx); + if ( + !service_status.has_value() || + service_status.value() != ServiceStatus::OPEN) + { + LOG_FAIL_FMT("Cannot seal shard while the service is not open"); + return false; + } + + if (shard_seal_in_progress) + { + LOG_FAIL_FMT( + "Cannot seal shard: a seal operation is already in progress"); + return false; + } + + shard_seal_in_progress = true; + + // 1. Record the shard boundary in the Shards table, marking it as + // Sealing + auto* shards = tx.rw(Tables::SHARDS); + auto committed_seqno = consensus->get_committed_seqno(); + + ShardInfo sealing_shard; + sealing_shard.shard_id = current_shard_id; + sealing_shard.seqno_start = current_shard_seqno_start; + sealing_shard.seqno_end = committed_seqno; + sealing_shard.status = ShardStatus::Sealing; + sealing_shard.snapshot_seqno = committed_seqno; + shards->put(current_shard_id, sealing_shard); + + // 2. Trigger a snapshot at the current committed seqno (shard boundary) + auto* committable_tx = dynamic_cast(&tx); + if (committable_tx != nullptr) + { + committable_tx->set_tx_flag( + ccf::kv::CommittableTx::TxFlag::SNAPSHOT_AT_NEXT_SIGNATURE); + } + + // Mark the snapshot as a shard seal snapshot + if (snapshotter) + { + snapshotter->mark_next_snapshot_as_shard_seal(current_shard_id); + } + + // 3. Force a ledger secret rekey — the new shard gets a fresh key + auto new_ledger_secret = make_ledger_secret(); + share_manager.issue_recovery_shares_for_shard_seal( + tx, new_ledger_secret); + + // Record ledger secret version for the sealed shard + auto latest_secret = network.ledger_secrets->get_latest(tx); + sealing_shard.ledger_secret_version = latest_secret.first; + shards->put(current_shard_id, sealing_shard); + + LedgerSecretsBroadcast::broadcast_new( + InternalTablesAccess::get_trusted_nodes(tx), + tx.wo(network.secrets), + std::move(new_ledger_secret)); + + // 4. Prepare the new active shard + auto new_shard_id = current_shard_id + 1; + ShardInfo new_shard; + new_shard.shard_id = new_shard_id; + new_shard.seqno_start = committed_seqno + 1; + new_shard.seqno_end = std::numeric_limits::max(); + new_shard.status = ShardStatus::Active; + shards->put(new_shard_id, new_shard); + + LOG_INFO_FMT( + "Shard {} sealing initiated at seqno {}. New active shard {} starts " + "at seqno {}", + current_shard_id, + committed_seqno, + new_shard_id, + committed_seqno + 1); + + // 5. Store pending seal info — the shard stays in Sealing state until + // the snapshot callback fires and transitions it to Sealed + notifies + // the host + pending_seal_info = PendingSealInfo{ + sealing_shard.shard_id, current_shard_seqno_start, committed_seqno}; + + current_shard_id = new_shard_id; + current_shard_seqno_start = committed_seqno + 1; + current_shard_elapsed = std::chrono::milliseconds{0}; + // shard_seal_in_progress remains true until the callback fires + + return true; + } + + void set_shard_policy(ccf::kv::Tx& tx, const ShardPolicyInfo& policy) + override + { + auto* shard_policy = tx.rw(Tables::SHARD_POLICY); + shard_policy->put(policy); + LOG_INFO_FMT( + "Shard policy updated: auto_seal_after_seqno_count={}, " + "auto_seal_after_duration_s={}, max_active_shard_memory_mb={}", + policy.auto_seal_after_seqno_count, + policy.auto_seal_after_duration_s, + policy.max_active_shard_memory_mb); + } + + void complete_shard_seal(uint64_t shard_id) + { + // Called from tick() when the snapshotter notifies us that the + // shard-seal snapshot has been committed. Transitions the shard from + // Sealing → Sealed and notifies the host to archive the ledger chunks. + std::lock_guard guard(lock); + + if ( + !pending_seal_info.has_value() || + pending_seal_info->shard_id != shard_id) + { + LOG_FAIL_FMT( + "Received shard seal completion for shard {} but no matching " + "pending seal", + shard_id); + return; + } + + auto seal_info = pending_seal_info.value(); + pending_seal_info = std::nullopt; + + // Update the shard status to Sealed in the KV + auto tx = network.tables->create_tx(); + auto* shards = tx.rw(Tables::SHARDS); + auto shard = shards->get(shard_id); + if (shard.has_value()) + { + shard->status = ShardStatus::Sealed; + shards->put(shard_id, shard.value()); + auto rc = tx.commit(); + if (rc != ccf::kv::CommitResult::SUCCESS) + { + LOG_FAIL_FMT( + "Failed to commit Sealed status for shard {}: {}", shard_id, rc); + } + } + + // Notify the host to hard-link sealed shard's ledger chunks + if (!config.ledger.read_only_directories.empty()) + { + auto shard_writer = writer_factory.create_writer_to_outside(); + RINGBUFFER_WRITE_MESSAGE( + ::consensus::ledger_shard_sealed, + shard_writer, + seal_info.shard_id, + seal_info.seqno_start, + seal_info.seqno_end); + } + + shard_seal_in_progress = false; + + LOG_INFO_FMT( + "Shard {} sealed: seqno range [{}, {}] archived to read-only " + "directory", + shard_id, + seal_info.seqno_start, + seal_info.seqno_end); + } + + void initialize_sharding(ccf::kv::Tx& tx) + { + if (!config.sharding.enabled) + { + return; + } + + if (config.ledger.read_only_directories.empty()) + { + LOG_FAIL_FMT( + "Sharding is enabled but ledger.read_only_directories is empty. " + "Sealed shard data requires at least one read-only ledger directory " + "(shared PVC mount) to be configured. Disabling sharding."); + return; + } + + auto* shards = tx.rw(Tables::SHARDS); + + // Check if shards already exist (e.g. after recovery) + bool has_shards = false; + shards->foreach([&has_shards](const uint64_t&, const ShardInfo&) { + has_shards = true; + return false; + }); + + if (!has_shards) + { + ShardInfo initial_shard; + initial_shard.shard_id = 0; + initial_shard.seqno_start = 1; + initial_shard.seqno_end = std::numeric_limits::max(); + initial_shard.status = ShardStatus::Active; + shards->put(0, initial_shard); + + LOG_INFO_FMT("Initialized temporal sharding with initial shard 0"); + } + else + { + // Find the current active shard + shards->foreach([this](const uint64_t&, const ShardInfo& info) { + if (info.status == ShardStatus::Active) + { + current_shard_id = info.shard_id; + current_shard_seqno_start = info.seqno_start; + } + return true; + }); + + LOG_INFO_FMT( + "Recovered sharding state: active shard {} starting at seqno {}", + current_shard_id, + current_shard_seqno_start); + } + + // Apply shard policy from configuration if not already set + auto* shard_policy = tx.rw(Tables::SHARD_POLICY); + if (!shard_policy->get().has_value()) + { + ShardPolicyInfo policy; + policy.auto_seal_after_seqno_count = + config.sharding.auto_seal_after_seqno_count; + policy.auto_seal_after_duration_s = + config.sharding.auto_seal_after_duration_s; + policy.max_active_shard_memory_mb = + config.sharding.max_active_shard_memory_mb; + shard_policy->put(policy); + } + } + + void check_auto_seal( + std::chrono::milliseconds elapsed, ccf::kv::Version committed_seqno) + { + if (!config.sharding.enabled || shard_seal_in_progress) + { + return; + } + + if (!consensus->is_primary()) + { + return; + } + + current_shard_elapsed += elapsed; + + bool should_seal = false; + + // Check seqno count threshold + if ( + config.sharding.auto_seal_after_seqno_count > 0 && + committed_seqno >= current_shard_seqno_start) + { + auto seqno_count = + static_cast(committed_seqno - current_shard_seqno_start + 1); + if (seqno_count >= config.sharding.auto_seal_after_seqno_count) + { + LOG_INFO_FMT( + "Auto-seal triggered: shard {} has {} seqnos (threshold: {})", + current_shard_id, + seqno_count, + config.sharding.auto_seal_after_seqno_count); + should_seal = true; + } + } + + // Check duration threshold + if ( + !should_seal && config.sharding.auto_seal_after_duration_s > 0 && + current_shard_elapsed.count() >= + static_cast( + config.sharding.auto_seal_after_duration_s * 1000)) + { + LOG_INFO_FMT( + "Auto-seal triggered: shard {} exceeded duration threshold " + "({}s >= {}s)", + current_shard_id, + current_shard_elapsed.count() / 1000, + config.sharding.auto_seal_after_duration_s); + should_seal = true; + } + + if (should_seal) + { + auto tx = network.tables->create_tx(); + if (seal_shard(tx)) + { + auto result = tx.commit(); + if (result == ccf::kv::CommitResult::SUCCESS) + { + current_shard_elapsed = std::chrono::milliseconds{0}; + LOG_INFO_FMT( + "Auto-seal committed for shard {}", current_shard_id - 1); + } + else + { + LOG_FAIL_FMT("Auto-seal commit failed: {}", result); + } + } + } + } + [[nodiscard]] NodeId get_node_id() const { return self; @@ -3188,6 +3538,13 @@ namespace ccf config.snapshots.tx_count, config.snapshots.min_tx_count, config.snapshots.time_interval); + + // Register shard seal completion callback so that when a shard-seal + // snapshot is committed, we get notified. The callback runs under the + // store's maps lock, so it just sets an atomic flag — the actual + // completion work (KV update + ring buffer message) happens in tick(). + snapshotter->set_on_shard_seal_committed( + [this](uint64_t) { seal_snapshot_committed.store(true); }); } void read_ledger_entries(::consensus::Index from, ::consensus::Index to) diff --git a/src/node/rpc/gov_effects.h b/src/node/rpc/gov_effects.h index 0c9a1c1db397..e35439adc652 100644 --- a/src/node/rpc/gov_effects.h +++ b/src/node/rpc/gov_effects.h @@ -45,5 +45,16 @@ namespace ccf { impl.shuffle_sealed_shares(tx); } + + bool seal_shard(ccf::kv::Tx& tx) override + { + return impl.seal_shard(tx); + } + + void set_shard_policy( + ccf::kv::Tx& tx, const ShardPolicyInfo& policy) override + { + impl.set_shard_policy(tx, policy); + } }; } \ No newline at end of file diff --git a/src/node/rpc/gov_effects_interface.h b/src/node/rpc/gov_effects_interface.h index f940d9b540f6..4d6b319bb61e 100644 --- a/src/node/rpc/gov_effects_interface.h +++ b/src/node/rpc/gov_effects_interface.h @@ -6,8 +6,12 @@ #include "ccf/node_subsystem_interface.h" #include "ccf/tx.h" +#include + namespace ccf { + struct ShardPolicyInfo; + class AbstractGovernanceEffects : public ccf::AbstractNodeSubSystem { public: @@ -31,5 +35,8 @@ namespace ccf virtual void trigger_ledger_chunk(ccf::kv::Tx& tx) = 0; virtual void trigger_snapshot(ccf::kv::Tx& tx) = 0; virtual void shuffle_sealed_shares(ccf::kv::Tx& tx) = 0; + virtual bool seal_shard(ccf::kv::Tx& tx) = 0; + virtual void set_shard_policy( + ccf::kv::Tx& tx, const ShardPolicyInfo& policy) = 0; }; } \ No newline at end of file diff --git a/src/node/rpc/node_interface.h b/src/node/rpc/node_interface.h index 82fab1610d9a..f8664b3ba721 100644 --- a/src/node/rpc/node_interface.h +++ b/src/node/rpc/node_interface.h @@ -16,6 +16,7 @@ #include "node/rpc/gov_effects_interface.h" #include "node/rpc/node_operation_interface.h" #include "node/session_metrics.h" +#include "service/tables/shards.h" namespace ccf { @@ -58,6 +59,9 @@ namespace ccf virtual RecoveryDecisionProtocolSubsystem& get_recovery_decision_protocol() = 0; virtual void shuffle_sealed_shares(ccf::kv::Tx& tx) = 0; + virtual bool seal_shard(ccf::kv::Tx& tx) = 0; + virtual void set_shard_policy( + ccf::kv::Tx& tx, const ShardPolicyInfo& policy) = 0; [[nodiscard]] virtual const ccf::StartupConfig& get_node_config() const = 0; virtual ccf::crypto::Pem get_network_cert() = 0; virtual void stop_notice() = 0; diff --git a/src/node/rpc/test/node_stub.h b/src/node/rpc/test/node_stub.h index 48ffe018e681..d2b6ca0feb71 100644 --- a/src/node/rpc/test/node_stub.h +++ b/src/node/rpc/test/node_stub.h @@ -163,6 +163,17 @@ namespace ccf { return; } + + bool seal_shard(ccf::kv::Tx& /*tx*/) override + { + return true; + } + + void set_shard_policy( + ccf::kv::Tx& /*tx*/, const ShardPolicyInfo& /*policy*/) override + { + return; + } }; class StubNodeStateCache : public historical::AbstractStateCache diff --git a/src/node/share_manager.h b/src/node/share_manager.h index 536049e47665..986a9515a09f 100644 --- a/src/node/share_manager.h +++ b/src/node/share_manager.h @@ -483,6 +483,25 @@ namespace ccf tx, new_ledger_secret, ledger_secrets->get_latest(tx)); } + /** Issue new recovery shares for a shard seal operation. This is + * identical to a rekey-triggered share issuance, ensuring the previous + * shard's ledger secret is encrypted with the new shard's secret and + * recorded in ENCRYPTED_PAST_LEDGER_SECRET. + * + * @param tx Store transaction object + * @param new_ledger_secret Pointer to new ledger secret for the new shard + */ + void issue_recovery_shares_for_shard_seal( + ccf::kv::Tx& tx, LedgerSecretPtr new_ledger_secret) + { + // Shard sealing mandates a rekey, so the previous secret is always + // the current latest. This ensures the backward chain of + // ENCRYPTED_PAST_LEDGER_SECRET spans shard boundaries for disaster + // recovery. + set_recovery_shares_info( + tx, new_ledger_secret, ledger_secrets->get_latest(tx)); + } + /** Issue new recovery shares of the same current ledger secret to all * active recovery members. The encrypted ledger secrets recorded in the * store are not updated. diff --git a/src/node/snapshotter.h b/src/node/snapshotter.h index ba042589bfb1..9701430dd8cc 100644 --- a/src/node/snapshotter.h +++ b/src/node/snapshotter.h @@ -67,6 +67,10 @@ namespace ccf std::optional> cose_sig = std::nullopt; std::optional> tree = std::nullopt; + // When set, this snapshot was taken as part of a shard seal operation + bool is_shard_seal = false; + std::optional sealed_shard_id = std::nullopt; + SnapshotInfo() = default; }; // Queue of pending snapshots that have been generated, but are not yet @@ -87,6 +91,16 @@ namespace ccf // Used to suspend snapshot generation during public recovery bool snapshot_generation_enabled = true; + // Shard seal tracking: when set, the next generated snapshot is marked + // as a shard boundary snapshot for the given shard_id + std::optional pending_shard_seal_id = std::nullopt; + std::optional last_committed_shard_seal_id = std::nullopt; + + // Callback invoked (under lock) when a shard-seal snapshot is committed. + // Set by the node to transition the shard from Sealing → Sealed. + std::function on_shard_seal_committed_cb = + nullptr; + // Indices at which a snapshot will be next generated and Boolean to // indicate whether a snapshot was forced at the given index struct SnapshotEntry @@ -216,6 +230,15 @@ namespace ccf // seqno is recorded via `record_snapshot_evidence_idx()` on a hook // rather than here. pending_snapshots[generation_count].version = snapshot_version; + + // If a shard seal is pending, mark this snapshot accordingly + if (pending_shard_seal_id.has_value()) + { + pending_snapshots[generation_count].is_shard_seal = true; + pending_snapshots[generation_count].sealed_shard_id = + pending_shard_seal_id.value(); + pending_shard_seal_id = std::nullopt; + } } auto serialised_snapshot = store->serialise_snapshot(std::move(snapshot)); @@ -330,6 +353,24 @@ namespace ccf std::move(snapshot_info.snapshot_digest)); commit_snapshot(snapshot_info.version, serialised_receipt); + + // Track shard seal completion + if (snapshot_info.is_shard_seal && + snapshot_info.sealed_shard_id.has_value()) + { + last_committed_shard_seal_id = + snapshot_info.sealed_shard_id.value(); + LOG_INFO_FMT( + "Shard seal snapshot committed for shard {}", + last_committed_shard_seal_id.value()); + + if (on_shard_seal_committed_cb) + { + on_shard_seal_committed_cb( + last_committed_shard_seal_id.value()); + } + } + it = pending_snapshots.erase(it); } else @@ -373,6 +414,26 @@ namespace ccf snapshot_generation_enabled = enabled; } + void mark_next_snapshot_as_shard_seal(uint64_t shard_id) + { + std::lock_guard guard(lock); + pending_shard_seal_id = shard_id; + } + + void set_on_shard_seal_committed( + std::function cb) + { + std::lock_guard guard(lock); + on_shard_seal_committed_cb = std::move(cb); + } + + bool is_shard_seal_snapshot_committed(uint64_t shard_id) + { + std::lock_guard guard(lock); + return last_committed_shard_seal_id.has_value() && + last_committed_shard_seal_id.value() == shard_id; + } + void init_from_snapshot_status(const SnapshotStatus& status) { std::lock_guard guard(lock); diff --git a/src/node/test/sharding.cpp b/src/node/test/sharding.cpp new file mode 100644 index 000000000000..6b52b8a38cac --- /dev/null +++ b/src/node/test/sharding.cpp @@ -0,0 +1,291 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache 2.0 License. + +#include "node/snapshotter.h" +#include "service/tables/shards.h" + +#include "crypto/openssl/hash.h" +#include "ds/internal_logger.h" +#include "ds/ring_buffer.h" +#include "kv/test/null_encryptor.h" +#include "kv/test/stub_consensus.h" +#include "node/encryptor.h" +#include "node/history.h" + +#define DOCTEST_CONFIG_IMPLEMENT +#include +#include + +using StringString = ccf::kv::Map; + +void issue_transactions(ccf::NetworkState& network, size_t tx_count) +{ + for (size_t i = 0; i < tx_count; i++) + { + auto tx = network.tables->create_tx(); + auto map = tx.rw("public:map"); + map->put("foo", "bar"); + REQUIRE(tx.commit() == ccf::kv::CommitResult::SUCCESS); + } +} + +TEST_CASE("Shard tables basic operations") +{ + ccf::logger::config::default_init(); + + ccf::NetworkState network; + + auto consensus = std::make_shared(); + network.tables->set_consensus(consensus); + auto encryptor = std::make_shared(); + network.tables->set_encryptor(encryptor); + + INFO("Create initial shard"); + { + auto tx = network.tables->create_tx(); + auto* shards = tx.rw(ccf::Tables::SHARDS); + + ccf::ShardInfo initial_shard; + initial_shard.shard_id = 0; + initial_shard.seqno_start = 1; + initial_shard.seqno_end = std::numeric_limits::max(); + initial_shard.status = ccf::ShardStatus::Active; + shards->put(0, initial_shard); + + REQUIRE(tx.commit() == ccf::kv::CommitResult::SUCCESS); + } + + INFO("Read back initial shard"); + { + auto tx = network.tables->create_read_only_tx(); + auto* shards = tx.ro(ccf::Tables::SHARDS); + auto shard = shards->get(0); + + REQUIRE(shard.has_value()); + REQUIRE(shard->shard_id == 0); + REQUIRE(shard->seqno_start == 1); + REQUIRE(shard->status == ccf::ShardStatus::Active); + } + + INFO("Transition shard to Sealing and then Sealed"); + { + auto tx = network.tables->create_tx(); + auto* shards = tx.rw(ccf::Tables::SHARDS); + + auto shard = shards->get(0); + REQUIRE(shard.has_value()); + + shard->status = ccf::ShardStatus::Sealing; + shard->seqno_end = 100; + shards->put(0, shard.value()); + + REQUIRE(tx.commit() == ccf::kv::CommitResult::SUCCESS); + + // Verify sealing state + auto tx2 = network.tables->create_read_only_tx(); + auto* shards2 = tx2.ro(ccf::Tables::SHARDS); + auto sealed = shards2->get(0); + REQUIRE(sealed.has_value()); + REQUIRE(sealed->status == ccf::ShardStatus::Sealing); + REQUIRE(sealed->seqno_end == 100); + } + + INFO("Create new active shard after seal"); + { + auto tx = network.tables->create_tx(); + auto* shards = tx.rw(ccf::Tables::SHARDS); + + // Finalize the old shard + auto old_shard = shards->get(0); + REQUIRE(old_shard.has_value()); + old_shard->status = ccf::ShardStatus::Sealed; + shards->put(0, old_shard.value()); + + // Create the new active shard + ccf::ShardInfo new_shard; + new_shard.shard_id = 1; + new_shard.seqno_start = 101; + new_shard.seqno_end = std::numeric_limits::max(); + new_shard.status = ccf::ShardStatus::Active; + shards->put(1, new_shard); + + REQUIRE(tx.commit() == ccf::kv::CommitResult::SUCCESS); + } + + INFO("Verify both shards coexist correctly"); + { + auto tx = network.tables->create_read_only_tx(); + auto* shards = tx.ro(ccf::Tables::SHARDS); + + auto shard0 = shards->get(0); + REQUIRE(shard0.has_value()); + REQUIRE(shard0->status == ccf::ShardStatus::Sealed); + REQUIRE(shard0->seqno_end == 100); + + auto shard1 = shards->get(1); + REQUIRE(shard1.has_value()); + REQUIRE(shard1->status == ccf::ShardStatus::Active); + REQUIRE(shard1->seqno_start == 101); + } +} + +TEST_CASE("Shard policy table operations") +{ + ccf::logger::config::default_init(); + + ccf::NetworkState network; + + auto consensus = std::make_shared(); + network.tables->set_consensus(consensus); + auto encryptor = std::make_shared(); + network.tables->set_encryptor(encryptor); + + INFO("Set and read shard policy"); + { + auto tx = network.tables->create_tx(); + auto* policy = tx.rw(ccf::Tables::SHARD_POLICY); + + ccf::ShardPolicyInfo info; + info.auto_seal_after_seqno_count = 1000; + info.auto_seal_after_duration_s = 3600; + info.max_active_shard_memory_mb = 512; + policy->put(info); + + REQUIRE(tx.commit() == ccf::kv::CommitResult::SUCCESS); + } + + { + auto tx = network.tables->create_read_only_tx(); + auto* policy = tx.ro(ccf::Tables::SHARD_POLICY); + + auto info = policy->get(); + REQUIRE(info.has_value()); + REQUIRE(info->auto_seal_after_seqno_count == 1000); + REQUIRE(info->auto_seal_after_duration_s == 3600); + REQUIRE(info->max_active_shard_memory_mb == 512); + } + + INFO("Update policy"); + { + auto tx = network.tables->create_tx(); + auto* policy = tx.rw(ccf::Tables::SHARD_POLICY); + + ccf::ShardPolicyInfo new_info; + new_info.auto_seal_after_seqno_count = 5000; + new_info.auto_seal_after_duration_s = 0; + new_info.max_active_shard_memory_mb = 0; + policy->put(new_info); + + REQUIRE(tx.commit() == ccf::kv::CommitResult::SUCCESS); + } + + { + auto tx = network.tables->create_read_only_tx(); + auto* policy = tx.ro(ccf::Tables::SHARD_POLICY); + auto info = policy->get(); + REQUIRE(info.has_value()); + REQUIRE(info->auto_seal_after_seqno_count == 5000); + REQUIRE(info->auto_seal_after_duration_s == 0); + } +} + +TEST_CASE("ShardInfo JSON serialisation round-trip") +{ + ccf::ShardInfo info; + info.shard_id = 42; + info.seqno_start = 100; + info.seqno_end = 200; + info.status = ccf::ShardStatus::Sealed; + info.snapshot_seqno = 200; + + auto j = nlohmann::json(info); + auto info2 = j.get(); + + REQUIRE(info2.shard_id == 42); + REQUIRE(info2.seqno_start == 100); + REQUIRE(info2.seqno_end == 200); + REQUIRE(info2.status == ccf::ShardStatus::Sealed); + REQUIRE(info2.snapshot_seqno == 200); +} + +TEST_CASE("ShardPolicyInfo JSON serialisation round-trip") +{ + ccf::ShardPolicyInfo policy; + policy.auto_seal_after_seqno_count = 10000; + policy.auto_seal_after_duration_s = 7200; + policy.max_active_shard_memory_mb = 1024; + + auto j = nlohmann::json(policy); + auto policy2 = j.get(); + + REQUIRE(policy2.auto_seal_after_seqno_count == 10000); + REQUIRE(policy2.auto_seal_after_duration_s == 7200); + REQUIRE(policy2.max_active_shard_memory_mb == 1024); +} + +TEST_CASE("Snapshotter shard seal marking") +{ + ccf::logger::config::default_init(); + + ccf::NetworkState network; + + auto consensus = std::make_shared(); + auto node_kp = ccf::crypto::make_ec_key_pair(); + auto history = std::make_shared( + *network.tables.get(), ccf::kv::test::PrimaryNodeId, *node_kp); + network.tables->set_history(history); + network.tables->initialise_term(2); + network.tables->set_consensus(consensus); + auto encryptor = std::make_shared(); + network.tables->set_encryptor(encryptor); + + constexpr auto buffer_size = 1024 * 16; + auto in_buffer = std::make_unique(buffer_size); + auto out_buffer = std::make_unique(buffer_size); + ringbuffer::Circuit eio(in_buffer->bd, out_buffer->bd); + + std::unique_ptr writer_factory = + std::make_unique(eio); + + size_t snapshot_tx_interval = 10; + auto snapshotter = std::make_shared( + *writer_factory, network.tables, snapshot_tx_interval); + + INFO("Mark snapshot as shard seal"); + { + snapshotter->mark_next_snapshot_as_shard_seal(0); + + // Before any commit, this should not yet be committed + REQUIRE_FALSE(snapshotter->is_shard_seal_snapshot_committed(0)); + } + + INFO("Shard seal completion callback is invoked"); + { + std::optional callback_shard_id = std::nullopt; + snapshotter->set_on_shard_seal_committed( + [&callback_shard_id](uint64_t shard_id) { + callback_shard_id = shard_id; + }); + + // Verify callback is not yet called + REQUIRE_FALSE(callback_shard_id.has_value()); + + // The callback is invoked inside update_indices() when a shard-seal + // snapshot is committed — here we just verify the setter works and + // the callback pointer is stored + snapshotter->mark_next_snapshot_as_shard_seal(7); + REQUIRE_FALSE(callback_shard_id.has_value()); + } +} + +int main(int argc, char** argv) +{ + ccf::logger::config::default_init(); + + doctest::Context context; + context.applyCommandLine(argc, argv); + int res = context.run(); + if (context.shouldExit()) + return res; + return res; +} diff --git a/src/service/network_tables.h b/src/service/network_tables.h index 32a0d80772df..09de73f3b5b4 100644 --- a/src/service/network_tables.h +++ b/src/service/network_tables.h @@ -23,6 +23,7 @@ #include "ccf/service/tables/virtual_measurements.h" #include "kv/store.h" #include "tables/config.h" +#include "tables/shards.h" #include "tables/governance_history.h" #include "tables/previous_service_identity.h" #include "tables/secrets.h" @@ -185,10 +186,16 @@ namespace ccf const Configuration config = {Tables::CONFIGURATION}; const Constitution constitution = {Tables::CONSTITUTION}; + // + // Sharding tables + // + const Shards shards = {Tables::SHARDS}; + const ShardPolicy shard_policy = {Tables::SHARD_POLICY}; + [[nodiscard]] auto get_all_service_tables() const { return std::make_tuple( - service, config, constitution, previous_service_identity); + service, config, constitution, previous_service_identity, shards); } // All builtin governance tables should be included here, so that wrapper diff --git a/src/service/tables/shards.h b/src/service/tables/shards.h new file mode 100644 index 000000000000..e52ab1fd2c2b --- /dev/null +++ b/src/service/tables/shards.h @@ -0,0 +1,71 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache 2.0 License. +#pragma once + +#include "ccf/crypto/sha256_hash.h" +#include "ccf/ds/json.h" +#include "ccf/service/map.h" +#include "kv/kv_types.h" + +#include + +namespace ccf +{ + enum class ShardStatus : uint8_t + { + Active = 0, + Sealing = 1, + Sealed = 2 + }; + + DECLARE_JSON_ENUM( + ShardStatus, + {{ShardStatus::Active, "Active"}, + {ShardStatus::Sealing, "Sealing"}, + {ShardStatus::Sealed, "Sealed"}}); + + struct ShardInfo + { + uint64_t shard_id = 0; + ccf::kv::Version seqno_start = 0; + ccf::kv::Version seqno_end = 0; + ccf::kv::Version snapshot_seqno = 0; + ccf::crypto::Sha256Hash merkle_root_at_seal = {}; + ccf::kv::Version ledger_secret_version = 0; + ShardStatus status = ShardStatus::Active; + }; + + DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(ShardInfo); + DECLARE_JSON_REQUIRED_FIELDS( + ShardInfo, shard_id, seqno_start, seqno_end, status); + DECLARE_JSON_OPTIONAL_FIELDS( + ShardInfo, + snapshot_seqno, + merkle_root_at_seal, + ledger_secret_version); + + using Shards = ServiceMap; + + struct ShardPolicyInfo + { + size_t auto_seal_after_seqno_count = 0; + size_t auto_seal_after_duration_s = 0; + size_t max_active_shard_memory_mb = 0; + }; + + DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(ShardPolicyInfo); + DECLARE_JSON_REQUIRED_FIELDS(ShardPolicyInfo); + DECLARE_JSON_OPTIONAL_FIELDS( + ShardPolicyInfo, + auto_seal_after_seqno_count, + auto_seal_after_duration_s, + max_active_shard_memory_mb); + + using ShardPolicy = ServiceValue; + + namespace Tables + { + static constexpr auto SHARDS = "public:ccf.gov.shards.info"; + static constexpr auto SHARD_POLICY = "public:ccf.gov.shards.policy"; + } +} diff --git a/tests/config.jinja b/tests/config.jinja index d6a55eed8afb..9c8d562399aa 100644 --- a/tests/config.jinja +++ b/tests/config.jinja @@ -136,5 +136,11 @@ {% endif %} }{% endif %}{% if node_to_node_message_limit %}, "node_to_node_message_limit": {{ node_to_node_message_limit|tojson }}{% endif %}{% if historical_cache_soft_limit %}, - "historical_cache_soft_limit": {{ historical_cache_soft_limit|tojson }}{% endif %} + "historical_cache_soft_limit": {{ historical_cache_soft_limit|tojson }}{% endif %}{% if sharding_enabled %}, + "sharding": { + "enabled": {{ sharding_enabled|tojson }}{% if sharding_auto_seal_after_seqno_count %}, + "auto_seal_after_seqno_count": {{ sharding_auto_seal_after_seqno_count }}{% endif %}{% if sharding_auto_seal_after_duration_s %}, + "auto_seal_after_duration_s": {{ sharding_auto_seal_after_duration_s }}{% endif %}{% if sharding_max_active_shard_memory_mb %}, + "max_active_shard_memory_mb": {{ sharding_max_active_shard_memory_mb }}{% endif %} + }{% endif %} } \ No newline at end of file diff --git a/tests/infra/consortium.py b/tests/infra/consortium.py index 1e5cbca3e6ca..64e76b9c0285 100644 --- a/tests/infra/consortium.py +++ b/tests/infra/consortium.py @@ -414,6 +414,27 @@ def trigger_recovery_shares_refresh(self, remote_node): proposal = self.get_any_active_member().propose(remote_node, proposal_body) return self.vote_using_majority(remote_node, proposal, careful_vote) + def seal_current_shard(self, remote_node): + proposal_body, careful_vote = self.make_proposal("seal_current_shard") + proposal = self.get_any_active_member().propose(remote_node, proposal_body) + return self.vote_using_majority(remote_node, proposal, careful_vote) + + def set_shard_policy( + self, + remote_node, + auto_seal_after_seqno_count=0, + auto_seal_after_duration_s=0, + max_active_shard_memory_mb=0, + ): + proposal_body, careful_vote = self.make_proposal( + "set_shard_policy", + auto_seal_after_seqno_count=auto_seal_after_seqno_count, + auto_seal_after_duration_s=auto_seal_after_duration_s, + max_active_shard_memory_mb=max_active_shard_memory_mb, + ) + proposal = self.get_any_active_member().propose(remote_node, proposal_body) + return self.vote_using_majority(remote_node, proposal, careful_vote) + def user_cert_path(self, user_id): return os.path.join(self.common_dir, f"{user_id}_cert.pem") diff --git a/tests/infra/network.py b/tests/infra/network.py index b489a60ea5e8..06c4df76bd34 100644 --- a/tests/infra/network.py +++ b/tests/infra/network.py @@ -221,6 +221,10 @@ class Network: "subject_name", "idle_connection_timeout_s", "host_data_transparent_statement_path", + "sharding_enabled", + "sharding_auto_seal_after_seqno_count", + "sharding_auto_seal_after_duration_s", + "sharding_max_active_shard_memory_mb", ] # Maximum delay (seconds) for updates to propagate from the primary to backups diff --git a/tests/perf-system/submitter/submit.cpp b/tests/perf-system/submitter/submit.cpp index 3e609bb0fd5c..516889712d7a 100644 --- a/tests/perf-system/submitter/submit.cpp +++ b/tests/perf-system/submitter/submit.cpp @@ -40,17 +40,18 @@ void read_parquet_file(string generator_filepath, ParquetData& data_handler) // Open Parquet file reader std::unique_ptr arrow_reader; - st = parquet::arrow::OpenFile(input, pool, &arrow_reader); - if (!st.ok()) + auto arrow_reader_result = parquet::arrow::OpenFile(input, pool); + if (!arrow_reader_result.ok()) { LOG_FAIL_FMT( "Couldn't find generator file ({}): {}", generator_filepath, - st.ToString()); + arrow_reader_result.status().ToString()); exit(1); } else { + arrow_reader = std::move(arrow_reader_result).ValueOrDie(); LOG_INFO_FMT("Found generator file"); } diff --git a/tests/sharding.py b/tests/sharding.py new file mode 100644 index 000000000000..3801d06b58a3 --- /dev/null +++ b/tests/sharding.py @@ -0,0 +1,106 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the Apache 2.0 License. +import http +import infra.network +import infra.e2e_args +import suite.test_requirements as reqs +from infra.runner import ConcurrentRunner + +from loguru import logger as LOG + + +@reqs.description("Test seal_current_shard governance proposal") +def test_seal_current_shard(network, args): + primary, _ = network.find_nodes() + + LOG.info("Sealing the current shard via governance proposal") + network.consortium.seal_current_shard(primary) + + LOG.info("Verifying shard table was updated") + with primary.client("member0") as c: + r = c.get("/gov/kv/public:ccf.gov.shards.info") + if r.status_code == http.HTTPStatus.OK.value: + LOG.info(f"Shards table contents: {r.body.json()}") + + return network + + +@reqs.description("Test set_shard_policy governance proposal") +def test_set_shard_policy(network, args): + primary, _ = network.find_nodes() + + LOG.info("Setting shard policy via governance proposal") + network.consortium.set_shard_policy( + primary, + auto_seal_after_seqno_count=5000, + auto_seal_after_duration_s=3600, + max_active_shard_memory_mb=512, + ) + + LOG.info("Verifying shard policy table was updated") + with primary.client("member0") as c: + r = c.get("/gov/kv/public:ccf.gov.shards.policy") + if r.status_code == http.HTTPStatus.OK.value: + LOG.info(f"Shard policy: {r.body.json()}") + + return network + + +@reqs.description("Test full shard lifecycle: create, seal, verify") +def test_shard_lifecycle(network, args): + primary, _ = network.find_nodes() + + LOG.info("Step 1: Set shard policy") + network.consortium.set_shard_policy( + primary, + auto_seal_after_seqno_count=10000, + ) + + LOG.info("Step 2: Issue some transactions") + network.txs.issue(network, number_txs=10) + + LOG.info("Step 3: Seal the current shard") + network.consortium.seal_current_shard(primary) + + LOG.info("Step 4: Issue more transactions on the new shard") + network.txs.issue(network, number_txs=10) + + LOG.info("Step 5: Seal the second shard") + network.consortium.seal_current_shard(primary) + + LOG.info("Step 6: Verify final state") + with primary.client("member0") as c: + r = c.get("/gov/kv/public:ccf.gov.shards.info") + if r.status_code == http.HTTPStatus.OK.value: + LOG.info(f"Final shards state: {r.body.json()}") + + return network + + +def run(args): + with infra.network.network( + args.nodes, + args.binary_dir, + args.debug_nodes, + args.perf_nodes, + pdb=args.pdb, + ) as network: + network.start_and_open(args) + + test_set_shard_policy(network, args) + test_seal_current_shard(network, args) + test_shard_lifecycle(network, args) + + +if __name__ == "__main__": + cr = ConcurrentRunner() + + cr.add( + "sharding", + run, + package="samples/apps/logging/logging", + nodes=infra.e2e_args.min_nodes(cr.args, f=0), + sharding_enabled=True, + ) + + cr.run()