[#2068] writecache: Remove deleted objects from the writecache

Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
Evgenii Stratonikov 2022-11-17 11:59:27 +03:00 committed by fyrchik
parent 4a49ea0855
commit 42554a9298
2 changed files with 132 additions and 26 deletions

View file

@ -13,6 +13,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
checksumtest "github.com/nspcc-dev/neofs-sdk-go/checksum/test"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
@ -25,17 +26,17 @@ import (
"go.uber.org/zap/zaptest"
)
type objectPair struct {
addr oid.Address
obj *object.Object
}
func TestFlush(t *testing.T) {
const (
objCount = 4
smallSize = 256
)
type objectPair struct {
addr oid.Address
obj *object.Object
}
newCache := func(t *testing.T, opts ...Option) (Cache, *blobstor.BlobStor, *meta.DB) {
dir := t.TempDir()
mb := meta.New(
@ -75,18 +76,7 @@ func TestFlush(t *testing.T) {
putObjects := func(t *testing.T, c Cache) []objectPair {
objects := make([]objectPair, objCount)
for i := range objects {
obj, data := newObject(t, 1+(i%2)*smallSize)
addr := objectCore.AddressOf(obj)
var prm common.PutPrm
prm.Address = objectCore.AddressOf(obj)
prm.Object = obj
prm.RawData = data
_, err := c.Put(prm)
require.NoError(t, err)
objects[i] = objectPair{addr: addr, obj: obj}
objects[i] = putObject(t, c, 1+(i%2)*smallSize)
}
return objects
}
@ -230,6 +220,82 @@ func TestFlush(t *testing.T) {
})
})
})
t.Run("on init", func(t *testing.T) {
wc, bs, mb := newCache(t)
objects := []objectPair{
// removed
putObject(t, wc, 1),
putObject(t, wc, smallSize+1),
// not found
putObject(t, wc, 1),
putObject(t, wc, smallSize+1),
// ok
putObject(t, wc, 1),
putObject(t, wc, smallSize+1),
}
require.NoError(t, wc.Close())
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
for i := range objects {
var prm meta.PutPrm
prm.SetObject(objects[i].obj)
_, err := mb.Put(prm)
require.NoError(t, err)
}
var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(objects[0].addr, objects[1].addr)
inhumePrm.SetTombstoneAddress(oidtest.Address())
_, err := mb.Inhume(inhumePrm)
require.NoError(t, err)
var deletePrm meta.DeletePrm
deletePrm.SetAddresses(objects[2].addr, objects[3].addr)
_, err = mb.Delete(deletePrm)
require.NoError(t, err)
require.NoError(t, bs.SetMode(mode.ReadOnly))
require.NoError(t, mb.SetMode(mode.ReadOnly))
// Open in read-only: no error, nothing is removed.
require.NoError(t, wc.Open(true))
require.NoError(t, wc.Init())
for i := range objects {
_, err := wc.Get(objects[i].addr)
require.NoError(t, err, i)
}
require.NoError(t, wc.Close())
// Open in read-write: no error, something is removed.
require.NoError(t, wc.Open(false))
require.NoError(t, wc.Init())
for i := range objects {
_, err := wc.Get(objects[i].addr)
if i < 2 {
require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
} else {
require.NoError(t, err, i)
}
}
})
}
func putObject(t *testing.T, c Cache, size int) objectPair {
obj, data := newObject(t, size)
var prm common.PutPrm
prm.Address = objectCore.AddressOf(obj)
prm.Object = obj
prm.RawData = data
_, err := c.Put(prm)
require.NoError(t, err)
return objectPair{prm.Address, prm.Object}
}
func newObject(t *testing.T, size int) (*object.Object, []byte) {

View file

@ -4,10 +4,12 @@ import (
"errors"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
func (c *cache) initFlushMarks() {
@ -15,8 +17,18 @@ func (c *cache) initFlushMarks() {
var prm common.IteratePrm
prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
if c.isFlushed(addr) {
flushed, needRemove := c.flushStatus(addr)
if flushed {
c.store.flushed.Add(addr.EncodeToString(), true)
if needRemove {
var prm common.DeletePrm
prm.Address = addr
_, err := c.fsTree.Delete(prm)
if err == nil {
storagelog.Write(c.log, storagelog.AddressField(addr), storagelog.OpField("fstree DELETE"))
}
}
}
return nil
}
@ -25,6 +37,7 @@ func (c *cache) initFlushMarks() {
c.log.Info("filling flush marks for objects in database")
var m []string
var indices []int
var lastKey []byte
var batchSize = flushBatchSize
for {
@ -46,29 +59,56 @@ func (c *cache) initFlushMarks() {
continue
}
if c.isFlushed(addr) {
flushed, needRemove := c.flushStatus(addr)
if flushed {
c.store.flushed.Add(addr.EncodeToString(), true)
if needRemove {
indices = append(indices, i)
}
}
}
if len(m) == 0 {
break
}
err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
for _, j := range indices {
if err := b.Delete([]byte(m[j])); err != nil {
return err
}
}
return nil
})
if err == nil {
for _, j := range indices {
storagelog.Write(c.log, zap.String("address", m[j]), storagelog.OpField("db DELETE"))
}
}
lastKey = append([]byte(m[len(m)-1]), 0)
}
c.log.Info("finished updating flush marks")
}
func (c *cache) isFlushed(addr oid.Address) bool {
// flushStatus returns info about the object state in the main storage.
// First return value is true iff object exists.
// Second return value is true iff object can be safely removed.
func (c *cache) flushStatus(addr oid.Address) (bool, bool) {
var existsPrm meta.ExistsPrm
existsPrm.SetAddress(addr)
_, err := c.metabase.Exists(existsPrm)
if err != nil {
needRemove := errors.Is(err, meta.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved))
return needRemove, needRemove
}
var prm meta.StorageIDPrm
prm.SetAddress(addr)
mRes, err := c.metabase.StorageID(prm)
if err != nil {
return errors.Is(err, meta.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved)) || errors.Is(err, apistatus.ObjectNotFound{})
}
mRes, _ := c.metabase.StorageID(prm)
res, err := c.blobstor.Exists(common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()})
return err == nil && res.Exists
return err == nil && res.Exists, false
}