Skip to content

Commit

Permalink
Various Improvements To mtev_net_heartbeat (#916)
Browse files Browse the repository at this point in the history
* Detect if messages fail to encrypt or decrypt and bail rather than trying to use them
* Add error and debug outlets for mtev_net_heartbeat
* Log IP address of sending node in mtev_net_heartbeat when receiving messages
  • Loading branch information
pamaddox authored Jan 8, 2024
1 parent 5274254 commit 457130f
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 31 deletions.
3 changes: 3 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

## 2.5

* Fix issue in mtev_net_heartbeat where there would be attempts to use invalid messages
* Improve logging in mtev_net_heartbeat

### 2.5.3

* Fix issue where the URL string on incoming HTTP2 requests was not being escaped properly.
Expand Down
157 changes: 126 additions & 31 deletions src/mtev_net_heartbeat.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
#include <unistd.h>
#include <errno.h>
#include <openssl/evp.h>
#include <openssl/err.h>

static mtev_log_stream_t nlerr = NULL;
static mtev_log_stream_t nldeb = NULL;

struct tgt {
enum { TGT_DIRECT, TGT_BROADCAST, TGT_MULTICAST } type;
Expand Down Expand Up @@ -67,6 +71,44 @@ struct mtev_net_heartbeat_context {

static mtev_net_heartbeat_ctx *global;

static int log_ssl_decrypt_error(const char *s, size_t len, void *p) {
(void)len;
char *addr_str = (char *)p;

mtevL(nlerr, "netheartbeat: received decrypt SSL error on message from IP %s: %s",
addr_str ? addr_str : "[unknown]", s);

return 0;
}
static int log_ssl_encrypt_error(const char *s, size_t len, void *p) {
(void) len;
(void) p;
mtevL(nlerr, "netheartbeat: received encrypt SSL error: %s\n", s);
return 0;
}
static inline void get_ip_addr_from_sockaddr(struct sockaddr *peer_addr,
char *buf,
size_t buflen) {
switch (peer_addr->sa_family) {
case AF_INET: {
struct sockaddr_in *addr = (struct sockaddr_in *)peer_addr;
inet_ntop(AF_INET, &(addr->sin_addr), buf, buflen);
break;
}
case AF_INET6: {
struct sockaddr_in6 *addr = (struct sockaddr_in6 *)peer_addr;
inet_ntop(AF_INET6, &(addr->sin6_addr), buf, buflen);
break;
}
default: {
memset(buf, 0, buflen);
strncpy(buf, "unknown", buflen-1);
break;
}
};
}


/* net_int, 16 byte IV, 16 byte magic */
#define HDR_LENSIZE 4
#define HDR_IVSIZE 16
Expand All @@ -90,7 +132,7 @@ mtev_net_heartbeat_handler(eventer_t e, int mask, void *closure, struct timeval
void *text = text_buff;

struct iovec iov[3];
struct msghdr msg = { .msg_iov = iov };;
struct msghdr msg = { .msg_iov = iov };

msg.msg_iov = iov;
while(true) {
Expand All @@ -107,9 +149,13 @@ mtev_net_heartbeat_handler(eventer_t e, int mask, void *closure, struct timeval
len = recvmsg(fd, &msg, MSG_PEEK);
if(len == -1 && errno == EAGAIN) break;
if(len < 0) {
mtevL(mtev_error, "recvmsg error: %s\n", strerror(errno));
mtevL(nlerr, "netheartbeat: recvmsg error: %s\n", strerror(errno));
break;
}
struct sockaddr_storage peer_addr;
socklen_t peer_addr_len = sizeof(peer_addr);
recvfrom(fd, NULL, 0, MSG_PEEK, (struct sockaddr *) &peer_addr, &peer_addr_len);

if(len < (HDR_LENSIZE + HDR_IVSIZE)) {
/* discard */
(void) recvmsg(fd, &msg, 0);
Expand All @@ -119,13 +165,17 @@ mtev_net_heartbeat_handler(eventer_t e, int mask, void *closure, struct timeval

/* Nasty crap to grow buffers if needed */
if(len > payload_len) {
mtevL(nldeb, "netheartbeat: growing buffer in %s: old length %d, new length %d\n", __func__,
payload_len, len);
void *newpayload, *newtext;
newpayload = malloc(len - HDR_IVSIZE);
newtext = malloc(len - HDR_IVSIZE);
if(!newpayload || !newtext) {
free(newpayload);
free(newtext);
mtevL(mtev_error, "recvmsg error: payload too large %d\n", len);
char addr_str[INET6_ADDRSTRLEN];
get_ip_addr_from_sockaddr((struct sockaddr *)&peer_addr, addr_str, sizeof(addr_str));
mtevL(nlerr, "netheartbeat: recvmsg error from %s: payload too large %d\n", addr_str, len);
(void) recvmsg(fd, &msg, 0);
continue;
}
Expand All @@ -143,7 +193,9 @@ mtev_net_heartbeat_handler(eventer_t e, int mask, void *closure, struct timeval
msg.msg_iov[2].iov_len;
len = recvmsg(fd, &msg, 0);
if(len != expected) {
mtevL(mtev_error, "netheartbeat: bad read %d != %d\n", len, expected);
char addr_str[INET6_ADDRSTRLEN];
get_ip_addr_from_sockaddr((struct sockaddr *)&peer_addr, addr_str, sizeof(addr_str));
mtevL(nlerr, "netheartbeat: bad read from %s: %d != %d\n", addr_str, len, expected);
continue;
}

Expand All @@ -152,15 +204,26 @@ mtev_net_heartbeat_handler(eventer_t e, int mask, void *closure, struct timeval
evp_ctx = EVP_CIPHER_CTX_new();
EVP_CipherInit(evp_ctx, EVP_aes_256_cbc(), ctx->key, ivec, false);
mtevAssert(EVP_CIPHER_CTX_iv_length(evp_ctx) == HDR_IVSIZE);
EVP_DecryptUpdate(evp_ctx,text,&outlen1,
(unsigned char *)payload,len);
EVP_DecryptFinal(evp_ctx,text+outlen1,&outlen2);
if ((!EVP_DecryptUpdate(evp_ctx,text,&outlen1,
(unsigned char *)payload,len)) ||
(!EVP_DecryptFinal(evp_ctx,text+outlen1,&outlen2))) {
char addr_str[INET6_ADDRSTRLEN];
get_ip_addr_from_sockaddr((struct sockaddr *)&peer_addr, addr_str, sizeof(addr_str));
ERR_print_errors_cb(log_ssl_decrypt_error, &addr_str);
EVP_CIPHER_CTX_free(evp_ctx);
continue;
}
EVP_CIPHER_CTX_free(evp_ctx);
len = outlen1+outlen2;

hdr = text;
if(hdr[2] != htonl(HBPKTMAGIC1) || hdr[3] != htonl(HBPKTMAGIC2)) {
mtevL(mtev_error, "netheartbeat: malformed packet\n");
char addr_str[INET6_ADDRSTRLEN];
get_ip_addr_from_sockaddr((struct sockaddr *)&peer_addr, addr_str, sizeof(addr_str));
mtevL(nlerr, "netheartbeat: malformed packet received from %s: expected %04x and %04x, got %04x "
"and %04x - len %d\n", addr_str, htonl(HBPKTMAGIC1), htonl(HBPKTMAGIC2), hdr[2], hdr[3],
len);
continue;
}
if(ctx->process_input) {
ctx->process_input(text + HDR_MAGICSIZE, len - HDR_MAGICSIZE,
Expand All @@ -180,18 +243,44 @@ mtev_net_headerbeat_sendall(mtev_net_heartbeat_ctx *ctx, void *payload, int payl
int fd = -1;
struct tgt *tgt = &ctx->targets[i];
switch(tgt->type) {
case TGT_BROADCAST: fd = ctx->sender_v4_bcast; break;
case TGT_MULTICAST: fd = eventer_get_fd(tgt->e); break;
case TGT_BROADCAST:
fd = ctx->sender_v4_bcast;
mtevL(nldeb, "netheartbeat: sending %d byte payload (type TGT_BROADCAST) to fd %d\n", payload_len, fd);
break;
case TGT_MULTICAST:
fd = eventer_get_fd(tgt->e);
mtevL(nldeb, "netheartbeat: sending %d byte payload (type TGT_MULTICAST) to fd %d\n", payload_len, fd);
break;
case TGT_DIRECT:
if(tgt->addr->sa_family == AF_INET) fd = ctx->sender_v4;
else if(tgt->addr->sa_family == AF_INET6) fd = ctx->sender_v6;
if(tgt->addr->sa_family == AF_INET) {
fd = ctx->sender_v4;
if (N_L_S_ON(nldeb)) {
char addr_buf[INET_ADDRSTRLEN];
struct sockaddr_in *addr_in = (struct sockaddr_in *)tgt->addr;
inet_ntop(AF_INET, &(addr_in->sin_addr), addr_buf, INET_ADDRSTRLEN);
mtevL(nldeb, "netheartbeat: sending %d byte payload (type TGT_DIRECT, ipv4) to fd %d (addr %s)\n", payload_len, fd, addr_buf);
}
}
else if (tgt->addr->sa_family == AF_INET6) {
fd = ctx->sender_v6;
if (N_L_S_ON(nldeb)) {
char addr_buf[INET6_ADDRSTRLEN];
struct sockaddr_in6 *addr_in = (struct sockaddr_in6 *)tgt->addr;
inet_ntop(AF_INET6, &(addr_in->sin6_addr), addr_buf, INET6_ADDRSTRLEN);
mtevL(nldeb, "netheartbeat: sending %d byte payload (type TGT_DIRECT, ipv6) to fd %d (addr %s)\n", payload_len, fd, addr_buf);
}
}
break;
}
if(fd >= 0) {
if (sendto(fd, payload, payload_len, 0,
tgt->addr, tgt->len) != payload_len) {
int sent = sendto(fd, payload, payload_len, 0,
tgt->addr, tgt->len);
if (sent != payload_len) {
rv = -1;
mtevL(mtev_error, "Bad send on mtev_net_heartbeat != %d", payload_len);
mtevL(nlerr, "netheartbeat: Bad send on mtev_net_heartbeat: sent bytes (%d) != payload_len (%d)\n", sent, payload_len);
}
else {
mtevL(nldeb, "netheartbeat: successfully sent %d byte payload to fd %d\n", payload_len, fd);
}
}
}
Expand Down Expand Up @@ -245,17 +334,20 @@ mtev_net_heartbeat_serialize_and_send(mtev_net_heartbeat_ctx *ctx) {
if(cipher_buf != cipher_buf_static) free(cipher_buf);
cipher_buf = malloc(len + blocksize * 2);
if(!cipher_buf) {
mtevL(mtev_error, "netheartbeat: malloc(%d) failure\n", len + blocksize * 2);
mtevL(nlerr, "netheartbeat: malloc(%d) failure\n", len + blocksize * 2);
goto bail;
}
}

memcpy(cipher_buf, payload, HDRLEN);
text = (unsigned char *)payload + HDR_LENSIZE + HDR_IVSIZE;
text_len = len - (HDR_LENSIZE + HDR_IVSIZE);
EVP_EncryptUpdate(evp_ctx,cipher_buf+HDR_LENSIZE+HDR_IVSIZE,&outlen1,
text,text_len);
EVP_EncryptFinal(evp_ctx,cipher_buf+HDR_LENSIZE+HDR_IVSIZE+outlen1,&outlen2);
if ((!EVP_EncryptUpdate(evp_ctx,cipher_buf+HDR_LENSIZE+HDR_IVSIZE,&outlen1,
text,text_len)) ||
(!EVP_EncryptFinal(evp_ctx,cipher_buf+HDR_LENSIZE+HDR_IVSIZE+outlen1,
&outlen2))) {
ERR_print_errors_cb(log_ssl_encrypt_error, NULL);
}
EVP_CIPHER_CTX_free(evp_ctx);
len = HDR_LENSIZE + HDR_IVSIZE + outlen1 + outlen2;

Expand Down Expand Up @@ -486,11 +578,11 @@ mtev_net_heartbeat_from_conf(const char *basepath) {
section = mtev_conf_get_section_read(MTEV_CONF_ROOT, basepath);
if(mtev_conf_section_is_empty(section)) goto out;
if(!mtev_conf_get_string(section, "self::node()/@key", &keyhex)) {
mtevL(mtev_error, "netheartbeat section found, but no key attribute!\n");
mtevL(nlerr, "netheartbeat section found, but no key attribute!\n");
goto out;
}
if(strlen(keyhex) != 64) {
mtevL(mtev_error, "netheartbeat key must be 32 bytes (64 hex)!\n");
mtevL(nlerr, "netheartbeat key must be 32 bytes (64 hex)!\n");
free(keyhex);
goto out;
}
Expand All @@ -500,7 +592,7 @@ mtev_net_heartbeat_from_conf(const char *basepath) {
else if(keyhex[i] >= 'a' && keyhex[i] <= 'f') v = keyhex[i] - 'a' + 10;
else if(keyhex[i] >= 'A' && keyhex[i] <= 'F') v = keyhex[i] - 'A' + 10;
else {
mtevL(mtev_error, "netheartbeat key must be hexidecimal!\n");
mtevL(nlerr, "netheartbeat key must be hexidecimal!\n");
free(keyhex);
goto out;
}
Expand All @@ -510,7 +602,7 @@ mtev_net_heartbeat_from_conf(const char *basepath) {

(void)mtev_conf_get_int32(section, "self::node()/@port", &port);
if(port == 0) {
mtevL(mtev_error, "netheartbeat section found, but no port attribute!\n");
mtevL(nlerr, "netheartbeat section found, but no port attribute!\n");
goto out;
}
(void)mtev_conf_get_int32(section, "self::node()/@period", &period);
Expand All @@ -536,11 +628,11 @@ mtev_net_heartbeat_from_conf(const char *basepath) {
(void)mtev_conf_get_int32(notes[i], "self::node()/@ttl", &ttl);
(void)mtev_conf_get_int32(notes[i], "self::node()/@port", &port);
if(port <= 0 || port > 0xffff) {
mtevL(mtev_error, "netheartbeat bad port %d in notify\n", port);
mtevL(nlerr, "netheartbeat bad port %d in notify\n", port);
continue;
}
if(!mtev_conf_get_stringbuf(notes[i], "self::node()/@address", addr_str, sizeof(addr_str))) {
mtevL(mtev_error, "netheartbeat no address in notify\n");
mtevL(nlerr, "netheartbeat no address in notify\n");
continue;
}
if(!mtev_conf_get_stringbuf(notes[i], "self::node()/@type", type_str, sizeof(type_str))) {
Expand All @@ -554,7 +646,7 @@ mtev_net_heartbeat_from_conf(const char *basepath) {
rv = inet_pton(family, addr_str, &a);
}
if(rv != 1) {
mtevL(mtev_error, "netheartbeat bad address: %s\n", addr_str);
mtevL(nlerr, "netheartbeat bad address: %s\n", addr_str);
continue;
}
if(family == AF_INET) {
Expand All @@ -572,26 +664,26 @@ mtev_net_heartbeat_from_conf(const char *basepath) {
in_len = sizeof(in6);
}
else {
mtevL(mtev_error, "netheartbeat bad address family: %d\n", family);
mtevL(nlerr, "netheartbeat bad address family: %d\n", family);
continue;
}
if(!strcmp(type_str, "direct")) {
if(mtev_net_heartbeat_add_target(ctx, in, in_len)) {
mtevL(mtev_error, "netheartbeat error adding: %s:%d\n", addr_str, port);
mtevL(nlerr, "netheartbeat error adding: %s:%d\n", addr_str, port);
}
}
else if(!strcmp(type_str, "broadcast")) {
if(mtev_net_heartbeat_add_broadcast(ctx, in, in_len)) {
mtevL(mtev_error, "netheartbeat error adding: %s:%d\n", addr_str, port);
mtevL(nlerr, "netheartbeat error adding: %s:%d\n", addr_str, port);
}
}
else if(!strcmp(type_str, "multicast")) {
if(mtev_net_heartbeat_add_multicast(ctx, in, in_len, ttl)) {
mtevL(mtev_error, "netheartbeat error adding: %s:%d\n", addr_str, port);
mtevL(nlerr, "netheartbeat error adding: %s:%d\n", addr_str, port);
}
}
else {
mtevL(mtev_error, "netbroadcast notify type unknown: %s\n", type_str);
mtevL(nlerr, "netbroadcast notify type unknown: %s\n", type_str);
}
}
mtev_conf_release_sections_read(notes, cnt);
Expand All @@ -606,7 +698,10 @@ void
mtev_net_heartbeat_init(void) {
static int inited = 0;
if(inited) return;

inited = 1;
nlerr = mtev_log_stream_find("error/netheartbeat");
nldeb = mtev_log_stream_find("debug/netheartbeat");
eventer_name_callback_ext("mtev_net_heartbeat_pulse",
mtev_net_heartbeat_pulse,
mtev_net_heartbeat_pulse_namer, NULL);
Expand Down

0 comments on commit 457130f

Please sign in to comment.