From 98cab7ed61663452b1f9abfca22cd746539f3fe6 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 6 Jul 2023 17:06:17 +0300 Subject: [PATCH] [#103] object: Add PutSingle method code Signed-off-by: Dmitrii Stepanov --- client/object_put_single.go | 116 +++++++++++++++++++++++++++++++ client/object_put_transformer.go | 54 ++++++++++++-- 2 files changed, 163 insertions(+), 7 deletions(-) create mode 100644 client/object_put_single.go diff --git a/client/object_put_single.go b/client/object_put_single.go new file mode 100644 index 00000000..4ed03a67 --- /dev/null +++ b/client/object_put_single.go @@ -0,0 +1,116 @@ +package client + +import ( + "context" + "crypto/ecdsa" + "fmt" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl" + v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" +) + +// PrmObjectPutSingle groups parameters of PutSingle operation. +type PrmObjectPutSingle struct { + copyNum []uint32 + meta v2session.RequestMetaHeader + object *v2object.Object + key *ecdsa.PrivateKey +} + +// SetCopiesNumber sets ordered list of minimal required object copies numbers +// per placement vector. List's length MUST equal container's placement vector number, +// otherwise request will fail. +func (x *PrmObjectPutSingle) SetCopiesNumber(v []uint32) { + x.copyNum = v +} + +// UseKey specifies private key to sign the requests. +// If key is not provided, then Client default key is used. +func (x *PrmObjectPutSingle) UseKey(key *ecdsa.PrivateKey) { + x.key = key +} + +// WithBearerToken attaches bearer token to be used for the operation. +// Should be called once before any writing steps. +func (x *PrmObjectPutSingle) WithBearerToken(t bearer.Token) { + v2token := &acl.BearerToken{} + t.WriteToV2(v2token) + x.meta.SetBearerToken(v2token) +} + +// WithinSession specifies session within which object should be stored. +// Should be called once before any writing steps. +func (x *PrmObjectPutSingle) WithinSession(t session.Object) { + tv2 := &v2session.Token{} + t.WriteToV2(tv2) + x.meta.SetSessionToken(tv2) +} + +// ExecuteLocal tells the server to execute the operation locally. +func (x *PrmObjectPutSingle) ExecuteLocal() { + x.meta.SetTTL(1) +} + +// WithXHeaders specifies list of extended headers (string key-value pairs) +// to be attached to the request. Must have an even length. +// +// Slice must not be mutated until the operation completes. +func (x *PrmObjectPutSingle) WithXHeaders(hs ...string) { + writeXHeadersToMeta(hs, &x.meta) +} + +// SetObject specifies prepared object to put. +func (x *PrmObjectPutSingle) SetObject(o *v2object.Object) { + x.object = o +} + +// ResObjectPutSingle groups resulting values of PutSingle operation. +type ResObjectPutSingle struct { + statusRes +} + +// ObjectPutSingle writes prepared object to FrostFS. +// Object must have payload, also containerID, objectID, ownerID, payload hash, payload length of an object must be set. +// Exactly one return value is non-nil. By default, server status is returned in res structure. +// Any client's internal or transport errors are returned as Go built-in error. +// If Client is tuned to resolve FrostFS API statuses, then FrostFS failures +// codes are returned as error. +func (c *Client) ObjectPutSingle(ctx context.Context, prm PrmObjectPutSingle) (*ResObjectPutSingle, error) { + body := &v2object.PutSingleRequestBody{} + body.SetCopiesNumber(prm.copyNum) + body.SetObject(prm.object) + + req := &v2object.PutSingleRequest{} + req.SetBody(body) + + c.prepareRequest(req, &prm.meta) + + key := &c.prm.key + if prm.key != nil { + key = prm.key + } + + err := signature.SignServiceMessage(key, req) + if err != nil { + return nil, fmt.Errorf("sign request: %w", err) + } + + resp, err := rpcapi.PutSingleObject(&c.c, req, client.WithContext(ctx)) + if err != nil { + return nil, err + } + + var res ResObjectPutSingle + res.st, err = c.processResponse(resp) + if err != nil { + return nil, err + } + + return &res, nil +} diff --git a/client/object_put_transformer.go b/client/object_put_transformer.go index b010deb6..cb44f61d 100644 --- a/client/object_put_transformer.go +++ b/client/object_put_transformer.go @@ -5,6 +5,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func (c *Client) objectPutInitTransformer(prm PrmObjectPutInit) (*objectWriterTransformer, error) { @@ -56,11 +58,12 @@ func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, err } type internalTarget struct { - current *object.Object - client *Client - res *ResObjectPut - prm PrmObjectPutInit - payload []byte + current *object.Object + client *Client + res *ResObjectPut + prm PrmObjectPutInit + payload []byte + useStream bool } func (it *internalTarget) WriteHeader(_ context.Context, object *object.Object) error { @@ -75,9 +78,19 @@ func (it *internalTarget) Write(_ context.Context, p []byte) (n int, err error) func (it *internalTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) { it.current.SetPayload(it.payload) + + putSingleImplemented, err := it.tryPutSingle(ctx) + if putSingleImplemented { + return nil, err + } + it.useStream = true + return nil, it.putAsStream(ctx) +} + +func (it *internalTarget) putAsStream(ctx context.Context) error { wrt, err := it.client.objectPutInitRaw(ctx, it.prm) if err != nil { - return nil, err + return err } if wrt.WriteHeader(ctx, *it.current) { wrt.WritePayloadChunk(ctx, it.current.Payload()) @@ -85,5 +98,32 @@ func (it *internalTarget) Close(ctx context.Context) (*transformer.AccessIdentif it.res, err = wrt.Close(ctx) it.current = nil it.payload = nil - return nil, err + return err +} + +func (it *internalTarget) tryPutSingle(ctx context.Context) (bool, error) { + if it.useStream { + return false, nil + } + var prm PrmObjectPutSingle + prm.SetCopiesNumber(it.prm.copyNum) + prm.SetObject(it.current.ToV2()) + prm.UseKey(prm.key) + prm.meta = it.prm.meta + + res, err := it.client.ObjectPutSingle(ctx, prm) + if err != nil && status.Code(err) == codes.Unimplemented { + return false, err + } + + if err == nil { + id, _ := it.current.ID() + it.res = &ResObjectPut{ + statusRes: res.statusRes, + obj: id, + } + } + it.current = nil + it.payload = nil + return true, err }