Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.2.13"
version = "2.2.14"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
126 changes: 121 additions & 5 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,51 @@ class HSHomeObject : public HomeObjectImpl {
char data[1];
};

// On-disk (superblk) snapshot progress counters, persisted inside
// snapshot_rcvr_info_superblk. This struct is written to disk: keep it
// trivially copyable, keep the field order stable, and only append fields.
struct durable_snapshot_progress {
    uint64_t start_time{0};      // resync start time — presumably seconds since epoch; confirm at writer
    uint64_t total_blobs{0};     // total blobs expected from the leader
    uint64_t total_bytes{0};     // total payload bytes expected
    uint64_t total_shards{0};    // total shards expected
    uint64_t complete_blobs{0};  // blobs fully applied so far
    uint64_t complete_bytes{0};  // bytes fully applied so far
    uint64_t complete_shards{0}; // shards fully applied so far
    uint64_t corrupted_blobs{0}; // blobs the leader reported as corrupted
};

// In-memory view of the baseline-resync progress. Mirrors
// durable_snapshot_progress and adds transient counters that do not need to
// survive a restart.
struct snapshot_progress {
    uint64_t start_time{0};
    uint64_t total_blobs{0};
    uint64_t total_bytes{0};
    uint64_t total_shards{0};
    uint64_t complete_blobs{0};
    uint64_t complete_bytes{0};
    uint64_t complete_shards{0};
    // The count of the blobs which have been corrupted on the leader side.
    uint64_t corrupted_blobs{0};
    // Progress of the batch currently being applied; used to handle the
    // retried (resent) batch message.
    uint64_t cur_batch_blobs{0};
    uint64_t cur_batch_bytes{0};
    uint64_t error_count{0};

    snapshot_progress() = default;
    // Restore the in-memory view from the persisted counters; transient
    // fields keep their zero defaults. Takes the source by const reference
    // (was by value) and initializes members directly instead of assigning.
    explicit snapshot_progress(const durable_snapshot_progress& p) :
            start_time{p.start_time},
            total_blobs{p.total_blobs},
            total_bytes{p.total_bytes},
            total_shards{p.total_shards},
            complete_blobs{p.complete_blobs},
            complete_bytes{p.complete_bytes},
            complete_shards{p.complete_shards},
            corrupted_blobs{p.corrupted_blobs} {}
};

// Since shard list can be quite large and only need to be persisted once, we store it in a separate superblk
struct snapshot_rcvr_info_superblk {
shard_id_t shard_cursor;
int64_t snp_lsn;
pg_id_t pg_id;
durable_snapshot_progress progress;

uint32_t size() const { return sizeof(snapshot_rcvr_info_superblk); }
static auto name() -> string { return _snp_rcvr_meta_name; }
Expand Down Expand Up @@ -286,6 +326,11 @@ class HSHomeObject : public HomeObjectImpl {
* Returns the number of open shards on this PG.
*/
uint32_t open_shards() const;

/**
* Returns the progress of the baseline resync.
*/
uint32_t get_snp_progress() const;
};

struct HS_Shard : public Shard {
Expand Down Expand Up @@ -401,6 +446,29 @@ class HSHomeObject : public HomeObjectImpl {
void pack_resync_message(sisl::io_blob_safe& dest_blob, SyncMessageType type);
bool end_of_scan() const;

// All of the leader-side (snapshot donor) metrics are in-memory only; they
// are registered per-PG and are not persisted across restarts.
// NOTE(review): "Doner" is a typo for "Donor", but the name is referenced
// elsewhere (metrics_ member, pg_blob_iterator.cpp), so renaming it is a
// cross-file change and is not done here.
struct DonerSnapshotMetrics : sisl::MetricsGroup {
    explicit DonerSnapshotMetrics(pg_id_t pg_id) : sisl::MetricsGroup("snapshot_doner", std::to_string(pg_id)) {
        REGISTER_COUNTER(snp_dnr_load_blob, "Loaded blobs in baseline resync");
        REGISTER_COUNTER(snp_dnr_load_bytes, "Loaded bytes in baseline resync");
        REGISTER_COUNTER(snp_dnr_resend_count, "Message resend count in baseline resync");
        REGISTER_COUNTER(snp_dnr_error_count, "Error count when reading blobs in baseline resync");
        REGISTER_HISTOGRAM(snp_dnr_blob_process_time, "Time cost(us) of successfully process a blob in baseline resync",
                           HistogramBucketsType(DefaultBuckets));
        REGISTER_HISTOGRAM(snp_dnr_batch_process_time,
                           "Time cost(ms) of successfully process a batch in baseline resync", HistogramBucketsType(DefaultBuckets));
        REGISTER_HISTOGRAM(snp_dnr_batch_e2e_time,
                           "Time cost(ms) of a batch end-to-end round trip in baseline resync", HistogramBucketsType(DefaultBuckets));
        register_me_to_farm();
    }

    ~DonerSnapshotMetrics() { deregister_me_from_farm(); }
    // Non-copyable/non-movable: the metrics farm holds a reference to `this`.
    DonerSnapshotMetrics(const DonerSnapshotMetrics&) = delete;
    DonerSnapshotMetrics(DonerSnapshotMetrics&&) noexcept = delete;
    DonerSnapshotMetrics& operator=(const DonerSnapshotMetrics&) = delete;
    DonerSnapshotMetrics& operator=(DonerSnapshotMetrics&&) noexcept = delete;
};

struct ShardEntry {
ShardInfo info;
homestore::chunk_num_t v_chunk_num;
Expand All @@ -413,6 +481,7 @@ class HSHomeObject : public HomeObjectImpl {
std::vector< BlobInfo > cur_blob_list_{0};
uint64_t cur_start_blob_idx_{0};
uint64_t cur_batch_blob_count_{0};
Clock::time_point cur_batch_start_time_;
flatbuffers::FlatBufferBuilder builder_;

HSHomeObject& home_obj_;
Expand All @@ -421,6 +490,7 @@ class HSHomeObject : public HomeObjectImpl {
pg_id_t pg_id_;
shared< homestore::ReplDev > repl_dev_;
uint64_t max_batch_size_;
std::unique_ptr<DonerSnapshotMetrics> metrics_;
};

class SnapshotReceiveHandler {
Expand Down Expand Up @@ -449,11 +519,11 @@ class HSHomeObject : public HomeObjectImpl {
pg_id_t get_context_pg_id() const;

// Try to load existing snapshot context info
bool load_prev_context();
bool load_prev_context_and_metrics();

// Reset the context for a new snapshot, should be called before each new snapshot transmission
void reset_context(int64_t lsn, pg_id_t pg_id);
void destroy_context();
void reset_context_and_metrics(int64_t lsn, pg_id_t pg_id);
void destroy_context_and_metrics();

shard_id_t get_shard_cursor() const;
shard_id_t get_next_shard() const;
Expand All @@ -467,14 +537,60 @@ class HSHomeObject : public HomeObjectImpl {
const int64_t snp_lsn;
const pg_id_t pg_id;
shared< BlobIndexTable > index_table;

std::shared_mutex progress_lock;
snapshot_progress progress;
SnapshotContext(int64_t lsn, pg_id_t pg_id) : snp_lsn{lsn}, pg_id{pg_id} {}
};

struct ReceiverSnapshotMetrics: sisl::MetricsGroup {
ReceiverSnapshotMetrics(std::shared_ptr<SnapshotContext> ctx) : sisl::MetricsGroup("snapshot_receiver", std::to_string(ctx->pg_id)),
ctx_{ctx} {
REGISTER_GAUGE(snp_rcvr_total_blob, "Total blobs in baseline resync");
REGISTER_GAUGE(snp_rcvr_total_bytes, "Total bytes in baseline resync")
REGISTER_GAUGE(snp_rcvr_total_shards, "Total shards in baseline resync")
REGISTER_GAUGE(snp_rcvr_complete_blob, "Complete blob in baseline resync")
REGISTER_GAUGE(snp_rcvr_complete_bytes, "Complete bytes in baseline resync")
REGISTER_GAUGE(snp_rcvr_complete_shards, "Complete shards in baseline resync")
REGISTER_GAUGE(snp_rcvr_corrupted_blobs, "Corrupted blobs in baseline resync");
REGISTER_GAUGE(snp_rcvr_elapsed_time_sec, "Time cost(seconds) of baseline resync");
REGISTER_GAUGE(snp_rcvr_error_count, "Error count in baseline resync");
REGISTER_HISTOGRAM(snp_rcvr_blob_process_time, "Time cost(us) of successfully process a blob in baseline resync",
HistogramBucketsType(DefaultBuckets));


attach_gather_cb(std::bind(&ReceiverSnapshotMetrics::on_gather, this));
register_me_to_farm();
}
~ReceiverSnapshotMetrics() { deregister_me_from_farm(); }
ReceiverSnapshotMetrics(const ReceiverSnapshotMetrics&) = delete;
ReceiverSnapshotMetrics(ReceiverSnapshotMetrics&&) noexcept = delete;
ReceiverSnapshotMetrics& operator=(const ReceiverSnapshotMetrics&) = delete;
ReceiverSnapshotMetrics& operator=(ReceiverSnapshotMetrics&&) noexcept = delete;

void on_gather() {
if (ctx_) {
std::shared_lock<std::shared_mutex> lock(ctx_->progress_lock);
GAUGE_UPDATE(*this, snp_rcvr_total_blob, ctx_->progress.total_blobs);
GAUGE_UPDATE(*this, snp_rcvr_total_bytes, ctx_->progress.total_bytes);
GAUGE_UPDATE(*this, snp_rcvr_total_shards, ctx_->progress.total_shards);
GAUGE_UPDATE(*this, snp_rcvr_complete_blob, ctx_->progress.complete_blobs);
GAUGE_UPDATE(*this, snp_rcvr_complete_bytes, ctx_->progress.complete_bytes);
GAUGE_UPDATE(*this, snp_rcvr_complete_shards, ctx_->progress.complete_shards);
GAUGE_UPDATE(*this, snp_rcvr_corrupted_blobs, ctx_->progress.corrupted_blobs);
GAUGE_UPDATE(*this, snp_rcvr_error_count, ctx_->progress.error_count);
auto duration = get_elapsed_time_ms(ctx_->progress.start_time * 1000) / 1000;
GAUGE_UPDATE(*this, snp_rcvr_elapsed_time_sec, duration);
}
}
private:
std::shared_ptr<SnapshotContext> ctx_;
};

HSHomeObject& home_obj_;
const shared< homestore::ReplDev > repl_dev_;

std::unique_ptr< SnapshotContext > ctx_;
std::shared_ptr< SnapshotContext > ctx_;
std::unique_ptr< ReceiverSnapshotMetrics > metrics_;

// Update the snp_info superblock
void update_snp_info_sb(bool init = false);
Expand Down
4 changes: 4 additions & 0 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,10 @@ uint32_t HSHomeObject::HS_PG::open_shards() const {
return std::count_if(shards_.begin(), shards_.end(), [](auto const& s) { return s->is_open(); });
}

// Returns the baseline-resync progress of this PG, derived from the persisted
// receiver superblk counters.
uint32_t HSHomeObject::HS_PG::get_snp_progress() const {
    auto const& progress = snp_rcvr_info_sb_->progress;
    // Guard against division by zero: total_bytes stays 0 until the PG meta
    // message of the snapshot has been received and persisted.
    if (progress.total_bytes == 0) { return 0; }
    // NOTE(review): integer division truncates to 0 until the resync is fully
    // complete — a percentage (complete_bytes * 100 / total_bytes) was likely
    // intended; confirm the expected scale with callers before changing it.
    return progress.complete_bytes / progress.total_bytes;
}

// NOTE: caller should hold the _pg_lock
const HSHomeObject::HS_PG* HSHomeObject::_get_hs_pg_unlocked(pg_id_t pg_id) const {
auto iter = _pg_map.find(pg_id);
Expand Down
31 changes: 26 additions & 5 deletions src/lib/homestore_backend/pg_blob_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <sisl/logging/logging.h>
#include <sisl/options/options.h>
#include <sisl/settings/settings.hpp>
#include <sisl/metrics/metrics.hpp>
#include "generated/resync_blob_data_generated.h"
#include "generated/resync_pg_data_generated.h"
#include "generated/resync_shard_data_generated.h"
Expand All @@ -16,6 +17,7 @@ HSHomeObject::PGBlobIterator::PGBlobIterator(HSHomeObject& home_obj, homestore::
auto pg = get_pg_metadata();
pg_id_ = pg->pg_info_.id;
repl_dev_ = static_cast< HS_PG* >(pg)->repl_dev_;
metrics_ = make_unique< DonerSnapshotMetrics >(pg_id_);
max_batch_size_ = HS_BACKEND_DYNAMIC_CONFIG(max_snapshot_batch_size_mb) * Mi;
if (max_batch_size_ == 0) { max_batch_size_ = DEFAULT_MAX_BATCH_SIZE_MB * Mi; }

Expand All @@ -36,9 +38,18 @@ HSHomeObject::PGBlobIterator::PGBlobIterator(HSHomeObject& home_obj, homestore::

//result represents if the objId is valid and the cursors are updated
bool HSHomeObject::PGBlobIterator::update_cursor(objId id) {
if (cur_batch_start_time_ != Clock::time_point{}) {
HISTOGRAM_OBSERVE(*metrics_, snp_dnr_batch_e2e_time, get_elapsed_time_us(cur_batch_start_time_));
}
cur_batch_start_time_ = Clock::now();

if (id.value == LAST_OBJ_ID) { return true; }
//resend batch
if (id.value == cur_obj_id_.value) { return true; }
if (id.value == cur_obj_id_.value) {
LOGT("resend the same batch, objId:{}, cur_obj_id:{}", id.to_string(), cur_obj_id_.to_string());
COUNTER_INCREMENT(*metrics_, snp_dnr_resend_count, 1);
return true;
}

// If cur_obj_id_ == 0|0 (PG meta), this may be a request for resuming from specific shard
if (cur_obj_id_.shard_seq_num == 0 && id.shard_seq_num != 0 && id.batch_id == 0) {
Expand Down Expand Up @@ -122,12 +133,15 @@ bool HSHomeObject::PGBlobIterator::create_pg_snapshot_data(sisl::io_blob_safe& m
members.push_back(CreateMemberDirect(builder_, &id, member.name.c_str(), member.priority));
}
std::vector< uint64_t > shard_ids;
auto total_bytes = pg->durable_entities().total_occupied_blk_count.load(std::memory_order_relaxed) * repl_dev_->
get_blk_size();
auto total_blobs = pg->durable_entities().active_blob_count.load(std::memory_order_relaxed);
for (auto& shard : shard_list_) {
shard_ids.push_back(shard.info.id);
}

auto pg_entry = CreateResyncPGMetaDataDirect(builder_, pg_info.id, &uuid, pg_info.size, pg_info.chunk_size, pg->durable_entities().blob_sequence_num,
pg->shard_sequence_num_, &members, &shard_ids);
pg->shard_sequence_num_, &members, &shard_ids, total_blobs, total_bytes);
builder_.FinishSizePrefixed(pg_entry);

pack_resync_message(meta_blob, SyncMessageType::PG_META);
Expand Down Expand Up @@ -183,15 +197,15 @@ BlobManager::AsyncResult< sisl::io_blob_safe > HSHomeObject::PGBlobIterator::loa

BlobHeader const* header = r_cast< BlobHeader const* >(read_buf.cbytes());
if (!header->valid()) {
//TODO add metrics
//The metrics for corrupted blob is handled on the follower side.
LOGE("Invalid header found, shard_id={}, blob_id={}, [header={}]", shard_id, blob_id,
header->to_string());
state = ResyncBlobState::CORRUPTED;
return std::move(read_buf);
}

if (header->shard_id != shard_id) {
//TODO add metrics
//The metrics for corrupted blob is handled on the follower side.
LOGE("Invalid shard_id in header, shard_id={}, blob_id={}, [header={}]", shard_id,
blob_id, header->to_string());
state = ResyncBlobState::CORRUPTED;
Expand Down Expand Up @@ -223,6 +237,7 @@ BlobManager::AsyncResult< sisl::io_blob_safe > HSHomeObject::PGBlobIterator::loa
}

bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe& data_blob) {
auto batch_start = Clock::now();
std::vector< ::flatbuffers::Offset< ResyncBlobData > > blob_entries;

bool end_of_shard = false;
Expand All @@ -240,6 +255,8 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
continue;
}

auto blob_start = Clock::now();

#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("pg_blob_iterator_load_blob_data_error")) {
LOGW("Simulating loading blob data error");
Expand Down Expand Up @@ -269,10 +286,11 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
if (blob.size() == 0) {
LOGE("Failed to retrieve blob for shard={} blob={} pbas={}", info.shard_id, info.blob_id,
info.pbas.to_string());
//TODO add metrics
COUNTER_INCREMENT(*metrics_, snp_dnr_error_count, 1);
return false;
}

HISTOGRAM_OBSERVE(*metrics_, snp_dnr_blob_process_time, get_elapsed_time_us(blob_start));
std::vector< uint8_t > data(blob.cbytes(), blob.cbytes() + blob.size());
blob_entries.push_back(CreateResyncBlobDataDirect(builder_, info.blob_id, (uint8_t)state, &data));
total_bytes += blob.size();
Expand All @@ -286,7 +304,10 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
LOGD("create blobs snapshot data batch: shard_seq_num={}, batch_num={}, total_bytes={}, blob_num={}, end_of_shard={}",
cur_obj_id_.shard_seq_num, cur_obj_id_.batch_id, total_bytes, blob_entries.size(), end_of_shard);

COUNTER_INCREMENT(*metrics_, snp_dnr_load_blob, blob_entries.size());
COUNTER_INCREMENT(*metrics_, snp_dnr_load_bytes, total_bytes);
pack_resync_message(data_blob, SyncMessageType::SHARD_BATCH);
HISTOGRAM_OBSERVE(*metrics_, snp_dnr_batch_process_time, get_elapsed_time_us(batch_start));
return true;
}

Expand Down
7 changes: 4 additions & 3 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ bool ReplicationStateMachine::apply_snapshot(std::shared_ptr< homestore::snapsho
std::this_thread::sleep_for(std::chrono::milliseconds(delay.get()));
}
#endif
m_snp_rcv_handler->destroy_context();
m_snp_rcv_handler->destroy_context_and_metrics();

std::lock_guard lk(m_snapshot_lock);
set_snapshot_context(context);
Expand Down Expand Up @@ -321,7 +321,7 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
auto r_dev = repl_dev();
if (!m_snp_rcv_handler) {
m_snp_rcv_handler = std::make_unique< HSHomeObject::SnapshotReceiveHandler >(*home_object_, r_dev);
if (m_snp_rcv_handler->load_prev_context()) {
if (m_snp_rcv_handler->load_prev_context_and_metrics()) {
LOGI("Reloaded previous snapshot context, lsn:{} pg_id:{} next_shard:{}", context->get_lsn(),
m_snp_rcv_handler->get_context_pg_id(), m_snp_rcv_handler->get_next_shard());
}
Expand All @@ -330,6 +330,7 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
auto obj_id = objId(snp_obj->offset);
auto log_suffix = fmt::format("group={} lsn={} shard={} batch_num={} size={}", uuids::to_string(r_dev->group_id()),
context->get_lsn(), obj_id.shard_seq_num, obj_id.batch_id, snp_obj->blob.size());
LOGI("received snapshot obj, {}", log_suffix);

if (snp_obj->is_last_obj) {
LOGD("Write snapshot reached is_last_obj true {}", log_suffix);
Expand Down Expand Up @@ -382,7 +383,7 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
home_object_->pg_destroy(pg_data->pg_id());
}
LOGI("reset context from lsn:{} to lsn:{}", m_snp_rcv_handler->get_context_lsn(), context->get_lsn());
m_snp_rcv_handler->reset_context(context->get_lsn(), pg_data->pg_id());
m_snp_rcv_handler->reset_context_and_metrics(context->get_lsn(), pg_data->pg_id());

auto ret = m_snp_rcv_handler->process_pg_snapshot_data(*pg_data);
if (ret) {
Expand Down
2 changes: 2 additions & 0 deletions src/lib/homestore_backend/resync_pg_data.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ table ResyncPGMetaData {
shard_seq_num: uint64; // shard sequence number, used to assign next shard id;
members : [Member]; // peers;
shard_ids : [uint64]; // shard ids to transmit;
total_blobs_to_transfer : uint64; // total number of blobs of the pg to transfer
total_bytes_to_transfer : uint64; // total number of bytes of the pg to transfer
}

// ResyncPGMetaData schema is the first message(ObjID=0) in the resync data stream
Expand Down
Loading