From 681680612ffb9b13e520e88535390b5e7d1d8c5c Mon Sep 17 00:00:00 2001 From: creadone Date: Sat, 1 May 2021 06:39:26 +0300 Subject: [PATCH] add shipper --- .luacheckrc | 10 ++++ LICENSE | 2 +- README.md | 5 +- bin/dev-run | 3 +- config.lua | 43 ++++++++------ init.lua | 19 +++---- lib/qube.lua | 4 +- lib/shipper.lua | 135 ++++++++++++++++++++++++++++++++++++++++++++ qube-dev-1.rockspec | 22 ++++++++ 9 files changed, 210 insertions(+), 33 deletions(-) create mode 100644 lib/shipper.lua create mode 100644 qube-dev-1.rockspec diff --git a/.luacheckrc b/.luacheckrc index e69de29..e7e44b2 100644 --- a/.luacheckrc +++ b/.luacheckrc @@ -0,0 +1,10 @@ +include_files = { + '**/*.lua', + '*.rockspec', + '*.luacheckrc' +} +exclude_files = { + 'lua_modules', + '.luarocks', + '.rocks' +} \ No newline at end of file diff --git a/LICENSE b/LICENSE index cc5935a..57e14d7 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2021 tnt-qube +Copyright (c) 2021 Sergey Fedorov Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 9187029..3b2e0e1 100644 --- a/README.md +++ b/README.md @@ -6,8 +6,9 @@ HTTP API layer over Tarantool Queue. 1. Install [Tarantool](https://github.com/tarantool/tarantool) or `brew install tarantool` 2. Install [Queue](https://github.com/tarantool/queue) or `tarantoolctl rocks install queue` -2. `git clone https://github.com/tnt-qube/qube` -3. Edit config: +3. Install [HTTP](https://github.com/tarantool/http) or `tarantoolctl rocks install http` +4. `git clone https://github.com/tnt-qube/qube` +5. Edit config: ```lua -- config.lua diff --git a/bin/dev-run b/bin/dev-run index 6e3724d..8137e90 100755 --- a/bin/dev-run +++ b/bin/dev-run @@ -1,5 +1,6 @@ #!/usr/bin/env bash tarantool init.lua -rm -rf *.snap *.xlog *.log +rm -rf *.snap *.xlog +# rm -rf /var/backup/qube/* clear \ No newline at end of file diff --git a/config.lua b/config.lua index 452f225..373f998 100644 --- a/config.lua +++ b/config.lua @@ -1,28 +1,39 @@ local Config = {} -Config.http = { - root = '/api/v1', - host = '127.0.0.1', - port = '5672', - token = '77c04ced3f915240d0c5d8d5819f84c7', - log_requests = true, - log_errors = true -} - +-- TNT configuration Config.tarantool = { access = { user = 'qube', password = '77c04ced3f915240d0c5d8d5819f84c7', }, node = { - pid_file = '/var/run/tarantool', - memtx_memory = 1024 * 1024 * 1024 * 2, - memtx_dir = '/var/backup/qube', - wal_dir = '/var/backup/qube', - log = '/var/log/qube/qube.log', - background = false, - custom_proc_title = 'qube' + pid_file = '/var/run/qube.pid', + memtx_memory = 1024 * 1024 * 1024 * 1, + memtx_dir = './', + wal_dir = './', + background = false, + custom_proc_title = 'qube', + -- log_format = 'json' } } +-- HTTP Server +Config.http = { + root = '/api/v1', + host = '127.0.0.1', + port = '5672', + token = '77c04ced3f915240d0c5d8d5819f84c7', + log_requests = true, + log_errors = true +} + +-- Shipper +Config.shipper = { + enable = true, + user_agent = 'QubeShipper', + token = '77c04ced3f915240d0c5d8d5819f84c7', + webhook_url = 'http://localhost:3000/qube/_jobs', + task_check = 1 +} + return Config \ No newline at end of file diff --git a/init.lua b/init.lua index 5fd4a46..ab56a63 100644 --- a/init.lua +++ b/init.lua @@ -1,20 +1,17 @@ -local json = require('json') +require('strict').on() + local config = require('config') +box.cfg(config.tarantool.node) -box.cfg(config.tarantool.node) -- or box.cfg{} for local dev +local json = require('json') +local qube = require('lib.qube') +local shipper = require('lib.shipper') + shipper.start() local http_router = require('http.router') local http_server = require('http.server') local tsgi = require('http.tsgi') -local qube = require('lib.qube') --- local xray = require('lib.xray') - -box.once('access', function() - box.schema.user.create(config.tarantool.user, { password = config.tarantool.password }) - box.schema.user.grant(config.tarantool.user, 'read,write,execute', 'universe') -end) - local function send_response(code, payload) if not type(payload) == 'table' then return { status = code, body = json.encode({ message = tostring(payload) }) } @@ -26,7 +23,7 @@ end local function auth_request(env) local request_token = env:header('x-auth-token') if not request_token == config.http.token then - return send_response(403, 'Failed to authorize request') + return send_response(403, 'Failed to authenticate request') else return tsgi.next(env) end diff --git a/lib/qube.lua b/lib/qube.lua index af7801e..1c5b453 100644 --- a/lib/qube.lua +++ b/lib/qube.lua @@ -2,7 +2,7 @@ local json = require('json') local M = {} M.queue = require('queue') -M.queue.cfg({ ttr = 300 }) +M.queue.create_tube('default', 'fifo', { if_not_exists = true }) function M.create_tube(request) local name = request:post_param('tube') @@ -16,7 +16,7 @@ end function M.delete_tube(request) local name = request:stash('tube') local tube = M.queue.tube[name] - tube:truncate() + tube:truncate() return tube:drop() end diff --git a/lib/shipper.lua b/lib/shipper.lua new file mode 100644 index 0000000..b4a4304 --- /dev/null +++ b/lib/shipper.lua @@ -0,0 +1,135 @@ +local fiber = require('fiber') +local config = require('config') +local client = require('http.client').new() +local json = require('json') +local logger = require('log') + +local M = {} +M.queue = require('queue') + +M.transport = { + finder = { sid = nil, fb = nil }, + sender = { sid = nil, fb = nil }, + tunnel = fiber.channel(1), +} + +M.client_opts = { + ['headers'] = { + ['User-Agent'] = config.shipper.user_agent, + ['X-Auth-Token'] = config.shipper.token, + ['Content-Type'] = 'application/json', + } +} + +-- Check if module can start +function M.can_start() + local queue_status = M.queue ~= nil + local shipper_status = config.shipper.enable == true + return (queue_status and shipper_status) +end + +-- Pack task before send to app +function M.serialize(task) + return { + task_id = task['task_id'], + queue = task['tube'], + data = task[3] + } +end + +-- Taking the new task from channel +-- and send to the external app +function M.sender_worker() + local webhook = config.shipper.webhook_url + local options = M.client_opts + local queue = M.queue + + -- Save fiber session id + M.transport.sender.sid = queue.identify() + while true do + if M.transport.tunnel:is_empty() then + logger.debug('Sender: channel is empty') + fiber.testcancel() + fiber.sleep(config.shipper.task_check) + else + local task = M.transport.tunnel:get(0) + logger.debug('Sender: received new task') + local success, push_err = pcall(function() + return client:post(webhook, json.encode(task), options) + end) + if success then + local tube_name = task['tube'] + + -- Before call ack need apply finder's session + -- or will be raised 'Task was not taken' + queue.identify(M.transport.finder.sid) + local ok, resp = pcall(function() + return M.queue.tube[tube_name]:ack(task['task_id']) + end) + if ok then + logger.info('Sender: task %s#%s has been shipped', task['task_id'], task['tube']) + else + logger.error('Sender: failed to ack task, %s', tostring(resp)) + end + else + logger.error('Sender: failed to shipped task: ' .. tostring(push_err)) + end + fiber.testcancel() + fiber.sleep(config.shipper.task_check) + end + end +end + +-- Iterating over tubes, find new task +-- and pass to the sender_worker +function M.finder_worker() + local queue = M.queue + M.transport.finder.sid = queue.identify() + while true do + queue.identify(M.transport.finder.sid) + for tube_name, _ in pairs(queue.tube) do + local success, item = pcall(function() + return queue.tube[tube_name]:take(0) + end) + if success and item ~= nil then + local task = M.serialize(item) + task['tube'] = tube_name + M.transport.tunnel:put(task) + logger.verbose('Finder: found new task') + else + logger.verbose('Finder: waiting for new task...') + end + end + fiber.testcancel() + fiber.sleep(config.shipper.task_check) + end +end + +-- Run shipper +function M.start() + if M.can_start() then + -- Start workers + local finder = M.transport.finder + finder.fb = fiber.create(M.finder_worker) + finder.fb:name('finder') + local sender = M.transport.sender + sender.fb = fiber.create(M.sender_worker) + sender.fb:name('sender') + + return true + else + logger.error('Failed to start workers') + return false + end +end + +-- Stop shipper +function M.stop() + local finder = M.transport.finder + local sender = M.transport.sender + for fib in pairs({finder, sender}) do + fib.fb:kill(); + end +end + +return M \ No newline at end of file diff --git a/qube-dev-1.rockspec b/qube-dev-1.rockspec new file mode 100644 index 0000000..eac8d94 --- /dev/null +++ b/qube-dev-1.rockspec @@ -0,0 +1,22 @@ +package = 'qube' +version = 'dev-2' +source = { + url = 'git://github.com/tnt-qube/qube.git', + branch = 'master', +} + +description = { + summary = 'API layer over Tarantool Queue via HTTP', + homepage = 'https://github.com/tnt-qube', + license = 'MIT' +} +dependencies = { + 'lua >= 5.1' +} + +build = { + type = 'builtin', + modules = { + ['qube'] = 'qube/init.lua', + } +} \ No newline at end of file