Test expiration header for lc rules with tags

Signed-off-by: Or Friedmann <ofriedma@redhat.com>
This commit is contained in:
Or Friedmann 2020-09-30 16:25:22 +03:00 committed by Casey Bodley
parent ae981dd3a8
commit 41006d68c2

View file

@ -25,6 +25,7 @@ import os
import string import string
import random import random
import socket import socket
import dateutil.parser
import ssl import ssl
from collections import namedtuple from collections import namedtuple
@ -9197,7 +9198,7 @@ def test_lifecyclev2_expiration():
@attr(resource='bucket') @attr(resource='bucket')
@attr(method='put') @attr(method='put')
@attr(operation='test lifecycle expiration on versining enabled bucket') @attr(operation='test lifecycle expiration on versioning enabled bucket')
@attr('lifecycle') @attr('lifecycle')
@attr('lifecycle_expiration') @attr('lifecycle_expiration')
@attr('fails_on_aws') @attr('fails_on_aws')
@ -9601,20 +9602,18 @@ def setup_lifecycle_expiration(client, bucket_name, rule_id, delta_days,
def check_lifecycle_expiration_header(response, start_time, rule_id, def check_lifecycle_expiration_header(response, start_time, rule_id,
delta_days): delta_days):
print(response) expr_exists = ('x-amz-expiration' in response['ResponseMetadata']['HTTPHeaders'])
print(response['ResponseMetadata']['HTTPHeaders']) if (not expr_exists):
return False
expr_hdr = response['ResponseMetadata']['HTTPHeaders']['x-amz-expiration']
exp_header = response['ResponseMetadata']['HTTPHeaders']['x-amz-expiration'] m = re.search(r'expiry-date="(.+)", rule-id="(.+)"', expr_hdr)
m = re.search(r'expiry-date="(.+)", rule-id="(.+)"', exp_header)
datestr = m.group(1)
exp_date = datetime.datetime.strptime(datestr, '%a, %d %b %Y %H:%M:%S %Z')
exp_diff = exp_date - start_time
rule_id = m.group(2)
eq(exp_diff.days, delta_days) expiration = dateutil.parser.parse(m.group(1))
eq(m.group(2), rule_id) days_to_expire = ((expiration.replace(tzinfo=None) - start_time).days == delta_days)
rule_eq_id = (m.group(2) == rule_id)
return True return days_to_expire and rule_eq_id
@attr(resource='bucket') @attr(resource='bucket')
@attr(method='put') @attr(method='put')
@ -9650,6 +9649,112 @@ def test_lifecycle_expiration_header_head():
eq(response['ResponseMetadata']['HTTPStatusCode'], 200) eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
eq(check_lifecycle_expiration_header(response, now, 'rule1', 1), True) eq(check_lifecycle_expiration_header(response, now, 'rule1', 1), True)
@attr(resource='bucket')
@attr(method='head')
@attr(operation='test lifecycle expiration header head with tags')
@attr('lifecycle')
@attr('lifecycle_expiration')
def test_lifecycle_expiration_header_tags_head():
bucket_name = get_new_bucket()
client = get_client()
lifecycle={
"Rules": [
{
"Filter": {
"Tag": {"Key": "key1", "Value": "tag1"}
},
"Status": "Enabled",
"Expiration": {
"Days": 1
},
"ID": "rule1"
},
]
}
response = client.put_bucket_lifecycle_configuration(
Bucket=bucket_name, LifecycleConfiguration=lifecycle)
key1 = "obj_key1"
body1 = "obj_key1_body"
tags1={'TagSet': [{'Key': 'key1', 'Value': 'tag1'},
{'Key': 'key5','Value': 'tag5'}]}
response = client.put_object(Bucket=bucket_name, Key=key1, Body=body1)
response = client.put_object_tagging(Bucket=bucket_name, Key=key1,Tagging=tags1)
# stat the object, check header
response = client.head_object(Bucket=bucket_name, Key=key1)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
eq(check_lifecycle_expiration_header(response, datetime.datetime.now(None), 'rule1', 1), True)
# test that header is not returning when it should not
lifecycle={
"Rules": [
{
"Filter": {
"Tag": {"Key": "key2", "Value": "tag1"}
},
"Status": "Enabled",
"Expiration": {
"Days": 1
},
"ID": "rule1"
},
]
}
response = client.put_bucket_lifecycle_configuration(
Bucket=bucket_name, LifecycleConfiguration=lifecycle)
# stat the object, check header
response = client.head_object(Bucket=bucket_name, Key=key1)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
eq(check_lifecycle_expiration_header(response, datetime.datetime.now(None), 'rule1', 1), False)
@attr(resource='bucket')
@attr(method='head')
@attr(operation='test lifecycle expiration header head with tags and And')
@attr('lifecycle')
@attr('lifecycle_expiration')
def test_lifecycle_expiration_header_and_tags_head():
now = datetime.datetime.now(None)
bucket_name = get_new_bucket()
client = get_client()
lifecycle={
"Rules": [
{
"Filter": {
"And": {
"Tags": [
{
"Key": "key1",
"Value": "tag1"
},
{
"Key": "key5",
"Value": "tag6"
}
]
}
},
"Status": "Enabled",
"Expiration": {
"Days": 1
},
"ID": "rule1"
},
]
}
response = client.put_bucket_lifecycle_configuration(
Bucket=bucket_name, LifecycleConfiguration=lifecycle)
key1 = "obj_key1"
body1 = "obj_key1_body"
tags1={'TagSet': [{'Key': 'key1', 'Value': 'tag1'},
{'Key': 'key5','Value': 'tag5'}]}
response = client.put_object(Bucket=bucket_name, Key=key1, Body=body1)
response = client.put_object_tagging(Bucket=bucket_name, Key=key1,Tagging=tags1)
# stat the object, check header
response = client.head_object(Bucket=bucket_name, Key=key1)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
eq(check_lifecycle_expiration_header(response, datetime.datetime.now(None), 'rule1', 1), False)
@attr(resource='bucket') @attr(resource='bucket')
@attr(method='put') @attr(method='put')
@attr(operation='set lifecycle config with noncurrent version expiration') @attr(operation='set lifecycle config with noncurrent version expiration')