WIP: Experimental bitcask-based writecache implementation #654
16 changed files with 1228 additions and 14 deletions
|
@ -43,6 +43,7 @@ import (
|
|||
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebitcask"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||
containerClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||
|
@ -128,17 +129,21 @@ type shardCfg struct {
|
|||
}
|
||||
|
||||
writecacheCfg struct {
|
||||
enabled bool
|
||||
typ writecacheconfig.Type
|
||||
path string
|
||||
maxBatchSize int
|
||||
maxBatchDelay time.Duration
|
||||
smallObjectSize uint64
|
||||
maxObjSize uint64
|
||||
flushWorkerCount int
|
||||
sizeLimit uint64
|
||||
noSync bool
|
||||
gcInterval time.Duration
|
||||
enabled bool
|
||||
typ writecacheconfig.Type
|
||||
path string
|
||||
maxBatchSize int
|
||||
maxBatchDelay time.Duration
|
||||
smallObjectSize uint64
|
||||
maxObjSize uint64
|
||||
flushWorkerCount int
|
||||
sizeLimit uint64
|
||||
noSync bool
|
||||
gcInterval time.Duration
|
||||
bucketCount int
|
||||
regionCount int
|
||||
logFileSize uint64
|
||||
maxPendingLogFileFlush int
|
||||
}
|
||||
|
||||
piloramaCfg struct {
|
||||
|
@ -253,6 +258,10 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg,
|
|||
wc.sizeLimit = writeCacheCfg.SizeLimit()
|
||||
wc.noSync = writeCacheCfg.NoSync()
|
||||
wc.gcInterval = writeCacheCfg.GCInterval()
|
||||
wc.bucketCount = writeCacheCfg.BucketCount()
|
||||
wc.regionCount = writeCacheCfg.RegionCount()
|
||||
wc.logFileSize = writeCacheCfg.LogFileSize()
|
||||
wc.maxPendingLogFileFlush = writeCacheCfg.MaxPendingLogFileFlush()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -744,6 +753,18 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) writecacheconfig.Options {
|
|||
writecachebadger.WithLogger(c.log),
|
||||
writecachebadger.WithGCInterval(wcRead.gcInterval),
|
||||
)
|
||||
case writecacheconfig.TypeBitcask:
|
||||
writeCacheOpts.Type = writecacheconfig.TypeBitcask
|
||||
writeCacheOpts.BitcaskOptions = append(writeCacheOpts.BitcaskOptions,
|
||||
writecachebitcask.WithPath(wcRead.path),
|
||||
writecachebitcask.WithLogger(c.log),
|
||||
writecachebitcask.WithMaxObjectSize(wcRead.maxObjSize),
|
||||
writecachebitcask.WithBucketCount(wcRead.bucketCount),
|
||||
writecachebitcask.WithRegionCount(wcRead.regionCount),
|
||||
writecachebitcask.WithLogFileSize(wcRead.logFileSize),
|
||||
writecachebitcask.WithMaxBatchDelay(wcRead.maxBatchDelay),
|
||||
writecachebitcask.WithMaxPendingLogFileFlush(wcRead.maxPendingLogFileFlush),
|
||||
)
|
||||
default:
|
||||
panic(fmt.Sprintf("unknown writecache type: %q", wcRead.typ))
|
||||
}
|
||||
|
|
|
@ -28,6 +28,18 @@ const (
|
|||
|
||||
// DefaultGCInterval is the default duration of the GC cycle interval.
|
||||
DefaultGCInterval = 1 * time.Minute
|
||||
|
||||
// DefaultBucketCount is the default number of buckets for the bitcask cache.
|
||||
DefaultBucketCount = 1 << 16
|
||||
|
||||
// DefaultRegionCount is the default number of regions for the bitcask cache.
|
||||
DefaultRegionCount = 1 << 2
|
||||
|
||||
// DefaultLogFileSize is the default max size of a single log file for the bitcask cache.
|
||||
DefaultLogFileSize = 64 << 20
|
||||
|
||||
// DefaultMaxPendingLogFileFlush is the default max waiting log files to be flushed for the bitcask cache.
|
||||
DefaultMaxPendingLogFileFlush = 4
|
||||
)
|
||||
|
||||
// From wraps config section into Config.
|
||||
|
@ -53,6 +65,8 @@ func (x *Config) Type() writecacheconfig.Type {
|
|||
return writecacheconfig.TypeBBolt
|
||||
case "badger":
|
||||
return writecacheconfig.TypeBadger
|
||||
case "bitcask":
|
||||
return writecacheconfig.TypeBitcask
|
||||
}
|
||||
|
||||
panic(fmt.Sprintf("invalid writecache type: %q", t))
|
||||
|
@ -162,3 +176,67 @@ func (x *Config) GCInterval() time.Duration {
|
|||
|
||||
return DefaultGCInterval
|
||||
}
|
||||
|
||||
// BucketCount returns the value of "bucket_count" config parameter.
|
||||
//
|
||||
// Returns BucketCountDefault if the value is not a positive number.
|
||||
func (x *Config) BucketCount() int {
|
||||
c := config.IntSafe(
|
||||
(*config.Config)(x),
|
||||
"bucket_count",
|
||||
)
|
||||
|
||||
if c > 0 {
|
||||
return int(c)
|
||||
}
|
||||
|
||||
return DefaultBucketCount
|
||||
}
|
||||
|
||||
// RegionCount returns the value of "region_count" config parameter.
|
||||
//
|
||||
// Returns RegionCountDefault if the value is not a positive number.
|
||||
func (x *Config) RegionCount() int {
|
||||
c := config.IntSafe(
|
||||
(*config.Config)(x),
|
||||
"region_count",
|
||||
)
|
||||
|
||||
if c > 0 {
|
||||
return int(c)
|
||||
}
|
||||
|
||||
return DefaultRegionCount
|
||||
}
|
||||
|
||||
// LogFileSize returns the value of "log_file_size" config parameter.
|
||||
//
|
||||
// Returns LogFileSizeDefault if the value is not a positive number.
|
||||
func (x *Config) LogFileSize() uint64 {
|
||||
c := config.SizeInBytesSafe(
|
||||
(*config.Config)(x),
|
||||
"log_file_size",
|
||||
)
|
||||
|
||||
if c > 0 {
|
||||
return c
|
||||
}
|
||||
|
||||
return DefaultLogFileSize
|
||||
}
|
||||
|
||||
// MaxPendingLogFileFlush returns the value of "max_pending_log_file_flush" config parameter.
|
||||
//
|
||||
// Returns MaxPendingLogFileFlushDefault if the value is not a positive number.
|
||||
func (x *Config) MaxPendingLogFileFlush() int {
|
||||
c := config.IntSafe(
|
||||
(*config.Config)(x),
|
||||
"max_pending_log_file_flush",
|
||||
)
|
||||
|
||||
if c > 0 {
|
||||
return int(c)
|
||||
}
|
||||
|
||||
return DefaultMaxPendingLogFileFlush
|
||||
}
|
||||
|
|
|
@ -297,6 +297,10 @@ const (
|
|||
WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache"
|
||||
WritecacheDBValueLogGCRunCompleted = "value log GC run completed"
|
||||
WritecacheBadgerObjAlreadyScheduled = "object already scheduled for flush"
|
||||
WritecacheBitcaskReadingLogFile = "reading log file"
|
||||
WritecacheBitcaskFlushingLogBytes = "flushing log bytes"
|
||||
WritecacheBitcaskRemovingLogFile = "removing log file"
|
||||
WritecacheBitcaskClosingLogFile = "closing log file"
|
||||
BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level"
|
||||
BlobovniczatreeCouldNotReadPayloadRangeFromOpenedBlobovnicza = "could not read payload range from opened blobovnicza"
|
||||
BlobovniczatreeCouldNotReadPayloadRangeFromActiveBlobovnicza = "could not read payload range from active blobovnicza"
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebitcask"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -170,6 +171,12 @@ func New(opts ...Option) *Shard {
|
|||
writecachebadger.WithReportErrorFunc(reportFunc),
|
||||
writecachebadger.WithBlobstor(bs),
|
||||
writecachebadger.WithMetabase(mb))...)
|
||||
case writecacheconfig.TypeBitcask:
|
||||
s.writeCache = writecachebitcask.New(
|
||||
append(c.writeCacheOpts.BitcaskOptions,
|
||||
writecachebitcask.WithReportErrorFunc(reportFunc),
|
||||
writecachebitcask.WithBlobstor(bs),
|
||||
writecachebitcask.WithMetabase(mb))...)
|
||||
default:
|
||||
panic(fmt.Sprintf("invalid writecache type: %v", c.writeCacheOpts.Type))
|
||||
}
|
||||
|
@ -220,6 +227,8 @@ func WithWriteCacheMetrics(wcMetrics writecache.Metrics) Option {
|
|||
c.writeCacheOpts.BBoltOptions = append(c.writeCacheOpts.BBoltOptions, writecachebbolt.WithMetrics(wcMetrics))
|
||||
case writecacheconfig.TypeBadger:
|
||||
c.writeCacheOpts.BadgerOptions = append(c.writeCacheOpts.BadgerOptions, writecachebadger.WithMetrics(wcMetrics))
|
||||
case writecacheconfig.TypeBitcask:
|
||||
c.writeCacheOpts.BitcaskOptions = append(c.writeCacheOpts.BitcaskOptions, writecachebitcask.WithMetrics(wcMetrics))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebitcask"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -51,6 +52,10 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts
|
|||
wcOpts.BadgerOptions = append(
|
||||
[]writecachebadger.Option{writecachebadger.WithPath(filepath.Join(rootPath, "wcache"))},
|
||||
wcOpts.BadgerOptions...)
|
||||
case writecacheconfig.TypeBitcask:
|
||||
wcOpts.BitcaskOptions = append(
|
||||
[]writecachebitcask.Option{writecachebitcask.WithPath(filepath.Join(rootPath, "wcache"))},
|
||||
wcOpts.BitcaskOptions...)
|
||||
}
|
||||
} else {
|
||||
rootPath = filepath.Join(rootPath, "nowc")
|
||||
|
|
|
@ -12,11 +12,15 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebitcask"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func BenchmarkWritecacheSeq(b *testing.B) {
|
||||
const payloadSize = 8 << 10
|
||||
b.Run("bitcask_seq", func(b *testing.B) {
|
||||
benchmarkPutSeq(b, newBitcaskCache(b), payloadSize)
|
||||
})
|
||||
b.Run("bbolt_seq", func(b *testing.B) {
|
||||
benchmarkPutSeq(b, newBBoltCache(b), payloadSize)
|
||||
})
|
||||
|
@ -27,6 +31,9 @@ func BenchmarkWritecacheSeq(b *testing.B) {
|
|||
|
||||
func BenchmarkWritecachePar(b *testing.B) {
|
||||
const payloadSize = 8 << 10
|
||||
b.Run("bitcask_par", func(b *testing.B) {
|
||||
benchmarkPutPar(b, newBitcaskCache(b), payloadSize)
|
||||
})
|
||||
b.Run("bbolt_par", func(b *testing.B) {
|
||||
benchmarkPutPar(b, newBBoltCache(b), payloadSize)
|
||||
})
|
||||
|
@ -120,3 +127,14 @@ func newBadgerCache(b *testing.B) writecache.Cache {
|
|||
writecachebadger.WithGCInterval(10*time.Second),
|
||||
)
|
||||
}
|
||||
|
||||
// newBitcaskCache constructs a bitcask write-cache for benchmarking, backed
// by a stub blobstor whose Put is a no-op and a no-op metabase, so only the
// cache itself is measured.
func newBitcaskCache(b *testing.B) writecache.Cache {
	bs := teststore.New(
		teststore.WithPut(func(pp common.PutPrm) (common.PutRes, error) { return common.PutRes{}, nil }),
	)
	return writecachebitcask.New(
		writecachebitcask.WithPath(b.TempDir()),
		writecachebitcask.WithBlobstor(bs),
		writecachebitcask.WithMetabase(testMetabase{}),
	)
}
|
||||
|
|
|
@ -4,6 +4,7 @@ package config
|
|||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebitcask"
|
||||
)
|
||||
|
||||
// Type is the write cache implementation type.
|
||||
|
@ -12,11 +13,13 @@ type Type int
|
|||
const (
|
||||
TypeBBolt Type = iota
|
||||
TypeBadger
|
||||
TypeBitcask
|
||||
)
|
||||
|
||||
// Options are the configuration options for the write cache.
|
||||
type Options struct {
|
||||
Type Type
|
||||
BBoltOptions []writecachebbolt.Option
|
||||
BadgerOptions []writecachebadger.Option
|
||||
Type Type
|
||||
BBoltOptions []writecachebbolt.Option
|
||||
BadgerOptions []writecachebadger.Option
|
||||
BitcaskOptions []writecachebitcask.Option
|
||||
}
|
||||
|
|
48
pkg/local_object_storage/writecache/writecachebitcask/api.go
Normal file
48
pkg/local_object_storage/writecache/writecachebitcask/api.go
Normal file
|
@ -0,0 +1,48 @@
|
|||
package writecachebitcask
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
func (c *cache) Get(_ context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||
region := c.locateRegion(addr)
|
||||
return c.regions[region].get(addr)
|
||||
}
|
||||
|
||||
// Head returns the object stored under the given address.
//
// NOTE(review): this delegates to the same region get as Get, so it appears
// to return the object together with its full payload rather than a header
// with the payload cut — confirm whether callers expect CutPayload semantics.
func (c *cache) Head(_ context.Context, addr oid.Address) (*objectSDK.Object, error) {
	region := c.locateRegion(addr)
	return c.regions[region].get(addr)
}
|
||||
|
||||
func (c *cache) Put(ctx context.Context, req common.PutPrm) (common.PutRes, error) {
|
||||
if uint64(len(req.RawData)) > c.maxObjectSize {
|
||||
return common.PutRes{}, writecache.ErrBigObject
|
||||
}
|
||||
if mode.Mode(c.mode.Load()).ReadOnly() {
|
||||
return common.PutRes{}, writecache.ErrReadOnly
|
||||
}
|
||||
region := c.locateRegion(req.Address)
|
||||
return common.PutRes{}, c.regions[region].put(ctx, req.Address, req.RawData)
|
||||
}
|
||||
|
||||
func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||
if mode.Mode(c.mode.Load()).ReadOnly() {
|
||||
return writecache.ErrReadOnly
|
||||
}
|
||||
region := c.locateRegion(addr)
|
||||
return c.regions[region].delete(ctx, addr)
|
||||
}
|
||||
|
||||
// locateRegion maps an object address to the index of the region that owns it.
//
// The first four bytes of the object ID are interpreted as a little-endian
// uint32 and masked with regionCount-1. The mask is only a valid modulo when
// regionCount is a power of two, which Open validates.
func (c *cache) locateRegion(addr oid.Address) int {
	id := addr.Object()
	h := binary.LittleEndian.Uint32(id[:4])
	region := h & (uint32(c.regionCount) - 1)
	return int(region)
}
|
|
@ -0,0 +1,82 @@
|
|||
package writecachebitcask
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestAPI exercises the basic Put/Get/Delete round-trip of the bitcask
// write-cache against a stub blobstor (no-op Put) and a no-op metabase.
func TestAPI(t *testing.T) {
	ctx := context.Background()

	bs := teststore.New(
		teststore.WithPut(func(pp common.PutPrm) (common.PutRes, error) { return common.PutRes{}, nil }),
	)

	c := New(
		WithPath(t.TempDir()),
		WithBlobstor(bs),
		WithMetabase(testMetabase{}),
	)

	require.NoError(t, c.Open(false))
	require.NoError(t, c.Init())

	obj := testutil.GenerateObject()
	addr := testutil.AddressFromObject(t, obj)
	data, err := obj.Marshal()
	require.NoError(t, err)

	// Get nonexistent object
	{
		_, gotErr := c.Get(ctx, oid.Address{})
		require.True(t, client.IsErrObjectNotFound(gotErr))
	}

	// Put an object
	{
		_, err := c.Put(ctx, common.PutPrm{
			Address: addr,
			Object:  obj,
			RawData: data,
		})

		require.NoError(t, err)
	}

	// Get the object previously put; the round-tripped bytes must match.
	{
		gotObj, err := c.Get(ctx, addr)
		require.NoError(t, err)
		gotData, err := gotObj.Marshal()
		require.NoError(t, err)
		require.Equal(t, data, gotData)
	}

	// Delete the object previously put; a second delete reports not-found.
	{
		require.NoError(t, c.Delete(ctx, addr))
		require.True(t, client.IsErrObjectNotFound(c.Delete(ctx, addr)))
	}

	// Get the object previously deleted
	{
		_, gotErr := c.Get(ctx, addr)
		require.True(t, client.IsErrObjectNotFound(gotErr))
	}

	require.NoError(t, c.Close())
}
|
||||
|
||||
// testMetabase is a no-op metabase stub for tests and benchmarks.
type testMetabase struct{}

// UpdateStorageID implements the metabase dependency with a no-op.
func (testMetabase) UpdateStorageID(meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) {
	return meta.UpdateStorageIDRes{}, nil
}
|
@ -0,0 +1,81 @@
|
|||
package writecachebitcask
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
)
|
||||
|
||||
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
//
// NOTE(review): c.log is assigned without synchronization — confirm that
// SetLogger is only invoked before the cache starts serving requests.
func (c *cache) SetLogger(l *logger.Logger) {
	c.log = l
}
|
||||
|
||||
func (c *cache) DumpInfo() writecache.Info {
|
||||
return writecache.Info{
|
||||
Path: c.path,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cache) Open(readOnly bool) error {
|
||||
// Validate
|
||||
if c.bucketCount&(c.bucketCount-1) != 0 {
|
||||
return fmt.Errorf("numBuckets must be a power of 2: got %d", c.bucketCount)
|
||||
}
|
||||
if c.regionCount&(c.regionCount-1) != 0 {
|
||||
return fmt.Errorf("numRegions must be a power of 2: got %d", c.regionCount)
|
||||
}
|
||||
if c.regionCount >= c.bucketCount {
|
||||
return errors.New("numBuckets must be greater than numRegions")
|
||||
}
|
||||
|
||||
// Create regions
|
||||
c.regions = make([]*region, c.regionCount)
|
||||
for i := 0; i < c.regionCount; i++ {
|
||||
c.regions[i] = ®ion{
|
||||
opts: &c.options,
|
||||
index: i,
|
||||
keyDir: make([][]*entry, c.bucketCount/c.regionCount),
|
||||
flushCh: make(chan uint32, c.maxPendingLogFileFlush),
|
||||
}
|
||||
}
|
||||
|
||||
if readOnly {
|
||||
_ = c.SetMode(mode.ReadOnly)
|
||||
} else {
|
||||
_ = c.SetMode(mode.ReadWrite)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cache) Init() error {
|
||||
for _, r := range c.regions {
|
||||
go r.flushWorker()
|
||||
if err := r.init(); err != nil {
|
||||
r.close()
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cache) Close() error {
|
||||
var lastErr error
|
||||
if !c.closed.Swap(true) {
|
||||
for _, r := range c.regions {
|
||||
if err := r.close(); err != nil {
|
||||
lastErr = err
|
||||
}
|
||||
}
|
||||
}
|
||||
return lastErr
|
||||
}
|
||||
|
||||
// SetMode switches the cache between read-only and read-write operation.
// The mode is stored atomically and consulted by Put and Delete; this call
// never fails.
func (c *cache) SetMode(m mode.Mode) error {
	c.mode.Store(uint32(m))
	return nil
}
|
205
pkg/local_object_storage/writecache/writecachebitcask/flush.go
Normal file
205
pkg/local_object_storage/writecache/writecachebitcask/flush.go
Normal file
|
@ -0,0 +1,205 @@
|
|||
package writecachebitcask
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Flush pushes all cached data towards the main storage: every region's
// pending write batch is committed, its active log file is rotated into the
// flush queue, and the queues are then allowed to drain.
//
// When ignoreErrors is false, the last rotation error is returned; otherwise
// rotation errors are discarded.
func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error {
	var lastErr error

	// Forcibly rotate all active membuffers and log files
	for _, r := range c.regions {
		r.flushWriteBatch()
		r.Lock()
		if err := r.rotateLogFile(ctx); err != nil && !ignoreErrors {
			lastErr = err
		}
		r.Unlock()
	}

	// Wait for all flush channels to drain
	// NOTE(review): an empty flushCh only shows the worker has dequeued the
	// last log file, not that it finished flushing it — confirm whether
	// stronger completion tracking is needed for Flush's guarantees.
	for _, r := range c.regions {
		for len(r.flushCh) > 0 {
			time.Sleep(1 * time.Second)
		}
	}

	return lastErr
}
|
||||
|
||||
func (r *region) flushWorker() {
|
||||
for logIndex := range r.flushCh {
|
||||
again:
|
||||
// Read the whole log file contents in memory
|
||||
b, err := os.ReadFile(r.logFilePath(logIndex))
|
||||
if err != nil {
|
||||
r.opts.log.Error(logs.WritecacheBitcaskReadingLogFile,
|
||||
zap.Int("region", r.index),
|
||||
zap.Uint32("logIndex", logIndex),
|
||||
zap.Error(err))
|
||||
time.Sleep(1 * time.Second)
|
||||
goto again
|
||||
}
|
||||
|
||||
// Flush the log file contents
|
||||
for {
|
||||
err := r.flushBytes(logIndex, b)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
r.opts.log.Error(logs.WritecacheBitcaskFlushingLogBytes,
|
||||
zap.Int("region", r.index),
|
||||
zap.Uint32("logIndex", logIndex),
|
||||
zap.Error(err))
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
// Delete the log file
|
||||
if err := os.Remove(r.logFilePath(logIndex)); err != nil {
|
||||
r.opts.log.Error(logs.WritecacheBitcaskRemovingLogFile,
|
||||
zap.Int("region", r.index),
|
||||
zap.Uint32("logIndex", logIndex),
|
||||
zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// flushEntry persists one replayed log entry to the main storage and, when the
// entry is still the newest version of the object, records the resulting
// storage ID in the metabase and removes the entry from the in-memory key
// directory. logIndex/offset identify where the entry was read from; obj is
// nil for deletion (tombstone) entries, in which case no blobstor put happens.
func (r *region) flushEntry(logIndex uint32, offset int, addr oid.Address, obj *objectSDK.Object) error {
	// Put the object to the underlying storage and store its storageID
	var storageID []byte
	if obj != nil {
		var prm common.PutPrm
		prm.Object = obj

		res, err := r.opts.blobstor.Put(context.TODO(), prm)
		if err != nil {
			return fmt.Errorf("putting object in main storage: %w", err)
		}
		storageID = res.StorageID
	}

	r.Lock()

	// Find the current log index and offset of the entry in the key directory.
	// If the address is absent, curOffset stays -1 and the up-to-date check
	// below can never match (offset is non-negative).
	bucket := r.locateBucket(addr)
	var curLogIndex uint32
	curOffset := -1
	bucketIndex := -1
	for i, e := range r.keyDir[bucket] {
		if e.addr.Equals(addr) {
			bucketIndex = i
			curLogIndex = e.logIndex
			curOffset = e.offset
			break
		}
	}

	// If the log file entry is up-to-date, then update the object metadata as well.
	if curLogIndex == logIndex && curOffset == offset && storageID != nil {
		var updPrm meta.UpdateStorageIDPrm
		updPrm.SetAddress(addr)
		updPrm.SetStorageID(storageID)

		if _, err := r.opts.metabase.UpdateStorageID(updPrm); err != nil {
			r.Unlock()
			return fmt.Errorf("updating object metadata: %w", err)
		}
	}

	// If the entry is currently in the key directory, remove it.
	// Swap-remove keeps the bucket slice compact without preserving order.
	if bucketIndex != -1 {
		last := len(r.keyDir[bucket]) - 1
		r.keyDir[bucket][bucketIndex], r.keyDir[bucket][last] = r.keyDir[bucket][last], r.keyDir[bucket][bucketIndex]
		r.keyDir[bucket] = r.keyDir[bucket][:last]
	}

	r.Unlock()

	return nil
}
|
||||
|
||||
func (r *region) flushBytes(logIndex uint32, b []byte) error {
|
||||
rd := bytes.NewReader(b)
|
||||
|
||||
for offset := 0; ; {
|
||||
addr, obj, n, err := readLogFileEntry(rd)
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
return fmt.Errorf("reading log file entry: %w", err)
|
||||
}
|
||||
|
||||
if err := r.flushEntry(logIndex, offset, addr, obj); err != nil {
|
||||
if rf := r.opts.reportError; rf != nil {
|
||||
rf(addr.EncodeToString(), err)
|
||||
}
|
||||
return fmt.Errorf("flushing log entry: %w", err)
|
||||
}
|
||||
|
||||
offset += n
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// readLogFileEntry reads an log file entry from the given reader.
|
||||
// It returns the corresponding address, object and entry size.
|
||||
// A nil object is returned if the entry correspond to object deletion.
|
||||
func readLogFileEntry(r io.Reader) (oid.Address, *objectSDK.Object, int, error) {
|
||||
// Read address header
|
||||
|
||||
var addr oid.Address
|
||||
var c cid.ID
|
||||
var o oid.ID
|
||||
|
||||
if _, err := r.Read(c[:]); err != nil {
|
||||
return addr, nil, 0, fmt.Errorf("reading container ID: %w", err)
|
||||
}
|
||||
if _, err := r.Read(o[:]); err != nil {
|
||||
return addr, nil, 0, fmt.Errorf("reading object ID: %w", err)
|
||||
}
|
||||
|
||||
addr.SetContainer(c)
|
||||
addr.SetObject(o)
|
||||
|
||||
// Read payload size
|
||||
|
||||
var sizeBytes [sizeLen]byte
|
||||
|
||||
if _, err := r.Read(sizeBytes[:]); err != nil {
|
||||
return addr, nil, 0, fmt.Errorf("reading object size: %w", err)
|
||||
}
|
||||
size := binary.LittleEndian.Uint32(sizeBytes[:])
|
||||
|
||||
// Read and unmarshal object, if needed
|
||||
|
||||
var data []byte
|
||||
var obj *objectSDK.Object
|
||||
if size != tombstone {
|
||||
data = make([]byte, size)
|
||||
if _, err := r.Read(data); err != nil {
|
||||
return addr, nil, 0, fmt.Errorf("reading object data: %w", err)
|
||||
}
|
||||
obj = objectSDK.New()
|
||||
if err := obj.Unmarshal(data); err != nil {
|
||||
return addr, nil, 0, fmt.Errorf("unmarshaling object: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return addr, obj, keyLen + sizeLen + len(data), nil
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
package writecachebitcask
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||
)
|
||||
|
||||
// TestGeneric runs the shared storage component test-suite against the
// bitcask write-cache implementation.
func TestGeneric(t *testing.T) {
	storagetest.TestAll(t, func(t *testing.T) storagetest.Component {
		return New(
			WithLogger(test.NewLogger(t, true)),
			WithPath(t.TempDir()))
	})
}
|
140
pkg/local_object_storage/writecache/writecachebitcask/options.go
Normal file
140
pkg/local_object_storage/writecache/writecachebitcask/options.go
Normal file
|
@ -0,0 +1,140 @@
|
|||
package writecachebitcask
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Option represents write-cache configuration option.
type Option func(*options)

// options holds the resolved configuration shared by the cache and its regions.
type options struct {
	path     string
	log      *logger.Logger
	blobstor writecache.MainStorage
	metabase writecache.Metabase
	metrics  writecache.Metrics

	// reportError is the function called when encountering disk errors in background workers.
	reportError func(string, error)

	maxObjectSize uint64
	// bucketCount and regionCount define the hash-table geometry; Open is
	// expected to validate them (powers of two, bucketCount > regionCount).
	bucketCount int
	regionCount int
	// maxLogSize is the rotation threshold for a single log file
	// (set by WithLogFileSize).
	maxLogSize             uint64
	maxBatchDelay          time.Duration
	maxPendingLogFileFlush int
}

// WithPath sets path to writecache data.
func WithPath(path string) Option {
	return func(o *options) {
		o.path = path
	}
}

// WithLogger sets logger.
func WithLogger(log *logger.Logger) Option {
	return func(o *options) {
		o.log = &logger.Logger{Logger: log.With(zap.String("component", "WriteCache"))}
	}
}

// WithBlobstor sets main object storage.
func WithBlobstor(bs writecache.MainStorage) Option {
	return func(o *options) {
		o.blobstor = bs
	}
}

// WithMetabase sets metabase.
func WithMetabase(db writecache.Metabase) Option {
	return func(o *options) {
		o.metabase = db
	}
}

// WithMetrics sets metrics implementation.
func WithMetrics(metrics writecache.Metrics) Option {
	return func(o *options) {
		o.metrics = metrics
	}
}

// WithReportErrorFunc sets error reporting function.
func WithReportErrorFunc(f func(string, error)) Option {
	return func(o *options) {
		o.reportError = f
	}
}

// WithMaxObjectSize sets maximum object size to be stored in write-cache.
//
// Non-positive values are ignored and the previously configured value kept.
func WithMaxObjectSize(sz uint64) Option {
	return func(o *options) {
		if sz > 0 {
			o.maxObjectSize = sz
		}
	}
}

// WithBucketCount sets the number of buckets to use.
//
// This value determines the total number of buckets to use by the internal hash table.
// More buckets means fewer collisions but also increased memory usage.
//
// Default value is 2^16.
func WithBucketCount(bucketCount int) Option {
	return func(o *options) {
		o.bucketCount = bucketCount
	}
}

// WithRegionCount sets the number of regions to use.
//
// This is the number of independent partitions of the key space. Each region contains its
// own lock, key directory, membuffer, log files and flushing process.
//
// Default value is 4.
func WithRegionCount(regionCount int) Option {
	return func(o *options) {
		o.regionCount = regionCount
	}
}

// WithLogFileSize sets the maximum size of a log file.
//
// After a log file grows to this size, it will be closed and passed to the flushing process.
//
// Default value is 64 MiB.
func WithLogFileSize(logSize uint64) Option {
	return func(o *options) {
		o.maxLogSize = logSize
	}
}

// WithMaxBatchDelay sets for how long to keep current write batch.
//
// Any pending write batch will be flushed to the current log file after at most this duration.
// This helps minimize the number of sync IO operations under heavy load.
//
// Default value is 1ms.
func WithMaxBatchDelay(d time.Duration) Option {
	return func(o *options) {
		o.maxBatchDelay = d
	}
}

// WithMaxPendingLogFileFlush sets the maximum number of pending log files to be flushed per region.
//
// This is the maximum size of the queue of log files for the flushing process. After this many
// files are enqueued for flushing, requests will block until the flushing process can keep up.
//
// Default value is 4.
func WithMaxPendingLogFileFlush(n int) Option {
	return func(o *options) {
		o.maxPendingLogFileFlush = n
	}
}
|
387
pkg/local_object_storage/writecache/writecachebitcask/region.go
Normal file
387
pkg/local_object_storage/writecache/writecachebitcask/region.go
Normal file
|
@ -0,0 +1,387 @@
|
|||
package writecachebitcask
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
	// logFileOpenFlags opens log files append-only with synchronous writes,
	// so every successful Write is durable on disk.
	logFileOpenFlags = os.O_WRONLY | os.O_CREATE | os.O_APPEND | os.O_SYNC
	// keyLen is the length of a serialized object address
	// (container ID followed by object ID).
	keyLen = len(cid.ID{}) + len(oid.ID{})
	// sizeLen is the length in bytes of a record's size field.
	sizeLen = 4
	// tombstone is the special size value that marks a deletion record.
	tombstone = uint32(math.MaxUint32)
)
|
||||
|
||||
// entry is a key directory entry.
//
// It stores the object address and its current log file index and offset.
// An offset of -1 means the latest record for this address is a deletion.
type entry struct {
	addr     oid.Address
	logIndex uint32
	offset   int
}
|
||||
|
||||
// pendingWrite is a write operation in the current write batch, not yet committed to the log file.
type pendingWrite struct {
	// addr is the address of the object being written or deleted.
	addr oid.Address
	// offset is the position the record will occupy in the current log file.
	offset int
	// errCh receives the batch flush result; it is buffered (capacity 1) so
	// the flusher never blocks on delivery.
	errCh chan error
	// isDelete distinguishes a tombstone record from a regular PUT.
	isDelete bool
}
|
||||
|
||||
// region is one shard of the cache's key space with its own lock,
// key directory and set of log files.
type region struct {
	sync.RWMutex

	// Parameters
	opts  *options
	index int

	// Key directory: keyDir[bucket] holds the entries of all addresses
	// that hash into that bucket.
	keyDir [][]*entry

	// Current mem-buffer and log file
	wbuf     bytes.Buffer
	logIndex uint32
	logFile  *os.File
	size     int // bytes already flushed to the current log file

	// Put batch state
	writeBatch []pendingWrite

	// Flush state: indices of completed log files queued for the flushing
	// process; a bounded channel, so it provides backpressure.
	flushCh chan uint32
}
|
||||
|
||||
func (r *region) init() error {
|
||||
if err := r.restore(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
f, err := os.OpenFile(r.logFilePath(r.logIndex), logFileOpenFlags, 0644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating log file for region %d: %v", r.index, err)
|
||||
}
|
||||
r.logFile = f
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// restore restores the region state from existing log files, if any.
|
||||
func (r *region) restore() error {
|
||||
dir := filepath.Join(r.opts.path, strconv.Itoa(r.index))
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
return fmt.Errorf("creating region directory %d: %v", r.index, err)
|
||||
}
|
||||
entries, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("listing region directory %d: %v", r.index, err)
|
||||
}
|
||||
var logIndices []uint32
|
||||
for _, ei := range entries {
|
||||
if !ei.Type().IsRegular() {
|
||||
continue
|
||||
}
|
||||
name := strings.TrimSuffix(filepath.Base(ei.Name()), filepath.Ext(ei.Name()))
|
||||
index, err := strconv.ParseUint(name, 16, 32)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parsing log file index %q: %v", ei.Name(), err)
|
||||
}
|
||||
logIndices = append(logIndices, uint32(index))
|
||||
}
|
||||
|
||||
logCount := len(logIndices)
|
||||
if logCount == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
r.logIndex = dispatchRecoveredLogIndices(logIndices, func(i uint32) {
|
||||
r.flushCh <- i
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// dispatchRecoveredLogIndices calls dispatchFunc for each recovered log file
// index in logical (oldest-first) order and returns the index to use for the
// next log file.
//
// The input is sorted numerically. If the sequence wraps around the uint32
// range (it starts at 0 and ends at MaxUint32), the logical start is the
// first element that breaks the consecutive run from 0.
func dispatchRecoveredLogIndices(indices []uint32, dispatchFunc func(uint32)) uint32 {
	n := len(indices)

	// Find the logical start of the sequence.
	start := 0
	if indices[0] == 0 && indices[n-1] == math.MaxUint32 {
		start = 1
		for indices[start-1]+1 == indices[start] {
			start++
		}
	}

	// Emit the tail segment first (the logically older files), then the
	// wrapped head segment.
	for _, idx := range indices[start:] {
		dispatchFunc(idx)
	}
	for _, idx := range indices[:start] {
		dispatchFunc(idx)
	}

	// The next index follows the logically newest recovered file.
	if start == 0 {
		return indices[n-1] + 1
	}
	return indices[start-1] + 1
}
|
||||
|
||||
// close shuts down the region: it closes the flush channel (signaling the
// flushing process to stop) and closes the current log file, if any.
//
// NOTE(review): rotateLogFile also sends on flushCh — confirm no request can
// be rotating concurrently with close, otherwise the send on a closed
// channel would panic.
func (r *region) close() error {
	close(r.flushCh)
	if r.logFile != nil {
		return r.logFile.Close()
	}
	return nil
}
|
||||
|
||||
// put appends a PUT record for addr with the given serialized object data to
// the current write batch and blocks until the batch is flushed to the log
// file, or ctx is canceled.
//
// Record layout: container ID, object ID, little-endian uint32 data size,
// then the data bytes.
func (r *region) put(ctx context.Context, addr oid.Address, data []byte) error {
	c, o := addr.Container(), addr.Object()
	pp := pendingWrite{
		addr: addr,
		// Buffered so flushWriteBatch never blocks delivering the result.
		errCh: make(chan error, 1),
	}

	r.Lock()

	// If the current log file is too large or missing, create a new one.
	// NOTE(review): rotating resets r.size to 0 while the mem-buffer may
	// still hold queued records whose offsets were computed against the
	// previous file — confirm a pending batch cannot straddle a rotation.
	if r.logFile == nil || uint64(r.size) >= r.opts.maxLogSize {
		if err := r.rotateLogFile(ctx); err != nil {
			r.Unlock()
			return err
		}
	}

	// Check whether we need to schedule a batch flush in the future.
	wasEmpty := len(r.writeBatch) == 0

	// The record starts where flushed data ends plus what is already buffered.
	pp.offset = r.size + r.wbuf.Len()
	r.writeBatch = append(r.writeBatch, pp)

	// Write the entry to the mem buffer.
	r.wbuf.Write(c[:])
	r.wbuf.Write(o[:])
	// Writes to bytes.Buffer cannot fail, hence the ignored error.
	_ = binary.Write(&r.wbuf, binary.LittleEndian, uint32(len(data)))
	r.wbuf.Write(data)

	r.Unlock()

	// The first writer of a fresh batch schedules the flush.
	if wasEmpty {
		time.AfterFunc(r.opts.maxBatchDelay, r.flushWriteBatch)
	}

	// Wait for the batch flush.
	select {
	case err := <-pp.errCh:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}
|
||||
|
||||
// delete appends a tombstone record for addr to the current write batch and
// blocks until the batch is flushed to the log file, or ctx is canceled.
//
// It returns ObjectNotFound if the address is not present in the key
// directory. The tombstone record is the address followed by the special
// tombstone size value (no data bytes).
func (r *region) delete(ctx context.Context, addr oid.Address) error {
	c, o := addr.Container(), addr.Object()
	pp := pendingWrite{
		addr: addr,
		// Buffered so flushWriteBatch never blocks delivering the result.
		errCh:    make(chan error, 1),
		isDelete: true,
	}

	bucket := r.locateBucket(addr)

	r.Lock()

	// If the current log file is too large or missing, create a new one.
	// NOTE(review): see the matching note in put about rotating while a
	// batch is pending.
	if r.logFile == nil || uint64(r.size) >= r.opts.maxLogSize {
		if err := r.rotateLogFile(ctx); err != nil {
			r.Unlock()
			return err
		}
	}

	// Locate the current entry (if any) in the key directory.
	offset := -1
	for _, e := range r.keyDir[bucket] {
		if e.addr.Equals(addr) {
			offset = e.offset
			break
		}
	}

	// Unknown address (or already deleted, offset == -1): nothing to do.
	if offset == -1 {
		r.Unlock()
		return logicerr.Wrap(new(apistatus.ObjectNotFound))
	}

	// Check whether we need to schedule a batch flush in the future.
	wasEmpty := len(r.writeBatch) == 0

	pp.offset = r.size + r.wbuf.Len()
	r.writeBatch = append(r.writeBatch, pp)

	// Write the tombstone entry to the mem buffer.
	r.wbuf.Write(c[:])
	r.wbuf.Write(o[:])
	// Writes to bytes.Buffer cannot fail, hence the ignored error.
	_ = binary.Write(&r.wbuf, binary.LittleEndian, tombstone)

	r.Unlock()

	// The first writer of a fresh batch schedules the flush.
	if wasEmpty {
		time.AfterFunc(r.opts.maxBatchDelay, r.flushWriteBatch)
	}

	// Wait for the batch flush.
	select {
	case err := <-pp.errCh:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}
|
||||
|
||||
// get returns the object stored under addr, or ObjectNotFound if the address
// is unknown to this region or its latest record is a deletion.
//
// The read lock is held for the entire operation, including the log file
// read — presumably so the flushing process cannot remove the file from
// under us (TODO confirm).
func (r *region) get(addr oid.Address) (*objectSDK.Object, error) {
	bucket := r.locateBucket(addr)

	r.RLock()

	// Locate the log file index and offset of the entry.
	var logIndex uint32
	offset := -1
	for _, e := range r.keyDir[bucket] {
		if e.addr.Equals(addr) {
			logIndex = e.logIndex
			offset = e.offset
			break
		}
	}

	defer r.RUnlock()

	// offset == -1 covers both "never seen" and "deleted" (tombstoned
	// entries keep offset -1 in the key directory).
	if offset == -1 {
		return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
	}

	// Read the entry data from the corresponding log file.
	f, err := os.Open(r.logFilePath(logIndex))
	if err != nil {
		return nil, fmt.Errorf("reading log file: %w", err)
	}
	defer f.Close()

	if _, err := f.Seek(int64(offset), io.SeekStart); err != nil {
		return nil, fmt.Errorf("seeking log entry: %w", err)
	}
	_, obj, _, err := readLogFileEntry(f)
	if err != nil {
		return nil, fmt.Errorf("reading log entry: %w", err)
	}
	// A nil object means the record at this offset is a tombstone.
	if obj == nil {
		return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
	}

	return obj, nil
}
|
||||
|
||||
// flushWriteBatch appends the membuffer to the current log file and returns
// any error to the callers.
//
// It runs from the time.AfterFunc timer scheduled by the first writer of the
// batch.
func (r *region) flushWriteBatch() {
	r.Lock()
	defer r.Unlock()

	// NOTE(review): r.logFile can be nil here if a rotation was aborted by
	// context cancellation after this flush was scheduled — confirm this
	// cannot panic.
	n, err := r.logFile.Write(r.wbuf.Bytes())
	// Deliver the result to every waiter; errCh is buffered, so these
	// sends never block.
	for _, call := range r.writeBatch {
		call.errCh <- err
	}

	if err == nil {
		// Publish the new offsets only after the data is durably written
		// (the log file is opened with O_SYNC).
		for _, call := range r.writeBatch {
			r.updateKeyOffset(call.addr, call.offset, call.isDelete)
		}
		r.size += n
	}

	// Reset membuffer and clear the current write batch
	r.wbuf.Reset()
	r.writeBatch = r.writeBatch[:0]
}
|
||||
|
||||
func (r *region) locateBucket(addr oid.Address) int {
|
||||
id := addr.Object()
|
||||
h := binary.LittleEndian.Uint32(id[4:])
|
||||
bucket := h & (uint32(len(r.keyDir)) - 1)
|
||||
return int(bucket)
|
||||
}
|
||||
|
||||
// updateKeyOffset records the new location of addr in the key directory
// after its record has been flushed to the current log file.
//
// Deletions keep the directory entry but set its offset to -1, which get
// and delete treat as "not found". Must be called with the region lock held.
func (r *region) updateKeyOffset(addr oid.Address, offset int, isDelete bool) {
	bucket := r.locateBucket(addr)
	exists := false
	for _, e := range r.keyDir[bucket] {
		if e.addr.Equals(addr) {
			exists = true
			// This check is necessary because the entries should be updated in the
			// same order they are appended to the log file. Otherwise, a different
			// state might result from recovering.
			if e.offset < offset {
				if isDelete {
					e.offset = -1
				} else {
					e.offset = offset
				}
				e.logIndex = r.logIndex
			}
			break
		}
	}

	if !exists {
		r.keyDir[bucket] = append(r.keyDir[bucket], &entry{
			addr:     addr,
			offset:   offset,
			logIndex: r.logIndex,
		})
	}
}
|
||||
|
||||
// rotateLogFile closes the current log file, passes it to the flushing process and starts a new one.
//
// The send on flushCh blocks when the flush queue is full, providing
// backpressure; ctx cancellation aborts the wait, leaving r.logFile nil so a
// later call retries the handoff-free path. Must be called with the region
// lock held.
func (r *region) rotateLogFile(ctx context.Context) error {
	if r.logFile != nil {
		// A close failure is logged but not fatal: the data was written
		// with O_SYNC, so the file contents are already durable.
		if err := r.logFile.Close(); err != nil {
			r.opts.log.Error(logs.WritecacheBitcaskClosingLogFile,
				zap.Uint32("logIndex", r.logIndex),
				zap.Error(err))
		}
		select {
		case r.flushCh <- r.logIndex:
			// Mark the log file as nil only after the flushing process is aware of it.
			r.logFile = nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	f, err := os.OpenFile(r.logFilePath(r.logIndex+1), logFileOpenFlags, 0644)
	if err != nil {
		return fmt.Errorf("creating log file for region %d: %w", r.index, err)
	}

	r.logIndex++
	r.logFile = f
	r.size = 0

	return nil
}
|
||||
|
||||
func (r *region) logFilePath(i uint32) string {
|
||||
return filepath.Join(r.opts.path, strconv.Itoa(r.index), fmt.Sprintf("%08X.wlog", i))
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
package writecachebitcask
|
||||
|
||||
import (
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDispatchRecoveredIndices(t *testing.T) {
|
||||
max := uint32(math.MaxUint32)
|
||||
tests := []struct {
|
||||
indices []uint32
|
||||
wantOrder []uint32
|
||||
wantIndex uint32
|
||||
}{
|
||||
{[]uint32{0}, []uint32{0}, 1},
|
||||
{[]uint32{42}, []uint32{42}, 43},
|
||||
{[]uint32{5, 6, 7, 8}, []uint32{5, 6, 7, 8}, 9},
|
||||
{[]uint32{max - 2, max - 1, max}, []uint32{max - 2, max - 1, max}, 0},
|
||||
{[]uint32{0, 1, 2, max - 2, max - 1, max}, []uint32{max - 2, max - 1, max, 0, 1, 2}, 3},
|
||||
{[]uint32{0, max}, []uint32{max, 0}, 1},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
var gotOrder []uint32
|
||||
gotIndex := dispatchRecoveredLogIndices(tc.indices, func(i uint32) {
|
||||
gotOrder = append(gotOrder, i)
|
||||
})
|
||||
require.Equal(t, tc.wantOrder, gotOrder)
|
||||
require.Equal(t, tc.wantIndex, gotIndex)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,84 @@
|
|||
package writecachebitcask
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
/*
|
||||
|
||||
The cache operates as a hash table where the key space is split in regions, each with its own lock (a sync.Mutex).
|
||||
Each region maintains a key directory and appends updates (PUTs and DELETEs) to a log file. A key directory
|
||||
entry stores the address, the log file index and the offset within the logfile. This structure is similar to
|
||||
a bitcask (see https://en.wikipedia.org/wiki/Bitcask and https://riak.com/assets/bitcask-intro.pdf).
|
||||
|
||||
Since the file writes use O_SYNC, the updates are batched in a memory buffer first, so that it incurs fewer writes
|
||||
under heavy load. After a log file reaches maximum capacity, it's closed and a new one is started. The completed log
|
||||
files are pushed to the flushing process which processes them one by one and deletes them after updating the underlying
|
||||
storage. The flushing process doesn't poll periodically for new work; instead, there's a buffered channel to which the
|
||||
log file indices are pushed. If this channel fills, it will block the sending process (a request) until there's room
|
||||
to continue, providing backpressure to the client when the underlying storage is not able to keep up.
|
||||
|
||||
The lower bytes of the object addresses are used as region and bucket hashes, since they are already hashed values
|
||||
by construction.
|
||||
|
||||
│ │
|
||||
Regions │ In-Memory │ In-Disk
|
||||
───────────────────────┼───────────────┼─────────────────────────────────────────────────────────────────────────────────
|
||||
┌─ ┌─ │ │ ┌────────────────────────────────────────┐ ┌───────────┐ ┌───────────┐
|
||||
│ │ Bucket 1 │ KeyDir │ │ LogFile 0 │ │ LogFile 1 │ │ LogFile 2 │
|
||||
│ Region 1 │ Bucket 2 │ ┌───────────┐ │ │┌──────────────┐ ┌──────────────┐ │ │ │ │ │
|
||||
│ │ (...) │ │ MemBuffer │ │ ││Addr Size Data│ │Addr Size Data│ (...) │ │ (...) │ │ (...) │ (...)
|
||||
│ (Mutex) │ Bucket K │ └───────────┘ │ │└──────────────┘ └──────────────┘ │ │ │ │ │
|
||||
│ └─ │ │ └────────────────────────────────────────┘ └───────────┘ └───────────┘
|
||||
│ ───────────────────────┼───────────────┼─────────────────────────────────────────────────────────────────────────────────
|
||||
│ ┌─ │ │ ┌────────────────────────────────────────┐ ┌───────────┐ ┌───────────┐
|
||||
│ │ Bucket K+1 │ KeyDir │ │ LogFile 0 │ │ LogFile 1 │ │ LogFile 2 │
|
||||
│ Region 2 │ Bucket K+2 │ ┌───────────┐ │ │┌──────────────┐ ┌──────────────┐ │ │ │ │ │
|
||||
│ │ (...) │ │ MemBuffer │ │ ││Addr Size Data│ │Addr Size Data│ (...) │ │ (...) │ │ (...) │ (...)
|
||||
Key │ │ Bucket 2K │ └───────────┘ │ │└──────────────┘ └──────────────┘ │ │ │ │ │
|
||||
Space │ └─ │ │ └────────────────────────────────────────┘ └───────────┘ └───────────┘
|
||||
│ ───────────────────────┼───────────────┼─────────────────────────────────────────────────────────────────────────────────
|
||||
│ (...) │ (...) │ (...)
|
||||
│ ───────────────────────┼───────────────┼─────────────────────────────────────────────────────────────────────────────────
|
||||
│ ┌─ │ │ ┌────────────────────────────────────────┐ ┌───────────┐ ┌───────────┐
|
||||
│ │ Bucket (N-1)*K+1 │ KeyDir │ │ LogFile 0 │ │ LogFile 1 │ │ LogFile 2 │
|
||||
│ Region N │ Bucket (N-1)*K+2 │ ┌───────────┐ │ │┌──────────────┐ ┌──────────────┐ │ │ │ │ │
|
||||
│ │ (...) │ │ MemBuffer │ │ ││Addr Size Data│ │Addr Size Data│ (...) │ │ (...) │ │ (...) │ (...)
|
||||
│ │ Bucket N*K │ └───────────┘ │ │└──────────────┘ └──────────────┘ │ │ │ │ │
|
||||
└─ └─ │ │ └────────────────────────────────────────┘ └───────────┘ └───────────┘
|
||||
───────────────────────┴───────────────┴─────────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
*/
|
||||
|
||||
// cache is the bitcask-based write-cache implementation.
//
// TODO(review): add metrics comparable to the other writecache
// implementations so their performance can be compared.
type cache struct {
	options

	// mode holds the current cache mode as an atomic value.
	mode atomic.Uint32
	// closed reports whether the cache has been closed.
	closed atomic.Bool
	// regions partition the key space; each region has its own lock,
	// key directory and log files.
	regions []*region
}
|
||||
|
||||
func New(opts ...Option) writecache.Cache {
|
||||
c := &cache{
|
||||
options: options{
|
||||
log: &logger.Logger{Logger: zap.NewNop()},
|
||||
metrics: writecache.DefaultMetrics(),
|
||||
|
||||
maxObjectSize: 128 << 10,
|
||||
bucketCount: 1 << 16,
|
||||
regionCount: 1 << 2,
|
||||
maxLogSize: 64 << 20,
|
||||
maxBatchDelay: 1 * time.Millisecond,
|
||||
maxPendingLogFileFlush: 4,
|
||||
},
|
||||
}
|
||||
for i := range opts {
|
||||
opts[i](&c.options)
|
||||
}
|
||||
return c
|
||||
}
|
Loading…
Add table
Reference in a new issue
Can u please make some constant and assign it to
4
?done