[#262] meta: Do not return old expired objects
ci/woodpecker/push/pre-commit Pipeline was successful Details

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
pull/269/head
Pavel Karpy 2023-04-17 20:24:12 +03:00 committed by Evgenii Stratonikov
parent 13c8afcb02
commit 3d23b08773
4 changed files with 45 additions and 1 deletions

View File

@ -8,6 +8,7 @@ Changelog for FrostFS Node
### Fixed ### Fixed
- Take network settings into account during netmap contract update (#100) - Take network settings into account during netmap contract update (#100)
- Read config files from dir even if config file not provided via `--config` for node (#238) - Read config files from dir even if config file not provided via `--config` for node (#238)
- Expired by more than 1 epoch objects could be returned (#262)
### Removed ### Removed
### Updated ### Updated

View File

@ -73,6 +73,13 @@ func checkExpiredObjects(t *testing.T, db *meta.DB, f func(exp, nonExp *objectSD
require.NoError(t, metaPut(db, nonExpObj, nil)) require.NoError(t, metaPut(db, nonExpObj, nil))
f(expObj, nonExpObj) f(expObj, nonExpObj)
oldExpObj := testutil.GenerateObject()
setExpiration(oldExpObj, 1)
require.NoError(t, metaPut(db, oldExpObj, nil))
f(oldExpObj, nonExpObj)
} }
func setExpiration(o *objectSDK.Object, epoch uint64) { func setExpiration(o *objectSDK.Object, epoch uint64) {

View File

@ -66,13 +66,24 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) {
} }
currEpoch := db.epochState.CurrentEpoch() currEpoch := db.epochState.CurrentEpoch()
var obj *objectSDK.Object
err = db.boltDB.View(func(tx *bbolt.Tx) error { err = db.boltDB.View(func(tx *bbolt.Tx) error {
key := make([]byte, addressKeySize) key := make([]byte, addressKeySize)
res.hdr, err = db.get(tx, prm.addr, key, true, prm.raw, currEpoch) obj, err = db.get(tx, prm.addr, key, true, prm.raw, currEpoch)
return err return err
}) })
if err != nil {
return
}
err = validate(obj, currEpoch)
if err != nil {
return
}
res.hdr = obj
return return
} }

View File

@ -4,7 +4,9 @@ import (
"bytes" "bytes"
"crypto/sha256" "crypto/sha256"
"fmt" "fmt"
"strconv"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -259,3 +261,26 @@ func isLockObject(tx *bbolt.Tx, idCnr cid.ID, obj oid.ID) bool {
bucketNameLockers(idCnr, make([]byte, bucketKeySize)), bucketNameLockers(idCnr, make([]byte, bucketKeySize)),
objectKey(obj, make([]byte, objectKeySize))) objectKey(obj, make([]byte, objectKeySize)))
} }
// performs object validity checks.
func validate(obj *object.Object, currEpoch uint64) error {
for _, a := range obj.Attributes() {
if key := a.Key(); key != objectV2.SysAttributeExpEpoch && key != objectV2.SysAttributeExpEpochNeoFS {
continue
}
expEpoch, err := strconv.ParseUint(a.Value(), 10, 64)
if err != nil {
// unexpected for already stored and valudated objects
return fmt.Errorf("expiration epoch parsing: %w", err)
}
if expEpoch < currEpoch {
return ErrObjectIsExpired
}
break
}
return nil
}