Skip to content

Commit

Permalink
feat: allow SP to create/update object for delegator (#581)
Browse files Browse the repository at this point in the history
* enable delegated agent to create object

* enable delegated agent to update object

* use flag to toggle SP as upload agent for bucket

* clean code

* disable discontinue

* fix gas limit

* refine code

* gen swaggger

* fix gvg test

* fix comment
  • Loading branch information
alexgao001 authored Mar 13, 2024
1 parent 5c6b827 commit df143c6
Show file tree
Hide file tree
Showing 17 changed files with 7,991 additions and 4,845 deletions.
4 changes: 4 additions & 0 deletions app/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ func (app *App) registerSerengetiUpgradeHandler() {
func(ctx sdk.Context, plan upgradetypes.Plan, fromVM module.VersionMap) (module.VersionMap, error) {
app.Logger().Info("upgrade to ", plan.Name)
app.VirtualgroupKeeper.MigrateGlobalVirtualGroupFamiliesForSP(ctx)
app.GashubKeeper.SetMsgGasParams(ctx, *gashubtypes.NewMsgGasParamsWithFixedGas(sdk.MsgTypeURL(&storagemoduletypes.MsgToggleSPAsDelegatedAgent{}), 1.2e3))
app.GashubKeeper.SetMsgGasParams(ctx, *gashubtypes.NewMsgGasParamsWithFixedGas(sdk.MsgTypeURL(&storagemoduletypes.MsgDelegateCreateObject{}), 1.2e3))
app.GashubKeeper.SetMsgGasParams(ctx, *gashubtypes.NewMsgGasParamsWithFixedGas(sdk.MsgTypeURL(&storagemoduletypes.MsgDelegateUpdateObjectContent{}), 1.2e3))
app.GashubKeeper.SetMsgGasParams(ctx, *gashubtypes.NewMsgGasParamsWithFixedGas(sdk.MsgTypeURL(&storagemoduletypes.MsgSealObjectV2{}), 1.2e2))
return app.mm.RunMigrations(ctx, app.configurator, fromVM)
})

Expand Down
6 changes: 4 additions & 2 deletions e2e/tests/permission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1115,7 +1115,8 @@ func (s *StorageTestSuite) TestStalePermissionForAccountGC() {

// bucket and object dont exist after deletion
headObjectReq := storagetypes.QueryHeadObjectRequest{
BucketName: objectName,
BucketName: bucketName,
ObjectName: objectName,
}
_, err = s.Client.HeadObject(ctx, &headObjectReq)
s.Require().Error(err)
Expand Down Expand Up @@ -1331,7 +1332,8 @@ func (s *StorageTestSuite) TestStalePermissionForGroupGC() {

// bucket and object dont exist after deletion
headObjectReq := storagetypes.QueryHeadObjectRequest{
BucketName: objectName,
BucketName: bucketName,
ObjectName: objectName,
}
_, err = s.Client.HeadObject(ctx, &headObjectReq)
s.Require().Error(err)
Expand Down
155 changes: 155 additions & 0 deletions e2e/tests/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2407,3 +2407,158 @@ func (s *StorageTestSuite) TestDeleteCreateObject_InCreatedStatus() {
_, err = s.Client.HeadObject(ctx, &queryHeadObjectRequest)
s.Require().EqualError(err, "rpc error: code = Unknown desc = No such object: unknown request")
}

func (s *StorageTestSuite) TestToggleBucketSpAsDelegatedAgents() {
var err error
// CreateBucket
sp := s.BaseSuite.PickStorageProvider()
gvg, found := sp.GetFirstGlobalVirtualGroup()
s.Require().True(found)
user := s.GenAndChargeAccounts(1, 1000000)[0]
bucketName := storageutils.GenRandomBucketName()
msgCreateBucket := storagetypes.NewMsgCreateBucket(
user.GetAddr(), bucketName, storagetypes.VISIBILITY_TYPE_PRIVATE, sp.OperatorKey.GetAddr(),
nil, math.MaxUint, nil, 0)
msgCreateBucket.PrimarySpApproval.GlobalVirtualGroupFamilyId = gvg.FamilyId
msgCreateBucket.PrimarySpApproval.Sig, err = sp.ApprovalKey.Sign(msgCreateBucket.GetApprovalBytes())
s.Require().NoError(err)
s.SendTxBlock(user, msgCreateBucket)

queryHeadBucketRequest := storagetypes.QueryHeadBucketRequest{
BucketName: bucketName,
}
ctx := context.Background()
queryHeadBucketResponse, err := s.Client.HeadBucket(ctx, &queryHeadBucketRequest)
s.Require().NoError(err)
s.Require().Equal(false, queryHeadBucketResponse.BucketInfo.SpAsDelegatedAgentDisabled)

MsgToggleSPAsDelegatedAgent := storagetypes.NewMsgToggleSPAsDelegatedAgent(
user.GetAddr(),
bucketName)
s.SendTxBlock(user, MsgToggleSPAsDelegatedAgent)

// HeadBucket
queryHeadBucketResponse, err = s.Client.HeadBucket(ctx, &queryHeadBucketRequest)
s.Require().NoError(err)
s.Require().Equal(true, queryHeadBucketResponse.BucketInfo.SpAsDelegatedAgentDisabled)
}

func (s *StorageTestSuite) TestCreateObjectByDelegatedAgents() {
var err error
ctx := context.Background()

// CreateBucket
sp := s.BaseSuite.PickStorageProvider()
gvg, found := sp.GetFirstGlobalVirtualGroup()
s.Require().True(found)

bucketOwner := s.GenAndChargeAccounts(1, 1000000)[0]
bucketName := storageutils.GenRandomBucketName()
objectName := storageutils.GenRandomObjectName()

msgCreateBucket := storagetypes.NewMsgCreateBucket(
bucketOwner.GetAddr(), bucketName, storagetypes.VISIBILITY_TYPE_PRIVATE, sp.OperatorKey.GetAddr(),
nil, math.MaxUint, nil, 0)
msgCreateBucket.PrimarySpApproval.GlobalVirtualGroupFamilyId = gvg.FamilyId
msgCreateBucket.PrimarySpApproval.Sig, err = sp.ApprovalKey.Sign(msgCreateBucket.GetApprovalBytes())
s.Require().NoError(err)

s.SendTxBlock(bucketOwner, msgCreateBucket)

// HeadBucket
queryHeadBucketRequest := storagetypes.QueryHeadBucketRequest{
BucketName: bucketName,
}
queryHeadBucketResponse, err := s.Client.HeadBucket(ctx, &queryHeadBucketRequest)
s.Require().NoError(err)
s.Require().Equal(false, queryHeadBucketResponse.BucketInfo.SpAsDelegatedAgentDisabled)

// DelegateCreate for user2, who does not have permission
var buffer bytes.Buffer
// Create 1MiB content where each line contains 1024 characters.
for i := 0; i < 1024; i++ {
buffer.WriteString(fmt.Sprintf("[%05d] %s\n", i, line))
}
payloadSize := buffer.Len()
contextType := "text/event-stream"
msgDelegateCreateObject := storagetypes.NewMsgDelegateCreateObject(
sp.OperatorKey.GetAddr(),
bucketOwner.GetAddr(),
bucketName,
objectName,
uint64(payloadSize),
storagetypes.VISIBILITY_TYPE_PRIVATE,
nil,
contextType,
storagetypes.REDUNDANCY_EC_TYPE)
s.SendTxBlock(sp.OperatorKey, msgDelegateCreateObject)

headObjectReq := storagetypes.QueryHeadObjectRequest{
BucketName: bucketName,
ObjectName: objectName,
}
headObjectResp, err := s.Client.HeadObject(ctx, &headObjectReq)
s.Require().NoError(err)
s.Require().Equal(objectName, headObjectResp.ObjectInfo.ObjectName)
s.Require().Equal(bucketOwner.GetAddr().String(), headObjectResp.ObjectInfo.Owner)
s.Require().Equal(0, len(headObjectResp.ObjectInfo.Checksums))

// SP seal object, and update the object checksum
checksum := sdk.Keccak256(buffer.Bytes())
expectChecksum := [][]byte{checksum, checksum, checksum, checksum, checksum, checksum, checksum}

gvgId := gvg.Id
msgSealObject := storagetypes.NewMsgSealObjectV2(sp.SealKey.GetAddr(), bucketName, objectName, gvg.Id, nil, expectChecksum)
secondarySigs := make([][]byte, 0)
secondarySPBlsPubKeys := make([]bls.PublicKey, 0)
blsSignHash := storagetypes.NewSecondarySpSealObjectSignDoc(s.GetChainID(), gvgId, headObjectResp.ObjectInfo.Id, storagetypes.GenerateHash(expectChecksum[:])).GetBlsSignHash()
// every secondary sp signs the checksums
for _, spID := range gvg.SecondarySpIds {
sig, err := core.BlsSignAndVerify(s.StorageProviders[spID], blsSignHash)
s.Require().NoError(err)
secondarySigs = append(secondarySigs, sig)
pk, err := bls.PublicKeyFromBytes(s.StorageProviders[spID].BlsKey.PubKey().Bytes())
s.Require().NoError(err)
secondarySPBlsPubKeys = append(secondarySPBlsPubKeys, pk)
}
aggBlsSig, err := core.BlsAggregateAndVerify(secondarySPBlsPubKeys, blsSignHash, secondarySigs)
s.Require().NoError(err)
msgSealObject.SecondarySpBlsAggSignatures = aggBlsSig
s.T().Logf("msg %s", msgSealObject.String())
s.SendTxBlock(sp.SealKey, msgSealObject)

headObjectResp, err = s.Client.HeadObject(ctx, &headObjectReq)
s.Require().NoError(err)
s.Require().Equal(objectName, headObjectResp.ObjectInfo.ObjectName)
s.Require().Equal(bucketOwner.GetAddr().String(), headObjectResp.ObjectInfo.Owner)
s.Require().Equal(expectChecksum, headObjectResp.ObjectInfo.Checksums)

// delegate update
var newBuffer bytes.Buffer
for i := 0; i < 2048; i++ {
newBuffer.WriteString(fmt.Sprintf("[%05d] %s\n", i, line))
}
newPayloadSize := uint64(newBuffer.Len())
newChecksum := sdk.Keccak256(newBuffer.Bytes())
newExpectChecksum := [][]byte{newChecksum, newChecksum, newChecksum, newChecksum, newChecksum, newChecksum, newChecksum}

msgUpdateObject := storagetypes.NewMsgDelegateUpdateObjectContent(sp.OperatorKey.GetAddr(),
bucketOwner.GetAddr(), bucketName, objectName, newPayloadSize, nil)
s.SendTxBlock(sp.OperatorKey, msgUpdateObject)
s.T().Logf("msgUpdateObject %s", msgUpdateObject.String())

// every secondary sp signs the checksums
newSecondarySigs := make([][]byte, 0)
newBlsSignHash := storagetypes.NewSecondarySpSealObjectSignDoc(s.GetChainID(), gvgId, headObjectResp.ObjectInfo.Id, storagetypes.GenerateHash(newExpectChecksum[:])).GetBlsSignHash()
for _, spID := range gvg.SecondarySpIds {
sig, err := core.BlsSignAndVerify(s.StorageProviders[spID], newBlsSignHash)
s.Require().NoError(err)
newSecondarySigs = append(newSecondarySigs, sig)
}
aggBlsSig, err = core.BlsAggregateAndVerify(secondarySPBlsPubKeys, newBlsSignHash, newSecondarySigs)
s.Require().NoError(err)
msgSealObject = storagetypes.NewMsgSealObjectV2(sp.SealKey.GetAddr(), bucketName, objectName, gvg.Id, nil, newExpectChecksum)
msgSealObject.SecondarySpBlsAggSignatures = aggBlsSig
s.T().Logf("msgSealObject %s", msgSealObject.String())
s.SendTxBlock(sp.SealKey, msgSealObject)
}
2 changes: 1 addition & 1 deletion e2e/tests/virtualgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (s *VirtualGroupTestSuite) TestBasic() {
availableGvgFamilyIds := s.queryAvailableGlobalVirtualGroupFamilies([]uint32{gvg.FamilyId})
s.Require().Equal(availableGvgFamilyIds[0], gvg.FamilyId)
spAvailableGvgFamilyIds := s.querySpAvailableGlobalVirtualGroupFamilies(primarySP.Info.Id)
s.Require().Equal(spAvailableGvgFamilyIds[0], gvg.FamilyId)
s.Require().Contains(spAvailableGvgFamilyIds, gvg.FamilyId)

spOptimalGvgFamilyId := s.querySpOptimalGlobalVirtualGroupFamily(primarySP.Info.Id, virtualgroupmoduletypes.Strategy_Maximize_Free_Store_Size)
s.Require().Equal(spOptimalGvgFamilyId, gvg.FamilyId)
Expand Down
4 changes: 4 additions & 0 deletions proto/greenfield/storage/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ message EventSealObject {
uint32 global_virtual_group_id = 7;
// local_virtual_group_id defines the unique id of lvg which the object stored
uint32 local_virtual_group_id = 8;
// checksums define the total checksums of the object which generated by redundancy
// SP might set the checksum of object if it was delegated created by SP, which checksum
// will not be available until sealing object.
repeated bytes checksums = 9;
}

// EventCopyObject is emitted on MsgCopyObject
Expand Down
93 changes: 93 additions & 0 deletions proto/greenfield/storage/tx.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ service Msg {
rpc UpdateBucketInfo(MsgUpdateBucketInfo) returns (MsgUpdateBucketInfoResponse);
rpc MirrorBucket(MsgMirrorBucket) returns (MsgMirrorBucketResponse);
rpc DiscontinueBucket(MsgDiscontinueBucket) returns (MsgDiscontinueBucketResponse);
rpc ToggleSPAsDelegatedAgent(MsgToggleSPAsDelegatedAgent) returns (MsgToggleSPAsDelegatedAgentResponse);

// basic operation of object
rpc CreateObject(MsgCreateObject) returns (MsgCreateObjectResponse);
rpc SealObject(MsgSealObject) returns (MsgSealObjectResponse);
rpc SealObjectV2(MsgSealObjectV2) returns (MsgSealObjectV2Response);
rpc RejectSealObject(MsgRejectSealObject) returns (MsgRejectSealObjectResponse);
rpc CopyObject(MsgCopyObject) returns (MsgCopyObjectResponse);
rpc DeleteObject(MsgDeleteObject) returns (MsgDeleteObjectResponse);
Expand All @@ -36,6 +38,8 @@ service Msg {
rpc UpdateObjectInfo(MsgUpdateObjectInfo) returns (MsgUpdateObjectInfoResponse);
rpc UpdateObjectContent(MsgUpdateObjectContent) returns (MsgUpdateObjectContentResponse);
rpc CancelUpdateObjectContent(MsgCancelUpdateObjectContent) returns (MsgCancelUpdateObjectContentResponse);
rpc DelegateCreateObject(MsgDelegateCreateObject) returns (MsgDelegateCreateObjectResponse);
rpc DelegateUpdateObjectContent(MsgDelegateUpdateObjectContent) returns (MsgDelegateUpdateObjectContentResponse);

// basic operation of group
rpc CreateGroup(MsgCreateGroup) returns (MsgCreateGroupResponse);
Expand Down Expand Up @@ -190,6 +194,33 @@ message MsgSealObject {

message MsgSealObjectResponse {}

message MsgSealObjectV2 {
option (cosmos.msg.v1.signer) = "operator";

// operator defines the account address of primary SP
string operator = 1 [(cosmos_proto.scalar) = "cosmos.AddressString"];

// bucket_name defines the name of the bucket where the object is stored.
string bucket_name = 2;

// object_name defines the name of object to be sealed.
string object_name = 3;

// global_virtual_group_id defines the id of global virtual group
uint32 global_virtual_group_id = 4;

// secondary_sp_bls_agg_signatures defines the aggregate bls signature of the secondary sp that can
// acknowledge that the payload data has received and stored.
bytes secondary_sp_bls_agg_signatures = 5;

// (optional) checksums define the total checksums of the object which generated by redundancy
// SP might set the checksum of object if it was delegated created by SP, which checksum
// will not be available until sealing object.
repeated bytes expect_checksums = 6;
}

message MsgSealObjectV2Response {}

message MsgRejectSealObject {
option (cosmos.msg.v1.signer) = "operator";

Expand Down Expand Up @@ -679,3 +710,65 @@ message MsgCancelUpdateObjectContent {
}

message MsgCancelUpdateObjectContentResponse {}

message MsgDelegateCreateObject {
option (cosmos.msg.v1.signer) = "operator";
// operator defines the account address of the operator, it is the delegated agent that allows to creat object under bucket.
string operator = 1 [(cosmos_proto.scalar) = "cosmos.AddressString"];
// creator defines the account address of the object creator.
string creator = 2 [(cosmos_proto.scalar) = "cosmos.AddressString"];
// bucket_name defines the name of the bucket where the object is stored.
string bucket_name = 3;
// object_name defines the name of object
string object_name = 4;
// payload_size defines size of the object's payload
uint64 payload_size = 5;
// content_type define the format of the object which should be a standard MIME type.
string content_type = 6;
// visibility means the object is private or public. if private, only object owner or grantee can access it,
// otherwise every greenfield user can access it.
VisibilityType visibility = 7;
// expect_checksums defines a list of hashes which was generate by redundancy algorithm.
repeated bytes expect_checksums = 8;
// redundancy_type can be ec or replica
RedundancyType redundancy_type = 9;
}

message MsgDelegateCreateObjectResponse {
string object_id = 1 [
(cosmos_proto.scalar) = "cosmos.Uint",
(gogoproto.customtype) = "Uint",
(gogoproto.nullable) = false
];
}

message MsgDelegateUpdateObjectContent {
option (cosmos.msg.v1.signer) = "operator";
// operator defines the account address of the operator, it is the delegated agent that allows to creat object under bucket.
string operator = 1 [(cosmos_proto.scalar) = "cosmos.AddressString"];
// updater defines the account address of the object updater.
string updater = 2 [(cosmos_proto.scalar) = "cosmos.AddressString"];
// bucket_name defines the name of the bucket where the object is stored.
string bucket_name = 3;
// object_name defines the name of object
string object_name = 4;
// payload_size defines size of the object's payload
uint64 payload_size = 5;
// content_type define the format of the object which should be a standard MIME type.
string content_type = 6;
// expect_checksums defines a list of hashes which was generate by redundancy algorithm.
repeated bytes expect_checksums = 7;
}

message MsgDelegateUpdateObjectContentResponse {}

message MsgToggleSPAsDelegatedAgent {
option (cosmos.msg.v1.signer) = "operator";

// operator defines the account address of the operator, only the bucket owner can send the tx.
string operator = 1 [(cosmos_proto.scalar) = "cosmos.AddressString"];
// bucket_name defines the name of the bucket.
string bucket_name = 2;
}

message MsgToggleSPAsDelegatedAgentResponse {}
3 changes: 3 additions & 0 deletions proto/greenfield/storage/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ message BucketInfo {
BucketStatus bucket_status = 10;
// tags defines a list of tags the bucket has
ResourceTags tags = 11;
// sp_as_delegated_agent_disabled indicates that whether bucket owner disable SP as the upload agent.
// when a bucket is created, by default, this is false, means SP is allowed to create object for delegator
bool sp_as_delegated_agent_disabled = 12;
}

message InternalBucketInfo {
Expand Down
Loading

0 comments on commit df143c6

Please sign in to comment.