Compare commits

...

2 commits

Author SHA1 Message Date
70ec5a0a5b [#83] Don't create extra delete marker
We shouldn't create delete marker if:
1. object doesn't exist at all
2. last version is already a delete marker

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-04-19 17:56:11 +03:00
4a8c382491 [#91] Update values for health metric
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-04-18 10:25:11 +03:00
8 changed files with 161 additions and 16 deletions

View file

@ -8,6 +8,7 @@ This document outlines major changes between releases.
- Clean up List and Name caches when object is missing in Tree service (#57) - Clean up List and Name caches when object is missing in Tree service (#57)
- Get empty bucket CORS from frostfs (TrueCloudLab#36) - Get empty bucket CORS from frostfs (TrueCloudLab#36)
- Don't count pool error on client abort (#35) - Don't count pool error on client abort (#35)
- Don't create unnecessary delete-markers (#83)
### Added ### Added
- Return `X-Owner-Id` in `head-bucket` response (#79) - Return `X-Owner-Id` in `head-bucket` response (#79)
@ -29,6 +30,7 @@ This document outlines major changes between releases.
- Limit number of objects to delete at one time (TrueCloudLab#37) - Limit number of objects to delete at one time (TrueCloudLab#37)
- CompleteMultipartUpload handler now sends whitespace characters to keep alive client's connection (#60) - CompleteMultipartUpload handler now sends whitespace characters to keep alive client's connection (#60)
- Support new system attributes (#64) - Support new system attributes (#64)
- Changed values for `frostfs_s3_gw_state_health` metric (#91)
## [0.26.0] - 2022-12-28 ## [0.26.0] - 2022-12-28

View file

@ -147,6 +147,81 @@ func TestRemoveDeleteMarker(t *testing.T) {
require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should") require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should")
} }
func TestDeleteMarkerVersioned(t *testing.T) {
tc := prepareHandlerContext(t)
bktName, objName := "bucket-for-removal", "object-to-delete"
createVersionedBucketAndObject(t, tc, bktName, objName)
t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) {
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
versions := listVersions(t, tc, bktName)
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
_, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
versions = listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 1)
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
})
t.Run("do not create delete marker if object does not exist", func(t *testing.T) {
versionsBefore := listVersions(t, tc, bktName)
_, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion)
require.False(t, isDeleteMarker)
versionsAfter := listVersions(t, tc, bktName)
require.Equal(t, versionsBefore, versionsAfter)
})
}
func TestDeleteMarkerSuspended(t *testing.T) {
tc := prepareHandlerContext(t)
bktName, objName := "bucket-for-removal", "object-to-delete"
bktInfo, _ := createVersionedBucketAndObject(t, tc, bktName, objName)
putBucketVersioning(t, tc, bktName, false)
t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) {
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
deleteMarkerVersion, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
versions := listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 1)
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
})
t.Run("do not create delete marker if object does not exist", func(t *testing.T) {
versionsBefore := listVersions(t, tc, bktName)
_, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion)
require.False(t, isDeleteMarker)
versionsAfter := listVersions(t, tc, bktName)
require.Equal(t, versionsBefore, versionsAfter)
})
t.Run("remove last unversioned non delete marker", func(t *testing.T) {
objName := "obj3"
putObject(t, tc, bktName, objName)
nodeVersion, err := tc.tree.GetUnversioned(tc.Context(), bktInfo, objName)
require.NoError(t, err)
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
objVersions := getVersion(listVersions(t, tc, bktName), objName)
require.Len(t, objVersions, 0)
require.False(t, tc.MockedPool().ObjectExists(nodeVersion.OID))
})
}
func TestDeleteObjectCombined(t *testing.T) { func TestDeleteObjectCombined(t *testing.T) {
tc := prepareHandlerContext(t) tc := prepareHandlerContext(t)
@ -197,7 +272,7 @@ func TestDeleteMarkers(t *testing.T) {
deleteObject(t, tc, bktName, objName, emptyVersion) deleteObject(t, tc, bktName, objName, emptyVersion)
versions := listVersions(t, tc, bktName) versions := listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 3, "invalid delete markers length") require.Len(t, versions.DeleteMarker, 0, "invalid delete markers length")
require.Len(t, versions.Version, 0, "versions must be empty") require.Len(t, versions.Version, 0, "versions must be empty")
require.Len(t, listOIDsFromMockedFrostFS(t, tc, bktName), 0, "shouldn't be any object in frostfs") require.Len(t, listOIDsFromMockedFrostFS(t, tc, bktName), 0, "shouldn't be any object in frostfs")
@ -316,6 +391,16 @@ func listVersions(t *testing.T, tc *handlerContext, bktName string) *ListObjects
return res return res
} }
func getVersion(resp *ListObjectsVersionsResponse, objName string) []*ObjectVersionResponse {
var res []*ObjectVersionResponse
for i, version := range resp.Version {
if version.Key == objName {
res = append(res, &resp.Version[i])
}
}
return res
}
func putObject(t *testing.T, tc *handlerContext, bktName, objName string) { func putObject(t *testing.T, tc *handlerContext, bktName, objName string) {
body := bytes.NewReader([]byte("content")) body := bytes.NewReader([]byte("content"))
w, r := prepareTestPayloadRequest(tc, bktName, objName, body) w, r := prepareTestPayloadRequest(tc, bktName, objName, body)

View file

@ -33,6 +33,7 @@ type handlerContext struct {
t *testing.T t *testing.T
h *handler h *handler
tp *layer.TestFrostFS tp *layer.TestFrostFS
tree *tree.Tree
context context.Context context context.Context
} }
@ -85,11 +86,13 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
var owner user.ID var owner user.ID
user.IDFromKey(&owner, key.PrivateKey.PublicKey) user.IDFromKey(&owner, key.PrivateKey.PublicKey)
treeMock := NewTreeServiceMock(t)
layerCfg := &layer.Config{ layerCfg := &layer.Config{
Caches: layer.DefaultCachesConfigs(zap.NewExample()), Caches: layer.DefaultCachesConfigs(zap.NewExample()),
AnonKey: layer.AnonymousKey{Key: key}, AnonKey: layer.AnonymousKey{Key: key},
Resolver: testResolver, Resolver: testResolver,
TreeService: NewTreeServiceMock(t), TreeService: treeMock,
} }
var pp netmap.PlacementPolicy var pp netmap.PlacementPolicy
@ -110,6 +113,7 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
t: t, t: t,
h: h, h: h,
tp: tp, tp: tp,
tree: treeMock,
context: context.WithValue(context.Background(), api.BoxData, newTestAccessBox(t, key)), context: context.WithValue(context.Background(), api.BoxData, newTestAccessBox(t, key)),
} }
} }

View file

@ -561,21 +561,38 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
return obj return obj
} }
var newVersion *data.NodeVersion lastVersion, err := n.getLastNodeVersion(ctx, bkt, obj)
if err != nil {
obj.Error = err
return n.handleNotFoundError(bkt, obj)
}
if settings.VersioningSuspended() { if settings.VersioningSuspended() {
obj.VersionID = data.UnversionedObjectVersionID obj.VersionID = data.UnversionedObjectVersionID
var nodeVersion *data.NodeVersion var nullVersionToDelete *data.NodeVersion
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil { if lastVersion.IsUnversioned {
return n.handleNotFoundError(bkt, obj) if !lastVersion.IsDeleteMarker() {
nullVersionToDelete = lastVersion
}
} else if nullVersionToDelete, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
if !isNotFoundError(obj.Error) {
return obj
}
} }
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil { if nullVersionToDelete != nil {
return obj if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nullVersionToDelete, obj); obj.Error != nil {
return obj
}
} }
} }
if lastVersion.IsDeleteMarker() {
obj.DeleteMarkVersion = lastVersion.OID.EncodeToString()
return obj
}
randOID, err := getRandomOID() randOID, err := getRandomOID()
if err != nil { if err != nil {
obj.Error = fmt.Errorf("couldn't get random oid: %w", err) obj.Error = fmt.Errorf("couldn't get random oid: %w", err)
@ -584,7 +601,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
obj.DeleteMarkVersion = randOID.EncodeToString() obj.DeleteMarkVersion = randOID.EncodeToString()
newVersion = &data.NodeVersion{ newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{ BaseNodeVersion: data.BaseNodeVersion{
OID: randOID, OID: randOID,
FilePath: obj.Name, FilePath: obj.Name,
@ -606,8 +623,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
} }
func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject { func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
if errors.IsS3Error(obj.Error, errors.ErrNoSuchKey) || if isNotFoundError(obj.Error) {
errors.IsS3Error(obj.Error, errors.ErrNoSuchVersion) {
obj.Error = nil obj.Error = nil
n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID) n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID)
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name) n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
@ -616,6 +632,11 @@ func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject)
return obj return obj
} }
func isNotFoundError(err error) bool {
return errors.IsS3Error(err, errors.ErrNoSuchKey) ||
errors.IsS3Error(err, errors.ErrNoSuchVersion)
}
func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) { func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
objVersion := &ObjectVersion{ objVersion := &ObjectVersion{
BktInfo: bkt, BktInfo: bkt,
@ -627,6 +648,17 @@ func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo
return n.getNodeVersion(ctx, objVersion) return n.getNodeVersion(ctx, objVersion)
} }
func (n *layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
objVersion := &ObjectVersion{
BktInfo: bkt,
ObjectName: obj.Name,
VersionID: "",
NoErrorOnDeleteMarker: true,
}
return n.getNodeVersion(ctx, objVersion)
}
func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) { func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) {
if nodeVersion.IsDeleteMarker() { if nodeVersion.IsDeleteMarker() {
return obj.VersionID, nil return obj.VersionID, nil

View file

@ -15,6 +15,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
@ -55,6 +56,16 @@ func (t *TestFrostFS) Objects() []*object.Object {
return res return res
} }
func (t *TestFrostFS) ObjectExists(objID oid.ID) bool {
for _, obj := range t.objects {
if id, _ := obj.ID(); id.Equals(objID) {
return true
}
}
return false
}
func (t *TestFrostFS) AddObject(key string, obj *object.Object) { func (t *TestFrostFS) AddObject(key string, obj *object.Object) {
t.objects[key] = obj t.objects[key] = obj
} }
@ -161,7 +172,7 @@ func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*Objec
}, nil }, nil
} }
return nil, fmt.Errorf("object not found %s", addr) return nil, fmt.Errorf("%w: %s", apistatus.ObjectNotFound{}, addr)
} }
func (t *TestFrostFS) CreateObject(ctx context.Context, prm PrmObjectCreate) (oid.ID, error) { func (t *TestFrostFS) CreateObject(ctx context.Context, prm PrmObjectCreate) (oid.ID, error) {

View file

@ -182,6 +182,7 @@ func (a *App) initAPI(ctx context.Context) {
func (a *App) initMetrics() { func (a *App) initMetrics() {
a.metrics = metrics.NewAppMetrics(a.log, frostfs.NewPoolStatistic(a.pool), a.cfg.GetBool(cfgPrometheusEnabled)) a.metrics = metrics.NewAppMetrics(a.log, frostfs.NewPoolStatistic(a.pool), a.cfg.GetBool(cfgPrometheusEnabled))
a.metrics.SetHealth(metrics.HealthStatusStarting)
} }
func (a *App) initResolver() { func (a *App) initResolver() {
@ -369,7 +370,7 @@ func (a *App) Wait() {
} }
func (a *App) setHealthStatus() { func (a *App) setHealthStatus() {
a.metrics.SetHealth(1) a.metrics.SetHealth(metrics.HealthStatusReady)
} }
// Serve runs HTTP server to handle S3 API requests. // Serve runs HTTP server to handle S3 API requests.

View file

@ -37,7 +37,7 @@ func (m *AppMetrics) SetEnabled(enabled bool) {
m.mu.Unlock() m.mu.Unlock()
} }
func (m *AppMetrics) SetHealth(status int32) { func (m *AppMetrics) SetHealth(status HealthStatus) {
if !m.isEnabled() { if !m.isEnabled() {
return return
} }
@ -48,7 +48,7 @@ func (m *AppMetrics) SetHealth(status int32) {
func (m *AppMetrics) Shutdown() { func (m *AppMetrics) Shutdown() {
m.mu.Lock() m.mu.Lock()
if m.enabled { if m.enabled {
m.gate.State.SetHealth(0) m.gate.State.SetHealth(HealthStatusShuttingDown)
m.enabled = false m.enabled = false
} }
m.gate.Unregister() m.gate.Unregister()

View file

@ -4,6 +4,16 @@ import "github.com/prometheus/client_golang/prometheus"
const stateSubsystem = "state" const stateSubsystem = "state"
// HealthStatus of the gate application.
type HealthStatus int32
const (
HealthStatusUndefined HealthStatus = 0
HealthStatusStarting HealthStatus = 1
HealthStatusReady HealthStatus = 2
HealthStatusShuttingDown HealthStatus = 3
)
type stateMetrics struct { type stateMetrics struct {
healthCheck prometheus.Gauge healthCheck prometheus.Gauge
} }
@ -27,6 +37,6 @@ func (m stateMetrics) unregister() {
prometheus.Unregister(m.healthCheck) prometheus.Unregister(m.healthCheck)
} }
func (m stateMetrics) SetHealth(s int32) { func (m stateMetrics) SetHealth(s HealthStatus) {
m.healthCheck.Set(float64(s)) m.healthCheck.Set(float64(s))
} }