diff --git a/src/frostfs_testlib/resources/error_patterns.py b/src/frostfs_testlib/resources/error_patterns.py index e2e4c48..e92b33d 100644 --- a/src/frostfs_testlib/resources/error_patterns.py +++ b/src/frostfs_testlib/resources/error_patterns.py @@ -23,6 +23,5 @@ INVALID_RANGE_OVERFLOW = "invalid '{range}' range: uint64 overflow" INVALID_OFFSET_SPECIFIER = "invalid '{range}' range offset specifier" INVALID_LENGTH_SPECIFIER = "invalid '{range}' range length specifier" -S3_MALFORMED_XML_REQUEST = ( - "The XML you provided was not well-formed or did not validate against our published schema." -) +S3_BUCKET_DOES_NOT_ALLOW_ACL = "The bucket does not allow ACLs" +S3_MALFORMED_XML_REQUEST = "The XML you provided was not well-formed or did not validate against our published schema." diff --git a/src/frostfs_testlib/resources/s3_acl_grants.py b/src/frostfs_testlib/resources/s3_acl_grants.py new file mode 100644 index 0000000..37005e8 --- /dev/null +++ b/src/frostfs_testlib/resources/s3_acl_grants.py @@ -0,0 +1,9 @@ +ALL_USERS_GROUP_URI = "http://acs.amazonaws.com/groups/global/AllUsers" +ALL_USERS_GROUP_WRITE_GRANT = {"Grantee": {"Type": "Group", "URI": ALL_USERS_GROUP_URI}, "Permission": "WRITE"} +ALL_USERS_GROUP_READ_GRANT = {"Grantee": {"Type": "Group", "URI": ALL_USERS_GROUP_URI}, "Permission": "READ"} +CANONICAL_USER_FULL_CONTROL_GRANT = {"Grantee": {"Type": "CanonicalUser"}, "Permission": "FULL_CONTROL"} + +# https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html#canned-acl +PRIVATE_GRANTS = [CANONICAL_USER_FULL_CONTROL_GRANT] +PUBLIC_READ_GRANTS = [CANONICAL_USER_FULL_CONTROL_GRANT, ALL_USERS_GROUP_READ_GRANT] +PUBLIC_READ_WRITE_GRANTS = [CANONICAL_USER_FULL_CONTROL_GRANT, ALL_USERS_GROUP_WRITE_GRANT, ALL_USERS_GROUP_READ_GRANT] diff --git a/src/frostfs_testlib/steps/s3/s3_helper.py b/src/frostfs_testlib/steps/s3/s3_helper.py index baf362b..ab0cee3 100644 --- a/src/frostfs_testlib/steps/s3/s3_helper.py +++ b/src/frostfs_testlib/steps/s3/s3_helper.py @@ -120,32 +120,28 @@ def assert_object_lock_mode( ).days == retain_period, f"Expected retention period is {retain_period} days" -def assert_s3_acl(acl_grants: list, permitted_users: str): - if permitted_users == "AllUsers": - grantees = {"AllUsers": 0, "CanonicalUser": 0} - for acl_grant in acl_grants: - if acl_grant.get("Grantee", {}).get("Type") == "Group": - uri = acl_grant.get("Grantee", {}).get("URI") - permission = acl_grant.get("Permission") - assert (uri, permission) == ( - "http://acs.amazonaws.com/groups/global/AllUsers", - "FULL_CONTROL", - ), "All Groups should have FULL_CONTROL" - grantees["AllUsers"] += 1 - if acl_grant.get("Grantee", {}).get("Type") == "CanonicalUser": - permission = acl_grant.get("Permission") - assert permission == "FULL_CONTROL", "Canonical User should have FULL_CONTROL" - grantees["CanonicalUser"] += 1 - assert grantees["AllUsers"] >= 1, "All Users should have FULL_CONTROL" - assert grantees["CanonicalUser"] >= 1, "Canonical User should have FULL_CONTROL" +def _format_grants_as_strings(grants: list[dict]) -> list: + grantee_format = "{g_type}::{uri}:{permission}" + return set( + [ + grantee_format.format( + g_type=grant.get("Grantee", {}).get("Type", ""), + uri=grant.get("Grantee", {}).get("URI", ""), + permission=grant.get("Permission", ""), + ) + for grant in grants + ] + ) - if permitted_users == "CanonicalUser": - for acl_grant in acl_grants: - if acl_grant.get("Grantee", {}).get("Type") == "CanonicalUser": - permission = acl_grant.get("Permission") - assert permission == "FULL_CONTROL", "Only CanonicalUser should have FULL_CONTROL" - else: - logger.error("FULL_CONTROL is given to All Users") + +@reporter.step("Verify ACL permissions") +def verify_acl_permissions(actual_acl_grants: list[dict], expected_acl_grants: list[dict], strict: bool = True): + actual_grants = _format_grants_as_strings(actual_acl_grants) + expected_grants = _format_grants_as_strings(expected_acl_grants) + + assert expected_grants <= actual_grants, "Permissions mismatch" + if strict: + assert expected_grants == actual_grants, "Extra permissions found, must not be there" @reporter.step("Delete bucket with all objects")