diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9002ee10b..0387beae1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,12 +9,17 @@ Changelog for FrostFS Node
- New `frostfs_node_object_container_size` metric for tracking size of regular objects in a container (#2116)
- New `frostfs_node_object_payload_size` metric for tracking size of regular objects on a single shard (#1794)
- Add command `frostfs-adm morph netmap-candidates` (#1889)
+- `object.delete.tombstone_lifetime` config parameter to set tombstone lifetime in the DELETE service (#2246)
+- Reload config for pprof and metrics on SIGHUP in `frostfs-node` (#1868)
### Changed
- Change `frostfs_node_engine_container_size` to counting sizes of logical objects
- `common.PrintVerbose` prints via `cobra.Command.Printf` (#1962)
- Env prefix in configuration changed to `FROSTFS_*` (#43)
- Link object is broadcast throughout the whole container now (#57)
+- Pilorama now can merge multiple batches into one (#2231)
+- Storage engine now can start even when some shard components are unavailable (#2238)
+- `frostfs-cli` buffer for object put increased from 4 KiB to 3 MiB (#2243)
### Fixed
- Increase payload size metric on shards' `put` operation (#1794)
@@ -26,6 +31,14 @@ Changelog for FrostFS Node
- Set flag `mode` required for `frostfs-cli control shards set-mode` (#8)
- Fix `dirty` suffix in debian package version (#53)
- Prevent node process from killing by systemd when shutting down (#1465)
+- Restore subscriptions correctly on morph client switch (#2212)
+- Expired objects could be returned if not marked with GC yet (#2213)
+- `frostfs-adm morph dump-hashes` now properly iterates over custom domain (#2224)
+- Possible deadlock in write-cache (#2239)
+- Fix `*_req_count` and `*_req_count_success` metric values (#2241)
+- Storage ID update by write-cache (#2244)
+- `neo-go` client deadlock on subscription restoration (#2244)
+- Possible panic during write-cache initialization (#2234)
### Removed
### Updated
@@ -38,9 +51,11 @@ Changelog for FrostFS Node
- Minimum go version to v1.18
### Updating from v0.35.0
You need to change configuration environment variables to `FROSTFS_*` if you use any.
+New config field `object.delete.tombstone_lifetime` allows to set tombstone lifetime
+more appropriate for a specific deployment.
-## [0.35.0] - 2022-12-28 - Sindo (신도)
+## [0.35.0] - 2022-12-28 - Sindo (신도, 信島)
### Added
- `morph list-containers` in `neofs-adm` (#1689)
@@ -124,7 +139,6 @@ You need to change configuration environment variables to `FROSTFS_*` if you use
- `spf13/viper` to `v1.8.0`
- `google.golang.org/grpc` to `v1.50.1`
-
### Updating from v0.34.0
Pass CID and OID parameters via the `--cid` and `--oid` flags, not as the command arguments.
@@ -138,9 +152,9 @@ to match the container owner.
Use `--force` (`-f`) flag to bypass this requirement.
Tree service network replication can now be fine-tuned with `tree.replication_timeout` config field.
- ## [0.34.0] - 2022-10-31 - Marado (마라도, 馬羅島) +## [0.34.0] - 2022-10-31 - Marado (마라도, 馬羅島) -# ## Added +### Added - `--timeout` flag in `neofs-cli control` commands (#1917) - Document shard modes of operation (#1909) - `tree list` CLI command (#1332) diff --git a/cmd/frostfs-adm/internal/modules/morph/dump_hashes.go b/cmd/frostfs-adm/internal/modules/morph/dump_hashes.go index 4b7d579d1..030542fb9 100644 --- a/cmd/frostfs-adm/internal/modules/morph/dump_hashes.go +++ b/cmd/frostfs-adm/internal/modules/morph/dump_hashes.go @@ -2,6 +2,7 @@ package morph import ( "bytes" + "errors" "fmt" "strings" "text/tabwriter" @@ -107,31 +108,30 @@ func dumpContractHashes(cmd *cobra.Command, _ []string) error { func dumpCustomZoneHashes(cmd *cobra.Command, nnsHash util.Uint160, zone string, c Client) error { const nnsMaxTokens = 100 - inv := invoker.New(c, nil) - arr, err := unwrap.Array(inv.CallAndExpandIterator(nnsHash, "tokens", nnsMaxTokens)) - if err != nil { - return fmt.Errorf("can't get a list of NNS domains: %w", err) - } + inv := invoker.New(c, nil) if !strings.HasPrefix(zone, ".") { zone = "." + zone } var infos []contractDumpInfo - for i := range arr { - bs, err := arr[i].TryBytes() + processItem := func(item stackitem.Item) { + bs, err := item.TryBytes() if err != nil { - continue + cmd.PrintErrf("Invalid NNS record: %v\n", err) + return } if !bytes.HasSuffix(bs, []byte(zone)) { - continue + // Related https://github.com/nspcc-dev/neofs-contract/issues/316. + return } h, err := nnsResolveHash(inv, nnsHash, string(bs)) if err != nil { - continue + cmd.PrintErrf("Could not resolve name %s: %v\n", string(bs), err) + return } infos = append(infos, contractDumpInfo{ @@ -140,6 +140,39 @@ func dumpCustomZoneHashes(cmd *cobra.Command, nnsHash util.Uint160, zone string, }) } + sessionID, iter, err := unwrap.SessionIterator(inv.Call(nnsHash, "tokens")) + if err != nil { + if errors.Is(err, unwrap.ErrNoSessionID) { + items, err := unwrap.Array(inv.CallAndExpandIterator(nnsHash, "tokens", nnsMaxTokens)) + if err != nil { + return fmt.Errorf("can't get a list of NNS domains: %w", err) + } + if len(items) == nnsMaxTokens { + cmd.PrintErrln("Provided RPC endpoint doesn't support sessions, some hashes might be lost.") + } + for i := range items { + processItem(items[i]) + } + } else { + return err + } + } else { + defer func() { + _ = inv.TerminateSession(sessionID) + }() + + items, err := inv.TraverseIterator(sessionID, &iter, nnsMaxTokens) + for err == nil && len(items) != 0 { + for i := range items { + processItem(items[i]) + } + items, err = inv.TraverseIterator(sessionID, &iter, nnsMaxTokens) + } + if err != nil { + return fmt.Errorf("error during NNS domains iteration: %w", err) + } + } + fillContractVersion(cmd, c, infos) printContractInfo(cmd, infos) diff --git a/cmd/frostfs-cli/internal/client/client.go b/cmd/frostfs-cli/internal/client/client.go index fcfa2b46b..718110322 100644 --- a/cmd/frostfs-cli/internal/client/client.go +++ b/cmd/frostfs-cli/internal/client/client.go @@ -404,8 +404,7 @@ func PutObject(prm PutObjectPrm) (*PutObjectRes, error) { } if prm.rdr != nil { - // TODO: (neofs-node#1198) explore better values or configure it - const defaultBufferSizePut = 4096 + const defaultBufferSizePut = 3 << 20 // Maximum chunk size is 3 MiB in the SDK. 
if sz == 0 || sz > defaultBufferSizePut { sz = defaultBufferSizePut diff --git a/cmd/frostfs-ir/main.go b/cmd/frostfs-ir/main.go index 55f1ae7cc..f2afd7b8a 100644 --- a/cmd/frostfs-ir/main.go +++ b/cmd/frostfs-ir/main.go @@ -126,7 +126,7 @@ func initHTTPServers(cfg *viper.Viper, log *logger.Logger) []*httputil.Server { addr := cfg.GetString(item.cfgPrefix + ".address") - var prm httputil.Prm + var prm httputil.HTTPSrvPrm prm.Address = addr prm.Handler = item.handler() diff --git a/cmd/frostfs-node/closer.go b/cmd/frostfs-node/closer.go new file mode 100644 index 000000000..b370f56f9 --- /dev/null +++ b/cmd/frostfs-node/closer.go @@ -0,0 +1,25 @@ +package main + +type closer struct { + name string + fn func() +} + +func getCloser(c *cfg, name string) *closer { + for _, clsr := range c.closers { + if clsr.name == name { + return &clsr + } + } + return nil +} + +func delCloser(c *cfg, name string) { + for i, clsr := range c.closers { + if clsr.name == name { + c.closers[i] = c.closers[len(c.closers)-1] + c.closers = c.closers[:len(c.closers)-1] + return + } + } +} diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index d20e9b1cc..75cdfd7fa 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -24,7 +24,6 @@ import ( blobovniczaconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza" fstreeconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree" loggerconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger" - metricsconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/metrics" nodeconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node" objectconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object" replicatorconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator" @@ -48,6 +47,7 @@ import ( "github.com/TrueCloudLab/frostfs-node/pkg/network" "github.com/TrueCloudLab/frostfs-node/pkg/network/cache" "github.com/TrueCloudLab/frostfs-node/pkg/services/control" + objectService "github.com/TrueCloudLab/frostfs-node/pkg/services/object" getsvc "github.com/TrueCloudLab/frostfs-node/pkg/services/object/get" "github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone" tsourse "github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source" @@ -308,7 +308,7 @@ type internals struct { wg *sync.WaitGroup workers []worker - closers []func() + closers []closer apiVersion version.Version healthStatus *atomic.Int32 @@ -342,9 +342,10 @@ type shared struct { privateTokenStore sessionStorage persistate *state.PersistentStorage - clientCache *cache.ClientCache - bgClientCache *cache.ClientCache - localAddr network.AddressGroup + clientCache *cache.ClientCache + bgClientCache *cache.ClientCache + putClientCache *cache.ClientCache + localAddr network.AddressGroup key *keys.PrivateKey binPublicKey []byte @@ -363,12 +364,16 @@ type shared struct { treeService *tree.Service metricsCollector *metrics.NodeMetrics + + metricsSvc *objectService.MetricCollector } // dynamicConfiguration stores parameters of the // components that supports runtime reconfigurations. 
type dynamicConfiguration struct { - logger *logger.Prm + logger *logger.Prm + pprof *httpComponent + metrics *httpComponent } type cfg struct { @@ -472,6 +477,8 @@ type cfgObject struct { pool cfgObjectRoutines cfgLocalStorage cfgLocalStorage + + tombstoneLifetime uint64 } type cfgNotifications struct { @@ -568,13 +575,14 @@ func initCfg(appCfg *config.Config) *cfg { ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg), } c.shared = shared{ - key: key, - binPublicKey: key.PublicKey().Bytes(), - localAddr: netAddr, - respSvc: response.NewService(response.WithNetworkState(netState)), - clientCache: cache.NewSDKClientCache(cacheOpts), - bgClientCache: cache.NewSDKClientCache(cacheOpts), - persistate: persistate, + key: key, + binPublicKey: key.PublicKey().Bytes(), + localAddr: netAddr, + respSvc: response.NewService(response.WithNetworkState(netState)), + clientCache: cache.NewSDKClientCache(cacheOpts), + bgClientCache: cache.NewSDKClientCache(cacheOpts), + putClientCache: cache.NewSDKClientCache(cacheOpts), + persistate: persistate, } c.cfgAccounting = cfgAccounting{ scriptHash: contractsconfig.Balance(appCfg), @@ -598,7 +606,8 @@ func initCfg(appCfg *config.Config) *cfg { proxyScriptHash: contractsconfig.Proxy(appCfg), } c.cfgObject = cfgObject{ - pool: initObjectPool(appCfg), + pool: initObjectPool(appCfg), + tombstoneLifetime: objectconfig.TombstoneLifetime(appCfg), } c.cfgReputation = cfgReputation{ scriptHash: contractsconfig.Reputation(appCfg), @@ -607,13 +616,12 @@ func initCfg(appCfg *config.Config) *cfg { user.IDFromKey(&c.ownerIDFromKey, key.PrivateKey.PublicKey) - if metricsconfig.Enabled(c.appCfg) { - c.metricsCollector = metrics.NewNodeMetrics() - netState.metrics = c.metricsCollector - } + c.metricsCollector = metrics.NewNodeMetrics() + netState.metrics = c.metricsCollector - c.onShutdown(c.clientCache.CloseAll) // clean up connections - c.onShutdown(c.bgClientCache.CloseAll) // clean up connections + c.onShutdown(c.clientCache.CloseAll) // clean up connections + c.onShutdown(c.bgClientCache.CloseAll) // clean up connections + c.onShutdown(c.putClientCache.CloseAll) // clean up connections c.onShutdown(func() { _ = c.persistate.Close() }) return c @@ -790,13 +798,18 @@ func initLocalStorage(c *cfg) { tombstone.WithTombstoneSource(tombstoneSrc), ) + var shardsAttached int for _, optsWithMeta := range c.shardOpts() { id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(tombstoneSource))...) 
- fatalOnErr(err) - - c.log.Info("shard attached to engine", - zap.Stringer("id", id), - ) + if err != nil { + c.log.Error("failed to attach shard to engine", zap.Error(err)) + } else { + shardsAttached++ + c.log.Info("shard attached to engine", zap.Stringer("id", id)) + } + } + if shardsAttached == 0 { + fatalOnErr(engineconfig.ErrNoShardConfigured) } c.cfgObject.cfgLocalStorage.localStorage = ls @@ -904,11 +917,9 @@ func (c *cfg) ObjectServiceLoad() float64 { return float64(c.cfgObject.pool.putRemote.Running()) / float64(c.cfgObject.pool.putRemoteCapacity) } -type dCfg struct { - name string - cfg interface { - Reload() error - } +type dCmp struct { + name string + reloadFunc func() error } func (c *cfg) signalWatcher() { @@ -953,7 +964,7 @@ func (c *cfg) reloadConfig() { // all the components are expected to support // Logger's dynamic reconfiguration approach - var components []dCfg + var components []dCmp // Logger @@ -963,7 +974,18 @@ func (c *cfg) reloadConfig() { return } - components = append(components, dCfg{name: "logger", cfg: logPrm}) + components = append(components, dCmp{"logger", logPrm.Reload}) + if cmp, updated := metricsComponent(c); updated { + if cmp.enabled { + cmp.preReload = enableMetricsSvc + } else { + cmp.preReload = disableMetricsSvc + } + components = append(components, dCmp{cmp.name, cmp.reload}) + } + if cmp, updated := pprofComponent(c); updated { + components = append(components, dCmp{cmp.name, cmp.reload}) + } // Storage Engine @@ -979,7 +1001,7 @@ func (c *cfg) reloadConfig() { } for _, component := range components { - err = component.cfg.Reload() + err = component.reloadFunc() if err != nil { c.log.Error("updated configuration applying", zap.String("component", component.name), @@ -995,7 +1017,7 @@ func (c *cfg) shutdown() { c.ctxCancel() for i := range c.closers { - c.closers[len(c.closers)-1-i]() + c.closers[len(c.closers)-1-i].fn() } close(c.internalErr) } diff --git a/cmd/frostfs-node/config/object/config_test.go b/cmd/frostfs-node/config/object/config_test.go index db7c1d62e..5ff504171 100644 --- a/cmd/frostfs-node/config/object/config_test.go +++ b/cmd/frostfs-node/config/object/config_test.go @@ -14,12 +14,14 @@ func TestObjectSection(t *testing.T) { empty := configtest.EmptyConfig() require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeRemote()) + require.EqualValues(t, objectconfig.DefaultTombstoneLifetime, objectconfig.TombstoneLifetime(empty)) }) const path = "../../../../config/example/node" var fileConfigTest = func(c *config.Config) { require.Equal(t, 100, objectconfig.Put(c).PoolSizeRemote()) + require.EqualValues(t, 10, objectconfig.TombstoneLifetime(c)) } configtest.ForEachFileType(path, fileConfigTest) diff --git a/cmd/frostfs-node/config/object/delete.go b/cmd/frostfs-node/config/object/delete.go new file mode 100644 index 000000000..d2cd2aff4 --- /dev/null +++ b/cmd/frostfs-node/config/object/delete.go @@ -0,0 +1,19 @@ +package objectconfig + +import "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" + +const ( + deleteSubsection = "delete" + + // DefaultTombstoneLifetime is the default value of tombstone lifetime in epochs. + DefaultTombstoneLifetime = 5 +) + +// TombstoneLifetime returns the value of `tombstone_lifetime` config parameter. 
+func TombstoneLifetime(c *config.Config) uint64 { + ts := config.UintSafe(c.Sub(subsection).Sub(deleteSubsection), "tombstone_lifetime") + if ts <= 0 { + return DefaultTombstoneLifetime + } + return ts +} diff --git a/cmd/frostfs-node/httpcomponent.go b/cmd/frostfs-node/httpcomponent.go new file mode 100644 index 000000000..7410b8078 --- /dev/null +++ b/cmd/frostfs-node/httpcomponent.go @@ -0,0 +1,70 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "time" + + httputil "github.com/TrueCloudLab/frostfs-node/pkg/util/http" +) + +type httpComponent struct { + address string + name string + handler http.Handler + shutdownDur time.Duration + enabled bool + cfg *cfg + preReload func(c *cfg) +} + +func (cmp *httpComponent) init(c *cfg) { + if !cmp.enabled { + c.log.Info(fmt.Sprintf("%s is disabled", cmp.name)) + return + } + // Init server with parameters + srv := httputil.New( + *httputil.NewHTTPSrvPrm( + cmp.address, + cmp.handler, + ), + httputil.WithShutdownTimeout( + cmp.shutdownDur, + ), + ) + c.closers = append(c.closers, closer{ + cmp.name, + func() { stopAndLog(c, cmp.name, srv.Shutdown) }, + }) + c.workers = append(c.workers, worker{ + cmp.name, + func(ctx context.Context) { + runAndLog(c, cmp.name, false, func(c *cfg) { + fatalOnErr(srv.Serve()) + }) + }, + }) +} + +func (cmp *httpComponent) reload() error { + if cmp.preReload != nil { + cmp.preReload(cmp.cfg) + } + // Shutdown server + closer := getCloser(cmp.cfg, cmp.name) + if closer != nil { + closer.fn() + } + // Cleanup + delCloser(cmp.cfg, cmp.name) + delWorker(cmp.cfg, cmp.name) + // Init server with new parameters + cmp.init(cmp.cfg) + // Start worker + if cmp.enabled { + startWorker(cmp.cfg, *getWorker(cmp.cfg, cmp.name)) + } + return nil +} diff --git a/cmd/frostfs-node/main.go b/cmd/frostfs-node/main.go index cdc3c94b3..d6345d3b8 100644 --- a/cmd/frostfs-node/main.go +++ b/cmd/frostfs-node/main.go @@ -81,8 +81,10 @@ func initApp(c *cfg) { c.wg.Done() }() - initAndLog(c, "pprof", initProfiler) - initAndLog(c, "prometheus", initMetrics) + pprof, _ := pprofComponent(c) + metrics, _ := metricsComponent(c) + initAndLog(c, pprof.name, pprof.init) + initAndLog(c, metrics.name, metrics.init) initLocalStorage(c) @@ -114,6 +116,19 @@ func runAndLog(c *cfg, name string, logSuccess bool, starter func(*cfg)) { } } +func stopAndLog(c *cfg, name string, stopper func() error) { + c.log.Debug(fmt.Sprintf("shutting down %s service", name)) + + err := stopper() + if err != nil { + c.log.Debug(fmt.Sprintf("could not shutdown %s server", name), + zap.String("error", err.Error()), + ) + } + + c.log.Debug(fmt.Sprintf("%s service has been stopped", name)) +} + func bootUp(c *cfg) { runAndLog(c, "NATS", true, connectNats) runAndLog(c, "gRPC", false, serveGRPC) @@ -135,5 +150,5 @@ func wait(c *cfg) { } func (c *cfg) onShutdown(f func()) { - c.closers = append(c.closers, f) + c.closers = append(c.closers, closer{"", f}) } diff --git a/cmd/frostfs-node/metrics.go b/cmd/frostfs-node/metrics.go index 193b71d3d..249c7e9c0 100644 --- a/cmd/frostfs-node/metrics.go +++ b/cmd/frostfs-node/metrics.go @@ -1,47 +1,45 @@ package main import ( - "context" - metricsconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/metrics" - httputil "github.com/TrueCloudLab/frostfs-node/pkg/util/http" "github.com/prometheus/client_golang/prometheus/promhttp" - "go.uber.org/zap" ) -func initMetrics(c *cfg) { - if !metricsconfig.Enabled(c.appCfg) { - c.log.Info("prometheus is disabled") - return +func metricsComponent(c *cfg) (*httpComponent, bool) 
{ + var updated bool + // check if it has been inited before + if c.dynamicConfiguration.metrics == nil { + c.dynamicConfiguration.metrics = new(httpComponent) + c.dynamicConfiguration.metrics.cfg = c + c.dynamicConfiguration.metrics.name = "metrics" + c.dynamicConfiguration.metrics.handler = promhttp.Handler() + updated = true } - var prm httputil.Prm + // (re)init read configuration + enabled := metricsconfig.Enabled(c.appCfg) + if enabled != c.dynamicConfiguration.metrics.enabled { + c.dynamicConfiguration.metrics.enabled = enabled + updated = true + } + address := metricsconfig.Address(c.appCfg) + if address != c.dynamicConfiguration.metrics.address { + c.dynamicConfiguration.metrics.address = address + updated = true + } + dur := metricsconfig.ShutdownTimeout(c.appCfg) + if dur != c.dynamicConfiguration.metrics.shutdownDur { + c.dynamicConfiguration.metrics.shutdownDur = dur + updated = true + } - prm.Address = metricsconfig.Address(c.appCfg) - prm.Handler = promhttp.Handler() - - srv := httputil.New(prm, - httputil.WithShutdownTimeout( - metricsconfig.ShutdownTimeout(c.appCfg), - ), - ) - - c.workers = append(c.workers, newWorkerFromFunc(func(context.Context) { - runAndLog(c, "metrics", false, func(c *cfg) { - fatalOnErr(srv.Serve()) - }) - })) - - c.closers = append(c.closers, func() { - c.log.Debug("shutting down metrics service") - - err := srv.Shutdown() - if err != nil { - c.log.Debug("could not shutdown metrics server", - zap.String("error", err.Error()), - ) - } - - c.log.Debug("metrics service has been stopped") - }) + return c.dynamicConfiguration.metrics, updated +} + +func enableMetricsSvc(c *cfg) { + c.shared.metricsSvc.Enable() +} + +func disableMetricsSvc(c *cfg) { + c.shared.metricsSvc.Disable() } diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 49a2394ce..602e8e25f 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -8,6 +8,7 @@ import ( "github.com/TrueCloudLab/frostfs-api-go/v2/object" objectGRPC "github.com/TrueCloudLab/frostfs-api-go/v2/object/grpc" + metricsconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/metrics" policerconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/policer" replicatorconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator" coreclient "github.com/TrueCloudLab/frostfs-node/pkg/core/client" @@ -182,6 +183,14 @@ func initObjectService(c *cfg) { basicConstructor: c.clientCache, } + putConstructor := &coreClientConstructor{ + log: c.log, + nmSrc: c.netMapSource, + netState: c.cfgNetmap.state, + trustStorage: c.cfgReputation.localTrustStorage, + basicConstructor: c.putClientCache, + } + var irFetcher v2.InnerRingFetcher if c.cfgMorph.client.ProbeNotary() { @@ -238,7 +247,11 @@ func initObjectService(c *cfg) { traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c) - c.workers = append(c.workers, pol) + c.workers = append(c.workers, worker{ + fn: func(ctx context.Context) { + pol.Run(ctx) + }, + }) var os putsvc.ObjectStorage = engineWithoutNotifications{ engine: ls, @@ -255,7 +268,7 @@ func initObjectService(c *cfg) { sPut := putsvc.NewService( putsvc.WithKeyStorage(keyStorage), - putsvc.WithClientConstructor(coreConstructor), + putsvc.WithClientConstructor(putConstructor), putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)), putsvc.WithObjectStorage(os), putsvc.WithContainerSource(c.cfgObject.cnrSource), @@ -316,7 +329,7 @@ func initObjectService(c *cfg) { deletesvc.WithPutService(sPut), 
deletesvc.WithNetworkInfo(&delNetInfo{ State: c.cfgNetmap.state, - tsLifetime: 5, + tsLifetime: c.cfgObject.tombstoneLifetime, cfg: c, }), @@ -372,12 +385,9 @@ func initObjectService(c *cfg) { respSvc, ) - var firstSvc objectService.ServiceServer = signSvc - if c.metricsCollector != nil { - firstSvc = objectService.NewMetricCollector(signSvc, c.metricsCollector) - } - - server := objectTransportGRPC.New(firstSvc) + c.shared.metricsSvc = objectService.NewMetricCollector( + signSvc, c.metricsCollector, metricsconfig.Enabled(c.appCfg)) + server := objectTransportGRPC.New(c.shared.metricsSvc) for _, srv := range c.cfgGRPC.servers { objectGRPC.RegisterObjectServiceServer(srv, server) diff --git a/cmd/frostfs-node/pprof.go b/cmd/frostfs-node/pprof.go index 73461879c..02d376496 100644 --- a/cmd/frostfs-node/pprof.go +++ b/cmd/frostfs-node/pprof.go @@ -1,46 +1,37 @@ package main import ( - "context" - profilerconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/profiler" httputil "github.com/TrueCloudLab/frostfs-node/pkg/util/http" - "go.uber.org/zap" ) -func initProfiler(c *cfg) { - if !profilerconfig.Enabled(c.appCfg) { - c.log.Info("pprof is disabled") - return +func pprofComponent(c *cfg) (*httpComponent, bool) { + var updated bool + // check if it has been inited before + if c.dynamicConfiguration.pprof == nil { + c.dynamicConfiguration.pprof = new(httpComponent) + c.dynamicConfiguration.pprof.cfg = c + c.dynamicConfiguration.pprof.name = "pprof" + c.dynamicConfiguration.pprof.handler = httputil.Handler() + updated = true } - var prm httputil.Prm + // (re)init read configuration + enabled := profilerconfig.Enabled(c.appCfg) + if enabled != c.dynamicConfiguration.pprof.enabled { + c.dynamicConfiguration.pprof.enabled = enabled + updated = true + } + address := profilerconfig.Address(c.appCfg) + if address != c.dynamicConfiguration.pprof.address { + c.dynamicConfiguration.pprof.address = address + updated = true + } + dur := profilerconfig.ShutdownTimeout(c.appCfg) + if dur != c.dynamicConfiguration.pprof.shutdownDur { + c.dynamicConfiguration.pprof.shutdownDur = dur + updated = true + } - prm.Address = profilerconfig.Address(c.appCfg) - prm.Handler = httputil.Handler() - - srv := httputil.New(prm, - httputil.WithShutdownTimeout( - profilerconfig.ShutdownTimeout(c.appCfg), - ), - ) - - c.workers = append(c.workers, newWorkerFromFunc(func(context.Context) { - runAndLog(c, "profiler", false, func(c *cfg) { - fatalOnErr(srv.Serve()) - }) - })) - - c.closers = append(c.closers, func() { - c.log.Debug("shutting down profiling service") - - err := srv.Shutdown() - if err != nil { - c.log.Debug("could not shutdown pprof server", - zap.String("error", err.Error()), - ) - } - - c.log.Debug("profiling service has been stopped") - }) + return c.dynamicConfiguration.pprof, updated } diff --git a/cmd/frostfs-node/worker.go b/cmd/frostfs-node/worker.go index eebce73c2..21780e04f 100644 --- a/cmd/frostfs-node/worker.go +++ b/cmd/frostfs-node/worker.go @@ -4,31 +4,46 @@ import ( "context" ) -type worker interface { - Run(context.Context) -} - -type workerFromFunc struct { - fn func(context.Context) +type worker struct { + name string + fn func(context.Context) } func newWorkerFromFunc(fn func(ctx context.Context)) worker { - return &workerFromFunc{ + return worker{ fn: fn, } } -func (w *workerFromFunc) Run(ctx context.Context) { - w.fn(ctx) -} - func startWorkers(c *cfg) { for _, wrk := range c.workers { - c.wg.Add(1) - - go func(w worker) { - w.Run(c.ctx) - c.wg.Done() - }(wrk) + 
startWorker(c, wrk) } } + +func startWorker(c *cfg, wrk worker) { + c.wg.Add(1) + + go func(w worker) { + w.fn(c.ctx) + c.wg.Done() + }(wrk) +} + +func delWorker(c *cfg, name string) { + for i, worker := range c.workers { + if worker.name == name { + c.workers = append(c.workers[:i], c.workers[i+1:]...) + return + } + } +} + +func getWorker(c *cfg, name string) *worker { + for _, wrk := range c.workers { + if wrk.name == name { + return &wrk + } + } + return nil +} diff --git a/config/example/node.env b/config/example/node.env index 8901ffb11..ae51d473e 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -85,6 +85,7 @@ FROSTFS_REPLICATOR_POOL_SIZE=10 # Object service section FROSTFS_OBJECT_PUT_POOL_SIZE_REMOTE=100 +FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10 # Storage engine section FROSTFS_STORAGE_SHARD_POOL_SIZE=15 diff --git a/config/example/node.json b/config/example/node.json index b74b04ee9..2930bf9d1 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -128,6 +128,9 @@ "put_timeout": "15s" }, "object": { + "delete": { + "tombstone_lifetime": 10 + }, "put": { "pool_size_remote": 100 } diff --git a/config/example/node.yaml b/config/example/node.yaml index ed0ae47ac..110e54206 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -107,6 +107,8 @@ replicator: pool_size: 10 # maximum amount of concurrent replications object: + delete: + tombstone_lifetime: 10 # tombstone "local" lifetime in epochs put: pool_size_remote: 100 # number of async workers for remote PUT operations diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index c41aebc32..b0f79c257 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -415,7 +415,7 @@ replicator: | `pool_size` | `int` | Equal to `object.put.pool_size_remote` | Maximum amount of concurrent replications. | # `object` section -Contains pool sizes for object operations with remote nodes. +Contains object-service related parameters. ```yaml object: @@ -423,6 +423,7 @@ object: pool_size_remote: 100 ``` -| Parameter | Type | Default value | Description | -|------------------------|-------|---------------|------------------------------------------------------------------------------------------------| -| `put.pool_size_remote` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. | +| Parameter | Type | Default value | Description | +|-----------------------------|-------|---------------|------------------------------------------------------------------------------------------------| +| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. | +| `put.pool_size_remote` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. | diff --git a/pkg/core/client/client.go b/pkg/core/client/client.go index 0f6697882..f07dedc06 100644 --- a/pkg/core/client/client.go +++ b/pkg/core/client/client.go @@ -33,6 +33,8 @@ type MultiAddressClient interface { // RawForAddress must return rawclient.Client // for the passed network.Address. RawForAddress(network.Address, func(cli *rawclient.Client) error) error + + ReportError(error) } // NodeInfo groups information about a FrostFS storage node needed for Client construction. 
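The SIGHUP reload path for pprof and metrics relies on closers and workers being addressable by name (see closer.go, worker.go and httpcomponent.go above): the old server is stopped through its closer, the stale closer/worker records are dropped, and init registers and starts a fresh instance. A minimal, self-contained sketch of that cycle — the registry type and component names are illustrative stand-ins for the real `cfg` plumbing:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Named records, mirroring cfg.closers and cfg.workers.
type closer struct {
	name string
	fn   func()
}

type worker struct {
	name string
	fn   func(context.Context)
}

// registry stands in for the parts of cfg used by httpComponent.reload.
type registry struct {
	closers []closer
	workers []worker
}

// reload stops the old instance via its closer, drops the stale records,
// re-registers the component through register and starts the new worker.
func (r *registry) reload(ctx context.Context, name string, register func(*registry)) {
	for i, c := range r.closers {
		if c.name == name {
			c.fn()
			r.closers = append(r.closers[:i], r.closers[i+1:]...)
			break
		}
	}
	for i, w := range r.workers {
		if w.name == name {
			r.workers = append(r.workers[:i], r.workers[i+1:]...)
			break
		}
	}
	register(r) // appends a fresh closer and worker built from the new configuration
	for _, w := range r.workers {
		if w.name == name {
			go w.fn(ctx) // the node additionally tracks this goroutine in cfg.wg
			break
		}
	}
}

func main() {
	r := &registry{}
	register := func(r *registry) {
		r.closers = append(r.closers, closer{"metrics", func() { fmt.Println("metrics: stopped") }})
		r.workers = append(r.workers, worker{"metrics", func(context.Context) { fmt.Println("metrics: serving") }})
	}
	register(r)
	go r.workers[0].fn(context.Background())

	r.reload(context.Background(), "metrics", register) // what the SIGHUP handler triggers
	time.Sleep(50 * time.Millisecond)                    // let the demo goroutines print
}
```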
diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 7dd8ca3da..8f468e293 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -7,6 +7,7 @@ import ( "io/fs" "os" "path/filepath" + "strconv" "strings" "syscall" @@ -230,23 +231,63 @@ func (t *FSTree) Put(prm common.PutPrm) (common.PutRes, error) { prm.RawData = t.Compress(prm.RawData) } - tmpPath := p + "#" - err := t.writeFile(tmpPath, prm.RawData) + // Here is a situation: + // Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.161Z info log/log.go:13 local object storage operation {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "address": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "op": "PUT", "type": "fstree", "storage_id": ""} + // Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.183Z info log/log.go:13 local object storage operation {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "address": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "op": "metabase PUT"} + // Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.862Z debug policer/check.go:231 shortage of object copies detected {"component": "Object Policer", "object": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "shortage": 1} + // Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.862Z debug shard/get.go:124 object is missing in write-cache {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "addr": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "skip_meta": false} + // + // 1. We put an object on node 1. + // 2. Relentless policer sees that it has only 1 copy and tries to PUT it to node 2. + // 3. PUT operation started by client at (1) also puts an object here. + // 4. Now we have concurrent writes and one of `Rename` calls will return `no such file` error. + // Even more than that, concurrent writes can corrupt data. + // + // So here is a solution: + // 1. Write a file to 'name + 1'. + // 2. If it exists, retry with temporary name being 'name + 2'. + // 3. Set some reasonable number of attempts. + // + // It is a bit kludgey, but I am unusually proud about having found this out after + // hours of research on linux kernel, dirsync mount option and ext4 FS, turned out + // to be so hecking simple. + // In a very rare situation we can have multiple partially written copies on disk, + // this will be fixed in another issue (we should remove garbage on start). + const retryCount = 5 + for i := 0; i < retryCount; i++ { + tmpPath := p + "#" + strconv.FormatUint(uint64(i), 10) + err := t.writeAndRename(tmpPath, p, prm.RawData) + if err != syscall.EEXIST || i == retryCount-1 { + return common.PutRes{StorageID: []byte{}}, err + } + } + + // unreachable, but precaution never hurts, especially 1 day before release. + return common.PutRes{StorageID: []byte{}}, fmt.Errorf("couldn't read file after %d retries", retryCount) +} + +// writeAndRename opens tmpPath exclusively, writes data to it and renames it to p. 
+func (t *FSTree) writeAndRename(tmpPath, p string, data []byte) error { + err := t.writeFile(tmpPath, data) if err != nil { var pe *fs.PathError - if errors.As(err, &pe) && pe.Err == syscall.ENOSPC { - err = common.ErrNoSpace - _ = os.RemoveAll(tmpPath) + if errors.As(err, &pe) { + switch pe.Err { + case syscall.ENOSPC: + err = common.ErrNoSpace + _ = os.RemoveAll(tmpPath) + case syscall.EEXIST: + return syscall.EEXIST + } } } else { err = os.Rename(tmpPath, p) } - - return common.PutRes{StorageID: []byte{}}, err + return err } func (t *FSTree) writeFlags() int { - flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC + flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC | os.O_EXCL if t.noSync { return flags } diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index a56d7bbb2..b9a831359 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -23,27 +23,44 @@ func (e *StorageEngine) Open() error { } func (e *StorageEngine) open() error { - e.mtx.RLock() - defer e.mtx.RUnlock() + e.mtx.Lock() + defer e.mtx.Unlock() var wg sync.WaitGroup - var errCh = make(chan error, len(e.shards)) + var errCh = make(chan shardInitError, len(e.shards)) for id, sh := range e.shards { wg.Add(1) go func(id string, sh *shard.Shard) { defer wg.Done() if err := sh.Open(); err != nil { - errCh <- fmt.Errorf("could not open shard %s: %w", id, err) + errCh <- shardInitError{ + err: err, + id: id, + } } }(id, sh.Shard) } wg.Wait() close(errCh) - for err := range errCh { - if err != nil { - return err + for res := range errCh { + if res.err != nil { + e.log.Error("could not open shard, closing and skipping", + zap.String("id", res.id), + zap.Error(res.err)) + + sh := e.shards[res.id] + delete(e.shards, res.id) + + err := sh.Close() + if err != nil { + e.log.Error("could not close partially initialized shard", + zap.String("id", res.id), + zap.Error(res.err)) + } + + continue } } @@ -76,12 +93,20 @@ func (e *StorageEngine) Init() error { for res := range errCh { if res.err != nil { if errors.Is(res.err, blobstor.ErrInitBlobovniczas) { - delete(e.shards, res.id) - - e.log.Error("shard initialization failure, skipping", + e.log.Error("could not initialize shard, closing and skipping", zap.String("id", res.id), zap.Error(res.err)) + sh := e.shards[res.id] + delete(e.shards, res.id) + + err := sh.Close() + if err != nil { + e.log.Error("could not close partially initialized shard", + zap.String("id", res.id), + zap.Error(res.err)) + } + continue } return fmt.Errorf("could not initialize shard %s: %w", res.id, res.err) diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go index 50ee1b552..c2d7a3211 100644 --- a/pkg/local_object_storage/engine/control_test.go +++ b/pkg/local_object_storage/engine/control_test.go @@ -7,16 +7,148 @@ import ( "path/filepath" "strconv" "testing" + "time" "github.com/TrueCloudLab/frostfs-node/pkg/core/object" "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" meta "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "github.com/TrueCloudLab/frostfs-node/pkg/util/logger" cidtest 
"github.com/TrueCloudLab/frostfs-sdk-go/container/id/test" "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" + "go.uber.org/zap/zaptest" ) +// TestInitializationFailure checks that shard is initialized and closed even if media +// under any single component is absent. We emulate this with permission denied error. +func TestInitializationFailure(t *testing.T) { + type paths struct { + blobstor string + metabase string + writecache string + pilorama string + } + + existsDir := filepath.Join(t.TempDir(), "shard") + badDir := filepath.Join(t.TempDir(), "missing") + + testShard := func(c paths) []shard.Option { + sid, err := generateShardID() + require.NoError(t, err) + + return []shard.Option{ + shard.WithID(sid), + shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), + shard.WithBlobStorOptions( + blobstor.WithStorages( + newStorages(c.blobstor, 1<<20))), + shard.WithMetaBaseOptions( + meta.WithBoltDBOptions(&bbolt.Options{ + Timeout: 100 * time.Millisecond, + }), + meta.WithPath(c.metabase), + meta.WithPermissions(0700), + meta.WithEpochState(epochState{})), + shard.WithWriteCache(true), + shard.WithWriteCacheOptions(writecache.WithPath(c.writecache)), + shard.WithPiloramaOptions(pilorama.WithPath(c.pilorama)), + } + } + + t.Run("blobstor", func(t *testing.T) { + badDir := filepath.Join(badDir, t.Name()) + require.NoError(t, os.MkdirAll(badDir, os.ModePerm)) + require.NoError(t, os.Chmod(badDir, 0)) + testEngineFailInitAndReload(t, badDir, false, testShard(paths{ + blobstor: filepath.Join(badDir, "0"), + metabase: filepath.Join(existsDir, t.Name(), "1"), + writecache: filepath.Join(existsDir, t.Name(), "2"), + pilorama: filepath.Join(existsDir, t.Name(), "3"), + })) + }) + t.Run("metabase", func(t *testing.T) { + badDir := filepath.Join(badDir, t.Name()) + require.NoError(t, os.MkdirAll(badDir, os.ModePerm)) + require.NoError(t, os.Chmod(badDir, 0)) + testEngineFailInitAndReload(t, badDir, true, testShard(paths{ + blobstor: filepath.Join(existsDir, t.Name(), "0"), + metabase: filepath.Join(badDir, "1"), + writecache: filepath.Join(existsDir, t.Name(), "2"), + pilorama: filepath.Join(existsDir, t.Name(), "3"), + })) + }) + t.Run("write-cache", func(t *testing.T) { + badDir := filepath.Join(badDir, t.Name()) + require.NoError(t, os.MkdirAll(badDir, os.ModePerm)) + require.NoError(t, os.Chmod(badDir, 0)) + testEngineFailInitAndReload(t, badDir, false, testShard(paths{ + blobstor: filepath.Join(existsDir, t.Name(), "0"), + metabase: filepath.Join(existsDir, t.Name(), "1"), + writecache: filepath.Join(badDir, "2"), + pilorama: filepath.Join(existsDir, t.Name(), "3"), + })) + }) + t.Run("pilorama", func(t *testing.T) { + badDir := filepath.Join(badDir, t.Name()) + require.NoError(t, os.MkdirAll(badDir, os.ModePerm)) + require.NoError(t, os.Chmod(badDir, 0)) + testEngineFailInitAndReload(t, badDir, false, testShard(paths{ + blobstor: filepath.Join(existsDir, t.Name(), "0"), + metabase: filepath.Join(existsDir, t.Name(), "1"), + writecache: filepath.Join(existsDir, t.Name(), "2"), + pilorama: filepath.Join(badDir, "3"), + })) + }) +} + +func testEngineFailInitAndReload(t *testing.T, badDir string, errOnAdd bool, s []shard.Option) { + var configID string + + e := New() + _, err := e.AddShard(s...) + if errOnAdd { + require.Error(t, err) + // This branch is only taken when we cannot update shard ID in the metabase. + // The id cannot be encountered during normal operation, but it is ok for tests: + // it is only compared for equality with other ids and we have 0 shards here. 
+ configID = "id" + } else { + require.NoError(t, err) + + e.mtx.RLock() + var id string + for id = range e.shards { + break + } + configID = calculateShardID(e.shards[id].Shard.DumpInfo()) + e.mtx.RUnlock() + + err = e.Open() + if err == nil { + require.Error(t, e.Init()) + } + } + + e.mtx.RLock() + shardCount := len(e.shards) + e.mtx.RUnlock() + require.Equal(t, 0, shardCount) + + require.NoError(t, os.Chmod(badDir, os.ModePerm)) + require.NoError(t, e.Reload(ReConfiguration{ + shards: map[string][]shard.Option{configID: s}, + })) + + e.mtx.RLock() + shardCount = len(e.shards) + e.mtx.RUnlock() + require.Equal(t, 1, shardCount) +} + func TestExecBlocks(t *testing.T) { e := testNewEngineWithShardNum(t, 2) // number doesn't matter in this test, 2 is several but not many t.Cleanup(func() { diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index bbe5688f8..3db3e7c63 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -74,6 +74,7 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) { shPrm.SetAddress(prm.addr) var hasDegraded bool + var objectExpired bool e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { noMeta := sh.GetMode().NoMetabase() @@ -113,7 +114,7 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) { case shard.IsErrObjectExpired(err): // object is found but should not // be returned - outError = errNotFound + objectExpired = true return true default: e.reportShardError(sh, "could not get object from shard", err) @@ -130,6 +131,10 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) { return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) } + if objectExpired { + return GetRes{}, errNotFound + } + if obj == nil { if !hasDegraded && shardWithMeta.Shard == nil || !shard.IsErrNotFound(outError) { return GetRes{}, outError diff --git a/pkg/local_object_storage/metabase/storage_id.go b/pkg/local_object_storage/metabase/storage_id.go index 4369215ae..4cda0b256 100644 --- a/pkg/local_object_storage/metabase/storage_id.go +++ b/pkg/local_object_storage/metabase/storage_id.go @@ -1,6 +1,8 @@ package meta import ( + "errors" + oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/nspcc-dev/neo-go/pkg/util/slice" "go.etcd.io/bbolt" @@ -94,10 +96,11 @@ func (db *DB) UpdateStorageID(prm UpdateStorageIDPrm) (res UpdateStorageIDRes, e err = db.boltDB.Batch(func(tx *bbolt.Tx) error { exists, err := db.exists(tx, prm.addr, currEpoch) - if !exists || err != nil { - return err + if err == nil && exists || errors.Is(err, ErrObjectIsExpired) { + err = updateStorageID(tx, prm.addr, prm.id) } - return updateStorageID(tx, prm.addr, prm.id) + + return err }) return diff --git a/pkg/local_object_storage/pilorama/batch.go b/pkg/local_object_storage/pilorama/batch.go index 59712b761..43375ba1b 100644 --- a/pkg/local_object_storage/pilorama/batch.go +++ b/pkg/local_object_storage/pilorama/batch.go @@ -10,8 +10,11 @@ import ( ) type batch struct { - forest *boltForest - timer *time.Timer + forest *boltForest + timer *time.Timer + // mtx protects timer and operations fields. + // Because mtx can be taken inside a transaction, + // transactions MUST NOT be executed with the mutex taken to avoid a deadlock. 
mtx sync.Mutex start sync.Once cid cidSDK.ID @@ -24,16 +27,12 @@ func (b *batch) trigger() { b.mtx.Lock() if b.timer != nil { b.timer.Stop() - b.timer = nil } b.mtx.Unlock() b.start.Do(b.run) } func (b *batch) run() { - sort.Slice(b.operations, func(i, j int) bool { - return b.operations[i].Time < b.operations[j].Time - }) fullID := bucketName(b.cid, b.treeID) err := b.forest.db.Update(func(tx *bbolt.Tx) error { bLog, bTree, err := b.forest.getTreeBuckets(tx, fullID) @@ -41,6 +40,16 @@ func (b *batch) run() { return err } + b.mtx.Lock() + b.timer = nil + b.mtx.Unlock() + + // Sorting without a mutex is ok, because we append to this slice only if timer is non-nil. + // See (*boltForest).addBatch for details. + sort.Slice(b.operations, func(i, j int) bool { + return b.operations[i].Time < b.operations[j].Time + }) + var lm Move return b.forest.applyOperation(bLog, bTree, b.operations, &lm) }) diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go index 0f546ef61..247d07d28 100644 --- a/pkg/local_object_storage/pilorama/boltdb.go +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -377,7 +377,9 @@ func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan e results: []chan<- error{ch}, operations: []*Move{m}, } + b.mtx.Lock() b.timer = time.AfterFunc(t.db.MaxBatchDelay, b.trigger) + b.mtx.Unlock() t.batches = append(t.batches, b) t.mtx.Unlock() } diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index c96b267ed..2f698c694 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -255,9 +255,11 @@ func (s *Shard) Close() error { components = append(components, s.blobStor, s.metaBase) + var lastErr error for _, component := range components { if err := component.Close(); err != nil { - return fmt.Errorf("could not close %s: %w", component, err) + lastErr = err + s.log.Error("could not close shard component", zap.Error(err)) } } @@ -266,7 +268,7 @@ func (s *Shard) Close() error { s.gc.stop() } - return nil + return lastErr } // Reload reloads configuration portions that are necessary. 
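The pilorama change above rests on a lock-ordering rule: `mtx` may be taken inside the bolt transaction (to clear the timer), so the transaction must never be entered while `mtx` is held. A toy model of that ordering — `runInTx` is an illustrative stand-in for `boltForest.db.Update`, not the real API:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// toyBatch models pilorama's batch: a timer-guarded accumulator whose mutex
// protects the timer and operations fields.
type toyBatch struct {
	mtx   sync.Mutex
	timer *time.Timer
	ops   []int
	start sync.Once
}

func (b *toyBatch) trigger() {
	b.mtx.Lock()
	if b.timer != nil {
		b.timer.Stop()
	}
	b.mtx.Unlock() // release before entering the transaction
	b.start.Do(b.run)
}

func (b *toyBatch) run() {
	// runInTx stands in for boltForest.db.Update. Inside the transaction the
	// mutex is taken only for the short timer update, never around the whole tx.
	runInTx(func() {
		b.mtx.Lock()
		b.timer = nil
		b.mtx.Unlock()
		fmt.Println("applying", len(b.ops), "operations")
	})
}

func runInTx(fn func()) { fn() }

func main() {
	b := &toyBatch{ops: []int{3, 1, 2}}
	b.mtx.Lock() // mirror addBatch: set the timer under the mutex
	b.timer = time.AfterFunc(10*time.Millisecond, b.trigger)
	b.mtx.Unlock()

	time.Sleep(50 * time.Millisecond) // let the timer fire and the batch run
}
```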
diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go index 4f00995c3..607d508d3 100644 --- a/pkg/local_object_storage/shard/get.go +++ b/pkg/local_object_storage/shard/get.go @@ -121,9 +121,14 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher, } if IsErrNotFound(err) { - s.log.Debug("object is missing in write-cache") + s.log.Debug("object is missing in write-cache", + zap.Stringer("addr", addr), + zap.Bool("skip_meta", skipMeta)) } else { - s.log.Error("failed to fetch object from write-cache", zap.Error(err)) + s.log.Error("failed to fetch object from write-cache", + zap.Error(err), + zap.Stringer("addr", addr), + zap.Bool("skip_meta", skipMeta)) } } diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 7340512f1..f4fce0d0a 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -99,10 +99,6 @@ func (c *cache) flushDB() { lastKey = slice.Copy(k) } - if _, ok := c.flushed.Peek(string(k)); ok { - continue - } - m = append(m, objectInfo{ addr: string(k), data: slice.Copy(v), @@ -111,12 +107,18 @@ func (c *cache) flushDB() { return nil }) + var count int for i := range m { + if c.flushed.Contains(m[i].addr) { + continue + } + obj := object.New() if err := obj.Unmarshal(m[i].data); err != nil { continue } + count++ select { case c.flushCh <- obj: case <-c.closeCh: @@ -125,7 +127,7 @@ func (c *cache) flushDB() { } } - if len(m) == 0 { + if count == 0 { c.modeMtx.RUnlock() break } @@ -133,7 +135,7 @@ func (c *cache) flushDB() { c.modeMtx.RUnlock() c.log.Debug("tried to flush items from write-cache", - zap.Int("count", len(m)), + zap.Int("count", count), zap.String("start", base58.Encode(lastKey))) } } diff --git a/pkg/local_object_storage/writecache/init.go b/pkg/local_object_storage/writecache/init.go index fbcc455c0..2c8bcf9c8 100644 --- a/pkg/local_object_storage/writecache/init.go +++ b/pkg/local_object_storage/writecache/init.go @@ -46,6 +46,7 @@ func (c *cache) initFlushMarks() { var batchSize = flushBatchSize for { m = m[:0] + indices = indices[:0] // We put objects in batches of fixed size to not interfere with main put cycle a lot. _ = c.db.View(func(tx *bbolt.Tx) error { diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 4a9724f4d..da533880e 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -23,6 +23,11 @@ type store struct { maxFlushedMarksCount int maxRemoveBatchSize int + // flushed contains addresses of objects that were already flushed to the main storage. + // We use LRU cache instead of map here to facilitate removing of unused object in favour of + // frequently read ones. + // MUST NOT be used inside bolt db transaction because it's eviction handler + // removes untracked items from the database. 
flushed simplelru.LRUCache[string, bool] db *bbolt.DB diff --git a/pkg/metrics/object.go b/pkg/metrics/object.go index 0a9497583..0bb16b780 100644 --- a/pkg/metrics/object.go +++ b/pkg/metrics/object.go @@ -51,13 +51,13 @@ func newMethodCallCounter(name string) methodCount { success: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: objectSubsystem, - Name: fmt.Sprintf("%s_req_count", name), + Name: fmt.Sprintf("%s_req_count_success", name), Help: fmt.Sprintf("The number of successful %s requests processed", name), }), total: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: objectSubsystem, - Name: fmt.Sprintf("%s_req_count_success", name), + Name: fmt.Sprintf("%s_req_count", name), Help: fmt.Sprintf("Total number of %s requests processed", name), }), } diff --git a/pkg/morph/client/notifications.go b/pkg/morph/client/notifications.go index 8eaf617b2..2afeebb83 100644 --- a/pkg/morph/client/notifications.go +++ b/pkg/morph/client/notifications.go @@ -211,6 +211,29 @@ func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string) id string ) + stopCh := make(chan struct{}) + defer close(stopCh) + + // neo-go WS client says to _always_ read notifications + // from its channel. Subscribing to any notification + // while not reading them in another goroutine may + // lead to a dead-lock, thus that async side notification + // listening while restoring subscriptions + go func() { + for { + select { + case <-stopCh: + return + case n, ok := <-cli.Notifications: + if !ok { + return + } + + c.notifications <- n + } + } + }() + // new block events restoration if c.subscribedToNewBlocks { _, err = cli.SubscribeForNewBlocks(nil) @@ -226,6 +249,7 @@ func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string) // notification events restoration for contract := range c.subscribedEvents { + contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890 id, err = cli.SubscribeForExecutionNotifications(&contract, nil) if err != nil { c.logger.Error("could not restore notification subscription after RPC switch", @@ -242,6 +266,7 @@ func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string) // notary notification events restoration if c.notary != nil { for signer := range c.subscribedNotaryEvents { + signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890 id, err = cli.SubscribeForNotaryRequests(nil, &signer) if err != nil { c.logger.Error("could not restore notary notification subscription after RPC switch", diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go index bf4beab54..e459ae42a 100644 --- a/pkg/network/cache/multi.go +++ b/pkg/network/cache/multi.go @@ -12,6 +12,8 @@ import ( "github.com/TrueCloudLab/frostfs-node/pkg/network" "github.com/TrueCloudLab/frostfs-sdk-go/client" "github.com/TrueCloudLab/frostfs-sdk-go/object" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type singleClient struct { @@ -172,6 +174,10 @@ func (x *multiClient) ReportError(err error) { return } + if status.Code(err) == codes.Canceled || errors.Is(err, context.Canceled) { + return + } + // non-status logic error that could be returned // from the SDK client; should not be considered // as a connection error @@ -196,7 +202,6 @@ func (s *singleClient) invalidate() { _ = s.client.Close() } s.client = nil - s.lastAttempt = time.Now() s.Unlock() } diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 89b3f8aac..5755973f7 
100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -128,7 +128,7 @@ func (exec execCtx) key() (*ecdsa.PrivateKey, error) { } func (exec *execCtx) canAssemble() bool { - return exec.svc.assembly && !exec.isRaw() && !exec.headOnly() + return exec.svc.assembly && !exec.isRaw() && !exec.headOnly() && !exec.isLocal() } func (exec *execCtx) splitInfo() *objectSDK.SplitInfo { diff --git a/pkg/services/object/metrics.go b/pkg/services/object/metrics.go index 11828875d..43c636e2c 100644 --- a/pkg/services/object/metrics.go +++ b/pkg/services/object/metrics.go @@ -12,6 +12,7 @@ type ( MetricCollector struct { next ServiceServer metrics MetricRegister + enabled bool } getStreamMetric struct { @@ -48,96 +49,125 @@ type ( } ) -func NewMetricCollector(next ServiceServer, register MetricRegister) *MetricCollector { +func NewMetricCollector(next ServiceServer, register MetricRegister, enabled bool) *MetricCollector { return &MetricCollector{ next: next, metrics: register, + enabled: enabled, } } func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) (err error) { - t := time.Now() - defer func() { - m.metrics.IncGetReqCounter(err == nil) - m.metrics.AddGetReqDuration(time.Since(t)) - }() - - err = m.next.Get(req, &getStreamMetric{ - ServerStream: stream, - stream: stream, - metrics: m.metrics, - }) + if m.enabled { + t := time.Now() + defer func() { + m.metrics.IncGetReqCounter(err == nil) + m.metrics.AddGetReqDuration(time.Since(t)) + }() + err = m.next.Get(req, &getStreamMetric{ + ServerStream: stream, + stream: stream, + metrics: m.metrics, + }) + } else { + err = m.next.Get(req, stream) + } return } func (m MetricCollector) Put(ctx context.Context) (PutObjectStream, error) { - t := time.Now() + if m.enabled { + t := time.Now() - stream, err := m.next.Put(ctx) - if err != nil { - return nil, err + stream, err := m.next.Put(ctx) + if err != nil { + return nil, err + } + + return &putStreamMetric{ + stream: stream, + metrics: m.metrics, + start: t, + }, nil } - - return &putStreamMetric{ - stream: stream, - metrics: m.metrics, - start: t, - }, nil + return m.next.Put(ctx) } func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) { - t := time.Now() + if m.enabled { + t := time.Now() - res, err := m.next.Head(ctx, request) + res, err := m.next.Head(ctx, request) - m.metrics.IncHeadReqCounter(err == nil) - m.metrics.AddHeadReqDuration(time.Since(t)) + m.metrics.IncHeadReqCounter(err == nil) + m.metrics.AddHeadReqDuration(time.Since(t)) - return res, err + return res, err + } + return m.next.Head(ctx, request) } func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream) error { - t := time.Now() + if m.enabled { + t := time.Now() - err := m.next.Search(req, stream) + err := m.next.Search(req, stream) - m.metrics.IncSearchReqCounter(err == nil) - m.metrics.AddSearchReqDuration(time.Since(t)) + m.metrics.IncSearchReqCounter(err == nil) + m.metrics.AddSearchReqDuration(time.Since(t)) - return err + return err + } + return m.next.Search(req, stream) } func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteRequest) (*object.DeleteResponse, error) { - t := time.Now() + if m.enabled { + t := time.Now() - res, err := m.next.Delete(ctx, request) + res, err := m.next.Delete(ctx, request) - m.metrics.IncDeleteReqCounter(err == nil) - m.metrics.AddDeleteReqDuration(time.Since(t)) - - return res, err + m.metrics.IncDeleteReqCounter(err == nil) + 
m.metrics.AddDeleteReqDuration(time.Since(t)) + return res, err + } + return m.next.Delete(ctx, request) } func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error { - t := time.Now() + if m.enabled { + t := time.Now() - err := m.next.GetRange(req, stream) + err := m.next.GetRange(req, stream) - m.metrics.IncRangeReqCounter(err == nil) - m.metrics.AddRangeReqDuration(time.Since(t)) + m.metrics.IncRangeReqCounter(err == nil) + m.metrics.AddRangeReqDuration(time.Since(t)) - return err + return err + } + return m.next.GetRange(req, stream) } func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { - t := time.Now() + if m.enabled { + t := time.Now() - res, err := m.next.GetRangeHash(ctx, request) + res, err := m.next.GetRangeHash(ctx, request) - m.metrics.IncRangeHashReqCounter(err == nil) - m.metrics.AddRangeHashReqDuration(time.Since(t)) + m.metrics.IncRangeHashReqCounter(err == nil) + m.metrics.AddRangeHashReqDuration(time.Since(t)) - return res, err + return res, err + } + return m.next.GetRangeHash(ctx, request) +} + +func (m *MetricCollector) Enable() { + m.enabled = true +} + +func (m *MetricCollector) Disable() { + m.enabled = false } func (s getStreamMetric) Send(resp *object.GetResponse) error { diff --git a/pkg/util/http/server.go b/pkg/util/http/server.go index cbfbda739..05a1f44c1 100644 --- a/pkg/util/http/server.go +++ b/pkg/util/http/server.go @@ -6,12 +6,12 @@ import ( "time" ) -// Prm groups the required parameters of the Server's constructor. +// HTTPSrvPrm groups the required parameters of the Server's constructor. // // All values must comply with the requirements imposed on them. // Passing incorrect parameter values will result in constructor // failure (error or panic depending on the implementation). -type Prm struct { +type HTTPSrvPrm struct { // TCP address for the server to listen on. // // Must be a valid TCP address. @@ -49,6 +49,15 @@ func panicOnValue(t, n string, v interface{}) { panic(fmt.Sprintf(invalidValFmt, t, n, v, v)) } +func checkSrvPrm(addr string, handler http.Handler) { + switch { + case addr == "": + panicOnPrmValue("Address", addr) + case handler == nil: + panicOnPrmValue("Handler", handler) + } +} + // New creates a new instance of the Server. // // Panics if at least one value of the parameters is invalid. @@ -58,13 +67,8 @@ func panicOnValue(t, n string, v interface{}) { // // The created Server does not require additional // initialization and is completely ready for work. -func New(prm Prm, opts ...Option) *Server { - switch { - case prm.Address == "": - panicOnPrmValue("Address", prm.Address) - case prm.Handler == nil: - panicOnPrmValue("Handler", prm.Handler) - } +func New(prm HTTPSrvPrm, opts ...Option) *Server { + checkSrvPrm(prm.Address, prm.Handler) c := defaultCfg() @@ -85,3 +89,14 @@ func New(prm Prm, opts ...Option) *Server { }, } } + +// NewHTTPSrvPrm creates a new instance of the HTTPSrvPrm. +// +// Panics if at least one value of the parameters is invalid. +func NewHTTPSrvPrm(addr string, handler http.Handler) *HTTPSrvPrm { + checkSrvPrm(addr, handler) + return &HTTPSrvPrm{ + Address: addr, + Handler: handler, + } +}
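For reference, the renamed `HTTPSrvPrm` is consumed the same way `httpComponent.init` wires it; a minimal usage sketch (the address and timeout values below are illustrative):

```go
package main

import (
	"time"

	httputil "github.com/TrueCloudLab/frostfs-node/pkg/util/http"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Build the parameters with the new constructor (it panics on an empty
	// address or nil handler, just as httputil.New did) and pass them by value.
	srv := httputil.New(
		*httputil.NewHTTPSrvPrm("localhost:9090", promhttp.Handler()),
		httputil.WithShutdownTimeout(30*time.Second),
	)

	go func() {
		_ = srv.Serve() // serves until Shutdown is called
	}()

	time.Sleep(time.Second) // stand-in for real work / signal handling
	_ = srv.Shutdown()      // graceful stop, bounded by the shutdown timeout
}
```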