diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index 56433ba..e80afe6 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -14017,7 +14017,7 @@ def _parse_standard_log_record(record): record = record.replace('[', '"').replace(']', '"') chunks = shlex.split(record) assert len(chunks) == 26 - return json.dumps({ + return { 'BucketOwner': chunks[0], 'BucketName': chunks[1], 'RequestDateTime': chunks[2], @@ -14044,14 +14044,14 @@ def _parse_standard_log_record(record): 'TLSVersion': chunks[23], 'AccessPointARN': chunks[24], 'ACLRequired': chunks[25], - }, indent=4) + } def _parse_journal_log_record(record): record = record.replace('[', '"').replace(']', '"') chunks = shlex.split(record) assert len(chunks) == 8 - return json.dumps({ + return { 'BucketOwner': chunks[0], 'BucketName': chunks[1], 'RequestDateTime': chunks[2], @@ -14060,7 +14060,7 @@ def _parse_journal_log_record(record): 'ObjectSize': chunks[5], 'VersionID': chunks[6], 'ETAG': chunks[7], - }, indent=4) + } def _parse_log_record(record, record_type): if record_type == 'Standard': @@ -14072,6 +14072,36 @@ def _parse_log_record(record, record_type): expected_object_roll_time = 5 +import logging + +logger = logging.getLogger(__name__) + + +def _verify_records(records, bucket_name, event_type, src_keys, record_type, expected_count, exact_match=False): + keys_found = [] + all_keys = [] + for record in iter(records.splitlines()): + parsed_record = _parse_log_record(record, record_type) + logger.info('bucket log record: %s', json.dumps(parsed_record, indent=4)) + if bucket_name in record and event_type in record: + all_keys.append(parsed_record['Key']) + for key in src_keys: + if key in record: + keys_found.append(key) + break + logger.info('keys found in bucket log: %s', str(all_keys)) + logger.info('keys from the source bucket: %s', str(src_keys)) + if exact_match: + return len(keys_found) == expected_count and len(keys_found) == len(all_keys) + return len(keys_found) == expected_count + + +def randcontent(): + letters = string.ascii_lowercase + length = random.randint(10, 1024) + return ''.join(random.choice(letters) for i in range(length)) + + @pytest.mark.bucket_logging def test_put_bucket_logging(): src_bucket_name = get_new_bucket_name() @@ -14176,6 +14206,161 @@ def test_put_bucket_logging(): assert response['LoggingEnabled'] == logging_enabled +def _bucket_logging_key_filter(log_type): + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + + logging_enabled = { + 'TargetBucket': log_bucket_name, + 'LoggingType': log_type, + 'TargetPrefix': 'log/', + 'ObjectRollTime': expected_object_roll_time, + 'TargetObjectKeyFormat': {'SimplePrefix': {}}, + 'RecordsBatchSize': 0, + 'Filter': + { + 'Key': { + 'FilterRules': [ + {'Name': 'prefix', 'Value': 'test/'}, + {'Name': 'suffix', 'Value': '.txt'} + ] + } + } + } + + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + response = client.get_bucket_logging(Bucket=src_bucket_name) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + if log_type == 'Journal': + assert response['LoggingEnabled'] == logging_enabled + elif log_type == 'Standard': + print('TODO') + else: + assert False, 'unknown log type: %s' % log_type + + names = [] + num_keys = 5 + for j in range(num_keys): + name = 'myobject'+str(j) + if log_type == 'Standard': + # standard log records are not filtered + names.append(name) + client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent()) + + for j in range(num_keys): + name = 'test/'+'myobject'+str(j)+'.txt' + names.append(name) + client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent()) + + expected_count = len(names) + + time.sleep(expected_object_roll_time) + client.put_object(Bucket=src_bucket_name, Key='test/dummy.txt', Body='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + for key in keys: + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', names, log_type, expected_count, exact_match=True) + + +@pytest.mark.bucket_logging +@pytest.mark.fails_on_aws +def test_bucket_logging_key_filter_s(): + if not _has_bucket_logging_extension(): + pytest.skip('ceph extension to bucket logging not supported at client') + _bucket_logging_key_filter('Standard') + + +@pytest.mark.bucket_logging +@pytest.mark.fails_on_aws +def test_bucket_logging_key_filter_j(): + if not _has_bucket_logging_extension(): + pytest.skip('ceph extension to bucket logging not supported at client') + _bucket_logging_key_filter('Journal') + + +def _bucket_logging_flush(log_type): + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + + logging_enabled = { + 'TargetBucket': log_bucket_name, + 'LoggingType': log_type, + 'TargetPrefix': 'log/', + 'ObjectRollTime': 300, # 5 minutes + 'TargetObjectKeyFormat': {'SimplePrefix': {}}, + 'RecordsBatchSize': 0, + } + + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + response = client.get_bucket_logging(Bucket=src_bucket_name) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + if log_type == 'Journal': + assert response['LoggingEnabled'] == logging_enabled + elif log_type == 'Standard': + print('TODO') + else: + assert False, 'unknown log type: %s' % log_type + + num_keys = 5 + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent()) + + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + + response = client.post_bucket_logging(Bucket=src_bucket_name) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + expected_count = num_keys + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + for key in keys: + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, log_type, expected_count) + + +@pytest.mark.bucket_logging +@pytest.mark.fails_on_aws +def test_bucket_logging_flush_j(): + if not _has_bucket_logging_extension(): + pytest.skip('ceph extension to bucket logging not supported at client') + _bucket_logging_flush('Journal') + + +@pytest.mark.bucket_logging +@pytest.mark.fails_on_aws +def test_bucket_logging_flush_s(): + if not _has_bucket_logging_extension(): + pytest.skip('ceph extension to bucket logging not supported at client') + _bucket_logging_flush('Standard') + + @pytest.mark.bucket_logging def test_put_bucket_logging_errors(): src_bucket_name = get_new_bucket_name() @@ -14305,29 +14490,6 @@ def test_put_bucket_logging_extensions(): assert response['LoggingEnabled'] == logging_enabled -import logging - -logger = logging.getLogger(__name__) - -def _verify_records(records, bucket_name, event_type, src_keys, record_type, expected_count): - keys_found = [] - for record in iter(records.splitlines()): - logger.info('bucket log record: %s', _parse_log_record(record, record_type)) - if bucket_name in record and event_type in record: - for key in src_keys: - if key in record: - keys_found.append(key) - break - logger.info('keys found in bucket log: %s', str(keys_found)) - logger.info('keys from the source bucket: %s', str(src_keys)) - return len(keys_found) == expected_count - -def randcontent(): - letters = string.ascii_lowercase - length = random.randint(10, 1024) - return ''.join(random.choice(letters) for i in range(length)) - - def _bucket_logging_put_objects(versioned): src_bucket_name = get_new_bucket() if versioned: