diff --git a/internal/asyncapi/asyncapi.go b/internal/asyncapi/asyncapi.go index 398f9d2..192a303 100644 --- a/internal/asyncapi/asyncapi.go +++ b/internal/asyncapi/asyncapi.go @@ -11,10 +11,10 @@ type AsyncAPI struct { Asyncapi string `json:"asyncapi" yaml:"asyncapi"` ID string `json:"id" yaml:"id"` Info InfoItem `json:"info" yaml:"info"` - Servers types.OrderedMap[string, Server] `json:"servers" yaml:"servers"` + Servers types.OrderedMap[string, Server] `json:"servers" yaml:"servers" cgen:"selectable"` DefaultContentType string `json:"defaultContentType" yaml:"defaultContentType"` - Channels types.OrderedMap[string, Channel] `json:"channels" yaml:"channels"` - Operations types.OrderedMap[string, Operation] `json:"operations" yaml:"operations"` + Channels types.OrderedMap[string, Channel] `json:"channels" yaml:"channels" cgen:"selectable"` + Operations types.OrderedMap[string, Operation] `json:"operations" yaml:"operations" cgen:"selectable"` Components ComponentsItem `json:"components" yaml:"components"` } diff --git a/internal/asyncapi/channel.go b/internal/asyncapi/channel.go index 1ed9117..60a5b1b 100644 --- a/internal/asyncapi/channel.go +++ b/internal/asyncapi/channel.go @@ -46,18 +46,18 @@ func (c Channel) build(ctx *common.CompileContext, channelKey string, flags map[ return &render.Channel{Dummy: true}, nil } - _, isComponent := flags[common.SchemaTagComponent] + _, isSelectable := flags[common.SchemaTagSelectable] if c.Ref != "" { // Make a promise selectable if it defined in `channels` section - return registerRef(ctx, c.Ref, channelKey, lo.Ternary(isComponent, nil, lo.ToPtr(true))), nil + return registerRef(ctx, c.Ref, channelKey, lo.Ternary(isSelectable, lo.ToPtr(true), nil)), nil } chName, _ := lo.Coalesce(c.XGoName, channelKey) res := &render.Channel{ OriginalName: chName, - Address: c.Address, - IsComponent: isComponent, - IsPublisher: ctx.CompileOpts.GeneratePublishers, + Address: c.Address, + IsSelectable: isSelectable, + IsPublisher: ctx.CompileOpts.GeneratePublishers, IsSubscriber: ctx.CompileOpts.GenerateSubscribers, } @@ -65,7 +65,7 @@ func (c Channel) build(ctx *common.CompileContext, channelKey string, flags map[ if len(c.Servers) > 0 { ctx.Logger.Trace("Channel servers", "refs", c.Servers) for _, srvRef := range c.Servers { - prm := lang.NewPromise[*render.Server](srvRef.Ref) + prm := lang.NewPromise[*render.Server](srvRef.Ref, nil) res.ServersPromises = append(res.ServersPromises, prm) ctx.PutPromise(prm) } @@ -94,7 +94,7 @@ func (c Channel) build(ctx *common.CompileContext, channelKey string, flags map[ for _, paramName := range c.Parameters.Keys() { ctx.Logger.Trace("Channel parameter", "name", paramName) ref := ctx.PathStackRef("parameters", paramName) - prm := lang.NewGolangTypeAssignCbPromise(ref, nil, func(obj any) common.GolangType { + prm := lang.NewGolangTypePromise(ref, func(obj any) common.GolangType { return obj.(*render.Parameter).Type }) ctx.PutPromise(prm) @@ -106,12 +106,12 @@ func (c Channel) build(ctx *common.CompileContext, channelKey string, flags map[ ctx.Logger.PrevCallLevel() } - for _, msgEntry := range c.Messages.Entries() { - msgName, ref := msgEntry.Key, msgEntry.Value + for _, msgName := range c.Messages.Keys() { ctx.Logger.Trace("Channel message", "name", msgName) - refObj := lang.NewRef(ref.Ref, msgName, lo.ToPtr(false)) - ctx.PutPromise(refObj) - res.MessagesPromises = append(res.MessagesPromises, refObj) + ref := ctx.PathStackRef("messages", msgName) + prm2 := lang.NewRef(ref, msgName, lo.ToPtr(true)) + ctx.PutPromise(prm2) + res.MessagesRefs = append(res.MessagesRefs, prm2) } // All known Operations @@ -129,7 +129,7 @@ func (c Channel) build(ctx *common.CompileContext, channelKey string, flags map[ ctx.Logger.Trace("Found channel bindings") ref := ctx.PathStackRef("bindings") - res.BindingsPromise = lang.NewPromise[*render.Bindings](ref) + res.BindingsPromise = lang.NewPromise[*render.Bindings](ref, nil) ctx.PutPromise(res.BindingsPromise) res.BindingsType = &lang.GoStruct{ @@ -161,5 +161,5 @@ func (c Channel) build(ctx *common.CompileContext, channelKey string, flags map[ type SecurityRequirement struct { - types.OrderedMap[string, []string] // FIXME: orderedmap must be in fields as utils.OrderedMap[SecurityRequirement, []SecurityRequirement] + types.OrderedMap[string, types.Union2[[]string, string]] // Possible values: `"name": []` or `"$ref": "url"` } diff --git a/internal/asyncapi/components.go b/internal/asyncapi/components.go index a126985..2c02a1e 100644 --- a/internal/asyncapi/components.go +++ b/internal/asyncapi/components.go @@ -5,7 +5,7 @@ import ( ) type ComponentsItem struct { - Schemas types.OrderedMap[string, Object] `json:"schemas" yaml:"schemas" cgen:"components,marshal,definition"` + Schemas types.OrderedMap[string, Object] `json:"schemas" yaml:"schemas" cgen:"components,marshal,definition,selectable"` Servers types.OrderedMap[string, Server] `json:"servers" yaml:"servers" cgen:"components"` Channels types.OrderedMap[string, Channel] `json:"channels" yaml:"channels" cgen:"components"` diff --git a/internal/asyncapi/message.go b/internal/asyncapi/message.go index 2e94543..612e4ff 100644 --- a/internal/asyncapi/message.go +++ b/internal/asyncapi/message.go @@ -47,7 +47,6 @@ func (m Message) Compile(ctx *common.CompileContext) error { } func (m Message) build(ctx *common.CompileContext, messageKey string) (common.Renderable, error) { - _, isComponent := ctx.Stack.Top().Flags[common.SchemaTagComponent] if m.XIgnore { ctx.Logger.Debug("Message denoted to be ignored") return &render.Message{Dummy: true}, nil @@ -58,19 +57,16 @@ func (m Message) build(ctx *common.CompileContext, messageKey string) (common.Re // Message is the only type of objects, that has their own root key, the key in components and can be used // as ref in other objects at the same time (at channel.[publish|subscribe].message). - // Therefore, a message object may get to selections more than once, it's needed to handle in templates. + // Therefore, a message object may get to selections more than once, it's needed to handle it in templates. refName := messageKey pathStack := ctx.Stack.Items() - makeSelectable := !isComponent // Ignore the messageKey in definitions other than `messages`, since messageKey always be "message" there. if messageKey == "message" && len(pathStack) > 3 { refName = "" - // And force make the message selectable if it was defined in `components.messages` section. - makeSelectable = true } // Always draw the promises that are located in the `messages` section - return registerRef(ctx, m.Ref, refName, lo.Ternary(makeSelectable, lo.ToPtr(true), nil)), nil + return registerRef(ctx, m.Ref, refName, nil), nil } msgName, _ := lo.Coalesce(m.XGoName, messageKey) @@ -92,8 +88,8 @@ func (m Message) build(ctx *common.CompileContext, messageKey string) (common.Re }, PayloadType: m.getPayloadType(ctx), HeadersFallbackType: &lang.GoMap{KeyType: &lang.GoSimple{TypeName: "string"}, ValueType: &lang.GoSimple{TypeName: "any", IsInterface: true}}, - ContentType: m.ContentType, - IsComponent: isComponent, + ContentType: m.ContentType, + IsSelectable: true, } ctx.Logger.Trace(fmt.Sprintf("Message content type is %q", res.ContentType)) @@ -127,7 +123,7 @@ func (m Message) build(ctx *common.CompileContext, messageKey string) (common.Re if m.Headers != nil { ctx.Logger.Trace("Message headers") ref := ctx.PathStackRef("headers") - res.HeadersTypePromise = lang.NewPromise[*lang.GoStruct](ref) + res.HeadersTypePromise = lang.NewPromise[*lang.GoStruct](ref, nil) res.HeadersTypePromise.AssignErrorNote = "Probably the headers schema has type other than of 'object'?" ctx.PutPromise(res.HeadersTypePromise) } @@ -144,7 +140,7 @@ func (m Message) build(ctx *common.CompileContext, messageKey string) (common.Re } ref := ctx.PathStackRef("bindings") - res.BindingsPromise = lang.NewPromise[*render.Bindings](ref) + res.BindingsPromise = lang.NewPromise[*render.Bindings](ref,nil) ctx.PutPromise(res.BindingsPromise) } @@ -152,7 +148,7 @@ func (m Message) build(ctx *common.CompileContext, messageKey string) (common.Re if m.CorrelationID != nil { ctx.Logger.Trace("Message correlationId") ref := ctx.PathStackRef("correlationId") - res.CorrelationIDPromise = lang.NewPromise[*render.CorrelationID](ref) + res.CorrelationIDPromise = lang.NewPromise[*render.CorrelationID](ref,nil) ctx.PutPromise(res.CorrelationIDPromise) } @@ -178,7 +174,7 @@ func (m Message) setStructFields(ctx *common.CompileContext, langMessage *render } if langMessage.HeadersTypePromise != nil { ctx.Logger.Trace("Message headers has a concrete type") - prm := lang.NewGolangTypePromise(langMessage.HeadersTypePromise.Ref()) + prm := lang.NewGolangTypePromise(langMessage.HeadersTypePromise.Ref(), nil) ctx.PutPromise(prm) fields = append(fields, lang.GoStructField{Name: string(render.CorrelationIDStructFieldHeaders), Type: prm}) } else { @@ -194,7 +190,7 @@ func (m Message) getPayloadType(ctx *common.CompileContext) common.GolangType { if m.Payload != nil { ctx.Logger.Trace("Message payload has a concrete type") ref := ctx.PathStackRef("payload") - prm := lang.NewGolangTypePromise(ref) + prm := lang.NewGolangTypePromise(ref, nil) ctx.PutPromise(prm) return prm } diff --git a/internal/asyncapi/object.go b/internal/asyncapi/object.go index 92983e1..17278b1 100644 --- a/internal/asyncapi/object.go +++ b/internal/asyncapi/object.go @@ -86,7 +86,7 @@ func (o Object) Compile(ctx *common.CompileContext) error { } func (o Object) build(ctx *common.CompileContext, flags map[common.SchemaTag]string, objectKey string) (common.Renderable, error) { - _, isComponent := flags[common.SchemaTagComponent] + _, isSelectable := flags[common.SchemaTagSelectable] ignore := o.XIgnore if ignore { ctx.Logger.Debug("Object denoted to be ignored") @@ -97,11 +97,11 @@ func (o Object) build(ctx *common.CompileContext, flags map[common.SchemaTag]str refName := objectKey // Ignore the objectKey in definitions other than `components.schemas`, generate a unique name instead - if !isComponent { + if !isSelectable { refName = ctx.GenerateObjName("", "") } - return registerRef(ctx, o.Ref, refName, lo.ToPtr(false)), nil + return registerRef(ctx, o.Ref, refName, &isSelectable), nil } if o.Type == nil { @@ -290,7 +290,7 @@ func (o Object) buildLangStruct(ctx *common.CompileContext, flags map[common.Sch for _, entry := range o.Properties.Entries() { ctx.Logger.Trace("Object property", "name", entry.Key) ref := ctx.PathStackRef("properties", entry.Key) - prm := lang.NewGolangTypePromise(ref) + prm := lang.NewGolangTypePromise(ref, nil) ctx.PutPromise(prm) var langObj common.GolangType = prm @@ -321,7 +321,7 @@ func (o Object) buildLangStruct(ctx *common.CompileContext, flags map[common.Sch case 0: // "additionalProperties:" is an object ctx.Logger.Trace("Object additional properties", "type", "object") ref := ctx.PathStackRef("additionalProperties") - prm := lang.NewGolangTypePromise(ref) + prm := lang.NewGolangTypePromise(ref, nil) ctx.PutPromise(prm) xTags, xTagNames, xTagVals := o.AdditionalProperties.V0.xGoTagsInfo(ctx) f := lang.GoStructField{ @@ -391,7 +391,7 @@ func (o Object) buildLangArray(ctx *common.CompileContext, flags map[common.Sche case o.Items != nil && o.Items.Selector == 0: // Only one "type:" of items ctx.Logger.Trace("Object items", "typesCount", "single") ref := ctx.PathStackRef("items") - prm := lang.NewGolangTypePromise(ref) + prm := lang.NewGolangTypePromise(ref, nil) ctx.PutPromise(prm) res.ItemsType = prm case o.Items == nil || o.Items.Selector == 1: // No items or Several types for each item sequentially @@ -442,19 +442,19 @@ func (o Object) buildUnionStruct(ctx *common.CompileContext, flags map[common.Sc res.Fields = lo.Times(len(o.OneOf), func(index int) lang.GoStructField { ref := ctx.PathStackRef("oneOf", strconv.Itoa(index)) - prm := lang.NewGolangTypePromise(ref) + prm := lang.NewGolangTypePromise(ref, nil) ctx.PutPromise(prm) return lang.GoStructField{Type: &lang.GoPointer{Type: prm}} }) res.Fields = append(res.Fields, lo.Times(len(o.AnyOf), func(index int) lang.GoStructField { ref := ctx.PathStackRef("anyOf", strconv.Itoa(index)) - prm := lang.NewGolangTypePromise(ref) + prm := lang.NewGolangTypePromise(ref, nil) ctx.PutPromise(prm) return lang.GoStructField{Type: &lang.GoPointer{Type: prm}} })...) res.Fields = append(res.Fields, lo.Times(len(o.AllOf), func(index int) lang.GoStructField { ref := ctx.PathStackRef("allOf", strconv.Itoa(index)) - prm := lang.NewGolangTypePromise(ref) + prm := lang.NewGolangTypePromise(ref, nil) ctx.PutPromise(prm) return lang.GoStructField{Type: prm} })...) diff --git a/internal/asyncapi/operation.go b/internal/asyncapi/operation.go index 20c38f5..c3abb31 100644 --- a/internal/asyncapi/operation.go +++ b/internal/asyncapi/operation.go @@ -55,16 +55,16 @@ func (o Operation) build(ctx *common.CompileContext, operationKey string, flags return &render.Operation{Dummy: true}, nil } - _, isComponent := flags[common.SchemaTagComponent] + _, isSelectable := flags[common.SchemaTagSelectable] if o.Ref != "" { // Make a promise selectable if it defined in `operations` section - return registerRef(ctx, o.Ref, operationKey, lo.Ternary(isComponent, nil, lo.ToPtr(true))), nil + return registerRef(ctx, o.Ref, operationKey, lo.Ternary(isSelectable, lo.ToPtr(true), nil)), nil } res := &render.Operation{ OriginalName: operationKey, - IsComponent: isComponent, - IsPublisher: o.Action == OperationActionSend, + IsSelectable: isSelectable, + IsPublisher: o.Action == OperationActionSend, IsSubscriber: o.Action == OperationActionReceive, } @@ -73,7 +73,7 @@ func (o Operation) build(ctx *common.CompileContext, operationKey string, flags } ctx.Logger.Trace("Bound channel", "ref", o.Channel.Ref) - prm := lang.NewPromise[*render.Channel](o.Channel.Ref) + prm := lang.NewPromise[*render.Channel](o.Channel.Ref,nil) ctx.PutPromise(prm) res.ChannelPromise = prm @@ -81,7 +81,7 @@ func (o Operation) build(ctx *common.CompileContext, operationKey string, flags ctx.Logger.Trace("Found operation bindings") ref := ctx.PathStackRef("bindings") - res.BindingsPromise = lang.NewPromise[*render.Bindings](ref) + res.BindingsPromise = lang.NewPromise[*render.Bindings](ref, nil) ctx.PutPromise(res.BindingsPromise) res.BindingsType = &lang.GoStruct{ @@ -95,7 +95,7 @@ func (o Operation) build(ctx *common.CompileContext, operationKey string, flags for _, message := range o.Messages { ctx.Logger.Trace("Operation message", "ref", message.Ref) - prm := lang.NewPromise[*render.Message](message.Ref) + prm := lang.NewPromise[*render.Message](message.Ref, nil) ctx.PutPromise(prm) res.MessagesPromises = append(res.MessagesPromises, prm) } @@ -107,7 +107,7 @@ func (o Operation) build(ctx *common.CompileContext, operationKey string, flags ctx.Logger.Trace("Prebuild the operations for every supported protocol") for proto := range ProtocolBuilders { ctx.Logger.Trace("Operation", "proto", proto) - prm := lang.NewGolangTypeAssignCbPromise(o.Channel.Ref, nil, func(obj any) common.GolangType { + prmProtoChType := lang.NewGolangTypePromise(o.Channel.Ref, func(obj any) common.GolangType { ch := obj.(*render.Channel) protoCh, found := lo.Find(ch.ProtoChannels, func(p *render.ProtoChannel) bool { return p.Protocol == proto @@ -117,6 +117,18 @@ func (o Operation) build(ctx *common.CompileContext, operationKey string, flags } return protoCh.Type }) + ctx.PutPromise(prmProtoChType) + prmProtoCh := lang.NewPromise[*render.ProtoChannel](o.Channel.Ref, func(obj any) *render.ProtoChannel { + ch := obj.(*render.Channel) + protoCh, found := lo.Find(ch.ProtoChannels, func(p *render.ProtoChannel) bool { + return p.Protocol == proto + }) + if !found { + panic(fmt.Sprintf("ProtoChannel[%s] not found in %s. This is a bug", proto, ch)) + } + return protoCh + }) + ctx.PutPromise(prmProtoCh) res.ProtoOperations = append(res.ProtoOperations, &render.ProtoOperation{ Operation: res, Type: &lang.GoStruct{ @@ -125,10 +137,11 @@ func (o Operation) build(ctx *common.CompileContext, operationKey string, flags HasDefinition: true, }, Fields: []lang.GoStructField{ - {Type: prm}, + {Name: "Channel", Type: &lang.GoPointer{Type: prmProtoChType}}, }, }, - Protocol: proto, + ProtoChannelPromise: prmProtoCh, + Protocol: proto, }) } diff --git a/internal/asyncapi/server.go b/internal/asyncapi/server.go index ba42813..03572fa 100644 --- a/internal/asyncapi/server.go +++ b/internal/asyncapi/server.go @@ -39,24 +39,24 @@ func (s Server) Compile(ctx *common.CompileContext) error { } func (s Server) build(ctx *common.CompileContext, serverKey string) (common.Renderable, error) { - _, isComponent := ctx.Stack.Top().Flags[common.SchemaTagComponent] + _, isSelectable := ctx.Stack.Top().Flags[common.SchemaTagSelectable] if s.XIgnore { ctx.Logger.Debug("Server denoted to be ignored") return &render.Server{Dummy: true}, nil } if s.Ref != "" { // Make a promise selectable if it defined in `servers` section - return registerRef(ctx, s.Ref, serverKey, lo.Ternary(isComponent, nil, lo.ToPtr(true))), nil + return registerRef(ctx, s.Ref, serverKey, lo.Ternary(isSelectable, lo.ToPtr(true), nil)), nil } srvName, _ := lo.Coalesce(s.XGoName, serverKey) res := render.Server{ OriginalName: srvName, - Host: s.Host, - Pathname: s.Pathname, + Host: s.Host, + Pathname: s.Pathname, Protocol: s.Protocol, ProtocolVersion: s.ProtocolVersion, - IsComponent: isComponent, + IsSelectable: isSelectable, } // Channels which are bound to this server @@ -80,7 +80,7 @@ func (s Server) build(ctx *common.CompileContext, serverKey string) (common.Rend } ref := ctx.PathStackRef("bindings") - res.BindingsPromise = lang.NewPromise[*render.Bindings](ref) + res.BindingsPromise = lang.NewPromise[*render.Bindings](ref, nil) ctx.PutPromise(res.BindingsPromise) } @@ -88,7 +88,7 @@ func (s Server) build(ctx *common.CompileContext, serverKey string) (common.Rend for _, v := range s.Variables.Entries() { ctx.Logger.Trace("Server variable", "name", v.Key) ref := ctx.PathStackRef("variables", v.Key) - prm := lang.NewPromise[*render.ServerVariable](ref) + prm := lang.NewPromise[*render.ServerVariable](ref,nil) ctx.PutPromise(prm) res.VariablesPromises.Set(v.Key, prm) } @@ -113,7 +113,7 @@ func (s Server) build(ctx *common.CompileContext, serverKey string) (common.Rend res.ProtoServer = protoServer // Register protocol only for servers in `servers` document section, not in `components` - if !isComponent { + if !isSelectable { ctx.Storage.RegisterProtocol(s.Protocol) } diff --git a/internal/common/tags.go b/internal/common/tags.go index 75f05a5..8066532 100644 --- a/internal/common/tags.go +++ b/internal/common/tags.go @@ -3,8 +3,11 @@ package common type SchemaTag string const ( + // SchemaTagSelectable force marks that an object is selectable. This makes the object to be rendered if it is + // not selectable, e.g. it's defined in `components` section + SchemaTagSelectable SchemaTag = "selectable" // SchemaTagSelectable marks that an object is selectable // SchemaTagDefinition forces an object to be rendered as a definition instead of inline declaration - SchemaTagDefinition SchemaTag = "definition" // TODO: what the difference between this and SchemaTagComponent? + SchemaTagDefinition SchemaTag = "definition" // SchemaTagComponent marks all top-level objects in `component` section SchemaTagComponent SchemaTag = "components" // SchemaTagMarshal marks that an object is meant to be marshaled/unmarshaled. Inherited to the nested objects diff --git a/internal/render/channel.go b/internal/render/channel.go index 4a38145..c02db7d 100644 --- a/internal/render/channel.go +++ b/internal/render/channel.go @@ -9,11 +9,11 @@ import ( ) type Channel struct { - OriginalName string // Channel name, typically equals to Channel key, can get overridden in x-go-name - Address string - Dummy bool - IsComponent bool // true if channel is defined in `components` section - IsPublisher bool + OriginalName string // Channel name, typically equals to Channel key, can get overridden in x-go-name + Address string + Dummy bool + IsSelectable bool // true if channel should get to selections + IsPublisher bool IsSubscriber bool ServersPromises []*lang.Promise[*Server] // Servers that this channel is bound with. Empty list means "no servers bound". @@ -21,7 +21,7 @@ type Channel struct { ParametersType *lang.GoStruct // nil if no parameters - MessagesPromises []*lang.Ref + MessagesRefs []*lang.Ref // All operations we know about for further selecting ones that are bound to this channel // We can't collect here just the operations already bound with this channel, because the channel in operation @@ -40,7 +40,7 @@ func (c *Channel) Kind() common.ObjectKind { } func (c *Channel) Selectable() bool { - return !c.Dummy && !c.IsComponent // Select only the channels defined in the `channels` section + return !c.Dummy && c.IsSelectable // Select only the channels defined in the `channels` section } func (c *Channel) Visible() bool { @@ -78,7 +78,9 @@ func (c *Channel) BoundMessages() []common.Renderable { op := common.DerefRenderable(o).(*Operation) return op.Messages() }) - r := lo.Without(c.Messages(), opMsgs...) + r := utils.WithoutBy(c.Messages(), opMsgs, func(a, b common.Renderable) bool { + return common.CheckSameRenderables(a, b) + }) return r } @@ -122,7 +124,7 @@ func (c *Channel) ProtoBindingsValue(protoName string) common.Renderable { } func (c *Channel) Messages() []common.Renderable { - return lo.Map(c.MessagesPromises, func(prm *lang.Ref, _ int) common.Renderable { return prm.T() }) + return lo.Map(c.MessagesRefs, func(prm *lang.Ref, _ int) common.Renderable { return prm }) } type ProtoChannel struct { diff --git a/internal/render/lang/promise.go b/internal/render/lang/promise.go index 3af7cfe..e9ae9e1 100644 --- a/internal/render/lang/promise.go +++ b/internal/render/lang/promise.go @@ -8,22 +8,14 @@ import ( type promiseAssignCbFunc[T any] func(obj any) T -func NewPromise[T any](ref string) *Promise[T] { - return newPromise(ref, common.PromiseOriginInternal, nil, defaultAssignCb[T]) +func NewPromise[T any](ref string, assignCb promiseAssignCbFunc[T]) *Promise[T] { + return newPromise[T](ref, common.PromiseOriginInternal, nil, assignCb) } func NewCbPromise[T any](findCb common.PromiseFindCbFunc, assignCb promiseAssignCbFunc[T]) *Promise[T] { return newPromise("", common.PromiseOriginInternal, findCb, assignCb) } -func defaultAssignCb[T any](obj any) T { - t, ok := obj.(T) - if !ok { - panic(fmt.Sprintf("Object %+v is not a type %T", obj, new(T))) - } - return t -} - func newPromise[T any]( ref string, origin common.PromiseOrigin, @@ -108,15 +100,9 @@ func (r *Promise[T]) IsStruct() bool { return false } -func NewGolangTypePromise(ref string) *GolangTypePromise { +func NewGolangTypePromise(ref string, assignCb promiseAssignCbFunc[common.GolangType]) *GolangTypePromise { return &GolangTypePromise{ - Promise: *newPromise[common.GolangType](ref, common.PromiseOriginInternal, nil, nil), - } -} - -func NewGolangTypeAssignCbPromise(ref string, findCb common.PromiseFindCbFunc, assignCb promiseAssignCbFunc[common.GolangType]) *GolangTypePromise { - return &GolangTypePromise{ - Promise: *newPromise[common.GolangType](ref, common.PromiseOriginInternal, findCb, assignCb), + Promise: *newPromise[common.GolangType](ref, common.PromiseOriginInternal, nil, assignCb), } } @@ -176,9 +162,9 @@ type ListPromise[T any] struct { } func (r *ListPromise[T]) AssignList(objs []any) { - if r.assignCb != nil { + if r.assignItemCb != nil { r.targets = lo.Map(objs, func(item any, _ int) T { - return r.assignCb(item) + return r.assignItemCb(item) }) r.assigned = true return diff --git a/internal/render/message.go b/internal/render/message.go index 6d37e5b..012bad2 100644 --- a/internal/render/message.go +++ b/internal/render/message.go @@ -11,9 +11,9 @@ import ( type Message struct { OriginalName string OutType *lang.GoStruct - InType *lang.GoStruct - Dummy bool - IsComponent bool // true if message is defined in `components` section + InType *lang.GoStruct + Dummy bool + IsSelectable bool // true if message should get to selections HeadersFallbackType *lang.GoMap HeadersTypePromise *lang.Promise[*lang.GoStruct] @@ -37,7 +37,7 @@ func (m *Message) Kind() common.ObjectKind { } func (m *Message) Selectable() bool { - return !m.Dummy && !m.IsComponent // Select only the messages defined in the `channels` section` + return !m.Dummy && m.IsSelectable // Select only the messages defined in the `channels` section` } func (m *Message) Visible() bool { diff --git a/internal/render/operation.go b/internal/render/operation.go index 6d4df8b..b4c8b39 100644 --- a/internal/render/operation.go +++ b/internal/render/operation.go @@ -9,10 +9,10 @@ import ( ) type Operation struct { - OriginalName string // Actually it isn't used - Dummy bool - IsComponent bool // true if channel is defined in `components` section - IsPublisher bool + OriginalName string // Actually it isn't used + Dummy bool + IsSelectable bool // true if channel should get to selections + IsPublisher bool IsSubscriber bool ChannelPromise *lang.Promise[*Channel] // Channel this operation bound with @@ -30,7 +30,7 @@ func (o *Operation) Kind() common.ObjectKind { } func (o *Operation) Selectable() bool { - return !o.Dummy && !o.IsComponent // Proto channels for each supported protocol + return !o.Dummy && o.IsSelectable // Proto channels for each supported protocol } func (o *Operation) Visible() bool { @@ -98,7 +98,8 @@ func (o *Operation) ProtoBindingsValue(protoName string) common.Renderable { type ProtoOperation struct { *Operation - Type *lang.GoStruct + ProtoChannelPromise *lang.Promise[*ProtoChannel] + Type *lang.GoStruct Protocol string } @@ -110,6 +111,10 @@ func (p *ProtoOperation) String() string { return fmt.Sprintf("ProtoOperation[%s] %s", p.Protocol, p.OriginalName) } +func (p *ProtoOperation) ProtoChannel() *ProtoChannel { + return p.ProtoChannelPromise.T() +} + // isBound returns true if operation is bound to at least one server with supported protocol func (p *ProtoOperation) isBound() bool { protos := lo.Map(p.ChannelPromise.T().BoundServers(), func(s common.Renderable, _ int) string { diff --git a/internal/render/server.go b/internal/render/server.go index 6d761a9..0f8bc47 100644 --- a/internal/render/server.go +++ b/internal/render/server.go @@ -11,8 +11,8 @@ import ( type Server struct { OriginalName string - Dummy bool // x-ignore is set - IsComponent bool // true if server is defined in `components` section + Dummy bool // x-ignore is set + IsSelectable bool // true if server is defined in `components` section AllActiveChannelsPromise *lang.ListPromise[common.Renderable] @@ -34,7 +34,7 @@ func (s *Server) Kind() common.ObjectKind { } func (s *Server) Selectable() bool { - return !s.Dummy && !s.IsComponent // Select only the servers defined in the `channels` section` + return !s.Dummy && s.IsSelectable // Select only the servers defined in the `channels` section` } func (s *Server) Visible() bool { @@ -55,8 +55,8 @@ func (s *Server) Name() string { func (s *Server) BoundChannels() []common.Renderable { r := lo.Filter(s.AllActiveChannelsPromise.T(), func(r common.Renderable, _ int) bool { ch := common.DerefRenderable(r).(*Channel) - return lo.ContainsBy(ch.BoundServers(), func(s common.Renderable) bool { - return common.CheckSameRenderables(s, r) + return lo.ContainsBy(ch.BoundServers(), func(item common.Renderable) bool { + return common.CheckSameRenderables(s, item) }) }) return r diff --git a/internal/utils/types.go b/internal/utils/types.go new file mode 100644 index 0000000..42ec7a9 --- /dev/null +++ b/internal/utils/types.go @@ -0,0 +1,15 @@ +package utils + +import "github.com/samber/lo" + +// WithoutBy returns slice excluding all given values executing the predicate function. +func WithoutBy[T comparable, Slice ~[]T](collection Slice, exclude Slice, predicate func(a, b T) bool) Slice { + result := make(Slice, 0, len(collection)) + for i := range collection { + contains := lo.ContainsBy(exclude, func(item T) bool { return predicate(collection[i], item) }) + if !contains { + result = append(result, collection[i]) + } + } + return result +} \ No newline at end of file diff --git a/templates/amqp/amqp_channel.tmpl b/templates/amqp/amqp_channel.tmpl index d1a3d81..d8b10a8 100644 --- a/templates/amqp/amqp_channel.tmpl +++ b/templates/amqp/amqp_channel.tmpl @@ -1,9 +1,28 @@ +{{define "channel/amqp/newFunction/block1"}} + {{- if and .BindingsType (.BindingsProtocols | has .Protocol)}} + bindings := {{ .BindingsType | gousage }}{}.{{.Protocol | capitalize | goid}}() + switch bindings.ChannelType { + case {{goqualrun .Protocol "ChannelTypeQueue"}}: + res.queue = res.address.String() + default: + res.routingKey = res.address.String() + } + if bindings.ExchangeConfiguration.Name != nil { + res.exchange = *bindings.ExchangeConfiguration.Name + } + if bindings.QueueConfiguration.Name != "" { + res.queue = bindings.QueueConfiguration.Name + } + {{- end}} +{{- end}} +{{define "channel/amqp/publishMethods/block1"}} + envelope.SetRoutingKey(c.RoutingKey()) +{{- end}} + {{- def .Type}} -{{- if .BoundMessages }} - {{template "channel/proto/newFunction" .}} +{{template "channel/proto/newFunction" .}} - {{template "channel/proto/openFunction" .}} -{{- end}} +{{template "channel/proto/openFunction" .}} {{- if ndefined .Type}} {{ .Type | godef }} diff --git a/templates/amqp/amqp_operation.tmpl b/templates/amqp/amqp_operation.tmpl index 9d7a3e7..9702bf6 100644 --- a/templates/amqp/amqp_operation.tmpl +++ b/templates/amqp/amqp_operation.tmpl @@ -1,29 +1,14 @@ -{{define "operation/amqp/newFunction/block1"}} - {{- if and .BindingsType (.BindingsProtocols | has .Protocol)}} - bindings := {{ .BindingsType | gousage }}{}.{{.Protocol | capitalize | goid}}() - switch bindings.ChannelType { - case {{goqualrun .Protocol "ChannelTypeQueue"}}: - res.queue = res.name.String() - default: - res.routingKey = res.name.String() - } - if bindings.ExchangeConfiguration.Name != nil { - res.exchange = *bindings.ExchangeConfiguration.Name - } - if bindings.QueueConfiguration.Name != "" { - res.queue = bindings.QueueConfiguration.Name - } - {{- end}} -{{- end}} {{define "operation/amqp/publishMethods/block1"}} - envelope.SetRoutingKey(o.RoutingKey()) + envelope.SetRoutingKey(o.Channel.RoutingKey()) {{- end}} - {{- with deref .}} + {{ .Type | def }} + {{template "operation/proto/newFunction" .}} {{template "operation/proto/openFunction" .}} {{ .Type | godef }} + {{template "operation/proto/commonMethods" .}} {{if .IsPublisher}}{{template "operation/proto/publishMethods" .}}{{end}} {{if .IsSubscriber}}{{template "operation/proto/subscribeMethods" .}}{{end}} {{template "operation/proto/serverInterface" .}} diff --git a/templates/common/channel.tmpl b/templates/common/channel.tmpl index eacf307..051f695 100644 --- a/templates/common/channel.tmpl +++ b/templates/common/channel.tmpl @@ -1,9 +1,62 @@ {{define "channel/proto/newFunction"}} -{{template "operation/proto/newFunction" .}} +func New{{ . | goid }}{{.Protocol | capitalize | goid}}( + {{- with .ParametersType}}params {{ . | gousage}},{{end}} + {{- if .IsPublisher}}publisher {{goqualrun .Protocol "Publisher"}},{{end}} + {{- if .IsSubscriber}}subscriber {{goqualrun .Protocol "Subscriber"}},{{end}} +) *{{.Type| gousage}} { + res := {{.Type | gousage}}{ + address: {{. | goid}}Address({{if .ParametersType}}params{{end}}), + {{- if .IsPublisher}}publisher: publisher,{{end}} + {{- if .IsSubscriber}}subscriber: subscriber,{{end}} + } + {{- with trytmpl (print "channel/" .Protocol "/newFunction/block1") .}} + {{.}} + {{- end}} + return &res +} {{- end}} {{define "channel/proto/openFunction"}} -{{template "operation/proto/openFunction" .}} +func Open{{ . | goid }}{{.Protocol | capitalize | goid}}( + ctx {{goqual "context.Context"}}, + {{ with .ParametersType}}params {{. | gousage}},{{end}} + {{ if .IsPublisher}}producer {{goqualrun .Protocol "Producer"}},{{end}} + {{ if .IsSubscriber}}consumer {{goqualrun .Protocol "Consumer"}},{{end}} +) (ch *{{.Type | gousage}}, err error) { + {{- if .BindingsProtocols | has .Protocol}} + chBindings := {{.BindingsType | gousage}}{}.{{.Protocol | capitalize | goid}}() + {{- end }} + + {{- if or .IsPublisher .IsSubscriber}} + address, err := {{. | goid}}Address({{if .ParametersType}}params{{end}}).Expand() + if err != nil { + return nil, err + } + {{- end}} + {{- if .IsPublisher}} + var publisher {{goqualrun .Protocol "Publisher"}} + + if producer != nil { + if publisher, err = producer.Publisher(ctx, address, {{if .BindingsProtocols | has .Protocol}}&chBindings{{else}}nil{{end}}, nil); err != nil { + return nil, err + } + } + {{- end}} + {{- if .IsSubscriber}} + var subscriber {{goqualrun .Protocol "Subscriber"}} + if consumer != nil { + if subscriber, err = consumer.Subscriber(ctx, address, {{if .BindingsProtocols | has .Protocol}}&chBindings{{else}}nil{{end}}, nil); err != nil { + return nil, err + } + } + {{- end}} + + return New{{ . | goid }}{{.Protocol | capitalize | goid}}( + {{ if .ParametersType}}params, {{end}} + {{ if .IsPublisher}}publisher,{{end}} + {{ if .IsSubscriber}}subscriber,{{end}} + ), nil +} {{- end}} {{define "channel/proto/commonMethods"}} @@ -40,9 +93,9 @@ type {{ . | goid }}ChannelEnvelopeMarshaler{{.Protocol | capitalize | goid}} int if err := message.MarshalChannel{{$ | goid}}{{$.Protocol | capitalize | goid}}(envelope); err != nil { return err } - {{ with trytmpl (print "operation/" $.Protocol "/publishMethods/block1") .}}{{.}}{{end}} - {{- if .BindingsProtocols | has $.Protocol }} - envelope.SetBindings({{.BindingsType | gousage}}{}.{{$.Protocol | capitalize | goid}}()) + {{ with trytmpl (print "channel/" $.Protocol "/publishMethods/block1") .}}{{.}}{{end}} + {{- if (deref .).BindingsProtocols | has $.Protocol }} + envelope.SetBindings({{(deref .).BindingsType | gousage}}{}.{{$.Protocol | capitalize | goid}}()) {{- end}} return nil } @@ -85,14 +138,19 @@ type {{ . | goid }}ChannelEnvelopeUnmarshaler{{.Protocol | capitalize | goid}} i func (c {{ $.Type | gousage }}) Subscribe{{. | goid}}( ctx {{goqual "context.Context"}}, - cb func(message {{ .InType | goptr | gousage }}) error, - ) error { - return o.Subscribe(ctx, func(envelope {{goqualrun $.Protocol "EnvelopeReader"}}) error { - message := new({{ .InType | gousage }}) - if err := c.OpenEnvelope(envelope, message); err != nil { - return err + cb func(message {{ (deref .).InType | goptr }}), + ) (err error) { + subCtx, cancel := {{goqual "context.WithCancelCause"}}(ctx) + defer cancel(nil) + + return c.Subscribe(subCtx, func(envelope {{goqualrun $.Protocol "EnvelopeReader"}}) { + message := new({{ (deref .).InType | gousage }}) + if err2 := c.OpenEnvelope{{. | goid}}(envelope, message); err2 != nil { + err = {{goqual "fmt.Errorf"}}("open message envelope: %w", err2) + cancel(err) + return } - return cb(message) + cb(message) }) } {{- end}} @@ -109,7 +167,7 @@ func (c {{.Type| gousage}}) Subscribe(ctx {{goqual "context.Context"}}, cb func( {{define "channel/proto/serverInterface"}} type {{ .Type | goid }}Server interface { - OpenChannel{{.Type | goid}}(ctx {{goqual "context.Context"}}, {{if .ParametersType}}params {{ .ParametersType | gousage }}{{end}}) (*{{ .Type | gousage }}, error) + OpenChannel{{. | goid}}{{.Protocol | capitalize | goid}}(ctx {{goqual "context.Context"}}, {{if .ParametersType}}params {{ .ParametersType | gousage }}{{end}}) (*{{ .Type | gousage }}, error) {{if .IsPublisher}}Producer() {{goqualrun .Protocol "Producer"}}{{end}} {{if .IsSubscriber}}Consumer() {{goqualrun .Protocol "Consumer"}}{{end}} } diff --git a/templates/common/message.tmpl b/templates/common/message.tmpl index 3c040fd..3812596 100644 --- a/templates/common/message.tmpl +++ b/templates/common/message.tmpl @@ -1,14 +1,18 @@ {{define "message/proto/marshalMethods"}} {{- range .BoundChannels}} - func (m *{{ $.OutType | goid }}) MarshalChannel{{. | goid}}{{$.Protocol | capitalize | goid}}(envelope {{goqualrun $.Protocol "EnvelopeWriter"}}) error { - return m.MarshalEnvelope{{$.Protocol | capitalize | goid}}(envelope) - } + {{- if and (visible .) .IsPublisher}} + func (m *{{ $.OutType | goid }}) MarshalChannel{{. | goid}}{{$.Protocol | capitalize | goid}}(envelope {{goqualrun $.Protocol "EnvelopeWriter"}}) error { + return m.MarshalEnvelope{{$.Protocol | capitalize | goid}}(envelope) + } + {{- end}} {{- end}} {{- range .BoundOperations}} - func (m *{{ $.OutType | goid }}) MarshalOperation{{. | goid}}{{$.Protocol | capitalize | goid}}(envelope {{goqualrun $.Protocol "EnvelopeWriter"}}) error { - return m.MarshalEnvelope{{$.Protocol | capitalize | goid}}(envelope) - } + {{- if and (visible .) .IsPublisher}} + func (m *{{ $.OutType | goid }}) MarshalOperation{{. | goid}}{{$.Protocol | capitalize | goid}}(envelope {{goqualrun $.Protocol "EnvelopeWriter"}}) error { + return m.MarshalEnvelope{{$.Protocol | capitalize | goid}}(envelope) + } + {{- end}} {{- end}} func (m *{{ .OutType | goid }}) MarshalEnvelope{{ .Protocol | capitalize | goid }}(envelope {{goqualrun .Protocol "EnvelopeWriter"}}) error { @@ -29,15 +33,19 @@ func (m *{{ .OutType | goid }}) MarshalEnvelope{{ .Protocol | capitalize | goid {{define "message/proto/unmarshalMethods"}} {{- range .BoundChannels}} - func (m *{{ $.InType | goid }}) UnmarshalChannel{{. | goid}}{{$.Protocol | capitalize | goid}}(envelope {{goqualrun $.Protocol "EnvelopeReader"}}) error { - return m.UnmarshalEnvelope{{$.Protocol | capitalize | goid}}(envelope) - } + {{- if and (visible .) .IsSubscriber}} + func (m *{{ $.InType | goid }}) UnmarshalChannel{{. | goid}}{{$.Protocol | capitalize | goid}}(envelope {{goqualrun $.Protocol "EnvelopeReader"}}) error { + return m.UnmarshalEnvelope{{$.Protocol | capitalize | goid}}(envelope) + } + {{- end}} {{- end}} {{- range .BoundOperations}} - func (m *{{ $.InType | goid }}) UnmarshalOperation{{. | goid}}{{$.Protocol | capitalize | goid}}(envelope {{goqualrun $.Protocol "EnvelopeReader"}}) error { - return m.UnmarshalEnvelope{{$.Protocol | capitalize | goid}}(envelope) - } + {{- if and (visible .) .IsSubscriber}} + func (m *{{ $.InType | goid }}) UnmarshalOperation{{. | goid}}{{$.Protocol | capitalize | goid}}(envelope {{goqualrun $.Protocol "EnvelopeReader"}}) error { + return m.UnmarshalEnvelope{{$.Protocol | capitalize | goid}}(envelope) + } + {{- end}} {{- end}} func (m *{{ .InType | goid }}) UnmarshalEnvelope{{ .Protocol | capitalize | goid }}(envelope {{goqualrun .Protocol "EnvelopeReader"}}) error { diff --git a/templates/common/operation.tmpl b/templates/common/operation.tmpl index 4e7b2a1..069e804 100644 --- a/templates/common/operation.tmpl +++ b/templates/common/operation.tmpl @@ -1,34 +1,36 @@ {{define "operation/proto/newFunction"}} -func New{{ .Channel | goid }}{{.Protocol | capitalize | goid}}{{. | goid}}( +func New{{.ProtoChannel | goid}}{{.Protocol | capitalize | goid}}{{ . | goid }}( {{- with .Channel.ParametersType}}params {{ . | gousage}},{{end}} {{- if .IsPublisher}}publisher {{goqualrun .Protocol "Publisher"}},{{end}} {{- if .IsSubscriber}}subscriber {{goqualrun .Protocol "Subscriber"}},{{end}} ) *{{.Type| gousage}} { res := {{.Type | gousage}}{ - name: {{.Channel | goid}}Name({{if .ParametersType}}params{{end}}), - {{- if .IsPublisher}}publisher: publisher,{{end}} - {{- if .IsSubscriber}}subscriber: subscriber,{{end}} + Channel: {{gopkg .ProtoChannel.Type}}New{{.ProtoChannel | goid}}{{.Protocol | capitalize | goid}}( + {{- with .Channel.ParametersType}}params{{end}}, + {{- if .ProtoChannel.IsPublisher}}{{- if .IsPublisher}}publisher{{else}}nil{{end}},{{end}} + {{- if .ProtoChannel.IsSubscriber}}{{- if .IsSubscriber}}subscriber{{else}}nil{{end}},{{end}} + ), } - {{- with trytmpl (print "operation/" .Protocol "/newFunction/block1") .}} - {{.}} - {{- end}} return &res } {{- end}} {{define "operation/proto/openFunction"}} -func Open{{ .Channel | goid }}{{.Protocol | capitalize | goid}}{{. | goid}}( +func Open{{.ProtoChannel | goid}}{{.Protocol | capitalize | goid}}{{ . | goid }}( ctx {{goqual "context.Context"}}, {{ with .Channel.ParametersType}}params {{. | gousage}},{{end}} {{ if .IsPublisher}}producer {{goqualrun .Protocol "Producer"}},{{end}} {{ if .IsSubscriber}}consumer {{goqualrun .Protocol "Consumer"}},{{end}} ) (ch *{{.Type | gousage}}, err error) { {{- if .Channel.BindingsProtocols | has .Protocol}} - bindings := {{.Channel.BindingsType | gousage}}{}.{{.Protocol | capitalize | goid}}() + chBindings := {{.Channel.BindingsType | gousage}}{}.{{.Protocol | capitalize | goid}}() + {{- end }} + {{- if .BindingsProtocols | has .Protocol}} + opBindings := {{.BindingsType | gousage}}{}.{{.Protocol | capitalize | goid}}() {{- end }} {{- if or .IsPublisher .IsSubscriber}} - address, err := {{.Channel | goid}}Address({{if .Channel.ParametersType}}params{{end}}).Expand() + address, err := {{with .Channel.SelectProtoObject .Protocol}}{{gopkg .Type}}{{end}}{{.Channel | goid}}Address({{if .Channel.ParametersType}}params{{end}}).Expand() if err != nil { return nil, err } @@ -37,7 +39,7 @@ func Open{{ .Channel | goid }}{{.Protocol | capitalize | goid}}{{. | goid}}( var publisher {{goqualrun .Protocol "Publisher"}} if producer != nil { - if publisher, err = producer.Publisher(ctx, address, {{if .Channel.BindingsProtocols | has .Protocol}}&bindings{{else}}nil{{end}}); err != nil { + if publisher, err = producer.Publisher(ctx, address, {{if .Channel.BindingsProtocols | has .Protocol}}&chBindings{{else}}nil{{end}}, {{if .BindingsProtocols | has .Protocol}}&opBindings{{else}}nil{{end}}); err != nil { return nil, err } } @@ -45,17 +47,26 @@ func Open{{ .Channel | goid }}{{.Protocol | capitalize | goid}}{{. | goid}}( {{- if .IsSubscriber}} var subscriber {{goqualrun .Protocol "Subscriber"}} if consumer != nil { - if subscriber, err = consumer.Subscriber(ctx, address, {{if .Channel.BindingsProtocols | has .Protocol}}&bindings{{else}}nil{{end}}); err != nil { + if subscriber, err = consumer.Subscriber(ctx, address, {{if .Channel.BindingsProtocols | has .Protocol}}&chBindings{{else}}nil{{end}}, {{if .BindingsProtocols | has .Protocol}}&opBindings{{else}}nil{{end}}); err != nil { return nil, err } } {{- end}} - return New{{ .Channel | goid }}{{.Protocol | capitalize | goid}}{{. | goid}}( - {{ if .Channel.ParametersType}}params, {{end}} - {{ if .IsPublisher}}publisher,{{end}} - {{ if .IsSubscriber}}subscriber,{{end}} - ), nil + res := &{{.Type | gousage}}{ + Channel: {{gopkg .ProtoChannel.Type}}New{{.ProtoChannel | goid}}{{.Protocol | capitalize | goid}}( + {{- with .Channel.ParametersType}}params{{end}}, + {{- if .ProtoChannel.IsPublisher}}{{- if .IsPublisher}}publisher{{else}}nil{{end}},{{end}} + {{- if .ProtoChannel.IsSubscriber}}{{- if .IsSubscriber}}subscriber{{else}}nil{{end}},{{end}} + ), + } + return res, nil +} +{{- end}} + +{{define "operation/proto/commonMethods"}} +func (o {{ .Type | gousage }}) Close() error { + return o.Channel.Close() } {{- end}} @@ -88,7 +99,7 @@ type {{ . | goid }}OperationEnvelopeMarshaler{{.Protocol | capitalize | goid}} i if err := o.SealEnvelope{{. | goid}}(envelope, message); err != nil { return err } - return o.Publish(ctx, envelope) + return o.Channel.Publish(ctx, envelope) } {{- end}} {{- end}} @@ -111,14 +122,19 @@ type {{ . | goid }}OperationEnvelopeUnmarshaler{{.Protocol | capitalize | goid}} func (o {{ $.Type | gousage }}) Subscribe{{. | goid}}( ctx {{goqual "context.Context"}}, - cb func(message {{ .InType | goptr | gousage }}) error, - ) error { - return o.Subscribe(ctx, func(envelope {{goqualrun $.Protocol "EnvelopeReader"}}) error { + cb func(message {{ .InType | goptr }}), + ) (err error) { + subCtx, cancel := {{goqual "context.WithCancelCause"}}(ctx) + defer cancel(nil) + + return o.Channel.Subscribe(subCtx, func(envelope {{goqualrun $.Protocol "EnvelopeReader"}}) { message := new({{ .InType | gousage }}) - if err := o.OpenEnvelope(envelope, message); err != nil { - return err + if err2 := o.OpenEnvelope{{. | goid}}(envelope, message); err2 != nil { + err = {{goqual "fmt.Errorf"}}("open message envelope: %w", err2) + cancel(err) + return } - return cb(message) + cb(message) }) } {{- end}} @@ -127,7 +143,7 @@ type {{ . | goid }}OperationEnvelopeUnmarshaler{{.Protocol | capitalize | goid}} {{define "operation/proto/serverInterface"}} type {{ .Type | goid }}Server interface { - OpenOperation{{.Type | goid}}(ctx {{goqual "context.Context"}}, {{with .Channel.ParametersType}}params {{ . | gousage }}{{end}}) (*{{ .Type | gousage }}, error) + OpenOperation{{.Channel | goid}}{{.Protocol | capitalize | goid}}{{. | goid}}(ctx {{goqual "context.Context"}}, {{with .Channel.ParametersType}}params {{ . | gousage }}{{end}}) (*{{ .Type | gousage }}, error) {{if .IsPublisher}}Producer() {{goqualrun .Protocol "Producer"}}{{end}} {{if .IsSubscriber}}Consumer() {{goqualrun .Protocol "Consumer"}}{{end}} } diff --git a/templates/common/server.tmpl b/templates/common/server.tmpl index 25e185d..3c487c2 100644 --- a/templates/common/server.tmpl +++ b/templates/common/server.tmpl @@ -1,11 +1,11 @@ {{define "server/proto/channelOpenMethods"}} {{- range .BoundChannels}} {{- with .SelectProtoObject $.Protocol }} - func (s {{ $.Type | gousage }}) OpenChannel{{ . | goid }}( + func (s {{ $.Type | gousage }}) OpenChannel{{ . | goid }}{{$.Protocol | capitalize | goid}}( ctx {{goqual "context.Context"}}, {{with .ParametersType}}params {{ . | gousage }},{{end}} ) (ch *{{ .Type | gousage }}, err error) { - return {{gopkg .Type}}Open{{ .Type | goid }}( + return {{gopkg .Type}}Open{{ . | goid }}{{$.Protocol | capitalize | goid}}( ctx, {{- if .ParametersType}}params,{{end}} {{- if .IsPublisher}}s.producer,{{end}} @@ -19,13 +19,13 @@ {{define "server/proto/operationOpenMethods"}} {{- range .BoundOperations}} {{- with .SelectProtoObject $.Protocol }} - func (s {{ $.Type | gousage }}) OpenOperation{{ . | goid }}( + func (s {{ $.Type | gousage }}) OpenOperation{{ .Channel | goid }}{{$.Protocol | capitalize | goid}}{{ . | goid }}( ctx {{goqual "context.Context"}}, - {{with .ParametersType}}params {{ . | gousage }},{{end}} + {{with .Channel.ParametersType}}params {{ . | gousage }},{{end}} ) (ch *{{ .Type | gousage }}, err error) { - return {{gopkg .Type}}Open{{ .Type | goid }}( + return {{gopkg .Type}}Open{{ .Channel | goid }}{{$.Protocol | capitalize | goid}}{{ . | goid }}( ctx, - {{- if .ParametersType}}params,{{end}} + {{- if .Channel.ParametersType}}params,{{end}} {{- if .IsPublisher}}s.producer,{{end}} {{- if .IsSubscriber}}s.consumer,{{end}} ) diff --git a/templates/http/http_channel.tmpl b/templates/http/http_channel.tmpl index 52a7b40..6806bdc 100644 --- a/templates/http/http_channel.tmpl +++ b/templates/http/http_channel.tmpl @@ -1,9 +1,7 @@ {{- def .Type}} -{{- if .BoundMessages }} - {{template "channel/proto/newFunction" .}} +{{template "channel/proto/newFunction" .}} - {{template "channel/proto/openFunction" .}} -{{- end}} +{{template "channel/proto/openFunction" .}} {{- if ndefined .Type}} {{.Type| godef}} diff --git a/templates/http/http_operation.tmpl b/templates/http/http_operation.tmpl index 4eaa117..ec08073 100644 --- a/templates/http/http_operation.tmpl +++ b/templates/http/http_operation.tmpl @@ -1,8 +1,11 @@ {{- with deref .}} + {{ .Type | def }} + {{template "operation/proto/newFunction" .}} {{template "operation/proto/openFunction" .}} {{ .Type | godef }} + {{template "operation/proto/commonMethods" .}} {{if .IsPublisher}}{{template "operation/proto/publishMethods" .}}{{end}} {{if .IsSubscriber}}{{template "operation/proto/subscribeMethods" .}}{{end}} {{template "operation/proto/serverInterface" .}} diff --git a/templates/ip/ip_channel.tmpl b/templates/ip/ip_channel.tmpl index 52a7b40..6806bdc 100644 --- a/templates/ip/ip_channel.tmpl +++ b/templates/ip/ip_channel.tmpl @@ -1,9 +1,7 @@ {{- def .Type}} -{{- if .BoundMessages }} - {{template "channel/proto/newFunction" .}} +{{template "channel/proto/newFunction" .}} - {{template "channel/proto/openFunction" .}} -{{- end}} +{{template "channel/proto/openFunction" .}} {{- if ndefined .Type}} {{.Type| godef}} diff --git a/templates/ip/ip_operation.tmpl b/templates/ip/ip_operation.tmpl index 4eaa117..ec08073 100644 --- a/templates/ip/ip_operation.tmpl +++ b/templates/ip/ip_operation.tmpl @@ -1,8 +1,11 @@ {{- with deref .}} + {{ .Type | def }} + {{template "operation/proto/newFunction" .}} {{template "operation/proto/openFunction" .}} {{ .Type | godef }} + {{template "operation/proto/commonMethods" .}} {{if .IsPublisher}}{{template "operation/proto/publishMethods" .}}{{end}} {{if .IsSubscriber}}{{template "operation/proto/subscribeMethods" .}}{{end}} {{template "operation/proto/serverInterface" .}} diff --git a/templates/kafka/kafka_channel.tmpl b/templates/kafka/kafka_channel.tmpl index 578e215..e120856 100644 --- a/templates/kafka/kafka_channel.tmpl +++ b/templates/kafka/kafka_channel.tmpl @@ -1,9 +1,20 @@ +{{define "channel/kafka/newFunction/block1"}} + res.topic = res.address.String() + {{- if .BindingsType}} + bindings := {{.BindingsType | gousage}}{}.{{.Protocol | capitalize | goid}}() + if bindings.Topic != "" { + res.topic = bindings.Topic + } + {{- end}} +{{- end}} +{{define "channel/kafka/publishMethods/block1"}} + envelope.SetTopic(c.Topic()) +{{- end}} + {{- def .Type}} -{{- if .BoundMessages }} - {{template "channel/proto/newFunction" .}} +{{template "channel/proto/newFunction" .}} - {{template "channel/proto/openFunction" .}} -{{- end}} +{{template "channel/proto/openFunction" .}} {{- if ndefined .Type}} {{.Type | godef}} diff --git a/templates/kafka/kafka_operation.tmpl b/templates/kafka/kafka_operation.tmpl index 32d0d1b..172fd20 100644 --- a/templates/kafka/kafka_operation.tmpl +++ b/templates/kafka/kafka_operation.tmpl @@ -1,21 +1,14 @@ -{{define "operation/kafka/newFunction/block1"}} - res.topic = res.name.String() - {{- if .BindingsType}} - bindings := {{.BindingsType | gousage}}{}.{{.Protocol | capitalize | goid}}() - if bindings.Topic != "" { - res.topic = bindings.Topic - } - {{- end}} -{{- end}} {{define "operation/kafka/publishMethods/block1"}} - envelope.SetTopic(o.topic) + envelope.SetTopic(o.Channel.Topic()) {{- end}} - {{- with deref .}} + {{ .Type | def }} + {{template "operation/proto/newFunction" .}} {{template "operation/proto/openFunction" .}} {{ .Type | godef }} + {{template "operation/proto/commonMethods" .}} {{if .IsPublisher}}{{template "operation/proto/publishMethods" .}}{{end}} {{if .IsSubscriber}}{{template "operation/proto/subscribeMethods" .}}{{end}} {{template "operation/proto/serverInterface" .}} diff --git a/templates/mqtt/mqtt_channel.tmpl b/templates/mqtt/mqtt_channel.tmpl index a713ea6..627dfc8 100644 --- a/templates/mqtt/mqtt_channel.tmpl +++ b/templates/mqtt/mqtt_channel.tmpl @@ -1,9 +1,14 @@ +{{define "channel/mqtt/newFunction/block1"}} + res.topic = res.address.String() +{{- end}} +{{define "channel/mqtt/publishMethods/block1"}} + envelope.SetTopic(c.Topic()) +{{- end}} + {{- def .Type}} -{{- if .BoundMessages }} - {{template "channel/proto/newFunction" .}} +{{template "channel/proto/newFunction" .}} - {{template "channel/proto/openFunction" .}} -{{- end}} +{{template "channel/proto/openFunction" .}} {{- if ndefined .Type}} {{ .Type | godef }} diff --git a/templates/mqtt/mqtt_operation.tmpl b/templates/mqtt/mqtt_operation.tmpl index 0944a0c..5caf22b 100644 --- a/templates/mqtt/mqtt_operation.tmpl +++ b/templates/mqtt/mqtt_operation.tmpl @@ -1,15 +1,14 @@ -{{define "operation/mqtt/newFunction/block1"}} - res.topic = res.name.String() +{{define "channel/mqtt/publishMethods/block1"}} + envelope.SetTopic(o.Channel.Topic()) {{- end}} -{{define "operation/mqtt/publishMethods/block1"}} - envelope.SetTopic(o.topic) -{{- end}} - {{- with deref .}} + {{ .Type | def }} + {{template "operation/proto/newFunction" .}} {{template "operation/proto/openFunction" .}} {{ .Type | godef }} + {{template "operation/proto/commonMethods" .}} {{if .IsPublisher}}{{template "operation/proto/publishMethods" .}}{{end}} {{if .IsSubscriber}}{{template "operation/proto/subscribeMethods" .}}{{end}} {{template "operation/proto/serverInterface" .}} diff --git a/templates/redis/redis_channel.tmpl b/templates/redis/redis_channel.tmpl index b78d0e7..5272089 100644 --- a/templates/redis/redis_channel.tmpl +++ b/templates/redis/redis_channel.tmpl @@ -1,9 +1,7 @@ {{- def .Type}} -{{- if .BoundMessages }} - {{template "channel/proto/newFunction" .}} +{{template "channel/proto/newFunction" .}} - {{template "channel/proto/openFunction" .}} -{{- end}} +{{template "channel/proto/openFunction" .}} {{- if ndefined .Type}} {{.Type| godef}} diff --git a/templates/redis/redis_operation.tmpl b/templates/redis/redis_operation.tmpl index 4eaa117..ec08073 100644 --- a/templates/redis/redis_operation.tmpl +++ b/templates/redis/redis_operation.tmpl @@ -1,8 +1,11 @@ {{- with deref .}} + {{ .Type | def }} + {{template "operation/proto/newFunction" .}} {{template "operation/proto/openFunction" .}} {{ .Type | godef }} + {{template "operation/proto/commonMethods" .}} {{if .IsPublisher}}{{template "operation/proto/publishMethods" .}}{{end}} {{if .IsSubscriber}}{{template "operation/proto/subscribeMethods" .}}{{end}} {{template "operation/proto/serverInterface" .}} diff --git a/templates/tcp/tcp_channel.tmpl b/templates/tcp/tcp_channel.tmpl index 52a7b40..6806bdc 100644 --- a/templates/tcp/tcp_channel.tmpl +++ b/templates/tcp/tcp_channel.tmpl @@ -1,9 +1,7 @@ {{- def .Type}} -{{- if .BoundMessages }} - {{template "channel/proto/newFunction" .}} +{{template "channel/proto/newFunction" .}} - {{template "channel/proto/openFunction" .}} -{{- end}} +{{template "channel/proto/openFunction" .}} {{- if ndefined .Type}} {{.Type| godef}} diff --git a/templates/tcp/tcp_operation.tmpl b/templates/tcp/tcp_operation.tmpl index 4eaa117..ec08073 100644 --- a/templates/tcp/tcp_operation.tmpl +++ b/templates/tcp/tcp_operation.tmpl @@ -1,8 +1,11 @@ {{- with deref .}} + {{ .Type | def }} + {{template "operation/proto/newFunction" .}} {{template "operation/proto/openFunction" .}} {{ .Type | godef }} + {{template "operation/proto/commonMethods" .}} {{if .IsPublisher}}{{template "operation/proto/publishMethods" .}}{{end}} {{if .IsSubscriber}}{{template "operation/proto/subscribeMethods" .}}{{end}} {{template "operation/proto/serverInterface" .}} diff --git a/templates/udp/udp_channel.tmpl b/templates/udp/udp_channel.tmpl index b78d0e7..5272089 100644 --- a/templates/udp/udp_channel.tmpl +++ b/templates/udp/udp_channel.tmpl @@ -1,9 +1,7 @@ {{- def .Type}} -{{- if .BoundMessages }} - {{template "channel/proto/newFunction" .}} +{{template "channel/proto/newFunction" .}} - {{template "channel/proto/openFunction" .}} -{{- end}} +{{template "channel/proto/openFunction" .}} {{- if ndefined .Type}} {{.Type| godef}} diff --git a/templates/udp/udp_operation.tmpl b/templates/udp/udp_operation.tmpl index 4eaa117..ec08073 100644 --- a/templates/udp/udp_operation.tmpl +++ b/templates/udp/udp_operation.tmpl @@ -1,8 +1,11 @@ {{- with deref .}} + {{ .Type | def }} + {{template "operation/proto/newFunction" .}} {{template "operation/proto/openFunction" .}} {{ .Type | godef }} + {{template "operation/proto/commonMethods" .}} {{if .IsPublisher}}{{template "operation/proto/publishMethods" .}}{{end}} {{if .IsSubscriber}}{{template "operation/proto/subscribeMethods" .}}{{end}} {{template "operation/proto/serverInterface" .}} diff --git a/templates/ws/ws_channel.tmpl b/templates/ws/ws_channel.tmpl index b78d0e7..5272089 100644 --- a/templates/ws/ws_channel.tmpl +++ b/templates/ws/ws_channel.tmpl @@ -1,9 +1,7 @@ {{- def .Type}} -{{- if .BoundMessages }} - {{template "channel/proto/newFunction" .}} +{{template "channel/proto/newFunction" .}} - {{template "channel/proto/openFunction" .}} -{{- end}} +{{template "channel/proto/openFunction" .}} {{- if ndefined .Type}} {{.Type| godef}} diff --git a/templates/ws/ws_operation.tmpl b/templates/ws/ws_operation.tmpl index 4eaa117..ec08073 100644 --- a/templates/ws/ws_operation.tmpl +++ b/templates/ws/ws_operation.tmpl @@ -1,8 +1,11 @@ {{- with deref .}} + {{ .Type | def }} + {{template "operation/proto/newFunction" .}} {{template "operation/proto/openFunction" .}} {{ .Type | godef }} + {{template "operation/proto/commonMethods" .}} {{if .IsPublisher}}{{template "operation/proto/publishMethods" .}}{{end}} {{if .IsSubscriber}}{{template "operation/proto/subscribeMethods" .}}{{end}} {{template "operation/proto/serverInterface" .}}