Skip to content

Commit

Permalink
Merge pull request #4975 from aduffeck/delete-shares-from-deleted-spaces
Browse files Browse the repository at this point in the history
Delete shares from deleted spaces
  • Loading branch information
butonic authored Nov 28, 2024
2 parents 6529d03 + 9c5399e commit 77cca69
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 15 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/delete-stale-shares.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Delete stale shares in the jsoncs3 share manager

The jsoncs3 share manager now properly deletes all references to removed shares and shares that belong to a space that was deleted

https://github.com/cs3org/reva/pull/4975
100 changes: 85 additions & 15 deletions pkg/share/manager/jsoncs3/jsoncs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/logger"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/share"
"github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/providercache"
Expand Down Expand Up @@ -114,6 +115,12 @@ func init() {
registry.Register("jsoncs3", NewDefault)
}

var (
_registeredEvents = []events.Unmarshaller{
events.SpaceDeleted{},
}
)

type config struct {
GatewayAddr string `mapstructure:"gateway_addr"`
MaxConcurrency int `mapstructure:"max_concurrency"`
Expand Down Expand Up @@ -188,7 +195,8 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) {
// New returns a new manager instance.
func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient], ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) {
ttl := time.Duration(ttlSeconds) * time.Second
return &Manager{

m := &Manager{
Cache: providercache.New(s, ttl),
CreatedCache: sharecache.New(s, "users", "created.json", ttl),
UserReceivedStates: receivedsharecache.New(s, ttl),
Expand All @@ -197,7 +205,18 @@ func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.Gate
gatewaySelector: gatewaySelector,
eventStream: es,
MaxConcurrency: maxconcurrency,
}, nil
}

// listen for events
if m.eventStream != nil {
ch, err := events.Consume(m.eventStream, "jsoncs3sharemanager", _registeredEvents...)
if err != nil {
appctx.GetLogger(context.Background()).Error().Err(err).Msg("error consuming events")
}
go m.ProcessEvents(ch)
}

return m, nil
}

func (m *Manager) initialize(ctx context.Context) error {
Expand Down Expand Up @@ -248,6 +267,22 @@ func (m *Manager) initialize(ctx context.Context) error {
return nil
}

func (m *Manager) ProcessEvents(ch <-chan events.Event) {
log := logger.New()
for event := range ch {
ctx := context.Background()

if err := m.initialize(ctx); err != nil {
log.Error().Err(err).Msg("error initializing manager")
}

if ev, ok := event.Event.(events.SpaceDeleted); ok {
log.Debug().Msgf("space deleted event: %v", ev)
go func() { m.purgeSpace(ctx, ev.ID) }()
}
}
}

// Share creates a new share
func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *collaboration.ShareGrant) (*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Share")
Expand Down Expand Up @@ -420,7 +455,7 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc
return nil, err
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
if err := m.removeShare(ctx, s, false); err != nil {
sublog.Error().Err(err).
Msg("failed to unshare expired share")
}
Expand Down Expand Up @@ -485,7 +520,7 @@ func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference
return errtypes.NotFound(ref.String())
}

return m.removeShare(ctx, s)
return m.removeShare(ctx, s, false)
}

// UpdateShare updates the mode of the given share.
Expand Down Expand Up @@ -622,7 +657,7 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
resourceID := s.GetResourceId()
sublog = sublog.With().Str("storageid", resourceID.GetStorageId()).Str("spaceid", resourceID.GetSpaceId()).Str("opaqueid", resourceID.GetOpaqueId()).Logger()
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
if err := m.removeShare(ctx, s, false); err != nil {
sublog.Error().Err(err).
Msg("failed to unshare expired share")
}
Expand Down Expand Up @@ -740,7 +775,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
continue
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
if err := m.removeShare(ctx, s, false); err != nil {
sublog.Error().Err(err).
Msg("failed to unshare expired share")
}
Expand Down Expand Up @@ -906,7 +941,7 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
}
sublogr = sublogr.With().Str("shareid", shareID).Logger()
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
if err := m.removeShare(ctx, s, false); err != nil {
sublogr.Error().Err(err).
Msg("failed to unshare expired share")
}
Expand Down Expand Up @@ -1009,7 +1044,7 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer
return nil, errtypes.NotFound(ref.String())
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
if err := m.removeShare(ctx, s, false); err != nil {
sublog.Error().Err(err).
Msg("failed to unshare expired share")
}
Expand Down Expand Up @@ -1136,24 +1171,59 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar
return nil
}

func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share) error {
func (m *Manager) purgeSpace(ctx context.Context, id *provider.StorageSpaceId) {
log := appctx.GetLogger(ctx)
storageID, spaceID := storagespace.SplitStorageID(id.OpaqueId)

shares, err := m.Cache.ListSpace(ctx, storageID, spaceID)
if err != nil {
log.Error().Err(err).Msg("error listing shares in space")
return
}

// iterate over all shares in the space and remove them
for _, share := range shares.Shares {
err := m.removeShare(ctx, share, true)
if err != nil {
log.Error().Err(err).Msg("error removing share")
}
}

// remove all shares in the space
err = m.Cache.PurgeSpace(ctx, storageID, spaceID)
if err != nil {
log.Error().Err(err).Msg("error purging space")
}
}

func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share, skipSpaceCache bool) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "removeShare")
defer span.End()

eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId)
err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId)
if !skipSpaceCache {
eg.Go(func() error {
storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId)
err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId)

return err
})
return err
})
}

eg.Go(func() error {
// remove from created cache
return m.CreatedCache.Remove(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId)
})

// TODO remove from grantee cache
eg.Go(func() error {
// remove from user received states
if s.GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_USER {
return m.UserReceivedStates.Remove(ctx, s.GetGrantee().GetUserId().GetOpaqueId(), s.GetResourceId().GetStorageId()+shareid.IDDelimiter+s.GetResourceId().GetSpaceId(), s.Id.OpaqueId)
} else if s.GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_GROUP {
return m.GroupReceivedCache.Remove(ctx, s.GetGrantee().GetGroupId().GetOpaqueId(), s.Id.OpaqueId)
}
return nil
})

return eg.Wait()
}
25 changes: 25 additions & 0 deletions pkg/share/manager/jsoncs3/providercache/providercache.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,31 @@ func (c *Cache) Persist(ctx context.Context, storageID, spaceID string) error {
return nil
}

// PurgeSpace removes a space from the cache
func (c *Cache) PurgeSpace(ctx context.Context, storageID, spaceID string) error {
ctx, span := tracer.Start(ctx, "PurgeSpace")
defer span.End()

unlock := c.LockSpace(spaceID)
defer unlock()
span.AddEvent("got lock")

if !c.isSpaceCached(storageID, spaceID) {
err := c.syncWithLock(ctx, storageID, spaceID)
if err != nil {
return err
}
}

spaces, ok := c.Providers.Load(storageID)
if !ok {
return nil
}
spaces.Spaces.Store(spaceID, &Shares{})

return c.Persist(ctx, storageID, spaceID)
}

func (c *Cache) syncWithLock(ctx context.Context, storageID, spaceID string) error {
ctx, span := tracer.Start(ctx, "syncWithLock")
defer span.End()
Expand Down
10 changes: 10 additions & 0 deletions pkg/share/manager/jsoncs3/providercache/providercache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,5 +181,15 @@ var _ = Describe("Cache", func() {
Expect(c.Persist(ctx, storageID, spaceID)).ToNot(Succeed())
})
})

Describe("PurgeSpace", func() {
It("removes the entry", func() {
Expect(c.PurgeSpace(ctx, storageID, spaceID)).To(Succeed())

s, err := c.Get(ctx, storageID, spaceID, shareID, false)
Expect(err).ToNot(HaveOccurred())
Expect(s).To(BeNil())
})
})
})
})
68 changes: 68 additions & 0 deletions pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,74 @@ func (c *Cache) Get(ctx context.Context, userID, spaceID, shareID string) (*Stat
return rss.Spaces[spaceID].States[shareID], nil
}

// Remove removes an entry from the cache
func (c *Cache) Remove(ctx context.Context, userID, spaceID, shareID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Grab lock")
unlock := c.lockUser(userID)
span.End()
span.SetAttributes(attribute.String("cs3.userid", userID))
defer unlock()

ctx, span = appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add")
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userID), attribute.String("cs3.spaceid", spaceID))

persistFunc := func() error {
c.initializeIfNeeded(userID, spaceID)

rss, _ := c.ReceivedSpaces.Load(userID)
receivedSpace := rss.Spaces[spaceID]
if receivedSpace.States == nil {
receivedSpace.States = map[string]*State{}
}
delete(receivedSpace.States, shareID)
if len(receivedSpace.States) == 0 {
delete(rss.Spaces, spaceID)
}

return c.persist(ctx, userID)
}

log := appctx.GetLogger(ctx).With().
Str("hostname", os.Getenv("HOSTNAME")).
Str("userID", userID).
Str("spaceID", spaceID).Logger()

var err error
for retries := 100; retries > 0; retries-- {
err = persistFunc()
switch err.(type) {
case nil:
span.SetStatus(codes.Ok, "")
return nil
case errtypes.Aborted:
log.Debug().Msg("aborted when persisting added received share: etag changed. retrying...")
// this is the expected status code from the server when the if-match etag check fails
// continue with sync below
case errtypes.PreconditionFailed:
log.Debug().Msg("precondition failed when persisting added received share: etag changed. retrying...")
// actually, this is the wrong status code and we treat it like errtypes.Aborted because of inconsistencies on the server side
// continue with sync below
case errtypes.AlreadyExists:
log.Debug().Msg("already exists when persisting added received share. retrying...")
// CS3 uses an already exists error instead of precondition failed when using an If-None-Match=* header / IfExists flag in the InitiateFileUpload call.
// Thas happens when the cache thinks there is no file.
// continue with sync below
default:
span.SetStatus(codes.Error, fmt.Sprintf("persisting added received share failed. giving up: %s", err.Error()))
log.Error().Err(err).Msg("persisting added received share failed")
return err
}
if err := c.syncWithLock(ctx, userID); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.Error().Err(err).Msg("persisting added received share failed. giving up.")
return err
}
}
return err
}

// List returns a list of received shares for a given user
// The return list is guaranteed to be thread-safe
func (c *Cache) List(ctx context.Context, userID string) (map[string]*Space, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,5 +134,26 @@ var _ = Describe("Cache", func() {
Expect(s).ToNot(BeNil())
})
})

Describe("Remove", func() {
It("removes the entry", func() {
err := c.Remove(ctx, userID, spaceID, shareID)
Expect(err).ToNot(HaveOccurred())

s, err := c.Get(ctx, userID, spaceID, shareID)
Expect(err).ToNot(HaveOccurred())
Expect(s).To(BeNil())
})

It("persists the removal", func() {
err := c.Remove(ctx, userID, spaceID, shareID)
Expect(err).ToNot(HaveOccurred())

c = receivedsharecache.New(storage, 0*time.Second)
s, err := c.Get(ctx, userID, spaceID, shareID)
Expect(err).ToNot(HaveOccurred())
Expect(s).To(BeNil())
})
})
})
})

0 comments on commit 77cca69

Please sign in to comment.