[#92] Refactor policer and add some unit tests #485
13 changed files with 655 additions and 324 deletions
28
cmd/frostfs-node/keyspaceiterator.go
Normal file
28
cmd/frostfs-node/keyspaceiterator.go
Normal file
|
@ -0,0 +1,28 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
)
|
||||
|
||||
type keySpaceIterator struct {
|
||||
ng *engine.StorageEngine
|
||||
cur *engine.Cursor
|
||||
}
|
||||
|
||||
func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objectcore.AddressWithType, error) {
|
||||
var prm engine.ListWithCursorPrm
|
||||
prm.WithCursor(it.cur)
|
||||
prm.WithCount(batchSize)
|
||||
|
||||
res, err := it.ng.ListWithCursor(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot list objects in engine: %w", err)
|
||||
}
|
||||
|
||||
it.cur = res.Cursor()
|
||||
return res.AddressList(), nil
|
||||
}
|
|
@ -38,6 +38,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
eaclSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
|
@ -211,15 +212,30 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
|
|||
|
||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||
|
||||
buryFn := func(ctx context.Context, addr oid.Address) error {
|
||||
var prm engine.InhumePrm
|
||||
prm.MarkAsGarbage(addr)
|
||||
prm.WithForceRemoval()
|
||||
|
||||
_, err := ls.Inhume(ctx, prm)
|
||||
return err
|
||||
}
|
||||
|
||||
remoteHeader := headsvc.NewRemoteHeader(keyStorage, clientConstructor)
|
||||
|
||||
pol := policer.New(
|
||||
policer.WithLogger(c.log),
|
||||
policer.WithLocalStorage(ls),
|
||||
policer.WithKeySpaceIterator(&keySpaceIterator{ng: ls}),
|
||||
policer.WithBuryFunc(buryFn),
|
||||
policer.WithContainerSource(c.cfgObject.cnrSource),
|
||||
policer.WithPlacementBuilder(
|
||||
placement.NewNetworkMapSourceBuilder(c.netMapSource),
|
||||
),
|
||||
policer.WithRemoteHeader(
|
||||
headsvc.NewRemoteHeader(keyStorage, clientConstructor),
|
||||
policer.WithRemoteObjectHeaderFunc(
|
||||
func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
|
||||
prm := new(headsvc.RemoteHeadPrm).WithNodeInfo(ni).WithObjectAddress(a)
|
||||
return remoteHeader.Head(ctx, prm)
|
||||
},
|
||||
),
|
||||
policer.WithNetmapKeys(c),
|
||||
policer.WithHeadTimeout(
|
||||
|
|
|
@ -84,11 +84,12 @@ func (s *Server) replicate(ctx context.Context, addr oid.Address, obj *objectSDK
|
|||
}
|
||||
|
||||
var res replicatorResult
|
||||
var task replicator.Task
|
||||
task.SetObject(obj)
|
||||
task.SetObjectAddress(addr)
|
||||
task.SetCopiesNumber(1)
|
||||
task.SetNodes(nodes)
|
||||
task := replicator.Task{
|
||||
NumCopies: 1,
|
||||
Addr: addr,
|
||||
Obj: obj,
|
||||
Nodes: nodes,
|
||||
}
|
||||
s.replicator.HandleTask(ctx, task, &res)
|
||||
|
||||
if res.count == 0 {
|
||||
|
|
|
@ -7,8 +7,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
headsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/head"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
|
@ -18,55 +16,6 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// tracks Policer's check progress.
|
||||
type nodeCache map[uint64]bool
|
||||
|
||||
func newNodeCache() *nodeCache {
|
||||
m := make(map[uint64]bool)
|
||||
return (*nodeCache)(&m)
|
||||
}
|
||||
|
||||
func (n *nodeCache) set(node netmap.NodeInfo, val bool) {
|
||||
(*n)[node.Hash()] = val
|
||||
}
|
||||
|
||||
// submits storage node as a candidate to store the object replica in case of
|
||||
// shortage.
|
||||
func (n *nodeCache) submitReplicaCandidate(node netmap.NodeInfo) {
|
||||
n.set(node, false)
|
||||
}
|
||||
|
||||
// submits storage node as a current object replica holder.
|
||||
func (n *nodeCache) submitReplicaHolder(node netmap.NodeInfo) {
|
||||
n.set(node, true)
|
||||
}
|
||||
|
||||
// processStatus returns current processing status of the storage node
|
||||
//
|
||||
// >0 if node does not currently hold the object
|
||||
// 0 if node already holds the object
|
||||
// <0 if node has not been processed yet
|
||||
func (n *nodeCache) processStatus(node netmap.NodeInfo) int8 {
|
||||
val, ok := (*n)[node.Hash()]
|
||||
if !ok {
|
||||
return -1
|
||||
}
|
||||
|
||||
if val {
|
||||
return 0
|
||||
}
|
||||
|
||||
return 1
|
||||
}
|
||||
|
||||
// SubmitSuccessfulReplication marks given storage node as a current object
|
||||
// replica holder.
|
||||
//
|
||||
// SubmitSuccessfulReplication implements replicator.TaskResult.
|
||||
func (n *nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) {
|
||||
n.submitReplicaHolder(node)
|
||||
}
|
||||
|
||||
func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) {
|
||||
addr := addrWithType.Address
|
||||
idCnr := addr.Container()
|
||||
|
@ -79,11 +28,7 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
|
|||
zap.String("error", err.Error()),
|
||||
)
|
||||
if container.IsErrNotFound(err) {
|
||||
var prm engine.InhumePrm
|
||||
prm.MarkAsGarbage(addrWithType.Address)
|
||||
prm.WithForceRemoval()
|
||||
|
||||
_, err := p.jobQueue.localStorage.Inhume(ctx, prm)
|
||||
err := p.buryFn(ctx, addrWithType.Address)
|
||||
if err != nil {
|
||||
p.log.Error(logs.PolicerCouldNotInhumeObjectWithMissingContainer,
|
||||
zap.Stringer("cid", idCnr),
|
||||
|
@ -145,10 +90,9 @@ type placementRequirements struct {
|
|||
}
|
||||
|
||||
func (p *Policer) processNodes(ctx context.Context, requirements *placementRequirements, addrWithType objectcore.AddressWithType,
|
||||
nodes []netmap.NodeInfo, shortage uint32, checkedNodes *nodeCache) {
|
||||
nodes []netmap.NodeInfo, shortage uint32, checkedNodes nodeCache) {
|
||||
addr := addrWithType.Address
|
||||
typ := addrWithType.Type
|
||||
prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr)
|
||||
|
||||
// Number of copies that are stored on maintenance nodes.
|
||||
var uncheckedCopies int
|
||||
|
@ -175,8 +119,8 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
|
|||
} else if nodes[i].IsMaintenance() {
|
||||
shortage, uncheckedCopies = p.handleMaintenance(nodes[i], checkedNodes, shortage, uncheckedCopies)
|
||||
} else {
|
||||
if status := checkedNodes.processStatus(nodes[i]); status >= 0 {
|
||||
if status == 0 {
|
||||
if status := checkedNodes.processStatus(nodes[i]); status.Processed() {
|
||||
if status == nodeHoldsObject {
|
||||
// node already contains replica, no need to replicate
|
||||
nodes = append(nodes[:i], nodes[i+1:]...)
|
||||
i--
|
||||
|
@ -188,7 +132,7 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
|
|||
|
||||
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
||||
|
||||
_, err := p.remoteHeader.Head(callCtx, prm.WithNodeInfo(nodes[i]))
|
||||
_, err := p.remoteHeader(callCtx, nodes[i], addr)
|
||||
|
||||
cancel()
|
||||
|
||||
|
@ -224,7 +168,7 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
|
|||
// prevent spam with new replicas.
|
||||
// However, additional copies should not be removed in this case,
|
||||
// because we can remove the only copy this way.
|
||||
func (p *Policer) handleMaintenance(node netmap.NodeInfo, checkedNodes *nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) {
|
||||
func (p *Policer) handleMaintenance(node netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) {
|
||||
checkedNodes.submitReplicaHolder(node)
|
||||
shortage--
|
||||
uncheckedCopies++
|
||||
|
@ -236,25 +180,29 @@ func (p *Policer) handleMaintenance(node netmap.NodeInfo, checkedNodes *nodeCach
|
|||
}
|
||||
|
||||
func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address, requirements *placementRequirements,
|
||||
nodes []netmap.NodeInfo, checkedNodes *nodeCache, shortage uint32, uncheckedCopies int) {
|
||||
if shortage > 0 {
|
||||
nodes []netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) {
|
||||
switch {
|
||||
case shortage > 0:
|
||||
p.log.Debug(logs.PolicerShortageOfObjectCopiesDetected,
|
||||
zap.Stringer("object", addr),
|
||||
zap.Uint32("shortage", shortage),
|
||||
)
|
||||
|
||||
var task replicator.Task
|
||||
task.SetObjectAddress(addr)
|
||||
task.SetNodes(nodes)
|
||||
task.SetCopiesNumber(shortage)
|
||||
task := replicator.Task{
|
||||
NumCopies: shortage,
|
||||
Addr: addr,
|
||||
Nodes: nodes,
|
||||
}
|
||||
|
||||
p.replicator.HandleTask(ctx, task, checkedNodes)
|
||||
} else if uncheckedCopies > 0 {
|
||||
|
||||
case uncheckedCopies > 0:
|
||||
// If we have more copies than needed, but some of them are from the maintenance nodes,
|
||||
// save the local copy.
|
||||
p.log.Debug(logs.PolicerSomeOfTheCopiesAreStoredOnNodesUnderMaintenance,
|
||||
zap.Int("count", uncheckedCopies))
|
||||
} else if uncheckedCopies == 0 {
|
||||
|
||||
case uncheckedCopies == 0:
|
||||
// Safe to remove: checked all copies, shortage == 0.
|
||||
requirements.removeLocalCopy = true
|
||||
}
|
||||
|
|
|
@ -11,14 +11,14 @@ func TestNodeCache(t *testing.T) {
|
|||
cache := newNodeCache()
|
||||
node := netmaptest.NodeInfo()
|
||||
|
||||
require.Negative(t, cache.processStatus(node))
|
||||
require.Equal(t, cache.processStatus(node), nodeNotProcessed)
|
||||
|
||||
cache.SubmitSuccessfulReplication(node)
|
||||
require.Zero(t, cache.processStatus(node))
|
||||
require.Equal(t, cache.processStatus(node), nodeHoldsObject)
|
||||
|
||||
cache.submitReplicaCandidate(node)
|
||||
require.Positive(t, cache.processStatus(node))
|
||||
require.Equal(t, cache.processStatus(node), nodeDoesNotHoldObject)
|
||||
|
||||
cache.submitReplicaHolder(node)
|
||||
require.Zero(t, cache.processStatus(node))
|
||||
require.Equal(t, cache.processStatus(node), nodeHoldsObject)
|
||||
}
|
||||
|
|
57
pkg/services/policer/nodecache.go
Normal file
57
pkg/services/policer/nodecache.go
Normal file
|
@ -0,0 +1,57 @@
|
|||
package policer
|
||||
|
||||
import "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
|
||||
type nodeProcessStatus int8
|
||||
|
||||
const (
|
||||
nodeNotProcessed nodeProcessStatus = iota
|
||||
nodeDoesNotHoldObject
|
||||
nodeHoldsObject
|
||||
)
|
||||
|
||||
func (st nodeProcessStatus) Processed() bool {
|
||||
return st != nodeNotProcessed
|
||||
}
|
||||
|
||||
// nodeCache tracks Policer's check progress.
|
||||
type nodeCache map[uint64]bool
|
||||
|
||||
func newNodeCache() nodeCache {
|
||||
return make(map[uint64]bool)
|
||||
}
|
||||
|
||||
func (n nodeCache) set(node netmap.NodeInfo, val bool) {
|
||||
n[node.Hash()] = val
|
||||
}
|
||||
|
||||
// submits storage node as a candidate to store the object replica in case of
|
||||
// shortage.
|
||||
func (n nodeCache) submitReplicaCandidate(node netmap.NodeInfo) {
|
||||
n.set(node, false)
|
||||
}
|
||||
|
||||
// submits storage node as a current object replica holder.
|
||||
func (n nodeCache) submitReplicaHolder(node netmap.NodeInfo) {
|
||||
n.set(node, true)
|
||||
}
|
||||
|
||||
// processStatus returns current processing status of the storage node.
|
||||
func (n nodeCache) processStatus(node netmap.NodeInfo) nodeProcessStatus {
|
||||
switch val, ok := n[node.Hash()]; {
|
||||
case !ok:
|
||||
return nodeNotProcessed
|
||||
case val:
|
||||
return nodeHoldsObject
|
||||
default:
|
||||
return nodeDoesNotHoldObject
|
||||
}
|
||||
}
|
||||
|
||||
// SubmitSuccessfulReplication marks given storage node as a current object
|
||||
// replica holder.
|
||||
//
|
||||
// SubmitSuccessfulReplication implements replicator.TaskResult.
|
||||
func (n nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) {
|
||||
n.submitReplicaHolder(node)
|
||||
}
|
185
pkg/services/policer/option.go
Normal file
185
pkg/services/policer/option.go
Normal file
|
@ -0,0 +1,185 @@
|
|||
package policer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// KeySpaceIterator is the interface that allows iterating over the key space
|
||||
// of local storage.
|
||||
// Note that the underlying implementation might be circular: i.e. it can restart
|
||||
// when the end of the key space is reached.
|
||||
type KeySpaceIterator interface {
|
||||
Next(context.Context, uint32) ([]objectcore.AddressWithType, error)
|
||||
}
|
||||
|
||||
// RedundantCopyCallback is a callback to pass
|
||||
// the redundant local copy of the object.
|
||||
type RedundantCopyCallback func(context.Context, oid.Address)
|
||||
|
||||
// BuryFunc is the function to bury (i.e. inhume) an object.
|
||||
type BuryFunc func(context.Context, oid.Address) error
|
||||
|
||||
// Replicator is the interface to a consumer of replication tasks.
|
||||
type Replicator interface {
|
||||
HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
|
||||
}
|
||||
|
||||
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
|
||||
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
|
||||
|
||||
// NodeLoader provides application load statistics.
|
||||
type nodeLoader interface {
|
||||
// ObjectServiceLoad returns object service load value in [0:1] range.
|
||||
ObjectServiceLoad() float64
|
||||
}
|
||||
|
||||
type cfg struct {
|
||||
headTimeout time.Duration
|
||||
|
||||
log *logger.Logger
|
||||
|
||||
keySpaceIterator KeySpaceIterator
|
||||
|
||||
buryFn BuryFunc
|
||||
|
||||
cnrSrc container.Source
|
||||
|
||||
placementBuilder placement.Builder
|
||||
|
||||
remoteHeader RemoteObjectHeaderFunc
|
||||
|
||||
netmapKeys netmap.AnnouncedKeys
|
||||
|
||||
replicator Replicator
|
||||
|
||||
cbRedundantCopy RedundantCopyCallback
|
||||
|
||||
taskPool *ants.Pool
|
||||
|
||||
loader nodeLoader
|
||||
|
||||
maxCapacity int
|
||||
|
||||
batchSize, cacheSize uint32
|
||||
|
||||
rebalanceFreq, evictDuration time.Duration
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
return &cfg{
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
batchSize: 10,
|
||||
cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB
|
||||
rebalanceFreq: 1 * time.Second,
|
||||
evictDuration: 30 * time.Second,
|
||||
}
|
||||
}
|
||||
|
||||
// Option is an option for Policer constructor.
|
||||
type Option func(*cfg)
|
||||
|
||||
// WithHeadTimeout returns option to set Head timeout of Policer.
|
||||
func WithHeadTimeout(v time.Duration) Option {
|
||||
return func(c *cfg) {
|
||||
c.headTimeout = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger returns option to set Logger of Policer.
|
||||
func WithLogger(v *logger.Logger) Option {
|
||||
return func(c *cfg) {
|
||||
c.log = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithKeySpaceIterator(it KeySpaceIterator) Option {
|
||||
return func(c *cfg) {
|
||||
c.keySpaceIterator = it
|
||||
}
|
||||
}
|
||||
|
||||
func WithBuryFunc(f BuryFunc) Option {
|
||||
return func(c *cfg) {
|
||||
c.buryFn = f
|
||||
}
|
||||
}
|
||||
|
||||
// WithContainerSource returns option to set container source of Policer.
|
||||
func WithContainerSource(v container.Source) Option {
|
||||
return func(c *cfg) {
|
||||
c.cnrSrc = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithPlacementBuilder returns option to set object placement builder of Policer.
|
||||
func WithPlacementBuilder(v placement.Builder) Option {
|
||||
return func(c *cfg) {
|
||||
c.placementBuilder = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithRemoteObjectHeader returns option to set object header receiver of Policer.
|
||||
func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option {
|
||||
return func(c *cfg) {
|
||||
c.remoteHeader = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithNetmapKeys returns option to set tool to work with announced public keys.
|
||||
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
|
||||
return func(c *cfg) {
|
||||
c.netmapKeys = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithReplicator returns option to set object replicator of Policer.
|
||||
func WithReplicator(v Replicator) Option {
|
||||
return func(c *cfg) {
|
||||
c.replicator = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithRedundantCopyCallback returns option to set
|
||||
// callback to pass redundant local object copies
|
||||
// detected by Policer.
|
||||
func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
|
||||
return func(c *cfg) {
|
||||
c.cbRedundantCopy = cb
|
||||
}
|
||||
}
|
||||
|
||||
// WithMaxCapacity returns option to set max capacity
|
||||
// that can be set to the pool.
|
||||
func WithMaxCapacity(capacity int) Option {
|
||||
return func(c *cfg) {
|
||||
c.maxCapacity = capacity
|
||||
}
|
||||
}
|
||||
|
||||
// WithPool returns option to set pool for
|
||||
// policy and replication operations.
|
||||
func WithPool(p *ants.Pool) Option {
|
||||
return func(c *cfg) {
|
||||
c.taskPool = p
|
||||
}
|
||||
}
|
||||
|
||||
// WithNodeLoader returns option to set FrostFS node load source.
|
||||
func WithNodeLoader(l nodeLoader) Option {
|
||||
return func(c *cfg) {
|
||||
c.loader = l
|
||||
}
|
||||
}
|
|
@ -1,52 +1,40 @@
|
|||
package policer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
headsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/head"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// NodeLoader provides application load statistics.
|
||||
type nodeLoader interface {
|
||||
// ObjectServiceLoad returns object service load value in [0:1] range.
|
||||
ObjectServiceLoad() float64
|
||||
}
|
||||
|
||||
type objectsInWork struct {
|
||||
m sync.RWMutex
|
||||
sync.RWMutex
|
||||
objs map[oid.Address]struct{}
|
||||
}
|
||||
|
||||
func (oiw *objectsInWork) inWork(addr oid.Address) bool {
|
||||
oiw.m.RLock()
|
||||
oiw.RLock()
|
||||
_, ok := oiw.objs[addr]
|
||||
oiw.m.RUnlock()
|
||||
oiw.RUnlock()
|
||||
|
||||
return ok
|
||||
}
|
||||
|
||||
func (oiw *objectsInWork) remove(addr oid.Address) {
|
||||
oiw.m.Lock()
|
||||
oiw.Lock()
|
||||
delete(oiw.objs, addr)
|
||||
oiw.m.Unlock()
|
||||
oiw.Unlock()
|
||||
}
|
||||
|
||||
func (oiw *objectsInWork) add(addr oid.Address) {
|
||||
oiw.m.Lock()
|
||||
func (oiw *objectsInWork) add(addr oid.Address) bool {
|
||||
oiw.Lock()
|
||||
_, exists := oiw.objs[addr]
|
||||
oiw.objs[addr] = struct{}{}
|
||||
oiw.m.Unlock()
|
||||
oiw.Unlock()
|
||||
return !exists
|
||||
}
|
||||
|
||||
// Policer represents the utility that verifies
|
||||
|
@ -59,53 +47,6 @@ type Policer struct {
|
|||
objsInWork *objectsInWork
|
||||
}
|
||||
|
||||
// Option is an option for Policer constructor.
|
||||
type Option func(*cfg)
|
||||
|
||||
// RedundantCopyCallback is a callback to pass
|
||||
// the redundant local copy of the object.
|
||||
type RedundantCopyCallback func(context.Context, oid.Address)
|
||||
|
||||
type cfg struct {
|
||||
headTimeout time.Duration
|
||||
|
||||
log *logger.Logger
|
||||
|
||||
jobQueue jobQueue
|
||||
|
||||
cnrSrc container.Source
|
||||
|
||||
placementBuilder placement.Builder
|
||||
|
||||
remoteHeader *headsvc.RemoteHeader
|
||||
|
||||
netmapKeys netmap.AnnouncedKeys
|
||||
|
||||
replicator *replicator.Replicator
|
||||
|
||||
cbRedundantCopy RedundantCopyCallback
|
||||
|
||||
taskPool *ants.Pool
|
||||
|
||||
loader nodeLoader
|
||||
|
||||
maxCapacity int
|
||||
|
||||
batchSize, cacheSize uint32
|
||||
|
||||
rebalanceFreq, evictDuration time.Duration
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
return &cfg{
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
batchSize: 10,
|
||||
cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB
|
||||
rebalanceFreq: 1 * time.Second,
|
||||
evictDuration: 30 * time.Second,
|
||||
}
|
||||
}
|
||||
|
||||
// New creates, initializes and returns Policer instance.
|
||||
func New(opts ...Option) *Policer {
|
||||
c := defaultCfg()
|
||||
|
@ -129,91 +70,3 @@ func New(opts ...Option) *Policer {
|
|||
},
|
||||
}
|
||||
}
|
||||
|
||||
// WithHeadTimeout returns option to set Head timeout of Policer.
|
||||
func WithHeadTimeout(v time.Duration) Option {
|
||||
return func(c *cfg) {
|
||||
c.headTimeout = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger returns option to set Logger of Policer.
|
||||
func WithLogger(v *logger.Logger) Option {
|
||||
return func(c *cfg) {
|
||||
c.log = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithLocalStorage returns option to set local object storage of Policer.
|
||||
func WithLocalStorage(v *engine.StorageEngine) Option {
|
||||
return func(c *cfg) {
|
||||
c.jobQueue.localStorage = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithContainerSource returns option to set container source of Policer.
|
||||
func WithContainerSource(v container.Source) Option {
|
||||
return func(c *cfg) {
|
||||
c.cnrSrc = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithPlacementBuilder returns option to set object placement builder of Policer.
|
||||
func WithPlacementBuilder(v placement.Builder) Option {
|
||||
return func(c *cfg) {
|
||||
c.placementBuilder = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithRemoteHeader returns option to set object header receiver of Policer.
|
||||
func WithRemoteHeader(v *headsvc.RemoteHeader) Option {
|
||||
return func(c *cfg) {
|
||||
c.remoteHeader = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithNetmapKeys returns option to set tool to work with announced public keys.
|
||||
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
|
||||
return func(c *cfg) {
|
||||
c.netmapKeys = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithReplicator returns option to set object replicator of Policer.
|
||||
func WithReplicator(v *replicator.Replicator) Option {
|
||||
return func(c *cfg) {
|
||||
c.replicator = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithRedundantCopyCallback returns option to set
|
||||
// callback to pass redundant local object copies
|
||||
// detected by Policer.
|
||||
func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
|
||||
return func(c *cfg) {
|
||||
c.cbRedundantCopy = cb
|
||||
}
|
||||
}
|
||||
|
||||
// WithMaxCapacity returns option to set max capacity
|
||||
// that can be set to the pool.
|
||||
func WithMaxCapacity(capacity int) Option {
|
||||
return func(c *cfg) {
|
||||
c.maxCapacity = capacity
|
||||
}
|
||||
}
|
||||
|
||||
// WithPool returns option to set pool for
|
||||
// policy and replication operations.
|
||||
func WithPool(p *ants.Pool) Option {
|
||||
return func(c *cfg) {
|
||||
c.taskPool = p
|
||||
}
|
||||
}
|
||||
|
||||
// WithNodeLoader returns option to set FrostFS node load source.
|
||||
func WithNodeLoader(l nodeLoader) Option {
|
||||
return func(c *cfg) {
|
||||
c.loader = l
|
||||
}
|
||||
}
|
||||
|
|
299
pkg/services/policer/policer_test.go
Normal file
299
pkg/services/policer/policer_test.go
Normal file
|
@ -0,0 +1,299 @@
|
|||
package policer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBuryObjectWithoutContainer(t *testing.T) {
|
||||
// Key space
|
||||
addr := oidtest.Address()
|
||||
objs := []objectcore.AddressWithType{
|
||||
{
|
||||
Address: addr,
|
||||
Type: object.TypeRegular,
|
||||
},
|
||||
}
|
||||
|
||||
// Container source and bury function
|
||||
buryCh := make(chan oid.Address)
|
||||
containerSrc := func(id cid.ID) (*container.Container, error) {
|
||||
return nil, apistatus.ContainerNotFound{}
|
||||
}
|
||||
buryFn := func(ctx context.Context, a oid.Address) error {
|
||||
buryCh <- a
|
||||
return nil
|
||||
}
|
||||
|
||||
// Task pool
|
||||
pool, err := ants.NewPool(4)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Policer instance
|
||||
p := New(
|
||||
WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}),
|
||||
WithContainerSource(containerSrcFunc(containerSrc)),
|
||||
WithBuryFunc(buryFn),
|
||||
WithPool(pool),
|
||||
WithNodeLoader(constNodeLoader(0)),
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go p.Run(ctx)
|
||||
|
||||
require.Equal(t, addr, <-buryCh)
|
||||
}
|
||||
|
||||
func TestProcessObject(t *testing.T) {
|
||||
// Notes:
|
||||
// - nodes are referred to by their index throughout, which is embedded in the public key
|
||||
// - node with index 0 always refers to the local node, so there's no need to add it to objHolders
|
||||
// - policy is used only to match the number of replicas for each index in the placement
|
||||
tests := []struct {
|
||||
desc string
|
||||
objType object.Type
|
||||
nodeCount int
|
||||
policy string
|
||||
placement [][]int
|
||||
objHolders []int
|
||||
maintenanceNodes []int
|
||||
wantRemoveRedundant bool
|
||||
wantReplicateTo []int
|
||||
}{
|
||||
{
|
||||
desc: "1 copy already held by local node",
|
||||
nodeCount: 1,
|
||||
policy: `REP 1`,
|
||||
placement: [][]int{{0}},
|
||||
},
|
||||
{
|
||||
desc: "1 copy already held by the remote node",
|
||||
nodeCount: 2,
|
||||
policy: `REP 1`,
|
||||
placement: [][]int{{1}},
|
||||
objHolders: []int{1},
|
||||
wantRemoveRedundant: true,
|
||||
},
|
||||
{
|
||||
desc: "1 copy not yet held by the remote node",
|
||||
nodeCount: 2,
|
||||
policy: `REP 1`,
|
||||
placement: [][]int{{1}},
|
||||
wantReplicateTo: []int{1},
|
||||
},
|
||||
{
|
||||
desc: "2 copies already held by local and remote node",
|
||||
nodeCount: 2,
|
||||
policy: `REP 2`,
|
||||
placement: [][]int{{0, 1}},
|
||||
objHolders: []int{1},
|
||||
},
|
||||
{
|
||||
desc: "2 copies but not held by remote node",
|
||||
nodeCount: 2,
|
||||
policy: `REP 2`,
|
||||
placement: [][]int{{0, 1}},
|
||||
wantReplicateTo: []int{1},
|
||||
},
|
||||
{
|
||||
desc: "multiple vectors already held by remote node",
|
||||
nodeCount: 2,
|
||||
policy: `REP 2 REP 2`,
|
||||
placement: [][]int{{0, 1}, {0, 1}},
|
||||
objHolders: []int{1},
|
||||
},
|
||||
{
|
||||
desc: "multiple vectors not yet held by remote node",
|
||||
nodeCount: 2,
|
||||
policy: `REP 2 REP 2`,
|
||||
placement: [][]int{{0, 1}, {0, 1}},
|
||||
wantReplicateTo: []int{1, 1}, // is this actually good?
|
||||
},
|
||||
{
|
||||
desc: "lock object must be replicated to all nodes",
|
||||
objType: object.TypeLock,
|
||||
nodeCount: 3,
|
||||
policy: `REP 1`,
|
||||
placement: [][]int{{0, 1, 2}},
|
||||
wantReplicateTo: []int{1, 2},
|
||||
},
|
||||
{
|
||||
desc: "preserve local copy when maintenance nodes exist",
|
||||
nodeCount: 3,
|
||||
policy: `REP 2`,
|
||||
placement: [][]int{{1, 2}},
|
||||
objHolders: []int{1},
|
||||
maintenanceNodes: []int{2},
|
||||
},
|
||||
}
|
||||
|
||||
for i := range tests {
|
||||
ti := tests[i]
|
||||
t.Run(ti.desc, func(t *testing.T) {
|
||||
addr := oidtest.Address()
|
||||
|
||||
// Netmap, placement policy and placement builder
|
||||
nodes := make([]netmap.NodeInfo, ti.nodeCount)
|
||||
for i := range nodes {
|
||||
nodes[i].SetPublicKey([]byte{byte(i)})
|
||||
}
|
||||
for _, i := range ti.maintenanceNodes {
|
||||
nodes[i].SetMaintenance()
|
||||
}
|
||||
|
||||
var policy netmap.PlacementPolicy
|
||||
require.NoError(t, policy.DecodeString(ti.policy))
|
||||
|
||||
placementVectors := make([][]netmap.NodeInfo, len(ti.placement))
|
||||
for i, pv := range ti.placement {
|
||||
for _, nj := range pv {
|
||||
placementVectors[i] = append(placementVectors[i], nodes[nj])
|
||||
}
|
||||
}
|
||||
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
|
||||
if cnr.Equals(addr.Container()) && obj != nil && obj.Equals(addr.Object()) {
|
||||
return placementVectors, nil
|
||||
}
|
||||
t.Errorf("unexpected placement build: cid=%v oid=%v", cnr, obj)
|
||||
return nil, errors.New("unexpected placement build")
|
||||
}
|
||||
|
||||
// Object remote header
|
||||
headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address) (*object.Object, error) {
|
||||
index := int(ni.PublicKey()[0])
|
||||
if a != addr || index < 1 || index >= ti.nodeCount {
|
||||
t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a)
|
||||
return nil, errors.New("unexpected object head")
|
||||
}
|
||||
for _, i := range ti.objHolders {
|
||||
if index == i {
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
return nil, apistatus.ObjectNotFound{}
|
||||
}
|
||||
|
||||
// Container source
|
||||
cnr := &container.Container{}
|
||||
cnr.Value.Init()
|
||||
cnr.Value.SetPlacementPolicy(policy)
|
||||
containerSrc := func(id cid.ID) (*container.Container, error) {
|
||||
if id.Equals(addr.Container()) {
|
||||
return cnr, nil
|
||||
}
|
||||
t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container())
|
||||
return nil, apistatus.ContainerNotFound{}
|
||||
}
|
||||
buryFn := func(ctx context.Context, a oid.Address) error {
|
||||
t.Errorf("unexpected object buried: %v", a)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Policer instance
|
||||
var gotRemoveRedundant bool
|
||||
var gotReplicateTo []int
|
||||
|
||||
p := New(
|
||||
WithContainerSource(containerSrcFunc(containerSrc)),
|
||||
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||
return bytes.Equal(k, nodes[0].PublicKey())
|
||||
})),
|
||||
WithRemoteObjectHeaderFunc(headFn),
|
||||
WithBuryFunc(buryFn),
|
||||
WithRedundantCopyCallback(func(_ context.Context, a oid.Address) {
|
||||
require.True(t, eqAddr(a, addr), "unexpected redundant copy callback: a=%v", a)
|
||||
gotRemoveRedundant = true
|
||||
}),
|
||||
WithReplicator(replicatorFunc(func(_ context.Context, task replicator.Task, res replicator.TaskResult) {
|
||||
require.True(t, eqAddr(task.Addr, addr), "unexpected replicator task: %+v", task)
|
||||
for _, node := range task.Nodes {
|
||||
gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
|
||||
}
|
||||
})),
|
||||
)
|
||||
|
||||
addrWithType := objectcore.AddressWithType{
|
||||
Address: addr,
|
||||
Type: ti.objType,
|
||||
}
|
||||
|
||||
p.processObject(context.Background(), addrWithType)
|
||||
sort.Ints(gotReplicateTo)
|
||||
|
||||
require.Equal(t, ti.wantRemoveRedundant, gotRemoveRedundant)
|
||||
require.Equal(t, ti.wantReplicateTo, gotReplicateTo)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/issues/101)
|
||||
func eqAddr(a, b oid.Address) bool {
|
||||
return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object())
|
||||
}
|
||||
|
||||
// sliceKeySpaceIterator is a KeySpaceIterator backed by a slice.
|
||||
type sliceKeySpaceIterator struct {
|
||||
objs []objectcore.AddressWithType
|
||||
cur int
|
||||
}
|
||||
|
||||
func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) {
|
||||
if it.cur >= len(it.objs) {
|
||||
it.cur = 0
|
||||
return nil, engine.ErrEndOfListing
|
||||
}
|
||||
end := it.cur + int(size)
|
||||
if end > len(it.objs) {
|
||||
end = len(it.objs)
|
||||
}
|
||||
ret := it.objs[it.cur:end]
|
||||
it.cur = end
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// containerSrcFunc is a container.Source backed by a function.
|
||||
type containerSrcFunc func(cid.ID) (*container.Container, error)
|
||||
|
||||
func (f containerSrcFunc) Get(id cid.ID) (*container.Container, error) { return f(id) }
|
||||
|
||||
// placementBuilderFunc is a placement.Builder backed by a function
|
||||
type placementBuilderFunc func(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
|
||||
|
||||
func (f placementBuilderFunc) BuildPlacement(c cid.ID, o *oid.ID, p netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
|
||||
return f(c, o, p)
|
||||
}
|
||||
|
||||
// announcedKeysFunc is a netmap.AnnouncedKeys backed by a function.
|
||||
type announcedKeysFunc func([]byte) bool
|
||||
|
||||
func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) }
|
||||
|
||||
// constNodeLoader is a nodeLoader that always returns a fixed value.
|
||||
type constNodeLoader float64
|
||||
|
||||
func (f constNodeLoader) ObjectServiceLoad() float64 { return float64(f) }
|
||||
|
||||
// replicatorFunc is a Replicator backed by a function.
|
||||
type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult)
|
||||
|
||||
func (f replicatorFunc) HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) {
|
||||
f(ctx, task, res)
|
||||
}
|
|
@ -6,27 +6,17 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (p *Policer) Run(ctx context.Context) {
|
||||
defer func() {
|
||||
p.log.Info(logs.PolicerRoutineStopped)
|
||||
}()
|
||||
|
||||
go p.poolCapacityWorker(ctx)
|
||||
p.shardPolicyWorker(ctx)
|
||||
p.log.Info(logs.PolicerRoutineStopped)
|
||||
}
|
||||
|
||||
func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
||||
var (
|
||||
addrs []objectcore.AddressWithType
|
||||
cursor *engine.Cursor
|
||||
err error
|
||||
)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
@ -34,7 +24,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
|||
default:
|
||||
}
|
||||
|
||||
addrs, cursor, err = p.jobQueue.Select(ctx, cursor, p.batchSize)
|
||||
addrs, err := p.keySpaceIterator.Next(ctx, p.batchSize)
|
||||
if err != nil {
|
||||
if errors.Is(err, engine.ErrEndOfListing) {
|
||||
time.Sleep(time.Second) // finished whole cycle, sleep a bit
|
||||
|
@ -55,18 +45,17 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
|||
continue
|
||||
}
|
||||
|
||||
err = p.taskPool.Submit(func() {
|
||||
err := p.taskPool.Submit(func() {
|
||||
v, ok := p.cache.Get(addr.Address)
|
||||
if ok && time.Since(v) < p.evictDuration {
|
||||
return
|
||||
}
|
||||
|
||||
p.objsInWork.add(addr.Address)
|
||||
|
||||
p.processObject(ctx, addr)
|
||||
|
||||
p.cache.Add(addr.Address, time.Now())
|
||||
p.objsInWork.remove(addr.Address)
|
||||
if p.objsInWork.add(addr.Address) {
|
||||
p.processObject(ctx, addr)
|
||||
p.cache.Add(addr.Address, time.Now())
|
||||
p.objsInWork.remove(addr.Address)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
p.log.Warn(logs.PolicerPoolSubmission, zap.Error(err))
|
||||
|
@ -78,10 +67,10 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
|||
|
||||
func (p *Policer) poolCapacityWorker(ctx context.Context) {
|
||||
ticker := time.NewTicker(p.rebalanceFreq)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
ticker.Stop()
|
||||
return
|
||||
case <-ticker.C:
|
||||
frostfsSysLoad := p.loader.ObjectServiceLoad()
|
||||
|
|
|
@ -1,26 +0,0 @@
|
|||
package policer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
)
|
||||
|
||||
type jobQueue struct {
|
||||
localStorage *engine.StorageEngine
|
||||
}
|
||||
|
||||
func (q *jobQueue) Select(ctx context.Context, cursor *engine.Cursor, count uint32) ([]objectcore.AddressWithType, *engine.Cursor, error) {
|
||||
var prm engine.ListWithCursorPrm
|
||||
prm.WithCursor(cursor)
|
||||
prm.WithCount(count)
|
||||
|
||||
res, err := q.localStorage.ListWithCursor(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot list objects in engine: %w", err)
|
||||
}
|
||||
|
||||
return res.AddressList(), res.Cursor(), nil
|
||||
}
|
|
@ -24,16 +24,16 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
|
|||
defer p.metrics.DecInFlightRequest()
|
||||
defer func() {
|
||||
p.log.Debug(logs.ReplicatorFinishWork,
|
||||
zap.Uint32("amount of unfinished replicas", task.quantity),
|
||||
zap.Uint32("amount of unfinished replicas", task.NumCopies),
|
||||
)
|
||||
}()
|
||||
|
||||
if task.obj == nil {
|
||||
if task.Obj == nil {
|
||||
var err error
|
||||
task.obj, err = engine.Get(ctx, p.localStorage, task.addr)
|
||||
task.Obj, err = engine.Get(ctx, p.localStorage, task.Addr)
|
||||
if err != nil {
|
||||
p.log.Error(logs.ReplicatorCouldNotGetObjectFromLocalStorage,
|
||||
zap.Stringer("object", task.addr),
|
||||
zap.Stringer("object", task.Addr),
|
||||
zap.Error(err))
|
||||
|
||||
return
|
||||
|
@ -41,9 +41,9 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
|
|||
}
|
||||
|
||||
prm := new(putsvc.RemotePutPrm).
|
||||
WithObject(task.obj)
|
||||
WithObject(task.Obj)
|
||||
|
||||
for i := 0; task.quantity > 0 && i < len(task.nodes); i++ {
|
||||
for i := 0; task.NumCopies > 0 && i < len(task.Nodes); i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
@ -51,13 +51,13 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
|
|||
}
|
||||
|
||||
log := p.log.With(
|
||||
zap.String("node", netmap.StringifyPublicKey(task.nodes[i])),
|
||||
zap.Stringer("object", task.addr),
|
||||
zap.String("node", netmap.StringifyPublicKey(task.Nodes[i])),
|
||||
zap.Stringer("object", task.Addr),
|
||||
)
|
||||
|
||||
callCtx, cancel := context.WithTimeout(ctx, p.putTimeout)
|
||||
|
||||
err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i]))
|
||||
err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.Nodes[i]))
|
||||
|
||||
cancel()
|
||||
|
||||
|
@ -68,12 +68,12 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
|
|||
} else {
|
||||
log.Debug(logs.ReplicatorObjectSuccessfullyReplicated)
|
||||
|
||||
task.quantity--
|
||||
task.NumCopies--
|
||||
|
||||
res.SubmitSuccessfulReplication(task.nodes[i])
|
||||
res.SubmitSuccessfulReplication(task.Nodes[i])
|
||||
|
||||
p.metrics.IncProcessedObjects()
|
||||
p.metrics.AddPayloadSize(int64(task.obj.PayloadSize()))
|
||||
p.metrics.AddPayloadSize(int64(task.Obj.PayloadSize()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,31 +8,12 @@ import (
|
|||
|
||||
// Task represents group of Replicator task parameters.
|
||||
type Task struct {
|
||||
quantity uint32
|
||||
|
||||
addr oid.Address
|
||||
|
||||
obj *objectSDK.Object
|
||||
|
||||
nodes []netmap.NodeInfo
|
||||
}
|
||||
|
||||
// SetCopiesNumber sets number of copies to replicate.
|
||||
func (t *Task) SetCopiesNumber(v uint32) {
|
||||
t.quantity = v
|
||||
}
|
||||
|
||||
// SetObjectAddress sets address of local object.
|
||||
func (t *Task) SetObjectAddress(v oid.Address) {
|
||||
t.addr = v
|
||||
}
|
||||
|
||||
// SetObject sets object to avoid fetching it from the local storage.
|
||||
func (t *Task) SetObject(obj *objectSDK.Object) {
|
||||
t.obj = obj
|
||||
}
|
||||
|
||||
// SetNodes sets a list of potential object holders.
|
||||
func (t *Task) SetNodes(v []netmap.NodeInfo) {
|
||||
t.nodes = v
|
||||
// NumCopies is the number of copies to replicate.
|
||||
NumCopies uint32
|
||||
// Addr is the address of the local object.
|
||||
Addr oid.Address
|
||||
// Obj is the object to avoid fetching it from the local storage.
|
||||
Obj *objectSDK.Object
|
||||
// Nodes is a list of potential object holders.
|
||||
Nodes []netmap.NodeInfo
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue