Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch aggregation #4303

Merged
merged 34 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
883016a
Aggregation migration, model, spec, factory
zwolf Mar 6, 2024
5f2076b
Client & spec
zwolf Mar 6, 2024
6683ba5
Policy and spec
zwolf Mar 26, 2024
0adb3f1
Schemas
zwolf Mar 26, 2024
38e4efe
Update policy & schemas
zwolf Mar 27, 2024
d4067ec
Update status enum
zwolf Mar 27, 2024
2d8f8e2
Add default to status migration
zwolf Mar 27, 2024
7db71c9
Update serializer
zwolf Mar 27, 2024
3a88a0c
Update factory
zwolf Mar 27, 2024
03ffb51
Controller and spec
zwolf Mar 27, 2024
b3b7292
Fix migration
zwolf Mar 27, 2024
9b8d423
Hound
zwolf Mar 27, 2024
a16c421
Hound again
zwolf Mar 27, 2024
ea565fb
Third round hound
zwolf Mar 27, 2024
f6440bd
Feed hound
zwolf Mar 27, 2024
25e1843
Consistency
zwolf Mar 27, 2024
ec55ab9
Merge branch 'master' into batch-aggregation
zwolf Mar 28, 2024
50f14c1
Add project_id to Aggregations to allow Doorkeeper to scope by project
zwolf May 24, 2024
83be921
Merge branch 'batch-aggregation' of github.com:zooniverse/panoptes in…
zwolf May 24, 2024
ee47747
Clarify admin specs, add collab spec
zwolf May 24, 2024
0271b4c
Remove ignored_columns
zwolf May 24, 2024
2fccdb7
Add spec for failed service connections
zwolf May 24, 2024
4f1d46c
Add serializer spec
zwolf May 28, 2024
a08dbd1
Implement #destroy, add error specs for collisions
zwolf May 28, 2024
d97e027
Aggregation documentation
zwolf May 28, 2024
b90bdcc
reorder spec
zwolf May 28, 2024
419dc65
Merge branch 'master' into batch-aggregation
zwolf May 28, 2024
c350406
Merge branch 'batch-aggregation' of github.com:zooniverse/panoptes in…
zwolf May 28, 2024
b175f36
Fix hash alignment
zwolf May 28, 2024
cc1fbfe
Remove aggregation association from subject
zwolf May 30, 2024
871a147
Remove user uniqueness constraint
zwolf May 30, 2024
1923b00
Merge branch 'master' into batch-aggregation
zwolf Jun 10, 2024
99cb844
MIgrate database, update structure.sql
zwolf Jun 10, 2024
7009cf4
Resolve migrations
zwolf Jun 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions app/controllers/api/v1/aggregations_controller.rb
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
# frozen_string_literal: true

class Api::V1::AggregationsController < Api::ApiController
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style/ClassAndModuleChildren: Use nested module/class definitions instead of compact style.

include JsonApiController::PunditPolicy

require_authentication :create, :update, scopes: [:project]
resource_actions :create, :update, :show, :index
require_authentication :index, :show, :update, :create, scopes: [:workflow]
resource_actions :index, :show, :create, :update
schema_type :json_schema
before_action :filter_by_subject_set, only: :index

private

def filter_by_subject_set
subject_set_ids = params.delete(:subject_set_id).try(:split, ',')
unless subject_set_ids.blank?
@controlled_resources = controlled_resources
.joins(workflow: :subject_sets)
.where(workflows: { subject_set_id: subject_set_ids } )
def create
zwolf marked this conversation as resolved.
Show resolved Hide resolved
zwolf marked this conversation as resolved.
Show resolved Hide resolved
workflow = Workflow.find(create_params['links']['workflow'])
response = AggregationClient.new.send_aggregation_request(
workflow.project.id,
workflow.id,
create_params['links']['user']
)
super do |agg|
agg.update({ task_id: response.body[:task_id], status: 'pending' })
end
rescue AggregationClient::ConnectionError
json_api_render(:service_unavailable, response.body)
end
end
23 changes: 11 additions & 12 deletions app/models/aggregation.rb
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
# frozen_string_literal: true

class Aggregation < ApplicationRecord

belongs_to :workflow
belongs_to :subject
belongs_to :user
validates :workflow, presence: true
validates :user, presence: true
validates :user_id, uniqueness: { scope: :workflow_id }

validates_presence_of :workflow, :subject, :aggregation
validates_uniqueness_of :subject_id, scope: :workflow_id
validate :aggregation, :workflow_version_present
self.ignored_columns = %w[subject_id aggregation]
zwolf marked this conversation as resolved.
Show resolved Hide resolved

private
enum status: {
created: 0,
pending: 1,
completed: 2,
failed: 3
}

def workflow_version_present
wv_key = :workflow_version
if aggregation && !aggregation.symbolize_keys.has_key?(wv_key)
errors.add(:aggregation, "must have #{wv_key} metadata")
end
end
end
26 changes: 7 additions & 19 deletions app/policies/aggregation_policy.rb
Original file line number Diff line number Diff line change
@@ -1,30 +1,18 @@
class AggregationPolicy < ApplicationPolicy
class ReadScope < Scope
# Allow access to public aggregations
class Scope < Scope
def resolve(action)
updatable_parents = policy_for(Workflow).scope_for(:update)
updatable_scope = scope.joins(:workflow).merge(updatable_parents)

public_aggregations = scope.joins(:workflow).where("workflows.aggregation ->> 'public' = 'true'")
Aggregation.union(updatable_scope, public_aggregations)
end
end

class WriteScope < Scope
def resolve(action)
parent_scope = policy_for(Workflow).scope_for(action)
parent_scope = policy_for(Workflow).scope_for(:update)
scope.where(workflow_id: parent_scope.select(:id))
end
end

scope :index, :show, with: ReadScope
scope :update, :destroy, :versions, :version, with: WriteScope

def linkable_subjects
policy_for(Subject).scope_for(:show)
end
scope :index, :show, :create, :update, :destroy, with: Scope

def linkable_workflows
policy_for(Workflow).scope_for(:update)
end

def linkable_users
policy_for(User).scope_for(:update)
end
end
35 changes: 22 additions & 13 deletions app/schemas/aggregation_create_schema.rb
Original file line number Diff line number Diff line change
@@ -1,26 +1,35 @@
class AggregationCreateSchema < JsonSchema
schema do
type "object"
description "An Aggregation for a subject"
required "links"
type 'object'
description 'An Aggregation for a workflow'
required 'links'
additional_properties false

property "aggregation" do
type "object"
property 'uuid' do
type 'string'
end

property "links" do
type "object"
additional_properties false
property 'task_id' do
type 'string'
end

required "subject", "workflow"
property 'status' do
type 'string'
end
zwolf marked this conversation as resolved.
Show resolved Hide resolved

property 'links' do
type 'object'
required 'workflow'
additional_properties false

property "subject" do
type "integer", "string"
property 'workflow' do
type 'integer', 'string'
pattern '^[0-9]*$'
end

property "workflow" do
type "integer", "string"
property 'user' do
type 'integer', 'string'
pattern '^[0-9]*$'
end
end
end
Expand Down
26 changes: 9 additions & 17 deletions app/schemas/aggregation_update_schema.rb
Original file line number Diff line number Diff line change
@@ -1,27 +1,19 @@
class AggregationUpdateSchema < JsonSchema
schema do
type "object"
description "An Aggregation for a subject"
required "aggregation", "links"
type 'object'
description 'An Aggregation for a workflow'
additional_properties false

property "aggregation" do
type "object"
property 'uuid' do
type 'string'
end

property "links" do
type "object"
additional_properties false

required "subject", "workflow"

property "subject" do
type "integer", "string"
end
property 'task_id' do
type 'string'
end

property "workflow" do
type "integer", "string"
end
property 'status' do
type 'string'
end
end
end
6 changes: 3 additions & 3 deletions app/serializers/aggregation_serializer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ class AggregationSerializer
include Serialization::PanoptesRestpack
include CachedSerializer

attributes :id, :created_at, :updated_at, :aggregation, :href
can_include :workflow, :subject
attributes :id, :href, :created_at, :updated_at, :uuid, :task_id, :status
can_include :workflow, :user

can_filter_by :workflow, :subject
can_filter_by :workflow
end
58 changes: 58 additions & 0 deletions app/services/aggregation_client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# frozen_string_literal: true

class AggregationClient
class ConnectionError < StandardError; end
class ResourceNotFound < ConnectionError; end
class ServerError < ConnectionError; end

attr_reader :connection

def initialize(adapter=Faraday.default_adapter)
@connection = connect!(adapter)
@host ||= ENV.fetch('AGGREGATION_HOST', 'http://test.example.com')
end

def connect!(adapter)
Faraday.new(@host, ssl: { verify: false }) do |faraday|
faraday.request :json
faraday.response :json, content_type: /\bjson$/
faraday.adapter(*adapter)
end
end

def send_aggregation_request(project_id, workflow_id, user_id)
params = { project_id: project_id, workflow_id: workflow_id, user_id: user_id }

request(:post, '/run_aggregation') do |req|
req.body = params.to_json
end
end

private

def request(http_method, params)
response = connection.send(http_method, *params) do |req|
req.headers['Accept'] = 'application/json'
req.headers['Content-Type'] = 'application/json'
req.options.timeout = 5 # open/read timeout in seconds
req.options.open_timeout = 2 # connection open timeout in seconds
yield req if block_given?
end

handle_response(response)
rescue Faraday::TimeoutError,
Faraday::ConnectionFailed => e
raise ConnectionError, e.message
end

def handle_response(response)
case response.status
when 404
raise ResourceNotFound, status: response.status, body: response.body
when 400..600
raise ServerError, response.body
else
response.body
end
end
end
27 changes: 27 additions & 0 deletions db/migrate/20240304201959_refactor_aggregation_model.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# frozen_string_literal: true

class RefactorAggregationModel < ActiveRecord::Migration[6.1]
def up
# delete existing unused columns
safety_assured { remove_column :aggregations, :subject_id }
safety_assured { remove_column :aggregations, :aggregation }

# and the new aggregations columns
add_column :aggregations, :user_id, :integer
add_foreign_key :aggregations, :users, column: :user_id, validate: false

add_column :aggregations, :uuid, :string
add_column :aggregations, :task_id, :string
add_column :aggregations, :status, :integer, default: 0
end

def down
add_column :aggregations, :subject_id, :integer
add_column :aggregations, :aggregation, :jsonb

remove_column :aggregations, :user_id
remove_column :aggregations, :uuid
remove_column :aggregations, :task_id
remove_column :aggregations, :status
end
end
36 changes: 15 additions & 21 deletions db/structure.sql
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,12 @@ ALTER SEQUENCE public.access_control_lists_id_seq OWNED BY public.access_control
CREATE TABLE public.aggregations (
id integer NOT NULL,
workflow_id integer,
subject_id integer,
aggregation jsonb DEFAULT '{}'::jsonb NOT NULL,
created_at timestamp without time zone,
updated_at timestamp without time zone
updated_at timestamp without time zone,
user_id integer,
uuid character varying,
task_id character varying,
status integer
);


Expand Down Expand Up @@ -2802,13 +2804,6 @@ CREATE INDEX index_access_control_lists_on_resource_id_and_resource_type ON publ
CREATE INDEX index_access_control_lists_on_user_group_id ON public.access_control_lists USING btree (user_group_id);


--
-- Name: index_aggregations_on_subject_id_and_workflow_id; Type: INDEX; Schema: public; Owner: -
--

CREATE UNIQUE INDEX index_aggregations_on_subject_id_and_workflow_id ON public.aggregations USING btree (subject_id, workflow_id);


--
-- Name: index_aggregations_on_workflow_id; Type: INDEX; Schema: public; Owner: -
--
Expand Down Expand Up @@ -3917,14 +3912,6 @@ ALTER TABLE ONLY public.subject_groups
ADD CONSTRAINT fk_rails_283ede5252 FOREIGN KEY (project_id) REFERENCES public.projects(id);


--
-- Name: aggregations fk_rails_28a7ada458; Type: FK CONSTRAINT; Schema: public; Owner: -
--

ALTER TABLE ONLY public.aggregations
ADD CONSTRAINT fk_rails_28a7ada458 FOREIGN KEY (subject_id) REFERENCES public.subjects(id) ON UPDATE CASCADE ON DELETE CASCADE;


--
-- Name: project_contents fk_rails_305e6d8bf1; Type: FK CONSTRAINT; Schema: public; Owner: -
--
Expand Down Expand Up @@ -4077,6 +4064,14 @@ ALTER TABLE ONLY public.collections_projects
ADD CONSTRAINT fk_rails_895b025564 FOREIGN KEY (project_id) REFERENCES public.projects(id);


--
-- Name: aggregations fk_rails_8eb620b6f6; Type: FK CONSTRAINT; Schema: public; Owner: -
--

ALTER TABLE ONLY public.aggregations
ADD CONSTRAINT fk_rails_8eb620b6f6 FOREIGN KEY (user_id) REFERENCES public.users(id) NOT VALID;


--
-- Name: set_member_subjects fk_rails_93073bf3b1; Type: FK CONSTRAINT; Schema: public; Owner: -
--
Expand Down Expand Up @@ -4605,6 +4600,5 @@ INSERT INTO "schema_migrations" (version) VALUES
('20231025200957'),
('20240216142515'),
('20240216171653'),
('20240216171937');


('20240216171937'),
('20240304201959');
Loading
Loading