Skip to content

Commit

Permalink
Revert "extend and improve hasSlotsInSync unit test"
Browse files Browse the repository at this point in the history
This reverts commit e4f00b7.
  • Loading branch information
FxKu committed Aug 14, 2024
1 parent dc4d170 commit 657e3e0
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 143 deletions.
6 changes: 2 additions & 4 deletions pkg/cluster/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (c *Cluster) syncStreams() error {
// there will be a separate event stream resource for each ID
appIds := getDistinctApplicationIds(c.Spec.Streams)
for _, appId := range appIds {
if c.hasSlotsInSync(appId, databaseSlots, slotsToSync) {
if hasSlotsInSync(appId, databaseSlots, slotsToSync) {
if err = c.syncStream(appId); err != nil {
c.logger.Warningf("could not sync event streams with applicationId %s: %v", appId, err)
}
Expand All @@ -410,15 +410,13 @@ func (c *Cluster) syncStreams() error {
return nil
}

func (c *Cluster) hasSlotsInSync(appId string, databaseSlots map[string]map[string]zalandov1.Slot, slotsToSync map[string]map[string]string) bool {
func hasSlotsInSync(appId string, databaseSlots map[string]map[string]zalandov1.Slot, slotsToSync map[string]map[string]string) bool {
allSlotsInSync := true
for dbName, slots := range databaseSlots {
for slotName := range slots {
if slotName == getSlotName(dbName, appId) {
if _, exists := slotsToSync[slotName]; !exists {
allSlotsInSync = false
c.logger.Warnf("replication slot %q for applicationId %s not found in database", slotName, appId)
continue
}
}
}
Expand Down
145 changes: 6 additions & 139 deletions pkg/cluster/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,44 +193,15 @@ func TestGatherApplicationIds(t *testing.T) {
}

func TestHasSlotsInSync(t *testing.T) {
client, _ := newFakeK8sStreamClient()
var cluster = New(
Config{
OpConfig: config.Config{
Auth: config.Auth{
SecretNameTemplate: "{username}.{cluster}.credentials.{tprkind}.{tprgroup}",
},
PodManagementPolicy: "ordered_ready",
Resources: config.Resources{
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
DefaultCPURequest: "300m",
DefaultCPULimit: "300m",
DefaultMemoryRequest: "300Mi",
DefaultMemoryLimit: "300Mi",
PodRoleLabel: "spilo-role",
},
},
}, client, pg, logger, eventRecorder)

cluster.Name = clusterName
cluster.Namespace = namespace

appId2 := fmt.Sprintf("%s-2", appId)
dbNotExists := "dbnotexists"
slotNotExists := fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbNotExists, strings.Replace(appId, "-", "_", -1))
slotNotExistsAppId2 := fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbNotExists, strings.Replace(appId2, "-", "_", -1))

tests := []struct {
subTest string
applicationId string
expectedSlots map[string]map[string]zalandov1.Slot
actualSlots map[string]map[string]string
slotsInSync bool
}{
{
subTest: fmt.Sprintf("slots in sync for applicationId %s", appId),
applicationId: appId,
subTest: "slots are in sync",
expectedSlots: map[string]map[string]zalandov1.Slot{
dbName: {
slotName: zalandov1.Slot{
Expand All @@ -256,70 +227,7 @@ func TestHasSlotsInSync(t *testing.T) {
},
slotsInSync: true,
}, {
subTest: fmt.Sprintf("slots empty for applicationId %s", appId),
applicationId: appId,
expectedSlots: map[string]map[string]zalandov1.Slot{
dbNotExists: {
slotNotExists: zalandov1.Slot{
Slot: map[string]string{
"databases": dbName,
"plugin": constants.EventStreamSourcePluginType,
"type": "logical",
},
Publication: map[string]acidv1.StreamTable{
"test1": acidv1.StreamTable{
EventType: "stream-type-a",
},
},
},
},
},
actualSlots: map[string]map[string]string{},
slotsInSync: false,
}, {
subTest: fmt.Sprintf("one slot not in sync for applicationId %s", appId),
applicationId: appId,
expectedSlots: map[string]map[string]zalandov1.Slot{
dbName: {
slotName: zalandov1.Slot{
Slot: map[string]string{
"databases": dbName,
"plugin": constants.EventStreamSourcePluginType,
"type": "logical",
},
Publication: map[string]acidv1.StreamTable{
"test1": acidv1.StreamTable{
EventType: "stream-type-a",
},
},
},
},
dbNotExists: {
slotNotExists: zalandov1.Slot{
Slot: map[string]string{
"databases": "dbnotexists",
"plugin": constants.EventStreamSourcePluginType,
"type": "logical",
},
Publication: map[string]acidv1.StreamTable{
"test2": acidv1.StreamTable{
EventType: "stream-type-b",
},
},
},
},
},
actualSlots: map[string]map[string]string{
slotName: map[string]string{
"databases": dbName,
"plugin": constants.EventStreamSourcePluginType,
"type": "logical",
},
},
slotsInSync: false,
}, {
subTest: fmt.Sprintf("slots in sync for applicationId %s, but not for for %s - checking %s", appId, appId2, appId),
applicationId: appId,
subTest: "slots are not in sync",
expectedSlots: map[string]map[string]zalandov1.Slot{
dbName: {
slotName: zalandov1.Slot{
Expand All @@ -335,49 +243,8 @@ func TestHasSlotsInSync(t *testing.T) {
},
},
},
dbNotExists: {
slotNotExistsAppId2: zalandov1.Slot{
Slot: map[string]string{
"databases": "dbnotexists",
"plugin": constants.EventStreamSourcePluginType,
"type": "logical",
},
Publication: map[string]acidv1.StreamTable{
"test2": acidv1.StreamTable{
EventType: "stream-type-b",
},
},
},
},
},
actualSlots: map[string]map[string]string{
slotName: map[string]string{
"databases": dbName,
"plugin": constants.EventStreamSourcePluginType,
"type": "logical",
},
},
slotsInSync: true,
}, {
subTest: fmt.Sprintf("slots in sync for applicationId %s, but not for for %s - checking %s", appId, appId2, appId2),
applicationId: appId2,
expectedSlots: map[string]map[string]zalandov1.Slot{
dbName: {
"dbnotexists": {
slotName: zalandov1.Slot{
Slot: map[string]string{
"databases": dbName,
"plugin": constants.EventStreamSourcePluginType,
"type": "logical",
},
Publication: map[string]acidv1.StreamTable{
"test1": acidv1.StreamTable{
EventType: "stream-type-a",
},
},
},
},
dbNotExists: {
slotNotExistsAppId2: zalandov1.Slot{
Slot: map[string]string{
"databases": "dbnotexists",
"plugin": constants.EventStreamSourcePluginType,
Expand All @@ -403,9 +270,9 @@ func TestHasSlotsInSync(t *testing.T) {
}

for _, tt := range tests {
result := cluster.hasSlotsInSync(tt.applicationId, tt.expectedSlots, tt.actualSlots)
if result != tt.slotsInSync {
t.Errorf("%s: unexpected result for slot test of applicationId: %v, expected slots %#v, actual slots %#v", tt.subTest, tt.applicationId, tt.expectedSlots, tt.actualSlots)
result := hasSlotsInSync(appId, tt.expectedSlots, tt.actualSlots)
if !result {
t.Errorf("slots are not in sync, expected %#v, got %#v", tt.expectedSlots, tt.actualSlots)
}
}
}
Expand Down

0 comments on commit 657e3e0

Please sign in to comment.