Skip to content

Commit

Permalink
feat: Support publishing new log entries to Pub/Sub topics
Browse files Browse the repository at this point in the history
Adds initial support publishing new log entries to Pub/Sub topics. Interested
parties can subscribe to the topic in order to receive notifications when new
entries are added.

Signed-off-by: James Alseth <[email protected]>
  • Loading branch information
jalseth committed Jul 8, 2023
1 parent 3ded91e commit 5f1c40a
Show file tree
Hide file tree
Showing 22 changed files with 1,070 additions and 26 deletions.
3 changes: 3 additions & 0 deletions Dockerfile.pubsub-emulator
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# gcloud sdk for pubsub emulator with netcat added for the startup health check
FROM google/cloud-sdk:437.0.1@sha256:615af2b80c5781891f402a38f35a5f422664f3bfee00c8ab2b94be47ac9fce8f
RUN apt-get install -y netcat
4 changes: 4 additions & 0 deletions cmd/rekor-server/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ func init() {
Memory and file-based signers should only be used for testing.`)
rootCmd.PersistentFlags().String("rekor_server.signer-passwd", "", "Password to decrypt signer private key")

rootCmd.PersistentFlags().String("rekor_server.new_entry_publisher", "", "URL for pub/sub queue to send messages to when new entries are added to the log. Ignored if not set.")
rootCmd.PersistentFlags().Bool("rekor_server.publish_new_entry_protobuf", false, "Whether to publish events for new log entries in Protobuf wire format. Requires new_entry_publisher to be configured.")
rootCmd.PersistentFlags().Bool("rekor_server.publish_new_entry_json", false, "Whether to publish events for new log entries in CloudEvents JSON format. Requires new_entry_publisher to be configured.")

rootCmd.PersistentFlags().Uint16("port", 3000, "Port to bind to")

rootCmd.PersistentFlags().Bool("enable_retrieve_api", true, "enables Redis-based index API endpoint")
Expand Down
3 changes: 0 additions & 3 deletions cmd/rekor-server/app/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ var serveCmd = &cobra.Command{
Short: "start http server with configured api",
Long: `Starts a http server and serves the configured api`,
Run: func(cmd *cobra.Command, args []string) {

// Setup the logger to dev/prod
log.ConfigureLogger(viper.GetString("log_type"))

Expand All @@ -83,7 +82,6 @@ var serveCmd = &cobra.Command{
log.Logger.Error(err)
}
}()

//TODO: make this a config option for server to load via viper field
//TODO: add command line option to print versions supported in binary

Expand All @@ -101,7 +99,6 @@ var serveCmd = &cobra.Command{
hashedrekord.KIND: {hashedrekord_v001.APIVERSION},
dsse.KIND: {dsse_v001.APIVERSION},
}

for k, v := range pluggableTypeMap {
log.Logger.Infof("Loading support for pluggable type '%v'", k)
log.Logger.Infof("Loading version '%v' for pluggable type '%v'", v, k)
Expand Down
25 changes: 25 additions & 0 deletions docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ services:
build:
context: .
target: "test"
environment:
PUBSUB_EMULATOR_HOST: gcp-pubsub-emulator:8085
command: [
"rekor-server",
"-test.coverprofile=rekor-server.cov",
Expand All @@ -32,7 +34,30 @@ services:
"--enable_attestation_storage",
"--attestation_storage_bucket=file:///var/run/attestations",
"--max_request_body_size=32792576",
"--rekor_server.new_entry_publisher=gcppubsub://projects/test-project/topics/new-entry",
"--rekor_server.publish_new_entry_json=true",
"--rekor_server.publish_new_entry_protobuf=true",
]
ports:
- "3000:3000"
- "2112:2112"
depends_on:
- gcp-pubsub-emulator
gcp-pubsub-emulator:
image: gcp-pubsub-emulator
ports:
- "8085:8085"
command:
- gcloud
- beta
- emulators
- pubsub
- start
- --host-port=0.0.0.0:8085
- --project=test-project
healthcheck:
test: ["CMD", "nc", "-zv", "localhost", "8085"]
interval: 10s
timeout: 3s
retries: 3
start_period: 10s
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,3 @@ services:
timeout: 3s
retries: 3
start_period: 5s

3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ require (
)

require (
cloud.google.com/go/pubsub v1.31.0
github.com/AdamKorcz/go-fuzz-headers-1 v0.0.0-20230618160516-e936619f9f18
github.com/cyberphone/json-canonicalization v0.0.0-20220623050100-57a0ce2678a7
github.com/go-redis/redismock/v9 v9.0.3
Expand Down Expand Up @@ -182,7 +183,7 @@ require (
golang.org/x/term v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.128.0 // indirect
google.golang.org/api v0.128.0
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
gopkg.in/yaml.v2 v2.4.0
Expand Down
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ cloud.google.com/go/pubsub v1.26.0/go.mod h1:QgBH3U/jdJy/ftjPhTkyXNj543Tin1pRYcd
cloud.google.com/go/pubsub v1.27.1/go.mod h1:hQN39ymbV9geqBnfQq6Xf63yNhUAhv9CZhzp5O6qsW0=
cloud.google.com/go/pubsub v1.28.0/go.mod h1:vuXFpwaVoIPQMGXqRyUQigu/AX1S3IWugR9xznmcXX8=
cloud.google.com/go/pubsub v1.30.0/go.mod h1:qWi1OPS0B+b5L+Sg6Gmc9zD1Y+HaM0MdUr7LsupY1P4=
cloud.google.com/go/pubsub v1.31.0 h1:aXdyyJz90kA+bor9+6+xHAciMD5mj8v15WqFZ5E0sek=
cloud.google.com/go/pubsub v1.31.0/go.mod h1:dYmJ3K97NCQ/e4OwZ20rD4Ym3Bu8Gu9m/aJdWQjdcks=
cloud.google.com/go/pubsublite v1.5.0/go.mod h1:xapqNQ1CuLfGi23Yda/9l4bBCKz/wC3KIJ5gKcxveZg=
cloud.google.com/go/pubsublite v1.6.0/go.mod h1:1eFCS0U11xlOuMFV/0iBqw3zP12kddMeCbj/F3FSj9k=
Expand Down Expand Up @@ -2125,8 +2126,6 @@ github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFR
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/sigstore/protobuf-specs v0.1.0 h1:X0l/E2C2c79t/rI/lmSu8WAoKWsQtMqDzAMiDdEMGr8=
github.com/sigstore/protobuf-specs v0.1.0/go.mod h1:5shUCxf82hGnjUEFVWiktcxwzdtn6EfeeJssxZ5Q5HE=
github.com/sigstore/sigstore v1.7.1 h1:fCATemikcBK0cG4+NcM940MfoIgmioY1vC6E66hXxks=
github.com/sigstore/sigstore v1.7.1/go.mod h1:0PmMzfJP2Y9+lugD0wer4e7TihR5tM7NcIs3bQNk5xg=
github.com/sigstore/sigstore/pkg/signature/kms/aws v1.7.1 h1:rDHrG/63b3nBq3G9plg7iYnWN6lBhOfq/XultlCZgII=
Expand Down
23 changes: 23 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/credentials/insecure"

"github.com/sigstore/rekor/pkg/log"
"github.com/sigstore/rekor/pkg/pubsub"
"github.com/sigstore/rekor/pkg/sharding"
"github.com/sigstore/rekor/pkg/signer"
"github.com/sigstore/rekor/pkg/storage"
Expand All @@ -39,6 +40,8 @@ import (
"github.com/sigstore/sigstore/pkg/cryptoutils"
"github.com/sigstore/sigstore/pkg/signature"
"github.com/sigstore/sigstore/pkg/signature/options"

_ "github.com/sigstore/rekor/pkg/pubsub/gcp" // Load GCP pubsub implementation
)

func dial(ctx context.Context, rpcServer string) (*grpc.ClientConn, error) {
Expand All @@ -63,6 +66,9 @@ type API struct {
signer signature.Signer
// stops checkpoint publishing
checkpointPublishCancel context.CancelFunc
// Publishes notifications when new entries are added to the log. May be
// nil if no publisher is configured.
newEntryPublisher pubsub.Publisher
}

func NewAPI(treeID uint) (*API, error) {
Expand Down Expand Up @@ -112,6 +118,18 @@ func NewAPI(treeID uint) (*API, error) {

pubkey := cryptoutils.PEMEncode(cryptoutils.PublicKeyPEMType, b)

var newEntryPublisher pubsub.Publisher
if p := viper.GetString("rekor_server.new_entry_publisher"); p != "" {
if !viper.GetBool("rekor_server.publish_new_entry_protobuf") && !viper.GetBool("rekor_server.publish_new_entry_json") {
return nil, fmt.Errorf("%q is configured but neither %q or %q are enabled", "new_entry_publisher", "publish_new_entry_protobuf", "publish_new_entry_json")
}
newEntryPublisher, err = pubsub.Get(ctx, p)
if err != nil {
return nil, fmt.Errorf("init event publisher: %w", err)
}
log.ContextLogger(ctx).Debugf("Initialized new entry event publisher: %s", p)
}

return &API{
// Transparency Log Stuff
logClient: logClient,
Expand All @@ -121,6 +139,8 @@ func NewAPI(treeID uint) (*API, error) {
pubkey: string(pubkey),
pubkeyHash: hex.EncodeToString(pubkeyHashBytes[:]),
signer: rekorSigner,
// Utility functionality not required for operation of the core service
newEntryPublisher: newEntryPublisher,
}, nil
}

Expand Down Expand Up @@ -165,5 +185,8 @@ func ConfigureAPI(treeID uint) {
}

func StopAPI() {
if api.newEntryPublisher != nil {
api.newEntryPublisher.Close()
}
api.checkpointPublishCancel()
}
62 changes: 61 additions & 1 deletion pkg/api/entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,16 @@ import (
"github.com/transparency-dev/merkle/rfc6962"
"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/encoding/prototext"

"github.com/sigstore/rekor/pkg/events"
"github.com/sigstore/rekor/pkg/events/newentry"
"github.com/sigstore/rekor/pkg/generated/models"
"github.com/sigstore/rekor/pkg/generated/restapi/operations/entries"
"github.com/sigstore/rekor/pkg/log"
"github.com/sigstore/rekor/pkg/pubsub"
"github.com/sigstore/rekor/pkg/sharding"
"github.com/sigstore/rekor/pkg/tle"
"github.com/sigstore/rekor/pkg/trillianclient"
"github.com/sigstore/rekor/pkg/types"
"github.com/sigstore/rekor/pkg/util"
Expand Down Expand Up @@ -290,7 +295,7 @@ func createLogEntry(params entries.CreateLogEntryParams) (models.LogEntry, middl
RootHash: swag.String(hex.EncodeToString(root.RootHash)),
LogIndex: swag.Int64(queuedLeaf.LeafIndex),
Hashes: hashes,
Checkpoint: stringPointer(string(scBytes)),
Checkpoint: swag.String(string(scBytes)),
}

logEntryAnon.Verification = &models.LogEntryAnonVerification{
Expand All @@ -301,9 +306,64 @@ func createLogEntry(params entries.CreateLogEntryParams) (models.LogEntry, middl
logEntry := models.LogEntry{
entryID: logEntryAnon,
}

if api.newEntryPublisher != nil {
// Publishing notifications should not block the API response.
go func() {
var subjects []string
verifier, err := entry.Verifier()
if err != nil {
log.ContextLogger(ctx).Warnf("Could not get verifier for log entry %s: %w", entryID, err)
} else {
subjects = verifier.Subjects()
}
pbEntry, err := tle.GenerateTransparencyLogEntry(logEntryAnon)
if err != nil {
incPublishEvent(newentry.Name, "", false)
log.ContextLogger(ctx).Error(err)
return
}
event, err := newentry.New(entryID, pbEntry, subjects)
if err != nil {
incPublishEvent(newentry.Name, "", false)
log.ContextLogger(ctx).Error(err)
return
}
if viper.GetBool("rekor_server.publish_new_entry_protobuf") {
publishEvent(ctx, api.newEntryPublisher, event, events.ContentTypeProtobuf)
}
if viper.GetBool("rekor_server.publish_new_entry_json") {
publishEvent(ctx, api.newEntryPublisher, event, events.ContentTypeJSON)
}
}()
}

return logEntry, nil
}

func publishEvent(ctx context.Context, publisher pubsub.Publisher, event *events.Event, contentType events.EventContentType) {
err := publisher.Publish(context.Background(), event, contentType)
incPublishEvent(event.Type().Name(), contentType, err == nil)
if err != nil {
log.ContextLogger(ctx).Error(err)
} else {
log.ContextLogger(ctx).Debugf("Published new entry event (%s): %s", contentType, prototext.Format(event.Message()))
}
}

func incPublishEvent(event string, contentType events.EventContentType, ok bool) {
status := "SUCCESS"
if !ok {
status = "ERROR"
}
labels := map[string]string{
"event": event,
"status": status,
"content_type": string(contentType),
}
metricPublishEvents.With(labels).Inc()
}

// CreateLogEntryHandler creates new entry into log
func CreateLogEntryHandler(params entries.CreateLogEntryParams) middleware.Responder {
httpReq := params.HTTPRequest
Expand Down
5 changes: 5 additions & 0 deletions pkg/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ var (
Help: "The total number of new log entries",
})

metricPublishEvents = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "rekor_publish_events",
Help: "The status of publishing events to Pub/Sub",
}, []string{"event", "content_type", "status"})

MetricLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "rekor_api_latency",
Help: "Api Latency on calls",
Expand Down
19 changes: 19 additions & 0 deletions pkg/events/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2023 The Sigstore Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package events provides methods for working with CloudEvents.
package events

// The version of the CloudEvents specification the package adheres to.
const CloudEventsSpecVersion = "1.0"
Loading

0 comments on commit 5f1c40a

Please sign in to comment.