Compare commits


No commits in common. "fc4551b84341ab45cf856b88713c1dbfa01afad7" and "6fdbe755179efb5fc5cdf5f1abc78cb4e982c3c8" have entirely different histories.

52 changed files with 654 additions and 1465 deletions

View file

@ -25,7 +25,7 @@ linters-settings:
# report about shadowed variables
check-shadowing: false
staticcheck:
checks: ["all"]
checks: ["all", "-SA1019"] # TODO Enable SA1019 after deprecated warning are fixed.
funlen:
lines: 80 # default 60
statements: 60 # default 40
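For reference, SA1019 is staticcheck's diagnostic for uses of deprecated identifiers; a minimal illustration of what it reports (hypothetical names) while the deprecation cleanup is pending:

```go
package main

// Deprecated: Use NewAPI instead.
func OldAPI() int { return 0 }

func NewAPI() int { return 1 }

func main() {
	_ = OldAPI() // staticcheck reports SA1019: OldAPI is deprecated
}
```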

View file

@ -24,17 +24,17 @@ type Checksum refs.Checksum
// Type represents the enumeration
// of checksum types.
type Type refs.ChecksumType
type Type uint8
const (
// Unknown is an undefined checksum type.
Unknown Type = Type(refs.UnknownChecksum)
Unknown Type = iota
// SHA256 is a SHA256 checksum type.
SHA256 = Type(refs.SHA256)
SHA256
// TZ is a Tillich-Zémor checksum type.
TZ = Type(refs.TillichZemor)
TZ
)
// ReadFromV2 reads Checksum from the refs.Checksum message. Checks if the
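On the head side, `Type` becomes a plain `uint8` enum instead of an alias of the protocol-level `refs.ChecksumType`. A standalone sketch of the resulting shape, assuming the `iota` order continues to mirror the `refs` constants:

```go
package main

import "fmt"

// Type mirrors checksum.Type on the head side: a plain uint8 enum.
type Type uint8

const (
	Unknown Type = iota // undefined checksum type
	SHA256              // SHA256 checksum
	TZ                  // Tillich-Zémor checksum
)

func main() {
	fmt.Println(Unknown, SHA256, TZ) // 0 1 2
}
```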

View file

@ -2,9 +2,9 @@ package checksum
import (
"bytes"
"crypto/rand"
"crypto/sha256"
"fmt"
"math/rand"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
)

View file

@ -1,8 +1,8 @@
package checksumtest
import (
"crypto/rand"
"crypto/sha256"
"math/rand"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
)
@ -11,7 +11,7 @@ import (
func Checksum() checksum.Checksum {
var cs [sha256.Size]byte
_, _ = rand.Read(cs[:])
rand.Read(cs[:])
var x checksum.Checksum
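The import swap explains the dropped error assignment: `math/rand.Read` is documented to always return `len(p), nil`, while `crypto/rand.Read` reads from the OS entropy source and can fail. A side-by-side sketch (the aliases `crand`/`mrand` are illustrative):

```go
package main

import (
	crand "crypto/rand"
	mrand "math/rand"
)

func main() {
	buf := make([]byte, 32)

	// math/rand: deterministic PRNG, fine for test fixtures;
	// Read always succeeds, so the error can be ignored outright.
	mrand.Read(buf)

	// crypto/rand: may fail, so the error should at least be acknowledged.
	if _, err := crand.Read(buf); err != nil {
		panic(err)
	}
}
```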

View file

@ -47,8 +47,6 @@ func writeXHeadersToMeta(xHeaders []string, h *v2session.RequestMetaHeader) {
return
}
// TODO (aarifullin): remove the panic once all client parameters check XHeaders
// within the buildRequest invocation.
if len(xHeaders)%2 != 0 {
panic("slice of X-Headers with odd length")
}
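For context, X-Headers travel as a flat slice of alternating keys and values, which is why an even length is enforced. A self-contained sketch that mimics the invariant of `writeXHeadersToMeta`:

```go
package main

import "fmt"

// setXHeaders mimics writeXHeadersToMeta: keys and values alternate,
// so the slice length must be even.
func setXHeaders(xs []string) map[string]string {
	if len(xs)%2 != 0 {
		panic("slice of X-Headers with odd length")
	}
	m := make(map[string]string, len(xs)/2)
	for i := 0; i < len(xs); i += 2 {
		m[xs[i]] = xs[i+1]
	}
	return m
}

func main() {
	fmt.Println(setXHeaders([]string{"X-Source", "billing", "X-Trace-ID", "42"}))
}
```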

View file

@ -18,20 +18,20 @@ import (
// PrmContainerSetEACL groups parameters of ContainerSetEACL operation.
type PrmContainerSetEACL struct {
// FrostFS request X-Headers.
XHeaders []string
prmCommonMeta
Table *eacl.Table
tableSet bool
table eacl.Table
Session *session.Container
sessionSet bool
session session.Container
}
// SetTable sets eACL table structure to be set for the container.
// Required parameter.
//
// Deprecated: Use PrmContainerSetEACL.Table instead.
func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
x.Table = &table
x.table = table
x.tableSet = true
}
// WithinSession specifies session within which extended ACL of the container
@ -45,22 +45,17 @@ func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
// for which extended ACL is going to be set
// - session operation MUST be session.VerbContainerSetEACL (ForVerb)
// - token MUST be signed using private key of the owner of the container to be saved
//
// Deprecated: Use PrmContainerSetEACL.Session instead.
func (x *PrmContainerSetEACL) WithinSession(s session.Container) {
x.Session = &s
x.session = s
x.sessionSet = true
}
func (x *PrmContainerSetEACL) buildRequest(c *Client) (*v2container.SetExtendedACLRequest, error) {
if x.Table == nil {
if !x.tableSet {
return nil, errorEACLTableNotSet
}
if len(x.XHeaders)%2 != 0 {
return nil, errorInvalidXHeaders
}
eaclV2 := x.Table.ToV2()
eaclV2 := x.table.ToV2()
var sig frostfscrypto.Signature
@ -77,11 +72,11 @@ func (x *PrmContainerSetEACL) buildRequest(c *Client) (*v2container.SetExtendedA
reqBody.SetSignature(&sigv2)
var meta v2session.RequestMetaHeader
writeXHeadersToMeta(x.XHeaders, &meta)
writeXHeadersToMeta(x.prmCommonMeta.xHeaders, &meta)
if x.Session != nil {
if x.sessionSet {
var tokv2 v2session.Token
x.Session.WriteToV2(&tokv2)
x.session.WriteToV2(&tokv2)
meta.SetSessionToken(&tokv2)
}
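Put together, setting a container eACL on the head side goes through the setters. A hedged usage sketch, assuming a dialed `client.Client`, an `eacl.Table` carrying the target container ID, and a session token issued for `session.VerbContainerSetEACL`:

```go
import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
)

func setContainerEACL(ctx context.Context, c *client.Client, table eacl.Table, tok session.Container) error {
	var prm client.PrmContainerSetEACL
	prm.SetTable(table)    // required parameter
	prm.WithinSession(tok) // optional; token must be signed by the container owner

	_, err := c.ContainerSetEACL(ctx, prm)
	return err
}
```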

View file

@ -14,29 +14,27 @@ import (
// PrmAnnounceSpace groups parameters of ContainerAnnounceUsedSpace operation.
type PrmAnnounceSpace struct {
XHeaders []string
prmCommonMeta
Announcements []container.SizeEstimation
announcements []container.SizeEstimation
}
// SetValues sets values describing volume of space that is used for the container objects.
// Required parameter. Must not be empty.
//
// Must not be mutated before the end of the operation.
//
// Deprecated: Use PrmAnnounceSpace.Announcements instead.
func (x *PrmAnnounceSpace) SetValues(vs []container.SizeEstimation) {
x.Announcements = vs
x.announcements = vs
}
func (x *PrmAnnounceSpace) buildRequest(c *Client) (*v2container.AnnounceUsedSpaceRequest, error) {
if len(x.Announcements) == 0 {
if len(x.announcements) == 0 {
return nil, errorMissingAnnouncements
}
v2announce := make([]v2container.UsedSpaceAnnouncement, len(x.Announcements))
for i := range x.Announcements {
x.Announcements[i].WriteToV2(&v2announce[i])
v2announce := make([]v2container.UsedSpaceAnnouncement, len(x.announcements))
for i := range x.announcements {
x.announcements[i].WriteToV2(&v2announce[i])
}
reqBody := new(v2container.AnnounceUsedSpaceRequestBody)
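A usage sketch for the setter-based form, assuming the `container.SizeEstimation` setters and the `ContainerAnnounceUsedSpace` entry point as in this SDK family (`cnr`, `epoch`, `used` are placeholders):

```go
import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)

func announce(ctx context.Context, c *client.Client, cnr cid.ID, epoch, used uint64) error {
	var est container.SizeEstimation
	est.SetContainer(cnr)
	est.SetEpoch(epoch)
	est.SetValue(used)

	var prm client.PrmAnnounceSpace
	prm.SetValues([]container.SizeEstimation{est}) // must be non-empty

	_, err := c.ContainerAnnounceUsedSpace(ctx, prm)
	return err
}
```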

View file

@ -16,12 +16,12 @@ import (
// PrmEndpointInfo groups parameters of EndpointInfo operation.
type PrmEndpointInfo struct {
XHeaders []string
prmCommonMeta
}
func (x *PrmEndpointInfo) buildRequest(c *Client) (*v2netmap.LocalNodeInfoRequest, error) {
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(x.XHeaders, meta)
writeXHeadersToMeta(x.xHeaders, meta)
req := new(v2netmap.LocalNodeInfoRequest)
req.SetBody(new(v2netmap.LocalNodeInfoRequestBody))
@ -112,12 +112,12 @@ func (c *Client) EndpointInfo(ctx context.Context, prm PrmEndpointInfo) (*ResEnd
// PrmNetworkInfo groups parameters of NetworkInfo operation.
type PrmNetworkInfo struct {
XHeaders []string
prmCommonMeta
}
func (x PrmNetworkInfo) buildRequest(c *Client) (*v2netmap.NetworkInfoRequest, error) {
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(x.XHeaders, meta)
writeXHeadersToMeta(x.xHeaders, meta)
var req v2netmap.NetworkInfoRequest
req.SetBody(new(v2netmap.NetworkInfoRequestBody))
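Both parameter types now carry only the common meta options, so a zero value is a valid argument. A sketch, assuming `ResNetworkInfo.Info` and `netmap.NetworkInfo.CurrentEpoch` as in this SDK:

```go
import (
	"context"
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
)

func printEpoch(ctx context.Context, c *client.Client) error {
	res, err := c.NetworkInfo(ctx, client.PrmNetworkInfo{})
	if err != nil {
		return err
	}
	fmt.Println("current epoch:", res.Info().CurrentEpoch())
	return nil
}
```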

View file

@ -21,25 +21,71 @@ import (
// PrmObjectDelete groups parameters of ObjectDelete operation.
type PrmObjectDelete struct {
XHeaders []string
meta v2session.RequestMetaHeader
BearerToken *bearer.Token
body v2object.DeleteRequestBody
Session *session.Object
addr v2refs.Address
ContainerID *cid.ID
keySet bool
key ecdsa.PrivateKey
}
ObjectID *oid.ID
// WithinSession specifies session within which object should be read.
//
// Creator of the session acquires the authorship of the request.
// This may affect the execution of an operation (e.g. access control).
//
// Must be signed.
func (x *PrmObjectDelete) WithinSession(t session.Object) {
var tv2 v2session.Token
t.WriteToV2(&tv2)
Key *ecdsa.PrivateKey
x.meta.SetSessionToken(&tv2)
}
// WithBearerToken attaches bearer token to be used for the operation.
//
// If set, underlying eACL rules will be used in access control.
//
// Must be signed.
func (x *PrmObjectDelete) WithBearerToken(t bearer.Token) {
var v2token acl.BearerToken
t.WriteToV2(&v2token)
x.meta.SetBearerToken(&v2token)
}
// FromContainer specifies FrostFS container of the object.
// Required parameter.
func (x *PrmObjectDelete) FromContainer(id cid.ID) {
var cidV2 v2refs.ContainerID
id.WriteToV2(&cidV2)
x.addr.SetContainerID(&cidV2)
}
// ByID specifies identifier of the requested object.
// Required parameter.
func (x *PrmObjectDelete) ByID(id oid.ID) {
var idV2 v2refs.ObjectID
id.WriteToV2(&idV2)
x.addr.SetObjectID(&idV2)
}
// UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used.
func (x *PrmObjectDelete) UseKey(key ecdsa.PrivateKey) {
x.keySet = true
x.key = key
}
// WithXHeaders specifies list of extended headers (string key-value pairs)
// to be attached to the request. Must have an even length.
//
// Deprecated: Use PrmObjectDelete.Key instead.
func (prm *PrmObjectDelete) UseKey(key ecdsa.PrivateKey) {
prm.Key = &key
// Slice must not be mutated until the operation completes.
func (x *PrmObjectDelete) WithXHeaders(hs ...string) {
writeXHeadersToMeta(hs, &x.meta)
}
// ResObjectDelete groups resulting values of ObjectDelete operation.
@ -54,54 +100,6 @@ func (x ResObjectDelete) Tombstone() oid.ID {
return x.tomb
}
func (prm *PrmObjectDelete) buildRequest(c *Client) (*v2object.DeleteRequest, error) {
if prm.ContainerID == nil {
return nil, errorMissingContainer
}
if prm.ObjectID == nil {
return nil, errorMissingObject
}
if len(prm.XHeaders)%2 != 0 {
return nil, errorInvalidXHeaders
}
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(prm.XHeaders, meta)
if prm.BearerToken != nil {
v2BearerToken := new(acl.BearerToken)
prm.BearerToken.WriteToV2(v2BearerToken)
meta.SetBearerToken(v2BearerToken)
}
if prm.Session != nil {
v2SessionToken := new(v2session.Token)
prm.Session.WriteToV2(v2SessionToken)
meta.SetSessionToken(v2SessionToken)
}
addr := new(v2refs.Address)
cnrV2 := new(v2refs.ContainerID)
prm.ContainerID.WriteToV2(cnrV2)
addr.SetContainerID(cnrV2)
objV2 := new(v2refs.ObjectID)
prm.ObjectID.WriteToV2(objV2)
addr.SetObjectID(objV2)
body := new(v2object.DeleteRequestBody)
body.SetAddress(addr)
req := new(v2object.DeleteRequest)
req.SetBody(body)
c.prepareRequest(req, meta)
return req, nil
}
// ObjectDelete marks an object for deletion from the container using FrostFS API protocol.
// As a marker, a special unit called a tombstone is placed in the container.
// It confirms the user's intent to delete the object, and is itself a container object.
@ -126,22 +124,32 @@ func (prm *PrmObjectDelete) buildRequest(c *Client) (*v2object.DeleteRequest, er
// - *apistatus.ObjectLocked;
// - *apistatus.SessionTokenExpired.
func (c *Client) ObjectDelete(ctx context.Context, prm PrmObjectDelete) (*ResObjectDelete, error) {
req, err := prm.buildRequest(c)
if err != nil {
return nil, err
switch {
case prm.addr.GetContainerID() == nil:
return nil, errorMissingContainer
case prm.addr.GetObjectID() == nil:
return nil, errorMissingObject
}
// form request body
prm.body.SetAddress(&prm.addr)
// form request
var req v2object.DeleteRequest
req.SetBody(&prm.body)
c.prepareRequest(&req, &prm.meta)
key := c.prm.key
if prm.Key != nil {
key = *prm.Key
if prm.keySet {
key = prm.key
}
err = signature.SignServiceMessage(&key, req)
err := signature.SignServiceMessage(&key, &req)
if err != nil {
return nil, fmt.Errorf("sign request: %w", err)
}
resp, err := rpcapi.DeleteObject(&c.c, req, client.WithContext(ctx))
resp, err := rpcapi.DeleteObject(&c.c, &req, client.WithContext(ctx))
if err != nil {
return nil, err
}
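End to end, the head-side flow for deleting an object looks like this (a sketch; `cnr` and `id` are placeholders):

```go
import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

func deleteObject(ctx context.Context, c *client.Client, cnr cid.ID, id oid.ID) (oid.ID, error) {
	var prm client.PrmObjectDelete
	prm.FromContainer(cnr) // required
	prm.ByID(id)           // required

	res, err := c.ObjectDelete(ctx, prm)
	if err != nil {
		return oid.ID{}, err
	}
	return res.Tombstone(), nil // ID of the tombstone that marks the deletion
}
```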

View file

@ -22,76 +22,77 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
)
// PrmObjectGet groups parameters of ObjectGetInit operation.
type PrmObjectGet struct {
XHeaders []string
// shared parameters of GET/HEAD/RANGE.
type prmObjectRead struct {
meta v2session.RequestMetaHeader
BearerToken *bearer.Token
raw bool
Session *session.Object
Raw bool
Local bool
ContainerID *cid.ID
ObjectID *oid.ID
Key *ecdsa.PrivateKey
addr v2refs.Address
}
func (prm *PrmObjectGet) buildRequest(c *Client) (*v2object.GetRequest, error) {
if prm.ContainerID == nil {
return nil, errorMissingContainer
}
// WithXHeaders specifies list of extended headers (string key-value pairs)
// to be attached to the request. Must have an even length.
//
// Slice must not be mutated until the operation completes.
func (x *prmObjectRead) WithXHeaders(hs ...string) {
writeXHeadersToMeta(hs, &x.meta)
}
if prm.ObjectID == nil {
return nil, errorMissingObject
}
// MarkRaw marks an intent to read physically stored object.
func (x *prmObjectRead) MarkRaw() {
x.raw = true
}
if len(prm.XHeaders)%2 != 0 {
return nil, errorInvalidXHeaders
}
// MarkLocal tells the server to execute the operation locally.
func (x *prmObjectRead) MarkLocal() {
x.meta.SetTTL(1)
}
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(prm.XHeaders, meta)
// WithinSession specifies session within which object should be read.
//
// Creator of the session acquires the authorship of the request.
// This may affect the execution of an operation (e.g. access control).
//
// Must be signed.
func (x *prmObjectRead) WithinSession(t session.Object) {
var tokv2 v2session.Token
t.WriteToV2(&tokv2)
x.meta.SetSessionToken(&tokv2)
}
if prm.BearerToken != nil {
v2BearerToken := new(acl.BearerToken)
prm.BearerToken.WriteToV2(v2BearerToken)
meta.SetBearerToken(v2BearerToken)
}
// WithBearerToken attaches bearer token to be used for the operation.
//
// If set, underlying eACL rules will be used in access control.
//
// Must be signed.
func (x *prmObjectRead) WithBearerToken(t bearer.Token) {
var v2token acl.BearerToken
t.WriteToV2(&v2token)
x.meta.SetBearerToken(&v2token)
}
if prm.Session != nil {
v2SessionToken := new(v2session.Token)
prm.Session.WriteToV2(v2SessionToken)
meta.SetSessionToken(v2SessionToken)
}
// FromContainer specifies FrostFS container of the object.
// Required parameter.
func (x *prmObjectRead) FromContainer(id cid.ID) {
var cnrV2 v2refs.ContainerID
id.WriteToV2(&cnrV2)
x.addr.SetContainerID(&cnrV2)
}
if prm.Local {
meta.SetTTL(1)
}
// ByID specifies identifier of the requested object.
// Required parameter.
func (x *prmObjectRead) ByID(id oid.ID) {
var objV2 v2refs.ObjectID
id.WriteToV2(&objV2)
x.addr.SetObjectID(&objV2)
}
addr := new(v2refs.Address)
// PrmObjectGet groups parameters of ObjectGetInit operation.
type PrmObjectGet struct {
prmObjectRead
cnrV2 := new(v2refs.ContainerID)
prm.ContainerID.WriteToV2(cnrV2)
addr.SetContainerID(cnrV2)
objV2 := new(v2refs.ObjectID)
prm.ObjectID.WriteToV2(objV2)
addr.SetObjectID(objV2)
body := new(v2object.GetRequestBody)
body.SetRaw(prm.Raw)
body.SetAddress(addr)
req := new(v2object.GetRequest)
req.SetBody(body)
c.prepareRequest(req, meta)
return req, nil
key *ecdsa.PrivateKey
}
// ResObjectGet groups the final result values of ObjectGetInit operation.
@ -121,10 +122,8 @@ type ObjectReader struct {
// UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used.
//
// Deprecated: Use PrmObjectGet.Key instead.
func (prm *PrmObjectGet) UseKey(key ecdsa.PrivateKey) {
prm.Key = &key
func (x *PrmObjectGet) UseKey(key ecdsa.PrivateKey) {
x.key = &key
}
// ReadHeader reads header of the object. Result means success.
@ -300,24 +299,39 @@ func (x *ObjectReader) Read(p []byte) (int, error) {
// Returns an error if parameters are set incorrectly (see PrmObjectGet docs).
// Context is required and must not be nil. It is used for network communication.
func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectReader, error) {
req, err := prm.buildRequest(c)
if err != nil {
return nil, err
// check parameters
switch {
case prm.addr.GetContainerID() == nil:
return nil, errorMissingContainer
case prm.addr.GetObjectID() == nil:
return nil, errorMissingObject
}
key := prm.Key
// form request body
var body v2object.GetRequestBody
body.SetRaw(prm.raw)
body.SetAddress(&prm.addr)
// form request
var req v2object.GetRequest
req.SetBody(&body)
c.prepareRequest(&req, &prm.meta)
key := prm.key
if key == nil {
key = &c.prm.key
}
err = signature.SignServiceMessage(key, req)
err := signature.SignServiceMessage(key, &req)
if err != nil {
return nil, fmt.Errorf("sign request: %w", err)
}
ctx, cancel := context.WithCancel(ctx)
stream, err := rpcapi.GetObject(&c.c, req, client.WithContext(ctx))
stream, err := rpcapi.GetObject(&c.c, &req, client.WithContext(ctx))
if err != nil {
cancel()
return nil, fmt.Errorf("open stream: %w", err)
@ -333,29 +347,17 @@ func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectRe
// PrmObjectHead groups parameters of ObjectHead operation.
type PrmObjectHead struct {
XHeaders []string
prmObjectRead
BearerToken *bearer.Token
Session *session.Object
Raw bool
Local bool
ContainerID *cid.ID
ObjectID *oid.ID
Key *ecdsa.PrivateKey
keySet bool
key ecdsa.PrivateKey
}
// UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used.
//
// Deprecated: Use PrmObjectHead.Key instead.
func (prm *PrmObjectHead) UseKey(key ecdsa.PrivateKey) {
prm.Key = &key
func (x *PrmObjectHead) UseKey(key ecdsa.PrivateKey) {
x.keySet = true
x.key = key
}
// ResObjectHead groups resulting values of ObjectHead operation.
@ -388,58 +390,6 @@ func (x *ResObjectHead) ReadHeader(dst *object.Object) bool {
return true
}
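Usage follows the same pattern as the other reads; `ReadHeader` returning false signals a malformed or missing header in the response. A fragment, assuming `c`, `ctx`, `cnr`, and `id` as in the earlier sketches:

```go
var prm client.PrmObjectHead
prm.FromContainer(cnr) // required
prm.ByID(id)           // required

res, err := c.ObjectHead(ctx, prm)
if err != nil {
	return err
}
var hdr object.Object
if !res.ReadHeader(&hdr) {
	return errors.New("malformed object header in response")
}
```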
func (prm *PrmObjectHead) buildRequest(c *Client) (*v2object.HeadRequest, error) {
if prm.ContainerID == nil {
return nil, errorMissingContainer
}
if prm.ObjectID == nil {
return nil, errorMissingObject
}
if len(prm.XHeaders)%2 != 0 {
return nil, errorInvalidXHeaders
}
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(prm.XHeaders, meta)
if prm.BearerToken != nil {
v2BearerToken := new(acl.BearerToken)
prm.BearerToken.WriteToV2(v2BearerToken)
meta.SetBearerToken(v2BearerToken)
}
if prm.Session != nil {
v2SessionToken := new(v2session.Token)
prm.Session.WriteToV2(v2SessionToken)
meta.SetSessionToken(v2SessionToken)
}
if prm.Local {
meta.SetTTL(1)
}
addr := new(v2refs.Address)
cnrV2 := new(v2refs.ContainerID)
prm.ContainerID.WriteToV2(cnrV2)
addr.SetContainerID(cnrV2)
objV2 := new(v2refs.ObjectID)
prm.ObjectID.WriteToV2(objV2)
addr.SetObjectID(objV2)
body := new(v2object.HeadRequestBody)
body.SetRaw(prm.Raw)
body.SetAddress(addr)
req := new(v2object.HeadRequest)
req.SetBody(body)
c.prepareRequest(req, meta)
return req, nil
}
// ObjectHead reads object header through a remote server using FrostFS API protocol.
//
// Exactly one return value is non-nil. By default, server status is returned in res structure.
@ -463,24 +413,33 @@ func (prm *PrmObjectHead) buildRequest(c *Client) (*v2object.HeadRequest, error)
// - *apistatus.ObjectAlreadyRemoved;
// - *apistatus.SessionTokenExpired.
func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectHead, error) {
req, err := prm.buildRequest(c)
if err != nil {
return nil, err
switch {
case prm.addr.GetContainerID() == nil:
return nil, errorMissingContainer
case prm.addr.GetObjectID() == nil:
return nil, errorMissingObject
}
var body v2object.HeadRequestBody
body.SetRaw(prm.raw)
body.SetAddress(&prm.addr)
var req v2object.HeadRequest
req.SetBody(&body)
c.prepareRequest(&req, &prm.meta)
key := c.prm.key
if prm.Key != nil {
key = *prm.Key
if prm.keySet {
key = prm.key
}
// sign the request
err = signature.SignServiceMessage(&key, req)
err := signature.SignServiceMessage(&key, &req)
if err != nil {
return nil, fmt.Errorf("sign request: %w", err)
}
resp, err := rpcapi.HeadObject(&c.c, req, client.WithContext(ctx))
resp, err := rpcapi.HeadObject(&c.c, &req, client.WithContext(ctx))
if err != nil {
return nil, fmt.Errorf("write request: %w", err)
}
@ -495,7 +454,7 @@ func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectH
return &res, nil
}
res.idObj = *prm.ObjectID
_ = res.idObj.ReadFromV2(*prm.addr.GetObjectID())
switch v := resp.GetBody().GetHeaderPart().(type) {
default:
@ -511,95 +470,29 @@ func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectH
// PrmObjectRange groups parameters of ObjectRange operation.
type PrmObjectRange struct {
XHeaders []string
prmObjectRead
BearerToken *bearer.Token
key *ecdsa.PrivateKey
Session *session.Object
Raw bool
Local bool
ContainerID *cid.ID
ObjectID *oid.ID
Key *ecdsa.PrivateKey
Offset uint64
Length uint64
rng v2object.Range
}
func (prm *PrmObjectRange) buildRequest(c *Client) (*v2object.GetRangeRequest, error) {
if prm.Length == 0 {
return nil, errorZeroRangeLength
}
// SetOffset sets offset of the payload range to be read.
// Zero by default.
func (x *PrmObjectRange) SetOffset(off uint64) {
x.rng.SetOffset(off)
}
if prm.ContainerID == nil {
return nil, errorMissingContainer
}
if prm.ObjectID == nil {
return nil, errorMissingObject
}
if len(prm.XHeaders)%2 != 0 {
return nil, errorInvalidXHeaders
}
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(prm.XHeaders, meta)
if prm.BearerToken != nil {
v2BearerToken := new(acl.BearerToken)
prm.BearerToken.WriteToV2(v2BearerToken)
meta.SetBearerToken(v2BearerToken)
}
if prm.Session != nil {
v2SessionToken := new(v2session.Token)
prm.Session.WriteToV2(v2SessionToken)
meta.SetSessionToken(v2SessionToken)
}
if prm.Local {
meta.SetTTL(1)
}
addr := new(v2refs.Address)
cnrV2 := new(v2refs.ContainerID)
prm.ContainerID.WriteToV2(cnrV2)
addr.SetContainerID(cnrV2)
objV2 := new(v2refs.ObjectID)
prm.ObjectID.WriteToV2(objV2)
addr.SetObjectID(objV2)
rng := new(v2object.Range)
rng.SetLength(prm.Length)
rng.SetOffset(prm.Offset)
body := new(v2object.GetRangeRequestBody)
body.SetRaw(prm.Raw)
body.SetAddress(addr)
body.SetRange(rng)
req := new(v2object.GetRangeRequest)
req.SetBody(body)
c.prepareRequest(req, meta)
return req, nil
// SetLength sets length of the payload range to be read.
// Must be positive.
func (x *PrmObjectRange) SetLength(ln uint64) {
x.rng.SetLength(ln)
}
// UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used.
//
// Deprecated: Use PrmObjectRange.Key instead.
func (prm *PrmObjectRange) UseKey(key ecdsa.PrivateKey) {
prm.Key = &key
func (x *PrmObjectRange) UseKey(key ecdsa.PrivateKey) {
x.key = &key
}
// ResObjectRange groups the final result values of ObjectRange operation.
@ -769,31 +662,49 @@ func (x *ObjectRangeReader) Read(p []byte) (int, error) {
// Returns an error if parameters are set incorrectly (see PrmObjectRange docs).
// Context is required and must not be nil. It is used for network communication.
func (c *Client) ObjectRangeInit(ctx context.Context, prm PrmObjectRange) (*ObjectRangeReader, error) {
req, err := prm.buildRequest(c)
if err != nil {
return nil, err
// check parameters
switch {
case prm.addr.GetContainerID() == nil:
return nil, errorMissingContainer
case prm.addr.GetObjectID() == nil:
return nil, errorMissingObject
case prm.rng.GetLength() == 0:
return nil, errorZeroRangeLength
}
key := prm.Key
// form request body
var body v2object.GetRangeRequestBody
body.SetRaw(prm.raw)
body.SetAddress(&prm.addr)
body.SetRange(&prm.rng)
// form request
var req v2object.GetRangeRequest
req.SetBody(&body)
c.prepareRequest(&req, &prm.meta)
key := prm.key
if key == nil {
key = &c.prm.key
}
err = signature.SignServiceMessage(key, req)
err := signature.SignServiceMessage(key, &req)
if err != nil {
return nil, fmt.Errorf("sign request: %w", err)
}
ctx, cancel := context.WithCancel(ctx)
stream, err := rpcapi.GetObjectRange(&c.c, req, client.WithContext(ctx))
stream, err := rpcapi.GetObjectRange(&c.c, &req, client.WithContext(ctx))
if err != nil {
cancel()
return nil, fmt.Errorf("open stream: %w", err)
}
var r ObjectRangeReader
r.remainingPayloadLen = int(prm.Length)
r.remainingPayloadLen = int(prm.rng.GetLength())
r.cancelCtxStream = cancel
r.stream = stream
r.client = c
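Range reads mirror the GET flow, with the extra offset/length pair; zero length is rejected up front. A fragment with `c`, `ctx`, `cnr`, `id` assumed as before:

```go
var prm client.PrmObjectRange
prm.FromContainer(cnr)
prm.ByID(id)
prm.SetOffset(4096)
prm.SetLength(1024) // must be positive, otherwise errorZeroRangeLength

rdr, err := c.ObjectRangeInit(ctx, prm)
if err != nil {
	return err
}
chunk, err := io.ReadAll(rdr) // exactly the requested 1024 bytes on success
```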

View file

@ -13,53 +13,121 @@ import (
v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
)
// PrmObjectHash groups parameters of ObjectHash operation.
type PrmObjectHash struct {
XHeaders []string
meta v2session.RequestMetaHeader
BearerToken *bearer.Token
body v2object.GetRangeHashRequestBody
Session *session.Object
csAlgo v2refs.ChecksumType
Local bool
addr v2refs.Address
Ranges []object.Range
Salt []byte
ChecksumType checksum.Type
ContainerID *cid.ID
ObjectID *oid.ID
Key *ecdsa.PrivateKey
keySet bool
key ecdsa.PrivateKey
}
// UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used.
func (x *PrmObjectHash) UseKey(key ecdsa.PrivateKey) {
x.keySet = true
x.key = key
}
// MarkLocal tells the server to execute the operation locally.
func (x *PrmObjectHash) MarkLocal() {
x.meta.SetTTL(1)
}
// WithinSession specifies session within which object should be read.
//
// Deprecated: Use PrmObjectHash.Key instead.
func (prm *PrmObjectHash) UseKey(key ecdsa.PrivateKey) {
prm.Key = &key
// Creator of the session acquires the authorship of the request.
// This may affect the execution of an operation (e.g. access control).
//
// Must be signed.
func (x *PrmObjectHash) WithinSession(t session.Object) {
var tv2 v2session.Token
t.WriteToV2(&tv2)
x.meta.SetSessionToken(&tv2)
}
// WithBearerToken attaches bearer token to be used for the operation.
//
// If set, underlying eACL rules will be used in access control.
//
// Must be signed.
func (x *PrmObjectHash) WithBearerToken(t bearer.Token) {
var v2token acl.BearerToken
t.WriteToV2(&v2token)
x.meta.SetBearerToken(&v2token)
}
// FromContainer specifies FrostFS container of the object.
// Required parameter.
func (x *PrmObjectHash) FromContainer(id cid.ID) {
var cidV2 v2refs.ContainerID
id.WriteToV2(&cidV2)
x.addr.SetContainerID(&cidV2)
}
// ByID specifies identifier of the requested object.
// Required parameter.
func (x *PrmObjectHash) ByID(id oid.ID) {
var idV2 v2refs.ObjectID
id.WriteToV2(&idV2)
x.addr.SetObjectID(&idV2)
}
// SetRangeList sets list of ranges in (offset, length) pair format.
// Required parameter.
//
// If passed as slice, then it must not be mutated before the operation completes.
func (x *PrmObjectHash) SetRangeList(r ...uint64) {
ln := len(r)
if ln%2 != 0 {
panic("odd number of range parameters")
}
rs := make([]v2object.Range, ln/2)
for i := 0; i < ln/2; i++ {
rs[i].SetOffset(r[2*i])
rs[i].SetLength(r[2*i+1])
}
x.body.SetRanges(rs)
}
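So a caller hashes two payload ranges like this (a fragment; the flat varargs are (offset, length) pairs, and an odd count panics):

```go
var prm client.PrmObjectHash
prm.FromContainer(cnr)
prm.ByID(id)
prm.SetRangeList(0, 512, 4096, 256) // bytes [0,512) and [4096,4352)
prm.TillichZemorAlgo()              // optional; SHA256 is the default

res, err := c.ObjectHash(ctx, prm)
if err != nil {
	return err
}
sums := res.Checksums() // one digest per requested range, in order
```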
// TillichZemorAlgo changes the hash function to Tillich-Zemor
// (https://link.springer.com/content/pdf/10.1007/3-540-48658-5_5.pdf).
//
// By default, SHA256 hash function is used/.
// By default, SHA256 hash function is used.
func (x *PrmObjectHash) TillichZemorAlgo() {
x.csAlgo = v2refs.TillichZemor
}
// UseSalt sets the salt to XOR the data range before hashing.
//
// Deprecated: Use PrmObjectHash.ChecksumType instead.
func (prm *PrmObjectHash) TillichZemorAlgo() {
prm.ChecksumType = checksum.TZ
// Must not be mutated before the operation completes.
func (x *PrmObjectHash) UseSalt(salt []byte) {
x.body.SetSalt(salt)
}
// WithXHeaders specifies list of extended headers (string key-value pairs)
// to be attached to the request. Must have an even length.
//
// Slice must not be mutated until the operation completes.
func (x *PrmObjectHash) WithXHeaders(hs ...string) {
writeXHeadersToMeta(hs, &x.meta)
}
// ResObjectHash groups resulting values of ObjectHash operation.
@ -74,76 +142,6 @@ func (x ResObjectHash) Checksums() [][]byte {
return x.checksums
}
func (prm *PrmObjectHash) buildRequest(c *Client) (*v2object.GetRangeHashRequest, error) {
if prm.ContainerID == nil {
return nil, errorMissingContainer
}
if prm.ObjectID == nil {
return nil, errorMissingObject
}
if len(prm.XHeaders)%2 != 0 {
return nil, errorInvalidXHeaders
}
if len(prm.Ranges) == 0 {
return nil, errorMissingRanges
}
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(prm.XHeaders, meta)
if prm.BearerToken != nil {
v2BearerToken := new(acl.BearerToken)
prm.BearerToken.WriteToV2(v2BearerToken)
meta.SetBearerToken(v2BearerToken)
}
if prm.Session != nil {
v2SessionToken := new(v2session.Token)
prm.Session.WriteToV2(v2SessionToken)
meta.SetSessionToken(v2SessionToken)
}
if prm.Local {
meta.SetTTL(1)
}
addr := new(v2refs.Address)
cnrV2 := new(v2refs.ContainerID)
prm.ContainerID.WriteToV2(cnrV2)
addr.SetContainerID(cnrV2)
objV2 := new(v2refs.ObjectID)
prm.ObjectID.WriteToV2(objV2)
addr.SetObjectID(objV2)
rs := make([]v2object.Range, len(prm.Ranges))
for i := range prm.Ranges {
rs[i].SetOffset(prm.Ranges[i].GetOffset())
rs[i].SetLength(prm.Ranges[i].GetLength())
}
body := new(v2object.GetRangeHashRequestBody)
body.SetAddress(addr)
body.SetRanges(rs)
body.SetSalt(prm.Salt)
if prm.ChecksumType == checksum.Unknown {
body.SetType(v2refs.SHA256)
} else {
body.SetType(v2refs.ChecksumType(prm.ChecksumType))
}
req := new(v2object.GetRangeHashRequest)
req.SetBody(body)
c.prepareRequest(req, meta)
return req, nil
}
// ObjectHash requests checksum of the range list of the object payload using
// FrostFS API protocol.
//
@ -167,22 +165,37 @@ func (prm *PrmObjectHash) buildRequest(c *Client) (*v2object.GetRangeHashRequest
// - *apistatus.ObjectOutOfRange;
// - *apistatus.SessionTokenExpired.
func (c *Client) ObjectHash(ctx context.Context, prm PrmObjectHash) (*ResObjectHash, error) {
req, err := prm.buildRequest(c)
if err != nil {
return nil, err
switch {
case prm.addr.GetContainerID() == nil:
return nil, errorMissingContainer
case prm.addr.GetObjectID() == nil:
return nil, errorMissingObject
case len(prm.body.GetRanges()) == 0:
return nil, errorMissingRanges
}
prm.body.SetAddress(&prm.addr)
if prm.csAlgo == v2refs.UnknownChecksum {
prm.body.SetType(v2refs.SHA256)
} else {
prm.body.SetType(prm.csAlgo)
}
var req v2object.GetRangeHashRequest
c.prepareRequest(&req, &prm.meta)
req.SetBody(&prm.body)
key := c.prm.key
if prm.Key != nil {
key = *prm.Key
if prm.keySet {
key = prm.key
}
err = signature.SignServiceMessage(&key, req)
err := signature.SignServiceMessage(&key, &req)
if err != nil {
return nil, fmt.Errorf("sign request: %w", err)
}
resp, err := rpcapi.HashObjectRange(&c.c, req, client.WithContext(ctx))
resp, err := rpcapi.HashObjectRange(&c.c, &req, client.WithContext(ctx))
if err != nil {
return nil, fmt.Errorf("write request: %w", err)
}

View file

@ -16,32 +16,30 @@ import (
// PrmSessionCreate groups parameters of SessionCreate operation.
type PrmSessionCreate struct {
XHeaders []string
prmCommonMeta
Expiration uint64
exp uint64
Key *ecdsa.PrivateKey
keySet bool
key ecdsa.PrivateKey
}
// SetExp sets the number of the last FrostFS epoch in the lifetime of the session after which it will be expired.
//
// Deprecated: Use PrmSessionCreate.Expiration instead.
func (x *PrmSessionCreate) SetExp(exp uint64) {
x.Expiration = exp
x.exp = exp
}
// UseKey specifies private key to sign the requests and compute token owner.
// If key is not provided, then Client default key is used.
//
// Deprecated: Use PrmSessionCreate.Key instead.
func (x *PrmSessionCreate) UseKey(key ecdsa.PrivateKey) {
x.Key = &key
x.keySet = true
x.key = key
}
func (x *PrmSessionCreate) buildRequest(c *Client) (*v2session.CreateRequest, error) {
ownerKey := c.prm.key.PublicKey
if x.Key != nil {
ownerKey = x.Key.PublicKey
if x.keySet {
ownerKey = x.key.PublicKey
}
var ownerID user.ID
user.IDFromKey(&ownerID, ownerKey)
@ -51,10 +49,10 @@ func (x *PrmSessionCreate) buildRequest(c *Client) (*v2session.CreateRequest, er
reqBody := new(v2session.CreateRequestBody)
reqBody.SetOwnerID(&ownerIDV2)
reqBody.SetExpiration(x.Expiration)
reqBody.SetExpiration(x.exp)
var meta v2session.RequestMetaHeader
writeXHeadersToMeta(x.XHeaders, &meta)
writeXHeadersToMeta(x.xHeaders, &meta)
var req v2session.CreateRequest
req.SetBody(reqBody)
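A usage sketch, assuming `ResSessionCreate` exposes `ID` and `PublicKey` as in this SDK family (`currentEpoch` is a placeholder):

```go
var prm client.PrmSessionCreate
prm.SetExp(currentEpoch + 10) // last epoch in which the session is valid

res, err := c.SessionCreate(ctx, prm)
if err != nil {
	return err
}
id, key := res.ID(), res.PublicKey() // material for assembling a session token
```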

View file

@ -1,8 +1,8 @@
package cid_test
import (
"crypto/rand"
"crypto/sha256"
"math/rand"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"

View file

@ -1,8 +1,8 @@
package cidtest
import (
"crypto/rand"
"crypto/sha256"
"math/rand"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)
@ -11,7 +11,7 @@ import (
func ID() cid.ID {
checksum := [sha256.Size]byte{}
_, _ = rand.Read(checksum[:])
rand.Read(checksum[:])
return IDWithChecksum(checksum)
}

View file

@ -1,7 +1,7 @@
package frostfscrypto_test
import (
"crypto/rand"
"math/rand"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"

Diffs suppressed for five image files because one or more lines are too long (sizes unchanged before and after: 12 KiB, 23 KiB, 8.8 KiB, 12 KiB, 11 KiB).

View file

@ -101,16 +101,16 @@ In a nutshell, a `SELECT` takes a filter result as input and outputs a specific
Let's see some examples:
```sql
-- Selects exactly one node from the entire netmap
SELECT 1 FROM *
SELECT 1 FROM *
-- Same as above, but with an identifier for the selection
SELECT 1 FROM * AS ONE
SELECT 1 FROM * AS ONE
-- Selects two nodes from the RedOrBlueNodes filter, such that both selected nodes
-- Selects two nodes from the RedOrBlueNodes filter, such that both selected nodes
-- share the same value for the Color attribute, i.e. both red or both blue.
SELECT 2 IN SAME Color FROM RedOrBlueNodes
SELECT 2 IN SAME Color FROM RedOrBlueNodes
-- Selects two nodes from the RedOrBlueNodes filter, such that the selected nodes
-- Selects two nodes from the RedOrBlueNodes filter, such that the selected nodes
-- have distinct values for the Color attribute, i.e. one red and one blue.
-- The selection is also given an identifier.
SELECT 2 IN DISTINCT Color FROM RedOrBlueNodes AS MyNodes
@ -131,8 +131,7 @@ Its basic syntax is as follows:
REP <count> {IN <select>}
```
If a select is not specified, then the entire netmap is used as input. The only exception to this rule is when exactly 1 replica and 1 selector are present: in this case the only selector is used instead of the whole netmap.
The resulting nodes will be used to actually store objects and they constitute a replica group (or simply, "a replica").
If a select is not specified, then the entire netmap is used as input. The resulting nodes will be used to actually store objects and they constitute a replica group (or simply, "a replica").
Examples
```sql
@ -174,18 +173,18 @@ In addition to this basic syntax, there are a couple of additional useful opti
### The policy playground
> This section assumes you have an up-to-date version of the `frostfs-cli`.
> This section assumes you have an up-to-date version of the `frostfs-cli`.
While simple placement policies have predictable results that can be understood at a glance, more complex ones need careful consideration before deployment. To simplify understanding a policy's outcome and experimenting while learning, a builtin tool is provided as part of the `frostfs-cli`: the policy playground.
For the remainder of this guide, we will use the policy playground to setup a virtual netmap (that is, one that doesn't require any networking or deployment) and test various policies. In order to visualize this netmap easily, each node will have three attributes: a character, a shape and a color
For the remainder of this guide, we will use the policy playground to setup a virtual netmap (that is, one that doesn't require any networking or deployment) and test various policies. In order to visualize this netmap easily, each node will have three attributes: a character, a shape and a color
![Sample Netmap](./image/sample_netmap.svg)
We can start the policy playground as follows:
```sh
$ frostfs-cli container policy-playground
>
>
```
Since we didn't pass any endpoint, the initial netmap is empty, which we can verify with the `ls` command (to list the nodes in the netmap):
@ -401,7 +400,7 @@ FILTER Color EQ 'Green' AS GreenNodes
#### Example #6
```sql
REP 1 IN MyNodes
REP 2
REP 2
CBF 2
SELECT 1 FROM CuteNodes AS MyNodes
FILTER (Color EQ 'Blue') AND NOT (Shape EQ 'Circle' OR Shape EQ 'Square') AS CuteNodes
@ -443,4 +442,4 @@ Others:
- `ls`: list nodes in the current netmap and their attributes
- `add`: add a node to the current netmap. If it already exists, it will be overwritten.
- `remove`: remove a node from the current netmap.
- `eval`: evaluate a placement policy on the current netmap.
- `eval`: evaluate a placement policy on the current netmap.

View file

@ -2,7 +2,7 @@ package eacltest
import (
"bytes"
"crypto/rand"
"math/rand"
"testing"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"

View file

@ -1,7 +1,7 @@
package eacl
import (
"crypto/rand"
"math/rand"
"testing"
"github.com/stretchr/testify/require"

View file

@ -23,8 +23,7 @@ type (
}
minAgg struct {
min float64
minFound bool
min float64
}
meanIQRAgg struct {
@ -103,13 +102,7 @@ func (a *meanAgg) Compute() float64 {
}
func (a *minAgg) Add(n float64) {
if !a.minFound {
a.min = n
a.minFound = true
return
}
if n < a.min {
if a.min == 0 || n < a.min {
a.min = n
}
}
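The two variants diverge when the true minimum is zero: the zero-value sentinel treats `min == 0` as "unset" and lets a later positive sample overwrite it, which the removed `minFound` flag (and the test file deleted below) guarded against. A self-contained check:

```go
package main

import "fmt"

type minAgg struct{ min float64 }

func (a *minAgg) Add(n float64) {
	if a.min == 0 || n < a.min {
		a.min = n
	}
}

func main() {
	a := &minAgg{}
	for _, v := range []float64{0, 1, 2} {
		a.Add(v)
	}
	fmt.Println(a.min) // prints 1, although the true minimum is 0
}
```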

View file

@ -1,44 +0,0 @@
package netmap
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestMinAgg(t *testing.T) {
tests := []struct {
vals []float64
res float64
}{
{
vals: []float64{1, 2, 3, 0, 10},
res: 0,
},
{
vals: []float64{10, 0, 10, 0},
res: 0,
},
{
vals: []float64{0, 1, 2, 3, 10},
res: 0,
},
{
vals: []float64{0, 0, 0, 0},
res: 0,
},
{
vals: []float64{10, 10, 10, 10},
res: 10,
},
}
for _, test := range tests {
a := newMinAgg()
for _, val := range test.vals {
a.Add(val)
}
require.Equal(t, test.res, a.Compute())
}
}

View file

@ -40,10 +40,6 @@ type context struct {
// policy uses the UNIQUE flag. Nodes marked as used are not used in subsequent
// base selections.
usedNodes map[uint64]bool
// If true, returns an error when netmap does not contain enough nodes for selection.
// By default best effort is taken.
strict bool
}
// Various validation errors.

View file

@ -2,7 +2,6 @@ package netmap
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"testing"
@ -48,8 +47,7 @@ func TestPlacementPolicy_Interopability(t *testing.T) {
require.NoError(t, err)
for i := range ds {
filename := filepath.Join(testsDir, ds[i].Name())
bs, err := os.ReadFile(filename)
bs, err := os.ReadFile(filepath.Join(testsDir, ds[i].Name()))
require.NoError(t, err)
var tc TestCase
@ -58,7 +56,7 @@ func TestPlacementPolicy_Interopability(t *testing.T) {
srcNodes := make([]NodeInfo, len(tc.Nodes))
copy(srcNodes, tc.Nodes)
t.Run(fmt.Sprintf("%s:%s", filename, tc.Name), func(t *testing.T) {
t.Run(tc.Name, func(t *testing.T) {
var nm NetMap
nm.SetNodes(tc.Nodes)

View file

@ -146,19 +146,19 @@
"select 3 nodes in 3 distinct countries, same placement": {
"policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":1,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]},
"pivot": "Y29udGFpbmVySUQ=",
"result": [[0, 2, 3]],
"result": [[4, 0, 7]],
"placement": {
"pivot": "b2JqZWN0SUQ=",
"result": [[0, 2, 3]]
"result": [[4, 0, 7]]
}
},
"select 6 nodes in 3 distinct countries, different placement": {
"policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":2,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]},
"pivot": "Y29udGFpbmVySUQ=",
"result": [[0, 1, 2, 6, 3, 4]],
"result": [[4, 3, 0, 1, 7, 2]],
"placement": {
"pivot": "b2JqZWN0SUQ=",
"result": [[0, 1, 2, 6, 3, 4]]
"result": [[4, 3, 0, 7, 2, 1]]
}
}
}

View file

@ -1,95 +0,0 @@
{
"name": "non-strict selections",
"comment": "These test specify loose selection behaviour, to allow fetching already PUT objects even when there is not enough nodes to select from.",
"nodes": [
{
"attributes": [
{
"key": "Country",
"value": "Russia"
}
]
},
{
"attributes": [
{
"key": "Country",
"value": "Germany"
}
]
},
{
"attributes": [ ]
}
],
"tests": {
"not enough nodes (backup factor)": {
"policy": {
"replicas": [
{
"count": 1,
"selector": "MyStore"
}
],
"containerBackupFactor": 2,
"selectors": [
{
"name": "MyStore",
"count": 2,
"clause": "DISTINCT",
"attribute": "Country",
"filter": "FromRU"
}
],
"filters": [
{
"name": "FromRU",
"key": "Country",
"op": "EQ",
"value": "Russia",
"filters": [ ]
}
]
},
"result": [
[
0
]
]
},
"not enough nodes (buckets)": {
"policy": {
"replicas": [
{
"count": 1,
"selector": "MyStore"
}
],
"containerBackupFactor": 1,
"selectors": [
{
"name": "MyStore",
"count": 2,
"clause": "DISTINCT",
"attribute": "Country",
"filter": "FromRU"
}
],
"filters": [
{
"name": "FromRU",
"key": "Country",
"op": "EQ",
"value": "Russia",
"filters": [ ]
}
]
},
"result": [
[
0
]
]
}
}
}

View file

@ -104,14 +104,7 @@
"selectors": [],
"filters": []
},
"result": [
[
0,
1,
2,
3
]
]
"error": "not enough nodes"
}
}
}

View file

@ -24,12 +24,7 @@
"tests": {
"missing filter": {
"policy": {
"replicas": [
{
"count": 1,
"selector": "MyStore"
}
],
"replicas": [],
"containerBackupFactor": 1,
"selectors": [
{
@ -52,14 +47,9 @@
},
"error": "filter not found"
},
"not enough nodes (filter results in empty set)": {
"not enough nodes (backup factor)": {
"policy": {
"replicas": [
{
"count": 1,
"selector": "MyStore"
}
],
"replicas": [],
"containerBackupFactor": 2,
"selectors": [
{
@ -67,15 +57,40 @@
"count": 2,
"clause": "DISTINCT",
"attribute": "Country",
"filter": "FromMoon"
"filter": "FromRU"
}
],
"filters": [
{
"name": "FromMoon",
"name": "FromRU",
"key": "Country",
"op": "EQ",
"value": "Moon",
"value": "Russia",
"filters": []
}
]
},
"error": "not enough nodes"
},
"not enough nodes (buckets)": {
"policy": {
"replicas": [],
"containerBackupFactor": 1,
"selectors": [
{
"name": "MyStore",
"count": 2,
"clause": "DISTINCT",
"attribute": "Country",
"filter": "FromRU"
}
],
"filters": [
{
"name": "FromRU",
"key": "Country",
"op": "EQ",
"value": "Russia",
"filters": []
}
]

View file

@ -238,7 +238,7 @@ func (m NetMap) ContainerNodes(p PlacementPolicy, pivot []byte) ([][]NodeInfo, e
// marked as used by earlier replicas.
for i := range p.replicas {
sName := p.replicas[i].GetSelector()
if sName == "" && !(len(p.replicas) == 1 && len(p.selectors) == 1) {
if sName == "" {
var s netmap.Selector
s.SetCount(p.replicas[i].GetCount())
s.SetFilter(mainFilterName)
@ -258,9 +258,6 @@ func (m NetMap) ContainerNodes(p PlacementPolicy, pivot []byte) ([][]NodeInfo, e
}
if p.unique {
if c.processedSelectors[sName] == nil {
return nil, fmt.Errorf("selector not found: '%s'", sName)
}
nodes, err := c.getSelection(*c.processedSelectors[sName])
if err != nil {
return nil, err

View file

@ -551,7 +551,7 @@ func (p *PlacementPolicy) DecodeString(s string) error {
return errors.New("parsed nil value")
}
if err := validatePolicy(*parsed); err != nil {
if err := validatePolicy(*p); err != nil {
return fmt.Errorf("invalid policy: %w", err)
}
@ -605,13 +605,6 @@ var (
errUnknownSelector = errors.New("policy: selector not found")
// errSyntaxError is returned for errors found by ANTLR parser.
errSyntaxError = errors.New("policy: syntax error")
// errRedundantSelector is returned for errors found by selectors policy validator.
errRedundantSelector = errors.New("policy: found redundant selector")
// errUnnamedSelector is returned for errors found by selectors policy validator.
errUnnamedSelector = errors.New("policy: unnamed selectors are useless, " +
"make sure to pair REP and SELECT clauses: \"REP .. IN X\" + \"SELECT ... AS X\"")
// errRedundantSelector is returned for errors found by filters policy validator.
errRedundantFilter = errors.New("policy: found redundant filter")
)
type policyVisitor struct {
@ -852,52 +845,25 @@ func (p *policyVisitor) VisitExpr(ctx *parser.ExprContext) any {
// validatePolicy checks high-level constraints such as filter link in SELECT
// being actually defined in FILTER section.
func validatePolicy(p PlacementPolicy) error {
canOmitNames := len(p.selectors) == 1 && len(p.replicas) == 1
seenFilters := map[string]bool{}
expectedFilters := map[string]struct{}{}
for i := range p.filters {
seenFilters[p.filters[i].GetName()] = true
for _, f := range p.filters[i].GetFilters() {
if f.GetName() != "" {
expectedFilters[f.GetName()] = struct{}{}
}
}
}
seenSelectors := map[string]*netmap.Selector{}
seenSelectors := map[string]bool{}
for i := range p.selectors {
if p.selectors[i].GetName() == "" && !canOmitNames {
return errUnnamedSelector
if flt := p.selectors[i].GetFilter(); flt != mainFilterName && !seenFilters[flt] {
return fmt.Errorf("%w: '%s'", errUnknownFilter, flt)
}
if flt := p.selectors[i].GetFilter(); flt != mainFilterName {
expectedFilters[flt] = struct{}{}
if !seenFilters[flt] {
return fmt.Errorf("%w: '%s'", errUnknownFilter, flt)
}
}
seenSelectors[p.selectors[i].GetName()] = &p.selectors[i]
seenSelectors[p.selectors[i].GetName()] = true
}
for _, f := range p.filters {
if _, ok := expectedFilters[f.GetName()]; !ok {
return fmt.Errorf("%w: '%s'", errRedundantFilter, f.GetName())
}
}
expectedSelectors := map[string]struct{}{}
for i := range p.replicas {
selName := p.replicas[i].GetSelector()
if selName != "" || canOmitNames {
expectedSelectors[selName] = struct{}{}
if seenSelectors[selName] == nil {
return fmt.Errorf("%w: '%s'", errUnknownSelector, selName)
}
}
}
for _, s := range p.selectors {
if _, ok := expectedSelectors[s.GetName()]; !ok {
return fmt.Errorf("%w: to use selector '%s' use keyword IN", errRedundantSelector, s.GetName())
if sel := p.replicas[i].GetSelector(); sel != "" && !seenSelectors[sel] {
return fmt.Errorf("%w: '%s'", errUnknownSelector, sel)
}
}

View file

@ -1,65 +0,0 @@
package netmap
import (
"strings"
"testing"
"github.com/stretchr/testify/require"
)
func TestDecodeString(t *testing.T) {
testCases := []string{
`REP 2
CBF 2
SELECT 2 FROM *`,
`REP 1 IN X
CBF 1
SELECT 2 IN SAME Location FROM * AS X`,
`REP 1 IN X
REP 2 IN Y
CBF 1
SELECT 2 FROM * AS X
SELECT 3 FROM * AS Y`,
`REP 1 IN X
SELECT 2 IN City FROM Good AS X
FILTER Country EQ RU AS FromRU
FILTER Country EQ EN AS FromEN
FILTER @FromRU AND @FromEN AND Rating GT 7 AS Good`,
`REP 7 IN SPB
SELECT 1 IN City FROM SPBSSD AS SPB
FILTER City EQ SPB AND SSD EQ true OR City EQ SPB AND Rating GE 5 AS SPBSSD`,
`REP 7 IN SPB
SELECT 1 IN City FROM SPBSSD AS SPB
FILTER NOT (NOT (City EQ SPB) AND SSD EQ true OR City EQ SPB AND Rating GE 5) AS SPBSSD`,
`UNIQUE
REP 1
REP 1`,
}
var p PlacementPolicy
for _, testCase := range testCases {
require.NoError(t, p.DecodeString(testCase), "unable to parse %s", testCase)
var b strings.Builder
require.NoError(t, p.WriteStringTo(&b))
require.Equal(t, testCase, b.String())
}
invalidTestCases := map[string]error{
`?REP 1`: errSyntaxError,
`REP 1 trailing garbage`: errSyntaxError,
`REP 1 REP 1 SELECT 4 FROM *`: errUnnamedSelector,
`REP 1 SELECT 4 FROM * SELECT 1 FROM *`: errUnnamedSelector,
`REP 1 IN X SELECT 4 FROM *`: errUnknownSelector,
`REP 1 IN X REP 2 SELECT 4 FROM * AS X FILTER 'UN-LOCODE' EQ 'RU LED' AS F`: errRedundantFilter,
}
for i := range invalidTestCases {
require.ErrorIs(t, p.DecodeString(i), invalidTestCases[i], "#%s", i)
}
}

View file

@ -1,6 +1,7 @@
package netmap_test
import (
"strings"
"testing"
. "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@ -8,6 +9,55 @@ import (
"github.com/stretchr/testify/require"
)
func TestEncode(t *testing.T) {
testCases := []string{
`REP 1 IN X
CBF 1
SELECT 2 IN SAME Location FROM * AS X`,
`REP 1
SELECT 2 IN City FROM Good
FILTER Country EQ RU AS FromRU
FILTER @FromRU AND Rating GT 7 AS Good`,
`REP 7 IN SPB
SELECT 1 IN City FROM SPBSSD AS SPB
FILTER City EQ SPB AND SSD EQ true OR City EQ SPB AND Rating GE 5 AS SPBSSD`,
`REP 7 IN SPB
SELECT 1 IN City FROM SPBSSD AS SPB
FILTER NOT (NOT (City EQ SPB) AND SSD EQ true OR City EQ SPB AND Rating GE 5) AS SPBSSD`,
`UNIQUE
REP 1
REP 1`,
`REP 1 IN X
SELECT 1 FROM F AS X
FILTER 'UN-LOCODE' EQ 'RU LED' AS F`,
}
var p PlacementPolicy
for _, testCase := range testCases {
require.NoError(t, p.DecodeString(testCase))
var b strings.Builder
require.NoError(t, p.WriteStringTo(&b))
require.Equal(t, testCase, b.String())
}
invalidTestCases := []string{
`?REP 1`,
`REP 1 trailing garbage`,
}
for i := range invalidTestCases {
require.Error(t, p.DecodeString(invalidTestCases[i]), "#%d", i)
}
}
func TestPlacementPolicyEncoding(t *testing.T) {
v := netmaptest.PlacementPolicy()

View file

@ -60,7 +60,7 @@ func (c *context) getSelection(s netmap.Selector) ([]nodes, error) {
bucketCount, nodesInBucket := calcNodesCount(s)
buckets := c.getSelectionBase(s)
if c.strict && len(buckets) < bucketCount {
if len(buckets) < bucketCount {
return nil, fmt.Errorf("%w: '%s'", errNotEnoughNodes, s.GetName())
}
@ -96,7 +96,7 @@ func (c *context) getSelection(s netmap.Selector) ([]nodes, error) {
if len(res) < bucketCount {
// Fallback to using minimum allowed backup factor (1).
res = append(res, fallback...)
if c.strict && len(res) < bucketCount {
if len(res) < bucketCount {
return nil, fmt.Errorf("%w: '%s'", errNotEnoughNodes, s.GetName())
}
}
@ -110,13 +110,6 @@ func (c *context) getSelection(s netmap.Selector) ([]nodes, error) {
hrw.SortHasherSliceByWeightValue(res, weights, c.hrwSeedHash)
}
if len(res) < bucketCount {
if len(res) == 0 {
return nil, errNotEnoughNodes
}
bucketCount = len(res)
}
if s.GetAttribute() == "" {
res, fallback = res[:bucketCount], res[bucketCount:]
for i := range fallback {

View file

@ -1,10 +1,9 @@
package netmap
import (
"crypto/rand"
"encoding/binary"
"fmt"
mrand "math/rand"
"math/rand"
"sort"
"strconv"
"testing"
@ -29,10 +28,10 @@ func BenchmarkHRWSort(b *testing.B) {
node.SetPublicKey(key)
vectors[i] = nodes{node}
weights[i] = float64(mrand.Uint32()%10) / 10.0
weights[i] = float64(rand.Uint32()%10) / 10.0
}
pivot := mrand.Uint64()
pivot := rand.Uint64()
b.Run("sort by index, no weight", func(b *testing.B) {
realNodes := make([]nodes, netmapSize)
b.ResetTimer()
@ -283,55 +282,6 @@ func TestPlacementPolicy_Unique(t *testing.T) {
}
}
func TestPlacementPolicy_SingleOmitNames(t *testing.T) {
nodes := []NodeInfo{
nodeInfoFromAttributes("ID", "1", "Country", "Russia", "City", "SPB"),
nodeInfoFromAttributes("ID", "2", "Country", "Germany", "City", "Berlin"),
nodeInfoFromAttributes("ID", "3", "Country", "Russia", "City", "Moscow"),
nodeInfoFromAttributes("ID", "4", "Country", "France", "City", "Paris"),
nodeInfoFromAttributes("ID", "5", "Country", "France", "City", "Lyon"),
nodeInfoFromAttributes("ID", "6", "Country", "Russia", "City", "SPB"),
nodeInfoFromAttributes("ID", "7", "Country", "Russia", "City", "Moscow"),
nodeInfoFromAttributes("ID", "8", "Country", "Germany", "City", "Darmstadt"),
nodeInfoFromAttributes("ID", "9", "Country", "Germany", "City", "Frankfurt"),
nodeInfoFromAttributes("ID", "10", "Country", "Russia", "City", "SPB"),
nodeInfoFromAttributes("ID", "11", "Country", "Russia", "City", "Moscow"),
nodeInfoFromAttributes("ID", "12", "Country", "Germany", "City", "London"),
}
for i := range nodes {
pub := make([]byte, 33)
rand.Read(pub)
nodes[i].SetPublicKey(pub)
}
var nm NetMap
nm.SetNodes(nodes)
for _, unique := range []bool{false, true} {
t.Run(fmt.Sprintf("unique=%t", unique), func(t *testing.T) {
ssNamed := []Selector{newSelector("X", "City", 2, "FromRU", (*Selector).SelectDistinct)}
fsNamed := []Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}
rsNamed := []ReplicaDescriptor{newReplica(1, "X")}
pNamed := newPlacementPolicy(3, rsNamed, ssNamed, fsNamed)
pNamed.unique = unique
vNamed, err := nm.ContainerNodes(pNamed, []byte{1})
require.NoError(t, err)
ssUnnamed := []Selector{newSelector("", "City", 2, "FromRU", (*Selector).SelectDistinct)}
fsUnnamed := []Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}
rsUnnamed := []ReplicaDescriptor{newReplica(1, "")}
pUnnamed := newPlacementPolicy(3, rsUnnamed, ssUnnamed, fsUnnamed)
pUnnamed.unique = unique
vUnnamed, err := nm.ContainerNodes(pUnnamed, []byte{1})
require.NoError(t, err)
require.Equal(t, vNamed, vUnnamed)
})
}
}
func TestPlacementPolicy_MultiREP(t *testing.T) {
nodes := []NodeInfo{
nodeInfoFromAttributes("ID", "1", "Country", "Russia", "City", "SPB"),

View file

@ -1,7 +1,7 @@
package netmaptest
import (
"crypto/rand"
"math/rand"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)
@ -70,7 +70,7 @@ func NetworkInfo() (x netmap.NetworkInfo) {
// NodeInfo returns random netmap.NodeInfo.
func NodeInfo() (x netmap.NodeInfo) {
key := make([]byte, 33)
_, _ = rand.Read(key)
rand.Read(key)
x.SetPublicKey(key)
x.SetNetworkEndpoints("1", "2", "3")

View file

@ -1,10 +1,10 @@
package ns
import (
"crypto/rand"
"errors"
"fmt"
"math/big"
"math/rand"
"strings"
"testing"

View file

@ -1,8 +1,8 @@
package oidtest
import (
"crypto/rand"
"crypto/sha256"
"math/rand"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -12,7 +12,7 @@ import (
func ID() oid.ID {
checksum := [sha256.Size]byte{}
_, _ = rand.Read(checksum[:])
rand.Read(checksum[:])
return idWithChecksum(checksum)
}

View file

@ -1,8 +1,8 @@
package object_test
import (
"crypto/rand"
"crypto/sha256"
"math/rand"
"testing"
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"

View file

@ -1,8 +1,8 @@
package object
import (
"crypto/rand"
"crypto/sha256"
"math/rand"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/tombstone"

View file

@ -69,7 +69,3 @@ func (c *sessionCache) expired(val *cacheValue) bool {
// use epoch+1 (clear cache beforehand) to prevent 'expired session token' error right after epoch tick
return val.token.ExpiredAt(epoch + 1)
}
func (c *sessionCache) Epoch() uint64 {
return c.currentEpoch.Load()
}

View file

@ -189,7 +189,3 @@ func (m *mockClient) restartIfUnhealthy(ctx context.Context) (healthy bool, chan
}
return
}
func (m *mockClient) close() error {
return nil
}

View file

@ -1,172 +0,0 @@
package pool
import (
"context"
sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type logger interface {
log(level zapcore.Level, msg string, fields ...zap.Field)
}
type PrmObjectPutClientCutInit struct {
PrmObjectPut
}
func (c *clientWrapper) objectPutInitTransformer(prm PrmObjectPutClientCutInit) (*objectWriterTransformer, error) {
cl, err := c.getClient()
if err != nil {
return nil, err
}
var w objectWriterTransformer
w.it = internalTarget{
client: cl,
prm: prm,
address: c.address(),
logger: &c.clientStatusMonitor,
}
key := &c.prm.key
if prm.key != nil {
key = prm.key
}
w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{
Key: key,
NextTargetInit: func() transformer.ObjectWriter { return &w.it },
MaxSize: prm.networkInfo.MaxObjectSize(),
WithoutHomomorphicHash: prm.withoutHomomorphicHash,
NetworkState: prm.networkInfo,
SessionToken: prm.stoken,
})
return &w, nil
}
type objectWriterTransformer struct {
ot transformer.ChunkedObjectWriter
it internalTarget
err error
}
func (x *objectWriterTransformer) WriteHeader(ctx context.Context, hdr object.Object) bool {
x.err = x.ot.WriteHeader(ctx, &hdr)
return x.err == nil
}
func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk []byte) bool {
_, x.err = x.ot.Write(ctx, chunk)
return x.err == nil
}
// ResObjectPut groups the final result values of ObjectPutInit operation.
type ResObjectPut struct {
Status apistatus.Status
OID oid.ID
}
// Close returns a non-nil result in any case. If an error occurred, the result contains only a buffer for further reuse.
func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) {
ai, err := x.ot.Close(ctx)
if err != nil {
return nil, err
}
if ai != nil && ai.ParentID != nil {
x.it.res.OID = *ai.ParentID
}
return &x.it.res, nil
}
type internalTarget struct {
client *sdkClient.Client
res ResObjectPut
prm PrmObjectPutClientCutInit
useStream bool
address string
logger logger
resolveFrostFSErrors bool
}
func (it *internalTarget) WriteObject(ctx context.Context, o *object.Object) error {
putSingleImplemented, err := it.tryPutSingle(ctx, o)
if putSingleImplemented {
return err
}
it.logger.log(zapcore.DebugLevel, "putSingle not implemented, trying put as stream", zap.String("address", it.address))
it.useStream = true
return it.putAsStream(ctx, o)
}
func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error {
var cliPrm sdkClient.PrmObjectPutInit
cliPrm.SetCopiesNumberByVectors(it.prm.copiesNumber)
if it.prm.stoken != nil {
cliPrm.WithinSession(*it.prm.stoken)
}
if it.prm.key != nil {
cliPrm.UseKey(*it.prm.key)
}
if it.prm.btoken != nil {
cliPrm.WithBearerToken(*it.prm.btoken)
}
wrt, err := it.client.ObjectPutInit(ctx, cliPrm)
if err != nil {
return err
}
if wrt.WriteHeader(ctx, *o) {
wrt.WritePayloadChunk(ctx, o.Payload())
}
res, err := wrt.Close(ctx)
if res != nil {
it.res.Status = res.Status()
it.res.OID = res.StoredObjectID()
}
return err
}
func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (bool, error) {
if it.useStream {
return false, nil
}
var cliPrm sdkClient.PrmObjectPutSingle
cliPrm.SetCopiesNumber(it.prm.copiesNumber)
cliPrm.UseKey(it.prm.key)
if it.prm.stoken != nil {
cliPrm.WithinSession(*it.prm.stoken)
}
if it.prm.btoken != nil {
cliPrm.WithBearerToken(*it.prm.btoken)
}
cliPrm.SetObject(o.ToV2())
res, err := it.client.ObjectPutSingle(ctx, cliPrm)
if err != nil && status.Code(err) == codes.Unimplemented {
return false, err
}
if err == nil {
id, _ := o.ID()
it.res = ResObjectPut{
Status: res.Status(),
OID: id,
}
if !it.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.Status) {
return true, apistatus.ErrFromStatus(it.res.Status)
}
return true, nil
}
return true, err
}
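
For context, the deleted target above probes a single-shot RPC first and permanently falls back to streaming once the server reports Unimplemented, remembering that decision in useStream. Below is a minimal sketch of the same fallback pattern; the fallbackWriter type and both put functions are hypothetical stand-ins, not the SDK API — only the gRPC status handling mirrors the code above.

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// fallbackWriter prefers a single-shot call but switches to a streaming
// call for good once the server reports the single-shot RPC is missing.
type fallbackWriter struct {
	useStream bool
	putSingle func(context.Context, []byte) error // hypothetical single-shot RPC
	putStream func(context.Context, []byte) error // hypothetical streaming RPC
}

func (w *fallbackWriter) write(ctx context.Context, payload []byte) error {
	if !w.useStream {
		err := w.putSingle(ctx, payload)
		if status.Code(err) != codes.Unimplemented {
			return err // success or a real failure: no fallback
		}
		w.useStream = true // remember: do not probe putSingle again
	}
	return w.putStream(ctx, payload)
}

func main() {
	w := &fallbackWriter{
		putSingle: func(context.Context, []byte) error {
			return status.Error(codes.Unimplemented, "no PutSingle")
		},
		putStream: func(context.Context, []byte) error { return nil },
	}
	fmt.Println(w.write(context.Background(), []byte("chunk"))) // <nil>, served via stream
}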

View file

@ -78,16 +78,12 @@ type client interface {
dial(ctx context.Context) error
// see clientWrapper.restartIfUnhealthy.
restartIfUnhealthy(ctx context.Context) (bool, bool)
// see clientWrapper.close.
close() error
}
// clientStatus provides access to connection metrics.
type clientStatus interface {
// isHealthy checks if the connection can handle requests.
isHealthy() bool
// isDialed checks if the connection was created.
isDialed() bool
// setUnhealthy marks client as unhealthy.
setUnhealthy()
// address returns the address of the endpoint.
@ -109,7 +105,7 @@ var errPoolClientUnhealthy = errors.New("pool client unhealthy")
type clientStatusMonitor struct {
logger *zap.Logger
addr string
healthy *atomic.Uint32
healthy *atomic.Bool
errorThreshold uint32
mu sync.RWMutex // protect counters
@ -118,22 +114,6 @@ type clientStatusMonitor struct {
methods []*methodStatus
}
// values for healthy status of clientStatusMonitor.
const (
// statusUnhealthyOnDial is set when dialing to the endpoint has failed,
// so there is no connection to the endpoint, and the pool should not close it
// before re-establishing the connection once again.
statusUnhealthyOnDial = iota
// statusUnhealthyOnRequest is set when communication after dialing to the
// endpoint has failed due to immediate or accumulated errors; the connection is
// available and the pool should close it before re-establishing the connection once again.
statusUnhealthyOnRequest
// statusHealthy is set when the connection is ready to be used by the pool.
statusHealthy
)
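
The three-state health value removed in this hunk lets the pool distinguish "never connected" (nothing to close) from "connected but failing" (close before redialing), which a plain bool cannot express. A small sketch of that state machine, reconstructed from the comments above; the monitor type is a local stand-in:

package main

import (
	"fmt"
	"sync/atomic"
)

const (
	statusUnhealthyOnDial uint32 = iota // dial failed: no connection to close
	statusUnhealthyOnRequest            // dialed, then failed: close before redial
	statusHealthy                       // ready to serve requests
)

type monitor struct{ healthy atomic.Uint32 }

func (m *monitor) isHealthy() bool { return m.healthy.Load() == statusHealthy }
func (m *monitor) isDialed() bool  { return m.healthy.Load() != statusUnhealthyOnDial }

func main() {
	var m monitor
	m.healthy.Store(statusUnhealthyOnRequest)
	fmt.Println(m.isHealthy(), m.isDialed()) // false true: unhealthy, but there is a conn to close
}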
// methodStatus provides statistics for a specific method.
type methodStatus struct {
name string
@ -215,8 +195,8 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint
methods[i] = &methodStatus{name: i.String()}
}
healthy := new(atomic.Uint32)
healthy.Store(statusHealthy)
healthy := new(atomic.Bool)
healthy.Store(true)
return clientStatusMonitor{
logger: logger,
@ -272,11 +252,6 @@ func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) {
x.key = key
}
// setLogger sets sdkClient.Client logger.
func (x *wrapperPrm) setLogger(logger *zap.Logger) {
x.logger = logger
}
// setDialTimeout sets the timeout for connection to be established.
func (x *wrapperPrm) setDialTimeout(timeout time.Duration) {
x.dialTimeout = timeout
@ -342,7 +317,7 @@ func (c *clientWrapper) dial(ctx context.Context) error {
prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
if err = cl.Dial(ctx, prmDial); err != nil {
c.setUnhealthyOnDial()
c.setUnhealthy()
return err
}
@ -359,12 +334,6 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
wasHealthy = true
}
// if the connection was dialed before, to avoid a goroutine / connection leak,
// the pool has to close it and then initialize it once again.
if c.isDialed() {
_ = c.close()
}
var cl sdkClient.Client
var prmInit sdkClient.PrmInit
prmInit.SetDefaultPrivateKey(c.prm.key)
@ -379,7 +348,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
if err := cl.Dial(ctx, prmDial); err != nil {
c.setUnhealthyOnDial()
c.setUnhealthy()
return false, wasHealthy
}
@ -580,9 +549,11 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
return err
}
cliPrm := sdkClient.PrmContainerSetEACL{
Table: &prm.Table,
Session: prm.Session,
var cliPrm sdkClient.PrmContainerSetEACL
cliPrm.SetTable(prm.table)
if prm.sessionSet {
cliPrm.WithinSession(prm.session)
}
start := time.Now()
@ -596,19 +567,16 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
return fmt.Errorf("set eacl on client: %w", err)
}
if prm.WaitParams == nil {
prm.WaitParams = defaultWaitParams()
}
if err := prm.WaitParams.CheckValidity(); err != nil {
return fmt.Errorf("invalid wait parameters: %w", err)
if !prm.waitParamsSet {
prm.waitParams.setDefaults()
}
var cIDp *cid.ID
if cID, set := prm.Table.CID(); set {
if cID, set := prm.table.CID(); set {
cIDp = &cID
}
err = waitForEACLPresence(ctx, c, cIDp, &prm.Table, prm.WaitParams)
err = waitForEACLPresence(ctx, c, cIDp, &prm.table, &prm.waitParams)
if err = c.handleError(ctx, nil, err); err != nil {
return fmt.Errorf("wait eacl presence on client: %w", err)
}
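
waitForEACLPresence above polls until the freshly set eACL becomes observable, bounded by the wait params. A generic sketch of that bounded polling; the timeout and interval mirror the defaults appearing later in this diff, while waitFor and its predicate are illustrative stand-ins:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitFor polls check every interval until it succeeds, the timeout
// elapses, or ctx is cancelled — the shape of waitForEACLPresence.
func waitFor(ctx context.Context, timeout, interval time.Duration, check func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		err := check(ctx)
		if err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return errors.Join(ctx.Err(), err) // report both the deadline and the last failure
		case <-ticker.C:
		}
	}
}

func main() {
	calls := 0
	err := waitFor(context.Background(), 120*time.Second, 5*time.Millisecond,
		func(context.Context) error {
			if calls++; calls < 3 {
				return errors.New("eACL not yet visible")
			}
			return nil
		})
	fmt.Println(calls, err) // 3 <nil>
}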
@ -660,18 +628,6 @@ func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (netm
// objectPut writes an object to FrostFS.
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
if prm.bufferMaxSize == 0 {
prm.bufferMaxSize = defaultBufferMaxSizeForPut
}
if prm.clientCut {
return c.objectPutClientCut(ctx, prm)
}
return c.objectPutServerCut(ctx, prm)
}
func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
cl, err := c.getClient()
if err != nil {
return oid.ID{}, err
@ -709,8 +665,10 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut
}
if prm.payload != nil {
if sz == 0 || sz > prm.bufferMaxSize {
sz = prm.bufferMaxSize
const defaultBufferSizePut = 3 << 20 // configure?
if sz == 0 || sz > defaultBufferSizePut {
sz = defaultBufferSizePut
}
buf := make([]byte, sz)
@ -751,73 +709,6 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut
return res.StoredObjectID(), nil
}
func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
putInitPrm := PrmObjectPutClientCutInit{
PrmObjectPut: prm,
}
start := time.Now()
wObj, err := c.objectPutInitTransformer(putInitPrm)
c.incRequests(time.Since(start), methodObjectPut)
if err = c.handleError(ctx, nil, err); err != nil {
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
}
if wObj.WriteHeader(ctx, prm.hdr) {
sz := prm.hdr.PayloadSize()
if data := prm.hdr.Payload(); len(data) > 0 {
if prm.payload != nil {
prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
} else {
prm.payload = bytes.NewReader(data)
sz = uint64(len(data))
}
}
if prm.payload != nil {
if sz == 0 || sz > prm.bufferMaxSize {
sz = prm.bufferMaxSize
}
buf := make([]byte, sz)
var n int
for {
n, err = prm.payload.Read(buf)
if n > 0 {
start = time.Now()
successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
c.incRequests(time.Since(start), methodObjectPut)
if !successWrite {
break
}
continue
}
if errors.Is(err, io.EOF) {
break
}
return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
}
}
}
res, err := wObj.Close(ctx)
var st apistatus.Status
if res != nil {
st = res.Status
}
if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
return oid.ID{}, fmt.Errorf("client failure: %w", err)
}
return res.OID, nil
}
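
The client-cut path deleted above drains an io.Reader into a bounded buffer and forwards each chunk to the writer; the subtle part is consuming n > 0 before inspecting the read error, per the io.Reader contract. A stdlib-only sketch of that loop; copyChunks and its writer callback are illustrative:

package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// copyChunks forwards payload to write in chunks of at most bufSize bytes,
// handling data returned alongside io.EOF before the error itself.
func copyChunks(payload io.Reader, bufSize int, write func([]byte) error) error {
	buf := make([]byte, bufSize)
	for {
		n, err := payload.Read(buf)
		if n > 0 {
			if werr := write(buf[:n]); werr != nil {
				return werr
			}
		}
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return fmt.Errorf("read payload: %w", err)
		}
	}
}

func main() {
	var out bytes.Buffer
	err := copyChunks(bytes.NewReader([]byte("hello world")), 4, func(b []byte) error {
		_, werr := out.Write(b)
		return werr
	})
	fmt.Println(out.String(), err) // hello world <nil>
}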
// objectDelete invokes sdkClient.ObjectDelete and parses the response status into an error.
func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) error {
cl, err := c.getClient()
@ -825,15 +716,20 @@ func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) e
return err
}
cnr := prm.addr.Container()
obj := prm.addr.Object()
var cliPrm sdkClient.PrmObjectDelete
cliPrm.FromContainer(prm.addr.Container())
cliPrm.ByID(prm.addr.Object())
cliPrm := sdkClient.PrmObjectDelete{
BearerToken: prm.btoken,
Session: prm.stoken,
ContainerID: &cnr,
ObjectID: &obj,
Key: prm.key,
if prm.stoken != nil {
cliPrm.WithinSession(*prm.stoken)
}
if prm.btoken != nil {
cliPrm.WithBearerToken(*prm.btoken)
}
if prm.key != nil {
cliPrm.UseKey(*prm.key)
}
start := time.Now()
@ -856,15 +752,20 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGet
return ResGetObject{}, err
}
prmCnr := prm.addr.Container()
prmObj := prm.addr.Object()
var cliPrm sdkClient.PrmObjectGet
cliPrm.FromContainer(prm.addr.Container())
cliPrm.ByID(prm.addr.Object())
cliPrm := sdkClient.PrmObjectGet{
BearerToken: prm.btoken,
Session: prm.stoken,
ContainerID: &prmCnr,
ObjectID: &prmObj,
Key: prm.key,
if prm.stoken != nil {
cliPrm.WithinSession(*prm.stoken)
}
if prm.btoken != nil {
cliPrm.WithBearerToken(*prm.btoken)
}
if prm.key != nil {
cliPrm.UseKey(*prm.key)
}
var res ResGetObject
@ -904,16 +805,23 @@ func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (obje
return object.Object{}, err
}
prmCnr := prm.addr.Container()
prmObj := prm.addr.Object()
var cliPrm sdkClient.PrmObjectHead
cliPrm.FromContainer(prm.addr.Container())
cliPrm.ByID(prm.addr.Object())
if prm.raw {
cliPrm.MarkRaw()
}
cliPrm := sdkClient.PrmObjectHead{
BearerToken: prm.btoken,
Session: prm.stoken,
Raw: prm.raw,
ContainerID: &prmCnr,
ObjectID: &prmObj,
Key: prm.key,
if prm.stoken != nil {
cliPrm.WithinSession(*prm.stoken)
}
if prm.btoken != nil {
cliPrm.WithBearerToken(*prm.btoken)
}
if prm.key != nil {
cliPrm.UseKey(*prm.key)
}
var obj object.Object
@ -942,17 +850,22 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (Re
return ResObjectRange{}, err
}
prmCnr := prm.addr.Container()
prmObj := prm.addr.Object()
var cliPrm sdkClient.PrmObjectRange
cliPrm.FromContainer(prm.addr.Container())
cliPrm.ByID(prm.addr.Object())
cliPrm.SetOffset(prm.off)
cliPrm.SetLength(prm.ln)
cliPrm := sdkClient.PrmObjectRange{
BearerToken: prm.btoken,
Session: prm.stoken,
ContainerID: &prmCnr,
ObjectID: &prmObj,
Offset: prm.off,
Length: prm.ln,
Key: prm.key,
if prm.stoken != nil {
cliPrm.WithinSession(*prm.stoken)
}
if prm.btoken != nil {
cliPrm.WithBearerToken(*prm.btoken)
}
if prm.key != nil {
cliPrm.UseKey(*prm.key)
}
start := time.Now()
@ -1009,10 +922,9 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
return resCreateSession{}, err
}
cliPrm := sdkClient.PrmSessionCreate{
Expiration: prm.exp,
Key: &prm.key,
}
var cliPrm sdkClient.PrmSessionCreate
cliPrm.SetExp(prm.exp)
cliPrm.UseKey(prm.key)
start := time.Now()
res, err := cl.SessionCreate(ctx, cliPrm)
@ -1032,23 +944,15 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
}
func (c *clientStatusMonitor) isHealthy() bool {
return c.healthy.Load() == statusHealthy
}
func (c *clientStatusMonitor) isDialed() bool {
return c.healthy.Load() != statusUnhealthyOnDial
return c.healthy.Load()
}
func (c *clientStatusMonitor) setHealthy() {
c.healthy.Store(statusHealthy)
c.healthy.Store(true)
}
func (c *clientStatusMonitor) setUnhealthy() {
c.healthy.Store(statusUnhealthyOnRequest)
}
func (c *clientStatusMonitor) setUnhealthyOnDial() {
c.healthy.Store(statusUnhealthyOnDial)
c.healthy.Store(false)
}
func (c *clientStatusMonitor) address() string {
@ -1067,20 +971,12 @@ func (c *clientStatusMonitor) incErrorRate() {
}
c.mu.Unlock()
if thresholdReached {
c.log(zapcore.WarnLevel, "error threshold reached",
if thresholdReached && c.logger != nil {
c.logger.Warn("error threshold reached",
zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold))
}
}
func (c *clientStatusMonitor) log(level zapcore.Level, msg string, fields ...zap.Field) {
if c.logger == nil {
return
}
c.logger.Log(level, msg, fields...)
}
func (c *clientStatusMonitor) currentErrorRate() uint32 {
c.mu.RLock()
defer c.mu.RUnlock()
@ -1114,13 +1010,6 @@ func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
}
}
func (c *clientWrapper) close() error {
if c.client != nil {
return c.client.Close()
}
return nil
}
func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error {
if err != nil {
if needCountError(ctx, err) {
@ -1338,6 +1227,12 @@ func (x *WaitParams) SetPollInterval(tick time.Duration) {
x.PollInterval = tick
}
// Deprecated: Use defaultWaitParams() instead.
func (x *WaitParams) setDefaults() {
x.Timeout = 120 * time.Second
x.PollInterval = 5 * time.Second
}
func defaultWaitParams() *WaitParams {
return &WaitParams{
Timeout: 120 * time.Second,
@ -1426,13 +1321,6 @@ type PrmObjectPut struct {
payload io.Reader
copiesNumber []uint32
clientCut bool
networkInfo netmap.NetworkInfo
withoutHomomorphicHash bool
bufferMaxSize uint64
}
// SetHeader specifies header of the object.
@ -1457,32 +1345,6 @@ func (x *PrmObjectPut) SetCopiesNumberVector(copiesNumber []uint32) {
x.copiesNumber = copiesNumber
}
// SetClientCut enables client cut for objects. It means that the full object is prepared on the client side
// and retrying is possible. But this leads to additional memory usage for buffering object parts.
// The buffer size for every put is the MaxObjectSize value from the FrostFS network.
// There is a limit for the total memory allocated for in-flight requests, which
// can be set by InitParameters.SetMaxClientCutMemory (default value is 1gb).
// Put requests will fail if this limit is reached.
func (x *PrmObjectPut) SetClientCut(clientCut bool) {
x.clientCut = clientCut
}
// WithoutHomomorphicHash, if set to true, disables the Tillich-Zémor hash for the payload.
func (x *PrmObjectPut) WithoutHomomorphicHash(v bool) {
x.withoutHomomorphicHash = v
}
// SetBufferMaxSize sets the maximum buffer size used to read the payload.
// This value isn't used if the object size is set explicitly and is less than this value.
// The default value is 3 MB.
func (x *PrmObjectPut) SetBufferMaxSize(size uint64) {
x.bufferMaxSize = size
}
func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) {
x.networkInfo = ni
}
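
Client cut means the SDK splits a large payload into objects of at most MaxObjectSize before sending; the real work is done by transformer.PayloadSizeLimiter, which also builds split headers and hashes. The toy sketch below shows only the cutting itself and is not the SDK implementation:

package main

import "fmt"

// cut splits payload into parts of at most maxSize bytes — the core idea
// behind client-side object splitting.
func cut(payload []byte, maxSize int) [][]byte {
	var parts [][]byte
	for len(payload) > maxSize {
		parts = append(parts, payload[:maxSize])
		payload = payload[maxSize:]
	}
	return append(parts, payload)
}

func main() {
	fmt.Println(len(cut(make([]byte, 10<<20), 3<<20))) // 4: three full parts plus a 1 MB tail
}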
// PrmObjectDelete groups parameters of DeleteObject operation.
type PrmObjectDelete struct {
prmCommon
@ -1670,41 +1532,39 @@ func (x *PrmContainerEACL) SetContainerID(cnrID cid.ID) {
// PrmContainerSetEACL groups parameters of SetEACL operation.
type PrmContainerSetEACL struct {
Table eacl.Table
table eacl.Table
Session *session.Container
sessionSet bool
session session.Container
WaitParams *WaitParams
waitParams WaitParams
waitParamsSet bool
}
// SetTable sets the structure of the container's extended ACL to be used as a
// parameter of the base client's operation.
//
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.SetTable.
//
// Deprecated: Use PrmContainerSetEACL.Table instead.
func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
x.Table = table
x.table = table
}
// WithinSession specifies session to be used as a parameter of the base
// client's operation.
//
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.WithinSession.
//
// Deprecated: Use PrmContainerSetEACL.Session instead.
func (x *PrmContainerSetEACL) WithinSession(s session.Container) {
x.Session = &s
x.session = s
x.sessionSet = true
}
// SetWaitParams specifies timeout params for completing the operation.
// If not provided, the defaults will be used.
// Panics if any of the wait params isn't positive.
//
// Deprecated: Use PrmContainerSetEACL.WaitParams instead.
func (x *PrmContainerSetEACL) SetWaitParams(waitParams WaitParams) {
waitParams.checkForPositive()
x.WaitParams = &waitParams
x.waitParams = waitParams
x.waitParamsSet = true
}
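
This hunk swaps exported fields (Table, Session, WaitParams) back for setters; on the newer side the deprecated setters are just thin shims over the fields. A schematic sketch of how the two styles coexist; prm and its string field stand in for the real parameter types:

package main

import "fmt"

// prm mirrors the exported-field style on the left-hand side of the hunk.
type prm struct {
	Table string // stand-in for eacl.Table
}

// SetTable is the legacy setter kept as a shim over the field.
//
// Deprecated: assign prm.Table directly.
func (x *prm) SetTable(t string) { x.Table = t }

func main() {
	var a, b prm
	a.Table = "direct field assignment" // new style
	b.SetTable("via deprecated setter") // old style, same effect
	fmt.Println(a.Table, "/", b.Table)
}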
// PrmBalanceGet groups parameters of Balance operation.
@ -1778,8 +1638,6 @@ type Pool struct {
rebalanceParams rebalanceParameters
clientBuilder clientBuilder
logger *zap.Logger
maxObjectSize uint64
}
type innerPool struct {
@ -1796,8 +1654,6 @@ const (
defaultHealthcheckTimeout = 4 * time.Second
defaultDialTimeout = 5 * time.Second
defaultStreamTimeout = 10 * time.Second
defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
)
// NewPool creates connection pool using parameters.
@ -1857,7 +1713,7 @@ func (p *Pool) Dial(ctx context.Context) error {
}
var st session.Object
err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key, false)
err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key)
if err != nil {
clients[j].setUnhealthy()
p.log(zap.WarnLevel, "failed to create frostfs session token for client",
@ -1865,7 +1721,7 @@ func (p *Pool) Dial(ctx context.Context) error {
continue
}
_ = p.cache.Put(formCacheKey(addr, p.key, false), st)
_ = p.cache.Put(formCacheKey(addr, p.key), st)
atLeastOneHealthy = true
}
source := rand.NewSource(time.Now().UnixNano())
@ -1886,12 +1742,6 @@ func (p *Pool) Dial(ctx context.Context) error {
p.closedCh = make(chan struct{})
p.innerPools = inner
ni, err := p.NetworkInfo(ctx)
if err != nil {
return fmt.Errorf("get network info for max object size: %w", err)
}
p.maxObjectSize = ni.MaxObjectSize()
go p.startRebalance(ctx)
return nil
}
@ -1934,7 +1784,6 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
var prm wrapperPrm
prm.setAddress(addr)
prm.setKey(*params.key)
prm.setLogger(params.logger)
prm.setDialTimeout(params.nodeDialTimeout)
prm.setStreamTimeout(params.nodeStreamTimeout)
prm.setErrorThreshold(params.errorThreshold)
@ -2103,15 +1952,9 @@ func (p *innerPool) connection() (client, error) {
return nil, errors.New("no healthy client")
}
func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string {
func formCacheKey(address string, key *ecdsa.PrivateKey) string {
k := keys.PrivateKey{PrivateKey: *key}
stype := "server"
if clientCut {
stype = "client"
}
return address + stype + k.String()
return address + k.String()
}
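
formCacheKey derives the session-cache key; the removed variant also encodes the session flavor, so server-cut and client-cut sessions for the same endpoint and key occupy distinct cache slots. A sketch of that keying; the hex encoding stands in for keys.PrivateKey.String():

package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// formCacheKey mirrors the removed variant: endpoint + session flavor + key.
func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string {
	stype := "server"
	if clientCut {
		stype = "client"
	}
	// stand-in for keys.PrivateKey.String(); any stable key encoding works here
	return address + stype + hex.EncodeToString(key.D.Bytes())
}

func main() {
	key, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	fmt.Println(formCacheKey("grpc://node:8080", key, false) !=
		formCacheKey("grpc://node:8080", key, true)) // true: distinct cache slots
}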
func (p *Pool) checkSessionTokenErr(err error, address string) bool {
@ -2127,7 +1970,7 @@ func (p *Pool) checkSessionTokenErr(err error, address string) bool {
return false
}
func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey, clientCut bool) error {
func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey) error {
ni, err := c.networkInfo(ctx, prmNetworkInfo{})
if err != nil {
return err
@ -2145,25 +1988,23 @@ func initSessionForDuration(ctx context.Context, dst *session.Object, c client,
prm.setExp(exp)
prm.useKey(ownerKey)
var (
id uuid.UUID
key frostfsecdsa.PublicKey
)
res, err := c.sessionCreate(ctx, prm)
if err != nil {
return err
}
if clientCut {
id = uuid.New()
key = frostfsecdsa.PublicKey(ownerKey.PublicKey)
} else {
res, err := c.sessionCreate(ctx, prm)
if err != nil {
return err
}
if err = id.UnmarshalBinary(res.id); err != nil {
return fmt.Errorf("invalid session token ID: %w", err)
}
if err = key.Decode(res.sessionKey); err != nil {
return fmt.Errorf("invalid public session key: %w", err)
}
var id uuid.UUID
err = id.UnmarshalBinary(res.id)
if err != nil {
return fmt.Errorf("invalid session token ID: %w", err)
}
var key frostfsecdsa.PublicKey
err = key.Decode(res.sessionKey)
if err != nil {
return fmt.Errorf("invalid public session key: %w", err)
}
dst.SetID(id)
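
On the left-hand side of this hunk a client-cut session never touches the server: the pool fabricates a fresh session ID locally and reuses the owner's public key, while the server-cut path still decodes both from the sessionCreate response. A compact sketch of that branch; sessionIdentity and the stubbed server call are illustrative:

package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"fmt"

	"github.com/google/uuid"
)

// sessionIdentity returns the (id, public key) pair for a session token:
// fabricated locally for client cut, otherwise taken from a server response.
func sessionIdentity(ownerKey ecdsa.PrivateKey, clientCut bool,
	serverCreate func() (uuid.UUID, ecdsa.PublicKey, error),
) (uuid.UUID, ecdsa.PublicKey, error) {
	if clientCut {
		return uuid.New(), ownerKey.PublicKey, nil // no RPC needed
	}
	return serverCreate()
}

func main() {
	key, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	id, pub, err := sessionIdentity(*key, true, nil)
	fmt.Println(id != uuid.Nil, pub.X != nil, err) // true true <nil>
}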
@ -2189,8 +2030,6 @@ type callContext struct {
sessionCnr cid.ID
sessionObjSet bool
sessionObjs []oid.ID
sessionClientCut bool
}
func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
@ -2227,12 +2066,12 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContex
// opens a new session or uses a cached one.
// Must be called only on an initialized callContext with sessionTarget set.
func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error {
cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut)
cacheKey := formCacheKey(cc.endpoint, cc.key)
tok, ok := p.cache.Get(cacheKey)
if !ok {
// init new session
err := initSessionForDuration(ctx, &tok, cc.client, p.stokenDuration, *cc.key, cc.sessionClientCut)
err := initSessionForDuration(ctx, &tok, cc.client, p.stokenDuration, *cc.key)
if err != nil {
return fmt.Errorf("session API client: %w", err)
}
@ -2297,7 +2136,6 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
p.fillAppropriateKey(&prm.prmCommon)
var ctxCall callContext
ctxCall.sessionClientCut = prm.clientCut
if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil {
return oid.ID{}, fmt.Errorf("init call context: %w", err)
}
@ -2309,13 +2147,6 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
}
}
if prm.clientCut {
var ni netmap.NetworkInfo
ni.SetCurrentEpoch(p.cache.Epoch())
ni.SetMaxObjectSize(p.maxObjectSize) // we want to use initial max object size in PayloadSizeLimiter
prm.setNetworkInfo(ni)
}
id, err := ctxCall.client.objectPut(ctx, prm)
if err != nil {
// removes session token from cache in case of token error
@ -2807,15 +2638,6 @@ func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
func (p *Pool) Close() {
p.cancel()
<-p.closedCh
// close all clients
for _, pools := range p.innerPools {
for _, cli := range pools.clients {
if cli.isDialed() {
_ = cli.close()
}
}
}
}
// SyncContainerWithNetwork applies network configuration received via

View file

@ -106,7 +106,7 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
if err != nil {
return false
}
st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key, false))
st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key))
return st.AssertAuthKey(&expectedAuthKey)
}
require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond)
@ -141,7 +141,7 @@ func TestOneNode(t *testing.T) {
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
require.True(t, st.AssertAuthKey(&expectedAuthKey))
}
@ -171,7 +171,7 @@ func TestTwoNodes(t *testing.T) {
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
require.True(t, assertAuthKeyForAny(st, clientKeys))
}
@ -226,7 +226,7 @@ func TestOneOfTwoFailed(t *testing.T) {
for i := 0; i < 5; i++ {
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
require.True(t, assertAuthKeyForAny(st, clientKeys))
}
}
@ -296,7 +296,7 @@ func TestSessionCache(t *testing.T) {
// cache must contain session token
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
var prm PrmObjectGet
@ -309,7 +309,7 @@ func TestSessionCache(t *testing.T) {
// cache must not contain session token
cp, err = pool.connection()
require.NoError(t, err)
_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key))
require.False(t, ok)
var prm2 PrmObjectPut
@ -321,7 +321,7 @@ func TestSessionCache(t *testing.T) {
// cache must contain session token
cp, err = pool.connection()
require.NoError(t, err)
st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
}
@ -365,7 +365,7 @@ func TestPriority(t *testing.T) {
firstNode := func() bool {
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
return st.AssertAuthKey(&expectedAuthKey1)
}
@ -373,7 +373,7 @@ func TestPriority(t *testing.T) {
secondNode := func() bool {
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
return st.AssertAuthKey(&expectedAuthKey2)
}
require.Never(t, secondNode, time.Second, 200*time.Millisecond)
@ -410,7 +410,7 @@ func TestSessionCacheWithKey(t *testing.T) {
// cache must contain session token
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
var prm PrmObjectDelete
@ -420,7 +420,7 @@ func TestSessionCacheWithKey(t *testing.T) {
err = pool.DeleteObject(ctx, prm)
require.NoError(t, err)
st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey, false))
st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
}
@ -523,44 +523,6 @@ func TestStatusMonitor(t *testing.T) {
require.Equal(t, uint64(count), monitor.overallErrorRate())
require.Equal(t, uint32(1), monitor.currentErrorRate())
t.Run("healthy status", func(t *testing.T) {
cases := []struct {
action func(*clientStatusMonitor)
status uint32
isDialed bool
isHealthy bool
description string
}{
{
action: func(m *clientStatusMonitor) { m.setUnhealthyOnDial() },
status: statusUnhealthyOnDial,
isDialed: false,
isHealthy: false,
description: "set unhealthy on dial",
},
{
action: func(m *clientStatusMonitor) { m.setUnhealthy() },
status: statusUnhealthyOnRequest,
isDialed: true,
isHealthy: false,
description: "set unhealthy on request",
},
{
action: func(m *clientStatusMonitor) { m.setHealthy() },
status: statusHealthy,
isDialed: true,
isHealthy: true,
description: "set healthy",
},
}
for _, tc := range cases {
tc.action(&monitor)
require.Equal(t, tc.status, monitor.healthy.Load())
require.Equal(t, tc.isDialed, monitor.isDialed())
require.Equal(t, tc.isHealthy, monitor.isHealthy())
}
})
}
func TestHandleError(t *testing.T) {

View file

@ -3,7 +3,6 @@ package tree
import (
"context"
"crypto/tls"
"errors"
"fmt"
"sync"
@ -23,11 +22,6 @@ type treeClient struct {
healthy bool
}
var (
// ErrUnhealthyEndpoint is returned when a client in the pool is considered unavailable.
ErrUnhealthyEndpoint = errors.New("unhealthy endpoint")
)
// newTreeClient creates new tree client with auto dial.
func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient {
return &treeClient{
@ -108,7 +102,7 @@ func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) {
defer c.mu.RUnlock()
if c.conn == nil || !c.healthy {
return nil, fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, c.address)
return nil, fmt.Errorf("unhealthy endpoint: '%s'", c.address)
}
return c.service, nil

View file

@ -32,9 +32,6 @@ var (
// ErrNodeAccessDenied is returned from Tree service in case of access denied error.
ErrNodeAccessDenied = errors.New("access denied")
// errNodeEmptyResult is used to trigger a retry when 'GetNodeByPath' returns an empty result.
errNodeEmptyResult = errors.New("empty result")
)
// client represents virtual connection to the single FrostFS tree service from which Pool is formed.
@ -299,15 +296,8 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
var resp *grpcService.GetNodeByPathResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.GetNodeByPath(ctx, request)
// The pool retries the 'GetNodeByPath' request if the result is empty.
// An empty result is expected due to delayed tree service sync.
// Return an error here to trigger a retry and ignore it afterwards,
// to keep compatibility with the 'GetNodeByPath' implementation.
if inErr == nil && len(resp.Body.Nodes) == 0 {
return errNodeEmptyResult
}
return handleError("failed to get node by path", inErr)
}); err != nil && !errors.Is(err, errNodeEmptyResult) {
}); err != nil {
return nil, err
}
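
The removed lines convert an empty GetNodeByPath result into a sentinel error purely to drive the retry loop, then filter the sentinel back out at the call site so callers still see an empty (not failed) result. A sketch of this "retry on sentinel, ignore on exit" idiom; the retry driver and sentinel are local stand-ins:

package main

import (
	"errors"
	"fmt"
)

var errEmptyResult = errors.New("empty result")

// retry re-runs fn up to attempts times while it keeps failing.
func retry(attempts int, fn func() error) (err error) {
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
	}
	return err
}

func main() {
	var nodes []string
	calls := 0
	err := retry(3, func() error {
		calls++
		if calls == 3 {
			nodes = []string{"node"} // the tree service finally caught up
		}
		if len(nodes) == 0 {
			return errEmptyResult // trigger a retry on delayed sync
		}
		return nil
	})
	if err != nil && !errors.Is(err, errEmptyResult) { // the sentinel is not a real failure
		fmt.Println("fatal:", err)
		return
	}
	fmt.Println(calls, nodes, err == nil) // 3 [node] true
}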
@ -730,8 +720,8 @@ func (p *Pool) setStartIndices(i, j int) {
func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error {
var (
err, finErr error
cl grpcService.TreeServiceClient
err error
cl grpcService.TreeServiceClient
)
startI, startJ := p.getStartIndices()
@ -750,44 +740,16 @@ func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) er
}
return err
}
finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
}
startJ = 0
}
return finErr
return err
}
func shouldTryAgain(err error) bool {
return !(err == nil || errors.Is(err, ErrNodeAccessDenied))
}
func prioErr(err error) int {
switch {
case err == nil:
return -1
case errors.Is(err, ErrNodeAccessDenied):
return 100
case errors.Is(err, ErrNodeNotFound) ||
errors.Is(err, errNodeEmptyResult):
return 200
case errors.Is(err, ErrUnhealthyEndpoint):
return 300
default:
return 500
}
}
func finalError(current, candidate error) error {
if current == nil || candidate == nil {
return candidate
}
// a lower-priority error is more desirable
if prioErr(candidate) < prioErr(current) {
return candidate
}
return current
return !(err == nil ||
errors.Is(err, ErrNodeNotFound) ||
errors.Is(err, ErrNodeAccessDenied))
}
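
finalError above keeps the most informative error seen across retries, with a lower rank preferred: access-denied outranks not-found, which outranks an unhealthy endpoint, which outranks anything else. A self-contained sketch of that ranking; the sentinel errors are local stand-ins for the pool's exported ones:

package main

import (
	"errors"
	"fmt"
)

var (
	errAccessDenied = errors.New("access denied")
	errNotFound     = errors.New("not found")
	errUnhealthy    = errors.New("unhealthy endpoint")
)

func prioErr(err error) int {
	switch {
	case err == nil:
		return -1
	case errors.Is(err, errAccessDenied):
		return 100
	case errors.Is(err, errNotFound):
		return 200
	case errors.Is(err, errUnhealthy):
		return 300
	default:
		return 500
	}
}

// finalError keeps whichever of the two errors has the lower rank.
func finalError(current, candidate error) error {
	if current == nil || candidate == nil {
		return candidate
	}
	if prioErr(candidate) < prioErr(current) {
		return candidate
	}
	return current
}

func main() {
	var fin error
	for _, err := range []error{errUnhealthy, errNotFound, errUnhealthy} {
		fin = finalError(fin, err)
	}
	fmt.Println(fin) // not found: the most specific error seen across retries
}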

View file

@ -156,43 +156,6 @@ func TestRetry(t *testing.T) {
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0)
})
t.Run("error empty result", func(t *testing.T) {
errNodes, index := 2, 0
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
if index < errNodes {
index++
return errNodeEmptyResult
}
return nil
})
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, errNodes)
})
t.Run("error not found", func(t *testing.T) {
errNodes, index := 2, 0
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
if index < errNodes {
index++
return ErrNodeNotFound
}
return nil
})
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, errNodes)
})
t.Run("error access denied", func(t *testing.T) {
var index int
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
index++
return ErrNodeAccessDenied
})
require.ErrorIs(t, err, ErrNodeAccessDenied)
require.Equal(t, 1, index)
checkIndicesAndReset(t, p, 0, 0)
})
}
func TestRebalance(t *testing.T) {

View file

@ -1,10 +1,9 @@
package session_test
import (
"crypto/rand"
"fmt"
"math"
mrand "math/rand"
"math/rand"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
@ -397,7 +396,7 @@ func TestContainer_AppliedTo(t *testing.T) {
func TestContainer_InvalidAt(t *testing.T) {
var x session.Container
nbf := mrand.Uint64()
nbf := rand.Uint64()
if nbf == math.MaxUint64 {
nbf--
}

View file

@ -1,7 +1,7 @@
package user_test
import (
"crypto/rand"
"math/rand"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"