diff --git a/conanfile.py b/conanfile.py index 092550c90..0ec8f173d 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomeObjectConan(ConanFile): name = "homeobject" - version = "2.2.12" + version = "2.2.13" homepage = "https://github.com/eBay/HomeObject" description = "Blob Store built on HomeReplication" diff --git a/src/include/homeobject/blob_manager.hpp b/src/include/homeobject/blob_manager.hpp index a9bb40e68..153018cfc 100644 --- a/src/include/homeobject/blob_manager.hpp +++ b/src/include/homeobject/blob_manager.hpp @@ -10,7 +10,8 @@ namespace homeobject { ENUM(BlobErrorCode, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, UNSUPPORTED_OP, NOT_LEADER, REPLICATION_ERROR, - UNKNOWN_SHARD, UNKNOWN_BLOB, CHECKSUM_MISMATCH, READ_FAILED, INDEX_ERROR, SEALED_SHARD, RETRY_REQUEST); + UNKNOWN_SHARD, UNKNOWN_BLOB, UNKNOWN_PG, CHECKSUM_MISMATCH, READ_FAILED, INDEX_ERROR, SEALED_SHARD, RETRY_REQUEST, + SHUTTING_DOWN); struct BlobError { BlobErrorCode code; // set when we are not the current leader of the PG. 
@@ -27,7 +28,8 @@ struct BlobError { struct Blob { Blob() = default; Blob(sisl::io_blob_safe b, std::string const& u, uint64_t o) : body(std::move(b)), user_key(u), object_off(o) {} - Blob(sisl::io_blob_safe b, std::string const& u, uint64_t o, peer_id_t l) : body(std::move(b)), user_key(u), object_off(o), current_leader(l) {} + Blob(sisl::io_blob_safe b, std::string const& u, uint64_t o, peer_id_t l) : + body(std::move(b)), user_key(u), object_off(o), current_leader(l) {} Blob clone() const; diff --git a/src/include/homeobject/pg_manager.hpp b/src/include/homeobject/pg_manager.hpp index 4819052ae..9b0ccd987 100644 --- a/src/include/homeobject/pg_manager.hpp +++ b/src/include/homeobject/pg_manager.hpp @@ -12,7 +12,7 @@ namespace homeobject { ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, NOT_LEADER, UNKNOWN_PEER, UNSUPPORTED_OP, - CRC_MISMATCH, NO_SPACE_LEFT, DRIVE_WRITE_ERROR, RETRY_REQUEST); + CRC_MISMATCH, NO_SPACE_LEFT, DRIVE_WRITE_ERROR, RETRY_REQUEST, SHUTTING_DOWN); struct PGMember { // Max length is based on homestore::replica_member_info::max_name_len - 1. Last byte is null terminated. 
diff --git a/src/include/homeobject/shard_manager.hpp b/src/include/homeobject/shard_manager.hpp index b94ff55a0..8894f7ec5 100644 --- a/src/include/homeobject/shard_manager.hpp +++ b/src/include/homeobject/shard_manager.hpp @@ -10,7 +10,7 @@ namespace homeobject { ENUM(ShardError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, NOT_LEADER, UNSUPPORTED_OP, UNKNOWN_PG, UNKNOWN_SHARD, - PG_NOT_READY, CRC_MISMATCH, NO_SPACE_LEFT, RETRY_REQUEST); + PG_NOT_READY, CRC_MISMATCH, NO_SPACE_LEFT, RETRY_REQUEST, SHUTTING_DOWN); struct ShardInfo { enum class State : uint8_t { @@ -22,7 +22,7 @@ struct ShardInfo { shard_id_t id; pg_id_t placement_group; State state; - uint64_t lsn; // created_lsn + uint64_t lsn; // created_lsn uint64_t created_time; uint64_t last_modified_time; uint64_t available_capacity_bytes; diff --git a/src/lib/homestore_backend/hs_blob_manager.cpp b/src/lib/homestore_backend/hs_blob_manager.cpp index 81d1ee599..582889e6f 100644 --- a/src/lib/homestore_backend/hs_blob_manager.cpp +++ b/src/lib/homestore_backend/hs_blob_manager.cpp @@ -82,6 +82,12 @@ struct put_blob_req_ctx : public repl_result_ctx< BlobManager::Result< HSHomeObj }; BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& shard, Blob&& blob) { + if (is_shutting_down()) { + LOGI("service is being shut down"); + return folly::makeUnexpected(BlobErrorCode::SHUTTING_DOWN); + } + incr_pending_request_num(); + auto& pg_id = shard.placement_group; shared< homestore::ReplDev > repl_dev; blob_id_t new_blob_id; @@ -101,11 +107,13 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s if (!repl_dev->is_leader()) { LOGW("failed to put blob for pg [{}], shard [{}], not leader", pg_id, shard.id); + decr_pending_request_num(); return folly::makeUnexpected(BlobError(BlobErrorCode::NOT_LEADER, repl_dev->get_leader_id())); } if (!repl_dev->is_ready_for_traffic()) { LOGW("failed to put blob for pg [{}], shard [{}], not ready for traffic", pg_id, shard.id); + 
decr_pending_request_num(); return folly::makeUnexpected(BlobError(BlobErrorCode::RETRY_REQUEST)); } @@ -176,10 +184,12 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s if (result.hasError()) { auto err = result.error(); if (err.getCode() == BlobErrorCode::NOT_LEADER) { err.current_leader = repl_dev->get_leader_id(); } + decr_pending_request_num(); return folly::makeUnexpected(err); } auto blob_info = result.value(); BLOGT(blob_info.shard_id, blob_info.blob_id, "Put blob success blkid=[{}]", blob_info.pbas.to_string()); + decr_pending_request_num(); return blob_info.blob_id; }); } @@ -258,6 +268,12 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, blob_id_t blob_id, uint64_t req_offset, uint64_t req_len) const { + if (is_shutting_down()) { + LOGI("service is being shut down"); + return folly::makeUnexpected(BlobErrorCode::SHUTTING_DOWN); + } + incr_pending_request_num(); + auto& pg_id = shard.placement_group; auto hs_pg = get_hs_pg(pg_id); RELEASE_ASSERT(hs_pg, "PG not found"); @@ -269,12 +285,14 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, if (!repl_dev->is_ready_for_traffic()) { LOGW("failed to get blob for pg [{}], shard [{}], not ready for traffic", pg_id, shard.id); + decr_pending_request_num(); return folly::makeUnexpected(BlobError(BlobErrorCode::RETRY_REQUEST)); } auto r = get_blob_from_index_table(index_table, shard.id, blob_id); if (!r) { BLOGE(shard.id, blob_id, "Blob not found in index during get blob"); + decr_pending_request_num(); return folly::makeUnexpected(r.error()); } @@ -298,17 +316,20 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(const shared< home read_buf = std::move(read_buf)](auto&& result) mutable -> BlobManager::AsyncResult< Blob > { if (result) { BLOGE(shard_id, blob_id, "Failed to get blob: err={}", blob_id, shard_id, 
result.value()); + decr_pending_request_num(); return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED)); } BlobHeader const* header = r_cast< BlobHeader const* >(read_buf.cbytes()); if (!header->valid()) { BLOGE(shard_id, blob_id, "Invalid header found: [header={}]", header->to_string()); + decr_pending_request_num(); return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED)); } if (header->shard_id != shard_id) { BLOGE(shard_id, blob_id, "Invalid shard_id in header: [header={}]", header->to_string()); + decr_pending_request_num(); return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED)); } @@ -325,12 +346,14 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(const shared< home if (std::memcmp(computed_hash, header->hash, BlobHeader::blob_max_hash_len) != 0) { BLOGE(shard_id, blob_id, "Hash mismatch header [{}] [computed={:np}]", header->to_string(), spdlog::to_hex(computed_hash, computed_hash + BlobHeader::blob_max_hash_len)); + decr_pending_request_num(); return folly::makeUnexpected(BlobError(BlobErrorCode::CHECKSUM_MISMATCH)); } if (req_offset + req_len > header->blob_size) { BLOGE(shard_id, blob_id, "Invalid offset length requested in get blob offset={} len={} size={}", req_offset, req_len, header->blob_size); + decr_pending_request_num(); return folly::makeUnexpected(BlobError(BlobErrorCode::INVALID_ARG)); } @@ -341,6 +364,7 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(const shared< home std::memcpy(body.bytes(), blob_bytes + req_offset, res_len); BLOGT(blob_id, shard_id, "Blob get success: blkid={}", blkid.to_string()); + decr_pending_request_num(); return Blob(std::move(body), std::move(user_key), header->object_offset, repl_dev->get_leader_id()); }); } @@ -362,6 +386,7 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< auto hs_pg = get_hs_pg(msg_header->pg_id); if (hs_pg == nullptr) { LOGW("Received a blob_put on an unknown pg:{}, underlying engine will retry 
this later", msg_header->pg_id); + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::UNKNOWN_PG))); } return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET); } @@ -370,6 +395,7 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< if (shard_iter == _shard_map.end()) { LOGW("Received a blob_put on an unknown shard:{}, underlying engine will retry this later", msg_header->shard_id); + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::UNKNOWN_SHARD))); } return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET); } @@ -392,6 +418,12 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< } BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blob_id_t blob_id) { + if (is_shutting_down()) { + LOGI("service is being shut down"); + return folly::makeUnexpected(BlobErrorCode::SHUTTING_DOWN); + } + incr_pending_request_num(); + BLOGT(shard.id, blob_id, "deleting blob"); auto& pg_id = shard.placement_group; auto hs_pg = get_hs_pg(pg_id); @@ -402,11 +434,13 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo if (!repl_dev->is_leader()) { LOGW("failed to del blob for pg [{}], shard [{}], blob_id [{}], not leader", pg_id, shard.id, blob_id); + decr_pending_request_num(); return folly::makeUnexpected(BlobError(BlobErrorCode::NOT_LEADER, repl_dev->get_leader_id())); } if (!repl_dev->is_ready_for_traffic()) { LOGW("failed to del blob for pg [{}], shard [{}], not ready for traffic", pg_id, shard.id); + decr_pending_request_num(); return folly::makeUnexpected(BlobError(BlobErrorCode::RETRY_REQUEST)); } @@ -424,14 +458,16 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo std::memcpy(req->key_buf().bytes(), &blob_id, sizeof(blob_id_t)); repl_dev->async_alloc_write(req->cheader_buf(), req->ckey_buf(), sisl::sg_list{}, req); - return 
req->result().deferValue([repl_dev](const auto& result) -> folly::Expected< folly::Unit, BlobError > { + return req->result().deferValue([this, repl_dev](const auto& result) -> folly::Expected< folly::Unit, BlobError > { if (result.hasError()) { auto err = result.error(); if (err.getCode() == BlobErrorCode::NOT_LEADER) { err.current_leader = repl_dev->get_leader_id(); } + decr_pending_request_num(); return folly::makeUnexpected(err); } auto blob_info = result.value(); BLOGT(blob_info.shard_id, blob_info.blob_id, "Delete blob successful"); + decr_pending_request_num(); return folly::Unit(); }); } @@ -470,7 +506,7 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis if (ctx) ctx->promise_.setValue(folly::makeUnexpected(r.error())); return; } else { - if (ctx) { ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info)); } + if (ctx) ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info)); return; } } diff --git a/src/lib/homestore_backend/hs_homeobject.cpp b/src/lib/homestore_backend/hs_homeobject.cpp index 36e710aae..2b5664e16 100644 --- a/src/lib/homestore_backend/hs_homeobject.cpp +++ b/src/lib/homestore_backend/hs_homeobject.cpp @@ -60,9 +60,7 @@ class HSReplApplication : public homestore::ReplApplication { return it->second; } - void on_repl_devs_init_completed() override { - _home_object->on_replica_restart(); - } + void on_repl_devs_init_completed() override { _home_object->on_replica_restart(); } std::pair< std::string, uint16_t > lookup_peer(homestore::replica_id_t uuid) const override { std::string endpoint; @@ -320,9 +318,22 @@ HSHomeObject::~HSHomeObject() { } trigger_timed_events(); #endif + + start_shutting_down(); + // Wait for all pending requests to complete + while (true) { + auto pending_reqs = get_pending_request_num(); + if (0 == pending_reqs) break; + LOGI("waiting for {} pending requests to complete", pending_reqs); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + 
LOGI("start shutting down HomeStore"); + homestore::HomeStore::instance()->shutdown(); homestore::HomeStore::reset_instance(); iomanager.stop(); + + LOGI("complete shutting down HomeStore"); } HomeObjectStats HSHomeObject::_get_stats() const { @@ -340,7 +351,7 @@ HomeObjectStats HSHomeObject::_get_stats() const { stats.num_open_shards = num_open_shards; stats.avail_open_shards = chunk_selector()->total_chunks() - num_open_shards; - stats.num_disks = chunk_selector()->total_disks(); + stats.num_disks = chunk_selector()->total_disks(); return stats; } diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index 952eb2ada..101cfc011 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -393,7 +393,8 @@ class HSHomeObject : public HomeObjectImpl { bool update_cursor(objId id); objId expected_next_obj_id(); bool generate_shard_blob_list(); - BlobManager::AsyncResult< sisl::io_blob_safe > load_blob_data(const BlobInfo& blob_info, ResyncBlobState& state); + BlobManager::AsyncResult< sisl::io_blob_safe > load_blob_data(const BlobInfo& blob_info, + ResyncBlobState& state); bool create_pg_snapshot_data(sisl::io_blob_safe& meta_blob); bool create_shard_snapshot_data(sisl::io_blob_safe& meta_blob); bool create_blobs_snapshot_data(sisl::io_blob_safe& data_blob); @@ -409,7 +410,7 @@ class HSHomeObject : public HomeObjectImpl { objId cur_obj_id_{0, 0}; int64_t cur_shard_idx_{-1}; - std::vector cur_blob_list_{0}; + std::vector< BlobInfo > cur_blob_list_{0}; uint64_t cur_start_blob_idx_{0}; uint64_t cur_batch_blob_count_{0}; flatbuffers::FlatBufferBuilder builder_; @@ -677,7 +678,7 @@ class HSHomeObject : public HomeObjectImpl { const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& hs_ctx); void on_blob_del_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, cintrusive< homestore::repl_req_ctx >& hs_ctx); - bool local_add_blob_info(pg_id_t 
pg_id, BlobInfo const &blob_info); + bool local_add_blob_info(pg_id_t pg_id, BlobInfo const& blob_info); homestore::ReplResult< homestore::blk_alloc_hints > blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< homestore::repl_req_ctx >& ctx); void compute_blob_payload_hash(BlobHeader::HashAlgorithm algorithm, const uint8_t* blob_bytes, size_t blob_size, @@ -746,13 +747,28 @@ class HSHomeObject : public HomeObjectImpl { */ void destroy_pg_index_table(pg_id_t pg_id); -/** + /** * @brief Destroy the superblock for the PG identified by pg_id. * Ensures all operations are persisted by triggering a cp flush before destruction. * * @param pg_id The ID of the PG to be destroyed. */ + void destroy_pg_superblk(pg_id_t pg_id); + + // graceful shutdown related +private: + std::atomic_bool shutting_down{false}; + mutable std::atomic_uint64_t pending_request_num{0}; + + bool is_shutting_down() const { return shutting_down.load(); } + void start_shutting_down() { shutting_down = true; } + + uint64_t get_pending_request_num() const { return pending_request_num.load(); } + + // only leader will call incr and decr pending request num + void incr_pending_request_num() const { pending_request_num++; } + void decr_pending_request_num() const { pending_request_num--; } }; class BlobIndexServiceCallbacks : public homestore::IndexServiceCallbacks { diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index 6f52686bf..3fa97a3e4 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -61,18 +61,30 @@ PGError toPgError(ReplServiceError const& e) { } PGManager::NullAsyncResult HSHomeObject::_create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) { + if (is_shutting_down()) { + LOGI("service is being shut down"); + return folly::makeUnexpected(PGError::SHUTTING_DOWN); + } + incr_pending_request_num(); + auto pg_id = pg_info.id; - if (pg_exists(pg_id)) return 
folly::Unit(); + if (pg_exists(pg_id)) { + LOGW("pg already exists! pg_id {}", pg_id); + decr_pending_request_num(); + return folly::Unit(); + } const auto chunk_size = chunk_selector()->get_chunk_size(); if (pg_info.size < chunk_size) { LOGW("Not support to create PG which pg_size {} < chunk_size {}", pg_info.size, chunk_size); + decr_pending_request_num(); return folly::makeUnexpected(PGError::INVALID_ARG); } auto const num_chunk = chunk_selector()->select_chunks_for_pg(pg_id, pg_info.size); if (!num_chunk.has_value()) { LOGW("Failed to select chunks for pg {}", pg_id); + decr_pending_request_num(); return folly::makeUnexpected(PGError::NO_SPACE_LEFT); } @@ -106,15 +118,16 @@ PGManager::NullAsyncResult HSHomeObject::_create_pg(PGInfo&& pg_info, std::set< // if don't have repl dev, it will return ReplServiceError::SERVER_NOT_FOUND return hs_repl_service() .remove_repl_dev(repl_dev_group_id) - .deferValue([r, repl_dev_group_id](auto&& e) -> PGManager::NullAsyncResult { + .deferValue([r, repl_dev_group_id, this](auto&& e) -> PGManager::NullAsyncResult { if (e != ReplServiceError::OK) { LOGW("Failed to remove repl device which group_id {}, error: {}", repl_dev_group_id, e); } - + decr_pending_request_num(); // still return the original error return folly::makeUnexpected(r.error()); }); } + decr_pending_request_num(); return folly::Unit(); }); } @@ -232,11 +245,22 @@ void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& he PGManager::NullAsyncResult HSHomeObject::_replace_member(pg_id_t pg_id, peer_id_t const& old_member_id, PGMember const& new_member, uint32_t commit_quorum) { + if (is_shutting_down()) { + LOGI("service is being shut down"); + return folly::makeUnexpected(PGError::SHUTTING_DOWN); + } + incr_pending_request_num(); + auto hs_pg = get_hs_pg(pg_id); - if (hs_pg == nullptr) return folly::makeUnexpected(PGError::UNKNOWN_PG); + if (hs_pg == nullptr) { + decr_pending_request_num(); + return folly::makeUnexpected(PGError::UNKNOWN_PG); 
+ } + auto& repl_dev = pg_repl_dev(*hs_pg); if (!repl_dev.is_leader() && commit_quorum == 0) { // Only leader can replace a member + decr_pending_request_num(); return folly::makeUnexpected(PGError::NOT_LEADER); } auto group_id = repl_dev.group_id(); @@ -255,6 +279,7 @@ PGManager::NullAsyncResult HSHomeObject::_replace_member(pg_id_t pg_id, peer_id_ .replace_member(group_id, out_replica, in_replica, commit_quorum) .via(executor_) .thenValue([this](auto&& v) mutable -> PGManager::NullAsyncResult { + decr_pending_request_num(); if (v.hasError()) { return folly::makeUnexpected(toPgError(v.error())); } return folly::Unit(); }); @@ -350,10 +375,10 @@ void HSHomeObject::destroy_hs_resources(pg_id_t pg_id) { LOGW("get repl dev for group_id={} has failed", boost::uuids::to_string(group_id)); return; } - v.value()->purge(); + v.value()->purge(); - // Step 2: reset pg chunks - chunk_selector_->reset_pg_chunks(pg_id); + // Step 2: reset pg chunks + chunk_selector_->reset_pg_chunks(pg_id); } void HSHomeObject::destroy_pg_index_table(pg_id_t pg_id) { diff --git a/src/lib/homestore_backend/hs_shard_manager.cpp b/src/lib/homestore_backend/hs_shard_manager.cpp index 8e3448a8a..e3b6e308c 100644 --- a/src/lib/homestore_backend/hs_shard_manager.cpp +++ b/src/lib/homestore_backend/hs_shard_manager.cpp @@ -44,7 +44,6 @@ ShardError toShardError(ReplServiceError const& e) { } } - uint64_t ShardManager::max_shard_size() { return Gi; } uint64_t ShardManager::max_shard_num_in_pg() { return ((uint64_t)0x01) << shard_width; } @@ -94,25 +93,36 @@ ShardInfo HSHomeObject::deserialize_shard_info(const char* json_str, size_t str_ } ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_owner, uint64_t size_bytes) { + + if (is_shutting_down()) { + LOGI("service is being shut down"); + return folly::makeUnexpected(ShardError::SHUTTING_DOWN); + } + incr_pending_request_num(); + auto hs_pg = get_hs_pg(pg_owner); if (!hs_pg) { LOGW("failed to create shard with non-exist pg [{}]", 
pg_owner); + decr_pending_request_num(); return folly::makeUnexpected(ShardError::UNKNOWN_PG); } auto repl_dev = hs_pg->repl_dev_; if (!repl_dev) { LOGW("failed to get repl dev instance for pg [{}]", pg_owner); + decr_pending_request_num(); return folly::makeUnexpected(ShardError::PG_NOT_READY); } if (!repl_dev->is_leader()) { LOGW("failed to create shard for pg [{}], not leader", pg_owner); + decr_pending_request_num(); return folly::makeUnexpected(ShardError::NOT_LEADER); } if (!repl_dev->is_ready_for_traffic()) { LOGW("failed to create shard for pg [{}], not ready for traffic", pg_owner); + decr_pending_request_num(); return folly::makeUnexpected(ShardError::RETRY_REQUEST); } @@ -123,6 +133,7 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_ow const auto v_chunkID = chunk_selector()->get_most_available_blk_chunk(pg_owner); if (!v_chunkID.has_value()) { LOGW("no availble chunk left to create shard for pg [{}]", pg_owner); + decr_pending_request_num(); return folly::makeUnexpected(ShardError::NO_SPACE_LEFT); } @@ -168,6 +179,12 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_ow } ShardManager::AsyncResult< ShardInfo > HSHomeObject::_seal_shard(ShardInfo const& info) { + if (is_shutting_down()) { + LOGI("service is being shut down"); + return folly::makeUnexpected(ShardError::SHUTTING_DOWN); + } + incr_pending_request_num(); + auto pg_id = info.placement_group; auto shard_id = info.id; @@ -178,11 +195,13 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_seal_shard(ShardInfo const if (!repl_dev->is_leader()) { LOGW("failed to seal shard for shard [{}], not leader", shard_id); + decr_pending_request_num(); return folly::makeUnexpected(ShardError::NOT_LEADER); } if (!repl_dev->is_ready_for_traffic()) { LOGW("failed to seal shard for shard [{}], not ready for traffic", shard_id); + decr_pending_request_num(); return folly::makeUnexpected(ShardError::RETRY_REQUEST); } @@ -232,7 +251,12 @@ bool 
HSHomeObject::on_shard_message_pre_commit(int64_t lsn, sisl::blob const& he const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes()); if (msg_header->corrupted()) { LOGW("replication message header is corrupted with crc error, lsn:{}", lsn); - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); } + if (ctx) { + ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); + decr_pending_request_num(); + } + // TODO: if pre_commit fails, should we crash here? + return false; } switch (msg_header->msg_type) { @@ -271,14 +295,22 @@ void HSHomeObject::on_shard_message_rollback(int64_t lsn, sisl::blob const& head const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes()); if (msg_header->corrupted()) { LOGW("replication message header is corrupted with crc error, lsn:{}", lsn); - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); } + if (ctx) { + ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); + decr_pending_request_num(); + } + return; } switch (msg_header->msg_type) { case ReplicationMessageType::CREATE_SHARD_MSG: { bool res = release_chunk_based_on_create_shard_message(header); - if (!res) { LOGW("failed to release chunk based on create shard msg"); } + if (!res) { + // FIXME: should we crash here? 
+ LOGW("failed to release chunk based on create shard msg"); + } + // TODO: set a proper error code + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::RETRY_REQUEST)); } break; } case ReplicationMessageType::SEAL_SHARD_MSG: { @@ -298,12 +330,16 @@ void HSHomeObject::on_shard_message_rollback(int64_t lsn, sisl::blob const& head shard_info.id); } } + // TODO: set a proper error code + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::RETRY_REQUEST)); } + break; } default: { break; } } + if (ctx) decr_pending_request_num(); } void HSHomeObject::local_create_shard(ShardInfo shard_info, homestore::chunk_num_t v_chunk_id, @@ -321,7 +357,9 @@ void HSHomeObject::local_create_shard(ShardInfo shard_info, homestore::chunk_num auto pg_id = shard_info.placement_group; auto chunk = chunk_selector_->select_specific_chunk(pg_id, v_chunk_id); RELEASE_ASSERT(chunk != nullptr, "chunk selection failed with v_chunk_id: {} in PG: {}", v_chunk_id, pg_id); - } else { LOGD("shard {} already exist, skip creating shard", shard_info.id); } + } else { + LOGD("shard {} already exist, skip creating shard", shard_info.id); + } // update pg's total_occupied_blk_count auto hs_pg = get_hs_pg(shard_info.placement_group); @@ -341,7 +379,11 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom auto header = r_cast< const ReplicationMessageHeader* >(h.cbytes()); if (header->corrupted()) { LOGW("replication message header is corrupted with crc error, lsn:{}", lsn); - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); } + if (ctx) { + ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); + decr_pending_request_num(); + } + // TODO: if commit fails, should we crash here? 
return; } @@ -355,14 +397,20 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom std::error_code err = repl_dev->async_read(blkids, value_sgs, value_blob.size()).get(); if (err) { LOGW("failed to read data from homestore blks, lsn:{}", lsn); - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::UNKNOWN)); } + if (ctx) { + ctx->promise_.setValue(folly::makeUnexpected(ShardError::UNKNOWN)); + decr_pending_request_num(); + } return; } if (crc32_ieee(init_crc32, value.cbytes(), value.size()) != header->payload_crc) { // header & value is inconsistent; LOGW("replication message header is inconsistent with value, lsn:{}", lsn); - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); } + if (ctx) { + ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); + decr_pending_request_num(); + } return; } #endif @@ -405,11 +453,16 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom LOGW("try to commit SEAL_SHARD_MSG but shard state is not sealed, shard_id: {}", shard_info.id); if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); } LOGI("Commit done for SEAL_SHARD_MSG for shard {}", shard_info.id); + break; } default: break; } + + // Only on the leader, and only when this is not a log replay, can we be sure this is an online client request, so + // we should decrement the pending request count here. + if (ctx) decr_pending_request_num(); } void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf) {