Skip to content

Commit

Permalink
0.0.6: fork, spawn
Browse files Browse the repository at this point in the history
  • Loading branch information
disruptek committed Jul 24, 2020
1 parent 5501bb4 commit 157c184
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 19 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Key primitives and their implementation status, in order of priority:
- [x] signal
- [x] wait
- [ ] I/O
- [ ] fork
- [x] fork
- [ ] thread

Windows is not supported by the included dispatcher yet due to the lack of
Expand Down
1 change: 1 addition & 0 deletions cps.nim
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type
Io
Attach
Detach
Fork

NodeFilter = proc(n: NimNode): NimNode

Expand Down
2 changes: 1 addition & 1 deletion cps.nimble
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = "0.0.5"
version = "0.0.6"
author = "disruptek"
description = "continuation-passing style"
license = "MIT"
Expand Down
57 changes: 45 additions & 12 deletions cps/eventqueue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@ const
cpsPoolSize {.intdefine.} = 64 ## expected pending continuations

type
Id = distinct int

State = enum
Unready = "the default state, pre-initialized"
Stopped = "we are outside an event loop but available for queuing events"
Running = "we're in a loop polling for events and running continuations"
Stopping = "we're tearing down the dispatcher and it will shortly stop"

Clock = MonoTime
Id = distinct int
Fd = distinct int

WaitingIds = seq[Id]
PendingIds = Table[Semaphore, Id]

EventQueue = object
state: State ## dispatcher readiness
pending: PendingIds ## maps pending semaphores to Ids
Expand All @@ -39,6 +39,7 @@ type
lastId: Id ## id of last-issued registration
selector: Selector[Id] ## watches selectable stuff
yields: Deque[Cont] ## continuations ready to run
waiters: int ## a count of selector listeners

manager: Selector[Clock] ## monitor polling, wake-ups
timer: Fd ## file-descriptor of polling timer
Expand All @@ -53,9 +54,9 @@ type
fd: Fd ## our last file-descriptor

const
wakeupId = Id(-1)
invalidId = Id(0)
invalidFd = Fd(-1)
wakeupId = Id(-1)
bogusIds = wakeupId .. invalidId
oneMs = initDuration(milliseconds = 1)

Expand All @@ -75,12 +76,24 @@ proc `[]=`(w: var WaitingIds; fd: int | Fd; id: Id) =
while fd.int > len(w):
setLen(w, len(w) * 2)
system.`[]=`(w, fd.int, id)
case id
of wakeupId, invalidId: # don't count invalid ids
discard
else:
inc eq.waiters

proc pop(w: var WaitingIds; fd: int | Fd): Id =
result = w[fd.int]
if result != wakeupId: # don't zap our wakeup id
if result != wakeupId: # don't zap our wakeup id
if result != invalidId: # don't count invalid ids
dec eq.waiters
w[fd.int] = invalidId

method clone[T](c: T): T =
## copy the continuation for the purposes of, eg. fork
result = new T
result[] = c[]

proc init() {.inline.} =
## initialize the event queue to prepare it for requests
if eq.state == Unready:
Expand All @@ -89,6 +102,7 @@ proc init() {.inline.} =
eq.manager = newSelector[Clock]()
eq.wake = newSelectEvent()
eq.selector = newSelector[Id]()
eq.waiters = 0

# make sure we have a decent amount of space for registrations
if len(eq.waiting) < cpsPoolSize:
Expand All @@ -103,7 +117,7 @@ proc init() {.inline.} =
# XXX: this seems to be the only reasonable way to get our wakeup fd
# we want to get the fd used for the wakeup event
trigger eq.wake
for ready in eq.selector.select(-1):
for ready in select(eq.selector, -1):
assert User in ready.events
eq.waiting[ready.fd] = wakeupId

Expand All @@ -114,7 +128,14 @@ proc init() {.inline.} =
proc nextId(): Id {.inline.} =
## generate a new registration identifier
init()
inc eq.lastId
# rollover is pretty unlikely, right?
when sizeof(eq.lastId) < 8:
if (unlikely) eq.lastId == high(eq.lastId):
eq.lastId = succ(invalidId)
else:
inc eq.lastId
else:
inc eq.lastId
result = eq.lastId

proc newSemaphore*(): Semaphore =
Expand Down Expand Up @@ -210,7 +231,7 @@ proc poll*() =
## See what continuations need running and run them.
if eq.state != Running: return

if len(eq) > 0:
if eq.waiters > 0:
when cpsDebug:
let clock = now()
let ready = select(eq.selector, -1)
Expand Down Expand Up @@ -239,16 +260,16 @@ proc poll*() =
else:
raise newException(KeyError, "missing registration " & $id)

if len(eq.yields) > 0:
# at this point, we've handled all timers and i/o so we can simply
# iterate over the yields and run them. to make sure we don't run
# any newly-added yields in this poll, we'll process no more than
# the current number of queued yields...
# iterate over the yields and run them. to make sure we don't run any
# newly-added continuations in this poll, we'll process no more than
# the current number of queued continuations...

for index in 1 .. len(eq.yields):
let cont = popFirst eq.yields
trampoline cont

elif eq.timer == invalidFd:
elif eq.timer == invalidFd and len(eq) == 0:
# if there's no timer and we have no pending continuations,
stop()
else:
Expand Down Expand Up @@ -360,3 +381,15 @@ proc cpsWait*(s: var Semaphore): Cont {.cpsMagic.} =
else:
eq[s] = id
eq[id] = c

proc cpsFork*(): Cont {.cpsMagic.} =
## Duplicate the current continuation.
result = c
wakeAfter:
addLast(eq.yields, clone(c))

proc cpsSpawn*(c: Cont) =
## Queue the supplied continuation `c`; control remains in the calling
## procedure.
wakeAfter:
addLast(eq.yields, c)
2 changes: 1 addition & 1 deletion docs/cps.html
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ <h1><a class="toc-backref" href="#17">Macros</a></h1>
<div class="twelve-columns footer">
<span class="nim-sprite"></span>
<br/>
<small style="color: var(--hint);">Made with Nim. Generated: 2020-07-24 01:25:16 UTC</small>
<small style="color: var(--hint);">Made with Nim. Generated: 2020-07-24 02:42:02 UTC</small>
</div>
</div>
</div>
Expand Down
24 changes: 21 additions & 3 deletions docs/cps/eventqueue.html
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ <h1 class="title">eventqueue</h1>
title="cpsSignalAll(s: var Semaphore)"><wbr />cps<wbr />Signal<wbr />All<span class="attachedType"></span></a></li>
<li><a class="reference" href="#cpsWait%2CSemaphore"
title="cpsWait(s: var Semaphore)"><wbr />cps<wbr />Wait<span class="attachedType"></span></a></li>
<li><a class="reference" href="#cpsFork"
title="cpsFork()"><wbr />cps<wbr />Fork<span class="attachedType"></span></a></li>
<li><a class="reference" href="#cpsSpawn%2CCont"
title="cpsSpawn(c: Cont)"><wbr />cps<wbr />Spawn<span class="attachedType">Cont</span></a></li>

</ul>
</li>
Expand Down Expand Up @@ -269,14 +273,14 @@ <h1><a class="toc-backref" href="#12">Procs</a></h1>
<dt><pre><span class="Keyword">proc</span> <a href="#cpsSignal%2CSemaphore"><span class="Identifier">cpsSignal</span></a><span class="Other">(</span><span class="Identifier">s</span><span class="Other">:</span> <span class="Keyword">var</span> <a href="semaphore.html#Semaphore"><span class="Identifier">Semaphore</span></a><span class="Other">)</span> <span><span class="Other">{</span><span class="Other pragmadots">...</span><span class="Other">}</span></span><span class="pragmawrap"><span class="Other">{.</span><span class="pragma"><span class="Identifier">raises</span><span class="Other">:</span> <span class="Other">[</span><span class="Other">]</span><span class="Other">,</span> <span class="Identifier">tags</span><span class="Other">:</span> <span class="Other">[</span><span class="Other">]</span></span><span class="Other">.}</span></span></pre></dt>
<dd>

Signal the given semaphore, causing the first waiting continuation to be queued for execution in the dispatcher; control remains in the calling procedure.
Signal the given Semaphore <tt class="docutils literal"><span class="pre">s</span></tt>, causing the first waiting continuation to be queued for execution in the dispatcher; control remains in the calling procedure.

</dd>
<a id="cpsSignalAll,Semaphore"></a>
<dt><pre><span class="Keyword">proc</span> <a href="#cpsSignalAll%2CSemaphore"><span class="Identifier">cpsSignalAll</span></a><span class="Other">(</span><span class="Identifier">s</span><span class="Other">:</span> <span class="Keyword">var</span> <a href="semaphore.html#Semaphore"><span class="Identifier">Semaphore</span></a><span class="Other">)</span> <span><span class="Other">{</span><span class="Other pragmadots">...</span><span class="Other">}</span></span><span class="pragmawrap"><span class="Other">{.</span><span class="pragma"><span class="Identifier">raises</span><span class="Other">:</span> <span class="Other">[</span><span class="Other">]</span><span class="Other">,</span> <span class="Identifier">tags</span><span class="Other">:</span> <span class="Other">[</span><span class="Other">]</span></span><span class="Other">.}</span></span></pre></dt>
<dd>

Signal the given semaphore, causing all waiting continuations to be queued for execution in the dispatcher; control remains in the calling procedure.
Signal the given Semaphore <tt class="docutils literal"><span class="pre">s</span></tt>, causing all waiting continuations to be queued for execution in the dispatcher; control remains in the calling procedure.

</dd>
<a id="cpsWait,Semaphore"></a>
Expand All @@ -285,6 +289,20 @@ <h1><a class="toc-backref" href="#12">Procs</a></h1>

Queue the current continuation pending readiness of the given Semaphore <tt class="docutils literal"><span class="pre">s</span></tt>.

</dd>
<a id="cpsFork"></a>
<dt><pre><span class="Keyword">proc</span> <a href="#cpsFork"><span class="Identifier">cpsFork</span></a><span class="Other">(</span><span class="Other">)</span> <span><span class="Other">{</span><span class="Other pragmadots">...</span><span class="Other">}</span></span><span class="pragmawrap"><span class="Other">{.</span><span class="pragma"><span class="Identifier">raises</span><span class="Other">:</span> <span class="Other">[</span><span class="Other">]</span><span class="Other">,</span> <span class="Identifier">tags</span><span class="Other">:</span> <span class="Other">[</span><span class="Other">]</span></span><span class="Other">.}</span></span></pre></dt>
<dd>

Duplicate the current continuation.

</dd>
<a id="cpsSpawn,Cont"></a>
<dt><pre><span class="Keyword">proc</span> <a href="#cpsSpawn%2CCont"><span class="Identifier">cpsSpawn</span></a><span class="Other">(</span><span class="Identifier">c</span><span class="Other">:</span> <a href="eventqueue.html#Cont"><span class="Identifier">Cont</span></a><span class="Other">)</span> <span><span class="Other">{</span><span class="Other pragmadots">...</span><span class="Other">}</span></span><span class="pragmawrap"><span class="Other">{.</span><span class="pragma"><span class="Identifier">raises</span><span class="Other">:</span> <span class="Other">[</span><span class="Other">]</span><span class="Other">,</span> <span class="Identifier">tags</span><span class="Other">:</span> <span class="Other">[</span><span class="Identifier">TimeEffect</span><span class="Other">]</span></span><span class="Other">.}</span></span></pre></dt>
<dd>

Queue the supplied continuation <tt class="docutils literal"><span class="pre">c</span></tt>; control remains in the calling procedure.

</dd>

</dl></div>
Expand All @@ -301,7 +319,7 @@ <h1><a class="toc-backref" href="#19">Exports</a></h1>
<div class="twelve-columns footer">
<span class="nim-sprite"></span>
<br/>
<small style="color: var(--hint);">Made with Nim. Generated: 2020-07-24 01:25:17 UTC</small>
<small style="color: var(--hint);">Made with Nim. Generated: 2020-07-24 02:42:03 UTC</small>
</div>
</div>
</div>
Expand Down
2 changes: 1 addition & 1 deletion docs/cps/semaphore.html
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ <h1><a class="toc-backref" href="#18">Templates</a></h1>
<div class="twelve-columns footer">
<span class="nim-sprite"></span>
<br/>
<small style="color: var(--hint);">Made with Nim. Generated: 2020-07-24 01:25:17 UTC</small>
<small style="color: var(--hint);">Made with Nim. Generated: 2020-07-24 02:42:03 UTC</small>
</div>
</div>
</div>
Expand Down
13 changes: 13 additions & 0 deletions tests/tfork.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import cps
import cps/eventqueue

var r: int

proc adder(): Cont {.cps.} =
cpsFork()
inc r

cpsSpawn adder()
run()
if r != 2:
raise newException(Defect, "uh oh")
5 changes: 5 additions & 0 deletions tests/tfork.nim.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
--path="$config/.."
--threads:on
--define:threadsafe
--gc:arc
--define:cpsDebug

0 comments on commit 157c184

Please sign in to comment.