Skip to content

Commit

Permalink
first messages exchanged!
Browse files Browse the repository at this point in the history
  • Loading branch information
apotonick committed Dec 10, 2023
1 parent b190615 commit fd16924
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 12 deletions.
44 changes: 36 additions & 8 deletions lib/trailblazer/workflow/collaboration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@ module Synchronous # DISCUSS: (file) location.
module_function

# @private
# Returns a fake Suspend event that maintains the actual start events in its {:resume_events}.
def initial_lane_positions(lanes)
lanes.collect do |activity|
catch_id = Trailblazer::Activity::Introspect.Nodes(activity, task: activity.to_h[:circuit].to_h[:start_task]).id # DISCUSS: store IDs or the actual catch event in {:resumes}?

[
activity,
{resume_events: [activity.to_h[:circuit].to_h[:start_task]]} # We deliberately have *one* position per lane, we're Synchronous.
{"resumes" => [catch_id]} # We deliberately have *one* position per lane, we're Synchronous.
]
end
.to_h
Expand All @@ -64,7 +67,8 @@ def advance(collaboration, (ctx, flow), circuit_options, lane_positions:, start_

circuit_options = circuit_options.merge(start_task: start_position[:task])

signal, (ctx, flow) = Activity::TaskWrap.invoke(start_position[:activity], [ctx, flow], **circuit_options)
# signal, (ctx, flow) = Activity::TaskWrap.invoke(start_position[:activity], [ctx, flow], **circuit_options)
signal, (ctx, flow) = Trailblazer::Developer.wtf?(start_position[:activity], [ctx, flow], **circuit_options)

# now we have :throw, or not
# @returns Event::Throw::Queued
Expand All @@ -73,7 +77,7 @@ def advance(collaboration, (ctx, flow), circuit_options, lane_positions:, start_
lane_positions = advance_position(lane_positions, start_position[:activity], signal)

break unless flow[:throw].any?
break if (@options[:skip_message_from] || []).include?(flow[:throw][-1][0]) # FIXME: untested!
# break if (@options[:skip_message_from] || []).include?(flow[:throw][-1][0]) # FIXME: untested!

flow, start_position = receiver_task(flow, message_flow)
# every time we "deliver" a message, we should check if it's allow (meaning the receiving activity is actually in the targeted catch event)
Expand All @@ -90,12 +94,15 @@ def advance(collaboration, (ctx, flow), circuit_options, lane_positions:, start_
# every time we "deliver" a message, we should check if it's allowed (meaning the receiving activity is actually in the targeted catch event)
# {:activity} and {:task} are the targeted position.
def validate_targeted_position(lane_positions, activity:, task:)
receiver_position = lane_positions[activity] # receiver should always be in a suspend task/event gateway.
# puts "@@@@@ #{start_task} ? #{receiver_position.inspect}"
# the *actual* receiver position, where we're currently.
actual_receiver_position = lane_positions[activity] # receiver should always be in a suspend task/event gateway.

if possible_catch_events = receiver_position.to_h[:resume_events]
return true if possible_catch_events.include?(task)
end
reachable_catch_events = actual_receiver_position.to_h["resumes"]
.collect { |catch_id| Trailblazer::Activity::Introspect.Nodes(activity, id: catch_id).task }

# if possible_catch_event_ids =
return true if reachable_catch_events.include?(task)
# end

raise "Message can't be passed to #{task} because #{activity} is not in appropriate position"
end
Expand All @@ -105,6 +112,27 @@ def validate_targeted_position(lane_positions, activity:, task:)
def advance_position(lane_positions, activity, suspend_event)
lane_positions.merge(activity => suspend_event)
end





# @private
def receiver_task(flow, message_flow)
next_throw, *remaining = flow[:throw]

throwing_event = next_throw[0] # DISCUSS: why array in Synchronous?
flow = flow.merge(throw: remaining)

return flow, receiver_position_for(message_flow, throwing_event)
end

# @private
def receiver_position_for(message_flow, throwing_event)
receiver_activity, catch_task = message_flow.fetch(throwing_event)

Position.new(receiver_activity, catch_task)
end
end # Synchronous
end
end
Expand Down
43 changes: 39 additions & 4 deletions test/collaboration_test.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
require "test_helper"

# TODO: how can we prevent users from triggering lifecycle.Create? only UI events allowed?
#
class CollaborationTest < Minitest::Spec
it "what" do
ui_create_form = "Activity_0wc2mcq" # TODO: this is from pro-rails tests.
Expand All @@ -26,6 +28,18 @@ class CollaborationTest < Minitest::Spec
ui_update_form_with_errors = "Activity_00kfo8w"
ui_rejected = "Event_1vb197y"

# FIXME: redundant with {lane_test}.
create_id = "Activity_0wwfenp"
update_id = "Activity_0q9p56e"
notify_id = "Activity_0wr78cv"
reject_id = "Activity_0d9yewp"
approve_id = "Activity_1qrkaz0"
revise_id = "Activity_18qv6ob"
publish_id = "Activity_1bjelgv"
delete_id = "Activity_0cc4us9"
archive_id = "Activity_1hgscu3"
success_id = "Event_1p8873y"


moderation_json = File.read("test/fixtures/v1/moderation.json")
signal, (ctx, _) = Trailblazer::Workflow::Generate.invoke([{json_document: moderation_json}, {}])
Expand Down Expand Up @@ -104,8 +118,8 @@ class CollaborationTest < Minitest::Spec
initial_lane_positions = Trailblazer::Workflow::Collaboration::Synchronous.initial_lane_positions(schema_hash[:lanes].values)

# TODO: do this in the State layer.
# start_task = [lane_activity_ui, initial_lane_positions[lane_activity_ui][:resume_events].first]
start_position = Trailblazer::Workflow::Collaboration::Position.new(lane_activity_ui, initial_lane_positions[lane_activity_ui][:resume_events].first)
start_task = Trailblazer::Activity::Introspect.Nodes(lane_activity_ui, id: "catch-before-#{ui_create_form}").task # catch-before-Activity_0wc2mcq
start_position = Trailblazer::Workflow::Collaboration::Position.new(lane_activity_ui, start_task)

configuration, (ctx, flow) = Trailblazer::Workflow::Collaboration::Synchronous.advance(
schema,
Expand All @@ -119,8 +133,29 @@ class CollaborationTest < Minitest::Spec
)

assert_equal configuration.lane_positions.keys, [lane_activity, lane_activity_ui]
assert_equal configuration.lane_positions.values.inspect, %([{:resume_events=>[#<Trailblazer::Workflow::Event::Catch start_task=true type=:catch_event semantic=[:catch, \"catch-before-Activity_0wwfenp\"]>]}, \
#<Trailblazer::Workflow::Event::Suspend resumes=[\"catch-before-#{ui_create}\"] type=:suspend semantic=[:suspend, \"suspend-Gateway_14h0q7a\"]>])
assert_equal configuration.lane_positions.values.inspect, %([{"resumes"=>["catch-before-Activity_0wwfenp"]}, \
#<Trailblazer::Workflow::Event::Suspend resumes=["catch-before-#{ui_create}"] type=:suspend semantic=[:suspend, "suspend-Gateway_14h0q7a"]>])
assert_equal ctx.inspect, %({:seq=>[:create_form]})

# create_form <submit>
start_task_id = Trailblazer::Activity::Introspect.Nodes(lane_activity_ui, id: "suspend-Gateway_14h0q7a").data["resumes"].first # "catch-before-Activity_1psp91r"
start_position = Trailblazer::Workflow::Collaboration::Position.new(lane_activity_ui, Trailblazer::Activity::Introspect.Nodes(lane_activity_ui, id: start_task_id).task)

configuration, (ctx, flow) = Trailblazer::Workflow::Collaboration::Synchronous.advance(
schema,
[{seq: []}, {throw: []}],
{}, # circuit_options

start_position: start_position,
lane_positions: configuration.lane_positions, # current position/"state"

message_flow: schema_hash[:message_flow],
)

assert_equal configuration.lane_positions.keys, [lane_activity, lane_activity_ui]

assert_equal configuration.lane_positions.values.inspect, %([#<Trailblazer::Workflow::Event::Suspend resumes=["catch-before-#{update_id}", "catch-before-#{notify_id}"] type=:suspend semantic=[:suspend, "suspend-Gateway_0fnbg3r"]>, \
#<Trailblazer::Workflow::Event::Suspend resumes=["catch-before-#{ui_update_form}"] type=:suspend semantic=[:suspend, "suspend-Gateway_01cn7zv"]>])
assert_equal ctx.inspect, %({:seq=>[:create, :create]})
end
end

0 comments on commit fd16924

Please sign in to comment.