From 8966dd8e35c738306ca7e99accf324e97cb27601 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 12 Jul 2023 17:47:42 +0300 Subject: [PATCH] [#463] putsvc: Use PutSingle RPC for remote target Signed-off-by: Dmitrii Stepanov --- pkg/core/client/client.go | 1 + pkg/network/cache/multi.go | 9 ++++ pkg/services/object/internal/client/client.go | 48 +++++++++++++++++++ pkg/services/object/put/remote.go | 20 ++++++++ 4 files changed, 78 insertions(+) diff --git a/pkg/core/client/client.go b/pkg/core/client/client.go index 422ca1a1..8c92901f 100644 --- a/pkg/core/client/client.go +++ b/pkg/core/client/client.go @@ -13,6 +13,7 @@ import ( type Client interface { ContainerAnnounceUsedSpace(context.Context, client.PrmAnnounceSpace) (*client.ResAnnounceSpace, error) ObjectPutInit(context.Context, client.PrmObjectPutInit) (client.ObjectWriter, error) + ObjectPutSingle(context.Context, client.PrmObjectPutSingle) (*client.ResObjectPutSingle, error) ObjectDelete(context.Context, client.PrmObjectDelete) (*client.ResObjectDelete, error) ObjectGetInit(context.Context, client.PrmObjectGet) (*client.ObjectReader, error) ObjectHead(context.Context, client.PrmObjectHead) (*client.ResObjectHead, error) diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go index 18155849..98d2f33e 100644 --- a/pkg/network/cache/multi.go +++ b/pkg/network/cache/multi.go @@ -228,6 +228,15 @@ func (x *multiClient) ObjectPutInit(ctx context.Context, p client.PrmObjectPutIn return } +func (x *multiClient) ObjectPutSingle(ctx context.Context, p client.PrmObjectPutSingle) (res *client.ResObjectPutSingle, err error) { + err = x.iterateClients(ctx, func(c clientcore.Client) error { + res, err = c.ObjectPutSingle(ctx, p) + return err + }) + + return +} + func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, prm client.PrmAnnounceSpace) (res *client.ResAnnounceSpace, err error) { err = x.iterateClients(ctx, func(c clientcore.Client) error { res, err = c.ContainerAnnounceUsedSpace(ctx, prm) diff --git a/pkg/services/object/internal/client/client.go b/pkg/services/object/internal/client/client.go index cfab77ef..73f4ff7c 100644 --- a/pkg/services/object/internal/client/client.go +++ b/pkg/services/object/internal/client/client.go @@ -449,6 +449,54 @@ func PutObject(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) { }, nil } +// PutObjectSingle saves the object in local storage of the remote node with PutSingle RPC. +// +// Client and key must be set. +// +// Returns any error which prevented the operation from completing correctly in error return. +func PutObjectSingle(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "client.PutObjectSingle") + defer span.End() + + objID, isSet := prm.obj.ID() + if !isSet { + return nil, errors.New("missing object id") + } + + var prmCli client.PrmObjectPutSingle + + prmCli.ExecuteLocal() + + if prm.key != nil { + prmCli.UseKey(prm.key) + } + + if prm.tokenSession != nil { + prmCli.WithinSession(*prm.tokenSession) + } + + if prm.tokenBearer != nil { + prmCli.WithBearerToken(*prm.tokenBearer) + } + + prmCli.WithXHeaders(prm.xHeaders...) + prmCli.SetObject(prm.obj.ToV2()) + + res, err := prm.cli.ObjectPutSingle(ctx, prmCli) + if err != nil { + ReportError(prm.cli, err) + return nil, fmt.Errorf("put single object on client: %w", err) + } + + if err = apistatus.ErrFromStatus(res.Status()); err != nil { + return nil, fmt.Errorf("put single object via client: %w", err) + } + + return &PutObjectRes{ + id: objID, + }, nil +} + // SearchObjectsPrm groups parameters of SearchObjects operation. type SearchObjectsPrm struct { readPrmCommon diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go index a5b3f643..8116243e 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/put/remote.go @@ -13,6 +13,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type remoteTarget struct { @@ -63,6 +65,15 @@ func (t *remoteTarget) Close(ctx context.Context) (*transformer.AccessIdentifier prm.SetXHeaders(t.commonPrm.XHeaders()) prm.SetObject(t.obj) + res, err := t.putSingle(ctx, prm) + if status.Code(err) != codes.Unimplemented { + return res, err + } + + return t.putStream(ctx, prm) +} + +func (t *remoteTarget) putStream(ctx context.Context, prm internalclient.PutObjectPrm) (*transformer.AccessIdentifiers, error) { res, err := internalclient.PutObject(ctx, prm) if err != nil { return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err) @@ -71,6 +82,15 @@ func (t *remoteTarget) Close(ctx context.Context) (*transformer.AccessIdentifier return &transformer.AccessIdentifiers{SelfID: res.ID()}, nil } +func (t *remoteTarget) putSingle(ctx context.Context, prm internalclient.PutObjectPrm) (*transformer.AccessIdentifiers, error) { + res, err := internalclient.PutObjectSingle(ctx, prm) + if err != nil { + return nil, fmt.Errorf("(%T) could not put single object to %s: %w", t, t.nodeInfo.AddressGroup(), err) + } + + return &transformer.AccessIdentifiers{SelfID: res.ID()}, nil +} + // NewRemoteSender creates, initializes and returns new RemoteSender instance. func NewRemoteSender(keyStorage *util.KeyStorage, cons ClientConstructor) *RemoteSender { return &RemoteSender{