From d2d4df4d756e22be246eac492ef2c2c55ad4dc6e Mon Sep 17 00:00:00 2001 From: skaller Date: Thu, 21 Mar 2024 05:58:33 +1100 Subject: [PATCH] Fix bug in channel refcount --- src/packages/rt-alloc.fdoc | 112 ++++++++++- src/packages/rt-channel.fdoc | 351 ++++++++++++++++++++++++++++++----- 2 files changed, 407 insertions(+), 56 deletions(-) diff --git a/src/packages/rt-alloc.fdoc b/src/packages/rt-alloc.fdoc index 956faafdf..c56e91997 100644 --- a/src/packages/rt-alloc.fdoc +++ b/src/packages/rt-alloc.fdoc @@ -6,7 +6,12 @@ @tangler ts_allocator.hpp = share/lib/rtl/rt/ts_allocator.hpp @tangler ring_allocator.hpp = share/lib/rtl/rt/ring_allocator.hpp @tangler system_allocator.hpp = share/lib/rtl/rt/system_allocator.hpp +@tangler statistics_allocator.hpp = share/lib/rtl/rt/statistics_allocator.hpp + + @tangler test01.cxx = $PWD/test01.cxx + + @h1 Real Time Allocators In programming systems including C and C++, user space allocation is usually handled by a universal allocator such as @{malloc()}. The operation time and memory consumption @@ -415,8 +420,8 @@ struct static_bump_allocator_t : public allocator_t { size_t size()const override { return sizeof(*this); } // factory function - static alloc_ref_t create(void *p) { - return allocator_t::create (new static_bump_allocator_t (p)); + static alloc_ref_t create(void *parent) { + return allocator_t::create (new(parent) static_bump_allocator_t (parent)); } }; @ @@ -446,7 +451,7 @@ struct dynamic_bump_allocator_t : public allocator_t { // factory function static alloc_ref_t create(alloc_ref_t parent, size_t n) { - return allocator_t::create( new dynamic_bump_allocator_t(parent, n) ); + return allocator_t::create( new(parent) dynamic_bump_allocator_t(parent, n) ); } }; #endif @@ -459,6 +464,8 @@ However, deallocations put the argument block into a freelist which can be used to service a subsequent allocation request. The simplest variant is a bump allocator with an added freelist. +BUG: this won't compile because there's no parent. +The current situation is that if there's no parent the C++ new is used. @tangle block.hpp #ifndef BLOCK @@ -492,7 +499,7 @@ struct static_block_allocator_t : public allocator_t { // factory function static alloc_ref_t create(void *q, size_t n) { - return allocator_t::create( new static_block_allocator_t(q, n)); + return allocator_t::create( new(parent) static_block_allocator_t(q, n)); } }; #endif @@ -534,7 +541,7 @@ struct ts_allocator_t : public allocator_t { // factory function static alloc_ref_t create(alloc_ref_t parent, alloc_ref_t delegate) { - return allocator_t::create( new ts_allocator_t(parent, delegate)); + return allocator_t::create( new(parent) ts_allocator_t(parent, delegate)); } }; @@ -696,7 +703,7 @@ struct ring_buffer_t : public allocator_t { // factory function static alloc_ref_t create(alloc_ref_t parent, mem_req_t req) { - return allocator_t::create (new ring_buffer_t (parent, req)); + return allocator_t::create (new(parent) ring_buffer_t (parent, req)); } }; #endif @@ -709,6 +716,8 @@ This is the main allocator. #ifndef SYSTEM_ALLOCATOR #define SYSTEM_ALLOCATOR #include +#include + #include "ring_allocator.hpp" // FIXME: WARNING: the system allocator MUST BE CONSTRUCTED AT STARTUP @@ -744,7 +753,7 @@ struct system_allocator_t : public allocator_t { // factory function static alloc_ref_t create(alloc_ref_t parent, ::std::vector reqs) { - return allocator_t::create (new system_allocator_t (parent, reqs)); + return allocator_t::create (new(parent) system_allocator_t (parent, reqs)); } }; @@ -753,9 +762,13 @@ struct system_allocator_t : public allocator_t { ::std::vector fixup (::std::vector input) { ::std::vector output; for (auto req : input) { +//::std::cout << "Handling req " << req.block_size << ::std::endl; for( int idx = 0; idx <= output.size(); ++idx) { // past last element - if(idx == output.size()) output.push_back(req); + if(idx == output.size()) { + output.push_back(req); + break; + } // found equal so add to block count else if(req.block_size == output[idx].block_size) { @@ -770,10 +783,87 @@ struct system_allocator_t : public allocator_t { } } } +::std::cout << "DONE" << ::std::endl; + ::std::reverse(output.begin(), output.end()); return output; } +#endif +@ +@h1 Statictics Allocator +A delegating allocator that gathers statistics for the system allocator + +// saves statistics to a file +// stats saved: max allocation at any one time for each block size + + +@tangle statistics_allocator.hpp +#ifndef STATISTICS_ALLOCATOR +#define STATISTICS_ALLOCATOR + +#include +#include +#include +#include +#include "allocator.hpp" + +struct statistics_allocator_t : public allocator_t { + alloc_ref_t delegate; + + char const *filename; + using stat_t = ::std::map >; + using stat_rec_t = stat_t::value_type; + stat_t stats; + + statistics_allocator_t(alloc_ref_t parent, alloc_ref_t delegat, char const *tg ) : + filename(tg), allocator_t(parent), delegate(delegat) + { ::std::cout << "Statistics to " << tg << ::std::endl; } + + virtual size_t size()const override { return sizeof(*this); } + + void *allocate(size_t n) override { + auto p = delegate.allocate(n); + auto loc = stats.find(n); + if(loc == stats.end()) { + stat_rec_t rec = {n, {1, 1}}; + stats.insert(rec); + } + else { + auto rec = *loc; + rec.second.first++; + if(rec.second.first> rec.second.second) rec.second.second = rec.second.first; // max allocated + stats[n] = rec.second; + } + return p; + } + void deallocate(void *p, size_t n) override { + delegate.deallocate(p,n); + auto loc = stats.find(n); + if(loc == stats.end()) { + ::std::cerr << "Deallocate block of size " << n << " but that size block has never been allocated" << ::std::endl; + //::std::abort(); + return; + } + auto rec = *loc; + rec.second.first--; + if(rec.second.first> rec.second.second) rec.second.second = rec.second.first; // max allocated + stats[n] = rec.second; + } + + ~statistics_allocator_t() override { + auto outfile = ::std::fopen(filename,"w"); + ::std::cerr << "Stats written to " << filename << ::std::endl; + for (auto rec : stats) + ::std::fprintf(outfile, "%8lu: %8lu\n",rec.first, rec.second.second); + ::std::fclose(outfile); + } + + static alloc_ref_t create(alloc_ref_t parent, alloc_ref_t delegate, char const *filename) { + return allocator_t::create (new statistics_allocator_t (parent, delegate, filename)); + } + +}; #endif @ @@ -789,6 +879,7 @@ using namespace std; #include "ts_allocator.hpp" #include "ring_allocator.hpp" #include "system_allocator.hpp" +#include "statistics_allocator.hpp" int main () { cout << "Hello World" << endl; @@ -805,11 +896,12 @@ int main () { mem_req_t{128, 10}, mem_req_t{256, 10} }; - auto a6 = system_allocator_t(a1, config); + auto a6 = system_allocator_t::create(a1, config); + auto a7 = statistics_allocator_t::create(a1,a6,"stats.txt"); for (int z : { 18, 43, 75 }) { cout << "Request size " << z << endl; for(int i = 0; i < 6; ++i) { - void *p = a6.allocate (z); + void *p = a7.allocate (z); cout << "allocation " << i << " -> " << p << endl; } } diff --git a/src/packages/rt-channel.fdoc b/src/packages/rt-channel.fdoc index 059e2e221..2c425b350 100644 --- a/src/packages/rt-channel.fdoc +++ b/src/packages/rt-channel.fdoc @@ -11,6 +11,8 @@ @tangler fibre.hpp = share/lib/rtl/rt/fibre.hpp @tangler system.hpp = share/lib/rtl/rt/system.hpp +@tangler csp_01.cxx = $PWD/csp_01.cxx + @title Channel Abstraction @h1 Continuations @@ -229,7 +231,7 @@ struct channel_t { ::std::atomic_size_t refcnt; fibre_t *top; - channel_t () : top (nullptr), refcnt(1) {} + channel_t () : top (nullptr), refcnt(0) {} virtual ~channel_t(){ //::std::cerr << "channel " << this << " destructor" << ::std::endl; } @@ -241,20 +243,6 @@ struct channel_t { // required for the polymorphic deleter virtual size_t size() const = 0; - // push a fibre as a reader: precondition it must be a reader - // and, if the channel is non-empty it must contain only readers - virtual void push_reader(fibre_t *r)=0; - - // push a fibre as a writer: precondition it must be a writer - // and, if the channel is non-empty it must contain only writers - virtual void push_writer(fibre_t *w)=0; - - // pop a reader if there is one, otherwise nullptr - virtual fibre_t *pop_reader()=0; - - // pop a writer if there is one, otherwise nullptr - virtual fibre_t *pop_writer()=0; - virtual void signal()=0; // channel read operation @@ -365,7 +353,7 @@ struct channel_endpoint_t { } ~channel_endpoint_t () { -// ::std::cerr << "Channel endpoint " << this << " destructor, channel " << channel << ", refcnt " << channel->refcnt.load() << ::std::endl; + //::std::cerr << "Channel endpoint " << this << " destructor, channel " << channel << ", refcnt " << channel->refcnt.load() << ::std::endl; switch (channel->refcnt.load()) { case 0: break; case 1: delete_channel(); break; @@ -374,7 +362,7 @@ struct channel_endpoint_t { } void delete_channel() { - ::std::cerr << "Delete channel " << channel << ", refcnt " << channel->refcnt.load() << ::std::endl; + //::std::cerr << "Delete channel " << channel << ", refcnt " << channel->refcnt.load() << ::std::endl; fibre_t *top = channel->top; channel->top = nullptr; channel->refcnt.store(0); @@ -490,11 +478,6 @@ struct sequential_channel_t : channel_t { sequential_channel_t () : channel_t() {} size_t size() const override { return sizeof(sequential_channel_t); } - void push_reader(fibre_t *r) override { st_push_reader(r); } - void push_writer(fibre_t *w) override { st_push_writer(w); } - fibre_t *pop_reader() override { return st_pop_reader(); } - fibre_t *pop_writer() override { return st_pop_writer(); } - void signal() override {} void read(void **target, fibre_t **pcurrent) override { @@ -585,30 +568,8 @@ struct concurrent_channel_t : sequential_channel_t { size_t size() const override { return sizeof(concurrent_channel_t); } - void push_reader(fibre_t *r) override { - lock(); - st_push_reader(r); - unlock(); - } - void push_writer(fibre_t *w) override { - lock(); - st_push_writer(w); - unlock(); - } - fibre_t *pop_reader() override { - lock(); - auto r = st_pop_reader(); - unlock(); - return r; - } - fibre_t *pop_writer() override { - lock(); - auto w = st_pop_writer(); - unlock(); - return w; - } - void read(void **target, fibre_t **pcurrent) override { +//::std::cout << "read on " << this << ::std::endl; fibre_t *current = *pcurrent; lock(); fibre_t *w = st_pop_writer(); @@ -626,6 +587,7 @@ struct concurrent_channel_t : sequential_channel_t { delete_concrete_object(current,current->process->process_allocator); } else { --refcnt; + //::std::cout<< "do_read: fibre " << current << ", set channel "<< this <<" recnt to " << refcnt << ::std::endl; st_push_reader(current); unlock(); *pcurrent = current->process->pop(); // active list @@ -634,6 +596,7 @@ struct concurrent_channel_t : sequential_channel_t { } void write(void **source, fibre_t **pcurrent) override { +//::std::cout << "write on " << this << ::std::endl; fibre_t *current = *pcurrent; lock(); fibre_t *r = st_pop_reader(); @@ -657,7 +620,7 @@ struct concurrent_channel_t : sequential_channel_t { delete_concrete_object(current,current->process->process_allocator); } else { --refcnt; - // ::std::cout<< "do_write: fibre " << current << ", set channel "<< chan <<" recnt to " << chan->refcnt << ::std::endl; + //::std::cout<< "do_write: fibre " << current << ", set channel "<< this <<" recnt to " << refcnt << ::std::endl; st_push_writer(current); // i/o fail: push current onto channel unlock(); *pcurrent = current->process->pop(); // reset current from active list @@ -1188,4 +1151,300 @@ fibre_t::~fibre_t() #endif @ +@h1 Simple test case 1 +@tangle csp_01.cxx +#include +#include +#include + + +#include "csp.hpp" +#include "statistics_allocator.hpp" + +//#include "chips.hpp" + +// TEST CASE + +struct hello : coroutine_t { + + CSP_RESUME_START + ::std::cerr << "Hello World" << ::std::endl; + CSP_COSUICIDE + CSP_RESUME_END + CSP_SIZE +}; + + +struct producer : coroutine_t { + ::std::list *plst; + ::std::list::iterator it; + chan_epref_t out; + union { + void *iodata; + int value; + }; + io_request_t w_req; + + ~producer() { } + + CSP_CODEF_START + ::std::list *plst_a, + chan_epref_t outchan_a + CSP_CODEF_MID + plst = plst_a; + out = outchan_a; + CSP_CODEF_END + + CSP_RESUME_START + it = plst->begin(); + SVC_WRITE_REQ(&w_req,&out,&iodata); + + case 1: + if(it == plst->end()) { + CSP_COSUICIDE + } + value = *it++; + pc = 1; + SVC(&w_req) + + CSP_RESUME_END + CSP_SIZE +}; + +struct consumer: coroutine_t { + ::std::list *plst; + union { + void *iodata; + int value; + }; + io_request_t r_req; + chan_epref_t inp; + + ~consumer() {} + + CSP_CODEF_START + ::std::list *plst_a, + chan_epref_t inchan_a + CSP_CODEF_MID + plst = plst_a; + inp = inchan_a; + CSP_CODEF_END + + CSP_RESUME_START + SVC_READ_REQ(&r_req,&inp,&iodata) + + case 1: + SVC(&r_req) + + case 2: + ::std::cerr << "Consumer gets " << value << ::std::endl; + plst->push_back(value); + CSP_GOTO(1) + + CSP_RESUME_END + CSP_SIZE +}; + +struct square : subroutine_t { + int inp; + int *pout; + + square(fibre_t *f) : subroutine_t(f) {} + + CSP_CALLDEF_START, + int *pout_a, + int inp_a + CSP_CALLDEF_MID + pout = pout_a; + inp = inp_a; + CSP_CALLDEF_END + + CSP_RESUME_START + *pout = inp * inp; + { + con_t *tmp = caller; + delete_concrete_object(this, fibre->process->process_allocator); + return tmp; + } + CSP_RESUME_END + CSP_SIZE +}; + + +struct transducer: coroutine_t { + union { + void *iodata; + int value; + }; + io_request_t r_req; + io_request_t w_req; + chan_epref_t inp; + chan_epref_t out; + + ~transducer() {} + + CSP_CODEF_START + chan_epref_t inchan_a, + chan_epref_t outchan_a + CSP_CODEF_MID + inp = inchan_a; + out = outchan_a; + CSP_CODEF_END + + CSP_RESUME_START + SVC_READ_REQ(&r_req,&inp,&iodata) + SVC_WRITE_REQ(&w_req,&out,&iodata) + + case 1: + SVC(&r_req) + + case 2: + //CSP_CALL_DIRECT2(square,&value,value) + return (new(fibre->process->process_allocator) square(fibre))->call(this,&value,value); + + case 3: + pc = 1; + SVC(&w_req) + + CSP_RESUME_END + CSP_SIZE +}; + + +struct init: coroutine_t { + ::std::list *inlst; + ::std::list *outlst; + spawn_fibre_request_t spawn_req; + chan_epref_t ch1out; + chan_epref_t ch1inp; + chan_epref_t ch2out; + chan_epref_t ch2inp; + chan_epref_t clock_connection; + io_request_t clock_req; + double waituntil; + double *pwaituntil; + ::std::shared_ptr clock; + + ~init() {} + + // store parameters in + CSP_CODEF_START + ::std::list *lin, + ::std::list *lout + CSP_CODEF_MID + inlst = lin; + outlst = lout; + CSP_CODEF_END + + CSP_RESUME_START + ch1out = make_concurrent_channel(fibre->process->system->system_allocator); + ch1inp = ch1out->dup(); + ch2out = make_concurrent_channel(fibre->process->system->system_allocator); + ch2inp = ch2out->dup(); + + SVC_SPAWN_FIBRE_DEFERRED_REQ(&spawn_req, (new(fibre->process->process_allocator) producer)->setup(inlst, ch1out)) + SVC(&spawn_req) + + case 1: + SVC_SPAWN_FIBRE_DEFERRED_REQ(&spawn_req, (new(fibre->process->process_allocator) transducer)->setup(ch1inp, ch2out)) + SVC(&spawn_req) + + case 2: + SVC_SPAWN_FIBRE_DEFERRED_REQ(&spawn_req, (new(fibre->process->process_allocator) consumer)->setup(outlst,ch2inp)) + SVC(&spawn_req) + + case 3: + SVC_SPAWN_FIBRE_DEFERRED_REQ(&spawn_req, (new(fibre->process->process_allocator) hello)) + SVC(&spawn_req) + case 4: +/* + { + clock = make_clock(fibre->process->system); + clock->start(); + //::std::cerr << "Clock started, time is " << clock->now() << ::std::endl; + clock_connection = clock->connect(); + //::std::cerr << "Got connection" << ::std::endl; + { + double rightnow = clock->now(); + waituntil = rightnow + 12.10; + //::std::cerr << ::std::fixed << "Wait until" << waituntil << " for " << waituntil - rightnow << " seconds" << ::std::endl; + } + //::std::cerr << "Alarm time " << waituntil << ", stored at " << &waituntil + // << "=" << pwaituntil << " which is stored at " << &pwaituntil << ::std::endl; + pwaituntil = &waituntil; + SVC_WRITE_REQ(&clock_req,&clock_connection,&pwaituntil); + ::std::cerr<<"****** INIT Sleeping ********" << ::std::endl; + } +// too early to signal +// need different kind of channel w. cv + SVC(&clock_req) +// too late to signal + case 5: + // if this doesn't print, we didn't resume after the sleep correctly +*/ + ::std::cerr<<"****** INIT Sleep Over ********" << ::std::endl; + CSP_COSUICIDE + + + CSP_RESUME_END + CSP_SIZE +}; // init class + +#include + +int main() { + // create the input list + ::std::list inlst; + for (auto i = 0; i < 20; ++i) inlst.push_back(i); + + // empty output list + ::std::list outlst; + + { + ::std::vector reqs; + reqs.push_back(mem_req_t {sizeof(hello),50}); + reqs.push_back(mem_req_t {sizeof(producer),50}); + reqs.push_back(mem_req_t {sizeof(transducer),50}); + reqs.push_back(mem_req_t {sizeof(consumer),50}); + reqs.push_back(mem_req_t {sizeof(init),50}); + reqs.push_back(mem_req_t {sizeof(square),50}); + reqs.push_back(mem_req_t {256,50}); + reqs.push_back(mem_req_t {512,50}); + + ::std::cout << "N reqs = " << reqs.size() << ::std::endl; + auto fixed_reqs = fixup(reqs); + ::std::cout << "fixed up N reqs = " << fixed_reqs.size() << ::std::endl; + for(auto req : fixed_reqs) ::std::cout << req.block_size << " x " << req.n_blocks << ::std::endl; + // bootstrap allocator + alloc_ref_t malloc_free = malloc_free_allocator_t::create(); // parent C++ allocator +/* + //alloc_ref_t malloc_free_debugger = new(malloc_free) debugging_allocator_t(malloc_free, malloc_free, "Malloc"); + + + // system allocator + alloc_ref_t system_allocator_delegate = new(malloc_free_debugger) system_allocator_t(malloc_free_debugger,malloc_free_debugger, reqs); + alloc_ref_t system_allocator_debugger = new(malloc_free_debugger) debugging_allocator_t(malloc_free_debugger, system_allocator_delegate, "Sys"); + alloc_ref_t system_allocator = new(malloc_free_debugger) statistics_allocator_t( malloc_free_debugger, system_allocator_debugger, "Sysalloc.stats.txt"); + + // initial process will use the system allocator + alloc_ref_t process_allocator = system_allocator; +*/ + alloc_ref_t x = system_allocator_t::create(malloc_free, fixed_reqs); + alloc_ref_t system_allocator = statistics_allocator_t::create(malloc_free, x, "xstats.txt"); + alloc_ref_t process_allocator = system_allocator; + + // creates the clock too + system_t *system = new system_t(system_allocator); +::std::cout << "STARTING" << ::std::endl; + csp_run(system, process_allocator, (new(process_allocator) init)->setup(&inlst, &outlst)); +::std::cerr << "RUN COMPLETE" << ::std::endl; + delete system; + } + + // the result is now in the outlist so print it + ::std::cerr<< "main: +++++++++ List of squares:" << ::std::endl; + for(auto v : outlst) ::std::cerr << v << ::std::endl; + ::std::cerr<< "main: +++++++++ Done" << ::std::endl; +} +@