Skip to content

Commit

Permalink
fix sync streams and add diffs for annotations and owner references (#…
Browse files Browse the repository at this point in the history
…2728)

* extend and improve hasSlotsInSync unit test
* fix sync streams and add diffs for annotations and owner references
* incl. current annotations as desired where we do not fully control them
* added one more unit test and fixed sub test names
* pass maintenance windows to function and update unit test
  • Loading branch information
FxKu authored Aug 14, 2024
1 parent aad03f7 commit c7ee34e
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 96 deletions.
2 changes: 1 addition & 1 deletion pkg/cluster/majorversionupgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (c *Cluster) majorVersionUpgrade() error {
return nil
}

if !c.isInMainternanceWindow() {
if !isInMainternanceWindow(c.Spec.MaintenanceWindows) {
c.logger.Infof("skipping major version upgrade, not in maintenance window")
return nil
}
Expand Down
56 changes: 46 additions & 10 deletions pkg/cluster/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
createPublications[slotName] = tableList
} else if currentTables != tableList {
alterPublications[slotName] = tableList
} else {
(*slotsToSync)[slotName] = slotAndPublication.Slot
}
}

Expand All @@ -142,30 +144,34 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
return nil
}

var errorMessage error = nil
errors := make([]string, 0)
for publicationName, tables := range createPublications {
if err = c.executeCreatePublication(publicationName, tables); err != nil {
errorMessage = fmt.Errorf("creation of publication %q failed: %v", publicationName, err)
errors = append(errors, fmt.Sprintf("creation of publication %q failed: %v", publicationName, err))
continue
}
(*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot
}
for publicationName, tables := range alterPublications {
if err = c.executeAlterPublication(publicationName, tables); err != nil {
errorMessage = fmt.Errorf("update of publication %q failed: %v", publicationName, err)
errors = append(errors, fmt.Sprintf("update of publication %q failed: %v", publicationName, err))
continue
}
(*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot
}
for _, publicationName := range deletePublications {
if err = c.executeDropPublication(publicationName); err != nil {
errorMessage = fmt.Errorf("deletion of publication %q failed: %v", publicationName, err)
errors = append(errors, fmt.Sprintf("deletion of publication %q failed: %v", publicationName, err))
continue
}
(*slotsToSync)[publicationName] = nil
}

return errorMessage
if len(errors) > 0 {
return fmt.Errorf("%v", strings.Join(errors, `', '`))
}

return nil
}

func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream {
Expand Down Expand Up @@ -370,7 +376,7 @@ func (c *Cluster) syncStreams() error {
for dbName, databaseSlotsList := range databaseSlots {
err := c.syncPublication(dbName, databaseSlotsList, &slotsToSync)
if err != nil {
c.logger.Warningf("could not sync publications in database %q: %v", dbName, err)
c.logger.Warningf("could not sync all publications in database %q: %v", dbName, err)
continue
}
}
Expand Down Expand Up @@ -398,7 +404,7 @@ func (c *Cluster) syncStreams() error {
c.logger.Warningf("could not sync event streams with applicationId %s: %v", appId, err)
}
} else {
c.logger.Warningf("database replication slots for streams with applicationId %s not in sync, skipping event stream sync", appId)
c.logger.Warningf("database replication slots %#v for streams with applicationId %s not in sync, skipping event stream sync", slotsToSync, appId)
}
}

Expand All @@ -415,8 +421,9 @@ func hasSlotsInSync(appId string, databaseSlots map[string]map[string]zalandov1.
for dbName, slots := range databaseSlots {
for slotName := range slots {
if slotName == getSlotName(dbName, appId) {
if _, exists := slotsToSync[slotName]; !exists {
if slot, exists := slotsToSync[slotName]; !exists || slot == nil {
allSlotsInSync = false
continue
}
}
}
Expand All @@ -432,7 +439,17 @@ func (c *Cluster) syncStream(appId string) error {
if appId == stream.Spec.ApplicationId {
streamExists = true
desiredStreams := c.generateFabricEventStream(appId)
if match, reason := sameStreams(stream.Spec.EventStreams, desiredStreams.Spec.EventStreams); !match {
if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) {
c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId)
stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences
c.setProcessName("updating event streams with applicationId %s", appId)
stream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), stream, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err)
}
c.Streams[appId] = stream
}
if match, reason := c.compareStreams(stream, desiredStreams); !match {
c.logger.Debugf("updating event streams with applicationId %s: %s", appId, reason)
desiredStreams.ObjectMeta = stream.ObjectMeta
updatedStream, err := c.updateStreams(desiredStreams)
Expand All @@ -459,7 +476,26 @@ func (c *Cluster) syncStream(appId string) error {
return nil
}

func sameStreams(curEventStreams, newEventStreams []zalandov1.EventStream) (match bool, reason string) {
func (c *Cluster) compareStreams(curEventStreams, newEventStreams *zalandov1.FabricEventStream) (match bool, reason string) {
reasons := make([]string, 0)
match = true

// stream operator can add extra annotations so incl. current annotations in desired annotations
desiredAnnotations := c.annotationsSet(curEventStreams.Annotations)
if changed, reason := c.compareAnnotations(curEventStreams.ObjectMeta.Annotations, desiredAnnotations); changed {
match = false
reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason))
}

if changed, reason := sameEventStreams(curEventStreams.Spec.EventStreams, newEventStreams.Spec.EventStreams); !changed {
match = false
reasons = append(reasons, fmt.Sprintf("new streams EventStreams array does not match : %s", reason))
}

return match, strings.Join(reasons, ", ")
}

func sameEventStreams(curEventStreams, newEventStreams []zalandov1.EventStream) (match bool, reason string) {
if len(newEventStreams) != len(curEventStreams) {
return false, "number of defined streams is different"
}
Expand Down
Loading

0 comments on commit c7ee34e

Please sign in to comment.