From 1f01a0a71a5dd6cf6801e87039f044a23fbfb780 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 27 Jan 2023 13:26:49 +0300 Subject: [PATCH 01/25] [#2212] morph: Fix subscription restoration Signed-off-by: Evgenii Stratonikov --- CHANGELOG.md | 1 + pkg/morph/client/notifications.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9002ee10b..a5bd4e652 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Changelog for FrostFS Node - Set flag `mode` required for `frostfs-cli control shards set-mode` (#8) - Fix `dirty` suffix in debian package version (#53) - Prevent node process from killing by systemd when shutting down (#1465) +- Restore subscriptions correctly on morph client switch (#2212) ### Removed ### Updated diff --git a/pkg/morph/client/notifications.go b/pkg/morph/client/notifications.go index 8eaf617b2..63e5a5c8a 100644 --- a/pkg/morph/client/notifications.go +++ b/pkg/morph/client/notifications.go @@ -226,6 +226,7 @@ func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string) // notification events restoration for contract := range c.subscribedEvents { + contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890 id, err = cli.SubscribeForExecutionNotifications(&contract, nil) if err != nil { c.logger.Error("could not restore notification subscription after RPC switch", @@ -242,6 +243,7 @@ func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string) // notary notification events restoration if c.notary != nil { for signer := range c.subscribedNotaryEvents { + signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890 id, err = cli.SubscribeForNotaryRequests(nil, &signer) if err != nil { c.logger.Error("could not restore notary notification subscription after RPC switch", -- 2.45.3 From 3b38aedb388fb2db333d52b6eabc1dee2d793613 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 18 Jan 2023 17:33:45 +0300 Subject: [PATCH 02/25] CHANGELOG: fix whitespacing errors Signed-off-by: Roman Khimov --- CHANGELOG.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a5bd4e652..def99e02d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -125,7 +125,6 @@ You need to change configuration environment variables to `FROSTFS_*` if you use - `spf13/viper` to `v1.8.0` - `google.golang.org/grpc` to `v1.50.1` - ### Updating from v0.34.0 Pass CID and OID parameters via the `--cid` and `--oid` flags, not as the command arguments. @@ -139,9 +138,9 @@ to match the container owner. Use `--force` (`-f`) flag to bypass this requireme Tree service network replication can now be fine-tuned with `tree.replication_timeout` config field. - ## [0.34.0] - 2022-10-31 - Marado (마라도, 馬羅島) +## [0.34.0] - 2022-10-31 - Marado (마라도, 馬羅島) -# ## Added +### Added - `--timeout` flag in `neofs-cli control` commands (#1917) - Document shard modes of operation (#1909) - `tree list` CLI command (#1332) -- 2.45.3 From 7b0708f50be31e6de6ba123c08d8d4f0cf987476 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 18 Jan 2023 17:34:28 +0300 Subject: [PATCH 03/25] CHANGELOG: add more fancy glyphs How could you forget adding it? 
Signed-off-by: Roman Khimov --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index def99e02d..abb66416e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,7 +41,7 @@ Changelog for FrostFS Node ### Updating from v0.35.0 You need to change configuration environment variables to `FROSTFS_*` if you use any. -## [0.35.0] - 2022-12-28 - Sindo (신도) +## [0.35.0] - 2022-12-28 - Sindo (신도, 信島) ### Added - `morph list-containers` in `neofs-adm` (#1689) -- 2.45.3 From e17bf71d19365546cce9e1cc18254d0bd06fd509 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 25 Jan 2023 18:48:13 +0300 Subject: [PATCH 04/25] [#2213] node: Do not return expired objects "Object is expired" means that the object is present in `meta`, so it is not an `ObjectNotFound` error. The previous implementation made `shard` search for the object without `meta`, which was an error. Signed-off-by: Pavel Karpy --- CHANGELOG.md | 1 + pkg/local_object_storage/engine/get.go | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index abb66416e..df1022d3d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ Changelog for FrostFS Node - Fix `dirty` suffix in debian package version (#53) - Prevent node process from killing by systemd when shutting down (#1465) - Restore subscriptions correctly on morph client switch (#2212) +- Expired objects could be returned if not marked with GC yet (#2213) ### Removed ### Updated diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index bbe5688f8..3db3e7c63 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -74,6 +74,7 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) { shPrm.SetAddress(prm.addr) var hasDegraded bool + var objectExpired bool e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { noMeta := sh.GetMode().NoMetabase() @@ -113,7 +114,7 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) { case shard.IsErrObjectExpired(err): // object is found but should not // be returned - outError = errNotFound + objectExpired = true return true default: e.reportShardError(sh, "could not get object from shard", err) @@ -130,6 +131,10 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) { return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) } + if objectExpired { + return GetRes{}, errNotFound + } + if obj == nil { if !hasDegraded && shardWithMeta.Shard == nil || !shard.IsErrNotFound(outError) { return GetRes{}, outError -- 2.45.3 From f1e3309ca3bd527cba139de2e9efa3eaa188a545 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 27 Jan 2023 18:27:49 +0300 Subject: [PATCH 05/25] [#2224] adm: Use native neo-go sessions in `dump-hashes` If we have lots of domains in one zone, `dump-hashes` for all other zones can miss some domains, because we need to restrict ourselves to _some_ number. In this commit we use neo-go sessions by default, with a proper fallback to in-script iterator unwrapping.
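For reference, a sketch of the traversal pattern this change adopts (the package, helper name and page size are illustrative; the neo-go calls are the same ones used in the diff below):

    package example // illustrative only

    import (
        "errors"

        "github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
        "github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
        "github.com/nspcc-dev/neo-go/pkg/util"
        "github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
    )

    // listTokens pages through a contract's "tokens" iterator, preferring a
    // server-side session and falling back to in-script iterator expansion
    // when the RPC endpoint has sessions disabled.
    func listTokens(inv *invoker.Invoker, contract util.Uint160, pageSize int) ([]stackitem.Item, error) {
        sessionID, iter, err := unwrap.SessionIterator(inv.Call(contract, "tokens"))
        if err != nil {
            if errors.Is(err, unwrap.ErrNoSessionID) {
                // No sessions on this endpoint: expand the iterator inside the
                // invocation script, capped at pageSize items.
                return unwrap.Array(inv.CallAndExpandIterator(contract, "tokens", pageSize))
            }
            return nil, err
        }
        defer func() { _ = inv.TerminateSession(sessionID) }()

        var res []stackitem.Item
        items, err := inv.TraverseIterator(sessionID, &iter, pageSize)
        for err == nil && len(items) != 0 {
            res = append(res, items...)
            items, err = inv.TraverseIterator(sessionID, &iter, pageSize)
        }
        return res, err
    }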
Signed-off-by: Evgenii Stratonikov --- CHANGELOG.md | 1 + .../internal/modules/morph/dump_hashes.go | 53 +++++++++++++++---- 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df1022d3d..3103208e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ Changelog for FrostFS Node - Prevent node process from killing by systemd when shutting down (#1465) - Restore subscriptions correctly on morph client switch (#2212) - Expired objects could be returned if not marked with GC yet (#2213) +- `neofs-adm morph dump-hashes` now properly iterates over custom domain (#2224) ### Removed ### Updated diff --git a/cmd/frostfs-adm/internal/modules/morph/dump_hashes.go b/cmd/frostfs-adm/internal/modules/morph/dump_hashes.go index 4b7d579d1..030542fb9 100644 --- a/cmd/frostfs-adm/internal/modules/morph/dump_hashes.go +++ b/cmd/frostfs-adm/internal/modules/morph/dump_hashes.go @@ -2,6 +2,7 @@ package morph import ( "bytes" + "errors" "fmt" "strings" "text/tabwriter" @@ -107,31 +108,30 @@ func dumpContractHashes(cmd *cobra.Command, _ []string) error { func dumpCustomZoneHashes(cmd *cobra.Command, nnsHash util.Uint160, zone string, c Client) error { const nnsMaxTokens = 100 - inv := invoker.New(c, nil) - arr, err := unwrap.Array(inv.CallAndExpandIterator(nnsHash, "tokens", nnsMaxTokens)) - if err != nil { - return fmt.Errorf("can't get a list of NNS domains: %w", err) - } + inv := invoker.New(c, nil) if !strings.HasPrefix(zone, ".") { zone = "." + zone } var infos []contractDumpInfo - for i := range arr { - bs, err := arr[i].TryBytes() + processItem := func(item stackitem.Item) { + bs, err := item.TryBytes() if err != nil { - continue + cmd.PrintErrf("Invalid NNS record: %v\n", err) + return } if !bytes.HasSuffix(bs, []byte(zone)) { - continue + // Related https://github.com/nspcc-dev/neofs-contract/issues/316. + return } h, err := nnsResolveHash(inv, nnsHash, string(bs)) if err != nil { - continue + cmd.PrintErrf("Could not resolve name %s: %v\n", string(bs), err) + return } infos = append(infos, contractDumpInfo{ @@ -140,6 +140,39 @@ func dumpCustomZoneHashes(cmd *cobra.Command, nnsHash util.Uint160, zone string, }) } + sessionID, iter, err := unwrap.SessionIterator(inv.Call(nnsHash, "tokens")) + if err != nil { + if errors.Is(err, unwrap.ErrNoSessionID) { + items, err := unwrap.Array(inv.CallAndExpandIterator(nnsHash, "tokens", nnsMaxTokens)) + if err != nil { + return fmt.Errorf("can't get a list of NNS domains: %w", err) + } + if len(items) == nnsMaxTokens { + cmd.PrintErrln("Provided RPC endpoint doesn't support sessions, some hashes might be lost.") + } + for i := range items { + processItem(items[i]) + } + } else { + return err + } + } else { + defer func() { + _ = inv.TerminateSession(sessionID) + }() + + items, err := inv.TraverseIterator(sessionID, &iter, nnsMaxTokens) + for err == nil && len(items) != 0 { + for i := range items { + processItem(items[i]) + } + items, err = inv.TraverseIterator(sessionID, &iter, nnsMaxTokens) + } + if err != nil { + return fmt.Errorf("error during NNS domains iteration: %w", err) + } + } + fillContractVersion(cmd, c, infos) printContractInfo(cmd, infos) -- 2.45.3 From de344e9223faec5e885ff8176e934e49db533750 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Wed, 1 Feb 2023 11:58:16 +0300 Subject: [PATCH 06/25] [#2232] pilorama: Merge in-queue batches To achieve high performance we must choose proper values for both batch size and delay. For user operations we want to set low delay. 
However it would prevent tree synchronization operations to form big enough batches. For these operations, batching gives the most benefit not only in terms of on-CPU execution cost, but also by speeding up transaction persist (`fsync`). In this commit we try merging batches that are already _triggered_, but not yet _started to execute_. This way we can still query batches for execution after the provided delay while also allowing multiple formed batches to execute faster. Signed-off-by: Evgenii Stratonikov --- CHANGELOG.md | 1 + pkg/local_object_storage/pilorama/batch.go | 21 +++++++++++++++------ pkg/local_object_storage/pilorama/boltdb.go | 2 ++ 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3103208e2..40d86d918 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ Changelog for FrostFS Node - `common.PrintVerbose` prints via `cobra.Command.Printf` (#1962) - Env prefix in configuration changed to `FROSTFS_*` (#43) - Link object is broadcast throughout the whole container now (#57) +- Pilorama now can merge multiple batches into one (#2231) ### Fixed - Increase payload size metric on shards' `put` operation (#1794) diff --git a/pkg/local_object_storage/pilorama/batch.go b/pkg/local_object_storage/pilorama/batch.go index 59712b761..43375ba1b 100644 --- a/pkg/local_object_storage/pilorama/batch.go +++ b/pkg/local_object_storage/pilorama/batch.go @@ -10,8 +10,11 @@ import ( ) type batch struct { - forest *boltForest - timer *time.Timer + forest *boltForest + timer *time.Timer + // mtx protects timer and operations fields. + // Because mtx can be taken inside a transaction, + // transactions MUST NOT be executed with the mutex taken to avoid a deadlock. mtx sync.Mutex start sync.Once cid cidSDK.ID @@ -24,16 +27,12 @@ func (b *batch) trigger() { b.mtx.Lock() if b.timer != nil { b.timer.Stop() - b.timer = nil } b.mtx.Unlock() b.start.Do(b.run) } func (b *batch) run() { - sort.Slice(b.operations, func(i, j int) bool { - return b.operations[i].Time < b.operations[j].Time - }) fullID := bucketName(b.cid, b.treeID) err := b.forest.db.Update(func(tx *bbolt.Tx) error { bLog, bTree, err := b.forest.getTreeBuckets(tx, fullID) @@ -41,6 +40,16 @@ func (b *batch) run() { return err } + b.mtx.Lock() + b.timer = nil + b.mtx.Unlock() + + // Sorting without a mutex is ok, because we append to this slice only if timer is non-nil. + // See (*boltForest).addBatch for details. + sort.Slice(b.operations, func(i, j int) bool { + return b.operations[i].Time < b.operations[j].Time + }) + var lm Move return b.forest.applyOperation(bLog, bTree, b.operations, &lm) }) diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go index 0f546ef61..247d07d28 100644 --- a/pkg/local_object_storage/pilorama/boltdb.go +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -377,7 +377,9 @@ func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan e results: []chan<- error{ch}, operations: []*Move{m}, } + b.mtx.Lock() b.timer = time.AfterFunc(t.db.MaxBatchDelay, b.trigger) + b.mtx.Unlock() t.batches = append(t.batches, b) t.mtx.Unlock() } -- 2.45.3 From 75de3f1c1f8b9960498347e86713fd9adb01d8d8 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 6 Feb 2023 16:03:37 +0300 Subject: [PATCH 07/25] [#2239] writecache: Fix possible deadlock LRU `Peek`/`Contains` take LRU mutex _inside_ of a `View` transaction. 
`View` transaction itself takes `mmapLock` [1], which is lifted after tx finishes (in `tx.Commit()` -> `tx.close()` -> `tx.db.removeTx`) When we evict items from LRU cache mutex order is different: first we take LRU mutex and then execute `Batch` which _does_ take `mmapLock` in case we need to remap. Thus the deadlock. [1] https://github.com/etcd-io/bbolt/blob/8f4a7e1f92975fbd1536a569d7aad6800809ef4e/db.go#L708 Signed-off-by: Evgenii Stratonikov --- CHANGELOG.md | 1 + pkg/local_object_storage/writecache/flush.go | 14 ++++++++------ pkg/local_object_storage/writecache/storage.go | 5 +++++ 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 40d86d918..06c8ebbb4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ Changelog for FrostFS Node - Restore subscriptions correctly on morph client switch (#2212) - Expired objects could be returned if not marked with GC yet (#2213) - `neofs-adm morph dump-hashes` now properly iterates over custom domain (#2224) +- Possible deadlock in write-cache (#2239) ### Removed ### Updated diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 7340512f1..f4fce0d0a 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -99,10 +99,6 @@ func (c *cache) flushDB() { lastKey = slice.Copy(k) } - if _, ok := c.flushed.Peek(string(k)); ok { - continue - } - m = append(m, objectInfo{ addr: string(k), data: slice.Copy(v), @@ -111,12 +107,18 @@ func (c *cache) flushDB() { return nil }) + var count int for i := range m { + if c.flushed.Contains(m[i].addr) { + continue + } + obj := object.New() if err := obj.Unmarshal(m[i].data); err != nil { continue } + count++ select { case c.flushCh <- obj: case <-c.closeCh: @@ -125,7 +127,7 @@ func (c *cache) flushDB() { } } - if len(m) == 0 { + if count == 0 { c.modeMtx.RUnlock() break } @@ -133,7 +135,7 @@ func (c *cache) flushDB() { c.modeMtx.RUnlock() c.log.Debug("tried to flush items from write-cache", - zap.Int("count", len(m)), + zap.Int("count", count), zap.String("start", base58.Encode(lastKey))) } } diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 4a9724f4d..da533880e 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -23,6 +23,11 @@ type store struct { maxFlushedMarksCount int maxRemoveBatchSize int + // flushed contains addresses of objects that were already flushed to the main storage. + // We use LRU cache instead of map here to facilitate removing of unused object in favour of + // frequently read ones. + // MUST NOT be used inside bolt db transaction because it's eviction handler + // removes untracked items from the database. flushed simplelru.LRUCache[string, bool] db *bbolt.DB -- 2.45.3 From 9a61e53162a1ae93592aeef7c784109d330f9aac Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 6 Feb 2023 13:30:33 +0300 Subject: [PATCH 08/25] [#2238] engine: Make `Open` and `Init` similar 1. Both could initialize shards in parallel. 2. Both should close shards after an error. 
Signed-off-by: Evgenii Stratonikov --- CHANGELOG.md | 1 + pkg/local_object_storage/engine/control.go | 45 +++++++++++++++++----- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 06c8ebbb4..6be07ddaf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Changelog for FrostFS Node - Env prefix in configuration changed to `FROSTFS_*` (#43) - Link object is broadcast throughout the whole container now (#57) - Pilorama now can merge multiple batches into one (#2231) +- Storage engine now can start even when some shard components are unavailable (#2238) ### Fixed - Increase payload size metric on shards' `put` operation (#1794) diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index a56d7bbb2..b9a831359 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -23,27 +23,44 @@ func (e *StorageEngine) Open() error { } func (e *StorageEngine) open() error { - e.mtx.RLock() - defer e.mtx.RUnlock() + e.mtx.Lock() + defer e.mtx.Unlock() var wg sync.WaitGroup - var errCh = make(chan error, len(e.shards)) + var errCh = make(chan shardInitError, len(e.shards)) for id, sh := range e.shards { wg.Add(1) go func(id string, sh *shard.Shard) { defer wg.Done() if err := sh.Open(); err != nil { - errCh <- fmt.Errorf("could not open shard %s: %w", id, err) + errCh <- shardInitError{ + err: err, + id: id, + } } }(id, sh.Shard) } wg.Wait() close(errCh) - for err := range errCh { - if err != nil { - return err + for res := range errCh { + if res.err != nil { + e.log.Error("could not open shard, closing and skipping", + zap.String("id", res.id), + zap.Error(res.err)) + + sh := e.shards[res.id] + delete(e.shards, res.id) + + err := sh.Close() + if err != nil { + e.log.Error("could not close partially initialized shard", + zap.String("id", res.id), + zap.Error(res.err)) + } + + continue } } @@ -76,12 +93,20 @@ func (e *StorageEngine) Init() error { for res := range errCh { if res.err != nil { if errors.Is(res.err, blobstor.ErrInitBlobovniczas) { - delete(e.shards, res.id) - - e.log.Error("shard initialization failure, skipping", + e.log.Error("could not initialize shard, closing and skipping", zap.String("id", res.id), zap.Error(res.err)) + sh := e.shards[res.id] + delete(e.shards, res.id) + + err := sh.Close() + if err != nil { + e.log.Error("could not close partially initialized shard", + zap.String("id", res.id), + zap.Error(res.err)) + } + continue } return fmt.Errorf("could not initialize shard %s: %w", res.id, res.err) -- 2.45.3 From 53067a7db03d0c36c29e9906d8a25e45043fd6d5 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 6 Feb 2023 13:35:28 +0300 Subject: [PATCH 09/25] [#2238] shard: Try closing all components Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/shard/control.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index c96b267ed..2f698c694 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -255,9 +255,11 @@ func (s *Shard) Close() error { components = append(components, s.blobStor, s.metaBase) + var lastErr error for _, component := range components { if err := component.Close(); err != nil { - return fmt.Errorf("could not close %s: %w", component, err) + lastErr = err + s.log.Error("could not close shard component", zap.Error(err)) } } @@ -266,7 +268,7 @@ 
func (s *Shard) Close() error { s.gc.stop() } - return nil + return lastErr } // Reload reloads configuration portions that are necessary. -- 2.45.3 From 1fe9a650c705f06921f0d616bfc85d27ddaa8ba3 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 6 Feb 2023 13:48:58 +0300 Subject: [PATCH 10/25] [#2238] neofs-node: Gracefully handle shard initialization errors Signed-off-by: Evgenii Stratonikov --- cmd/frostfs-node/config.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index d20e9b1cc..9d47e43cb 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -790,13 +790,18 @@ func initLocalStorage(c *cfg) { tombstone.WithTombstoneSource(tombstoneSrc), ) + var shardsAttached int for _, optsWithMeta := range c.shardOpts() { id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(tombstoneSource))...) - fatalOnErr(err) - - c.log.Info("shard attached to engine", - zap.Stringer("id", id), - ) + if err != nil { + c.log.Error("failed to attach shard to engine", zap.Error(err)) + } else { + shardsAttached++ + c.log.Info("shard attached to engine", zap.Stringer("id", id)) + } + } + if shardsAttached == 0 { + fatalOnErr(engineconfig.ErrNoShardConfigured) } c.cfgObject.cfgLocalStorage.localStorage = ls -- 2.45.3 From b2159b9608b353b63a29dfa664a07b0c9570f71f Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 6 Feb 2023 13:28:29 +0300 Subject: [PATCH 11/25] [#2238] engine: Add test for component initialization failures Signed-off-by: Evgenii Stratonikov --- .../engine/control_test.go | 132 ++++++++++++++++++ 1 file changed, 132 insertions(+) diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go index 50ee1b552..c2d7a3211 100644 --- a/pkg/local_object_storage/engine/control_test.go +++ b/pkg/local_object_storage/engine/control_test.go @@ -7,16 +7,148 @@ import ( "path/filepath" "strconv" "testing" + "time" "github.com/TrueCloudLab/frostfs-node/pkg/core/object" "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" meta "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "github.com/TrueCloudLab/frostfs-node/pkg/util/logger" cidtest "github.com/TrueCloudLab/frostfs-sdk-go/container/id/test" "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" + "go.uber.org/zap/zaptest" ) +// TestInitializationFailure checks that shard is initialized and closed even if media +// under any single component is absent. We emulate this with permission denied error. 
+func TestInitializationFailure(t *testing.T) { + type paths struct { + blobstor string + metabase string + writecache string + pilorama string + } + + existsDir := filepath.Join(t.TempDir(), "shard") + badDir := filepath.Join(t.TempDir(), "missing") + + testShard := func(c paths) []shard.Option { + sid, err := generateShardID() + require.NoError(t, err) + + return []shard.Option{ + shard.WithID(sid), + shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), + shard.WithBlobStorOptions( + blobstor.WithStorages( + newStorages(c.blobstor, 1<<20))), + shard.WithMetaBaseOptions( + meta.WithBoltDBOptions(&bbolt.Options{ + Timeout: 100 * time.Millisecond, + }), + meta.WithPath(c.metabase), + meta.WithPermissions(0700), + meta.WithEpochState(epochState{})), + shard.WithWriteCache(true), + shard.WithWriteCacheOptions(writecache.WithPath(c.writecache)), + shard.WithPiloramaOptions(pilorama.WithPath(c.pilorama)), + } + } + + t.Run("blobstor", func(t *testing.T) { + badDir := filepath.Join(badDir, t.Name()) + require.NoError(t, os.MkdirAll(badDir, os.ModePerm)) + require.NoError(t, os.Chmod(badDir, 0)) + testEngineFailInitAndReload(t, badDir, false, testShard(paths{ + blobstor: filepath.Join(badDir, "0"), + metabase: filepath.Join(existsDir, t.Name(), "1"), + writecache: filepath.Join(existsDir, t.Name(), "2"), + pilorama: filepath.Join(existsDir, t.Name(), "3"), + })) + }) + t.Run("metabase", func(t *testing.T) { + badDir := filepath.Join(badDir, t.Name()) + require.NoError(t, os.MkdirAll(badDir, os.ModePerm)) + require.NoError(t, os.Chmod(badDir, 0)) + testEngineFailInitAndReload(t, badDir, true, testShard(paths{ + blobstor: filepath.Join(existsDir, t.Name(), "0"), + metabase: filepath.Join(badDir, "1"), + writecache: filepath.Join(existsDir, t.Name(), "2"), + pilorama: filepath.Join(existsDir, t.Name(), "3"), + })) + }) + t.Run("write-cache", func(t *testing.T) { + badDir := filepath.Join(badDir, t.Name()) + require.NoError(t, os.MkdirAll(badDir, os.ModePerm)) + require.NoError(t, os.Chmod(badDir, 0)) + testEngineFailInitAndReload(t, badDir, false, testShard(paths{ + blobstor: filepath.Join(existsDir, t.Name(), "0"), + metabase: filepath.Join(existsDir, t.Name(), "1"), + writecache: filepath.Join(badDir, "2"), + pilorama: filepath.Join(existsDir, t.Name(), "3"), + })) + }) + t.Run("pilorama", func(t *testing.T) { + badDir := filepath.Join(badDir, t.Name()) + require.NoError(t, os.MkdirAll(badDir, os.ModePerm)) + require.NoError(t, os.Chmod(badDir, 0)) + testEngineFailInitAndReload(t, badDir, false, testShard(paths{ + blobstor: filepath.Join(existsDir, t.Name(), "0"), + metabase: filepath.Join(existsDir, t.Name(), "1"), + writecache: filepath.Join(existsDir, t.Name(), "2"), + pilorama: filepath.Join(badDir, "3"), + })) + }) +} + +func testEngineFailInitAndReload(t *testing.T, badDir string, errOnAdd bool, s []shard.Option) { + var configID string + + e := New() + _, err := e.AddShard(s...) + if errOnAdd { + require.Error(t, err) + // This branch is only taken when we cannot update shard ID in the metabase. + // The id cannot be encountered during normal operation, but it is ok for tests: + // it is only compared for equality with other ids and we have 0 shards here. 
+ configID = "id" + } else { + require.NoError(t, err) + + e.mtx.RLock() + var id string + for id = range e.shards { + break + } + configID = calculateShardID(e.shards[id].Shard.DumpInfo()) + e.mtx.RUnlock() + + err = e.Open() + if err == nil { + require.Error(t, e.Init()) + } + } + + e.mtx.RLock() + shardCount := len(e.shards) + e.mtx.RUnlock() + require.Equal(t, 0, shardCount) + + require.NoError(t, os.Chmod(badDir, os.ModePerm)) + require.NoError(t, e.Reload(ReConfiguration{ + shards: map[string][]shard.Option{configID: s}, + })) + + e.mtx.RLock() + shardCount = len(e.shards) + e.mtx.RUnlock() + require.Equal(t, 1, shardCount) +} + func TestExecBlocks(t *testing.T) { e := testNewEngineWithShardNum(t, 2) // number doesn't matter in this test, 2 is several but not many t.Cleanup(func() { -- 2.45.3 From d213461bf8b5852f7b6074b1d0372a2962f40130 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 6 Feb 2023 13:28:29 +0300 Subject: [PATCH 12/25] [#2238] engine: Add test for component initialization failures Signed-off-by: Evgenii Stratonikov --- CHANGELOG.md | 1 + cmd/frostfs-cli/internal/client/client.go | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6be07ddaf..327a94148 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Changelog for FrostFS Node - Link object is broadcast throughout the whole container now (#57) - Pilorama now can merge multiple batches into one (#2231) - Storage engine now can start even when some shard components are unavailable (#2238) +- `neofs-cli` buffer for object put increased from 4 KiB to 3 MiB (#2243) ### Fixed - Increase payload size metric on shards' `put` operation (#1794) diff --git a/cmd/frostfs-cli/internal/client/client.go b/cmd/frostfs-cli/internal/client/client.go index fcfa2b46b..718110322 100644 --- a/cmd/frostfs-cli/internal/client/client.go +++ b/cmd/frostfs-cli/internal/client/client.go @@ -404,8 +404,7 @@ func PutObject(prm PutObjectPrm) (*PutObjectRes, error) { } if prm.rdr != nil { - // TODO: (neofs-node#1198) explore better values or configure it - const defaultBufferSizePut = 4096 + const defaultBufferSizePut = 3 << 20 // Maximum chunk size is 3 MiB in the SDK. 
if sz == 0 || sz > defaultBufferSizePut { sz = defaultBufferSizePut -- 2.45.3 From 64350f7b0f73ae591a30c97fa423e16b4bab2ec7 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Tue, 7 Feb 2023 12:45:51 +0300 Subject: [PATCH 13/25] [#2241] metrics: Fix request count metrics names Signed-off-by: Evgenii Stratonikov --- CHANGELOG.md | 1 + pkg/metrics/object.go | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 327a94148..44705ea75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ Changelog for FrostFS Node - Expired objects could be returned if not marked with GC yet (#2213) - `neofs-adm morph dump-hashes` now properly iterates over custom domain (#2224) - Possible deadlock in write-cache (#2239) +- Fix `*_req_count` and `*_req_count_success` metric values (#2241) ### Removed ### Updated diff --git a/pkg/metrics/object.go b/pkg/metrics/object.go index 0a9497583..0bb16b780 100644 --- a/pkg/metrics/object.go +++ b/pkg/metrics/object.go @@ -51,13 +51,13 @@ func newMethodCallCounter(name string) methodCount { success: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: objectSubsystem, - Name: fmt.Sprintf("%s_req_count", name), + Name: fmt.Sprintf("%s_req_count_success", name), Help: fmt.Sprintf("The number of successful %s requests processed", name), }), total: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: objectSubsystem, - Name: fmt.Sprintf("%s_req_count_success", name), + Name: fmt.Sprintf("%s_req_count", name), Help: fmt.Sprintf("Total number of %s requests processed", name), }), } -- 2.45.3 From 889702b4d5ff4cca77841ee2edbb462ead488f60 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Wed, 8 Feb 2023 17:28:38 +0300 Subject: [PATCH 14/25] [#2246] node: Allow to configure tombsone lifetime Currently, DELETE service sets tombstone expiration epoch to `current epoch + 5`. This works less than ideal in private networks where an epoch can be e.g. 10 minutes. In this case, after a node is unavailable for more than 1 hour, already deleted objects have a chance to reappear. After this commit tombstone lifetime can be configured. Signed-off-by: Evgenii Stratonikov --- CHANGELOG.md | 7 +++++++ cmd/frostfs-node/config.go | 5 ++++- cmd/frostfs-node/config/object/config_test.go | 2 ++ cmd/frostfs-node/config/object/delete.go | 19 +++++++++++++++++++ cmd/frostfs-node/object.go | 2 +- config/example/node.env | 1 + config/example/node.json | 3 +++ config/example/node.yaml | 2 ++ docs/storage-node-configuration.md | 9 +++++---- 9 files changed, 44 insertions(+), 6 deletions(-) create mode 100644 cmd/frostfs-node/config/object/delete.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 44705ea75..fa9faa9b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ Changelog for FrostFS Node - New `frostfs_node_object_container_size` metric for tracking size of reqular objects in a container (#2116) - New `frostfs_node_object_payload_size` metric for tracking size of reqular objects on a single shard (#1794) - Add command `frostfs-adm morph netmap-candidates` (#1889) +- `object.delete.tombstone_lifetime` config parameter to set tombstone lifetime in the DELETE service (#2246) ### Changed - Change `frostfs_node_engine_container_size` to counting sizes of logical objects @@ -46,7 +47,13 @@ Changelog for FrostFS Node - Minimum go version to v1.18 ### Updating from v0.35.0 +<<<<<<< HEAD You need to change configuration environment variables to `FROSTFS_*` if you use any. 
+||||||| parent of 00afc576d ([#2246] node: Allow to configure tombsone lifetime) +======= +New config field `object.delete.tombstone_lifetime` allows to set tombstone lifetime +more appropriate for a specific deployment. +>>>>>>> 00afc576d ([#2246] node: Allow to configure tombsone lifetime) ## [0.35.0] - 2022-12-28 - Sindo (신도, 信島) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 9d47e43cb..80d506eee 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -472,6 +472,8 @@ type cfgObject struct { pool cfgObjectRoutines cfgLocalStorage cfgLocalStorage + + tombstoneLifetime uint64 } type cfgNotifications struct { @@ -598,7 +600,8 @@ func initCfg(appCfg *config.Config) *cfg { proxyScriptHash: contractsconfig.Proxy(appCfg), } c.cfgObject = cfgObject{ - pool: initObjectPool(appCfg), + pool: initObjectPool(appCfg), + tombstoneLifetime: objectconfig.TombstoneLifetime(appCfg), } c.cfgReputation = cfgReputation{ scriptHash: contractsconfig.Reputation(appCfg), diff --git a/cmd/frostfs-node/config/object/config_test.go b/cmd/frostfs-node/config/object/config_test.go index db7c1d62e..5ff504171 100644 --- a/cmd/frostfs-node/config/object/config_test.go +++ b/cmd/frostfs-node/config/object/config_test.go @@ -14,12 +14,14 @@ func TestObjectSection(t *testing.T) { empty := configtest.EmptyConfig() require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeRemote()) + require.EqualValues(t, objectconfig.DefaultTombstoneLifetime, objectconfig.TombstoneLifetime(empty)) }) const path = "../../../../config/example/node" var fileConfigTest = func(c *config.Config) { require.Equal(t, 100, objectconfig.Put(c).PoolSizeRemote()) + require.EqualValues(t, 10, objectconfig.TombstoneLifetime(c)) } configtest.ForEachFileType(path, fileConfigTest) diff --git a/cmd/frostfs-node/config/object/delete.go b/cmd/frostfs-node/config/object/delete.go new file mode 100644 index 000000000..d2cd2aff4 --- /dev/null +++ b/cmd/frostfs-node/config/object/delete.go @@ -0,0 +1,19 @@ +package objectconfig + +import "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" + +const ( + deleteSubsection = "delete" + + // DefaultTombstoneLifetime is the default value of tombstone lifetime in epochs. + DefaultTombstoneLifetime = 5 +) + +// TombstoneLifetime returns the value of `tombstone_lifetime` config parameter. 
+func TombstoneLifetime(c *config.Config) uint64 { + ts := config.UintSafe(c.Sub(subsection).Sub(deleteSubsection), "tombstone_lifetime") + if ts <= 0 { + return DefaultTombstoneLifetime + } + return ts +} diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 49a2394ce..d9a8adf20 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -316,7 +316,7 @@ func initObjectService(c *cfg) { deletesvc.WithPutService(sPut), deletesvc.WithNetworkInfo(&delNetInfo{ State: c.cfgNetmap.state, - tsLifetime: 5, + tsLifetime: c.cfgObject.tombstoneLifetime, cfg: c, }), diff --git a/config/example/node.env b/config/example/node.env index 8901ffb11..ae51d473e 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -85,6 +85,7 @@ FROSTFS_REPLICATOR_POOL_SIZE=10 # Object service section FROSTFS_OBJECT_PUT_POOL_SIZE_REMOTE=100 +FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10 # Storage engine section FROSTFS_STORAGE_SHARD_POOL_SIZE=15 diff --git a/config/example/node.json b/config/example/node.json index b74b04ee9..2930bf9d1 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -128,6 +128,9 @@ "put_timeout": "15s" }, "object": { + "delete": { + "tombstone_lifetime": 10 + }, "put": { "pool_size_remote": 100 } diff --git a/config/example/node.yaml b/config/example/node.yaml index ed0ae47ac..110e54206 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -107,6 +107,8 @@ replicator: pool_size: 10 # maximum amount of concurrent replications object: + delete: + tombstone_lifetime: 10 # tombstone "local" lifetime in epochs put: pool_size_remote: 100 # number of async workers for remote PUT operations diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index c41aebc32..b0f79c257 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -415,7 +415,7 @@ replicator: | `pool_size` | `int` | Equal to `object.put.pool_size_remote` | Maximum amount of concurrent replications. | # `object` section -Contains pool sizes for object operations with remote nodes. +Contains object-service related parameters. ```yaml object: @@ -423,6 +423,7 @@ object: pool_size_remote: 100 ``` -| Parameter | Type | Default value | Description | -|------------------------|-------|---------------|------------------------------------------------------------------------------------------------| -| `put.pool_size_remote` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. | +| Parameter | Type | Default value | Description | +|-----------------------------|-------|---------------|------------------------------------------------------------------------------------------------| +| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. | +| `put.pool_size_remote` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. | -- 2.45.3 From 90476d3badab3674d25c3dff2f73a518ae61af67 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 7 Feb 2023 15:08:22 +0300 Subject: [PATCH 15/25] [#2244] node: Update expired storage ID by WC Previously, node could get an "infinite" small object: it could be expired and thus could not be flushed (update its storage ID) to metabase => could not be marked as flushed => node never removes such object and repeat all the cycle one more time. 
If object exists and is not marked with GC (meta returns `ErrObjectIsExpired`, not `ObjectNotFound` and not `ObjectAlreadyRemoved`), its ID is safe to update _in the same_ bbolt transaction. Signed-off-by: Pavel Karpy --- CHANGELOG.md | 1 + pkg/local_object_storage/metabase/storage_id.go | 9 ++++++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa9faa9b0..bb8ad51d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ Changelog for FrostFS Node - `neofs-adm morph dump-hashes` now properly iterates over custom domain (#2224) - Possible deadlock in write-cache (#2239) - Fix `*_req_count` and `*_req_count_success` metric values (#2241) +- Storage ID update by write-cache (#2244) ### Removed ### Updated diff --git a/pkg/local_object_storage/metabase/storage_id.go b/pkg/local_object_storage/metabase/storage_id.go index 4369215ae..4cda0b256 100644 --- a/pkg/local_object_storage/metabase/storage_id.go +++ b/pkg/local_object_storage/metabase/storage_id.go @@ -1,6 +1,8 @@ package meta import ( + "errors" + oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/nspcc-dev/neo-go/pkg/util/slice" "go.etcd.io/bbolt" @@ -94,10 +96,11 @@ func (db *DB) UpdateStorageID(prm UpdateStorageIDPrm) (res UpdateStorageIDRes, e err = db.boltDB.Batch(func(tx *bbolt.Tx) error { exists, err := db.exists(tx, prm.addr, currEpoch) - if !exists || err != nil { - return err + if err == nil && exists || errors.Is(err, ErrObjectIsExpired) { + err = updateStorageID(tx, prm.addr, prm.id) } - return updateStorageID(tx, prm.addr, prm.id) + + return err }) return -- 2.45.3 From 6afe96a171e86243a4c7ac397f1c47502a4baa83 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 7 Feb 2023 16:40:07 +0300 Subject: [PATCH 16/25] [#2244] node: Add object address to WC's operations Signed-off-by: Pavel Karpy --- pkg/local_object_storage/shard/get.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go index 4f00995c3..607d508d3 100644 --- a/pkg/local_object_storage/shard/get.go +++ b/pkg/local_object_storage/shard/get.go @@ -121,9 +121,14 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher, } if IsErrNotFound(err) { - s.log.Debug("object is missing in write-cache") + s.log.Debug("object is missing in write-cache", + zap.Stringer("addr", addr), + zap.Bool("skip_meta", skipMeta)) } else { - s.log.Error("failed to fetch object from write-cache", zap.Error(err)) + s.log.Error("failed to fetch object from write-cache", + zap.Error(err), + zap.Stringer("addr", addr), + zap.Bool("skip_meta", skipMeta)) } } -- 2.45.3 From 31b011f4ae21eaa8e903e964fad1b3c70c2e8fec Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 7 Feb 2023 19:35:15 +0300 Subject: [PATCH 17/25] [#2244] node: Fix subscriptions lock Subscribing without async listening could lead to a dead-lock in the `neo-go` client. 
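A minimal sketch of the failure mode (the package and helper name are illustrative; the `rpcclient.WSClient` calls are the same ones used in the diff below):

    package example // illustrative only

    import (
        "github.com/nspcc-dev/neo-go/pkg/rpcclient"
        "github.com/nspcc-dev/neo-go/pkg/util"
    )

    // resubscribeWithoutDraining shows the problematic pattern: nobody reads
    // cli.Notifications while we resubscribe. As soon as the server pushes an
    // event for a subscription restored earlier in this loop, the client's
    // single reader goroutine blocks on the full channel, the reply to the
    // next Subscribe* request is never dispatched, and the loop hangs forever.
    func resubscribeWithoutDraining(cli *rpcclient.WSClient, contracts []util.Uint160) error {
        for i := range contracts {
            if _, err := cli.SubscribeForExecutionNotifications(&contracts[i], nil); err != nil {
                return err
            }
        }
        return nil
    }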
Signed-off-by: Pavel Karpy --- CHANGELOG.md | 1 + pkg/morph/client/notifications.go | 23 +++++++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bb8ad51d8..289725feb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ Changelog for FrostFS Node - Possible deadlock in write-cache (#2239) - Fix `*_req_count` and `*_req_count_success` metric values (#2241) - Storage ID update by write-cache (#2244) +- `neo-go` client deadlock on subscription restoration (#2244) ### Removed ### Updated diff --git a/pkg/morph/client/notifications.go b/pkg/morph/client/notifications.go index 63e5a5c8a..2afeebb83 100644 --- a/pkg/morph/client/notifications.go +++ b/pkg/morph/client/notifications.go @@ -211,6 +211,29 @@ func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string) id string ) + stopCh := make(chan struct{}) + defer close(stopCh) + + // neo-go WS client says to _always_ read notifications + // from its channel. Subscribing to any notification + // while not reading them in another goroutine may + // lead to a dead-lock, thus that async side notification + // listening while restoring subscriptions + go func() { + for { + select { + case <-stopCh: + return + case n, ok := <-cli.Notifications: + if !ok { + return + } + + c.notifications <- n + } + } + }() + // new block events restoration if c.subscribedToNewBlocks { _, err = cli.SubscribeForNewBlocks(nil) -- 2.45.3 From 6c6319fc89cab4d11803623a12bccb343b08100b Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Thu, 9 Feb 2023 00:15:36 +0300 Subject: [PATCH 18/25] [#2164] node: Fix multi-client error reporting The missing `ReportError` method did not allow casting the multi-client interface to the `errorReporter` interface and dropping broken connections. `replicationClient` embeds that interface, and it is widely used across node's code. The embedded interface does not allow casting its parent structure to `errorReporter`, which breaks the multi-client error reporting logic. The multi-client scheme is extremely hard to maintain: it makes unpredictable casts and does not allow tracking the code flow, so it will be refactored in the future anyway. Signed-off-by: Pavel Karpy --- pkg/core/client/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/core/client/client.go b/pkg/core/client/client.go index 0f6697882..f07dedc06 100644 --- a/pkg/core/client/client.go +++ b/pkg/core/client/client.go @@ -33,6 +33,8 @@ type MultiAddressClient interface { // RawForAddress must return rawclient.Client // for the passed network.Address. RawForAddress(network.Address, func(cli *rawclient.Client) error) error + + ReportError(error) } // NodeInfo groups information about a FrostFS storage node needed for Client construction.
-- 2.45.3 From a09543c0e16703d72b0eb072aa595ce30bb4f3f5 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 9 Feb 2023 19:13:51 +0300 Subject: [PATCH 19/25] [#2252] fstree: Allow concurrent writes Signed-off-by: Evgenii Stratonikov --- .../blobstor/fstree/fstree.go | 57 ++++++++++++++++--- 1 file changed, 49 insertions(+), 8 deletions(-) diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 7dd8ca3da..8f468e293 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -7,6 +7,7 @@ import ( "io/fs" "os" "path/filepath" + "strconv" "strings" "syscall" @@ -230,23 +231,63 @@ func (t *FSTree) Put(prm common.PutPrm) (common.PutRes, error) { prm.RawData = t.Compress(prm.RawData) } - tmpPath := p + "#" - err := t.writeFile(tmpPath, prm.RawData) + // Here is a situation: + // Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.161Z info log/log.go:13 local object storage operation {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "address": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "op": "PUT", "type": "fstree", "storage_id": ""} + // Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.183Z info log/log.go:13 local object storage operation {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "address": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "op": "metabase PUT"} + // Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.862Z debug policer/check.go:231 shortage of object copies detected {"component": "Object Policer", "object": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "shortage": 1} + // Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.862Z debug shard/get.go:124 object is missing in write-cache {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "addr": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "skip_meta": false} + // + // 1. We put an object on node 1. + // 2. Relentless policer sees that it has only 1 copy and tries to PUT it to node 2. + // 3. PUT operation started by client at (1) also puts an object here. + // 4. Now we have concurrent writes and one of `Rename` calls will return `no such file` error. + // Even more than that, concurrent writes can corrupt data. + // + // So here is a solution: + // 1. Write a file to 'name + 1'. + // 2. If it exists, retry with temporary name being 'name + 2'. + // 3. Set some reasonable number of attempts. + // + // It is a bit kludgey, but I am unusually proud about having found this out after + // hours of research on linux kernel, dirsync mount option and ext4 FS, turned out + // to be so hecking simple. + // In a very rare situation we can have multiple partially written copies on disk, + // this will be fixed in another issue (we should remove garbage on start). + const retryCount = 5 + for i := 0; i < retryCount; i++ { + tmpPath := p + "#" + strconv.FormatUint(uint64(i), 10) + err := t.writeAndRename(tmpPath, p, prm.RawData) + if err != syscall.EEXIST || i == retryCount-1 { + return common.PutRes{StorageID: []byte{}}, err + } + } + + // unreachable, but precaution never hurts, especially 1 day before release. + return common.PutRes{StorageID: []byte{}}, fmt.Errorf("couldn't read file after %d retries", retryCount) +} + +// writeAndRename opens tmpPath exclusively, writes data to it and renames it to p. 
+func (t *FSTree) writeAndRename(tmpPath, p string, data []byte) error { + err := t.writeFile(tmpPath, data) if err != nil { var pe *fs.PathError - if errors.As(err, &pe) && pe.Err == syscall.ENOSPC { - err = common.ErrNoSpace - _ = os.RemoveAll(tmpPath) + if errors.As(err, &pe) { + switch pe.Err { + case syscall.ENOSPC: + err = common.ErrNoSpace + _ = os.RemoveAll(tmpPath) + case syscall.EEXIST: + return syscall.EEXIST + } } } else { err = os.Rename(tmpPath, p) } - - return common.PutRes{StorageID: []byte{}}, err + return err } func (t *FSTree) writeFlags() int { - flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC + flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC | os.O_EXCL if t.noSync { return flags } -- 2.45.3 From f1469626f5f863fe315a5a7330d86e1ee1c3a553 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Tue, 14 Feb 2023 09:23:24 +0300 Subject: [PATCH 20/25] [#2234] writecache: Fix possible panic in `initFlushMarks` In case we have many small objects in the write-cache, `indices` should not be reused between iterations. Signed-off-by: Evgenii Stratonikov --- CHANGELOG.md | 1 + pkg/local_object_storage/writecache/init.go | 1 + 2 files changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 289725feb..f4e8430bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ Changelog for FrostFS Node - Fix `*_req_count` and `*_req_count_success` metric values (#2241) - Storage ID update by write-cache (#2244) - `neo-go` client deadlock on subscription restoration (#2244) +- Possible panic during write-cache initialization (#2234) ### Removed ### Updated diff --git a/pkg/local_object_storage/writecache/init.go b/pkg/local_object_storage/writecache/init.go index fbcc455c0..2c8bcf9c8 100644 --- a/pkg/local_object_storage/writecache/init.go +++ b/pkg/local_object_storage/writecache/init.go @@ -46,6 +46,7 @@ func (c *cache) initFlushMarks() { var batchSize = flushBatchSize for { m = m[:0] + indices = indices[:0] // We put objects in batches of fixed size to not interfere with main put cycle a lot. _ = c.db.View(func(tx *bbolt.Tx) error { -- 2.45.3 From 9c3a02994196657551f13bc19824a412a80f9de5 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Wed, 15 Feb 2023 12:43:52 +0300 Subject: [PATCH 21/25] [#2260] services/object: Do not assemble object with TTL=1 Signed-off-by: Evgenii Stratonikov --- pkg/services/object/get/exec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 89b3f8aac..5755973f7 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -128,7 +128,7 @@ func (exec execCtx) key() (*ecdsa.PrivateKey, error) { } func (exec *execCtx) canAssemble() bool { - return exec.svc.assembly && !exec.isRaw() && !exec.headOnly() + return exec.svc.assembly && !exec.isRaw() && !exec.headOnly() && !exec.isLocal() } func (exec *execCtx) splitInfo() *objectSDK.SplitInfo { -- 2.45.3 From c6a5e3f4eebce2e210430301baa77a4326110b20 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Wed, 15 Feb 2023 12:46:01 +0300 Subject: [PATCH 22/25] [#2260] network/cache: Ignore `context cancelled` errors Timeouts on client side should not affect inter-node communication.
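For illustration only, the two shapes of a client-side cancellation that the new check ignores (the package and helper name are illustrative; the helper simply mirrors the condition added to `multiClient.ReportError` below):

    package example // illustrative only

    import (
        "context"
        "errors"

        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
    )

    // isClientCancel mirrors the condition added to multiClient.ReportError:
    // errors.Is catches a cancellation that is still a wrapped context.Canceled,
    // while status.Code catches the same cancellation after gRPC has already
    // converted it into a status error with code Canceled.
    func isClientCancel(err error) bool {
        return errors.Is(err, context.Canceled) || status.Code(err) == codes.Canceled
    }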
Signed-off-by: Evgenii Stratonikov --- pkg/network/cache/multi.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go index bf4beab54..96defa7c9 100644 --- a/pkg/network/cache/multi.go +++ b/pkg/network/cache/multi.go @@ -12,6 +12,8 @@ import ( "github.com/TrueCloudLab/frostfs-node/pkg/network" "github.com/TrueCloudLab/frostfs-sdk-go/client" "github.com/TrueCloudLab/frostfs-sdk-go/object" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type singleClient struct { @@ -172,6 +174,10 @@ func (x *multiClient) ReportError(err error) { return } + if status.Code(err) == codes.Canceled || errors.Is(err, context.Canceled) { + return + } + // non-status logic error that could be returned // from the SDK client; should not be considered // as a connection error -- 2.45.3 From 0872d2aa49062d4389e3208df56f57953c267b4b Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 16 Feb 2023 11:03:43 +0300 Subject: [PATCH 23/25] [#2260] network/cache: Ignore clients only on `Dial` errors The problem is that accidental timeout errors can make us ignore other nodes for some time. The primary purpose of the whole ignore mechanism is not to degrade in case of failover. For this case, closing the connection and limiting the number of dials is enough. Signed-off-by: Evgenii Stratonikov --- pkg/network/cache/multi.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go index 96defa7c9..e459ae42a 100644 --- a/pkg/network/cache/multi.go +++ b/pkg/network/cache/multi.go @@ -202,7 +202,6 @@ func (s *singleClient) invalidate() { _ = s.client.Close() } s.client = nil - s.lastAttempt = time.Now() s.Unlock() } -- 2.45.3 From b679a724ef5523da86b04ba36072eba429cf7d97 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 16 Feb 2023 16:04:53 +0300 Subject: [PATCH 24/25] [#2260] node: Use a separate client cache for PUT service Currently, under a mixed load one failed PUT can lead to closing the connection for all concurrent GETs. For PUT it does no harm: we have many other nodes to choose from. For GET we are limited by `REP N` factor, so in case of failover we can close the connection with the only node possessing an object, which leads to failing the whole operation.
Signed-off-by: Evgenii Stratonikov --- cmd/frostfs-node/config.go | 27 +++++++++++++++------------ cmd/frostfs-node/object.go | 10 +++++++++- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 80d506eee..7eb0e623a 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -342,9 +342,10 @@ type shared struct { privateTokenStore sessionStorage persistate *state.PersistentStorage - clientCache *cache.ClientCache - bgClientCache *cache.ClientCache - localAddr network.AddressGroup + clientCache *cache.ClientCache + bgClientCache *cache.ClientCache + putClientCache *cache.ClientCache + localAddr network.AddressGroup key *keys.PrivateKey binPublicKey []byte @@ -570,13 +571,14 @@ func initCfg(appCfg *config.Config) *cfg { ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg), } c.shared = shared{ - key: key, - binPublicKey: key.PublicKey().Bytes(), - localAddr: netAddr, - respSvc: response.NewService(response.WithNetworkState(netState)), - clientCache: cache.NewSDKClientCache(cacheOpts), - bgClientCache: cache.NewSDKClientCache(cacheOpts), - persistate: persistate, + key: key, + binPublicKey: key.PublicKey().Bytes(), + localAddr: netAddr, + respSvc: response.NewService(response.WithNetworkState(netState)), + clientCache: cache.NewSDKClientCache(cacheOpts), + bgClientCache: cache.NewSDKClientCache(cacheOpts), + putClientCache: cache.NewSDKClientCache(cacheOpts), + persistate: persistate, } c.cfgAccounting = cfgAccounting{ scriptHash: contractsconfig.Balance(appCfg), @@ -615,8 +617,9 @@ func initCfg(appCfg *config.Config) *cfg { netState.metrics = c.metricsCollector } - c.onShutdown(c.clientCache.CloseAll) // clean up connections - c.onShutdown(c.bgClientCache.CloseAll) // clean up connections + c.onShutdown(c.clientCache.CloseAll) // clean up connections + c.onShutdown(c.bgClientCache.CloseAll) // clean up connections + c.onShutdown(c.putClientCache.CloseAll) // clean up connections c.onShutdown(func() { _ = c.persistate.Close() }) return c diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index d9a8adf20..12d367e19 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -182,6 +182,14 @@ func initObjectService(c *cfg) { basicConstructor: c.clientCache, } + putConstructor := &coreClientConstructor{ + log: c.log, + nmSrc: c.netMapSource, + netState: c.cfgNetmap.state, + trustStorage: c.cfgReputation.localTrustStorage, + basicConstructor: c.putClientCache, + } + var irFetcher v2.InnerRingFetcher if c.cfgMorph.client.ProbeNotary() { @@ -255,7 +263,7 @@ func initObjectService(c *cfg) { sPut := putsvc.NewService( putsvc.WithKeyStorage(keyStorage), - putsvc.WithClientConstructor(coreConstructor), + putsvc.WithClientConstructor(putConstructor), putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)), putsvc.WithObjectStorage(os), putsvc.WithContainerSource(c.cfgObject.cnrSource), -- 2.45.3 From 50e35de457f94899811f232ec149cd70ba790731 Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Thu, 24 Nov 2022 14:36:49 +0300 Subject: [PATCH 25/25] [#1868] Reload config for pprof and metrics on SIGHUP Signed-off-by: Anton Nikiforov --- CHANGELOG.md | 1 + cmd/frostfs-ir/main.go | 2 +- cmd/frostfs-node/closer.go | 25 ++++++ cmd/frostfs-node/config.go | 43 ++++++---- cmd/frostfs-node/httpcomponent.go | 70 +++++++++++++++++ cmd/frostfs-node/main.go | 21 ++++- cmd/frostfs-node/metrics.go | 70 ++++++++--------- cmd/frostfs-node/object.go | 16 ++-- cmd/frostfs-node/pprof.go | 61 
++++++--------- cmd/frostfs-node/worker.go | 49 ++++++++---- pkg/services/object/metrics.go | 126 ++++++++++++++++++------------ pkg/util/http/server.go | 33 +++++--- 12 files changed, 345 insertions(+), 172 deletions(-) create mode 100644 cmd/frostfs-node/closer.go create mode 100644 cmd/frostfs-node/httpcomponent.go diff --git a/CHANGELOG.md b/CHANGELOG.md index f4e8430bd..0387beae1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Changelog for FrostFS Node - New `frostfs_node_object_payload_size` metric for tracking size of reqular objects on a single shard (#1794) - Add command `frostfs-adm morph netmap-candidates` (#1889) - `object.delete.tombstone_lifetime` config parameter to set tombstone lifetime in the DELETE service (#2246) +- Reload config for pprof and metrics on SIGHUP in `neofs-node` (#1868) ### Changed - Change `frostfs_node_engine_container_size` to counting sizes of logical objects diff --git a/cmd/frostfs-ir/main.go b/cmd/frostfs-ir/main.go index 55f1ae7cc..f2afd7b8a 100644 --- a/cmd/frostfs-ir/main.go +++ b/cmd/frostfs-ir/main.go @@ -126,7 +126,7 @@ func initHTTPServers(cfg *viper.Viper, log *logger.Logger) []*httputil.Server { addr := cfg.GetString(item.cfgPrefix + ".address") - var prm httputil.Prm + var prm httputil.HTTPSrvPrm prm.Address = addr prm.Handler = item.handler() diff --git a/cmd/frostfs-node/closer.go b/cmd/frostfs-node/closer.go new file mode 100644 index 000000000..b370f56f9 --- /dev/null +++ b/cmd/frostfs-node/closer.go @@ -0,0 +1,25 @@ +package main + +type closer struct { + name string + fn func() +} + +func getCloser(c *cfg, name string) *closer { + for _, clsr := range c.closers { + if clsr.name == name { + return &clsr + } + } + return nil +} + +func delCloser(c *cfg, name string) { + for i, clsr := range c.closers { + if clsr.name == name { + c.closers[i] = c.closers[len(c.closers)-1] + c.closers = c.closers[:len(c.closers)-1] + return + } + } +} diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 7eb0e623a..75cdfd7fa 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -24,7 +24,6 @@ import ( blobovniczaconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza" fstreeconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree" loggerconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger" - metricsconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/metrics" nodeconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node" objectconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object" replicatorconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator" @@ -48,6 +47,7 @@ import ( "github.com/TrueCloudLab/frostfs-node/pkg/network" "github.com/TrueCloudLab/frostfs-node/pkg/network/cache" "github.com/TrueCloudLab/frostfs-node/pkg/services/control" + objectService "github.com/TrueCloudLab/frostfs-node/pkg/services/object" getsvc "github.com/TrueCloudLab/frostfs-node/pkg/services/object/get" "github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone" tsourse "github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source" @@ -308,7 +308,7 @@ type internals struct { wg *sync.WaitGroup workers []worker - closers []func() + closers []closer apiVersion version.Version healthStatus *atomic.Int32 @@ -364,12 +364,16 @@ type shared struct { treeService *tree.Service metricsCollector 
*metrics.NodeMetrics + + metricsSvc *objectService.MetricCollector } // dynamicConfiguration stores parameters of the // components that supports runtime reconfigurations. type dynamicConfiguration struct { - logger *logger.Prm + logger *logger.Prm + pprof *httpComponent + metrics *httpComponent } type cfg struct { @@ -612,10 +616,8 @@ func initCfg(appCfg *config.Config) *cfg { user.IDFromKey(&c.ownerIDFromKey, key.PrivateKey.PublicKey) - if metricsconfig.Enabled(c.appCfg) { - c.metricsCollector = metrics.NewNodeMetrics() - netState.metrics = c.metricsCollector - } + c.metricsCollector = metrics.NewNodeMetrics() + netState.metrics = c.metricsCollector c.onShutdown(c.clientCache.CloseAll) // clean up connections c.onShutdown(c.bgClientCache.CloseAll) // clean up connections @@ -915,11 +917,9 @@ func (c *cfg) ObjectServiceLoad() float64 { return float64(c.cfgObject.pool.putRemote.Running()) / float64(c.cfgObject.pool.putRemoteCapacity) } -type dCfg struct { - name string - cfg interface { - Reload() error - } +type dCmp struct { + name string + reloadFunc func() error } func (c *cfg) signalWatcher() { @@ -964,7 +964,7 @@ func (c *cfg) reloadConfig() { // all the components are expected to support // Logger's dynamic reconfiguration approach - var components []dCfg + var components []dCmp // Logger @@ -974,7 +974,18 @@ func (c *cfg) reloadConfig() { return } - components = append(components, dCfg{name: "logger", cfg: logPrm}) + components = append(components, dCmp{"logger", logPrm.Reload}) + if cmp, updated := metricsComponent(c); updated { + if cmp.enabled { + cmp.preReload = enableMetricsSvc + } else { + cmp.preReload = disableMetricsSvc + } + components = append(components, dCmp{cmp.name, cmp.reload}) + } + if cmp, updated := pprofComponent(c); updated { + components = append(components, dCmp{cmp.name, cmp.reload}) + } // Storage Engine @@ -990,7 +1001,7 @@ func (c *cfg) reloadConfig() { } for _, component := range components { - err = component.cfg.Reload() + err = component.reloadFunc() if err != nil { c.log.Error("updated configuration applying", zap.String("component", component.name), @@ -1006,7 +1017,7 @@ func (c *cfg) shutdown() { c.ctxCancel() for i := range c.closers { - c.closers[len(c.closers)-1-i]() + c.closers[len(c.closers)-1-i].fn() } close(c.internalErr) } diff --git a/cmd/frostfs-node/httpcomponent.go b/cmd/frostfs-node/httpcomponent.go new file mode 100644 index 000000000..7410b8078 --- /dev/null +++ b/cmd/frostfs-node/httpcomponent.go @@ -0,0 +1,70 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "time" + + httputil "github.com/TrueCloudLab/frostfs-node/pkg/util/http" +) + +type httpComponent struct { + address string + name string + handler http.Handler + shutdownDur time.Duration + enabled bool + cfg *cfg + preReload func(c *cfg) +} + +func (cmp *httpComponent) init(c *cfg) { + if !cmp.enabled { + c.log.Info(fmt.Sprintf("%s is disabled", cmp.name)) + return + } + // Init server with parameters + srv := httputil.New( + *httputil.NewHTTPSrvPrm( + cmp.address, + cmp.handler, + ), + httputil.WithShutdownTimeout( + cmp.shutdownDur, + ), + ) + c.closers = append(c.closers, closer{ + cmp.name, + func() { stopAndLog(c, cmp.name, srv.Shutdown) }, + }) + c.workers = append(c.workers, worker{ + cmp.name, + func(ctx context.Context) { + runAndLog(c, cmp.name, false, func(c *cfg) { + fatalOnErr(srv.Serve()) + }) + }, + }) +} + +func (cmp *httpComponent) reload() error { + if cmp.preReload != nil { + cmp.preReload(cmp.cfg) + } + // Shutdown server + closer := 
getCloser(cmp.cfg, cmp.name) + if closer != nil { + closer.fn() + } + // Cleanup + delCloser(cmp.cfg, cmp.name) + delWorker(cmp.cfg, cmp.name) + // Init server with new parameters + cmp.init(cmp.cfg) + // Start worker + if cmp.enabled { + startWorker(cmp.cfg, *getWorker(cmp.cfg, cmp.name)) + } + return nil +} diff --git a/cmd/frostfs-node/main.go b/cmd/frostfs-node/main.go index cdc3c94b3..d6345d3b8 100644 --- a/cmd/frostfs-node/main.go +++ b/cmd/frostfs-node/main.go @@ -81,8 +81,10 @@ func initApp(c *cfg) { c.wg.Done() }() - initAndLog(c, "pprof", initProfiler) - initAndLog(c, "prometheus", initMetrics) + pprof, _ := pprofComponent(c) + metrics, _ := metricsComponent(c) + initAndLog(c, pprof.name, pprof.init) + initAndLog(c, metrics.name, metrics.init) initLocalStorage(c) @@ -114,6 +116,19 @@ func runAndLog(c *cfg, name string, logSuccess bool, starter func(*cfg)) { } } +func stopAndLog(c *cfg, name string, stopper func() error) { + c.log.Debug(fmt.Sprintf("shutting down %s service", name)) + + err := stopper() + if err != nil { + c.log.Debug(fmt.Sprintf("could not shutdown %s server", name), + zap.String("error", err.Error()), + ) + } + + c.log.Debug(fmt.Sprintf("%s service has been stopped", name)) +} + func bootUp(c *cfg) { runAndLog(c, "NATS", true, connectNats) runAndLog(c, "gRPC", false, serveGRPC) @@ -135,5 +150,5 @@ func wait(c *cfg) { } func (c *cfg) onShutdown(f func()) { - c.closers = append(c.closers, f) + c.closers = append(c.closers, closer{"", f}) } diff --git a/cmd/frostfs-node/metrics.go b/cmd/frostfs-node/metrics.go index 193b71d3d..249c7e9c0 100644 --- a/cmd/frostfs-node/metrics.go +++ b/cmd/frostfs-node/metrics.go @@ -1,47 +1,45 @@ package main import ( - "context" - metricsconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/metrics" - httputil "github.com/TrueCloudLab/frostfs-node/pkg/util/http" "github.com/prometheus/client_golang/prometheus/promhttp" - "go.uber.org/zap" ) -func initMetrics(c *cfg) { - if !metricsconfig.Enabled(c.appCfg) { - c.log.Info("prometheus is disabled") - return +func metricsComponent(c *cfg) (*httpComponent, bool) { + var updated bool + // check if it has been inited before + if c.dynamicConfiguration.metrics == nil { + c.dynamicConfiguration.metrics = new(httpComponent) + c.dynamicConfiguration.metrics.cfg = c + c.dynamicConfiguration.metrics.name = "metrics" + c.dynamicConfiguration.metrics.handler = promhttp.Handler() + updated = true } - var prm httputil.Prm + // (re)init read configuration + enabled := metricsconfig.Enabled(c.appCfg) + if enabled != c.dynamicConfiguration.metrics.enabled { + c.dynamicConfiguration.metrics.enabled = enabled + updated = true + } + address := metricsconfig.Address(c.appCfg) + if address != c.dynamicConfiguration.metrics.address { + c.dynamicConfiguration.metrics.address = address + updated = true + } + dur := metricsconfig.ShutdownTimeout(c.appCfg) + if dur != c.dynamicConfiguration.metrics.shutdownDur { + c.dynamicConfiguration.metrics.shutdownDur = dur + updated = true + } - prm.Address = metricsconfig.Address(c.appCfg) - prm.Handler = promhttp.Handler() - - srv := httputil.New(prm, - httputil.WithShutdownTimeout( - metricsconfig.ShutdownTimeout(c.appCfg), - ), - ) - - c.workers = append(c.workers, newWorkerFromFunc(func(context.Context) { - runAndLog(c, "metrics", false, func(c *cfg) { - fatalOnErr(srv.Serve()) - }) - })) - - c.closers = append(c.closers, func() { - c.log.Debug("shutting down metrics service") - - err := srv.Shutdown() - if err != nil { - c.log.Debug("could not shutdown 
metrics server", - zap.String("error", err.Error()), - ) - } - - c.log.Debug("metrics service has been stopped") - }) + return c.dynamicConfiguration.metrics, updated +} + +func enableMetricsSvc(c *cfg) { + c.shared.metricsSvc.Enable() +} + +func disableMetricsSvc(c *cfg) { + c.shared.metricsSvc.Disable() } diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 12d367e19..602e8e25f 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -8,6 +8,7 @@ import ( "github.com/TrueCloudLab/frostfs-api-go/v2/object" objectGRPC "github.com/TrueCloudLab/frostfs-api-go/v2/object/grpc" + metricsconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/metrics" policerconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/policer" replicatorconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator" coreclient "github.com/TrueCloudLab/frostfs-node/pkg/core/client" @@ -246,7 +247,11 @@ func initObjectService(c *cfg) { traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c) - c.workers = append(c.workers, pol) + c.workers = append(c.workers, worker{ + fn: func(ctx context.Context) { + pol.Run(ctx) + }, + }) var os putsvc.ObjectStorage = engineWithoutNotifications{ engine: ls, @@ -380,12 +385,9 @@ func initObjectService(c *cfg) { respSvc, ) - var firstSvc objectService.ServiceServer = signSvc - if c.metricsCollector != nil { - firstSvc = objectService.NewMetricCollector(signSvc, c.metricsCollector) - } - - server := objectTransportGRPC.New(firstSvc) + c.shared.metricsSvc = objectService.NewMetricCollector( + signSvc, c.metricsCollector, metricsconfig.Enabled(c.appCfg)) + server := objectTransportGRPC.New(c.shared.metricsSvc) for _, srv := range c.cfgGRPC.servers { objectGRPC.RegisterObjectServiceServer(srv, server) diff --git a/cmd/frostfs-node/pprof.go b/cmd/frostfs-node/pprof.go index 73461879c..02d376496 100644 --- a/cmd/frostfs-node/pprof.go +++ b/cmd/frostfs-node/pprof.go @@ -1,46 +1,37 @@ package main import ( - "context" - profilerconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/profiler" httputil "github.com/TrueCloudLab/frostfs-node/pkg/util/http" - "go.uber.org/zap" ) -func initProfiler(c *cfg) { - if !profilerconfig.Enabled(c.appCfg) { - c.log.Info("pprof is disabled") - return +func pprofComponent(c *cfg) (*httpComponent, bool) { + var updated bool + // check if it has been inited before + if c.dynamicConfiguration.pprof == nil { + c.dynamicConfiguration.pprof = new(httpComponent) + c.dynamicConfiguration.pprof.cfg = c + c.dynamicConfiguration.pprof.name = "pprof" + c.dynamicConfiguration.pprof.handler = httputil.Handler() + updated = true } - var prm httputil.Prm + // (re)init read configuration + enabled := profilerconfig.Enabled(c.appCfg) + if enabled != c.dynamicConfiguration.pprof.enabled { + c.dynamicConfiguration.pprof.enabled = enabled + updated = true + } + address := profilerconfig.Address(c.appCfg) + if address != c.dynamicConfiguration.pprof.address { + c.dynamicConfiguration.pprof.address = address + updated = true + } + dur := profilerconfig.ShutdownTimeout(c.appCfg) + if dur != c.dynamicConfiguration.pprof.shutdownDur { + c.dynamicConfiguration.pprof.shutdownDur = dur + updated = true + } - prm.Address = profilerconfig.Address(c.appCfg) - prm.Handler = httputil.Handler() - - srv := httputil.New(prm, - httputil.WithShutdownTimeout( - profilerconfig.ShutdownTimeout(c.appCfg), - ), - ) - - c.workers = append(c.workers, 
newWorkerFromFunc(func(context.Context) { - runAndLog(c, "profiler", false, func(c *cfg) { - fatalOnErr(srv.Serve()) - }) - })) - - c.closers = append(c.closers, func() { - c.log.Debug("shutting down profiling service") - - err := srv.Shutdown() - if err != nil { - c.log.Debug("could not shutdown pprof server", - zap.String("error", err.Error()), - ) - } - - c.log.Debug("profiling service has been stopped") - }) + return c.dynamicConfiguration.pprof, updated } diff --git a/cmd/frostfs-node/worker.go b/cmd/frostfs-node/worker.go index eebce73c2..21780e04f 100644 --- a/cmd/frostfs-node/worker.go +++ b/cmd/frostfs-node/worker.go @@ -4,31 +4,46 @@ import ( "context" ) -type worker interface { - Run(context.Context) -} - -type workerFromFunc struct { - fn func(context.Context) +type worker struct { + name string + fn func(context.Context) } func newWorkerFromFunc(fn func(ctx context.Context)) worker { - return &workerFromFunc{ + return worker{ fn: fn, } } -func (w *workerFromFunc) Run(ctx context.Context) { - w.fn(ctx) -} - func startWorkers(c *cfg) { for _, wrk := range c.workers { - c.wg.Add(1) - - go func(w worker) { - w.Run(c.ctx) - c.wg.Done() - }(wrk) + startWorker(c, wrk) } } + +func startWorker(c *cfg, wrk worker) { + c.wg.Add(1) + + go func(w worker) { + w.fn(c.ctx) + c.wg.Done() + }(wrk) +} + +func delWorker(c *cfg, name string) { + for i, worker := range c.workers { + if worker.name == name { + c.workers = append(c.workers[:i], c.workers[i+1:]...) + return + } + } +} + +func getWorker(c *cfg, name string) *worker { + for _, wrk := range c.workers { + if wrk.name == name { + return &wrk + } + } + return nil +} diff --git a/pkg/services/object/metrics.go b/pkg/services/object/metrics.go index 11828875d..43c636e2c 100644 --- a/pkg/services/object/metrics.go +++ b/pkg/services/object/metrics.go @@ -12,6 +12,7 @@ type ( MetricCollector struct { next ServiceServer metrics MetricRegister + enabled bool } getStreamMetric struct { @@ -48,96 +49,125 @@ type ( } ) -func NewMetricCollector(next ServiceServer, register MetricRegister) *MetricCollector { +func NewMetricCollector(next ServiceServer, register MetricRegister, enabled bool) *MetricCollector { return &MetricCollector{ next: next, metrics: register, + enabled: enabled, } } func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) (err error) { - t := time.Now() - defer func() { - m.metrics.IncGetReqCounter(err == nil) - m.metrics.AddGetReqDuration(time.Since(t)) - }() - - err = m.next.Get(req, &getStreamMetric{ - ServerStream: stream, - stream: stream, - metrics: m.metrics, - }) + if m.enabled { + t := time.Now() + defer func() { + m.metrics.IncGetReqCounter(err == nil) + m.metrics.AddGetReqDuration(time.Since(t)) + }() + err = m.next.Get(req, &getStreamMetric{ + ServerStream: stream, + stream: stream, + metrics: m.metrics, + }) + } else { + err = m.next.Get(req, stream) + } return } func (m MetricCollector) Put(ctx context.Context) (PutObjectStream, error) { - t := time.Now() + if m.enabled { + t := time.Now() - stream, err := m.next.Put(ctx) - if err != nil { - return nil, err + stream, err := m.next.Put(ctx) + if err != nil { + return nil, err + } + + return &putStreamMetric{ + stream: stream, + metrics: m.metrics, + start: t, + }, nil } - - return &putStreamMetric{ - stream: stream, - metrics: m.metrics, - start: t, - }, nil + return m.next.Put(ctx) } func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) { - t := time.Now() + if m.enabled { + t := time.Now() 
- res, err := m.next.Head(ctx, request) + res, err := m.next.Head(ctx, request) - m.metrics.IncHeadReqCounter(err == nil) - m.metrics.AddHeadReqDuration(time.Since(t)) + m.metrics.IncHeadReqCounter(err == nil) + m.metrics.AddHeadReqDuration(time.Since(t)) - return res, err + return res, err + } + return m.next.Head(ctx, request) } func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream) error { - t := time.Now() + if m.enabled { + t := time.Now() - err := m.next.Search(req, stream) + err := m.next.Search(req, stream) - m.metrics.IncSearchReqCounter(err == nil) - m.metrics.AddSearchReqDuration(time.Since(t)) + m.metrics.IncSearchReqCounter(err == nil) + m.metrics.AddSearchReqDuration(time.Since(t)) - return err + return err + } + return m.next.Search(req, stream) } func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteRequest) (*object.DeleteResponse, error) { - t := time.Now() + if m.enabled { + t := time.Now() - res, err := m.next.Delete(ctx, request) + res, err := m.next.Delete(ctx, request) - m.metrics.IncDeleteReqCounter(err == nil) - m.metrics.AddDeleteReqDuration(time.Since(t)) - - return res, err + m.metrics.IncDeleteReqCounter(err == nil) + m.metrics.AddDeleteReqDuration(time.Since(t)) + return res, err + } + return m.next.Delete(ctx, request) } func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error { - t := time.Now() + if m.enabled { + t := time.Now() - err := m.next.GetRange(req, stream) + err := m.next.GetRange(req, stream) - m.metrics.IncRangeReqCounter(err == nil) - m.metrics.AddRangeReqDuration(time.Since(t)) + m.metrics.IncRangeReqCounter(err == nil) + m.metrics.AddRangeReqDuration(time.Since(t)) - return err + return err + } + return m.next.GetRange(req, stream) } func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { - t := time.Now() + if m.enabled { + t := time.Now() - res, err := m.next.GetRangeHash(ctx, request) + res, err := m.next.GetRangeHash(ctx, request) - m.metrics.IncRangeHashReqCounter(err == nil) - m.metrics.AddRangeHashReqDuration(time.Since(t)) + m.metrics.IncRangeHashReqCounter(err == nil) + m.metrics.AddRangeHashReqDuration(time.Since(t)) - return res, err + return res, err + } + return m.next.GetRangeHash(ctx, request) +} + +func (m *MetricCollector) Enable() { + m.enabled = true +} + +func (m *MetricCollector) Disable() { + m.enabled = false } func (s getStreamMetric) Send(resp *object.GetResponse) error { diff --git a/pkg/util/http/server.go b/pkg/util/http/server.go index cbfbda739..05a1f44c1 100644 --- a/pkg/util/http/server.go +++ b/pkg/util/http/server.go @@ -6,12 +6,12 @@ import ( "time" ) -// Prm groups the required parameters of the Server's constructor. +// HTTPSrvPrm groups the required parameters of the Server's constructor. // // All values must comply with the requirements imposed on them. // Passing incorrect parameter values will result in constructor // failure (error or panic depending on the implementation). -type Prm struct { +type HTTPSrvPrm struct { // TCP address for the server to listen on. // // Must be a valid TCP address. 
@@ -49,6 +49,15 @@ func panicOnValue(t, n string, v interface{}) { panic(fmt.Sprintf(invalidValFmt, t, n, v, v)) } +func checkSrvPrm(addr string, handler http.Handler) { + switch { + case addr == "": + panicOnPrmValue("Address", addr) + case handler == nil: + panicOnPrmValue("Handler", handler) + } +} + // New creates a new instance of the Server. // // Panics if at least one value of the parameters is invalid. @@ -58,13 +67,8 @@ func panicOnValue(t, n string, v interface{}) { // // The created Server does not require additional // initialization and is completely ready for work. -func New(prm Prm, opts ...Option) *Server { - switch { - case prm.Address == "": - panicOnPrmValue("Address", prm.Address) - case prm.Handler == nil: - panicOnPrmValue("Handler", prm.Handler) - } +func New(prm HTTPSrvPrm, opts ...Option) *Server { + checkSrvPrm(prm.Address, prm.Handler) c := defaultCfg() @@ -85,3 +89,14 @@ func New(prm Prm, opts ...Option) *Server { }, } } + +// NewHTTPSrvPrm creates a new instance of the HTTPSrvPrm. +// +// Panics if at least one value of the parameters is invalid. +func NewHTTPSrvPrm(addr string, handler http.Handler) *HTTPSrvPrm { + checkSrvPrm(addr, handler) + return &HTTPSrvPrm{ + Address: addr, + Handler: handler, + } +} -- 2.45.3
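
For readers skimming the series, patch 25/25 amounts to the pattern sketched below: long-lived components such as the metrics and pprof HTTP servers register a named reload function, and a signal watcher re-runs those functions when the process receives SIGHUP. This is only an illustrative, self-contained sketch; `component`, `reloader`, `register` and `watchSIGHUP` are hypothetical names standing in for the patch's actual `httpComponent`, `dCmp`, `worker` and `closer` plumbing, and the stub server below does not re-read any real configuration.

```go
// Hypothetical sketch of the reload-on-SIGHUP pattern; not frostfs-node code.
package main

import (
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"syscall"
)

// component mirrors the idea behind the patch's dCmp: a named piece of the
// process that knows how to re-read its configuration and restart itself.
type component struct {
	name   string
	reload func() error
}

type reloader struct {
	mu         sync.Mutex
	components []component
}

func (r *reloader) register(name string, reload func() error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.components = append(r.components, component{name: name, reload: reload})
}

// watchSIGHUP blocks until the process receives a signal. On SIGHUP every
// registered component is reloaded; on any other signal it returns.
func (r *reloader) watchSIGHUP() {
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGHUP, syscall.SIGTERM)

	for sig := range ch {
		if sig != syscall.SIGHUP {
			return
		}
		r.mu.Lock()
		for _, c := range r.components {
			if err := c.reload(); err != nil {
				fmt.Printf("reload of %s failed: %v\n", c.name, err)
			}
		}
		r.mu.Unlock()
	}
}

func main() {
	var r reloader

	// Stand-in for the metrics endpoint; a real reload would compare freshly
	// parsed settings (enabled flag, address, shutdown timeout) with the
	// running ones and only restart the listener when something changed.
	srv := &http.Server{Addr: ":9090", Handler: http.NotFoundHandler()}
	go srv.ListenAndServe()

	r.register("metrics", func() error {
		_ = srv.Close() // stop the old listener, analogous to closer.fn()
		srv = &http.Server{Addr: ":9090", Handler: http.NotFoundHandler()}
		go srv.ListenAndServe() // start the replacement, analogous to startWorker
		return nil
	})

	r.watchSIGHUP()
}
```

The design choice the patch makes is the same one this sketch relies on: each reloadable server is tracked by name so its old closer and worker can be found, stopped and replaced, instead of toggling anything inside a running server.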