Skip to content

Commit

Permalink
DEV: serve function continuation
Browse files Browse the repository at this point in the history
  • Loading branch information
Mgrdich committed Nov 28, 2024
1 parent 386807d commit ca7d8b8
Showing 1 changed file with 177 additions and 2 deletions.
179 changes: 177 additions & 2 deletions pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type conn struct {
// Immutable; never nil.
server *MiniServer

// cancelCtx cancels the connection-level context.
cancelCtx context.CancelFunc

// rwc is the underlying network connection.
// This is never wrapped by other types and is the value given out
// to CloseNotifier callers. It is usually of type *net.TCPConn or
Expand All @@ -55,17 +58,27 @@ type conn struct {
// This is the value of a Handler's (*Request).RemoteAddr.
remoteAddr string

// werr is set to the first write error to rwc.
// It is set via checkConnErrorWriter{w}, where bufw writes.
werr error

// bufr reads from r.
bufr *bufio.Reader

// bufw writes to checkConnErrorWriter{c}, which populates werr on error.
bufw *bufio.Writer

// lastMethod is the method of the most recent request
// on this connection, if any.
lastMethod string

// r is bufr's read source. It's a wrapper around rwc that provides
// io.LimitedReader-style limiting (while reading request headers)
// and functionality to support CloseNotifier. See *connReader docs.
r *connReader

curReq atomic.Pointer[response] // (which has a Request in it)

curState atomic.Uint64 // packed (unixtime<<8|uint8(ConnState))

}
Expand Down Expand Up @@ -461,6 +474,23 @@ func (s *MiniServer) trackConn(c *conn, add bool) {
delete(s.activeConn, c)
}

// DefaultMaxHeaderBytes is the maximum permitted size of the headers
// in an HTTP request.
// This can be overridden by setting Server.MaxHeaderBytes.
const DefaultMaxHeaderBytes = 1 << 20 // 1 MB

func (s *MiniServer) maxHeaderBytes() int {
if s.MaxHeaderBytes > 0 {
return s.MaxHeaderBytes
}

return DefaultMaxHeaderBytes
}

func (s *MiniServer) initialReadLimitSize() int64 {
return int64(s.maxHeaderBytes()) + 4096 // bufio slop
}

// conn Methods

func (c *conn) setState(state ConnState) {
Expand All @@ -487,7 +517,7 @@ func (c *conn) serve(ctx context.Context) {
c.remoteAddr = ra.String()
}

// ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())

var inFlightResponse *response

Expand Down Expand Up @@ -519,19 +549,73 @@ func (c *conn) serve(ctx context.Context) {

// HTTP/1.x from here on

fmt.Print("testing")
ctx, cancelCtx := context.WithCancel(ctx)
c.cancelCtx = cancelCtx
defer cancelCtx()

c.r = &connReader{conn: c}
c.bufr = newBufioReader(c.r)
c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)

for {
w, err := c.readRequest(ctx)

if c.r.remain != c.server.initialReadLimitSize() {
// If we read any bytes off the wire, we're active.
c.setState(StateActive)
}

if err != nil {
fmt.Println(err, w)
}
}
}

var errTooLarge = errors.New("http: request too large")

// Read next request from connection.
func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
return nil, err
}

func (c *conn) close() {
c.finalFlush()
c.rwc.Close()
}

func newBufioReader(r io.Reader) *bufio.Reader {
if v := bufioReaderPool.Get(); v != nil {
br := v.(*bufio.Reader)
br.Reset(r)

return br
}

// Note: if this reader size is ever changed, update
// TestHandlerBodyClose's assumptions.
return bufio.NewReader(r)
}

func putBufioReader(br *bufio.Reader) {
br.Reset(nil)
bufioReaderPool.Put(br)
}

func newBufioWriterSize(w io.Writer, size int) *bufio.Writer {
pool := bufioWriterPool(size)

if pool != nil {
if v := pool.Get(); v != nil {
bw := v.(*bufio.Writer)
bw.Reset(w)

return bw
}
}

return bufio.NewWriterSize(w, size)
}

// bufioWriterPool it checks the available size so it can assign correct writer to it
func bufioWriterPool(size int) *sync.Pool {
switch size {
Expand All @@ -552,6 +636,24 @@ func putBufioWriter(bw *bufio.Writer) {
}
}

// checkConnErrorWriter writes to c.rwc and records any write errors to c.werr.
// It only contains one field (and a pointer field at that), so it
// fits in an interface value without an extra allocation.
type checkConnErrorWriter struct {
c *conn
}

func (w checkConnErrorWriter) Write(p []byte) (n int, err error) {
n, err = w.c.rwc.Write(p)

if err != nil && w.c.werr == nil {
w.c.werr = err
w.c.cancelCtx()
}

return
}

func (c *conn) finalFlush() {
if c.bufr != nil {
// Steal the bufio.Reader (~4KB worth of memory) and its associated
Expand Down Expand Up @@ -673,3 +775,76 @@ func (cr *connReader) setInfiniteReadLimit() {
func (cr *connReader) hitReadLimit() bool {
return cr.remain <= 0
}

// handleReadError is called whenever a Read from the client returns a
// non-nil error.
//
// The provided non-nil err is almost always io.EOF or a "use of
// closed network connection". In any case, the error is not
// particularly interesting, except perhaps for debugging during
// development. Any error means the connection is dead and we should
// down its context.
//
// It may be called from multiple goroutines.
func (cr *connReader) handleReadError(_ error) {
cr.conn.cancelCtx()
cr.closeNotify()
}

// may be called from multiple goroutines.
func (cr *connReader) closeNotify() {
res := cr.conn.curReq.Load()

if res != nil && !res.didCloseNotify.Swap(true) {
res.closeNotifyCh <- true
}
}

func (cr *connReader) Read(p []byte) (n int, err error) {
cr.lock()

if cr.inRead {
cr.unlock()
panic("invalid concurrent Body.Read call")
}

if cr.hitReadLimit() {
cr.unlock()
return 0, io.EOF
}

if len(p) == 0 {
cr.unlock()
return 0, nil
}

if int64(len(p)) > cr.remain {
p = p[:cr.remain]
}

if cr.hasByte {
p[0] = cr.byteBuf[0]
cr.hasByte = false
cr.unlock()

return 1, nil
}

cr.inRead = true
cr.unlock()
n, err = cr.conn.rwc.Read(p)

cr.lock()
cr.inRead = false

if err != nil {
cr.handleReadError(err)
}

cr.remain -= int64(n)
cr.unlock()

cr.cond.Broadcast()

return n, err
}

0 comments on commit ca7d8b8

Please sign in to comment.