From 964c0bea94f7a0229827bcde19af258cdf9450be Mon Sep 17 00:00:00 2001 From: Evan Cordell Date: Mon, 7 Nov 2022 13:32:14 -0500 Subject: [PATCH] add file-based update source --- pkg/updates/file.go | 49 ++++++ pkg/updates/memory.go | 188 ++++++++++++++++++++ pkg/updates/memory_test.go | 350 +++++++++++++++++++++++++++++++++++++ pkg/updates/source.go | 27 +++ 4 files changed, 614 insertions(+) create mode 100644 pkg/updates/file.go create mode 100644 pkg/updates/memory.go create mode 100644 pkg/updates/memory_test.go create mode 100644 pkg/updates/source.go diff --git a/pkg/updates/file.go b/pkg/updates/file.go new file mode 100644 index 00000000..c3091f28 --- /dev/null +++ b/pkg/updates/file.go @@ -0,0 +1,49 @@ +package updates + +import ( + "fmt" + + "golang.org/x/exp/slices" +) + +type Channel struct { + Name string + Metadata map[string]string `json:"metadata,omitempty"` + Edges EdgeSet `json:"edges,omitempty"` + Nodes []State `json:"nodes,omitempty"` +} + +type State struct { + ID string `json:"id"` + Tag string `json:"tag,omitempty"` + Migration string `json:"migration,omitempty"` + Phase string `json:"phase,omitempty"` + Digest string `json:"digest,omitempty"` +} + +// UpdateGraph holds a graph of required update edges +type UpdateGraph struct { + Channels []Channel `json:"channels,omitempty"` +} + +func (g *UpdateGraph) SourceForDatastore(datastore string) (Source, error) { + for _, c := range g.Channels { + if c.Metadata["datastore"] == datastore { + return NewMemorySource(c.Nodes, c.Edges) + } + } + return nil, fmt.Errorf("no channel found for datastore %q", datastore) +} + +func (g *UpdateGraph) SourceForChannel(channel string) (Source, error) { + for _, c := range g.Channels { + if c.Name == channel { + return NewMemorySource(c.Nodes, c.Edges) + } + } + return nil, fmt.Errorf("no channel found with name %q", channel) +} + +func (g *UpdateGraph) Copy() UpdateGraph { + return UpdateGraph{Channels: slices.Clone(g.Channels)} +} diff --git a/pkg/updates/memory.go b/pkg/updates/memory.go new file mode 100644 index 00000000..108f92f7 --- /dev/null +++ b/pkg/updates/memory.go @@ -0,0 +1,188 @@ +package updates + +import ( + "fmt" + + "golang.org/x/exp/maps" +) + +// EdgeSet maps a node id to a list of node ids that it can update to +type EdgeSet map[string][]string + +// NodeSet maps a node id to an index in the OrderedNodes array +type NodeSet map[string]int + +// MemorySource is an in-memory implementation of Source. +// It's an oracle to answer update questions for an installed version. +type MemorySource struct { + // OrderedNodes is an ordered list of all nodes. Lower index == newer version. + OrderedNodes []State + // Nodes is a helper to lookup a node by id + Nodes NodeSet + // Edges contains the edgeset for this source. + Edges EdgeSet +} + +// Next returns the newest version that can be installed in one step. +func (m *MemorySource) Next(from string) string { + if edges, ok := m.Edges[from]; ok && len(edges) > 0 { + return edges[len(edges)-1] + } + return "" +} + +// NextDirect returns the newest version that can be directly installed without +// running any migrations. +func (m *MemorySource) NextDirect(from string) (found string) { + initial := m.OrderedNodes[m.Nodes[from]] + if to, ok := m.Edges[from]; ok && len(to) > 0 { + for _, n := range m.Edges[from] { + node := m.OrderedNodes[m.Nodes[n]] + + // if the phase and migration match the current node, no migrations + // are required + if initial.Phase == node.Phase && initial.Migration == node.Migration { + found = n + } else { + break + } + } + } + return found +} + +// Latest returns the newest version that can be installed. If different +// from `Next`, that means multiple steps are required (i.e. a multi-phase +// migration, or a required stopping point in a series of updates). +func (m *MemorySource) Latest(id string) string { + if len(m.OrderedNodes) == 0 || id == m.OrderedNodes[0].ID { + return "" + } + return m.OrderedNodes[0].ID +} + +func (m *MemorySource) State(id string) State { + index, ok := m.Nodes[id] + if !ok { + return State{} + } + return m.OrderedNodes[index] +} + +// TODO: consider *State instead of State +func (m *MemorySource) Source(to string) (Source, error) { + // copy the ordered node list from `to` onward + var index int + if len(to) > 0 { + index = m.Nodes[to] + } + orderedNodes := make([]State, len(m.OrderedNodes)-index) + copy(orderedNodes, m.OrderedNodes[index:len(m.OrderedNodes)]) + + nodeSet := make(map[string]int, len(orderedNodes)) + for i, n := range orderedNodes { + nodeSet[n.ID] = i + } + + edges := make(map[string][]string) + for from, to := range m.Edges { + // skip edges where from is not in the node set + if _, ok := nodeSet[from]; !ok { + continue + } + _, ok := edges[from] + if !ok { + edges[from] = make([]string, 0) + } + for _, n := range to { + // skip edges where to is not in the node set + if _, ok := nodeSet[n]; !ok { + continue + } + edges[from] = append(edges[from], n) + } + } + + return newMemorySourceFromValidatedNodes(nodeSet, edges, orderedNodes) +} + +func (m *MemorySource) Matches(tag, digest, migration, phase string) []string { + matches := make([]string, 0) + for _, n := range m.OrderedNodes { + if n.Tag == tag && n.Digest == digest && n.Migration == migration && n.Phase == phase { + matches = append(matches, n.ID) + continue + } + } + return matches +} + +func (m *MemorySource) validateAllNodesPathToHead() error { + head := m.OrderedNodes[0].ID + for _, n := range m.OrderedNodes { + if n.ID == head { + continue + } + visited := make(map[string]struct{}, 0) + // chasing next should lead to head + for next := m.Next(n.ID); next != head; next = m.Next(next) { + if _, ok := visited[next]; ok { + return fmt.Errorf("channel cycle detected: %v", append(maps.Keys(visited), next)) + } + if next == "" { + return fmt.Errorf("there is no path from %s to %s", n.ID, m.OrderedNodes[0].ID) + } + visited[next] = struct{}{} + } + } + return nil +} + +func NewMemorySource(nodes []State, edges EdgeSet) (Source, error) { + if len(nodes) == 0 || len(edges) == 0 { + return nil, fmt.Errorf("no edges or no nodes") + } + + nodeSet := make(map[string]int, len(nodes)) + for i, n := range nodes { + if _, ok := nodeSet[n.ID]; ok { + return nil, fmt.Errorf("more than one node with ID %s", n.ID) + } + nodeSet[n.ID] = i + } + + for from, toSet := range edges { + // ensure all edges reference nodes + if _, ok := nodeSet[from]; !ok { + return nil, fmt.Errorf("node list is missing node %s", from) + } + for _, to := range toSet { + if _, ok := nodeSet[to]; !ok { + return nil, fmt.Errorf("node list is missing node %s", to) + } + } + if len(toSet) != 0 { + continue + } + + // The only node with no updates should be the head of the channel + // i.e. the first thing in the ordered node list + if from != nodes[0].ID { + return nil, fmt.Errorf("%s has no outgoing edges, but it is not the head of the channel", from) + } + } + + return newMemorySourceFromValidatedNodes(nodeSet, edges, nodes) +} + +func newMemorySourceFromValidatedNodes(nodeSet map[string]int, edges map[string][]string, nodes []State) (Source, error) { + source := &MemorySource{Nodes: nodeSet, Edges: edges, OrderedNodes: nodes} + + if err := source.validateAllNodesPathToHead(); err != nil { + return nil, err + } + + // TODO: validate that the adjacency lists are in the same order as the node list + // so that we can always assume further down the list is newer + return source, nil +} diff --git a/pkg/updates/memory_test.go b/pkg/updates/memory_test.go new file mode 100644 index 00000000..0a97c4e4 --- /dev/null +++ b/pkg/updates/memory_test.go @@ -0,0 +1,350 @@ +package updates + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMemorySource(t *testing.T) { + type want struct { + latest, next, nextDirect string + } + tests := []struct { + name string + OrderedNodes []State + Edges EdgeSet + expectedForSubgraphWithHead map[string]map[string]want + expectedForID map[string]want + newErr string + }{ + { + name: "not all entries have paths to head", + OrderedNodes: []State{ + {ID: "v4", Tag: "tag", Migration: "migration", Phase: ""}, + {ID: "v3", Tag: "tag", Migration: "migration", Phase: ""}, + {ID: "v2", Tag: "tag", Migration: "migration", Phase: ""}, + {ID: "v1", Tag: "tag", Migration: "migration", Phase: ""}, + }, + Edges: map[string][]string{ + "v1": {"v2", "v3", "v4"}, + "v2": {"v3", "v4"}, + }, + newErr: "there is no path from v3 to v4", + }, + { + name: "cycle in channel", + OrderedNodes: []State{ + {ID: "v4", Tag: "tag", Migration: "migration", Phase: ""}, + {ID: "v3", Tag: "tag", Migration: "migration", Phase: ""}, + {ID: "v2", Tag: "tag", Migration: "migration", Phase: ""}, + {ID: "v1", Tag: "tag", Migration: "migration", Phase: ""}, + }, + Edges: map[string][]string{ + "v1": {"v2", "v3"}, + "v2": {"v3"}, + "v3": {"v2"}, + }, + newErr: "channel cycle detected", + }, + { + name: "latest is v4, no required steps", + OrderedNodes: []State{ + {ID: "v4", Tag: "tag", Migration: "migration", Phase: ""}, + {ID: "v3", Tag: "tag", Migration: "migration", Phase: ""}, + {ID: "v2", Tag: "tag", Migration: "migration", Phase: ""}, + {ID: "v1", Tag: "tag", Migration: "migration", Phase: ""}, + }, + Edges: map[string][]string{ + "v1": {"v2", "v3", "v4"}, + "v2": {"v3", "v4"}, + "v3": {"v4"}, + }, + expectedForID: map[string]want{ + "v1": {next: "v4", nextDirect: "v4", latest: "v4"}, + "v2": {next: "v4", nextDirect: "v4", latest: "v4"}, + "v3": {next: "v4", nextDirect: "v4", latest: "v4"}, + "": {next: "", nextDirect: "", latest: "v4"}, + }, + expectedForSubgraphWithHead: map[string]map[string]want{ + "v3": { + "v1": {next: "v3", nextDirect: "v3", latest: "v3"}, + "v2": {next: "v3", nextDirect: "v3", latest: "v3"}, + "v3": {next: "", nextDirect: "", latest: ""}, + "": {next: "", nextDirect: "", latest: "v3"}, + }, + "v2": { + "v1": {next: "v2", nextDirect: "v2", latest: "v2"}, + "v2": {next: "", nextDirect: "", latest: ""}, + "": {next: "", nextDirect: "", latest: "v2"}, + }, + "v1": { + "v1": {next: "", nextDirect: "", latest: ""}, + "": {next: "", nextDirect: "", latest: "v1"}, + }, + }, + }, + { + name: "latest is v4, required step at v3", + OrderedNodes: []State{ + {ID: "v4", Tag: "tag", Migration: "migration", Phase: ""}, + {ID: "v3", Tag: "tag", Migration: "migration", Phase: ""}, + {ID: "v2", Tag: "tag", Migration: "migration", Phase: ""}, + {ID: "v1", Tag: "tag", Migration: "migration", Phase: ""}, + }, + Edges: map[string][]string{ + "v1": {"v2", "v3"}, + "v2": {"v3"}, + "v3": {"v4"}, + }, + expectedForID: map[string]want{ + "v1": {next: "v3", nextDirect: "v3", latest: "v4"}, + "v2": {next: "v3", nextDirect: "v3", latest: "v4"}, + "v3": {next: "v4", nextDirect: "v4", latest: "v4"}, + }, + expectedForSubgraphWithHead: map[string]map[string]want{ + "v3": { + "v1": {next: "v3", nextDirect: "v3", latest: "v3"}, + "v2": {next: "v3", nextDirect: "v3", latest: "v3"}, + "v3": {next: "", nextDirect: "", latest: ""}, + "": {next: "", nextDirect: "", latest: "v3"}, + }, + "v2": { + "v1": {next: "v2", nextDirect: "v2", latest: "v2"}, + "v2": {next: "", nextDirect: "", latest: ""}, + "": {next: "", nextDirect: "", latest: "v2"}, + }, + "v1": { + "v1": {next: "", nextDirect: "", latest: ""}, + "": {next: "", nextDirect: "", latest: "v1"}, + }, + }, + }, + { + name: "latest is v4, multiple required steps", + OrderedNodes: []State{ + {ID: "v4", Tag: "tag", Migration: "migration", Phase: ""}, + {ID: "v3", Tag: "tag", Migration: "migration", Phase: ""}, + {ID: "v2", Tag: "tag", Migration: "migration", Phase: ""}, + {ID: "v1", Tag: "tag", Migration: "migration", Phase: ""}, + }, + Edges: map[string][]string{ + "v1": {"v2"}, + "v2": {"v3"}, + "v3": {"v4"}, + }, + expectedForID: map[string]want{ + "v1": {next: "v2", nextDirect: "v2", latest: "v4"}, + "v2": {next: "v3", nextDirect: "v3", latest: "v4"}, + "v3": {next: "v4", nextDirect: "v4", latest: "v4"}, + }, + expectedForSubgraphWithHead: map[string]map[string]want{ + "v3": { + "v1": {next: "v2", nextDirect: "v2", latest: "v3"}, + "v2": {next: "v3", nextDirect: "v3", latest: "v3"}, + "v3": {next: "", nextDirect: "", latest: ""}, + }, + "v2": { + "v1": {next: "v2", nextDirect: "v2", latest: "v2"}, + "v2": {next: "", nextDirect: "", latest: ""}, + }, + "v1": { + "v1": {next: "", nextDirect: "", latest: ""}, + }, + }, + }, + { + name: "required migration in list", + OrderedNodes: []State{ + {ID: "v4", Tag: "tag", Migration: "migration3", Phase: "phase3"}, + {ID: "v3", Tag: "tag", Migration: "migration2", Phase: "phase2"}, + {ID: "v2", Tag: "tag", Migration: "migration2", Phase: "phase2"}, + {ID: "v1", Tag: "tag", Migration: "migration", Phase: ""}, + }, + Edges: map[string][]string{ + "v1": {"v2"}, + "v2": {"v3"}, + "v3": {"v4"}, + }, + expectedForID: map[string]want{ + "v1": {next: "v2", nextDirect: "", latest: "v4"}, + "v2": {next: "v3", nextDirect: "v3", latest: "v4"}, + "v3": {next: "v4", nextDirect: "", latest: "v4"}, + "v4": {next: "", nextDirect: "", latest: ""}, + }, + expectedForSubgraphWithHead: map[string]map[string]want{ + "v3": { + "v1": {next: "v2", nextDirect: "", latest: "v3"}, + "v2": {next: "v3", nextDirect: "v3", latest: "v3"}, + "v3": {next: "", nextDirect: "", latest: ""}, + }, + "v2": { + "v1": {next: "v2", nextDirect: "", latest: "v2"}, + "v2": {next: "", nextDirect: "", latest: ""}, + }, + "v1": { + "v1": {next: "", nextDirect: "", latest: ""}, + }, + }, + }, + { + name: "latest is v5, required step at v4, migration at v3", + OrderedNodes: []State{ + {ID: "v5", Tag: "tag", Migration: "migration2", Phase: ""}, + {ID: "v4", Tag: "tag", Migration: "migration2", Phase: "phase2"}, + {ID: "v3", Tag: "tag", Migration: "migration2", Phase: "phase1"}, + {ID: "v2", Tag: "tag", Migration: "migration", Phase: ""}, + {ID: "v1", Tag: "tag", Migration: "migration", Phase: ""}, + }, + Edges: map[string][]string{ + "v1": {"v2", "v3", "v4"}, + "v2": {"v3", "v4"}, + "v3": {"v4"}, + "v4": {"v5"}, + }, + expectedForID: map[string]want{ + "v1": {next: "v4", nextDirect: "v2", latest: "v5"}, + "v2": {next: "v4", nextDirect: "", latest: "v5"}, + "v3": {next: "v4", nextDirect: "", latest: "v5"}, + }, + expectedForSubgraphWithHead: map[string]map[string]want{ + "v4": { + "v1": {next: "v4", nextDirect: "v2", latest: "v4"}, + "v2": {next: "v4", nextDirect: "", latest: "v4"}, + "v3": {next: "v4", nextDirect: "", latest: "v4"}, + "v4": {next: "", nextDirect: "", latest: ""}, + }, + "v3": { + "v1": {next: "v3", nextDirect: "v2", latest: "v3"}, + "v2": {next: "v3", nextDirect: "", latest: "v3"}, + "v3": {next: "", nextDirect: "", latest: ""}, + }, + "v2": { + "v1": {next: "v2", nextDirect: "v2", latest: "v2"}, + "v2": {next: "", nextDirect: "", latest: ""}, + }, + "v1": { + "v1": {next: "", nextDirect: "", latest: ""}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m, err := NewMemorySource(tt.OrderedNodes, tt.Edges) + if err != nil { + require.Contains(t, err.Error(), tt.newErr) + } + for id, want := range tt.expectedForID { + t.Run(fmt.Sprintf("Next(%s)", id), func(t *testing.T) { + require.Equal(t, want.next, m.Next(id)) + }) + t.Run(fmt.Sprintf("NextDirect(%s)", id), func(t *testing.T) { + require.Equal(t, want.nextDirect, m.NextDirect(id)) + }) + t.Run(fmt.Sprintf("Latest(%s)", id), func(t *testing.T) { + require.Equal(t, want.latest, m.Latest(id)) + }) + } + for newHead, expected := range tt.expectedForSubgraphWithHead { + s, err := m.Source(newHead) + require.NoError(t, err) + for id, want := range expected { + t.Run(fmt.Sprintf("head=%s,Next(%s)", newHead, id), func(t *testing.T) { + require.Equal(t, want.next, s.Next(id)) + }) + t.Run(fmt.Sprintf("head=%s,NextDirect(%s)", newHead, id), func(t *testing.T) { + require.Equal(t, want.nextDirect, s.NextDirect(id)) + }) + t.Run(fmt.Sprintf("head=%s,Latest(%s)", newHead, id), func(t *testing.T) { + require.Equal(t, want.latest, s.Latest(id)) + }) + } + } + }) + } +} + +func TestMemorySourceState(t *testing.T) { + type fields struct { + OrderedNodes []State + Edges EdgeSet + } + tests := []struct { + name string + fields fields + id string + want State + wantErr string + }{ + { + name: "no nodes", + fields: fields{ + Edges: map[string][]string{ + "1": {"2"}, + }, + }, + wantErr: "no edges or no nodes", + }, + { + name: "no edges", + fields: fields{ + OrderedNodes: []State{ + {ID: "1"}, + }, + }, + wantErr: "no edges or no nodes", + }, + { + name: "edge not in list", + fields: fields{ + OrderedNodes: []State{ + {ID: "2"}, + {ID: "1"}, + }, + Edges: map[string][]string{ + "1": {"3"}, + }, + }, + wantErr: "node list is missing node 3", + }, + { + name: "found", + fields: fields{ + OrderedNodes: []State{ + {ID: "2"}, + {ID: "1"}, + }, + Edges: map[string][]string{ + "1": {"2"}, + }, + }, + id: "1", + want: State{ID: "1"}, + }, + { + name: "missing", + fields: fields{ + OrderedNodes: []State{ + {ID: "2"}, + {ID: "1"}, + }, + Edges: map[string][]string{ + "1": {"2"}, + }, + }, + id: "3", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m, err := NewMemorySource(tt.fields.OrderedNodes, tt.fields.Edges) + if err != nil { + require.EqualError(t, err, tt.wantErr) + return + } + require.Equal(t, tt.want, m.State(tt.id)) + }) + } +} diff --git a/pkg/updates/source.go b/pkg/updates/source.go new file mode 100644 index 00000000..d67195a7 --- /dev/null +++ b/pkg/updates/source.go @@ -0,0 +1,27 @@ +package updates + +const Head = "" + +type Source interface { + // NextDirect returns the newest version that has an edge and has + // no required migrations + NextDirect(from string) string + + // Next returns the newest version that has an edge (but may include + // migrations) + Next(from string) string + + // Latest returns the newest version that has some path through the graph + Latest(from string) string + + // State returns information about the node, + // i.e. what image and migration to run + State(id string) State + + // Source returns a new source that is a subgraph of the current source, + // but where `head` is set to `to` + Source(to string) (Source, error) + + // Matches returns any ids that match the query + Matches(tag, digest, migration, phase string) []string +}