diff --git a/cmd/frostfs-cli/modules/container/list.go b/cmd/frostfs-cli/modules/container/list.go index f01e4db4d..4afca6c17 100644 --- a/cmd/frostfs-cli/modules/container/list.go +++ b/cmd/frostfs-cli/modules/container/list.go @@ -51,7 +51,7 @@ var listContainersCmd = &cobra.Command{ var prm internalclient.ListContainersPrm prm.SetClient(cli) - prm.Account = idUser + prm.OwnerID = idUser res, err := internalclient.ListContainers(cmd.Context(), prm) commonCmd.ExitOnErr(cmd, "rpc error: %w", err) diff --git a/cmd/frostfs-cli/modules/object/nodes.go b/cmd/frostfs-cli/modules/object/nodes.go index e6918dfc9..31682c0e1 100644 --- a/cmd/frostfs-cli/modules/object/nodes.go +++ b/cmd/frostfs-cli/modules/object/nodes.go @@ -1,15 +1,12 @@ package object import ( - "bytes" - "cmp" "context" "crypto/ecdsa" "encoding/hex" "encoding/json" "errors" "fmt" - "slices" "sync" internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client" @@ -507,7 +504,6 @@ func isObjectStoredOnNode(ctx context.Context, cmd *cobra.Command, cnrID cid.ID, } func printPlacement(cmd *cobra.Command, objID oid.ID, objects []phyObject, result *objectNodesResult) { - normilizeObjectNodesResult(objects, result) if json, _ := cmd.Flags().GetBool(commonflags.JSON); json { printObjectNodesAsJSON(cmd, objID, objects, result) } else { @@ -515,34 +511,6 @@ func printPlacement(cmd *cobra.Command, objID oid.ID, objects []phyObject, resul } } -func normilizeObjectNodesResult(objects []phyObject, result *objectNodesResult) { - slices.SortFunc(objects, func(lhs, rhs phyObject) int { - if lhs.ecHeader == nil && rhs.ecHeader == nil { - return bytes.Compare(lhs.objectID[:], rhs.objectID[:]) - } - if lhs.ecHeader == nil { - return -1 - } - if rhs.ecHeader == nil { - return 1 - } - if lhs.ecHeader.parent == rhs.ecHeader.parent { - return cmp.Compare(lhs.ecHeader.index, rhs.ecHeader.index) - } - return bytes.Compare(lhs.ecHeader.parent[:], rhs.ecHeader.parent[:]) - }) - for _, obj := range objects { - op := result.placements[obj.objectID] - slices.SortFunc(op.confirmedNodes, func(lhs, rhs netmapSDK.NodeInfo) int { - return bytes.Compare(lhs.PublicKey(), rhs.PublicKey()) - }) - slices.SortFunc(op.requiredNodes, func(lhs, rhs netmapSDK.NodeInfo) int { - return bytes.Compare(lhs.PublicKey(), rhs.PublicKey()) - }) - result.placements[obj.objectID] = op - } -} - func printObjectNodesAsText(cmd *cobra.Command, objID oid.ID, objects []phyObject, result *objectNodesResult) { fmt.Fprintf(cmd.OutOrStdout(), "Object %s stores payload in %d data objects:\n", objID.EncodeToString(), len(objects)) diff --git a/cmd/frostfs-lens/internal/writecache/inspect.go b/cmd/frostfs-lens/internal/writecache/inspect.go index afc986c8b..63c669a35 100644 --- a/cmd/frostfs-lens/internal/writecache/inspect.go +++ b/cmd/frostfs-lens/internal/writecache/inspect.go @@ -25,7 +25,7 @@ func init() { func inspectFunc(cmd *cobra.Command, _ []string) { var data []byte - db, err := writecache.OpenDB(vPath, true, os.OpenFile) + db, err := writecache.OpenDB(vPath, true, os.OpenFile, 0) common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) defer db.Close() diff --git a/cmd/frostfs-lens/internal/writecache/list.go b/cmd/frostfs-lens/internal/writecache/list.go index bcbae0ec9..9c8fa6138 100644 --- a/cmd/frostfs-lens/internal/writecache/list.go +++ b/cmd/frostfs-lens/internal/writecache/list.go @@ -31,7 +31,7 @@ func listFunc(cmd *cobra.Command, _ []string) { return err } - db, err := writecache.OpenDB(vPath, true, os.OpenFile) + db, err := writecache.OpenDB(vPath, true, os.OpenFile, 0) common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) defer db.Close() diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 5af37865f..29c338184 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -154,12 +154,15 @@ type shardCfg struct { writecacheCfg struct { enabled bool path string + maxBatchSize int + maxBatchDelay time.Duration + smallObjectSize uint64 maxObjSize uint64 flushWorkerCount int sizeLimit uint64 countLimit uint64 noSync bool - flushSizeLimit uint64 + pageSize int } piloramaCfg struct { @@ -289,12 +292,15 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg, wc.enabled = true wc.path = writeCacheCfg.Path() + wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize() + wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay() + wc.pageSize = writeCacheCfg.BoltDB().PageSize() wc.maxObjSize = writeCacheCfg.MaxObjectSize() + wc.smallObjectSize = writeCacheCfg.SmallObjectSize() wc.flushWorkerCount = writeCacheCfg.WorkerCount() wc.sizeLimit = writeCacheCfg.SizeLimit() wc.countLimit = writeCacheCfg.CountLimit() wc.noSync = writeCacheCfg.NoSync() - wc.flushSizeLimit = writeCacheCfg.MaxFlushingObjectsSize() } } @@ -910,8 +916,11 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option { if wcRead := shCfg.writecacheCfg; wcRead.enabled { writeCacheOpts = append(writeCacheOpts, writecache.WithPath(wcRead.path), - writecache.WithFlushSizeLimit(wcRead.flushSizeLimit), + writecache.WithMaxBatchSize(wcRead.maxBatchSize), + writecache.WithMaxBatchDelay(wcRead.maxBatchDelay), + writecache.WithPageSize(wcRead.pageSize), writecache.WithMaxObjectSize(wcRead.maxObjSize), + writecache.WithSmallObjectSize(wcRead.smallObjectSize), writecache.WithFlushWorkersCount(wcRead.flushWorkerCount), writecache.WithMaxCacheSize(wcRead.sizeLimit), writecache.WithMaxCacheCount(wcRead.countLimit), diff --git a/cmd/frostfs-node/config/engine/config.go b/cmd/frostfs-node/config/engine/config.go index c944d1c58..e5735e88b 100644 --- a/cmd/frostfs-node/config/engine/config.go +++ b/cmd/frostfs-node/config/engine/config.go @@ -41,6 +41,10 @@ func IterateShards(c *config.Config, required bool, f func(*shardconfig.Config) c.Sub(si), ) + if sc.Mode() == mode.Disabled { + continue + } + // Path for the blobstor can't be present in the default section, because different shards // must have different paths, so if it is missing, the shard is not here. // At the same time checking for "blobstor" section doesn't work proper @@ -50,10 +54,6 @@ func IterateShards(c *config.Config, required bool, f func(*shardconfig.Config) } (*config.Config)(sc).SetDefault(def) - if sc.Mode() == mode.Disabled { - continue - } - if err := f(sc); err != nil { return err } diff --git a/cmd/frostfs-node/config/engine/config_test.go b/cmd/frostfs-node/config/engine/config_test.go index 19ad0e7ac..337724afb 100644 --- a/cmd/frostfs-node/config/engine/config_test.go +++ b/cmd/frostfs-node/config/engine/config_test.go @@ -18,6 +18,22 @@ import ( "github.com/stretchr/testify/require" ) +func TestIterateShards(t *testing.T) { + fileConfigTest := func(c *config.Config) { + var res []string + require.NoError(t, + engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { + res = append(res, sc.Metabase().Path()) + return nil + })) + require.Equal(t, []string{"abc", "xyz"}, res) + } + + const cfgDir = "./testdata/shards" + configtest.ForEachFileType(cfgDir, fileConfigTest) + configtest.ForEnvFileType(t, cfgDir, fileConfigTest) +} + func TestEngineSection(t *testing.T) { t.Run("defaults", func(t *testing.T) { empty := configtest.EmptyConfig() @@ -73,11 +89,12 @@ func TestEngineSection(t *testing.T) { require.Equal(t, true, wc.NoSync()) require.Equal(t, "tmp/0/cache", wc.Path()) + require.EqualValues(t, 16384, wc.SmallObjectSize()) require.EqualValues(t, 134217728, wc.MaxObjectSize()) require.EqualValues(t, 30, wc.WorkerCount()) require.EqualValues(t, 3221225472, wc.SizeLimit()) + require.EqualValues(t, 4096, wc.BoltDB().PageSize()) require.EqualValues(t, 49, wc.CountLimit()) - require.EqualValues(t, uint64(100), wc.MaxFlushingObjectsSize()) require.Equal(t, "tmp/0/meta", meta.Path()) require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm()) @@ -129,11 +146,12 @@ func TestEngineSection(t *testing.T) { require.Equal(t, false, wc.NoSync()) require.Equal(t, "tmp/1/cache", wc.Path()) + require.EqualValues(t, 16384, wc.SmallObjectSize()) require.EqualValues(t, 134217728, wc.MaxObjectSize()) require.EqualValues(t, 30, wc.WorkerCount()) require.EqualValues(t, 4294967296, wc.SizeLimit()) + require.EqualValues(t, 0, wc.BoltDB().PageSize()) require.EqualValues(t, writecacheconfig.CountLimitDefault, wc.CountLimit()) - require.EqualValues(t, writecacheconfig.MaxFlushingObjectsSizeDefault, wc.MaxFlushingObjectsSize()) require.Equal(t, "tmp/1/meta", meta.Path()) require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm()) diff --git a/cmd/frostfs-node/config/engine/shard/writecache/config.go b/cmd/frostfs-node/config/engine/shard/writecache/config.go index 6fff0308b..bfe8144df 100644 --- a/cmd/frostfs-node/config/engine/shard/writecache/config.go +++ b/cmd/frostfs-node/config/engine/shard/writecache/config.go @@ -2,6 +2,7 @@ package writecacheconfig import ( "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" + boltdbconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/boltdb" ) // Config is a wrapper over the config section @@ -9,6 +10,9 @@ import ( type Config config.Config const ( + // SmallSizeDefault is a default size of small objects. + SmallSizeDefault = 32 << 10 + // MaxSizeDefault is a default value of the object payload size limit. MaxSizeDefault = 64 << 20 @@ -20,8 +24,6 @@ const ( // CountLimitDefault is a default write-cache count limit. CountLimitDefault = 0 - - MaxFlushingObjectsSizeDefault = 128 << 20 ) // From wraps config section into Config. @@ -52,6 +54,22 @@ func (x *Config) Path() string { return p } +// SmallObjectSize returns the value of "small_object_size" config parameter. +// +// Returns SmallSizeDefault if the value is not a positive number. +func (x *Config) SmallObjectSize() uint64 { + s := config.SizeInBytesSafe( + (*config.Config)(x), + "small_object_size", + ) + + if s > 0 { + return s + } + + return SmallSizeDefault +} + // MaxObjectSize returns the value of "max_object_size" config parameter. // // Returns MaxSizeDefault if the value is not a positive number. @@ -123,18 +141,7 @@ func (x *Config) NoSync() bool { return config.BoolSafe((*config.Config)(x), "no_sync") } -// MaxFlushingObjectsSize returns the value of "max_flushing_objects_size" config parameter. -// -// Returns MaxFlushingObjectsSizeDefault if the value is not a positive number. -func (x *Config) MaxFlushingObjectsSize() uint64 { - s := config.SizeInBytesSafe( - (*config.Config)(x), - "max_flushing_objects_size", - ) - - if s > 0 { - return s - } - - return MaxFlushingObjectsSizeDefault +// BoltDB returns config instance for querying bolt db specific parameters. +func (x *Config) BoltDB() *boltdbconfig.Config { + return (*boltdbconfig.Config)(x) } diff --git a/cmd/frostfs-node/config/engine/testdata/shards.env b/cmd/frostfs-node/config/engine/testdata/shards.env new file mode 100644 index 000000000..079789b0f --- /dev/null +++ b/cmd/frostfs-node/config/engine/testdata/shards.env @@ -0,0 +1,3 @@ +FROSTFS_STORAGE_SHARD_0_METABASE_PATH=abc +FROSTFS_STORAGE_SHARD_1_MODE=disabled +FROSTFS_STORAGE_SHARD_2_METABASE_PATH=xyz diff --git a/cmd/frostfs-node/config/engine/testdata/shards.json b/cmd/frostfs-node/config/engine/testdata/shards.json new file mode 100644 index 000000000..b3d6abe85 --- /dev/null +++ b/cmd/frostfs-node/config/engine/testdata/shards.json @@ -0,0 +1,13 @@ +{ + "storage.shard": { + "0": { + "metabase.path": "abc" + }, + "1": { + "mode": "disabled" + }, + "2": { + "metabase.path": "xyz" + } + } +} diff --git a/cmd/frostfs-node/config/engine/testdata/shards.yaml b/cmd/frostfs-node/config/engine/testdata/shards.yaml new file mode 100644 index 000000000..bbbba3af8 --- /dev/null +++ b/cmd/frostfs-node/config/engine/testdata/shards.yaml @@ -0,0 +1,7 @@ +storage.shard: + 0: + metabase.path: abc + 1: + mode: disabled + 2: + metabase.path: xyz diff --git a/config/example/node.env b/config/example/node.env index f470acf3e..172033e7d 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -109,7 +109,6 @@ FROSTFS_STORAGE_SHARD_0_WRITECACHE_FLUSH_WORKER_COUNT=30 FROSTFS_STORAGE_SHARD_0_WRITECACHE_CAPACITY=3221225472 FROSTFS_STORAGE_SHARD_0_WRITECACHE_PAGE_SIZE=4096 FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_OBJECT_COUNT=49 -FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_FLUSHING_OBJECTS_SIZE=100 ### Metabase config FROSTFS_STORAGE_SHARD_0_METABASE_PATH=tmp/0/meta FROSTFS_STORAGE_SHARD_0_METABASE_PERM=0644 diff --git a/config/example/node.json b/config/example/node.json index dba3bad8b..dbf1349aa 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -154,8 +154,7 @@ "flush_worker_count": 30, "capacity": 3221225472, "page_size": 4096, - "max_object_count": 49, - "max_flushing_objects_size": 100 + "max_object_count": 49 }, "metabase": { "path": "tmp/0/meta", diff --git a/config/example/node.yaml b/config/example/node.yaml index 8f9300b4a..bc62dccfc 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -179,7 +179,6 @@ storage: capacity: 3221225472 # approximate write-cache total size, bytes max_object_count: 49 page_size: 4k - max_flushing_objects_size: 100b metabase: path: tmp/0/meta # metabase path diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index 98d72cb69..7149f09e0 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -287,18 +287,23 @@ writecache: enabled: true path: /path/to/writecache capacity: 4294967296 + small_object_size: 16384 max_object_size: 134217728 flush_worker_count: 30 + page_size: '4k' ``` -| Parameter | Type | Default value | Description | -| --------------------------- | ---------- | ------------- | ----------------------------------------------------------------------------------------------------------------------------- | -| `path` | `string` | | Path to the metabase file. | -| `capacity` | `size` | `1G` | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. | -| `max_object_count` | `int` | unrestricted | Approximate maximum objects count in the writecache. If the writecache is full, objects are written to the blobstor directly. | -| `max_object_size` | `size` | `64M` | Maximum object size allowed to be stored in the writecache. | -| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. | -| `max_flushing_objects_size` | `size` | `512M` | Max total size of background flushing objects. | +| Parameter | Type | Default value | Description | +|----------------------|------------|---------------|-------------------------------------------------------------------------------------------------------------------------------| +| `path` | `string` | | Path to the metabase file. | +| `capacity` | `size` | `1G` | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. | +| `max_object_count` | `int` | unrestricted | Approximate maximum objects count in the writecache. If the writecache is full, objects are written to the blobstor directly. | +| `small_object_size` | `size` | `32K` | Maximum object size for "small" objects. This objects are stored in a key-value database instead of a file-system. | +| `max_object_size` | `size` | `64M` | Maximum object size allowed to be stored in the writecache. | +| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. | +| `max_batch_size` | `int` | `1000` | Maximum amount of small object `PUT` operations to perform in a single transaction. | +| `max_batch_delay` | `duration` | `10ms` | Maximum delay before a batch starts. | +| `page_size` | `size` | `0` | Page size overrides the default OS page size for small objects storage. Does not affect the existing storage. | # `node` section diff --git a/go.mod b/go.mod index c538a3178..39a3cf123 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88 - git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241107121119-cb813e27a823 + git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241226115718-82e48c8a634d git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88 @@ -27,7 +27,7 @@ require ( github.com/klauspost/compress v1.17.4 github.com/mailru/easyjson v0.7.7 github.com/mr-tron/base58 v1.2.0 - github.com/multiformats/go-multiaddr v0.12.1 + github.com/multiformats/go-multiaddr v0.14.0 github.com/nspcc-dev/neo-go v0.106.3 github.com/olekukonko/tablewriter v0.0.5 github.com/panjf2000/ants/v2 v2.9.0 @@ -40,15 +40,15 @@ require ( github.com/ssgreg/journald v1.0.0 github.com/stretchr/testify v1.9.0 go.etcd.io/bbolt v1.3.10 - go.opentelemetry.io/otel v1.28.0 - go.opentelemetry.io/otel/trace v1.28.0 + go.opentelemetry.io/otel v1.31.0 + go.opentelemetry.io/otel/trace v1.31.0 go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 - golang.org/x/sync v0.7.0 - golang.org/x/sys v0.22.0 - golang.org/x/term v0.21.0 - google.golang.org/grpc v1.66.2 - google.golang.org/protobuf v1.34.2 + golang.org/x/sync v0.10.0 + golang.org/x/sys v0.28.0 + golang.org/x/term v0.27.0 + google.golang.org/grpc v1.69.2 + google.golang.org/protobuf v1.36.1 gopkg.in/yaml.v3 v3.0.1 ) @@ -119,15 +119,15 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 // indirect - go.opentelemetry.io/otel/metric v1.28.0 // indirect - go.opentelemetry.io/otel/sdk v1.28.0 // indirect + go.opentelemetry.io/otel/metric v1.31.0 // indirect + go.opentelemetry.io/otel/sdk v1.31.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.24.0 // indirect - golang.org/x/net v0.26.0 // indirect - golang.org/x/text v0.16.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect + golang.org/x/crypto v0.31.0 // indirect + golang.org/x/net v0.33.0 // indirect + golang.org/x/text v0.21.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 // indirect gopkg.in/ini.v1 v1.67.0 // indirect lukechampine.com/blake3 v1.2.1 // indirect rsc.io/tmplfunc v0.0.3 // indirect diff --git a/go.sum b/go.sum index 064f3274e..6c5b79e80 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,8 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek= git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88 h1:9bvBDLApbbO5sXBKdODpE9tzy3HV99nXxkDWNn22rdI= git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g= -git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241107121119-cb813e27a823 h1:sepm9FeuoInmygH1K/+3L+Yp5bJhGiVi/oGCH6Emp2c= -git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241107121119-cb813e27a823/go.mod h1:eoK7+KZQ9GJxbzIs6vTnoUJqFDppavInLRHaN4MYgZg= +git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241226115718-82e48c8a634d h1:Sh6Tgmiz/UxPDM7kYtwyGPp/2fpqeqF0SkDJlIlV0LQ= +git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241226115718-82e48c8a634d/go.mod h1:qruvF6zaznX0RDPpfOk0AMcyy5XO8rm+s+wyj5nSDnI= git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc= git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM= git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8= @@ -106,6 +106,8 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -188,8 +190,8 @@ github.com/multiformats/go-base32 v0.1.0 h1:pVx9xoSPqEIQG8o+UbAe7DNi51oej1NtK+aG github.com/multiformats/go-base32 v0.1.0/go.mod h1:Kj3tFY6zNr+ABYMqeUNeGvkIC/UYgtWibDcT0rExnbI= github.com/multiformats/go-base36 v0.2.0 h1:lFsAbNOGeKtuKozrtBsAkSVhv1p9D0/qedU9rQyccr0= github.com/multiformats/go-base36 v0.2.0/go.mod h1:qvnKE++v+2MWCfePClUEjE78Z7P2a1UV0xHgWc0hkp4= -github.com/multiformats/go-multiaddr v0.12.1 h1:vm+BA/WZA8QZDp1pF1FWhi5CT3g1tbi5GJmqpb6wnlk= -github.com/multiformats/go-multiaddr v0.12.1/go.mod h1:7mPkiBMmLeFipt+nNSq9pHZUeJSt8lHBgH6yhj0YQzE= +github.com/multiformats/go-multiaddr v0.14.0 h1:bfrHrJhrRuh/NXH5mCnemjpbGjzRw/b+tJFOD41g2tU= +github.com/multiformats/go-multiaddr v0.14.0/go.mod h1:6EkVAxtznq2yC3QT5CM1UTAwG0GTP3EWAIcjHuzQ+r4= github.com/multiformats/go-multibase v0.2.0 h1:isdYCVLvksgWlMW9OZRYJEa9pZETFivncJHmHnnd87g= github.com/multiformats/go-multibase v0.2.0/go.mod h1:bFBZX4lKCA/2lyOFSAoKH5SS6oPyjtnzK/XTFDPkNuk= github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U= @@ -290,20 +292,22 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.etcd.io/bbolt v1.3.10 h1:+BqfJTcCzTItrop8mq/lbzL8wSGtj94UO/3U31shqG0= go.etcd.io/bbolt v1.3.10/go.mod h1:bK3UQLPJZly7IlNmV7uVHJDxfe5aK9Ll93e/74Y9oEQ= -go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= -go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= +go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 h1:3Q/xZUyC1BBkualc9ROb4G8qkH90LXEIICcs5zv1OYY= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0/go.mod h1:s75jGIWA9OfCMzF0xr+ZgfrB5FEbbV7UuYo32ahUiFI= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 h1:R3X6ZXmNPRR8ul6i3WgFURCHzaXjHdm0karRG/+dj3s= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0/go.mod h1:QWFXnDavXWwMx2EEcZsf3yxgEKAqsxQ+Syjp+seyInw= go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 h1:EVSnY9JbEEW92bEkIYOVMw4q1WJxIAGoFTrtYOzWuRQ= go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0/go.mod h1:Ea1N1QQryNXpCD0I1fdLibBAIpQuBkznMmkdKrapk1Y= -go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= -go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= -go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE= -go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg= -go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= -go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= +go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= +go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= +go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= +go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= +go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= +go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= +go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -318,8 +322,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= -golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= -golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -339,16 +343,16 @@ golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= -golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= +golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -375,16 +379,16 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= -golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= -golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= +golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= +golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -392,8 +396,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= @@ -406,12 +410,12 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= -google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 h1:0+ozOGcrp+Y8Aq8TLNN2Aliibms5LEzsq99ZZmAGYm0= -google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094/go.mod h1:fJ/e3If/Q67Mj99hin0hMhiNyCRmt6BQ2aWIJshUSJw= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 h1:BwIjyKYGsK9dMCBOorzRri8MQwmi7mT9rGHsCEinZkA= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= -google.golang.org/grpc v1.66.2 h1:3QdXkuq3Bkh7w+ywLdLvM56cmGvQHUMZpiCzt6Rqaoo= -google.golang.org/grpc v1.66.2/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53 h1:fVoAXEKA4+yufmbdVYv+SE73+cPZbbbe8paLsHfkK+U= +google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53/go.mod h1:riSXTwQ4+nqmPGtobMFyW5FqVAmIs0St6VPp4Ug7CE4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 h1:TqExAhdPaB60Ux47Cn0oLV07rGnxZzIsaRhQaqS666A= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA= +google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= +google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -420,8 +424,8 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= -google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= +google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/internal/logs/logs.go b/internal/logs/logs.go index d0bac4d11..ae77a5ab2 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -516,8 +516,10 @@ const ( StartedWritecacheSealAsync = "started writecache seal async" WritecacheSealCompletedAsync = "writecache seal completed successfully" FailedToSealWritecacheAsync = "failed to seal writecache async" - WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: not empty" + WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: database is not empty" BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file" - WritecacheCantGetObject = "can't get an object from fstree" FailedToUpdateMultinetConfiguration = "failed to update multinet configuration" + WritecacheCantRemoveObjectsFromTheDatabase = "can't remove objects from the database" + WritecacheTriedToFlushItemsFromWritecache = "tried to flush items from write-cache" + FSTreeCantDecodeDBObjectAddress = "can't decode object address from the DB" ) diff --git a/pkg/local_object_storage/blobstor/common/delete.go b/pkg/local_object_storage/blobstor/common/delete.go index c19e099cb..1b04eab1a 100644 --- a/pkg/local_object_storage/blobstor/common/delete.go +++ b/pkg/local_object_storage/blobstor/common/delete.go @@ -8,7 +8,6 @@ import ( type DeletePrm struct { Address oid.Address StorageID []byte - Size uint64 } // DeleteRes groups the resulting values of Delete operation. diff --git a/pkg/local_object_storage/blobstor/fstree/counter.go b/pkg/local_object_storage/blobstor/fstree/counter.go index b5dbc9e40..718104e2e 100644 --- a/pkg/local_object_storage/blobstor/fstree/counter.go +++ b/pkg/local_object_storage/blobstor/fstree/counter.go @@ -1,21 +1,22 @@ package fstree import ( - "sync" + "math" + "sync/atomic" ) // FileCounter used to count files in FSTree. The implementation must be thread-safe. type FileCounter interface { - Set(count, size uint64) - Inc(size uint64) - Dec(size uint64) + Set(v uint64) + Inc() + Dec() } type noopCounter struct{} -func (c *noopCounter) Set(uint64, uint64) {} -func (c *noopCounter) Inc(uint64) {} -func (c *noopCounter) Dec(uint64) {} +func (c *noopCounter) Set(uint64) {} +func (c *noopCounter) Inc() {} +func (c *noopCounter) Dec() {} func counterEnabled(c FileCounter) bool { _, noop := c.(*noopCounter) @@ -23,50 +24,14 @@ func counterEnabled(c FileCounter) bool { } type SimpleCounter struct { - mtx sync.RWMutex - count uint64 - size uint64 + v atomic.Uint64 } func NewSimpleCounter() *SimpleCounter { return &SimpleCounter{} } -func (c *SimpleCounter) Set(count, size uint64) { - c.mtx.Lock() - defer c.mtx.Unlock() - - c.count = count - c.size = size -} - -func (c *SimpleCounter) Inc(size uint64) { - c.mtx.Lock() - defer c.mtx.Unlock() - - c.count++ - c.size += size -} - -func (c *SimpleCounter) Dec(size uint64) { - c.mtx.Lock() - defer c.mtx.Unlock() - - if c.count > 0 { - c.count-- - } else { - panic("fstree.SimpleCounter: invalid count") - } - if c.size >= size { - c.size -= size - } else { - panic("fstree.SimpleCounter: invalid size") - } -} - -func (c *SimpleCounter) CountSize() (uint64, uint64) { - c.mtx.RLock() - defer c.mtx.RUnlock() - - return c.count, c.size -} +func (c *SimpleCounter) Set(v uint64) { c.v.Store(v) } +func (c *SimpleCounter) Inc() { c.v.Add(1) } +func (c *SimpleCounter) Dec() { c.v.Add(math.MaxUint64) } +func (c *SimpleCounter) Value() uint64 { return c.v.Load() } diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 53eb0395a..13e7eb7b4 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -222,81 +222,6 @@ func (t *FSTree) iterate(ctx context.Context, depth uint64, curPath []string, pr return nil } -type ObjectInfo struct { - Address oid.Address - DataSize uint64 -} -type IterateInfoHandler func(ObjectInfo) error - -func (t *FSTree) IterateInfo(ctx context.Context, handler IterateInfoHandler) error { - var ( - err error - startedAt = time.Now() - ) - defer func() { - t.metrics.IterateInfo(time.Since(startedAt), err == nil) - }() - _, span := tracing.StartSpanFromContext(ctx, "FSTree.IterateInfo") - defer span.End() - - return t.iterateInfo(ctx, 0, []string{t.RootPath}, handler) -} - -func (t *FSTree) iterateInfo(ctx context.Context, depth uint64, curPath []string, handler IterateInfoHandler) error { - curName := strings.Join(curPath[1:], "") - dirPath := filepath.Join(curPath...) - entries, err := os.ReadDir(dirPath) - if err != nil { - return fmt.Errorf("read fstree dir '%s': %w", dirPath, err) - } - - isLast := depth >= t.Depth - l := len(curPath) - curPath = append(curPath, "") - - for i := range entries { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - curPath[l] = entries[i].Name() - - if !isLast && entries[i].IsDir() { - err := t.iterateInfo(ctx, depth+1, curPath, handler) - if err != nil { - return err - } - } - - if depth != t.Depth { - continue - } - - addr, err := addressFromString(curName + entries[i].Name()) - if err != nil { - continue - } - info, err := entries[i].Info() - if err != nil { - if os.IsNotExist(err) { - continue - } - return err - } - - err = handler(ObjectInfo{ - Address: addr, - DataSize: uint64(info.Size()), - }) - if err != nil { - return err - } - } - - return nil -} - func (t *FSTree) treePath(addr oid.Address) string { sAddr := stringifyAddress(addr) @@ -338,7 +263,7 @@ func (t *FSTree) Delete(ctx context.Context, prm common.DeletePrm) (common.Delet } p := t.treePath(prm.Address) - err = t.writer.removeFile(p, prm.Size) + err = t.writer.removeFile(p) return common.DeleteRes{}, err } @@ -510,38 +435,32 @@ func (t *FSTree) initFileCounter() error { return nil } - count, size, err := t.countFiles() + counter, err := t.countFiles() if err != nil { return err } - t.fileCounter.Set(count, size) + t.fileCounter.Set(counter) return nil } -func (t *FSTree) countFiles() (uint64, uint64, error) { - var count, size uint64 +func (t *FSTree) countFiles() (uint64, error) { + var counter uint64 // it is simpler to just consider every file // that is not directory as an object err := filepath.WalkDir(t.RootPath, func(_ string, d fs.DirEntry, _ error) error { - if d.IsDir() { - return nil + if !d.IsDir() { + counter++ } - count++ - info, err := d.Info() - if err != nil { - return err - } - size += uint64(info.Size()) return nil }, ) if err != nil { - return 0, 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) + return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) } - return count, size, nil + return counter, nil } func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) { diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_test.go b/pkg/local_object_storage/blobstor/fstree/fstree_test.go index 50dae46a7..4a434e52c 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_test.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_test.go @@ -47,9 +47,8 @@ func TestObjectCounter(t *testing.T) { require.NoError(t, fst.Open(mode.ComponentReadWrite)) require.NoError(t, fst.Init()) - count, size := counter.CountSize() - require.Equal(t, uint64(0), count) - require.Equal(t, uint64(0), size) + counterValue := counter.Value() + require.Equal(t, uint64(0), counterValue) defer func() { require.NoError(t, fst.Close(context.Background())) @@ -65,73 +64,39 @@ func TestObjectCounter(t *testing.T) { putPrm.Address = addr putPrm.RawData, _ = obj.Marshal() + var getPrm common.GetPrm + getPrm.Address = putPrm.Address + var delPrm common.DeletePrm delPrm.Address = addr - t.Run("without size hint", func(t *testing.T) { - eg, egCtx := errgroup.WithContext(context.Background()) + eg, egCtx := errgroup.WithContext(context.Background()) - eg.Go(func() error { - for range 1_000 { - _, err := fst.Put(egCtx, putPrm) - if err != nil { - return err - } + eg.Go(func() error { + for range 1_000 { + _, err := fst.Put(egCtx, putPrm) + if err != nil { + return err } - return nil - }) - - eg.Go(func() error { - var le logicerr.Logical - for range 1_000 { - _, err := fst.Delete(egCtx, delPrm) - if err != nil && !errors.As(err, &le) { - return err - } - } - return nil - }) - - require.NoError(t, eg.Wait()) - - count, size = counter.CountSize() - realCount, realSize, err := fst.countFiles() - require.NoError(t, err) - require.Equal(t, realCount, count, "real %d, actual %d", realCount, count) - require.Equal(t, realSize, size, "real %d, actual %d", realSize, size) + } + return nil }) - t.Run("with size hint", func(t *testing.T) { - delPrm.Size = uint64(len(putPrm.RawData)) - eg, egCtx := errgroup.WithContext(context.Background()) - - eg.Go(func() error { - for range 1_000 { - _, err := fst.Put(egCtx, putPrm) - if err != nil { - return err - } + eg.Go(func() error { + var le logicerr.Logical + for range 1_000 { + _, err := fst.Delete(egCtx, delPrm) + if err != nil && !errors.As(err, &le) { + return err } - return nil - }) - - eg.Go(func() error { - var le logicerr.Logical - for range 1_000 { - _, err := fst.Delete(egCtx, delPrm) - if err != nil && !errors.As(err, &le) { - return err - } - } - return nil - }) - - require.NoError(t, eg.Wait()) - - count, size = counter.CountSize() - realCount, realSize, err := fst.countFiles() - require.NoError(t, err) - require.Equal(t, realCount, count, "real %d, actual %d", realCount, count) - require.Equal(t, realSize, size, "real %d, actual %d", realSize, size) + } + return nil }) + + require.NoError(t, eg.Wait()) + + counterValue = counter.Value() + realCount, err := fst.countFiles() + require.NoError(t, err) + require.Equal(t, realCount, counterValue) } diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go b/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go index 4110ba7d7..8b2622885 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go @@ -16,7 +16,7 @@ import ( type writer interface { writeData(string, []byte) error - removeFile(string, uint64) error + removeFile(string) error } type genericWriter struct { @@ -78,14 +78,14 @@ func (w *genericWriter) writeAndRename(tmpPath, p string, data []byte) error { } if w.fileCounterEnabled { - w.fileCounter.Inc(uint64(len(data))) + w.fileCounter.Inc() var targetFileExists bool if _, e := os.Stat(p); e == nil { targetFileExists = true } err = os.Rename(tmpPath, p) if err == nil && targetFileExists { - w.fileCounter.Dec(uint64(len(data))) + w.fileCounter.Dec() } } else { err = os.Rename(tmpPath, p) @@ -107,10 +107,15 @@ func (w *genericWriter) writeFile(p string, data []byte) error { return err } -func (w *genericWriter) removeFile(p string, size uint64) error { +func (w *genericWriter) removeFile(p string) error { var err error if w.fileCounterEnabled { - err = w.removeWithCounter(p, size) + w.fileGuard.Lock(p) + err = os.Remove(p) + w.fileGuard.Unlock(p) + if err == nil { + w.fileCounter.Dec() + } } else { err = os.Remove(p) } @@ -120,22 +125,3 @@ func (w *genericWriter) removeFile(p string, size uint64) error { } return err } - -func (w *genericWriter) removeWithCounter(p string, size uint64) error { - w.fileGuard.Lock(p) - defer w.fileGuard.Unlock(p) - - if size == 0 { - stat, err := os.Stat(p) - if err != nil { - return err - } - size = uint64(stat.Size()) - } - - if err := os.Remove(p); err != nil { - return err - } - w.fileCounter.Dec(uint64(size)) - return nil -} diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go b/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go index 3561c616b..efc5a3d3d 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go @@ -9,7 +9,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" - utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "golang.org/x/sys/unix" ) @@ -19,9 +18,7 @@ type linuxWriter struct { perm uint32 flags int - fileGuard keyLock - fileCounter FileCounter - fileCounterEnabled bool + counter FileCounter } func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync bool) writer { @@ -36,18 +33,11 @@ func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync b return nil } _ = unix.Close(fd) // Don't care about error. - var fileGuard keyLock = &noopKeyLock{} - fileCounterEnabled := counterEnabled(c) - if fileCounterEnabled { - fileGuard = utilSync.NewKeyLocker[string]() - } w := &linuxWriter{ - root: root, - perm: uint32(perm), - flags: flags, - fileGuard: fileGuard, - fileCounter: c, - fileCounterEnabled: fileCounterEnabled, + root: root, + perm: uint32(perm), + flags: flags, + counter: c, } return w } @@ -61,10 +51,6 @@ func (w *linuxWriter) writeData(p string, data []byte) error { } func (w *linuxWriter) writeFile(p string, data []byte) error { - if w.fileCounterEnabled { - w.fileGuard.Lock(p) - defer w.fileGuard.Unlock(p) - } fd, err := unix.Open(w.root, w.flags, w.perm) if err != nil { return err @@ -75,7 +61,7 @@ func (w *linuxWriter) writeFile(p string, data []byte) error { if n == len(data) { err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW) if err == nil { - w.fileCounter.Inc(uint64(len(data))) + w.counter.Inc() } if errors.Is(err, unix.EEXIST) { err = nil @@ -91,30 +77,13 @@ func (w *linuxWriter) writeFile(p string, data []byte) error { return errClose } -func (w *linuxWriter) removeFile(p string, size uint64) error { - if w.fileCounterEnabled { - w.fileGuard.Lock(p) - defer w.fileGuard.Unlock(p) - - if size == 0 { - var stat unix.Stat_t - err := unix.Stat(p, &stat) - if err != nil { - if err == unix.ENOENT { - return logicerr.Wrap(new(apistatus.ObjectNotFound)) - } - return err - } - size = uint64(stat.Size) - } - } - +func (w *linuxWriter) removeFile(p string) error { err := unix.Unlink(p) if err != nil && err == unix.ENOENT { return logicerr.Wrap(new(apistatus.ObjectNotFound)) } if err == nil { - w.fileCounter.Dec(uint64(size)) + w.counter.Dec() } return err } diff --git a/pkg/local_object_storage/blobstor/fstree/metrics.go b/pkg/local_object_storage/blobstor/fstree/metrics.go index 4241beec9..10de935eb 100644 --- a/pkg/local_object_storage/blobstor/fstree/metrics.go +++ b/pkg/local_object_storage/blobstor/fstree/metrics.go @@ -13,7 +13,6 @@ type Metrics interface { Close() Iterate(d time.Duration, success bool) - IterateInfo(d time.Duration, success bool) Delete(d time.Duration, success bool) Exists(d time.Duration, success bool) Put(d time.Duration, size int, success bool) @@ -28,7 +27,6 @@ func (m *noopMetrics) SetParentID(string) {} func (m *noopMetrics) SetMode(mode.ComponentMode) {} func (m *noopMetrics) Close() {} func (m *noopMetrics) Iterate(time.Duration, bool) {} -func (m *noopMetrics) IterateInfo(time.Duration, bool) {} func (m *noopMetrics) Delete(time.Duration, bool) {} func (m *noopMetrics) Exists(time.Duration, bool) {} func (m *noopMetrics) Put(time.Duration, int, bool) {} diff --git a/pkg/local_object_storage/engine/exists_test.go b/pkg/local_object_storage/engine/exists_test.go index 1b51c10dc..9b3c0833f 100644 --- a/pkg/local_object_storage/engine/exists_test.go +++ b/pkg/local_object_storage/engine/exists_test.go @@ -42,7 +42,7 @@ func benchmarkExists(b *testing.B, shardNum int) { for range b.N { var shPrm shard.ExistsPrm shPrm.Address = addr - shPrm.ParentAddress = oid.Address{} + shPrm.ECParentAddress = oid.Address{} ok, _, err := e.exists(context.Background(), shPrm) if err != nil || ok { b.Fatalf("%t %v", ok, err) diff --git a/pkg/local_object_storage/engine/inhume_test.go b/pkg/local_object_storage/engine/inhume_test.go index 6980afb07..8978a6cbf 100644 --- a/pkg/local_object_storage/engine/inhume_test.go +++ b/pkg/local_object_storage/engine/inhume_test.go @@ -7,8 +7,11 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" "github.com/stretchr/testify/require" ) @@ -84,3 +87,47 @@ func TestStorageEngine_Inhume(t *testing.T) { require.Empty(t, addrs) }) } + +func TestStorageEngine_ECInhume(t *testing.T) { + parentObjectAddress := oidtest.Address() + containerID := parentObjectAddress.Container() + + chunkObject0 := testutil.GenerateObjectWithCID(containerID) + chunkObject0.SetECHeader(objectSDK.NewECHeader( + objectSDK.ECParentInfo{ + ID: parentObjectAddress.Object(), + }, 0, 4, []byte{}, 0)) + + chunkObject1 := testutil.GenerateObjectWithCID(containerID) + chunkObject1.SetECHeader(objectSDK.NewECHeader( + objectSDK.ECParentInfo{ + ID: parentObjectAddress.Object(), + }, 1, 4, []byte{}, 0)) + + tombstone := objectSDK.NewTombstone() + tombstone.SetMembers([]oid.ID{parentObjectAddress.Object()}) + payload, err := tombstone.Marshal() + require.NoError(t, err) + tombstoneObject := testutil.GenerateObjectWithCID(containerID) + tombstoneObject.SetType(objectSDK.TypeTombstone) + tombstoneObject.SetPayload(payload) + tombstoneObjectAddress := object.AddressOf(tombstoneObject) + + e := testNewEngine(t).setShardsNum(t, 5).prepare(t).engine + defer func() { require.NoError(t, e.Close(context.Background())) }() + + require.NoError(t, Put(context.Background(), e, chunkObject0, false)) + + require.NoError(t, Put(context.Background(), e, tombstoneObject, false)) + + var inhumePrm InhumePrm + inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress) + _, err = e.Inhume(context.Background(), inhumePrm) + require.NoError(t, err) + + var alreadyRemoved *apistatus.ObjectAlreadyRemoved + + require.ErrorAs(t, Put(context.Background(), e, chunkObject0, false), &alreadyRemoved) + + require.ErrorAs(t, Put(context.Background(), e, chunkObject1, false), &alreadyRemoved) +} diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index e080191ae..ba4a144d1 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -71,21 +71,21 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error { // In #1146 this check was parallelized, however, it became // much slower on fast machines for 4 shards. - var parent oid.Address + var ecParent oid.Address if prm.Object.ECHeader() != nil { - parent.SetObject(prm.Object.ECHeader().Parent()) - parent.SetContainer(addr.Container()) + ecParent.SetObject(prm.Object.ECHeader().Parent()) + ecParent.SetContainer(addr.Container()) } var shPrm shard.ExistsPrm shPrm.Address = addr - shPrm.ParentAddress = parent + shPrm.ECParentAddress = ecParent existed, locked, err := e.exists(ctx, shPrm) if err != nil { return err } if !existed && locked { - lockers, err := e.GetLocked(ctx, parent) + lockers, err := e.GetLocked(ctx, ecParent) if err != nil { return err } diff --git a/pkg/local_object_storage/engine/writecache.go b/pkg/local_object_storage/engine/writecache.go index e9ba3410f..7710bc7f4 100644 --- a/pkg/local_object_storage/engine/writecache.go +++ b/pkg/local_object_storage/engine/writecache.go @@ -169,16 +169,18 @@ func (m *writeCacheMetrics) Put(d time.Duration, success bool, st writecache.Sto m.metrics.AddMethodDuration(m.shardID, m.path, st.String(), "Put", success, d) } -func (m *writeCacheMetrics) SetEstimateSize(size uint64) { - m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeFSTree.String(), size) +func (m *writeCacheMetrics) SetEstimateSize(db, fstree uint64) { + m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeDB.String(), db) + m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeFSTree.String(), fstree) } func (m *writeCacheMetrics) SetMode(mod mode.ComponentMode) { m.metrics.SetMode(m.shardID, mod.String()) } -func (m *writeCacheMetrics) SetActualCounters(count uint64) { - m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeFSTree.String(), count) +func (m *writeCacheMetrics) SetActualCounters(db, fstree uint64) { + m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeDB.String(), db) + m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeFSTree.String(), fstree) } func (m *writeCacheMetrics) Flush(success bool, st writecache.StorageType) { diff --git a/pkg/local_object_storage/metabase/exists.go b/pkg/local_object_storage/metabase/exists.go index 0294dd3ba..ccd2a622d 100644 --- a/pkg/local_object_storage/metabase/exists.go +++ b/pkg/local_object_storage/metabase/exists.go @@ -1,7 +1,6 @@ package meta import ( - "bytes" "context" "fmt" "time" @@ -20,8 +19,8 @@ import ( // ExistsPrm groups the parameters of Exists operation. type ExistsPrm struct { - addr oid.Address - paddr oid.Address + addr oid.Address + ecParentAddr oid.Address } // ExistsRes groups the resulting values of Exists operation. @@ -37,9 +36,9 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) { p.addr = addr } -// SetParent is an Exists option to set objects parent. -func (p *ExistsPrm) SetParent(addr oid.Address) { - p.paddr = addr +// SetECParent is an Exists option to set objects parent. +func (p *ExistsPrm) SetECParent(addr oid.Address) { + p.ecParentAddr = addr } // Exists returns the fact that the object is in the metabase. @@ -82,7 +81,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err currEpoch := db.epochState.CurrentEpoch() err = db.boltDB.View(func(tx *bbolt.Tx) error { - res.exists, res.locked, err = db.exists(tx, prm.addr, prm.paddr, currEpoch) + res.exists, res.locked, err = db.exists(tx, prm.addr, prm.ecParentAddr, currEpoch) return err }) @@ -90,10 +89,21 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err return res, metaerr.Wrap(err) } -func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpoch uint64) (bool, bool, error) { +func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) { var locked bool - if !parent.Equals(oid.Address{}) { - locked = objectLocked(tx, parent.Container(), parent.Object()) + if !ecParent.Equals(oid.Address{}) { + st, err := objectStatus(tx, ecParent, currEpoch) + if err != nil { + return false, false, err + } + switch st { + case 2: + return false, locked, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved)) + case 3: + return false, locked, ErrObjectIsExpired + } + + locked = objectLocked(tx, ecParent.Container(), ecParent.Object()) } // check graveyard and object expiration first st, err := objectStatus(tx, addr, currEpoch) @@ -216,7 +226,7 @@ func getSplitInfo(tx *bbolt.Tx, cnr cid.ID, key []byte) (*objectSDK.SplitInfo, e splitInfo := objectSDK.NewSplitInfo() - err := splitInfo.Unmarshal(bytes.Clone(rawSplitInfo)) + err := splitInfo.Unmarshal(rawSplitInfo) if err != nil { return nil, fmt.Errorf("can't unmarshal split info from root index: %w", err) } diff --git a/pkg/local_object_storage/metabase/get.go b/pkg/local_object_storage/metabase/get.go index 1cbf78ab2..145190f56 100644 --- a/pkg/local_object_storage/metabase/get.go +++ b/pkg/local_object_storage/metabase/get.go @@ -1,7 +1,6 @@ package meta import ( - "bytes" "context" "fmt" "time" @@ -112,7 +111,7 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b // check in primary index data := getFromBucket(tx, primaryBucketName(cnr, bucketName), key) if len(data) != 0 { - return obj, obj.Unmarshal(bytes.Clone(data)) + return obj, obj.Unmarshal(data) } data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key) @@ -123,13 +122,13 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b // if not found then check in tombstone index data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key) if len(data) != 0 { - return obj, obj.Unmarshal(bytes.Clone(data)) + return obj, obj.Unmarshal(data) } // if not found then check in locker index data = getFromBucket(tx, bucketNameLockers(cnr, bucketName), key) if len(data) != 0 { - return obj, obj.Unmarshal(bytes.Clone(data)) + return obj, obj.Unmarshal(data) } // if not found then check if object is a virtual @@ -185,7 +184,7 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD child := objectSDK.New() - err = child.Unmarshal(bytes.Clone(data)) + err = child.Unmarshal(data) if err != nil { return nil, fmt.Errorf("can't unmarshal child with parent: %w", err) } @@ -219,7 +218,7 @@ func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error { objData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key) if len(objData) != 0 { obj := objectSDK.New() - if err := obj.Unmarshal(bytes.Clone(objData)); err != nil { + if err := obj.Unmarshal(objData); err != nil { return err } chunk := objectSDK.ECChunk{} diff --git a/pkg/local_object_storage/metabase/iterators.go b/pkg/local_object_storage/metabase/iterators.go index 5d42e4125..0ca07725c 100644 --- a/pkg/local_object_storage/metabase/iterators.go +++ b/pkg/local_object_storage/metabase/iterators.go @@ -1,7 +1,6 @@ package meta import ( - "bytes" "context" "errors" "fmt" @@ -195,7 +194,7 @@ func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID, *objectSDK.Object) e } return b.ForEach(func(k, v []byte) error { - if oid.Decode(k) == nil && obj.Unmarshal(bytes.Clone(v)) == nil { + if oid.Decode(k) == nil && obj.Unmarshal(v) == nil { return f(cid, oid, obj) } diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go index a7ff2222f..b007ef0da 100644 --- a/pkg/local_object_storage/metabase/list.go +++ b/pkg/local_object_storage/metabase/list.go @@ -247,7 +247,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket var ecInfo *objectcore.ECInfo if objType == objectSDK.TypeRegular { var o objectSDK.Object - if err := o.Unmarshal(bytes.Clone(v)); err != nil { + if err := o.Unmarshal(v); err != nil { return nil, nil, nil, err } isLinkingObj = isLinkObject(&o) @@ -413,7 +413,7 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, p var ecInfo *objectcore.ECInfo if prm.ObjectType == objectSDK.TypeRegular { var o objectSDK.Object - if err := o.Unmarshal(bytes.Clone(v)); err != nil { + if err := o.Unmarshal(v); err != nil { return err } isLinkingObj = isLinkObject(&o) diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index d7675869f..383bc1143 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -1,7 +1,6 @@ package meta import ( - "bytes" "context" "encoding/binary" "errors" @@ -121,9 +120,15 @@ func (db *DB) put(tx *bbolt.Tx, return PutRes{}, errors.New("missing container in object") } + var ecParentAddress oid.Address + if ecHeader := obj.ECHeader(); ecHeader != nil { + ecParentAddress.SetContainer(cnr) + ecParentAddress.SetObject(ecHeader.Parent()) + } + isParent := si != nil - exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch) + exists, _, err := db.exists(tx, objectCore.AddressOf(obj), ecParentAddress, currEpoch) var splitInfoError *objectSDK.SplitInfoError if errors.As(err, &splitInfoError) { @@ -314,7 +319,7 @@ func updateSplitInfoIndex(tx *bbolt.Tx, objKey []byte, cnr cid.ID, bucketName [] return si.Marshal() default: oldSI := objectSDK.NewSplitInfo() - if err := oldSI.Unmarshal(bytes.Clone(old)); err != nil { + if err := oldSI.Unmarshal(old); err != nil { return nil, err } si = util.MergeSplitInfo(si, oldSI) diff --git a/pkg/local_object_storage/metrics/fstree.go b/pkg/local_object_storage/metrics/fstree.go index d93363fa3..76822ac2c 100644 --- a/pkg/local_object_storage/metrics/fstree.go +++ b/pkg/local_object_storage/metrics/fstree.go @@ -38,10 +38,6 @@ func (m *fstreeMetrics) Iterate(d time.Duration, success bool) { m.m.MethodDuration(m.shardID, m.path, "Iterate", d, success) } -func (m *fstreeMetrics) IterateInfo(d time.Duration, success bool) { - m.m.MethodDuration(m.shardID, m.path, "IterateInfo", d, success) -} - func (m *fstreeMetrics) Delete(d time.Duration, success bool) { m.m.MethodDuration(m.shardID, m.path, "Delete", d, success) } diff --git a/pkg/local_object_storage/shard/exists.go b/pkg/local_object_storage/shard/exists.go index 784bf293a..82ce48dde 100644 --- a/pkg/local_object_storage/shard/exists.go +++ b/pkg/local_object_storage/shard/exists.go @@ -18,7 +18,7 @@ type ExistsPrm struct { // Exists option to set object checked for existence. Address oid.Address // Exists option to set parent object checked for existence. - ParentAddress oid.Address + ECParentAddress oid.Address } // ExistsRes groups the resulting values of Exists operation. @@ -74,7 +74,7 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) { } else { var existsPrm meta.ExistsPrm existsPrm.SetAddress(prm.Address) - existsPrm.SetParent(prm.ParentAddress) + existsPrm.SetECParent(prm.ECParentAddress) var res meta.ExistsRes res, err = s.metaBase.Exists(ctx, existsPrm) diff --git a/pkg/local_object_storage/writecache/benchmark/writecache_test.go b/pkg/local_object_storage/writecache/benchmark/writecache_test.go index fd85b4501..e96f67049 100644 --- a/pkg/local_object_storage/writecache/benchmark/writecache_test.go +++ b/pkg/local_object_storage/writecache/benchmark/writecache_test.go @@ -118,5 +118,6 @@ func newCache(b *testing.B) writecache.Cache { writecache.WithBlobstor(bs), writecache.WithMetabase(testMetabase{}), writecache.WithMaxCacheSize(256<<30), + writecache.WithSmallObjectSize(128<<10), ) } diff --git a/pkg/local_object_storage/writecache/cache.go b/pkg/local_object_storage/writecache/cachebbolt.go similarity index 71% rename from pkg/local_object_storage/writecache/cache.go rename to pkg/local_object_storage/writecache/cachebbolt.go index e829d013c..9bcc89f6a 100644 --- a/pkg/local_object_storage/writecache/cache.go +++ b/pkg/local_object_storage/writecache/cachebbolt.go @@ -2,7 +2,7 @@ package writecache import ( "context" - "fmt" + "os" "sync" "sync/atomic" @@ -10,7 +10,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "go.etcd.io/bbolt" "go.uber.org/zap" ) @@ -26,41 +27,48 @@ type cache struct { cancel atomic.Value // wg is a wait group for flush workers. wg sync.WaitGroup + // store contains underlying database. + store // fsTree contains big files stored directly on file-system. fsTree *fstree.FSTree - // counter contains atomic counters for the number of objects stored in cache. - counter *fstree.SimpleCounter } // wcStorageType is used for write-cache operations logging. const wcStorageType = "write-cache" type objectInfo struct { - addr oid.Address - size uint64 + addr string + data []byte + obj *objectSDK.Object } const ( - defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB - defaultMaxCacheSize = 1 << 30 // 1 GiB + defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB + defaultSmallObjectSize = 32 * 1024 // 32 KiB + defaultMaxCacheSize = 1 << 30 // 1 GiB ) -var dummyCanceler context.CancelFunc = func() {} +var ( + defaultBucket = []byte{0} + dummyCanceler context.CancelFunc = func() {} +) // New creates new writecache instance. func New(opts ...Option) Cache { c := &cache{ flushCh: make(chan objectInfo), mode: mode.Disabled, - counter: fstree.NewSimpleCounter(), options: options{ - log: logger.NewLoggerWrapper(zap.NewNop()), - maxObjectSize: defaultMaxObjectSize, - workersCount: defaultFlushWorkersCount, - maxCacheSize: defaultMaxCacheSize, - metrics: DefaultMetrics(), - flushSizeLimit: defaultFlushWorkersCount * defaultMaxObjectSize, + log: logger.NewLoggerWrapper(zap.NewNop()), + maxObjectSize: defaultMaxObjectSize, + smallObjectSize: defaultSmallObjectSize, + workersCount: defaultFlushWorkersCount, + maxCacheSize: defaultMaxCacheSize, + maxBatchSize: bbolt.DefaultMaxBatchSize, + maxBatchDelay: bbolt.DefaultMaxBatchDelay, + openFile: os.OpenFile, + metrics: DefaultMetrics(), }, } @@ -94,23 +102,21 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error { if err != nil { return metaerr.Wrap(err) } + return metaerr.Wrap(c.initCounters()) } // Init runs necessary services. func (c *cache) Init(ctx context.Context) error { c.metrics.SetMode(mode.ConvertToComponentModeDegraded(c.mode)) - if err := c.flushAndDropBBoltDB(ctx); err != nil { - return fmt.Errorf("flush previous version write-cache database: %w", err) - } - ctx, cancel := context.WithCancel(context.WithoutCancel(ctx)) // canceling performed by cache + ctx, cancel := context.WithCancel(ctx) c.cancel.Store(cancel) c.runFlushLoop(ctx) return nil } // Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op. -func (c *cache) Close(ctx context.Context) error { +func (c *cache) Close(_ context.Context) error { if cancelValue := c.cancel.Swap(dummyCanceler); cancelValue != nil { cancelValue.(context.CancelFunc)() } @@ -126,10 +132,10 @@ func (c *cache) Close(ctx context.Context) error { defer c.modeMtx.Unlock() var err error - if c.fsTree != nil { - err = c.fsTree.Close(ctx) + if c.db != nil { + err = c.db.Close() if err != nil { - c.fsTree = nil + c.db = nil } } c.metrics.Close() diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go index 94a0a40db..a17c1c0b6 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -2,6 +2,7 @@ package writecache import ( "context" + "math" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" @@ -9,6 +10,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -43,11 +45,46 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { return ErrDegraded } + saddr := addr.EncodeToString() + + var dataSize int + _ = c.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + dataSize = len(b.Get([]byte(saddr))) + return nil + }) + + if dataSize > 0 { + storageType = StorageTypeDB + var recordDeleted bool + err := c.db.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + key := []byte(saddr) + recordDeleted = b.Get(key) != nil + err := b.Delete(key) + return err + }) + if err != nil { + return err + } + storagelog.Write(ctx, c.log, + storagelog.AddressField(saddr), + storagelog.StorageTypeField(wcStorageType), + storagelog.OpField("db DELETE"), + ) + if recordDeleted { + c.objCounters.cDB.Add(math.MaxUint64) + c.estimateCacheSize() + } + deleted = true + return nil + } + storageType = StorageTypeFSTree _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) if err == nil { storagelog.Write(ctx, c.log, - storagelog.AddressField(addr.EncodeToString()), + storagelog.AddressField(saddr), storagelog.StorageTypeField(wcStorageType), storagelog.OpField("fstree DELETE"), ) diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index d9e34ceab..a869eb028 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -1,6 +1,7 @@ package writecache import ( + "bytes" "context" "errors" "time" @@ -9,23 +10,28 @@ import ( objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/mr-tron/base58" + "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) const ( + // flushBatchSize is amount of keys which will be read from cache to be flushed + // to the main storage. It is used to reduce contention between cache put + // and cache persist. + flushBatchSize = 512 // defaultFlushWorkersCount is number of workers for putting objects in main storage. defaultFlushWorkersCount = 20 // defaultFlushInterval is default time interval between successive flushes. - defaultFlushInterval = 10 * time.Second + defaultFlushInterval = time.Second ) var errIterationCompleted = errors.New("iteration completed") @@ -35,53 +41,126 @@ func (c *cache) runFlushLoop(ctx context.Context) { if c.disableBackgroundFlush { return } - fl := newFlushLimiter(c.flushSizeLimit) + for range c.workersCount { + c.wg.Add(1) + go c.workerFlushSmall(ctx) + } + + c.wg.Add(1) + go func() { + c.workerFlushBig(ctx) + c.wg.Done() + }() + c.wg.Add(1) go func() { defer c.wg.Done() - c.pushToFlushQueue(ctx, fl) - }() - for range c.workersCount { - c.wg.Add(1) - go c.workerFlush(ctx, fl) + tt := time.NewTimer(defaultFlushInterval) + defer tt.Stop() + + for { + select { + case <-tt.C: + c.flushSmallObjects(ctx) + tt.Reset(defaultFlushInterval) + c.estimateCacheSize() + case <-ctx.Done(): + return + } + } + }() +} + +func (c *cache) flushSmallObjects(ctx context.Context) { + var lastKey []byte + for { + select { + case <-ctx.Done(): + return + default: + } + + var m []objectInfo + + c.modeMtx.RLock() + if c.readOnly() { + c.modeMtx.RUnlock() + time.Sleep(time.Second) + continue + } + + // We put objects in batches of fixed size to not interfere with main put cycle a lot. + _ = c.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + cs := b.Cursor() + + var k, v []byte + + if len(lastKey) == 0 { + k, v = cs.First() + } else { + k, v = cs.Seek(lastKey) + if bytes.Equal(k, lastKey) { + k, v = cs.Next() + } + } + + for ; k != nil && len(m) < flushBatchSize; k, v = cs.Next() { + if len(lastKey) == len(k) { + copy(lastKey, k) + } else { + lastKey = bytes.Clone(k) + } + + m = append(m, objectInfo{ + addr: string(k), + data: bytes.Clone(v), + }) + } + return nil + }) + + var count int + for i := range m { + obj := objectSDK.New() + if err := obj.Unmarshal(m[i].data); err != nil { + continue + } + m[i].obj = obj + + count++ + select { + case c.flushCh <- m[i]: + case <-ctx.Done(): + c.modeMtx.RUnlock() + return + } + } + + c.modeMtx.RUnlock() + if count == 0 { + break + } + + c.log.Debug(ctx, logs.WritecacheTriedToFlushItemsFromWritecache, + zap.Int("count", count), + zap.String("start", base58.Encode(lastKey))) } } -func (c *cache) pushToFlushQueue(ctx context.Context, fl *flushLimiter) { - stopf := context.AfterFunc(ctx, func() { - fl.close() - }) - defer stopf() - - tick := time.NewTicker(defaultFlushInterval) +func (c *cache) workerFlushBig(ctx context.Context) { + tick := time.NewTicker(defaultFlushInterval * 10) for { select { case <-tick.C: c.modeMtx.RLock() if c.readOnly() || c.noMetabase() { c.modeMtx.RUnlock() - continue + break } - err := c.fsTree.IterateInfo(ctx, func(oi fstree.ObjectInfo) error { - if err := fl.acquire(oi.DataSize); err != nil { - return err - } - select { - case c.flushCh <- objectInfo{ - addr: oi.Address, - size: oi.DataSize, - }: - return nil - case <-ctx.Done(): - fl.release(oi.DataSize) - return ctx.Err() - } - }) - if err != nil { - c.log.Warn(ctx, logs.BlobstorErrorOccurredDuringTheIteration, zap.Error(err)) - } + _ = c.flushFSTree(ctx, true) c.modeMtx.RUnlock() case <-ctx.Done(): @@ -90,42 +169,6 @@ func (c *cache) pushToFlushQueue(ctx context.Context, fl *flushLimiter) { } } -func (c *cache) workerFlush(ctx context.Context, fl *flushLimiter) { - defer c.wg.Done() - - var objInfo objectInfo - for { - select { - case objInfo = <-c.flushCh: - c.flushIfAnObjectExistsWorker(ctx, objInfo, fl) - case <-ctx.Done(): - return - } - } -} - -func (c *cache) flushIfAnObjectExistsWorker(ctx context.Context, objInfo objectInfo, fl *flushLimiter) { - defer fl.release(objInfo.size) - - res, err := c.fsTree.Get(ctx, common.GetPrm{ - Address: objInfo.addr, - }) - if err != nil { - if !client.IsErrObjectNotFound(err) { - c.reportFlushError(ctx, logs.WritecacheCantGetObject, objInfo.addr.EncodeToString(), metaerr.Wrap(err)) - } - return - } - - err = c.flushObject(ctx, res.Object, res.RawData, StorageTypeFSTree) - if err != nil { - // Error is handled in flushObject. - return - } - - c.deleteFromDisk(ctx, objInfo.addr, uint64(len(res.RawData))) -} - func (c *cache) reportFlushError(ctx context.Context, msg string, addr string, err error) { if c.reportError != nil { c.reportError(ctx, msg, err) @@ -154,10 +197,13 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { err = c.flushObject(ctx, &obj, e.ObjectData, StorageTypeFSTree) if err != nil { + if ignoreErrors { + return nil + } return err } - c.deleteFromDisk(ctx, e.Address, uint64(len(e.ObjectData))) + c.deleteFromDisk(ctx, e.Address) return nil } @@ -165,6 +211,29 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { return err } +// workerFlushSmall writes small objects to the main storage. +func (c *cache) workerFlushSmall(ctx context.Context) { + defer c.wg.Done() + + var objInfo objectInfo + for { + // Give priority to direct put. + select { + case objInfo = <-c.flushCh: + case <-ctx.Done(): + return + } + + err := c.flushObject(ctx, objInfo.obj, objInfo.data, StorageTypeDB) + if err != nil { + // Error is handled in flushObject. + continue + } + + c.deleteFromDB(ctx, objInfo.addr, true) + } +} + // flushObject is used to write object directly to the main storage. func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st StorageType) error { var err error @@ -231,5 +300,74 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error { } func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { - return c.flushFSTree(ctx, ignoreErrors) + if err := c.flushFSTree(ctx, ignoreErrors); err != nil { + return err + } + + var last string + for { + batch, err := c.readNextDBBatch(ctx, ignoreErrors, last) + if err != nil { + return err + } + if len(batch) == 0 { + break + } + for _, item := range batch { + var obj objectSDK.Object + if err := obj.Unmarshal(item.data); err != nil { + c.reportFlushError(ctx, logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err)) + if ignoreErrors { + continue + } + return err + } + + if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil { + return err + } + c.deleteFromDB(ctx, item.address, false) + } + last = batch[len(batch)-1].address + } + return nil +} + +type batchItem struct { + data []byte + address string +} + +func (c *cache) readNextDBBatch(ctx context.Context, ignoreErrors bool, last string) ([]batchItem, error) { + const batchSize = 100 + var batch []batchItem + err := c.db.View(func(tx *bbolt.Tx) error { + var addr oid.Address + + b := tx.Bucket(defaultBucket) + cs := b.Cursor() + for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() { + sa := string(k) + if sa == last { + continue + } + if err := addr.DecodeString(sa); err != nil { + c.reportFlushError(ctx, logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err)) + if ignoreErrors { + continue + } + return err + } + + batch = append(batch, batchItem{data: bytes.Clone(data), address: sa}) + if len(batch) == batchSize { + return errIterationCompleted + } + } + return nil + }) + if err == nil || errors.Is(err, errIterationCompleted) { + return batch, nil + } + return nil, err } diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index 7fc84657c..db3d79a86 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -19,17 +19,19 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" "go.uber.org/zap" ) func TestFlush(t *testing.T) { testlogger := test.NewLogger(t) - createCacheFn := func(t *testing.T, mb *meta.DB, bs MainStorage, opts ...Option) Cache { + createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs MainStorage, opts ...Option) Cache { return New( append([]Option{ WithLogger(testlogger), WithPath(filepath.Join(t.TempDir(), "writecache")), + WithSmallObjectSize(smallSize), WithMetabase(mb), WithBlobstor(bs), WithDisableBackgroundFlush(), @@ -45,6 +47,31 @@ func TestFlush(t *testing.T) { } failures := []TestFailureInjector[Option]{ + { + Desc: "db, invalid address", + InjectFn: func(t *testing.T, wc Cache) { + c := wc.(*cache) + obj := testutil.GenerateObject() + data, err := obj.Marshal() + require.NoError(t, err) + require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + return b.Put([]byte{1, 2, 3}, data) + })) + }, + }, + { + Desc: "db, invalid object", + InjectFn: func(t *testing.T, wc Cache) { + c := wc.(*cache) + require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + k := []byte(oidtest.Address().EncodeToString()) + v := []byte{1, 2, 3} + return b.Put(k, v) + })) + }, + }, { Desc: "fs, read error", InjectFn: func(t *testing.T, wc Cache) { @@ -91,6 +118,7 @@ const ( type CreateCacheFunc[Option any] func( t *testing.T, + smallSize uint64, meta *meta.DB, bs MainStorage, opts ...Option, @@ -113,7 +141,7 @@ func runFlushTest[Option any]( failures ...TestFailureInjector[Option], ) { t.Run("no errors", func(t *testing.T) { - wc, bs, mb := newCache(t, createCacheFn) + wc, bs, mb := newCache(t, createCacheFn, smallSize) defer func() { require.NoError(t, wc.Close(context.Background())) }() objects := putObjects(t, wc) @@ -126,7 +154,7 @@ func runFlushTest[Option any]( }) t.Run("flush on moving to degraded mode", func(t *testing.T) { - wc, bs, mb := newCache(t, createCacheFn) + wc, bs, mb := newCache(t, createCacheFn, smallSize) defer func() { require.NoError(t, wc.Close(context.Background())) }() objects := putObjects(t, wc) @@ -144,7 +172,7 @@ func runFlushTest[Option any]( for _, f := range failures { t.Run(f.Desc, func(t *testing.T) { errCountOpt, errCount := errCountOption() - wc, bs, mb := newCache(t, createCacheFn, errCountOpt) + wc, bs, mb := newCache(t, createCacheFn, smallSize, errCountOpt) defer func() { require.NoError(t, wc.Close(context.Background())) }() objects := putObjects(t, wc) f.InjectFn(t, wc) @@ -166,6 +194,7 @@ func runFlushTest[Option any]( func newCache[Option any]( t *testing.T, createCacheFn CreateCacheFunc[Option], + smallSize uint64, opts ...Option, ) (Cache, *blobstor.BlobStor, *meta.DB) { dir := t.TempDir() @@ -186,7 +215,7 @@ func newCache[Option any]( require.NoError(t, bs.Open(context.Background(), mode.ReadWrite)) require.NoError(t, bs.Init(context.Background())) - wc := createCacheFn(t, mb, bs, opts...) + wc := createCacheFn(t, smallSize, mb, bs, opts...) require.NoError(t, wc.Open(context.Background(), mode.ReadWrite)) require.NoError(t, wc.Init(context.Background())) @@ -234,7 +263,7 @@ func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPai prm.StorageID = mRes.StorageID() res, err := bs.Get(context.Background(), prm) - require.NoError(t, err, objects[i].addr) + require.NoError(t, err) require.Equal(t, objects[i].obj, res.Object) } } diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go index c0847a65f..bf26833bd 100644 --- a/pkg/local_object_storage/writecache/get.go +++ b/pkg/local_object_storage/writecache/get.go @@ -37,11 +37,11 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e return nil, ErrDegraded } - obj, err := c.getInternal(ctx, addr) + obj, err := c.getInternal(ctx, saddr, addr) return obj, metaerr.Wrap(err) } -func (c *cache) getInternal(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { +func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) (*objectSDK.Object, error) { found := false storageType := StorageTypeUndefined startedAt := time.Now() @@ -49,6 +49,14 @@ func (c *cache) getInternal(ctx context.Context, addr oid.Address) (*objectSDK.O c.metrics.Get(time.Since(startedAt), found, storageType) }() + value, err := Get(c.db, []byte(saddr)) + if err == nil { + obj := objectSDK.New() + found = true + storageType = StorageTypeDB + return obj, obj.Unmarshal(value) + } + res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr}) if err != nil { return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) @@ -79,7 +87,7 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, return nil, ErrDegraded } - obj, err := c.getInternal(ctx, addr) + obj, err := c.getInternal(ctx, saddr, addr) if err != nil { return nil, metaerr.Wrap(err) } diff --git a/pkg/local_object_storage/writecache/limiter.go b/pkg/local_object_storage/writecache/limiter.go deleted file mode 100644 index ddc4101be..000000000 --- a/pkg/local_object_storage/writecache/limiter.go +++ /dev/null @@ -1,70 +0,0 @@ -package writecache - -import ( - "errors" - "sync" -) - -var errLimiterClosed = errors.New("acquire failed: limiter closed") - -// flushLimiter is used to limit the total size of objects -// being flushed to blobstore at the same time. This is a necessary -// limitation so that the flushing process does not have -// a strong impact on user requests. -type flushLimiter struct { - count, size uint64 - maxSize uint64 - cond *sync.Cond - closed bool -} - -func newFlushLimiter(maxSize uint64) *flushLimiter { - return &flushLimiter{ - maxSize: maxSize, - cond: sync.NewCond(&sync.Mutex{}), - } -} - -func (l *flushLimiter) acquire(size uint64) error { - l.cond.L.Lock() - defer l.cond.L.Unlock() - - // it is allowed to overflow maxSize to allow flushing objects with size > maxSize - for l.count > 0 && l.size+size > l.maxSize && !l.closed { - l.cond.Wait() - if l.closed { - return errLimiterClosed - } - } - l.count++ - l.size += size - return nil -} - -func (l *flushLimiter) release(size uint64) { - l.cond.L.Lock() - defer l.cond.L.Unlock() - - if l.size >= size { - l.size -= size - } else { - panic("flushLimiter: invalid size") - } - - if l.count > 0 { - l.count-- - } else { - panic("flushLimiter: invalid count") - } - - l.cond.Broadcast() -} - -func (l *flushLimiter) close() { - l.cond.L.Lock() - defer l.cond.L.Unlock() - - l.closed = true - - l.cond.Broadcast() -} diff --git a/pkg/local_object_storage/writecache/limiter_test.go b/pkg/local_object_storage/writecache/limiter_test.go deleted file mode 100644 index 1ca3e1156..000000000 --- a/pkg/local_object_storage/writecache/limiter_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package writecache - -import ( - "sync/atomic" - "testing" - - "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" -) - -func TestLimiter(t *testing.T) { - var maxSize uint64 = 10 - var single uint64 = 3 - l := newFlushLimiter(uint64(maxSize)) - var currSize atomic.Int64 - var eg errgroup.Group - for range 10_000 { - eg.Go(func() error { - defer l.release(single) - defer currSize.Add(-1) - l.acquire(single) - require.True(t, currSize.Add(1) <= 3) - return nil - }) - } - require.NoError(t, eg.Wait()) -} diff --git a/pkg/local_object_storage/writecache/metrics.go b/pkg/local_object_storage/writecache/metrics.go index e3641f85e..e68b6d8be 100644 --- a/pkg/local_object_storage/writecache/metrics.go +++ b/pkg/local_object_storage/writecache/metrics.go @@ -26,9 +26,9 @@ type Metrics interface { Flush(success bool, st StorageType) Evict(st StorageType) - SetEstimateSize(uint64) + SetEstimateSize(db, fstree uint64) SetMode(m mode.ComponentMode) - SetActualCounters(uint64) + SetActualCounters(db, fstree uint64) SetPath(path string) Close() } @@ -47,11 +47,11 @@ func (metricsStub) Delete(time.Duration, bool, StorageType) {} func (metricsStub) Put(time.Duration, bool, StorageType) {} -func (metricsStub) SetEstimateSize(uint64) {} +func (metricsStub) SetEstimateSize(uint64, uint64) {} func (metricsStub) SetMode(mode.ComponentMode) {} -func (metricsStub) SetActualCounters(uint64) {} +func (metricsStub) SetActualCounters(uint64, uint64) {} func (metricsStub) Flush(bool, StorageType) {} diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go index 73d12fd33..603da7e78 100644 --- a/pkg/local_object_storage/writecache/mode.go +++ b/pkg/local_object_storage/writecache/mode.go @@ -5,12 +5,13 @@ import ( "errors" "fmt" "os" + "path/filepath" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -52,7 +53,7 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode, prm setModePrm) error } } - if err := c.closeStorage(ctx, prm.shrink); err != nil { + if err := c.closeDB(ctx, prm.shrink); err != nil { return err } @@ -77,37 +78,33 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode, prm setModePrm) error return nil } -func (c *cache) closeStorage(ctx context.Context, shrink bool) error { - if c.fsTree == nil { +func (c *cache) closeDB(ctx context.Context, shrink bool) error { + if c.db == nil { return nil } if !shrink { - if err := c.fsTree.Close(ctx); err != nil { - return fmt.Errorf("can't close write-cache storage: %w", err) + if err := c.db.Close(); err != nil { + return fmt.Errorf("can't close write-cache database: %w", err) } return nil } - empty := true - _, err := c.fsTree.Iterate(ctx, common.IteratePrm{ - Handler: func(common.IterationElement) error { - return errIterationCompleted - }, + var empty bool + err := c.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + empty = b == nil || b.Stats().KeyN == 0 + return nil }) - if err != nil { - if errors.Is(err, errIterationCompleted) { - empty = false - } else { - return fmt.Errorf("failed to check write-cache items: %w", err) - } + if err != nil && !errors.Is(err, bbolt.ErrDatabaseNotOpen) { + return fmt.Errorf("failed to check DB items: %w", err) } - if err := c.fsTree.Close(ctx); err != nil { - return fmt.Errorf("can't close write-cache storage: %w", err) + if err := c.db.Close(); err != nil { + return fmt.Errorf("can't close write-cache database: %w", err) } if empty { - err := os.RemoveAll(c.path) + err := os.Remove(filepath.Join(c.path, dbName)) if err != nil && !os.IsNotExist(err) { - return fmt.Errorf("failed to remove write-cache files: %w", err) + return fmt.Errorf("failed to remove DB file: %w", err) } } else { c.log.Info(ctx, logs.WritecacheShrinkSkippedNotEmpty) diff --git a/pkg/local_object_storage/writecache/mode_test.go b/pkg/local_object_storage/writecache/mode_test.go index 4fbadbc64..9facb784a 100644 --- a/pkg/local_object_storage/writecache/mode_test.go +++ b/pkg/local_object_storage/writecache/mode_test.go @@ -17,14 +17,14 @@ func TestMode(t *testing.T) { WithPath(t.TempDir())) require.NoError(t, wc.Open(context.Background(), mode.DegradedReadOnly)) - require.Nil(t, wc.(*cache).fsTree) + require.Nil(t, wc.(*cache).db) require.NoError(t, wc.Init(context.Background())) - require.Nil(t, wc.(*cache).fsTree) + require.Nil(t, wc.(*cache).db) require.NoError(t, wc.Close(context.Background())) require.NoError(t, wc.Open(context.Background(), mode.Degraded)) - require.Nil(t, wc.(*cache).fsTree) + require.Nil(t, wc.(*cache).db) require.NoError(t, wc.Init(context.Background())) - require.Nil(t, wc.(*cache).fsTree) + require.Nil(t, wc.(*cache).db) require.NoError(t, wc.Close(context.Background())) } diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index f2957fe98..96a30ed53 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -2,6 +2,9 @@ package writecache import ( "context" + "io/fs" + "os" + "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "go.uber.org/zap" @@ -20,6 +23,8 @@ type options struct { metabase Metabase // maxObjectSize is the maximum size of the object stored in the write-cache. maxObjectSize uint64 + // smallObjectSize is the maximum size of the object stored in the database. + smallObjectSize uint64 // workersCount is the number of workers flushing objects in parallel. workersCount int // maxCacheSize is the maximum total size of all objects saved in cache (DB + FS). @@ -28,16 +33,24 @@ type options struct { // maxCacheCount is the maximum total count of all object saved in cache. // 0 (no limit) by default. maxCacheCount uint64 + // objCounters contains atomic counters for the number of objects stored in cache. + objCounters counters + // maxBatchSize is the maximum batch size for the small object database. + maxBatchSize int + // maxBatchDelay is the maximum batch wait time for the small object database. + maxBatchDelay time.Duration // noSync is true iff FSTree allows unsynchronized writes. noSync bool // reportError is the function called when encountering disk errors in background workers. reportError func(context.Context, string, error) + // openFile is the function called internally by bbolt to open database files. Useful for hermetic testing. + openFile func(string, int, fs.FileMode) (*os.File, error) // metrics is metrics implementation metrics Metrics // disableBackgroundFlush is for testing purposes only. disableBackgroundFlush bool - // flushSizeLimit is total size of flushing objects. - flushSizeLimit uint64 + // pageSize is bbolt's page size config value + pageSize int } // WithLogger sets logger. @@ -77,6 +90,15 @@ func WithMaxObjectSize(sz uint64) Option { } } +// WithSmallObjectSize sets maximum object size to be stored in write-cache. +func WithSmallObjectSize(sz uint64) Option { + return func(o *options) { + if sz > 0 { + o.smallObjectSize = sz + } + } +} + func WithFlushWorkersCount(c int) Option { return func(o *options) { if c > 0 { @@ -99,6 +121,24 @@ func WithMaxCacheCount(v uint64) Option { } } +// WithMaxBatchSize sets max batch size for the small object database. +func WithMaxBatchSize(sz int) Option { + return func(o *options) { + if sz > 0 { + o.maxBatchSize = sz + } + } +} + +// WithMaxBatchDelay sets max batch delay for the small object database. +func WithMaxBatchDelay(d time.Duration) Option { + return func(o *options) { + if d > 0 { + o.maxBatchDelay = d + } + } +} + // WithNoSync sets an option to allow returning to caller on PUT before write is persisted. // Note, that we use this flag for FSTree only and DO NOT use it for a bolt DB because // we cannot yet properly handle the corrupted database during the startup. This SHOULD NOT @@ -116,6 +156,13 @@ func WithReportErrorFunc(f func(context.Context, string, error)) Option { } } +// WithOpenFile sets the OpenFile function to use internally by bolt. Useful for hermetic testing. +func WithOpenFile(f func(string, int, fs.FileMode) (*os.File, error)) Option { + return func(o *options) { + o.openFile = f + } +} + // WithMetrics sets metrics implementation. func WithMetrics(metrics Metrics) Option { return func(o *options) { @@ -130,9 +177,9 @@ func WithDisableBackgroundFlush() Option { } } -// WithFlushSizeLimit sets flush size limit. -func WithFlushSizeLimit(v uint64) Option { +// WithPageSize sets bbolt's page size. +func WithPageSize(s int) Option { return func(o *options) { - o.flushSizeLimit = v + o.pageSize = s } } diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go index 7da5c4d3a..b9515a401 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -8,6 +8,7 @@ import ( storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -49,16 +50,62 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro return common.PutRes{}, ErrBigObject } + oi := objectInfo{ + addr: prm.Address.EncodeToString(), + obj: prm.Object, + data: prm.RawData, + } + + if sz <= c.smallObjectSize { + storageType = StorageTypeDB + err := c.putSmall(ctx, oi) + if err == nil { + added = true + } + return common.PutRes{}, err + } + storageType = StorageTypeFSTree - err := c.putBig(ctx, prm) + err := c.putBig(ctx, oi.addr, prm) if err == nil { added = true } return common.PutRes{}, metaerr.Wrap(err) } +// putSmall persists small objects to the write-cache database and +// pushes the to the flush workers queue. +func (c *cache) putSmall(ctx context.Context, obj objectInfo) error { + if !c.hasEnoughSpaceDB() { + return ErrOutOfSpace + } + + var newRecord bool + err := c.db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + key := []byte(obj.addr) + newRecord = b.Get(key) == nil + if newRecord { + return b.Put(key, obj.data) + } + return nil + }) + if err == nil { + storagelog.Write(ctx, c.log, + storagelog.AddressField(obj.addr), + storagelog.StorageTypeField(wcStorageType), + storagelog.OpField("db PUT"), + ) + if newRecord { + c.objCounters.cDB.Add(1) + c.estimateCacheSize() + } + } + return err +} + // putBig writes object to FSTree and pushes it to the flush workers queue. -func (c *cache) putBig(ctx context.Context, prm common.PutPrm) error { +func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error { if !c.hasEnoughSpaceFS() { return ErrOutOfSpace } @@ -69,7 +116,7 @@ func (c *cache) putBig(ctx context.Context, prm common.PutPrm) error { } storagelog.Write(ctx, c.log, - storagelog.AddressField(prm.Address.EncodeToString()), + storagelog.AddressField(addr), storagelog.StorageTypeField(wcStorageType), storagelog.OpField("fstree PUT"), ) diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index 835686fbb..d03f4a63e 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -1,10 +1,29 @@ package writecache +import ( + "fmt" + "math" + "sync/atomic" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" + "go.etcd.io/bbolt" +) + func (c *cache) estimateCacheSize() (uint64, uint64) { - count, size := c.counter.CountSize() - c.metrics.SetEstimateSize(size) - c.metrics.SetActualCounters(count) - return count, size + dbCount := c.objCounters.DB() + fsCount := c.objCounters.FS() + if fsCount > 0 { + fsCount-- // db file + } + dbSize := dbCount * c.smallObjectSize + fsSize := fsCount * c.maxObjectSize + c.metrics.SetEstimateSize(dbSize, fsSize) + c.metrics.SetActualCounters(dbCount, fsCount) + return dbCount + fsCount, dbSize + fsSize +} + +func (c *cache) hasEnoughSpaceDB() bool { + return c.hasEnoughSpace(c.smallObjectSize) } func (c *cache) hasEnoughSpaceFS() bool { @@ -19,7 +38,48 @@ func (c *cache) hasEnoughSpace(objectSize uint64) bool { return c.maxCacheSize >= size+objectSize } +var _ fstree.FileCounter = &counters{} + +type counters struct { + cDB, cFS atomic.Uint64 +} + +func (x *counters) DB() uint64 { + return x.cDB.Load() +} + +func (x *counters) FS() uint64 { + return x.cFS.Load() +} + +// Set implements fstree.ObjectCounter. +func (x *counters) Set(v uint64) { + x.cFS.Store(v) +} + +// Inc implements fstree.ObjectCounter. +func (x *counters) Inc() { + x.cFS.Add(1) +} + +// Dec implements fstree.ObjectCounter. +func (x *counters) Dec() { + x.cFS.Add(math.MaxUint64) +} + func (c *cache) initCounters() error { + var inDB uint64 + err := c.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + if b != nil { + inDB = uint64(b.Stats().KeyN) + } + return nil + }) + if err != nil { + return fmt.Errorf("could not read write-cache DB counter: %w", err) + } + c.objCounters.cDB.Store(inDB) c.estimateCacheSize() return nil } diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index a0e236cb7..feb44bd1d 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -3,6 +3,7 @@ package writecache import ( "context" "fmt" + "math" "os" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -13,22 +14,49 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.etcd.io/bbolt" "go.uber.org/zap" ) +// store represents persistent storage with in-memory LRU cache +// for flushed items on top of it. +type store struct { + db *bbolt.DB +} + +const dbName = "small.bolt" + func (c *cache) openStore(mod mode.ComponentMode) error { err := util.MkdirAllX(c.path, os.ModePerm) if err != nil { return err } + c.db, err = OpenDB(c.path, mod.ReadOnly(), c.openFile, c.pageSize) + if err != nil { + return fmt.Errorf("could not open database: %w", err) + } + + c.db.MaxBatchSize = c.maxBatchSize + c.db.MaxBatchDelay = c.maxBatchDelay + + if !mod.ReadOnly() { + err = c.db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(defaultBucket) + return err + }) + if err != nil { + return fmt.Errorf("could not create default bucket: %w", err) + } + } + c.fsTree = fstree.New( fstree.WithPath(c.path), fstree.WithPerm(os.ModePerm), fstree.WithDepth(1), fstree.WithDirNameLen(1), fstree.WithNoSync(c.noSync), - fstree.WithFileCounter(c.counter), + fstree.WithFileCounter(&c.objCounters), ) if err := c.fsTree.Open(mod); err != nil { return fmt.Errorf("could not open FSTree: %w", err) @@ -40,8 +68,43 @@ func (c *cache) openStore(mod mode.ComponentMode) error { return nil } -func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address, size uint64) { - _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr, Size: size}) +func (c *cache) deleteFromDB(ctx context.Context, key string, batched bool) { + var recordDeleted bool + var err error + if batched { + err = c.db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + key := []byte(key) + recordDeleted = b.Get(key) != nil + return b.Delete(key) + }) + } else { + err = c.db.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + key := []byte(key) + recordDeleted = b.Get(key) != nil + return b.Delete(key) + }) + } + + if err == nil { + c.metrics.Evict(StorageTypeDB) + storagelog.Write(ctx, c.log, + storagelog.AddressField(key), + storagelog.StorageTypeField(wcStorageType), + storagelog.OpField("db DELETE"), + ) + if recordDeleted { + c.objCounters.cDB.Add(math.MaxUint64) + c.estimateCacheSize() + } + } else { + c.log.Error(ctx, logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err)) + } +} + +func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address) { + _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) if err != nil && !client.IsErrObjectNotFound(err) { c.log.Error(ctx, logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err)) } else if err == nil { diff --git a/pkg/local_object_storage/writecache/upgrade.go b/pkg/local_object_storage/writecache/upgrade.go deleted file mode 100644 index 3a100f1a3..000000000 --- a/pkg/local_object_storage/writecache/upgrade.go +++ /dev/null @@ -1,110 +0,0 @@ -package writecache - -import ( - "bytes" - "context" - "errors" - "fmt" - "io/fs" - "os" - "path/filepath" - "time" - - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" -) - -const dbName = "small.bolt" - -var defaultBucket = []byte{0} - -func (c *cache) flushAndDropBBoltDB(ctx context.Context) error { - _, err := os.Stat(filepath.Join(c.path, dbName)) - if err != nil && os.IsNotExist(err) { - return nil - } - if err != nil { - return fmt.Errorf("could not check write-cache database existence: %w", err) - } - db, err := OpenDB(c.path, true, os.OpenFile) - if err != nil { - return fmt.Errorf("could not open write-cache database: %w", err) - } - defer func() { - _ = db.Close() - }() - - var last string - for { - batch, err := c.readNextDBBatch(db, last) - if err != nil { - return err - } - if len(batch) == 0 { - break - } - for _, item := range batch { - var obj objectSDK.Object - if err := obj.Unmarshal(item.data); err != nil { - return fmt.Errorf("unmarshal object from database: %w", err) - } - if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil { - return fmt.Errorf("flush object from database: %w", err) - } - } - last = batch[len(batch)-1].address - } - if err := db.Close(); err != nil { - return fmt.Errorf("close write-cache database: %w", err) - } - if err := os.Remove(filepath.Join(c.path, dbName)); err != nil { - return fmt.Errorf("remove write-cache database: %w", err) - } - return nil -} - -type batchItem struct { - data []byte - address string -} - -func (c *cache) readNextDBBatch(db *bbolt.DB, last string) ([]batchItem, error) { - const batchSize = 100 - var batch []batchItem - err := db.View(func(tx *bbolt.Tx) error { - var addr oid.Address - - b := tx.Bucket(defaultBucket) - cs := b.Cursor() - for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() { - sa := string(k) - if sa == last { - continue - } - if err := addr.DecodeString(sa); err != nil { - return fmt.Errorf("decode address from database: %w", err) - } - - batch = append(batch, batchItem{data: bytes.Clone(data), address: sa}) - if len(batch) == batchSize { - return errIterationCompleted - } - } - return nil - }) - if err == nil || errors.Is(err, errIterationCompleted) { - return batch, nil - } - return nil, err -} - -// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true. -func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) { - return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{ - NoFreelistSync: true, - ReadOnly: ro, - Timeout: 100 * time.Millisecond, - OpenFile: openFile, - }) -} diff --git a/pkg/local_object_storage/writecache/util.go b/pkg/local_object_storage/writecache/util.go new file mode 100644 index 000000000..ad3b443f3 --- /dev/null +++ b/pkg/local_object_storage/writecache/util.go @@ -0,0 +1,21 @@ +package writecache + +import ( + "io/fs" + "os" + "path/filepath" + "time" + + "go.etcd.io/bbolt" +) + +// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true. +func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error), pageSize int) (*bbolt.DB, error) { + return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{ + NoFreelistSync: true, + ReadOnly: ro, + Timeout: 100 * time.Millisecond, + OpenFile: openFile, + PageSize: pageSize, + }) +} diff --git a/pkg/network/transport/container/grpc/service.go b/pkg/network/transport/container/grpc/service.go index 49d083a90..d932ecc07 100644 --- a/pkg/network/transport/container/grpc/service.go +++ b/pkg/network/transport/container/grpc/service.go @@ -6,6 +6,9 @@ import ( containersvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container" containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container/grpc" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // Server wraps FrostFS API Container service and @@ -80,3 +83,7 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil } + +func (s *Server) ListStream(_ *containerGRPC.ListStreamRequest, _ grpc.ServerStreamingServer[containerGRPC.ListStreamResponse]) error { + return status.Error(codes.Unimplemented, "method ListStream not implemented") +} diff --git a/pkg/services/common/ape/checker.go b/pkg/services/common/ape/checker.go index eb4fd03c7..86021c3db 100644 --- a/pkg/services/common/ape/checker.go +++ b/pkg/services/common/ape/checker.go @@ -11,7 +11,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ape" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" - apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" apechain "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain" @@ -104,14 +103,7 @@ func (c *checkerCoreImpl) CheckAPE(prm CheckPrm) error { if found && status == apechain.Allow { return nil } - err = fmt.Errorf("access to operation %s is denied by access policy engine: %s", prm.Request.Operation(), status.String()) - return apeErr(err) -} - -func apeErr(err error) error { - errAccessDenied := &apistatus.ObjectAccessDenied{} - errAccessDenied.WriteReason(err.Error()) - return errAccessDenied + return newChainRouterError(prm.Request.Operation(), status) } // isValidBearer checks whether bearer token was correctly signed by authorized diff --git a/pkg/services/common/ape/error.go b/pkg/services/common/ape/error.go new file mode 100644 index 000000000..d3c381de7 --- /dev/null +++ b/pkg/services/common/ape/error.go @@ -0,0 +1,33 @@ +package ape + +import ( + "fmt" + + apechain "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain" +) + +// ChainRouterError is returned when chain router validation prevents +// the APE request from being processed (no rule found, access denied, etc.). +type ChainRouterError struct { + operation string + status apechain.Status +} + +func (e *ChainRouterError) Error() string { + return fmt.Sprintf("access to operation %s is denied by access policy engine: %s", e.Operation(), e.Status()) +} + +func (e *ChainRouterError) Operation() string { + return e.operation +} + +func (e *ChainRouterError) Status() apechain.Status { + return e.status +} + +func newChainRouterError(operation string, status apechain.Status) *ChainRouterError { + return &ChainRouterError{ + operation: operation, + status: status, + } +} diff --git a/pkg/services/object/ape/errors.go b/pkg/services/object/ape/errors.go index 1b2024ed5..6e458b384 100644 --- a/pkg/services/object/ape/errors.go +++ b/pkg/services/object/ape/errors.go @@ -1,10 +1,19 @@ package ape import ( + "errors" + + checkercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/common/ape" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" ) func toStatusErr(err error) error { + var chRouterErr *checkercore.ChainRouterError + if !errors.As(err, &chRouterErr) { + errServerInternal := &apistatus.ServerInternal{} + apistatus.WriteInternalServerErr(errServerInternal, err) + return errServerInternal + } errAccessDenied := &apistatus.ObjectAccessDenied{} errAccessDenied.WriteReason("ape denied request: " + err.Error()) return errAccessDenied diff --git a/pkg/services/object/transport_splitter.go b/pkg/services/object/transport_splitter.go index 0b3676edb..b446d3605 100644 --- a/pkg/services/object/transport_splitter.go +++ b/pkg/services/object/transport_splitter.go @@ -162,13 +162,13 @@ func (s *searchStreamMsgSizeCtrl) Send(resp *object.SearchResponse) error { var newResp *object.SearchResponse - for ln := uint64(len(ids)); ; { + for { if newResp == nil { newResp = new(object.SearchResponse) newResp.SetBody(body) } - cut := min(s.addrAmount, ln) + cut := min(s.addrAmount, uint64(len(ids))) body.SetIDList(ids[:cut]) newResp.SetMetaHeader(resp.GetMetaHeader()) diff --git a/pkg/services/object_manager/placement/cache_test.go b/pkg/services/object_manager/placement/cache_test.go index a890d5357..7242970b5 100644 --- a/pkg/services/object_manager/placement/cache_test.go +++ b/pkg/services/object_manager/placement/cache_test.go @@ -85,7 +85,10 @@ func TestContainerNodesCache(t *testing.T) { }) t.Run("the error is propagated", func(t *testing.T) { var pp netmapSDK.PlacementPolicy - require.NoError(t, pp.DecodeString("REP 1 SELECT 1 FROM X FILTER ATTR EQ 42 AS X")) + r := netmapSDK.ReplicaDescriptor{} + r.SetNumberOfObjects(1) + r.SetSelectorName("Missing") + pp.AddReplicas(r) c := placement.NewContainerNodesCache(size) _, err := c.ContainerNodes(nm(1, nodes[0:1]), cidtest.ID(), pp) diff --git a/pkg/services/object_manager/placement/traverser.go b/pkg/services/object_manager/placement/traverser.go index 7c720b204..6a949e938 100644 --- a/pkg/services/object_manager/placement/traverser.go +++ b/pkg/services/object_manager/placement/traverser.go @@ -202,7 +202,7 @@ func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) ([]netmap.NodeInfo, metrics: m, } } - slices.SortFunc(nm, func(a, b nodeMetrics) int { + slices.SortStableFunc(nm, func(a, b nodeMetrics) int { return slices.Compare(a.metrics, b.metrics) }) sortedVector := make([]netmap.NodeInfo, len(unsortedVector)) diff --git a/pkg/services/policer/ec.go b/pkg/services/policer/ec.go index cb583f1d3..db640e323 100644 --- a/pkg/services/policer/ec.go +++ b/pkg/services/policer/ec.go @@ -281,6 +281,8 @@ func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info } chunkIDs[ch.Index] = ecInfoChunkID } + } else if client.IsErrObjectAlreadyRemoved(err) { + restore = false } else if !p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) && uint32(idx) < objInfo.ECInfo.Total { p.log.Warn(ctx, logs.PolicerCouldNotGetObjectFromNodeMoving, zap.String("node", hex.EncodeToString(n.PublicKey())), zap.Stringer("object", parentAddress), zap.Error(err)) p.replicator.HandleReplicationTask(ctx, replicator.Task{ diff --git a/pkg/services/tree/signature.go b/pkg/services/tree/signature.go index 4fd4a7e1e..b0f00615a 100644 --- a/pkg/services/tree/signature.go +++ b/pkg/services/tree/signature.go @@ -9,8 +9,10 @@ import ( "fmt" core "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" + checkercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/common/ape" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl" cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto" @@ -62,7 +64,22 @@ func (s *Service) verifyClient(ctx context.Context, req message, cid cidSDK.ID, return fmt.Errorf("can't get request role: %w", err) } - return s.checkAPE(ctx, bt, cnr, cid, op, role, pubKey) + if err = s.checkAPE(ctx, bt, cnr, cid, op, role, pubKey); err != nil { + return apeErr(err) + } + return nil +} + +func apeErr(err error) error { + var chRouterErr *checkercore.ChainRouterError + if !errors.As(err, &chRouterErr) { + errServerInternal := &apistatus.ServerInternal{} + apistatus.WriteInternalServerErr(errServerInternal, err) + return errServerInternal + } + errAccessDenied := &apistatus.ObjectAccessDenied{} + errAccessDenied.WriteReason(err.Error()) + return errAccessDenied } // Returns true iff the operation is read-only and request was signed