214 lines
5.6 KiB
Go
214 lines
5.6 KiB
Go
package policer
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"github.com/panjf2000/ants/v2"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// KeySpaceIterator is the interface that allows iterating over the key space
|
|
// of local storage.
|
|
// Note that the underlying implementation might be circular: i.e. it can restart
|
|
// when the end of the key space is reached.
|
|
type KeySpaceIterator interface {
|
|
Next(context.Context, uint32) ([]objectcore.Info, error)
|
|
Rewind()
|
|
}
|
|
|
|
// RedundantCopyCallback is a callback to pass
|
|
// the redundant local copy of the object.
|
|
type RedundantCopyCallback func(context.Context, oid.Address)
|
|
|
|
// BuryFunc is the function to bury (i.e. inhume) an object.
|
|
type BuryFunc func(context.Context, oid.Address) error
|
|
|
|
// Replicator is the interface to a consumer of replication tasks.
|
|
type Replicator interface {
|
|
HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
|
|
HandlePullTask(ctx context.Context, task replicator.Task)
|
|
HandleLocalPutTask(ctx context.Context, task replicator.Task)
|
|
}
|
|
|
|
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
|
|
// NOTE(review): the purpose of the trailing bool parameter is not visible in
// this file — confirm against the implementation before relying on it.
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address, bool) (*objectSDK.Object, error)
|
|
|
|
// LocalObjectHeaderFunc is the function to obtain HEAD info from the current node.
|
|
type LocalObjectHeaderFunc func(context.Context, oid.Address) (*objectSDK.Object, error)
|
|
|
|
// RemoteObjectGetFunc is the function that takes the info of a remote node
// and an object address and returns an object or an error.
// NOTE(review): presumably fetches the object from that specific node,
// mirroring RemoteObjectHeaderFunc — confirm against the implementation.
type RemoteObjectGetFunc func(ctx context.Context, node netmapSDK.NodeInfo, addr oid.Address) (*objectSDK.Object, error)
|
|
|
|
// LocalObjectGetFunc is the function that takes an object address and
// returns an object or an error.
// NOTE(review): presumably reads the object from the current node,
// mirroring LocalObjectHeaderFunc — confirm against the implementation.
type LocalObjectGetFunc func(ctx context.Context, addr oid.Address) (*objectSDK.Object, error)
|
|
|
|
type cfg struct {
|
|
headTimeout time.Duration
|
|
|
|
log *logger.Logger
|
|
|
|
keySpaceIterator KeySpaceIterator
|
|
|
|
buryFn BuryFunc
|
|
|
|
cnrSrc container.Source
|
|
|
|
placementBuilder placement.Builder
|
|
|
|
remoteHeader RemoteObjectHeaderFunc
|
|
|
|
localHeader LocalObjectHeaderFunc
|
|
|
|
netmapKeys netmap.AnnouncedKeys
|
|
|
|
replicator Replicator
|
|
|
|
cbRedundantCopy RedundantCopyCallback
|
|
|
|
taskPool *ants.Pool
|
|
|
|
batchSize, cacheSize uint32
|
|
|
|
evictDuration, sleepDuration time.Duration
|
|
|
|
metrics MetricsRegister
|
|
|
|
remoteObject RemoteObjectGetFunc
|
|
|
|
localObject LocalObjectGetFunc
|
|
|
|
keyStorage *util.KeyStorage
|
|
}
|
|
|
|
func defaultCfg() *cfg {
|
|
return &cfg{
|
|
log: &logger.Logger{Logger: zap.L()},
|
|
batchSize: 10,
|
|
cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB
|
|
sleepDuration: 1 * time.Second,
|
|
evictDuration: 30 * time.Second,
|
|
metrics: noopMetrics{},
|
|
}
|
|
}
|
|
|
|
// Option is an option for Policer constructor.
|
|
type Option func(*cfg)
|
|
|
|
// WithHeadTimeout returns option to set Head timeout of Policer.
|
|
func WithHeadTimeout(v time.Duration) Option {
|
|
return func(c *cfg) {
|
|
c.headTimeout = v
|
|
}
|
|
}
|
|
|
|
// WithLogger returns option to set Logger of Policer.
|
|
func WithLogger(v *logger.Logger) Option {
|
|
return func(c *cfg) {
|
|
c.log = v
|
|
}
|
|
}
|
|
|
|
func WithKeySpaceIterator(it KeySpaceIterator) Option {
|
|
return func(c *cfg) {
|
|
c.keySpaceIterator = it
|
|
}
|
|
}
|
|
|
|
func WithBuryFunc(f BuryFunc) Option {
|
|
return func(c *cfg) {
|
|
c.buryFn = f
|
|
}
|
|
}
|
|
|
|
// WithContainerSource returns option to set container source of Policer.
|
|
func WithContainerSource(v container.Source) Option {
|
|
return func(c *cfg) {
|
|
c.cnrSrc = v
|
|
}
|
|
}
|
|
|
|
// WithPlacementBuilder returns option to set object placement builder of Policer.
|
|
func WithPlacementBuilder(v placement.Builder) Option {
|
|
return func(c *cfg) {
|
|
c.placementBuilder = v
|
|
}
|
|
}
|
|
|
|
// WithRemoteObjectHeaderFunc returns option to set remote object header receiver of Policer.
|
|
func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option {
|
|
return func(c *cfg) {
|
|
c.remoteHeader = v
|
|
}
|
|
}
|
|
|
|
// WithLocalObjectHeaderFunc returns option to set local object header receiver of Policer.
|
|
func WithLocalObjectHeaderFunc(v LocalObjectHeaderFunc) Option {
|
|
return func(c *cfg) {
|
|
c.localHeader = v
|
|
}
|
|
}
|
|
|
|
func WithRemoteObjectGetFunc(v RemoteObjectGetFunc) Option {
|
|
return func(c *cfg) {
|
|
c.remoteObject = v
|
|
}
|
|
}
|
|
|
|
func WithLocalObjectGetFunc(v LocalObjectGetFunc) Option {
|
|
return func(c *cfg) {
|
|
c.localObject = v
|
|
}
|
|
}
|
|
|
|
// WithNetmapKeys returns option to set tool to work with announced public keys.
|
|
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
|
|
return func(c *cfg) {
|
|
c.netmapKeys = v
|
|
}
|
|
}
|
|
|
|
// WithReplicator returns option to set object replicator of Policer.
|
|
func WithReplicator(v Replicator) Option {
|
|
return func(c *cfg) {
|
|
c.replicator = v
|
|
}
|
|
}
|
|
|
|
// WithRedundantCopyCallback returns option to set
|
|
// callback to pass redundant local object copies
|
|
// detected by Policer.
|
|
func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
|
|
return func(c *cfg) {
|
|
c.cbRedundantCopy = cb
|
|
}
|
|
}
|
|
|
|
// WithPool returns option to set pool for
|
|
// policy and replication operations.
|
|
func WithPool(p *ants.Pool) Option {
|
|
return func(c *cfg) {
|
|
c.taskPool = p
|
|
}
|
|
}
|
|
|
|
// WithMetrics returns option to set metrics.
|
|
func WithMetrics(m MetricsRegister) Option {
|
|
return func(c *cfg) {
|
|
c.metrics = m
|
|
}
|
|
}
|
|
|
|
func WithKeyStorage(ks *util.KeyStorage) Option {
|
|
return func(c *cfg) {
|
|
c.keyStorage = ks
|
|
}
|
|
}
|