lua-resty-events

Inter-process Pub/Sub event propagation for Nginx worker processes

Status

This library is considered production ready.

Synopsis

http {
    lua_package_path "/path/to/lua-resty-events/lib/?/init.lua;;";

    init_by_lua_block {
        local opts = {
            listening = "unix:/tmp/events.sock",
        }

        local ev = require("resty.events").new(opts)
        if not ev then
            ngx.log(ngx.ERR, "failed to new events object")
        end

        -- store ev to global
        _G.ev = ev
    }

    init_worker_by_lua_block {
        -- fetch ev from global
        local ev = _G.ev

        local handler = function(data, event, source, wid)
            print("received event; source=", source,
                  ", event=", event,
                  ", data=", tostring(data),
                  ", from process ", wid)
        end

        local id1 = ev:subscribe("*", "*", handler)
        local id2 = ev:subscribe("source", "*", handler)
        local id3 = ev:subscribe("source", "event", handler)

        local ok, err = ev:init_worker()
        if not ok then
            ngx.log(ngx.ERR, "failed to init events: ", err)
        end
    }

    # create a listening unix domain socket
    server {
        listen unix:/tmp/events.sock;
        location / {
            content_by_lua_block {
                -- fetch ev from global
                local ev = _G.ev
                ev:run()
            }
        }
    }
}

Description

This module provides a way to send events to the other worker processes in an Nginx server. Communication goes through a unix domain socket that exactly one Nginx worker listens on.

The design allows for three use cases:

  1. broadcast an event to all worker processes, see publish. Example: a healthcheck running in one worker, but informing all workers of a failed upstream node.
  2. deliver an event to the current worker only, see the target parameter of publish.
  3. coalesce external events into a single action. Example: all workers watch external events indicating that an in-memory cache needs to be refreshed. On receiving one, every worker posts it with a unique event hash (all workers generate the same hash), see the target parameter of publish. Only one worker then receives the event, and only once, so only one worker hits the upstream database to refresh the in-memory data.
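
The three use cases differ only in the target argument passed to publish. A minimal sketch, assuming ev is the object returned by new and already initialized in the worker; the helper names and the source and event names ("healthcheck", "cache", and so on) are hypothetical and chosen only for illustration:

```lua
-- Hypothetical helpers; `ev` is the events object returned by new().
-- Source and event names are illustrative, not defined by the library.

-- 1. Tell every worker that an upstream node failed.
local function report_unhealthy(ev, node)
    return ev:publish("all", "healthcheck", "unhealthy", { node = node })
end

-- 2. Notify only the current worker; the data is not serialized.
local function notify_local(ev, data)
    return ev:publish("current", "local", "notice", data)
end

-- 3. Coalesce: every worker posts with the same unique hash, so only
--    one worker receives the event within the unique_timeout period.
local function request_cache_refresh(ev, cache_name)
    local unique = "refresh:" .. cache_name
    return ev:publish(unique, "cache", "refresh", cache_name)
end
```

Each helper returns whatever publish returns, so callers can still check for nil and an error message.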

Methods

new

syntax: ev = events.new(opts)

context: init_by_lua*

Returns a new events object. Store it in global scope so that run can use it later.

The opts parameter is a Lua table with named options:

  • listening: the unix domain socket, which must match the listen directive of the server block that calls run.
  • broker_id: (optional) the worker id that will start to listen, default 0.
  • unique_timeout: (optional) how long unique event data is kept, in seconds, default 5. See the target parameter of the publish method.
  • max_queue_len: (optional) max length of internal events buffer queue, default 1024 * 10.
  • max_payload_len: (optional) max length of serialized event data, default 1024 * 64, max 1024 * 1024 * 16.
  • enable_privileged_agent: (optional) whether the privileged agent may receive events. By default this is enabled dynamically when the privileged agent connects to the broker, for backward compatibility. Setting it explicitly to true or false is strongly suggested, because then the events queue for the privileged agent is either pre-created during initialization or not created at all.

The return value will be the event object or nil.

There is also a special option, testing. When it is true, the library does not enable unix domain socket listening and events are propagated only within the worker process. In that mode unique_timeout is meaningless.

This is useful for testing, for example with the resty CLI. The default value for testing is false.
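
For reference, the options above could be spelled out in full as follows. This is a sketch only: build_opts is a hypothetical helper, the values are the documented defaults, and enable_privileged_agent is set to false purely as an example of an explicit choice.

```lua
-- Hypothetical helper that spells out every documented option.
-- Values are the documented defaults unless noted otherwise.
local function build_opts(sock_path)
    return {
        listening = "unix:" .. sock_path,  -- must match the server block's listen
        broker_id = 0,
        unique_timeout = 5,                -- seconds
        max_queue_len = 1024 * 10,
        max_payload_len = 1024 * 64,       -- documented maximum is 1024 * 1024 * 16
        enable_privileged_agent = false,   -- explicit choice, not a default
        testing = false,
    }
end

local opts = build_opts("/tmp/events.sock")
-- In init_by_lua*: local ev = require("resty.events").new(opts)
```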

init_worker

syntax: ok, err = ev:init_worker()

context: init_worker_by_lua*

Initializes the event listener. This should typically be called from the init_worker_by_lua handler, because it makes sure that only one Nginx worker starts to listen on the unix domain socket.

The return value will be true, or nil and an error message.

run

syntax: ev:run()

context: content_by_lua*

Activates the event loop, only in the Nginx broker process (see the broker_id option of new). It must be called in content_by_lua*.

The ev object must be the same object returned by new.

Do not call it if testing is set to true.

publish

syntax: ok, err = ev:publish(target, source, event, data)

context: all phases except init_by_lua*

Will post a new event. target, source and event are all strings. data can be anything (including nil) as long as it is (de)serializable by the LuaJIT string buffer serializer and cJSON (legacy).

The target parameter can be one of:

  • "all" : the event will be broadcast to all workers.
  • "current" : the event will be local to the worker process and will not be broadcast to other workers. With this target, the data element is not serialized.
  • unique hash : the event will be sent to only one worker. Any follow-up events with the same hash value will be ignored (for the unique_timeout period specified to new).

The return value will be true when the event was successfully published, or nil and an error message if serialization fails or the event queue is full.

Note: in case of "all" and "current" the worker process sending the event will also receive it. So if the event source also needs to act upon the event, it should not do so from the event posting code, but only when receiving it.

Note: in case of "all" and "unique hash" the serialized data has a hard-coded limit of 2^24 - 1 bytes, so data of 16 MB (2^24 bytes) or larger cannot be sent.
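
The first note can be illustrated with a cache invalidation sketch, in which the publishing code never touches local state and all work happens in the subscribed handler. The helper names, the "cache" and "invalidate" names, and the shape of the data table are hypothetical:

```lua
-- Hypothetical cache-invalidation helpers. `ev` is the events object
-- returned by new(); `cache` is a plain Lua table owned by this worker.

local function watch_invalidations(ev, cache)
    -- The handler is the only place the cache is modified, so the
    -- publishing worker and all other workers follow the same path.
    return ev:subscribe("cache", "invalidate", function(data)
        cache[data.key] = nil
    end)
end

local function invalidate(ev, key)
    -- Do not touch the local cache here: with target "all" this
    -- worker receives the event as well.
    local ok, err = ev:publish("all", "cache", "invalidate", { key = key })
    if not ok then
        return nil, "failed to publish invalidation: " .. tostring(err)
    end
    return true
end
```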

subscribe

syntax: id = ev:subscribe(source, event, callback)

context: all phases except init_by_lua*

Registers a callback function to receive events. If source and event are both *, the callback is executed on every event. If source is provided and event is *, only events with a matching source are passed. If an event name is given, the callback is invoked only when both source and event match.

The callback should have the following signature:

syntax: callback = function(data, event, source, wid)

The parameters will be the same as the ones provided to publish, except for the extra value wid, which will be the worker id of the originating worker process, or nil if it was a local event only. Any return value from callback will be discarded. Note: data may be a reference type (e.g. a Lua table). The same value is passed to all callbacks, so do not change the value in your handler unless you know what you are doing!

The return value of subscribe will be a callback id, or it will throw an error if callback is not a function value.
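
Because the same data value is passed to every callback, a handler that needs to modify it can work on a copy instead. A minimal sketch, assuming a shallow copy is enough; make_handler, on_update, and the received_from field are hypothetical names:

```lua
-- Hypothetical handler factory; `on_update` is a caller-supplied function.
local function shallow_copy(t)
    local copy = {}
    for k, v in pairs(t) do
        copy[k] = v
    end
    return copy
end

local function make_handler(on_update)
    return function(data, event, source, wid)
        local own = data
        if type(data) == "table" then
            own = shallow_copy(data)   -- nested tables are still shared
            own.received_from = wid    -- nil for local-only events
        end
        on_update(own, event, source)
    end
end

-- Usage: local id = ev:subscribe("source", "event", make_handler(my_fn))
```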

unsubscribe

syntax: ev:unsubscribe(id)

context: all phases except init_by_lua*

Will unregister the callback function and prevent it from receiving further events. The parameter id is the return value of subscribe.
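
As an illustration, subscribe and unsubscribe can be combined into a one-shot subscription. This is a sketch, not part of the library: subscribe_once is a hypothetical helper, and it assumes that calling unsubscribe from inside a running callback is safe, which the text above does not state.

```lua
-- Hypothetical helper: invoke `callback` for the first matching event
-- only, then unregister. `ev` is the events object returned by new().
local function subscribe_once(ev, source, event, callback)
    local id
    id = ev:subscribe(source, event, function(data, evt, src, wid)
        -- Assumption: unsubscribing from within a callback is allowed.
        ev:unsubscribe(id)
        callback(data, evt, src, wid)
    end)
    return id
end
```

The returned id can still be passed to unsubscribe to cancel the subscription before any event arrives.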

License

Copyright 2022-2024 Kong Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
