diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index 15dbfa6b6d..5482befc6f 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -13,6 +13,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/util/logger" checksumtest "github.com/nspcc-dev/neofs-sdk-go/checksum/test" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -25,17 +26,17 @@ import ( "go.uber.org/zap/zaptest" ) +type objectPair struct { + addr oid.Address + obj *object.Object +} + func TestFlush(t *testing.T) { const ( objCount = 4 smallSize = 256 ) - type objectPair struct { - addr oid.Address - obj *object.Object - } - newCache := func(t *testing.T, opts ...Option) (Cache, *blobstor.BlobStor, *meta.DB) { dir := t.TempDir() mb := meta.New( @@ -75,18 +76,7 @@ func TestFlush(t *testing.T) { putObjects := func(t *testing.T, c Cache) []objectPair { objects := make([]objectPair, objCount) for i := range objects { - obj, data := newObject(t, 1+(i%2)*smallSize) - addr := objectCore.AddressOf(obj) - - var prm common.PutPrm - prm.Address = objectCore.AddressOf(obj) - prm.Object = obj - prm.RawData = data - - _, err := c.Put(prm) - require.NoError(t, err) - - objects[i] = objectPair{addr: addr, obj: obj} + objects[i] = putObject(t, c, 1+(i%2)*smallSize) } return objects } @@ -230,6 +220,82 @@ func TestFlush(t *testing.T) { }) }) }) + + t.Run("on init", func(t *testing.T) { + wc, bs, mb := newCache(t) + objects := []objectPair{ + // removed + putObject(t, wc, 1), + putObject(t, wc, smallSize+1), + // not found + putObject(t, wc, 1), + putObject(t, wc, smallSize+1), + // ok + putObject(t, wc, 1), + putObject(t, wc, smallSize+1), + } + + require.NoError(t, wc.Close()) + require.NoError(t, bs.SetMode(mode.ReadWrite)) + require.NoError(t, mb.SetMode(mode.ReadWrite)) + + for i := range objects { + var prm meta.PutPrm + prm.SetObject(objects[i].obj) + _, err := mb.Put(prm) + require.NoError(t, err) + } + + var inhumePrm meta.InhumePrm + inhumePrm.SetAddresses(objects[0].addr, objects[1].addr) + inhumePrm.SetTombstoneAddress(oidtest.Address()) + _, err := mb.Inhume(inhumePrm) + require.NoError(t, err) + + var deletePrm meta.DeletePrm + deletePrm.SetAddresses(objects[2].addr, objects[3].addr) + _, err = mb.Delete(deletePrm) + require.NoError(t, err) + + require.NoError(t, bs.SetMode(mode.ReadOnly)) + require.NoError(t, mb.SetMode(mode.ReadOnly)) + + // Open in read-only: no error, nothing is removed. + require.NoError(t, wc.Open(true)) + require.NoError(t, wc.Init()) + for i := range objects { + _, err := wc.Get(objects[i].addr) + require.NoError(t, err, i) + } + require.NoError(t, wc.Close()) + + // Open in read-write: no error, something is removed. + require.NoError(t, wc.Open(false)) + require.NoError(t, wc.Init()) + for i := range objects { + _, err := wc.Get(objects[i].addr) + if i < 2 { + require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i) + } else { + require.NoError(t, err, i) + } + } + }) +} + +func putObject(t *testing.T, c Cache, size int) objectPair { + obj, data := newObject(t, size) + + var prm common.PutPrm + prm.Address = objectCore.AddressOf(obj) + prm.Object = obj + prm.RawData = data + + _, err := c.Put(prm) + require.NoError(t, err) + + return objectPair{prm.Address, prm.Object} + } func newObject(t *testing.T, size int) (*object.Object, []byte) { diff --git a/pkg/local_object_storage/writecache/init.go b/pkg/local_object_storage/writecache/init.go index 63cf5ab8b4..dae13b1b14 100644 --- a/pkg/local_object_storage/writecache/init.go +++ b/pkg/local_object_storage/writecache/init.go @@ -4,10 +4,12 @@ import ( "errors" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" + storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.etcd.io/bbolt" + "go.uber.org/zap" ) func (c *cache) initFlushMarks() { @@ -15,8 +17,18 @@ func (c *cache) initFlushMarks() { var prm common.IteratePrm prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error { - if c.isFlushed(addr) { + flushed, needRemove := c.flushStatus(addr) + if flushed { c.store.flushed.Add(addr.EncodeToString(), true) + if needRemove { + var prm common.DeletePrm + prm.Address = addr + + _, err := c.fsTree.Delete(prm) + if err == nil { + storagelog.Write(c.log, storagelog.AddressField(addr), storagelog.OpField("fstree DELETE")) + } + } } return nil } @@ -25,6 +37,7 @@ func (c *cache) initFlushMarks() { c.log.Info("filling flush marks for objects in database") var m []string + var indices []int var lastKey []byte var batchSize = flushBatchSize for { @@ -46,29 +59,56 @@ func (c *cache) initFlushMarks() { continue } - if c.isFlushed(addr) { + flushed, needRemove := c.flushStatus(addr) + if flushed { c.store.flushed.Add(addr.EncodeToString(), true) + if needRemove { + indices = append(indices, i) + } } } if len(m) == 0 { break } + + err := c.db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + for _, j := range indices { + if err := b.Delete([]byte(m[j])); err != nil { + return err + } + } + return nil + }) + if err == nil { + for _, j := range indices { + storagelog.Write(c.log, zap.String("address", m[j]), storagelog.OpField("db DELETE")) + } + } lastKey = append([]byte(m[len(m)-1]), 0) } c.log.Info("finished updating flush marks") } -func (c *cache) isFlushed(addr oid.Address) bool { +// flushStatus returns info about the object state in the main storage. +// First return value is true iff object exists. +// Second return value is true iff object can be safely removed. +func (c *cache) flushStatus(addr oid.Address) (bool, bool) { + var existsPrm meta.ExistsPrm + existsPrm.SetAddress(addr) + + _, err := c.metabase.Exists(existsPrm) + if err != nil { + needRemove := errors.Is(err, meta.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved)) + return needRemove, needRemove + } + var prm meta.StorageIDPrm prm.SetAddress(addr) - mRes, err := c.metabase.StorageID(prm) - if err != nil { - return errors.Is(err, meta.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved)) || errors.Is(err, apistatus.ObjectNotFound{}) - } - + mRes, _ := c.metabase.StorageID(prm) res, err := c.blobstor.Exists(common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()}) - return err == nil && res.Exists + return err == nil && res.Exists, false }