From dd8a94bef2f433560eeac7981fa240fe116377da Mon Sep 17 00:00:00 2001 From: ekoby <7406535+ekoby@users.noreply.github.com> Date: Mon, 4 Dec 2023 20:35:03 -0500 Subject: [PATCH] Fix channel reconnect (#581) * reset connection on failed connect * wait for enroller to finish * update tlsuv@v0.27.2 * set msg seq once * clear up TLS connection on any error --- deps/CMakeLists.txt | 2 +- inc_internal/message.h | 2 +- library/bind.c | 3 ++- library/channel.c | 15 +++++++++++---- library/message.c | 11 +++++++---- library/ziti_ctrl.c | 6 +++--- programs/sample-host/sample-host.c | 4 ++-- programs/sample_http_link/sample_http_link.c | 2 +- programs/sample_wttr/sample_wttr.c | 2 +- tests/integ/bootstrap.exp | 2 ++ tests/message_tests.cpp | 18 ++++++++++++------ tests/ziti_src_tests.cpp | 2 +- 12 files changed, 44 insertions(+), 25 deletions(-) diff --git a/deps/CMakeLists.txt b/deps/CMakeLists.txt index 06907752..f312064f 100644 --- a/deps/CMakeLists.txt +++ b/deps/CMakeLists.txt @@ -8,7 +8,7 @@ else () FetchContent_Declare(tlsuv GIT_REPOSITORY https://github.com/openziti/tlsuv.git - GIT_TAG v0.27.0 + GIT_TAG v0.27.2 ) FetchContent_MakeAvailable(tlsuv) diff --git a/inc_internal/message.h b/inc_internal/message.h index 00c3de5c..ccb83b8d 100644 --- a/inc_internal/message.h +++ b/inc_internal/message.h @@ -99,7 +99,7 @@ message *message_new_from_header(pool_t *pool, uint8_t buf[HEADER_SIZE]); message *message_new(pool_t *pool, uint32_t content, const hdr_t *headers, int nheaders, size_t body_len); -void message_set_seq(message *m, uint32_t seq); +void message_set_seq(message *m, uint32_t *seq); #ifdef __cplusplus diff --git a/library/bind.c b/library/bind.c index 457685e3..3d83b995 100644 --- a/library/bind.c +++ b/library/bind.c @@ -487,7 +487,8 @@ void on_unbind(void *ctx, message *m, int code) { .value = (uint8_t *) &conn_id }, }; - ziti_channel_send(b->ch, ContentTypeStateClosed, headers, 1, NULL, 0, NULL); + message *close_msg = message_new(NULL, ContentTypeStateClosed, headers, 1, 0); + ziti_channel_send_message(b->ch, close_msg, NULL); } else { CONN_LOG(TRACE, "failed to receive unbind response because channel was disconnected: %d/%s", code, ziti_errorstr(code)); } diff --git a/library/channel.c b/library/channel.c index cbd9a7de..96f53674 100644 --- a/library/channel.c +++ b/library/channel.c @@ -122,7 +122,7 @@ static int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch, uint32_t ch->ctx = ctx; ch->loop = ctx->loop; ch->id = id; - ch->msg_seq = -1; +// ch->msg_seq = 0; char hostname[MAXHOSTNAMELEN]; size_t hostlen = sizeof(hostname); @@ -363,6 +363,9 @@ void on_channel_send(uv_write_t *w, int status) { int ziti_channel_send_message(ziti_channel_t *ch, message *msg, struct ziti_write_req_s *ziti_write) { uv_buf_t buf = uv_buf_init((char *) msg->msgbufp, msg->msgbuflen); + message_set_seq(msg, &ch->msg_seq); + CH_LOG(TRACE, "=> ct[%04X] seq[%d] len[%d]", msg->header.content, msg->header.seq, msg->header.body_len); + NEWP(req, uv_write_t); if (ziti_write == NULL) { ziti_write = calloc(1, sizeof(struct ziti_write_req_s)); @@ -386,7 +389,7 @@ int ziti_channel_send(ziti_channel_t *ch, uint32_t content, const hdr_t *hdrs, i uint32_t body_len, struct ziti_write_req_s *ziti_write) { message *m = message_new(NULL, content, hdrs, nhdrs, body_len); - message_set_seq(m, ch->msg_seq++); + message_set_seq(m, &ch->msg_seq); CH_LOG(TRACE, "=> ct[%04X] seq[%d] len[%d]", content, m->header.seq, body_len); memcpy(m->body, body, body_len); @@ -406,8 +409,7 @@ ziti_channel_send_for_reply(ziti_channel_t *ch, uint32_t content, const hdr_t *h reply_cb rep_cb, void *reply_ctx) { struct waiter_s *result = NULL; message *m = message_new(NULL, content, hdrs, nhdrs, body_len); - message_set_seq(m, ch->msg_seq++); - CH_LOG(TRACE, "=> ct[%04X] seq[%d] len[%d]", content, m->header.seq, body_len); + message_set_seq(m, &ch->msg_seq); memcpy(m->body, body, body_len); if (rep_cb != NULL) { @@ -838,6 +840,8 @@ static void on_channel_data(uv_stream_t *s, ssize_t len, const uv_buf_t *buf) { CH_LOG(INFO, "channel was closed [%zd/%s]", len, uv_strerror(len)); // propagate close on_channel_close(ch, ZITI_CONNABORT, len); + tlsuv_stream_close(ch->connection, on_tls_close); + ch->connection = NULL; break; default: @@ -881,6 +885,9 @@ static void on_channel_connect_internal(uv_connect_t *req, int status) { free(r); } + tlsuv_stream_close(ch->connection, on_tls_close); + ch->connection = NULL; + if (ch->state != Closed) { ch->state = Disconnected; reconnect_channel(ch, false); diff --git a/library/message.c b/library/message.c index 7c637ce7..a8974906 100644 --- a/library/message.c +++ b/library/message.c @@ -19,7 +19,7 @@ #include "utils.h" #include "endian_internal.h" -static uint8_t *read_int32(const uint8_t *p, uint32_t *val) { +static const uint8_t *read_int32(const uint8_t *p, uint32_t *val) { *val = le32toh(*(uint32_t *) p); return p + sizeof(uint32_t); } @@ -84,7 +84,7 @@ uint8_t *write_hdr(const hdr_t *h, uint8_t *buf) { } int parse_hdrs(uint8_t *buf, uint32_t len, hdr_t **hp) { - uint8_t *p = buf; + const uint8_t *p = buf; hdr_t *headers = NULL; int count = 0; @@ -230,7 +230,10 @@ message *message_new(pool_t *pool, uint32_t content, const hdr_t *hdrs, int nhdr return m; } -void message_set_seq(message *m, uint32_t seq) { - m->header.seq = seq; +void message_set_seq(message *m, uint32_t *seq) { + if (m->header.seq == 0) { + *seq += 1; + m->header.seq = *seq; + } header_to_buffer(&m->header, m->msgbufp); } diff --git a/library/ziti_ctrl.c b/library/ziti_ctrl.c index b591f09e..ea40e5e0 100644 --- a/library/ziti_ctrl.c +++ b/library/ziti_ctrl.c @@ -132,7 +132,7 @@ static void ctrl_paging_req(struct ctrl_resp *resp); static void ctrl_default_cb(void *s, const ziti_error *e, struct ctrl_resp *resp); -static void ctrl_body_cb(tlsuv_http_req_t *req, const char *b, ssize_t len); +static void ctrl_body_cb(tlsuv_http_req_t *req, char *b, ssize_t len); static tlsuv_http_req_t * start_request(tlsuv_http_t *http, const char *method, const char *path, tlsuv_http_resp_cb cb, struct ctrl_resp *resp) { @@ -285,11 +285,11 @@ static void ctrl_service_cb(ziti_service **services, ziti_error *e, struct ctrl_ free(services); } -static void free_body_cb(tlsuv_http_req_t *req, const char *body, ssize_t len) { +static void free_body_cb(tlsuv_http_req_t *req, char *body, ssize_t len) { free((char *) body); } -static void ctrl_body_cb(tlsuv_http_req_t *req, const char *b, ssize_t len) { +static void ctrl_body_cb(tlsuv_http_req_t *req, char *b, ssize_t len) { struct ctrl_resp *resp = req->data; ziti_controller *ctrl = resp->ctrl; diff --git a/programs/sample-host/sample-host.c b/programs/sample-host/sample-host.c index 6036cf0a..684e3f23 100644 --- a/programs/sample-host/sample-host.c +++ b/programs/sample-host/sample-host.c @@ -130,7 +130,7 @@ void on_connect(ziti_connection conn, int status) { static size_t total; -ssize_t on_data(ziti_connection c, uint8_t *buf, ssize_t len) { +ssize_t on_data(ziti_connection c, const uint8_t *buf, ssize_t len) { if (len == ZITI_EOF) { printf("request completed: %s\n", ziti_errorstr(len)); @@ -152,7 +152,7 @@ ssize_t on_data(ziti_connection c, uint8_t *buf, ssize_t len) { static uv_signal_t sig; static void on_signal(uv_signal_t *h, int signal) { ziti_context ztx = h->data; - ziti_dump(ztx, fprintf, stdout); + ziti_dump(ztx, (int (*)(void *, const char *, ...)) fprintf, stdout); } static void on_ziti_init(ziti_context ztx, const ziti_event_t *ev) { diff --git a/programs/sample_http_link/sample_http_link.c b/programs/sample_http_link/sample_http_link.c index 0477a463..77780345 100644 --- a/programs/sample_http_link/sample_http_link.c +++ b/programs/sample_http_link/sample_http_link.c @@ -45,7 +45,7 @@ void resp_cb(tlsuv_http_resp_t *resp, void *data) { printf("\n"); } -void body_cb(tlsuv_http_req_t *req, const char *body, ssize_t len) { +void body_cb(tlsuv_http_req_t *req, char *body, ssize_t len) { if (len == UV_EOF) { printf("\n\n====================\nRequest completed\n"); ziti_shutdown(ziti); diff --git a/programs/sample_wttr/sample_wttr.c b/programs/sample_wttr/sample_wttr.c index a42519ad..a8150f84 100644 --- a/programs/sample_wttr/sample_wttr.c +++ b/programs/sample_wttr/sample_wttr.c @@ -29,7 +29,7 @@ exit(code);\ static size_t total; static ziti_context ziti; -ssize_t on_data(ziti_connection c, uint8_t *buf, ssize_t len) { +ssize_t on_data(ziti_connection c, const uint8_t *buf, ssize_t len) { if (len == ZITI_EOF) { printf("request completed: %s\n", ziti_errorstr(len)); diff --git a/tests/integ/bootstrap.exp b/tests/integ/bootstrap.exp index d5bbfe37..eccdd93f 100644 --- a/tests/integ/bootstrap.exp +++ b/tests/integ/bootstrap.exp @@ -71,12 +71,14 @@ expect { "ziti identity is saved in ./test-server.json" {} eof { error "test-server not enrolled" } } +wait spawn $enroller ./test-client.jwt ./test-client.json expect { "ziti identity is saved in ./test-client.json" {} eof { error "test-client not enrolled" } } +wait diff --git a/tests/message_tests.cpp b/tests/message_tests.cpp index c9f21a06..a97d325c 100644 --- a/tests/message_tests.cpp +++ b/tests/message_tests.cpp @@ -34,12 +34,13 @@ TEST_CASE("simple", "[model]") { }, }; auto content1 = "this is a message"; + uint32_t s1 = 3333; auto m1 = message_new(p, ContentTypeData, headers, 2, strlen(content1)); strncpy(reinterpret_cast(m1->body), content1, strlen(content1)); - message_set_seq(m1, 3333); + message_set_seq(m1, &s1); auto m2 = message_new_from_header(p, m1->msgbufp); - CHECK(m2->header.seq == 3333); + CHECK(m2->header.seq == 3334); CHECK(m2->msgbuflen == m1->msgbuflen); memcpy(m2->msgbufp, m1->msgbufp, m1->msgbuflen); m2->nhdrs = parse_hdrs(m2->headers, m2->header.headers_len, &m2->hdrs); @@ -75,13 +76,16 @@ TEST_CASE("large", "[model]") { .value = (uint8_t *) "bar" }, }; + uint32_t seq = 3333; auto content1 = "this is a very long message, it won't fint into the pooled message structure"; auto m1 = message_new(p, ContentTypeData, headers, 2, strlen(content1)); strncpy(reinterpret_cast(m1->body), content1, strlen(content1)); - message_set_seq(m1, 3333); + message_set_seq(m1, &seq); + auto m2 = message_new_from_header(p, m1->msgbufp); - CHECK(m2->header.seq == 3333); + CHECK(m2->header.seq == 3334); + CHECK(seq == 3334); CHECK(m2->msgbuflen == m1->msgbuflen); memcpy(m2->msgbufp, m1->msgbufp, m1->msgbuflen); m2->nhdrs = parse_hdrs(m2->headers, m2->header.headers_len, &m2->hdrs); @@ -116,13 +120,15 @@ TEST_CASE("large unpooled", "[model]") { .value = (uint8_t *) "bar" }, }; + uint32_t seq = 3333; auto content1 = "this is a very long message, it won't fint into the pooled message structure"; auto m1 = message_new(p, ContentTypeData, headers, 2, strlen(content1)); strncpy(reinterpret_cast(m1->body), content1, strlen(content1)); - message_set_seq(m1, 3333); + message_set_seq(m1, &seq); auto m2 = message_new_from_header(nullptr, m1->msgbufp); - CHECK(m2->header.seq == 3333); + CHECK(m2->header.seq == 3334); + CHECK(seq == 3334); CHECK(m2->msgbuflen == m1->msgbuflen); memcpy(m2->msgbufp, m1->msgbufp, m1->msgbuflen); m2->nhdrs = parse_hdrs(m2->headers, m2->header.headers_len, &m2->hdrs); diff --git a/tests/ziti_src_tests.cpp b/tests/ziti_src_tests.cpp index a9b83e8c..544befe2 100644 --- a/tests/ziti_src_tests.cpp +++ b/tests/ziti_src_tests.cpp @@ -60,7 +60,7 @@ TEST_CASE("httpbin.ziti:ziti_src", "[integ]") { auto t = (source_test*)ctx; t->code = resp->code; - resp->body_cb = [](tlsuv_http_req_t *req, const char *body, ssize_t len){ + resp->body_cb = [](tlsuv_http_req_t *req, char *body, ssize_t len){ auto t = (source_test*)req->data; if (len > 0) t->body.append(body, len);