Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow SP to create/update object for delegator #581

Merged
merged 11 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions app/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ func (app *App) registerSerengetiUpgradeHandler() {
func(ctx sdk.Context, plan upgradetypes.Plan, fromVM module.VersionMap) (module.VersionMap, error) {
app.Logger().Info("upgrade to ", plan.Name)
app.VirtualgroupKeeper.MigrateGlobalVirtualGroupFamiliesForSP(ctx)
app.GashubKeeper.SetMsgGasParams(ctx, *gashubtypes.NewMsgGasParamsWithFixedGas(sdk.MsgTypeURL(&storagemoduletypes.MsgToggleSPAsDelegatedAgent{}), 1.2e3))
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
app.GashubKeeper.SetMsgGasParams(ctx, *gashubtypes.NewMsgGasParamsWithFixedGas(sdk.MsgTypeURL(&storagemoduletypes.MsgDelegateCreateObject{}), 1.2e3))
app.GashubKeeper.SetMsgGasParams(ctx, *gashubtypes.NewMsgGasParamsWithFixedGas(sdk.MsgTypeURL(&storagemoduletypes.MsgDelegateUpdateObjectContent{}), 1.2e3))
app.GashubKeeper.SetMsgGasParams(ctx, *gashubtypes.NewMsgGasParamsWithFixedGas(sdk.MsgTypeURL(&storagemoduletypes.MsgSealObjectV2{}), 1.2e2))
return app.mm.RunMigrations(ctx, app.configurator, fromVM)
})

Expand Down
6 changes: 4 additions & 2 deletions e2e/tests/permission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1115,7 +1115,8 @@ func (s *StorageTestSuite) TestStalePermissionForAccountGC() {

// bucket and object dont exist after deletion
headObjectReq := storagetypes.QueryHeadObjectRequest{
BucketName: objectName,
BucketName: bucketName,
ObjectName: objectName,
}
_, err = s.Client.HeadObject(ctx, &headObjectReq)
s.Require().Error(err)
Expand Down Expand Up @@ -1331,7 +1332,8 @@ func (s *StorageTestSuite) TestStalePermissionForGroupGC() {

// bucket and object dont exist after deletion
headObjectReq := storagetypes.QueryHeadObjectRequest{
BucketName: objectName,
BucketName: bucketName,
ObjectName: objectName,
}
_, err = s.Client.HeadObject(ctx, &headObjectReq)
s.Require().Error(err)
Expand Down
155 changes: 155 additions & 0 deletions e2e/tests/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2407,3 +2407,158 @@ func (s *StorageTestSuite) TestDeleteCreateObject_InCreatedStatus() {
_, err = s.Client.HeadObject(ctx, &queryHeadObjectRequest)
s.Require().EqualError(err, "rpc error: code = Unknown desc = No such object: unknown request")
}

func (s *StorageTestSuite) TestToggleBucketSpAsDelegatedAgents() {
var err error
// CreateBucket
sp := s.BaseSuite.PickStorageProvider()
gvg, found := sp.GetFirstGlobalVirtualGroup()
s.Require().True(found)
user := s.GenAndChargeAccounts(1, 1000000)[0]
bucketName := storageutils.GenRandomBucketName()
msgCreateBucket := storagetypes.NewMsgCreateBucket(
user.GetAddr(), bucketName, storagetypes.VISIBILITY_TYPE_PRIVATE, sp.OperatorKey.GetAddr(),
nil, math.MaxUint, nil, 0)
msgCreateBucket.PrimarySpApproval.GlobalVirtualGroupFamilyId = gvg.FamilyId
msgCreateBucket.PrimarySpApproval.Sig, err = sp.ApprovalKey.Sign(msgCreateBucket.GetApprovalBytes())
s.Require().NoError(err)
s.SendTxBlock(user, msgCreateBucket)

queryHeadBucketRequest := storagetypes.QueryHeadBucketRequest{
BucketName: bucketName,
}
ctx := context.Background()
queryHeadBucketResponse, err := s.Client.HeadBucket(ctx, &queryHeadBucketRequest)
s.Require().NoError(err)
s.Require().Equal(false, queryHeadBucketResponse.BucketInfo.SpAsDelegatedAgentDisabled)

MsgToggleSPAsDelegatedAgent := storagetypes.NewMsgToggleSPAsDelegatedAgent(
user.GetAddr(),
bucketName)
s.SendTxBlock(user, MsgToggleSPAsDelegatedAgent)

// HeadBucket
queryHeadBucketResponse, err = s.Client.HeadBucket(ctx, &queryHeadBucketRequest)
s.Require().NoError(err)
s.Require().Equal(true, queryHeadBucketResponse.BucketInfo.SpAsDelegatedAgentDisabled)
}

func (s *StorageTestSuite) TestCreateObjectByDelegatedAgents() {
var err error
ctx := context.Background()

// CreateBucket
sp := s.BaseSuite.PickStorageProvider()
gvg, found := sp.GetFirstGlobalVirtualGroup()
s.Require().True(found)

bucketOwner := s.GenAndChargeAccounts(1, 1000000)[0]
bucketName := storageutils.GenRandomBucketName()
objectName := storageutils.GenRandomObjectName()

msgCreateBucket := storagetypes.NewMsgCreateBucket(
bucketOwner.GetAddr(), bucketName, storagetypes.VISIBILITY_TYPE_PRIVATE, sp.OperatorKey.GetAddr(),
nil, math.MaxUint, nil, 0)
msgCreateBucket.PrimarySpApproval.GlobalVirtualGroupFamilyId = gvg.FamilyId
msgCreateBucket.PrimarySpApproval.Sig, err = sp.ApprovalKey.Sign(msgCreateBucket.GetApprovalBytes())
s.Require().NoError(err)

s.SendTxBlock(bucketOwner, msgCreateBucket)

// HeadBucket
queryHeadBucketRequest := storagetypes.QueryHeadBucketRequest{
BucketName: bucketName,
}
queryHeadBucketResponse, err := s.Client.HeadBucket(ctx, &queryHeadBucketRequest)
s.Require().NoError(err)
s.Require().Equal(false, queryHeadBucketResponse.BucketInfo.SpAsDelegatedAgentDisabled)

// DelegateCreate for user2, who does not have permission
var buffer bytes.Buffer
// Create 1MiB content where each line contains 1024 characters.
for i := 0; i < 1024; i++ {
buffer.WriteString(fmt.Sprintf("[%05d] %s\n", i, line))
}
payloadSize := buffer.Len()
contextType := "text/event-stream"
msgDelegateCreateObject := storagetypes.NewMsgDelegateCreateObject(
sp.OperatorKey.GetAddr(),
bucketOwner.GetAddr(),
bucketName,
objectName,
uint64(payloadSize),
storagetypes.VISIBILITY_TYPE_PRIVATE,
nil,
contextType,
storagetypes.REDUNDANCY_EC_TYPE)
s.SendTxBlock(sp.OperatorKey, msgDelegateCreateObject)

headObjectReq := storagetypes.QueryHeadObjectRequest{
BucketName: bucketName,
ObjectName: objectName,
}
headObjectResp, err := s.Client.HeadObject(ctx, &headObjectReq)
s.Require().NoError(err)
s.Require().Equal(objectName, headObjectResp.ObjectInfo.ObjectName)
s.Require().Equal(bucketOwner.GetAddr().String(), headObjectResp.ObjectInfo.Owner)
s.Require().Equal(0, len(headObjectResp.ObjectInfo.Checksums))

// SP seal object, and update the object checksum
checksum := sdk.Keccak256(buffer.Bytes())
expectChecksum := [][]byte{checksum, checksum, checksum, checksum, checksum, checksum, checksum}

gvgId := gvg.Id
msgSealObject := storagetypes.NewMsgSealObjectV2(sp.SealKey.GetAddr(), bucketName, objectName, gvg.Id, nil, expectChecksum)
secondarySigs := make([][]byte, 0)
secondarySPBlsPubKeys := make([]bls.PublicKey, 0)
blsSignHash := storagetypes.NewSecondarySpSealObjectSignDoc(s.GetChainID(), gvgId, headObjectResp.ObjectInfo.Id, storagetypes.GenerateHash(expectChecksum[:])).GetBlsSignHash()
// every secondary sp signs the checksums
for _, spID := range gvg.SecondarySpIds {
sig, err := core.BlsSignAndVerify(s.StorageProviders[spID], blsSignHash)
s.Require().NoError(err)
secondarySigs = append(secondarySigs, sig)
pk, err := bls.PublicKeyFromBytes(s.StorageProviders[spID].BlsKey.PubKey().Bytes())
s.Require().NoError(err)
secondarySPBlsPubKeys = append(secondarySPBlsPubKeys, pk)
}
aggBlsSig, err := core.BlsAggregateAndVerify(secondarySPBlsPubKeys, blsSignHash, secondarySigs)
s.Require().NoError(err)
msgSealObject.SecondarySpBlsAggSignatures = aggBlsSig
s.T().Logf("msg %s", msgSealObject.String())
s.SendTxBlock(sp.SealKey, msgSealObject)

headObjectResp, err = s.Client.HeadObject(ctx, &headObjectReq)
s.Require().NoError(err)
s.Require().Equal(objectName, headObjectResp.ObjectInfo.ObjectName)
s.Require().Equal(bucketOwner.GetAddr().String(), headObjectResp.ObjectInfo.Owner)
s.Require().Equal(expectChecksum, headObjectResp.ObjectInfo.Checksums)

// delegate update
var newBuffer bytes.Buffer
for i := 0; i < 2048; i++ {
newBuffer.WriteString(fmt.Sprintf("[%05d] %s\n", i, line))
}
newPayloadSize := uint64(newBuffer.Len())
newChecksum := sdk.Keccak256(newBuffer.Bytes())
newExpectChecksum := [][]byte{newChecksum, newChecksum, newChecksum, newChecksum, newChecksum, newChecksum, newChecksum}

msgUpdateObject := storagetypes.NewMsgDelegateUpdateObjectContent(sp.OperatorKey.GetAddr(),
bucketOwner.GetAddr(), bucketName, objectName, newPayloadSize, nil)
s.SendTxBlock(sp.OperatorKey, msgUpdateObject)
s.T().Logf("msgUpdateObject %s", msgUpdateObject.String())

// every secondary sp signs the checksums
newSecondarySigs := make([][]byte, 0)
newBlsSignHash := storagetypes.NewSecondarySpSealObjectSignDoc(s.GetChainID(), gvgId, headObjectResp.ObjectInfo.Id, storagetypes.GenerateHash(newExpectChecksum[:])).GetBlsSignHash()
for _, spID := range gvg.SecondarySpIds {
sig, err := core.BlsSignAndVerify(s.StorageProviders[spID], newBlsSignHash)
s.Require().NoError(err)
newSecondarySigs = append(newSecondarySigs, sig)
}
aggBlsSig, err = core.BlsAggregateAndVerify(secondarySPBlsPubKeys, newBlsSignHash, newSecondarySigs)
s.Require().NoError(err)
msgSealObject = storagetypes.NewMsgSealObjectV2(sp.SealKey.GetAddr(), bucketName, objectName, gvg.Id, nil, newExpectChecksum)
msgSealObject.SecondarySpBlsAggSignatures = aggBlsSig
s.T().Logf("msgSealObject %s", msgSealObject.String())
s.SendTxBlock(sp.SealKey, msgSealObject)
}
2 changes: 1 addition & 1 deletion e2e/tests/virtualgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (s *VirtualGroupTestSuite) TestBasic() {
availableGvgFamilyIds := s.queryAvailableGlobalVirtualGroupFamilies([]uint32{gvg.FamilyId})
s.Require().Equal(availableGvgFamilyIds[0], gvg.FamilyId)
spAvailableGvgFamilyIds := s.querySpAvailableGlobalVirtualGroupFamilies(primarySP.Info.Id)
s.Require().Equal(spAvailableGvgFamilyIds[0], gvg.FamilyId)
s.Require().Contains(spAvailableGvgFamilyIds, gvg.FamilyId)

spOptimalGvgFamilyId := s.querySpOptimalGlobalVirtualGroupFamily(primarySP.Info.Id, virtualgroupmoduletypes.Strategy_Maximize_Free_Store_Size)
s.Require().Equal(spOptimalGvgFamilyId, gvg.FamilyId)
Expand Down
4 changes: 4 additions & 0 deletions proto/greenfield/storage/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ message EventSealObject {
uint32 global_virtual_group_id = 7;
// local_virtual_group_id defines the unique id of lvg which the object stored
uint32 local_virtual_group_id = 8;
// checksums define the total checksums of the object which generated by redundancy
// SP might set the checksum of object if it was delegated created by SP, which checksum
// will not be available until sealing object.
repeated bytes checksums = 9;
alexgao001 marked this conversation as resolved.
Show resolved Hide resolved
}

// EventCopyObject is emitted on MsgCopyObject
Expand Down
93 changes: 93 additions & 0 deletions proto/greenfield/storage/tx.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ service Msg {
rpc UpdateBucketInfo(MsgUpdateBucketInfo) returns (MsgUpdateBucketInfoResponse);
rpc MirrorBucket(MsgMirrorBucket) returns (MsgMirrorBucketResponse);
rpc DiscontinueBucket(MsgDiscontinueBucket) returns (MsgDiscontinueBucketResponse);
rpc ToggleSPAsDelegatedAgent(MsgToggleSPAsDelegatedAgent) returns (MsgToggleSPAsDelegatedAgentResponse);

// basic operation of object
rpc CreateObject(MsgCreateObject) returns (MsgCreateObjectResponse);
rpc SealObject(MsgSealObject) returns (MsgSealObjectResponse);
rpc SealObjectV2(MsgSealObjectV2) returns (MsgSealObjectV2Response);
rpc RejectSealObject(MsgRejectSealObject) returns (MsgRejectSealObjectResponse);
rpc CopyObject(MsgCopyObject) returns (MsgCopyObjectResponse);
rpc DeleteObject(MsgDeleteObject) returns (MsgDeleteObjectResponse);
Expand All @@ -36,6 +38,8 @@ service Msg {
rpc UpdateObjectInfo(MsgUpdateObjectInfo) returns (MsgUpdateObjectInfoResponse);
rpc UpdateObjectContent(MsgUpdateObjectContent) returns (MsgUpdateObjectContentResponse);
rpc CancelUpdateObjectContent(MsgCancelUpdateObjectContent) returns (MsgCancelUpdateObjectContentResponse);
rpc DelegateCreateObject(MsgDelegateCreateObject) returns (MsgDelegateCreateObjectResponse);
rpc DelegateUpdateObjectContent(MsgDelegateUpdateObjectContent) returns (MsgDelegateUpdateObjectContentResponse);

// basic operation of group
rpc CreateGroup(MsgCreateGroup) returns (MsgCreateGroupResponse);
Expand Down Expand Up @@ -190,6 +194,33 @@ message MsgSealObject {

message MsgSealObjectResponse {}

message MsgSealObjectV2 {
option (cosmos.msg.v1.signer) = "operator";

// operator defines the account address of primary SP
string operator = 1 [(cosmos_proto.scalar) = "cosmos.AddressString"];

// bucket_name defines the name of the bucket where the object is stored.
string bucket_name = 2;

// object_name defines the name of object to be sealed.
string object_name = 3;

// global_virtual_group_id defines the id of global virtual group
uint32 global_virtual_group_id = 4;

// secondary_sp_bls_agg_signatures defines the aggregate bls signature of the secondary sp that can
// acknowledge that the payload data has received and stored.
bytes secondary_sp_bls_agg_signatures = 5;

// (optional) checksums define the total checksums of the object which generated by redundancy
// SP might set the checksum of object if it was delegated created by SP, which checksum
// will not be available until sealing object.
repeated bytes expect_checksums = 6;
}

message MsgSealObjectV2Response {}

message MsgRejectSealObject {
option (cosmos.msg.v1.signer) = "operator";

Expand Down Expand Up @@ -679,3 +710,65 @@ message MsgCancelUpdateObjectContent {
}

message MsgCancelUpdateObjectContentResponse {}

message MsgDelegateCreateObject {
option (cosmos.msg.v1.signer) = "operator";
// operator defines the account address of the operator, it is the delegated agent that allows to creat object under bucket.
string operator = 1 [(cosmos_proto.scalar) = "cosmos.AddressString"];
// creator defines the account address of the object creator.
string creator = 2 [(cosmos_proto.scalar) = "cosmos.AddressString"];
// bucket_name defines the name of the bucket where the object is stored.
string bucket_name = 3;
// object_name defines the name of object
string object_name = 4;
// payload_size defines size of the object's payload
uint64 payload_size = 5;
// content_type define the format of the object which should be a standard MIME type.
string content_type = 6;
// visibility means the object is private or public. if private, only object owner or grantee can access it,
// otherwise every greenfield user can access it.
VisibilityType visibility = 7;
// expect_checksums defines a list of hashes which was generate by redundancy algorithm.
repeated bytes expect_checksums = 8;
// redundancy_type can be ec or replica
RedundancyType redundancy_type = 9;
}

message MsgDelegateCreateObjectResponse {
string object_id = 1 [
(cosmos_proto.scalar) = "cosmos.Uint",
(gogoproto.customtype) = "Uint",
(gogoproto.nullable) = false
];
}

message MsgDelegateUpdateObjectContent {
option (cosmos.msg.v1.signer) = "operator";
// operator defines the account address of the operator, it is the delegated agent that allows to creat object under bucket.
string operator = 1 [(cosmos_proto.scalar) = "cosmos.AddressString"];
// updater defines the account address of the object updater.
string updater = 2 [(cosmos_proto.scalar) = "cosmos.AddressString"];
// bucket_name defines the name of the bucket where the object is stored.
string bucket_name = 3;
// object_name defines the name of object
string object_name = 4;
// payload_size defines size of the object's payload
uint64 payload_size = 5;
// content_type define the format of the object which should be a standard MIME type.
string content_type = 6;
// expect_checksums defines a list of hashes which was generate by redundancy algorithm.
repeated bytes expect_checksums = 7;
}

message MsgDelegateUpdateObjectContentResponse {}

message MsgToggleSPAsDelegatedAgent {
option (cosmos.msg.v1.signer) = "operator";

// operator defines the account address of the operator, only the bucket owner can send the tx.
string operator = 1 [(cosmos_proto.scalar) = "cosmos.AddressString"];
// bucket_name defines the name of the bucket.
string bucket_name = 2;
}

message MsgToggleSPAsDelegatedAgentResponse {}
3 changes: 3 additions & 0 deletions proto/greenfield/storage/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ message BucketInfo {
BucketStatus bucket_status = 10;
// tags defines a list of tags the bucket has
ResourceTags tags = 11;
// sp_as_delegated_agent_disabled indicates that whether bucket owner disable SP as the upload agent.
// when a bucket is created, by default, this is false, means SP is allowed to create object for delegator
bool sp_as_delegated_agent_disabled = 12;
}

message InternalBucketInfo {
Expand Down
Loading
Loading