forked from TrueCloudLab/frostfs-s3-gw
Compare commits
2 commits
a5c89b78bc
...
70ec5a0a5b
Author | SHA1 | Date | |
---|---|---|---|
70ec5a0a5b | |||
4a8c382491 |
8 changed files with 161 additions and 16 deletions
|
@ -8,6 +8,7 @@ This document outlines major changes between releases.
|
|||
- Clean up List and Name caches when object is missing in Tree service (#57)
|
||||
- Get empty bucket CORS from frostfs (TrueCloudLab#36)
|
||||
- Don't count pool error on client abort (#35)
|
||||
- Don't create unnecessary delete-markers (#83)
|
||||
|
||||
### Added
|
||||
- Return `X-Owner-Id` in `head-bucket` response (#79)
|
||||
|
@ -29,6 +30,7 @@ This document outlines major changes between releases.
|
|||
- Limit number of objects to delete at one time (TrueCloudLab#37)
|
||||
- CompleteMultipartUpload handler now sends whitespace characters to keep alive client's connection (#60)
|
||||
- Support new system attributes (#64)
|
||||
- Changed values for `frostfs_s3_gw_state_health` metric (#91)
|
||||
|
||||
## [0.26.0] - 2022-12-28
|
||||
|
||||
|
|
|
@ -147,6 +147,81 @@ func TestRemoveDeleteMarker(t *testing.T) {
|
|||
require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should")
|
||||
}
|
||||
|
||||
func TestDeleteMarkerVersioned(t *testing.T) {
|
||||
tc := prepareHandlerContext(t)
|
||||
|
||||
bktName, objName := "bucket-for-removal", "object-to-delete"
|
||||
createVersionedBucketAndObject(t, tc, bktName, objName)
|
||||
|
||||
t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) {
|
||||
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
|
||||
require.True(t, isDeleteMarker)
|
||||
versions := listVersions(t, tc, bktName)
|
||||
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
|
||||
|
||||
_, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
|
||||
require.True(t, isDeleteMarker)
|
||||
versions = listVersions(t, tc, bktName)
|
||||
require.Len(t, versions.DeleteMarker, 1)
|
||||
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
|
||||
})
|
||||
|
||||
t.Run("do not create delete marker if object does not exist", func(t *testing.T) {
|
||||
versionsBefore := listVersions(t, tc, bktName)
|
||||
_, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion)
|
||||
require.False(t, isDeleteMarker)
|
||||
versionsAfter := listVersions(t, tc, bktName)
|
||||
require.Equal(t, versionsBefore, versionsAfter)
|
||||
})
|
||||
}
|
||||
|
||||
func TestDeleteMarkerSuspended(t *testing.T) {
|
||||
tc := prepareHandlerContext(t)
|
||||
|
||||
bktName, objName := "bucket-for-removal", "object-to-delete"
|
||||
bktInfo, _ := createVersionedBucketAndObject(t, tc, bktName, objName)
|
||||
putBucketVersioning(t, tc, bktName, false)
|
||||
|
||||
t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) {
|
||||
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
|
||||
require.True(t, isDeleteMarker)
|
||||
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
|
||||
|
||||
deleteMarkerVersion, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
|
||||
require.True(t, isDeleteMarker)
|
||||
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
|
||||
|
||||
versions := listVersions(t, tc, bktName)
|
||||
require.Len(t, versions.DeleteMarker, 1)
|
||||
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
|
||||
})
|
||||
|
||||
t.Run("do not create delete marker if object does not exist", func(t *testing.T) {
|
||||
versionsBefore := listVersions(t, tc, bktName)
|
||||
_, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion)
|
||||
require.False(t, isDeleteMarker)
|
||||
versionsAfter := listVersions(t, tc, bktName)
|
||||
require.Equal(t, versionsBefore, versionsAfter)
|
||||
})
|
||||
|
||||
t.Run("remove last unversioned non delete marker", func(t *testing.T) {
|
||||
objName := "obj3"
|
||||
putObject(t, tc, bktName, objName)
|
||||
|
||||
nodeVersion, err := tc.tree.GetUnversioned(tc.Context(), bktInfo, objName)
|
||||
require.NoError(t, err)
|
||||
|
||||
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
|
||||
require.True(t, isDeleteMarker)
|
||||
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
|
||||
|
||||
objVersions := getVersion(listVersions(t, tc, bktName), objName)
|
||||
require.Len(t, objVersions, 0)
|
||||
|
||||
require.False(t, tc.MockedPool().ObjectExists(nodeVersion.OID))
|
||||
})
|
||||
}
|
||||
|
||||
func TestDeleteObjectCombined(t *testing.T) {
|
||||
tc := prepareHandlerContext(t)
|
||||
|
||||
|
@ -197,7 +272,7 @@ func TestDeleteMarkers(t *testing.T) {
|
|||
deleteObject(t, tc, bktName, objName, emptyVersion)
|
||||
|
||||
versions := listVersions(t, tc, bktName)
|
||||
require.Len(t, versions.DeleteMarker, 3, "invalid delete markers length")
|
||||
require.Len(t, versions.DeleteMarker, 0, "invalid delete markers length")
|
||||
require.Len(t, versions.Version, 0, "versions must be empty")
|
||||
|
||||
require.Len(t, listOIDsFromMockedFrostFS(t, tc, bktName), 0, "shouldn't be any object in frostfs")
|
||||
|
@ -316,6 +391,16 @@ func listVersions(t *testing.T, tc *handlerContext, bktName string) *ListObjects
|
|||
return res
|
||||
}
|
||||
|
||||
func getVersion(resp *ListObjectsVersionsResponse, objName string) []*ObjectVersionResponse {
|
||||
var res []*ObjectVersionResponse
|
||||
for i, version := range resp.Version {
|
||||
if version.Key == objName {
|
||||
res = append(res, &resp.Version[i])
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func putObject(t *testing.T, tc *handlerContext, bktName, objName string) {
|
||||
body := bytes.NewReader([]byte("content"))
|
||||
w, r := prepareTestPayloadRequest(tc, bktName, objName, body)
|
||||
|
|
|
@ -33,6 +33,7 @@ type handlerContext struct {
|
|||
t *testing.T
|
||||
h *handler
|
||||
tp *layer.TestFrostFS
|
||||
tree *tree.Tree
|
||||
context context.Context
|
||||
}
|
||||
|
||||
|
@ -85,11 +86,13 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
|
|||
var owner user.ID
|
||||
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
|
||||
|
||||
treeMock := NewTreeServiceMock(t)
|
||||
|
||||
layerCfg := &layer.Config{
|
||||
Caches: layer.DefaultCachesConfigs(zap.NewExample()),
|
||||
AnonKey: layer.AnonymousKey{Key: key},
|
||||
Resolver: testResolver,
|
||||
TreeService: NewTreeServiceMock(t),
|
||||
TreeService: treeMock,
|
||||
}
|
||||
|
||||
var pp netmap.PlacementPolicy
|
||||
|
@ -110,6 +113,7 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
|
|||
t: t,
|
||||
h: h,
|
||||
tp: tp,
|
||||
tree: treeMock,
|
||||
context: context.WithValue(context.Background(), api.BoxData, newTestAccessBox(t, key)),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -561,21 +561,38 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
|||
return obj
|
||||
}
|
||||
|
||||
var newVersion *data.NodeVersion
|
||||
lastVersion, err := n.getLastNodeVersion(ctx, bkt, obj)
|
||||
if err != nil {
|
||||
obj.Error = err
|
||||
return n.handleNotFoundError(bkt, obj)
|
||||
}
|
||||
|
||||
if settings.VersioningSuspended() {
|
||||
obj.VersionID = data.UnversionedObjectVersionID
|
||||
|
||||
var nodeVersion *data.NodeVersion
|
||||
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
|
||||
return n.handleNotFoundError(bkt, obj)
|
||||
var nullVersionToDelete *data.NodeVersion
|
||||
if lastVersion.IsUnversioned {
|
||||
if !lastVersion.IsDeleteMarker() {
|
||||
nullVersionToDelete = lastVersion
|
||||
}
|
||||
} else if nullVersionToDelete, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
|
||||
if !isNotFoundError(obj.Error) {
|
||||
return obj
|
||||
}
|
||||
}
|
||||
|
||||
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
|
||||
return obj
|
||||
if nullVersionToDelete != nil {
|
||||
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nullVersionToDelete, obj); obj.Error != nil {
|
||||
return obj
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if lastVersion.IsDeleteMarker() {
|
||||
obj.DeleteMarkVersion = lastVersion.OID.EncodeToString()
|
||||
return obj
|
||||
}
|
||||
|
||||
randOID, err := getRandomOID()
|
||||
if err != nil {
|
||||
obj.Error = fmt.Errorf("couldn't get random oid: %w", err)
|
||||
|
@ -584,7 +601,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
|||
|
||||
obj.DeleteMarkVersion = randOID.EncodeToString()
|
||||
|
||||
newVersion = &data.NodeVersion{
|
||||
newVersion := &data.NodeVersion{
|
||||
BaseNodeVersion: data.BaseNodeVersion{
|
||||
OID: randOID,
|
||||
FilePath: obj.Name,
|
||||
|
@ -606,8 +623,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
|||
}
|
||||
|
||||
func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
|
||||
if errors.IsS3Error(obj.Error, errors.ErrNoSuchKey) ||
|
||||
errors.IsS3Error(obj.Error, errors.ErrNoSuchVersion) {
|
||||
if isNotFoundError(obj.Error) {
|
||||
obj.Error = nil
|
||||
n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID)
|
||||
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
|
||||
|
@ -616,6 +632,11 @@ func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject)
|
|||
return obj
|
||||
}
|
||||
|
||||
func isNotFoundError(err error) bool {
|
||||
return errors.IsS3Error(err, errors.ErrNoSuchKey) ||
|
||||
errors.IsS3Error(err, errors.ErrNoSuchVersion)
|
||||
}
|
||||
|
||||
func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
|
||||
objVersion := &ObjectVersion{
|
||||
BktInfo: bkt,
|
||||
|
@ -627,6 +648,17 @@ func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo
|
|||
return n.getNodeVersion(ctx, objVersion)
|
||||
}
|
||||
|
||||
func (n *layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
|
||||
objVersion := &ObjectVersion{
|
||||
BktInfo: bkt,
|
||||
ObjectName: obj.Name,
|
||||
VersionID: "",
|
||||
NoErrorOnDeleteMarker: true,
|
||||
}
|
||||
|
||||
return n.getNodeVersion(ctx, objVersion)
|
||||
}
|
||||
|
||||
func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) {
|
||||
if nodeVersion.IsDeleteMarker() {
|
||||
return obj.VersionID, nil
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
||||
|
@ -55,6 +56,16 @@ func (t *TestFrostFS) Objects() []*object.Object {
|
|||
return res
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) ObjectExists(objID oid.ID) bool {
|
||||
for _, obj := range t.objects {
|
||||
if id, _ := obj.ID(); id.Equals(objID) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) AddObject(key string, obj *object.Object) {
|
||||
t.objects[key] = obj
|
||||
}
|
||||
|
@ -161,7 +172,7 @@ func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*Objec
|
|||
}, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("object not found %s", addr)
|
||||
return nil, fmt.Errorf("%w: %s", apistatus.ObjectNotFound{}, addr)
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) CreateObject(ctx context.Context, prm PrmObjectCreate) (oid.ID, error) {
|
||||
|
|
|
@ -182,6 +182,7 @@ func (a *App) initAPI(ctx context.Context) {
|
|||
|
||||
func (a *App) initMetrics() {
|
||||
a.metrics = metrics.NewAppMetrics(a.log, frostfs.NewPoolStatistic(a.pool), a.cfg.GetBool(cfgPrometheusEnabled))
|
||||
a.metrics.SetHealth(metrics.HealthStatusStarting)
|
||||
}
|
||||
|
||||
func (a *App) initResolver() {
|
||||
|
@ -369,7 +370,7 @@ func (a *App) Wait() {
|
|||
}
|
||||
|
||||
func (a *App) setHealthStatus() {
|
||||
a.metrics.SetHealth(1)
|
||||
a.metrics.SetHealth(metrics.HealthStatusReady)
|
||||
}
|
||||
|
||||
// Serve runs HTTP server to handle S3 API requests.
|
||||
|
|
|
@ -37,7 +37,7 @@ func (m *AppMetrics) SetEnabled(enabled bool) {
|
|||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func (m *AppMetrics) SetHealth(status int32) {
|
||||
func (m *AppMetrics) SetHealth(status HealthStatus) {
|
||||
if !m.isEnabled() {
|
||||
return
|
||||
}
|
||||
|
@ -48,7 +48,7 @@ func (m *AppMetrics) SetHealth(status int32) {
|
|||
func (m *AppMetrics) Shutdown() {
|
||||
m.mu.Lock()
|
||||
if m.enabled {
|
||||
m.gate.State.SetHealth(0)
|
||||
m.gate.State.SetHealth(HealthStatusShuttingDown)
|
||||
m.enabled = false
|
||||
}
|
||||
m.gate.Unregister()
|
||||
|
|
|
@ -4,6 +4,16 @@ import "github.com/prometheus/client_golang/prometheus"
|
|||
|
||||
const stateSubsystem = "state"
|
||||
|
||||
// HealthStatus of the gate application.
|
||||
type HealthStatus int32
|
||||
|
||||
const (
|
||||
HealthStatusUndefined HealthStatus = 0
|
||||
HealthStatusStarting HealthStatus = 1
|
||||
HealthStatusReady HealthStatus = 2
|
||||
HealthStatusShuttingDown HealthStatus = 3
|
||||
)
|
||||
|
||||
type stateMetrics struct {
|
||||
healthCheck prometheus.Gauge
|
||||
}
|
||||
|
@ -27,6 +37,6 @@ func (m stateMetrics) unregister() {
|
|||
prometheus.Unregister(m.healthCheck)
|
||||
}
|
||||
|
||||
func (m stateMetrics) SetHealth(s int32) {
|
||||
func (m stateMetrics) SetHealth(s HealthStatus) {
|
||||
m.healthCheck.Set(float64(s))
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue