Skip to content

Commit

Permalink
Fixes #1359 - add setownership and setblockpublicaccess commands + re…
Browse files Browse the repository at this point in the history
…lated entries in info

New commands:
- setownership: Allows to set the Object Ownership Control configuration
for a bucket.
- setblockpublicaccess: Allows to enable/disable Bucket Block Public
Access rules
  • Loading branch information
fviard committed Dec 11, 2023
1 parent a2b1bdf commit 32278a0
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 9 deletions.
85 changes: 78 additions & 7 deletions S3/S3.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,11 @@ def bucket_create(self, bucket, bucket_location = None, extra_headers = None):
check_bucket_name(bucket, dns_strict = False)
if self.config.acl_public:
headers["x-amz-acl"] = "public-read"
# AWS suddenly changed the default "ownership" control value mid 2023.
# ACL are disabled by default, so obviously the bucket can't be public.
# See: https://aws.amazon.com/fr/blogs/aws/heads-up-amazon-s3-security-changes-are-coming-in-april-of-2023/
# To be noted: "Block Public Access" flags should also be disabled after the bucket creation to be able to set a "public" acl for an object.
headers["x-amz-object-ownership"] = 'ObjectWriter'

request = self.create_request("BUCKET_CREATE", bucket = bucket, headers = headers, body = body)
response = self.send_request(request)
Expand Down Expand Up @@ -467,22 +472,88 @@ def get_bucket_requester_pays(self, uri):
response = self.send_request(request)
resp_data = response.get('data', '')
if resp_data:
payer = getTextFromXml(response['data'], "Payer")
payer = getTextFromXml(resp_data, "Payer")
else:
payer = None
return payer

def set_bucket_ownership(self, uri, ownership):
headers = SortedDict(ignore_case=True)
body = '<OwnershipControls xmlns="http://s3.amazonaws.com/doc/2006-03-01/">' \
'<Rule>' \
'<ObjectOwnership>%s</ObjectOwnership>' \
'</Rule>' \
'</OwnershipControls>'
body = body % ownership
debug(u"set_bucket_ownership(%s)" % body)
headers['content-md5'] = generate_content_md5(body)
request = self.create_request("BUCKET_CREATE", uri = uri,
headers = headers, body = body,
uri_params = {'ownershipControls': None})
response = self.send_request(request)
return response

def get_bucket_ownership(self, uri):
request = self.create_request("BUCKET_LIST", bucket=uri.bucket(),
uri_params={'ownershipControls': None})
response = self.send_request(request)
resp_data = response.get('data', '')
if resp_data:
ownership = getTextFromXml(resp_data, ".//Rule//ObjectOwnership")
else:
ownership = None
return ownership

def set_bucket_public_access_block(self, uri, flags):
headers = SortedDict(ignore_case=True)

body = '<PublicAccessBlockConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">'
for tag in ('BlockPublicAcls', 'IgnorePublicAcls', 'BlockPublicPolicy', 'RestrictPublicBuckets'):
val = flags.get(tag, False) and "true" or "false"
body += '<%s>%s</%s>' % (tag, val, tag)
body += '</PublicAccessBlockConfiguration>'

debug(u"set_bucket_public_access_block(%s)" % body)
headers['content-md5'] = generate_content_md5(body)
request = self.create_request("BUCKET_CREATE", uri = uri,
headers = headers, body = body,
uri_params = {'publicAccessBlock': None})
response = self.send_request(request)
return response

def get_bucket_public_access_block(self, uri):
request = self.create_request("BUCKET_LIST", bucket=uri.bucket(),
uri_params={'publicAccessBlock': None})
response = self.send_request(request)
resp_data = response.get('data', '')
if resp_data:
flags = {
"BlockPublicAcls": getTextFromXml(resp_data, "BlockPublicAcls") == "true",
"IgnorePublicAcls": getTextFromXml(resp_data, "IgnorePublicAcls") == "true",
"BlockPublicPolicy": getTextFromXml(resp_data, "BlockPublicPolicy") == "true",
"RestrictPublicBuckets": getTextFromXml(resp_data, "RestrictPublicBuckets") == "true",
}
else:
flags = {}
return flags

def bucket_info(self, uri):
response = {}
response['bucket-location'] = self.get_bucket_location(uri)

for key, func in (('requester-pays', self.get_bucket_requester_pays),
('versioning', self.get_versioning),
('ownership', self.get_bucket_ownership)):
try:
response[key] = func(uri)
except S3Error as e:
response[key] = None

try:
response['requester-pays'] = self.get_bucket_requester_pays(uri)
except S3Error as e:
response['requester-pays'] = None
try:
response['versioning'] = self.get_versioning(uri)
response['public-access-block'] = self.get_bucket_public_access_block(uri)
except S3Error as e:
response['versioning'] = None
response['public-access-block'] = {}

return response

def website_info(self, uri, bucket_location = None):
Expand Down
12 changes: 11 additions & 1 deletion run-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,6 @@ def pbucket(tail):
must_find = "Bucket '%s/' created" % pbucket(1))



## ====== Create multiple buckets
test_s3cmd("Create multiple buckets", ['mb', pbucket(2), pbucket(3)],
must_find = [ "Bucket '%s/' created" % pbucket(2), "Bucket '%s/' created" % pbucket(3)])
Expand All @@ -406,6 +405,17 @@ def pbucket(tail):
must_not_find_re = "Bucket.*created")


## ====== Enalbe ACLs and public access to buckets

Check failure on line 408 in run-tests.py

View workflow job for this annotation

GitHub Actions / Check for spelling errors

Enalbe ==> Enable
for idx, bpath in enumerate((pbucket(1), pbucket(2), pbucket(3))):
test_s3cmd("Enable ACLs for bucket %d" % idx, ['setownership', bpath, 'ObjectWriter'],
must_find = "%s/: Bucket Object Ownership updated" % bpath,
skip_if_profile = ['minio'])

test_s3cmd("Disable Block Public Access for bucket %d" % idx, ['setblockpublicaccess', bpath, ''],
must_find = "%s/: Block Public Access updated" % bpath,
skip_if_profile = ['minio'])


## ====== Buckets list
test_s3cmd("Buckets list", ["ls"],
must_find = [ pbucket(1), pbucket(2), pbucket(3) ], must_not_find_re = pbucket('EU'))
Expand Down
64 changes: 63 additions & 1 deletion s3cmd
Original file line number Diff line number Diff line change
Expand Up @@ -1084,8 +1084,11 @@ def cmd_info(args):
or 'none'))
output(u" Payer: %s" % (info['requester-pays']
or 'none'))
output(u" Ownership: %s" % (info['ownership']
or 'none'))
output(u" Versioning:%s" % (info['versioning']
or 'none'))

expiration = s3.expiration_info(uri, cfg.bucket_location)
if expiration and expiration['prefix'] is not None:
expiration_desc = "Expiration Rule: "
Expand All @@ -1100,7 +1103,14 @@ def cmd_info(args):
expiration_desc += expiration['date'] + "' "
output(u" %s" % expiration_desc)
else:
output(u" Expiration Rule: none")
output(u" Expiration rule: none")

public_access_block = ','.join([
key for key, val in info['public-access-block'].items()
if val
])
output(u" Block Public Access: %s" % (public_access_block
or 'none'))

try:
policy = s3.get_policy(uri)
Expand Down Expand Up @@ -2260,6 +2270,56 @@ def cmd_setversioning(args):
output(u"%s: Versioning status updated" % bucket_uri)
return EX_OK

def cmd_setownership(args):
cfg = Config()
s3 = S3(cfg)
bucket_uri = S3Uri(args[0])
if bucket_uri.object():
raise ParameterError("Only bucket name is required for [setownership] command")

valid_values = {x.lower():x for x in [
'BucketOwnerPreferred', 'BucketOwnerEnforced', 'ObjectWriter'
]}
value = valid_values.get(args[1].lower())
if not value:
choices = " or ".join(['%s' % x for x in valid_values.keys()])
raise ParameterError("Must be %s. Got: %s" % (choices, args[1]))

response = s3.set_bucket_ownership(bucket_uri, value)

debug(u"response - %s" % response['status'])
if response['status'] == 200:
output(u"%s: Bucket Object Ownership updated" % bucket_uri)
return EX_OK

def cmd_setblockpublicaccess(args):
cfg = Config()
s3 = S3(cfg)
bucket_uri = S3Uri(args[0])
if bucket_uri.object():
raise ParameterError("Only bucket name is required for [setblockpublicaccess] command")

valid_values = {x.lower():x for x in [
'BlockPublicAcls', 'IgnorePublicAcls', 'BlockPublicPolicy', 'RestrictPublicBuckets'
]}
flags = {}
raw_flags = args[1].split(',')
for raw_value in raw_flags:
if not raw_value:
continue
value = valid_values.get(raw_value.lower())
if not value:
choices = " or ".join(['%s' % x for x in valid_values.keys()])
raise ParameterError("Must be %s. Got: %s" % (choices, raw_value))
flags[value] = True

response = s3.set_bucket_public_access_block(bucket_uri, flags)

debug(u"response - %s" % response['status'])
if response['status'] == 200:
output(u"%s: Block Public Access updated" % bucket_uri)
return EX_OK

def cmd_setpolicy(args):
cfg = Config()
s3 = S3(cfg)
Expand Down Expand Up @@ -2865,6 +2925,8 @@ def get_commands_list():
{"cmd":"mv", "label":"Move object", "param":"s3://BUCKET1/OBJECT1 s3://BUCKET2[/OBJECT2]", "func":cmd_mv, "argc":2},
{"cmd":"setacl", "label":"Modify Access control list for Bucket or Files", "param":"s3://BUCKET[/OBJECT]", "func":cmd_setacl, "argc":1},
{"cmd":"setversioning", "label":"Modify Bucket Versioning", "param":"s3://BUCKET enable|disable", "func":cmd_setversioning, "argc":2},
{"cmd":"setownership", "label":"Modify Bucket Object Ownership", "param":"s3://BUCKET BucketOwnerPreferred|BucketOwnerEnforced|ObjectWriter", "func":cmd_setownership, "argc":2},
{"cmd":"setblockpublicaccess", "label":"Modify Block Public Access rules", "param":"s3://BUCKET BlockPublicAcls,IgnorePublicAcls,BlockPublicPolicy,RestrictPublicBuckets", "func":cmd_setblockpublicaccess, "argc":2},

{"cmd":"setobjectlegalhold", "label":"Modify Object Legal Hold", "param":"STATUS s3://BUCKET/OBJECT", "func":cmd_setobjectlegalhold, "argc":2},
{"cmd":"setobjectretention", "label":"Modify Object Retention", "param":"MODE RETAIN_UNTIL_DATE s3://BUCKET/OBJECT", "func":cmd_setobjectretention, "argc":3},
Expand Down

0 comments on commit 32278a0

Please sign in to comment.