diff --git a/cmd/frostfs-lens/internal/flags.go b/cmd/frostfs-lens/internal/flags.go index 00451916..8a987a2d 100644 --- a/cmd/frostfs-lens/internal/flags.go +++ b/cmd/frostfs-lens/internal/flags.go @@ -8,7 +8,6 @@ const ( flagAddress = "address" flagEnginePath = "path" flagOutFile = "out" - flagDBType = "dbtype" ) // AddAddressFlag adds the address flag to the passed cobra command. @@ -34,9 +33,3 @@ func AddOutputFileFlag(cmd *cobra.Command, v *string) { "File to save object payload") _ = cmd.MarkFlagFilename(flagOutFile) } - -// AddDBTypeFlag adds the DB type flag to the passed cobra command. -func AddDBTypeFlag(cmd *cobra.Command, v *string) { - cmd.Flags().StringVar(v, flagDBType, "bbolt", - "Type of DB used by write cache (default: bbolt)") -} diff --git a/cmd/frostfs-lens/internal/writecache/inspect.go b/cmd/frostfs-lens/internal/writecache/inspect.go index 1a733513..afc986c8 100644 --- a/cmd/frostfs-lens/internal/writecache/inspect.go +++ b/cmd/frostfs-lens/internal/writecache/inspect.go @@ -1,13 +1,10 @@ package writecache import ( - "fmt" "os" common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "github.com/spf13/cobra" ) @@ -23,34 +20,17 @@ func init() { common.AddAddressFlag(inspectCMD, &vAddress) common.AddComponentPathFlag(inspectCMD, &vPath) common.AddOutputFileFlag(inspectCMD, &vOut) - common.AddDBTypeFlag(inspectCMD, &vDBType) } func inspectFunc(cmd *cobra.Command, _ []string) { var data []byte - switch vDBType { - case "bbolt": - db, err := writecachebbolt.OpenDB(vPath, true, os.OpenFile) - common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) - defer db.Close() + db, err := writecache.OpenDB(vPath, true, os.OpenFile) + common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) + defer db.Close() - data, err = writecachebbolt.Get(db, []byte(vAddress)) - common.ExitOnErr(cmd, common.Errf("could not fetch object: %w", err)) - - case "badger": - log, err := logger.NewLogger(&logger.Prm{}) - common.ExitOnErr(cmd, common.Errf("could not create logger: %w", err)) - - db, err := writecachebadger.OpenDB(vPath, true, log) - common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) - - data, err = writecachebadger.Get(db, []byte(vAddress)) - common.ExitOnErr(cmd, common.Errf("could not fetch object: %w", err)) - - default: - common.ExitOnErr(cmd, fmt.Errorf("invalid dbtype: %q (possible values: bbolt, badger)", vDBType)) - } + data, err = writecache.Get(db, []byte(vAddress)) + common.ExitOnErr(cmd, common.Errf("could not fetch object: %w", err)) var o objectSDK.Object common.ExitOnErr(cmd, common.Errf("could not unmarshal object: %w", o.Unmarshal(data))) diff --git a/cmd/frostfs-lens/internal/writecache/list.go b/cmd/frostfs-lens/internal/writecache/list.go index df02a82f..bcbae0ec 100644 --- a/cmd/frostfs-lens/internal/writecache/list.go +++ b/cmd/frostfs-lens/internal/writecache/list.go @@ -6,9 +6,7 @@ import ( "os" common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/spf13/cobra" ) @@ -33,26 +31,10 @@ func listFunc(cmd *cobra.Command, _ []string) { return err } - switch vDBType { - case "bbolt": - db, err := writecachebbolt.OpenDB(vPath, true, os.OpenFile) - common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) - defer db.Close() + db, err := writecache.OpenDB(vPath, true, os.OpenFile) + common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) + defer db.Close() - err = writecachebbolt.IterateDB(db, wAddr) - common.ExitOnErr(cmd, common.Errf("write-cache iterator failure: %w", err)) - - case "badger": - log, err := logger.NewLogger(&logger.Prm{}) - common.ExitOnErr(cmd, common.Errf("could not create logger: %w", err)) - - db, err := writecachebadger.OpenDB(vPath, true, log) - common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) - - err = writecachebadger.IterateDB(db, wAddr) - common.ExitOnErr(cmd, common.Errf("write-cache iterator failure: %w", err)) - - default: - common.ExitOnErr(cmd, fmt.Errorf("invalid dbtype: %q (possible values: bbolt, badger)", vDBType)) - } + err = writecache.IterateDB(db, wAddr) + common.ExitOnErr(cmd, common.Errf("write-cache iterator failure: %w", err)) } diff --git a/cmd/frostfs-lens/internal/writecache/root.go b/cmd/frostfs-lens/internal/writecache/root.go index 11a8bb96..eb3b325b 100644 --- a/cmd/frostfs-lens/internal/writecache/root.go +++ b/cmd/frostfs-lens/internal/writecache/root.go @@ -8,7 +8,6 @@ var ( vAddress string vPath string vOut string - vDBType string ) // Root contains `write-cache` command definition. diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 82e20378..ffe49a7e 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -41,9 +41,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" shardmode "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" - writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" containerClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" @@ -138,7 +136,6 @@ type shardCfg struct { writecacheCfg struct { enabled bool - typ writecacheconfig.Type path string maxBatchSize int maxBatchDelay time.Duration @@ -147,7 +144,6 @@ type shardCfg struct { flushWorkerCount int sizeLimit uint64 noSync bool - gcInterval time.Duration } piloramaCfg struct { @@ -258,7 +254,6 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg, wc := &newConfig.writecacheCfg wc.enabled = true - wc.typ = writeCacheCfg.Type() wc.path = writeCacheCfg.Path() wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize() wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay() @@ -267,7 +262,6 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg, wc.flushWorkerCount = writeCacheCfg.WorkerCount() wc.sizeLimit = writeCacheCfg.SizeLimit() wc.noSync = writeCacheCfg.NoSync() - wc.gcInterval = writeCacheCfg.GCInterval() } } @@ -836,36 +830,20 @@ func (c *cfg) shardOpts() []shardOptsWithID { return shards } -func (c *cfg) getWriteCacheOpts(shCfg shardCfg) writecacheconfig.Options { - var writeCacheOpts writecacheconfig.Options +func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option { + var writeCacheOpts []writecache.Option if wcRead := shCfg.writecacheCfg; wcRead.enabled { - switch wcRead.typ { - case writecacheconfig.TypeBBolt: - writeCacheOpts.Type = writecacheconfig.TypeBBolt - writeCacheOpts.BBoltOptions = append(writeCacheOpts.BBoltOptions, - writecachebbolt.WithPath(wcRead.path), - writecachebbolt.WithMaxBatchSize(wcRead.maxBatchSize), - writecachebbolt.WithMaxBatchDelay(wcRead.maxBatchDelay), - writecachebbolt.WithMaxObjectSize(wcRead.maxObjSize), - writecachebbolt.WithSmallObjectSize(wcRead.smallObjectSize), - writecachebbolt.WithFlushWorkersCount(wcRead.flushWorkerCount), - writecachebbolt.WithMaxCacheSize(wcRead.sizeLimit), - writecachebbolt.WithNoSync(wcRead.noSync), - writecachebbolt.WithLogger(c.log), - ) - case writecacheconfig.TypeBadger: - writeCacheOpts.Type = writecacheconfig.TypeBadger - writeCacheOpts.BadgerOptions = append(writeCacheOpts.BadgerOptions, - writecachebadger.WithPath(wcRead.path), - writecachebadger.WithMaxObjectSize(wcRead.maxObjSize), - writecachebadger.WithFlushWorkersCount(wcRead.flushWorkerCount), - writecachebadger.WithMaxCacheSize(wcRead.sizeLimit), - writecachebadger.WithLogger(c.log), - writecachebadger.WithGCInterval(wcRead.gcInterval), - ) - default: - panic(fmt.Sprintf("unknown writecache type: %q", wcRead.typ)) - } + writeCacheOpts = append(writeCacheOpts, + writecache.WithPath(wcRead.path), + writecache.WithMaxBatchSize(wcRead.maxBatchSize), + writecache.WithMaxBatchDelay(wcRead.maxBatchDelay), + writecache.WithMaxObjectSize(wcRead.maxObjSize), + writecache.WithSmallObjectSize(wcRead.smallObjectSize), + writecache.WithFlushWorkersCount(wcRead.flushWorkerCount), + writecache.WithMaxCacheSize(wcRead.sizeLimit), + writecache.WithNoSync(wcRead.noSync), + writecache.WithLogger(c.log), + ) } return writeCacheOpts } diff --git a/cmd/frostfs-node/config/engine/shard/writecache/config.go b/cmd/frostfs-node/config/engine/shard/writecache/config.go index d0fd4bf7..5e31e04a 100644 --- a/cmd/frostfs-node/config/engine/shard/writecache/config.go +++ b/cmd/frostfs-node/config/engine/shard/writecache/config.go @@ -1,12 +1,8 @@ package writecacheconfig import ( - "fmt" - "time" - "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" boltdbconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/boltdb" - writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config" ) // Config is a wrapper over the config section @@ -25,9 +21,6 @@ const ( // SizeLimitDefault is a default write-cache size limit. SizeLimitDefault = 1 << 30 - - // DefaultGCInterval is the default duration of the GC cycle interval. - DefaultGCInterval = 1 * time.Minute ) // From wraps config section into Config. @@ -42,22 +35,6 @@ func (x *Config) Enabled() bool { return config.Bool((*config.Config)(x), "enabled") } -// Type returns the writecache implementation type to use. -// -// Panics if the type is not recognized. -func (x *Config) Type() writecacheconfig.Type { - t := config.String((*config.Config)(x), "type") - - switch t { - case "bbolt", "": - return writecacheconfig.TypeBBolt - case "badger": - return writecacheconfig.TypeBadger - } - - panic(fmt.Sprintf("invalid writecache type: %q", t)) -} - // Path returns the value of "path" config parameter. // // Panics if the value is not a non-empty string. @@ -149,16 +126,3 @@ func (x *Config) NoSync() bool { func (x *Config) BoltDB() *boltdbconfig.Config { return (*boltdbconfig.Config)(x) } - -// GCInterval returns the value of "gc_interval" config parameter. -// -// Returns DefaultGCInterval if the value is not a positive duration. -func (x *Config) GCInterval() time.Duration { - d := config.DurationSafe((*config.Config)(x), "gc_interval") - - if d > 0 { - return d - } - - return DefaultGCInterval -} diff --git a/config/example/node.json b/config/example/node.json index c803e11f..8dda5e60 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -202,7 +202,6 @@ "resync_metabase": true, "writecache": { "enabled": true, - "type": "bbolt", "path": "tmp/1/cache", "memcache_capacity": 2147483648, "small_object_size": 16384, diff --git a/config/example/node.yaml b/config/example/node.yaml index d4c3516b..17151516 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -127,7 +127,6 @@ storage: writecache: enabled: true - type: bbolt small_object_size: 16k # size threshold for "small" objects which are cached in key-value DB, not in FS, bytes max_object_size: 134217728 # size threshold for "big" objects which bypass write-cache and go to the storage directly, bytes flush_worker_count: 30 # number of write-cache flusher threads diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index 613be128..e2334f8f 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -277,7 +277,6 @@ metabase: ```yaml writecache: enabled: true - type: bbolt path: /path/to/writecache capacity: 4294967296 small_object_size: 16384 @@ -287,7 +286,6 @@ writecache: | Parameter | Type | Default value | Description | |----------------------|------------|---------------|----------------------------------------------------------------------------------------------------------------------| -| `type` | `string` | | Type of write cache backing implementation to use (`bbolt`, `badger`). | | `path` | `string` | | Path to the metabase file. | | `capacity` | `size` | unrestricted | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. | | `small_object_size` | `size` | `32K` | Maximum object size for "small" objects. This objects are stored in a key-value database instead of a file-system. | diff --git a/go.mod b/go.mod index 2b8ffea3..c4266fe2 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,6 @@ require ( git.frostfs.info/TrueCloudLab/tzhash v1.8.0 github.com/cheggaaa/pb v1.0.29 github.com/chzyer/readline v1.5.1 - github.com/dgraph-io/ristretto v0.1.1 github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568 github.com/google/uuid v1.4.0 github.com/hashicorp/golang-lru/v2 v2.0.7 @@ -44,18 +43,11 @@ require ( ) require ( - github.com/dustin/go-humanize v1.0.1 // indirect - github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/glog v1.2.0 // indirect - github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/google/flatbuffers v23.5.26+incompatible // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect go.mongodb.org/mongo-driver v1.11.4 // indirect - go.opencensus.io v0.24.0 // indirect ) require ( @@ -71,7 +63,6 @@ require ( github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect - github.com/dgraph-io/badger/v4 v4.2.0 github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-logr/logr v1.3.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/go.sum b/go.sum index 56f4cba5..490cd4ae 100644 Binary files a/go.sum and b/go.sum differ diff --git a/internal/logs/logs.go b/internal/logs/logs.go index c3f4fdc7..6ead8370 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -291,7 +291,6 @@ const ( ShardFailureToMarkLockersAsGarbage = "failure to mark lockers as garbage" ShardFailureToGetExpiredUnlockedObjects = "failure to get expired unlocked objects" ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase" - WritecacheBadgerInitExperimental = "initializing badger-backed experimental writecache" WritecacheTriedToFlushItemsFromWritecache = "tried to flush items from write-cache" WritecacheWaitingForChannelsToFlush = "waiting for channels to flush" WritecacheFillingFlushMarksForObjectsInFSTree = "filling flush marks for objects in FSTree" @@ -300,8 +299,6 @@ const ( WritecacheFinishedUpdatingFlushMarks = "finished updating flush marks" WritecacheCantRemoveObjectsFromTheDatabase = "can't remove objects from the database" WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache" - WritecacheDBValueLogGCRunCompleted = "value log GC run completed" - WritecacheBadgerObjAlreadyScheduled = "object already scheduled for flush" BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level" BlobovniczatreeCouldNotReadPayloadRangeFromOpenedBlobovnicza = "could not read payload range from opened blobovnicza" BlobovniczatreeCouldNotReadPayloadRangeFromActiveBlobovnicza = "could not read payload range from active blobovnicza" diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go index 9cfb311e..21652970 100644 --- a/pkg/local_object_storage/engine/control_test.go +++ b/pkg/local_object_storage/engine/control_test.go @@ -20,8 +20,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" - writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" "github.com/stretchr/testify/require" @@ -44,11 +43,8 @@ func TestInitializationFailure(t *testing.T) { storages, smallFileStorage, largeFileStorage := newTestStorages(t.TempDir(), 1<<20) - wcOpts := writecacheconfig.Options{ - Type: writecacheconfig.TypeBBolt, - BBoltOptions: []writecachebbolt.Option{ - writecachebbolt.WithPath(t.TempDir()), - }, + wcOpts := []writecache.Option{ + writecache.WithPath(t.TempDir()), } return []shard.Option{ diff --git a/pkg/local_object_storage/shard/control_test.go b/pkg/local_object_storage/shard/control_test.go index f08af6fc..8a650db0 100644 --- a/pkg/local_object_storage/shard/control_test.go +++ b/pkg/local_object_storage/shard/control_test.go @@ -17,8 +17,7 @@ import ( meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" - writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" @@ -58,11 +57,8 @@ func TestShardOpen(t *testing.T) { return nil, fs.ErrPermission } - wcOpts := writecacheconfig.Options{ - Type: writecacheconfig.TypeBBolt, - BBoltOptions: []writecachebbolt.Option{ - writecachebbolt.WithPath(filepath.Join(dir, "wc")), - }, + wcOpts := []writecache.Option{ + writecache.WithPath(filepath.Join(dir, "wc")), } newShard := func() *Shard { diff --git a/pkg/local_object_storage/shard/range_test.go b/pkg/local_object_storage/shard/range_test.go index 098a584a..35dc0c6a 100644 --- a/pkg/local_object_storage/shard/range_test.go +++ b/pkg/local_object_storage/shard/range_test.go @@ -12,8 +12,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" - writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -69,11 +68,8 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) { testCase{true, "object in write-cache, out of range, big offset", 100, newRange(101, math.MaxUint64-10)}) } - wcOpts := writecacheconfig.Options{ - Type: writecacheconfig.TypeBBolt, - BBoltOptions: []writecachebbolt.Option{ - writecachebbolt.WithMaxObjectSize(writeCacheMaxSize), - }, + wcOpts := []writecache.Option{ + writecache.WithMaxObjectSize(writeCacheMaxSize), } sh := newCustomShard(t, hasWriteCache, shardOptions{ diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 56c6aee1..641b487e 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -2,7 +2,6 @@ package shard import ( "context" - "fmt" "sync" "sync/atomic" "time" @@ -13,9 +12,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" - writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" @@ -108,7 +104,7 @@ type cfg struct { metaOpts []meta.Option - writeCacheOpts writecacheconfig.Options + writeCacheOpts []writecache.Option piloramaOpts []pilorama.Option @@ -166,22 +162,11 @@ func New(opts ...Option) *Shard { s.blobStor.SetReportErrorFunc(reportFunc) if c.useWriteCache { - switch c.writeCacheOpts.Type { - case writecacheconfig.TypeBBolt: - s.writeCache = writecachebbolt.New( - append(c.writeCacheOpts.BBoltOptions, - writecachebbolt.WithReportErrorFunc(reportFunc), - writecachebbolt.WithBlobstor(bs), - writecachebbolt.WithMetabase(mb))...) - case writecacheconfig.TypeBadger: - s.writeCache = writecachebadger.New( - append(c.writeCacheOpts.BadgerOptions, - writecachebadger.WithReportErrorFunc(reportFunc), - writecachebadger.WithBlobstor(bs), - writecachebadger.WithMetabase(mb))...) - default: - panic(fmt.Sprintf("invalid writecache type: %v", c.writeCacheOpts.Type)) - } + s.writeCache = writecache.New( + append(c.writeCacheOpts, + writecache.WithReportErrorFunc(reportFunc), + writecache.WithBlobstor(bs), + writecache.WithMetabase(mb))...) } if s.piloramaOpts != nil { @@ -215,7 +200,7 @@ func WithMetaBaseOptions(opts ...meta.Option) Option { } // WithWriteCacheOptions returns option to set internal write cache options. -func WithWriteCacheOptions(opts writecacheconfig.Options) Option { +func WithWriteCacheOptions(opts []writecache.Option) Option { return func(c *cfg) { c.writeCacheOpts = opts } @@ -224,12 +209,7 @@ func WithWriteCacheOptions(opts writecacheconfig.Options) Option { // WithWriteCacheMetrics returns an option to set the metrics register used by the write cache. func WithWriteCacheMetrics(wcMetrics writecache.Metrics) Option { return func(c *cfg) { - switch c.writeCacheOpts.Type { - case writecacheconfig.TypeBBolt: - c.writeCacheOpts.BBoltOptions = append(c.writeCacheOpts.BBoltOptions, writecachebbolt.WithMetrics(wcMetrics)) - case writecacheconfig.TypeBadger: - c.writeCacheOpts.BadgerOptions = append(c.writeCacheOpts.BadgerOptions, writecachebadger.WithMetrics(wcMetrics)) - } + c.writeCacheOpts = append(c.writeCacheOpts, writecache.WithMetrics(wcMetrics)) } } diff --git a/pkg/local_object_storage/shard/shard_test.go b/pkg/local_object_storage/shard/shard_test.go index 811c7f3a..807633cd 100644 --- a/pkg/local_object_storage/shard/shard_test.go +++ b/pkg/local_object_storage/shard/shard_test.go @@ -11,9 +11,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" - writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -33,7 +31,7 @@ func (s epochState) CurrentEpoch() uint64 { type shardOptions struct { rootPath string dontRelease bool - wcOpts writecacheconfig.Options + wcOpts []writecache.Option bsOpts []blobstor.Option metaOptions []meta.Option @@ -48,22 +46,12 @@ func newCustomShard(t testing.TB, enableWriteCache bool, o shardOptions) *Shard if o.rootPath == "" { o.rootPath = t.TempDir() } - if enableWriteCache && o.wcOpts.Type == 0 { - o.wcOpts.Type = writecacheconfig.TypeBBolt - } var sh *Shard if enableWriteCache { - switch o.wcOpts.Type { - case writecacheconfig.TypeBBolt: - o.wcOpts.BBoltOptions = append( - []writecachebbolt.Option{writecachebbolt.WithPath(filepath.Join(o.rootPath, "wcache"))}, - o.wcOpts.BBoltOptions...) - case writecacheconfig.TypeBadger: - o.wcOpts.BadgerOptions = append( - []writecachebadger.Option{writecachebadger.WithPath(filepath.Join(o.rootPath, "wcache"))}, - o.wcOpts.BadgerOptions...) - } + o.wcOpts = append( + []writecache.Option{writecache.WithPath(filepath.Join(o.rootPath, "wcache"))}, + o.wcOpts...) } if o.bsOpts == nil { diff --git a/pkg/local_object_storage/shard/shutdown_test.go b/pkg/local_object_storage/shard/shutdown_test.go index 163c3a4a..b94ea50d 100644 --- a/pkg/local_object_storage/shard/shutdown_test.go +++ b/pkg/local_object_storage/shard/shutdown_test.go @@ -7,8 +7,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" - writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "github.com/stretchr/testify/require" @@ -36,11 +35,8 @@ func TestWriteCacheObjectLoss(t *testing.T) { } dir := t.TempDir() - wcOpts := writecacheconfig.Options{ - Type: writecacheconfig.TypeBBolt, - BBoltOptions: []writecachebbolt.Option{ - writecachebbolt.WithMaxObjectSize(smallSize * 2), - }, + wcOpts := []writecache.Option{ + writecache.WithMaxObjectSize(smallSize * 2), } sh := newCustomShard(t, true, shardOptions{dontRelease: true, rootPath: dir, wcOpts: wcOpts}) diff --git a/pkg/local_object_storage/writecache/benchmark/writecache_test.go b/pkg/local_object_storage/writecache/benchmark/writecache_test.go index ea4bf0d3..727d0fc7 100644 --- a/pkg/local_object_storage/writecache/benchmark/writecache_test.go +++ b/pkg/local_object_storage/writecache/benchmark/writecache_test.go @@ -3,35 +3,26 @@ package benchmark import ( "context" "testing" - "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" "github.com/stretchr/testify/require" ) func BenchmarkWritecacheSeq(b *testing.B) { const payloadSize = 8 << 10 b.Run("bbolt_seq", func(b *testing.B) { - benchmarkPutSeq(b, newBBoltCache(b), payloadSize) - }) - b.Run("badger_seq", func(b *testing.B) { - benchmarkPutSeq(b, newBadgerCache(b), payloadSize) + benchmarkPutSeq(b, newCache(b), payloadSize) }) } func BenchmarkWritecachePar(b *testing.B) { const payloadSize = 8 << 10 b.Run("bbolt_par", func(b *testing.B) { - benchmarkPutPar(b, newBBoltCache(b), payloadSize) - }) - b.Run("badger_par", func(b *testing.B) { - benchmarkPutPar(b, newBadgerCache(b), payloadSize) + benchmarkPutPar(b, newCache(b), payloadSize) }) } @@ -95,28 +86,15 @@ func (testMetabase) UpdateStorageID(context.Context, meta.UpdateStorageIDPrm) (m return meta.UpdateStorageIDRes{}, nil } -func newBBoltCache(b *testing.B) writecache.Cache { +func newCache(b *testing.B) writecache.Cache { bs := teststore.New( teststore.WithPut(func(pp common.PutPrm) (common.PutRes, error) { return common.PutRes{}, nil }), ) - return writecachebbolt.New( - writecachebbolt.WithPath(b.TempDir()), - writecachebbolt.WithBlobstor(bs), - writecachebbolt.WithMetabase(testMetabase{}), - writecachebbolt.WithMaxCacheSize(256<<30), - writecachebbolt.WithSmallObjectSize(128<<10), - ) -} - -func newBadgerCache(b *testing.B) writecache.Cache { - bs := teststore.New( - teststore.WithPut(func(pp common.PutPrm) (common.PutRes, error) { return common.PutRes{}, nil }), - ) - return writecachebadger.New( - writecachebadger.WithPath(b.TempDir()), - writecachebadger.WithBlobstor(bs), - writecachebadger.WithMetabase(testMetabase{}), - writecachebadger.WithMaxCacheSize(256<<30), - writecachebadger.WithGCInterval(10*time.Second), + return writecache.New( + writecache.WithPath(b.TempDir()), + writecache.WithBlobstor(bs), + writecache.WithMetabase(testMetabase{}), + writecache.WithMaxCacheSize(256<<30), + writecache.WithSmallObjectSize(128<<10), ) } diff --git a/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go b/pkg/local_object_storage/writecache/cachebbolt.go similarity index 92% rename from pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go rename to pkg/local_object_storage/writecache/cachebbolt.go index 9bbf5b7d..5eeda33a 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go +++ b/pkg/local_object_storage/writecache/cachebbolt.go @@ -1,4 +1,4 @@ -package writecachebbolt +package writecache import ( "context" @@ -8,7 +8,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "go.etcd.io/bbolt" @@ -58,7 +57,7 @@ const ( var defaultBucket = []byte{0} // New creates new writecache instance. -func New(opts ...Option) writecache.Cache { +func New(opts ...Option) Cache { c := &cache{ flushCh: make(chan objectInfo), mode: mode.ReadWrite, @@ -73,7 +72,7 @@ func New(opts ...Option) writecache.Cache { maxBatchSize: bbolt.DefaultMaxBatchSize, maxBatchDelay: bbolt.DefaultMaxBatchDelay, openFile: os.OpenFile, - metrics: writecache.DefaultMetrics(), + metrics: DefaultMetrics(), }, } @@ -89,8 +88,8 @@ func (c *cache) SetLogger(l *logger.Logger) { c.log = l } -func (c *cache) DumpInfo() writecache.Info { - return writecache.Info{ +func (c *cache) DumpInfo() Info { + return Info{ Path: c.path, } } diff --git a/pkg/local_object_storage/writecache/config/config.go b/pkg/local_object_storage/writecache/config/config.go deleted file mode 100644 index 91f097e1..00000000 --- a/pkg/local_object_storage/writecache/config/config.go +++ /dev/null @@ -1,22 +0,0 @@ -// Package config provides the common configuration options for write cache implementations. -package config - -import ( - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" -) - -// Type is the write cache implementation type. -type Type int - -const ( - TypeBBolt Type = iota - TypeBadger -) - -// Options are the configuration options for the write cache. -type Options struct { - Type Type - BBoltOptions []writecachebbolt.Option - BadgerOptions []writecachebadger.Option -} diff --git a/pkg/local_object_storage/writecache/writecachebbolt/delete.go b/pkg/local_object_storage/writecache/delete.go similarity index 88% rename from pkg/local_object_storage/writecache/writecachebbolt/delete.go rename to pkg/local_object_storage/writecache/delete.go index 15c83eed..0a4f4d65 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -1,4 +1,4 @@ -package writecachebbolt +package writecache import ( "context" @@ -8,7 +8,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" @@ -27,7 +26,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { defer span.End() deleted := false - storageType := writecache.StorageTypeUndefined + storageType := StorageTypeUndefined startedAt := time.Now() defer func() { c.metrics.Delete(time.Since(startedAt), deleted, storageType) @@ -36,7 +35,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { c.modeMtx.RLock() defer c.modeMtx.RUnlock() if c.readOnly() { - return writecache.ErrReadOnly + return ErrReadOnly } saddr := addr.EncodeToString() @@ -49,7 +48,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { }) if dataSize > 0 { - storageType = writecache.StorageTypeDB + storageType = StorageTypeDB var recordDeleted bool err := c.db.Update(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) @@ -74,7 +73,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { return nil } - storageType = writecache.StorageTypeFSTree + storageType = StorageTypeFSTree _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) if err == nil { storagelog.Write(c.log, diff --git a/pkg/local_object_storage/writecache/writecachebbolt/flush.go b/pkg/local_object_storage/writecache/flush.go similarity index 93% rename from pkg/local_object_storage/writecache/writecachebbolt/flush.go rename to pkg/local_object_storage/writecache/flush.go index d73e374f..f4ceec8c 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -1,4 +1,4 @@ -package writecachebbolt +package writecache import ( "bytes" @@ -12,7 +12,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -195,7 +194,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { return err } - err = c.flushObject(ctx, &obj, e.ObjectData, writecache.StorageTypeFSTree) + err = c.flushObject(ctx, &obj, e.ObjectData, StorageTypeFSTree) if err != nil { if ignoreErrors { return nil @@ -224,7 +223,7 @@ func (c *cache) workerFlushSmall(ctx context.Context) { return } - err := c.flushObject(ctx, objInfo.obj, objInfo.data, writecache.StorageTypeDB) + err := c.flushObject(ctx, objInfo.obj, objInfo.data, StorageTypeDB) if err != nil { // Error is handled in flushObject. continue @@ -235,7 +234,7 @@ func (c *cache) workerFlushSmall(ctx context.Context) { } // flushObject is used to write object directly to the main storage. -func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st writecache.StorageType) error { +func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st StorageType) error { var err error defer func() { @@ -274,7 +273,7 @@ func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []b // Write-cache must be in readonly mode to ensure correctness of an operation and // to prevent interference with background flush workers. func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error { - ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush", + ctx, span := tracing.StartSpanFromContext(ctx, "Flush", trace.WithAttributes( attribute.Bool("ignore_errors", ignoreErrors), )) @@ -315,7 +314,7 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { return err } - if err := c.flushObject(ctx, &obj, data, writecache.StorageTypeDB); err != nil { + if err := c.flushObject(ctx, &obj, data, StorageTypeDB); err != nil { return err } } diff --git a/pkg/local_object_storage/writecache/writecachetest/flush.go b/pkg/local_object_storage/writecache/flush_test.go similarity index 62% rename from pkg/local_object_storage/writecache/writecachetest/flush.go rename to pkg/local_object_storage/writecache/flush_test.go index 2c495df5..20db1de9 100644 --- a/pkg/local_object_storage/writecache/writecachetest/flush.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -1,7 +1,8 @@ -package writecachetest +package writecache import ( "context" + "os" "path/filepath" "sync/atomic" "testing" @@ -13,12 +14,103 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" + "go.uber.org/zap" ) +func TestFlush(t *testing.T) { + testlogger := test.NewLogger(t, true) + + createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs MainStorage, opts ...Option) Cache { + return New( + append([]Option{ + WithLogger(testlogger), + WithPath(filepath.Join(t.TempDir(), "writecache")), + WithSmallObjectSize(smallSize), + WithMetabase(mb), + WithBlobstor(bs), + WithDisableBackgroundFlush(), + }, opts...)...) + } + + errCountOpt := func() (Option, *atomic.Uint32) { + cnt := &atomic.Uint32{} + return WithReportErrorFunc(func(msg string, err error) { + cnt.Add(1) + testlogger.Warn(msg, zap.Uint32("error_count", cnt.Load()), zap.Error(err)) + }), cnt + } + + failures := []TestFailureInjector[Option]{ + { + Desc: "db, invalid address", + InjectFn: func(t *testing.T, wc Cache) { + c := wc.(*cache) + obj := testutil.GenerateObject() + data, err := obj.Marshal() + require.NoError(t, err) + require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + return b.Put([]byte{1, 2, 3}, data) + })) + }, + }, + { + Desc: "db, invalid object", + InjectFn: func(t *testing.T, wc Cache) { + c := wc.(*cache) + require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + k := []byte(oidtest.Address().EncodeToString()) + v := []byte{1, 2, 3} + return b.Put(k, v) + })) + }, + }, + { + Desc: "fs, read error", + InjectFn: func(t *testing.T, wc Cache) { + c := wc.(*cache) + obj := testutil.GenerateObject() + data, err := obj.Marshal() + require.NoError(t, err) + + var prm common.PutPrm + prm.Address = objectCore.AddressOf(obj) + prm.RawData = data + + _, err = c.fsTree.Put(context.Background(), prm) + require.NoError(t, err) + + p := prm.Address.Object().EncodeToString() + "." + prm.Address.Container().EncodeToString() + p = filepath.Join(c.fsTree.RootPath, p[:1], p[1:]) + + _, err = os.Stat(p) // sanity check + require.NoError(t, err) + require.NoError(t, os.Truncate(p, 0)) // corrupt the file contents, so that it can't be unmarshalled + }, + }, + { + Desc: "fs, invalid object", + InjectFn: func(t *testing.T, wc Cache) { + c := wc.(*cache) + var prm common.PutPrm + prm.Address = oidtest.Address() + prm.RawData = []byte{1, 2, 3} + _, err := c.fsTree.Put(context.Background(), prm) + require.NoError(t, err) + }, + }, + } + + runFlushTest(t, createCacheFn, errCountOpt, failures...) +} + const ( objCount = 4 smallSize = 256 @@ -28,13 +120,13 @@ type CreateCacheFunc[Option any] func( t *testing.T, smallSize uint64, meta *meta.DB, - bs writecache.MainStorage, + bs MainStorage, opts ...Option, -) writecache.Cache +) Cache type TestFailureInjector[Option any] struct { Desc string - InjectFn func(*testing.T, writecache.Cache) + InjectFn func(*testing.T, Cache) } type objectPair struct { @@ -42,7 +134,7 @@ type objectPair struct { obj *objectSDK.Object } -func TestFlush[Option any]( +func runFlushTest[Option any]( t *testing.T, createCacheFn CreateCacheFunc[Option], errCountOption func() (Option, *atomic.Uint32), @@ -105,7 +197,7 @@ func newCache[Option any]( createCacheFn CreateCacheFunc[Option], smallSize uint64, opts ...Option, -) (writecache.Cache, *blobstor.BlobStor, *meta.DB) { +) (Cache, *blobstor.BlobStor, *meta.DB) { dir := t.TempDir() mb := meta.New( meta.WithPath(filepath.Join(dir, "meta")), @@ -136,7 +228,7 @@ func newCache[Option any]( return wc, bs, mb } -func putObject(t *testing.T, c writecache.Cache, size int) objectPair { +func putObject(t *testing.T, c Cache, size int) objectPair { obj := testutil.GenerateObjectWithSize(size) data, err := obj.Marshal() require.NoError(t, err) @@ -152,7 +244,7 @@ func putObject(t *testing.T, c writecache.Cache, size int) objectPair { return objectPair{prm.Address, prm.Object} } -func putObjects(t *testing.T, c writecache.Cache) []objectPair { +func putObjects(t *testing.T, c Cache) []objectPair { objects := make([]objectPair, objCount) for i := range objects { objects[i] = putObject(t, c, 1+(i%2)*smallSize) diff --git a/pkg/local_object_storage/writecache/writecachebbolt/generic_test.go b/pkg/local_object_storage/writecache/generic_test.go similarity index 94% rename from pkg/local_object_storage/writecache/writecachebbolt/generic_test.go rename to pkg/local_object_storage/writecache/generic_test.go index 7eadd1af..a6d9e479 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/generic_test.go +++ b/pkg/local_object_storage/writecache/generic_test.go @@ -1,4 +1,4 @@ -package writecachebbolt +package writecache import ( "testing" diff --git a/pkg/local_object_storage/writecache/writecachebbolt/get.go b/pkg/local_object_storage/writecache/get.go similarity index 90% rename from pkg/local_object_storage/writecache/writecachebbolt/get.go rename to pkg/local_object_storage/writecache/get.go index 838b207b..d2496e44 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/get.go +++ b/pkg/local_object_storage/writecache/get.go @@ -1,4 +1,4 @@ -package writecachebbolt +package writecache import ( "bytes" @@ -8,7 +8,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -36,7 +35,7 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) (*objectSDK.Object, error) { found := false - storageType := writecache.StorageTypeUndefined + storageType := StorageTypeUndefined startedAt := time.Now() defer func() { c.metrics.Get(time.Since(startedAt), found, storageType) @@ -46,7 +45,7 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) if err == nil { obj := objectSDK.New() found = true - storageType = writecache.StorageTypeDB + storageType = StorageTypeDB return obj, obj.Unmarshal(value) } @@ -56,7 +55,7 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) } found = true - storageType = writecache.StorageTypeFSTree + storageType = StorageTypeFSTree return res.Object, nil } @@ -66,7 +65,7 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { saddr := addr.EncodeToString() - ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Head", + ctx, span := tracing.StartSpanFromContext(ctx, "Head", trace.WithAttributes( attribute.String("address", saddr), )) diff --git a/pkg/local_object_storage/writecache/writecachebbolt/iterate.go b/pkg/local_object_storage/writecache/iterate.go similarity index 97% rename from pkg/local_object_storage/writecache/writecachebbolt/iterate.go rename to pkg/local_object_storage/writecache/iterate.go index 530db42a..5349c069 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/iterate.go +++ b/pkg/local_object_storage/writecache/iterate.go @@ -1,4 +1,4 @@ -package writecachebbolt +package writecache import ( "errors" diff --git a/pkg/local_object_storage/writecache/writecachebbolt/mode.go b/pkg/local_object_storage/writecache/mode.go similarity index 98% rename from pkg/local_object_storage/writecache/writecachebbolt/mode.go rename to pkg/local_object_storage/writecache/mode.go index b187996a..e3ff2286 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/mode.go +++ b/pkg/local_object_storage/writecache/mode.go @@ -1,4 +1,4 @@ -package writecachebbolt +package writecache import ( "context" diff --git a/pkg/local_object_storage/writecache/writecachebbolt/options.go b/pkg/local_object_storage/writecache/options.go similarity index 92% rename from pkg/local_object_storage/writecache/writecachebbolt/options.go rename to pkg/local_object_storage/writecache/options.go index 3ea32919..c8eb1bc4 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -1,11 +1,10 @@ -package writecachebbolt +package writecache import ( "io/fs" "os" "time" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "go.uber.org/zap" ) @@ -18,9 +17,9 @@ type options struct { // path is a path to a directory for write-cache. path string // blobstor is the main persistent storage. - blobstor writecache.MainStorage + blobstor MainStorage // metabase is the metabase instance. - metabase writecache.Metabase + metabase Metabase // maxObjectSize is the maximum size of the object stored in the write-cache. maxObjectSize uint64 // smallObjectSize is the maximum size of the object stored in the database. @@ -43,7 +42,7 @@ type options struct { // openFile is the function called internally by bbolt to open database files. Useful for hermetic testing. openFile func(string, int, fs.FileMode) (*os.File, error) // metrics is metrics implementation - metrics writecache.Metrics + metrics Metrics // disableBackgroundFlush is for testing purposes only. disableBackgroundFlush bool } @@ -63,14 +62,14 @@ func WithPath(path string) Option { } // WithBlobstor sets main object storage. -func WithBlobstor(bs writecache.MainStorage) Option { +func WithBlobstor(bs MainStorage) Option { return func(o *options) { o.blobstor = bs } } // WithMetabase sets metabase. -func WithMetabase(db writecache.Metabase) Option { +func WithMetabase(db Metabase) Option { return func(o *options) { o.metabase = db } @@ -152,7 +151,7 @@ func WithOpenFile(f func(string, int, fs.FileMode) (*os.File, error)) Option { } // WithMetrics sets metrics implementation. -func WithMetrics(metrics writecache.Metrics) Option { +func WithMetrics(metrics Metrics) Option { return func(o *options) { o.metrics = metrics } diff --git a/pkg/local_object_storage/writecache/writecachebbolt/put.go b/pkg/local_object_storage/writecache/put.go similarity index 84% rename from pkg/local_object_storage/writecache/writecachebbolt/put.go rename to pkg/local_object_storage/writecache/put.go index 63fa544e..6fc655c6 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -1,27 +1,18 @@ -package writecachebbolt +package writecache import ( "context" - "errors" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) -var ( - // ErrBigObject is returned when object is too big to be placed in cache. - ErrBigObject = errors.New("too big object") - // ErrOutOfSpace is returned when there is no space left to put a new object. - ErrOutOfSpace = errors.New("no space left in the write cache") -) - // Put puts object to write-cache. // // Returns ErrReadOnly if write-cache is in R/O mode. @@ -38,7 +29,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro startedAt := time.Now() added := false - storageType := writecache.StorageTypeUndefined + storageType := StorageTypeUndefined defer func() { c.metrics.Put(time.Since(startedAt), added, storageType) }() @@ -46,7 +37,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro c.modeMtx.RLock() defer c.modeMtx.RUnlock() if c.readOnly() { - return common.PutRes{}, writecache.ErrReadOnly + return common.PutRes{}, ErrReadOnly } sz := uint64(len(prm.RawData)) @@ -61,7 +52,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro } if sz <= c.smallObjectSize { - storageType = writecache.StorageTypeDB + storageType = StorageTypeDB err := c.putSmall(oi) if err == nil { added = true @@ -69,7 +60,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro return common.PutRes{}, err } - storageType = writecache.StorageTypeFSTree + storageType = StorageTypeFSTree err := c.putBig(ctx, oi.addr, prm) if err == nil { added = true diff --git a/pkg/local_object_storage/writecache/writecachebbolt/state.go b/pkg/local_object_storage/writecache/state.go similarity index 98% rename from pkg/local_object_storage/writecache/writecachebbolt/state.go rename to pkg/local_object_storage/writecache/state.go index 9261b260..bc75aaf2 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -1,4 +1,4 @@ -package writecachebbolt +package writecache import ( "fmt" diff --git a/pkg/local_object_storage/writecache/writecachebbolt/storage.go b/pkg/local_object_storage/writecache/storage.go similarity index 93% rename from pkg/local_object_storage/writecache/writecachebbolt/storage.go rename to pkg/local_object_storage/writecache/storage.go index 7a503062..5c25a3b3 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -1,4 +1,4 @@ -package writecachebbolt +package writecache import ( "context" @@ -10,7 +10,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -78,7 +77,7 @@ func (c *cache) deleteFromDB(key string) { }) if err == nil { - c.metrics.Evict(writecache.StorageTypeDB) + c.metrics.Evict(StorageTypeDB) storagelog.Write(c.log, storagelog.AddressField(key), storagelog.StorageTypeField(wcStorageType), @@ -103,7 +102,7 @@ func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address) { storagelog.StorageTypeField(wcStorageType), storagelog.OpField("fstree DELETE"), ) - c.metrics.Evict(writecache.StorageTypeFSTree) + c.metrics.Evict(StorageTypeFSTree) // counter changed by fstree c.estimateCacheSize() } diff --git a/pkg/local_object_storage/writecache/writecachebbolt/util.go b/pkg/local_object_storage/writecache/util.go similarity index 95% rename from pkg/local_object_storage/writecache/writecachebbolt/util.go rename to pkg/local_object_storage/writecache/util.go index fe225583..0ed4a954 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/util.go +++ b/pkg/local_object_storage/writecache/util.go @@ -1,4 +1,4 @@ -package writecachebbolt +package writecache import ( "io/fs" diff --git a/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go b/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go deleted file mode 100644 index 484f0181..00000000 --- a/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go +++ /dev/null @@ -1,138 +0,0 @@ -package writecachebadger - -import ( - "context" - "sync" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.uber.org/zap" -) - -type cache struct { - options - - mode mode.Mode - modeMtx sync.RWMutex - - // flushCh is a channel with objects to flush. - flushCh chan objectInfo - // scheduled4Flush contains objects scheduled for flush via flushCh - // helps to avoid multiple flushing of one object - scheduled4Flush map[oid.Address]struct{} - scheduled4FlushMtx sync.RWMutex - // wg is a wait group for flush workers. - wg sync.WaitGroup - // store contains underlying database. - store - // cancel is cancel function, protected by modeMtx in Close. - cancel func() -} - -// wcStorageType is used for write-cache operations logging. -const wcStorageType = "write-cache" - -type objectInfo struct { - addr oid.Address - data []byte - obj *objectSDK.Object -} - -const ( - defaultMaxObjectSize = 64 << 20 // 64 MiB - defaultSmallObjectSize = 32 << 10 // 32 KiB - defaultMaxCacheSize = 1 << 30 // 1 GiB -) - -// New creates new writecache instance. -func New(opts ...Option) writecache.Cache { - c := &cache{ - flushCh: make(chan objectInfo), - mode: mode.ReadWrite, - scheduled4Flush: map[oid.Address]struct{}{}, - - options: options{ - log: &logger.Logger{Logger: zap.NewNop()}, - maxObjectSize: defaultMaxObjectSize, - workersCount: defaultFlushWorkersCount, - maxCacheSize: defaultMaxCacheSize, - metrics: writecache.DefaultMetrics(), - }, - } - - for i := range opts { - opts[i](&c.options) - } - - return c -} - -// SetLogger sets logger. It is used after the shard ID was generated to use it in logs. -func (c *cache) SetLogger(l *logger.Logger) { - c.log = l -} - -func (c *cache) DumpInfo() writecache.Info { - return writecache.Info{ - Path: c.path, - } -} - -// Open opens and initializes database. Reads object counters from the ObjectCounters instance. -func (c *cache) Open(_ context.Context, readOnly bool) error { - c.modeMtx.Lock() - defer c.modeMtx.Unlock() - - err := c.openStore(readOnly) - if err != nil { - return metaerr.Wrap(err) - } - return metaerr.Wrap(c.initCounters()) -} - -// Init runs necessary services. -func (c *cache) Init() error { - c.modeMtx.Lock() - defer c.modeMtx.Unlock() - - c.log.Info(logs.WritecacheBadgerInitExperimental) - c.metrics.SetMode(c.mode) - ctx, cancel := context.WithCancel(context.Background()) - c.cancel = cancel - c.runFlushLoop(ctx) - c.runGCLoop(ctx) - return nil -} - -// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op. -func (c *cache) Close() error { - // We cannot lock mutex for the whole operation duration - // because it is taken by some background workers, so `wg.Wait()` is done without modeMtx. - c.modeMtx.Lock() - if c.cancel != nil { - c.cancel() - c.cancel = nil - } - c.mode = mode.DegradedReadOnly // prevent new operations from being processed - c.modeMtx.Unlock() - - c.wg.Wait() - - c.modeMtx.Lock() - defer c.modeMtx.Unlock() - - var err error - if c.db != nil { - err = c.db.Close() - if err != nil { - c.db = nil - } - } - c.metrics.Close() - return nil -} diff --git a/pkg/local_object_storage/writecache/writecachebadger/delete.go b/pkg/local_object_storage/writecache/writecachebadger/delete.go deleted file mode 100644 index f96bf270..00000000 --- a/pkg/local_object_storage/writecache/writecachebadger/delete.go +++ /dev/null @@ -1,70 +0,0 @@ -package writecachebadger - -import ( - "context" - "time" - - storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" - "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "github.com/dgraph-io/badger/v4" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" -) - -// Delete removes object from write-cache. -// -// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache. -func (c *cache) Delete(ctx context.Context, addr oid.Address) error { - _, span := tracing.StartSpanFromContext(ctx, "writecache.Delete", - trace.WithAttributes( - attribute.String("address", addr.EncodeToString()), - )) - defer span.End() - - deleted := false - storageType := writecache.StorageTypeUndefined - startedAt := time.Now() - defer func() { - c.metrics.Delete(time.Since(startedAt), deleted, storageType) - }() - - c.modeMtx.RLock() - defer c.modeMtx.RUnlock() - if c.readOnly() { - return writecache.ErrReadOnly - } - - key := addr2key(addr) - - err := c.db.Update(func(tx *badger.Txn) error { - it, err := tx.Get(key[:]) - if err != nil { - if err == badger.ErrKeyNotFound { - return logicerr.Wrap(new(apistatus.ObjectNotFound)) - } - return err - } - if it.ValueSize() > 0 { - storageType = writecache.StorageTypeDB - err := tx.Delete(key[:]) - if err == nil { - storagelog.Write(c.log, - storagelog.AddressField(addr.EncodeToString()), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("db DELETE"), - ) - deleted = true - c.decDB() - } - return err - } - return nil - }) - - return metaerr.Wrap(err) -} diff --git a/pkg/local_object_storage/writecache/writecachebadger/flush.go b/pkg/local_object_storage/writecache/writecachebadger/flush.go deleted file mode 100644 index 48e31dee..00000000 --- a/pkg/local_object_storage/writecache/writecachebadger/flush.go +++ /dev/null @@ -1,286 +0,0 @@ -package writecachebadger - -import ( - "bytes" - "context" - "encoding/hex" - "errors" - "fmt" - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" - "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - "github.com/dgraph-io/badger/v4" - "github.com/dgraph-io/ristretto/z" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" - "go.uber.org/zap" -) - -const ( - // flushBatchSize is amount of keys which will be read from cache to be flushed - // to the main storage. It is used to reduce contention between cache put - // and cache persist. - flushBatchSize = 512 - // defaultFlushWorkersCount is number of workers for putting objects in main storage. - defaultFlushWorkersCount = 20 - // defaultFlushInterval is default time interval between successive flushes. - defaultFlushInterval = time.Second -) - -type collector struct { - cache *cache - scheduled int - processed int -} - -func (c *collector) send(ctx context.Context, cancel func(), buf *z.Buffer) error { - list, err := badger.BufferToKVList(buf) - if err != nil { - return err - } - for _, kv := range list.Kv { - select { - case <-ctx.Done(): - return nil - default: - } - if kv.StreamDone { - return nil - } - if c.scheduled >= flushBatchSize { - cancel() - return nil - } - if got, want := len(kv.Key), len(internalKey{}); got != want { - c.cache.log.Debug( - fmt.Sprintf("not expected db key len: got %d, want %d", got, want)) - continue - } - c.processed++ - obj := objectSDK.New() - val := bytes.Clone(kv.Value) - if err = obj.Unmarshal(val); err != nil { - continue - } - addr := objectCore.AddressOf(obj) - c.cache.scheduled4FlushMtx.RLock() - _, ok := c.cache.scheduled4Flush[addr] - c.cache.scheduled4FlushMtx.RUnlock() - if ok { - c.cache.log.Debug(logs.WritecacheBadgerObjAlreadyScheduled, zap.Stringer("obj", addr)) - continue - } - c.cache.scheduled4FlushMtx.Lock() - c.cache.scheduled4Flush[addr] = struct{}{} - c.cache.scheduled4FlushMtx.Unlock() - c.scheduled++ - select { - case c.cache.flushCh <- objectInfo{ - addr: addr, - data: val, - obj: obj, - }: - case <-ctx.Done(): - return nil - } - } - return nil -} - -// runFlushLoop starts background workers which periodically flush objects to the blobstor. -func (c *cache) runFlushLoop(ctx context.Context) { - if c.disableBackgroundFlush { - return - } - for i := 0; i < c.workersCount; i++ { - c.wg.Add(1) - go c.workerFlushSmall(ctx) - } - - c.wg.Add(1) - go func() { - defer c.wg.Done() - - tt := time.NewTimer(defaultFlushInterval) - defer tt.Stop() - - for { - select { - case <-tt.C: - c.flushSmallObjects(ctx) - tt.Reset(defaultFlushInterval) - case <-ctx.Done(): - return - } - } - }() -} - -func (c *cache) flushSmallObjects(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - default: - } - c.modeMtx.RLock() - if c.readOnly() { - c.modeMtx.RUnlock() - time.Sleep(time.Second) - continue - } - - // Using the db after Close will panic and badger won't wait for outstanding txs, - // so we need to check manually. - if c.db.IsClosed() { - c.modeMtx.RUnlock() - return - } - ctx, cancel := context.WithCancel(ctx) - coll := collector{ - cache: c, - } - stream := c.db.NewStream() - // All calls to Send are done by a single goroutine - stream.Send = func(buf *z.Buffer) error { - return coll.send(ctx, cancel, buf) - } - if err := stream.Orchestrate(ctx); err != nil { - c.log.Debug(fmt.Sprintf( - "error during flushing object from wc: %s", err)) - } - c.modeMtx.RUnlock() - if coll.scheduled == 0 { - break - } - c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache, - zap.Int("scheduled", coll.scheduled), zap.Int("processed", coll.processed)) - } -} - -func (c *cache) reportFlushError(msg string, addr string, err error) { - if c.reportError != nil { - c.reportError(msg, err) - } else { - c.log.Error(msg, - zap.String("address", addr), - zap.Error(err)) - } -} - -// workerFlushSmall writes small objects to the main storage. -func (c *cache) workerFlushSmall(ctx context.Context) { - defer c.wg.Done() - - var objInfo objectInfo - for { - // Give priority to direct put. - select { - case objInfo = <-c.flushCh: - case <-ctx.Done(): - return - } - - err := c.flushObject(ctx, objInfo.obj, objInfo.data, writecache.StorageTypeDB) - if err == nil { - c.deleteFromDB([]internalKey{addr2key(objInfo.addr)}) - } - c.scheduled4FlushMtx.Lock() - delete(c.scheduled4Flush, objInfo.addr) - c.scheduled4FlushMtx.Unlock() - } -} - -// flushObject is used to write object directly to the main storage. -func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st writecache.StorageType) error { - var err error - - defer func() { - c.metrics.Flush(err == nil, st) - }() - - addr := objectCore.AddressOf(obj) - - var prm common.PutPrm - prm.Object = obj - prm.RawData = data - - res, err := c.blobstor.Put(ctx, prm) - if err != nil { - if !errors.Is(err, common.ErrNoSpace) && !errors.Is(err, common.ErrReadOnly) && - !errors.Is(err, blobstor.ErrNoPlaceFound) { - c.reportFlushError(logs.FrostFSNodeCantFlushObjectToBlobstor, - addr.EncodeToString(), err) - } - return err - } - - var updPrm meta.UpdateStorageIDPrm - updPrm.SetAddress(addr) - updPrm.SetStorageID(res.StorageID) - - _, err = c.metabase.UpdateStorageID(ctx, updPrm) - if err != nil { - c.reportFlushError(logs.FrostFSNodeCantUpdateObjectStorageID, - addr.EncodeToString(), err) - } - return err -} - -// Flush flushes all objects from the write-cache to the main storage. -// Write-cache must be in readonly mode to ensure correctness of an operation and -// to prevent interference with background flush workers. -func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error { - ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush", - trace.WithAttributes( - attribute.Bool("ignore_errors", ignoreErrors), - )) - defer span.End() - - c.modeMtx.RLock() - defer c.modeMtx.RUnlock() - - return c.flush(ctx, ignoreErrors) -} - -func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { - return c.db.View(func(tx *badger.Txn) error { - it := tx.NewIterator(badger.DefaultIteratorOptions) - defer it.Close() - var key internalKey - for it.Rewind(); it.Valid(); it.Next() { - if got, want := int(it.Item().KeySize()), len(key); got != want { - err := fmt.Errorf("invalid db key len: got %d, want %d", got, want) - c.reportFlushError(logs.FrostFSNodeCantDecodeObjectAddressFromDB, hex.EncodeToString(it.Item().Key()), metaerr.Wrap(err)) - if ignoreErrors { - continue - } - return err - } - if err := it.Item().Value(func(data []byte) error { - var obj objectSDK.Object - if err := obj.Unmarshal(data); err != nil { - copy(key[:], it.Item().Key()) - c.reportFlushError(logs.FrostFSNodeCantUnmarshalObjectFromDB, key.address().EncodeToString(), metaerr.Wrap(err)) - if ignoreErrors { - return nil - } - return err - } - - return c.flushObject(ctx, &obj, data, writecache.StorageTypeDB) - }); err != nil { - return err - } - } - return nil - }) -} diff --git a/pkg/local_object_storage/writecache/writecachebadger/flush_test.go b/pkg/local_object_storage/writecache/writecachebadger/flush_test.go deleted file mode 100644 index 5e3f60ea..00000000 --- a/pkg/local_object_storage/writecache/writecachebadger/flush_test.go +++ /dev/null @@ -1,69 +0,0 @@ -package writecachebadger - -import ( - "path/filepath" - "sync/atomic" - "testing" - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachetest" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" - oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" - "github.com/dgraph-io/badger/v4" - "github.com/stretchr/testify/require" - "go.uber.org/zap" -) - -func TestFlush(t *testing.T) { - testlogger := test.NewLogger(t, true) - - createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs writecache.MainStorage, opts ...Option) writecache.Cache { - return New( - append([]Option{ - WithLogger(test.NewLogger(t, true)), - WithPath(filepath.Join(t.TempDir(), "writecache")), - WithMetabase(mb), - WithBlobstor(bs), - WithGCInterval(1 * time.Second), - WithDisableBackgroundFlush(), - }, opts...)...) - } - - errCountOpt := func() (Option, *atomic.Uint32) { - cnt := &atomic.Uint32{} - return WithReportErrorFunc(func(msg string, err error) { - cnt.Add(1) - testlogger.Warn(msg, zap.Uint32("error_count", cnt.Load()), zap.Error(err)) - }), cnt - } - - failures := []writecachetest.TestFailureInjector[Option]{ - { - Desc: "db, invalid address", - InjectFn: func(t *testing.T, wc writecache.Cache) { - c := wc.(*cache) - obj := testutil.GenerateObject() - data, err := obj.Marshal() - require.NoError(t, err) - require.NoError(t, c.db.Update(func(tx *badger.Txn) error { - return tx.Set([]byte{1, 2, 3}, data) - })) - }, - }, - { - Desc: "db, invalid object", - InjectFn: func(t *testing.T, wc writecache.Cache) { - c := wc.(*cache) - key := addr2key(oidtest.Address()) - require.NoError(t, c.db.Update(func(tx *badger.Txn) error { - return tx.Set(key[:], []byte{1, 2, 3}) - })) - }, - }, - } - - writecachetest.TestFlush(t, createCacheFn, errCountOpt, failures...) -} diff --git a/pkg/local_object_storage/writecache/writecachebadger/gc.go b/pkg/local_object_storage/writecache/writecachebadger/gc.go deleted file mode 100644 index 8937ff29..00000000 --- a/pkg/local_object_storage/writecache/writecachebadger/gc.go +++ /dev/null @@ -1,45 +0,0 @@ -package writecachebadger - -import ( - "context" - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" -) - -func (c *cache) runGCLoop(ctx context.Context) { - c.wg.Add(1) - - go func() { - defer c.wg.Done() - - t := time.NewTicker(c.gcInterval) - defer t.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-t.C: - c.runGC() - } - } - }() -} - -func (c *cache) runGC() { - // This serves to synchronize the c.db field when changing mode as well. - c.modeMtx.RLock() - defer c.modeMtx.RUnlock() - - ro := c.readOnly() - if ro { - return - } - - // 0.5 is the recommended value so that write amplification of the value log is 2. - // See https://pkg.go.dev/github.com/dgraph-io/badger/v4#DB.RunValueLogGC for more info. - for c.db.RunValueLogGC(0.5) == nil { - c.log.Debug(logs.WritecacheDBValueLogGCRunCompleted) - } -} diff --git a/pkg/local_object_storage/writecache/writecachebadger/generic_test.go b/pkg/local_object_storage/writecache/writecachebadger/generic_test.go deleted file mode 100644 index 08845665..00000000 --- a/pkg/local_object_storage/writecache/writecachebadger/generic_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package writecachebadger - -import ( - "testing" - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/storagetest" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" -) - -func TestGeneric(t *testing.T) { - storagetest.TestAll(t, func(t *testing.T) storagetest.Component { - return New( - WithLogger(test.NewLogger(t, true)), - WithFlushWorkersCount(2), - WithPath(t.TempDir()), - WithGCInterval(1*time.Second)) - }) -} diff --git a/pkg/local_object_storage/writecache/writecachebadger/get.go b/pkg/local_object_storage/writecache/writecachebadger/get.go deleted file mode 100644 index 42403e55..00000000 --- a/pkg/local_object_storage/writecache/writecachebadger/get.go +++ /dev/null @@ -1,95 +0,0 @@ -package writecachebadger - -import ( - "context" - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" - "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "github.com/dgraph-io/badger/v4" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" -) - -// Get returns object from write-cache. -// -// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache. -func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { - _, span := tracing.StartSpanFromContext(ctx, "writecache.Get", - trace.WithAttributes( - attribute.String("address", addr.EncodeToString()), - )) - defer span.End() - - obj, err := c.getInternal(addr) - return obj, metaerr.Wrap(err) -} - -func (c *cache) getInternal(addr oid.Address) (*objectSDK.Object, error) { - found := false - storageType := writecache.StorageTypeUndefined - startedAt := time.Now() - defer func() { - c.metrics.Get(time.Since(startedAt), found, storageType) - }() - - k := addr2key(addr) - value, err := Get(c.db, k[:]) - if err == nil { - obj := objectSDK.New() - found = true - storageType = writecache.StorageTypeDB - return obj, obj.Unmarshal(value) - } - - return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) -} - -// Head returns object header from write-cache. -// -// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache. -func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { - _, span := tracing.StartSpanFromContext(ctx, "writecache.Head", - trace.WithAttributes( - attribute.String("address", addr.EncodeToString()), - )) - defer span.End() - - obj, err := c.getInternal(addr) - if err != nil { - return nil, metaerr.Wrap(err) - } - - return obj.CutPayload(), nil -} - -// Get fetches object from the underlying database. -// Key should be a stringified address. -// -// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in db. -func Get(db *badger.DB, key []byte) ([]byte, error) { - var value []byte - - err := db.View(func(tx *badger.Txn) error { - it, err := tx.Get(key) - if err != nil { - if err == badger.ErrKeyNotFound { - return logicerr.Wrap(new(apistatus.ObjectNotFound)) - } - return err - } - v, err := it.ValueCopy(nil) - if err != nil { - return err - } - value = v - return nil - }) - - return value, metaerr.Wrap(err) -} diff --git a/pkg/local_object_storage/writecache/writecachebadger/iterate.go b/pkg/local_object_storage/writecache/writecachebadger/iterate.go deleted file mode 100644 index 11124204..00000000 --- a/pkg/local_object_storage/writecache/writecachebadger/iterate.go +++ /dev/null @@ -1,32 +0,0 @@ -package writecachebadger - -import ( - "fmt" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "github.com/dgraph-io/badger/v4" -) - -// IterateDB iterates over all objects stored in badger.DB instance and passes them to f until error return. -// It is assumed that db is an underlying database of some WriteCache instance. -// -// DB must not be nil and should be opened. -func IterateDB(db *badger.DB, f func(oid.Address) error) error { - return metaerr.Wrap(db.View(func(tx *badger.Txn) error { - opts := badger.DefaultIteratorOptions - opts.PrefetchValues = false - it := tx.NewIterator(opts) - for it.Rewind(); it.Valid(); it.Next() { - var key internalKey - if got, want := len(it.Item().Key()), len(key); got != want { - return fmt.Errorf("invalid db key len: got %d, want %d", got, want) - } - copy(key[:], it.Item().Key()) - if err := f(key.address()); err != nil { - return err - } - } - return nil - })) -} diff --git a/pkg/local_object_storage/writecache/writecachebadger/mode.go b/pkg/local_object_storage/writecache/writecachebadger/mode.go deleted file mode 100644 index 03d86183..00000000 --- a/pkg/local_object_storage/writecache/writecachebadger/mode.go +++ /dev/null @@ -1,78 +0,0 @@ -package writecachebadger - -import ( - "context" - "fmt" - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" - "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" -) - -// SetMode sets write-cache mode of operation. -// When shard is put in read-only mode all objects in memory are flushed to disk -// and all background jobs are suspended. -func (c *cache) SetMode(m mode.Mode) error { - ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.SetMode", - trace.WithAttributes( - attribute.String("mode", m.String()), - )) - defer span.End() - - c.modeMtx.Lock() - defer c.modeMtx.Unlock() - - err := c.setMode(ctx, m) - if err == nil { - c.metrics.SetMode(m) - } - return err -} - -// setMode applies new mode. Must be called with cache.modeMtx lock taken. -func (c *cache) setMode(ctx context.Context, m mode.Mode) error { - var err error - turnOffMeta := m.NoMetabase() - - if turnOffMeta && !c.mode.NoMetabase() { - err = c.flush(ctx, true) - if err != nil { - return err - } - } - - if c.db != nil { - if err = c.db.Close(); err != nil { - return fmt.Errorf("can't close write-cache database: %w", err) - } - } - - // Suspend producers to ensure there are channel send operations in fly. - // flushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty - // guarantees that there are no in-fly operations. - for len(c.flushCh) != 0 { - c.log.Info(logs.WritecacheWaitingForChannelsToFlush) - time.Sleep(time.Second) - } - - if turnOffMeta { - c.mode = m - return nil - } - - if err = c.openStore(m.ReadOnly()); err != nil { - return err - } - - c.mode = m - return nil -} - -// readOnly returns true if current mode is read-only. -// `c.modeMtx` must be taken. -func (c *cache) readOnly() bool { - return c.mode.ReadOnly() -} diff --git a/pkg/local_object_storage/writecache/writecachebadger/options.go b/pkg/local_object_storage/writecache/writecachebadger/options.go deleted file mode 100644 index d041a9b8..00000000 --- a/pkg/local_object_storage/writecache/writecachebadger/options.go +++ /dev/null @@ -1,119 +0,0 @@ -package writecachebadger - -import ( - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - "go.uber.org/zap" -) - -// Option represents write-cache configuration option. -type Option func(*options) - -type options struct { - log *logger.Logger - // path is a path to a directory for write-cache. - path string - // blobstor is the main persistent storage. - blobstor writecache.MainStorage - // metabase is the metabase instance. - metabase writecache.Metabase - // maxObjectSize is the maximum size of the object stored in the write-cache. - maxObjectSize uint64 - // workersCount is the number of workers flushing objects in parallel. - workersCount int - // maxCacheSize is the maximum total size of all objects saved in cache (DB + FS). - // 1 GiB by default. - maxCacheSize uint64 - // objCounters contains atomic counters for the number of objects stored in cache. - objCounters counters - // reportError is the function called when encountering disk errors in background workers. - reportError func(string, error) - // metrics is metrics implementation - metrics writecache.Metrics - // gcInterval is the interval duration to run the GC cycle. - gcInterval time.Duration - // disableBackgroundFlush is for testing purposes only. - disableBackgroundFlush bool -} - -// WithLogger sets logger. -func WithLogger(log *logger.Logger) Option { - return func(o *options) { - o.log = &logger.Logger{Logger: log.With(zap.String("component", "WriteCache"))} - } -} - -// WithPath sets path to writecache db. -func WithPath(path string) Option { - return func(o *options) { - o.path = path - } -} - -// WithBlobstor sets main object storage. -func WithBlobstor(bs writecache.MainStorage) Option { - return func(o *options) { - o.blobstor = bs - } -} - -// WithMetabase sets metabase. -func WithMetabase(db writecache.Metabase) Option { - return func(o *options) { - o.metabase = db - } -} - -// WithMaxObjectSize sets maximum object size to be stored in write-cache. -func WithMaxObjectSize(sz uint64) Option { - return func(o *options) { - if sz > 0 { - o.maxObjectSize = sz - } - } -} - -func WithFlushWorkersCount(c int) Option { - return func(o *options) { - if c > 0 { - o.workersCount = c - } - } -} - -// WithMaxCacheSize sets maximum write-cache size in bytes. -func WithMaxCacheSize(sz uint64) Option { - return func(o *options) { - o.maxCacheSize = sz - } -} - -// WithReportErrorFunc sets error reporting function. -func WithReportErrorFunc(f func(string, error)) Option { - return func(o *options) { - o.reportError = f - } -} - -// WithMetrics sets metrics implementation. -func WithMetrics(metrics writecache.Metrics) Option { - return func(o *options) { - o.metrics = metrics - } -} - -// WithGCInterval sets the duration of the interval to run GC cycles. -func WithGCInterval(d time.Duration) Option { - return func(o *options) { - o.gcInterval = d - } -} - -// WithDisableBackgroundFlush disables background flush, for testing purposes only. -func WithDisableBackgroundFlush() Option { - return func(o *options) { - o.disableBackgroundFlush = true - } -} diff --git a/pkg/local_object_storage/writecache/writecachebadger/put.go b/pkg/local_object_storage/writecache/writecachebadger/put.go deleted file mode 100644 index 2071ba1d..00000000 --- a/pkg/local_object_storage/writecache/writecachebadger/put.go +++ /dev/null @@ -1,82 +0,0 @@ -package writecachebadger - -import ( - "context" - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" - storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" - "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" -) - -// Put puts object to write-cache. -// -// Returns ErrReadOnly if write-cache is in R/O mode. -// Returns ErrNotInitialized if write-cache has not been initialized yet. -// Returns ErrOutOfSpace if saving an object leads to WC's size overflow. -// Returns ErrBigObject if an objects exceeds maximum object size. -func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) { - _, span := tracing.StartSpanFromContext(ctx, "writecache.Put", - trace.WithAttributes( - attribute.String("address", prm.Address.EncodeToString()), - attribute.Bool("dont_compress", prm.DontCompress), - )) - defer span.End() - - startedAt := time.Now() - added := false - storageType := writecache.StorageTypeUndefined - defer func() { - c.metrics.Put(time.Since(startedAt), added, storageType) - }() - - c.modeMtx.RLock() - defer c.modeMtx.RUnlock() - if c.readOnly() { - return common.PutRes{}, writecache.ErrReadOnly - } - - sz := uint64(len(prm.RawData)) - if sz > c.maxObjectSize { - return common.PutRes{}, writecache.ErrBigObject - } - - oi := objectInfo{ - addr: prm.Address, - obj: prm.Object, - data: prm.RawData, - } - - storageType = writecache.StorageTypeDB - err := c.put(oi) - if err == nil { - added = true - } - return common.PutRes{}, err -} - -// put persists objects to the write-cache database and -// pushes the to the flush workers queue. -func (c *cache) put(obj objectInfo) error { - cacheSize := c.estimateCacheSize() - if c.maxCacheSize < c.incSizeDB(cacheSize) { - return writecache.ErrOutOfSpace - } - - wb := c.db.NewWriteBatch() - k := addr2key(obj.addr) - _ = wb.Set(k[:], obj.data) - err := wb.Flush() - if err == nil { - storagelog.Write(c.log, - storagelog.AddressField(obj.addr), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("db PUT"), - ) - c.incDB() - } - return err -} diff --git a/pkg/local_object_storage/writecache/writecachebadger/state.go b/pkg/local_object_storage/writecache/writecachebadger/state.go deleted file mode 100644 index e098eb06..00000000 --- a/pkg/local_object_storage/writecache/writecachebadger/state.go +++ /dev/null @@ -1,67 +0,0 @@ -package writecachebadger - -import ( - "fmt" - "math" - "sync/atomic" - - "github.com/dgraph-io/badger/v4" -) - -func (c *cache) estimateCacheSize() uint64 { - onDiskSize, _ := c.db.EstimateSize(nil) - c.metrics.SetEstimateSize(onDiskSize, 0) - return onDiskSize -} - -func (c *cache) incSizeDB(sz uint64) uint64 { - return sz + c.maxObjectSize -} - -type counters struct { - cDB atomic.Uint64 -} - -func (x *counters) IncDB() { - x.cDB.Add(1) -} - -func (x *counters) DecDB() { - x.cDB.Add(math.MaxUint64) -} - -func (x *counters) DB() uint64 { - return x.cDB.Load() -} - -func (c *cache) initCounters() error { - var inDB uint64 - err := c.db.View(func(tx *badger.Txn) error { - opts := badger.DefaultIteratorOptions - opts.PrefetchValues = false - it := tx.NewIterator(opts) - defer it.Close() - for it.Rewind(); it.Valid(); it.Next() { - inDB++ - } - return nil - }) - if err != nil { - return fmt.Errorf("could not read write-cache DB counter: %w", err) - } - - c.objCounters.cDB.Store(inDB) - c.metrics.SetActualCounters(inDB, 0) - - return nil -} - -func (c *cache) incDB() { - c.objCounters.IncDB() - c.metrics.SetActualCounters(c.objCounters.DB(), 0) -} - -func (c *cache) decDB() { - c.objCounters.DecDB() - c.metrics.SetActualCounters(c.objCounters.DB(), 0) -} diff --git a/pkg/local_object_storage/writecache/writecachebadger/storage.go b/pkg/local_object_storage/writecache/writecachebadger/storage.go deleted file mode 100644 index 04337b7a..00000000 --- a/pkg/local_object_storage/writecache/writecachebadger/storage.go +++ /dev/null @@ -1,91 +0,0 @@ -package writecachebadger - -import ( - "fmt" - "os" - "path/filepath" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "github.com/dgraph-io/badger/v4" - "go.uber.org/zap" -) - -// store represents persistent storage with in-memory LRU cache -// for flushed items on top of it. -type store struct { - db *badger.DB -} - -type internalKey [len(cid.ID{}) + len(oid.ID{})]byte - -func (k internalKey) address() oid.Address { - var addr oid.Address - var cnr cid.ID - var obj oid.ID - copy(cnr[:], k[:len(cnr)]) - copy(obj[:], k[len(cnr):]) - addr.SetContainer(cnr) - addr.SetObject(obj) - return addr -} - -func addr2key(addr oid.Address) internalKey { - var key internalKey - cnr, obj := addr.Container(), addr.Object() - copy(key[:len(cnr)], cnr[:]) - copy(key[len(cnr):], obj[:]) - return key -} - -const dbName = "small.badger" - -func (c *cache) openStore(readOnly bool) error { - err := util.MkdirAllX(c.path, os.ModePerm) - if err != nil { - return err - } - - c.db, err = OpenDB(filepath.Join(c.path, dbName), readOnly, c.log) - if err != nil { - return fmt.Errorf("could not open database: %w", err) - } - - return nil -} - -func (c *cache) deleteFromDB(keys []internalKey) []internalKey { - if len(keys) == 0 { - return keys - } - - wb := c.db.NewWriteBatch() - - var errorIndex int - for errorIndex = range keys { - if err := wb.Delete(keys[errorIndex][:]); err != nil { - break - } - } - - for i := 0; i < errorIndex; i++ { - c.decDB() - c.metrics.Evict(writecache.StorageTypeDB) - storagelog.Write(c.log, - storagelog.AddressField(keys[i]), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("db DELETE"), - ) - } - - if err := wb.Flush(); err != nil { - c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err)) - } - - copy(keys, keys[errorIndex:]) - return keys[:len(keys)-errorIndex] -} diff --git a/pkg/local_object_storage/writecache/writecachebadger/util.go b/pkg/local_object_storage/writecache/writecachebadger/util.go deleted file mode 100644 index e6079e37..00000000 --- a/pkg/local_object_storage/writecache/writecachebadger/util.go +++ /dev/null @@ -1,39 +0,0 @@ -package writecachebadger - -import ( - "fmt" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - "github.com/dgraph-io/badger/v4" - badgeroptions "github.com/dgraph-io/badger/v4/options" -) - -// OpenDB opens a badger instance for write-cache. Opens in read-only mode if ro is true. -func OpenDB(p string, ro bool, l *logger.Logger) (*badger.DB, error) { - return badger.Open(badger.DefaultOptions(p). - WithReadOnly(ro). - WithSyncWrites(true). - WithCompression(badgeroptions.None). - WithLoggingLevel(badger.ERROR). - WithLogger(badgerLoggerWrapper{l})) -} - -type badgerLoggerWrapper struct { - l *logger.Logger -} - -func (w badgerLoggerWrapper) Errorf(msg string, args ...any) { - w.l.Error(fmt.Sprintf(msg, args...)) -} - -func (w badgerLoggerWrapper) Warningf(msg string, args ...any) { - w.l.Warn(fmt.Sprintf(msg, args...)) -} - -func (w badgerLoggerWrapper) Infof(msg string, args ...any) { - w.l.Info(fmt.Sprintf(msg, args...)) -} - -func (w badgerLoggerWrapper) Debugf(msg string, args ...any) { - w.l.Debug(fmt.Sprintf(msg, args...)) -} diff --git a/pkg/local_object_storage/writecache/writecachebbolt/flush_test.go b/pkg/local_object_storage/writecache/writecachebbolt/flush_test.go deleted file mode 100644 index 89add811..00000000 --- a/pkg/local_object_storage/writecache/writecachebbolt/flush_test.go +++ /dev/null @@ -1,109 +0,0 @@ -package writecachebbolt - -import ( - "context" - "os" - "path/filepath" - "sync/atomic" - "testing" - - objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachetest" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" - oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" - "github.com/stretchr/testify/require" - "go.etcd.io/bbolt" - "go.uber.org/zap" -) - -func TestFlush(t *testing.T) { - testlogger := test.NewLogger(t, true) - - createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs writecache.MainStorage, opts ...Option) writecache.Cache { - return New( - append([]Option{ - WithLogger(testlogger), - WithPath(filepath.Join(t.TempDir(), "writecache")), - WithSmallObjectSize(smallSize), - WithMetabase(mb), - WithBlobstor(bs), - WithDisableBackgroundFlush(), - }, opts...)...) - } - - errCountOpt := func() (Option, *atomic.Uint32) { - cnt := &atomic.Uint32{} - return WithReportErrorFunc(func(msg string, err error) { - cnt.Add(1) - testlogger.Warn(msg, zap.Uint32("error_count", cnt.Load()), zap.Error(err)) - }), cnt - } - - failures := []writecachetest.TestFailureInjector[Option]{ - { - Desc: "db, invalid address", - InjectFn: func(t *testing.T, wc writecache.Cache) { - c := wc.(*cache) - obj := testutil.GenerateObject() - data, err := obj.Marshal() - require.NoError(t, err) - require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - return b.Put([]byte{1, 2, 3}, data) - })) - }, - }, - { - Desc: "db, invalid object", - InjectFn: func(t *testing.T, wc writecache.Cache) { - c := wc.(*cache) - require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - k := []byte(oidtest.Address().EncodeToString()) - v := []byte{1, 2, 3} - return b.Put(k, v) - })) - }, - }, - { - Desc: "fs, read error", - InjectFn: func(t *testing.T, wc writecache.Cache) { - c := wc.(*cache) - obj := testutil.GenerateObject() - data, err := obj.Marshal() - require.NoError(t, err) - - var prm common.PutPrm - prm.Address = objectCore.AddressOf(obj) - prm.RawData = data - - _, err = c.fsTree.Put(context.Background(), prm) - require.NoError(t, err) - - p := prm.Address.Object().EncodeToString() + "." + prm.Address.Container().EncodeToString() - p = filepath.Join(c.fsTree.RootPath, p[:1], p[1:]) - - _, err = os.Stat(p) // sanity check - require.NoError(t, err) - require.NoError(t, os.Truncate(p, 0)) // corrupt the file contents, so that it can't be unmarshalled - }, - }, - { - Desc: "fs, invalid object", - InjectFn: func(t *testing.T, wc writecache.Cache) { - c := wc.(*cache) - var prm common.PutPrm - prm.Address = oidtest.Address() - prm.RawData = []byte{1, 2, 3} - _, err := c.fsTree.Put(context.Background(), prm) - require.NoError(t, err) - }, - }, - } - - writecachetest.TestFlush(t, createCacheFn, errCountOpt, failures...) -}