Compare commits

...

1 commit

Author SHA1 Message Date
6df9f44cfd [#139] test: Add test storage implementation
This aims to reduce the usage of chmod hackery to induce or simulate
OS-related failures.

Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
2023-03-29 17:24:53 +03:00
20 changed files with 617 additions and 208 deletions

View file

@ -1,6 +1,8 @@
package writecache package writecache
import ( import (
"os"
common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal" common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -24,7 +26,7 @@ func init() {
} }
func openWC(cmd *cobra.Command) *bbolt.DB { func openWC(cmd *cobra.Command) *bbolt.DB {
db, err := writecache.OpenDB(vPath, true) db, err := writecache.OpenDB(vPath, true, os.OpenFile)
common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
return db return db

View file

@ -50,7 +50,6 @@ func TestExistsInvalidStorageID(t *testing.T) {
}) })
t.Run("invalid storage id", func(t *testing.T) { t.Run("invalid storage id", func(t *testing.T) {
// "0/X/Y" <-> "1/X/Y"
storageID := slice.Copy(putRes.StorageID) storageID := slice.Copy(putRes.StorageID)
storageID[0] = '9' storageID[0] = '9'
badDir := filepath.Join(dir, "9") badDir := filepath.Join(dir, "9")

View file

@ -1,7 +1,6 @@
package blobstor package blobstor
import ( import (
"os"
"path/filepath" "path/filepath"
"testing" "testing"
@ -9,32 +8,37 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
const blobovniczaDir = "blobovniczas" func defaultTestStorages(p string, smallSizeLimit uint64) ([]SubStorage, *teststore.TestStore, *teststore.TestStore) {
smallFileStorage := teststore.New(teststore.WithSubstorage(blobovniczatree.NewBlobovniczaTree(
func defaultStorages(p string, smallSizeLimit uint64) []SubStorage { blobovniczatree.WithRootPath(filepath.Join(p, "blobovniczas")),
blobovniczatree.WithBlobovniczaShallowWidth(1)), // default width is 16, slow init
))
largeFileStorage := teststore.New(teststore.WithSubstorage(fstree.New(fstree.WithPath(p))))
return []SubStorage{ return []SubStorage{
{ {
Storage: blobovniczatree.NewBlobovniczaTree( Storage: smallFileStorage,
blobovniczatree.WithRootPath(filepath.Join(p, "blobovniczas")),
blobovniczatree.WithBlobovniczaShallowWidth(1)), // default width is 16, slow init
Policy: func(_ *objectSDK.Object, data []byte) bool { Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) <= smallSizeLimit return uint64(len(data)) <= smallSizeLimit
}, },
}, },
{ {
Storage: fstree.New(fstree.WithPath(p)), Storage: largeFileStorage,
}, },
} }, smallFileStorage, largeFileStorage
}
func defaultStorages(p string, smallSizeLimit uint64) []SubStorage {
storages, _, _ := defaultTestStorages(p, smallSizeLimit)
return storages
} }
func TestCompression(t *testing.T) { func TestCompression(t *testing.T) {
dir, err := os.MkdirTemp("", "frostfs*") dir := t.TempDir()
require.NoError(t, err)
t.Cleanup(func() { _ = os.RemoveAll(dir) })
const ( const (
smallSizeLimit = 512 smallSizeLimit = 512
@ -70,7 +74,7 @@ func TestCompression(t *testing.T) {
testPut := func(t *testing.T, b *BlobStor, i int) { testPut := func(t *testing.T, b *BlobStor, i int) {
var prm common.PutPrm var prm common.PutPrm
prm.Object = smallObj[i] prm.Object = smallObj[i]
_, err = b.Put(prm) _, err := b.Put(prm)
require.NoError(t, err) require.NoError(t, err)
prm = common.PutPrm{} prm = common.PutPrm{}
@ -102,9 +106,7 @@ func TestCompression(t *testing.T) {
func TestBlobstor_needsCompression(t *testing.T) { func TestBlobstor_needsCompression(t *testing.T) {
const smallSizeLimit = 512 const smallSizeLimit = 512
newBlobStor := func(t *testing.T, compress bool, ct ...string) *BlobStor { newBlobStor := func(t *testing.T, compress bool, ct ...string) *BlobStor {
dir, err := os.MkdirTemp("", "frostfs*") dir := t.TempDir()
require.NoError(t, err)
t.Cleanup(func() { _ = os.RemoveAll(dir) })
bs := New( bs := New(
WithCompressObjects(compress), WithCompressObjects(compress),

View file

@ -2,11 +2,11 @@ package blobstor
import ( import (
"os" "os"
"path/filepath"
"testing" "testing"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
@ -20,8 +20,10 @@ func TestExists(t *testing.T) {
const smallSizeLimit = 512 const smallSizeLimit = 512
b := New( storages, _, largeFileStorage := defaultTestStorages(dir, smallSizeLimit)
WithStorages(defaultStorages(dir, smallSizeLimit)))
b := New(WithStorages(storages))
require.NoError(t, b.Open(false)) require.NoError(t, b.Open(false))
require.NoError(t, b.Init()) require.NoError(t, b.Init())
@ -33,7 +35,7 @@ func TestExists(t *testing.T) {
for i := range objects { for i := range objects {
var prm common.PutPrm var prm common.PutPrm
prm.Object = objects[i] prm.Object = objects[i]
_, err = b.Put(prm) _, err := b.Put(prm)
require.NoError(t, err) require.NoError(t, err)
} }
@ -51,20 +53,10 @@ func TestExists(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.False(t, res.Exists) require.False(t, res.Exists)
t.Run("corrupt direcrory", func(t *testing.T) { t.Run("corrupt directory", func(t *testing.T) {
var bigDir string largeFileStorage.SetOption(teststore.WithExists(func(common.ExistsPrm) (common.ExistsRes, error) {
de, err := os.ReadDir(dir) return common.ExistsRes{}, teststore.ErrDiskExploded
require.NoError(t, err) }))
for i := range de {
if de[i].Name() != blobovniczaDir {
bigDir = filepath.Join(dir, de[i].Name())
break
}
}
require.NotEmpty(t, bigDir)
require.NoError(t, os.Chmod(dir, 0))
t.Cleanup(func() { require.NoError(t, os.Chmod(dir, 0777)) })
// Object exists, first error is logged. // Object exists, first error is logged.
prm.Address = objectCore.AddressOf(objects[0]) prm.Address = objectCore.AddressOf(objects[0])
@ -76,6 +68,7 @@ func TestExists(t *testing.T) {
prm.Address = objectCore.AddressOf(objects[1]) prm.Address = objectCore.AddressOf(objects[1])
_, err = b.Exists(prm) _, err = b.Exists(prm)
require.Error(t, err) require.Error(t, err)
require.ErrorIs(t, err, teststore.ErrDiskExploded)
}) })
} }

View file

@ -1,23 +1,16 @@
package blobstor package blobstor
import ( import (
"os"
"path/filepath"
"strconv"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest"
) )
func TestGeneric(t *testing.T) { func TestGeneric(t *testing.T) {
defer func() { _ = os.RemoveAll(t.Name()) }()
var n int
newMetabase := func(t *testing.T) storagetest.Component { newMetabase := func(t *testing.T) storagetest.Component {
n++
dir := filepath.Join(t.Name(), strconv.Itoa(n))
return New( return New(
WithStorages(defaultStorages(dir, 128))) WithStorages(defaultStorages(t.TempDir(), 128)))
} }
storagetest.TestAll(t, newMetabase) storagetest.TestAll(t, newMetabase)

View file

@ -0,0 +1,74 @@
package teststore
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
)
type cfg struct {
st common.Storage
overrides struct {
Open func(readOnly bool) error
Init func() error
Close func() error
Type func() string
Path func() string
SetCompressor func(cc *compression.Config)
SetReportErrorFunc func(f func(string, error))
Get func(common.GetPrm) (common.GetRes, error)
GetRange func(common.GetRangePrm) (common.GetRangeRes, error)
Exists func(common.ExistsPrm) (common.ExistsRes, error)
Put func(common.PutPrm) (common.PutRes, error)
Delete func(common.DeletePrm) (common.DeleteRes, error)
Iterate func(common.IteratePrm) (common.IterateRes, error)
}
}
type Option func(*cfg)
func WithSubstorage(st common.Storage) Option {
return func(c *cfg) {
c.st = st
}
}
func WithOpen(f func(bool) error) Option { return func(c *cfg) { c.overrides.Open = f } }
func WithInit(f func() error) Option { return func(c *cfg) { c.overrides.Init = f } }
func WithClose(f func() error) Option { return func(c *cfg) { c.overrides.Close = f } }
func WithType(f func() string) Option { return func(c *cfg) { c.overrides.Type = f } }
func WithPath(f func() string) Option { return func(c *cfg) { c.overrides.Path = f } }
func WithSetCompressor(f func(*compression.Config)) Option {
return func(c *cfg) { c.overrides.SetCompressor = f }
}
func WithReportErrorFunc(f func(func(string, error))) Option {
return func(c *cfg) { c.overrides.SetReportErrorFunc = f }
}
func WithGet(f func(common.GetPrm) (common.GetRes, error)) Option {
return func(c *cfg) { c.overrides.Get = f }
}
func WithGetRange(f func(common.GetRangePrm) (common.GetRangeRes, error)) Option {
return func(c *cfg) { c.overrides.GetRange = f }
}
func WithExists(f func(common.ExistsPrm) (common.ExistsRes, error)) Option {
return func(c *cfg) { c.overrides.Exists = f }
}
func WithPut(f func(common.PutPrm) (common.PutRes, error)) Option {
return func(c *cfg) { c.overrides.Put = f }
}
func WithDelete(f func(common.DeletePrm) (common.DeleteRes, error)) Option {
return func(c *cfg) { c.overrides.Delete = f }
}
func WithIterate(f func(common.IteratePrm) (common.IterateRes, error)) Option {
return func(c *cfg) { c.overrides.Iterate = f }
}

View file

@ -0,0 +1,215 @@
// Package teststore provides a common.Storage implementation for testing/mocking purposes.
//
// A new teststore.TestStore can be obtained with teststore.New. Whenever one of the common.Storage
// methods is called, the implementation selects what function to call in the following order:
// 1. If an override for that method was provided at construction time (via teststore.WithXXX()) or
// afterwards via SetOption, that override is used.
// 2. If a substorage was provided at construction time (via teststore.WithSubstorage()) or afterwars
// via SetOption, the corresponding method in the substorage is used.
// 3. If none of the above apply, the call panics with an error describing the unexpected call.
//
// It's safe to call SetOption and the overrides from multiple goroutines, but it's the override's
// responsibility to ensure safety of whatever operation it executes.
package teststore
import (
"errors"
"fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
)
// TestStore is a common.Storage implementation for testing/mocking purposes.
type TestStore struct {
mu sync.RWMutex
*cfg
}
// ErrDiskExploded is a phony error which can be used for testing purposes to differentiate it from
// more common errors.
var ErrDiskExploded = errors.New("disk exploded")
// New returns a teststore.TestStore from the given options.
func New(opts ...Option) *TestStore {
c := &cfg{}
for _, opt := range opts {
opt(c)
}
return &TestStore{cfg: c}
}
// SetOption overrides an option of an existing teststore.TestStore.
// This is useful for overriding methods during a test so that different
// behaviors are simulated.
func (s *TestStore) SetOption(opt Option) {
s.mu.Lock()
defer s.mu.Unlock()
opt(s.cfg)
}
func (s *TestStore) Open(readOnly bool) error {
s.mu.RLock()
defer s.mu.RUnlock()
switch {
case s.overrides.Open != nil:
return s.overrides.Open(readOnly)
case s.st != nil:
return s.st.Open(readOnly)
default:
panic(fmt.Sprintf("unexpected storage call: Open(%v)", readOnly))
}
}
func (s *TestStore) Init() error {
s.mu.RLock()
defer s.mu.RUnlock()
switch {
case s.overrides.Init != nil:
return s.overrides.Init()
case s.st != nil:
return s.st.Init()
default:
panic("unexpected storage call: Init()")
}
}
func (s *TestStore) Close() error {
s.mu.RLock()
defer s.mu.RUnlock()
switch {
case s.overrides.Close != nil:
return s.overrides.Close()
case s.st != nil:
return s.st.Close()
default:
panic("unexpected storage call: Close()")
}
}
func (s *TestStore) Type() string {
s.mu.RLock()
defer s.mu.RUnlock()
switch {
case s.overrides.Type != nil:
return s.overrides.Type()
case s.st != nil:
return s.st.Type()
default:
panic("unexpected storage call: Type()")
}
}
func (s *TestStore) Path() string {
s.mu.RLock()
defer s.mu.RUnlock()
switch {
case s.overrides.Path != nil:
return s.overrides.Path()
case s.st != nil:
return s.st.Path()
default:
panic("unexpected storage call: Path()")
}
}
func (s *TestStore) SetCompressor(cc *compression.Config) {
s.mu.RLock()
defer s.mu.RUnlock()
switch {
case s.overrides.SetCompressor != nil:
s.overrides.SetCompressor(cc)
case s.st != nil:
s.st.SetCompressor(cc)
default:
panic(fmt.Sprintf("unexpected storage call: SetCompressor(%+v)", cc))
}
}
func (s *TestStore) SetReportErrorFunc(f func(string, error)) {
s.mu.RLock()
defer s.mu.RUnlock()
switch {
case s.overrides.SetReportErrorFunc != nil:
s.overrides.SetReportErrorFunc(f)
case s.st != nil:
s.st.SetReportErrorFunc(f)
default:
panic("unexpected storage call: SetReportErrorFunc(<func>)")
}
}
func (s *TestStore) Get(req common.GetPrm) (common.GetRes, error) {
switch {
case s.overrides.Get != nil:
return s.overrides.Get(req)
case s.st != nil:
return s.st.Get(req)
default:
panic(fmt.Sprintf("unexpected storage call: Get(%+v)", req))
}
}
func (s *TestStore) GetRange(req common.GetRangePrm) (common.GetRangeRes, error) {
s.mu.RLock()
defer s.mu.RUnlock()
switch {
case s.overrides.GetRange != nil:
return s.overrides.GetRange(req)
case s.st != nil:
return s.st.GetRange(req)
default:
panic(fmt.Sprintf("unexpected storage call: GetRange(%+v)", req))
}
}
func (s *TestStore) Exists(req common.ExistsPrm) (common.ExistsRes, error) {
switch {
case s.overrides.Exists != nil:
return s.overrides.Exists(req)
case s.st != nil:
return s.st.Exists(req)
default:
panic(fmt.Sprintf("unexpected storage call: Exists(%+v)", req))
}
}
func (s *TestStore) Put(req common.PutPrm) (common.PutRes, error) {
s.mu.RLock()
defer s.mu.RUnlock()
switch {
case s.overrides.Put != nil:
return s.overrides.Put(req)
case s.st != nil:
return s.st.Put(req)
default:
panic(fmt.Sprintf("unexpected storage call: Put(%+v)", req))
}
}
func (s *TestStore) Delete(req common.DeletePrm) (common.DeleteRes, error) {
s.mu.RLock()
defer s.mu.RUnlock()
switch {
case s.overrides.Delete != nil:
return s.overrides.Delete(req)
case s.st != nil:
return s.st.Delete(req)
default:
panic(fmt.Sprintf("unexpected storage call: Delete(%+v)", req))
}
}
func (s *TestStore) Iterate(req common.IteratePrm) (common.IterateRes, error) {
s.mu.RLock()
defer s.mu.RUnlock()
switch {
case s.overrides.Iterate != nil:
return s.overrides.Iterate(req)
case s.st != nil:
return s.st.Iterate(req)
default:
panic(fmt.Sprintf("unexpected storage call: Iterate(%+v)", req))
}
}

View file

@ -3,14 +3,17 @@ package engine
import ( import (
"errors" "errors"
"fmt" "fmt"
"io/fs"
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
"sync/atomic"
"testing" "testing"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
@ -25,92 +28,128 @@ import (
) )
// TestInitializationFailure checks that shard is initialized and closed even if media // TestInitializationFailure checks that shard is initialized and closed even if media
// under any single component is absent. We emulate this with permission denied error. // under any single component is absent.
func TestInitializationFailure(t *testing.T) { func TestInitializationFailure(t *testing.T) {
type paths struct { type openFileFunc func(string, int, fs.FileMode) (*os.File, error)
blobstor string
metabase string type testShardOpts struct {
writecache string openFileMetabase openFileFunc
pilorama string openFileWriteCache openFileFunc
openFilePilorama openFileFunc
} }
existsDir := filepath.Join(t.TempDir(), "shard") testShard := func(opts testShardOpts) ([]shard.Option, *teststore.TestStore, *teststore.TestStore) {
badDir := filepath.Join(t.TempDir(), "missing")
testShard := func(c paths) []shard.Option {
sid, err := generateShardID() sid, err := generateShardID()
require.NoError(t, err) require.NoError(t, err)
tempDir := t.TempDir()
blobstorPath := filepath.Join(tempDir, "bs")
metabasePath := filepath.Join(tempDir, "mb")
writecachePath := filepath.Join(tempDir, "wc")
piloramaPath := filepath.Join(tempDir, "pl")
storages, smallFileStorage, largeFileStorage := newTestStorages(blobstorPath, 1<<20)
return []shard.Option{ return []shard.Option{
shard.WithID(sid), shard.WithID(sid),
shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
shard.WithBlobStorOptions( shard.WithBlobStorOptions(
blobstor.WithStorages( blobstor.WithStorages(storages)),
newStorages(c.blobstor, 1<<20))),
shard.WithMetaBaseOptions( shard.WithMetaBaseOptions(
meta.WithBoltDBOptions(&bbolt.Options{ meta.WithBoltDBOptions(&bbolt.Options{
Timeout: 100 * time.Millisecond, Timeout: 100 * time.Millisecond,
OpenFile: opts.openFileMetabase,
}), }),
meta.WithPath(c.metabase), meta.WithPath(metabasePath),
meta.WithPermissions(0700), meta.WithPermissions(0700),
meta.WithEpochState(epochState{})), meta.WithEpochState(epochState{})),
shard.WithWriteCache(true), shard.WithWriteCache(true),
shard.WithWriteCacheOptions(writecache.WithPath(c.writecache)), shard.WithWriteCacheOptions(
shard.WithPiloramaOptions(pilorama.WithPath(c.pilorama)), writecache.WithPath(writecachePath),
} writecache.WithOpenFile(opts.openFileWriteCache),
),
shard.WithPiloramaOptions(
pilorama.WithPath(piloramaPath),
pilorama.WithOpenFile(opts.openFilePilorama),
),
}, smallFileStorage, largeFileStorage
} }
t.Run("blobstor", func(t *testing.T) { t.Run("blobstor", func(t *testing.T) {
badDir := filepath.Join(badDir, t.Name()) shardOpts, _, largeFileStorage := testShard(testShardOpts{
require.NoError(t, os.MkdirAll(badDir, os.ModePerm)) openFileMetabase: os.OpenFile,
require.NoError(t, os.Chmod(badDir, 0)) openFileWriteCache: os.OpenFile,
testEngineFailInitAndReload(t, badDir, false, testShard(paths{ openFilePilorama: os.OpenFile,
blobstor: filepath.Join(badDir, "0"), })
metabase: filepath.Join(existsDir, t.Name(), "1"), largeFileStorage.SetOption(teststore.WithOpen(func(ro bool) error {
writecache: filepath.Join(existsDir, t.Name(), "2"), return teststore.ErrDiskExploded
pilorama: filepath.Join(existsDir, t.Name(), "3"),
})) }))
beforeReload := func() {
largeFileStorage.SetOption(teststore.WithOpen(nil))
}
testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
}) })
t.Run("metabase", func(t *testing.T) { t.Run("metabase", func(t *testing.T) {
badDir := filepath.Join(badDir, t.Name()) var openFileMetabaseSucceed atomic.Bool
require.NoError(t, os.MkdirAll(badDir, os.ModePerm)) openFileMetabase := func(p string, f int, mode fs.FileMode) (*os.File, error) {
require.NoError(t, os.Chmod(badDir, 0)) if openFileMetabaseSucceed.Load() {
testEngineFailInitAndReload(t, badDir, true, testShard(paths{ return os.OpenFile(p, f, mode)
blobstor: filepath.Join(existsDir, t.Name(), "0"), }
metabase: filepath.Join(badDir, "1"), return nil, teststore.ErrDiskExploded
writecache: filepath.Join(existsDir, t.Name(), "2"), }
pilorama: filepath.Join(existsDir, t.Name(), "3"), beforeReload := func() {
})) openFileMetabaseSucceed.Store(true)
}
shardOpts, _, _ := testShard(testShardOpts{
openFileMetabase: openFileMetabase,
openFileWriteCache: os.OpenFile,
openFilePilorama: os.OpenFile,
})
testEngineFailInitAndReload(t, true, shardOpts, beforeReload)
}) })
t.Run("write-cache", func(t *testing.T) { t.Run("write-cache", func(t *testing.T) {
badDir := filepath.Join(badDir, t.Name()) var openFileWriteCacheSucceed atomic.Bool
require.NoError(t, os.MkdirAll(badDir, os.ModePerm)) openFileWriteCache := func(p string, f int, mode fs.FileMode) (*os.File, error) {
require.NoError(t, os.Chmod(badDir, 0)) if openFileWriteCacheSucceed.Load() {
testEngineFailInitAndReload(t, badDir, false, testShard(paths{ return os.OpenFile(p, f, mode)
blobstor: filepath.Join(existsDir, t.Name(), "0"), }
metabase: filepath.Join(existsDir, t.Name(), "1"), return nil, teststore.ErrDiskExploded
writecache: filepath.Join(badDir, "2"), }
pilorama: filepath.Join(existsDir, t.Name(), "3"), beforeReload := func() {
})) openFileWriteCacheSucceed.Store(true)
}
shardOpts, _, _ := testShard(testShardOpts{
openFileMetabase: os.OpenFile,
openFileWriteCache: openFileWriteCache,
openFilePilorama: os.OpenFile,
})
testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
}) })
t.Run("pilorama", func(t *testing.T) { t.Run("pilorama", func(t *testing.T) {
badDir := filepath.Join(badDir, t.Name()) var openFilePiloramaSucceed atomic.Bool
require.NoError(t, os.MkdirAll(badDir, os.ModePerm)) openFilePilorama := func(p string, f int, mode fs.FileMode) (*os.File, error) {
require.NoError(t, os.Chmod(badDir, 0)) if openFilePiloramaSucceed.Load() {
testEngineFailInitAndReload(t, badDir, false, testShard(paths{ return os.OpenFile(p, f, mode)
blobstor: filepath.Join(existsDir, t.Name(), "0"), }
metabase: filepath.Join(existsDir, t.Name(), "1"), return nil, teststore.ErrDiskExploded
writecache: filepath.Join(existsDir, t.Name(), "2"), }
pilorama: filepath.Join(badDir, "3"), beforeReload := func() {
})) openFilePiloramaSucceed.Store(true)
}
shardOpts, _, _ := testShard(testShardOpts{
openFileMetabase: os.OpenFile,
openFileWriteCache: os.OpenFile,
openFilePilorama: openFilePilorama,
})
testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
}) })
} }
func testEngineFailInitAndReload(t *testing.T, badDir string, errOnAdd bool, s []shard.Option) { func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Option, beforeReload func()) {
var configID string var configID string
e := New() e := New()
_, err := e.AddShard(s...) _, err := e.AddShard(opts...)
if errOnAdd { if errOnAdd {
require.Error(t, err) require.Error(t, err)
// This branch is only taken when we cannot update shard ID in the metabase. // This branch is only taken when we cannot update shard ID in the metabase.
@ -139,9 +178,10 @@ func testEngineFailInitAndReload(t *testing.T, badDir string, errOnAdd bool, s [
e.mtx.RUnlock() e.mtx.RUnlock()
require.Equal(t, 0, shardCount) require.Equal(t, 0, shardCount)
require.NoError(t, os.Chmod(badDir, os.ModePerm)) beforeReload()
require.NoError(t, e.Reload(ReConfiguration{ require.NoError(t, e.Reload(ReConfiguration{
shards: map[string][]shard.Option{configID: s}, shards: map[string][]shard.Option{configID: opts},
})) }))
e.mtx.RLock() e.mtx.RLock()
@ -193,26 +233,28 @@ func TestPersistentShardID(t *testing.T) {
dir, err := os.MkdirTemp("", "*") dir, err := os.MkdirTemp("", "*")
require.NoError(t, err) require.NoError(t, err)
e, _, id := newEngineWithErrorThreshold(t, dir, 1) te := newEngineWithErrorThreshold(t, dir, 1)
checkShardState(t, e, id[0], 0, mode.ReadWrite) checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
require.NoError(t, e.Close()) require.NoError(t, te.ng.Close())
e, _, newID := newEngineWithErrorThreshold(t, dir, 1) newTe := newEngineWithErrorThreshold(t, dir, 1)
require.Equal(t, id, newID) for i := 0; i < len(newTe.shards); i++ {
require.NoError(t, e.Close()) require.Equal(t, te.shards[i].id, newTe.shards[i].id)
}
require.NoError(t, newTe.ng.Close())
p1 := e.shards[id[0].String()].Shard.DumpInfo().MetaBaseInfo.Path p1 := newTe.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
p2 := e.shards[id[1].String()].Shard.DumpInfo().MetaBaseInfo.Path p2 := newTe.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
tmp := filepath.Join(dir, "tmp") tmp := filepath.Join(dir, "tmp")
require.NoError(t, os.Rename(p1, tmp)) require.NoError(t, os.Rename(p1, tmp))
require.NoError(t, os.Rename(p2, p1)) require.NoError(t, os.Rename(p2, p1))
require.NoError(t, os.Rename(tmp, p2)) require.NoError(t, os.Rename(tmp, p2))
e, _, newID = newEngineWithErrorThreshold(t, dir, 1) newTe = newEngineWithErrorThreshold(t, dir, 1)
require.Equal(t, id[1], newID[0]) require.Equal(t, te.shards[1].id, newTe.shards[0].id)
require.Equal(t, id[0], newID[1]) require.Equal(t, te.shards[0].id, newTe.shards[1].id)
require.NoError(t, e.Close()) require.NoError(t, newTe.ng.Close())
} }

View file

@ -9,6 +9,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
@ -115,6 +116,32 @@ func newStorages(root string, smallSize uint64) []blobstor.SubStorage {
} }
} }
func newTestStorages(root string, smallSize uint64) ([]blobstor.SubStorage, *teststore.TestStore, *teststore.TestStore) {
smallFileStorage := teststore.New(
teststore.WithSubstorage(blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithRootPath(filepath.Join(root, "blobovnicza")),
blobovniczatree.WithBlobovniczaShallowDepth(1),
blobovniczatree.WithBlobovniczaShallowWidth(1),
blobovniczatree.WithPermissions(0700)),
))
largeFileStorage := teststore.New(
teststore.WithSubstorage(fstree.New(
fstree.WithPath(root),
fstree.WithDepth(1)),
))
return []blobstor.SubStorage{
{
Storage: smallFileStorage,
Policy: func(_ *object.Object, data []byte) bool {
return uint64(len(data)) < smallSize
},
},
{
Storage: largeFileStorage,
},
}, smallFileStorage, largeFileStorage
}
func testNewShard(t testing.TB, id int) *shard.Shard { func testNewShard(t testing.TB, id int) *shard.Shard {
sid, err := generateShardID() sid, err := generateShardID()
require.NoError(t, err) require.NoError(t, err)

View file

@ -9,6 +9,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
@ -24,7 +26,19 @@ import (
const errSmallSize = 256 const errSmallSize = 256
func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) { type testEngine struct {
ng *StorageEngine
dir string
shards [2]*testShard
}
type testShard struct {
id *shard.ID
smallFileStorage *teststore.TestStore
largeFileStorage *teststore.TestStore
}
func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) *testEngine {
if dir == "" { if dir == "" {
var err error var err error
@ -38,14 +52,13 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
WithShardPoolSize(1), WithShardPoolSize(1),
WithErrorThreshold(errThreshold)) WithErrorThreshold(errThreshold))
var ids [2]*shard.ID var testShards [2]*testShard
var err error
for i := range ids { for i := range testShards {
ids[i], err = e.AddShard( storages, smallFileStorage, largeFileStorage := newTestStorages(filepath.Join(dir, strconv.Itoa(i)), errSmallSize)
id, err := e.AddShard(
shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
shard.WithBlobStorOptions( shard.WithBlobStorOptions(blobstor.WithStorages(storages)),
blobstor.WithStorages(newStorages(filepath.Join(dir, strconv.Itoa(i)), errSmallSize))),
shard.WithMetaBaseOptions( shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))), meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
meta.WithPermissions(0700), meta.WithPermissions(0700),
@ -55,94 +68,111 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", i))), pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", i))),
pilorama.WithPerm(0700))) pilorama.WithPerm(0700)))
require.NoError(t, err) require.NoError(t, err)
testShards[i] = &testShard{
id: id,
smallFileStorage: smallFileStorage,
largeFileStorage: largeFileStorage,
}
} }
require.NoError(t, e.Open()) require.NoError(t, e.Open())
require.NoError(t, e.Init()) require.NoError(t, e.Init())
return e, dir, ids return &testEngine{
ng: e,
dir: dir,
shards: testShards,
}
} }
func TestErrorReporting(t *testing.T) { func TestErrorReporting(t *testing.T) {
t.Run("ignore errors by default", func(t *testing.T) { t.Run("ignore errors by default", func(t *testing.T) {
e, dir, id := newEngineWithErrorThreshold(t, "", 0) te := newEngineWithErrorThreshold(t, "", 0)
obj := testutil.GenerateObjectWithCID(cidtest.ID()) obj := testutil.GenerateObjectWithCID(cidtest.ID())
obj.SetPayload(make([]byte, errSmallSize)) obj.SetPayload(make([]byte, errSmallSize))
var prm shard.PutPrm var prm shard.PutPrm
prm.SetObject(obj) prm.SetObject(obj)
e.mtx.RLock() te.ng.mtx.RLock()
_, err := e.shards[id[0].String()].Shard.Put(prm) _, err := te.ng.shards[te.shards[0].id.String()].Shard.Put(prm)
e.mtx.RUnlock() te.ng.mtx.RUnlock()
require.NoError(t, err) require.NoError(t, err)
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)}) _, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
require.NoError(t, err) require.NoError(t, err)
checkShardState(t, e, id[0], 0, mode.ReadWrite) checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
checkShardState(t, e, id[1], 0, mode.ReadWrite) checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
corruptSubDir(t, filepath.Join(dir, "0")) for _, shard := range te.shards {
shard.largeFileStorage.SetOption(teststore.WithGet(func(common.GetPrm) (common.GetRes, error) {
return common.GetRes{}, teststore.ErrDiskExploded
}))
}
for i := uint32(1); i < 3; i++ { for i := uint32(1); i < 3; i++ {
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)}) _, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
require.Error(t, err) require.Error(t, err)
checkShardState(t, e, id[0], i, mode.ReadWrite) checkShardState(t, te.ng, te.shards[0].id, i, mode.ReadWrite)
checkShardState(t, e, id[1], 0, mode.ReadWrite) checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
} }
}) })
t.Run("with error threshold", func(t *testing.T) { t.Run("with error threshold", func(t *testing.T) {
const errThreshold = 3 const errThreshold = 3
e, dir, id := newEngineWithErrorThreshold(t, "", errThreshold) te := newEngineWithErrorThreshold(t, "", errThreshold)
obj := testutil.GenerateObjectWithCID(cidtest.ID()) obj := testutil.GenerateObjectWithCID(cidtest.ID())
obj.SetPayload(make([]byte, errSmallSize)) obj.SetPayload(make([]byte, errSmallSize))
var prm shard.PutPrm var prm shard.PutPrm
prm.SetObject(obj) prm.SetObject(obj)
e.mtx.RLock() te.ng.mtx.RLock()
_, err := e.shards[id[0].String()].Put(prm) _, err := te.ng.shards[te.shards[0].id.String()].Put(prm)
e.mtx.RUnlock() te.ng.mtx.RUnlock()
require.NoError(t, err) require.NoError(t, err)
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)}) _, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
require.NoError(t, err) require.NoError(t, err)
checkShardState(t, e, id[0], 0, mode.ReadWrite) checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
checkShardState(t, e, id[1], 0, mode.ReadWrite) checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
corruptSubDir(t, filepath.Join(dir, "0")) for _, shard := range te.shards {
shard.largeFileStorage.SetOption(teststore.WithGet(func(common.GetPrm) (common.GetRes, error) {
return common.GetRes{}, teststore.ErrDiskExploded
}))
}
for i := uint32(1); i < errThreshold; i++ { for i := uint32(1); i < errThreshold; i++ {
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)}) _, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
require.Error(t, err) require.Error(t, err)
checkShardState(t, e, id[0], i, mode.ReadWrite) checkShardState(t, te.ng, te.shards[0].id, i, mode.ReadWrite)
checkShardState(t, e, id[1], 0, mode.ReadWrite) checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
} }
for i := uint32(0); i < 2; i++ { for i := uint32(0); i < 2; i++ {
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)}) _, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
require.Error(t, err) require.Error(t, err)
checkShardState(t, e, id[0], errThreshold+i, mode.DegradedReadOnly) checkShardState(t, te.ng, te.shards[0].id, errThreshold+i, mode.DegradedReadOnly)
checkShardState(t, e, id[1], 0, mode.ReadWrite) checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
} }
require.NoError(t, e.SetShardMode(id[0], mode.ReadWrite, false)) require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, false))
checkShardState(t, e, id[0], errThreshold+1, mode.ReadWrite) checkShardState(t, te.ng, te.shards[0].id, errThreshold+1, mode.ReadWrite)
require.NoError(t, e.SetShardMode(id[0], mode.ReadWrite, true)) require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, true))
checkShardState(t, e, id[0], 0, mode.ReadWrite) checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
}) })
} }
// Issue #1186.
func TestBlobstorFailback(t *testing.T) { func TestBlobstorFailback(t *testing.T) {
dir, err := os.MkdirTemp("", "*") dir, err := os.MkdirTemp("", "*")
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, os.RemoveAll(dir)) }) t.Cleanup(func() { require.NoError(t, os.RemoveAll(dir)) })
e, _, id := newEngineWithErrorThreshold(t, dir, 1) te := newEngineWithErrorThreshold(t, dir, 1)
objs := make([]*objectSDK.Object, 0, 2) objs := make([]*objectSDK.Object, 0, 2)
for _, size := range []int{15, errSmallSize + 1} { for _, size := range []int{15, errSmallSize + 1} {
@ -151,49 +181,49 @@ func TestBlobstorFailback(t *testing.T) {
var prm shard.PutPrm var prm shard.PutPrm
prm.SetObject(obj) prm.SetObject(obj)
e.mtx.RLock() te.ng.mtx.RLock()
_, err = e.shards[id[0].String()].Shard.Put(prm) _, err = te.ng.shards[te.shards[0].id.String()].Shard.Put(prm)
e.mtx.RUnlock() te.ng.mtx.RUnlock()
require.NoError(t, err) require.NoError(t, err)
objs = append(objs, obj) objs = append(objs, obj)
} }
for i := range objs { for i := range objs {
addr := object.AddressOf(objs[i]) addr := object.AddressOf(objs[i])
_, err = e.Get(GetPrm{addr: addr}) _, err = te.ng.Get(GetPrm{addr: addr})
require.NoError(t, err) require.NoError(t, err)
_, err = e.GetRange(RngPrm{addr: addr}) _, err = te.ng.GetRange(RngPrm{addr: addr})
require.NoError(t, err) require.NoError(t, err)
} }
checkShardState(t, e, id[0], 0, mode.ReadWrite) checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
require.NoError(t, e.Close()) require.NoError(t, te.ng.Close())
p1 := e.shards[id[0].String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path p1 := te.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
p2 := e.shards[id[1].String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path p2 := te.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
tmp := filepath.Join(dir, "tmp") tmp := filepath.Join(dir, "tmp")
require.NoError(t, os.Rename(p1, tmp)) require.NoError(t, os.Rename(p1, tmp))
require.NoError(t, os.Rename(p2, p1)) require.NoError(t, os.Rename(p2, p1))
require.NoError(t, os.Rename(tmp, p2)) require.NoError(t, os.Rename(tmp, p2))
e, _, id = newEngineWithErrorThreshold(t, dir, 1) te = newEngineWithErrorThreshold(t, dir, 1)
for i := range objs { for i := range objs {
addr := object.AddressOf(objs[i]) addr := object.AddressOf(objs[i])
getRes, err := e.Get(GetPrm{addr: addr}) getRes, err := te.ng.Get(GetPrm{addr: addr})
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, objs[i], getRes.Object()) require.Equal(t, objs[i], getRes.Object())
rngRes, err := e.GetRange(RngPrm{addr: addr, off: 1, ln: 10}) rngRes, err := te.ng.GetRange(RngPrm{addr: addr, off: 1, ln: 10})
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, objs[i].Payload()[1:11], rngRes.Object().Payload()) require.Equal(t, objs[i].Payload()[1:11], rngRes.Object().Payload())
_, err = e.GetRange(RngPrm{addr: addr, off: errSmallSize + 10, ln: 1}) _, err = te.ng.GetRange(RngPrm{addr: addr, off: errSmallSize + 10, ln: 1})
require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{}) require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{})
} }
checkShardState(t, e, id[0], 1, mode.DegradedReadOnly) checkShardState(t, te.ng, te.shards[0].id, 1, mode.DegradedReadOnly)
checkShardState(t, e, id[1], 0, mode.ReadWrite) checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
} }
func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode mode.Mode) { func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode mode.Mode) {
@ -204,19 +234,3 @@ func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint
require.Equal(t, errCount, sh.errorCount.Load()) require.Equal(t, errCount, sh.errorCount.Load())
require.Equal(t, mode, sh.GetMode()) require.Equal(t, mode, sh.GetMode())
} }
// corruptSubDir makes random directory except "blobovnicza" in blobstor FSTree unreadable.
func corruptSubDir(t *testing.T, dir string) {
de, err := os.ReadDir(dir)
require.NoError(t, err)
// FIXME(@cthulhu-rider): copy-paste of unexported const from blobstor package, see #1407
const dirBlobovnicza = "blobovnicza"
for i := range de {
if de[i].IsDir() && de[i].Name() != dirBlobovnicza {
require.NoError(t, os.Chmod(filepath.Join(dir, de[i].Name()), 0))
return
}
}
}

View file

@ -17,7 +17,6 @@ import (
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/panjf2000/ants/v2" "github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -191,7 +190,7 @@ func TestLockExpiration(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
var inhumePrm InhumePrm var inhumePrm InhumePrm
inhumePrm.WithTarget(objecttest.Address(), objectcore.AddressOf(obj)) inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))
_, err = e.Inhume(inhumePrm) _, err = e.Inhume(inhumePrm)
require.ErrorAs(t, err, new(apistatus.ObjectLocked)) require.ErrorAs(t, err, new(apistatus.ObjectLocked))
@ -204,7 +203,7 @@ func TestLockExpiration(t *testing.T) {
time.Sleep(time.Second) time.Sleep(time.Second)
// 4. // 4.
inhumePrm.WithTarget(objecttest.Address(), objectcore.AddressOf(obj)) inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))
_, err = e.Inhume(inhumePrm) _, err = e.Inhume(inhumePrm)
require.NoError(t, err) require.NoError(t, err)
@ -263,7 +262,7 @@ func TestLockForceRemoval(t *testing.T) {
_, err = e.Inhume(inhumePrm) _, err = e.Inhume(inhumePrm)
require.ErrorAs(t, err, new(apistatus.ObjectLocked)) require.ErrorAs(t, err, new(apistatus.ObjectLocked))
inhumePrm.WithTarget(objecttest.Address(), objectcore.AddressOf(obj)) inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))
_, err = e.Inhume(inhumePrm) _, err = e.Inhume(inhumePrm)
require.ErrorAs(t, err, new(apistatus.ObjectLocked)) require.ErrorAs(t, err, new(apistatus.ObjectLocked))

View file

@ -23,7 +23,7 @@ func BenchmarkTreeVsSearch(b *testing.B) {
} }
func benchmarkTreeVsSearch(b *testing.B, objCount int) { func benchmarkTreeVsSearch(b *testing.B, objCount int) {
e, _, _ := newEngineWithErrorThreshold(b, "", 0) te := newEngineWithErrorThreshold(b, "", 0)
cid := cidtest.ID() cid := cidtest.ID()
d := pilorama.CIDDescriptor{CID: cid, Position: 0, Size: 1} d := pilorama.CIDDescriptor{CID: cid, Position: 0, Size: 1}
treeID := "someTree" treeID := "someTree"
@ -31,11 +31,11 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) {
for i := 0; i < objCount; i++ { for i := 0; i < objCount; i++ {
obj := testutil.GenerateObjectWithCID(cid) obj := testutil.GenerateObjectWithCID(cid)
testutil.AddAttribute(obj, pilorama.AttributeFilename, strconv.Itoa(i)) testutil.AddAttribute(obj, pilorama.AttributeFilename, strconv.Itoa(i))
err := Put(e, obj) err := Put(te.ng, obj)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
_, err = e.TreeAddByPath(d, treeID, pilorama.AttributeFilename, nil, _, err = te.ng.TreeAddByPath(d, treeID, pilorama.AttributeFilename, nil,
[]pilorama.KeyValue{{pilorama.AttributeFilename, []byte(strconv.Itoa(i))}}) []pilorama.KeyValue{{pilorama.AttributeFilename, []byte(strconv.Itoa(i))}})
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
@ -51,7 +51,7 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) {
prm.WithFilters(fs) prm.WithFilters(fs)
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
res, err := e.Select(prm) res, err := te.ng.Select(prm)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
@ -62,7 +62,7 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) {
}) })
b.Run("TreeGetByPath", func(b *testing.B) { b.Run("TreeGetByPath", func(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
nodes, err := e.TreeGetByPath(cid, treeID, pilorama.AttributeFilename, []string{strconv.Itoa(objCount / 2)}, true) nodes, err := te.ng.TreeGetByPath(cid, treeID, pilorama.AttributeFilename, []string{strconv.Itoa(objCount / 2)}, true)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }

View file

@ -66,6 +66,7 @@ func NewBoltForest(opts ...Option) ForestStorage {
perm: os.ModePerm, perm: os.ModePerm,
maxBatchDelay: bbolt.DefaultMaxBatchDelay, maxBatchDelay: bbolt.DefaultMaxBatchDelay,
maxBatchSize: bbolt.DefaultMaxBatchSize, maxBatchSize: bbolt.DefaultMaxBatchSize,
openFile: os.OpenFile,
}, },
} }
@ -107,6 +108,7 @@ func (t *boltForest) Open(readOnly bool) error {
opts.ReadOnly = readOnly opts.ReadOnly = readOnly
opts.NoSync = t.noSync opts.NoSync = t.noSync
opts.Timeout = 100 * time.Millisecond opts.Timeout = 100 * time.Millisecond
opts.OpenFile = t.openFile
t.db, err = bbolt.Open(t.path, t.perm, &opts) t.db, err = bbolt.Open(t.path, t.perm, &opts)
if err != nil { if err != nil {

View file

@ -2,6 +2,7 @@ package pilorama
import ( import (
"io/fs" "io/fs"
"os"
"time" "time"
) )
@ -13,6 +14,7 @@ type cfg struct {
noSync bool noSync bool
maxBatchDelay time.Duration maxBatchDelay time.Duration
maxBatchSize int maxBatchSize int
openFile func(string, int, fs.FileMode) (*os.File, error)
} }
func WithPath(path string) Option { func WithPath(path string) Option {
@ -44,3 +46,9 @@ func WithMaxBatchSize(size int) Option {
c.maxBatchSize = size c.maxBatchSize = size
} }
} }
func WithOpenFile(openFile func(string, int, fs.FileMode) (*os.File, error)) Option {
return func(c *cfg) {
c.openFile = openFile
}
}

View file

@ -1,14 +1,18 @@
package shard package shard
import ( import (
"io/fs"
"math"
"os" "os"
"path/filepath" "path/filepath"
"sync/atomic"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
@ -22,6 +26,7 @@ import (
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test" objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
) )
@ -40,19 +45,33 @@ func TestShardOpen(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
metaPath := filepath.Join(dir, "meta") metaPath := filepath.Join(dir, "meta")
st := teststore.New(teststore.WithSubstorage(fstree.New(
fstree.WithDirNameLen(2),
fstree.WithPath(filepath.Join(dir, "blob")),
fstree.WithDepth(1)),
))
var allowedMode atomic.Int64
openFileMetabase := func(p string, f int, perm fs.FileMode) (*os.File, error) {
const modeMask = os.O_RDONLY | os.O_RDWR | os.O_WRONLY
if int64(f&modeMask) == allowedMode.Load() {
return os.OpenFile(p, f, perm)
}
return nil, fs.ErrPermission
}
newShard := func() *Shard { newShard := func() *Shard {
return New( return New(
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
WithBlobStorOptions( WithBlobStorOptions(
blobstor.WithStorages([]blobstor.SubStorage{ blobstor.WithStorages([]blobstor.SubStorage{
{ {Storage: st},
Storage: fstree.New(
fstree.WithDirNameLen(2),
fstree.WithPath(filepath.Join(dir, "blob")),
fstree.WithDepth(1)),
},
})), })),
WithMetaBaseOptions(meta.WithPath(metaPath), meta.WithEpochState(epochState{})), WithMetaBaseOptions(
meta.WithPath(metaPath),
meta.WithEpochState(epochState{}),
meta.WithBoltDBOptions(&bbolt.Options{OpenFile: openFileMetabase}),
),
WithPiloramaOptions( WithPiloramaOptions(
pilorama.WithPath(filepath.Join(dir, "pilorama"))), pilorama.WithPath(filepath.Join(dir, "pilorama"))),
WithWriteCache(true), WithWriteCache(true),
@ -60,6 +79,8 @@ func TestShardOpen(t *testing.T) {
writecache.WithPath(filepath.Join(dir, "wc")))) writecache.WithPath(filepath.Join(dir, "wc"))))
} }
allowedMode.Store(int64(os.O_RDWR))
sh := newShard() sh := newShard()
require.NoError(t, sh.Open()) require.NoError(t, sh.Open())
require.NoError(t, sh.Init()) require.NoError(t, sh.Init())
@ -67,7 +88,8 @@ func TestShardOpen(t *testing.T) {
require.NoError(t, sh.Close()) require.NoError(t, sh.Close())
// Metabase can be opened in read-only => start in ReadOnly mode. // Metabase can be opened in read-only => start in ReadOnly mode.
require.NoError(t, os.Chmod(metaPath, 0444)) allowedMode.Store(int64(os.O_RDONLY))
sh = newShard() sh = newShard()
require.NoError(t, sh.Open()) require.NoError(t, sh.Open())
require.NoError(t, sh.Init()) require.NoError(t, sh.Init())
@ -77,7 +99,8 @@ func TestShardOpen(t *testing.T) {
require.NoError(t, sh.Close()) require.NoError(t, sh.Close())
// Metabase is corrupted => start in DegradedReadOnly mode. // Metabase is corrupted => start in DegradedReadOnly mode.
require.NoError(t, os.Chmod(metaPath, 0000)) allowedMode.Store(math.MaxInt64)
sh = newShard() sh = newShard()
require.NoError(t, sh.Open()) require.NoError(t, sh.Open())
require.NoError(t, sh.Init()) require.NoError(t, sh.Init())

View file

@ -46,12 +46,13 @@ func TestFlush(t *testing.T) {
require.NoError(t, mb.Open(false)) require.NoError(t, mb.Open(false))
require.NoError(t, mb.Init()) require.NoError(t, mb.Init())
fsTree := fstree.New(
fstree.WithPath(filepath.Join(dir, "blob")),
fstree.WithDepth(0),
fstree.WithDirNameLen(1))
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{ bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
{Storage: fsTree}, {
Storage: fstree.New(
fstree.WithPath(filepath.Join(dir, "blob")),
fstree.WithDepth(0),
fstree.WithDirNameLen(1)),
},
})) }))
require.NoError(t, bs.Open(false)) require.NoError(t, bs.Open(false))
require.NoError(t, bs.Init()) require.NoError(t, bs.Init())
@ -208,7 +209,7 @@ func TestFlush(t *testing.T) {
_, err = os.Stat(p) // sanity check _, err = os.Stat(p) // sanity check
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, os.Chmod(p, 0)) require.NoError(t, os.Truncate(p, 0)) // corrupt the file contents, so that it can't be unmarshalled
}) })
}) })
t.Run("fs, invalid object", func(t *testing.T) { t.Run("fs, invalid object", func(t *testing.T) {

View file

@ -1,6 +1,8 @@
package writecache package writecache
import ( import (
"io/fs"
"os"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@ -55,6 +57,8 @@ type options struct {
noSync bool noSync bool
// reportError is the function called when encountering disk errors in background workers. // reportError is the function called when encountering disk errors in background workers.
reportError func(string, error) reportError func(string, error)
// openFile is the function called internally by bbolt to open database files. Useful for hermetic testing.
openFile func(string, int, fs.FileMode) (*os.File, error)
} }
// WithLogger sets logger. // WithLogger sets logger.
@ -152,3 +156,10 @@ func WithReportErrorFunc(f func(string, error)) Option {
o.reportError = f o.reportError = f
} }
} }
// WithOpenFile sets the OpenFile function to use internally by bolt. Useful for hermetic testing.
func WithOpenFile(f func(string, int, fs.FileMode) (*os.File, error)) Option {
return func(o *options) {
o.openFile = f
}
}

View file

@ -43,7 +43,7 @@ func (c *cache) openStore(readOnly bool) error {
return err return err
} }
c.db, err = OpenDB(c.path, readOnly) c.db, err = OpenDB(c.path, readOnly, c.openFile)
if err != nil { if err != nil {
return fmt.Errorf("could not open database: %w", err) return fmt.Errorf("could not open database: %w", err)
} }

View file

@ -1,6 +1,7 @@
package writecache package writecache
import ( import (
"io/fs"
"os" "os"
"path/filepath" "path/filepath"
"time" "time"
@ -9,10 +10,11 @@ import (
) )
// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true. // OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true.
func OpenDB(p string, ro bool) (*bbolt.DB, error) { func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) {
return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{ return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{
NoFreelistSync: true, NoFreelistSync: true,
ReadOnly: ro, ReadOnly: ro,
Timeout: 100 * time.Millisecond, Timeout: 100 * time.Millisecond,
OpenFile: openFile,
}) })
} }

View file

@ -1,6 +1,7 @@
package writecache package writecache
import ( import (
"os"
"sync" "sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@ -106,6 +107,7 @@ func New(opts ...Option) Cache {
maxCacheSize: defaultMaxCacheSize, maxCacheSize: defaultMaxCacheSize,
maxBatchSize: bbolt.DefaultMaxBatchSize, maxBatchSize: bbolt.DefaultMaxBatchSize,
maxBatchDelay: bbolt.DefaultMaxBatchDelay, maxBatchDelay: bbolt.DefaultMaxBatchDelay,
openFile: os.OpenFile,
}, },
} }