From 394f4f36809bd8e761057d0de2c5699681068812 Mon Sep 17 00:00:00 2001 From: yuwmao Date: Wed, 5 Mar 2025 17:04:27 +0800 Subject: [PATCH 1/2] Add metrics for baseline resync DonerSnapshotMetrics is in memory; ReceiverSnapshotMetrics persists the progress. --- src/lib/homestore_backend/hs_homeobject.hpp | 130 +++++++++++++++++- src/lib/homestore_backend/hs_pg_manager.cpp | 4 + .../homestore_backend/pg_blob_iterator.cpp | 28 +++- .../replication_state_machine.cpp | 7 +- src/lib/homestore_backend/resync_pg_data.fbs | 2 + .../homestore_backend/resync_shard_data.fbs | 1 + .../snapshot_receive_handler.cpp | 89 +++++++++++- .../tests/homeobj_fixture.hpp | 13 ++ .../tests/homeobj_misc_tests.cpp | 2 +- .../tests/hs_repl_test_helper.hpp | 34 +++++ .../tests/test_homestore_backend.cpp | 4 +- .../tests/test_homestore_backend_dynamic.cpp | 4 +- 12 files changed, 294 insertions(+), 24 deletions(-) diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index 101cfc011..2c3044f60 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -170,11 +170,54 @@ class HSHomeObject : public HomeObjectImpl { char data[1]; }; + struct durable_snapshot_progress { + uint64_t start_time{0}; + uint64_t total_blobs{0}; + uint64_t total_bytes{0}; + uint64_t total_shards{0}; + uint64_t complete_blobs{0}; + uint64_t complete_bytes{0}; + uint64_t complete_shards{0}; + uint64_t corrupted_blobs{0}; + }; + + struct snapshot_progress { + uint64_t start_time{0}; + uint64_t total_blobs{0}; + uint64_t total_bytes{0}; + uint64_t total_shards{0}; + uint64_t complete_blobs{0}; + uint64_t complete_bytes{0}; + uint64_t complete_shards{0}; + // The count of the blobs which have been corrupted on the leader side. + uint64_t corrupted_blobs{0}; + // Record the stats of the current batch to avoid double counting. + uint64_t cur_shard_total_blobs{0}; + uint64_t cur_shard_complete_blobs{0}; + // Used to handle the retried batch message. + uint64_t cur_batch_blobs{0}; + uint64_t cur_batch_bytes{0}; + uint64_t error_count{0}; + + snapshot_progress() = default; + explicit snapshot_progress(durable_snapshot_progress p) { + start_time = p.start_time; + total_blobs = p.total_blobs; + total_bytes = p.total_bytes; + total_shards = p.total_shards; + complete_blobs = p.complete_blobs; + complete_bytes = p.complete_bytes; + complete_shards = p.complete_shards; + corrupted_blobs = p.corrupted_blobs; + } + }; + // Since shard list can be quite large and only need to be persisted once, we store it in a separate superblk struct snapshot_rcvr_info_superblk { shard_id_t shard_cursor; int64_t snp_lsn; pg_id_t pg_id; + durable_snapshot_progress progress; uint32_t size() const { return sizeof(snapshot_rcvr_info_superblk); } static auto name() -> string { return _snp_rcvr_meta_name; } @@ -286,6 +329,11 @@ class HSHomeObject : public HomeObjectImpl { * Returns the number of open shards on this PG. */ uint32_t open_shards() const; + + /** + * Returns the progress of the baseline resync. + */ + uint32_t get_snp_progress() const; }; struct HS_Shard : public Shard { @@ -401,6 +449,25 @@ class HSHomeObject : public HomeObjectImpl { void pack_resync_message(sisl::io_blob_safe& dest_blob, SyncMessageType type); bool end_of_scan() const; + // All of the leader's metrics are in-memory + struct DonerSnapshotMetrics : sisl::MetricsGroup { + explicit DonerSnapshotMetrics(pg_id_t pg_id) : sisl::MetricsGroup("snapshot_doner", std::to_string(pg_id)) { + REGISTER_COUNTER(snp_dnr_load_blob, "Loaded blobs in baseline resync"); + REGISTER_COUNTER(snp_dnr_load_bytes, "Loaded bytes in baseline resync"); + REGISTER_COUNTER(snp_dnr_resend_count, "Mesg resend times in baseline resync"); + REGISTER_COUNTER(snp_dnr_error_count, "Error times when reading blobs in baseline resync"); + REGISTER_HISTOGRAM(snp_dnr_blob_process_time, "Time cost of successfully process a blob in baseline resync", + HistogramBucketsType(DefaultBuckets)); + register_me_to_farm(); + } + + ~DonerSnapshotMetrics() { deregister_me_from_farm(); } + DonerSnapshotMetrics(const DonerSnapshotMetrics&) = delete; + DonerSnapshotMetrics(DonerSnapshotMetrics&&) noexcept = delete; + DonerSnapshotMetrics& operator=(const DonerSnapshotMetrics&) = delete; + DonerSnapshotMetrics& operator=(DonerSnapshotMetrics&&) noexcept = delete; + }; + struct ShardEntry { ShardInfo info; homestore::chunk_num_t v_chunk_num; @@ -421,6 +488,7 @@ class HSHomeObject : public HomeObjectImpl { pg_id_t pg_id_; shared< homestore::ReplDev > repl_dev_; uint64_t max_batch_size_; + std::unique_ptr metrics_; }; class SnapshotReceiveHandler { @@ -449,11 +517,11 @@ class HSHomeObject : public HomeObjectImpl { pg_id_t get_context_pg_id() const; // Try to load existing snapshot context info - bool load_prev_context(); + bool load_prev_context_and_metrics(); // Reset the context for a new snapshot, should be called before each new snapshot transmission - void reset_context(int64_t lsn, pg_id_t pg_id); - void destroy_context(); + void reset_context_and_metrics(int64_t lsn, pg_id_t pg_id); + void destroy_context_and_metrics(); shard_id_t get_shard_cursor() const; shard_id_t get_next_shard() const; @@ -467,14 +535,66 @@ class HSHomeObject : public HomeObjectImpl { const int64_t snp_lsn; const pg_id_t pg_id; shared< BlobIndexTable > index_table; - + std::shared_mutex progress_lock; + snapshot_progress progress; SnapshotContext(int64_t lsn, pg_id_t pg_id) : snp_lsn{lsn}, pg_id{pg_id} {} }; + struct ReceiverSnapshotMetrics: sisl::MetricsGroup { + ReceiverSnapshotMetrics(std::shared_ptr ctx) : sisl::MetricsGroup("snapshot_receiver", std::to_string(ctx->pg_id)), + ctx_{ctx} { + REGISTER_GAUGE(snp_rcvr_total_blob, "Total blobs in baseline resync"); + REGISTER_GAUGE(snp_rcvr_total_bytes, "Total bytes in baseline resync") + REGISTER_GAUGE(snp_rcvr_total_shards, "Total shards in baseline resync") + REGISTER_GAUGE(snp_rcvr_complete_blob, "Complete blob in baseline resync") + REGISTER_GAUGE(snp_rcvr_complete_bytes, "Complete bytes in baseline resync") + REGISTER_GAUGE(snp_rcvr_complete_shards, "Complete shards in baseline resync") + REGISTER_GAUGE(snp_rcvr_current_shard_total_blobs, + "Total blob of the current shard in baseline resync") + REGISTER_GAUGE(snp_rcvr_current_shard_complete_blobs, + "Compelete blob of the current blob in baseline resync"); + REGISTER_GAUGE(snp_rcvr_corrupted_blobs, "Corrupted blobs in baseline resync"); + REGISTER_GAUGE(snp_rcvr_elapsed_time_sec, "Time cost(seconds) of baseline resync"); + REGISTER_GAUGE(snp_rcvr_error_count, "Error count in baseline resync"); + REGISTER_HISTOGRAM(snp_rcvr_blob_process_time, "Time cost of successfully process a blob in baseline resync", + HistogramBucketsType(DefaultBuckets)); + + + attach_gather_cb(std::bind(&ReceiverSnapshotMetrics::on_gather, this)); + register_me_to_farm(); + } + ~ReceiverSnapshotMetrics() { deregister_me_from_farm(); } + ReceiverSnapshotMetrics(const ReceiverSnapshotMetrics&) = delete; + ReceiverSnapshotMetrics(ReceiverSnapshotMetrics&&) noexcept = delete; + ReceiverSnapshotMetrics& operator=(const ReceiverSnapshotMetrics&) = delete; + ReceiverSnapshotMetrics& operator=(ReceiverSnapshotMetrics&&) noexcept = delete; + + void on_gather() { + if (ctx_) { + std::shared_lock lock(ctx_->progress_lock); + GAUGE_UPDATE(*this, snp_rcvr_total_blob, ctx_->progress.total_blobs); + GAUGE_UPDATE(*this, snp_rcvr_total_bytes, ctx_->progress.total_bytes); + GAUGE_UPDATE(*this, snp_rcvr_total_shards, ctx_->progress.total_shards); + GAUGE_UPDATE(*this, snp_rcvr_complete_blob, ctx_->progress.complete_blobs); + GAUGE_UPDATE(*this, snp_rcvr_complete_bytes, ctx_->progress.complete_bytes); + GAUGE_UPDATE(*this, snp_rcvr_complete_shards, ctx_->progress.complete_shards); + GAUGE_UPDATE(*this, snp_rcvr_current_shard_total_blobs, ctx_->progress.cur_shard_total_blobs); + GAUGE_UPDATE(*this, snp_rcvr_current_shard_complete_blobs, ctx_->progress.cur_shard_complete_blobs); + GAUGE_UPDATE(*this, snp_rcvr_corrupted_blobs, ctx_->progress.corrupted_blobs); + GAUGE_UPDATE(*this, snp_rcvr_error_count, ctx_->progress.error_count); + auto duration = get_elapsed_time_ms(ctx_->progress.start_time * 1000) / 1000; + GAUGE_UPDATE(*this, snp_rcvr_elapsed_time_sec, duration); + } + } + private: + std::shared_ptr ctx_; + }; + HSHomeObject& home_obj_; const shared< homestore::ReplDev > repl_dev_; - std::unique_ptr< SnapshotContext > ctx_; + std::shared_ptr< SnapshotContext > ctx_; + std::unique_ptr< ReceiverSnapshotMetrics > metrics_; // Update the snp_info superblock void update_snp_info_sb(bool init = false); diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index 3fa97a3e4..4ab662a02 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -578,6 +578,10 @@ uint32_t HSHomeObject::HS_PG::open_shards() const { return std::count_if(shards_.begin(), shards_.end(), [](auto const& s) { return s->is_open(); }); } +uint32_t HSHomeObject::HS_PG::get_snp_progress() const { + return snp_rcvr_info_sb_->progress.complete_bytes / snp_rcvr_info_sb_->progress.total_bytes; +} + // NOTE: caller should hold the _pg_lock const HSHomeObject::HS_PG* HSHomeObject::_get_hs_pg_unlocked(pg_id_t pg_id) const { auto iter = _pg_map.find(pg_id); diff --git a/src/lib/homestore_backend/pg_blob_iterator.cpp b/src/lib/homestore_backend/pg_blob_iterator.cpp index 027b79fbe..6b4af2107 100644 --- a/src/lib/homestore_backend/pg_blob_iterator.cpp +++ b/src/lib/homestore_backend/pg_blob_iterator.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include "generated/resync_blob_data_generated.h" #include "generated/resync_pg_data_generated.h" #include "generated/resync_shard_data_generated.h" @@ -16,6 +17,7 @@ HSHomeObject::PGBlobIterator::PGBlobIterator(HSHomeObject& home_obj, homestore:: auto pg = get_pg_metadata(); pg_id_ = pg->pg_info_.id; repl_dev_ = static_cast< HS_PG* >(pg)->repl_dev_; + metrics_ = make_unique< DonerSnapshotMetrics >(pg_id_); max_batch_size_ = HS_BACKEND_DYNAMIC_CONFIG(max_snapshot_batch_size_mb) * Mi; if (max_batch_size_ == 0) { max_batch_size_ = DEFAULT_MAX_BATCH_SIZE_MB * Mi; } @@ -38,7 +40,11 @@ HSHomeObject::PGBlobIterator::PGBlobIterator(HSHomeObject& home_obj, homestore:: bool HSHomeObject::PGBlobIterator::update_cursor(objId id) { if (id.value == LAST_OBJ_ID) { return true; } //resend batch - if (id.value == cur_obj_id_.value) { return true; } + if (id.value == cur_obj_id_.value) { + LOGT("resend the same batch, objId:{}, cur_obj_id:{}", id.to_string(), cur_obj_id_.to_string()); + COUNTER_INCREMENT(*metrics_, snp_dnr_resend_count, 1); + return true; + } // If cur_obj_id_ == 0|0 (PG meta), this may be a request for resuming from specific shard if (cur_obj_id_.shard_seq_num == 0 && id.shard_seq_num != 0 && id.batch_id == 0) { @@ -122,12 +128,15 @@ bool HSHomeObject::PGBlobIterator::create_pg_snapshot_data(sisl::io_blob_safe& m members.push_back(CreateMemberDirect(builder_, &id, member.name.c_str(), member.priority)); } std::vector< uint64_t > shard_ids; + auto total_bytes = pg->durable_entities().total_occupied_blk_count.load(std::memory_order_relaxed) * repl_dev_-> + get_blk_size(); + auto total_blobs = pg->durable_entities().active_blob_count.load(std::memory_order_relaxed); for (auto& shard : shard_list_) { shard_ids.push_back(shard.info.id); } auto pg_entry = CreateResyncPGMetaDataDirect(builder_, pg_info.id, &uuid, pg_info.size, pg_info.chunk_size, pg->durable_entities().blob_sequence_num, - pg->shard_sequence_num_, &members, &shard_ids); + pg->shard_sequence_num_, &members, &shard_ids, total_blobs, total_bytes); builder_.FinishSizePrefixed(pg_entry); pack_resync_message(meta_blob, SyncMessageType::PG_META); @@ -151,7 +160,8 @@ bool HSHomeObject::PGBlobIterator::create_shard_snapshot_data(sisl::io_blob_safe auto shard = shard_list_[cur_shard_idx_]; auto shard_entry = CreateResyncShardMetaData( builder_, shard.info.id, pg_id_, static_cast< uint8_t >(shard.info.state), shard.info.lsn, - shard.info.created_time, shard.info.last_modified_time, shard.info.total_capacity_bytes, shard.v_chunk_num); + shard.info.created_time, shard.info.last_modified_time, shard.info.total_capacity_bytes, cur_blob_list_.size(), + shard.v_chunk_num); builder_.FinishSizePrefixed(shard_entry); @@ -183,7 +193,7 @@ BlobManager::AsyncResult< sisl::io_blob_safe > HSHomeObject::PGBlobIterator::loa BlobHeader const* header = r_cast< BlobHeader const* >(read_buf.cbytes()); if (!header->valid()) { - //TODO add metrics + //The metrics for corrupted blob is handled on the follower side. LOGE("Invalid header found, shard_id={}, blob_id={}, [header={}]", shard_id, blob_id, header->to_string()); state = ResyncBlobState::CORRUPTED; @@ -191,7 +201,7 @@ BlobManager::AsyncResult< sisl::io_blob_safe > HSHomeObject::PGBlobIterator::loa } if (header->shard_id != shard_id) { - //TODO add metrics + //The metrics for corrupted blob is handled on the follower side. LOGE("Invalid shard_id in header, shard_id={}, blob_id={}, [header={}]", shard_id, blob_id, header->to_string()); state = ResyncBlobState::CORRUPTED; @@ -240,6 +250,8 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe continue; } + auto start = Clock::now(); + #ifdef _PRERELEASE if (iomgr_flip::instance()->test_flip("pg_blob_iterator_load_blob_data_error")) { LOGW("Simulating loading blob data error"); @@ -269,10 +281,12 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe if (blob.size() == 0) { LOGE("Failed to retrieve blob for shard={} blob={} pbas={}", info.shard_id, info.blob_id, info.pbas.to_string()); - //TODO add metrics + COUNTER_INCREMENT(*metrics_, snp_dnr_error_count, 1); return false; } + auto duration = get_elapsed_time_us(start); + HISTOGRAM_OBSERVE(*metrics_, snp_dnr_blob_process_time, duration); std::vector< uint8_t > data(blob.cbytes(), blob.cbytes() + blob.size()); blob_entries.push_back(CreateResyncBlobDataDirect(builder_, info.blob_id, (uint8_t)state, &data)); total_bytes += blob.size(); @@ -286,6 +300,8 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe LOGD("create blobs snapshot data batch: shard_seq_num={}, batch_num={}, total_bytes={}, blob_num={}, end_of_shard={}", cur_obj_id_.shard_seq_num, cur_obj_id_.batch_id, total_bytes, blob_entries.size(), end_of_shard); + COUNTER_INCREMENT(*metrics_, snp_dnr_load_blob, blob_entries.size()); + COUNTER_INCREMENT(*metrics_, snp_dnr_load_bytes, total_bytes); pack_resync_message(data_blob, SyncMessageType::SHARD_BATCH); return true; } diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index d944e122b..ec3f1c791 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -226,7 +226,7 @@ bool ReplicationStateMachine::apply_snapshot(std::shared_ptr< homestore::snapsho std::this_thread::sleep_for(std::chrono::milliseconds(delay.get())); } #endif - m_snp_rcv_handler->destroy_context(); + m_snp_rcv_handler->destroy_context_and_metrics(); std::lock_guard lk(m_snapshot_lock); set_snapshot_context(context); @@ -321,7 +321,7 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna auto r_dev = repl_dev(); if (!m_snp_rcv_handler) { m_snp_rcv_handler = std::make_unique< HSHomeObject::SnapshotReceiveHandler >(*home_object_, r_dev); - if (m_snp_rcv_handler->load_prev_context()) { + if (m_snp_rcv_handler->load_prev_context_and_metrics()) { LOGI("Reloaded previous snapshot context, lsn:{} pg_id:{} next_shard:{}", context->get_lsn(), m_snp_rcv_handler->get_context_pg_id(), m_snp_rcv_handler->get_next_shard()); } @@ -330,6 +330,7 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna auto obj_id = objId(snp_obj->offset); auto log_suffix = fmt::format("group={} lsn={} shard={} batch_num={} size={}", uuids::to_string(r_dev->group_id()), context->get_lsn(), obj_id.shard_seq_num, obj_id.batch_id, snp_obj->blob.size()); + LOGI("received snapshot obj, {}", log_suffix); if (snp_obj->is_last_obj) { LOGD("Write snapshot reached is_last_obj true {}", log_suffix); @@ -382,7 +383,7 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna home_object_->pg_destroy(pg_data->pg_id()); } LOGI("reset context from lsn:{} to lsn:{}", m_snp_rcv_handler->get_context_lsn(), context->get_lsn()); - m_snp_rcv_handler->reset_context(context->get_lsn(), pg_data->pg_id()); + m_snp_rcv_handler->reset_context_and_metrics(context->get_lsn(), pg_data->pg_id()); auto ret = m_snp_rcv_handler->process_pg_snapshot_data(*pg_data); if (ret) { diff --git a/src/lib/homestore_backend/resync_pg_data.fbs b/src/lib/homestore_backend/resync_pg_data.fbs index bf7d05984..33c7ae8a2 100644 --- a/src/lib/homestore_backend/resync_pg_data.fbs +++ b/src/lib/homestore_backend/resync_pg_data.fbs @@ -17,6 +17,8 @@ table ResyncPGMetaData { shard_seq_num: uint64; // shard sequence number, used to assign next shard id; members : [Member]; // peers; shard_ids : [uint64]; // shard ids to transmit; + total_blobs_to_transfer : uint64; // total used bytes of the pg + total_bytes_to_transfer : uint64; // total capacity of the pg } // ResyncPGMetaData schema is the first message(ObjID=0) in the resync data stream diff --git a/src/lib/homestore_backend/resync_shard_data.fbs b/src/lib/homestore_backend/resync_shard_data.fbs index 5f4123f32..e23e5f5f6 100644 --- a/src/lib/homestore_backend/resync_shard_data.fbs +++ b/src/lib/homestore_backend/resync_shard_data.fbs @@ -10,6 +10,7 @@ table ResyncShardMetaData { created_time : uint64; // shard creation time last_modified_time : ulong; // shard last modify time total_capacity_bytes : ulong; // total capacity of the shard + total_active_blobs : ulong; // total blobs in the shard vchunk_id : uint16; // vchunk id } diff --git a/src/lib/homestore_backend/snapshot_receive_handler.cpp b/src/lib/homestore_backend/snapshot_receive_handler.cpp index f33ae1f73..79a4868ef 100644 --- a/src/lib/homestore_backend/snapshot_receive_handler.cpp +++ b/src/lib/homestore_backend/snapshot_receive_handler.cpp @@ -5,6 +5,7 @@ #include #include +#include namespace homeobject { HSHomeObject::SnapshotReceiveHandler::SnapshotReceiveHandler(HSHomeObject& home_obj, @@ -54,6 +55,13 @@ int HSHomeObject::SnapshotReceiveHandler::process_pg_snapshot_data(ResyncPGMetaD hs_pg->durable_entities_update( [&pg_meta](auto& de) { de.blob_sequence_num.store(pg_meta.blob_seq_num(), std::memory_order_relaxed); }); + // update metrics + std::unique_lock lock(mutex); + ctx_->progress.start_time = std::chrono::duration_cast< std::chrono::seconds >( + std::chrono::system_clock::now().time_since_epoch()).count(); + ctx_->progress.total_shards = ctx_->shard_list.size(); + ctx_->progress.total_blobs = pg_meta.total_blobs_to_transfer(); + ctx_->progress.total_bytes = pg_meta.total_bytes_to_transfer(); // No need to persist snp info superblock since it's almost meaningless to resume from this point. return 0; } @@ -123,12 +131,24 @@ int HSHomeObject::SnapshotReceiveHandler::process_shard_snapshot_data(ResyncShar home_obj_.local_create_shard(shard_sb->info, shard_sb->v_chunk_id, shard_sb->p_chunk_id, blk_id.blk_count()); ctx_->shard_cursor = shard_meta.shard_id(); ctx_->cur_batch_num = 0; + std::unique_lock lock(mutex); + ctx_->progress.cur_shard_total_blobs = shard_meta.total_active_blobs(); + ctx_->progress.cur_shard_complete_blobs = 0; return 0; } int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs, const snp_batch_id_t batch_num, bool is_last_batch) { + //retry mesg, need to handle duplicate batch, reset progress + if (ctx_->cur_batch_num == batch_num) { + std::unique_lock lock(mutex); + ctx_->progress.cur_shard_complete_blobs -= ctx_->progress.cur_batch_blobs; + ctx_->progress.complete_blobs -= ctx_->progress.cur_batch_blobs; + ctx_->progress.complete_bytes -= ctx_->progress.cur_batch_bytes; + ctx_->progress.cur_batch_blobs = 0; + ctx_->progress.cur_batch_bytes = 0; + } ctx_->cur_batch_num = batch_num; // Find physical chunk id for current shard @@ -138,6 +158,7 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob homestore::blk_alloc_hints hints; hints.chunk_id_hint = *p_chunk_id; + uint64_t total_bytes = 0; for (unsigned int i = 0; i < data_blobs.blob_list()->size(); i++) { const auto blob = data_blobs.blob_list()->Get(i); @@ -147,6 +168,8 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob continue; } + auto start = Clock::now(); + #ifdef _PRERELEASE auto delay = iomgr_flip::instance()->get_test_flip< long >("simulate_write_snapshot_save_blob_delay", static_cast< long >(blob->blob_id())); @@ -175,6 +198,8 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob if (blob->state() != static_cast< uint8_t >(ResyncBlobState::CORRUPTED)) { auto header = r_cast< BlobHeader const* >(blob_data); if (!header->valid()) { + std::unique_lock lock(mutex); + ctx_->progress.error_count++; LOGE("Invalid header found for blob_id {}: [header={}]", blob->blob_id(), header->to_string()); return INVALID_BLOB_HEADER; } @@ -189,8 +214,14 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob if (std::memcmp(computed_hash, header->hash, BlobHeader::blob_max_hash_len) != 0) { LOGE("Hash mismatch for blob_id {}: header [{}] [computed={:np}]", blob->blob_id(), header->to_string(), spdlog::to_hex(computed_hash, computed_hash + BlobHeader::blob_max_hash_len)); + std::unique_lock lock(mutex); + ctx_->progress.error_count++; return BLOB_DATA_CORRUPTED; } + } else { + LOGW("find corrupted_blobs: {} in shard {}", blob->blob_id(), ctx_->shard_cursor); + std::unique_lock lock(mutex); + ctx_->progress.corrupted_blobs++; } // Alloc & persist blob data @@ -203,6 +234,8 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob sisl::round_up(aligned_buf.size(), homestore::data_service().get_blk_size()), hints, blk_id); if (status != homestore::BlkAllocStatus::SUCCESS) { LOGE("Failed to allocate blocks for shard {} blob {}", ctx_->shard_cursor, blob->blob_id()); + std::unique_lock lock(mutex); + ctx_->progress.error_count++; return ALLOC_BLK_ERR; } @@ -216,6 +249,8 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob #ifdef _PRERELEASE if (iomgr_flip::instance()->test_flip("snapshot_receiver_blob_write_data_error")) { LOGW("Simulating blob snapshot write data error"); + std::unique_lock lock(mutex); + ctx_->progress.error_count++; free_allocated_blks(); return WRITE_DATA_ERR; } @@ -235,10 +270,14 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob if (ret.hasError()) { LOGE("Failed to write blob info of blob_id {} to blk_id:{}", blob->blob_id(), blk_id.to_string()); free_allocated_blks(); + std::unique_lock lock(mutex); + ctx_->progress.error_count++; return WRITE_DATA_ERR; } if (homestore::data_service().commit_blk(blk_id) != homestore::BlkAllocStatus::SUCCESS) { LOGE("Failed to commit blk_id:{} for blob_id: {}", blk_id.to_string(), blob->blob_id()); + std::unique_lock lock(mutex); + ctx_->progress.error_count++; free_allocated_blks(); return COMMIT_BLK_ERR; } @@ -248,9 +287,25 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob home_obj_.local_add_blob_info(ctx_->pg_id, BlobInfo{ctx_->shard_cursor, blob->blob_id(), blk_id}); if (!success) { LOGE("Failed to add blob info for blob_id:{}", blob->blob_id()); + std::unique_lock lock(mutex); + ctx_->progress.error_count++; free_allocated_blks(); return ADD_BLOB_INDEX_ERR; } + total_bytes += data_size; + auto duration = get_elapsed_time_us(start); + HISTOGRAM_OBSERVE(*metrics_, snp_rcvr_blob_process_time, duration); + LOGD("Persisted blob_id:{} in {}us", blob->blob_id(), duration); + } + + //update metrics + { + std::unique_lock lock(mutex); + ctx_->progress.cur_batch_blobs = data_blobs.blob_list()->size(); + ctx_->progress.cur_batch_bytes = total_bytes; + ctx_->progress.cur_shard_complete_blobs += ctx_->progress.cur_batch_blobs; + ctx_->progress.complete_blobs += ctx_->progress.cur_batch_blobs; + ctx_->progress.complete_bytes += ctx_->progress.cur_batch_bytes; } if (is_last_batch) { @@ -264,7 +319,10 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob if (state == ShardInfo::State::SEALED) { home_obj_.chunk_selector()->release_chunk(ctx_->pg_id, v_chunk_id.value()); } - + { + std::unique_lock lock(mutex); + ctx_->progress.complete_shards++; + } // We only update the snp info superblk on completion of each shard, since resumption is also shard-level update_snp_info_sb(ctx_->shard_cursor == ctx_->shard_list.front()); } @@ -275,7 +333,7 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob int64_t HSHomeObject::SnapshotReceiveHandler::get_context_lsn() const { return ctx_ ? ctx_->snp_lsn : -1; } pg_id_t HSHomeObject::SnapshotReceiveHandler::get_context_pg_id() const { return ctx_ ? ctx_->pg_id : 0; } -bool HSHomeObject::SnapshotReceiveHandler::load_prev_context() { +bool HSHomeObject::SnapshotReceiveHandler::load_prev_context_and_metrics() { HS_PG* hs_pg = nullptr; { std::shared_lock lck(home_obj_._pg_lock); @@ -294,23 +352,27 @@ bool HSHomeObject::SnapshotReceiveHandler::load_prev_context() { hs_pg->snp_rcvr_shard_list_sb_->pg_id == hs_pg->pg_sb_->id, "PG id in snp_info sb not matching with PG sb"); - ctx_ = std::make_unique< SnapshotContext >(hs_pg->snp_rcvr_info_sb_->snp_lsn, hs_pg->snp_rcvr_info_sb_->pg_id); + ctx_ = std::make_shared< SnapshotContext >(hs_pg->snp_rcvr_info_sb_->snp_lsn, hs_pg->snp_rcvr_info_sb_->pg_id); ctx_->shard_cursor = hs_pg->snp_rcvr_info_sb_->shard_cursor; ctx_->cur_batch_num = 0; // Always resume from the beginning of the shard ctx_->index_table = hs_pg->index_table_; ctx_->shard_list = hs_pg->snp_rcvr_shard_list_sb_->get_shard_list(); + ctx_->progress = snapshot_progress(hs_pg->snp_rcvr_info_sb_->progress); + metrics_ = std::make_unique< ReceiverSnapshotMetrics >(ctx_); LOGINFO("Resuming snapshot receiver context from lsn:{} pg_id:{} shard_cursor:{}", ctx_->snp_lsn, hs_pg->snp_rcvr_info_sb_->pg_id, ctx_->shard_cursor); return true; } -void HSHomeObject::SnapshotReceiveHandler::reset_context(int64_t lsn, pg_id_t pg_id) { - if (ctx_ != nullptr) { destroy_context(); } - ctx_ = std::make_unique< SnapshotContext >(lsn, pg_id); +void HSHomeObject::SnapshotReceiveHandler::reset_context_and_metrics(int64_t lsn, pg_id_t pg_id) { + if (ctx_ != nullptr) { destroy_context_and_metrics(); } + ctx_ = std::make_shared< SnapshotContext >(lsn, pg_id); + metrics_ = std::make_unique< ReceiverSnapshotMetrics >(ctx_); } -void HSHomeObject::SnapshotReceiveHandler::destroy_context() { +void HSHomeObject::SnapshotReceiveHandler::destroy_context_and_metrics() { + metrics_.reset(); auto hs_pg = home_obj_.get_hs_pg(ctx_->pg_id); if (hs_pg == nullptr) { return; } hs_pg->snp_rcvr_info_sb_.destroy(); @@ -356,6 +418,19 @@ void HSHomeObject::SnapshotReceiveHandler::update_snp_info_sb(bool init) { sb->snp_lsn = ctx_->snp_lsn; sb->pg_id = ctx_->pg_id; sb->shard_cursor = get_next_shard(); + + { + std::unique_lock lock(mutex); + sb->progress.start_time = ctx_->progress.start_time; + sb->progress.total_blobs = ctx_->progress.total_blobs; + sb->progress.total_bytes = ctx_->progress.total_bytes; + sb->progress.total_shards = ctx_->progress.total_shards; + sb->progress.complete_shards = ctx_->progress.complete_shards; + sb->progress.complete_bytes = ctx_->progress.complete_bytes; + sb->progress.complete_blobs = ctx_->progress.complete_blobs; + sb->progress.corrupted_blobs = ctx_->progress.corrupted_blobs; + } + hs_pg->snp_rcvr_info_sb_.write(); // Ensure all the superblk & corresponding index/data update have been written to disk diff --git a/src/lib/homestore_backend/tests/homeobj_fixture.hpp b/src/lib/homestore_backend/tests/homeobj_fixture.hpp index 3314b1533..d97662506 100644 --- a/src/lib/homestore_backend/tests/homeobj_fixture.hpp +++ b/src/lib/homestore_backend/tests/homeobj_fixture.hpp @@ -14,6 +14,12 @@ #include "lib/homestore_backend/hs_homeobject.hpp" #include "bits_generator.hpp" #include "hs_repl_test_helper.hpp" +#include +#include +#include + +SETTINGS_INIT(iomgrcfg::IomgrSettings, iomgr_config); +#define IM_SETTINGS_FACTORY() SETTINGS_FACTORY(iomgr_config) using namespace std::chrono_literals; @@ -36,8 +42,14 @@ class HomeObjectFixture : public ::testing::Test { HomeObjectFixture() : rand_blob_size{1u, 16 * 1024}, rand_user_key_size{1u, 1024} {} void SetUp() override { + IM_SETTINGS_FACTORY().modifiable_settings([](auto& s) { + s.io_env.http_port = 5000 + g_helper->replica_num(); + LOGD("setup http port to {}", s.io_env.http_port); + }); HSHomeObject::_hs_chunk_size = 20 * Mi; _obj_inst = std::dynamic_pointer_cast< HSHomeObject >(g_helper->build_new_homeobject()); + //Used to export metrics, it should be called after init_homeobject + if (SISL_OPTIONS["enable_http"].as()) { g_helper->app->start_http_server(); } if (!g_helper->is_current_testcase_restarted()) { g_helper->bump_sync_point_and_sync(); } else { @@ -47,6 +59,7 @@ class HomeObjectFixture : public ::testing::Test { void TearDown() override { g_helper->sync(); + g_helper->app->stop_http_server(); _obj_inst.reset(); g_helper->delete_homeobject(); } diff --git a/src/lib/homestore_backend/tests/homeobj_misc_tests.cpp b/src/lib/homestore_backend/tests/homeobj_misc_tests.cpp index 0b27b2a47..ab1b0454e 100644 --- a/src/lib/homestore_backend/tests/homeobj_misc_tests.cpp +++ b/src/lib/homestore_backend/tests/homeobj_misc_tests.cpp @@ -201,7 +201,7 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) { ASSERT_TRUE(r_dev.hasValue()); auto handler = std::make_unique< homeobject::HSHomeObject::SnapshotReceiveHandler >(*_obj_inst, r_dev.value()); - handler->reset_context(snp_lsn, pg_id); + handler->reset_context_and_metrics(snp_lsn, pg_id); // Step 1: Test write pg meta - cannot test full logic since the PG already exists // Generate ResyncPGMetaData message diff --git a/src/lib/homestore_backend/tests/hs_repl_test_helper.hpp b/src/lib/homestore_backend/tests/hs_repl_test_helper.hpp index daca591ae..c53f9ffc8 100644 --- a/src/lib/homestore_backend/tests/hs_repl_test_helper.hpp +++ b/src/lib/homestore_backend/tests/hs_repl_test_helper.hpp @@ -34,6 +34,8 @@ #include #include #include +#include +#include #include @@ -138,8 +140,40 @@ class HSReplTestHelper { return std::string("127.0.0.1:") + std::to_string(port); } + + void get_prometheus_metrics(const Pistache::Rest::Request&, Pistache::Http::ResponseWriter response) { + response.send(Pistache::Http::Code::Ok, sisl::MetricsFarm::getInstance().report(sisl::ReportFormat::kTextFormat)); + } + + void start_http_server() { + std::vector< iomgr::http_route > routes = { + {Pistache::Http::Method::Get, "/metrics", + Pistache::Rest::Routes::bind(&TestReplApplication::get_prometheus_metrics, this)}, + }; + + auto http_server = ioenvironment.get_http_server(); + if (!http_server) { + LOGERROR("http server not available"); + return; + } + try { + http_server->setup_routes(routes); + } catch (std::runtime_error const& e) { LOGERROR("setup routes failed, {}", e.what()); } + http_server->start(); + LOGD("http server is running"); + } + + void stop_http_server() { + auto http_server = ioenvironment.get_http_server(); + if (!http_server) { + LOGWARN("http server not available"); + return; + } + http_server->stop(); + } }; + public: friend class TestReplApplication; diff --git a/src/lib/homestore_backend/tests/test_homestore_backend.cpp b/src/lib/homestore_backend/tests/test_homestore_backend.cpp index ace76ddb5..cdbb673fa 100644 --- a/src/lib/homestore_backend/tests/test_homestore_backend.cpp +++ b/src/lib/homestore_backend/tests/test_homestore_backend.cpp @@ -40,7 +40,9 @@ SISL_OPTION_GROUP( (num_shards, "", "num_shards", "number of shards", ::cxxopts::value< uint64_t >()->default_value("4"), "number"), (num_blobs, "", "num_blobs", "number of blobs", ::cxxopts::value< uint64_t >()->default_value("20"), "number"), (is_restart, "", "is_restart", "the process is restart or the first start", ::cxxopts::value< bool >()-> - default_value("false"), "true or false")); + default_value("false"), "true or false"), + (enable_http, "", "enable_http", "enable http server or not", + ::cxxopts::value< bool >()->default_value("false"), "true or false")); SISL_LOGGING_INIT(homeobject) #define test_options logging, config, homeobject, test_homeobject_repl_common diff --git a/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp b/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp index 8bd7130cf..78dd05ae1 100644 --- a/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp +++ b/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp @@ -531,7 +531,9 @@ SISL_OPTION_GROUP( (num_blobs, "", "num_blobs", "number of blobs", ::cxxopts::value< uint64_t >()->default_value("20"), "number"), (is_restart, "", "is_restart", "(internal) the process is restart or the first start, only used for the first testcase", - ::cxxopts::value< bool >()->default_value("false"), "true or false")); + ::cxxopts::value< bool >()->default_value("false"), "true or false"), + (enable_http, "", "enable_http", "enable http server or not", + ::cxxopts::value< bool >()->default_value("false"), "true or false")); SISL_LOGGING_INIT(homeobject) #define test_options logging, config, homeobject, test_homeobject_repl_common From 86519f6e3fd510bc16dba80d3755bb880641feb3 Mon Sep 17 00:00:00 2001 From: yuwmao Date: Fri, 7 Mar 2025 15:11:40 +0800 Subject: [PATCH 2/2] address comments --- conanfile.py | 2 +- src/lib/homestore_backend/hs_homeobject.hpp | 18 +++++++----------- src/lib/homestore_backend/pg_blob_iterator.cpp | 15 ++++++++++----- .../homestore_backend/resync_shard_data.fbs | 1 - .../snapshot_receive_handler.cpp | 4 ---- 5 files changed, 18 insertions(+), 22 deletions(-) diff --git a/conanfile.py b/conanfile.py index 0ec8f173d..67d7a096f 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomeObjectConan(ConanFile): name = "homeobject" - version = "2.2.13" + version = "2.2.14" homepage = "https://github.com/eBay/HomeObject" description = "Blob Store built on HomeReplication" diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index 2c3044f60..18209f314 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -191,9 +191,6 @@ class HSHomeObject : public HomeObjectImpl { uint64_t complete_shards{0}; // The count of the blobs which have been corrupted on the leader side. uint64_t corrupted_blobs{0}; - // Record the stats of the current batch to avoid double counting. - uint64_t cur_shard_total_blobs{0}; - uint64_t cur_shard_complete_blobs{0}; // Used to handle the retried batch message. uint64_t cur_batch_blobs{0}; uint64_t cur_batch_bytes{0}; @@ -456,8 +453,12 @@ class HSHomeObject : public HomeObjectImpl { REGISTER_COUNTER(snp_dnr_load_bytes, "Loaded bytes in baseline resync"); REGISTER_COUNTER(snp_dnr_resend_count, "Mesg resend times in baseline resync"); REGISTER_COUNTER(snp_dnr_error_count, "Error times when reading blobs in baseline resync"); - REGISTER_HISTOGRAM(snp_dnr_blob_process_time, "Time cost of successfully process a blob in baseline resync", + REGISTER_HISTOGRAM(snp_dnr_blob_process_time, "Time cost(us) of successfully process a blob in baseline resync", HistogramBucketsType(DefaultBuckets)); + REGISTER_HISTOGRAM(snp_dnr_batch_process_time, + "Time cost(ms) of successfully process a batch in baseline resync", HistogramBucketsType(DefaultBuckets)); + REGISTER_HISTOGRAM(snp_dnr_batch_e2e_time, + "Time cost(ms) of a batch end-to-end round trip in baseline resync", HistogramBucketsType(DefaultBuckets)); register_me_to_farm(); } @@ -480,6 +481,7 @@ class HSHomeObject : public HomeObjectImpl { std::vector< BlobInfo > cur_blob_list_{0}; uint64_t cur_start_blob_idx_{0}; uint64_t cur_batch_blob_count_{0}; + Clock::time_point cur_batch_start_time_; flatbuffers::FlatBufferBuilder builder_; HSHomeObject& home_obj_; @@ -549,14 +551,10 @@ class HSHomeObject : public HomeObjectImpl { REGISTER_GAUGE(snp_rcvr_complete_blob, "Complete blob in baseline resync") REGISTER_GAUGE(snp_rcvr_complete_bytes, "Complete bytes in baseline resync") REGISTER_GAUGE(snp_rcvr_complete_shards, "Complete shards in baseline resync") - REGISTER_GAUGE(snp_rcvr_current_shard_total_blobs, - "Total blob of the current shard in baseline resync") - REGISTER_GAUGE(snp_rcvr_current_shard_complete_blobs, - "Compelete blob of the current blob in baseline resync"); REGISTER_GAUGE(snp_rcvr_corrupted_blobs, "Corrupted blobs in baseline resync"); REGISTER_GAUGE(snp_rcvr_elapsed_time_sec, "Time cost(seconds) of baseline resync"); REGISTER_GAUGE(snp_rcvr_error_count, "Error count in baseline resync"); - REGISTER_HISTOGRAM(snp_rcvr_blob_process_time, "Time cost of successfully process a blob in baseline resync", + REGISTER_HISTOGRAM(snp_rcvr_blob_process_time, "Time cost(us) of successfully process a blob in baseline resync", HistogramBucketsType(DefaultBuckets)); @@ -578,8 +576,6 @@ class HSHomeObject : public HomeObjectImpl { GAUGE_UPDATE(*this, snp_rcvr_complete_blob, ctx_->progress.complete_blobs); GAUGE_UPDATE(*this, snp_rcvr_complete_bytes, ctx_->progress.complete_bytes); GAUGE_UPDATE(*this, snp_rcvr_complete_shards, ctx_->progress.complete_shards); - GAUGE_UPDATE(*this, snp_rcvr_current_shard_total_blobs, ctx_->progress.cur_shard_total_blobs); - GAUGE_UPDATE(*this, snp_rcvr_current_shard_complete_blobs, ctx_->progress.cur_shard_complete_blobs); GAUGE_UPDATE(*this, snp_rcvr_corrupted_blobs, ctx_->progress.corrupted_blobs); GAUGE_UPDATE(*this, snp_rcvr_error_count, ctx_->progress.error_count); auto duration = get_elapsed_time_ms(ctx_->progress.start_time * 1000) / 1000; diff --git a/src/lib/homestore_backend/pg_blob_iterator.cpp b/src/lib/homestore_backend/pg_blob_iterator.cpp index 6b4af2107..400c026c4 100644 --- a/src/lib/homestore_backend/pg_blob_iterator.cpp +++ b/src/lib/homestore_backend/pg_blob_iterator.cpp @@ -38,6 +38,11 @@ HSHomeObject::PGBlobIterator::PGBlobIterator(HSHomeObject& home_obj, homestore:: //result represents if the objId is valid and the cursors are updated bool HSHomeObject::PGBlobIterator::update_cursor(objId id) { + if (cur_batch_start_time_ != Clock::time_point{}) { + HISTOGRAM_OBSERVE(*metrics_, snp_dnr_batch_e2e_time, get_elapsed_time_us(cur_batch_start_time_)); + } + cur_batch_start_time_ = Clock::now(); + if (id.value == LAST_OBJ_ID) { return true; } //resend batch if (id.value == cur_obj_id_.value) { @@ -160,8 +165,7 @@ bool HSHomeObject::PGBlobIterator::create_shard_snapshot_data(sisl::io_blob_safe auto shard = shard_list_[cur_shard_idx_]; auto shard_entry = CreateResyncShardMetaData( builder_, shard.info.id, pg_id_, static_cast< uint8_t >(shard.info.state), shard.info.lsn, - shard.info.created_time, shard.info.last_modified_time, shard.info.total_capacity_bytes, cur_blob_list_.size(), - shard.v_chunk_num); + shard.info.created_time, shard.info.last_modified_time, shard.info.total_capacity_bytes, shard.v_chunk_num); builder_.FinishSizePrefixed(shard_entry); @@ -233,6 +237,7 @@ BlobManager::AsyncResult< sisl::io_blob_safe > HSHomeObject::PGBlobIterator::loa } bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe& data_blob) { + auto batch_start = Clock::now(); std::vector< ::flatbuffers::Offset< ResyncBlobData > > blob_entries; bool end_of_shard = false; @@ -250,7 +255,7 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe continue; } - auto start = Clock::now(); + auto blob_start = Clock::now(); #ifdef _PRERELEASE if (iomgr_flip::instance()->test_flip("pg_blob_iterator_load_blob_data_error")) { @@ -285,8 +290,7 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe return false; } - auto duration = get_elapsed_time_us(start); - HISTOGRAM_OBSERVE(*metrics_, snp_dnr_blob_process_time, duration); + HISTOGRAM_OBSERVE(*metrics_, snp_dnr_blob_process_time, get_elapsed_time_us(blob_start)); std::vector< uint8_t > data(blob.cbytes(), blob.cbytes() + blob.size()); blob_entries.push_back(CreateResyncBlobDataDirect(builder_, info.blob_id, (uint8_t)state, &data)); total_bytes += blob.size(); @@ -303,6 +307,7 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe COUNTER_INCREMENT(*metrics_, snp_dnr_load_blob, blob_entries.size()); COUNTER_INCREMENT(*metrics_, snp_dnr_load_bytes, total_bytes); pack_resync_message(data_blob, SyncMessageType::SHARD_BATCH); + HISTOGRAM_OBSERVE(*metrics_, snp_dnr_batch_process_time, get_elapsed_time_us(batch_start)); return true; } diff --git a/src/lib/homestore_backend/resync_shard_data.fbs b/src/lib/homestore_backend/resync_shard_data.fbs index e23e5f5f6..5f4123f32 100644 --- a/src/lib/homestore_backend/resync_shard_data.fbs +++ b/src/lib/homestore_backend/resync_shard_data.fbs @@ -10,7 +10,6 @@ table ResyncShardMetaData { created_time : uint64; // shard creation time last_modified_time : ulong; // shard last modify time total_capacity_bytes : ulong; // total capacity of the shard - total_active_blobs : ulong; // total blobs in the shard vchunk_id : uint16; // vchunk id } diff --git a/src/lib/homestore_backend/snapshot_receive_handler.cpp b/src/lib/homestore_backend/snapshot_receive_handler.cpp index 79a4868ef..28b9b9cec 100644 --- a/src/lib/homestore_backend/snapshot_receive_handler.cpp +++ b/src/lib/homestore_backend/snapshot_receive_handler.cpp @@ -132,8 +132,6 @@ int HSHomeObject::SnapshotReceiveHandler::process_shard_snapshot_data(ResyncShar ctx_->shard_cursor = shard_meta.shard_id(); ctx_->cur_batch_num = 0; std::unique_lock lock(mutex); - ctx_->progress.cur_shard_total_blobs = shard_meta.total_active_blobs(); - ctx_->progress.cur_shard_complete_blobs = 0; return 0; } @@ -143,7 +141,6 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob //retry mesg, need to handle duplicate batch, reset progress if (ctx_->cur_batch_num == batch_num) { std::unique_lock lock(mutex); - ctx_->progress.cur_shard_complete_blobs -= ctx_->progress.cur_batch_blobs; ctx_->progress.complete_blobs -= ctx_->progress.cur_batch_blobs; ctx_->progress.complete_bytes -= ctx_->progress.cur_batch_bytes; ctx_->progress.cur_batch_blobs = 0; @@ -303,7 +300,6 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob std::unique_lock lock(mutex); ctx_->progress.cur_batch_blobs = data_blobs.blob_list()->size(); ctx_->progress.cur_batch_bytes = total_bytes; - ctx_->progress.cur_shard_complete_blobs += ctx_->progress.cur_batch_blobs; ctx_->progress.complete_blobs += ctx_->progress.cur_batch_blobs; ctx_->progress.complete_bytes += ctx_->progress.cur_batch_bytes; }