Skip to content

Commit

Permalink
Implement matching API to allow updating task list partition config (#…
Browse files Browse the repository at this point in the history
…6472)

## What?
This is a large PR that completes the implementation of the migration of task list partition config from dynamic config to database. With this PR, we're able to start the migration process. This PR does the following:
- **Functionality change**:
  - Introduce new matching API allowing us to Update and Refresh task list partition config which is stored in database
  - Implement Frontend's UpdateTaskListPartitionConfig API
  - Update DescribeTaskList API and the admin CLI command to include task list partition config in the result.
- **Observability Improvement**:
  - Update Matching's Task List Manager to emit metrics for task list partition config
  - Update Partition Config Provider to emit metrics for task list partition config
 - **Bug fix**:
    - In #6427, we forgot update the partition config inside PollForActivityTask, AddActivityTask, AddDecisionTask methods.
- **Documentation Improvement**:
  - Update document for scalable tasklist

## Test
This PR has unit tests for all components. The end-to-end tests was done manually by running workflows in dev2.
  • Loading branch information
Shaddoll authored Nov 16, 2024
1 parent b97e553 commit 6f6249b
Show file tree
Hide file tree
Showing 47 changed files with 3,898 additions and 592 deletions.
1,341 changes: 1,171 additions & 170 deletions .gen/proto/matching/v1/service.pb.go

Large diffs are not rendered by default.

421 changes: 264 additions & 157 deletions .gen/proto/matching/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(

peerResolver := matching.NewPeerResolver(cf.resolver, namedPort)

partitionConfigProvider := matching.NewPartitionConfigProvider(cf.logger, domainIDToName, cf.dynConfig)
partitionConfigProvider := matching.NewPartitionConfigProvider(cf.logger, cf.metricsClient, domainIDToName, cf.dynConfig)
defaultLoadBalancer := matching.NewLoadBalancer(partitionConfigProvider)
roundRobinLoadBalancer := matching.NewRoundRobinLoadBalancer(partitionConfigProvider)
weightedLoadBalancer := matching.NewWeightedLoadBalancer(roundRobinLoadBalancer, partitionConfigProvider, cf.logger)
Expand Down
66 changes: 63 additions & 3 deletions client/matching/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,24 @@ func (c *clientImpl) AddActivityTask(
persistence.TaskListTypeActivity,
request.GetForwardedFrom(),
)
originalTaskListName := request.TaskList.GetName()
request.TaskList.Name = partition
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
if err != nil {
return nil, err
}
return c.client.AddActivityTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
resp, err := c.client.AddActivityTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
if err != nil {
return nil, err
}
request.TaskList.Name = originalTaskListName
c.provider.UpdatePartitionConfig(
request.GetDomainUUID(),
*request.TaskList,
persistence.TaskListTypeActivity,
resp.PartitionConfig,
)
return resp, nil
}

func (c *clientImpl) AddDecisionTask(
Expand All @@ -84,12 +96,24 @@ func (c *clientImpl) AddDecisionTask(
persistence.TaskListTypeDecision,
request.GetForwardedFrom(),
)
originalTaskListName := request.TaskList.GetName()
request.TaskList.Name = partition
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
if err != nil {
return nil, err
}
return c.client.AddDecisionTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
resp, err := c.client.AddDecisionTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
if err != nil {
return nil, err
}
request.TaskList.Name = originalTaskListName
c.provider.UpdatePartitionConfig(
request.GetDomainUUID(),
*request.TaskList,
persistence.TaskListTypeDecision,
resp.PartitionConfig,
)
return resp, nil
}

func (c *clientImpl) PollForActivityTask(
Expand All @@ -103,13 +127,25 @@ func (c *clientImpl) PollForActivityTask(
persistence.TaskListTypeActivity,
request.GetForwardedFrom(),
)
originalTaskListName := request.PollRequest.GetTaskList().GetName()
request.PollRequest.TaskList.Name = partition
peer, err := c.peerResolver.FromTaskList(request.PollRequest.TaskList.GetName())
if err != nil {
return nil, err
}
// TODO: update activity response to include backlog count hint and update the weight for partitions
return c.client.PollForActivityTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
resp, err := c.client.PollForActivityTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
if err != nil {
return nil, err
}
request.PollRequest.TaskList.Name = originalTaskListName
c.provider.UpdatePartitionConfig(
request.GetDomainUUID(),
*request.PollRequest.GetTaskList(),
persistence.TaskListTypeActivity,
resp.PartitionConfig,
)
return resp, nil
}

func (c *clientImpl) PollForDecisionTask(
Expand Down Expand Up @@ -263,3 +299,27 @@ func (c *clientImpl) GetTaskListsByDomain(
ActivityTaskListMap: activityTaskListMap,
}, nil
}

func (c *clientImpl) UpdateTaskListPartitionConfig(
ctx context.Context,
request *types.MatchingUpdateTaskListPartitionConfigRequest,
opts ...yarpc.CallOption,
) (*types.MatchingUpdateTaskListPartitionConfigResponse, error) {
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
if err != nil {
return nil, err
}
return c.client.UpdateTaskListPartitionConfig(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
}

func (c *clientImpl) RefreshTaskListPartitionConfig(
ctx context.Context,
request *types.MatchingRefreshTaskListPartitionConfigRequest,
opts ...yarpc.CallOption,
) (*types.MatchingRefreshTaskListPartitionConfigResponse, error) {
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
if err != nil {
return nil, err
}
return c.client.RefreshTaskListPartitionConfig(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
}
95 changes: 95 additions & 0 deletions client/matching/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func TestClient_withResponse(t *testing.T) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().AddActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.AddActivityTaskResponse{}, nil)
mp.EXPECT().UpdatePartitionConfig(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, nil)
},
want: &types.AddActivityTaskResponse{},
},
Expand Down Expand Up @@ -198,6 +199,7 @@ func TestClient_withResponse(t *testing.T) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().AddDecisionTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.AddDecisionTaskResponse{}, nil)
mp.EXPECT().UpdatePartitionConfig(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, nil)
},
want: &types.AddDecisionTaskResponse{},
},
Expand Down Expand Up @@ -233,6 +235,7 @@ func TestClient_withResponse(t *testing.T) {
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.MatchingPollForActivityTaskResponse{}, nil)
mp.EXPECT().UpdatePartitionConfig(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, nil)
},
want: &types.MatchingPollForActivityTaskResponse{},
},
Expand Down Expand Up @@ -431,6 +434,74 @@ func TestClient_withResponse(t *testing.T) {
want: nil,
wantError: true,
},
{
name: "UpdateTaskListPartitionConfig",
op: func(c Client) (any, error) {
return c.UpdateTaskListPartitionConfig(context.Background(), testMatchingUpdateTaskListPartitionConfigRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
p.EXPECT().FromTaskList(_testTaskList).Return("peer0", nil)
c.EXPECT().UpdateTaskListPartitionConfig(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.MatchingUpdateTaskListPartitionConfigResponse{}, nil)
},
want: &types.MatchingUpdateTaskListPartitionConfigResponse{},
},
{
name: "UpdateTaskListPartitionConfig - Error in resolving peer",
op: func(c Client) (any, error) {
return c.UpdateTaskListPartitionConfig(context.Background(), testMatchingUpdateTaskListPartitionConfigRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
p.EXPECT().FromTaskList(_testTaskList).Return("peer0", assert.AnError)
},
want: nil,
wantError: true,
},
{
name: "UpdateTaskListPartitionConfig - Error while listing tasklist partitions",
op: func(c Client) (any, error) {
return c.UpdateTaskListPartitionConfig(context.Background(), testMatchingUpdateTaskListPartitionConfigRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
p.EXPECT().FromTaskList(_testTaskList).Return("peer0", nil)
c.EXPECT().UpdateTaskListPartitionConfig(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, assert.AnError)
},
want: nil,
wantError: true,
},
{
name: "RefreshTaskListPartitionConfig",
op: func(c Client) (any, error) {
return c.RefreshTaskListPartitionConfig(context.Background(), testMatchingRefreshTaskListPartitionConfigRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
p.EXPECT().FromTaskList(_testTaskList).Return("peer0", nil)
c.EXPECT().RefreshTaskListPartitionConfig(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.MatchingRefreshTaskListPartitionConfigResponse{}, nil)
},
want: &types.MatchingRefreshTaskListPartitionConfigResponse{},
},
{
name: "RefreshTaskListPartitionConfig - Error in resolving peer",
op: func(c Client) (any, error) {
return c.RefreshTaskListPartitionConfig(context.Background(), testMatchingRefreshTaskListPartitionConfigRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
p.EXPECT().FromTaskList(_testTaskList).Return("peer0", assert.AnError)
},
want: nil,
wantError: true,
},
{
name: "RefreshTaskListPartitionConfig - Error while listing tasklist partitions",
op: func(c Client) (any, error) {
return c.RefreshTaskListPartitionConfig(context.Background(), testMatchingRefreshTaskListPartitionConfigRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
p.EXPECT().FromTaskList(_testTaskList).Return("peer0", nil)
c.EXPECT().RefreshTaskListPartitionConfig(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, assert.AnError)
},
want: nil,
wantError: true,
},
}
for _, tt := range tests {
tt := tt
Expand Down Expand Up @@ -526,3 +597,27 @@ func testGetTaskListsByDomainRequest() *types.GetTaskListsByDomainRequest {
Domain: _testDomain,
}
}

func testMatchingUpdateTaskListPartitionConfigRequest() *types.MatchingUpdateTaskListPartitionConfigRequest {
return &types.MatchingUpdateTaskListPartitionConfigRequest{
DomainUUID: _testDomainUUID,
TaskList: &types.TaskList{Name: _testTaskList},
PartitionConfig: &types.TaskListPartitionConfig{
Version: 1,
NumReadPartitions: 3,
NumWritePartitions: 2,
},
}
}

func testMatchingRefreshTaskListPartitionConfigRequest() *types.MatchingRefreshTaskListPartitionConfigRequest {
return &types.MatchingRefreshTaskListPartitionConfigRequest{
DomainUUID: _testDomainUUID,
TaskList: &types.TaskList{Name: _testTaskList},
PartitionConfig: &types.TaskListPartitionConfig{
Version: 1,
NumReadPartitions: 3,
NumWritePartitions: 2,
},
}
}
2 changes: 2 additions & 0 deletions client/matching/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,6 @@ type Client interface {
PollForDecisionTask(context.Context, *types.MatchingPollForDecisionTaskRequest, ...yarpc.CallOption) (*types.MatchingPollForDecisionTaskResponse, error)
QueryWorkflow(context.Context, *types.MatchingQueryWorkflowRequest, ...yarpc.CallOption) (*types.QueryWorkflowResponse, error)
RespondQueryTaskCompleted(context.Context, *types.MatchingRespondQueryTaskCompletedRequest, ...yarpc.CallOption) error
UpdateTaskListPartitionConfig(context.Context, *types.MatchingUpdateTaskListPartitionConfigRequest, ...yarpc.CallOption) (*types.MatchingUpdateTaskListPartitionConfigResponse, error)
RefreshTaskListPartitionConfig(context.Context, *types.MatchingRefreshTaskListPartitionConfigRequest, ...yarpc.CallOption) (*types.MatchingRefreshTaskListPartitionConfigResponse, error)
}
40 changes: 40 additions & 0 deletions client/matching/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 29 additions & 2 deletions client/matching/partition_config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
)

Expand All @@ -55,6 +57,7 @@ type (
partitionConfigProviderImpl struct {
configCache cache.Cache
logger log.Logger
metricsClient metrics.Client
domainIDToName func(string) (string, error)
enableReadFromCache dynamicconfig.BoolPropertyFnWithTaskListInfoFilters
nReadPartitions dynamicconfig.IntPropertyFnWithTaskListInfoFilters
Expand All @@ -74,11 +77,13 @@ func (c *syncedTaskListPartitionConfig) updateConfig(newConfig types.TaskListPar

func NewPartitionConfigProvider(
logger log.Logger,
metricsClient metrics.Client,
domainIDToName func(string) (string, error),
dc *dynamicconfig.Collection,
) PartitionConfigProvider {
return &partitionConfigProviderImpl{
logger: logger,
metricsClient: metricsClient,
domainIDToName: domainIDToName,
enableReadFromCache: dc.GetBoolPropertyFilteredByTaskListInfo(dynamicconfig.MatchingEnableGetNumberOfPartitionsFromCache),
nReadPartitions: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingNumTasklistReadPartitions),
Expand Down Expand Up @@ -106,8 +111,15 @@ func (p *partitionConfigProviderImpl) GetNumberOfReadPartitions(domainID string,
return 1
}
c.RLock()
defer c.RUnlock()
return int(c.NumReadPartitions)
v := c.Version
w := c.NumWritePartitions
r := c.NumReadPartitions
c.RUnlock()
scope := p.metricsClient.Scope(metrics.PartitionConfigProviderScope, metrics.DomainTag(domainName), metrics.TaskListRootPartitionTag(taskList.GetName()), getTaskListTypeTag(taskListType))
scope.UpdateGauge(metrics.TaskListPartitionConfigNumReadGauge, float64(r))
scope.UpdateGauge(metrics.TaskListPartitionConfigNumWriteGauge, float64(w))
scope.UpdateGauge(metrics.TaskListPartitionConfigVersionGauge, float64(v))
return int(r)
}

func (p *partitionConfigProviderImpl) GetNumberOfWritePartitions(domainID string, taskList types.TaskList, taskListType int) int {
Expand All @@ -133,6 +145,10 @@ func (p *partitionConfigProviderImpl) GetNumberOfWritePartitions(domainID string
w := c.NumWritePartitions
r := c.NumReadPartitions
c.RUnlock()
scope := p.metricsClient.Scope(metrics.PartitionConfigProviderScope, metrics.DomainTag(domainName), metrics.TaskListRootPartitionTag(taskList.GetName()), getTaskListTypeTag(taskListType))
scope.UpdateGauge(metrics.TaskListPartitionConfigNumReadGauge, float64(r))
scope.UpdateGauge(metrics.TaskListPartitionConfigNumWriteGauge, float64(w))
scope.UpdateGauge(metrics.TaskListPartitionConfigVersionGauge, float64(v))
if w > r {
p.logger.Warn("Number of write partitions exceeds number of read partitions, using number of read partitions", tag.WorkflowDomainID(domainID), tag.WorkflowTaskListName(taskList.GetName()), tag.WorkflowTaskListType(taskListType), tag.Dynamic("read-partition", r), tag.Dynamic("write-partition", w), tag.Dynamic("config-version", v))
return int(r)
Expand Down Expand Up @@ -188,3 +204,14 @@ func (p *partitionConfigProviderImpl) getPartitionConfig(domainID string, taskLi
}
return c
}

func getTaskListTypeTag(taskListType int) metrics.Tag {
switch taskListType {
case persistence.TaskListTypeActivity:
return metrics.TaskListTypeTag("activity")
case persistence.TaskListTypeDecision:
return metrics.TaskListTypeTag("decision")
default:
return metrics.TaskListTypeTag("")
}
}
Loading

0 comments on commit 6f6249b

Please sign in to comment.