Compare commits

..

No commits in common. "fc4551b84341ab45cf856b88713c1dbfa01afad7" and "6fdbe755179efb5fc5cdf5f1abc78cb4e982c3c8" have entirely different histories.

52 changed files with 654 additions and 1465 deletions

View file

@ -25,7 +25,7 @@ linters-settings:
# report about shadowed variables # report about shadowed variables
check-shadowing: false check-shadowing: false
staticcheck: staticcheck:
checks: ["all"] checks: ["all", "-SA1019"] # TODO Enable SA1019 after deprecated warning are fixed.
funlen: funlen:
lines: 80 # default 60 lines: 80 # default 60
statements: 60 # default 40 statements: 60 # default 40

View file

@ -24,17 +24,17 @@ type Checksum refs.Checksum
// Type represents the enumeration // Type represents the enumeration
// of checksum types. // of checksum types.
type Type refs.ChecksumType type Type uint8
const ( const (
// Unknown is an undefined checksum type. // Unknown is an undefined checksum type.
Unknown Type = Type(refs.UnknownChecksum) Unknown Type = iota
// SHA256 is a SHA256 checksum type. // SHA256 is a SHA256 checksum type.
SHA256 = Type(refs.SHA256) SHA256
// TZ is a Tillich-Zémor checksum type. // TZ is a Tillich-Zémor checksum type.
TZ = Type(refs.TillichZemor) TZ
) )
// ReadFromV2 reads Checksum from the refs.Checksum message. Checks if the // ReadFromV2 reads Checksum from the refs.Checksum message. Checks if the

View file

@ -2,9 +2,9 @@ package checksum
import ( import (
"bytes" "bytes"
"crypto/rand"
"crypto/sha256" "crypto/sha256"
"fmt" "fmt"
"math/rand"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
) )

View file

@ -1,8 +1,8 @@
package checksumtest package checksumtest
import ( import (
"crypto/rand"
"crypto/sha256" "crypto/sha256"
"math/rand"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
) )
@ -11,7 +11,7 @@ import (
func Checksum() checksum.Checksum { func Checksum() checksum.Checksum {
var cs [sha256.Size]byte var cs [sha256.Size]byte
_, _ = rand.Read(cs[:]) rand.Read(cs[:])
var x checksum.Checksum var x checksum.Checksum

View file

@ -47,8 +47,6 @@ func writeXHeadersToMeta(xHeaders []string, h *v2session.RequestMetaHeader) {
return return
} }
// TODO (aarifullin): remove the panic when all client parameters will check XHeaders
// within buildRequest invocation.
if len(xHeaders)%2 != 0 { if len(xHeaders)%2 != 0 {
panic("slice of X-Headers with odd length") panic("slice of X-Headers with odd length")
} }

View file

@ -18,20 +18,20 @@ import (
// PrmContainerSetEACL groups parameters of ContainerSetEACL operation. // PrmContainerSetEACL groups parameters of ContainerSetEACL operation.
type PrmContainerSetEACL struct { type PrmContainerSetEACL struct {
// FrostFS request X-Headers. prmCommonMeta
XHeaders []string
Table *eacl.Table tableSet bool
table eacl.Table
Session *session.Container sessionSet bool
session session.Container
} }
// SetTable sets eACL table structure to be set for the container. // SetTable sets eACL table structure to be set for the container.
// Required parameter. // Required parameter.
//
// Deprecated: Use PrmContainerSetEACL.Table instead.
func (x *PrmContainerSetEACL) SetTable(table eacl.Table) { func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
x.Table = &table x.table = table
x.tableSet = true
} }
// WithinSession specifies session within which extended ACL of the container // WithinSession specifies session within which extended ACL of the container
@ -45,22 +45,17 @@ func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
// for which extended ACL is going to be set // for which extended ACL is going to be set
// - session operation MUST be session.VerbContainerSetEACL (ForVerb) // - session operation MUST be session.VerbContainerSetEACL (ForVerb)
// - token MUST be signed using private key of the owner of the container to be saved // - token MUST be signed using private key of the owner of the container to be saved
//
// Deprecated: Use PrmContainerSetEACL.Session instead.
func (x *PrmContainerSetEACL) WithinSession(s session.Container) { func (x *PrmContainerSetEACL) WithinSession(s session.Container) {
x.Session = &s x.session = s
x.sessionSet = true
} }
func (x *PrmContainerSetEACL) buildRequest(c *Client) (*v2container.SetExtendedACLRequest, error) { func (x *PrmContainerSetEACL) buildRequest(c *Client) (*v2container.SetExtendedACLRequest, error) {
if x.Table == nil { if !x.tableSet {
return nil, errorEACLTableNotSet return nil, errorEACLTableNotSet
} }
if len(x.XHeaders)%2 != 0 { eaclV2 := x.table.ToV2()
return nil, errorInvalidXHeaders
}
eaclV2 := x.Table.ToV2()
var sig frostfscrypto.Signature var sig frostfscrypto.Signature
@ -77,11 +72,11 @@ func (x *PrmContainerSetEACL) buildRequest(c *Client) (*v2container.SetExtendedA
reqBody.SetSignature(&sigv2) reqBody.SetSignature(&sigv2)
var meta v2session.RequestMetaHeader var meta v2session.RequestMetaHeader
writeXHeadersToMeta(x.XHeaders, &meta) writeXHeadersToMeta(x.prmCommonMeta.xHeaders, &meta)
if x.Session != nil { if x.sessionSet {
var tokv2 v2session.Token var tokv2 v2session.Token
x.Session.WriteToV2(&tokv2) x.session.WriteToV2(&tokv2)
meta.SetSessionToken(&tokv2) meta.SetSessionToken(&tokv2)
} }

View file

@ -14,29 +14,27 @@ import (
// PrmAnnounceSpace groups parameters of ContainerAnnounceUsedSpace operation. // PrmAnnounceSpace groups parameters of ContainerAnnounceUsedSpace operation.
type PrmAnnounceSpace struct { type PrmAnnounceSpace struct {
XHeaders []string prmCommonMeta
Announcements []container.SizeEstimation announcements []container.SizeEstimation
} }
// SetValues sets values describing volume of space that is used for the container objects. // SetValues sets values describing volume of space that is used for the container objects.
// Required parameter. Must not be empty. // Required parameter. Must not be empty.
// //
// Must not be mutated before the end of the operation. // Must not be mutated before the end of the operation.
//
// Deprecated: Use PrmAnnounceSpace.Announcements instead.
func (x *PrmAnnounceSpace) SetValues(vs []container.SizeEstimation) { func (x *PrmAnnounceSpace) SetValues(vs []container.SizeEstimation) {
x.Announcements = vs x.announcements = vs
} }
func (x *PrmAnnounceSpace) buildRequest(c *Client) (*v2container.AnnounceUsedSpaceRequest, error) { func (x *PrmAnnounceSpace) buildRequest(c *Client) (*v2container.AnnounceUsedSpaceRequest, error) {
if len(x.Announcements) == 0 { if len(x.announcements) == 0 {
return nil, errorMissingAnnouncements return nil, errorMissingAnnouncements
} }
v2announce := make([]v2container.UsedSpaceAnnouncement, len(x.Announcements)) v2announce := make([]v2container.UsedSpaceAnnouncement, len(x.announcements))
for i := range x.Announcements { for i := range x.announcements {
x.Announcements[i].WriteToV2(&v2announce[i]) x.announcements[i].WriteToV2(&v2announce[i])
} }
reqBody := new(v2container.AnnounceUsedSpaceRequestBody) reqBody := new(v2container.AnnounceUsedSpaceRequestBody)

View file

@ -16,12 +16,12 @@ import (
// PrmEndpointInfo groups parameters of EndpointInfo operation. // PrmEndpointInfo groups parameters of EndpointInfo operation.
type PrmEndpointInfo struct { type PrmEndpointInfo struct {
XHeaders []string prmCommonMeta
} }
func (x *PrmEndpointInfo) buildRequest(c *Client) (*v2netmap.LocalNodeInfoRequest, error) { func (x *PrmEndpointInfo) buildRequest(c *Client) (*v2netmap.LocalNodeInfoRequest, error) {
meta := new(v2session.RequestMetaHeader) meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(x.XHeaders, meta) writeXHeadersToMeta(x.xHeaders, meta)
req := new(v2netmap.LocalNodeInfoRequest) req := new(v2netmap.LocalNodeInfoRequest)
req.SetBody(new(v2netmap.LocalNodeInfoRequestBody)) req.SetBody(new(v2netmap.LocalNodeInfoRequestBody))
@ -112,12 +112,12 @@ func (c *Client) EndpointInfo(ctx context.Context, prm PrmEndpointInfo) (*ResEnd
// PrmNetworkInfo groups parameters of NetworkInfo operation. // PrmNetworkInfo groups parameters of NetworkInfo operation.
type PrmNetworkInfo struct { type PrmNetworkInfo struct {
XHeaders []string prmCommonMeta
} }
func (x PrmNetworkInfo) buildRequest(c *Client) (*v2netmap.NetworkInfoRequest, error) { func (x PrmNetworkInfo) buildRequest(c *Client) (*v2netmap.NetworkInfoRequest, error) {
meta := new(v2session.RequestMetaHeader) meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(x.XHeaders, meta) writeXHeadersToMeta(x.xHeaders, meta)
var req v2netmap.NetworkInfoRequest var req v2netmap.NetworkInfoRequest
req.SetBody(new(v2netmap.NetworkInfoRequestBody)) req.SetBody(new(v2netmap.NetworkInfoRequestBody))

View file

@ -21,25 +21,71 @@ import (
// PrmObjectDelete groups parameters of ObjectDelete operation. // PrmObjectDelete groups parameters of ObjectDelete operation.
type PrmObjectDelete struct { type PrmObjectDelete struct {
XHeaders []string meta v2session.RequestMetaHeader
BearerToken *bearer.Token body v2object.DeleteRequestBody
Session *session.Object addr v2refs.Address
ContainerID *cid.ID keySet bool
key ecdsa.PrivateKey
}
ObjectID *oid.ID // WithinSession specifies session within which object should be read.
//
// Creator of the session acquires the authorship of the request.
// This may affect the execution of an operation (e.g. access control).
//
// Must be signed.
func (x *PrmObjectDelete) WithinSession(t session.Object) {
var tv2 v2session.Token
t.WriteToV2(&tv2)
Key *ecdsa.PrivateKey x.meta.SetSessionToken(&tv2)
}
// WithBearerToken attaches bearer token to be used for the operation.
//
// If set, underlying eACL rules will be used in access control.
//
// Must be signed.
func (x *PrmObjectDelete) WithBearerToken(t bearer.Token) {
var v2token acl.BearerToken
t.WriteToV2(&v2token)
x.meta.SetBearerToken(&v2token)
}
// FromContainer specifies FrostFS container of the object.
// Required parameter.
func (x *PrmObjectDelete) FromContainer(id cid.ID) {
var cidV2 v2refs.ContainerID
id.WriteToV2(&cidV2)
x.addr.SetContainerID(&cidV2)
}
// ByID specifies identifier of the requested object.
// Required parameter.
func (x *PrmObjectDelete) ByID(id oid.ID) {
var idV2 v2refs.ObjectID
id.WriteToV2(&idV2)
x.addr.SetObjectID(&idV2)
} }
// UseKey specifies private key to sign the requests. // UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used. // If key is not provided, then Client default key is used.
func (x *PrmObjectDelete) UseKey(key ecdsa.PrivateKey) {
x.keySet = true
x.key = key
}
// WithXHeaders specifies list of extended headers (string key-value pairs)
// to be attached to the request. Must have an even length.
// //
// Deprecated: Use PrmObjectDelete.Key instead. // Slice must not be mutated until the operation completes.
func (prm *PrmObjectDelete) UseKey(key ecdsa.PrivateKey) { func (x *PrmObjectDelete) WithXHeaders(hs ...string) {
prm.Key = &key writeXHeadersToMeta(hs, &x.meta)
} }
// ResObjectDelete groups resulting values of ObjectDelete operation. // ResObjectDelete groups resulting values of ObjectDelete operation.
@ -54,54 +100,6 @@ func (x ResObjectDelete) Tombstone() oid.ID {
return x.tomb return x.tomb
} }
func (prm *PrmObjectDelete) buildRequest(c *Client) (*v2object.DeleteRequest, error) {
if prm.ContainerID == nil {
return nil, errorMissingContainer
}
if prm.ObjectID == nil {
return nil, errorMissingObject
}
if len(prm.XHeaders)%2 != 0 {
return nil, errorInvalidXHeaders
}
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(prm.XHeaders, meta)
if prm.BearerToken != nil {
v2BearerToken := new(acl.BearerToken)
prm.BearerToken.WriteToV2(v2BearerToken)
meta.SetBearerToken(v2BearerToken)
}
if prm.Session != nil {
v2SessionToken := new(v2session.Token)
prm.Session.WriteToV2(v2SessionToken)
meta.SetSessionToken(v2SessionToken)
}
addr := new(v2refs.Address)
cnrV2 := new(v2refs.ContainerID)
prm.ContainerID.WriteToV2(cnrV2)
addr.SetContainerID(cnrV2)
objV2 := new(v2refs.ObjectID)
prm.ObjectID.WriteToV2(objV2)
addr.SetObjectID(objV2)
body := new(v2object.DeleteRequestBody)
body.SetAddress(addr)
req := new(v2object.DeleteRequest)
req.SetBody(body)
c.prepareRequest(req, meta)
return req, nil
}
// ObjectDelete marks an object for deletion from the container using FrostFS API protocol. // ObjectDelete marks an object for deletion from the container using FrostFS API protocol.
// As a marker, a special unit called a tombstone is placed in the container. // As a marker, a special unit called a tombstone is placed in the container.
// It confirms the user's intent to delete the object, and is itself a container object. // It confirms the user's intent to delete the object, and is itself a container object.
@ -126,22 +124,32 @@ func (prm *PrmObjectDelete) buildRequest(c *Client) (*v2object.DeleteRequest, er
// - *apistatus.ObjectLocked; // - *apistatus.ObjectLocked;
// - *apistatus.SessionTokenExpired. // - *apistatus.SessionTokenExpired.
func (c *Client) ObjectDelete(ctx context.Context, prm PrmObjectDelete) (*ResObjectDelete, error) { func (c *Client) ObjectDelete(ctx context.Context, prm PrmObjectDelete) (*ResObjectDelete, error) {
req, err := prm.buildRequest(c) switch {
if err != nil { case prm.addr.GetContainerID() == nil:
return nil, err return nil, errorMissingContainer
case prm.addr.GetObjectID() == nil:
return nil, errorMissingObject
} }
// form request body
prm.body.SetAddress(&prm.addr)
// form request
var req v2object.DeleteRequest
req.SetBody(&prm.body)
c.prepareRequest(&req, &prm.meta)
key := c.prm.key key := c.prm.key
if prm.Key != nil { if prm.keySet {
key = *prm.Key key = prm.key
} }
err = signature.SignServiceMessage(&key, req) err := signature.SignServiceMessage(&key, &req)
if err != nil { if err != nil {
return nil, fmt.Errorf("sign request: %w", err) return nil, fmt.Errorf("sign request: %w", err)
} }
resp, err := rpcapi.DeleteObject(&c.c, req, client.WithContext(ctx)) resp, err := rpcapi.DeleteObject(&c.c, &req, client.WithContext(ctx))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -22,76 +22,77 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
) )
// shared parameters of GET/HEAD/RANGE.
type prmObjectRead struct {
meta v2session.RequestMetaHeader
raw bool
addr v2refs.Address
}
// WithXHeaders specifies list of extended headers (string key-value pairs)
// to be attached to the request. Must have an even length.
//
// Slice must not be mutated until the operation completes.
func (x *prmObjectRead) WithXHeaders(hs ...string) {
writeXHeadersToMeta(hs, &x.meta)
}
// MarkRaw marks an intent to read physically stored object.
func (x *prmObjectRead) MarkRaw() {
x.raw = true
}
// MarkLocal tells the server to execute the operation locally.
func (x *prmObjectRead) MarkLocal() {
x.meta.SetTTL(1)
}
// WithinSession specifies session within which object should be read.
//
// Creator of the session acquires the authorship of the request.
// This may affect the execution of an operation (e.g. access control).
//
// Must be signed.
func (x *prmObjectRead) WithinSession(t session.Object) {
var tokv2 v2session.Token
t.WriteToV2(&tokv2)
x.meta.SetSessionToken(&tokv2)
}
// WithBearerToken attaches bearer token to be used for the operation.
//
// If set, underlying eACL rules will be used in access control.
//
// Must be signed.
func (x *prmObjectRead) WithBearerToken(t bearer.Token) {
var v2token acl.BearerToken
t.WriteToV2(&v2token)
x.meta.SetBearerToken(&v2token)
}
// FromContainer specifies FrostFS container of the object.
// Required parameter.
func (x *prmObjectRead) FromContainer(id cid.ID) {
var cnrV2 v2refs.ContainerID
id.WriteToV2(&cnrV2)
x.addr.SetContainerID(&cnrV2)
}
// ByID specifies identifier of the requested object.
// Required parameter.
func (x *prmObjectRead) ByID(id oid.ID) {
var objV2 v2refs.ObjectID
id.WriteToV2(&objV2)
x.addr.SetObjectID(&objV2)
}
// PrmObjectGet groups parameters of ObjectGetInit operation. // PrmObjectGet groups parameters of ObjectGetInit operation.
type PrmObjectGet struct { type PrmObjectGet struct {
XHeaders []string prmObjectRead
BearerToken *bearer.Token key *ecdsa.PrivateKey
Session *session.Object
Raw bool
Local bool
ContainerID *cid.ID
ObjectID *oid.ID
Key *ecdsa.PrivateKey
}
func (prm *PrmObjectGet) buildRequest(c *Client) (*v2object.GetRequest, error) {
if prm.ContainerID == nil {
return nil, errorMissingContainer
}
if prm.ObjectID == nil {
return nil, errorMissingObject
}
if len(prm.XHeaders)%2 != 0 {
return nil, errorInvalidXHeaders
}
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(prm.XHeaders, meta)
if prm.BearerToken != nil {
v2BearerToken := new(acl.BearerToken)
prm.BearerToken.WriteToV2(v2BearerToken)
meta.SetBearerToken(v2BearerToken)
}
if prm.Session != nil {
v2SessionToken := new(v2session.Token)
prm.Session.WriteToV2(v2SessionToken)
meta.SetSessionToken(v2SessionToken)
}
if prm.Local {
meta.SetTTL(1)
}
addr := new(v2refs.Address)
cnrV2 := new(v2refs.ContainerID)
prm.ContainerID.WriteToV2(cnrV2)
addr.SetContainerID(cnrV2)
objV2 := new(v2refs.ObjectID)
prm.ObjectID.WriteToV2(objV2)
addr.SetObjectID(objV2)
body := new(v2object.GetRequestBody)
body.SetRaw(prm.Raw)
body.SetAddress(addr)
req := new(v2object.GetRequest)
req.SetBody(body)
c.prepareRequest(req, meta)
return req, nil
} }
// ResObjectGet groups the final result values of ObjectGetInit operation. // ResObjectGet groups the final result values of ObjectGetInit operation.
@ -121,10 +122,8 @@ type ObjectReader struct {
// UseKey specifies private key to sign the requests. // UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used. // If key is not provided, then Client default key is used.
// func (x *PrmObjectGet) UseKey(key ecdsa.PrivateKey) {
// Deprecated: Use PrmObjectGet.Key instead. x.key = &key
func (prm *PrmObjectGet) UseKey(key ecdsa.PrivateKey) {
prm.Key = &key
} }
// ReadHeader reads header of the object. Result means success. // ReadHeader reads header of the object. Result means success.
@ -300,24 +299,39 @@ func (x *ObjectReader) Read(p []byte) (int, error) {
// Returns an error if parameters are set incorrectly (see PrmObjectGet docs). // Returns an error if parameters are set incorrectly (see PrmObjectGet docs).
// Context is required and must not be nil. It is used for network communication. // Context is required and must not be nil. It is used for network communication.
func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectReader, error) { func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectReader, error) {
req, err := prm.buildRequest(c) // check parameters
if err != nil { switch {
return nil, err case prm.addr.GetContainerID() == nil:
return nil, errorMissingContainer
case prm.addr.GetObjectID() == nil:
return nil, errorMissingObject
} }
key := prm.Key // form request body
var body v2object.GetRequestBody
body.SetRaw(prm.raw)
body.SetAddress(&prm.addr)
// form request
var req v2object.GetRequest
req.SetBody(&body)
c.prepareRequest(&req, &prm.meta)
key := prm.key
if key == nil { if key == nil {
key = &c.prm.key key = &c.prm.key
} }
err = signature.SignServiceMessage(key, req) err := signature.SignServiceMessage(key, &req)
if err != nil { if err != nil {
return nil, fmt.Errorf("sign request: %w", err) return nil, fmt.Errorf("sign request: %w", err)
} }
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
stream, err := rpcapi.GetObject(&c.c, req, client.WithContext(ctx)) stream, err := rpcapi.GetObject(&c.c, &req, client.WithContext(ctx))
if err != nil { if err != nil {
cancel() cancel()
return nil, fmt.Errorf("open stream: %w", err) return nil, fmt.Errorf("open stream: %w", err)
@ -333,29 +347,17 @@ func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectRe
// PrmObjectHead groups parameters of ObjectHead operation. // PrmObjectHead groups parameters of ObjectHead operation.
type PrmObjectHead struct { type PrmObjectHead struct {
XHeaders []string prmObjectRead
BearerToken *bearer.Token keySet bool
key ecdsa.PrivateKey
Session *session.Object
Raw bool
Local bool
ContainerID *cid.ID
ObjectID *oid.ID
Key *ecdsa.PrivateKey
} }
// UseKey specifies private key to sign the requests. // UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used. // If key is not provided, then Client default key is used.
// func (x *PrmObjectHead) UseKey(key ecdsa.PrivateKey) {
// Deprecated: Use PrmObjectHead.Key instead. x.keySet = true
func (prm *PrmObjectHead) UseKey(key ecdsa.PrivateKey) { x.key = key
prm.Key = &key
} }
// ResObjectHead groups resulting values of ObjectHead operation. // ResObjectHead groups resulting values of ObjectHead operation.
@ -388,58 +390,6 @@ func (x *ResObjectHead) ReadHeader(dst *object.Object) bool {
return true return true
} }
func (prm *PrmObjectHead) buildRequest(c *Client) (*v2object.HeadRequest, error) {
if prm.ContainerID == nil {
return nil, errorMissingContainer
}
if prm.ObjectID == nil {
return nil, errorMissingObject
}
if len(prm.XHeaders)%2 != 0 {
return nil, errorInvalidXHeaders
}
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(prm.XHeaders, meta)
if prm.BearerToken != nil {
v2BearerToken := new(acl.BearerToken)
prm.BearerToken.WriteToV2(v2BearerToken)
meta.SetBearerToken(v2BearerToken)
}
if prm.Session != nil {
v2SessionToken := new(v2session.Token)
prm.Session.WriteToV2(v2SessionToken)
meta.SetSessionToken(v2SessionToken)
}
if prm.Local {
meta.SetTTL(1)
}
addr := new(v2refs.Address)
cnrV2 := new(v2refs.ContainerID)
prm.ContainerID.WriteToV2(cnrV2)
addr.SetContainerID(cnrV2)
objV2 := new(v2refs.ObjectID)
prm.ObjectID.WriteToV2(objV2)
addr.SetObjectID(objV2)
body := new(v2object.HeadRequestBody)
body.SetRaw(prm.Raw)
body.SetAddress(addr)
req := new(v2object.HeadRequest)
req.SetBody(body)
c.prepareRequest(req, meta)
return req, nil
}
// ObjectHead reads object header through a remote server using FrostFS API protocol. // ObjectHead reads object header through a remote server using FrostFS API protocol.
// //
// Exactly one return value is non-nil. By default, server status is returned in res structure. // Exactly one return value is non-nil. By default, server status is returned in res structure.
@ -463,24 +413,33 @@ func (prm *PrmObjectHead) buildRequest(c *Client) (*v2object.HeadRequest, error)
// - *apistatus.ObjectAlreadyRemoved; // - *apistatus.ObjectAlreadyRemoved;
// - *apistatus.SessionTokenExpired. // - *apistatus.SessionTokenExpired.
func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectHead, error) { func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectHead, error) {
req, err := prm.buildRequest(c) switch {
if err != nil { case prm.addr.GetContainerID() == nil:
return nil, err return nil, errorMissingContainer
case prm.addr.GetObjectID() == nil:
return nil, errorMissingObject
} }
var body v2object.HeadRequestBody
body.SetRaw(prm.raw)
body.SetAddress(&prm.addr)
var req v2object.HeadRequest
req.SetBody(&body)
c.prepareRequest(&req, &prm.meta)
key := c.prm.key key := c.prm.key
if prm.Key != nil { if prm.keySet {
key = *prm.Key key = prm.key
} }
// sign the request // sign the request
err := signature.SignServiceMessage(&key, &req)
err = signature.SignServiceMessage(&key, req)
if err != nil { if err != nil {
return nil, fmt.Errorf("sign request: %w", err) return nil, fmt.Errorf("sign request: %w", err)
} }
resp, err := rpcapi.HeadObject(&c.c, req, client.WithContext(ctx)) resp, err := rpcapi.HeadObject(&c.c, &req, client.WithContext(ctx))
if err != nil { if err != nil {
return nil, fmt.Errorf("write request: %w", err) return nil, fmt.Errorf("write request: %w", err)
} }
@ -495,7 +454,7 @@ func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectH
return &res, nil return &res, nil
} }
res.idObj = *prm.ObjectID _ = res.idObj.ReadFromV2(*prm.addr.GetObjectID())
switch v := resp.GetBody().GetHeaderPart().(type) { switch v := resp.GetBody().GetHeaderPart().(type) {
default: default:
@ -511,95 +470,29 @@ func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectH
// PrmObjectRange groups parameters of ObjectRange operation. // PrmObjectRange groups parameters of ObjectRange operation.
type PrmObjectRange struct { type PrmObjectRange struct {
XHeaders []string prmObjectRead
BearerToken *bearer.Token key *ecdsa.PrivateKey
Session *session.Object rng v2object.Range
Raw bool
Local bool
ContainerID *cid.ID
ObjectID *oid.ID
Key *ecdsa.PrivateKey
Offset uint64
Length uint64
} }
func (prm *PrmObjectRange) buildRequest(c *Client) (*v2object.GetRangeRequest, error) { // SetOffset sets offset of the payload range to be read.
if prm.Length == 0 { // Zero by default.
return nil, errorZeroRangeLength func (x *PrmObjectRange) SetOffset(off uint64) {
x.rng.SetOffset(off)
} }
if prm.ContainerID == nil { // SetLength sets length of the payload range to be read.
return nil, errorMissingContainer // Must be positive.
} func (x *PrmObjectRange) SetLength(ln uint64) {
x.rng.SetLength(ln)
if prm.ObjectID == nil {
return nil, errorMissingObject
}
if len(prm.XHeaders)%2 != 0 {
return nil, errorInvalidXHeaders
}
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(prm.XHeaders, meta)
if prm.BearerToken != nil {
v2BearerToken := new(acl.BearerToken)
prm.BearerToken.WriteToV2(v2BearerToken)
meta.SetBearerToken(v2BearerToken)
}
if prm.Session != nil {
v2SessionToken := new(v2session.Token)
prm.Session.WriteToV2(v2SessionToken)
meta.SetSessionToken(v2SessionToken)
}
if prm.Local {
meta.SetTTL(1)
}
addr := new(v2refs.Address)
cnrV2 := new(v2refs.ContainerID)
prm.ContainerID.WriteToV2(cnrV2)
addr.SetContainerID(cnrV2)
objV2 := new(v2refs.ObjectID)
prm.ObjectID.WriteToV2(objV2)
addr.SetObjectID(objV2)
rng := new(v2object.Range)
rng.SetLength(prm.Length)
rng.SetOffset(prm.Offset)
body := new(v2object.GetRangeRequestBody)
body.SetRaw(prm.Raw)
body.SetAddress(addr)
body.SetRange(rng)
req := new(v2object.GetRangeRequest)
req.SetBody(body)
c.prepareRequest(req, meta)
return req, nil
} }
// UseKey specifies private key to sign the requests. // UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used. // If key is not provided, then Client default key is used.
// func (x *PrmObjectRange) UseKey(key ecdsa.PrivateKey) {
// Deprecated: Use PrmObjectRange.Key instead. x.key = &key
func (prm *PrmObjectRange) UseKey(key ecdsa.PrivateKey) {
prm.Key = &key
} }
// ResObjectRange groups the final result values of ObjectRange operation. // ResObjectRange groups the final result values of ObjectRange operation.
@ -769,31 +662,49 @@ func (x *ObjectRangeReader) Read(p []byte) (int, error) {
// Returns an error if parameters are set incorrectly (see PrmObjectRange docs). // Returns an error if parameters are set incorrectly (see PrmObjectRange docs).
// Context is required and must not be nil. It is used for network communication. // Context is required and must not be nil. It is used for network communication.
func (c *Client) ObjectRangeInit(ctx context.Context, prm PrmObjectRange) (*ObjectRangeReader, error) { func (c *Client) ObjectRangeInit(ctx context.Context, prm PrmObjectRange) (*ObjectRangeReader, error) {
req, err := prm.buildRequest(c) // check parameters
if err != nil { switch {
return nil, err case prm.addr.GetContainerID() == nil:
return nil, errorMissingContainer
case prm.addr.GetObjectID() == nil:
return nil, errorMissingObject
case prm.rng.GetLength() == 0:
return nil, errorZeroRangeLength
} }
key := prm.Key // form request body
var body v2object.GetRangeRequestBody
body.SetRaw(prm.raw)
body.SetAddress(&prm.addr)
body.SetRange(&prm.rng)
// form request
var req v2object.GetRangeRequest
req.SetBody(&body)
c.prepareRequest(&req, &prm.meta)
key := prm.key
if key == nil { if key == nil {
key = &c.prm.key key = &c.prm.key
} }
err = signature.SignServiceMessage(key, req) err := signature.SignServiceMessage(key, &req)
if err != nil { if err != nil {
return nil, fmt.Errorf("sign request: %w", err) return nil, fmt.Errorf("sign request: %w", err)
} }
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
stream, err := rpcapi.GetObjectRange(&c.c, req, client.WithContext(ctx)) stream, err := rpcapi.GetObjectRange(&c.c, &req, client.WithContext(ctx))
if err != nil { if err != nil {
cancel() cancel()
return nil, fmt.Errorf("open stream: %w", err) return nil, fmt.Errorf("open stream: %w", err)
} }
var r ObjectRangeReader var r ObjectRangeReader
r.remainingPayloadLen = int(prm.Length) r.remainingPayloadLen = int(prm.rng.GetLength())
r.cancelCtxStream = cancel r.cancelCtxStream = cancel
r.stream = stream r.stream = stream
r.client = c r.client = c

View file

@ -13,53 +13,121 @@ import (
v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
) )
// PrmObjectHash groups parameters of ObjectHash operation. // PrmObjectHash groups parameters of ObjectHash operation.
type PrmObjectHash struct { type PrmObjectHash struct {
XHeaders []string meta v2session.RequestMetaHeader
BearerToken *bearer.Token body v2object.GetRangeHashRequestBody
Session *session.Object csAlgo v2refs.ChecksumType
Local bool addr v2refs.Address
Ranges []object.Range keySet bool
key ecdsa.PrivateKey
Salt []byte
ChecksumType checksum.Type
ContainerID *cid.ID
ObjectID *oid.ID
Key *ecdsa.PrivateKey
} }
// UseKey specifies private key to sign the requests. // UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used. // If key is not provided, then Client default key is used.
func (x *PrmObjectHash) UseKey(key ecdsa.PrivateKey) {
x.keySet = true
x.key = key
}
// MarkLocal tells the server to execute the operation locally.
func (x *PrmObjectHash) MarkLocal() {
x.meta.SetTTL(1)
}
// WithinSession specifies session within which object should be read.
// //
// Deprecated: Use PrmObjectHash.Key instead. // Creator of the session acquires the authorship of the request.
func (prm *PrmObjectHash) UseKey(key ecdsa.PrivateKey) { // This may affect the execution of an operation (e.g. access control).
prm.Key = &key //
// Must be signed.
func (x *PrmObjectHash) WithinSession(t session.Object) {
var tv2 v2session.Token
t.WriteToV2(&tv2)
x.meta.SetSessionToken(&tv2)
}
// WithBearerToken attaches bearer token to be used for the operation.
//
// If set, underlying eACL rules will be used in access control.
//
// Must be signed.
func (x *PrmObjectHash) WithBearerToken(t bearer.Token) {
var v2token acl.BearerToken
t.WriteToV2(&v2token)
x.meta.SetBearerToken(&v2token)
}
// FromContainer specifies FrostFS container of the object.
// Required parameter.
func (x *PrmObjectHash) FromContainer(id cid.ID) {
var cidV2 v2refs.ContainerID
id.WriteToV2(&cidV2)
x.addr.SetContainerID(&cidV2)
}
// ByID specifies identifier of the requested object.
// Required parameter.
func (x *PrmObjectHash) ByID(id oid.ID) {
var idV2 v2refs.ObjectID
id.WriteToV2(&idV2)
x.addr.SetObjectID(&idV2)
}
// SetRangeList sets list of ranges in (offset, length) pair format.
// Required parameter.
//
// If passed as slice, then it must not be mutated before the operation completes.
func (x *PrmObjectHash) SetRangeList(r ...uint64) {
ln := len(r)
if ln%2 != 0 {
panic("odd number of range parameters")
}
rs := make([]v2object.Range, ln/2)
for i := 0; i < ln/2; i++ {
rs[i].SetOffset(r[2*i])
rs[i].SetLength(r[2*i+1])
}
x.body.SetRanges(rs)
} }
// TillichZemorAlgo changes the hash function to Tillich-Zemor // TillichZemorAlgo changes the hash function to Tillich-Zemor
// (https://link.springer.com/content/pdf/10.1007/3-540-48658-5_5.pdf). // (https://link.springer.com/content/pdf/10.1007/3-540-48658-5_5.pdf).
// //
// By default, SHA256 hash function is used/. // By default, SHA256 hash function is used.
func (x *PrmObjectHash) TillichZemorAlgo() {
x.csAlgo = v2refs.TillichZemor
}
// UseSalt sets the salt to XOR the data range before hashing.
// //
// Deprecated: Use PrmObjectHash.ChecksumType instead. // Must not be mutated before the operation completes.
func (prm *PrmObjectHash) TillichZemorAlgo() { func (x *PrmObjectHash) UseSalt(salt []byte) {
prm.ChecksumType = checksum.TZ x.body.SetSalt(salt)
}
// WithXHeaders specifies list of extended headers (string key-value pairs)
// to be attached to the request. Must have an even length.
//
// Slice must not be mutated until the operation completes.
func (x *PrmObjectHash) WithXHeaders(hs ...string) {
writeXHeadersToMeta(hs, &x.meta)
} }
// ResObjectHash groups resulting values of ObjectHash operation. // ResObjectHash groups resulting values of ObjectHash operation.
@ -74,76 +142,6 @@ func (x ResObjectHash) Checksums() [][]byte {
return x.checksums return x.checksums
} }
func (prm *PrmObjectHash) buildRequest(c *Client) (*v2object.GetRangeHashRequest, error) {
if prm.ContainerID == nil {
return nil, errorMissingContainer
}
if prm.ObjectID == nil {
return nil, errorMissingObject
}
if len(prm.XHeaders)%2 != 0 {
return nil, errorInvalidXHeaders
}
if len(prm.Ranges) == 0 {
return nil, errorMissingRanges
}
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(prm.XHeaders, meta)
if prm.BearerToken != nil {
v2BearerToken := new(acl.BearerToken)
prm.BearerToken.WriteToV2(v2BearerToken)
meta.SetBearerToken(v2BearerToken)
}
if prm.Session != nil {
v2SessionToken := new(v2session.Token)
prm.Session.WriteToV2(v2SessionToken)
meta.SetSessionToken(v2SessionToken)
}
if prm.Local {
meta.SetTTL(1)
}
addr := new(v2refs.Address)
cnrV2 := new(v2refs.ContainerID)
prm.ContainerID.WriteToV2(cnrV2)
addr.SetContainerID(cnrV2)
objV2 := new(v2refs.ObjectID)
prm.ObjectID.WriteToV2(objV2)
addr.SetObjectID(objV2)
rs := make([]v2object.Range, len(prm.Ranges))
for i := range prm.Ranges {
rs[i].SetOffset(prm.Ranges[i].GetOffset())
rs[i].SetLength(prm.Ranges[i].GetLength())
}
body := new(v2object.GetRangeHashRequestBody)
body.SetAddress(addr)
body.SetRanges(rs)
body.SetSalt(prm.Salt)
if prm.ChecksumType == checksum.Unknown {
body.SetType(v2refs.SHA256)
} else {
body.SetType(v2refs.ChecksumType(prm.ChecksumType))
}
req := new(v2object.GetRangeHashRequest)
req.SetBody(body)
c.prepareRequest(req, meta)
return req, nil
}
// ObjectHash requests checksum of the range list of the object payload using // ObjectHash requests checksum of the range list of the object payload using
// FrostFS API protocol. // FrostFS API protocol.
// //
@ -167,22 +165,37 @@ func (prm *PrmObjectHash) buildRequest(c *Client) (*v2object.GetRangeHashRequest
// - *apistatus.ObjectOutOfRange; // - *apistatus.ObjectOutOfRange;
// - *apistatus.SessionTokenExpired. // - *apistatus.SessionTokenExpired.
func (c *Client) ObjectHash(ctx context.Context, prm PrmObjectHash) (*ResObjectHash, error) { func (c *Client) ObjectHash(ctx context.Context, prm PrmObjectHash) (*ResObjectHash, error) {
req, err := prm.buildRequest(c) switch {
if err != nil { case prm.addr.GetContainerID() == nil:
return nil, err return nil, errorMissingContainer
case prm.addr.GetObjectID() == nil:
return nil, errorMissingObject
case len(prm.body.GetRanges()) == 0:
return nil, errorMissingRanges
} }
prm.body.SetAddress(&prm.addr)
if prm.csAlgo == v2refs.UnknownChecksum {
prm.body.SetType(v2refs.SHA256)
} else {
prm.body.SetType(prm.csAlgo)
}
var req v2object.GetRangeHashRequest
c.prepareRequest(&req, &prm.meta)
req.SetBody(&prm.body)
key := c.prm.key key := c.prm.key
if prm.Key != nil { if prm.keySet {
key = *prm.Key key = prm.key
} }
err = signature.SignServiceMessage(&key, req) err := signature.SignServiceMessage(&key, &req)
if err != nil { if err != nil {
return nil, fmt.Errorf("sign request: %w", err) return nil, fmt.Errorf("sign request: %w", err)
} }
resp, err := rpcapi.HashObjectRange(&c.c, req, client.WithContext(ctx)) resp, err := rpcapi.HashObjectRange(&c.c, &req, client.WithContext(ctx))
if err != nil { if err != nil {
return nil, fmt.Errorf("write request: %w", err) return nil, fmt.Errorf("write request: %w", err)
} }

View file

@ -16,32 +16,30 @@ import (
// PrmSessionCreate groups parameters of SessionCreate operation. // PrmSessionCreate groups parameters of SessionCreate operation.
type PrmSessionCreate struct { type PrmSessionCreate struct {
XHeaders []string prmCommonMeta
Expiration uint64 exp uint64
Key *ecdsa.PrivateKey keySet bool
key ecdsa.PrivateKey
} }
// SetExp sets number of the last NepFS epoch in the lifetime of the session after which it will be expired. // SetExp sets number of the last NepFS epoch in the lifetime of the session after which it will be expired.
//
// Deprecated: Use PrmSessionCreate.Expiration instead.
func (x *PrmSessionCreate) SetExp(exp uint64) { func (x *PrmSessionCreate) SetExp(exp uint64) {
x.Expiration = exp x.exp = exp
} }
// UseKey specifies private key to sign the requests and compute token owner. // UseKey specifies private key to sign the requests and compute token owner.
// If key is not provided, then Client default key is used. // If key is not provided, then Client default key is used.
//
// Deprecated: Use PrmSessionCreate.Key instead.
func (x *PrmSessionCreate) UseKey(key ecdsa.PrivateKey) { func (x *PrmSessionCreate) UseKey(key ecdsa.PrivateKey) {
x.Key = &key x.keySet = true
x.key = key
} }
func (x *PrmSessionCreate) buildRequest(c *Client) (*v2session.CreateRequest, error) { func (x *PrmSessionCreate) buildRequest(c *Client) (*v2session.CreateRequest, error) {
ownerKey := c.prm.key.PublicKey ownerKey := c.prm.key.PublicKey
if x.Key != nil { if x.keySet {
ownerKey = x.Key.PublicKey ownerKey = x.key.PublicKey
} }
var ownerID user.ID var ownerID user.ID
user.IDFromKey(&ownerID, ownerKey) user.IDFromKey(&ownerID, ownerKey)
@ -51,10 +49,10 @@ func (x *PrmSessionCreate) buildRequest(c *Client) (*v2session.CreateRequest, er
reqBody := new(v2session.CreateRequestBody) reqBody := new(v2session.CreateRequestBody)
reqBody.SetOwnerID(&ownerIDV2) reqBody.SetOwnerID(&ownerIDV2)
reqBody.SetExpiration(x.Expiration) reqBody.SetExpiration(x.exp)
var meta v2session.RequestMetaHeader var meta v2session.RequestMetaHeader
writeXHeadersToMeta(x.XHeaders, &meta) writeXHeadersToMeta(x.xHeaders, &meta)
var req v2session.CreateRequest var req v2session.CreateRequest
req.SetBody(reqBody) req.SetBody(reqBody)

View file

@ -1,8 +1,8 @@
package cid_test package cid_test
import ( import (
"crypto/rand"
"crypto/sha256" "crypto/sha256"
"math/rand"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"

View file

@ -1,8 +1,8 @@
package cidtest package cidtest
import ( import (
"crypto/rand"
"crypto/sha256" "crypto/sha256"
"math/rand"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
) )
@ -11,7 +11,7 @@ import (
func ID() cid.ID { func ID() cid.ID {
checksum := [sha256.Size]byte{} checksum := [sha256.Size]byte{}
_, _ = rand.Read(checksum[:]) rand.Read(checksum[:])
return IDWithChecksum(checksum) return IDWithChecksum(checksum)
} }

View file

@ -1,7 +1,7 @@
package frostfscrypto_test package frostfscrypto_test
import ( import (
"crypto/rand" "math/rand"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"

View file

@ -131,8 +131,7 @@ Its basic syntax is as follows:
REP <count> {IN <select>} REP <count> {IN <select>}
``` ```
If a select is not specified, then the entire netmap is used as input. The only exception to this rule is when exactly 1 replica and 1 selector are being present: in this case the only selector is being used instead of the whole netmap. If a select is not specified, then the entire netmap is used as input. The resulting nodes will be used to actually store objects and they constitute a replica group (or simply, "a replica").
The resulting nodes will be used to actually store objects and they constitute a replica group (or simply, "a replica").
Examples Examples
```sql ```sql

View file

@ -2,7 +2,7 @@ package eacltest
import ( import (
"bytes" "bytes"
"crypto/rand" "math/rand"
"testing" "testing"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"

View file

@ -1,7 +1,7 @@
package eacl package eacl
import ( import (
"crypto/rand" "math/rand"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"

View file

@ -24,7 +24,6 @@ type (
minAgg struct { minAgg struct {
min float64 min float64
minFound bool
} }
meanIQRAgg struct { meanIQRAgg struct {
@ -103,13 +102,7 @@ func (a *meanAgg) Compute() float64 {
} }
func (a *minAgg) Add(n float64) { func (a *minAgg) Add(n float64) {
if !a.minFound { if a.min == 0 || n < a.min {
a.min = n
a.minFound = true
return
}
if n < a.min {
a.min = n a.min = n
} }
} }

View file

@ -1,44 +0,0 @@
package netmap
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestMinAgg(t *testing.T) {
tests := []struct {
vals []float64
res float64
}{
{
vals: []float64{1, 2, 3, 0, 10},
res: 0,
},
{
vals: []float64{10, 0, 10, 0},
res: 0,
},
{
vals: []float64{0, 1, 2, 3, 10},
res: 0,
},
{
vals: []float64{0, 0, 0, 0},
res: 0,
},
{
vals: []float64{10, 10, 10, 10},
res: 10,
},
}
for _, test := range tests {
a := newMinAgg()
for _, val := range test.vals {
a.Add(val)
}
require.Equal(t, test.res, a.Compute())
}
}

View file

@ -40,10 +40,6 @@ type context struct {
// policy uses the UNIQUE flag. Nodes marked as used are not used in subsequent // policy uses the UNIQUE flag. Nodes marked as used are not used in subsequent
// base selections. // base selections.
usedNodes map[uint64]bool usedNodes map[uint64]bool
// If true, returns an error when netmap does not contain enough nodes for selection.
// By default best effort is taken.
strict bool
} }
// Various validation errors. // Various validation errors.

View file

@ -2,7 +2,6 @@ package netmap
import ( import (
"encoding/json" "encoding/json"
"fmt"
"os" "os"
"path/filepath" "path/filepath"
"testing" "testing"
@ -48,8 +47,7 @@ func TestPlacementPolicy_Interopability(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
for i := range ds { for i := range ds {
filename := filepath.Join(testsDir, ds[i].Name()) bs, err := os.ReadFile(filepath.Join(testsDir, ds[i].Name()))
bs, err := os.ReadFile(filename)
require.NoError(t, err) require.NoError(t, err)
var tc TestCase var tc TestCase
@ -58,7 +56,7 @@ func TestPlacementPolicy_Interopability(t *testing.T) {
srcNodes := make([]NodeInfo, len(tc.Nodes)) srcNodes := make([]NodeInfo, len(tc.Nodes))
copy(srcNodes, tc.Nodes) copy(srcNodes, tc.Nodes)
t.Run(fmt.Sprintf("%s:%s", filename, tc.Name), func(t *testing.T) { t.Run(tc.Name, func(t *testing.T) {
var nm NetMap var nm NetMap
nm.SetNodes(tc.Nodes) nm.SetNodes(tc.Nodes)

View file

@ -146,19 +146,19 @@
"select 3 nodes in 3 distinct countries, same placement": { "select 3 nodes in 3 distinct countries, same placement": {
"policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":1,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]}, "policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":1,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]},
"pivot": "Y29udGFpbmVySUQ=", "pivot": "Y29udGFpbmVySUQ=",
"result": [[0, 2, 3]], "result": [[4, 0, 7]],
"placement": { "placement": {
"pivot": "b2JqZWN0SUQ=", "pivot": "b2JqZWN0SUQ=",
"result": [[0, 2, 3]] "result": [[4, 0, 7]]
} }
}, },
"select 6 nodes in 3 distinct countries, different placement": { "select 6 nodes in 3 distinct countries, different placement": {
"policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":2,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]}, "policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":2,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]},
"pivot": "Y29udGFpbmVySUQ=", "pivot": "Y29udGFpbmVySUQ=",
"result": [[0, 1, 2, 6, 3, 4]], "result": [[4, 3, 0, 1, 7, 2]],
"placement": { "placement": {
"pivot": "b2JqZWN0SUQ=", "pivot": "b2JqZWN0SUQ=",
"result": [[0, 1, 2, 6, 3, 4]] "result": [[4, 3, 0, 7, 2, 1]]
} }
} }
} }

View file

@ -1,95 +0,0 @@
{
"name": "non-strict selections",
"comment": "These test specify loose selection behaviour, to allow fetching already PUT objects even when there is not enough nodes to select from.",
"nodes": [
{
"attributes": [
{
"key": "Country",
"value": "Russia"
}
]
},
{
"attributes": [
{
"key": "Country",
"value": "Germany"
}
]
},
{
"attributes": [ ]
}
],
"tests": {
"not enough nodes (backup factor)": {
"policy": {
"replicas": [
{
"count": 1,
"selector": "MyStore"
}
],
"containerBackupFactor": 2,
"selectors": [
{
"name": "MyStore",
"count": 2,
"clause": "DISTINCT",
"attribute": "Country",
"filter": "FromRU"
}
],
"filters": [
{
"name": "FromRU",
"key": "Country",
"op": "EQ",
"value": "Russia",
"filters": [ ]
}
]
},
"result": [
[
0
]
]
},
"not enough nodes (buckets)": {
"policy": {
"replicas": [
{
"count": 1,
"selector": "MyStore"
}
],
"containerBackupFactor": 1,
"selectors": [
{
"name": "MyStore",
"count": 2,
"clause": "DISTINCT",
"attribute": "Country",
"filter": "FromRU"
}
],
"filters": [
{
"name": "FromRU",
"key": "Country",
"op": "EQ",
"value": "Russia",
"filters": [ ]
}
]
},
"result": [
[
0
]
]
}
}
}

View file

@ -104,14 +104,7 @@
"selectors": [], "selectors": [],
"filters": [] "filters": []
}, },
"result": [ "error": "not enough nodes"
[
0,
1,
2,
3
]
]
} }
} }
} }

View file

@ -24,12 +24,7 @@
"tests": { "tests": {
"missing filter": { "missing filter": {
"policy": { "policy": {
"replicas": [ "replicas": [],
{
"count": 1,
"selector": "MyStore"
}
],
"containerBackupFactor": 1, "containerBackupFactor": 1,
"selectors": [ "selectors": [
{ {
@ -52,14 +47,9 @@
}, },
"error": "filter not found" "error": "filter not found"
}, },
"not enough nodes (filter results in empty set)": { "not enough nodes (backup factor)": {
"policy": { "policy": {
"replicas": [ "replicas": [],
{
"count": 1,
"selector": "MyStore"
}
],
"containerBackupFactor": 2, "containerBackupFactor": 2,
"selectors": [ "selectors": [
{ {
@ -67,15 +57,40 @@
"count": 2, "count": 2,
"clause": "DISTINCT", "clause": "DISTINCT",
"attribute": "Country", "attribute": "Country",
"filter": "FromMoon" "filter": "FromRU"
} }
], ],
"filters": [ "filters": [
{ {
"name": "FromMoon", "name": "FromRU",
"key": "Country", "key": "Country",
"op": "EQ", "op": "EQ",
"value": "Moon", "value": "Russia",
"filters": []
}
]
},
"error": "not enough nodes"
},
"not enough nodes (buckets)": {
"policy": {
"replicas": [],
"containerBackupFactor": 1,
"selectors": [
{
"name": "MyStore",
"count": 2,
"clause": "DISTINCT",
"attribute": "Country",
"filter": "FromRU"
}
],
"filters": [
{
"name": "FromRU",
"key": "Country",
"op": "EQ",
"value": "Russia",
"filters": [] "filters": []
} }
] ]

View file

@ -238,7 +238,7 @@ func (m NetMap) ContainerNodes(p PlacementPolicy, pivot []byte) ([][]NodeInfo, e
// marked as used by earlier replicas. // marked as used by earlier replicas.
for i := range p.replicas { for i := range p.replicas {
sName := p.replicas[i].GetSelector() sName := p.replicas[i].GetSelector()
if sName == "" && !(len(p.replicas) == 1 && len(p.selectors) == 1) { if sName == "" {
var s netmap.Selector var s netmap.Selector
s.SetCount(p.replicas[i].GetCount()) s.SetCount(p.replicas[i].GetCount())
s.SetFilter(mainFilterName) s.SetFilter(mainFilterName)
@ -258,9 +258,6 @@ func (m NetMap) ContainerNodes(p PlacementPolicy, pivot []byte) ([][]NodeInfo, e
} }
if p.unique { if p.unique {
if c.processedSelectors[sName] == nil {
return nil, fmt.Errorf("selector not found: '%s'", sName)
}
nodes, err := c.getSelection(*c.processedSelectors[sName]) nodes, err := c.getSelection(*c.processedSelectors[sName])
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -551,7 +551,7 @@ func (p *PlacementPolicy) DecodeString(s string) error {
return errors.New("parsed nil value") return errors.New("parsed nil value")
} }
if err := validatePolicy(*parsed); err != nil { if err := validatePolicy(*p); err != nil {
return fmt.Errorf("invalid policy: %w", err) return fmt.Errorf("invalid policy: %w", err)
} }
@ -605,13 +605,6 @@ var (
errUnknownSelector = errors.New("policy: selector not found") errUnknownSelector = errors.New("policy: selector not found")
// errSyntaxError is returned for errors found by ANTLR parser. // errSyntaxError is returned for errors found by ANTLR parser.
errSyntaxError = errors.New("policy: syntax error") errSyntaxError = errors.New("policy: syntax error")
// errRedundantSelector is returned for errors found by selectors policy validator.
errRedundantSelector = errors.New("policy: found redundant selector")
// errUnnamedSelector is returned for errors found by selectors policy validator.
errUnnamedSelector = errors.New("policy: unnamed selectors are useless, " +
"make sure to pair REP and SELECT clauses: \"REP .. IN X\" + \"SELECT ... AS X\"")
// errRedundantSelector is returned for errors found by filters policy validator.
errRedundantFilter = errors.New("policy: found redundant filter")
) )
type policyVisitor struct { type policyVisitor struct {
@ -852,52 +845,25 @@ func (p *policyVisitor) VisitExpr(ctx *parser.ExprContext) any {
// validatePolicy checks high-level constraints such as filter link in SELECT // validatePolicy checks high-level constraints such as filter link in SELECT
// being actually defined in FILTER section. // being actually defined in FILTER section.
func validatePolicy(p PlacementPolicy) error { func validatePolicy(p PlacementPolicy) error {
canOmitNames := len(p.selectors) == 1 && len(p.replicas) == 1
seenFilters := map[string]bool{} seenFilters := map[string]bool{}
expectedFilters := map[string]struct{}{}
for i := range p.filters { for i := range p.filters {
seenFilters[p.filters[i].GetName()] = true seenFilters[p.filters[i].GetName()] = true
for _, f := range p.filters[i].GetFilters() {
if f.GetName() != "" {
expectedFilters[f.GetName()] = struct{}{}
}
}
} }
seenSelectors := map[string]*netmap.Selector{} seenSelectors := map[string]bool{}
for i := range p.selectors { for i := range p.selectors {
if p.selectors[i].GetName() == "" && !canOmitNames { if flt := p.selectors[i].GetFilter(); flt != mainFilterName && !seenFilters[flt] {
return errUnnamedSelector
}
if flt := p.selectors[i].GetFilter(); flt != mainFilterName {
expectedFilters[flt] = struct{}{}
if !seenFilters[flt] {
return fmt.Errorf("%w: '%s'", errUnknownFilter, flt) return fmt.Errorf("%w: '%s'", errUnknownFilter, flt)
} }
}
seenSelectors[p.selectors[i].GetName()] = &p.selectors[i] seenSelectors[p.selectors[i].GetName()] = true
} }
for _, f := range p.filters {
if _, ok := expectedFilters[f.GetName()]; !ok {
return fmt.Errorf("%w: '%s'", errRedundantFilter, f.GetName())
}
}
expectedSelectors := map[string]struct{}{}
for i := range p.replicas { for i := range p.replicas {
selName := p.replicas[i].GetSelector() if sel := p.replicas[i].GetSelector(); sel != "" && !seenSelectors[sel] {
if selName != "" || canOmitNames { return fmt.Errorf("%w: '%s'", errUnknownSelector, sel)
expectedSelectors[selName] = struct{}{}
if seenSelectors[selName] == nil {
return fmt.Errorf("%w: '%s'", errUnknownSelector, selName)
}
}
}
for _, s := range p.selectors {
if _, ok := expectedSelectors[s.GetName()]; !ok {
return fmt.Errorf("%w: to use selector '%s' use keyword IN", errRedundantSelector, s.GetName())
} }
} }

View file

@ -1,65 +0,0 @@
package netmap
import (
"strings"
"testing"
"github.com/stretchr/testify/require"
)
func TestDecodeString(t *testing.T) {
testCases := []string{
`REP 2
CBF 2
SELECT 2 FROM *`,
`REP 1 IN X
CBF 1
SELECT 2 IN SAME Location FROM * AS X`,
`REP 1 IN X
REP 2 IN Y
CBF 1
SELECT 2 FROM * AS X
SELECT 3 FROM * AS Y`,
`REP 1 IN X
SELECT 2 IN City FROM Good AS X
FILTER Country EQ RU AS FromRU
FILTER Country EQ EN AS FromEN
FILTER @FromRU AND @FromEN AND Rating GT 7 AS Good`,
`REP 7 IN SPB
SELECT 1 IN City FROM SPBSSD AS SPB
FILTER City EQ SPB AND SSD EQ true OR City EQ SPB AND Rating GE 5 AS SPBSSD`,
`REP 7 IN SPB
SELECT 1 IN City FROM SPBSSD AS SPB
FILTER NOT (NOT (City EQ SPB) AND SSD EQ true OR City EQ SPB AND Rating GE 5) AS SPBSSD`,
`UNIQUE
REP 1
REP 1`,
}
var p PlacementPolicy
for _, testCase := range testCases {
require.NoError(t, p.DecodeString(testCase), "unable parse %s", testCase)
var b strings.Builder
require.NoError(t, p.WriteStringTo(&b))
require.Equal(t, testCase, b.String())
}
invalidTestCases := map[string]error{
`?REP 1`: errSyntaxError,
`REP 1 trailing garbage`: errSyntaxError,
`REP 1 REP 1 SELECT 4 FROM *`: errUnnamedSelector,
`REP 1 SELECT 4 FROM * SELECT 1 FROM *`: errUnnamedSelector,
`REP 1 IN X SELECT 4 FROM *`: errUnknownSelector,
`REP 1 IN X REP 2 SELECT 4 FROM * AS X FILTER 'UN-LOCODE' EQ 'RU LED' AS F`: errRedundantFilter,
}
for i := range invalidTestCases {
require.ErrorIs(t, p.DecodeString(i), invalidTestCases[i], "#%s", i)
}
}

View file

@ -1,6 +1,7 @@
package netmap_test package netmap_test
import ( import (
"strings"
"testing" "testing"
. "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" . "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@ -8,6 +9,55 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestEncode(t *testing.T) {
testCases := []string{
`REP 1 IN X
CBF 1
SELECT 2 IN SAME Location FROM * AS X`,
`REP 1
SELECT 2 IN City FROM Good
FILTER Country EQ RU AS FromRU
FILTER @FromRU AND Rating GT 7 AS Good`,
`REP 7 IN SPB
SELECT 1 IN City FROM SPBSSD AS SPB
FILTER City EQ SPB AND SSD EQ true OR City EQ SPB AND Rating GE 5 AS SPBSSD`,
`REP 7 IN SPB
SELECT 1 IN City FROM SPBSSD AS SPB
FILTER NOT (NOT (City EQ SPB) AND SSD EQ true OR City EQ SPB AND Rating GE 5) AS SPBSSD`,
`UNIQUE
REP 1
REP 1`,
`REP 1 IN X
SELECT 1 FROM F AS X
FILTER 'UN-LOCODE' EQ 'RU LED' AS F`,
}
var p PlacementPolicy
for _, testCase := range testCases {
require.NoError(t, p.DecodeString(testCase))
var b strings.Builder
require.NoError(t, p.WriteStringTo(&b))
require.Equal(t, testCase, b.String())
}
invalidTestCases := []string{
`?REP 1`,
`REP 1 trailing garbage`,
}
for i := range invalidTestCases {
require.Error(t, p.DecodeString(invalidTestCases[i]), "#%d", i)
}
}
func TestPlacementPolicyEncoding(t *testing.T) { func TestPlacementPolicyEncoding(t *testing.T) {
v := netmaptest.PlacementPolicy() v := netmaptest.PlacementPolicy()

View file

@ -60,7 +60,7 @@ func (c *context) getSelection(s netmap.Selector) ([]nodes, error) {
bucketCount, nodesInBucket := calcNodesCount(s) bucketCount, nodesInBucket := calcNodesCount(s)
buckets := c.getSelectionBase(s) buckets := c.getSelectionBase(s)
if c.strict && len(buckets) < bucketCount { if len(buckets) < bucketCount {
return nil, fmt.Errorf("%w: '%s'", errNotEnoughNodes, s.GetName()) return nil, fmt.Errorf("%w: '%s'", errNotEnoughNodes, s.GetName())
} }
@ -96,7 +96,7 @@ func (c *context) getSelection(s netmap.Selector) ([]nodes, error) {
if len(res) < bucketCount { if len(res) < bucketCount {
// Fallback to using minimum allowed backup factor (1). // Fallback to using minimum allowed backup factor (1).
res = append(res, fallback...) res = append(res, fallback...)
if c.strict && len(res) < bucketCount { if len(res) < bucketCount {
return nil, fmt.Errorf("%w: '%s'", errNotEnoughNodes, s.GetName()) return nil, fmt.Errorf("%w: '%s'", errNotEnoughNodes, s.GetName())
} }
} }
@ -110,13 +110,6 @@ func (c *context) getSelection(s netmap.Selector) ([]nodes, error) {
hrw.SortHasherSliceByWeightValue(res, weights, c.hrwSeedHash) hrw.SortHasherSliceByWeightValue(res, weights, c.hrwSeedHash)
} }
if len(res) < bucketCount {
if len(res) == 0 {
return nil, errNotEnoughNodes
}
bucketCount = len(res)
}
if s.GetAttribute() == "" { if s.GetAttribute() == "" {
res, fallback = res[:bucketCount], res[bucketCount:] res, fallback = res[:bucketCount], res[bucketCount:]
for i := range fallback { for i := range fallback {

View file

@ -1,10 +1,9 @@
package netmap package netmap
import ( import (
"crypto/rand"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
mrand "math/rand" "math/rand"
"sort" "sort"
"strconv" "strconv"
"testing" "testing"
@ -29,10 +28,10 @@ func BenchmarkHRWSort(b *testing.B) {
node.SetPublicKey(key) node.SetPublicKey(key)
vectors[i] = nodes{node} vectors[i] = nodes{node}
weights[i] = float64(mrand.Uint32()%10) / 10.0 weights[i] = float64(rand.Uint32()%10) / 10.0
} }
pivot := mrand.Uint64() pivot := rand.Uint64()
b.Run("sort by index, no weight", func(b *testing.B) { b.Run("sort by index, no weight", func(b *testing.B) {
realNodes := make([]nodes, netmapSize) realNodes := make([]nodes, netmapSize)
b.ResetTimer() b.ResetTimer()
@ -283,55 +282,6 @@ func TestPlacementPolicy_Unique(t *testing.T) {
} }
} }
func TestPlacementPolicy_SingleOmitNames(t *testing.T) {
nodes := []NodeInfo{
nodeInfoFromAttributes("ID", "1", "Country", "Russia", "City", "SPB"),
nodeInfoFromAttributes("ID", "2", "Country", "Germany", "City", "Berlin"),
nodeInfoFromAttributes("ID", "3", "Country", "Russia", "City", "Moscow"),
nodeInfoFromAttributes("ID", "4", "Country", "France", "City", "Paris"),
nodeInfoFromAttributes("ID", "5", "Country", "France", "City", "Lyon"),
nodeInfoFromAttributes("ID", "6", "Country", "Russia", "City", "SPB"),
nodeInfoFromAttributes("ID", "7", "Country", "Russia", "City", "Moscow"),
nodeInfoFromAttributes("ID", "8", "Country", "Germany", "City", "Darmstadt"),
nodeInfoFromAttributes("ID", "9", "Country", "Germany", "City", "Frankfurt"),
nodeInfoFromAttributes("ID", "10", "Country", "Russia", "City", "SPB"),
nodeInfoFromAttributes("ID", "11", "Country", "Russia", "City", "Moscow"),
nodeInfoFromAttributes("ID", "12", "Country", "Germany", "City", "London"),
}
for i := range nodes {
pub := make([]byte, 33)
rand.Read(pub)
nodes[i].SetPublicKey(pub)
}
var nm NetMap
nm.SetNodes(nodes)
for _, unique := range []bool{false, true} {
t.Run(fmt.Sprintf("unique=%t", unique), func(t *testing.T) {
ssNamed := []Selector{newSelector("X", "City", 2, "FromRU", (*Selector).SelectDistinct)}
fsNamed := []Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}
rsNamed := []ReplicaDescriptor{newReplica(1, "X")}
pNamed := newPlacementPolicy(3, rsNamed, ssNamed, fsNamed)
pNamed.unique = unique
vNamed, err := nm.ContainerNodes(pNamed, []byte{1})
require.NoError(t, err)
ssUnnamed := []Selector{newSelector("", "City", 2, "FromRU", (*Selector).SelectDistinct)}
fsUnnamed := []Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}
rsUnnamed := []ReplicaDescriptor{newReplica(1, "")}
pUnnamed := newPlacementPolicy(3, rsUnnamed, ssUnnamed, fsUnnamed)
pUnnamed.unique = unique
vUnnamed, err := nm.ContainerNodes(pUnnamed, []byte{1})
require.NoError(t, err)
require.Equal(t, vNamed, vUnnamed)
})
}
}
func TestPlacementPolicy_MultiREP(t *testing.T) { func TestPlacementPolicy_MultiREP(t *testing.T) {
nodes := []NodeInfo{ nodes := []NodeInfo{
nodeInfoFromAttributes("ID", "1", "Country", "Russia", "City", "SPB"), nodeInfoFromAttributes("ID", "1", "Country", "Russia", "City", "SPB"),

View file

@ -1,7 +1,7 @@
package netmaptest package netmaptest
import ( import (
"crypto/rand" "math/rand"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
) )
@ -70,7 +70,7 @@ func NetworkInfo() (x netmap.NetworkInfo) {
// NodeInfo returns random netmap.NodeInfo. // NodeInfo returns random netmap.NodeInfo.
func NodeInfo() (x netmap.NodeInfo) { func NodeInfo() (x netmap.NodeInfo) {
key := make([]byte, 33) key := make([]byte, 33)
_, _ = rand.Read(key) rand.Read(key)
x.SetPublicKey(key) x.SetPublicKey(key)
x.SetNetworkEndpoints("1", "2", "3") x.SetNetworkEndpoints("1", "2", "3")

View file

@ -1,10 +1,10 @@
package ns package ns
import ( import (
"crypto/rand"
"errors" "errors"
"fmt" "fmt"
"math/big" "math/big"
"math/rand"
"strings" "strings"
"testing" "testing"

View file

@ -1,8 +1,8 @@
package oidtest package oidtest
import ( import (
"crypto/rand"
"crypto/sha256" "crypto/sha256"
"math/rand"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -12,7 +12,7 @@ import (
func ID() oid.ID { func ID() oid.ID {
checksum := [sha256.Size]byte{} checksum := [sha256.Size]byte{}
_, _ = rand.Read(checksum[:]) rand.Read(checksum[:])
return idWithChecksum(checksum) return idWithChecksum(checksum)
} }

View file

@ -1,8 +1,8 @@
package object_test package object_test
import ( import (
"crypto/rand"
"crypto/sha256" "crypto/sha256"
"math/rand"
"testing" "testing"
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"

View file

@ -1,8 +1,8 @@
package object package object
import ( import (
"crypto/rand"
"crypto/sha256" "crypto/sha256"
"math/rand"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/tombstone" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/tombstone"

View file

@ -69,7 +69,3 @@ func (c *sessionCache) expired(val *cacheValue) bool {
// use epoch+1 (clear cache beforehand) to prevent 'expired session token' error right after epoch tick // use epoch+1 (clear cache beforehand) to prevent 'expired session token' error right after epoch tick
return val.token.ExpiredAt(epoch + 1) return val.token.ExpiredAt(epoch + 1)
} }
func (c *sessionCache) Epoch() uint64 {
return c.currentEpoch.Load()
}

View file

@ -189,7 +189,3 @@ func (m *mockClient) restartIfUnhealthy(ctx context.Context) (healthy bool, chan
} }
return return
} }
func (m *mockClient) close() error {
return nil
}

View file

@ -1,172 +0,0 @@
package pool
import (
"context"
sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type logger interface {
log(level zapcore.Level, msg string, fields ...zap.Field)
}
type PrmObjectPutClientCutInit struct {
PrmObjectPut
}
func (c *clientWrapper) objectPutInitTransformer(prm PrmObjectPutClientCutInit) (*objectWriterTransformer, error) {
cl, err := c.getClient()
if err != nil {
return nil, err
}
var w objectWriterTransformer
w.it = internalTarget{
client: cl,
prm: prm,
address: c.address(),
logger: &c.clientStatusMonitor,
}
key := &c.prm.key
if prm.key != nil {
key = prm.key
}
w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{
Key: key,
NextTargetInit: func() transformer.ObjectWriter { return &w.it },
MaxSize: prm.networkInfo.MaxObjectSize(),
WithoutHomomorphicHash: prm.withoutHomomorphicHash,
NetworkState: prm.networkInfo,
SessionToken: prm.stoken,
})
return &w, nil
}
type objectWriterTransformer struct {
ot transformer.ChunkedObjectWriter
it internalTarget
err error
}
func (x *objectWriterTransformer) WriteHeader(ctx context.Context, hdr object.Object) bool {
x.err = x.ot.WriteHeader(ctx, &hdr)
return x.err == nil
}
func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk []byte) bool {
_, x.err = x.ot.Write(ctx, chunk)
return x.err == nil
}
// ResObjectPut groups the final result values of ObjectPutInit operation.
type ResObjectPut struct {
Status apistatus.Status
OID oid.ID
}
// Close return non nil result in any case. If error occurred, the result contains only buffer for further reusing.
func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) {
ai, err := x.ot.Close(ctx)
if err != nil {
return nil, err
}
if ai != nil && ai.ParentID != nil {
x.it.res.OID = *ai.ParentID
}
return &x.it.res, nil
}
type internalTarget struct {
client *sdkClient.Client
res ResObjectPut
prm PrmObjectPutClientCutInit
useStream bool
address string
logger logger
resolveFrostFSErrors bool
}
func (it *internalTarget) WriteObject(ctx context.Context, o *object.Object) error {
putSingleImplemented, err := it.tryPutSingle(ctx, o)
if putSingleImplemented {
return err
}
it.logger.log(zapcore.DebugLevel, "putSingle not implemented, trying put as stream", zap.String("address", it.address))
it.useStream = true
return it.putAsStream(ctx, o)
}
func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error {
var cliPrm sdkClient.PrmObjectPutInit
cliPrm.SetCopiesNumberByVectors(it.prm.copiesNumber)
if it.prm.stoken != nil {
cliPrm.WithinSession(*it.prm.stoken)
}
if it.prm.key != nil {
cliPrm.UseKey(*it.prm.key)
}
if it.prm.btoken != nil {
cliPrm.WithBearerToken(*it.prm.btoken)
}
wrt, err := it.client.ObjectPutInit(ctx, cliPrm)
if err != nil {
return err
}
if wrt.WriteHeader(ctx, *o) {
wrt.WritePayloadChunk(ctx, o.Payload())
}
res, err := wrt.Close(ctx)
if res != nil {
it.res.Status = res.Status()
it.res.OID = res.StoredObjectID()
}
return err
}
func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (bool, error) {
if it.useStream {
return false, nil
}
var cliPrm sdkClient.PrmObjectPutSingle
cliPrm.SetCopiesNumber(it.prm.copiesNumber)
cliPrm.UseKey(it.prm.key)
if it.prm.stoken != nil {
cliPrm.WithinSession(*it.prm.stoken)
}
if it.prm.btoken != nil {
cliPrm.WithBearerToken(*it.prm.btoken)
}
cliPrm.SetObject(o.ToV2())
res, err := it.client.ObjectPutSingle(ctx, cliPrm)
if err != nil && status.Code(err) == codes.Unimplemented {
return false, err
}
if err == nil {
id, _ := o.ID()
it.res = ResObjectPut{
Status: res.Status(),
OID: id,
}
if !it.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.Status) {
return true, apistatus.ErrFromStatus(it.res.Status)
}
return true, nil
}
return true, err
}

View file

@ -78,16 +78,12 @@ type client interface {
dial(ctx context.Context) error dial(ctx context.Context) error
// see clientWrapper.restartIfUnhealthy. // see clientWrapper.restartIfUnhealthy.
restartIfUnhealthy(ctx context.Context) (bool, bool) restartIfUnhealthy(ctx context.Context) (bool, bool)
// see clientWrapper.close.
close() error
} }
// clientStatus provide access to some metrics for connection. // clientStatus provide access to some metrics for connection.
type clientStatus interface { type clientStatus interface {
// isHealthy checks if the connection can handle requests. // isHealthy checks if the connection can handle requests.
isHealthy() bool isHealthy() bool
// isDialed checks if the connection was created.
isDialed() bool
// setUnhealthy marks client as unhealthy. // setUnhealthy marks client as unhealthy.
setUnhealthy() setUnhealthy()
// address return address of endpoint. // address return address of endpoint.
@ -109,7 +105,7 @@ var errPoolClientUnhealthy = errors.New("pool client unhealthy")
type clientStatusMonitor struct { type clientStatusMonitor struct {
logger *zap.Logger logger *zap.Logger
addr string addr string
healthy *atomic.Uint32 healthy *atomic.Bool
errorThreshold uint32 errorThreshold uint32
mu sync.RWMutex // protect counters mu sync.RWMutex // protect counters
@ -118,22 +114,6 @@ type clientStatusMonitor struct {
methods []*methodStatus methods []*methodStatus
} }
// values for healthy status of clientStatusMonitor.
const (
// statusUnhealthyOnDial is set when dialing to the endpoint is failed,
// so there is no connection to the endpoint, and pool should not close it
// before re-establishing connection once again.
statusUnhealthyOnDial = iota
// statusUnhealthyOnRequest is set when communication after dialing to the
// endpoint is failed due to immediate or accumulated errors, connection is
// available and pool should close it before re-establishing connection once again.
statusUnhealthyOnRequest
// statusHealthy is set when connection is ready to be used by the pool.
statusHealthy
)
// methodStatus provide statistic for specific method. // methodStatus provide statistic for specific method.
type methodStatus struct { type methodStatus struct {
name string name string
@ -215,8 +195,8 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint
methods[i] = &methodStatus{name: i.String()} methods[i] = &methodStatus{name: i.String()}
} }
healthy := new(atomic.Uint32) healthy := new(atomic.Bool)
healthy.Store(statusHealthy) healthy.Store(true)
return clientStatusMonitor{ return clientStatusMonitor{
logger: logger, logger: logger,
@ -272,11 +252,6 @@ func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) {
x.key = key x.key = key
} }
// setLogger sets sdkClient.Client logger.
func (x *wrapperPrm) setLogger(logger *zap.Logger) {
x.logger = logger
}
// setDialTimeout sets the timeout for connection to be established. // setDialTimeout sets the timeout for connection to be established.
func (x *wrapperPrm) setDialTimeout(timeout time.Duration) { func (x *wrapperPrm) setDialTimeout(timeout time.Duration) {
x.dialTimeout = timeout x.dialTimeout = timeout
@ -342,7 +317,7 @@ func (c *clientWrapper) dial(ctx context.Context) error {
prmDial.SetGRPCDialOptions(c.prm.dialOptions...) prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
if err = cl.Dial(ctx, prmDial); err != nil { if err = cl.Dial(ctx, prmDial); err != nil {
c.setUnhealthyOnDial() c.setUnhealthy()
return err return err
} }
@ -359,12 +334,6 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
wasHealthy = true wasHealthy = true
} }
// if connection is dialed before, to avoid routine / connection leak,
// pool has to close it and then initialize once again.
if c.isDialed() {
_ = c.close()
}
var cl sdkClient.Client var cl sdkClient.Client
var prmInit sdkClient.PrmInit var prmInit sdkClient.PrmInit
prmInit.SetDefaultPrivateKey(c.prm.key) prmInit.SetDefaultPrivateKey(c.prm.key)
@ -379,7 +348,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
prmDial.SetGRPCDialOptions(c.prm.dialOptions...) prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
if err := cl.Dial(ctx, prmDial); err != nil { if err := cl.Dial(ctx, prmDial); err != nil {
c.setUnhealthyOnDial() c.setUnhealthy()
return false, wasHealthy return false, wasHealthy
} }
@ -580,9 +549,11 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
return err return err
} }
cliPrm := sdkClient.PrmContainerSetEACL{ var cliPrm sdkClient.PrmContainerSetEACL
Table: &prm.Table, cliPrm.SetTable(prm.table)
Session: prm.Session,
if prm.sessionSet {
cliPrm.WithinSession(prm.session)
} }
start := time.Now() start := time.Now()
@ -596,19 +567,16 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
return fmt.Errorf("set eacl on client: %w", err) return fmt.Errorf("set eacl on client: %w", err)
} }
if prm.WaitParams == nil { if !prm.waitParamsSet {
prm.WaitParams = defaultWaitParams() prm.waitParams.setDefaults()
}
if err := prm.WaitParams.CheckValidity(); err != nil {
return fmt.Errorf("invalid wait parameters: %w", err)
} }
var cIDp *cid.ID var cIDp *cid.ID
if cID, set := prm.Table.CID(); set { if cID, set := prm.table.CID(); set {
cIDp = &cID cIDp = &cID
} }
err = waitForEACLPresence(ctx, c, cIDp, &prm.Table, prm.WaitParams) err = waitForEACLPresence(ctx, c, cIDp, &prm.table, &prm.waitParams)
if err = c.handleError(ctx, nil, err); err != nil { if err = c.handleError(ctx, nil, err); err != nil {
return fmt.Errorf("wait eacl presence on client: %w", err) return fmt.Errorf("wait eacl presence on client: %w", err)
} }
@ -660,18 +628,6 @@ func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (netm
// objectPut writes object to FrostFS. // objectPut writes object to FrostFS.
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) { func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
if prm.bufferMaxSize == 0 {
prm.bufferMaxSize = defaultBufferMaxSizeForPut
}
if prm.clientCut {
return c.objectPutClientCut(ctx, prm)
}
return c.objectPutServerCut(ctx, prm)
}
func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
cl, err := c.getClient() cl, err := c.getClient()
if err != nil { if err != nil {
return oid.ID{}, err return oid.ID{}, err
@ -709,8 +665,10 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut
} }
if prm.payload != nil { if prm.payload != nil {
if sz == 0 || sz > prm.bufferMaxSize { const defaultBufferSizePut = 3 << 20 // configure?
sz = prm.bufferMaxSize
if sz == 0 || sz > defaultBufferSizePut {
sz = defaultBufferSizePut
} }
buf := make([]byte, sz) buf := make([]byte, sz)
@ -751,73 +709,6 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut
return res.StoredObjectID(), nil return res.StoredObjectID(), nil
} }
func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
putInitPrm := PrmObjectPutClientCutInit{
PrmObjectPut: prm,
}
start := time.Now()
wObj, err := c.objectPutInitTransformer(putInitPrm)
c.incRequests(time.Since(start), methodObjectPut)
if err = c.handleError(ctx, nil, err); err != nil {
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
}
if wObj.WriteHeader(ctx, prm.hdr) {
sz := prm.hdr.PayloadSize()
if data := prm.hdr.Payload(); len(data) > 0 {
if prm.payload != nil {
prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
} else {
prm.payload = bytes.NewReader(data)
sz = uint64(len(data))
}
}
if prm.payload != nil {
if sz == 0 || sz > prm.bufferMaxSize {
sz = prm.bufferMaxSize
}
buf := make([]byte, sz)
var n int
for {
n, err = prm.payload.Read(buf)
if n > 0 {
start = time.Now()
successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
c.incRequests(time.Since(start), methodObjectPut)
if !successWrite {
break
}
continue
}
if errors.Is(err, io.EOF) {
break
}
return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
}
}
}
res, err := wObj.Close(ctx)
var st apistatus.Status
if res != nil {
st = res.Status
}
if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
return oid.ID{}, fmt.Errorf("client failure: %w", err)
}
return res.OID, nil
}
// objectDelete invokes sdkClient.ObjectDelete parse response status to error. // objectDelete invokes sdkClient.ObjectDelete parse response status to error.
func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) error { func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) error {
cl, err := c.getClient() cl, err := c.getClient()
@ -825,15 +716,20 @@ func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) e
return err return err
} }
cnr := prm.addr.Container() var cliPrm sdkClient.PrmObjectDelete
obj := prm.addr.Object() cliPrm.FromContainer(prm.addr.Container())
cliPrm.ByID(prm.addr.Object())
cliPrm := sdkClient.PrmObjectDelete{ if prm.stoken != nil {
BearerToken: prm.btoken, cliPrm.WithinSession(*prm.stoken)
Session: prm.stoken, }
ContainerID: &cnr,
ObjectID: &obj, if prm.btoken != nil {
Key: prm.key, cliPrm.WithBearerToken(*prm.btoken)
}
if prm.key != nil {
cliPrm.UseKey(*prm.key)
} }
start := time.Now() start := time.Now()
@ -856,15 +752,20 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGet
return ResGetObject{}, err return ResGetObject{}, err
} }
prmCnr := prm.addr.Container() var cliPrm sdkClient.PrmObjectGet
prmObj := prm.addr.Object() cliPrm.FromContainer(prm.addr.Container())
cliPrm.ByID(prm.addr.Object())
cliPrm := sdkClient.PrmObjectGet{ if prm.stoken != nil {
BearerToken: prm.btoken, cliPrm.WithinSession(*prm.stoken)
Session: prm.stoken, }
ContainerID: &prmCnr,
ObjectID: &prmObj, if prm.btoken != nil {
Key: prm.key, cliPrm.WithBearerToken(*prm.btoken)
}
if prm.key != nil {
cliPrm.UseKey(*prm.key)
} }
var res ResGetObject var res ResGetObject
@ -904,16 +805,23 @@ func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (obje
return object.Object{}, err return object.Object{}, err
} }
prmCnr := prm.addr.Container() var cliPrm sdkClient.PrmObjectHead
prmObj := prm.addr.Object() cliPrm.FromContainer(prm.addr.Container())
cliPrm.ByID(prm.addr.Object())
if prm.raw {
cliPrm.MarkRaw()
}
cliPrm := sdkClient.PrmObjectHead{ if prm.stoken != nil {
BearerToken: prm.btoken, cliPrm.WithinSession(*prm.stoken)
Session: prm.stoken, }
Raw: prm.raw,
ContainerID: &prmCnr, if prm.btoken != nil {
ObjectID: &prmObj, cliPrm.WithBearerToken(*prm.btoken)
Key: prm.key, }
if prm.key != nil {
cliPrm.UseKey(*prm.key)
} }
var obj object.Object var obj object.Object
@ -942,17 +850,22 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (Re
return ResObjectRange{}, err return ResObjectRange{}, err
} }
prmCnr := prm.addr.Container() var cliPrm sdkClient.PrmObjectRange
prmObj := prm.addr.Object() cliPrm.FromContainer(prm.addr.Container())
cliPrm.ByID(prm.addr.Object())
cliPrm.SetOffset(prm.off)
cliPrm.SetLength(prm.ln)
cliPrm := sdkClient.PrmObjectRange{ if prm.stoken != nil {
BearerToken: prm.btoken, cliPrm.WithinSession(*prm.stoken)
Session: prm.stoken, }
ContainerID: &prmCnr,
ObjectID: &prmObj, if prm.btoken != nil {
Offset: prm.off, cliPrm.WithBearerToken(*prm.btoken)
Length: prm.ln, }
Key: prm.key,
if prm.key != nil {
cliPrm.UseKey(*prm.key)
} }
start := time.Now() start := time.Now()
@ -1009,10 +922,9 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
return resCreateSession{}, err return resCreateSession{}, err
} }
cliPrm := sdkClient.PrmSessionCreate{ var cliPrm sdkClient.PrmSessionCreate
Expiration: prm.exp, cliPrm.SetExp(prm.exp)
Key: &prm.key, cliPrm.UseKey(prm.key)
}
start := time.Now() start := time.Now()
res, err := cl.SessionCreate(ctx, cliPrm) res, err := cl.SessionCreate(ctx, cliPrm)
@ -1032,23 +944,15 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
} }
func (c *clientStatusMonitor) isHealthy() bool { func (c *clientStatusMonitor) isHealthy() bool {
return c.healthy.Load() == statusHealthy return c.healthy.Load()
}
func (c *clientStatusMonitor) isDialed() bool {
return c.healthy.Load() != statusUnhealthyOnDial
} }
func (c *clientStatusMonitor) setHealthy() { func (c *clientStatusMonitor) setHealthy() {
c.healthy.Store(statusHealthy) c.healthy.Store(true)
} }
func (c *clientStatusMonitor) setUnhealthy() { func (c *clientStatusMonitor) setUnhealthy() {
c.healthy.Store(statusUnhealthyOnRequest) c.healthy.Store(false)
}
func (c *clientStatusMonitor) setUnhealthyOnDial() {
c.healthy.Store(statusUnhealthyOnDial)
} }
func (c *clientStatusMonitor) address() string { func (c *clientStatusMonitor) address() string {
@ -1067,20 +971,12 @@ func (c *clientStatusMonitor) incErrorRate() {
} }
c.mu.Unlock() c.mu.Unlock()
if thresholdReached { if thresholdReached && c.logger != nil {
c.log(zapcore.WarnLevel, "error threshold reached", c.logger.Warn("error threshold reached",
zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold)) zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold))
} }
} }
func (c *clientStatusMonitor) log(level zapcore.Level, msg string, fields ...zap.Field) {
if c.logger == nil {
return
}
c.logger.Log(level, msg, fields...)
}
func (c *clientStatusMonitor) currentErrorRate() uint32 { func (c *clientStatusMonitor) currentErrorRate() uint32 {
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
@ -1114,13 +1010,6 @@ func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
} }
} }
func (c *clientWrapper) close() error {
if c.client != nil {
return c.client.Close()
}
return nil
}
func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error { func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error {
if err != nil { if err != nil {
if needCountError(ctx, err) { if needCountError(ctx, err) {
@ -1338,6 +1227,12 @@ func (x *WaitParams) SetPollInterval(tick time.Duration) {
x.PollInterval = tick x.PollInterval = tick
} }
// Deprecated: Use defaultWaitParams() instead.
func (x *WaitParams) setDefaults() {
x.Timeout = 120 * time.Second
x.PollInterval = 5 * time.Second
}
func defaultWaitParams() *WaitParams { func defaultWaitParams() *WaitParams {
return &WaitParams{ return &WaitParams{
Timeout: 120 * time.Second, Timeout: 120 * time.Second,
@ -1426,13 +1321,6 @@ type PrmObjectPut struct {
payload io.Reader payload io.Reader
copiesNumber []uint32 copiesNumber []uint32
clientCut bool
networkInfo netmap.NetworkInfo
withoutHomomorphicHash bool
bufferMaxSize uint64
} }
// SetHeader specifies header of the object. // SetHeader specifies header of the object.
@ -1457,32 +1345,6 @@ func (x *PrmObjectPut) SetCopiesNumberVector(copiesNumber []uint32) {
x.copiesNumber = copiesNumber x.copiesNumber = copiesNumber
} }
// SetClientCut enables client cut for objects. It means that full object is prepared on client side
// and retrying is possible. But this leads to additional memory using for buffering object parts.
// Buffer size for every put is MaxObjectSize value from FrostFS network.
// There is limit for total memory allocation for in-flight request and
// can be set by InitParameters.SetMaxClientCutMemory (default value is 1gb).
// Put requests will fail if this limit be reached.
func (x *PrmObjectPut) SetClientCut(clientCut bool) {
x.clientCut = clientCut
}
// WithoutHomomorphicHash if set to true do not use Tillich-Zémor hash for payload.
func (x *PrmObjectPut) WithoutHomomorphicHash(v bool) {
x.withoutHomomorphicHash = v
}
// SetBufferMaxSize sets max buffer size to read payload.
// This value isn't used if object size is set explicitly and less than this value.
// Default value 3MB.
func (x *PrmObjectPut) SetBufferMaxSize(size uint64) {
x.bufferMaxSize = size
}
func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) {
x.networkInfo = ni
}
// PrmObjectDelete groups parameters of DeleteObject operation. // PrmObjectDelete groups parameters of DeleteObject operation.
type PrmObjectDelete struct { type PrmObjectDelete struct {
prmCommon prmCommon
@ -1670,41 +1532,39 @@ func (x *PrmContainerEACL) SetContainerID(cnrID cid.ID) {
// PrmContainerSetEACL groups parameters of SetEACL operation. // PrmContainerSetEACL groups parameters of SetEACL operation.
type PrmContainerSetEACL struct { type PrmContainerSetEACL struct {
Table eacl.Table table eacl.Table
Session *session.Container sessionSet bool
session session.Container
WaitParams *WaitParams waitParams WaitParams
waitParamsSet bool
} }
// SetTable sets structure of container's extended ACL to be used as a // SetTable sets structure of container's extended ACL to be used as a
// parameter of the base client's operation. // parameter of the base client's operation.
// //
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.SetTable. // See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.SetTable.
//
// Deprecated: Use PrmContainerSetEACL.Table instead.
func (x *PrmContainerSetEACL) SetTable(table eacl.Table) { func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
x.Table = table x.table = table
} }
// WithinSession specifies session to be used as a parameter of the base // WithinSession specifies session to be used as a parameter of the base
// client's operation. // client's operation.
// //
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.WithinSession. // See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.WithinSession.
//
// Deprecated: Use PrmContainerSetEACL.Session instead.
func (x *PrmContainerSetEACL) WithinSession(s session.Container) { func (x *PrmContainerSetEACL) WithinSession(s session.Container) {
x.Session = &s x.session = s
x.sessionSet = true
} }
// SetWaitParams specifies timeout params to complete operation. // SetWaitParams specifies timeout params to complete operation.
// If not provided the default one will be used. // If not provided the default one will be used.
// Panics if any of the wait params isn't positive. // Panics if any of the wait params isn't positive.
//
// Deprecated: Use PrmContainerSetEACL.WaitParams instead.
func (x *PrmContainerSetEACL) SetWaitParams(waitParams WaitParams) { func (x *PrmContainerSetEACL) SetWaitParams(waitParams WaitParams) {
waitParams.checkForPositive() waitParams.checkForPositive()
x.WaitParams = &waitParams x.waitParams = waitParams
x.waitParamsSet = true
} }
// PrmBalanceGet groups parameters of Balance operation. // PrmBalanceGet groups parameters of Balance operation.
@ -1778,8 +1638,6 @@ type Pool struct {
rebalanceParams rebalanceParameters rebalanceParams rebalanceParameters
clientBuilder clientBuilder clientBuilder clientBuilder
logger *zap.Logger logger *zap.Logger
maxObjectSize uint64
} }
type innerPool struct { type innerPool struct {
@ -1796,8 +1654,6 @@ const (
defaultHealthcheckTimeout = 4 * time.Second defaultHealthcheckTimeout = 4 * time.Second
defaultDialTimeout = 5 * time.Second defaultDialTimeout = 5 * time.Second
defaultStreamTimeout = 10 * time.Second defaultStreamTimeout = 10 * time.Second
defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
) )
// NewPool creates connection pool using parameters. // NewPool creates connection pool using parameters.
@ -1857,7 +1713,7 @@ func (p *Pool) Dial(ctx context.Context) error {
} }
var st session.Object var st session.Object
err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key, false) err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key)
if err != nil { if err != nil {
clients[j].setUnhealthy() clients[j].setUnhealthy()
p.log(zap.WarnLevel, "failed to create frostfs session token for client", p.log(zap.WarnLevel, "failed to create frostfs session token for client",
@ -1865,7 +1721,7 @@ func (p *Pool) Dial(ctx context.Context) error {
continue continue
} }
_ = p.cache.Put(formCacheKey(addr, p.key, false), st) _ = p.cache.Put(formCacheKey(addr, p.key), st)
atLeastOneHealthy = true atLeastOneHealthy = true
} }
source := rand.NewSource(time.Now().UnixNano()) source := rand.NewSource(time.Now().UnixNano())
@ -1886,12 +1742,6 @@ func (p *Pool) Dial(ctx context.Context) error {
p.closedCh = make(chan struct{}) p.closedCh = make(chan struct{})
p.innerPools = inner p.innerPools = inner
ni, err := p.NetworkInfo(ctx)
if err != nil {
return fmt.Errorf("get network info for max object size: %w", err)
}
p.maxObjectSize = ni.MaxObjectSize()
go p.startRebalance(ctx) go p.startRebalance(ctx)
return nil return nil
} }
@ -1934,7 +1784,6 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
var prm wrapperPrm var prm wrapperPrm
prm.setAddress(addr) prm.setAddress(addr)
prm.setKey(*params.key) prm.setKey(*params.key)
prm.setLogger(params.logger)
prm.setDialTimeout(params.nodeDialTimeout) prm.setDialTimeout(params.nodeDialTimeout)
prm.setStreamTimeout(params.nodeStreamTimeout) prm.setStreamTimeout(params.nodeStreamTimeout)
prm.setErrorThreshold(params.errorThreshold) prm.setErrorThreshold(params.errorThreshold)
@ -2103,15 +1952,9 @@ func (p *innerPool) connection() (client, error) {
return nil, errors.New("no healthy client") return nil, errors.New("no healthy client")
} }
func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string { func formCacheKey(address string, key *ecdsa.PrivateKey) string {
k := keys.PrivateKey{PrivateKey: *key} k := keys.PrivateKey{PrivateKey: *key}
return address + k.String()
stype := "server"
if clientCut {
stype = "client"
}
return address + stype + k.String()
} }
func (p *Pool) checkSessionTokenErr(err error, address string) bool { func (p *Pool) checkSessionTokenErr(err error, address string) bool {
@ -2127,7 +1970,7 @@ func (p *Pool) checkSessionTokenErr(err error, address string) bool {
return false return false
} }
func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey, clientCut bool) error { func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey) error {
ni, err := c.networkInfo(ctx, prmNetworkInfo{}) ni, err := c.networkInfo(ctx, prmNetworkInfo{})
if err != nil { if err != nil {
return err return err
@ -2145,26 +1988,24 @@ func initSessionForDuration(ctx context.Context, dst *session.Object, c client,
prm.setExp(exp) prm.setExp(exp)
prm.useKey(ownerKey) prm.useKey(ownerKey)
var (
id uuid.UUID
key frostfsecdsa.PublicKey
)
if clientCut {
id = uuid.New()
key = frostfsecdsa.PublicKey(ownerKey.PublicKey)
} else {
res, err := c.sessionCreate(ctx, prm) res, err := c.sessionCreate(ctx, prm)
if err != nil { if err != nil {
return err return err
} }
if err = id.UnmarshalBinary(res.id); err != nil {
var id uuid.UUID
err = id.UnmarshalBinary(res.id)
if err != nil {
return fmt.Errorf("invalid session token ID: %w", err) return fmt.Errorf("invalid session token ID: %w", err)
} }
if err = key.Decode(res.sessionKey); err != nil {
var key frostfsecdsa.PublicKey
err = key.Decode(res.sessionKey)
if err != nil {
return fmt.Errorf("invalid public session key: %w", err) return fmt.Errorf("invalid public session key: %w", err)
} }
}
dst.SetID(id) dst.SetID(id)
dst.SetAuthKey(&key) dst.SetAuthKey(&key)
@ -2189,8 +2030,6 @@ type callContext struct {
sessionCnr cid.ID sessionCnr cid.ID
sessionObjSet bool sessionObjSet bool
sessionObjs []oid.ID sessionObjs []oid.ID
sessionClientCut bool
} }
func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error { func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
@ -2227,12 +2066,12 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContex
// opens new session or uses cached one. // opens new session or uses cached one.
// Must be called only on initialized callContext with set sessionTarget. // Must be called only on initialized callContext with set sessionTarget.
func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error { func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error {
cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut) cacheKey := formCacheKey(cc.endpoint, cc.key)
tok, ok := p.cache.Get(cacheKey) tok, ok := p.cache.Get(cacheKey)
if !ok { if !ok {
// init new session // init new session
err := initSessionForDuration(ctx, &tok, cc.client, p.stokenDuration, *cc.key, cc.sessionClientCut) err := initSessionForDuration(ctx, &tok, cc.client, p.stokenDuration, *cc.key)
if err != nil { if err != nil {
return fmt.Errorf("session API client: %w", err) return fmt.Errorf("session API client: %w", err)
} }
@ -2297,7 +2136,6 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
p.fillAppropriateKey(&prm.prmCommon) p.fillAppropriateKey(&prm.prmCommon)
var ctxCall callContext var ctxCall callContext
ctxCall.sessionClientCut = prm.clientCut
if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil { if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil {
return oid.ID{}, fmt.Errorf("init call context: %w", err) return oid.ID{}, fmt.Errorf("init call context: %w", err)
} }
@ -2309,13 +2147,6 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
} }
} }
if prm.clientCut {
var ni netmap.NetworkInfo
ni.SetCurrentEpoch(p.cache.Epoch())
ni.SetMaxObjectSize(p.maxObjectSize) // we want to use initial max object size in PayloadSizeLimiter
prm.setNetworkInfo(ni)
}
id, err := ctxCall.client.objectPut(ctx, prm) id, err := ctxCall.client.objectPut(ctx, prm)
if err != nil { if err != nil {
// removes session token from cache in case of token error // removes session token from cache in case of token error
@ -2807,15 +2638,6 @@ func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
func (p *Pool) Close() { func (p *Pool) Close() {
p.cancel() p.cancel()
<-p.closedCh <-p.closedCh
// close all clients
for _, pools := range p.innerPools {
for _, cli := range pools.clients {
if cli.isDialed() {
_ = cli.close()
}
}
}
} }
// SyncContainerWithNetwork applies network configuration received via // SyncContainerWithNetwork applies network configuration received via

View file

@ -106,7 +106,7 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
if err != nil { if err != nil {
return false return false
} }
st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key, false)) st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key))
return st.AssertAuthKey(&expectedAuthKey) return st.AssertAuthKey(&expectedAuthKey)
} }
require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond) require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond)
@ -141,7 +141,7 @@ func TestOneNode(t *testing.T) {
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey) expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
require.True(t, st.AssertAuthKey(&expectedAuthKey)) require.True(t, st.AssertAuthKey(&expectedAuthKey))
} }
@ -171,7 +171,7 @@ func TestTwoNodes(t *testing.T) {
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
require.True(t, assertAuthKeyForAny(st, clientKeys)) require.True(t, assertAuthKeyForAny(st, clientKeys))
} }
@ -226,7 +226,7 @@ func TestOneOfTwoFailed(t *testing.T) {
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
require.True(t, assertAuthKeyForAny(st, clientKeys)) require.True(t, assertAuthKeyForAny(st, clientKeys))
} }
} }
@ -296,7 +296,7 @@ func TestSessionCache(t *testing.T) {
// cache must contain session token // cache must contain session token
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
require.True(t, st.AssertAuthKey(&expectedAuthKey)) require.True(t, st.AssertAuthKey(&expectedAuthKey))
var prm PrmObjectGet var prm PrmObjectGet
@ -309,7 +309,7 @@ func TestSessionCache(t *testing.T) {
// cache must not contain session token // cache must not contain session token
cp, err = pool.connection() cp, err = pool.connection()
require.NoError(t, err) require.NoError(t, err)
_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) _, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key))
require.False(t, ok) require.False(t, ok)
var prm2 PrmObjectPut var prm2 PrmObjectPut
@ -321,7 +321,7 @@ func TestSessionCache(t *testing.T) {
// cache must contain session token // cache must contain session token
cp, err = pool.connection() cp, err = pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key))
require.True(t, st.AssertAuthKey(&expectedAuthKey)) require.True(t, st.AssertAuthKey(&expectedAuthKey))
} }
@ -365,7 +365,7 @@ func TestPriority(t *testing.T) {
firstNode := func() bool { firstNode := func() bool {
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
return st.AssertAuthKey(&expectedAuthKey1) return st.AssertAuthKey(&expectedAuthKey1)
} }
@ -373,7 +373,7 @@ func TestPriority(t *testing.T) {
secondNode := func() bool { secondNode := func() bool {
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
return st.AssertAuthKey(&expectedAuthKey2) return st.AssertAuthKey(&expectedAuthKey2)
} }
require.Never(t, secondNode, time.Second, 200*time.Millisecond) require.Never(t, secondNode, time.Second, 200*time.Millisecond)
@ -410,7 +410,7 @@ func TestSessionCacheWithKey(t *testing.T) {
// cache must contain session token // cache must contain session token
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
require.True(t, st.AssertAuthKey(&expectedAuthKey)) require.True(t, st.AssertAuthKey(&expectedAuthKey))
var prm PrmObjectDelete var prm PrmObjectDelete
@ -420,7 +420,7 @@ func TestSessionCacheWithKey(t *testing.T) {
err = pool.DeleteObject(ctx, prm) err = pool.DeleteObject(ctx, prm)
require.NoError(t, err) require.NoError(t, err)
st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey, false)) st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey))
require.True(t, st.AssertAuthKey(&expectedAuthKey)) require.True(t, st.AssertAuthKey(&expectedAuthKey))
} }
@ -523,44 +523,6 @@ func TestStatusMonitor(t *testing.T) {
require.Equal(t, uint64(count), monitor.overallErrorRate()) require.Equal(t, uint64(count), monitor.overallErrorRate())
require.Equal(t, uint32(1), monitor.currentErrorRate()) require.Equal(t, uint32(1), monitor.currentErrorRate())
t.Run("healthy status", func(t *testing.T) {
cases := []struct {
action func(*clientStatusMonitor)
status uint32
isDialed bool
isHealthy bool
description string
}{
{
action: func(m *clientStatusMonitor) { m.setUnhealthyOnDial() },
status: statusUnhealthyOnDial,
isDialed: false,
isHealthy: false,
description: "set unhealthy on dial",
},
{
action: func(m *clientStatusMonitor) { m.setUnhealthy() },
status: statusUnhealthyOnRequest,
isDialed: true,
isHealthy: false,
description: "set unhealthy on request",
},
{
action: func(m *clientStatusMonitor) { m.setHealthy() },
status: statusHealthy,
isDialed: true,
isHealthy: true,
description: "set healthy",
},
}
for _, tc := range cases {
tc.action(&monitor)
require.Equal(t, tc.status, monitor.healthy.Load())
require.Equal(t, tc.isDialed, monitor.isDialed())
require.Equal(t, tc.isHealthy, monitor.isHealthy())
}
})
} }
func TestHandleError(t *testing.T) { func TestHandleError(t *testing.T) {

View file

@ -3,7 +3,6 @@ package tree
import ( import (
"context" "context"
"crypto/tls" "crypto/tls"
"errors"
"fmt" "fmt"
"sync" "sync"
@ -23,11 +22,6 @@ type treeClient struct {
healthy bool healthy bool
} }
var (
// ErrUnhealthyEndpoint is returned when client in the pool considered unavailable.
ErrUnhealthyEndpoint = errors.New("unhealthy endpoint")
)
// newTreeClient creates new tree client with auto dial. // newTreeClient creates new tree client with auto dial.
func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient { func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient {
return &treeClient{ return &treeClient{
@ -108,7 +102,7 @@ func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) {
defer c.mu.RUnlock() defer c.mu.RUnlock()
if c.conn == nil || !c.healthy { if c.conn == nil || !c.healthy {
return nil, fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, c.address) return nil, fmt.Errorf("unhealthy endpoint: '%s'", c.address)
} }
return c.service, nil return c.service, nil

View file

@ -32,9 +32,6 @@ var (
// ErrNodeAccessDenied is returned from Tree service in case of access denied error. // ErrNodeAccessDenied is returned from Tree service in case of access denied error.
ErrNodeAccessDenied = errors.New("access denied") ErrNodeAccessDenied = errors.New("access denied")
// errNodeEmpty is used to trigger retry when 'GetNodeByPath' return empty result.
errNodeEmptyResult = errors.New("empty result")
) )
// client represents virtual connection to the single FrostFS tree service from which Pool is formed. // client represents virtual connection to the single FrostFS tree service from which Pool is formed.
@ -299,15 +296,8 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
var resp *grpcService.GetNodeByPathResponse var resp *grpcService.GetNodeByPathResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.GetNodeByPath(ctx, request) resp, inErr = client.GetNodeByPath(ctx, request)
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
// Empty result is expected due to delayed tree service sync.
// Return an error there to trigger retry and ignore it after,
// to keep compatibility with 'GetNodeByPath' implementation.
if inErr == nil && len(resp.Body.Nodes) == 0 {
return errNodeEmptyResult
}
return handleError("failed to get node by path", inErr) return handleError("failed to get node by path", inErr)
}); err != nil && !errors.Is(err, errNodeEmptyResult) { }); err != nil {
return nil, err return nil, err
} }
@ -730,7 +720,7 @@ func (p *Pool) setStartIndices(i, j int) {
func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error { func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error {
var ( var (
err, finErr error err error
cl grpcService.TreeServiceClient cl grpcService.TreeServiceClient
) )
@ -750,44 +740,16 @@ func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) er
} }
return err return err
} }
finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err)) p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
} }
startJ = 0 startJ = 0
} }
return finErr return err
} }
func shouldTryAgain(err error) bool { func shouldTryAgain(err error) bool {
return !(err == nil || errors.Is(err, ErrNodeAccessDenied)) return !(err == nil ||
} errors.Is(err, ErrNodeNotFound) ||
errors.Is(err, ErrNodeAccessDenied))
func prioErr(err error) int {
switch {
case err == nil:
return -1
case errors.Is(err, ErrNodeAccessDenied):
return 100
case errors.Is(err, ErrNodeNotFound) ||
errors.Is(err, errNodeEmptyResult):
return 200
case errors.Is(err, ErrUnhealthyEndpoint):
return 300
default:
return 500
}
}
func finalError(current, candidate error) error {
if current == nil || candidate == nil {
return candidate
}
// lower priority error is more desirable
if prioErr(candidate) < prioErr(current) {
return candidate
}
return current
} }

View file

@ -156,43 +156,6 @@ func TestRetry(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
t.Run("error empty result", func(t *testing.T) {
errNodes, index := 2, 0
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
if index < errNodes {
index++
return errNodeEmptyResult
}
return nil
})
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, errNodes)
})
t.Run("error not found", func(t *testing.T) {
errNodes, index := 2, 0
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
if index < errNodes {
index++
return ErrNodeNotFound
}
return nil
})
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, errNodes)
})
t.Run("error access denied", func(t *testing.T) {
var index int
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
index++
return ErrNodeAccessDenied
})
require.ErrorIs(t, err, ErrNodeAccessDenied)
require.Equal(t, 1, index)
checkIndicesAndReset(t, p, 0, 0)
})
} }
func TestRebalance(t *testing.T) { func TestRebalance(t *testing.T) {

View file

@ -1,10 +1,9 @@
package session_test package session_test
import ( import (
"crypto/rand"
"fmt" "fmt"
"math" "math"
mrand "math/rand" "math/rand"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
@ -397,7 +396,7 @@ func TestContainer_AppliedTo(t *testing.T) {
func TestContainer_InvalidAt(t *testing.T) { func TestContainer_InvalidAt(t *testing.T) {
var x session.Container var x session.Container
nbf := mrand.Uint64() nbf := rand.Uint64()
if nbf == math.MaxUint64 { if nbf == math.MaxUint64 {
nbf-- nbf--
} }

View file

@ -1,7 +1,7 @@
package user_test package user_test
import ( import (
"crypto/rand" "math/rand"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"