From 10d46f4e338eba328d40bad2c39d0177d4e0a34c Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Thu, 8 Aug 2024 18:00:01 +0000 Subject: [PATCH] rgw/logging: basic set of tests for bucket logging Signed-off-by: Yuval Lifshitz --- README.rst | 14 + pytest.ini | 1 + s3tests_boto3/functional/test_s3.py | 1165 ++++++++++++++++++++++++++- 3 files changed, 1162 insertions(+), 18 deletions(-) diff --git a/README.rst b/README.rst index e34c7a1..283b7e8 100644 --- a/README.rst +++ b/README.rst @@ -100,3 +100,17 @@ You can filter tests based on their attributes:: S3TEST_CONF=your.conf tox -- s3tests_boto3/functional/test_iam.py -m 'not fails_on_rgw' +======================== + Bucket logging tests +======================== + +Ceph has extensions for the bucket logging S3 API. For the tests to cover these extensions, the following file: `examples/rgw/boto3/service-2.sdk-extras.json` from the Ceph repo, +should be copied to the: `~/.aws/models/s3/2006-03-01/` directory on the machine where the tests are run. +If the file is not present, the tests will still run, but the extension tests will be skipped. In this case, the bucket logging object roll time must be decreased manually from its default of +300 seconds to 5 seconds:: + + vstart.sh -o rgw_bucket_log_object_roll_time=5 + +Then the tests can be run with:: + + S3TEST_CONF=your.conf tox -- -m 'bucket_logging' diff --git a/pytest.ini b/pytest.ini index 1a7d9a8..4aafd65 100644 --- a/pytest.ini +++ b/pytest.ini @@ -7,6 +7,7 @@ markers = auth_common bucket_policy bucket_encryption + bucket_logging checksum cloud_transition encryption diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index 85dfba1..039ea11 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -4929,24 +4929,6 @@ def test_bucket_acl_revoke_all(): policy['Grants'] = old_grants client.put_bucket_acl(Bucket=bucket_name, AccessControlPolicy=policy) -# TODO rgw log_bucket.set_as_logging_target() gives 403 Forbidden -# http://tracker.newdream.net/issues/984 -@pytest.mark.fails_on_rgw -def test_logging_toggle(): - bucket_name = get_new_bucket() - client = get_client() - - main_display_name = get_main_display_name() - main_user_id = get_main_user_id() - - status = {'LoggingEnabled': {'TargetBucket': bucket_name, 'TargetGrants': [{'Grantee': {'DisplayName': main_display_name, 'ID': main_user_id,'Type': 'CanonicalUser'},'Permission': 'FULL_CONTROL'}], 'TargetPrefix': 'foologgingprefix'}} - - client.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus=status) - client.get_bucket_logging(Bucket=bucket_name) - status = {'LoggingEnabled': {}} - client.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus=status) - # NOTE: this does not actually test whether or not logging works - def _setup_access(bucket_acl, object_acl): """ Simple test fixture: create a bucket with given ACL, with objects: @@ -13931,3 +13913,1150 @@ def test_post_object_upload_checksum(): r = requests.post(url, files=payload, verify=get_config_ssl_verify()) assert r.status_code == 400 + + +def _has_bucket_logging_extension(): + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/', 'LoggingType': 'Journal'} + try: + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + except ParamValidationError as e: + return False + return True + +def _has_taget_object_key_format(): + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/', 'TargetObjectKeyFormat': {'SimplePrefix': {}}} + try: + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + except ParamValidationError as e: + return False + return True + + +import shlex + +def _parse_standard_log_record(record): + record = record.replace('[', '"').replace(']', '"') + chunks = shlex.split(record) + assert len(chunks) == 26 + return json.dumps({ + 'BucketOwner': chunks[0], + 'BucketName': chunks[1], + 'RequestDateTime': chunks[2], + 'RemoteIP': chunks[3], + 'Requester': chunks[4], + 'RequestID': chunks[5], + 'Operation': chunks[6], + 'Key': chunks[7], + 'RequestURI': chunks[8], + 'HTTPStatus': chunks[9], + 'ErrorCode': chunks[10], + 'BytesSent': chunks[11], + 'ObjectSize': chunks[12], + 'TotalTime': chunks[13], + 'TurnAroundTime': chunks[14], + 'Referrer': chunks[15], + 'UserAgent': chunks[16], + 'VersionID': chunks[17], + 'HostID': chunks[18], + 'SigVersion': chunks[19], + 'CipherSuite': chunks[20], + 'AuthType': chunks[21], + 'HostHeader': chunks[22], + 'TLSVersion': chunks[23], + 'AccessPointARN': chunks[24], + 'ACLRequired': chunks[25], + }, indent=4) + + +def _parse_journal_log_record(record): + record = record.replace('[', '"').replace(']', '"') + chunks = shlex.split(record) + assert len(chunks) == 8 + return json.dumps({ + 'BucketOwner': chunks[0], + 'BucketName': chunks[1], + 'RequestDateTime': chunks[2], + 'Operation': chunks[3], + 'Key': chunks[4], + 'ObjectSize': chunks[5], + 'VersionID': chunks[6], + 'ETAG': chunks[7], + }, indent=4) + +def _parse_log_record(record, record_type): + if record_type == 'Standard': + return _parse_standard_log_record(record) + elif record_type == 'Journal': + return _parse_journal_log_record(record) + else: + assert False, 'unknown log record type' + +expected_object_roll_time = 5 + +@pytest.mark.bucket_logging +def test_put_bucket_logging(): + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + has_key_format = _has_taget_object_key_format() + + # minimal configuration + logging_enabled = { + 'TargetBucket': log_bucket_name, + 'TargetPrefix': 'log/' + } + + if has_extensions: + logging_enabled['ObjectRollTime'] = expected_object_roll_time + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + response = client.get_bucket_logging(Bucket=src_bucket_name) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + if has_extensions: + logging_enabled['LoggingType'] = 'Standard' + logging_enabled['RecordsBatchSize'] = 0 + if has_key_format: + # default value for key prefix is returned + logging_enabled['TargetObjectKeyFormat'] = {'SimplePrefix': {}} + assert response['LoggingEnabled'] == logging_enabled + + if has_key_format: + # with simple target object prefix + logging_enabled = { + 'TargetBucket': log_bucket_name, + 'TargetPrefix': 'log/', + 'TargetObjectKeyFormat': { + 'SimplePrefix': {} + } + } + if has_extensions: + logging_enabled['ObjectRollTime'] = expected_object_roll_time + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + response = client.get_bucket_logging(Bucket=src_bucket_name) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + if has_extensions: + logging_enabled['LoggingType'] = 'Standard' + logging_enabled['RecordsBatchSize'] = 0 + assert response['LoggingEnabled'] == logging_enabled + + # with partitioned target object prefix + logging_enabled = { + 'TargetBucket': log_bucket_name, + 'TargetPrefix': 'log/', + 'TargetObjectKeyFormat': { + 'PartitionedPrefix': { + 'PartitionDateSource': 'DeliveryTime' + } + } + } + if has_extensions: + logging_enabled['ObjectRollTime'] = expected_object_roll_time + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + response = client.get_bucket_logging(Bucket=src_bucket_name) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + if has_extensions: + logging_enabled['LoggingType'] = 'Standard' + logging_enabled['RecordsBatchSize'] = 0 + assert response['LoggingEnabled'] == logging_enabled + + # with target grant (not implemented in RGW) + main_display_name = get_main_display_name() + main_user_id = get_main_user_id() + logging_enabled = { + 'TargetBucket': log_bucket_name, + 'TargetPrefix': 'log/', + 'TargetGrants': [{'Grantee': {'DisplayName': main_display_name, 'ID': main_user_id,'Type': 'CanonicalUser'},'Permission': 'FULL_CONTROL'}] + } + if has_extensions: + logging_enabled['ObjectRollTime'] = expected_object_roll_time + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + response = client.get_bucket_logging(Bucket=src_bucket_name) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + if has_extensions: + logging_enabled['LoggingType'] = 'Standard' + logging_enabled['RecordsBatchSize'] = 0 + # target grants are not implemented + logging_enabled.pop('TargetGrants') + if has_key_format: + # default value for key prefix is returned + logging_enabled['TargetObjectKeyFormat'] = {'SimplePrefix': {}} + assert response['LoggingEnabled'] == logging_enabled + + +@pytest.mark.bucket_logging +def test_put_bucket_logging_errors(): + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name1 = get_new_bucket_name() + log_bucket1 = get_new_bucket_resource(name=log_bucket_name1) + client = get_client() + + # invalid source bucket + try: + response = client.put_bucket_logging(Bucket=src_bucket_name+'kaboom', BucketLoggingStatus={ + 'LoggingEnabled': {'TargetBucket': log_bucket_name1, 'TargetPrefix': 'log/'}, + }) + assert False, 'expected failure' + except ClientError as e: + assert e.response['Error']['Code'] == 'NoSuchBucket' + + # invalid log bucket + try: + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': {'TargetBucket': log_bucket_name1+'kaboom', 'TargetPrefix': 'log/'}, + }) + assert False, 'expected failure' + except ClientError as e: + assert e.response['Error']['Code'] == 'NoSuchKey' + + # log bucket has bucket logging + log_bucket_name2 = get_new_bucket_name() + log_bucket2 = get_new_bucket_resource(name=log_bucket_name2) + response = client.put_bucket_logging(Bucket=log_bucket_name2, BucketLoggingStatus={ + 'LoggingEnabled': {'TargetBucket': log_bucket_name1, 'TargetPrefix': 'log/'}, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + try: + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': {'TargetBucket': log_bucket_name2, 'TargetPrefix': 'log/'}, + }) + assert False, 'expected failure' + except ClientError as e: + assert e.response['Error']['Code'] == 'InvalidArgument' + + if _has_taget_object_key_format(): + # invalid partition prefix + logging_enabled = { + 'TargetBucket': log_bucket_name1, + 'TargetPrefix': 'log/', + 'TargetObjectKeyFormat': { + 'PartitionedPrefix': { + 'PartitionDateSource': 'kaboom' + } + } + } + try: + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert False, 'expected failure' + except ClientError as e: + assert e.response['Error']['Code'] == 'MalformedXML' + + # TODO: log bucket is encrypted + #_put_bucket_encryption_s3(client, log_bucket_name) + #try: + # response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + # 'LoggingEnabled': {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}, + # }) + # assert False, 'expected failure' + #except ClientError as e: + # assert e.response['Error']['Code'] == 'InvalidArgument' + + if _has_bucket_logging_extension(): + try: + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': {'TargetBucket': log_bucket_name1, 'TargetPrefix': 'log/', 'LoggingType': 'kaboom'}, + }) + assert False, 'expected failure' + except ClientError as e: + assert e.response['Error']['Code'] == 'MalformedXML' + + +@pytest.mark.bucket_logging +def test_rm_bucket_logging(): + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'} + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={}) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + response = client.get_bucket_logging(Bucket=src_bucket_name) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + assert not 'LoggingEnabled' in response + + +@pytest.mark.bucket_logging +@pytest.mark.fails_on_aws +def test_put_bucket_logging_extensions(): + if not _has_bucket_logging_extension(): + pytest.skip('ceph extension to bucket logging not supported at client') + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + logging_enabled = {'TargetBucket': log_bucket_name, + 'TargetPrefix': 'log/', + 'LoggingType': 'Standard', + 'ObjectRollTime': expected_object_roll_time, + 'RecordsBatchSize': 0 + } + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + response = client.get_bucket_logging(Bucket=src_bucket_name) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + logging_enabled['TargetObjectKeyFormat'] = {'SimplePrefix': {}} + assert response['LoggingEnabled'] == logging_enabled + + +import logging + +logger = logging.getLogger(__name__) + +def _verify_records(records, bucket_name, event_type, src_keys, record_type, expected_count): + keys_found = [] + for record in iter(records.splitlines()): + logger.info('bucket log record: %s', _parse_log_record(record, record_type)) + if bucket_name in record and event_type in record: + for key in src_keys: + if key in record: + keys_found.append(key) + break + logger.info('keys found in bucket log: %s', str(keys_found)) + logger.info('keys from the source bucket: %s', str(src_keys)) + return len(keys_found) == expected_count + +def randcontent(): + letters = string.ascii_lowercase + length = random.randint(10, 1024) + return ''.join(random.choice(letters) for i in range(length)) + + +def _bucket_logging_put_objects(versioned): + src_bucket_name = get_new_bucket() + if versioned: + check_configure_versioning_retry(src_bucket_name, "Enabled", "Enabled") + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + # minimal configuration + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'} + if has_extensions: + logging_enabled['ObjectRollTime'] = expected_object_roll_time + logging_enabled['LoggingType'] = 'Journal' + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + num_keys = 5 + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent()) + if versioned: + client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent()) + + if versioned: + expected_count = 2*num_keys + else: + expected_count = num_keys + + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + + time.sleep(expected_object_roll_time) + client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + record_type = 'Standard' if not has_extensions else 'Journal' + + for key in keys: + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, record_type, expected_count) + + +@pytest.mark.bucket_logging +def test_bucket_logging_put_objects(): + _bucket_logging_put_objects(False) + + +@pytest.mark.bucket_logging +def test_bucket_logging_put_objects_versioned(): + _bucket_logging_put_objects(True) + + +@pytest.mark.bucket_logging +def test_bucket_logging_put_concurrency(): + src_bucket_name = get_new_bucket() + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client(client_config=botocore.config.Config(max_pool_connections=50)) + has_extensions = _has_bucket_logging_extension() + + # minimal configuration + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'} + if has_extensions: + logging_enabled['ObjectRollTime'] = expected_object_roll_time + logging_enabled['LoggingType'] = 'Journal' + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + num_keys = 50 + t = [] + for i in range(num_keys): + name = 'myobject'+str(i) + thr = threading.Thread(target = client.put_object, + kwargs={'Bucket': src_bucket_name, 'Key': name, 'Body': randcontent()}) + thr.start() + t.append(thr) + _do_wait_completion(t) + + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + + time.sleep(expected_object_roll_time) + t = [] + for i in range(num_keys): + thr = threading.Thread(target = client.put_object, + kwargs={'Bucket': src_bucket_name, 'Key': 'dummy', 'Body': 'dummy'}) + thr.start() + t.append(thr) + _do_wait_completion(t) + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + record_type = 'Standard' if not has_extensions else 'Journal' + + for key in keys: + logger.info('logging object: %s', key) + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, record_type, num_keys) + + +def _bucket_logging_delete_objects(versioned): + src_bucket_name = get_new_bucket() + if versioned: + check_configure_versioning_retry(src_bucket_name, "Enabled", "Enabled") + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + num_keys = 5 + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent()) + if versioned: + client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent()) + + # minimal configuration + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'} + if has_extensions: + logging_enabled['ObjectRollTime'] = expected_object_roll_time + logging_enabled['LoggingType'] = 'Journal' + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + for key in src_keys: + if versioned: + response = client.head_object(Bucket=src_bucket_name, Key=key) + client.delete_object(Bucket=src_bucket_name, Key=key, VersionId=response['VersionId']) + client.delete_object(Bucket=src_bucket_name, Key=key) + + time.sleep(expected_object_roll_time) + client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + if versioned: + expected_count = 2*num_keys + else: + expected_count = num_keys + + key = keys[0] + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + record_type = 'Standard' if not has_extensions else 'Journal' + assert _verify_records(body, src_bucket_name, 'REST.DELETE.OBJECT', src_keys, record_type, expected_count) + + +@pytest.mark.bucket_logging +def test_bucket_logging_delete_objects(): + _bucket_logging_delete_objects(False) + + +@pytest.mark.bucket_logging +def test_bucket_logging_delete_objects_versioned(): + _bucket_logging_delete_objects(True) + + +@pytest.mark.bucket_logging +def _bucket_logging_get_objects(versioned): + src_bucket_name = get_new_bucket() + if versioned: + check_configure_versioning_retry(src_bucket_name, "Enabled", "Enabled") + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + num_keys = 5 + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent()) + if versioned: + client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent()) + + # minimal configuration + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'} + if has_extensions: + logging_enabled['ObjectRollTime'] = expected_object_roll_time + logging_enabled['LoggingType'] = 'Standard' + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + for key in src_keys: + if versioned: + response = client.head_object(Bucket=src_bucket_name, Key=key) + client.get_object(Bucket=src_bucket_name, Key=key, VersionId=response['VersionId']) + client.get_object(Bucket=src_bucket_name, Key=key) + + time.sleep(expected_object_roll_time) + client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + if versioned: + expected_count = 2*num_keys + else: + expected_count = num_keys + + key = keys[0] + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + assert _verify_records(body, src_bucket_name, 'REST.GET.OBJECT', src_keys, 'Standard', expected_count) + + +@pytest.mark.bucket_logging +def test_bucket_logging_get_objects(): + _bucket_logging_get_objects(False) + + +@pytest.mark.bucket_logging +def test_bucket_logging_get_objects_versioned(): + _bucket_logging_get_objects(True) + + +@pytest.mark.bucket_logging +def _bucket_logging_copy_objects(versioned, another_bucket): + src_bucket_name = get_new_bucket() + if another_bucket: + dst_bucket_name = get_new_bucket() + else: + dst_bucket_name = src_bucket_name + if versioned: + check_configure_versioning_retry(src_bucket_name, "Enabled", "Enabled") + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + num_keys = 5 + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent()) + if versioned: + client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent()) + + # minimal configuration + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'} + if has_extensions: + logging_enabled['ObjectRollTime'] = expected_object_roll_time + logging_enabled['LoggingType'] = 'Journal' + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + if another_bucket: + response = client.put_bucket_logging(Bucket=dst_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + dst_keys = [] + for key in src_keys: + dst_keys.append('copy_of_'+key) + if another_bucket: + client.copy_object(Bucket=dst_bucket_name, Key='copy_of_'+key, CopySource={'Bucket': src_bucket_name, 'Key': key}) + else: + client.copy_object(Bucket=src_bucket_name, Key='copy_of_'+key, CopySource={'Bucket': src_bucket_name, 'Key': key}) + + time.sleep(expected_object_roll_time) + client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + key = keys[0] + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + record_type = 'Standard' if not has_extensions else 'Journal' + assert _verify_records(body, dst_bucket_name, 'REST.PUT.OBJECT', dst_keys, record_type, num_keys) + + +@pytest.mark.bucket_logging +def test_bucket_logging_copy_objects(): + _bucket_logging_copy_objects(False, False) + + +@pytest.mark.bucket_logging +def test_bucket_logging_copy_objects_versioned(): + _bucket_logging_copy_objects(True, False) + + +@pytest.mark.bucket_logging +def test_bucket_logging_copy_objects_bucket(): + _bucket_logging_copy_objects(False, True) + + +@pytest.mark.bucket_logging +def test_bucket_logging_copy_objects_bucket_versioned(): + _bucket_logging_copy_objects(True, True) + + +@pytest.mark.bucket_logging +def _bucket_logging_head_objects(versioned): + src_bucket_name = get_new_bucket() + if versioned: + check_configure_versioning_retry(src_bucket_name, "Enabled", "Enabled") + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + num_keys = 5 + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent()) + + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'} + if has_extensions: + logging_enabled['ObjectRollTime'] = expected_object_roll_time + logging_enabled['LoggingType'] = 'Standard' + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + for key in src_keys: + if versioned: + response = client.head_object(Bucket=src_bucket_name, Key=key) + client.head_object(Bucket=src_bucket_name, Key=key, VersionId=response['VersionId']) + else: + client.head_object(Bucket=src_bucket_name, Key=key) + + time.sleep(expected_object_roll_time) + client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + if versioned: + expected_count = 2*num_keys + else: + expected_count = num_keys + + key = keys[0] + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + assert _verify_records(body, src_bucket_name, 'REST.HEAD.OBJECT', src_keys, 'Standard', expected_count) + + +@pytest.mark.bucket_logging +def test_bucket_logging_head_objects(): + _bucket_logging_head_objects(False) + + +@pytest.mark.bucket_logging +def test_bucket_logging_head_objects_versioned(): + _bucket_logging_head_objects(True) + + +@pytest.mark.bucket_logging +def _bucket_logging_mpu(versioned): + src_bucket_name = get_new_bucket() + if versioned: + check_configure_versioning_retry(src_bucket_name, "Enabled", "Enabled") + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + # minimal configuration + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'} + if has_extensions: + logging_enabled['ObjectRollTime'] = expected_object_roll_time + logging_enabled['LoggingType'] = 'Journal' + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + src_key = "myobject" + objlen = 30 * 1024 * 1024 + (upload_id, data, parts) = _multipart_upload(bucket_name=src_bucket_name, key=src_key, size=objlen) + client.complete_multipart_upload(Bucket=src_bucket_name, Key=src_key, UploadId=upload_id, MultipartUpload={'Parts': parts}) + if versioned: + (upload_id, data, parts) = _multipart_upload(bucket_name=src_bucket_name, key=src_key, size=objlen) + client.complete_multipart_upload(Bucket=src_bucket_name, Key=src_key, UploadId=upload_id, MultipartUpload={'Parts': parts}) + + time.sleep(expected_object_roll_time) + client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + if versioned: + expected_count = 4 if not has_extensions else 2 + else: + expected_count = 2 if not has_extensions else 1 + + key = keys[0] + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + record_type = 'Standard' if not has_extensions else 'Journal' + assert _verify_records(body, src_bucket_name, 'REST.POST.UPLOAD', [src_key, src_key], record_type, expected_count) + + +@pytest.mark.bucket_logging +def test_bucket_logging_mpu(): + _bucket_logging_mpu(False) + + +@pytest.mark.bucket_logging +def test_bucket_logging_mpu_versioned(): + _bucket_logging_mpu(True) + + +@pytest.mark.bucket_logging +def _bucket_logging_mpu_copy(versioned): + src_bucket_name = get_new_bucket() + if versioned: + check_configure_versioning_retry(src_bucket_name, "Enabled", "Enabled") + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + src_key = "myobject" + objlen = 30 * 1024 * 1024 + (upload_id, data, parts) = _multipart_upload(bucket_name=src_bucket_name, key=src_key, size=objlen) + client.complete_multipart_upload(Bucket=src_bucket_name, Key=src_key, UploadId=upload_id, MultipartUpload={'Parts': parts}) + if versioned: + (upload_id, data, parts) = _multipart_upload(bucket_name=src_bucket_name, key=src_key, size=objlen) + client.complete_multipart_upload(Bucket=src_bucket_name, Key=src_key, UploadId=upload_id, MultipartUpload={'Parts': parts}) + + # minimal configuration + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'} + if has_extensions: + logging_enabled['ObjectRollTime'] = expected_object_roll_time + logging_enabled['LoggingType'] = 'Journal' + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + client.copy_object(Bucket=src_bucket_name, Key='copy_of_'+src_key, CopySource={'Bucket': src_bucket_name, 'Key': src_key}) + + time.sleep(expected_object_roll_time) + client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + key = keys[0] + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + record_type = 'Standard' if not has_extensions else 'Journal' + assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', ['copy_of_'+src_key], record_type, 1) + + +@pytest.mark.bucket_logging +def test_bucket_logging_mpu_copy(): + _bucket_logging_mpu_copy(False) + + +@pytest.mark.bucket_logging +def test_bucket_logging_mpu_copy_versioned(): + _bucket_logging_mpu_copy(True) + + +def _bucket_logging_multi_delete(versioned): + src_bucket_name = get_new_bucket() + if versioned: + check_configure_versioning_retry(src_bucket_name, "Enabled", "Enabled") + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + num_keys = 5 + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent()) + if versioned: + client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent()) + + # minimal configuration + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'} + if has_extensions: + logging_enabled['ObjectRollTime'] = expected_object_roll_time + logging_enabled['LoggingType'] = 'Journal' + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + if versioned: + response = client.list_object_versions(Bucket=src_bucket_name) + objs_list = [] + for version in response['Versions']: + obj_dict = {'Key': version['Key'], 'VersionId': version['VersionId']} + objs_list.append(obj_dict) + objs_dict = {'Objects': objs_list} + client.delete_objects(Bucket=src_bucket_name, Delete=objs_dict) + else: + objs_dict = _make_objs_dict(key_names=src_keys) + client.delete_objects(Bucket=src_bucket_name, Delete=objs_dict) + + time.sleep(expected_object_roll_time) + client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + if versioned: + expected_count = 2*num_keys + else: + expected_count = num_keys + + key = keys[0] + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + record_type = 'Standard' if not has_extensions else 'Journal' + assert _verify_records(body, src_bucket_name, "REST.POST.DELETE_MULTI_OBJECT", src_keys, record_type, expected_count) + + +@pytest.mark.bucket_logging +def test_bucket_logging_multi_delete(): + _bucket_logging_multi_delete(False) + + +@pytest.mark.bucket_logging +def test_bucket_logging_multi_delete_versioned(): + _bucket_logging_multi_delete(True) + + +def _bucket_logging_type(logging_type): + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + logging_enabled = { + 'TargetBucket': log_bucket_name, + 'TargetPrefix': 'log/', + 'ObjectRollTime': expected_object_roll_time, + 'LoggingType': logging_type + } + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + num_keys = 5 + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent()) + client.head_object(Bucket=src_bucket_name, Key=name) + + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + + time.sleep(expected_object_roll_time) + client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy') + client.head_object(Bucket=src_bucket_name, Key='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + key = keys[0] + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + if logging_type == 'Journal': + assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, 'Journal', num_keys) + assert _verify_records(body, src_bucket_name, 'REST.HEAD.OBJECT', src_keys, 'Journal', num_keys) == False + elif logging_type == 'Standard': + assert _verify_records(body, src_bucket_name, 'REST.HEAD.OBJECT', src_keys, 'Standard', num_keys) + assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, 'Standard', num_keys) + else: + assert False, 'invalid logging type:'+logging_type + + +@pytest.mark.bucket_logging +@pytest.mark.fails_on_aws +def test_bucket_logging_event_type_j(): + if not _has_bucket_logging_extension(): + pytest.skip('ceph extension to bucket logging not supported at client') + _bucket_logging_type('Journal') + + +@pytest.mark.bucket_logging +@pytest.mark.fails_on_aws +def test_bucket_logging_event_type_s(): + if not _has_bucket_logging_extension(): + pytest.skip('ceph extension to bucket logging not supported at client') + _bucket_logging_type('Standard') + + +@pytest.mark.bucket_logging +@pytest.mark.fails_on_aws +def test_bucket_logging_roll_time(): + if not _has_bucket_logging_extension(): + pytest.skip('ceph extension to bucket logging not supported at client') + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + + roll_time = 10 + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/', 'ObjectRollTime': roll_time} + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + num_keys = 5 + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent()) + + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + + time.sleep(roll_time/2) + client.put_object(Bucket=src_bucket_name, Key='myobject', Body=randcontent()) + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 0 + + time.sleep(roll_time/2) + client.put_object(Bucket=src_bucket_name, Key='myobject', Body=randcontent()) + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + len(keys) == 1 + + key = keys[0] + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, 'Standard', num_keys) + client.delete_object(Bucket=log_bucket_name, Key=key) + + num_keys = 25 + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent()) + time.sleep(1) + + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + + time.sleep(roll_time) + client.put_object(Bucket=src_bucket_name, Key='myobject', Body=randcontent()) + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) > 1 + + body = '' + for key in keys: + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body += _get_body(response) + assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, 'Standard', num_keys+1) + + +@pytest.mark.bucket_logging +def test_bucket_logging_multiple_prefixes(): + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + num_buckets = 5 + buckets = [] + bucket_name_prefix = get_new_bucket_name() + for j in range(num_buckets): + src_bucket_name = bucket_name_prefix+str(j) + src_bucket = get_new_bucket_resource(name=src_bucket_name) + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': src_bucket_name+'/'} + if has_extensions: + logging_enabled['ObjectRollTime'] = expected_object_roll_time + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + buckets.append(src_bucket_name) + + num_keys = 5 + for src_bucket_name in buckets: + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent()) + + time.sleep(expected_object_roll_time) + for src_bucket_name in buckets: + client.head_object(Bucket=src_bucket_name, Key='myobject0') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) >= num_buckets + + for key in keys: + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + found = False + for src_bucket_name in buckets: + if key.startswith(src_bucket_name): + found = True + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, 'Standard', num_keys) + assert found + + +@pytest.mark.bucket_logging +def test_bucket_logging_single_prefix(): + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + num_buckets = 5 + buckets = [] + bucket_name_prefix = get_new_bucket_name() + for j in range(num_buckets): + src_bucket_name = bucket_name_prefix+str(j) + src_bucket = get_new_bucket_resource(name=src_bucket_name) + # minimal configuration + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'} + if has_extensions: + logging_enabled['ObjectRollTime'] = expected_object_roll_time + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + buckets.append(src_bucket_name) + + num_keys = 5 + bucket_ind = 0 + for src_bucket_name in buckets: + bucket_ind += 1 + for j in range(num_keys): + name = 'myobject'+str(bucket_ind)+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent()) + + time.sleep(expected_object_roll_time) + client.put_object(Bucket=buckets[0], Key='dummy', Body='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + key = keys[0] + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + found = False + for src_bucket_name in buckets: + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + found = _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, 'Standard', num_keys) + assert found