diff --git a/tars/adapter.go b/tars/adapter.go index 843a4ca3..8046652c 100755 --- a/tars/adapter.go +++ b/tars/adapter.go @@ -256,7 +256,7 @@ func (c *AdapterProxy) doKeepAlive() { IRequestId: c.servantProxy.genRequestID(), SServantName: c.servantProxy.name, SFuncName: "tars_ping", - ITimeout: int32(c.servantProxy.timeout), + ITimeout: int32(c.servantProxy.asyncTimeout), } msg := &Message{Req: &req, Ser: c.servantProxy} msg.Init() diff --git a/tars/application.go b/tars/application.go index dffd0085..f2be1d3c 100755 --- a/tars/application.go +++ b/tars/application.go @@ -228,6 +228,7 @@ func (a *application) initConfig() { a.cltCfg.Stat = cMap["stat"] a.cltCfg.Property = cMap["property"] a.cltCfg.ModuleName = cMap["modulename"] + a.cltCfg.SyncInvokeTimeout = c.GetIntWithDef("/tars/application/client", SyncInvokeTimeout) a.cltCfg.AsyncInvokeTimeout = c.GetIntWithDef("/tars/application/client", AsyncInvokeTimeout) a.cltCfg.RefreshEndpointInterval = c.GetIntWithDef("/tars/application/client", refreshEndpointInterval) a.cltCfg.ReportInterval = c.GetIntWithDef("/tars/application/client", reportInterval) diff --git a/tars/config.go b/tars/config.go index 39dba6ce..1cc68dd3 100755 --- a/tars/config.go +++ b/tars/config.go @@ -80,8 +80,9 @@ type clientConfig struct { ReportInterval int CheckStatusInterval int KeepAliveInterval int - AsyncInvokeTimeout int // add client timeout + SyncInvokeTimeout int + AsyncInvokeTimeout int ClientQueueLen int ClientIdleTimeout time.Duration ClientReadTimeout time.Duration @@ -152,6 +153,7 @@ func newClientConfig() *clientConfig { ReportInterval: reportInterval, CheckStatusInterval: checkStatusInterval, KeepAliveInterval: keepAliveInterval, + SyncInvokeTimeout: SyncInvokeTimeout, AsyncInvokeTimeout: AsyncInvokeTimeout, ClientQueueLen: ClientQueueLen, ClientIdleTimeout: tools.ParseTimeOut(ClientIdleTimeout), diff --git a/tars/message.go b/tars/message.go index a0a2b11b..6c639b21 100644 --- a/tars/message.go +++ b/tars/message.go @@ -1,10 +1,15 @@ package tars import ( + "context" "time" + "github.com/TarsCloud/TarsGo/tars/model" + "github.com/TarsCloud/TarsGo/tars/protocol/res/basef" "github.com/TarsCloud/TarsGo/tars/protocol/res/requestf" "github.com/TarsCloud/TarsGo/tars/selector" + "github.com/TarsCloud/TarsGo/tars/util/current" + "github.com/TarsCloud/TarsGo/tars/util/tools" ) // HashType is the hash type @@ -31,6 +36,9 @@ type Message struct { hashCode uint32 hashType HashType isHash bool + Async bool + Callback model.Callback + RespCh chan *requestf.ResponsePacket } // Init define the beginTime @@ -66,3 +74,57 @@ func (m *Message) HashType() selector.HashType { func (m *Message) IsHash() bool { return m.isHash } + +func newMessage(ctx context.Context, cType byte, + sFuncName string, + buf []byte, + status map[string]string, + reqContext map[string]string, + resp *requestf.ResponsePacket, + s *ServantProxy) *Message { + + // 将ctx中的dyeing信息传入到request中 + var msgType int32 + if dyeingKey, ok := current.GetDyeingKey(ctx); ok { + TLOG.Debug("dyeing debug: find dyeing key:", dyeingKey) + if status == nil { + status = make(map[string]string) + } + status[current.StatusDyedKey] = dyeingKey + msgType |= basef.TARSMESSAGETYPEDYED + } + + // 将ctx中的trace信息传入到request中 + if trace, ok := current.GetTarsTrace(ctx); ok && trace.Call() { + traceKey := trace.GetTraceFullKey(false) + TLOG.Debug("trace debug: find trace key:", traceKey) + if status == nil { + status = make(map[string]string) + } + status[current.StatusTraceKey] = traceKey + msgType |= basef.TARSMESSAGETYPETRACE + } + + req := requestf.RequestPacket{ + IVersion: s.version, + CPacketType: int8(cType), + IMessageType: msgType, + IRequestId: s.genRequestID(), + SServantName: s.name, + SFuncName: sFuncName, + ITimeout: int32(s.syncTimeout), + SBuffer: tools.ByteToInt8(buf), + Context: reqContext, + Status: status, + } + msg := &Message{Req: &req, Ser: s, Resp: resp} + msg.Init() + + if ok, hashType, hashCode, isHash := current.GetClientHash(ctx); ok { + msg.isHash = isHash + msg.hashType = HashType(hashType) + msg.hashCode = hashCode + } + + return msg +} diff --git a/tars/model/servant.go b/tars/model/servant.go index 22e43f68..696cd789 100755 --- a/tars/model/servant.go +++ b/tars/model/servant.go @@ -8,6 +8,10 @@ import ( "github.com/TarsCloud/TarsGo/tars/protocol/res/requestf" ) +type Callback interface { + Dispatch(context.Context, *requestf.RequestPacket, *requestf.ResponsePacket, error) (int32, error) +} + // Servant is interface for call the remote server. type Servant interface { Name() string @@ -17,6 +21,15 @@ type Servant interface { status map[string]string, context map[string]string, resp *requestf.ResponsePacket) error + + TarsInvokeAsync(ctx context.Context, cType byte, + sFuncName string, + buf []byte, + status map[string]string, + context map[string]string, + resp *requestf.ResponsePacket, + callback Callback) error + TarsSetTimeout(t int) TarsSetProtocol(Protocol) Endpoints() []*endpoint.Endpoint diff --git a/tars/servant.go b/tars/servant.go index 006acf90..5dcabade 100755 --- a/tars/servant.go +++ b/tars/servant.go @@ -15,7 +15,6 @@ import ( "github.com/TarsCloud/TarsGo/tars/util/current" "github.com/TarsCloud/TarsGo/tars/util/endpoint" "github.com/TarsCloud/TarsGo/tars/util/rtimer" - "github.com/TarsCloud/TarsGo/tars/util/tools" ) var ( @@ -31,13 +30,14 @@ const ( // ServantProxy tars servant proxy instance type ServantProxy struct { - name string - comm *Communicator - manager EndpointManager - timeout int - version int16 - proto model.Protocol - queueLen int32 + name string + comm *Communicator + manager EndpointManager + syncTimeout int + asyncTimeout int + version int16 + proto model.Protocol + queueLen int32 pushCallback func([]byte) } @@ -49,10 +49,11 @@ func NewServantProxy(comm *Communicator, objName string, opts ...EndpointManager func newServantProxy(comm *Communicator, objName string, opts ...EndpointManagerOption) *ServantProxy { s := &ServantProxy{ - comm: comm, - proto: &protocol.TarsProtocol{}, - timeout: comm.Client.AsyncInvokeTimeout, - version: basef.TARSVERSION, + comm: comm, + proto: &protocol.TarsProtocol{}, + syncTimeout: comm.Client.AsyncInvokeTimeout, + asyncTimeout: comm.Client.AsyncInvokeTimeout, + version: basef.TARSVERSION, } pos := strings.Index(objName, "@") if pos > 0 { @@ -77,7 +78,7 @@ func (s *ServantProxy) Name() string { // TarsSetTimeout sets the timeout for client calling the server , which is in ms. func (s *ServantProxy) TarsSetTimeout(t int) { - s.timeout = t + s.syncTimeout = t } // TarsSetVersion set tars version @@ -122,53 +123,42 @@ func (s *ServantProxy) TarsInvoke(ctx context.Context, cType byte, resp *requestf.ResponsePacket) error { defer CheckPanic() - // 将ctx中的dyeing信息传入到request中 - var msgType int32 - if dyeingKey, ok := current.GetDyeingKey(ctx); ok { - TLOG.Debug("dyeing debug: find dyeing key:", dyeingKey) - if status == nil { - status = make(map[string]string) - } - status[current.StatusDyedKey] = dyeingKey - msgType |= basef.TARSMESSAGETYPEDYED + msg := newMessage(ctx, cType, sFuncName, buf, status, reqContext, resp, s) + timeout := time.Duration(s.syncTimeout) * time.Millisecond + if err := s.invokeFilters(ctx, msg, timeout); err != nil { + return err } + *resp = *msg.Resp + return nil +} - // 将ctx中的trace信息传入到request中 - if trace, ok := current.GetTarsTrace(ctx); ok && trace.Call() { - traceKey := trace.GetTraceFullKey(false) - TLOG.Debug("trace debug: find trace key:", traceKey) - if status == nil { - status = make(map[string]string) - } - status[current.StatusTraceKey] = traceKey - msgType |= basef.TARSMESSAGETYPETRACE - } +// TarsInvokeAsync is used for client invoking server. +func (s *ServantProxy) TarsInvokeAsync(ctx context.Context, cType byte, + sFuncName string, + buf []byte, + status map[string]string, + reqContext map[string]string, + resp *requestf.ResponsePacket, + callback model.Callback) error { + defer CheckPanic() - req := requestf.RequestPacket{ - IVersion: s.version, - CPacketType: int8(cType), - IRequestId: s.genRequestID(), - SServantName: s.name, - SFuncName: sFuncName, - SBuffer: tools.ByteToInt8(buf), - ITimeout: int32(s.timeout), - Context: reqContext, - Status: status, - IMessageType: msgType, - } - msg := &Message{Req: &req, Ser: s, Resp: resp} - msg.Init() - - timeout := time.Duration(s.timeout) * time.Millisecond - if ok, hashType, hashCode, isHash := current.GetClientHash(ctx); ok { - msg.isHash = isHash - msg.hashType = HashType(hashType) - msg.hashCode = hashCode + msg := newMessage(ctx, cType, sFuncName, buf, status, reqContext, resp, s) + msg.Req.ITimeout = int32(s.asyncTimeout) + if callback == nil { + msg.Req.CPacketType = basef.TARSONEWAY + } else { + msg.Async = true + msg.Callback = callback } + timeout := time.Duration(s.asyncTimeout) * time.Millisecond + return s.invokeFilters(ctx, msg, timeout) +} + +func (s *ServantProxy) invokeFilters(ctx context.Context, msg *Message, timeout time.Duration) error { if ok, to, isTimeout := current.GetClientTimeout(ctx); ok && isTimeout { timeout = time.Duration(to) * time.Millisecond - req.ITimeout = int32(to) + msg.Req.ITimeout = int32(to) } var err error @@ -196,11 +186,19 @@ func (s *ServantProxy) TarsInvoke(ctx context.Context, cType byte, } } } - s.manager.postInvoke() + // no async rpc call + if !msg.Async { + s.manager.postInvoke() + msg.End() + s.reportStat(msg, err) + } + + return err +} +func (s *ServantProxy) reportStat(msg *Message, err error) { if err != nil { - msg.End() - TLOG.Errorf("Invoke error: %s, %s, %v, cost:%d", s.name, sFuncName, err.Error(), msg.Cost()) + TLOG.Errorf("Invoke error: %s, %s, %v, cost:%d", s.name, msg.Req.SFuncName, err.Error(), msg.Cost()) if msg.Resp == nil { ReportStat(msg, StatSuccess, StatSuccess, StatFailed) } else if msg.Status == basef.TARSINVOKETIMEOUT { @@ -208,15 +206,12 @@ func (s *ServantProxy) TarsInvoke(ctx context.Context, cType byte, } else { ReportStat(msg, StatSuccess, StatSuccess, StatFailed) } - return err + return } - msg.End() - *resp = *msg.Resp ReportStat(msg, StatFailed, StatSuccess, StatSuccess) - return err } -func (s *ServantProxy) doInvoke(ctx context.Context, msg *Message, timeout time.Duration) error { +func (s *ServantProxy) doInvoke(ctx context.Context, msg *Message, timeout time.Duration) (err error) { adp, needCheck := s.manager.SelectAdapterProxy(msg) if adp == nil { return errors.New("no adapter Proxy selected:" + msg.Req.SServantName) @@ -237,29 +232,60 @@ func (s *ServantProxy) doInvoke(ctx context.Context, msg *Message, timeout time. } atomic.AddInt32(&s.queueLen, 1) - readCh := make(chan *requestf.ResponsePacket) - adp.resp.Store(msg.Req.IRequestId, readCh) - defer func() { + msg.RespCh = make(chan *requestf.ResponsePacket) + adp.resp.Store(msg.Req.IRequestId, msg.RespCh) + var releaseFunc = func() { CheckPanic() atomic.AddInt32(&s.queueLen, -1) adp.resp.Delete(msg.Req.IRequestId) + } + defer func() { + if !msg.Async || err != nil { + releaseFunc() + } }() - if err := adp.Send(msg.Req); err != nil { + + if err = adp.Send(msg.Req); err != nil { adp.failAdd() return err } + if msg.Req.CPacketType == basef.TARSONEWAY { adp.successAdd() return nil } + + // async call rpc + if msg.Async { + go func() { + defer releaseFunc() + err := s.waitResp(msg, timeout, needCheck) + s.manager.postInvoke() + msg.End() + s.reportStat(msg, err) + if msg.Status != basef.TARSINVOKETIMEOUT { + current.SetResponseContext(ctx, msg.Resp.Context) + current.SetResponseStatus(ctx, msg.Resp.Status) + } + if _, err := msg.Callback.Dispatch(ctx, msg.Req, msg.Resp, err); err != nil { + TLOG.Errorf("Callback error: %s, %s, %+v", s.name, msg.Req.SFuncName, err) + } + }() + return nil + } + + return s.waitResp(msg, timeout, needCheck) +} + +func (s *ServantProxy) waitResp(msg *Message, timeout time.Duration, needCheck bool) error { + adp := msg.Adp select { case <-rtimer.After(timeout): msg.Status = basef.TARSINVOKETIMEOUT adp.failAdd() - msg.End() return fmt.Errorf("request timeout, begin time:%d, cost:%d, obj:%s, func:%s, addr:(%s:%d), reqid:%d", msg.BeginTime, msg.Cost(), msg.Req.SServantName, msg.Req.SFuncName, adp.point.Host, adp.point.Port, msg.Req.IRequestId) - case msg.Resp = <-readCh: + case msg.Resp = <-msg.RespCh: if needCheck { go func() { adp.reset() diff --git a/tars/setting.go b/tars/setting.go index 0eda0a64..28822799 100755 --- a/tars/setting.go +++ b/tars/setting.go @@ -85,8 +85,10 @@ const ( // communicator default ,update from remote config refreshEndpointInterval int = 60000 reportInterval int = 5000 + // SyncInvokeTimeout sync invoke timeout + SyncInvokeTimeout int = 3000 // AsyncInvokeTimeout async invoke timeout - AsyncInvokeTimeout int = 3000 + AsyncInvokeTimeout int = 5000 // check endpoint status every 1000 ms checkStatusInterval int = 1000 diff --git a/tars/tools/tars2go/gen_go.go b/tars/tools/tars2go/gen_go.go index f96e2016..d3fca47c 100755 --- a/tars/tools/tars2go/gen_go.go +++ b/tars/tools/tars2go/gen_go.go @@ -1111,6 +1111,8 @@ func (gen *GenGo) genInterface(itf *InterfaceInfo) { gen.genHead() gen.genIFPackage(itf) + gen.genIFCallbackInterfaceWithContext(itf) + gen.genIFCallbackDispatch(itf) gen.genIFProxy(itf) gen.genIFServer(itf) @@ -1166,16 +1168,20 @@ func (obj *` + itf.Name + `) AddServantWithContext(imp ` + itf.Name + `ServantWi } for _, v := range itf.Fun { - gen.genIFProxyFun(itf.Name, &v, false, false) - gen.genIFProxyFun(itf.Name, &v, true, false) - gen.genIFProxyFun(itf.Name, &v, true, true) + gen.genIFProxyFun(itf.Name, &v, false, false, false) + gen.genIFProxyFun(itf.Name, &v, true, false, false) + gen.genIFProxyFun(itf.Name, &v, true, false, true) + gen.genIFProxyFun(itf.Name, &v, true, true, false) } } -func (gen *GenGo) genIFProxyFun(interfName string, fun *FunInfo, withContext bool, isOneWay bool) { +func (gen *GenGo) genIFProxyFun(interfName string, fun *FunInfo, withContext bool, isOneWay bool, isAsync bool) { c := &gen.code if withContext { - if isOneWay { + if isAsync { + c.WriteString("// Async" + fun.Name + "WithContext is the proxy function for the method defined in the tars file, with the context\n") + c.WriteString("func (obj *" + interfName + ") Async" + fun.Name + "WithContext(tarsCtx context.Context, callback " + interfName + "Callback, ") + } else if isOneWay { c.WriteString("// " + fun.Name + "OneWayWithContext is the proxy function for the method defined in the tars file, with the context\n") c.WriteString("func (obj *" + interfName + ") " + fun.Name + "OneWayWithContext(tarsCtx context.Context,") } else { @@ -1187,6 +1193,9 @@ func (gen *GenGo) genIFProxyFun(interfName string, fun *FunInfo, withContext boo c.WriteString("func (obj *" + interfName + ") " + fun.Name + "(") } for _, v := range fun.Args { + if isAsync && v.IsOut { + continue + } gen.genArgs(&v) } @@ -1209,8 +1218,11 @@ func (gen *GenGo) genIFProxyFun(interfName string, fun *FunInfo, withContext boo return } - if fun.HasRet { - c.WriteString("(ret " + gen.genType(fun.RetType) + ", err error) {\n") + // 异步调用不需要返回值 + if isAsync { + c.WriteString("(err error) {\n") + } else if fun.HasRet { + c.WriteString("(ret " + gen.genType(fun.RetType) + ", err error){\n") } else { c.WriteString("(err error) {\n") } @@ -1224,6 +1236,10 @@ func (gen *GenGo) genIFProxyFun(interfName string, fun *FunInfo, withContext boo c.WriteString("buf := codec.NewBuffer()") var isOut bool for k, v := range fun.Args { + // 异步调用不传递out参数 + if isAsync && v.IsOut { + continue + } if v.IsOut { isOut = true } @@ -1234,14 +1250,16 @@ func (gen *GenGo) genIFProxyFun(interfName string, fun *FunInfo, withContext boo if v.IsOut { dummy.Key = "(*" + dummy.Key + ")" } - gen.genWriteVar(dummy, "", fun.HasRet) + gen.genWriteVar(dummy, "", fun.HasRet && !isAsync) } // empty args and below separate c.WriteString("\n") - errStr := errString(fun.HasRet) // trace if !isOneWay && !withoutTrace { + if isAsync { + c.WriteString(`if callback != nil {`) + } c.WriteString(` trace, ok := current.GetTarsTrace(tarsCtx) if ok && trace.Call() { @@ -1265,6 +1283,9 @@ if ok && trace.Call() { } tars.Trace(trace.GetTraceKey(tarstrace.EstCS), tarstrace.AnnotationCS, tars.GetClientConfig().ModuleName, obj.servant.Name(), "` + fun.Name + `", 0, traceParam, "") }`) + if isAsync { + c.WriteString(`}`) + } c.WriteString("\n\n") } c.WriteString(`var statusMap map[string]string @@ -1275,25 +1296,34 @@ if len(opts) == 1{ contextMap = opts[0] statusMap = opts[1] } - -tarsResp := new(requestf.ResponsePacket)`) - - if isOneWay { +tarsResp := new(requestf.ResponsePacket) +`) + if isAsync { + c.WriteString("var cb *" + interfName + "CallbackProxy\n") + c.WriteString(`if callback != nil {`) c.WriteString(` - err = obj.servant.TarsInvoke(tarsCtx, 1, "` + fun.OriginName + `", buf.ToBytes(), statusMap, contextMap, tarsResp) - ` + errStr + ` + cb = &` + interfName + `CallbackProxy{callback: callback} +`) + c.WriteString(`} + err = obj.servant.TarsInvokeAsync(tarsCtx, 0, "` + fun.OriginName + `", buf.ToBytes(), statusMap, contextMap, tarsResp, cb) +`) + c.WriteString(errString(false) + ` + `) + } else if isOneWay { + c.WriteString(`err = obj.servant.TarsInvokeAsync(tarsCtx, 1, "` + fun.OriginName + `", buf.ToBytes(), statusMap, contextMap, tarsResp, nil) + ` + errString(fun.HasRet) + ` `) } else { - c.WriteString(` - err = obj.servant.TarsInvoke(tarsCtx, 0, "` + fun.OriginName + `", buf.ToBytes(), statusMap, contextMap, tarsResp) - ` + errStr + ` + c.WriteString(`err = obj.servant.TarsInvoke(tarsCtx, 0, "` + fun.OriginName + `", buf.ToBytes(), statusMap, contextMap, tarsResp) + ` + errString(fun.HasRet) + ` `) } - if (isOut || fun.HasRet) && !isOneWay { + if (isOut || fun.HasRet) && !isOneWay && !isAsync { c.WriteString("readBuf := codec.NewReader(tools.Int8ToByte(tarsResp.SBuffer))") } - if fun.HasRet && !isOneWay { + // read return value + if fun.HasRet && !isOneWay && !isAsync { dummy := &StructMember{} dummy.Type = fun.RetType dummy.Key = "ret" @@ -1302,7 +1332,7 @@ tarsResp := new(requestf.ResponsePacket)`) gen.genReadVar(dummy, "", fun.HasRet) } - if !isOneWay { + if !isOneWay && !isAsync { for k, v := range fun.Args { if v.IsOut { dummy := &StructMember{} @@ -1313,8 +1343,8 @@ tarsResp := new(requestf.ResponsePacket)`) gen.genReadVar(dummy, "", fun.HasRet) } } - if withContext && !withoutTrace { - traceParamFlag := "traceParamFlag := trace.NeedTraceParam(tarstrace.EstCR, uint(0))" + if !withoutTrace { + traceParamFlag := "traceParamFlag := tarace.NeedTraceParam(trace.EstCR, uint(0))" if isOut || fun.HasRet { traceParamFlag = "traceParamFlag := trace.NeedTraceParam(tarstrace.EstCR, uint(readBuf.Len()))" } @@ -1368,13 +1398,15 @@ if ok && trace.Call() { } }`) } - c.WriteString(` _ = length _ = have _ = ty `) - if fun.HasRet { + + if isAsync { + c.WriteString("return nil\n") + } else if fun.HasRet { c.WriteString("return ret, nil\n") } else { c.WriteString("return nil\n") @@ -1393,6 +1425,20 @@ func (gen *GenGo) genArgs(arg *ArgInfo) { c.WriteString(gen.genType(arg.Type) + ",") } +func (gen *GenGo) genIFCallbackInterfaceWithContext(itf *InterfaceInfo) { + c := &gen.code + c.WriteString("type " + itf.Name + "Callback interface {" + "\n") + for _, v := range itf.Fun { + gen.genIFCallbackFunWithContext(&v) + } + c.WriteString("}" + "\n\n") + + c.WriteString("// " + itf.Name + "CallbackProxy struct\n") + c.WriteString("type " + itf.Name + "CallbackProxy struct {" + "\n") + c.WriteString("callback " + itf.Name + "Callback\n") + c.WriteString("}" + "\n\n") +} + func (gen *GenGo) genIFServer(itf *InterfaceInfo) { c := &gen.code c.WriteString("type " + itf.Name + "Servant interface {\n") @@ -1408,7 +1454,22 @@ func (gen *GenGo) genIFServerWithContext(itf *InterfaceInfo) { for _, v := range itf.Fun { gen.genIFServerFunWithContext(&v) } - c.WriteString("} \n") + c.WriteString("}\n") +} + +func (gen *GenGo) genIFCallbackFunWithContext(fun *FunInfo) { + c := &gen.code + c.WriteString("Callback" + fun.Name + "(tarsCtx context.Context, ") + if fun.HasRet { + c.WriteString("ret " + gen.genType(fun.RetType) + ", ") + } + for _, v := range fun.Args { + if v.IsOut { + gen.genArgs(&v) + } + } + c.WriteString(")\n") + c.WriteString("Callback" + fun.Name + "Error(tarsCtx context.Context, err error)\n") } func (gen *GenGo) genIFServerFun(fun *FunInfo) { @@ -1439,6 +1500,212 @@ func (gen *GenGo) genIFServerFunWithContext(fun *FunInfo) { c.WriteString("err error)\n") } +func (gen *GenGo) genIFCallbackDispatch(itf *InterfaceInfo) { + c := &gen.code + c.WriteString("// Dispatch is used to call the server side implement for the method defined in the tars file\n") + c.WriteString("func(obj *" + itf.Name + `CallbackProxy) Dispatch(tarsCtx context.Context, tarsReq *requestf.RequestPacket, tarsResp *requestf.ResponsePacket, errResp error) (ret int32, err error) { + var ( + length int32 + have bool + ty byte + ) + `) + + var param bool + for _, v := range itf.Fun { + if len(v.Args) > 0 { + param = true + break + } + } + + if param { + c.WriteString("readBuf := codec.NewReader(tools.Int8ToByte(tarsResp.SBuffer))") + } else { + c.WriteString("readBuf := codec.NewReader(nil)") + } + c.WriteString(` + buf := codec.NewBuffer() + switch tarsReq.SFuncName { +`) + + for _, v := range itf.Fun { + gen.genCallbackSwitchCase(itf.Name, &v) + } + + c.WriteString(` + default: + return basef.TARSSERVERSUCCESS, fmt.Errorf("func mismatch") + } + + _ = readBuf + _ = buf + _ = length + _ = have + _ = ty + return tarsResp.IRet, nil +} +`) +} + +func (gen *GenGo) genCallbackSwitchCase(tname string, fun *FunInfo) { + c := &gen.code + c.WriteString(`case "` + fun.OriginName + `":` + "\n") + + c.WriteString(`ret := tarsResp.IRet + if errResp != nil { + obj.callback.Callback` + fun.Name + `Error(tarsCtx, errResp) + return ret, nil + } +`) + c.WriteString(` + defer func() { + if err != nil { + obj.callback.Callback` + fun.Name + `Error(tarsCtx, err) + } + }() +`) + if fun.HasRet { + c.WriteString("var funRet " + gen.genType(fun.RetType)) + dummy := &StructMember{} + dummy.Type = fun.RetType + dummy.Key = "funRet" + dummy.Tag = 0 + dummy.Require = true + gen.genReadVar(dummy, "", fun.HasRet) + } + + outArgsCount := 0 + for _, v := range fun.Args { + if v.IsOut { + c.WriteString("var " + v.Name + " " + gen.genType(v.Type) + "\n") + if v.Type.Type == tkTMap { + c.WriteString(v.Name + " = make(" + gen.genType(v.Type) + ")\n") + } else if v.Type.Type == tkTVector { + c.WriteString(v.Name + " = make(" + gen.genType(v.Type) + ", 0)\n") + } + outArgsCount++ + } + } + + c.WriteString("\n") + + if outArgsCount > 0 { + c.WriteString("if tarsResp.IVersion == basef.TARSVERSION {\n") + + for k, v := range fun.Args { + if v.IsOut { + dummy := &StructMember{} + dummy.Type = v.Type + dummy.Key = v.Name + dummy.Tag = int32(k + 1) + dummy.Require = true + gen.genReadVar(dummy, "", true) + } + } + + c.WriteString(`} else if tarsResp.IVersion == basef.TUPVERSION { + reqTup := tup.NewUniAttribute() + reqTup.Decode(readBuf) + + var tupBuffer []byte + + `) + for _, v := range fun.Args { + if v.IsOut { + c.WriteString("\n") + c.WriteString(`reqTup.GetBuffer("` + v.Name + `", &tupBuffer)` + "\n") + c.WriteString("readBuf.Reset(tupBuffer)") + + dummy := &StructMember{} + dummy.Type = v.Type + dummy.Key = v.Name + dummy.Tag = 0 + dummy.Require = true + gen.genReadVar(dummy, "", true) + } + } + + c.WriteString(`} else if tarsResp.IVersion == basef.JSONVERSION { + var jsonData map[string]interface{} + decoder := json.NewDecoder(bytes.NewReader(readBuf.ToBytes())) + decoder.UseNumber() + err = decoder.Decode(&jsonData) + if err != nil { + return ret, fmt.Errorf("decode resppacket failed, error: %+v", err) + } + `) + + for _, v := range fun.Args { + if v.IsOut { + c.WriteString("{\n") + c.WriteString(`jsonStr, _ := json.Marshal(jsonData["` + v.Name + `"])` + "\n") + if v.Type.CType == tkStruct { + c.WriteString(v.Name + ".ResetDefault()\n") + } + c.WriteString("if err = json.Unmarshal(jsonStr, &" + v.Name + "); err != nil {") + c.WriteString(` + return ret, err + } + } + `) + } + } + + c.WriteString(` + } else { + err = fmt.Errorf("decode resppacket fail, error version: %d", tarsReq.IVersion) + return ret, err + }`) + + c.WriteString("\n\n") + } + if !withoutTrace { + c.WriteString(` +trace, ok := current.GetTarsTrace(tarsCtx) +if ok && trace.Call() { + var traceParam string + traceParamFlag := trace.NeedTraceParam(tarstrace.EstCR, uint(readBuf.Len())) + if traceParamFlag == tarstrace.EnpNormal { + value := map[string]interface{}{} +`) + if fun.HasRet { + c.WriteString(`value[""] = funRet` + "\n") + } + for _, v := range fun.Args { + if v.IsOut { + c.WriteString(`value["` + v.Name + `"] = ` + v.Name + "\n") + } + } + c.WriteString(`jm, _ := json.Marshal(value) + traceParam = string(jm) + } else if traceParamFlag == tarstrace.EnpOverMaxLen { + traceParam = "{\"trace_param_over_max_len\":true}" + } + tars.Trace(trace.GetTraceKey(tarstrace.EstCR), tarstrace.AnnotationCR, tars.GetClientConfig().ModuleName, tarsReq.SServantName, "` + fun.OriginName + `", tarsResp.IRet, traceParam, "") +}`) + c.WriteString("\n\n") + } + + c.WriteString(` + if err != nil { + return ret, err + } + `) + + c.WriteString(` + obj.callback.Callback` + fun.Name + `(tarsCtx,`) + if fun.HasRet { + c.WriteString("funRet, ") + } + for _, v := range fun.Args { + if v.IsOut { + c.WriteString("&" + v.Name + ",") + } + } + c.WriteString(")\n") +} + func (gen *GenGo) genIFDispatch(itf *InterfaceInfo) { c := &gen.code c.WriteString("// Dispatch is used to call the server side implement for the method defined in the tars file. withContext shows using context or not. \n")