Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support for query functionality #75

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
14 changes: 14 additions & 0 deletions examples/bin/query
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env ruby
require_relative '../init'

Dir[File.expand_path('../workflows/*.rb', __dir__)].each { |f| require f }

workflow_class_name, workflow_id, run_id, query, args = ARGV
workflow_class = Object.const_get(workflow_class_name)

if ![workflow_class, workflow_id, run_id, query].all?
fail 'Wrong arguments, use `bin/query WORKFLOW WORKFLOW_ID RUN_ID QUERY [ARGS]`'
end

result = Cadence.query_workflow(workflow_class, query, workflow_id, run_id, args)
puts result.inspect
1 change: 1 addition & 0 deletions examples/bin/worker
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ worker.register_workflow(SideEffectWorkflow)
worker.register_workflow(SimpleTimerWorkflow)
worker.register_workflow(TimeoutWorkflow)
worker.register_workflow(TripBookingWorkflow)
worker.register_workflow(QueryWorkflow)

worker.register_activity(AsyncActivity)
worker.register_activity(EchoActivity)
Expand Down
55 changes: 55 additions & 0 deletions examples/spec/integration/query_workflow_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
require 'examples/workflows/query_workflow'
require 'cadence/errors'


describe QueryWorkflow, :integration do
subject { described_class }

it 'returns the correct result for the queries' do
workflow_id, run_id = run_workflow(described_class)

# Query with nil workflow class
expect(Cadence.query_workflow(nil, 'state', workflow_id, run_id))
.to eq 'started'

# Query with arbitrary args
expect(Cadence.query_workflow(described_class, 'state', workflow_id, run_id,
'upcase', 'ignored', 'reverse'))
.to eq 'DETRATS'

# Query with no args
expect(Cadence.query_workflow(described_class, 'signal_count', workflow_id, run_id))
.to eq 0

# Query with unregistered handler
expect { Cadence.query_workflow(described_class, 'unknown_query', workflow_id, run_id) }
.to raise_error(Cadence::QueryFailed, 'Workflow did not register a handler for unknown_query')

Cadence.signal_workflow(described_class, 'make_progress', workflow_id, run_id)

# Query for updated signal_count with an unsatisfied reject condition
expect(Cadence.query_workflow(described_class, 'signal_count', workflow_id, run_id, query_reject_condition: :not_open))
.to eq 1

Cadence.signal_workflow(described_class, 'finish', workflow_id, run_id)
wait_for_workflow_completion(workflow_id, run_id)

# Repeating original query scenarios above, expecting updated state and signal results
expect(Cadence.query_workflow(nil, 'state', workflow_id, run_id))
.to eq 'finished'

expect(Cadence.query_workflow(described_class, 'state', workflow_id, run_id,
'upcase', 'ignored', 'reverse'))
.to eq 'DEHSINIF'

expect(Cadence.query_workflow(described_class, 'signal_count', workflow_id, run_id))
.to eq 2

expect { Cadence.query_workflow(described_class, 'unknown_query', workflow_id, run_id) }
.to raise_error(Cadence::QueryFailed, 'Workflow did not register a handler for unknown_query')

# Now that the workflow is completed, test a query with a reject condition satisfied
expect { Cadence.query_workflow(described_class, 'state', workflow_id, run_id, query_reject_condition: :not_open) }
.to raise_error(Cadence::QueryFailed, 'Query rejected: status WORKFLOW_EXECUTION_STATUS_COMPLETED')
end
end
36 changes: 36 additions & 0 deletions examples/workflows/query_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
class QueryWorkflow < Cadence::Workflow
attr_reader :state, :signal_count, :last_signal_received

def execute
@state = "started"
@signal_count = 0
@last_signal_received = nil

workflow.on_query("state") { |*args| apply_transforms(state, args) }
workflow.on_query("signal_count") { signal_count }

workflow.on_signal do |signal|
@signal_count += 1
@last_signal_received = signal
end

workflow.wait_for { last_signal_received == "finish" }
@state = "finished"

{
signal_count: signal_count,
last_signal_received: last_signal_received,
final_state: state
}
end

private

def apply_transforms(value, transforms)
return value if value.nil? || transforms.empty?
transforms.inject(value) do |memo, input|
next memo unless memo.respond_to?(input)
memo.public_send(input)
end
end
end
1 change: 1 addition & 0 deletions lib/cadence.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Cadence
:schedule_workflow,
:register_domain,
:signal_workflow,
:query_workflow,
:reset_workflow,
:terminate_workflow,
:fetch_workflow_execution_info,
Expand Down
11 changes: 11 additions & 0 deletions lib/cadence/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil)
)
end

def query_workflow(workflow, query, workflow_id, run_id, *args, domain: nil, query_reject_condition: nil)
connection.query_workflow(
domain: domain || workflow.domain,
workflow_id: workflow_id,
run_id: run_id,
query: query,
args: args,
query_reject_condition: query_reject_condition
)
end

def reset_workflow(domain, workflow_id, run_id, strategy: nil, decision_task_id: nil, reason: 'manual reset')
# Pick default strategy for backwards-compatibility
strategy ||= :last_decision_task unless decision_task_id
Expand Down
72 changes: 59 additions & 13 deletions lib/cadence/connection/thrift.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,21 @@ class Thrift
reject: CadenceThrift::WorkflowIdReusePolicy::RejectDuplicate
}.freeze

QUERY_REJECT_CONDITION = {
# none: CadenceThrift::QueryRejectCondition::NONE,
not_open: CadenceThrift::QueryRejectCondition::NOT_OPEN,
not_completed_cleanly: CadenceThrift::QueryRejectCondition::NOT_COMPLETED_CLEANLY
}.freeze

DEFAULT_OPTIONS = {
polling_ttl: 60, # 1 minute
max_page_size: 100
}.freeze

HISTORY_EVENT_FILTER = {
all: CadenceThrift::HistoryEventFilterType::ALL_EVENT,
close: CadenceThrift::HistoryEventFilterType::CLOSE_EVENT,
}.freeze
all: CadenceThrift::HistoryEventFilterType::ALL_EVENT,
close: CadenceThrift::HistoryEventFilterType::CLOSE_EVENT,
}.freeze

def initialize(host, port, identity, options = {})
@url = "http://#{host}:#{port}"
Expand Down Expand Up @@ -142,11 +148,12 @@ def poll_for_decision_task(domain:, task_list:)
send_request('PollForDecisionTask', request)
end

def respond_decision_task_completed(task_token:, decisions:)
def respond_decision_task_completed(task_token:, decisions:, query_results: {})
request = CadenceThrift::RespondDecisionTaskCompletedRequest.new(
identity: identity,
taskToken: task_token,
decisions: Array(decisions)
decisions: Array(decisions),
queryResults: query_results.transform_values { |value| Serializer.serialize(value) }
)
send_request('RespondDecisionTaskCompleted', request)
end
Expand Down Expand Up @@ -337,16 +344,55 @@ def get_search_attributes
raise NotImplementedError
end

def respond_query_task_completed
raise NotImplementedError
def respond_query_task_completed(task_token:, query_result:)
query_result_thrift = Serializer.serialize(query_result)
request = CadenceThrift::RespondQueryTaskCompletedRequest.new(
taskToken: task_token,
completedType: query_result_thrift.result_type,
queryResult: query_result_thrift.answer,
errorMessage: query_result_thrift.error_message,
)

client.respond_query_task_completed(request)
end

def reset_sticky_task_list
raise NotImplementedError
end

def query_workflow
raise NotImplementedError
def query_workflow(domain:, workflow_id:, run_id:, query:, args: nil, query_reject_condition: nil)
request = CadenceThrift::QueryWorkflowRequest.new(
domain: domain,
execution: CadenceThrift::WorkflowExecution.new(
workflow_id: workflow_id,
run_id: run_id
),
query: CadenceThrift::WorkflowQuery.new(
query_type: query,
query_args: JSON.serialize(args)
)
)
if query_reject_condition
condition = QUERY_REJECT_CONDITION[query_reject_condition]
raise Client::ArgumentError, 'Unknown query_reject_condition specified' unless condition

request.query_reject_condition = condition
end

begin
response = client.query_workflow(request)
rescue Cadence::InvalidArgument => e
raise Cadence::QueryFailed, e.details
end

if response.query_rejected
rejection_status = response.query_rejected.status || 'not specified by server'
raise Cadence::QueryFailed, "Query rejected: status #{rejection_status}"
elsif !response.query_result
raise Cadence::QueryFailed, 'Invalid response from server'
else
JSON.deserialize(response.query_result)
end
end

DeRauk marked this conversation as resolved.
Show resolved Hide resolved
def describe_workflow_execution(domain:, workflow_id:, run_id:)
Expand Down Expand Up @@ -389,9 +435,9 @@ def transport

def connection
@connection ||= begin
protocol = ::Thrift::BinaryProtocol.new(transport)
CadenceThrift::WorkflowService::Client.new(protocol)
end
protocol = ::Thrift::BinaryProtocol.new(transport)
CadenceThrift::WorkflowService::Client.new(protocol)
end
end

def send_request(name, request)
Expand Down Expand Up @@ -435,4 +481,4 @@ def serialize_status_filter(value)
end
end
end
end
end
5 changes: 5 additions & 0 deletions lib/cadence/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,9 @@ class TimeoutError < ClientError; end
# A superclass for activity exceptions raised explicitly
# with the itent to propagate to a workflow
class ActivityException < ClientError; end

class ApiError < Error; end

class QueryFailed < ApiError; end

end
9 changes: 7 additions & 2 deletions lib/cadence/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ class Workflow
class Context
attr_reader :metadata

def initialize(state_manager, dispatcher, metadata, config)
def initialize(state_manager, dispatcher, metadata, config, query_registry)
@state_manager = state_manager
@dispatcher = dispatcher
@query_registry = query_registry
@metadata = metadata
@config = config
end
Expand Down Expand Up @@ -227,6 +228,10 @@ def on_signal(&block)
end
end

def on_query(query, &block)
query_registry.register(query, &block)
end

def cancel_activity(activity_id)
decision = Decision::RequestActivityCancellation.new(activity_id: activity_id)

Expand All @@ -246,7 +251,7 @@ def cancel(target, cancelation_id)

private

attr_reader :state_manager, :dispatcher, :config
attr_reader :state_manager, :dispatcher, :config, :query_registry

def schedule_decision(decision)
state_manager.schedule(decision)
Expand Down
57 changes: 51 additions & 6 deletions lib/cadence/workflow/decision_task_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,19 @@
module Cadence
class Workflow
class DecisionTaskProcessor
Query = Struct.new(:query) do

def query_type
query.query_type
end

def query_args
JSON.deserialize(query.query_args)
end
end

MAX_FAILED_ATTEMPTS = 50
LEGACY_QUERY_KEY = :legacy_query

def initialize(task, domain, workflow_lookup, middleware_chain, config)
@task = task
Expand Down Expand Up @@ -39,7 +51,14 @@ def process
executor.run
end

complete_task(decisions)
query_results = executor.process_queries(parse_queries)

if legacy_query_task?
complete_query(query_results[LEGACY_QUERY_KEY])
else
complete_task(commands, query_results)
djung335 marked this conversation as resolved.
Show resolved Hide resolved
end

rescue StandardError => error
fail_task(error.inspect)
Cadence.logger.debug(error.backtrace.join("\n"))
Expand All @@ -53,7 +72,7 @@ def process
private

attr_reader :task, :domain, :task_token, :workflow_name, :workflow_class,
:middleware_chain, :config, :metadata
:middleware_chain, :config, :metadata

def connection
@connection ||= Cadence::Connection.generate(config.for_connection)
Expand Down Expand Up @@ -86,16 +105,42 @@ def fetch_full_history
Workflow::History.new(events)
end

def complete_task(decisions)
def legacy_query_task?
!!task.query
end

def parse_queries
# Support for deprecated query style
if legacy_query_task?
{ LEGACY_QUERY_KEY => Query.new(task.query) }
else
djung335 marked this conversation as resolved.
Show resolved Hide resolved
task.queries.each_with_object({}) do |(query_id, query), result|
result[query_id] = Query.new(query)
end
end
end

def complete_task(commands, query_results)
djung335 marked this conversation as resolved.
Show resolved Hide resolved
Cadence.logger.info("Decision task for #{workflow_name} completed")

connection.respond_decision_task_completed(
task_token: task_token,
decisions: serialize_decisions(decisions)
decisions: serialize_decisions(decisions),
query_results: query_results
)
djung335 marked this conversation as resolved.
Show resolved Hide resolved
end

def complete_query(result)
Cadence.logger.info("Workflow Query task completed", metadata.to_h)

connection.respond_query_task_completed(
task_token: task_token,
query_result: result
)
rescue StandardError => error
Cadence.logger.error("Unable to complete Decision task #{workflow_name}: #{error.inspect}")
Cadence::ErrorHandler.handle(error, metadata: metadata)
Cadence.logger.error("Unable to complete a query", metadata.to_h.merge(error: error.inspect))

Cadence::ErrorHandler.handle(error, config, metadata: metadata)
end

def fail_task(message)
Expand Down
Loading