Skip to content

Commit

Permalink
Refactor ShardProvider Interface
Browse files Browse the repository at this point in the history
  • Loading branch information
kingster committed Aug 3, 2022
1 parent 65b57dc commit 780db31
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 30 deletions.
70 changes: 41 additions & 29 deletions clients/go/dkv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ type DKVNodeSet struct {
Nodes []DKVNode `json:"nodes"`
}

//DKVShardKey defined a DKV ShardKey
type DKVShardKey string

//DKVShard defines a group of DKVNodeSet(s) along with their roles.
type DKVShard struct {
Name string `json:"name"`
Name DKVShardKey `json:"name"`
Topology map[DKVServerRole]*DKVNodeSet `json:"topology"`
}

Expand All @@ -53,45 +56,55 @@ func (s DKVShard) getNodesByType(nodeType ...DKVServerRole) (*DKVNodeSet, error)
return nil, fmt.Errorf("valid DKV node type must be given")
}

type ListOfKeys [][]byte
// DKVKeyGroup Group of DKV Shard and associated keys.
type DKVKeyGroup struct {
//Keys Set of byte Keys
Keys [][]byte
//Shard the DKV Shard associated with this keys
Shard *DKVShard
}

//ShardProvider Provides the ShardInformation for the given key(s)
type ShardProvider interface {
ProvideShard(key []byte) (*DKVShard, error)
ProvideShards(keys ...[]byte) (map[*DKVShard]ListOfKeys, error)
ProvideShards(keys ...[]byte) ([]DKVKeyGroup, error)
}

//KeyHashBasedShardProvider A xxhash based shared provider.
type KeyHashBasedShardProvider struct {
shardConfiguration []DKVShard
}

func (p *KeyHashBasedShardProvider) getShardID(key []byte) int {
h := xxhash.New64()
h.Write(key)
hash := h.Sum64()
var id = (hash & 0xFFFF) % uint64(len(p.shardConfiguration))
//LongHashFunction xx = LongHashFunction.xx();
//long hsh = xx.hashBytes(key);
//return (int) ((hsh & 0xFFFF) % shardConfiguration.getNumShards());
return int(id)
}

//ProvideShard provides shard based on input key
//ProvideShard provides shardId based on input key
func (p *KeyHashBasedShardProvider) ProvideShard(key []byte) (*DKVShard, error) {
shardId := p.getShardID(key)
return &p.shardConfiguration[shardId], nil
}

//ProvideShards provides list of pairs<shard, keys> for given list of keys.
func (p *KeyHashBasedShardProvider) ProvideShards(keys ...[]byte) (map[*DKVShard]ListOfKeys, error) {
m := make(map[*DKVShard]ListOfKeys)
//ProvideShards provides list of pairs<shardId, keys> for given list of keys.
func (p *KeyHashBasedShardProvider) ProvideShards(keys ...[]byte) ([]DKVKeyGroup, error) {
m := make(map[int][][]byte)
for _, key := range keys {
shardId := p.getShardID(key)
shard := &p.shardConfiguration[shardId]
m[shard] = append(m[shard], key)
m[shardId] = append(m[shardId], key)
}
return m, nil

values := make([]DKVKeyGroup, 0, len(m))
for id, keys := range m {
values = append(values, DKVKeyGroup{
Keys: keys,
Shard: &p.shardConfiguration[id],
})
}
return values, nil
}

func (p *KeyHashBasedShardProvider) getShardID(key []byte) int {
h := xxhash.New64()
h.Write(key)
hash := h.Sum64()
var id = (hash & 0xFFFF) % uint64(len(p.shardConfiguration))
return int(id)
}

type simpleDKVClient struct {
Expand All @@ -110,6 +123,9 @@ type ShardedDKVClient struct {

// NewShardedDKVClient creates and returns a instance of ShardedDKVClient.
func NewShardedDKVClient(shardProvider ShardProvider) (*ShardedDKVClient, error) {
if shardProvider == nil {
return nil, fmt.Errorf("shardProvider cannot be nil")
}
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 1000,
MaxCost: 1000,
Expand Down Expand Up @@ -200,10 +216,6 @@ func (dkvClnt *ShardedDKVClient) Delete(key []byte) error {
// Get takes the key as byte array along with the consistency,
// finds the corresponding shard and invokes the GRPC Get method.
func (dkvClnt *ShardedDKVClient) Get(rc serverpb.ReadConsistency, key []byte) (*serverpb.GetResponse, error) {
//DKVShard dkvShard = shardProvider.provideShard(key);
//DKVNodeType nodeType = getNodeTypeByReadConsistency(consistency);
//DKVClient dkvClient = pool.getDKVClient(dkvShard, nodeType, UNKNOWN);
//return dkvClient.get(consistency, key);
dkvShard, err := dkvClnt.shardProvider.ProvideShard(key)
if err != nil {
return nil, err
Expand All @@ -221,17 +233,17 @@ func (dkvClnt *ShardedDKVClient) Get(rc serverpb.ReadConsistency, key []byte) (*
// finds the corresponding shard and invokes the GRPC MultiGet method.
func (dkvClnt *ShardedDKVClient) MultiGet(rc serverpb.ReadConsistency, keys ...[]byte) ([]*serverpb.KVPair, error) {
var kv []*serverpb.KVPair
dkvShards, err := dkvClnt.shardProvider.ProvideShards(keys...)
dkvShardIds, err := dkvClnt.shardProvider.ProvideShards(keys...)
if err != nil {
return nil, err
}
nodeType := getNodeTypeByReadConsistency(rc)
for dkvShard, keys := range dkvShards {
_dkvClient, err := dkvClnt.getShardedClient(dkvShard, nodeType, noRole)
for _, keyGroup := range dkvShardIds {
_dkvClient, err := dkvClnt.getShardedClient(keyGroup.Shard, nodeType, noRole)
if err != nil {
return nil, err
}
kvs, err := _dkvClient.MultiGet(rc, keys...)
kvs, err := _dkvClient.MultiGet(rc, keyGroup.Keys...)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion clients/go/dkv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestDistributedClient(t *testing.T) {
func initClusterConfig(t *testing.T) {
for i, v := range dkvPorts {
shard := DKVShard{
Name: fmt.Sprintf("shard%d", i),
Name: DKVShardKey(fmt.Sprintf("shard%d", i)),
Topology: map[DKVServerRole]*DKVNodeSet{
noRole: &DKVNodeSet{
Name: "default",
Expand Down

0 comments on commit 780db31

Please sign in to comment.