forked from TrueCloudLab/frostfs-s3-gw
Denis Kirillov
5f9555afad
According to https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html we have to use `key-marker` Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
486 lines
17 KiB
Go
486 lines
17 KiB
Go
package handler
|
|
|
|
import (
|
|
"bytes"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"net/url"
|
|
"testing"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
|
apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
const (
|
|
emptyVersion = ""
|
|
)
|
|
|
|
func TestDeleteBucketOnAlreadyRemovedError(t *testing.T) {
|
|
hc := prepareHandlerContext(t)
|
|
|
|
bktName, objName := "bucket-for-removal", "object-to-delete"
|
|
bktInfo := createTestBucket(hc, bktName)
|
|
|
|
putObject(hc, bktName, objName)
|
|
|
|
addr := getAddressOfLastVersion(hc, bktInfo, objName)
|
|
hc.tp.SetObjectError(addr, &apistatus.ObjectAlreadyRemoved{})
|
|
|
|
deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}})
|
|
|
|
deleteBucket(t, hc, bktName, http.StatusNoContent)
|
|
}
|
|
|
|
func getAddressOfLastVersion(hc *handlerContext, bktInfo *data.BucketInfo, objName string) oid.Address {
|
|
nodeVersion, err := hc.tree.GetLatestVersion(hc.context, bktInfo, objName)
|
|
require.NoError(hc.t, err)
|
|
var addr oid.Address
|
|
addr.SetContainer(bktInfo.CID)
|
|
addr.SetObject(nodeVersion.OID)
|
|
return addr
|
|
}
|
|
|
|
func TestDeleteBucket(t *testing.T) {
|
|
tc := prepareHandlerContext(t)
|
|
|
|
bktName, objName := "bucket-for-removal", "object-to-delete"
|
|
_, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName)
|
|
|
|
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
require.True(t, isDeleteMarker)
|
|
|
|
deleteBucket(t, tc, bktName, http.StatusConflict)
|
|
deleteObject(t, tc, bktName, objName, objInfo.VersionID())
|
|
deleteBucket(t, tc, bktName, http.StatusConflict)
|
|
deleteObject(t, tc, bktName, objName, deleteMarkerVersion)
|
|
deleteBucket(t, tc, bktName, http.StatusNoContent)
|
|
}
|
|
|
|
func TestDeleteBucketOnNotFoundError(t *testing.T) {
|
|
hc := prepareHandlerContext(t)
|
|
|
|
bktName, objName := "bucket-for-removal", "object-to-delete"
|
|
bktInfo := createTestBucket(hc, bktName)
|
|
|
|
putObject(hc, bktName, objName)
|
|
|
|
nodeVersion, err := hc.tree.GetUnversioned(hc.context, bktInfo, objName)
|
|
require.NoError(t, err)
|
|
var addr oid.Address
|
|
addr.SetContainer(bktInfo.CID)
|
|
addr.SetObject(nodeVersion.OID)
|
|
hc.tp.SetObjectError(addr, &apistatus.ObjectNotFound{})
|
|
|
|
deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}})
|
|
|
|
deleteBucket(t, hc, bktName, http.StatusNoContent)
|
|
}
|
|
|
|
func TestDeleteObject(t *testing.T) {
|
|
tc := prepareHandlerContext(t)
|
|
|
|
bktName, objName := "bucket-for-removal", "object-to-delete"
|
|
bktInfo, objInfo := createBucketAndObject(tc, bktName, objName)
|
|
|
|
checkFound(t, tc, bktName, objName, emptyVersion)
|
|
deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
checkNotFound(t, tc, bktName, objName, emptyVersion)
|
|
|
|
require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo))
|
|
}
|
|
|
|
func TestDeleteObjectFromSuspended(t *testing.T) {
|
|
tc := prepareHandlerContext(t)
|
|
bktName, objName := "bucket-versioned-for-removal", "object-to-delete"
|
|
|
|
createSuspendedBucket(t, tc, bktName)
|
|
putObject(tc, bktName, objName)
|
|
|
|
versionID, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
require.True(t, isDeleteMarker)
|
|
require.Equal(t, data.UnversionedObjectVersionID, versionID)
|
|
}
|
|
|
|
func TestDeleteDeletedObject(t *testing.T) {
|
|
tc := prepareHandlerContext(t)
|
|
|
|
t.Run("unversioned bucket", func(t *testing.T) {
|
|
bktName, objName := "bucket-unversioned-removal", "object-to-delete"
|
|
createBucketAndObject(tc, bktName, objName)
|
|
|
|
versionID, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
require.Empty(t, versionID)
|
|
require.False(t, isDeleteMarker)
|
|
versionID, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
require.Empty(t, versionID)
|
|
require.False(t, isDeleteMarker)
|
|
})
|
|
|
|
t.Run("versioned bucket", func(t *testing.T) {
|
|
bktName, objName := "bucket-versioned-for-removal", "object-to-delete"
|
|
createVersionedBucketAndObject(t, tc, bktName, objName)
|
|
|
|
_, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
require.True(t, isDeleteMarker)
|
|
_, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
require.True(t, isDeleteMarker)
|
|
})
|
|
|
|
t.Run("versioned bucket not found obj", func(t *testing.T) {
|
|
bktName, objName := "bucket-versioned-for-removal", "object-to-delete"
|
|
_, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName)
|
|
|
|
versionID, isDeleteMarker := deleteObject(t, tc, bktName, objName, objInfo.VersionID())
|
|
require.False(t, isDeleteMarker)
|
|
require.Equal(t, objInfo.VersionID(), versionID)
|
|
|
|
versionID2, isDeleteMarker := deleteObject(t, tc, bktName, objName, versionID)
|
|
require.False(t, isDeleteMarker)
|
|
require.Equal(t, objInfo.VersionID(), versionID2)
|
|
})
|
|
}
|
|
|
|
func TestDeleteObjectVersioned(t *testing.T) {
|
|
tc := prepareHandlerContext(t)
|
|
|
|
bktName, objName := "bucket-for-removal", "object-to-delete"
|
|
bktInfo, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName)
|
|
|
|
checkFound(t, tc, bktName, objName, emptyVersion)
|
|
deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
checkNotFound(t, tc, bktName, objName, emptyVersion)
|
|
|
|
checkFound(t, tc, bktName, objName, objInfo.VersionID())
|
|
deleteObject(t, tc, bktName, objName, objInfo.VersionID())
|
|
checkNotFound(t, tc, bktName, objName, objInfo.VersionID())
|
|
|
|
require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object exists but shouldn't")
|
|
}
|
|
|
|
func TestDeleteObjectUnversioned(t *testing.T) {
|
|
tc := prepareHandlerContext(t)
|
|
|
|
bktName, objName := "bucket-for-removal-unversioned", "object-to-delete-unversioned"
|
|
bktInfo, objInfo := createBucketAndObject(tc, bktName, objName)
|
|
|
|
checkFound(t, tc, bktName, objName, emptyVersion)
|
|
deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
checkNotFound(t, tc, bktName, objName, emptyVersion)
|
|
|
|
versions := listVersions(t, tc, bktName)
|
|
require.Len(t, versions.DeleteMarker, 0, "delete markers must be empty")
|
|
require.Len(t, versions.Version, 0, "versions must be empty")
|
|
|
|
require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object exists but shouldn't")
|
|
}
|
|
|
|
func TestRemoveDeleteMarker(t *testing.T) {
|
|
tc := prepareHandlerContext(t)
|
|
|
|
bktName, objName := "bucket-for-removal", "object-to-delete"
|
|
bktInfo, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName)
|
|
|
|
checkFound(t, tc, bktName, objName, emptyVersion)
|
|
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
require.True(t, isDeleteMarker)
|
|
checkNotFound(t, tc, bktName, objName, emptyVersion)
|
|
|
|
checkFound(t, tc, bktName, objName, objInfo.VersionID())
|
|
deleteObject(t, tc, bktName, objName, deleteMarkerVersion)
|
|
checkFound(t, tc, bktName, objName, emptyVersion)
|
|
|
|
require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should")
|
|
}
|
|
|
|
func TestDeleteMarkerVersioned(t *testing.T) {
|
|
tc := prepareHandlerContext(t)
|
|
|
|
bktName, objName := "bucket-for-removal", "object-to-delete"
|
|
createVersionedBucketAndObject(t, tc, bktName, objName)
|
|
|
|
t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) {
|
|
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
require.True(t, isDeleteMarker)
|
|
versions := listVersions(t, tc, bktName)
|
|
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
|
|
|
|
_, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
require.True(t, isDeleteMarker)
|
|
versions = listVersions(t, tc, bktName)
|
|
require.Len(t, versions.DeleteMarker, 1)
|
|
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
|
|
})
|
|
|
|
t.Run("do not create delete marker if object does not exist", func(t *testing.T) {
|
|
versionsBefore := listVersions(t, tc, bktName)
|
|
_, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion)
|
|
require.False(t, isDeleteMarker)
|
|
versionsAfter := listVersions(t, tc, bktName)
|
|
require.Equal(t, versionsBefore, versionsAfter)
|
|
})
|
|
}
|
|
|
|
func TestDeleteMarkerSuspended(t *testing.T) {
|
|
tc := prepareHandlerContext(t)
|
|
|
|
bktName, objName := "bucket-for-removal", "object-to-delete"
|
|
bktInfo, _ := createVersionedBucketAndObject(t, tc, bktName, objName)
|
|
putBucketVersioning(t, tc, bktName, false)
|
|
|
|
t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) {
|
|
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
require.True(t, isDeleteMarker)
|
|
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
|
|
|
|
deleteMarkerVersion, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
require.True(t, isDeleteMarker)
|
|
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
|
|
|
|
versions := listVersions(t, tc, bktName)
|
|
require.Len(t, versions.DeleteMarker, 1)
|
|
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
|
|
})
|
|
|
|
t.Run("do not create delete marker if object does not exist", func(t *testing.T) {
|
|
versionsBefore := listVersions(t, tc, bktName)
|
|
_, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion)
|
|
require.False(t, isDeleteMarker)
|
|
versionsAfter := listVersions(t, tc, bktName)
|
|
require.Equal(t, versionsBefore, versionsAfter)
|
|
})
|
|
|
|
t.Run("remove last unversioned non delete marker", func(t *testing.T) {
|
|
objName := "obj3"
|
|
putObject(tc, bktName, objName)
|
|
|
|
nodeVersion, err := tc.tree.GetUnversioned(tc.Context(), bktInfo, objName)
|
|
require.NoError(t, err)
|
|
|
|
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
require.True(t, isDeleteMarker)
|
|
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
|
|
|
|
objVersions := getVersion(listVersions(t, tc, bktName), objName)
|
|
require.Len(t, objVersions, 0)
|
|
|
|
require.False(t, tc.MockedPool().ObjectExists(nodeVersion.OID))
|
|
})
|
|
}
|
|
|
|
func TestDeleteObjectCombined(t *testing.T) {
|
|
tc := prepareHandlerContext(t)
|
|
|
|
bktName, objName := "bucket-for-removal", "object-to-delete"
|
|
bktInfo, objInfo := createBucketAndObject(tc, bktName, objName)
|
|
|
|
putBucketVersioning(t, tc, bktName, true)
|
|
|
|
checkFound(t, tc, bktName, objName, emptyVersion)
|
|
deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
checkNotFound(t, tc, bktName, objName, emptyVersion)
|
|
|
|
checkFound(t, tc, bktName, objName, objInfo.VersionID())
|
|
|
|
require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should")
|
|
}
|
|
|
|
func TestDeleteObjectSuspended(t *testing.T) {
|
|
tc := prepareHandlerContext(t)
|
|
|
|
bktName, objName := "bucket-for-removal", "object-to-delete"
|
|
bktInfo, objInfo := createBucketAndObject(tc, bktName, objName)
|
|
|
|
putBucketVersioning(t, tc, bktName, true)
|
|
|
|
checkFound(t, tc, bktName, objName, emptyVersion)
|
|
deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
checkNotFound(t, tc, bktName, objName, emptyVersion)
|
|
|
|
putBucketVersioning(t, tc, bktName, false)
|
|
|
|
deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
checkNotFound(t, tc, bktName, objName, objInfo.VersionID())
|
|
|
|
require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object exists but shouldn't")
|
|
}
|
|
|
|
func TestDeleteMarkers(t *testing.T) {
|
|
tc := prepareHandlerContext(t)
|
|
|
|
bktName, objName := "bucket-for-removal", "object-to-delete"
|
|
createTestBucket(tc, bktName)
|
|
putBucketVersioning(t, tc, bktName, true)
|
|
|
|
checkNotFound(t, tc, bktName, objName, emptyVersion)
|
|
deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
|
|
versions := listVersions(t, tc, bktName)
|
|
require.Len(t, versions.DeleteMarker, 0, "invalid delete markers length")
|
|
require.Len(t, versions.Version, 0, "versions must be empty")
|
|
|
|
require.Len(t, listOIDsFromMockedFrostFS(t, tc, bktName), 0, "shouldn't be any object in frostfs")
|
|
}
|
|
|
|
func TestDeleteObjectFromListCache(t *testing.T) {
|
|
tc := prepareHandlerContext(t)
|
|
|
|
bktName, objName := "bucket-for-removal", "object-to-delete"
|
|
bktInfo, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName)
|
|
|
|
versions := listObjectsV1(tc, bktName, "", "", "", -1)
|
|
require.Len(t, versions.Contents, 1)
|
|
|
|
checkFound(t, tc, bktName, objName, objInfo.VersionID())
|
|
deleteObject(t, tc, bktName, objName, objInfo.VersionID())
|
|
checkNotFound(t, tc, bktName, objName, objInfo.VersionID())
|
|
|
|
// check cache is clean after object removal
|
|
versions = listObjectsV1(tc, bktName, "", "", "", -1)
|
|
require.Len(t, versions.Contents, 0)
|
|
|
|
require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo))
|
|
}
|
|
|
|
func TestDeleteObjectCheckMarkerReturn(t *testing.T) {
|
|
tc := prepareHandlerContext(t)
|
|
|
|
bktName, objName := "bucket-for-removal", "object-to-delete"
|
|
createVersionedBucketAndObject(t, tc, bktName, objName)
|
|
|
|
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
|
|
require.True(t, isDeleteMarker)
|
|
|
|
versions := listVersions(t, tc, bktName)
|
|
require.Len(t, versions.DeleteMarker, 1)
|
|
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
|
|
|
|
deleteMarkerVersion2, isDeleteMarker2 := deleteObject(t, tc, bktName, objName, deleteMarkerVersion)
|
|
require.True(t, isDeleteMarker2)
|
|
versions = listVersions(t, tc, bktName)
|
|
require.Len(t, versions.DeleteMarker, 0)
|
|
require.Equal(t, deleteMarkerVersion, deleteMarkerVersion2)
|
|
}
|
|
|
|
func createBucketAndObject(tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) {
|
|
bktInfo := createTestBucket(tc, bktName)
|
|
|
|
objInfo := createTestObject(tc, bktInfo, objName)
|
|
|
|
return bktInfo, objInfo
|
|
}
|
|
|
|
func createVersionedBucketAndObject(t *testing.T, tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) {
|
|
createTestBucket(tc, bktName)
|
|
bktInfo, err := tc.Layer().GetBucketInfo(tc.Context(), bktName)
|
|
require.NoError(t, err)
|
|
putBucketVersioning(t, tc, bktName, true)
|
|
|
|
objInfo := createTestObject(tc, bktInfo, objName)
|
|
|
|
return bktInfo, objInfo
|
|
}
|
|
|
|
func putBucketVersioning(t *testing.T, tc *handlerContext, bktName string, enabled bool) {
|
|
cfg := &VersioningConfiguration{Status: "Suspended"}
|
|
if enabled {
|
|
cfg.Status = "Enabled"
|
|
}
|
|
w, r := prepareTestRequest(tc, bktName, "", cfg)
|
|
tc.Handler().PutBucketVersioningHandler(w, r)
|
|
assertStatus(t, w, http.StatusOK)
|
|
}
|
|
|
|
func deleteObject(t *testing.T, tc *handlerContext, bktName, objName, version string) (string, bool) {
|
|
query := make(url.Values)
|
|
query.Add(api.QueryVersionID, version)
|
|
|
|
w, r := prepareTestFullRequest(tc, bktName, objName, query, nil)
|
|
tc.Handler().DeleteObjectHandler(w, r)
|
|
assertStatus(t, w, http.StatusNoContent)
|
|
|
|
return w.Header().Get(api.AmzVersionID), w.Header().Get(api.AmzDeleteMarker) != ""
|
|
}
|
|
|
|
func deleteObjects(t *testing.T, tc *handlerContext, bktName string, objVersions [][2]string) *DeleteObjectsResponse {
|
|
req := &DeleteObjectsRequest{}
|
|
for _, version := range objVersions {
|
|
req.Objects = append(req.Objects, ObjectIdentifier{
|
|
ObjectName: version[0],
|
|
VersionID: version[1],
|
|
})
|
|
}
|
|
|
|
w, r := prepareTestRequest(tc, bktName, "", req)
|
|
r.Header.Set(api.ContentMD5, "")
|
|
tc.Handler().DeleteMultipleObjectsHandler(w, r)
|
|
assertStatus(t, w, http.StatusOK)
|
|
|
|
res := &DeleteObjectsResponse{}
|
|
parseTestResponse(t, w, res)
|
|
return res
|
|
}
|
|
|
|
func deleteBucket(t *testing.T, tc *handlerContext, bktName string, code int) {
|
|
w, r := prepareTestRequest(tc, bktName, "", nil)
|
|
tc.Handler().DeleteBucketHandler(w, r)
|
|
assertStatus(t, w, code)
|
|
}
|
|
|
|
func checkNotFound(t *testing.T, hc *handlerContext, bktName, objName, version string) {
|
|
w := headObjectBase(hc, bktName, objName, version)
|
|
assertStatus(t, w, http.StatusNotFound)
|
|
}
|
|
|
|
func headObjectAssertS3Error(hc *handlerContext, bktName, objName, version string, code apiErrors.ErrorCode) {
|
|
w := headObjectBase(hc, bktName, objName, version)
|
|
assertS3Error(hc.t, w, apiErrors.GetAPIError(code))
|
|
}
|
|
|
|
func checkFound(t *testing.T, hc *handlerContext, bktName, objName, version string) {
|
|
w := headObjectBase(hc, bktName, objName, version)
|
|
assertStatus(t, w, http.StatusOK)
|
|
}
|
|
|
|
func headObjectBase(hc *handlerContext, bktName, objName, version string) *httptest.ResponseRecorder {
|
|
query := make(url.Values)
|
|
query.Add(api.QueryVersionID, version)
|
|
|
|
w, r := prepareTestFullRequest(hc, bktName, objName, query, nil)
|
|
hc.Handler().HeadObjectHandler(w, r)
|
|
return w
|
|
}
|
|
|
|
func listVersions(_ *testing.T, tc *handlerContext, bktName string) *ListObjectsVersionsResponse {
|
|
return listObjectsVersions(tc, bktName, "", "", "", "", -1)
|
|
}
|
|
|
|
func getVersion(resp *ListObjectsVersionsResponse, objName string) []*ObjectVersionResponse {
|
|
var res []*ObjectVersionResponse
|
|
for i, version := range resp.Version {
|
|
if version.Key == objName {
|
|
res = append(res, &resp.Version[i])
|
|
}
|
|
}
|
|
return res
|
|
}
|
|
|
|
func putObject(hc *handlerContext, bktName, objName string) {
|
|
body := bytes.NewReader([]byte("content"))
|
|
w, r := prepareTestPayloadRequest(hc, bktName, objName, body)
|
|
hc.Handler().PutObjectHandler(w, r)
|
|
assertStatus(hc.t, w, http.StatusOK)
|
|
}
|
|
|
|
func createSuspendedBucket(t *testing.T, tc *handlerContext, bktName string) *data.BucketInfo {
|
|
createTestBucket(tc, bktName)
|
|
bktInfo, err := tc.Layer().GetBucketInfo(tc.Context(), bktName)
|
|
require.NoError(t, err)
|
|
putBucketVersioning(t, tc, bktName, false)
|
|
return bktInfo
|
|
}
|