forked from TrueCloudLab/frostfs-node
186 lines
4.7 KiB
Go
186 lines
4.7 KiB
Go
|
package policer
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"time"
|
||
|
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||
|
"github.com/panjf2000/ants/v2"
|
||
|
"go.uber.org/zap"
|
||
|
)
|
||
|
|
||
|
// KeySpaceIterator is the interface that allows iterating over the key space
|
||
|
// of local storage.
|
||
|
// Note that the underlying implementation might be circular: i.e. it can restart
|
||
|
// when the end of the key space is reached.
|
||
|
type KeySpaceIterator interface {
|
||
|
Next(context.Context, uint32) ([]objectcore.AddressWithType, error)
|
||
|
}
|
||
|
|
||
|
// RedundantCopyCallback is a callback to pass
|
||
|
// the redundant local copy of the object.
|
||
|
type RedundantCopyCallback func(context.Context, oid.Address)
|
||
|
|
||
|
// BuryFunc is the function to bury (i.e. inhume) an object.
|
||
|
type BuryFunc func(context.Context, oid.Address) error
|
||
|
|
||
|
// Replicator is the interface to a consumer of replication tasks.
|
||
|
type Replicator interface {
|
||
|
HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
|
||
|
}
|
||
|
|
||
|
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
|
||
|
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
|
||
|
|
||
|
// nodeLoader provides application load statistics.
type nodeLoader interface {
	// ObjectServiceLoad returns the object service load value
	// in the [0:1] range.
	ObjectServiceLoad() float64
}
|
||
|
|
||
|
type cfg struct {
|
||
|
headTimeout time.Duration
|
||
|
|
||
|
log *logger.Logger
|
||
|
|
||
|
keySpaceIterator KeySpaceIterator
|
||
|
|
||
|
buryFn BuryFunc
|
||
|
|
||
|
cnrSrc container.Source
|
||
|
|
||
|
placementBuilder placement.Builder
|
||
|
|
||
|
remoteHeader RemoteObjectHeaderFunc
|
||
|
|
||
|
netmapKeys netmap.AnnouncedKeys
|
||
|
|
||
|
replicator Replicator
|
||
|
|
||
|
cbRedundantCopy RedundantCopyCallback
|
||
|
|
||
|
taskPool *ants.Pool
|
||
|
|
||
|
loader nodeLoader
|
||
|
|
||
|
maxCapacity int
|
||
|
|
||
|
batchSize, cacheSize uint32
|
||
|
|
||
|
rebalanceFreq, evictDuration time.Duration
|
||
|
}
|
||
|
|
||
|
func defaultCfg() *cfg {
|
||
|
return &cfg{
|
||
|
log: &logger.Logger{Logger: zap.L()},
|
||
|
batchSize: 10,
|
||
|
cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB
|
||
|
rebalanceFreq: 1 * time.Second,
|
||
|
evictDuration: 30 * time.Second,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Option is an option for Policer constructor.
|
||
|
type Option func(*cfg)
|
||
|
|
||
|
// WithHeadTimeout returns option to set Head timeout of Policer.
|
||
|
func WithHeadTimeout(v time.Duration) Option {
|
||
|
return func(c *cfg) {
|
||
|
c.headTimeout = v
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WithLogger returns option to set Logger of Policer.
|
||
|
func WithLogger(v *logger.Logger) Option {
|
||
|
return func(c *cfg) {
|
||
|
c.log = v
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func WithKeySpaceIterator(it KeySpaceIterator) Option {
|
||
|
return func(c *cfg) {
|
||
|
c.keySpaceIterator = it
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func WithBuryFunc(f BuryFunc) Option {
|
||
|
return func(c *cfg) {
|
||
|
c.buryFn = f
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WithContainerSource returns option to set container source of Policer.
|
||
|
func WithContainerSource(v container.Source) Option {
|
||
|
return func(c *cfg) {
|
||
|
c.cnrSrc = v
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WithPlacementBuilder returns option to set object placement builder of Policer.
|
||
|
func WithPlacementBuilder(v placement.Builder) Option {
|
||
|
return func(c *cfg) {
|
||
|
c.placementBuilder = v
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WithRemoteObjectHeader returns option to set object header receiver of Policer.
|
||
|
func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option {
|
||
|
return func(c *cfg) {
|
||
|
c.remoteHeader = v
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WithNetmapKeys returns option to set tool to work with announced public keys.
|
||
|
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
|
||
|
return func(c *cfg) {
|
||
|
c.netmapKeys = v
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WithReplicator returns option to set object replicator of Policer.
|
||
|
func WithReplicator(v Replicator) Option {
|
||
|
return func(c *cfg) {
|
||
|
c.replicator = v
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WithRedundantCopyCallback returns option to set
|
||
|
// callback to pass redundant local object copies
|
||
|
// detected by Policer.
|
||
|
func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
|
||
|
return func(c *cfg) {
|
||
|
c.cbRedundantCopy = cb
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WithMaxCapacity returns option to set max capacity
|
||
|
// that can be set to the pool.
|
||
|
func WithMaxCapacity(capacity int) Option {
|
||
|
return func(c *cfg) {
|
||
|
c.maxCapacity = capacity
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WithPool returns option to set pool for
|
||
|
// policy and replication operations.
|
||
|
func WithPool(p *ants.Pool) Option {
|
||
|
return func(c *cfg) {
|
||
|
c.taskPool = p
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WithNodeLoader returns option to set FrostFS node load source.
|
||
|
func WithNodeLoader(l nodeLoader) Option {
|
||
|
return func(c *cfg) {
|
||
|
c.loader = l
|
||
|
}
|
||
|
}
|