From b7ad696149335a1965ca9317e763f98ba82f2ce4 Mon Sep 17 00:00:00 2001 From: skaller Date: Wed, 20 Mar 2024 00:27:49 +1100 Subject: [PATCH] Add package channel.hpp which contains most of the old code from the nurtl repository. Everything added compiles. However there is a known BUG: fibres MUST have a registry of channels. When I/O is done, ALL the channels in the registry have to have their reference count decremented. On resume, incremented. The current code only decrements the refcount of the cnannel on which the request is made. This is not enough. --- src/packages/rt-channel.fdoc | 1191 ++++++++++++++++++++++++++++++++++ 1 file changed, 1191 insertions(+) create mode 100644 src/packages/rt-channel.fdoc diff --git a/src/packages/rt-channel.fdoc b/src/packages/rt-channel.fdoc new file mode 100644 index 000000000..059e2e221 --- /dev/null +++ b/src/packages/rt-channel.fdoc @@ -0,0 +1,1191 @@ +@tangler channel.hpp = share/lib/rtl/rt/channel.hpp +@tangler sequential_channel.hpp = share/lib/rtl/rt/sequential_channel.hpp +@tangler concurrent_channel.hpp = share/lib/rtl/rt/concurrent_channel.hpp +@tangler async_channel.hpp = share/lib/rtl/rt/async_channel.hpp + +@tangler svc.hpp = share/lib/rtl/rt/svc.hpp +@tangler csp.hpp = share/lib/rtl/rt/csp.hpp +@tangler csp_process.hpp = share/lib/rtl/rt/csp_process.hpp +@tangler csp_thread.hpp = share/lib/rtl/rt/csp_thread.hpp +@tangler con.hpp = share/lib/rtl/rt/con.hpp +@tangler fibre.hpp = share/lib/rtl/rt/fibre.hpp +@tangler system.hpp = share/lib/rtl/rt/system.hpp + +@title Channel Abstraction + +@h1 Continuations +@tangle con.hpp +#ifndef CON +#define CON +// continuation +#include +using size_t = ::std::size_t; + +struct fibre_t; +struct con_t { + fibre_t *fibre; + int pc; // program counter + con_t () : pc(0), fibre(nullptr) {} + virtual con_t *return_control()=0; + virtual con_t *resume()=0; + virtual size_t size()const=0; + virtual ~con_t(){} +}; + +// coroutine +struct coroutine_t : con_t { + con_t *return_control(); // deletes object and returns nullptr +}; + +// top level subroutine +struct subroutine_t : con_t { + subroutine_t(fibre_t *f) { fibre = f; } + con_t *caller; // caller continuation + con_t *return_control(); // deletes object and returns caller +}; + +// NOTE: a subroutine must not delete itself if it returns +// a nested procedure which binds to its local variable frame + +struct curry_subroutine_t : con_t { + con_t *caller; // caller continuation + curry_subroutine_t (fibre_t *f) { fibre = f; } + + con_t *return_control() { + auto tmp = caller; + caller = nullptr; + return tmp; + } +}; +#endif +@ + +@h1 Fibres +@tangle fibre.hpp +#ifndef FIBRE +#define FIBRE +// fibre3.hpp +// fibre +#include "con.hpp" +struct csp_process_t; +union svc_req_t; + +struct fibre_t { + con_t *cc; + fibre_t *next; + csp_process_t *process; + svc_req_t *svc_req; // request + + // default DEAD + //fibre_t() : cc(nullptr), next(nullptr), process(nullptr) {} + + // construct from continuation + fibre_t(con_t *ccin, csp_process_t *owned_by) : + cc(ccin), next (nullptr), process(owned_by), svc_req(nullptr) + { + ccin->fibre=this; + } + + // immobile + fibre_t(fibre_t const&)=delete; + fibre_t& operator=(fibre_t const&)=delete; + + // destructor deletes any remaining continuations in spaghetti stack + ~fibre_t(); // defined in csp.hpp to resolve circular reference + + // run until either fibre issues a service request or dies + svc_req_t *run_fibre() { + while(cc) { + cc=cc->resume(); + if(cc && svc_req) return svc_req; + } + return nullptr; + } +}; +#endif +@ + + + +@h1 Channel Design +A channel is a lightweight synchronisation device which allows fibres +to perform two operations: read and write. When a read is performed by a channel owning fibre, +if there is a writer on the channel, data is transfered, the writer is removed from +the channel, and both the reader and writer become active. + +If there is no writer on the channel, the reader is suspended and added to the +channel: the channel now owns the fibre. + +Write operation is dual. + +If no one owns a channel, the channel is deleted along with all the fibres +it owns. Similarly, if no one owns a fibre, it is deleted, +and shared ownership of the any channels the fibre has will be relinquished. +Together this can lead to a recursive deletion of many channels and fibres. + +For example in a pipeline, if one fibre in the middle decides to quit, +it is deleted, and now the components on either side of it can no longer +write or read, so they will be deleted as well, and the effect cascades +and destroys the whole pipeline. + +To make this work, we have to use a double reference counting scheme. +Objects called channel endpoints reference count the channel +they're endpoints for. In turn, each endpoint is accessed by a smart pointer +which reference counts it, and can be passed around the continuations +of the fibre. The fibre itself must maintain an array of all the endpoints +it owns, the registry. + +When a fibre suspends on an I/O operation this is what happens: +first, the fibre is removed from the scheduler, and added to the channel. +We use a linked list in the channel. The reference count of the fibre +doesn't change at this point, since we simply moved ownership to the channel. + +Next, we have to decrement the reference count of all the registered endpoints, +since none of them can do any work, since the owning fibre is no longer active. +When the fibre is resumed all these reference counts must be incremented again. + +When the reference count of a channel endpoint would be decremented to zero, +the endpoint is deleted, which causes the reference count of the channel +it refers to to be decremented. + +And clearly, when the channel reference count goes to zero, it is deleted too. +When the channel is deleted, all the fibres owned by the channel are also deleted, +since none of them can become active again, since they're waiting on I/O on +the channel, which no longer exists. + +Deleting a fibre requires deleting all the continuations in its stack. +As this happens, references to channel endpoints are deleted, decrementing +the endpoint reference counts, until only the registry of the fibre remains. +When the references in the registry are deleted all the endpoints will be +deleted, which will decrement, and possibly delete, the channel referred to. + + +The implementation is not quite trivial, since it is recursive and +cycles must be broken to prevent an infinite recursion. This is done +by marking the channel when deletion is in progress, which stops +attempts to delete the owned fibres (since they're already being deleted). +In fact, we use the channel refcount itself as the mark: if the refcount +is zero, deletion of the channel is in progress. + +Note that channels are shared between fibres, but an endpoint +msut be exclusively owned by a single fibre. The smart pointer +to the channel endpoints can be copied, assigned, and moved +around but only within continuations of the fibre. + +When a fibre is spawned it must be passed references to channel +endpoints, which is done by constructing its registry and passing +that. It is vital the spawner delete all these endpoints it is not +allowed to own. This should be done as soon as possible because +whilst it owns these endpoints, the referred to channel is still +considered reachable. This must be done before the next I/O operation +because these endpoints are not registered in the +creators registry. This means if it is suspended, the reference counts +of the end points will not be decremented as required. + +One should note this is not trivial. When a fibre wants to spawn several others +and connect them with channels, it must hold the appropriate endpoint array +for each fibre it will spawn until they're done. On the other hand since +the first spawned fibre might start immediately and immediately do I/O, +the endpoints the other ends of the channels must be held long enough +to ensure that fibre does not immediately suicide. + +Another point to note is that a channel must have a reference count +of at least 2 to be useful, one for each of two endpoints in different +active fibres. + +Finally note, any number of fibres can subscribe to a channel. +Channels are untyped can can operate in any direction. +Similarly, endpoints are not intrinsically input or output. +A channel is, of course, doomed if there is only one subscriber. + + +@tangle channel.hpp +#ifndef CHANNEL +#define CHANNEL +#include +#include +#include +using uintptr_t = ::std::uintptr_t; +#include "fibre.hpp" +#include "allocator.hpp" + +struct channel_endpoint_t; + +// CHANNEL ABSTRACTION + +// low bit fiddling routines +inline static bool get_lowbit(void *p) { + return (uintptr_t)p & (uintptr_t)1u; +} +inline static void *clear_lowbit(void *p) { + return (void*)((uintptr_t)p & ~(uintptr_t)1u); +} +inline static void *set_lowbit(void *p) { + return (void*)((uintptr_t)p | (uintptr_t)1u); +} + +// channel +struct channel_t { + ::std::atomic_size_t refcnt; + fibre_t *top; + + channel_t () : top (nullptr), refcnt(1) {} + virtual ~channel_t(){ + //::std::cerr << "channel " << this << " destructor" << ::std::endl; + } + + // immobile object + channel_t(channel_t const&)=delete; + channel_t& operator= (channel_t const&)=delete; + + // required for the polymorphic deleter + virtual size_t size() const = 0; + + // push a fibre as a reader: precondition it must be a reader + // and, if the channel is non-empty it must contain only readers + virtual void push_reader(fibre_t *r)=0; + + // push a fibre as a writer: precondition it must be a writer + // and, if the channel is non-empty it must contain only writers + virtual void push_writer(fibre_t *w)=0; + + // pop a reader if there is one, otherwise nullptr + virtual fibre_t *pop_reader()=0; + + // pop a writer if there is one, otherwise nullptr + virtual fibre_t *pop_writer()=0; + + virtual void signal()=0; + + // channel read operation + virtual void read(void **target, fibre_t **pcurrent)=0; + + // channel write operation + virtual void write(void **source, fibre_t **pcurrent)=0; + +protected: + // basic push and pop operations, not thread safe + + void st_push_reader(fibre_t *r) { + r->next = top; + top = r; + } + + void st_push_writer(fibre_t *w) { + w->next = top; + top = (fibre_t*)set_lowbit(w); + } + + fibre_t *st_pop_reader() { + fibre_t *tmp = top; + if(!tmp || get_lowbit(tmp))return nullptr; + top = top -> next; + return tmp; // lowbit is clear, its a reader + } + + fibre_t *st_pop_writer() { + fibre_t *tmp = top; + if(!tmp || !get_lowbit(tmp)) return nullptr; + tmp = (fibre_t*)clear_lowbit(tmp); // lowbit is set for writer + top = tmp -> next; + return tmp; + } + +}; + +struct chan_epref_t { + channel_endpoint_t *endpoint; + chan_epref_t(channel_endpoint_t *p) : endpoint(p) {} // endpoint ctor sets refcnt to 1 initially + + channel_endpoint_t *operator->()const { return endpoint; } + channel_endpoint_t *get() const { return endpoint; } + + chan_epref_t() : endpoint(nullptr) {} + + // rvalue move + chan_epref_t(chan_epref_t &&p) { + if(p.endpoint) { + endpoint = p.endpoint; + p.endpoint = nullptr; + } + else endpoint = nullptr; + } + + // lvalue copy + chan_epref_t(chan_epref_t &p); + + // rvalue assign + void operator= (chan_epref_t &&p) { + if (&p!=this) { // ignore self assign + this->~chan_epref_t(); // destroy target + new(this) chan_epref_t(::std::move(p)); // move source to target + } + } // rass + + // lvalue assign + void operator= (chan_epref_t &p) { + if(&p!=this) { // ignore self assign + this->~chan_epref_t(); // destroy target + new(this) chan_epref_t(p); // copy source to target + } + } // lass + + + // destructor + // uses allocator passed to endpoint on construction to delete it + ~chan_epref_t(); +}; + + +// the allocator used to construct the endpoint +// must be passed to it so it can be used to delete it +// it MUST be the same allocator used to construct the channel +// try to FIXME so it is .. this requires storing the allocator in +// the channel OR ensuring all channels and endpoints are constructed +// using the system allocator +struct channel_endpoint_t { + size_t refcnt; + channel_t *channel; + alloc_ref_t allocator; + channel_endpoint_t(channel_t *p, alloc_ref_t a) : allocator(a), channel(p), refcnt(1) { ++channel->refcnt; } + + // immobile object + channel_endpoint_t(channel_endpoint_t const&)=delete; + channel_endpoint_t& operator= (channel_endpoint_t const&)=delete; + + // create a duplicate of the current endpoint refering + // to the same channel. Returns a chan_epref_t; a shared + // pointer to the new endpoint. Incredrummoyne@corleonemarinas.comments the counter + // of endpoints in the channel. + // note, C++ must construct a single object containing + // both the reference count and the channel endpoint. + + chan_epref_t dup() { + return chan_epref_t(new(allocator) channel_endpoint_t(channel, allocator)); + } + + ~channel_endpoint_t () { +// ::std::cerr << "Channel endpoint " << this << " destructor, channel " << channel << ", refcnt " << channel->refcnt.load() << ::std::endl; + switch (channel->refcnt.load()) { + case 0: break; + case 1: delete_channel(); break; + default: --channel->refcnt; break; + } + } + + void delete_channel() { + ::std::cerr << "Delete channel " << channel << ", refcnt " << channel->refcnt.load() << ::std::endl; + fibre_t *top = channel->top; + channel->top = nullptr; + channel->refcnt.store(0); + while (top) { + fibre_t *f = (fibre_t*)clear_lowbit(top); + fibre_t *tmp = f->next; +// ::std::cerr << "Channel " << channel << " Deletes fibre " << f << ", next=" << tmp << ::std::endl; + delete_concrete_object(f, allocator); + top = tmp; + } + //::std::cerr << "Deleting channel " << channel << " now" << ::std::endl; + delete_csp_polymorphic_object(channel, allocator); + } + +}; + + +// lvalue copy +chan_epref_t::chan_epref_t(chan_epref_t &p) { + if(p.endpoint) { + endpoint = p.endpoint; + p.endpoint->refcnt++; + } + else endpoint = nullptr; +} + +// destructor +// uses allocator passed to endpoint on construction to delete it +chan_epref_t::~chan_epref_t() { + if (endpoint) { + if(endpoint->refcnt == 1) { +//::std::cerr << "Endpoint ref " << this << " deletes endpoint " << endpoint << ::std::endl; + delete_concrete_object(endpoint, endpoint->allocator); + } + else + --endpoint->refcnt; + } +} // dtor + +chan_epref_t acquire_channel(alloc_ref_t a, channel_t *p) { + return chan_epref_t(new(a) channel_endpoint_t(p,a)); +} +#endif +@ + + +@h1 Service Requests +@tangle svc.hpp +#ifndef SVC_HPP +#define SVC_HPP +#include "allocator.hpp" + +struct chan_epref_t; +struct con_t; + +// service requests +// +// Request codes +enum svc_code_t { + read_request_code_e, + write_request_code_e, + spawn_fibre_request_code_e, + spawn_fibre_deferred_request_code_e, + spawn_process_request_code_e, + spawn_cothread_request_code_e +}; + +// synchronous I/O requests +struct io_request_t { + svc_code_t svc_code; + chan_epref_t *chan; + void **pdata; +}; + +// fibre and cothread spawn requests +struct spawn_fibre_request_t { + svc_code_t svc_code; + con_t *tospawn; +}; + +// fibre and cothread spawn requests +struct spawn_process_request_t { + svc_code_t svc_code; + con_t *tospawn; + alloc_ref_t process_allocator; +}; + + +// unified service request type (only used for casts) +union svc_req_t { + io_request_t io_request; + spawn_fibre_request_t spawn_fibre_request; + spawn_process_request_t spawn_process_request; + svc_code_t get_code () const { return io_request.svc_code; } +}; +#endif +@ + +@h1 Sequential Channel +@tangle sequential_channel.hpp +#ifndef SEQUENTIAL_CHANNEL +#define SEQUENTIAL_CHANNEL + +#include "channel.hpp" +#include "svc.hpp" +#include "csp_process.hpp" + +// SINGLE THREADED CHANNEL (no locking) + +// channel +struct sequential_channel_t : channel_t { + + sequential_channel_t () : channel_t() {} + + size_t size() const override { return sizeof(sequential_channel_t); } + void push_reader(fibre_t *r) override { st_push_reader(r); } + void push_writer(fibre_t *w) override { st_push_writer(w); } + fibre_t *pop_reader() override { return st_pop_reader(); } + fibre_t *pop_writer() override { return st_pop_writer(); } + + void signal() override {} + + void read(void **target, fibre_t **pcurrent) override { +//::std::cerr << *pcurrent << " Sequential Channel read begins " << ::std::endl; + fibre_t *current = *pcurrent; + fibre_t *w = st_pop_writer(); +//::std::cerr << *pcurrent << " Sequential Channel read finds writer " << w << ::std::endl; + if(w) { + ++refcnt; + *target = + *w->svc_req->io_request.pdata; // transfer data + w->process->push(w); // onto active list + } + else { + if(refcnt == 1) { + *pcurrent = current->process->pop(); // active list +//::std::cerr << "channel read " << this << " deletes fibre " << current << ::std::endl; + delete_concrete_object(current,current->process->process_allocator); + } else { + --refcnt; + st_push_reader(current); + *pcurrent = current->process->pop(); // active list + } + } + } + + void write(void **source, fibre_t **pcurrent) override { +//::std::cerr << *pcurrent << " Sequential Channel write begins " << ::std::endl; + fibre_t *current = *pcurrent; + fibre_t *r = st_pop_reader(); +//::std::cerr << *pcurrent << " Sequential Channel write finds reader " << r << ::std::endl; + if(r) { + ++refcnt; + *r->svc_req->io_request.pdata = *source; + + if(r->process == current->process) { + current->process->push(current); // current is writer, pushed onto active list + *pcurrent = r; // make reader current + } + else { + r->process->push(r); + } + } + else { + if(refcnt == 1) { + *pcurrent = current->process->pop(); // reset current from active list +//::std::cerr << "channel write " << this << " deletes fibre " << current << ::std::endl; + delete_concrete_object(current,current->process->process_allocator); + } else { + --refcnt; + st_push_writer(current); // i/o fail: push current onto channel + *pcurrent = current->process->pop(); // reset current from active list + } + } + } +}; + +chan_epref_t make_sequential_channel(alloc_ref_t a) { + return acquire_channel(a, new(a) sequential_channel_t); +} + + +void system_t::connect_sequential (chan_epref_t *left, chan_epref_t *right) { + auto chleft= make_sequential_channel(system_allocator); + auto chright= chleft ->dup(); + *left = chleft; + *right= chright; +} +#endif +@ + +@h1 Concurrent Channel +@tangle concurrent_channel.hpp +#ifndef CONCURRENT_CHANNEL +#define CONCURRENT_CHANNEL +#include "allocator.hpp" +#include "system.hpp" +#include "sequential_channel.hpp" + +// CONCURRENT CHANNEL (operations are locked, but no async) + +struct concurrent_channel_t : sequential_channel_t { + ::std::atomic_flag lk; + void lock() { while(lk.test_and_set(::std::memory_order_acquire)); } + void unlock() { lk.clear(::std::memory_order_release); } + + concurrent_channel_t () : lk(false) {} + + size_t size() const override { return sizeof(concurrent_channel_t); } + + void push_reader(fibre_t *r) override { + lock(); + st_push_reader(r); + unlock(); + } + void push_writer(fibre_t *w) override { + lock(); + st_push_writer(w); + unlock(); + } + fibre_t *pop_reader() override { + lock(); + auto r = st_pop_reader(); + unlock(); + return r; + } + fibre_t *pop_writer() override { + lock(); + auto w = st_pop_writer(); + unlock(); + return w; + } + + void read(void **target, fibre_t **pcurrent) override { + fibre_t *current = *pcurrent; + lock(); + fibre_t *w = st_pop_writer(); + if(w) { + ++refcnt; + unlock(); + *target = + *w->svc_req->io_request.pdata; // transfer data + w->process->push(w); // onto active list + } + else { + if(refcnt == 1) { + //::std::cerr << "Concurrent channel read deletes requesting fibre " << current << :: std::endl; + *pcurrent = current->process->pop(); // active list + delete_concrete_object(current,current->process->process_allocator); + } else { + --refcnt; + st_push_reader(current); + unlock(); + *pcurrent = current->process->pop(); // active list + } + } + } + + void write(void **source, fibre_t **pcurrent) override { + fibre_t *current = *pcurrent; + lock(); + fibre_t *r = st_pop_reader(); + if(r) { + ++refcnt; + unlock(); + *r->svc_req->io_request.pdata = *source; + + if(r->process == current->process) { + current->process->push(current); // current is writer, pushed onto active list + *pcurrent = r; // make reader current + } + else { + r->process->push(r); + } + } + else { + if(refcnt == 1) { + //::std::cerr << "Concurrent channel write deletes requesting fibre " << current << :: std::endl; + *pcurrent = current->process->pop(); // reset current from active list + delete_concrete_object(current,current->process->process_allocator); + } else { + --refcnt; + // ::std::cout<< "do_write: fibre " << current << ", set channel "<< chan <<" recnt to " << chan->refcnt << ::std::endl; + st_push_writer(current); // i/o fail: push current onto channel + unlock(); + *pcurrent = current->process->pop(); // reset current from active list + } + } + } + + +}; + +chan_epref_t make_concurrent_channel(alloc_ref_t a) { + return acquire_channel(a, new(a) concurrent_channel_t); +} + +void system_t::connect_concurrent (chan_epref_t *left, chan_epref_t *right) { + auto chleft= make_concurrent_channel(system_allocator); + auto chright= chleft->dup(); + *left = chleft; + *right= chright; +} +#endif +@ + +@h1 Asynchronous channel +@tangle async_channel.hpp +#ifndef ASYNC_CHANNEL +#define ASYNC_CHANNEL +#include "allocator.hpp" +#include "concurrent_channel.hpp" + +// ASYNCHRONOUS CHANNEL +// +// This is like a concurrent channel, except +// (a) it actively notifies possibly sleeping subscribers +// to the condition variable // that the channel has changed state. +// (b) It increments the async count of an active set when a fibre +// of that set is pushed onto the channel +// (c) decrements the async count when a fibre previously on the channel +// is made currect or put onto an active set +// +// NOTE: when a fibre tries to do I/O on a channel and is the +// only holder of an endpoint, the reference count will be 1. +// In this case, it must be deleted because the I/O request can never be +// satisfied. In turn this would decrement the reference count to 0, +// so the channel, and all fibres on it, also need to be deleted. +// Fibres on the channel may hold endpoints to the channel, +// so if the reference count goes to zero no action is taken, +// the channel is going to be deleted anyhow. +// +// There is no point signaling subscribers to the condition variable, +// because the purpose of that is to wake up readers and +// writers that the channel state has changed, in particular, an +// unsatisfied I/O request may have been performed, causing a fibre +// on the channel to now go onto an active set and be available for +// resumption. +// +// It is important to note that external devices such as a clock +// MUST prevent this by holding an endpoint to the channel. +// In particular a clock, for example, is considered active even if +// it is sleeping waiting for an alarm to go off or a request to come in. +// A clock holds a request channel endpoint, even when there are no +// clients. + +struct async_channel_t : concurrent_channel_t { + ::std::condition_variable cv; + ::std::mutex cv_lock; + + void signal() override { cv.notify_all(); } + + size_t size() const override { return sizeof(async_channel_t); } + + async_channel_t () {} + + void read(void **target, fibre_t **pcurrent) override { + fibre_t *current = *pcurrent; + ++current->process->async_count; + lock(); + fibre_t *w = st_pop_writer(); + if(w) { + ++refcnt; + unlock(); + *target = + *w->svc_req->io_request.pdata; // transfer data + w->process->push(w); // onto active list + } + else { + if(refcnt == 1) { +::std::cerr << "Async channel " << this << " read detects refcnt 1" << ::std::endl; + *pcurrent = current->process->pop(); // reset current from active list + delete_concrete_object(current,current->process->process_allocator); + return; // to prevent signalling a deleted channel + } else { + --refcnt; + st_push_reader(current); + unlock(); + *pcurrent = current->process->pop(); // active list + } + } + signal(); + } + + void write(void **source, fibre_t **pcurrent) override { + fibre_t *current = *pcurrent; + ++current->process->async_count; + lock(); + fibre_t *r = st_pop_reader(); + if(r) { + ++refcnt; + unlock(); + *r->svc_req->io_request.pdata = *source; + + if(r->process == current->process) { + current->process->push(current); // current is writer, pushed onto active list + *pcurrent = r; // make reader current + } + else { + r->process->push(r); + } + } + else { + if(refcnt == 1) { +::std::cerr << "Async channel " << this << " write detects refcnt 1" << ::std::endl; + delete_concrete_object(current,current->process->process_allocator); + *pcurrent = current->process->pop(); // reset current from active list + return; // to prevent signalling a deleted channel + } else { + --refcnt; + st_push_writer(current); // i/o fail: push current onto channel + unlock(); + *pcurrent = current->process->pop(); // reset current from active list + } + } + signal(); + } + + +}; + +chan_epref_t make_async_channel(alloc_ref_t a) { + return acquire_channel(a, new async_channel_t); +} +#endif +@ + + +@tangle system.hpp +#ifndef SYSTEM +#define SYSTEM +#include "allocator.hpp" +#include "channel.hpp" + +// For use by the kernel CSP system +struct system_t { + alloc_ref_t system_allocator; + system_t (alloc_ref_t a) : system_allocator(a) {} + + void connect_sequential (chan_epref_t *left, chan_epref_t *right); + void connect_concurrent (chan_epref_t *left, chan_epref_t *right); + void connect_async(chan_epref_t *left, chan_epref_t *right); + +}; +#endif +@ + +@h1 CSP Processes + +@tangle csp_process.hpp +#ifndef CSP_PROCESS +#define CSP_PROCESS +#include "system.hpp" + +// active set +struct csp_process_t { + ::std::atomic_size_t refcnt; + system_t *system; + alloc_ref_t process_allocator; + + fibre_t *active; + ::std::atomic_flag lock; // this one is a spin lock for sync ops + + // an async service which pushes a fibre onto the active + // set also decrements the active count and must + // signal this condition variable to wake up all schedulers + // so they notice the active set is now populated + + ::std::atomic_size_t async_count; + ::std::mutex async_lock; // mutex lock for async ops + ::std::condition_variable async_wake; + + void async_complete() { + //::std::cerr << "Active set: async complete" << ::std::endl; + --async_count; async_wake.notify_all(); + } + + ::std::atomic_size_t running_thread_count; + + csp_process_t(system_t *s, alloc_ref_t a) : + system(s), process_allocator(a), + refcnt(1), active(nullptr), async_count(0), lock(false), running_thread_count(0) + { + // ::std::cerr << "New process" << ::std::endl; + } + + csp_process_t *share() { ++refcnt; return this; } + void forget() { + --refcnt; + if(!atomic_load(&refcnt)) + delete_concrete_object(this,system->system_allocator); + } + + // push a new active fibre onto active list + void push(fibre_t *fresh) { +// ::std::cerr << "Active set push " << fresh << ::std::endl; + while(lock.test_and_set(::std::memory_order_acquire)); // spin + fresh->next = active; + active = fresh; + lock.clear(::std::memory_order_release); // release lock + } + // pop an active fibre off the active list + fibre_t *pop() { +// ::std::cerr << "Active set pop .. " << ::std::endl; + while(lock.test_and_set(::std::memory_order_acquire)); // spin + fibre_t *tmp = active; + if(tmp)active = tmp->next; + lock.clear(::std::memory_order_release); // release lock +// ::std::cerr << "Active set popped .. " << tmp << ::std::endl; + return tmp; + } +}; +#endif +@ + +@h1 CSP threads +@tangle csp_thread.hpp +#ifndef CSP_THREAD +#define CSP_THREAD +#include +// csp_thread_t4.hpp +#include "system.hpp" +#include "svc.hpp" +#include "csp_process.hpp" + +// scheduler +struct csp_thread_t { + csp_process_t *process; // chain of fibres ready to run + + fibre_t *current; // currently running fibre, nullptr if none + ~csp_thread_t() { process->forget(); } + + csp_thread_t(csp_process_t *a) : current(nullptr), process(a) {} + + void sync_run(con_t *); + void do_read(io_request_t *req); + void do_write(io_request_t *req); + void do_spawn_fibre(spawn_fibre_request_t *req); + void do_spawn_fibre_deferred(spawn_fibre_request_t *req); + void do_spawn_process(spawn_process_request_t *req); + void do_spawn_cothread(spawn_fibre_request_t *req); +}; + +extern void csp_run(system_t *system, alloc_ref_t process_allocator, con_t *init) { +::std::cerr << "csp_run start" << ::std::endl; + csp_thread_t (new(system->system_allocator) csp_process_t(system, process_allocator)).sync_run(init); +::std::cerr << "csp_run over " << ::std::endl; +} + +// scheduler subroutine runs until there is no work to do +void csp_thread_t::sync_run(con_t *cc) { + current = new(process->process_allocator) fibre_t(cc, process); + cc->fibre = current; + ++process->running_thread_count; +retry: + while(current) // while there's work to do + { + current->svc_req = nullptr; // null out service request + svc_req_t *svc_req = current->run_fibre(); + if(svc_req) // fibre issued service request + switch (svc_req->get_code()) + { + case read_request_code_e: + do_read(&(svc_req->io_request)); + break; + case write_request_code_e: + do_write(&(svc_req->io_request)); + break; + case spawn_fibre_request_code_e: + do_spawn_fibre(&(svc_req->spawn_fibre_request)); + break; + case spawn_fibre_deferred_request_code_e: + do_spawn_fibre_deferred(&(svc_req->spawn_fibre_request)); + break; + case spawn_process_request_code_e: + do_spawn_process(&(svc_req->spawn_process_request)); + break; + case spawn_cothread_request_code_e: + do_spawn_cothread(&(svc_req->spawn_fibre_request)); + break; + default: + assert(false); + } + else // the fibre returned without issuing a request so should be dead + { + assert(!current->cc); // check it's adead fibre + //::std::cerr << "csp_thread: null continuation in fibre, deleting fibre " << current << ::std::endl; + auto old_current = current; + current = nullptr; + delete_concrete_object(old_current,old_current->process->process_allocator); + current = process->pop(); // get more work + //::std::cerr << "csp_thread: new current fibre " << current << ::std::endl; + } + } + + // decrement running thread count + process->running_thread_count--; + + // Async events can reload the active set, but they do NOT change current +rewait: + // if the async count > 0 we're waiting for the async op to complete + // if the running thread count > 0 we're waiting for other threads to stall +// //::std::cerr << "Scheduler out of fibres: async count = " << process->async_count.load() << ::std::endl; + if(process->async_count.load() > 0 || process->running_thread_count.load() > 0) { + // delay + { +////::std::cerr << "Scheduler sleeping (inf)" << ::std::endl; + ::std::unique_lock<::std::mutex> lk(process->async_lock); + process->async_wake.wait_for(lk,::std::chrono::milliseconds(100000)); + } // lock released now + current = process->pop(); // get more work + if(current) { + process->running_thread_count++; + goto retry; + } + goto rewait; + } + +// //::std::cerr << "Scheduler out of work, returning" << ::std::endl; +} + + +void csp_thread_t::do_read(io_request_t *req) { + req->chan->get()->channel->read(current->svc_req->io_request.pdata, ¤t); +} + +void csp_thread_t::do_write(io_request_t *req) { + req->chan->get()->channel->write(current->svc_req->io_request.pdata, ¤t); +} + + +void csp_thread_t::do_spawn_fibre(spawn_fibre_request_t *req) { +// //::std::cerr << "do spawn" << ::std::endl; + current->svc_req=nullptr; + process->push(current); + con_t *cc= req->tospawn; + current = new(process->process_allocator) fibre_t(cc, process); + cc->fibre = current; +// //::std::cerr << "spawned " << current << ::std::endl; +} + +void csp_thread_t::do_spawn_fibre_deferred(spawn_fibre_request_t *req) { +// //::std::cerr << "do spawn deferred" << ::std::endl; + current->svc_req=nullptr; + con_t *init = req->tospawn; + fibre_t *d = new(process->process_allocator) fibre_t(init, process); + init->fibre = d; + process->push(d); +// //::std::cerr << "spawn deferred " << d << ::std::endl; +} + +static void spawn(csp_process_t *pa, con_t *cc) { + csp_thread_t(pa).sync_run(cc); +} +void csp_thread_t::do_spawn_process(spawn_process_request_t *req) { + csp_process_t *process = new(process->system->system_allocator) csp_process_t(process->system, req->process_allocator); + ::std::thread(spawn,process,req->tospawn).detach(); +} + +void csp_thread_t::do_spawn_cothread(spawn_fibre_request_t *req) { + current->process->refcnt++; + ::std::thread(spawn,current->process,req->tospawn).detach(); +} +#endif +@ + +@h1 CSP: the lot +@tangle csp.hpp +#ifndef CSP +#define CSP +#include + +// forward decls +struct csp_clock_t; +struct fibre_t; +struct channel_t; +struct csp_process_t; +struct con_t; +struct allocator_t; +struct alloc_ref_t; +struct system_t; +struct channel_endpoint_t; +struct chan_epref_t; + +// the csp system +#include "allocator.hpp" +#include "malloc_free.hpp" +//#include "utility_allocators.hpp" +//#include "freelist.hpp" +#include "system_allocator.hpp" +#include "system.hpp" + +#include "con.hpp" +#include "fibre.hpp" + +#include "csp_process.hpp" +#include "svc.hpp" + +con_t *coroutine_t::return_control() +{ +//::std::cerr << "Coroutine " << this << " returns control" << ::std::endl; + delete_csp_polymorphic_object(this,fibre->process->process_allocator); + return nullptr; +} + +con_t *subroutine_t::return_control() { + auto tmp = caller; + delete_csp_polymorphic_object(this,fibre->process->process_allocator); + return tmp; +} + + +// resolve circular reference +fibre_t::~fibre_t() +{ + //::std::cerr << "Fibre destructor " << this << ::std::endl; + while(cc) cc= cc->return_control(); +} + +#include "channel.hpp" +#include "csp_thread.hpp" +#include "sequential_channel.hpp" +#include "concurrent_channel.hpp" +#include "async_channel.hpp" +//#include "clock.hpp" + +#define CSP_SUBRETURN return return_control(); +#define CSP_COSUICIDE return return_control(); + +#define CSP_CALLDEF_START \ + con_t *call(con_t *caller_a + +#define CSP_CALLDEF_MID ){\ + caller = caller_a;\ + pc = 0; + +#define CSP_CALLDEF_END \ + return this;\ +} + +#define CSP_CODEF_START \ + con_t *setup( + +#define CSP_CODEF_MID ){\ + pc = 0; + +#define CSP_CODEF_END \ + return this;\ +} +#define CSP_RESUME_START\ + con_t *resume() override {\ + switch(pc++){\ + case 0: + +#define CSP_RESUME_END\ + default: assert(false);\ + }} + +#define CSP_SIZE \ + size_t size() const override { return sizeof(*this); } + +#define SVC_READ_REQ(xpreq,xpchan,xpdata)\ + (xpreq)->svc_code = read_request_code_e;\ + (xpreq)->pdata = (void**)xpdata;\ + (xpreq)->chan = xpchan; + +#define SVC_WRITE_REQ(xpreq,xpchan,xpdata)\ + (xpreq)->svc_code = write_request_code_e;\ + (xpreq)->pdata = (void**)xpdata;\ + (xpreq)->chan = xpchan; + +#define SVC_ASYNC_WRITE_REQ(xpreq,xpchan,xpdata)\ + (xpreq)->svc_code = async_write_request_code_e;\ + (xpreq)->pdata = (void**)xpdata;\ + (xpreq)->chan = xpchan; + +#define SVC_SPAWN_FIBRE_REQ(xpreq,xcont)\ + (xpreq)->svc_code = spawn_fibre_request_code_e;\ + (xpreq)->tospawn = xcont; + +#define SVC_SPAWN_FIBRE_DEFERRED_REQ(xpreq,xcont)\ + (xpreq)->svc_code = spawn_fibre_deferred_request_code_e;\ + (xpreq)->tospawn = xcont; + +#define SVC_SPAWN_PTHREAD_REQ(xprec,xcont)\ + (xpreq)->spawn_pthread_request_code_e;\ + (xpreq)->tospawn = xcont; + +#define SVC(preq)\ + fibre->svc_req = (svc_req_t*)(void*)preq;\ + return this; + +#define CSP_GOTO(caseno)\ + pc = caseno;\ + return this; + +/* +#define CSP_CALL_DIRECT0(procedure)\ + return (new procedure(global))->call(this); + +#define CSP_CALL_DIRECT1(procedure,arg)\ + return (new procedure(global))->call(this,arg); + +#define CSP_CALL_DIRECT2(procedure,arg1,arg2)\ + return (new procedure(global))->call(this,arg1,arg2); + + +#define CSP_CALL_DIRECT3(procedure,arg1,arg2,arg3)\ + return (new procedure(global))->call(this,arg1,arg2,arg3); +*/ +#endif +@ + +