Skip to content

Commit

Permalink
Refactoring mapper tests with notifying Scheduler ctime
Browse files Browse the repository at this point in the history
  • Loading branch information
Janis Erdmanis committed Jun 7, 2024
1 parent 75b2dac commit 0421a18
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 111 deletions.
15 changes: 4 additions & 11 deletions src/Client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,8 @@ Model.commit(guard::CastGuard) = isempty(guard.ack_integrity) ? commit(guard.ack

Model.index(guard::CastGuard) = index(guard.ack_cast)


Model.isbinding(guard::CastGuard, ack::AckConsistency{BallotBoxState}) = isbinding(commit(guard), ack)
Model.isconsistent(guard::CastGuard, ack::AckConsistency{BallotBoxState}) = isconsistent(commit(guard), ack)
#ProtocolSchema.tracking_code(guard::CastGuard, spec::DemeSpec) = tracking_code(guard.vote, spec)

tracking_code(guard::CastGuard, spec::DemeSpec) = ProtocolSchema.tracking_code(guard.vote, spec) |> encode_crockford_base32

Expand Down Expand Up @@ -512,9 +510,7 @@ function Base.show(io::IO, voter::DemeAccount)
end


#function enroll!(voter::DemeAccount, router, ticketid, token) # EnrollGuard

function enroll!(voter::DemeAccount, router, invite::Invite) # EnrollGuard
function enroll!(voter::DemeAccount, router, invite::Invite; skip_registration = false) # EnrollGuard

if !isadmitted(voter)
admission = seek_admission(router, id(voter), invite)
Expand All @@ -523,7 +519,7 @@ function enroll!(voter::DemeAccount, router, invite::Invite) # EnrollGuard
voter.guard = EnrollGuard(admission)
end

enroll!(voter, router)
skip_registration || enroll!(voter, router)

return
end
Expand Down Expand Up @@ -554,8 +550,6 @@ function enroll!(voter::DemeAccount, route::Route) # For continuing from the las
# commit = get_chain_commit(router)
# @assert isbinding(commit, voter.deme)
# @assert verify(commit, crypto(voter.deme))

# voter.commit = commit

g = generator(voter.commit)

Expand Down Expand Up @@ -780,7 +774,7 @@ end
# Parser.marshal, Parser.unmarshal ; Client.enroll method seems like a good fit where to do parsing


function enroll!(invite::Invite; server::Route = route(invite.route), key::Union{Integer, Nothing} = nothing)
function enroll!(invite::Invite; server::Route = route(invite.route), key::Union{Integer, Nothing} = nothing, skip_registration = false)

spec = get_deme(server)

Expand All @@ -792,8 +786,7 @@ function enroll!(invite::Invite; server::Route = route(invite.route), key::Union
account = DemeAccount(spec, key, server)
end

#enroll!(account, server, invite.ticketid, invite.token)
enroll!(account, server, invite)
enroll!(account, server, invite; skip_registration)

return account
end
Expand Down
1 change: 1 addition & 0 deletions src/Core/Model/audit.jl
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ function audit_members(ledger::BraidChainLedger)
id(record) in identities && return false
id(record) in blacklist && return false # to prevent reissuing of membership
pseudonym(record) in members && return false
# also need to check that generator is correct

push!(tickets, ticket(record))
push!(identities, id(record))
Expand Down
6 changes: 3 additions & 3 deletions src/Core/Model/ballotbox.jl
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ uuid(proposal::Proposal) = proposal.uuid



isdone(proposal::Proposal; time) = proposal.closed < time
isopen(proposal::Proposal; time) = proposal.open < time && proposal.closed > time
isstarted(proposal::Proposal; time) = proposal.open < time
isdone(proposal::Proposal; time) = proposal.closed <= time
isopen(proposal::Proposal; time) = proposal.open <= time && proposal.closed > time
isstarted(proposal::Proposal; time) = proposal.open <= time

"""
issuer(proposal::Proposal)
Expand Down
99 changes: 64 additions & 35 deletions src/Server/Mapper.jl
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ const MEMBER_LOCK = ReentrantLock()
global BRAID_CHAIN::Union{BraidChainController, Nothing}

global POLLING_STATION::Union{PollingStation, Nothing}
const TALLY_SCHEDULER = Scheduler(UUID, retry_interval = 5)

global TALLY_SCHEDULER::Scheduler
global TALLY_PROCESS::Task
const TALLY_CONDITION = Condition()

const ENTROPY_SCHEDULER = Scheduler(retry_interval = 1)
global ENTROPY_SCHEDULER::Scheduler
global ENTROPY_PROCESS::Task
const ENTROPY_CONDITION = Condition() # Used to inform that event loop had finished

# global BRAID_BROKER::BraidBroker
const BRAID_BROKER_SCHEDULER = Scheduler(retry_interval = 5)
Expand Down Expand Up @@ -210,42 +213,43 @@ function entropy_process_loop()

end

# It is initialized with recording of proposal
#init_bbox_store(Controllers.ledger(bbox))
# BallotBox is initialized with recording of proposal to the BraidChain
# init_bbox_store(Controllers.ledger(bbox))

notify(ENTROPY_CONDITION, uuid)

return
end

# function broker_process_loop(; force = false)

function broker_process_loop(; force = false)

force || wait(BRAID_BROKER_SCHEDULER)
# no pooling thus no false triggers are expected here
# force || wait(BRAID_BROKER_SCHEDULER)
# # no pooling thus no false triggers are expected here

lock(MEMBER_LOCK)
# lock(MEMBER_LOCK)

try
_members = members(BRAID_CHAIN)
_braid = braid(BRAIDER_BROKER, _members) # one is selected at random from all available options
catch
retry!(BRAID_BROKER_SCHEDULER)
finally
unlock(MEMBER_LOCK)
end

Controllers.record!(BRAID_CHAIN, _braid)
Controllers.commit!(BRAID_CHAIN, RECORDER[])
# try
# _members = members(BRAID_CHAIN)
# _braid = braid(BRAIDER_BROKER, _members) # one is selected at random from all available options
# catch
# retry!(BRAID_BROKER_SCHEDULER)
# finally
# unlock(MEMBER_LOCK)
# end

return
end
# Controllers.record!(BRAID_CHAIN, _braid)
# Controllers.commit!(BRAID_CHAIN, RECORDER[])

# return
# end

function tally_process_loop()

uuid = wait(TALLY_SCHEDULER)

uuid = wait(TALLY_SCHEDULER)
tally_votes!(uuid; public=true)

notify(TALLY_CONDITION, uuid)

return
end

Expand Down Expand Up @@ -283,6 +287,9 @@ function load_system() # a kwarg could be passed on whether to audit the system

global POLLING_STATION = PollingStation()

global TALLY_SCHEDULER = Scheduler(UUID, retry_interval = 5)
global ENTROPY_SCHEDULER = Scheduler(UUID, retry_interval = 1)

for record in chain_ledger

if record isa Proposal
Expand Down Expand Up @@ -337,13 +344,22 @@ function load_system() # a kwarg could be passed on whether to audit the system
global COLLECTOR = load_signer(:collector)

if !isnothing(COLLECTOR)
global ENTROPY_PROCESS = @async while true
entropy_process_loop()

global ENTROPY_PROCESS = Task() do
while true
entropy_process_loop()
end
end
yield(ENTROPY_PROCESS)


global TALLY_PROCESS = @async while true
tally_process_loop()
global TALLY_PROCESS = Task() do
while true
tally_process_loop()
end
end
yield(TALLY_PROCESS)

end

return authorized_roles(BRAID_CHAIN.spec)
Expand Down Expand Up @@ -423,13 +439,26 @@ function setup(demefunc::Function, groupspec::GroupSpec, generator::Generator)
global COLLECTOR = Signer(demespec.crypto, generator, key_list[N])
store(COLLECTOR, :collector)

global ENTROPY_PROCESS = @async while true
entropy_process_loop()
# I may create SchedulerActor and constructors here EntropyActor, TallyActor
# to make the process more streamlined
global ENTROPY_SCHEDULER = Scheduler(UUID, retry_interval = 1)

global ENTROPY_PROCESS = Task() do
while true
entropy_process_loop()
end
end
yield(ENTROPY_PROCESS)

global TALLY_PROCESS = @async while true
tally_process_loop()

global TALLY_SCHEDULER = Scheduler(UUID, retry_interval = 5)

global TALLY_PROCESS = Task() do
while true
tally_process_loop()
end
end
yield(TALLY_PROCESS)
end

return authorized_roles(demespec) # I may deprecate this in favour of a method.
Expand Down Expand Up @@ -593,13 +622,13 @@ function submit_chain_record!(proposal::Proposal)
end


function cast_vote(uuid::UUID, vote::Vote; late_votes = false)
function cast_vote(uuid::UUID, vote::Vote; late_votes = false, ctime = Dates.now(UTC))

if !(Model.isstarted(get_proposal(uuid); time = Dates.now(UTC)))
if !(Model.isstarted(get_proposal(uuid); time = ctime))

error("Voting have not yet started")

elseif !late_votes && Model.isdone(get_proposal(uuid); time = Dates.now(UTC))
elseif !late_votes && Model.isdone(get_proposal(uuid); time = ctime)

error("Vote received for proposal too late")

Expand Down
57 changes: 19 additions & 38 deletions src/Utils/Schedulers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ is in the event loop like:
scheduler = Scheduler(; retry_interval = 1)
lock(scheduler) do
schedule!(scheduler, now() + Second(1), value)
end
schedule!(scheduler, now() + Second(1), value)
while true
value = wait(scheduler)
Expand Down Expand Up @@ -51,19 +49,7 @@ Scheduler(; pool_interval=nothing, retry_interval=nothing, delay=0) = Scheduler(
Notify a scheduler with a value which is returned at `wait`.
"""
Base.notify(scheduler::Scheduler) = notify(scheduler.condition)
Base.notify(scheduler::Scheduler, value) = notify(scheduler.condition, value)

"""
lock(scheduler::Scheduler)
Lock a scheduler. This is necessary to avoid simultanous modifications of the `schedule` field.
Note that other `Scheduler` fields are not protected with the lock as thoose are considered
internal.
"""
Base.lock(scheduler::Scheduler) = lock(scheduler.condition)
Base.unlock(scheduler::Scheduler) = unlock(scheduler.condition)

Base.notify(scheduler::Scheduler; ctime::DateTime = now(UTC)) = notify(scheduler.condition, ctime)

"""
waituntil(time::DateTime)
Expand Down Expand Up @@ -93,7 +79,7 @@ no events are scheduled.
"""
function next_event(scheduler::Scheduler; ctime = now(UTC))

length(scheduler.schedule) == 0 && return nothing
isempty(scheduler) && return nothing

timestamp, value = scheduler.schedule[1]

Expand All @@ -104,6 +90,8 @@ function next_event(scheduler::Scheduler; ctime = now(UTC))
end


Base.isempty(scheduler::Scheduler) = length(scheduler.schedule) == 0

isstarted(scheduler::Scheduler) = scheduler.started
isfinished(scheduler::Scheduler) = scheduler.finished

Expand All @@ -113,26 +101,21 @@ isfinished(scheduler::Scheduler) = scheduler.finished
Wait until next event is reached and return it's value. In the case event have run through
smoothelly the scheduler event is droped with the next `wait` call. See also [`retry!`](@ref) method.
"""
function Base.wait(scheduler::Scheduler)
function Base.wait(scheduler::Scheduler; ctime::DateTime = now(UTC))

if isstarted(scheduler) && isfinished(scheduler)

# Need to test this
#lock(scheduler) do
# The lock is unnecessary as long as we limit ourselves to a single thread
popfirst!(scheduler.schedule)
#end

scheduler.started = false
scheduler.finished = false

end

if isnothing(next_event(scheduler))
wait(scheduler.condition) # It should be here
while isempty(scheduler)
ctime = wait(scheduler.condition) # It should be here
end

time, value = next_event(scheduler)

if isstarted(scheduler) && !isfinished(scheduler)

scheduler.started = true
Expand All @@ -142,37 +125,35 @@ function Base.wait(scheduler::Scheduler)
error("Retry interval not set")
else
Timer(timer -> notify(scheduler), scheduler.retry_interval)
wait(scheduler.condition)
wait(scheduler.condition) # An external wakeup would simply retry. A new value inserted preceding would be treated first
end

elseif !isstarted(scheduler)

scheduler.started = true
scheduler.finished = true

if time > 0

Timer(timer -> notify(scheduler), time)
wait(scheduler.condition)

while (time = next_event(scheduler; ctime) |> first) > 0
Timer(timer -> notify(scheduler), time) # If waiting happens here
ctime = wait(scheduler.condition)
end

else
error("Impossible scheduler state")
end

return value
# next_event is called also here to ensure that latest event that could have been scheduled along others is treated first
return next_event(scheduler) |> last
end


"""
schedule!(scheduler::Scheduler, timestamp::DateTime[, value])
Schedule an event at `timestamp` with a provided `value`. To avoid messing up a schedule acquire
a scheduler's lock before adding the event as:
Schedule an event at `timestamp` with a provided `value`.
schedule!(scheduler, now() + Second(1), value)
lock(scheduler) do
schedule!(scheduler, now() + Second(1), value)
end
"""
function schedule!(scheduler::Scheduler, timestamp::DateTime, value)

Expand Down
Loading

0 comments on commit 0421a18

Please sign in to comment.