[#259] Improve logging of boto3 client requests
Some checks reported warnings
DCO action / DCO (pull_request) Has been cancelled

Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
This commit is contained in:
k.sosnovskikh 2024-07-10 17:17:27 +03:00
parent 429698944e
commit 996f92ffa7
2 changed files with 128 additions and 108 deletions

View file

@ -18,7 +18,7 @@ from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _ma
from frostfs_testlib.utils import string_utils
# TODO: Refactor this code to use shell instead of _cmd_run
from frostfs_testlib.utils.cli_utils import _configure_aws_cli, log_command_execution
from frostfs_testlib.utils.cli_utils import log_command_execution
from frostfs_testlib.utils.file_utils import TestFile
logger = logging.getLogger("NeoLogger")
@ -34,7 +34,15 @@ def report_error(func):
try:
return func(*a, **kw)
except ClientError as err:
log_command_execution("Result", str(err))
url = None
params = {"args": a, "kwargs": kw}
if isinstance(a[0], Boto3ClientWrapper):
client: Boto3ClientWrapper = a[0]
url = client.s3gate_endpoint
params = {"args": a[1:], "kwargs": kw}
log_command_execution(url, f"Failed {err.operation_name}", err.response, params)
raise
return deco
@ -90,7 +98,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
verify=False,
)
def _to_s3_param(self, param: str):
def _to_s3_param(self, param: str) -> str:
replacement_map = {
"Acl": "ACL",
"Cors": "CORS",
@ -101,6 +109,11 @@ class Boto3ClientWrapper(S3ClientWrapper):
result = result.replace(find, replace)
return result
def _convert_to_s3_params(self, scope: dict, exclude: Optional[list[str]] = None) -> dict:
if not exclude:
exclude = ["self"]
return {self._to_s3_param(param): value for param, value in scope if param not in exclude and value is not None}
# BUCKET METHODS #
@reporter.step("Create bucket S3")
@report_error
@ -133,7 +146,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
params.update({"CreateBucketConfiguration": {"LocationConstraint": location_constraint}})
s3_bucket = self.boto3_client.create_bucket(**params)
log_command_execution(f"Created S3 bucket {bucket}", s3_bucket)
log_command_execution(self.s3gate_endpoint, f"Created S3 bucket {bucket}", s3_bucket, params)
return bucket
@reporter.step("List buckets S3")
@ -142,7 +155,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
found_buckets = []
response = self.boto3_client.list_buckets()
log_command_execution("S3 List buckets result", response)
log_command_execution(self.s3gate_endpoint, "S3 List buckets result", response)
for bucket in response["Buckets"]:
found_buckets.append(bucket["Name"])
@ -153,26 +166,27 @@ class Boto3ClientWrapper(S3ClientWrapper):
@report_error
def delete_bucket(self, bucket: str) -> None:
response = self.boto3_client.delete_bucket(Bucket=bucket)
log_command_execution("S3 Delete bucket result", response)
log_command_execution(self.s3gate_endpoint, "S3 Delete bucket result", response, {"Bucket": bucket})
@reporter.step("Head bucket S3")
@report_error
def head_bucket(self, bucket: str) -> None:
response = self.boto3_client.head_bucket(Bucket=bucket)
log_command_execution("S3 Head bucket result", response)
log_command_execution(self.s3gate_endpoint, "S3 Head bucket result", response, {"Bucket": bucket})
@reporter.step("Put bucket versioning status")
@report_error
def put_bucket_versioning(self, bucket: str, status: VersioningStatus) -> None:
response = self.boto3_client.put_bucket_versioning(Bucket=bucket, VersioningConfiguration={"Status": status.value})
log_command_execution("S3 Set bucket versioning to", response)
params = {"Bucket": bucket, "VersioningConfiguration": {"Status": status.value}}
response = self.boto3_client.put_bucket_versioning(**params)
log_command_execution(self.s3gate_endpoint, "S3 Set bucket versioning to", response, params)
@reporter.step("Get bucket versioning status")
@report_error
def get_bucket_versioning_status(self, bucket: str) -> Literal["Enabled", "Suspended"]:
response = self.boto3_client.get_bucket_versioning(Bucket=bucket)
status = response.get("Status")
log_command_execution("S3 Got bucket versioning status", response)
log_command_execution(self.s3gate_endpoint, "S3 Got bucket versioning status", response, {"Bucket": bucket})
return status
@reporter.step("Put bucket tagging")
@ -180,28 +194,29 @@ class Boto3ClientWrapper(S3ClientWrapper):
def put_bucket_tagging(self, bucket: str, tags: list) -> None:
tags = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
tagging = {"TagSet": tags}
response = self.boto3_client.put_bucket_tagging(Bucket=bucket, Tagging=tagging)
log_command_execution("S3 Put bucket tagging", response)
params = self._convert_to_s3_params(locals().items(), exclude=["self", "tags"])
response = self.boto3_client.put_bucket_tagging(**params)
log_command_execution(self.s3gate_endpoint, "S3 Put bucket tagging", response, params)
@reporter.step("Get bucket tagging")
@report_error
def get_bucket_tagging(self, bucket: str) -> list:
response = self.boto3_client.get_bucket_tagging(Bucket=bucket)
log_command_execution("S3 Get bucket tagging", response)
log_command_execution(self.s3gate_endpoint, "S3 Get bucket tagging", response, {"Bucket": bucket})
return response.get("TagSet")
@reporter.step("Get bucket acl")
@report_error
def get_bucket_acl(self, bucket: str) -> list:
response = self.boto3_client.get_bucket_acl(Bucket=bucket)
log_command_execution("S3 Get bucket acl", response)
log_command_execution(self.s3gate_endpoint, "S3 Get bucket acl", response, {"Bucket": bucket})
return response.get("Grants")
@reporter.step("Delete bucket tagging")
@report_error
def delete_bucket_tagging(self, bucket: str) -> None:
response = self.boto3_client.delete_bucket_tagging(Bucket=bucket)
log_command_execution("S3 Delete bucket tagging", response)
log_command_execution(self.s3gate_endpoint, "S3 Delete bucket tagging", response, {"Bucket": bucket})
@reporter.step("Put bucket ACL")
@report_error
@ -212,71 +227,74 @@ class Boto3ClientWrapper(S3ClientWrapper):
grant_write: Optional[str] = None,
grant_read: Optional[str] = None,
) -> None:
params = {self._to_s3_param(param): value for param, value in locals().items() if param not in ["self"] and value is not None}
params = self._convert_to_s3_params(locals().items())
response = self.boto3_client.put_bucket_acl(**params)
log_command_execution("S3 ACL bucket result", response)
log_command_execution(self.s3gate_endpoint, "S3 ACL bucket result", response, params)
@reporter.step("Put object lock configuration")
@report_error
def put_object_lock_configuration(self, bucket: str, configuration: dict) -> dict:
response = self.boto3_client.put_object_lock_configuration(Bucket=bucket, ObjectLockConfiguration=configuration)
log_command_execution("S3 put_object_lock_configuration result", response)
params = {"Bucket": bucket, "ObjectLockConfiguration": configuration}
response = self.boto3_client.put_object_lock_configuration(**params)
log_command_execution(self.s3gate_endpoint, "S3 put_object_lock_configuration result", response, params)
return response
@reporter.step("Get object lock configuration")
@report_error
def get_object_lock_configuration(self, bucket: str) -> dict:
response = self.boto3_client.get_object_lock_configuration(Bucket=bucket)
log_command_execution("S3 get_object_lock_configuration result", response)
log_command_execution(self.s3gate_endpoint, "S3 get_object_lock_configuration result", response, {"Bucket": bucket})
return response.get("ObjectLockConfiguration")
@reporter.step("Get bucket policy")
@report_error
def get_bucket_policy(self, bucket: str) -> str:
response = self.boto3_client.get_bucket_policy(Bucket=bucket)
log_command_execution("S3 get_bucket_policy result", response)
log_command_execution(self.s3gate_endpoint, "S3 get_bucket_policy result", response, {"Bucket": bucket})
return response.get("Policy")
@reporter.step("Delete bucket policy")
@report_error
def delete_bucket_policy(self, bucket: str) -> str:
response = self.boto3_client.delete_bucket_policy(Bucket=bucket)
log_command_execution("S3 delete_bucket_policy result", response)
log_command_execution(self.s3gate_endpoint, "S3 delete_bucket_policy result", response, {"Bucket": bucket})
return response
@reporter.step("Put bucket policy")
@report_error
def put_bucket_policy(self, bucket: str, policy: dict) -> None:
response = self.boto3_client.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))
log_command_execution("S3 put_bucket_policy result", response)
params = {"Bucket": bucket, "Policy": json.dumps(policy)}
response = self.boto3_client.put_bucket_policy(**params)
log_command_execution(self.s3gate_endpoint, "S3 put_bucket_policy result", response, params)
return response
@reporter.step("Get bucket cors")
@report_error
def get_bucket_cors(self, bucket: str) -> dict:
response = self.boto3_client.get_bucket_cors(Bucket=bucket)
log_command_execution("S3 get_bucket_cors result", response)
log_command_execution(self.s3gate_endpoint, "S3 get_bucket_cors result", response, {"Bucket": bucket})
return response.get("CORSRules")
@reporter.step("Get bucket location")
@report_error
def get_bucket_location(self, bucket: str) -> str:
response = self.boto3_client.get_bucket_location(Bucket=bucket)
log_command_execution("S3 get_bucket_location result", response)
log_command_execution(self.s3gate_endpoint, "S3 get_bucket_location result", response, {"Bucket": bucket})
return response.get("LocationConstraint")
@reporter.step("Put bucket cors")
@report_error
def put_bucket_cors(self, bucket: str, cors_configuration: dict) -> None:
response = self.boto3_client.put_bucket_cors(Bucket=bucket, CORSConfiguration=cors_configuration)
log_command_execution("S3 put_bucket_cors result", response)
params = self._convert_to_s3_params(locals().items())
response = self.boto3_client.put_bucket_cors(**params)
log_command_execution(self.s3gate_endpoint, "S3 put_bucket_cors result", response, params)
return response
@reporter.step("Delete bucket cors")
@report_error
def delete_bucket_cors(self, bucket: str) -> None:
response = self.boto3_client.delete_bucket_cors(Bucket=bucket)
log_command_execution("S3 delete_bucket_cors result", response)
log_command_execution(self.s3gate_endpoint, "S3 delete_bucket_cors result", response, {"Bucket": bucket})
# END OF BUCKET METHODS #
# OBJECT METHODS #
@ -284,8 +302,9 @@ class Boto3ClientWrapper(S3ClientWrapper):
@reporter.step("List objects S3 v2")
@report_error
def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
params = self._convert_to_s3_params(locals().items())
response = self.boto3_client.list_objects_v2(Bucket=bucket)
log_command_execution("S3 v2 List objects result", response)
log_command_execution(self.s3gate_endpoint, "S3 v2 List objects result", response, params)
obj_list = [obj["Key"] for obj in response.get("Contents", [])]
logger.info(f"Found s3 objects: {obj_list}")
@ -295,8 +314,9 @@ class Boto3ClientWrapper(S3ClientWrapper):
@reporter.step("List objects S3")
@report_error
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
params = self._convert_to_s3_params(locals().items())
response = self.boto3_client.list_objects(Bucket=bucket)
log_command_execution("S3 List objects result", response)
log_command_execution(self.s3gate_endpoint, "S3 List objects result", response, params)
obj_list = [obj["Key"] for obj in response.get("Contents", [])]
logger.info(f"Found s3 objects: {obj_list}")
@ -306,15 +326,17 @@ class Boto3ClientWrapper(S3ClientWrapper):
@reporter.step("List objects versions S3")
@report_error
def list_objects_versions(self, bucket: str, full_output: bool = False) -> dict:
params = self._convert_to_s3_params(locals().items())
response = self.boto3_client.list_object_versions(Bucket=bucket)
log_command_execution("S3 List objects versions result", response)
log_command_execution(self.s3gate_endpoint, "S3 List objects versions result", response, params)
return response if full_output else response.get("Versions", [])
@reporter.step("List objects delete markers S3")
@report_error
def list_delete_markers(self, bucket: str, full_output: bool = False) -> list:
params = self._convert_to_s3_params(locals().items())
response = self.boto3_client.list_object_versions(Bucket=bucket)
log_command_execution("S3 List objects delete markers result", response)
log_command_execution(self.s3gate_endpoint, "S3 List objects delete markers result", response, params)
return response if full_output else response.get("DeleteMarkers", [])
@reporter.step("Put object S3")
@ -339,36 +361,33 @@ class Boto3ClientWrapper(S3ClientWrapper):
with open(filepath, "rb") as put_file:
body = put_file.read()
params = {
self._to_s3_param(param): value
for param, value in locals().items()
if param not in ["self", "filepath", "put_file"] and value is not None
}
response = self.boto3_client.put_object(**params)
log_command_execution("S3 Put object result", response)
params = self._convert_to_s3_params(locals().items(), exclude=["self", "filepath", "put_file", "body"])
response = self.boto3_client.put_object(Body=body, **params)
log_command_execution(self.s3gate_endpoint, "S3 Put object result", response, params)
return response.get("VersionId")
@reporter.step("Head object S3")
@report_error
def head_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
params = {self._to_s3_param(param): value for param, value in locals().items() if param not in ["self"] and value is not None}
params = self._convert_to_s3_params(locals().items())
response = self.boto3_client.head_object(**params)
log_command_execution("S3 Head object result", response)
log_command_execution(self.s3gate_endpoint, "S3 Head object result", response, params)
return response
@reporter.step("Delete object S3")
@report_error
def delete_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
params = {self._to_s3_param(param): value for param, value in locals().items() if param not in ["self"] and value is not None}
params = self._convert_to_s3_params(locals().items())
response = self.boto3_client.delete_object(**params)
log_command_execution("S3 Delete object result", response)
log_command_execution(self.s3gate_endpoint, "S3 Delete object result", response, params)
return response
@reporter.step("Delete objects S3")
@report_error
def delete_objects(self, bucket: str, keys: list[str]) -> dict:
response = self.boto3_client.delete_objects(Bucket=bucket, Delete=_make_objs_dict(keys))
log_command_execution("S3 Delete objects result", response)
params = {"Bucket": bucket, "Delete": _make_objs_dict(keys)}
response = self.boto3_client.delete_objects(**params)
log_command_execution(self.s3gate_endpoint, "S3 Delete objects result", response, params)
assert (
"Errors" not in response
), f'The following objects have not been deleted: {[err_info["Key"] for err_info in response["Errors"]]}.\nError Message: {response["Errors"]["Message"]}'
@ -387,8 +406,9 @@ class Boto3ClientWrapper(S3ClientWrapper):
for object_version in object_versions
]
}
response = self.boto3_client.delete_objects(Bucket=bucket, Delete=delete_list)
log_command_execution("S3 Delete objects result", response)
params = {"Bucket": bucket, "Delete": delete_list}
response = self.boto3_client.delete_objects(**params)
log_command_execution(self.s3gate_endpoint, "S3 Delete objects result", response, params)
return response
@reporter.step("Delete object versions S3 without delete markers")
@ -396,8 +416,9 @@ class Boto3ClientWrapper(S3ClientWrapper):
def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None:
# Delete objects without creating delete markers
for object_version in object_versions:
response = self.boto3_client.delete_object(Bucket=bucket, Key=object_version["Key"], VersionId=object_version["VersionId"])
log_command_execution("S3 Delete object result", response)
params = {"Bucket": bucket, "Key": object_version["Key"], "VersionId": object_version["VersionId"]}
response = self.boto3_client.delete_object(**params)
log_command_execution(self.s3gate_endpoint, "S3 Delete object result", response, params)
@reporter.step("Put object ACL")
@report_error
@ -409,17 +430,17 @@ class Boto3ClientWrapper(S3ClientWrapper):
grant_write: Optional[str] = None,
grant_read: Optional[str] = None,
) -> list:
params = {self._to_s3_param(param): value for param, value in locals().items() if param not in ["self"] and value is not None}
params = self._convert_to_s3_params(locals().items())
response = self.boto3_client.put_object_acl(**params)
log_command_execution("S3 put object ACL", response)
log_command_execution(self.s3gate_endpoint, "S3 put object ACL", response, params)
return response.get("Grants")
@reporter.step("Get object ACL")
@report_error
def get_object_acl(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
params = {self._to_s3_param(param): value for param, value in locals().items() if param not in ["self"] and value is not None}
params = self._convert_to_s3_params(locals().items())
response = self.boto3_client.get_object_acl(**params)
log_command_execution("S3 ACL objects result", response)
log_command_execution(self.s3gate_endpoint, "S3 ACL objects result", response, params)
return response.get("Grants")
@reporter.step("Copy object S3")
@ -442,13 +463,9 @@ class Boto3ClientWrapper(S3ClientWrapper):
key = string_utils.unique_name("copy-object-")
copy_source = f"{source_bucket}/{source_key}"
params = {
self._to_s3_param(param): value
for param, value in locals().items()
if param not in ["self", "source_bucket", "source_key"] and value is not None
}
params = self._convert_to_s3_params(locals().items(), exclude=["self", "source_bucket", "source_key"])
response = self.boto3_client.copy_object(**params)
log_command_execution("S3 Copy objects result", response)
log_command_execution(self.s3gate_endpoint, "S3 Copy objects result", response, params)
return key
@reporter.step("Get object S3")
@ -465,13 +482,12 @@ class Boto3ClientWrapper(S3ClientWrapper):
if object_range:
range_str = f"bytes={object_range[0]}-{object_range[1]}"
params = {
self._to_s3_param(param): value
for param, value in {**locals(), **{"Range": range_str}}.items()
if param not in ["self", "object_range", "full_output", "range_str", "filename"] and value is not None
}
params = self._convert_to_s3_params(
{**locals(), **{"Range": range_str}}.items(),
exclude=["self", "object_range", "full_output", "range_str"],
)
response = self.boto3_client.get_object(**params)
log_command_execution("S3 Get objects result", response)
log_command_execution(self.s3gate_endpoint, "S3 Get objects result", response, params)
if full_output:
return response
@ -487,8 +503,9 @@ class Boto3ClientWrapper(S3ClientWrapper):
@reporter.step("Create multipart upload S3")
@report_error
def create_multipart_upload(self, bucket: str, key: str) -> str:
response = self.boto3_client.create_multipart_upload(Bucket=bucket, Key=key)
log_command_execution("S3 Created multipart upload", response)
params = self._convert_to_s3_params(locals().items())
response = self.boto3_client.create_multipart_upload(**params)
log_command_execution(self.s3gate_endpoint, "S3 Created multipart upload", response, params)
assert response.get("UploadId"), f"Expected UploadId in response:\n{response}"
return response["UploadId"]
@ -497,15 +514,16 @@ class Boto3ClientWrapper(S3ClientWrapper):
@report_error
def list_multipart_uploads(self, bucket: str) -> Optional[list[dict]]:
response = self.boto3_client.list_multipart_uploads(Bucket=bucket)
log_command_execution("S3 List multipart upload", response)
log_command_execution(self.s3gate_endpoint, "S3 List multipart upload", response, {"Bucket": bucket})
return response.get("Uploads")
@reporter.step("Abort multipart upload S3")
@report_error
def abort_multipart_upload(self, bucket: str, key: str, upload_id: str) -> None:
response = self.boto3_client.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
log_command_execution("S3 Abort multipart upload", response)
params = self._convert_to_s3_params(locals().items())
response = self.boto3_client.abort_multipart_upload(**params)
log_command_execution(self.s3gate_endpoint, "S3 Abort multipart upload", response, params)
@reporter.step("Upload part S3")
@report_error
@ -513,14 +531,10 @@ class Boto3ClientWrapper(S3ClientWrapper):
with open(filepath, "rb") as put_file:
body = put_file.read()
response = self.boto3_client.upload_part(
UploadId=upload_id,
Bucket=bucket,
Key=key,
PartNumber=part_num,
Body=body,
)
log_command_execution("S3 Upload part", response)
params = self._convert_to_s3_params(locals().items(), exclude=["self", "put_file", "part_num", "filepath", "body"])
params["PartNumber"] = part_num
response = self.boto3_client.upload_part(Body=body, **params)
log_command_execution(self.s3gate_endpoint, "S3 Upload part", response, params)
assert response.get("ETag"), f"Expected ETag in response:\n{response}"
return response["ETag"]
@ -528,14 +542,10 @@ class Boto3ClientWrapper(S3ClientWrapper):
@reporter.step("Upload copy part S3")
@report_error
def upload_part_copy(self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str) -> str:
response = self.boto3_client.upload_part_copy(
UploadId=upload_id,
Bucket=bucket,
Key=key,
PartNumber=part_num,
CopySource=copy_source,
)
log_command_execution("S3 Upload copy part", response)
params = self._convert_to_s3_params(locals().items(), exclude=["self", "put_file", "part_num", "filepath"])
params["PartNumber"] = part_num
response = self.boto3_client.upload_part_copy(**params)
log_command_execution(self.s3gate_endpoint, "S3 Upload copy part", response, params)
assert response.get("CopyPartResult", []).get("ETag"), f"Expected ETag in response:\n{response}"
return response["CopyPartResult"]["ETag"]
@ -543,8 +553,9 @@ class Boto3ClientWrapper(S3ClientWrapper):
@reporter.step("List parts S3")
@report_error
def list_parts(self, bucket: str, key: str, upload_id: str) -> list[dict]:
response = self.boto3_client.list_parts(UploadId=upload_id, Bucket=bucket, Key=key)
log_command_execution("S3 List part", response)
params = self._convert_to_s3_params(locals().items())
response = self.boto3_client.list_parts(**params)
log_command_execution(self.s3gate_endpoint, "S3 List part", response, params)
assert response.get("Parts"), f"Expected Parts in response:\n{response}"
return response["Parts"]
@ -553,8 +564,10 @@ class Boto3ClientWrapper(S3ClientWrapper):
@report_error
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None:
parts = [{"ETag": etag, "PartNumber": part_num} for part_num, etag in parts]
response = self.boto3_client.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id, MultipartUpload={"Parts": parts})
log_command_execution("S3 Complete multipart upload", response)
params = self._convert_to_s3_params(locals().items(), exclude=["self", "parts"])
params["MultipartUpload"] = {"Parts": parts}
response = self.boto3_client.complete_multipart_upload(**params)
log_command_execution(self.s3gate_endpoint, "S3 Complete multipart upload", response, params)
return response
@ -568,9 +581,9 @@ class Boto3ClientWrapper(S3ClientWrapper):
version_id: Optional[str] = None,
bypass_governance_retention: Optional[bool] = None,
) -> None:
params = {self._to_s3_param(param): value for param, value in locals().items() if param not in ["self"] and value is not None}
params = self._convert_to_s3_params(locals().items())
response = self.boto3_client.put_object_retention(**params)
log_command_execution("S3 Put object retention ", response)
log_command_execution(self.s3gate_endpoint, "S3 Put object retention ", response, params)
@reporter.step("Put object legal hold")
@report_error
@ -582,35 +595,33 @@ class Boto3ClientWrapper(S3ClientWrapper):
version_id: Optional[str] = None,
) -> None:
legal_hold = {"Status": legal_hold_status}
params = {
self._to_s3_param(param): value
for param, value in locals().items()
if param not in ["self", "legal_hold_status"] and value is not None
}
params = self._convert_to_s3_params(locals().items(), exclude=["self", "legal_hold_status"])
response = self.boto3_client.put_object_legal_hold(**params)
log_command_execution("S3 Put object legal hold ", response)
log_command_execution(self.s3gate_endpoint, "S3 Put object legal hold ", response, params)
@reporter.step("Put object tagging")
@report_error
def put_object_tagging(self, bucket: str, key: str, tags: list, version_id: Optional[str] = "") -> None:
tags = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
tagging = {"TagSet": tags}
response = self.boto3_client.put_object_tagging(Bucket=bucket, Key=key, Tagging=tagging, VersionId=version_id)
log_command_execution("S3 Put object tagging", response)
params = self._convert_to_s3_params(locals().items(), exclude=["self", "tags"])
response = self.boto3_client.put_object_tagging(**params)
log_command_execution(self.s3gate_endpoint, "S3 Put object tagging", response, params)
@reporter.step("Get object tagging")
@report_error
def get_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
params = {self._to_s3_param(param): value for param, value in locals().items() if param not in ["self"] and value is not None}
params = self._convert_to_s3_params(locals().items())
response = self.boto3_client.get_object_tagging(**params)
log_command_execution("S3 Get object tagging", response)
log_command_execution(self.s3gate_endpoint, "S3 Get object tagging", response, params)
return response.get("TagSet")
@reporter.step("Delete object tagging")
@report_error
def delete_object_tagging(self, bucket: str, key: str) -> None:
response = self.boto3_client.delete_object_tagging(Bucket=bucket, Key=key)
log_command_execution("S3 Delete object tagging", response)
params = self._convert_to_s3_params(locals().items())
response = self.boto3_client.delete_object_tagging(**params)
log_command_execution(self.s3gate_endpoint, "S3 Delete object tagging", response, params)
@reporter.step("Get object attributes")
@report_error

View file

@ -15,7 +15,7 @@ from contextlib import suppress
from datetime import datetime
from io import StringIO
from textwrap import shorten
from typing import Dict, List, TypedDict, Union
from typing import Dict, List, Optional, TypedDict, Union
import pexpect
@ -75,12 +75,21 @@ def _attach_allure_log(cmd: str, output: str, return_code: int, start_time: date
reporter.attach(command_attachment, "Command execution")
def log_command_execution(cmd: str, output: Union[str, TypedDict]) -> None:
def log_command_execution(url: str, cmd: str, output: Union[str, TypedDict], params: Optional[dict] = None) -> None:
logger.info(f"{cmd}: {output}")
with suppress(Exception):
json_output = json.dumps(output, indent=4, sort_keys=True)
output = json_output
command_attachment = f"COMMAND: '{cmd}'\n" f"OUTPUT:\n {output}\n"
try:
json_params = json.dumps(params, indent=4, sort_keys=True)
except TypeError as err:
logger.warning(f"Failed to serialize '{cmd}' request parameters:\n{params}\nException: {err}")
else:
params = json_params
command_attachment = f"COMMAND: '{cmd}'\n" f"URL: {url}\n" f"PARAMS:\n{params}\n" f"OUTPUT:\n{output}\n"
with reporter.step(f'COMMAND: {shorten(cmd, width=60, placeholder="...")}'):
reporter.attach(command_attachment, "Command execution")