Skip to content
This repository has been archived by the owner on Nov 8, 2023. It is now read-only.

Commit

Permalink
events resolver & refactoring (#83)
Browse files Browse the repository at this point in the history
  • Loading branch information
vitiko authored Dec 16, 2021
1 parent 5555d8a commit 97ff153
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 88 deletions.
29 changes: 29 additions & 0 deletions state/mapping/event_instance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package mapping

import (
"github.com/s7techlab/cckit/state"
)

type (
EventInstance struct {
instance interface{}
eventMapper EventMapper
serializer state.Serializer
}
)

func NewEventInstance(instance interface{}, eventMapper EventMapper, serializer state.Serializer) (*EventInstance, error) {
return &EventInstance{
instance: instance,
eventMapper: eventMapper,
serializer: serializer,
}, nil
}

func (ei EventInstance) Name() (string, error) {
return ei.eventMapper.Name(ei.instance)
}

func (ei EventInstance) ToBytes() ([]byte, error) {
return ei.serializer.ToBytes(ei.instance)
}
22 changes: 0 additions & 22 deletions state/mapping/event_mapped.go

This file was deleted.

18 changes: 16 additions & 2 deletions state/mapping/event_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
"github.com/s7techlab/cckit/state"
)

var (
ErrEventNameNotFound = errors.New(`event name not found`)
)

type (
Namer func(entity interface{}) string

Expand Down Expand Up @@ -77,20 +81,30 @@ func (emm EventMappings) Exists(entry interface{}) bool {
return err == nil
}

func (emm EventMappings) Map(entry interface{}) (mapped EventMapped, err error) {
func (emm EventMappings) Map(entry interface{}) (instance *EventInstance, err error) {
mapping, err := emm.Get(entry)
if err != nil {
return nil, errors.Wrap(err, `mapping`)
}

switch entry.(type) {
case proto.Message:
return NewProtoEventMapped(entry, mapping)
return NewEventInstance(entry, mapping, DefaultSerializer)
default:
return nil, ErrEntryTypeNotSupported
}
}

func (emm EventMappings) Resolve(eventName string, payload []byte) (event interface{}, err error) {
for _, m := range emm {
if m.name == eventName {
return DefaultSerializer.FromBytes(payload, m.Schema())
}
}

return nil, ErrEventNameNotFound
}

func (em EventMapping) Schema() interface{} {
return em.schema
}
Expand Down
2 changes: 1 addition & 1 deletion state/mapping/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (s *Impl) GetByKey(
return nil, ErrStateMappingNotFound
}

keyRef, err := s.State.Get(NewKeyRefIDMapped(entry, idx, idxVal), &schema.KeyRef{})
keyRef, err := s.State.Get(NewKeyRefIDInstance(entry, idx, idxVal), &schema.KeyRef{})
if err != nil {
return nil, errors.Errorf(`%s: {%s}.%s: %s`, ErrIndexReferenceNotFound, mapKey(entry), idx, err)
}
Expand Down
44 changes: 44 additions & 0 deletions state/mapping/state_instance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package mapping

import (
"github.com/s7techlab/cckit/state"
)

type (
StateInstance struct {
// instance can be instance itself or key for instance
// key can be proto or Key ( []string )
instance interface{}
stateMapper StateMapper
serializer state.Serializer
}
)

func NewStateInstance(instance interface{}, stateMapper StateMapper, serializer state.Serializer) *StateInstance {
return &StateInstance{
instance: instance,
stateMapper: stateMapper,
serializer: serializer,
}
}

func (si *StateInstance) Key() (state.Key, error) {
switch instance := si.instance.(type) {
case []string:
return instance, nil
default:
return si.stateMapper.PrimaryKey(instance)
}
}

func (si *StateInstance) Keys() ([]state.KeyValue, error) {
return si.stateMapper.Keys(si.instance)
}

func (si *StateInstance) ToBytes() ([]byte, error) {
return si.serializer.ToBytes(si.instance)
}

func (si *StateInstance) Mapper() StateMapper {
return si.stateMapper
}
12 changes: 7 additions & 5 deletions state/mapping/state_keyref.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ func NewKeyRefID(target interface{}, idx string, refKey state.Key) *schema.KeyRe
}
}

func NewKeyRefMapped(target interface{}, idx string, refKey, pKey state.Key) *ProtoStateMapped {
return NewProtoStateMapped(NewKeyRef(target, idx, refKey, pKey), KeyRefMapper)
func NewKeyRefInstance(target interface{}, idx string, refKey, pKey state.Key) *StateInstance {
return NewStateInstance(NewKeyRef(target, idx, refKey, pKey), KeyRefMapper, DefaultSerializer)
}

func NewKeyRefIDMapped(target interface{}, idx string, refKey state.Key) *ProtoStateMapped {
return NewProtoStateMapped(
func NewKeyRefIDInstance(target interface{}, idx string, refKey state.Key) *StateInstance {
return NewStateInstance(
NewKeyRefID(target, idx, refKey),
KeyRefIDMapper)
KeyRefIDMapper,
DefaultSerializer,
)
}
43 changes: 0 additions & 43 deletions state/mapping/state_mapped.go

This file was deleted.

21 changes: 10 additions & 11 deletions state/mapping/state_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
"github.com/s7techlab/cckit/state"
)

var (
DefaultSerializer = &state.ProtoSerializer{}
)

type (
// StateMappers interface for mappers collection
StateMappers interface {
Expand All @@ -20,7 +24,7 @@ type (
PrimaryKey(schema interface{}) (key state.Key, err error)
}

// StateMapper
// StateMapper interface for dealing with mapped state
StateMapper interface {
Schema() interface{}
List() interface{}
Expand All @@ -38,11 +42,6 @@ type (
InstanceKeyer func(instance interface{}) (state.Key, error)
InstanceMultiKeyer func(instance interface{}) ([]state.Key, error)

StateMapped interface {
state.KeyValue // entry key and value
Mapper() StateMapper
Keys() ([]state.KeyValue, error)
}
// StateMapping defines metadata for mapping from schema to state keys/values
StateMapping struct {
schema interface{}
Expand Down Expand Up @@ -150,23 +149,23 @@ func (smm StateMappings) PrimaryKey(entry interface{}) (pkey state.Key, err erro
return m.PrimaryKey(entry)
}

func (smm StateMappings) Map(entry interface{}) (mapped StateMapped, err error) {
func (smm StateMappings) Map(entry interface{}) (instance *StateInstance, err error) {
mapper, err := smm.Get(entry)
if err != nil {
return nil, errors.Wrap(err, `mapping`)
}

switch entry.(type) {
case proto.Message, []string:
return NewProtoStateMapped(entry, mapper), nil
return NewStateInstance(entry, mapper, DefaultSerializer), nil
default:
return nil, ErrEntryTypeNotSupported
}
}

//
func (smm *StateMappings) IdxKey(entity interface{}, idx string, idxVal state.Key) (state.Key, error) {
keyMapped := NewKeyRefIDMapped(entity, idx, idxVal)
keyMapped := NewKeyRefIDInstance(entity, idx, idxVal)
return keyMapped.Key()
}

Expand Down Expand Up @@ -202,7 +201,7 @@ func (sm *StateMapping) PrimaryKey(entity interface{}) (state.Key, error) {
return append(sm.namespace, key...), nil
}

// Indexes prepares primary and additional uniq/non-uniq keys for storage
// Keys prepares primary and additional uniq/non-uniq keys for storage
func (sm *StateMapping) Keys(entity interface{}) ([]state.KeyValue, error) {
if len(sm.indexes) == 0 {
return nil, nil
Expand All @@ -223,7 +222,7 @@ func (sm *StateMapping) Keys(entity interface{}) ([]state.KeyValue, error) {

for _, key := range idxKeys {
// key will be <`_idx`,{SchemaName},{idxName}, {Key[1]},... {Key[n}}>s
stateKeys = append(stateKeys, NewKeyRefMapped(sm.schema, idx.Name, key, pk))
stateKeys = append(stateKeys, NewKeyRefInstance(sm.schema, idx.Name, key, pk))
}
}

Expand Down
41 changes: 37 additions & 4 deletions state/tranformer.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package state

import "github.com/s7techlab/cckit/convert"
import (
"encoding/json"

"github.com/golang/protobuf/proto"

"github.com/s7techlab/cckit/convert"
)

type (
// ToBytesTransformer is used after getState operation for convert value
// FromBytesTransformer is used after getState operation for convert value
FromBytesTransformer func(bb []byte, config ...interface{}) (interface{}, error)

// ToBytesTransformer is used before putState operation for convert payload
Expand All @@ -12,8 +18,19 @@ type (
// KeyTransformer is used before putState operation for convert key
KeyTransformer func(Key) (Key, error)

// NameTransformer is used before setEvent operation for convert name
// StringTransformer is used before setEvent operation for convert name
StringTransformer func(string) (string, error)

Serializer interface {
ToBytes(interface{}) ([]byte, error)
FromBytes(serialized []byte, target interface{}) (interface{}, error)
}

ProtoSerializer struct {
}

JSONSerializer struct {
}
)

func ConvertFromBytes(bb []byte, config ...interface{}) (interface{}, error) {
Expand All @@ -28,11 +45,27 @@ func ConvertToBytes(v interface{}, config ...interface{}) ([]byte, error) {
return convert.ToBytes(v)
}

// ConvertKey returns string parts of composite key
// KeyAsIs returns string parts of composite key
func KeyAsIs(key Key) (Key, error) {
return key, nil
}

func NameAsIs(name string) (string, error) {
return name, nil
}

func (ps *ProtoSerializer) ToBytes(entry interface{}) ([]byte, error) {
return proto.Marshal(entry.(proto.Message))
}

func (ps *ProtoSerializer) FromBytes(serialized []byte, target interface{}) (interface{}, error) {
return convert.FromBytes(serialized, target)
}

func (js *JSONSerializer) ToBytes(entry interface{}) ([]byte, error) {
return json.Marshal(entry)
}

func (js *JSONSerializer) FromBytes(serialized []byte, target interface{}) (interface{}, error) {
return convert.JSONUnmarshalPtr(serialized, target)
}

0 comments on commit 97ff153

Please sign in to comment.