Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 37 additions & 17 deletions src/cart/crt_bulk.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Google LLC
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand All @@ -16,8 +16,8 @@
static inline bool
crt_sgl_valid(d_sg_list_t *sgl)
{
d_iov_t *iov;
int i;
d_iov_t *iov;
int i;

if (sgl == NULL || sgl->sg_nr == 0) {
if (sgl == NULL)
Expand Down Expand Up @@ -129,6 +129,7 @@ crt_bulk_create(crt_context_t crt_ctx, d_sg_list_t *sgl,
ret_hdl->hg_bulk_hdl = HG_BULK_NULL;
ret_hdl->crt_ctx = crt_ctx;
ret_hdl->deferred = true;

D_GOTO(out, rc = DER_SUCCESS);
}

Expand All @@ -137,6 +138,7 @@ crt_bulk_create(crt_context_t crt_ctx, d_sg_list_t *sgl,

rc = crt_hg_bulk_create(&ctx->cc_hg_ctx, sgl, bulk_perm, &ret_hdl->hg_bulk_hdl);
if (rc != 0) {
CRT_METRIC_INC(ctx, CM_BULK_CREATE_FAILED);
D_ERROR("crt_hg_bulk_create() failed, rc: " DF_RC "\n", DP_RC(rc));
if (ret_hdl->iovs != NULL)
D_FREE(ret_hdl->iovs);
Expand All @@ -145,6 +147,8 @@ crt_bulk_create(crt_context_t crt_ctx, d_sg_list_t *sgl,
D_GOTO(out, rc);
}

CRT_METRIC_INC(ctx, CM_BULK_CREATE);

out:
if (rc == 0 && bulk_hdl)
*bulk_hdl = ret_hdl;
Expand Down Expand Up @@ -175,6 +179,8 @@ crt_bulk_bind(crt_bulk_t crt_bulk, crt_context_t crt_ctx)
}

out:
if (rc == 0)
CRT_METRIC_INC(ctx, CM_BULK_BOUND);
return rc;
}

Expand Down Expand Up @@ -212,25 +218,39 @@ crt_bulk_free(crt_bulk_t crt_bulk)
D_GOTO(out, rc = -DER_INVAL);
}

/* This can happen if D_QUOTA_BULKS is enabled on a client */
if (bulk->hg_bulk_hdl == HG_BULK_NULL) {
if (bulk->deferred) {
/* Treat as success */
D_GOTO(out, rc = DER_SUCCESS);
} else {
D_ASSERTF(0, "Bulk handle should not be NULL\n");
if (atomic_fetch_sub(&bulk->refcount, 1) > 1)
return DER_SUCCESS;

crt_bulk_free_common(bulk);

return DER_SUCCESS;
}

void
crt_bulk_free_common(struct crt_bulk *bulk)
{
struct crt_context *ctx;
hg_return_t hg_ret;

D_ASSERT(bulk != NULL);

if (bulk->hg_bulk_hdl != HG_BULK_NULL) {
hg_ret = HG_Bulk_free(bulk->hg_bulk_hdl);
if (hg_ret != HG_SUCCESS) {
D_ERROR("HG_Bulk_free() failed (%s)\n", HG_Error_to_string(hg_ret));
/* Ignore the error, as we are already in a cleanup path */
}
}

hg_ret = HG_Bulk_free(bulk->hg_bulk_hdl);
if (hg_ret != HG_SUCCESS) {
D_ERROR("HG_Bulk_free failed, hg_ret: %d.\n", hg_ret);
rc = crt_hgret_2_der(hg_ret);
}
ctx = bulk->crt_ctx;

/* bulks that are decoded don't have a cart context associated with them */
if (ctx)
CRT_METRIC_INC(ctx, CM_BULK_FREE);

/* decoded bulks are not counted towards quota; such bulks have crt_ctx set to NULL */
if (bulk->crt_ctx)
put_quota_resource(bulk->crt_ctx, CRT_QUOTA_BULKS);
if (!bulk->deferred && ctx != NULL)
put_quota_resource(ctx, CRT_QUOTA_BULKS);
out:
if (bulk != NULL) {
if (bulk->iovs)
Expand Down
127 changes: 57 additions & 70 deletions src/cart/crt_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,29 @@ crt_context_ep_empty(crt_context_t crt_ctx)
return rc == 0;
}

/* helper function to add counters */
static void
crt_context_add_counters(struct crt_context *ctx)
{
char *prov;
int idx;
int rc = 0;

D_ASSERT(ctx != NULL);

prov = crt_provider_name_get(ctx->cc_hg_ctx.chc_provider);
idx = ctx->cc_idx;

#define X(name, type, desc, unit_desc) \
rc = d_tm_add_metric(&ctx->cc_metrics.name, type, desc, unit_desc, "net/%s/%s/ctx_%u", \
prov, #name, idx); \
if (rc != 0) \
DL_WARN(rc, "Failed to add metric " #name "\n");

CRT_METRICS_LIST;
#undef X
}

static int
crt_context_init(struct crt_context *ctx)
{
Expand Down Expand Up @@ -272,56 +295,7 @@ crt_context_provider_create(crt_context_t *crt_ctx, crt_provider_t provider, boo

/** initialize sensors for servers */
if (crt_gdata.cg_use_sensors && crt_is_service()) {
int ret;
char *prov;

prov = crt_provider_name_get(ctx->cc_hg_ctx.chc_provider);
ret = d_tm_add_metric(&ctx->cc_timedout, D_TM_COUNTER,
"Total number of timed out RPC requests",
"reqs", "net/%s/req_timeout/ctx_%u",
prov, ctx->cc_idx);
if (ret)
DL_WARN(ret, "Failed to create timed out req counter");

ret = d_tm_add_metric(&ctx->cc_timedout_uri, D_TM_COUNTER,
"Total number of timed out URI lookup "
"requests", "reqs",
"net/%s/uri_lookup_timeout/ctx_%u",
prov, ctx->cc_idx);
if (ret)
DL_WARN(ret, "Failed to create timed out uri req counter");

ret = d_tm_add_metric(&ctx->cc_failed_addr, D_TM_COUNTER,
"Total number of failed address "
"resolution attempts", "reqs",
"net/%s/failed_addr/ctx_%u",
prov, ctx->cc_idx);
if (ret)
DL_WARN(ret, "Failed to create failed addr counter");

ret = d_tm_add_metric(&ctx->cc_net_glitches, D_TM_COUNTER,
"Total number of network glitch errors", "errors",
"net/%s/glitch/ctx_%u", prov, ctx->cc_idx);
if (ret)
DL_WARN(ret, "Failed to create network glitch counter");

ret = d_tm_add_metric(&ctx->cc_swim_delay, D_TM_STATS_GAUGE,
"SWIM delay measurements", "delay",
"net/%s/swim_delay/ctx_%u", prov, ctx->cc_idx);
if (ret)
DL_WARN(ret, "Failed to create SWIM delay gauge");

ret = d_tm_add_metric(&ctx->cc_quotas.rpc_waitq_depth, D_TM_GAUGE,
"Current count of enqueued RPCs", "rpcs",
"net/%s/waitq_depth/ctx_%u", prov, ctx->cc_idx);
if (ret)
DL_WARN(ret, "Failed to create rpc waitq gauge");

ret = d_tm_add_metric(&ctx->cc_quotas.rpc_quota_exceeded, D_TM_COUNTER,
"Total number of exceeded RPC quota errors", "errors",
"net/%s/quota_exceeded/ctx_%u", prov, ctx->cc_idx);
if (ret)
DL_WARN(ret, "Failed to create quota exceeded counter");
crt_context_add_counters(ctx);
}

if (crt_is_service() && crt_gdata.cg_auto_swim_disable == 0 &&
Expand Down Expand Up @@ -501,14 +475,42 @@ crt_rpc_completed(struct crt_rpc_priv *rpc_priv)
void
crt_rpc_complete_and_unlock(struct crt_rpc_priv *rpc_priv, int rc)
{
struct crt_context *ctx;
struct crt_cb_info cbinfo;

D_ASSERT(rpc_priv != NULL);
ctx = rpc_priv->crp_pub.cr_ctx;

if (crt_rpc_completed(rpc_priv)) {
CRT_METRIC_INC(ctx, CM_RPC_DOUBLE_COMPLETE);
crt_rpc_unlock(rpc_priv);
RPC_ERROR(rpc_priv, "already completed, possibly due to duplicated completions.\n");
return;
}

cbinfo.cci_rpc = &rpc_priv->crp_pub;
cbinfo.cci_arg = rpc_priv->crp_arg;
cbinfo.cci_rc = rc;

if (cbinfo.cci_rc == 0)
cbinfo.cci_rc = rpc_priv->crp_reply_hdr.cch_rc;

if (cbinfo.cci_rc != 0)
RPC_CWARN(crt_quiet_error(cbinfo.cci_rc), DB_NET, rpc_priv, "failed, " DF_RC "\n",
DP_RC(cbinfo.cci_rc));

switch (cbinfo.cci_rc) {
case DER_SUCCESS:
CRT_METRIC_INC(ctx, CM_RPC_COMPLETED);
break;
case -DER_TIMEDOUT:
CRT_METRIC_INC(ctx, CM_RPC_TIMEDOUT);
break;
default:
CRT_METRIC_INC(ctx, CM_RPC_COMPLETED_ERR);
break;
}

if (rc == -DER_CANCELED)
rpc_priv->crp_state = RPC_STATE_CANCELED;
else if (rc == -DER_TIMEDOUT)
Expand All @@ -521,18 +523,6 @@ crt_rpc_complete_and_unlock(struct crt_rpc_priv *rpc_priv, int rc)
crt_rpc_unlock(rpc_priv);

if (rpc_priv->crp_complete_cb != NULL) {
struct crt_cb_info cbinfo;

cbinfo.cci_rpc = &rpc_priv->crp_pub;
cbinfo.cci_arg = rpc_priv->crp_arg;
cbinfo.cci_rc = rc;
if (cbinfo.cci_rc == 0)
cbinfo.cci_rc = rpc_priv->crp_reply_hdr.cch_rc;

if (cbinfo.cci_rc != 0)
RPC_CWARN(crt_quiet_error(cbinfo.cci_rc), DB_NET, rpc_priv,
"failed, " DF_RC "\n", DP_RC(cbinfo.cci_rc));

RPC_TRACE(DB_TRACE, rpc_priv,
"Invoking RPC callback (rank %d tag %d) rc: "
DF_RC "\n",
Expand Down Expand Up @@ -1147,9 +1137,6 @@ crt_req_timeout_hdlr(struct crt_rpc_priv *rpc_priv)
grp_priv = crt_grp_pub2priv(tgt_ep->ep_grp);
crt_ctx = rpc_priv->crp_pub.cr_ctx;

if (crt_gdata.cg_use_sensors)
d_tm_inc_counter(crt_ctx->cc_timedout, 1);

switch (rpc_priv->crp_state) {
case RPC_STATE_INITED:
case RPC_STATE_QUEUED:
Expand All @@ -1170,8 +1157,8 @@ crt_req_timeout_hdlr(struct crt_rpc_priv *rpc_priv)
container_of(ul_req, struct crt_rpc_priv, crp_pub), ul_in->ul_grp_id,
ul_in->ul_rank, ul_req->cr_ep.ep_rank);

if (crt_gdata.cg_use_sensors)
d_tm_inc_counter(crt_ctx->cc_timedout_uri, 1);
CRT_METRIC_INC(crt_ctx, CM_URI_LOOKUP_TIMEDOUT);

crt_req_abort(ul_req);
/*
* don't crt_rpc_complete_and_unlock rpc_priv here, because crt_req_abort
Expand Down Expand Up @@ -1410,7 +1397,7 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv)

if (quota_rc == -DER_QUOTA_LIMIT) {
d_list_add_tail(&rpc_priv->crp_waitq_link, &crt_ctx->cc_quotas.rpc_waitq);
d_tm_inc_gauge(crt_ctx->cc_quotas.rpc_waitq_depth, 1);
CRT_METRIC_INC_GAUGE(crt_ctx, CM_RPC_WAITQ_DEPTH, 1);
}

D_MUTEX_UNLOCK(&crt_ctx->cc_mutex);
Expand Down Expand Up @@ -1589,7 +1576,7 @@ crt_context_req_untrack(struct crt_rpc_priv *rpc_priv)
D_INIT_LIST_HEAD(&submit_list);
if (tmp_rpc != NULL) {
add_rpc_to_list(tmp_rpc, &submit_list);
d_tm_dec_gauge(crt_ctx->cc_quotas.rpc_waitq_depth, 1);
CRT_METRIC_DEC_GAUGE(crt_ctx, CM_RPC_WAITQ_DEPTH, 1);
} else {
put_quota_resource(rpc_priv->crp_pub.cr_ctx, CRT_QUOTA_RPCS);
}
Expand Down Expand Up @@ -2155,7 +2142,7 @@ get_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota)
} else {
D_DEBUG(DB_TRACE, "Quota limit (%d) reached for quota_type=%d\n",
ctx->cc_quotas.limit[quota], quota);
d_tm_inc_counter(ctx->cc_quotas.rpc_quota_exceeded, 1);
CRT_METRIC_INC(ctx, CM_RPC_QUOTA_EXCEEDED);
rc = -DER_QUOTA_LIMIT;
}

Expand Down
18 changes: 17 additions & 1 deletion src/cart/crt_corpc.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -348,12 +348,16 @@ crt_corpc_req_create(crt_context_t crt_ctx, crt_group_t *grp,
bool filter_invert;
d_rank_t grp_root, pri_root;
uint32_t grp_ver;
struct crt_context *ctx;
int rc = 0;

if (crt_ctx == CRT_CONTEXT_NULL || req == NULL) {
D_ERROR("invalid parameter (NULL crt_ctx or req).\n");
D_GOTO(out, rc = -DER_INVAL);
}

ctx = crt_ctx;

if (!crt_initialized()) {
D_ERROR("CaRT not initialized yet.\n");
D_GOTO(out, rc = -DER_UNINIT);
Expand Down Expand Up @@ -438,6 +442,8 @@ crt_corpc_req_create(crt_context_t crt_ctx, crt_group_t *grp,
D_GOTO(out, rc);
}

CRT_METRIC_INC(ctx, CM_CORPC_CREATED);

*req = &rpc_priv->crp_pub;
out:
if (rc < 0)
Expand Down Expand Up @@ -540,7 +546,17 @@ crt_corpc_complete(struct crt_rpc_priv *rpc_priv)
myrank = co_info->co_grp_priv->gp_self;
am_root = (myrank == co_info->co_root);
if (am_root) {
struct crt_context *ctx;

crt_rpc_lock(rpc_priv);

ctx = rpc_priv->crp_pub.cr_ctx;

if (co_info->co_rc == 0)
CRT_METRIC_INC(ctx, CM_CORPC_COMPLETED);
else
CRT_METRIC_INC(ctx, CM_CORPC_COMPLETED_ERR);

crt_rpc_complete_and_unlock(rpc_priv, co_info->co_rc);
} else {
if (co_info->co_rc != 0)
Expand Down
Loading
Loading