[#139] test: Add test storage implementation
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful

This aims to reduce the usage of chmod hackery to induce or simulate
OS-related failures.

Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
This commit is contained in:
Alejandro Lopez 2023-03-21 13:38:44 +03:00
parent da8da1c63a
commit 3fb064649f
20 changed files with 502 additions and 206 deletions

View file

@ -1,6 +1,8 @@
package writecache
import (
"os"
common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"github.com/spf13/cobra"
@ -24,7 +26,7 @@ func init() {
}
func openWC(cmd *cobra.Command) *bbolt.DB {
db, err := writecache.OpenDB(vPath, true)
db, err := writecache.OpenDB(vPath, true, os.OpenFile)
common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
return db

3
go.mod
View file

@ -29,7 +29,7 @@ require (
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.14.0
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.8.2
go.etcd.io/bbolt v1.3.6
go.uber.org/atomic v1.10.0
go.uber.org/zap v1.24.0
@ -87,6 +87,7 @@ require (
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/subosito/gotenv v1.4.1 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 // indirect
github.com/twmb/murmur3 v1.1.5 // indirect

BIN
go.sum

Binary file not shown.

View file

@ -50,7 +50,6 @@ func TestExistsInvalidStorageID(t *testing.T) {
})
t.Run("invalid storage id", func(t *testing.T) {
// "0/X/Y" <-> "1/X/Y"
storageID := slice.Copy(putRes.StorageID)
storageID[0] = '9'
badDir := filepath.Join(dir, "9")

View file

@ -1,7 +1,6 @@
package blobstor
import (
"os"
"path/filepath"
"testing"
@ -9,32 +8,37 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/stretchr/testify/require"
)
const blobovniczaDir = "blobovniczas"
func defaultStorages(p string, smallSizeLimit uint64) []SubStorage {
func defaultTestStorages(p string, smallSizeLimit uint64) ([]SubStorage, *teststore.TestStore, *teststore.TestStore) {
smallFileStorage := teststore.NewFromStorage(blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithRootPath(filepath.Join(p, "blobovniczas")),
blobovniczatree.WithBlobovniczaShallowWidth(1)), // default width is 16, slow init
)
largeFileStorage := teststore.NewFromStorage(fstree.New(fstree.WithPath(p)))
return []SubStorage{
{
Storage: blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithRootPath(filepath.Join(p, "blobovniczas")),
blobovniczatree.WithBlobovniczaShallowWidth(1)), // default width is 16, slow init
Storage: smallFileStorage,
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) <= smallSizeLimit
},
},
{
Storage: fstree.New(fstree.WithPath(p)),
Storage: largeFileStorage,
},
}
}, smallFileStorage, largeFileStorage
}
func defaultStorages(p string, smallSizeLimit uint64) []SubStorage {
storages, _, _ := defaultTestStorages(p, smallSizeLimit)
return storages
}
func TestCompression(t *testing.T) {
dir, err := os.MkdirTemp("", "frostfs*")
require.NoError(t, err)
t.Cleanup(func() { _ = os.RemoveAll(dir) })
dir := t.TempDir()
const (
smallSizeLimit = 512
@ -70,7 +74,7 @@ func TestCompression(t *testing.T) {
testPut := func(t *testing.T, b *BlobStor, i int) {
var prm common.PutPrm
prm.Object = smallObj[i]
_, err = b.Put(prm)
_, err := b.Put(prm)
require.NoError(t, err)
prm = common.PutPrm{}
@ -102,9 +106,7 @@ func TestCompression(t *testing.T) {
func TestBlobstor_needsCompression(t *testing.T) {
const smallSizeLimit = 512
newBlobStor := func(t *testing.T, compress bool, ct ...string) *BlobStor {
dir, err := os.MkdirTemp("", "frostfs*")
require.NoError(t, err)
t.Cleanup(func() { _ = os.RemoveAll(dir) })
dir := t.TempDir()
bs := New(
WithCompressObjects(compress),

View file

@ -2,14 +2,15 @@ package blobstor
import (
"os"
"path/filepath"
"testing"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
@ -20,8 +21,10 @@ func TestExists(t *testing.T) {
const smallSizeLimit = 512
b := New(
WithStorages(defaultStorages(dir, smallSizeLimit)))
storages, _, largeFileStorage := defaultTestStorages(dir, smallSizeLimit)
b := New(WithStorages(storages))
require.NoError(t, b.Open(false))
require.NoError(t, b.Init())
@ -33,7 +36,7 @@ func TestExists(t *testing.T) {
for i := range objects {
var prm common.PutPrm
prm.Object = objects[i]
_, err = b.Put(prm)
_, err := b.Put(prm)
require.NoError(t, err)
}
@ -51,20 +54,9 @@ func TestExists(t *testing.T) {
require.NoError(t, err)
require.False(t, res.Exists)
t.Run("corrupt direcrory", func(t *testing.T) {
var bigDir string
de, err := os.ReadDir(dir)
require.NoError(t, err)
for i := range de {
if de[i].Name() != blobovniczaDir {
bigDir = filepath.Join(dir, de[i].Name())
break
}
}
require.NotEmpty(t, bigDir)
require.NoError(t, os.Chmod(dir, 0))
t.Cleanup(func() { require.NoError(t, os.Chmod(dir, 0777)) })
t.Run("corrupt directory", func(t *testing.T) {
largeFileStorage.ExistsPassthrough.Store(false)
largeFileStorage.On("Exists", mock.Anything).Return(common.ExistsRes{}, teststore.ErrDiskExploded)
// Object exists, first error is logged.
prm.Address = objectCore.AddressOf(objects[0])
@ -76,6 +68,7 @@ func TestExists(t *testing.T) {
prm.Address = objectCore.AddressOf(objects[1])
_, err = b.Exists(prm)
require.Error(t, err)
require.ErrorIs(t, err, teststore.ErrDiskExploded)
})
}

View file

@ -1,23 +1,16 @@
package blobstor
import (
"os"
"path/filepath"
"strconv"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest"
)
func TestGeneric(t *testing.T) {
defer func() { _ = os.RemoveAll(t.Name()) }()
var n int
newMetabase := func(t *testing.T) storagetest.Component {
n++
dir := filepath.Join(t.Name(), strconv.Itoa(n))
return New(
WithStorages(defaultStorages(dir, 128)))
WithStorages(defaultStorages(t.TempDir(), 128)))
}
storagetest.TestAll(t, newMetabase)

View file

@ -0,0 +1,178 @@
// Package teststore provides a common.Storage implementation for testing/mocking purposes.
//
// A new teststore.TestStore can be obtained in two ways:
// 1. Using teststore.New, a new instance is returned which doesn't pass-through any calls.
// Thus, if any of its methods is called without being mocked beforehand, the test will
// fail. This is useful for small tests that integrate with common.Storage implementations
// and need to expect a couple of method calls only.
// 2. Using teststore.NewFromStorage wraps an existing common.Storage implementation which
// by default will pass-though all calls to the underlying storage. Individual pass-through
// can be disabled and mocked for individual methods. This is useful for larger tests that
// integrate with common.Storage implementations where a few selected calls need to be mocked
// for e.g. test expectations or injecting failures.
//
// For more info on how to mock individual calls, see https://pkg.go.dev/github.com/stretchr/testify/mock.
package teststore
import (
"errors"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
"github.com/stretchr/testify/mock"
)
// TestStore is a common.Storage implementation for testing/mocking purposes.
type TestStore struct {
mock.Mock
st common.Storage
// The following flags controls individual method passthough.
OpenPassthrough atomic.Bool
InitPassthrough atomic.Bool
ClosePassthrough atomic.Bool
TypePassthrough atomic.Bool
PathPassthrough atomic.Bool
SetCompressorPassthrough atomic.Bool
SetReportErrorFuncPassthrough atomic.Bool
GetPassthrough atomic.Bool
GetRangePassthrough atomic.Bool
ExistsPassthrough atomic.Bool
PutPassthrough atomic.Bool
DeletePassthrough atomic.Bool
IteratePassthrough atomic.Bool
}
// ErrDiskExploded is a phony error which can be used for testing purposes to differentiate it from
// more common errors.
var ErrDiskExploded = errors.New("disk exploded")
// New returns a teststore.TestStore with no underlying storage and all methods' passthrough disabled.
func New() *TestStore {
return &TestStore{}
}
// New returns a teststore.TestStore with the given underlying storage and all methods' passthrough enabled.
func NewFromStorage(st common.Storage) *TestStore {
s := &TestStore{st: st}
s.OpenPassthrough.Store(true)
s.InitPassthrough.Store(true)
s.ClosePassthrough.Store(true)
s.TypePassthrough.Store(true)
s.PathPassthrough.Store(true)
s.SetCompressorPassthrough.Store(true)
s.SetReportErrorFuncPassthrough.Store(true)
s.GetPassthrough.Store(true)
s.GetRangePassthrough.Store(true)
s.ExistsPassthrough.Store(true)
s.PutPassthrough.Store(true)
s.DeletePassthrough.Store(true)
s.IteratePassthrough.Store(true)
return s
}
func (s *TestStore) Open(readOnly bool) error {
if s.OpenPassthrough.Load() {
return s.st.Open(readOnly)
}
ret := s.Called(readOnly)
return ret.Error(0)
}
func (s *TestStore) Init() error {
if s.InitPassthrough.Load() {
return s.st.Init()
}
return s.Called().Error(0)
}
func (s *TestStore) Close() error {
if s.ClosePassthrough.Load() {
return s.st.Close()
}
return s.Called().Error(0)
}
func (s *TestStore) Type() string {
if s.TypePassthrough.Load() {
return s.st.Type()
}
return s.Called().String(0)
}
func (s *TestStore) Path() string {
if s.PathPassthrough.Load() {
return s.st.Path()
}
return s.Called().String(0)
}
func (s *TestStore) SetCompressor(cc *compression.Config) {
if s.SetCompressorPassthrough.Load() {
s.st.SetCompressor(cc)
return
}
s.Called(cc)
}
func (s *TestStore) SetReportErrorFunc(f func(string, error)) {
if s.SetReportErrorFuncPassthrough.Load() {
s.st.SetReportErrorFunc(f)
return
}
s.Called(f)
}
func (s *TestStore) Get(req common.GetPrm) (common.GetRes, error) {
if s.GetPassthrough.Load() {
return s.st.Get(req)
}
ret := s.Called(req)
return ret.Get(0).(common.GetRes), ret.Error(1)
}
func (s *TestStore) GetRange(req common.GetRangePrm) (common.GetRangeRes, error) {
if s.GetRangePassthrough.Load() {
return s.st.GetRange(req)
}
ret := s.Called(req)
return ret.Get(0).(common.GetRangeRes), ret.Error(1)
}
func (s *TestStore) Exists(req common.ExistsPrm) (common.ExistsRes, error) {
if s.ExistsPassthrough.Load() {
return s.st.Exists(req)
}
ret := s.Called(req)
if res := ret.Get(0); res != nil {
return res.(common.ExistsRes), ret.Error(1)
}
return common.ExistsRes{}, ret.Error(1)
}
func (s *TestStore) Put(req common.PutPrm) (common.PutRes, error) {
if s.PutPassthrough.Load() {
return s.st.Put(req)
}
ret := s.Called(req)
return ret.Get(0).(common.PutRes), ret.Error(1)
}
func (s *TestStore) Delete(req common.DeletePrm) (common.DeleteRes, error) {
if s.DeletePassthrough.Load() {
return s.st.Delete(req)
}
ret := s.Called(req)
return ret.Get(0).(common.DeleteRes), ret.Error(1)
}
func (s *TestStore) Iterate(req common.IteratePrm) (common.IterateRes, error) {
if s.IteratePassthrough.Load() {
return s.st.Iterate(req)
}
ret := s.Called(req)
return ret.Get(0).(common.IterateRes), ret.Error(1)
}

View file

@ -3,14 +3,17 @@ package engine
import (
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"strconv"
"sync/atomic"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
@ -18,98 +21,134 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
"go.uber.org/zap/zaptest"
)
// TestInitializationFailure checks that shard is initialized and closed even if media
// under any single component is absent. We emulate this with permission denied error.
// under any single component is absent.
func TestInitializationFailure(t *testing.T) {
type paths struct {
blobstor string
metabase string
writecache string
pilorama string
type openFileFunc func(string, int, fs.FileMode) (*os.File, error)
type testShardOpts struct {
openFileMetabase openFileFunc
openFileWriteCache openFileFunc
openFilePilorama openFileFunc
}
existsDir := filepath.Join(t.TempDir(), "shard")
badDir := filepath.Join(t.TempDir(), "missing")
testShard := func(c paths) []shard.Option {
testShard := func(opts testShardOpts) ([]shard.Option, *teststore.TestStore, *teststore.TestStore) {
sid, err := generateShardID()
require.NoError(t, err)
tempDir := t.TempDir()
blobstorPath := filepath.Join(tempDir, "bs")
metabasePath := filepath.Join(tempDir, "mb")
writecachePath := filepath.Join(tempDir, "wc")
piloramaPath := filepath.Join(tempDir, "pl")
storages, smallFileStorage, largeFileStorage := newTestStorages(blobstorPath, 1<<20)
return []shard.Option{
shard.WithID(sid),
shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
shard.WithBlobStorOptions(
blobstor.WithStorages(
newStorages(c.blobstor, 1<<20))),
blobstor.WithStorages(storages)),
shard.WithMetaBaseOptions(
meta.WithBoltDBOptions(&bbolt.Options{
Timeout: 100 * time.Millisecond,
Timeout: 100 * time.Millisecond,
OpenFile: opts.openFileMetabase,
}),
meta.WithPath(c.metabase),
meta.WithPath(metabasePath),
meta.WithPermissions(0700),
meta.WithEpochState(epochState{})),
shard.WithWriteCache(true),
shard.WithWriteCacheOptions(writecache.WithPath(c.writecache)),
shard.WithPiloramaOptions(pilorama.WithPath(c.pilorama)),
}
shard.WithWriteCacheOptions(
writecache.WithPath(writecachePath),
writecache.WithOpenFile(opts.openFileWriteCache),
),
shard.WithPiloramaOptions(
pilorama.WithPath(piloramaPath),
pilorama.WithOpenFile(opts.openFilePilorama),
),
}, smallFileStorage, largeFileStorage
}
t.Run("blobstor", func(t *testing.T) {
badDir := filepath.Join(badDir, t.Name())
require.NoError(t, os.MkdirAll(badDir, os.ModePerm))
require.NoError(t, os.Chmod(badDir, 0))
testEngineFailInitAndReload(t, badDir, false, testShard(paths{
blobstor: filepath.Join(badDir, "0"),
metabase: filepath.Join(existsDir, t.Name(), "1"),
writecache: filepath.Join(existsDir, t.Name(), "2"),
pilorama: filepath.Join(existsDir, t.Name(), "3"),
}))
shardOpts, _, largeFileStorage := testShard(testShardOpts{
openFileMetabase: os.OpenFile,
openFileWriteCache: os.OpenFile,
openFilePilorama: os.OpenFile,
})
largeFileStorage.OpenPassthrough.Store(false)
largeFileStorage.On("Open", mock.Anything).Return(teststore.ErrDiskExploded)
beforeReload := func() {
largeFileStorage.OpenPassthrough.Store(true)
}
testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
})
t.Run("metabase", func(t *testing.T) {
badDir := filepath.Join(badDir, t.Name())
require.NoError(t, os.MkdirAll(badDir, os.ModePerm))
require.NoError(t, os.Chmod(badDir, 0))
testEngineFailInitAndReload(t, badDir, true, testShard(paths{
blobstor: filepath.Join(existsDir, t.Name(), "0"),
metabase: filepath.Join(badDir, "1"),
writecache: filepath.Join(existsDir, t.Name(), "2"),
pilorama: filepath.Join(existsDir, t.Name(), "3"),
}))
var openFileMetabaseSucceed atomic.Bool
openFileMetabase := func(p string, f int, mode fs.FileMode) (*os.File, error) {
if openFileMetabaseSucceed.Load() {
return os.OpenFile(p, f, mode)
}
return nil, teststore.ErrDiskExploded
}
beforeReload := func() {
openFileMetabaseSucceed.Store(true)
}
shardOpts, _, _ := testShard(testShardOpts{
openFileMetabase: openFileMetabase,
openFileWriteCache: os.OpenFile,
openFilePilorama: os.OpenFile,
})
testEngineFailInitAndReload(t, true, shardOpts, beforeReload)
})
t.Run("write-cache", func(t *testing.T) {
badDir := filepath.Join(badDir, t.Name())
require.NoError(t, os.MkdirAll(badDir, os.ModePerm))
require.NoError(t, os.Chmod(badDir, 0))
testEngineFailInitAndReload(t, badDir, false, testShard(paths{
blobstor: filepath.Join(existsDir, t.Name(), "0"),
metabase: filepath.Join(existsDir, t.Name(), "1"),
writecache: filepath.Join(badDir, "2"),
pilorama: filepath.Join(existsDir, t.Name(), "3"),
}))
var openFileWriteCacheSucceed atomic.Bool
openFileWriteCache := func(p string, f int, mode fs.FileMode) (*os.File, error) {
if openFileWriteCacheSucceed.Load() {
return os.OpenFile(p, f, mode)
}
return nil, teststore.ErrDiskExploded
}
beforeReload := func() {
openFileWriteCacheSucceed.Store(true)
}
shardOpts, _, _ := testShard(testShardOpts{
openFileMetabase: os.OpenFile,
openFileWriteCache: openFileWriteCache,
openFilePilorama: os.OpenFile,
})
testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
})
t.Run("pilorama", func(t *testing.T) {
badDir := filepath.Join(badDir, t.Name())
require.NoError(t, os.MkdirAll(badDir, os.ModePerm))
require.NoError(t, os.Chmod(badDir, 0))
testEngineFailInitAndReload(t, badDir, false, testShard(paths{
blobstor: filepath.Join(existsDir, t.Name(), "0"),
metabase: filepath.Join(existsDir, t.Name(), "1"),
writecache: filepath.Join(existsDir, t.Name(), "2"),
pilorama: filepath.Join(badDir, "3"),
}))
var openFilePiloramaSucceed atomic.Bool
openFilePilorama := func(p string, f int, mode fs.FileMode) (*os.File, error) {
if openFilePiloramaSucceed.Load() {
return os.OpenFile(p, f, mode)
}
return nil, teststore.ErrDiskExploded
}
beforeReload := func() {
openFilePiloramaSucceed.Store(true)
}
shardOpts, _, _ := testShard(testShardOpts{
openFileMetabase: os.OpenFile,
openFileWriteCache: os.OpenFile,
openFilePilorama: openFilePilorama,
})
testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
})
}
func testEngineFailInitAndReload(t *testing.T, badDir string, errOnAdd bool, s []shard.Option) {
func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Option, beforeReload func()) {
var configID string
e := New()
_, err := e.AddShard(s...)
_, err := e.AddShard(opts...)
if errOnAdd {
require.Error(t, err)
// This branch is only taken when we cannot update shard ID in the metabase.
@ -138,9 +177,10 @@ func testEngineFailInitAndReload(t *testing.T, badDir string, errOnAdd bool, s [
e.mtx.RUnlock()
require.Equal(t, 0, shardCount)
require.NoError(t, os.Chmod(badDir, os.ModePerm))
beforeReload()
require.NoError(t, e.Reload(ReConfiguration{
shards: map[string][]shard.Option{configID: s},
shards: map[string][]shard.Option{configID: opts},
}))
e.mtx.RLock()
@ -192,26 +232,28 @@ func TestPersistentShardID(t *testing.T) {
dir, err := os.MkdirTemp("", "*")
require.NoError(t, err)
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
te := newEngineWithErrorThreshold(t, dir, 1)
checkShardState(t, e, id[0], 0, mode.ReadWrite)
require.NoError(t, e.Close())
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
require.NoError(t, te.ng.Close())
e, _, newID := newEngineWithErrorThreshold(t, dir, 1)
require.Equal(t, id, newID)
require.NoError(t, e.Close())
newTe := newEngineWithErrorThreshold(t, dir, 1)
for i := 0; i < len(newTe.shards); i++ {
require.Equal(t, te.shards[i].id, newTe.shards[i].id)
}
require.NoError(t, newTe.ng.Close())
p1 := e.shards[id[0].String()].Shard.DumpInfo().MetaBaseInfo.Path
p2 := e.shards[id[1].String()].Shard.DumpInfo().MetaBaseInfo.Path
p1 := newTe.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
p2 := newTe.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
tmp := filepath.Join(dir, "tmp")
require.NoError(t, os.Rename(p1, tmp))
require.NoError(t, os.Rename(p2, p1))
require.NoError(t, os.Rename(tmp, p2))
e, _, newID = newEngineWithErrorThreshold(t, dir, 1)
require.Equal(t, id[1], newID[0])
require.Equal(t, id[0], newID[1])
require.NoError(t, e.Close())
newTe = newEngineWithErrorThreshold(t, dir, 1)
require.Equal(t, te.shards[1].id, newTe.shards[0].id)
require.Equal(t, te.shards[0].id, newTe.shards[1].id)
require.NoError(t, newTe.ng.Close())
}

View file

@ -9,6 +9,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
@ -120,6 +121,30 @@ func newStorages(root string, smallSize uint64) []blobstor.SubStorage {
}
}
func newTestStorages(root string, smallSize uint64) ([]blobstor.SubStorage, *teststore.TestStore, *teststore.TestStore) {
smallFileStorage := teststore.NewFromStorage(blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithRootPath(filepath.Join(root, "blobovnicza")),
blobovniczatree.WithBlobovniczaShallowDepth(1),
blobovniczatree.WithBlobovniczaShallowWidth(1),
blobovniczatree.WithPermissions(0700)),
)
largeFileStorage := teststore.NewFromStorage(fstree.New(
fstree.WithPath(root),
fstree.WithDepth(1),
))
return []blobstor.SubStorage{
{
Storage: smallFileStorage,
Policy: func(_ *object.Object, data []byte) bool {
return uint64(len(data)) < smallSize
},
},
{
Storage: largeFileStorage,
},
}, smallFileStorage, largeFileStorage
}
func testNewShard(t testing.TB, id int) *shard.Shard {
sid, err := generateShardID()
require.NoError(t, err)

View file

@ -9,6 +9,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
@ -17,13 +19,26 @@ import (
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
const errSmallSize = 256
func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) {
type testEngine struct {
ng *StorageEngine
dir string
shards [2]*testShard
}
type testShard struct {
id *shard.ID
smallFileStorage *teststore.TestStore
largeFileStorage *teststore.TestStore
}
func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) *testEngine {
if dir == "" {
var err error
@ -37,14 +52,13 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
WithShardPoolSize(1),
WithErrorThreshold(errThreshold))
var ids [2]*shard.ID
var err error
var testShards [2]*testShard
for i := range ids {
ids[i], err = e.AddShard(
for i := range testShards {
storages, smallFileStorage, largeFileStorage := newTestStorages(filepath.Join(dir, strconv.Itoa(i)), errSmallSize)
id, err := e.AddShard(
shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
shard.WithBlobStorOptions(
blobstor.WithStorages(newStorages(filepath.Join(dir, strconv.Itoa(i)), errSmallSize))),
shard.WithBlobStorOptions(blobstor.WithStorages(storages)),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
meta.WithPermissions(0700),
@ -54,94 +68,109 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", i))),
pilorama.WithPerm(0700)))
require.NoError(t, err)
testShards[i] = &testShard{
id: id,
smallFileStorage: smallFileStorage,
largeFileStorage: largeFileStorage,
}
}
require.NoError(t, e.Open())
require.NoError(t, e.Init())
return e, dir, ids
return &testEngine{
ng: e,
dir: dir,
shards: testShards,
}
}
func TestErrorReporting(t *testing.T) {
t.Run("ignore errors by default", func(t *testing.T) {
e, dir, id := newEngineWithErrorThreshold(t, "", 0)
te := newEngineWithErrorThreshold(t, "", 0)
obj := generateObjectWithCID(t, cidtest.ID())
obj.SetPayload(make([]byte, errSmallSize))
var prm shard.PutPrm
prm.SetObject(obj)
e.mtx.RLock()
_, err := e.shards[id[0].String()].Shard.Put(prm)
e.mtx.RUnlock()
te.ng.mtx.RLock()
_, err := te.ng.shards[te.shards[0].id.String()].Shard.Put(prm)
te.ng.mtx.RUnlock()
require.NoError(t, err)
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
require.NoError(t, err)
checkShardState(t, e, id[0], 0, mode.ReadWrite)
checkShardState(t, e, id[1], 0, mode.ReadWrite)
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
corruptSubDir(t, filepath.Join(dir, "0"))
for _, shard := range te.shards {
shard.largeFileStorage.GetPassthrough.Store(false)
shard.largeFileStorage.On("Get", mock.Anything).Return(common.GetRes{}, teststore.ErrDiskExploded)
}
for i := uint32(1); i < 3; i++ {
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
require.Error(t, err)
checkShardState(t, e, id[0], i, mode.ReadWrite)
checkShardState(t, e, id[1], 0, mode.ReadWrite)
checkShardState(t, te.ng, te.shards[0].id, i, mode.ReadWrite)
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
}
})
t.Run("with error threshold", func(t *testing.T) {
const errThreshold = 3
e, dir, id := newEngineWithErrorThreshold(t, "", errThreshold)
te := newEngineWithErrorThreshold(t, "", errThreshold)
obj := generateObjectWithCID(t, cidtest.ID())
obj.SetPayload(make([]byte, errSmallSize))
var prm shard.PutPrm
prm.SetObject(obj)
e.mtx.RLock()
_, err := e.shards[id[0].String()].Put(prm)
e.mtx.RUnlock()
te.ng.mtx.RLock()
_, err := te.ng.shards[te.shards[0].id.String()].Put(prm)
te.ng.mtx.RUnlock()
require.NoError(t, err)
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
require.NoError(t, err)
checkShardState(t, e, id[0], 0, mode.ReadWrite)
checkShardState(t, e, id[1], 0, mode.ReadWrite)
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
corruptSubDir(t, filepath.Join(dir, "0"))
for _, shard := range te.shards {
shard.largeFileStorage.GetPassthrough.Store(false)
shard.largeFileStorage.On("Get", mock.Anything).Return(common.GetRes{}, teststore.ErrDiskExploded)
}
for i := uint32(1); i < errThreshold; i++ {
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
require.Error(t, err)
checkShardState(t, e, id[0], i, mode.ReadWrite)
checkShardState(t, e, id[1], 0, mode.ReadWrite)
checkShardState(t, te.ng, te.shards[0].id, i, mode.ReadWrite)
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
}
for i := uint32(0); i < 2; i++ {
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
require.Error(t, err)
checkShardState(t, e, id[0], errThreshold+i, mode.DegradedReadOnly)
checkShardState(t, e, id[1], 0, mode.ReadWrite)
checkShardState(t, te.ng, te.shards[0].id, errThreshold+i, mode.DegradedReadOnly)
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
}
require.NoError(t, e.SetShardMode(id[0], mode.ReadWrite, false))
checkShardState(t, e, id[0], errThreshold+1, mode.ReadWrite)
require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, false))
checkShardState(t, te.ng, te.shards[0].id, errThreshold+1, mode.ReadWrite)
require.NoError(t, e.SetShardMode(id[0], mode.ReadWrite, true))
checkShardState(t, e, id[0], 0, mode.ReadWrite)
require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, true))
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
})
}
// Issue #1186.
func TestBlobstorFailback(t *testing.T) {
dir, err := os.MkdirTemp("", "*")
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, os.RemoveAll(dir)) })
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
te := newEngineWithErrorThreshold(t, dir, 1)
objs := make([]*objectSDK.Object, 0, 2)
for _, size := range []int{15, errSmallSize + 1} {
@ -150,49 +179,49 @@ func TestBlobstorFailback(t *testing.T) {
var prm shard.PutPrm
prm.SetObject(obj)
e.mtx.RLock()
_, err = e.shards[id[0].String()].Shard.Put(prm)
e.mtx.RUnlock()
te.ng.mtx.RLock()
_, err = te.ng.shards[te.shards[0].id.String()].Shard.Put(prm)
te.ng.mtx.RUnlock()
require.NoError(t, err)
objs = append(objs, obj)
}
for i := range objs {
addr := object.AddressOf(objs[i])
_, err = e.Get(GetPrm{addr: addr})
_, err = te.ng.Get(GetPrm{addr: addr})
require.NoError(t, err)
_, err = e.GetRange(RngPrm{addr: addr})
_, err = te.ng.GetRange(RngPrm{addr: addr})
require.NoError(t, err)
}
checkShardState(t, e, id[0], 0, mode.ReadWrite)
require.NoError(t, e.Close())
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
require.NoError(t, te.ng.Close())
p1 := e.shards[id[0].String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
p2 := e.shards[id[1].String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
p1 := te.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
p2 := te.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
tmp := filepath.Join(dir, "tmp")
require.NoError(t, os.Rename(p1, tmp))
require.NoError(t, os.Rename(p2, p1))
require.NoError(t, os.Rename(tmp, p2))
e, _, id = newEngineWithErrorThreshold(t, dir, 1)
te = newEngineWithErrorThreshold(t, dir, 1)
for i := range objs {
addr := object.AddressOf(objs[i])
getRes, err := e.Get(GetPrm{addr: addr})
getRes, err := te.ng.Get(GetPrm{addr: addr})
require.NoError(t, err)
require.Equal(t, objs[i], getRes.Object())
rngRes, err := e.GetRange(RngPrm{addr: addr, off: 1, ln: 10})
rngRes, err := te.ng.GetRange(RngPrm{addr: addr, off: 1, ln: 10})
require.NoError(t, err)
require.Equal(t, objs[i].Payload()[1:11], rngRes.Object().Payload())
_, err = e.GetRange(RngPrm{addr: addr, off: errSmallSize + 10, ln: 1})
_, err = te.ng.GetRange(RngPrm{addr: addr, off: errSmallSize + 10, ln: 1})
require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{})
}
checkShardState(t, e, id[0], 1, mode.DegradedReadOnly)
checkShardState(t, e, id[1], 0, mode.ReadWrite)
checkShardState(t, te.ng, te.shards[0].id, 1, mode.DegradedReadOnly)
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
}
func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode mode.Mode) {
@ -203,19 +232,3 @@ func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint
require.Equal(t, errCount, sh.errorCount.Load())
require.Equal(t, mode, sh.GetMode())
}
// corruptSubDir makes random directory except "blobovnicza" in blobstor FSTree unreadable.
func corruptSubDir(t *testing.T, dir string) {
de, err := os.ReadDir(dir)
require.NoError(t, err)
// FIXME(@cthulhu-rider): copy-paste of unexported const from blobstor package, see #1407
const dirBlobovnicza = "blobovnicza"
for i := range de {
if de[i].IsDir() && de[i].Name() != dirBlobovnicza {
require.NoError(t, os.Chmod(filepath.Join(dir, de[i].Name()), 0))
return
}
}
}

View file

@ -22,7 +22,7 @@ func BenchmarkTreeVsSearch(b *testing.B) {
}
func benchmarkTreeVsSearch(b *testing.B, objCount int) {
e, _, _ := newEngineWithErrorThreshold(b, "", 0)
te := newEngineWithErrorThreshold(b, "", 0)
cid := cidtest.ID()
d := pilorama.CIDDescriptor{CID: cid, Position: 0, Size: 1}
treeID := "someTree"
@ -30,11 +30,11 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) {
for i := 0; i < objCount; i++ {
obj := generateObjectWithCID(b, cid)
addAttribute(obj, pilorama.AttributeFilename, strconv.Itoa(i))
err := Put(e, obj)
err := Put(te.ng, obj)
if err != nil {
b.Fatal(err)
}
_, err = e.TreeAddByPath(d, treeID, pilorama.AttributeFilename, nil,
_, err = te.ng.TreeAddByPath(d, treeID, pilorama.AttributeFilename, nil,
[]pilorama.KeyValue{{pilorama.AttributeFilename, []byte(strconv.Itoa(i))}})
if err != nil {
b.Fatal(err)
@ -50,7 +50,7 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) {
prm.WithFilters(fs)
for i := 0; i < b.N; i++ {
res, err := e.Select(prm)
res, err := te.ng.Select(prm)
if err != nil {
b.Fatal(err)
}
@ -61,7 +61,7 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) {
})
b.Run("TreeGetByPath", func(b *testing.B) {
for i := 0; i < b.N; i++ {
nodes, err := e.TreeGetByPath(cid, treeID, pilorama.AttributeFilename, []string{strconv.Itoa(objCount / 2)}, true)
nodes, err := te.ng.TreeGetByPath(cid, treeID, pilorama.AttributeFilename, []string{strconv.Itoa(objCount / 2)}, true)
if err != nil {
b.Fatal(err)
}

View file

@ -66,6 +66,7 @@ func NewBoltForest(opts ...Option) ForestStorage {
perm: os.ModePerm,
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
maxBatchSize: bbolt.DefaultMaxBatchSize,
openFile: os.OpenFile,
},
}
@ -107,6 +108,7 @@ func (t *boltForest) Open(readOnly bool) error {
opts.ReadOnly = readOnly
opts.NoSync = t.noSync
opts.Timeout = 100 * time.Millisecond
opts.OpenFile = t.openFile
t.db, err = bbolt.Open(t.path, t.perm, &opts)
if err != nil {

View file

@ -2,6 +2,7 @@ package pilorama
import (
"io/fs"
"os"
"time"
)
@ -13,6 +14,7 @@ type cfg struct {
noSync bool
maxBatchDelay time.Duration
maxBatchSize int
openFile func(string, int, fs.FileMode) (*os.File, error)
}
func WithPath(path string) Option {
@ -44,3 +46,9 @@ func WithMaxBatchSize(size int) Option {
c.maxBatchSize = size
}
}
func WithOpenFile(openFile func(string, int, fs.FileMode) (*os.File, error)) Option {
return func(c *cfg) {
c.openFile = openFile
}
}

View file

@ -1,14 +1,18 @@
package shard
import (
"io/fs"
"math"
"os"
"path/filepath"
"sync/atomic"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
@ -22,6 +26,7 @@ import (
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
"go.uber.org/zap/zaptest"
)
@ -40,19 +45,32 @@ func TestShardOpen(t *testing.T) {
dir := t.TempDir()
metaPath := filepath.Join(dir, "meta")
st := teststore.NewFromStorage(fstree.New(
fstree.WithDirNameLen(2),
fstree.WithPath(filepath.Join(dir, "blob")),
fstree.WithDepth(1)),
)
var allowedMode atomic.Int64
openFileMetabase := func(p string, f int, perm fs.FileMode) (*os.File, error) {
if int64(f&3) == allowedMode.Load() {
return os.OpenFile(p, f, perm)
}
return nil, teststore.ErrDiskExploded
}
newShard := func() *Shard {
return New(
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
WithBlobStorOptions(
blobstor.WithStorages([]blobstor.SubStorage{
{
Storage: fstree.New(
fstree.WithDirNameLen(2),
fstree.WithPath(filepath.Join(dir, "blob")),
fstree.WithDepth(1)),
},
{Storage: st},
})),
WithMetaBaseOptions(meta.WithPath(metaPath), meta.WithEpochState(epochState{})),
WithMetaBaseOptions(
meta.WithPath(metaPath),
meta.WithEpochState(epochState{}),
meta.WithBoltDBOptions(&bbolt.Options{OpenFile: openFileMetabase}),
),
WithPiloramaOptions(
pilorama.WithPath(filepath.Join(dir, "pilorama"))),
WithWriteCache(true),
@ -60,6 +78,8 @@ func TestShardOpen(t *testing.T) {
writecache.WithPath(filepath.Join(dir, "wc"))))
}
allowedMode.Store(int64(os.O_RDWR))
sh := newShard()
require.NoError(t, sh.Open())
require.NoError(t, sh.Init())
@ -67,7 +87,8 @@ func TestShardOpen(t *testing.T) {
require.NoError(t, sh.Close())
// Metabase can be opened in read-only => start in ReadOnly mode.
require.NoError(t, os.Chmod(metaPath, 0444))
allowedMode.Store(int64(os.O_RDONLY))
sh = newShard()
require.NoError(t, sh.Open())
require.NoError(t, sh.Init())
@ -77,7 +98,8 @@ func TestShardOpen(t *testing.T) {
require.NoError(t, sh.Close())
// Metabase is corrupted => start in DegradedReadOnly mode.
require.NoError(t, os.Chmod(metaPath, 0000))
allowedMode.Store(math.MaxInt64)
sh = newShard()
require.NoError(t, sh.Open())
require.NoError(t, sh.Init())

View file

@ -46,12 +46,13 @@ func TestFlush(t *testing.T) {
require.NoError(t, mb.Open(false))
require.NoError(t, mb.Init())
fsTree := fstree.New(
fstree.WithPath(filepath.Join(dir, "blob")),
fstree.WithDepth(0),
fstree.WithDirNameLen(1))
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
{Storage: fsTree},
{
Storage: fstree.New(
fstree.WithPath(filepath.Join(dir, "blob")),
fstree.WithDepth(0),
fstree.WithDirNameLen(1)),
},
}))
require.NoError(t, bs.Open(false))
require.NoError(t, bs.Init())
@ -208,7 +209,7 @@ func TestFlush(t *testing.T) {
_, err = os.Stat(p) // sanity check
require.NoError(t, err)
require.NoError(t, os.Chmod(p, 0))
require.NoError(t, os.Truncate(p, 0))
})
})
t.Run("fs, invalid object", func(t *testing.T) {

View file

@ -1,6 +1,8 @@
package writecache
import (
"io/fs"
"os"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@ -55,6 +57,8 @@ type options struct {
noSync bool
// reportError is the function called when encountering disk errors in background workers.
reportError func(string, error)
// openFile is the function called internally by bbolt to open database files. Useful for hermetic testing.
openFile func(string, int, fs.FileMode) (*os.File, error)
}
// WithLogger sets logger.
@ -152,3 +156,10 @@ func WithReportErrorFunc(f func(string, error)) Option {
o.reportError = f
}
}
// WithOpenFile sets the OpenFile function to use internally by bolt. Useful for hermetic testing.
func WithOpenFile(f func(string, int, fs.FileMode) (*os.File, error)) Option {
return func(o *options) {
o.openFile = f
}
}

View file

@ -43,7 +43,7 @@ func (c *cache) openStore(readOnly bool) error {
return err
}
c.db, err = OpenDB(c.path, readOnly)
c.db, err = OpenDB(c.path, readOnly, c.openFile)
if err != nil {
return fmt.Errorf("could not open database: %w", err)
}

View file

@ -1,6 +1,7 @@
package writecache
import (
"io/fs"
"os"
"path/filepath"
"time"
@ -9,10 +10,11 @@ import (
)
// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true.
func OpenDB(p string, ro bool) (*bbolt.DB, error) {
func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) {
return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{
NoFreelistSync: true,
ReadOnly: ro,
Timeout: 100 * time.Millisecond,
OpenFile: openFile,
})
}

View file

@ -1,6 +1,7 @@
package writecache
import (
"os"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@ -106,6 +107,7 @@ func New(opts ...Option) Cache {
maxCacheSize: defaultMaxCacheSize,
maxBatchSize: bbolt.DefaultMaxBatchSize,
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
openFile: os.OpenFile,
},
}