Skip to content

Commit

Permalink
add shipper
Browse files Browse the repository at this point in the history
  • Loading branch information
creadone committed May 1, 2021
1 parent e3cdcd0 commit 6816806
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 33 deletions.
10 changes: 10 additions & 0 deletions .luacheckrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
include_files = {
'**/*.lua',
'*.rockspec',
'*.luacheckrc'
}
exclude_files = {
'lua_modules',
'.luarocks',
'.rocks'
}
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2021 tnt-qube
Copyright (c) 2021 Sergey Fedorov

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ HTTP API layer over Tarantool Queue.

1. Install [Tarantool](https://github.com/tarantool/tarantool) or `brew install tarantool`
2. Install [Queue](https://github.com/tarantool/queue) or `tarantoolctl rocks install queue`
2. `git clone https://github.com/tnt-qube/qube`
3. Edit config:
3. Install [HTTP](https://github.com/tarantool/http) or `tarantoolctl rocks install http`
4. `git clone https://github.com/tnt-qube/qube`
5. Edit config:

```lua
-- config.lua
Expand Down
3 changes: 2 additions & 1 deletion bin/dev-run
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env bash

tarantool init.lua
rm -rf *.snap *.xlog *.log
rm -rf *.snap *.xlog
# rm -rf /var/backup/qube/*
clear
43 changes: 27 additions & 16 deletions config.lua
Original file line number Diff line number Diff line change
@@ -1,28 +1,39 @@
local Config = {}

Config.http = {
root = '/api/v1',
host = '127.0.0.1',
port = '5672',
token = '77c04ced3f915240d0c5d8d5819f84c7',
log_requests = true,
log_errors = true
}

-- TNT configuration
Config.tarantool = {
access = {
user = 'qube',
password = '77c04ced3f915240d0c5d8d5819f84c7',
},
node = {
pid_file = '/var/run/tarantool',
memtx_memory = 1024 * 1024 * 1024 * 2,
memtx_dir = '/var/backup/qube',
wal_dir = '/var/backup/qube',
log = '/var/log/qube/qube.log',
background = false,
custom_proc_title = 'qube'
pid_file = '/var/run/qube.pid',
memtx_memory = 1024 * 1024 * 1024 * 1,
memtx_dir = './',
wal_dir = './',
background = false,
custom_proc_title = 'qube',
-- log_format = 'json'
}
}

-- HTTP Server
Config.http = {
root = '/api/v1',
host = '127.0.0.1',
port = '5672',
token = '77c04ced3f915240d0c5d8d5819f84c7',
log_requests = true,
log_errors = true
}

-- Shipper
Config.shipper = {
enable = true,
user_agent = 'QubeShipper',
token = '77c04ced3f915240d0c5d8d5819f84c7',
webhook_url = 'http://localhost:3000/qube/_jobs',
task_check = 1
}

return Config
19 changes: 8 additions & 11 deletions init.lua
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
local json = require('json')
require('strict').on()

local config = require('config')
box.cfg(config.tarantool.node)

box.cfg(config.tarantool.node) -- or box.cfg{} for local dev
local json = require('json')
local qube = require('lib.qube')
local shipper = require('lib.shipper')
shipper.start()

local http_router = require('http.router')
local http_server = require('http.server')
local tsgi = require('http.tsgi')

local qube = require('lib.qube')
-- local xray = require('lib.xray')

box.once('access', function()
box.schema.user.create(config.tarantool.user, { password = config.tarantool.password })
box.schema.user.grant(config.tarantool.user, 'read,write,execute', 'universe')
end)

local function send_response(code, payload)
if not type(payload) == 'table' then
return { status = code, body = json.encode({ message = tostring(payload) }) }
Expand All @@ -26,7 +23,7 @@ end
local function auth_request(env)
local request_token = env:header('x-auth-token')
if not request_token == config.http.token then
return send_response(403, 'Failed to authorize request')
return send_response(403, 'Failed to authenticate request')
else
return tsgi.next(env)
end
Expand Down
4 changes: 2 additions & 2 deletions lib/qube.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ local json = require('json')

local M = {}
M.queue = require('queue')
M.queue.cfg({ ttr = 300 })
M.queue.create_tube('default', 'fifo', { if_not_exists = true })

function M.create_tube(request)
local name = request:post_param('tube')
Expand All @@ -16,7 +16,7 @@ end
function M.delete_tube(request)
local name = request:stash('tube')
local tube = M.queue.tube[name]
tube:truncate()
tube:truncate()
return tube:drop()
end

Expand Down
135 changes: 135 additions & 0 deletions lib/shipper.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
local fiber = require('fiber')
local config = require('config')
local client = require('http.client').new()
local json = require('json')
local logger = require('log')

local M = {}
M.queue = require('queue')

M.transport = {
finder = { sid = nil, fb = nil },
sender = { sid = nil, fb = nil },
tunnel = fiber.channel(1),
}

M.client_opts = {
['headers'] = {
['User-Agent'] = config.shipper.user_agent,
['X-Auth-Token'] = config.shipper.token,
['Content-Type'] = 'application/json',
}
}

-- Check if module can start
function M.can_start()
local queue_status = M.queue ~= nil
local shipper_status = config.shipper.enable == true
return (queue_status and shipper_status)
end

-- Pack task before send to app
function M.serialize(task)
return {
task_id = task['task_id'],
queue = task['tube'],
data = task[3]
}
end

-- Taking the new task from channel
-- and send to the external app
function M.sender_worker()
local webhook = config.shipper.webhook_url
local options = M.client_opts
local queue = M.queue

-- Save fiber session id
M.transport.sender.sid = queue.identify()
while true do
if M.transport.tunnel:is_empty() then
logger.debug('Sender: channel is empty')
fiber.testcancel()
fiber.sleep(config.shipper.task_check)
else
local task = M.transport.tunnel:get(0)
logger.debug('Sender: received new task')
local success, push_err = pcall(function()
return client:post(webhook, json.encode(task), options)
end)
if success then
local tube_name = task['tube']

-- Before call ack need apply finder's session
-- or will be raised 'Task was not taken'
queue.identify(M.transport.finder.sid)
local ok, resp = pcall(function()
return M.queue.tube[tube_name]:ack(task['task_id'])
end)
if ok then
logger.info('Sender: task %s#%s has been shipped', task['task_id'], task['tube'])
else
logger.error('Sender: failed to ack task, %s', tostring(resp))
end
else
logger.error('Sender: failed to shipped task: ' .. tostring(push_err))
end
fiber.testcancel()
fiber.sleep(config.shipper.task_check)
end
end
end

-- Iterating over tubes, find new task
-- and pass to the sender_worker
function M.finder_worker()
local queue = M.queue
M.transport.finder.sid = queue.identify()
while true do
queue.identify(M.transport.finder.sid)
for tube_name, _ in pairs(queue.tube) do
local success, item = pcall(function()
return queue.tube[tube_name]:take(0)
end)
if success and item ~= nil then
local task = M.serialize(item)
task['tube'] = tube_name
M.transport.tunnel:put(task)
logger.verbose('Finder: found new task')
else
logger.verbose('Finder: waiting for new task...')
end
end
fiber.testcancel()
fiber.sleep(config.shipper.task_check)
end
end

-- Run shipper
function M.start()
if M.can_start() then
-- Start workers
local finder = M.transport.finder
finder.fb = fiber.create(M.finder_worker)
finder.fb:name('finder')
local sender = M.transport.sender
sender.fb = fiber.create(M.sender_worker)
sender.fb:name('sender')

return true
else
logger.error('Failed to start workers')
return false
end
end

-- Stop shipper
function M.stop()
local finder = M.transport.finder
local sender = M.transport.sender
for fib in pairs({finder, sender}) do
fib.fb:kill();
end
end

return M
22 changes: 22 additions & 0 deletions qube-dev-1.rockspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package = 'qube'
version = 'dev-2'
source = {
url = 'git://github.com/tnt-qube/qube.git',
branch = 'master',
}

description = {
summary = 'API layer over Tarantool Queue via HTTP',
homepage = 'https://github.com/tnt-qube',
license = 'MIT'
}
dependencies = {
'lua >= 5.1'
}

build = {
type = 'builtin',
modules = {
['qube'] = 'qube/init.lua',
}
}

0 comments on commit 6816806

Please sign in to comment.