Skip to content

Commit

Permalink
Bucket Notifications - PR notes, take 2
Browse files Browse the repository at this point in the history
1. Switch connect file format kv -> json
2. Use only filename to specify connect file.
3. Use AWS s3api schema to store notifications conf.

Signed-off-by: Amit Prinz Setter <[email protected]>
  • Loading branch information
alphaprinz committed Dec 12, 2024
1 parent cf0daca commit c4f270c
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 41 deletions.
16 changes: 11 additions & 5 deletions src/api/common_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -1428,15 +1428,21 @@ module.exports = {
},
bucket_notification: {
type: 'object',
required: ['Id', 'Connect'],
required: ['Id', 'Topic'],
properties: {
Id: {
type: 'string'
type: 'array',
items: {
type: 'string',
},
},
Connect: {
type: 'string'
Topic: {
type: 'array',
items: {
type: 'string',
},
},
Events: {
Event: {
type: 'array',
items: {
type: 'string',
Expand Down
6 changes: 4 additions & 2 deletions src/cmd/manage_nsfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ async function update_bucket(data, user_input) {

if (user_input.notifications) {
//notifications are tested before they can be updated
const test_notif_err = await notifications_util.test_notifications(data);
const test_notif_err = await notifications_util.test_notifications(data, config_fs.connect_dir_path);
if (test_notif_err) {
throw_cli_error(ManageCLIError.InvalidArgument, "Failed to update notifications", test_notif_err);
}
Expand Down Expand Up @@ -719,7 +719,9 @@ async function logging_management() {
}

async function notification_management() {
new notifications_util.Notificator({fs_context: config_fs.fs_context}).process_notification_files();
new notifications_util.Notificator({
fs_context: config_fs.fs_context,
connect_files_dir: config_fs.connect_dir_path}).process_notification_files();
}

exports.main = main;
Expand Down
10 changes: 0 additions & 10 deletions src/endpoint/s3/ops/s3_put_bucket_notification.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,6 @@ async function put_bucket_notification(req) {
if (!topic_configuration ||
typeof topic_configuration !== 'object') throw new S3Error(S3Error.MalformedXML);


//align request aws s3api sends
for (const notif of topic_configuration) {
if (Array.isArray(notif.Id)) notif.Id = notif.Id[0];
notif.Connect = Array.isArray(notif.Topic) ? notif.Topic[0] : notif.Topic;
notif.Events = notif.Event;
delete notif.Event;
delete notif.Topic;
}

const reply = await req.object_sdk.put_bucket_notification({
bucket_name: req.params.bucket,
notifications: topic_configuration
Expand Down
3 changes: 3 additions & 0 deletions src/sdk/config_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const CONFIG_SUBDIRS = Object.freeze({
ACCOUNTS_BY_NAME: 'accounts_by_name',
ACCOUNTS: 'accounts', // deprecated on 5.18
USERS: 'users',
CONNECT: 'connect',
});

const CONFIG_TYPES = Object.freeze({
Expand Down Expand Up @@ -86,6 +87,7 @@ class ConfigFS {
this.identities_dir_path = path.join(config_root, CONFIG_SUBDIRS.IDENTITIES);
this.access_keys_dir_path = path.join(config_root, CONFIG_SUBDIRS.ACCESS_KEYS);
this.buckets_dir_path = path.join(config_root, CONFIG_SUBDIRS.BUCKETS);
this.connect_dir_path = path.join(config_root, CONFIG_SUBDIRS.CONNECT);
this.system_json_path = path.join(config_root, 'system.json');
this.config_json_path = path.join(config_root, 'config.json');
this.fs_context = fs_context || native_fs_utils.get_process_fs_context(this.config_root_backend);
Expand Down Expand Up @@ -160,6 +162,7 @@ class ConfigFS {
this.accounts_by_name_dir_path,
this.identities_dir_path,
this.access_keys_dir_path,
this.connect_dir_path,
];

if (config.NSFS_GLACIER_LOGS_ENABLED) {
Expand Down
42 changes: 18 additions & 24 deletions src/util/notifications_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const os = require('os');
const fs = require('fs');
const http = require('http');
const https = require('https');
const path = require('path');
const { get_process_fs_context } = require('./native_fs_utils');
const nb_native = require('../util/nb_native');
const http_utils = require('../util/http_utils');
Expand All @@ -25,18 +26,21 @@ const OP_TO_EVENT = Object.freeze({
lifecycle_delete: { name: 'LifecycleExpiration' },
});

const DEFAULT_CONNECT_FILES_DIR = '/etc/notif_connect/';

class Notificator {

/**
*
* @param {Object} options
*/

constructor({name, fs_context}) {
constructor({name, fs_context, connect_files_dir}) {
this.name = name;
this.connect_str_to_connection = new Map();
this.notif_to_connect = new Map();
this.fs_context = fs_context ?? get_process_fs_context();
this.connect_files_dir = connect_files_dir ?? DEFAULT_CONNECT_FILES_DIR;
}

async run_batch() {
Expand Down Expand Up @@ -113,7 +117,7 @@ class Notificator {
dbg.log2("notifying with notification =", notif);
let connect = this.notif_to_connect.get(notif.meta.name);
if (!connect) {
connect = parse_connect_file(notif.meta.connect);
connect = this.parse_connect_file(notif.meta.connect);
this.notif_to_connect.set(notif.meta.name, connect);
}
let connection = this.connect_str_to_connection.get(notif.meta.name);
Expand All @@ -140,6 +144,13 @@ class Notificator {
//we can remove the currently processed persistent file
return true;
}

parse_connect_file(connect_filename) {
const connect_str = fs.readFileSync(path.join(this.connect_files_dir, connect_filename), 'utf-8');
const connect = JSON.parse(connect_str);
load_files(connect);
return connect;
}
}

class HttpNotificator {
Expand Down Expand Up @@ -259,24 +270,6 @@ function load_files(object) {
dbg.log2('load_files for obj =', object);
}

function parse_connect_file(connect_filepath) {
const connect = {};
const connect_strs = fs.readFileSync(connect_filepath, 'utf-8').split(os.EOL);
for (const connect_str of connect_strs) {
if (connect_str === '') continue;
const kv = connect_str.split('=');
//parse JSONs-
if (kv[0].endsWith('object')) {
kv[1] = JSON.parse(kv[1]);
}
connect[kv[0]] = kv[1];
}
//parse file contents (useful for tls cert files)
load_files(connect);
return connect;
}


function get_connection(connect) {
switch (connect.notification_protocol.toLowerCase()) {
case 'http':
Expand All @@ -295,9 +288,10 @@ function get_connection(connect) {
}


async function test_notifications(bucket) {
async function test_notifications(bucket, connect_files_dir) {
let notificator = new Notificator({connect_files_dir});
for (const notif of bucket.notifications) {
const connect = parse_connect_file(notif.connect);
const connect = notificator.parse_connect_file(notif.connect);
dbg.log1("testing notif", notif);
try {
const connection = get_connection(connect);
Expand Down Expand Up @@ -408,8 +402,8 @@ function compose_notification_lifecycle(deleted_obj, notif_conf, bucket) {
function compose_meta(record, notif_conf) {
return {
meta: {
connect: notif_conf.Connect,
name: notif_conf.Id
connect: notif_conf.Topic[0],
name: notif_conf.Id[0],
},
notif: {
Records: [record],
Expand Down

0 comments on commit c4f270c

Please sign in to comment.