Add PutSingle method for object service #103

Merged
dstepanov-yadro merged 1 commit from dstepanov-yadro/frostfs-sdk-go:feat/put-single into master 2023-07-26 21:08:02 +00:00
2 changed files with 163 additions and 7 deletions

116
client/object_put_single.go Normal file
View file

@ -0,0 +1,116 @@
package client
import (
"context"
"crypto/ecdsa"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl"
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
)
// PrmObjectPutSingle groups parameters of PutSingle operation.
type PrmObjectPutSingle struct {
copyNum []uint32
meta v2session.RequestMetaHeader
object *v2object.Object
key *ecdsa.PrivateKey
}
// SetCopiesNumber sets ordered list of minimal required object copies numbers
// per placement vector. List's length MUST equal container's placement vector number,
// otherwise request will fail.
func (x *PrmObjectPutSingle) SetCopiesNumber(v []uint32) {
x.copyNum = v
}
// UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used.
func (x *PrmObjectPutSingle) UseKey(key *ecdsa.PrivateKey) {
x.key = key
}
// WithBearerToken attaches bearer token to be used for the operation.
// Should be called once before any writing steps.
func (x *PrmObjectPutSingle) WithBearerToken(t bearer.Token) {
v2token := &acl.BearerToken{}
t.WriteToV2(v2token)
x.meta.SetBearerToken(v2token)
}
// WithinSession specifies session within which object should be stored.
// Should be called once before any writing steps.
func (x *PrmObjectPutSingle) WithinSession(t session.Object) {
tv2 := &v2session.Token{}
t.WriteToV2(tv2)
x.meta.SetSessionToken(tv2)
}
// ExecuteLocal tells the server to execute the operation locally.
func (x *PrmObjectPutSingle) ExecuteLocal() {
x.meta.SetTTL(1)
}
// WithXHeaders specifies list of extended headers (string key-value pairs)
// to be attached to the request. Must have an even length.
//
// Slice must not be mutated until the operation completes.
func (x *PrmObjectPutSingle) WithXHeaders(hs ...string) {
writeXHeadersToMeta(hs, &x.meta)
}
// SetObject specifies prepared object to put.
func (x *PrmObjectPutSingle) SetObject(o *v2object.Object) {
x.object = o
}
// ResObjectPutSingle groups resulting values of PutSingle operation.
type ResObjectPutSingle struct {
statusRes
}
// ObjectPutSingle writes prepared object to FrostFS.
// Object must have payload, also containerID, objectID, ownerID, payload hash, payload length of an object must be set.
// Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as Go built-in error.
// If Client is tuned to resolve FrostFS API statuses, then FrostFS failures
// codes are returned as error.
func (c *Client) ObjectPutSingle(ctx context.Context, prm PrmObjectPutSingle) (*ResObjectPutSingle, error) {
body := &v2object.PutSingleRequestBody{}
body.SetCopiesNumber(prm.copyNum)
body.SetObject(prm.object)
req := &v2object.PutSingleRequest{}
req.SetBody(body)
c.prepareRequest(req, &prm.meta)
key := &c.prm.key
if prm.key != nil {
key = prm.key
}
err := signature.SignServiceMessage(key, req)
if err != nil {
return nil, fmt.Errorf("sign request: %w", err)
}
resp, err := rpcapi.PutSingleObject(&c.c, req, client.WithContext(ctx))
if err != nil {
return nil, err
}
var res ResObjectPutSingle
res.st, err = c.processResponse(resp)
if err != nil {
return nil, err
}
return &res, nil
}

View file

@ -5,6 +5,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (c *Client) objectPutInitTransformer(prm PrmObjectPutInit) (*objectWriterTransformer, error) {
@ -56,11 +58,12 @@ func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, err
}
type internalTarget struct {
current *object.Object
client *Client
res *ResObjectPut
prm PrmObjectPutInit
payload []byte
current *object.Object
client *Client
res *ResObjectPut
prm PrmObjectPutInit
payload []byte
useStream bool
}
func (it *internalTarget) WriteHeader(_ context.Context, object *object.Object) error {
@ -75,9 +78,19 @@ func (it *internalTarget) Write(_ context.Context, p []byte) (n int, err error)
func (it *internalTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) {
it.current.SetPayload(it.payload)
putSingleImplemented, err := it.tryPutSingle(ctx)
if putSingleImplemented {
return nil, err
}
it.useStream = true
return nil, it.putAsStream(ctx)
}
func (it *internalTarget) putAsStream(ctx context.Context) error {
wrt, err := it.client.objectPutInitRaw(ctx, it.prm)
if err != nil {
return nil, err
return err
}
if wrt.WriteHeader(ctx, *it.current) {
wrt.WritePayloadChunk(ctx, it.current.Payload())
@ -85,5 +98,32 @@ func (it *internalTarget) Close(ctx context.Context) (*transformer.AccessIdentif
it.res, err = wrt.Close(ctx)
it.current = nil
it.payload = nil
return nil, err
return err
}
func (it *internalTarget) tryPutSingle(ctx context.Context) (bool, error) {
if it.useStream {
return false, nil
}
var prm PrmObjectPutSingle
prm.SetCopiesNumber(it.prm.copyNum)
prm.SetObject(it.current.ToV2())
prm.UseKey(prm.key)
prm.meta = it.prm.meta
res, err := it.client.ObjectPutSingle(ctx, prm)
if err != nil && status.Code(err) == codes.Unimplemented {
return false, err
}
if err == nil {
id, _ := it.current.ID()
it.res = &ResObjectPut{
statusRes: res.statusRes,
obj: id,
}
}
it.current = nil
it.payload = nil
return true, err
}