Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
creadone committed Aug 31, 2021
1 parent d73a172 commit 91dd2f5
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 24 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@
/lua/.rocks
/lua/*.snap
/lua/*.xlog
/.rocks/
/.rocks/
/tmp
5 changes: 2 additions & 3 deletions bin/dev-run
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#!/usr/bin/env bash

mkdir -p ../tmp
tarantool init.lua
rm -rf *.snap *.xlog
# rm -rf /var/backup/qube/*
clear
rm -rf ./tmp/*.snap ./tmp/*.xlog
20 changes: 11 additions & 9 deletions config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ local Config = {}
Config.tarantool = {
access = {
user = 'qube',
password = '77c04ced3f915240d0c5d8d5819f84c7',
password = '1234567890',
},
node = {
pid_file = '/var/run/qube.pid',
pid_file = './tmp/qube.pid',
-- pid_file = '/var/run/qube.pid',
memtx_memory = 1024 * 1024 * 1024 * 1,
memtx_dir = './',
wal_dir = './',
memtx_dir = './tmp',
wal_dir = './tmp',
background = false,
custom_proc_title = 'qube',
log_level = 5,
-- log_format = 'json'
}
}
Expand All @@ -29,11 +31,11 @@ Config.http = {

-- Shipper
Config.shipper = {
enable = true,
user_agent = 'QubeShipper',
token = '77c04ced3f915240d0c5d8d5819f84c7',
webhook_url = 'http://localhost:3000/qube/_jobs',
task_check = 1
enable = true,
user_agent = 'QubeShipper',
token = '77c04ced3f915240d0c5d8d5819f84c7',
webhook_url = 'http://localhost:3000/_jobs',
delay = 0
}

return Config
3 changes: 2 additions & 1 deletion init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ box.cfg(config.tarantool.node)
local json = require('json')
local qube = require('lib.qube')
local shipper = require('lib.shipper')
shipper.start()
shipper.start()

local http_router = require('http.router')
local http_server = require('http.server')
Expand Down Expand Up @@ -49,6 +49,7 @@ local routes = {
{ method = 'POST', path = '/tubes/:tube', controller = 'add_task' },
{ method = 'GET', path = '/tubes/:tube', controller = 'take_task' },
{ method = 'PUT', path = '/tubes/:tube/:task_id/ack', controller = 'ack_task' },
{ method = 'POST', path = '/jobs', controller = 'jobs' },
}

local router = http_router.new()
Expand Down
15 changes: 15 additions & 0 deletions lib/qube.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
local json = require('json')
-- local xray = require('lib.xray')

local M = {}
M.queue = require('queue')
Expand Down Expand Up @@ -56,4 +57,18 @@ function M.tube_list(_)
return list
end

-- Special for Rails adapter
function M.jobs(request)
local body = json.decode(request:read())
local tube_name = body['data']['queue_name']

if M.queue.tube[tube_name] == nil then
M.queue.create_tube(tube_name, 'fifo')
end

if M.queue.tube[tube_name]:put(body['data']) then
return true
end
end

return M
18 changes: 8 additions & 10 deletions lib/shipper.lua
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,20 @@ function M.serialize(task)
}
end

-- Taking the new task from channel
-- and send to the external app
-- Taking the new task from the channel and send it to external app
function M.sender_worker()
local webhook = config.shipper.webhook_url
local options = M.client_opts
local queue = M.queue

-- Save fiber session id
M.transport.sender.sid = queue.identify()
M.transport.sender.sid = M.queue.identify()

while true do
if M.transport.tunnel:is_empty() then
logger.debug('Sender: channel is empty')
fiber.testcancel()
fiber.sleep(config.shipper.task_check)
fiber.sleep(config.shipper.delay)
else
local task = M.transport.tunnel:get(0)
logger.debug('Sender: received new task')
Expand All @@ -60,8 +60,7 @@ function M.sender_worker()
if success then
local tube_name = task['tube']

-- Before call ack need apply finder's session
-- or will be raised 'Task was not taken'
-- Before call ack need to apply finder's session to prevent error 'Task was not taken'
queue.identify(M.transport.finder.sid)
local ok, resp = pcall(function()
return M.queue.tube[tube_name]:ack(task['task_id'])
Expand All @@ -75,13 +74,12 @@ function M.sender_worker()
logger.error('Sender: failed to shipped task: ' .. tostring(push_err))
end
fiber.testcancel()
fiber.sleep(config.shipper.task_check)
fiber.sleep(config.shipper.delay)
end
end
end

-- Iterating over tubes, find new task
-- and pass to the sender_worker
-- Iterating over tubes, find new task and pass it to the sender_worker
function M.finder_worker()
local queue = M.queue
M.transport.finder.sid = queue.identify()
Expand All @@ -101,7 +99,7 @@ function M.finder_worker()
end
end
fiber.testcancel()
fiber.sleep(config.shipper.task_check)
fiber.sleep(config.shipper.delay)
end
end

Expand Down

0 comments on commit 91dd2f5

Please sign in to comment.