[#xx] Fix writecache benchmarks and refactor hacky NeedsCompression #586

Merged
fyrchik merged 1 commit from ale64bit/frostfs-node:fix/wc-benchmarks into master 2023-08-10 08:05:20 +00:00
17 changed files with 161 additions and 92 deletions
Showing only changes of commit 0143ccb2db - Show all commits

View file

@ -252,6 +252,10 @@ func (b *Blobovniczas) SetCompressor(cc *compression.Config) {
b.compression = cc
}
func (b *Blobovniczas) Compressor() *compression.Config {
return b.compression
}
// SetReportErrorFunc implements common.Storage.
func (b *Blobovniczas) SetReportErrorFunc(f func(string, error)) {
b.reportError = f

View file

@ -128,3 +128,7 @@ func WithMetrics(m Metrics) Option {
c.metrics = m
}
}
func (b *BlobStor) Compressor() *compression.Config {
return &b.cfg.compression
}

View file

@ -146,33 +146,33 @@ func TestBlobstor_needsCompression(t *testing.T) {
b := newBlobStor(t, true, "audio/*", "*/x-mpeg", "*/mpeg", "application/x-midi")
obj := newObjectWithCt("video/mpeg")
require.False(t, b.NeedsCompression(obj))
require.False(t, b.compression.NeedsCompression(obj))
obj = newObjectWithCt("audio/aiff")
require.False(t, b.NeedsCompression(obj))
require.False(t, b.compression.NeedsCompression(obj))
obj = newObjectWithCt("application/x-midi")
require.False(t, b.NeedsCompression(obj))
require.False(t, b.compression.NeedsCompression(obj))
obj = newObjectWithCt("text/plain")
require.True(t, b.NeedsCompression(obj))
require.True(t, b.compression.NeedsCompression(obj))
obj = newObjectWithCt("")
require.True(t, b.NeedsCompression(obj))
require.True(t, b.compression.NeedsCompression(obj))
})
t.Run("content-types omitted", func(t *testing.T) {
b := newBlobStor(t, true)
obj := newObjectWithCt("video/mpeg")
require.True(t, b.NeedsCompression(obj))
require.True(t, b.compression.NeedsCompression(obj))
})
t.Run("compress disabled", func(t *testing.T) {
b := newBlobStor(t, false, "video/mpeg")
obj := newObjectWithCt("video/mpeg")
require.False(t, b.NeedsCompression(obj))
require.False(t, b.compression.NeedsCompression(obj))
obj = newObjectWithCt("text/plain")
require.False(t, b.NeedsCompression(obj))
require.False(t, b.compression.NeedsCompression(obj))
})
}

View file

@ -15,7 +15,10 @@ type Storage interface {
Type() string
Path() string
SetCompressor(cc *compression.Config)
Compressor() *compression.Config
// SetReportErrorFunc allows to provide a function to be called on disk errors.
// This function MUST be called before Open.
SetReportErrorFunc(f func(string, error))

View file

@ -550,6 +550,10 @@ func (t *FSTree) SetCompressor(cc *compression.Config) {
t.Config = cc
}
func (t *FSTree) Compressor() *compression.Config {
return t.Config
}
// SetReportErrorFunc implements common.Storage.
func (t *FSTree) SetReportErrorFunc(_ func(string, error)) {
// Do nothing, FSTree can encounter only one error which is returned.

View file

@ -12,5 +12,6 @@ func (s *memstoreImpl) Close() error { return nil }
func (s *memstoreImpl) Type() string { return Type }
func (s *memstoreImpl) Path() string { return s.rootPath }
func (s *memstoreImpl) SetCompressor(cc *compression.Config) { s.compression = cc }
func (s *memstoreImpl) Compressor() *compression.Config { return s.compression }
func (s *memstoreImpl) SetReportErrorFunc(f func(string, error)) { s.reportError = f }
func (s *memstoreImpl) SetParentID(string) {}

View file

@ -9,7 +9,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
@ -72,11 +71,3 @@ func (b *BlobStor) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, e
return common.PutRes{}, ErrNoPlaceFound
}
// NeedsCompression returns true if the object should be compressed.
// For an object to be compressed 2 conditions must hold:
// 1. Compression is enabled in settings.
// 2. Object MIME Content-Type is allowed for compression.
func (b *BlobStor) NeedsCompression(obj *objectSDK.Object) bool {
return b.cfg.compression.NeedsCompression(obj)
}

View file

@ -15,6 +15,7 @@ type cfg struct {
Type func() string
Path func() string
SetCompressor func(cc *compression.Config)
Compressor func() *compression.Config
SetReportErrorFunc func(f func(string, error))
Get func(common.GetPrm) (common.GetRes, error)
@ -45,6 +46,10 @@ func WithSetCompressor(f func(*compression.Config)) Option {
return func(c *cfg) { c.overrides.SetCompressor = f }
}
func WithCompressor(f func() *compression.Config) Option {
return func(c *cfg) { c.overrides.Compressor = f }
}
func WithReportErrorFunc(f func(func(string, error))) Option {
return func(c *cfg) { c.overrides.SetReportErrorFunc = f }
}

View file

@ -128,6 +128,19 @@ func (s *TestStore) SetCompressor(cc *compression.Config) {
}
}
func (s *TestStore) Compressor() *compression.Config {
s.mu.RLock()
defer s.mu.RUnlock()
switch {
case s.overrides.Compressor != nil:
return s.overrides.Compressor()
case s.st != nil:
return s.st.Compressor()
default:
panic("unexpected storage call: Compressor()")
}
}
func (s *TestStore) SetReportErrorFunc(f func(string, error)) {
s.mu.RLock()
defer s.mu.RUnlock()

View file

@ -3,50 +3,120 @@ package benchmark
import (
"context"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
func BenchmarkWritecache(b *testing.B) {
b.Run("bbolt", func(b *testing.B) {
cache := writecachebbolt.New(
writecachebbolt.WithPath(b.TempDir()),
)
benchmarkPut(b, cache)
func BenchmarkWritecacheSeq(b *testing.B) {
const payloadSize = 8 << 10
b.Run("bbolt_seq", func(b *testing.B) {
benchmarkPutSeq(b, newBBoltCache(b), payloadSize)
})
b.Run("badger", func(b *testing.B) {
cache := writecachebadger.New(
writecachebadger.WithPath(b.TempDir()),
)
benchmarkPut(b, cache)
b.Run("badger_seq", func(b *testing.B) {
benchmarkPutSeq(b, newBadgerCache(b), payloadSize)
})
}
func benchmarkPut(b *testing.B, cache writecache.Cache) {
if err := cache.Open(false); err != nil {
b.Fatalf("initializing: %v", err)
}
if err := cache.Init(); err != nil {
b.Fatalf("opening: %v", err)
}
defer cache.Close()
func BenchmarkWritecachePar(b *testing.B) {
const payloadSize = 8 << 10
b.Run("bbolt_par", func(b *testing.B) {
benchmarkPutPar(b, newBBoltCache(b), payloadSize)
})
b.Run("badger_par", func(b *testing.B) {
benchmarkPutPar(b, newBadgerCache(b), payloadSize)
})
}
func benchmarkPutSeq(b *testing.B, cache writecache.Cache, size uint64) {
benchmarkPutPrepare(b, cache)
ctx := context.Background()
objGen := testutil.RandObjGenerator{ObjSize: 8 << 10}
objGen := testutil.RandObjGenerator{ObjSize: size}
b.ResetTimer()
for n := 0; n < b.N; n++ {
obj := objGen.Next()
rawData, err := obj.Marshal()
require.NoError(b, err, "marshaling object")
prm := common.PutPrm{
Address: oidtest.Address(),
Object: objGen.Next(),
Address: testutil.AddressFromObject(b, obj),
Object: obj,
RawData: rawData,
}
if _, err := cache.Put(ctx, prm); err != nil {
b.Fatalf("putting: %v", err)
}
}
}
func benchmarkPutPar(b *testing.B, cache writecache.Cache, size uint64) {
benchmarkPutPrepare(b, cache)
ctx := context.Background()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
objGen := testutil.RandObjGenerator{ObjSize: size}
for pb.Next() {
obj := objGen.Next()
rawData, err := obj.Marshal()
require.NoError(b, err, "marshaling object")
prm := common.PutPrm{
Address: testutil.AddressFromObject(b, obj),
Object: obj,
RawData: rawData,
}
if _, err := cache.Put(ctx, prm); err != nil {
b.Fatalf("putting: %v", err)
}
}
})
}
func benchmarkPutPrepare(b *testing.B, cache writecache.Cache) {
require.NoError(b, cache.Open(false), "opening")
require.NoError(b, cache.Init(), "initializing")
b.Cleanup(func() {
require.NoError(b, cache.Close(), "closing")
})
}
type testMetabase struct{}
func (testMetabase) UpdateStorageID(meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) {
return meta.UpdateStorageIDRes{}, nil
}
func newBBoltCache(b *testing.B) writecache.Cache {
bs := teststore.New(
teststore.WithPut(func(pp common.PutPrm) (common.PutRes, error) { return common.PutRes{}, nil }),
)
return writecachebbolt.New(
writecachebbolt.WithPath(b.TempDir()),
writecachebbolt.WithBlobstor(bs),
writecachebbolt.WithMetabase(testMetabase{}),
writecachebbolt.WithMaxCacheSize(256<<30),
writecachebbolt.WithSmallObjectSize(128<<10),
)
}
func newBadgerCache(b *testing.B) writecache.Cache {
bs := teststore.New(
teststore.WithPut(func(pp common.PutPrm) (common.PutRes, error) { return common.PutRes{}, nil }),
)
return writecachebadger.New(
writecachebadger.WithPath(b.TempDir()),
writecachebadger.WithBlobstor(bs),
writecachebadger.WithMetabase(testMetabase{}),
writecachebadger.WithMaxCacheSize(256<<30),
writecachebadger.WithGCInterval(10*time.Second),
)
}

View file

@ -5,6 +5,8 @@ import (
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
@ -40,6 +42,18 @@ type Cache interface {
Close() error
}
// MainStorage is the interface of the underlying storage of Cache implementations.
type MainStorage interface {
Compressor() *compression.Config
Exists(context.Context, common.ExistsPrm) (common.ExistsRes, error)
Put(context.Context, common.PutPrm) (common.PutRes, error)
}
// Metabase is the interface of the metabase used by Cache implementations.
type Metabase interface {
UpdateStorageID(meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error)
}
var (
// ErrReadOnly is returned when Put/Write is performed in a read-only mode.
ErrReadOnly = logicerr.New("write-cache is in read-only mode")

View file

@ -6,7 +6,6 @@ import (
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
@ -19,7 +18,7 @@ import (
)
func TestFlush(t *testing.T) {
createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs *blobstor.BlobStor, opts ...Option) writecache.Cache {
createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs writecache.MainStorage, opts ...Option) writecache.Cache {
return New(
append([]Option{
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),

View file

@ -1,43 +1,24 @@
package writecachebadger
import (
"context"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.uber.org/zap"
)
// Option represents write-cache configuration option.
type Option func(*options)
// meta is an interface for a metabase.
type metabase interface {
Exists(context.Context, meta.ExistsPrm) (meta.ExistsRes, error)
StorageID(context.Context, meta.StorageIDPrm) (meta.StorageIDRes, error)
UpdateStorageID(meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error)
}
// blob is an interface for the blobstor.
type blob interface {
Put(context.Context, common.PutPrm) (common.PutRes, error)
NeedsCompression(obj *objectSDK.Object) bool
Exists(ctx context.Context, res common.ExistsPrm) (common.ExistsRes, error)
}
type options struct {
log *logger.Logger
// path is a path to a directory for write-cache.
path string
// blobstor is the main persistent storage.
blobstor blob
blobstor writecache.MainStorage
fyrchik marked this conversation as resolved Outdated

Storage here is related not to the writecache storage, but to the storage it flushes too
Cae we leave the name as Blobstor?

`Storage` here is related not to the writecache storage, but to the storage it flushes too Cae we leave the name as `Blobstor`?

discussed offline. Renamed to MainStorage.

discussed offline. Renamed to `MainStorage`.
// metabase is the metabase instance.
metabase metabase
metabase writecache.Metabase
// maxObjectSize is the maximum size of the object stored in the write-cache.
maxObjectSize uint64
// workersCount is the number of workers flushing objects in parallel.
@ -70,14 +51,14 @@ func WithPath(path string) Option {
}
// WithBlobstor sets main object storage.
func WithBlobstor(bs *blobstor.BlobStor) Option {
func WithBlobstor(bs writecache.MainStorage) Option {
return func(o *options) {
o.blobstor = bs
}
}
// WithMetabase sets metabase.
func WithMetabase(db *meta.DB) Option {
func WithMetabase(db writecache.Metabase) Option {
return func(o *options) {
o.metabase = db
}

View file

@ -8,7 +8,6 @@ import (
"testing"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@ -22,7 +21,7 @@ import (
)
func TestFlush(t *testing.T) {
createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs *blobstor.BlobStor, opts ...Option) writecache.Cache {
createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs writecache.MainStorage, opts ...Option) writecache.Cache {
return New(
append([]Option{
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),

View file

@ -1,45 +1,26 @@
package writecachebbolt
import (
"context"
"io/fs"
"os"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.uber.org/zap"
)
// Option represents write-cache configuration option.
type Option func(*options)
// meta is an interface for a metabase.
type metabase interface {
Exists(context.Context, meta.ExistsPrm) (meta.ExistsRes, error)
StorageID(context.Context, meta.StorageIDPrm) (meta.StorageIDRes, error)
UpdateStorageID(meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error)
}
// blob is an interface for the blobstor.
type blob interface {
Put(context.Context, common.PutPrm) (common.PutRes, error)
NeedsCompression(obj *objectSDK.Object) bool
Exists(ctx context.Context, res common.ExistsPrm) (common.ExistsRes, error)
}
type options struct {
log *logger.Logger
// path is a path to a directory for write-cache.
path string
// blobstor is the main persistent storage.
blobstor blob
blobstor writecache.MainStorage
// metabase is the metabase instance.
metabase metabase
metabase writecache.Metabase
// maxObjectSize is the maximum size of the object stored in the write-cache.
maxObjectSize uint64
// smallObjectSize is the maximum size of the object stored in the database.
@ -80,14 +61,14 @@ func WithPath(path string) Option {
}
// WithBlobstor sets main object storage.
func WithBlobstor(bs *blobstor.BlobStor) Option {
func WithBlobstor(bs writecache.MainStorage) Option {
return func(o *options) {
o.blobstor = bs
}
}
// WithMetabase sets metabase.
func WithMetabase(db *meta.DB) Option {
func WithMetabase(db writecache.Metabase) Option {
return func(o *options) {
o.metabase = db
}

View file

@ -112,7 +112,7 @@ func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) erro
return err
}
if c.blobstor.NeedsCompression(prm.Object) {
if compressor := c.blobstor.Compressor(); compressor != nil && compressor.NeedsCompression(prm.Object) {
fyrchik marked this conversation as resolved Outdated

Maybe also make (*compression.Config) NeedsCompression() work for nil Config?
Or check for nil here, I think it is easy to forget this in blobstor, for example.

Maybe also make `(*compression.Config) NeedsCompression()` work for `nil` Config? Or check for nil here, I think it is easy to forget this in blobstor, for example.

done

done
c.mtx.Lock()
c.compressFlags[addr] = struct{}{}
c.mtx.Unlock()

View file

@ -28,7 +28,7 @@ type CreateCacheFunc[Option any] func(
t *testing.T,
smallSize uint64,
meta *meta.DB,
bs *blobstor.BlobStor,
bs writecache.MainStorage,
opts ...Option,
) writecache.Cache