Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bucket Notifications - PR notes, take 2 #8593

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions src/api/common_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -1428,15 +1428,21 @@ module.exports = {
},
bucket_notification: {
type: 'object',
required: ['Id', 'Connect'],
required: ['Id', 'Topic'],
properties: {
Id: {
type: 'string'
type: 'array',
items: {
type: 'string',
},
},
Connect: {
type: 'string'
Topic: {
type: 'array',
items: {
type: 'string',
},
},
Events: {
Event: {
type: 'array',
items: {
type: 'string',
Expand Down
6 changes: 4 additions & 2 deletions src/cmd/manage_nsfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ async function update_bucket(data, user_input) {

if (user_input.notifications) {
//notifications are tested before they can be updated
const test_notif_err = await notifications_util.test_notifications(data);
const test_notif_err = await notifications_util.test_notifications(data, config_fs.connect_dir_path);
if (test_notif_err) {
throw_cli_error(ManageCLIError.InvalidArgument, "Failed to update notifications", test_notif_err);
}
Expand Down Expand Up @@ -719,7 +719,9 @@ async function logging_management() {
}

async function notification_management() {
new notifications_util.Notificator({fs_context: config_fs.fs_context}).process_notification_files();
new notifications_util.Notificator({
fs_context: config_fs.fs_context,
connect_files_dir: config_fs.connect_dir_path}).process_notification_files();
}

exports.main = main;
Expand Down
10 changes: 0 additions & 10 deletions src/endpoint/s3/ops/s3_put_bucket_notification.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,6 @@ async function put_bucket_notification(req) {
if (!topic_configuration ||
typeof topic_configuration !== 'object') throw new S3Error(S3Error.MalformedXML);


//align request aws s3api sends
for (const notif of topic_configuration) {
if (Array.isArray(notif.Id)) notif.Id = notif.Id[0];
notif.Connect = Array.isArray(notif.Topic) ? notif.Topic[0] : notif.Topic;
notif.Events = notif.Event;
delete notif.Event;
delete notif.Topic;
}

const reply = await req.object_sdk.put_bucket_notification({
bucket_name: req.params.bucket,
notifications: topic_configuration
Expand Down
3 changes: 3 additions & 0 deletions src/sdk/config_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const CONFIG_SUBDIRS = Object.freeze({
ACCOUNTS_BY_NAME: 'accounts_by_name',
ACCOUNTS: 'accounts', // deprecated on 5.18
USERS: 'users',
CONNECT: 'connect',
});

const CONFIG_TYPES = Object.freeze({
Expand Down Expand Up @@ -86,6 +87,7 @@ class ConfigFS {
this.identities_dir_path = path.join(config_root, CONFIG_SUBDIRS.IDENTITIES);
this.access_keys_dir_path = path.join(config_root, CONFIG_SUBDIRS.ACCESS_KEYS);
this.buckets_dir_path = path.join(config_root, CONFIG_SUBDIRS.BUCKETS);
this.connect_dir_path = path.join(config_root, CONFIG_SUBDIRS.CONNECT);
this.system_json_path = path.join(config_root, 'system.json');
this.config_json_path = path.join(config_root, 'config.json');
this.fs_context = fs_context || native_fs_utils.get_process_fs_context(this.config_root_backend);
Expand Down Expand Up @@ -160,6 +162,7 @@ class ConfigFS {
this.accounts_by_name_dir_path,
this.identities_dir_path,
this.access_keys_dir_path,
this.connect_dir_path,
];

if (config.NSFS_GLACIER_LOGS_ENABLED) {
Expand Down
42 changes: 18 additions & 24 deletions src/util/notifications_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const os = require('os');
const fs = require('fs');
const http = require('http');
const https = require('https');
const path = require('path');
const { get_process_fs_context } = require('./native_fs_utils');
const nb_native = require('../util/nb_native');
const http_utils = require('../util/http_utils');
Expand All @@ -25,18 +26,21 @@ const OP_TO_EVENT = Object.freeze({
lifecycle_delete: { name: 'LifecycleExpiration' },
});

const DEFAULT_CONNECT_FILES_DIR = '/etc/notif_connect/';

class Notificator {

/**
*
* @param {Object} options
*/

constructor({name, fs_context}) {
constructor({name, fs_context, connect_files_dir}) {
this.name = name;
this.connect_str_to_connection = new Map();
this.notif_to_connect = new Map();
this.fs_context = fs_context ?? get_process_fs_context();
this.connect_files_dir = connect_files_dir ?? DEFAULT_CONNECT_FILES_DIR;
}

async run_batch() {
Expand Down Expand Up @@ -113,7 +117,7 @@ class Notificator {
dbg.log2("notifying with notification =", notif);
let connect = this.notif_to_connect.get(notif.meta.name);
if (!connect) {
connect = parse_connect_file(notif.meta.connect);
connect = this.parse_connect_file(notif.meta.connect);
this.notif_to_connect.set(notif.meta.name, connect);
}
let connection = this.connect_str_to_connection.get(notif.meta.name);
Expand All @@ -140,6 +144,13 @@ class Notificator {
//we can remove the currently processed persistent file
return true;
}

parse_connect_file(connect_filename) {
const connect_str = fs.readFileSync(path.join(this.connect_files_dir, connect_filename), 'utf-8');
const connect = JSON.parse(connect_str);
load_files(connect);
return connect;
}
}

class HttpNotificator {
Expand Down Expand Up @@ -259,24 +270,6 @@ function load_files(object) {
dbg.log2('load_files for obj =', object);
}

function parse_connect_file(connect_filepath) {
const connect = {};
const connect_strs = fs.readFileSync(connect_filepath, 'utf-8').split(os.EOL);
for (const connect_str of connect_strs) {
if (connect_str === '') continue;
const kv = connect_str.split('=');
//parse JSONs-
if (kv[0].endsWith('object')) {
kv[1] = JSON.parse(kv[1]);
}
connect[kv[0]] = kv[1];
}
//parse file contents (useful for tls cert files)
load_files(connect);
return connect;
}


function get_connection(connect) {
switch (connect.notification_protocol.toLowerCase()) {
case 'http':
Expand All @@ -295,9 +288,10 @@ function get_connection(connect) {
}


async function test_notifications(bucket) {
async function test_notifications(bucket, connect_files_dir) {
const notificator = new Notificator({connect_files_dir});
for (const notif of bucket.notifications) {
const connect = parse_connect_file(notif.connect);
const connect = notificator.parse_connect_file(notif.connect);
dbg.log1("testing notif", notif);
try {
const connection = get_connection(connect);
Expand Down Expand Up @@ -408,8 +402,8 @@ function compose_notification_lifecycle(deleted_obj, notif_conf, bucket) {
function compose_meta(record, notif_conf) {
return {
meta: {
connect: notif_conf.Connect,
name: notif_conf.Id
connect: notif_conf.Topic[0],
name: notif_conf.Id[0],
},
notif: {
Records: [record],
Expand Down
Loading