diff --git a/xkcp_server.c b/xkcp_server.c index 50cf168..f2a5edb 100644 --- a/xkcp_server.c +++ b/xkcp_server.c @@ -43,26 +43,122 @@ #include "ikcp.h" #include "debug.h" +#include "jwHash.h" #include "xkcp_server.h" #include "xkcp_config.h" #include "xkcp_util.h" #include "xkcp_mon.h" #include "tcp_client.h" -IQUEUE_HEAD(xkcp_task_list); +#ifndef NI_MAXHOST +#define NI_MAXHOST 1025 +#endif +#ifndef NI_MAXSERV +#define NI_MAXSERV 32 +#endif static short mport = 9087; +static jwHashTable *xkcp_hash = NULL; -static struct xkcp_proxy_param *param = NULL; - -iqueue_head * get_xkcp_task_list() +jwHashTable * get_xkcp_hash() { - return &xkcp_task_list; + return xkcp_hash; } static void timer_event_cb(evutil_socket_t fd, short event, void *arg) { - xkcp_timer_event_cb(arg, &xkcp_task_list); + hash_iterator(xkcp_hash, xkcp_update_task_list, HASHPTR); + + set_timer_interval(arg); +} + +static struct xkcp_task *create_new_tcp_connection(const int xkcpfd, struct event_base *base, + struct sockaddr_in *from, int from_len, int conv, iqueue_head *task_list) +{ + struct xkcp_proxy_param *param = malloc(sizeof(struct xkcp_proxy_param)); + memset(param, 0, sizeof(struct xkcp_proxy_param)); + memcpy(¶m->sockaddr, from, from_len); + param->xkcpfd = xkcpfd; + param->addr_len = from_len; + + ikcpcb *kcp_server = ikcp_create(conv, param); + xkcp_set_config_param(kcp_server); + + struct xkcp_task *task = malloc(sizeof(struct xkcp_task)); + assert(task); + task->kcp = kcp_server; + task->svr_addr = ¶m->serveraddr; + + struct bufferevent *bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE); + if (!bev) { + debug(LOG_ERR, "bufferevent_socket_new failed [%s]", strerror(errno)); + goto err; + } + + task->bev = bev; + bufferevent_setcb(bev, tcp_client_read_cb, NULL, tcp_client_event_cb, task); + bufferevent_enable(bev, EV_READ); + if (bufferevent_socket_connect_hostname(bev, NULL, AF_INET, + xkcp_get_param()->remote_addr, + xkcp_get_param()->remote_port) < 0) { + bufferevent_free(bev); + debug(LOG_ERR, "bufferevent_socket_connect failed [%s]", strerror(errno)); + goto err; + } + add_task_tail(task, task_list); + debug(LOG_INFO, "tcp client [%d] connect to [%s]:[%d] success", bufferevent_getfd(bev), + xkcp_get_param()->remote_addr, xkcp_get_param()->remote_port); + return kcp_server; +err: + free(param); + free(task); + return NULL; +} + +static struct xkcp_proxy_param *accept_client_data(const int xkcpfd, struct event_base *base, + struct sockaddr_in *from, int from_len, char *data, int len) +{ + char host[NI_MAXHOST] = {0}; + char serv[NI_MAXSERV] = {0}; + char key[NI_MAXHOST+NI_MAXSERV+1] = {0}; + + int nret = getnameinfo((struct sockaddr *) from, fromlen, + host, sizeof(host), serv, sizeof(serv), + NI_NUMERICHOST | NI_DGRAM); + if (nret) { + debug(LOG_INFO, "accept_new_client: getnameinfo error %s", strerror(errno)); + return NULL; + } + + iqueue_head *task_list = NULL; + snprintf(key, NI_MAXHOST+NI_MAXSERV+1, "%s:%s", host, serv); + get_ptr_by_str(xkcp_table, key, &task_list); + ikcpcb *kcp_server = NULL; + if (task_list) { + //old client + int conv = ikcp_getconv(buf); + ikcpcb *kcp_server = get_kcp_from_conv(conv, task_list); + if (!kcp_server) { + // new tcp connection + kcp_server = create_new_tcp_connection(xkcpfd, base, from, from_len, conv, task_list); + } + } else { + // new client + task_list = malloc(sizeof(iqueue_head)); + iqueue_init(task_list); + add_ptr_by_str(xkcp_table, key, task_list); + kcp_server = create_new_tcp_connection(xkcpfd, base, from, from_len, conv, task_list); + } + + if (kcp_server ) { + int nret = ikcp_input(kcp_server, data, len); + if (nret < 0) { + debug(LOG_INFO, "[%d] ikcp_input failed [%d]", kcp_server->conv, nret); + } + + if (task_list) + xkcp_forward_all_data(task_list); + } } static void xkcp_rcv_cb(const int sock, short int which, void *arg) @@ -75,18 +171,20 @@ static void xkcp_rcv_cb(const int sock, short int which, void *arg) char buf[BUF_RECV_LEN] = {0}; int len = recvfrom(sock, buf, sizeof(buf) - 1, 0, (struct sockaddr *) &clientaddr, &clientlen); if (len > 0) { - if (param == NULL) { - param = malloc(sizeof(struct xkcp_proxy_param)); - memset(param, 0, sizeof(struct xkcp_proxy_param)); - memcpy(¶m->serveraddr, &clientaddr, clientlen); - param->udp_fd = sock; - param->addr_len = clientlen; - } int conv = ikcp_getconv(buf); ikcpcb *kcp_server = get_kcp_from_conv(conv, &xkcp_task_list); debug(LOG_INFO, "xkcp_server: xkcp_rcv_cb -- xkcp sock %d conv is %d, kcp_server is %d, recv data %d", sock, conv, kcp_server?1:0, len); if (kcp_server == NULL) { +#if 1 + accept_client_data(sock, base, clientaddr, clientlen, buf, len); +#else + struct xkcp_proxy_param *param = malloc(sizeof(struct xkcp_proxy_param)); + memset(param, 0, sizeof(struct xkcp_proxy_param)); + memcpy(¶m->sockaddr, &clientaddr, clientlen); + param->udp_fd = sock; + param->addr_len = clientlen; + kcp_server = ikcp_create(conv, param); xkcp_set_config_param(kcp_server); @@ -119,9 +217,9 @@ static void xkcp_rcv_cb(const int sock, short int which, void *arg) if (nret < 0) { debug(LOG_INFO, "[%d] ikcp_input failed [%d]", kcp_server->conv, nret); } - } - - xkcp_forward_all_data(&xkcp_task_list); + xkcp_forward_all_data(&xkcp_task_list); +#endif + } } static int set_xkcp_listener() @@ -162,6 +260,8 @@ int server_main_loop() exit(0); } + xkcp_hash = create_hash(100); + int xkcp_fd = set_xkcp_listener(); mon_listener = set_xkcp_mon_listener(base, mport, &xkcp_task_list); @@ -177,7 +277,8 @@ int server_main_loop() evconnlistener_free(mon_listener); close(xkcp_fd); event_base_free(base); - + delete_hash(xkcp_hash); + return 0; }