[#xx] Fix writecache benchmarks and refactor hacky NeedsCompression #586
17 changed files with 161 additions and 92 deletions
|
@ -252,6 +252,10 @@ func (b *Blobovniczas) SetCompressor(cc *compression.Config) {
|
||||||
b.compression = cc
|
b.compression = cc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *Blobovniczas) Compressor() *compression.Config {
|
||||||
|
return b.compression
|
||||||
|
}
|
||||||
|
|
||||||
// SetReportErrorFunc implements common.Storage.
|
// SetReportErrorFunc implements common.Storage.
|
||||||
func (b *Blobovniczas) SetReportErrorFunc(f func(string, error)) {
|
func (b *Blobovniczas) SetReportErrorFunc(f func(string, error)) {
|
||||||
b.reportError = f
|
b.reportError = f
|
||||||
|
|
|
@ -128,3 +128,7 @@ func WithMetrics(m Metrics) Option {
|
||||||
c.metrics = m
|
c.metrics = m
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *BlobStor) Compressor() *compression.Config {
|
||||||
|
return &b.cfg.compression
|
||||||
|
}
|
||||||
|
|
|
@ -146,33 +146,33 @@ func TestBlobstor_needsCompression(t *testing.T) {
|
||||||
b := newBlobStor(t, true, "audio/*", "*/x-mpeg", "*/mpeg", "application/x-midi")
|
b := newBlobStor(t, true, "audio/*", "*/x-mpeg", "*/mpeg", "application/x-midi")
|
||||||
|
|
||||||
obj := newObjectWithCt("video/mpeg")
|
obj := newObjectWithCt("video/mpeg")
|
||||||
require.False(t, b.NeedsCompression(obj))
|
require.False(t, b.compression.NeedsCompression(obj))
|
||||||
|
|
||||||
obj = newObjectWithCt("audio/aiff")
|
obj = newObjectWithCt("audio/aiff")
|
||||||
require.False(t, b.NeedsCompression(obj))
|
require.False(t, b.compression.NeedsCompression(obj))
|
||||||
|
|
||||||
obj = newObjectWithCt("application/x-midi")
|
obj = newObjectWithCt("application/x-midi")
|
||||||
require.False(t, b.NeedsCompression(obj))
|
require.False(t, b.compression.NeedsCompression(obj))
|
||||||
|
|
||||||
obj = newObjectWithCt("text/plain")
|
obj = newObjectWithCt("text/plain")
|
||||||
require.True(t, b.NeedsCompression(obj))
|
require.True(t, b.compression.NeedsCompression(obj))
|
||||||
|
|
||||||
obj = newObjectWithCt("")
|
obj = newObjectWithCt("")
|
||||||
require.True(t, b.NeedsCompression(obj))
|
require.True(t, b.compression.NeedsCompression(obj))
|
||||||
})
|
})
|
||||||
t.Run("content-types omitted", func(t *testing.T) {
|
t.Run("content-types omitted", func(t *testing.T) {
|
||||||
b := newBlobStor(t, true)
|
b := newBlobStor(t, true)
|
||||||
obj := newObjectWithCt("video/mpeg")
|
obj := newObjectWithCt("video/mpeg")
|
||||||
require.True(t, b.NeedsCompression(obj))
|
require.True(t, b.compression.NeedsCompression(obj))
|
||||||
})
|
})
|
||||||
t.Run("compress disabled", func(t *testing.T) {
|
t.Run("compress disabled", func(t *testing.T) {
|
||||||
b := newBlobStor(t, false, "video/mpeg")
|
b := newBlobStor(t, false, "video/mpeg")
|
||||||
|
|
||||||
obj := newObjectWithCt("video/mpeg")
|
obj := newObjectWithCt("video/mpeg")
|
||||||
require.False(t, b.NeedsCompression(obj))
|
require.False(t, b.compression.NeedsCompression(obj))
|
||||||
|
|
||||||
obj = newObjectWithCt("text/plain")
|
obj = newObjectWithCt("text/plain")
|
||||||
require.False(t, b.NeedsCompression(obj))
|
require.False(t, b.compression.NeedsCompression(obj))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,10 @@ type Storage interface {
|
||||||
|
|
||||||
Type() string
|
Type() string
|
||||||
Path() string
|
Path() string
|
||||||
|
|
||||||
SetCompressor(cc *compression.Config)
|
SetCompressor(cc *compression.Config)
|
||||||
|
Compressor() *compression.Config
|
||||||
|
|
||||||
// SetReportErrorFunc allows to provide a function to be called on disk errors.
|
// SetReportErrorFunc allows to provide a function to be called on disk errors.
|
||||||
// This function MUST be called before Open.
|
// This function MUST be called before Open.
|
||||||
SetReportErrorFunc(f func(string, error))
|
SetReportErrorFunc(f func(string, error))
|
||||||
|
|
|
@ -550,6 +550,10 @@ func (t *FSTree) SetCompressor(cc *compression.Config) {
|
||||||
t.Config = cc
|
t.Config = cc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *FSTree) Compressor() *compression.Config {
|
||||||
|
return t.Config
|
||||||
|
}
|
||||||
|
|
||||||
// SetReportErrorFunc implements common.Storage.
|
// SetReportErrorFunc implements common.Storage.
|
||||||
func (t *FSTree) SetReportErrorFunc(_ func(string, error)) {
|
func (t *FSTree) SetReportErrorFunc(_ func(string, error)) {
|
||||||
// Do nothing, FSTree can encounter only one error which is returned.
|
// Do nothing, FSTree can encounter only one error which is returned.
|
||||||
|
|
|
@ -12,5 +12,6 @@ func (s *memstoreImpl) Close() error { return nil }
|
||||||
func (s *memstoreImpl) Type() string { return Type }
|
func (s *memstoreImpl) Type() string { return Type }
|
||||||
func (s *memstoreImpl) Path() string { return s.rootPath }
|
func (s *memstoreImpl) Path() string { return s.rootPath }
|
||||||
func (s *memstoreImpl) SetCompressor(cc *compression.Config) { s.compression = cc }
|
func (s *memstoreImpl) SetCompressor(cc *compression.Config) { s.compression = cc }
|
||||||
|
func (s *memstoreImpl) Compressor() *compression.Config { return s.compression }
|
||||||
func (s *memstoreImpl) SetReportErrorFunc(f func(string, error)) { s.reportError = f }
|
func (s *memstoreImpl) SetReportErrorFunc(f func(string, error)) { s.reportError = f }
|
||||||
func (s *memstoreImpl) SetParentID(string) {}
|
func (s *memstoreImpl) SetParentID(string) {}
|
||||||
|
|
|
@ -9,7 +9,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
@ -72,11 +71,3 @@ func (b *BlobStor) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, e
|
||||||
|
|
||||||
return common.PutRes{}, ErrNoPlaceFound
|
return common.PutRes{}, ErrNoPlaceFound
|
||||||
}
|
}
|
||||||
|
|
||||||
// NeedsCompression returns true if the object should be compressed.
|
|
||||||
// For an object to be compressed 2 conditions must hold:
|
|
||||||
// 1. Compression is enabled in settings.
|
|
||||||
// 2. Object MIME Content-Type is allowed for compression.
|
|
||||||
func (b *BlobStor) NeedsCompression(obj *objectSDK.Object) bool {
|
|
||||||
return b.cfg.compression.NeedsCompression(obj)
|
|
||||||
}
|
|
||||||
|
|
|
@ -15,6 +15,7 @@ type cfg struct {
|
||||||
Type func() string
|
Type func() string
|
||||||
Path func() string
|
Path func() string
|
||||||
SetCompressor func(cc *compression.Config)
|
SetCompressor func(cc *compression.Config)
|
||||||
|
Compressor func() *compression.Config
|
||||||
SetReportErrorFunc func(f func(string, error))
|
SetReportErrorFunc func(f func(string, error))
|
||||||
|
|
||||||
Get func(common.GetPrm) (common.GetRes, error)
|
Get func(common.GetPrm) (common.GetRes, error)
|
||||||
|
@ -45,6 +46,10 @@ func WithSetCompressor(f func(*compression.Config)) Option {
|
||||||
return func(c *cfg) { c.overrides.SetCompressor = f }
|
return func(c *cfg) { c.overrides.SetCompressor = f }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithCompressor(f func() *compression.Config) Option {
|
||||||
|
return func(c *cfg) { c.overrides.Compressor = f }
|
||||||
|
}
|
||||||
|
|
||||||
func WithReportErrorFunc(f func(func(string, error))) Option {
|
func WithReportErrorFunc(f func(func(string, error))) Option {
|
||||||
return func(c *cfg) { c.overrides.SetReportErrorFunc = f }
|
return func(c *cfg) { c.overrides.SetReportErrorFunc = f }
|
||||||
}
|
}
|
||||||
|
|
|
@ -128,6 +128,19 @@ func (s *TestStore) SetCompressor(cc *compression.Config) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *TestStore) Compressor() *compression.Config {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
switch {
|
||||||
|
case s.overrides.Compressor != nil:
|
||||||
|
return s.overrides.Compressor()
|
||||||
|
case s.st != nil:
|
||||||
|
return s.st.Compressor()
|
||||||
|
default:
|
||||||
|
panic("unexpected storage call: Compressor()")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *TestStore) SetReportErrorFunc(f func(string, error)) {
|
func (s *TestStore) SetReportErrorFunc(f func(string, error)) {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
defer s.mu.RUnlock()
|
defer s.mu.RUnlock()
|
||||||
|
|
|
@ -3,50 +3,120 @@ package benchmark
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func BenchmarkWritecache(b *testing.B) {
|
func BenchmarkWritecacheSeq(b *testing.B) {
|
||||||
b.Run("bbolt", func(b *testing.B) {
|
const payloadSize = 8 << 10
|
||||||
cache := writecachebbolt.New(
|
b.Run("bbolt_seq", func(b *testing.B) {
|
||||||
writecachebbolt.WithPath(b.TempDir()),
|
benchmarkPutSeq(b, newBBoltCache(b), payloadSize)
|
||||||
)
|
|
||||||
benchmarkPut(b, cache)
|
|
||||||
})
|
})
|
||||||
b.Run("badger", func(b *testing.B) {
|
b.Run("badger_seq", func(b *testing.B) {
|
||||||
cache := writecachebadger.New(
|
benchmarkPutSeq(b, newBadgerCache(b), payloadSize)
|
||||||
writecachebadger.WithPath(b.TempDir()),
|
|
||||||
)
|
|
||||||
benchmarkPut(b, cache)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func benchmarkPut(b *testing.B, cache writecache.Cache) {
|
func BenchmarkWritecachePar(b *testing.B) {
|
||||||
if err := cache.Open(false); err != nil {
|
const payloadSize = 8 << 10
|
||||||
b.Fatalf("initializing: %v", err)
|
b.Run("bbolt_par", func(b *testing.B) {
|
||||||
|
benchmarkPutPar(b, newBBoltCache(b), payloadSize)
|
||||||
|
})
|
||||||
|
b.Run("badger_par", func(b *testing.B) {
|
||||||
|
benchmarkPutPar(b, newBadgerCache(b), payloadSize)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
if err := cache.Init(); err != nil {
|
|
||||||
b.Fatalf("opening: %v", err)
|
func benchmarkPutSeq(b *testing.B, cache writecache.Cache, size uint64) {
|
||||||
}
|
benchmarkPutPrepare(b, cache)
|
||||||
defer cache.Close()
|
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
objGen := testutil.RandObjGenerator{ObjSize: 8 << 10}
|
objGen := testutil.RandObjGenerator{ObjSize: size}
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
|
obj := objGen.Next()
|
||||||
|
rawData, err := obj.Marshal()
|
||||||
|
require.NoError(b, err, "marshaling object")
|
||||||
prm := common.PutPrm{
|
prm := common.PutPrm{
|
||||||
Address: oidtest.Address(),
|
Address: testutil.AddressFromObject(b, obj),
|
||||||
Object: objGen.Next(),
|
Object: obj,
|
||||||
|
RawData: rawData,
|
||||||
}
|
}
|
||||||
if _, err := cache.Put(ctx, prm); err != nil {
|
if _, err := cache.Put(ctx, prm); err != nil {
|
||||||
b.Fatalf("putting: %v", err)
|
b.Fatalf("putting: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func benchmarkPutPar(b *testing.B, cache writecache.Cache, size uint64) {
|
||||||
|
benchmarkPutPrepare(b, cache)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
objGen := testutil.RandObjGenerator{ObjSize: size}
|
||||||
|
for pb.Next() {
|
||||||
|
obj := objGen.Next()
|
||||||
|
rawData, err := obj.Marshal()
|
||||||
|
require.NoError(b, err, "marshaling object")
|
||||||
|
prm := common.PutPrm{
|
||||||
|
Address: testutil.AddressFromObject(b, obj),
|
||||||
|
Object: obj,
|
||||||
|
RawData: rawData,
|
||||||
|
}
|
||||||
|
if _, err := cache.Put(ctx, prm); err != nil {
|
||||||
|
b.Fatalf("putting: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func benchmarkPutPrepare(b *testing.B, cache writecache.Cache) {
|
||||||
|
require.NoError(b, cache.Open(false), "opening")
|
||||||
|
require.NoError(b, cache.Init(), "initializing")
|
||||||
|
b.Cleanup(func() {
|
||||||
|
require.NoError(b, cache.Close(), "closing")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
type testMetabase struct{}
|
||||||
|
|
||||||
|
func (testMetabase) UpdateStorageID(meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) {
|
||||||
|
return meta.UpdateStorageIDRes{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBBoltCache(b *testing.B) writecache.Cache {
|
||||||
|
bs := teststore.New(
|
||||||
|
teststore.WithPut(func(pp common.PutPrm) (common.PutRes, error) { return common.PutRes{}, nil }),
|
||||||
|
)
|
||||||
|
return writecachebbolt.New(
|
||||||
|
writecachebbolt.WithPath(b.TempDir()),
|
||||||
|
writecachebbolt.WithBlobstor(bs),
|
||||||
|
writecachebbolt.WithMetabase(testMetabase{}),
|
||||||
|
writecachebbolt.WithMaxCacheSize(256<<30),
|
||||||
|
writecachebbolt.WithSmallObjectSize(128<<10),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBadgerCache(b *testing.B) writecache.Cache {
|
||||||
|
bs := teststore.New(
|
||||||
|
teststore.WithPut(func(pp common.PutPrm) (common.PutRes, error) { return common.PutRes{}, nil }),
|
||||||
|
)
|
||||||
|
return writecachebadger.New(
|
||||||
|
writecachebadger.WithPath(b.TempDir()),
|
||||||
|
writecachebadger.WithBlobstor(bs),
|
||||||
|
writecachebadger.WithMetabase(testMetabase{}),
|
||||||
|
writecachebadger.WithMaxCacheSize(256<<30),
|
||||||
|
writecachebadger.WithGCInterval(10*time.Second),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
|
@ -5,6 +5,8 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||||
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
@ -40,6 +42,18 @@ type Cache interface {
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MainStorage is the interface of the underlying storage of Cache implementations.
|
||||||
|
type MainStorage interface {
|
||||||
|
Compressor() *compression.Config
|
||||||
|
Exists(context.Context, common.ExistsPrm) (common.ExistsRes, error)
|
||||||
|
Put(context.Context, common.PutPrm) (common.PutRes, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Metabase is the interface of the metabase used by Cache implementations.
|
||||||
|
type Metabase interface {
|
||||||
|
UpdateStorageID(meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error)
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// ErrReadOnly is returned when Put/Write is performed in a read-only mode.
|
// ErrReadOnly is returned when Put/Write is performed in a read-only mode.
|
||||||
ErrReadOnly = logicerr.New("write-cache is in read-only mode")
|
ErrReadOnly = logicerr.New("write-cache is in read-only mode")
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||||
|
@ -19,7 +18,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestFlush(t *testing.T) {
|
func TestFlush(t *testing.T) {
|
||||||
createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs *blobstor.BlobStor, opts ...Option) writecache.Cache {
|
createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs writecache.MainStorage, opts ...Option) writecache.Cache {
|
||||||
return New(
|
return New(
|
||||||
append([]Option{
|
append([]Option{
|
||||||
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
||||||
|
|
|
@ -1,43 +1,24 @@
|
||||||
package writecachebadger
|
package writecachebadger
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Option represents write-cache configuration option.
|
// Option represents write-cache configuration option.
|
||||||
type Option func(*options)
|
type Option func(*options)
|
||||||
|
|
||||||
// meta is an interface for a metabase.
|
|
||||||
type metabase interface {
|
|
||||||
Exists(context.Context, meta.ExistsPrm) (meta.ExistsRes, error)
|
|
||||||
StorageID(context.Context, meta.StorageIDPrm) (meta.StorageIDRes, error)
|
|
||||||
UpdateStorageID(meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// blob is an interface for the blobstor.
|
|
||||||
type blob interface {
|
|
||||||
Put(context.Context, common.PutPrm) (common.PutRes, error)
|
|
||||||
NeedsCompression(obj *objectSDK.Object) bool
|
|
||||||
Exists(ctx context.Context, res common.ExistsPrm) (common.ExistsRes, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type options struct {
|
type options struct {
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
// path is a path to a directory for write-cache.
|
// path is a path to a directory for write-cache.
|
||||||
path string
|
path string
|
||||||
// blobstor is the main persistent storage.
|
// blobstor is the main persistent storage.
|
||||||
blobstor blob
|
blobstor writecache.MainStorage
|
||||||
fyrchik marked this conversation as resolved
Outdated
|
|||||||
// metabase is the metabase instance.
|
// metabase is the metabase instance.
|
||||||
metabase metabase
|
metabase writecache.Metabase
|
||||||
// maxObjectSize is the maximum size of the object stored in the write-cache.
|
// maxObjectSize is the maximum size of the object stored in the write-cache.
|
||||||
maxObjectSize uint64
|
maxObjectSize uint64
|
||||||
// workersCount is the number of workers flushing objects in parallel.
|
// workersCount is the number of workers flushing objects in parallel.
|
||||||
|
@ -70,14 +51,14 @@ func WithPath(path string) Option {
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithBlobstor sets main object storage.
|
// WithBlobstor sets main object storage.
|
||||||
func WithBlobstor(bs *blobstor.BlobStor) Option {
|
func WithBlobstor(bs writecache.MainStorage) Option {
|
||||||
return func(o *options) {
|
return func(o *options) {
|
||||||
o.blobstor = bs
|
o.blobstor = bs
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithMetabase sets metabase.
|
// WithMetabase sets metabase.
|
||||||
func WithMetabase(db *meta.DB) Option {
|
func WithMetabase(db writecache.Metabase) Option {
|
||||||
return func(o *options) {
|
return func(o *options) {
|
||||||
o.metabase = db
|
o.metabase = db
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,6 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
|
@ -22,7 +21,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestFlush(t *testing.T) {
|
func TestFlush(t *testing.T) {
|
||||||
createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs *blobstor.BlobStor, opts ...Option) writecache.Cache {
|
createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs writecache.MainStorage, opts ...Option) writecache.Cache {
|
||||||
return New(
|
return New(
|
||||||
append([]Option{
|
append([]Option{
|
||||||
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
||||||
|
|
|
@ -1,45 +1,26 @@
|
||||||
package writecachebbolt
|
package writecachebbolt
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"io/fs"
|
"io/fs"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Option represents write-cache configuration option.
|
// Option represents write-cache configuration option.
|
||||||
type Option func(*options)
|
type Option func(*options)
|
||||||
|
|
||||||
// meta is an interface for a metabase.
|
|
||||||
type metabase interface {
|
|
||||||
Exists(context.Context, meta.ExistsPrm) (meta.ExistsRes, error)
|
|
||||||
StorageID(context.Context, meta.StorageIDPrm) (meta.StorageIDRes, error)
|
|
||||||
UpdateStorageID(meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// blob is an interface for the blobstor.
|
|
||||||
type blob interface {
|
|
||||||
Put(context.Context, common.PutPrm) (common.PutRes, error)
|
|
||||||
NeedsCompression(obj *objectSDK.Object) bool
|
|
||||||
Exists(ctx context.Context, res common.ExistsPrm) (common.ExistsRes, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type options struct {
|
type options struct {
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
// path is a path to a directory for write-cache.
|
// path is a path to a directory for write-cache.
|
||||||
path string
|
path string
|
||||||
// blobstor is the main persistent storage.
|
// blobstor is the main persistent storage.
|
||||||
blobstor blob
|
blobstor writecache.MainStorage
|
||||||
// metabase is the metabase instance.
|
// metabase is the metabase instance.
|
||||||
metabase metabase
|
metabase writecache.Metabase
|
||||||
// maxObjectSize is the maximum size of the object stored in the write-cache.
|
// maxObjectSize is the maximum size of the object stored in the write-cache.
|
||||||
maxObjectSize uint64
|
maxObjectSize uint64
|
||||||
// smallObjectSize is the maximum size of the object stored in the database.
|
// smallObjectSize is the maximum size of the object stored in the database.
|
||||||
|
@ -80,14 +61,14 @@ func WithPath(path string) Option {
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithBlobstor sets main object storage.
|
// WithBlobstor sets main object storage.
|
||||||
func WithBlobstor(bs *blobstor.BlobStor) Option {
|
func WithBlobstor(bs writecache.MainStorage) Option {
|
||||||
return func(o *options) {
|
return func(o *options) {
|
||||||
o.blobstor = bs
|
o.blobstor = bs
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithMetabase sets metabase.
|
// WithMetabase sets metabase.
|
||||||
func WithMetabase(db *meta.DB) Option {
|
func WithMetabase(db writecache.Metabase) Option {
|
||||||
return func(o *options) {
|
return func(o *options) {
|
||||||
o.metabase = db
|
o.metabase = db
|
||||||
}
|
}
|
||||||
|
|
|
@ -112,7 +112,7 @@ func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) erro
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.blobstor.NeedsCompression(prm.Object) {
|
if compressor := c.blobstor.Compressor(); compressor != nil && compressor.NeedsCompression(prm.Object) {
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Maybe also make Maybe also make `(*compression.Config) NeedsCompression()` work for `nil` Config?
Or check for nil here, I think it is easy to forget this in blobstor, for example.
ale64bit
commented
done done
|
|||||||
c.mtx.Lock()
|
c.mtx.Lock()
|
||||||
c.compressFlags[addr] = struct{}{}
|
c.compressFlags[addr] = struct{}{}
|
||||||
c.mtx.Unlock()
|
c.mtx.Unlock()
|
||||||
|
|
|
@ -28,7 +28,7 @@ type CreateCacheFunc[Option any] func(
|
||||||
t *testing.T,
|
t *testing.T,
|
||||||
smallSize uint64,
|
smallSize uint64,
|
||||||
meta *meta.DB,
|
meta *meta.DB,
|
||||||
bs *blobstor.BlobStor,
|
bs writecache.MainStorage,
|
||||||
opts ...Option,
|
opts ...Option,
|
||||||
) writecache.Cache
|
) writecache.Cache
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue
Storage
here is related not to the writecache storage, but to the storage it flushes tooCae we leave the name as
Blobstor
?discussed offline. Renamed to
MainStorage
.