Remove DropGraves() #1493
5 changed files with 129 additions and 124 deletions
|
@ -130,17 +130,9 @@ func TestDeleteECObject_WithoutSplit(t *testing.T) {
|
||||||
require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
|
require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
|
||||||
require.Equal(t, 2, len(tombstonedObjects))
|
require.Equal(t, 2, len(tombstonedObjects))
|
||||||
|
|
||||||
var tombstones []oid.Address
|
_, err = db.InhumeTombstones(context.Background(), tombstonedObjects)
|
||||||
for _, tss := range tombstonedObjects {
|
|
||||||
tombstones = append(tombstones, tss.tomb)
|
|
||||||
}
|
|
||||||
inhumePrm.SetAddresses(tombstones...)
|
|
||||||
inhumePrm.SetGCMark()
|
|
||||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects))
|
|
||||||
|
|
||||||
// GC finds tombstone as garbage and deletes it
|
// GC finds tombstone as garbage and deletes it
|
||||||
|
|
||||||
garbageAddresses = nil
|
garbageAddresses = nil
|
||||||
|
@ -374,17 +366,9 @@ func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool
|
||||||
require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
|
require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
|
||||||
require.True(t, len(tombstonedObjects) == parentCount+chunksCount)
|
require.True(t, len(tombstonedObjects) == parentCount+chunksCount)
|
||||||
|
|
||||||
var tombstones []oid.Address
|
_, err = db.InhumeTombstones(context.Background(), tombstonedObjects)
|
||||||
for _, tss := range tombstonedObjects {
|
|
||||||
tombstones = append(tombstones, tss.tomb)
|
|
||||||
}
|
|
||||||
inhumePrm.SetAddresses(tombstones...)
|
|
||||||
inhumePrm.SetGCMark()
|
|
||||||
_, err = db.Inhume(context.Background(), inhumePrm)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects))
|
|
||||||
|
|
||||||
// GC finds tombstone as garbage and deletes it
|
// GC finds tombstone as garbage and deletes it
|
||||||
|
|
||||||
garbageAddresses = nil
|
garbageAddresses = nil
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
@ -255,46 +256,58 @@ func graveFromKV(k, v []byte) (res TombstonedObject, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// DropGraves deletes tombstoned objects from the
|
// InhumeTombstones deletes tombstoned objects from the
|
||||||
// graveyard bucket.
|
// graveyard bucket.
|
||||||
//
|
//
|
||||||
// Returns any error appeared during deletion process.
|
// Returns any error appeared during deletion process.
|
||||||
func (db *DB) DropGraves(ctx context.Context, tss []TombstonedObject) error {
|
func (db *DB) InhumeTombstones(ctx context.Context, tss []TombstonedObject) (InhumeRes, error) {
|
||||||
var (
|
var (
|
||||||
startedAt = time.Now()
|
startedAt = time.Now()
|
||||||
success = false
|
success = false
|
||||||
)
|
)
|
||||||
defer func() {
|
defer func() {
|
||||||
db.metrics.AddMethodDuration("DropGraves", time.Since(startedAt), success)
|
db.metrics.AddMethodDuration("InhumeTombstones", time.Since(startedAt), success)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.DropGraves")
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.InhumeTombstones")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
db.modeMtx.RLock()
|
db.modeMtx.RLock()
|
||||||
defer db.modeMtx.RUnlock()
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
if db.mode.NoMetabase() {
|
if db.mode.NoMetabase() {
|
||||||
return ErrDegradedMode
|
return InhumeRes{}, ErrDegradedMode
|
||||||
} else if db.mode.ReadOnly() {
|
} else if db.mode.ReadOnly() {
|
||||||
return ErrReadOnlyMode
|
return InhumeRes{}, ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
|
|
||||||
buf := make([]byte, addressKeySize)
|
buf := make([]byte, addressKeySize)
|
||||||
|
prm := InhumePrm{forceRemoval: true}
|
||||||
|
currEpoch := db.epochState.CurrentEpoch()
|
||||||
|
|
||||||
return db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
var res InhumeRes
|
||||||
bkt := tx.Bucket(graveyardBucketName)
|
|
||||||
if bkt == nil {
|
err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||||
return nil
|
res = InhumeRes{inhumedByCnrID: make(map[cid.ID]ObjectCounters)}
|
||||||
acid-ant marked this conversation as resolved
Outdated
|
|||||||
|
|
||||||
|
garbageBKT := tx.Bucket(garbageBucketName)
|
||||||
|
graveyardBKT := tx.Bucket(graveyardBucketName)
|
||||||
|
|
||||||
|
bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ts := range tss {
|
for i := range tss {
|
||||||
err := bkt.Delete(addressKey(ts.Address(), buf))
|
if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, tss[i].Tombstone(), buf, currEpoch, prm, &res); err != nil {
|
||||||
if err != nil {
|
return err
|
||||||
|
}
|
||||||
|
if err := graveyardBKT.Delete(addressKey(tss[i].Address(), buf)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,9 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -393,7 +395,7 @@ func TestDB_IterateOverGarbage_Offset(t *testing.T) {
|
||||||
require.False(t, iWasCalled)
|
require.False(t, iWasCalled)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDB_DropGraves(t *testing.T) {
|
func TestDB_InhumeTombstones(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
defer func() { require.NoError(t, db.Close(context.Background())) }()
|
defer func() { require.NoError(t, db.Close(context.Background())) }()
|
||||||
|
|
||||||
|
@ -410,9 +412,20 @@ func TestDB_DropGraves(t *testing.T) {
|
||||||
err = putBig(db, obj2)
|
err = putBig(db, obj2)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// inhume with tombstone
|
id1, _ := obj1.ID()
|
||||||
addrTombstone := oidtest.Address()
|
id2, _ := obj2.ID()
|
||||||
addrTombstone.SetContainer(cnr)
|
ts := objectSDK.NewTombstone()
|
||||||
|
ts.SetMembers([]oid.ID{id1, id2})
|
||||||
|
objTs := objectSDK.New()
|
||||||
|
objTs.SetContainerID(cnr)
|
||||||
|
objTs.SetType(objectSDK.TypeTombstone)
|
||||||
|
|
||||||
|
data, _ := ts.Marshal()
|
||||||
|
objTs.SetPayload(data)
|
||||||
|
require.NoError(t, objectSDK.CalculateAndSetID(objTs))
|
||||||
|
require.NoError(t, putBig(db, objTs))
|
||||||
|
|
||||||
|
addrTombstone := object.AddressOf(objTs)
|
||||||
|
|
||||||
var inhumePrm meta.InhumePrm
|
var inhumePrm meta.InhumePrm
|
||||||
inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
|
inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
|
||||||
|
@ -435,8 +448,11 @@ func TestDB_DropGraves(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 2, counter)
|
require.Equal(t, 2, counter)
|
||||||
|
|
||||||
err = db.DropGraves(context.Background(), buriedTS)
|
res, err := db.InhumeTombstones(context.Background(), buriedTS)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
require.EqualValues(t, 1, res.LogicInhumed())
|
||||||
|
require.EqualValues(t, 0, res.UserInhumed())
|
||||||
|
require.EqualValues(t, map[cid.ID]meta.ObjectCounters{cnr: {Logic: 1}}, res.InhumedByCnrID())
|
||||||
|
|
||||||
counter = 0
|
counter = 0
|
||||||
iterGravePRM.SetHandler(func(_ meta.TombstonedObject) error {
|
iterGravePRM.SetHandler(func(_ meta.TombstonedObject) error {
|
||||||
|
|
|
@ -217,85 +217,93 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
|
||||||
garbageBKT := tx.Bucket(garbageBucketName)
|
garbageBKT := tx.Bucket(garbageBucketName)
|
||||||
graveyardBKT := tx.Bucket(graveyardBucketName)
|
graveyardBKT := tx.Bucket(graveyardBucketName)
|
||||||
|
|
||||||
bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, &prm)
|
bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
buf := make([]byte, addressKeySize)
|
buf := make([]byte, addressKeySize)
|
||||||
for i := range prm.target {
|
for i := range prm.target {
|
||||||
id := prm.target[i].Object()
|
if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, prm.target[i], buf, epoch, prm, res); err != nil {
|
||||||
cnr := prm.target[i].Container()
|
|
||||||
|
|
||||||
// prevent locked objects to be inhumed
|
|
||||||
if !prm.forceRemoval && objectLocked(tx, cnr, id) {
|
|
||||||
return new(apistatus.ObjectLocked)
|
|
||||||
}
|
|
||||||
|
|
||||||
var lockWasChecked bool
|
|
||||||
|
|
||||||
// prevent lock objects to be inhumed
|
|
||||||
// if `Inhume` was called not with the
|
|
||||||
// `WithForceGCMark` option
|
|
||||||
if !prm.forceRemoval {
|
|
||||||
if isLockObject(tx, cnr, id) {
|
|
||||||
return ErrLockObjectRemoval
|
|
||||||
}
|
|
||||||
|
|
||||||
lockWasChecked = true
|
|
||||||
}
|
|
||||||
|
|
||||||
obj, err := db.get(tx, prm.target[i], buf, false, true, epoch)
|
|
||||||
targetKey := addressKey(prm.target[i], buf)
|
|
||||||
var ecErr *objectSDK.ECInfoError
|
|
||||||
if err == nil {
|
|
||||||
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
} else if errors.As(err, &ecErr) {
|
|
||||||
err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if prm.tomb != nil {
|
|
||||||
var isTomb bool
|
|
||||||
isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if isTomb {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// consider checking if target is already in graveyard?
|
|
||||||
err = bkt.Put(targetKey, value)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if prm.lockObjectHandling {
|
|
||||||
// do not perform lock check if
|
|
||||||
// it was already called
|
|
||||||
if lockWasChecked {
|
|
||||||
// inhumed object is not of
|
|
||||||
// the LOCK type
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if isLockObject(tx, cnr, id) {
|
|
||||||
res.deletedLockObj = append(res.deletedLockObj, prm.target[i])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return db.applyInhumeResToCounters(tx, res)
|
return db.applyInhumeResToCounters(tx, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garbageBKT *bbolt.Bucket, addr oid.Address, buf []byte, epoch uint64, prm InhumePrm, res *InhumeRes) error {
|
||||||
|
id := addr.Object()
|
||||||
|
cnr := addr.Container()
|
||||||
|
tx := bkt.Tx()
|
||||||
|
|
||||||
|
// prevent locked objects to be inhumed
|
||||||
|
if !prm.forceRemoval && objectLocked(tx, cnr, id) {
|
||||||
|
return new(apistatus.ObjectLocked)
|
||||||
|
}
|
||||||
|
|
||||||
|
var lockWasChecked bool
|
||||||
|
|
||||||
|
// prevent lock objects to be inhumed
|
||||||
|
// if `Inhume` was called not with the
|
||||||
|
// `WithForceGCMark` option
|
||||||
|
if !prm.forceRemoval {
|
||||||
|
if isLockObject(tx, cnr, id) {
|
||||||
|
return ErrLockObjectRemoval
|
||||||
|
}
|
||||||
|
|
||||||
|
lockWasChecked = true
|
||||||
|
}
|
||||||
|
|
||||||
|
obj, err := db.get(tx, addr, buf, false, true, epoch)
|
||||||
|
targetKey := addressKey(addr, buf)
|
||||||
|
var ecErr *objectSDK.ECInfoError
|
||||||
|
if err == nil {
|
||||||
|
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else if errors.As(err, &ecErr) {
|
||||||
|
err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if prm.tomb != nil {
|
||||||
|
var isTomb bool
|
||||||
|
isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if isTomb {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// consider checking if target is already in graveyard?
|
||||||
|
err = bkt.Put(targetKey, value)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if prm.lockObjectHandling {
|
||||||
|
// do not perform lock check if
|
||||||
|
// it was already called
|
||||||
|
if lockWasChecked {
|
||||||
|
// inhumed object is not of
|
||||||
|
// the LOCK type
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if isLockObject(tx, cnr, id) {
|
||||||
|
res.deletedLockObj = append(res.deletedLockObj, addr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
|
func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
|
||||||
garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
|
garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
|
||||||
ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte,
|
ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte,
|
||||||
|
@ -354,7 +362,7 @@ func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
|
||||||
// 1. tombstone address if Inhume was called with
|
// 1. tombstone address if Inhume was called with
|
||||||
// a Tombstone
|
// a Tombstone
|
||||||
// 2. zeroValue if Inhume was called with a GC mark
|
// 2. zeroValue if Inhume was called with a GC mark
|
||||||
func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm *InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
|
func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
|
||||||
if prm.tomb != nil {
|
if prm.tomb != nil {
|
||||||
targetBucket = graveyardBKT
|
targetBucket = graveyardBKT
|
||||||
tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize))
|
tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize))
|
||||||
|
|
|
@ -627,23 +627,14 @@ func (s *Shard) selectExpired(ctx context.Context, epoch uint64, addresses []oid
|
||||||
//
|
//
|
||||||
// Does not modify tss.
|
// Does not modify tss.
|
||||||
func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.TombstonedObject) {
|
func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.TombstonedObject) {
|
||||||
if s.GetMode().NoMetabase() {
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode.NoMetabase() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mark tombstones as garbage.
|
res, err := s.metaBase.InhumeTombstones(ctx, tss)
|
||||||
var pInhume meta.InhumePrm
|
|
||||||
|
|
||||||
tsAddrs := make([]oid.Address, 0, len(tss))
|
|
||||||
for _, ts := range tss {
|
|
||||||
tsAddrs = append(tsAddrs, ts.Tombstone())
|
|
||||||
}
|
|
||||||
|
|
||||||
pInhume.SetGCMark()
|
|
||||||
pInhume.SetAddresses(tsAddrs...)
|
|
||||||
|
|
||||||
// inhume tombstones
|
|
||||||
res, err := s.metaBase.Inhume(ctx, pInhume)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage,
|
s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage,
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
@ -663,13 +654,6 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
|
||||||
s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size))
|
s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size))
|
||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
|
|
||||||
// drop just processed expired tombstones
|
|
||||||
// from graveyard
|
|
||||||
err = s.metaBase.DropGraves(ctx, tss)
|
|
||||||
if err != nil {
|
|
||||||
s.log.Warn(ctx, logs.ShardCouldNotDropExpiredGraveRecords, zap.Error(err))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleExpiredLocks unlocks all objects which were locked by lockers.
|
// HandleExpiredLocks unlocks all objects which were locked by lockers.
|
||||||
|
|
Loading…
Add table
Reference in a new issue
Do we need to allocate it here once we don't want to return it?
We need to initialize it here, because the function in
Batch
may be executed multiple times.Oh, wait, it should be returned, the result is used in the caller.
fixed
extended the test to check the return value