From 0d0caab0616817c97a15324dfd955f2afb334417 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Mon, 25 Nov 2024 15:55:27 -0500 Subject: [PATCH] Fixes #1700: refactor the AMQP link lifecycle This change removes some of the old link attach routing logic and attempts to clean up the link API. The logic that used to track the exchange of Attach/Detach performatives has been simplified. The various counters and booleans maintained by the qdr_link_t structure for tracking this exchange has been reduced to a mask/flag implementation similar to Protons implementation of endpoint state. This patch refactors the link detach adaptor API to be more like the existing AMQP connection API: there is now an explict API call to release the link instance at the end of its lifecycle. The adaptor API is modified by separating the AMQP detach handling from the release of the link instance. The old qdr_link_detach() adaptor function has been refactored into two functions: qdr_link_detach_received() and qdr_link_close(). The qdr_link_detach_received() call is made by the AMQP adaptor when a Detach Peformative has been received by the peer. It is only used by the AMQP adaptor. The new qdr_link_closed() API call is made by all adaptors when the link instance is destroyed. This is similar to the existing qdr_connection_closed() call but for links. It is used by all adaptors to indicate to the core that the link is no longer in use and can be cleaned up. In the case of the AMQP adaptor this call will be made after the link detach handshake has completed. Test coverage by the test-sender AMQP client has been increased by adding a clean connection close function. --- include/qpid/dispatch/protocol_adaptor.h | 31 ++- src/adaptors/amqp/amqp_adaptor.c | 96 +++++----- src/adaptors/amqp/container.c | 56 +++--- src/adaptors/amqp/container.h | 2 - src/adaptors/amqp/node_type.h | 15 +- src/adaptors/tcp/tcp_adaptor.c | 6 +- src/router_core/connections.c | 178 ++++++++++-------- src/router_core/core_link_endpoint.c | 14 +- src/router_core/core_link_endpoint.h | 4 +- .../address_lookup_client.c | 4 +- .../modules/edge_router/addr_proxy.c | 6 +- .../streaming_link_scrubber.c | 2 +- src/router_core/route_control.c | 2 +- src/router_core/router_core_private.h | 25 ++- src/router_core/transfer.c | 10 +- tests/test-receiver.c | 1 - tests/test-sender.c | 100 ++++++---- 17 files changed, 311 insertions(+), 241 deletions(-) diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h index 8bb80b3c3..648117b22 100644 --- a/include/qpid/dispatch/protocol_adaptor.h +++ b/include/qpid/dispatch/protocol_adaptor.h @@ -653,12 +653,6 @@ void qdr_terminus_set_dnp_address_iterator(qdr_terminus_t *term, qd_iterator_t * ****************************************************************************** */ -typedef enum { - QD_DETACHED, // Protocol detach - QD_CLOSED, // Protocol close - QD_LOST // Connection or session closed -} qd_detach_type_t; - /** * qdr_link_set_context * @@ -810,15 +804,32 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target); /** - * qdr_link_detach + * qdr_link_detach_received * - * This function is invoked when a link detach arrives. + * This function is invoked when a link detach performative arrives from the remote peer. This may the first detach + * (peer-initiated link detach) or in response to a detach sent by the router (second detach). * * @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event. - * @param dt The type of detach that occurred. * @param error The link error from the detach frame or 0 if none. */ -void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error); +void qdr_link_detach_received(qdr_link_t *link, qdr_error_t *error); + + +/** + * qdr_link_closed + * + * This function is invoked by the adaptor when the link has fully closed. This will be the last call made by the + * adaptor for this link. This may be called as a result of a successful detach handshake or due to link loss. This will + * also be called during adaptor shutdown on any outstanding links. + * + * The core may free the qdr_link_t by this call. The adaptor MUST NOT reference the qdr_link_t on return from this + * call. + * + * @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event. + * @param forced True if the link was closed due to failure or shutdown. False if closed by clean detach handshake. + */ +void qdr_link_closed(qdr_link_t *link, bool forced); + /** * qdr_link_deliver diff --git a/src/adaptors/amqp/amqp_adaptor.c b/src/adaptors/amqp/amqp_adaptor.c index 2a78c013d..172e21c2d 100644 --- a/src/adaptors/amqp/amqp_adaptor.c +++ b/src/adaptors/amqp/amqp_adaptor.c @@ -139,24 +139,6 @@ static qdr_delivery_t *qdr_node_delivery_qdr_from_pn(pn_delivery_t *dlv) return ref ? (qdr_delivery_t*) ref->ref : 0; } -// clean up all qdr_delivery/pn_delivery bindings for the link -// -void qd_link_abandoned_deliveries_handler(qd_router_t *router, qd_link_t *link) -{ - qd_link_ref_list_t *list = qd_link_get_ref_list(link); - qd_link_ref_t *ref = DEQ_HEAD(*list); - - while (ref) { - qdr_delivery_t *dlv = (qdr_delivery_t*) ref->ref; - pn_delivery_t *pdlv = qdr_delivery_get_context(dlv); - assert(pdlv && ref == (qd_link_ref_t*) pn_delivery_get_context(pdlv)); - - // this will remove and release the ref - qdr_node_disconnect_deliveries(router->router_core, link, dlv, pdlv); - ref = DEQ_HEAD(*list); - } -} - // read the delivery-state set by the remote endpoint // @@ -1223,10 +1205,9 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link) /** * Link Detached Handler */ -static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_detach_type_t dt) +static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link) { - if (!link) - return 0; + assert(link); pn_link_t *pn_link = qd_link_pn(link); if (!pn_link) @@ -1257,29 +1238,55 @@ static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_det } } - qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link); - pn_condition_t *cond = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0; + // Notify the core that a detach has been received + qdr_link_t *rlink = (qdr_link_t *) qd_link_get_context(link); if (rlink) { - // - // If this is the second (response) detach or the link hasn't really detached but is being dropped due to parent - // connection/session loss then this is the last proton event that will be generated for this link. The qd_link - // will be freed on return from this call so remove the cross linkage between it and the qdr_link peer. - - if (dt == QD_LOST || qdr_link_get_context(rlink) == 0) { - // note qdr_link context will be zeroed when the core sends the first detach, so if it is zero then this is - // the second detach! - qd_link_set_context(link, 0); - qdr_link_set_context(rlink, 0); - } - - qdr_error_t *error = qdr_error_from_pn(cond); - qdr_link_detach(rlink, dt, error); + pn_condition_t *cond = pn_link_remote_condition(pn_link); + qdr_error_t *error = qdr_error_from_pn(cond); + qdr_link_detach_received(rlink, error); } return 0; } + +/** + * Link closed handler + * + * This is the last callback for the given link - the link will be freed on return from this call! Forced is true if the + * link has not properly closed (detach handshake completed). +*/ +static void AMQP_link_closed_handler(qd_router_t *router, qd_link_t *qd_link, bool forced) +{ + assert(qd_link); + + // Clean up all qdr_delivery/pn_delivery bindings for the link. + + qd_link_ref_list_t *list = qd_link_get_ref_list(qd_link); + qd_link_ref_t *ref = DEQ_HEAD(*list); + + while (ref) { + qdr_delivery_t *dlv = (qdr_delivery_t*) ref->ref; + pn_delivery_t *pdlv = qdr_delivery_get_context(dlv); + assert(pdlv && ref == (qd_link_ref_t*) pn_delivery_get_context(pdlv)); + + // This will decrement the qdr_delivery_t reference count - do not access the dlv pointer after this call! + qdr_node_disconnect_deliveries(router->router_core, qd_link, dlv, pdlv); + ref = DEQ_HEAD(*list); + } + + qdr_link_t *qdr_link = (qdr_link_t *) qd_link_get_context(qd_link); + if (qdr_link) { + // Notify core that this link no longer exists + qdr_link_set_context(qdr_link, 0); + qd_link_set_context(qd_link, 0); + qdr_link_closed(qdr_link, forced); + // This will cause the core to free qdr_link at some point so: + qdr_link = 0; + } +} + static void bind_connection_context(qdr_connection_t *qdrc, void* token) { qd_connection_t *conn = (qd_connection_t*) token; @@ -1776,8 +1783,8 @@ static const qd_node_type_t router_node = {"router", 0, AMQP_outgoing_link_handler, AMQP_conn_wake_handler, AMQP_link_detach_handler, + AMQP_link_closed_handler, AMQP_link_attach_handler, - qd_link_abandoned_deliveries_handler, AMQP_link_flow_handler, 0, // node_created_handler 0, // node_destroyed_handler @@ -1920,7 +1927,7 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error return; pn_link_t *pn_link = qd_link_pn(qlink); - if (!pn_link) + if (!pn_link || !!(pn_link_state(pn_link) & PN_LOCAL_CLOSED)) // already detached return; if (error) { @@ -1945,17 +1952,6 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error } } - // - // This is the last event for this link that the core is going to send into Proton so remove the core => adaptor - // linkage. If this is the response attach then there will be no further proton link events to send to the core so - // remove the adaptor => core linkage. If this is the first (request) detach preserve the adaptor => core linkage so - // we can notify the core when the second (response) detach arrives - // - qdr_link_set_context(link, 0); - if (!first) { - qd_link_set_context(qlink, 0); - } - qd_link_close(qlink); } diff --git a/src/adaptors/amqp/container.c b/src/adaptors/amqp/container.c index e18824496..b35ddb705 100644 --- a/src/adaptors/amqp/container.c +++ b/src/adaptors/amqp/container.c @@ -64,6 +64,9 @@ struct qd_link_t { ALLOC_DEFINE_SAFE(qd_link_t); ALLOC_DEFINE(qd_link_ref_t); +static void qd_link_free(qd_link_t *); + + /** Encapsulates a proton session */ struct qd_session_t { DEQ_LINKS(qd_session_t); @@ -277,7 +280,8 @@ static void notify_closed(qd_container_t *container, qd_connection_t *conn, void // The given connection has dropped. There will be no further link events for this connection so manually clean up all -// links +// links. Note that we do not free the pn_link_t - proton will free all links when the parent connection is freed. +// static void close_links(qd_container_t *container, pn_connection_t *conn, bool print_log) { pn_link_t *pn_link = pn_link_head(conn, 0); @@ -289,7 +293,7 @@ static void close_links(qd_container_t *container, pn_connection_t *conn, bool p if (print_log) qd_log(LOG_CONTAINER, QD_LOG_DEBUG, "Aborting link '%s' due to parent connection end", pn_link_name(pn_link)); - container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST); + container->ntype->link_closed_handler(container->qd_router, qd_link, true); // true == forced qd_link_free(qd_link); } @@ -318,6 +322,7 @@ static void cleanup_link(qd_link_t *link) // cleanup any inbound message that has not been forwarded qd_message_t *msg = qd_alloc_deref_safe_ptr(&link->incoming_msg); if (msg) { + qd_nullify_safe_ptr(&link->incoming_msg); qd_message_free(msg); } } @@ -326,8 +331,7 @@ static void cleanup_link(qd_link_t *link) static int close_handler(qd_container_t *container, pn_connection_t *conn, qd_connection_t* qd_conn) { // - // Close all links, passing QD_LOST as the reason. These links are not - // being properly 'detached'. They are being orphaned. + // Close all links. These links are not being properly 'detached'. They are being orphaned. // if (qd_conn) qd_conn->closed = true; @@ -508,9 +512,9 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, } if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) { if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) { - // Remote has nuked our session. Check for any links that were - // left open and forcibly detach them, since no detaches will - // arrive on this session. + // Remote has closed the session. Check for any child links and forcibly close them since there will be + // no detach performatives arriving for these links. Note that we do not free the pn_link_t since proton + // will free all child pn_link_t when it frees the session. pn_link = pn_link_head(conn, 0); while (pn_link) { pn_link_t *next_link = pn_link_next(pn_link, 0); @@ -529,7 +533,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, } qd_log(LOG_CONTAINER, QD_LOG_DEBUG, "Aborting link '%s' due to parent session end", pn_link_name(pn_link)); - container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST); + container->ntype->link_closed_handler(container->qd_router, qd_link, true); qd_link_free(qd_link); } } @@ -590,10 +594,6 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, pn_link = pn_event_link(event); qd_link = (qd_link_t*) pn_link_get_context(pn_link); if (qd_link) { - qd_detach_type_t dt = pn_event_type(event) == PN_LINK_REMOTE_CLOSE ? QD_CLOSED : QD_DETACHED; - if (qd_link->pn_link == pn_link) { - pn_link_close(pn_link); - } if (qd_link->policy_counted) { qd_link->policy_counted = false; if (pn_link_is_sender(pn_link)) { @@ -609,16 +609,21 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, } } - container->ntype->link_detach_handler(container->qd_router, qd_link, dt); + // notify arrival of inbound detach + container->ntype->link_detach_handler(container->qd_router, qd_link); - if (pn_link_state(pn_link) & PN_LOCAL_CLOSED) { - // link fully closed - add_link_to_free_list(&qd_conn->free_link_list, pn_link); + if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { + // Link now fully detached + container->ntype->link_closed_handler(container->qd_router, qd_link, false); qd_link_free(qd_link); + add_link_to_free_list(&qd_conn->free_link_list, pn_link); + } + } else { // no qd_link, manually detach or free + if ((pn_link_state(pn_link) & PN_LOCAL_CLOSED) == 0) { + pn_link_close(pn_link); + } else { + add_link_to_free_list(&qd_conn->free_link_list, pn_link); } - - } else { - add_link_to_free_list(&qd_conn->free_link_list, pn_link); } } break; @@ -626,8 +631,13 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, case PN_LINK_LOCAL_CLOSE: pn_link = pn_event_link(event); if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { - add_link_to_free_list(&qd_conn->free_link_list, pn_link); - qd_link_free((qd_link_t *) pn_link_get_context(pn_link)); + qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link); + if (qd_link) { + // Link now fully detached + container->ntype->link_closed_handler(container->qd_router, qd_link, false); + qd_link_free(qd_link); + } + add_link_to_free_list(&qd_conn->free_link_list, pn_link); // why??? } break; @@ -775,7 +785,7 @@ qd_link_t *qd_link(qd_connection_t *conn, qd_direction_t dir, const char* name, } -void qd_link_free(qd_link_t *link) +static void qd_link_free(qd_link_t *link) { if (!link) return; @@ -783,8 +793,6 @@ void qd_link_free(qd_link_t *link) DEQ_REMOVE(amqp_adaptor.container->links, link); sys_mutex_unlock(&amqp_adaptor.container->lock); - amqp_adaptor.container->ntype->link_abandoned_deliveries_handler(amqp_adaptor.container->qd_router, link); - cleanup_link(link); free_qd_link_t(link); } diff --git a/src/adaptors/amqp/container.h b/src/adaptors/amqp/container.h index c133c9671..026956403 100644 --- a/src/adaptors/amqp/container.h +++ b/src/adaptors/amqp/container.h @@ -68,7 +68,6 @@ qd_container_t *qd_container(qd_router_t *router, const qd_node_type_t *node_typ void qd_container_free(qd_container_t *container); qd_link_t *qd_link(qd_connection_t *conn, qd_direction_t dir, const char *name, qd_session_class_t); -void qd_link_free(qd_link_t *link); /** * List of reference in the qd_link used to track abandoned deliveries @@ -98,7 +97,6 @@ pn_terminus_t *qd_link_target(qd_link_t *link); pn_terminus_t *qd_link_remote_source(qd_link_t *link); pn_terminus_t *qd_link_remote_target(qd_link_t *link); void qd_link_close(qd_link_t *link); -void qd_link_free(qd_link_t *link); void qd_link_q2_restart_receive(const qd_alloc_safe_ptr_t context); void qd_link_q3_block(qd_link_t *link); void qd_link_q3_unblock(qd_link_t *link); diff --git a/src/adaptors/amqp/node_type.h b/src/adaptors/amqp/node_type.h index 1308ee404..b5bbcc448 100644 --- a/src/adaptors/amqp/node_type.h +++ b/src/adaptors/amqp/node_type.h @@ -26,10 +26,10 @@ typedef struct qd_router_t qd_router_t; typedef bool (*qd_container_delivery_handler_t) (qd_router_t *, qd_link_t *link); typedef void (*qd_container_disposition_handler_t) (qd_router_t *, qd_link_t *link, pn_delivery_t *pnd); typedef int (*qd_container_link_handler_t) (qd_router_t *, qd_link_t *link); -typedef int (*qd_container_link_detach_handler_t) (qd_router_t *, qd_link_t *link, qd_detach_type_t dt); +typedef int (*qd_container_link_detach_handler_t) (qd_router_t *, qd_link_t *link); +typedef void (*qd_container_link_closed_handler_t) (qd_router_t *, qd_link_t *link, bool forced); typedef void (*qd_container_node_handler_t) (qd_router_t *); typedef int (*qd_container_conn_handler_t) (qd_router_t *, qd_connection_t *conn, void *context); -typedef void (*qd_container_link_abandoned_deliveries_handler_t) (qd_router_t *, qd_link_t *link); /** * A set of Node handlers for deliveries, links and container events. @@ -57,15 +57,20 @@ struct qd_node_type_t { /** Invoked when an activated connection is available for writing. */ qd_container_conn_handler_t writable_handler; - /** Invoked when a link is detached. */ + /** Invoked when link detached is received. */ qd_container_link_detach_handler_t link_detach_handler; + + /** The last callback issued for the given qd_link_t. The adaptor must clean up all state related to the qd_link_t + * as it will be freed on return from this call. The forced flag is set to true if the link is being forced closed + * due to the parent connection/session closing or on shutdown. + */ + qd_container_link_closed_handler_t link_closed_handler; + ///@} /** Invoked when a link we created was opened by the peer */ qd_container_link_handler_t link_attach_handler; - qd_container_link_abandoned_deliveries_handler_t link_abandoned_deliveries_handler; - /** Invoked when a link receives a flow event */ qd_container_link_handler_t link_flow_handler; diff --git a/src/adaptors/tcp/tcp_adaptor.c b/src/adaptors/tcp/tcp_adaptor.c index 00d9a80b4..d3ecb885d 100644 --- a/src/adaptors/tcp/tcp_adaptor.c +++ b/src/adaptors/tcp/tcp_adaptor.c @@ -628,7 +628,7 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn) } if (!!conn->inbound_link) { - qdr_link_detach(conn->inbound_link, QD_LOST, 0); + qdr_link_closed(conn->inbound_link, true); } if (!!conn->outbound_delivery) { @@ -638,7 +638,7 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn) } if (!!conn->outbound_link) { - qdr_link_detach(conn->outbound_link, QD_LOST, 0); + qdr_link_closed(conn->outbound_link, true); } if (conn->observer_handle) { @@ -2501,7 +2501,7 @@ QD_EXPORT void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl) // if (!!connector->out_link) { qdr_link_set_context(connector->out_link, 0); - qdr_link_detach(connector->out_link, QD_LOST, 0); + qdr_link_closed(connector->out_link, true); connector->out_link = 0; } diff --git a/src/router_core/connections.c b/src/router_core/connections.c index c7977dba6..c340198b1 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -36,9 +36,8 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_link_detach_sent_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_link_closed_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_link_processing_complete_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_link_detach_sent(qdr_link_t *link); static void qdr_link_processing_complete(qdr_core_t *core, qdr_link_t *link); static void qdr_connection_group_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn); static void qdr_connection_set_tracing_CT(qdr_core_t *core, qdr_action_t *action, bool discard); @@ -500,7 +499,7 @@ int qdr_connection_process(qdr_connection_t *conn) } sys_mutex_lock(&conn->work_lock); - if (link_work->work_type == QDR_LINK_WORK_DELIVERY && link_work->value > 0 && !link->detach_received) { + if (link_work->work_type == QDR_LINK_WORK_DELIVERY && link_work->value > 0) { // link_work ref transferred from link_work to work_list DEQ_INSERT_HEAD(link->work_list, link_work); link_work->processing = false; @@ -518,11 +517,9 @@ int qdr_connection_process(qdr_connection_t *conn) event_count++; } - if (detach_sent) { - // let the core thread know so it can clean up - qdr_link_detach_sent(link); - } else + if (!detach_sent) { qdr_record_link_credit(core, link); + } ref = DEQ_NEXT(ref); } @@ -697,6 +694,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, strcpy(link->name, name); link->link_direction = dir; + link->state = QDR_LINK_STATE_UNINIT; // transition to first attach occurs on core thread link->capacity = conn->link_capacity; link->credit_pending = conn->link_capacity; link->admin_enabled = true; @@ -764,25 +762,23 @@ void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, qdr_termin } -void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error) +void qdr_link_detach_received(qdr_link_t *link, qdr_error_t *error) { - qdr_action_t *action = qdr_action(qdr_link_inbound_detach_CT, "link_detach"); + qdr_action_t *action = qdr_action(qdr_link_inbound_detach_CT, "link_detach_received"); set_safe_ptr_qdr_connection_t(link->conn, &action->args.connection.conn); set_safe_ptr_qdr_link_t(link, &action->args.connection.link); action->args.connection.error = error; - action->args.connection.dt = dt; qdr_action_enqueue(link->core, action); } -/* let the core thread know that a dispatch has been sent by the I/O thread - */ -static void qdr_link_detach_sent(qdr_link_t *link) +void qdr_link_closed(qdr_link_t *link, bool forced) { - qdr_action_t *action = qdr_action(qdr_link_detach_sent_CT, "link_detach_sent"); + qdr_action_t *action = qdr_action(qdr_link_closed_CT, "link_closed"); set_safe_ptr_qdr_link_t(link, &action->args.connection.link); + action->args.connection.forced_close = forced; qdr_action_enqueue(link->core, action); } @@ -1213,6 +1209,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, link->conn_id = conn->identity; link->link_type = link_type; link->link_direction = dir; + link->state = QDR_LINK_STATE_ATTACH_SENT; link->capacity = conn->link_capacity; link->credit_pending = conn->link_capacity; link->name = (char*) malloc(QD_DISCRIMINATOR_SIZE + 8); @@ -1223,7 +1220,6 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, link->oper_status = QDR_LINK_OPER_DOWN; link->insert_prefix = 0; link->strip_prefix = 0; - link->attach_count = 1; link->core_ticks = qdr_core_uptime_ticks(core); link->zero_credit_time = link->core_ticks; link->priority = priority; @@ -1267,8 +1263,11 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, } -void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close) +void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition) { + assert((link->state & QDR_LINK_STATE_DETACH_SENT) == 0); + link->state |= QDR_LINK_STATE_DETACH_SENT; + // // Ensure a pooled link is no longer available for streaming messages // @@ -1283,8 +1282,8 @@ void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t // tell the I/O thread to do the detach // - link->detach_count += 1; - qdr_link_work_t *work = qdr_link_work(link->detach_count == 1 ? QDR_LINK_WORK_FIRST_DETACH : QDR_LINK_WORK_SECOND_DETACH); + bool first_detach = (link->state & QDR_LINK_STATE_DETACH_RECVD) == 0; // haven't received a detach + qdr_link_work_t *work = qdr_link_work(first_detach ? QDR_LINK_WORK_FIRST_DETACH : QDR_LINK_WORK_SECOND_DETACH); if (error) work->error = error; @@ -2034,12 +2033,13 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act return; } - qd_direction_t dir = action->args.connection.dir; + qd_direction_t dir = action->args.connection.dir; // - // Start the attach count. + // Expect this is the initial attach (remote initiated link) // - link->attach_count = 1; + assert((link->state & QDR_LINK_STATE_ATTACH_SENT) == 0); + link->state |= QDR_LINK_STATE_ATTACH_RECVD; // // Put the link into the proper lists for tracking. @@ -2058,7 +2058,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act if (((link->link_type == QD_LINK_CONTROL || link->link_type == QD_LINK_ROUTER) && conn->role != QDR_ROLE_INTER_ROUTER)) { link->link_type = QD_LINK_ENDPOINT; // Demote the link type to endpoint if this is not an inter-router connection - qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_FORBIDDEN, true); + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_FORBIDDEN); qdr_terminus_free(source); qdr_terminus_free(target); qd_log(LOG_ROUTER_CORE, QD_LOG_ERROR, @@ -2073,7 +2073,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act // if (conn->role == QDR_ROLE_INTER_ROUTER && link->link_type == QD_LINK_ENDPOINT && core->control_links_by_mask_bit[conn->mask_bit] == 0) { - qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_WRONG_ROLE, true); + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_WRONG_ROLE); qdr_terminus_free(source); qdr_terminus_free(target); qd_log(LOG_ROUTER_CORE, QD_LOG_ERROR, @@ -2117,7 +2117,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act if (core->addr_lookup_handler) core->addr_lookup_handler(core->addr_lookup_context, conn, link, dir, source, target); else { - qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true); + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION); qdr_terminus_free(source); qdr_terminus_free(target); qd_log(LOG_ROUTER_CORE, QD_LOG_ERROR, @@ -2156,7 +2156,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act if (core->addr_lookup_handler) core->addr_lookup_handler(core->addr_lookup_context, conn, link, dir, source, target); else { - qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true); + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION); qdr_terminus_free(source); qdr_terminus_free(target); qd_log(LOG_ROUTER_CORE, QD_LOG_ERROR, @@ -2203,8 +2203,13 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac return; } + // expect: called due to an attach received as a response to our sent attach + // + assert(!!(link->state & QDR_LINK_STATE_ATTACH_SENT)); + link->state |= QDR_LINK_STATE_ATTACH_RECVD; + link->oper_status = QDR_LINK_OPER_UP; - link->attach_count++; + // // Mark the link as an edge link if it's inside an edge connection. @@ -2289,28 +2294,13 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac } -static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +// Perform all detach-related link processing. +// +// error: (optional) error information that arrived in the detach performative +// +static void qdr_link_process_detach(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error) { - qdr_connection_t *conn = safe_deref_qdr_connection_t(action->args.connection.conn); - qdr_link_t *link = safe_deref_qdr_link_t(action->args.connection.link); - qdr_error_t *error = action->args.connection.error; - qd_detach_type_t dt = action->args.connection.dt; - - if (discard || !conn || !link) { - qdr_error_free(error); - return; - } - - if (link->detach_received) - return; - - link->detach_received = true; - ++link->detach_count; - - if (link->core_endpoint) { - qdrc_endpoint_do_detach_CT(core, link->core_endpoint, error, dt); - return; - } + qdr_connection_t *conn = link->conn; // // ensure a pooled link is no longer available for use @@ -2329,8 +2319,6 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b qdr_route_auto_link_detached_CT(core, link, error); } - - qdr_address_t *addr = link->owning_addr; if (addr) addr->ref_count++; @@ -2404,26 +2392,10 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b link->owning_addr = 0; - if (link->detach_count == 1) { - // - // Handle the disposition of any deliveries that remain on the link - // - qdr_link_cleanup_deliveries_CT(core, conn, link, false); - - // - // If the detach occurred via protocol, send a detach back. - // - if (dt != QD_LOST) { - qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NONE, dt == QD_CLOSED); - } else { - // no detach can be sent out because the connection was lost - qdr_link_cleanup_protected_CT(core, conn, link, "Link lost"); - } - } else if (link->detach_send_done) { // detach count indicates detach has been scheduled - // I/O thread is finished sending detach, ok to free link now - - qdr_link_cleanup_protected_CT(core, conn, link, "Link detached"); - } + // + // Handle the disposition of any deliveries that remain on the link + // + qdr_link_cleanup_deliveries_CT(core, link->conn, link, false); // // If there was an address associated with this link, check to see if any address-related @@ -2433,24 +2405,78 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b addr->ref_count--; qdr_check_addr_CT(core, addr); } +} + + +static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +{ + qdr_connection_t *conn = safe_deref_qdr_connection_t(action->args.connection.conn); + qdr_link_t *link = safe_deref_qdr_link_t(action->args.connection.link); + qdr_error_t *error = action->args.connection.error; + + if (discard || !conn || !link) { + qdr_error_free(error); + return; + } + + if (link->state & QDR_LINK_STATE_DETACH_RECVD) + return; + + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] qdr_link_inbound_detach_CT()", + conn->identity, link->identity); + + link->state |= QDR_LINK_STATE_DETACH_RECVD; + + const bool first_detach = (link->state & QDR_LINK_STATE_DETACH_SENT) == 0; + + if (link->core_endpoint) { + qdrc_endpoint_do_detach_CT(core, link->core_endpoint, error, first_detach); + return; + } + + qdr_link_process_detach(core, link, error); + + if (first_detach) { + // Send response detach + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NONE); + } if (error) qdr_error_free(error); } -/* invoked on core thread to signal that the I/O thread has sent the detach - */ -static void qdr_link_detach_sent_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +static void qdr_link_closed_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { - qdr_link_t *link = safe_deref_qdr_link_t(action->args.connection.link); + qdr_link_t *link = safe_deref_qdr_link_t(action->args.connection.link); + bool forced_close = action->args.connection.forced_close; if (discard || !link) return; - link->detach_send_done = true; - if (link->conn && link->detach_received) - qdr_link_cleanup_protected_CT(core, link->conn, link, "Link detached"); + if (forced_close) { + // The link has been forced closed rather than cleanly detached. + if ((link->state & QDR_LINK_STATE_DETACH_RECVD) == 0) { + // detach-related cleanup was not done - do it now + if (link->core_endpoint) { + bool first_detach = (link->state & QDR_LINK_STATE_DETACH_SENT) == 0; + qdrc_endpoint_do_detach_CT(core, link->core_endpoint, 0, first_detach); + return; + } + + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] qdr_link_closed_CT(forced=%s) handle %s detach", + link->conn->identity, link->identity, forced_close ? "YES" : "NO", + (link->state & QDR_LINK_STATE_DETACH_SENT) == 0 ? "first" : "second"); + + qdr_link_process_detach(core, link, 0); + } + } + + qdr_link_cleanup_protected_CT(core, link->conn, link, + action->args.connection.forced_close + ? "Link forced closed" : "Link closed"); } diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c index 8c4e83ebe..f70b54f6e 100644 --- a/src/router_core/core_link_endpoint.c +++ b/src/router_core/core_link_endpoint.c @@ -100,8 +100,8 @@ void qdrc_endpoint_second_attach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_t void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error) { - qdr_link_outbound_detach_CT(core, ep->link, error, QDR_CONDITION_NONE, true); - if (ep->link->detach_count == 2) { + qdr_link_outbound_detach_CT(core, ep->link, error, QDR_CONDITION_NONE); + if (QDR_LINK_STATE_IS_CLOSED(ep->link->state)) { qdrc_endpoint_do_cleanup_CT(core, ep); } } @@ -213,17 +213,13 @@ void qdrc_endpoint_do_flow_CT(qdr_core_t *core, qdrc_endpoint_t *ep, int credit, } -void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error, qd_detach_type_t dt) +void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error, bool first_detach) { - if (dt == QD_LOST) { - qdrc_endpoint_do_cleanup_CT(core, ep); - qdr_error_free(error); - - } else if (ep->link->detach_count == 1) { + if (first_detach) { if (!!ep->desc->on_first_detach) ep->desc->on_first_detach(ep->link_context, error); else { - qdr_link_outbound_detach_CT(core, ep->link, 0, QDR_CONDITION_NONE, true); + qdr_link_outbound_detach_CT(core, ep->link, 0, QDR_CONDITION_NONE); qdr_error_free(error); } } else { diff --git a/src/router_core/core_link_endpoint.h b/src/router_core/core_link_endpoint.h index 59ec76da7..7e8b65c0a 100644 --- a/src/router_core/core_link_endpoint.h +++ b/src/router_core/core_link_endpoint.h @@ -193,7 +193,7 @@ qd_direction_t qdrc_endpoint_get_direction_CT(const qdrc_endpoint_t *endpoint qdr_connection_t *qdrc_endpoint_get_connection_CT(qdrc_endpoint_t *endpoint); /** - * Detach a link attached to the core-endpoint + * Respond to a link attach to the core-endpoint. Typically called by the on_first_attach callback. * * @param core Pointer to the core object * @param endpoint Pointer to an endpoint object @@ -261,7 +261,7 @@ void qdrc_endpoint_do_second_attach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoi void qdrc_endpoint_do_deliver_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_delivery_t *delivery); void qdrc_endpoint_do_update_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_delivery_t *delivery, bool settled); void qdrc_endpoint_do_flow_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, int credit, bool drain); -void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_error_t *error, qd_detach_type_t dt); +void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_error_t *error, bool first_detach); void qdrc_endpoint_do_cleanup_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint); #endif diff --git a/src/router_core/modules/address_lookup_client/address_lookup_client.c b/src/router_core/modules/address_lookup_client/address_lookup_client.c index ede897c08..5fc812a9c 100644 --- a/src/router_core/modules/address_lookup_client/address_lookup_client.c +++ b/src/router_core/modules/address_lookup_client/address_lookup_client.c @@ -234,13 +234,13 @@ static void qdr_link_react_to_first_attach_CT(qdr_core_t *core, source = target = 0; // ownership passed to qdrc_endpoint_do_bound_attach_CT } else if (unavailable) { - qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0, true); + qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0); } else if (!addr) { // // No route to this destination, reject the link // - qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true); + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION); } else { // // Prior to binding, check to see if this is an inter-edge connection. If so, diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c index 4d7e228c0..8fb1118cb 100644 --- a/src/router_core/modules/edge_router/addr_proxy.c +++ b/src/router_core/modules/edge_router/addr_proxy.c @@ -117,7 +117,7 @@ static void del_inlink(qcm_edge_addr_proxy_t *ap, qdr_address_t *addr) if (link) { qd_nullify_safe_ptr(&addr->edge_inlink_sp); qdr_core_unbind_address_link_CT(ap->core, addr, link); - qdr_link_outbound_detach_CT(ap->core, link, 0, QDR_CONDITION_NONE, true); + qdr_link_outbound_detach_CT(ap->core, link, 0, QDR_CONDITION_NONE); } } @@ -148,7 +148,7 @@ static void del_outlink(qcm_edge_addr_proxy_t *ap, qdr_address_t *addr) if (link) { qd_nullify_safe_ptr(&addr->edge_outlink_sp); qdr_core_unbind_address_link_CT(ap->core, addr, link); - qdr_link_outbound_detach_CT(ap->core, link, 0, QDR_CONDITION_NONE, true); + qdr_link_outbound_detach_CT(ap->core, link, 0, QDR_CONDITION_NONE); } } @@ -184,7 +184,7 @@ static void remove_proxies_for_addr(qcm_edge_addr_proxy_t *ap, qdr_address_t *ad qdr_link_t *link = ref->link; if (link->conn && link->conn->role == QDR_ROLE_INTER_EDGE) { qdr_core_unbind_address_link_CT(ap->core, addr, link); - qdr_link_outbound_detach_CT(ap->core, link, 0, QDR_CONDITION_NONE, true); + qdr_link_outbound_detach_CT(ap->core, link, 0, QDR_CONDITION_NONE); } ref = next; } diff --git a/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c b/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c index 34981d00c..ad6065569 100644 --- a/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c +++ b/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c @@ -91,7 +91,7 @@ static void idle_link_cleanup(qdr_core_t *core, qdr_connection_t *conn) qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, "[C%" PRIu64 "][L%" PRIu64 "] Streaming link scrubber: closing idle link %s", link->conn->identity, link->identity, (link->name) ? link->name : ""); - qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NONE, true); + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NONE); } } } diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c index abb45885a..40d501d19 100644 --- a/src/router_core/route_control.c +++ b/src/router_core/route_control.c @@ -194,7 +194,7 @@ static void qdr_auto_link_deactivate_CT(qdr_core_t *core, qdr_auto_link_t *al, q qdr_route_log_CT(core, "Auto Link Deactivated", al->name, al->identity, conn); if (al->link) { - qdr_link_outbound_detach_CT(core, al->link, 0, QDR_CONDITION_NONE, true); + qdr_link_outbound_detach_CT(core, al->link, 0, QDR_CONDITION_NONE); al->link->auto_link = 0; al->link = 0; } diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index df69d8a6d..2487acd6a 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -126,10 +126,10 @@ struct qdr_action_t { qdr_terminus_t *source; qdr_terminus_t *target; qdr_error_t *error; - qd_detach_type_t dt; int credit; bool drain; bool enable_protocol_trace; + bool forced_close; qdr_delivery_t *initial_delivery; } connection; @@ -428,6 +428,22 @@ typedef enum { QDR_LINK_OPER_IDLE } qdr_link_oper_status_t; +typedef enum { + QDR_LINK_STATE_UNINIT = 0x00, + QDR_LINK_STATE_ATTACH_RECVD = 0x01, + QDR_LINK_STATE_ATTACH_SENT = 0x02, + QDR_LINK_STATE_DETACH_RECVD = 0x04, + QDR_LINK_STATE_DETACH_SENT = 0x08, + QDR_LINK_STATE_MASK = 0x0F +} qdr_link_state_t; + +// Link Open: both sides attach (and no detaches yet) +#define QDR_LINK_STATE_IS_OPEN(LS) (!!(((LS) & QDR_LINK_STATE_MASK) == \ + (QDR_LINK_STATE_ATTACH_RECVD | QDR_LINK_STATE_ATTACH_SENT))) +// Link Closed: both sides detach +#define QDR_LINK_STATE_IS_CLOSED(LS) (!!(((LS) & (QDR_LINK_STATE_DETACH_RECVD | QDR_LINK_STATE_DETACH_SENT)) \ + == (QDR_LINK_STATE_DETACH_RECVD | QDR_LINK_STATE_DETACH_SENT))) + #define QDR_LINK_RATE_DEPTH 5 struct qdr_link_t { @@ -439,12 +455,11 @@ struct qdr_link_t { qdr_connection_t *conn; ///< [ref] Connection that owns this link qd_link_type_t link_type; qd_direction_t link_direction; + qdr_link_state_t state; qdr_link_work_list_t work_list; char *name; char *disambiguated_name; char *terminus_addr; - int attach_count; ///< 1 or 2 depending on the state of the lifecycle - int detach_count; ///< 0, 1, or 2 depending on the state of the lifecycle uint32_t open_moved_streams; ///< Number of still-open streaming deliveries that were moved from this link qdr_address_t *owning_addr; ///< [ref] Address record that owns this link qdrc_endpoint_t *core_endpoint; ///< [ref] Set if this link terminates on an in-core endpoint @@ -467,8 +482,6 @@ struct qdr_link_t { bool strip_annotations_out; bool drain_mode; bool stalled_outbound; ///< Indicates that this link is stalled on outbound buffer backpressure - bool detach_received; ///< True on core receipt of inbound attach - bool detach_send_done; ///< True once the detach has been sent by the I/O thread bool edge; ///< True if this link is in an edge-connection bool processing; ///< True if an IO thread is currently handling this link bool ready_to_free; ///< True if the core thread wanted to clean up the link but it was processing @@ -992,7 +1005,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, qd_session_class_t ssn_class, uint8_t priority); -void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close); +void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition); void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target); bool qdr_link_is_idle_CT(const qdr_link_t *link); qdr_terminus_t *qdr_terminus_router_control(void); ///< new terminus for router control links diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index b44798686..5e67b4c1e 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -152,10 +152,6 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) if (link->link_direction == QD_OUTGOING) { - // If a detach has been received on the link, there is no need to process deliveries on the link. - if (link->detach_received) - return 0; - while (credit > 0) { sys_mutex_lock(&conn->work_lock); dlv = DEQ_HEAD(link->undelivered); @@ -432,10 +428,10 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar qdrc_endpoint_do_flow_CT(core, link->core_endpoint, credit, drain); } - if (link->attach_count == 1) + if (!QDR_LINK_STATE_IS_OPEN(link->state)) // - // The link is half-open. Store the pending credit to be dealt with once the link is - // progressed to the next step. + // The link is not fully open. Store the pending credit to be dealt with once the link has + // reached the open state. // link->credit_stored += credit; diff --git a/tests/test-receiver.c b/tests/test-receiver.c index 46377ffbd..77d7120a2 100644 --- a/tests/test-receiver.c +++ b/tests/test-receiver.c @@ -196,7 +196,6 @@ static bool event_handler(pn_event_t *event) case PN_PROACTOR_INACTIVE: case PN_PROACTOR_INTERRUPT: { - assert(stop); // expect: due to stopping debug("proactor inactive!\n"); return true; } break; diff --git a/tests/test-sender.c b/tests/test-sender.c index 2646d00e0..baaf300a1 100644 --- a/tests/test-sender.c +++ b/tests/test-sender.c @@ -88,9 +88,6 @@ char *host_address = _addr; char *container_name = "TestSender"; char proactor_address[1024]; -pn_connection_t *pn_conn; -pn_session_t *pn_ssn; -pn_link_t *pn_link; pn_proactor_t *proactor; pn_message_t *out_message; @@ -253,6 +250,7 @@ static bool event_handler(pn_event_t *event) case PN_CONNECTION_INIT: { // Create and open all the endpoints needed to send a message // + pn_connection_t *pn_conn = pn_event_connection(event); pn_connection_open(pn_conn); pn_session_t *pn_ssn = pn_session(pn_conn); pn_session_open(pn_ssn); @@ -267,30 +265,31 @@ static bool event_handler(pn_event_t *event) } break; - case PN_CONNECTION_WAKE: { - if (stop) { - pn_proactor_cancel_timeout(proactor); - if (drop_connection) { // hard stop - if (verbose) { - fprintf(stdout, - "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64 - " Released:%"PRIu64" Modified:%"PRIu64"\n", - count, accepted, rejected, released, modified); - fflush(stdout); - } - exit(0); - } - if (pn_conn) { - debug("Stop detected - closing connection...\n"); - if (pn_link) pn_link_close(pn_link); - if (pn_ssn) pn_session_close(pn_ssn); - pn_connection_close(pn_conn); - pn_link = 0; - pn_ssn = 0; - pn_conn = 0; - } + case PN_LINK_REMOTE_CLOSE: { + pn_link_t *pn_link = pn_event_link(event); + if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { + pn_session_close(pn_link_session(pn_link)); + pn_link_free(pn_link); } - } break; + break; + } + + case PN_SESSION_REMOTE_CLOSE: { + pn_session_t *pn_session = pn_event_session(event); + if (pn_session_state(pn_session) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { + pn_connection_close(pn_session_connection(pn_session)); + pn_session_free(pn_session); + } + break; + } + + case PN_CONNECTION_REMOTE_CLOSE: { + pn_connection_t *pn_conn = pn_event_connection(event); + if (pn_connection_state(pn_conn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { + pn_conn = 0; + } + break; + } case PN_LINK_FLOW: { // the remote has given us some credit, now we can send messages @@ -315,7 +314,19 @@ static bool event_handler(pn_event_t *event) // no need to wait for acks debug("stopping (presettled)...\n"); stop = true; - pn_connection_wake(pn_conn); + pn_proactor_cancel_timeout(proactor); + if (drop_connection) { // hard stop + if (verbose) { + fprintf(stdout, + "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64 + " Released:%"PRIu64" Modified:%"PRIu64"\n", + count, accepted, rejected, released, modified); + fflush(stdout); + } + exit(0); + } else { // graceful stop + pn_link_close(sender); + } } } } @@ -357,29 +368,40 @@ static bool event_handler(pn_event_t *event) if (limit && acked == limit) { // initiate clean shutdown of the endpoints - debug("stopping...\n"); + debug("Done sending\n"); stop = true; - pn_connection_wake(pn_conn); + pn_proactor_cancel_timeout(proactor); + if (drop_connection) { // hard stop + if (verbose) { + fprintf(stdout, + "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64 + " Released:%"PRIu64" Modified:%"PRIu64"\n", + count, accepted, rejected, released, modified); + fflush(stdout); + } + exit(0); + } else { // graceful stop + pn_link_close(pn_event_link(event)); + } } } } break; case PN_PROACTOR_TIMEOUT: { - if (verbose) { - fprintf(stdout, - "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64 - " Released:%"PRIu64" Modified:%"PRIu64" Limit:%"PRIu64"\n", - count, accepted, rejected, released, modified, limit); - fflush(stdout); - if (!stop) { - pn_proactor_set_timeout(proactor, 10 * 1000); + if (!stop) { + if (verbose) { + fprintf(stdout, + "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64 + " Released:%"PRIu64" Modified:%"PRIu64" Limit:%"PRIu64"\n", + count, accepted, rejected, released, modified, limit); + fflush(stdout); } + pn_proactor_set_timeout(proactor, 10 * 1000); } } break; case PN_PROACTOR_INACTIVE: case PN_PROACTOR_INTERRUPT: { - assert(stop); // expect: due to stopping debug("proactor inactive!\n"); return true; } break; @@ -469,7 +491,7 @@ int main(int argc, char** argv) port = "5672"; } - pn_conn = pn_connection(); + pn_connection_t *pn_conn = pn_connection(); // the container name should be unique for each client pn_connection_set_container(pn_conn, container_name); pn_connection_set_hostname(pn_conn, host);