From 9ce6eed5ad0a9f7770e484f7a98d2e67ec464ce8 Mon Sep 17 00:00:00 2001 From: Airat Arifullin Date: Thu, 7 Sep 2023 11:53:49 +0300 Subject: [PATCH] [#121] client: Make PrmObjectPutSingle fields public Signed-off-by: Airat Arifullin --- client/object_put.go | 79 +++++++++++++------- client/object_put_raw.go | 38 ++++++++-- client/object_put_single.go | 121 ++++++++++++++++++++++--------- client/object_put_transformer.go | 25 ++++--- pool/pool.go | 15 ++-- 5 files changed, 191 insertions(+), 87 deletions(-) diff --git a/client/object_put.go b/client/object_put.go index 2861ea4..213ec7f 100644 --- a/client/object_put.go +++ b/client/object_put.go @@ -4,8 +4,6 @@ import ( "context" "crypto/ecdsa" - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl" - v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -19,25 +17,41 @@ const defaultGRPCPayloadChunkLen = 3 << 20 // PrmObjectPutInit groups parameters of ObjectPutInit operation. type PrmObjectPutInit struct { - copyNum []uint32 - key *ecdsa.PrivateKey - meta v2session.RequestMetaHeader - maxChunkLen int - maxSize uint64 - epochSource transformer.EpochSource - withoutHomomorphicHash bool + XHeaders []string + + BearerToken *bearer.Token + + Session *session.Object + + Local bool + + CopiesNumber []uint32 + + MaxChunkLength int + + MaxSize uint64 + + EpochSource transformer.EpochSource + + WithoutHomomorphHash bool + + Key *ecdsa.PrivateKey } // SetCopiesNumber sets number of object copies that is enough to consider put successful. +// +// Deprecated: Use PrmObjectPutInit.CopiesNumber instead. func (x *PrmObjectPutInit) SetCopiesNumber(copiesNumber uint32) { - x.copyNum = []uint32{copiesNumber} + x.CopiesNumber = []uint32{copiesNumber} } // SetCopiesNumberByVectors sets ordered list of minimal required object copies numbers // per placement vector. List's length MUST equal container's placement vector number, // otherwise request will fail. +// +// Deprecated: Use PrmObjectPutInit.CopiesNumber instead. func (x *PrmObjectPutInit) SetCopiesNumberByVectors(copiesNumbers []uint32) { - x.copyNum = copiesNumbers + x.CopiesNumber = copiesNumbers } // SetGRPCPayloadChunkLen sets maximum chunk length value for gRPC Put request. @@ -45,8 +59,10 @@ func (x *PrmObjectPutInit) SetCopiesNumberByVectors(copiesNumbers []uint32) { // transmitted in a single stream message. It depends on // server settings and other message fields. // If not specified or negative value set, default value of 3MiB will be used. +// +// Deprecated: Use PrmObjectPutInit.MaxChunkLength instead. func (x *PrmObjectPutInit) SetGRPCPayloadChunkLen(v int) { - x.maxChunkLen = v + x.MaxChunkLength = v } // ResObjectPut groups the final result values of ObjectPutInit operation. @@ -94,55 +110,66 @@ type ObjectWriter interface { // UseKey specifies private key to sign the requests. // If key is not provided, then Client default key is used. +// +// Deprecated: Use PrmObjectPutInit.Key instead. func (x *PrmObjectPutInit) UseKey(key ecdsa.PrivateKey) { - x.key = &key + x.Key = &key } // WithBearerToken attaches bearer token to be used for the operation. // Should be called once before any writing steps. +// +// Deprecated: Use PrmObjectPutInit.BearerToken instead. func (x *PrmObjectPutInit) WithBearerToken(t bearer.Token) { - var v2token acl.BearerToken - t.WriteToV2(&v2token) - x.meta.SetBearerToken(&v2token) + x.BearerToken = &t } // WithinSession specifies session within which object should be stored. // Should be called once before any writing steps. +// +// Deprecated: Use PrmObjectPutInit.Session instead. func (x *PrmObjectPutInit) WithinSession(t session.Object) { - var tv2 v2session.Token - t.WriteToV2(&tv2) - - x.meta.SetSessionToken(&tv2) + x.Session = &t } // MarkLocal tells the server to execute the operation locally. +// +// Deprecated: Use PrmObjectPutInit.Local instead. func (x *PrmObjectPutInit) MarkLocal() { - x.meta.SetTTL(1) + x.Local = true } // WithXHeaders specifies list of extended headers (string key-value pairs) // to be attached to the request. Must have an even length. // // Slice must not be mutated until the operation completes. +// +// Deprecated: Use PrmObjectPutInit.XHeaders instead. func (x *PrmObjectPutInit) WithXHeaders(hs ...string) { - writeXHeadersToMeta(hs, &x.meta) + x.XHeaders = hs } // WithObjectMaxSize specifies max object size value and use it during object splitting. // When specified, start writing to the stream only after the object is formed. // Continue processing the input only when the previous formed object has been successfully written. +// +// Deprecated: Use PrmObjectPutInit.MaxSize instead. func (x *PrmObjectPutInit) WithObjectMaxSize(maxSize uint64) { - x.maxSize = maxSize + x.MaxSize = maxSize } // WithoutHomomorphicHash if set to true do not use Tillich-ZĂ©mor hash for payload. +// +// Deprecated: Use PrmObjectPutInit.WithoutHomomorphHash instead. func (x *PrmObjectPutInit) WithoutHomomorphicHash(v bool) { - x.withoutHomomorphicHash = v + x.WithoutHomomorphHash = v } // WithEpochSource specifies epoch for object when split it on client side. +// +// Deprecated: Use PrmObjectPutInit.EpochSource instead. func (x *PrmObjectPutInit) WithEpochSource(es transformer.EpochSource) { - x.epochSource = es + x.EpochSource = es } // ObjectPutInit initiates writing an object through a remote server using FrostFS API protocol. @@ -153,7 +180,7 @@ func (x *PrmObjectPutInit) WithEpochSource(es transformer.EpochSource) { // Returns an error if parameters are set incorrectly. // Context is required and must not be nil. It is used for network communication. func (c *Client) ObjectPutInit(ctx context.Context, prm PrmObjectPutInit) (ObjectWriter, error) { - if prm.maxSize > 0 { + if prm.MaxSize > 0 { return c.objectPutInitTransformer(prm) } return c.objectPutInitRaw(ctx, prm) diff --git a/client/object_put_raw.go b/client/object_put_raw.go index 64cb250..0669052 100644 --- a/client/object_put_raw.go +++ b/client/object_put_raw.go @@ -7,15 +7,21 @@ import ( "fmt" "io" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl" v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" ) func (c *Client) objectPutInitRaw(ctx context.Context, prm PrmObjectPutInit) (*objectWriterRaw, error) { + if len(prm.XHeaders)%2 != 0 { + return nil, errorInvalidXHeaders + } + var w objectWriterRaw stream, err := rpcapi.PutObject(&c.c, &w.respV2, client.WithContext(ctx)) if err != nil { @@ -23,19 +29,39 @@ func (c *Client) objectPutInitRaw(ctx context.Context, prm PrmObjectPutInit) (*o } w.key = &c.prm.key - if prm.key != nil { - w.key = prm.key + if prm.Key != nil { + w.key = prm.Key } w.client = c w.stream = stream - w.partInit.SetCopiesNumber(prm.copyNum) + w.partInit.SetCopiesNumber(prm.CopiesNumber) w.req.SetBody(new(v2object.PutRequestBody)) - if prm.maxChunkLen > 0 { - w.maxChunkLen = prm.maxChunkLen + if prm.MaxChunkLength > 0 { + w.maxChunkLen = prm.MaxChunkLength } else { w.maxChunkLen = defaultGRPCPayloadChunkLen } - c.prepareRequest(&w.req, &prm.meta) + + meta := new(v2session.RequestMetaHeader) + writeXHeadersToMeta(prm.XHeaders, meta) + + if prm.BearerToken != nil { + v2BearerToken := new(acl.BearerToken) + prm.BearerToken.WriteToV2(v2BearerToken) + meta.SetBearerToken(v2BearerToken) + } + + if prm.Session != nil { + v2SessionToken := new(v2session.Token) + prm.Session.WriteToV2(v2SessionToken) + meta.SetSessionToken(v2SessionToken) + } + + if prm.Local { + meta.SetTTL(1) + } + + c.prepareRequest(&w.req, meta) return &w, nil } diff --git a/client/object_put_single.go b/client/object_put_single.go index 4ed03a6..0ae6bf8 100644 --- a/client/object_put_single.go +++ b/client/object_put_single.go @@ -12,62 +12,82 @@ import ( v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" ) // PrmObjectPutSingle groups parameters of PutSingle operation. type PrmObjectPutSingle struct { - copyNum []uint32 - meta v2session.RequestMetaHeader - object *v2object.Object - key *ecdsa.PrivateKey + XHeaders []string + + BearerToken *bearer.Token + + Session *session.Object + + Local bool + + CopiesNumber []uint32 + + Object *object.Object + + Key *ecdsa.PrivateKey } // SetCopiesNumber sets ordered list of minimal required object copies numbers // per placement vector. List's length MUST equal container's placement vector number, // otherwise request will fail. -func (x *PrmObjectPutSingle) SetCopiesNumber(v []uint32) { - x.copyNum = v +// +// Deprecated: Use PrmObjectPutSingle.CopiesNumber instead. +func (prm *PrmObjectPutSingle) SetCopiesNumber(v []uint32) { + prm.CopiesNumber = v } // UseKey specifies private key to sign the requests. // If key is not provided, then Client default key is used. -func (x *PrmObjectPutSingle) UseKey(key *ecdsa.PrivateKey) { - x.key = key +// +// Deprecated: Use PrmObjectPutSingle.Key instead. +func (prm *PrmObjectPutSingle) UseKey(key *ecdsa.PrivateKey) { + prm.Key = key } // WithBearerToken attaches bearer token to be used for the operation. // Should be called once before any writing steps. -func (x *PrmObjectPutSingle) WithBearerToken(t bearer.Token) { - v2token := &acl.BearerToken{} - t.WriteToV2(v2token) - x.meta.SetBearerToken(v2token) +// +// Deprecated: Use PrmObjectPutSingle.BearerToken instead. +func (prm *PrmObjectPutSingle) WithBearerToken(t bearer.Token) { + prm.BearerToken = &t } // WithinSession specifies session within which object should be stored. // Should be called once before any writing steps. -func (x *PrmObjectPutSingle) WithinSession(t session.Object) { - tv2 := &v2session.Token{} - t.WriteToV2(tv2) - x.meta.SetSessionToken(tv2) +// +// Deprecated: Use PrmObjectPutSingle.Session instead. +func (prm *PrmObjectPutSingle) WithinSession(t session.Object) { + prm.Session = &t } // ExecuteLocal tells the server to execute the operation locally. -func (x *PrmObjectPutSingle) ExecuteLocal() { - x.meta.SetTTL(1) +// +// Deprecated: Use PrmObjectPutSingle.Local instead. +func (prm *PrmObjectPutSingle) ExecuteLocal() { + prm.Local = true } // WithXHeaders specifies list of extended headers (string key-value pairs) // to be attached to the request. Must have an even length. // // Slice must not be mutated until the operation completes. -func (x *PrmObjectPutSingle) WithXHeaders(hs ...string) { - writeXHeadersToMeta(hs, &x.meta) +// +// Deprecated: Use PrmObjectPutSingle.XHeaders instead. +func (prm *PrmObjectPutSingle) WithXHeaders(hs ...string) { + prm.XHeaders = hs } // SetObject specifies prepared object to put. -func (x *PrmObjectPutSingle) SetObject(o *v2object.Object) { - x.object = o +// +// Deprecated: Use PrmObjectPutSingle.Object instead. +func (prm *PrmObjectPutSingle) SetObject(o *v2object.Object) { + prm.Object = object.NewFromV2(o) } // ResObjectPutSingle groups resulting values of PutSingle operation. @@ -75,6 +95,41 @@ type ResObjectPutSingle struct { statusRes } +func (prm *PrmObjectPutSingle) buildRequest(c *Client) (*v2object.PutSingleRequest, error) { + if len(prm.XHeaders)%2 != 0 { + return nil, errorInvalidXHeaders + } + + body := new(v2object.PutSingleRequestBody) + body.SetCopiesNumber(prm.CopiesNumber) + body.SetObject(prm.Object.ToV2()) + + meta := new(v2session.RequestMetaHeader) + writeXHeadersToMeta(prm.XHeaders, meta) + + if prm.BearerToken != nil { + v2BearerToken := new(acl.BearerToken) + prm.BearerToken.WriteToV2(v2BearerToken) + meta.SetBearerToken(v2BearerToken) + } + + if prm.Session != nil { + v2SessionToken := new(v2session.Token) + prm.Session.WriteToV2(v2SessionToken) + meta.SetSessionToken(v2SessionToken) + } + + if prm.Local { + meta.SetTTL(1) + } + + req := &v2object.PutSingleRequest{} + req.SetBody(body) + c.prepareRequest(req, meta) + + return req, nil +} + // ObjectPutSingle writes prepared object to FrostFS. // Object must have payload, also containerID, objectID, ownerID, payload hash, payload length of an object must be set. // Exactly one return value is non-nil. By default, server status is returned in res structure. @@ -82,21 +137,17 @@ type ResObjectPutSingle struct { // If Client is tuned to resolve FrostFS API statuses, then FrostFS failures // codes are returned as error. func (c *Client) ObjectPutSingle(ctx context.Context, prm PrmObjectPutSingle) (*ResObjectPutSingle, error) { - body := &v2object.PutSingleRequestBody{} - body.SetCopiesNumber(prm.copyNum) - body.SetObject(prm.object) - - req := &v2object.PutSingleRequest{} - req.SetBody(body) - - c.prepareRequest(req, &prm.meta) - - key := &c.prm.key - if prm.key != nil { - key = prm.key + req, err := prm.buildRequest(c) + if err != nil { + return nil, err } - err := signature.SignServiceMessage(key, req) + key := &c.prm.key + if prm.Key != nil { + key = prm.Key + } + + err = signature.SignServiceMessage(key, req) if err != nil { return nil, fmt.Errorf("sign request: %w", err) } diff --git a/client/object_put_transformer.go b/client/object_put_transformer.go index 402ad00..9013348 100644 --- a/client/object_put_transformer.go +++ b/client/object_put_transformer.go @@ -17,15 +17,15 @@ func (c *Client) objectPutInitTransformer(prm PrmObjectPutInit) (*objectWriterTr prm: prm, } key := &c.prm.key - if prm.key != nil { - key = prm.key + if prm.Key != nil { + key = prm.Key } w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{ Key: key, NextTargetInit: func() transformer.ObjectWriter { return &w.it }, - MaxSize: prm.maxSize, - WithoutHomomorphicHash: prm.withoutHomomorphicHash, - NetworkState: prm.epochSource, + MaxSize: prm.MaxSize, + WithoutHomomorphicHash: prm.WithoutHomomorphHash, + NetworkState: prm.EpochSource, }) return &w, nil } @@ -93,11 +93,16 @@ func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (b if it.useStream { return false, nil } - var prm PrmObjectPutSingle - prm.SetCopiesNumber(it.prm.copyNum) - prm.SetObject(o.ToV2()) - prm.UseKey(prm.key) - prm.meta = it.prm.meta + + prm := PrmObjectPutSingle{ + XHeaders: it.prm.XHeaders, + BearerToken: it.prm.BearerToken, + Session: it.prm.Session, + Local: it.prm.Local, + CopiesNumber: it.prm.CopiesNumber, + Object: o, + Key: it.prm.Key, + } res, err := it.client.ObjectPutSingle(ctx, prm) if err != nil && status.Code(err) == codes.Unimplemented { diff --git a/pool/pool.go b/pool/pool.go index 6dc8fb9..ba8e182 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -651,16 +651,11 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut return oid.ID{}, err } - var cliPrm sdkClient.PrmObjectPutInit - cliPrm.SetCopiesNumberByVectors(prm.copiesNumber) - if prm.stoken != nil { - cliPrm.WithinSession(*prm.stoken) - } - if prm.key != nil { - cliPrm.UseKey(*prm.key) - } - if prm.btoken != nil { - cliPrm.WithBearerToken(*prm.btoken) + cliPrm := sdkClient.PrmObjectPutInit{ + CopiesNumber: prm.copiesNumber, + Session: prm.stoken, + Key: prm.key, + BearerToken: prm.btoken, } start := time.Now()