[#92] Refactor policer and add some unit tests #485

Merged
fyrchik merged 3 commits from ale64bit/frostfs-node:fix/policer-refactor-test into master 2023-07-03 07:05:33 +00:00
13 changed files with 655 additions and 324 deletions

View file

@ -0,0 +1,28 @@
package main
import (
"context"
"fmt"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
)
type keySpaceIterator struct {
ng *engine.StorageEngine
cur *engine.Cursor
}
func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objectcore.AddressWithType, error) {
var prm engine.ListWithCursorPrm
prm.WithCursor(it.cur)
prm.WithCount(batchSize)
res, err := it.ng.ListWithCursor(ctx, prm)
if err != nil {
return nil, fmt.Errorf("cannot list objects in engine: %w", err)
}
it.cur = res.Cursor()
return res.AddressList(), nil
}

View file

@ -38,6 +38,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
eaclSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
@ -211,15 +212,30 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
ls := c.cfgObject.cfgLocalStorage.localStorage
buryFn := func(ctx context.Context, addr oid.Address) error {
var prm engine.InhumePrm
prm.MarkAsGarbage(addr)
prm.WithForceRemoval()
_, err := ls.Inhume(ctx, prm)
return err
}
remoteHeader := headsvc.NewRemoteHeader(keyStorage, clientConstructor)
pol := policer.New(
policer.WithLogger(c.log),
policer.WithLocalStorage(ls),
policer.WithKeySpaceIterator(&keySpaceIterator{ng: ls}),
policer.WithBuryFunc(buryFn),
policer.WithContainerSource(c.cfgObject.cnrSource),
policer.WithPlacementBuilder(
placement.NewNetworkMapSourceBuilder(c.netMapSource),
),
policer.WithRemoteHeader(
headsvc.NewRemoteHeader(keyStorage, clientConstructor),
policer.WithRemoteObjectHeaderFunc(
func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
prm := new(headsvc.RemoteHeadPrm).WithNodeInfo(ni).WithObjectAddress(a)
return remoteHeader.Head(ctx, prm)
},
),
policer.WithNetmapKeys(c),
policer.WithHeadTimeout(

View file

@ -84,11 +84,12 @@ func (s *Server) replicate(ctx context.Context, addr oid.Address, obj *objectSDK
}
var res replicatorResult
var task replicator.Task
task.SetObject(obj)
task.SetObjectAddress(addr)
task.SetCopiesNumber(1)
task.SetNodes(nodes)
task := replicator.Task{
NumCopies: 1,
Addr: addr,
Obj: obj,
Nodes: nodes,
}
s.replicator.HandleTask(ctx, task, &res)
if res.count == 0 {

View file

@ -7,8 +7,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
headsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/head"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -18,55 +16,6 @@ import (
"go.uber.org/zap"
)
// tracks Policer's check progress.
type nodeCache map[uint64]bool
func newNodeCache() *nodeCache {
m := make(map[uint64]bool)
return (*nodeCache)(&m)
}
func (n *nodeCache) set(node netmap.NodeInfo, val bool) {
(*n)[node.Hash()] = val
}
// submits storage node as a candidate to store the object replica in case of
// shortage.
func (n *nodeCache) submitReplicaCandidate(node netmap.NodeInfo) {
n.set(node, false)
}
// submits storage node as a current object replica holder.
func (n *nodeCache) submitReplicaHolder(node netmap.NodeInfo) {
n.set(node, true)
}
// processStatus returns current processing status of the storage node
//
// >0 if node does not currently hold the object
// 0 if node already holds the object
// <0 if node has not been processed yet
func (n *nodeCache) processStatus(node netmap.NodeInfo) int8 {
val, ok := (*n)[node.Hash()]
if !ok {
return -1
}
if val {
return 0
}
return 1
}
// SubmitSuccessfulReplication marks given storage node as a current object
// replica holder.
//
// SubmitSuccessfulReplication implements replicator.TaskResult.
func (n *nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) {
n.submitReplicaHolder(node)
}
func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) {
addr := addrWithType.Address
idCnr := addr.Container()
@ -79,11 +28,7 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
zap.String("error", err.Error()),
)
if container.IsErrNotFound(err) {
var prm engine.InhumePrm
prm.MarkAsGarbage(addrWithType.Address)
prm.WithForceRemoval()
_, err := p.jobQueue.localStorage.Inhume(ctx, prm)
err := p.buryFn(ctx, addrWithType.Address)
if err != nil {
p.log.Error(logs.PolicerCouldNotInhumeObjectWithMissingContainer,
zap.Stringer("cid", idCnr),
@ -145,10 +90,9 @@ type placementRequirements struct {
}
func (p *Policer) processNodes(ctx context.Context, requirements *placementRequirements, addrWithType objectcore.AddressWithType,
nodes []netmap.NodeInfo, shortage uint32, checkedNodes *nodeCache) {
nodes []netmap.NodeInfo, shortage uint32, checkedNodes nodeCache) {
addr := addrWithType.Address
typ := addrWithType.Type
prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr)
// Number of copies that are stored on maintenance nodes.
var uncheckedCopies int
@ -175,8 +119,8 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
} else if nodes[i].IsMaintenance() {
shortage, uncheckedCopies = p.handleMaintenance(nodes[i], checkedNodes, shortage, uncheckedCopies)
} else {
if status := checkedNodes.processStatus(nodes[i]); status >= 0 {
if status == 0 {
if status := checkedNodes.processStatus(nodes[i]); status.Processed() {
if status == nodeHoldsObject {
// node already contains replica, no need to replicate
nodes = append(nodes[:i], nodes[i+1:]...)
i--
@ -188,7 +132,7 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
_, err := p.remoteHeader.Head(callCtx, prm.WithNodeInfo(nodes[i]))
_, err := p.remoteHeader(callCtx, nodes[i], addr)
cancel()
@ -224,7 +168,7 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
// prevent spam with new replicas.
// However, additional copies should not be removed in this case,
// because we can remove the only copy this way.
func (p *Policer) handleMaintenance(node netmap.NodeInfo, checkedNodes *nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) {
func (p *Policer) handleMaintenance(node netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) {
checkedNodes.submitReplicaHolder(node)
shortage--
uncheckedCopies++
@ -236,25 +180,29 @@ func (p *Policer) handleMaintenance(node netmap.NodeInfo, checkedNodes *nodeCach
}
func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address, requirements *placementRequirements,
nodes []netmap.NodeInfo, checkedNodes *nodeCache, shortage uint32, uncheckedCopies int) {
if shortage > 0 {
nodes []netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) {
switch {
case shortage > 0:
p.log.Debug(logs.PolicerShortageOfObjectCopiesDetected,
zap.Stringer("object", addr),
zap.Uint32("shortage", shortage),
)
var task replicator.Task
task.SetObjectAddress(addr)
task.SetNodes(nodes)
task.SetCopiesNumber(shortage)
task := replicator.Task{
NumCopies: shortage,
Addr: addr,
Nodes: nodes,
}
p.replicator.HandleTask(ctx, task, checkedNodes)
} else if uncheckedCopies > 0 {
case uncheckedCopies > 0:
// If we have more copies than needed, but some of them are from the maintenance nodes,
// save the local copy.
p.log.Debug(logs.PolicerSomeOfTheCopiesAreStoredOnNodesUnderMaintenance,
zap.Int("count", uncheckedCopies))
} else if uncheckedCopies == 0 {
case uncheckedCopies == 0:
// Safe to remove: checked all copies, shortage == 0.
requirements.removeLocalCopy = true
}

View file

@ -11,14 +11,14 @@ func TestNodeCache(t *testing.T) {
cache := newNodeCache()
node := netmaptest.NodeInfo()
require.Negative(t, cache.processStatus(node))
require.Equal(t, cache.processStatus(node), nodeNotProcessed)
cache.SubmitSuccessfulReplication(node)
require.Zero(t, cache.processStatus(node))
require.Equal(t, cache.processStatus(node), nodeHoldsObject)
cache.submitReplicaCandidate(node)
require.Positive(t, cache.processStatus(node))
require.Equal(t, cache.processStatus(node), nodeDoesNotHoldObject)
cache.submitReplicaHolder(node)
require.Zero(t, cache.processStatus(node))
require.Equal(t, cache.processStatus(node), nodeHoldsObject)
}

View file

@ -0,0 +1,57 @@
package policer
import "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
type nodeProcessStatus int8
const (
nodeNotProcessed nodeProcessStatus = iota
nodeDoesNotHoldObject
nodeHoldsObject
)
func (st nodeProcessStatus) Processed() bool {
return st != nodeNotProcessed
}
// nodeCache tracks Policer's check progress.
type nodeCache map[uint64]bool
func newNodeCache() nodeCache {
return make(map[uint64]bool)
}
func (n nodeCache) set(node netmap.NodeInfo, val bool) {
n[node.Hash()] = val
}
// submits storage node as a candidate to store the object replica in case of
// shortage.
func (n nodeCache) submitReplicaCandidate(node netmap.NodeInfo) {
n.set(node, false)
}
// submits storage node as a current object replica holder.
func (n nodeCache) submitReplicaHolder(node netmap.NodeInfo) {
n.set(node, true)
}
// processStatus returns current processing status of the storage node.
func (n nodeCache) processStatus(node netmap.NodeInfo) nodeProcessStatus {
switch val, ok := n[node.Hash()]; {
case !ok:
return nodeNotProcessed
case val:
return nodeHoldsObject
default:
return nodeDoesNotHoldObject
}
}
// SubmitSuccessfulReplication marks given storage node as a current object
// replica holder.
//
// SubmitSuccessfulReplication implements replicator.TaskResult.
func (n nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) {
n.submitReplicaHolder(node)
}

View file

@ -0,0 +1,185 @@
package policer
import (
"context"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
// KeySpaceIterator is the interface that allows iterating over the key space
// of local storage.
// Note that the underlying implementation might be circular: i.e. it can restart
// when the end of the key space is reached.
type KeySpaceIterator interface {
Next(context.Context, uint32) ([]objectcore.AddressWithType, error)
}
// RedundantCopyCallback is a callback to pass
// the redundant local copy of the object.
type RedundantCopyCallback func(context.Context, oid.Address)
// BuryFunc is the function to bury (i.e. inhume) an object.
type BuryFunc func(context.Context, oid.Address) error
// Replicator is the interface to a consumer of replication tasks.
type Replicator interface {
HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
}
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
// NodeLoader provides application load statistics.
type nodeLoader interface {
// ObjectServiceLoad returns object service load value in [0:1] range.
ObjectServiceLoad() float64
}
type cfg struct {
headTimeout time.Duration
log *logger.Logger
keySpaceIterator KeySpaceIterator
buryFn BuryFunc
cnrSrc container.Source
placementBuilder placement.Builder
remoteHeader RemoteObjectHeaderFunc
netmapKeys netmap.AnnouncedKeys
replicator Replicator
cbRedundantCopy RedundantCopyCallback
taskPool *ants.Pool
loader nodeLoader
maxCapacity int
batchSize, cacheSize uint32
rebalanceFreq, evictDuration time.Duration
}
func defaultCfg() *cfg {
return &cfg{
log: &logger.Logger{Logger: zap.L()},
batchSize: 10,
cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB
rebalanceFreq: 1 * time.Second,
evictDuration: 30 * time.Second,
}
}
// Option is an option for Policer constructor.
type Option func(*cfg)
// WithHeadTimeout returns option to set Head timeout of Policer.
func WithHeadTimeout(v time.Duration) Option {
return func(c *cfg) {
c.headTimeout = v
}
}
// WithLogger returns option to set Logger of Policer.
func WithLogger(v *logger.Logger) Option {
return func(c *cfg) {
c.log = v
}
}
func WithKeySpaceIterator(it KeySpaceIterator) Option {
return func(c *cfg) {
c.keySpaceIterator = it
}
}
func WithBuryFunc(f BuryFunc) Option {
return func(c *cfg) {
c.buryFn = f
}
}
// WithContainerSource returns option to set container source of Policer.
func WithContainerSource(v container.Source) Option {
return func(c *cfg) {
c.cnrSrc = v
}
}
// WithPlacementBuilder returns option to set object placement builder of Policer.
func WithPlacementBuilder(v placement.Builder) Option {
return func(c *cfg) {
c.placementBuilder = v
}
}
// WithRemoteObjectHeader returns option to set object header receiver of Policer.
func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option {
return func(c *cfg) {
c.remoteHeader = v
}
}
// WithNetmapKeys returns option to set tool to work with announced public keys.
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
return func(c *cfg) {
c.netmapKeys = v
}
}
// WithReplicator returns option to set object replicator of Policer.
func WithReplicator(v Replicator) Option {
return func(c *cfg) {
c.replicator = v
}
}
// WithRedundantCopyCallback returns option to set
// callback to pass redundant local object copies
// detected by Policer.
func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
return func(c *cfg) {
c.cbRedundantCopy = cb
}
}
// WithMaxCapacity returns option to set max capacity
// that can be set to the pool.
func WithMaxCapacity(capacity int) Option {
return func(c *cfg) {
c.maxCapacity = capacity
}
}
// WithPool returns option to set pool for
// policy and replication operations.
func WithPool(p *ants.Pool) Option {
return func(c *cfg) {
c.taskPool = p
}
}
// WithNodeLoader returns option to set FrostFS node load source.
func WithNodeLoader(l nodeLoader) Option {
return func(c *cfg) {
c.loader = l
}
}

View file

@ -1,52 +1,40 @@
package policer
import (
"context"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
headsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/head"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
// NodeLoader provides application load statistics.
type nodeLoader interface {
// ObjectServiceLoad returns object service load value in [0:1] range.
ObjectServiceLoad() float64
}
type objectsInWork struct {
m sync.RWMutex
sync.RWMutex
objs map[oid.Address]struct{}
}
func (oiw *objectsInWork) inWork(addr oid.Address) bool {
oiw.m.RLock()
oiw.RLock()
_, ok := oiw.objs[addr]
oiw.m.RUnlock()
oiw.RUnlock()
return ok
}
func (oiw *objectsInWork) remove(addr oid.Address) {
oiw.m.Lock()
oiw.Lock()
delete(oiw.objs, addr)
oiw.m.Unlock()
oiw.Unlock()
}
func (oiw *objectsInWork) add(addr oid.Address) {
oiw.m.Lock()
func (oiw *objectsInWork) add(addr oid.Address) bool {
oiw.Lock()
_, exists := oiw.objs[addr]
oiw.objs[addr] = struct{}{}
oiw.m.Unlock()
oiw.Unlock()
return !exists
}
// Policer represents the utility that verifies
@ -59,53 +47,6 @@ type Policer struct {
objsInWork *objectsInWork
}
// Option is an option for Policer constructor.
type Option func(*cfg)
// RedundantCopyCallback is a callback to pass
// the redundant local copy of the object.
type RedundantCopyCallback func(context.Context, oid.Address)
type cfg struct {
headTimeout time.Duration
log *logger.Logger
jobQueue jobQueue
cnrSrc container.Source
placementBuilder placement.Builder
remoteHeader *headsvc.RemoteHeader
netmapKeys netmap.AnnouncedKeys
replicator *replicator.Replicator
cbRedundantCopy RedundantCopyCallback
taskPool *ants.Pool
loader nodeLoader
maxCapacity int
batchSize, cacheSize uint32
rebalanceFreq, evictDuration time.Duration
}
func defaultCfg() *cfg {
return &cfg{
log: &logger.Logger{Logger: zap.L()},
batchSize: 10,
cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB
rebalanceFreq: 1 * time.Second,
evictDuration: 30 * time.Second,
}
}
// New creates, initializes and returns Policer instance.
func New(opts ...Option) *Policer {
c := defaultCfg()
@ -129,91 +70,3 @@ func New(opts ...Option) *Policer {
},
}
}
// WithHeadTimeout returns option to set Head timeout of Policer.
func WithHeadTimeout(v time.Duration) Option {
return func(c *cfg) {
c.headTimeout = v
}
}
// WithLogger returns option to set Logger of Policer.
func WithLogger(v *logger.Logger) Option {
return func(c *cfg) {
c.log = v
}
}
// WithLocalStorage returns option to set local object storage of Policer.
func WithLocalStorage(v *engine.StorageEngine) Option {
return func(c *cfg) {
c.jobQueue.localStorage = v
}
}
// WithContainerSource returns option to set container source of Policer.
func WithContainerSource(v container.Source) Option {
return func(c *cfg) {
c.cnrSrc = v
}
}
// WithPlacementBuilder returns option to set object placement builder of Policer.
func WithPlacementBuilder(v placement.Builder) Option {
return func(c *cfg) {
c.placementBuilder = v
}
}
// WithRemoteHeader returns option to set object header receiver of Policer.
func WithRemoteHeader(v *headsvc.RemoteHeader) Option {
return func(c *cfg) {
c.remoteHeader = v
}
}
// WithNetmapKeys returns option to set tool to work with announced public keys.
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
return func(c *cfg) {
c.netmapKeys = v
}
}
// WithReplicator returns option to set object replicator of Policer.
func WithReplicator(v *replicator.Replicator) Option {
return func(c *cfg) {
c.replicator = v
}
}
// WithRedundantCopyCallback returns option to set
// callback to pass redundant local object copies
// detected by Policer.
func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
return func(c *cfg) {
c.cbRedundantCopy = cb
}
}
// WithMaxCapacity returns option to set max capacity
// that can be set to the pool.
func WithMaxCapacity(capacity int) Option {
return func(c *cfg) {
c.maxCapacity = capacity
}
}
// WithPool returns option to set pool for
// policy and replication operations.
func WithPool(p *ants.Pool) Option {
return func(c *cfg) {
c.taskPool = p
}
}
// WithNodeLoader returns option to set FrostFS node load source.
func WithNodeLoader(l nodeLoader) Option {
return func(c *cfg) {
c.loader = l
}
}

View file

@ -0,0 +1,299 @@
package policer
import (
"bytes"
"context"
"errors"
"sort"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
)
func TestBuryObjectWithoutContainer(t *testing.T) {
// Key space
addr := oidtest.Address()
objs := []objectcore.AddressWithType{
{
Address: addr,
Type: object.TypeRegular,
},
}
// Container source and bury function
buryCh := make(chan oid.Address)
containerSrc := func(id cid.ID) (*container.Container, error) {
return nil, apistatus.ContainerNotFound{}
}
buryFn := func(ctx context.Context, a oid.Address) error {
buryCh <- a
return nil
}
// Task pool
pool, err := ants.NewPool(4)
require.NoError(t, err)
// Policer instance
p := New(
WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}),
WithContainerSource(containerSrcFunc(containerSrc)),
WithBuryFunc(buryFn),
WithPool(pool),
WithNodeLoader(constNodeLoader(0)),
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go p.Run(ctx)
require.Equal(t, addr, <-buryCh)
}
func TestProcessObject(t *testing.T) {
// Notes:
// - nodes are referred to by their index throughout, which is embedded in the public key
// - node with index 0 always refers to the local node, so there's no need to add it to objHolders
// - policy is used only to match the number of replicas for each index in the placement
tests := []struct {
desc string
objType object.Type
nodeCount int
policy string
placement [][]int
objHolders []int
maintenanceNodes []int
wantRemoveRedundant bool
wantReplicateTo []int
}{
{
desc: "1 copy already held by local node",
nodeCount: 1,
policy: `REP 1`,
placement: [][]int{{0}},
},
{
desc: "1 copy already held by the remote node",
nodeCount: 2,
policy: `REP 1`,
placement: [][]int{{1}},
objHolders: []int{1},
wantRemoveRedundant: true,
},
{
desc: "1 copy not yet held by the remote node",
nodeCount: 2,
policy: `REP 1`,
placement: [][]int{{1}},
wantReplicateTo: []int{1},
},
{
desc: "2 copies already held by local and remote node",
nodeCount: 2,
policy: `REP 2`,
placement: [][]int{{0, 1}},
objHolders: []int{1},
},
{
desc: "2 copies but not held by remote node",
nodeCount: 2,
policy: `REP 2`,
placement: [][]int{{0, 1}},
wantReplicateTo: []int{1},
},
{
desc: "multiple vectors already held by remote node",
nodeCount: 2,
policy: `REP 2 REP 2`,
placement: [][]int{{0, 1}, {0, 1}},
objHolders: []int{1},
},
{
desc: "multiple vectors not yet held by remote node",
nodeCount: 2,
policy: `REP 2 REP 2`,
placement: [][]int{{0, 1}, {0, 1}},
wantReplicateTo: []int{1, 1}, // is this actually good?
},
{
desc: "lock object must be replicated to all nodes",
objType: object.TypeLock,
nodeCount: 3,
policy: `REP 1`,
placement: [][]int{{0, 1, 2}},
wantReplicateTo: []int{1, 2},
},
{
desc: "preserve local copy when maintenance nodes exist",
nodeCount: 3,
policy: `REP 2`,
placement: [][]int{{1, 2}},
objHolders: []int{1},
maintenanceNodes: []int{2},
},
}
for i := range tests {
ti := tests[i]
t.Run(ti.desc, func(t *testing.T) {
addr := oidtest.Address()
// Netmap, placement policy and placement builder
nodes := make([]netmap.NodeInfo, ti.nodeCount)
for i := range nodes {
nodes[i].SetPublicKey([]byte{byte(i)})
}
for _, i := range ti.maintenanceNodes {
nodes[i].SetMaintenance()
}
var policy netmap.PlacementPolicy
require.NoError(t, policy.DecodeString(ti.policy))
placementVectors := make([][]netmap.NodeInfo, len(ti.placement))
for i, pv := range ti.placement {
for _, nj := range pv {
placementVectors[i] = append(placementVectors[i], nodes[nj])
}
}
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
if cnr.Equals(addr.Container()) && obj != nil && obj.Equals(addr.Object()) {
return placementVectors, nil
}
t.Errorf("unexpected placement build: cid=%v oid=%v", cnr, obj)
return nil, errors.New("unexpected placement build")
}
// Object remote header
headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address) (*object.Object, error) {
index := int(ni.PublicKey()[0])
if a != addr || index < 1 || index >= ti.nodeCount {
t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a)
return nil, errors.New("unexpected object head")
}
for _, i := range ti.objHolders {
if index == i {
return nil, nil
}
}
return nil, apistatus.ObjectNotFound{}
}
// Container source
cnr := &container.Container{}
cnr.Value.Init()
cnr.Value.SetPlacementPolicy(policy)
containerSrc := func(id cid.ID) (*container.Container, error) {
if id.Equals(addr.Container()) {
return cnr, nil
}
t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container())
return nil, apistatus.ContainerNotFound{}
}
buryFn := func(ctx context.Context, a oid.Address) error {
t.Errorf("unexpected object buried: %v", a)
return nil
}
// Policer instance
var gotRemoveRedundant bool
var gotReplicateTo []int
p := New(
WithContainerSource(containerSrcFunc(containerSrc)),
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
return bytes.Equal(k, nodes[0].PublicKey())
})),
WithRemoteObjectHeaderFunc(headFn),
WithBuryFunc(buryFn),
WithRedundantCopyCallback(func(_ context.Context, a oid.Address) {
require.True(t, eqAddr(a, addr), "unexpected redundant copy callback: a=%v", a)
gotRemoveRedundant = true
}),
WithReplicator(replicatorFunc(func(_ context.Context, task replicator.Task, res replicator.TaskResult) {
require.True(t, eqAddr(task.Addr, addr), "unexpected replicator task: %+v", task)
for _, node := range task.Nodes {
gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
}
})),
)
addrWithType := objectcore.AddressWithType{
Address: addr,
Type: ti.objType,
}
p.processObject(context.Background(), addrWithType)
sort.Ints(gotReplicateTo)
require.Equal(t, ti.wantRemoveRedundant, gotRemoveRedundant)
require.Equal(t, ti.wantReplicateTo, gotReplicateTo)
})
}
}
// TODO(https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/issues/101)
func eqAddr(a, b oid.Address) bool {
return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object())
}
// sliceKeySpaceIterator is a KeySpaceIterator backed by a slice.
type sliceKeySpaceIterator struct {
objs []objectcore.AddressWithType
cur int
}
func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) {
if it.cur >= len(it.objs) {
it.cur = 0
return nil, engine.ErrEndOfListing
}
end := it.cur + int(size)
if end > len(it.objs) {
end = len(it.objs)
}
ret := it.objs[it.cur:end]
it.cur = end
return ret, nil
}
// containerSrcFunc is a container.Source backed by a function.
type containerSrcFunc func(cid.ID) (*container.Container, error)
func (f containerSrcFunc) Get(id cid.ID) (*container.Container, error) { return f(id) }
// placementBuilderFunc is a placement.Builder backed by a function
type placementBuilderFunc func(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
func (f placementBuilderFunc) BuildPlacement(c cid.ID, o *oid.ID, p netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
return f(c, o, p)
}
// announcedKeysFunc is a netmap.AnnouncedKeys backed by a function.
type announcedKeysFunc func([]byte) bool
func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) }
// constNodeLoader is a nodeLoader that always returns a fixed value.
type constNodeLoader float64
func (f constNodeLoader) ObjectServiceLoad() float64 { return float64(f) }
// replicatorFunc is a Replicator backed by a function.
type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult)
func (f replicatorFunc) HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) {
f(ctx, task, res)
}

View file

@ -6,27 +6,17 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"go.uber.org/zap"
)
func (p *Policer) Run(ctx context.Context) {
defer func() {
p.log.Info(logs.PolicerRoutineStopped)
}()
go p.poolCapacityWorker(ctx)
p.shardPolicyWorker(ctx)
p.log.Info(logs.PolicerRoutineStopped)
}
func (p *Policer) shardPolicyWorker(ctx context.Context) {
var (
addrs []objectcore.AddressWithType
cursor *engine.Cursor
err error
)
for {
select {
case <-ctx.Done():
@ -34,7 +24,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
default:
}
addrs, cursor, err = p.jobQueue.Select(ctx, cursor, p.batchSize)
addrs, err := p.keySpaceIterator.Next(ctx, p.batchSize)
if err != nil {
if errors.Is(err, engine.ErrEndOfListing) {
time.Sleep(time.Second) // finished whole cycle, sleep a bit
@ -55,18 +45,17 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
continue
}
err = p.taskPool.Submit(func() {
err := p.taskPool.Submit(func() {
v, ok := p.cache.Get(addr.Address)
if ok && time.Since(v) < p.evictDuration {
return
}
p.objsInWork.add(addr.Address)
p.processObject(ctx, addr)
p.cache.Add(addr.Address, time.Now())
p.objsInWork.remove(addr.Address)
if p.objsInWork.add(addr.Address) {
p.processObject(ctx, addr)
p.cache.Add(addr.Address, time.Now())
p.objsInWork.remove(addr.Address)
}
})
if err != nil {
p.log.Warn(logs.PolicerPoolSubmission, zap.Error(err))
@ -78,10 +67,10 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
func (p *Policer) poolCapacityWorker(ctx context.Context) {
ticker := time.NewTicker(p.rebalanceFreq)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
frostfsSysLoad := p.loader.ObjectServiceLoad()

View file

@ -1,26 +0,0 @@
package policer
import (
"context"
"fmt"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
)
type jobQueue struct {
localStorage *engine.StorageEngine
}
func (q *jobQueue) Select(ctx context.Context, cursor *engine.Cursor, count uint32) ([]objectcore.AddressWithType, *engine.Cursor, error) {
var prm engine.ListWithCursorPrm
prm.WithCursor(cursor)
prm.WithCount(count)
res, err := q.localStorage.ListWithCursor(ctx, prm)
if err != nil {
return nil, nil, fmt.Errorf("cannot list objects in engine: %w", err)
}
return res.AddressList(), res.Cursor(), nil
}

View file

@ -24,16 +24,16 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
defer p.metrics.DecInFlightRequest()
defer func() {
p.log.Debug(logs.ReplicatorFinishWork,
zap.Uint32("amount of unfinished replicas", task.quantity),
zap.Uint32("amount of unfinished replicas", task.NumCopies),
)
}()
if task.obj == nil {
if task.Obj == nil {
var err error
task.obj, err = engine.Get(ctx, p.localStorage, task.addr)
task.Obj, err = engine.Get(ctx, p.localStorage, task.Addr)
if err != nil {
p.log.Error(logs.ReplicatorCouldNotGetObjectFromLocalStorage,
zap.Stringer("object", task.addr),
zap.Stringer("object", task.Addr),
zap.Error(err))
return
@ -41,9 +41,9 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
}
prm := new(putsvc.RemotePutPrm).
WithObject(task.obj)
WithObject(task.Obj)
for i := 0; task.quantity > 0 && i < len(task.nodes); i++ {
for i := 0; task.NumCopies > 0 && i < len(task.Nodes); i++ {
select {
case <-ctx.Done():
return
@ -51,13 +51,13 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
}
log := p.log.With(
zap.String("node", netmap.StringifyPublicKey(task.nodes[i])),
zap.Stringer("object", task.addr),
zap.String("node", netmap.StringifyPublicKey(task.Nodes[i])),
zap.Stringer("object", task.Addr),
)
callCtx, cancel := context.WithTimeout(ctx, p.putTimeout)
err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i]))
err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.Nodes[i]))
cancel()
@ -68,12 +68,12 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
} else {
log.Debug(logs.ReplicatorObjectSuccessfullyReplicated)
task.quantity--
task.NumCopies--
res.SubmitSuccessfulReplication(task.nodes[i])
res.SubmitSuccessfulReplication(task.Nodes[i])
p.metrics.IncProcessedObjects()
p.metrics.AddPayloadSize(int64(task.obj.PayloadSize()))
p.metrics.AddPayloadSize(int64(task.Obj.PayloadSize()))
}
}
}

View file

@ -8,31 +8,12 @@ import (
// Task represents group of Replicator task parameters.
type Task struct {
quantity uint32
addr oid.Address
obj *objectSDK.Object
nodes []netmap.NodeInfo
}
// SetCopiesNumber sets number of copies to replicate.
func (t *Task) SetCopiesNumber(v uint32) {
t.quantity = v
}
// SetObjectAddress sets address of local object.
func (t *Task) SetObjectAddress(v oid.Address) {
t.addr = v
}
// SetObject sets object to avoid fetching it from the local storage.
func (t *Task) SetObject(obj *objectSDK.Object) {
t.obj = obj
}
// SetNodes sets a list of potential object holders.
func (t *Task) SetNodes(v []netmap.NodeInfo) {
t.nodes = v
// NumCopies is the number of copies to replicate.
NumCopies uint32
// Addr is the address of the local object.
Addr oid.Address
// Obj is the object to avoid fetching it from the local storage.
Obj *objectSDK.Object
// Nodes is a list of potential object holders.
Nodes []netmap.NodeInfo
}