Refactor replicator #517

Merged
fyrchik merged 3 commits from dstepanov-yadro/frostfs-node:feat/replicator-put-single into master 2023-07-18 10:52:13 +00:00
4 changed files with 78 additions and 0 deletions
Showing only changes of commit b61ae94b71 - Show all commits

View file

@ -13,6 +13,7 @@ import (
type Client interface {
ContainerAnnounceUsedSpace(context.Context, client.PrmAnnounceSpace) (*client.ResAnnounceSpace, error)
ObjectPutInit(context.Context, client.PrmObjectPutInit) (client.ObjectWriter, error)
ObjectPutSingle(context.Context, client.PrmObjectPutSingle) (*client.ResObjectPutSingle, error)
ObjectDelete(context.Context, client.PrmObjectDelete) (*client.ResObjectDelete, error)
ObjectGetInit(context.Context, client.PrmObjectGet) (*client.ObjectReader, error)
ObjectHead(context.Context, client.PrmObjectHead) (*client.ResObjectHead, error)

View file

@ -228,6 +228,15 @@ func (x *multiClient) ObjectPutInit(ctx context.Context, p client.PrmObjectPutIn
return
}
func (x *multiClient) ObjectPutSingle(ctx context.Context, p client.PrmObjectPutSingle) (res *client.ResObjectPutSingle, err error) {
err = x.iterateClients(ctx, func(c clientcore.Client) error {
res, err = c.ObjectPutSingle(ctx, p)
return err
})
return
}
func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, prm client.PrmAnnounceSpace) (res *client.ResAnnounceSpace, err error) {
err = x.iterateClients(ctx, func(c clientcore.Client) error {
res, err = c.ContainerAnnounceUsedSpace(ctx, prm)

View file

@ -449,6 +449,54 @@ func PutObject(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) {
}, nil
}
// PutObjectSingle saves the object in local storage of the remote node with PutSingle RPC.
//
// Client and key must be set.
//
// Returns any error which prevented the operation from completing correctly in error return.
func PutObjectSingle(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "client.PutObjectSingle")
defer span.End()
objID, isSet := prm.obj.ID()
if !isSet {
return nil, errors.New("missing object id")
ale64bit marked this conversation as resolved Outdated

errors.New?

`errors.New`?

fixed

fixed
}
var prmCli client.PrmObjectPutSingle
prmCli.ExecuteLocal()
if prm.key != nil {
prmCli.UseKey(prm.key)
}
if prm.tokenSession != nil {
prmCli.WithinSession(*prm.tokenSession)
}
if prm.tokenBearer != nil {
prmCli.WithBearerToken(*prm.tokenBearer)
}
prmCli.WithXHeaders(prm.xHeaders...)
prmCli.SetObject(prm.obj.ToV2())
res, err := prm.cli.ObjectPutSingle(ctx, prmCli)
if err != nil {
ReportError(prm.cli, err)
return nil, fmt.Errorf("put single object on client: %w", err)
}
if err = apistatus.ErrFromStatus(res.Status()); err != nil {
return nil, fmt.Errorf("put single object via client: %w", err)
ale64bit marked this conversation as resolved Outdated

discussed offline

discussed offline

fixed

fixed
}
return &PutObjectRes{
id: objID,
}, nil
}
// SearchObjectsPrm groups parameters of SearchObjects operation.
type SearchObjectsPrm struct {
readPrmCommon

View file

@ -13,6 +13,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type remoteTarget struct {
@ -63,6 +65,15 @@ func (t *remoteTarget) Close(ctx context.Context) (*transformer.AccessIdentifier
prm.SetXHeaders(t.commonPrm.XHeaders())
prm.SetObject(t.obj)
res, err := t.putSingle(ctx, prm)
if status.Code(err) != codes.Unimplemented {

remoteTarget is used by node only, so I would expect only prepared objects here.
Can this condition be false?

`remoteTarget` is used by node only, so I would expect only prepared objects here. Can this condition be false?

fixed

fixed
return res, err
}
return t.putStream(ctx, prm)
}
func (t *remoteTarget) putStream(ctx context.Context, prm internalclient.PutObjectPrm) (*transformer.AccessIdentifiers, error) {
res, err := internalclient.PutObject(ctx, prm)
if err != nil {
return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
@ -71,6 +82,15 @@ func (t *remoteTarget) Close(ctx context.Context) (*transformer.AccessIdentifier
return &transformer.AccessIdentifiers{SelfID: res.ID()}, nil
}
func (t *remoteTarget) putSingle(ctx context.Context, prm internalclient.PutObjectPrm) (*transformer.AccessIdentifiers, error) {
res, err := internalclient.PutObjectSingle(ctx, prm)
if err != nil {
return nil, fmt.Errorf("(%T) could not put single object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
}
return &transformer.AccessIdentifiers{SelfID: res.ID()}, nil
}
// NewRemoteSender creates, initializes and returns new RemoteSender instance.
func NewRemoteSender(keyStorage *util.KeyStorage, cons ClientConstructor) *RemoteSender {
return &RemoteSender{