forked from TrueCloudLab/frostfs-node
[#108] services: Implement Policer service
Implement Policer service that performs background work to check compliance with the placement policy for local objects in the container. In the initial implementation, the selection of the working queue of objects is simplified, and there is no transfer of the result to the replicator. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
f6e56aa956
commit
0dab4b7581
4 changed files with 385 additions and 0 deletions
103
pkg/services/policer/check.go
Normal file
103
pkg/services/policer/check.go
Normal file
|
@ -0,0 +1,103 @@
|
|||
package policer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (p *Policer) processObject(ctx context.Context, addr *object.Address) {
|
||||
cnr, err := p.cnrSrc.Get(addr.GetContainerID())
|
||||
if err != nil {
|
||||
p.log.Error("could not get container",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
policy := cnr.GetPlacementPolicy()
|
||||
|
||||
nn, err := p.placementBuilder.BuildPlacement(addr, policy)
|
||||
if err != nil {
|
||||
p.log.Error("could not build placement vector for object",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
replicas := policy.GetReplicas()
|
||||
|
||||
for i := range nn {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
p.processNodes(ctx, addr, nn[i], replicas[i].GetCount())
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Policer) processNodes(ctx context.Context, addr *object.Address, nodes netmap.Nodes, shortage uint32) {
|
||||
prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr)
|
||||
|
||||
for i := 0; shortage > 0 && i < len(nodes); i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
netAddr := nodes[i].NetworkAddress()
|
||||
|
||||
log := p.log.With(zap.String("node", netAddr))
|
||||
|
||||
node, err := network.AddressFromString(netAddr)
|
||||
if err != nil {
|
||||
log.Error("could not parse network address")
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
if network.IsLocalAddress(p.localAddrSrc, node) {
|
||||
shortage--
|
||||
} else {
|
||||
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
||||
|
||||
_, err = p.remoteHeader.Head(callCtx, prm.WithNodeAddress(node))
|
||||
|
||||
cancel()
|
||||
|
||||
if err != nil {
|
||||
// FIXME: this is a temporary solution to resolve 404 response from remote node
|
||||
// We need to distinguish problem nodes from nodes without an object.
|
||||
if strings.Contains(err.Error(), headsvc.ErrNotFound.Error()) {
|
||||
continue
|
||||
} else {
|
||||
log.Error("could not receive object header",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
} else {
|
||||
shortage--
|
||||
}
|
||||
}
|
||||
|
||||
nodes = append(nodes[:i], nodes[i+1:]...)
|
||||
i--
|
||||
}
|
||||
|
||||
if shortage > 0 {
|
||||
p.log.Info("shortage of object copies detected",
|
||||
zap.Uint32("shortage", shortage),
|
||||
)
|
||||
// TODO: send task to replicator
|
||||
}
|
||||
}
|
140
pkg/services/policer/policer.go
Normal file
140
pkg/services/policer/policer.go
Normal file
|
@ -0,0 +1,140 @@
|
|||
package policer
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/container"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Policer represents the utility that verifies
|
||||
// compliance with the object storage policy.
|
||||
type Policer struct {
|
||||
*cfg
|
||||
|
||||
prevTask prevTask
|
||||
}
|
||||
|
||||
// Option is an option for Policer constructor.
|
||||
type Option func(*cfg)
|
||||
|
||||
type cfg struct {
|
||||
headTimeout time.Duration
|
||||
|
||||
workScope workScope
|
||||
|
||||
log *logger.Logger
|
||||
|
||||
trigger <-chan *Task
|
||||
|
||||
jobQueue jobQueue
|
||||
|
||||
cnrSrc container.Source
|
||||
|
||||
placementBuilder placement.Builder
|
||||
|
||||
remoteHeader *headsvc.RemoteHeader
|
||||
|
||||
localAddrSrc network.LocalAddressSource
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
return &cfg{
|
||||
log: zap.L(),
|
||||
}
|
||||
}
|
||||
|
||||
// New creates, initializes and returns Policer instance.
|
||||
func New(opts ...Option) *Policer {
|
||||
c := defaultCfg()
|
||||
|
||||
for i := range opts {
|
||||
opts[i](c)
|
||||
}
|
||||
|
||||
c.log = c.log.With(zap.String("component", "Object Policer"))
|
||||
|
||||
return &Policer{
|
||||
cfg: c,
|
||||
prevTask: prevTask{
|
||||
cancel: func() {},
|
||||
wait: new(sync.WaitGroup),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// WithHeadTimeout returns option to set Head timeout of Policer.
|
||||
func WithHeadTimeout(v time.Duration) Option {
|
||||
return func(c *cfg) {
|
||||
c.headTimeout = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithWorkScope returns option to set job work scope value of Policer.
|
||||
func WithWorkScope(v int) Option {
|
||||
return func(c *cfg) {
|
||||
c.workScope.val = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithExpansionRate returns option to set expansion rate of Policer's works scope (in %).
|
||||
func WithExpansionRate(v int) Option {
|
||||
return func(c *cfg) {
|
||||
c.workScope.expRate = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithTrigger returns option to set triggering channel of Policer.
|
||||
func WithTrigger(v <-chan *Task) Option {
|
||||
return func(c *cfg) {
|
||||
c.trigger = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger returns option to set Logger of Policer.
|
||||
func WithLogger(v *logger.Logger) Option {
|
||||
return func(c *cfg) {
|
||||
c.log = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithLocalStorage returns option to set local object storage of Policer.
|
||||
func WithLocalStorage(v *localstore.Storage) Option {
|
||||
return func(c *cfg) {
|
||||
c.jobQueue.localStorage = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithContainerSource returns option to set container source of Policer.
|
||||
func WithContainerSource(v container.Source) Option {
|
||||
return func(c *cfg) {
|
||||
c.cnrSrc = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithPlacementBuilder returns option to set object placement builder of Policer.
|
||||
func WithPlacementBuilder(v placement.Builder) Option {
|
||||
return func(c *cfg) {
|
||||
c.placementBuilder = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithRemoteHeader returns option to set object header receiver of Policer.
|
||||
func WithRemoteHeader(v *headsvc.RemoteHeader) Option {
|
||||
return func(c *cfg) {
|
||||
c.remoteHeader = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithLocalAddressSource returns option to set local address source of Policer.
|
||||
func WithLocalAddressSource(v network.LocalAddressSource) Option {
|
||||
return func(c *cfg) {
|
||||
c.localAddrSrc = v
|
||||
}
|
||||
}
|
114
pkg/services/policer/process.go
Normal file
114
pkg/services/policer/process.go
Normal file
|
@ -0,0 +1,114 @@
|
|||
package policer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Task represents group of Policer tact parameters.
|
||||
type Task struct{}
|
||||
|
||||
type prevTask struct {
|
||||
undone int
|
||||
|
||||
cancel context.CancelFunc
|
||||
|
||||
wait *sync.WaitGroup
|
||||
}
|
||||
|
||||
type workScope struct {
|
||||
val int
|
||||
|
||||
expRate int // in %
|
||||
}
|
||||
|
||||
func (p *Policer) Run(ctx context.Context) {
|
||||
defer func() {
|
||||
p.log.Info("routine stopped")
|
||||
}()
|
||||
|
||||
p.log.Info("process routine",
|
||||
zap.Int("work scope value", p.workScope.val),
|
||||
zap.Int("expansion rate (%)", p.workScope.val),
|
||||
zap.Duration("head timeout", p.headTimeout),
|
||||
)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
p.prevTask.cancel()
|
||||
|
||||
p.log.Warn("context is done",
|
||||
zap.String("error", ctx.Err().Error()),
|
||||
)
|
||||
|
||||
return
|
||||
case task, ok := <-p.trigger:
|
||||
if !ok {
|
||||
p.log.Warn("trigger channel is closed")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
p.prevTask.cancel()
|
||||
p.prevTask.wait.Wait()
|
||||
|
||||
var taskCtx context.Context
|
||||
|
||||
taskCtx, p.prevTask.cancel = context.WithCancel(ctx)
|
||||
|
||||
go p.handleTask(taskCtx, task)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Policer) handleTask(ctx context.Context, task *Task) {
|
||||
p.prevTask.wait.Add(1)
|
||||
|
||||
defer func() {
|
||||
p.prevTask.wait.Done()
|
||||
p.log.Info("finish work",
|
||||
zap.Int("amount of unfinished objects", p.prevTask.undone),
|
||||
)
|
||||
}()
|
||||
|
||||
var delta int
|
||||
|
||||
// undone - amount of objects we couldn't process in last epoch
|
||||
if p.prevTask.undone > 0 {
|
||||
// if there are unprocessed objects, then lower your estimation
|
||||
delta = -p.prevTask.undone
|
||||
} else {
|
||||
// otherwise try to expand
|
||||
delta = p.workScope.val * p.workScope.expRate / 100
|
||||
}
|
||||
|
||||
addrs, err := p.jobQueue.Select(p.workScope.val + delta)
|
||||
if err != nil {
|
||||
p.log.Warn("could not select objects",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
|
||||
// if there are NOT enough objects to fill the pool, do not change it
|
||||
// otherwise expand or shrink it with the delta value
|
||||
if len(addrs) >= p.workScope.val+delta {
|
||||
p.workScope.val += delta
|
||||
}
|
||||
|
||||
p.prevTask.undone = len(addrs)
|
||||
|
||||
for i := range addrs {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
p.processObject(ctx, addrs[i])
|
||||
|
||||
p.prevTask.undone--
|
||||
}
|
||||
}
|
28
pkg/services/policer/queue.go
Normal file
28
pkg/services/policer/queue.go
Normal file
|
@ -0,0 +1,28 @@
|
|||
package policer
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
|
||||
)
|
||||
|
||||
type jobQueue struct {
|
||||
localStorage *localstore.Storage
|
||||
}
|
||||
|
||||
func (q *jobQueue) Select(limit int) ([]*object.Address, error) {
|
||||
// TODO: optimize the logic for selecting objects
|
||||
// We can prioritize objects for migration, newly arrived objects, etc.
|
||||
// It is recommended to make changes after updating the metabase
|
||||
|
||||
res := make([]*object.Address, 0, limit)
|
||||
|
||||
if err := q.localStorage.Iterate(nil, func(meta *localstore.ObjectMeta) bool {
|
||||
res = append(res, meta.Head().Address())
|
||||
|
||||
return len(res) >= limit
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
Loading…
Reference in a new issue