Fix/Meta returns old expired objects #262
4 changed files with 45 additions and 1 deletions
|
@ -8,6 +8,7 @@ Changelog for FrostFS Node
|
|||
### Fixed
|
||||
- Take network settings into account during netmap contract update (#100)
|
||||
- Read config files from dir even if config file not provided via `--config` for node (#238)
|
||||
- Expired by more than 1 epoch objects could be returned (#262)
|
||||
|
||||
### Removed
|
||||
### Updated
|
||||
|
|
|
@ -73,6 +73,13 @@ func checkExpiredObjects(t *testing.T, db *meta.DB, f func(exp, nonExp *objectSD
|
|||
require.NoError(t, metaPut(db, nonExpObj, nil))
|
||||
|
||||
f(expObj, nonExpObj)
|
||||
|
||||
oldExpObj := testutil.GenerateObject()
|
||||
setExpiration(oldExpObj, 1)
|
||||
|
||||
require.NoError(t, metaPut(db, oldExpObj, nil))
|
||||
|
||||
f(oldExpObj, nonExpObj)
|
||||
}
|
||||
|
||||
func setExpiration(o *objectSDK.Object, epoch uint64) {
|
||||
|
|
|
@ -66,13 +66,24 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) {
|
|||
}
|
||||
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
var obj *objectSDK.Object
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
key := make([]byte, addressKeySize)
|
||||
res.hdr, err = db.get(tx, prm.addr, key, true, prm.raw, currEpoch)
|
||||
obj, err = db.get(tx, prm.addr, key, true, prm.raw, currEpoch)
|
||||
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = validate(obj, currEpoch)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
res.hdr = obj
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
@ -4,7 +4,9 @@ import (
|
|||
"bytes"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -259,3 +261,26 @@ func isLockObject(tx *bbolt.Tx, idCnr cid.ID, obj oid.ID) bool {
|
|||
bucketNameLockers(idCnr, make([]byte, bucketKeySize)),
|
||||
objectKey(obj, make([]byte, objectKeySize)))
|
||||
}
|
||||
|
||||
// performs object validity checks.
|
||||
func validate(obj *object.Object, currEpoch uint64) error {
|
||||
|
||||
for _, a := range obj.Attributes() {
|
||||
if key := a.Key(); key != objectV2.SysAttributeExpEpoch && key != objectV2.SysAttributeExpEpochNeoFS {
|
||||
continue
|
||||
}
|
||||
|
||||
expEpoch, err := strconv.ParseUint(a.Value(), 10, 64)
|
||||
if err != nil {
|
||||
// unexpected for already stored and valudated objects
|
||||
return fmt.Errorf("expiration epoch parsing: %w", err)
|
||||
}
|
||||
|
||||
if expEpoch < currEpoch {
|
||||
return ErrObjectIsExpired
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue
Why have you decided to validate here instead of in final
Get
step, right before returning a result?can you, please, add details a little? validate what?
it is just best we can do if we have already unmarshaled an obj,
get
has multiplereturn
s so i just moved unmarshaling to that func. do you mean do validation viadefer
?I mean checking this not inside the transaction, but outside.
.Attributes()
allocates, iteration is not as fast as it could be and write tx can only execute in 1 thread.get
is used inDelete
, for example, do we have a test for deleting an expired object?oh, i see, did not think about that and forget at all that it executes inside a TX, fixed
yes, we have a couple