[#92] Refactor policer and add some unit tests #485

Merged
fyrchik merged 3 commits from ale64bit/frostfs-node:fix/policer-refactor-test into master 2023-07-03 07:05:33 +00:00
13 changed files with 640 additions and 310 deletions
Showing only changes of commit 95c32af491

View file

@@ -0,0 +1,28 @@
package main
import (
"context"
"fmt"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
)
type keySpaceIterator struct {
ng *engine.StorageEngine
cur *engine.Cursor
}
func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objectcore.AddressWithType, error) {
var prm engine.ListWithCursorPrm
prm.WithCursor(it.cur)
prm.WithCount(batchSize)
res, err := it.ng.ListWithCursor(ctx, prm)
if err != nil {
return nil, fmt.Errorf("cannot list objects in engine: %w", err)
}
it.cur = res.Cursor()
return res.AddressList(), nil
}
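
For context, the Policer drives this iterator from its worker loop (see the process.go changes below), treating engine.ErrEndOfListing as the end of a full pass over the key space. A minimal consumption sketch reusing the keySpaceIterator type above; the batch size and handler are illustrative assumptions, not part of this commit:

```go
package main

import (
	"context"
	"errors"
	"time"

	objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
)

// drainKeySpace pulls address batches until the context is cancelled.
// After a full pass (engine.ErrEndOfListing), it pauses briefly and keeps
// going, mirroring the Policer's own restart behavior shown below.
func drainKeySpace(ctx context.Context, it *keySpaceIterator, handle func(objectcore.AddressWithType)) error {
	const batchSize = 10 // illustrative; matches the Policer's default batch size
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		addrs, err := it.Next(ctx, batchSize)
		if err != nil {
			if errors.Is(err, engine.ErrEndOfListing) {
				time.Sleep(time.Second) // finished whole cycle, sleep a bit
				continue
			}
			return err
		}
		for _, a := range addrs {
			handle(a)
		}
	}
}
```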

View file

@@ -38,6 +38,7 @@ import (
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
    cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
    eaclSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
+   netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
    objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
@@ -211,15 +212,30 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
    ls := c.cfgObject.cfgLocalStorage.localStorage
+   buryFn := func(ctx context.Context, addr oid.Address) error {
+       var prm engine.InhumePrm
+       prm.MarkAsGarbage(addr)
+       prm.WithForceRemoval()
+       _, err := ls.Inhume(ctx, prm)
+       return err
+   }
+   remoteHeader := headsvc.NewRemoteHeader(keyStorage, clientConstructor)
    pol := policer.New(
        policer.WithLogger(c.log),
-       policer.WithLocalStorage(ls),
+       policer.WithKeySpaceIterator(&keySpaceIterator{ng: ls}),
+       policer.WithBuryFunc(buryFn),
        policer.WithContainerSource(c.cfgObject.cnrSource),
        policer.WithPlacementBuilder(
            placement.NewNetworkMapSourceBuilder(c.netMapSource),
        ),
-       policer.WithRemoteHeader(
-           headsvc.NewRemoteHeader(keyStorage, clientConstructor),
+       policer.WithRemoteObjectHeaderFunc(
+           func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
+               prm := new(headsvc.RemoteHeadPrm).WithNodeInfo(ni).WithObjectAddress(a)
+               return remoteHeader.Head(ctx, prm)
+           },
        ),
        policer.WithNetmapKeys(c),
        policer.WithHeadTimeout(

View file

@@ -84,11 +84,12 @@ func (s *Server) replicate(ctx context.Context, addr oid.Address, obj *objectSDK
    }
    var res replicatorResult
-   var task replicator.Task
-   task.SetObject(obj)
-   task.SetObjectAddress(addr)
-   task.SetCopiesNumber(1)
-   task.SetNodes(nodes)
+   task := replicator.Task{
+       NumCopies: 1,
+       Addr:      addr,
+       Obj:       obj,
+       Nodes:     nodes,
+   }
    s.replicator.HandleTask(ctx, task, &res)
    if res.count == 0 {

View file

@@ -7,8 +7,6 @@ import (
    "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
    objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
-   "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
-   headsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/head"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
    apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@@ -18,55 +16,6 @@ import (
    "go.uber.org/zap"
)
-// tracks Policer's check progress.
-type nodeCache map[uint64]bool
-func newNodeCache() *nodeCache {
-   m := make(map[uint64]bool)
-   return (*nodeCache)(&m)
-}
-func (n *nodeCache) set(node netmap.NodeInfo, val bool) {
-   (*n)[node.Hash()] = val
-}
-// submits storage node as a candidate to store the object replica in case of
-// shortage.
-func (n *nodeCache) submitReplicaCandidate(node netmap.NodeInfo) {
-   n.set(node, false)
-}
-// submits storage node as a current object replica holder.
-func (n *nodeCache) submitReplicaHolder(node netmap.NodeInfo) {
-   n.set(node, true)
-}
-// processStatus returns current processing status of the storage node
-//
-// >0 if node does not currently hold the object
-// 0 if node already holds the object
-// <0 if node has not been processed yet
-func (n *nodeCache) processStatus(node netmap.NodeInfo) int8 {
-   val, ok := (*n)[node.Hash()]
-   if !ok {
-       return -1
-   }
-   if val {
-       return 0
-   }
-   return 1
-}
-// SubmitSuccessfulReplication marks given storage node as a current object
-// replica holder.
-//
-// SubmitSuccessfulReplication implements replicator.TaskResult.
-func (n *nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) {
-   n.submitReplicaHolder(node)
-}
func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) {
    addr := addrWithType.Address
    idCnr := addr.Container()
@@ -79,11 +28,7 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
            zap.String("error", err.Error()),
        )
        if container.IsErrNotFound(err) {
-           var prm engine.InhumePrm
-           prm.MarkAsGarbage(addrWithType.Address)
-           prm.WithForceRemoval()
-           _, err := p.jobQueue.localStorage.Inhume(ctx, prm)
+           err := p.buryFn(ctx, addrWithType.Address)
            if err != nil {
                p.log.Error(logs.PolicerCouldNotInhumeObjectWithMissingContainer,
                    zap.Stringer("cid", idCnr),
@@ -145,10 +90,9 @@ type placementRequirements struct {
}
func (p *Policer) processNodes(ctx context.Context, requirements *placementRequirements, addrWithType objectcore.AddressWithType,
-   nodes []netmap.NodeInfo, shortage uint32, checkedNodes *nodeCache) {
+   nodes []netmap.NodeInfo, shortage uint32, checkedNodes nodeCache) {
    addr := addrWithType.Address
    typ := addrWithType.Type
-   prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr)
    // Number of copies that are stored on maintenance nodes.
    var uncheckedCopies int
@@ -175,8 +119,8 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
        } else if nodes[i].IsMaintenance() {
            shortage, uncheckedCopies = p.handleMaintenance(nodes[i], checkedNodes, shortage, uncheckedCopies)
        } else {
-           if status := checkedNodes.processStatus(nodes[i]); status >= 0 {
-               if status == 0 {
+           if status := checkedNodes.processStatus(nodes[i]); status.Processed() {
+               if status == nodeHoldsObject {
                    // node already contains replica, no need to replicate
                    nodes = append(nodes[:i], nodes[i+1:]...)
                    i--
@@ -188,7 +132,7 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
            callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
-           _, err := p.remoteHeader.Head(callCtx, prm.WithNodeInfo(nodes[i]))
+           _, err := p.remoteHeader(callCtx, nodes[i], addr)
            cancel()
@@ -224,7 +168,7 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
// prevent spam with new replicas.
// However, additional copies should not be removed in this case,
// because we can remove the only copy this way.
-func (p *Policer) handleMaintenance(node netmap.NodeInfo, checkedNodes *nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) {
+func (p *Policer) handleMaintenance(node netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) {
    checkedNodes.submitReplicaHolder(node)
    shortage--
    uncheckedCopies++
@@ -236,25 +180,29 @@ func (p *Policer) handleMaintenance(node netmap.NodeInfo, checkedNodes *nodeCach
}
func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address, requirements *placementRequirements,
-   nodes []netmap.NodeInfo, checkedNodes *nodeCache, shortage uint32, uncheckedCopies int) {
-   if shortage > 0 {
+   nodes []netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) {
+   switch {
+   case shortage > 0:
        p.log.Debug(logs.PolicerShortageOfObjectCopiesDetected,
            zap.Stringer("object", addr),
            zap.Uint32("shortage", shortage),
        )
-       var task replicator.Task
-       task.SetObjectAddress(addr)
-       task.SetNodes(nodes)
-       task.SetCopiesNumber(shortage)
+       task := replicator.Task{
+           NumCopies: shortage,
+           Addr:      addr,
+           Nodes:     nodes,
+       }
        p.replicator.HandleTask(ctx, task, checkedNodes)
-   } else if uncheckedCopies > 0 {
+   case uncheckedCopies > 0:
        // If we have more copies than needed, but some of them are from the maintenance nodes,
        // save the local copy.
        p.log.Debug(logs.PolicerSomeOfTheCopiesAreStoredOnNodesUnderMaintenance,
            zap.Int("count", uncheckedCopies))
-   } else if uncheckedCopies == 0 {
+   case uncheckedCopies == 0:
        // Safe to remove: checked all copies, shortage == 0.
        requirements.removeLocalCopy = true
    }

View file

@@ -11,14 +11,14 @@ func TestNodeCache(t *testing.T) {
    cache := newNodeCache()
    node := netmaptest.NodeInfo()
-   require.Negative(t, cache.processStatus(node))
+   require.Equal(t, cache.processStatus(node), nodeNotProcessed)
    cache.SubmitSuccessfulReplication(node)
-   require.Zero(t, cache.processStatus(node))
+   require.Equal(t, cache.processStatus(node), nodeHoldsObject)
    cache.submitReplicaCandidate(node)
-   require.Positive(t, cache.processStatus(node))
+   require.Equal(t, cache.processStatus(node), nodeDoesNotHoldObject)
    cache.submitReplicaHolder(node)
-   require.Zero(t, cache.processStatus(node))
+   require.Equal(t, cache.processStatus(node), nodeHoldsObject)
}

View file

@@ -0,0 +1,57 @@
package policer
import "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
type nodeProcessStatus int8
const (
nodeNotProcessed nodeProcessStatus = iota
nodeDoesNotHoldObject
nodeHoldsObject
)
func (st nodeProcessStatus) Processed() bool {
return st != nodeNotProcessed
}
// nodeCache tracks Policer's check progress.
type nodeCache map[uint64]bool
func newNodeCache() nodeCache {
return make(map[uint64]bool)
}
func (n nodeCache) set(node netmap.NodeInfo, val bool) {
n[node.Hash()] = val
}
// submits storage node as a candidate to store the object replica in case of
// shortage.
func (n nodeCache) submitReplicaCandidate(node netmap.NodeInfo) {
n.set(node, false)
}
// submits storage node as a current object replica holder.
func (n nodeCache) submitReplicaHolder(node netmap.NodeInfo) {
n.set(node, true)
}
// processStatus returns current processing status of the storage node.
func (n nodeCache) processStatus(node netmap.NodeInfo) nodeProcessStatus {
switch val, ok := n[node.Hash()]; {
case !ok:
return nodeNotProcessed
case val:
return nodeHoldsObject
default:
return nodeDoesNotHoldObject
}
}
// SubmitSuccessfulReplication marks given storage node as a current object
// replica holder.
//
// SubmitSuccessfulReplication implements replicator.TaskResult.
func (n nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) {
n.submitReplicaHolder(node)
}
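
Because nodeCache is now a plain map type rather than a pointer, it can be passed around by value while still mutating the shared map. A hedged compile-time check, not part of this commit, confirming it still satisfies replicator.TaskResult:

```go
package policer

import "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"

// Compile-time assertion (illustrative only): nodeCache must implement
// replicator.TaskResult so the Policer can hand it directly to
// Replicator.HandleTask as the result sink.
var _ replicator.TaskResult = nodeCache(nil)
```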

View file

@@ -0,0 +1,185 @@
package policer
import (
"context"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
// KeySpaceIterator is the interface that allows iterating over the key space
// of local storage.
// Note that the underlying implementation might be circular: i.e. it can restart
// when the end of the key space is reached.
type KeySpaceIterator interface {
Next(context.Context, uint32) ([]objectcore.AddressWithType, error)
}
// RedundantCopyCallback is a callback to pass
// the redundant local copy of the object.
type RedundantCopyCallback func(context.Context, oid.Address)
// BuryFunc is the function to bury (i.e. inhume) an object.
type BuryFunc func(context.Context, oid.Address) error
fyrchik marked this conversation as resolved Outdated

So `BuryFunc` is a function, but `Replicator` is a single-method interface. What are the differences?

I don't think there's a major difference, except where the wrapper will live. In this case, I tried to preserve the call-site option for `Replicator` by making it an interface in policer, while the bury function wasn't there in the first place so there was no need to wrap it in a struct/interface.
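
For illustration, the two shapes convert into each other with a one-line adapter, the same idiom the tests below use for replicatorFunc (compare net/http's HandlerFunc). Burier and burierFunc here are hypothetical, shown only to contrast the options:

```go
package policer

import (
	"context"

	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// Burier is a hypothetical single-method interface counterpart of BuryFunc.
type Burier interface {
	Bury(context.Context, oid.Address) error
}

// burierFunc adapts a plain function to the Burier interface,
// mirroring how replicatorFunc adapts a function to Replicator in the tests.
type burierFunc func(context.Context, oid.Address) error

func (f burierFunc) Bury(ctx context.Context, a oid.Address) error { return f(ctx, a) }
```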
// Replicator is the interface to a consumer of replication tasks.
type Replicator interface {
HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
}
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
// nodeLoader provides application load statistics.
type nodeLoader interface {
// ObjectServiceLoad returns object service load value in [0:1] range.
ObjectServiceLoad() float64
}
type cfg struct {
headTimeout time.Duration
log *logger.Logger
keySpaceIterator KeySpaceIterator

Why `It` and not `Iterator`?

It's a well-known abbreviation so it seems clear to shorten it. Other fields use similar or less-known abbreviations, e.g. `cfg`, `cnrSrc`, `cbRedundantCopy`, `rebalanceFreq` and so on.

Ok, but I had some troubles with parsing -- `it` is also an anaphor and it is hard to tell the meaning without looking at the type.

done

> it is also an anaphor and it is hard to tell the meaning without looking at the type.

I don't think I've ever seen people writing variable names like `iLikeToMoveItMoveIt` or anything like that, where an `It` suffix might be ambiguous.

Ironically, we have `ToMoveIt` in the metabase code. I don't like it though.
buryFn BuryFunc
cnrSrc container.Source
placementBuilder placement.Builder
remoteHeader RemoteObjectHeaderFunc
netmapKeys netmap.AnnouncedKeys
replicator Replicator
cbRedundantCopy RedundantCopyCallback
taskPool *ants.Pool
loader nodeLoader
maxCapacity int
batchSize, cacheSize uint32
rebalanceFreq, evictDuration time.Duration
}
func defaultCfg() *cfg {
return &cfg{
log: &logger.Logger{Logger: zap.L()},
batchSize: 10,
cacheSize: 1024, // 1024 * address size = 1024 * 64 bytes = 64 KiB
rebalanceFreq: 1 * time.Second,
evictDuration: 30 * time.Second,
}
}
// Option is an option for Policer constructor.
type Option func(*cfg)
// WithHeadTimeout returns option to set Head timeout of Policer.
func WithHeadTimeout(v time.Duration) Option {
return func(c *cfg) {
c.headTimeout = v
}
}
// WithLogger returns option to set Logger of Policer.
func WithLogger(v *logger.Logger) Option {
return func(c *cfg) {
c.log = v
}
}
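// WithKeySpaceIterator returns option to set the key space iterator of Policer.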
func WithKeySpaceIterator(it KeySpaceIterator) Option {
return func(c *cfg) {
c.keySpaceIterator = it
}
}
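// WithBuryFunc returns option to set the function to bury (i.e. inhume) an object.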
func WithBuryFunc(f BuryFunc) Option {
return func(c *cfg) {
c.buryFn = f
}
}
// WithContainerSource returns option to set container source of Policer.
func WithContainerSource(v container.Source) Option {
return func(c *cfg) {
c.cnrSrc = v
}
}
// WithPlacementBuilder returns option to set object placement builder of Policer.
func WithPlacementBuilder(v placement.Builder) Option {
return func(c *cfg) {
c.placementBuilder = v
}
}
// WithRemoteObjectHeaderFunc returns option to set object header receiver of Policer.
func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option {
return func(c *cfg) {
c.remoteHeader = v
}
}
// WithNetmapKeys returns option to set tool to work with announced public keys.
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
return func(c *cfg) {
c.netmapKeys = v
}
}
// WithReplicator returns option to set object replicator of Policer.
func WithReplicator(v Replicator) Option {
return func(c *cfg) {
c.replicator = v
}
}
// WithRedundantCopyCallback returns option to set
// callback to pass redundant local object copies
// detected by Policer.
func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
return func(c *cfg) {
c.cbRedundantCopy = cb
}
}
// WithMaxCapacity returns option to set max capacity
// that can be set to the pool.
func WithMaxCapacity(capacity int) Option {
return func(c *cfg) {
c.maxCapacity = capacity
}
}
// WithPool returns option to set pool for
// policy and replication operations.
func WithPool(p *ants.Pool) Option {
return func(c *cfg) {
c.taskPool = p
}
}
// WithNodeLoader returns option to set FrostFS node load source.
func WithNodeLoader(l nodeLoader) Option {
return func(c *cfg) {
c.loader = l
}
}
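
Taken together, the new options let a Policer be wired up without referencing the storage engine at all. A minimal construction sketch modeled on the test setup later in this PR; the iterator value and bury function are placeholders, not code from this commit:

```go
package policer_test

import (
	"context"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/policer"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"github.com/panjf2000/ants/v2"
)

// keySpaceIter stands in for any KeySpaceIterator implementation, e.g. the
// engine-backed iterator from cmd/frostfs-node or the test slice iterator below.
var keySpaceIter policer.KeySpaceIterator

func newMinimalPolicer() (*policer.Policer, error) {
	pool, err := ants.NewPool(4)
	if err != nil {
		return nil, err
	}
	return policer.New(
		policer.WithKeySpaceIterator(keySpaceIter),
		policer.WithBuryFunc(func(ctx context.Context, a oid.Address) error {
			return nil // placeholder: inhume the object here
		}),
		policer.WithPool(pool),
		policer.WithHeadTimeout(5*time.Second),
	), nil
}
```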

View file

@@ -1,29 +1,15 @@
package policer
import (
-   "context"
    "sync"
    "time"
-   "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
-   "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
-   "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
-   headsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/head"
-   "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
-   "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
    lru "github.com/hashicorp/golang-lru/v2"
-   "github.com/panjf2000/ants/v2"
    "go.uber.org/zap"
)
-// NodeLoader provides application load statistics.
-type nodeLoader interface {
-   // ObjectServiceLoad returns object service load value in [0:1] range.
-   ObjectServiceLoad() float64
-}
type objectsInWork struct {
    m sync.RWMutex
fyrchik marked this conversation as resolved Outdated

Why this change? Could you move it to a separate commit?

> Why this change?

If the purpose of the `struct` is just to wrap a field and its lock, it seems cleaner to me to embed the lock to avoid the `.mu` stutter in methods.

> Could you move it to a separate commit?

done
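
For reference, the embedding being discussed (landed in a separate commit) looks roughly like this; the has method is an illustrative stand-in for the struct's real accessors:

```go
package policer

import (
	"sync"

	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// objectsInWork with the lock embedded: methods call Lock/RLock on the
// struct itself instead of going through a named field like oiw.m.
type objectsInWork struct {
	sync.RWMutex
	objs map[oid.Address]struct{}
}

func (oiw *objectsInWork) has(addr oid.Address) bool {
	oiw.RLock()
	defer oiw.RUnlock()
	_, ok := oiw.objs[addr]
	return ok
}
```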
    objs map[oid.Address]struct{}
@@ -59,53 +45,6 @@ type Policer struct {
    objsInWork *objectsInWork
}
-// Option is an option for Policer constructor.
-type Option func(*cfg)
-// RedundantCopyCallback is a callback to pass
-// the redundant local copy of the object.
-type RedundantCopyCallback func(context.Context, oid.Address)
-type cfg struct {
-   headTimeout time.Duration
-   log *logger.Logger
-   jobQueue jobQueue
-   cnrSrc container.Source
-   placementBuilder placement.Builder
-   remoteHeader *headsvc.RemoteHeader
-   netmapKeys netmap.AnnouncedKeys
-   replicator *replicator.Replicator
-   cbRedundantCopy RedundantCopyCallback
-   taskPool *ants.Pool
-   loader nodeLoader
-   maxCapacity int
-   batchSize, cacheSize uint32
-   rebalanceFreq, evictDuration time.Duration
-}
-func defaultCfg() *cfg {
-   return &cfg{
-       log: &logger.Logger{Logger: zap.L()},
-       batchSize: 10,
-       cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB
-       rebalanceFreq: 1 * time.Second,
-       evictDuration: 30 * time.Second,
-   }
-}
// New creates, initializes and returns Policer instance.
func New(opts ...Option) *Policer {
    c := defaultCfg()
@@ -129,91 +68,3 @@ func New(opts ...Option) *Policer {
        },
    }
}
-// WithHeadTimeout returns option to set Head timeout of Policer.
-func WithHeadTimeout(v time.Duration) Option {
-   return func(c *cfg) {
-       c.headTimeout = v
-   }
-}
-// WithLogger returns option to set Logger of Policer.
-func WithLogger(v *logger.Logger) Option {
-   return func(c *cfg) {
-       c.log = v
-   }
-}
-// WithLocalStorage returns option to set local object storage of Policer.
-func WithLocalStorage(v *engine.StorageEngine) Option {
-   return func(c *cfg) {
-       c.jobQueue.localStorage = v
-   }
-}
-// WithContainerSource returns option to set container source of Policer.
-func WithContainerSource(v container.Source) Option {
-   return func(c *cfg) {
-       c.cnrSrc = v
-   }
-}
-// WithPlacementBuilder returns option to set object placement builder of Policer.
-func WithPlacementBuilder(v placement.Builder) Option {
-   return func(c *cfg) {
-       c.placementBuilder = v
-   }
-}
-// WithRemoteHeader returns option to set object header receiver of Policer.
-func WithRemoteHeader(v *headsvc.RemoteHeader) Option {
-   return func(c *cfg) {
-       c.remoteHeader = v
-   }
-}
-// WithNetmapKeys returns option to set tool to work with announced public keys.
-func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
-   return func(c *cfg) {
-       c.netmapKeys = v
-   }
-}
-// WithReplicator returns option to set object replicator of Policer.
-func WithReplicator(v *replicator.Replicator) Option {
-   return func(c *cfg) {
-       c.replicator = v
-   }
-}
-// WithRedundantCopyCallback returns option to set
-// callback to pass redundant local object copies
-// detected by Policer.
-func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
-   return func(c *cfg) {
-       c.cbRedundantCopy = cb
-   }
-}
-// WithMaxCapacity returns option to set max capacity
-// that can be set to the pool.
-func WithMaxCapacity(capacity int) Option {
-   return func(c *cfg) {
-       c.maxCapacity = capacity
-   }
-}
-// WithPool returns option to set pool for
-// policy and replication operations.
-func WithPool(p *ants.Pool) Option {
-   return func(c *cfg) {
-       c.taskPool = p
-   }
-}
-// WithNodeLoader returns option to set FrostFS node load source.
-func WithNodeLoader(l nodeLoader) Option {
-   return func(c *cfg) {
-       c.loader = l
-   }
-}

View file

@@ -0,0 +1,299 @@
package policer
import (
"bytes"
"context"
"errors"
"sort"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
)
func TestBuryObjectWithoutContainer(t *testing.T) {
// Key space
addr := oidtest.Address()
objs := []objectcore.AddressWithType{
{
Address: addr,
Type: object.TypeRegular,
},
}
// Container source and bury function
buryCh := make(chan oid.Address)
containerSrc := func(id cid.ID) (*container.Container, error) {
return nil, apistatus.ContainerNotFound{}
}
buryFn := func(ctx context.Context, a oid.Address) error {
buryCh <- a
return nil
}
// Task pool
pool, err := ants.NewPool(4)
require.NoError(t, err)
// Policer instance
p := New(
WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}),
WithContainerSource(containerSrcFunc(containerSrc)),
WithBuryFunc(buryFn),
WithPool(pool),
WithNodeLoader(constNodeLoader(0)),
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go p.Run(ctx)
require.Equal(t, addr, <-buryCh)
}
func TestProcessObject(t *testing.T) {
// Notes:
// - nodes are referred to by their index throughout, which is embedded in the public key
// - node with index 0 always refers to the local node, so there's no need to add it to objHolders
// - policy is used only to match the number of replicas for each index in the placement
tests := []struct {
desc string
objType object.Type
nodeCount int
policy string
placement [][]int
objHolders []int
maintenanceNodes []int
wantRemoveRedundant bool
wantReplicateTo []int
}{
{
desc: "1 copy already held by local node",
nodeCount: 1,
policy: `REP 1`,
placement: [][]int{{0}},
},
{
desc: "1 copy already held by the remote node",
nodeCount: 2,
policy: `REP 1`,
placement: [][]int{{1}},
objHolders: []int{1},
wantRemoveRedundant: true,
},
{
desc: "1 copy not yet held by the remote node",
nodeCount: 2,
policy: `REP 1`,
placement: [][]int{{1}},
wantReplicateTo: []int{1},
},
{
desc: "2 copies already held by local and remote node",
nodeCount: 2,
policy: `REP 2`,
placement: [][]int{{0, 1}},
objHolders: []int{1},
},
{
desc: "2 copies but not held by remote node",
nodeCount: 2,
policy: `REP 2`,
placement: [][]int{{0, 1}},
wantReplicateTo: []int{1},
},
{
desc: "multiple vectors already held by remote node",
nodeCount: 2,
policy: `REP 2 REP 2`,
placement: [][]int{{0, 1}, {0, 1}},
objHolders: []int{1},
},
{
desc: "multiple vectors not yet held by remote node",
nodeCount: 2,
policy: `REP 2 REP 2`,
placement: [][]int{{0, 1}, {0, 1}},
wantReplicateTo: []int{1, 1}, // is this actually good?
},
{
desc: "lock object must be replicated to all nodes",
objType: object.TypeLock,
nodeCount: 3,
policy: `REP 1`,
placement: [][]int{{0, 1, 2}},
wantReplicateTo: []int{1, 2},
},
{
desc: "preserve local copy when maintenance nodes exist",
nodeCount: 3,
policy: `REP 2`,
placement: [][]int{{1, 2}},
objHolders: []int{1},
maintenanceNodes: []int{2},
},
}
for i := range tests {
ti := tests[i]
t.Run(ti.desc, func(t *testing.T) {
addr := oidtest.Address()
// Netmap, placement policy and placement builder
nodes := make([]netmap.NodeInfo, ti.nodeCount)
for i := range nodes {
nodes[i].SetPublicKey([]byte{byte(i)})
}
for _, i := range ti.maintenanceNodes {
nodes[i].SetMaintenance()
}
var policy netmap.PlacementPolicy
require.NoError(t, policy.DecodeString(ti.policy))
placementVectors := make([][]netmap.NodeInfo, len(ti.placement))
for i, pv := range ti.placement {
for _, nj := range pv {
placementVectors[i] = append(placementVectors[i], nodes[nj])
}
}
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
if cnr.Equals(addr.Container()) && obj != nil && obj.Equals(addr.Object()) {
fyrchik marked this conversation as resolved Outdated

We have `cid.ID.Equals()` for this purpose, which is better (see the comment to `Equals`). Same for `oid.ID`.

done
return placementVectors, nil
}
t.Errorf("unexpected placement build: cid=%v oid=%v", cnr, obj)
return nil, errors.New("unexpected placement build")
}
// Object remote header
headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address) (*object.Object, error) {
index := int(ni.PublicKey()[0])
if a != addr || index < 1 || index >= ti.nodeCount {
t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a)
return nil, errors.New("unexpected object head")
}
for _, i := range ti.objHolders {
if index == i {
return nil, nil
}
}
return nil, apistatus.ObjectNotFound{}
}
// Container source
cnr := &container.Container{}
cnr.Value.Init()
cnr.Value.SetPlacementPolicy(policy)
containerSrc := func(id cid.ID) (*container.Container, error) {
if id.Equals(addr.Container()) {
return cnr, nil
}
t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container())
return nil, apistatus.ContainerNotFound{}
}
buryFn := func(ctx context.Context, a oid.Address) error {
t.Errorf("unexpected object buried: %v", a)
return nil
}
// Policer instance
var gotRemoveRedundant bool
var gotReplicateTo []int
p := New(
WithContainerSource(containerSrcFunc(containerSrc)),
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
return bytes.Equal(k, nodes[0].PublicKey())
})),
WithRemoteObjectHeaderFunc(headFn),
WithBuryFunc(buryFn),
WithRedundantCopyCallback(func(_ context.Context, a oid.Address) {
require.True(t, eqAddr(a, addr), "unexpected redundant copy callback: a=%v", a)
gotRemoveRedundant = true
fyrchik marked this conversation as resolved Outdated

Why not `require.Equal`?

done (https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/issues/101)

No, I mean that we use https://github.com/stretchr/testify and not `t.*` directly across the codebase. Do we have a compelling reason to not be uniform here?

but ... I did change it to:

```
require.True(t, eqAddr(a, addr), "unexpected redundant copy callback: a=%v", a)
```

What exactly do you propose?

I see, looked at the old version first.
}),
WithReplicator(replicatorFunc(func(_ context.Context, task replicator.Task, res replicator.TaskResult) {
require.True(t, eqAddr(task.Addr, addr), "unexpected replicator task: %+v", task)
for _, node := range task.Nodes {
gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
}
})),
)
addrWithType := objectcore.AddressWithType{
Address: addr,
Type: ti.objType,
}
p.processObject(context.Background(), addrWithType)
sort.Ints(gotReplicateTo)
require.Equal(t, ti.wantRemoveRedundant, gotRemoveRedundant)
require.Equal(t, ti.wantReplicateTo, gotReplicateTo)
})
}
}
// TODO(https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/issues/101)
func eqAddr(a, b oid.Address) bool {
return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object())
}
// sliceKeySpaceIterator is a KeySpaceIterator backed by a slice.
type sliceKeySpaceIterator struct {
objs []objectcore.AddressWithType
cur int
}
func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) {
if it.cur >= len(it.objs) {
it.cur = 0
return nil, engine.ErrEndOfListing
}
end := it.cur + int(size)
if end > len(it.objs) {
end = len(it.objs)
}
ret := it.objs[it.cur:end]
it.cur = end
return ret, nil
}
// containerSrcFunc is a container.Source backed by a function.
type containerSrcFunc func(cid.ID) (*container.Container, error)
fyrchik marked this conversation as resolved Outdated

Dot at the end (here and below), tests are skipped by linter, godot would complain.

done
func (f containerSrcFunc) Get(id cid.ID) (*container.Container, error) { return f(id) }
// placementBuilderFunc is a placement.Builder backed by a function.
type placementBuilderFunc func(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
func (f placementBuilderFunc) BuildPlacement(c cid.ID, o *oid.ID, p netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
return f(c, o, p)
}
// announcedKeysFunc is a netmap.AnnouncedKeys backed by a function.
type announcedKeysFunc func([]byte) bool
func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) }
// constNodeLoader is a nodeLoader that always returns a fixed value.
type constNodeLoader float64
func (f constNodeLoader) ObjectServiceLoad() float64 { return float64(f) }
// replicatorFunc is a Replicator backed by a function.
type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult)
func (f replicatorFunc) HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) {
f(ctx, task, res)
}

View file

@@ -6,27 +6,17 @@ import (
    "time"
    "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
-   objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
    "go.uber.org/zap"
)
func (p *Policer) Run(ctx context.Context) {
-   defer func() {
-       p.log.Info(logs.PolicerRoutineStopped)
-   }()
    go p.poolCapacityWorker(ctx)
    p.shardPolicyWorker(ctx)
+   p.log.Info(logs.PolicerRoutineStopped)
}
func (p *Policer) shardPolicyWorker(ctx context.Context) {
-   var (
-       addrs []objectcore.AddressWithType
-       cursor *engine.Cursor
-       err error
-   )
    for {
        select {
        case <-ctx.Done():
@@ -34,7 +24,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
        default:
        }
-       addrs, cursor, err = p.jobQueue.Select(ctx, cursor, p.batchSize)
+       addrs, err := p.keySpaceIterator.Next(ctx, p.batchSize)
        if err != nil {
            if errors.Is(err, engine.ErrEndOfListing) {
                time.Sleep(time.Second) // finished whole cycle, sleep a bit
@@ -55,7 +45,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
            continue
        }
-       err = p.taskPool.Submit(func() {
+       err := p.taskPool.Submit(func() {
            v, ok := p.cache.Get(addr.Address)
            if ok && time.Since(v) < p.evictDuration {
                return
@@ -78,10 +68,10 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
func (p *Policer) poolCapacityWorker(ctx context.Context) {
    ticker := time.NewTicker(p.rebalanceFreq)
+   defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
-           ticker.Stop()
            return
        case <-ticker.C:
            frostfsSysLoad := p.loader.ObjectServiceLoad()

View file

@@ -1,26 +0,0 @@
package policer
import (
"context"
"fmt"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
)
type jobQueue struct {
localStorage *engine.StorageEngine
}
func (q *jobQueue) Select(ctx context.Context, cursor *engine.Cursor, count uint32) ([]objectcore.AddressWithType, *engine.Cursor, error) {
var prm engine.ListWithCursorPrm
prm.WithCursor(cursor)
prm.WithCount(count)
res, err := q.localStorage.ListWithCursor(ctx, prm)
if err != nil {
return nil, nil, fmt.Errorf("cannot list objects in engine: %w", err)
}
return res.AddressList(), res.Cursor(), nil
}

View file

@@ -24,16 +24,16 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
    defer p.metrics.DecInFlightRequest()
    defer func() {
        p.log.Debug(logs.ReplicatorFinishWork,
-           zap.Uint32("amount of unfinished replicas", task.quantity),
+           zap.Uint32("amount of unfinished replicas", task.NumCopies),
        )
    }()
-   if task.obj == nil {
+   if task.Obj == nil {
        var err error
-       task.obj, err = engine.Get(ctx, p.localStorage, task.addr)
+       task.Obj, err = engine.Get(ctx, p.localStorage, task.Addr)
        if err != nil {
            p.log.Error(logs.ReplicatorCouldNotGetObjectFromLocalStorage,
-               zap.Stringer("object", task.addr),
+               zap.Stringer("object", task.Addr),
                zap.Error(err))
            return
@@ -41,9 +41,9 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
    }
    prm := new(putsvc.RemotePutPrm).
-       WithObject(task.obj)
-   for i := 0; task.quantity > 0 && i < len(task.nodes); i++ {
+       WithObject(task.Obj)
+   for i := 0; task.NumCopies > 0 && i < len(task.Nodes); i++ {
        select {
        case <-ctx.Done():
            return
@@ -51,13 +51,13 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
        }
        log := p.log.With(
-           zap.String("node", netmap.StringifyPublicKey(task.nodes[i])),
-           zap.Stringer("object", task.addr),
+           zap.String("node", netmap.StringifyPublicKey(task.Nodes[i])),
+           zap.Stringer("object", task.Addr),
        )
        callCtx, cancel := context.WithTimeout(ctx, p.putTimeout)
-       err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i]))
+       err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.Nodes[i]))
        cancel()
@@ -68,12 +68,12 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
        } else {
            log.Debug(logs.ReplicatorObjectSuccessfullyReplicated)
-           task.quantity--
-           res.SubmitSuccessfulReplication(task.nodes[i])
+           task.NumCopies--
+           res.SubmitSuccessfulReplication(task.Nodes[i])
            p.metrics.IncProcessedObjects()
-           p.metrics.AddPayloadSize(int64(task.obj.PayloadSize()))
+           p.metrics.AddPayloadSize(int64(task.Obj.PayloadSize()))
        }
    }
}

View file

@@ -8,31 +8,12 @@ import (
// Task represents group of Replicator task parameters.
type Task struct {
-   quantity uint32
-   addr oid.Address
-   obj *objectSDK.Object
-   nodes []netmap.NodeInfo
+   // NumCopies is the number of copies to replicate.
+   NumCopies uint32
+   // Addr is the address of the local object.
+   Addr oid.Address
+   // Obj is the object to avoid fetching it from the local storage.
+   Obj *objectSDK.Object
+   // Nodes is a list of potential object holders.
+   Nodes []netmap.NodeInfo
}
-// SetCopiesNumber sets number of copies to replicate.
-func (t *Task) SetCopiesNumber(v uint32) {
-   t.quantity = v
-}
-// SetObjectAddress sets address of local object.
-func (t *Task) SetObjectAddress(v oid.Address) {
-   t.addr = v
-}
-// SetObject sets object to avoid fetching it from the local storage.
-func (t *Task) SetObject(obj *objectSDK.Object) {
-   t.obj = obj
-}
-// SetNodes sets a list of potential object holders.
-func (t *Task) SetNodes(v []netmap.NodeInfo) {
-   t.nodes = v
-}