Skip to content

Commit

Permalink
[WIP] 90% redesign h2/mut from scratch
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Jul 10, 2024
1 parent 85fb15e commit b214fdc
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 188 deletions.
58 changes: 27 additions & 31 deletions tunnels/client/http2/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ static void onH2LinePaused(void *arg)
// ++(con->pause_counter);
// if (con->pause_counter > 8)
// {
http2_client_child_con_state_t *stream_i;
for (stream_i = con->root.next; stream_i;)
{
pauseLineDownSide(stream_i->line);
stream_i = stream_i->next;
}
http2_client_child_con_state_t *stream_i;
for (stream_i = con->root.next; stream_i;)
{
pauseLineDownSide(stream_i->line);
stream_i = stream_i->next;
}
// }
}

Expand Down Expand Up @@ -117,8 +117,9 @@ static http2_client_child_con_state_t *createHttp2Stream(http2_client_con_state_
nvs[nvlen++] = (makeNV("content-type", "application/grpc+proto"));
}
// todo (match chrome) this one is same as curl, but not same as chrome
nvs[nvlen++] = makeNV("Accept", "*/*");
//chrome: "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7");
nvs[nvlen++] = makeNV("Accept", "*/*");
// chrome:
// "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7");

nvs[nvlen++] = makeNV("Accept-Language", "en,fa;q=0.9,zh-CN;q=0.8,zh;q=0.7");
nvs[nvlen++] = makeNV("Cache-Control", "no-cache");
Expand All @@ -132,11 +133,11 @@ static http2_client_child_con_state_t *createHttp2Stream(http2_client_con_state_
http2_client_child_con_state_t *stream = wwmGlobalMalloc(sizeof(http2_client_child_con_state_t));
memset(stream, 0, sizeof(http2_client_child_con_state_t));
// stream->stream_id = nghttp2_submit_request2(con->session, NULL, &nvs[0], nvlen, NULL,stream);
stream->stream_id = nghttp2_submit_headers(con->session, flags, -1, NULL, &nvs[0], nvlen, stream);
stream->chunkbs = newBufferStream(getLineBufferPool(con->line));
stream->parent = con->line;
stream->line = child_line;
stream->tunnel = con->tunnel;
stream->stream_id = nghttp2_submit_headers(con->session, flags, -1, NULL, &nvs[0], nvlen, stream);
stream->grpc_buffer_stream = newBufferStream(getLineBufferPool(con->line));
stream->parent = con->line;
stream->line = child_line;
stream->tunnel = con->tunnel;
LSTATE_I_MUT(stream->line, stream->tunnel->chain_index) = stream;
setupLineUpSide(stream->line, onStreamLinePaused, stream, onStreamLineResumed);

Expand All @@ -150,7 +151,7 @@ static void deleteHttp2Stream(http2_client_child_con_state_t *stream)
{

LSTATE_I_DROP(stream->line, stream->tunnel->chain_index);
destroyBufferStream(stream->chunkbs);
destroyBufferStream(stream->grpc_buffer_stream);
doneLineUpSide(stream->line);
wwmGlobalFree(stream);
}
Expand All @@ -159,21 +160,18 @@ static http2_client_con_state_t *createHttp2Connection(tunnel_t *self, int tid)
{
http2_client_state_t *state = TSTATE(self);
http2_client_con_state_t *con = wwmGlobalMalloc(sizeof(http2_client_con_state_t));

*con = (http2_client_con_state_t){
.queue = newContextQueue(),
.content_type = state->content_type,
.path = state->path,
.host = state->host,
.host_port = state->host_port,
.scheme = state->scheme,
.state = kH2SendMagic,
.method = state->content_type == kApplicationGrpc? kHttpPost: kHttpGet,
.line = newLine(tid),
.ping_timer = htimer_add(loops[tid], onPingTimer, kPingInterval, INFINITE),
.tunnel = self,
};
LSTATE_MUT(con->line) = con;
*con = (http2_client_con_state_t) {.queue = newContextQueue(),
.content_type = state->content_type,
.path = state->path,
.host = state->host,
.host_port = state->host_port,
.scheme = state->scheme,
.state = kH2SendMagic,
.method = state->content_type == kApplicationGrpc ? kHttpPost : kHttpGet,
.line = newLine(tid),
.ping_timer = htimer_add(loops[tid], onPingTimer, kPingInterval, INFINITE),
.tunnel = self};
LSTATE_MUT(con->line) = con;
setupLineDownSide(con->line, onH2LinePaused, con, onH2LineResumed);

hevent_set_userdata(con->ping_timer, con);
Expand All @@ -185,8 +183,6 @@ static http2_client_con_state_t *createHttp2Connection(tunnel_t *self, int tid)
};
nghttp2_submit_settings(con->session, NGHTTP2_FLAG_NONE, settings, ARRAY_SIZE(settings));



return con;
}
static void deleteHttp2Connection(http2_client_con_state_t *con)
Expand Down
117 changes: 64 additions & 53 deletions tunnels/client/http2/http2_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,26 @@ enum
kDefaultConcurrency = 64 // cons will be muxed into 1
};

static void sendGrpcFinalData(tunnel_t *self, line_t *line, size_t stream_id)
static int onStreamClosedCallback(nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *userdata)
{
http2_frame_hd framehd;
shift_buffer_t *buf = popBuffer(getLineBufferPool(line));
setLen(buf, HTTP2_FRAME_HDLEN);

framehd.length = 0;
framehd.type = kHttP2Data;
framehd.flags = kHttP2FlagEndStream;
framehd.stream_id = stream_id;
http2FrameHdPack(&framehd, rawBufMut(buf));
context_t *endstream_ctx = newContext(line);
endstream_ctx->payload = buf;
self->up->upStream(self->up, endstream_ctx);
(void) stream_id;
(void) error_code;

http2_client_con_state_t *con = (http2_client_con_state_t *) userdata;
tunnel_t *self = con->tunnel;
http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, stream_id);
if (! stream)
{
return 0;
}
context_t *fc = newFinContext(stream->line);
CSTATE_DROP(fc);
tunnel_t *dest = stream->tunnel->dw;
removeStream(con, stream);
deleteHttp2Stream(stream);
dest->downStream(dest, fc);

return 0;
}

static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, int32_t stream_id, shift_buffer_t *buf)
Expand Down Expand Up @@ -189,15 +195,18 @@ static int onHeaderCallback(nghttp2_session *session, const nghttp2_frame *frame
// }
// }

if (strcmp((const char *) name, "grpc-ack") == 0)
if (strcmp((const char *) name, "custom-ack") == 0)
{
http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
if (stream)
{
const int consumed = atoi((const char *) value);

if (stream->bytes_sent_nack >= kMaxSendBeforeAck)
{
stream->bytes_sent_nack -= consumed;
LOGD("consumed: %d left: %d",consumed,(int)stream->bytes_sent_nack);

if (stream->bytes_sent_nack < kMaxSendBeforeAck)
{
resumeLineDownSide(stream->line);
Expand All @@ -206,6 +215,8 @@ static int onHeaderCallback(nghttp2_session *session, const nghttp2_frame *frame
else
{
stream->bytes_sent_nack -= consumed;
LOGD("consumed: %d left: %d",consumed,(int)stream->bytes_sent_nack);

}
}
}
Expand Down Expand Up @@ -238,22 +249,22 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3
shift_buffer_t *buf = popBuffer(getLineBufferPool(con->line));
setLen(buf, len);
writeRaw(buf, data, len);
bufferStreamPush(stream->chunkbs, buf);
bufferStreamPush(stream->grpc_buffer_stream, buf);

while (true)
{
if (stream->bytes_needed == 0 && bufferStreamLen(stream->chunkbs) >= GRPC_MESSAGE_HDLEN)
if (stream->grpc_bytes_needed == 0 && bufferStreamLen(stream->grpc_buffer_stream) >= GRPC_MESSAGE_HDLEN)
{
shift_buffer_t *gheader_buf = bufferStreamRead(stream->chunkbs, GRPC_MESSAGE_HDLEN);
shift_buffer_t *gheader_buf = bufferStreamRead(stream->grpc_buffer_stream, GRPC_MESSAGE_HDLEN);
grpc_message_hd msghd;
grpcMessageHdUnpack(&msghd, rawBuf(gheader_buf));
stream->bytes_needed = msghd.length;
stream->grpc_bytes_needed = msghd.length;
reuseBuffer(getLineBufferPool(con->line), gheader_buf);
}
if (stream->bytes_needed > 0 && bufferStreamLen(stream->chunkbs) >= stream->bytes_needed)
if (stream->grpc_bytes_needed > 0 && bufferStreamLen(stream->grpc_buffer_stream) >= stream->grpc_bytes_needed)
{
shift_buffer_t *gdata_buf = bufferStreamRead(stream->chunkbs, stream->bytes_needed);
stream->bytes_needed = 0;
shift_buffer_t *gdata_buf = bufferStreamRead(stream->grpc_buffer_stream, stream->grpc_bytes_needed);
stream->grpc_bytes_needed = 0;
context_t *stream_data = newContext(stream->line);
stream_data->payload = gdata_buf;

Expand All @@ -263,7 +274,7 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3
nghttp2_nv nvs[1];
char stri[32];
sprintf(stri, "%d", (int) stream->bytes_received_nack);
nvs[0] = makeNV("grpc-ack", stri);
nvs[0] = makeNV("custom-ack", stri);
nghttp2_submit_headers(con->session, NGHTTP2_FLAG_END_HEADERS, stream_id, NULL, &nvs[0], 1, NULL);
stream->bytes_received_nack = 0;
}
Expand Down Expand Up @@ -292,7 +303,7 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3
nghttp2_nv nvs[1];
char stri[32];
sprintf(stri, "%d", (int) stream->bytes_received_nack);
nvs[0] = makeNV("grpc-ack", stri);
nvs[0] = makeNV("custom-ack", stri);
nghttp2_submit_headers(con->session, NGHTTP2_FLAG_END_HEADERS, stream_id, NULL, &nvs[0], 1, NULL);
stream->bytes_received_nack = 0;
}
Expand All @@ -305,15 +316,16 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3

static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *frame, void *userdata)
{
(void) session;
if (WW_UNLIKELY(userdata == NULL))
{
return 0;
}

// LOGD("onFrameRecvCallback\n");
printFrameHd(&frame->hd);
http2_client_con_state_t *con = (http2_client_con_state_t *) userdata;
tunnel_t *self = con->tunnel;
http2_client_con_state_t *con = (http2_client_con_state_t *) userdata;
// tunnel_t *self = con->tunnel;

switch (frame->hd.type)
{
Expand All @@ -338,25 +350,6 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr
default:
break;
}
if (frame->hd.flags & kHttP2FlagEndStream)
{
http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
if (! stream)
{
return 0;
}
// http2_client_state_t *state = TSTATE(self);
resumeLineUpSide(stream->parent);
nghttp2_session_set_stream_user_data(con->session, stream->stream_id, NULL);
context_t *fc = newFinContext(stream->line);
CSTATE_DROP(fc);
tunnel_t *dest = stream->tunnel->dw;
removeStream(con, stream);
deleteHttp2Stream(stream);
dest->downStream(dest, fc);

return 0;
}

if ((frame->hd.type & NGHTTP2_HEADERS) == NGHTTP2_HEADERS)
{
Expand Down Expand Up @@ -440,23 +433,40 @@ static void upStream(tunnel_t *self, context_t *c)
http2_client_con_state_t *con = LSTATE(stream->parent);
CSTATE_DROP(c);

if (con->content_type == kApplicationGrpc && con->handshake_completed)
// LOGE("closed %d",stream->stream_id);
int flags = NGHTTP2_FLAG_END_STREAM | NGHTTP2_FLAG_END_HEADERS;
if (con->content_type == kApplicationGrpc)
{
lockLine(con->line);
sendGrpcFinalData(self, con->line, stream->stream_id);
if (! isAlive(con->line))
nghttp2_nv nv = makeNV("grpc-status", "0");
nghttp2_submit_headers(con->session, flags, stream->stream_id, NULL, &nv, 1, NULL);
}
else
{
nghttp2_submit_headers(con->session, flags, stream->stream_id, NULL, NULL, 0, NULL);
}

nghttp2_session_set_stream_user_data(con->session, stream->stream_id, NULL);
removeStream(con, stream);
deleteHttp2Stream(stream);


lockLine(con->line);
while (trySendRequest(self, con, 0, NULL))
{
if (! isAlive(c->line))
{
unLockLine(con->line);
destroyContext(c);
return;
}
}
if (! isAlive(con->line))
{
unLockLine(con->line);
destroyContext(c);
return;
}

resumeLineUpSide(con->line);
nghttp2_session_set_stream_user_data(con->session, stream->stream_id, NULL);
removeStream(con, stream);
deleteHttp2Stream(stream);
unLockLine(con->line);

if (con->root.next == NULL && con->childs_added >= state->concurrency && isAlive(con->line))
{
Expand Down Expand Up @@ -560,6 +570,7 @@ tunnel_t *newHttp2Client(node_instance_context_t *instance_info)
nghttp2_session_callbacks_set_on_header_callback(state->cbs, onHeaderCallback);
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(state->cbs, onDataChunkRecvCallback);
nghttp2_session_callbacks_set_on_frame_recv_callback(state->cbs, onFrameRecvCallback);
nghttp2_session_callbacks_set_on_stream_close_callback(state->cbs, onStreamClosedCallback);

for (size_t i = 0; i < workers_count; i++)
{
Expand Down
Loading

0 comments on commit b214fdc

Please sign in to comment.