Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inomurko/s3 #2

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile.dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
!/Makefile
!/*.go
!/cmd
!/s3
!/go.mod
!/go.sum
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ celestia da-server accepts the following flags for celestia storage using
--celestia.server value (default: "http://localhost:26658") ($OP_PLASMA_DA_SERVER_CELESTIA_SERVER)
celestia server endpoint

--s3.credential-type $OP_PLASMA_S3_CREDENTIAL_TYPE Static or iam.
--s3.access-key-id $OP_PLASMA_S3_ACCESS_KEY_ID Access key id for S3 storage.
--s3.access-key-id $OP_PLASMA_S3_ACCESS_KEY_ID Access key id for S3 storage.
--s3.access-key-secret $OP_PLASMA_S3_ACCESS_KEY_SECRET Access key secret for S3 storage.
--s3.bucket $OP_PLASMA_S3_BUCKET Bucket name for S3 storage.
--s3.path $OP_PLASMA_S3_PATH
--routing.fallback (default: false) $OP_PLASMA_FALLBACK_TARGET Fall back backend target. Supports S3.
--routing.cache (default: false) $OP_PLASMA_CACHE

````

The celestia server endpoint should be set to the celestia-node rpc server,
Expand Down
119 changes: 109 additions & 10 deletions celestia_server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package celestia

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -9,32 +10,44 @@ import (
"net/http"
"path"
"strconv"
"sync"
"time"

s3 "github.com/celestiaorg/op-plasma-celestia/s3"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
)

type CelestiaServer struct {
log log.Logger
endpoint string
store *CelestiaStore
s3Store *s3.S3Store
tls *rpc.ServerTLSConfig
httpServer *http.Server
listener net.Listener

cache bool
fallback bool
cacheLock sync.RWMutex
fallbackLock sync.RWMutex
}

func NewCelestiaServer(host string, port int, store *CelestiaStore, log log.Logger) *CelestiaServer {
func NewCelestiaServer(host string, port int, store *CelestiaStore, s3Store *s3.S3Store, fallback bool, cache bool, log log.Logger) *CelestiaServer {
endpoint := net.JoinHostPort(host, strconv.Itoa(port))
return &CelestiaServer{
log: log,
endpoint: endpoint,
store: store,
s3Store: s3Store,
httpServer: &http.Server{
Addr: endpoint,
},
fallback: fallback,
cache: cache,
}
}

Expand Down Expand Up @@ -94,18 +107,43 @@ func (d *CelestiaServer) HandleGet(w http.ResponseWriter, r *http.Request) {
return
}

// 1 read blob from cache if enabled
responseSent := false
if d.cache {
d.log.Debug("Retrieving data from cached backends")
input, err := d.multiSourceRead(r.Context(), comm, false)
if err == nil {
responseSent = true
if _, err := w.Write(input); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
}
}
// 2 read blob from Celestia
input, err := d.store.Get(r.Context(), comm)
if err != nil && errors.Is(err, plasma.ErrNotFound) {
responseSent = true
w.WriteHeader(http.StatusNotFound)
return
} else {
responseSent = true
if _, err := w.Write(input); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
}
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if _, err := w.Write(input); err != nil {

//3 fallback
if d.fallback && err != nil {
input, err = d.multiSourceRead(r.Context(), comm, true)
if err != nil {
d.log.Error("Failed to read from fallback", "err", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
} else if !responseSent {
w.WriteHeader(http.StatusInternalServerError)
return
}
}

Expand All @@ -124,14 +162,22 @@ func (d *CelestiaServer) HandlePut(w http.ResponseWriter, r *http.Request) {
return
}

comm, err := d.store.Put(r.Context(), input)
commitment, err := d.store.Put(r.Context(), input)
if err != nil {
key := hexutil.Encode(comm)
key := hexutil.Encode(commitment)
d.log.Info("Failed to store commitment to the DA server", "err", err, "key", key)
w.WriteHeader(http.StatusInternalServerError)
return
}
if _, err := w.Write(comm); err != nil {

if d.cache || d.fallback {
err = d.handleRedundantWrites(r.Context(), commitment, input)
if err != nil {
log.Error("Failed to write to redundant backends", "err", err)
}
}

if _, err := w.Write(commitment); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand All @@ -147,3 +193,56 @@ func (b *CelestiaServer) Stop() error {
_ = b.httpServer.Shutdown(ctx)
return nil
}

// multiSourceRead ... reads from a set of backends and returns the first successfully read blob
func (b *CelestiaServer) multiSourceRead(ctx context.Context, commitment []byte, fallback bool) ([]byte, error) {

if fallback {
b.fallbackLock.RLock()
defer b.fallbackLock.RUnlock()
} else {
b.cacheLock.RLock()
defer b.cacheLock.RUnlock()
}

key := crypto.Keccak256(commitment)
ctx, cancel := context.WithTimeout(ctx, b.s3Store.Timeout())
data, err := b.s3Store.Get(ctx, key)
defer cancel()
if err != nil {
b.log.Warn("Failed to read from redundant target S3", "err", err, "key", key)
return nil, errors.New("no data found in any redundant backend")
}

commit, err := b.store.CreateCommitment(data)
if err != nil || !bytes.Equal(commit, commitment[10:]) {
return nil, fmt.Errorf("celestia: invalid commitment: commit=%x commitment=%x err=%w", commit, commitment[10:], err)
}

return data, nil
}

// handleRedundantWrites ... writes to both sets of backends (i.e, fallback, cache)
// and returns an error if NONE of them succeed
// NOTE: multi-target set writes are done at once to avoid re-invocation of the same write function at the same
// caller step for different target sets vs. reading which is done conditionally to segment between a cached read type
// vs a fallback read type
func (b *CelestiaServer) handleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error {
b.cacheLock.RLock()
b.fallbackLock.RLock()

defer func() {
b.cacheLock.RUnlock()
b.fallbackLock.RUnlock()
}()

ctx, cancel := context.WithTimeout(ctx, b.s3Store.Timeout())
key := crypto.Keccak256(commitment)
err := b.s3Store.Put(ctx, key, value)
defer cancel()
if err != nil {
b.log.Warn("Failed to write to redundant s3 target", "err", err, "timeout", b.s3Store.Timeout(), "key", key)
}

return nil
}
20 changes: 16 additions & 4 deletions celestia_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"time"

client "github.com/celestiaorg/celestia-openrpc"
"github.com/celestiaorg/go-square/blob"
"github.com/celestiaorg/go-square/inclusion"
"github.com/celestiaorg/go-square/merkle"
"github.com/celestiaorg/go-square/namespace"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum/go-ethereum/log"
)
Expand All @@ -23,8 +27,8 @@ type CelestiaConfig struct {
type CelestiaStore struct {
Log log.Logger
GetTimeout time.Duration
Namespace []byte
Client *client.Client
Namespace []byte
Client *client.Client
}

// NewCelestiaStore returns a celestia store.
Expand All @@ -37,9 +41,9 @@ func NewCelestiaStore(cfg CelestiaConfig) *CelestiaStore {
}
return &CelestiaStore{
Log: Log,
Client: client,
Client: client,
GetTimeout: time.Minute,
Namespace: cfg.Namespace,
Namespace: cfg.Namespace,
}
}

Expand All @@ -66,3 +70,11 @@ func (d *CelestiaStore) Put(ctx context.Context, data []byte) ([]byte, error) {
}
return nil, err
}

func (d *CelestiaStore) CreateCommitment(data []byte) ([]byte, error) {
ins, err := namespace.From(d.Namespace)
if err != nil {
return nil, err
}
return inclusion.CreateCommitment(blob.New(ins, data, 0), merkle.HashFromByteSlices, 64)
}
10 changes: 8 additions & 2 deletions cmd/daserver/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (

"github.com/urfave/cli/v2"

celestia "github.com/celestiaorg/op-plasma-celestia"
s3 "github.com/celestiaorg/op-plasma-celestia/s3"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
celestia "github.com/celestiaorg/op-plasma-celestia"
)

type Server interface {
Expand Down Expand Up @@ -38,7 +39,12 @@ func StartDAServer(cliCtx *cli.Context) error {
case cfg.CelestiaEnabled():
l.Info("Using celestia storage", "url", cfg.CelestiaConfig().URL)
store := celestia.NewCelestiaStore(cfg.CelestiaConfig())
server = celestia.NewCelestiaServer(cliCtx.String(ListenAddrFlagName), cliCtx.Int(PortFlagName), store, l)
l.Info("Using s3 storage", "config", cfg.S3Config)
s3Store, err := s3.NewS3(cfg.S3Config)
if err != nil {
return err
}
server = celestia.NewCelestiaServer(cliCtx.String(ListenAddrFlagName), cliCtx.Int(PortFlagName), store, s3Store, cfg.FallbackEnabled(), cfg.CacheEnabled(), l)
}

if err := server.Start(); err != nil {
Expand Down
Loading