Skip to content

Commit 4256eea

Browse files
committed
Add metrics for baseline resync
DonerSnapshotMetrics is in memory; ReceiverSnapshotMetrics persists the progress.
1 parent 1f5683e commit 4256eea

13 files changed

Lines changed: 295 additions & 25 deletions

conanfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
class HomeObjectConan(ConanFile):
1111
name = "homeobject"
12-
version = "2.2.12"
12+
version = "2.2.13"
1313

1414
homepage = "https://github.com/eBay/HomeObject"
1515
description = "Blob Store built on HomeReplication"

src/lib/homestore_backend/hs_homeobject.hpp

Lines changed: 125 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -170,11 +170,54 @@ class HSHomeObject : public HomeObjectImpl {
170170
char data[1];
171171
};
172172

173+
struct durable_snapshot_progress {
174+
uint64_t start_time{0};
175+
uint64_t total_blobs{0};
176+
uint64_t total_bytes{0};
177+
uint64_t total_shards{0};
178+
uint64_t complete_blobs{0};
179+
uint64_t complete_bytes{0};
180+
uint64_t complete_shards{0};
181+
uint64_t corrupted_blobs{0};
182+
};
183+
184+
struct snapshot_progress {
185+
uint64_t start_time{0};
186+
uint64_t total_blobs{0};
187+
uint64_t total_bytes{0};
188+
uint64_t total_shards{0};
189+
uint64_t complete_blobs{0};
190+
uint64_t complete_bytes{0};
191+
uint64_t complete_shards{0};
192+
// The count of the blobs which have been corrupted on the leader side.
193+
uint64_t corrupted_blobs{0};
194+
// Record the stats of the current batch to avoid double counting.
195+
uint64_t cur_shard_total_blobs{0};
196+
uint64_t cur_shard_complete_blobs{0};
197+
// Used to handle the retried batch message.
198+
uint64_t cur_batch_blobs{0};
199+
uint64_t cur_batch_bytes{0};
200+
uint64_t error_count{0};
201+
202+
snapshot_progress() = default;
203+
explicit snapshot_progress(durable_snapshot_progress p) {
204+
start_time = p.start_time;
205+
total_blobs = p.total_blobs;
206+
total_bytes = p.total_bytes;
207+
total_shards = p.total_shards;
208+
complete_blobs = p.complete_blobs;
209+
complete_bytes = p.complete_bytes;
210+
complete_shards = p.complete_shards;
211+
corrupted_blobs = p.corrupted_blobs;
212+
}
213+
};
214+
173215
// Since shard list can be quite large and only need to be persisted once, we store it in a separate superblk
174216
struct snapshot_rcvr_info_superblk {
175217
shard_id_t shard_cursor;
176218
int64_t snp_lsn;
177219
pg_id_t pg_id;
220+
durable_snapshot_progress progress;
178221

179222
uint32_t size() const { return sizeof(snapshot_rcvr_info_superblk); }
180223
static auto name() -> string { return _snp_rcvr_meta_name; }
@@ -286,6 +329,11 @@ class HSHomeObject : public HomeObjectImpl {
286329
* Returns the number of open shards on this PG.
287330
*/
288331
uint32_t open_shards() const;
332+
333+
/**
334+
* Returns the progress of the baseline resync.
335+
*/
336+
uint32_t get_snp_progress() const;
289337
};
290338

291339
struct HS_Shard : public Shard {
@@ -400,6 +448,25 @@ class HSHomeObject : public HomeObjectImpl {
400448
void pack_resync_message(sisl::io_blob_safe& dest_blob, SyncMessageType type);
401449
bool end_of_scan() const;
402450

451+
// All of the leader's metrics are in-memory
452+
struct DonerSnapshotMetrics : sisl::MetricsGroup {
453+
explicit DonerSnapshotMetrics(pg_id_t pg_id) : sisl::MetricsGroup("snapshot_doner", std::to_string(pg_id)) {
454+
REGISTER_COUNTER(snp_dnr_load_blob, "Loaded blobs in baseline resync");
455+
REGISTER_COUNTER(snp_dnr_load_bytes, "Loaded bytes in baseline resync");
456+
REGISTER_COUNTER(snp_dnr_resend_count, "Message resend count in baseline resync");
457+
REGISTER_COUNTER(snp_dnr_error_count, "Error times when reading blobs in baseline resync");
458+
REGISTER_HISTOGRAM(snp_dnr_blob_process_time, "Time cost of successfully process a blob in baseline resync",
459+
HistogramBucketsType(DefaultBuckets));
460+
register_me_to_farm();
461+
}
462+
463+
~DonerSnapshotMetrics() { deregister_me_from_farm(); }
464+
DonerSnapshotMetrics(const DonerSnapshotMetrics&) = delete;
465+
DonerSnapshotMetrics(DonerSnapshotMetrics&&) noexcept = delete;
466+
DonerSnapshotMetrics& operator=(const DonerSnapshotMetrics&) = delete;
467+
DonerSnapshotMetrics& operator=(DonerSnapshotMetrics&&) noexcept = delete;
468+
};
469+
403470
struct ShardEntry {
404471
ShardInfo info;
405472
homestore::chunk_num_t v_chunk_num;
@@ -420,6 +487,7 @@ class HSHomeObject : public HomeObjectImpl {
420487
pg_id_t pg_id_;
421488
shared< homestore::ReplDev > repl_dev_;
422489
uint64_t max_batch_size_;
490+
std::unique_ptr<DonerSnapshotMetrics> metrics_;
423491
};
424492

425493
class SnapshotReceiveHandler {
@@ -448,11 +516,11 @@ class HSHomeObject : public HomeObjectImpl {
448516
pg_id_t get_context_pg_id() const;
449517

450518
// Try to load existing snapshot context info
451-
bool load_prev_context();
519+
bool load_prev_context_and_metrics();
452520

453521
// Reset the context for a new snapshot, should be called before each new snapshot transmission
454-
void reset_context(int64_t lsn, pg_id_t pg_id);
455-
void destroy_context();
522+
void reset_context_and_metrics(int64_t lsn, pg_id_t pg_id);
523+
void destroy_context_and_metrics();
456524

457525
shard_id_t get_shard_cursor() const;
458526
shard_id_t get_next_shard() const;
@@ -466,14 +534,66 @@ class HSHomeObject : public HomeObjectImpl {
466534
const int64_t snp_lsn;
467535
const pg_id_t pg_id;
468536
shared< BlobIndexTable > index_table;
469-
537+
std::shared_mutex progress_lock;
538+
snapshot_progress progress;
470539
SnapshotContext(int64_t lsn, pg_id_t pg_id) : snp_lsn{lsn}, pg_id{pg_id} {}
471540
};
472541

542+
struct ReceiverSnapshotMetrics: sisl::MetricsGroup {
543+
ReceiverSnapshotMetrics(std::shared_ptr<SnapshotContext> ctx) : sisl::MetricsGroup("snapshot_receiver", std::to_string(ctx->pg_id)),
544+
ctx_{ctx} {
545+
REGISTER_GAUGE(snp_rcvr_total_blob, "Total blobs in baseline resync");
546+
REGISTER_GAUGE(snp_rcvr_total_bytes, "Total bytes in baseline resync")
547+
REGISTER_GAUGE(snp_rcvr_total_shards, "Total shards in baseline resync")
548+
REGISTER_GAUGE(snp_rcvr_complete_blob, "Complete blob in baseline resync")
549+
REGISTER_GAUGE(snp_rcvr_complete_bytes, "Complete bytes in baseline resync")
550+
REGISTER_GAUGE(snp_rcvr_complete_shards, "Complete shards in baseline resync")
551+
REGISTER_GAUGE(snp_rcvr_current_shard_total_blobs,
552+
"Total blobs of the current shard in baseline resync")
553+
REGISTER_GAUGE(snp_rcvr_current_shard_complete_blobs,
554+
"Complete blobs of the current shard in baseline resync");
555+
REGISTER_GAUGE(snp_rcvr_corrupted_blobs, "Corrupted blobs in baseline resync");
556+
REGISTER_GAUGE(snp_rcvr_elapsed_time_sec, "Time cost(seconds) of baseline resync");
557+
REGISTER_GAUGE(snp_rcvr_error_count, "Error count in baseline resync");
558+
REGISTER_HISTOGRAM(snp_rcvr_blob_process_time, "Time cost of successfully process a blob in baseline resync",
559+
HistogramBucketsType(DefaultBuckets));
560+
561+
562+
attach_gather_cb(std::bind(&ReceiverSnapshotMetrics::on_gather, this));
563+
register_me_to_farm();
564+
}
565+
~ReceiverSnapshotMetrics() { deregister_me_from_farm(); }
566+
ReceiverSnapshotMetrics(const ReceiverSnapshotMetrics&) = delete;
567+
ReceiverSnapshotMetrics(ReceiverSnapshotMetrics&&) noexcept = delete;
568+
ReceiverSnapshotMetrics& operator=(const ReceiverSnapshotMetrics&) = delete;
569+
ReceiverSnapshotMetrics& operator=(ReceiverSnapshotMetrics&&) noexcept = delete;
570+
571+
void on_gather() {
572+
if (ctx_) {
573+
std::shared_lock<std::shared_mutex> lock(ctx_->progress_lock);
574+
GAUGE_UPDATE(*this, snp_rcvr_total_blob, ctx_->progress.total_blobs);
575+
GAUGE_UPDATE(*this, snp_rcvr_total_bytes, ctx_->progress.total_bytes);
576+
GAUGE_UPDATE(*this, snp_rcvr_total_shards, ctx_->progress.total_shards);
577+
GAUGE_UPDATE(*this, snp_rcvr_complete_blob, ctx_->progress.complete_blobs);
578+
GAUGE_UPDATE(*this, snp_rcvr_complete_bytes, ctx_->progress.complete_bytes);
579+
GAUGE_UPDATE(*this, snp_rcvr_complete_shards, ctx_->progress.complete_shards);
580+
GAUGE_UPDATE(*this, snp_rcvr_current_shard_total_blobs, ctx_->progress.cur_shard_total_blobs);
581+
GAUGE_UPDATE(*this, snp_rcvr_current_shard_complete_blobs, ctx_->progress.cur_shard_complete_blobs);
582+
GAUGE_UPDATE(*this, snp_rcvr_corrupted_blobs, ctx_->progress.corrupted_blobs);
583+
GAUGE_UPDATE(*this, snp_rcvr_error_count, ctx_->progress.error_count);
584+
auto duration = get_elapsed_time_ms(ctx_->progress.start_time * 1000) / 1000;
585+
GAUGE_UPDATE(*this, snp_rcvr_elapsed_time_sec, duration);
586+
}
587+
}
588+
private:
589+
std::shared_ptr<SnapshotContext> ctx_;
590+
};
591+
473592
HSHomeObject& home_obj_;
474593
const shared< homestore::ReplDev > repl_dev_;
475594

476-
std::unique_ptr< SnapshotContext > ctx_;
595+
std::shared_ptr< SnapshotContext > ctx_;
596+
std::unique_ptr< ReceiverSnapshotMetrics > metrics_;
477597

478598
// Update the snp_info superblock
479599
void update_snp_info_sb(bool init = false);

src/lib/homestore_backend/hs_pg_manager.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,10 @@ uint32_t HSHomeObject::HS_PG::open_shards() const {
553553
return std::count_if(shards_.begin(), shards_.end(), [](auto const& s) { return s->is_open(); });
554554
}
555555

556+
uint32_t HSHomeObject::HS_PG::get_snp_progress() const {
557+
return snp_rcvr_info_sb_->progress.complete_bytes / snp_rcvr_info_sb_->progress.total_bytes;
558+
}
559+
556560
// NOTE: caller should hold the _pg_lock
557561
const HSHomeObject::HS_PG* HSHomeObject::_get_hs_pg_unlocked(pg_id_t pg_id) const {
558562
auto iter = _pg_map.find(pg_id);

src/lib/homestore_backend/pg_blob_iterator.cpp

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include <sisl/logging/logging.h>
33
#include <sisl/options/options.h>
44
#include <sisl/settings/settings.hpp>
5+
#include <sisl/metrics/metrics.hpp>
56
#include "generated/resync_blob_data_generated.h"
67
#include "generated/resync_pg_data_generated.h"
78
#include "generated/resync_shard_data_generated.h"
@@ -16,6 +17,7 @@ HSHomeObject::PGBlobIterator::PGBlobIterator(HSHomeObject& home_obj, homestore::
1617
auto pg = get_pg_metadata();
1718
pg_id_ = pg->pg_info_.id;
1819
repl_dev_ = static_cast< HS_PG* >(pg)->repl_dev_;
20+
metrics_ = make_unique< DonerSnapshotMetrics >(pg_id_);
1921
max_batch_size_ = HS_BACKEND_DYNAMIC_CONFIG(max_snapshot_batch_size_mb) * Mi;
2022
if (max_batch_size_ == 0) { max_batch_size_ = DEFAULT_MAX_BATCH_SIZE_MB * Mi; }
2123

@@ -38,7 +40,11 @@ HSHomeObject::PGBlobIterator::PGBlobIterator(HSHomeObject& home_obj, homestore::
3840
bool HSHomeObject::PGBlobIterator::update_cursor(objId id) {
3941
if (id.value == LAST_OBJ_ID) { return true; }
4042
//resend batch
41-
if (id.value == cur_obj_id_.value) { return true; }
43+
if (id.value == cur_obj_id_.value) {
44+
LOGT("resend the same batch, objId:{}, cur_obj_id:{}", id.to_string(), cur_obj_id_.to_string());
45+
COUNTER_INCREMENT(*metrics_, snp_dnr_resend_count, 1);
46+
return true;
47+
}
4248

4349
// If cur_obj_id_ == 0|0 (PG meta), this may be a request for resuming from specific shard
4450
if (cur_obj_id_.shard_seq_num == 0 && id.shard_seq_num != 0 && id.batch_id == 0) {
@@ -122,12 +128,15 @@ bool HSHomeObject::PGBlobIterator::create_pg_snapshot_data(sisl::io_blob_safe& m
122128
members.push_back(CreateMemberDirect(builder_, &id, member.name.c_str(), member.priority));
123129
}
124130
std::vector< uint64_t > shard_ids;
131+
auto total_bytes = pg->durable_entities().total_occupied_blk_count.load(std::memory_order_relaxed) * repl_dev_->
132+
get_blk_size();
133+
auto total_blobs = pg->durable_entities().active_blob_count.load(std::memory_order_relaxed);
125134
for (auto& shard : shard_list_) {
126135
shard_ids.push_back(shard.info.id);
127136
}
128137

129138
auto pg_entry = CreateResyncPGMetaDataDirect(builder_, pg_info.id, &uuid, pg_info.size, pg_info.chunk_size, pg->durable_entities().blob_sequence_num,
130-
pg->shard_sequence_num_, &members, &shard_ids);
139+
pg->shard_sequence_num_, &members, &shard_ids, total_blobs, total_bytes);
131140
builder_.FinishSizePrefixed(pg_entry);
132141

133142
pack_resync_message(meta_blob, SyncMessageType::PG_META);
@@ -151,7 +160,8 @@ bool HSHomeObject::PGBlobIterator::create_shard_snapshot_data(sisl::io_blob_safe
151160
auto shard = shard_list_[cur_shard_idx_];
152161
auto shard_entry = CreateResyncShardMetaData(
153162
builder_, shard.info.id, pg_id_, static_cast< uint8_t >(shard.info.state), shard.info.lsn,
154-
shard.info.created_time, shard.info.last_modified_time, shard.info.total_capacity_bytes, shard.v_chunk_num);
163+
shard.info.created_time, shard.info.last_modified_time, shard.info.total_capacity_bytes, cur_blob_list_.size(),
164+
shard.v_chunk_num);
155165

156166
builder_.FinishSizePrefixed(shard_entry);
157167

@@ -183,15 +193,15 @@ BlobManager::AsyncResult< sisl::io_blob_safe > HSHomeObject::PGBlobIterator::loa
183193

184194
BlobHeader const* header = r_cast< BlobHeader const* >(read_buf.cbytes());
185195
if (!header->valid()) {
186-
//TODO add metrics
196+
//The metrics for corrupted blob is handled on the follower side.
187197
LOGE("Invalid header found, shard_id={}, blob_id={}, [header={}]", shard_id, blob_id,
188198
header->to_string());
189199
state = ResyncBlobState::CORRUPTED;
190200
return std::move(read_buf);
191201
}
192202

193203
if (header->shard_id != shard_id) {
194-
//TODO add metrics
204+
//The metrics for corrupted blob is handled on the follower side.
195205
LOGE("Invalid shard_id in header, shard_id={}, blob_id={}, [header={}]", shard_id,
196206
blob_id, header->to_string());
197207
state = ResyncBlobState::CORRUPTED;
@@ -240,6 +250,8 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
240250
continue;
241251
}
242252

253+
auto start = Clock::now();
254+
243255
#ifdef _PRERELEASE
244256
if (iomgr_flip::instance()->test_flip("pg_blob_iterator_load_blob_data_error")) {
245257
LOGW("Simulating loading blob data error");
@@ -269,10 +281,12 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
269281
if (blob.size() == 0) {
270282
LOGE("Failed to retrieve blob for shard={} blob={} pbas={}", info.shard_id, info.blob_id,
271283
info.pbas.to_string());
272-
//TODO add metrics
284+
COUNTER_INCREMENT(*metrics_, snp_dnr_error_count, 1);
273285
return false;
274286
}
275287

288+
auto duration = get_elapsed_time_us(start);
289+
HISTOGRAM_OBSERVE(*metrics_, snp_dnr_blob_process_time, duration);
276290
std::vector< uint8_t > data(blob.cbytes(), blob.cbytes() + blob.size());
277291
blob_entries.push_back(CreateResyncBlobDataDirect(builder_, info.blob_id, (uint8_t)state, &data));
278292
total_bytes += blob.size();
@@ -286,6 +300,8 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
286300
LOGD("create blobs snapshot data batch: shard_seq_num={}, batch_num={}, total_bytes={}, blob_num={}, end_of_shard={}",
287301
cur_obj_id_.shard_seq_num, cur_obj_id_.batch_id, total_bytes, blob_entries.size(), end_of_shard);
288302

303+
COUNTER_INCREMENT(*metrics_, snp_dnr_load_blob, blob_entries.size());
304+
COUNTER_INCREMENT(*metrics_, snp_dnr_load_bytes, total_bytes);
289305
pack_resync_message(data_blob, SyncMessageType::SHARD_BATCH);
290306
return true;
291307
}

src/lib/homestore_backend/replication_state_machine.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ bool ReplicationStateMachine::apply_snapshot(std::shared_ptr< homestore::snapsho
226226
std::this_thread::sleep_for(std::chrono::milliseconds(delay.get()));
227227
}
228228
#endif
229-
m_snp_rcv_handler->destroy_context();
229+
m_snp_rcv_handler->destroy_context_and_metrics();
230230

231231
std::lock_guard lk(m_snapshot_lock);
232232
set_snapshot_context(context);
@@ -321,7 +321,7 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
321321
auto r_dev = repl_dev();
322322
if (!m_snp_rcv_handler) {
323323
m_snp_rcv_handler = std::make_unique< HSHomeObject::SnapshotReceiveHandler >(*home_object_, r_dev);
324-
if (m_snp_rcv_handler->load_prev_context()) {
324+
if (m_snp_rcv_handler->load_prev_context_and_metrics()) {
325325
LOGI("Reloaded previous snapshot context, lsn:{} pg_id:{} next_shard:{}", context->get_lsn(),
326326
m_snp_rcv_handler->get_context_pg_id(), m_snp_rcv_handler->get_next_shard());
327327
}
@@ -330,6 +330,7 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
330330
auto obj_id = objId(snp_obj->offset);
331331
auto log_suffix = fmt::format("group={} lsn={} shard={} batch_num={} size={}", uuids::to_string(r_dev->group_id()),
332332
context->get_lsn(), obj_id.shard_seq_num, obj_id.batch_id, snp_obj->blob.size());
333+
LOGI("received snapshot obj, {}", log_suffix);
333334

334335
if (snp_obj->is_last_obj) {
335336
LOGD("Write snapshot reached is_last_obj true {}", log_suffix);
@@ -382,7 +383,7 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
382383
home_object_->pg_destroy(pg_data->pg_id());
383384
}
384385
LOGI("reset context from lsn:{} to lsn:{}", m_snp_rcv_handler->get_context_lsn(), context->get_lsn());
385-
m_snp_rcv_handler->reset_context(context->get_lsn(), pg_data->pg_id());
386+
m_snp_rcv_handler->reset_context_and_metrics(context->get_lsn(), pg_data->pg_id());
386387

387388
auto ret = m_snp_rcv_handler->process_pg_snapshot_data(*pg_data);
388389
if (ret) {

src/lib/homestore_backend/resync_pg_data.fbs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ table ResyncPGMetaData {
1717
shard_seq_num: uint64; // shard sequence number, used to assign next shard id;
1818
members : [Member]; // peers;
1919
shard_ids : [uint64]; // shard ids to transmit;
20+
total_blobs_to_transfer : uint64; // total number of blobs to transfer in the pg
21+
total_bytes_to_transfer : uint64; // total number of bytes to transfer in the pg
2022
}
2123

2224
// ResyncPGMetaData schema is the first message(ObjID=0) in the resync data stream

src/lib/homestore_backend/resync_shard_data.fbs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ table ResyncShardMetaData {
1010
created_time : uint64; // shard creation time
1111
last_modified_time : ulong; // shard last modify time
1212
total_capacity_bytes : ulong; // total capacity of the shard
13+
total_active_blobs : ulong; // total blobs in the shard
1314
vchunk_id : uint16; // vchunk id
1415
}
1516

0 commit comments

Comments
 (0)