[] Fix writecache benchmarks and refactor hacky NeedsCompression

Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
This commit is contained in:
Alejandro Lopez 2023-08-09 15:54:08 +03:00 committed by Evgenii Stratonikov
parent 023b90342c
commit 8f994163ee
17 changed files with 161 additions and 92 deletions
pkg/local_object_storage

View file

@ -252,6 +252,10 @@ func (b *Blobovniczas) SetCompressor(cc *compression.Config) {
b.compression = cc
}
func (b *Blobovniczas) Compressor() *compression.Config {
return b.compression
}
// SetReportErrorFunc implements common.Storage.
func (b *Blobovniczas) SetReportErrorFunc(f func(string, error)) {
b.reportError = f

View file

@ -128,3 +128,7 @@ func WithMetrics(m Metrics) Option {
c.metrics = m
}
}
func (b *BlobStor) Compressor() *compression.Config {
return &b.cfg.compression
}

View file

@ -146,33 +146,33 @@ func TestBlobstor_needsCompression(t *testing.T) {
b := newBlobStor(t, true, "audio/*", "*/x-mpeg", "*/mpeg", "application/x-midi")
obj := newObjectWithCt("video/mpeg")
require.False(t, b.NeedsCompression(obj))
require.False(t, b.compression.NeedsCompression(obj))
obj = newObjectWithCt("audio/aiff")
require.False(t, b.NeedsCompression(obj))
require.False(t, b.compression.NeedsCompression(obj))
obj = newObjectWithCt("application/x-midi")
require.False(t, b.NeedsCompression(obj))
require.False(t, b.compression.NeedsCompression(obj))
obj = newObjectWithCt("text/plain")
require.True(t, b.NeedsCompression(obj))
require.True(t, b.compression.NeedsCompression(obj))
obj = newObjectWithCt("")
require.True(t, b.NeedsCompression(obj))
require.True(t, b.compression.NeedsCompression(obj))
})
t.Run("content-types omitted", func(t *testing.T) {
b := newBlobStor(t, true)
obj := newObjectWithCt("video/mpeg")
require.True(t, b.NeedsCompression(obj))
require.True(t, b.compression.NeedsCompression(obj))
})
t.Run("compress disabled", func(t *testing.T) {
b := newBlobStor(t, false, "video/mpeg")
obj := newObjectWithCt("video/mpeg")
require.False(t, b.NeedsCompression(obj))
require.False(t, b.compression.NeedsCompression(obj))
obj = newObjectWithCt("text/plain")
require.False(t, b.NeedsCompression(obj))
require.False(t, b.compression.NeedsCompression(obj))
})
}

View file

@ -15,7 +15,10 @@ type Storage interface {
Type() string
Path() string
SetCompressor(cc *compression.Config)
Compressor() *compression.Config
// SetReportErrorFunc allows to provide a function to be called on disk errors.
// This function MUST be called before Open.
SetReportErrorFunc(f func(string, error))

View file

@ -550,6 +550,10 @@ func (t *FSTree) SetCompressor(cc *compression.Config) {
t.Config = cc
}
func (t *FSTree) Compressor() *compression.Config {
return t.Config
}
// SetReportErrorFunc implements common.Storage.
func (t *FSTree) SetReportErrorFunc(_ func(string, error)) {
// Do nothing, FSTree can encounter only one error which is returned.

View file

@ -12,5 +12,6 @@ func (s *memstoreImpl) Close() error { return nil }
func (s *memstoreImpl) Type() string { return Type }
func (s *memstoreImpl) Path() string { return s.rootPath }
func (s *memstoreImpl) SetCompressor(cc *compression.Config) { s.compression = cc }
func (s *memstoreImpl) Compressor() *compression.Config { return s.compression }
func (s *memstoreImpl) SetReportErrorFunc(f func(string, error)) { s.reportError = f }
func (s *memstoreImpl) SetParentID(string) {}

View file

@ -9,7 +9,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
@ -72,11 +71,3 @@ func (b *BlobStor) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, e
return common.PutRes{}, ErrNoPlaceFound
}
// NeedsCompression returns true if the object should be compressed.
// For an object to be compressed 2 conditions must hold:
// 1. Compression is enabled in settings.
// 2. Object MIME Content-Type is allowed for compression.
func (b *BlobStor) NeedsCompression(obj *objectSDK.Object) bool {
return b.cfg.compression.NeedsCompression(obj)
}

View file

@ -15,6 +15,7 @@ type cfg struct {
Type func() string
Path func() string
SetCompressor func(cc *compression.Config)
Compressor func() *compression.Config
SetReportErrorFunc func(f func(string, error))
Get func(common.GetPrm) (common.GetRes, error)
@ -45,6 +46,10 @@ func WithSetCompressor(f func(*compression.Config)) Option {
return func(c *cfg) { c.overrides.SetCompressor = f }
}
func WithCompressor(f func() *compression.Config) Option {
return func(c *cfg) { c.overrides.Compressor = f }
}
func WithReportErrorFunc(f func(func(string, error))) Option {
return func(c *cfg) { c.overrides.SetReportErrorFunc = f }
}

View file

@ -128,6 +128,19 @@ func (s *TestStore) SetCompressor(cc *compression.Config) {
}
}
func (s *TestStore) Compressor() *compression.Config {
s.mu.RLock()
defer s.mu.RUnlock()
switch {
case s.overrides.Compressor != nil:
return s.overrides.Compressor()
case s.st != nil:
return s.st.Compressor()
default:
panic("unexpected storage call: Compressor()")
}
}
func (s *TestStore) SetReportErrorFunc(f func(string, error)) {
s.mu.RLock()
defer s.mu.RUnlock()

View file

@ -3,50 +3,120 @@ package benchmark
import (
"context"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
func BenchmarkWritecache(b *testing.B) {
b.Run("bbolt", func(b *testing.B) {
cache := writecachebbolt.New(
writecachebbolt.WithPath(b.TempDir()),
)
benchmarkPut(b, cache)
func BenchmarkWritecacheSeq(b *testing.B) {
const payloadSize = 8 << 10
b.Run("bbolt_seq", func(b *testing.B) {
benchmarkPutSeq(b, newBBoltCache(b), payloadSize)
})
b.Run("badger", func(b *testing.B) {
cache := writecachebadger.New(
writecachebadger.WithPath(b.TempDir()),
)
benchmarkPut(b, cache)
b.Run("badger_seq", func(b *testing.B) {
benchmarkPutSeq(b, newBadgerCache(b), payloadSize)
})
}
func benchmarkPut(b *testing.B, cache writecache.Cache) {
if err := cache.Open(false); err != nil {
b.Fatalf("initializing: %v", err)
func BenchmarkWritecachePar(b *testing.B) {
const payloadSize = 8 << 10
b.Run("bbolt_par", func(b *testing.B) {
benchmarkPutPar(b, newBBoltCache(b), payloadSize)
})
b.Run("badger_par", func(b *testing.B) {
benchmarkPutPar(b, newBadgerCache(b), payloadSize)
})
}
if err := cache.Init(); err != nil {
b.Fatalf("opening: %v", err)
}
defer cache.Close()
func benchmarkPutSeq(b *testing.B, cache writecache.Cache, size uint64) {
benchmarkPutPrepare(b, cache)
ctx := context.Background()
objGen := testutil.RandObjGenerator{ObjSize: 8 << 10}
objGen := testutil.RandObjGenerator{ObjSize: size}
b.ResetTimer()
for n := 0; n < b.N; n++ {
obj := objGen.Next()
rawData, err := obj.Marshal()
require.NoError(b, err, "marshaling object")
prm := common.PutPrm{
Address: oidtest.Address(),
Object: objGen.Next(),
Address: testutil.AddressFromObject(b, obj),
Object: obj,
RawData: rawData,
}
if _, err := cache.Put(ctx, prm); err != nil {
b.Fatalf("putting: %v", err)
}
}
}
func benchmarkPutPar(b *testing.B, cache writecache.Cache, size uint64) {
benchmarkPutPrepare(b, cache)
ctx := context.Background()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
objGen := testutil.RandObjGenerator{ObjSize: size}
for pb.Next() {
obj := objGen.Next()
rawData, err := obj.Marshal()
require.NoError(b, err, "marshaling object")
prm := common.PutPrm{
Address: testutil.AddressFromObject(b, obj),
Object: obj,
RawData: rawData,
}
if _, err := cache.Put(ctx, prm); err != nil {
b.Fatalf("putting: %v", err)
}
}
})
}
func benchmarkPutPrepare(b *testing.B, cache writecache.Cache) {
require.NoError(b, cache.Open(false), "opening")
require.NoError(b, cache.Init(), "initializing")
b.Cleanup(func() {
require.NoError(b, cache.Close(), "closing")
})
}
type testMetabase struct{}
func (testMetabase) UpdateStorageID(meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) {
return meta.UpdateStorageIDRes{}, nil
}
func newBBoltCache(b *testing.B) writecache.Cache {
bs := teststore.New(
teststore.WithPut(func(pp common.PutPrm) (common.PutRes, error) { return common.PutRes{}, nil }),
)
return writecachebbolt.New(
writecachebbolt.WithPath(b.TempDir()),
writecachebbolt.WithBlobstor(bs),
writecachebbolt.WithMetabase(testMetabase{}),
writecachebbolt.WithMaxCacheSize(256<<30),
writecachebbolt.WithSmallObjectSize(128<<10),
)
}
func newBadgerCache(b *testing.B) writecache.Cache {
bs := teststore.New(
teststore.WithPut(func(pp common.PutPrm) (common.PutRes, error) { return common.PutRes{}, nil }),
)
return writecachebadger.New(
writecachebadger.WithPath(b.TempDir()),
writecachebadger.WithBlobstor(bs),
writecachebadger.WithMetabase(testMetabase{}),
writecachebadger.WithMaxCacheSize(256<<30),
writecachebadger.WithGCInterval(10*time.Second),
)
}

View file

@ -5,6 +5,8 @@ import (
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
@ -40,6 +42,18 @@ type Cache interface {
Close() error
}
// MainStorage is the interface of the underlying storage of Cache implementations.
type MainStorage interface {
Compressor() *compression.Config
Exists(context.Context, common.ExistsPrm) (common.ExistsRes, error)
Put(context.Context, common.PutPrm) (common.PutRes, error)
}
// Metabase is the interface of the metabase used by Cache implementations.
type Metabase interface {
UpdateStorageID(meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error)
}
var (
// ErrReadOnly is returned when Put/Write is performed in a read-only mode.
ErrReadOnly = logicerr.New("write-cache is in read-only mode")

View file

@ -6,7 +6,6 @@ import (
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
@ -19,7 +18,7 @@ import (
)
func TestFlush(t *testing.T) {
createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs *blobstor.BlobStor, opts ...Option) writecache.Cache {
createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs writecache.MainStorage, opts ...Option) writecache.Cache {
return New(
append([]Option{
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),

View file

@ -1,43 +1,24 @@
package writecachebadger
import (
"context"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.uber.org/zap"
)
// Option represents write-cache configuration option.
type Option func(*options)
// meta is an interface for a metabase.
type metabase interface {
Exists(context.Context, meta.ExistsPrm) (meta.ExistsRes, error)
StorageID(context.Context, meta.StorageIDPrm) (meta.StorageIDRes, error)
UpdateStorageID(meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error)
}
// blob is an interface for the blobstor.
type blob interface {
Put(context.Context, common.PutPrm) (common.PutRes, error)
NeedsCompression(obj *objectSDK.Object) bool
Exists(ctx context.Context, res common.ExistsPrm) (common.ExistsRes, error)
}
type options struct {
log *logger.Logger
// path is a path to a directory for write-cache.
path string
// blobstor is the main persistent storage.
blobstor blob
blobstor writecache.MainStorage
// metabase is the metabase instance.
metabase metabase
metabase writecache.Metabase
// maxObjectSize is the maximum size of the object stored in the write-cache.
maxObjectSize uint64
// workersCount is the number of workers flushing objects in parallel.
@ -70,14 +51,14 @@ func WithPath(path string) Option {
}
// WithBlobstor sets main object storage.
func WithBlobstor(bs *blobstor.BlobStor) Option {
func WithBlobstor(bs writecache.MainStorage) Option {
return func(o *options) {
o.blobstor = bs
}
}
// WithMetabase sets metabase.
func WithMetabase(db *meta.DB) Option {
func WithMetabase(db writecache.Metabase) Option {
return func(o *options) {
o.metabase = db
}

View file

@ -8,7 +8,6 @@ import (
"testing"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@ -22,7 +21,7 @@ import (
)
func TestFlush(t *testing.T) {
createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs *blobstor.BlobStor, opts ...Option) writecache.Cache {
createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs writecache.MainStorage, opts ...Option) writecache.Cache {
return New(
append([]Option{
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),

View file

@ -1,45 +1,26 @@
package writecachebbolt
import (
"context"
"io/fs"
"os"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.uber.org/zap"
)
// Option represents write-cache configuration option.
type Option func(*options)
// meta is an interface for a metabase.
type metabase interface {
Exists(context.Context, meta.ExistsPrm) (meta.ExistsRes, error)
StorageID(context.Context, meta.StorageIDPrm) (meta.StorageIDRes, error)
UpdateStorageID(meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error)
}
// blob is an interface for the blobstor.
type blob interface {
Put(context.Context, common.PutPrm) (common.PutRes, error)
NeedsCompression(obj *objectSDK.Object) bool
Exists(ctx context.Context, res common.ExistsPrm) (common.ExistsRes, error)
}
type options struct {
log *logger.Logger
// path is a path to a directory for write-cache.
path string
// blobstor is the main persistent storage.
blobstor blob
blobstor writecache.MainStorage
// metabase is the metabase instance.
metabase metabase
metabase writecache.Metabase
// maxObjectSize is the maximum size of the object stored in the write-cache.
maxObjectSize uint64
// smallObjectSize is the maximum size of the object stored in the database.
@ -80,14 +61,14 @@ func WithPath(path string) Option {
}
// WithBlobstor sets main object storage.
func WithBlobstor(bs *blobstor.BlobStor) Option {
func WithBlobstor(bs writecache.MainStorage) Option {
return func(o *options) {
o.blobstor = bs
}
}
// WithMetabase sets metabase.
func WithMetabase(db *meta.DB) Option {
func WithMetabase(db writecache.Metabase) Option {
return func(o *options) {
o.metabase = db
}

View file

@ -112,7 +112,7 @@ func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) erro
return err
}
if c.blobstor.NeedsCompression(prm.Object) {
if compressor := c.blobstor.Compressor(); compressor != nil && compressor.NeedsCompression(prm.Object) {
c.mtx.Lock()
c.compressFlags[addr] = struct{}{}
c.mtx.Unlock()

View file

@ -28,7 +28,7 @@ type CreateCacheFunc[Option any] func(
t *testing.T,
smallSize uint64,
meta *meta.DB,
bs *blobstor.BlobStor,
bs writecache.MainStorage,
opts ...Option,
) writecache.Cache