Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support for query functionality #75

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
14 changes: 14 additions & 0 deletions examples/bin/query
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env ruby
require_relative '../init'

Dir[File.expand_path('../workflows/*.rb', __dir__)].each { |f| require f }

workflow_class_name, workflow_id, run_id, query, args = ARGV
workflow_class = Object.const_get(workflow_class_name)

if ![workflow_class, workflow_id, run_id, query].all?
fail 'Wrong arguments, use `bin/query WORKFLOW WORKFLOW_ID RUN_ID QUERY [ARGS]`'
end

result = Cadence.query_workflow(workflow_class, query, workflow_id, run_id, args)
puts result.inspect
1 change: 1 addition & 0 deletions examples/bin/worker
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ worker.register_workflow(SideEffectWorkflow)
worker.register_workflow(SimpleTimerWorkflow)
worker.register_workflow(TimeoutWorkflow)
worker.register_workflow(TripBookingWorkflow)
worker.register_workflow(QueryWorkflow)

worker.register_activity(AsyncActivity)
worker.register_activity(EchoActivity)
Expand Down
55 changes: 55 additions & 0 deletions examples/spec/integration/query_workflow_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
require 'workflows/query_workflow'
require 'cadence/errors'


describe QueryWorkflow, :integration do
subject { described_class }

it 'returns the correct result for the queries' do
workflow_id, run_id = run_workflow(described_class)

# Query with nil workflow class
expect(Cadence.query_workflow(nil, 'state', workflow_id, run_id))
.to eq 'started'

# Query with arbitrary args
expect(Cadence.query_workflow(described_class, 'state', workflow_id, run_id,
'upcase', 'ignored', 'reverse'))
.to eq 'DETRATS'

# Query with no args
expect(Cadence.query_workflow(described_class, 'signal_count', workflow_id, run_id))
.to eq 0

# Query with unregistered handler
expect { Cadence.query_workflow(described_class, 'unknown_query', workflow_id, run_id) }
.to raise_error(Cadence::QueryFailed, 'Workflow did not register a handler for unknown_query')

Cadence.signal_workflow(described_class, 'make_progress', workflow_id, run_id)

# Query for updated signal_count with an unsatisfied reject condition
expect(Cadence.query_workflow(described_class, 'signal_count', workflow_id, run_id, query_reject_condition: :not_open))
.to eq 1

Cadence.signal_workflow(described_class, 'finish', workflow_id, run_id)
wait_for_workflow_completion(workflow_id, run_id)

# Repeating original query scenarios above, expecting updated state and signal results
expect(Cadence.query_workflow(nil, 'state', workflow_id, run_id))
.to eq 'finished'

expect(Cadence.query_workflow(described_class, 'state', workflow_id, run_id,
'upcase', 'ignored', 'reverse'))
.to eq 'DEHSINIF'

expect(Cadence.query_workflow(described_class, 'signal_count', workflow_id, run_id))
.to eq 2

expect { Cadence.query_workflow(described_class, 'unknown_query', workflow_id, run_id) }
.to raise_error(Cadence::QueryFailed, 'Workflow did not register a handler for unknown_query')

# Now that the workflow is completed, test a query with a reject condition satisfied
expect { Cadence.query_workflow(described_class, 'state', workflow_id, run_id, query_reject_condition: :not_open) }
.to raise_error(Cadence::QueryFailed, 'Query rejected: status WORKFLOW_EXECUTION_STATUS_COMPLETED')
end
end
36 changes: 36 additions & 0 deletions examples/workflows/query_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
class QueryWorkflow < Cadence::Workflow
attr_reader :state, :signal_count, :last_signal_received

def execute
@state = "started"
@signal_count = 0
@last_signal_received = nil

workflow.on_query("state") { |*args| apply_transforms(state, args) }
workflow.on_query("signal_count") { signal_count }

workflow.on_signal do |signal|
@signal_count += 1
@last_signal_received = signal
end

workflow.wait_for { last_signal_received == "finish" }
@state = "finished"

{
signal_count: signal_count,
last_signal_received: last_signal_received,
final_state: state
}
end

private

def apply_transforms(value, transforms)
return value if value.nil? || transforms.empty?
transforms.inject(value) do |memo, input|
next memo unless memo.respond_to?(input)
memo.public_send(input)
end
end
end
1 change: 1 addition & 0 deletions lib/cadence.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Cadence
:schedule_workflow,
:register_domain,
:signal_workflow,
:query_workflow,
:reset_workflow,
:terminate_workflow,
:fetch_workflow_execution_info,
Expand Down
12 changes: 12 additions & 0 deletions lib/cadence/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil)
)
end

def query_workflow(workflow, query, workflow_id, run_id, *args, domain: nil, query_reject_condition: nil)
execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options)
connection.query_workflow(
domain: domain || execution_options.domain,
workflow_id: workflow_id,
run_id: run_id,
query: query,
args: args,
query_reject_condition: query_reject_condition
)
end

def reset_workflow(domain, workflow_id, run_id, strategy: nil, decision_task_id: nil, reason: 'manual reset')
# Pick default strategy for backwards-compatibility
strategy ||= :last_decision_task unless decision_task_id
Expand Down
75 changes: 62 additions & 13 deletions lib/cadence/connection/thrift.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,21 @@ class Thrift
reject: CadenceThrift::WorkflowIdReusePolicy::RejectDuplicate
}.freeze

QUERY_REJECT_CONDITION = {
# none: CadenceThrift::QueryRejectCondition::NONE,
not_open: CadenceThrift::QueryRejectCondition::NOT_OPEN,
not_completed_cleanly: CadenceThrift::QueryRejectCondition::NOT_COMPLETED_CLEANLY
}.freeze

DEFAULT_OPTIONS = {
polling_ttl: 60, # 1 minute
max_page_size: 100
}.freeze

HISTORY_EVENT_FILTER = {
all: CadenceThrift::HistoryEventFilterType::ALL_EVENT,
close: CadenceThrift::HistoryEventFilterType::CLOSE_EVENT,
}.freeze
all: CadenceThrift::HistoryEventFilterType::ALL_EVENT,
close: CadenceThrift::HistoryEventFilterType::CLOSE_EVENT,
}.freeze

def initialize(host, port, identity, options = {})
@url = "http://#{host}:#{port}"
Expand Down Expand Up @@ -142,11 +148,12 @@ def poll_for_decision_task(domain:, task_list:)
send_request('PollForDecisionTask', request)
end

def respond_decision_task_completed(task_token:, decisions:)
def respond_decision_task_completed(task_token:, decisions:, query_results: {})
request = CadenceThrift::RespondDecisionTaskCompletedRequest.new(
identity: identity,
taskToken: task_token,
decisions: Array(decisions)
decisions: Array(decisions),
queryResults: query_results.transform_values { |value| Serializer.serialize(value) }
)
send_request('RespondDecisionTaskCompleted', request)
end
Expand Down Expand Up @@ -337,16 +344,58 @@ def get_search_attributes
raise NotImplementedError
end

def respond_query_task_completed
raise NotImplementedError
def respond_query_task_completed(task_token:, query_result:)
query_result_thrift = Serializer.serialize(query_result)
request = CadenceThrift::RespondQueryTaskCompletedRequest.new(
taskToken: task_token,
completedType: query_result_thrift.result_type,
queryResult: query_result_thrift.answer,
errorMessage: query_result_thrift.error_message,
)

client.respond_query_task_completed(request)
end

def reset_sticky_task_list
raise NotImplementedError
end

def query_workflow
raise NotImplementedError
def query_workflow(domain:, workflow_id:, run_id:, query:, args: nil, query_reject_condition: nil)
request = CadenceThrift::QueryWorkflowRequest.new(
domain: domain,
execution: CadenceThrift::WorkflowExecution.new(
workflowId: workflow_id,
runId: run_id
),
query: CadenceThrift::WorkflowQuery.new(
queryType: query,
queryArgs: JSON.serialize(args)
)
)
if query_reject_condition
condition = QUERY_REJECT_CONDITION[query_reject_condition]
raise Client::ArgumentError, 'Unknown query_reject_condition specified' unless condition

request.query_reject_condition = condition
end

begin
response = client.query_workflow(request)
puts(response)
# rescue InvalidArgument => e doesn't seem to work
#
rescue Error => e
raise Cadence::QueryFailed, e.details
end

if response.query_rejected
rejection_status = response.query_rejected.status || 'not specified by server'
raise Cadence::QueryFailed, "Query rejected: status #{rejection_status}"
elsif !response.query_result
raise Cadence::QueryFailed, 'Invalid response from server'
else
JSON.deserialize(response.query_result)
end
end

DeRauk marked this conversation as resolved.
Show resolved Hide resolved
def describe_workflow_execution(domain:, workflow_id:, run_id:)
Expand Down Expand Up @@ -389,9 +438,9 @@ def transport

def connection
@connection ||= begin
protocol = ::Thrift::BinaryProtocol.new(transport)
CadenceThrift::WorkflowService::Client.new(protocol)
end
protocol = ::Thrift::BinaryProtocol.new(transport)
CadenceThrift::WorkflowService::Client.new(protocol)
end
end

def send_request(name, request)
Expand Down Expand Up @@ -435,4 +484,4 @@ def serialize_status_filter(value)
end
end
end
end
end
5 changes: 5 additions & 0 deletions lib/cadence/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,9 @@ class TimeoutError < ClientError; end
# A superclass for activity exceptions raised explicitly
# with the itent to propagate to a workflow
class ActivityException < ClientError; end

class ApiError < Error; end

class QueryFailed < ApiError; end

end
2 changes: 2 additions & 0 deletions lib/cadence/metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ def activity_metadata_from(task, domain)
end

def decision_metadata_from(task, domain)
puts(task.workflowType.class)
Metadata::Decision.new(
domain: domain,
id: task.startedEventId,
task_token: task.taskToken,
attempt: task.attempt,
workflow_run_id: task.workflowExecution.runId,
workflow_id: task.workflowExecution.workflowId,
# task.workflowType.name
workflow_name: task.workflowType.name
)
end
Expand Down
9 changes: 7 additions & 2 deletions lib/cadence/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ class Workflow
class Context
attr_reader :metadata

def initialize(state_manager, dispatcher, metadata, config)
def initialize(state_manager, dispatcher, metadata, config, query_registry)
@state_manager = state_manager
@dispatcher = dispatcher
@query_registry = query_registry
@metadata = metadata
@config = config
end
Expand Down Expand Up @@ -227,6 +228,10 @@ def on_signal(&block)
end
end

def on_query(query, &block)
query_registry.register(query, &block)
end

def cancel_activity(activity_id)
decision = Decision::RequestActivityCancellation.new(activity_id: activity_id)

Expand All @@ -246,7 +251,7 @@ def cancel(target, cancelation_id)

private

attr_reader :state_manager, :dispatcher, :config
attr_reader :state_manager, :dispatcher, :config, :query_registry

def schedule_decision(decision)
state_manager.schedule(decision)
Expand Down
Loading