Merge pull request #279 from zhangsw/object-lock

add test cases for object lock.

Reviewed-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Casey Bodley 2019-06-20 13:21:53 -04:00 committed by GitHub
commit 02722e9de5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -10443,3 +10443,643 @@ def test_bucket_policy_get_obj_acl_existing_tag():
status, error_code = _get_status_and_error_code(e.response) status, error_code = _get_status_and_error_code(e.response)
eq(status, 403) eq(status, 403)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put object lock with defalut retention')
@attr(assertion='success')
@attr('object-lock')
def test_object_lock_put_obj_lock():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
conf = {'ObjectLockEnabled':'Enabled',
'Rule': {
'DefaultRetention':{
'Mode':'GOVERNANCE',
'Days':1
}
}}
response = client.put_object_lock_configuration(
Bucket=bucket_name,
ObjectLockConfiguration=conf)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
conf = {'ObjectLockEnabled':'Enabled',
'Rule': {
'DefaultRetention':{
'Mode':'COMPLIANCE',
'Years':1
}
}}
response = client.put_object_lock_configuration(
Bucket=bucket_name,
ObjectLockConfiguration=conf)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
response = client.get_bucket_versioning(Bucket=bucket_name)
eq(response['Status'], 'Enabled')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put object lock with bucket object lock not enabled')
@attr(assertion='fails')
@attr('object-lock')
def test_object_lock_put_obj_lock_invalid_bucket():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name)
conf = {'ObjectLockEnabled':'Enabled',
'Rule': {
'DefaultRetention':{
'Mode':'GOVERNANCE',
'Days':1
}
}}
e = assert_raises(ClientError, client.put_object_lock_configuration, Bucket=bucket_name, ObjectLockConfiguration=conf)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 409)
eq(error_code, 'InvalidBucketState')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put object lock with days and years')
@attr(assertion='fails')
@attr('object-lock')
def test_object_lock_put_obj_lock_with_days_and_years():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
conf = {'ObjectLockEnabled':'Enabled',
'Rule': {
'DefaultRetention':{
'Mode':'GOVERNANCE',
'Days':1,
'Years':1
}
}}
e = assert_raises(ClientError, client.put_object_lock_configuration, Bucket=bucket_name, ObjectLockConfiguration=conf)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
eq(error_code, 'MalformedXML')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put object lock with invalid days')
@attr(assertion='fails')
@attr('object-lock')
def test_object_lock_put_obj_lock_invalid_days():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
conf = {'ObjectLockEnabled':'Enabled',
'Rule': {
'DefaultRetention':{
'Mode':'GOVERNANCE',
'Days':0
}
}}
e = assert_raises(ClientError, client.put_object_lock_configuration, Bucket=bucket_name, ObjectLockConfiguration=conf)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
eq(error_code, 'InvalidRetentionPeriod')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put object lock with invalid years')
@attr(assertion='fails')
@attr('object-lock')
def test_object_lock_put_obj_lock_invalid_years():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
conf = {'ObjectLockEnabled':'Enabled',
'Rule': {
'DefaultRetention':{
'Mode':'GOVERNANCE',
'Years':-1
}
}}
e = assert_raises(ClientError, client.put_object_lock_configuration, Bucket=bucket_name, ObjectLockConfiguration=conf)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
eq(error_code, 'InvalidRetentionPeriod')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put object lock with invalid mode')
@attr(assertion='fails')
@attr('object-lock')
def test_object_lock_put_obj_lock_invalid_years():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
conf = {'ObjectLockEnabled':'Enabled',
'Rule': {
'DefaultRetention':{
'Mode':'abc',
'Years':1
}
}}
e = assert_raises(ClientError, client.put_object_lock_configuration, Bucket=bucket_name, ObjectLockConfiguration=conf)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
eq(error_code, 'MalformedXML')
conf = {'ObjectLockEnabled':'Enabled',
'Rule': {
'DefaultRetention':{
'Mode':'governance',
'Years':1
}
}}
e = assert_raises(ClientError, client.put_object_lock_configuration, Bucket=bucket_name, ObjectLockConfiguration=conf)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
eq(error_code, 'MalformedXML')
attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put object lock with invalid status')
@attr(assertion='fails')
@attr('object-lock')
def test_object_lock_put_obj_lock_invalid_status():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
conf = {'ObjectLockEnabled':'Disabled',
'Rule': {
'DefaultRetention':{
'Mode':'GOVERNANCE',
'Years':1
}
}}
e = assert_raises(ClientError, client.put_object_lock_configuration, Bucket=bucket_name, ObjectLockConfiguration=conf)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
eq(error_code, 'MalformedXML')
attr(resource='bucket')
@attr(method='put')
@attr(operation='Test suspend versioning when object lock enabled')
@attr(assertion='fails')
@attr('object-lock')
def test_object_lock_suspend_versioning():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
e = assert_raises(ClientError, client.put_bucket_versioning, Bucket=bucket_name, VersioningConfiguration={'Status': 'Suspended'})
status, error_code = _get_status_and_error_code(e.response)
eq(status, 409)
eq(error_code, 'InvalidBucketState')
@attr(resource='bucket')
@attr(method='get')
@attr(operation='Test get object lock')
@attr(assertion='success')
@attr('object-lock')
def test_object_lock_get_obj_lock():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
conf = {'ObjectLockEnabled':'Enabled',
'Rule': {
'DefaultRetention':{
'Mode':'GOVERNANCE',
'Days':1
}
}}
client.put_object_lock_configuration(
Bucket=bucket_name,
ObjectLockConfiguration=conf)
response = client.get_object_lock_configuration(Bucket=bucket_name)
eq(response['ObjectLockConfiguration'], conf)
@attr(resource='bucket')
@attr(method='get')
@attr(operation='Test get object lock with bucket object lock not enabled')
@attr(assertion='fails')
@attr('object-lock')
def test_object_lock_get_obj_lock_invalid_bucket():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name)
e = assert_raises(ClientError, client.get_object_lock_configuration, Bucket=bucket_name)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 404)
eq(error_code, 'ObjectLockConfigurationNotFoundError')
@attr(resource='bucket')
@attr(method='get')
@attr(operation='Test put object retention')
@attr(assertion='success')
@attr('object-lock')
def test_object_lock_put_obj_retention():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
key = 'file1'
response = client.put_object(Bucket=bucket_name, Body='abc', Key=key)
version_id = response['VersionId']
retention = {'Mode':'GOVERNANCE', 'RetainUntilDate':datetime.datetime(2030,1,1,tzinfo=pytz.UTC)}
response = client.put_object_retention(Bucket=bucket_name, Key=key, Retention=retention)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
client.delete_object(Bucket=bucket_name, Key=key, VersionId=version_id, BypassGovernanceRetention=True)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put object retention with bucket object lock not enabled')
@attr(assertion='fails')
@attr('object-lock')
def test_object_lock_put_obj_retention_invalid_bucket():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name)
key = 'file1'
client.put_object(Bucket=bucket_name, Body='abc', Key=key)
retention = {'Mode':'GOVERNANCE', 'RetainUntilDate':datetime.datetime(2030,1,1,tzinfo=pytz.UTC)}
e = assert_raises(ClientError, client.put_object_retention, Bucket=bucket_name, Key=key, Retention=retention)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
eq(error_code, 'InvalidRequest')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put object retention with invalid mode')
@attr(assertion='fails')
@attr('object-lock')
def test_object_lock_put_obj_retention_invalid_mode():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
key = 'file1'
client.put_object(Bucket=bucket_name, Body='abc', Key=key)
retention = {'Mode':'governance', 'RetainUntilDate':datetime.datetime(2030,1,1,tzinfo=pytz.UTC)}
e = assert_raises(ClientError, client.put_object_retention, Bucket=bucket_name, Key=key, Retention=retention)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
eq(error_code, 'MalformedXML')
retention = {'Mode':'abc', 'RetainUntilDate':datetime.datetime(2030,1,1,tzinfo=pytz.UTC)}
e = assert_raises(ClientError, client.put_object_retention, Bucket=bucket_name, Key=key, Retention=retention)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
eq(error_code, 'MalformedXML')
@attr(resource='bucket')
@attr(method='get')
@attr(operation='Test get object retention')
@attr(assertion='success')
@attr('object-lock')
def test_object_lock_get_obj_retention():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
key = 'file1'
response = client.put_object(Bucket=bucket_name, Body='abc', Key=key)
version_id = response['VersionId']
retention = {'Mode':'GOVERNANCE', 'RetainUntilDate':datetime.datetime(2030,1,1,tzinfo=pytz.UTC)}
client.put_object_retention(Bucket=bucket_name, Key=key, Retention=retention)
response = client.get_object_retention(Bucket=bucket_name, Key=key)
eq(response['Retention'], retention)
client.delete_object(Bucket=bucket_name, Key=key, VersionId=version_id, BypassGovernanceRetention=True)
@attr(resource='bucket')
@attr(method='get')
@attr(operation='Test get object retention with invalid bucket')
@attr(assertion='fails')
@attr('object-lock')
def test_object_lock_get_obj_retention_invalid_bucket():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name)
key = 'file1'
client.put_object(Bucket=bucket_name, Body='abc', Key=key)
e = assert_raises(ClientError, client.get_object_retention, Bucket=bucket_name, Key=key)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
eq(error_code, 'InvalidRequest')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put object retention with version id')
@attr(assertion='success')
@attr('object-lock')
def test_object_lock_put_obj_retention_versionid():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
key = 'file1'
client.put_object(Bucket=bucket_name, Body='abc', Key=key)
response = client.put_object(Bucket=bucket_name, Body='abc', Key=key)
version_id = response['VersionId']
retention = {'Mode':'GOVERNANCE', 'RetainUntilDate':datetime.datetime(2030,1,1,tzinfo=pytz.UTC)}
client.put_object_retention(Bucket=bucket_name, Key=key, VersionId=version_id, Retention=retention)
response = client.get_object_retention(Bucket=bucket_name, Key=key, VersionId=version_id)
eq(response['Retention'], retention)
client.delete_object(Bucket=bucket_name, Key=key, VersionId=version_id, BypassGovernanceRetention=True)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put object retention to override default retention')
@attr(assertion='success')
@attr('object-lock')
def test_object_lock_put_obj_retention_override_default_retention():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
conf = {'ObjectLockEnabled':'Enabled',
'Rule': {
'DefaultRetention':{
'Mode':'GOVERNANCE',
'Days':1
}
}}
client.put_object_lock_configuration(
Bucket=bucket_name,
ObjectLockConfiguration=conf)
key = 'file1'
response = client.put_object(Bucket=bucket_name, Body='abc', Key=key)
version_id = response['VersionId']
retention = {'Mode':'GOVERNANCE', 'RetainUntilDate':datetime.datetime(2030,1,1,tzinfo=pytz.UTC)}
client.put_object_retention(Bucket=bucket_name, Key=key, Retention=retention)
response = client.get_object_retention(Bucket=bucket_name, Key=key)
eq(response['Retention'], retention)
client.delete_object(Bucket=bucket_name, Key=key, VersionId=version_id, BypassGovernanceRetention=True)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put object retention to increase retention period')
@attr(assertion='success')
@attr('object-lock')
def test_object_lock_put_obj_retention_increase_period():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
key = 'file1'
response = client.put_object(Bucket=bucket_name, Body='abc', Key=key)
version_id = response['VersionId']
retention1 = {'Mode':'GOVERNANCE', 'RetainUntilDate':datetime.datetime(2030,1,1,tzinfo=pytz.UTC)}
client.put_object_retention(Bucket=bucket_name, Key=key, Retention=retention1)
retention2 = {'Mode':'GOVERNANCE', 'RetainUntilDate':datetime.datetime(2030,1,3,tzinfo=pytz.UTC)}
client.put_object_retention(Bucket=bucket_name, Key=key, Retention=retention2)
response = client.get_object_retention(Bucket=bucket_name, Key=key)
eq(response['Retention'], retention2)
client.delete_object(Bucket=bucket_name, Key=key, VersionId=version_id, BypassGovernanceRetention=True)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put object retention to shorten period')
@attr(assertion='fails')
@attr('object-lock')
def test_object_lock_put_obj_retention_shorten_period():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
key = 'file1'
response = client.put_object(Bucket=bucket_name, Body='abc', Key=key)
version_id = response['VersionId']
retention = {'Mode':'GOVERNANCE', 'RetainUntilDate':datetime.datetime(2030,1,3,tzinfo=pytz.UTC)}
client.put_object_retention(Bucket=bucket_name, Key=key, Retention=retention)
retention = {'Mode':'GOVERNANCE', 'RetainUntilDate':datetime.datetime(2030,1,1,tzinfo=pytz.UTC)}
e = assert_raises(ClientError, client.put_object_retention, Bucket=bucket_name, Key=key, Retention=retention)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
eq(error_code, 'AccessDenied')
client.delete_object(Bucket=bucket_name, Key=key, VersionId=version_id, BypassGovernanceRetention=True)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put object retention to shorten period with bypass header')
@attr(assertion='success')
@attr('object-lock')
def test_object_lock_put_obj_retention_shorten_period_bypass():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
key = 'file1'
response = client.put_object(Bucket=bucket_name, Body='abc', Key=key)
version_id = response['VersionId']
retention = {'Mode':'GOVERNANCE', 'RetainUntilDate':datetime.datetime(2030,1,3,tzinfo=pytz.UTC)}
client.put_object_retention(Bucket=bucket_name, Key=key, Retention=retention)
retention = {'Mode':'GOVERNANCE', 'RetainUntilDate':datetime.datetime(2030,1,1,tzinfo=pytz.UTC)}
client.put_object_retention(Bucket=bucket_name, Key=key, Retention=retention, BypassGovernanceRetention=True)
response = client.get_object_retention(Bucket=bucket_name, Key=key)
eq(response['Retention'], retention)
client.delete_object(Bucket=bucket_name, Key=key, VersionId=version_id, BypassGovernanceRetention=True)
@attr(resource='bucket')
@attr(method='delete')
@attr(operation='Test delete object with retention')
@attr(assertion='retention period make effects')
@attr('object-lock')
def test_object_lock_delete_object_with_retention():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
key = 'file1'
response = client.put_object(Bucket=bucket_name, Body='abc', Key=key)
retention = {'Mode':'GOVERNANCE', 'RetainUntilDate':datetime.datetime(2030,1,1,tzinfo=pytz.UTC)}
client.put_object_retention(Bucket=bucket_name, Key=key, Retention=retention)
e = assert_raises(ClientError, client.delete_object, Bucket=bucket_name, Key=key, VersionId=response['VersionId'])
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
eq(error_code, 'AccessDenied')
response = client.delete_object(Bucket=bucket_name, Key=key, VersionId=response['VersionId'], BypassGovernanceRetention=True)
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put legal hold')
@attr(assertion='success')
@attr('object-lock')
def test_object_lock_put_legal_hold():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
key = 'file1'
client.put_object(Bucket=bucket_name, Body='abc', Key=key)
legal_hold = {'Status': 'ON'}
response = client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold=legal_hold)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
response = client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status':'OFF'})
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put legal hold with invalid bucket')
@attr(assertion='fails')
@attr('object-lock')
def test_object_lock_put_legal_hold_invalid_bucket():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name)
key = 'file1'
client.put_object(Bucket=bucket_name, Body='abc', Key=key)
legal_hold = {'Status': 'ON'}
e = assert_raises(ClientError, client.put_object_legal_hold, Bucket=bucket_name, Key=key, LegalHold=legal_hold)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
eq(error_code, 'InvalidRequest')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put legal hold with invalid status')
@attr(assertion='fails')
@attr('object-lock')
def test_object_lock_put_legal_hold_invalid_status():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
key = 'file1'
client.put_object(Bucket=bucket_name, Body='abc', Key=key)
legal_hold = {'Status': 'abc'}
e = assert_raises(ClientError, client.put_object_legal_hold, Bucket=bucket_name, Key=key, LegalHold=legal_hold)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
eq(error_code, 'MalformedXML')
@attr(resource='bucket')
@attr(method='get')
@attr(operation='Test get legal hold')
@attr(assertion='success')
@attr('object-lock')
def test_object_lock_get_legal_hold():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
key = 'file1'
client.put_object(Bucket=bucket_name, Body='abc', Key=key)
legal_hold = {'Status': 'ON'}
client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold=legal_hold)
response = client.get_object_legal_hold(Bucket=bucket_name, Key=key)
eq(response['LegalHold'], legal_hold)
legal_hold_off = {'Status': 'OFF'}
client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold=legal_hold_off)
response = client.get_object_legal_hold(Bucket=bucket_name, Key=key)
eq(response['LegalHold'], legal_hold_off)
@attr(resource='bucket')
@attr(method='get')
@attr(operation='Test get legal hold with invalid bucket')
@attr(assertion='fails')
@attr('object-lock')
def test_object_lock_get_legal_hold_invalid_bucket():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name)
key = 'file1'
client.put_object(Bucket=bucket_name, Body='abc', Key=key)
e = assert_raises(ClientError, client.get_object_legal_hold, Bucket=bucket_name, Key=key)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
eq(error_code, 'InvalidRequest')
@attr(resource='bucket')
@attr(method='delete')
@attr(operation='Test delete object with legal hold on')
@attr(assertion='fails')
@attr('object-lock')
def test_object_lock_delete_object_with_legal_hold_on():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
key = 'file1'
response = client.put_object(Bucket=bucket_name, Body='abc', Key=key)
client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status': 'ON'})
e = assert_raises(ClientError, client.delete_object, Bucket=bucket_name, Key=key, VersionId=response['VersionId'])
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
eq(error_code, 'AccessDenied')
client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status':'OFF'})
@attr(resource='bucket')
@attr(method='delete')
@attr(operation='Test delete object with legal hold off')
@attr(assertion='fails')
@attr('object-lock')
def test_object_lock_delete_object_with_legal_hold_off():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
key = 'file1'
response = client.put_object(Bucket=bucket_name, Body='abc', Key=key)
client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status': 'OFF'})
response = client.delete_object(Bucket=bucket_name, Key=key, VersionId=response['VersionId'])
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
@attr(resource='bucket')
@attr(method='get')
@attr(operation='Test get object metadata')
@attr(assertion='success')
@attr('object-lock')
def test_object_lock_get_obj_metadata():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
key = 'file1'
client.put_object(Bucket=bucket_name, Body='abc', Key=key)
legal_hold = {'Status': 'ON'}
client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold=legal_hold)
retention = {'Mode':'GOVERNANCE', 'RetainUntilDate':datetime.datetime(2030,1,1,tzinfo=pytz.UTC)}
client.put_object_retention(Bucket=bucket_name, Key=key, Retention=retention)
response = client.head_object(Bucket=bucket_name, Key=key)
print response
eq(response['ObjectLockMode'], retention['Mode'])
eq(response['ObjectLockRetainUntilDate'], retention['RetainUntilDate'])
eq(response['ObjectLockLegalHoldStatus'], legal_hold['Status'])
client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status':'OFF'})
client.delete_object(Bucket=bucket_name, Key=key, VersionId=response['VersionId'], BypassGovernanceRetention=True)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put legal hold and retention when uploading object')
@attr(assertion='success')
@attr('object-lock')
def test_object_lock_uploading_obj():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
key = 'file1'
client.put_object(Bucket=bucket_name, Body='abc', Key=key, ObjectLockMode='GOVERNANCE',
ObjectLockRetainUntilDate=datetime.datetime(2030,1,1,tzinfo=pytz.UTC), ObjectLockLegalHoldStatus='ON')
response = client.head_object(Bucket=bucket_name, Key=key)
eq(response['ObjectLockMode'], 'GOVERNANCE')
eq(response['ObjectLockRetainUntilDate'], datetime.datetime(2030,1,1,tzinfo=pytz.UTC))
eq(response['ObjectLockLegalHoldStatus'], 'ON')
client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status':'OFF'})
client.delete_object(Bucket=bucket_name, Key=key, VersionId=response['VersionId'], BypassGovernanceRetention=True)