Compare commits

..

3 commits

Author SHA1 Message Date
317e501d1c [#xx] Add error type for writecache misses
Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
2023-08-29 12:10:03 +03:00
4ea0df77d0 [#574] policer: Check if the container was really removed
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-08-28 14:21:38 +00:00
554ff2c06b [#574] core: Extend Source interface with DeletionInfo method
* Introduce common method EverExisted
* Define DeletionInfo for struct that must implement Source
* Refactor tree srv

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-08-28 14:21:38 +00:00
23 changed files with 375 additions and 231 deletions

View file

@ -142,7 +142,8 @@ func (c *lruNetCache) get(key uint64) (*netmapSDK.NetMap, error) {
// wrapper over TTL cache of values read from the network
// that implements container storage.
type ttlContainerStorage struct {
*ttlNetCache[cid.ID, *container.Container]
containerCache *ttlNetCache[cid.ID, *container.Container]
delInfoCache *ttlNetCache[cid.ID, *container.DelInfo]
}
func newCachedContainerStorage(v container.Source, ttl time.Duration) ttlContainerStorage {
@ -151,18 +152,31 @@ func newCachedContainerStorage(v container.Source, ttl time.Duration) ttlContain
lruCnrCache := newNetworkTTLCache(containerCacheSize, ttl, func(id cid.ID) (*container.Container, error) {
return v.Get(id)
})
lruDelInfoCache := newNetworkTTLCache(containerCacheSize, ttl, func(id cid.ID) (*container.DelInfo, error) {
return v.DeletionInfo(id)
})
return ttlContainerStorage{lruCnrCache}
return ttlContainerStorage{
containerCache: lruCnrCache,
delInfoCache: lruDelInfoCache,
}
}
func (s ttlContainerStorage) handleRemoval(cnr cid.ID) {
s.set(cnr, nil, new(apistatus.ContainerNotFound))
s.containerCache.set(cnr, nil, new(apistatus.ContainerNotFound))
// The removal invalidates possibly stored error response.
s.delInfoCache.remove(cnr)
}
// Get returns container value from the cache. If value is missing in the cache
// or expired, then it returns value from side chain and updates the cache.
func (s ttlContainerStorage) Get(cnr cid.ID) (*container.Container, error) {
return s.get(cnr)
return s.containerCache.get(cnr)
}
func (s ttlContainerStorage) DeletionInfo(cnr cid.ID) (*container.DelInfo, error) {
return s.delInfoCache.get(cnr)
}
type ttlEACLStorage struct {

View file

@ -113,7 +113,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
c.cfgObject.eaclSource = eACLFetcher
cnrRdr.eacl = eACLFetcher
c.cfgObject.cnrSource = cnrSrc
cnrRdr.get = cnrSrc
cnrRdr.src = cnrSrc
cnrRdr.lister = client
} else {
// use RPC node as source of Container contract items (with caching)
@ -131,7 +131,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
cnr, err := cnrSrc.Get(ev.ID)
if err == nil {
cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
cachedContainerStorage.set(ev.ID, cnr, nil)
cachedContainerStorage.containerCache.set(ev.ID, cnr, nil)
} else {
// unlike removal, we expect successful receive of the container
// after successful creation, so logging can be useful
@ -159,7 +159,6 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
}
cachedContainerStorage.handleRemoval(ev.ID)
c.log.Debug(logs.FrostFSNodeContainerRemovalEventsReceipt,
zap.Stringer("id", ev.ID),
)
@ -170,7 +169,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
cnrRdr.lister = cachedContainerLister
cnrRdr.eacl = c.cfgObject.eaclSource
cnrRdr.get = c.cfgObject.cnrSource
cnrRdr.src = c.cfgObject.cnrSource
cnrWrt.cacheEnabled = true
cnrWrt.eacls = cachedEACLStorage
@ -641,7 +640,7 @@ func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.Si
type morphContainerReader struct {
eacl containerCore.EACLSource
get containerCore.Source
src containerCore.Source
lister interface {
List(*user.ID) ([]cid.ID, error)
@ -649,7 +648,11 @@ type morphContainerReader struct {
}
func (x *morphContainerReader) Get(id cid.ID) (*containerCore.Container, error) {
return x.get.Get(id)
return x.src.Get(id)
}
func (x *morphContainerReader) DeletionInfo(id cid.ID) (*containerCore.DelInfo, error) {
return x.src.DeletionInfo(id)
}
func (x *morphContainerReader) GetEACL(id cid.ID) (*containerCore.EACL, error) {

View file

@ -32,7 +32,7 @@ func (c cnrSource) Get(id cid.ID) (*container.Container, error) {
}
func (c cnrSource) DeletionInfo(cid cid.ID) (*container.DelInfo, error) {
return c.cli.DeletionInfo(cid)
return c.src.DeletionInfo(cid)
}
func (c cnrSource) List() ([]cid.ID, error) {

View file

@ -42,6 +42,7 @@ const (
NotificatorNotificatorStartProcessingObjectNotifications = "notificator: start processing object notifications"
NotificatorNotificatorProcessingObjectNotification = "notificator: processing object notification"
PolicerCouldNotGetContainer = "could not get container"
PolicerCouldNotConfirmContainerRemoval = "could not confirm container removal"
PolicerCouldNotInhumeObjectWithMissingContainer = "could not inhume object with missing container"
PolicerCouldNotBuildPlacementVectorForObject = "could not build placement vector for object"
PolicerRedundantLocalObjectCopyDetected = "redundant local object copy detected"

View file

@ -41,6 +41,8 @@ type Source interface {
// Implementations must not retain the container pointer and modify
// the container through it.
Get(cid.ID) (*Container, error)
DeletionInfo(cid.ID) (*DelInfo, error)
}
// EACL groups information about the FrostFS container's extended ACL stored in

View file

@ -0,0 +1,22 @@
package container
import (
"errors"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)
// WasRemoved checks whether the container ever existed or
// it just has not been created yet at the current epoch.
func WasRemoved(s Source, cid cid.ID) (bool, error) {
_, err := s.DeletionInfo(cid)
if err == nil {
return true, nil
}
var errContainerNotFound *apistatus.ContainerNotFound
if errors.As(err, &errContainerNotFound) {
return false, nil
}
return false, err
}

View file

@ -2,6 +2,7 @@ package shard
import (
"context"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -138,10 +139,10 @@ func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta
if s.hasWriteCache() {
res, err := wc(s.writeCache)
if err == nil || IsErrOutOfRange(err) {
if err == nil || IsErrOutOfRange(err) || client.IsErrObjectNotFound(err) {
return res, false, err
}
if client.IsErrObjectNotFound(err) {
if errors.Is(err, writecache.ErrMiss) {
s.log.Debug(logs.ShardObjectIsMissingInWritecache,
zap.Stringer("addr", addr),
zap.Bool("skip_meta", skipMeta))

View file

@ -63,4 +63,11 @@ var (
ErrBigObject = errors.New("too big object")
// ErrOutOfSpace is returned when there is no space left to put a new object.
ErrOutOfSpace = errors.New("no space left in the write cache")
// ErrMiss is returned when there is no information about the object in the cache.
// Note that the distinction between this error and ObjectNotFound is important. When
// the cache returns ObjectNotFound, it can be safely assumed that the object is not
// found in the cache or any underlying storage. When the cache returns ErrMiss, nothing
// is known about the object and thus the underlying storage must be consulted as well.
// This allows the cache to serve as a consistent view of the underlying storage.
ErrMiss = errors.New("write-cache miss")
)

View file

@ -6,10 +6,8 @@ import (
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/dgraph-io/badger/v4"
"go.opentelemetry.io/otel/attribute"
@ -17,8 +15,6 @@ import (
)
// Delete removes object from write-cache.
//
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
_, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
trace.WithAttributes(
@ -39,22 +35,22 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
return writecache.ErrReadOnly
}
saddr := addr.EncodeToString()
key := addr2key(addr)
err := c.db.Update(func(tx *badger.Txn) error {
it, err := tx.Get([]byte(saddr))
it, err := tx.Get(key[:])
if err != nil {
if err == badger.ErrKeyNotFound {
return logicerr.Wrap(new(apistatus.ObjectNotFound))
return writecache.ErrMiss
}
return err
}
if it.ValueSize() > 0 {
storageType = writecache.StorageTypeDB
err := tx.Delete([]byte(saddr))
err := tx.Delete(key[:])
if err == nil {
storagelog.Write(c.log,
storagelog.AddressField(saddr),
storagelog.AddressField(addr.EncodeToString()),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"),
)

View file

@ -1,19 +0,0 @@
package writecachebadger
import (
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
)
func TestGeneric(t *testing.T) {
storagetest.TestAll(t, func(t *testing.T) storagetest.Component {
return New(
WithLogger(test.NewLogger(t, true)),
WithFlushWorkersCount(2),
WithPath(t.TempDir()),
WithGCInterval(1*time.Second))
})
}

View file

@ -5,10 +5,8 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/dgraph-io/badger/v4"
@ -16,9 +14,6 @@ import (
"go.opentelemetry.io/otel/trace"
)
// Get returns object from write-cache.
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
_, span := tracing.StartSpanFromContext(ctx, "writecache.Get",
trace.WithAttributes(
@ -47,12 +42,10 @@ func (c *cache) getInternal(addr oid.Address) (*objectSDK.Object, error) {
return obj, obj.Unmarshal(value)
}
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
return nil, err
}
// Head returns object header from write-cache.
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
_, span := tracing.StartSpanFromContext(ctx, "writecache.Head",
trace.WithAttributes(
@ -70,8 +63,6 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object,
// Get fetches object from the underlying database.
// Key should be a stringified address.
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in db.
func Get(db *badger.DB, key []byte) ([]byte, error) {
var value []byte
@ -79,7 +70,7 @@ func Get(db *badger.DB, key []byte) ([]byte, error) {
it, err := tx.Get(key)
if err != nil {
if err == badger.ErrKeyNotFound {
return logicerr.Wrap(new(apistatus.ObjectNotFound))
return writecache.ErrMiss
}
return err
}
@ -91,5 +82,5 @@ func Get(db *badger.DB, key []byte) ([]byte, error) {
return nil
})
return value, metaerr.Wrap(err)
return value, err
}

View file

@ -6,6 +6,7 @@ import (
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
@ -16,7 +17,7 @@ import (
"github.com/stretchr/testify/require"
)
func TestFlush(t *testing.T) {
func TestGeneric(t *testing.T) {
createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs writecache.MainStorage, opts ...Option) writecache.Cache {
return New(
append([]Option{
@ -59,5 +60,22 @@ func TestFlush(t *testing.T) {
},
}
writecachetest.TestFlush(t, createCacheFn, errCountOpt, failures...)
t.Run("api", func(t *testing.T) {
writecachetest.TestAPI(t, createCacheFn)
})
t.Run("flush", func(t *testing.T) {
writecachetest.TestFlush(t, createCacheFn, errCountOpt, failures...)
})
t.Run("storage", func(t *testing.T) {
storagetest.TestAll(t, func(t *testing.T) storagetest.Component {
return New(
WithLogger(test.NewLogger(t, true)),
WithFlushWorkersCount(2),
WithPath(t.TempDir()),
WithGCInterval(1*time.Second))
})
})
}

View file

@ -10,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
@ -17,8 +18,6 @@ import (
)
// Delete removes object from write-cache.
//
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
trace.WithAttributes(
@ -61,6 +60,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
if err != nil {
return err
}
deleted = true
storagelog.Write(c.log,
storagelog.AddressField(saddr),
storagelog.StorageTypeField(wcStorageType),
@ -69,9 +69,9 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
if recordDeleted {
c.objCounters.cDB.Add(math.MaxUint64)
c.estimateCacheSize()
return nil
}
deleted = true
return nil
return metaerr.Wrap(writecache.ErrMiss)
}
storageType = writecache.StorageTypeFSTree
@ -85,5 +85,8 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
deleted = true
c.estimateCacheSize()
}
if client.IsErrObjectNotFound(err) {
err = writecache.ErrMiss
}
return metaerr.Wrap(err)
}

View file

@ -1,17 +0,0 @@
package writecachebbolt
import (
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
)
func TestGeneric(t *testing.T) {
storagetest.TestAll(t, func(t *testing.T) storagetest.Component {
return New(
WithLogger(test.NewLogger(t, true)),
WithFlushWorkersCount(2),
WithPath(t.TempDir()))
})
}

View file

@ -6,10 +6,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
@ -52,7 +50,7 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address)
res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr})
if err != nil {
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
return nil, metaerr.Wrap(writecache.ErrMiss)
}
found = true
@ -61,8 +59,6 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address)
}
// Head returns object header from write-cache.
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
saddr := addr.EncodeToString()
@ -82,8 +78,6 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object,
// Get fetches object from the underlying database.
// Key should be a stringified address.
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in db.
func Get(db *bbolt.DB, key []byte) ([]byte, error) {
var value []byte
err := db.View(func(tx *bbolt.Tx) error {
@ -93,7 +87,7 @@ func Get(db *bbolt.DB, key []byte) ([]byte, error) {
}
value = b.Get(key)
if value == nil {
return logicerr.Wrap(new(apistatus.ObjectNotFound))
return metaerr.Wrap(writecache.ErrMiss)
}
value = slice.Copy(value)
return nil

View file

@ -9,6 +9,7 @@ import (
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
@ -100,5 +101,20 @@ func TestFlush(t *testing.T) {
},
}
writecachetest.TestFlush(t, createCacheFn, errCountOpt, failures...)
t.Run("api", func(t *testing.T) {
writecachetest.TestAPI(t, createCacheFn)
})
t.Run("flush", func(t *testing.T) {
writecachetest.TestFlush(t, createCacheFn, errCountOpt, failures...)
})
t.Run("storage", func(t *testing.T) {
storagetest.TestAll(t, func(t *testing.T) storagetest.Component {
return New(
WithLogger(test.NewLogger(t, true)),
WithFlushWorkersCount(2),
WithPath(t.TempDir()))
})
})
}

View file

@ -0,0 +1,60 @@
package writecachetest
import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
func TestAPI[Option any](t *testing.T, createCacheFn CreateCacheFunc[Option]) {
ctx := context.Background()
wc, _, _ := newCache(t, createCacheFn, smallSize)
obj := testutil.GenerateObject()
addr := testutil.AddressFromObject(t, obj)
data, err := obj.Marshal()
require.NoError(t, err)
// Get nonexistent object
{
_, gotErr := wc.Get(ctx, oidtest.Address())
require.ErrorIs(t, gotErr, writecache.ErrMiss)
}
// Put an object
{
_, err := wc.Put(ctx, common.PutPrm{
Address: addr,
Object: obj,
RawData: data,
})
require.NoError(t, err)
}
// Get the object previously put
{
gotObj, err := wc.Get(ctx, addr)
require.NoError(t, err)
gotData, err := gotObj.Marshal()
require.NoError(t, err)
require.Equal(t, data, gotData)
}
// Delete the object previously put
{
require.NoError(t, wc.Delete(ctx, addr))
require.ErrorIs(t, wc.Delete(ctx, addr), writecache.ErrMiss)
}
// Get the object previously deleted
{
_, gotErr := wc.Get(ctx, addr)
require.Error(t, gotErr, writecache.ErrMiss)
}
}

View file

@ -2,20 +2,10 @@ package writecachetest
import (
"context"
"path/filepath"
"sync/atomic"
"testing"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
)
@ -24,24 +14,6 @@ const (
smallSize = 256
)
type CreateCacheFunc[Option any] func(
t *testing.T,
smallSize uint64,
meta *meta.DB,
bs writecache.MainStorage,
opts ...Option,
) writecache.Cache
type TestFailureInjector[Option any] struct {
Desc string
InjectFn func(*testing.T, writecache.Cache)
}
type objectPair struct {
addr oid.Address
obj *objectSDK.Object
}
func TestFlush[Option any](
t *testing.T,
createCacheFn CreateCacheFunc[Option],
@ -50,6 +22,11 @@ func TestFlush[Option any](
) {
t.Run("no errors", func(t *testing.T) {
wc, bs, mb := newCache(t, createCacheFn, smallSize)
// Set mode for metabase and blobstor to prevent background flushes.
require.NoError(t, mb.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadOnly))
objects := putObjects(t, wc)
require.NoError(t, bs.SetMode(mode.ReadWrite))
@ -62,6 +39,11 @@ func TestFlush[Option any](
t.Run("flush on moving to degraded mode", func(t *testing.T) {
wc, bs, mb := newCache(t, createCacheFn, smallSize)
// Set mode for metabase and blobstor to prevent background flushes.
require.NoError(t, mb.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadOnly))
objects := putObjects(t, wc)
// Blobstor is read-only, so we expect en error from `flush` here.
@ -82,6 +64,11 @@ func TestFlush[Option any](
t.Run(f.Desc, func(t *testing.T) {
errCountOpt, errCount := errCountOption()
wc, bs, mb := newCache(t, createCacheFn, smallSize, errCountOpt)
// Set mode for metabase and blobstor to prevent background flushes.
require.NoError(t, mb.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadOnly))
objects := putObjects(t, wc)
f.InjectFn(t, wc)
@ -99,87 +86,3 @@ func TestFlush[Option any](
}
})
}
func newCache[Option any](
t *testing.T,
createCacheFn CreateCacheFunc[Option],
smallSize uint64,
opts ...Option,
) (writecache.Cache, *blobstor.BlobStor, *meta.DB) {
dir := t.TempDir()
mb := meta.New(
meta.WithPath(filepath.Join(dir, "meta")),
meta.WithEpochState(dummyEpoch{}))
require.NoError(t, mb.Open(false))
require.NoError(t, mb.Init())
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
{
Storage: fstree.New(
fstree.WithPath(filepath.Join(dir, "blob")),
fstree.WithDepth(0),
fstree.WithDirNameLen(1)),
},
}))
require.NoError(t, bs.Open(false))
require.NoError(t, bs.Init())
wc := createCacheFn(t, smallSize, mb, bs, opts...)
t.Cleanup(func() { require.NoError(t, wc.Close()) })
require.NoError(t, wc.Open(false))
require.NoError(t, wc.Init())
// First set mode for metabase and blobstor to prevent background flushes.
require.NoError(t, mb.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadOnly))
return wc, bs, mb
}
func putObject(t *testing.T, c writecache.Cache, size int) objectPair {
obj := testutil.GenerateObjectWithSize(size)
data, err := obj.Marshal()
require.NoError(t, err)
var prm common.PutPrm
prm.Address = objectCore.AddressOf(obj)
prm.Object = obj
prm.RawData = data
_, err = c.Put(context.Background(), prm)
require.NoError(t, err)
return objectPair{prm.Address, prm.Object}
}
func putObjects(t *testing.T, c writecache.Cache) []objectPair {
objects := make([]objectPair, objCount)
for i := range objects {
objects[i] = putObject(t, c, 1+(i%2)*smallSize)
}
return objects
}
func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) {
for i := range objects {
var mPrm meta.StorageIDPrm
mPrm.SetAddress(objects[i].addr)
mRes, err := mb.StorageID(context.Background(), mPrm)
require.NoError(t, err)
var prm common.GetPrm
prm.Address = objects[i].addr
prm.StorageID = mRes.StorageID()
res, err := bs.Get(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, objects[i].obj, res.Object)
}
}
type dummyEpoch struct{}
func (dummyEpoch) CurrentEpoch() uint64 {
return 0
}

View file

@ -0,0 +1,116 @@
package writecachetest
import (
"context"
"path/filepath"
"testing"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
)
type CreateCacheFunc[Option any] func(
t *testing.T,
smallSize uint64,
meta *meta.DB,
bs writecache.MainStorage,
opts ...Option,
) writecache.Cache
type TestFailureInjector[Option any] struct {
Desc string
InjectFn func(*testing.T, writecache.Cache)
}
type objectPair struct {
addr oid.Address
obj *objectSDK.Object
}
func newCache[Option any](
t *testing.T,
createCacheFn CreateCacheFunc[Option],
smallSize uint64,
opts ...Option,
) (writecache.Cache, *blobstor.BlobStor, *meta.DB) {
dir := t.TempDir()
mb := meta.New(
meta.WithPath(filepath.Join(dir, "meta")),
meta.WithEpochState(dummyEpoch{}))
require.NoError(t, mb.Open(false))
require.NoError(t, mb.Init())
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
{
Storage: fstree.New(
fstree.WithPath(filepath.Join(dir, "blob")),
fstree.WithDepth(0),
fstree.WithDirNameLen(1)),
},
}))
require.NoError(t, bs.Open(false))
require.NoError(t, bs.Init())
wc := createCacheFn(t, smallSize, mb, bs, opts...)
t.Cleanup(func() { require.NoError(t, wc.Close()) })
require.NoError(t, wc.Open(false))
require.NoError(t, wc.Init())
return wc, bs, mb
}
func putObject(t *testing.T, c writecache.Cache, size int) objectPair {
obj := testutil.GenerateObjectWithSize(size)
data, err := obj.Marshal()
require.NoError(t, err)
var prm common.PutPrm
prm.Address = objectCore.AddressOf(obj)
prm.Object = obj
prm.RawData = data
_, err = c.Put(context.Background(), prm)
require.NoError(t, err)
return objectPair{prm.Address, prm.Object}
}
func putObjects(t *testing.T, c writecache.Cache) []objectPair {
objects := make([]objectPair, objCount)
for i := range objects {
objects[i] = putObject(t, c, 1+(i%2)*smallSize)
}
return objects
}
func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) {
for i := range objects {
var mPrm meta.StorageIDPrm
mPrm.SetAddress(objects[i].addr)
mRes, err := mb.StorageID(context.Background(), mPrm)
require.NoError(t, err)
var prm common.GetPrm
prm.Address = objects[i].addr
prm.StorageID = mRes.StorageID()
res, err := bs.Get(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, objects[i].obj, res.Object)
}
}
type dummyEpoch struct{}
func (dummyEpoch) CurrentEpoch() uint64 {
return 0
}

View file

@ -1,6 +1,7 @@
package container
import (
"crypto/sha256"
"fmt"
"strings"
@ -11,7 +12,22 @@ import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)
func (c *Client) DeletionInfo(cid cid.ID) (*containercore.DelInfo, error) {
func (x *containerSource) DeletionInfo(cnr cid.ID) (*containercore.DelInfo, error) {
return DeletionInfo((*Client)(x), cnr)
}
type deletionInfo interface {
DeletionInfo(cid []byte) (*containercore.DelInfo, error)
}
func DeletionInfo(c deletionInfo, cnr cid.ID) (*containercore.DelInfo, error) {
binCnr := make([]byte, sha256.Size)
cnr.Encode(binCnr)
return c.DeletionInfo(binCnr)
}
func (c *Client) DeletionInfo(cid []byte) (*containercore.DelInfo, error) {
prm := client.TestInvokePrm{}
prm.SetMethod(deletionInfoMethod)
prm.SetArgs(cid)

View file

@ -5,6 +5,7 @@ import (
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
@ -27,12 +28,20 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
zap.String("error", err.Error()),
)
if client.IsErrContainerNotFound(err) {
err := p.buryFn(ctx, addrWithType.Address)
existed, err := containercore.WasRemoved(p.cnrSrc, idCnr)
if err != nil {
p.log.Error(logs.PolicerCouldNotInhumeObjectWithMissingContainer,
p.log.Error(logs.PolicerCouldNotConfirmContainerRemoval,
zap.Stringer("cid", idCnr),
zap.Stringer("oid", idObj),
zap.String("error", err.Error()))
} else if existed {
err := p.buryFn(ctx, addrWithType.Address)
if err != nil {
p.log.Error(logs.PolicerCouldNotInhumeObjectWithMissingContainer,
zap.Stringer("cid", idCnr),
zap.Stringer("oid", idObj),
zap.String("error", err.Error()))
}
}
}

View file

@ -34,8 +34,13 @@ func TestBuryObjectWithoutContainer(t *testing.T) {
// Container source and bury function
buryCh := make(chan oid.Address)
containerSrc := func(id cid.ID) (*container.Container, error) {
return nil, new(apistatus.ContainerNotFound)
containerSrc := containerSrc{
get: func(id cid.ID) (*container.Container, error) {
return nil, new(apistatus.ContainerNotFound)
},
deletionInfo: func(id cid.ID) (*container.DelInfo, error) {
return &container.DelInfo{}, nil
},
}
buryFn := func(ctx context.Context, a oid.Address) error {
buryCh <- a
@ -49,7 +54,7 @@ func TestBuryObjectWithoutContainer(t *testing.T) {
// Policer instance
p := New(
WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}),
WithContainerSource(containerSrcFunc(containerSrc)),
WithContainerSource(containerSrc),
WithBuryFunc(buryFn),
WithPool(pool),
)
@ -194,12 +199,17 @@ func TestProcessObject(t *testing.T) {
cnr := &container.Container{}
cnr.Value.Init()
cnr.Value.SetPlacementPolicy(policy)
containerSrc := func(id cid.ID) (*container.Container, error) {
if id.Equals(addr.Container()) {
return cnr, nil
}
t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container())
return nil, new(apistatus.ContainerNotFound)
containerSrc := containerSrc{
get: func(id cid.ID) (*container.Container, error) {
if id.Equals(addr.Container()) {
return cnr, nil
}
t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container())
return nil, new(apistatus.ContainerNotFound)
},
deletionInfo: func(id cid.ID) (*container.DelInfo, error) {
return &container.DelInfo{}, nil
},
}
buryFn := func(ctx context.Context, a oid.Address) error {
t.Errorf("unexpected object buried: %v", a)
@ -211,7 +221,7 @@ func TestProcessObject(t *testing.T) {
var gotReplicateTo []int
p := New(
WithContainerSource(containerSrcFunc(containerSrc)),
WithContainerSource(containerSrc),
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
return bytes.Equal(k, nodes[0].PublicKey())
@ -251,9 +261,6 @@ func TestIteratorContract(t *testing.T) {
Type: objectSDK.TypeRegular,
}}
containerSrc := func(id cid.ID) (*container.Container, error) {
return nil, new(apistatus.ContainerNotFound)
}
buryFn := func(ctx context.Context, a oid.Address) error {
return nil
}
@ -273,9 +280,18 @@ func TestIteratorContract(t *testing.T) {
finishCh: make(chan struct{}),
}
containerSrc := containerSrc{
get: func(id cid.ID) (*container.Container, error) {
return nil, new(apistatus.ContainerNotFound)
},
deletionInfo: func(id cid.ID) (*container.DelInfo, error) {
return &container.DelInfo{}, nil
},
}
p := New(
WithKeySpaceIterator(it),
WithContainerSource(containerSrcFunc(containerSrc)),
WithContainerSource(containerSrc),
WithBuryFunc(buryFn),
WithPool(pool),
func(c *cfg) {
@ -353,10 +369,14 @@ func (it *sliceKeySpaceIterator) Rewind() {
it.cur = 0
}
// containerSrcFunc is a container.Source backed by a function.
type containerSrcFunc func(cid.ID) (*container.Container, error)
type containerSrc struct {
get func(id cid.ID) (*container.Container, error)
deletionInfo func(id cid.ID) (*container.DelInfo, error)
}
func (f containerSrcFunc) Get(id cid.ID) (*container.Container, error) { return f(id) }
func (f containerSrc) Get(id cid.ID) (*container.Container, error) { return f.get(id) }
func (f containerSrc) DeletionInfo(id cid.ID) (*container.DelInfo, error) { return f.deletionInfo(id) }
// placementBuilderFunc is a placement.Builder backed by a function
type placementBuilderFunc func(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)

View file

@ -12,13 +12,13 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
tracing_grpc "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/panjf2000/ants/v2"
@ -441,18 +441,6 @@ func (s *Service) syncContainers(ctx context.Context, cnrs []cid.ID) {
wg.Wait()
}
func (s *Service) containerEverExisted(cid cid.ID) (bool, error) {
_, err := s.cnrSource.DeletionInfo(cid)
if err == nil {
return true, nil
}
var errContainerNotFound *apistatus.ContainerNotFound
if errors.As(err, &errContainerNotFound) {
return false, nil
}
return false, err
}
func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) {
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.removeContainers")
defer span.End()
@ -466,7 +454,7 @@ func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID
continue
}
existed, err := s.containerEverExisted(cnr)
existed, err := containerCore.WasRemoved(s.cnrSource, cnr)
if err != nil {
s.log.Error(logs.TreeCouldNotCheckIfContainerExisted,
zap.Stringer("cid", cnr),