From b61ae94b712ee757d5019837489847b51697d762 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 12 Jul 2023 17:47:42 +0300 Subject: [PATCH 1/3] [#463] putsvc: Use PutSingle RPC for remote target Signed-off-by: Dmitrii Stepanov --- pkg/core/client/client.go | 1 + pkg/network/cache/multi.go | 9 ++++ pkg/services/object/internal/client/client.go | 48 +++++++++++++++++++ pkg/services/object/put/remote.go | 20 ++++++++ 4 files changed, 78 insertions(+) diff --git a/pkg/core/client/client.go b/pkg/core/client/client.go index 422ca1a1a..8c92901f2 100644 --- a/pkg/core/client/client.go +++ b/pkg/core/client/client.go @@ -13,6 +13,7 @@ import ( type Client interface { ContainerAnnounceUsedSpace(context.Context, client.PrmAnnounceSpace) (*client.ResAnnounceSpace, error) ObjectPutInit(context.Context, client.PrmObjectPutInit) (client.ObjectWriter, error) + ObjectPutSingle(context.Context, client.PrmObjectPutSingle) (*client.ResObjectPutSingle, error) ObjectDelete(context.Context, client.PrmObjectDelete) (*client.ResObjectDelete, error) ObjectGetInit(context.Context, client.PrmObjectGet) (*client.ObjectReader, error) ObjectHead(context.Context, client.PrmObjectHead) (*client.ResObjectHead, error) diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go index 18155849b..98d2f33e7 100644 --- a/pkg/network/cache/multi.go +++ b/pkg/network/cache/multi.go @@ -228,6 +228,15 @@ func (x *multiClient) ObjectPutInit(ctx context.Context, p client.PrmObjectPutIn return } +func (x *multiClient) ObjectPutSingle(ctx context.Context, p client.PrmObjectPutSingle) (res *client.ResObjectPutSingle, err error) { + err = x.iterateClients(ctx, func(c clientcore.Client) error { + res, err = c.ObjectPutSingle(ctx, p) + return err + }) + + return +} + func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, prm client.PrmAnnounceSpace) (res *client.ResAnnounceSpace, err error) { err = x.iterateClients(ctx, func(c clientcore.Client) error { res, err = 
c.ContainerAnnounceUsedSpace(ctx, prm) diff --git a/pkg/services/object/internal/client/client.go b/pkg/services/object/internal/client/client.go index cfab77efe..73f4ff7c4 100644 --- a/pkg/services/object/internal/client/client.go +++ b/pkg/services/object/internal/client/client.go @@ -449,6 +449,54 @@ func PutObject(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) { }, nil } +// PutObjectSingle saves the object in local storage of the remote node with PutSingle RPC. +// +// Client and key must be set. +// +// Returns any error which prevented the operation from completing correctly in error return. +func PutObjectSingle(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "client.PutObjectSingle") + defer span.End() + + objID, isSet := prm.obj.ID() + if !isSet { + return nil, errors.New("missing object id") + } + + var prmCli client.PrmObjectPutSingle + + prmCli.ExecuteLocal() + + if prm.key != nil { + prmCli.UseKey(prm.key) + } + + if prm.tokenSession != nil { + prmCli.WithinSession(*prm.tokenSession) + } + + if prm.tokenBearer != nil { + prmCli.WithBearerToken(*prm.tokenBearer) + } + + prmCli.WithXHeaders(prm.xHeaders...) + prmCli.SetObject(prm.obj.ToV2()) + + res, err := prm.cli.ObjectPutSingle(ctx, prmCli) + if err != nil { + ReportError(prm.cli, err) + return nil, fmt.Errorf("put single object on client: %w", err) + } + + if err = apistatus.ErrFromStatus(res.Status()); err != nil { + return nil, fmt.Errorf("put single object via client: %w", err) + } + + return &PutObjectRes{ + id: objID, + }, nil +} + // SearchObjectsPrm groups parameters of SearchObjects operation. 
type SearchObjectsPrm struct { readPrmCommon diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go index a5b3f643c..8116243ec 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/put/remote.go @@ -13,6 +13,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type remoteTarget struct { @@ -63,6 +65,15 @@ func (t *remoteTarget) Close(ctx context.Context) (*transformer.AccessIdentifier prm.SetXHeaders(t.commonPrm.XHeaders()) prm.SetObject(t.obj) + res, err := t.putSingle(ctx, prm) + if status.Code(err) != codes.Unimplemented { + return res, err + } + + return t.putStream(ctx, prm) +} + +func (t *remoteTarget) putStream(ctx context.Context, prm internalclient.PutObjectPrm) (*transformer.AccessIdentifiers, error) { res, err := internalclient.PutObject(ctx, prm) if err != nil { return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err) @@ -71,6 +82,15 @@ func (t *remoteTarget) Close(ctx context.Context) (*transformer.AccessIdentifier return &transformer.AccessIdentifiers{SelfID: res.ID()}, nil } +func (t *remoteTarget) putSingle(ctx context.Context, prm internalclient.PutObjectPrm) (*transformer.AccessIdentifiers, error) { + res, err := internalclient.PutObjectSingle(ctx, prm) + if err != nil { + return nil, fmt.Errorf("(%T) could not put single object to %s: %w", t, t.nodeInfo.AddressGroup(), err) + } + + return &transformer.AccessIdentifiers{SelfID: res.ID()}, nil +} + // NewRemoteSender creates, initializes and returns new RemoteSender instance. 
func NewRemoteSender(keyStorage *util.KeyStorage, cons ClientConstructor) *RemoteSender { return &RemoteSender{ -- 2.45.2 From a53bc2036f33abe9ee13e501c4137fcb3f8ec56d Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 13 Jul 2023 14:58:15 +0300 Subject: [PATCH 2/3] [#463] replicator: Add tracing span Signed-off-by: Dmitrii Stepanov --- pkg/services/replicator/process.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/services/replicator/process.go b/pkg/services/replicator/process.go index a54668e12..16bcec9c5 100644 --- a/pkg/services/replicator/process.go +++ b/pkg/services/replicator/process.go @@ -6,7 +6,10 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -28,6 +31,13 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) ) }() + ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleTask", + trace.WithAttributes( + attribute.Stringer("address", task.Addr), + attribute.Int64("number_of_copies", int64(task.NumCopies)), + )) + defer span.End() + if task.Obj == nil { var err error task.Obj, err = engine.Get(ctx, p.localStorage, task.Addr) -- 2.45.2 From e39a9c9c68640a145314807cf00ac792069c503e Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 13 Jul 2023 15:08:46 +0300 Subject: [PATCH 3/3] [#463] policer: Remove capacity rebalance logic The current implementation has some quirks. For example, using only half of object.put.pool_size_remote threads tells the replicator that the node is 50% loaded, but in reality we could be putting lots of big objects.
Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config.go | 7 ------- cmd/frostfs-node/object.go | 1 - internal/logs/logs.go | 1 - pkg/services/policer/option.go | 15 --------------- pkg/services/policer/policer_test.go | 7 ------- pkg/services/policer/process.go | 25 ------------------------- 6 files changed, 56 deletions(-) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 5d7adab29..f90136301 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -996,13 +996,6 @@ func (c *cfg) needBootstrap() bool { return c.cfgNetmap.needBootstrap } -// ObjectServiceLoad implements system loader interface for policer component. -// It is calculated as size/capacity ratio of "remote object put" worker. -// Returns float value between 0.0 and 1.0. -func (c *cfg) ObjectServiceLoad() float64 { - return float64(c.cfgObject.pool.putRemote.Running()) / float64(c.cfgObject.pool.putRemoteCapacity) -} - type dCmp struct { name string reloadFunc func() error diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 1b9f0c817..84411d31b 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -259,7 +259,6 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl }), policer.WithMaxCapacity(c.cfgObject.pool.replicatorPoolSize), policer.WithPool(c.cfgObject.pool.replication), - policer.WithNodeLoader(c), ) c.workers = append(c.workers, worker{ diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 2ba9e6e4b..d91365b36 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -52,7 +52,6 @@ const ( PolicerRoutineStopped = "routine stopped" // Info in ../node/pkg/services/policer/process.go PolicerFailureAtObjectSelectForReplication = "failure at object select for replication" // Warn in ../node/pkg/services/policer/process.go PolicerPoolSubmission = "pool submission" // Warn in ../node/pkg/services/policer/process.go - PolicerTuneReplicationCapacity = "tune 
replication capacity" // Debug in ../node/pkg/services/policer/process.go ReplicatorFinishWork = "finish work" // Debug in ../node/pkg/services/replicator/process.go ReplicatorCouldNotGetObjectFromLocalStorage = "could not get object from local storage" // Error in ../node/pkg/services/replicator/process.go ReplicatorCouldNotReplicateObject = "could not replicate object" // Error in ../node/pkg/services/replicator/process.go diff --git a/pkg/services/policer/option.go b/pkg/services/policer/option.go index 4194353ca..5058b026b 100644 --- a/pkg/services/policer/option.go +++ b/pkg/services/policer/option.go @@ -41,12 +41,6 @@ type Replicator interface { // RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node. type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error) -// NodeLoader provides application load statistics. -type nodeLoader interface { - // ObjectServiceLoad returns object service load value in [0:1] range. - ObjectServiceLoad() float64 -} - type cfg struct { headTimeout time.Duration @@ -70,8 +64,6 @@ type cfg struct { taskPool *ants.Pool - loader nodeLoader - maxCapacity int batchSize, cacheSize uint32 @@ -178,10 +170,3 @@ func WithPool(p *ants.Pool) Option { c.taskPool = p } } - -// WithNodeLoader returns option to set FrostFS node load source. 
-func WithNodeLoader(l nodeLoader) Option { - return func(c *cfg) { - c.loader = l - } -} diff --git a/pkg/services/policer/policer_test.go b/pkg/services/policer/policer_test.go index 42428df23..0bd96a477 100644 --- a/pkg/services/policer/policer_test.go +++ b/pkg/services/policer/policer_test.go @@ -52,7 +52,6 @@ func TestBuryObjectWithoutContainer(t *testing.T) { WithContainerSource(containerSrcFunc(containerSrc)), WithBuryFunc(buryFn), WithPool(pool), - WithNodeLoader(constNodeLoader(0)), ) ctx, cancel := context.WithCancel(context.Background()) @@ -279,7 +278,6 @@ func TestIteratorContract(t *testing.T) { WithContainerSource(containerSrcFunc(containerSrc)), WithBuryFunc(buryFn), WithPool(pool), - WithNodeLoader(constNodeLoader(0)), func(c *cfg) { c.sleepDuration = time.Millisecond }, @@ -377,11 +375,6 @@ type announcedKeysFunc func([]byte) bool func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) } -// constNodeLoader is a nodeLoader that always returns a fixed value. -type constNodeLoader float64 - -func (f constNodeLoader) ObjectServiceLoad() float64 { return float64(f) } - // replicatorFunc is a Replicator backed by a function. 
type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult) diff --git a/pkg/services/policer/process.go b/pkg/services/policer/process.go index 3b54bf929..1f61c69f4 100644 --- a/pkg/services/policer/process.go +++ b/pkg/services/policer/process.go @@ -11,7 +11,6 @@ import ( ) func (p *Policer) Run(ctx context.Context) { - go p.poolCapacityWorker(ctx) p.shardPolicyWorker(ctx) p.log.Info(logs.PolicerRoutineStopped) } @@ -65,27 +64,3 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) { } } } - -func (p *Policer) poolCapacityWorker(ctx context.Context) { - ticker := time.NewTicker(p.rebalanceFreq) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - frostfsSysLoad := p.loader.ObjectServiceLoad() - newCapacity := int((1.0 - frostfsSysLoad) * float64(p.maxCapacity)) - if newCapacity == 0 { - newCapacity++ - } - - if p.taskPool.Cap() != newCapacity { - p.taskPool.Tune(newCapacity) - p.log.Debug(logs.PolicerTuneReplicationCapacity, - zap.Float64("system_load", frostfsSysLoad), - zap.Int("new_capacity", newCapacity)) - } - } - } -} -- 2.45.2