Compare commits

...

33 commits

Author SHA1 Message Date
fc4551b843 [#172] pool: Use priority of errors in tree pool
When retry happens, use priority map to decide
which error to return. Consider network errors
less desirable than business logic errors.

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-10-11 12:00:34 +03:00
eb5288f4a5 [#172] pool: Do more retries on unexpected tree service responses
1. Try its best by looking for nodes during 'GetNodeByPath'
2. Retry on 'tree not found' and other not found errors

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-10-06 11:12:48 +03:00
60463871db [#171] pool: Add test for healthy status monitor
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-10-03 19:47:22 +03:00
8a04638749 [#171] pool: Close only dialed connections
To avoid panics during close operation, close
only dialed connections.

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-10-03 19:47:22 +03:00
ddbfb758c9 [#171] pool: Use dial status to close connections during restarts
Every client restart, pool creates new client instance. If client
failed due to dial error, there was no prior connection and go
routine on a server side. If client failed due to communication
or business logic errors, then server side maintains connection and
client should close it to avoid routine and connection leak.

Dialing is a part of healthcheck, so health status is now a enum
of three values:
- unhealthy due to dial fail,
- unhealthy due to transmission fail,
- healthy.

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-10-03 17:00:36 +03:00
d71a0e0755 [#88] netmap: use bool, fix hrw_sort tests
Signed-off-by: Andrew Danilin <andnilin@gmail.com>
2023-10-03 07:05:03 +00:00
163b3e1961 [#88] netmap: fix min aggregator bug, add tests
Signed-off-by: Andrew Danilin <andnilin@gmail.com>
2023-10-03 07:05:03 +00:00
84b9d29fc9 [#170] checksum: Use constant mapping for checksum types
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-09-28 17:20:24 +03:00
99c273f499 [#169] pool: Close inner pools during close routine
Some apps do not reuse pool instance and expect that
`pool.Close()` free resources. But it didn't actually
close inner SDK clients, so it leads to goroutine leak
in storage.

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-09-20 12:16:13 +03:00
555ccc63b2 [#167] netmap: Allow to select insufficient number of nodes
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-15 14:47:54 +03:00
0550438b53 [#167] netmap/tests: Add replica to invalid tests
Make sure we fail exactly because of the reason specified.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-15 14:33:44 +03:00
c899163860 [#167] netmap/tests: Add json file name to the test output
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-15 14:33:44 +03:00
ac8fc6d440 [#162] netmap: Allow to parse single unnamed selectors
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-11 15:22:24 +03:00
0a0b590df3 [#162] Fix pre-commit warnings
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-11 15:21:35 +03:00
4df642e941 [#162] netmap: Fix possible panic
Placement policy is unvalidated external input.
Under no circumstances should we panic here.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-11 15:19:56 +03:00
8bc64e088e [#161] .golangci.yml: Reenable deprecated usage warnings
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-08 17:17:02 +03:00
49ad985cad [#161] *: Do not use math/rand.Read()
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-08 17:17:02 +03:00
aa12d8c6a6 [#121] client: Make PrmObjectHash fields public
* Introduce buildRequest for PrmObjectHash

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-09-08 13:36:45 +00:00
303508328a [#121] pool: Refactor PrmSessionCreate usage
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-09-05 17:32:03 +03:00
55699d1480 [#121] client: Make PrmSessionCreate fields public
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-09-05 17:30:42 +03:00
55a1f23e71 [#121] client: Make PrmEndpointInfo, PrmNetworkInfo fields public
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-09-04 19:55:23 +03:00
291a71ba84 [#121] client: Make PrmAnnounceSpace fields public
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-09-04 19:53:34 +03:00
5a471e5002 [#121] client: Make PrmObjectDelete fields public
* Introduce buildRequest for PrmObjectDelete
* Refactor the usage of these params in pool

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-09-04 14:14:22 +00:00
b5fe52d6bd [#150] policy: Check for redundant selectors and filters
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-08-29 14:16:57 +03:00
84e7e69f98 [#121] client: Make PrmObjectGet/Head/GetRange fields public
* Remove common PrmObjectRead structure
* Introduce buildRequest for PrmObjectGet/Head/GetRange
* Refactor the usage of these params in pool

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-08-28 11:26:57 +03:00
46a214d065 [#149] pool: Configure homomorphic hash and buffer size
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-08-25 09:45:15 +03:00
202412230a [#115] pool: Drop part buffer pool
Tests showed that using part buffer pool doesn't save memory a lot.
Especially on big parts.
Probably we can use pool only for small parts
after adding buffer in payloadSizeLimiter

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-08-21 12:03:03 +03:00
3cb3841073 [#115] pool: Try putSingle if possible
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-08-21 12:03:03 +03:00
faeeeab87a [#114] pool: Don't use part buffers when client cut is off
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-08-21 12:02:40 +03:00
cae215534f [#114] pool: Fix linter errors
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-08-21 12:02:40 +03:00
518fb79bc0 [#114] pool: Support client cut with memory limiter
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-08-21 12:02:40 +03:00
342524159a [#121] pool: Make PrmContainerSetEACL fields public
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-08-21 10:33:19 +03:00
22978303f8 [#121] clientt: Make PrmContainerSetEACL fields public
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-08-18 18:05:47 +03:00
52 changed files with 1477 additions and 666 deletions

View file

@ -25,7 +25,7 @@ linters-settings:
# report about shadowed variables
check-shadowing: false
staticcheck:
checks: ["all", "-SA1019"] # TODO Enable SA1019 after deprecated warning are fixed.
checks: ["all"]
funlen:
lines: 80 # default 60
statements: 60 # default 40

View file

@ -24,17 +24,17 @@ type Checksum refs.Checksum
// Type represents the enumeration
// of checksum types.
type Type uint8
type Type refs.ChecksumType
const (
// Unknown is an undefined checksum type.
Unknown Type = iota
Unknown Type = Type(refs.UnknownChecksum)
// SHA256 is a SHA256 checksum type.
SHA256
SHA256 = Type(refs.SHA256)
// TZ is a Tillich-Zémor checksum type.
TZ
TZ = Type(refs.TillichZemor)
)
// ReadFromV2 reads Checksum from the refs.Checksum message. Checks if the

View file

@ -2,9 +2,9 @@ package checksum
import (
"bytes"
"crypto/rand"
"crypto/sha256"
"fmt"
"math/rand"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
)

View file

@ -1,8 +1,8 @@
package checksumtest
import (
"crypto/rand"
"crypto/sha256"
"math/rand"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
)
@ -11,7 +11,7 @@ import (
func Checksum() checksum.Checksum {
var cs [sha256.Size]byte
rand.Read(cs[:])
_, _ = rand.Read(cs[:])
var x checksum.Checksum

View file

@ -47,6 +47,8 @@ func writeXHeadersToMeta(xHeaders []string, h *v2session.RequestMetaHeader) {
return
}
// TODO (aarifullin): remove the panic when all client parameters will check XHeaders
// within buildRequest invocation.
if len(xHeaders)%2 != 0 {
panic("slice of X-Headers with odd length")
}

View file

@ -18,20 +18,20 @@ import (
// PrmContainerSetEACL groups parameters of ContainerSetEACL operation.
type PrmContainerSetEACL struct {
prmCommonMeta
// FrostFS request X-Headers.
XHeaders []string
tableSet bool
table eacl.Table
Table *eacl.Table
sessionSet bool
session session.Container
Session *session.Container
}
// SetTable sets eACL table structure to be set for the container.
// Required parameter.
//
// Deprecated: Use PrmContainerSetEACL.Table instead.
func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
x.table = table
x.tableSet = true
x.Table = &table
}
// WithinSession specifies session within which extended ACL of the container
@ -45,17 +45,22 @@ func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
// for which extended ACL is going to be set
// - session operation MUST be session.VerbContainerSetEACL (ForVerb)
// - token MUST be signed using private key of the owner of the container to be saved
//
// Deprecated: Use PrmContainerSetEACL.Session instead.
func (x *PrmContainerSetEACL) WithinSession(s session.Container) {
x.session = s
x.sessionSet = true
x.Session = &s
}
func (x *PrmContainerSetEACL) buildRequest(c *Client) (*v2container.SetExtendedACLRequest, error) {
if !x.tableSet {
if x.Table == nil {
return nil, errorEACLTableNotSet
}
eaclV2 := x.table.ToV2()
if len(x.XHeaders)%2 != 0 {
return nil, errorInvalidXHeaders
}
eaclV2 := x.Table.ToV2()
var sig frostfscrypto.Signature
@ -72,11 +77,11 @@ func (x *PrmContainerSetEACL) buildRequest(c *Client) (*v2container.SetExtendedA
reqBody.SetSignature(&sigv2)
var meta v2session.RequestMetaHeader
writeXHeadersToMeta(x.prmCommonMeta.xHeaders, &meta)
writeXHeadersToMeta(x.XHeaders, &meta)
if x.sessionSet {
if x.Session != nil {
var tokv2 v2session.Token
x.session.WriteToV2(&tokv2)
x.Session.WriteToV2(&tokv2)
meta.SetSessionToken(&tokv2)
}

View file

@ -14,27 +14,29 @@ import (
// PrmAnnounceSpace groups parameters of ContainerAnnounceUsedSpace operation.
type PrmAnnounceSpace struct {
prmCommonMeta
XHeaders []string
announcements []container.SizeEstimation
Announcements []container.SizeEstimation
}
// SetValues sets values describing volume of space that is used for the container objects.
// Required parameter. Must not be empty.
//
// Must not be mutated before the end of the operation.
//
// Deprecated: Use PrmAnnounceSpace.Announcements instead.
func (x *PrmAnnounceSpace) SetValues(vs []container.SizeEstimation) {
x.announcements = vs
x.Announcements = vs
}
func (x *PrmAnnounceSpace) buildRequest(c *Client) (*v2container.AnnounceUsedSpaceRequest, error) {
if len(x.announcements) == 0 {
if len(x.Announcements) == 0 {
return nil, errorMissingAnnouncements
}
v2announce := make([]v2container.UsedSpaceAnnouncement, len(x.announcements))
for i := range x.announcements {
x.announcements[i].WriteToV2(&v2announce[i])
v2announce := make([]v2container.UsedSpaceAnnouncement, len(x.Announcements))
for i := range x.Announcements {
x.Announcements[i].WriteToV2(&v2announce[i])
}
reqBody := new(v2container.AnnounceUsedSpaceRequestBody)

View file

@ -16,12 +16,12 @@ import (
// PrmEndpointInfo groups parameters of EndpointInfo operation.
type PrmEndpointInfo struct {
prmCommonMeta
XHeaders []string
}
func (x *PrmEndpointInfo) buildRequest(c *Client) (*v2netmap.LocalNodeInfoRequest, error) {
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(x.xHeaders, meta)
writeXHeadersToMeta(x.XHeaders, meta)
req := new(v2netmap.LocalNodeInfoRequest)
req.SetBody(new(v2netmap.LocalNodeInfoRequestBody))
@ -112,12 +112,12 @@ func (c *Client) EndpointInfo(ctx context.Context, prm PrmEndpointInfo) (*ResEnd
// PrmNetworkInfo groups parameters of NetworkInfo operation.
type PrmNetworkInfo struct {
prmCommonMeta
XHeaders []string
}
func (x PrmNetworkInfo) buildRequest(c *Client) (*v2netmap.NetworkInfoRequest, error) {
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(x.xHeaders, meta)
writeXHeadersToMeta(x.XHeaders, meta)
var req v2netmap.NetworkInfoRequest
req.SetBody(new(v2netmap.NetworkInfoRequestBody))

View file

@ -21,71 +21,25 @@ import (
// PrmObjectDelete groups parameters of ObjectDelete operation.
type PrmObjectDelete struct {
meta v2session.RequestMetaHeader
XHeaders []string
body v2object.DeleteRequestBody
BearerToken *bearer.Token
addr v2refs.Address
Session *session.Object
keySet bool
key ecdsa.PrivateKey
}
ContainerID *cid.ID
// WithinSession specifies session within which object should be read.
//
// Creator of the session acquires the authorship of the request.
// This may affect the execution of an operation (e.g. access control).
//
// Must be signed.
func (x *PrmObjectDelete) WithinSession(t session.Object) {
var tv2 v2session.Token
t.WriteToV2(&tv2)
ObjectID *oid.ID
x.meta.SetSessionToken(&tv2)
}
// WithBearerToken attaches bearer token to be used for the operation.
//
// If set, underlying eACL rules will be used in access control.
//
// Must be signed.
func (x *PrmObjectDelete) WithBearerToken(t bearer.Token) {
var v2token acl.BearerToken
t.WriteToV2(&v2token)
x.meta.SetBearerToken(&v2token)
}
// FromContainer specifies FrostFS container of the object.
// Required parameter.
func (x *PrmObjectDelete) FromContainer(id cid.ID) {
var cidV2 v2refs.ContainerID
id.WriteToV2(&cidV2)
x.addr.SetContainerID(&cidV2)
}
// ByID specifies identifier of the requested object.
// Required parameter.
func (x *PrmObjectDelete) ByID(id oid.ID) {
var idV2 v2refs.ObjectID
id.WriteToV2(&idV2)
x.addr.SetObjectID(&idV2)
Key *ecdsa.PrivateKey
}
// UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used.
func (x *PrmObjectDelete) UseKey(key ecdsa.PrivateKey) {
x.keySet = true
x.key = key
}
// WithXHeaders specifies list of extended headers (string key-value pairs)
// to be attached to the request. Must have an even length.
//
// Slice must not be mutated until the operation completes.
func (x *PrmObjectDelete) WithXHeaders(hs ...string) {
writeXHeadersToMeta(hs, &x.meta)
// Deprecated: Use PrmObjectDelete.Key instead.
func (prm *PrmObjectDelete) UseKey(key ecdsa.PrivateKey) {
prm.Key = &key
}
// ResObjectDelete groups resulting values of ObjectDelete operation.
@ -100,6 +54,54 @@ func (x ResObjectDelete) Tombstone() oid.ID {
return x.tomb
}
func (prm *PrmObjectDelete) buildRequest(c *Client) (*v2object.DeleteRequest, error) {
if prm.ContainerID == nil {
return nil, errorMissingContainer
}
if prm.ObjectID == nil {
return nil, errorMissingObject
}
if len(prm.XHeaders)%2 != 0 {
return nil, errorInvalidXHeaders
}
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(prm.XHeaders, meta)
if prm.BearerToken != nil {
v2BearerToken := new(acl.BearerToken)
prm.BearerToken.WriteToV2(v2BearerToken)
meta.SetBearerToken(v2BearerToken)
}
if prm.Session != nil {
v2SessionToken := new(v2session.Token)
prm.Session.WriteToV2(v2SessionToken)
meta.SetSessionToken(v2SessionToken)
}
addr := new(v2refs.Address)
cnrV2 := new(v2refs.ContainerID)
prm.ContainerID.WriteToV2(cnrV2)
addr.SetContainerID(cnrV2)
objV2 := new(v2refs.ObjectID)
prm.ObjectID.WriteToV2(objV2)
addr.SetObjectID(objV2)
body := new(v2object.DeleteRequestBody)
body.SetAddress(addr)
req := new(v2object.DeleteRequest)
req.SetBody(body)
c.prepareRequest(req, meta)
return req, nil
}
// ObjectDelete marks an object for deletion from the container using FrostFS API protocol.
// As a marker, a special unit called a tombstone is placed in the container.
// It confirms the user's intent to delete the object, and is itself a container object.
@ -124,32 +126,22 @@ func (x ResObjectDelete) Tombstone() oid.ID {
// - *apistatus.ObjectLocked;
// - *apistatus.SessionTokenExpired.
func (c *Client) ObjectDelete(ctx context.Context, prm PrmObjectDelete) (*ResObjectDelete, error) {
switch {
case prm.addr.GetContainerID() == nil:
return nil, errorMissingContainer
case prm.addr.GetObjectID() == nil:
return nil, errorMissingObject
req, err := prm.buildRequest(c)
if err != nil {
return nil, err
}
// form request body
prm.body.SetAddress(&prm.addr)
// form request
var req v2object.DeleteRequest
req.SetBody(&prm.body)
c.prepareRequest(&req, &prm.meta)
key := c.prm.key
if prm.keySet {
key = prm.key
if prm.Key != nil {
key = *prm.Key
}
err := signature.SignServiceMessage(&key, &req)
err = signature.SignServiceMessage(&key, req)
if err != nil {
return nil, fmt.Errorf("sign request: %w", err)
}
resp, err := rpcapi.DeleteObject(&c.c, &req, client.WithContext(ctx))
resp, err := rpcapi.DeleteObject(&c.c, req, client.WithContext(ctx))
if err != nil {
return nil, err
}

View file

@ -22,77 +22,76 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
)
// shared parameters of GET/HEAD/RANGE.
type prmObjectRead struct {
meta v2session.RequestMetaHeader
raw bool
addr v2refs.Address
}
// WithXHeaders specifies list of extended headers (string key-value pairs)
// to be attached to the request. Must have an even length.
//
// Slice must not be mutated until the operation completes.
func (x *prmObjectRead) WithXHeaders(hs ...string) {
writeXHeadersToMeta(hs, &x.meta)
}
// MarkRaw marks an intent to read physically stored object.
func (x *prmObjectRead) MarkRaw() {
x.raw = true
}
// MarkLocal tells the server to execute the operation locally.
func (x *prmObjectRead) MarkLocal() {
x.meta.SetTTL(1)
}
// WithinSession specifies session within which object should be read.
//
// Creator of the session acquires the authorship of the request.
// This may affect the execution of an operation (e.g. access control).
//
// Must be signed.
func (x *prmObjectRead) WithinSession(t session.Object) {
var tokv2 v2session.Token
t.WriteToV2(&tokv2)
x.meta.SetSessionToken(&tokv2)
}
// WithBearerToken attaches bearer token to be used for the operation.
//
// If set, underlying eACL rules will be used in access control.
//
// Must be signed.
func (x *prmObjectRead) WithBearerToken(t bearer.Token) {
var v2token acl.BearerToken
t.WriteToV2(&v2token)
x.meta.SetBearerToken(&v2token)
}
// FromContainer specifies FrostFS container of the object.
// Required parameter.
func (x *prmObjectRead) FromContainer(id cid.ID) {
var cnrV2 v2refs.ContainerID
id.WriteToV2(&cnrV2)
x.addr.SetContainerID(&cnrV2)
}
// ByID specifies identifier of the requested object.
// Required parameter.
func (x *prmObjectRead) ByID(id oid.ID) {
var objV2 v2refs.ObjectID
id.WriteToV2(&objV2)
x.addr.SetObjectID(&objV2)
}
// PrmObjectGet groups parameters of ObjectGetInit operation.
type PrmObjectGet struct {
prmObjectRead
XHeaders []string
key *ecdsa.PrivateKey
BearerToken *bearer.Token
Session *session.Object
Raw bool
Local bool
ContainerID *cid.ID
ObjectID *oid.ID
Key *ecdsa.PrivateKey
}
func (prm *PrmObjectGet) buildRequest(c *Client) (*v2object.GetRequest, error) {
if prm.ContainerID == nil {
return nil, errorMissingContainer
}
if prm.ObjectID == nil {
return nil, errorMissingObject
}
if len(prm.XHeaders)%2 != 0 {
return nil, errorInvalidXHeaders
}
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(prm.XHeaders, meta)
if prm.BearerToken != nil {
v2BearerToken := new(acl.BearerToken)
prm.BearerToken.WriteToV2(v2BearerToken)
meta.SetBearerToken(v2BearerToken)
}
if prm.Session != nil {
v2SessionToken := new(v2session.Token)
prm.Session.WriteToV2(v2SessionToken)
meta.SetSessionToken(v2SessionToken)
}
if prm.Local {
meta.SetTTL(1)
}
addr := new(v2refs.Address)
cnrV2 := new(v2refs.ContainerID)
prm.ContainerID.WriteToV2(cnrV2)
addr.SetContainerID(cnrV2)
objV2 := new(v2refs.ObjectID)
prm.ObjectID.WriteToV2(objV2)
addr.SetObjectID(objV2)
body := new(v2object.GetRequestBody)
body.SetRaw(prm.Raw)
body.SetAddress(addr)
req := new(v2object.GetRequest)
req.SetBody(body)
c.prepareRequest(req, meta)
return req, nil
}
// ResObjectGet groups the final result values of ObjectGetInit operation.
@ -122,8 +121,10 @@ type ObjectReader struct {
// UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used.
func (x *PrmObjectGet) UseKey(key ecdsa.PrivateKey) {
x.key = &key
//
// Deprecated: Use PrmObjectGet.Key instead.
func (prm *PrmObjectGet) UseKey(key ecdsa.PrivateKey) {
prm.Key = &key
}
// ReadHeader reads header of the object. Result means success.
@ -299,39 +300,24 @@ func (x *ObjectReader) Read(p []byte) (int, error) {
// Returns an error if parameters are set incorrectly (see PrmObjectGet docs).
// Context is required and must not be nil. It is used for network communication.
func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectReader, error) {
// check parameters
switch {
case prm.addr.GetContainerID() == nil:
return nil, errorMissingContainer
case prm.addr.GetObjectID() == nil:
return nil, errorMissingObject
req, err := prm.buildRequest(c)
if err != nil {
return nil, err
}
// form request body
var body v2object.GetRequestBody
body.SetRaw(prm.raw)
body.SetAddress(&prm.addr)
// form request
var req v2object.GetRequest
req.SetBody(&body)
c.prepareRequest(&req, &prm.meta)
key := prm.key
key := prm.Key
if key == nil {
key = &c.prm.key
}
err := signature.SignServiceMessage(key, &req)
err = signature.SignServiceMessage(key, req)
if err != nil {
return nil, fmt.Errorf("sign request: %w", err)
}
ctx, cancel := context.WithCancel(ctx)
stream, err := rpcapi.GetObject(&c.c, &req, client.WithContext(ctx))
stream, err := rpcapi.GetObject(&c.c, req, client.WithContext(ctx))
if err != nil {
cancel()
return nil, fmt.Errorf("open stream: %w", err)
@ -347,17 +333,29 @@ func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectRe
// PrmObjectHead groups parameters of ObjectHead operation.
type PrmObjectHead struct {
prmObjectRead
XHeaders []string
keySet bool
key ecdsa.PrivateKey
BearerToken *bearer.Token
Session *session.Object
Raw bool
Local bool
ContainerID *cid.ID
ObjectID *oid.ID
Key *ecdsa.PrivateKey
}
// UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used.
func (x *PrmObjectHead) UseKey(key ecdsa.PrivateKey) {
x.keySet = true
x.key = key
//
// Deprecated: Use PrmObjectHead.Key instead.
func (prm *PrmObjectHead) UseKey(key ecdsa.PrivateKey) {
prm.Key = &key
}
// ResObjectHead groups resulting values of ObjectHead operation.
@ -390,6 +388,58 @@ func (x *ResObjectHead) ReadHeader(dst *object.Object) bool {
return true
}
func (prm *PrmObjectHead) buildRequest(c *Client) (*v2object.HeadRequest, error) {
if prm.ContainerID == nil {
return nil, errorMissingContainer
}
if prm.ObjectID == nil {
return nil, errorMissingObject
}
if len(prm.XHeaders)%2 != 0 {
return nil, errorInvalidXHeaders
}
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(prm.XHeaders, meta)
if prm.BearerToken != nil {
v2BearerToken := new(acl.BearerToken)
prm.BearerToken.WriteToV2(v2BearerToken)
meta.SetBearerToken(v2BearerToken)
}
if prm.Session != nil {
v2SessionToken := new(v2session.Token)
prm.Session.WriteToV2(v2SessionToken)
meta.SetSessionToken(v2SessionToken)
}
if prm.Local {
meta.SetTTL(1)
}
addr := new(v2refs.Address)
cnrV2 := new(v2refs.ContainerID)
prm.ContainerID.WriteToV2(cnrV2)
addr.SetContainerID(cnrV2)
objV2 := new(v2refs.ObjectID)
prm.ObjectID.WriteToV2(objV2)
addr.SetObjectID(objV2)
body := new(v2object.HeadRequestBody)
body.SetRaw(prm.Raw)
body.SetAddress(addr)
req := new(v2object.HeadRequest)
req.SetBody(body)
c.prepareRequest(req, meta)
return req, nil
}
// ObjectHead reads object header through a remote server using FrostFS API protocol.
//
// Exactly one return value is non-nil. By default, server status is returned in res structure.
@ -413,33 +463,24 @@ func (x *ResObjectHead) ReadHeader(dst *object.Object) bool {
// - *apistatus.ObjectAlreadyRemoved;
// - *apistatus.SessionTokenExpired.
func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectHead, error) {
switch {
case prm.addr.GetContainerID() == nil:
return nil, errorMissingContainer
case prm.addr.GetObjectID() == nil:
return nil, errorMissingObject
req, err := prm.buildRequest(c)
if err != nil {
return nil, err
}
var body v2object.HeadRequestBody
body.SetRaw(prm.raw)
body.SetAddress(&prm.addr)
var req v2object.HeadRequest
req.SetBody(&body)
c.prepareRequest(&req, &prm.meta)
key := c.prm.key
if prm.keySet {
key = prm.key
if prm.Key != nil {
key = *prm.Key
}
// sign the request
err := signature.SignServiceMessage(&key, &req)
err = signature.SignServiceMessage(&key, req)
if err != nil {
return nil, fmt.Errorf("sign request: %w", err)
}
resp, err := rpcapi.HeadObject(&c.c, &req, client.WithContext(ctx))
resp, err := rpcapi.HeadObject(&c.c, req, client.WithContext(ctx))
if err != nil {
return nil, fmt.Errorf("write request: %w", err)
}
@ -454,7 +495,7 @@ func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectH
return &res, nil
}
_ = res.idObj.ReadFromV2(*prm.addr.GetObjectID())
res.idObj = *prm.ObjectID
switch v := resp.GetBody().GetHeaderPart().(type) {
default:
@ -470,29 +511,95 @@ func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectH
// PrmObjectRange groups parameters of ObjectRange operation.
type PrmObjectRange struct {
prmObjectRead
XHeaders []string
key *ecdsa.PrivateKey
BearerToken *bearer.Token
rng v2object.Range
Session *session.Object
Raw bool
Local bool
ContainerID *cid.ID
ObjectID *oid.ID
Key *ecdsa.PrivateKey
Offset uint64
Length uint64
}
// SetOffset sets offset of the payload range to be read.
// Zero by default.
func (x *PrmObjectRange) SetOffset(off uint64) {
x.rng.SetOffset(off)
func (prm *PrmObjectRange) buildRequest(c *Client) (*v2object.GetRangeRequest, error) {
if prm.Length == 0 {
return nil, errorZeroRangeLength
}
// SetLength sets length of the payload range to be read.
// Must be positive.
func (x *PrmObjectRange) SetLength(ln uint64) {
x.rng.SetLength(ln)
if prm.ContainerID == nil {
return nil, errorMissingContainer
}
if prm.ObjectID == nil {
return nil, errorMissingObject
}
if len(prm.XHeaders)%2 != 0 {
return nil, errorInvalidXHeaders
}
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(prm.XHeaders, meta)
if prm.BearerToken != nil {
v2BearerToken := new(acl.BearerToken)
prm.BearerToken.WriteToV2(v2BearerToken)
meta.SetBearerToken(v2BearerToken)
}
if prm.Session != nil {
v2SessionToken := new(v2session.Token)
prm.Session.WriteToV2(v2SessionToken)
meta.SetSessionToken(v2SessionToken)
}
if prm.Local {
meta.SetTTL(1)
}
addr := new(v2refs.Address)
cnrV2 := new(v2refs.ContainerID)
prm.ContainerID.WriteToV2(cnrV2)
addr.SetContainerID(cnrV2)
objV2 := new(v2refs.ObjectID)
prm.ObjectID.WriteToV2(objV2)
addr.SetObjectID(objV2)
rng := new(v2object.Range)
rng.SetLength(prm.Length)
rng.SetOffset(prm.Offset)
body := new(v2object.GetRangeRequestBody)
body.SetRaw(prm.Raw)
body.SetAddress(addr)
body.SetRange(rng)
req := new(v2object.GetRangeRequest)
req.SetBody(body)
c.prepareRequest(req, meta)
return req, nil
}
// UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used.
func (x *PrmObjectRange) UseKey(key ecdsa.PrivateKey) {
x.key = &key
//
// Deprecated: Use PrmObjectRange.Key instead.
func (prm *PrmObjectRange) UseKey(key ecdsa.PrivateKey) {
prm.Key = &key
}
// ResObjectRange groups the final result values of ObjectRange operation.
@ -662,49 +769,31 @@ func (x *ObjectRangeReader) Read(p []byte) (int, error) {
// Returns an error if parameters are set incorrectly (see PrmObjectRange docs).
// Context is required and must not be nil. It is used for network communication.
func (c *Client) ObjectRangeInit(ctx context.Context, prm PrmObjectRange) (*ObjectRangeReader, error) {
// check parameters
switch {
case prm.addr.GetContainerID() == nil:
return nil, errorMissingContainer
case prm.addr.GetObjectID() == nil:
return nil, errorMissingObject
case prm.rng.GetLength() == 0:
return nil, errorZeroRangeLength
req, err := prm.buildRequest(c)
if err != nil {
return nil, err
}
// form request body
var body v2object.GetRangeRequestBody
body.SetRaw(prm.raw)
body.SetAddress(&prm.addr)
body.SetRange(&prm.rng)
// form request
var req v2object.GetRangeRequest
req.SetBody(&body)
c.prepareRequest(&req, &prm.meta)
key := prm.key
key := prm.Key
if key == nil {
key = &c.prm.key
}
err := signature.SignServiceMessage(key, &req)
err = signature.SignServiceMessage(key, req)
if err != nil {
return nil, fmt.Errorf("sign request: %w", err)
}
ctx, cancel := context.WithCancel(ctx)
stream, err := rpcapi.GetObjectRange(&c.c, &req, client.WithContext(ctx))
stream, err := rpcapi.GetObjectRange(&c.c, req, client.WithContext(ctx))
if err != nil {
cancel()
return nil, fmt.Errorf("open stream: %w", err)
}
var r ObjectRangeReader
r.remainingPayloadLen = int(prm.rng.GetLength())
r.remainingPayloadLen = int(prm.Length)
r.cancelCtxStream = cancel
r.stream = stream
r.client = c

View file

@ -13,121 +13,53 @@ import (
v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
)
// PrmObjectHash groups parameters of ObjectHash operation.
type PrmObjectHash struct {
meta v2session.RequestMetaHeader
XHeaders []string
body v2object.GetRangeHashRequestBody
BearerToken *bearer.Token
csAlgo v2refs.ChecksumType
Session *session.Object
addr v2refs.Address
Local bool
keySet bool
key ecdsa.PrivateKey
Ranges []object.Range
Salt []byte
ChecksumType checksum.Type
ContainerID *cid.ID
ObjectID *oid.ID
Key *ecdsa.PrivateKey
}
// UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used.
func (x *PrmObjectHash) UseKey(key ecdsa.PrivateKey) {
x.keySet = true
x.key = key
}
// MarkLocal tells the server to execute the operation locally.
func (x *PrmObjectHash) MarkLocal() {
x.meta.SetTTL(1)
}
// WithinSession specifies session within which object should be read.
//
// Creator of the session acquires the authorship of the request.
// This may affect the execution of an operation (e.g. access control).
//
// Must be signed.
func (x *PrmObjectHash) WithinSession(t session.Object) {
var tv2 v2session.Token
t.WriteToV2(&tv2)
x.meta.SetSessionToken(&tv2)
}
// WithBearerToken attaches bearer token to be used for the operation.
//
// If set, underlying eACL rules will be used in access control.
//
// Must be signed.
func (x *PrmObjectHash) WithBearerToken(t bearer.Token) {
var v2token acl.BearerToken
t.WriteToV2(&v2token)
x.meta.SetBearerToken(&v2token)
}
// FromContainer specifies FrostFS container of the object.
// Required parameter.
func (x *PrmObjectHash) FromContainer(id cid.ID) {
var cidV2 v2refs.ContainerID
id.WriteToV2(&cidV2)
x.addr.SetContainerID(&cidV2)
}
// ByID specifies identifier of the requested object.
// Required parameter.
func (x *PrmObjectHash) ByID(id oid.ID) {
var idV2 v2refs.ObjectID
id.WriteToV2(&idV2)
x.addr.SetObjectID(&idV2)
}
// SetRangeList sets list of ranges in (offset, length) pair format.
// Required parameter.
//
// If passed as slice, then it must not be mutated before the operation completes.
func (x *PrmObjectHash) SetRangeList(r ...uint64) {
ln := len(r)
if ln%2 != 0 {
panic("odd number of range parameters")
}
rs := make([]v2object.Range, ln/2)
for i := 0; i < ln/2; i++ {
rs[i].SetOffset(r[2*i])
rs[i].SetLength(r[2*i+1])
}
x.body.SetRanges(rs)
// Deprecated: Use PrmObjectHash.Key instead.
func (prm *PrmObjectHash) UseKey(key ecdsa.PrivateKey) {
prm.Key = &key
}
// TillichZemorAlgo changes the hash function to Tillich-Zemor
// (https://link.springer.com/content/pdf/10.1007/3-540-48658-5_5.pdf).
//
// By default, SHA256 hash function is used.
func (x *PrmObjectHash) TillichZemorAlgo() {
x.csAlgo = v2refs.TillichZemor
}
// UseSalt sets the salt to XOR the data range before hashing.
// By default, SHA256 hash function is used/.
//
// Must not be mutated before the operation completes.
func (x *PrmObjectHash) UseSalt(salt []byte) {
x.body.SetSalt(salt)
}
// WithXHeaders specifies list of extended headers (string key-value pairs)
// to be attached to the request. Must have an even length.
//
// Slice must not be mutated until the operation completes.
func (x *PrmObjectHash) WithXHeaders(hs ...string) {
writeXHeadersToMeta(hs, &x.meta)
// Deprecated: Use PrmObjectHash.ChecksumType instead.
func (prm *PrmObjectHash) TillichZemorAlgo() {
prm.ChecksumType = checksum.TZ
}
// ResObjectHash groups resulting values of ObjectHash operation.
@ -142,6 +74,76 @@ func (x ResObjectHash) Checksums() [][]byte {
return x.checksums
}
func (prm *PrmObjectHash) buildRequest(c *Client) (*v2object.GetRangeHashRequest, error) {
if prm.ContainerID == nil {
return nil, errorMissingContainer
}
if prm.ObjectID == nil {
return nil, errorMissingObject
}
if len(prm.XHeaders)%2 != 0 {
return nil, errorInvalidXHeaders
}
if len(prm.Ranges) == 0 {
return nil, errorMissingRanges
}
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(prm.XHeaders, meta)
if prm.BearerToken != nil {
v2BearerToken := new(acl.BearerToken)
prm.BearerToken.WriteToV2(v2BearerToken)
meta.SetBearerToken(v2BearerToken)
}
if prm.Session != nil {
v2SessionToken := new(v2session.Token)
prm.Session.WriteToV2(v2SessionToken)
meta.SetSessionToken(v2SessionToken)
}
if prm.Local {
meta.SetTTL(1)
}
addr := new(v2refs.Address)
cnrV2 := new(v2refs.ContainerID)
prm.ContainerID.WriteToV2(cnrV2)
addr.SetContainerID(cnrV2)
objV2 := new(v2refs.ObjectID)
prm.ObjectID.WriteToV2(objV2)
addr.SetObjectID(objV2)
rs := make([]v2object.Range, len(prm.Ranges))
for i := range prm.Ranges {
rs[i].SetOffset(prm.Ranges[i].GetOffset())
rs[i].SetLength(prm.Ranges[i].GetLength())
}
body := new(v2object.GetRangeHashRequestBody)
body.SetAddress(addr)
body.SetRanges(rs)
body.SetSalt(prm.Salt)
if prm.ChecksumType == checksum.Unknown {
body.SetType(v2refs.SHA256)
} else {
body.SetType(v2refs.ChecksumType(prm.ChecksumType))
}
req := new(v2object.GetRangeHashRequest)
req.SetBody(body)
c.prepareRequest(req, meta)
return req, nil
}
// ObjectHash requests checksum of the range list of the object payload using
// FrostFS API protocol.
//
@ -165,37 +167,22 @@ func (x ResObjectHash) Checksums() [][]byte {
// - *apistatus.ObjectOutOfRange;
// - *apistatus.SessionTokenExpired.
func (c *Client) ObjectHash(ctx context.Context, prm PrmObjectHash) (*ResObjectHash, error) {
switch {
case prm.addr.GetContainerID() == nil:
return nil, errorMissingContainer
case prm.addr.GetObjectID() == nil:
return nil, errorMissingObject
case len(prm.body.GetRanges()) == 0:
return nil, errorMissingRanges
req, err := prm.buildRequest(c)
if err != nil {
return nil, err
}
prm.body.SetAddress(&prm.addr)
if prm.csAlgo == v2refs.UnknownChecksum {
prm.body.SetType(v2refs.SHA256)
} else {
prm.body.SetType(prm.csAlgo)
}
var req v2object.GetRangeHashRequest
c.prepareRequest(&req, &prm.meta)
req.SetBody(&prm.body)
key := c.prm.key
if prm.keySet {
key = prm.key
if prm.Key != nil {
key = *prm.Key
}
err := signature.SignServiceMessage(&key, &req)
err = signature.SignServiceMessage(&key, req)
if err != nil {
return nil, fmt.Errorf("sign request: %w", err)
}
resp, err := rpcapi.HashObjectRange(&c.c, &req, client.WithContext(ctx))
resp, err := rpcapi.HashObjectRange(&c.c, req, client.WithContext(ctx))
if err != nil {
return nil, fmt.Errorf("write request: %w", err)
}

View file

@ -16,30 +16,32 @@ import (
// PrmSessionCreate groups parameters of SessionCreate operation.
type PrmSessionCreate struct {
prmCommonMeta
XHeaders []string
exp uint64
Expiration uint64
keySet bool
key ecdsa.PrivateKey
Key *ecdsa.PrivateKey
}
// SetExp sets number of the last NepFS epoch in the lifetime of the session after which it will be expired.
//
// Deprecated: Use PrmSessionCreate.Expiration instead.
func (x *PrmSessionCreate) SetExp(exp uint64) {
x.exp = exp
x.Expiration = exp
}
// UseKey specifies private key to sign the requests and compute token owner.
// If key is not provided, then Client default key is used.
//
// Deprecated: Use PrmSessionCreate.Key instead.
func (x *PrmSessionCreate) UseKey(key ecdsa.PrivateKey) {
x.keySet = true
x.key = key
x.Key = &key
}
func (x *PrmSessionCreate) buildRequest(c *Client) (*v2session.CreateRequest, error) {
ownerKey := c.prm.key.PublicKey
if x.keySet {
ownerKey = x.key.PublicKey
if x.Key != nil {
ownerKey = x.Key.PublicKey
}
var ownerID user.ID
user.IDFromKey(&ownerID, ownerKey)
@ -49,10 +51,10 @@ func (x *PrmSessionCreate) buildRequest(c *Client) (*v2session.CreateRequest, er
reqBody := new(v2session.CreateRequestBody)
reqBody.SetOwnerID(&ownerIDV2)
reqBody.SetExpiration(x.exp)
reqBody.SetExpiration(x.Expiration)
var meta v2session.RequestMetaHeader
writeXHeadersToMeta(x.xHeaders, &meta)
writeXHeadersToMeta(x.XHeaders, &meta)
var req v2session.CreateRequest
req.SetBody(reqBody)

View file

@ -1,8 +1,8 @@
package cid_test
import (
"crypto/rand"
"crypto/sha256"
"math/rand"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"

View file

@ -1,8 +1,8 @@
package cidtest
import (
"crypto/rand"
"crypto/sha256"
"math/rand"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)
@ -11,7 +11,7 @@ import (
func ID() cid.ID {
checksum := [sha256.Size]byte{}
rand.Read(checksum[:])
_, _ = rand.Read(checksum[:])
return IDWithChecksum(checksum)
}

View file

@ -1,7 +1,7 @@
package frostfscrypto_test
import (
"math/rand"
"crypto/rand"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"

View file

@ -131,7 +131,8 @@ Its basic syntax is as follows:
REP <count> {IN <select>}
```
If a select is not specified, then the entire netmap is used as input. The resulting nodes will be used to actually store objects and they constitute a replica group (or simply, "a replica").
If a select is not specified, then the entire netmap is used as input. The only exception to this rule is when exactly 1 replica and 1 selector are being present: in this case the only selector is being used instead of the whole netmap.
The resulting nodes will be used to actually store objects and they constitute a replica group (or simply, "a replica").
Examples
```sql

View file

@ -2,7 +2,7 @@ package eacltest
import (
"bytes"
"math/rand"
"crypto/rand"
"testing"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"

View file

@ -1,7 +1,7 @@
package eacl
import (
"math/rand"
"crypto/rand"
"testing"
"github.com/stretchr/testify/require"

View file

@ -24,6 +24,7 @@ type (
minAgg struct {
min float64
minFound bool
}
meanIQRAgg struct {
@ -102,7 +103,13 @@ func (a *meanAgg) Compute() float64 {
}
func (a *minAgg) Add(n float64) {
if a.min == 0 || n < a.min {
if !a.minFound {
a.min = n
a.minFound = true
return
}
if n < a.min {
a.min = n
}
}

44
netmap/aggregator_test.go Normal file
View file

@ -0,0 +1,44 @@
package netmap
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestMinAgg(t *testing.T) {
tests := []struct {
vals []float64
res float64
}{
{
vals: []float64{1, 2, 3, 0, 10},
res: 0,
},
{
vals: []float64{10, 0, 10, 0},
res: 0,
},
{
vals: []float64{0, 1, 2, 3, 10},
res: 0,
},
{
vals: []float64{0, 0, 0, 0},
res: 0,
},
{
vals: []float64{10, 10, 10, 10},
res: 10,
},
}
for _, test := range tests {
a := newMinAgg()
for _, val := range test.vals {
a.Add(val)
}
require.Equal(t, test.res, a.Compute())
}
}

View file

@ -40,6 +40,10 @@ type context struct {
// policy uses the UNIQUE flag. Nodes marked as used are not used in subsequent
// base selections.
usedNodes map[uint64]bool
// If true, returns an error when netmap does not contain enough nodes for selection.
// By default best effort is taken.
strict bool
}
// Various validation errors.

View file

@ -2,6 +2,7 @@ package netmap
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"testing"
@ -47,7 +48,8 @@ func TestPlacementPolicy_Interopability(t *testing.T) {
require.NoError(t, err)
for i := range ds {
bs, err := os.ReadFile(filepath.Join(testsDir, ds[i].Name()))
filename := filepath.Join(testsDir, ds[i].Name())
bs, err := os.ReadFile(filename)
require.NoError(t, err)
var tc TestCase
@ -56,7 +58,7 @@ func TestPlacementPolicy_Interopability(t *testing.T) {
srcNodes := make([]NodeInfo, len(tc.Nodes))
copy(srcNodes, tc.Nodes)
t.Run(tc.Name, func(t *testing.T) {
t.Run(fmt.Sprintf("%s:%s", filename, tc.Name), func(t *testing.T) {
var nm NetMap
nm.SetNodes(tc.Nodes)

View file

@ -146,19 +146,19 @@
"select 3 nodes in 3 distinct countries, same placement": {
"policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":1,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]},
"pivot": "Y29udGFpbmVySUQ=",
"result": [[4, 0, 7]],
"result": [[0, 2, 3]],
"placement": {
"pivot": "b2JqZWN0SUQ=",
"result": [[4, 0, 7]]
"result": [[0, 2, 3]]
}
},
"select 6 nodes in 3 distinct countries, different placement": {
"policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":2,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]},
"pivot": "Y29udGFpbmVySUQ=",
"result": [[4, 3, 0, 1, 7, 2]],
"result": [[0, 1, 2, 6, 3, 4]],
"placement": {
"pivot": "b2JqZWN0SUQ=",
"result": [[4, 3, 0, 7, 2, 1]]
"result": [[0, 1, 2, 6, 3, 4]]
}
}
}

View file

@ -0,0 +1,95 @@
{
"name": "non-strict selections",
"comment": "These test specify loose selection behaviour, to allow fetching already PUT objects even when there is not enough nodes to select from.",
"nodes": [
{
"attributes": [
{
"key": "Country",
"value": "Russia"
}
]
},
{
"attributes": [
{
"key": "Country",
"value": "Germany"
}
]
},
{
"attributes": [ ]
}
],
"tests": {
"not enough nodes (backup factor)": {
"policy": {
"replicas": [
{
"count": 1,
"selector": "MyStore"
}
],
"containerBackupFactor": 2,
"selectors": [
{
"name": "MyStore",
"count": 2,
"clause": "DISTINCT",
"attribute": "Country",
"filter": "FromRU"
}
],
"filters": [
{
"name": "FromRU",
"key": "Country",
"op": "EQ",
"value": "Russia",
"filters": [ ]
}
]
},
"result": [
[
0
]
]
},
"not enough nodes (buckets)": {
"policy": {
"replicas": [
{
"count": 1,
"selector": "MyStore"
}
],
"containerBackupFactor": 1,
"selectors": [
{
"name": "MyStore",
"count": 2,
"clause": "DISTINCT",
"attribute": "Country",
"filter": "FromRU"
}
],
"filters": [
{
"name": "FromRU",
"key": "Country",
"op": "EQ",
"value": "Russia",
"filters": [ ]
}
]
},
"result": [
[
0
]
]
}
}
}

View file

@ -104,7 +104,14 @@
"selectors": [],
"filters": []
},
"error": "not enough nodes"
"result": [
[
0,
1,
2,
3
]
]
}
}
}

View file

@ -24,7 +24,12 @@
"tests": {
"missing filter": {
"policy": {
"replicas": [],
"replicas": [
{
"count": 1,
"selector": "MyStore"
}
],
"containerBackupFactor": 1,
"selectors": [
{
@ -47,9 +52,14 @@
},
"error": "filter not found"
},
"not enough nodes (backup factor)": {
"not enough nodes (filter results in empty set)": {
"policy": {
"replicas": [],
"replicas": [
{
"count": 1,
"selector": "MyStore"
}
],
"containerBackupFactor": 2,
"selectors": [
{
@ -57,40 +67,15 @@
"count": 2,
"clause": "DISTINCT",
"attribute": "Country",
"filter": "FromRU"
"filter": "FromMoon"
}
],
"filters": [
{
"name": "FromRU",
"name": "FromMoon",
"key": "Country",
"op": "EQ",
"value": "Russia",
"filters": []
}
]
},
"error": "not enough nodes"
},
"not enough nodes (buckets)": {
"policy": {
"replicas": [],
"containerBackupFactor": 1,
"selectors": [
{
"name": "MyStore",
"count": 2,
"clause": "DISTINCT",
"attribute": "Country",
"filter": "FromRU"
}
],
"filters": [
{
"name": "FromRU",
"key": "Country",
"op": "EQ",
"value": "Russia",
"value": "Moon",
"filters": []
}
]

View file

@ -238,7 +238,7 @@ func (m NetMap) ContainerNodes(p PlacementPolicy, pivot []byte) ([][]NodeInfo, e
// marked as used by earlier replicas.
for i := range p.replicas {
sName := p.replicas[i].GetSelector()
if sName == "" {
if sName == "" && !(len(p.replicas) == 1 && len(p.selectors) == 1) {
var s netmap.Selector
s.SetCount(p.replicas[i].GetCount())
s.SetFilter(mainFilterName)
@ -258,6 +258,9 @@ func (m NetMap) ContainerNodes(p PlacementPolicy, pivot []byte) ([][]NodeInfo, e
}
if p.unique {
if c.processedSelectors[sName] == nil {
return nil, fmt.Errorf("selector not found: '%s'", sName)
}
nodes, err := c.getSelection(*c.processedSelectors[sName])
if err != nil {
return nil, err

View file

@ -551,7 +551,7 @@ func (p *PlacementPolicy) DecodeString(s string) error {
return errors.New("parsed nil value")
}
if err := validatePolicy(*p); err != nil {
if err := validatePolicy(*parsed); err != nil {
return fmt.Errorf("invalid policy: %w", err)
}
@ -605,6 +605,13 @@ var (
errUnknownSelector = errors.New("policy: selector not found")
// errSyntaxError is returned for errors found by ANTLR parser.
errSyntaxError = errors.New("policy: syntax error")
// errRedundantSelector is returned for errors found by selectors policy validator.
errRedundantSelector = errors.New("policy: found redundant selector")
// errUnnamedSelector is returned for errors found by selectors policy validator.
errUnnamedSelector = errors.New("policy: unnamed selectors are useless, " +
"make sure to pair REP and SELECT clauses: \"REP .. IN X\" + \"SELECT ... AS X\"")
// errRedundantSelector is returned for errors found by filters policy validator.
errRedundantFilter = errors.New("policy: found redundant filter")
)
type policyVisitor struct {
@ -845,25 +852,52 @@ func (p *policyVisitor) VisitExpr(ctx *parser.ExprContext) any {
// validatePolicy checks high-level constraints such as filter link in SELECT
// being actually defined in FILTER section.
func validatePolicy(p PlacementPolicy) error {
canOmitNames := len(p.selectors) == 1 && len(p.replicas) == 1
seenFilters := map[string]bool{}
expectedFilters := map[string]struct{}{}
for i := range p.filters {
seenFilters[p.filters[i].GetName()] = true
for _, f := range p.filters[i].GetFilters() {
if f.GetName() != "" {
expectedFilters[f.GetName()] = struct{}{}
}
}
}
seenSelectors := map[string]bool{}
seenSelectors := map[string]*netmap.Selector{}
for i := range p.selectors {
if flt := p.selectors[i].GetFilter(); flt != mainFilterName && !seenFilters[flt] {
if p.selectors[i].GetName() == "" && !canOmitNames {
return errUnnamedSelector
}
if flt := p.selectors[i].GetFilter(); flt != mainFilterName {
expectedFilters[flt] = struct{}{}
if !seenFilters[flt] {
return fmt.Errorf("%w: '%s'", errUnknownFilter, flt)
}
seenSelectors[p.selectors[i].GetName()] = true
}
seenSelectors[p.selectors[i].GetName()] = &p.selectors[i]
}
for _, f := range p.filters {
if _, ok := expectedFilters[f.GetName()]; !ok {
return fmt.Errorf("%w: '%s'", errRedundantFilter, f.GetName())
}
}
expectedSelectors := map[string]struct{}{}
for i := range p.replicas {
if sel := p.replicas[i].GetSelector(); sel != "" && !seenSelectors[sel] {
return fmt.Errorf("%w: '%s'", errUnknownSelector, sel)
selName := p.replicas[i].GetSelector()
if selName != "" || canOmitNames {
expectedSelectors[selName] = struct{}{}
if seenSelectors[selName] == nil {
return fmt.Errorf("%w: '%s'", errUnknownSelector, selName)
}
}
}
for _, s := range p.selectors {
if _, ok := expectedSelectors[s.GetName()]; !ok {
return fmt.Errorf("%w: to use selector '%s' use keyword IN", errRedundantSelector, s.GetName())
}
}

View file

@ -0,0 +1,65 @@
package netmap
import (
"strings"
"testing"
"github.com/stretchr/testify/require"
)
func TestDecodeString(t *testing.T) {
testCases := []string{
`REP 2
CBF 2
SELECT 2 FROM *`,
`REP 1 IN X
CBF 1
SELECT 2 IN SAME Location FROM * AS X`,
`REP 1 IN X
REP 2 IN Y
CBF 1
SELECT 2 FROM * AS X
SELECT 3 FROM * AS Y`,
`REP 1 IN X
SELECT 2 IN City FROM Good AS X
FILTER Country EQ RU AS FromRU
FILTER Country EQ EN AS FromEN
FILTER @FromRU AND @FromEN AND Rating GT 7 AS Good`,
`REP 7 IN SPB
SELECT 1 IN City FROM SPBSSD AS SPB
FILTER City EQ SPB AND SSD EQ true OR City EQ SPB AND Rating GE 5 AS SPBSSD`,
`REP 7 IN SPB
SELECT 1 IN City FROM SPBSSD AS SPB
FILTER NOT (NOT (City EQ SPB) AND SSD EQ true OR City EQ SPB AND Rating GE 5) AS SPBSSD`,
`UNIQUE
REP 1
REP 1`,
}
var p PlacementPolicy
for _, testCase := range testCases {
require.NoError(t, p.DecodeString(testCase), "unable parse %s", testCase)
var b strings.Builder
require.NoError(t, p.WriteStringTo(&b))
require.Equal(t, testCase, b.String())
}
invalidTestCases := map[string]error{
`?REP 1`: errSyntaxError,
`REP 1 trailing garbage`: errSyntaxError,
`REP 1 REP 1 SELECT 4 FROM *`: errUnnamedSelector,
`REP 1 SELECT 4 FROM * SELECT 1 FROM *`: errUnnamedSelector,
`REP 1 IN X SELECT 4 FROM *`: errUnknownSelector,
`REP 1 IN X REP 2 SELECT 4 FROM * AS X FILTER 'UN-LOCODE' EQ 'RU LED' AS F`: errRedundantFilter,
}
for i := range invalidTestCases {
require.ErrorIs(t, p.DecodeString(i), invalidTestCases[i], "#%s", i)
}
}

View file

@ -1,7 +1,6 @@
package netmap_test
import (
"strings"
"testing"
. "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@ -9,55 +8,6 @@ import (
"github.com/stretchr/testify/require"
)
func TestEncode(t *testing.T) {
testCases := []string{
`REP 1 IN X
CBF 1
SELECT 2 IN SAME Location FROM * AS X`,
`REP 1
SELECT 2 IN City FROM Good
FILTER Country EQ RU AS FromRU
FILTER @FromRU AND Rating GT 7 AS Good`,
`REP 7 IN SPB
SELECT 1 IN City FROM SPBSSD AS SPB
FILTER City EQ SPB AND SSD EQ true OR City EQ SPB AND Rating GE 5 AS SPBSSD`,
`REP 7 IN SPB
SELECT 1 IN City FROM SPBSSD AS SPB
FILTER NOT (NOT (City EQ SPB) AND SSD EQ true OR City EQ SPB AND Rating GE 5) AS SPBSSD`,
`UNIQUE
REP 1
REP 1`,
`REP 1 IN X
SELECT 1 FROM F AS X
FILTER 'UN-LOCODE' EQ 'RU LED' AS F`,
}
var p PlacementPolicy
for _, testCase := range testCases {
require.NoError(t, p.DecodeString(testCase))
var b strings.Builder
require.NoError(t, p.WriteStringTo(&b))
require.Equal(t, testCase, b.String())
}
invalidTestCases := []string{
`?REP 1`,
`REP 1 trailing garbage`,
}
for i := range invalidTestCases {
require.Error(t, p.DecodeString(invalidTestCases[i]), "#%d", i)
}
}
func TestPlacementPolicyEncoding(t *testing.T) {
v := netmaptest.PlacementPolicy()

View file

@ -60,7 +60,7 @@ func (c *context) getSelection(s netmap.Selector) ([]nodes, error) {
bucketCount, nodesInBucket := calcNodesCount(s)
buckets := c.getSelectionBase(s)
if len(buckets) < bucketCount {
if c.strict && len(buckets) < bucketCount {
return nil, fmt.Errorf("%w: '%s'", errNotEnoughNodes, s.GetName())
}
@ -96,7 +96,7 @@ func (c *context) getSelection(s netmap.Selector) ([]nodes, error) {
if len(res) < bucketCount {
// Fallback to using minimum allowed backup factor (1).
res = append(res, fallback...)
if len(res) < bucketCount {
if c.strict && len(res) < bucketCount {
return nil, fmt.Errorf("%w: '%s'", errNotEnoughNodes, s.GetName())
}
}
@ -110,6 +110,13 @@ func (c *context) getSelection(s netmap.Selector) ([]nodes, error) {
hrw.SortHasherSliceByWeightValue(res, weights, c.hrwSeedHash)
}
if len(res) < bucketCount {
if len(res) == 0 {
return nil, errNotEnoughNodes
}
bucketCount = len(res)
}
if s.GetAttribute() == "" {
res, fallback = res[:bucketCount], res[bucketCount:]
for i := range fallback {

View file

@ -1,9 +1,10 @@
package netmap
import (
"crypto/rand"
"encoding/binary"
"fmt"
"math/rand"
mrand "math/rand"
"sort"
"strconv"
"testing"
@ -28,10 +29,10 @@ func BenchmarkHRWSort(b *testing.B) {
node.SetPublicKey(key)
vectors[i] = nodes{node}
weights[i] = float64(rand.Uint32()%10) / 10.0
weights[i] = float64(mrand.Uint32()%10) / 10.0
}
pivot := rand.Uint64()
pivot := mrand.Uint64()
b.Run("sort by index, no weight", func(b *testing.B) {
realNodes := make([]nodes, netmapSize)
b.ResetTimer()
@ -282,6 +283,55 @@ func TestPlacementPolicy_Unique(t *testing.T) {
}
}
func TestPlacementPolicy_SingleOmitNames(t *testing.T) {
nodes := []NodeInfo{
nodeInfoFromAttributes("ID", "1", "Country", "Russia", "City", "SPB"),
nodeInfoFromAttributes("ID", "2", "Country", "Germany", "City", "Berlin"),
nodeInfoFromAttributes("ID", "3", "Country", "Russia", "City", "Moscow"),
nodeInfoFromAttributes("ID", "4", "Country", "France", "City", "Paris"),
nodeInfoFromAttributes("ID", "5", "Country", "France", "City", "Lyon"),
nodeInfoFromAttributes("ID", "6", "Country", "Russia", "City", "SPB"),
nodeInfoFromAttributes("ID", "7", "Country", "Russia", "City", "Moscow"),
nodeInfoFromAttributes("ID", "8", "Country", "Germany", "City", "Darmstadt"),
nodeInfoFromAttributes("ID", "9", "Country", "Germany", "City", "Frankfurt"),
nodeInfoFromAttributes("ID", "10", "Country", "Russia", "City", "SPB"),
nodeInfoFromAttributes("ID", "11", "Country", "Russia", "City", "Moscow"),
nodeInfoFromAttributes("ID", "12", "Country", "Germany", "City", "London"),
}
for i := range nodes {
pub := make([]byte, 33)
rand.Read(pub)
nodes[i].SetPublicKey(pub)
}
var nm NetMap
nm.SetNodes(nodes)
for _, unique := range []bool{false, true} {
t.Run(fmt.Sprintf("unique=%t", unique), func(t *testing.T) {
ssNamed := []Selector{newSelector("X", "City", 2, "FromRU", (*Selector).SelectDistinct)}
fsNamed := []Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}
rsNamed := []ReplicaDescriptor{newReplica(1, "X")}
pNamed := newPlacementPolicy(3, rsNamed, ssNamed, fsNamed)
pNamed.unique = unique
vNamed, err := nm.ContainerNodes(pNamed, []byte{1})
require.NoError(t, err)
ssUnnamed := []Selector{newSelector("", "City", 2, "FromRU", (*Selector).SelectDistinct)}
fsUnnamed := []Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}
rsUnnamed := []ReplicaDescriptor{newReplica(1, "")}
pUnnamed := newPlacementPolicy(3, rsUnnamed, ssUnnamed, fsUnnamed)
pUnnamed.unique = unique
vUnnamed, err := nm.ContainerNodes(pUnnamed, []byte{1})
require.NoError(t, err)
require.Equal(t, vNamed, vUnnamed)
})
}
}
func TestPlacementPolicy_MultiREP(t *testing.T) {
nodes := []NodeInfo{
nodeInfoFromAttributes("ID", "1", "Country", "Russia", "City", "SPB"),

View file

@ -1,7 +1,7 @@
package netmaptest
import (
"math/rand"
"crypto/rand"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)
@ -70,7 +70,7 @@ func NetworkInfo() (x netmap.NetworkInfo) {
// NodeInfo returns random netmap.NodeInfo.
func NodeInfo() (x netmap.NodeInfo) {
key := make([]byte, 33)
rand.Read(key)
_, _ = rand.Read(key)
x.SetPublicKey(key)
x.SetNetworkEndpoints("1", "2", "3")

View file

@ -1,10 +1,10 @@
package ns
import (
"crypto/rand"
"errors"
"fmt"
"math/big"
"math/rand"
"strings"
"testing"

View file

@ -1,8 +1,8 @@
package oidtest
import (
"crypto/rand"
"crypto/sha256"
"math/rand"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -12,7 +12,7 @@ import (
func ID() oid.ID {
checksum := [sha256.Size]byte{}
rand.Read(checksum[:])
_, _ = rand.Read(checksum[:])
return idWithChecksum(checksum)
}

View file

@ -1,8 +1,8 @@
package object_test
import (
"crypto/rand"
"crypto/sha256"
"math/rand"
"testing"
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"

View file

@ -1,8 +1,8 @@
package object
import (
"crypto/rand"
"crypto/sha256"
"math/rand"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/tombstone"

View file

@ -69,3 +69,7 @@ func (c *sessionCache) expired(val *cacheValue) bool {
// use epoch+1 (clear cache beforehand) to prevent 'expired session token' error right after epoch tick
return val.token.ExpiredAt(epoch + 1)
}
func (c *sessionCache) Epoch() uint64 {
return c.currentEpoch.Load()
}

View file

@ -189,3 +189,7 @@ func (m *mockClient) restartIfUnhealthy(ctx context.Context) (healthy bool, chan
}
return
}
func (m *mockClient) close() error {
return nil
}

View file

@ -0,0 +1,172 @@
package pool
import (
"context"
sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type logger interface {
log(level zapcore.Level, msg string, fields ...zap.Field)
}
type PrmObjectPutClientCutInit struct {
PrmObjectPut
}
func (c *clientWrapper) objectPutInitTransformer(prm PrmObjectPutClientCutInit) (*objectWriterTransformer, error) {
cl, err := c.getClient()
if err != nil {
return nil, err
}
var w objectWriterTransformer
w.it = internalTarget{
client: cl,
prm: prm,
address: c.address(),
logger: &c.clientStatusMonitor,
}
key := &c.prm.key
if prm.key != nil {
key = prm.key
}
w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{
Key: key,
NextTargetInit: func() transformer.ObjectWriter { return &w.it },
MaxSize: prm.networkInfo.MaxObjectSize(),
WithoutHomomorphicHash: prm.withoutHomomorphicHash,
NetworkState: prm.networkInfo,
SessionToken: prm.stoken,
})
return &w, nil
}
type objectWriterTransformer struct {
ot transformer.ChunkedObjectWriter
it internalTarget
err error
}
func (x *objectWriterTransformer) WriteHeader(ctx context.Context, hdr object.Object) bool {
x.err = x.ot.WriteHeader(ctx, &hdr)
return x.err == nil
}
func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk []byte) bool {
_, x.err = x.ot.Write(ctx, chunk)
return x.err == nil
}
// ResObjectPut groups the final result values of ObjectPutInit operation.
type ResObjectPut struct {
Status apistatus.Status
OID oid.ID
}
// Close return non nil result in any case. If error occurred, the result contains only buffer for further reusing.
func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) {
ai, err := x.ot.Close(ctx)
if err != nil {
return nil, err
}
if ai != nil && ai.ParentID != nil {
x.it.res.OID = *ai.ParentID
}
return &x.it.res, nil
}
type internalTarget struct {
client *sdkClient.Client
res ResObjectPut
prm PrmObjectPutClientCutInit
useStream bool
address string
logger logger
resolveFrostFSErrors bool
}
func (it *internalTarget) WriteObject(ctx context.Context, o *object.Object) error {
putSingleImplemented, err := it.tryPutSingle(ctx, o)
if putSingleImplemented {
return err
}
it.logger.log(zapcore.DebugLevel, "putSingle not implemented, trying put as stream", zap.String("address", it.address))
it.useStream = true
return it.putAsStream(ctx, o)
}
func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error {
var cliPrm sdkClient.PrmObjectPutInit
cliPrm.SetCopiesNumberByVectors(it.prm.copiesNumber)
if it.prm.stoken != nil {
cliPrm.WithinSession(*it.prm.stoken)
}
if it.prm.key != nil {
cliPrm.UseKey(*it.prm.key)
}
if it.prm.btoken != nil {
cliPrm.WithBearerToken(*it.prm.btoken)
}
wrt, err := it.client.ObjectPutInit(ctx, cliPrm)
if err != nil {
return err
}
if wrt.WriteHeader(ctx, *o) {
wrt.WritePayloadChunk(ctx, o.Payload())
}
res, err := wrt.Close(ctx)
if res != nil {
it.res.Status = res.Status()
it.res.OID = res.StoredObjectID()
}
return err
}
func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (bool, error) {
if it.useStream {
return false, nil
}
var cliPrm sdkClient.PrmObjectPutSingle
cliPrm.SetCopiesNumber(it.prm.copiesNumber)
cliPrm.UseKey(it.prm.key)
if it.prm.stoken != nil {
cliPrm.WithinSession(*it.prm.stoken)
}
if it.prm.btoken != nil {
cliPrm.WithBearerToken(*it.prm.btoken)
}
cliPrm.SetObject(o.ToV2())
res, err := it.client.ObjectPutSingle(ctx, cliPrm)
if err != nil && status.Code(err) == codes.Unimplemented {
return false, err
}
if err == nil {
id, _ := o.ID()
it.res = ResObjectPut{
Status: res.Status(),
OID: id,
}
if !it.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.Status) {
return true, apistatus.ErrFromStatus(it.res.Status)
}
return true, nil
}
return true, err
}

View file

@ -78,12 +78,16 @@ type client interface {
dial(ctx context.Context) error
// see clientWrapper.restartIfUnhealthy.
restartIfUnhealthy(ctx context.Context) (bool, bool)
// see clientWrapper.close.
close() error
}
// clientStatus provide access to some metrics for connection.
type clientStatus interface {
// isHealthy checks if the connection can handle requests.
isHealthy() bool
// isDialed checks if the connection was created.
isDialed() bool
// setUnhealthy marks client as unhealthy.
setUnhealthy()
// address return address of endpoint.
@ -105,7 +109,7 @@ var errPoolClientUnhealthy = errors.New("pool client unhealthy")
type clientStatusMonitor struct {
logger *zap.Logger
addr string
healthy *atomic.Bool
healthy *atomic.Uint32
errorThreshold uint32
mu sync.RWMutex // protect counters
@ -114,6 +118,22 @@ type clientStatusMonitor struct {
methods []*methodStatus
}
// values for healthy status of clientStatusMonitor.
const (
// statusUnhealthyOnDial is set when dialing to the endpoint is failed,
// so there is no connection to the endpoint, and pool should not close it
// before re-establishing connection once again.
statusUnhealthyOnDial = iota
// statusUnhealthyOnRequest is set when communication after dialing to the
// endpoint is failed due to immediate or accumulated errors, connection is
// available and pool should close it before re-establishing connection once again.
statusUnhealthyOnRequest
// statusHealthy is set when connection is ready to be used by the pool.
statusHealthy
)
// methodStatus provide statistic for specific method.
type methodStatus struct {
name string
@ -195,8 +215,8 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint
methods[i] = &methodStatus{name: i.String()}
}
healthy := new(atomic.Bool)
healthy.Store(true)
healthy := new(atomic.Uint32)
healthy.Store(statusHealthy)
return clientStatusMonitor{
logger: logger,
@ -252,6 +272,11 @@ func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) {
x.key = key
}
// setLogger sets sdkClient.Client logger.
func (x *wrapperPrm) setLogger(logger *zap.Logger) {
x.logger = logger
}
// setDialTimeout sets the timeout for connection to be established.
func (x *wrapperPrm) setDialTimeout(timeout time.Duration) {
x.dialTimeout = timeout
@ -317,7 +342,7 @@ func (c *clientWrapper) dial(ctx context.Context) error {
prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
if err = cl.Dial(ctx, prmDial); err != nil {
c.setUnhealthy()
c.setUnhealthyOnDial()
return err
}
@ -334,6 +359,12 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
wasHealthy = true
}
// if connection is dialed before, to avoid routine / connection leak,
// pool has to close it and then initialize once again.
if c.isDialed() {
_ = c.close()
}
var cl sdkClient.Client
var prmInit sdkClient.PrmInit
prmInit.SetDefaultPrivateKey(c.prm.key)
@ -348,7 +379,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
if err := cl.Dial(ctx, prmDial); err != nil {
c.setUnhealthy()
c.setUnhealthyOnDial()
return false, wasHealthy
}
@ -549,11 +580,9 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
return err
}
var cliPrm sdkClient.PrmContainerSetEACL
cliPrm.SetTable(prm.table)
if prm.sessionSet {
cliPrm.WithinSession(prm.session)
cliPrm := sdkClient.PrmContainerSetEACL{
Table: &prm.Table,
Session: prm.Session,
}
start := time.Now()
@ -567,16 +596,19 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
return fmt.Errorf("set eacl on client: %w", err)
}
if !prm.waitParamsSet {
prm.waitParams.setDefaults()
if prm.WaitParams == nil {
prm.WaitParams = defaultWaitParams()
}
if err := prm.WaitParams.CheckValidity(); err != nil {
return fmt.Errorf("invalid wait parameters: %w", err)
}
var cIDp *cid.ID
if cID, set := prm.table.CID(); set {
if cID, set := prm.Table.CID(); set {
cIDp = &cID
}
err = waitForEACLPresence(ctx, c, cIDp, &prm.table, &prm.waitParams)
err = waitForEACLPresence(ctx, c, cIDp, &prm.Table, prm.WaitParams)
if err = c.handleError(ctx, nil, err); err != nil {
return fmt.Errorf("wait eacl presence on client: %w", err)
}
@ -628,6 +660,18 @@ func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (netm
// objectPut writes object to FrostFS.
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
if prm.bufferMaxSize == 0 {
prm.bufferMaxSize = defaultBufferMaxSizeForPut
}
if prm.clientCut {
return c.objectPutClientCut(ctx, prm)
}
return c.objectPutServerCut(ctx, prm)
}
func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
cl, err := c.getClient()
if err != nil {
return oid.ID{}, err
@ -665,10 +709,8 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
}
if prm.payload != nil {
const defaultBufferSizePut = 3 << 20 // configure?
if sz == 0 || sz > defaultBufferSizePut {
sz = defaultBufferSizePut
if sz == 0 || sz > prm.bufferMaxSize {
sz = prm.bufferMaxSize
}
buf := make([]byte, sz)
@ -709,6 +751,73 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
return res.StoredObjectID(), nil
}
func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
putInitPrm := PrmObjectPutClientCutInit{
PrmObjectPut: prm,
}
start := time.Now()
wObj, err := c.objectPutInitTransformer(putInitPrm)
c.incRequests(time.Since(start), methodObjectPut)
if err = c.handleError(ctx, nil, err); err != nil {
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
}
if wObj.WriteHeader(ctx, prm.hdr) {
sz := prm.hdr.PayloadSize()
if data := prm.hdr.Payload(); len(data) > 0 {
if prm.payload != nil {
prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
} else {
prm.payload = bytes.NewReader(data)
sz = uint64(len(data))
}
}
if prm.payload != nil {
if sz == 0 || sz > prm.bufferMaxSize {
sz = prm.bufferMaxSize
}
buf := make([]byte, sz)
var n int
for {
n, err = prm.payload.Read(buf)
if n > 0 {
start = time.Now()
successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
c.incRequests(time.Since(start), methodObjectPut)
if !successWrite {
break
}
continue
}
if errors.Is(err, io.EOF) {
break
}
return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
}
}
}
res, err := wObj.Close(ctx)
var st apistatus.Status
if res != nil {
st = res.Status
}
if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
return oid.ID{}, fmt.Errorf("client failure: %w", err)
}
return res.OID, nil
}
// objectDelete invokes sdkClient.ObjectDelete parse response status to error.
func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) error {
cl, err := c.getClient()
@ -716,20 +825,15 @@ func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) e
return err
}
var cliPrm sdkClient.PrmObjectDelete
cliPrm.FromContainer(prm.addr.Container())
cliPrm.ByID(prm.addr.Object())
cnr := prm.addr.Container()
obj := prm.addr.Object()
if prm.stoken != nil {
cliPrm.WithinSession(*prm.stoken)
}
if prm.btoken != nil {
cliPrm.WithBearerToken(*prm.btoken)
}
if prm.key != nil {
cliPrm.UseKey(*prm.key)
cliPrm := sdkClient.PrmObjectDelete{
BearerToken: prm.btoken,
Session: prm.stoken,
ContainerID: &cnr,
ObjectID: &obj,
Key: prm.key,
}
start := time.Now()
@ -752,20 +856,15 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGet
return ResGetObject{}, err
}
var cliPrm sdkClient.PrmObjectGet
cliPrm.FromContainer(prm.addr.Container())
cliPrm.ByID(prm.addr.Object())
prmCnr := prm.addr.Container()
prmObj := prm.addr.Object()
if prm.stoken != nil {
cliPrm.WithinSession(*prm.stoken)
}
if prm.btoken != nil {
cliPrm.WithBearerToken(*prm.btoken)
}
if prm.key != nil {
cliPrm.UseKey(*prm.key)
cliPrm := sdkClient.PrmObjectGet{
BearerToken: prm.btoken,
Session: prm.stoken,
ContainerID: &prmCnr,
ObjectID: &prmObj,
Key: prm.key,
}
var res ResGetObject
@ -805,23 +904,16 @@ func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (obje
return object.Object{}, err
}
var cliPrm sdkClient.PrmObjectHead
cliPrm.FromContainer(prm.addr.Container())
cliPrm.ByID(prm.addr.Object())
if prm.raw {
cliPrm.MarkRaw()
}
prmCnr := prm.addr.Container()
prmObj := prm.addr.Object()
if prm.stoken != nil {
cliPrm.WithinSession(*prm.stoken)
}
if prm.btoken != nil {
cliPrm.WithBearerToken(*prm.btoken)
}
if prm.key != nil {
cliPrm.UseKey(*prm.key)
cliPrm := sdkClient.PrmObjectHead{
BearerToken: prm.btoken,
Session: prm.stoken,
Raw: prm.raw,
ContainerID: &prmCnr,
ObjectID: &prmObj,
Key: prm.key,
}
var obj object.Object
@ -850,22 +942,17 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (Re
return ResObjectRange{}, err
}
var cliPrm sdkClient.PrmObjectRange
cliPrm.FromContainer(prm.addr.Container())
cliPrm.ByID(prm.addr.Object())
cliPrm.SetOffset(prm.off)
cliPrm.SetLength(prm.ln)
prmCnr := prm.addr.Container()
prmObj := prm.addr.Object()
if prm.stoken != nil {
cliPrm.WithinSession(*prm.stoken)
}
if prm.btoken != nil {
cliPrm.WithBearerToken(*prm.btoken)
}
if prm.key != nil {
cliPrm.UseKey(*prm.key)
cliPrm := sdkClient.PrmObjectRange{
BearerToken: prm.btoken,
Session: prm.stoken,
ContainerID: &prmCnr,
ObjectID: &prmObj,
Offset: prm.off,
Length: prm.ln,
Key: prm.key,
}
start := time.Now()
@ -922,9 +1009,10 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
return resCreateSession{}, err
}
var cliPrm sdkClient.PrmSessionCreate
cliPrm.SetExp(prm.exp)
cliPrm.UseKey(prm.key)
cliPrm := sdkClient.PrmSessionCreate{
Expiration: prm.exp,
Key: &prm.key,
}
start := time.Now()
res, err := cl.SessionCreate(ctx, cliPrm)
@ -944,15 +1032,23 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
}
func (c *clientStatusMonitor) isHealthy() bool {
return c.healthy.Load()
return c.healthy.Load() == statusHealthy
}
func (c *clientStatusMonitor) isDialed() bool {
return c.healthy.Load() != statusUnhealthyOnDial
}
func (c *clientStatusMonitor) setHealthy() {
c.healthy.Store(true)
c.healthy.Store(statusHealthy)
}
func (c *clientStatusMonitor) setUnhealthy() {
c.healthy.Store(false)
c.healthy.Store(statusUnhealthyOnRequest)
}
func (c *clientStatusMonitor) setUnhealthyOnDial() {
c.healthy.Store(statusUnhealthyOnDial)
}
func (c *clientStatusMonitor) address() string {
@ -971,12 +1067,20 @@ func (c *clientStatusMonitor) incErrorRate() {
}
c.mu.Unlock()
if thresholdReached && c.logger != nil {
c.logger.Warn("error threshold reached",
if thresholdReached {
c.log(zapcore.WarnLevel, "error threshold reached",
zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold))
}
}
func (c *clientStatusMonitor) log(level zapcore.Level, msg string, fields ...zap.Field) {
if c.logger == nil {
return
}
c.logger.Log(level, msg, fields...)
}
func (c *clientStatusMonitor) currentErrorRate() uint32 {
c.mu.RLock()
defer c.mu.RUnlock()
@ -1010,6 +1114,13 @@ func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
}
}
func (c *clientWrapper) close() error {
if c.client != nil {
return c.client.Close()
}
return nil
}
func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error {
if err != nil {
if needCountError(ctx, err) {
@ -1227,12 +1338,6 @@ func (x *WaitParams) SetPollInterval(tick time.Duration) {
x.PollInterval = tick
}
// Deprecated: Use defaultWaitParams() instead.
func (x *WaitParams) setDefaults() {
x.Timeout = 120 * time.Second
x.PollInterval = 5 * time.Second
}
func defaultWaitParams() *WaitParams {
return &WaitParams{
Timeout: 120 * time.Second,
@ -1321,6 +1426,13 @@ type PrmObjectPut struct {
payload io.Reader
copiesNumber []uint32
clientCut bool
networkInfo netmap.NetworkInfo
withoutHomomorphicHash bool
bufferMaxSize uint64
}
// SetHeader specifies header of the object.
@ -1345,6 +1457,32 @@ func (x *PrmObjectPut) SetCopiesNumberVector(copiesNumber []uint32) {
x.copiesNumber = copiesNumber
}
// SetClientCut enables client cut for objects. It means that full object is prepared on client side
// and retrying is possible. But this leads to additional memory using for buffering object parts.
// Buffer size for every put is MaxObjectSize value from FrostFS network.
// There is limit for total memory allocation for in-flight request and
// can be set by InitParameters.SetMaxClientCutMemory (default value is 1gb).
// Put requests will fail if this limit be reached.
func (x *PrmObjectPut) SetClientCut(clientCut bool) {
x.clientCut = clientCut
}
// WithoutHomomorphicHash if set to true do not use Tillich-Zémor hash for payload.
func (x *PrmObjectPut) WithoutHomomorphicHash(v bool) {
x.withoutHomomorphicHash = v
}
// SetBufferMaxSize sets max buffer size to read payload.
// This value isn't used if object size is set explicitly and less than this value.
// Default value 3MB.
func (x *PrmObjectPut) SetBufferMaxSize(size uint64) {
x.bufferMaxSize = size
}
func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) {
x.networkInfo = ni
}
// PrmObjectDelete groups parameters of DeleteObject operation.
type PrmObjectDelete struct {
prmCommon
@ -1532,39 +1670,41 @@ func (x *PrmContainerEACL) SetContainerID(cnrID cid.ID) {
// PrmContainerSetEACL groups parameters of SetEACL operation.
type PrmContainerSetEACL struct {
table eacl.Table
Table eacl.Table
sessionSet bool
session session.Container
Session *session.Container
waitParams WaitParams
waitParamsSet bool
WaitParams *WaitParams
}
// SetTable sets structure of container's extended ACL to be used as a
// parameter of the base client's operation.
//
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.SetTable.
//
// Deprecated: Use PrmContainerSetEACL.Table instead.
func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
x.table = table
x.Table = table
}
// WithinSession specifies session to be used as a parameter of the base
// client's operation.
//
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.WithinSession.
//
// Deprecated: Use PrmContainerSetEACL.Session instead.
func (x *PrmContainerSetEACL) WithinSession(s session.Container) {
x.session = s
x.sessionSet = true
x.Session = &s
}
// SetWaitParams specifies timeout params to complete operation.
// If not provided the default one will be used.
// Panics if any of the wait params isn't positive.
//
// Deprecated: Use PrmContainerSetEACL.WaitParams instead.
func (x *PrmContainerSetEACL) SetWaitParams(waitParams WaitParams) {
waitParams.checkForPositive()
x.waitParams = waitParams
x.waitParamsSet = true
x.WaitParams = &waitParams
}
// PrmBalanceGet groups parameters of Balance operation.
@ -1638,6 +1778,8 @@ type Pool struct {
rebalanceParams rebalanceParameters
clientBuilder clientBuilder
logger *zap.Logger
maxObjectSize uint64
}
type innerPool struct {
@ -1654,6 +1796,8 @@ const (
defaultHealthcheckTimeout = 4 * time.Second
defaultDialTimeout = 5 * time.Second
defaultStreamTimeout = 10 * time.Second
defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
)
// NewPool creates connection pool using parameters.
@ -1713,7 +1857,7 @@ func (p *Pool) Dial(ctx context.Context) error {
}
var st session.Object
err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key)
err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key, false)
if err != nil {
clients[j].setUnhealthy()
p.log(zap.WarnLevel, "failed to create frostfs session token for client",
@ -1721,7 +1865,7 @@ func (p *Pool) Dial(ctx context.Context) error {
continue
}
_ = p.cache.Put(formCacheKey(addr, p.key), st)
_ = p.cache.Put(formCacheKey(addr, p.key, false), st)
atLeastOneHealthy = true
}
source := rand.NewSource(time.Now().UnixNano())
@ -1742,6 +1886,12 @@ func (p *Pool) Dial(ctx context.Context) error {
p.closedCh = make(chan struct{})
p.innerPools = inner
ni, err := p.NetworkInfo(ctx)
if err != nil {
return fmt.Errorf("get network info for max object size: %w", err)
}
p.maxObjectSize = ni.MaxObjectSize()
go p.startRebalance(ctx)
return nil
}
@ -1784,6 +1934,7 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
var prm wrapperPrm
prm.setAddress(addr)
prm.setKey(*params.key)
prm.setLogger(params.logger)
prm.setDialTimeout(params.nodeDialTimeout)
prm.setStreamTimeout(params.nodeStreamTimeout)
prm.setErrorThreshold(params.errorThreshold)
@ -1952,9 +2103,15 @@ func (p *innerPool) connection() (client, error) {
return nil, errors.New("no healthy client")
}
func formCacheKey(address string, key *ecdsa.PrivateKey) string {
func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string {
k := keys.PrivateKey{PrivateKey: *key}
return address + k.String()
stype := "server"
if clientCut {
stype = "client"
}
return address + stype + k.String()
}
func (p *Pool) checkSessionTokenErr(err error, address string) bool {
@ -1970,7 +2127,7 @@ func (p *Pool) checkSessionTokenErr(err error, address string) bool {
return false
}
func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey) error {
func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey, clientCut bool) error {
ni, err := c.networkInfo(ctx, prmNetworkInfo{})
if err != nil {
return err
@ -1988,24 +2145,26 @@ func initSessionForDuration(ctx context.Context, dst *session.Object, c client,
prm.setExp(exp)
prm.useKey(ownerKey)
var (
id uuid.UUID
key frostfsecdsa.PublicKey
)
if clientCut {
id = uuid.New()
key = frostfsecdsa.PublicKey(ownerKey.PublicKey)
} else {
res, err := c.sessionCreate(ctx, prm)
if err != nil {
return err
}
var id uuid.UUID
err = id.UnmarshalBinary(res.id)
if err != nil {
if err = id.UnmarshalBinary(res.id); err != nil {
return fmt.Errorf("invalid session token ID: %w", err)
}
var key frostfsecdsa.PublicKey
err = key.Decode(res.sessionKey)
if err != nil {
if err = key.Decode(res.sessionKey); err != nil {
return fmt.Errorf("invalid public session key: %w", err)
}
}
dst.SetID(id)
dst.SetAuthKey(&key)
@ -2030,6 +2189,8 @@ type callContext struct {
sessionCnr cid.ID
sessionObjSet bool
sessionObjs []oid.ID
sessionClientCut bool
}
func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
@ -2066,12 +2227,12 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContex
// opens new session or uses cached one.
// Must be called only on initialized callContext with set sessionTarget.
func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error {
cacheKey := formCacheKey(cc.endpoint, cc.key)
cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut)
tok, ok := p.cache.Get(cacheKey)
if !ok {
// init new session
err := initSessionForDuration(ctx, &tok, cc.client, p.stokenDuration, *cc.key)
err := initSessionForDuration(ctx, &tok, cc.client, p.stokenDuration, *cc.key, cc.sessionClientCut)
if err != nil {
return fmt.Errorf("session API client: %w", err)
}
@ -2136,6 +2297,7 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
p.fillAppropriateKey(&prm.prmCommon)
var ctxCall callContext
ctxCall.sessionClientCut = prm.clientCut
if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil {
return oid.ID{}, fmt.Errorf("init call context: %w", err)
}
@ -2147,6 +2309,13 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
}
}
if prm.clientCut {
var ni netmap.NetworkInfo
ni.SetCurrentEpoch(p.cache.Epoch())
ni.SetMaxObjectSize(p.maxObjectSize) // we want to use initial max object size in PayloadSizeLimiter
prm.setNetworkInfo(ni)
}
id, err := ctxCall.client.objectPut(ctx, prm)
if err != nil {
// removes session token from cache in case of token error
@ -2638,6 +2807,15 @@ func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
func (p *Pool) Close() {
p.cancel()
<-p.closedCh
// close all clients
for _, pools := range p.innerPools {
for _, cli := range pools.clients {
if cli.isDialed() {
_ = cli.close()
}
}
}
}
// SyncContainerWithNetwork applies network configuration received via

View file

@ -106,7 +106,7 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
if err != nil {
return false
}
st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key))
st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key, false))
return st.AssertAuthKey(&expectedAuthKey)
}
require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond)
@ -141,7 +141,7 @@ func TestOneNode(t *testing.T) {
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
require.True(t, st.AssertAuthKey(&expectedAuthKey))
}
@ -171,7 +171,7 @@ func TestTwoNodes(t *testing.T) {
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, assertAuthKeyForAny(st, clientKeys))
}
@ -226,7 +226,7 @@ func TestOneOfTwoFailed(t *testing.T) {
for i := 0; i < 5; i++ {
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, assertAuthKeyForAny(st, clientKeys))
}
}
@ -296,7 +296,7 @@ func TestSessionCache(t *testing.T) {
// cache must contain session token
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
var prm PrmObjectGet
@ -309,7 +309,7 @@ func TestSessionCache(t *testing.T) {
// cache must not contain session token
cp, err = pool.connection()
require.NoError(t, err)
_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key))
_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.False(t, ok)
var prm2 PrmObjectPut
@ -321,7 +321,7 @@ func TestSessionCache(t *testing.T) {
// cache must contain session token
cp, err = pool.connection()
require.NoError(t, err)
st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key))
st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
}
@ -365,7 +365,7 @@ func TestPriority(t *testing.T) {
firstNode := func() bool {
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
return st.AssertAuthKey(&expectedAuthKey1)
}
@ -373,7 +373,7 @@ func TestPriority(t *testing.T) {
secondNode := func() bool {
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
return st.AssertAuthKey(&expectedAuthKey2)
}
require.Never(t, secondNode, time.Second, 200*time.Millisecond)
@ -410,7 +410,7 @@ func TestSessionCacheWithKey(t *testing.T) {
// cache must contain session token
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
var prm PrmObjectDelete
@ -420,7 +420,7 @@ func TestSessionCacheWithKey(t *testing.T) {
err = pool.DeleteObject(ctx, prm)
require.NoError(t, err)
st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey))
st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
}
@ -523,6 +523,44 @@ func TestStatusMonitor(t *testing.T) {
require.Equal(t, uint64(count), monitor.overallErrorRate())
require.Equal(t, uint32(1), monitor.currentErrorRate())
t.Run("healthy status", func(t *testing.T) {
cases := []struct {
action func(*clientStatusMonitor)
status uint32
isDialed bool
isHealthy bool
description string
}{
{
action: func(m *clientStatusMonitor) { m.setUnhealthyOnDial() },
status: statusUnhealthyOnDial,
isDialed: false,
isHealthy: false,
description: "set unhealthy on dial",
},
{
action: func(m *clientStatusMonitor) { m.setUnhealthy() },
status: statusUnhealthyOnRequest,
isDialed: true,
isHealthy: false,
description: "set unhealthy on request",
},
{
action: func(m *clientStatusMonitor) { m.setHealthy() },
status: statusHealthy,
isDialed: true,
isHealthy: true,
description: "set healthy",
},
}
for _, tc := range cases {
tc.action(&monitor)
require.Equal(t, tc.status, monitor.healthy.Load())
require.Equal(t, tc.isDialed, monitor.isDialed())
require.Equal(t, tc.isHealthy, monitor.isHealthy())
}
})
}
func TestHandleError(t *testing.T) {

View file

@ -3,6 +3,7 @@ package tree
import (
"context"
"crypto/tls"
"errors"
"fmt"
"sync"
@ -22,6 +23,11 @@ type treeClient struct {
healthy bool
}
var (
// ErrUnhealthyEndpoint is returned when client in the pool considered unavailable.
ErrUnhealthyEndpoint = errors.New("unhealthy endpoint")
)
// newTreeClient creates new tree client with auto dial.
func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient {
return &treeClient{
@ -102,7 +108,7 @@ func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) {
defer c.mu.RUnlock()
if c.conn == nil || !c.healthy {
return nil, fmt.Errorf("unhealthy endpoint: '%s'", c.address)
return nil, fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, c.address)
}
return c.service, nil

View file

@ -32,6 +32,9 @@ var (
// ErrNodeAccessDenied is returned from Tree service in case of access denied error.
ErrNodeAccessDenied = errors.New("access denied")
// errNodeEmpty is used to trigger retry when 'GetNodeByPath' return empty result.
errNodeEmptyResult = errors.New("empty result")
)
// client represents virtual connection to the single FrostFS tree service from which Pool is formed.
@ -296,8 +299,15 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
var resp *grpcService.GetNodeByPathResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.GetNodeByPath(ctx, request)
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
// Empty result is expected due to delayed tree service sync.
// Return an error there to trigger retry and ignore it after,
// to keep compatibility with 'GetNodeByPath' implementation.
if inErr == nil && len(resp.Body.Nodes) == 0 {
return errNodeEmptyResult
}
return handleError("failed to get node by path", inErr)
}); err != nil {
}); err != nil && !errors.Is(err, errNodeEmptyResult) {
return nil, err
}
@ -720,7 +730,7 @@ func (p *Pool) setStartIndices(i, j int) {
func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error {
var (
err error
err, finErr error
cl grpcService.TreeServiceClient
)
@ -740,16 +750,44 @@ func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) er
}
return err
}
finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
}
startJ = 0
}
return err
return finErr
}
func shouldTryAgain(err error) bool {
return !(err == nil ||
errors.Is(err, ErrNodeNotFound) ||
errors.Is(err, ErrNodeAccessDenied))
return !(err == nil || errors.Is(err, ErrNodeAccessDenied))
}
func prioErr(err error) int {
switch {
case err == nil:
return -1
case errors.Is(err, ErrNodeAccessDenied):
return 100
case errors.Is(err, ErrNodeNotFound) ||
errors.Is(err, errNodeEmptyResult):
return 200
case errors.Is(err, ErrUnhealthyEndpoint):
return 300
default:
return 500
}
}
func finalError(current, candidate error) error {
if current == nil || candidate == nil {
return candidate
}
// lower priority error is more desirable
if prioErr(candidate) < prioErr(current) {
return candidate
}
return current
}

View file

@ -156,6 +156,43 @@ func TestRetry(t *testing.T) {
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0)
})
t.Run("error empty result", func(t *testing.T) {
errNodes, index := 2, 0
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
if index < errNodes {
index++
return errNodeEmptyResult
}
return nil
})
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, errNodes)
})
t.Run("error not found", func(t *testing.T) {
errNodes, index := 2, 0
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
if index < errNodes {
index++
return ErrNodeNotFound
}
return nil
})
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, errNodes)
})
t.Run("error access denied", func(t *testing.T) {
var index int
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
index++
return ErrNodeAccessDenied
})
require.ErrorIs(t, err, ErrNodeAccessDenied)
require.Equal(t, 1, index)
checkIndicesAndReset(t, p, 0, 0)
})
}
func TestRebalance(t *testing.T) {

View file

@ -1,9 +1,10 @@
package session_test
import (
"crypto/rand"
"fmt"
"math"
"math/rand"
mrand "math/rand"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
@ -396,7 +397,7 @@ func TestContainer_AppliedTo(t *testing.T) {
func TestContainer_InvalidAt(t *testing.T) {
var x session.Container
nbf := rand.Uint64()
nbf := mrand.Uint64()
if nbf == math.MaxUint64 {
nbf--
}

View file

@ -1,7 +1,7 @@
package user_test
import (
"math/rand"
"crypto/rand"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"