diff --git a/state/mapping/event_instance.go b/state/mapping/event_instance.go new file mode 100644 index 00000000..902459ec --- /dev/null +++ b/state/mapping/event_instance.go @@ -0,0 +1,29 @@ +package mapping + +import ( + "github.com/s7techlab/cckit/state" +) + +type ( + EventInstance struct { + instance interface{} + eventMapper EventMapper + serializer state.Serializer + } +) + +func NewEventInstance(instance interface{}, eventMapper EventMapper, serializer state.Serializer) (*EventInstance, error) { + return &EventInstance{ + instance: instance, + eventMapper: eventMapper, + serializer: serializer, + }, nil +} + +func (ei EventInstance) Name() (string, error) { + return ei.eventMapper.Name(ei.instance) +} + +func (ei EventInstance) ToBytes() ([]byte, error) { + return ei.serializer.ToBytes(ei.instance) +} diff --git a/state/mapping/event_mapped.go b/state/mapping/event_mapped.go deleted file mode 100644 index 0742f033..00000000 --- a/state/mapping/event_mapped.go +++ /dev/null @@ -1,22 +0,0 @@ -package mapping - -import "github.com/golang/protobuf/proto" - -type ( - ProtoEventMapped struct { - instance interface{} - eventMapper EventMapper - } -) - -func NewProtoEventMapped(instance interface{}, eventMapper EventMapper) (*ProtoEventMapped, error) { - return &ProtoEventMapped{instance, eventMapper}, nil -} - -func (em *ProtoEventMapped) Name() (string, error) { - return em.eventMapper.Name(em.instance) -} - -func (em *ProtoEventMapped) ToBytes() ([]byte, error) { - return proto.Marshal(em.instance.(proto.Message)) -} diff --git a/state/mapping/event_mapping.go b/state/mapping/event_mapping.go index 68038baf..ae339a64 100644 --- a/state/mapping/event_mapping.go +++ b/state/mapping/event_mapping.go @@ -10,6 +10,10 @@ import ( "github.com/s7techlab/cckit/state" ) +var ( + ErrEventNameNotFound = errors.New(`event name not found`) +) + type ( Namer func(entity interface{}) string @@ -77,7 +81,7 @@ func (emm EventMappings) Exists(entry interface{}) bool { return err == nil } -func (emm EventMappings) Map(entry interface{}) (mapped EventMapped, err error) { +func (emm EventMappings) Map(entry interface{}) (instance *EventInstance, err error) { mapping, err := emm.Get(entry) if err != nil { return nil, errors.Wrap(err, `mapping`) @@ -85,12 +89,22 @@ func (emm EventMappings) Map(entry interface{}) (mapped EventMapped, err error) switch entry.(type) { case proto.Message: - return NewProtoEventMapped(entry, mapping) + return NewEventInstance(entry, mapping, DefaultSerializer) default: return nil, ErrEntryTypeNotSupported } } +func (emm EventMappings) Resolve(eventName string, payload []byte) (event interface{}, err error) { + for _, m := range emm { + if m.name == eventName { + return DefaultSerializer.FromBytes(payload, m.Schema()) + } + } + + return nil, ErrEventNameNotFound +} + func (em EventMapping) Schema() interface{} { return em.schema } diff --git a/state/mapping/state.go b/state/mapping/state.go index 59f5d6ed..5d386225 100644 --- a/state/mapping/state.go +++ b/state/mapping/state.go @@ -249,7 +249,7 @@ func (s *Impl) GetByKey( return nil, ErrStateMappingNotFound } - keyRef, err := s.State.Get(NewKeyRefIDMapped(entry, idx, idxVal), &schema.KeyRef{}) + keyRef, err := s.State.Get(NewKeyRefIDInstance(entry, idx, idxVal), &schema.KeyRef{}) if err != nil { return nil, errors.Errorf(`%s: {%s}.%s: %s`, ErrIndexReferenceNotFound, mapKey(entry), idx, err) } diff --git a/state/mapping/state_instance.go b/state/mapping/state_instance.go new file mode 100644 index 00000000..992e4f8a --- /dev/null +++ b/state/mapping/state_instance.go @@ -0,0 +1,44 @@ +package mapping + +import ( + "github.com/s7techlab/cckit/state" +) + +type ( + StateInstance struct { + // instance can be instance itself or key for instance + // key can be proto or Key ( []string ) + instance interface{} + stateMapper StateMapper + serializer state.Serializer + } +) + +func NewStateInstance(instance interface{}, stateMapper StateMapper, serializer state.Serializer) *StateInstance { + return &StateInstance{ + instance: instance, + stateMapper: stateMapper, + serializer: serializer, + } +} + +func (si *StateInstance) Key() (state.Key, error) { + switch instance := si.instance.(type) { + case []string: + return instance, nil + default: + return si.stateMapper.PrimaryKey(instance) + } +} + +func (si *StateInstance) Keys() ([]state.KeyValue, error) { + return si.stateMapper.Keys(si.instance) +} + +func (si *StateInstance) ToBytes() ([]byte, error) { + return si.serializer.ToBytes(si.instance) +} + +func (si *StateInstance) Mapper() StateMapper { + return si.stateMapper +} diff --git a/state/mapping/state_keyref.go b/state/mapping/state_keyref.go index 899ab22e..5dc533e7 100644 --- a/state/mapping/state_keyref.go +++ b/state/mapping/state_keyref.go @@ -42,12 +42,14 @@ func NewKeyRefID(target interface{}, idx string, refKey state.Key) *schema.KeyRe } } -func NewKeyRefMapped(target interface{}, idx string, refKey, pKey state.Key) *ProtoStateMapped { - return NewProtoStateMapped(NewKeyRef(target, idx, refKey, pKey), KeyRefMapper) +func NewKeyRefInstance(target interface{}, idx string, refKey, pKey state.Key) *StateInstance { + return NewStateInstance(NewKeyRef(target, idx, refKey, pKey), KeyRefMapper, DefaultSerializer) } -func NewKeyRefIDMapped(target interface{}, idx string, refKey state.Key) *ProtoStateMapped { - return NewProtoStateMapped( +func NewKeyRefIDInstance(target interface{}, idx string, refKey state.Key) *StateInstance { + return NewStateInstance( NewKeyRefID(target, idx, refKey), - KeyRefIDMapper) + KeyRefIDMapper, + DefaultSerializer, + ) } diff --git a/state/mapping/state_mapped.go b/state/mapping/state_mapped.go deleted file mode 100644 index 8f2a400d..00000000 --- a/state/mapping/state_mapped.go +++ /dev/null @@ -1,43 +0,0 @@ -package mapping - -import ( - "github.com/golang/protobuf/proto" - "github.com/s7techlab/cckit/state" -) - -type ( - ProtoStateMapped struct { - // instance can be instance itself or key for instance - // key can be proto or Key ( []string ) - instance interface{} - stateMapper StateMapper - } -) - -func NewProtoStateMapped(instance interface{}, stateMapper StateMapper) *ProtoStateMapped { - return &ProtoStateMapped{ - instance: instance, - stateMapper: stateMapper, - } -} - -func (pm *ProtoStateMapped) Key() (state.Key, error) { - switch instance := pm.instance.(type) { - case []string: - return instance, nil - default: - return pm.stateMapper.PrimaryKey(instance) - } -} - -func (pm *ProtoStateMapped) Keys() ([]state.KeyValue, error) { - return pm.stateMapper.Keys(pm.instance) -} - -func (pm *ProtoStateMapped) ToBytes() ([]byte, error) { - return proto.Marshal(pm.instance.(proto.Message)) -} - -func (pm *ProtoStateMapped) Mapper() StateMapper { - return pm.stateMapper -} diff --git a/state/mapping/state_mapping.go b/state/mapping/state_mapping.go index 67e36d92..61fbb816 100644 --- a/state/mapping/state_mapping.go +++ b/state/mapping/state_mapping.go @@ -11,6 +11,10 @@ import ( "github.com/s7techlab/cckit/state" ) +var ( + DefaultSerializer = &state.ProtoSerializer{} +) + type ( // StateMappers interface for mappers collection StateMappers interface { @@ -20,7 +24,7 @@ type ( PrimaryKey(schema interface{}) (key state.Key, err error) } - // StateMapper + // StateMapper interface for dealing with mapped state StateMapper interface { Schema() interface{} List() interface{} @@ -38,11 +42,6 @@ type ( InstanceKeyer func(instance interface{}) (state.Key, error) InstanceMultiKeyer func(instance interface{}) ([]state.Key, error) - StateMapped interface { - state.KeyValue // entry key and value - Mapper() StateMapper - Keys() ([]state.KeyValue, error) - } // StateMapping defines metadata for mapping from schema to state keys/values StateMapping struct { schema interface{} @@ -150,7 +149,7 @@ func (smm StateMappings) PrimaryKey(entry interface{}) (pkey state.Key, err erro return m.PrimaryKey(entry) } -func (smm StateMappings) Map(entry interface{}) (mapped StateMapped, err error) { +func (smm StateMappings) Map(entry interface{}) (instance *StateInstance, err error) { mapper, err := smm.Get(entry) if err != nil { return nil, errors.Wrap(err, `mapping`) @@ -158,7 +157,7 @@ func (smm StateMappings) Map(entry interface{}) (mapped StateMapped, err error) switch entry.(type) { case proto.Message, []string: - return NewProtoStateMapped(entry, mapper), nil + return NewStateInstance(entry, mapper, DefaultSerializer), nil default: return nil, ErrEntryTypeNotSupported } @@ -166,7 +165,7 @@ func (smm StateMappings) Map(entry interface{}) (mapped StateMapped, err error) // func (smm *StateMappings) IdxKey(entity interface{}, idx string, idxVal state.Key) (state.Key, error) { - keyMapped := NewKeyRefIDMapped(entity, idx, idxVal) + keyMapped := NewKeyRefIDInstance(entity, idx, idxVal) return keyMapped.Key() } @@ -202,7 +201,7 @@ func (sm *StateMapping) PrimaryKey(entity interface{}) (state.Key, error) { return append(sm.namespace, key...), nil } -// Indexes prepares primary and additional uniq/non-uniq keys for storage +// Keys prepares primary and additional uniq/non-uniq keys for storage func (sm *StateMapping) Keys(entity interface{}) ([]state.KeyValue, error) { if len(sm.indexes) == 0 { return nil, nil @@ -223,7 +222,7 @@ func (sm *StateMapping) Keys(entity interface{}) ([]state.KeyValue, error) { for _, key := range idxKeys { // key will be <`_idx`,{SchemaName},{idxName}, {Key[1]},... {Key[n}}>s - stateKeys = append(stateKeys, NewKeyRefMapped(sm.schema, idx.Name, key, pk)) + stateKeys = append(stateKeys, NewKeyRefInstance(sm.schema, idx.Name, key, pk)) } } diff --git a/state/tranformer.go b/state/tranformer.go index 993351bb..e58d4e5c 100644 --- a/state/tranformer.go +++ b/state/tranformer.go @@ -1,9 +1,15 @@ package state -import "github.com/s7techlab/cckit/convert" +import ( + "encoding/json" + + "github.com/golang/protobuf/proto" + + "github.com/s7techlab/cckit/convert" +) type ( - // ToBytesTransformer is used after getState operation for convert value + // FromBytesTransformer is used after getState operation for convert value FromBytesTransformer func(bb []byte, config ...interface{}) (interface{}, error) // ToBytesTransformer is used before putState operation for convert payload @@ -12,8 +18,19 @@ type ( // KeyTransformer is used before putState operation for convert key KeyTransformer func(Key) (Key, error) - // NameTransformer is used before setEvent operation for convert name + // StringTransformer is used before setEvent operation for convert name StringTransformer func(string) (string, error) + + Serializer interface { + ToBytes(interface{}) ([]byte, error) + FromBytes(serialized []byte, target interface{}) (interface{}, error) + } + + ProtoSerializer struct { + } + + JSONSerializer struct { + } ) func ConvertFromBytes(bb []byte, config ...interface{}) (interface{}, error) { @@ -28,7 +45,7 @@ func ConvertToBytes(v interface{}, config ...interface{}) ([]byte, error) { return convert.ToBytes(v) } -// ConvertKey returns string parts of composite key +// KeyAsIs returns string parts of composite key func KeyAsIs(key Key) (Key, error) { return key, nil } @@ -36,3 +53,19 @@ func KeyAsIs(key Key) (Key, error) { func NameAsIs(name string) (string, error) { return name, nil } + +func (ps *ProtoSerializer) ToBytes(entry interface{}) ([]byte, error) { + return proto.Marshal(entry.(proto.Message)) +} + +func (ps *ProtoSerializer) FromBytes(serialized []byte, target interface{}) (interface{}, error) { + return convert.FromBytes(serialized, target) +} + +func (js *JSONSerializer) ToBytes(entry interface{}) ([]byte, error) { + return json.Marshal(entry) +} + +func (js *JSONSerializer) FromBytes(serialized []byte, target interface{}) (interface{}, error) { + return convert.JSONUnmarshalPtr(serialized, target) +}