- what are the requirements for the communication agent?
- driven by the needs of RMA, Pt2Pt, and Collective
- Agent is responsible for transportation and "matching"
In the send/recv case, matching is as simple as matching the standard MPI
envelope information.
In the RMA case, "matching" is much more complicated. RMA operation
messages/RHCs need to be matched to exposure epochs. Before an operation
message can be processed, locks, etc. may need to be acquired.
- what we're doing here is allowing the origin to create a string of CARs
that the target will service, serialize these CARs, pass them to the
target, and place them in the appropriate queue for service - but not
quite, because there is some translation/processing that has to be done
here.
- the serialized CARs (CAMs) cause matching and/or creation of CARs on the
  target side that correspond to completion of the operations desired by the
  origin
- in a put case, can you just serialize a recv of the data? no, you can't
do this. there has to be something else going on. so CARs can be
created as a result of the reception of a CAM. damn.
- one way to look at this is that CAR processing could create additional
CARs to be processed...this would cleanly handle the put case - origin
serializes the put, and the target creates the corresponding recv after
processing the put CAR?
- much of the checking of parameters can be performed on the origin, as
we have all the window displacement information locally
- a series of RMA operations could be described as a chain of operation
  CARs, where each operation CAR has a data CAR hung off of it
- we would like to be able to submit a batch of CARs to a single destination.
- rma operations will be handled differently by the different methods, as
some of them can handle these directly. however, almost none of them will
have native support for accumulate operations -- so do these need to go
up to the CA?
Isend/Irecv scenario
- generates a send CAR on the sending side and a receive CAR on the receiving
  side
send CAR - segment, counter, envelope
recv CAR - segment, counter, envelope
- CA_enqueue(vc, send_car)
- add send_car to the end of send queue associated with the VC
- if queue was empty (before the add) then start processing the new CAR
- CA_enqueue(vc, recv_car)
- add recv_car to the end of recv queue associated with the VC
- ...
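a sketch of the enqueue behavior described above, in the same pseudocode
style (queue helper names are made up):

CA_enqueue(vc, car)
{
    /* pick the queue matching the CAR's direction */
    q = (car->type == SEND) ? &vc->send_queue : &vc->recv_queue;
    was_empty = queue_is_empty(q);
    queue_append(q, car);
    /* if the queue was empty before the add, kick the method so it starts
     * processing the new CAR immediately */
    if (was_empty)
        vc->method->start_car(vc, car);
}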
"if we buffer, you suffer!" --rob
- if a receive request is posted after the matching message is in the process
of being received into a temporary buffer, we would like the message
reception to be paused, the already buffered data to be copied into the user
buffer, and the remainder of the receive to be deposited directly into the
user buffer
Persistent requests
- We need to allow the method the opportunity to register memory reused in a
  persistent send.  We suggest accomplishing this by calling CA_register_mem()
  during the MPI_<op>_init() and CA_unregister_mem() during the
  MPI_Request_free().
CA_register_mem(method, buffer, count, datatype, &reg_mem_data_p)
CA_unregister_mem(method, buffer, count, datatype, reg_mem_data_p)
A reg_mem_data_p (void *) is returned from CA_register_mem() which points at
method specific data. This reg_mem_data_p should be attached to each
"registered" send/recv CAR.
- For persistent receives with "any source", we will call CA_register_mem()
  for each method.
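a sketch of where these calls might sit (request fields are made up; the
reg_mem_data_p gets attached to each "registered" send/recv CAR as described
above):

MPI_Send_init(buf, count, datatype, ..., &request)
{
    ...
    /* give the method a chance to register (e.g. pin) the reused memory */
    CA_register_mem(method, buf, count, datatype, &reg_mem_data_p);
    request->reg_mem_data_p = reg_mem_data_p;
    ...
}

MPI_Request_free(&request)
{
    ...
    CA_unregister_mem(method, request->buf, request->count,
                      request->datatype, request->reg_mem_data_p);
    ...
}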
Structures
- there will be a separate structure of some sort for each VC. there will be a
mapping function that allows one to map comm/rank to a VC.
- for each VC there is a posted receive queue and posted send queue
- there is also a global "unexpected receive" queue in which incoming sends
which do not match are placed.
- there is also a wildcard queue, possibly one per method. we will devise some
mechanism for avoiding locking this queue each time we receive an incoming
send. some alternatives are: one wc queue per method, an is-empty flag for
the wildcard queue (which must be capable of being atomically read or written
without a lock). WE WILL HAVE ONE WILDCARD QUEUE ONLY.
- this will let us do per-communicator "no wildcard" attributes if we like as
well :).
- a receive call from above matches the unexpected queue and then inserts
  itself into the wildcard or vc-specific queue
- an incoming message from a remote system matches the wildcard queue or the
vc-specific receive queue. to maintain MPI ordering semantics, we must keep
a counter. this counter's value is stored in each CAR. it is incremented
only on enqueue into the wildcard queue (new value used by wildcard CAR), and
ties go to the wildcard receive.
- we will have to handle rollover in some reasonable way...how do we want to do
that? i propose some very high overhead mad search through things in order
to teach these evil wildcard users a lesson!
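to make the counter arbitration concrete, a sketch of the incoming-message
match (names made up; unlocking elided):

match_incoming(vc, envelope)
{
    wc_car = pc_car = NULL;
    if (!wildcard_queue_is_empty())   /* flag readable without a lock */
    {
        lock(wildcard_queue);
        lock(vc->posted_queue);
        wc_car = queue_find_match(wildcard_queue, envelope);
        pc_car = queue_find_match(vc->posted_queue, envelope);
    }
    else
    {
        lock(vc->posted_queue);
        pc_car = queue_find_match(vc->posted_queue, envelope);
    }
    if (wc_car && pc_car)
    {
        /* each CAR stored the counter value in effect when it was posted;
         * the smaller value was posted earlier, and ties go to the
         * wildcard receive */
        return (wc_car->counter <= pc_car->counter) ? wc_car : pc_car;
    }
    if (wc_car || pc_car)
        return (wc_car != NULL) ? wc_car : pc_car;
    return enqueue_unexpected(envelope);  /* single global queue */
}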
end result: we have rewarded the case where receives are posted first with only
per-vc locks. in the case of wildcard receives we have a counter increment and
a second queue to check. in the case of unmatched incoming messages we have a
single queue which might be a source of some contention.
this is a more memory-intensive approach than the original prototype, but it
should allow for higher performance and parallelism across VCs that share a
common method at least in the well-behaved case of pre-posted, known-source
receives (which is the approach we should reward :)).
- we decided to use a single unexpected queue since having multiple unexpected
  queues requires timestamps to order the messages.  the timestamp counter is a
  point of global contention, just like the global unexpected queue.
- this same argument can be applied to the posted queue case, but allowing for
multiple queues here creates a much nicer search situation for the case where
people prepost their receives. is this worth the cost of the counter
manipulation?
for the non-threaded, non-shmem case we think this will be a win. for the
case of threads or shmem, we are unsure if the lock overhead of the queues
and counter increment will result in a loss.
SINCE WE CAN'T COME TO A REAL CONSENSUS, WE WILL HIDE THIS BEHIND AN API.
then we can play with the implementation ad nauseam, just as we have with
this conversation :).
ASIDE:
rusty and rajeev broke queues into a queue per method, with each method
implementing its own queue code. they then had a separate wildcard queue,
and wildcards were ALSO posted in each method queue.
interface:
- enqueue(queue, car)
- dequeue(queue, car)
- findorallocate(vc?, envelope, &car)
- used to match a posted receive to an incoming message
    - check for unexpected incoming message
- if match, return match to that message
- otherwise post receive in the vc queue (if not wildcard) or
in the wildcard queue (for the wildcard case)
- also used to match an incoming message to a posted receive
- check for matches in wildcard queue and vc queue (need to lock both
queues if wildcard queue is non-empty)
- if both, use counter to arbitrate which to match, match
- if neither, allocate spot in unexpected incoming message
- could split this into incomingfoa and expectingfoa? something like that.
- we maybe want to use requests for matching envelopes, and separate from this
the CARs for describing the pieces that make up the payloads
- maybe special CARs that are the envelopes to match? does that make any sense
in the gather case? this makes a certain amount of sense in that we want to
be queueing things as units, with the envelope info and payload separate.
this gets ugly when the dependency information is added :)
mpi_send(tag, context, dest, buf, ...)
{
calls mpid_send()
{
allocates a counter
allocates a car (c), fills in envelope, payload, pointer to counter
set counter to 1
bump refcount on the communicator
get the vc from the comm/rank
get the right queue (q) from the vc
enqueue(q, c) <- brian and i think this might be register_op()?
{
we want this to MAYBE start doing work if the queue was empty
this calls a method-specific call function
}
...we need to make progress and/or wait on completion...
testing for completion isn't necessarily the same operation as making
progress happen
in the adi3 we have testflags and waitflags at the moment which do both
of these
do we want to kick all the pipes or just this particular pipe (vc) in
this case? or just the pipes for our method? dunno.
our ability to efficiently kick all the "active" pipes will depend on
how easily we can get the status of the various methods from this
level.
when the method completes the car, it decrements the counter? or is
that a CA thing? either way the car is dequeued too. we think this is
a method thing? finally if the refcount on the car is 0, it is freed,
otherwise it sits around until the guys dependent on it are done??? we
think that the car can maybe go away. but we'll wait on this issue
until we get to the collective case.
we may or may not even need a refcount in the car...dunno yet
we think that the car maybe needs a pointer to the queue that it is in
so that we can know most easily where to dequeue from?
we decided to do this as:
while (counter > 0) kick_all_pipes();
then we decrement the refcount on the communicator.
done.
}
}
----------
Rusty was right!
MPID_Send()
{
int counter = 1;
/* error checking */
vc = get_vc(comm, dest);
vc->alloc_car(vc, &car, SEND_CAR);
/* construct send car */
car->type = SEND;
car->context = context;
car->rank = dest;
car->tag = tag;
car->buf = buf;
car->count = count;
car->datatype = datatype;
car->first = first;
car->last = last;
car->pcount = &counter;
    vc->advance_state(car, GO_FORWARD_AND_BE_HAPPY);
while(counter > 0)
{
make_progress();
}
}
/*** NOT DONE YET ***/
MPID_Recv()
{
int counter = 1;
/* error checking */
if (any source recv)
{
alloc(&recv_any_p);
lock(recv_any_p->mutex);
found = false;
for(method = 0; !found && method < NUM_METHODS; method++)
{
method_table[method]->recv_find_or_alloc_car(&car[method],
context, ANY_SRC, tag, recv_any_p,
RECV_ANY_SOURCE_CAR, &found);
car[method]->pcount = &counter;
car[method]->buf = buf;
car[method]->count = count;
car[method]->datatype = datatype;
car[method]->first = first;
car[method]->last = last;
            method_table[method]->advance_state(car[method], <STATE>);
if (found == true)
{
method_table[method]->set_segment(car[method],
buf, count, datatype, first, last);
for(method2 = 0; method2 < method; method2++)
{
                method_table[method2]->cancel(car[method2]);
}
}
}
unlock(recv_any_p->mutex);
if (found == true)
{
free(recv_any_p);
}
}
else
{
vc = get_vc(comm, src);
vc->recv_find_or_alloc_car(&car,
context, src, tag, NULL, &counter, RECV_CAR,
&found);
if (found)
{
vc->set_segment(car, buf, count, datatype, first, last);
}
}
while(counter > 0)
{
make_progress();
}
}
so we see that this can work, but it's a little ugly. we think that we might
eventually want to have a set of functions for creating specific cars for each
method -- this would be a fast path for the send/recv cases.
---------
collective case:
MPID_Xfer_scatter_init()?
buffer dependencies could be on either side -- we need some sort of mechanism
for arbitrating which method should allocate buffers, based on which is more
advantageous.
in the case where the sender supplies the buffer, we are creating a type of
flow control which allows us to most efficiently use the local buffers.
start is going to put the cars with envelopes into the "active" state?
something like that...
(see Rob's picture)
buffer management is handled by the method, since each method has its own
peculiarities with respect to limitations.
we need to keep up with who allocates temporary buffers, and we need to further
ensure that they are only freed when they are no longer needed by any cars
(could be more than one)
--------
The xfer request structure will contain both a counter for determining when
operations have completed as well as tracking data structures for building the
CARs associated with the xfer block.
THESE ARE FOR HOMOGENEOUS USE ONLY.  In the heterogeneous case, the message
headers (envelopes) would need to include data format information for each
fragment of the message (basically each CAR).
MPID_xfer_scatter_init(src, tag, comm, &req)
{
alloc(req); /* allocates a xfer request */
req->src = src;
req->tag = tag;
req->comm = comm;
req->car_count = 0;
req->recv_vc = get_vc(comm, src);
req->recv_car_list = recv_envelope;
}
MPID_xferi_scatter_recv_mop_op(req, buf, count, dtype, first, last, mop_func)
{
req->recv_vc->alloc(car);
    car->type = RECV_DATA | UNPACK;
    car->segment = segment(buf, count, dtype, first, last);
if (mop_func != MOP_NOOP)
{
        car->type |= MOP;
        car->op_func = mop_func;
}
list_add(req->recv_car_list, car);
req->recv_car_list.size += get_segment_size(segment);
}
/*
*
* the type field has three independent sets of flags:
* SEND_DATA - sending
* RECV_DATA - receiving
*
* SUPPLY_PACKED_BUF - the method associated with this CAR will supply a
* buffer for use by both this and some other CAR
* PACKED_BUF_DEPENDENCY - this CAR will be supplied a buffer from some other
* CAR (thus it has a buffer dependency)
*
* PACK - this CAR packs the data from the segment into the packed buffer
* UNPACK - this CAR unpacks the data from the pack buffer into the segment
* buffer
*/
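these flags might be defined as independent bits, something like (values
arbitrary; only their independence matters):

#define SEND_DATA             (1 << 0)
#define RECV_DATA             (1 << 1)
#define SUPPLY_PACKED_BUF     (1 << 2)
#define PACKED_BUF_DEPENDENCY (1 << 3)
#define PACK                  (1 << 4)
#define UNPACK                (1 << 5)
#define MOP                   (1 << 6)
#define SUPPLY_PROGRESS       (1 << 7)
#define PROGRESS_DEPENDENCY   (1 << 8)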
MPID_xferi_scatter_recv_mop_forward_op(req, buf, count, dtype, first, last, mop_func, dest)
{
send_vc = get_vc(req->comm, dest);
send_vc->alloc(send_car);
req->recv_vc->alloc(recv_car);
/*
* Buffer handling is determined using some yet-to-be-determined function.
* Nothing above this interface should be looking at the methods, so this
* decision can't be made until we get in here?
*
* This function can take the count/dtype as a parameter to use to help
* decide if it wants to tailor buffer allocation to eager vs. rendezvous.
*/
    buffer_handling = func(..., mop_orientation);
switch(buffer_handling)
{
case SENDER_SUPPLIES_BUF:
send_car->type = SEND_DATA | SUPPLY_PACKED_BUF | PROGRESS_DEPENDENCY;
send_car->buf_dep = recv_car;
send_car->pack_buf = NULL;
send_car->pack_size = get_segment_size(buf, count, dtype, first, last);
recv_car->type = RECV_DATA | UNPACK | PACKED_BUF_DEPENDENCY
| SUPPLY_PROGRESS;
            recv_car->buf_dep = send_car;
            recv_car->pack_buf = NULL;
            recv_car->segment = segment(buf, count, dtype, first, last);
            recv_car->progress_dep = send_car;
            break;
case RECEIVER_SUPPLIES_BUF:
send_car->type = SEND_DATA | PACKED_BUF_DEPENDENCY
| PROGRESS_DEPENDENCY;
send_car->pack_buf = NULL;
send_car->pack_size = get_segment_size(buf, count, dtype, first, last);
recv_car->type = RECV_DATA | UNPACK | SUPPLY_PACKED_BUF
| SUPPLY_PROGRESS;
recv_car->buf_dep = send_car;
recv_car->pack_buf = NULL;
recv_car->segment = segment(buf, count, dtype, first, last);
            recv_car->progress_dep = send_car;
            break;
case USE_SEGMENT_BUF:
send_car->type = SEND_DATA | PACK | PROGRESS_DEPENDENCY;
send_car->pack_buf = NULL;
send_car->pack_size = get_segment_size(buf, count, dtype, first, last);
recv_car->type = RECV_DATA | UNPACK | SUPPLY_PROGRESS;
recv_car->pack_buf = NULL;
recv_car->segment = segment(buf, count, dtype, first, last);
            recv_car->progress_dep = send_car;
            break;
case USE_TMP_BUF:
/* case where neither method is capable of providing a buffer */
alloc(tmp_buf);
send_car->type = SEND_DATA | PROGRESS_DEPENDENCY;
send_car->pack_buf = tmp_buf;
send_car->pack_size = get_segment_size(buf, count, dtype, first, last);
recv_car->type = RECV_DATA | UNPACK | SUPPLY_PROGRESS;
recv_car->pack_buf = tmp_buf;
recv_car->segment = segment(buf, count, dtype, first, last);
            recv_car->progress_dep = send_car;
            break;
}
if (mop_func != MOP_NOOP)
{
if (mop_orientation == SEND)
{
send_car->type |= MOP;
send_car->op_func = mop_func;
}
else
{
recv_car->type |= MOP;
recv_car->op_func = mop_func;
}
}
if (req->send_car_list{dest} == NULL)
{
req->send_car_list{dest} = envelope_car{dest};
req->send_car_list{dest}.size = 0;
}
    /* basically we're adding to the size of the
     * envelope, which we know is at the head of each of these lists
     */
list_add(req->send_car_list{dest}, send_car);
req->send_car_list{dest}.size += send_car->pack_size;
list_add(req->recv_car_list, recv_car);
req->recv_car_list.size += recv_car->pack_size;
}
MPID_xfer_scatter_forward_op(req, size, dest)
{
/* this one doesn't unpack. david points out that there is no
USE_SEGMENT_BUF case here. that might be worth optimizing out... */
send_vc = get_vc(req->comm, dest);
send_vc->alloc(send_car);
req->recv_vc->alloc(recv_car);
/*
* Buffer handling is determined using some yet-to-be-determined function.
* Nothing above this interface should be looking at the methods, so this
* decision can't be made until we get in here?
*
* This function can take the count/dtype as a parameter to use to help
* decide if it wants to tailor buffer allocation to eager vs. rendezvous.
*/
    buffer_handling = func(...);
switch(buffer_handling)
{
case SENDER_SUPPLIES_BUF:
send_car->type = SEND_DATA | SUPPLY_PACKED_BUF
| PROGRESS_DEPENDENCY;
send_car->buf_dep = recv_car;
send_car->pack_buf = NULL;
send_car->pack_size = size;
            recv_car->type = RECV_DATA | PACKED_BUF_DEPENDENCY | SUPPLY_PROGRESS;
            recv_car->buf_dep = send_car;
            recv_car->pack_buf = NULL;
            recv_car->pack_size = size;
            recv_car->progress_dep = send_car;
            break;
case RECEIVER_SUPPLIES_BUF:
send_car->type = SEND_DATA | PACKED_BUF_DEPENDENCY
| PROGRESS_DEPENDENCY;
send_car->pack_buf = NULL;
send_car->pack_size = size;
recv_car->type = RECV_DATA | SUPPLY_PACKED_BUF | SUPPLY_PROGRESS;
recv_car->buf_dep = send_car;
recv_car->pack_buf = NULL;
recv_car->pack_size = size;
            recv_car->progress_dep = send_car;
            break;
case USE_TMP_BUF:
/* case where neither method is capable of providing a buffer */
alloc(tmp_buf);
send_car->type = SEND_DATA | PROGRESS_DEPENDENCY;
send_car->pack_buf = tmp_buf;
send_car->pack_size = size;
recv_car->type = RECV_DATA | SUPPLY_PROGRESS;
recv_car->pack_buf = tmp_buf;
recv_car->pack_size = size;
            recv_car->progress_dep = send_car;
            break;
}
if (req->send_car_list{dest} == NULL)
{
req->send_car_list{dest} = envelope_car{dest};
req->send_car_list{dest}.size = 0;
}
    /* basically we're adding to the size of the
     * envelope, which we know is at the head of each of these lists
     */
list_add(req->send_car_list{dest}, send_car);
req->send_car_list{dest}.size += send_car->pack_size;
list_add(req->recv_car_list, recv_car);
req->recv_car_list.size += recv_car->pack_size;
}
MPID_xfer_scatter_send_op(req, buf, count, dtype, first, last, dest)
{
send_vc = get_vc(req->comm, dest);
send_vc->alloc(car);
car->type = SEND_DATA | PACK;
    car->segment = segment(buf, count, dtype, first, last);
if (req->send_car_list{dest} == NULL)
req->send_car_list{dest} = envelope_car{dest}; /* allocate an envelope
and put it at the
head of the
send_car_list for
this destination */
list_add(req->send_car_list{dest}, car);
req->send_car_list{dest}.size += get_segment_size(segment);
}
MPID_xfer_scatter_start(req)
{
/* add the heads of the lists of cars into the ready lists/queues */
foreach rank in {send} {
vc = get_vc(req->comm, rank);
        vc->send(vc, req->send_car_list{rank});
}
    req->recv_vc->recv(req->recv_vc, req->recv_car_list);
}
MPID_xfer_gather_init(dest, tag, comm, &req)
{
alloc(req); /* allocates a xfer request */
req->dest = dest;
req->tag = tag;
req->comm = comm;
req->car_count = 0;
req->send_vc = get_vc(comm, dest);
req->send_car_list = send_envelope;
}
MPID_xferi_gather_recv_mop_op(req, buf, count, dtype, first, last, mop_func, src)
{
    recv_vc = get_vc(req->comm, src);
    recv_vc->alloc(car);
    car->type = RECV_DATA | UNPACK;
    car->segment = segment(buf, count, dtype, first, last);
if (req->recv_car_list{src} == NULL)
{
req->recv_car_list{src} = envelope_car{src};
}
if (mop_func != MOP_NOOP)
{
        car->type |= MOP;
        car->op_func = mop_func;
}
list_add(req->recv_car_list{src}, car);
req->recv_car_list{src}.size += get_segment_size(segment);
}
MPID_xferi_gather_recv_mop_forward_op(req, buf, count, dtype, first, last, mop_func, src)
{
recv_vc = get_vc(req->comm, src);
recv_vc->alloc(recv_car);
req->send_vc->alloc(send_car);
/*
* Buffer handling is determined using some yet-to-be-determined function.
* Nothing above this interface should be looking at the methods, so this
* decision can't be made until we get in here?
*
* This function can take the count/dtype as a parameter to use to help
* decide if it wants to tailor buffer allocation to eager vs. rendezvous.
*/
    buffer_handling = func(..., mop_orientation);
switch(buffer_handling)
{
case SENDER_SUPPLIES_BUF:
send_car->type = SEND_DATA | SUPPLY_PACKED_BUF | PROGRESS_DEPENDENCY;
send_car->buf_dep = recv_car;
send_car->pack_buf = NULL;
send_car->pack_size = get_segment_size(buf, count, dtype, first, last);
recv_car->type = RECV_DATA | UNPACK | PACKED_BUF_DEPENDENCY
| SUPPLY_PROGRESS;
            recv_car->buf_dep = send_car;
            recv_car->pack_buf = NULL;
            recv_car->segment = segment(buf, count, dtype, first, last);
            recv_car->progress_dep = send_car;
            break;
case RECEIVER_SUPPLIES_BUF:
send_car->type = SEND_DATA | PACKED_BUF_DEPENDENCY
| PROGRESS_DEPENDENCY;
send_car->pack_buf = NULL;
send_car->pack_size = get_segment_size(buf, count, dtype, first, last);
recv_car->type = RECV_DATA | UNPACK | SUPPLY_PACKED_BUF
| SUPPLY_PROGRESS;
recv_car->buf_dep = send_car;
recv_car->pack_buf = NULL;
recv_car->segment = segment(buf, count, dtype, first, last);
            recv_car->progress_dep = send_car;
            break;
case USE_SEGMENT_BUF:
send_car->type = SEND_DATA | PACK | PROGRESS_DEPENDENCY;
send_car->pack_buf = NULL;
send_car->pack_size = get_segment_size(buf, count, dtype, first, last);
recv_car->type = RECV_DATA | UNPACK | SUPPLY_PROGRESS;
recv_car->pack_buf = NULL;
recv_car->segment = segment(buf, count, dtype, first, last);
            recv_car->progress_dep = send_car;
            break;
case USE_TMP_BUF:
/* case where neither method is capable of providing a buffer */
alloc(tmp_buf);
send_car->type = SEND_DATA | PROGRESS_DEPENDENCY;
send_car->pack_buf = tmp_buf;
send_car->pack_size = get_segment_size(buf, count, dtype, first, last);
recv_car->type = RECV_DATA | UNPACK | SUPPLY_PROGRESS;
recv_car->pack_buf = tmp_buf;
recv_car->segment = segment(buf, count, dtype, first, last);
            recv_car->progress_dep = send_car;
            break;
}
if (mop_func != MOP_NOOP)
{
if (mop_orientation == SEND)
{
send_car->type |= MOP;
send_car->op_func = mop_func;
}
else
{
recv_car->type |= MOP;
recv_car->op_func = mop_func;
}
}
if (req->recv_car_list{src} == NULL)
{
req->recv_car_list{src} = envelope_car{src};
req->recv_car_list{src}.size = 0;
}
    /* basically we're adding to the size of the
     * envelope, which we know is at the head of each of these lists
     */
list_add(req->recv_car_list{src}, recv_car);
req->recv_car_list{src}.size += recv_car->pack_size;
list_add(req->send_car_list, send_car);
req->send_car_list.size += send_car->pack_size;
}
MPID_xfer_gather_forward_op(req, size, src)
{
/* this one doesn't unpack. david points out that there is no
USE_SEGMENT_BUF case here. that might be worth optimizing out... */
recv_vc = get_vc(req->comm, src);
recv_vc->alloc(recv_car);
req->send_vc->alloc(send_car);
/*
* Buffer handling is determined using some yet-to-be-determined function.
* Nothing above this interface should be looking at the methods, so this
* decision can't be made until we get in here?
*
* This function can take the count/dtype as a parameter to use to help
* decide if it wants to tailor buffer allocation to eager vs. rendezvous.
*/
    buffer_handling = func(...);
switch(buffer_handling)
{
case SENDER_SUPPLIES_BUF:
send_car->type = SEND_DATA | SUPPLY_PACKED_BUF
| PROGRESS_DEPENDENCY;
send_car->buf_dep = recv_car;
send_car->pack_buf = NULL;
send_car->pack_size = size;
recv_car->type = RECV_DATA | PACKED_BUF_DEPENDENCY | SUPPLY_PROGRESS;
recv_car->buf_dep = send_car;
recv_car->pack_buf = NULL;
recv_car->pack_size = size;
            recv_car->progress_dep = send_car;
            break;
case RECEIVER_SUPPLIES_BUF:
send_car->type = SEND_DATA | PACKED_BUF_DEPENDENCY
| PROGRESS_DEPENDENCY;
send_car->pack_buf = NULL;
send_car->pack_size = size;
recv_car->type = RECV_DATA | SUPPLY_PACKED_BUF | SUPPLY_PROGRESS;
recv_car->buf_dep = send_car;
recv_car->pack_buf = NULL;
recv_car->pack_size = size;
            recv_car->progress_dep = send_car;
            break;
case USE_TMP_BUF:
/* case where neither method is capable of providing a buffer */
alloc(tmp_buf);
send_car->type = SEND_DATA | PROGRESS_DEPENDENCY;
send_car->pack_buf = tmp_buf;
send_car->pack_size = size;
recv_car->type = RECV_DATA | SUPPLY_PROGRESS;
recv_car->pack_buf = tmp_buf;
recv_car->pack_size = size;
            recv_car->progress_dep = send_car;
            break;
}
if (req->recv_car_list{src} == NULL)
{
req->recv_car_list{src} = envelope_car{src};
req->recv_car_list{src}.size = 0;
}
    /* basically we're adding to the size of the
     * envelope, which we know is at the head of each of these lists
     */
list_add(req->recv_car_list{src}, recv_car);
req->recv_car_list{src}.size += recv_car->pack_size;
list_add(req->send_car_list, send_car);
req->send_car_list.size += send_car->pack_size;
}
MPID_xfer_gather_send_op(req, buf, count, dtype, first, last)
{
req->send_vc->alloc(send_car);
send_car->type = SEND_DATA | PACK;
    send_car->segment = segment(buf, count, dtype, first, last);
list_add(req->send_car_list, send_car);
req->send_car_list.size += get_segment_size(segment);
}
MPID_xfer_gather_start(req)
{
/* add the heads of the lists of cars into the ready lists/queues */
foreach rank in {recv} {
/* need to get the vc for rank; extract it from the head car, which is
* the envelope - this could be a special field in the envelope, or
* we could have the vc in all cars (that seems silly)
*
* alternatively we could look up the vc using the info in the request
* plus the rank from the envelope...
*/
vc = get_vc(req->comm, rank);
        vc->recv(vc, req->recv_car_list{rank});
}
    req->send_vc->send(req->send_vc, req->send_car_list);
}
/***************************************************************/
now we need to make progress on things, taking the dependencies into account.
/***************************************************************/
the open question at the moment is where to put the car lists.
one option is to associate things with the vc.
this necessitates having a list of "active" vcs, I think.
we would update any necessary global method state during the xfer start
functions.
we note that there is a disparity between the send side and the receive side
in that there is always the possibility that messages will arrive, so we need
to be polling all the time for receives, at some interval. This interval
might be very infrequent when we are not expecting any messages, but more
frequent when we know that there are outstanding operations on the given
method.
in some cases it costs the same to poll all vcs for a given method as it does
to poll a subset of vcs or a single vc for the method.  in other cases it is
more expensive to poll all vcs on the method.
another issue is knowing the relative speeds of the methods WRT how often to
poll.
the polling functions could give feedback as well indicating that progress was
made, or # of things processed, or whatever. this might be useful as well...
this could be a priority thing of some sort.
we call a method-specific function to add the car lists to the specific VC(s).
this allows for the method to update any internal structures as well which it
can use to keep up with active connections/sockets/VIs/whatever.
/* this will be a pointer in the vc to a method-specific function
 */
We're going to allow methods to modify the VC method functions whenever they
feel like it in order to express the current state via the function pointers.
Boy, that's wacky isn't it? :)
This could also be used to put a VC in an error state.
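for example (sketch; function names made up), the tcp method could encode
connection state in the VC's send entry point:

tcp_connection_established(vc)
{
    vc->send = tcp_send_connected;   /* fast path from now on */
}
tcp_connection_failed(vc)
{
    vc->send = vc_send_error;        /* VC now fails sends with an error */
}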
tcp_send(vc, car_list)
{
if connect isn't initiated
initiate connect
queue car list
else if connect in progress
queue car list
else if send queue is not empty
queue car list
make progress on this send queue
else
make progress on specified car list
enqueue remainder of car list
fi
}
tcp_recv(vc, car_list)
{
}
tcp_make_progress()
{
}
In the case where a recv is posted to the method but the matching message has
already started to arrive (a corresponding unexpected CAR already exists), we
need a way to switch the CAR the progress engine is using to complete the
receive. To accomplish this, we will lock the unexpected CAR, point the
unexpected CAR at the new recv CAR, copy any data already received into the
buffer associated with the new car, and then release the lock. When the
progress engine receives more data, it will notice that the unexpected CAR
contains a pointer to a real recv CAR, update its cached CAR pointer, and
release the unexpected CAR (continuing to process the incoming data using the
state in the recv CAR).
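a sketch of that handoff, in the same pseudocode style:

switch_unexpected_to_posted(unex_car, recv_car)
{
    lock(unex_car->mutex);
    /* copy any data already received into the user buffer */
    copy_buffered_data(recv_car, unex_car);
    /* the progress engine checks this pointer when more data arrives,
     * updates its cached CAR pointer, and releases the unexpected CAR */
    unex_car->real_car = recv_car;
    unlock(unex_car->mutex);
}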
Dependencies:
- completion dependencies are simply handled by use of separate cars in these
ordered lists. these are for multiple cars which make up the same envelope.
- we've lost our priority dependencies. oops! at present we use an
associative array (thank you perl), but that data structure does not maintain
the order of operations. to simplify this situation, we constrain
prioritization to the message level rather than prioritizing each and every
operation/CAR. this allows us to continue using an associative array type
data structure as long as that array maintains the order in which we inserted
new keys into the array.
- this approach loses the priority information between methods. is that ok?
no. so we need to have something that keeps this information across methods
as well.
- david seems to think that this priority stuff isn't going to work.
- we might get back more than one buffer to use for receiving one car's worth
of data, so we can't just have a single pointer to a buffer in the car, if we
use a pointer in the car at all. alternatively we could keep the pointers to
buffers in some structure associated with the vc (as method-specific data).
so we use iovec-like things to keep up with the multiple buffers.
describing buffers:
the sender can then hand over groups of buffers. this includes a list of
pointers and sizes and a total length.
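so a handed-over buffer group might look like (names made up):

struct buf_group
{
    struct iovec *iov;        /* list of (pointer, size) pairs */
    int           iov_count;
    size_t        total_len;  /* sum of the iov sizes */
};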
grabbing or providing buffers (which?):
options:
- single function call used by receiver to ask for a buffer, and if the sender
isn't ready, then we get a temp. buffer.
- single function call used by sender used to provide buffers immediately when
available.
- handshake: recv notifies sender he is ready, sender provides buffer space
when it is available. he does not reserve buffer space before the receiver
notifies him.
the code matching handshakes on the sending side may supply buffers for more
than one send CAR which has been marked by the receiver as ready, but it MUST
do so in the order given by the completion dependencies
sender provides buffers when it is at the head of the queue OR if all CARs
ahead of the sending CAR have satisfied all their buffer dependencies (and
the recv is ready). the sender has the option of providing only part of the
required buffer space. in this case, there must be some minimum amount
agreed upon between the two methods (to ensure that the receiver can make
progress with the amount of buffer space provided).
CARs are considered "ahead" if they are part of the same envelope/message
(CAR list) and are ahead of the send CAR in the ordered list (vertical,
completion dependence) OR if they are on the same VC with the sender but in a
different envelope and were enqueued (hi david!) previously. We can
optionally consider whether or not these different envelopes are in the same
context when making dependency assessments (gets back to the reordering
possibility).
- this "all previous CARs are satisfied" stuff refers to the space for the
envelopes, not the data (unless it was sent eagerly). we can do this due to
the semantics of MPI contexts
- a receive only notifies when the incoming envelope has been matched.
likewise, the sender (if using rendezvous) must match envelopes with the
remote receiver before he will hand buffers back to the receiver.
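putting these rules together, the sender-side check might look like (sketch;
names made up):

can_supply_buffers(send_car)
{
    if (!send_car->recv_ready)      /* envelope not matched yet */
        return FALSE;
    /* every CAR "ahead" -- earlier in the same CAR list, or in an earlier
     * envelope on this VC -- must have its buffer dependencies satisfied;
     * for earlier envelopes this means the envelope space only, per the
     * MPI context semantics above */
    foreach car ahead of send_car on this vc
        if (!buffer_deps_satisfied(car))
            return FALSE;
    return TRUE;
}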
- there are opportunities for reordering the send queue based on the MPI
semantics between isends/sends/collectives. if the contexts aren't the same,
you can reorder (maintaining intra-context ordering). recall that
collectives are in a different context from p2p
- senders in the rendezvous protocol with progress dependencies may eagerly
send their RTS. likewise the CTS may also be sent eagerly when the matching
receive is seen.
- we need to keep a small number of buffers in reserve on some methods
(e.g. VIA) in order to guarantee that we can make progress. the example is a
system that would otherwise use up all his buffers for RTS/CTS packets.
we don't think that this is quite sufficient.
so in the via case we need to have at least two buffers lying around (at a
very low level) for our window protocol, one for receiving window messages
and one for sending them, but we don't use that up at the level we're looking
at. the send buffer can be small, but the receive buffer may have to be
bigger (?).
actually we can just have two buffers around at the messaging level, which
are enough to make slow progress. then we piggyback our window info inside
the 32 bits in the header of each send.
additionally the envelope/header can't be bigger than the minimum via packet
size (16K)
we can implement a window scheme per VI/VC. we can keep a pool of buffers
around that VI/VCs can use (and free if they aren't needed, except for the
two mentioned above).
* we could also use DMA writes to do the data transfer and get rid of this
problem altogether. this gives you a packet per message used out of the
receive queue. IMPLEMENT THIS ONE.
- DMA writes for both the data transfer and the control messages is david's
favorite screaming fast idea. by utilizing this two-dma approach to ensure
that the entire header is there, we can get around implementation details.
- we came up with a wacky scheme for writing window values directly into the
remote memory. to do this you want to assign a sequence number to each
packet. then the format of the data you write is:
<counter><window><sequence_nr><counter>
The counter values are identical for a given write and are used as sentinel
values (is this the right nomenclature?)/cookies to allow the local side to
know that the data in the region is valid. if the local process reads the
region and doesn't get the same counter value on each end, it tries again.
This is cool because it's a memory operation, it's likely to work the second
time if it fails, and it doesn't consume any buffers off the VI queues.
We might even be able to get all this into a 32-bit value and make this
simpler.
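a sketch of the format and the local validation loop (field widths are a
guess; the point is the matching sentinel counters):

struct window_update
{
    uint32_t counter_a;     /* sentinel */
    uint32_t window;
    uint32_t sequence_nr;
    uint32_t counter_b;     /* written with the same value as counter_a */
};

read_window_update(region, &window, &seq)
{
    do {
        update = *region;   /* plain local memory read */
    } while (update.counter_a != update.counter_b);   /* torn; try again */
    window = update.window;
    seq = update.sequence_nr;
}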
- we want to utilize our pool of buffers to primarily service forwarding
operations -- we want to wait for the send operation to reach the head of its
VC queue before we really start giving lots of buffers to that operation.
-----
MPID_Irecv(buf, count, datatype, ...)
{
    /* foa() - looks up the VC (among other things); if rank is ANY_SOURCE,
     * we get back NULL
*/
recv_posted_foa(comm, rank, tag, &car, &vc, &found);
car->buf = buf;
car->count = count;
car->datatype = datatype;
unlock(car->mutex);
if (found)
{
/* some indication of the sent message has already arrived */
        /* process_recv() can make progress implicitly if it likes */
vc->process_recv(vc, car);
/* after this call, make_progress() must be the only call
* necessary to finish this message. make_progress may need to be
* called more than once. this function may _optionally_ also make
* progress.
*/
}
}
/* this isn't necessarily a function... */
method_specific_send_envelope_arrived_on_the_wire()
{
/* y'all, the envelope has been yanked */
comm = get_comm_from_context(context_id);
/* we won't know if we really want the vc in here until we start
* implementiong this...but this isn't method specific, so we need to
* make up our minds sooner or later.
*
* this is pretty much the least number of parameters from which we can
* reasonably extract all the information we might need (vc, context)
*/
recv_incoming_foa(comm, rank, tag, &car, &found);
/* recv_incoming_foa() returns an existing CAR if a matching recv was
     * found in the "posted queue"; otherwise it allocates a new CAR and
* returns a pointer to it. found is true if an existing CAR was matched
* in the posted queue.
*/
if (data with header)
{
car->data = p_data;
}
unlock(car->mutex);
if (found)
{
/* we could do a lot of work in this function if we like */
method_specific_process_recv(vc, car);
}
}
- we will allocate cars by determining the maximum needed size at runtime and
  allocating all cars at that size.  this preserves locality while
  simultaneously separating the generic "car" from any method-specific info,
  allowing 3rd party methods to be added without modifying the generic car
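a sketch of the sizing pass (the per-method size field is made up):

car_alloc_init()
{
    max_private = 0;
    for (m = 0; m < NUM_METHODS; m++)
        if (method_table[m]->car_private_size > max_private)
            max_private = method_table[m]->car_private_size;
    car_alloc_size = sizeof(CAR) + max_private;
}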
MPI_Wait()
{
int backoff_cnt = 0;
while(req->counter > 0)
{
if (backoff_cnt++ > SPIN_THRESHOLD)
{
if (backoff_cnt < SLEEP_THRESHOLD)
{
/* may want to do this only if you have limited processor
resources */
mpid_yield();
}
else
{
mpid_lock(req->mutex);
req->state = BLOCKING_WAIT;
while (req->counter > 0)
mpid_cond_wait(req->mutex, req->cond);
mpid_unlock(req->mutex);
}
}
}
}
tcp_make_progress()
{
    /* assuming a set of global FD sets which contain the current "list" of
       active FDs, we simply make copies of those */
    fd_set local_read_set = global_read_set;
    fd_set local_write_set = global_write_set;
    nset = select(nfds, &local_read_set, &local_write_set, NULL, &timeout);
foreach fd (set in local_read_set)
{
get_header();
if (new incoming message)
{
comm = get_comm_from_context(context_id);
recv_incoming_foa(comm, rank, tag, &car, &found);
if (found)
{
                if ((car->type & PACKED_BUF_DEPENDENCY) != 0)
{
}
                if ((car->type & UNPACK) == UNPACK)
{
MPIR_Segment_init(car->segment, car->buf, car->count,
car->datatype);
}
}
}
}
}
Inter-method interface
progress reporting:
the recv CAR has new data available and we want to allow the method associated
with the send CAR to make progress on the send CAR.  This implies that we need
a function provided by the send CAR's method which the recv CAR's method can
call directly.
car_r.car_s.vc.recv_ready(car_s)
provide_bufs(car_s, <bufs>, length)
report_progress(car, nbytes)
provide_bufs_and_progress(car, bufs, len, nbytes)
{
}
- we want to have all the "code" in the arcs/edges
- it might make sense to separate the vm, method buffer, recv, and send
into separate machines
- do we want it to be ok for the buffering system to return 0?
- concern: if we pound out a purely event-based system, can we then come
back and build an efficient implementation? or are we screwing
ourselves?
- get a state machine, so we can talk about the api...
- we want this first cut to be event-driven
- we also need some fast-path capabilities for some key
transitions/actions.
- we see then that we have a very simple state machine really. but we
think that we should have some flow diagrams that describe the
processing that happens on the edges of these simple state machines.
- ugh. then we start to see that we would like to differentiate between
receiving control and data? i dunno.
- looks like we have a couple of events/api functions for the vcs now: buffer
events and wire events.
- not purely event driven, but bubbles are suspendible state and events are
what wake things up.
buffer allocation:
- receiver vs. sender buffer allocation relies on the sender to grab buffers
in all cases? this is what we're saying in our state machine at the moment,
but it's NOT what we were saying previously. we need to make a decision on
this one.
- maybe we're not going to ever eagerly receive(?)
method.alloc()
- parameters:
maybe src_vc, dest_vc?
maybe provider_vc, consumer_vc? (for notification, accounting)
min_sz, max_sz?
user_arg? callback_fn?
- this function should return immediately, possibly showing failure.
- there is a separate issue of matching BETWEEN methods. we haven't
quite figured out how to do this...we were thinking that the
source and dest could be used to pick more optimal buffers in
the case where the method knows about pairings
- this gets us to a sticky point -- how much do we try to optimize as much
as possible vs. how much do we try to make it easy to implement new
methods
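a possible prototype capturing the parameters above (everything tentative):

/* returns immediately, possibly indicating failure; buffers are delivered
 * through the callback as they become available */
int method_alloc(provider_vc, consumer_vc, min_sz, max_sz,
                 callback_fn, user_arg);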
-----
Open Issues:
- When should the ready signal be triggered from the receive car to the send
car?
- When should buffers be allocated when forwarding eager messages so as to
avoid buffer copies?
- Need to define the communication agent