Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/cmirdmautils.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@

// } NcpyOperationInfo;

#if CMK_CUDA
#if CMK_CUDA || CMK_HIP
enum DeviceRecvType {
DEVICE_RECV_TYPE_CHARM,
DEVICE_RECV_TYPE_AMPI,
Expand Down
2 changes: 1 addition & 1 deletion src/comm_backend/comm_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void issueAm(int rank, const void *local_buf, size_t size, mr_t mr,
* @brief Issue a remote get operation. Thread-safe.
*/
void issueRget(int rank, const void *local_buf, size_t size, mr_t local_mr,
uintptr_t remote_disp, void *rmr, CompHandler localComp, void *user_context);
void* remote_buf, void *rmr, CompHandler localComp, void *user_context);
/**
* @brief Issue a remote put operation. Thread-safe.
*/
Expand Down
4 changes: 2 additions & 2 deletions src/comm_backend/comm_backend_internal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ void issueAm(int rank, const void *msg, size_t size, mr_t mr, CompHandler localC
}

void issueRget(int rank, const void *local_buf, size_t size, mr_t local_mr,
uintptr_t remote_disp, void *rmr, CompHandler localComp, void *user_context) {
void* remote_buf, void *rmr, CompHandler localComp, void *user_context) {
if (gCommBackend == nullptr) {
return;
}
gCommBackend->issueRget(rank, local_buf, size, local_mr, remote_disp, rmr,
gCommBackend->issueRget(rank, local_buf, size, local_mr, remote_buf, rmr,
localComp, user_context);
}

Expand Down
2 changes: 1 addition & 1 deletion src/comm_backend/comm_backend_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class CommBackendBase {
virtual void issueAm(int rank, const void *local_buf, size_t size, mr_t mr,
CompHandler localComp, AmHandler remoteComp, void *user_context) = 0;
virtual void issueRget(int rank, const void *local_buf, size_t size,
mr_t local_mr, uintptr_t remote_disp, void *rmr,
mr_t local_mr, void *remote_buf, void *rmr,
CompHandler localComp, void *user_context) {
// Default implementation: not supported
CmiAbort("Rget not supported in this backend");
Expand Down
3 changes: 2 additions & 1 deletion src/comm_backend/lci2/comm_backend_lci2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,11 @@ void CommBackendLCI2::issueAm(int rank, const void *local_buf, size_t size, mr_t
}

void CommBackendLCI2::issueRget(int rank, const void *local_buf, size_t size,
mr_t local_mr, uintptr_t remote_disp, void *rmr,
mr_t local_mr, void* remote_buf, void *rmr,
CompHandler localComp, void *user_context) {
auto args = new localCallbackArgs{localComp, user_context};
lci::status_t status;
uintptr_t remote_disp = (uintptr_t)remote_buf - getThreadLocalRMR(rmr).base;
do {
status = lci::post_get_x(rank, const_cast<void *>(local_buf), size,
m_local_comp, remote_disp, getThreadLocalRMR(rmr))
Expand Down
2 changes: 1 addition & 1 deletion src/comm_backend/lci2/comm_backend_lci2.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class CommBackendLCI2 : public CommBackendBase {
void issueAm(int rank, const void *local_buf, size_t size, mr_t mr,
CompHandler localComp, AmHandler remoteComp, void *user_context) override;
void issueRget(int rank, const void *local_buf, size_t size, mr_t local_mr,
uintptr_t remote_disp, void *rmr,
void* remote_buf, void *rmr,
CompHandler localComp, void *user_context) override;
void issueRput(int rank, const void *local_buf, size_t size, mr_t local_mr,
uintptr_t remote_disp, void *rmr,
Expand Down
13 changes: 6 additions & 7 deletions src/conv-rdma.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ static void memcpyAnyPtr(void *dst, const void *src, size_t size) {
{
hipPointerAttribute_t srcAttr{}, dstAttr{};
bool srcIsDevice = (hipPointerGetAttributes(&srcAttr, src) == hipSuccess &&
srcAttr.memoryType == hipMemoryTypeDevice);
srcAttr.type == hipMemoryTypeDevice);
bool dstIsDevice = (hipPointerGetAttributes(&dstAttr, dst) == hipSuccess &&
dstAttr.memoryType == hipMemoryTypeDevice);
dstAttr.type == hipMemoryTypeDevice);
if (srcIsDevice || dstIsDevice) {
if (hipMemcpy(dst, src, size, hipMemcpyDefault) == hipSuccess)
return;
Expand Down Expand Up @@ -470,10 +470,10 @@ void CmiIssueRget(NcpyOperationInfo *ncpyOpInfo) {
} else if (!CmiUseCopyBasedRDMA) {
auto mr = *(comm_backend::mr_t *)ncpyOpInfo->destLayerInfo;
void *rmr = ncpyOpInfo->srcLayerInfo + sizeof(comm_backend::mr_t);
// FIXME: we assume the offset to the registered base address is 0 here
comm_backend::issueRget(CmiNodeOf(ncpyOpInfo->srcPe), ncpyOpInfo->destPtr,
ncpyOpInfo->srcSize, mr, 0, rmr,
ncpyOpInfo->srcSize, mr, (void*)ncpyOpInfo->srcPtr, rmr,
CommRgetLocalHandler, ncpyOpInfo);
// printf("reconverse rgets from src pe %d to dest pe %d, baseptr %p, srcptr %p, dstptr %p, size %zu\n", ncpyOpInfo->srcPe, ncpyOpInfo->destPe, comm_backend::getRMRBase(rmr), ncpyOpInfo->srcPtr, ncpyOpInfo->destPtr, ncpyOpInfo->srcSize);
} else {
CmiIssueRgetCopyBased(ncpyOpInfo);
}
Expand Down Expand Up @@ -501,10 +501,9 @@ if (target_node == CmiMyNode()) {
} else if (!CmiUseCopyBasedRDMA) {
auto mr = *(comm_backend::mr_t *)ncpyOpInfo->srcLayerInfo;
void *rmr = ncpyOpInfo->destLayerInfo + sizeof(comm_backend::mr_t);
// FIXME: we assume the offset to the registered base address is 0 here
comm_backend::issueRput(CmiNodeOf(ncpyOpInfo->destPe), ncpyOpInfo->srcPtr,
ncpyOpInfo->srcSize, mr, 0, rmr, CommRputLocalHandler,
ncpyOpInfo);
ncpyOpInfo->srcSize, mr, 0, rmr,
CommRputLocalHandler, ncpyOpInfo);
} else {
CmiIssueRputCopyBased(ncpyOpInfo);
}
Expand Down
2 changes: 1 addition & 1 deletion src/conv-topology.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ skt_ip_t skt_my_ip(void)
{
char hostname[1000];
skt_ip_t ip = _skt_invalid_ip;

// Prefer hostname resolution so multi-interface nodes still resolve to a
// stable, routable node identity.
if (gethostname(hostname, sizeof(hostname)) == 0) {
Expand Down
2 changes: 1 addition & 1 deletion src/convcore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ void CmiPushPE(int destRank, int messageSize, void *msg) {
int rank = destRank;
CmiAssertMsg(
rank >= 0 && rank < Cmi_mynodesize,
"CmiPushPE(myPe: %d, destRank: %d, nodeSize: %d): rank out of range",
"CmiPushPE(myPe: %d, destPe: %d, nodeSize: %d): rank out of range",
CmiMyPe(), destRank, Cmi_mynodesize);
Cmi_queues[rank]->push(msg);
}
Expand Down
26 changes: 26 additions & 0 deletions src/threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ CthThread CthCreate(CthVoidFn fn, void *arg, int size) {
}

static void CthThreadBaseFree(CthThreadBase *th) {
struct CthThreadListener *l, *lnext;
/*
* remove the token if it is not queued in the converse scheduler
*/
Expand All @@ -260,9 +261,23 @@ static void CthThreadBaseFree(CthThreadBase *th) {
/* Call the free function pointer on all the listeners on
this thread and also delete the thread listener objects
*/
for (l = th->listener; l != NULL; l = lnext) {
lnext = l->next;
l->next = 0;
if (l->free)
l->free(l);
}
th->listener = NULL;
free(th->data);

if (th->stack != NULL) {
for (l = th->listener; l != NULL; l = lnext) {
lnext = l->next;
l->next = 0;
if (l->free)
l->free(l);
}
th->listener = NULL;
free(th->stack);
}
th->stack = NULL;
Expand All @@ -280,6 +295,11 @@ static void CthThreadFree(CthThread t) {
}

static void CthBaseResume(CthThread t) {
struct CthThreadListener *l;
for (l = B(t)->listener; l != NULL; l = l->next) {
if (l->resume)
l->resume(l);
}
CthFixData(t); /*Thread-local storage may have changed in other thread.*/
CpvAccess(CthCurrent) = t;
CpvAccess(CthData) = B(t)->data;
Expand Down Expand Up @@ -323,6 +343,12 @@ void CthSuspend(void) {
if (cur->suspendable == 0)
CmiAbort("Fatal Error> trying to suspend a non-suspendable thread!\n");

/* Call the suspend function on listeners */
for (l = cur->listener; l != NULL; l = l->next) {
if (l->suspend)
l->suspend(l);
}

CthThFn choosefn = cur->choosefn;
if (choosefn == 0)
CthNoStrategy();
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ add_subdirectory(ping_ack)
add_subdirectory(random)
add_subdirectory(ring)
add_subdirectory(startup)
add_subdirectory(physical_nodes)
add_subdirectory(reduction)
add_subdirectory(rdma_pingpong)
add_subdirectory(rdma_pingpong_device)
Expand Down
13 changes: 4 additions & 9 deletions tests/physical_nodes/physical_nodes.cpp
Original file line number Diff line number Diff line change
@@ -1,25 +1,20 @@
#include "converse.h"
#include <unistd.h>

CmiStartFn mymain(int argc, char** argv)
{
CmiBarrier();

char hostname[256];
gethostname(hostname, sizeof(hostname));

if (CmiMyPe() == 0) {
if (CmiMyPe() == 0)
{
CmiPrintf("CmiNumPhysicalNodes() = %d\n", CmiNumPhysicalNodes());
CmiPrintf("CmiNumNodes() = %d\n", CmiNumNodes());
CmiPrintf("CmiNumPes() = %d\n", CmiNumPes());
CmiPrintf("Hostname = %s\n", hostname);

for (int pe = 0; pe < CmiNumPes(); pe++) {
for (int pe = 0; pe < CmiNumPes(); pe++)
{
CmiPrintf("PE %d: node=%d physicalNode=%d physicalRank=%d\n", pe,
CmiNodeOf(pe), CmiPhysicalNodeID(pe), CmiPhysicalRank(pe));
}
} else {
CmiPrintf("Hostname = %s\n", hostname);
}

CmiBarrier();
Expand Down
Loading