Skip to content

Commit

Permalink
Add cron_schedule to StartWorkflowExecution (#21)
Browse files Browse the repository at this point in the history
* add cron schedules to ruby client

* add cron

* remove load test file

* address pr comments

* add schedule workflow

* add schedule from argv

* update tests
  • Loading branch information
carlcortright authored Sep 9, 2020
1 parent e17a917 commit f681793
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 2 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ _yardoc/

# rspec failure tracking
.rspec_status

.idea/*
2 changes: 2 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions examples/bin/trigger_cron
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/usr/bin/env ruby
require_relative '../init'

Dir[File.expand_path('../workflows/*.rb', __dir__)].each { |f| require f }

workflow_class_name, schedule, *args = ARGV
workflow_class = Object.const_get(workflow_class_name)
workflow_id = SecureRandom.uuid

# Convert integer strings to integers
input = args.map { |arg| Integer(arg) rescue arg }

run_id = Cadence.schedule_workflow(
workflow_class,
schedule,
*input,
options: { workflow_id: workflow_id }
)
Cadence.logger.info "Started workflow: #{run_id} / #{workflow_id}"
23 changes: 23 additions & 0 deletions lib/cadence.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,29 @@ def start_workflow(workflow, *input, **args)
response.runId
end

def schedule_workflow(workflow, cron_schedule, *input, **args)
options = args.delete(:options) || {}
input << args unless args.empty?

execution_options = ExecutionOptions.new(workflow, options)
workflow_id = options[:workflow_id] || SecureRandom.uuid

response = client.start_workflow_execution(
domain: execution_options.domain,
workflow_id: workflow_id,
workflow_name: execution_options.name,
task_list: execution_options.task_list,
input: input,
execution_timeout: execution_options.timeouts[:execution],
task_timeout: execution_options.timeouts[:task],
workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
headers: execution_options.headers,
cron_schedule: cron_schedule
)

response.runId
end

def register_domain(name, description = nil)
client.register_domain(name: name, description: description)
rescue CadenceThrift::DomainAlreadyExistsError
Expand Down
6 changes: 4 additions & 2 deletions lib/cadence/client/thrift_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ def start_workflow_execution(
execution_timeout:,
task_timeout:,
workflow_id_reuse_policy: nil,
headers: nil
headers: nil,
cron_schedule: nil
)
request = CadenceThrift::StartWorkflowExecutionRequest.new(
identity: identity,
Expand All @@ -82,7 +83,8 @@ def start_workflow_execution(
requestId: SecureRandom.uuid,
header: CadenceThrift::Header.new(
fields: headers
)
),
cronSchedule: cron_schedule
)

if workflow_id_reuse_policy
Expand Down
1 change: 1 addition & 0 deletions lib/cadence/execution_options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ def initialize(object, options = {})
@domain = options[:domain]
@task_list = options[:task_list]
@retry_policy = options[:retry_policy]
@cron_schedule = options[:cron_schedule]
@timeouts = options[:timeouts] || {}
@headers = options[:headers] || {}

Expand Down
30 changes: 30 additions & 0 deletions spec/unit/lib/cadence_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,36 @@ class TestStartWorkflow < Cadence::Workflow
)
end

it 'starts a cron workflow' do
described_class.schedule_workflow(
TestStartWorkflow,
'* * * * *'
42,
options: {
name: 'test-workflow',
domain: 'test-domain',
task_list: 'test-task-list',
headers: { 'Foo' => 'Bar' },

}
)

expect(client)
.to have_received(:start_workflow_execution)
.with(
domain: 'test-domain',
workflow_id: an_instance_of(String),
workflow_name: 'test-workflow',
task_list: 'test-task-list',
cron_schedule: '* * * * *',
input: [42],
task_timeout: Cadence.configuration.timeouts[:task],
execution_timeout: Cadence.configuration.timeouts[:execution],
workflow_id_reuse_policy: nil,
headers: { 'Foo' => 'Bar' }
)
end

it 'starts a workflow using a mix of input, keyword arguments and options' do
described_class.start_workflow(
TestStartWorkflow,
Expand Down

0 comments on commit f681793

Please sign in to comment.