From 84e7e69f98ac8c0d6ef09fb90bb60ba035aade17 Mon Sep 17 00:00:00 2001 From: Airat Arifullin Date: Mon, 28 Aug 2023 11:07:18 +0300 Subject: [PATCH] [#121] client: Make PrmObjectGet/Head/GetRange fields public * Remove common PrmObjectRead structure * Introduce buildRequest for PrmObjectGet/Head/GetRange * Refactor the usage of these params in pool Signed-off-by: Airat Arifullin --- client/common.go | 2 + client/object_get.go | 397 ++++++++++++++++++++++++++----------------- pool/pool.go | 71 +++----- 3 files changed, 272 insertions(+), 198 deletions(-) diff --git a/client/common.go b/client/common.go index 907f6be..3895b00 100644 --- a/client/common.go +++ b/client/common.go @@ -47,6 +47,8 @@ func writeXHeadersToMeta(xHeaders []string, h *v2session.RequestMetaHeader) { return } + // TODO (aarifullin): remove the panic when all client parameters will check XHeaders + // within buildRequest invocation. if len(xHeaders)%2 != 0 { panic("slice of X-Headers with odd length") } diff --git a/client/object_get.go b/client/object_get.go index 02de74a..56cbfda 100644 --- a/client/object_get.go +++ b/client/object_get.go @@ -22,77 +22,76 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" ) -// shared parameters of GET/HEAD/RANGE. -type prmObjectRead struct { - meta v2session.RequestMetaHeader - - raw bool - - addr v2refs.Address -} - -// WithXHeaders specifies list of extended headers (string key-value pairs) -// to be attached to the request. Must have an even length. -// -// Slice must not be mutated until the operation completes. -func (x *prmObjectRead) WithXHeaders(hs ...string) { - writeXHeadersToMeta(hs, &x.meta) -} - -// MarkRaw marks an intent to read physically stored object. -func (x *prmObjectRead) MarkRaw() { - x.raw = true -} - -// MarkLocal tells the server to execute the operation locally. -func (x *prmObjectRead) MarkLocal() { - x.meta.SetTTL(1) -} - -// WithinSession specifies session within which object should be read. -// -// Creator of the session acquires the authorship of the request. -// This may affect the execution of an operation (e.g. access control). -// -// Must be signed. -func (x *prmObjectRead) WithinSession(t session.Object) { - var tokv2 v2session.Token - t.WriteToV2(&tokv2) - x.meta.SetSessionToken(&tokv2) -} - -// WithBearerToken attaches bearer token to be used for the operation. -// -// If set, underlying eACL rules will be used in access control. -// -// Must be signed. -func (x *prmObjectRead) WithBearerToken(t bearer.Token) { - var v2token acl.BearerToken - t.WriteToV2(&v2token) - x.meta.SetBearerToken(&v2token) -} - -// FromContainer specifies FrostFS container of the object. -// Required parameter. -func (x *prmObjectRead) FromContainer(id cid.ID) { - var cnrV2 v2refs.ContainerID - id.WriteToV2(&cnrV2) - x.addr.SetContainerID(&cnrV2) -} - -// ByID specifies identifier of the requested object. -// Required parameter. -func (x *prmObjectRead) ByID(id oid.ID) { - var objV2 v2refs.ObjectID - id.WriteToV2(&objV2) - x.addr.SetObjectID(&objV2) -} - // PrmObjectGet groups parameters of ObjectGetInit operation. type PrmObjectGet struct { - prmObjectRead + XHeaders []string - key *ecdsa.PrivateKey + BearerToken *bearer.Token + + Session *session.Object + + Raw bool + + Local bool + + ContainerID *cid.ID + + ObjectID *oid.ID + + Key *ecdsa.PrivateKey +} + +func (prm *PrmObjectGet) buildRequest(c *Client) (*v2object.GetRequest, error) { + if prm.ContainerID == nil { + return nil, errorMissingContainer + } + + if prm.ObjectID == nil { + return nil, errorMissingObject + } + + if len(prm.XHeaders)%2 != 0 { + return nil, errorInvalidXHeaders + } + + meta := new(v2session.RequestMetaHeader) + writeXHeadersToMeta(prm.XHeaders, meta) + + if prm.BearerToken != nil { + v2BearerToken := new(acl.BearerToken) + prm.BearerToken.WriteToV2(v2BearerToken) + meta.SetBearerToken(v2BearerToken) + } + + if prm.Session != nil { + v2SessionToken := new(v2session.Token) + prm.Session.WriteToV2(v2SessionToken) + meta.SetSessionToken(v2SessionToken) + } + + if prm.Local { + meta.SetTTL(1) + } + + addr := new(v2refs.Address) + + cnrV2 := new(v2refs.ContainerID) + prm.ContainerID.WriteToV2(cnrV2) + addr.SetContainerID(cnrV2) + + objV2 := new(v2refs.ObjectID) + prm.ObjectID.WriteToV2(objV2) + addr.SetObjectID(objV2) + + body := new(v2object.GetRequestBody) + body.SetRaw(prm.Raw) + body.SetAddress(addr) + + req := new(v2object.GetRequest) + req.SetBody(body) + c.prepareRequest(req, meta) + + return req, nil } // ResObjectGet groups the final result values of ObjectGetInit operation. @@ -122,8 +121,10 @@ type ObjectReader struct { // UseKey specifies private key to sign the requests. // If key is not provided, then Client default key is used. -func (x *PrmObjectGet) UseKey(key ecdsa.PrivateKey) { - x.key = &key +// +// Deprecated: Use PrmObjectGet.Key instead. +func (prm *PrmObjectGet) UseKey(key ecdsa.PrivateKey) { + prm.Key = &key } // ReadHeader reads header of the object. Result means success. @@ -299,39 +300,24 @@ func (x *ObjectReader) Read(p []byte) (int, error) { // Returns an error if parameters are set incorrectly (see PrmObjectGet docs). // Context is required and must not be nil. It is used for network communication. func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectReader, error) { - // check parameters - switch { - case prm.addr.GetContainerID() == nil: - return nil, errorMissingContainer - case prm.addr.GetObjectID() == nil: - return nil, errorMissingObject + req, err := prm.buildRequest(c) + if err != nil { + return nil, err } - // form request body - var body v2object.GetRequestBody - - body.SetRaw(prm.raw) - body.SetAddress(&prm.addr) - - // form request - var req v2object.GetRequest - - req.SetBody(&body) - c.prepareRequest(&req, &prm.meta) - - key := prm.key + key := prm.Key if key == nil { key = &c.prm.key } - err := signature.SignServiceMessage(key, &req) + err = signature.SignServiceMessage(key, req) if err != nil { return nil, fmt.Errorf("sign request: %w", err) } ctx, cancel := context.WithCancel(ctx) - stream, err := rpcapi.GetObject(&c.c, &req, client.WithContext(ctx)) + stream, err := rpcapi.GetObject(&c.c, req, client.WithContext(ctx)) if err != nil { cancel() return nil, fmt.Errorf("open stream: %w", err) @@ -347,17 +333,29 @@ func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectRe // PrmObjectHead groups parameters of ObjectHead operation. type PrmObjectHead struct { - prmObjectRead + XHeaders []string - keySet bool - key ecdsa.PrivateKey + BearerToken *bearer.Token + + Session *session.Object + + Raw bool + + Local bool + + ContainerID *cid.ID + + ObjectID *oid.ID + + Key *ecdsa.PrivateKey } // UseKey specifies private key to sign the requests. // If key is not provided, then Client default key is used. -func (x *PrmObjectHead) UseKey(key ecdsa.PrivateKey) { - x.keySet = true - x.key = key +// +// Deprecated: Use PrmObjectHead.Key instead. +func (prm *PrmObjectHead) UseKey(key ecdsa.PrivateKey) { + prm.Key = &key } // ResObjectHead groups resulting values of ObjectHead operation. @@ -390,6 +388,58 @@ func (x *ResObjectHead) ReadHeader(dst *object.Object) bool { return true } +func (prm *PrmObjectHead) buildRequest(c *Client) (*v2object.HeadRequest, error) { + if prm.ContainerID == nil { + return nil, errorMissingContainer + } + + if prm.ObjectID == nil { + return nil, errorMissingObject + } + + if len(prm.XHeaders)%2 != 0 { + return nil, errorInvalidXHeaders + } + + meta := new(v2session.RequestMetaHeader) + writeXHeadersToMeta(prm.XHeaders, meta) + + if prm.BearerToken != nil { + v2BearerToken := new(acl.BearerToken) + prm.BearerToken.WriteToV2(v2BearerToken) + meta.SetBearerToken(v2BearerToken) + } + + if prm.Session != nil { + v2SessionToken := new(v2session.Token) + prm.Session.WriteToV2(v2SessionToken) + meta.SetSessionToken(v2SessionToken) + } + + if prm.Local { + meta.SetTTL(1) + } + + addr := new(v2refs.Address) + + cnrV2 := new(v2refs.ContainerID) + prm.ContainerID.WriteToV2(cnrV2) + addr.SetContainerID(cnrV2) + + objV2 := new(v2refs.ObjectID) + prm.ObjectID.WriteToV2(objV2) + addr.SetObjectID(objV2) + body := new(v2object.HeadRequestBody) + body.SetRaw(prm.Raw) + body.SetAddress(addr) + + req := new(v2object.HeadRequest) + req.SetBody(body) + c.prepareRequest(req, meta) + + return req, nil +} + // ObjectHead reads object header through a remote server using FrostFS API protocol. // // Exactly one return value is non-nil. By default, server status is returned in res structure. @@ -413,33 +463,24 @@ func (x *ResObjectHead) ReadHeader(dst *object.Object) bool { // - *apistatus.ObjectAlreadyRemoved; // - *apistatus.SessionTokenExpired. func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectHead, error) { - switch { - case prm.addr.GetContainerID() == nil: - return nil, errorMissingContainer - case prm.addr.GetObjectID() == nil: - return nil, errorMissingObject + req, err := prm.buildRequest(c) + if err != nil { + return nil, err } - var body v2object.HeadRequestBody - body.SetRaw(prm.raw) - body.SetAddress(&prm.addr) - - var req v2object.HeadRequest - req.SetBody(&body) - c.prepareRequest(&req, &prm.meta) - key := c.prm.key - if prm.keySet { - key = prm.key + if prm.Key != nil { + key = *prm.Key } // sign the request - err := signature.SignServiceMessage(&key, &req) + + err = signature.SignServiceMessage(&key, req) if err != nil { return nil, fmt.Errorf("sign request: %w", err) } - resp, err := rpcapi.HeadObject(&c.c, &req, client.WithContext(ctx)) + resp, err := rpcapi.HeadObject(&c.c, req, client.WithContext(ctx)) if err != nil { return nil, fmt.Errorf("write request: %w", err) } @@ -454,7 +495,7 @@ func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectH return &res, nil } - _ = res.idObj.ReadFromV2(*prm.addr.GetObjectID()) + res.idObj = *prm.ObjectID switch v := resp.GetBody().GetHeaderPart().(type) { default: @@ -470,29 +511,95 @@ func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectH // PrmObjectRange groups parameters of ObjectRange operation. type PrmObjectRange struct { - prmObjectRead + XHeaders []string - key *ecdsa.PrivateKey + BearerToken *bearer.Token - rng v2object.Range + Session *session.Object + + Raw bool + + Local bool + + ContainerID *cid.ID + + ObjectID *oid.ID + + Key *ecdsa.PrivateKey + + Offset uint64 + + Length uint64 } -// SetOffset sets offset of the payload range to be read. -// Zero by default. -func (x *PrmObjectRange) SetOffset(off uint64) { - x.rng.SetOffset(off) -} +func (prm *PrmObjectRange) buildRequest(c *Client) (*v2object.GetRangeRequest, error) { + if prm.Length == 0 { + return nil, errorZeroRangeLength + } -// SetLength sets length of the payload range to be read. -// Must be positive. -func (x *PrmObjectRange) SetLength(ln uint64) { - x.rng.SetLength(ln) + if prm.ContainerID == nil { + return nil, errorMissingContainer + } + + if prm.ObjectID == nil { + return nil, errorMissingObject + } + + if len(prm.XHeaders)%2 != 0 { + return nil, errorInvalidXHeaders + } + + meta := new(v2session.RequestMetaHeader) + writeXHeadersToMeta(prm.XHeaders, meta) + + if prm.BearerToken != nil { + v2BearerToken := new(acl.BearerToken) + prm.BearerToken.WriteToV2(v2BearerToken) + meta.SetBearerToken(v2BearerToken) + } + + if prm.Session != nil { + v2SessionToken := new(v2session.Token) + prm.Session.WriteToV2(v2SessionToken) + meta.SetSessionToken(v2SessionToken) + } + + if prm.Local { + meta.SetTTL(1) + } + + addr := new(v2refs.Address) + + cnrV2 := new(v2refs.ContainerID) + prm.ContainerID.WriteToV2(cnrV2) + addr.SetContainerID(cnrV2) + + objV2 := new(v2refs.ObjectID) + prm.ObjectID.WriteToV2(objV2) + addr.SetObjectID(objV2) + + rng := new(v2object.Range) + rng.SetLength(prm.Length) + rng.SetOffset(prm.Offset) + + body := new(v2object.GetRangeRequestBody) + body.SetRaw(prm.Raw) + body.SetAddress(addr) + body.SetRange(rng) + + req := new(v2object.GetRangeRequest) + req.SetBody(body) + c.prepareRequest(req, meta) + + return req, nil } // UseKey specifies private key to sign the requests. // If key is not provided, then Client default key is used. -func (x *PrmObjectRange) UseKey(key ecdsa.PrivateKey) { - x.key = &key +// +// Deprecated: Use PrmObjectRange.Key instead. +func (prm *PrmObjectRange) UseKey(key ecdsa.PrivateKey) { + prm.Key = &key } // ResObjectRange groups the final result values of ObjectRange operation. @@ -662,49 +769,31 @@ func (x *ObjectRangeReader) Read(p []byte) (int, error) { // Returns an error if parameters are set incorrectly (see PrmObjectRange docs). // Context is required and must not be nil. It is used for network communication. func (c *Client) ObjectRangeInit(ctx context.Context, prm PrmObjectRange) (*ObjectRangeReader, error) { - // check parameters - switch { - case prm.addr.GetContainerID() == nil: - return nil, errorMissingContainer - case prm.addr.GetObjectID() == nil: - return nil, errorMissingObject - case prm.rng.GetLength() == 0: - return nil, errorZeroRangeLength + req, err := prm.buildRequest(c) + if err != nil { + return nil, err } - // form request body - var body v2object.GetRangeRequestBody - - body.SetRaw(prm.raw) - body.SetAddress(&prm.addr) - body.SetRange(&prm.rng) - - // form request - var req v2object.GetRangeRequest - - req.SetBody(&body) - c.prepareRequest(&req, &prm.meta) - - key := prm.key + key := prm.Key if key == nil { key = &c.prm.key } - err := signature.SignServiceMessage(key, &req) + err = signature.SignServiceMessage(key, req) if err != nil { return nil, fmt.Errorf("sign request: %w", err) } ctx, cancel := context.WithCancel(ctx) - stream, err := rpcapi.GetObjectRange(&c.c, &req, client.WithContext(ctx)) + stream, err := rpcapi.GetObjectRange(&c.c, req, client.WithContext(ctx)) if err != nil { cancel() return nil, fmt.Errorf("open stream: %w", err) } var r ObjectRangeReader - r.remainingPayloadLen = int(prm.rng.GetLength()) + r.remainingPayloadLen = int(prm.Length) r.cancelCtxStream = cancel r.stream = stream r.client = c diff --git a/pool/pool.go b/pool/pool.go index 4f8a42a..b4cc887 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -835,20 +835,15 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGet return ResGetObject{}, err } - var cliPrm sdkClient.PrmObjectGet - cliPrm.FromContainer(prm.addr.Container()) - cliPrm.ByID(prm.addr.Object()) + prmCnr := prm.addr.Container() + prmObj := prm.addr.Object() - if prm.stoken != nil { - cliPrm.WithinSession(*prm.stoken) - } - - if prm.btoken != nil { - cliPrm.WithBearerToken(*prm.btoken) - } - - if prm.key != nil { - cliPrm.UseKey(*prm.key) + cliPrm := sdkClient.PrmObjectGet{ + BearerToken: prm.btoken, + Session: prm.stoken, + ContainerID: &prmCnr, + ObjectID: &prmObj, + Key: prm.key, } var res ResGetObject @@ -888,23 +883,16 @@ func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (obje return object.Object{}, err } - var cliPrm sdkClient.PrmObjectHead - cliPrm.FromContainer(prm.addr.Container()) - cliPrm.ByID(prm.addr.Object()) - if prm.raw { - cliPrm.MarkRaw() - } + prmCnr := prm.addr.Container() + prmObj := prm.addr.Object() - if prm.stoken != nil { - cliPrm.WithinSession(*prm.stoken) - } - - if prm.btoken != nil { - cliPrm.WithBearerToken(*prm.btoken) - } - - if prm.key != nil { - cliPrm.UseKey(*prm.key) + cliPrm := sdkClient.PrmObjectHead{ + BearerToken: prm.btoken, + Session: prm.stoken, + Raw: prm.raw, + ContainerID: &prmCnr, + ObjectID: &prmObj, + Key: prm.key, } var obj object.Object @@ -933,22 +921,17 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (Re return ResObjectRange{}, err } - var cliPrm sdkClient.PrmObjectRange - cliPrm.FromContainer(prm.addr.Container()) - cliPrm.ByID(prm.addr.Object()) - cliPrm.SetOffset(prm.off) - cliPrm.SetLength(prm.ln) + prmCnr := prm.addr.Container() + prmObj := prm.addr.Object() - if prm.stoken != nil { - cliPrm.WithinSession(*prm.stoken) - } - - if prm.btoken != nil { - cliPrm.WithBearerToken(*prm.btoken) - } - - if prm.key != nil { - cliPrm.UseKey(*prm.key) + cliPrm := sdkClient.PrmObjectRange{ + BearerToken: prm.btoken, + Session: prm.stoken, + ContainerID: &prmCnr, + ObjectID: &prmObj, + Offset: prm.off, + Length: prm.ln, + Key: prm.key, } start := time.Now()