From f9730f090d1e9066d423f4a7b291bc9ffbb17a6b Mon Sep 17 00:00:00 2001 From: Alejandro Lopez Date: Thu, 29 Jun 2023 12:13:01 +0300 Subject: [PATCH] [#92] Refactor policer and add some unit tests Signed-off-by: Alejandro Lopez --- cmd/frostfs-node/keyspaceiterator.go | 28 +++ cmd/frostfs-node/object.go | 22 +- pkg/services/control/server/evacuate.go | 11 +- pkg/services/policer/check.go | 88 ++----- pkg/services/policer/check_test.go | 8 +- pkg/services/policer/nodecache.go | 57 +++++ pkg/services/policer/option.go | 185 +++++++++++++++ pkg/services/policer/policer.go | 149 ------------ pkg/services/policer/policer_test.go | 299 ++++++++++++++++++++++++ pkg/services/policer/process.go | 18 +- pkg/services/policer/queue.go | 26 --- pkg/services/replicator/process.go | 24 +- pkg/services/replicator/task.go | 35 +-- 13 files changed, 640 insertions(+), 310 deletions(-) create mode 100644 cmd/frostfs-node/keyspaceiterator.go create mode 100644 pkg/services/policer/nodecache.go create mode 100644 pkg/services/policer/option.go create mode 100644 pkg/services/policer/policer_test.go delete mode 100644 pkg/services/policer/queue.go diff --git a/cmd/frostfs-node/keyspaceiterator.go b/cmd/frostfs-node/keyspaceiterator.go new file mode 100644 index 000000000..8991964a0 --- /dev/null +++ b/cmd/frostfs-node/keyspaceiterator.go @@ -0,0 +1,28 @@ +package main + +import ( + "context" + "fmt" + + objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" +) + +type keySpaceIterator struct { + ng *engine.StorageEngine + cur *engine.Cursor +} + +func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objectcore.AddressWithType, error) { + var prm engine.ListWithCursorPrm + prm.WithCursor(it.cur) + prm.WithCount(batchSize) + + res, err := it.ng.ListWithCursor(ctx, prm) + if err != nil { + return nil, fmt.Errorf("cannot list objects in engine: %w", err) + } + + it.cur = res.Cursor() + return res.AddressList(), nil +} diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index afc4bcb41..4106f5dc1 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -38,6 +38,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" eaclSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl" + netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" @@ -211,15 +212,30 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl ls := c.cfgObject.cfgLocalStorage.localStorage + buryFn := func(ctx context.Context, addr oid.Address) error { + var prm engine.InhumePrm + prm.MarkAsGarbage(addr) + prm.WithForceRemoval() + + _, err := ls.Inhume(ctx, prm) + return err + } + + remoteHeader := headsvc.NewRemoteHeader(keyStorage, clientConstructor) + pol := policer.New( policer.WithLogger(c.log), - policer.WithLocalStorage(ls), + policer.WithKeySpaceIterator(&keySpaceIterator{ng: ls}), + policer.WithBuryFunc(buryFn), policer.WithContainerSource(c.cfgObject.cnrSource), policer.WithPlacementBuilder( placement.NewNetworkMapSourceBuilder(c.netMapSource), ), - policer.WithRemoteHeader( - headsvc.NewRemoteHeader(keyStorage, clientConstructor), + policer.WithRemoteObjectHeaderFunc( + func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) { + prm := new(headsvc.RemoteHeadPrm).WithNodeInfo(ni).WithObjectAddress(a) + return remoteHeader.Head(ctx, prm) + }, ), policer.WithNetmapKeys(c), policer.WithHeadTimeout( diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go index fc6dd3f60..8f62c3489 100644 --- a/pkg/services/control/server/evacuate.go +++ b/pkg/services/control/server/evacuate.go @@ -84,11 +84,12 @@ func (s *Server) replicate(ctx context.Context, addr oid.Address, obj *objectSDK } var res replicatorResult - var task replicator.Task - task.SetObject(obj) - task.SetObjectAddress(addr) - task.SetCopiesNumber(1) - task.SetNodes(nodes) + task := replicator.Task{ + NumCopies: 1, + Addr: addr, + Obj: obj, + Nodes: nodes, + } s.replicator.HandleTask(ctx, task, &res) if res.count == 0 { diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go index e91b8871b..db297b68a 100644 --- a/pkg/services/policer/check.go +++ b/pkg/services/policer/check.go @@ -7,8 +7,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" - headsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/head" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" @@ -18,55 +16,6 @@ import ( "go.uber.org/zap" ) -// tracks Policer's check progress. -type nodeCache map[uint64]bool - -func newNodeCache() *nodeCache { - m := make(map[uint64]bool) - return (*nodeCache)(&m) -} - -func (n *nodeCache) set(node netmap.NodeInfo, val bool) { - (*n)[node.Hash()] = val -} - -// submits storage node as a candidate to store the object replica in case of -// shortage. -func (n *nodeCache) submitReplicaCandidate(node netmap.NodeInfo) { - n.set(node, false) -} - -// submits storage node as a current object replica holder. -func (n *nodeCache) submitReplicaHolder(node netmap.NodeInfo) { - n.set(node, true) -} - -// processStatus returns current processing status of the storage node -// -// >0 if node does not currently hold the object -// 0 if node already holds the object -// <0 if node has not been processed yet -func (n *nodeCache) processStatus(node netmap.NodeInfo) int8 { - val, ok := (*n)[node.Hash()] - if !ok { - return -1 - } - - if val { - return 0 - } - - return 1 -} - -// SubmitSuccessfulReplication marks given storage node as a current object -// replica holder. -// -// SubmitSuccessfulReplication implements replicator.TaskResult. -func (n *nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) { - n.submitReplicaHolder(node) -} - func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) { addr := addrWithType.Address idCnr := addr.Container() @@ -79,11 +28,7 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add zap.String("error", err.Error()), ) if container.IsErrNotFound(err) { - var prm engine.InhumePrm - prm.MarkAsGarbage(addrWithType.Address) - prm.WithForceRemoval() - - _, err := p.jobQueue.localStorage.Inhume(ctx, prm) + err := p.buryFn(ctx, addrWithType.Address) if err != nil { p.log.Error(logs.PolicerCouldNotInhumeObjectWithMissingContainer, zap.Stringer("cid", idCnr), @@ -145,10 +90,9 @@ type placementRequirements struct { } func (p *Policer) processNodes(ctx context.Context, requirements *placementRequirements, addrWithType objectcore.AddressWithType, - nodes []netmap.NodeInfo, shortage uint32, checkedNodes *nodeCache) { + nodes []netmap.NodeInfo, shortage uint32, checkedNodes nodeCache) { addr := addrWithType.Address typ := addrWithType.Type - prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr) // Number of copies that are stored on maintenance nodes. var uncheckedCopies int @@ -175,8 +119,8 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi } else if nodes[i].IsMaintenance() { shortage, uncheckedCopies = p.handleMaintenance(nodes[i], checkedNodes, shortage, uncheckedCopies) } else { - if status := checkedNodes.processStatus(nodes[i]); status >= 0 { - if status == 0 { + if status := checkedNodes.processStatus(nodes[i]); status.Processed() { + if status == nodeHoldsObject { // node already contains replica, no need to replicate nodes = append(nodes[:i], nodes[i+1:]...) i-- @@ -188,7 +132,7 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi callCtx, cancel := context.WithTimeout(ctx, p.headTimeout) - _, err := p.remoteHeader.Head(callCtx, prm.WithNodeInfo(nodes[i])) + _, err := p.remoteHeader(callCtx, nodes[i], addr) cancel() @@ -224,7 +168,7 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi // prevent spam with new replicas. // However, additional copies should not be removed in this case, // because we can remove the only copy this way. -func (p *Policer) handleMaintenance(node netmap.NodeInfo, checkedNodes *nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) { +func (p *Policer) handleMaintenance(node netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) { checkedNodes.submitReplicaHolder(node) shortage-- uncheckedCopies++ @@ -236,25 +180,29 @@ func (p *Policer) handleMaintenance(node netmap.NodeInfo, checkedNodes *nodeCach } func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address, requirements *placementRequirements, - nodes []netmap.NodeInfo, checkedNodes *nodeCache, shortage uint32, uncheckedCopies int) { - if shortage > 0 { + nodes []netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) { + switch { + case shortage > 0: p.log.Debug(logs.PolicerShortageOfObjectCopiesDetected, zap.Stringer("object", addr), zap.Uint32("shortage", shortage), ) - var task replicator.Task - task.SetObjectAddress(addr) - task.SetNodes(nodes) - task.SetCopiesNumber(shortage) + task := replicator.Task{ + NumCopies: shortage, + Addr: addr, + Nodes: nodes, + } p.replicator.HandleTask(ctx, task, checkedNodes) - } else if uncheckedCopies > 0 { + + case uncheckedCopies > 0: // If we have more copies than needed, but some of them are from the maintenance nodes, // save the local copy. p.log.Debug(logs.PolicerSomeOfTheCopiesAreStoredOnNodesUnderMaintenance, zap.Int("count", uncheckedCopies)) - } else if uncheckedCopies == 0 { + + case uncheckedCopies == 0: // Safe to remove: checked all copies, shortage == 0. requirements.removeLocalCopy = true } diff --git a/pkg/services/policer/check_test.go b/pkg/services/policer/check_test.go index b40ee90d2..d4c7ccbf9 100644 --- a/pkg/services/policer/check_test.go +++ b/pkg/services/policer/check_test.go @@ -11,14 +11,14 @@ func TestNodeCache(t *testing.T) { cache := newNodeCache() node := netmaptest.NodeInfo() - require.Negative(t, cache.processStatus(node)) + require.Equal(t, cache.processStatus(node), nodeNotProcessed) cache.SubmitSuccessfulReplication(node) - require.Zero(t, cache.processStatus(node)) + require.Equal(t, cache.processStatus(node), nodeHoldsObject) cache.submitReplicaCandidate(node) - require.Positive(t, cache.processStatus(node)) + require.Equal(t, cache.processStatus(node), nodeDoesNotHoldObject) cache.submitReplicaHolder(node) - require.Zero(t, cache.processStatus(node)) + require.Equal(t, cache.processStatus(node), nodeHoldsObject) } diff --git a/pkg/services/policer/nodecache.go b/pkg/services/policer/nodecache.go new file mode 100644 index 000000000..cd47cb0fc --- /dev/null +++ b/pkg/services/policer/nodecache.go @@ -0,0 +1,57 @@ +package policer + +import "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" + +type nodeProcessStatus int8 + +const ( + nodeNotProcessed nodeProcessStatus = iota + nodeDoesNotHoldObject + nodeHoldsObject +) + +func (st nodeProcessStatus) Processed() bool { + return st != nodeNotProcessed +} + +// nodeCache tracks Policer's check progress. +type nodeCache map[uint64]bool + +func newNodeCache() nodeCache { + return make(map[uint64]bool) +} + +func (n nodeCache) set(node netmap.NodeInfo, val bool) { + n[node.Hash()] = val +} + +// submits storage node as a candidate to store the object replica in case of +// shortage. +func (n nodeCache) submitReplicaCandidate(node netmap.NodeInfo) { + n.set(node, false) +} + +// submits storage node as a current object replica holder. +func (n nodeCache) submitReplicaHolder(node netmap.NodeInfo) { + n.set(node, true) +} + +// processStatus returns current processing status of the storage node. +func (n nodeCache) processStatus(node netmap.NodeInfo) nodeProcessStatus { + switch val, ok := n[node.Hash()]; { + case !ok: + return nodeNotProcessed + case val: + return nodeHoldsObject + default: + return nodeDoesNotHoldObject + } +} + +// SubmitSuccessfulReplication marks given storage node as a current object +// replica holder. +// +// SubmitSuccessfulReplication implements replicator.TaskResult. +func (n nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) { + n.submitReplicaHolder(node) +} diff --git a/pkg/services/policer/option.go b/pkg/services/policer/option.go new file mode 100644 index 000000000..6f17b2947 --- /dev/null +++ b/pkg/services/policer/option.go @@ -0,0 +1,185 @@ +package policer + +import ( + "context" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" + objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/panjf2000/ants/v2" + "go.uber.org/zap" +) + +// KeySpaceIterator is the interface that allows iterating over the key space +// of local storage. +// Note that the underlying implementation might be circular: i.e. it can restart +// when the end of the key space is reached. +type KeySpaceIterator interface { + Next(context.Context, uint32) ([]objectcore.AddressWithType, error) +} + +// RedundantCopyCallback is a callback to pass +// the redundant local copy of the object. +type RedundantCopyCallback func(context.Context, oid.Address) + +// BuryFunc is the function to bury (i.e. inhume) an object. +type BuryFunc func(context.Context, oid.Address) error + +// Replicator is the interface to a consumer of replication tasks. +type Replicator interface { + HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) +} + +// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node. +type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error) + +// NodeLoader provides application load statistics. +type nodeLoader interface { + // ObjectServiceLoad returns object service load value in [0:1] range. + ObjectServiceLoad() float64 +} + +type cfg struct { + headTimeout time.Duration + + log *logger.Logger + + keySpaceIterator KeySpaceIterator + + buryFn BuryFunc + + cnrSrc container.Source + + placementBuilder placement.Builder + + remoteHeader RemoteObjectHeaderFunc + + netmapKeys netmap.AnnouncedKeys + + replicator Replicator + + cbRedundantCopy RedundantCopyCallback + + taskPool *ants.Pool + + loader nodeLoader + + maxCapacity int + + batchSize, cacheSize uint32 + + rebalanceFreq, evictDuration time.Duration +} + +func defaultCfg() *cfg { + return &cfg{ + log: &logger.Logger{Logger: zap.L()}, + batchSize: 10, + cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB + rebalanceFreq: 1 * time.Second, + evictDuration: 30 * time.Second, + } +} + +// Option is an option for Policer constructor. +type Option func(*cfg) + +// WithHeadTimeout returns option to set Head timeout of Policer. +func WithHeadTimeout(v time.Duration) Option { + return func(c *cfg) { + c.headTimeout = v + } +} + +// WithLogger returns option to set Logger of Policer. +func WithLogger(v *logger.Logger) Option { + return func(c *cfg) { + c.log = v + } +} + +func WithKeySpaceIterator(it KeySpaceIterator) Option { + return func(c *cfg) { + c.keySpaceIterator = it + } +} + +func WithBuryFunc(f BuryFunc) Option { + return func(c *cfg) { + c.buryFn = f + } +} + +// WithContainerSource returns option to set container source of Policer. +func WithContainerSource(v container.Source) Option { + return func(c *cfg) { + c.cnrSrc = v + } +} + +// WithPlacementBuilder returns option to set object placement builder of Policer. +func WithPlacementBuilder(v placement.Builder) Option { + return func(c *cfg) { + c.placementBuilder = v + } +} + +// WithRemoteObjectHeader returns option to set object header receiver of Policer. +func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option { + return func(c *cfg) { + c.remoteHeader = v + } +} + +// WithNetmapKeys returns option to set tool to work with announced public keys. +func WithNetmapKeys(v netmap.AnnouncedKeys) Option { + return func(c *cfg) { + c.netmapKeys = v + } +} + +// WithReplicator returns option to set object replicator of Policer. +func WithReplicator(v Replicator) Option { + return func(c *cfg) { + c.replicator = v + } +} + +// WithRedundantCopyCallback returns option to set +// callback to pass redundant local object copies +// detected by Policer. +func WithRedundantCopyCallback(cb RedundantCopyCallback) Option { + return func(c *cfg) { + c.cbRedundantCopy = cb + } +} + +// WithMaxCapacity returns option to set max capacity +// that can be set to the pool. +func WithMaxCapacity(capacity int) Option { + return func(c *cfg) { + c.maxCapacity = capacity + } +} + +// WithPool returns option to set pool for +// policy and replication operations. +func WithPool(p *ants.Pool) Option { + return func(c *cfg) { + c.taskPool = p + } +} + +// WithNodeLoader returns option to set FrostFS node load source. +func WithNodeLoader(l nodeLoader) Option { + return func(c *cfg) { + c.loader = l + } +} diff --git a/pkg/services/policer/policer.go b/pkg/services/policer/policer.go index 541ab599c..4be13e0d8 100644 --- a/pkg/services/policer/policer.go +++ b/pkg/services/policer/policer.go @@ -1,29 +1,15 @@ package policer import ( - "context" "sync" "time" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" - headsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/head" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" lru "github.com/hashicorp/golang-lru/v2" - "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) -// NodeLoader provides application load statistics. -type nodeLoader interface { - // ObjectServiceLoad returns object service load value in [0:1] range. - ObjectServiceLoad() float64 -} - type objectsInWork struct { m sync.RWMutex objs map[oid.Address]struct{} @@ -59,53 +45,6 @@ type Policer struct { objsInWork *objectsInWork } -// Option is an option for Policer constructor. -type Option func(*cfg) - -// RedundantCopyCallback is a callback to pass -// the redundant local copy of the object. -type RedundantCopyCallback func(context.Context, oid.Address) - -type cfg struct { - headTimeout time.Duration - - log *logger.Logger - - jobQueue jobQueue - - cnrSrc container.Source - - placementBuilder placement.Builder - - remoteHeader *headsvc.RemoteHeader - - netmapKeys netmap.AnnouncedKeys - - replicator *replicator.Replicator - - cbRedundantCopy RedundantCopyCallback - - taskPool *ants.Pool - - loader nodeLoader - - maxCapacity int - - batchSize, cacheSize uint32 - - rebalanceFreq, evictDuration time.Duration -} - -func defaultCfg() *cfg { - return &cfg{ - log: &logger.Logger{Logger: zap.L()}, - batchSize: 10, - cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB - rebalanceFreq: 1 * time.Second, - evictDuration: 30 * time.Second, - } -} - // New creates, initializes and returns Policer instance. func New(opts ...Option) *Policer { c := defaultCfg() @@ -129,91 +68,3 @@ func New(opts ...Option) *Policer { }, } } - -// WithHeadTimeout returns option to set Head timeout of Policer. -func WithHeadTimeout(v time.Duration) Option { - return func(c *cfg) { - c.headTimeout = v - } -} - -// WithLogger returns option to set Logger of Policer. -func WithLogger(v *logger.Logger) Option { - return func(c *cfg) { - c.log = v - } -} - -// WithLocalStorage returns option to set local object storage of Policer. -func WithLocalStorage(v *engine.StorageEngine) Option { - return func(c *cfg) { - c.jobQueue.localStorage = v - } -} - -// WithContainerSource returns option to set container source of Policer. -func WithContainerSource(v container.Source) Option { - return func(c *cfg) { - c.cnrSrc = v - } -} - -// WithPlacementBuilder returns option to set object placement builder of Policer. -func WithPlacementBuilder(v placement.Builder) Option { - return func(c *cfg) { - c.placementBuilder = v - } -} - -// WithRemoteHeader returns option to set object header receiver of Policer. -func WithRemoteHeader(v *headsvc.RemoteHeader) Option { - return func(c *cfg) { - c.remoteHeader = v - } -} - -// WithNetmapKeys returns option to set tool to work with announced public keys. -func WithNetmapKeys(v netmap.AnnouncedKeys) Option { - return func(c *cfg) { - c.netmapKeys = v - } -} - -// WithReplicator returns option to set object replicator of Policer. -func WithReplicator(v *replicator.Replicator) Option { - return func(c *cfg) { - c.replicator = v - } -} - -// WithRedundantCopyCallback returns option to set -// callback to pass redundant local object copies -// detected by Policer. -func WithRedundantCopyCallback(cb RedundantCopyCallback) Option { - return func(c *cfg) { - c.cbRedundantCopy = cb - } -} - -// WithMaxCapacity returns option to set max capacity -// that can be set to the pool. -func WithMaxCapacity(capacity int) Option { - return func(c *cfg) { - c.maxCapacity = capacity - } -} - -// WithPool returns option to set pool for -// policy and replication operations. -func WithPool(p *ants.Pool) Option { - return func(c *cfg) { - c.taskPool = p - } -} - -// WithNodeLoader returns option to set FrostFS node load source. -func WithNodeLoader(l nodeLoader) Option { - return func(c *cfg) { - c.loader = l - } -} diff --git a/pkg/services/policer/policer_test.go b/pkg/services/policer/policer_test.go new file mode 100644 index 000000000..a09957895 --- /dev/null +++ b/pkg/services/policer/policer_test.go @@ -0,0 +1,299 @@ +package policer + +import ( + "bytes" + "context" + "errors" + "sort" + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" + objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "github.com/panjf2000/ants/v2" + "github.com/stretchr/testify/require" +) + +func TestBuryObjectWithoutContainer(t *testing.T) { + // Key space + addr := oidtest.Address() + objs := []objectcore.AddressWithType{ + { + Address: addr, + Type: object.TypeRegular, + }, + } + + // Container source and bury function + buryCh := make(chan oid.Address) + containerSrc := func(id cid.ID) (*container.Container, error) { + return nil, apistatus.ContainerNotFound{} + } + buryFn := func(ctx context.Context, a oid.Address) error { + buryCh <- a + return nil + } + + // Task pool + pool, err := ants.NewPool(4) + require.NoError(t, err) + + // Policer instance + p := New( + WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}), + WithContainerSource(containerSrcFunc(containerSrc)), + WithBuryFunc(buryFn), + WithPool(pool), + WithNodeLoader(constNodeLoader(0)), + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go p.Run(ctx) + + require.Equal(t, addr, <-buryCh) +} + +func TestProcessObject(t *testing.T) { + // Notes: + // - nodes are referred to by their index throughout, which is embedded in the public key + // - node with index 0 always refers to the local node, so there's no need to add it to objHolders + // - policy is used only to match the number of replicas for each index in the placement + tests := []struct { + desc string + objType object.Type + nodeCount int + policy string + placement [][]int + objHolders []int + maintenanceNodes []int + wantRemoveRedundant bool + wantReplicateTo []int + }{ + { + desc: "1 copy already held by local node", + nodeCount: 1, + policy: `REP 1`, + placement: [][]int{{0}}, + }, + { + desc: "1 copy already held by the remote node", + nodeCount: 2, + policy: `REP 1`, + placement: [][]int{{1}}, + objHolders: []int{1}, + wantRemoveRedundant: true, + }, + { + desc: "1 copy not yet held by the remote node", + nodeCount: 2, + policy: `REP 1`, + placement: [][]int{{1}}, + wantReplicateTo: []int{1}, + }, + { + desc: "2 copies already held by local and remote node", + nodeCount: 2, + policy: `REP 2`, + placement: [][]int{{0, 1}}, + objHolders: []int{1}, + }, + { + desc: "2 copies but not held by remote node", + nodeCount: 2, + policy: `REP 2`, + placement: [][]int{{0, 1}}, + wantReplicateTo: []int{1}, + }, + { + desc: "multiple vectors already held by remote node", + nodeCount: 2, + policy: `REP 2 REP 2`, + placement: [][]int{{0, 1}, {0, 1}}, + objHolders: []int{1}, + }, + { + desc: "multiple vectors not yet held by remote node", + nodeCount: 2, + policy: `REP 2 REP 2`, + placement: [][]int{{0, 1}, {0, 1}}, + wantReplicateTo: []int{1, 1}, // is this actually good? + }, + { + desc: "lock object must be replicated to all nodes", + objType: object.TypeLock, + nodeCount: 3, + policy: `REP 1`, + placement: [][]int{{0, 1, 2}}, + wantReplicateTo: []int{1, 2}, + }, + { + desc: "preserve local copy when maintenance nodes exist", + nodeCount: 3, + policy: `REP 2`, + placement: [][]int{{1, 2}}, + objHolders: []int{1}, + maintenanceNodes: []int{2}, + }, + } + + for i := range tests { + ti := tests[i] + t.Run(ti.desc, func(t *testing.T) { + addr := oidtest.Address() + + // Netmap, placement policy and placement builder + nodes := make([]netmap.NodeInfo, ti.nodeCount) + for i := range nodes { + nodes[i].SetPublicKey([]byte{byte(i)}) + } + for _, i := range ti.maintenanceNodes { + nodes[i].SetMaintenance() + } + + var policy netmap.PlacementPolicy + require.NoError(t, policy.DecodeString(ti.policy)) + + placementVectors := make([][]netmap.NodeInfo, len(ti.placement)) + for i, pv := range ti.placement { + for _, nj := range pv { + placementVectors[i] = append(placementVectors[i], nodes[nj]) + } + } + placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) { + if cnr.Equals(addr.Container()) && obj != nil && obj.Equals(addr.Object()) { + return placementVectors, nil + } + t.Errorf("unexpected placement build: cid=%v oid=%v", cnr, obj) + return nil, errors.New("unexpected placement build") + } + + // Object remote header + headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address) (*object.Object, error) { + index := int(ni.PublicKey()[0]) + if a != addr || index < 1 || index >= ti.nodeCount { + t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a) + return nil, errors.New("unexpected object head") + } + for _, i := range ti.objHolders { + if index == i { + return nil, nil + } + } + return nil, apistatus.ObjectNotFound{} + } + + // Container source + cnr := &container.Container{} + cnr.Value.Init() + cnr.Value.SetPlacementPolicy(policy) + containerSrc := func(id cid.ID) (*container.Container, error) { + if id.Equals(addr.Container()) { + return cnr, nil + } + t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container()) + return nil, apistatus.ContainerNotFound{} + } + buryFn := func(ctx context.Context, a oid.Address) error { + t.Errorf("unexpected object buried: %v", a) + return nil + } + + // Policer instance + var gotRemoveRedundant bool + var gotReplicateTo []int + + p := New( + WithContainerSource(containerSrcFunc(containerSrc)), + WithPlacementBuilder(placementBuilderFunc(placementBuilder)), + WithNetmapKeys(announcedKeysFunc(func(k []byte) bool { + return bytes.Equal(k, nodes[0].PublicKey()) + })), + WithRemoteObjectHeaderFunc(headFn), + WithBuryFunc(buryFn), + WithRedundantCopyCallback(func(_ context.Context, a oid.Address) { + require.True(t, eqAddr(a, addr), "unexpected redundant copy callback: a=%v", a) + gotRemoveRedundant = true + }), + WithReplicator(replicatorFunc(func(_ context.Context, task replicator.Task, res replicator.TaskResult) { + require.True(t, eqAddr(task.Addr, addr), "unexpected replicator task: %+v", task) + for _, node := range task.Nodes { + gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0])) + } + })), + ) + + addrWithType := objectcore.AddressWithType{ + Address: addr, + Type: ti.objType, + } + + p.processObject(context.Background(), addrWithType) + sort.Ints(gotReplicateTo) + + require.Equal(t, ti.wantRemoveRedundant, gotRemoveRedundant) + require.Equal(t, ti.wantReplicateTo, gotReplicateTo) + }) + } +} + +// TODO(https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/issues/101) +func eqAddr(a, b oid.Address) bool { + return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object()) +} + +// sliceKeySpaceIterator is a KeySpaceIterator backed by a slice. +type sliceKeySpaceIterator struct { + objs []objectcore.AddressWithType + cur int +} + +func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) { + if it.cur >= len(it.objs) { + it.cur = 0 + return nil, engine.ErrEndOfListing + } + end := it.cur + int(size) + if end > len(it.objs) { + end = len(it.objs) + } + ret := it.objs[it.cur:end] + it.cur = end + return ret, nil +} + +// containerSrcFunc is a container.Source backed by a function. +type containerSrcFunc func(cid.ID) (*container.Container, error) + +func (f containerSrcFunc) Get(id cid.ID) (*container.Container, error) { return f(id) } + +// placementBuilderFunc is a placement.Builder backed by a function +type placementBuilderFunc func(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) + +func (f placementBuilderFunc) BuildPlacement(c cid.ID, o *oid.ID, p netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) { + return f(c, o, p) +} + +// announcedKeysFunc is a netmap.AnnouncedKeys backed by a function. +type announcedKeysFunc func([]byte) bool + +func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) } + +// constNodeLoader is a nodeLoader that always returns a fixed value. +type constNodeLoader float64 + +func (f constNodeLoader) ObjectServiceLoad() float64 { return float64(f) } + +// replicatorFunc is a Replicator backed by a function. +type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult) + +func (f replicatorFunc) HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) { + f(ctx, task, res) +} diff --git a/pkg/services/policer/process.go b/pkg/services/policer/process.go index 60e924755..6bd38f618 100644 --- a/pkg/services/policer/process.go +++ b/pkg/services/policer/process.go @@ -6,27 +6,17 @@ import ( "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "go.uber.org/zap" ) func (p *Policer) Run(ctx context.Context) { - defer func() { - p.log.Info(logs.PolicerRoutineStopped) - }() - go p.poolCapacityWorker(ctx) p.shardPolicyWorker(ctx) + p.log.Info(logs.PolicerRoutineStopped) } func (p *Policer) shardPolicyWorker(ctx context.Context) { - var ( - addrs []objectcore.AddressWithType - cursor *engine.Cursor - err error - ) - for { select { case <-ctx.Done(): @@ -34,7 +24,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) { default: } - addrs, cursor, err = p.jobQueue.Select(ctx, cursor, p.batchSize) + addrs, err := p.keySpaceIterator.Next(ctx, p.batchSize) if err != nil { if errors.Is(err, engine.ErrEndOfListing) { time.Sleep(time.Second) // finished whole cycle, sleep a bit @@ -55,7 +45,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) { continue } - err = p.taskPool.Submit(func() { + err := p.taskPool.Submit(func() { v, ok := p.cache.Get(addr.Address) if ok && time.Since(v) < p.evictDuration { return @@ -78,10 +68,10 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) { func (p *Policer) poolCapacityWorker(ctx context.Context) { ticker := time.NewTicker(p.rebalanceFreq) + defer ticker.Stop() for { select { case <-ctx.Done(): - ticker.Stop() return case <-ticker.C: frostfsSysLoad := p.loader.ObjectServiceLoad() diff --git a/pkg/services/policer/queue.go b/pkg/services/policer/queue.go deleted file mode 100644 index 22012c835..000000000 --- a/pkg/services/policer/queue.go +++ /dev/null @@ -1,26 +0,0 @@ -package policer - -import ( - "context" - "fmt" - - objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" -) - -type jobQueue struct { - localStorage *engine.StorageEngine -} - -func (q *jobQueue) Select(ctx context.Context, cursor *engine.Cursor, count uint32) ([]objectcore.AddressWithType, *engine.Cursor, error) { - var prm engine.ListWithCursorPrm - prm.WithCursor(cursor) - prm.WithCount(count) - - res, err := q.localStorage.ListWithCursor(ctx, prm) - if err != nil { - return nil, nil, fmt.Errorf("cannot list objects in engine: %w", err) - } - - return res.AddressList(), res.Cursor(), nil -} diff --git a/pkg/services/replicator/process.go b/pkg/services/replicator/process.go index 0f82ff232..a54668e12 100644 --- a/pkg/services/replicator/process.go +++ b/pkg/services/replicator/process.go @@ -24,16 +24,16 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) defer p.metrics.DecInFlightRequest() defer func() { p.log.Debug(logs.ReplicatorFinishWork, - zap.Uint32("amount of unfinished replicas", task.quantity), + zap.Uint32("amount of unfinished replicas", task.NumCopies), ) }() - if task.obj == nil { + if task.Obj == nil { var err error - task.obj, err = engine.Get(ctx, p.localStorage, task.addr) + task.Obj, err = engine.Get(ctx, p.localStorage, task.Addr) if err != nil { p.log.Error(logs.ReplicatorCouldNotGetObjectFromLocalStorage, - zap.Stringer("object", task.addr), + zap.Stringer("object", task.Addr), zap.Error(err)) return @@ -41,9 +41,9 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) } prm := new(putsvc.RemotePutPrm). - WithObject(task.obj) + WithObject(task.Obj) - for i := 0; task.quantity > 0 && i < len(task.nodes); i++ { + for i := 0; task.NumCopies > 0 && i < len(task.Nodes); i++ { select { case <-ctx.Done(): return @@ -51,13 +51,13 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) } log := p.log.With( - zap.String("node", netmap.StringifyPublicKey(task.nodes[i])), - zap.Stringer("object", task.addr), + zap.String("node", netmap.StringifyPublicKey(task.Nodes[i])), + zap.Stringer("object", task.Addr), ) callCtx, cancel := context.WithTimeout(ctx, p.putTimeout) - err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i])) + err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.Nodes[i])) cancel() @@ -68,12 +68,12 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) } else { log.Debug(logs.ReplicatorObjectSuccessfullyReplicated) - task.quantity-- + task.NumCopies-- - res.SubmitSuccessfulReplication(task.nodes[i]) + res.SubmitSuccessfulReplication(task.Nodes[i]) p.metrics.IncProcessedObjects() - p.metrics.AddPayloadSize(int64(task.obj.PayloadSize())) + p.metrics.AddPayloadSize(int64(task.Obj.PayloadSize())) } } } diff --git a/pkg/services/replicator/task.go b/pkg/services/replicator/task.go index ec1b55788..d2b5b2506 100644 --- a/pkg/services/replicator/task.go +++ b/pkg/services/replicator/task.go @@ -8,31 +8,12 @@ import ( // Task represents group of Replicator task parameters. type Task struct { - quantity uint32 - - addr oid.Address - - obj *objectSDK.Object - - nodes []netmap.NodeInfo -} - -// SetCopiesNumber sets number of copies to replicate. -func (t *Task) SetCopiesNumber(v uint32) { - t.quantity = v -} - -// SetObjectAddress sets address of local object. -func (t *Task) SetObjectAddress(v oid.Address) { - t.addr = v -} - -// SetObject sets object to avoid fetching it from the local storage. -func (t *Task) SetObject(obj *objectSDK.Object) { - t.obj = obj -} - -// SetNodes sets a list of potential object holders. -func (t *Task) SetNodes(v []netmap.NodeInfo) { - t.nodes = v + // NumCopies is the number of copies to replicate. + NumCopies uint32 + // Addr is the address of the local object. + Addr oid.Address + // Obj is the object to avoid fetching it from the local storage. + Obj *objectSDK.Object + // Nodes is a list of potential object holders. + Nodes []netmap.NodeInfo }