From 1a0cb0f34a3239df8a27be865528b5e41eafb8b0 Mon Sep 17 00:00:00 2001 From: Alejandro Lopez Date: Thu, 22 Jun 2023 14:55:30 +0300 Subject: [PATCH] [#421] Try using badger for the write-cache Signed-off-by: Alejandro Lopez --- cmd/frostfs-lens/internal/flags.go | 7 + .../internal/writecache/inspect.go | 35 +- cmd/frostfs-lens/internal/writecache/list.go | 30 +- cmd/frostfs-lens/internal/writecache/root.go | 13 +- cmd/frostfs-node/config.go | 53 ++- .../config/engine/shard/writecache/config.go | 36 ++ config/example/node.json | 1 + config/example/node.yaml | 1 + docs/storage-node-configuration.md | 2 + go.mod | 12 + go.sum | Bin 99007 -> 100798 bytes internal/logs/logs.go | 372 +++++++++--------- .../engine/control_test.go | 53 +-- pkg/local_object_storage/engine/shards.go | 5 +- .../internal/testutil/object.go | 6 + .../shard/control_test.go | 13 +- pkg/local_object_storage/shard/gc_test.go | 11 +- pkg/local_object_storage/shard/range_test.go | 13 +- pkg/local_object_storage/shard/shard.go | 40 +- pkg/local_object_storage/shard/shard_test.go | 24 +- .../shard/shutdown_test.go | 12 +- .../writecache/benchmark/writecache_test.go | 52 +++ .../writecache/config/config.go | 22 ++ pkg/local_object_storage/writecache/doc.go | 11 - .../writecache/flush_test.go | 237 ----------- .../writecache/generic_test.go | 30 -- .../writecache/metrics.go | 20 +- .../writecache/writecache.go | 144 +------ .../writecachebadger/cachebadger.go | 129 ++++++ .../writecache/writecachebadger/delete.go | 70 ++++ .../writecache/writecachebadger/flush.go | 257 ++++++++++++ .../writecache/writecachebadger/flush_test.go | 65 +++ .../writecache/writecachebadger/gc.go | 31 ++ .../writecachebadger/generic_test.go | 20 + .../writecache/writecachebadger/get.go | 95 +++++ .../writecache/writecachebadger/iterate.go | 32 ++ .../writecache/{ => writecachebadger}/mode.go | 9 +- .../writecache/writecachebadger/options.go | 141 +++++++ .../writecache/writecachebadger/put.go | 82 ++++ .../writecache/writecachebadger/state.go | 57 +++ .../writecache/writecachebadger/storage.go | 91 +++++ .../writecache/writecachebadger/util.go | 36 ++ .../writecache/writecachebbolt/cachebbolt.go | 146 +++++++ .../{ => writecachebbolt}/delete.go | 11 +- .../writecache/{ => writecachebbolt}/flush.go | 11 +- .../writecache/writecachebbolt/flush_test.go | 106 +++++ .../writecachebbolt/generic_test.go | 18 + .../writecache/{ => writecachebbolt}/get.go | 9 +- .../{ => writecachebbolt}/iterate.go | 2 +- .../writecache/writecachebbolt/mode.go | 75 ++++ .../{ => writecachebbolt}/options.go | 7 +- .../writecache/{ => writecachebbolt}/put.go | 11 +- .../writecache/{ => writecachebbolt}/state.go | 2 +- .../{ => writecachebbolt}/storage.go | 7 +- .../writecache/{ => writecachebbolt}/util.go | 2 +- .../writecache/writecachetest/flush.go | 185 +++++++++ 56 files changed, 2215 insertions(+), 747 deletions(-) create mode 100644 pkg/local_object_storage/writecache/benchmark/writecache_test.go create mode 100644 pkg/local_object_storage/writecache/config/config.go delete mode 100644 pkg/local_object_storage/writecache/doc.go delete mode 100644 pkg/local_object_storage/writecache/flush_test.go delete mode 100644 pkg/local_object_storage/writecache/generic_test.go create mode 100644 pkg/local_object_storage/writecache/writecachebadger/cachebadger.go create mode 100644 pkg/local_object_storage/writecache/writecachebadger/delete.go create mode 100644 pkg/local_object_storage/writecache/writecachebadger/flush.go create mode 100644 pkg/local_object_storage/writecache/writecachebadger/flush_test.go create mode 100644 pkg/local_object_storage/writecache/writecachebadger/gc.go create mode 100644 pkg/local_object_storage/writecache/writecachebadger/generic_test.go create mode 100644 pkg/local_object_storage/writecache/writecachebadger/get.go create mode 100644 pkg/local_object_storage/writecache/writecachebadger/iterate.go rename pkg/local_object_storage/writecache/{ => writecachebadger}/mode.go (82%) create mode 100644 pkg/local_object_storage/writecache/writecachebadger/options.go create mode 100644 pkg/local_object_storage/writecache/writecachebadger/put.go create mode 100644 pkg/local_object_storage/writecache/writecachebadger/state.go create mode 100644 pkg/local_object_storage/writecache/writecachebadger/storage.go create mode 100644 pkg/local_object_storage/writecache/writecachebadger/util.go create mode 100644 pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go rename pkg/local_object_storage/writecache/{ => writecachebbolt}/delete.go (87%) rename pkg/local_object_storage/writecache/{ => writecachebbolt}/flush.go (94%) create mode 100644 pkg/local_object_storage/writecache/writecachebbolt/flush_test.go create mode 100644 pkg/local_object_storage/writecache/writecachebbolt/generic_test.go rename pkg/local_object_storage/writecache/{ => writecachebbolt}/get.go (92%) rename pkg/local_object_storage/writecache/{ => writecachebbolt}/iterate.go (97%) create mode 100644 pkg/local_object_storage/writecache/writecachebbolt/mode.go rename pkg/local_object_storage/writecache/{ => writecachebbolt}/options.go (96%) rename pkg/local_object_storage/writecache/{ => writecachebbolt}/put.go (91%) rename pkg/local_object_storage/writecache/{ => writecachebbolt}/state.go (98%) rename pkg/local_object_storage/writecache/{ => writecachebbolt}/storage.go (94%) rename pkg/local_object_storage/writecache/{ => writecachebbolt}/util.go (95%) create mode 100644 pkg/local_object_storage/writecache/writecachetest/flush.go diff --git a/cmd/frostfs-lens/internal/flags.go b/cmd/frostfs-lens/internal/flags.go index 8a987a2d..95710f7c 100644 --- a/cmd/frostfs-lens/internal/flags.go +++ b/cmd/frostfs-lens/internal/flags.go @@ -8,6 +8,7 @@ const ( flagAddress = "address" flagEnginePath = "path" flagOutFile = "out" + flagDBType = "dbtype" ) // AddAddressFlag adds the address flag to the passed cobra command. @@ -33,3 +34,9 @@ func AddOutputFileFlag(cmd *cobra.Command, v *string) { "File to save object payload") _ = cmd.MarkFlagFilename(flagOutFile) } + +// AddDBTypeFlag adds the DB type flag to the passed cobra command. +func AddDBTypeFlag(cmd *cobra.Command, v *string) { + cmd.Flags().StringVar(v, flagOutFile, "bbolt", + "Type of DB used by write cache (default: bbolt)") +} diff --git a/cmd/frostfs-lens/internal/writecache/inspect.go b/cmd/frostfs-lens/internal/writecache/inspect.go index 7d3c8ab2..1a733513 100644 --- a/cmd/frostfs-lens/internal/writecache/inspect.go +++ b/cmd/frostfs-lens/internal/writecache/inspect.go @@ -1,8 +1,13 @@ package writecache import ( + "fmt" + "os" + common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "github.com/spf13/cobra" ) @@ -18,14 +23,34 @@ func init() { common.AddAddressFlag(inspectCMD, &vAddress) common.AddComponentPathFlag(inspectCMD, &vPath) common.AddOutputFileFlag(inspectCMD, &vOut) + common.AddDBTypeFlag(inspectCMD, &vDBType) } func inspectFunc(cmd *cobra.Command, _ []string) { - db := openWC(cmd) - defer db.Close() + var data []byte - data, err := writecache.Get(db, []byte(vAddress)) - common.ExitOnErr(cmd, common.Errf("could not fetch object: %w", err)) + switch vDBType { + case "bbolt": + db, err := writecachebbolt.OpenDB(vPath, true, os.OpenFile) + common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) + defer db.Close() + + data, err = writecachebbolt.Get(db, []byte(vAddress)) + common.ExitOnErr(cmd, common.Errf("could not fetch object: %w", err)) + + case "badger": + log, err := logger.NewLogger(&logger.Prm{}) + common.ExitOnErr(cmd, common.Errf("could not create logger: %w", err)) + + db, err := writecachebadger.OpenDB(vPath, true, log) + common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) + + data, err = writecachebadger.Get(db, []byte(vAddress)) + common.ExitOnErr(cmd, common.Errf("could not fetch object: %w", err)) + + default: + common.ExitOnErr(cmd, fmt.Errorf("invalid dbtype: %q (possible values: bbolt, badger)", vDBType)) + } var o objectSDK.Object common.ExitOnErr(cmd, common.Errf("could not unmarshal object: %w", o.Unmarshal(data))) diff --git a/cmd/frostfs-lens/internal/writecache/list.go b/cmd/frostfs-lens/internal/writecache/list.go index f6d0cfff..df02a82f 100644 --- a/cmd/frostfs-lens/internal/writecache/list.go +++ b/cmd/frostfs-lens/internal/writecache/list.go @@ -3,9 +3,12 @@ package writecache import ( "fmt" "io" + "os" common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/spf13/cobra" ) @@ -30,9 +33,26 @@ func listFunc(cmd *cobra.Command, _ []string) { return err } - db := openWC(cmd) - defer db.Close() + switch vDBType { + case "bbolt": + db, err := writecachebbolt.OpenDB(vPath, true, os.OpenFile) + common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) + defer db.Close() - err := writecache.IterateDB(db, wAddr) - common.ExitOnErr(cmd, common.Errf("write-cache iterator failure: %w", err)) + err = writecachebbolt.IterateDB(db, wAddr) + common.ExitOnErr(cmd, common.Errf("write-cache iterator failure: %w", err)) + + case "badger": + log, err := logger.NewLogger(&logger.Prm{}) + common.ExitOnErr(cmd, common.Errf("could not create logger: %w", err)) + + db, err := writecachebadger.OpenDB(vPath, true, log) + common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) + + err = writecachebadger.IterateDB(db, wAddr) + common.ExitOnErr(cmd, common.Errf("write-cache iterator failure: %w", err)) + + default: + common.ExitOnErr(cmd, fmt.Errorf("invalid dbtype: %q (possible values: bbolt, badger)", vDBType)) + } } diff --git a/cmd/frostfs-lens/internal/writecache/root.go b/cmd/frostfs-lens/internal/writecache/root.go index 4a130584..11a8bb96 100644 --- a/cmd/frostfs-lens/internal/writecache/root.go +++ b/cmd/frostfs-lens/internal/writecache/root.go @@ -1,18 +1,14 @@ package writecache import ( - "os" - - common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "github.com/spf13/cobra" - "go.etcd.io/bbolt" ) var ( vAddress string vPath string vOut string + vDBType string ) // Root contains `write-cache` command definition. @@ -24,10 +20,3 @@ var Root = &cobra.Command{ func init() { Root.AddCommand(listCMD, inspectCMD) } - -func openWC(cmd *cobra.Command) *bbolt.DB { - db, err := writecache.OpenDB(vPath, true, os.OpenFile) - common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) - - return db -} diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 283cf501..2a84805d 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -40,7 +40,9 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" shardmode "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" containerClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" @@ -127,6 +129,7 @@ type shardCfg struct { writecacheCfg struct { enabled bool + typ writecacheconfig.Type path string maxBatchSize int maxBatchDelay time.Duration @@ -135,6 +138,7 @@ type shardCfg struct { flushWorkerCount int sizeLimit uint64 noSync bool + gcInterval time.Duration } piloramaCfg struct { @@ -238,6 +242,7 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg, wc := &newConfig.writecacheCfg wc.enabled = true + wc.typ = writeCacheCfg.Type() wc.path = writeCacheCfg.Path() wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize() wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay() @@ -246,6 +251,7 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg, wc.flushWorkerCount = writeCacheCfg.WorkersNumber() wc.sizeLimit = writeCacheCfg.SizeLimit() wc.noSync = writeCacheCfg.NoSync() + wc.gcInterval = writeCacheCfg.GCInterval() } } @@ -704,20 +710,37 @@ func (c *cfg) shardOpts() []shardOptsWithID { return shards } -func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option { - var writeCacheOpts []writecache.Option +func (c *cfg) getWriteCacheOpts(shCfg shardCfg) writecacheconfig.Options { + var writeCacheOpts writecacheconfig.Options if wcRead := shCfg.writecacheCfg; wcRead.enabled { - writeCacheOpts = append(writeCacheOpts, - writecache.WithPath(wcRead.path), - writecache.WithMaxBatchSize(wcRead.maxBatchSize), - writecache.WithMaxBatchDelay(wcRead.maxBatchDelay), - writecache.WithMaxObjectSize(wcRead.maxObjSize), - writecache.WithSmallObjectSize(wcRead.smallObjectSize), - writecache.WithFlushWorkersCount(wcRead.flushWorkerCount), - writecache.WithMaxCacheSize(wcRead.sizeLimit), - writecache.WithNoSync(wcRead.noSync), - writecache.WithLogger(c.log), - ) + switch wcRead.typ { + case writecacheconfig.TypeBBolt: + writeCacheOpts.Type = writecacheconfig.TypeBBolt + writeCacheOpts.BBoltOptions = append(writeCacheOpts.BBoltOptions, + writecachebbolt.WithPath(wcRead.path), + writecachebbolt.WithMaxBatchSize(wcRead.maxBatchSize), + writecachebbolt.WithMaxBatchDelay(wcRead.maxBatchDelay), + writecachebbolt.WithMaxObjectSize(wcRead.maxObjSize), + writecachebbolt.WithSmallObjectSize(wcRead.smallObjectSize), + writecachebbolt.WithFlushWorkersCount(wcRead.flushWorkerCount), + writecachebbolt.WithMaxCacheSize(wcRead.sizeLimit), + writecachebbolt.WithNoSync(wcRead.noSync), + writecachebbolt.WithLogger(c.log), + ) + case writecacheconfig.TypeBadger: + writeCacheOpts.Type = writecacheconfig.TypeBBolt + writeCacheOpts.BadgerOptions = append(writeCacheOpts.BadgerOptions, + writecachebadger.WithPath(wcRead.path), + writecachebadger.WithMaxObjectSize(wcRead.maxObjSize), + writecachebadger.WithFlushWorkersCount(wcRead.flushWorkerCount), + writecachebadger.WithMaxCacheSize(wcRead.sizeLimit), + writecachebadger.WithNoSync(wcRead.noSync), + writecachebadger.WithLogger(c.log), + writecachebadger.WithGCInterval(wcRead.gcInterval), + ) + default: + panic(fmt.Sprintf("unknown writecache type: %q", wcRead.typ)) + } } return writeCacheOpts } @@ -836,7 +859,7 @@ func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID { shard.WithMetaBaseOptions(mbOptions...), shard.WithPiloramaOptions(piloramaOpts...), shard.WithWriteCache(shCfg.writecacheCfg.enabled), - shard.WithWriteCacheOptions(writeCacheOpts...), + shard.WithWriteCacheOptions(writeCacheOpts), shard.WithRemoverBatchSize(shCfg.gcCfg.removerBatchSize), shard.WithGCRemoverSleepInterval(shCfg.gcCfg.removerSleepInterval), shard.WithExpiredCollectorBatchSize(shCfg.gcCfg.expiredCollectorBatchSize), diff --git a/cmd/frostfs-node/config/engine/shard/writecache/config.go b/cmd/frostfs-node/config/engine/shard/writecache/config.go index c003cefa..504fe3ca 100644 --- a/cmd/frostfs-node/config/engine/shard/writecache/config.go +++ b/cmd/frostfs-node/config/engine/shard/writecache/config.go @@ -1,8 +1,12 @@ package writecacheconfig import ( + "fmt" + "time" + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" boltdbconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/boltdb" + writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config" ) // Config is a wrapper over the config section @@ -21,6 +25,9 @@ const ( // SizeLimitDefault is a default write-cache size limit. SizeLimitDefault = 1 << 30 + + // DefaultGCInterval is the default duration of the GC cycle interval. + DefaultGCInterval = 1 * time.Minute ) // From wraps config section into Config. @@ -35,6 +42,22 @@ func (x *Config) Enabled() bool { return config.Bool((*config.Config)(x), "enabled") } +// Type returns the writecache implementation type to use. +// +// Panics if the type is not recognized. +func (x *Config) Type() writecacheconfig.Type { + t := config.String((*config.Config)(x), "type") + + switch t { + case "bbolt", "": + return writecacheconfig.TypeBBolt + case "badger": + return writecacheconfig.TypeBadger + } + + panic(fmt.Sprintf("invalid writecache type: %q", t)) +} + // Path returns the value of "path" config parameter. // // Panics if the value is not a non-empty string. @@ -126,3 +149,16 @@ func (x *Config) NoSync() bool { func (x *Config) BoltDB() *boltdbconfig.Config { return (*boltdbconfig.Config)(x) } + +// GCInterval returns the value of "gc_interval" config parameter. +// +// Returns DefaultGCInterval if the value is not a positive duration. +func (x *Config) GCInterval() time.Duration { + d := config.DurationSafe((*config.Config)(x), "gc_interval") + + if d > 0 { + return d + } + + return DefaultGCInterval +} diff --git a/config/example/node.json b/config/example/node.json index 6c98903f..6e995112 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -194,6 +194,7 @@ "resync_metabase": true, "writecache": { "enabled": true, + "type": "bbolt", "path": "tmp/1/cache", "memcache_capacity": 2147483648, "small_object_size": 16384, diff --git a/config/example/node.yaml b/config/example/node.yaml index 0ef5fea7..acce3741 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -122,6 +122,7 @@ storage: writecache: enabled: true + type: bbolt small_object_size: 16k # size threshold for "small" objects which are cached in key-value DB, not in FS, bytes max_object_size: 134217728 # size threshold for "big" objects which bypass write-cache and go to the storage directly, bytes workers_number: 30 # number of write-cache flusher threads diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index 439edf59..2e2d0408 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -273,6 +273,7 @@ metabase: ```yaml writecache: enabled: true + type: bbolt path: /path/to/writecache capacity: 4294967296 small_object_size: 16384 @@ -282,6 +283,7 @@ writecache: | Parameter | Type | Default value | Description | |----------------------|------------|---------------|----------------------------------------------------------------------------------------------------------------------| +| `type` | `string` | | Type of write cache backing implementation to use (`bbolt`, `badger`). | | `path` | `string` | | Path to the metabase file. | | `capacity` | `size` | unrestricted | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. | | `small_object_size` | `size` | `32K` | Maximum object size for "small" objects. This objects are stored in a key-value database instead of a file-system. | diff --git a/go.mod b/go.mod index f6be26a9..7ad0fad8 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,17 @@ require ( gopkg.in/yaml.v3 v3.0.1 ) +require ( + github.com/dgraph-io/ristretto v0.1.1 // indirect + github.com/dustin/go-humanize v1.0.0 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/glog v1.1.0 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/google/flatbuffers v1.12.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + go.opencensus.io v0.24.0 // indirect +) + require ( git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect @@ -55,6 +66,7 @@ require ( github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect + github.com/dgraph-io/badger/v4 v4.1.0 github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/go.sum b/go.sum index e13be1d2d3f97af38ddf8fc1fa685273b2fc4b06..07411ad4a88c83feac95e4d0fb80430f51bea044 100644 GIT binary patch delta 1404 zcma)*yRX}17{(QbQk@QfdJ!O1Iuk=plFy(0`GQn|<6NBNB(dW-jrd*+JQscgV3vQ{XNg~e$V^MSGS&(w|;nt4htvJ z!)4VC>W02>tf-!mml;{ZYv6K;f7T+Hl?iNY%uTdtZ_0|zUJbH(LeevCj}bfPzwz*#^-0s?e;RZ3?8%RtMtEO-u+EH-mKoYzIRxikxYBD*`M+xK!Y zqp(~=&CY6jrR7Lg-2Wx_hT7pDx$7afN;DD`KW?vVZ#Etb#l2z5ZU;0RMTK1278sH9 zW#~@we}_!t#P!b^m#L@wZed+!rzXAX(p2^*vBvwGwxADS-E~Izk|Jk>z|r{13=ApV zS)Q}rl7$xhH?{oo;w~m6QDU^sInFAm!_&Ry)-x5k!O-0;YC+WZWy&6C3T2Jf$M;e3 zUhTZjQHGnDi5R6#7-+U?`16d$g3z3pQhRP0+Da{4g3J~kd0aL>t$e--oO6OxhHKji zZOxMAL{FUuv&j_2ZIflFf)FNQF2yXDP$Jp&>{KL%C1^b;@3ik9ziW*u24ejD=vqR0*74i&7KH;@(Xyjb5huxQ!20v=i=+O}2V zt@|hP-H&brmcO=o<)Cx>$Ifqe8ums{jMTQRD1Mm-zB-k`bdzrAP#Ol!LLFaCa^!ls z7H_aLj%0i?>p+2AIaJr{kJC#VIX<-Y;CW;7f2nGPG$I_G9G$&AW2jK3oL>Fh%#~V>9?O$u6{y~D&Nfh1_n08&;S4c delta 76 zcmV-S0JHzTlLo(q2CyA9lYBTKvp6_R53|%q3IUU 0) - require.NoError(t, wc.Flush(context.Background(), true)) - - check(t, mb, bs, objects) - } - t.Run("db, invalid address", func(t *testing.T) { - testIgnoreErrors(t, func(c *cache) { - _, data := newObject(t, 1) - require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - return b.Put([]byte{1, 2, 3}, data) - })) - }) - }) - t.Run("db, invalid object", func(t *testing.T) { - testIgnoreErrors(t, func(c *cache) { - require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - return b.Put([]byte(oidtest.Address().EncodeToString()), []byte{1, 2, 3}) - })) - }) - }) - t.Run("fs, read error", func(t *testing.T) { - testIgnoreErrors(t, func(c *cache) { - obj, data := newObject(t, 1) - - var prm common.PutPrm - prm.Address = objectCore.AddressOf(obj) - prm.RawData = data - - _, err := c.fsTree.Put(context.Background(), prm) - require.NoError(t, err) - - p := prm.Address.Object().EncodeToString() + "." + prm.Address.Container().EncodeToString() - p = filepath.Join(c.fsTree.RootPath, p[:1], p[1:]) - - _, err = os.Stat(p) // sanity check - require.NoError(t, err) - require.NoError(t, os.Truncate(p, 0)) // corrupt the file contents, so that it can't be unmarshalled - }) - }) - t.Run("fs, invalid object", func(t *testing.T) { - testIgnoreErrors(t, func(c *cache) { - var prm common.PutPrm - prm.Address = oidtest.Address() - prm.RawData = []byte{1, 2, 3} - _, err := c.fsTree.Put(context.Background(), prm) - require.NoError(t, err) - }) - }) - }) -} - -func putObject(t *testing.T, c Cache, size int) objectPair { - obj, data := newObject(t, size) - - var prm common.PutPrm - prm.Address = objectCore.AddressOf(obj) - prm.Object = obj - prm.RawData = data - - _, err := c.Put(context.Background(), prm) - require.NoError(t, err) - - return objectPair{prm.Address, prm.Object} - -} - -func newObject(t *testing.T, size int) (*objectSDK.Object, []byte) { - obj := objectSDK.New() - ver := versionSDK.Current() - - obj.SetID(oidtest.ID()) - obj.SetOwnerID(usertest.ID()) - obj.SetContainerID(cidtest.ID()) - obj.SetType(objectSDK.TypeRegular) - obj.SetVersion(&ver) - obj.SetPayloadChecksum(checksumtest.Checksum()) - obj.SetPayloadHomomorphicHash(checksumtest.Checksum()) - obj.SetPayload(make([]byte, size)) - - data, err := obj.Marshal() - require.NoError(t, err) - return obj, data -} - -type dummyEpoch struct{} - -func (dummyEpoch) CurrentEpoch() uint64 { - return 0 -} diff --git a/pkg/local_object_storage/writecache/generic_test.go b/pkg/local_object_storage/writecache/generic_test.go deleted file mode 100644 index 53d6624b..00000000 --- a/pkg/local_object_storage/writecache/generic_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package writecache - -import ( - "os" - "path/filepath" - "strconv" - "testing" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" -) - -func TestGeneric(t *testing.T) { - defer func() { _ = os.RemoveAll(t.Name()) }() - - var n int - newCache := func(t *testing.T) storagetest.Component { - n++ - dir := filepath.Join(t.Name(), strconv.Itoa(n)) - require.NoError(t, os.MkdirAll(dir, os.ModePerm)) - return New( - WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), - WithFlushWorkersCount(2), - WithPath(dir)) - } - - storagetest.TestAll(t, newCache) -} diff --git a/pkg/local_object_storage/writecache/metrics.go b/pkg/local_object_storage/writecache/metrics.go index 957bf277..5eac0669 100644 --- a/pkg/local_object_storage/writecache/metrics.go +++ b/pkg/local_object_storage/writecache/metrics.go @@ -31,22 +31,24 @@ type Metrics interface { Close() } +func DefaultMetrics() Metrics { return metricsStub{} } + type metricsStub struct{} -func (s *metricsStub) Get(time.Duration, bool, StorageType) {} +func (metricsStub) Get(time.Duration, bool, StorageType) {} -func (s *metricsStub) Delete(time.Duration, bool, StorageType) {} +func (metricsStub) Delete(time.Duration, bool, StorageType) {} -func (s *metricsStub) Put(time.Duration, bool, StorageType) {} +func (metricsStub) Put(time.Duration, bool, StorageType) {} -func (s *metricsStub) SetEstimateSize(uint64, uint64) {} +func (metricsStub) SetEstimateSize(uint64, uint64) {} -func (s *metricsStub) SetMode(mode.Mode) {} +func (metricsStub) SetMode(mode.Mode) {} -func (s *metricsStub) SetActualCounters(uint64, uint64) {} +func (metricsStub) SetActualCounters(uint64, uint64) {} -func (s *metricsStub) Flush(bool, StorageType) {} +func (metricsStub) Flush(bool, StorageType) {} -func (s *metricsStub) Evict(StorageType) {} +func (metricsStub) Evict(StorageType) {} -func (s *metricsStub) Close() {} +func (metricsStub) Close() {} diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 067ff5ae..084c9a3a 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -2,18 +2,14 @@ package writecache import ( "context" - "os" - "sync" + "errors" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" - "go.uber.org/zap" ) // Info groups the information about write-cache. @@ -44,133 +40,13 @@ type Cache interface { Close() error } -type cache struct { - options - - // mtx protects statistics, counters and compressFlags. - mtx sync.RWMutex - - mode mode.Mode - modeMtx sync.RWMutex - - // compressFlags maps address of a big object to boolean value indicating - // whether object should be compressed. - compressFlags map[string]struct{} - - // flushCh is a channel with objects to flush. - flushCh chan *objectSDK.Object - // closeCh is close channel, protected by modeMtx. - closeCh chan struct{} - // wg is a wait group for flush workers. - wg sync.WaitGroup - // store contains underlying database. - store - // fsTree contains big files stored directly on file-system. - fsTree *fstree.FSTree -} - -// wcStorageType is used for write-cache operations logging. -const wcStorageType = "write-cache" - -type objectInfo struct { - addr string - data []byte - obj *objectSDK.Object -} - -const ( - defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB - defaultSmallObjectSize = 32 * 1024 // 32 KiB - defaultMaxCacheSize = 1 << 30 // 1 GiB -) - var ( - defaultBucket = []byte{0} + // ErrReadOnly is returned when Put/Write is performed in a read-only mode. + ErrReadOnly = logicerr.New("write-cache is in read-only mode") + // ErrNotInitialized is returned when write-cache is initializing. + ErrNotInitialized = logicerr.New("write-cache is not initialized yet") + // ErrBigObject is returned when object is too big to be placed in cache. + ErrBigObject = errors.New("too big object") + // ErrOutOfSpace is returned when there is no space left to put a new object. + ErrOutOfSpace = errors.New("no space left in the write cache") ) - -// New creates new writecache instance. -func New(opts ...Option) Cache { - c := &cache{ - flushCh: make(chan *objectSDK.Object), - mode: mode.ReadWrite, - - compressFlags: make(map[string]struct{}), - options: options{ - log: &logger.Logger{Logger: zap.NewNop()}, - maxObjectSize: defaultMaxObjectSize, - smallObjectSize: defaultSmallObjectSize, - workersCount: defaultFlushWorkersCount, - maxCacheSize: defaultMaxCacheSize, - maxBatchSize: bbolt.DefaultMaxBatchSize, - maxBatchDelay: bbolt.DefaultMaxBatchDelay, - openFile: os.OpenFile, - metrics: &metricsStub{}, - }, - } - - for i := range opts { - opts[i](&c.options) - } - - return c -} - -// SetLogger sets logger. It is used after the shard ID was generated to use it in logs. -func (c *cache) SetLogger(l *logger.Logger) { - c.log = l -} - -func (c *cache) DumpInfo() Info { - return Info{ - Path: c.path, - } -} - -// Open opens and initializes database. Reads object counters from the ObjectCounters instance. -func (c *cache) Open(readOnly bool) error { - err := c.openStore(readOnly) - if err != nil { - return metaerr.Wrap(err) - } - - // Opening after Close is done during maintenance mode, - // thus we need to create a channel here. - c.closeCh = make(chan struct{}) - - return metaerr.Wrap(c.initCounters()) -} - -// Init runs necessary services. -func (c *cache) Init() error { - c.metrics.SetMode(c.mode) - c.runFlushLoop() - return nil -} - -// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op. -func (c *cache) Close() error { - // We cannot lock mutex for the whole operation duration - // because it is taken by some background workers, so `wg.Wait()` is done without modeMtx. - c.modeMtx.Lock() - if c.closeCh != nil { - close(c.closeCh) - } - c.mode = mode.DegradedReadOnly // prevent new operations from being processed - c.modeMtx.Unlock() - - c.wg.Wait() - - c.modeMtx.Lock() - defer c.modeMtx.Unlock() - - c.closeCh = nil - var err error - if c.db != nil { - err = c.db.Close() - if err != nil { - c.db = nil - } - } - c.metrics.Close() - return nil -} diff --git a/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go b/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go new file mode 100644 index 00000000..837e76a0 --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go @@ -0,0 +1,129 @@ +package writecachebadger + +import ( + "sync" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.uber.org/zap" +) + +type cache struct { + options + + mode mode.Mode + modeMtx sync.RWMutex + + // flushCh is a channel with objects to flush. + flushCh chan *objectSDK.Object + // closeCh is close channel, protected by modeMtx. + closeCh chan struct{} + // wg is a wait group for flush workers. + wg sync.WaitGroup + // store contains underlying database. + store +} + +// wcStorageType is used for write-cache operations logging. +const wcStorageType = "write-cache" + +type objectInfo struct { + addr oid.Address + data []byte + obj *objectSDK.Object +} + +const ( + defaultMaxObjectSize = 64 << 20 // 64 MiB + defaultSmallObjectSize = 32 << 10 // 32 KiB + defaultMaxCacheSize = 1 << 30 // 1 GiB +) + +// New creates new writecache instance. +func New(opts ...Option) writecache.Cache { + c := &cache{ + flushCh: make(chan *objectSDK.Object), + mode: mode.ReadWrite, + + options: options{ + log: &logger.Logger{Logger: zap.NewNop()}, + maxObjectSize: defaultMaxObjectSize, + workersCount: defaultFlushWorkersCount, + maxCacheSize: defaultMaxCacheSize, + metrics: writecache.DefaultMetrics(), + }, + } + + for i := range opts { + opts[i](&c.options) + } + + return c +} + +// SetLogger sets logger. It is used after the shard ID was generated to use it in logs. +func (c *cache) SetLogger(l *logger.Logger) { + c.log = l +} + +func (c *cache) DumpInfo() writecache.Info { + return writecache.Info{ + Path: c.path, + } +} + +// Open opens and initializes database. Reads object counters from the ObjectCounters instance. +func (c *cache) Open(readOnly bool) error { + err := c.openStore(readOnly) + if err != nil { + return metaerr.Wrap(err) + } + + // Opening after Close is done during maintenance mode, + // thus we need to create a channel here. + c.closeCh = make(chan struct{}) + + return metaerr.Wrap(c.initCounters()) +} + +// Init runs necessary services. +func (c *cache) Init() error { + c.log.Info(logs.WritecacheBadgerInitExperimental) + c.metrics.SetMode(c.mode) + c.runFlushLoop() + c.runGCLoop() + return nil +} + +// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op. +func (c *cache) Close() error { + // We cannot lock mutex for the whole operation duration + // because it is taken by some background workers, so `wg.Wait()` is done without modeMtx. + c.modeMtx.Lock() + if c.closeCh != nil { + close(c.closeCh) + } + c.mode = mode.DegradedReadOnly // prevent new operations from being processed + c.modeMtx.Unlock() + + c.wg.Wait() + + c.modeMtx.Lock() + defer c.modeMtx.Unlock() + + c.closeCh = nil + var err error + if c.db != nil { + err = c.db.Close() + if err != nil { + c.db = nil + } + } + c.metrics.Close() + return nil +} diff --git a/pkg/local_object_storage/writecache/writecachebadger/delete.go b/pkg/local_object_storage/writecache/writecachebadger/delete.go new file mode 100644 index 00000000..1b46b2be --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebadger/delete.go @@ -0,0 +1,70 @@ +package writecachebadger + +import ( + "context" + "time" + + storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/dgraph-io/badger/v4" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// Delete removes object from write-cache. +// +// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache. +func (c *cache) Delete(ctx context.Context, addr oid.Address) error { + _, span := tracing.StartSpanFromContext(ctx, "writecache.Delete", + trace.WithAttributes( + attribute.String("address", addr.EncodeToString()), + )) + defer span.End() + + deleted := false + storageType := writecache.StorageTypeUndefined + startedAt := time.Now() + defer func() { + c.metrics.Delete(time.Since(startedAt), deleted, storageType) + }() + + c.modeMtx.RLock() + defer c.modeMtx.RUnlock() + if c.readOnly() { + return writecache.ErrReadOnly + } + + saddr := addr.EncodeToString() + + err := c.db.Update(func(tx *badger.Txn) error { + it, err := tx.Get([]byte(saddr)) + if err != nil { + if err == badger.ErrKeyNotFound { + return logicerr.Wrap(apistatus.ObjectNotFound{}) + } + return err + } + if it.ValueSize() > 0 { + storageType = writecache.StorageTypeDB + err := tx.Delete([]byte(saddr)) + if err == nil { + storagelog.Write(c.log, + storagelog.AddressField(saddr), + storagelog.StorageTypeField(wcStorageType), + storagelog.OpField("db DELETE"), + ) + deleted = true + c.objCounters.DecDB() + } + return err + } + return nil + }) + + return metaerr.Wrap(err) +} diff --git a/pkg/local_object_storage/writecache/writecachebadger/flush.go b/pkg/local_object_storage/writecache/writecachebadger/flush.go new file mode 100644 index 00000000..d8bdddb5 --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebadger/flush.go @@ -0,0 +1,257 @@ +package writecachebadger + +import ( + "bytes" + "context" + "encoding/hex" + "errors" + "fmt" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "github.com/dgraph-io/badger/v4" + "github.com/mr-tron/base58" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" +) + +const ( + // flushBatchSize is amount of keys which will be read from cache to be flushed + // to the main storage. It is used to reduce contention between cache put + // and cache persist. + flushBatchSize = 512 + // defaultFlushWorkersCount is number of workers for putting objects in main storage. + defaultFlushWorkersCount = 20 + // defaultFlushInterval is default time interval between successive flushes. + defaultFlushInterval = time.Second +) + +// runFlushLoop starts background workers which periodically flush objects to the blobstor. +func (c *cache) runFlushLoop() { + for i := 0; i < c.workersCount; i++ { + c.wg.Add(1) + go c.workerFlushSmall() + } + + c.wg.Add(1) + go func() { + defer c.wg.Done() + + tt := time.NewTimer(defaultFlushInterval) + defer tt.Stop() + + for { + select { + case <-tt.C: + c.flushSmallObjects() + tt.Reset(defaultFlushInterval) + case <-c.closeCh: + return + } + } + }() +} + +func (c *cache) flushSmallObjects() { + var lastKey internalKey + var m []objectInfo + for { + select { + case <-c.closeCh: + return + default: + } + + m = m[:0] + + c.modeMtx.RLock() + if c.readOnly() { + c.modeMtx.RUnlock() + time.Sleep(time.Second) + continue + } + + _ = c.db.View(func(tx *badger.Txn) error { + it := tx.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() + if len(lastKey) == 0 { + it.Rewind() + } else { + it.Seek(lastKey[:]) + if it.Valid() && bytes.Equal(it.Item().Key(), lastKey[:]) { + it.Next() + } + } + for ; it.Valid() && len(m) < flushBatchSize; it.Next() { + if got, want := int(it.Item().KeySize()), len(lastKey); got != want { + return fmt.Errorf("invalid db key len: got %d, want %d", got, want) + } + it.Item().KeyCopy(lastKey[:]) + value, err := it.Item().ValueCopy(nil) + if err != nil { + return err + } + m = append(m, objectInfo{ + addr: lastKey.address(), + data: value, + }) + } + return nil + }) + + var count int + for i := range m { + obj := objectSDK.New() + if err := obj.Unmarshal(m[i].data); err != nil { + continue + } + + count++ + select { + case c.flushCh <- obj: + case <-c.closeCh: + c.modeMtx.RUnlock() + return + } + } + + if count == 0 { + c.modeMtx.RUnlock() + break + } + + c.modeMtx.RUnlock() + + c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache, + zap.Int("count", count), + zap.String("start", base58.Encode(lastKey[:]))) + } +} + +func (c *cache) reportFlushError(msg string, addr string, err error) { + if c.reportError != nil { + c.reportError(msg, err) + } else { + c.log.Error(msg, + zap.String("address", addr), + zap.Error(err)) + } +} + +// workerFlushSmall writes small objects to the main storage. +func (c *cache) workerFlushSmall() { + defer c.wg.Done() + + var obj *objectSDK.Object + for { + // Give priority to direct put. + select { + case obj = <-c.flushCh: + case <-c.closeCh: + return + } + + err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB) + if err != nil { + // Error is handled in flushObject. + continue + } + + c.deleteFromDB([]string{objectCore.AddressOf(obj).EncodeToString()}) + } +} + +// flushObject is used to write object directly to the main storage. +func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st writecache.StorageType) error { + var err error + + defer func() { + c.metrics.Flush(err == nil, st) + }() + + addr := objectCore.AddressOf(obj) + + var prm common.PutPrm + prm.Object = obj + prm.RawData = data + + res, err := c.blobstor.Put(ctx, prm) + if err != nil { + if !errors.Is(err, common.ErrNoSpace) && !errors.Is(err, common.ErrReadOnly) && + !errors.Is(err, blobstor.ErrNoPlaceFound) { + c.reportFlushError("can't flush an object to blobstor", + addr.EncodeToString(), err) + } + return err + } + + var updPrm meta.UpdateStorageIDPrm + updPrm.SetAddress(addr) + updPrm.SetStorageID(res.StorageID) + + _, err = c.metabase.UpdateStorageID(updPrm) + if err != nil { + c.reportFlushError("can't update object storage ID", + addr.EncodeToString(), err) + } + return err +} + +// Flush flushes all objects from the write-cache to the main storage. +// Write-cache must be in readonly mode to ensure correctness of an operation and +// to prevent interference with background flush workers. +func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error { + ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush", + trace.WithAttributes( + attribute.Bool("ignore_errors", ignoreErrors), + )) + defer span.End() + + c.modeMtx.RLock() + defer c.modeMtx.RUnlock() + + return c.flush(ctx, ignoreErrors) +} + +func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { + return c.db.View(func(tx *badger.Txn) error { + it := tx.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() + var key internalKey + for it.Rewind(); it.Valid(); it.Next() { + if got, want := int(it.Item().KeySize()), len(key); got != want { + err := fmt.Errorf("invalid db key len: got %d, want %d", got, want) + c.reportFlushError("can't decode object address from the DB", hex.EncodeToString(it.Item().Key()), metaerr.Wrap(err)) + if ignoreErrors { + continue + } + return err + } + if err := it.Item().Value(func(data []byte) error { + var obj objectSDK.Object + if err := obj.Unmarshal(data); err != nil { + copy(key[:], it.Item().Key()) + c.reportFlushError("can't unmarshal an object from the DB", key.address().EncodeToString(), metaerr.Wrap(err)) + if ignoreErrors { + return nil + } + return err + } + + return c.flushObject(ctx, &obj, data, writecache.StorageTypeDB) + }); err != nil { + return err + } + } + return nil + }) +} diff --git a/pkg/local_object_storage/writecache/writecachebadger/flush_test.go b/pkg/local_object_storage/writecache/writecachebadger/flush_test.go new file mode 100644 index 00000000..4d65d585 --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebadger/flush_test.go @@ -0,0 +1,65 @@ +package writecachebadger + +import ( + "path/filepath" + "sync/atomic" + "testing" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" + meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachetest" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "github.com/dgraph-io/badger/v4" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +func TestFlush(t *testing.T) { + createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs *blobstor.BlobStor, opts ...Option) writecache.Cache { + return New( + append([]Option{ + WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), + WithPath(filepath.Join(t.TempDir(), "writecache")), + WithMetabase(mb), + WithBlobstor(bs), + WithGCInterval(1 * time.Second), + }, opts...)...) + } + + errCountOpt := func() (Option, *atomic.Uint32) { + cnt := &atomic.Uint32{} + return WithReportErrorFunc(func(string, error) { + cnt.Add(1) + }), cnt + } + + failures := []writecachetest.TestFailureInjector[Option]{ + { + Desc: "db, invalid address", + InjectFn: func(t *testing.T, wc writecache.Cache) { + c := wc.(*cache) + obj := testutil.GenerateObject() + data, err := obj.Marshal() + require.NoError(t, err) + require.NoError(t, c.db.Update(func(tx *badger.Txn) error { + return tx.Set([]byte{1, 2, 3}, data) + })) + }, + }, + { + Desc: "db, invalid object", + InjectFn: func(t *testing.T, wc writecache.Cache) { + c := wc.(*cache) + require.NoError(t, c.db.Update(func(tx *badger.Txn) error { + return tx.Set([]byte(oidtest.Address().EncodeToString()), []byte{1, 2, 3}) + })) + }, + }, + } + + writecachetest.TestFlush(t, createCacheFn, errCountOpt, failures...) +} diff --git a/pkg/local_object_storage/writecache/writecachebadger/gc.go b/pkg/local_object_storage/writecache/writecachebadger/gc.go new file mode 100644 index 00000000..51d3e976 --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebadger/gc.go @@ -0,0 +1,31 @@ +package writecachebadger + +import ( + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" +) + +func (c *cache) runGCLoop() { + c.wg.Add(1) + + go func() { + defer c.wg.Done() + + t := time.NewTicker(c.gcInterval) + defer t.Stop() + + for { + select { + case <-c.closeCh: + return + case <-t.C: + // 0.5 is the recommended value so that write amplification of the value log is 2. + // See https://pkg.go.dev/github.com/dgraph-io/badger/v4#DB.RunValueLogGC for more info. + for c.db.RunValueLogGC(0.5) == nil { + c.log.Debug(logs.WritecacheDBValueLogGCRunCompleted) + } + } + } + }() +} diff --git a/pkg/local_object_storage/writecache/writecachebadger/generic_test.go b/pkg/local_object_storage/writecache/writecachebadger/generic_test.go new file mode 100644 index 00000000..be0a40e0 --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebadger/generic_test.go @@ -0,0 +1,20 @@ +package writecachebadger + +import ( + "testing" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + "go.uber.org/zap/zaptest" +) + +func TestGeneric(t *testing.T) { + storagetest.TestAll(t, func(t *testing.T) storagetest.Component { + return New( + WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), + WithFlushWorkersCount(2), + WithPath(t.TempDir()), + WithGCInterval(1*time.Second)) + }) +} diff --git a/pkg/local_object_storage/writecache/writecachebadger/get.go b/pkg/local_object_storage/writecache/writecachebadger/get.go new file mode 100644 index 00000000..36896c56 --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebadger/get.go @@ -0,0 +1,95 @@ +package writecachebadger + +import ( + "context" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/dgraph-io/badger/v4" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// Get returns object from write-cache. +// +// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache. +func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { + _, span := tracing.StartSpanFromContext(ctx, "writecache.Get", + trace.WithAttributes( + attribute.String("address", addr.EncodeToString()), + )) + defer span.End() + + obj, err := c.getInternal(addr) + return obj, metaerr.Wrap(err) +} + +func (c *cache) getInternal(addr oid.Address) (*objectSDK.Object, error) { + found := false + storageType := writecache.StorageTypeUndefined + startedAt := time.Now() + defer func() { + c.metrics.Get(time.Since(startedAt), found, storageType) + }() + + k := addr2key(addr) + value, err := Get(c.db, k[:]) + if err == nil { + obj := objectSDK.New() + found = true + storageType = writecache.StorageTypeDB + return obj, obj.Unmarshal(value) + } + + return nil, logicerr.Wrap(apistatus.ObjectNotFound{}) +} + +// Head returns object header from write-cache. +// +// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache. +func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { + _, span := tracing.StartSpanFromContext(ctx, "writecache.Head", + trace.WithAttributes( + attribute.String("address", addr.EncodeToString()), + )) + defer span.End() + + obj, err := c.getInternal(addr) + if err != nil { + return nil, metaerr.Wrap(err) + } + + return obj.CutPayload(), nil +} + +// Get fetches object from the underlying database. +// Key should be a stringified address. +// +// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in db. +func Get(db *badger.DB, key []byte) ([]byte, error) { + var value []byte + + err := db.View(func(tx *badger.Txn) error { + it, err := tx.Get(key) + if err != nil { + if err == badger.ErrKeyNotFound { + return logicerr.Wrap(apistatus.ObjectNotFound{}) + } + return err + } + v, err := it.ValueCopy(nil) + if err != nil { + return err + } + value = v + return nil + }) + + return value, metaerr.Wrap(err) +} diff --git a/pkg/local_object_storage/writecache/writecachebadger/iterate.go b/pkg/local_object_storage/writecache/writecachebadger/iterate.go new file mode 100644 index 00000000..11124204 --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebadger/iterate.go @@ -0,0 +1,32 @@ +package writecachebadger + +import ( + "fmt" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/dgraph-io/badger/v4" +) + +// IterateDB iterates over all objects stored in badger.DB instance and passes them to f until error return. +// It is assumed that db is an underlying database of some WriteCache instance. +// +// DB must not be nil and should be opened. +func IterateDB(db *badger.DB, f func(oid.Address) error) error { + return metaerr.Wrap(db.View(func(tx *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false + it := tx.NewIterator(opts) + for it.Rewind(); it.Valid(); it.Next() { + var key internalKey + if got, want := len(it.Item().Key()), len(key); got != want { + return fmt.Errorf("invalid db key len: got %d, want %d", got, want) + } + copy(key[:], it.Item().Key()) + if err := f(key.address()); err != nil { + return err + } + } + return nil + })) +} diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/writecachebadger/mode.go similarity index 82% rename from pkg/local_object_storage/writecache/mode.go rename to pkg/local_object_storage/writecache/writecachebadger/mode.go index bdbbec7c..9a39fa41 100644 --- a/pkg/local_object_storage/writecache/mode.go +++ b/pkg/local_object_storage/writecache/writecachebadger/mode.go @@ -1,4 +1,4 @@ -package writecache +package writecachebadger import ( "context" @@ -7,18 +7,11 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) -// ErrReadOnly is returned when Put/Write is performed in a read-only mode. -var ErrReadOnly = logicerr.New("write-cache is in read-only mode") - -// ErrNotInitialized is returned when write-cache is initializing. -var ErrNotInitialized = logicerr.New("write-cache is not initialized yet") - // SetMode sets write-cache mode of operation. // When shard is put in read-only mode all objects in memory are flushed to disk // and all background jobs are suspended. diff --git a/pkg/local_object_storage/writecache/writecachebadger/options.go b/pkg/local_object_storage/writecache/writecachebadger/options.go new file mode 100644 index 00000000..635c1418 --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebadger/options.go @@ -0,0 +1,141 @@ +package writecachebadger + +import ( + "context" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "go.uber.org/zap" +) + +// Option represents write-cache configuration option. +type Option func(*options) + +// meta is an interface for a metabase. +type metabase interface { + Exists(context.Context, meta.ExistsPrm) (meta.ExistsRes, error) + StorageID(context.Context, meta.StorageIDPrm) (meta.StorageIDRes, error) + UpdateStorageID(meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) +} + +// blob is an interface for the blobstor. +type blob interface { + Put(context.Context, common.PutPrm) (common.PutRes, error) + NeedsCompression(obj *objectSDK.Object) bool + Exists(ctx context.Context, res common.ExistsPrm) (common.ExistsRes, error) +} + +type options struct { + log *logger.Logger + // path is a path to a directory for write-cache. + path string + // blobstor is the main persistent storage. + blobstor blob + // metabase is the metabase instance. + metabase metabase + // maxObjectSize is the maximum size of the object stored in the write-cache. + maxObjectSize uint64 + // workersCount is the number of workers flushing objects in parallel. + workersCount int + // maxCacheSize is the maximum total size of all objects saved in cache (DB + FS). + // 1 GiB by default. + maxCacheSize uint64 + // objCounters contains atomic counters for the number of objects stored in cache. + objCounters counters + // noSync is true iff FSTree allows unsynchronized writes. + noSync bool + // reportError is the function called when encountering disk errors in background workers. + reportError func(string, error) + // metrics is metrics implementation + metrics writecache.Metrics + // gcInterval is the interval duration to run the GC cycle. + gcInterval time.Duration +} + +// WithLogger sets logger. +func WithLogger(log *logger.Logger) Option { + return func(o *options) { + o.log = &logger.Logger{Logger: log.With(zap.String("component", "WriteCache"))} + } +} + +// WithPath sets path to writecache db. +func WithPath(path string) Option { + return func(o *options) { + o.path = path + } +} + +// WithBlobstor sets main object storage. +func WithBlobstor(bs *blobstor.BlobStor) Option { + return func(o *options) { + o.blobstor = bs + } +} + +// WithMetabase sets metabase. +func WithMetabase(db *meta.DB) Option { + return func(o *options) { + o.metabase = db + } +} + +// WithMaxObjectSize sets maximum object size to be stored in write-cache. +func WithMaxObjectSize(sz uint64) Option { + return func(o *options) { + if sz > 0 { + o.maxObjectSize = sz + } + } +} + +func WithFlushWorkersCount(c int) Option { + return func(o *options) { + if c > 0 { + o.workersCount = c + } + } +} + +// WithMaxCacheSize sets maximum write-cache size in bytes. +func WithMaxCacheSize(sz uint64) Option { + return func(o *options) { + o.maxCacheSize = sz + } +} + +// WithNoSync sets an option to allow returning to caller on PUT before write is persisted. +// Note, that we use this flag for FSTree only and DO NOT use it for a bolt DB because +// we cannot yet properly handle the corrupted database during the startup. This SHOULD NOT +// be relied upon and may be changed in future. +func WithNoSync(noSync bool) Option { + return func(o *options) { + o.noSync = noSync + } +} + +// WithReportErrorFunc sets error reporting function. +func WithReportErrorFunc(f func(string, error)) Option { + return func(o *options) { + o.reportError = f + } +} + +// WithMetrics sets metrics implementation. +func WithMetrics(metrics writecache.Metrics) Option { + return func(o *options) { + o.metrics = metrics + } +} + +// WithGCInterval sets the duration of the interval to run GC cycles. +func WithGCInterval(d time.Duration) Option { + return func(o *options) { + o.gcInterval = d + } +} diff --git a/pkg/local_object_storage/writecache/writecachebadger/put.go b/pkg/local_object_storage/writecache/writecachebadger/put.go new file mode 100644 index 00000000..c03a0d33 --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebadger/put.go @@ -0,0 +1,82 @@ +package writecachebadger + +import ( + "context" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// Put puts object to write-cache. +// +// Returns ErrReadOnly if write-cache is in R/O mode. +// Returns ErrNotInitialized if write-cache has not been initialized yet. +// Returns ErrOutOfSpace if saving an object leads to WC's size overflow. +// Returns ErrBigObject if an objects exceeds maximum object size. +func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) { + _, span := tracing.StartSpanFromContext(ctx, "writecache.Put", + trace.WithAttributes( + attribute.String("address", prm.Address.EncodeToString()), + attribute.Bool("dont_compress", prm.DontCompress), + )) + defer span.End() + + startedAt := time.Now() + added := false + storageType := writecache.StorageTypeUndefined + defer func() { + c.metrics.Put(time.Since(startedAt), added, storageType) + }() + + c.modeMtx.RLock() + defer c.modeMtx.RUnlock() + if c.readOnly() { + return common.PutRes{}, writecache.ErrReadOnly + } + + sz := uint64(len(prm.RawData)) + if sz > c.maxObjectSize { + return common.PutRes{}, writecache.ErrBigObject + } + + oi := objectInfo{ + addr: prm.Address, + obj: prm.Object, + data: prm.RawData, + } + + storageType = writecache.StorageTypeDB + err := c.put(oi) + if err == nil { + added = true + } + return common.PutRes{}, err +} + +// put persists objects to the write-cache database and +// pushes the to the flush workers queue. +func (c *cache) put(obj objectInfo) error { + cacheSize := c.estimateCacheSize() + if c.maxCacheSize < c.incSizeDB(cacheSize) { + return writecache.ErrOutOfSpace + } + + wb := c.db.NewWriteBatch() + k := addr2key(obj.addr) + _ = wb.Set(k[:], obj.data) + err := wb.Flush() + if err == nil { + storagelog.Write(c.log, + storagelog.AddressField(obj.addr), + storagelog.StorageTypeField(wcStorageType), + storagelog.OpField("db PUT"), + ) + c.objCounters.IncDB() + } + return err +} diff --git a/pkg/local_object_storage/writecache/writecachebadger/state.go b/pkg/local_object_storage/writecache/writecachebadger/state.go new file mode 100644 index 00000000..994dfa3d --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebadger/state.go @@ -0,0 +1,57 @@ +package writecachebadger + +import ( + "fmt" + "math" + "sync/atomic" + + "github.com/dgraph-io/badger/v4" +) + +func (c *cache) estimateCacheSize() uint64 { + onDiskSize, _ := c.db.EstimateSize(nil) + c.metrics.SetEstimateSize(onDiskSize, 0) + return onDiskSize +} + +func (c *cache) incSizeDB(sz uint64) uint64 { + return sz + c.maxObjectSize +} + +type counters struct { + cDB atomic.Uint64 +} + +func (x *counters) IncDB() { + x.cDB.Add(1) +} + +func (x *counters) DecDB() { + x.cDB.Add(math.MaxUint64) +} + +func (x *counters) DB() uint64 { + return x.cDB.Load() +} + +func (c *cache) initCounters() error { + var inDB uint64 + err := c.db.View(func(tx *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false + it := tx.NewIterator(opts) + defer it.Close() + for it.Rewind(); it.Valid(); it.Next() { + inDB++ + } + return nil + }) + if err != nil { + return fmt.Errorf("could not read write-cache DB counter: %w", err) + } + + c.objCounters.cDB.Store(inDB) + c.metrics.SetActualCounters(inDB, 0) + + return nil +} diff --git a/pkg/local_object_storage/writecache/writecachebadger/storage.go b/pkg/local_object_storage/writecache/writecachebadger/storage.go new file mode 100644 index 00000000..25d1900d --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebadger/storage.go @@ -0,0 +1,91 @@ +package writecachebadger + +import ( + "fmt" + "os" + "path/filepath" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/dgraph-io/badger/v4" + "go.uber.org/zap" +) + +// store represents persistent storage with in-memory LRU cache +// for flushed items on top of it. +type store struct { + db *badger.DB +} + +type internalKey [len(cid.ID{}) + len(oid.ID{})]byte + +func (k internalKey) address() oid.Address { + var addr oid.Address + var cnr cid.ID + var obj oid.ID + copy(cnr[:], k[:len(cnr)]) + copy(obj[:], k[len(cnr):]) + addr.SetContainer(cnr) + addr.SetObject(obj) + return addr +} + +func addr2key(addr oid.Address) internalKey { + var key internalKey + cnr, obj := addr.Container(), addr.Object() + copy(key[:len(cnr)], cnr[:]) + copy(key[len(cnr):], obj[:]) + return key +} + +const dbName = "small.badger" + +func (c *cache) openStore(readOnly bool) error { + err := util.MkdirAllX(c.path, os.ModePerm) + if err != nil { + return err + } + + c.db, err = OpenDB(filepath.Join(c.path, dbName), readOnly, c.log) + if err != nil { + return fmt.Errorf("could not open database: %w", err) + } + + return nil +} + +func (c *cache) deleteFromDB(keys []string) []string { + if len(keys) == 0 { + return keys + } + + wb := c.db.NewWriteBatch() + + var errorIndex int + for errorIndex = range keys { + if err := wb.Delete([]byte(keys[errorIndex])); err != nil { + break + } + } + + for i := 0; i < errorIndex; i++ { + c.objCounters.DecDB() + c.metrics.Evict(writecache.StorageTypeDB) + storagelog.Write(c.log, + storagelog.AddressField(keys[i]), + storagelog.StorageTypeField(wcStorageType), + storagelog.OpField("db DELETE"), + ) + } + + if err := wb.Flush(); err != nil { + c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err)) + } + + copy(keys, keys[errorIndex:]) + return keys[:len(keys)-errorIndex] +} diff --git a/pkg/local_object_storage/writecache/writecachebadger/util.go b/pkg/local_object_storage/writecache/writecachebadger/util.go new file mode 100644 index 00000000..1bb278f0 --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebadger/util.go @@ -0,0 +1,36 @@ +package writecachebadger + +import ( + "fmt" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + "github.com/dgraph-io/badger/v4" +) + +// OpenDB opens a badger instance for write-cache. Opens in read-only mode if ro is true. +func OpenDB(p string, ro bool, l *logger.Logger) (*badger.DB, error) { + return badger.Open(badger.DefaultOptions(p). + WithReadOnly(ro). + WithLoggingLevel(badger.ERROR). + WithLogger(badgerLoggerWrapper{l})) +} + +type badgerLoggerWrapper struct { + l *logger.Logger +} + +func (w badgerLoggerWrapper) Errorf(msg string, args ...any) { + w.l.Error(fmt.Sprintf(msg, args...)) +} + +func (w badgerLoggerWrapper) Warningf(msg string, args ...any) { + w.l.Error(fmt.Sprintf(msg, args...)) +} + +func (w badgerLoggerWrapper) Infof(msg string, args ...any) { + w.l.Error(fmt.Sprintf(msg, args...)) +} + +func (w badgerLoggerWrapper) Debugf(msg string, args ...any) { + w.l.Error(fmt.Sprintf(msg, args...)) +} diff --git a/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go b/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go new file mode 100644 index 00000000..407d1a9c --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go @@ -0,0 +1,146 @@ +package writecachebbolt + +import ( + "os" + "sync" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "go.etcd.io/bbolt" + "go.uber.org/zap" +) + +type cache struct { + options + + // mtx protects statistics, counters and compressFlags. + mtx sync.RWMutex + + mode mode.Mode + modeMtx sync.RWMutex + + // compressFlags maps address of a big object to boolean value indicating + // whether object should be compressed. + compressFlags map[string]struct{} + + // flushCh is a channel with objects to flush. + flushCh chan *objectSDK.Object + // closeCh is close channel, protected by modeMtx. + closeCh chan struct{} + // wg is a wait group for flush workers. + wg sync.WaitGroup + // store contains underlying database. + store + // fsTree contains big files stored directly on file-system. + fsTree *fstree.FSTree +} + +// wcStorageType is used for write-cache operations logging. +const wcStorageType = "write-cache" + +type objectInfo struct { + addr string + data []byte + obj *objectSDK.Object +} + +const ( + defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB + defaultSmallObjectSize = 32 * 1024 // 32 KiB + defaultMaxCacheSize = 1 << 30 // 1 GiB +) + +var ( + defaultBucket = []byte{0} +) + +// New creates new writecache instance. +func New(opts ...Option) writecache.Cache { + c := &cache{ + flushCh: make(chan *objectSDK.Object), + mode: mode.ReadWrite, + + compressFlags: make(map[string]struct{}), + options: options{ + log: &logger.Logger{Logger: zap.NewNop()}, + maxObjectSize: defaultMaxObjectSize, + smallObjectSize: defaultSmallObjectSize, + workersCount: defaultFlushWorkersCount, + maxCacheSize: defaultMaxCacheSize, + maxBatchSize: bbolt.DefaultMaxBatchSize, + maxBatchDelay: bbolt.DefaultMaxBatchDelay, + openFile: os.OpenFile, + metrics: writecache.DefaultMetrics(), + }, + } + + for i := range opts { + opts[i](&c.options) + } + + return c +} + +// SetLogger sets logger. It is used after the shard ID was generated to use it in logs. +func (c *cache) SetLogger(l *logger.Logger) { + c.log = l +} + +func (c *cache) DumpInfo() writecache.Info { + return writecache.Info{ + Path: c.path, + } +} + +// Open opens and initializes database. Reads object counters from the ObjectCounters instance. +func (c *cache) Open(readOnly bool) error { + err := c.openStore(readOnly) + if err != nil { + return metaerr.Wrap(err) + } + + // Opening after Close is done during maintenance mode, + // thus we need to create a channel here. + c.closeCh = make(chan struct{}) + + return metaerr.Wrap(c.initCounters()) +} + +// Init runs necessary services. +func (c *cache) Init() error { + c.metrics.SetMode(c.mode) + c.runFlushLoop() + return nil +} + +// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op. +func (c *cache) Close() error { + // We cannot lock mutex for the whole operation duration + // because it is taken by some background workers, so `wg.Wait()` is done without modeMtx. + c.modeMtx.Lock() + if c.closeCh != nil { + close(c.closeCh) + } + c.mode = mode.DegradedReadOnly // prevent new operations from being processed + c.modeMtx.Unlock() + + c.wg.Wait() + + c.modeMtx.Lock() + defer c.modeMtx.Unlock() + + c.closeCh = nil + var err error + if c.db != nil { + err = c.db.Close() + if err != nil { + c.db = nil + } + } + c.metrics.Close() + return nil +} diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/writecachebbolt/delete.go similarity index 87% rename from pkg/local_object_storage/writecache/delete.go rename to pkg/local_object_storage/writecache/writecachebbolt/delete.go index aeab88b0..b0cc091a 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/writecachebbolt/delete.go @@ -1,4 +1,4 @@ -package writecache +package writecachebbolt import ( "context" @@ -7,6 +7,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" @@ -25,7 +26,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { defer span.End() deleted := false - storageType := StorageTypeUndefined + storageType := writecache.StorageTypeUndefined startedAt := time.Now() defer func() { c.metrics.Delete(time.Since(startedAt), deleted, storageType) @@ -34,7 +35,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { c.modeMtx.RLock() defer c.modeMtx.RUnlock() if c.readOnly() { - return ErrReadOnly + return writecache.ErrReadOnly } saddr := addr.EncodeToString() @@ -47,7 +48,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { }) if dataSize > 0 { - storageType = StorageTypeDB + storageType = writecache.StorageTypeDB err := c.db.Update(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) err := b.Delete([]byte(saddr)) @@ -66,7 +67,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { return nil } - storageType = StorageTypeFSTree + storageType = writecache.StorageTypeFSTree _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) if err == nil { storagelog.Write(c.log, diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/writecachebbolt/flush.go similarity index 94% rename from pkg/local_object_storage/writecache/flush.go rename to pkg/local_object_storage/writecache/writecachebbolt/flush.go index 243be462..78018eea 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/writecachebbolt/flush.go @@ -1,4 +1,4 @@ -package writecache +package writecachebbolt import ( "bytes" @@ -12,6 +12,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -210,7 +211,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { return err } - err = c.flushObject(ctx, &obj, data, StorageTypeFSTree) + err = c.flushObject(ctx, &obj, data, writecache.StorageTypeFSTree) if err != nil { if ignoreErrors { return nil @@ -239,7 +240,7 @@ func (c *cache) workerFlushSmall() { return } - err := c.flushObject(context.TODO(), obj, nil, StorageTypeDB) + err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB) if err != nil { // Error is handled in flushObject. continue @@ -250,7 +251,7 @@ func (c *cache) workerFlushSmall() { } // flushObject is used to write object directly to the main storage. -func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st StorageType) error { +func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st writecache.StorageType) error { var err error defer func() { @@ -330,7 +331,7 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { return err } - if err := c.flushObject(ctx, &obj, data, StorageTypeDB); err != nil { + if err := c.flushObject(ctx, &obj, data, writecache.StorageTypeDB); err != nil { return err } } diff --git a/pkg/local_object_storage/writecache/writecachebbolt/flush_test.go b/pkg/local_object_storage/writecache/writecachebbolt/flush_test.go new file mode 100644 index 00000000..465410ba --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebbolt/flush_test.go @@ -0,0 +1,106 @@ +package writecachebbolt + +import ( + "context" + "os" + "path/filepath" + "sync/atomic" + "testing" + + objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" + meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachetest" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" + "go.uber.org/zap/zaptest" +) + +func TestFlush(t *testing.T) { + createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs *blobstor.BlobStor, opts ...Option) writecache.Cache { + return New( + append([]Option{ + WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), + WithPath(filepath.Join(t.TempDir(), "writecache")), + WithSmallObjectSize(smallSize), + WithMetabase(mb), + WithBlobstor(bs), + }, opts...)...) + } + + errCountOpt := func() (Option, *atomic.Uint32) { + cnt := &atomic.Uint32{} + return WithReportErrorFunc(func(string, error) { + cnt.Add(1) + }), cnt + } + + failures := []writecachetest.TestFailureInjector[Option]{ + { + Desc: "db, invalid address", + InjectFn: func(t *testing.T, wc writecache.Cache) { + c := wc.(*cache) + obj := testutil.GenerateObject() + data, err := obj.Marshal() + require.NoError(t, err) + require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + return b.Put([]byte{1, 2, 3}, data) + })) + }, + }, + { + Desc: "db, invalid object", + InjectFn: func(t *testing.T, wc writecache.Cache) { + c := wc.(*cache) + require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + k := []byte(oidtest.Address().EncodeToString()) + v := []byte{1, 2, 3} + return b.Put(k, v) + })) + }, + }, + { + Desc: "fs, read error", + InjectFn: func(t *testing.T, wc writecache.Cache) { + c := wc.(*cache) + obj := testutil.GenerateObject() + data, err := obj.Marshal() + require.NoError(t, err) + + var prm common.PutPrm + prm.Address = objectCore.AddressOf(obj) + prm.RawData = data + + _, err = c.fsTree.Put(context.Background(), prm) + require.NoError(t, err) + + p := prm.Address.Object().EncodeToString() + "." + prm.Address.Container().EncodeToString() + p = filepath.Join(c.fsTree.RootPath, p[:1], p[1:]) + + _, err = os.Stat(p) // sanity check + require.NoError(t, err) + require.NoError(t, os.Truncate(p, 0)) // corrupt the file contents, so that it can't be unmarshalled + }, + }, + { + Desc: "fs, invalid object", + InjectFn: func(t *testing.T, wc writecache.Cache) { + c := wc.(*cache) + var prm common.PutPrm + prm.Address = oidtest.Address() + prm.RawData = []byte{1, 2, 3} + _, err := c.fsTree.Put(context.Background(), prm) + require.NoError(t, err) + }, + }, + } + + writecachetest.TestFlush(t, createCacheFn, errCountOpt, failures...) +} diff --git a/pkg/local_object_storage/writecache/writecachebbolt/generic_test.go b/pkg/local_object_storage/writecache/writecachebbolt/generic_test.go new file mode 100644 index 00000000..509efdd6 --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebbolt/generic_test.go @@ -0,0 +1,18 @@ +package writecachebbolt + +import ( + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + "go.uber.org/zap/zaptest" +) + +func TestGeneric(t *testing.T) { + storagetest.TestAll(t, func(t *testing.T) storagetest.Component { + return New( + WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), + WithFlushWorkersCount(2), + WithPath(t.TempDir())) + }) +} diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/writecachebbolt/get.go similarity index 92% rename from pkg/local_object_storage/writecache/get.go rename to pkg/local_object_storage/writecache/writecachebbolt/get.go index 2546bada..9d2bc39d 100644 --- a/pkg/local_object_storage/writecache/get.go +++ b/pkg/local_object_storage/writecache/writecachebbolt/get.go @@ -1,4 +1,4 @@ -package writecache +package writecachebbolt import ( "context" @@ -7,6 +7,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -35,7 +36,7 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) (*objectSDK.Object, error) { found := false - storageType := StorageTypeUndefined + storageType := writecache.StorageTypeUndefined startedAt := time.Now() defer func() { c.metrics.Get(time.Since(startedAt), found, storageType) @@ -45,7 +46,7 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) if err == nil { obj := objectSDK.New() found = true - storageType = StorageTypeDB + storageType = writecache.StorageTypeDB return obj, obj.Unmarshal(value) } @@ -55,7 +56,7 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) } found = true - storageType = StorageTypeFSTree + storageType = writecache.StorageTypeFSTree return res.Object, nil } diff --git a/pkg/local_object_storage/writecache/iterate.go b/pkg/local_object_storage/writecache/writecachebbolt/iterate.go similarity index 97% rename from pkg/local_object_storage/writecache/iterate.go rename to pkg/local_object_storage/writecache/writecachebbolt/iterate.go index 5349c069..530db42a 100644 --- a/pkg/local_object_storage/writecache/iterate.go +++ b/pkg/local_object_storage/writecache/writecachebbolt/iterate.go @@ -1,4 +1,4 @@ -package writecache +package writecachebbolt import ( "errors" diff --git a/pkg/local_object_storage/writecache/writecachebbolt/mode.go b/pkg/local_object_storage/writecache/writecachebbolt/mode.go new file mode 100644 index 00000000..f7a9fffa --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachebbolt/mode.go @@ -0,0 +1,75 @@ +package writecachebbolt + +import ( + "context" + "fmt" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// SetMode sets write-cache mode of operation. +// When shard is put in read-only mode all objects in memory are flushed to disk +// and all background jobs are suspended. +func (c *cache) SetMode(m mode.Mode) error { + ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.SetMode", + trace.WithAttributes( + attribute.String("mode", m.String()), + )) + defer span.End() + + err := c.setMode(ctx, m) + if err == nil { + c.metrics.SetMode(m) + } + return err +} + +// setMode applies new mode. Must be called with cache.modeMtx lock taken. +func (c *cache) setMode(ctx context.Context, m mode.Mode) error { + var err error + turnOffMeta := m.NoMetabase() + + if turnOffMeta && !c.mode.NoMetabase() { + err = c.flush(ctx, true) + if err != nil { + return err + } + } + + if c.db != nil { + if err = c.db.Close(); err != nil { + return fmt.Errorf("can't close write-cache database: %w", err) + } + } + + // Suspend producers to ensure there are channel send operations in fly. + // flushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty + // guarantees that there are no in-fly operations. + for len(c.flushCh) != 0 { + c.log.Info(logs.WritecacheWaitingForChannelsToFlush) + time.Sleep(time.Second) + } + + if turnOffMeta { + c.mode = m + return nil + } + + if err = c.openStore(m.ReadOnly()); err != nil { + return err + } + + c.mode = m + return nil +} + +// readOnly returns true if current mode is read-only. +// `c.modeMtx` must be taken. +func (c *cache) readOnly() bool { + return c.mode.ReadOnly() +} diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/writecachebbolt/options.go similarity index 96% rename from pkg/local_object_storage/writecache/options.go rename to pkg/local_object_storage/writecache/writecachebbolt/options.go index bea40aa3..0a21421c 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/writecachebbolt/options.go @@ -1,4 +1,4 @@ -package writecache +package writecachebbolt import ( "context" @@ -9,6 +9,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "go.uber.org/zap" @@ -61,7 +62,7 @@ type options struct { // openFile is the function called internally by bbolt to open database files. Useful for hermetic testing. openFile func(string, int, fs.FileMode) (*os.File, error) // metrics is metrics implementation - metrics Metrics + metrics writecache.Metrics } // WithLogger sets logger. @@ -168,7 +169,7 @@ func WithOpenFile(f func(string, int, fs.FileMode) (*os.File, error)) Option { } // WithMetrics sets metrics implementation. -func WithMetrics(metrics Metrics) Option { +func WithMetrics(metrics writecache.Metrics) Option { return func(o *options) { o.metrics = metrics } diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/writecachebbolt/put.go similarity index 91% rename from pkg/local_object_storage/writecache/put.go rename to pkg/local_object_storage/writecache/writecachebbolt/put.go index 619b2bd2..505d091a 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/writecachebbolt/put.go @@ -1,4 +1,4 @@ -package writecache +package writecachebbolt import ( "context" @@ -8,6 +8,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" @@ -37,7 +38,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro startedAt := time.Now() added := false - storageType := StorageTypeUndefined + storageType := writecache.StorageTypeUndefined defer func() { c.metrics.Put(time.Since(startedAt), added, storageType) }() @@ -45,7 +46,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro c.modeMtx.RLock() defer c.modeMtx.RUnlock() if c.readOnly() { - return common.PutRes{}, ErrReadOnly + return common.PutRes{}, writecache.ErrReadOnly } sz := uint64(len(prm.RawData)) @@ -60,7 +61,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro } if sz <= c.smallObjectSize { - storageType = StorageTypeDB + storageType = writecache.StorageTypeDB err := c.putSmall(oi) if err == nil { added = true @@ -68,7 +69,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro return common.PutRes{}, err } - storageType = StorageTypeFSTree + storageType = writecache.StorageTypeFSTree err := c.putBig(ctx, oi.addr, prm) if err == nil { added = true diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/writecachebbolt/state.go similarity index 98% rename from pkg/local_object_storage/writecache/state.go rename to pkg/local_object_storage/writecache/writecachebbolt/state.go index 14103e62..95037975 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/writecachebbolt/state.go @@ -1,4 +1,4 @@ -package writecache +package writecachebbolt import ( "fmt" diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/writecachebbolt/storage.go similarity index 94% rename from pkg/local_object_storage/writecache/storage.go rename to pkg/local_object_storage/writecache/writecachebbolt/storage.go index 3bd3813d..ab0b5990 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/writecachebbolt/storage.go @@ -1,4 +1,4 @@ -package writecache +package writecachebbolt import ( "context" @@ -10,6 +10,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -79,7 +80,7 @@ func (c *cache) deleteFromDB(keys []string) []string { }) for i := 0; i < errorIndex; i++ { c.objCounters.DecDB() - c.metrics.Evict(StorageTypeDB) + c.metrics.Evict(writecache.StorageTypeDB) storagelog.Write(c.log, storagelog.AddressField(keys[i]), storagelog.StorageTypeField(wcStorageType), @@ -122,7 +123,7 @@ func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string { storagelog.StorageTypeField(wcStorageType), storagelog.OpField("fstree DELETE"), ) - c.metrics.Evict(StorageTypeFSTree) + c.metrics.Evict(writecache.StorageTypeFSTree) c.objCounters.DecFS() } } diff --git a/pkg/local_object_storage/writecache/util.go b/pkg/local_object_storage/writecache/writecachebbolt/util.go similarity index 95% rename from pkg/local_object_storage/writecache/util.go rename to pkg/local_object_storage/writecache/writecachebbolt/util.go index 0ed4a954..fe225583 100644 --- a/pkg/local_object_storage/writecache/util.go +++ b/pkg/local_object_storage/writecache/writecachebbolt/util.go @@ -1,4 +1,4 @@ -package writecache +package writecachebbolt import ( "io/fs" diff --git a/pkg/local_object_storage/writecache/writecachetest/flush.go b/pkg/local_object_storage/writecache/writecachetest/flush.go new file mode 100644 index 00000000..e36778e0 --- /dev/null +++ b/pkg/local_object_storage/writecache/writecachetest/flush.go @@ -0,0 +1,185 @@ +package writecachetest + +import ( + "context" + "path/filepath" + "sync/atomic" + "testing" + + objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" + meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/stretchr/testify/require" +) + +const ( + objCount = 4 + smallSize = 256 +) + +type CreateCacheFunc[Option any] func( + t *testing.T, + smallSize uint64, + meta *meta.DB, + bs *blobstor.BlobStor, + opts ...Option, +) writecache.Cache + +type TestFailureInjector[Option any] struct { + Desc string + InjectFn func(*testing.T, writecache.Cache) +} + +type objectPair struct { + addr oid.Address + obj *objectSDK.Object +} + +func TestFlush[Option any]( + t *testing.T, + createCacheFn CreateCacheFunc[Option], + errCountOption func() (Option, *atomic.Uint32), + failures ...TestFailureInjector[Option], +) { + t.Run("no errors", func(t *testing.T) { + wc, bs, mb := newCache(t, createCacheFn, smallSize) + objects := putObjects(t, wc) + + require.NoError(t, bs.SetMode(mode.ReadWrite)) + require.NoError(t, mb.SetMode(mode.ReadWrite)) + + require.NoError(t, wc.Flush(context.Background(), false)) + + check(t, mb, bs, objects) + }) + + t.Run("flush on moving to degraded mode", func(t *testing.T) { + wc, bs, mb := newCache(t, createCacheFn, smallSize) + objects := putObjects(t, wc) + + // Blobstor is read-only, so we expect en error from `flush` here. + require.Error(t, wc.SetMode(mode.Degraded)) + + // First move to read-only mode to close background workers. + require.NoError(t, wc.SetMode(mode.ReadOnly)) + require.NoError(t, bs.SetMode(mode.ReadWrite)) + require.NoError(t, mb.SetMode(mode.ReadWrite)) + require.NoError(t, wc.SetMode(mode.Degraded)) + + check(t, mb, bs, objects) + }) + + t.Run("ignore errors", func(t *testing.T) { + for _, f := range failures { + f := f + t.Run(f.Desc, func(t *testing.T) { + errCountOpt, errCount := errCountOption() + wc, bs, mb := newCache(t, createCacheFn, smallSize, errCountOpt) + objects := putObjects(t, wc) + f.InjectFn(t, wc) + + require.NoError(t, wc.SetMode(mode.ReadOnly)) + require.NoError(t, bs.SetMode(mode.ReadWrite)) + require.NoError(t, mb.SetMode(mode.ReadWrite)) + + require.Equal(t, uint32(0), errCount.Load()) + require.Error(t, wc.Flush(context.Background(), false)) + require.True(t, errCount.Load() > 0) + require.NoError(t, wc.Flush(context.Background(), true)) + + check(t, mb, bs, objects) + }) + } + }) +} + +func newCache[Option any]( + t *testing.T, + createCacheFn CreateCacheFunc[Option], + smallSize uint64, + opts ...Option, +) (writecache.Cache, *blobstor.BlobStor, *meta.DB) { + dir := t.TempDir() + mb := meta.New( + meta.WithPath(filepath.Join(dir, "meta")), + meta.WithEpochState(dummyEpoch{})) + require.NoError(t, mb.Open(false)) + require.NoError(t, mb.Init()) + + bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{ + { + Storage: fstree.New( + fstree.WithPath(filepath.Join(dir, "blob")), + fstree.WithDepth(0), + fstree.WithDirNameLen(1)), + }, + })) + require.NoError(t, bs.Open(false)) + require.NoError(t, bs.Init()) + + wc := createCacheFn(t, smallSize, mb, bs, opts...) + t.Cleanup(func() { require.NoError(t, wc.Close()) }) + require.NoError(t, wc.Open(false)) + require.NoError(t, wc.Init()) + + // First set mode for metabase and blobstor to prevent background flushes. + require.NoError(t, mb.SetMode(mode.ReadOnly)) + require.NoError(t, bs.SetMode(mode.ReadOnly)) + + return wc, bs, mb +} + +func putObject(t *testing.T, c writecache.Cache, size int) objectPair { + obj := testutil.GenerateObjectWithSize(size) + data, err := obj.Marshal() + require.NoError(t, err) + + var prm common.PutPrm + prm.Address = objectCore.AddressOf(obj) + prm.Object = obj + prm.RawData = data + + _, err = c.Put(context.Background(), prm) + require.NoError(t, err) + + return objectPair{prm.Address, prm.Object} +} + +func putObjects(t *testing.T, c writecache.Cache) []objectPair { + objects := make([]objectPair, objCount) + for i := range objects { + objects[i] = putObject(t, c, 1+(i%2)*smallSize) + } + return objects +} + +func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) { + for i := range objects { + var mPrm meta.StorageIDPrm + mPrm.SetAddress(objects[i].addr) + + mRes, err := mb.StorageID(context.Background(), mPrm) + require.NoError(t, err) + + var prm common.GetPrm + prm.Address = objects[i].addr + prm.StorageID = mRes.StorageID() + + res, err := bs.Get(context.Background(), prm) + require.NoError(t, err) + require.Equal(t, objects[i].obj, res.Object) + } +} + +type dummyEpoch struct{} + +func (dummyEpoch) CurrentEpoch() uint64 { + return 0 +}