diff --git a/examples/bin/query b/examples/bin/query new file mode 100644 index 00000000..dbe95b1d --- /dev/null +++ b/examples/bin/query @@ -0,0 +1,14 @@ +#!/usr/bin/env ruby +require_relative '../init' + +Dir[File.expand_path('../workflows/*.rb', __dir__)].each { |f| require f } + +workflow_class_name, workflow_id, run_id, query, args = ARGV +workflow_class = Object.const_get(workflow_class_name) + +if ![workflow_class, workflow_id, run_id, query].all? + fail 'Wrong arguments, use `bin/query WORKFLOW WORKFLOW_ID RUN_ID QUERY [ARGS]`' +end + +result = Cadence.query_workflow(workflow_class, query, workflow_id, run_id, args) +puts result.inspect \ No newline at end of file diff --git a/examples/bin/worker b/examples/bin/worker index 2d6735fe..4e3386c0 100755 --- a/examples/bin/worker +++ b/examples/bin/worker @@ -28,6 +28,7 @@ worker.register_workflow(SideEffectWorkflow) worker.register_workflow(SimpleTimerWorkflow) worker.register_workflow(TimeoutWorkflow) worker.register_workflow(TripBookingWorkflow) +worker.register_workflow(QueryWorkflow) worker.register_activity(AsyncActivity) worker.register_activity(EchoActivity) diff --git a/examples/spec/integration/query_workflow_spec.rb b/examples/spec/integration/query_workflow_spec.rb new file mode 100644 index 00000000..385b856f --- /dev/null +++ b/examples/spec/integration/query_workflow_spec.rb @@ -0,0 +1,55 @@ +require 'workflows/query_workflow' +require 'cadence/errors' + + +describe QueryWorkflow, :integration do + subject { described_class } + + it 'returns the correct result for the queries' do + workflow_id, run_id = run_workflow(described_class) + + # Query with nil workflow class + expect(Cadence.query_workflow(nil, 'state', workflow_id, run_id)) + .to eq 'started' + + # Query with arbitrary args + expect(Cadence.query_workflow(described_class, 'state', workflow_id, run_id, + 'upcase', 'ignored', 'reverse')) + .to eq 'DETRATS' + + # Query with no args + expect(Cadence.query_workflow(described_class, 'signal_count', workflow_id, run_id)) + .to eq 0 + + # Query with unregistered handler + expect { Cadence.query_workflow(described_class, 'unknown_query', workflow_id, run_id) } + .to raise_error(Cadence::QueryFailed, 'Workflow did not register a handler for unknown_query') + + Cadence.signal_workflow(described_class, 'make_progress', workflow_id, run_id) + + # Query for updated signal_count with an unsatisfied reject condition + expect(Cadence.query_workflow(described_class, 'signal_count', workflow_id, run_id, query_reject_condition: :not_open)) + .to eq 1 + + Cadence.signal_workflow(described_class, 'finish', workflow_id, run_id) + wait_for_workflow_completion(workflow_id, run_id) + + # Repeating original query scenarios above, expecting updated state and signal results + expect(Cadence.query_workflow(nil, 'state', workflow_id, run_id)) + .to eq 'finished' + + expect(Cadence.query_workflow(described_class, 'state', workflow_id, run_id, + 'upcase', 'ignored', 'reverse')) + .to eq 'DEHSINIF' + + expect(Cadence.query_workflow(described_class, 'signal_count', workflow_id, run_id)) + .to eq 2 + + expect { Cadence.query_workflow(described_class, 'unknown_query', workflow_id, run_id) } + .to raise_error(Cadence::QueryFailed, 'Workflow did not register a handler for unknown_query') + + # Now that the workflow is completed, test a query with a reject condition satisfied + expect { Cadence.query_workflow(described_class, 'state', workflow_id, run_id, query_reject_condition: :not_open) } + .to raise_error(Cadence::QueryFailed, 'Query rejected: status WORKFLOW_EXECUTION_STATUS_COMPLETED') + end +end \ No newline at end of file diff --git a/examples/workflows/query_workflow.rb b/examples/workflows/query_workflow.rb new file mode 100644 index 00000000..74759250 --- /dev/null +++ b/examples/workflows/query_workflow.rb @@ -0,0 +1,36 @@ +class QueryWorkflow < Cadence::Workflow + attr_reader :state, :signal_count, :last_signal_received + + def execute + @state = "started" + @signal_count = 0 + @last_signal_received = nil + + workflow.on_query("state") { |*args| apply_transforms(state, args) } + workflow.on_query("signal_count") { signal_count } + + workflow.on_signal do |signal| + @signal_count += 1 + @last_signal_received = signal + end + + workflow.wait_for { last_signal_received == "finish" } + @state = "finished" + + { + signal_count: signal_count, + last_signal_received: last_signal_received, + final_state: state + } + end + + private + + def apply_transforms(value, transforms) + return value if value.nil? || transforms.empty? + transforms.inject(value) do |memo, input| + next memo unless memo.respond_to?(input) + memo.public_send(input) + end + end +end \ No newline at end of file diff --git a/lib/cadence.rb b/lib/cadence.rb index 6c1f4c92..2bbaea22 100644 --- a/lib/cadence.rb +++ b/lib/cadence.rb @@ -12,6 +12,7 @@ module Cadence :schedule_workflow, :register_domain, :signal_workflow, + :query_workflow, :reset_workflow, :terminate_workflow, :fetch_workflow_execution_info, diff --git a/lib/cadence/client.rb b/lib/cadence/client.rb index fdb248fe..7ce12168 100644 --- a/lib/cadence/client.rb +++ b/lib/cadence/client.rb @@ -75,6 +75,18 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil) ) end + def query_workflow(workflow, query, workflow_id, run_id, *args, domain: nil, query_reject_condition: nil) + execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options) + connection.query_workflow( + domain: domain || execution_options.domain, + workflow_id: workflow_id, + run_id: run_id, + query: query, + args: args, + query_reject_condition: query_reject_condition + ) + end + def reset_workflow(domain, workflow_id, run_id, strategy: nil, decision_task_id: nil, reason: 'manual reset') # Pick default strategy for backwards-compatibility strategy ||= :last_decision_task unless decision_task_id diff --git a/lib/cadence/connection/thrift.rb b/lib/cadence/connection/thrift.rb index bbdf2a9a..ff83e1fd 100644 --- a/lib/cadence/connection/thrift.rb +++ b/lib/cadence/connection/thrift.rb @@ -14,15 +14,21 @@ class Thrift reject: CadenceThrift::WorkflowIdReusePolicy::RejectDuplicate }.freeze + QUERY_REJECT_CONDITION = { + # none: CadenceThrift::QueryRejectCondition::NONE, + not_open: CadenceThrift::QueryRejectCondition::NOT_OPEN, + not_completed_cleanly: CadenceThrift::QueryRejectCondition::NOT_COMPLETED_CLEANLY + }.freeze + DEFAULT_OPTIONS = { polling_ttl: 60, # 1 minute max_page_size: 100 }.freeze HISTORY_EVENT_FILTER = { - all: CadenceThrift::HistoryEventFilterType::ALL_EVENT, - close: CadenceThrift::HistoryEventFilterType::CLOSE_EVENT, - }.freeze + all: CadenceThrift::HistoryEventFilterType::ALL_EVENT, + close: CadenceThrift::HistoryEventFilterType::CLOSE_EVENT, + }.freeze def initialize(host, port, identity, options = {}) @url = "http://#{host}:#{port}" @@ -142,11 +148,12 @@ def poll_for_decision_task(domain:, task_list:) send_request('PollForDecisionTask', request) end - def respond_decision_task_completed(task_token:, decisions:) + def respond_decision_task_completed(task_token:, decisions:, query_results: {}) request = CadenceThrift::RespondDecisionTaskCompletedRequest.new( identity: identity, taskToken: task_token, - decisions: Array(decisions) + decisions: Array(decisions), + queryResults: query_results.transform_values { |value| Serializer.serialize(value) } ) send_request('RespondDecisionTaskCompleted', request) end @@ -337,16 +344,58 @@ def get_search_attributes raise NotImplementedError end - def respond_query_task_completed - raise NotImplementedError + def respond_query_task_completed(task_token:, query_result:) + query_result_thrift = Serializer.serialize(query_result) + request = CadenceThrift::RespondQueryTaskCompletedRequest.new( + taskToken: task_token, + completedType: query_result_thrift.result_type, + queryResult: query_result_thrift.answer, + errorMessage: query_result_thrift.error_message, + ) + + client.respond_query_task_completed(request) end def reset_sticky_task_list raise NotImplementedError end - def query_workflow - raise NotImplementedError + def query_workflow(domain:, workflow_id:, run_id:, query:, args: nil, query_reject_condition: nil) + request = CadenceThrift::QueryWorkflowRequest.new( + domain: domain, + execution: CadenceThrift::WorkflowExecution.new( + workflowId: workflow_id, + runId: run_id + ), + query: CadenceThrift::WorkflowQuery.new( + queryType: query, + queryArgs: JSON.serialize(args) + ) + ) + if query_reject_condition + condition = QUERY_REJECT_CONDITION[query_reject_condition] + raise Client::ArgumentError, 'Unknown query_reject_condition specified' unless condition + + request.query_reject_condition = condition + end + + begin + response = client.query_workflow(request) + puts(response) + # rescue InvalidArgument => e doesn't seem to work + # + rescue Error => e + raise Cadence::QueryFailed, e.details + end + + if response.query_rejected + rejection_status = response.query_rejected.status || 'not specified by server' + raise Cadence::QueryFailed, "Query rejected: status #{rejection_status}" + elsif !response.query_result + raise Cadence::QueryFailed, 'Invalid response from server' + else + JSON.deserialize(response.query_result) + end end def describe_workflow_execution(domain:, workflow_id:, run_id:) @@ -389,9 +438,9 @@ def transport def connection @connection ||= begin - protocol = ::Thrift::BinaryProtocol.new(transport) - CadenceThrift::WorkflowService::Client.new(protocol) - end + protocol = ::Thrift::BinaryProtocol.new(transport) + CadenceThrift::WorkflowService::Client.new(protocol) + end end def send_request(name, request) @@ -435,4 +484,4 @@ def serialize_status_filter(value) end end end -end +end \ No newline at end of file diff --git a/lib/cadence/errors.rb b/lib/cadence/errors.rb index 28fc4734..fb01ff91 100644 --- a/lib/cadence/errors.rb +++ b/lib/cadence/errors.rb @@ -18,4 +18,9 @@ class TimeoutError < ClientError; end # A superclass for activity exceptions raised explicitly # with the itent to propagate to a workflow class ActivityException < ClientError; end + + class ApiError < Error; end + + class QueryFailed < ApiError; end + end diff --git a/lib/cadence/workflow/context.rb b/lib/cadence/workflow/context.rb index 9c852f7a..a130263e 100644 --- a/lib/cadence/workflow/context.rb +++ b/lib/cadence/workflow/context.rb @@ -17,9 +17,10 @@ class Workflow class Context attr_reader :metadata - def initialize(state_manager, dispatcher, metadata, config) + def initialize(state_manager, dispatcher, metadata, config, query_registry) @state_manager = state_manager @dispatcher = dispatcher + @query_registry = query_registry @metadata = metadata @config = config end @@ -227,6 +228,10 @@ def on_signal(&block) end end + def on_query(query, &block) + query_registry.register(query, &block) + end + def cancel_activity(activity_id) decision = Decision::RequestActivityCancellation.new(activity_id: activity_id) @@ -246,7 +251,7 @@ def cancel(target, cancelation_id) private - attr_reader :state_manager, :dispatcher, :config + attr_reader :state_manager, :dispatcher, :config, :query_registry def schedule_decision(decision) state_manager.schedule(decision) diff --git a/lib/cadence/workflow/decision_task_processor.rb b/lib/cadence/workflow/decision_task_processor.rb index 1781cc41..8b5b9942 100644 --- a/lib/cadence/workflow/decision_task_processor.rb +++ b/lib/cadence/workflow/decision_task_processor.rb @@ -7,7 +7,19 @@ module Cadence class Workflow class DecisionTaskProcessor + Query = Struct.new(:query) do + + def query_type + query.queryType + end + + def query_args + JSON.deserialize(query.queryArgs) + end + end + MAX_FAILED_ATTEMPTS = 50 + LEGACY_QUERY_KEY = :legacy_query def initialize(task, domain, workflow_lookup, middleware_chain, config) @task = task @@ -39,7 +51,14 @@ def process executor.run end - complete_task(decisions) + query_results = executor.process_queries(parse_queries) + + if legacy_query_task? + complete_query(query_results[LEGACY_QUERY_KEY]) + else + complete_task(decisions, query_results) + end + rescue StandardError => error fail_task(error.inspect) Cadence.logger.debug(error.backtrace.join("\n")) @@ -53,7 +72,7 @@ def process private attr_reader :task, :domain, :task_token, :workflow_name, :workflow_class, - :middleware_chain, :config, :metadata + :middleware_chain, :config, :metadata def connection @connection ||= Cadence::Connection.generate(config.for_connection) @@ -86,16 +105,43 @@ def fetch_full_history Workflow::History.new(events) end - def complete_task(decisions) + def legacy_query_task? + !!task.query + end + + def parse_queries + # Support for deprecated query style + if legacy_query_task? + { LEGACY_QUERY_KEY => Query.new(task.query) } + else + return {} if task.queries.nil? + task.queries.each_with_object({}) do |(query_id, query), result| + result[query_id] = Query.new(query) + end + end + end + + def complete_task(decisions, query_results) Cadence.logger.info("Decision task for #{workflow_name} completed") connection.respond_decision_task_completed( task_token: task_token, - decisions: serialize_decisions(decisions) + decisions: serialize_decisions(decisions), + query_results: query_results + ) + end + + def complete_query(result) + Cadence.logger.info("Workflow Query task completed", metadata.to_h) + + connection.respond_query_task_completed( + task_token: task_token, + query_result: result ) rescue StandardError => error - Cadence.logger.error("Unable to complete Decision task #{workflow_name}: #{error.inspect}") - Cadence::ErrorHandler.handle(error, metadata: metadata) + Cadence.logger.error("Unable to complete a query", metadata.to_h.merge(error: error.inspect)) + + Cadence::ErrorHandler.handle(error, config, metadata: metadata) end def fail_task(message) diff --git a/lib/cadence/workflow/executor.rb b/lib/cadence/workflow/executor.rb index daf5b77f..ff09b8b0 100644 --- a/lib/cadence/workflow/executor.rb +++ b/lib/cadence/workflow/executor.rb @@ -1,6 +1,7 @@ require 'fiber' require 'cadence/workflow/dispatcher' +require 'cadence/workflow/query_registry' require 'cadence/workflow/state_manager' require 'cadence/workflow/context' require 'cadence/workflow/history/event_target' @@ -12,6 +13,7 @@ class Executor def initialize(workflow_class, history, metadata, config) @workflow_class = workflow_class @dispatcher = Dispatcher.new + @query_registry = QueryRegistry.new @state_manager = StateManager.new(dispatcher) @metadata = metadata @history = history @@ -32,13 +34,32 @@ def run return state_manager.decisions end + # Process queries using the pre-registered query handlers + # + # @note this method is expected to be executed after the history has + # been fully replayed (by invoking the #run method) + # + # @param queries [Hash] + # + # @return [Hash] + def process_queries(queries = {}) + queries.transform_values(&method(:process_query)) + end + private - attr_reader :workflow_class, :dispatcher, :state_manager, :metadata, :history, :config + attr_reader :workflow_class, :dispatcher, :state_manager, :metadata, :history, :config, :query_registry + + def process_query(query) + result = query_registry.handle(query.query_type, query.query_args) + QueryResult.answer(result) + rescue StandardError => error + QueryResult.failure(error) + end def execute_workflow(input, workflow_started_event_attributes) metadata = generate_workflow_metadata_from(workflow_started_event_attributes) - context = Workflow::Context.new(state_manager, dispatcher, metadata, config) + context = Workflow::Context.new(state_manager, dispatcher, metadata, config, query_registry) Fiber.new do workflow_class.execute_in_context(context, input) diff --git a/lib/cadence/workflow/query_registry.rb b/lib/cadence/workflow/query_registry.rb new file mode 100644 index 00000000..f3d472e5 --- /dev/null +++ b/lib/cadence/workflow/query_registry.rb @@ -0,0 +1,34 @@ +require 'cadence/errors' + +module Cadence + class Workflow + class QueryRegistry + def initialize + @handlers = {} + end + + def register(type, &handler) + if handlers.key?(type) + warn "[NOTICE] Overwriting a query handler for #{type}" + end + + handlers[type] = handler + end + + def handle(type, args = nil) + handler = handlers[type] + puts(handler) + + unless handler + raise Cadence::QueryFailed, "Workflow did not register a handler for #{type}" + end + + handler.call(*args) + end + + private + + attr_reader :handlers + end + end +end diff --git a/lib/cadence/workflow/query_result.rb b/lib/cadence/workflow/query_result.rb new file mode 100644 index 00000000..ab119f51 --- /dev/null +++ b/lib/cadence/workflow/query_result.rb @@ -0,0 +1,16 @@ +module Cadence + class Workflow + module QueryResult + Answer = Struct.new(:result) + Failure = Struct.new(:error) + + def self.answer(result) + Answer.new(result).freeze + end + + def self.failure(error) + Failure.new(error).freeze + end + end + end +end \ No newline at end of file diff --git a/lib/cadence/workflow/serializer.rb b/lib/cadence/workflow/serializer.rb index dcc891f9..f9879a6b 100644 --- a/lib/cadence/workflow/serializer.rb +++ b/lib/cadence/workflow/serializer.rb @@ -1,4 +1,5 @@ require 'cadence/workflow/decision' +require 'cadence/workflow/query_result' require 'cadence/workflow/serializer/schedule_activity' require 'cadence/workflow/serializer/start_child_workflow' require 'cadence/workflow/serializer/request_activity_cancellation' @@ -7,6 +8,8 @@ require 'cadence/workflow/serializer/cancel_timer' require 'cadence/workflow/serializer/complete_workflow' require 'cadence/workflow/serializer/fail_workflow' +require 'cadence/workflow/serializer/query_answer' +require 'cadence/workflow/serializer/query_failure' module Cadence class Workflow @@ -19,7 +22,9 @@ module Serializer Workflow::Decision::StartTimer => Serializer::StartTimer, Workflow::Decision::CancelTimer => Serializer::CancelTimer, Workflow::Decision::CompleteWorkflow => Serializer::CompleteWorkflow, - Workflow::Decision::FailWorkflow => Serializer::FailWorkflow + Workflow::Decision::FailWorkflow => Serializer::FailWorkflow, + Workflow::QueryResult::Answer => Serializer::QueryAnswer, + Workflow::QueryResult::Failure => Serializer::QueryFailure, }.freeze def self.serialize(object) diff --git a/lib/cadence/workflow/serializer/query_answer.rb b/lib/cadence/workflow/serializer/query_answer.rb new file mode 100644 index 00000000..62fc63f1 --- /dev/null +++ b/lib/cadence/workflow/serializer/query_answer.rb @@ -0,0 +1,16 @@ +require 'cadence/workflow/serializer/base' + +module Cadence + class Workflow + module Serializer + class QueryAnswer < Base + def to_thrift + CadenceThrift::WorkflowQueryResult.new( + resultType: CadenceThrift::QueryResultType::ANSWERED, + answer: JSON.serialize(object.result) + ) + end + end + end + end +end \ No newline at end of file diff --git a/lib/cadence/workflow/serializer/query_failure.rb b/lib/cadence/workflow/serializer/query_failure.rb new file mode 100644 index 00000000..0a256ec5 --- /dev/null +++ b/lib/cadence/workflow/serializer/query_failure.rb @@ -0,0 +1,16 @@ +require 'cadence/workflow/serializer/base' + +module Cadence + class Workflow + module Serializer + class QueryFailure < Base + def to_thrift + CadenceThrift::WorkflowQueryResult.new( + resultType: CadenceThrift::QueryResultType::FAILED, + errorReason: object.error.message + ) + end + end + end + end +end diff --git a/spec/fabricators/thrift/decision_task_fabricator.rb b/spec/fabricators/thrift/decision_task_fabricator.rb index c100bbc1..eb30fff6 100644 --- a/spec/fabricators/thrift/decision_task_fabricator.rb +++ b/spec/fabricators/thrift/decision_task_fabricator.rb @@ -14,4 +14,5 @@ history { Fabricate(:history_thrift) } nextPageToken nil attempt 1 + query { nil } end diff --git a/spec/fabricators/thrift/workflow_query_fabricator.rb b/spec/fabricators/thrift/workflow_query_fabricator.rb new file mode 100644 index 00000000..b198ce5b --- /dev/null +++ b/spec/fabricators/thrift/workflow_query_fabricator.rb @@ -0,0 +1,6 @@ +Fabricator(:api_workflow_query, from: CadenceThrift::WorkflowQuery) do + queryType { 'state' } + # might need to change the line below + queryArgs { Cadence::JSON.serialize(['']) } +end + diff --git a/spec/unit/lib/cadence/connection/thrift_spec.rb b/spec/unit/lib/cadence/connection/thrift_spec.rb index 4006c6a3..b3b6194e 100644 --- a/spec/unit/lib/cadence/connection/thrift_spec.rb +++ b/spec/unit/lib/cadence/connection/thrift_spec.rb @@ -223,4 +223,4 @@ end end end -end +end \ No newline at end of file diff --git a/spec/unit/lib/cadence/workflow/context_spec.rb b/spec/unit/lib/cadence/workflow/context_spec.rb index 62a7d566..f461054b 100644 --- a/spec/unit/lib/cadence/workflow/context_spec.rb +++ b/spec/unit/lib/cadence/workflow/context_spec.rb @@ -2,46 +2,73 @@ require 'cadence/workflow/context' require 'cadence/workflow/dispatcher' require 'cadence/configuration' +require 'cadence/metadata/workflow' + +class MyTestWorkflow < Cadence::Workflow; end describe Cadence::Workflow::Context do - subject { described_class.new(state_manager, dispatcher, metadata, config) } - - let(:state_manager) { instance_double('Cadence::Workflow::StateManager') } - let(:dispatcher) { Cadence::Workflow::Dispatcher.new } - let(:metadata_hash) do - { - domain: 'test-domain', - id: SecureRandom.uuid, - name: 'TestWorkflow', - run_id: SecureRandom.uuid, - attempt: 0, - timeouts: { execution: 15, task: 10 }, - headers: { 'TestHeader' => 'Value' } - } - end - let(:metadata) { Cadence::Metadata::Workflow.new(metadata_hash) } - let(:config) { Cadence::Configuration.new } + subject { described_class.new(state_manager, dispatcher, metadata, config, query_registry) } + + let(:state_manager) { instance_double('Cadence::Workflow::StateManager') } + let(:dispatcher) { Cadence::Workflow::Dispatcher.new } + let(:metadata_hash) do + { + domain: 'test-domain', + id: SecureRandom.uuid, + name: 'TestWorkflow', + run_id: SecureRandom.uuid, + attempt: 0, + timeouts: { execution: 15, task: 10 }, + headers: { 'TestHeader' => 'Value' } + } + end + let(:metadata) { Cadence::Metadata::Workflow.new(metadata_hash) } + let(:config) { Cadence::Configuration.new } + let(:query_registry) { instance_double('Cadence::Workflow::QueryRegistry') } + let(:workflow_context) do + Cadence::Workflow::Context.new( + state_manager, + dispatcher, + metadata, + Cadence.configuration, + query_registry, + ) + end + + describe '#on_query' do + let(:handler) { Proc.new {} } + + before { allow(query_registry).to receive(:register) } + + it 'registers a query with the query registry' do + workflow_context.on_query('test-query', &handler) - describe '#headers' do - it 'returns metadata headers' do - expect(subject.headers).to eq('TestHeader' => 'Value') + expect(query_registry).to have_received(:register).with('test-query') do |&block| + expect(block).to eq(handler) end end + end + + describe '#headers' do + it 'returns metadata headers' do + expect(workflow_context.headers).to eq('TestHeader' => 'Value') + end + end + + describe '.sleep_until' do + let(:start_time) { Time.now } + let(:end_time) { Time.now + 1 } + let(:delay_time) { (end_time - start_time).to_i } + + before do + allow(state_manager).to receive(:local_time).and_return(start_time) + allow(subject).to receive(:sleep) + end - describe '.sleep_until' do - let(:start_time) { Time.now} - let(:end_time) { Time.now + 1} - let(:delay_time) { (end_time-start_time).to_i } - - before do - allow(state_manager).to receive(:local_time).and_return(start_time) - allow(subject).to receive(:sleep) - end - - it 'sleeps until the given end_time' do - subject.sleep_until(end_time) - # Since sleep_until uses, sleep, just make sure that sleep is called with the delay_time - expect(subject).to have_received(:sleep).with(delay_time) - end + it 'sleeps until the given end_time' do + subject.sleep_until(end_time) + # Since sleep_until uses, sleep, just make sure that sleep is called with the delay_time + expect(subject).to have_received(:sleep).with(delay_time) end + end end diff --git a/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb b/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb index b3e702ec..a44e6048 100644 --- a/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb +++ b/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb @@ -11,7 +11,11 @@ class TestWorkflow < Cadence::Workflow; end subject { described_class.new(task, domain, lookup, middleware_chain, config) } - let(:task) { Fabricate(:decision_task_thrift) } + let(:query) { nil } + let(:queries) { nil } + let(:task) { Fabricate(:decision_task_thrift, { workflowType: workflow_type_thrift, query: query, queries: queries }.compact) } + let(:workflow_type_thrift) { Fabricate(:workflow_type_thrift, name: workflow_name) } + let(:workflow_name) { 'TestWorkflow' } let(:domain) { 'test-domain' } let(:lookup) { Cadence::ExecutableLookup.new } let(:connection) do @@ -36,7 +40,7 @@ class TestWorkflow < Cadence::Workflow; end allow(Cadence.logger).to receive(:error) allow(Cadence.logger).to receive(:debug) allow(Cadence::ErrorHandler).to receive(:handle) - + allow(connection).to receive(:respond_query_task_completed) allow(Cadence::Metadata) .to receive(:generate) .with(Cadence::Metadata::DECISION_TYPE, task, domain) @@ -51,6 +55,7 @@ class TestWorkflow < Cadence::Workflow; end .to receive(:new) .with(TestWorkflow, an_instance_of(Cadence::Workflow::History), metadata, config) .and_return(executor) + allow(executor).to receive(:process_queries) end it 'runs the workflow executor' do diff --git a/spec/unit/lib/cadence/workflow/executor_spec.rb b/spec/unit/lib/cadence/workflow/executor_spec.rb index 3612e037..72943bb0 100644 --- a/spec/unit/lib/cadence/workflow/executor_spec.rb +++ b/spec/unit/lib/cadence/workflow/executor_spec.rb @@ -1,6 +1,8 @@ require 'cadence/workflow/executor' require 'cadence/workflow/history' require 'cadence/workflow' +require 'cadence/workflow/decision_task_processor' +require 'cadence/workflow/query_registry' describe Cadence::Workflow::Executor do subject { described_class.new(workflow, history, decision_metadata, config) } @@ -8,11 +10,11 @@ let(:workflow_started_event) { Fabricate(:workflow_execution_started_event_thrift, eventId: 1) } let(:history) do Cadence::Workflow::History.new([ - workflow_started_event, - Fabricate(:decision_task_scheduled_event_thrift, eventId: 2), - Fabricate(:decision_task_started_event_thrift, eventId: 3), - Fabricate(:decision_task_completed_event_thrift, eventId: 4) - ]) + workflow_started_event, + Fabricate(:decision_task_scheduled_event_thrift, eventId: 2), + Fabricate(:decision_task_started_event_thrift, eventId: 3), + Fabricate(:decision_task_completed_event_thrift, eventId: 4) + ]) end let(:workflow) { TestWorkflow } let(:decision_metadata) { Fabricate(:decision_metadata) } @@ -32,10 +34,10 @@ def execute expect(workflow) .to have_received(:execute_in_context) - .with( - an_instance_of(Cadence::Workflow::Context), - nil - ) + .with( + an_instance_of(Cadence::Workflow::Context), + nil + ) end it 'returns a complete workflow decision' do @@ -59,18 +61,52 @@ def execute event_attributes = workflow_started_event.workflowExecutionStartedEventAttributes expect(Cadence::Metadata::Workflow) .to have_received(:new) - .with( - domain: decision_metadata.domain, - id: decision_metadata.workflow_id, - name: event_attributes.workflowType.name, - run_id: event_attributes.originalExecutionRunId, - attempt: event_attributes.attempt, - headers: { 'Foo' => 'Bar' }, - timeouts: { - execution: event_attributes.executionStartToCloseTimeoutSeconds, - task: event_attributes.taskStartToCloseTimeoutSeconds - } - ) + .with( + domain: decision_metadata.domain, + id: decision_metadata.workflow_id, + name: event_attributes.workflowType.name, + run_id: event_attributes.originalExecutionRunId, + attempt: event_attributes.attempt, + headers: { 'Foo' => 'Bar' }, + timeouts: { + execution: event_attributes.executionStartToCloseTimeoutSeconds, + task: event_attributes.taskStartToCloseTimeoutSeconds + } + ) + end + end + + describe '#process_queries' do + let(:query_registry) { Cadence::Workflow::QueryRegistry.new } + let(:query_1_result) { 42 } + let(:query_2_error) { StandardError.new('Test query failure') } + let(:queries) do + { + '1' => Cadence::Workflow::DecisionTaskProcessor::Query.new(Fabricate(:api_workflow_query, queryType: 'success')), + '2' => Cadence::Workflow::DecisionTaskProcessor::Query.new(Fabricate(:api_workflow_query, queryType: 'failure')), + '3' => Cadence::Workflow::DecisionTaskProcessor::Query.new(Fabricate(:api_workflow_query, queryType: 'unknown')), + } + end + + before do + allow(Cadence::Workflow::QueryRegistry).to receive(:new).and_return(query_registry) + query_registry.register('success') { query_1_result } + query_registry.register('failure') { raise query_2_error } + end + + it 'returns query results' do + results = subject.process_queries(queries) + expect(results.length).to eq(3) + expect(results['1']).to be_a(Cadence::Workflow::QueryResult::Answer) + expect(results['1'].result).to eq(query_1_result) + expect(results['2']).to be_a(Cadence::Workflow::QueryResult::Failure) + expect(results['2'].error).to eq(query_2_error) + expect(results['3']).to be_a(Cadence::Workflow::QueryResult::Failure) + expect(results['3'].error).to be_a(Cadence::QueryFailed) + expect(results['3'].error.message).to eq('Workflow did not register a handler for unknown') end end end + + + diff --git a/spec/unit/lib/cadence/workflow/query_registry_spec.rb b/spec/unit/lib/cadence/workflow/query_registry_spec.rb new file mode 100644 index 00000000..7dfdb72d --- /dev/null +++ b/spec/unit/lib/cadence/workflow/query_registry_spec.rb @@ -0,0 +1,67 @@ +require 'cadence/workflow/query_registry' + +describe Cadence::Workflow::QueryRegistry do + subject { described_class.new } + + describe '#register' do + let(:handler) { Proc.new {} } + + it 'registers a query handler' do + subject.register('test-query', &handler) + + expect(subject.send(:handlers)['test-query']).to eq(handler) + end + + context 'when query handler is already registered' do + let(:handler_2) { Proc.new {} } + + before { subject.register('test-query', &handler) } + + it 'warns' do + allow(subject).to receive(:warn) + + subject.register('test-query', &handler_2) + + expect(subject) + .to have_received(:warn) + .with('[NOTICE] Overwriting a query handler for test-query') + end + + it 're-registers a query handler' do + subject.register('test-query', &handler_2) + + expect(subject.send(:handlers)['test-query']).to eq(handler_2) + end + end + end + + describe '#handle' do + context 'when a query handler has been registered' do + let(:handler) { Proc.new { 42 } } + + before { subject.register('test-query', &handler) } + + it 'runs the handler and returns the result' do + expect(subject.handle('test-query')).to eq(42) + end + end + + context 'when a query handler has been registered with args' do + let(:handler) { Proc.new { |arg_1, arg_2| arg_1 + arg_2 } } + + before { subject.register('test-query', &handler) } + + it 'runs the handler and returns the result' do + expect(subject.handle('test-query', [3, 5])).to eq(8) + end + end + + context 'when a query handler has not been registered' do + it 'raises' do + expect do + subject.handle('test-query') + end.to raise_error(Cadence::QueryFailed, 'Workflow did not register a handler for test-query') + end + end + end +end \ No newline at end of file diff --git a/spec/unit/lib/cadence/workflow/query_result_spec.rb b/spec/unit/lib/cadence/workflow/query_result_spec.rb new file mode 100644 index 00000000..4c5704f3 --- /dev/null +++ b/spec/unit/lib/cadence/workflow/query_result_spec.rb @@ -0,0 +1,25 @@ +require 'cadence/workflow/query_result' + +describe Cadence::Workflow::QueryResult do + describe '.answer' do + it 'returns an answer query result' do + result = described_class.answer(42) + + expect(result).to be_a(Cadence::Workflow::QueryResult::Answer) + expect(result).to be_frozen + expect(result.result).to eq(42) + end + end + + describe '.failure' do + let(:error) { StandardError.new('Test query failure') } + + it 'returns a failure query result' do + result = described_class.failure(error) + + expect(result).to be_a(Cadence::Workflow::QueryResult::Failure) + expect(result).to be_frozen + expect(result.error).to eq(error) + end + end +end \ No newline at end of file diff --git a/spec/unit/lib/cadence/workflow/serializer/query_answer_spec.rb b/spec/unit/lib/cadence/workflow/serializer/query_answer_spec.rb new file mode 100644 index 00000000..5c57b83f --- /dev/null +++ b/spec/unit/lib/cadence/workflow/serializer/query_answer_spec.rb @@ -0,0 +1,25 @@ +require 'cadence/workflow/serializer/query_failure' +require 'cadence/workflow/query_result' +require 'cadence/workflow/serializer/query_answer' + +describe Cadence::Workflow::Serializer::QueryAnswer do + class TestDeserializer + end + + describe 'to_thrift' do + let(:query_result) { Cadence::Workflow::QueryResult.answer(42) } + let(:query_workflow_result) do + CadenceThrift::WorkflowQueryResult.new( + answer: 42 + ) + end + it 'produces a thrift object' do + result = described_class.new(query_result).to_thrift + + expect(result).to be_a(CadenceThrift::WorkflowQueryResult) + expect(result.resultType).to eq(CadenceThrift::QueryResultType::ANSWERED + ) + expect(result.answer).to eq(Cadence::JSON.serialize(query_workflow_result.answer)) + end + end +end \ No newline at end of file diff --git a/spec/unit/lib/cadence/workflow/serializer/query_failure_spec.rb b/spec/unit/lib/cadence/workflow/serializer/query_failure_spec.rb new file mode 100644 index 00000000..570cb013 --- /dev/null +++ b/spec/unit/lib/cadence/workflow/serializer/query_failure_spec.rb @@ -0,0 +1,18 @@ +require 'cadence/workflow/serializer/query_failure' +require 'cadence/workflow/query_result' + +describe Cadence::Workflow::Serializer::QueryFailure do + describe 'to_thrift' do + let(:exception) { StandardError.new('Test query failure') } + let(:query_result) { Cadence::Workflow::QueryResult.failure(exception) } + + it 'produces a thrift object' do + result = described_class.new(query_result).to_thrift + + expect(result).to be_a(CadenceThrift::WorkflowQueryResult) + expect(result.resultType).to eq(CadenceThrift::QueryResultType::FAILED + ) + expect(result.errorReason).to eq('Test query failure') + end + end +end \ No newline at end of file