Skip to content

Commit

Permalink
Update xkcp_server.c
Browse files Browse the repository at this point in the history
  • Loading branch information
liudf0716 authored Apr 19, 2017
1 parent 450fce0 commit 3c57f37
Showing 1 changed file with 118 additions and 17 deletions.
135 changes: 118 additions & 17 deletions xkcp_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,122 @@

#include "ikcp.h"
#include "debug.h"
#include "jwHash.h"
#include "xkcp_server.h"
#include "xkcp_config.h"
#include "xkcp_util.h"
#include "xkcp_mon.h"
#include "tcp_client.h"

IQUEUE_HEAD(xkcp_task_list);
#ifndef NI_MAXHOST
#define NI_MAXHOST 1025
#endif
#ifndef NI_MAXSERV
#define NI_MAXSERV 32
#endif

static short mport = 9087;
static jwHashTable *xkcp_hash = NULL;

static struct xkcp_proxy_param *param = NULL;

iqueue_head * get_xkcp_task_list()
jwHashTable * get_xkcp_hash()
{
return &xkcp_task_list;
return xkcp_hash;
}

static void timer_event_cb(evutil_socket_t fd, short event, void *arg)
{
xkcp_timer_event_cb(arg, &xkcp_task_list);
hash_iterator(xkcp_hash, xkcp_update_task_list, HASHPTR);

set_timer_interval(arg);
}

static struct xkcp_task *create_new_tcp_connection(const int xkcpfd, struct event_base *base,
struct sockaddr_in *from, int from_len, int conv, iqueue_head *task_list)
{
struct xkcp_proxy_param *param = malloc(sizeof(struct xkcp_proxy_param));
memset(param, 0, sizeof(struct xkcp_proxy_param));
memcpy(&param->sockaddr, from, from_len);
param->xkcpfd = xkcpfd;
param->addr_len = from_len;

ikcpcb *kcp_server = ikcp_create(conv, param);
xkcp_set_config_param(kcp_server);

struct xkcp_task *task = malloc(sizeof(struct xkcp_task));
assert(task);
task->kcp = kcp_server;
task->svr_addr = &param->serveraddr;

struct bufferevent *bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
if (!bev) {
debug(LOG_ERR, "bufferevent_socket_new failed [%s]", strerror(errno));
goto err;
}

task->bev = bev;
bufferevent_setcb(bev, tcp_client_read_cb, NULL, tcp_client_event_cb, task);
bufferevent_enable(bev, EV_READ);
if (bufferevent_socket_connect_hostname(bev, NULL, AF_INET,
xkcp_get_param()->remote_addr,
xkcp_get_param()->remote_port) < 0) {
bufferevent_free(bev);
debug(LOG_ERR, "bufferevent_socket_connect failed [%s]", strerror(errno));
goto err;
}
add_task_tail(task, task_list);
debug(LOG_INFO, "tcp client [%d] connect to [%s]:[%d] success", bufferevent_getfd(bev),
xkcp_get_param()->remote_addr, xkcp_get_param()->remote_port);
return kcp_server;
err:
free(param);
free(task);
return NULL;
}

static struct xkcp_proxy_param *accept_client_data(const int xkcpfd, struct event_base *base,
struct sockaddr_in *from, int from_len, char *data, int len)
{
char host[NI_MAXHOST] = {0};
char serv[NI_MAXSERV] = {0};
char key[NI_MAXHOST+NI_MAXSERV+1] = {0};

int nret = getnameinfo((struct sockaddr *) from, fromlen,
host, sizeof(host), serv, sizeof(serv),
NI_NUMERICHOST | NI_DGRAM);
if (nret) {
debug(LOG_INFO, "accept_new_client: getnameinfo error %s", strerror(errno));
return NULL;
}

iqueue_head *task_list = NULL;
snprintf(key, NI_MAXHOST+NI_MAXSERV+1, "%s:%s", host, serv);
get_ptr_by_str(xkcp_table, key, &task_list);
ikcpcb *kcp_server = NULL;
if (task_list) {
//old client
int conv = ikcp_getconv(buf);
ikcpcb *kcp_server = get_kcp_from_conv(conv, task_list);
if (!kcp_server) {
// new tcp connection
kcp_server = create_new_tcp_connection(xkcpfd, base, from, from_len, conv, task_list);
}
} else {
// new client
task_list = malloc(sizeof(iqueue_head));
iqueue_init(task_list);
add_ptr_by_str(xkcp_table, key, task_list);
kcp_server = create_new_tcp_connection(xkcpfd, base, from, from_len, conv, task_list);
}

if (kcp_server ) {
int nret = ikcp_input(kcp_server, data, len);
if (nret < 0) {
debug(LOG_INFO, "[%d] ikcp_input failed [%d]", kcp_server->conv, nret);
}

if (task_list)
xkcp_forward_all_data(task_list);
}
}

static void xkcp_rcv_cb(const int sock, short int which, void *arg)
Expand All @@ -75,18 +171,20 @@ static void xkcp_rcv_cb(const int sock, short int which, void *arg)
char buf[BUF_RECV_LEN] = {0};
int len = recvfrom(sock, buf, sizeof(buf) - 1, 0, (struct sockaddr *) &clientaddr, &clientlen);
if (len > 0) {
if (param == NULL) {
param = malloc(sizeof(struct xkcp_proxy_param));
memset(param, 0, sizeof(struct xkcp_proxy_param));
memcpy(&param->serveraddr, &clientaddr, clientlen);
param->udp_fd = sock;
param->addr_len = clientlen;
}
int conv = ikcp_getconv(buf);
ikcpcb *kcp_server = get_kcp_from_conv(conv, &xkcp_task_list);
debug(LOG_INFO, "xkcp_server: xkcp_rcv_cb -- xkcp sock %d conv is %d, kcp_server is %d, recv data %d",
sock, conv, kcp_server?1:0, len);
if (kcp_server == NULL) {
#if 1
accept_client_data(sock, base, clientaddr, clientlen, buf, len);
#else
struct xkcp_proxy_param *param = malloc(sizeof(struct xkcp_proxy_param));
memset(param, 0, sizeof(struct xkcp_proxy_param));
memcpy(&param->sockaddr, &clientaddr, clientlen);
param->udp_fd = sock;
param->addr_len = clientlen;

kcp_server = ikcp_create(conv, param);
xkcp_set_config_param(kcp_server);

Expand Down Expand Up @@ -119,9 +217,9 @@ static void xkcp_rcv_cb(const int sock, short int which, void *arg)
if (nret < 0) {
debug(LOG_INFO, "[%d] ikcp_input failed [%d]", kcp_server->conv, nret);
}
}
xkcp_forward_all_data(&xkcp_task_list);
xkcp_forward_all_data(&xkcp_task_list);
#endif
}
}

static int set_xkcp_listener()
Expand Down Expand Up @@ -162,6 +260,8 @@ int server_main_loop()
exit(0);
}

xkcp_hash = create_hash(100);

int xkcp_fd = set_xkcp_listener();

mon_listener = set_xkcp_mon_listener(base, mport, &xkcp_task_list);
Expand All @@ -177,7 +277,8 @@ int server_main_loop()
evconnlistener_free(mon_listener);
close(xkcp_fd);
event_base_free(base);

delete_hash(xkcp_hash);

return 0;
}

Expand Down

0 comments on commit 3c57f37

Please sign in to comment.