forked from TrueCloudLab/frostfs-node
[#463] putsvc: Use PutSingle RPC for remote target
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
b2487e8cc5
commit
8966dd8e35
4 changed files with 78 additions and 0 deletions
|
@ -13,6 +13,7 @@ import (
|
||||||
type Client interface {
|
type Client interface {
|
||||||
ContainerAnnounceUsedSpace(context.Context, client.PrmAnnounceSpace) (*client.ResAnnounceSpace, error)
|
ContainerAnnounceUsedSpace(context.Context, client.PrmAnnounceSpace) (*client.ResAnnounceSpace, error)
|
||||||
ObjectPutInit(context.Context, client.PrmObjectPutInit) (client.ObjectWriter, error)
|
ObjectPutInit(context.Context, client.PrmObjectPutInit) (client.ObjectWriter, error)
|
||||||
|
ObjectPutSingle(context.Context, client.PrmObjectPutSingle) (*client.ResObjectPutSingle, error)
|
||||||
ObjectDelete(context.Context, client.PrmObjectDelete) (*client.ResObjectDelete, error)
|
ObjectDelete(context.Context, client.PrmObjectDelete) (*client.ResObjectDelete, error)
|
||||||
ObjectGetInit(context.Context, client.PrmObjectGet) (*client.ObjectReader, error)
|
ObjectGetInit(context.Context, client.PrmObjectGet) (*client.ObjectReader, error)
|
||||||
ObjectHead(context.Context, client.PrmObjectHead) (*client.ResObjectHead, error)
|
ObjectHead(context.Context, client.PrmObjectHead) (*client.ResObjectHead, error)
|
||||||
|
|
9
pkg/network/cache/multi.go
vendored
9
pkg/network/cache/multi.go
vendored
|
@ -228,6 +228,15 @@ func (x *multiClient) ObjectPutInit(ctx context.Context, p client.PrmObjectPutIn
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (x *multiClient) ObjectPutSingle(ctx context.Context, p client.PrmObjectPutSingle) (res *client.ResObjectPutSingle, err error) {
|
||||||
|
err = x.iterateClients(ctx, func(c clientcore.Client) error {
|
||||||
|
res, err = c.ObjectPutSingle(ctx, p)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, prm client.PrmAnnounceSpace) (res *client.ResAnnounceSpace, err error) {
|
func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, prm client.PrmAnnounceSpace) (res *client.ResAnnounceSpace, err error) {
|
||||||
err = x.iterateClients(ctx, func(c clientcore.Client) error {
|
err = x.iterateClients(ctx, func(c clientcore.Client) error {
|
||||||
res, err = c.ContainerAnnounceUsedSpace(ctx, prm)
|
res, err = c.ContainerAnnounceUsedSpace(ctx, prm)
|
||||||
|
|
|
@ -449,6 +449,54 @@ func PutObject(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PutObjectSingle saves the object in local storage of the remote node with PutSingle RPC.
|
||||||
|
//
|
||||||
|
// Client and key must be set.
|
||||||
|
//
|
||||||
|
// Returns any error which prevented the operation from completing correctly in error return.
|
||||||
|
func PutObjectSingle(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "client.PutObjectSingle")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
objID, isSet := prm.obj.ID()
|
||||||
|
if !isSet {
|
||||||
|
return nil, errors.New("missing object id")
|
||||||
|
}
|
||||||
|
|
||||||
|
var prmCli client.PrmObjectPutSingle
|
||||||
|
|
||||||
|
prmCli.ExecuteLocal()
|
||||||
|
|
||||||
|
if prm.key != nil {
|
||||||
|
prmCli.UseKey(prm.key)
|
||||||
|
}
|
||||||
|
|
||||||
|
if prm.tokenSession != nil {
|
||||||
|
prmCli.WithinSession(*prm.tokenSession)
|
||||||
|
}
|
||||||
|
|
||||||
|
if prm.tokenBearer != nil {
|
||||||
|
prmCli.WithBearerToken(*prm.tokenBearer)
|
||||||
|
}
|
||||||
|
|
||||||
|
prmCli.WithXHeaders(prm.xHeaders...)
|
||||||
|
prmCli.SetObject(prm.obj.ToV2())
|
||||||
|
|
||||||
|
res, err := prm.cli.ObjectPutSingle(ctx, prmCli)
|
||||||
|
if err != nil {
|
||||||
|
ReportError(prm.cli, err)
|
||||||
|
return nil, fmt.Errorf("put single object on client: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = apistatus.ErrFromStatus(res.Status()); err != nil {
|
||||||
|
return nil, fmt.Errorf("put single object via client: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &PutObjectRes{
|
||||||
|
id: objID,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// SearchObjectsPrm groups parameters of SearchObjects operation.
|
// SearchObjectsPrm groups parameters of SearchObjects operation.
|
||||||
type SearchObjectsPrm struct {
|
type SearchObjectsPrm struct {
|
||||||
readPrmCommon
|
readPrmCommon
|
||||||
|
|
|
@ -13,6 +13,8 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
type remoteTarget struct {
|
type remoteTarget struct {
|
||||||
|
@ -63,6 +65,15 @@ func (t *remoteTarget) Close(ctx context.Context) (*transformer.AccessIdentifier
|
||||||
prm.SetXHeaders(t.commonPrm.XHeaders())
|
prm.SetXHeaders(t.commonPrm.XHeaders())
|
||||||
prm.SetObject(t.obj)
|
prm.SetObject(t.obj)
|
||||||
|
|
||||||
|
res, err := t.putSingle(ctx, prm)
|
||||||
|
if status.Code(err) != codes.Unimplemented {
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return t.putStream(ctx, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *remoteTarget) putStream(ctx context.Context, prm internalclient.PutObjectPrm) (*transformer.AccessIdentifiers, error) {
|
||||||
res, err := internalclient.PutObject(ctx, prm)
|
res, err := internalclient.PutObject(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
|
return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
|
||||||
|
@ -71,6 +82,15 @@ func (t *remoteTarget) Close(ctx context.Context) (*transformer.AccessIdentifier
|
||||||
return &transformer.AccessIdentifiers{SelfID: res.ID()}, nil
|
return &transformer.AccessIdentifiers{SelfID: res.ID()}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *remoteTarget) putSingle(ctx context.Context, prm internalclient.PutObjectPrm) (*transformer.AccessIdentifiers, error) {
|
||||||
|
res, err := internalclient.PutObjectSingle(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("(%T) could not put single object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &transformer.AccessIdentifiers{SelfID: res.ID()}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// NewRemoteSender creates, initializes and returns new RemoteSender instance.
|
// NewRemoteSender creates, initializes and returns new RemoteSender instance.
|
||||||
func NewRemoteSender(keyStorage *util.KeyStorage, cons ClientConstructor) *RemoteSender {
|
func NewRemoteSender(keyStorage *util.KeyStorage, cons ClientConstructor) *RemoteSender {
|
||||||
return &RemoteSender{
|
return &RemoteSender{
|
||||||
|
|
Loading…
Reference in a new issue