diff --git a/.gitignore b/.gitignore index ee919347..1ef7a879 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,5 @@ _yardoc/ # rspec failure tracking .rspec_status + +.idea/* \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 00000000..e7e9d11d --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,2 @@ +# Default ignored files +/workspace.xml diff --git a/examples/bin/trigger_cron b/examples/bin/trigger_cron new file mode 100755 index 00000000..88a58194 --- /dev/null +++ b/examples/bin/trigger_cron @@ -0,0 +1,19 @@ +#!/usr/bin/env ruby +require_relative '../init' + +Dir[File.expand_path('../workflows/*.rb', __dir__)].each { |f| require f } + +workflow_class_name, schedule, *args = ARGV +workflow_class = Object.const_get(workflow_class_name) +workflow_id = SecureRandom.uuid + +# Convert integer strings to integers +input = args.map { |arg| Integer(arg) rescue arg } + +run_id = Cadence.schedule_workflow( + workflow_class, + schedule, + *input, + options: { workflow_id: workflow_id } +) +Cadence.logger.info "Started workflow: #{run_id} / #{workflow_id}" diff --git a/lib/cadence.rb b/lib/cadence.rb index 918877b3..190efa91 100644 --- a/lib/cadence.rb +++ b/lib/cadence.rb @@ -33,6 +33,29 @@ def start_workflow(workflow, *input, **args) response.runId end + def schedule_workflow(workflow, cron_schedule, *input, **args) + options = args.delete(:options) || {} + input << args unless args.empty? + + execution_options = ExecutionOptions.new(workflow, options) + workflow_id = options[:workflow_id] || SecureRandom.uuid + + response = client.start_workflow_execution( + domain: execution_options.domain, + workflow_id: workflow_id, + workflow_name: execution_options.name, + task_list: execution_options.task_list, + input: input, + execution_timeout: execution_options.timeouts[:execution], + task_timeout: execution_options.timeouts[:task], + workflow_id_reuse_policy: options[:workflow_id_reuse_policy], + headers: execution_options.headers, + cron_schedule: cron_schedule + ) + + response.runId + end + def register_domain(name, description = nil) client.register_domain(name: name, description: description) rescue CadenceThrift::DomainAlreadyExistsError diff --git a/lib/cadence/client/thrift_client.rb b/lib/cadence/client/thrift_client.rb index e062ca4a..4cd255b6 100644 --- a/lib/cadence/client/thrift_client.rb +++ b/lib/cadence/client/thrift_client.rb @@ -64,7 +64,8 @@ def start_workflow_execution( execution_timeout:, task_timeout:, workflow_id_reuse_policy: nil, - headers: nil + headers: nil, + cron_schedule: nil ) request = CadenceThrift::StartWorkflowExecutionRequest.new( identity: identity, @@ -82,7 +83,8 @@ def start_workflow_execution( requestId: SecureRandom.uuid, header: CadenceThrift::Header.new( fields: headers - ) + ), + cronSchedule: cron_schedule ) if workflow_id_reuse_policy diff --git a/lib/cadence/execution_options.rb b/lib/cadence/execution_options.rb index b54c4c03..c4572df4 100644 --- a/lib/cadence/execution_options.rb +++ b/lib/cadence/execution_options.rb @@ -9,6 +9,7 @@ def initialize(object, options = {}) @domain = options[:domain] @task_list = options[:task_list] @retry_policy = options[:retry_policy] + @cron_schedule = options[:cron_schedule] @timeouts = options[:timeouts] || {} @headers = options[:headers] || {} diff --git a/spec/unit/lib/cadence_spec.rb b/spec/unit/lib/cadence_spec.rb index 73e2ec5d..bac3d4ff 100644 --- a/spec/unit/lib/cadence_spec.rb +++ b/spec/unit/lib/cadence_spec.rb @@ -72,6 +72,36 @@ class TestStartWorkflow < Cadence::Workflow ) end + it 'starts a cron workflow' do + described_class.schedule_workflow( + TestStartWorkflow, + '* * * * *' + 42, + options: { + name: 'test-workflow', + domain: 'test-domain', + task_list: 'test-task-list', + headers: { 'Foo' => 'Bar' }, + + } + ) + + expect(client) + .to have_received(:start_workflow_execution) + .with( + domain: 'test-domain', + workflow_id: an_instance_of(String), + workflow_name: 'test-workflow', + task_list: 'test-task-list', + cron_schedule: '* * * * *', + input: [42], + task_timeout: Cadence.configuration.timeouts[:task], + execution_timeout: Cadence.configuration.timeouts[:execution], + workflow_id_reuse_policy: nil, + headers: { 'Foo' => 'Bar' } + ) + end + it 'starts a workflow using a mix of input, keyword arguments and options' do described_class.start_workflow( TestStartWorkflow,