forked from TrueCloudLab/frostfs-node
[#xx] Add error type for writecache misses
Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
This commit is contained in:
parent
4ea0df77d0
commit
317e501d1c
13 changed files with 255 additions and 186 deletions
|
@ -2,6 +2,7 @@ package shard
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
|
@ -138,10 +139,10 @@ func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta
|
|||
|
||||
if s.hasWriteCache() {
|
||||
res, err := wc(s.writeCache)
|
||||
if err == nil || IsErrOutOfRange(err) {
|
||||
if err == nil || IsErrOutOfRange(err) || client.IsErrObjectNotFound(err) {
|
||||
return res, false, err
|
||||
}
|
||||
if client.IsErrObjectNotFound(err) {
|
||||
if errors.Is(err, writecache.ErrMiss) {
|
||||
s.log.Debug(logs.ShardObjectIsMissingInWritecache,
|
||||
zap.Stringer("addr", addr),
|
||||
zap.Bool("skip_meta", skipMeta))
|
||||
|
|
|
@ -63,4 +63,11 @@ var (
|
|||
ErrBigObject = errors.New("too big object")
|
||||
// ErrOutOfSpace is returned when there is no space left to put a new object.
|
||||
ErrOutOfSpace = errors.New("no space left in the write cache")
|
||||
// ErrMiss is returned when there is no information about the object in the cache.
|
||||
// Note that the distinction between this error and ObjectNotFound is important. When
|
||||
// the cache returns ObjectNotFound, it can be safely assumed that the object is not
|
||||
// found in the cache or any underlying storage. When the cache returns ErrMiss, nothing
|
||||
// is known about the object and thus the underlying storage must be consulted as well.
|
||||
// This allows the cache to serve as a consistent view of the underlying storage.
|
||||
ErrMiss = errors.New("write-cache miss")
|
||||
)
|
||||
|
|
|
@ -6,10 +6,8 @@ import (
|
|||
|
||||
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
|
@ -17,8 +15,6 @@ import (
|
|||
)
|
||||
|
||||
// Delete removes object from write-cache.
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
|
||||
func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
|
||||
trace.WithAttributes(
|
||||
|
@ -39,22 +35,22 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
|||
return writecache.ErrReadOnly
|
||||
}
|
||||
|
||||
saddr := addr.EncodeToString()
|
||||
key := addr2key(addr)
|
||||
|
||||
err := c.db.Update(func(tx *badger.Txn) error {
|
||||
it, err := tx.Get([]byte(saddr))
|
||||
it, err := tx.Get(key[:])
|
||||
if err != nil {
|
||||
if err == badger.ErrKeyNotFound {
|
||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
return writecache.ErrMiss
|
||||
}
|
||||
return err
|
||||
}
|
||||
if it.ValueSize() > 0 {
|
||||
storageType = writecache.StorageTypeDB
|
||||
err := tx.Delete([]byte(saddr))
|
||||
err := tx.Delete(key[:])
|
||||
if err == nil {
|
||||
storagelog.Write(c.log,
|
||||
storagelog.AddressField(saddr),
|
||||
storagelog.AddressField(addr.EncodeToString()),
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("db DELETE"),
|
||||
)
|
||||
|
|
|
@ -1,19 +0,0 @@
|
|||
package writecachebadger
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||
)
|
||||
|
||||
func TestGeneric(t *testing.T) {
|
||||
storagetest.TestAll(t, func(t *testing.T) storagetest.Component {
|
||||
return New(
|
||||
WithLogger(test.NewLogger(t, true)),
|
||||
WithFlushWorkersCount(2),
|
||||
WithPath(t.TempDir()),
|
||||
WithGCInterval(1*time.Second))
|
||||
})
|
||||
}
|
|
@ -5,10 +5,8 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
|
@ -16,9 +14,6 @@ import (
|
|||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// Get returns object from write-cache.
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
|
||||
func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "writecache.Get",
|
||||
trace.WithAttributes(
|
||||
|
@ -47,12 +42,10 @@ func (c *cache) getInternal(addr oid.Address) (*objectSDK.Object, error) {
|
|||
return obj, obj.Unmarshal(value)
|
||||
}
|
||||
|
||||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Head returns object header from write-cache.
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
|
||||
func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "writecache.Head",
|
||||
trace.WithAttributes(
|
||||
|
@ -70,8 +63,6 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object,
|
|||
|
||||
// Get fetches object from the underlying database.
|
||||
// Key should be a stringified address.
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in db.
|
||||
func Get(db *badger.DB, key []byte) ([]byte, error) {
|
||||
var value []byte
|
||||
|
||||
|
@ -79,7 +70,7 @@ func Get(db *badger.DB, key []byte) ([]byte, error) {
|
|||
it, err := tx.Get(key)
|
||||
if err != nil {
|
||||
if err == badger.ErrKeyNotFound {
|
||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
return writecache.ErrMiss
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -91,5 +82,5 @@ func Get(db *badger.DB, key []byte) ([]byte, error) {
|
|||
return nil
|
||||
})
|
||||
|
||||
return value, metaerr.Wrap(err)
|
||||
return value, err
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
|
@ -16,7 +17,7 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestFlush(t *testing.T) {
|
||||
func TestGeneric(t *testing.T) {
|
||||
createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs writecache.MainStorage, opts ...Option) writecache.Cache {
|
||||
return New(
|
||||
append([]Option{
|
||||
|
@ -59,5 +60,22 @@ func TestFlush(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
writecachetest.TestFlush(t, createCacheFn, errCountOpt, failures...)
|
||||
t.Run("api", func(t *testing.T) {
|
||||
writecachetest.TestAPI(t, createCacheFn)
|
||||
})
|
||||
|
||||
t.Run("flush", func(t *testing.T) {
|
||||
writecachetest.TestFlush(t, createCacheFn, errCountOpt, failures...)
|
||||
})
|
||||
|
||||
t.Run("storage", func(t *testing.T) {
|
||||
storagetest.TestAll(t, func(t *testing.T) storagetest.Component {
|
||||
return New(
|
||||
WithLogger(test.NewLogger(t, true)),
|
||||
WithFlushWorkersCount(2),
|
||||
WithPath(t.TempDir()),
|
||||
WithGCInterval(1*time.Second))
|
||||
})
|
||||
})
|
||||
|
||||
}
|
|
@ -10,6 +10,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
|
@ -17,8 +18,6 @@ import (
|
|||
)
|
||||
|
||||
// Delete removes object from write-cache.
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
|
||||
func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
|
||||
trace.WithAttributes(
|
||||
|
@ -61,6 +60,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
deleted = true
|
||||
storagelog.Write(c.log,
|
||||
storagelog.AddressField(saddr),
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
|
@ -69,9 +69,9 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
|||
if recordDeleted {
|
||||
c.objCounters.cDB.Add(math.MaxUint64)
|
||||
c.estimateCacheSize()
|
||||
return nil
|
||||
}
|
||||
deleted = true
|
||||
return nil
|
||||
return metaerr.Wrap(writecache.ErrMiss)
|
||||
}
|
||||
|
||||
storageType = writecache.StorageTypeFSTree
|
||||
|
@ -85,5 +85,8 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
|||
deleted = true
|
||||
c.estimateCacheSize()
|
||||
}
|
||||
if client.IsErrObjectNotFound(err) {
|
||||
err = writecache.ErrMiss
|
||||
}
|
||||
return metaerr.Wrap(err)
|
||||
}
|
||||
|
|
|
@ -1,17 +0,0 @@
|
|||
package writecachebbolt
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||
)
|
||||
|
||||
func TestGeneric(t *testing.T) {
|
||||
storagetest.TestAll(t, func(t *testing.T) storagetest.Component {
|
||||
return New(
|
||||
WithLogger(test.NewLogger(t, true)),
|
||||
WithFlushWorkersCount(2),
|
||||
WithPath(t.TempDir()))
|
||||
})
|
||||
}
|
|
@ -6,10 +6,8 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
||||
|
@ -52,7 +50,7 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address)
|
|||
|
||||
res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr})
|
||||
if err != nil {
|
||||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
return nil, metaerr.Wrap(writecache.ErrMiss)
|
||||
}
|
||||
|
||||
found = true
|
||||
|
@ -61,8 +59,6 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address)
|
|||
}
|
||||
|
||||
// Head returns object header from write-cache.
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
|
||||
func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||
saddr := addr.EncodeToString()
|
||||
|
||||
|
@ -82,8 +78,6 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object,
|
|||
|
||||
// Get fetches object from the underlying database.
|
||||
// Key should be a stringified address.
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in db.
|
||||
func Get(db *bbolt.DB, key []byte) ([]byte, error) {
|
||||
var value []byte
|
||||
err := db.View(func(tx *bbolt.Tx) error {
|
||||
|
@ -93,7 +87,7 @@ func Get(db *bbolt.DB, key []byte) ([]byte, error) {
|
|||
}
|
||||
value = b.Get(key)
|
||||
if value == nil {
|
||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
return metaerr.Wrap(writecache.ErrMiss)
|
||||
}
|
||||
value = slice.Copy(value)
|
||||
return nil
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
|
@ -100,5 +101,20 @@ func TestFlush(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
writecachetest.TestFlush(t, createCacheFn, errCountOpt, failures...)
|
||||
t.Run("api", func(t *testing.T) {
|
||||
writecachetest.TestAPI(t, createCacheFn)
|
||||
})
|
||||
|
||||
t.Run("flush", func(t *testing.T) {
|
||||
writecachetest.TestFlush(t, createCacheFn, errCountOpt, failures...)
|
||||
})
|
||||
|
||||
t.Run("storage", func(t *testing.T) {
|
||||
storagetest.TestAll(t, func(t *testing.T) storagetest.Component {
|
||||
return New(
|
||||
WithLogger(test.NewLogger(t, true)),
|
||||
WithFlushWorkersCount(2),
|
||||
WithPath(t.TempDir()))
|
||||
})
|
||||
})
|
||||
}
|
60
pkg/local_object_storage/writecache/writecachetest/api.go
Normal file
60
pkg/local_object_storage/writecache/writecachetest/api.go
Normal file
|
@ -0,0 +1,60 @@
|
|||
package writecachetest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestAPI[Option any](t *testing.T, createCacheFn CreateCacheFunc[Option]) {
|
||||
ctx := context.Background()
|
||||
wc, _, _ := newCache(t, createCacheFn, smallSize)
|
||||
|
||||
obj := testutil.GenerateObject()
|
||||
addr := testutil.AddressFromObject(t, obj)
|
||||
data, err := obj.Marshal()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Get nonexistent object
|
||||
{
|
||||
_, gotErr := wc.Get(ctx, oidtest.Address())
|
||||
require.ErrorIs(t, gotErr, writecache.ErrMiss)
|
||||
}
|
||||
|
||||
// Put an object
|
||||
{
|
||||
_, err := wc.Put(ctx, common.PutPrm{
|
||||
Address: addr,
|
||||
Object: obj,
|
||||
RawData: data,
|
||||
})
|
||||
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Get the object previously put
|
||||
{
|
||||
gotObj, err := wc.Get(ctx, addr)
|
||||
require.NoError(t, err)
|
||||
gotData, err := gotObj.Marshal()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, data, gotData)
|
||||
}
|
||||
|
||||
// Delete the object previously put
|
||||
{
|
||||
require.NoError(t, wc.Delete(ctx, addr))
|
||||
require.ErrorIs(t, wc.Delete(ctx, addr), writecache.ErrMiss)
|
||||
}
|
||||
|
||||
// Get the object previously deleted
|
||||
{
|
||||
_, gotErr := wc.Get(ctx, addr)
|
||||
require.Error(t, gotErr, writecache.ErrMiss)
|
||||
}
|
||||
}
|
|
@ -2,20 +2,10 @@ package writecachetest
|
|||
|
||||
import (
|
||||
"context"
|
||||
"path/filepath"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -24,24 +14,6 @@ const (
|
|||
smallSize = 256
|
||||
)
|
||||
|
||||
type CreateCacheFunc[Option any] func(
|
||||
t *testing.T,
|
||||
smallSize uint64,
|
||||
meta *meta.DB,
|
||||
bs writecache.MainStorage,
|
||||
opts ...Option,
|
||||
) writecache.Cache
|
||||
|
||||
type TestFailureInjector[Option any] struct {
|
||||
Desc string
|
||||
InjectFn func(*testing.T, writecache.Cache)
|
||||
}
|
||||
|
||||
type objectPair struct {
|
||||
addr oid.Address
|
||||
obj *objectSDK.Object
|
||||
}
|
||||
|
||||
func TestFlush[Option any](
|
||||
t *testing.T,
|
||||
createCacheFn CreateCacheFunc[Option],
|
||||
|
@ -50,6 +22,11 @@ func TestFlush[Option any](
|
|||
) {
|
||||
t.Run("no errors", func(t *testing.T) {
|
||||
wc, bs, mb := newCache(t, createCacheFn, smallSize)
|
||||
|
||||
// Set mode for metabase and blobstor to prevent background flushes.
|
||||
require.NoError(t, mb.SetMode(mode.ReadOnly))
|
||||
require.NoError(t, bs.SetMode(mode.ReadOnly))
|
||||
|
||||
objects := putObjects(t, wc)
|
||||
|
||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||
|
@ -62,6 +39,11 @@ func TestFlush[Option any](
|
|||
|
||||
t.Run("flush on moving to degraded mode", func(t *testing.T) {
|
||||
wc, bs, mb := newCache(t, createCacheFn, smallSize)
|
||||
|
||||
// Set mode for metabase and blobstor to prevent background flushes.
|
||||
require.NoError(t, mb.SetMode(mode.ReadOnly))
|
||||
require.NoError(t, bs.SetMode(mode.ReadOnly))
|
||||
|
||||
objects := putObjects(t, wc)
|
||||
|
||||
// Blobstor is read-only, so we expect en error from `flush` here.
|
||||
|
@ -82,6 +64,11 @@ func TestFlush[Option any](
|
|||
t.Run(f.Desc, func(t *testing.T) {
|
||||
errCountOpt, errCount := errCountOption()
|
||||
wc, bs, mb := newCache(t, createCacheFn, smallSize, errCountOpt)
|
||||
|
||||
// Set mode for metabase and blobstor to prevent background flushes.
|
||||
require.NoError(t, mb.SetMode(mode.ReadOnly))
|
||||
require.NoError(t, bs.SetMode(mode.ReadOnly))
|
||||
|
||||
objects := putObjects(t, wc)
|
||||
f.InjectFn(t, wc)
|
||||
|
||||
|
@ -99,87 +86,3 @@ func TestFlush[Option any](
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
func newCache[Option any](
|
||||
t *testing.T,
|
||||
createCacheFn CreateCacheFunc[Option],
|
||||
smallSize uint64,
|
||||
opts ...Option,
|
||||
) (writecache.Cache, *blobstor.BlobStor, *meta.DB) {
|
||||
dir := t.TempDir()
|
||||
mb := meta.New(
|
||||
meta.WithPath(filepath.Join(dir, "meta")),
|
||||
meta.WithEpochState(dummyEpoch{}))
|
||||
require.NoError(t, mb.Open(false))
|
||||
require.NoError(t, mb.Init())
|
||||
|
||||
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
|
||||
{
|
||||
Storage: fstree.New(
|
||||
fstree.WithPath(filepath.Join(dir, "blob")),
|
||||
fstree.WithDepth(0),
|
||||
fstree.WithDirNameLen(1)),
|
||||
},
|
||||
}))
|
||||
require.NoError(t, bs.Open(false))
|
||||
require.NoError(t, bs.Init())
|
||||
|
||||
wc := createCacheFn(t, smallSize, mb, bs, opts...)
|
||||
t.Cleanup(func() { require.NoError(t, wc.Close()) })
|
||||
require.NoError(t, wc.Open(false))
|
||||
require.NoError(t, wc.Init())
|
||||
|
||||
// First set mode for metabase and blobstor to prevent background flushes.
|
||||
require.NoError(t, mb.SetMode(mode.ReadOnly))
|
||||
require.NoError(t, bs.SetMode(mode.ReadOnly))
|
||||
|
||||
return wc, bs, mb
|
||||
}
|
||||
|
||||
func putObject(t *testing.T, c writecache.Cache, size int) objectPair {
|
||||
obj := testutil.GenerateObjectWithSize(size)
|
||||
data, err := obj.Marshal()
|
||||
require.NoError(t, err)
|
||||
|
||||
var prm common.PutPrm
|
||||
prm.Address = objectCore.AddressOf(obj)
|
||||
prm.Object = obj
|
||||
prm.RawData = data
|
||||
|
||||
_, err = c.Put(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
|
||||
return objectPair{prm.Address, prm.Object}
|
||||
}
|
||||
|
||||
func putObjects(t *testing.T, c writecache.Cache) []objectPair {
|
||||
objects := make([]objectPair, objCount)
|
||||
for i := range objects {
|
||||
objects[i] = putObject(t, c, 1+(i%2)*smallSize)
|
||||
}
|
||||
return objects
|
||||
}
|
||||
|
||||
func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) {
|
||||
for i := range objects {
|
||||
var mPrm meta.StorageIDPrm
|
||||
mPrm.SetAddress(objects[i].addr)
|
||||
|
||||
mRes, err := mb.StorageID(context.Background(), mPrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
var prm common.GetPrm
|
||||
prm.Address = objects[i].addr
|
||||
prm.StorageID = mRes.StorageID()
|
||||
|
||||
res, err := bs.Get(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, objects[i].obj, res.Object)
|
||||
}
|
||||
}
|
||||
|
||||
type dummyEpoch struct{}
|
||||
|
||||
func (dummyEpoch) CurrentEpoch() uint64 {
|
||||
return 0
|
||||
}
|
||||
|
|
|
@ -0,0 +1,116 @@
|
|||
package writecachetest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type CreateCacheFunc[Option any] func(
|
||||
t *testing.T,
|
||||
smallSize uint64,
|
||||
meta *meta.DB,
|
||||
bs writecache.MainStorage,
|
||||
opts ...Option,
|
||||
) writecache.Cache
|
||||
|
||||
type TestFailureInjector[Option any] struct {
|
||||
Desc string
|
||||
InjectFn func(*testing.T, writecache.Cache)
|
||||
}
|
||||
|
||||
type objectPair struct {
|
||||
addr oid.Address
|
||||
obj *objectSDK.Object
|
||||
}
|
||||
|
||||
func newCache[Option any](
|
||||
t *testing.T,
|
||||
createCacheFn CreateCacheFunc[Option],
|
||||
smallSize uint64,
|
||||
opts ...Option,
|
||||
) (writecache.Cache, *blobstor.BlobStor, *meta.DB) {
|
||||
dir := t.TempDir()
|
||||
mb := meta.New(
|
||||
meta.WithPath(filepath.Join(dir, "meta")),
|
||||
meta.WithEpochState(dummyEpoch{}))
|
||||
require.NoError(t, mb.Open(false))
|
||||
require.NoError(t, mb.Init())
|
||||
|
||||
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
|
||||
{
|
||||
Storage: fstree.New(
|
||||
fstree.WithPath(filepath.Join(dir, "blob")),
|
||||
fstree.WithDepth(0),
|
||||
fstree.WithDirNameLen(1)),
|
||||
},
|
||||
}))
|
||||
require.NoError(t, bs.Open(false))
|
||||
require.NoError(t, bs.Init())
|
||||
|
||||
wc := createCacheFn(t, smallSize, mb, bs, opts...)
|
||||
t.Cleanup(func() { require.NoError(t, wc.Close()) })
|
||||
require.NoError(t, wc.Open(false))
|
||||
require.NoError(t, wc.Init())
|
||||
|
||||
return wc, bs, mb
|
||||
}
|
||||
|
||||
func putObject(t *testing.T, c writecache.Cache, size int) objectPair {
|
||||
obj := testutil.GenerateObjectWithSize(size)
|
||||
data, err := obj.Marshal()
|
||||
require.NoError(t, err)
|
||||
|
||||
var prm common.PutPrm
|
||||
prm.Address = objectCore.AddressOf(obj)
|
||||
prm.Object = obj
|
||||
prm.RawData = data
|
||||
|
||||
_, err = c.Put(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
|
||||
return objectPair{prm.Address, prm.Object}
|
||||
}
|
||||
|
||||
func putObjects(t *testing.T, c writecache.Cache) []objectPair {
|
||||
objects := make([]objectPair, objCount)
|
||||
for i := range objects {
|
||||
objects[i] = putObject(t, c, 1+(i%2)*smallSize)
|
||||
}
|
||||
return objects
|
||||
}
|
||||
|
||||
func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) {
|
||||
for i := range objects {
|
||||
var mPrm meta.StorageIDPrm
|
||||
mPrm.SetAddress(objects[i].addr)
|
||||
|
||||
mRes, err := mb.StorageID(context.Background(), mPrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
var prm common.GetPrm
|
||||
prm.Address = objects[i].addr
|
||||
prm.StorageID = mRes.StorageID()
|
||||
|
||||
res, err := bs.Get(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, objects[i].obj, res.Object)
|
||||
}
|
||||
}
|
||||
|
||||
type dummyEpoch struct{}
|
||||
|
||||
func (dummyEpoch) CurrentEpoch() uint64 {
|
||||
return 0
|
||||
}
|
Loading…
Reference in a new issue