Compare commits

...

33 commits

Author SHA1 Message Date
fc4551b843 [#172] pool: Use priority of errors in tree pool
When a retry happens, use a priority map to decide
which error to return. Network errors are considered
less desirable than business logic errors.

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-10-11 12:00:34 +03:00
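A rough sketch of the idea behind this commit, with hypothetical names (`errPriority`, `preferError`) rather than the pool's actual internals, which keep a full priority map of error kinds:

```go
package pool

import "errors"

// errNetwork stands in for the transport-level error class; this sketch
// reduces the priority map to two classes.
var errNetwork = errors.New("network failure")

// errPriority ranks errors: the higher the value, the more informative
// the error is to the caller.
func errPriority(err error) int {
	switch {
	case err == nil:
		return 0
	case errors.Is(err, errNetwork):
		return 1 // network errors are the least desirable to return
	default:
		return 2 // business logic errors carry the most information
	}
}

// preferError picks which of two errors to keep across retries, so that a
// business logic error is reported instead of a transport error.
func preferError(prev, cur error) error {
	if errPriority(cur) >= errPriority(prev) {
		return cur
	}
	return prev
}
```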
eb5288f4a5 [#172] pool: Do more retries on unexpected tree service responses
1. Try its best to find nodes during 'GetNodeByPath'
2. Retry on 'tree not found' and other not-found errors

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-10-06 11:12:48 +03:00
60463871db [#171] pool: Add test for healthy status monitor
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-10-03 19:47:22 +03:00
8a04638749 [#171] pool: Close only dialed connections
To avoid panics during the close operation, close
only dialed connections.

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-10-03 19:47:22 +03:00
ddbfb758c9 [#171] pool: Use dial status to close connections during restarts
On every client restart, the pool creates a new client
instance. If the client failed due to a dial error, there
was no prior connection and no goroutine on the server side.
If the client failed due to communication or business logic
errors, then the server side maintains the connection, and
the client should close it to avoid goroutine and connection
leaks.

Dialing is a part of the healthcheck, so the health status
is now an enum of three values:
- unhealthy due to dial fail,
- unhealthy due to transmission fail,
- healthy.

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-10-03 17:00:36 +03:00
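A hedged sketch of what such a three-value health status could look like; `clientStatus` and the constant names are illustrative, not the pool's actual identifiers:

```go
package pool

import "sync/atomic"

// Dialing is part of the healthcheck, so the two unhealthy states tell the
// restart logic whether a server-side connection exists at all.
const (
	statusUnhealthyOnDial    int32 = iota // dial failed: no connection to close
	statusUnhealthyOnRequest              // transmission failed: connection exists
	statusHealthy
)

type clientStatus struct {
	healthy atomic.Int32
}

// mustCloseOnRestart reports whether the previous connection holds
// server-side state (goroutine, socket) that must be released before a
// new client instance is created.
func (c *clientStatus) mustCloseOnRestart() bool {
	return c.healthy.Load() != statusUnhealthyOnDial
}
```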
d71a0e0755 [#88] netmap: use bool, fix hrw_sort tests
Signed-off-by: Andrew Danilin <andnilin@gmail.com>
2023-10-03 07:05:03 +00:00
163b3e1961 [#88] netmap: fix min aggregator bug, add tests
Signed-off-by: Andrew Danilin <andnilin@gmail.com>
2023-10-03 07:05:03 +00:00
84b9d29fc9 [#170] checksum: Use constant mapping for checksum types
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-09-28 17:20:24 +03:00
99c273f499 [#169] pool: Close inner pools during close routine
Some apps do not reuse the pool instance and expect
`pool.Close()` to free resources. But it did not actually
close the inner SDK clients, which led to goroutine leaks
in storage.

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-09-20 12:16:13 +03:00
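The gist of the fix, as a self-contained sketch with toy types (the real pool's structure differs):

```go
package pool

import "io"

// Pool is a toy stand-in for the SDK pool: it owns inner clients whose
// lifetimes it is responsible for.
type Pool struct {
	cancel  func()      // stops background rebalance goroutines
	clients []io.Closer // inner SDK clients
}

// Close releases everything the pool owns. The fix's point: cancelling the
// pool's own goroutines is not enough, each inner client must be closed
// too, otherwise its connection and goroutines leak on the storage side.
func (p *Pool) Close() error {
	p.cancel()

	var firstErr error
	for _, c := range p.clients {
		if err := c.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
```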
555ccc63b2 [#167] netmap: Allow to select insufficient number of nodes
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-15 14:47:54 +03:00
0550438b53 [#167] netmap/tests: Add replica to invalid tests
Make sure we fail exactly because of the reason specified.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-15 14:33:44 +03:00
c899163860 [#167] netmap/tests: Add json file name to the test output
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-15 14:33:44 +03:00
ac8fc6d440 [#162] netmap: Allow to parse single unnamed selectors
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-11 15:22:24 +03:00
0a0b590df3 [#162] Fix pre-commit warnings
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-11 15:21:35 +03:00
4df642e941 [#162] netmap: Fix possible panic
Placement policy is unvalidated external input.
Under no circumstances should we panic here.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-11 15:19:56 +03:00
8bc64e088e [#161] .golangci.yml: Reenable deprecated usage warnings
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-08 17:17:02 +03:00
49ad985cad [#161] *: Do not use math/rand.Read()
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-08 17:17:02 +03:00
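The pattern this commit applies throughout the diffs below: switch to `crypto/rand` and stop silently ignoring the returned error (test helpers discard it explicitly with `_, _ =`). A minimal standalone example:

```go
package main

import (
	"crypto/rand"
	"fmt"
)

func main() {
	buf := make([]byte, 32)
	// crypto/rand.Read draws from the OS entropy source; unlike the removed
	// math/rand.Read it is safe for anything that may end up in identifiers
	// or keys, and its error should be handled rather than dropped.
	if _, err := rand.Read(buf); err != nil {
		panic(err)
	}
	fmt.Printf("%x\n", buf)
}
```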
aa12d8c6a6 [#121] client: Make PrmObjectHash fields public
* Introduce buildRequest for PrmObjectHash

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-09-08 13:36:45 +00:00
303508328a [#121] pool: Refactor PrmSessionCreate usage
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-09-05 17:32:03 +03:00
55699d1480 [#121] client: Make PrmSessionCreate fields public
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-09-05 17:30:42 +03:00
55a1f23e71 [#121] client: Make PrmEndpointInfo, PrmNetworkInfo fields public
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-09-04 19:55:23 +03:00
291a71ba84 [#121] client: Make PrmAnnounceSpace fields public
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-09-04 19:53:34 +03:00
5a471e5002 [#121] client: Make PrmObjectDelete fields public
* Introduce buildRequest for PrmObjectDelete
* Refactor the usage of these params in pool

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-09-04 14:14:22 +00:00
b5fe52d6bd [#150] policy: Check for redundant selectors and filters
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-08-29 14:16:57 +03:00
84e7e69f98 [#121] client: Make PrmObjectGet/Head/GetRange fields public
* Remove common PrmObjectRead structure
* Introduce buildRequest for PrmObjectGet/Head/GetRange
* Refactor the usage of these params in pool

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-08-28 11:26:57 +03:00
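Judging by the fields visible in the diffs below, caller migration looks roughly like this; a sketch in which only `PrmObjectGet`, its fields, and `ObjectGetInit` are taken from the diff, while the import path of the `client` package and the helper function are assumed:

```go
package example

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// getObject shows the post-refactoring call style: request parameters are
// plain public fields instead of setter methods.
func getObject(ctx context.Context, c *client.Client, cnr cid.ID, obj oid.ID) error {
	var prm client.PrmObjectGet
	prm.ContainerID = &cnr
	prm.ObjectID = &obj

	rdr, err := c.ObjectGetInit(ctx, prm)
	if err != nil {
		return err
	}
	_ = rdr // read the header and payload as before; the reader API is unchanged
	return nil
}
```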
46a214d065 [#149] pool: Configure homomorphic hash and buffer size
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-08-25 09:45:15 +03:00
202412230a [#115] pool: Drop part buffer pool
Tests showed that using the part buffer pool doesn't save
much memory, especially on big parts. We can probably use
the pool only for small parts after adding a buffer in
payloadSizeLimiter.

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-08-21 12:03:03 +03:00
3cb3841073 [#115] pool: Try putSingle if possible
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-08-21 12:03:03 +03:00
faeeeab87a [#114] pool: Don't use part buffers when client cut is off
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-08-21 12:02:40 +03:00
cae215534f [#114] pool: Fix linter errors
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-08-21 12:02:40 +03:00
518fb79bc0 [#114] pool: Support client cut with memory limiter
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-08-21 12:02:40 +03:00
342524159a [#121] pool: Make PrmContainerSetEACL fields public
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-08-21 10:33:19 +03:00
22978303f8 [#121] client: Make PrmContainerSetEACL fields public
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-08-18 18:05:47 +03:00
52 changed files with 1477 additions and 666 deletions

View file

@@ -25,7 +25,7 @@ linters-settings:
     # report about shadowed variables
     check-shadowing: false
   staticcheck:
-    checks: ["all", "-SA1019"] # TODO Enable SA1019 after deprecated warning are fixed.
+    checks: ["all"]
   funlen:
     lines: 80 # default 60
     statements: 60 # default 40

View file

@@ -24,17 +24,17 @@ type Checksum refs.Checksum

 // Type represents the enumeration
 // of checksum types.
-type Type uint8
+type Type refs.ChecksumType

 const (
 	// Unknown is an undefined checksum type.
-	Unknown Type = iota
+	Unknown Type = Type(refs.UnknownChecksum)
 	// SHA256 is a SHA256 checksum type.
-	SHA256
+	SHA256 = Type(refs.SHA256)
 	// TZ is a Tillich-Zémor checksum type.
-	TZ
+	TZ = Type(refs.TillichZemor)
 )

 // ReadFromV2 reads Checksum from the refs.Checksum message. Checks if the

View file

@@ -2,9 +2,9 @@ package checksum

 import (
 	"bytes"
+	"crypto/rand"
 	"crypto/sha256"
 	"fmt"
-	"math/rand"

 	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
 )

View file

@@ -1,8 +1,8 @@
 package checksumtest

 import (
+	"crypto/rand"
 	"crypto/sha256"
-	"math/rand"

 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
 )
@@ -11,7 +11,7 @@ import (
 func Checksum() checksum.Checksum {
 	var cs [sha256.Size]byte

-	rand.Read(cs[:])
+	_, _ = rand.Read(cs[:])

 	var x checksum.Checksum

View file

@@ -47,6 +47,8 @@ func writeXHeadersToMeta(xHeaders []string, h *v2session.RequestMetaHeader) {
 		return
 	}

+	// TODO (aarifullin): remove the panic when all client parameters will check XHeaders
+	// within buildRequest invocation.
 	if len(xHeaders)%2 != 0 {
 		panic("slice of X-Headers with odd length")
 	}

View file

@@ -18,20 +18,20 @@ import (

 // PrmContainerSetEACL groups parameters of ContainerSetEACL operation.
 type PrmContainerSetEACL struct {
-	prmCommonMeta
+	// FrostFS request X-Headers.
+	XHeaders []string

-	tableSet bool
-	table    eacl.Table
+	Table *eacl.Table

-	sessionSet bool
-	session    session.Container
+	Session *session.Container
 }

 // SetTable sets eACL table structure to be set for the container.
 // Required parameter.
+//
+// Deprecated: Use PrmContainerSetEACL.Table instead.
 func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
-	x.table = table
-	x.tableSet = true
+	x.Table = &table
 }

 // WithinSession specifies session within which extended ACL of the container
@@ -45,17 +45,22 @@ func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
 // for which extended ACL is going to be set
 // - session operation MUST be session.VerbContainerSetEACL (ForVerb)
 // - token MUST be signed using private key of the owner of the container to be saved
+//
+// Deprecated: Use PrmContainerSetEACL.Session instead.
 func (x *PrmContainerSetEACL) WithinSession(s session.Container) {
-	x.session = s
-	x.sessionSet = true
+	x.Session = &s
 }

 func (x *PrmContainerSetEACL) buildRequest(c *Client) (*v2container.SetExtendedACLRequest, error) {
-	if !x.tableSet {
+	if x.Table == nil {
 		return nil, errorEACLTableNotSet
 	}

-	eaclV2 := x.table.ToV2()
+	if len(x.XHeaders)%2 != 0 {
+		return nil, errorInvalidXHeaders
+	}
+
+	eaclV2 := x.Table.ToV2()

 	var sig frostfscrypto.Signature
@@ -72,11 +77,11 @@ func (x *PrmContainerSetEACL) buildRequest(c *Client) (*v2container.SetExtendedA
 	reqBody.SetSignature(&sigv2)

 	var meta v2session.RequestMetaHeader
-	writeXHeadersToMeta(x.prmCommonMeta.xHeaders, &meta)
+	writeXHeadersToMeta(x.XHeaders, &meta)

-	if x.sessionSet {
+	if x.Session != nil {
 		var tokv2 v2session.Token
-		x.session.WriteToV2(&tokv2)
+		x.Session.WriteToV2(&tokv2)

 		meta.SetSessionToken(&tokv2)
 	}

View file

@@ -14,27 +14,29 @@ import (

 // PrmAnnounceSpace groups parameters of ContainerAnnounceUsedSpace operation.
 type PrmAnnounceSpace struct {
-	prmCommonMeta
+	XHeaders []string

-	announcements []container.SizeEstimation
+	Announcements []container.SizeEstimation
 }

 // SetValues sets values describing volume of space that is used for the container objects.
 // Required parameter. Must not be empty.
 //
 // Must not be mutated before the end of the operation.
+//
+// Deprecated: Use PrmAnnounceSpace.Announcements instead.
 func (x *PrmAnnounceSpace) SetValues(vs []container.SizeEstimation) {
-	x.announcements = vs
+	x.Announcements = vs
 }

 func (x *PrmAnnounceSpace) buildRequest(c *Client) (*v2container.AnnounceUsedSpaceRequest, error) {
-	if len(x.announcements) == 0 {
+	if len(x.Announcements) == 0 {
 		return nil, errorMissingAnnouncements
 	}

-	v2announce := make([]v2container.UsedSpaceAnnouncement, len(x.announcements))
-	for i := range x.announcements {
-		x.announcements[i].WriteToV2(&v2announce[i])
+	v2announce := make([]v2container.UsedSpaceAnnouncement, len(x.Announcements))
+	for i := range x.Announcements {
+		x.Announcements[i].WriteToV2(&v2announce[i])
 	}

 	reqBody := new(v2container.AnnounceUsedSpaceRequestBody)
View file

@@ -16,12 +16,12 @@ import (

 // PrmEndpointInfo groups parameters of EndpointInfo operation.
 type PrmEndpointInfo struct {
-	prmCommonMeta
+	XHeaders []string
 }

 func (x *PrmEndpointInfo) buildRequest(c *Client) (*v2netmap.LocalNodeInfoRequest, error) {
 	meta := new(v2session.RequestMetaHeader)
-	writeXHeadersToMeta(x.xHeaders, meta)
+	writeXHeadersToMeta(x.XHeaders, meta)

 	req := new(v2netmap.LocalNodeInfoRequest)
 	req.SetBody(new(v2netmap.LocalNodeInfoRequestBody))
@@ -112,12 +112,12 @@ func (c *Client) EndpointInfo(ctx context.Context, prm PrmEndpointInfo) (*ResEnd
 // PrmNetworkInfo groups parameters of NetworkInfo operation.
 type PrmNetworkInfo struct {
-	prmCommonMeta
+	XHeaders []string
 }

 func (x PrmNetworkInfo) buildRequest(c *Client) (*v2netmap.NetworkInfoRequest, error) {
 	meta := new(v2session.RequestMetaHeader)
-	writeXHeadersToMeta(x.xHeaders, meta)
+	writeXHeadersToMeta(x.XHeaders, meta)

 	var req v2netmap.NetworkInfoRequest
 	req.SetBody(new(v2netmap.NetworkInfoRequestBody))

View file

@@ -21,71 +21,25 @@ import (

 // PrmObjectDelete groups parameters of ObjectDelete operation.
 type PrmObjectDelete struct {
-	meta v2session.RequestMetaHeader
-
-	body v2object.DeleteRequestBody
-
-	addr v2refs.Address
-
-	keySet bool
-	key    ecdsa.PrivateKey
-}
-
-// WithinSession specifies session within which object should be read.
-//
-// Creator of the session acquires the authorship of the request.
-// This may affect the execution of an operation (e.g. access control).
-//
-// Must be signed.
-func (x *PrmObjectDelete) WithinSession(t session.Object) {
-	var tv2 v2session.Token
-	t.WriteToV2(&tv2)
-	x.meta.SetSessionToken(&tv2)
-}
-
-// WithBearerToken attaches bearer token to be used for the operation.
-//
-// If set, underlying eACL rules will be used in access control.
-//
-// Must be signed.
-func (x *PrmObjectDelete) WithBearerToken(t bearer.Token) {
-	var v2token acl.BearerToken
-	t.WriteToV2(&v2token)
-	x.meta.SetBearerToken(&v2token)
-}
-
-// FromContainer specifies FrostFS container of the object.
-// Required parameter.
-func (x *PrmObjectDelete) FromContainer(id cid.ID) {
-	var cidV2 v2refs.ContainerID
-	id.WriteToV2(&cidV2)
-	x.addr.SetContainerID(&cidV2)
-}
-
-// ByID specifies identifier of the requested object.
-// Required parameter.
-func (x *PrmObjectDelete) ByID(id oid.ID) {
-	var idV2 v2refs.ObjectID
-	id.WriteToV2(&idV2)
-	x.addr.SetObjectID(&idV2)
+	XHeaders []string
+
+	BearerToken *bearer.Token
+
+	Session *session.Object
+
+	ContainerID *cid.ID
+
+	ObjectID *oid.ID
+
+	Key *ecdsa.PrivateKey
 }

 // UseKey specifies private key to sign the requests.
 // If key is not provided, then Client default key is used.
-func (x *PrmObjectDelete) UseKey(key ecdsa.PrivateKey) {
-	x.keySet = true
-	x.key = key
-}
-
-// WithXHeaders specifies list of extended headers (string key-value pairs)
-// to be attached to the request. Must have an even length.
 //
-// Slice must not be mutated until the operation completes.
-func (x *PrmObjectDelete) WithXHeaders(hs ...string) {
-	writeXHeadersToMeta(hs, &x.meta)
+// Deprecated: Use PrmObjectDelete.Key instead.
+func (prm *PrmObjectDelete) UseKey(key ecdsa.PrivateKey) {
+	prm.Key = &key
 }

 // ResObjectDelete groups resulting values of ObjectDelete operation.
@@ -100,6 +54,54 @@ func (x ResObjectDelete) Tombstone() oid.ID {
 	return x.tomb
 }

+func (prm *PrmObjectDelete) buildRequest(c *Client) (*v2object.DeleteRequest, error) {
+	if prm.ContainerID == nil {
+		return nil, errorMissingContainer
+	}
+
+	if prm.ObjectID == nil {
+		return nil, errorMissingObject
+	}
+
+	if len(prm.XHeaders)%2 != 0 {
+		return nil, errorInvalidXHeaders
+	}
+
+	meta := new(v2session.RequestMetaHeader)
+	writeXHeadersToMeta(prm.XHeaders, meta)
+
+	if prm.BearerToken != nil {
+		v2BearerToken := new(acl.BearerToken)
+		prm.BearerToken.WriteToV2(v2BearerToken)
+		meta.SetBearerToken(v2BearerToken)
+	}
+
+	if prm.Session != nil {
+		v2SessionToken := new(v2session.Token)
+		prm.Session.WriteToV2(v2SessionToken)
+		meta.SetSessionToken(v2SessionToken)
+	}
+
+	addr := new(v2refs.Address)
+
+	cnrV2 := new(v2refs.ContainerID)
+	prm.ContainerID.WriteToV2(cnrV2)
+	addr.SetContainerID(cnrV2)
+
+	objV2 := new(v2refs.ObjectID)
+	prm.ObjectID.WriteToV2(objV2)
+	addr.SetObjectID(objV2)
+
+	body := new(v2object.DeleteRequestBody)
+	body.SetAddress(addr)
+
+	req := new(v2object.DeleteRequest)
+	req.SetBody(body)
+	c.prepareRequest(req, meta)
+	return req, nil
+}
+
 // ObjectDelete marks an object for deletion from the container using FrostFS API protocol.
 // As a marker, a special unit called a tombstone is placed in the container.
 // It confirms the user's intent to delete the object, and is itself a container object.
@@ -124,32 +126,22 @@ func (x ResObjectDelete) Tombstone() oid.ID {
 // - *apistatus.ObjectLocked;
 // - *apistatus.SessionTokenExpired.
 func (c *Client) ObjectDelete(ctx context.Context, prm PrmObjectDelete) (*ResObjectDelete, error) {
-	switch {
-	case prm.addr.GetContainerID() == nil:
-		return nil, errorMissingContainer
-	case prm.addr.GetObjectID() == nil:
-		return nil, errorMissingObject
+	req, err := prm.buildRequest(c)
+	if err != nil {
+		return nil, err
 	}

-	// form request body
-	prm.body.SetAddress(&prm.addr)
-
-	// form request
-	var req v2object.DeleteRequest
-	req.SetBody(&prm.body)
-
-	c.prepareRequest(&req, &prm.meta)
-
 	key := c.prm.key
-	if prm.keySet {
-		key = prm.key
+	if prm.Key != nil {
+		key = *prm.Key
 	}

-	err := signature.SignServiceMessage(&key, &req)
+	err = signature.SignServiceMessage(&key, req)
 	if err != nil {
 		return nil, fmt.Errorf("sign request: %w", err)
 	}

-	resp, err := rpcapi.DeleteObject(&c.c, &req, client.WithContext(ctx))
+	resp, err := rpcapi.DeleteObject(&c.c, req, client.WithContext(ctx))
 	if err != nil {
 		return nil, err
 	}

View file

@@ -22,77 +22,76 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
 )

-// shared parameters of GET/HEAD/RANGE.
-type prmObjectRead struct {
-	meta v2session.RequestMetaHeader
-
-	raw bool
-
-	addr v2refs.Address
-}
-
-// WithXHeaders specifies list of extended headers (string key-value pairs)
-// to be attached to the request. Must have an even length.
-//
-// Slice must not be mutated until the operation completes.
-func (x *prmObjectRead) WithXHeaders(hs ...string) {
-	writeXHeadersToMeta(hs, &x.meta)
-}
-
-// MarkRaw marks an intent to read physically stored object.
-func (x *prmObjectRead) MarkRaw() {
-	x.raw = true
-}
-
-// MarkLocal tells the server to execute the operation locally.
-func (x *prmObjectRead) MarkLocal() {
-	x.meta.SetTTL(1)
-}
-
-// WithinSession specifies session within which object should be read.
-//
-// Creator of the session acquires the authorship of the request.
-// This may affect the execution of an operation (e.g. access control).
-//
-// Must be signed.
-func (x *prmObjectRead) WithinSession(t session.Object) {
-	var tokv2 v2session.Token
-	t.WriteToV2(&tokv2)
-	x.meta.SetSessionToken(&tokv2)
-}
-
-// WithBearerToken attaches bearer token to be used for the operation.
-//
-// If set, underlying eACL rules will be used in access control.
-//
-// Must be signed.
-func (x *prmObjectRead) WithBearerToken(t bearer.Token) {
-	var v2token acl.BearerToken
-	t.WriteToV2(&v2token)
-	x.meta.SetBearerToken(&v2token)
-}
-
-// FromContainer specifies FrostFS container of the object.
-// Required parameter.
-func (x *prmObjectRead) FromContainer(id cid.ID) {
-	var cnrV2 v2refs.ContainerID
-	id.WriteToV2(&cnrV2)
-	x.addr.SetContainerID(&cnrV2)
-}
-
-// ByID specifies identifier of the requested object.
-// Required parameter.
-func (x *prmObjectRead) ByID(id oid.ID) {
-	var objV2 v2refs.ObjectID
-	id.WriteToV2(&objV2)
-	x.addr.SetObjectID(&objV2)
-}
-
 // PrmObjectGet groups parameters of ObjectGetInit operation.
 type PrmObjectGet struct {
-	prmObjectRead
+	XHeaders []string

-	key *ecdsa.PrivateKey
+	BearerToken *bearer.Token
+
+	Session *session.Object
+
+	Raw bool
+
+	Local bool
+
+	ContainerID *cid.ID
+
+	ObjectID *oid.ID
+
+	Key *ecdsa.PrivateKey
+}
+
+func (prm *PrmObjectGet) buildRequest(c *Client) (*v2object.GetRequest, error) {
+	if prm.ContainerID == nil {
+		return nil, errorMissingContainer
+	}
+
+	if prm.ObjectID == nil {
+		return nil, errorMissingObject
+	}
+
+	if len(prm.XHeaders)%2 != 0 {
+		return nil, errorInvalidXHeaders
+	}
+
+	meta := new(v2session.RequestMetaHeader)
+	writeXHeadersToMeta(prm.XHeaders, meta)
+
+	if prm.BearerToken != nil {
+		v2BearerToken := new(acl.BearerToken)
+		prm.BearerToken.WriteToV2(v2BearerToken)
+		meta.SetBearerToken(v2BearerToken)
+	}
+
+	if prm.Session != nil {
+		v2SessionToken := new(v2session.Token)
+		prm.Session.WriteToV2(v2SessionToken)
+		meta.SetSessionToken(v2SessionToken)
+	}
+
+	if prm.Local {
+		meta.SetTTL(1)
+	}
+
+	addr := new(v2refs.Address)
+
+	cnrV2 := new(v2refs.ContainerID)
+	prm.ContainerID.WriteToV2(cnrV2)
+	addr.SetContainerID(cnrV2)
+
+	objV2 := new(v2refs.ObjectID)
+	prm.ObjectID.WriteToV2(objV2)
+	addr.SetObjectID(objV2)
+
+	body := new(v2object.GetRequestBody)
+	body.SetRaw(prm.Raw)
+	body.SetAddress(addr)
+
+	req := new(v2object.GetRequest)
+	req.SetBody(body)
+	c.prepareRequest(req, meta)
+	return req, nil
 }

 // ResObjectGet groups the final result values of ObjectGetInit operation.
@@ -122,8 +121,10 @@ type ObjectReader struct {

 // UseKey specifies private key to sign the requests.
 // If key is not provided, then Client default key is used.
-func (x *PrmObjectGet) UseKey(key ecdsa.PrivateKey) {
-	x.key = &key
+//
+// Deprecated: Use PrmObjectGet.Key instead.
+func (prm *PrmObjectGet) UseKey(key ecdsa.PrivateKey) {
+	prm.Key = &key
 }

 // ReadHeader reads header of the object. Result means success.
@@ -299,39 +300,24 @@ func (x *ObjectReader) Read(p []byte) (int, error) {
 // Returns an error if parameters are set incorrectly (see PrmObjectGet docs).
 // Context is required and must not be nil. It is used for network communication.
 func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectReader, error) {
-	// check parameters
-	switch {
-	case prm.addr.GetContainerID() == nil:
-		return nil, errorMissingContainer
-	case prm.addr.GetObjectID() == nil:
-		return nil, errorMissingObject
+	req, err := prm.buildRequest(c)
+	if err != nil {
+		return nil, err
 	}

-	// form request body
-	var body v2object.GetRequestBody
-	body.SetRaw(prm.raw)
-	body.SetAddress(&prm.addr)
-
-	// form request
-	var req v2object.GetRequest
-	req.SetBody(&body)
-
-	c.prepareRequest(&req, &prm.meta)
-
-	key := prm.key
+	key := prm.Key
 	if key == nil {
 		key = &c.prm.key
 	}

-	err := signature.SignServiceMessage(key, &req)
+	err = signature.SignServiceMessage(key, req)
 	if err != nil {
 		return nil, fmt.Errorf("sign request: %w", err)
 	}

 	ctx, cancel := context.WithCancel(ctx)

-	stream, err := rpcapi.GetObject(&c.c, &req, client.WithContext(ctx))
+	stream, err := rpcapi.GetObject(&c.c, req, client.WithContext(ctx))
 	if err != nil {
 		cancel()
 		return nil, fmt.Errorf("open stream: %w", err)
@@ -347,17 +333,29 @@ func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectRe
 // PrmObjectHead groups parameters of ObjectHead operation.
 type PrmObjectHead struct {
-	prmObjectRead
+	XHeaders []string

-	keySet bool
-	key    ecdsa.PrivateKey
+	BearerToken *bearer.Token
+
+	Session *session.Object
+
+	Raw bool
+
+	Local bool
+
+	ContainerID *cid.ID
+
+	ObjectID *oid.ID
+
+	Key *ecdsa.PrivateKey
 }

 // UseKey specifies private key to sign the requests.
 // If key is not provided, then Client default key is used.
-func (x *PrmObjectHead) UseKey(key ecdsa.PrivateKey) {
-	x.keySet = true
-	x.key = key
+//
+// Deprecated: Use PrmObjectHead.Key instead.
+func (prm *PrmObjectHead) UseKey(key ecdsa.PrivateKey) {
+	prm.Key = &key
 }

 // ResObjectHead groups resulting values of ObjectHead operation.
@@ -390,6 +388,58 @@ func (x *ResObjectHead) ReadHeader(dst *object.Object) bool {
 	return true
 }

+func (prm *PrmObjectHead) buildRequest(c *Client) (*v2object.HeadRequest, error) {
+	if prm.ContainerID == nil {
+		return nil, errorMissingContainer
+	}
+
+	if prm.ObjectID == nil {
+		return nil, errorMissingObject
+	}
+
+	if len(prm.XHeaders)%2 != 0 {
+		return nil, errorInvalidXHeaders
+	}
+
+	meta := new(v2session.RequestMetaHeader)
+	writeXHeadersToMeta(prm.XHeaders, meta)
+
+	if prm.BearerToken != nil {
+		v2BearerToken := new(acl.BearerToken)
+		prm.BearerToken.WriteToV2(v2BearerToken)
+		meta.SetBearerToken(v2BearerToken)
+	}
+
+	if prm.Session != nil {
+		v2SessionToken := new(v2session.Token)
+		prm.Session.WriteToV2(v2SessionToken)
+		meta.SetSessionToken(v2SessionToken)
+	}
+
+	if prm.Local {
+		meta.SetTTL(1)
+	}
+
+	addr := new(v2refs.Address)
+
+	cnrV2 := new(v2refs.ContainerID)
+	prm.ContainerID.WriteToV2(cnrV2)
+	addr.SetContainerID(cnrV2)
+
+	objV2 := new(v2refs.ObjectID)
+	prm.ObjectID.WriteToV2(objV2)
+	addr.SetObjectID(objV2)
+
+	body := new(v2object.HeadRequestBody)
+	body.SetRaw(prm.Raw)
+	body.SetAddress(addr)
+
+	req := new(v2object.HeadRequest)
+	req.SetBody(body)
+	c.prepareRequest(req, meta)
+	return req, nil
+}
+
 // ObjectHead reads object header through a remote server using FrostFS API protocol.
 //
 // Exactly one return value is non-nil. By default, server status is returned in res structure.
@@ -413,33 +463,24 @@ func (x *ResObjectHead) ReadHeader(dst *object.Object) bool {
 // - *apistatus.ObjectAlreadyRemoved;
 // - *apistatus.SessionTokenExpired.
 func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectHead, error) {
-	switch {
-	case prm.addr.GetContainerID() == nil:
-		return nil, errorMissingContainer
-	case prm.addr.GetObjectID() == nil:
-		return nil, errorMissingObject
+	req, err := prm.buildRequest(c)
+	if err != nil {
+		return nil, err
 	}

-	var body v2object.HeadRequestBody
-	body.SetRaw(prm.raw)
-	body.SetAddress(&prm.addr)
-
-	var req v2object.HeadRequest
-	req.SetBody(&body)
-
-	c.prepareRequest(&req, &prm.meta)
-
 	key := c.prm.key
-	if prm.keySet {
-		key = prm.key
+	if prm.Key != nil {
+		key = *prm.Key
 	}

 	// sign the request
-	err := signature.SignServiceMessage(&key, &req)
+	err = signature.SignServiceMessage(&key, req)
 	if err != nil {
 		return nil, fmt.Errorf("sign request: %w", err)
 	}

-	resp, err := rpcapi.HeadObject(&c.c, &req, client.WithContext(ctx))
+	resp, err := rpcapi.HeadObject(&c.c, req, client.WithContext(ctx))
 	if err != nil {
 		return nil, fmt.Errorf("write request: %w", err)
 	}
@@ -454,7 +495,7 @@ func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectH
 		return &res, nil
 	}

-	_ = res.idObj.ReadFromV2(*prm.addr.GetObjectID())
+	res.idObj = *prm.ObjectID

 	switch v := resp.GetBody().GetHeaderPart().(type) {
 	default:
@@ -470,29 +511,95 @@ func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectH
 // PrmObjectRange groups parameters of ObjectRange operation.
 type PrmObjectRange struct {
-	prmObjectRead
+	XHeaders []string

-	key *ecdsa.PrivateKey
+	BearerToken *bearer.Token

-	rng v2object.Range
+	Session *session.Object
+
+	Raw bool
+
+	Local bool
+
+	ContainerID *cid.ID
+
+	ObjectID *oid.ID
+
+	Key *ecdsa.PrivateKey
+
+	Offset uint64
+
+	Length uint64
 }

-// SetOffset sets offset of the payload range to be read.
-// Zero by default.
-func (x *PrmObjectRange) SetOffset(off uint64) {
-	x.rng.SetOffset(off)
-}
-
-// SetLength sets length of the payload range to be read.
-// Must be positive.
-func (x *PrmObjectRange) SetLength(ln uint64) {
-	x.rng.SetLength(ln)
+func (prm *PrmObjectRange) buildRequest(c *Client) (*v2object.GetRangeRequest, error) {
+	if prm.Length == 0 {
+		return nil, errorZeroRangeLength
+	}
+
+	if prm.ContainerID == nil {
+		return nil, errorMissingContainer
+	}
+
+	if prm.ObjectID == nil {
+		return nil, errorMissingObject
+	}
+
+	if len(prm.XHeaders)%2 != 0 {
+		return nil, errorInvalidXHeaders
+	}
+
+	meta := new(v2session.RequestMetaHeader)
+	writeXHeadersToMeta(prm.XHeaders, meta)
+
+	if prm.BearerToken != nil {
+		v2BearerToken := new(acl.BearerToken)
+		prm.BearerToken.WriteToV2(v2BearerToken)
+		meta.SetBearerToken(v2BearerToken)
+	}
+
+	if prm.Session != nil {
+		v2SessionToken := new(v2session.Token)
+		prm.Session.WriteToV2(v2SessionToken)
+		meta.SetSessionToken(v2SessionToken)
+	}
+
+	if prm.Local {
+		meta.SetTTL(1)
+	}
+
+	addr := new(v2refs.Address)
+
+	cnrV2 := new(v2refs.ContainerID)
+	prm.ContainerID.WriteToV2(cnrV2)
+	addr.SetContainerID(cnrV2)
+
+	objV2 := new(v2refs.ObjectID)
+	prm.ObjectID.WriteToV2(objV2)
+	addr.SetObjectID(objV2)
+
+	rng := new(v2object.Range)
+	rng.SetLength(prm.Length)
+	rng.SetOffset(prm.Offset)
+
+	body := new(v2object.GetRangeRequestBody)
+	body.SetRaw(prm.Raw)
+	body.SetAddress(addr)
+	body.SetRange(rng)
+
+	req := new(v2object.GetRangeRequest)
+	req.SetBody(body)
+	c.prepareRequest(req, meta)
+	return req, nil
 }

 // UseKey specifies private key to sign the requests.
 // If key is not provided, then Client default key is used.
-func (x *PrmObjectRange) UseKey(key ecdsa.PrivateKey) {
-	x.key = &key
+//
+// Deprecated: Use PrmObjectRange.Key instead.
+func (prm *PrmObjectRange) UseKey(key ecdsa.PrivateKey) {
+	prm.Key = &key
 }

 // ResObjectRange groups the final result values of ObjectRange operation.
@@ -662,49 +769,31 @@ func (x *ObjectRangeReader) Read(p []byte) (int, error) {
 // Returns an error if parameters are set incorrectly (see PrmObjectRange docs).
 // Context is required and must not be nil. It is used for network communication.
 func (c *Client) ObjectRangeInit(ctx context.Context, prm PrmObjectRange) (*ObjectRangeReader, error) {
-	// check parameters
-	switch {
-	case prm.addr.GetContainerID() == nil:
-		return nil, errorMissingContainer
-	case prm.addr.GetObjectID() == nil:
-		return nil, errorMissingObject
-	case prm.rng.GetLength() == 0:
-		return nil, errorZeroRangeLength
+	req, err := prm.buildRequest(c)
+	if err != nil {
+		return nil, err
 	}

-	// form request body
-	var body v2object.GetRangeRequestBody
-	body.SetRaw(prm.raw)
-	body.SetAddress(&prm.addr)
-	body.SetRange(&prm.rng)
-
-	// form request
-	var req v2object.GetRangeRequest
-	req.SetBody(&body)
-
-	c.prepareRequest(&req, &prm.meta)
-
-	key := prm.key
+	key := prm.Key
 	if key == nil {
 		key = &c.prm.key
 	}

-	err := signature.SignServiceMessage(key, &req)
+	err = signature.SignServiceMessage(key, req)
 	if err != nil {
 		return nil, fmt.Errorf("sign request: %w", err)
 	}

 	ctx, cancel := context.WithCancel(ctx)

-	stream, err := rpcapi.GetObjectRange(&c.c, &req, client.WithContext(ctx))
+	stream, err := rpcapi.GetObjectRange(&c.c, req, client.WithContext(ctx))
 	if err != nil {
 		cancel()
 		return nil, fmt.Errorf("open stream: %w", err)
 	}

 	var r ObjectRangeReader
-	r.remainingPayloadLen = int(prm.rng.GetLength())
+	r.remainingPayloadLen = int(prm.Length)
 	r.cancelCtxStream = cancel
 	r.stream = stream
 	r.client = c

View file

@@ -13,121 +13,53 @@ import (
 	v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
 	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
 )

 // PrmObjectHash groups parameters of ObjectHash operation.
 type PrmObjectHash struct {
-	meta v2session.RequestMetaHeader
+	XHeaders []string

-	body v2object.GetRangeHashRequestBody
+	BearerToken *bearer.Token

-	csAlgo v2refs.ChecksumType
+	Session *session.Object

-	addr v2refs.Address
+	Local bool

-	keySet bool
-	key    ecdsa.PrivateKey
+	Ranges []object.Range
+
+	Salt []byte
+
+	ChecksumType checksum.Type
+
+	ContainerID *cid.ID
+
+	ObjectID *oid.ID
+
+	Key *ecdsa.PrivateKey
 }

 // UseKey specifies private key to sign the requests.
 // If key is not provided, then Client default key is used.
-func (x *PrmObjectHash) UseKey(key ecdsa.PrivateKey) {
-	x.keySet = true
-	x.key = key
-}
-
-// MarkLocal tells the server to execute the operation locally.
-func (x *PrmObjectHash) MarkLocal() {
-	x.meta.SetTTL(1)
-}
-
-// WithinSession specifies session within which object should be read.
 //
-// Creator of the session acquires the authorship of the request.
-// This may affect the execution of an operation (e.g. access control).
-//
-// Must be signed.
-func (x *PrmObjectHash) WithinSession(t session.Object) {
-	var tv2 v2session.Token
-	t.WriteToV2(&tv2)
-	x.meta.SetSessionToken(&tv2)
-}
-
-// WithBearerToken attaches bearer token to be used for the operation.
-//
-// If set, underlying eACL rules will be used in access control.
-//
-// Must be signed.
-func (x *PrmObjectHash) WithBearerToken(t bearer.Token) {
-	var v2token acl.BearerToken
-	t.WriteToV2(&v2token)
-	x.meta.SetBearerToken(&v2token)
-}
-
-// FromContainer specifies FrostFS container of the object.
-// Required parameter.
-func (x *PrmObjectHash) FromContainer(id cid.ID) {
-	var cidV2 v2refs.ContainerID
-	id.WriteToV2(&cidV2)
-	x.addr.SetContainerID(&cidV2)
-}
-
-// ByID specifies identifier of the requested object.
-// Required parameter.
-func (x *PrmObjectHash) ByID(id oid.ID) {
-	var idV2 v2refs.ObjectID
-	id.WriteToV2(&idV2)
-	x.addr.SetObjectID(&idV2)
-}
-
-// SetRangeList sets list of ranges in (offset, length) pair format.
-// Required parameter.
-//
-// If passed as slice, then it must not be mutated before the operation completes.
-func (x *PrmObjectHash) SetRangeList(r ...uint64) {
-	ln := len(r)
-	if ln%2 != 0 {
-		panic("odd number of range parameters")
-	}
-
-	rs := make([]v2object.Range, ln/2)
-
-	for i := 0; i < ln/2; i++ {
-		rs[i].SetOffset(r[2*i])
-		rs[i].SetLength(r[2*i+1])
-	}
-
-	x.body.SetRanges(rs)
+// Deprecated: Use PrmObjectHash.Key instead.
+func (prm *PrmObjectHash) UseKey(key ecdsa.PrivateKey) {
+	prm.Key = &key
 }

 // TillichZemorAlgo changes the hash function to Tillich-Zemor
 // (https://link.springer.com/content/pdf/10.1007/3-540-48658-5_5.pdf).
 //
 // By default, SHA256 hash function is used.
-func (x *PrmObjectHash) TillichZemorAlgo() {
-	x.csAlgo = v2refs.TillichZemor
-}
-
-// UseSalt sets the salt to XOR the data range before hashing.
 //
-// Must not be mutated before the operation completes.
-func (x *PrmObjectHash) UseSalt(salt []byte) {
-	x.body.SetSalt(salt)
-}
-
-// WithXHeaders specifies list of extended headers (string key-value pairs)
-// to be attached to the request. Must have an even length.
-//
-// Slice must not be mutated until the operation completes.
-func (x *PrmObjectHash) WithXHeaders(hs ...string) {
-	writeXHeadersToMeta(hs, &x.meta)
+// Deprecated: Use PrmObjectHash.ChecksumType instead.
+func (prm *PrmObjectHash) TillichZemorAlgo() {
+	prm.ChecksumType = checksum.TZ
 }

 // ResObjectHash groups resulting values of ObjectHash operation.
@@ -142,6 +74,76 @@ func (x ResObjectHash) Checksums() [][]byte {
 	return x.checksums
 }

+func (prm *PrmObjectHash) buildRequest(c *Client) (*v2object.GetRangeHashRequest, error) {
+	if prm.ContainerID == nil {
+		return nil, errorMissingContainer
+	}
+
+	if prm.ObjectID == nil {
+		return nil, errorMissingObject
+	}
+
+	if len(prm.XHeaders)%2 != 0 {
+		return nil, errorInvalidXHeaders
+	}
+
+	if len(prm.Ranges) == 0 {
+		return nil, errorMissingRanges
+	}
+
+	meta := new(v2session.RequestMetaHeader)
+	writeXHeadersToMeta(prm.XHeaders, meta)
+
+	if prm.BearerToken != nil {
+		v2BearerToken := new(acl.BearerToken)
+		prm.BearerToken.WriteToV2(v2BearerToken)
+		meta.SetBearerToken(v2BearerToken)
+	}
+
+	if prm.Session != nil {
+		v2SessionToken := new(v2session.Token)
+		prm.Session.WriteToV2(v2SessionToken)
+		meta.SetSessionToken(v2SessionToken)
+	}
+
+	if prm.Local {
+		meta.SetTTL(1)
+	}
+
+	addr := new(v2refs.Address)
+
+	cnrV2 := new(v2refs.ContainerID)
+	prm.ContainerID.WriteToV2(cnrV2)
+	addr.SetContainerID(cnrV2)
+
+	objV2 := new(v2refs.ObjectID)
+	prm.ObjectID.WriteToV2(objV2)
+	addr.SetObjectID(objV2)
+
+	rs := make([]v2object.Range, len(prm.Ranges))
+	for i := range prm.Ranges {
+		rs[i].SetOffset(prm.Ranges[i].GetOffset())
+		rs[i].SetLength(prm.Ranges[i].GetLength())
+	}
+
+	body := new(v2object.GetRangeHashRequestBody)
+	body.SetAddress(addr)
+	body.SetRanges(rs)
+	body.SetSalt(prm.Salt)
+
+	if prm.ChecksumType == checksum.Unknown {
+		body.SetType(v2refs.SHA256)
+	} else {
+		body.SetType(v2refs.ChecksumType(prm.ChecksumType))
+	}
+
+	req := new(v2object.GetRangeHashRequest)
+	req.SetBody(body)
+	c.prepareRequest(req, meta)
+	return req, nil
+}
+
 // ObjectHash requests checksum of the range list of the object payload using
 // FrostFS API protocol.
 //
@@ -165,37 +167,22 @@ func (x ResObjectHash) Checksums() [][]byte {
 // - *apistatus.ObjectOutOfRange;
 // - *apistatus.SessionTokenExpired.
 func (c *Client) ObjectHash(ctx context.Context, prm PrmObjectHash) (*ResObjectHash, error) {
-	switch {
-	case prm.addr.GetContainerID() == nil:
-		return nil, errorMissingContainer
-	case prm.addr.GetObjectID() == nil:
-		return nil, errorMissingObject
-	case len(prm.body.GetRanges()) == 0:
-		return nil, errorMissingRanges
+	req, err := prm.buildRequest(c)
+	if err != nil {
+		return nil, err
 	}

-	prm.body.SetAddress(&prm.addr)
-	if prm.csAlgo == v2refs.UnknownChecksum {
-		prm.body.SetType(v2refs.SHA256)
-	} else {
-		prm.body.SetType(prm.csAlgo)
-	}
-
-	var req v2object.GetRangeHashRequest
-	c.prepareRequest(&req, &prm.meta)
-	req.SetBody(&prm.body)
-
 	key := c.prm.key
-	if prm.keySet {
-		key = prm.key
+	if prm.Key != nil {
+		key = *prm.Key
 	}

-	err := signature.SignServiceMessage(&key, &req)
+	err = signature.SignServiceMessage(&key, req)
 	if err != nil {
 		return nil, fmt.Errorf("sign request: %w", err)
 	}

-	resp, err := rpcapi.HashObjectRange(&c.c, &req, client.WithContext(ctx))
+	resp, err := rpcapi.HashObjectRange(&c.c, req, client.WithContext(ctx))
 	if err != nil {
 		return nil, fmt.Errorf("write request: %w", err)
 	}

View file

@@ -16,30 +16,32 @@ import (

 // PrmSessionCreate groups parameters of SessionCreate operation.
 type PrmSessionCreate struct {
-	prmCommonMeta
+	XHeaders []string

-	exp uint64
+	Expiration uint64

-	keySet bool
-	key    ecdsa.PrivateKey
+	Key *ecdsa.PrivateKey
 }

 // SetExp sets number of the last NeoFS epoch in the lifetime of the session after which it will be expired.
+//
+// Deprecated: Use PrmSessionCreate.Expiration instead.
 func (x *PrmSessionCreate) SetExp(exp uint64) {
-	x.exp = exp
+	x.Expiration = exp
 }

 // UseKey specifies private key to sign the requests and compute token owner.
 // If key is not provided, then Client default key is used.
+//
+// Deprecated: Use PrmSessionCreate.Key instead.
 func (x *PrmSessionCreate) UseKey(key ecdsa.PrivateKey) {
-	x.keySet = true
-	x.key = key
+	x.Key = &key
 }

 func (x *PrmSessionCreate) buildRequest(c *Client) (*v2session.CreateRequest, error) {
 	ownerKey := c.prm.key.PublicKey
-	if x.keySet {
-		ownerKey = x.key.PublicKey
+	if x.Key != nil {
+		ownerKey = x.Key.PublicKey
 	}

 	var ownerID user.ID
 	user.IDFromKey(&ownerID, ownerKey)
@@ -49,10 +51,10 @@ func (x *PrmSessionCreate) buildRequest(c *Client) (*v2session.CreateRequest, er
 	reqBody := new(v2session.CreateRequestBody)
 	reqBody.SetOwnerID(&ownerIDV2)
-	reqBody.SetExpiration(x.exp)
+	reqBody.SetExpiration(x.Expiration)

 	var meta v2session.RequestMetaHeader
-	writeXHeadersToMeta(x.xHeaders, &meta)
+	writeXHeadersToMeta(x.XHeaders, &meta)

 	var req v2session.CreateRequest
 	req.SetBody(reqBody)

View file

@@ -1,8 +1,8 @@
 package cid_test

 import (
+	"crypto/rand"
 	"crypto/sha256"
-	"math/rand"
 	"testing"

 	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
View file

@@ -1,8 +1,8 @@
 package cidtest

 import (
+	"crypto/rand"
 	"crypto/sha256"
-	"math/rand"

 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 )
@@ -11,7 +11,7 @@ import (
 func ID() cid.ID {
 	checksum := [sha256.Size]byte{}

-	rand.Read(checksum[:])
+	_, _ = rand.Read(checksum[:])

 	return IDWithChecksum(checksum)
 }

View file

@@ -1,7 +1,7 @@
 package frostfscrypto_test

 import (
-	"math/rand"
+	"crypto/rand"
 	"testing"

 	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
View file

@@ -131,7 +131,8 @@ Its basic syntax is as follows:
 REP <count> {IN <select>}
 ```

-If a select is not specified, then the entire netmap is used as input. The resulting nodes will be used to actually store objects and they constitute a replica group (or simply, "a replica").
+If a select is not specified, then the entire netmap is used as input. The only exception to this rule is when exactly 1 replica and 1 selector are present: in this case, the only selector is used instead of the whole netmap.
+The resulting nodes will be used to actually store objects and they constitute a replica group (or simply, "a replica").

 Examples

 ```sql
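For instance, under the amended rule a policy such as the following is valid: the REP clause names no selector, but since there is exactly one replica and one selector, the selector applies implicitly (this mirrors the first case in the decode test added further below):

```sql
REP 2
CBF 2
SELECT 2 FROM *
```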

View file

@@ -2,7 +2,7 @@ package eacltest

 import (
 	"bytes"
-	"math/rand"
+	"crypto/rand"
 	"testing"

 	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"

View file

@@ -1,7 +1,7 @@
 package eacl

 import (
-	"math/rand"
+	"crypto/rand"
 	"testing"

 	"github.com/stretchr/testify/require"

View file

@@ -23,7 +23,8 @@ type (
 	}

 	minAgg struct {
 		min float64
+		minFound bool
 	}

 	meanIQRAgg struct {
@@ -102,7 +103,13 @@ func (a *meanAgg) Compute() float64 {
 }

 func (a *minAgg) Add(n float64) {
-	if a.min == 0 || n < a.min {
+	if !a.minFound {
+		a.min = n
+		a.minFound = true
+		return
+	}
+
+	if n < a.min {
 		a.min = n
 	}
 }

netmap/aggregator_test.go (new file, 44 lines)
View file

@@ -0,0 +1,44 @@
package netmap
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestMinAgg(t *testing.T) {
tests := []struct {
vals []float64
res float64
}{
{
vals: []float64{1, 2, 3, 0, 10},
res: 0,
},
{
vals: []float64{10, 0, 10, 0},
res: 0,
},
{
vals: []float64{0, 1, 2, 3, 10},
res: 0,
},
{
vals: []float64{0, 0, 0, 0},
res: 0,
},
{
vals: []float64{10, 10, 10, 10},
res: 10,
},
}
for _, test := range tests {
a := newMinAgg()
for _, val := range test.vals {
a.Add(val)
}
require.Equal(t, test.res, a.Compute())
}
}

View file

@@ -40,6 +40,10 @@ type context struct {
 	// policy uses the UNIQUE flag. Nodes marked as used are not used in subsequent
 	// base selections.
 	usedNodes map[uint64]bool
+
+	// If true, returns an error when netmap does not contain enough nodes for selection.
+	// By default best effort is taken.
+	strict bool
 }

 // Various validation errors.
// Various validation errors. // Various validation errors.

View file

@@ -2,6 +2,7 @@ package netmap

 import (
 	"encoding/json"
+	"fmt"
 	"os"
 	"path/filepath"
 	"testing"
@@ -47,7 +48,8 @@ func TestPlacementPolicy_Interopability(t *testing.T) {
 	require.NoError(t, err)

 	for i := range ds {
-		bs, err := os.ReadFile(filepath.Join(testsDir, ds[i].Name()))
+		filename := filepath.Join(testsDir, ds[i].Name())
+		bs, err := os.ReadFile(filename)
 		require.NoError(t, err)

 		var tc TestCase
@@ -56,7 +58,7 @@ func TestPlacementPolicy_Interopability(t *testing.T) {
 		srcNodes := make([]NodeInfo, len(tc.Nodes))
 		copy(srcNodes, tc.Nodes)

-		t.Run(tc.Name, func(t *testing.T) {
+		t.Run(fmt.Sprintf("%s:%s", filename, tc.Name), func(t *testing.T) {
 			var nm NetMap
 			nm.SetNodes(tc.Nodes)

View file

@@ -146,19 +146,19 @@
 	"select 3 nodes in 3 distinct countries, same placement": {
 		"policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":1,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]},
 		"pivot": "Y29udGFpbmVySUQ=",
-		"result": [[4, 0, 7]],
+		"result": [[0, 2, 3]],
 		"placement": {
 			"pivot": "b2JqZWN0SUQ=",
-			"result": [[4, 0, 7]]
+			"result": [[0, 2, 3]]
 		}
 	},
 	"select 6 nodes in 3 distinct countries, different placement": {
 		"policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":2,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]},
 		"pivot": "Y29udGFpbmVySUQ=",
-		"result": [[4, 3, 0, 1, 7, 2]],
+		"result": [[0, 1, 2, 6, 3, 4]],
 		"placement": {
 			"pivot": "b2JqZWN0SUQ=",
-			"result": [[4, 3, 0, 7, 2, 1]]
+			"result": [[0, 1, 2, 6, 3, 4]]
 		}
 	}
 }

View file

@@ -0,0 +1,95 @@
{
"name": "non-strict selections",
"comment": "These test specify loose selection behaviour, to allow fetching already PUT objects even when there is not enough nodes to select from.",
"nodes": [
{
"attributes": [
{
"key": "Country",
"value": "Russia"
}
]
},
{
"attributes": [
{
"key": "Country",
"value": "Germany"
}
]
},
{
"attributes": [ ]
}
],
"tests": {
"not enough nodes (backup factor)": {
"policy": {
"replicas": [
{
"count": 1,
"selector": "MyStore"
}
],
"containerBackupFactor": 2,
"selectors": [
{
"name": "MyStore",
"count": 2,
"clause": "DISTINCT",
"attribute": "Country",
"filter": "FromRU"
}
],
"filters": [
{
"name": "FromRU",
"key": "Country",
"op": "EQ",
"value": "Russia",
"filters": [ ]
}
]
},
"result": [
[
0
]
]
},
"not enough nodes (buckets)": {
"policy": {
"replicas": [
{
"count": 1,
"selector": "MyStore"
}
],
"containerBackupFactor": 1,
"selectors": [
{
"name": "MyStore",
"count": 2,
"clause": "DISTINCT",
"attribute": "Country",
"filter": "FromRU"
}
],
"filters": [
{
"name": "FromRU",
"key": "Country",
"op": "EQ",
"value": "Russia",
"filters": [ ]
}
]
},
"result": [
[
0
]
]
}
}
}

View file

@@ -104,7 +104,14 @@
 			"selectors": [],
 			"filters": []
 		},
-		"error": "not enough nodes"
+		"result": [
+			[
+				0,
+				1,
+				2,
+				3
+			]
+		]
 		}
 	}
 }

View file

@@ -24,7 +24,12 @@
 	"tests": {
 		"missing filter": {
 			"policy": {
-				"replicas": [],
+				"replicas": [
+					{
+						"count": 1,
+						"selector": "MyStore"
+					}
+				],
 				"containerBackupFactor": 1,
 				"selectors": [
 					{
@@ -47,9 +52,14 @@
 			},
 			"error": "filter not found"
 		},
-		"not enough nodes (backup factor)": {
+		"not enough nodes (filter results in empty set)": {
 			"policy": {
-				"replicas": [],
+				"replicas": [
+					{
+						"count": 1,
+						"selector": "MyStore"
+					}
+				],
 				"containerBackupFactor": 2,
 				"selectors": [
 					{
@@ -57,40 +67,15 @@
 						"count": 2,
 						"clause": "DISTINCT",
 						"attribute": "Country",
-						"filter": "FromRU"
+						"filter": "FromMoon"
 					}
 				],
 				"filters": [
 					{
-						"name": "FromRU",
+						"name": "FromMoon",
 						"key": "Country",
 						"op": "EQ",
-						"value": "Russia",
-						"filters": []
-					}
-				]
-			},
-			"error": "not enough nodes"
-		},
-		"not enough nodes (buckets)": {
-			"policy": {
-				"replicas": [],
-				"containerBackupFactor": 1,
-				"selectors": [
-					{
-						"name": "MyStore",
-						"count": 2,
-						"clause": "DISTINCT",
-						"attribute": "Country",
-						"filter": "FromRU"
-					}
-				],
-				"filters": [
-					{
-						"name": "FromRU",
-						"key": "Country",
-						"op": "EQ",
-						"value": "Russia",
+						"value": "Moon",
 						"filters": []
 					}
 				]

View file

@@ -238,7 +238,7 @@ func (m NetMap) ContainerNodes(p PlacementPolicy, pivot []byte) ([][]NodeInfo, e
 	// marked as used by earlier replicas.
 	for i := range p.replicas {
 		sName := p.replicas[i].GetSelector()
-		if sName == "" {
+		if sName == "" && !(len(p.replicas) == 1 && len(p.selectors) == 1) {
 			var s netmap.Selector
 			s.SetCount(p.replicas[i].GetCount())
 			s.SetFilter(mainFilterName)
@@ -258,6 +258,9 @@ func (m NetMap) ContainerNodes(p PlacementPolicy, pivot []byte) ([][]NodeInfo, e
 		}

 		if p.unique {
+			if c.processedSelectors[sName] == nil {
+				return nil, fmt.Errorf("selector not found: '%s'", sName)
+			}
 			nodes, err := c.getSelection(*c.processedSelectors[sName])
 			if err != nil {
 				return nil, err

View file

@@ -551,7 +551,7 @@ func (p *PlacementPolicy) DecodeString(s string) error {
        return errors.New("parsed nil value")
    }
-   if err := validatePolicy(*p); err != nil {
+   if err := validatePolicy(*parsed); err != nil {
        return fmt.Errorf("invalid policy: %w", err)
    }
@@ -605,6 +605,13 @@ var (
    errUnknownSelector = errors.New("policy: selector not found")
    // errSyntaxError is returned for errors found by ANTLR parser.
    errSyntaxError = errors.New("policy: syntax error")
+   // errRedundantSelector is returned for errors found by the selector policy validator.
+   errRedundantSelector = errors.New("policy: found redundant selector")
+   // errUnnamedSelector is returned for errors found by the selector policy validator.
+   errUnnamedSelector = errors.New("policy: unnamed selectors are useless, " +
+       "make sure to pair REP and SELECT clauses: \"REP .. IN X\" + \"SELECT ... AS X\"")
+   // errRedundantFilter is returned for errors found by the filter policy validator.
+   errRedundantFilter = errors.New("policy: found redundant filter")
 )

 type policyVisitor struct {
@@ -845,25 +852,52 @@ func (p *policyVisitor) VisitExpr(ctx *parser.ExprContext) any {
 // validatePolicy checks high-level constraints such as filter link in SELECT
 // being actually defined in FILTER section.
 func validatePolicy(p PlacementPolicy) error {
+   canOmitNames := len(p.selectors) == 1 && len(p.replicas) == 1
    seenFilters := map[string]bool{}
+   expectedFilters := map[string]struct{}{}
    for i := range p.filters {
        seenFilters[p.filters[i].GetName()] = true
+       for _, f := range p.filters[i].GetFilters() {
+           if f.GetName() != "" {
+               expectedFilters[f.GetName()] = struct{}{}
+           }
+       }
    }

-   seenSelectors := map[string]bool{}
-
+   seenSelectors := map[string]*netmap.Selector{}
    for i := range p.selectors {
-       if flt := p.selectors[i].GetFilter(); flt != mainFilterName && !seenFilters[flt] {
-           return fmt.Errorf("%w: '%s'", errUnknownFilter, flt)
+       if p.selectors[i].GetName() == "" && !canOmitNames {
+           return errUnnamedSelector
        }
-       seenSelectors[p.selectors[i].GetName()] = true
+       if flt := p.selectors[i].GetFilter(); flt != mainFilterName {
+           expectedFilters[flt] = struct{}{}
+           if !seenFilters[flt] {
+               return fmt.Errorf("%w: '%s'", errUnknownFilter, flt)
+           }
+       }
+       seenSelectors[p.selectors[i].GetName()] = &p.selectors[i]
    }

+   for _, f := range p.filters {
+       if _, ok := expectedFilters[f.GetName()]; !ok {
+           return fmt.Errorf("%w: '%s'", errRedundantFilter, f.GetName())
+       }
+   }
+
+   expectedSelectors := map[string]struct{}{}
    for i := range p.replicas {
-       if sel := p.replicas[i].GetSelector(); sel != "" && !seenSelectors[sel] {
-           return fmt.Errorf("%w: '%s'", errUnknownSelector, sel)
+       selName := p.replicas[i].GetSelector()
+       if selName != "" || canOmitNames {
+           expectedSelectors[selName] = struct{}{}
+           if seenSelectors[selName] == nil {
+               return fmt.Errorf("%w: '%s'", errUnknownSelector, selName)
+           }
+       }
+   }
+
+   for _, s := range p.selectors {
+       if _, ok := expectedSelectors[s.GetName()]; !ok {
+           return fmt.Errorf("%w: to use selector '%s' use keyword IN", errRedundantSelector, s.GetName())
        }
    }
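
Beyond unnamed selectors, the validator now rejects dangling pieces of a policy. A short sketch of the new failure modes (the sentinel values are unexported, so callers only see them through the error text wrapped by DecodeString; messages shown are approximate):

var p netmap.PlacementPolicy

// a named selector that no REP references via IN:
// "invalid policy: policy: found redundant selector: to use selector 'X' use keyword IN"
fmt.Println(p.DecodeString("REP 1 REP 1 SELECT 2 FROM * AS X"))

// a filter that nothing references:
// "invalid policy: policy: found redundant filter: 'F'"
fmt.Println(p.DecodeString("REP 1 IN X SELECT 2 FROM * AS X FILTER Country EQ RU AS F"))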


@ -0,0 +1,65 @@
package netmap
import (
"strings"
"testing"
"github.com/stretchr/testify/require"
)
func TestDecodeString(t *testing.T) {
testCases := []string{
`REP 2
CBF 2
SELECT 2 FROM *`,
`REP 1 IN X
CBF 1
SELECT 2 IN SAME Location FROM * AS X`,
`REP 1 IN X
REP 2 IN Y
CBF 1
SELECT 2 FROM * AS X
SELECT 3 FROM * AS Y`,
`REP 1 IN X
SELECT 2 IN City FROM Good AS X
FILTER Country EQ RU AS FromRU
FILTER Country EQ EN AS FromEN
FILTER @FromRU AND @FromEN AND Rating GT 7 AS Good`,
`REP 7 IN SPB
SELECT 1 IN City FROM SPBSSD AS SPB
FILTER City EQ SPB AND SSD EQ true OR City EQ SPB AND Rating GE 5 AS SPBSSD`,
`REP 7 IN SPB
SELECT 1 IN City FROM SPBSSD AS SPB
FILTER NOT (NOT (City EQ SPB) AND SSD EQ true OR City EQ SPB AND Rating GE 5) AS SPBSSD`,
`UNIQUE
REP 1
REP 1`,
}
var p PlacementPolicy
for _, testCase := range testCases {
require.NoError(t, p.DecodeString(testCase), "unable to parse %s", testCase)
var b strings.Builder
require.NoError(t, p.WriteStringTo(&b))
require.Equal(t, testCase, b.String())
}
invalidTestCases := map[string]error{
`?REP 1`: errSyntaxError,
`REP 1 trailing garbage`: errSyntaxError,
`REP 1 REP 1 SELECT 4 FROM *`: errUnnamedSelector,
`REP 1 SELECT 4 FROM * SELECT 1 FROM *`: errUnnamedSelector,
`REP 1 IN X SELECT 4 FROM *`: errUnknownSelector,
`REP 1 IN X REP 2 SELECT 4 FROM * AS X FILTER 'UN-LOCODE' EQ 'RU LED' AS F`: errRedundantFilter,
}
for i := range invalidTestCases {
require.ErrorIs(t, p.DecodeString(i), invalidTestCases[i], "#%s", i)
}
}


@@ -1,7 +1,6 @@
 package netmap_test

 import (
-   "strings"
    "testing"

    . "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@@ -9,55 +8,6 @@ import (
    "github.com/stretchr/testify/require"
 )

-func TestEncode(t *testing.T) {
-   testCases := []string{
-       `REP 1 IN X
-CBF 1
-SELECT 2 IN SAME Location FROM * AS X`,
-       `REP 1
-SELECT 2 IN City FROM Good
-FILTER Country EQ RU AS FromRU
-FILTER @FromRU AND Rating GT 7 AS Good`,
-       `REP 7 IN SPB
-SELECT 1 IN City FROM SPBSSD AS SPB
-FILTER City EQ SPB AND SSD EQ true OR City EQ SPB AND Rating GE 5 AS SPBSSD`,
-       `REP 7 IN SPB
-SELECT 1 IN City FROM SPBSSD AS SPB
-FILTER NOT (NOT (City EQ SPB) AND SSD EQ true OR City EQ SPB AND Rating GE 5) AS SPBSSD`,
-       `UNIQUE
-REP 1
-REP 1`,
-       `REP 1 IN X
-SELECT 1 FROM F AS X
-FILTER 'UN-LOCODE' EQ 'RU LED' AS F`,
-   }
-
-   var p PlacementPolicy
-   for _, testCase := range testCases {
-       require.NoError(t, p.DecodeString(testCase))
-
-       var b strings.Builder
-       require.NoError(t, p.WriteStringTo(&b))
-
-       require.Equal(t, testCase, b.String())
-   }
-
-   invalidTestCases := []string{
-       `?REP 1`,
-       `REP 1 trailing garbage`,
-   }
-   for i := range invalidTestCases {
-       require.Error(t, p.DecodeString(invalidTestCases[i]), "#%d", i)
-   }
-}
-
 func TestPlacementPolicyEncoding(t *testing.T) {
    v := netmaptest.PlacementPolicy()


@@ -60,7 +60,7 @@ func (c *context) getSelection(s netmap.Selector) ([]nodes, error) {
    bucketCount, nodesInBucket := calcNodesCount(s)
    buckets := c.getSelectionBase(s)

-   if len(buckets) < bucketCount {
+   if c.strict && len(buckets) < bucketCount {
        return nil, fmt.Errorf("%w: '%s'", errNotEnoughNodes, s.GetName())
    }
@@ -96,7 +96,7 @@ func (c *context) getSelection(s netmap.Selector) ([]nodes, error) {
        if len(res) < bucketCount {
            // Fallback to using minimum allowed backup factor (1).
            res = append(res, fallback...)
-           if len(res) < bucketCount {
+           if c.strict && len(res) < bucketCount {
                return nil, fmt.Errorf("%w: '%s'", errNotEnoughNodes, s.GetName())
            }
        }
@@ -110,6 +110,13 @@ func (c *context) getSelection(s netmap.Selector) ([]nodes, error) {
        hrw.SortHasherSliceByWeightValue(res, weights, c.hrwSeedHash)
    }

+   if len(res) < bucketCount {
+       if len(res) == 0 {
+           return nil, errNotEnoughNodes
+       }
+       bucketCount = len(res)
+   }
+
    if s.GetAttribute() == "" {
        res, fallback = res[:bucketCount], res[bucketCount:]
        for i := range fallback {
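
In non-strict mode the selection no longer fails outright: the bucket count shrinks to what is actually available, and only an empty selection is an error. The clamping rule in isolation (a standalone sketch, not the SDK API):

package netmap

import "errors"

var errNotEnoughNodes = errors.New("not enough nodes")

// clampBuckets mirrors the logic above: strict selection demands the full
// bucket count, non-strict selection settles for fewer, and an empty
// selection fails either way.
func clampBuckets(strict bool, selected, wanted int) (int, error) {
    if selected >= wanted {
        return wanted, nil
    }
    if strict || selected == 0 {
        return 0, errNotEnoughNodes
    }
    return selected, nil
}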


@ -1,9 +1,10 @@
package netmap package netmap
import ( import (
"crypto/rand"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"math/rand" mrand "math/rand"
"sort" "sort"
"strconv" "strconv"
"testing" "testing"
@ -28,10 +29,10 @@ func BenchmarkHRWSort(b *testing.B) {
node.SetPublicKey(key) node.SetPublicKey(key)
vectors[i] = nodes{node} vectors[i] = nodes{node}
weights[i] = float64(rand.Uint32()%10) / 10.0 weights[i] = float64(mrand.Uint32()%10) / 10.0
} }
pivot := rand.Uint64() pivot := mrand.Uint64()
b.Run("sort by index, no weight", func(b *testing.B) { b.Run("sort by index, no weight", func(b *testing.B) {
realNodes := make([]nodes, netmapSize) realNodes := make([]nodes, netmapSize)
b.ResetTimer() b.ResetTimer()
@ -282,6 +283,55 @@ func TestPlacementPolicy_Unique(t *testing.T) {
} }
} }
func TestPlacementPolicy_SingleOmitNames(t *testing.T) {
nodes := []NodeInfo{
nodeInfoFromAttributes("ID", "1", "Country", "Russia", "City", "SPB"),
nodeInfoFromAttributes("ID", "2", "Country", "Germany", "City", "Berlin"),
nodeInfoFromAttributes("ID", "3", "Country", "Russia", "City", "Moscow"),
nodeInfoFromAttributes("ID", "4", "Country", "France", "City", "Paris"),
nodeInfoFromAttributes("ID", "5", "Country", "France", "City", "Lyon"),
nodeInfoFromAttributes("ID", "6", "Country", "Russia", "City", "SPB"),
nodeInfoFromAttributes("ID", "7", "Country", "Russia", "City", "Moscow"),
nodeInfoFromAttributes("ID", "8", "Country", "Germany", "City", "Darmstadt"),
nodeInfoFromAttributes("ID", "9", "Country", "Germany", "City", "Frankfurt"),
nodeInfoFromAttributes("ID", "10", "Country", "Russia", "City", "SPB"),
nodeInfoFromAttributes("ID", "11", "Country", "Russia", "City", "Moscow"),
nodeInfoFromAttributes("ID", "12", "Country", "Germany", "City", "London"),
}
for i := range nodes {
pub := make([]byte, 33)
rand.Read(pub)
nodes[i].SetPublicKey(pub)
}
var nm NetMap
nm.SetNodes(nodes)
for _, unique := range []bool{false, true} {
t.Run(fmt.Sprintf("unique=%t", unique), func(t *testing.T) {
ssNamed := []Selector{newSelector("X", "City", 2, "FromRU", (*Selector).SelectDistinct)}
fsNamed := []Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}
rsNamed := []ReplicaDescriptor{newReplica(1, "X")}
pNamed := newPlacementPolicy(3, rsNamed, ssNamed, fsNamed)
pNamed.unique = unique
vNamed, err := nm.ContainerNodes(pNamed, []byte{1})
require.NoError(t, err)
ssUnnamed := []Selector{newSelector("", "City", 2, "FromRU", (*Selector).SelectDistinct)}
fsUnnamed := []Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}
rsUnnamed := []ReplicaDescriptor{newReplica(1, "")}
pUnnamed := newPlacementPolicy(3, rsUnnamed, ssUnnamed, fsUnnamed)
pUnnamed.unique = unique
vUnnamed, err := nm.ContainerNodes(pUnnamed, []byte{1})
require.NoError(t, err)
require.Equal(t, vNamed, vUnnamed)
})
}
}
func TestPlacementPolicy_MultiREP(t *testing.T) { func TestPlacementPolicy_MultiREP(t *testing.T) {
nodes := []NodeInfo{ nodes := []NodeInfo{
nodeInfoFromAttributes("ID", "1", "Country", "Russia", "City", "SPB"), nodeInfoFromAttributes("ID", "1", "Country", "Russia", "City", "SPB"),


@ -1,7 +1,7 @@
package netmaptest package netmaptest
import ( import (
"math/rand" "crypto/rand"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
) )
@ -70,7 +70,7 @@ func NetworkInfo() (x netmap.NetworkInfo) {
// NodeInfo returns random netmap.NodeInfo. // NodeInfo returns random netmap.NodeInfo.
func NodeInfo() (x netmap.NodeInfo) { func NodeInfo() (x netmap.NodeInfo) {
key := make([]byte, 33) key := make([]byte, 33)
rand.Read(key) _, _ = rand.Read(key)
x.SetPublicKey(key) x.SetPublicKey(key)
x.SetNetworkEndpoints("1", "2", "3") x.SetNetworkEndpoints("1", "2", "3")


@ -1,10 +1,10 @@
package ns package ns
import ( import (
"crypto/rand"
"errors" "errors"
"fmt" "fmt"
"math/big" "math/big"
"math/rand"
"strings" "strings"
"testing" "testing"


@ -1,8 +1,8 @@
package oidtest package oidtest
import ( import (
"crypto/rand"
"crypto/sha256" "crypto/sha256"
"math/rand"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -12,7 +12,7 @@ import (
func ID() oid.ID { func ID() oid.ID {
checksum := [sha256.Size]byte{} checksum := [sha256.Size]byte{}
rand.Read(checksum[:]) _, _ = rand.Read(checksum[:])
return idWithChecksum(checksum) return idWithChecksum(checksum)
} }


@ -1,8 +1,8 @@
package object_test package object_test
import ( import (
"crypto/rand"
"crypto/sha256" "crypto/sha256"
"math/rand"
"testing" "testing"
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"


@ -1,8 +1,8 @@
package object package object
import ( import (
"crypto/rand"
"crypto/sha256" "crypto/sha256"
"math/rand"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/tombstone" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/tombstone"


@ -69,3 +69,7 @@ func (c *sessionCache) expired(val *cacheValue) bool {
// use epoch+1 (clear cache beforehand) to prevent 'expired session token' error right after epoch tick // use epoch+1 (clear cache beforehand) to prevent 'expired session token' error right after epoch tick
return val.token.ExpiredAt(epoch + 1) return val.token.ExpiredAt(epoch + 1)
} }
func (c *sessionCache) Epoch() uint64 {
return c.currentEpoch.Load()
}


@ -189,3 +189,7 @@ func (m *mockClient) restartIfUnhealthy(ctx context.Context) (healthy bool, chan
} }
return return
} }
func (m *mockClient) close() error {
return nil
}


@ -0,0 +1,172 @@
package pool
import (
"context"
sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type logger interface {
log(level zapcore.Level, msg string, fields ...zap.Field)
}
type PrmObjectPutClientCutInit struct {
PrmObjectPut
}
func (c *clientWrapper) objectPutInitTransformer(prm PrmObjectPutClientCutInit) (*objectWriterTransformer, error) {
cl, err := c.getClient()
if err != nil {
return nil, err
}
var w objectWriterTransformer
w.it = internalTarget{
client: cl,
prm: prm,
address: c.address(),
logger: &c.clientStatusMonitor,
}
key := &c.prm.key
if prm.key != nil {
key = prm.key
}
w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{
Key: key,
NextTargetInit: func() transformer.ObjectWriter { return &w.it },
MaxSize: prm.networkInfo.MaxObjectSize(),
WithoutHomomorphicHash: prm.withoutHomomorphicHash,
NetworkState: prm.networkInfo,
SessionToken: prm.stoken,
})
return &w, nil
}
type objectWriterTransformer struct {
ot transformer.ChunkedObjectWriter
it internalTarget
err error
}
func (x *objectWriterTransformer) WriteHeader(ctx context.Context, hdr object.Object) bool {
x.err = x.ot.WriteHeader(ctx, &hdr)
return x.err == nil
}
func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk []byte) bool {
_, x.err = x.ot.Write(ctx, chunk)
return x.err == nil
}
// ResObjectPut groups the final result values of ObjectPutInit operation.
type ResObjectPut struct {
Status apistatus.Status
OID oid.ID
}
// Close finalizes object writing and returns the final result with the stored
// object ID. If an error occurs, a nil result is returned together with the error.
func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) {
ai, err := x.ot.Close(ctx)
if err != nil {
return nil, err
}
if ai != nil && ai.ParentID != nil {
x.it.res.OID = *ai.ParentID
}
return &x.it.res, nil
}
type internalTarget struct {
client *sdkClient.Client
res ResObjectPut
prm PrmObjectPutClientCutInit
useStream bool
address string
logger logger
resolveFrostFSErrors bool
}
func (it *internalTarget) WriteObject(ctx context.Context, o *object.Object) error {
putSingleImplemented, err := it.tryPutSingle(ctx, o)
if putSingleImplemented {
return err
}
it.logger.log(zapcore.DebugLevel, "putSingle not implemented, trying put as stream", zap.String("address", it.address))
it.useStream = true
return it.putAsStream(ctx, o)
}
func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error {
var cliPrm sdkClient.PrmObjectPutInit
cliPrm.SetCopiesNumberByVectors(it.prm.copiesNumber)
if it.prm.stoken != nil {
cliPrm.WithinSession(*it.prm.stoken)
}
if it.prm.key != nil {
cliPrm.UseKey(*it.prm.key)
}
if it.prm.btoken != nil {
cliPrm.WithBearerToken(*it.prm.btoken)
}
wrt, err := it.client.ObjectPutInit(ctx, cliPrm)
if err != nil {
return err
}
if wrt.WriteHeader(ctx, *o) {
wrt.WritePayloadChunk(ctx, o.Payload())
}
res, err := wrt.Close(ctx)
if res != nil {
it.res.Status = res.Status()
it.res.OID = res.StoredObjectID()
}
return err
}
func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (bool, error) {
if it.useStream {
return false, nil
}
var cliPrm sdkClient.PrmObjectPutSingle
cliPrm.SetCopiesNumber(it.prm.copiesNumber)
cliPrm.UseKey(it.prm.key)
if it.prm.stoken != nil {
cliPrm.WithinSession(*it.prm.stoken)
}
if it.prm.btoken != nil {
cliPrm.WithBearerToken(*it.prm.btoken)
}
cliPrm.SetObject(o.ToV2())
res, err := it.client.ObjectPutSingle(ctx, cliPrm)
if err != nil && status.Code(err) == codes.Unimplemented {
return false, err
}
if err == nil {
id, _ := o.ID()
it.res = ResObjectPut{
Status: res.Status(),
OID: id,
}
if !it.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.Status) {
return true, apistatus.ErrFromStatus(it.res.Status)
}
return true, nil
}
return true, err
}


@@ -78,12 +78,16 @@ type client interface {
    dial(ctx context.Context) error
    // see clientWrapper.restartIfUnhealthy.
    restartIfUnhealthy(ctx context.Context) (bool, bool)
+   // see clientWrapper.close.
+   close() error
 }

 // clientStatus provides access to some connection metrics.
 type clientStatus interface {
    // isHealthy checks if the connection can handle requests.
    isHealthy() bool
+   // isDialed checks if the connection was created.
+   isDialed() bool
    // setUnhealthy marks client as unhealthy.
    setUnhealthy()
    // address returns the address of the endpoint.
@ -105,7 +109,7 @@ var errPoolClientUnhealthy = errors.New("pool client unhealthy")
type clientStatusMonitor struct { type clientStatusMonitor struct {
logger *zap.Logger logger *zap.Logger
addr string addr string
healthy *atomic.Bool healthy *atomic.Uint32
errorThreshold uint32 errorThreshold uint32
mu sync.RWMutex // protect counters mu sync.RWMutex // protect counters
@ -114,6 +118,22 @@ type clientStatusMonitor struct {
methods []*methodStatus methods []*methodStatus
} }
// values for the healthy status of clientStatusMonitor.
const (
    // statusUnhealthyOnDial is set when dialing the endpoint fails, so there is
    // no connection to the endpoint, and the pool should not close it before
    // the connection is re-established.
    statusUnhealthyOnDial = iota
    // statusUnhealthyOnRequest is set when communication after a successful dial
    // fails due to immediate or accumulated errors; the connection is still open,
    // so the pool should close it before re-establishing it.
    statusUnhealthyOnRequest
    // statusHealthy is set when the connection is ready to be used by the pool.
    statusHealthy
)
// methodStatus provide statistic for specific method. // methodStatus provide statistic for specific method.
type methodStatus struct { type methodStatus struct {
name string name string
@ -195,8 +215,8 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint
methods[i] = &methodStatus{name: i.String()} methods[i] = &methodStatus{name: i.String()}
} }
healthy := new(atomic.Bool) healthy := new(atomic.Uint32)
healthy.Store(true) healthy.Store(statusHealthy)
return clientStatusMonitor{ return clientStatusMonitor{
logger: logger, logger: logger,
@ -252,6 +272,11 @@ func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) {
x.key = key x.key = key
} }
// setLogger sets sdkClient.Client logger.
func (x *wrapperPrm) setLogger(logger *zap.Logger) {
x.logger = logger
}
// setDialTimeout sets the timeout for connection to be established. // setDialTimeout sets the timeout for connection to be established.
func (x *wrapperPrm) setDialTimeout(timeout time.Duration) { func (x *wrapperPrm) setDialTimeout(timeout time.Duration) {
x.dialTimeout = timeout x.dialTimeout = timeout
@ -317,7 +342,7 @@ func (c *clientWrapper) dial(ctx context.Context) error {
prmDial.SetGRPCDialOptions(c.prm.dialOptions...) prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
if err = cl.Dial(ctx, prmDial); err != nil { if err = cl.Dial(ctx, prmDial); err != nil {
c.setUnhealthy() c.setUnhealthyOnDial()
return err return err
} }
@ -334,6 +359,12 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
wasHealthy = true wasHealthy = true
} }
// If the connection was dialed before, the pool has to close it and then
// initialize it once again, to avoid routine and connection leaks.
if c.isDialed() {
_ = c.close()
}
var cl sdkClient.Client var cl sdkClient.Client
var prmInit sdkClient.PrmInit var prmInit sdkClient.PrmInit
prmInit.SetDefaultPrivateKey(c.prm.key) prmInit.SetDefaultPrivateKey(c.prm.key)
@ -348,7 +379,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
prmDial.SetGRPCDialOptions(c.prm.dialOptions...) prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
if err := cl.Dial(ctx, prmDial); err != nil { if err := cl.Dial(ctx, prmDial); err != nil {
c.setUnhealthy() c.setUnhealthyOnDial()
return false, wasHealthy return false, wasHealthy
} }
@@ -549,11 +580,9 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
        return err
    }

-   var cliPrm sdkClient.PrmContainerSetEACL
-   cliPrm.SetTable(prm.table)
-
-   if prm.sessionSet {
-       cliPrm.WithinSession(prm.session)
+   cliPrm := sdkClient.PrmContainerSetEACL{
+       Table:   &prm.Table,
+       Session: prm.Session,
    }

    start := time.Now()
@@ -567,16 +596,19 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
        return fmt.Errorf("set eacl on client: %w", err)
    }

-   if !prm.waitParamsSet {
-       prm.waitParams.setDefaults()
+   if prm.WaitParams == nil {
+       prm.WaitParams = defaultWaitParams()
+   }
+   if err := prm.WaitParams.CheckValidity(); err != nil {
+       return fmt.Errorf("invalid wait parameters: %w", err)
    }

    var cIDp *cid.ID
-   if cID, set := prm.table.CID(); set {
+   if cID, set := prm.Table.CID(); set {
        cIDp = &cID
    }

-   err = waitForEACLPresence(ctx, c, cIDp, &prm.table, &prm.waitParams)
+   err = waitForEACLPresence(ctx, c, cIDp, &prm.Table, prm.WaitParams)
    if err = c.handleError(ctx, nil, err); err != nil {
        return fmt.Errorf("wait eacl presence on client: %w", err)
    }
@ -628,6 +660,18 @@ func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (netm
// objectPut writes object to FrostFS. // objectPut writes object to FrostFS.
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) { func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
if prm.bufferMaxSize == 0 {
prm.bufferMaxSize = defaultBufferMaxSizeForPut
}
if prm.clientCut {
return c.objectPutClientCut(ctx, prm)
}
return c.objectPutServerCut(ctx, prm)
}
func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
cl, err := c.getClient() cl, err := c.getClient()
if err != nil { if err != nil {
return oid.ID{}, err return oid.ID{}, err
@@ -665,10 +709,8 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
    }

    if prm.payload != nil {
-       const defaultBufferSizePut = 3 << 20 // configure?
-
-       if sz == 0 || sz > defaultBufferSizePut {
-           sz = defaultBufferSizePut
+       if sz == 0 || sz > prm.bufferMaxSize {
+           sz = prm.bufferMaxSize
        }

        buf := make([]byte, sz)
@ -709,6 +751,73 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
return res.StoredObjectID(), nil return res.StoredObjectID(), nil
} }
func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
putInitPrm := PrmObjectPutClientCutInit{
PrmObjectPut: prm,
}
start := time.Now()
wObj, err := c.objectPutInitTransformer(putInitPrm)
c.incRequests(time.Since(start), methodObjectPut)
if err = c.handleError(ctx, nil, err); err != nil {
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
}
if wObj.WriteHeader(ctx, prm.hdr) {
sz := prm.hdr.PayloadSize()
if data := prm.hdr.Payload(); len(data) > 0 {
if prm.payload != nil {
prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
} else {
prm.payload = bytes.NewReader(data)
sz = uint64(len(data))
}
}
if prm.payload != nil {
if sz == 0 || sz > prm.bufferMaxSize {
sz = prm.bufferMaxSize
}
buf := make([]byte, sz)
var n int
for {
n, err = prm.payload.Read(buf)
if n > 0 {
start = time.Now()
successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
c.incRequests(time.Since(start), methodObjectPut)
if !successWrite {
break
}
continue
}
if errors.Is(err, io.EOF) {
break
}
return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
}
}
}
res, err := wObj.Close(ctx)
var st apistatus.Status
if res != nil {
st = res.Status
}
if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
return oid.ID{}, fmt.Errorf("client failure: %w", err)
}
return res.OID, nil
}
 // objectDelete invokes sdkClient.ObjectDelete and parses the response status into an error.
 func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) error {
    cl, err := c.getClient()
@@ -716,20 +825,15 @@ func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) e
        return err
    }

-   var cliPrm sdkClient.PrmObjectDelete
-   cliPrm.FromContainer(prm.addr.Container())
-   cliPrm.ByID(prm.addr.Object())
-
-   if prm.stoken != nil {
-       cliPrm.WithinSession(*prm.stoken)
-   }
-
-   if prm.btoken != nil {
-       cliPrm.WithBearerToken(*prm.btoken)
-   }
-
-   if prm.key != nil {
-       cliPrm.UseKey(*prm.key)
+   cnr := prm.addr.Container()
+   obj := prm.addr.Object()
+
+   cliPrm := sdkClient.PrmObjectDelete{
+       BearerToken: prm.btoken,
+       Session:     prm.stoken,
+       ContainerID: &cnr,
+       ObjectID:    &obj,
+       Key:         prm.key,
    }
start := time.Now() start := time.Now()
@@ -752,20 +856,15 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGet
        return ResGetObject{}, err
    }

-   var cliPrm sdkClient.PrmObjectGet
-   cliPrm.FromContainer(prm.addr.Container())
-   cliPrm.ByID(prm.addr.Object())
-
-   if prm.stoken != nil {
-       cliPrm.WithinSession(*prm.stoken)
-   }
-
-   if prm.btoken != nil {
-       cliPrm.WithBearerToken(*prm.btoken)
-   }
-
-   if prm.key != nil {
-       cliPrm.UseKey(*prm.key)
+   prmCnr := prm.addr.Container()
+   prmObj := prm.addr.Object()
+
+   cliPrm := sdkClient.PrmObjectGet{
+       BearerToken: prm.btoken,
+       Session:     prm.stoken,
+       ContainerID: &prmCnr,
+       ObjectID:    &prmObj,
+       Key:         prm.key,
    }
var res ResGetObject var res ResGetObject
@@ -805,23 +904,16 @@ func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (obje
        return object.Object{}, err
    }

-   var cliPrm sdkClient.PrmObjectHead
-   cliPrm.FromContainer(prm.addr.Container())
-   cliPrm.ByID(prm.addr.Object())
-
-   if prm.raw {
-       cliPrm.MarkRaw()
-   }
-
-   if prm.stoken != nil {
-       cliPrm.WithinSession(*prm.stoken)
-   }
-
-   if prm.btoken != nil {
-       cliPrm.WithBearerToken(*prm.btoken)
-   }
-
-   if prm.key != nil {
-       cliPrm.UseKey(*prm.key)
+   prmCnr := prm.addr.Container()
+   prmObj := prm.addr.Object()
+
+   cliPrm := sdkClient.PrmObjectHead{
+       BearerToken: prm.btoken,
+       Session:     prm.stoken,
+       Raw:         prm.raw,
+       ContainerID: &prmCnr,
+       ObjectID:    &prmObj,
+       Key:         prm.key,
    }
var obj object.Object var obj object.Object
@@ -850,22 +942,17 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (Re
        return ResObjectRange{}, err
    }

-   var cliPrm sdkClient.PrmObjectRange
-   cliPrm.FromContainer(prm.addr.Container())
-   cliPrm.ByID(prm.addr.Object())
-   cliPrm.SetOffset(prm.off)
-   cliPrm.SetLength(prm.ln)
-
-   if prm.stoken != nil {
-       cliPrm.WithinSession(*prm.stoken)
-   }
-
-   if prm.btoken != nil {
-       cliPrm.WithBearerToken(*prm.btoken)
-   }
-
-   if prm.key != nil {
-       cliPrm.UseKey(*prm.key)
+   prmCnr := prm.addr.Container()
+   prmObj := prm.addr.Object()
+
+   cliPrm := sdkClient.PrmObjectRange{
+       BearerToken: prm.btoken,
+       Session:     prm.stoken,
+       ContainerID: &prmCnr,
+       ObjectID:    &prmObj,
+       Offset:      prm.off,
+       Length:      prm.ln,
+       Key:         prm.key,
    }
start := time.Now() start := time.Now()
@@ -922,9 +1009,10 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
        return resCreateSession{}, err
    }

-   var cliPrm sdkClient.PrmSessionCreate
-   cliPrm.SetExp(prm.exp)
-   cliPrm.UseKey(prm.key)
+   cliPrm := sdkClient.PrmSessionCreate{
+       Expiration: prm.exp,
+       Key:        &prm.key,
+   }

    start := time.Now()
    res, err := cl.SessionCreate(ctx, cliPrm)
@@ -944,15 +1032,23 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
 }

 func (c *clientStatusMonitor) isHealthy() bool {
-   return c.healthy.Load()
+   return c.healthy.Load() == statusHealthy
+}
+
+func (c *clientStatusMonitor) isDialed() bool {
+   return c.healthy.Load() != statusUnhealthyOnDial
 }

 func (c *clientStatusMonitor) setHealthy() {
-   c.healthy.Store(true)
+   c.healthy.Store(statusHealthy)
 }

 func (c *clientStatusMonitor) setUnhealthy() {
-   c.healthy.Store(false)
+   c.healthy.Store(statusUnhealthyOnRequest)
+}
+
+func (c *clientStatusMonitor) setUnhealthyOnDial() {
+   c.healthy.Store(statusUnhealthyOnDial)
 }

 func (c *clientStatusMonitor) address() string {
@@ -971,12 +1067,20 @@ func (c *clientStatusMonitor) incErrorRate() {
    }
    c.mu.Unlock()

-   if thresholdReached && c.logger != nil {
-       c.logger.Warn("error threshold reached",
+   if thresholdReached {
+       c.log(zapcore.WarnLevel, "error threshold reached",
            zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold))
    }
 }

+func (c *clientStatusMonitor) log(level zapcore.Level, msg string, fields ...zap.Field) {
+   if c.logger == nil {
+       return
+   }
+
+   c.logger.Log(level, msg, fields...)
+}
func (c *clientStatusMonitor) currentErrorRate() uint32 { func (c *clientStatusMonitor) currentErrorRate() uint32 {
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
@ -1010,6 +1114,13 @@ func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
} }
} }
func (c *clientWrapper) close() error {
if c.client != nil {
return c.client.Close()
}
return nil
}
func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error { func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error {
if err != nil { if err != nil {
if needCountError(ctx, err) { if needCountError(ctx, err) {
@@ -1227,12 +1338,6 @@ func (x *WaitParams) SetPollInterval(tick time.Duration) {
    x.PollInterval = tick
 }

-// Deprecated: Use defaultWaitParams() instead.
-func (x *WaitParams) setDefaults() {
-   x.Timeout = 120 * time.Second
-   x.PollInterval = 5 * time.Second
-}
-
 func defaultWaitParams() *WaitParams {
    return &WaitParams{
        Timeout:      120 * time.Second,
@ -1321,6 +1426,13 @@ type PrmObjectPut struct {
payload io.Reader payload io.Reader
copiesNumber []uint32 copiesNumber []uint32
clientCut bool
networkInfo netmap.NetworkInfo
withoutHomomorphicHash bool
bufferMaxSize uint64
} }
// SetHeader specifies header of the object. // SetHeader specifies header of the object.
@ -1345,6 +1457,32 @@ func (x *PrmObjectPut) SetCopiesNumberVector(copiesNumber []uint32) {
x.copiesNumber = copiesNumber x.copiesNumber = copiesNumber
} }
// SetClientCut enables client cut for objects. It means that the full object is prepared
// on the client side, so retrying is possible. This comes at the cost of additional memory
// for buffering object parts.
// The buffer size for every put is the MaxObjectSize value from the FrostFS network.
// There is a limit on the total memory allocated for in-flight requests; it can be set
// via InitParameters.SetMaxClientCutMemory (the default is 1 GB).
// Put requests will fail once this limit is reached.
func (x *PrmObjectPut) SetClientCut(clientCut bool) {
x.clientCut = clientCut
}
// WithoutHomomorphicHash, if set to true, disables the Tillich-Zémor hash for the payload.
func (x *PrmObjectPut) WithoutHomomorphicHash(v bool) {
x.withoutHomomorphicHash = v
}
// SetBufferMaxSize sets the maximum buffer size for reading the payload.
// This value is not used if the object size is set explicitly and is less than this value.
// The default is 3 MB.
func (x *PrmObjectPut) SetBufferMaxSize(size uint64) {
x.bufferMaxSize = size
}
func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) {
x.networkInfo = ni
}
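
A hypothetical call-site sketch for these knobs (SetHeader is the setter shown above; SetPayload is assumed to be the usual payload setter on PrmObjectPut; p is a dialed *Pool):

// putClientCut uploads an object with client-side slicing enabled.
func putClientCut(ctx context.Context, p *Pool, hdr object.Object, payload io.Reader) (oid.ID, error) {
    var prm PrmObjectPut
    prm.SetHeader(hdr)
    prm.SetPayload(payload)
    prm.SetClientCut(true)                // slice the object locally; retries become possible
    prm.SetBufferMaxSize(8 * 1024 * 1024) // optional: raise the default 3 MB read buffer

    return p.PutObject(ctx, prm)
}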
// PrmObjectDelete groups parameters of DeleteObject operation. // PrmObjectDelete groups parameters of DeleteObject operation.
type PrmObjectDelete struct { type PrmObjectDelete struct {
prmCommon prmCommon
@@ -1532,39 +1670,41 @@ func (x *PrmContainerEACL) SetContainerID(cnrID cid.ID) {
 // PrmContainerSetEACL groups parameters of SetEACL operation.
 type PrmContainerSetEACL struct {
-   table eacl.Table
-
-   sessionSet bool
-   session    session.Container
+   Table eacl.Table

-   waitParams    WaitParams
-   waitParamsSet bool
+   Session *session.Container
+
+   WaitParams *WaitParams
 }

 // SetTable sets structure of container's extended ACL to be used as a
 // parameter of the base client's operation.
 //
 // See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.SetTable.
+//
+// Deprecated: Use PrmContainerSetEACL.Table instead.
 func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
-   x.table = table
+   x.Table = table
 }

 // WithinSession specifies session to be used as a parameter of the base
 // client's operation.
 //
 // See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.WithinSession.
+//
+// Deprecated: Use PrmContainerSetEACL.Session instead.
 func (x *PrmContainerSetEACL) WithinSession(s session.Container) {
-   x.session = s
-   x.sessionSet = true
+   x.Session = &s
 }

 // SetWaitParams specifies timeout params to complete operation.
 // If not provided the default one will be used.
 // Panics if any of the wait params isn't positive.
+//
+// Deprecated: Use PrmContainerSetEACL.WaitParams instead.
 func (x *PrmContainerSetEACL) SetWaitParams(waitParams WaitParams) {
    waitParams.checkForPositive()
-   x.waitParams = waitParams
-   x.waitParamsSet = true
+   x.WaitParams = &waitParams
 }
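
With the exported fields, new code can drop the deprecated setters; a short before/after sketch (table and stoken prepared elsewhere):

// before (still compiles, but deprecated):
//   var prm PrmContainerSetEACL
//   prm.SetTable(table)
//   prm.WithinSession(stoken)

// after:
prm := PrmContainerSetEACL{
    Table:   table,
    Session: &stoken, // nil means "no session"
}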
// PrmBalanceGet groups parameters of Balance operation. // PrmBalanceGet groups parameters of Balance operation.
@ -1638,6 +1778,8 @@ type Pool struct {
rebalanceParams rebalanceParameters rebalanceParams rebalanceParameters
clientBuilder clientBuilder clientBuilder clientBuilder
logger *zap.Logger logger *zap.Logger
maxObjectSize uint64
} }
type innerPool struct { type innerPool struct {
@ -1654,6 +1796,8 @@ const (
defaultHealthcheckTimeout = 4 * time.Second defaultHealthcheckTimeout = 4 * time.Second
defaultDialTimeout = 5 * time.Second defaultDialTimeout = 5 * time.Second
defaultStreamTimeout = 10 * time.Second defaultStreamTimeout = 10 * time.Second
defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
) )
// NewPool creates connection pool using parameters. // NewPool creates connection pool using parameters.
@ -1713,7 +1857,7 @@ func (p *Pool) Dial(ctx context.Context) error {
} }
var st session.Object var st session.Object
err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key) err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key, false)
if err != nil { if err != nil {
clients[j].setUnhealthy() clients[j].setUnhealthy()
p.log(zap.WarnLevel, "failed to create frostfs session token for client", p.log(zap.WarnLevel, "failed to create frostfs session token for client",
@ -1721,7 +1865,7 @@ func (p *Pool) Dial(ctx context.Context) error {
continue continue
} }
_ = p.cache.Put(formCacheKey(addr, p.key), st) _ = p.cache.Put(formCacheKey(addr, p.key, false), st)
atLeastOneHealthy = true atLeastOneHealthy = true
} }
source := rand.NewSource(time.Now().UnixNano()) source := rand.NewSource(time.Now().UnixNano())
@ -1742,6 +1886,12 @@ func (p *Pool) Dial(ctx context.Context) error {
p.closedCh = make(chan struct{}) p.closedCh = make(chan struct{})
p.innerPools = inner p.innerPools = inner
ni, err := p.NetworkInfo(ctx)
if err != nil {
return fmt.Errorf("get network info for max object size: %w", err)
}
p.maxObjectSize = ni.MaxObjectSize()
go p.startRebalance(ctx) go p.startRebalance(ctx)
return nil return nil
} }
@ -1784,6 +1934,7 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
var prm wrapperPrm var prm wrapperPrm
prm.setAddress(addr) prm.setAddress(addr)
prm.setKey(*params.key) prm.setKey(*params.key)
prm.setLogger(params.logger)
prm.setDialTimeout(params.nodeDialTimeout) prm.setDialTimeout(params.nodeDialTimeout)
prm.setStreamTimeout(params.nodeStreamTimeout) prm.setStreamTimeout(params.nodeStreamTimeout)
prm.setErrorThreshold(params.errorThreshold) prm.setErrorThreshold(params.errorThreshold)
@ -1952,9 +2103,15 @@ func (p *innerPool) connection() (client, error) {
return nil, errors.New("no healthy client") return nil, errors.New("no healthy client")
} }
func formCacheKey(address string, key *ecdsa.PrivateKey) string { func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string {
k := keys.PrivateKey{PrivateKey: *key} k := keys.PrivateKey{PrivateKey: *key}
return address + k.String()
stype := "server"
if clientCut {
stype = "client"
}
return address + stype + k.String()
} }
func (p *Pool) checkSessionTokenErr(err error, address string) bool { func (p *Pool) checkSessionTokenErr(err error, address string) bool {
@@ -1970,7 +2127,7 @@ func (p *Pool) checkSessionTokenErr(err error, address string) bool {
    return false
 }

-func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey) error {
+func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey, clientCut bool) error {
    ni, err := c.networkInfo(ctx, prmNetworkInfo{})
    if err != nil {
        return err
@@ -1988,23 +2145,25 @@ func initSessionForDuration(ctx context.Context, dst *session.Object, c client,
    prm.setExp(exp)
    prm.useKey(ownerKey)

-   res, err := c.sessionCreate(ctx, prm)
-   if err != nil {
-       return err
-   }
-
-   var id uuid.UUID
-
-   err = id.UnmarshalBinary(res.id)
-   if err != nil {
-       return fmt.Errorf("invalid session token ID: %w", err)
-   }
-
-   var key frostfsecdsa.PublicKey
-
-   err = key.Decode(res.sessionKey)
-   if err != nil {
-       return fmt.Errorf("invalid public session key: %w", err)
+   var (
+       id  uuid.UUID
+       key frostfsecdsa.PublicKey
+   )
+   if clientCut {
+       id = uuid.New()
+       key = frostfsecdsa.PublicKey(ownerKey.PublicKey)
+   } else {
+       res, err := c.sessionCreate(ctx, prm)
+       if err != nil {
+           return err
+       }
+       if err = id.UnmarshalBinary(res.id); err != nil {
+           return fmt.Errorf("invalid session token ID: %w", err)
+       }
+       if err = key.Decode(res.sessionKey); err != nil {
+           return fmt.Errorf("invalid public session key: %w", err)
+       }
    }
dst.SetID(id) dst.SetID(id)
@ -2030,6 +2189,8 @@ type callContext struct {
sessionCnr cid.ID sessionCnr cid.ID
sessionObjSet bool sessionObjSet bool
sessionObjs []oid.ID sessionObjs []oid.ID
sessionClientCut bool
} }
func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error { func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
@ -2066,12 +2227,12 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContex
// opens new session or uses cached one. // opens new session or uses cached one.
// Must be called only on initialized callContext with set sessionTarget. // Must be called only on initialized callContext with set sessionTarget.
func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error { func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error {
cacheKey := formCacheKey(cc.endpoint, cc.key) cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut)
tok, ok := p.cache.Get(cacheKey) tok, ok := p.cache.Get(cacheKey)
if !ok { if !ok {
// init new session // init new session
err := initSessionForDuration(ctx, &tok, cc.client, p.stokenDuration, *cc.key) err := initSessionForDuration(ctx, &tok, cc.client, p.stokenDuration, *cc.key, cc.sessionClientCut)
if err != nil { if err != nil {
return fmt.Errorf("session API client: %w", err) return fmt.Errorf("session API client: %w", err)
} }
@ -2136,6 +2297,7 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
p.fillAppropriateKey(&prm.prmCommon) p.fillAppropriateKey(&prm.prmCommon)
var ctxCall callContext var ctxCall callContext
ctxCall.sessionClientCut = prm.clientCut
if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil { if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil {
return oid.ID{}, fmt.Errorf("init call context: %w", err) return oid.ID{}, fmt.Errorf("init call context: %w", err)
} }
@ -2147,6 +2309,13 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
} }
} }
if prm.clientCut {
var ni netmap.NetworkInfo
ni.SetCurrentEpoch(p.cache.Epoch())
ni.SetMaxObjectSize(p.maxObjectSize) // we want to use initial max object size in PayloadSizeLimiter
prm.setNetworkInfo(ni)
}
id, err := ctxCall.client.objectPut(ctx, prm) id, err := ctxCall.client.objectPut(ctx, prm)
if err != nil { if err != nil {
// removes session token from cache in case of token error // removes session token from cache in case of token error
@ -2638,6 +2807,15 @@ func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
func (p *Pool) Close() { func (p *Pool) Close() {
p.cancel() p.cancel()
<-p.closedCh <-p.closedCh
// close all clients
for _, pools := range p.innerPools {
for _, cli := range pools.clients {
if cli.isDialed() {
_ = cli.close()
}
}
}
} }
// SyncContainerWithNetwork applies network configuration received via // SyncContainerWithNetwork applies network configuration received via
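
Since Close now also tears down every dialed inner client, the usual lifecycle is enough to avoid connection and goroutine leaks; a sketch (InitParameters setup elided):

func newDialedPool(ctx context.Context, initParams InitParameters) (*Pool, error) {
    p, err := NewPool(initParams)
    if err != nil {
        return nil, err
    }
    if err := p.Dial(ctx); err != nil {
        return nil, err
    }
    // callers should defer p.Close(): it cancels rebalancing and
    // closes the dialed connections
    return p, nil
}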


@ -106,7 +106,7 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
if err != nil { if err != nil {
return false return false
} }
st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key)) st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key, false))
return st.AssertAuthKey(&expectedAuthKey) return st.AssertAuthKey(&expectedAuthKey)
} }
require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond) require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond)
@ -141,7 +141,7 @@ func TestOneNode(t *testing.T) {
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey) expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
require.True(t, st.AssertAuthKey(&expectedAuthKey)) require.True(t, st.AssertAuthKey(&expectedAuthKey))
} }
@ -171,7 +171,7 @@ func TestTwoNodes(t *testing.T) {
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, assertAuthKeyForAny(st, clientKeys)) require.True(t, assertAuthKeyForAny(st, clientKeys))
} }
@ -226,7 +226,7 @@ func TestOneOfTwoFailed(t *testing.T) {
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, assertAuthKeyForAny(st, clientKeys)) require.True(t, assertAuthKeyForAny(st, clientKeys))
} }
} }
@ -296,7 +296,7 @@ func TestSessionCache(t *testing.T) {
// cache must contain session token // cache must contain session token
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey)) require.True(t, st.AssertAuthKey(&expectedAuthKey))
var prm PrmObjectGet var prm PrmObjectGet
@ -309,7 +309,7 @@ func TestSessionCache(t *testing.T) {
// cache must not contain session token // cache must not contain session token
cp, err = pool.connection() cp, err = pool.connection()
require.NoError(t, err) require.NoError(t, err)
_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key)) _, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.False(t, ok) require.False(t, ok)
var prm2 PrmObjectPut var prm2 PrmObjectPut
@ -321,7 +321,7 @@ func TestSessionCache(t *testing.T) {
// cache must contain session token // cache must contain session token
cp, err = pool.connection() cp, err = pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key)) st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey)) require.True(t, st.AssertAuthKey(&expectedAuthKey))
} }
@ -365,7 +365,7 @@ func TestPriority(t *testing.T) {
firstNode := func() bool { firstNode := func() bool {
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
return st.AssertAuthKey(&expectedAuthKey1) return st.AssertAuthKey(&expectedAuthKey1)
} }
@ -373,7 +373,7 @@ func TestPriority(t *testing.T) {
secondNode := func() bool { secondNode := func() bool {
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
return st.AssertAuthKey(&expectedAuthKey2) return st.AssertAuthKey(&expectedAuthKey2)
} }
require.Never(t, secondNode, time.Second, 200*time.Millisecond) require.Never(t, secondNode, time.Second, 200*time.Millisecond)
@ -410,7 +410,7 @@ func TestSessionCacheWithKey(t *testing.T) {
// cache must contain session token // cache must contain session token
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey)) require.True(t, st.AssertAuthKey(&expectedAuthKey))
var prm PrmObjectDelete var prm PrmObjectDelete
@ -420,7 +420,7 @@ func TestSessionCacheWithKey(t *testing.T) {
err = pool.DeleteObject(ctx, prm) err = pool.DeleteObject(ctx, prm)
require.NoError(t, err) require.NoError(t, err)
st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey)) st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey)) require.True(t, st.AssertAuthKey(&expectedAuthKey))
} }
@ -523,6 +523,44 @@ func TestStatusMonitor(t *testing.T) {
require.Equal(t, uint64(count), monitor.overallErrorRate()) require.Equal(t, uint64(count), monitor.overallErrorRate())
require.Equal(t, uint32(1), monitor.currentErrorRate()) require.Equal(t, uint32(1), monitor.currentErrorRate())
t.Run("healthy status", func(t *testing.T) {
cases := []struct {
action func(*clientStatusMonitor)
status uint32
isDialed bool
isHealthy bool
description string
}{
{
action: func(m *clientStatusMonitor) { m.setUnhealthyOnDial() },
status: statusUnhealthyOnDial,
isDialed: false,
isHealthy: false,
description: "set unhealthy on dial",
},
{
action: func(m *clientStatusMonitor) { m.setUnhealthy() },
status: statusUnhealthyOnRequest,
isDialed: true,
isHealthy: false,
description: "set unhealthy on request",
},
{
action: func(m *clientStatusMonitor) { m.setHealthy() },
status: statusHealthy,
isDialed: true,
isHealthy: true,
description: "set healthy",
},
}
for _, tc := range cases {
tc.action(&monitor)
require.Equal(t, tc.status, monitor.healthy.Load())
require.Equal(t, tc.isDialed, monitor.isDialed())
require.Equal(t, tc.isHealthy, monitor.isHealthy())
}
})
} }
func TestHandleError(t *testing.T) { func TestHandleError(t *testing.T) {


@ -3,6 +3,7 @@ package tree
import ( import (
"context" "context"
"crypto/tls" "crypto/tls"
"errors"
"fmt" "fmt"
"sync" "sync"
@ -22,6 +23,11 @@ type treeClient struct {
healthy bool healthy bool
} }
var (
// ErrUnhealthyEndpoint is returned when a client in the pool is considered unavailable.
ErrUnhealthyEndpoint = errors.New("unhealthy endpoint")
)
// newTreeClient creates new tree client with auto dial. // newTreeClient creates new tree client with auto dial.
func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient { func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient {
return &treeClient{ return &treeClient{
@ -102,7 +108,7 @@ func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) {
defer c.mu.RUnlock() defer c.mu.RUnlock()
if c.conn == nil || !c.healthy { if c.conn == nil || !c.healthy {
return nil, fmt.Errorf("unhealthy endpoint: '%s'", c.address) return nil, fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, c.address)
} }
return c.service, nil return c.service, nil
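
Exporting the sentinel lets callers branch on the condition instead of matching the formatted message; a sketch (assuming the error surfaces unchanged from tree pool operations):

// shouldBackOff reports whether a tree request failed because the endpoint
// was unavailable rather than because of a business-logic error.
func shouldBackOff(err error) bool {
    return errors.Is(err, ErrUnhealthyEndpoint)
}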


@ -32,6 +32,9 @@ var (
// ErrNodeAccessDenied is returned from Tree service in case of access denied error. // ErrNodeAccessDenied is returned from Tree service in case of access denied error.
ErrNodeAccessDenied = errors.New("access denied") ErrNodeAccessDenied = errors.New("access denied")
// errNodeEmptyResult is used to trigger a retry when 'GetNodeByPath' returns an empty result.
errNodeEmptyResult = errors.New("empty result")
) )
// client represents virtual connection to the single FrostFS tree service from which Pool is formed. // client represents virtual connection to the single FrostFS tree service from which Pool is formed.
@ -296,8 +299,15 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
var resp *grpcService.GetNodeByPathResponse var resp *grpcService.GetNodeByPathResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.GetNodeByPath(ctx, request) resp, inErr = client.GetNodeByPath(ctx, request)
// The pool retries the 'GetNodeByPath' request if the result is empty.
// An empty result is expected due to delayed tree service sync.
// Return an error here to trigger the retry, and ignore it afterwards,
// to keep compatibility with the 'GetNodeByPath' implementation.
if inErr == nil && len(resp.Body.Nodes) == 0 {
return errNodeEmptyResult
}
return handleError("failed to get node by path", inErr) return handleError("failed to get node by path", inErr)
}); err != nil { }); err != nil && !errors.Is(err, errNodeEmptyResult) {
return nil, err return nil, err
} }
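
The same sentinel trick in isolation: report an empty (but successful) reply as an error so a shared retry loop keeps going, then swallow that sentinel at the call site (a generic sketch, not the pool API):

var errEmptyResult = errors.New("empty result")

func fetchWithRetry(retry func(func() error) error, get func() ([]string, error)) ([]string, error) {
    var out []string
    err := retry(func() error {
        var inErr error
        out, inErr = get()
        if inErr == nil && len(out) == 0 {
            return errEmptyResult // a successful but empty reply still triggers a retry
        }
        return inErr
    })
    if err != nil && !errors.Is(err, errEmptyResult) {
        return nil, err
    }
    return out, nil // an empty result after all retries is not an error
}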
@@ -720,8 +730,8 @@ func (p *Pool) setStartIndices(i, j int) {
 func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error {
    var (
-       err error
-       cl  grpcService.TreeServiceClient
+       err, finErr error
+       cl          grpcService.TreeServiceClient
    )

    startI, startJ := p.getStartIndices()
@@ -740,16 +750,44 @@ func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) er
                }
                return err
            }
+           finErr = finalError(finErr, err)
            p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
        }
        startJ = 0
    }

-   return err
+   return finErr
 }

 func shouldTryAgain(err error) bool {
-   return !(err == nil ||
-       errors.Is(err, ErrNodeNotFound) ||
-       errors.Is(err, ErrNodeAccessDenied))
+   return !(err == nil || errors.Is(err, ErrNodeAccessDenied))
+}
+
+func prioErr(err error) int {
+   switch {
+   case err == nil:
+       return -1
+   case errors.Is(err, ErrNodeAccessDenied):
+       return 100
+   case errors.Is(err, ErrNodeNotFound) ||
+       errors.Is(err, errNodeEmptyResult):
+       return 200
+   case errors.Is(err, ErrUnhealthyEndpoint):
+       return 300
+   default:
+       return 500
+   }
+}
+
+func finalError(current, candidate error) error {
+   if current == nil || candidate == nil {
+       return candidate
+   }
+
+   // the lower-priority error is the more desirable one to report
+   if prioErr(candidate) < prioErr(current) {
+       return candidate
+   }
+
+   return current
 }
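
A worked example of the priority rule: when three nodes fail with different errors, the loop reports the most specific (lowest-priority) one.

var fin error
for _, e := range []error{ErrUnhealthyEndpoint, errNodeEmptyResult, ErrNodeAccessDenied} {
    fin = finalError(fin, e)
}
// fin is ErrNodeAccessDenied: priority 100 beats 200 (not found / empty result)
// and 300 (unhealthy endpoint)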


@ -156,6 +156,43 @@ func TestRetry(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
t.Run("error empty result", func(t *testing.T) {
errNodes, index := 2, 0
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
if index < errNodes {
index++
return errNodeEmptyResult
}
return nil
})
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, errNodes)
})
t.Run("error not found", func(t *testing.T) {
errNodes, index := 2, 0
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
if index < errNodes {
index++
return ErrNodeNotFound
}
return nil
})
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, errNodes)
})
t.Run("error access denied", func(t *testing.T) {
var index int
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
index++
return ErrNodeAccessDenied
})
require.ErrorIs(t, err, ErrNodeAccessDenied)
require.Equal(t, 1, index)
checkIndicesAndReset(t, p, 0, 0)
})
} }
func TestRebalance(t *testing.T) { func TestRebalance(t *testing.T) {


@ -1,9 +1,10 @@
package session_test package session_test
import ( import (
"crypto/rand"
"fmt" "fmt"
"math" "math"
"math/rand" mrand "math/rand"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
@ -396,7 +397,7 @@ func TestContainer_AppliedTo(t *testing.T) {
func TestContainer_InvalidAt(t *testing.T) { func TestContainer_InvalidAt(t *testing.T) {
var x session.Container var x session.Container
nbf := rand.Uint64() nbf := mrand.Uint64()
if nbf == math.MaxUint64 { if nbf == math.MaxUint64 {
nbf-- nbf--
} }


@ -1,7 +1,7 @@
package user_test package user_test
import ( import (
"math/rand" "crypto/rand"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"