forked from TrueCloudLab/frostfs-node
Compare commits
No commits in common. "317e501d1c5eecbec38217f87aee61b60bd8a1d7" and "9072772a09c93f2bcdb9ad1441344512980a4a39" have entirely different histories.
317e501d1c
...
9072772a09
23 changed files with 231 additions and 375 deletions
|
@ -142,8 +142,7 @@ func (c *lruNetCache) get(key uint64) (*netmapSDK.NetMap, error) {
|
||||||
// wrapper over TTL cache of values read from the network
|
// wrapper over TTL cache of values read from the network
|
||||||
// that implements container storage.
|
// that implements container storage.
|
||||||
type ttlContainerStorage struct {
|
type ttlContainerStorage struct {
|
||||||
containerCache *ttlNetCache[cid.ID, *container.Container]
|
*ttlNetCache[cid.ID, *container.Container]
|
||||||
delInfoCache *ttlNetCache[cid.ID, *container.DelInfo]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCachedContainerStorage(v container.Source, ttl time.Duration) ttlContainerStorage {
|
func newCachedContainerStorage(v container.Source, ttl time.Duration) ttlContainerStorage {
|
||||||
|
@ -152,31 +151,18 @@ func newCachedContainerStorage(v container.Source, ttl time.Duration) ttlContain
|
||||||
lruCnrCache := newNetworkTTLCache(containerCacheSize, ttl, func(id cid.ID) (*container.Container, error) {
|
lruCnrCache := newNetworkTTLCache(containerCacheSize, ttl, func(id cid.ID) (*container.Container, error) {
|
||||||
return v.Get(id)
|
return v.Get(id)
|
||||||
})
|
})
|
||||||
lruDelInfoCache := newNetworkTTLCache(containerCacheSize, ttl, func(id cid.ID) (*container.DelInfo, error) {
|
|
||||||
return v.DeletionInfo(id)
|
|
||||||
})
|
|
||||||
|
|
||||||
return ttlContainerStorage{
|
return ttlContainerStorage{lruCnrCache}
|
||||||
containerCache: lruCnrCache,
|
|
||||||
delInfoCache: lruDelInfoCache,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s ttlContainerStorage) handleRemoval(cnr cid.ID) {
|
func (s ttlContainerStorage) handleRemoval(cnr cid.ID) {
|
||||||
s.containerCache.set(cnr, nil, new(apistatus.ContainerNotFound))
|
s.set(cnr, nil, new(apistatus.ContainerNotFound))
|
||||||
|
|
||||||
// The removal invalidates possibly stored error response.
|
|
||||||
s.delInfoCache.remove(cnr)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns container value from the cache. If value is missing in the cache
|
// Get returns container value from the cache. If value is missing in the cache
|
||||||
// or expired, then it returns value from side chain and updates the cache.
|
// or expired, then it returns value from side chain and updates the cache.
|
||||||
func (s ttlContainerStorage) Get(cnr cid.ID) (*container.Container, error) {
|
func (s ttlContainerStorage) Get(cnr cid.ID) (*container.Container, error) {
|
||||||
return s.containerCache.get(cnr)
|
return s.get(cnr)
|
||||||
}
|
|
||||||
|
|
||||||
func (s ttlContainerStorage) DeletionInfo(cnr cid.ID) (*container.DelInfo, error) {
|
|
||||||
return s.delInfoCache.get(cnr)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type ttlEACLStorage struct {
|
type ttlEACLStorage struct {
|
||||||
|
|
|
@ -113,7 +113,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
||||||
c.cfgObject.eaclSource = eACLFetcher
|
c.cfgObject.eaclSource = eACLFetcher
|
||||||
cnrRdr.eacl = eACLFetcher
|
cnrRdr.eacl = eACLFetcher
|
||||||
c.cfgObject.cnrSource = cnrSrc
|
c.cfgObject.cnrSource = cnrSrc
|
||||||
cnrRdr.src = cnrSrc
|
cnrRdr.get = cnrSrc
|
||||||
cnrRdr.lister = client
|
cnrRdr.lister = client
|
||||||
} else {
|
} else {
|
||||||
// use RPC node as source of Container contract items (with caching)
|
// use RPC node as source of Container contract items (with caching)
|
||||||
|
@ -131,7 +131,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
||||||
cnr, err := cnrSrc.Get(ev.ID)
|
cnr, err := cnrSrc.Get(ev.ID)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
|
cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
|
||||||
cachedContainerStorage.containerCache.set(ev.ID, cnr, nil)
|
cachedContainerStorage.set(ev.ID, cnr, nil)
|
||||||
} else {
|
} else {
|
||||||
// unlike removal, we expect successful receive of the container
|
// unlike removal, we expect successful receive of the container
|
||||||
// after successful creation, so logging can be useful
|
// after successful creation, so logging can be useful
|
||||||
|
@ -159,6 +159,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
||||||
}
|
}
|
||||||
|
|
||||||
cachedContainerStorage.handleRemoval(ev.ID)
|
cachedContainerStorage.handleRemoval(ev.ID)
|
||||||
|
|
||||||
c.log.Debug(logs.FrostFSNodeContainerRemovalEventsReceipt,
|
c.log.Debug(logs.FrostFSNodeContainerRemovalEventsReceipt,
|
||||||
zap.Stringer("id", ev.ID),
|
zap.Stringer("id", ev.ID),
|
||||||
)
|
)
|
||||||
|
@ -169,7 +170,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
||||||
|
|
||||||
cnrRdr.lister = cachedContainerLister
|
cnrRdr.lister = cachedContainerLister
|
||||||
cnrRdr.eacl = c.cfgObject.eaclSource
|
cnrRdr.eacl = c.cfgObject.eaclSource
|
||||||
cnrRdr.src = c.cfgObject.cnrSource
|
cnrRdr.get = c.cfgObject.cnrSource
|
||||||
|
|
||||||
cnrWrt.cacheEnabled = true
|
cnrWrt.cacheEnabled = true
|
||||||
cnrWrt.eacls = cachedEACLStorage
|
cnrWrt.eacls = cachedEACLStorage
|
||||||
|
@ -640,7 +641,7 @@ func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.Si
|
||||||
type morphContainerReader struct {
|
type morphContainerReader struct {
|
||||||
eacl containerCore.EACLSource
|
eacl containerCore.EACLSource
|
||||||
|
|
||||||
src containerCore.Source
|
get containerCore.Source
|
||||||
|
|
||||||
lister interface {
|
lister interface {
|
||||||
List(*user.ID) ([]cid.ID, error)
|
List(*user.ID) ([]cid.ID, error)
|
||||||
|
@ -648,11 +649,7 @@ type morphContainerReader struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *morphContainerReader) Get(id cid.ID) (*containerCore.Container, error) {
|
func (x *morphContainerReader) Get(id cid.ID) (*containerCore.Container, error) {
|
||||||
return x.src.Get(id)
|
return x.get.Get(id)
|
||||||
}
|
|
||||||
|
|
||||||
func (x *morphContainerReader) DeletionInfo(id cid.ID) (*containerCore.DelInfo, error) {
|
|
||||||
return x.src.DeletionInfo(id)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *morphContainerReader) GetEACL(id cid.ID) (*containerCore.EACL, error) {
|
func (x *morphContainerReader) GetEACL(id cid.ID) (*containerCore.EACL, error) {
|
||||||
|
|
|
@ -32,7 +32,7 @@ func (c cnrSource) Get(id cid.ID) (*container.Container, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c cnrSource) DeletionInfo(cid cid.ID) (*container.DelInfo, error) {
|
func (c cnrSource) DeletionInfo(cid cid.ID) (*container.DelInfo, error) {
|
||||||
return c.src.DeletionInfo(cid)
|
return c.cli.DeletionInfo(cid)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c cnrSource) List() ([]cid.ID, error) {
|
func (c cnrSource) List() ([]cid.ID, error) {
|
||||||
|
|
|
@ -42,7 +42,6 @@ const (
|
||||||
NotificatorNotificatorStartProcessingObjectNotifications = "notificator: start processing object notifications"
|
NotificatorNotificatorStartProcessingObjectNotifications = "notificator: start processing object notifications"
|
||||||
NotificatorNotificatorProcessingObjectNotification = "notificator: processing object notification"
|
NotificatorNotificatorProcessingObjectNotification = "notificator: processing object notification"
|
||||||
PolicerCouldNotGetContainer = "could not get container"
|
PolicerCouldNotGetContainer = "could not get container"
|
||||||
PolicerCouldNotConfirmContainerRemoval = "could not confirm container removal"
|
|
||||||
PolicerCouldNotInhumeObjectWithMissingContainer = "could not inhume object with missing container"
|
PolicerCouldNotInhumeObjectWithMissingContainer = "could not inhume object with missing container"
|
||||||
PolicerCouldNotBuildPlacementVectorForObject = "could not build placement vector for object"
|
PolicerCouldNotBuildPlacementVectorForObject = "could not build placement vector for object"
|
||||||
PolicerRedundantLocalObjectCopyDetected = "redundant local object copy detected"
|
PolicerRedundantLocalObjectCopyDetected = "redundant local object copy detected"
|
||||||
|
|
|
@ -41,8 +41,6 @@ type Source interface {
|
||||||
// Implementations must not retain the container pointer and modify
|
// Implementations must not retain the container pointer and modify
|
||||||
// the container through it.
|
// the container through it.
|
||||||
Get(cid.ID) (*Container, error)
|
Get(cid.ID) (*Container, error)
|
||||||
|
|
||||||
DeletionInfo(cid.ID) (*DelInfo, error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// EACL groups information about the FrostFS container's extended ACL stored in
|
// EACL groups information about the FrostFS container's extended ACL stored in
|
||||||
|
|
|
@ -1,22 +0,0 @@
|
||||||
package container
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
|
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
||||||
)
|
|
||||||
|
|
||||||
// WasRemoved checks whether the container ever existed or
|
|
||||||
// it just has not been created yet at the current epoch.
|
|
||||||
func WasRemoved(s Source, cid cid.ID) (bool, error) {
|
|
||||||
_, err := s.DeletionInfo(cid)
|
|
||||||
if err == nil {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
var errContainerNotFound *apistatus.ContainerNotFound
|
|
||||||
if errors.As(err, &errContainerNotFound) {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
return false, err
|
|
||||||
}
|
|
|
@ -2,7 +2,6 @@ package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
@ -139,10 +138,10 @@ func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta
|
||||||
|
|
||||||
if s.hasWriteCache() {
|
if s.hasWriteCache() {
|
||||||
res, err := wc(s.writeCache)
|
res, err := wc(s.writeCache)
|
||||||
if err == nil || IsErrOutOfRange(err) || client.IsErrObjectNotFound(err) {
|
if err == nil || IsErrOutOfRange(err) {
|
||||||
return res, false, err
|
return res, false, err
|
||||||
}
|
}
|
||||||
if errors.Is(err, writecache.ErrMiss) {
|
if client.IsErrObjectNotFound(err) {
|
||||||
s.log.Debug(logs.ShardObjectIsMissingInWritecache,
|
s.log.Debug(logs.ShardObjectIsMissingInWritecache,
|
||||||
zap.Stringer("addr", addr),
|
zap.Stringer("addr", addr),
|
||||||
zap.Bool("skip_meta", skipMeta))
|
zap.Bool("skip_meta", skipMeta))
|
||||||
|
|
|
@ -63,11 +63,4 @@ var (
|
||||||
ErrBigObject = errors.New("too big object")
|
ErrBigObject = errors.New("too big object")
|
||||||
// ErrOutOfSpace is returned when there is no space left to put a new object.
|
// ErrOutOfSpace is returned when there is no space left to put a new object.
|
||||||
ErrOutOfSpace = errors.New("no space left in the write cache")
|
ErrOutOfSpace = errors.New("no space left in the write cache")
|
||||||
// ErrMiss is returned when there is no information about the object in the cache.
|
|
||||||
// Note that the distinction between this error and ObjectNotFound is important. When
|
|
||||||
// the cache returns ObjectNotFound, it can be safely assumed that the object is not
|
|
||||||
// found in the cache or any underlying storage. When the cache returns ErrMiss, nothing
|
|
||||||
// is known about the object and thus the underlying storage must be consulted as well.
|
|
||||||
// This allows the cache to serve as a consistent view of the underlying storage.
|
|
||||||
ErrMiss = errors.New("write-cache miss")
|
|
||||||
)
|
)
|
||||||
|
|
|
@ -6,8 +6,10 @@ import (
|
||||||
|
|
||||||
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"github.com/dgraph-io/badger/v4"
|
"github.com/dgraph-io/badger/v4"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
@ -15,6 +17,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Delete removes object from write-cache.
|
// Delete removes object from write-cache.
|
||||||
|
//
|
||||||
|
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
|
||||||
func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
_, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
|
_, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
|
@ -35,22 +39,22 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
return writecache.ErrReadOnly
|
return writecache.ErrReadOnly
|
||||||
}
|
}
|
||||||
|
|
||||||
key := addr2key(addr)
|
saddr := addr.EncodeToString()
|
||||||
|
|
||||||
err := c.db.Update(func(tx *badger.Txn) error {
|
err := c.db.Update(func(tx *badger.Txn) error {
|
||||||
it, err := tx.Get(key[:])
|
it, err := tx.Get([]byte(saddr))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == badger.ErrKeyNotFound {
|
if err == badger.ErrKeyNotFound {
|
||||||
return writecache.ErrMiss
|
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if it.ValueSize() > 0 {
|
if it.ValueSize() > 0 {
|
||||||
storageType = writecache.StorageTypeDB
|
storageType = writecache.StorageTypeDB
|
||||||
err := tx.Delete(key[:])
|
err := tx.Delete([]byte(saddr))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
storagelog.Write(c.log,
|
storagelog.Write(c.log,
|
||||||
storagelog.AddressField(addr.EncodeToString()),
|
storagelog.AddressField(saddr),
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
storagelog.OpField("db DELETE"),
|
storagelog.OpField("db DELETE"),
|
||||||
)
|
)
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||||
|
@ -17,7 +16,7 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestGeneric(t *testing.T) {
|
func TestFlush(t *testing.T) {
|
||||||
createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs writecache.MainStorage, opts ...Option) writecache.Cache {
|
createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs writecache.MainStorage, opts ...Option) writecache.Cache {
|
||||||
return New(
|
return New(
|
||||||
append([]Option{
|
append([]Option{
|
||||||
|
@ -60,22 +59,5 @@ func TestGeneric(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("api", func(t *testing.T) {
|
writecachetest.TestFlush(t, createCacheFn, errCountOpt, failures...)
|
||||||
writecachetest.TestAPI(t, createCacheFn)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("flush", func(t *testing.T) {
|
|
||||||
writecachetest.TestFlush(t, createCacheFn, errCountOpt, failures...)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("storage", func(t *testing.T) {
|
|
||||||
storagetest.TestAll(t, func(t *testing.T) storagetest.Component {
|
|
||||||
return New(
|
|
||||||
WithLogger(test.NewLogger(t, true)),
|
|
||||||
WithFlushWorkersCount(2),
|
|
||||||
WithPath(t.TempDir()),
|
|
||||||
WithGCInterval(1*time.Second))
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
}
|
}
|
|
@ -0,0 +1,19 @@
|
||||||
|
package writecachebadger
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGeneric(t *testing.T) {
|
||||||
|
storagetest.TestAll(t, func(t *testing.T) storagetest.Component {
|
||||||
|
return New(
|
||||||
|
WithLogger(test.NewLogger(t, true)),
|
||||||
|
WithFlushWorkersCount(2),
|
||||||
|
WithPath(t.TempDir()),
|
||||||
|
WithGCInterval(1*time.Second))
|
||||||
|
})
|
||||||
|
}
|
|
@ -5,8 +5,10 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"github.com/dgraph-io/badger/v4"
|
"github.com/dgraph-io/badger/v4"
|
||||||
|
@ -14,6 +16,9 @@ import (
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Get returns object from write-cache.
|
||||||
|
//
|
||||||
|
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
|
||||||
func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||||
_, span := tracing.StartSpanFromContext(ctx, "writecache.Get",
|
_, span := tracing.StartSpanFromContext(ctx, "writecache.Get",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
|
@ -42,10 +47,12 @@ func (c *cache) getInternal(addr oid.Address) (*objectSDK.Object, error) {
|
||||||
return obj, obj.Unmarshal(value)
|
return obj, obj.Unmarshal(value)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, err
|
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Head returns object header from write-cache.
|
// Head returns object header from write-cache.
|
||||||
|
//
|
||||||
|
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
|
||||||
func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||||
_, span := tracing.StartSpanFromContext(ctx, "writecache.Head",
|
_, span := tracing.StartSpanFromContext(ctx, "writecache.Head",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
|
@ -63,6 +70,8 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object,
|
||||||
|
|
||||||
// Get fetches object from the underlying database.
|
// Get fetches object from the underlying database.
|
||||||
// Key should be a stringified address.
|
// Key should be a stringified address.
|
||||||
|
//
|
||||||
|
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in db.
|
||||||
func Get(db *badger.DB, key []byte) ([]byte, error) {
|
func Get(db *badger.DB, key []byte) ([]byte, error) {
|
||||||
var value []byte
|
var value []byte
|
||||||
|
|
||||||
|
@ -70,7 +79,7 @@ func Get(db *badger.DB, key []byte) ([]byte, error) {
|
||||||
it, err := tx.Get(key)
|
it, err := tx.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == badger.ErrKeyNotFound {
|
if err == badger.ErrKeyNotFound {
|
||||||
return writecache.ErrMiss
|
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -82,5 +91,5 @@ func Get(db *badger.DB, key []byte) ([]byte, error) {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
return value, err
|
return value, metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
@ -18,6 +17,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Delete removes object from write-cache.
|
// Delete removes object from write-cache.
|
||||||
|
//
|
||||||
|
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
|
||||||
func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
|
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
|
@ -60,7 +61,6 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
deleted = true
|
|
||||||
storagelog.Write(c.log,
|
storagelog.Write(c.log,
|
||||||
storagelog.AddressField(saddr),
|
storagelog.AddressField(saddr),
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
|
@ -69,9 +69,9 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
if recordDeleted {
|
if recordDeleted {
|
||||||
c.objCounters.cDB.Add(math.MaxUint64)
|
c.objCounters.cDB.Add(math.MaxUint64)
|
||||||
c.estimateCacheSize()
|
c.estimateCacheSize()
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
return metaerr.Wrap(writecache.ErrMiss)
|
deleted = true
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
storageType = writecache.StorageTypeFSTree
|
storageType = writecache.StorageTypeFSTree
|
||||||
|
@ -85,8 +85,5 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
deleted = true
|
deleted = true
|
||||||
c.estimateCacheSize()
|
c.estimateCacheSize()
|
||||||
}
|
}
|
||||||
if client.IsErrObjectNotFound(err) {
|
|
||||||
err = writecache.ErrMiss
|
|
||||||
}
|
|
||||||
return metaerr.Wrap(err)
|
return metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,6 @@ import (
|
||||||
|
|
||||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||||
|
@ -101,20 +100,5 @@ func TestFlush(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("api", func(t *testing.T) {
|
writecachetest.TestFlush(t, createCacheFn, errCountOpt, failures...)
|
||||||
writecachetest.TestAPI(t, createCacheFn)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("flush", func(t *testing.T) {
|
|
||||||
writecachetest.TestFlush(t, createCacheFn, errCountOpt, failures...)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("storage", func(t *testing.T) {
|
|
||||||
storagetest.TestAll(t, func(t *testing.T) storagetest.Component {
|
|
||||||
return New(
|
|
||||||
WithLogger(test.NewLogger(t, true)),
|
|
||||||
WithFlushWorkersCount(2),
|
|
||||||
WithPath(t.TempDir()))
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
}
|
|
@ -0,0 +1,17 @@
|
||||||
|
package writecachebbolt
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGeneric(t *testing.T) {
|
||||||
|
storagetest.TestAll(t, func(t *testing.T) storagetest.Component {
|
||||||
|
return New(
|
||||||
|
WithLogger(test.NewLogger(t, true)),
|
||||||
|
WithFlushWorkersCount(2),
|
||||||
|
WithPath(t.TempDir()))
|
||||||
|
})
|
||||||
|
}
|
|
@ -6,8 +6,10 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
||||||
|
@ -50,7 +52,7 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address)
|
||||||
|
|
||||||
res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr})
|
res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, metaerr.Wrap(writecache.ErrMiss)
|
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
}
|
}
|
||||||
|
|
||||||
found = true
|
found = true
|
||||||
|
@ -59,6 +61,8 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Head returns object header from write-cache.
|
// Head returns object header from write-cache.
|
||||||
|
//
|
||||||
|
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
|
||||||
func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||||
saddr := addr.EncodeToString()
|
saddr := addr.EncodeToString()
|
||||||
|
|
||||||
|
@ -78,6 +82,8 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object,
|
||||||
|
|
||||||
// Get fetches object from the underlying database.
|
// Get fetches object from the underlying database.
|
||||||
// Key should be a stringified address.
|
// Key should be a stringified address.
|
||||||
|
//
|
||||||
|
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in db.
|
||||||
func Get(db *bbolt.DB, key []byte) ([]byte, error) {
|
func Get(db *bbolt.DB, key []byte) ([]byte, error) {
|
||||||
var value []byte
|
var value []byte
|
||||||
err := db.View(func(tx *bbolt.Tx) error {
|
err := db.View(func(tx *bbolt.Tx) error {
|
||||||
|
@ -87,7 +93,7 @@ func Get(db *bbolt.DB, key []byte) ([]byte, error) {
|
||||||
}
|
}
|
||||||
value = b.Get(key)
|
value = b.Get(key)
|
||||||
if value == nil {
|
if value == nil {
|
||||||
return metaerr.Wrap(writecache.ErrMiss)
|
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
}
|
}
|
||||||
value = slice.Copy(value)
|
value = slice.Copy(value)
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -1,60 +0,0 @@
|
||||||
package writecachetest
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestAPI[Option any](t *testing.T, createCacheFn CreateCacheFunc[Option]) {
|
|
||||||
ctx := context.Background()
|
|
||||||
wc, _, _ := newCache(t, createCacheFn, smallSize)
|
|
||||||
|
|
||||||
obj := testutil.GenerateObject()
|
|
||||||
addr := testutil.AddressFromObject(t, obj)
|
|
||||||
data, err := obj.Marshal()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Get nonexistent object
|
|
||||||
{
|
|
||||||
_, gotErr := wc.Get(ctx, oidtest.Address())
|
|
||||||
require.ErrorIs(t, gotErr, writecache.ErrMiss)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Put an object
|
|
||||||
{
|
|
||||||
_, err := wc.Put(ctx, common.PutPrm{
|
|
||||||
Address: addr,
|
|
||||||
Object: obj,
|
|
||||||
RawData: data,
|
|
||||||
})
|
|
||||||
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get the object previously put
|
|
||||||
{
|
|
||||||
gotObj, err := wc.Get(ctx, addr)
|
|
||||||
require.NoError(t, err)
|
|
||||||
gotData, err := gotObj.Marshal()
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, data, gotData)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete the object previously put
|
|
||||||
{
|
|
||||||
require.NoError(t, wc.Delete(ctx, addr))
|
|
||||||
require.ErrorIs(t, wc.Delete(ctx, addr), writecache.ErrMiss)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get the object previously deleted
|
|
||||||
{
|
|
||||||
_, gotErr := wc.Get(ctx, addr)
|
|
||||||
require.Error(t, gotErr, writecache.ErrMiss)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -2,10 +2,20 @@ package writecachetest
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"path/filepath"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -14,6 +24,24 @@ const (
|
||||||
smallSize = 256
|
smallSize = 256
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type CreateCacheFunc[Option any] func(
|
||||||
|
t *testing.T,
|
||||||
|
smallSize uint64,
|
||||||
|
meta *meta.DB,
|
||||||
|
bs writecache.MainStorage,
|
||||||
|
opts ...Option,
|
||||||
|
) writecache.Cache
|
||||||
|
|
||||||
|
type TestFailureInjector[Option any] struct {
|
||||||
|
Desc string
|
||||||
|
InjectFn func(*testing.T, writecache.Cache)
|
||||||
|
}
|
||||||
|
|
||||||
|
type objectPair struct {
|
||||||
|
addr oid.Address
|
||||||
|
obj *objectSDK.Object
|
||||||
|
}
|
||||||
|
|
||||||
func TestFlush[Option any](
|
func TestFlush[Option any](
|
||||||
t *testing.T,
|
t *testing.T,
|
||||||
createCacheFn CreateCacheFunc[Option],
|
createCacheFn CreateCacheFunc[Option],
|
||||||
|
@ -22,11 +50,6 @@ func TestFlush[Option any](
|
||||||
) {
|
) {
|
||||||
t.Run("no errors", func(t *testing.T) {
|
t.Run("no errors", func(t *testing.T) {
|
||||||
wc, bs, mb := newCache(t, createCacheFn, smallSize)
|
wc, bs, mb := newCache(t, createCacheFn, smallSize)
|
||||||
|
|
||||||
// Set mode for metabase and blobstor to prevent background flushes.
|
|
||||||
require.NoError(t, mb.SetMode(mode.ReadOnly))
|
|
||||||
require.NoError(t, bs.SetMode(mode.ReadOnly))
|
|
||||||
|
|
||||||
objects := putObjects(t, wc)
|
objects := putObjects(t, wc)
|
||||||
|
|
||||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||||
|
@ -39,11 +62,6 @@ func TestFlush[Option any](
|
||||||
|
|
||||||
t.Run("flush on moving to degraded mode", func(t *testing.T) {
|
t.Run("flush on moving to degraded mode", func(t *testing.T) {
|
||||||
wc, bs, mb := newCache(t, createCacheFn, smallSize)
|
wc, bs, mb := newCache(t, createCacheFn, smallSize)
|
||||||
|
|
||||||
// Set mode for metabase and blobstor to prevent background flushes.
|
|
||||||
require.NoError(t, mb.SetMode(mode.ReadOnly))
|
|
||||||
require.NoError(t, bs.SetMode(mode.ReadOnly))
|
|
||||||
|
|
||||||
objects := putObjects(t, wc)
|
objects := putObjects(t, wc)
|
||||||
|
|
||||||
// Blobstor is read-only, so we expect en error from `flush` here.
|
// Blobstor is read-only, so we expect en error from `flush` here.
|
||||||
|
@ -64,11 +82,6 @@ func TestFlush[Option any](
|
||||||
t.Run(f.Desc, func(t *testing.T) {
|
t.Run(f.Desc, func(t *testing.T) {
|
||||||
errCountOpt, errCount := errCountOption()
|
errCountOpt, errCount := errCountOption()
|
||||||
wc, bs, mb := newCache(t, createCacheFn, smallSize, errCountOpt)
|
wc, bs, mb := newCache(t, createCacheFn, smallSize, errCountOpt)
|
||||||
|
|
||||||
// Set mode for metabase and blobstor to prevent background flushes.
|
|
||||||
require.NoError(t, mb.SetMode(mode.ReadOnly))
|
|
||||||
require.NoError(t, bs.SetMode(mode.ReadOnly))
|
|
||||||
|
|
||||||
objects := putObjects(t, wc)
|
objects := putObjects(t, wc)
|
||||||
f.InjectFn(t, wc)
|
f.InjectFn(t, wc)
|
||||||
|
|
||||||
|
@ -86,3 +99,87 @@ func TestFlush[Option any](
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newCache[Option any](
|
||||||
|
t *testing.T,
|
||||||
|
createCacheFn CreateCacheFunc[Option],
|
||||||
|
smallSize uint64,
|
||||||
|
opts ...Option,
|
||||||
|
) (writecache.Cache, *blobstor.BlobStor, *meta.DB) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
mb := meta.New(
|
||||||
|
meta.WithPath(filepath.Join(dir, "meta")),
|
||||||
|
meta.WithEpochState(dummyEpoch{}))
|
||||||
|
require.NoError(t, mb.Open(false))
|
||||||
|
require.NoError(t, mb.Init())
|
||||||
|
|
||||||
|
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
|
||||||
|
{
|
||||||
|
Storage: fstree.New(
|
||||||
|
fstree.WithPath(filepath.Join(dir, "blob")),
|
||||||
|
fstree.WithDepth(0),
|
||||||
|
fstree.WithDirNameLen(1)),
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
require.NoError(t, bs.Open(false))
|
||||||
|
require.NoError(t, bs.Init())
|
||||||
|
|
||||||
|
wc := createCacheFn(t, smallSize, mb, bs, opts...)
|
||||||
|
t.Cleanup(func() { require.NoError(t, wc.Close()) })
|
||||||
|
require.NoError(t, wc.Open(false))
|
||||||
|
require.NoError(t, wc.Init())
|
||||||
|
|
||||||
|
// First set mode for metabase and blobstor to prevent background flushes.
|
||||||
|
require.NoError(t, mb.SetMode(mode.ReadOnly))
|
||||||
|
require.NoError(t, bs.SetMode(mode.ReadOnly))
|
||||||
|
|
||||||
|
return wc, bs, mb
|
||||||
|
}
|
||||||
|
|
||||||
|
func putObject(t *testing.T, c writecache.Cache, size int) objectPair {
|
||||||
|
obj := testutil.GenerateObjectWithSize(size)
|
||||||
|
data, err := obj.Marshal()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var prm common.PutPrm
|
||||||
|
prm.Address = objectCore.AddressOf(obj)
|
||||||
|
prm.Object = obj
|
||||||
|
prm.RawData = data
|
||||||
|
|
||||||
|
_, err = c.Put(context.Background(), prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
return objectPair{prm.Address, prm.Object}
|
||||||
|
}
|
||||||
|
|
||||||
|
func putObjects(t *testing.T, c writecache.Cache) []objectPair {
|
||||||
|
objects := make([]objectPair, objCount)
|
||||||
|
for i := range objects {
|
||||||
|
objects[i] = putObject(t, c, 1+(i%2)*smallSize)
|
||||||
|
}
|
||||||
|
return objects
|
||||||
|
}
|
||||||
|
|
||||||
|
func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) {
|
||||||
|
for i := range objects {
|
||||||
|
var mPrm meta.StorageIDPrm
|
||||||
|
mPrm.SetAddress(objects[i].addr)
|
||||||
|
|
||||||
|
mRes, err := mb.StorageID(context.Background(), mPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var prm common.GetPrm
|
||||||
|
prm.Address = objects[i].addr
|
||||||
|
prm.StorageID = mRes.StorageID()
|
||||||
|
|
||||||
|
res, err := bs.Get(context.Background(), prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, objects[i].obj, res.Object)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type dummyEpoch struct{}
|
||||||
|
|
||||||
|
func (dummyEpoch) CurrentEpoch() uint64 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
|
@ -1,116 +0,0 @@
|
||||||
package writecachetest
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"path/filepath"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
|
||||||
|
|
||||||
type CreateCacheFunc[Option any] func(
|
|
||||||
t *testing.T,
|
|
||||||
smallSize uint64,
|
|
||||||
meta *meta.DB,
|
|
||||||
bs writecache.MainStorage,
|
|
||||||
opts ...Option,
|
|
||||||
) writecache.Cache
|
|
||||||
|
|
||||||
type TestFailureInjector[Option any] struct {
|
|
||||||
Desc string
|
|
||||||
InjectFn func(*testing.T, writecache.Cache)
|
|
||||||
}
|
|
||||||
|
|
||||||
type objectPair struct {
|
|
||||||
addr oid.Address
|
|
||||||
obj *objectSDK.Object
|
|
||||||
}
|
|
||||||
|
|
||||||
func newCache[Option any](
|
|
||||||
t *testing.T,
|
|
||||||
createCacheFn CreateCacheFunc[Option],
|
|
||||||
smallSize uint64,
|
|
||||||
opts ...Option,
|
|
||||||
) (writecache.Cache, *blobstor.BlobStor, *meta.DB) {
|
|
||||||
dir := t.TempDir()
|
|
||||||
mb := meta.New(
|
|
||||||
meta.WithPath(filepath.Join(dir, "meta")),
|
|
||||||
meta.WithEpochState(dummyEpoch{}))
|
|
||||||
require.NoError(t, mb.Open(false))
|
|
||||||
require.NoError(t, mb.Init())
|
|
||||||
|
|
||||||
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
|
|
||||||
{
|
|
||||||
Storage: fstree.New(
|
|
||||||
fstree.WithPath(filepath.Join(dir, "blob")),
|
|
||||||
fstree.WithDepth(0),
|
|
||||||
fstree.WithDirNameLen(1)),
|
|
||||||
},
|
|
||||||
}))
|
|
||||||
require.NoError(t, bs.Open(false))
|
|
||||||
require.NoError(t, bs.Init())
|
|
||||||
|
|
||||||
wc := createCacheFn(t, smallSize, mb, bs, opts...)
|
|
||||||
t.Cleanup(func() { require.NoError(t, wc.Close()) })
|
|
||||||
require.NoError(t, wc.Open(false))
|
|
||||||
require.NoError(t, wc.Init())
|
|
||||||
|
|
||||||
return wc, bs, mb
|
|
||||||
}
|
|
||||||
|
|
||||||
func putObject(t *testing.T, c writecache.Cache, size int) objectPair {
|
|
||||||
obj := testutil.GenerateObjectWithSize(size)
|
|
||||||
data, err := obj.Marshal()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
var prm common.PutPrm
|
|
||||||
prm.Address = objectCore.AddressOf(obj)
|
|
||||||
prm.Object = obj
|
|
||||||
prm.RawData = data
|
|
||||||
|
|
||||||
_, err = c.Put(context.Background(), prm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
return objectPair{prm.Address, prm.Object}
|
|
||||||
}
|
|
||||||
|
|
||||||
func putObjects(t *testing.T, c writecache.Cache) []objectPair {
|
|
||||||
objects := make([]objectPair, objCount)
|
|
||||||
for i := range objects {
|
|
||||||
objects[i] = putObject(t, c, 1+(i%2)*smallSize)
|
|
||||||
}
|
|
||||||
return objects
|
|
||||||
}
|
|
||||||
|
|
||||||
func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) {
|
|
||||||
for i := range objects {
|
|
||||||
var mPrm meta.StorageIDPrm
|
|
||||||
mPrm.SetAddress(objects[i].addr)
|
|
||||||
|
|
||||||
mRes, err := mb.StorageID(context.Background(), mPrm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
var prm common.GetPrm
|
|
||||||
prm.Address = objects[i].addr
|
|
||||||
prm.StorageID = mRes.StorageID()
|
|
||||||
|
|
||||||
res, err := bs.Get(context.Background(), prm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, objects[i].obj, res.Object)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type dummyEpoch struct{}
|
|
||||||
|
|
||||||
func (dummyEpoch) CurrentEpoch() uint64 {
|
|
||||||
return 0
|
|
||||||
}
|
|
|
@ -1,7 +1,6 @@
|
||||||
package container
|
package container
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/sha256"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
@ -12,22 +11,7 @@ import (
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (x *containerSource) DeletionInfo(cnr cid.ID) (*containercore.DelInfo, error) {
|
func (c *Client) DeletionInfo(cid cid.ID) (*containercore.DelInfo, error) {
|
||||||
return DeletionInfo((*Client)(x), cnr)
|
|
||||||
}
|
|
||||||
|
|
||||||
type deletionInfo interface {
|
|
||||||
DeletionInfo(cid []byte) (*containercore.DelInfo, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func DeletionInfo(c deletionInfo, cnr cid.ID) (*containercore.DelInfo, error) {
|
|
||||||
binCnr := make([]byte, sha256.Size)
|
|
||||||
cnr.Encode(binCnr)
|
|
||||||
|
|
||||||
return c.DeletionInfo(binCnr)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) DeletionInfo(cid []byte) (*containercore.DelInfo, error) {
|
|
||||||
prm := client.TestInvokePrm{}
|
prm := client.TestInvokePrm{}
|
||||||
prm.SetMethod(deletionInfoMethod)
|
prm.SetMethod(deletionInfoMethod)
|
||||||
prm.SetArgs(cid)
|
prm.SetArgs(cid)
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
|
||||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
@ -28,20 +27,12 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
if client.IsErrContainerNotFound(err) {
|
if client.IsErrContainerNotFound(err) {
|
||||||
existed, err := containercore.WasRemoved(p.cnrSrc, idCnr)
|
err := p.buryFn(ctx, addrWithType.Address)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.log.Error(logs.PolicerCouldNotConfirmContainerRemoval,
|
p.log.Error(logs.PolicerCouldNotInhumeObjectWithMissingContainer,
|
||||||
zap.Stringer("cid", idCnr),
|
zap.Stringer("cid", idCnr),
|
||||||
zap.Stringer("oid", idObj),
|
zap.Stringer("oid", idObj),
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
} else if existed {
|
|
||||||
err := p.buryFn(ctx, addrWithType.Address)
|
|
||||||
if err != nil {
|
|
||||||
p.log.Error(logs.PolicerCouldNotInhumeObjectWithMissingContainer,
|
|
||||||
zap.Stringer("cid", idCnr),
|
|
||||||
zap.Stringer("oid", idObj),
|
|
||||||
zap.String("error", err.Error()))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,13 +34,8 @@ func TestBuryObjectWithoutContainer(t *testing.T) {
|
||||||
|
|
||||||
// Container source and bury function
|
// Container source and bury function
|
||||||
buryCh := make(chan oid.Address)
|
buryCh := make(chan oid.Address)
|
||||||
containerSrc := containerSrc{
|
containerSrc := func(id cid.ID) (*container.Container, error) {
|
||||||
get: func(id cid.ID) (*container.Container, error) {
|
return nil, new(apistatus.ContainerNotFound)
|
||||||
return nil, new(apistatus.ContainerNotFound)
|
|
||||||
},
|
|
||||||
deletionInfo: func(id cid.ID) (*container.DelInfo, error) {
|
|
||||||
return &container.DelInfo{}, nil
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
buryFn := func(ctx context.Context, a oid.Address) error {
|
buryFn := func(ctx context.Context, a oid.Address) error {
|
||||||
buryCh <- a
|
buryCh <- a
|
||||||
|
@ -54,7 +49,7 @@ func TestBuryObjectWithoutContainer(t *testing.T) {
|
||||||
// Policer instance
|
// Policer instance
|
||||||
p := New(
|
p := New(
|
||||||
WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}),
|
WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}),
|
||||||
WithContainerSource(containerSrc),
|
WithContainerSource(containerSrcFunc(containerSrc)),
|
||||||
WithBuryFunc(buryFn),
|
WithBuryFunc(buryFn),
|
||||||
WithPool(pool),
|
WithPool(pool),
|
||||||
)
|
)
|
||||||
|
@ -199,17 +194,12 @@ func TestProcessObject(t *testing.T) {
|
||||||
cnr := &container.Container{}
|
cnr := &container.Container{}
|
||||||
cnr.Value.Init()
|
cnr.Value.Init()
|
||||||
cnr.Value.SetPlacementPolicy(policy)
|
cnr.Value.SetPlacementPolicy(policy)
|
||||||
containerSrc := containerSrc{
|
containerSrc := func(id cid.ID) (*container.Container, error) {
|
||||||
get: func(id cid.ID) (*container.Container, error) {
|
if id.Equals(addr.Container()) {
|
||||||
if id.Equals(addr.Container()) {
|
return cnr, nil
|
||||||
return cnr, nil
|
}
|
||||||
}
|
t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container())
|
||||||
t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container())
|
return nil, new(apistatus.ContainerNotFound)
|
||||||
return nil, new(apistatus.ContainerNotFound)
|
|
||||||
},
|
|
||||||
deletionInfo: func(id cid.ID) (*container.DelInfo, error) {
|
|
||||||
return &container.DelInfo{}, nil
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
buryFn := func(ctx context.Context, a oid.Address) error {
|
buryFn := func(ctx context.Context, a oid.Address) error {
|
||||||
t.Errorf("unexpected object buried: %v", a)
|
t.Errorf("unexpected object buried: %v", a)
|
||||||
|
@ -221,7 +211,7 @@ func TestProcessObject(t *testing.T) {
|
||||||
var gotReplicateTo []int
|
var gotReplicateTo []int
|
||||||
|
|
||||||
p := New(
|
p := New(
|
||||||
WithContainerSource(containerSrc),
|
WithContainerSource(containerSrcFunc(containerSrc)),
|
||||||
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||||
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||||
return bytes.Equal(k, nodes[0].PublicKey())
|
return bytes.Equal(k, nodes[0].PublicKey())
|
||||||
|
@ -261,6 +251,9 @@ func TestIteratorContract(t *testing.T) {
|
||||||
Type: objectSDK.TypeRegular,
|
Type: objectSDK.TypeRegular,
|
||||||
}}
|
}}
|
||||||
|
|
||||||
|
containerSrc := func(id cid.ID) (*container.Container, error) {
|
||||||
|
return nil, new(apistatus.ContainerNotFound)
|
||||||
|
}
|
||||||
buryFn := func(ctx context.Context, a oid.Address) error {
|
buryFn := func(ctx context.Context, a oid.Address) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -280,18 +273,9 @@ func TestIteratorContract(t *testing.T) {
|
||||||
finishCh: make(chan struct{}),
|
finishCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
containerSrc := containerSrc{
|
|
||||||
get: func(id cid.ID) (*container.Container, error) {
|
|
||||||
return nil, new(apistatus.ContainerNotFound)
|
|
||||||
},
|
|
||||||
deletionInfo: func(id cid.ID) (*container.DelInfo, error) {
|
|
||||||
return &container.DelInfo{}, nil
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
p := New(
|
p := New(
|
||||||
WithKeySpaceIterator(it),
|
WithKeySpaceIterator(it),
|
||||||
WithContainerSource(containerSrc),
|
WithContainerSource(containerSrcFunc(containerSrc)),
|
||||||
WithBuryFunc(buryFn),
|
WithBuryFunc(buryFn),
|
||||||
WithPool(pool),
|
WithPool(pool),
|
||||||
func(c *cfg) {
|
func(c *cfg) {
|
||||||
|
@ -369,14 +353,10 @@ func (it *sliceKeySpaceIterator) Rewind() {
|
||||||
it.cur = 0
|
it.cur = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
type containerSrc struct {
|
// containerSrcFunc is a container.Source backed by a function.
|
||||||
get func(id cid.ID) (*container.Container, error)
|
type containerSrcFunc func(cid.ID) (*container.Container, error)
|
||||||
deletionInfo func(id cid.ID) (*container.DelInfo, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f containerSrc) Get(id cid.ID) (*container.Container, error) { return f.get(id) }
|
func (f containerSrcFunc) Get(id cid.ID) (*container.Container, error) { return f(id) }
|
||||||
|
|
||||||
func (f containerSrc) DeletionInfo(id cid.ID) (*container.DelInfo, error) { return f.deletionInfo(id) }
|
|
||||||
|
|
||||||
// placementBuilderFunc is a placement.Builder backed by a function
|
// placementBuilderFunc is a placement.Builder backed by a function
|
||||||
type placementBuilderFunc func(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
|
type placementBuilderFunc func(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
|
||||||
|
|
|
@ -12,13 +12,13 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
tracing_grpc "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
tracing_grpc "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
|
@ -441,6 +441,18 @@ func (s *Service) syncContainers(ctx context.Context, cnrs []cid.ID) {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Service) containerEverExisted(cid cid.ID) (bool, error) {
|
||||||
|
_, err := s.cnrSource.DeletionInfo(cid)
|
||||||
|
if err == nil {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
var errContainerNotFound *apistatus.ContainerNotFound
|
||||||
|
if errors.As(err, &errContainerNotFound) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) {
|
func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.removeContainers")
|
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.removeContainers")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
@ -454,7 +466,7 @@ func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
existed, err := containerCore.WasRemoved(s.cnrSource, cnr)
|
existed, err := s.containerEverExisted(cnr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Error(logs.TreeCouldNotCheckIfContainerExisted,
|
s.log.Error(logs.TreeCouldNotCheckIfContainerExisted,
|
||||||
zap.Stringer("cid", cnr),
|
zap.Stringer("cid", cnr),
|
||||||
|
|
Loading…
Add table
Reference in a new issue