forked from TrueCloudLab/frostfs-node
[#92] Refactor policer and add some unit tests
Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
This commit is contained in:
parent
0c5b025788
commit
f9730f090d
13 changed files with 640 additions and 310 deletions
28
cmd/frostfs-node/keyspaceiterator.go
Normal file
28
cmd/frostfs-node/keyspaceiterator.go
Normal file
|
@ -0,0 +1,28 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
|
)
|
||||||
|
|
||||||
|
type keySpaceIterator struct {
|
||||||
|
ng *engine.StorageEngine
|
||||||
|
cur *engine.Cursor
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objectcore.AddressWithType, error) {
|
||||||
|
var prm engine.ListWithCursorPrm
|
||||||
|
prm.WithCursor(it.cur)
|
||||||
|
prm.WithCount(batchSize)
|
||||||
|
|
||||||
|
res, err := it.ng.ListWithCursor(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot list objects in engine: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
it.cur = res.Cursor()
|
||||||
|
return res.AddressList(), nil
|
||||||
|
}
|
|
@ -38,6 +38,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
eaclSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
eaclSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
||||||
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
|
@ -211,15 +212,30 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
|
||||||
|
|
||||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||||
|
|
||||||
|
buryFn := func(ctx context.Context, addr oid.Address) error {
|
||||||
|
var prm engine.InhumePrm
|
||||||
|
prm.MarkAsGarbage(addr)
|
||||||
|
prm.WithForceRemoval()
|
||||||
|
|
||||||
|
_, err := ls.Inhume(ctx, prm)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
remoteHeader := headsvc.NewRemoteHeader(keyStorage, clientConstructor)
|
||||||
|
|
||||||
pol := policer.New(
|
pol := policer.New(
|
||||||
policer.WithLogger(c.log),
|
policer.WithLogger(c.log),
|
||||||
policer.WithLocalStorage(ls),
|
policer.WithKeySpaceIterator(&keySpaceIterator{ng: ls}),
|
||||||
|
policer.WithBuryFunc(buryFn),
|
||||||
policer.WithContainerSource(c.cfgObject.cnrSource),
|
policer.WithContainerSource(c.cfgObject.cnrSource),
|
||||||
policer.WithPlacementBuilder(
|
policer.WithPlacementBuilder(
|
||||||
placement.NewNetworkMapSourceBuilder(c.netMapSource),
|
placement.NewNetworkMapSourceBuilder(c.netMapSource),
|
||||||
),
|
),
|
||||||
policer.WithRemoteHeader(
|
policer.WithRemoteObjectHeaderFunc(
|
||||||
headsvc.NewRemoteHeader(keyStorage, clientConstructor),
|
func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
|
||||||
|
prm := new(headsvc.RemoteHeadPrm).WithNodeInfo(ni).WithObjectAddress(a)
|
||||||
|
return remoteHeader.Head(ctx, prm)
|
||||||
|
},
|
||||||
),
|
),
|
||||||
policer.WithNetmapKeys(c),
|
policer.WithNetmapKeys(c),
|
||||||
policer.WithHeadTimeout(
|
policer.WithHeadTimeout(
|
||||||
|
|
|
@ -84,11 +84,12 @@ func (s *Server) replicate(ctx context.Context, addr oid.Address, obj *objectSDK
|
||||||
}
|
}
|
||||||
|
|
||||||
var res replicatorResult
|
var res replicatorResult
|
||||||
var task replicator.Task
|
task := replicator.Task{
|
||||||
task.SetObject(obj)
|
NumCopies: 1,
|
||||||
task.SetObjectAddress(addr)
|
Addr: addr,
|
||||||
task.SetCopiesNumber(1)
|
Obj: obj,
|
||||||
task.SetNodes(nodes)
|
Nodes: nodes,
|
||||||
|
}
|
||||||
s.replicator.HandleTask(ctx, task, &res)
|
s.replicator.HandleTask(ctx, task, &res)
|
||||||
|
|
||||||
if res.count == 0 {
|
if res.count == 0 {
|
||||||
|
|
|
@ -7,8 +7,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
|
||||||
headsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/head"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
@ -18,55 +16,6 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// tracks Policer's check progress.
|
|
||||||
type nodeCache map[uint64]bool
|
|
||||||
|
|
||||||
func newNodeCache() *nodeCache {
|
|
||||||
m := make(map[uint64]bool)
|
|
||||||
return (*nodeCache)(&m)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *nodeCache) set(node netmap.NodeInfo, val bool) {
|
|
||||||
(*n)[node.Hash()] = val
|
|
||||||
}
|
|
||||||
|
|
||||||
// submits storage node as a candidate to store the object replica in case of
|
|
||||||
// shortage.
|
|
||||||
func (n *nodeCache) submitReplicaCandidate(node netmap.NodeInfo) {
|
|
||||||
n.set(node, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
// submits storage node as a current object replica holder.
|
|
||||||
func (n *nodeCache) submitReplicaHolder(node netmap.NodeInfo) {
|
|
||||||
n.set(node, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
// processStatus returns current processing status of the storage node
|
|
||||||
//
|
|
||||||
// >0 if node does not currently hold the object
|
|
||||||
// 0 if node already holds the object
|
|
||||||
// <0 if node has not been processed yet
|
|
||||||
func (n *nodeCache) processStatus(node netmap.NodeInfo) int8 {
|
|
||||||
val, ok := (*n)[node.Hash()]
|
|
||||||
if !ok {
|
|
||||||
return -1
|
|
||||||
}
|
|
||||||
|
|
||||||
if val {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
return 1
|
|
||||||
}
|
|
||||||
|
|
||||||
// SubmitSuccessfulReplication marks given storage node as a current object
|
|
||||||
// replica holder.
|
|
||||||
//
|
|
||||||
// SubmitSuccessfulReplication implements replicator.TaskResult.
|
|
||||||
func (n *nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) {
|
|
||||||
n.submitReplicaHolder(node)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) {
|
func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) {
|
||||||
addr := addrWithType.Address
|
addr := addrWithType.Address
|
||||||
idCnr := addr.Container()
|
idCnr := addr.Container()
|
||||||
|
@ -79,11 +28,7 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
if container.IsErrNotFound(err) {
|
if container.IsErrNotFound(err) {
|
||||||
var prm engine.InhumePrm
|
err := p.buryFn(ctx, addrWithType.Address)
|
||||||
prm.MarkAsGarbage(addrWithType.Address)
|
|
||||||
prm.WithForceRemoval()
|
|
||||||
|
|
||||||
_, err := p.jobQueue.localStorage.Inhume(ctx, prm)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.log.Error(logs.PolicerCouldNotInhumeObjectWithMissingContainer,
|
p.log.Error(logs.PolicerCouldNotInhumeObjectWithMissingContainer,
|
||||||
zap.Stringer("cid", idCnr),
|
zap.Stringer("cid", idCnr),
|
||||||
|
@ -145,10 +90,9 @@ type placementRequirements struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Policer) processNodes(ctx context.Context, requirements *placementRequirements, addrWithType objectcore.AddressWithType,
|
func (p *Policer) processNodes(ctx context.Context, requirements *placementRequirements, addrWithType objectcore.AddressWithType,
|
||||||
nodes []netmap.NodeInfo, shortage uint32, checkedNodes *nodeCache) {
|
nodes []netmap.NodeInfo, shortage uint32, checkedNodes nodeCache) {
|
||||||
addr := addrWithType.Address
|
addr := addrWithType.Address
|
||||||
typ := addrWithType.Type
|
typ := addrWithType.Type
|
||||||
prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr)
|
|
||||||
|
|
||||||
// Number of copies that are stored on maintenance nodes.
|
// Number of copies that are stored on maintenance nodes.
|
||||||
var uncheckedCopies int
|
var uncheckedCopies int
|
||||||
|
@ -175,8 +119,8 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
|
||||||
} else if nodes[i].IsMaintenance() {
|
} else if nodes[i].IsMaintenance() {
|
||||||
shortage, uncheckedCopies = p.handleMaintenance(nodes[i], checkedNodes, shortage, uncheckedCopies)
|
shortage, uncheckedCopies = p.handleMaintenance(nodes[i], checkedNodes, shortage, uncheckedCopies)
|
||||||
} else {
|
} else {
|
||||||
if status := checkedNodes.processStatus(nodes[i]); status >= 0 {
|
if status := checkedNodes.processStatus(nodes[i]); status.Processed() {
|
||||||
if status == 0 {
|
if status == nodeHoldsObject {
|
||||||
// node already contains replica, no need to replicate
|
// node already contains replica, no need to replicate
|
||||||
nodes = append(nodes[:i], nodes[i+1:]...)
|
nodes = append(nodes[:i], nodes[i+1:]...)
|
||||||
i--
|
i--
|
||||||
|
@ -188,7 +132,7 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
|
||||||
|
|
||||||
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
||||||
|
|
||||||
_, err := p.remoteHeader.Head(callCtx, prm.WithNodeInfo(nodes[i]))
|
_, err := p.remoteHeader(callCtx, nodes[i], addr)
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
|
@ -224,7 +168,7 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
|
||||||
// prevent spam with new replicas.
|
// prevent spam with new replicas.
|
||||||
// However, additional copies should not be removed in this case,
|
// However, additional copies should not be removed in this case,
|
||||||
// because we can remove the only copy this way.
|
// because we can remove the only copy this way.
|
||||||
func (p *Policer) handleMaintenance(node netmap.NodeInfo, checkedNodes *nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) {
|
func (p *Policer) handleMaintenance(node netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) {
|
||||||
checkedNodes.submitReplicaHolder(node)
|
checkedNodes.submitReplicaHolder(node)
|
||||||
shortage--
|
shortage--
|
||||||
uncheckedCopies++
|
uncheckedCopies++
|
||||||
|
@ -236,25 +180,29 @@ func (p *Policer) handleMaintenance(node netmap.NodeInfo, checkedNodes *nodeCach
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address, requirements *placementRequirements,
|
func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address, requirements *placementRequirements,
|
||||||
nodes []netmap.NodeInfo, checkedNodes *nodeCache, shortage uint32, uncheckedCopies int) {
|
nodes []netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) {
|
||||||
if shortage > 0 {
|
switch {
|
||||||
|
case shortage > 0:
|
||||||
p.log.Debug(logs.PolicerShortageOfObjectCopiesDetected,
|
p.log.Debug(logs.PolicerShortageOfObjectCopiesDetected,
|
||||||
zap.Stringer("object", addr),
|
zap.Stringer("object", addr),
|
||||||
zap.Uint32("shortage", shortage),
|
zap.Uint32("shortage", shortage),
|
||||||
)
|
)
|
||||||
|
|
||||||
var task replicator.Task
|
task := replicator.Task{
|
||||||
task.SetObjectAddress(addr)
|
NumCopies: shortage,
|
||||||
task.SetNodes(nodes)
|
Addr: addr,
|
||||||
task.SetCopiesNumber(shortage)
|
Nodes: nodes,
|
||||||
|
}
|
||||||
|
|
||||||
p.replicator.HandleTask(ctx, task, checkedNodes)
|
p.replicator.HandleTask(ctx, task, checkedNodes)
|
||||||
} else if uncheckedCopies > 0 {
|
|
||||||
|
case uncheckedCopies > 0:
|
||||||
// If we have more copies than needed, but some of them are from the maintenance nodes,
|
// If we have more copies than needed, but some of them are from the maintenance nodes,
|
||||||
// save the local copy.
|
// save the local copy.
|
||||||
p.log.Debug(logs.PolicerSomeOfTheCopiesAreStoredOnNodesUnderMaintenance,
|
p.log.Debug(logs.PolicerSomeOfTheCopiesAreStoredOnNodesUnderMaintenance,
|
||||||
zap.Int("count", uncheckedCopies))
|
zap.Int("count", uncheckedCopies))
|
||||||
} else if uncheckedCopies == 0 {
|
|
||||||
|
case uncheckedCopies == 0:
|
||||||
// Safe to remove: checked all copies, shortage == 0.
|
// Safe to remove: checked all copies, shortage == 0.
|
||||||
requirements.removeLocalCopy = true
|
requirements.removeLocalCopy = true
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,14 +11,14 @@ func TestNodeCache(t *testing.T) {
|
||||||
cache := newNodeCache()
|
cache := newNodeCache()
|
||||||
node := netmaptest.NodeInfo()
|
node := netmaptest.NodeInfo()
|
||||||
|
|
||||||
require.Negative(t, cache.processStatus(node))
|
require.Equal(t, cache.processStatus(node), nodeNotProcessed)
|
||||||
|
|
||||||
cache.SubmitSuccessfulReplication(node)
|
cache.SubmitSuccessfulReplication(node)
|
||||||
require.Zero(t, cache.processStatus(node))
|
require.Equal(t, cache.processStatus(node), nodeHoldsObject)
|
||||||
|
|
||||||
cache.submitReplicaCandidate(node)
|
cache.submitReplicaCandidate(node)
|
||||||
require.Positive(t, cache.processStatus(node))
|
require.Equal(t, cache.processStatus(node), nodeDoesNotHoldObject)
|
||||||
|
|
||||||
cache.submitReplicaHolder(node)
|
cache.submitReplicaHolder(node)
|
||||||
require.Zero(t, cache.processStatus(node))
|
require.Equal(t, cache.processStatus(node), nodeHoldsObject)
|
||||||
}
|
}
|
||||||
|
|
57
pkg/services/policer/nodecache.go
Normal file
57
pkg/services/policer/nodecache.go
Normal file
|
@ -0,0 +1,57 @@
|
||||||
|
package policer
|
||||||
|
|
||||||
|
import "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
|
||||||
|
type nodeProcessStatus int8
|
||||||
|
|
||||||
|
const (
|
||||||
|
nodeNotProcessed nodeProcessStatus = iota
|
||||||
|
nodeDoesNotHoldObject
|
||||||
|
nodeHoldsObject
|
||||||
|
)
|
||||||
|
|
||||||
|
func (st nodeProcessStatus) Processed() bool {
|
||||||
|
return st != nodeNotProcessed
|
||||||
|
}
|
||||||
|
|
||||||
|
// nodeCache tracks Policer's check progress.
|
||||||
|
type nodeCache map[uint64]bool
|
||||||
|
|
||||||
|
func newNodeCache() nodeCache {
|
||||||
|
return make(map[uint64]bool)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n nodeCache) set(node netmap.NodeInfo, val bool) {
|
||||||
|
n[node.Hash()] = val
|
||||||
|
}
|
||||||
|
|
||||||
|
// submits storage node as a candidate to store the object replica in case of
|
||||||
|
// shortage.
|
||||||
|
func (n nodeCache) submitReplicaCandidate(node netmap.NodeInfo) {
|
||||||
|
n.set(node, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// submits storage node as a current object replica holder.
|
||||||
|
func (n nodeCache) submitReplicaHolder(node netmap.NodeInfo) {
|
||||||
|
n.set(node, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// processStatus returns current processing status of the storage node.
|
||||||
|
func (n nodeCache) processStatus(node netmap.NodeInfo) nodeProcessStatus {
|
||||||
|
switch val, ok := n[node.Hash()]; {
|
||||||
|
case !ok:
|
||||||
|
return nodeNotProcessed
|
||||||
|
case val:
|
||||||
|
return nodeHoldsObject
|
||||||
|
default:
|
||||||
|
return nodeDoesNotHoldObject
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SubmitSuccessfulReplication marks given storage node as a current object
|
||||||
|
// replica holder.
|
||||||
|
//
|
||||||
|
// SubmitSuccessfulReplication implements replicator.TaskResult.
|
||||||
|
func (n nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) {
|
||||||
|
n.submitReplicaHolder(node)
|
||||||
|
}
|
185
pkg/services/policer/option.go
Normal file
185
pkg/services/policer/option.go
Normal file
|
@ -0,0 +1,185 @@
|
||||||
|
package policer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"github.com/panjf2000/ants/v2"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// KeySpaceIterator is the interface that allows iterating over the key space
|
||||||
|
// of local storage.
|
||||||
|
// Note that the underlying implementation might be circular: i.e. it can restart
|
||||||
|
// when the end of the key space is reached.
|
||||||
|
type KeySpaceIterator interface {
|
||||||
|
Next(context.Context, uint32) ([]objectcore.AddressWithType, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RedundantCopyCallback is a callback to pass
|
||||||
|
// the redundant local copy of the object.
|
||||||
|
type RedundantCopyCallback func(context.Context, oid.Address)
|
||||||
|
|
||||||
|
// BuryFunc is the function to bury (i.e. inhume) an object.
|
||||||
|
type BuryFunc func(context.Context, oid.Address) error
|
||||||
|
|
||||||
|
// Replicator is the interface to a consumer of replication tasks.
|
||||||
|
type Replicator interface {
|
||||||
|
HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
|
||||||
|
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
|
||||||
|
|
||||||
|
// NodeLoader provides application load statistics.
|
||||||
|
type nodeLoader interface {
|
||||||
|
// ObjectServiceLoad returns object service load value in [0:1] range.
|
||||||
|
ObjectServiceLoad() float64
|
||||||
|
}
|
||||||
|
|
||||||
|
type cfg struct {
|
||||||
|
headTimeout time.Duration
|
||||||
|
|
||||||
|
log *logger.Logger
|
||||||
|
|
||||||
|
keySpaceIterator KeySpaceIterator
|
||||||
|
|
||||||
|
buryFn BuryFunc
|
||||||
|
|
||||||
|
cnrSrc container.Source
|
||||||
|
|
||||||
|
placementBuilder placement.Builder
|
||||||
|
|
||||||
|
remoteHeader RemoteObjectHeaderFunc
|
||||||
|
|
||||||
|
netmapKeys netmap.AnnouncedKeys
|
||||||
|
|
||||||
|
replicator Replicator
|
||||||
|
|
||||||
|
cbRedundantCopy RedundantCopyCallback
|
||||||
|
|
||||||
|
taskPool *ants.Pool
|
||||||
|
|
||||||
|
loader nodeLoader
|
||||||
|
|
||||||
|
maxCapacity int
|
||||||
|
|
||||||
|
batchSize, cacheSize uint32
|
||||||
|
|
||||||
|
rebalanceFreq, evictDuration time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultCfg() *cfg {
|
||||||
|
return &cfg{
|
||||||
|
log: &logger.Logger{Logger: zap.L()},
|
||||||
|
batchSize: 10,
|
||||||
|
cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB
|
||||||
|
rebalanceFreq: 1 * time.Second,
|
||||||
|
evictDuration: 30 * time.Second,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Option is an option for Policer constructor.
|
||||||
|
type Option func(*cfg)
|
||||||
|
|
||||||
|
// WithHeadTimeout returns option to set Head timeout of Policer.
|
||||||
|
func WithHeadTimeout(v time.Duration) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.headTimeout = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithLogger returns option to set Logger of Policer.
|
||||||
|
func WithLogger(v *logger.Logger) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.log = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithKeySpaceIterator(it KeySpaceIterator) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.keySpaceIterator = it
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithBuryFunc(f BuryFunc) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.buryFn = f
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithContainerSource returns option to set container source of Policer.
|
||||||
|
func WithContainerSource(v container.Source) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.cnrSrc = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithPlacementBuilder returns option to set object placement builder of Policer.
|
||||||
|
func WithPlacementBuilder(v placement.Builder) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.placementBuilder = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithRemoteObjectHeader returns option to set object header receiver of Policer.
|
||||||
|
func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.remoteHeader = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithNetmapKeys returns option to set tool to work with announced public keys.
|
||||||
|
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.netmapKeys = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithReplicator returns option to set object replicator of Policer.
|
||||||
|
func WithReplicator(v Replicator) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.replicator = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithRedundantCopyCallback returns option to set
|
||||||
|
// callback to pass redundant local object copies
|
||||||
|
// detected by Policer.
|
||||||
|
func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.cbRedundantCopy = cb
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithMaxCapacity returns option to set max capacity
|
||||||
|
// that can be set to the pool.
|
||||||
|
func WithMaxCapacity(capacity int) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.maxCapacity = capacity
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithPool returns option to set pool for
|
||||||
|
// policy and replication operations.
|
||||||
|
func WithPool(p *ants.Pool) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.taskPool = p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithNodeLoader returns option to set FrostFS node load source.
|
||||||
|
func WithNodeLoader(l nodeLoader) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.loader = l
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,29 +1,15 @@
|
||||||
package policer
|
package policer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
|
||||||
headsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/head"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
lru "github.com/hashicorp/golang-lru/v2"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
"github.com/panjf2000/ants/v2"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NodeLoader provides application load statistics.
|
|
||||||
type nodeLoader interface {
|
|
||||||
// ObjectServiceLoad returns object service load value in [0:1] range.
|
|
||||||
ObjectServiceLoad() float64
|
|
||||||
}
|
|
||||||
|
|
||||||
type objectsInWork struct {
|
type objectsInWork struct {
|
||||||
m sync.RWMutex
|
m sync.RWMutex
|
||||||
objs map[oid.Address]struct{}
|
objs map[oid.Address]struct{}
|
||||||
|
@ -59,53 +45,6 @@ type Policer struct {
|
||||||
objsInWork *objectsInWork
|
objsInWork *objectsInWork
|
||||||
}
|
}
|
||||||
|
|
||||||
// Option is an option for Policer constructor.
|
|
||||||
type Option func(*cfg)
|
|
||||||
|
|
||||||
// RedundantCopyCallback is a callback to pass
|
|
||||||
// the redundant local copy of the object.
|
|
||||||
type RedundantCopyCallback func(context.Context, oid.Address)
|
|
||||||
|
|
||||||
type cfg struct {
|
|
||||||
headTimeout time.Duration
|
|
||||||
|
|
||||||
log *logger.Logger
|
|
||||||
|
|
||||||
jobQueue jobQueue
|
|
||||||
|
|
||||||
cnrSrc container.Source
|
|
||||||
|
|
||||||
placementBuilder placement.Builder
|
|
||||||
|
|
||||||
remoteHeader *headsvc.RemoteHeader
|
|
||||||
|
|
||||||
netmapKeys netmap.AnnouncedKeys
|
|
||||||
|
|
||||||
replicator *replicator.Replicator
|
|
||||||
|
|
||||||
cbRedundantCopy RedundantCopyCallback
|
|
||||||
|
|
||||||
taskPool *ants.Pool
|
|
||||||
|
|
||||||
loader nodeLoader
|
|
||||||
|
|
||||||
maxCapacity int
|
|
||||||
|
|
||||||
batchSize, cacheSize uint32
|
|
||||||
|
|
||||||
rebalanceFreq, evictDuration time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
func defaultCfg() *cfg {
|
|
||||||
return &cfg{
|
|
||||||
log: &logger.Logger{Logger: zap.L()},
|
|
||||||
batchSize: 10,
|
|
||||||
cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB
|
|
||||||
rebalanceFreq: 1 * time.Second,
|
|
||||||
evictDuration: 30 * time.Second,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// New creates, initializes and returns Policer instance.
|
// New creates, initializes and returns Policer instance.
|
||||||
func New(opts ...Option) *Policer {
|
func New(opts ...Option) *Policer {
|
||||||
c := defaultCfg()
|
c := defaultCfg()
|
||||||
|
@ -129,91 +68,3 @@ func New(opts ...Option) *Policer {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithHeadTimeout returns option to set Head timeout of Policer.
|
|
||||||
func WithHeadTimeout(v time.Duration) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.headTimeout = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithLogger returns option to set Logger of Policer.
|
|
||||||
func WithLogger(v *logger.Logger) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.log = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithLocalStorage returns option to set local object storage of Policer.
|
|
||||||
func WithLocalStorage(v *engine.StorageEngine) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.jobQueue.localStorage = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithContainerSource returns option to set container source of Policer.
|
|
||||||
func WithContainerSource(v container.Source) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.cnrSrc = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithPlacementBuilder returns option to set object placement builder of Policer.
|
|
||||||
func WithPlacementBuilder(v placement.Builder) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.placementBuilder = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithRemoteHeader returns option to set object header receiver of Policer.
|
|
||||||
func WithRemoteHeader(v *headsvc.RemoteHeader) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.remoteHeader = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithNetmapKeys returns option to set tool to work with announced public keys.
|
|
||||||
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.netmapKeys = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithReplicator returns option to set object replicator of Policer.
|
|
||||||
func WithReplicator(v *replicator.Replicator) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.replicator = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithRedundantCopyCallback returns option to set
|
|
||||||
// callback to pass redundant local object copies
|
|
||||||
// detected by Policer.
|
|
||||||
func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.cbRedundantCopy = cb
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithMaxCapacity returns option to set max capacity
|
|
||||||
// that can be set to the pool.
|
|
||||||
func WithMaxCapacity(capacity int) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.maxCapacity = capacity
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithPool returns option to set pool for
|
|
||||||
// policy and replication operations.
|
|
||||||
func WithPool(p *ants.Pool) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.taskPool = p
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithNodeLoader returns option to set FrostFS node load source.
|
|
||||||
func WithNodeLoader(l nodeLoader) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.loader = l
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
299
pkg/services/policer/policer_test.go
Normal file
299
pkg/services/policer/policer_test.go
Normal file
|
@ -0,0 +1,299 @@
|
||||||
|
package policer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sort"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
"github.com/panjf2000/ants/v2"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBuryObjectWithoutContainer(t *testing.T) {
|
||||||
|
// Key space
|
||||||
|
addr := oidtest.Address()
|
||||||
|
objs := []objectcore.AddressWithType{
|
||||||
|
{
|
||||||
|
Address: addr,
|
||||||
|
Type: object.TypeRegular,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Container source and bury function
|
||||||
|
buryCh := make(chan oid.Address)
|
||||||
|
containerSrc := func(id cid.ID) (*container.Container, error) {
|
||||||
|
return nil, apistatus.ContainerNotFound{}
|
||||||
|
}
|
||||||
|
buryFn := func(ctx context.Context, a oid.Address) error {
|
||||||
|
buryCh <- a
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Task pool
|
||||||
|
pool, err := ants.NewPool(4)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Policer instance
|
||||||
|
p := New(
|
||||||
|
WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}),
|
||||||
|
WithContainerSource(containerSrcFunc(containerSrc)),
|
||||||
|
WithBuryFunc(buryFn),
|
||||||
|
WithPool(pool),
|
||||||
|
WithNodeLoader(constNodeLoader(0)),
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
go p.Run(ctx)
|
||||||
|
|
||||||
|
require.Equal(t, addr, <-buryCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProcessObject(t *testing.T) {
|
||||||
|
// Notes:
|
||||||
|
// - nodes are referred to by their index throughout, which is embedded in the public key
|
||||||
|
// - node with index 0 always refers to the local node, so there's no need to add it to objHolders
|
||||||
|
// - policy is used only to match the number of replicas for each index in the placement
|
||||||
|
tests := []struct {
|
||||||
|
desc string
|
||||||
|
objType object.Type
|
||||||
|
nodeCount int
|
||||||
|
policy string
|
||||||
|
placement [][]int
|
||||||
|
objHolders []int
|
||||||
|
maintenanceNodes []int
|
||||||
|
wantRemoveRedundant bool
|
||||||
|
wantReplicateTo []int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "1 copy already held by local node",
|
||||||
|
nodeCount: 1,
|
||||||
|
policy: `REP 1`,
|
||||||
|
placement: [][]int{{0}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "1 copy already held by the remote node",
|
||||||
|
nodeCount: 2,
|
||||||
|
policy: `REP 1`,
|
||||||
|
placement: [][]int{{1}},
|
||||||
|
objHolders: []int{1},
|
||||||
|
wantRemoveRedundant: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "1 copy not yet held by the remote node",
|
||||||
|
nodeCount: 2,
|
||||||
|
policy: `REP 1`,
|
||||||
|
placement: [][]int{{1}},
|
||||||
|
wantReplicateTo: []int{1},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "2 copies already held by local and remote node",
|
||||||
|
nodeCount: 2,
|
||||||
|
policy: `REP 2`,
|
||||||
|
placement: [][]int{{0, 1}},
|
||||||
|
objHolders: []int{1},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "2 copies but not held by remote node",
|
||||||
|
nodeCount: 2,
|
||||||
|
policy: `REP 2`,
|
||||||
|
placement: [][]int{{0, 1}},
|
||||||
|
wantReplicateTo: []int{1},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "multiple vectors already held by remote node",
|
||||||
|
nodeCount: 2,
|
||||||
|
policy: `REP 2 REP 2`,
|
||||||
|
placement: [][]int{{0, 1}, {0, 1}},
|
||||||
|
objHolders: []int{1},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "multiple vectors not yet held by remote node",
|
||||||
|
nodeCount: 2,
|
||||||
|
policy: `REP 2 REP 2`,
|
||||||
|
placement: [][]int{{0, 1}, {0, 1}},
|
||||||
|
wantReplicateTo: []int{1, 1}, // is this actually good?
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "lock object must be replicated to all nodes",
|
||||||
|
objType: object.TypeLock,
|
||||||
|
nodeCount: 3,
|
||||||
|
policy: `REP 1`,
|
||||||
|
placement: [][]int{{0, 1, 2}},
|
||||||
|
wantReplicateTo: []int{1, 2},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "preserve local copy when maintenance nodes exist",
|
||||||
|
nodeCount: 3,
|
||||||
|
policy: `REP 2`,
|
||||||
|
placement: [][]int{{1, 2}},
|
||||||
|
objHolders: []int{1},
|
||||||
|
maintenanceNodes: []int{2},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range tests {
|
||||||
|
ti := tests[i]
|
||||||
|
t.Run(ti.desc, func(t *testing.T) {
|
||||||
|
addr := oidtest.Address()
|
||||||
|
|
||||||
|
// Netmap, placement policy and placement builder
|
||||||
|
nodes := make([]netmap.NodeInfo, ti.nodeCount)
|
||||||
|
for i := range nodes {
|
||||||
|
nodes[i].SetPublicKey([]byte{byte(i)})
|
||||||
|
}
|
||||||
|
for _, i := range ti.maintenanceNodes {
|
||||||
|
nodes[i].SetMaintenance()
|
||||||
|
}
|
||||||
|
|
||||||
|
var policy netmap.PlacementPolicy
|
||||||
|
require.NoError(t, policy.DecodeString(ti.policy))
|
||||||
|
|
||||||
|
placementVectors := make([][]netmap.NodeInfo, len(ti.placement))
|
||||||
|
for i, pv := range ti.placement {
|
||||||
|
for _, nj := range pv {
|
||||||
|
placementVectors[i] = append(placementVectors[i], nodes[nj])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
|
||||||
|
if cnr.Equals(addr.Container()) && obj != nil && obj.Equals(addr.Object()) {
|
||||||
|
return placementVectors, nil
|
||||||
|
}
|
||||||
|
t.Errorf("unexpected placement build: cid=%v oid=%v", cnr, obj)
|
||||||
|
return nil, errors.New("unexpected placement build")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Object remote header
|
||||||
|
headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address) (*object.Object, error) {
|
||||||
|
index := int(ni.PublicKey()[0])
|
||||||
|
if a != addr || index < 1 || index >= ti.nodeCount {
|
||||||
|
t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a)
|
||||||
|
return nil, errors.New("unexpected object head")
|
||||||
|
}
|
||||||
|
for _, i := range ti.objHolders {
|
||||||
|
if index == i {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, apistatus.ObjectNotFound{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Container source
|
||||||
|
cnr := &container.Container{}
|
||||||
|
cnr.Value.Init()
|
||||||
|
cnr.Value.SetPlacementPolicy(policy)
|
||||||
|
containerSrc := func(id cid.ID) (*container.Container, error) {
|
||||||
|
if id.Equals(addr.Container()) {
|
||||||
|
return cnr, nil
|
||||||
|
}
|
||||||
|
t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container())
|
||||||
|
return nil, apistatus.ContainerNotFound{}
|
||||||
|
}
|
||||||
|
buryFn := func(ctx context.Context, a oid.Address) error {
|
||||||
|
t.Errorf("unexpected object buried: %v", a)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Policer instance
|
||||||
|
var gotRemoveRedundant bool
|
||||||
|
var gotReplicateTo []int
|
||||||
|
|
||||||
|
p := New(
|
||||||
|
WithContainerSource(containerSrcFunc(containerSrc)),
|
||||||
|
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||||
|
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||||
|
return bytes.Equal(k, nodes[0].PublicKey())
|
||||||
|
})),
|
||||||
|
WithRemoteObjectHeaderFunc(headFn),
|
||||||
|
WithBuryFunc(buryFn),
|
||||||
|
WithRedundantCopyCallback(func(_ context.Context, a oid.Address) {
|
||||||
|
require.True(t, eqAddr(a, addr), "unexpected redundant copy callback: a=%v", a)
|
||||||
|
gotRemoveRedundant = true
|
||||||
|
}),
|
||||||
|
WithReplicator(replicatorFunc(func(_ context.Context, task replicator.Task, res replicator.TaskResult) {
|
||||||
|
require.True(t, eqAddr(task.Addr, addr), "unexpected replicator task: %+v", task)
|
||||||
|
for _, node := range task.Nodes {
|
||||||
|
gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
|
||||||
|
}
|
||||||
|
})),
|
||||||
|
)
|
||||||
|
|
||||||
|
addrWithType := objectcore.AddressWithType{
|
||||||
|
Address: addr,
|
||||||
|
Type: ti.objType,
|
||||||
|
}
|
||||||
|
|
||||||
|
p.processObject(context.Background(), addrWithType)
|
||||||
|
sort.Ints(gotReplicateTo)
|
||||||
|
|
||||||
|
require.Equal(t, ti.wantRemoveRedundant, gotRemoveRedundant)
|
||||||
|
require.Equal(t, ti.wantReplicateTo, gotReplicateTo)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/issues/101)
|
||||||
|
func eqAddr(a, b oid.Address) bool {
|
||||||
|
return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object())
|
||||||
|
}
|
||||||
|
|
||||||
|
// sliceKeySpaceIterator is a KeySpaceIterator backed by a slice.
|
||||||
|
type sliceKeySpaceIterator struct {
|
||||||
|
objs []objectcore.AddressWithType
|
||||||
|
cur int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) {
|
||||||
|
if it.cur >= len(it.objs) {
|
||||||
|
it.cur = 0
|
||||||
|
return nil, engine.ErrEndOfListing
|
||||||
|
}
|
||||||
|
end := it.cur + int(size)
|
||||||
|
if end > len(it.objs) {
|
||||||
|
end = len(it.objs)
|
||||||
|
}
|
||||||
|
ret := it.objs[it.cur:end]
|
||||||
|
it.cur = end
|
||||||
|
return ret, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// containerSrcFunc is a container.Source backed by a function.
|
||||||
|
type containerSrcFunc func(cid.ID) (*container.Container, error)
|
||||||
|
|
||||||
|
func (f containerSrcFunc) Get(id cid.ID) (*container.Container, error) { return f(id) }
|
||||||
|
|
||||||
|
// placementBuilderFunc is a placement.Builder backed by a function
|
||||||
|
type placementBuilderFunc func(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
|
||||||
|
|
||||||
|
func (f placementBuilderFunc) BuildPlacement(c cid.ID, o *oid.ID, p netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
|
||||||
|
return f(c, o, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
// announcedKeysFunc is a netmap.AnnouncedKeys backed by a function.
|
||||||
|
type announcedKeysFunc func([]byte) bool
|
||||||
|
|
||||||
|
func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) }
|
||||||
|
|
||||||
|
// constNodeLoader is a nodeLoader that always returns a fixed value.
|
||||||
|
type constNodeLoader float64
|
||||||
|
|
||||||
|
func (f constNodeLoader) ObjectServiceLoad() float64 { return float64(f) }
|
||||||
|
|
||||||
|
// replicatorFunc is a Replicator backed by a function.
|
||||||
|
type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult)
|
||||||
|
|
||||||
|
func (f replicatorFunc) HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) {
|
||||||
|
f(ctx, task, res)
|
||||||
|
}
|
|
@ -6,27 +6,17 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (p *Policer) Run(ctx context.Context) {
|
func (p *Policer) Run(ctx context.Context) {
|
||||||
defer func() {
|
|
||||||
p.log.Info(logs.PolicerRoutineStopped)
|
|
||||||
}()
|
|
||||||
|
|
||||||
go p.poolCapacityWorker(ctx)
|
go p.poolCapacityWorker(ctx)
|
||||||
p.shardPolicyWorker(ctx)
|
p.shardPolicyWorker(ctx)
|
||||||
|
p.log.Info(logs.PolicerRoutineStopped)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
||||||
var (
|
|
||||||
addrs []objectcore.AddressWithType
|
|
||||||
cursor *engine.Cursor
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
@ -34,7 +24,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
addrs, cursor, err = p.jobQueue.Select(ctx, cursor, p.batchSize)
|
addrs, err := p.keySpaceIterator.Next(ctx, p.batchSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, engine.ErrEndOfListing) {
|
if errors.Is(err, engine.ErrEndOfListing) {
|
||||||
time.Sleep(time.Second) // finished whole cycle, sleep a bit
|
time.Sleep(time.Second) // finished whole cycle, sleep a bit
|
||||||
|
@ -55,7 +45,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
err = p.taskPool.Submit(func() {
|
err := p.taskPool.Submit(func() {
|
||||||
v, ok := p.cache.Get(addr.Address)
|
v, ok := p.cache.Get(addr.Address)
|
||||||
if ok && time.Since(v) < p.evictDuration {
|
if ok && time.Since(v) < p.evictDuration {
|
||||||
return
|
return
|
||||||
|
@ -78,10 +68,10 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
||||||
|
|
||||||
func (p *Policer) poolCapacityWorker(ctx context.Context) {
|
func (p *Policer) poolCapacityWorker(ctx context.Context) {
|
||||||
ticker := time.NewTicker(p.rebalanceFreq)
|
ticker := time.NewTicker(p.rebalanceFreq)
|
||||||
|
defer ticker.Stop()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
ticker.Stop()
|
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
frostfsSysLoad := p.loader.ObjectServiceLoad()
|
frostfsSysLoad := p.loader.ObjectServiceLoad()
|
||||||
|
|
|
@ -1,26 +0,0 @@
|
||||||
package policer
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
|
||||||
)
|
|
||||||
|
|
||||||
type jobQueue struct {
|
|
||||||
localStorage *engine.StorageEngine
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *jobQueue) Select(ctx context.Context, cursor *engine.Cursor, count uint32) ([]objectcore.AddressWithType, *engine.Cursor, error) {
|
|
||||||
var prm engine.ListWithCursorPrm
|
|
||||||
prm.WithCursor(cursor)
|
|
||||||
prm.WithCount(count)
|
|
||||||
|
|
||||||
res, err := q.localStorage.ListWithCursor(ctx, prm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("cannot list objects in engine: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return res.AddressList(), res.Cursor(), nil
|
|
||||||
}
|
|
|
@ -24,16 +24,16 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
|
||||||
defer p.metrics.DecInFlightRequest()
|
defer p.metrics.DecInFlightRequest()
|
||||||
defer func() {
|
defer func() {
|
||||||
p.log.Debug(logs.ReplicatorFinishWork,
|
p.log.Debug(logs.ReplicatorFinishWork,
|
||||||
zap.Uint32("amount of unfinished replicas", task.quantity),
|
zap.Uint32("amount of unfinished replicas", task.NumCopies),
|
||||||
)
|
)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if task.obj == nil {
|
if task.Obj == nil {
|
||||||
var err error
|
var err error
|
||||||
task.obj, err = engine.Get(ctx, p.localStorage, task.addr)
|
task.Obj, err = engine.Get(ctx, p.localStorage, task.Addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.log.Error(logs.ReplicatorCouldNotGetObjectFromLocalStorage,
|
p.log.Error(logs.ReplicatorCouldNotGetObjectFromLocalStorage,
|
||||||
zap.Stringer("object", task.addr),
|
zap.Stringer("object", task.Addr),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -41,9 +41,9 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
|
||||||
}
|
}
|
||||||
|
|
||||||
prm := new(putsvc.RemotePutPrm).
|
prm := new(putsvc.RemotePutPrm).
|
||||||
WithObject(task.obj)
|
WithObject(task.Obj)
|
||||||
|
|
||||||
for i := 0; task.quantity > 0 && i < len(task.nodes); i++ {
|
for i := 0; task.NumCopies > 0 && i < len(task.Nodes); i++ {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
|
@ -51,13 +51,13 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
|
||||||
}
|
}
|
||||||
|
|
||||||
log := p.log.With(
|
log := p.log.With(
|
||||||
zap.String("node", netmap.StringifyPublicKey(task.nodes[i])),
|
zap.String("node", netmap.StringifyPublicKey(task.Nodes[i])),
|
||||||
zap.Stringer("object", task.addr),
|
zap.Stringer("object", task.Addr),
|
||||||
)
|
)
|
||||||
|
|
||||||
callCtx, cancel := context.WithTimeout(ctx, p.putTimeout)
|
callCtx, cancel := context.WithTimeout(ctx, p.putTimeout)
|
||||||
|
|
||||||
err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i]))
|
err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.Nodes[i]))
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
|
@ -68,12 +68,12 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
|
||||||
} else {
|
} else {
|
||||||
log.Debug(logs.ReplicatorObjectSuccessfullyReplicated)
|
log.Debug(logs.ReplicatorObjectSuccessfullyReplicated)
|
||||||
|
|
||||||
task.quantity--
|
task.NumCopies--
|
||||||
|
|
||||||
res.SubmitSuccessfulReplication(task.nodes[i])
|
res.SubmitSuccessfulReplication(task.Nodes[i])
|
||||||
|
|
||||||
p.metrics.IncProcessedObjects()
|
p.metrics.IncProcessedObjects()
|
||||||
p.metrics.AddPayloadSize(int64(task.obj.PayloadSize()))
|
p.metrics.AddPayloadSize(int64(task.Obj.PayloadSize()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,31 +8,12 @@ import (
|
||||||
|
|
||||||
// Task represents group of Replicator task parameters.
|
// Task represents group of Replicator task parameters.
|
||||||
type Task struct {
|
type Task struct {
|
||||||
quantity uint32
|
// NumCopies is the number of copies to replicate.
|
||||||
|
NumCopies uint32
|
||||||
addr oid.Address
|
// Addr is the address of the local object.
|
||||||
|
Addr oid.Address
|
||||||
obj *objectSDK.Object
|
// Obj is the object to avoid fetching it from the local storage.
|
||||||
|
Obj *objectSDK.Object
|
||||||
nodes []netmap.NodeInfo
|
// Nodes is a list of potential object holders.
|
||||||
}
|
Nodes []netmap.NodeInfo
|
||||||
|
|
||||||
// SetCopiesNumber sets number of copies to replicate.
|
|
||||||
func (t *Task) SetCopiesNumber(v uint32) {
|
|
||||||
t.quantity = v
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetObjectAddress sets address of local object.
|
|
||||||
func (t *Task) SetObjectAddress(v oid.Address) {
|
|
||||||
t.addr = v
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetObject sets object to avoid fetching it from the local storage.
|
|
||||||
func (t *Task) SetObject(obj *objectSDK.Object) {
|
|
||||||
t.obj = obj
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetNodes sets a list of potential object holders.
|
|
||||||
func (t *Task) SetNodes(v []netmap.NodeInfo) {
|
|
||||||
t.nodes = v
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue