[#139] test: Add test storage implementation #160
20 changed files with 502 additions and 206 deletions
|
@ -1,6 +1,8 @@
|
||||||
package writecache
|
package writecache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"os"
|
||||||
|
|
||||||
common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal"
|
common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
@ -24,7 +26,7 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func openWC(cmd *cobra.Command) *bbolt.DB {
|
func openWC(cmd *cobra.Command) *bbolt.DB {
|
||||||
db, err := writecache.OpenDB(vPath, true)
|
db, err := writecache.OpenDB(vPath, true, os.OpenFile)
|
||||||
common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
|
common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
|
||||||
|
|
||||||
return db
|
return db
|
||||||
|
|
3
go.mod
3
go.mod
|
@ -29,7 +29,7 @@ require (
|
||||||
github.com/spf13/cobra v1.6.1
|
github.com/spf13/cobra v1.6.1
|
||||||
github.com/spf13/pflag v1.0.5
|
github.com/spf13/pflag v1.0.5
|
||||||
github.com/spf13/viper v1.14.0
|
github.com/spf13/viper v1.14.0
|
||||||
github.com/stretchr/testify v1.8.1
|
github.com/stretchr/testify v1.8.2
|
||||||
go.etcd.io/bbolt v1.3.6
|
go.etcd.io/bbolt v1.3.6
|
||||||
go.uber.org/atomic v1.10.0
|
go.uber.org/atomic v1.10.0
|
||||||
go.uber.org/zap v1.24.0
|
go.uber.org/zap v1.24.0
|
||||||
|
@ -87,6 +87,7 @@ require (
|
||||||
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
||||||
github.com/spf13/afero v1.9.2 // indirect
|
github.com/spf13/afero v1.9.2 // indirect
|
||||||
github.com/spf13/jwalterweatherman v1.1.0 // indirect
|
github.com/spf13/jwalterweatherman v1.1.0 // indirect
|
||||||
|
github.com/stretchr/objx v0.5.0 // indirect
|
||||||
github.com/subosito/gotenv v1.4.1 // indirect
|
github.com/subosito/gotenv v1.4.1 // indirect
|
||||||
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 // indirect
|
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 // indirect
|
||||||
github.com/twmb/murmur3 v1.1.5 // indirect
|
github.com/twmb/murmur3 v1.1.5 // indirect
|
||||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -50,7 +50,6 @@ func TestExistsInvalidStorageID(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("invalid storage id", func(t *testing.T) {
|
t.Run("invalid storage id", func(t *testing.T) {
|
||||||
// "0/X/Y" <-> "1/X/Y"
|
|
||||||
storageID := slice.Copy(putRes.StorageID)
|
storageID := slice.Copy(putRes.StorageID)
|
||||||
storageID[0] = '9'
|
storageID[0] = '9'
|
||||||
badDir := filepath.Join(dir, "9")
|
badDir := filepath.Join(dir, "9")
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package blobstor
|
package blobstor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
@ -9,32 +8,37 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
const blobovniczaDir = "blobovniczas"
|
func defaultTestStorages(p string, smallSizeLimit uint64) ([]SubStorage, *teststore.TestStore, *teststore.TestStore) {
|
||||||
|
smallFileStorage := teststore.NewFromStorage(blobovniczatree.NewBlobovniczaTree(
|
||||||
func defaultStorages(p string, smallSizeLimit uint64) []SubStorage {
|
|
||||||
return []SubStorage{
|
|
||||||
{
|
|
||||||
Storage: blobovniczatree.NewBlobovniczaTree(
|
|
||||||
blobovniczatree.WithRootPath(filepath.Join(p, "blobovniczas")),
|
blobovniczatree.WithRootPath(filepath.Join(p, "blobovniczas")),
|
||||||
blobovniczatree.WithBlobovniczaShallowWidth(1)), // default width is 16, slow init
|
blobovniczatree.WithBlobovniczaShallowWidth(1)), // default width is 16, slow init
|
||||||
|
)
|
||||||
|
largeFileStorage := teststore.NewFromStorage(fstree.New(fstree.WithPath(p)))
|
||||||
|
return []SubStorage{
|
||||||
|
{
|
||||||
|
Storage: smallFileStorage,
|
||||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||||
return uint64(len(data)) <= smallSizeLimit
|
return uint64(len(data)) <= smallSizeLimit
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Storage: fstree.New(fstree.WithPath(p)),
|
Storage: largeFileStorage,
|
||||||
},
|
},
|
||||||
}
|
}, smallFileStorage, largeFileStorage
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultStorages(p string, smallSizeLimit uint64) []SubStorage {
|
||||||
|
storages, _, _ := defaultTestStorages(p, smallSizeLimit)
|
||||||
|
return storages
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCompression(t *testing.T) {
|
func TestCompression(t *testing.T) {
|
||||||
dir, err := os.MkdirTemp("", "frostfs*")
|
dir := t.TempDir()
|
||||||
require.NoError(t, err)
|
|
||||||
t.Cleanup(func() { _ = os.RemoveAll(dir) })
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
smallSizeLimit = 512
|
smallSizeLimit = 512
|
||||||
|
@ -70,7 +74,7 @@ func TestCompression(t *testing.T) {
|
||||||
testPut := func(t *testing.T, b *BlobStor, i int) {
|
testPut := func(t *testing.T, b *BlobStor, i int) {
|
||||||
var prm common.PutPrm
|
var prm common.PutPrm
|
||||||
prm.Object = smallObj[i]
|
prm.Object = smallObj[i]
|
||||||
_, err = b.Put(prm)
|
_, err := b.Put(prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
prm = common.PutPrm{}
|
prm = common.PutPrm{}
|
||||||
|
@ -102,9 +106,7 @@ func TestCompression(t *testing.T) {
|
||||||
func TestBlobstor_needsCompression(t *testing.T) {
|
func TestBlobstor_needsCompression(t *testing.T) {
|
||||||
const smallSizeLimit = 512
|
const smallSizeLimit = 512
|
||||||
newBlobStor := func(t *testing.T, compress bool, ct ...string) *BlobStor {
|
newBlobStor := func(t *testing.T, compress bool, ct ...string) *BlobStor {
|
||||||
dir, err := os.MkdirTemp("", "frostfs*")
|
dir := t.TempDir()
|
||||||
require.NoError(t, err)
|
|
||||||
t.Cleanup(func() { _ = os.RemoveAll(dir) })
|
|
||||||
|
|
||||||
bs := New(
|
bs := New(
|
||||||
WithCompressObjects(compress),
|
WithCompressObjects(compress),
|
||||||
|
|
|
@ -2,14 +2,15 @@ package blobstor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -20,8 +21,10 @@ func TestExists(t *testing.T) {
|
||||||
|
|
||||||
const smallSizeLimit = 512
|
const smallSizeLimit = 512
|
||||||
|
|
||||||
b := New(
|
storages, _, largeFileStorage := defaultTestStorages(dir, smallSizeLimit)
|
||||||
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
|
||||||
|
b := New(WithStorages(storages))
|
||||||
|
|
||||||
require.NoError(t, b.Open(false))
|
require.NoError(t, b.Open(false))
|
||||||
require.NoError(t, b.Init())
|
require.NoError(t, b.Init())
|
||||||
|
|
||||||
|
@ -33,7 +36,7 @@ func TestExists(t *testing.T) {
|
||||||
for i := range objects {
|
for i := range objects {
|
||||||
var prm common.PutPrm
|
var prm common.PutPrm
|
||||||
prm.Object = objects[i]
|
prm.Object = objects[i]
|
||||||
_, err = b.Put(prm)
|
_, err := b.Put(prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,20 +54,9 @@ func TestExists(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.False(t, res.Exists)
|
require.False(t, res.Exists)
|
||||||
|
|
||||||
t.Run("corrupt direcrory", func(t *testing.T) {
|
t.Run("corrupt directory", func(t *testing.T) {
|
||||||
var bigDir string
|
largeFileStorage.ExistsPassthrough.Store(false)
|
||||||
de, err := os.ReadDir(dir)
|
largeFileStorage.On("Exists", mock.Anything).Return(common.ExistsRes{}, teststore.ErrDiskExploded)
|
||||||
|
|||||||
require.NoError(t, err)
|
|
||||||
for i := range de {
|
|
||||||
if de[i].Name() != blobovniczaDir {
|
|
||||||
bigDir = filepath.Join(dir, de[i].Name())
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
require.NotEmpty(t, bigDir)
|
|
||||||
|
|
||||||
require.NoError(t, os.Chmod(dir, 0))
|
|
||||||
t.Cleanup(func() { require.NoError(t, os.Chmod(dir, 0777)) })
|
|
||||||
|
|
||||||
// Object exists, first error is logged.
|
// Object exists, first error is logged.
|
||||||
prm.Address = objectCore.AddressOf(objects[0])
|
prm.Address = objectCore.AddressOf(objects[0])
|
||||||
|
@ -76,6 +68,7 @@ func TestExists(t *testing.T) {
|
||||||
prm.Address = objectCore.AddressOf(objects[1])
|
prm.Address = objectCore.AddressOf(objects[1])
|
||||||
_, err = b.Exists(prm)
|
_, err = b.Exists(prm)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
|
require.ErrorIs(t, err, teststore.ErrDiskExploded)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,23 +1,16 @@
|
||||||
package blobstor
|
package blobstor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"strconv"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestGeneric(t *testing.T) {
|
func TestGeneric(t *testing.T) {
|
||||||
defer func() { _ = os.RemoveAll(t.Name()) }()
|
|
||||||
|
|
||||||
var n int
|
|
||||||
newMetabase := func(t *testing.T) storagetest.Component {
|
newMetabase := func(t *testing.T) storagetest.Component {
|
||||||
n++
|
|
||||||
dir := filepath.Join(t.Name(), strconv.Itoa(n))
|
|
||||||
return New(
|
return New(
|
||||||
WithStorages(defaultStorages(dir, 128)))
|
WithStorages(defaultStorages(t.TempDir(), 128)))
|
||||||
}
|
}
|
||||||
|
|
||||||
storagetest.TestAll(t, newMetabase)
|
storagetest.TestAll(t, newMetabase)
|
||||||
|
|
178
pkg/local_object_storage/blobstor/teststore/teststore.go
Normal file
178
pkg/local_object_storage/blobstor/teststore/teststore.go
Normal file
|
@ -0,0 +1,178 @@
|
||||||
|
// Package teststore provides a common.Storage implementation for testing/mocking purposes.
|
||||||
|
//
|
||||||
|
// A new teststore.TestStore can be obtained in two ways:
|
||||||
|
// 1. Using teststore.New, a new instance is returned which doesn't pass-through any calls.
|
||||||
|
// Thus, if any of its methods is called without being mocked beforehand, the test will
|
||||||
|
// fail. This is useful for small tests that integrate with common.Storage implementations
|
||||||
|
// and need to expect a couple of method calls only.
|
||||||
|
// 2. Using teststore.NewFromStorage wraps an existing common.Storage implementation which
|
||||||
|
// by default will pass-though all calls to the underlying storage. Individual pass-through
|
||||||
|
// can be disabled and mocked for individual methods. This is useful for larger tests that
|
||||||
|
// integrate with common.Storage implementations where a few selected calls need to be mocked
|
||||||
|
// for e.g. test expectations or injecting failures.
|
||||||
|
//
|
||||||
|
// For more info on how to mock individual calls, see https://pkg.go.dev/github.com/stretchr/testify/mock.
|
||||||
|
package teststore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestStore is a common.Storage implementation for testing/mocking purposes.
|
||||||
|
type TestStore struct {
|
||||||
|
mock.Mock
|
||||||
|
|
||||||
|
st common.Storage
|
||||||
|
|
||||||
|
// The following flags controls individual method passthough.
|
||||||
|
|
||||||
|
OpenPassthrough atomic.Bool
|
||||||
|
InitPassthrough atomic.Bool
|
||||||
|
ClosePassthrough atomic.Bool
|
||||||
|
TypePassthrough atomic.Bool
|
||||||
|
PathPassthrough atomic.Bool
|
||||||
|
SetCompressorPassthrough atomic.Bool
|
||||||
|
SetReportErrorFuncPassthrough atomic.Bool
|
||||||
|
GetPassthrough atomic.Bool
|
||||||
|
GetRangePassthrough atomic.Bool
|
||||||
|
ExistsPassthrough atomic.Bool
|
||||||
|
PutPassthrough atomic.Bool
|
||||||
|
DeletePassthrough atomic.Bool
|
||||||
|
IteratePassthrough atomic.Bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrDiskExploded is a phony error which can be used for testing purposes to differentiate it from
|
||||||
|
// more common errors.
|
||||||
|
var ErrDiskExploded = errors.New("disk exploded")
|
||||||
|
|
||||||
|
// New returns a teststore.TestStore with no underlying storage and all methods' passthrough disabled.
|
||||||
|
func New() *TestStore {
|
||||||
|
return &TestStore{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// New returns a teststore.TestStore with the given underlying storage and all methods' passthrough enabled.
|
||||||
|
func NewFromStorage(st common.Storage) *TestStore {
|
||||||
|
s := &TestStore{st: st}
|
||||||
|
s.OpenPassthrough.Store(true)
|
||||||
|
s.InitPassthrough.Store(true)
|
||||||
|
s.ClosePassthrough.Store(true)
|
||||||
|
s.TypePassthrough.Store(true)
|
||||||
|
s.PathPassthrough.Store(true)
|
||||||
|
s.SetCompressorPassthrough.Store(true)
|
||||||
|
s.SetReportErrorFuncPassthrough.Store(true)
|
||||||
|
s.GetPassthrough.Store(true)
|
||||||
|
s.GetRangePassthrough.Store(true)
|
||||||
|
s.ExistsPassthrough.Store(true)
|
||||||
|
s.PutPassthrough.Store(true)
|
||||||
|
s.DeletePassthrough.Store(true)
|
||||||
|
s.IteratePassthrough.Store(true)
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TestStore) Open(readOnly bool) error {
|
||||||
|
if s.OpenPassthrough.Load() {
|
||||||
|
return s.st.Open(readOnly)
|
||||||
|
}
|
||||||
|
ret := s.Called(readOnly)
|
||||||
|
return ret.Error(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TestStore) Init() error {
|
||||||
|
if s.InitPassthrough.Load() {
|
||||||
|
return s.st.Init()
|
||||||
|
}
|
||||||
|
return s.Called().Error(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TestStore) Close() error {
|
||||||
|
if s.ClosePassthrough.Load() {
|
||||||
|
return s.st.Close()
|
||||||
|
}
|
||||||
|
return s.Called().Error(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TestStore) Type() string {
|
||||||
|
if s.TypePassthrough.Load() {
|
||||||
|
return s.st.Type()
|
||||||
|
}
|
||||||
|
return s.Called().String(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TestStore) Path() string {
|
||||||
|
if s.PathPassthrough.Load() {
|
||||||
|
return s.st.Path()
|
||||||
|
}
|
||||||
|
return s.Called().String(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TestStore) SetCompressor(cc *compression.Config) {
|
||||||
|
if s.SetCompressorPassthrough.Load() {
|
||||||
|
s.st.SetCompressor(cc)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.Called(cc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TestStore) SetReportErrorFunc(f func(string, error)) {
|
||||||
|
if s.SetReportErrorFuncPassthrough.Load() {
|
||||||
|
s.st.SetReportErrorFunc(f)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.Called(f)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TestStore) Get(req common.GetPrm) (common.GetRes, error) {
|
||||||
|
if s.GetPassthrough.Load() {
|
||||||
|
return s.st.Get(req)
|
||||||
|
}
|
||||||
|
ret := s.Called(req)
|
||||||
|
return ret.Get(0).(common.GetRes), ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TestStore) GetRange(req common.GetRangePrm) (common.GetRangeRes, error) {
|
||||||
|
if s.GetRangePassthrough.Load() {
|
||||||
|
return s.st.GetRange(req)
|
||||||
|
}
|
||||||
|
ret := s.Called(req)
|
||||||
|
return ret.Get(0).(common.GetRangeRes), ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TestStore) Exists(req common.ExistsPrm) (common.ExistsRes, error) {
|
||||||
|
if s.ExistsPassthrough.Load() {
|
||||||
|
return s.st.Exists(req)
|
||||||
|
}
|
||||||
|
ret := s.Called(req)
|
||||||
|
if res := ret.Get(0); res != nil {
|
||||||
|
return res.(common.ExistsRes), ret.Error(1)
|
||||||
|
}
|
||||||
|
return common.ExistsRes{}, ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TestStore) Put(req common.PutPrm) (common.PutRes, error) {
|
||||||
|
if s.PutPassthrough.Load() {
|
||||||
|
return s.st.Put(req)
|
||||||
|
}
|
||||||
|
ret := s.Called(req)
|
||||||
|
return ret.Get(0).(common.PutRes), ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TestStore) Delete(req common.DeletePrm) (common.DeleteRes, error) {
|
||||||
|
if s.DeletePassthrough.Load() {
|
||||||
|
return s.st.Delete(req)
|
||||||
|
}
|
||||||
|
ret := s.Called(req)
|
||||||
|
return ret.Get(0).(common.DeleteRes), ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TestStore) Iterate(req common.IteratePrm) (common.IterateRes, error) {
|
||||||
|
if s.IteratePassthrough.Load() {
|
||||||
|
return s.st.Iterate(req)
|
||||||
|
}
|
||||||
|
ret := s.Called(req)
|
||||||
|
return ret.Get(0).(common.IterateRes), ret.Error(1)
|
||||||
|
}
|
|
@ -3,14 +3,17 @@ package engine
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/fs"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
|
@ -18,98 +21,134 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestInitializationFailure checks that shard is initialized and closed even if media
|
// TestInitializationFailure checks that shard is initialized and closed even if media
|
||||||
// under any single component is absent. We emulate this with permission denied error.
|
// under any single component is absent.
|
||||||
func TestInitializationFailure(t *testing.T) {
|
func TestInitializationFailure(t *testing.T) {
|
||||||
type paths struct {
|
type openFileFunc func(string, int, fs.FileMode) (*os.File, error)
|
||||||
blobstor string
|
|
||||||
metabase string
|
type testShardOpts struct {
|
||||||
writecache string
|
openFileMetabase openFileFunc
|
||||||
pilorama string
|
openFileWriteCache openFileFunc
|
||||||
|
openFilePilorama openFileFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
existsDir := filepath.Join(t.TempDir(), "shard")
|
testShard := func(opts testShardOpts) ([]shard.Option, *teststore.TestStore, *teststore.TestStore) {
|
||||||
badDir := filepath.Join(t.TempDir(), "missing")
|
|
||||||
|
|
||||||
testShard := func(c paths) []shard.Option {
|
|
||||||
sid, err := generateShardID()
|
sid, err := generateShardID()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tempDir := t.TempDir()
|
||||||
|
blobstorPath := filepath.Join(tempDir, "bs")
|
||||||
|
metabasePath := filepath.Join(tempDir, "mb")
|
||||||
|
writecachePath := filepath.Join(tempDir, "wc")
|
||||||
|
piloramaPath := filepath.Join(tempDir, "pl")
|
||||||
|
|
||||||
|
storages, smallFileStorage, largeFileStorage := newTestStorages(blobstorPath, 1<<20)
|
||||||
|
|
||||||
return []shard.Option{
|
return []shard.Option{
|
||||||
shard.WithID(sid),
|
shard.WithID(sid),
|
||||||
shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
||||||
shard.WithBlobStorOptions(
|
shard.WithBlobStorOptions(
|
||||||
blobstor.WithStorages(
|
blobstor.WithStorages(storages)),
|
||||||
newStorages(c.blobstor, 1<<20))),
|
|
||||||
shard.WithMetaBaseOptions(
|
shard.WithMetaBaseOptions(
|
||||||
meta.WithBoltDBOptions(&bbolt.Options{
|
meta.WithBoltDBOptions(&bbolt.Options{
|
||||||
Timeout: 100 * time.Millisecond,
|
Timeout: 100 * time.Millisecond,
|
||||||
|
OpenFile: opts.openFileMetabase,
|
||||||
}),
|
}),
|
||||||
meta.WithPath(c.metabase),
|
meta.WithPath(metabasePath),
|
||||||
meta.WithPermissions(0700),
|
meta.WithPermissions(0700),
|
||||||
meta.WithEpochState(epochState{})),
|
meta.WithEpochState(epochState{})),
|
||||||
shard.WithWriteCache(true),
|
shard.WithWriteCache(true),
|
||||||
shard.WithWriteCacheOptions(writecache.WithPath(c.writecache)),
|
shard.WithWriteCacheOptions(
|
||||||
shard.WithPiloramaOptions(pilorama.WithPath(c.pilorama)),
|
writecache.WithPath(writecachePath),
|
||||||
}
|
writecache.WithOpenFile(opts.openFileWriteCache),
|
||||||
|
),
|
||||||
|
shard.WithPiloramaOptions(
|
||||||
|
pilorama.WithPath(piloramaPath),
|
||||||
|
pilorama.WithOpenFile(opts.openFilePilorama),
|
||||||
|
),
|
||||||
|
}, smallFileStorage, largeFileStorage
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("blobstor", func(t *testing.T) {
|
t.Run("blobstor", func(t *testing.T) {
|
||||||
badDir := filepath.Join(badDir, t.Name())
|
shardOpts, _, largeFileStorage := testShard(testShardOpts{
|
||||||
require.NoError(t, os.MkdirAll(badDir, os.ModePerm))
|
openFileMetabase: os.OpenFile,
|
||||||
require.NoError(t, os.Chmod(badDir, 0))
|
openFileWriteCache: os.OpenFile,
|
||||||
testEngineFailInitAndReload(t, badDir, false, testShard(paths{
|
openFilePilorama: os.OpenFile,
|
||||||
blobstor: filepath.Join(badDir, "0"),
|
})
|
||||||
metabase: filepath.Join(existsDir, t.Name(), "1"),
|
largeFileStorage.OpenPassthrough.Store(false)
|
||||||
writecache: filepath.Join(existsDir, t.Name(), "2"),
|
largeFileStorage.On("Open", mock.Anything).Return(teststore.ErrDiskExploded)
|
||||||
pilorama: filepath.Join(existsDir, t.Name(), "3"),
|
beforeReload := func() {
|
||||||
}))
|
largeFileStorage.OpenPassthrough.Store(true)
|
||||||
|
}
|
||||||
|
testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
|
||||||
})
|
})
|
||||||
t.Run("metabase", func(t *testing.T) {
|
t.Run("metabase", func(t *testing.T) {
|
||||||
badDir := filepath.Join(badDir, t.Name())
|
var openFileMetabaseSucceed atomic.Bool
|
||||||
require.NoError(t, os.MkdirAll(badDir, os.ModePerm))
|
openFileMetabase := func(p string, f int, mode fs.FileMode) (*os.File, error) {
|
||||||
require.NoError(t, os.Chmod(badDir, 0))
|
if openFileMetabaseSucceed.Load() {
|
||||||
testEngineFailInitAndReload(t, badDir, true, testShard(paths{
|
return os.OpenFile(p, f, mode)
|
||||||
blobstor: filepath.Join(existsDir, t.Name(), "0"),
|
}
|
||||||
metabase: filepath.Join(badDir, "1"),
|
return nil, teststore.ErrDiskExploded
|
||||||
writecache: filepath.Join(existsDir, t.Name(), "2"),
|
}
|
||||||
pilorama: filepath.Join(existsDir, t.Name(), "3"),
|
beforeReload := func() {
|
||||||
}))
|
openFileMetabaseSucceed.Store(true)
|
||||||
|
}
|
||||||
|
shardOpts, _, _ := testShard(testShardOpts{
|
||||||
|
openFileMetabase: openFileMetabase,
|
||||||
|
openFileWriteCache: os.OpenFile,
|
||||||
|
openFilePilorama: os.OpenFile,
|
||||||
|
})
|
||||||
|
testEngineFailInitAndReload(t, true, shardOpts, beforeReload)
|
||||||
})
|
})
|
||||||
t.Run("write-cache", func(t *testing.T) {
|
t.Run("write-cache", func(t *testing.T) {
|
||||||
badDir := filepath.Join(badDir, t.Name())
|
var openFileWriteCacheSucceed atomic.Bool
|
||||||
require.NoError(t, os.MkdirAll(badDir, os.ModePerm))
|
openFileWriteCache := func(p string, f int, mode fs.FileMode) (*os.File, error) {
|
||||||
require.NoError(t, os.Chmod(badDir, 0))
|
if openFileWriteCacheSucceed.Load() {
|
||||||
testEngineFailInitAndReload(t, badDir, false, testShard(paths{
|
return os.OpenFile(p, f, mode)
|
||||||
blobstor: filepath.Join(existsDir, t.Name(), "0"),
|
}
|
||||||
metabase: filepath.Join(existsDir, t.Name(), "1"),
|
return nil, teststore.ErrDiskExploded
|
||||||
writecache: filepath.Join(badDir, "2"),
|
}
|
||||||
pilorama: filepath.Join(existsDir, t.Name(), "3"),
|
beforeReload := func() {
|
||||||
}))
|
openFileWriteCacheSucceed.Store(true)
|
||||||
|
}
|
||||||
|
shardOpts, _, _ := testShard(testShardOpts{
|
||||||
|
openFileMetabase: os.OpenFile,
|
||||||
|
openFileWriteCache: openFileWriteCache,
|
||||||
|
openFilePilorama: os.OpenFile,
|
||||||
|
})
|
||||||
|
testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
|
||||||
})
|
})
|
||||||
t.Run("pilorama", func(t *testing.T) {
|
t.Run("pilorama", func(t *testing.T) {
|
||||||
badDir := filepath.Join(badDir, t.Name())
|
var openFilePiloramaSucceed atomic.Bool
|
||||||
require.NoError(t, os.MkdirAll(badDir, os.ModePerm))
|
openFilePilorama := func(p string, f int, mode fs.FileMode) (*os.File, error) {
|
||||||
require.NoError(t, os.Chmod(badDir, 0))
|
if openFilePiloramaSucceed.Load() {
|
||||||
testEngineFailInitAndReload(t, badDir, false, testShard(paths{
|
return os.OpenFile(p, f, mode)
|
||||||
blobstor: filepath.Join(existsDir, t.Name(), "0"),
|
}
|
||||||
metabase: filepath.Join(existsDir, t.Name(), "1"),
|
return nil, teststore.ErrDiskExploded
|
||||||
writecache: filepath.Join(existsDir, t.Name(), "2"),
|
}
|
||||||
pilorama: filepath.Join(badDir, "3"),
|
beforeReload := func() {
|
||||||
}))
|
openFilePiloramaSucceed.Store(true)
|
||||||
|
}
|
||||||
|
shardOpts, _, _ := testShard(testShardOpts{
|
||||||
|
openFileMetabase: os.OpenFile,
|
||||||
|
openFileWriteCache: os.OpenFile,
|
||||||
|
openFilePilorama: openFilePilorama,
|
||||||
|
})
|
||||||
|
testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func testEngineFailInitAndReload(t *testing.T, badDir string, errOnAdd bool, s []shard.Option) {
|
func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Option, beforeReload func()) {
|
||||||
var configID string
|
var configID string
|
||||||
|
|
||||||
e := New()
|
e := New()
|
||||||
_, err := e.AddShard(s...)
|
_, err := e.AddShard(opts...)
|
||||||
if errOnAdd {
|
if errOnAdd {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
// This branch is only taken when we cannot update shard ID in the metabase.
|
// This branch is only taken when we cannot update shard ID in the metabase.
|
||||||
|
@ -138,9 +177,10 @@ func testEngineFailInitAndReload(t *testing.T, badDir string, errOnAdd bool, s [
|
||||||
e.mtx.RUnlock()
|
e.mtx.RUnlock()
|
||||||
require.Equal(t, 0, shardCount)
|
require.Equal(t, 0, shardCount)
|
||||||
|
|
||||||
require.NoError(t, os.Chmod(badDir, os.ModePerm))
|
beforeReload()
|
||||||
|
|
||||||
require.NoError(t, e.Reload(ReConfiguration{
|
require.NoError(t, e.Reload(ReConfiguration{
|
||||||
shards: map[string][]shard.Option{configID: s},
|
shards: map[string][]shard.Option{configID: opts},
|
||||||
}))
|
}))
|
||||||
|
|
||||||
e.mtx.RLock()
|
e.mtx.RLock()
|
||||||
|
@ -192,26 +232,28 @@ func TestPersistentShardID(t *testing.T) {
|
||||||
dir, err := os.MkdirTemp("", "*")
|
dir, err := os.MkdirTemp("", "*")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
|
te := newEngineWithErrorThreshold(t, dir, 1)
|
||||||
|
|
||||||
checkShardState(t, e, id[0], 0, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||||
require.NoError(t, e.Close())
|
require.NoError(t, te.ng.Close())
|
||||||
|
|
||||||
e, _, newID := newEngineWithErrorThreshold(t, dir, 1)
|
newTe := newEngineWithErrorThreshold(t, dir, 1)
|
||||||
require.Equal(t, id, newID)
|
for i := 0; i < len(newTe.shards); i++ {
|
||||||
require.NoError(t, e.Close())
|
require.Equal(t, te.shards[i].id, newTe.shards[i].id)
|
||||||
|
}
|
||||||
|
require.NoError(t, newTe.ng.Close())
|
||||||
|
|
||||||
p1 := e.shards[id[0].String()].Shard.DumpInfo().MetaBaseInfo.Path
|
p1 := newTe.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
|
||||||
p2 := e.shards[id[1].String()].Shard.DumpInfo().MetaBaseInfo.Path
|
p2 := newTe.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
|
||||||
tmp := filepath.Join(dir, "tmp")
|
tmp := filepath.Join(dir, "tmp")
|
||||||
require.NoError(t, os.Rename(p1, tmp))
|
require.NoError(t, os.Rename(p1, tmp))
|
||||||
require.NoError(t, os.Rename(p2, p1))
|
require.NoError(t, os.Rename(p2, p1))
|
||||||
require.NoError(t, os.Rename(tmp, p2))
|
require.NoError(t, os.Rename(tmp, p2))
|
||||||
|
|
||||||
e, _, newID = newEngineWithErrorThreshold(t, dir, 1)
|
newTe = newEngineWithErrorThreshold(t, dir, 1)
|
||||||
require.Equal(t, id[1], newID[0])
|
require.Equal(t, te.shards[1].id, newTe.shards[0].id)
|
||||||
require.Equal(t, id[0], newID[1])
|
require.Equal(t, te.shards[0].id, newTe.shards[1].id)
|
||||||
require.NoError(t, e.Close())
|
require.NoError(t, newTe.ng.Close())
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
|
@ -120,6 +121,30 @@ func newStorages(root string, smallSize uint64) []blobstor.SubStorage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newTestStorages(root string, smallSize uint64) ([]blobstor.SubStorage, *teststore.TestStore, *teststore.TestStore) {
|
||||||
|
smallFileStorage := teststore.NewFromStorage(blobovniczatree.NewBlobovniczaTree(
|
||||||
|
blobovniczatree.WithRootPath(filepath.Join(root, "blobovnicza")),
|
||||||
|
blobovniczatree.WithBlobovniczaShallowDepth(1),
|
||||||
|
blobovniczatree.WithBlobovniczaShallowWidth(1),
|
||||||
|
blobovniczatree.WithPermissions(0700)),
|
||||||
|
)
|
||||||
|
largeFileStorage := teststore.NewFromStorage(fstree.New(
|
||||||
|
fstree.WithPath(root),
|
||||||
|
fstree.WithDepth(1),
|
||||||
|
))
|
||||||
|
return []blobstor.SubStorage{
|
||||||
|
{
|
||||||
|
Storage: smallFileStorage,
|
||||||
|
Policy: func(_ *object.Object, data []byte) bool {
|
||||||
|
return uint64(len(data)) < smallSize
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Storage: largeFileStorage,
|
||||||
|
},
|
||||||
|
}, smallFileStorage, largeFileStorage
|
||||||
|
}
|
||||||
|
|
||||||
func testNewShard(t testing.TB, id int) *shard.Shard {
|
func testNewShard(t testing.TB, id int) *shard.Shard {
|
||||||
sid, err := generateShardID()
|
sid, err := generateShardID()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -9,6 +9,8 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
|
@ -17,13 +19,26 @@ import (
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
)
|
)
|
||||||
|
|
||||||
const errSmallSize = 256
|
const errSmallSize = 256
|
||||||
|
|
||||||
func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) {
|
type testEngine struct {
|
||||||
|
ng *StorageEngine
|
||||||
|
dir string
|
||||||
|
shards [2]*testShard
|
||||||
|
}
|
||||||
|
|
||||||
|
type testShard struct {
|
||||||
|
id *shard.ID
|
||||||
|
smallFileStorage *teststore.TestStore
|
||||||
|
largeFileStorage *teststore.TestStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) *testEngine {
|
||||||
if dir == "" {
|
if dir == "" {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
@ -37,14 +52,13 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
|
||||||
WithShardPoolSize(1),
|
WithShardPoolSize(1),
|
||||||
WithErrorThreshold(errThreshold))
|
WithErrorThreshold(errThreshold))
|
||||||
|
|
||||||
var ids [2]*shard.ID
|
var testShards [2]*testShard
|
||||||
var err error
|
|
||||||
|
|
||||||
for i := range ids {
|
for i := range testShards {
|
||||||
ids[i], err = e.AddShard(
|
storages, smallFileStorage, largeFileStorage := newTestStorages(filepath.Join(dir, strconv.Itoa(i)), errSmallSize)
|
||||||
|
id, err := e.AddShard(
|
||||||
shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
||||||
shard.WithBlobStorOptions(
|
shard.WithBlobStorOptions(blobstor.WithStorages(storages)),
|
||||||
blobstor.WithStorages(newStorages(filepath.Join(dir, strconv.Itoa(i)), errSmallSize))),
|
|
||||||
shard.WithMetaBaseOptions(
|
shard.WithMetaBaseOptions(
|
||||||
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
|
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
|
||||||
meta.WithPermissions(0700),
|
meta.WithPermissions(0700),
|
||||||
|
@ -54,94 +68,109 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
|
||||||
pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", i))),
|
pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", i))),
|
||||||
pilorama.WithPerm(0700)))
|
pilorama.WithPerm(0700)))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
testShards[i] = &testShard{
|
||||||
|
id: id,
|
||||||
|
smallFileStorage: smallFileStorage,
|
||||||
|
largeFileStorage: largeFileStorage,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
require.NoError(t, e.Open())
|
require.NoError(t, e.Open())
|
||||||
require.NoError(t, e.Init())
|
require.NoError(t, e.Init())
|
||||||
|
|
||||||
return e, dir, ids
|
return &testEngine{
|
||||||
|
ng: e,
|
||||||
|
dir: dir,
|
||||||
|
shards: testShards,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestErrorReporting(t *testing.T) {
|
func TestErrorReporting(t *testing.T) {
|
||||||
t.Run("ignore errors by default", func(t *testing.T) {
|
t.Run("ignore errors by default", func(t *testing.T) {
|
||||||
e, dir, id := newEngineWithErrorThreshold(t, "", 0)
|
te := newEngineWithErrorThreshold(t, "", 0)
|
||||||
|
|
||||||
obj := generateObjectWithCID(t, cidtest.ID())
|
obj := generateObjectWithCID(t, cidtest.ID())
|
||||||
obj.SetPayload(make([]byte, errSmallSize))
|
obj.SetPayload(make([]byte, errSmallSize))
|
||||||
|
|
||||||
var prm shard.PutPrm
|
var prm shard.PutPrm
|
||||||
prm.SetObject(obj)
|
prm.SetObject(obj)
|
||||||
e.mtx.RLock()
|
te.ng.mtx.RLock()
|
||||||
_, err := e.shards[id[0].String()].Shard.Put(prm)
|
_, err := te.ng.shards[te.shards[0].id.String()].Shard.Put(prm)
|
||||||
e.mtx.RUnlock()
|
te.ng.mtx.RUnlock()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
checkShardState(t, e, id[0], 0, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||||
|
|
||||||
corruptSubDir(t, filepath.Join(dir, "0"))
|
for _, shard := range te.shards {
|
||||||
|
shard.largeFileStorage.GetPassthrough.Store(false)
|
||||||
|
shard.largeFileStorage.On("Get", mock.Anything).Return(common.GetRes{}, teststore.ErrDiskExploded)
|
||||||
|
}
|
||||||
|
|
||||||
for i := uint32(1); i < 3; i++ {
|
for i := uint32(1); i < 3; i++ {
|
||||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
checkShardState(t, e, id[0], i, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[0].id, i, mode.ReadWrite)
|
||||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
t.Run("with error threshold", func(t *testing.T) {
|
t.Run("with error threshold", func(t *testing.T) {
|
||||||
const errThreshold = 3
|
const errThreshold = 3
|
||||||
|
|
||||||
e, dir, id := newEngineWithErrorThreshold(t, "", errThreshold)
|
te := newEngineWithErrorThreshold(t, "", errThreshold)
|
||||||
|
|
||||||
obj := generateObjectWithCID(t, cidtest.ID())
|
obj := generateObjectWithCID(t, cidtest.ID())
|
||||||
obj.SetPayload(make([]byte, errSmallSize))
|
obj.SetPayload(make([]byte, errSmallSize))
|
||||||
|
|
||||||
var prm shard.PutPrm
|
var prm shard.PutPrm
|
||||||
prm.SetObject(obj)
|
prm.SetObject(obj)
|
||||||
e.mtx.RLock()
|
te.ng.mtx.RLock()
|
||||||
_, err := e.shards[id[0].String()].Put(prm)
|
_, err := te.ng.shards[te.shards[0].id.String()].Put(prm)
|
||||||
e.mtx.RUnlock()
|
te.ng.mtx.RUnlock()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
checkShardState(t, e, id[0], 0, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||||
|
|
||||||
corruptSubDir(t, filepath.Join(dir, "0"))
|
for _, shard := range te.shards {
|
||||||
|
shard.largeFileStorage.GetPassthrough.Store(false)
|
||||||
|
shard.largeFileStorage.On("Get", mock.Anything).Return(common.GetRes{}, teststore.ErrDiskExploded)
|
||||||
|
}
|
||||||
|
|
||||||
for i := uint32(1); i < errThreshold; i++ {
|
for i := uint32(1); i < errThreshold; i++ {
|
||||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
checkShardState(t, e, id[0], i, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[0].id, i, mode.ReadWrite)
|
||||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := uint32(0); i < 2; i++ {
|
for i := uint32(0); i < 2; i++ {
|
||||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
checkShardState(t, e, id[0], errThreshold+i, mode.DegradedReadOnly)
|
checkShardState(t, te.ng, te.shards[0].id, errThreshold+i, mode.DegradedReadOnly)
|
||||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, e.SetShardMode(id[0], mode.ReadWrite, false))
|
require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, false))
|
||||||
checkShardState(t, e, id[0], errThreshold+1, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[0].id, errThreshold+1, mode.ReadWrite)
|
||||||
|
|
||||||
require.NoError(t, e.SetShardMode(id[0], mode.ReadWrite, true))
|
require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, true))
|
||||||
checkShardState(t, e, id[0], 0, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Issue #1186.
|
|
||||||
func TestBlobstorFailback(t *testing.T) {
|
func TestBlobstorFailback(t *testing.T) {
|
||||||
dir, err := os.MkdirTemp("", "*")
|
dir, err := os.MkdirTemp("", "*")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
t.Cleanup(func() { require.NoError(t, os.RemoveAll(dir)) })
|
t.Cleanup(func() { require.NoError(t, os.RemoveAll(dir)) })
|
||||||
|
|
||||||
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
|
te := newEngineWithErrorThreshold(t, dir, 1)
|
||||||
|
|
||||||
objs := make([]*objectSDK.Object, 0, 2)
|
objs := make([]*objectSDK.Object, 0, 2)
|
||||||
for _, size := range []int{15, errSmallSize + 1} {
|
for _, size := range []int{15, errSmallSize + 1} {
|
||||||
|
@ -150,49 +179,49 @@ func TestBlobstorFailback(t *testing.T) {
|
||||||
|
|
||||||
var prm shard.PutPrm
|
var prm shard.PutPrm
|
||||||
prm.SetObject(obj)
|
prm.SetObject(obj)
|
||||||
e.mtx.RLock()
|
te.ng.mtx.RLock()
|
||||||
_, err = e.shards[id[0].String()].Shard.Put(prm)
|
_, err = te.ng.shards[te.shards[0].id.String()].Shard.Put(prm)
|
||||||
e.mtx.RUnlock()
|
te.ng.mtx.RUnlock()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
objs = append(objs, obj)
|
objs = append(objs, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range objs {
|
for i := range objs {
|
||||||
addr := object.AddressOf(objs[i])
|
addr := object.AddressOf(objs[i])
|
||||||
_, err = e.Get(GetPrm{addr: addr})
|
_, err = te.ng.Get(GetPrm{addr: addr})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
_, err = e.GetRange(RngPrm{addr: addr})
|
_, err = te.ng.GetRange(RngPrm{addr: addr})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
checkShardState(t, e, id[0], 0, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||||
require.NoError(t, e.Close())
|
require.NoError(t, te.ng.Close())
|
||||||
|
|
||||||
p1 := e.shards[id[0].String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
p1 := te.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
||||||
p2 := e.shards[id[1].String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
p2 := te.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
||||||
tmp := filepath.Join(dir, "tmp")
|
tmp := filepath.Join(dir, "tmp")
|
||||||
require.NoError(t, os.Rename(p1, tmp))
|
require.NoError(t, os.Rename(p1, tmp))
|
||||||
require.NoError(t, os.Rename(p2, p1))
|
require.NoError(t, os.Rename(p2, p1))
|
||||||
require.NoError(t, os.Rename(tmp, p2))
|
require.NoError(t, os.Rename(tmp, p2))
|
||||||
|
|
||||||
e, _, id = newEngineWithErrorThreshold(t, dir, 1)
|
te = newEngineWithErrorThreshold(t, dir, 1)
|
||||||
|
|
||||||
for i := range objs {
|
for i := range objs {
|
||||||
addr := object.AddressOf(objs[i])
|
addr := object.AddressOf(objs[i])
|
||||||
getRes, err := e.Get(GetPrm{addr: addr})
|
getRes, err := te.ng.Get(GetPrm{addr: addr})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, objs[i], getRes.Object())
|
require.Equal(t, objs[i], getRes.Object())
|
||||||
|
|
||||||
rngRes, err := e.GetRange(RngPrm{addr: addr, off: 1, ln: 10})
|
rngRes, err := te.ng.GetRange(RngPrm{addr: addr, off: 1, ln: 10})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, objs[i].Payload()[1:11], rngRes.Object().Payload())
|
require.Equal(t, objs[i].Payload()[1:11], rngRes.Object().Payload())
|
||||||
|
|
||||||
_, err = e.GetRange(RngPrm{addr: addr, off: errSmallSize + 10, ln: 1})
|
_, err = te.ng.GetRange(RngPrm{addr: addr, off: errSmallSize + 10, ln: 1})
|
||||||
require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{})
|
require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{})
|
||||||
}
|
}
|
||||||
|
|
||||||
checkShardState(t, e, id[0], 1, mode.DegradedReadOnly)
|
checkShardState(t, te.ng, te.shards[0].id, 1, mode.DegradedReadOnly)
|
||||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode mode.Mode) {
|
func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode mode.Mode) {
|
||||||
|
@ -203,19 +232,3 @@ func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint
|
||||||
require.Equal(t, errCount, sh.errorCount.Load())
|
require.Equal(t, errCount, sh.errorCount.Load())
|
||||||
require.Equal(t, mode, sh.GetMode())
|
require.Equal(t, mode, sh.GetMode())
|
||||||
}
|
}
|
||||||
|
|
||||||
// corruptSubDir makes random directory except "blobovnicza" in blobstor FSTree unreadable.
|
|
||||||
func corruptSubDir(t *testing.T, dir string) {
|
|
||||||
de, err := os.ReadDir(dir)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// FIXME(@cthulhu-rider): copy-paste of unexported const from blobstor package, see #1407
|
|
||||||
const dirBlobovnicza = "blobovnicza"
|
|
||||||
|
|
||||||
for i := range de {
|
|
||||||
if de[i].IsDir() && de[i].Name() != dirBlobovnicza {
|
|
||||||
require.NoError(t, os.Chmod(filepath.Join(dir, de[i].Name()), 0))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -22,7 +22,7 @@ func BenchmarkTreeVsSearch(b *testing.B) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func benchmarkTreeVsSearch(b *testing.B, objCount int) {
|
func benchmarkTreeVsSearch(b *testing.B, objCount int) {
|
||||||
e, _, _ := newEngineWithErrorThreshold(b, "", 0)
|
te := newEngineWithErrorThreshold(b, "", 0)
|
||||||
cid := cidtest.ID()
|
cid := cidtest.ID()
|
||||||
d := pilorama.CIDDescriptor{CID: cid, Position: 0, Size: 1}
|
d := pilorama.CIDDescriptor{CID: cid, Position: 0, Size: 1}
|
||||||
treeID := "someTree"
|
treeID := "someTree"
|
||||||
|
@ -30,11 +30,11 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) {
|
||||||
for i := 0; i < objCount; i++ {
|
for i := 0; i < objCount; i++ {
|
||||||
obj := generateObjectWithCID(b, cid)
|
obj := generateObjectWithCID(b, cid)
|
||||||
addAttribute(obj, pilorama.AttributeFilename, strconv.Itoa(i))
|
addAttribute(obj, pilorama.AttributeFilename, strconv.Itoa(i))
|
||||||
err := Put(e, obj)
|
err := Put(te.ng, obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
_, err = e.TreeAddByPath(d, treeID, pilorama.AttributeFilename, nil,
|
_, err = te.ng.TreeAddByPath(d, treeID, pilorama.AttributeFilename, nil,
|
||||||
[]pilorama.KeyValue{{pilorama.AttributeFilename, []byte(strconv.Itoa(i))}})
|
[]pilorama.KeyValue{{pilorama.AttributeFilename, []byte(strconv.Itoa(i))}})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
|
@ -50,7 +50,7 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) {
|
||||||
prm.WithFilters(fs)
|
prm.WithFilters(fs)
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
res, err := e.Select(prm)
|
res, err := te.ng.Select(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -61,7 +61,7 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) {
|
||||||
})
|
})
|
||||||
b.Run("TreeGetByPath", func(b *testing.B) {
|
b.Run("TreeGetByPath", func(b *testing.B) {
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
nodes, err := e.TreeGetByPath(cid, treeID, pilorama.AttributeFilename, []string{strconv.Itoa(objCount / 2)}, true)
|
nodes, err := te.ng.TreeGetByPath(cid, treeID, pilorama.AttributeFilename, []string{strconv.Itoa(objCount / 2)}, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,6 +66,7 @@ func NewBoltForest(opts ...Option) ForestStorage {
|
||||||
perm: os.ModePerm,
|
perm: os.ModePerm,
|
||||||
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
|
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
|
||||||
maxBatchSize: bbolt.DefaultMaxBatchSize,
|
maxBatchSize: bbolt.DefaultMaxBatchSize,
|
||||||
|
openFile: os.OpenFile,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,6 +108,7 @@ func (t *boltForest) Open(readOnly bool) error {
|
||||||
opts.ReadOnly = readOnly
|
opts.ReadOnly = readOnly
|
||||||
opts.NoSync = t.noSync
|
opts.NoSync = t.noSync
|
||||||
opts.Timeout = 100 * time.Millisecond
|
opts.Timeout = 100 * time.Millisecond
|
||||||
|
opts.OpenFile = t.openFile
|
||||||
|
|
||||||
t.db, err = bbolt.Open(t.path, t.perm, &opts)
|
t.db, err = bbolt.Open(t.path, t.perm, &opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -2,6 +2,7 @@ package pilorama
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io/fs"
|
"io/fs"
|
||||||
|
"os"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -13,6 +14,7 @@ type cfg struct {
|
||||||
noSync bool
|
noSync bool
|
||||||
maxBatchDelay time.Duration
|
maxBatchDelay time.Duration
|
||||||
maxBatchSize int
|
maxBatchSize int
|
||||||
|
openFile func(string, int, fs.FileMode) (*os.File, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithPath(path string) Option {
|
func WithPath(path string) Option {
|
||||||
|
@ -44,3 +46,9 @@ func WithMaxBatchSize(size int) Option {
|
||||||
c.maxBatchSize = size
|
c.maxBatchSize = size
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithOpenFile(openFile func(string, int, fs.FileMode) (*os.File, error)) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.openFile = openFile
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,14 +1,18 @@
|
||||||
package shard
|
package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io/fs"
|
||||||
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
@ -22,6 +26,7 @@ import (
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test"
|
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -40,19 +45,32 @@ func TestShardOpen(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
metaPath := filepath.Join(dir, "meta")
|
metaPath := filepath.Join(dir, "meta")
|
||||||
|
|
||||||
|
st := teststore.NewFromStorage(fstree.New(
|
||||||
|
fstree.WithDirNameLen(2),
|
||||||
|
fstree.WithPath(filepath.Join(dir, "blob")),
|
||||||
|
fstree.WithDepth(1)),
|
||||||
|
)
|
||||||
|
|
||||||
|
var allowedMode atomic.Int64
|
||||||
|
openFileMetabase := func(p string, f int, perm fs.FileMode) (*os.File, error) {
|
||||||
|
if int64(f&3) == allowedMode.Load() {
|
||||||
|
return os.OpenFile(p, f, perm)
|
||||||
|
}
|
||||||
|
return nil, teststore.ErrDiskExploded
|
||||||
|
}
|
||||||
|
|
||||||
newShard := func() *Shard {
|
newShard := func() *Shard {
|
||||||
return New(
|
return New(
|
||||||
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
||||||
WithBlobStorOptions(
|
WithBlobStorOptions(
|
||||||
blobstor.WithStorages([]blobstor.SubStorage{
|
blobstor.WithStorages([]blobstor.SubStorage{
|
||||||
{
|
{Storage: st},
|
||||||
Storage: fstree.New(
|
|
||||||
fstree.WithDirNameLen(2),
|
|
||||||
fstree.WithPath(filepath.Join(dir, "blob")),
|
|
||||||
fstree.WithDepth(1)),
|
|
||||||
},
|
|
||||||
})),
|
})),
|
||||||
WithMetaBaseOptions(meta.WithPath(metaPath), meta.WithEpochState(epochState{})),
|
WithMetaBaseOptions(
|
||||||
|
meta.WithPath(metaPath),
|
||||||
|
meta.WithEpochState(epochState{}),
|
||||||
|
meta.WithBoltDBOptions(&bbolt.Options{OpenFile: openFileMetabase}),
|
||||||
|
),
|
||||||
WithPiloramaOptions(
|
WithPiloramaOptions(
|
||||||
pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
||||||
WithWriteCache(true),
|
WithWriteCache(true),
|
||||||
|
@ -60,6 +78,8 @@ func TestShardOpen(t *testing.T) {
|
||||||
writecache.WithPath(filepath.Join(dir, "wc"))))
|
writecache.WithPath(filepath.Join(dir, "wc"))))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
allowedMode.Store(int64(os.O_RDWR))
|
||||||
|
|
||||||
sh := newShard()
|
sh := newShard()
|
||||||
require.NoError(t, sh.Open())
|
require.NoError(t, sh.Open())
|
||||||
require.NoError(t, sh.Init())
|
require.NoError(t, sh.Init())
|
||||||
|
@ -67,7 +87,8 @@ func TestShardOpen(t *testing.T) {
|
||||||
require.NoError(t, sh.Close())
|
require.NoError(t, sh.Close())
|
||||||
|
|
||||||
// Metabase can be opened in read-only => start in ReadOnly mode.
|
// Metabase can be opened in read-only => start in ReadOnly mode.
|
||||||
require.NoError(t, os.Chmod(metaPath, 0444))
|
allowedMode.Store(int64(os.O_RDONLY))
|
||||||
|
|
||||||
sh = newShard()
|
sh = newShard()
|
||||||
require.NoError(t, sh.Open())
|
require.NoError(t, sh.Open())
|
||||||
require.NoError(t, sh.Init())
|
require.NoError(t, sh.Init())
|
||||||
|
@ -77,7 +98,8 @@ func TestShardOpen(t *testing.T) {
|
||||||
require.NoError(t, sh.Close())
|
require.NoError(t, sh.Close())
|
||||||
|
|
||||||
// Metabase is corrupted => start in DegradedReadOnly mode.
|
// Metabase is corrupted => start in DegradedReadOnly mode.
|
||||||
require.NoError(t, os.Chmod(metaPath, 0000))
|
allowedMode.Store(math.MaxInt64)
|
||||||
|
|
||||||
sh = newShard()
|
sh = newShard()
|
||||||
require.NoError(t, sh.Open())
|
require.NoError(t, sh.Open())
|
||||||
require.NoError(t, sh.Init())
|
require.NoError(t, sh.Init())
|
||||||
|
|
|
@ -46,12 +46,13 @@ func TestFlush(t *testing.T) {
|
||||||
require.NoError(t, mb.Open(false))
|
require.NoError(t, mb.Open(false))
|
||||||
require.NoError(t, mb.Init())
|
require.NoError(t, mb.Init())
|
||||||
|
|
||||||
fsTree := fstree.New(
|
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
|
||||||
|
{
|
||||||
|
Storage: fstree.New(
|
||||||
fstree.WithPath(filepath.Join(dir, "blob")),
|
fstree.WithPath(filepath.Join(dir, "blob")),
|
||||||
fstree.WithDepth(0),
|
fstree.WithDepth(0),
|
||||||
fstree.WithDirNameLen(1))
|
fstree.WithDirNameLen(1)),
|
||||||
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
|
},
|
||||||
{Storage: fsTree},
|
|
||||||
}))
|
}))
|
||||||
require.NoError(t, bs.Open(false))
|
require.NoError(t, bs.Open(false))
|
||||||
require.NoError(t, bs.Init())
|
require.NoError(t, bs.Init())
|
||||||
|
@ -208,7 +209,7 @@ func TestFlush(t *testing.T) {
|
||||||
|
|
||||||
_, err = os.Stat(p) // sanity check
|
_, err = os.Stat(p) // sanity check
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NoError(t, os.Chmod(p, 0))
|
require.NoError(t, os.Truncate(p, 0))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
t.Run("fs, invalid object", func(t *testing.T) {
|
t.Run("fs, invalid object", func(t *testing.T) {
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package writecache
|
package writecache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io/fs"
|
||||||
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
|
@ -55,6 +57,8 @@ type options struct {
|
||||||
noSync bool
|
noSync bool
|
||||||
// reportError is the function called when encountering disk errors in background workers.
|
// reportError is the function called when encountering disk errors in background workers.
|
||||||
reportError func(string, error)
|
reportError func(string, error)
|
||||||
|
// openFile is the function called internally by bbolt to open database files. Useful for hermetic testing.
|
||||||
|
openFile func(string, int, fs.FileMode) (*os.File, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithLogger sets logger.
|
// WithLogger sets logger.
|
||||||
|
@ -152,3 +156,10 @@ func WithReportErrorFunc(f func(string, error)) Option {
|
||||||
o.reportError = f
|
o.reportError = f
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithOpenFile sets the OpenFile function to use internally by bolt. Useful for hermetic testing.
|
||||||
|
func WithOpenFile(f func(string, int, fs.FileMode) (*os.File, error)) Option {
|
||||||
|
return func(o *options) {
|
||||||
|
o.openFile = f
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -43,7 +43,7 @@ func (c *cache) openStore(readOnly bool) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
c.db, err = OpenDB(c.path, readOnly)
|
c.db, err = OpenDB(c.path, readOnly, c.openFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not open database: %w", err)
|
return fmt.Errorf("could not open database: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package writecache
|
package writecache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io/fs"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
@ -9,10 +10,11 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true.
|
// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true.
|
||||||
func OpenDB(p string, ro bool) (*bbolt.DB, error) {
|
func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) {
|
||||||
return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{
|
return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{
|
||||||
NoFreelistSync: true,
|
NoFreelistSync: true,
|
||||||
ReadOnly: ro,
|
ReadOnly: ro,
|
||||||
Timeout: 100 * time.Millisecond,
|
Timeout: 100 * time.Millisecond,
|
||||||
|
OpenFile: openFile,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package writecache
|
package writecache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
@ -106,6 +107,7 @@ func New(opts ...Option) Cache {
|
||||||
maxCacheSize: defaultMaxCacheSize,
|
maxCacheSize: defaultMaxCacheSize,
|
||||||
maxBatchSize: bbolt.DefaultMaxBatchSize,
|
maxBatchSize: bbolt.DefaultMaxBatchSize,
|
||||||
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
|
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
|
||||||
|
openFile: os.OpenFile,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue
We try to avoid mocks in our codebase.
I see.