Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Commit

Permalink
Merge pull request #43 from weaveworks/event_storage
Browse files Browse the repository at this point in the history
Remove old EventStorage interface, rename WatchStorage to EventStorage
  • Loading branch information
luxas authored Aug 21, 2020
2 parents a47e40f + 1fe533b commit 91096ca
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 89 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
name: Continuous Integration
on: [push, pull_request]

on:
pull_request:
push:
branches: [master]

jobs:
unit-tests:
name: Unit Tests
Expand Down
7 changes: 6 additions & 1 deletion .github/workflows/golangci-lint.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# https://github.com/marketplace/actions/run-golangci-lint
name: golangci-lint
on: [push, pull_request]

on:
pull_request:
push:
branches: [master]

jobs:
golangci:
name: Linter
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/goreleaser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
- name: Generate release notes
run: |
echo 'CHANGELOG' > /tmp/release.txt
github-release-notes -org weaveworks -repo libgitops -since-latest-release >> /tmp/release.txt
github-release-notes -org weaveworks -repo libgitops -since-latest-release -include-author -include-commits >> /tmp/release.txt
- name: Run GoReleaser
uses: goreleaser/goreleaser-action@v2
with:
Expand Down
45 changes: 31 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,33 +70,50 @@ This extended `pkg/runtime.Object` is used heavily in the storage subsystem desc
## The storage system - `pkg/storage`

The storage system is a collection of interfaces and reference implementations for storing Kubernetes-like objects
(that comply to the extended `pkg/runtime.Object` described above). It can be thought of as a database abstraction layer for objects based on how the interfaces are laid out.
(that comply to the extended `pkg/runtime.Object` described above). It can be thought of as a database abstraction layer
for objects based on how the interfaces are laid out.

There are three "layers" of storages:

### RawStorage interface

The `RawStorage` interfaces deal with _bytes_, this includes `RawStorage` and `MappedRawStorage`. It is essentially a filesystem abstraction.
The `RawStorage` interfaces deal with _bytes_, this includes `RawStorage` and `MappedRawStorage`. It is essentially a
filesystem abstraction.

- `GenericRawStorage` is a generic implementation of `RawStorage`, storing all objects as files on disk using the following path pattern: `<top-level-dir>/<kind>/<identifier>/metadata.json`.
- `GenericMappedRawStorage` is a generic implementation of `MappedRawStorage`, keeping track of mappings between `ObjectKey`s and the real file path on disk. This might be used for e.g. a Git repository where the file structure and contents don't follow a specific format, but mappings need to be registered separately.
- `GenericRawStorage` is a generic implementation of `RawStorage`, storing all objects as files on disk using the
following path pattern: `<top-level-dir>/<kind>/<identifier>/metadata.json`.
- `GenericMappedRawStorage` is a generic implementation of `MappedRawStorage`, keeping track of mappings between
`ObjectKey`s and the real file path on disk. This might be used for e.g. a Git repository where the file structure
and contents don't follow a specific format, but mappings need to be registered separately.

### Storage interfaces

"Generic" `Storage` interfaces deal with _objects_, this includes `Storage`, `TransactionStorage`, `WatchStorage` and `EventStorage`.
"Generic" `Storage` interfaces deal with _objects_, this includes `Storage`, `TransactionStorage` and `EventStorage`.

- The `Storage` interface is a union of two smaller interfaces, `ReadStorage` and `WriteStorage`. It exposes CRUD operations like `Get`, `List`, `Create`, `Update`, `Delete`.
- `TransactionStorage` extends `ReadStorage` with a `Transaction` method, which temporarily gives access to also the `WriteStorage` part when the transaction is active.
- `EventStorage` allows the user to subscribe to object events arising from changes by other actors in the system, e.g. a new object was added, or that someone changed or deleted some other object.
- The `Storage` interface is a union of two smaller interfaces, `ReadStorage` and `WriteStorage`. It exposes CRUD
operations like `Get`, `List`, `Create`, `Update`, `Delete`.
- `TransactionStorage` extends `ReadStorage` with a `Transaction` method, which temporarily gives access to also the
`WriteStorage` part when the transaction is active.
- `EventStorage` allows the user to subscribe to object events arising from changes by other actors in the system, e.g.
a new object was added, or that someone changed or deleted some other object.

### Storage implementations

"High-level" `Storage` implementations bind together multiple `Storage`s, this includes `GenericWatchStorage`, `GitStorage` and `ManifestStorage`.

- `GenericStorage` is a generic implementation of `Storage`, using the given `RawStorage` and `Serializer` to provide object operations to the user.
- `GenericWatchStorage` is an implementation of `EventStorage`, using inotify to watch a directory on disk. It sends update events to a registered channel. It is a superset of and extends a given `Storage`.
- `GitStorage` takes in a `GitDirectory` a `PullRequestProvider` and a `Serializer`. It watches for new commits automatically pulled by the `GitDirectory`, and re-syncs the underlying `GenericMappedRawStorage`. It implements the `TransactionStorage` interface, and when the transaction is active, allows writing which then yields a new branch and commit, pushed to the origin. Lastly, it can, using the `PullRequestProvider` create a Pull Request for the branch. In the future, it should also implement `EventStorage`.
- `ManifestStorage` watches a directory on disk using `GenericWatchStorage`, uses a `GenericStorage` for object operations, and a `MappedRawStorage` for files. Using it, implementing `EventStorage`, you can subscribe to file update/create/delete events in a given directory, e.g. a cloned Git repository or "manifest directory".
"High-level" `Storage` implementations bind together multiple `Storage`s, this includes `GenericWatchStorage`,
`GitStorage` and `ManifestStorage`.

- `GenericStorage` is a generic implementation of `Storage`, using the given `RawStorage` and `Serializer` to provide
object operations to the user.
- `GenericWatchStorage` is an implementation of `EventStorage`, using inotify to watch a directory on disk. It sends
update events to a registered channel. It is a superset of and extends a given `Storage`.
- `GitStorage` takes in a `GitDirectory` a `PullRequestProvider` and a `Serializer`. It watches for new commits
automatically pulled by the `GitDirectory`, and re-syncs the underlying `GenericMappedRawStorage`. It implements
the `TransactionStorage` interface, and when the transaction is active, allows writing which then yields a new branch
and commit, pushed to the origin. Lastly, it can, using the `PullRequestProvider` create a Pull Request for the
branch. In the future, it should also implement `EventStorage`.
- `ManifestStorage` watches a directory on disk using `GenericWatchStorage`, uses a `GenericStorage` for object
operations, and a `GenericMappedRawStorage` for files. Using it, implementing `EventStorage`, you can subscribe to
file update/create/delete events in a given directory, e.g. a cloned Git repository or "manifest directory".

**Example on how the storages interact:**

Expand Down
10 changes: 7 additions & 3 deletions cmd/sample-app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main
import (
"bytes"
"fmt"
"github.com/weaveworks/libgitops/pkg/storage/watch"
"github.com/weaveworks/libgitops/pkg/storage/watch/update"
"net/http"
"os"

Expand All @@ -15,7 +17,6 @@ import (
"github.com/weaveworks/libgitops/pkg/runtime"
"github.com/weaveworks/libgitops/pkg/serializer"
"github.com/weaveworks/libgitops/pkg/storage"
"github.com/weaveworks/libgitops/pkg/storage/manifest"
)

const (
Expand Down Expand Up @@ -53,14 +54,17 @@ func run() error {
)
defer func() { _ = plainStorage.Close() }()

watchStorage, err := manifest.NewManifestStorage(WatchDir, scheme.Serializer)
watchStorage, err := watch.NewManifestStorage(WatchDir, scheme.Serializer)
if err != nil {
return err
}
defer func() { _ = watchStorage.Close() }()

updates := make(chan update.Update, 4096)
watchStorage.SetUpdateStream(updates)

go func() {
for upd := range watchStorage.GetUpdateStream() {
for upd := range updates {
logrus.Infof("Got %s update for: %v %v", upd.Event, upd.PartialObject.GetObjectKind().GroupVersionKind(), upd.PartialObject.GetObjectMeta())
}
}()
Expand Down
10 changes: 7 additions & 3 deletions cmd/sample-gitops/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main
import (
"context"
"fmt"
"github.com/weaveworks/libgitops/pkg/storage/watch"
"github.com/weaveworks/libgitops/pkg/storage/watch/update"
"io/ioutil"
"net/http"
"os"
Expand All @@ -19,7 +21,6 @@ import (
"github.com/weaveworks/libgitops/pkg/gitdir"
"github.com/weaveworks/libgitops/pkg/logs"
"github.com/weaveworks/libgitops/pkg/storage"
"github.com/weaveworks/libgitops/pkg/storage/manifest"
"github.com/weaveworks/libgitops/pkg/storage/transaction"
githubpr "github.com/weaveworks/libgitops/pkg/storage/transaction/pullrequest/github"
)
Expand Down Expand Up @@ -129,14 +130,17 @@ func run(identityFile, gitURL, ghToken, authorName, authorEmail string) error {
// Set the log level
logs.Logger.SetLevel(logrus.InfoLevel)

watchStorage, err := manifest.NewManifestStorage(gitDir.Dir(), scheme.Serializer)
watchStorage, err := watch.NewManifestStorage(gitDir.Dir(), scheme.Serializer)
if err != nil {
return err
}
defer func() { _ = watchStorage.Close() }()

updates := make(chan update.Update, 4096)
watchStorage.SetUpdateStream(updates)

go func() {
for upd := range watchStorage.GetUpdateStream() {
for upd := range updates {
logrus.Infof("Got %s update for: %v %v", upd.Event, upd.PartialObject.GetObjectKind().GroupVersionKind(), upd.PartialObject.GetObjectMeta())
}
}()
Expand Down
48 changes: 0 additions & 48 deletions pkg/storage/manifest/storage.go

This file was deleted.

38 changes: 22 additions & 16 deletions pkg/storage/watch/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

log "github.com/sirupsen/logrus"
"github.com/weaveworks/libgitops/pkg/runtime"
"github.com/weaveworks/libgitops/pkg/serializer"
"github.com/weaveworks/libgitops/pkg/storage"
"github.com/weaveworks/libgitops/pkg/storage/watch/update"
"github.com/weaveworks/libgitops/pkg/util/sync"
Expand All @@ -13,25 +14,26 @@ import (
"k8s.io/apimachinery/pkg/types"
)

// EventDeleteObjectName represents the name of the sent object in the GenericWatchStorage's event stream
// when the given object was deleted
const EventDeleteObjectName = "<deleted>"
// NewManifestStorage returns a pre-configured GenericWatchStorage backed by a storage.GenericStorage,
// and a GenericMappedRawStorage for the given manifestDir and Serializer. This should be sufficient
// for most users that want to watch changes in a directory with manifests.
func NewManifestStorage(manifestDir string, ser serializer.Serializer) (update.EventStorage, error) {
return NewGenericWatchStorage(
storage.NewGenericStorage(
storage.NewGenericMappedRawStorage(manifestDir),
ser,
[]runtime.IdentifierFactory{runtime.Metav1NameIdentifier},
),
)
}

// WatchStorage is an extended Storage implementation, which provides a watcher
// NewGenericWatchStorage is an extended Storage implementation, which provides a watcher
// for watching changes in the directory managed by the embedded Storage's RawStorage.
// If the RawStorage is a MappedRawStorage instance, it's mappings will automatically
// be updated by the WatchStorage. Update events are sent to the given event stream.
type WatchStorage interface {
// WatchStorage extends the Storage interface
storage.Storage
// GetTrigger returns a hook that can be used to detect a watch event
SetUpdateStream(update.UpdateStream)
}

// NewGenericWatchStorage constructs a new WatchStorage.
// Note: This WatchStorage only works for one-frame files (i.e. only one YAML document per
// file is supported).
func NewGenericWatchStorage(s storage.Storage) (WatchStorage, error) {
// Note: This WatchStorage only works for one-frame files (i.e. only one YAML document
// per file is supported).
func NewGenericWatchStorage(s storage.Storage) (update.EventStorage, error) {
ws := &GenericWatchStorage{
Storage: s,
}
Expand All @@ -49,6 +51,10 @@ func NewGenericWatchStorage(s storage.Storage) (WatchStorage, error) {
return ws, nil
}

// EventDeleteObjectName is used as the name of an object sent to the
// GenericWatchStorage's event stream when the the object has been deleted
const EventDeleteObjectName = "<deleted>"

// GenericWatchStorage implements the WatchStorage interface
type GenericWatchStorage struct {
storage.Storage
Expand All @@ -57,7 +63,7 @@ type GenericWatchStorage struct {
monitor *sync.Monitor
}

var _ WatchStorage = &GenericWatchStorage{}
var _ update.EventStorage = &GenericWatchStorage{}

// Suspend modify events during Create
func (s *GenericWatchStorage) Create(obj runtime.Object) error {
Expand Down
7 changes: 5 additions & 2 deletions pkg/storage/watch/update/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ type UpdateStream chan Update
type EventStorage interface {
storage.Storage

// GetUpdateStream can be subscribed to for receiving update events.
GetUpdateStream() UpdateStream
// SetUpdateStream gives the EventStorage a channel to send events to.
// The caller is responsible for choosing a large enough buffer to avoid
// blocking the underlying EventStorage implementation unnecessarily.
// TODO: In the future maybe enable sending events to multiple listeners?
SetUpdateStream(UpdateStream)
}

0 comments on commit 91096ca

Please sign in to comment.