Add PutSingle method for object service #103
2 changed files with 163 additions and 7 deletions
116
client/object_put_single.go
Normal file
116
client/object_put_single.go
Normal file
|
@ -0,0 +1,116 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl"
|
||||
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||
)
|
||||
|
||||
// PrmObjectPutSingle groups parameters of PutSingle operation.
|
||||
type PrmObjectPutSingle struct {
|
||||
copyNum []uint32
|
||||
meta v2session.RequestMetaHeader
|
||||
object *v2object.Object
|
||||
key *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
// SetCopiesNumber sets ordered list of minimal required object copies numbers
|
||||
// per placement vector. List's length MUST equal container's placement vector number,
|
||||
// otherwise request will fail.
|
||||
func (x *PrmObjectPutSingle) SetCopiesNumber(v []uint32) {
|
||||
x.copyNum = v
|
||||
}
|
||||
|
||||
// UseKey specifies private key to sign the requests.
|
||||
// If key is not provided, then Client default key is used.
|
||||
func (x *PrmObjectPutSingle) UseKey(key *ecdsa.PrivateKey) {
|
||||
x.key = key
|
||||
}
|
||||
|
||||
// WithBearerToken attaches bearer token to be used for the operation.
|
||||
// Should be called once before any writing steps.
|
||||
func (x *PrmObjectPutSingle) WithBearerToken(t bearer.Token) {
|
||||
v2token := &acl.BearerToken{}
|
||||
t.WriteToV2(v2token)
|
||||
x.meta.SetBearerToken(v2token)
|
||||
}
|
||||
|
||||
// WithinSession specifies session within which object should be stored.
|
||||
// Should be called once before any writing steps.
|
||||
func (x *PrmObjectPutSingle) WithinSession(t session.Object) {
|
||||
tv2 := &v2session.Token{}
|
||||
t.WriteToV2(tv2)
|
||||
x.meta.SetSessionToken(tv2)
|
||||
}
|
||||
|
||||
// ExecuteLocal tells the server to execute the operation locally.
|
||||
func (x *PrmObjectPutSingle) ExecuteLocal() {
|
||||
x.meta.SetTTL(1)
|
||||
}
|
||||
|
||||
// WithXHeaders specifies list of extended headers (string key-value pairs)
|
||||
// to be attached to the request. Must have an even length.
|
||||
//
|
||||
// Slice must not be mutated until the operation completes.
|
||||
func (x *PrmObjectPutSingle) WithXHeaders(hs ...string) {
|
||||
writeXHeadersToMeta(hs, &x.meta)
|
||||
}
|
||||
|
||||
// SetObject specifies prepared object to put.
|
||||
func (x *PrmObjectPutSingle) SetObject(o *v2object.Object) {
|
||||
x.object = o
|
||||
}
|
||||
|
||||
// ResObjectPutSingle groups resulting values of PutSingle operation.
|
||||
type ResObjectPutSingle struct {
|
||||
statusRes
|
||||
}
|
||||
|
||||
// ObjectPutSingle writes prepared object to FrostFS.
|
||||
// Object must have payload, also containerID, objectID, ownerID, payload hash, payload length of an object must be set.
|
||||
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
||||
// Any client's internal or transport errors are returned as Go built-in error.
|
||||
// If Client is tuned to resolve FrostFS API statuses, then FrostFS failures
|
||||
// codes are returned as error.
|
||||
func (c *Client) ObjectPutSingle(ctx context.Context, prm PrmObjectPutSingle) (*ResObjectPutSingle, error) {
|
||||
body := &v2object.PutSingleRequestBody{}
|
||||
body.SetCopiesNumber(prm.copyNum)
|
||||
body.SetObject(prm.object)
|
||||
|
||||
req := &v2object.PutSingleRequest{}
|
||||
req.SetBody(body)
|
||||
|
||||
c.prepareRequest(req, &prm.meta)
|
||||
|
||||
key := &c.prm.key
|
||||
if prm.key != nil {
|
||||
key = prm.key
|
||||
}
|
||||
|
||||
err := signature.SignServiceMessage(key, req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sign request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := rpcapi.PutSingleObject(&c.c, req, client.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var res ResObjectPutSingle
|
||||
res.st, err = c.processResponse(resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &res, nil
|
||||
}
|
|
@ -5,6 +5,8 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func (c *Client) objectPutInitTransformer(prm PrmObjectPutInit) (*objectWriterTransformer, error) {
|
||||
|
@ -56,11 +58,12 @@ func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, err
|
|||
}
|
||||
|
||||
type internalTarget struct {
|
||||
current *object.Object
|
||||
client *Client
|
||||
res *ResObjectPut
|
||||
prm PrmObjectPutInit
|
||||
payload []byte
|
||||
current *object.Object
|
||||
client *Client
|
||||
res *ResObjectPut
|
||||
prm PrmObjectPutInit
|
||||
payload []byte
|
||||
useStream bool
|
||||
}
|
||||
|
||||
func (it *internalTarget) WriteHeader(_ context.Context, object *object.Object) error {
|
||||
|
@ -75,9 +78,19 @@ func (it *internalTarget) Write(_ context.Context, p []byte) (n int, err error)
|
|||
|
||||
func (it *internalTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) {
|
||||
it.current.SetPayload(it.payload)
|
||||
|
||||
putSingleImplemented, err := it.tryPutSingle(ctx)
|
||||
if putSingleImplemented {
|
||||
return nil, err
|
||||
}
|
||||
it.useStream = true
|
||||
return nil, it.putAsStream(ctx)
|
||||
}
|
||||
|
||||
func (it *internalTarget) putAsStream(ctx context.Context) error {
|
||||
wrt, err := it.client.objectPutInitRaw(ctx, it.prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
if wrt.WriteHeader(ctx, *it.current) {
|
||||
wrt.WritePayloadChunk(ctx, it.current.Payload())
|
||||
|
@ -85,5 +98,32 @@ func (it *internalTarget) Close(ctx context.Context) (*transformer.AccessIdentif
|
|||
it.res, err = wrt.Close(ctx)
|
||||
it.current = nil
|
||||
it.payload = nil
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
func (it *internalTarget) tryPutSingle(ctx context.Context) (bool, error) {
|
||||
if it.useStream {
|
||||
return false, nil
|
||||
}
|
||||
var prm PrmObjectPutSingle
|
||||
prm.SetCopiesNumber(it.prm.copyNum)
|
||||
prm.SetObject(it.current.ToV2())
|
||||
prm.UseKey(prm.key)
|
||||
prm.meta = it.prm.meta
|
||||
|
||||
res, err := it.client.ObjectPutSingle(ctx, prm)
|
||||
if err != nil && status.Code(err) == codes.Unimplemented {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
id, _ := it.current.ID()
|
||||
it.res = &ResObjectPut{
|
||||
statusRes: res.statusRes,
|
||||
obj: id,
|
||||
}
|
||||
}
|
||||
it.current = nil
|
||||
it.payload = nil
|
||||
return true, err
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue