Refactor replicator #517

Merged
fyrchik merged 3 commits from dstepanov-yadro/frostfs-node:feat/replicator-put-single into master 2023-07-18 10:52:13 +00:00
11 changed files with 88 additions and 56 deletions

View file

@ -996,13 +996,6 @@ func (c *cfg) needBootstrap() bool {
return c.cfgNetmap.needBootstrap return c.cfgNetmap.needBootstrap
} }
// ObjectServiceLoad implements system loader interface for policer component.
// It is calculated as size/capacity ratio of "remote object put" worker.
// Returns float value between 0.0 and 1.0.
func (c *cfg) ObjectServiceLoad() float64 {
return float64(c.cfgObject.pool.putRemote.Running()) / float64(c.cfgObject.pool.putRemoteCapacity)
}
type dCmp struct { type dCmp struct {
name string name string
reloadFunc func() error reloadFunc func() error

View file

@ -259,7 +259,6 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
}), }),
policer.WithMaxCapacity(c.cfgObject.pool.replicatorPoolSize), policer.WithMaxCapacity(c.cfgObject.pool.replicatorPoolSize),
policer.WithPool(c.cfgObject.pool.replication), policer.WithPool(c.cfgObject.pool.replication),
policer.WithNodeLoader(c),
) )
c.workers = append(c.workers, worker{ c.workers = append(c.workers, worker{

View file

@ -52,7 +52,6 @@ const (
PolicerRoutineStopped = "routine stopped" // Info in ../node/pkg/services/policer/process.go PolicerRoutineStopped = "routine stopped" // Info in ../node/pkg/services/policer/process.go
PolicerFailureAtObjectSelectForReplication = "failure at object select for replication" // Warn in ../node/pkg/services/policer/process.go PolicerFailureAtObjectSelectForReplication = "failure at object select for replication" // Warn in ../node/pkg/services/policer/process.go
PolicerPoolSubmission = "pool submission" // Warn in ../node/pkg/services/policer/process.go PolicerPoolSubmission = "pool submission" // Warn in ../node/pkg/services/policer/process.go
PolicerTuneReplicationCapacity = "tune replication capacity" // Debug in ../node/pkg/services/policer/process.go
ReplicatorFinishWork = "finish work" // Debug in ../node/pkg/services/replicator/process.go ReplicatorFinishWork = "finish work" // Debug in ../node/pkg/services/replicator/process.go
ReplicatorCouldNotGetObjectFromLocalStorage = "could not get object from local storage" // Error in ../node/pkg/services/replicator/process.go ReplicatorCouldNotGetObjectFromLocalStorage = "could not get object from local storage" // Error in ../node/pkg/services/replicator/process.go
ReplicatorCouldNotReplicateObject = "could not replicate object" // Error in ../node/pkg/services/replicator/process.go ReplicatorCouldNotReplicateObject = "could not replicate object" // Error in ../node/pkg/services/replicator/process.go

View file

@ -13,6 +13,7 @@ import (
type Client interface { type Client interface {
ContainerAnnounceUsedSpace(context.Context, client.PrmAnnounceSpace) (*client.ResAnnounceSpace, error) ContainerAnnounceUsedSpace(context.Context, client.PrmAnnounceSpace) (*client.ResAnnounceSpace, error)
ObjectPutInit(context.Context, client.PrmObjectPutInit) (client.ObjectWriter, error) ObjectPutInit(context.Context, client.PrmObjectPutInit) (client.ObjectWriter, error)
ObjectPutSingle(context.Context, client.PrmObjectPutSingle) (*client.ResObjectPutSingle, error)
ObjectDelete(context.Context, client.PrmObjectDelete) (*client.ResObjectDelete, error) ObjectDelete(context.Context, client.PrmObjectDelete) (*client.ResObjectDelete, error)
ObjectGetInit(context.Context, client.PrmObjectGet) (*client.ObjectReader, error) ObjectGetInit(context.Context, client.PrmObjectGet) (*client.ObjectReader, error)
ObjectHead(context.Context, client.PrmObjectHead) (*client.ResObjectHead, error) ObjectHead(context.Context, client.PrmObjectHead) (*client.ResObjectHead, error)

View file

@ -228,6 +228,15 @@ func (x *multiClient) ObjectPutInit(ctx context.Context, p client.PrmObjectPutIn
return return
} }
func (x *multiClient) ObjectPutSingle(ctx context.Context, p client.PrmObjectPutSingle) (res *client.ResObjectPutSingle, err error) {
err = x.iterateClients(ctx, func(c clientcore.Client) error {
res, err = c.ObjectPutSingle(ctx, p)
return err
})
return
}
func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, prm client.PrmAnnounceSpace) (res *client.ResAnnounceSpace, err error) { func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, prm client.PrmAnnounceSpace) (res *client.ResAnnounceSpace, err error) {
err = x.iterateClients(ctx, func(c clientcore.Client) error { err = x.iterateClients(ctx, func(c clientcore.Client) error {
res, err = c.ContainerAnnounceUsedSpace(ctx, prm) res, err = c.ContainerAnnounceUsedSpace(ctx, prm)

View file

@ -449,6 +449,54 @@ func PutObject(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) {
}, nil }, nil
} }
// PutObjectSingle saves the object in local storage of the remote node with PutSingle RPC.
//
// Client and key must be set.
//
// Returns any error which prevented the operation from completing correctly in error return.
func PutObjectSingle(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "client.PutObjectSingle")
defer span.End()
objID, isSet := prm.obj.ID()
if !isSet {
return nil, errors.New("missing object id")
ale64bit marked this conversation as resolved Outdated

errors.New?

`errors.New`?

fixed

fixed
}
var prmCli client.PrmObjectPutSingle
prmCli.ExecuteLocal()
if prm.key != nil {
prmCli.UseKey(prm.key)
}
if prm.tokenSession != nil {
prmCli.WithinSession(*prm.tokenSession)
}
if prm.tokenBearer != nil {
prmCli.WithBearerToken(*prm.tokenBearer)
}
prmCli.WithXHeaders(prm.xHeaders...)
prmCli.SetObject(prm.obj.ToV2())
res, err := prm.cli.ObjectPutSingle(ctx, prmCli)
if err != nil {
ReportError(prm.cli, err)
return nil, fmt.Errorf("put single object on client: %w", err)
}
if err = apistatus.ErrFromStatus(res.Status()); err != nil {
return nil, fmt.Errorf("put single object via client: %w", err)
ale64bit marked this conversation as resolved Outdated

discussed offline

discussed offline

fixed

fixed
}
return &PutObjectRes{
id: objID,
}, nil
}
// SearchObjectsPrm groups parameters of SearchObjects operation. // SearchObjectsPrm groups parameters of SearchObjects operation.
type SearchObjectsPrm struct { type SearchObjectsPrm struct {
readPrmCommon readPrmCommon

View file

@ -13,6 +13,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
) )
type remoteTarget struct { type remoteTarget struct {
@ -63,6 +65,15 @@ func (t *remoteTarget) Close(ctx context.Context) (*transformer.AccessIdentifier
prm.SetXHeaders(t.commonPrm.XHeaders()) prm.SetXHeaders(t.commonPrm.XHeaders())
prm.SetObject(t.obj) prm.SetObject(t.obj)
res, err := t.putSingle(ctx, prm)
if status.Code(err) != codes.Unimplemented {

remoteTarget is used by node only, so I would expect only prepared objects here.
Can this condition be false?

`remoteTarget` is used by node only, so I would expect only prepared objects here. Can this condition be false?

fixed

fixed
return res, err
}
return t.putStream(ctx, prm)
}
func (t *remoteTarget) putStream(ctx context.Context, prm internalclient.PutObjectPrm) (*transformer.AccessIdentifiers, error) {
res, err := internalclient.PutObject(ctx, prm) res, err := internalclient.PutObject(ctx, prm)
if err != nil { if err != nil {
return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err) return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
@ -71,6 +82,15 @@ func (t *remoteTarget) Close(ctx context.Context) (*transformer.AccessIdentifier
return &transformer.AccessIdentifiers{SelfID: res.ID()}, nil return &transformer.AccessIdentifiers{SelfID: res.ID()}, nil
} }
func (t *remoteTarget) putSingle(ctx context.Context, prm internalclient.PutObjectPrm) (*transformer.AccessIdentifiers, error) {
res, err := internalclient.PutObjectSingle(ctx, prm)
if err != nil {
return nil, fmt.Errorf("(%T) could not put single object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
}
return &transformer.AccessIdentifiers{SelfID: res.ID()}, nil
}
// NewRemoteSender creates, initializes and returns new RemoteSender instance. // NewRemoteSender creates, initializes and returns new RemoteSender instance.
func NewRemoteSender(keyStorage *util.KeyStorage, cons ClientConstructor) *RemoteSender { func NewRemoteSender(keyStorage *util.KeyStorage, cons ClientConstructor) *RemoteSender {
return &RemoteSender{ return &RemoteSender{

View file

@ -41,12 +41,6 @@ type Replicator interface {
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node. // RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error) type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
// NodeLoader provides application load statistics.
type nodeLoader interface {
// ObjectServiceLoad returns object service load value in [0:1] range.
ObjectServiceLoad() float64
}
type cfg struct { type cfg struct {
headTimeout time.Duration headTimeout time.Duration
@ -70,8 +64,6 @@ type cfg struct {
taskPool *ants.Pool taskPool *ants.Pool
loader nodeLoader
maxCapacity int maxCapacity int
batchSize, cacheSize uint32 batchSize, cacheSize uint32
@ -178,10 +170,3 @@ func WithPool(p *ants.Pool) Option {
c.taskPool = p c.taskPool = p
} }
} }
// WithNodeLoader returns option to set FrostFS node load source.
func WithNodeLoader(l nodeLoader) Option {
return func(c *cfg) {
c.loader = l
}
}

View file

@ -52,7 +52,6 @@ func TestBuryObjectWithoutContainer(t *testing.T) {
WithContainerSource(containerSrcFunc(containerSrc)), WithContainerSource(containerSrcFunc(containerSrc)),
WithBuryFunc(buryFn), WithBuryFunc(buryFn),
WithPool(pool), WithPool(pool),
WithNodeLoader(constNodeLoader(0)),
) )
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -279,7 +278,6 @@ func TestIteratorContract(t *testing.T) {
WithContainerSource(containerSrcFunc(containerSrc)), WithContainerSource(containerSrcFunc(containerSrc)),
WithBuryFunc(buryFn), WithBuryFunc(buryFn),
WithPool(pool), WithPool(pool),
WithNodeLoader(constNodeLoader(0)),
func(c *cfg) { func(c *cfg) {
c.sleepDuration = time.Millisecond c.sleepDuration = time.Millisecond
}, },
@ -377,11 +375,6 @@ type announcedKeysFunc func([]byte) bool
func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) } func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) }
// constNodeLoader is a nodeLoader that always returns a fixed value.
type constNodeLoader float64
func (f constNodeLoader) ObjectServiceLoad() float64 { return float64(f) }
// replicatorFunc is a Replicator backed by a function. // replicatorFunc is a Replicator backed by a function.
type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult) type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult)

View file

@ -11,7 +11,6 @@ import (
) )
func (p *Policer) Run(ctx context.Context) { func (p *Policer) Run(ctx context.Context) {
go p.poolCapacityWorker(ctx)
p.shardPolicyWorker(ctx) p.shardPolicyWorker(ctx)
p.log.Info(logs.PolicerRoutineStopped) p.log.Info(logs.PolicerRoutineStopped)
} }
@ -65,27 +64,3 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
} }
} }
} }
func (p *Policer) poolCapacityWorker(ctx context.Context) {
ticker := time.NewTicker(p.rebalanceFreq)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
frostfsSysLoad := p.loader.ObjectServiceLoad()
newCapacity := int((1.0 - frostfsSysLoad) * float64(p.maxCapacity))
if newCapacity == 0 {
newCapacity++
}
if p.taskPool.Cap() != newCapacity {
p.taskPool.Tune(newCapacity)
p.log.Debug(logs.PolicerTuneReplicationCapacity,
zap.Float64("system_load", frostfsSysLoad),
zap.Int("new_capacity", newCapacity))
}
}
}
}

View file

@ -6,7 +6,10 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -28,6 +31,13 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
) )
}() }()
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleTask",
trace.WithAttributes(
attribute.Stringer("address", task.Addr),
attribute.Int64("number_of_copies", int64(task.NumCopies)),
))
defer span.End()
if task.Obj == nil { if task.Obj == nil {
var err error var err error
task.Obj, err = engine.Get(ctx, p.localStorage, task.Addr) task.Obj, err = engine.Get(ctx, p.localStorage, task.Addr)