forked from TrueCloudLab/frostfs-sdk-go
Compare commits
33 commits
6fdbe75517
...
fc4551b843
Author | SHA1 | Date | |
---|---|---|---|
fc4551b843 | |||
eb5288f4a5 | |||
60463871db | |||
8a04638749 | |||
ddbfb758c9 | |||
d71a0e0755 | |||
163b3e1961 | |||
84b9d29fc9 | |||
99c273f499 | |||
555ccc63b2 | |||
0550438b53 | |||
c899163860 | |||
ac8fc6d440 | |||
0a0b590df3 | |||
4df642e941 | |||
8bc64e088e | |||
49ad985cad | |||
aa12d8c6a6 | |||
303508328a | |||
55699d1480 | |||
55a1f23e71 | |||
291a71ba84 | |||
5a471e5002 | |||
b5fe52d6bd | |||
84e7e69f98 | |||
46a214d065 | |||
202412230a | |||
3cb3841073 | |||
faeeeab87a | |||
cae215534f | |||
518fb79bc0 | |||
342524159a | |||
22978303f8 |
52 changed files with 1477 additions and 666 deletions
|
@ -25,7 +25,7 @@ linters-settings:
|
|||
# report about shadowed variables
|
||||
check-shadowing: false
|
||||
staticcheck:
|
||||
checks: ["all", "-SA1019"] # TODO Enable SA1019 after deprecated warning are fixed.
|
||||
checks: ["all"]
|
||||
funlen:
|
||||
lines: 80 # default 60
|
||||
statements: 60 # default 40
|
||||
|
|
|
@ -24,17 +24,17 @@ type Checksum refs.Checksum
|
|||
|
||||
// Type represents the enumeration
|
||||
// of checksum types.
|
||||
type Type uint8
|
||||
type Type refs.ChecksumType
|
||||
|
||||
const (
|
||||
// Unknown is an undefined checksum type.
|
||||
Unknown Type = iota
|
||||
Unknown Type = Type(refs.UnknownChecksum)
|
||||
|
||||
// SHA256 is a SHA256 checksum type.
|
||||
SHA256
|
||||
SHA256 = Type(refs.SHA256)
|
||||
|
||||
// TZ is a Tillich-Zémor checksum type.
|
||||
TZ
|
||||
TZ = Type(refs.TillichZemor)
|
||||
)
|
||||
|
||||
// ReadFromV2 reads Checksum from the refs.Checksum message. Checks if the
|
||||
|
|
|
@ -2,9 +2,9 @@ package checksum
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
)
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package checksumtest
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"math/rand"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||
)
|
||||
|
@ -11,7 +11,7 @@ import (
|
|||
func Checksum() checksum.Checksum {
|
||||
var cs [sha256.Size]byte
|
||||
|
||||
rand.Read(cs[:])
|
||||
_, _ = rand.Read(cs[:])
|
||||
|
||||
var x checksum.Checksum
|
||||
|
||||
|
|
|
@ -47,6 +47,8 @@ func writeXHeadersToMeta(xHeaders []string, h *v2session.RequestMetaHeader) {
|
|||
return
|
||||
}
|
||||
|
||||
// TODO (aarifullin): remove the panic when all client parameters will check XHeaders
|
||||
// within buildRequest invocation.
|
||||
if len(xHeaders)%2 != 0 {
|
||||
panic("slice of X-Headers with odd length")
|
||||
}
|
||||
|
|
|
@ -18,20 +18,20 @@ import (
|
|||
|
||||
// PrmContainerSetEACL groups parameters of ContainerSetEACL operation.
|
||||
type PrmContainerSetEACL struct {
|
||||
prmCommonMeta
|
||||
// FrostFS request X-Headers.
|
||||
XHeaders []string
|
||||
|
||||
tableSet bool
|
||||
table eacl.Table
|
||||
Table *eacl.Table
|
||||
|
||||
sessionSet bool
|
||||
session session.Container
|
||||
Session *session.Container
|
||||
}
|
||||
|
||||
// SetTable sets eACL table structure to be set for the container.
|
||||
// Required parameter.
|
||||
//
|
||||
// Deprecated: Use PrmContainerSetEACL.Table instead.
|
||||
func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
|
||||
x.table = table
|
||||
x.tableSet = true
|
||||
x.Table = &table
|
||||
}
|
||||
|
||||
// WithinSession specifies session within which extended ACL of the container
|
||||
|
@ -45,17 +45,22 @@ func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
|
|||
// for which extended ACL is going to be set
|
||||
// - session operation MUST be session.VerbContainerSetEACL (ForVerb)
|
||||
// - token MUST be signed using private key of the owner of the container to be saved
|
||||
//
|
||||
// Deprecated: Use PrmContainerSetEACL.Session instead.
|
||||
func (x *PrmContainerSetEACL) WithinSession(s session.Container) {
|
||||
x.session = s
|
||||
x.sessionSet = true
|
||||
x.Session = &s
|
||||
}
|
||||
|
||||
func (x *PrmContainerSetEACL) buildRequest(c *Client) (*v2container.SetExtendedACLRequest, error) {
|
||||
if !x.tableSet {
|
||||
if x.Table == nil {
|
||||
return nil, errorEACLTableNotSet
|
||||
}
|
||||
|
||||
eaclV2 := x.table.ToV2()
|
||||
if len(x.XHeaders)%2 != 0 {
|
||||
return nil, errorInvalidXHeaders
|
||||
}
|
||||
|
||||
eaclV2 := x.Table.ToV2()
|
||||
|
||||
var sig frostfscrypto.Signature
|
||||
|
||||
|
@ -72,11 +77,11 @@ func (x *PrmContainerSetEACL) buildRequest(c *Client) (*v2container.SetExtendedA
|
|||
reqBody.SetSignature(&sigv2)
|
||||
|
||||
var meta v2session.RequestMetaHeader
|
||||
writeXHeadersToMeta(x.prmCommonMeta.xHeaders, &meta)
|
||||
writeXHeadersToMeta(x.XHeaders, &meta)
|
||||
|
||||
if x.sessionSet {
|
||||
if x.Session != nil {
|
||||
var tokv2 v2session.Token
|
||||
x.session.WriteToV2(&tokv2)
|
||||
x.Session.WriteToV2(&tokv2)
|
||||
|
||||
meta.SetSessionToken(&tokv2)
|
||||
}
|
||||
|
|
|
@ -14,27 +14,29 @@ import (
|
|||
|
||||
// PrmAnnounceSpace groups parameters of ContainerAnnounceUsedSpace operation.
|
||||
type PrmAnnounceSpace struct {
|
||||
prmCommonMeta
|
||||
XHeaders []string
|
||||
|
||||
announcements []container.SizeEstimation
|
||||
Announcements []container.SizeEstimation
|
||||
}
|
||||
|
||||
// SetValues sets values describing volume of space that is used for the container objects.
|
||||
// Required parameter. Must not be empty.
|
||||
//
|
||||
// Must not be mutated before the end of the operation.
|
||||
//
|
||||
// Deprecated: Use PrmAnnounceSpace.Announcements instead.
|
||||
func (x *PrmAnnounceSpace) SetValues(vs []container.SizeEstimation) {
|
||||
x.announcements = vs
|
||||
x.Announcements = vs
|
||||
}
|
||||
|
||||
func (x *PrmAnnounceSpace) buildRequest(c *Client) (*v2container.AnnounceUsedSpaceRequest, error) {
|
||||
if len(x.announcements) == 0 {
|
||||
if len(x.Announcements) == 0 {
|
||||
return nil, errorMissingAnnouncements
|
||||
}
|
||||
|
||||
v2announce := make([]v2container.UsedSpaceAnnouncement, len(x.announcements))
|
||||
for i := range x.announcements {
|
||||
x.announcements[i].WriteToV2(&v2announce[i])
|
||||
v2announce := make([]v2container.UsedSpaceAnnouncement, len(x.Announcements))
|
||||
for i := range x.Announcements {
|
||||
x.Announcements[i].WriteToV2(&v2announce[i])
|
||||
}
|
||||
|
||||
reqBody := new(v2container.AnnounceUsedSpaceRequestBody)
|
||||
|
|
|
@ -16,12 +16,12 @@ import (
|
|||
|
||||
// PrmEndpointInfo groups parameters of EndpointInfo operation.
|
||||
type PrmEndpointInfo struct {
|
||||
prmCommonMeta
|
||||
XHeaders []string
|
||||
}
|
||||
|
||||
func (x *PrmEndpointInfo) buildRequest(c *Client) (*v2netmap.LocalNodeInfoRequest, error) {
|
||||
meta := new(v2session.RequestMetaHeader)
|
||||
writeXHeadersToMeta(x.xHeaders, meta)
|
||||
writeXHeadersToMeta(x.XHeaders, meta)
|
||||
|
||||
req := new(v2netmap.LocalNodeInfoRequest)
|
||||
req.SetBody(new(v2netmap.LocalNodeInfoRequestBody))
|
||||
|
@ -112,12 +112,12 @@ func (c *Client) EndpointInfo(ctx context.Context, prm PrmEndpointInfo) (*ResEnd
|
|||
|
||||
// PrmNetworkInfo groups parameters of NetworkInfo operation.
|
||||
type PrmNetworkInfo struct {
|
||||
prmCommonMeta
|
||||
XHeaders []string
|
||||
}
|
||||
|
||||
func (x PrmNetworkInfo) buildRequest(c *Client) (*v2netmap.NetworkInfoRequest, error) {
|
||||
meta := new(v2session.RequestMetaHeader)
|
||||
writeXHeadersToMeta(x.xHeaders, meta)
|
||||
writeXHeadersToMeta(x.XHeaders, meta)
|
||||
|
||||
var req v2netmap.NetworkInfoRequest
|
||||
req.SetBody(new(v2netmap.NetworkInfoRequestBody))
|
||||
|
|
|
@ -21,71 +21,25 @@ import (
|
|||
|
||||
// PrmObjectDelete groups parameters of ObjectDelete operation.
|
||||
type PrmObjectDelete struct {
|
||||
meta v2session.RequestMetaHeader
|
||||
XHeaders []string
|
||||
|
||||
body v2object.DeleteRequestBody
|
||||
BearerToken *bearer.Token
|
||||
|
||||
addr v2refs.Address
|
||||
Session *session.Object
|
||||
|
||||
keySet bool
|
||||
key ecdsa.PrivateKey
|
||||
}
|
||||
ContainerID *cid.ID
|
||||
|
||||
// WithinSession specifies session within which object should be read.
|
||||
//
|
||||
// Creator of the session acquires the authorship of the request.
|
||||
// This may affect the execution of an operation (e.g. access control).
|
||||
//
|
||||
// Must be signed.
|
||||
func (x *PrmObjectDelete) WithinSession(t session.Object) {
|
||||
var tv2 v2session.Token
|
||||
t.WriteToV2(&tv2)
|
||||
ObjectID *oid.ID
|
||||
|
||||
x.meta.SetSessionToken(&tv2)
|
||||
}
|
||||
|
||||
// WithBearerToken attaches bearer token to be used for the operation.
|
||||
//
|
||||
// If set, underlying eACL rules will be used in access control.
|
||||
//
|
||||
// Must be signed.
|
||||
func (x *PrmObjectDelete) WithBearerToken(t bearer.Token) {
|
||||
var v2token acl.BearerToken
|
||||
t.WriteToV2(&v2token)
|
||||
x.meta.SetBearerToken(&v2token)
|
||||
}
|
||||
|
||||
// FromContainer specifies FrostFS container of the object.
|
||||
// Required parameter.
|
||||
func (x *PrmObjectDelete) FromContainer(id cid.ID) {
|
||||
var cidV2 v2refs.ContainerID
|
||||
id.WriteToV2(&cidV2)
|
||||
|
||||
x.addr.SetContainerID(&cidV2)
|
||||
}
|
||||
|
||||
// ByID specifies identifier of the requested object.
|
||||
// Required parameter.
|
||||
func (x *PrmObjectDelete) ByID(id oid.ID) {
|
||||
var idV2 v2refs.ObjectID
|
||||
id.WriteToV2(&idV2)
|
||||
|
||||
x.addr.SetObjectID(&idV2)
|
||||
Key *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
// UseKey specifies private key to sign the requests.
|
||||
// If key is not provided, then Client default key is used.
|
||||
func (x *PrmObjectDelete) UseKey(key ecdsa.PrivateKey) {
|
||||
x.keySet = true
|
||||
x.key = key
|
||||
}
|
||||
|
||||
// WithXHeaders specifies list of extended headers (string key-value pairs)
|
||||
// to be attached to the request. Must have an even length.
|
||||
//
|
||||
// Slice must not be mutated until the operation completes.
|
||||
func (x *PrmObjectDelete) WithXHeaders(hs ...string) {
|
||||
writeXHeadersToMeta(hs, &x.meta)
|
||||
// Deprecated: Use PrmObjectDelete.Key instead.
|
||||
func (prm *PrmObjectDelete) UseKey(key ecdsa.PrivateKey) {
|
||||
prm.Key = &key
|
||||
}
|
||||
|
||||
// ResObjectDelete groups resulting values of ObjectDelete operation.
|
||||
|
@ -100,6 +54,54 @@ func (x ResObjectDelete) Tombstone() oid.ID {
|
|||
return x.tomb
|
||||
}
|
||||
|
||||
func (prm *PrmObjectDelete) buildRequest(c *Client) (*v2object.DeleteRequest, error) {
|
||||
if prm.ContainerID == nil {
|
||||
return nil, errorMissingContainer
|
||||
}
|
||||
|
||||
if prm.ObjectID == nil {
|
||||
return nil, errorMissingObject
|
||||
}
|
||||
|
||||
if len(prm.XHeaders)%2 != 0 {
|
||||
return nil, errorInvalidXHeaders
|
||||
}
|
||||
|
||||
meta := new(v2session.RequestMetaHeader)
|
||||
writeXHeadersToMeta(prm.XHeaders, meta)
|
||||
|
||||
if prm.BearerToken != nil {
|
||||
v2BearerToken := new(acl.BearerToken)
|
||||
prm.BearerToken.WriteToV2(v2BearerToken)
|
||||
meta.SetBearerToken(v2BearerToken)
|
||||
}
|
||||
|
||||
if prm.Session != nil {
|
||||
v2SessionToken := new(v2session.Token)
|
||||
prm.Session.WriteToV2(v2SessionToken)
|
||||
meta.SetSessionToken(v2SessionToken)
|
||||
}
|
||||
|
||||
addr := new(v2refs.Address)
|
||||
|
||||
cnrV2 := new(v2refs.ContainerID)
|
||||
prm.ContainerID.WriteToV2(cnrV2)
|
||||
addr.SetContainerID(cnrV2)
|
||||
|
||||
objV2 := new(v2refs.ObjectID)
|
||||
prm.ObjectID.WriteToV2(objV2)
|
||||
addr.SetObjectID(objV2)
|
||||
|
||||
body := new(v2object.DeleteRequestBody)
|
||||
body.SetAddress(addr)
|
||||
|
||||
req := new(v2object.DeleteRequest)
|
||||
req.SetBody(body)
|
||||
c.prepareRequest(req, meta)
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
// ObjectDelete marks an object for deletion from the container using FrostFS API protocol.
|
||||
// As a marker, a special unit called a tombstone is placed in the container.
|
||||
// It confirms the user's intent to delete the object, and is itself a container object.
|
||||
|
@ -124,32 +126,22 @@ func (x ResObjectDelete) Tombstone() oid.ID {
|
|||
// - *apistatus.ObjectLocked;
|
||||
// - *apistatus.SessionTokenExpired.
|
||||
func (c *Client) ObjectDelete(ctx context.Context, prm PrmObjectDelete) (*ResObjectDelete, error) {
|
||||
switch {
|
||||
case prm.addr.GetContainerID() == nil:
|
||||
return nil, errorMissingContainer
|
||||
case prm.addr.GetObjectID() == nil:
|
||||
return nil, errorMissingObject
|
||||
req, err := prm.buildRequest(c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// form request body
|
||||
prm.body.SetAddress(&prm.addr)
|
||||
|
||||
// form request
|
||||
var req v2object.DeleteRequest
|
||||
req.SetBody(&prm.body)
|
||||
c.prepareRequest(&req, &prm.meta)
|
||||
|
||||
key := c.prm.key
|
||||
if prm.keySet {
|
||||
key = prm.key
|
||||
if prm.Key != nil {
|
||||
key = *prm.Key
|
||||
}
|
||||
|
||||
err := signature.SignServiceMessage(&key, &req)
|
||||
err = signature.SignServiceMessage(&key, req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sign request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := rpcapi.DeleteObject(&c.c, &req, client.WithContext(ctx))
|
||||
resp, err := rpcapi.DeleteObject(&c.c, req, client.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -22,77 +22,76 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||
)
|
||||
|
||||
// shared parameters of GET/HEAD/RANGE.
|
||||
type prmObjectRead struct {
|
||||
meta v2session.RequestMetaHeader
|
||||
|
||||
raw bool
|
||||
|
||||
addr v2refs.Address
|
||||
}
|
||||
|
||||
// WithXHeaders specifies list of extended headers (string key-value pairs)
|
||||
// to be attached to the request. Must have an even length.
|
||||
//
|
||||
// Slice must not be mutated until the operation completes.
|
||||
func (x *prmObjectRead) WithXHeaders(hs ...string) {
|
||||
writeXHeadersToMeta(hs, &x.meta)
|
||||
}
|
||||
|
||||
// MarkRaw marks an intent to read physically stored object.
|
||||
func (x *prmObjectRead) MarkRaw() {
|
||||
x.raw = true
|
||||
}
|
||||
|
||||
// MarkLocal tells the server to execute the operation locally.
|
||||
func (x *prmObjectRead) MarkLocal() {
|
||||
x.meta.SetTTL(1)
|
||||
}
|
||||
|
||||
// WithinSession specifies session within which object should be read.
|
||||
//
|
||||
// Creator of the session acquires the authorship of the request.
|
||||
// This may affect the execution of an operation (e.g. access control).
|
||||
//
|
||||
// Must be signed.
|
||||
func (x *prmObjectRead) WithinSession(t session.Object) {
|
||||
var tokv2 v2session.Token
|
||||
t.WriteToV2(&tokv2)
|
||||
x.meta.SetSessionToken(&tokv2)
|
||||
}
|
||||
|
||||
// WithBearerToken attaches bearer token to be used for the operation.
|
||||
//
|
||||
// If set, underlying eACL rules will be used in access control.
|
||||
//
|
||||
// Must be signed.
|
||||
func (x *prmObjectRead) WithBearerToken(t bearer.Token) {
|
||||
var v2token acl.BearerToken
|
||||
t.WriteToV2(&v2token)
|
||||
x.meta.SetBearerToken(&v2token)
|
||||
}
|
||||
|
||||
// FromContainer specifies FrostFS container of the object.
|
||||
// Required parameter.
|
||||
func (x *prmObjectRead) FromContainer(id cid.ID) {
|
||||
var cnrV2 v2refs.ContainerID
|
||||
id.WriteToV2(&cnrV2)
|
||||
x.addr.SetContainerID(&cnrV2)
|
||||
}
|
||||
|
||||
// ByID specifies identifier of the requested object.
|
||||
// Required parameter.
|
||||
func (x *prmObjectRead) ByID(id oid.ID) {
|
||||
var objV2 v2refs.ObjectID
|
||||
id.WriteToV2(&objV2)
|
||||
x.addr.SetObjectID(&objV2)
|
||||
}
|
||||
|
||||
// PrmObjectGet groups parameters of ObjectGetInit operation.
|
||||
type PrmObjectGet struct {
|
||||
prmObjectRead
|
||||
XHeaders []string
|
||||
|
||||
key *ecdsa.PrivateKey
|
||||
BearerToken *bearer.Token
|
||||
|
||||
Session *session.Object
|
||||
|
||||
Raw bool
|
||||
|
||||
Local bool
|
||||
|
||||
ContainerID *cid.ID
|
||||
|
||||
ObjectID *oid.ID
|
||||
|
||||
Key *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
func (prm *PrmObjectGet) buildRequest(c *Client) (*v2object.GetRequest, error) {
|
||||
if prm.ContainerID == nil {
|
||||
return nil, errorMissingContainer
|
||||
}
|
||||
|
||||
if prm.ObjectID == nil {
|
||||
return nil, errorMissingObject
|
||||
}
|
||||
|
||||
if len(prm.XHeaders)%2 != 0 {
|
||||
return nil, errorInvalidXHeaders
|
||||
}
|
||||
|
||||
meta := new(v2session.RequestMetaHeader)
|
||||
writeXHeadersToMeta(prm.XHeaders, meta)
|
||||
|
||||
if prm.BearerToken != nil {
|
||||
v2BearerToken := new(acl.BearerToken)
|
||||
prm.BearerToken.WriteToV2(v2BearerToken)
|
||||
meta.SetBearerToken(v2BearerToken)
|
||||
}
|
||||
|
||||
if prm.Session != nil {
|
||||
v2SessionToken := new(v2session.Token)
|
||||
prm.Session.WriteToV2(v2SessionToken)
|
||||
meta.SetSessionToken(v2SessionToken)
|
||||
}
|
||||
|
||||
if prm.Local {
|
||||
meta.SetTTL(1)
|
||||
}
|
||||
|
||||
addr := new(v2refs.Address)
|
||||
|
||||
cnrV2 := new(v2refs.ContainerID)
|
||||
prm.ContainerID.WriteToV2(cnrV2)
|
||||
addr.SetContainerID(cnrV2)
|
||||
|
||||
objV2 := new(v2refs.ObjectID)
|
||||
prm.ObjectID.WriteToV2(objV2)
|
||||
addr.SetObjectID(objV2)
|
||||
|
||||
body := new(v2object.GetRequestBody)
|
||||
body.SetRaw(prm.Raw)
|
||||
body.SetAddress(addr)
|
||||
|
||||
req := new(v2object.GetRequest)
|
||||
req.SetBody(body)
|
||||
c.prepareRequest(req, meta)
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
// ResObjectGet groups the final result values of ObjectGetInit operation.
|
||||
|
@ -122,8 +121,10 @@ type ObjectReader struct {
|
|||
|
||||
// UseKey specifies private key to sign the requests.
|
||||
// If key is not provided, then Client default key is used.
|
||||
func (x *PrmObjectGet) UseKey(key ecdsa.PrivateKey) {
|
||||
x.key = &key
|
||||
//
|
||||
// Deprecated: Use PrmObjectGet.Key instead.
|
||||
func (prm *PrmObjectGet) UseKey(key ecdsa.PrivateKey) {
|
||||
prm.Key = &key
|
||||
}
|
||||
|
||||
// ReadHeader reads header of the object. Result means success.
|
||||
|
@ -299,39 +300,24 @@ func (x *ObjectReader) Read(p []byte) (int, error) {
|
|||
// Returns an error if parameters are set incorrectly (see PrmObjectGet docs).
|
||||
// Context is required and must not be nil. It is used for network communication.
|
||||
func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectReader, error) {
|
||||
// check parameters
|
||||
switch {
|
||||
case prm.addr.GetContainerID() == nil:
|
||||
return nil, errorMissingContainer
|
||||
case prm.addr.GetObjectID() == nil:
|
||||
return nil, errorMissingObject
|
||||
req, err := prm.buildRequest(c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// form request body
|
||||
var body v2object.GetRequestBody
|
||||
|
||||
body.SetRaw(prm.raw)
|
||||
body.SetAddress(&prm.addr)
|
||||
|
||||
// form request
|
||||
var req v2object.GetRequest
|
||||
|
||||
req.SetBody(&body)
|
||||
c.prepareRequest(&req, &prm.meta)
|
||||
|
||||
key := prm.key
|
||||
key := prm.Key
|
||||
if key == nil {
|
||||
key = &c.prm.key
|
||||
}
|
||||
|
||||
err := signature.SignServiceMessage(key, &req)
|
||||
err = signature.SignServiceMessage(key, req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sign request: %w", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
stream, err := rpcapi.GetObject(&c.c, &req, client.WithContext(ctx))
|
||||
stream, err := rpcapi.GetObject(&c.c, req, client.WithContext(ctx))
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, fmt.Errorf("open stream: %w", err)
|
||||
|
@ -347,17 +333,29 @@ func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectRe
|
|||
|
||||
// PrmObjectHead groups parameters of ObjectHead operation.
|
||||
type PrmObjectHead struct {
|
||||
prmObjectRead
|
||||
XHeaders []string
|
||||
|
||||
keySet bool
|
||||
key ecdsa.PrivateKey
|
||||
BearerToken *bearer.Token
|
||||
|
||||
Session *session.Object
|
||||
|
||||
Raw bool
|
||||
|
||||
Local bool
|
||||
|
||||
ContainerID *cid.ID
|
||||
|
||||
ObjectID *oid.ID
|
||||
|
||||
Key *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
// UseKey specifies private key to sign the requests.
|
||||
// If key is not provided, then Client default key is used.
|
||||
func (x *PrmObjectHead) UseKey(key ecdsa.PrivateKey) {
|
||||
x.keySet = true
|
||||
x.key = key
|
||||
//
|
||||
// Deprecated: Use PrmObjectHead.Key instead.
|
||||
func (prm *PrmObjectHead) UseKey(key ecdsa.PrivateKey) {
|
||||
prm.Key = &key
|
||||
}
|
||||
|
||||
// ResObjectHead groups resulting values of ObjectHead operation.
|
||||
|
@ -390,6 +388,58 @@ func (x *ResObjectHead) ReadHeader(dst *object.Object) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func (prm *PrmObjectHead) buildRequest(c *Client) (*v2object.HeadRequest, error) {
|
||||
if prm.ContainerID == nil {
|
||||
return nil, errorMissingContainer
|
||||
}
|
||||
|
||||
if prm.ObjectID == nil {
|
||||
return nil, errorMissingObject
|
||||
}
|
||||
|
||||
if len(prm.XHeaders)%2 != 0 {
|
||||
return nil, errorInvalidXHeaders
|
||||
}
|
||||
|
||||
meta := new(v2session.RequestMetaHeader)
|
||||
writeXHeadersToMeta(prm.XHeaders, meta)
|
||||
|
||||
if prm.BearerToken != nil {
|
||||
v2BearerToken := new(acl.BearerToken)
|
||||
prm.BearerToken.WriteToV2(v2BearerToken)
|
||||
meta.SetBearerToken(v2BearerToken)
|
||||
}
|
||||
|
||||
if prm.Session != nil {
|
||||
v2SessionToken := new(v2session.Token)
|
||||
prm.Session.WriteToV2(v2SessionToken)
|
||||
meta.SetSessionToken(v2SessionToken)
|
||||
}
|
||||
|
||||
if prm.Local {
|
||||
meta.SetTTL(1)
|
||||
}
|
||||
|
||||
addr := new(v2refs.Address)
|
||||
|
||||
cnrV2 := new(v2refs.ContainerID)
|
||||
prm.ContainerID.WriteToV2(cnrV2)
|
||||
addr.SetContainerID(cnrV2)
|
||||
|
||||
objV2 := new(v2refs.ObjectID)
|
||||
prm.ObjectID.WriteToV2(objV2)
|
||||
addr.SetObjectID(objV2)
|
||||
body := new(v2object.HeadRequestBody)
|
||||
body.SetRaw(prm.Raw)
|
||||
body.SetAddress(addr)
|
||||
|
||||
req := new(v2object.HeadRequest)
|
||||
req.SetBody(body)
|
||||
c.prepareRequest(req, meta)
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
// ObjectHead reads object header through a remote server using FrostFS API protocol.
|
||||
//
|
||||
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
||||
|
@ -413,33 +463,24 @@ func (x *ResObjectHead) ReadHeader(dst *object.Object) bool {
|
|||
// - *apistatus.ObjectAlreadyRemoved;
|
||||
// - *apistatus.SessionTokenExpired.
|
||||
func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectHead, error) {
|
||||
switch {
|
||||
case prm.addr.GetContainerID() == nil:
|
||||
return nil, errorMissingContainer
|
||||
case prm.addr.GetObjectID() == nil:
|
||||
return nil, errorMissingObject
|
||||
req, err := prm.buildRequest(c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var body v2object.HeadRequestBody
|
||||
body.SetRaw(prm.raw)
|
||||
body.SetAddress(&prm.addr)
|
||||
|
||||
var req v2object.HeadRequest
|
||||
req.SetBody(&body)
|
||||
c.prepareRequest(&req, &prm.meta)
|
||||
|
||||
key := c.prm.key
|
||||
if prm.keySet {
|
||||
key = prm.key
|
||||
if prm.Key != nil {
|
||||
key = *prm.Key
|
||||
}
|
||||
|
||||
// sign the request
|
||||
err := signature.SignServiceMessage(&key, &req)
|
||||
|
||||
err = signature.SignServiceMessage(&key, req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sign request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := rpcapi.HeadObject(&c.c, &req, client.WithContext(ctx))
|
||||
resp, err := rpcapi.HeadObject(&c.c, req, client.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("write request: %w", err)
|
||||
}
|
||||
|
@ -454,7 +495,7 @@ func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectH
|
|||
return &res, nil
|
||||
}
|
||||
|
||||
_ = res.idObj.ReadFromV2(*prm.addr.GetObjectID())
|
||||
res.idObj = *prm.ObjectID
|
||||
|
||||
switch v := resp.GetBody().GetHeaderPart().(type) {
|
||||
default:
|
||||
|
@ -470,29 +511,95 @@ func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectH
|
|||
|
||||
// PrmObjectRange groups parameters of ObjectRange operation.
|
||||
type PrmObjectRange struct {
|
||||
prmObjectRead
|
||||
XHeaders []string
|
||||
|
||||
key *ecdsa.PrivateKey
|
||||
BearerToken *bearer.Token
|
||||
|
||||
rng v2object.Range
|
||||
Session *session.Object
|
||||
|
||||
Raw bool
|
||||
|
||||
Local bool
|
||||
|
||||
ContainerID *cid.ID
|
||||
|
||||
ObjectID *oid.ID
|
||||
|
||||
Key *ecdsa.PrivateKey
|
||||
|
||||
Offset uint64
|
||||
|
||||
Length uint64
|
||||
}
|
||||
|
||||
// SetOffset sets offset of the payload range to be read.
|
||||
// Zero by default.
|
||||
func (x *PrmObjectRange) SetOffset(off uint64) {
|
||||
x.rng.SetOffset(off)
|
||||
func (prm *PrmObjectRange) buildRequest(c *Client) (*v2object.GetRangeRequest, error) {
|
||||
if prm.Length == 0 {
|
||||
return nil, errorZeroRangeLength
|
||||
}
|
||||
|
||||
// SetLength sets length of the payload range to be read.
|
||||
// Must be positive.
|
||||
func (x *PrmObjectRange) SetLength(ln uint64) {
|
||||
x.rng.SetLength(ln)
|
||||
if prm.ContainerID == nil {
|
||||
return nil, errorMissingContainer
|
||||
}
|
||||
|
||||
if prm.ObjectID == nil {
|
||||
return nil, errorMissingObject
|
||||
}
|
||||
|
||||
if len(prm.XHeaders)%2 != 0 {
|
||||
return nil, errorInvalidXHeaders
|
||||
}
|
||||
|
||||
meta := new(v2session.RequestMetaHeader)
|
||||
writeXHeadersToMeta(prm.XHeaders, meta)
|
||||
|
||||
if prm.BearerToken != nil {
|
||||
v2BearerToken := new(acl.BearerToken)
|
||||
prm.BearerToken.WriteToV2(v2BearerToken)
|
||||
meta.SetBearerToken(v2BearerToken)
|
||||
}
|
||||
|
||||
if prm.Session != nil {
|
||||
v2SessionToken := new(v2session.Token)
|
||||
prm.Session.WriteToV2(v2SessionToken)
|
||||
meta.SetSessionToken(v2SessionToken)
|
||||
}
|
||||
|
||||
if prm.Local {
|
||||
meta.SetTTL(1)
|
||||
}
|
||||
|
||||
addr := new(v2refs.Address)
|
||||
|
||||
cnrV2 := new(v2refs.ContainerID)
|
||||
prm.ContainerID.WriteToV2(cnrV2)
|
||||
addr.SetContainerID(cnrV2)
|
||||
|
||||
objV2 := new(v2refs.ObjectID)
|
||||
prm.ObjectID.WriteToV2(objV2)
|
||||
addr.SetObjectID(objV2)
|
||||
|
||||
rng := new(v2object.Range)
|
||||
rng.SetLength(prm.Length)
|
||||
rng.SetOffset(prm.Offset)
|
||||
|
||||
body := new(v2object.GetRangeRequestBody)
|
||||
body.SetRaw(prm.Raw)
|
||||
body.SetAddress(addr)
|
||||
body.SetRange(rng)
|
||||
|
||||
req := new(v2object.GetRangeRequest)
|
||||
req.SetBody(body)
|
||||
c.prepareRequest(req, meta)
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
// UseKey specifies private key to sign the requests.
|
||||
// If key is not provided, then Client default key is used.
|
||||
func (x *PrmObjectRange) UseKey(key ecdsa.PrivateKey) {
|
||||
x.key = &key
|
||||
//
|
||||
// Deprecated: Use PrmObjectRange.Key instead.
|
||||
func (prm *PrmObjectRange) UseKey(key ecdsa.PrivateKey) {
|
||||
prm.Key = &key
|
||||
}
|
||||
|
||||
// ResObjectRange groups the final result values of ObjectRange operation.
|
||||
|
@ -662,49 +769,31 @@ func (x *ObjectRangeReader) Read(p []byte) (int, error) {
|
|||
// Returns an error if parameters are set incorrectly (see PrmObjectRange docs).
|
||||
// Context is required and must not be nil. It is used for network communication.
|
||||
func (c *Client) ObjectRangeInit(ctx context.Context, prm PrmObjectRange) (*ObjectRangeReader, error) {
|
||||
// check parameters
|
||||
switch {
|
||||
case prm.addr.GetContainerID() == nil:
|
||||
return nil, errorMissingContainer
|
||||
case prm.addr.GetObjectID() == nil:
|
||||
return nil, errorMissingObject
|
||||
case prm.rng.GetLength() == 0:
|
||||
return nil, errorZeroRangeLength
|
||||
req, err := prm.buildRequest(c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// form request body
|
||||
var body v2object.GetRangeRequestBody
|
||||
|
||||
body.SetRaw(prm.raw)
|
||||
body.SetAddress(&prm.addr)
|
||||
body.SetRange(&prm.rng)
|
||||
|
||||
// form request
|
||||
var req v2object.GetRangeRequest
|
||||
|
||||
req.SetBody(&body)
|
||||
c.prepareRequest(&req, &prm.meta)
|
||||
|
||||
key := prm.key
|
||||
key := prm.Key
|
||||
if key == nil {
|
||||
key = &c.prm.key
|
||||
}
|
||||
|
||||
err := signature.SignServiceMessage(key, &req)
|
||||
err = signature.SignServiceMessage(key, req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sign request: %w", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
stream, err := rpcapi.GetObjectRange(&c.c, &req, client.WithContext(ctx))
|
||||
stream, err := rpcapi.GetObjectRange(&c.c, req, client.WithContext(ctx))
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, fmt.Errorf("open stream: %w", err)
|
||||
}
|
||||
|
||||
var r ObjectRangeReader
|
||||
r.remainingPayloadLen = int(prm.rng.GetLength())
|
||||
r.remainingPayloadLen = int(prm.Length)
|
||||
r.cancelCtxStream = cancel
|
||||
r.stream = stream
|
||||
r.client = c
|
||||
|
|
|
@ -13,121 +13,53 @@ import (
|
|||
v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||
)
|
||||
|
||||
// PrmObjectHash groups parameters of ObjectHash operation.
|
||||
type PrmObjectHash struct {
|
||||
meta v2session.RequestMetaHeader
|
||||
XHeaders []string
|
||||
|
||||
body v2object.GetRangeHashRequestBody
|
||||
BearerToken *bearer.Token
|
||||
|
||||
csAlgo v2refs.ChecksumType
|
||||
Session *session.Object
|
||||
|
||||
addr v2refs.Address
|
||||
Local bool
|
||||
|
||||
keySet bool
|
||||
key ecdsa.PrivateKey
|
||||
Ranges []object.Range
|
||||
|
||||
Salt []byte
|
||||
|
||||
ChecksumType checksum.Type
|
||||
|
||||
ContainerID *cid.ID
|
||||
|
||||
ObjectID *oid.ID
|
||||
|
||||
Key *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
// UseKey specifies private key to sign the requests.
|
||||
// If key is not provided, then Client default key is used.
|
||||
func (x *PrmObjectHash) UseKey(key ecdsa.PrivateKey) {
|
||||
x.keySet = true
|
||||
x.key = key
|
||||
}
|
||||
|
||||
// MarkLocal tells the server to execute the operation locally.
|
||||
func (x *PrmObjectHash) MarkLocal() {
|
||||
x.meta.SetTTL(1)
|
||||
}
|
||||
|
||||
// WithinSession specifies session within which object should be read.
|
||||
//
|
||||
// Creator of the session acquires the authorship of the request.
|
||||
// This may affect the execution of an operation (e.g. access control).
|
||||
//
|
||||
// Must be signed.
|
||||
func (x *PrmObjectHash) WithinSession(t session.Object) {
|
||||
var tv2 v2session.Token
|
||||
t.WriteToV2(&tv2)
|
||||
|
||||
x.meta.SetSessionToken(&tv2)
|
||||
}
|
||||
|
||||
// WithBearerToken attaches bearer token to be used for the operation.
|
||||
//
|
||||
// If set, underlying eACL rules will be used in access control.
|
||||
//
|
||||
// Must be signed.
|
||||
func (x *PrmObjectHash) WithBearerToken(t bearer.Token) {
|
||||
var v2token acl.BearerToken
|
||||
t.WriteToV2(&v2token)
|
||||
x.meta.SetBearerToken(&v2token)
|
||||
}
|
||||
|
||||
// FromContainer specifies FrostFS container of the object.
|
||||
// Required parameter.
|
||||
func (x *PrmObjectHash) FromContainer(id cid.ID) {
|
||||
var cidV2 v2refs.ContainerID
|
||||
id.WriteToV2(&cidV2)
|
||||
|
||||
x.addr.SetContainerID(&cidV2)
|
||||
}
|
||||
|
||||
// ByID specifies identifier of the requested object.
|
||||
// Required parameter.
|
||||
func (x *PrmObjectHash) ByID(id oid.ID) {
|
||||
var idV2 v2refs.ObjectID
|
||||
id.WriteToV2(&idV2)
|
||||
|
||||
x.addr.SetObjectID(&idV2)
|
||||
}
|
||||
|
||||
// SetRangeList sets list of ranges in (offset, length) pair format.
|
||||
// Required parameter.
|
||||
//
|
||||
// If passed as slice, then it must not be mutated before the operation completes.
|
||||
func (x *PrmObjectHash) SetRangeList(r ...uint64) {
|
||||
ln := len(r)
|
||||
if ln%2 != 0 {
|
||||
panic("odd number of range parameters")
|
||||
}
|
||||
|
||||
rs := make([]v2object.Range, ln/2)
|
||||
|
||||
for i := 0; i < ln/2; i++ {
|
||||
rs[i].SetOffset(r[2*i])
|
||||
rs[i].SetLength(r[2*i+1])
|
||||
}
|
||||
|
||||
x.body.SetRanges(rs)
|
||||
// Deprecated: Use PrmObjectHash.Key instead.
|
||||
func (prm *PrmObjectHash) UseKey(key ecdsa.PrivateKey) {
|
||||
prm.Key = &key
|
||||
}
|
||||
|
||||
// TillichZemorAlgo changes the hash function to Tillich-Zemor
|
||||
// (https://link.springer.com/content/pdf/10.1007/3-540-48658-5_5.pdf).
|
||||
//
|
||||
// By default, SHA256 hash function is used.
|
||||
func (x *PrmObjectHash) TillichZemorAlgo() {
|
||||
x.csAlgo = v2refs.TillichZemor
|
||||
}
|
||||
|
||||
// UseSalt sets the salt to XOR the data range before hashing.
|
||||
// By default, SHA256 hash function is used/.
|
||||
//
|
||||
// Must not be mutated before the operation completes.
|
||||
func (x *PrmObjectHash) UseSalt(salt []byte) {
|
||||
x.body.SetSalt(salt)
|
||||
}
|
||||
|
||||
// WithXHeaders specifies list of extended headers (string key-value pairs)
|
||||
// to be attached to the request. Must have an even length.
|
||||
//
|
||||
// Slice must not be mutated until the operation completes.
|
||||
func (x *PrmObjectHash) WithXHeaders(hs ...string) {
|
||||
writeXHeadersToMeta(hs, &x.meta)
|
||||
// Deprecated: Use PrmObjectHash.ChecksumType instead.
|
||||
func (prm *PrmObjectHash) TillichZemorAlgo() {
|
||||
prm.ChecksumType = checksum.TZ
|
||||
}
|
||||
|
||||
// ResObjectHash groups resulting values of ObjectHash operation.
|
||||
|
@ -142,6 +74,76 @@ func (x ResObjectHash) Checksums() [][]byte {
|
|||
return x.checksums
|
||||
}
|
||||
|
||||
func (prm *PrmObjectHash) buildRequest(c *Client) (*v2object.GetRangeHashRequest, error) {
|
||||
if prm.ContainerID == nil {
|
||||
return nil, errorMissingContainer
|
||||
}
|
||||
|
||||
if prm.ObjectID == nil {
|
||||
return nil, errorMissingObject
|
||||
}
|
||||
|
||||
if len(prm.XHeaders)%2 != 0 {
|
||||
return nil, errorInvalidXHeaders
|
||||
}
|
||||
|
||||
if len(prm.Ranges) == 0 {
|
||||
return nil, errorMissingRanges
|
||||
}
|
||||
|
||||
meta := new(v2session.RequestMetaHeader)
|
||||
writeXHeadersToMeta(prm.XHeaders, meta)
|
||||
|
||||
if prm.BearerToken != nil {
|
||||
v2BearerToken := new(acl.BearerToken)
|
||||
prm.BearerToken.WriteToV2(v2BearerToken)
|
||||
meta.SetBearerToken(v2BearerToken)
|
||||
}
|
||||
|
||||
if prm.Session != nil {
|
||||
v2SessionToken := new(v2session.Token)
|
||||
prm.Session.WriteToV2(v2SessionToken)
|
||||
meta.SetSessionToken(v2SessionToken)
|
||||
}
|
||||
|
||||
if prm.Local {
|
||||
meta.SetTTL(1)
|
||||
}
|
||||
|
||||
addr := new(v2refs.Address)
|
||||
|
||||
cnrV2 := new(v2refs.ContainerID)
|
||||
prm.ContainerID.WriteToV2(cnrV2)
|
||||
addr.SetContainerID(cnrV2)
|
||||
|
||||
objV2 := new(v2refs.ObjectID)
|
||||
prm.ObjectID.WriteToV2(objV2)
|
||||
addr.SetObjectID(objV2)
|
||||
|
||||
rs := make([]v2object.Range, len(prm.Ranges))
|
||||
for i := range prm.Ranges {
|
||||
rs[i].SetOffset(prm.Ranges[i].GetOffset())
|
||||
rs[i].SetLength(prm.Ranges[i].GetLength())
|
||||
}
|
||||
|
||||
body := new(v2object.GetRangeHashRequestBody)
|
||||
body.SetAddress(addr)
|
||||
body.SetRanges(rs)
|
||||
body.SetSalt(prm.Salt)
|
||||
|
||||
if prm.ChecksumType == checksum.Unknown {
|
||||
body.SetType(v2refs.SHA256)
|
||||
} else {
|
||||
body.SetType(v2refs.ChecksumType(prm.ChecksumType))
|
||||
}
|
||||
|
||||
req := new(v2object.GetRangeHashRequest)
|
||||
req.SetBody(body)
|
||||
c.prepareRequest(req, meta)
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
// ObjectHash requests checksum of the range list of the object payload using
|
||||
// FrostFS API protocol.
|
||||
//
|
||||
|
@ -165,37 +167,22 @@ func (x ResObjectHash) Checksums() [][]byte {
|
|||
// - *apistatus.ObjectOutOfRange;
|
||||
// - *apistatus.SessionTokenExpired.
|
||||
func (c *Client) ObjectHash(ctx context.Context, prm PrmObjectHash) (*ResObjectHash, error) {
|
||||
switch {
|
||||
case prm.addr.GetContainerID() == nil:
|
||||
return nil, errorMissingContainer
|
||||
case prm.addr.GetObjectID() == nil:
|
||||
return nil, errorMissingObject
|
||||
case len(prm.body.GetRanges()) == 0:
|
||||
return nil, errorMissingRanges
|
||||
req, err := prm.buildRequest(c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
prm.body.SetAddress(&prm.addr)
|
||||
if prm.csAlgo == v2refs.UnknownChecksum {
|
||||
prm.body.SetType(v2refs.SHA256)
|
||||
} else {
|
||||
prm.body.SetType(prm.csAlgo)
|
||||
}
|
||||
|
||||
var req v2object.GetRangeHashRequest
|
||||
c.prepareRequest(&req, &prm.meta)
|
||||
req.SetBody(&prm.body)
|
||||
|
||||
key := c.prm.key
|
||||
if prm.keySet {
|
||||
key = prm.key
|
||||
if prm.Key != nil {
|
||||
key = *prm.Key
|
||||
}
|
||||
|
||||
err := signature.SignServiceMessage(&key, &req)
|
||||
err = signature.SignServiceMessage(&key, req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sign request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := rpcapi.HashObjectRange(&c.c, &req, client.WithContext(ctx))
|
||||
resp, err := rpcapi.HashObjectRange(&c.c, req, client.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("write request: %w", err)
|
||||
}
|
||||
|
|
|
@ -16,30 +16,32 @@ import (
|
|||
|
||||
// PrmSessionCreate groups parameters of SessionCreate operation.
|
||||
type PrmSessionCreate struct {
|
||||
prmCommonMeta
|
||||
XHeaders []string
|
||||
|
||||
exp uint64
|
||||
Expiration uint64
|
||||
|
||||
keySet bool
|
||||
key ecdsa.PrivateKey
|
||||
Key *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
// SetExp sets number of the last NepFS epoch in the lifetime of the session after which it will be expired.
|
||||
//
|
||||
// Deprecated: Use PrmSessionCreate.Expiration instead.
|
||||
func (x *PrmSessionCreate) SetExp(exp uint64) {
|
||||
x.exp = exp
|
||||
x.Expiration = exp
|
||||
}
|
||||
|
||||
// UseKey specifies private key to sign the requests and compute token owner.
|
||||
// If key is not provided, then Client default key is used.
|
||||
//
|
||||
// Deprecated: Use PrmSessionCreate.Key instead.
|
||||
func (x *PrmSessionCreate) UseKey(key ecdsa.PrivateKey) {
|
||||
x.keySet = true
|
||||
x.key = key
|
||||
x.Key = &key
|
||||
}
|
||||
|
||||
func (x *PrmSessionCreate) buildRequest(c *Client) (*v2session.CreateRequest, error) {
|
||||
ownerKey := c.prm.key.PublicKey
|
||||
if x.keySet {
|
||||
ownerKey = x.key.PublicKey
|
||||
if x.Key != nil {
|
||||
ownerKey = x.Key.PublicKey
|
||||
}
|
||||
var ownerID user.ID
|
||||
user.IDFromKey(&ownerID, ownerKey)
|
||||
|
@ -49,10 +51,10 @@ func (x *PrmSessionCreate) buildRequest(c *Client) (*v2session.CreateRequest, er
|
|||
|
||||
reqBody := new(v2session.CreateRequestBody)
|
||||
reqBody.SetOwnerID(&ownerIDV2)
|
||||
reqBody.SetExpiration(x.exp)
|
||||
reqBody.SetExpiration(x.Expiration)
|
||||
|
||||
var meta v2session.RequestMetaHeader
|
||||
writeXHeadersToMeta(x.xHeaders, &meta)
|
||||
writeXHeadersToMeta(x.XHeaders, &meta)
|
||||
|
||||
var req v2session.CreateRequest
|
||||
req.SetBody(reqBody)
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package cid_test
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package cidtest
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"math/rand"
|
||||
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
)
|
||||
|
@ -11,7 +11,7 @@ import (
|
|||
func ID() cid.ID {
|
||||
checksum := [sha256.Size]byte{}
|
||||
|
||||
rand.Read(checksum[:])
|
||||
_, _ = rand.Read(checksum[:])
|
||||
|
||||
return IDWithChecksum(checksum)
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package frostfscrypto_test
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"crypto/rand"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
|
|
|
@ -131,7 +131,8 @@ Its basic syntax is as follows:
|
|||
REP <count> {IN <select>}
|
||||
```
|
||||
|
||||
If a select is not specified, then the entire netmap is used as input. The resulting nodes will be used to actually store objects and they constitute a replica group (or simply, "a replica").
|
||||
If a select is not specified, then the entire netmap is used as input. The only exception to this rule is when exactly 1 replica and 1 selector are being present: in this case the only selector is being used instead of the whole netmap.
|
||||
The resulting nodes will be used to actually store objects and they constitute a replica group (or simply, "a replica").
|
||||
|
||||
Examples
|
||||
```sql
|
||||
|
|
|
@ -2,7 +2,7 @@ package eacltest
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"math/rand"
|
||||
"crypto/rand"
|
||||
"testing"
|
||||
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package eacl
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"crypto/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
|
|
@ -24,6 +24,7 @@ type (
|
|||
|
||||
minAgg struct {
|
||||
min float64
|
||||
minFound bool
|
||||
}
|
||||
|
||||
meanIQRAgg struct {
|
||||
|
@ -102,7 +103,13 @@ func (a *meanAgg) Compute() float64 {
|
|||
}
|
||||
|
||||
func (a *minAgg) Add(n float64) {
|
||||
if a.min == 0 || n < a.min {
|
||||
if !a.minFound {
|
||||
a.min = n
|
||||
a.minFound = true
|
||||
return
|
||||
}
|
||||
|
||||
if n < a.min {
|
||||
a.min = n
|
||||
}
|
||||
}
|
||||
|
|
44
netmap/aggregator_test.go
Normal file
44
netmap/aggregator_test.go
Normal file
|
@ -0,0 +1,44 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMinAgg(t *testing.T) {
|
||||
tests := []struct {
|
||||
vals []float64
|
||||
res float64
|
||||
}{
|
||||
{
|
||||
vals: []float64{1, 2, 3, 0, 10},
|
||||
res: 0,
|
||||
},
|
||||
{
|
||||
vals: []float64{10, 0, 10, 0},
|
||||
res: 0,
|
||||
},
|
||||
{
|
||||
vals: []float64{0, 1, 2, 3, 10},
|
||||
res: 0,
|
||||
},
|
||||
{
|
||||
vals: []float64{0, 0, 0, 0},
|
||||
res: 0,
|
||||
},
|
||||
{
|
||||
vals: []float64{10, 10, 10, 10},
|
||||
res: 10,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
a := newMinAgg()
|
||||
for _, val := range test.vals {
|
||||
a.Add(val)
|
||||
}
|
||||
|
||||
require.Equal(t, test.res, a.Compute())
|
||||
}
|
||||
}
|
|
@ -40,6 +40,10 @@ type context struct {
|
|||
// policy uses the UNIQUE flag. Nodes marked as used are not used in subsequent
|
||||
// base selections.
|
||||
usedNodes map[uint64]bool
|
||||
|
||||
// If true, returns an error when netmap does not contain enough nodes for selection.
|
||||
// By default best effort is taken.
|
||||
strict bool
|
||||
}
|
||||
|
||||
// Various validation errors.
|
||||
|
|
|
@ -2,6 +2,7 @@ package netmap
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
@ -47,7 +48,8 @@ func TestPlacementPolicy_Interopability(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
for i := range ds {
|
||||
bs, err := os.ReadFile(filepath.Join(testsDir, ds[i].Name()))
|
||||
filename := filepath.Join(testsDir, ds[i].Name())
|
||||
bs, err := os.ReadFile(filename)
|
||||
require.NoError(t, err)
|
||||
|
||||
var tc TestCase
|
||||
|
@ -56,7 +58,7 @@ func TestPlacementPolicy_Interopability(t *testing.T) {
|
|||
srcNodes := make([]NodeInfo, len(tc.Nodes))
|
||||
copy(srcNodes, tc.Nodes)
|
||||
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
t.Run(fmt.Sprintf("%s:%s", filename, tc.Name), func(t *testing.T) {
|
||||
var nm NetMap
|
||||
nm.SetNodes(tc.Nodes)
|
||||
|
||||
|
|
|
@ -146,19 +146,19 @@
|
|||
"select 3 nodes in 3 distinct countries, same placement": {
|
||||
"policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":1,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]},
|
||||
"pivot": "Y29udGFpbmVySUQ=",
|
||||
"result": [[4, 0, 7]],
|
||||
"result": [[0, 2, 3]],
|
||||
"placement": {
|
||||
"pivot": "b2JqZWN0SUQ=",
|
||||
"result": [[4, 0, 7]]
|
||||
"result": [[0, 2, 3]]
|
||||
}
|
||||
},
|
||||
"select 6 nodes in 3 distinct countries, different placement": {
|
||||
"policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":2,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]},
|
||||
"pivot": "Y29udGFpbmVySUQ=",
|
||||
"result": [[4, 3, 0, 1, 7, 2]],
|
||||
"result": [[0, 1, 2, 6, 3, 4]],
|
||||
"placement": {
|
||||
"pivot": "b2JqZWN0SUQ=",
|
||||
"result": [[4, 3, 0, 7, 2, 1]]
|
||||
"result": [[0, 1, 2, 6, 3, 4]]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
95
netmap/json_tests/non_strict.json
Normal file
95
netmap/json_tests/non_strict.json
Normal file
|
@ -0,0 +1,95 @@
|
|||
{
|
||||
"name": "non-strict selections",
|
||||
"comment": "These test specify loose selection behaviour, to allow fetching already PUT objects even when there is not enough nodes to select from.",
|
||||
"nodes": [
|
||||
{
|
||||
"attributes": [
|
||||
{
|
||||
"key": "Country",
|
||||
"value": "Russia"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"attributes": [
|
||||
{
|
||||
"key": "Country",
|
||||
"value": "Germany"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"attributes": [ ]
|
||||
}
|
||||
],
|
||||
"tests": {
|
||||
"not enough nodes (backup factor)": {
|
||||
"policy": {
|
||||
"replicas": [
|
||||
{
|
||||
"count": 1,
|
||||
"selector": "MyStore"
|
||||
}
|
||||
],
|
||||
"containerBackupFactor": 2,
|
||||
"selectors": [
|
||||
{
|
||||
"name": "MyStore",
|
||||
"count": 2,
|
||||
"clause": "DISTINCT",
|
||||
"attribute": "Country",
|
||||
"filter": "FromRU"
|
||||
}
|
||||
],
|
||||
"filters": [
|
||||
{
|
||||
"name": "FromRU",
|
||||
"key": "Country",
|
||||
"op": "EQ",
|
||||
"value": "Russia",
|
||||
"filters": [ ]
|
||||
}
|
||||
]
|
||||
},
|
||||
"result": [
|
||||
[
|
||||
0
|
||||
]
|
||||
]
|
||||
},
|
||||
"not enough nodes (buckets)": {
|
||||
"policy": {
|
||||
"replicas": [
|
||||
{
|
||||
"count": 1,
|
||||
"selector": "MyStore"
|
||||
}
|
||||
],
|
||||
"containerBackupFactor": 1,
|
||||
"selectors": [
|
||||
{
|
||||
"name": "MyStore",
|
||||
"count": 2,
|
||||
"clause": "DISTINCT",
|
||||
"attribute": "Country",
|
||||
"filter": "FromRU"
|
||||
}
|
||||
],
|
||||
"filters": [
|
||||
{
|
||||
"name": "FromRU",
|
||||
"key": "Country",
|
||||
"op": "EQ",
|
||||
"value": "Russia",
|
||||
"filters": [ ]
|
||||
}
|
||||
]
|
||||
},
|
||||
"result": [
|
||||
[
|
||||
0
|
||||
]
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
|
@ -104,7 +104,14 @@
|
|||
"selectors": [],
|
||||
"filters": []
|
||||
},
|
||||
"error": "not enough nodes"
|
||||
"result": [
|
||||
[
|
||||
0,
|
||||
1,
|
||||
2,
|
||||
3
|
||||
]
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,7 +24,12 @@
|
|||
"tests": {
|
||||
"missing filter": {
|
||||
"policy": {
|
||||
"replicas": [],
|
||||
"replicas": [
|
||||
{
|
||||
"count": 1,
|
||||
"selector": "MyStore"
|
||||
}
|
||||
],
|
||||
"containerBackupFactor": 1,
|
||||
"selectors": [
|
||||
{
|
||||
|
@ -47,9 +52,14 @@
|
|||
},
|
||||
"error": "filter not found"
|
||||
},
|
||||
"not enough nodes (backup factor)": {
|
||||
"not enough nodes (filter results in empty set)": {
|
||||
"policy": {
|
||||
"replicas": [],
|
||||
"replicas": [
|
||||
{
|
||||
"count": 1,
|
||||
"selector": "MyStore"
|
||||
}
|
||||
],
|
||||
"containerBackupFactor": 2,
|
||||
"selectors": [
|
||||
{
|
||||
|
@ -57,40 +67,15 @@
|
|||
"count": 2,
|
||||
"clause": "DISTINCT",
|
||||
"attribute": "Country",
|
||||
"filter": "FromRU"
|
||||
"filter": "FromMoon"
|
||||
}
|
||||
],
|
||||
"filters": [
|
||||
{
|
||||
"name": "FromRU",
|
||||
"name": "FromMoon",
|
||||
"key": "Country",
|
||||
"op": "EQ",
|
||||
"value": "Russia",
|
||||
"filters": []
|
||||
}
|
||||
]
|
||||
},
|
||||
"error": "not enough nodes"
|
||||
},
|
||||
"not enough nodes (buckets)": {
|
||||
"policy": {
|
||||
"replicas": [],
|
||||
"containerBackupFactor": 1,
|
||||
"selectors": [
|
||||
{
|
||||
"name": "MyStore",
|
||||
"count": 2,
|
||||
"clause": "DISTINCT",
|
||||
"attribute": "Country",
|
||||
"filter": "FromRU"
|
||||
}
|
||||
],
|
||||
"filters": [
|
||||
{
|
||||
"name": "FromRU",
|
||||
"key": "Country",
|
||||
"op": "EQ",
|
||||
"value": "Russia",
|
||||
"value": "Moon",
|
||||
"filters": []
|
||||
}
|
||||
]
|
||||
|
|
|
@ -238,7 +238,7 @@ func (m NetMap) ContainerNodes(p PlacementPolicy, pivot []byte) ([][]NodeInfo, e
|
|||
// marked as used by earlier replicas.
|
||||
for i := range p.replicas {
|
||||
sName := p.replicas[i].GetSelector()
|
||||
if sName == "" {
|
||||
if sName == "" && !(len(p.replicas) == 1 && len(p.selectors) == 1) {
|
||||
var s netmap.Selector
|
||||
s.SetCount(p.replicas[i].GetCount())
|
||||
s.SetFilter(mainFilterName)
|
||||
|
@ -258,6 +258,9 @@ func (m NetMap) ContainerNodes(p PlacementPolicy, pivot []byte) ([][]NodeInfo, e
|
|||
}
|
||||
|
||||
if p.unique {
|
||||
if c.processedSelectors[sName] == nil {
|
||||
return nil, fmt.Errorf("selector not found: '%s'", sName)
|
||||
}
|
||||
nodes, err := c.getSelection(*c.processedSelectors[sName])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -551,7 +551,7 @@ func (p *PlacementPolicy) DecodeString(s string) error {
|
|||
return errors.New("parsed nil value")
|
||||
}
|
||||
|
||||
if err := validatePolicy(*p); err != nil {
|
||||
if err := validatePolicy(*parsed); err != nil {
|
||||
return fmt.Errorf("invalid policy: %w", err)
|
||||
}
|
||||
|
||||
|
@ -605,6 +605,13 @@ var (
|
|||
errUnknownSelector = errors.New("policy: selector not found")
|
||||
// errSyntaxError is returned for errors found by ANTLR parser.
|
||||
errSyntaxError = errors.New("policy: syntax error")
|
||||
// errRedundantSelector is returned for errors found by selectors policy validator.
|
||||
errRedundantSelector = errors.New("policy: found redundant selector")
|
||||
// errUnnamedSelector is returned for errors found by selectors policy validator.
|
||||
errUnnamedSelector = errors.New("policy: unnamed selectors are useless, " +
|
||||
"make sure to pair REP and SELECT clauses: \"REP .. IN X\" + \"SELECT ... AS X\"")
|
||||
// errRedundantSelector is returned for errors found by filters policy validator.
|
||||
errRedundantFilter = errors.New("policy: found redundant filter")
|
||||
)
|
||||
|
||||
type policyVisitor struct {
|
||||
|
@ -845,25 +852,52 @@ func (p *policyVisitor) VisitExpr(ctx *parser.ExprContext) any {
|
|||
// validatePolicy checks high-level constraints such as filter link in SELECT
|
||||
// being actually defined in FILTER section.
|
||||
func validatePolicy(p PlacementPolicy) error {
|
||||
canOmitNames := len(p.selectors) == 1 && len(p.replicas) == 1
|
||||
seenFilters := map[string]bool{}
|
||||
|
||||
expectedFilters := map[string]struct{}{}
|
||||
for i := range p.filters {
|
||||
seenFilters[p.filters[i].GetName()] = true
|
||||
for _, f := range p.filters[i].GetFilters() {
|
||||
if f.GetName() != "" {
|
||||
expectedFilters[f.GetName()] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
seenSelectors := map[string]bool{}
|
||||
|
||||
seenSelectors := map[string]*netmap.Selector{}
|
||||
for i := range p.selectors {
|
||||
if flt := p.selectors[i].GetFilter(); flt != mainFilterName && !seenFilters[flt] {
|
||||
if p.selectors[i].GetName() == "" && !canOmitNames {
|
||||
return errUnnamedSelector
|
||||
}
|
||||
if flt := p.selectors[i].GetFilter(); flt != mainFilterName {
|
||||
expectedFilters[flt] = struct{}{}
|
||||
if !seenFilters[flt] {
|
||||
return fmt.Errorf("%w: '%s'", errUnknownFilter, flt)
|
||||
}
|
||||
|
||||
seenSelectors[p.selectors[i].GetName()] = true
|
||||
}
|
||||
seenSelectors[p.selectors[i].GetName()] = &p.selectors[i]
|
||||
}
|
||||
|
||||
for _, f := range p.filters {
|
||||
if _, ok := expectedFilters[f.GetName()]; !ok {
|
||||
return fmt.Errorf("%w: '%s'", errRedundantFilter, f.GetName())
|
||||
}
|
||||
}
|
||||
|
||||
expectedSelectors := map[string]struct{}{}
|
||||
for i := range p.replicas {
|
||||
if sel := p.replicas[i].GetSelector(); sel != "" && !seenSelectors[sel] {
|
||||
return fmt.Errorf("%w: '%s'", errUnknownSelector, sel)
|
||||
selName := p.replicas[i].GetSelector()
|
||||
if selName != "" || canOmitNames {
|
||||
expectedSelectors[selName] = struct{}{}
|
||||
if seenSelectors[selName] == nil {
|
||||
return fmt.Errorf("%w: '%s'", errUnknownSelector, selName)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, s := range p.selectors {
|
||||
if _, ok := expectedSelectors[s.GetName()]; !ok {
|
||||
return fmt.Errorf("%w: to use selector '%s' use keyword IN", errRedundantSelector, s.GetName())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
65
netmap/policy_decode_test.go
Normal file
65
netmap/policy_decode_test.go
Normal file
|
@ -0,0 +1,65 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDecodeString(t *testing.T) {
|
||||
testCases := []string{
|
||||
`REP 2
|
||||
CBF 2
|
||||
SELECT 2 FROM *`,
|
||||
`REP 1 IN X
|
||||
CBF 1
|
||||
SELECT 2 IN SAME Location FROM * AS X`,
|
||||
|
||||
`REP 1 IN X
|
||||
REP 2 IN Y
|
||||
CBF 1
|
||||
SELECT 2 FROM * AS X
|
||||
SELECT 3 FROM * AS Y`,
|
||||
|
||||
`REP 1 IN X
|
||||
SELECT 2 IN City FROM Good AS X
|
||||
FILTER Country EQ RU AS FromRU
|
||||
FILTER Country EQ EN AS FromEN
|
||||
FILTER @FromRU AND @FromEN AND Rating GT 7 AS Good`,
|
||||
|
||||
`REP 7 IN SPB
|
||||
SELECT 1 IN City FROM SPBSSD AS SPB
|
||||
FILTER City EQ SPB AND SSD EQ true OR City EQ SPB AND Rating GE 5 AS SPBSSD`,
|
||||
|
||||
`REP 7 IN SPB
|
||||
SELECT 1 IN City FROM SPBSSD AS SPB
|
||||
FILTER NOT (NOT (City EQ SPB) AND SSD EQ true OR City EQ SPB AND Rating GE 5) AS SPBSSD`,
|
||||
|
||||
`UNIQUE
|
||||
REP 1
|
||||
REP 1`,
|
||||
}
|
||||
|
||||
var p PlacementPolicy
|
||||
|
||||
for _, testCase := range testCases {
|
||||
require.NoError(t, p.DecodeString(testCase), "unable parse %s", testCase)
|
||||
var b strings.Builder
|
||||
require.NoError(t, p.WriteStringTo(&b))
|
||||
require.Equal(t, testCase, b.String())
|
||||
}
|
||||
|
||||
invalidTestCases := map[string]error{
|
||||
`?REP 1`: errSyntaxError,
|
||||
`REP 1 trailing garbage`: errSyntaxError,
|
||||
`REP 1 REP 1 SELECT 4 FROM *`: errUnnamedSelector,
|
||||
`REP 1 SELECT 4 FROM * SELECT 1 FROM *`: errUnnamedSelector,
|
||||
`REP 1 IN X SELECT 4 FROM *`: errUnknownSelector,
|
||||
`REP 1 IN X REP 2 SELECT 4 FROM * AS X FILTER 'UN-LOCODE' EQ 'RU LED' AS F`: errRedundantFilter,
|
||||
}
|
||||
|
||||
for i := range invalidTestCases {
|
||||
require.ErrorIs(t, p.DecodeString(i), invalidTestCases[i], "#%s", i)
|
||||
}
|
||||
}
|
|
@ -1,7 +1,6 @@
|
|||
package netmap_test
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
. "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
|
@ -9,55 +8,6 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestEncode(t *testing.T) {
|
||||
testCases := []string{
|
||||
`REP 1 IN X
|
||||
CBF 1
|
||||
SELECT 2 IN SAME Location FROM * AS X`,
|
||||
|
||||
`REP 1
|
||||
SELECT 2 IN City FROM Good
|
||||
FILTER Country EQ RU AS FromRU
|
||||
FILTER @FromRU AND Rating GT 7 AS Good`,
|
||||
|
||||
`REP 7 IN SPB
|
||||
SELECT 1 IN City FROM SPBSSD AS SPB
|
||||
FILTER City EQ SPB AND SSD EQ true OR City EQ SPB AND Rating GE 5 AS SPBSSD`,
|
||||
|
||||
`REP 7 IN SPB
|
||||
SELECT 1 IN City FROM SPBSSD AS SPB
|
||||
FILTER NOT (NOT (City EQ SPB) AND SSD EQ true OR City EQ SPB AND Rating GE 5) AS SPBSSD`,
|
||||
|
||||
`UNIQUE
|
||||
REP 1
|
||||
REP 1`,
|
||||
|
||||
`REP 1 IN X
|
||||
SELECT 1 FROM F AS X
|
||||
FILTER 'UN-LOCODE' EQ 'RU LED' AS F`,
|
||||
}
|
||||
|
||||
var p PlacementPolicy
|
||||
|
||||
for _, testCase := range testCases {
|
||||
require.NoError(t, p.DecodeString(testCase))
|
||||
|
||||
var b strings.Builder
|
||||
require.NoError(t, p.WriteStringTo(&b))
|
||||
|
||||
require.Equal(t, testCase, b.String())
|
||||
}
|
||||
|
||||
invalidTestCases := []string{
|
||||
`?REP 1`,
|
||||
`REP 1 trailing garbage`,
|
||||
}
|
||||
|
||||
for i := range invalidTestCases {
|
||||
require.Error(t, p.DecodeString(invalidTestCases[i]), "#%d", i)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPlacementPolicyEncoding(t *testing.T) {
|
||||
v := netmaptest.PlacementPolicy()
|
||||
|
||||
|
|
|
@ -60,7 +60,7 @@ func (c *context) getSelection(s netmap.Selector) ([]nodes, error) {
|
|||
bucketCount, nodesInBucket := calcNodesCount(s)
|
||||
buckets := c.getSelectionBase(s)
|
||||
|
||||
if len(buckets) < bucketCount {
|
||||
if c.strict && len(buckets) < bucketCount {
|
||||
return nil, fmt.Errorf("%w: '%s'", errNotEnoughNodes, s.GetName())
|
||||
}
|
||||
|
||||
|
@ -96,7 +96,7 @@ func (c *context) getSelection(s netmap.Selector) ([]nodes, error) {
|
|||
if len(res) < bucketCount {
|
||||
// Fallback to using minimum allowed backup factor (1).
|
||||
res = append(res, fallback...)
|
||||
if len(res) < bucketCount {
|
||||
if c.strict && len(res) < bucketCount {
|
||||
return nil, fmt.Errorf("%w: '%s'", errNotEnoughNodes, s.GetName())
|
||||
}
|
||||
}
|
||||
|
@ -110,6 +110,13 @@ func (c *context) getSelection(s netmap.Selector) ([]nodes, error) {
|
|||
hrw.SortHasherSliceByWeightValue(res, weights, c.hrwSeedHash)
|
||||
}
|
||||
|
||||
if len(res) < bucketCount {
|
||||
if len(res) == 0 {
|
||||
return nil, errNotEnoughNodes
|
||||
}
|
||||
bucketCount = len(res)
|
||||
}
|
||||
|
||||
if s.GetAttribute() == "" {
|
||||
res, fallback = res[:bucketCount], res[bucketCount:]
|
||||
for i := range fallback {
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
mrand "math/rand"
|
||||
"sort"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
@ -28,10 +29,10 @@ func BenchmarkHRWSort(b *testing.B) {
|
|||
node.SetPublicKey(key)
|
||||
|
||||
vectors[i] = nodes{node}
|
||||
weights[i] = float64(rand.Uint32()%10) / 10.0
|
||||
weights[i] = float64(mrand.Uint32()%10) / 10.0
|
||||
}
|
||||
|
||||
pivot := rand.Uint64()
|
||||
pivot := mrand.Uint64()
|
||||
b.Run("sort by index, no weight", func(b *testing.B) {
|
||||
realNodes := make([]nodes, netmapSize)
|
||||
b.ResetTimer()
|
||||
|
@ -282,6 +283,55 @@ func TestPlacementPolicy_Unique(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestPlacementPolicy_SingleOmitNames(t *testing.T) {
|
||||
nodes := []NodeInfo{
|
||||
nodeInfoFromAttributes("ID", "1", "Country", "Russia", "City", "SPB"),
|
||||
nodeInfoFromAttributes("ID", "2", "Country", "Germany", "City", "Berlin"),
|
||||
nodeInfoFromAttributes("ID", "3", "Country", "Russia", "City", "Moscow"),
|
||||
nodeInfoFromAttributes("ID", "4", "Country", "France", "City", "Paris"),
|
||||
nodeInfoFromAttributes("ID", "5", "Country", "France", "City", "Lyon"),
|
||||
nodeInfoFromAttributes("ID", "6", "Country", "Russia", "City", "SPB"),
|
||||
nodeInfoFromAttributes("ID", "7", "Country", "Russia", "City", "Moscow"),
|
||||
nodeInfoFromAttributes("ID", "8", "Country", "Germany", "City", "Darmstadt"),
|
||||
nodeInfoFromAttributes("ID", "9", "Country", "Germany", "City", "Frankfurt"),
|
||||
nodeInfoFromAttributes("ID", "10", "Country", "Russia", "City", "SPB"),
|
||||
nodeInfoFromAttributes("ID", "11", "Country", "Russia", "City", "Moscow"),
|
||||
nodeInfoFromAttributes("ID", "12", "Country", "Germany", "City", "London"),
|
||||
}
|
||||
for i := range nodes {
|
||||
pub := make([]byte, 33)
|
||||
rand.Read(pub)
|
||||
nodes[i].SetPublicKey(pub)
|
||||
}
|
||||
|
||||
var nm NetMap
|
||||
nm.SetNodes(nodes)
|
||||
|
||||
for _, unique := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("unique=%t", unique), func(t *testing.T) {
|
||||
ssNamed := []Selector{newSelector("X", "City", 2, "FromRU", (*Selector).SelectDistinct)}
|
||||
fsNamed := []Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}
|
||||
rsNamed := []ReplicaDescriptor{newReplica(1, "X")}
|
||||
pNamed := newPlacementPolicy(3, rsNamed, ssNamed, fsNamed)
|
||||
pNamed.unique = unique
|
||||
|
||||
vNamed, err := nm.ContainerNodes(pNamed, []byte{1})
|
||||
require.NoError(t, err)
|
||||
|
||||
ssUnnamed := []Selector{newSelector("", "City", 2, "FromRU", (*Selector).SelectDistinct)}
|
||||
fsUnnamed := []Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}
|
||||
rsUnnamed := []ReplicaDescriptor{newReplica(1, "")}
|
||||
pUnnamed := newPlacementPolicy(3, rsUnnamed, ssUnnamed, fsUnnamed)
|
||||
pUnnamed.unique = unique
|
||||
|
||||
vUnnamed, err := nm.ContainerNodes(pUnnamed, []byte{1})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, vNamed, vUnnamed)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPlacementPolicy_MultiREP(t *testing.T) {
|
||||
nodes := []NodeInfo{
|
||||
nodeInfoFromAttributes("ID", "1", "Country", "Russia", "City", "SPB"),
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package netmaptest
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"crypto/rand"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
)
|
||||
|
@ -70,7 +70,7 @@ func NetworkInfo() (x netmap.NetworkInfo) {
|
|||
// NodeInfo returns random netmap.NodeInfo.
|
||||
func NodeInfo() (x netmap.NodeInfo) {
|
||||
key := make([]byte, 33)
|
||||
rand.Read(key)
|
||||
_, _ = rand.Read(key)
|
||||
|
||||
x.SetPublicKey(key)
|
||||
x.SetNetworkEndpoints("1", "2", "3")
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
package ns
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"math/rand"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package oidtest
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"math/rand"
|
||||
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -12,7 +12,7 @@ import (
|
|||
func ID() oid.ID {
|
||||
checksum := [sha256.Size]byte{}
|
||||
|
||||
rand.Read(checksum[:])
|
||||
_, _ = rand.Read(checksum[:])
|
||||
|
||||
return idWithChecksum(checksum)
|
||||
}
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package object_test
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package object
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/tombstone"
|
||||
|
|
|
@ -69,3 +69,7 @@ func (c *sessionCache) expired(val *cacheValue) bool {
|
|||
// use epoch+1 (clear cache beforehand) to prevent 'expired session token' error right after epoch tick
|
||||
return val.token.ExpiredAt(epoch + 1)
|
||||
}
|
||||
|
||||
func (c *sessionCache) Epoch() uint64 {
|
||||
return c.currentEpoch.Load()
|
||||
}
|
||||
|
|
|
@ -189,3 +189,7 @@ func (m *mockClient) restartIfUnhealthy(ctx context.Context) (healthy bool, chan
|
|||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *mockClient) close() error {
|
||||
return nil
|
||||
}
|
||||
|
|
172
pool/object_put_pool_transformer.go
Normal file
172
pool/object_put_pool_transformer.go
Normal file
|
@ -0,0 +1,172 @@
|
|||
package pool
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
type logger interface {
|
||||
log(level zapcore.Level, msg string, fields ...zap.Field)
|
||||
}
|
||||
|
||||
type PrmObjectPutClientCutInit struct {
|
||||
PrmObjectPut
|
||||
}
|
||||
|
||||
func (c *clientWrapper) objectPutInitTransformer(prm PrmObjectPutClientCutInit) (*objectWriterTransformer, error) {
|
||||
cl, err := c.getClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var w objectWriterTransformer
|
||||
|
||||
w.it = internalTarget{
|
||||
client: cl,
|
||||
prm: prm,
|
||||
address: c.address(),
|
||||
logger: &c.clientStatusMonitor,
|
||||
}
|
||||
|
||||
key := &c.prm.key
|
||||
if prm.key != nil {
|
||||
key = prm.key
|
||||
}
|
||||
|
||||
w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||
Key: key,
|
||||
NextTargetInit: func() transformer.ObjectWriter { return &w.it },
|
||||
MaxSize: prm.networkInfo.MaxObjectSize(),
|
||||
WithoutHomomorphicHash: prm.withoutHomomorphicHash,
|
||||
NetworkState: prm.networkInfo,
|
||||
SessionToken: prm.stoken,
|
||||
})
|
||||
return &w, nil
|
||||
}
|
||||
|
||||
type objectWriterTransformer struct {
|
||||
ot transformer.ChunkedObjectWriter
|
||||
it internalTarget
|
||||
err error
|
||||
}
|
||||
|
||||
func (x *objectWriterTransformer) WriteHeader(ctx context.Context, hdr object.Object) bool {
|
||||
x.err = x.ot.WriteHeader(ctx, &hdr)
|
||||
return x.err == nil
|
||||
}
|
||||
|
||||
func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk []byte) bool {
|
||||
_, x.err = x.ot.Write(ctx, chunk)
|
||||
return x.err == nil
|
||||
}
|
||||
|
||||
// ResObjectPut groups the final result values of ObjectPutInit operation.
|
||||
type ResObjectPut struct {
|
||||
Status apistatus.Status
|
||||
OID oid.ID
|
||||
}
|
||||
|
||||
// Close return non nil result in any case. If error occurred, the result contains only buffer for further reusing.
|
||||
func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) {
|
||||
ai, err := x.ot.Close(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if ai != nil && ai.ParentID != nil {
|
||||
x.it.res.OID = *ai.ParentID
|
||||
}
|
||||
return &x.it.res, nil
|
||||
}
|
||||
|
||||
type internalTarget struct {
|
||||
client *sdkClient.Client
|
||||
res ResObjectPut
|
||||
prm PrmObjectPutClientCutInit
|
||||
useStream bool
|
||||
address string
|
||||
logger logger
|
||||
resolveFrostFSErrors bool
|
||||
}
|
||||
|
||||
func (it *internalTarget) WriteObject(ctx context.Context, o *object.Object) error {
|
||||
putSingleImplemented, err := it.tryPutSingle(ctx, o)
|
||||
if putSingleImplemented {
|
||||
return err
|
||||
}
|
||||
|
||||
it.logger.log(zapcore.DebugLevel, "putSingle not implemented, trying put as stream", zap.String("address", it.address))
|
||||
|
||||
it.useStream = true
|
||||
return it.putAsStream(ctx, o)
|
||||
}
|
||||
|
||||
func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error {
|
||||
var cliPrm sdkClient.PrmObjectPutInit
|
||||
cliPrm.SetCopiesNumberByVectors(it.prm.copiesNumber)
|
||||
if it.prm.stoken != nil {
|
||||
cliPrm.WithinSession(*it.prm.stoken)
|
||||
}
|
||||
if it.prm.key != nil {
|
||||
cliPrm.UseKey(*it.prm.key)
|
||||
}
|
||||
if it.prm.btoken != nil {
|
||||
cliPrm.WithBearerToken(*it.prm.btoken)
|
||||
}
|
||||
|
||||
wrt, err := it.client.ObjectPutInit(ctx, cliPrm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if wrt.WriteHeader(ctx, *o) {
|
||||
wrt.WritePayloadChunk(ctx, o.Payload())
|
||||
}
|
||||
res, err := wrt.Close(ctx)
|
||||
if res != nil {
|
||||
it.res.Status = res.Status()
|
||||
it.res.OID = res.StoredObjectID()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (bool, error) {
|
||||
if it.useStream {
|
||||
return false, nil
|
||||
}
|
||||
var cliPrm sdkClient.PrmObjectPutSingle
|
||||
cliPrm.SetCopiesNumber(it.prm.copiesNumber)
|
||||
cliPrm.UseKey(it.prm.key)
|
||||
if it.prm.stoken != nil {
|
||||
cliPrm.WithinSession(*it.prm.stoken)
|
||||
}
|
||||
if it.prm.btoken != nil {
|
||||
cliPrm.WithBearerToken(*it.prm.btoken)
|
||||
}
|
||||
cliPrm.SetObject(o.ToV2())
|
||||
|
||||
res, err := it.client.ObjectPutSingle(ctx, cliPrm)
|
||||
if err != nil && status.Code(err) == codes.Unimplemented {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
id, _ := o.ID()
|
||||
it.res = ResObjectPut{
|
||||
Status: res.Status(),
|
||||
OID: id,
|
||||
}
|
||||
if !it.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.Status) {
|
||||
return true, apistatus.ErrFromStatus(it.res.Status)
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
return true, err
|
||||
}
|
410
pool/pool.go
410
pool/pool.go
|
@ -78,12 +78,16 @@ type client interface {
|
|||
dial(ctx context.Context) error
|
||||
// see clientWrapper.restartIfUnhealthy.
|
||||
restartIfUnhealthy(ctx context.Context) (bool, bool)
|
||||
// see clientWrapper.close.
|
||||
close() error
|
||||
}
|
||||
|
||||
// clientStatus provide access to some metrics for connection.
|
||||
type clientStatus interface {
|
||||
// isHealthy checks if the connection can handle requests.
|
||||
isHealthy() bool
|
||||
// isDialed checks if the connection was created.
|
||||
isDialed() bool
|
||||
// setUnhealthy marks client as unhealthy.
|
||||
setUnhealthy()
|
||||
// address return address of endpoint.
|
||||
|
@ -105,7 +109,7 @@ var errPoolClientUnhealthy = errors.New("pool client unhealthy")
|
|||
type clientStatusMonitor struct {
|
||||
logger *zap.Logger
|
||||
addr string
|
||||
healthy *atomic.Bool
|
||||
healthy *atomic.Uint32
|
||||
errorThreshold uint32
|
||||
|
||||
mu sync.RWMutex // protect counters
|
||||
|
@ -114,6 +118,22 @@ type clientStatusMonitor struct {
|
|||
methods []*methodStatus
|
||||
}
|
||||
|
||||
// values for healthy status of clientStatusMonitor.
|
||||
const (
|
||||
// statusUnhealthyOnDial is set when dialing to the endpoint is failed,
|
||||
// so there is no connection to the endpoint, and pool should not close it
|
||||
// before re-establishing connection once again.
|
||||
statusUnhealthyOnDial = iota
|
||||
|
||||
// statusUnhealthyOnRequest is set when communication after dialing to the
|
||||
// endpoint is failed due to immediate or accumulated errors, connection is
|
||||
// available and pool should close it before re-establishing connection once again.
|
||||
statusUnhealthyOnRequest
|
||||
|
||||
// statusHealthy is set when connection is ready to be used by the pool.
|
||||
statusHealthy
|
||||
)
|
||||
|
||||
// methodStatus provide statistic for specific method.
|
||||
type methodStatus struct {
|
||||
name string
|
||||
|
@ -195,8 +215,8 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint
|
|||
methods[i] = &methodStatus{name: i.String()}
|
||||
}
|
||||
|
||||
healthy := new(atomic.Bool)
|
||||
healthy.Store(true)
|
||||
healthy := new(atomic.Uint32)
|
||||
healthy.Store(statusHealthy)
|
||||
|
||||
return clientStatusMonitor{
|
||||
logger: logger,
|
||||
|
@ -252,6 +272,11 @@ func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) {
|
|||
x.key = key
|
||||
}
|
||||
|
||||
// setLogger sets sdkClient.Client logger.
|
||||
func (x *wrapperPrm) setLogger(logger *zap.Logger) {
|
||||
x.logger = logger
|
||||
}
|
||||
|
||||
// setDialTimeout sets the timeout for connection to be established.
|
||||
func (x *wrapperPrm) setDialTimeout(timeout time.Duration) {
|
||||
x.dialTimeout = timeout
|
||||
|
@ -317,7 +342,7 @@ func (c *clientWrapper) dial(ctx context.Context) error {
|
|||
prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
|
||||
|
||||
if err = cl.Dial(ctx, prmDial); err != nil {
|
||||
c.setUnhealthy()
|
||||
c.setUnhealthyOnDial()
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -334,6 +359,12 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
|
|||
wasHealthy = true
|
||||
}
|
||||
|
||||
// if connection is dialed before, to avoid routine / connection leak,
|
||||
// pool has to close it and then initialize once again.
|
||||
if c.isDialed() {
|
||||
_ = c.close()
|
||||
}
|
||||
|
||||
var cl sdkClient.Client
|
||||
var prmInit sdkClient.PrmInit
|
||||
prmInit.SetDefaultPrivateKey(c.prm.key)
|
||||
|
@ -348,7 +379,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
|
|||
prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
|
||||
|
||||
if err := cl.Dial(ctx, prmDial); err != nil {
|
||||
c.setUnhealthy()
|
||||
c.setUnhealthyOnDial()
|
||||
return false, wasHealthy
|
||||
}
|
||||
|
||||
|
@ -549,11 +580,9 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
|
|||
return err
|
||||
}
|
||||
|
||||
var cliPrm sdkClient.PrmContainerSetEACL
|
||||
cliPrm.SetTable(prm.table)
|
||||
|
||||
if prm.sessionSet {
|
||||
cliPrm.WithinSession(prm.session)
|
||||
cliPrm := sdkClient.PrmContainerSetEACL{
|
||||
Table: &prm.Table,
|
||||
Session: prm.Session,
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
@ -567,16 +596,19 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
|
|||
return fmt.Errorf("set eacl on client: %w", err)
|
||||
}
|
||||
|
||||
if !prm.waitParamsSet {
|
||||
prm.waitParams.setDefaults()
|
||||
if prm.WaitParams == nil {
|
||||
prm.WaitParams = defaultWaitParams()
|
||||
}
|
||||
if err := prm.WaitParams.CheckValidity(); err != nil {
|
||||
return fmt.Errorf("invalid wait parameters: %w", err)
|
||||
}
|
||||
|
||||
var cIDp *cid.ID
|
||||
if cID, set := prm.table.CID(); set {
|
||||
if cID, set := prm.Table.CID(); set {
|
||||
cIDp = &cID
|
||||
}
|
||||
|
||||
err = waitForEACLPresence(ctx, c, cIDp, &prm.table, &prm.waitParams)
|
||||
err = waitForEACLPresence(ctx, c, cIDp, &prm.Table, prm.WaitParams)
|
||||
if err = c.handleError(ctx, nil, err); err != nil {
|
||||
return fmt.Errorf("wait eacl presence on client: %w", err)
|
||||
}
|
||||
|
@ -628,6 +660,18 @@ func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (netm
|
|||
|
||||
// objectPut writes object to FrostFS.
|
||||
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
|
||||
if prm.bufferMaxSize == 0 {
|
||||
prm.bufferMaxSize = defaultBufferMaxSizeForPut
|
||||
}
|
||||
|
||||
if prm.clientCut {
|
||||
return c.objectPutClientCut(ctx, prm)
|
||||
}
|
||||
|
||||
return c.objectPutServerCut(ctx, prm)
|
||||
}
|
||||
|
||||
func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
|
||||
cl, err := c.getClient()
|
||||
if err != nil {
|
||||
return oid.ID{}, err
|
||||
|
@ -665,10 +709,8 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
|
|||
}
|
||||
|
||||
if prm.payload != nil {
|
||||
const defaultBufferSizePut = 3 << 20 // configure?
|
||||
|
||||
if sz == 0 || sz > defaultBufferSizePut {
|
||||
sz = defaultBufferSizePut
|
||||
if sz == 0 || sz > prm.bufferMaxSize {
|
||||
sz = prm.bufferMaxSize
|
||||
}
|
||||
|
||||
buf := make([]byte, sz)
|
||||
|
@ -709,6 +751,73 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
|
|||
return res.StoredObjectID(), nil
|
||||
}
|
||||
|
||||
func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
|
||||
putInitPrm := PrmObjectPutClientCutInit{
|
||||
PrmObjectPut: prm,
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
wObj, err := c.objectPutInitTransformer(putInitPrm)
|
||||
c.incRequests(time.Since(start), methodObjectPut)
|
||||
if err = c.handleError(ctx, nil, err); err != nil {
|
||||
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
|
||||
}
|
||||
|
||||
if wObj.WriteHeader(ctx, prm.hdr) {
|
||||
sz := prm.hdr.PayloadSize()
|
||||
|
||||
if data := prm.hdr.Payload(); len(data) > 0 {
|
||||
if prm.payload != nil {
|
||||
prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
|
||||
} else {
|
||||
prm.payload = bytes.NewReader(data)
|
||||
sz = uint64(len(data))
|
||||
}
|
||||
}
|
||||
|
||||
if prm.payload != nil {
|
||||
if sz == 0 || sz > prm.bufferMaxSize {
|
||||
sz = prm.bufferMaxSize
|
||||
}
|
||||
|
||||
buf := make([]byte, sz)
|
||||
|
||||
var n int
|
||||
|
||||
for {
|
||||
n, err = prm.payload.Read(buf)
|
||||
if n > 0 {
|
||||
start = time.Now()
|
||||
successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
|
||||
c.incRequests(time.Since(start), methodObjectPut)
|
||||
if !successWrite {
|
||||
break
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
|
||||
return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
res, err := wObj.Close(ctx)
|
||||
var st apistatus.Status
|
||||
if res != nil {
|
||||
st = res.Status
|
||||
}
|
||||
if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
|
||||
return oid.ID{}, fmt.Errorf("client failure: %w", err)
|
||||
}
|
||||
|
||||
return res.OID, nil
|
||||
}
|
||||
|
||||
// objectDelete invokes sdkClient.ObjectDelete parse response status to error.
|
||||
func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) error {
|
||||
cl, err := c.getClient()
|
||||
|
@ -716,20 +825,15 @@ func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) e
|
|||
return err
|
||||
}
|
||||
|
||||
var cliPrm sdkClient.PrmObjectDelete
|
||||
cliPrm.FromContainer(prm.addr.Container())
|
||||
cliPrm.ByID(prm.addr.Object())
|
||||
cnr := prm.addr.Container()
|
||||
obj := prm.addr.Object()
|
||||
|
||||
if prm.stoken != nil {
|
||||
cliPrm.WithinSession(*prm.stoken)
|
||||
}
|
||||
|
||||
if prm.btoken != nil {
|
||||
cliPrm.WithBearerToken(*prm.btoken)
|
||||
}
|
||||
|
||||
if prm.key != nil {
|
||||
cliPrm.UseKey(*prm.key)
|
||||
cliPrm := sdkClient.PrmObjectDelete{
|
||||
BearerToken: prm.btoken,
|
||||
Session: prm.stoken,
|
||||
ContainerID: &cnr,
|
||||
ObjectID: &obj,
|
||||
Key: prm.key,
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
@ -752,20 +856,15 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGet
|
|||
return ResGetObject{}, err
|
||||
}
|
||||
|
||||
var cliPrm sdkClient.PrmObjectGet
|
||||
cliPrm.FromContainer(prm.addr.Container())
|
||||
cliPrm.ByID(prm.addr.Object())
|
||||
prmCnr := prm.addr.Container()
|
||||
prmObj := prm.addr.Object()
|
||||
|
||||
if prm.stoken != nil {
|
||||
cliPrm.WithinSession(*prm.stoken)
|
||||
}
|
||||
|
||||
if prm.btoken != nil {
|
||||
cliPrm.WithBearerToken(*prm.btoken)
|
||||
}
|
||||
|
||||
if prm.key != nil {
|
||||
cliPrm.UseKey(*prm.key)
|
||||
cliPrm := sdkClient.PrmObjectGet{
|
||||
BearerToken: prm.btoken,
|
||||
Session: prm.stoken,
|
||||
ContainerID: &prmCnr,
|
||||
ObjectID: &prmObj,
|
||||
Key: prm.key,
|
||||
}
|
||||
|
||||
var res ResGetObject
|
||||
|
@ -805,23 +904,16 @@ func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (obje
|
|||
return object.Object{}, err
|
||||
}
|
||||
|
||||
var cliPrm sdkClient.PrmObjectHead
|
||||
cliPrm.FromContainer(prm.addr.Container())
|
||||
cliPrm.ByID(prm.addr.Object())
|
||||
if prm.raw {
|
||||
cliPrm.MarkRaw()
|
||||
}
|
||||
prmCnr := prm.addr.Container()
|
||||
prmObj := prm.addr.Object()
|
||||
|
||||
if prm.stoken != nil {
|
||||
cliPrm.WithinSession(*prm.stoken)
|
||||
}
|
||||
|
||||
if prm.btoken != nil {
|
||||
cliPrm.WithBearerToken(*prm.btoken)
|
||||
}
|
||||
|
||||
if prm.key != nil {
|
||||
cliPrm.UseKey(*prm.key)
|
||||
cliPrm := sdkClient.PrmObjectHead{
|
||||
BearerToken: prm.btoken,
|
||||
Session: prm.stoken,
|
||||
Raw: prm.raw,
|
||||
ContainerID: &prmCnr,
|
||||
ObjectID: &prmObj,
|
||||
Key: prm.key,
|
||||
}
|
||||
|
||||
var obj object.Object
|
||||
|
@ -850,22 +942,17 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (Re
|
|||
return ResObjectRange{}, err
|
||||
}
|
||||
|
||||
var cliPrm sdkClient.PrmObjectRange
|
||||
cliPrm.FromContainer(prm.addr.Container())
|
||||
cliPrm.ByID(prm.addr.Object())
|
||||
cliPrm.SetOffset(prm.off)
|
||||
cliPrm.SetLength(prm.ln)
|
||||
prmCnr := prm.addr.Container()
|
||||
prmObj := prm.addr.Object()
|
||||
|
||||
if prm.stoken != nil {
|
||||
cliPrm.WithinSession(*prm.stoken)
|
||||
}
|
||||
|
||||
if prm.btoken != nil {
|
||||
cliPrm.WithBearerToken(*prm.btoken)
|
||||
}
|
||||
|
||||
if prm.key != nil {
|
||||
cliPrm.UseKey(*prm.key)
|
||||
cliPrm := sdkClient.PrmObjectRange{
|
||||
BearerToken: prm.btoken,
|
||||
Session: prm.stoken,
|
||||
ContainerID: &prmCnr,
|
||||
ObjectID: &prmObj,
|
||||
Offset: prm.off,
|
||||
Length: prm.ln,
|
||||
Key: prm.key,
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
@ -922,9 +1009,10 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
|
|||
return resCreateSession{}, err
|
||||
}
|
||||
|
||||
var cliPrm sdkClient.PrmSessionCreate
|
||||
cliPrm.SetExp(prm.exp)
|
||||
cliPrm.UseKey(prm.key)
|
||||
cliPrm := sdkClient.PrmSessionCreate{
|
||||
Expiration: prm.exp,
|
||||
Key: &prm.key,
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
res, err := cl.SessionCreate(ctx, cliPrm)
|
||||
|
@ -944,15 +1032,23 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
|
|||
}
|
||||
|
||||
func (c *clientStatusMonitor) isHealthy() bool {
|
||||
return c.healthy.Load()
|
||||
return c.healthy.Load() == statusHealthy
|
||||
}
|
||||
|
||||
func (c *clientStatusMonitor) isDialed() bool {
|
||||
return c.healthy.Load() != statusUnhealthyOnDial
|
||||
}
|
||||
|
||||
func (c *clientStatusMonitor) setHealthy() {
|
||||
c.healthy.Store(true)
|
||||
c.healthy.Store(statusHealthy)
|
||||
}
|
||||
|
||||
func (c *clientStatusMonitor) setUnhealthy() {
|
||||
c.healthy.Store(false)
|
||||
c.healthy.Store(statusUnhealthyOnRequest)
|
||||
}
|
||||
|
||||
func (c *clientStatusMonitor) setUnhealthyOnDial() {
|
||||
c.healthy.Store(statusUnhealthyOnDial)
|
||||
}
|
||||
|
||||
func (c *clientStatusMonitor) address() string {
|
||||
|
@ -971,12 +1067,20 @@ func (c *clientStatusMonitor) incErrorRate() {
|
|||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
if thresholdReached && c.logger != nil {
|
||||
c.logger.Warn("error threshold reached",
|
||||
if thresholdReached {
|
||||
c.log(zapcore.WarnLevel, "error threshold reached",
|
||||
zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold))
|
||||
}
|
||||
}
|
||||
|
||||
func (c *clientStatusMonitor) log(level zapcore.Level, msg string, fields ...zap.Field) {
|
||||
if c.logger == nil {
|
||||
return
|
||||
}
|
||||
|
||||
c.logger.Log(level, msg, fields...)
|
||||
}
|
||||
|
||||
func (c *clientStatusMonitor) currentErrorRate() uint32 {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
@ -1010,6 +1114,13 @@ func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *clientWrapper) close() error {
|
||||
if c.client != nil {
|
||||
return c.client.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error {
|
||||
if err != nil {
|
||||
if needCountError(ctx, err) {
|
||||
|
@ -1227,12 +1338,6 @@ func (x *WaitParams) SetPollInterval(tick time.Duration) {
|
|||
x.PollInterval = tick
|
||||
}
|
||||
|
||||
// Deprecated: Use defaultWaitParams() instead.
|
||||
func (x *WaitParams) setDefaults() {
|
||||
x.Timeout = 120 * time.Second
|
||||
x.PollInterval = 5 * time.Second
|
||||
}
|
||||
|
||||
func defaultWaitParams() *WaitParams {
|
||||
return &WaitParams{
|
||||
Timeout: 120 * time.Second,
|
||||
|
@ -1321,6 +1426,13 @@ type PrmObjectPut struct {
|
|||
payload io.Reader
|
||||
|
||||
copiesNumber []uint32
|
||||
|
||||
clientCut bool
|
||||
networkInfo netmap.NetworkInfo
|
||||
|
||||
withoutHomomorphicHash bool
|
||||
|
||||
bufferMaxSize uint64
|
||||
}
|
||||
|
||||
// SetHeader specifies header of the object.
|
||||
|
@ -1345,6 +1457,32 @@ func (x *PrmObjectPut) SetCopiesNumberVector(copiesNumber []uint32) {
|
|||
x.copiesNumber = copiesNumber
|
||||
}
|
||||
|
||||
// SetClientCut enables client cut for objects. It means that full object is prepared on client side
|
||||
// and retrying is possible. But this leads to additional memory using for buffering object parts.
|
||||
// Buffer size for every put is MaxObjectSize value from FrostFS network.
|
||||
// There is limit for total memory allocation for in-flight request and
|
||||
// can be set by InitParameters.SetMaxClientCutMemory (default value is 1gb).
|
||||
// Put requests will fail if this limit be reached.
|
||||
func (x *PrmObjectPut) SetClientCut(clientCut bool) {
|
||||
x.clientCut = clientCut
|
||||
}
|
||||
|
||||
// WithoutHomomorphicHash if set to true do not use Tillich-Zémor hash for payload.
|
||||
func (x *PrmObjectPut) WithoutHomomorphicHash(v bool) {
|
||||
x.withoutHomomorphicHash = v
|
||||
}
|
||||
|
||||
// SetBufferMaxSize sets max buffer size to read payload.
|
||||
// This value isn't used if object size is set explicitly and less than this value.
|
||||
// Default value 3MB.
|
||||
func (x *PrmObjectPut) SetBufferMaxSize(size uint64) {
|
||||
x.bufferMaxSize = size
|
||||
}
|
||||
|
||||
func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) {
|
||||
x.networkInfo = ni
|
||||
}
|
||||
|
||||
// PrmObjectDelete groups parameters of DeleteObject operation.
|
||||
type PrmObjectDelete struct {
|
||||
prmCommon
|
||||
|
@ -1532,39 +1670,41 @@ func (x *PrmContainerEACL) SetContainerID(cnrID cid.ID) {
|
|||
|
||||
// PrmContainerSetEACL groups parameters of SetEACL operation.
|
||||
type PrmContainerSetEACL struct {
|
||||
table eacl.Table
|
||||
Table eacl.Table
|
||||
|
||||
sessionSet bool
|
||||
session session.Container
|
||||
Session *session.Container
|
||||
|
||||
waitParams WaitParams
|
||||
waitParamsSet bool
|
||||
WaitParams *WaitParams
|
||||
}
|
||||
|
||||
// SetTable sets structure of container's extended ACL to be used as a
|
||||
// parameter of the base client's operation.
|
||||
//
|
||||
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.SetTable.
|
||||
//
|
||||
// Deprecated: Use PrmContainerSetEACL.Table instead.
|
||||
func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
|
||||
x.table = table
|
||||
x.Table = table
|
||||
}
|
||||
|
||||
// WithinSession specifies session to be used as a parameter of the base
|
||||
// client's operation.
|
||||
//
|
||||
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.WithinSession.
|
||||
//
|
||||
// Deprecated: Use PrmContainerSetEACL.Session instead.
|
||||
func (x *PrmContainerSetEACL) WithinSession(s session.Container) {
|
||||
x.session = s
|
||||
x.sessionSet = true
|
||||
x.Session = &s
|
||||
}
|
||||
|
||||
// SetWaitParams specifies timeout params to complete operation.
|
||||
// If not provided the default one will be used.
|
||||
// Panics if any of the wait params isn't positive.
|
||||
//
|
||||
// Deprecated: Use PrmContainerSetEACL.WaitParams instead.
|
||||
func (x *PrmContainerSetEACL) SetWaitParams(waitParams WaitParams) {
|
||||
waitParams.checkForPositive()
|
||||
x.waitParams = waitParams
|
||||
x.waitParamsSet = true
|
||||
x.WaitParams = &waitParams
|
||||
}
|
||||
|
||||
// PrmBalanceGet groups parameters of Balance operation.
|
||||
|
@ -1638,6 +1778,8 @@ type Pool struct {
|
|||
rebalanceParams rebalanceParameters
|
||||
clientBuilder clientBuilder
|
||||
logger *zap.Logger
|
||||
|
||||
maxObjectSize uint64
|
||||
}
|
||||
|
||||
type innerPool struct {
|
||||
|
@ -1654,6 +1796,8 @@ const (
|
|||
defaultHealthcheckTimeout = 4 * time.Second
|
||||
defaultDialTimeout = 5 * time.Second
|
||||
defaultStreamTimeout = 10 * time.Second
|
||||
|
||||
defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
|
||||
)
|
||||
|
||||
// NewPool creates connection pool using parameters.
|
||||
|
@ -1713,7 +1857,7 @@ func (p *Pool) Dial(ctx context.Context) error {
|
|||
}
|
||||
|
||||
var st session.Object
|
||||
err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key)
|
||||
err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key, false)
|
||||
if err != nil {
|
||||
clients[j].setUnhealthy()
|
||||
p.log(zap.WarnLevel, "failed to create frostfs session token for client",
|
||||
|
@ -1721,7 +1865,7 @@ func (p *Pool) Dial(ctx context.Context) error {
|
|||
continue
|
||||
}
|
||||
|
||||
_ = p.cache.Put(formCacheKey(addr, p.key), st)
|
||||
_ = p.cache.Put(formCacheKey(addr, p.key, false), st)
|
||||
atLeastOneHealthy = true
|
||||
}
|
||||
source := rand.NewSource(time.Now().UnixNano())
|
||||
|
@ -1742,6 +1886,12 @@ func (p *Pool) Dial(ctx context.Context) error {
|
|||
p.closedCh = make(chan struct{})
|
||||
p.innerPools = inner
|
||||
|
||||
ni, err := p.NetworkInfo(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get network info for max object size: %w", err)
|
||||
}
|
||||
p.maxObjectSize = ni.MaxObjectSize()
|
||||
|
||||
go p.startRebalance(ctx)
|
||||
return nil
|
||||
}
|
||||
|
@ -1784,6 +1934,7 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
|
|||
var prm wrapperPrm
|
||||
prm.setAddress(addr)
|
||||
prm.setKey(*params.key)
|
||||
prm.setLogger(params.logger)
|
||||
prm.setDialTimeout(params.nodeDialTimeout)
|
||||
prm.setStreamTimeout(params.nodeStreamTimeout)
|
||||
prm.setErrorThreshold(params.errorThreshold)
|
||||
|
@ -1952,9 +2103,15 @@ func (p *innerPool) connection() (client, error) {
|
|||
return nil, errors.New("no healthy client")
|
||||
}
|
||||
|
||||
func formCacheKey(address string, key *ecdsa.PrivateKey) string {
|
||||
func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string {
|
||||
k := keys.PrivateKey{PrivateKey: *key}
|
||||
return address + k.String()
|
||||
|
||||
stype := "server"
|
||||
if clientCut {
|
||||
stype = "client"
|
||||
}
|
||||
|
||||
return address + stype + k.String()
|
||||
}
|
||||
|
||||
func (p *Pool) checkSessionTokenErr(err error, address string) bool {
|
||||
|
@ -1970,7 +2127,7 @@ func (p *Pool) checkSessionTokenErr(err error, address string) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey) error {
|
||||
func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey, clientCut bool) error {
|
||||
ni, err := c.networkInfo(ctx, prmNetworkInfo{})
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -1988,24 +2145,26 @@ func initSessionForDuration(ctx context.Context, dst *session.Object, c client,
|
|||
prm.setExp(exp)
|
||||
prm.useKey(ownerKey)
|
||||
|
||||
var (
|
||||
id uuid.UUID
|
||||
key frostfsecdsa.PublicKey
|
||||
)
|
||||
|
||||
if clientCut {
|
||||
id = uuid.New()
|
||||
key = frostfsecdsa.PublicKey(ownerKey.PublicKey)
|
||||
} else {
|
||||
res, err := c.sessionCreate(ctx, prm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var id uuid.UUID
|
||||
|
||||
err = id.UnmarshalBinary(res.id)
|
||||
if err != nil {
|
||||
if err = id.UnmarshalBinary(res.id); err != nil {
|
||||
return fmt.Errorf("invalid session token ID: %w", err)
|
||||
}
|
||||
|
||||
var key frostfsecdsa.PublicKey
|
||||
|
||||
err = key.Decode(res.sessionKey)
|
||||
if err != nil {
|
||||
if err = key.Decode(res.sessionKey); err != nil {
|
||||
return fmt.Errorf("invalid public session key: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
dst.SetID(id)
|
||||
dst.SetAuthKey(&key)
|
||||
|
@ -2030,6 +2189,8 @@ type callContext struct {
|
|||
sessionCnr cid.ID
|
||||
sessionObjSet bool
|
||||
sessionObjs []oid.ID
|
||||
|
||||
sessionClientCut bool
|
||||
}
|
||||
|
||||
func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
|
||||
|
@ -2066,12 +2227,12 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContex
|
|||
// opens new session or uses cached one.
|
||||
// Must be called only on initialized callContext with set sessionTarget.
|
||||
func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error {
|
||||
cacheKey := formCacheKey(cc.endpoint, cc.key)
|
||||
cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut)
|
||||
|
||||
tok, ok := p.cache.Get(cacheKey)
|
||||
if !ok {
|
||||
// init new session
|
||||
err := initSessionForDuration(ctx, &tok, cc.client, p.stokenDuration, *cc.key)
|
||||
err := initSessionForDuration(ctx, &tok, cc.client, p.stokenDuration, *cc.key, cc.sessionClientCut)
|
||||
if err != nil {
|
||||
return fmt.Errorf("session API client: %w", err)
|
||||
}
|
||||
|
@ -2136,6 +2297,7 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
|
|||
p.fillAppropriateKey(&prm.prmCommon)
|
||||
|
||||
var ctxCall callContext
|
||||
ctxCall.sessionClientCut = prm.clientCut
|
||||
if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil {
|
||||
return oid.ID{}, fmt.Errorf("init call context: %w", err)
|
||||
}
|
||||
|
@ -2147,6 +2309,13 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
|
|||
}
|
||||
}
|
||||
|
||||
if prm.clientCut {
|
||||
var ni netmap.NetworkInfo
|
||||
ni.SetCurrentEpoch(p.cache.Epoch())
|
||||
ni.SetMaxObjectSize(p.maxObjectSize) // we want to use initial max object size in PayloadSizeLimiter
|
||||
prm.setNetworkInfo(ni)
|
||||
}
|
||||
|
||||
id, err := ctxCall.client.objectPut(ctx, prm)
|
||||
if err != nil {
|
||||
// removes session token from cache in case of token error
|
||||
|
@ -2638,6 +2807,15 @@ func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
|
|||
func (p *Pool) Close() {
|
||||
p.cancel()
|
||||
<-p.closedCh
|
||||
|
||||
// close all clients
|
||||
for _, pools := range p.innerPools {
|
||||
for _, cli := range pools.clients {
|
||||
if cli.isDialed() {
|
||||
_ = cli.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SyncContainerWithNetwork applies network configuration received via
|
||||
|
|
|
@ -106,7 +106,7 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
|
|||
if err != nil {
|
||||
return false
|
||||
}
|
||||
st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key))
|
||||
st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key, false))
|
||||
return st.AssertAuthKey(&expectedAuthKey)
|
||||
}
|
||||
require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond)
|
||||
|
@ -141,7 +141,7 @@ func TestOneNode(t *testing.T) {
|
|||
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
|
||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||
}
|
||||
|
@ -171,7 +171,7 @@ func TestTwoNodes(t *testing.T) {
|
|||
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
require.True(t, assertAuthKeyForAny(st, clientKeys))
|
||||
}
|
||||
|
||||
|
@ -226,7 +226,7 @@ func TestOneOfTwoFailed(t *testing.T) {
|
|||
for i := 0; i < 5; i++ {
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
require.True(t, assertAuthKeyForAny(st, clientKeys))
|
||||
}
|
||||
}
|
||||
|
@ -296,7 +296,7 @@ func TestSessionCache(t *testing.T) {
|
|||
// cache must contain session token
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||
|
||||
var prm PrmObjectGet
|
||||
|
@ -309,7 +309,7 @@ func TestSessionCache(t *testing.T) {
|
|||
// cache must not contain session token
|
||||
cp, err = pool.connection()
|
||||
require.NoError(t, err)
|
||||
_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key))
|
||||
_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
require.False(t, ok)
|
||||
|
||||
var prm2 PrmObjectPut
|
||||
|
@ -321,7 +321,7 @@ func TestSessionCache(t *testing.T) {
|
|||
// cache must contain session token
|
||||
cp, err = pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key))
|
||||
st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||
}
|
||||
|
||||
|
@ -365,7 +365,7 @@ func TestPriority(t *testing.T) {
|
|||
firstNode := func() bool {
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
return st.AssertAuthKey(&expectedAuthKey1)
|
||||
}
|
||||
|
||||
|
@ -373,7 +373,7 @@ func TestPriority(t *testing.T) {
|
|||
secondNode := func() bool {
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
return st.AssertAuthKey(&expectedAuthKey2)
|
||||
}
|
||||
require.Never(t, secondNode, time.Second, 200*time.Millisecond)
|
||||
|
@ -410,7 +410,7 @@ func TestSessionCacheWithKey(t *testing.T) {
|
|||
// cache must contain session token
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||
|
||||
var prm PrmObjectDelete
|
||||
|
@ -420,7 +420,7 @@ func TestSessionCacheWithKey(t *testing.T) {
|
|||
|
||||
err = pool.DeleteObject(ctx, prm)
|
||||
require.NoError(t, err)
|
||||
st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey))
|
||||
st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey, false))
|
||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||
}
|
||||
|
||||
|
@ -523,6 +523,44 @@ func TestStatusMonitor(t *testing.T) {
|
|||
|
||||
require.Equal(t, uint64(count), monitor.overallErrorRate())
|
||||
require.Equal(t, uint32(1), monitor.currentErrorRate())
|
||||
|
||||
t.Run("healthy status", func(t *testing.T) {
|
||||
cases := []struct {
|
||||
action func(*clientStatusMonitor)
|
||||
status uint32
|
||||
isDialed bool
|
||||
isHealthy bool
|
||||
description string
|
||||
}{
|
||||
{
|
||||
action: func(m *clientStatusMonitor) { m.setUnhealthyOnDial() },
|
||||
status: statusUnhealthyOnDial,
|
||||
isDialed: false,
|
||||
isHealthy: false,
|
||||
description: "set unhealthy on dial",
|
||||
},
|
||||
{
|
||||
action: func(m *clientStatusMonitor) { m.setUnhealthy() },
|
||||
status: statusUnhealthyOnRequest,
|
||||
isDialed: true,
|
||||
isHealthy: false,
|
||||
description: "set unhealthy on request",
|
||||
},
|
||||
{
|
||||
action: func(m *clientStatusMonitor) { m.setHealthy() },
|
||||
status: statusHealthy,
|
||||
isDialed: true,
|
||||
isHealthy: true,
|
||||
description: "set healthy",
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
tc.action(&monitor)
|
||||
require.Equal(t, tc.status, monitor.healthy.Load())
|
||||
require.Equal(t, tc.isDialed, monitor.isDialed())
|
||||
require.Equal(t, tc.isHealthy, monitor.isHealthy())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestHandleError(t *testing.T) {
|
||||
|
|
|
@ -3,6 +3,7 @@ package tree
|
|||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
|
@ -22,6 +23,11 @@ type treeClient struct {
|
|||
healthy bool
|
||||
}
|
||||
|
||||
var (
|
||||
// ErrUnhealthyEndpoint is returned when client in the pool considered unavailable.
|
||||
ErrUnhealthyEndpoint = errors.New("unhealthy endpoint")
|
||||
)
|
||||
|
||||
// newTreeClient creates new tree client with auto dial.
|
||||
func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient {
|
||||
return &treeClient{
|
||||
|
@ -102,7 +108,7 @@ func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) {
|
|||
defer c.mu.RUnlock()
|
||||
|
||||
if c.conn == nil || !c.healthy {
|
||||
return nil, fmt.Errorf("unhealthy endpoint: '%s'", c.address)
|
||||
return nil, fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, c.address)
|
||||
}
|
||||
|
||||
return c.service, nil
|
||||
|
|
|
@ -32,6 +32,9 @@ var (
|
|||
|
||||
// ErrNodeAccessDenied is returned from Tree service in case of access denied error.
|
||||
ErrNodeAccessDenied = errors.New("access denied")
|
||||
|
||||
// errNodeEmpty is used to trigger retry when 'GetNodeByPath' return empty result.
|
||||
errNodeEmptyResult = errors.New("empty result")
|
||||
)
|
||||
|
||||
// client represents virtual connection to the single FrostFS tree service from which Pool is formed.
|
||||
|
@ -296,8 +299,15 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
|
|||
var resp *grpcService.GetNodeByPathResponse
|
||||
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
resp, inErr = client.GetNodeByPath(ctx, request)
|
||||
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
|
||||
// Empty result is expected due to delayed tree service sync.
|
||||
// Return an error there to trigger retry and ignore it after,
|
||||
// to keep compatibility with 'GetNodeByPath' implementation.
|
||||
if inErr == nil && len(resp.Body.Nodes) == 0 {
|
||||
return errNodeEmptyResult
|
||||
}
|
||||
return handleError("failed to get node by path", inErr)
|
||||
}); err != nil {
|
||||
}); err != nil && !errors.Is(err, errNodeEmptyResult) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -720,7 +730,7 @@ func (p *Pool) setStartIndices(i, j int) {
|
|||
|
||||
func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error {
|
||||
var (
|
||||
err error
|
||||
err, finErr error
|
||||
cl grpcService.TreeServiceClient
|
||||
)
|
||||
|
||||
|
@ -740,16 +750,44 @@ func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) er
|
|||
}
|
||||
return err
|
||||
}
|
||||
finErr = finalError(finErr, err)
|
||||
p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
|
||||
}
|
||||
startJ = 0
|
||||
}
|
||||
|
||||
return err
|
||||
return finErr
|
||||
}
|
||||
|
||||
func shouldTryAgain(err error) bool {
|
||||
return !(err == nil ||
|
||||
errors.Is(err, ErrNodeNotFound) ||
|
||||
errors.Is(err, ErrNodeAccessDenied))
|
||||
return !(err == nil || errors.Is(err, ErrNodeAccessDenied))
|
||||
}
|
||||
|
||||
func prioErr(err error) int {
|
||||
switch {
|
||||
case err == nil:
|
||||
return -1
|
||||
case errors.Is(err, ErrNodeAccessDenied):
|
||||
return 100
|
||||
case errors.Is(err, ErrNodeNotFound) ||
|
||||
errors.Is(err, errNodeEmptyResult):
|
||||
return 200
|
||||
case errors.Is(err, ErrUnhealthyEndpoint):
|
||||
return 300
|
||||
default:
|
||||
return 500
|
||||
}
|
||||
}
|
||||
|
||||
func finalError(current, candidate error) error {
|
||||
if current == nil || candidate == nil {
|
||||
return candidate
|
||||
}
|
||||
|
||||
// lower priority error is more desirable
|
||||
if prioErr(candidate) < prioErr(current) {
|
||||
return candidate
|
||||
}
|
||||
|
||||
return current
|
||||
}
|
||||
|
|
|
@ -156,6 +156,43 @@ func TestRetry(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
checkIndicesAndReset(t, p, 0, 0)
|
||||
})
|
||||
|
||||
t.Run("error empty result", func(t *testing.T) {
|
||||
errNodes, index := 2, 0
|
||||
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
|
||||
if index < errNodes {
|
||||
index++
|
||||
return errNodeEmptyResult
|
||||
}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
checkIndicesAndReset(t, p, 0, errNodes)
|
||||
})
|
||||
|
||||
t.Run("error not found", func(t *testing.T) {
|
||||
errNodes, index := 2, 0
|
||||
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
|
||||
if index < errNodes {
|
||||
index++
|
||||
return ErrNodeNotFound
|
||||
}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
checkIndicesAndReset(t, p, 0, errNodes)
|
||||
})
|
||||
|
||||
t.Run("error access denied", func(t *testing.T) {
|
||||
var index int
|
||||
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
|
||||
index++
|
||||
return ErrNodeAccessDenied
|
||||
})
|
||||
require.ErrorIs(t, err, ErrNodeAccessDenied)
|
||||
require.Equal(t, 1, index)
|
||||
checkIndicesAndReset(t, p, 0, 0)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRebalance(t *testing.T) {
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
package session_test
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
mrand "math/rand"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
|
@ -396,7 +397,7 @@ func TestContainer_AppliedTo(t *testing.T) {
|
|||
func TestContainer_InvalidAt(t *testing.T) {
|
||||
var x session.Container
|
||||
|
||||
nbf := rand.Uint64()
|
||||
nbf := mrand.Uint64()
|
||||
if nbf == math.MaxUint64 {
|
||||
nbf--
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package user_test
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"crypto/rand"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
|
|
Loading…
Reference in a new issue