Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[udpfw] close idle paths after 60 seconds #307

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions t/udpfw.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct connection_t {
} down_addr;
uint64_t packet_num_up;
uint64_t packet_num_down;
int64_t last_transmit_at;
};

struct queue_t {
Expand Down Expand Up @@ -156,9 +157,19 @@ static struct connection_t *find_or_create_connection(struct sockaddr *sa, sockl
c->prev->next = c;
c->packet_num_up = 0;
c->packet_num_down = 0;
c->last_transmit_at = 0;
return c;
}

static void destroy_connection(struct connection_t *c, int64_t now)
{
fprintf(stderr, "%" PRId64 ":%zu:destroy\n", now, c->cid);
c->prev->next = c->next;
c->next->prev = c->prev;
close(c->up_fd);
free(c);
}

static void init_queue(struct queue_t *q)
{
assert(q->ring.depth != 0);
Expand All @@ -174,14 +185,16 @@ static void dequeue(struct queue_t *q, int up, int64_t now)
return;
if (now < q->congested_until)
return;

struct connection_t *conn = q->ring.elements[q->ring.head].conn;
if (up) {
send(q->ring.elements[q->ring.head].conn->up_fd, q->ring.elements[q->ring.head].data, q->ring.elements[q->ring.head].len,
0);
send(conn->up_fd, q->ring.elements[q->ring.head].data, q->ring.elements[q->ring.head].len, 0);
} else {
sendto(listen_fd, q->ring.elements[q->ring.head].data, q->ring.elements[q->ring.head].len, 0,
(void *)&q->ring.elements[q->ring.head].conn->down_addr.ss, q->ring.elements[q->ring.head].conn->down_addr.len);
}
fprintf(stderr, "%" PRId64 ":%zu:%c:forward\n", now, q->ring.elements[q->ring.head].conn->cid, up ? 'u' : 'd');
conn->last_transmit_at = now;
fprintf(stderr, "%" PRId64 ":%zu:%c:forward\n", now, conn->cid, up ? 'u' : 'd');
q->ring.head = (q->ring.head + 1) % q->ring.depth;
if (q->ring.head == q->ring.tail) // empty queue
return;
Expand Down Expand Up @@ -413,8 +426,8 @@ int main(int argc, char **argv)
if (FD_ISSET(c->up_fd, &fds)) {
while (enqueue(&down, c, now))
;
} else {
/* close idle connections */
} else if (c->last_transmit_at + 60 * 1000 < now) {
destroy_connection(c, now);
}
}
}
Expand Down