Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sidekiq 7 #2237

Draft
wants to merge 25 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/controllers/subscribers_auth_token_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def auth_token
token = generate_token(subscriber)
email = build_email(subscriber, token)

SendEmailWorker
SendEmailJob
.perform_async_in_queue(email.id, queue: :send_email_transactional)

render json: { subscriber: }, status: :created
Expand Down
2 changes: 1 addition & 1 deletion app/controllers/subscribers_govuk_account_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def link_subscriber_to_account
subscriber:,
)

SendEmailWorker.perform_async_in_queue(
SendEmailJob.perform_async_in_queue(
email.id,
queue: :send_email_transactional,
)
Expand Down
2 changes: 1 addition & 1 deletion app/controllers/subscriptions_auth_token_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def auth_token
token = generate_token
email = build_email(token, subscriber_list)

SendEmailWorker
SendEmailJob
.perform_async_in_queue(email.id, queue: :send_email_transactional)

render json: {}, status: :ok
Expand Down
2 changes: 1 addition & 1 deletion app/controllers/subscriptions_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,6 @@ def send_confirmation_email(subscription)
return unless subscription[:new_record]

email = SubscriptionConfirmationEmailBuilder.call(subscription: subscription[:record])
SendEmailWorker.perform_async_in_queue(email.id, queue: :send_email_transactional)
SendEmailJob.perform_async_in_queue(email.id, queue: :send_email_transactional)
end
end
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
class ApplicationWorker
include Sidekiq::Worker
class ApplicationJob
include Sidekiq::Job

private

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class BulkMigrateListWorker < ApplicationWorker
class BulkMigrateListJob < ApplicationJob
def perform(source_list_id, destination_list_id)
@source_list_id = source_list_id
@destination_list_id = destination_list_id
Expand Down Expand Up @@ -67,7 +67,7 @@ def send_confirmation_message
destination_id: destination_list.id,
count: subscribers_to_move_count,
)
SendEmailWorker.perform_async_in_queue(email.id, queue: :send_email_transactional)
SendEmailJob.perform_async_in_queue(email.id, queue: :send_email_transactional)
logger.info("Migration of subscriberlist #{source_list.id} complete. Email with id #{email.id} queued for delivery")
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
class BulkUnsubscribeListWorker < ApplicationWorker
class BulkUnsubscribeListJob < ApplicationJob
sidekiq_options queue: :process_and_generate_emails

def perform(subscriber_list_id, message_id)
run_with_advisory_lock(SubscriberList, subscriber_list_id) do
ProcessMessageWorker.new.perform(message_id) if message_id
ProcessMessageJob.new.perform(message_id) if message_id

Subscription.active.where(subscriber_list_id:).update_all(
ended_reason: :bulk_unsubscribed,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class DailyDigestInitiatorWorker < ApplicationWorker
class DailyDigestInitiatorJob < ApplicationJob
def perform(date = Date.current.to_s)
run_with_advisory_lock(DigestRun, "#{date}-#{Frequency::DAILY}") do
DigestInitiatorService.call(date: Date.parse(date), range: Frequency::DAILY)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class DigestEmailGenerationWorker < ApplicationWorker
class DigestEmailGenerationJob < ApplicationJob
sidekiq_options queue: :email_generation_digest

def perform(digest_run_subscriber_id)
Expand All @@ -23,7 +23,7 @@ def perform(digest_run_subscriber_id)
end

email_ids.each do |email_id|
SendEmailWorker.perform_async_in_queue(email_id, queue: :send_email_digest)
SendEmailJob.perform_async_in_queue(email_id, queue: :send_email_digest)
end
end

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class DigestRunCompletionMarkerWorker < ApplicationWorker
class DigestRunCompletionMarkerJob < ApplicationJob
def perform
candidates = DigestRun.where.not(processed_at: nil).where(completed_at: nil)
candidates.find_each do |digest_run|
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class EmailDeletionWorker < ApplicationWorker
class EmailDeletionJob < ApplicationJob
def perform
run_with_advisory_lock(Email, "delete") do
start_time = Time.zone.now
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class HistoricalDataDeletionWorker < ApplicationWorker
class HistoricalDataDeletionJob < ApplicationJob
def perform
# cascades matched content changes
delete_and_log("content changes") { ContentChange.where("created_at < ?", max_retention_period) }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class MetricsCollectionWorker < ApplicationWorker
class MetricsCollectionJob < ApplicationJob
def perform
ContentChangeExporter.call
DigestRunExporter.call
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class MetricsCollectionWorker::BaseExporter
class MetricsCollectionJob::BaseExporter
def self.call
new.call
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class MetricsCollectionWorker::ContentChangeExporter < MetricsCollectionWorker::BaseExporter
class MetricsCollectionJob::ContentChangeExporter < MetricsCollectionJob::BaseExporter
def call
GovukStatsd.gauge("content_changes.unprocessed_total", unprocessed_content_changes)
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class MetricsCollectionWorker::DigestRunExporter < MetricsCollectionWorker::BaseExporter
class MetricsCollectionJob::DigestRunExporter < MetricsCollectionJob::BaseExporter
def call
critical_digest_runs = DigestRun.where("created_at < ?", 2.hours.ago)
.where(completed_at: nil)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class MetricsCollectionWorker::MessageExporter < MetricsCollectionWorker::BaseExporter
class MetricsCollectionJob::MessageExporter < MetricsCollectionJob::BaseExporter
def call
GovukStatsd.gauge("messages.unprocessed_total", unprocessed_messages)
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
class NullifySubscribersWorker < ApplicationWorker
class NullifySubscribersJob < ApplicationJob
def perform
run_with_advisory_lock(Subscriber, "nullify") do
nullifyable_subscribers.each do |s|
begin
GdsApi.account_api.delete_user_by_subject_identifier(subject_identifier: s.govuk_account_id) unless s.govuk_account_id.nil?
rescue GdsApi::HTTPNotFound
Rails.logger.warn("NullifySubscribersWorker tried to remove account id #{s.govuk_account_id}, but couldn't find it.")
Rails.logger.warn("NullifySubscribersJob tried to remove account id #{s.govuk_account_id}, but couldn't find it.")
end
s.update!(address: nil, govuk_account_id: nil, updated_at: Time.zone.now)
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class PollingAlertCheckWorker < ApplicationWorker
class PollingAlertCheckJob < ApplicationJob
include SearchAlertList

def perform(document_type)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class ProcessContentChangeWorker < ApplicationWorker
class ProcessContentChangeJob < ApplicationJob
sidekiq_options queue: :process_and_generate_emails

def perform(content_change_id)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class ProcessMessageWorker < ApplicationWorker
class ProcessMessageJob < ApplicationJob
sidekiq_options queue: :process_and_generate_emails

def perform(message_id)
Expand Down
7 changes: 7 additions & 0 deletions app/jobs/recover_lost_jobs_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
class RecoverLostJobsJob < ApplicationJob
def perform
RecoverLostJobsJob::UnprocessedCheck.new.call
RecoverLostJobsJob::MissingDigestRunsCheck.new.call
RecoverLostJobsJob::OldPendingEmailsCheck.new.call
end
end
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
class RecoverLostJobsWorker::MissingDigestRunsCheck
class RecoverLostJobsJob::MissingDigestRunsCheck
def call
recover(DailyDigestInitiatorWorker, non_existent_daily_digests)
recover(WeeklyDigestInitiatorWorker, non_existent_weekly_digests)
recover(DailyDigestInitiatorJob, non_existent_daily_digests)
recover(WeeklyDigestInitiatorJob, non_existent_weekly_digests)
end

private
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class RecoverLostJobsWorker::OldPendingEmailsCheck
class RecoverLostJobsJob::OldPendingEmailsCheck
def call
old_pending_emails = Email.where(status: :pending)
.where("created_at <= ?", 3.hours.ago)
Expand All @@ -11,7 +11,7 @@ def call
def recover(old_pending_emails)
old_pending_emails.in_batches do |relation|
relation.pluck(:id).each do |id|
SendEmailWorker.perform_async_in_queue(id, queue: :send_email_immediate)
SendEmailJob.perform_async_in_queue(id, queue: :send_email_immediate)
end
end
end
Expand Down
19 changes: 19 additions & 0 deletions app/jobs/recover_lost_jobs_job/unprocessed_check.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
class RecoverLostJobsJob::UnprocessedCheck
def call
recover(ProcessContentChangeJob, old_unprocessed(ContentChange).pluck(:id))
recover(ProcessMessageJob, old_unprocessed(Message).pluck(:id))
recover(DigestEmailGenerationJob, old_unprocessed(DigestRunSubscriber).pluck(:id))
recover(DailyDigestInitiatorJob, old_unprocessed(DigestRun.daily).pluck(:date).map(&:to_s))
recover(WeeklyDigestInitiatorJob, old_unprocessed(DigestRun.weekly).pluck(:date).map(&:to_s))
end

private

def old_unprocessed(scope)
scope.where(processed_at: nil).where("created_at <= ?", 1.hour.ago)
end

def recover(worker, work)
work.each { |arg| worker.perform_async(arg) }
end
end
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class SendEmailWorker < ApplicationWorker
class SendEmailJob < ApplicationJob
# More information around the rate limit can be found here ->
# https://docs.publishing.service.gov.uk/manual/govuk-notify.html under "GOV.UK Emails".
RATE_LIMIT_THRESHOLD = 21_600 # max requests in a minute, equates to 350 a second
Expand All @@ -7,7 +7,7 @@ class SendEmailWorker < ApplicationWorker
def perform(email_id, metrics, queue)
if rate_limit_exceeded?
logger.warn("Rescheduling email #{email_id} due to exceeding rate limit")
SendEmailWorker.set(queue: queue || "send_email_immediate")
SendEmailJob.set(queue: queue || "send_email_immediate")
.perform_in(5.minutes, email_id, metrics, queue)
return
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class SubscriberListAuditWorker < ApplicationWorker
class SubscriberListAuditJob < ApplicationJob
sidekiq_options queue: :subscriber_list_audit

def perform(url_batch, audit_start_time_string)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class WeeklyDigestInitiatorWorker < ApplicationWorker
class WeeklyDigestInitiatorJob < ApplicationJob
def perform(date = Date.current.to_s)
run_with_advisory_lock(DigestRun, "#{date}-#{Frequency::WEEKLY}") do
DigestInitiatorService.call(date: Date.parse(date), range: Frequency::WEEKLY)
Expand Down
2 changes: 1 addition & 1 deletion app/services/bulk_unsubscribe_list_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def initialize(subscriber_list:, params:, govuk_request_id:, user: nil)
def call
message = Message.create!(message_params) if message_params
Metrics.message_created if message
BulkUnsubscribeListWorker.perform_async(
BulkUnsubscribeListJob.perform_async(
subscriber_list.id,
message&.id,
)
Expand Down
2 changes: 1 addition & 1 deletion app/services/content_change_handler_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def initialize(params:, govuk_request_id:, user: nil)
def call
content_change = ContentChange.create!(content_change_params)
Metrics.content_change_created
ProcessContentChangeWorker.perform_async(content_change.id)
ProcessContentChangeJob.perform_async(content_change.id)
end

private
Expand Down
2 changes: 1 addition & 1 deletion app/services/digest_initiator_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def create_digest_run_subscribers(digest_run)

def enqueue_jobs(digest_run_subscriber_ids)
digest_run_subscriber_ids.each do |digest_run_subscriber_id|
DigestEmailGenerationWorker.perform_async(digest_run_subscriber_id)
DigestEmailGenerationJob.perform_async(digest_run_subscriber_id)
end
end
end
2 changes: 1 addition & 1 deletion app/services/immediate_email_generation_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def call
subscriber_batches.each do |batch|
email_ids = batch.generate_emails
email_ids.each do |id|
SendEmailWorker.perform_async_in_queue(
SendEmailJob.perform_async_in_queue(
id,
worker_metrics,
queue: content.queue,
Expand Down
7 changes: 0 additions & 7 deletions app/workers/recover_lost_jobs_worker.rb

This file was deleted.

19 changes: 0 additions & 19 deletions app/workers/recover_lost_jobs_worker/unprocessed_check.rb

This file was deleted.

3 changes: 0 additions & 3 deletions config/initializers/sidekiq.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
# Set strict args so we're ready for Sidekiq 7
Sidekiq.strict_args!

Sidekiq.configure_server do |config|
config.logger.level = Rails.logger.level
end
20 changes: 10 additions & 10 deletions config/sidekiq.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,33 @@
:schedule:
daily_digest_initiator:
cron: '30 8 * * * Europe/London' # every day at 8:30am
class: DailyDigestInitiatorWorker
class: DailyDigestInitiatorJob
weekly_digest_initiator:
cron: '30 8 * * 6 Europe/London' # every Saturday at 8:30am
class: WeeklyDigestInitiatorWorker
class: WeeklyDigestInitiatorJob
historical_data_deletion:
cron: '0 12 * * * Europe/London' # every day at midday
class: HistoricalDataDeletionWorker
class: HistoricalDataDeletionJob
nullify_subscribers:
every: '1h'
class: NullifySubscribersWorker
class: NullifySubscribersJob
email_deleter:
every: '1h'
class: EmailDeletionWorker
class: EmailDeletionJob
digest_run_completion_marker:
every: '1m'
class: DigestRunCompletionMarkerWorker
class: DigestRunCompletionMarkerJob
metrics_collection:
every: '1m'
class: MetricsCollectionWorker
class: MetricsCollectionJob
recover_lost_jobs:
every: '30m'
class: RecoverLostJobsWorker
class: RecoverLostJobsJob
check_medical_safety_alerts:
every: '15m'
class: PollingAlertCheckWorker
class: PollingAlertCheckJob
args: ["medical_safety_alert"]
check_travel_advice_alerts:
every: '15m'
class: PollingAlertCheckWorker
class: PollingAlertCheckJob
args: ["travel_advice"]
2 changes: 1 addition & 1 deletion docs/alert_check_scheduled_jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ kubectl -n apps deploy/email-alert-api -- rails c

(Rails console)
> Email.where(content_id: <your content id>).delete_all
> PollingAlertCheckWorker.new.perform(<your document type, either "medical_safety_alert" or "travel_advice|>)
> PollingAlertCheckJob.new.perform(<your document type, either "medical_safety_alert" or "travel_advice|>)
```

The alert check worker will find no emails with notify status "delivered", and nothing to actively poll (no emails with notify status nil), and will set the alert metric. Prometheus should collect the metrics after a minute, and set off the alert.
Expand Down
4 changes: 2 additions & 2 deletions lib/subscriber_list_mover.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def call
)
end

BulkMigrateListWorker.perform_async(
BulkMigrateListJob.perform_async(
source_subscriber_list.id,
destination_subscriber_list.id,
)
Expand All @@ -33,7 +33,7 @@ def call

if send_email
puts "Sending emails to subscribers about change"
emails.each { |id| SendEmailWorker.perform_async_in_queue(id, queue: :send_email_immediate) }
emails.each { |id| SendEmailJob.perform_async_in_queue(id, queue: :send_email_immediate) }
end
end

Expand Down
Loading
Loading