/******************************************************************************
Distributed Memory Implementation
******************************************************************************/
struct MPIR_Win
{
/*** insert struct MPIR_Win here ***/
MPI_Win * handles;
int * rhc_counts;
int rhc_processed;
}
#define MPIR_WIN_CLOSED 0
#define MPIR_WIN_OPEN 1
MPID_Win_create(dwin, info)
{
dwin->handles = malloc(sizeof(MPI_Win) * dwin->np);
MPI_Allreduce(dwin->handle, dwin->handles, dwin->comm);
dwin->access_epoch_state = EPOCH_STATE_ACTIVE;
dwin->access_epoch_id = 0;
dwin->exposure_epoch_state = EPOCH_STATE_ACTIVE;
dwin->exposure_epoch_id = 0;
dwin->rhc_cnts = malloc(sizeof(int) * dwin->np);
zero(dwin->rhc_cnts[0..dwin->np-1]);
rhc_processed = 0;
}
/* MPID_Win_create() */
MPID_Win_free(win)
{
free(dwin->handles);
free(dwin->rhc_cnts);
}
/* MPID_Win_free() */
MPID_Win_fence(assert, win)
{
if (!(assert & MPI_MODE_PRECEDE))
{
MPI_Alltoall(dwin->rhc_cnts, 1, MPI_INT,
tmp, 1, MPI_INT, dwin->comm);
rhc_incoming = sum(tmp[0..dwin->np-1]);
while(dwin->rhc_processed < rhc_incoming &&
dwin->active_reqs.count > 0 &&
dwin->active_flags.count > 0)
{
MPID_Win_wait(dwin);
}
zero(dwin->rhc_cnts[0..np-1]);
dwin->rhc_processed = 0;
dwin->tag = 0;
}
if (!(assert & MPI_MODE_SUCCEED))
{
MPI_Barrier(dwin->comm);
}
}
/* MPID_Win_fence() */
MPID_Get(origin_addr, origin_count, origin_datatype,
target_rank, target_disp, target_count, target_datatype, dwin)
{
if (target_rank == dwin->rank)
{
target_offset = win.displ * target_disp;
target_addr = win.base + target_offset;
MPIR_Copy_data(target_addr, target_count, target_datatype,
origin_addr, origin_count, origin_datatype);
}
else
{
tag = dwin->tag++;
req_p = dwin->active_reqs.alloc();
flag_p = dwin->active_flags.alloc();
rc = MPID_Irecv(origin_addr, origin_count, origin_datatype, tag,
target_rank, dwin->comm, req_p);
header = (dwin->handles[target_rank], target_disp, target_count,
target_datatype, tag);
rc = MPID_Rhcv(target_rank, dwin->comm, MPIDI_Win_get_hdlr,
(header), 1, flag_p);
dwin->rhc_cnts[target_rank]++;
}
}
/* MPID_get() */
MPIDI_Win_get_hdlr(src, comm, (header))
{
(handle, disp, count, datatype, tag) = header;
MPIR_ID_lookup(handle, &dwin);
addr = dwin->base + dwin->disp_unit * disp;
req_p = dwin->active_reqs.alloc();
rc = MPI_Irsend(addr, count, datatype, src, tag, comm, req_p);
dwin->rhc_processed++;
}
/* MPIDI_Win_get_hdlr() */
MPID_Put(origin_addr, origin_count, origin_datatype,
target_rank, target_disp, target_count, target_datatype, dwin)
{
pack_sz = MPID_Pack_size(origin_count, origin_datatype,
dwin->comm, target_rank);
if (target_rank == dwin->rank)
{
target_offset = win.displ * target_disp;
target_addr = win.base + target_offset;
MPIR_Copy_data(origin_addr, origin_count, origin_datatype,
target_addr, target_count, target_datatype);
}
else if (pack_sz < EAGER_MAX)
{
flag_p = dwin->active_flags.alloc();
header = (dwin->handles[target_rank], target_disp, target_count,
target_datatype);
pos = 0;
if (MPIR_Datatype_iscontig(origin_count, origin_datatype))
{
addr = origin_addr;
}
else
{
pos = 0;
MPID_Pack(origin_addr, origin_count, origin_datatype, tmpbuf, pos,
tmpbuf_sz, dwin->comm, target_rank);
addr = tmpbuf;
}
MPID_Pack(origin_addr, origin_count, origin_datatype, tmpbuf, pos,
tmpbuf_sz, dwin->comm, target_rank);
rc = MPID_Rhcv(target_rank, dwin->comm, MPIDI_Win_put_eager_hdlr,
(header, addr[0..pos-1]), 2, flag_p);
dwin->rhc_cnts[target_rank]++;
}
else
{
tag = dwin->tag++;
req_p = dwin->active_reqs.alloc();
flag_p = dwin->active_flags.alloc();
header = (dwin->handles[target_rank], target_disp, target_count,
target_datatype, tag);
rc = MPID_Rhcv(target_rank, dwin->comm, MPIDI_Win_put_hdlr,
(header), 1, flag_p);
dwin->rhc_cnts[target_rank]++;
rc = MPID_Isend(origin_addr, origin_count, origin_datatype, tag,
target_rank, dwin->comm, req_p);
}
}
/* MPID_Put() */
MPIDI_Win_put_eager_hdlr(src, comm, (header,(data,data_sz)))
{
(handle, disp, count, datatype, tag) = header;
MPIR_ID_lookup(handle, &dwin);
addr = dwin->base + dwin->disp_unit * disp;
pos = 0;
rc = MPID_Unpack(addr, count, datatype, data, pos, data_sz,
win->comm, src);
dwin->rhc_processed++;
}
/* MPIDI_Win_put_eager_hdlr() */
MPIDI_Win_put_hdlr(src, comm, (header))
{
(handle, disp, count, datatype, tag) = header;
MPIR_ID_lookup(handle, &dwin);
req_p = dwin->active_reqs.alloc();
addr = dwin->base + dwin->disp_unit * disp;
rc = MPI_Irecv(addr, count, datatype, src, tag, comm, &req);
MPIDI_Win_req_add(dwin->active_reqs, req_p);
dwin->rhc_processed++;
}
/* MPIDI_Win_put_hdlr() */
MPI_Accumulate(origin_addr, origin_count, origin_datatype,
target_rank, target_disp, target_count, target_datatype,
op, win)
{
origin_byte_count = origin_count * origin_datatype->size;
if (target_rank == dwin->rank)
{
target_offset = win.displ * target_disp;
target_addr = win.base + target_offset;
/* peform requested operation */
}
else if (origin_byte_count < EAGER_MAX)
{
flag_p = dwin->active_flags.alloc();
if (MPIR_Datatype_iscontig(origin_count, origin_datatype))
{
addr = origin_addr;
}
else
{
pos = 0;
MPID_Pack(origin_addr, origin_count, origin_datatype, tmpbuf, pos,
tmpbuf_sz, dwin->comm, target_rank);
addr = tmpbuf;
}
header = (dwin->handles[target_rank], target_disp, target_count,
target_datatype);
rc = MPID_Rhcv(target_rank, dwin->comm, MPIDI_Win_acc_eager_hdlr,
(header, addr[0..data_sz]), 2, flag_p);
dwin->rhc_cnts[target_rank]++;
}
else
{
/* ??? */
}
}
/* MPID_Accumulate() */
MPIDI_Win_acc_contig_hdlr(src, comm, (header,(data,data_sz)))
{
(handle, disp, count, datatype, tag) = header;
MPIR_ID_lookup(handle, &dwin);
addr = dwin->base + dwin->disp_unit * disp;
/* perform requested operation, converting origin data if necessary */
dwin->rhc_processed++;
}
/* MPIDI_Win_acc_eager_hdlr() */
MPIDI_Win_acc_eager_hdlr(src, comm, (header,(data,data_sz)))
{
(handle, disp, count, datatype, tag) = header;
MPIR_ID_lookup(handle, &dwin);
addr = dwin->base + dwin->disp_unit * disp;
/* perform operations */
dwin->rhc_processed++;
}
/* MPIDI_Win_acc_eager_hdlr() */