From b948c3b790f2fef9ac940333ae900e99d6e8b76c Mon Sep 17 00:00:00 2001 From: Anton Konovalov Date: Mon, 14 May 2018 11:59:50 +0300 Subject: [PATCH] add grpc test --- chan.go | 94 ++++++++++++++++++++++++++++++++++++++++++---------- chan_test.go | 57 ++++++++++++++++++++++++++++++- 2 files changed, 132 insertions(+), 19 deletions(-) diff --git a/chan.go b/chan.go index b426c55..cf472e7 100644 --- a/chan.go +++ b/chan.go @@ -5,12 +5,24 @@ import ( "context" "fmt" "io" + golog "log" "net" "sync" "time" ) +var ( + debugNET = true +) + +func log(args ...interface{}) { + if debugNET { + golog.Println(args...) + } +} + + var ( dnsOfChannels = map[string]*ChanListener{} dnsLock = new(sync.RWMutex) @@ -82,17 +94,22 @@ func DialChannel(ctx context.Context,network, address string) (*ChanConn,error) } conn := &ChanConn{ + prefix: `client`, + once: new(sync.Once), ctx:ctx, raddr:address, readBuff: new(bytes.Buffer), read:make(chan []byte,0), - write:make(chan []byte,1), + write:make(chan []byte,0), + accepted:make(chan struct{},0), } err := listener.send(conn) if err != nil { return nil, err } + <-conn.accepted + return conn,nil } @@ -101,7 +118,7 @@ func ListenChannel(ctx context.Context, address string) (*ChanListener,error) { ctx: ctx, once: new(sync.Once), addr: ChanAddr(address), - conns: make(chan *ChanConn,1), + conns: make(chan *ChanConn,0), } err := registryChannelListener(lis) @@ -113,14 +130,25 @@ func ListenChannel(ctx context.Context, address string) (*ChanListener,error) { } type ChanConn struct { + prefix string + once *sync.Once raddr string ctx context.Context readBuff *bytes.Buffer read chan []byte write chan []byte + accepted chan struct{} + closed bool } func (c ChanConn) Read(b []byte) (n int, err error) { + defer func () { + log(`{`,c.prefix,`}`,"READ \nERR",fmt.Sprint(err),"\n", fmt.Sprintf("%s",b)) + }() + + if c.closed { + return 0,io.EOF + } if c.readBuff.Len() > 0 { n, err = c.readBuff.Read(b) @@ -145,7 +173,16 @@ func (c ChanConn) Read(b []byte) (n int, err error) { } } + func (c ChanConn) Write(b []byte) (n int, err error) { + defer func () { + log(`{`,c.prefix,`}`,"WRITE ","\n","ERR",fmt.Sprint(err), "\n",fmt.Sprintf("%s",b)) + }() + + if c.closed { + return 0, io.EOF + } + select { case c.write <- b: return len(b), nil @@ -154,9 +191,13 @@ func (c ChanConn) Write(b []byte) (n int, err error) { } } -func (c ChanConn) Close() error { - close(c.read) - close(c.write) +func (c *ChanConn) Close() error { + c.once.Do(func () { + log("{",c.prefix,"} closed") + c.closed = true + close(c.read) + close(c.write) + }) return nil } @@ -198,40 +239,57 @@ func (c ChanListener) Accept() (net.Conn, error) { if !ok { return nil, io.EOF } + log("{ server } <-") lisConn := &ChanConn{ + prefix: `server`, ctx: conn.ctx, + once: new(sync.Once), raddr: conn.raddr, readBuff: new(bytes.Buffer), - read:make(chan []byte,1), + read:make(chan []byte,0), write:make(chan []byte,0), } - p := <- conn.write - lisConn.read <- p - start := make(chan struct{}) go func () { close(start) - select { - case d, ok := <- lisConn.write: - if !ok { + for { + select { + case d, ok := <- conn.write: + if !ok { + log("client connection was closed") + return + } + if !lisConn.closed { + lisConn.read <- d + } + log("{ client } >> { server }") + case d, ok := <- lisConn.write: + if !ok { + log("server connection was closed") + return + } + if !conn.closed { + conn.read <- d + log("{ server } >> { client }") + } + case <- lisConn.ctx.Done(): + return + case <- conn.ctx.Done(): return } - conn.read <- d - case <- conn.ctx.Done(): - return } }() - - <-start - + <- start + close(conn.accepted) return lisConn, nil } func (c ChanListener) send(conn *ChanConn) error { select { case c.conns <- conn: + log("{ client } ->") return nil case <-c.ctx.Done(): return c.ctx.Err() diff --git a/chan_test.go b/chan_test.go index 80beaa3..8a7fb40 100644 --- a/chan_test.go +++ b/chan_test.go @@ -6,9 +6,12 @@ import ( "net/http" "testing" "time" + + "google.golang.org/grpc" + pb "google.golang.org/grpc/examples/helloworld/helloworld" ) -func TestChanConn_Read(t *testing.T) { +func TestChanConnHttp(t *testing.T) { lis, err := ListenChannel(context.Background(),"my-service:0") if err != nil { t.Fatalf("unexpected error - %s",err) @@ -50,3 +53,55 @@ func TestChanConn_Read(t *testing.T) { t.Errorf("unexpected status code %d", resp.StatusCode ) } } + + +// server is used to implement helloworld.GreeterServer. +type server struct{} + +// SayHello implements helloworld.GreeterServer +func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { + return &pb.HelloReply{Message: "Hello " + in.Name}, nil +} + +func TestChanConnGrpc(t *testing.T) { + lis, err := ListenChannel(context.Background(),"my-service:0") + if err != nil { + t.Fatalf("unexpected error - %s",err) + } + defer lis.Close() + + srv := grpc.NewServer() + pb.RegisterGreeterServer(srv,new(server)) + + + start := make(chan struct{}) + go func () { + close(start) + srv.Serve(lis) + }() + + <-start + + conn, err := grpc.DialContext(context.Background(),"my-service:0", + grpc.WithInsecure(), + grpc.WithDialer( + func (addr string, _ time.Duration) (net.Conn,error) { + return DialChannel(context.Background(),"channel",addr) + }), + ) + + if err != nil { + t.Errorf("unexpected error %s",err) + } + + resp , err := pb.NewGreeterClient(conn).SayHello(context.Background(),&pb.HelloRequest{ + Name:"Aloxa", + }) + if err != nil { + t.Errorf("unexpected error %s",err) + } else { + if resp.Message != "Hello Aloxa" { + t.Errorf("messaage resp 'Hello Aloxa' != %s", resp.Message) + } + } +}