Add PutSingle method for object service #103
2 changed files with 163 additions and 7 deletions
116
client/object_put_single.go
Normal file
116
client/object_put_single.go
Normal file
|
@ -0,0 +1,116 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl"
|
||||||
|
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||||
|
v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PrmObjectPutSingle groups parameters of PutSingle operation.
|
||||||
|
type PrmObjectPutSingle struct {
|
||||||
|
copyNum []uint32
|
||||||
|
meta v2session.RequestMetaHeader
|
||||||
|
object *v2object.Object
|
||||||
|
key *ecdsa.PrivateKey
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetCopiesNumber sets ordered list of minimal required object copies numbers
|
||||||
|
// per placement vector. List's length MUST equal container's placement vector number,
|
||||||
|
// otherwise request will fail.
|
||||||
|
func (x *PrmObjectPutSingle) SetCopiesNumber(v []uint32) {
|
||||||
|
x.copyNum = v
|
||||||
|
}
|
||||||
|
|
||||||
|
// UseKey specifies private key to sign the requests.
|
||||||
|
// If key is not provided, then Client default key is used.
|
||||||
|
func (x *PrmObjectPutSingle) UseKey(key *ecdsa.PrivateKey) {
|
||||||
|
x.key = key
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithBearerToken attaches bearer token to be used for the operation.
|
||||||
|
// Should be called once before any writing steps.
|
||||||
|
func (x *PrmObjectPutSingle) WithBearerToken(t bearer.Token) {
|
||||||
|
v2token := &acl.BearerToken{}
|
||||||
|
t.WriteToV2(v2token)
|
||||||
|
x.meta.SetBearerToken(v2token)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithinSession specifies session within which object should be stored.
|
||||||
|
// Should be called once before any writing steps.
|
||||||
|
func (x *PrmObjectPutSingle) WithinSession(t session.Object) {
|
||||||
|
tv2 := &v2session.Token{}
|
||||||
|
t.WriteToV2(tv2)
|
||||||
|
x.meta.SetSessionToken(tv2)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExecuteLocal tells the server to execute the operation locally.
|
||||||
|
func (x *PrmObjectPutSingle) ExecuteLocal() {
|
||||||
|
x.meta.SetTTL(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithXHeaders specifies list of extended headers (string key-value pairs)
|
||||||
|
// to be attached to the request. Must have an even length.
|
||||||
|
//
|
||||||
|
// Slice must not be mutated until the operation completes.
|
||||||
|
func (x *PrmObjectPutSingle) WithXHeaders(hs ...string) {
|
||||||
|
writeXHeadersToMeta(hs, &x.meta)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetObject specifies prepared object to put.
|
||||||
|
func (x *PrmObjectPutSingle) SetObject(o *v2object.Object) {
|
||||||
|
x.object = o
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResObjectPutSingle groups resulting values of PutSingle operation.
|
||||||
|
type ResObjectPutSingle struct {
|
||||||
|
statusRes
|
||||||
|
}
|
||||||
|
|
||||||
|
// ObjectPutSingle writes prepared object to FrostFS.
|
||||||
|
// Object must have payload, also containerID, objectID, ownerID, payload hash, payload length of an object must be set.
|
||||||
|
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
||||||
|
// Any client's internal or transport errors are returned as Go built-in error.
|
||||||
|
// If Client is tuned to resolve FrostFS API statuses, then FrostFS failures
|
||||||
|
// codes are returned as error.
|
||||||
|
func (c *Client) ObjectPutSingle(ctx context.Context, prm PrmObjectPutSingle) (*ResObjectPutSingle, error) {
|
||||||
|
body := &v2object.PutSingleRequestBody{}
|
||||||
|
body.SetCopiesNumber(prm.copyNum)
|
||||||
|
body.SetObject(prm.object)
|
||||||
|
|
||||||
|
req := &v2object.PutSingleRequest{}
|
||||||
|
req.SetBody(body)
|
||||||
|
|
||||||
|
c.prepareRequest(req, &prm.meta)
|
||||||
|
|
||||||
|
key := &c.prm.key
|
||||||
|
if prm.key != nil {
|
||||||
|
key = prm.key
|
||||||
|
}
|
||||||
|
|
||||||
|
err := signature.SignServiceMessage(key, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("sign request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := rpcapi.PutSingleObject(&c.c, req, client.WithContext(ctx))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var res ResObjectPutSingle
|
||||||
|
res.st, err = c.processResponse(resp)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &res, nil
|
||||||
|
}
|
|
@ -5,6 +5,8 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *Client) objectPutInitTransformer(prm PrmObjectPutInit) (*objectWriterTransformer, error) {
|
func (c *Client) objectPutInitTransformer(prm PrmObjectPutInit) (*objectWriterTransformer, error) {
|
||||||
|
@ -56,11 +58,12 @@ func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, err
|
||||||
}
|
}
|
||||||
|
|
||||||
type internalTarget struct {
|
type internalTarget struct {
|
||||||
current *object.Object
|
current *object.Object
|
||||||
client *Client
|
client *Client
|
||||||
res *ResObjectPut
|
res *ResObjectPut
|
||||||
prm PrmObjectPutInit
|
prm PrmObjectPutInit
|
||||||
payload []byte
|
payload []byte
|
||||||
|
useStream bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *internalTarget) WriteHeader(_ context.Context, object *object.Object) error {
|
func (it *internalTarget) WriteHeader(_ context.Context, object *object.Object) error {
|
||||||
|
@ -75,9 +78,19 @@ func (it *internalTarget) Write(_ context.Context, p []byte) (n int, err error)
|
||||||
|
|
||||||
func (it *internalTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) {
|
func (it *internalTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) {
|
||||||
it.current.SetPayload(it.payload)
|
it.current.SetPayload(it.payload)
|
||||||
|
|
||||||
|
putSingleImplemented, err := it.tryPutSingle(ctx)
|
||||||
|
if putSingleImplemented {
|
||||||
|
return nil, err
|
||||||
|
|||||||
|
}
|
||||||
|
it.useStream = true
|
||||||
|
return nil, it.putAsStream(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *internalTarget) putAsStream(ctx context.Context) error {
|
||||||
wrt, err := it.client.objectPutInitRaw(ctx, it.prm)
|
wrt, err := it.client.objectPutInitRaw(ctx, it.prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
if wrt.WriteHeader(ctx, *it.current) {
|
if wrt.WriteHeader(ctx, *it.current) {
|
||||||
wrt.WritePayloadChunk(ctx, it.current.Payload())
|
wrt.WritePayloadChunk(ctx, it.current.Payload())
|
||||||
|
@ -85,5 +98,32 @@ func (it *internalTarget) Close(ctx context.Context) (*transformer.AccessIdentif
|
||||||
it.res, err = wrt.Close(ctx)
|
it.res, err = wrt.Close(ctx)
|
||||||
it.current = nil
|
it.current = nil
|
||||||
it.payload = nil
|
it.payload = nil
|
||||||
return nil, err
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *internalTarget) tryPutSingle(ctx context.Context) (bool, error) {
|
||||||
|
if it.useStream {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
var prm PrmObjectPutSingle
|
||||||
|
prm.SetCopiesNumber(it.prm.copyNum)
|
||||||
|
prm.SetObject(it.current.ToV2())
|
||||||
|
prm.UseKey(prm.key)
|
||||||
|
prm.meta = it.prm.meta
|
||||||
|
|
||||||
|
res, err := it.client.ObjectPutSingle(ctx, prm)
|
||||||
|
if err != nil && status.Code(err) == codes.Unimplemented {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
id, _ := it.current.ID()
|
||||||
|
it.res = &ResObjectPut{
|
||||||
|
statusRes: res.statusRes,
|
||||||
|
obj: id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
it.current = nil
|
||||||
|
it.payload = nil
|
||||||
|
return true, err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue
What about fallback to the old implementation? Can we do something sane here?
Done