Move changes from the support branch. #58
37 changed files with 787 additions and 254 deletions
26
CHANGELOG.md
26
CHANGELOG.md
|
@ -9,12 +9,17 @@ Changelog for FrostFS Node
|
||||||
- New `frostfs_node_object_container_size` metric for tracking size of reqular objects in a container (#2116)
|
- New `frostfs_node_object_container_size` metric for tracking size of reqular objects in a container (#2116)
|
||||||
- New `frostfs_node_object_payload_size` metric for tracking size of reqular objects on a single shard (#1794)
|
- New `frostfs_node_object_payload_size` metric for tracking size of reqular objects on a single shard (#1794)
|
||||||
- Add command `frostfs-adm morph netmap-candidates` (#1889)
|
- Add command `frostfs-adm morph netmap-candidates` (#1889)
|
||||||
|
- `object.delete.tombstone_lifetime` config parameter to set tombstone lifetime in the DELETE service (#2246)
|
||||||
|
- Reload config for pprof and metrics on SIGHUP in `neofs-node` (#1868)
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
- Change `frostfs_node_engine_container_size` to counting sizes of logical objects
|
- Change `frostfs_node_engine_container_size` to counting sizes of logical objects
|
||||||
- `common.PrintVerbose` prints via `cobra.Command.Printf` (#1962)
|
- `common.PrintVerbose` prints via `cobra.Command.Printf` (#1962)
|
||||||
- Env prefix in configuration changed to `FROSTFS_*` (#43)
|
- Env prefix in configuration changed to `FROSTFS_*` (#43)
|
||||||
- Link object is broadcast throughout the whole container now (#57)
|
- Link object is broadcast throughout the whole container now (#57)
|
||||||
|
- Pilorama now can merge multiple batches into one (#2231)
|
||||||
|
- Storage engine now can start even when some shard components are unavailable (#2238)
|
||||||
|
- `neofs-cli` buffer for object put increased from 4 KiB to 3 MiB (#2243)
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
- Increase payload size metric on shards' `put` operation (#1794)
|
- Increase payload size metric on shards' `put` operation (#1794)
|
||||||
|
@ -26,6 +31,14 @@ Changelog for FrostFS Node
|
||||||
- Set flag `mode` required for `frostfs-cli control shards set-mode` (#8)
|
- Set flag `mode` required for `frostfs-cli control shards set-mode` (#8)
|
||||||
- Fix `dirty` suffix in debian package version (#53)
|
- Fix `dirty` suffix in debian package version (#53)
|
||||||
- Prevent node process from killing by systemd when shutting down (#1465)
|
- Prevent node process from killing by systemd when shutting down (#1465)
|
||||||
|
- Restore subscriptions correctly on morph client switch (#2212)
|
||||||
|
- Expired objects could be returned if not marked with GC yet (#2213)
|
||||||
|
- `neofs-adm morph dump-hashes` now properly iterates over custom domain (#2224)
|
||||||
|
- Possible deadlock in write-cache (#2239)
|
||||||
|
- Fix `*_req_count` and `*_req_count_success` metric values (#2241)
|
||||||
|
- Storage ID update by write-cache (#2244)
|
||||||
|
- `neo-go` client deadlock on subscription restoration (#2244)
|
||||||
|
- Possible panic during write-cache initialization (#2234)
|
||||||
|
|
||||||
### Removed
|
### Removed
|
||||||
### Updated
|
### Updated
|
||||||
|
@ -38,9 +51,15 @@ Changelog for FrostFS Node
|
||||||
- Minimum go version to v1.18
|
- Minimum go version to v1.18
|
||||||
|
|
||||||
### Updating from v0.35.0
|
### Updating from v0.35.0
|
||||||
|
<<<<<<< HEAD
|
||||||
You need to change configuration environment variables to `FROSTFS_*` if you use any.
|
You need to change configuration environment variables to `FROSTFS_*` if you use any.
|
||||||
|
||||||| parent of 00afc576d ([#2246] node: Allow to configure tombsone lifetime)
|
||||||
|
=======
|
||||||
|
New config field `object.delete.tombstone_lifetime` allows to set tombstone lifetime
|
||||||
|
more appropriate for a specific deployment.
|
||||||
|
>>>>>>> 00afc576d ([#2246] node: Allow to configure tombsone lifetime)
|
||||||
|
|
||||||
## [0.35.0] - 2022-12-28 - Sindo (신도)
|
## [0.35.0] - 2022-12-28 - Sindo (신도, 信島)
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
- `morph list-containers` in `neofs-adm` (#1689)
|
- `morph list-containers` in `neofs-adm` (#1689)
|
||||||
|
@ -124,7 +143,6 @@ You need to change configuration environment variables to `FROSTFS_*` if you use
|
||||||
- `spf13/viper` to `v1.8.0`
|
- `spf13/viper` to `v1.8.0`
|
||||||
- `google.golang.org/grpc` to `v1.50.1`
|
- `google.golang.org/grpc` to `v1.50.1`
|
||||||
|
|
||||||
|
|
||||||
### Updating from v0.34.0
|
### Updating from v0.34.0
|
||||||
Pass CID and OID parameters via the `--cid` and `--oid` flags, not as the command arguments.
|
Pass CID and OID parameters via the `--cid` and `--oid` flags, not as the command arguments.
|
||||||
|
|
||||||
|
@ -138,9 +156,9 @@ to match the container owner. Use `--force` (`-f`) flag to bypass this requireme
|
||||||
|
|
||||||
Tree service network replication can now be fine-tuned with `tree.replication_timeout` config field.
|
Tree service network replication can now be fine-tuned with `tree.replication_timeout` config field.
|
||||||
|
|
||||||
## [0.34.0] - 2022-10-31 - Marado (마라도, 馬羅島)
|
## [0.34.0] - 2022-10-31 - Marado (마라도, 馬羅島)
|
||||||
|
|
||||||
# ## Added
|
### Added
|
||||||
- `--timeout` flag in `neofs-cli control` commands (#1917)
|
- `--timeout` flag in `neofs-cli control` commands (#1917)
|
||||||
- Document shard modes of operation (#1909)
|
- Document shard modes of operation (#1909)
|
||||||
- `tree list` CLI command (#1332)
|
- `tree list` CLI command (#1332)
|
||||||
|
|
|
@ -2,6 +2,7 @@ package morph
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"text/tabwriter"
|
"text/tabwriter"
|
||||||
|
@ -107,31 +108,30 @@ func dumpContractHashes(cmd *cobra.Command, _ []string) error {
|
||||||
|
|
||||||
func dumpCustomZoneHashes(cmd *cobra.Command, nnsHash util.Uint160, zone string, c Client) error {
|
func dumpCustomZoneHashes(cmd *cobra.Command, nnsHash util.Uint160, zone string, c Client) error {
|
||||||
const nnsMaxTokens = 100
|
const nnsMaxTokens = 100
|
||||||
inv := invoker.New(c, nil)
|
|
||||||
|
|
||||||
arr, err := unwrap.Array(inv.CallAndExpandIterator(nnsHash, "tokens", nnsMaxTokens))
|
inv := invoker.New(c, nil)
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't get a list of NNS domains: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !strings.HasPrefix(zone, ".") {
|
if !strings.HasPrefix(zone, ".") {
|
||||||
zone = "." + zone
|
zone = "." + zone
|
||||||
}
|
}
|
||||||
|
|
||||||
var infos []contractDumpInfo
|
var infos []contractDumpInfo
|
||||||
for i := range arr {
|
processItem := func(item stackitem.Item) {
|
||||||
bs, err := arr[i].TryBytes()
|
bs, err := item.TryBytes()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
cmd.PrintErrf("Invalid NNS record: %v\n", err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if !bytes.HasSuffix(bs, []byte(zone)) {
|
if !bytes.HasSuffix(bs, []byte(zone)) {
|
||||||
continue
|
// Related https://github.com/nspcc-dev/neofs-contract/issues/316.
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
h, err := nnsResolveHash(inv, nnsHash, string(bs))
|
h, err := nnsResolveHash(inv, nnsHash, string(bs))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
cmd.PrintErrf("Could not resolve name %s: %v\n", string(bs), err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
infos = append(infos, contractDumpInfo{
|
infos = append(infos, contractDumpInfo{
|
||||||
|
@ -140,6 +140,39 @@ func dumpCustomZoneHashes(cmd *cobra.Command, nnsHash util.Uint160, zone string,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sessionID, iter, err := unwrap.SessionIterator(inv.Call(nnsHash, "tokens"))
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, unwrap.ErrNoSessionID) {
|
||||||
|
items, err := unwrap.Array(inv.CallAndExpandIterator(nnsHash, "tokens", nnsMaxTokens))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't get a list of NNS domains: %w", err)
|
||||||
|
}
|
||||||
|
if len(items) == nnsMaxTokens {
|
||||||
|
cmd.PrintErrln("Provided RPC endpoint doesn't support sessions, some hashes might be lost.")
|
||||||
|
}
|
||||||
|
for i := range items {
|
||||||
|
processItem(items[i])
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
defer func() {
|
||||||
|
_ = inv.TerminateSession(sessionID)
|
||||||
|
}()
|
||||||
|
|
||||||
|
items, err := inv.TraverseIterator(sessionID, &iter, nnsMaxTokens)
|
||||||
|
for err == nil && len(items) != 0 {
|
||||||
|
for i := range items {
|
||||||
|
processItem(items[i])
|
||||||
|
}
|
||||||
|
items, err = inv.TraverseIterator(sessionID, &iter, nnsMaxTokens)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error during NNS domains iteration: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fillContractVersion(cmd, c, infos)
|
fillContractVersion(cmd, c, infos)
|
||||||
printContractInfo(cmd, infos)
|
printContractInfo(cmd, infos)
|
||||||
|
|
||||||
|
|
|
@ -404,8 +404,7 @@ func PutObject(prm PutObjectPrm) (*PutObjectRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if prm.rdr != nil {
|
if prm.rdr != nil {
|
||||||
// TODO: (neofs-node#1198) explore better values or configure it
|
const defaultBufferSizePut = 3 << 20 // Maximum chunk size is 3 MiB in the SDK.
|
||||||
const defaultBufferSizePut = 4096
|
|
||||||
|
|
||||||
if sz == 0 || sz > defaultBufferSizePut {
|
if sz == 0 || sz > defaultBufferSizePut {
|
||||||
sz = defaultBufferSizePut
|
sz = defaultBufferSizePut
|
||||||
|
|
|
@ -126,7 +126,7 @@ func initHTTPServers(cfg *viper.Viper, log *logger.Logger) []*httputil.Server {
|
||||||
|
|
||||||
addr := cfg.GetString(item.cfgPrefix + ".address")
|
addr := cfg.GetString(item.cfgPrefix + ".address")
|
||||||
|
|
||||||
var prm httputil.Prm
|
var prm httputil.HTTPSrvPrm
|
||||||
|
|
||||||
prm.Address = addr
|
prm.Address = addr
|
||||||
prm.Handler = item.handler()
|
prm.Handler = item.handler()
|
||||||
|
|
25
cmd/frostfs-node/closer.go
Normal file
25
cmd/frostfs-node/closer.go
Normal file
|
@ -0,0 +1,25 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
type closer struct {
|
||||||
|
name string
|
||||||
|
fn func()
|
||||||
|
}
|
||||||
|
|
||||||
|
func getCloser(c *cfg, name string) *closer {
|
||||||
|
for _, clsr := range c.closers {
|
||||||
|
if clsr.name == name {
|
||||||
|
return &clsr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func delCloser(c *cfg, name string) {
|
||||||
|
for i, clsr := range c.closers {
|
||||||
|
if clsr.name == name {
|
||||||
|
c.closers[i] = c.closers[len(c.closers)-1]
|
||||||
|
c.closers = c.closers[:len(c.closers)-1]
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -24,7 +24,6 @@ import (
|
||||||
blobovniczaconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
|
blobovniczaconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
|
||||||
fstreeconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
|
fstreeconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
|
||||||
loggerconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
|
loggerconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
|
||||||
metricsconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/metrics"
|
|
||||||
nodeconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
nodeconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
||||||
objectconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
|
objectconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
|
||||||
replicatorconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
|
replicatorconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
|
||||||
|
@ -48,6 +47,7 @@ import (
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/network"
|
"github.com/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/network/cache"
|
"github.com/TrueCloudLab/frostfs-node/pkg/network/cache"
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/services/control"
|
"github.com/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||||
|
objectService "github.com/TrueCloudLab/frostfs-node/pkg/services/object"
|
||||||
getsvc "github.com/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
getsvc "github.com/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
|
"github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
|
||||||
tsourse "github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
|
tsourse "github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
|
||||||
|
@ -308,7 +308,7 @@ type internals struct {
|
||||||
|
|
||||||
wg *sync.WaitGroup
|
wg *sync.WaitGroup
|
||||||
workers []worker
|
workers []worker
|
||||||
closers []func()
|
closers []closer
|
||||||
|
|
||||||
apiVersion version.Version
|
apiVersion version.Version
|
||||||
healthStatus *atomic.Int32
|
healthStatus *atomic.Int32
|
||||||
|
@ -344,6 +344,7 @@ type shared struct {
|
||||||
|
|
||||||
clientCache *cache.ClientCache
|
clientCache *cache.ClientCache
|
||||||
bgClientCache *cache.ClientCache
|
bgClientCache *cache.ClientCache
|
||||||
|
putClientCache *cache.ClientCache
|
||||||
localAddr network.AddressGroup
|
localAddr network.AddressGroup
|
||||||
|
|
||||||
key *keys.PrivateKey
|
key *keys.PrivateKey
|
||||||
|
@ -363,12 +364,16 @@ type shared struct {
|
||||||
treeService *tree.Service
|
treeService *tree.Service
|
||||||
|
|
||||||
metricsCollector *metrics.NodeMetrics
|
metricsCollector *metrics.NodeMetrics
|
||||||
|
|
||||||
|
metricsSvc *objectService.MetricCollector
|
||||||
}
|
}
|
||||||
|
|
||||||
// dynamicConfiguration stores parameters of the
|
// dynamicConfiguration stores parameters of the
|
||||||
// components that supports runtime reconfigurations.
|
// components that supports runtime reconfigurations.
|
||||||
type dynamicConfiguration struct {
|
type dynamicConfiguration struct {
|
||||||
logger *logger.Prm
|
logger *logger.Prm
|
||||||
|
pprof *httpComponent
|
||||||
|
metrics *httpComponent
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
|
@ -472,6 +477,8 @@ type cfgObject struct {
|
||||||
pool cfgObjectRoutines
|
pool cfgObjectRoutines
|
||||||
|
|
||||||
cfgLocalStorage cfgLocalStorage
|
cfgLocalStorage cfgLocalStorage
|
||||||
|
|
||||||
|
tombstoneLifetime uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfgNotifications struct {
|
type cfgNotifications struct {
|
||||||
|
@ -574,6 +581,7 @@ func initCfg(appCfg *config.Config) *cfg {
|
||||||
respSvc: response.NewService(response.WithNetworkState(netState)),
|
respSvc: response.NewService(response.WithNetworkState(netState)),
|
||||||
clientCache: cache.NewSDKClientCache(cacheOpts),
|
clientCache: cache.NewSDKClientCache(cacheOpts),
|
||||||
bgClientCache: cache.NewSDKClientCache(cacheOpts),
|
bgClientCache: cache.NewSDKClientCache(cacheOpts),
|
||||||
|
putClientCache: cache.NewSDKClientCache(cacheOpts),
|
||||||
persistate: persistate,
|
persistate: persistate,
|
||||||
}
|
}
|
||||||
c.cfgAccounting = cfgAccounting{
|
c.cfgAccounting = cfgAccounting{
|
||||||
|
@ -599,6 +607,7 @@ func initCfg(appCfg *config.Config) *cfg {
|
||||||
}
|
}
|
||||||
c.cfgObject = cfgObject{
|
c.cfgObject = cfgObject{
|
||||||
pool: initObjectPool(appCfg),
|
pool: initObjectPool(appCfg),
|
||||||
|
tombstoneLifetime: objectconfig.TombstoneLifetime(appCfg),
|
||||||
}
|
}
|
||||||
c.cfgReputation = cfgReputation{
|
c.cfgReputation = cfgReputation{
|
||||||
scriptHash: contractsconfig.Reputation(appCfg),
|
scriptHash: contractsconfig.Reputation(appCfg),
|
||||||
|
@ -607,13 +616,12 @@ func initCfg(appCfg *config.Config) *cfg {
|
||||||
|
|
||||||
user.IDFromKey(&c.ownerIDFromKey, key.PrivateKey.PublicKey)
|
user.IDFromKey(&c.ownerIDFromKey, key.PrivateKey.PublicKey)
|
||||||
|
|
||||||
if metricsconfig.Enabled(c.appCfg) {
|
|
||||||
c.metricsCollector = metrics.NewNodeMetrics()
|
c.metricsCollector = metrics.NewNodeMetrics()
|
||||||
netState.metrics = c.metricsCollector
|
netState.metrics = c.metricsCollector
|
||||||
}
|
|
||||||
|
|
||||||
c.onShutdown(c.clientCache.CloseAll) // clean up connections
|
c.onShutdown(c.clientCache.CloseAll) // clean up connections
|
||||||
c.onShutdown(c.bgClientCache.CloseAll) // clean up connections
|
c.onShutdown(c.bgClientCache.CloseAll) // clean up connections
|
||||||
|
c.onShutdown(c.putClientCache.CloseAll) // clean up connections
|
||||||
c.onShutdown(func() { _ = c.persistate.Close() })
|
c.onShutdown(func() { _ = c.persistate.Close() })
|
||||||
|
|
||||||
return c
|
return c
|
||||||
|
@ -790,13 +798,18 @@ func initLocalStorage(c *cfg) {
|
||||||
tombstone.WithTombstoneSource(tombstoneSrc),
|
tombstone.WithTombstoneSource(tombstoneSrc),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var shardsAttached int
|
||||||
for _, optsWithMeta := range c.shardOpts() {
|
for _, optsWithMeta := range c.shardOpts() {
|
||||||
id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(tombstoneSource))...)
|
id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(tombstoneSource))...)
|
||||||
fatalOnErr(err)
|
if err != nil {
|
||||||
|
c.log.Error("failed to attach shard to engine", zap.Error(err))
|
||||||
c.log.Info("shard attached to engine",
|
} else {
|
||||||
zap.Stringer("id", id),
|
shardsAttached++
|
||||||
)
|
c.log.Info("shard attached to engine", zap.Stringer("id", id))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if shardsAttached == 0 {
|
||||||
|
fatalOnErr(engineconfig.ErrNoShardConfigured)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.cfgObject.cfgLocalStorage.localStorage = ls
|
c.cfgObject.cfgLocalStorage.localStorage = ls
|
||||||
|
@ -904,11 +917,9 @@ func (c *cfg) ObjectServiceLoad() float64 {
|
||||||
return float64(c.cfgObject.pool.putRemote.Running()) / float64(c.cfgObject.pool.putRemoteCapacity)
|
return float64(c.cfgObject.pool.putRemote.Running()) / float64(c.cfgObject.pool.putRemoteCapacity)
|
||||||
}
|
}
|
||||||
|
|
||||||
type dCfg struct {
|
type dCmp struct {
|
||||||
name string
|
name string
|
||||||
cfg interface {
|
reloadFunc func() error
|
||||||
Reload() error
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) signalWatcher() {
|
func (c *cfg) signalWatcher() {
|
||||||
|
@ -953,7 +964,7 @@ func (c *cfg) reloadConfig() {
|
||||||
|
|
||||||
// all the components are expected to support
|
// all the components are expected to support
|
||||||
// Logger's dynamic reconfiguration approach
|
// Logger's dynamic reconfiguration approach
|
||||||
var components []dCfg
|
var components []dCmp
|
||||||
|
|
||||||
// Logger
|
// Logger
|
||||||
|
|
||||||
|
@ -963,7 +974,18 @@ func (c *cfg) reloadConfig() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
components = append(components, dCfg{name: "logger", cfg: logPrm})
|
components = append(components, dCmp{"logger", logPrm.Reload})
|
||||||
|
if cmp, updated := metricsComponent(c); updated {
|
||||||
|
if cmp.enabled {
|
||||||
|
cmp.preReload = enableMetricsSvc
|
||||||
|
} else {
|
||||||
|
cmp.preReload = disableMetricsSvc
|
||||||
|
}
|
||||||
|
components = append(components, dCmp{cmp.name, cmp.reload})
|
||||||
|
}
|
||||||
|
if cmp, updated := pprofComponent(c); updated {
|
||||||
|
components = append(components, dCmp{cmp.name, cmp.reload})
|
||||||
|
}
|
||||||
|
|
||||||
// Storage Engine
|
// Storage Engine
|
||||||
|
|
||||||
|
@ -979,7 +1001,7 @@ func (c *cfg) reloadConfig() {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, component := range components {
|
for _, component := range components {
|
||||||
err = component.cfg.Reload()
|
err = component.reloadFunc()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Error("updated configuration applying",
|
c.log.Error("updated configuration applying",
|
||||||
zap.String("component", component.name),
|
zap.String("component", component.name),
|
||||||
|
@ -995,7 +1017,7 @@ func (c *cfg) shutdown() {
|
||||||
|
|
||||||
c.ctxCancel()
|
c.ctxCancel()
|
||||||
for i := range c.closers {
|
for i := range c.closers {
|
||||||
c.closers[len(c.closers)-1-i]()
|
c.closers[len(c.closers)-1-i].fn()
|
||||||
}
|
}
|
||||||
close(c.internalErr)
|
close(c.internalErr)
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,12 +14,14 @@ func TestObjectSection(t *testing.T) {
|
||||||
empty := configtest.EmptyConfig()
|
empty := configtest.EmptyConfig()
|
||||||
|
|
||||||
require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeRemote())
|
require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeRemote())
|
||||||
|
require.EqualValues(t, objectconfig.DefaultTombstoneLifetime, objectconfig.TombstoneLifetime(empty))
|
||||||
})
|
})
|
||||||
|
|
||||||
const path = "../../../../config/example/node"
|
const path = "../../../../config/example/node"
|
||||||
|
|
||||||
var fileConfigTest = func(c *config.Config) {
|
var fileConfigTest = func(c *config.Config) {
|
||||||
require.Equal(t, 100, objectconfig.Put(c).PoolSizeRemote())
|
require.Equal(t, 100, objectconfig.Put(c).PoolSizeRemote())
|
||||||
|
require.EqualValues(t, 10, objectconfig.TombstoneLifetime(c))
|
||||||
}
|
}
|
||||||
|
|
||||||
configtest.ForEachFileType(path, fileConfigTest)
|
configtest.ForEachFileType(path, fileConfigTest)
|
||||||
|
|
19
cmd/frostfs-node/config/object/delete.go
Normal file
19
cmd/frostfs-node/config/object/delete.go
Normal file
|
@ -0,0 +1,19 @@
|
||||||
|
package objectconfig
|
||||||
|
|
||||||
|
import "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
|
|
||||||
|
const (
|
||||||
|
deleteSubsection = "delete"
|
||||||
|
|
||||||
|
// DefaultTombstoneLifetime is the default value of tombstone lifetime in epochs.
|
||||||
|
DefaultTombstoneLifetime = 5
|
||||||
|
)
|
||||||
|
|
||||||
|
// TombstoneLifetime returns the value of `tombstone_lifetime` config parameter.
|
||||||
|
func TombstoneLifetime(c *config.Config) uint64 {
|
||||||
|
ts := config.UintSafe(c.Sub(subsection).Sub(deleteSubsection), "tombstone_lifetime")
|
||||||
|
if ts <= 0 {
|
||||||
|
return DefaultTombstoneLifetime
|
||||||
|
}
|
||||||
|
return ts
|
||||||
|
}
|
70
cmd/frostfs-node/httpcomponent.go
Normal file
70
cmd/frostfs-node/httpcomponent.go
Normal file
|
@ -0,0 +1,70 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
httputil "github.com/TrueCloudLab/frostfs-node/pkg/util/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
type httpComponent struct {
|
||||||
|
address string
|
||||||
|
name string
|
||||||
|
handler http.Handler
|
||||||
|
shutdownDur time.Duration
|
||||||
|
enabled bool
|
||||||
|
cfg *cfg
|
||||||
|
preReload func(c *cfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmp *httpComponent) init(c *cfg) {
|
||||||
|
if !cmp.enabled {
|
||||||
|
c.log.Info(fmt.Sprintf("%s is disabled", cmp.name))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Init server with parameters
|
||||||
|
srv := httputil.New(
|
||||||
|
*httputil.NewHTTPSrvPrm(
|
||||||
|
cmp.address,
|
||||||
|
cmp.handler,
|
||||||
|
),
|
||||||
|
httputil.WithShutdownTimeout(
|
||||||
|
cmp.shutdownDur,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
c.closers = append(c.closers, closer{
|
||||||
|
cmp.name,
|
||||||
|
func() { stopAndLog(c, cmp.name, srv.Shutdown) },
|
||||||
|
})
|
||||||
|
c.workers = append(c.workers, worker{
|
||||||
|
cmp.name,
|
||||||
|
func(ctx context.Context) {
|
||||||
|
runAndLog(c, cmp.name, false, func(c *cfg) {
|
||||||
|
fatalOnErr(srv.Serve())
|
||||||
|
})
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmp *httpComponent) reload() error {
|
||||||
|
if cmp.preReload != nil {
|
||||||
|
cmp.preReload(cmp.cfg)
|
||||||
|
}
|
||||||
|
// Shutdown server
|
||||||
|
closer := getCloser(cmp.cfg, cmp.name)
|
||||||
|
if closer != nil {
|
||||||
|
closer.fn()
|
||||||
|
}
|
||||||
|
// Cleanup
|
||||||
|
delCloser(cmp.cfg, cmp.name)
|
||||||
|
delWorker(cmp.cfg, cmp.name)
|
||||||
|
// Init server with new parameters
|
||||||
|
cmp.init(cmp.cfg)
|
||||||
|
// Start worker
|
||||||
|
if cmp.enabled {
|
||||||
|
startWorker(cmp.cfg, *getWorker(cmp.cfg, cmp.name))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -81,8 +81,10 @@ func initApp(c *cfg) {
|
||||||
c.wg.Done()
|
c.wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
initAndLog(c, "pprof", initProfiler)
|
pprof, _ := pprofComponent(c)
|
||||||
initAndLog(c, "prometheus", initMetrics)
|
metrics, _ := metricsComponent(c)
|
||||||
|
initAndLog(c, pprof.name, pprof.init)
|
||||||
|
initAndLog(c, metrics.name, metrics.init)
|
||||||
|
|
||||||
initLocalStorage(c)
|
initLocalStorage(c)
|
||||||
|
|
||||||
|
@ -114,6 +116,19 @@ func runAndLog(c *cfg, name string, logSuccess bool, starter func(*cfg)) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func stopAndLog(c *cfg, name string, stopper func() error) {
|
||||||
|
c.log.Debug(fmt.Sprintf("shutting down %s service", name))
|
||||||
|
|
||||||
|
err := stopper()
|
||||||
|
if err != nil {
|
||||||
|
c.log.Debug(fmt.Sprintf("could not shutdown %s server", name),
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.log.Debug(fmt.Sprintf("%s service has been stopped", name))
|
||||||
|
}
|
||||||
|
|
||||||
func bootUp(c *cfg) {
|
func bootUp(c *cfg) {
|
||||||
runAndLog(c, "NATS", true, connectNats)
|
runAndLog(c, "NATS", true, connectNats)
|
||||||
runAndLog(c, "gRPC", false, serveGRPC)
|
runAndLog(c, "gRPC", false, serveGRPC)
|
||||||
|
@ -135,5 +150,5 @@ func wait(c *cfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) onShutdown(f func()) {
|
func (c *cfg) onShutdown(f func()) {
|
||||||
c.closers = append(c.closers, f)
|
c.closers = append(c.closers, closer{"", f})
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,47 +1,45 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
|
|
||||||
metricsconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/metrics"
|
metricsconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/metrics"
|
||||||
httputil "github.com/TrueCloudLab/frostfs-node/pkg/util/http"
|
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func initMetrics(c *cfg) {
|
func metricsComponent(c *cfg) (*httpComponent, bool) {
|
||||||
if !metricsconfig.Enabled(c.appCfg) {
|
var updated bool
|
||||||
c.log.Info("prometheus is disabled")
|
// check if it has been inited before
|
||||||
return
|
if c.dynamicConfiguration.metrics == nil {
|
||||||
|
c.dynamicConfiguration.metrics = new(httpComponent)
|
||||||
|
c.dynamicConfiguration.metrics.cfg = c
|
||||||
|
c.dynamicConfiguration.metrics.name = "metrics"
|
||||||
|
c.dynamicConfiguration.metrics.handler = promhttp.Handler()
|
||||||
|
updated = true
|
||||||
}
|
}
|
||||||
|
|
||||||
var prm httputil.Prm
|
// (re)init read configuration
|
||||||
|
enabled := metricsconfig.Enabled(c.appCfg)
|
||||||
prm.Address = metricsconfig.Address(c.appCfg)
|
if enabled != c.dynamicConfiguration.metrics.enabled {
|
||||||
prm.Handler = promhttp.Handler()
|
c.dynamicConfiguration.metrics.enabled = enabled
|
||||||
|
updated = true
|
||||||
srv := httputil.New(prm,
|
}
|
||||||
httputil.WithShutdownTimeout(
|
address := metricsconfig.Address(c.appCfg)
|
||||||
metricsconfig.ShutdownTimeout(c.appCfg),
|
if address != c.dynamicConfiguration.metrics.address {
|
||||||
),
|
c.dynamicConfiguration.metrics.address = address
|
||||||
)
|
updated = true
|
||||||
|
}
|
||||||
c.workers = append(c.workers, newWorkerFromFunc(func(context.Context) {
|
dur := metricsconfig.ShutdownTimeout(c.appCfg)
|
||||||
runAndLog(c, "metrics", false, func(c *cfg) {
|
if dur != c.dynamicConfiguration.metrics.shutdownDur {
|
||||||
fatalOnErr(srv.Serve())
|
c.dynamicConfiguration.metrics.shutdownDur = dur
|
||||||
})
|
updated = true
|
||||||
}))
|
|
||||||
|
|
||||||
c.closers = append(c.closers, func() {
|
|
||||||
c.log.Debug("shutting down metrics service")
|
|
||||||
|
|
||||||
err := srv.Shutdown()
|
|
||||||
if err != nil {
|
|
||||||
c.log.Debug("could not shutdown metrics server",
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.log.Debug("metrics service has been stopped")
|
return c.dynamicConfiguration.metrics, updated
|
||||||
})
|
}
|
||||||
|
|
||||||
|
func enableMetricsSvc(c *cfg) {
|
||||||
|
c.shared.metricsSvc.Enable()
|
||||||
|
}
|
||||||
|
|
||||||
|
func disableMetricsSvc(c *cfg) {
|
||||||
|
c.shared.metricsSvc.Disable()
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
|
|
||||||
"github.com/TrueCloudLab/frostfs-api-go/v2/object"
|
"github.com/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
objectGRPC "github.com/TrueCloudLab/frostfs-api-go/v2/object/grpc"
|
objectGRPC "github.com/TrueCloudLab/frostfs-api-go/v2/object/grpc"
|
||||||
|
metricsconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/metrics"
|
||||||
policerconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/policer"
|
policerconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/policer"
|
||||||
replicatorconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
|
replicatorconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
|
||||||
coreclient "github.com/TrueCloudLab/frostfs-node/pkg/core/client"
|
coreclient "github.com/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
@ -182,6 +183,14 @@ func initObjectService(c *cfg) {
|
||||||
basicConstructor: c.clientCache,
|
basicConstructor: c.clientCache,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
putConstructor := &coreClientConstructor{
|
||||||
|
log: c.log,
|
||||||
|
nmSrc: c.netMapSource,
|
||||||
|
netState: c.cfgNetmap.state,
|
||||||
|
trustStorage: c.cfgReputation.localTrustStorage,
|
||||||
|
basicConstructor: c.putClientCache,
|
||||||
|
}
|
||||||
|
|
||||||
var irFetcher v2.InnerRingFetcher
|
var irFetcher v2.InnerRingFetcher
|
||||||
|
|
||||||
if c.cfgMorph.client.ProbeNotary() {
|
if c.cfgMorph.client.ProbeNotary() {
|
||||||
|
@ -238,7 +247,11 @@ func initObjectService(c *cfg) {
|
||||||
|
|
||||||
traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)
|
traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)
|
||||||
|
|
||||||
c.workers = append(c.workers, pol)
|
c.workers = append(c.workers, worker{
|
||||||
|
fn: func(ctx context.Context) {
|
||||||
|
pol.Run(ctx)
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
var os putsvc.ObjectStorage = engineWithoutNotifications{
|
var os putsvc.ObjectStorage = engineWithoutNotifications{
|
||||||
engine: ls,
|
engine: ls,
|
||||||
|
@ -255,7 +268,7 @@ func initObjectService(c *cfg) {
|
||||||
|
|
||||||
sPut := putsvc.NewService(
|
sPut := putsvc.NewService(
|
||||||
putsvc.WithKeyStorage(keyStorage),
|
putsvc.WithKeyStorage(keyStorage),
|
||||||
putsvc.WithClientConstructor(coreConstructor),
|
putsvc.WithClientConstructor(putConstructor),
|
||||||
putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)),
|
putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)),
|
||||||
putsvc.WithObjectStorage(os),
|
putsvc.WithObjectStorage(os),
|
||||||
putsvc.WithContainerSource(c.cfgObject.cnrSource),
|
putsvc.WithContainerSource(c.cfgObject.cnrSource),
|
||||||
|
@ -316,7 +329,7 @@ func initObjectService(c *cfg) {
|
||||||
deletesvc.WithPutService(sPut),
|
deletesvc.WithPutService(sPut),
|
||||||
deletesvc.WithNetworkInfo(&delNetInfo{
|
deletesvc.WithNetworkInfo(&delNetInfo{
|
||||||
State: c.cfgNetmap.state,
|
State: c.cfgNetmap.state,
|
||||||
tsLifetime: 5,
|
tsLifetime: c.cfgObject.tombstoneLifetime,
|
||||||
|
|
||||||
cfg: c,
|
cfg: c,
|
||||||
}),
|
}),
|
||||||
|
@ -372,12 +385,9 @@ func initObjectService(c *cfg) {
|
||||||
respSvc,
|
respSvc,
|
||||||
)
|
)
|
||||||
|
|
||||||
var firstSvc objectService.ServiceServer = signSvc
|
c.shared.metricsSvc = objectService.NewMetricCollector(
|
||||||
if c.metricsCollector != nil {
|
signSvc, c.metricsCollector, metricsconfig.Enabled(c.appCfg))
|
||||||
firstSvc = objectService.NewMetricCollector(signSvc, c.metricsCollector)
|
server := objectTransportGRPC.New(c.shared.metricsSvc)
|
||||||
}
|
|
||||||
|
|
||||||
server := objectTransportGRPC.New(firstSvc)
|
|
||||||
|
|
||||||
for _, srv := range c.cfgGRPC.servers {
|
for _, srv := range c.cfgGRPC.servers {
|
||||||
objectGRPC.RegisterObjectServiceServer(srv, server)
|
objectGRPC.RegisterObjectServiceServer(srv, server)
|
||||||
|
|
|
@ -1,46 +1,37 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
|
|
||||||
profilerconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/profiler"
|
profilerconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/profiler"
|
||||||
httputil "github.com/TrueCloudLab/frostfs-node/pkg/util/http"
|
httputil "github.com/TrueCloudLab/frostfs-node/pkg/util/http"
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func initProfiler(c *cfg) {
|
func pprofComponent(c *cfg) (*httpComponent, bool) {
|
||||||
if !profilerconfig.Enabled(c.appCfg) {
|
var updated bool
|
||||||
c.log.Info("pprof is disabled")
|
// check if it has been inited before
|
||||||
return
|
if c.dynamicConfiguration.pprof == nil {
|
||||||
|
c.dynamicConfiguration.pprof = new(httpComponent)
|
||||||
|
c.dynamicConfiguration.pprof.cfg = c
|
||||||
|
c.dynamicConfiguration.pprof.name = "pprof"
|
||||||
|
c.dynamicConfiguration.pprof.handler = httputil.Handler()
|
||||||
|
updated = true
|
||||||
}
|
}
|
||||||
|
|
||||||
var prm httputil.Prm
|
// (re)init read configuration
|
||||||
|
enabled := profilerconfig.Enabled(c.appCfg)
|
||||||
prm.Address = profilerconfig.Address(c.appCfg)
|
if enabled != c.dynamicConfiguration.pprof.enabled {
|
||||||
prm.Handler = httputil.Handler()
|
c.dynamicConfiguration.pprof.enabled = enabled
|
||||||
|
updated = true
|
||||||
srv := httputil.New(prm,
|
}
|
||||||
httputil.WithShutdownTimeout(
|
address := profilerconfig.Address(c.appCfg)
|
||||||
profilerconfig.ShutdownTimeout(c.appCfg),
|
if address != c.dynamicConfiguration.pprof.address {
|
||||||
),
|
c.dynamicConfiguration.pprof.address = address
|
||||||
)
|
updated = true
|
||||||
|
}
|
||||||
c.workers = append(c.workers, newWorkerFromFunc(func(context.Context) {
|
dur := profilerconfig.ShutdownTimeout(c.appCfg)
|
||||||
runAndLog(c, "profiler", false, func(c *cfg) {
|
if dur != c.dynamicConfiguration.pprof.shutdownDur {
|
||||||
fatalOnErr(srv.Serve())
|
c.dynamicConfiguration.pprof.shutdownDur = dur
|
||||||
})
|
updated = true
|
||||||
}))
|
|
||||||
|
|
||||||
c.closers = append(c.closers, func() {
|
|
||||||
c.log.Debug("shutting down profiling service")
|
|
||||||
|
|
||||||
err := srv.Shutdown()
|
|
||||||
if err != nil {
|
|
||||||
c.log.Debug("could not shutdown pprof server",
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.log.Debug("profiling service has been stopped")
|
return c.dynamicConfiguration.pprof, updated
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,31 +4,46 @@ import (
|
||||||
"context"
|
"context"
|
||||||
)
|
)
|
||||||
|
|
||||||
type worker interface {
|
type worker struct {
|
||||||
Run(context.Context)
|
name string
|
||||||
}
|
|
||||||
|
|
||||||
type workerFromFunc struct {
|
|
||||||
fn func(context.Context)
|
fn func(context.Context)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newWorkerFromFunc(fn func(ctx context.Context)) worker {
|
func newWorkerFromFunc(fn func(ctx context.Context)) worker {
|
||||||
return &workerFromFunc{
|
return worker{
|
||||||
fn: fn,
|
fn: fn,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *workerFromFunc) Run(ctx context.Context) {
|
|
||||||
w.fn(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func startWorkers(c *cfg) {
|
func startWorkers(c *cfg) {
|
||||||
for _, wrk := range c.workers {
|
for _, wrk := range c.workers {
|
||||||
|
startWorker(c, wrk)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func startWorker(c *cfg, wrk worker) {
|
||||||
c.wg.Add(1)
|
c.wg.Add(1)
|
||||||
|
|
||||||
go func(w worker) {
|
go func(w worker) {
|
||||||
w.Run(c.ctx)
|
w.fn(c.ctx)
|
||||||
c.wg.Done()
|
c.wg.Done()
|
||||||
}(wrk)
|
}(wrk)
|
||||||
|
}
|
||||||
|
|
||||||
|
func delWorker(c *cfg, name string) {
|
||||||
|
for i, worker := range c.workers {
|
||||||
|
if worker.name == name {
|
||||||
|
c.workers = append(c.workers[:i], c.workers[i+1:]...)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getWorker(c *cfg, name string) *worker {
|
||||||
|
for _, wrk := range c.workers {
|
||||||
|
if wrk.name == name {
|
||||||
|
return &wrk
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -85,6 +85,7 @@ FROSTFS_REPLICATOR_POOL_SIZE=10
|
||||||
|
|
||||||
# Object service section
|
# Object service section
|
||||||
FROSTFS_OBJECT_PUT_POOL_SIZE_REMOTE=100
|
FROSTFS_OBJECT_PUT_POOL_SIZE_REMOTE=100
|
||||||
|
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
|
||||||
|
|
||||||
# Storage engine section
|
# Storage engine section
|
||||||
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
|
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
|
||||||
|
|
|
@ -128,6 +128,9 @@
|
||||||
"put_timeout": "15s"
|
"put_timeout": "15s"
|
||||||
},
|
},
|
||||||
"object": {
|
"object": {
|
||||||
|
"delete": {
|
||||||
|
"tombstone_lifetime": 10
|
||||||
|
},
|
||||||
"put": {
|
"put": {
|
||||||
"pool_size_remote": 100
|
"pool_size_remote": 100
|
||||||
}
|
}
|
||||||
|
|
|
@ -107,6 +107,8 @@ replicator:
|
||||||
pool_size: 10 # maximum amount of concurrent replications
|
pool_size: 10 # maximum amount of concurrent replications
|
||||||
|
|
||||||
object:
|
object:
|
||||||
|
delete:
|
||||||
|
tombstone_lifetime: 10 # tombstone "local" lifetime in epochs
|
||||||
put:
|
put:
|
||||||
pool_size_remote: 100 # number of async workers for remote PUT operations
|
pool_size_remote: 100 # number of async workers for remote PUT operations
|
||||||
|
|
||||||
|
|
|
@ -415,7 +415,7 @@ replicator:
|
||||||
| `pool_size` | `int` | Equal to `object.put.pool_size_remote` | Maximum amount of concurrent replications. |
|
| `pool_size` | `int` | Equal to `object.put.pool_size_remote` | Maximum amount of concurrent replications. |
|
||||||
|
|
||||||
# `object` section
|
# `object` section
|
||||||
Contains pool sizes for object operations with remote nodes.
|
Contains object-service related parameters.
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
object:
|
object:
|
||||||
|
@ -424,5 +424,6 @@ object:
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | Default value | Description |
|
| Parameter | Type | Default value | Description |
|
||||||
|------------------------|-------|---------------|------------------------------------------------------------------------------------------------|
|
|-----------------------------|-------|---------------|------------------------------------------------------------------------------------------------|
|
||||||
|
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
||||||
| `put.pool_size_remote` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
| `put.pool_size_remote` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
||||||
|
|
|
@ -33,6 +33,8 @@ type MultiAddressClient interface {
|
||||||
// RawForAddress must return rawclient.Client
|
// RawForAddress must return rawclient.Client
|
||||||
// for the passed network.Address.
|
// for the passed network.Address.
|
||||||
RawForAddress(network.Address, func(cli *rawclient.Client) error) error
|
RawForAddress(network.Address, func(cli *rawclient.Client) error) error
|
||||||
|
|
||||||
|
ReportError(error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NodeInfo groups information about a FrostFS storage node needed for Client construction.
|
// NodeInfo groups information about a FrostFS storage node needed for Client construction.
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"io/fs"
|
"io/fs"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
|
@ -230,23 +231,63 @@ func (t *FSTree) Put(prm common.PutPrm) (common.PutRes, error) {
|
||||||
prm.RawData = t.Compress(prm.RawData)
|
prm.RawData = t.Compress(prm.RawData)
|
||||||
}
|
}
|
||||||
|
|
||||||
tmpPath := p + "#"
|
// Here is a situation:
|
||||||
err := t.writeFile(tmpPath, prm.RawData)
|
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.161Z info log/log.go:13 local object storage operation {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "address": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "op": "PUT", "type": "fstree", "storage_id": ""}
|
||||||
|
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.183Z info log/log.go:13 local object storage operation {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "address": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "op": "metabase PUT"}
|
||||||
|
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.862Z debug policer/check.go:231 shortage of object copies detected {"component": "Object Policer", "object": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "shortage": 1}
|
||||||
|
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.862Z debug shard/get.go:124 object is missing in write-cache {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "addr": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "skip_meta": false}
|
||||||
|
//
|
||||||
|
// 1. We put an object on node 1.
|
||||||
|
// 2. Relentless policer sees that it has only 1 copy and tries to PUT it to node 2.
|
||||||
|
// 3. PUT operation started by client at (1) also puts an object here.
|
||||||
|
// 4. Now we have concurrent writes and one of `Rename` calls will return `no such file` error.
|
||||||
|
// Even more than that, concurrent writes can corrupt data.
|
||||||
|
//
|
||||||
|
// So here is a solution:
|
||||||
|
// 1. Write a file to 'name + 1'.
|
||||||
|
// 2. If it exists, retry with temporary name being 'name + 2'.
|
||||||
|
// 3. Set some reasonable number of attempts.
|
||||||
|
//
|
||||||
|
// It is a bit kludgey, but I am unusually proud about having found this out after
|
||||||
|
// hours of research on linux kernel, dirsync mount option and ext4 FS, turned out
|
||||||
|
// to be so hecking simple.
|
||||||
|
// In a very rare situation we can have multiple partially written copies on disk,
|
||||||
|
// this will be fixed in another issue (we should remove garbage on start).
|
||||||
|
const retryCount = 5
|
||||||
|
for i := 0; i < retryCount; i++ {
|
||||||
|
tmpPath := p + "#" + strconv.FormatUint(uint64(i), 10)
|
||||||
|
err := t.writeAndRename(tmpPath, p, prm.RawData)
|
||||||
|
if err != syscall.EEXIST || i == retryCount-1 {
|
||||||
|
return common.PutRes{StorageID: []byte{}}, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// unreachable, but precaution never hurts, especially 1 day before release.
|
||||||
|
return common.PutRes{StorageID: []byte{}}, fmt.Errorf("couldn't read file after %d retries", retryCount)
|
||||||
|
}
|
||||||
|
|
||||||
|
// writeAndRename opens tmpPath exclusively, writes data to it and renames it to p.
|
||||||
|
func (t *FSTree) writeAndRename(tmpPath, p string, data []byte) error {
|
||||||
|
err := t.writeFile(tmpPath, data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var pe *fs.PathError
|
var pe *fs.PathError
|
||||||
if errors.As(err, &pe) && pe.Err == syscall.ENOSPC {
|
if errors.As(err, &pe) {
|
||||||
|
switch pe.Err {
|
||||||
|
case syscall.ENOSPC:
|
||||||
err = common.ErrNoSpace
|
err = common.ErrNoSpace
|
||||||
_ = os.RemoveAll(tmpPath)
|
_ = os.RemoveAll(tmpPath)
|
||||||
|
case syscall.EEXIST:
|
||||||
|
return syscall.EEXIST
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
err = os.Rename(tmpPath, p)
|
err = os.Rename(tmpPath, p)
|
||||||
}
|
}
|
||||||
|
return err
|
||||||
return common.PutRes{StorageID: []byte{}}, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *FSTree) writeFlags() int {
|
func (t *FSTree) writeFlags() int {
|
||||||
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
|
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC | os.O_EXCL
|
||||||
if t.noSync {
|
if t.noSync {
|
||||||
return flags
|
return flags
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,27 +23,44 @@ func (e *StorageEngine) Open() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) open() error {
|
func (e *StorageEngine) open() error {
|
||||||
e.mtx.RLock()
|
e.mtx.Lock()
|
||||||
defer e.mtx.RUnlock()
|
defer e.mtx.Unlock()
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
var errCh = make(chan error, len(e.shards))
|
var errCh = make(chan shardInitError, len(e.shards))
|
||||||
|
|
||||||
for id, sh := range e.shards {
|
for id, sh := range e.shards {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(id string, sh *shard.Shard) {
|
go func(id string, sh *shard.Shard) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
if err := sh.Open(); err != nil {
|
if err := sh.Open(); err != nil {
|
||||||
errCh <- fmt.Errorf("could not open shard %s: %w", id, err)
|
errCh <- shardInitError{
|
||||||
|
err: err,
|
||||||
|
id: id,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}(id, sh.Shard)
|
}(id, sh.Shard)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
close(errCh)
|
close(errCh)
|
||||||
|
|
||||||
for err := range errCh {
|
for res := range errCh {
|
||||||
|
if res.err != nil {
|
||||||
|
e.log.Error("could not open shard, closing and skipping",
|
||||||
|
zap.String("id", res.id),
|
||||||
|
zap.Error(res.err))
|
||||||
|
|
||||||
|
sh := e.shards[res.id]
|
||||||
|
delete(e.shards, res.id)
|
||||||
|
|
||||||
|
err := sh.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
e.log.Error("could not close partially initialized shard",
|
||||||
|
zap.String("id", res.id),
|
||||||
|
zap.Error(res.err))
|
||||||
|
}
|
||||||
|
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -76,12 +93,20 @@ func (e *StorageEngine) Init() error {
|
||||||
for res := range errCh {
|
for res := range errCh {
|
||||||
if res.err != nil {
|
if res.err != nil {
|
||||||
if errors.Is(res.err, blobstor.ErrInitBlobovniczas) {
|
if errors.Is(res.err, blobstor.ErrInitBlobovniczas) {
|
||||||
delete(e.shards, res.id)
|
e.log.Error("could not initialize shard, closing and skipping",
|
||||||
|
|
||||||
e.log.Error("shard initialization failure, skipping",
|
|
||||||
zap.String("id", res.id),
|
zap.String("id", res.id),
|
||||||
zap.Error(res.err))
|
zap.Error(res.err))
|
||||||
|
|
||||||
|
sh := e.shards[res.id]
|
||||||
|
delete(e.shards, res.id)
|
||||||
|
|
||||||
|
err := sh.Close()
|
||||||
|
if err != nil {
|
||||||
|
e.log.Error("could not close partially initialized shard",
|
||||||
|
zap.String("id", res.id),
|
||||||
|
zap.Error(res.err))
|
||||||
|
}
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return fmt.Errorf("could not initialize shard %s: %w", res.id, res.err)
|
return fmt.Errorf("could not initialize shard %s: %w", res.id, res.err)
|
||||||
|
|
|
@ -7,16 +7,148 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/core/object"
|
"github.com/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
meta "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
|
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||||
|
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
cidtest "github.com/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "github.com/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
|
"go.uber.org/zap/zaptest"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// TestInitializationFailure checks that shard is initialized and closed even if media
|
||||||
|
// under any single component is absent. We emulate this with permission denied error.
|
||||||
|
func TestInitializationFailure(t *testing.T) {
|
||||||
|
type paths struct {
|
||||||
|
blobstor string
|
||||||
|
metabase string
|
||||||
|
writecache string
|
||||||
|
pilorama string
|
||||||
|
}
|
||||||
|
|
||||||
|
existsDir := filepath.Join(t.TempDir(), "shard")
|
||||||
|
badDir := filepath.Join(t.TempDir(), "missing")
|
||||||
|
|
||||||
|
testShard := func(c paths) []shard.Option {
|
||||||
|
sid, err := generateShardID()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
return []shard.Option{
|
||||||
|
shard.WithID(sid),
|
||||||
|
shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
||||||
|
shard.WithBlobStorOptions(
|
||||||
|
blobstor.WithStorages(
|
||||||
|
newStorages(c.blobstor, 1<<20))),
|
||||||
|
shard.WithMetaBaseOptions(
|
||||||
|
meta.WithBoltDBOptions(&bbolt.Options{
|
||||||
|
Timeout: 100 * time.Millisecond,
|
||||||
|
}),
|
||||||
|
meta.WithPath(c.metabase),
|
||||||
|
meta.WithPermissions(0700),
|
||||||
|
meta.WithEpochState(epochState{})),
|
||||||
|
shard.WithWriteCache(true),
|
||||||
|
shard.WithWriteCacheOptions(writecache.WithPath(c.writecache)),
|
||||||
|
shard.WithPiloramaOptions(pilorama.WithPath(c.pilorama)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("blobstor", func(t *testing.T) {
|
||||||
|
badDir := filepath.Join(badDir, t.Name())
|
||||||
|
require.NoError(t, os.MkdirAll(badDir, os.ModePerm))
|
||||||
|
require.NoError(t, os.Chmod(badDir, 0))
|
||||||
|
testEngineFailInitAndReload(t, badDir, false, testShard(paths{
|
||||||
|
blobstor: filepath.Join(badDir, "0"),
|
||||||
|
metabase: filepath.Join(existsDir, t.Name(), "1"),
|
||||||
|
writecache: filepath.Join(existsDir, t.Name(), "2"),
|
||||||
|
pilorama: filepath.Join(existsDir, t.Name(), "3"),
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
t.Run("metabase", func(t *testing.T) {
|
||||||
|
badDir := filepath.Join(badDir, t.Name())
|
||||||
|
require.NoError(t, os.MkdirAll(badDir, os.ModePerm))
|
||||||
|
require.NoError(t, os.Chmod(badDir, 0))
|
||||||
|
testEngineFailInitAndReload(t, badDir, true, testShard(paths{
|
||||||
|
blobstor: filepath.Join(existsDir, t.Name(), "0"),
|
||||||
|
metabase: filepath.Join(badDir, "1"),
|
||||||
|
writecache: filepath.Join(existsDir, t.Name(), "2"),
|
||||||
|
pilorama: filepath.Join(existsDir, t.Name(), "3"),
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
t.Run("write-cache", func(t *testing.T) {
|
||||||
|
badDir := filepath.Join(badDir, t.Name())
|
||||||
|
require.NoError(t, os.MkdirAll(badDir, os.ModePerm))
|
||||||
|
require.NoError(t, os.Chmod(badDir, 0))
|
||||||
|
testEngineFailInitAndReload(t, badDir, false, testShard(paths{
|
||||||
|
blobstor: filepath.Join(existsDir, t.Name(), "0"),
|
||||||
|
metabase: filepath.Join(existsDir, t.Name(), "1"),
|
||||||
|
writecache: filepath.Join(badDir, "2"),
|
||||||
|
pilorama: filepath.Join(existsDir, t.Name(), "3"),
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
t.Run("pilorama", func(t *testing.T) {
|
||||||
|
badDir := filepath.Join(badDir, t.Name())
|
||||||
|
require.NoError(t, os.MkdirAll(badDir, os.ModePerm))
|
||||||
|
require.NoError(t, os.Chmod(badDir, 0))
|
||||||
|
testEngineFailInitAndReload(t, badDir, false, testShard(paths{
|
||||||
|
blobstor: filepath.Join(existsDir, t.Name(), "0"),
|
||||||
|
metabase: filepath.Join(existsDir, t.Name(), "1"),
|
||||||
|
writecache: filepath.Join(existsDir, t.Name(), "2"),
|
||||||
|
pilorama: filepath.Join(badDir, "3"),
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func testEngineFailInitAndReload(t *testing.T, badDir string, errOnAdd bool, s []shard.Option) {
|
||||||
|
var configID string
|
||||||
|
|
||||||
|
e := New()
|
||||||
|
_, err := e.AddShard(s...)
|
||||||
|
if errOnAdd {
|
||||||
|
require.Error(t, err)
|
||||||
|
// This branch is only taken when we cannot update shard ID in the metabase.
|
||||||
|
// The id cannot be encountered during normal operation, but it is ok for tests:
|
||||||
|
// it is only compared for equality with other ids and we have 0 shards here.
|
||||||
|
configID = "id"
|
||||||
|
} else {
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
e.mtx.RLock()
|
||||||
|
var id string
|
||||||
|
for id = range e.shards {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
configID = calculateShardID(e.shards[id].Shard.DumpInfo())
|
||||||
|
e.mtx.RUnlock()
|
||||||
|
|
||||||
|
err = e.Open()
|
||||||
|
if err == nil {
|
||||||
|
require.Error(t, e.Init())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
e.mtx.RLock()
|
||||||
|
shardCount := len(e.shards)
|
||||||
|
e.mtx.RUnlock()
|
||||||
|
require.Equal(t, 0, shardCount)
|
||||||
|
|
||||||
|
require.NoError(t, os.Chmod(badDir, os.ModePerm))
|
||||||
|
require.NoError(t, e.Reload(ReConfiguration{
|
||||||
|
shards: map[string][]shard.Option{configID: s},
|
||||||
|
}))
|
||||||
|
|
||||||
|
e.mtx.RLock()
|
||||||
|
shardCount = len(e.shards)
|
||||||
|
e.mtx.RUnlock()
|
||||||
|
require.Equal(t, 1, shardCount)
|
||||||
|
}
|
||||||
|
|
||||||
func TestExecBlocks(t *testing.T) {
|
func TestExecBlocks(t *testing.T) {
|
||||||
e := testNewEngineWithShardNum(t, 2) // number doesn't matter in this test, 2 is several but not many
|
e := testNewEngineWithShardNum(t, 2) // number doesn't matter in this test, 2 is several but not many
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
|
|
|
@ -74,6 +74,7 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) {
|
||||||
shPrm.SetAddress(prm.addr)
|
shPrm.SetAddress(prm.addr)
|
||||||
|
|
||||||
var hasDegraded bool
|
var hasDegraded bool
|
||||||
|
var objectExpired bool
|
||||||
|
|
||||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
|
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
|
||||||
noMeta := sh.GetMode().NoMetabase()
|
noMeta := sh.GetMode().NoMetabase()
|
||||||
|
@ -113,7 +114,7 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) {
|
||||||
case shard.IsErrObjectExpired(err):
|
case shard.IsErrObjectExpired(err):
|
||||||
// object is found but should not
|
// object is found but should not
|
||||||
// be returned
|
// be returned
|
||||||
outError = errNotFound
|
objectExpired = true
|
||||||
return true
|
return true
|
||||||
default:
|
default:
|
||||||
e.reportShardError(sh, "could not get object from shard", err)
|
e.reportShardError(sh, "could not get object from shard", err)
|
||||||
|
@ -130,6 +131,10 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) {
|
||||||
return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
|
return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if objectExpired {
|
||||||
|
return GetRes{}, errNotFound
|
||||||
|
}
|
||||||
|
|
||||||
if obj == nil {
|
if obj == nil {
|
||||||
if !hasDegraded && shardWithMeta.Shard == nil || !shard.IsErrNotFound(outError) {
|
if !hasDegraded && shardWithMeta.Shard == nil || !shard.IsErrNotFound(outError) {
|
||||||
return GetRes{}, outError
|
return GetRes{}, outError
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
|
|
||||||
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
|
@ -94,10 +96,11 @@ func (db *DB) UpdateStorageID(prm UpdateStorageIDPrm) (res UpdateStorageIDRes, e
|
||||||
|
|
||||||
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||||
exists, err := db.exists(tx, prm.addr, currEpoch)
|
exists, err := db.exists(tx, prm.addr, currEpoch)
|
||||||
if !exists || err != nil {
|
if err == nil && exists || errors.Is(err, ErrObjectIsExpired) {
|
||||||
return err
|
err = updateStorageID(tx, prm.addr, prm.id)
|
||||||
}
|
}
|
||||||
return updateStorageID(tx, prm.addr, prm.id)
|
|
||||||
|
return err
|
||||||
})
|
})
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
|
@ -12,6 +12,9 @@ import (
|
||||||
type batch struct {
|
type batch struct {
|
||||||
forest *boltForest
|
forest *boltForest
|
||||||
timer *time.Timer
|
timer *time.Timer
|
||||||
|
// mtx protects timer and operations fields.
|
||||||
|
// Because mtx can be taken inside a transaction,
|
||||||
|
// transactions MUST NOT be executed with the mutex taken to avoid a deadlock.
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
start sync.Once
|
start sync.Once
|
||||||
cid cidSDK.ID
|
cid cidSDK.ID
|
||||||
|
@ -24,16 +27,12 @@ func (b *batch) trigger() {
|
||||||
b.mtx.Lock()
|
b.mtx.Lock()
|
||||||
if b.timer != nil {
|
if b.timer != nil {
|
||||||
b.timer.Stop()
|
b.timer.Stop()
|
||||||
b.timer = nil
|
|
||||||
}
|
}
|
||||||
b.mtx.Unlock()
|
b.mtx.Unlock()
|
||||||
b.start.Do(b.run)
|
b.start.Do(b.run)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *batch) run() {
|
func (b *batch) run() {
|
||||||
sort.Slice(b.operations, func(i, j int) bool {
|
|
||||||
return b.operations[i].Time < b.operations[j].Time
|
|
||||||
})
|
|
||||||
fullID := bucketName(b.cid, b.treeID)
|
fullID := bucketName(b.cid, b.treeID)
|
||||||
err := b.forest.db.Update(func(tx *bbolt.Tx) error {
|
err := b.forest.db.Update(func(tx *bbolt.Tx) error {
|
||||||
bLog, bTree, err := b.forest.getTreeBuckets(tx, fullID)
|
bLog, bTree, err := b.forest.getTreeBuckets(tx, fullID)
|
||||||
|
@ -41,6 +40,16 @@ func (b *batch) run() {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
b.mtx.Lock()
|
||||||
|
b.timer = nil
|
||||||
|
b.mtx.Unlock()
|
||||||
|
|
||||||
|
// Sorting without a mutex is ok, because we append to this slice only if timer is non-nil.
|
||||||
|
// See (*boltForest).addBatch for details.
|
||||||
|
sort.Slice(b.operations, func(i, j int) bool {
|
||||||
|
return b.operations[i].Time < b.operations[j].Time
|
||||||
|
})
|
||||||
|
|
||||||
var lm Move
|
var lm Move
|
||||||
return b.forest.applyOperation(bLog, bTree, b.operations, &lm)
|
return b.forest.applyOperation(bLog, bTree, b.operations, &lm)
|
||||||
})
|
})
|
||||||
|
|
|
@ -377,7 +377,9 @@ func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan e
|
||||||
results: []chan<- error{ch},
|
results: []chan<- error{ch},
|
||||||
operations: []*Move{m},
|
operations: []*Move{m},
|
||||||
}
|
}
|
||||||
|
b.mtx.Lock()
|
||||||
b.timer = time.AfterFunc(t.db.MaxBatchDelay, b.trigger)
|
b.timer = time.AfterFunc(t.db.MaxBatchDelay, b.trigger)
|
||||||
|
b.mtx.Unlock()
|
||||||
t.batches = append(t.batches, b)
|
t.batches = append(t.batches, b)
|
||||||
t.mtx.Unlock()
|
t.mtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
|
@ -255,9 +255,11 @@ func (s *Shard) Close() error {
|
||||||
|
|
||||||
components = append(components, s.blobStor, s.metaBase)
|
components = append(components, s.blobStor, s.metaBase)
|
||||||
|
|
||||||
|
var lastErr error
|
||||||
for _, component := range components {
|
for _, component := range components {
|
||||||
if err := component.Close(); err != nil {
|
if err := component.Close(); err != nil {
|
||||||
return fmt.Errorf("could not close %s: %w", component, err)
|
lastErr = err
|
||||||
|
s.log.Error("could not close shard component", zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -266,7 +268,7 @@ func (s *Shard) Close() error {
|
||||||
s.gc.stop()
|
s.gc.stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return lastErr
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reload reloads configuration portions that are necessary.
|
// Reload reloads configuration portions that are necessary.
|
||||||
|
|
|
@ -121,9 +121,14 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher,
|
||||||
}
|
}
|
||||||
|
|
||||||
if IsErrNotFound(err) {
|
if IsErrNotFound(err) {
|
||||||
s.log.Debug("object is missing in write-cache")
|
s.log.Debug("object is missing in write-cache",
|
||||||
|
zap.Stringer("addr", addr),
|
||||||
|
zap.Bool("skip_meta", skipMeta))
|
||||||
} else {
|
} else {
|
||||||
s.log.Error("failed to fetch object from write-cache", zap.Error(err))
|
s.log.Error("failed to fetch object from write-cache",
|
||||||
|
zap.Error(err),
|
||||||
|
zap.Stringer("addr", addr),
|
||||||
|
zap.Bool("skip_meta", skipMeta))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -99,10 +99,6 @@ func (c *cache) flushDB() {
|
||||||
lastKey = slice.Copy(k)
|
lastKey = slice.Copy(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, ok := c.flushed.Peek(string(k)); ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
m = append(m, objectInfo{
|
m = append(m, objectInfo{
|
||||||
addr: string(k),
|
addr: string(k),
|
||||||
data: slice.Copy(v),
|
data: slice.Copy(v),
|
||||||
|
@ -111,12 +107,18 @@ func (c *cache) flushDB() {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
var count int
|
||||||
for i := range m {
|
for i := range m {
|
||||||
|
if c.flushed.Contains(m[i].addr) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
obj := object.New()
|
obj := object.New()
|
||||||
if err := obj.Unmarshal(m[i].data); err != nil {
|
if err := obj.Unmarshal(m[i].data); err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
count++
|
||||||
select {
|
select {
|
||||||
case c.flushCh <- obj:
|
case c.flushCh <- obj:
|
||||||
case <-c.closeCh:
|
case <-c.closeCh:
|
||||||
|
@ -125,7 +127,7 @@ func (c *cache) flushDB() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(m) == 0 {
|
if count == 0 {
|
||||||
c.modeMtx.RUnlock()
|
c.modeMtx.RUnlock()
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -133,7 +135,7 @@ func (c *cache) flushDB() {
|
||||||
c.modeMtx.RUnlock()
|
c.modeMtx.RUnlock()
|
||||||
|
|
||||||
c.log.Debug("tried to flush items from write-cache",
|
c.log.Debug("tried to flush items from write-cache",
|
||||||
zap.Int("count", len(m)),
|
zap.Int("count", count),
|
||||||
zap.String("start", base58.Encode(lastKey)))
|
zap.String("start", base58.Encode(lastKey)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,7 @@ func (c *cache) initFlushMarks() {
|
||||||
var batchSize = flushBatchSize
|
var batchSize = flushBatchSize
|
||||||
for {
|
for {
|
||||||
m = m[:0]
|
m = m[:0]
|
||||||
|
indices = indices[:0]
|
||||||
|
|
||||||
// We put objects in batches of fixed size to not interfere with main put cycle a lot.
|
// We put objects in batches of fixed size to not interfere with main put cycle a lot.
|
||||||
_ = c.db.View(func(tx *bbolt.Tx) error {
|
_ = c.db.View(func(tx *bbolt.Tx) error {
|
||||||
|
|
|
@ -23,6 +23,11 @@ type store struct {
|
||||||
maxFlushedMarksCount int
|
maxFlushedMarksCount int
|
||||||
maxRemoveBatchSize int
|
maxRemoveBatchSize int
|
||||||
|
|
||||||
|
// flushed contains addresses of objects that were already flushed to the main storage.
|
||||||
|
// We use LRU cache instead of map here to facilitate removing of unused object in favour of
|
||||||
|
// frequently read ones.
|
||||||
|
// MUST NOT be used inside bolt db transaction because it's eviction handler
|
||||||
|
// removes untracked items from the database.
|
||||||
flushed simplelru.LRUCache[string, bool]
|
flushed simplelru.LRUCache[string, bool]
|
||||||
db *bbolt.DB
|
db *bbolt.DB
|
||||||
|
|
||||||
|
|
|
@ -51,13 +51,13 @@ func newMethodCallCounter(name string) methodCount {
|
||||||
success: prometheus.NewCounter(prometheus.CounterOpts{
|
success: prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
Subsystem: objectSubsystem,
|
Subsystem: objectSubsystem,
|
||||||
Name: fmt.Sprintf("%s_req_count", name),
|
Name: fmt.Sprintf("%s_req_count_success", name),
|
||||||
Help: fmt.Sprintf("The number of successful %s requests processed", name),
|
Help: fmt.Sprintf("The number of successful %s requests processed", name),
|
||||||
}),
|
}),
|
||||||
total: prometheus.NewCounter(prometheus.CounterOpts{
|
total: prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
Subsystem: objectSubsystem,
|
Subsystem: objectSubsystem,
|
||||||
Name: fmt.Sprintf("%s_req_count_success", name),
|
Name: fmt.Sprintf("%s_req_count", name),
|
||||||
Help: fmt.Sprintf("Total number of %s requests processed", name),
|
Help: fmt.Sprintf("Total number of %s requests processed", name),
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
|
|
|
@ -211,6 +211,29 @@ func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string)
|
||||||
id string
|
id string
|
||||||
)
|
)
|
||||||
|
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
defer close(stopCh)
|
||||||
|
|
||||||
|
// neo-go WS client says to _always_ read notifications
|
||||||
|
// from its channel. Subscribing to any notification
|
||||||
|
// while not reading them in another goroutine may
|
||||||
|
// lead to a dead-lock, thus that async side notification
|
||||||
|
// listening while restoring subscriptions
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-stopCh:
|
||||||
|
return
|
||||||
|
case n, ok := <-cli.Notifications:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.notifications <- n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// new block events restoration
|
// new block events restoration
|
||||||
if c.subscribedToNewBlocks {
|
if c.subscribedToNewBlocks {
|
||||||
_, err = cli.SubscribeForNewBlocks(nil)
|
_, err = cli.SubscribeForNewBlocks(nil)
|
||||||
|
@ -226,6 +249,7 @@ func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string)
|
||||||
|
|
||||||
// notification events restoration
|
// notification events restoration
|
||||||
for contract := range c.subscribedEvents {
|
for contract := range c.subscribedEvents {
|
||||||
|
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
|
||||||
id, err = cli.SubscribeForExecutionNotifications(&contract, nil)
|
id, err = cli.SubscribeForExecutionNotifications(&contract, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Error("could not restore notification subscription after RPC switch",
|
c.logger.Error("could not restore notification subscription after RPC switch",
|
||||||
|
@ -242,6 +266,7 @@ func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string)
|
||||||
// notary notification events restoration
|
// notary notification events restoration
|
||||||
if c.notary != nil {
|
if c.notary != nil {
|
||||||
for signer := range c.subscribedNotaryEvents {
|
for signer := range c.subscribedNotaryEvents {
|
||||||
|
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
|
||||||
id, err = cli.SubscribeForNotaryRequests(nil, &signer)
|
id, err = cli.SubscribeForNotaryRequests(nil, &signer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Error("could not restore notary notification subscription after RPC switch",
|
c.logger.Error("could not restore notary notification subscription after RPC switch",
|
||||||
|
|
7
pkg/network/cache/multi.go
vendored
7
pkg/network/cache/multi.go
vendored
|
@ -12,6 +12,8 @@ import (
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/network"
|
"github.com/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
"github.com/TrueCloudLab/frostfs-sdk-go/client"
|
"github.com/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
"github.com/TrueCloudLab/frostfs-sdk-go/object"
|
"github.com/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
type singleClient struct {
|
type singleClient struct {
|
||||||
|
@ -172,6 +174,10 @@ func (x *multiClient) ReportError(err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if status.Code(err) == codes.Canceled || errors.Is(err, context.Canceled) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// non-status logic error that could be returned
|
// non-status logic error that could be returned
|
||||||
// from the SDK client; should not be considered
|
// from the SDK client; should not be considered
|
||||||
// as a connection error
|
// as a connection error
|
||||||
|
@ -196,7 +202,6 @@ func (s *singleClient) invalidate() {
|
||||||
_ = s.client.Close()
|
_ = s.client.Close()
|
||||||
}
|
}
|
||||||
s.client = nil
|
s.client = nil
|
||||||
s.lastAttempt = time.Now()
|
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -128,7 +128,7 @@ func (exec execCtx) key() (*ecdsa.PrivateKey, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *execCtx) canAssemble() bool {
|
func (exec *execCtx) canAssemble() bool {
|
||||||
return exec.svc.assembly && !exec.isRaw() && !exec.headOnly()
|
return exec.svc.assembly && !exec.isRaw() && !exec.headOnly() && !exec.isLocal()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *execCtx) splitInfo() *objectSDK.SplitInfo {
|
func (exec *execCtx) splitInfo() *objectSDK.SplitInfo {
|
||||||
|
|
|
@ -12,6 +12,7 @@ type (
|
||||||
MetricCollector struct {
|
MetricCollector struct {
|
||||||
next ServiceServer
|
next ServiceServer
|
||||||
metrics MetricRegister
|
metrics MetricRegister
|
||||||
|
enabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
getStreamMetric struct {
|
getStreamMetric struct {
|
||||||
|
@ -48,29 +49,34 @@ type (
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewMetricCollector(next ServiceServer, register MetricRegister) *MetricCollector {
|
func NewMetricCollector(next ServiceServer, register MetricRegister, enabled bool) *MetricCollector {
|
||||||
return &MetricCollector{
|
return &MetricCollector{
|
||||||
next: next,
|
next: next,
|
||||||
metrics: register,
|
metrics: register,
|
||||||
|
enabled: enabled,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) (err error) {
|
func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) (err error) {
|
||||||
|
if m.enabled {
|
||||||
t := time.Now()
|
t := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
m.metrics.IncGetReqCounter(err == nil)
|
m.metrics.IncGetReqCounter(err == nil)
|
||||||
m.metrics.AddGetReqDuration(time.Since(t))
|
m.metrics.AddGetReqDuration(time.Since(t))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
err = m.next.Get(req, &getStreamMetric{
|
err = m.next.Get(req, &getStreamMetric{
|
||||||
ServerStream: stream,
|
ServerStream: stream,
|
||||||
stream: stream,
|
stream: stream,
|
||||||
metrics: m.metrics,
|
metrics: m.metrics,
|
||||||
})
|
})
|
||||||
|
} else {
|
||||||
|
err = m.next.Get(req, stream)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m MetricCollector) Put(ctx context.Context) (PutObjectStream, error) {
|
func (m MetricCollector) Put(ctx context.Context) (PutObjectStream, error) {
|
||||||
|
if m.enabled {
|
||||||
t := time.Now()
|
t := time.Now()
|
||||||
|
|
||||||
stream, err := m.next.Put(ctx)
|
stream, err := m.next.Put(ctx)
|
||||||
|
@ -83,9 +89,12 @@ func (m MetricCollector) Put(ctx context.Context) (PutObjectStream, error) {
|
||||||
metrics: m.metrics,
|
metrics: m.metrics,
|
||||||
start: t,
|
start: t,
|
||||||
}, nil
|
}, nil
|
||||||
|
}
|
||||||
|
return m.next.Put(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) {
|
func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) {
|
||||||
|
if m.enabled {
|
||||||
t := time.Now()
|
t := time.Now()
|
||||||
|
|
||||||
res, err := m.next.Head(ctx, request)
|
res, err := m.next.Head(ctx, request)
|
||||||
|
@ -94,9 +103,12 @@ func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest)
|
||||||
m.metrics.AddHeadReqDuration(time.Since(t))
|
m.metrics.AddHeadReqDuration(time.Since(t))
|
||||||
|
|
||||||
return res, err
|
return res, err
|
||||||
|
}
|
||||||
|
return m.next.Head(ctx, request)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream) error {
|
func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream) error {
|
||||||
|
if m.enabled {
|
||||||
t := time.Now()
|
t := time.Now()
|
||||||
|
|
||||||
err := m.next.Search(req, stream)
|
err := m.next.Search(req, stream)
|
||||||
|
@ -105,20 +117,25 @@ func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream)
|
||||||
m.metrics.AddSearchReqDuration(time.Since(t))
|
m.metrics.AddSearchReqDuration(time.Since(t))
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
}
|
||||||
|
return m.next.Search(req, stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteRequest) (*object.DeleteResponse, error) {
|
func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteRequest) (*object.DeleteResponse, error) {
|
||||||
|
if m.enabled {
|
||||||
t := time.Now()
|
t := time.Now()
|
||||||
|
|
||||||
res, err := m.next.Delete(ctx, request)
|
res, err := m.next.Delete(ctx, request)
|
||||||
|
|
||||||
m.metrics.IncDeleteReqCounter(err == nil)
|
m.metrics.IncDeleteReqCounter(err == nil)
|
||||||
m.metrics.AddDeleteReqDuration(time.Since(t))
|
m.metrics.AddDeleteReqDuration(time.Since(t))
|
||||||
|
|
||||||
return res, err
|
return res, err
|
||||||
|
}
|
||||||
|
return m.next.Delete(ctx, request)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
|
func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
|
||||||
|
if m.enabled {
|
||||||
t := time.Now()
|
t := time.Now()
|
||||||
|
|
||||||
err := m.next.GetRange(req, stream)
|
err := m.next.GetRange(req, stream)
|
||||||
|
@ -127,9 +144,12 @@ func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectR
|
||||||
m.metrics.AddRangeReqDuration(time.Since(t))
|
m.metrics.AddRangeReqDuration(time.Since(t))
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
}
|
||||||
|
return m.next.GetRange(req, stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
|
func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
|
||||||
|
if m.enabled {
|
||||||
t := time.Now()
|
t := time.Now()
|
||||||
|
|
||||||
res, err := m.next.GetRangeHash(ctx, request)
|
res, err := m.next.GetRangeHash(ctx, request)
|
||||||
|
@ -138,6 +158,16 @@ func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRa
|
||||||
m.metrics.AddRangeHashReqDuration(time.Since(t))
|
m.metrics.AddRangeHashReqDuration(time.Since(t))
|
||||||
|
|
||||||
return res, err
|
return res, err
|
||||||
|
}
|
||||||
|
return m.next.GetRangeHash(ctx, request)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MetricCollector) Enable() {
|
||||||
|
m.enabled = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MetricCollector) Disable() {
|
||||||
|
m.enabled = false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s getStreamMetric) Send(resp *object.GetResponse) error {
|
func (s getStreamMetric) Send(resp *object.GetResponse) error {
|
||||||
|
|
|
@ -6,12 +6,12 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Prm groups the required parameters of the Server's constructor.
|
// HTTPSrvPrm groups the required parameters of the Server's constructor.
|
||||||
//
|
//
|
||||||
// All values must comply with the requirements imposed on them.
|
// All values must comply with the requirements imposed on them.
|
||||||
// Passing incorrect parameter values will result in constructor
|
// Passing incorrect parameter values will result in constructor
|
||||||
// failure (error or panic depending on the implementation).
|
// failure (error or panic depending on the implementation).
|
||||||
type Prm struct {
|
type HTTPSrvPrm struct {
|
||||||
// TCP address for the server to listen on.
|
// TCP address for the server to listen on.
|
||||||
//
|
//
|
||||||
// Must be a valid TCP address.
|
// Must be a valid TCP address.
|
||||||
|
@ -49,6 +49,15 @@ func panicOnValue(t, n string, v interface{}) {
|
||||||
panic(fmt.Sprintf(invalidValFmt, t, n, v, v))
|
panic(fmt.Sprintf(invalidValFmt, t, n, v, v))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func checkSrvPrm(addr string, handler http.Handler) {
|
||||||
|
switch {
|
||||||
|
case addr == "":
|
||||||
|
panicOnPrmValue("Address", addr)
|
||||||
|
case handler == nil:
|
||||||
|
panicOnPrmValue("Handler", handler)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// New creates a new instance of the Server.
|
// New creates a new instance of the Server.
|
||||||
//
|
//
|
||||||
// Panics if at least one value of the parameters is invalid.
|
// Panics if at least one value of the parameters is invalid.
|
||||||
|
@ -58,13 +67,8 @@ func panicOnValue(t, n string, v interface{}) {
|
||||||
//
|
//
|
||||||
// The created Server does not require additional
|
// The created Server does not require additional
|
||||||
// initialization and is completely ready for work.
|
// initialization and is completely ready for work.
|
||||||
func New(prm Prm, opts ...Option) *Server {
|
func New(prm HTTPSrvPrm, opts ...Option) *Server {
|
||||||
switch {
|
checkSrvPrm(prm.Address, prm.Handler)
|
||||||
case prm.Address == "":
|
|
||||||
panicOnPrmValue("Address", prm.Address)
|
|
||||||
case prm.Handler == nil:
|
|
||||||
panicOnPrmValue("Handler", prm.Handler)
|
|
||||||
}
|
|
||||||
|
|
||||||
c := defaultCfg()
|
c := defaultCfg()
|
||||||
|
|
||||||
|
@ -85,3 +89,14 @@ func New(prm Prm, opts ...Option) *Server {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewHTTPSrvPrm creates a new instance of the HTTPSrvPrm.
|
||||||
|
//
|
||||||
|
// Panics if at least one value of the parameters is invalid.
|
||||||
|
func NewHTTPSrvPrm(addr string, handler http.Handler) *HTTPSrvPrm {
|
||||||
|
checkSrvPrm(addr, handler)
|
||||||
|
return &HTTPSrvPrm{
|
||||||
|
Address: addr,
|
||||||
|
Handler: handler,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue