[#139] test: Add test storage implementation #173
|
@ -1,6 +1,8 @@
|
|||
package writecache
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
"github.com/spf13/cobra"
|
||||
|
@ -24,7 +26,7 @@ func init() {
|
|||
}
|
||||
|
||||
func openWC(cmd *cobra.Command) *bbolt.DB {
|
||||
db, err := writecache.OpenDB(vPath, true)
|
||||
db, err := writecache.OpenDB(vPath, true, os.OpenFile)
|
||||
common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
|
||||
|
||||
return db
|
||||
|
|
|
@ -50,7 +50,6 @@ func TestExistsInvalidStorageID(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("invalid storage id", func(t *testing.T) {
|
||||
// "0/X/Y" <-> "1/X/Y"
|
||||
storageID := slice.Copy(putRes.StorageID)
|
||||
storageID[0] = '9'
|
||||
badDir := filepath.Join(dir, "9")
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package blobstor
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
|
@ -9,32 +8,37 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
const blobovniczaDir = "blobovniczas"
|
||||
|
||||
func defaultStorages(p string, smallSizeLimit uint64) []SubStorage {
|
||||
func defaultTestStorages(p string, smallSizeLimit uint64) ([]SubStorage, *teststore.TestStore, *teststore.TestStore) {
|
||||
fyrchik marked this conversation as resolved
Outdated
|
||||
smallFileStorage := teststore.New(teststore.WithSubstorage(blobovniczatree.NewBlobovniczaTree(
|
||||
blobovniczatree.WithRootPath(filepath.Join(p, "blobovniczas")),
|
||||
blobovniczatree.WithBlobovniczaShallowWidth(1)), // default width is 16, slow init
|
||||
))
|
||||
largeFileStorage := teststore.New(teststore.WithSubstorage(fstree.New(fstree.WithPath(p))))
|
||||
return []SubStorage{
|
||||
{
|
||||
Storage: blobovniczatree.NewBlobovniczaTree(
|
||||
blobovniczatree.WithRootPath(filepath.Join(p, "blobovniczas")),
|
||||
blobovniczatree.WithBlobovniczaShallowWidth(1)), // default width is 16, slow init
|
||||
Storage: smallFileStorage,
|
||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return uint64(len(data)) <= smallSizeLimit
|
||||
},
|
||||
},
|
||||
{
|
||||
Storage: fstree.New(fstree.WithPath(p)),
|
||||
Storage: largeFileStorage,
|
||||
},
|
||||
}
|
||||
}, smallFileStorage, largeFileStorage
|
||||
}
|
||||
|
||||
func defaultStorages(p string, smallSizeLimit uint64) []SubStorage {
|
||||
storages, _, _ := defaultTestStorages(p, smallSizeLimit)
|
||||
return storages
|
||||
}
|
||||
|
||||
func TestCompression(t *testing.T) {
|
||||
dir, err := os.MkdirTemp("", "frostfs*")
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { _ = os.RemoveAll(dir) })
|
||||
dir := t.TempDir()
|
||||
|
||||
const (
|
||||
smallSizeLimit = 512
|
||||
|
@ -70,7 +74,7 @@ func TestCompression(t *testing.T) {
|
|||
testPut := func(t *testing.T, b *BlobStor, i int) {
|
||||
var prm common.PutPrm
|
||||
prm.Object = smallObj[i]
|
||||
_, err = b.Put(prm)
|
||||
_, err := b.Put(prm)
|
||||
require.NoError(t, err)
|
||||
|
||||
prm = common.PutPrm{}
|
||||
|
@ -102,9 +106,7 @@ func TestCompression(t *testing.T) {
|
|||
func TestBlobstor_needsCompression(t *testing.T) {
|
||||
const smallSizeLimit = 512
|
||||
newBlobStor := func(t *testing.T, compress bool, ct ...string) *BlobStor {
|
||||
dir, err := os.MkdirTemp("", "frostfs*")
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { _ = os.RemoveAll(dir) })
|
||||
dir := t.TempDir()
|
||||
|
||||
bs := New(
|
||||
WithCompressObjects(compress),
|
||||
|
|
|
@ -2,11 +2,11 @@ package blobstor
|
|||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
|
@ -20,8 +20,10 @@ func TestExists(t *testing.T) {
|
|||
|
||||
const smallSizeLimit = 512
|
||||
|
||||
b := New(
|
||||
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
||||
storages, _, largeFileStorage := defaultTestStorages(dir, smallSizeLimit)
|
||||
|
||||
b := New(WithStorages(storages))
|
||||
|
||||
require.NoError(t, b.Open(false))
|
||||
require.NoError(t, b.Init())
|
||||
|
||||
|
@ -33,7 +35,7 @@ func TestExists(t *testing.T) {
|
|||
for i := range objects {
|
||||
var prm common.PutPrm
|
||||
prm.Object = objects[i]
|
||||
_, err = b.Put(prm)
|
||||
_, err := b.Put(prm)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
@ -51,20 +53,10 @@ func TestExists(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.False(t, res.Exists)
|
||||
|
||||
t.Run("corrupt direcrory", func(t *testing.T) {
|
||||
var bigDir string
|
||||
de, err := os.ReadDir(dir)
|
||||
require.NoError(t, err)
|
||||
for i := range de {
|
||||
if de[i].Name() != blobovniczaDir {
|
||||
bigDir = filepath.Join(dir, de[i].Name())
|
||||
break
|
||||
}
|
||||
}
|
||||
require.NotEmpty(t, bigDir)
|
||||
|
||||
require.NoError(t, os.Chmod(dir, 0))
|
||||
t.Cleanup(func() { require.NoError(t, os.Chmod(dir, 0777)) })
|
||||
t.Run("corrupt directory", func(t *testing.T) {
|
||||
largeFileStorage.SetOption(teststore.WithExists(func(common.ExistsPrm) (common.ExistsRes, error) {
|
||||
return common.ExistsRes{}, teststore.ErrDiskExploded
|
||||
}))
|
||||
|
||||
// Object exists, first error is logged.
|
||||
prm.Address = objectCore.AddressOf(objects[0])
|
||||
|
@ -76,6 +68,7 @@ func TestExists(t *testing.T) {
|
|||
prm.Address = objectCore.AddressOf(objects[1])
|
||||
_, err = b.Exists(prm)
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, err, teststore.ErrDiskExploded)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -1,23 +1,16 @@
|
|||
package blobstor
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest"
|
||||
)
|
||||
|
||||
func TestGeneric(t *testing.T) {
|
||||
defer func() { _ = os.RemoveAll(t.Name()) }()
|
||||
|
||||
var n int
|
||||
newMetabase := func(t *testing.T) storagetest.Component {
|
||||
n++
|
||||
dir := filepath.Join(t.Name(), strconv.Itoa(n))
|
||||
return New(
|
||||
WithStorages(defaultStorages(dir, 128)))
|
||||
WithStorages(defaultStorages(t.TempDir(), 128)))
|
||||
}
|
||||
|
||||
storagetest.TestAll(t, newMetabase)
|
||||
|
|
74
pkg/local_object_storage/blobstor/teststore/option.go
Normal file
|
@ -0,0 +1,74 @@
|
|||
package teststore
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||
)
|
||||
|
||||
type cfg struct {
|
||||
st common.Storage
|
||||
overrides struct {
|
||||
Open func(readOnly bool) error
|
||||
Init func() error
|
||||
Close func() error
|
||||
|
||||
Type func() string
|
||||
Path func() string
|
||||
SetCompressor func(cc *compression.Config)
|
||||
SetReportErrorFunc func(f func(string, error))
|
||||
|
||||
Get func(common.GetPrm) (common.GetRes, error)
|
||||
GetRange func(common.GetRangePrm) (common.GetRangeRes, error)
|
||||
Exists func(common.ExistsPrm) (common.ExistsRes, error)
|
||||
Put func(common.PutPrm) (common.PutRes, error)
|
||||
Delete func(common.DeletePrm) (common.DeleteRes, error)
|
||||
Iterate func(common.IteratePrm) (common.IterateRes, error)
|
||||
}
|
||||
}
|
||||
|
||||
type Option func(*cfg)
|
||||
|
||||
func WithSubstorage(st common.Storage) Option {
|
||||
return func(c *cfg) {
|
||||
c.st = st
|
||||
}
|
||||
}
|
||||
|
||||
func WithOpen(f func(bool) error) Option { return func(c *cfg) { c.overrides.Open = f } }
|
||||
func WithInit(f func() error) Option { return func(c *cfg) { c.overrides.Init = f } }
|
||||
func WithClose(f func() error) Option { return func(c *cfg) { c.overrides.Close = f } }
|
||||
|
||||
func WithType(f func() string) Option { return func(c *cfg) { c.overrides.Type = f } }
|
||||
func WithPath(f func() string) Option { return func(c *cfg) { c.overrides.Path = f } }
|
||||
|
||||
func WithSetCompressor(f func(*compression.Config)) Option {
|
||||
return func(c *cfg) { c.overrides.SetCompressor = f }
|
||||
}
|
||||
|
||||
func WithReportErrorFunc(f func(func(string, error))) Option {
|
||||
return func(c *cfg) { c.overrides.SetReportErrorFunc = f }
|
||||
}
|
||||
|
||||
func WithGet(f func(common.GetPrm) (common.GetRes, error)) Option {
|
||||
return func(c *cfg) { c.overrides.Get = f }
|
||||
}
|
||||
|
||||
func WithGetRange(f func(common.GetRangePrm) (common.GetRangeRes, error)) Option {
|
||||
return func(c *cfg) { c.overrides.GetRange = f }
|
||||
}
|
||||
|
||||
func WithExists(f func(common.ExistsPrm) (common.ExistsRes, error)) Option {
|
||||
return func(c *cfg) { c.overrides.Exists = f }
|
||||
}
|
||||
|
||||
func WithPut(f func(common.PutPrm) (common.PutRes, error)) Option {
|
||||
return func(c *cfg) { c.overrides.Put = f }
|
||||
}
|
||||
|
||||
func WithDelete(f func(common.DeletePrm) (common.DeleteRes, error)) Option {
|
||||
return func(c *cfg) { c.overrides.Delete = f }
|
||||
}
|
||||
|
||||
func WithIterate(f func(common.IteratePrm) (common.IterateRes, error)) Option {
|
||||
return func(c *cfg) { c.overrides.Iterate = f }
|
||||
}
|
215
pkg/local_object_storage/blobstor/teststore/teststore.go
Normal file
|
@ -0,0 +1,215 @@
|
|||
// Package teststore provides a common.Storage implementation for testing/mocking purposes.
|
||||
//
|
||||
// A new teststore.TestStore can be obtained with teststore.New. Whenever one of the common.Storage
|
||||
// methods is called, the implementation selects what function to call in the following order:
|
||||
// 1. If an override for that method was provided at construction time (via teststore.WithXXX()) or
|
||||
// afterwards via SetOption, that override is used.
|
||||
// 2. If a substorage was provided at construction time (via teststore.WithSubstorage()) or afterwars
|
||||
// via SetOption, the corresponding method in the substorage is used.
|
||||
// 3. If none of the above apply, the call panics with an error describing the unexpected call.
|
||||
//
|
||||
// It's safe to call SetOption and the overrides from multiple goroutines, but it's the override's
|
||||
// responsibility to ensure safety of whatever operation it executes.
|
||||
package teststore
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||
)
|
||||
|
||||
// TestStore is a common.Storage implementation for testing/mocking purposes.
|
||||
type TestStore struct {
|
||||
mu sync.RWMutex
|
||||
*cfg
|
||||
}
|
||||
|
||||
// ErrDiskExploded is a phony error which can be used for testing purposes to differentiate it from
|
||||
// more common errors.
|
||||
var ErrDiskExploded = errors.New("disk exploded")
|
||||
|
||||
// New returns a teststore.TestStore from the given options.
|
||||
func New(opts ...Option) *TestStore {
|
||||
c := &cfg{}
|
||||
for _, opt := range opts {
|
||||
opt(c)
|
||||
}
|
||||
return &TestStore{cfg: c}
|
||||
}
|
||||
|
||||
// SetOption overrides an option of an existing teststore.TestStore.
|
||||
// This is useful for overriding methods during a test so that different
|
||||
// behaviors are simulated.
|
||||
func (s *TestStore) SetOption(opt Option) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
opt(s.cfg)
|
||||
}
|
||||
|
||||
func (s *TestStore) Open(readOnly bool) error {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
switch {
|
||||
case s.overrides.Open != nil:
|
||||
return s.overrides.Open(readOnly)
|
||||
case s.st != nil:
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Сan we enforce this in Сan we enforce this in `new` and make this branch a `default`? Implicit panic is also not bad here.
ale64bit
commented
This is intentional, so that a This is intentional, so that a `TreeStore` can also be used purely as a mock (e.g. to verify only certain calls are executed and nothing is propagated to any underlying storage).
Re. the implicit panic, isn't better an explicit panic with specific error message rather than a nil dereference panic which might not be immediately clear?
|
||||
return s.st.Open(readOnly)
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected storage call: Open(%v)", readOnly))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TestStore) Init() error {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
switch {
|
||||
case s.overrides.Init != nil:
|
||||
return s.overrides.Init()
|
||||
case s.st != nil:
|
||||
return s.st.Init()
|
||||
default:
|
||||
panic("unexpected storage call: Init()")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TestStore) Close() error {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
switch {
|
||||
case s.overrides.Close != nil:
|
||||
return s.overrides.Close()
|
||||
case s.st != nil:
|
||||
return s.st.Close()
|
||||
default:
|
||||
panic("unexpected storage call: Close()")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TestStore) Type() string {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
switch {
|
||||
case s.overrides.Type != nil:
|
||||
return s.overrides.Type()
|
||||
case s.st != nil:
|
||||
return s.st.Type()
|
||||
default:
|
||||
panic("unexpected storage call: Type()")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TestStore) Path() string {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
switch {
|
||||
case s.overrides.Path != nil:
|
||||
return s.overrides.Path()
|
||||
case s.st != nil:
|
||||
return s.st.Path()
|
||||
default:
|
||||
panic("unexpected storage call: Path()")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TestStore) SetCompressor(cc *compression.Config) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
switch {
|
||||
case s.overrides.SetCompressor != nil:
|
||||
s.overrides.SetCompressor(cc)
|
||||
case s.st != nil:
|
||||
s.st.SetCompressor(cc)
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected storage call: SetCompressor(%+v)", cc))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TestStore) SetReportErrorFunc(f func(string, error)) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
switch {
|
||||
case s.overrides.SetReportErrorFunc != nil:
|
||||
s.overrides.SetReportErrorFunc(f)
|
||||
case s.st != nil:
|
||||
s.st.SetReportErrorFunc(f)
|
||||
default:
|
||||
panic("unexpected storage call: SetReportErrorFunc(<func>)")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TestStore) Get(req common.GetPrm) (common.GetRes, error) {
|
||||
switch {
|
||||
case s.overrides.Get != nil:
|
||||
return s.overrides.Get(req)
|
||||
case s.st != nil:
|
||||
return s.st.Get(req)
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected storage call: Get(%+v)", req))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TestStore) GetRange(req common.GetRangePrm) (common.GetRangeRes, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
switch {
|
||||
case s.overrides.GetRange != nil:
|
||||
return s.overrides.GetRange(req)
|
||||
case s.st != nil:
|
||||
return s.st.GetRange(req)
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected storage call: GetRange(%+v)", req))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TestStore) Exists(req common.ExistsPrm) (common.ExistsRes, error) {
|
||||
switch {
|
||||
case s.overrides.Exists != nil:
|
||||
return s.overrides.Exists(req)
|
||||
case s.st != nil:
|
||||
return s.st.Exists(req)
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected storage call: Exists(%+v)", req))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TestStore) Put(req common.PutPrm) (common.PutRes, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
switch {
|
||||
case s.overrides.Put != nil:
|
||||
return s.overrides.Put(req)
|
||||
case s.st != nil:
|
||||
return s.st.Put(req)
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected storage call: Put(%+v)", req))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TestStore) Delete(req common.DeletePrm) (common.DeleteRes, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
switch {
|
||||
case s.overrides.Delete != nil:
|
||||
return s.overrides.Delete(req)
|
||||
case s.st != nil:
|
||||
return s.st.Delete(req)
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected storage call: Delete(%+v)", req))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TestStore) Iterate(req common.IteratePrm) (common.IterateRes, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
switch {
|
||||
case s.overrides.Iterate != nil:
|
||||
return s.overrides.Iterate(req)
|
||||
case s.st != nil:
|
||||
return s.st.Iterate(req)
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected storage call: Iterate(%+v)", req))
|
||||
}
|
||||
}
|
|
@ -3,14 +3,17 @@ package engine
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||
|
@ -25,92 +28,128 @@ import (
|
|||
)
|
||||
|
||||
// TestInitializationFailure checks that shard is initialized and closed even if media
|
||||
// under any single component is absent. We emulate this with permission denied error.
|
||||
// under any single component is absent.
|
||||
func TestInitializationFailure(t *testing.T) {
|
||||
type paths struct {
|
||||
blobstor string
|
||||
metabase string
|
||||
writecache string
|
||||
pilorama string
|
||||
type openFileFunc func(string, int, fs.FileMode) (*os.File, error)
|
||||
|
||||
type testShardOpts struct {
|
||||
openFileMetabase openFileFunc
|
||||
openFileWriteCache openFileFunc
|
||||
openFilePilorama openFileFunc
|
||||
}
|
||||
|
||||
existsDir := filepath.Join(t.TempDir(), "shard")
|
||||
badDir := filepath.Join(t.TempDir(), "missing")
|
||||
|
||||
testShard := func(c paths) []shard.Option {
|
||||
testShard := func(opts testShardOpts) ([]shard.Option, *teststore.TestStore, *teststore.TestStore) {
|
||||
sid, err := generateShardID()
|
||||
require.NoError(t, err)
|
||||
|
||||
tempDir := t.TempDir()
|
||||
blobstorPath := filepath.Join(tempDir, "bs")
|
||||
metabasePath := filepath.Join(tempDir, "mb")
|
||||
writecachePath := filepath.Join(tempDir, "wc")
|
||||
piloramaPath := filepath.Join(tempDir, "pl")
|
||||
|
||||
storages, smallFileStorage, largeFileStorage := newTestStorages(blobstorPath, 1<<20)
|
||||
|
||||
return []shard.Option{
|
||||
shard.WithID(sid),
|
||||
shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
||||
shard.WithBlobStorOptions(
|
||||
blobstor.WithStorages(
|
||||
newStorages(c.blobstor, 1<<20))),
|
||||
blobstor.WithStorages(storages)),
|
||||
shard.WithMetaBaseOptions(
|
||||
meta.WithBoltDBOptions(&bbolt.Options{
|
||||
Timeout: 100 * time.Millisecond,
|
||||
Timeout: 100 * time.Millisecond,
|
||||
OpenFile: opts.openFileMetabase,
|
||||
}),
|
||||
meta.WithPath(c.metabase),
|
||||
meta.WithPath(metabasePath),
|
||||
meta.WithPermissions(0700),
|
||||
meta.WithEpochState(epochState{})),
|
||||
shard.WithWriteCache(true),
|
||||
shard.WithWriteCacheOptions(writecache.WithPath(c.writecache)),
|
||||
shard.WithPiloramaOptions(pilorama.WithPath(c.pilorama)),
|
||||
}
|
||||
shard.WithWriteCacheOptions(
|
||||
writecache.WithPath(writecachePath),
|
||||
writecache.WithOpenFile(opts.openFileWriteCache),
|
||||
),
|
||||
shard.WithPiloramaOptions(
|
||||
pilorama.WithPath(piloramaPath),
|
||||
pilorama.WithOpenFile(opts.openFilePilorama),
|
||||
),
|
||||
}, smallFileStorage, largeFileStorage
|
||||
}
|
||||
|
||||
t.Run("blobstor", func(t *testing.T) {
|
||||
badDir := filepath.Join(badDir, t.Name())
|
||||
require.NoError(t, os.MkdirAll(badDir, os.ModePerm))
|
||||
require.NoError(t, os.Chmod(badDir, 0))
|
||||
testEngineFailInitAndReload(t, badDir, false, testShard(paths{
|
||||
blobstor: filepath.Join(badDir, "0"),
|
||||
metabase: filepath.Join(existsDir, t.Name(), "1"),
|
||||
writecache: filepath.Join(existsDir, t.Name(), "2"),
|
||||
pilorama: filepath.Join(existsDir, t.Name(), "3"),
|
||||
shardOpts, _, largeFileStorage := testShard(testShardOpts{
|
||||
openFileMetabase: os.OpenFile,
|
||||
openFileWriteCache: os.OpenFile,
|
||||
openFilePilorama: os.OpenFile,
|
||||
})
|
||||
largeFileStorage.SetOption(teststore.WithOpen(func(ro bool) error {
|
||||
return teststore.ErrDiskExploded
|
||||
}))
|
||||
beforeReload := func() {
|
||||
largeFileStorage.SetOption(teststore.WithOpen(nil))
|
||||
}
|
||||
testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
|
||||
})
|
||||
t.Run("metabase", func(t *testing.T) {
|
||||
badDir := filepath.Join(badDir, t.Name())
|
||||
require.NoError(t, os.MkdirAll(badDir, os.ModePerm))
|
||||
require.NoError(t, os.Chmod(badDir, 0))
|
||||
testEngineFailInitAndReload(t, badDir, true, testShard(paths{
|
||||
blobstor: filepath.Join(existsDir, t.Name(), "0"),
|
||||
metabase: filepath.Join(badDir, "1"),
|
||||
writecache: filepath.Join(existsDir, t.Name(), "2"),
|
||||
pilorama: filepath.Join(existsDir, t.Name(), "3"),
|
||||
}))
|
||||
var openFileMetabaseSucceed atomic.Bool
|
||||
openFileMetabase := func(p string, f int, mode fs.FileMode) (*os.File, error) {
|
||||
if openFileMetabaseSucceed.Load() {
|
||||
return os.OpenFile(p, f, mode)
|
||||
}
|
||||
return nil, teststore.ErrDiskExploded
|
||||
}
|
||||
beforeReload := func() {
|
||||
openFileMetabaseSucceed.Store(true)
|
||||
}
|
||||
shardOpts, _, _ := testShard(testShardOpts{
|
||||
openFileMetabase: openFileMetabase,
|
||||
openFileWriteCache: os.OpenFile,
|
||||
openFilePilorama: os.OpenFile,
|
||||
})
|
||||
testEngineFailInitAndReload(t, true, shardOpts, beforeReload)
|
||||
})
|
||||
t.Run("write-cache", func(t *testing.T) {
|
||||
badDir := filepath.Join(badDir, t.Name())
|
||||
require.NoError(t, os.MkdirAll(badDir, os.ModePerm))
|
||||
require.NoError(t, os.Chmod(badDir, 0))
|
||||
testEngineFailInitAndReload(t, badDir, false, testShard(paths{
|
||||
blobstor: filepath.Join(existsDir, t.Name(), "0"),
|
||||
metabase: filepath.Join(existsDir, t.Name(), "1"),
|
||||
writecache: filepath.Join(badDir, "2"),
|
||||
pilorama: filepath.Join(existsDir, t.Name(), "3"),
|
||||
}))
|
||||
var openFileWriteCacheSucceed atomic.Bool
|
||||
openFileWriteCache := func(p string, f int, mode fs.FileMode) (*os.File, error) {
|
||||
if openFileWriteCacheSucceed.Load() {
|
||||
return os.OpenFile(p, f, mode)
|
||||
}
|
||||
return nil, teststore.ErrDiskExploded
|
||||
}
|
||||
beforeReload := func() {
|
||||
openFileWriteCacheSucceed.Store(true)
|
||||
}
|
||||
shardOpts, _, _ := testShard(testShardOpts{
|
||||
openFileMetabase: os.OpenFile,
|
||||
openFileWriteCache: openFileWriteCache,
|
||||
openFilePilorama: os.OpenFile,
|
||||
})
|
||||
testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
|
||||
})
|
||||
t.Run("pilorama", func(t *testing.T) {
|
||||
badDir := filepath.Join(badDir, t.Name())
|
||||
require.NoError(t, os.MkdirAll(badDir, os.ModePerm))
|
||||
require.NoError(t, os.Chmod(badDir, 0))
|
||||
testEngineFailInitAndReload(t, badDir, false, testShard(paths{
|
||||
blobstor: filepath.Join(existsDir, t.Name(), "0"),
|
||||
metabase: filepath.Join(existsDir, t.Name(), "1"),
|
||||
writecache: filepath.Join(existsDir, t.Name(), "2"),
|
||||
pilorama: filepath.Join(badDir, "3"),
|
||||
}))
|
||||
var openFilePiloramaSucceed atomic.Bool
|
||||
openFilePilorama := func(p string, f int, mode fs.FileMode) (*os.File, error) {
|
||||
if openFilePiloramaSucceed.Load() {
|
||||
return os.OpenFile(p, f, mode)
|
||||
}
|
||||
return nil, teststore.ErrDiskExploded
|
||||
}
|
||||
beforeReload := func() {
|
||||
openFilePiloramaSucceed.Store(true)
|
||||
}
|
||||
shardOpts, _, _ := testShard(testShardOpts{
|
||||
openFileMetabase: os.OpenFile,
|
||||
openFileWriteCache: os.OpenFile,
|
||||
openFilePilorama: openFilePilorama,
|
||||
})
|
||||
testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
|
||||
})
|
||||
}
|
||||
|
||||
func testEngineFailInitAndReload(t *testing.T, badDir string, errOnAdd bool, s []shard.Option) {
|
||||
func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Option, beforeReload func()) {
|
||||
var configID string
|
||||
|
||||
e := New()
|
||||
_, err := e.AddShard(s...)
|
||||
_, err := e.AddShard(opts...)
|
||||
if errOnAdd {
|
||||
require.Error(t, err)
|
||||
// This branch is only taken when we cannot update shard ID in the metabase.
|
||||
|
@ -139,9 +178,10 @@ func testEngineFailInitAndReload(t *testing.T, badDir string, errOnAdd bool, s [
|
|||
e.mtx.RUnlock()
|
||||
require.Equal(t, 0, shardCount)
|
||||
|
||||
require.NoError(t, os.Chmod(badDir, os.ModePerm))
|
||||
beforeReload()
|
||||
|
||||
require.NoError(t, e.Reload(ReConfiguration{
|
||||
shards: map[string][]shard.Option{configID: s},
|
||||
shards: map[string][]shard.Option{configID: opts},
|
||||
}))
|
||||
|
||||
e.mtx.RLock()
|
||||
|
@ -193,26 +233,28 @@ func TestPersistentShardID(t *testing.T) {
|
|||
dir, err := os.MkdirTemp("", "*")
|
||||
require.NoError(t, err)
|
||||
|
||||
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
|
||||
te := newEngineWithErrorThreshold(t, dir, 1)
|
||||
|
||||
checkShardState(t, e, id[0], 0, mode.ReadWrite)
|
||||
require.NoError(t, e.Close())
|
||||
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||
require.NoError(t, te.ng.Close())
|
||||
|
||||
e, _, newID := newEngineWithErrorThreshold(t, dir, 1)
|
||||
require.Equal(t, id, newID)
|
||||
require.NoError(t, e.Close())
|
||||
newTe := newEngineWithErrorThreshold(t, dir, 1)
|
||||
for i := 0; i < len(newTe.shards); i++ {
|
||||
require.Equal(t, te.shards[i].id, newTe.shards[i].id)
|
||||
}
|
||||
require.NoError(t, newTe.ng.Close())
|
||||
|
||||
p1 := e.shards[id[0].String()].Shard.DumpInfo().MetaBaseInfo.Path
|
||||
p2 := e.shards[id[1].String()].Shard.DumpInfo().MetaBaseInfo.Path
|
||||
p1 := newTe.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
|
||||
p2 := newTe.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
|
||||
tmp := filepath.Join(dir, "tmp")
|
||||
require.NoError(t, os.Rename(p1, tmp))
|
||||
require.NoError(t, os.Rename(p2, p1))
|
||||
require.NoError(t, os.Rename(tmp, p2))
|
||||
|
||||
e, _, newID = newEngineWithErrorThreshold(t, dir, 1)
|
||||
require.Equal(t, id[1], newID[0])
|
||||
require.Equal(t, id[0], newID[1])
|
||||
require.NoError(t, e.Close())
|
||||
newTe = newEngineWithErrorThreshold(t, dir, 1)
|
||||
require.Equal(t, te.shards[1].id, newTe.shards[0].id)
|
||||
require.Equal(t, te.shards[0].id, newTe.shards[1].id)
|
||||
require.NoError(t, newTe.ng.Close())
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||
|
@ -115,6 +116,32 @@ func newStorages(root string, smallSize uint64) []blobstor.SubStorage {
|
|||
}
|
||||
}
|
||||
|
||||
func newTestStorages(root string, smallSize uint64) ([]blobstor.SubStorage, *teststore.TestStore, *teststore.TestStore) {
|
||||
smallFileStorage := teststore.New(
|
||||
teststore.WithSubstorage(blobovniczatree.NewBlobovniczaTree(
|
||||
blobovniczatree.WithRootPath(filepath.Join(root, "blobovnicza")),
|
||||
blobovniczatree.WithBlobovniczaShallowDepth(1),
|
||||
blobovniczatree.WithBlobovniczaShallowWidth(1),
|
||||
blobovniczatree.WithPermissions(0700)),
|
||||
))
|
||||
largeFileStorage := teststore.New(
|
||||
teststore.WithSubstorage(fstree.New(
|
||||
fstree.WithPath(root),
|
||||
fstree.WithDepth(1)),
|
||||
))
|
||||
return []blobstor.SubStorage{
|
||||
{
|
||||
Storage: smallFileStorage,
|
||||
Policy: func(_ *object.Object, data []byte) bool {
|
||||
return uint64(len(data)) < smallSize
|
||||
},
|
||||
},
|
||||
{
|
||||
Storage: largeFileStorage,
|
||||
},
|
||||
}, smallFileStorage, largeFileStorage
|
||||
}
|
||||
|
||||
func testNewShard(t testing.TB, id int) *shard.Shard {
|
||||
sid, err := generateShardID()
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -9,6 +9,8 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||
|
@ -24,7 +26,19 @@ import (
|
|||
|
||||
const errSmallSize = 256
|
||||
|
||||
func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) {
|
||||
type testEngine struct {
|
||||
ng *StorageEngine
|
||||
dir string
|
||||
shards [2]*testShard
|
||||
}
|
||||
|
||||
type testShard struct {
|
||||
id *shard.ID
|
||||
smallFileStorage *teststore.TestStore
|
||||
largeFileStorage *teststore.TestStore
|
||||
}
|
||||
|
||||
func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) *testEngine {
|
||||
if dir == "" {
|
||||
var err error
|
||||
|
||||
|
@ -38,14 +52,13 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
|
|||
WithShardPoolSize(1),
|
||||
WithErrorThreshold(errThreshold))
|
||||
|
||||
var ids [2]*shard.ID
|
||||
var err error
|
||||
var testShards [2]*testShard
|
||||
|
||||
for i := range ids {
|
||||
ids[i], err = e.AddShard(
|
||||
for i := range testShards {
|
||||
storages, smallFileStorage, largeFileStorage := newTestStorages(filepath.Join(dir, strconv.Itoa(i)), errSmallSize)
|
||||
id, err := e.AddShard(
|
||||
shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
||||
shard.WithBlobStorOptions(
|
||||
blobstor.WithStorages(newStorages(filepath.Join(dir, strconv.Itoa(i)), errSmallSize))),
|
||||
shard.WithBlobStorOptions(blobstor.WithStorages(storages)),
|
||||
shard.WithMetaBaseOptions(
|
||||
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
|
||||
meta.WithPermissions(0700),
|
||||
|
@ -55,94 +68,111 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
|
|||
pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", i))),
|
||||
pilorama.WithPerm(0700)))
|
||||
require.NoError(t, err)
|
||||
|
||||
testShards[i] = &testShard{
|
||||
id: id,
|
||||
smallFileStorage: smallFileStorage,
|
||||
largeFileStorage: largeFileStorage,
|
||||
}
|
||||
}
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Init())
|
||||
|
||||
return e, dir, ids
|
||||
return &testEngine{
|
||||
ng: e,
|
||||
dir: dir,
|
||||
shards: testShards,
|
||||
}
|
||||
}
|
||||
|
||||
func TestErrorReporting(t *testing.T) {
|
||||
t.Run("ignore errors by default", func(t *testing.T) {
|
||||
e, dir, id := newEngineWithErrorThreshold(t, "", 0)
|
||||
te := newEngineWithErrorThreshold(t, "", 0)
|
||||
|
||||
obj := testutil.GenerateObjectWithCID(cidtest.ID())
|
||||
obj.SetPayload(make([]byte, errSmallSize))
|
||||
|
||||
var prm shard.PutPrm
|
||||
prm.SetObject(obj)
|
||||
e.mtx.RLock()
|
||||
_, err := e.shards[id[0].String()].Shard.Put(prm)
|
||||
e.mtx.RUnlock()
|
||||
te.ng.mtx.RLock()
|
||||
_, err := te.ng.shards[te.shards[0].id.String()].Shard.Put(prm)
|
||||
te.ng.mtx.RUnlock()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.NoError(t, err)
|
||||
|
||||
checkShardState(t, e, id[0], 0, mode.ReadWrite)
|
||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||
|
||||
corruptSubDir(t, filepath.Join(dir, "0"))
|
||||
for _, shard := range te.shards {
|
||||
shard.largeFileStorage.SetOption(teststore.WithGet(func(common.GetPrm) (common.GetRes, error) {
|
||||
return common.GetRes{}, teststore.ErrDiskExploded
|
||||
}))
|
||||
}
|
||||
|
||||
for i := uint32(1); i < 3; i++ {
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.Error(t, err)
|
||||
checkShardState(t, e, id[0], i, mode.ReadWrite)
|
||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[0].id, i, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||
}
|
||||
})
|
||||
t.Run("with error threshold", func(t *testing.T) {
|
||||
const errThreshold = 3
|
||||
|
||||
e, dir, id := newEngineWithErrorThreshold(t, "", errThreshold)
|
||||
te := newEngineWithErrorThreshold(t, "", errThreshold)
|
||||
|
||||
obj := testutil.GenerateObjectWithCID(cidtest.ID())
|
||||
obj.SetPayload(make([]byte, errSmallSize))
|
||||
|
||||
var prm shard.PutPrm
|
||||
prm.SetObject(obj)
|
||||
e.mtx.RLock()
|
||||
_, err := e.shards[id[0].String()].Put(prm)
|
||||
e.mtx.RUnlock()
|
||||
te.ng.mtx.RLock()
|
||||
_, err := te.ng.shards[te.shards[0].id.String()].Put(prm)
|
||||
te.ng.mtx.RUnlock()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.NoError(t, err)
|
||||
|
||||
checkShardState(t, e, id[0], 0, mode.ReadWrite)
|
||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||
|
||||
corruptSubDir(t, filepath.Join(dir, "0"))
|
||||
for _, shard := range te.shards {
|
||||
shard.largeFileStorage.SetOption(teststore.WithGet(func(common.GetPrm) (common.GetRes, error) {
|
||||
return common.GetRes{}, teststore.ErrDiskExploded
|
||||
}))
|
||||
}
|
||||
|
||||
for i := uint32(1); i < errThreshold; i++ {
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.Error(t, err)
|
||||
checkShardState(t, e, id[0], i, mode.ReadWrite)
|
||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[0].id, i, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||
}
|
||||
|
||||
for i := uint32(0); i < 2; i++ {
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.Error(t, err)
|
||||
checkShardState(t, e, id[0], errThreshold+i, mode.DegradedReadOnly)
|
||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[0].id, errThreshold+i, mode.DegradedReadOnly)
|
||||
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||
}
|
||||
|
||||
require.NoError(t, e.SetShardMode(id[0], mode.ReadWrite, false))
|
||||
checkShardState(t, e, id[0], errThreshold+1, mode.ReadWrite)
|
||||
require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, false))
|
||||
checkShardState(t, te.ng, te.shards[0].id, errThreshold+1, mode.ReadWrite)
|
||||
|
||||
require.NoError(t, e.SetShardMode(id[0], mode.ReadWrite, true))
|
||||
checkShardState(t, e, id[0], 0, mode.ReadWrite)
|
||||
require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, true))
|
||||
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||
})
|
||||
}
|
||||
|
||||
// Issue #1186.
|
||||
func TestBlobstorFailback(t *testing.T) {
|
||||
dir, err := os.MkdirTemp("", "*")
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { require.NoError(t, os.RemoveAll(dir)) })
|
||||
|
||||
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
|
||||
te := newEngineWithErrorThreshold(t, dir, 1)
|
||||
|
||||
objs := make([]*objectSDK.Object, 0, 2)
|
||||
for _, size := range []int{15, errSmallSize + 1} {
|
||||
|
@ -151,49 +181,49 @@ func TestBlobstorFailback(t *testing.T) {
|
|||
|
||||
var prm shard.PutPrm
|
||||
prm.SetObject(obj)
|
||||
e.mtx.RLock()
|
||||
_, err = e.shards[id[0].String()].Shard.Put(prm)
|
||||
e.mtx.RUnlock()
|
||||
te.ng.mtx.RLock()
|
||||
_, err = te.ng.shards[te.shards[0].id.String()].Shard.Put(prm)
|
||||
te.ng.mtx.RUnlock()
|
||||
require.NoError(t, err)
|
||||
objs = append(objs, obj)
|
||||
}
|
||||
|
||||
for i := range objs {
|
||||
addr := object.AddressOf(objs[i])
|
||||
_, err = e.Get(GetPrm{addr: addr})
|
||||
_, err = te.ng.Get(GetPrm{addr: addr})
|
||||
require.NoError(t, err)
|
||||
_, err = e.GetRange(RngPrm{addr: addr})
|
||||
_, err = te.ng.GetRange(RngPrm{addr: addr})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
checkShardState(t, e, id[0], 0, mode.ReadWrite)
|
||||
require.NoError(t, e.Close())
|
||||
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||
require.NoError(t, te.ng.Close())
|
||||
|
||||
p1 := e.shards[id[0].String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
||||
p2 := e.shards[id[1].String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
||||
p1 := te.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
||||
p2 := te.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
||||
tmp := filepath.Join(dir, "tmp")
|
||||
require.NoError(t, os.Rename(p1, tmp))
|
||||
require.NoError(t, os.Rename(p2, p1))
|
||||
require.NoError(t, os.Rename(tmp, p2))
|
||||
|
||||
e, _, id = newEngineWithErrorThreshold(t, dir, 1)
|
||||
te = newEngineWithErrorThreshold(t, dir, 1)
|
||||
|
||||
for i := range objs {
|
||||
addr := object.AddressOf(objs[i])
|
||||
getRes, err := e.Get(GetPrm{addr: addr})
|
||||
getRes, err := te.ng.Get(GetPrm{addr: addr})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, objs[i], getRes.Object())
|
||||
|
||||
rngRes, err := e.GetRange(RngPrm{addr: addr, off: 1, ln: 10})
|
||||
rngRes, err := te.ng.GetRange(RngPrm{addr: addr, off: 1, ln: 10})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, objs[i].Payload()[1:11], rngRes.Object().Payload())
|
||||
|
||||
_, err = e.GetRange(RngPrm{addr: addr, off: errSmallSize + 10, ln: 1})
|
||||
_, err = te.ng.GetRange(RngPrm{addr: addr, off: errSmallSize + 10, ln: 1})
|
||||
require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{})
|
||||
}
|
||||
|
||||
checkShardState(t, e, id[0], 1, mode.DegradedReadOnly)
|
||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[0].id, 1, mode.DegradedReadOnly)
|
||||
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||
}
|
||||
|
||||
func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode mode.Mode) {
|
||||
|
@ -204,19 +234,3 @@ func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint
|
|||
require.Equal(t, errCount, sh.errorCount.Load())
|
||||
require.Equal(t, mode, sh.GetMode())
|
||||
}
|
||||
|
||||
// corruptSubDir makes random directory except "blobovnicza" in blobstor FSTree unreadable.
|
||||
func corruptSubDir(t *testing.T, dir string) {
|
||||
de, err := os.ReadDir(dir)
|
||||
require.NoError(t, err)
|
||||
|
||||
// FIXME(@cthulhu-rider): copy-paste of unexported const from blobstor package, see #1407
|
||||
const dirBlobovnicza = "blobovnicza"
|
||||
|
||||
for i := range de {
|
||||
if de[i].IsDir() && de[i].Name() != dirBlobovnicza {
|
||||
require.NoError(t, os.Chmod(filepath.Join(dir, de[i].Name()), 0))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,6 @@ import (
|
|||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -191,7 +190,7 @@ func TestLockExpiration(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
var inhumePrm InhumePrm
|
||||
inhumePrm.WithTarget(objecttest.Address(), objectcore.AddressOf(obj))
|
||||
inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))
|
||||
|
||||
_, err = e.Inhume(inhumePrm)
|
||||
require.ErrorAs(t, err, new(apistatus.ObjectLocked))
|
||||
|
@ -204,7 +203,7 @@ func TestLockExpiration(t *testing.T) {
|
|||
time.Sleep(time.Second)
|
||||
|
||||
// 4.
|
||||
inhumePrm.WithTarget(objecttest.Address(), objectcore.AddressOf(obj))
|
||||
inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))
|
||||
|
||||
_, err = e.Inhume(inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
@ -263,7 +262,7 @@ func TestLockForceRemoval(t *testing.T) {
|
|||
_, err = e.Inhume(inhumePrm)
|
||||
require.ErrorAs(t, err, new(apistatus.ObjectLocked))
|
||||
|
||||
inhumePrm.WithTarget(objecttest.Address(), objectcore.AddressOf(obj))
|
||||
inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))
|
||||
|
||||
_, err = e.Inhume(inhumePrm)
|
||||
require.ErrorAs(t, err, new(apistatus.ObjectLocked))
|
||||
|
|
|
@ -23,7 +23,7 @@ func BenchmarkTreeVsSearch(b *testing.B) {
|
|||
}
|
||||
|
||||
func benchmarkTreeVsSearch(b *testing.B, objCount int) {
|
||||
e, _, _ := newEngineWithErrorThreshold(b, "", 0)
|
||||
te := newEngineWithErrorThreshold(b, "", 0)
|
||||
cid := cidtest.ID()
|
||||
d := pilorama.CIDDescriptor{CID: cid, Position: 0, Size: 1}
|
||||
treeID := "someTree"
|
||||
|
@ -31,11 +31,11 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) {
|
|||
for i := 0; i < objCount; i++ {
|
||||
obj := testutil.GenerateObjectWithCID(cid)
|
||||
testutil.AddAttribute(obj, pilorama.AttributeFilename, strconv.Itoa(i))
|
||||
err := Put(e, obj)
|
||||
err := Put(te.ng, obj)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
_, err = e.TreeAddByPath(d, treeID, pilorama.AttributeFilename, nil,
|
||||
_, err = te.ng.TreeAddByPath(d, treeID, pilorama.AttributeFilename, nil,
|
||||
[]pilorama.KeyValue{{pilorama.AttributeFilename, []byte(strconv.Itoa(i))}})
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
|
@ -51,7 +51,7 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) {
|
|||
prm.WithFilters(fs)
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
res, err := e.Select(prm)
|
||||
res, err := te.ng.Select(prm)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
@ -62,7 +62,7 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) {
|
|||
})
|
||||
b.Run("TreeGetByPath", func(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
nodes, err := e.TreeGetByPath(cid, treeID, pilorama.AttributeFilename, []string{strconv.Itoa(objCount / 2)}, true)
|
||||
nodes, err := te.ng.TreeGetByPath(cid, treeID, pilorama.AttributeFilename, []string{strconv.Itoa(objCount / 2)}, true)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -66,6 +66,7 @@ func NewBoltForest(opts ...Option) ForestStorage {
|
|||
perm: os.ModePerm,
|
||||
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
|
||||
maxBatchSize: bbolt.DefaultMaxBatchSize,
|
||||
openFile: os.OpenFile,
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -107,6 +108,7 @@ func (t *boltForest) Open(readOnly bool) error {
|
|||
opts.ReadOnly = readOnly
|
||||
opts.NoSync = t.noSync
|
||||
opts.Timeout = 100 * time.Millisecond
|
||||
opts.OpenFile = t.openFile
|
||||
|
||||
t.db, err = bbolt.Open(t.path, t.perm, &opts)
|
||||
if err != nil {
|
||||
|
|
|
@ -2,6 +2,7 @@ package pilorama
|
|||
|
||||
import (
|
||||
"io/fs"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -13,6 +14,7 @@ type cfg struct {
|
|||
noSync bool
|
||||
maxBatchDelay time.Duration
|
||||
maxBatchSize int
|
||||
openFile func(string, int, fs.FileMode) (*os.File, error)
|
||||
}
|
||||
|
||||
func WithPath(path string) Option {
|
||||
|
@ -44,3 +46,9 @@ func WithMaxBatchSize(size int) Option {
|
|||
c.maxBatchSize = size
|
||||
}
|
||||
}
|
||||
|
||||
func WithOpenFile(openFile func(string, int, fs.FileMode) (*os.File, error)) Option {
|
||||
return func(c *cfg) {
|
||||
c.openFile = openFile
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,14 +1,18 @@
|
|||
package shard
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
|
@ -22,6 +26,7 @@ import (
|
|||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
|
@ -40,19 +45,33 @@ func TestShardOpen(t *testing.T) {
|
|||
dir := t.TempDir()
|
||||
metaPath := filepath.Join(dir, "meta")
|
||||
|
||||
st := teststore.New(teststore.WithSubstorage(fstree.New(
|
||||
fstree.WithDirNameLen(2),
|
||||
fstree.WithPath(filepath.Join(dir, "blob")),
|
||||
fstree.WithDepth(1)),
|
||||
))
|
||||
|
||||
var allowedMode atomic.Int64
|
||||
openFileMetabase := func(p string, f int, perm fs.FileMode) (*os.File, error) {
|
||||
const modeMask = os.O_RDONLY | os.O_RDWR | os.O_WRONLY
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
`3` looks like a magic here, can we define a constant as `os.O_RDONLY | os.O_RDWR | os.O_WRONLY`? Or somehow refer to them in the comment for the named constant.
ale64bit
commented
done done
|
||||
if int64(f&modeMask) == allowedMode.Load() {
|
||||
return os.OpenFile(p, f, perm)
|
||||
}
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Maybe Maybe `syscall.EPERM` or `fs.ErrPermission`?
ale64bit
commented
done. Is there any value in returning an error that is guaranteed to not be specially handled or better to use a relevant error for this path? done. Is there any value in returning an error that is guaranteed to _not_ be specially handled or better to use a relevant error for this path?
|
||||
return nil, fs.ErrPermission
|
||||
}
|
||||
|
||||
newShard := func() *Shard {
|
||||
return New(
|
||||
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
||||
WithBlobStorOptions(
|
||||
blobstor.WithStorages([]blobstor.SubStorage{
|
||||
{
|
||||
Storage: fstree.New(
|
||||
fstree.WithDirNameLen(2),
|
||||
fstree.WithPath(filepath.Join(dir, "blob")),
|
||||
fstree.WithDepth(1)),
|
||||
},
|
||||
{Storage: st},
|
||||
})),
|
||||
WithMetaBaseOptions(meta.WithPath(metaPath), meta.WithEpochState(epochState{})),
|
||||
WithMetaBaseOptions(
|
||||
meta.WithPath(metaPath),
|
||||
meta.WithEpochState(epochState{}),
|
||||
meta.WithBoltDBOptions(&bbolt.Options{OpenFile: openFileMetabase}),
|
||||
),
|
||||
WithPiloramaOptions(
|
||||
pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
||||
WithWriteCache(true),
|
||||
|
@ -60,6 +79,8 @@ func TestShardOpen(t *testing.T) {
|
|||
writecache.WithPath(filepath.Join(dir, "wc"))))
|
||||
}
|
||||
|
||||
allowedMode.Store(int64(os.O_RDWR))
|
||||
|
||||
sh := newShard()
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Init())
|
||||
|
@ -67,7 +88,8 @@ func TestShardOpen(t *testing.T) {
|
|||
require.NoError(t, sh.Close())
|
||||
|
||||
// Metabase can be opened in read-only => start in ReadOnly mode.
|
||||
require.NoError(t, os.Chmod(metaPath, 0444))
|
||||
allowedMode.Store(int64(os.O_RDONLY))
|
||||
|
||||
sh = newShard()
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Init())
|
||||
|
@ -77,7 +99,8 @@ func TestShardOpen(t *testing.T) {
|
|||
require.NoError(t, sh.Close())
|
||||
|
||||
// Metabase is corrupted => start in DegradedReadOnly mode.
|
||||
require.NoError(t, os.Chmod(metaPath, 0000))
|
||||
allowedMode.Store(math.MaxInt64)
|
||||
|
||||
sh = newShard()
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Init())
|
||||
|
|
|
@ -46,12 +46,13 @@ func TestFlush(t *testing.T) {
|
|||
require.NoError(t, mb.Open(false))
|
||||
require.NoError(t, mb.Init())
|
||||
|
||||
fsTree := fstree.New(
|
||||
fstree.WithPath(filepath.Join(dir, "blob")),
|
||||
fstree.WithDepth(0),
|
||||
fstree.WithDirNameLen(1))
|
||||
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
|
||||
{Storage: fsTree},
|
||||
{
|
||||
Storage: fstree.New(
|
||||
fstree.WithPath(filepath.Join(dir, "blob")),
|
||||
fstree.WithDepth(0),
|
||||
fstree.WithDirNameLen(1)),
|
||||
},
|
||||
}))
|
||||
require.NoError(t, bs.Open(false))
|
||||
require.NoError(t, bs.Init())
|
||||
|
@ -208,7 +209,7 @@ func TestFlush(t *testing.T) {
|
|||
|
||||
_, err = os.Stat(p) // sanity check
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, os.Chmod(p, 0))
|
||||
require.NoError(t, os.Truncate(p, 0)) // corrupt the file contents, so that it can't be unmarshalled
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
What do we achieve with What do we achieve with `Truncate` here?
ale64bit
commented
`os.Chmod` was used here to cause the file to not be read properly, but the same can be achieved with `os.Truncate` so that it can't be unmarshalled. I thought at least `os.Truncate` didn't depend on the user running the test.
fyrchik
commented
I mean the purpose is not obvious to me, as a code reader. May be just write I mean the purpose is not obvious to me, as a code reader. May be just write `"corrupted data"` in the file?
ale64bit
commented
ah I see. Yes, good point. I added a comment about it on this line. ah I see. Yes, good point. I added a comment about it on this line.
|
||||
})
|
||||
})
|
||||
t.Run("fs, invalid object", func(t *testing.T) {
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package writecache
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
|
@ -55,6 +57,8 @@ type options struct {
|
|||
noSync bool
|
||||
// reportError is the function called when encountering disk errors in background workers.
|
||||
reportError func(string, error)
|
||||
// openFile is the function called internally by bbolt to open database files. Useful for hermetic testing.
|
||||
openFile func(string, int, fs.FileMode) (*os.File, error)
|
||||
}
|
||||
|
||||
// WithLogger sets logger.
|
||||
|
@ -152,3 +156,10 @@ func WithReportErrorFunc(f func(string, error)) Option {
|
|||
o.reportError = f
|
||||
}
|
||||
}
|
||||
|
||||
// WithOpenFile sets the OpenFile function to use internally by bolt. Useful for hermetic testing.
|
||||
func WithOpenFile(f func(string, int, fs.FileMode) (*os.File, error)) Option {
|
||||
return func(o *options) {
|
||||
o.openFile = f
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ func (c *cache) openStore(readOnly bool) error {
|
|||
return err
|
||||
}
|
||||
|
||||
c.db, err = OpenDB(c.path, readOnly)
|
||||
c.db, err = OpenDB(c.path, readOnly, c.openFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not open database: %w", err)
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package writecache
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
@ -9,10 +10,11 @@ import (
|
|||
)
|
||||
|
||||
// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true.
|
||||
func OpenDB(p string, ro bool) (*bbolt.DB, error) {
|
||||
func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) {
|
||||
return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{
|
||||
NoFreelistSync: true,
|
||||
ReadOnly: ro,
|
||||
Timeout: 100 * time.Millisecond,
|
||||
OpenFile: openFile,
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package writecache
|
||||
|
||||
import (
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
|
@ -106,6 +107,7 @@ func New(opts ...Option) Cache {
|
|||
maxCacheSize: defaultMaxCacheSize,
|
||||
maxBatchSize: bbolt.DefaultMaxBatchSize,
|
||||
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
|
||||
openFile: os.OpenFile,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
Do we need to return the last 2 arguments? Why not casting a type from
SubStorage
?What's wrong with returning both arguments? Seems to me that way it's more clear what is returned and the cast/indexing is unnecessary (those assume a type and slice size).
That the number of returned arguments directly depends on the slice length, and casting is not that hard.
I don't insist, just share an opinion.
Seems to me it's more clear to return struct with named fields. Casting is evil and should be prohibited.