Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support for query functionality #75

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
14 changes: 14 additions & 0 deletions examples/bin/query
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env ruby
require_relative '../init'

Dir[File.expand_path('../workflows/*.rb', __dir__)].each { |f| require f }

workflow_class_name, workflow_id, run_id, query, args = ARGV
workflow_class = Object.const_get(workflow_class_name)

if ![workflow_class, workflow_id, run_id, query].all?
fail 'Wrong arguments, use `bin/query WORKFLOW WORKFLOW_ID RUN_ID QUERY [ARGS]`'
end

result = Cadence.query_workflow(workflow_class, query, workflow_id, run_id, args)
puts result.inspect
1 change: 1 addition & 0 deletions examples/bin/worker
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ worker.register_workflow(SideEffectWorkflow)
worker.register_workflow(SimpleTimerWorkflow)
worker.register_workflow(TimeoutWorkflow)
worker.register_workflow(TripBookingWorkflow)
worker.register_workflow(QueryWorkflow)

worker.register_activity(AsyncActivity)
worker.register_activity(EchoActivity)
Expand Down
55 changes: 55 additions & 0 deletions examples/spec/integration/query_workflow_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
require 'examples/workflows/query_workflow'
require 'cadence/errors'


describe QueryWorkflow, :integration do
subject { described_class }

it 'returns the correct result for the queries' do
workflow_id, run_id = run_workflow(described_class)

# Query with nil workflow class
expect(Cadence.query_workflow(nil, 'state', workflow_id, run_id))
.to eq 'started'

# Query with arbitrary args
expect(Cadence.query_workflow(described_class, 'state', workflow_id, run_id,
'upcase', 'ignored', 'reverse'))
.to eq 'DETRATS'

# Query with no args
expect(Cadence.query_workflow(described_class, 'signal_count', workflow_id, run_id))
.to eq 0

# Query with unregistered handler
expect { Cadence.query_workflow(described_class, 'unknown_query', workflow_id, run_id) }
.to raise_error(Cadence::QueryFailed, 'Workflow did not register a handler for unknown_query')

Cadence.signal_workflow(described_class, 'make_progress', workflow_id, run_id)

# Query for updated signal_count with an unsatisfied reject condition
expect(Cadence.query_workflow(described_class, 'signal_count', workflow_id, run_id, query_reject_condition: :not_open))
.to eq 1

Cadence.signal_workflow(described_class, 'finish', workflow_id, run_id)
wait_for_workflow_completion(workflow_id, run_id)

# Repeating original query scenarios above, expecting updated state and signal results
expect(Cadence.query_workflow(nil, 'state', workflow_id, run_id))
.to eq 'finished'

expect(Cadence.query_workflow(described_class, 'state', workflow_id, run_id,
'upcase', 'ignored', 'reverse'))
.to eq 'DEHSINIF'

expect(Cadence.query_workflow(described_class, 'signal_count', workflow_id, run_id))
.to eq 2

expect { Cadence.query_workflow(described_class, 'unknown_query', workflow_id, run_id) }
.to raise_error(Cadence::QueryFailed, 'Workflow did not register a handler for unknown_query')

# Now that the workflow is completed, test a query with a reject condition satisfied
expect { Cadence.query_workflow(described_class, 'state', workflow_id, run_id, query_reject_condition: :not_open) }
.to raise_error(Cadence::QueryFailed, 'Query rejected: status WORKFLOW_EXECUTION_STATUS_COMPLETED')
end
end
36 changes: 36 additions & 0 deletions examples/workflows/query_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
class QueryWorkflow < Cadence::Workflow
attr_reader :state, :signal_count, :last_signal_received

def execute
@state = "started"
@signal_count = 0
@last_signal_received = nil

workflow.on_query("state") { |*args| apply_transforms(state, args) }
workflow.on_query("signal_count") { signal_count }

workflow.on_signal do |signal|
@signal_count += 1
@last_signal_received = signal
end

workflow.wait_for { last_signal_received == "finish" }
@state = "finished"

{
signal_count: signal_count,
last_signal_received: last_signal_received,
final_state: state
}
end

private

def apply_transforms(value, transforms)
return value if value.nil? || transforms.empty?
transforms.inject(value) do |memo, input|
next memo unless memo.respond_to?(input)
memo.public_send(input)
end
end
end
1 change: 1 addition & 0 deletions lib/cadence.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Cadence
:schedule_workflow,
:register_domain,
:signal_workflow,
:query_workflow,
:reset_workflow,
:terminate_workflow,
:fetch_workflow_execution_info,
Expand Down
11 changes: 11 additions & 0 deletions lib/cadence/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil)
)
end

def query_workflow(workflow, query, workflow_id, run_id, *args, domain: nil, query_reject_condition: nil)
connection.query_workflow(
domain: domain || workflow.domain,
workflow_id: workflow_id,
run_id: run_id,
query: query,
args: args,
query_reject_condition: query_reject_condition
)
end

def reset_workflow(domain, workflow_id, run_id, strategy: nil, decision_task_id: nil, reason: 'manual reset')
# Pick default strategy for backwards-compatibility
strategy ||= :last_decision_task unless decision_task_id
Expand Down
57 changes: 47 additions & 10 deletions lib/cadence/connection/thrift.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,21 @@ class Thrift
reject: CadenceThrift::WorkflowIdReusePolicy::RejectDuplicate
}.freeze

QUERY_REJECT_CONDITION = {
# none: CadenceThrift::QueryRejectCondition::NONE,
not_open: CadenceThrift::QueryRejectCondition::NOT_OPEN,
not_completed_cleanly: CadenceThrift::QueryRejectCondition::NOT_COMPLETED_CLEANLY
}.freeze

DEFAULT_OPTIONS = {
polling_ttl: 60, # 1 minute
max_page_size: 100
}.freeze

HISTORY_EVENT_FILTER = {
all: CadenceThrift::HistoryEventFilterType::ALL_EVENT,
close: CadenceThrift::HistoryEventFilterType::CLOSE_EVENT,
}.freeze
all: CadenceThrift::HistoryEventFilterType::ALL_EVENT,
close: CadenceThrift::HistoryEventFilterType::CLOSE_EVENT,
}.freeze

def initialize(host, port, identity, options = {})
@url = "http://#{host}:#{port}"
Expand Down Expand Up @@ -345,8 +351,39 @@ def reset_sticky_task_list
raise NotImplementedError
end

def query_workflow
raise NotImplementedError
def query_workflow(domain:, workflow_id:, run_id:, query:, args: nil, query_reject_condition: nil)
request = CadenceThrift::QueryWorkflowRequest.new(
domain: domain,
execution: CadenceThrift::WorkflowExecution.new(
workflow_id: workflow_id,
run_id: run_id
),
query: CadenceThrift::WorkflowQuery.new(
query_type: query,
query_args: args.to_json
djung335 marked this conversation as resolved.
Show resolved Hide resolved
)
)
if query_reject_condition
condition = QUERY_REJECT_CONDITION[query_reject_condition]
raise Client::ArgumentError, 'Unknown query_reject_condition specified' unless condition

request.query_reject_condition = condition
end

begin
response = client.query_workflow(request)
rescue ::GRPC::InvalidArgument => e
djung335 marked this conversation as resolved.
Show resolved Hide resolved
raise Cadence::QueryFailed, e.details
end

if response.query_rejected
rejection_status = response.query_rejected.status || 'not specified by server'
raise Cadence::QueryFailed, "Query rejected: status #{rejection_status}"
elsif !response.query_result
raise Cadence::QueryFailed, 'Invalid response from server'
else
response.query_result.from_json
djung335 marked this conversation as resolved.
Show resolved Hide resolved
end
end

DeRauk marked this conversation as resolved.
Show resolved Hide resolved
def describe_workflow_execution(domain:, workflow_id:, run_id:)
Expand Down Expand Up @@ -389,9 +426,9 @@ def transport

def connection
@connection ||= begin
protocol = ::Thrift::BinaryProtocol.new(transport)
CadenceThrift::WorkflowService::Client.new(protocol)
end
protocol = ::Thrift::BinaryProtocol.new(transport)
CadenceThrift::WorkflowService::Client.new(protocol)
end
end

def send_request(name, request)
Expand All @@ -413,7 +450,7 @@ def serialize_time_filter(from, to)
CadenceThrift::StartTimeFilter.new(
earliestTime: Cadence::Utils.time_to_nanos(from).to_i,
latestTime: Cadence::Utils.time_to_nanos(to).to_i,
)
)
end

def serialize_execution_filter(value)
Expand All @@ -435,4 +472,4 @@ def serialize_status_filter(value)
end
end
end
end
end
5 changes: 5 additions & 0 deletions lib/cadence/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,9 @@ class TimeoutError < ClientError; end
# A superclass for activity exceptions raised explicitly
# with the itent to propagate to a workflow
class ActivityException < ClientError; end

class ApiError < Error; end

class QueryFailed < ApiError; end

end
9 changes: 7 additions & 2 deletions lib/cadence/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ class Workflow
class Context
attr_reader :metadata

def initialize(state_manager, dispatcher, metadata, config)
def initialize(state_manager, dispatcher, metadata, config, query_registry)
@state_manager = state_manager
@dispatcher = dispatcher
@query_registry = query_registry
@metadata = metadata
@config = config
end
Expand Down Expand Up @@ -227,6 +228,10 @@ def on_signal(&block)
end
end

def on_query(query, &block)
query_registry.register(query, &block)
end

def cancel_activity(activity_id)
decision = Decision::RequestActivityCancellation.new(activity_id: activity_id)

Expand All @@ -246,7 +251,7 @@ def cancel(target, cancelation_id)

private

attr_reader :state_manager, :dispatcher, :config
attr_reader :state_manager, :dispatcher, :config, :query_registry

def schedule_decision(decision)
state_manager.schedule(decision)
Expand Down
26 changes: 24 additions & 2 deletions lib/cadence/workflow/executor.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require 'fiber'

require 'cadence/workflow/dispatcher'
require 'cadence/workflow/query_registry'
require 'cadence/workflow/state_manager'
require 'cadence/workflow/context'
require 'cadence/workflow/history/event_target'
Expand All @@ -12,6 +13,7 @@ class Executor
def initialize(workflow_class, history, metadata, config)
@workflow_class = workflow_class
@dispatcher = Dispatcher.new
@query_registry = QueryRegistry.new
@state_manager = StateManager.new(dispatcher)
@metadata = metadata
@history = history
Expand All @@ -32,13 +34,33 @@ def run
return state_manager.decisions
end

# Process queries using the pre-registered query handlers
#
# @note this method is expected to be executed after the history has
# been fully replayed (by invoking the #run method)
#
# @param queries [Hash<String, Cadence::Workflow::TaskProcessor::Query>]
#
# @return [Hash<String, Cadence::Workflow::QueryResult>]
def process_queries(queries = {})
queries.transform_values(&method(:process_query))
end

private

attr_reader :workflow_class, :dispatcher, :state_manager, :metadata, :history, :config
attr_reader :workflow_class, :dispatcher, :state_manager, :metadata, :history, :config, :query_registry

def process_query(query)
result = query_registry.handle(query.query_type, query.query_args)

QueryResult.answer(result)
rescue StandardError => error
QueryResult.failure(error)
end

def execute_workflow(input, workflow_started_event_attributes)
metadata = generate_workflow_metadata_from(workflow_started_event_attributes)
context = Workflow::Context.new(state_manager, dispatcher, metadata, config)
context = Workflow::Context.new(state_manager, dispatcher, metadata, config, query_registry)

Fiber.new do
workflow_class.execute_in_context(context, input)
djung335 marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
33 changes: 33 additions & 0 deletions lib/cadence/workflow/query_registry.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
require 'cadence/errors'

module Cadence
class Workflow
class QueryRegistry
def initialize
@handlers = {}
end

def register(type, &handler)
if handlers.key?(type)
warn "[NOTICE] Overwriting a query handler for #{type}"
end

handlers[type] = handler
end

def handle(type, args = nil)
handler = handlers[type]

unless handler
raise Cadence::QueryFailed, "Workflow did not register a handler for #{type}"
end

handler.call(*args)
end

private

attr_reader :handlers
end
end
end
16 changes: 16 additions & 0 deletions lib/cadence/workflow/query_result.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module Cadence
class Workflow
module QueryResult
Answer = Struct.new(:result)
Failure = Struct.new(:error)

def self.answer(result)
Answer.new(result).freeze
end

def self.failure(error)
Failure.new(error).freeze
end
end
end
end
Loading