diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 8e103b527..496e071be 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -43,6 +43,7 @@ import ( writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebitcask" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" containerClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" @@ -128,17 +129,21 @@ type shardCfg struct { } writecacheCfg struct { - enabled bool - typ writecacheconfig.Type - path string - maxBatchSize int - maxBatchDelay time.Duration - smallObjectSize uint64 - maxObjSize uint64 - flushWorkerCount int - sizeLimit uint64 - noSync bool - gcInterval time.Duration + enabled bool + typ writecacheconfig.Type + path string + maxBatchSize int + maxBatchDelay time.Duration + smallObjectSize uint64 + maxObjSize uint64 + flushWorkerCount int + sizeLimit uint64 + noSync bool + gcInterval time.Duration + bucketCount int + regionCount int + logFileSize uint64 + maxPendingLogFileFlush int } piloramaCfg struct { @@ -253,6 +258,10 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg, wc.sizeLimit = writeCacheCfg.SizeLimit() wc.noSync = writeCacheCfg.NoSync() wc.gcInterval = writeCacheCfg.GCInterval() + wc.bucketCount = writeCacheCfg.BucketCount() + wc.regionCount = writeCacheCfg.RegionCount() + wc.logFileSize = writeCacheCfg.LogFileSize() + wc.maxPendingLogFileFlush = writeCacheCfg.MaxPendingLogFileFlush() } } @@ -744,6 +753,18 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) writecacheconfig.Options { writecachebadger.WithLogger(c.log), writecachebadger.WithGCInterval(wcRead.gcInterval), ) + case writecacheconfig.TypeBitcask: + writeCacheOpts.Type = writecacheconfig.TypeBitcask + writeCacheOpts.BitcaskOptions = append(writeCacheOpts.BitcaskOptions, + writecachebitcask.WithPath(wcRead.path), + writecachebitcask.WithLogger(c.log), + writecachebitcask.WithMaxObjectSize(wcRead.maxObjSize), + writecachebitcask.WithBucketCount(wcRead.bucketCount), + writecachebitcask.WithRegionCount(wcRead.regionCount), + writecachebitcask.WithLogFileSize(wcRead.logFileSize), + writecachebitcask.WithMaxBatchDelay(wcRead.maxBatchDelay), + writecachebitcask.WithMaxPendingLogFileFlush(wcRead.maxPendingLogFileFlush), + ) default: panic(fmt.Sprintf("unknown writecache type: %q", wcRead.typ)) } diff --git a/cmd/frostfs-node/config/engine/shard/writecache/config.go b/cmd/frostfs-node/config/engine/shard/writecache/config.go index 504fe3ca2..2ef3b541c 100644 --- a/cmd/frostfs-node/config/engine/shard/writecache/config.go +++ b/cmd/frostfs-node/config/engine/shard/writecache/config.go @@ -28,6 +28,18 @@ const ( // DefaultGCInterval is the default duration of the GC cycle interval. DefaultGCInterval = 1 * time.Minute + + // DefaultBucketCount is the default number of buckets for the bitcask cache. + DefaultBucketCount = 1 << 16 + + // DefaultRegionCount is the default number of regions for the bitcask cache. + DefaultRegionCount = 1 << 2 + + // DefaultLogFileSize is the default max size of a single log file for the bitcask cache. + DefaultLogFileSize = 64 << 20 + + // DefaultMaxPendingLogFileFlush is the default max waiting log files to be flushed for the bitcask cache. + DefaultMaxPendingLogFileFlush = 4 ) // From wraps config section into Config. @@ -53,6 +65,8 @@ func (x *Config) Type() writecacheconfig.Type { return writecacheconfig.TypeBBolt case "badger": return writecacheconfig.TypeBadger + case "bitcask": + return writecacheconfig.TypeBitcask } panic(fmt.Sprintf("invalid writecache type: %q", t)) @@ -162,3 +176,67 @@ func (x *Config) GCInterval() time.Duration { return DefaultGCInterval } + +// BucketCount returns the value of "bucket_count" config parameter. +// +// Returns BucketCountDefault if the value is not a positive number. +func (x *Config) BucketCount() int { + c := config.IntSafe( + (*config.Config)(x), + "bucket_count", + ) + + if c > 0 { + return int(c) + } + + return DefaultBucketCount +} + +// RegionCount returns the value of "region_count" config parameter. +// +// Returns RegionCountDefault if the value is not a positive number. +func (x *Config) RegionCount() int { + c := config.IntSafe( + (*config.Config)(x), + "region_count", + ) + + if c > 0 { + return int(c) + } + + return DefaultRegionCount +} + +// LogFileSize returns the value of "log_file_size" config parameter. +// +// Returns LogFileSizeDefault if the value is not a positive number. +func (x *Config) LogFileSize() uint64 { + c := config.SizeInBytesSafe( + (*config.Config)(x), + "log_file_size", + ) + + if c > 0 { + return c + } + + return DefaultLogFileSize +} + +// MaxPendingLogFileFlush returns the value of "max_pending_log_file_flush" config parameter. +// +// Returns MaxPendingLogFileFlushDefault if the value is not a positive number. +func (x *Config) MaxPendingLogFileFlush() int { + c := config.IntSafe( + (*config.Config)(x), + "max_pending_log_file_flush", + ) + + if c > 0 { + return int(c) + } + + return DefaultMaxPendingLogFileFlush +} diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 6ceee4f17..6fb32cbbb 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -297,6 +297,10 @@ const ( WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache" WritecacheDBValueLogGCRunCompleted = "value log GC run completed" WritecacheBadgerObjAlreadyScheduled = "object already scheduled for flush" + WritecacheBitcaskReadingLogFile = "reading log file" + WritecacheBitcaskFlushingLogBytes = "flushing log bytes" + WritecacheBitcaskRemovingLogFile = "removing log file" + WritecacheBitcaskClosingLogFile = "closing log file" BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level" BlobovniczatreeCouldNotReadPayloadRangeFromOpenedBlobovnicza = "could not read payload range from opened blobovnicza" BlobovniczatreeCouldNotReadPayloadRangeFromActiveBlobovnicza = "could not read payload range from active blobovnicza" diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 00f4fbb9e..d1123acd6 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -16,6 +16,7 @@ import ( writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebitcask" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -170,6 +171,12 @@ func New(opts ...Option) *Shard { writecachebadger.WithReportErrorFunc(reportFunc), writecachebadger.WithBlobstor(bs), writecachebadger.WithMetabase(mb))...) + case writecacheconfig.TypeBitcask: + s.writeCache = writecachebitcask.New( + append(c.writeCacheOpts.BitcaskOptions, + writecachebitcask.WithReportErrorFunc(reportFunc), + writecachebitcask.WithBlobstor(bs), + writecachebitcask.WithMetabase(mb))...) default: panic(fmt.Sprintf("invalid writecache type: %v", c.writeCacheOpts.Type)) } @@ -220,6 +227,8 @@ func WithWriteCacheMetrics(wcMetrics writecache.Metrics) Option { c.writeCacheOpts.BBoltOptions = append(c.writeCacheOpts.BBoltOptions, writecachebbolt.WithMetrics(wcMetrics)) case writecacheconfig.TypeBadger: c.writeCacheOpts.BadgerOptions = append(c.writeCacheOpts.BadgerOptions, writecachebadger.WithMetrics(wcMetrics)) + case writecacheconfig.TypeBitcask: + c.writeCacheOpts.BitcaskOptions = append(c.writeCacheOpts.BitcaskOptions, writecachebitcask.WithMetrics(wcMetrics)) } } } diff --git a/pkg/local_object_storage/shard/shard_test.go b/pkg/local_object_storage/shard/shard_test.go index 6337b0b6e..7702b99a5 100644 --- a/pkg/local_object_storage/shard/shard_test.go +++ b/pkg/local_object_storage/shard/shard_test.go @@ -15,6 +15,7 @@ import ( writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebitcask" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -51,6 +52,10 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts wcOpts.BadgerOptions = append( []writecachebadger.Option{writecachebadger.WithPath(filepath.Join(rootPath, "wcache"))}, wcOpts.BadgerOptions...) + case writecacheconfig.TypeBitcask: + wcOpts.BitcaskOptions = append( + []writecachebitcask.Option{writecachebitcask.WithPath(filepath.Join(rootPath, "wcache"))}, + wcOpts.BitcaskOptions...) } } else { rootPath = filepath.Join(rootPath, "nowc") diff --git a/pkg/local_object_storage/writecache/benchmark/writecache_test.go b/pkg/local_object_storage/writecache/benchmark/writecache_test.go index 6ae04a92a..b17251733 100644 --- a/pkg/local_object_storage/writecache/benchmark/writecache_test.go +++ b/pkg/local_object_storage/writecache/benchmark/writecache_test.go @@ -12,11 +12,15 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebitcask" "github.com/stretchr/testify/require" ) func BenchmarkWritecacheSeq(b *testing.B) { const payloadSize = 8 << 10 + b.Run("bitcask_seq", func(b *testing.B) { + benchmarkPutSeq(b, newBitcaskCache(b), payloadSize) + }) b.Run("bbolt_seq", func(b *testing.B) { benchmarkPutSeq(b, newBBoltCache(b), payloadSize) }) @@ -27,6 +31,9 @@ func BenchmarkWritecacheSeq(b *testing.B) { func BenchmarkWritecachePar(b *testing.B) { const payloadSize = 8 << 10 + b.Run("bitcask_par", func(b *testing.B) { + benchmarkPutPar(b, newBitcaskCache(b), payloadSize) + }) b.Run("bbolt_par", func(b *testing.B) { benchmarkPutPar(b, newBBoltCache(b), payloadSize) }) @@ -120,3 +127,14 @@ func newBadgerCache(b *testing.B) writecache.Cache { writecachebadger.WithGCInterval(10*time.Second), ) } + +func newBitcaskCache(b *testing.B) writecache.Cache { + bs := teststore.New( + teststore.WithPut(func(pp common.PutPrm) (common.PutRes, error) { return common.PutRes{}, nil }), + ) + return writecachebitcask.New( + writecachebitcask.WithPath(b.TempDir()), + writecachebitcask.WithBlobstor(bs), + writecachebitcask.WithMetabase(testMetabase{}), + ) +} diff --git a/pkg/local_object_storage/writecache/config/config.go b/pkg/local_object_storage/writecache/config/config.go index 91f097e17..1ec43d6dc 100644 --- a/pkg/local_object_storage/writecache/config/config.go +++ b/pkg/local_object_storage/writecache/config/config.go @@ -4,6 +4,7 @@ package config import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebitcask" ) // Type is the write cache implementation type. @@ -12,11 +13,13 @@ type Type int const ( TypeBBolt Type = iota TypeBadger + TypeBitcask ) // Options are the configuration options for the write cache. type Options struct { - Type Type - BBoltOptions []writecachebbolt.Option - BadgerOptions []writecachebadger.Option + Type Type + BBoltOptions []writecachebbolt.Option + BadgerOptions []writecachebadger.Option + BitcaskOptions []writecachebitcask.Option } diff --git a/pkg/local_object_storage/writecache/writecachebitcask/api.go b/pkg/local_object_storage/writecache/writecachebitcask/api.go new file mode 100644 index 000000000..8d0c4a19f --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebitcask/api.go @@ -0,0 +1,48 @@ +package writecachebitcask + +import ( + "context" + "encoding/binary" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +func (c *cache) Get(_ context.Context, addr oid.Address) (*objectSDK.Object, error) { + region := c.locateRegion(addr) + return c.regions[region].get(addr) +} + +func (c *cache) Head(_ context.Context, addr oid.Address) (*objectSDK.Object, error) { + region := c.locateRegion(addr) + return c.regions[region].get(addr) +} + +func (c *cache) Put(ctx context.Context, req common.PutPrm) (common.PutRes, error) { + if uint64(len(req.RawData)) > c.maxObjectSize { + return common.PutRes{}, writecache.ErrBigObject + } + if mode.Mode(c.mode.Load()).ReadOnly() { + return common.PutRes{}, writecache.ErrReadOnly + } + region := c.locateRegion(req.Address) + return common.PutRes{}, c.regions[region].put(ctx, req.Address, req.RawData) +} + +func (c *cache) Delete(ctx context.Context, addr oid.Address) error { + if mode.Mode(c.mode.Load()).ReadOnly() { + return writecache.ErrReadOnly + } + region := c.locateRegion(addr) + return c.regions[region].delete(ctx, addr) +} + +func (c *cache) locateRegion(addr oid.Address) int { + id := addr.Object() + h := binary.LittleEndian.Uint32(id[:4]) + region := h & (uint32(c.regionCount) - 1) + return int(region) +} diff --git a/pkg/local_object_storage/writecache/writecachebitcask/api_test.go b/pkg/local_object_storage/writecache/writecachebitcask/api_test.go new file mode 100644 index 000000000..00125bbfa --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebitcask/api_test.go @@ -0,0 +1,82 @@ +package writecachebitcask + +import ( + "context" + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" + meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/stretchr/testify/require" +) + +func TestAPI(t *testing.T) { + ctx := context.Background() + + bs := teststore.New( + teststore.WithPut(func(pp common.PutPrm) (common.PutRes, error) { return common.PutRes{}, nil }), + ) + + c := New( + WithPath(t.TempDir()), + WithBlobstor(bs), + WithMetabase(testMetabase{}), + ) + + require.NoError(t, c.Open(false)) + require.NoError(t, c.Init()) + + obj := testutil.GenerateObject() + addr := testutil.AddressFromObject(t, obj) + data, err := obj.Marshal() + require.NoError(t, err) + + // Get nonexistent object + { + _, gotErr := c.Get(ctx, oid.Address{}) + require.True(t, client.IsErrObjectNotFound(gotErr)) + } + + // Put an object + { + _, err := c.Put(ctx, common.PutPrm{ + Address: addr, + Object: obj, + RawData: data, + }) + + require.NoError(t, err) + } + + // Get the object previously put + { + gotObj, err := c.Get(ctx, addr) + require.NoError(t, err) + gotData, err := gotObj.Marshal() + require.NoError(t, err) + require.Equal(t, data, gotData) + } + + // Delete the object previously put + { + require.NoError(t, c.Delete(ctx, addr)) + require.True(t, client.IsErrObjectNotFound(c.Delete(ctx, addr))) + } + + // Get the object previously deleted + { + _, gotErr := c.Get(ctx, addr) + require.True(t, client.IsErrObjectNotFound(gotErr)) + } + + require.NoError(t, c.Close()) +} + +type testMetabase struct{} + +func (testMetabase) UpdateStorageID(meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) { + return meta.UpdateStorageIDRes{}, nil +} diff --git a/pkg/local_object_storage/writecache/writecachebitcask/control.go b/pkg/local_object_storage/writecache/writecachebitcask/control.go new file mode 100644 index 000000000..13f57cde2 --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebitcask/control.go @@ -0,0 +1,81 @@ +package writecachebitcask + +import ( + "errors" + "fmt" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" +) + +// SetLogger sets logger. It is used after the shard ID was generated to use it in logs. +func (c *cache) SetLogger(l *logger.Logger) { + c.log = l +} + +func (c *cache) DumpInfo() writecache.Info { + return writecache.Info{ + Path: c.path, + } +} + +func (c *cache) Open(readOnly bool) error { + // Validate + if c.bucketCount&(c.bucketCount-1) != 0 { + return fmt.Errorf("numBuckets must be a power of 2: got %d", c.bucketCount) + } + if c.regionCount&(c.regionCount-1) != 0 { + return fmt.Errorf("numRegions must be a power of 2: got %d", c.regionCount) + } + if c.regionCount >= c.bucketCount { + return errors.New("numBuckets must be greater than numRegions") + } + + // Create regions + c.regions = make([]*region, c.regionCount) + for i := 0; i < c.regionCount; i++ { + c.regions[i] = ®ion{ + opts: &c.options, + index: i, + keyDir: make([][]*entry, c.bucketCount/c.regionCount), + flushCh: make(chan uint32, c.maxPendingLogFileFlush), + } + } + + if readOnly { + _ = c.SetMode(mode.ReadOnly) + } else { + _ = c.SetMode(mode.ReadWrite) + } + + return nil +} + +func (c *cache) Init() error { + for _, r := range c.regions { + go r.flushWorker() + if err := r.init(); err != nil { + r.close() + return err + } + } + return nil +} + +func (c *cache) Close() error { + var lastErr error + if !c.closed.Swap(true) { + for _, r := range c.regions { + if err := r.close(); err != nil { + lastErr = err + } + } + } + return lastErr +} + +func (c *cache) SetMode(m mode.Mode) error { + c.mode.Store(uint32(m)) + return nil +} diff --git a/pkg/local_object_storage/writecache/writecachebitcask/flush.go b/pkg/local_object_storage/writecache/writecachebitcask/flush.go new file mode 100644 index 000000000..edcd88a91 --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebitcask/flush.go @@ -0,0 +1,205 @@ +package writecachebitcask + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "io" + "os" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.uber.org/zap" +) + +func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error { + var lastErr error + + // Forcibly rotate all active membuffers and log files + for _, r := range c.regions { + r.flushWriteBatch() + r.Lock() + if err := r.rotateLogFile(ctx); err != nil && !ignoreErrors { + lastErr = err + } + r.Unlock() + } + + // Wait for all flush channels to drain + for _, r := range c.regions { + for len(r.flushCh) > 0 { + time.Sleep(1 * time.Second) + } + } + + return lastErr +} + +func (r *region) flushWorker() { + for logIndex := range r.flushCh { + again: + // Read the whole log file contents in memory + b, err := os.ReadFile(r.logFilePath(logIndex)) + if err != nil { + r.opts.log.Error(logs.WritecacheBitcaskReadingLogFile, + zap.Int("region", r.index), + zap.Uint32("logIndex", logIndex), + zap.Error(err)) + time.Sleep(1 * time.Second) + goto again + } + + // Flush the log file contents + for { + err := r.flushBytes(logIndex, b) + if err == nil { + break + } + r.opts.log.Error(logs.WritecacheBitcaskFlushingLogBytes, + zap.Int("region", r.index), + zap.Uint32("logIndex", logIndex), + zap.Error(err)) + time.Sleep(1 * time.Second) + } + + // Delete the log file + if err := os.Remove(r.logFilePath(logIndex)); err != nil { + r.opts.log.Error(logs.WritecacheBitcaskRemovingLogFile, + zap.Int("region", r.index), + zap.Uint32("logIndex", logIndex), + zap.Error(err)) + } + } +} + +func (r *region) flushEntry(logIndex uint32, offset int, addr oid.Address, obj *objectSDK.Object) error { + // Put the object to the underlying storage and store its storageID + var storageID []byte + if obj != nil { + var prm common.PutPrm + prm.Object = obj + + res, err := r.opts.blobstor.Put(context.TODO(), prm) + if err != nil { + return fmt.Errorf("putting object in main storage: %w", err) + } + storageID = res.StorageID + } + + r.Lock() + + // Find the current log index and offset of the entry in the key directory. + bucket := r.locateBucket(addr) + var curLogIndex uint32 + curOffset := -1 + bucketIndex := -1 + for i, e := range r.keyDir[bucket] { + if e.addr.Equals(addr) { + bucketIndex = i + curLogIndex = e.logIndex + curOffset = e.offset + break + } + } + + // If the log file entry is up-to-date, then update the object metadata as well. + if curLogIndex == logIndex && curOffset == offset && storageID != nil { + var updPrm meta.UpdateStorageIDPrm + updPrm.SetAddress(addr) + updPrm.SetStorageID(storageID) + + if _, err := r.opts.metabase.UpdateStorageID(updPrm); err != nil { + r.Unlock() + return fmt.Errorf("updating object metadata: %w", err) + } + } + + // If the entry is currently in the key directory, remove it. + if bucketIndex != -1 { + last := len(r.keyDir[bucket]) - 1 + r.keyDir[bucket][bucketIndex], r.keyDir[bucket][last] = r.keyDir[bucket][last], r.keyDir[bucket][bucketIndex] + r.keyDir[bucket] = r.keyDir[bucket][:last] + } + + r.Unlock() + + return nil +} + +func (r *region) flushBytes(logIndex uint32, b []byte) error { + rd := bytes.NewReader(b) + + for offset := 0; ; { + addr, obj, n, err := readLogFileEntry(rd) + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return fmt.Errorf("reading log file entry: %w", err) + } + + if err := r.flushEntry(logIndex, offset, addr, obj); err != nil { + if rf := r.opts.reportError; rf != nil { + rf(addr.EncodeToString(), err) + } + return fmt.Errorf("flushing log entry: %w", err) + } + + offset += n + } + + return nil +} + +// readLogFileEntry reads an log file entry from the given reader. +// It returns the corresponding address, object and entry size. +// A nil object is returned if the entry correspond to object deletion. +func readLogFileEntry(r io.Reader) (oid.Address, *objectSDK.Object, int, error) { + // Read address header + + var addr oid.Address + var c cid.ID + var o oid.ID + + if _, err := r.Read(c[:]); err != nil { + return addr, nil, 0, fmt.Errorf("reading container ID: %w", err) + } + if _, err := r.Read(o[:]); err != nil { + return addr, nil, 0, fmt.Errorf("reading object ID: %w", err) + } + + addr.SetContainer(c) + addr.SetObject(o) + + // Read payload size + + var sizeBytes [sizeLen]byte + if _, err := r.Read(sizeBytes[:]); err != nil { + return addr, nil, 0, fmt.Errorf("reading object size: %w", err) + } + size := binary.LittleEndian.Uint32(sizeBytes[:]) + + // Read and unmarshal object, if needed + + var data []byte + var obj *objectSDK.Object + if size != tombstone { + data = make([]byte, size) + if _, err := r.Read(data); err != nil { + return addr, nil, 0, fmt.Errorf("reading object data: %w", err) + } + obj = objectSDK.New() + if err := obj.Unmarshal(data); err != nil { + return addr, nil, 0, fmt.Errorf("unmarshaling object: %w", err) + } + } + + return addr, obj, keyLen + sizeLen + len(data), nil +} diff --git a/pkg/local_object_storage/writecache/writecachebitcask/generic_test.go b/pkg/local_object_storage/writecache/writecachebitcask/generic_test.go new file mode 100644 index 000000000..27236d164 --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebitcask/generic_test.go @@ -0,0 +1,16 @@ +package writecachebitcask + +import ( + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" +) + +func TestGeneric(t *testing.T) { + storagetest.TestAll(t, func(t *testing.T) storagetest.Component { + return New( + WithLogger(test.NewLogger(t, true)), + WithPath(t.TempDir())) + }) +} diff --git a/pkg/local_object_storage/writecache/writecachebitcask/options.go b/pkg/local_object_storage/writecache/writecachebitcask/options.go new file mode 100644 index 000000000..ad8c2b51b --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebitcask/options.go @@ -0,0 +1,140 @@ +package writecachebitcask + +import ( + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// Option represents write-cache configuration option. +type Option func(*options) + +type options struct { + path string + log *logger.Logger + blobstor writecache.MainStorage + metabase writecache.Metabase + metrics writecache.Metrics + + // reportError is the function called when encountering disk errors in background workers. + reportError func(string, error) + + maxObjectSize uint64 + bucketCount int + regionCount int + maxLogSize uint64 + maxBatchDelay time.Duration + maxPendingLogFileFlush int +} + +// WithPath sets path to writecache data. +func WithPath(path string) Option { + return func(o *options) { + o.path = path + } +} + +// WithLogger sets logger. +func WithLogger(log *logger.Logger) Option { + return func(o *options) { + o.log = &logger.Logger{Logger: log.With(zap.String("component", "WriteCache"))} + } +} + +// WithBlobstor sets main object storage. +func WithBlobstor(bs writecache.MainStorage) Option { + return func(o *options) { + o.blobstor = bs + } +} + +// WithMetabase sets metabase. +func WithMetabase(db writecache.Metabase) Option { + return func(o *options) { + o.metabase = db + } +} + +// WithMetrics sets metrics implementation. +func WithMetrics(metrics writecache.Metrics) Option { + return func(o *options) { + o.metrics = metrics + } +} + +// WithReportErrorFunc sets error reporting function. +func WithReportErrorFunc(f func(string, error)) Option { + return func(o *options) { + o.reportError = f + } +} + +// WithMaxObjectSize sets maximum object size to be stored in write-cache. +func WithMaxObjectSize(sz uint64) Option { + return func(o *options) { + if sz > 0 { + o.maxObjectSize = sz + } + } +} + +// WithBucketCount sets the number of buckets to use. +// +// This value determines the total number of buckets to use by the internal hash table. +// More buckets means fewer collisions but also increased memory usage. +// +// Default value is 2^16. +func WithBucketCount(bucketCount int) Option { + return func(o *options) { + o.bucketCount = bucketCount + } +} + +// WithRegionCount sets the number of regions to use. +// +// This is the number of independent partitions of the key space. Each region contains its +// own lock, key directory, membuffer, log files and flushing process. +// +// Default value is 4. +func WithRegionCount(regionCount int) Option { + return func(o *options) { + o.regionCount = regionCount + } +} + +// WithLogFileSize sets the maximum size of a log file. +// +// After a log file grows to this size, it will be closed and passed to the flushing process. +// +// Default value is 64 MiB. +func WithLogFileSize(logSize uint64) Option { + return func(o *options) { + o.maxLogSize = logSize + } +} + +// WithMaxBatchDelay sets for how long to keep current write batch. +// +// Any pending write batch will be flushed to the current log file after at most this duration. +// This helps minimize the number of sync IO operations under heavy load. +// +// Default value is 1ms. +func WithMaxBatchDelay(d time.Duration) Option { + return func(o *options) { + o.maxBatchDelay = d + } +} + +// WithMaxPendingLogFileFlush sets the maximum number of pending log files to be flushed per region. +// +// This is the maximum size of the queue of log files for the flushing process. After this many +// files are enqueued for flushing, requests will block until the flushing process can keep up. +// +// Default value is 4. +func WithMaxPendingLogFileFlush(n int) Option { + return func(o *options) { + o.maxPendingLogFileFlush = n + } +} diff --git a/pkg/local_object_storage/writecache/writecachebitcask/region.go b/pkg/local_object_storage/writecache/writecachebitcask/region.go new file mode 100644 index 000000000..1da4675e5 --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebitcask/region.go @@ -0,0 +1,387 @@ +package writecachebitcask + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "io" + "math" + "os" + "path/filepath" + "strconv" + "strings" + "sync" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.uber.org/zap" +) + +const ( + logFileOpenFlags = os.O_WRONLY | os.O_CREATE | os.O_APPEND | os.O_SYNC + keyLen = len(cid.ID{}) + len(oid.ID{}) + sizeLen = 4 + tombstone = uint32(math.MaxUint32) +) + +// entry is a key directory entry. +// +// It stores the object address and its current log file index and offset. +type entry struct { + addr oid.Address + logIndex uint32 + offset int +} + +// pendingWrite is a write operation in the current write batch, not yet committed to the log file. +type pendingWrite struct { + addr oid.Address + offset int + errCh chan error + isDelete bool +} + +type region struct { + sync.RWMutex + + // Parameters + opts *options + index int + + // Key directory + keyDir [][]*entry + + // Current mem-buffer and log file + wbuf bytes.Buffer + logIndex uint32 + logFile *os.File + size int + + // Put batch state + writeBatch []pendingWrite + + // Flush state + flushCh chan uint32 +} + +func (r *region) init() error { + if err := r.restore(); err != nil { + return err + } + + f, err := os.OpenFile(r.logFilePath(r.logIndex), logFileOpenFlags, 0644) + if err != nil { + return fmt.Errorf("creating log file for region %d: %v", r.index, err) + } + r.logFile = f + + return nil +} + +// restore restores the region state from existing log files, if any. +func (r *region) restore() error { + dir := filepath.Join(r.opts.path, strconv.Itoa(r.index)) + if err := os.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("creating region directory %d: %v", r.index, err) + } + entries, err := os.ReadDir(dir) + if err != nil { + return fmt.Errorf("listing region directory %d: %v", r.index, err) + } + var logIndices []uint32 + for _, ei := range entries { + if !ei.Type().IsRegular() { + continue + } + name := strings.TrimSuffix(filepath.Base(ei.Name()), filepath.Ext(ei.Name())) + index, err := strconv.ParseUint(name, 16, 32) + if err != nil { + return fmt.Errorf("parsing log file index %q: %v", ei.Name(), err) + } + logIndices = append(logIndices, uint32(index)) + } + + logCount := len(logIndices) + if logCount == 0 { + return nil + } + + r.logIndex = dispatchRecoveredLogIndices(logIndices, func(i uint32) { + r.flushCh <- i + }) + + return nil +} + +func dispatchRecoveredLogIndices(indices []uint32, dispatchFunc func(uint32)) uint32 { + n := len(indices) + i0 := 0 + + // Check if the indices wrap around and correct the starting point + if indices[0] == 0 && indices[n-1] == math.MaxUint32 { + i0 = 1 + for indices[i0-1] == indices[i0]-1 { + i0++ + } + } + + for i := 0; i < n; i++ { + dispatchFunc(indices[(i0+i)%n]) + } + + return indices[(i0+n-1)%n] + 1 +} + +func (r *region) close() error { + close(r.flushCh) + if r.logFile != nil { + return r.logFile.Close() + } + return nil +} + +func (r *region) put(ctx context.Context, addr oid.Address, data []byte) error { + c, o := addr.Container(), addr.Object() + pp := pendingWrite{ + addr: addr, + errCh: make(chan error, 1), + } + + r.Lock() + + // If the current log file is too large or missing, create a new one. + if r.logFile == nil || uint64(r.size) >= r.opts.maxLogSize { + if err := r.rotateLogFile(ctx); err != nil { + r.Unlock() + return err + } + } + + // Check whether we need to schedule a batch flush in the future. + wasEmpty := len(r.writeBatch) == 0 + + pp.offset = r.size + r.wbuf.Len() + r.writeBatch = append(r.writeBatch, pp) + + // Write the entry to the mem buffer. + r.wbuf.Write(c[:]) + r.wbuf.Write(o[:]) + _ = binary.Write(&r.wbuf, binary.LittleEndian, uint32(len(data))) + r.wbuf.Write(data) + + r.Unlock() + + if wasEmpty { + time.AfterFunc(r.opts.maxBatchDelay, r.flushWriteBatch) + } + + // Wait for the batch flush. + select { + case err := <-pp.errCh: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +func (r *region) delete(ctx context.Context, addr oid.Address) error { + c, o := addr.Container(), addr.Object() + pp := pendingWrite{ + addr: addr, + errCh: make(chan error, 1), + isDelete: true, + } + + bucket := r.locateBucket(addr) + + r.Lock() + + // If the current log file is too large or missing, create a new one. + if r.logFile == nil || uint64(r.size) >= r.opts.maxLogSize { + if err := r.rotateLogFile(ctx); err != nil { + r.Unlock() + return err + } + } + + // Locate the current entry (if any) in the key directory. + offset := -1 + for _, e := range r.keyDir[bucket] { + if e.addr.Equals(addr) { + offset = e.offset + break + } + } + + if offset == -1 { + r.Unlock() + return logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + + // Check whether we need to schedule a batch flush in the future. + wasEmpty := len(r.writeBatch) == 0 + + pp.offset = r.size + r.wbuf.Len() + r.writeBatch = append(r.writeBatch, pp) + + // Write the entry to the mem buffer. + r.wbuf.Write(c[:]) + r.wbuf.Write(o[:]) + _ = binary.Write(&r.wbuf, binary.LittleEndian, tombstone) + + r.Unlock() + + if wasEmpty { + time.AfterFunc(r.opts.maxBatchDelay, r.flushWriteBatch) + } + + // Wait for the batch flush. + select { + case err := <-pp.errCh: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +func (r *region) get(addr oid.Address) (*objectSDK.Object, error) { + bucket := r.locateBucket(addr) + + r.RLock() + + // Locate the log file index and offset of the entry. + var logIndex uint32 + offset := -1 + for _, e := range r.keyDir[bucket] { + if e.addr.Equals(addr) { + logIndex = e.logIndex + offset = e.offset + break + } + } + + defer r.RUnlock() + + if offset == -1 { + return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + + // Read the entry data from the corresponding log file. + f, err := os.Open(r.logFilePath(logIndex)) + if err != nil { + return nil, fmt.Errorf("reading log file: %w", err) + } + defer f.Close() + + if _, err := f.Seek(int64(offset), io.SeekStart); err != nil { + return nil, fmt.Errorf("seeking log entry: %w", err) + } + _, obj, _, err := readLogFileEntry(f) + if err != nil { + return nil, fmt.Errorf("reading log entry: %w", err) + } + if obj == nil { + return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + + return obj, nil +} + +// flushWriteBatch appends the membuffer to the current log file and returns +// any error to the callers. +func (r *region) flushWriteBatch() { + r.Lock() + defer r.Unlock() + + n, err := r.logFile.Write(r.wbuf.Bytes()) + for _, call := range r.writeBatch { + call.errCh <- err + } + + if err == nil { + for _, call := range r.writeBatch { + r.updateKeyOffset(call.addr, call.offset, call.isDelete) + } + r.size += n + } + + // Reset membuffer and clear the current write batch + r.wbuf.Reset() + r.writeBatch = r.writeBatch[:0] +} + +func (r *region) locateBucket(addr oid.Address) int { + id := addr.Object() + h := binary.LittleEndian.Uint32(id[4:]) + bucket := h & (uint32(len(r.keyDir)) - 1) + return int(bucket) +} + +func (r *region) updateKeyOffset(addr oid.Address, offset int, isDelete bool) { + bucket := r.locateBucket(addr) + exists := false + for _, e := range r.keyDir[bucket] { + if e.addr.Equals(addr) { + exists = true + // This check is necessary because the entries should be updated in the + // same order they are appended to the log file. Otherwise, a different + // state might result from recovering. + if e.offset < offset { + if isDelete { + e.offset = -1 + } else { + e.offset = offset + } + e.logIndex = r.logIndex + } + break + } + } + + if !exists { + r.keyDir[bucket] = append(r.keyDir[bucket], &entry{ + addr: addr, + offset: offset, + logIndex: r.logIndex, + }) + } +} + +// rotateLogFile closes the current log file, passes it to the flushing process and starts a new one. +func (r *region) rotateLogFile(ctx context.Context) error { + if r.logFile != nil { + if err := r.logFile.Close(); err != nil { + r.opts.log.Error(logs.WritecacheBitcaskClosingLogFile, + zap.Uint32("logIndex", r.logIndex), + zap.Error(err)) + } + select { + case r.flushCh <- r.logIndex: + // Mark the log file as nil only after the flushing process is aware of it. + r.logFile = nil + case <-ctx.Done(): + return ctx.Err() + } + } + + f, err := os.OpenFile(r.logFilePath(r.logIndex+1), logFileOpenFlags, 0644) + if err != nil { + return fmt.Errorf("creating log file for region %d: %w", r.index, err) + } + + r.logIndex++ + r.logFile = f + r.size = 0 + + return nil +} + +func (r *region) logFilePath(i uint32) string { + return filepath.Join(r.opts.path, strconv.Itoa(r.index), fmt.Sprintf("%08X.wlog", i)) +} diff --git a/pkg/local_object_storage/writecache/writecachebitcask/region_test.go b/pkg/local_object_storage/writecache/writecachebitcask/region_test.go new file mode 100644 index 000000000..7618a4e70 --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebitcask/region_test.go @@ -0,0 +1,33 @@ +package writecachebitcask + +import ( + "math" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDispatchRecoveredIndices(t *testing.T) { + max := uint32(math.MaxUint32) + tests := []struct { + indices []uint32 + wantOrder []uint32 + wantIndex uint32 + }{ + {[]uint32{0}, []uint32{0}, 1}, + {[]uint32{42}, []uint32{42}, 43}, + {[]uint32{5, 6, 7, 8}, []uint32{5, 6, 7, 8}, 9}, + {[]uint32{max - 2, max - 1, max}, []uint32{max - 2, max - 1, max}, 0}, + {[]uint32{0, 1, 2, max - 2, max - 1, max}, []uint32{max - 2, max - 1, max, 0, 1, 2}, 3}, + {[]uint32{0, max}, []uint32{max, 0}, 1}, + } + + for _, tc := range tests { + var gotOrder []uint32 + gotIndex := dispatchRecoveredLogIndices(tc.indices, func(i uint32) { + gotOrder = append(gotOrder, i) + }) + require.Equal(t, tc.wantOrder, gotOrder) + require.Equal(t, tc.wantIndex, gotIndex) + } +} diff --git a/pkg/local_object_storage/writecache/writecachebitcask/writecachebitcask.go b/pkg/local_object_storage/writecache/writecachebitcask/writecachebitcask.go new file mode 100644 index 000000000..d2cb67fdb --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebitcask/writecachebitcask.go @@ -0,0 +1,84 @@ +package writecachebitcask + +import ( + "sync/atomic" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + "go.uber.org/zap" +) + +/* + +The cache operates as a hash table where the key space is split in regions, each with its own lock (a sync.Mutex). +Each region maintains a key directory and appends updates (PUTs and DELETEs) to a log file. A key directory +entry stores the address, the log file index and the offset within the logfile. This structure is similar to +a bitcask (see https://en.wikipedia.org/wiki/Bitcask and https://riak.com/assets/bitcask-intro.pdf). + +Since the file writes use O_SYNC, the updates are batched in a memory buffer first, so that it incurs fewer writes +under heavy load. After a log file reaches maximum capacity, it's closed and a new one is started. The completed log +files are pushed to the flushing process which processes them one by one and deletes them after updating the underlying +storage. The flushing process doesn't poll periodically for new work; instead, there's a buffered channel to which the +log file indices are pushed. If this channel fills, it will block the sending process (a request) until there's room +to continue, providing backpressure to the client when the underlying storage is not able to keep up. + +The lower bytes of the object addresses are used as region and bucket hashes, since they are already hashed values +by construction. + + │ │ + Regions │ In-Memory │ In-Disk + ───────────────────────┼───────────────┼───────────────────────────────────────────────────────────────────────────────── + ┌─ ┌─ │ │ ┌────────────────────────────────────────┐ ┌───────────┐ ┌───────────┐ + │ │ Bucket 1 │ KeyDir │ │ LogFile 0 │ │ LogFile 1 │ │ LogFile 2 │ + │ Region 1 │ Bucket 2 │ ┌───────────┐ │ │┌──────────────┐ ┌──────────────┐ │ │ │ │ │ + │ │ (...) │ │ MemBuffer │ │ ││Addr Size Data│ │Addr Size Data│ (...) │ │ (...) │ │ (...) │ (...) + │ (Mutex) │ Bucket K │ └───────────┘ │ │└──────────────┘ └──────────────┘ │ │ │ │ │ + │ └─ │ │ └────────────────────────────────────────┘ └───────────┘ └───────────┘ + │ ───────────────────────┼───────────────┼───────────────────────────────────────────────────────────────────────────────── + │ ┌─ │ │ ┌────────────────────────────────────────┐ ┌───────────┐ ┌───────────┐ + │ │ Bucket K+1 │ KeyDir │ │ LogFile 0 │ │ LogFile 1 │ │ LogFile 2 │ + │ Region 2 │ Bucket K+2 │ ┌───────────┐ │ │┌──────────────┐ ┌──────────────┐ │ │ │ │ │ + │ │ (...) │ │ MemBuffer │ │ ││Addr Size Data│ │Addr Size Data│ (...) │ │ (...) │ │ (...) │ (...) + Key │ │ Bucket 2K │ └───────────┘ │ │└──────────────┘ └──────────────┘ │ │ │ │ │ + Space │ └─ │ │ └────────────────────────────────────────┘ └───────────┘ └───────────┘ + │ ───────────────────────┼───────────────┼───────────────────────────────────────────────────────────────────────────────── + │ (...) │ (...) │ (...) + │ ───────────────────────┼───────────────┼───────────────────────────────────────────────────────────────────────────────── + │ ┌─ │ │ ┌────────────────────────────────────────┐ ┌───────────┐ ┌───────────┐ + │ │ Bucket (N-1)*K+1 │ KeyDir │ │ LogFile 0 │ │ LogFile 1 │ │ LogFile 2 │ + │ Region N │ Bucket (N-1)*K+2 │ ┌───────────┐ │ │┌──────────────┐ ┌──────────────┐ │ │ │ │ │ + │ │ (...) │ │ MemBuffer │ │ ││Addr Size Data│ │Addr Size Data│ (...) │ │ (...) │ │ (...) │ (...) + │ │ Bucket N*K │ └───────────┘ │ │└──────────────┘ └──────────────┘ │ │ │ │ │ + └─ └─ │ │ └────────────────────────────────────────┘ └───────────┘ └───────────┘ + ───────────────────────┴───────────────┴───────────────────────────────────────────────────────────────────────────────── + +*/ + +type cache struct { + options + + mode atomic.Uint32 + closed atomic.Bool + regions []*region +} + +func New(opts ...Option) writecache.Cache { + c := &cache{ + options: options{ + log: &logger.Logger{Logger: zap.NewNop()}, + metrics: writecache.DefaultMetrics(), + + maxObjectSize: 128 << 10, + bucketCount: 1 << 16, + regionCount: 1 << 2, + maxLogSize: 64 << 20, + maxBatchDelay: 1 * time.Millisecond, + maxPendingLogFileFlush: 4, + }, + } + for i := range opts { + opts[i](&c.options) + } + return c +}