Compare commits


No commits in common. "fc4551b84341ab45cf856b88713c1dbfa01afad7" and "6fdbe755179efb5fc5cdf5f1abc78cb4e982c3c8" have entirely different histories.

52 changed files with 654 additions and 1465 deletions

View file

@@ -25,7 +25,7 @@ linters-settings:
     # report about shadowed variables
     check-shadowing: false
   staticcheck:
-    checks: ["all"]
+    checks: ["all", "-SA1019"] # TODO Enable SA1019 after deprecated warning are fixed.
   funlen:
     lines: 80 # default 60
     statements: 60 # default 40

View file

@@ -24,17 +24,17 @@ type Checksum refs.Checksum

 // Type represents the enumeration
 // of checksum types.
-type Type refs.ChecksumType
+type Type uint8

 const (
 	// Unknown is an undefined checksum type.
-	Unknown Type = Type(refs.UnknownChecksum)
+	Unknown Type = iota

 	// SHA256 is a SHA256 checksum type.
-	SHA256 = Type(refs.SHA256)
+	SHA256

 	// TZ is a Tillich-Zémor checksum type.
-	TZ = Type(refs.TillichZemor)
+	TZ
 )

 // ReadFromV2 reads Checksum from the refs.Checksum message. Checks if the

View file

@@ -2,9 +2,9 @@ package checksum

 import (
 	"bytes"
-	"crypto/rand"
 	"crypto/sha256"
 	"fmt"
+	"math/rand"

 	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
 )

View file

@@ -1,8 +1,8 @@
 package checksumtest

 import (
-	"crypto/rand"
 	"crypto/sha256"
+	"math/rand"

 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
 )
@@ -11,7 +11,7 @@ import (
 func Checksum() checksum.Checksum {
 	var cs [sha256.Size]byte

-	_, _ = rand.Read(cs[:])
+	rand.Read(cs[:])

 	var x checksum.Checksum

View file

@@ -47,8 +47,6 @@ func writeXHeadersToMeta(xHeaders []string, h *v2session.RequestMetaHeader) {
 		return
 	}

-	// TODO (aarifullin): remove the panic when all client parameters will check XHeaders
-	// within buildRequest invocation.
 	if len(xHeaders)%2 != 0 {
 		panic("slice of X-Headers with odd length")
 	}

View file

@@ -18,20 +18,20 @@ import (

 // PrmContainerSetEACL groups parameters of ContainerSetEACL operation.
 type PrmContainerSetEACL struct {
-	// FrostFS request X-Headers.
-	XHeaders []string
+	prmCommonMeta

-	Table *eacl.Table
+	tableSet bool
+	table    eacl.Table

-	Session *session.Container
+	sessionSet bool
+	session    session.Container
 }

 // SetTable sets eACL table structure to be set for the container.
 // Required parameter.
-//
-// Deprecated: Use PrmContainerSetEACL.Table instead.
 func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
-	x.Table = &table
+	x.table = table
+	x.tableSet = true
 }

 // WithinSession specifies session within which extended ACL of the container
@@ -45,22 +45,17 @@ func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
 // - for which extended ACL is going to be set
 // - session operation MUST be session.VerbContainerSetEACL (ForVerb)
 // - token MUST be signed using private key of the owner of the container to be saved
-//
-// Deprecated: Use PrmContainerSetEACL.Session instead.
 func (x *PrmContainerSetEACL) WithinSession(s session.Container) {
-	x.Session = &s
+	x.session = s
+	x.sessionSet = true
 }

 func (x *PrmContainerSetEACL) buildRequest(c *Client) (*v2container.SetExtendedACLRequest, error) {
-	if x.Table == nil {
+	if !x.tableSet {
 		return nil, errorEACLTableNotSet
 	}

-	if len(x.XHeaders)%2 != 0 {
-		return nil, errorInvalidXHeaders
-	}
-
-	eaclV2 := x.Table.ToV2()
+	eaclV2 := x.table.ToV2()

 	var sig frostfscrypto.Signature
@@ -77,11 +72,11 @@ func (x *PrmContainerSetEACL) buildRequest(c *Client) (*v2container.SetExtendedA
 	reqBody.SetSignature(&sigv2)

 	var meta v2session.RequestMetaHeader
-	writeXHeadersToMeta(x.XHeaders, &meta)
+	writeXHeadersToMeta(x.prmCommonMeta.xHeaders, &meta)

-	if x.Session != nil {
+	if x.sessionSet {
 		var tokv2 v2session.Token
-		x.Session.WriteToV2(&tokv2)
+		x.session.WriteToV2(&tokv2)
 		meta.SetSessionToken(&tokv2)
 	}
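Taken together, the right-hand side replaces the exported `Table`/`Session` fields with setter methods and internal flags. A minimal caller-side sketch under that API (the `ContainerSetEACL` client method is named by the operation comment above; the helper, its signature, and the error handling are illustrative):

```go
package main

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
)

// setContainerEACL sketches the setter-based flow; c is a configured client,
// table a prepared eACL table, tok a session token signed by the container owner.
func setContainerEACL(ctx context.Context, c *client.Client, table eacl.Table, tok session.Container) error {
	var prm client.PrmContainerSetEACL
	prm.SetTable(table)    // required; buildRequest fails with errorEACLTableNotSet otherwise
	prm.WithinSession(tok) // optional; token must be issued for VerbContainerSetEACL

	_, err := c.ContainerSetEACL(ctx, prm) // assumed client method for this operation
	return err
}
```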

View file

@@ -14,29 +14,27 @@ import (

 // PrmAnnounceSpace groups parameters of ContainerAnnounceUsedSpace operation.
 type PrmAnnounceSpace struct {
-	XHeaders []string
+	prmCommonMeta

-	Announcements []container.SizeEstimation
+	announcements []container.SizeEstimation
 }

 // SetValues sets values describing volume of space that is used for the container objects.
 // Required parameter. Must not be empty.
 //
 // Must not be mutated before the end of the operation.
-//
-// Deprecated: Use PrmAnnounceSpace.Announcements instead.
 func (x *PrmAnnounceSpace) SetValues(vs []container.SizeEstimation) {
-	x.Announcements = vs
+	x.announcements = vs
 }

 func (x *PrmAnnounceSpace) buildRequest(c *Client) (*v2container.AnnounceUsedSpaceRequest, error) {
-	if len(x.Announcements) == 0 {
+	if len(x.announcements) == 0 {
 		return nil, errorMissingAnnouncements
 	}

-	v2announce := make([]v2container.UsedSpaceAnnouncement, len(x.Announcements))
-	for i := range x.Announcements {
-		x.Announcements[i].WriteToV2(&v2announce[i])
+	v2announce := make([]v2container.UsedSpaceAnnouncement, len(x.announcements))
+	for i := range x.announcements {
+		x.announcements[i].WriteToV2(&v2announce[i])
 	}

 	reqBody := new(v2container.AnnounceUsedSpaceRequestBody)

View file

@@ -16,12 +16,12 @@ import (

 // PrmEndpointInfo groups parameters of EndpointInfo operation.
 type PrmEndpointInfo struct {
-	XHeaders []string
+	prmCommonMeta
 }

 func (x *PrmEndpointInfo) buildRequest(c *Client) (*v2netmap.LocalNodeInfoRequest, error) {
 	meta := new(v2session.RequestMetaHeader)
-	writeXHeadersToMeta(x.XHeaders, meta)
+	writeXHeadersToMeta(x.xHeaders, meta)

 	req := new(v2netmap.LocalNodeInfoRequest)
 	req.SetBody(new(v2netmap.LocalNodeInfoRequestBody))
@@ -112,12 +112,12 @@ func (c *Client) EndpointInfo(ctx context.Context, prm PrmEndpointInfo) (*ResEnd

 // PrmNetworkInfo groups parameters of NetworkInfo operation.
 type PrmNetworkInfo struct {
-	XHeaders []string
+	prmCommonMeta
 }

 func (x PrmNetworkInfo) buildRequest(c *Client) (*v2netmap.NetworkInfoRequest, error) {
 	meta := new(v2session.RequestMetaHeader)
-	writeXHeadersToMeta(x.XHeaders, meta)
+	writeXHeadersToMeta(x.xHeaders, meta)

 	var req v2netmap.NetworkInfoRequest
 	req.SetBody(new(v2netmap.NetworkInfoRequestBody))

View file

@@ -21,25 +21,71 @@ import (

 // PrmObjectDelete groups parameters of ObjectDelete operation.
 type PrmObjectDelete struct {
-	XHeaders []string
-
-	BearerToken *bearer.Token
-
-	Session *session.Object
-
-	ContainerID *cid.ID
-
-	ObjectID *oid.ID
-
-	Key *ecdsa.PrivateKey
+	meta v2session.RequestMetaHeader
+
+	body v2object.DeleteRequestBody
+
+	addr v2refs.Address
+
+	keySet bool
+	key    ecdsa.PrivateKey
+}
+
+// WithinSession specifies session within which object should be read.
+//
+// Creator of the session acquires the authorship of the request.
+// This may affect the execution of an operation (e.g. access control).
+//
+// Must be signed.
+func (x *PrmObjectDelete) WithinSession(t session.Object) {
+	var tv2 v2session.Token
+	t.WriteToV2(&tv2)
+	x.meta.SetSessionToken(&tv2)
+}
+
+// WithBearerToken attaches bearer token to be used for the operation.
+//
+// If set, underlying eACL rules will be used in access control.
+//
+// Must be signed.
+func (x *PrmObjectDelete) WithBearerToken(t bearer.Token) {
+	var v2token acl.BearerToken
+	t.WriteToV2(&v2token)
+	x.meta.SetBearerToken(&v2token)
+}
+
+// FromContainer specifies FrostFS container of the object.
+// Required parameter.
+func (x *PrmObjectDelete) FromContainer(id cid.ID) {
+	var cidV2 v2refs.ContainerID
+	id.WriteToV2(&cidV2)
+	x.addr.SetContainerID(&cidV2)
+}
+
+// ByID specifies identifier of the requested object.
+// Required parameter.
+func (x *PrmObjectDelete) ByID(id oid.ID) {
+	var idV2 v2refs.ObjectID
+	id.WriteToV2(&idV2)
+	x.addr.SetObjectID(&idV2)
 }

 // UseKey specifies private key to sign the requests.
 // If key is not provided, then Client default key is used.
+func (x *PrmObjectDelete) UseKey(key ecdsa.PrivateKey) {
+	x.keySet = true
+	x.key = key
+}
+
+// WithXHeaders specifies list of extended headers (string key-value pairs)
+// to be attached to the request. Must have an even length.
 //
-// Deprecated: Use PrmObjectDelete.Key instead.
-func (prm *PrmObjectDelete) UseKey(key ecdsa.PrivateKey) {
-	prm.Key = &key
+// Slice must not be mutated until the operation completes.
+func (x *PrmObjectDelete) WithXHeaders(hs ...string) {
+	writeXHeadersToMeta(hs, &x.meta)
 }

 // ResObjectDelete groups resulting values of ObjectDelete operation.
@@ -54,54 +100,6 @@ func (x ResObjectDelete) Tombstone() oid.ID {
 	return x.tomb
 }

-func (prm *PrmObjectDelete) buildRequest(c *Client) (*v2object.DeleteRequest, error) {
-	if prm.ContainerID == nil {
-		return nil, errorMissingContainer
-	}
-
-	if prm.ObjectID == nil {
-		return nil, errorMissingObject
-	}
-
-	if len(prm.XHeaders)%2 != 0 {
-		return nil, errorInvalidXHeaders
-	}
-
-	meta := new(v2session.RequestMetaHeader)
-	writeXHeadersToMeta(prm.XHeaders, meta)
-
-	if prm.BearerToken != nil {
-		v2BearerToken := new(acl.BearerToken)
-		prm.BearerToken.WriteToV2(v2BearerToken)
-		meta.SetBearerToken(v2BearerToken)
-	}
-
-	if prm.Session != nil {
-		v2SessionToken := new(v2session.Token)
-		prm.Session.WriteToV2(v2SessionToken)
-		meta.SetSessionToken(v2SessionToken)
-	}
-
-	addr := new(v2refs.Address)
-
-	cnrV2 := new(v2refs.ContainerID)
-	prm.ContainerID.WriteToV2(cnrV2)
-	addr.SetContainerID(cnrV2)
-
-	objV2 := new(v2refs.ObjectID)
-	prm.ObjectID.WriteToV2(objV2)
-	addr.SetObjectID(objV2)
-
-	body := new(v2object.DeleteRequestBody)
-	body.SetAddress(addr)
-
-	req := new(v2object.DeleteRequest)
-	req.SetBody(body)
-	c.prepareRequest(req, meta)
-
-	return req, nil
-}
-
 // ObjectDelete marks an object for deletion from the container using FrostFS API protocol.
 // As a marker, a special unit called a tombstone is placed in the container.
 // It confirms the user's intent to delete the object, and is itself a container object.
@@ -126,22 +124,32 @@ func (prm *PrmObjectDelete) buildRequest(c *Client) (*v2object.DeleteRequest, er
 // - *apistatus.ObjectLocked;
 // - *apistatus.SessionTokenExpired.
 func (c *Client) ObjectDelete(ctx context.Context, prm PrmObjectDelete) (*ResObjectDelete, error) {
-	req, err := prm.buildRequest(c)
-	if err != nil {
-		return nil, err
+	switch {
+	case prm.addr.GetContainerID() == nil:
+		return nil, errorMissingContainer
+	case prm.addr.GetObjectID() == nil:
+		return nil, errorMissingObject
 	}

+	// form request body
+	prm.body.SetAddress(&prm.addr)
+
+	// form request
+	var req v2object.DeleteRequest
+	req.SetBody(&prm.body)
+	c.prepareRequest(&req, &prm.meta)
+
 	key := c.prm.key
-	if prm.Key != nil {
-		key = *prm.Key
+	if prm.keySet {
+		key = prm.key
 	}

-	err = signature.SignServiceMessage(&key, req)
+	err := signature.SignServiceMessage(&key, &req)
 	if err != nil {
 		return nil, fmt.Errorf("sign request: %w", err)
 	}

-	resp, err := rpcapi.DeleteObject(&c.c, req, client.WithContext(ctx))
+	resp, err := rpcapi.DeleteObject(&c.c, &req, client.WithContext(ctx))
 	if err != nil {
 		return nil, err
 	}
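The same pattern applies to deletion: address parts are filled through FromContainer/ByID and validated inside ObjectDelete itself. A short sketch of the resulting call flow, built only from methods shown in this diff (the helper name and error handling are illustrative):

```go
package main

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// deleteObject drives the reworked PrmObjectDelete and returns the ID of the
// tombstone that marks the deletion.
func deleteObject(ctx context.Context, c *client.Client, cnr cid.ID, obj oid.ID) (oid.ID, error) {
	var prm client.PrmObjectDelete
	prm.FromContainer(cnr) // required; checked via addr.GetContainerID()
	prm.ByID(obj)          // required; checked via addr.GetObjectID()

	res, err := c.ObjectDelete(ctx, prm)
	if err != nil {
		return oid.ID{}, err
	}

	return res.Tombstone(), nil
}
```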

View file

@@ -22,76 +22,77 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
 )

-// PrmObjectGet groups parameters of ObjectGetInit operation.
-type PrmObjectGet struct {
-	XHeaders []string
-
-	BearerToken *bearer.Token
-
-	Session *session.Object
-
-	Raw bool
-
-	Local bool
-
-	ContainerID *cid.ID
-
-	ObjectID *oid.ID
-
-	Key *ecdsa.PrivateKey
-}
-
-func (prm *PrmObjectGet) buildRequest(c *Client) (*v2object.GetRequest, error) {
-	if prm.ContainerID == nil {
-		return nil, errorMissingContainer
-	}
-
-	if prm.ObjectID == nil {
-		return nil, errorMissingObject
-	}
-
-	if len(prm.XHeaders)%2 != 0 {
-		return nil, errorInvalidXHeaders
-	}
-
-	meta := new(v2session.RequestMetaHeader)
-	writeXHeadersToMeta(prm.XHeaders, meta)
-
-	if prm.BearerToken != nil {
-		v2BearerToken := new(acl.BearerToken)
-		prm.BearerToken.WriteToV2(v2BearerToken)
-		meta.SetBearerToken(v2BearerToken)
-	}
-
-	if prm.Session != nil {
-		v2SessionToken := new(v2session.Token)
-		prm.Session.WriteToV2(v2SessionToken)
-		meta.SetSessionToken(v2SessionToken)
-	}
-
-	if prm.Local {
-		meta.SetTTL(1)
-	}
-
-	addr := new(v2refs.Address)
-
-	cnrV2 := new(v2refs.ContainerID)
-	prm.ContainerID.WriteToV2(cnrV2)
-	addr.SetContainerID(cnrV2)
-
-	objV2 := new(v2refs.ObjectID)
-	prm.ObjectID.WriteToV2(objV2)
-	addr.SetObjectID(objV2)
-
-	body := new(v2object.GetRequestBody)
-	body.SetRaw(prm.Raw)
-	body.SetAddress(addr)
-
-	req := new(v2object.GetRequest)
-	req.SetBody(body)
-	c.prepareRequest(req, meta)
-
-	return req, nil
+// shared parameters of GET/HEAD/RANGE.
+type prmObjectRead struct {
+	meta v2session.RequestMetaHeader
+
+	raw bool
+
+	addr v2refs.Address
+}
+
+// WithXHeaders specifies list of extended headers (string key-value pairs)
+// to be attached to the request. Must have an even length.
+//
+// Slice must not be mutated until the operation completes.
+func (x *prmObjectRead) WithXHeaders(hs ...string) {
+	writeXHeadersToMeta(hs, &x.meta)
+}
+
+// MarkRaw marks an intent to read physically stored object.
+func (x *prmObjectRead) MarkRaw() {
+	x.raw = true
+}
+
+// MarkLocal tells the server to execute the operation locally.
+func (x *prmObjectRead) MarkLocal() {
+	x.meta.SetTTL(1)
+}
+
+// WithinSession specifies session within which object should be read.
+//
+// Creator of the session acquires the authorship of the request.
+// This may affect the execution of an operation (e.g. access control).
+//
+// Must be signed.
+func (x *prmObjectRead) WithinSession(t session.Object) {
+	var tokv2 v2session.Token
+	t.WriteToV2(&tokv2)
+	x.meta.SetSessionToken(&tokv2)
+}
+
+// WithBearerToken attaches bearer token to be used for the operation.
+//
+// If set, underlying eACL rules will be used in access control.
+//
+// Must be signed.
+func (x *prmObjectRead) WithBearerToken(t bearer.Token) {
+	var v2token acl.BearerToken
+	t.WriteToV2(&v2token)
+	x.meta.SetBearerToken(&v2token)
+}
+
+// FromContainer specifies FrostFS container of the object.
+// Required parameter.
+func (x *prmObjectRead) FromContainer(id cid.ID) {
+	var cnrV2 v2refs.ContainerID
+	id.WriteToV2(&cnrV2)
+	x.addr.SetContainerID(&cnrV2)
+}
+
+// ByID specifies identifier of the requested object.
+// Required parameter.
+func (x *prmObjectRead) ByID(id oid.ID) {
+	var objV2 v2refs.ObjectID
+	id.WriteToV2(&objV2)
+	x.addr.SetObjectID(&objV2)
+}
+
+// PrmObjectGet groups parameters of ObjectGetInit operation.
+type PrmObjectGet struct {
+	prmObjectRead
+
+	key *ecdsa.PrivateKey
 }

 // ResObjectGet groups the final result values of ObjectGetInit operation.
@@ -121,10 +122,8 @@ type ObjectReader struct {

 // UseKey specifies private key to sign the requests.
 // If key is not provided, then Client default key is used.
-//
-// Deprecated: Use PrmObjectGet.Key instead.
-func (prm *PrmObjectGet) UseKey(key ecdsa.PrivateKey) {
-	prm.Key = &key
+func (x *PrmObjectGet) UseKey(key ecdsa.PrivateKey) {
+	x.key = &key
 }

 // ReadHeader reads header of the object. Result means success.
@@ -300,24 +299,39 @@ func (x *ObjectReader) Read(p []byte) (int, error) {
 // Returns an error if parameters are set incorrectly (see PrmObjectGet docs).
 // Context is required and must not be nil. It is used for network communication.
 func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectReader, error) {
-	req, err := prm.buildRequest(c)
-	if err != nil {
-		return nil, err
+	// check parameters
+	switch {
+	case prm.addr.GetContainerID() == nil:
+		return nil, errorMissingContainer
+	case prm.addr.GetObjectID() == nil:
+		return nil, errorMissingObject
 	}

-	key := prm.Key
+	// form request body
+	var body v2object.GetRequestBody
+	body.SetRaw(prm.raw)
+	body.SetAddress(&prm.addr)
+
+	// form request
+	var req v2object.GetRequest
+	req.SetBody(&body)
+	c.prepareRequest(&req, &prm.meta)
+
+	key := prm.key
 	if key == nil {
 		key = &c.prm.key
 	}

-	err = signature.SignServiceMessage(key, req)
+	err := signature.SignServiceMessage(key, &req)
 	if err != nil {
 		return nil, fmt.Errorf("sign request: %w", err)
 	}

 	ctx, cancel := context.WithCancel(ctx)

-	stream, err := rpcapi.GetObject(&c.c, req, client.WithContext(ctx))
+	stream, err := rpcapi.GetObject(&c.c, &req, client.WithContext(ctx))
 	if err != nil {
 		cancel()
 		return nil, fmt.Errorf("open stream: %w", err)
@@ -333,29 +347,17 @@ func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectRe

 // PrmObjectHead groups parameters of ObjectHead operation.
 type PrmObjectHead struct {
-	XHeaders []string
-
-	BearerToken *bearer.Token
-
-	Session *session.Object
-
-	Raw bool
-
-	Local bool
-
-	ContainerID *cid.ID
-
-	ObjectID *oid.ID
+	prmObjectRead

-	Key *ecdsa.PrivateKey
+	keySet bool
+	key    ecdsa.PrivateKey
 }

 // UseKey specifies private key to sign the requests.
 // If key is not provided, then Client default key is used.
-//
-// Deprecated: Use PrmObjectHead.Key instead.
-func (prm *PrmObjectHead) UseKey(key ecdsa.PrivateKey) {
-	prm.Key = &key
+func (x *PrmObjectHead) UseKey(key ecdsa.PrivateKey) {
+	x.keySet = true
+	x.key = key
 }

 // ResObjectHead groups resulting values of ObjectHead operation.
@@ -388,58 +390,6 @@ func (x *ResObjectHead) ReadHeader(dst *object.Object) bool {
 	return true
 }

-func (prm *PrmObjectHead) buildRequest(c *Client) (*v2object.HeadRequest, error) {
-	if prm.ContainerID == nil {
-		return nil, errorMissingContainer
-	}
-
-	if prm.ObjectID == nil {
-		return nil, errorMissingObject
-	}
-
-	if len(prm.XHeaders)%2 != 0 {
-		return nil, errorInvalidXHeaders
-	}
-
-	meta := new(v2session.RequestMetaHeader)
-	writeXHeadersToMeta(prm.XHeaders, meta)
-
-	if prm.BearerToken != nil {
-		v2BearerToken := new(acl.BearerToken)
-		prm.BearerToken.WriteToV2(v2BearerToken)
-		meta.SetBearerToken(v2BearerToken)
-	}
-
-	if prm.Session != nil {
-		v2SessionToken := new(v2session.Token)
-		prm.Session.WriteToV2(v2SessionToken)
-		meta.SetSessionToken(v2SessionToken)
-	}
-
-	if prm.Local {
-		meta.SetTTL(1)
-	}
-
-	addr := new(v2refs.Address)
-
-	cnrV2 := new(v2refs.ContainerID)
-	prm.ContainerID.WriteToV2(cnrV2)
-	addr.SetContainerID(cnrV2)
-
-	objV2 := new(v2refs.ObjectID)
-	prm.ObjectID.WriteToV2(objV2)
-	addr.SetObjectID(objV2)
-
-	body := new(v2object.HeadRequestBody)
-	body.SetRaw(prm.Raw)
-	body.SetAddress(addr)
-
-	req := new(v2object.HeadRequest)
-	req.SetBody(body)
-	c.prepareRequest(req, meta)
-
-	return req, nil
-}
-
 // ObjectHead reads object header through a remote server using FrostFS API protocol.
 //
 // Exactly one return value is non-nil. By default, server status is returned in res structure.
@@ -463,24 +413,33 @@ func (prm *PrmObjectHead) buildRequest(c *Client) (*v2object.HeadRequest, error)
 // - *apistatus.ObjectAlreadyRemoved;
 // - *apistatus.SessionTokenExpired.
 func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectHead, error) {
-	req, err := prm.buildRequest(c)
-	if err != nil {
-		return nil, err
+	switch {
+	case prm.addr.GetContainerID() == nil:
+		return nil, errorMissingContainer
+	case prm.addr.GetObjectID() == nil:
+		return nil, errorMissingObject
 	}

+	var body v2object.HeadRequestBody
+	body.SetRaw(prm.raw)
+	body.SetAddress(&prm.addr)
+
+	var req v2object.HeadRequest
+	req.SetBody(&body)
+	c.prepareRequest(&req, &prm.meta)
+
 	key := c.prm.key
-	if prm.Key != nil {
-		key = *prm.Key
+	if prm.keySet {
+		key = prm.key
 	}

 	// sign the request
-	err = signature.SignServiceMessage(&key, req)
+	err := signature.SignServiceMessage(&key, &req)
 	if err != nil {
 		return nil, fmt.Errorf("sign request: %w", err)
 	}

-	resp, err := rpcapi.HeadObject(&c.c, req, client.WithContext(ctx))
+	resp, err := rpcapi.HeadObject(&c.c, &req, client.WithContext(ctx))
 	if err != nil {
 		return nil, fmt.Errorf("write request: %w", err)
 	}
@@ -495,7 +454,7 @@ func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectH
 		return &res, nil
 	}

-	res.idObj = *prm.ObjectID
+	_ = res.idObj.ReadFromV2(*prm.addr.GetObjectID())

 	switch v := resp.GetBody().GetHeaderPart().(type) {
 	default:
@@ -511,95 +470,29 @@ func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectH

 // PrmObjectRange groups parameters of ObjectRange operation.
 type PrmObjectRange struct {
-	XHeaders []string
-
-	BearerToken *bearer.Token
-
-	Session *session.Object
-
-	Raw bool
-
-	Local bool
-
-	ContainerID *cid.ID
-
-	ObjectID *oid.ID
-
-	Key *ecdsa.PrivateKey
-
-	Offset uint64
-
-	Length uint64
-}
-
-func (prm *PrmObjectRange) buildRequest(c *Client) (*v2object.GetRangeRequest, error) {
-	if prm.Length == 0 {
-		return nil, errorZeroRangeLength
-	}
-
-	if prm.ContainerID == nil {
-		return nil, errorMissingContainer
-	}
-
-	if prm.ObjectID == nil {
-		return nil, errorMissingObject
-	}
-
-	if len(prm.XHeaders)%2 != 0 {
-		return nil, errorInvalidXHeaders
-	}
-
-	meta := new(v2session.RequestMetaHeader)
-	writeXHeadersToMeta(prm.XHeaders, meta)
-
-	if prm.BearerToken != nil {
-		v2BearerToken := new(acl.BearerToken)
-		prm.BearerToken.WriteToV2(v2BearerToken)
-		meta.SetBearerToken(v2BearerToken)
-	}
-
-	if prm.Session != nil {
-		v2SessionToken := new(v2session.Token)
-		prm.Session.WriteToV2(v2SessionToken)
-		meta.SetSessionToken(v2SessionToken)
-	}
-
-	if prm.Local {
-		meta.SetTTL(1)
-	}
-
-	addr := new(v2refs.Address)
-
-	cnrV2 := new(v2refs.ContainerID)
-	prm.ContainerID.WriteToV2(cnrV2)
-	addr.SetContainerID(cnrV2)
-
-	objV2 := new(v2refs.ObjectID)
-	prm.ObjectID.WriteToV2(objV2)
-	addr.SetObjectID(objV2)
-
-	rng := new(v2object.Range)
-	rng.SetLength(prm.Length)
-	rng.SetOffset(prm.Offset)
-
-	body := new(v2object.GetRangeRequestBody)
-	body.SetRaw(prm.Raw)
-	body.SetAddress(addr)
-	body.SetRange(rng)
-
-	req := new(v2object.GetRangeRequest)
-	req.SetBody(body)
-	c.prepareRequest(req, meta)
-
-	return req, nil
+	prmObjectRead
+
+	key *ecdsa.PrivateKey
+
+	rng v2object.Range
+}
+
+// SetOffset sets offset of the payload range to be read.
+// Zero by default.
+func (x *PrmObjectRange) SetOffset(off uint64) {
+	x.rng.SetOffset(off)
+}
+
+// SetLength sets length of the payload range to be read.
+// Must be positive.
+func (x *PrmObjectRange) SetLength(ln uint64) {
+	x.rng.SetLength(ln)
 }

 // UseKey specifies private key to sign the requests.
 // If key is not provided, then Client default key is used.
-//
-// Deprecated: Use PrmObjectRange.Key instead.
-func (prm *PrmObjectRange) UseKey(key ecdsa.PrivateKey) {
-	prm.Key = &key
+func (x *PrmObjectRange) UseKey(key ecdsa.PrivateKey) {
+	x.key = &key
 }

 // ResObjectRange groups the final result values of ObjectRange operation.
@@ -769,31 +662,49 @@ func (x *ObjectRangeReader) Read(p []byte) (int, error) {
 // Returns an error if parameters are set incorrectly (see PrmObjectRange docs).
 // Context is required and must not be nil. It is used for network communication.
 func (c *Client) ObjectRangeInit(ctx context.Context, prm PrmObjectRange) (*ObjectRangeReader, error) {
-	req, err := prm.buildRequest(c)
-	if err != nil {
-		return nil, err
+	// check parameters
+	switch {
+	case prm.addr.GetContainerID() == nil:
+		return nil, errorMissingContainer
+	case prm.addr.GetObjectID() == nil:
+		return nil, errorMissingObject
+	case prm.rng.GetLength() == 0:
+		return nil, errorZeroRangeLength
 	}

-	key := prm.Key
+	// form request body
+	var body v2object.GetRangeRequestBody
+	body.SetRaw(prm.raw)
+	body.SetAddress(&prm.addr)
+	body.SetRange(&prm.rng)
+
+	// form request
+	var req v2object.GetRangeRequest
+	req.SetBody(&body)
+	c.prepareRequest(&req, &prm.meta)
+
+	key := prm.key
 	if key == nil {
 		key = &c.prm.key
 	}

-	err = signature.SignServiceMessage(key, req)
+	err := signature.SignServiceMessage(key, &req)
 	if err != nil {
 		return nil, fmt.Errorf("sign request: %w", err)
 	}

 	ctx, cancel := context.WithCancel(ctx)

-	stream, err := rpcapi.GetObjectRange(&c.c, req, client.WithContext(ctx))
+	stream, err := rpcapi.GetObjectRange(&c.c, &req, client.WithContext(ctx))
 	if err != nil {
 		cancel()
 		return nil, fmt.Errorf("open stream: %w", err)
 	}

 	var r ObjectRangeReader
-	r.remainingPayloadLen = int(prm.Length)
+	r.remainingPayloadLen = int(prm.rng.GetLength())
 	r.cancelCtxStream = cancel
 	r.stream = stream
 	r.client = c
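Since PrmObjectGet, PrmObjectHead and PrmObjectRange now all embed prmObjectRead, a GET caller drives the shared setters directly. A sketch assuming a connected *client.Client; stream closing and server-status handling are deliberately omitted:

```go
package main

import (
	"context"
	"errors"
	"io"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// readObject drives the embedded prmObjectRead setters for a GET and streams
// the payload to dst.
func readObject(ctx context.Context, c *client.Client, cnr cid.ID, obj oid.ID, dst io.Writer) error {
	var prm client.PrmObjectGet
	prm.FromContainer(cnr) // required, from the shared prmObjectRead
	prm.ByID(obj)          // required
	prm.MarkLocal()        // optional: TTL=1, execute on the connected node only

	rdr, err := c.ObjectGetInit(ctx, prm)
	if err != nil {
		return err
	}

	var hdr object.Object
	if !rdr.ReadHeader(&hdr) {
		return errors.New("read object header failed")
	}

	// ObjectReader implements Read, so the payload streams like any io.Reader.
	_, err = io.Copy(dst, rdr)
	return err
}
```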

View file

@@ -13,53 +13,121 @@ import (
 	v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
 	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
-	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
-	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
 )

 // PrmObjectHash groups parameters of ObjectHash operation.
 type PrmObjectHash struct {
-	XHeaders []string
-
-	BearerToken *bearer.Token
-
-	Session *session.Object
-
-	Local bool
-
-	Ranges []object.Range
-
-	Salt []byte
-
-	ChecksumType checksum.Type
-
-	ContainerID *cid.ID
-
-	ObjectID *oid.ID
+	meta v2session.RequestMetaHeader
+
+	body v2object.GetRangeHashRequestBody
+
+	csAlgo v2refs.ChecksumType

-	Key *ecdsa.PrivateKey
+	addr v2refs.Address
+
+	keySet bool
+	key    ecdsa.PrivateKey
 }

 // UseKey specifies private key to sign the requests.
 // If key is not provided, then Client default key is used.
+func (x *PrmObjectHash) UseKey(key ecdsa.PrivateKey) {
+	x.keySet = true
+	x.key = key
+}
+
+// MarkLocal tells the server to execute the operation locally.
+func (x *PrmObjectHash) MarkLocal() {
+	x.meta.SetTTL(1)
+}
+
+// WithinSession specifies session within which object should be read.
 //
-// Deprecated: Use PrmObjectHash.Key instead.
-func (prm *PrmObjectHash) UseKey(key ecdsa.PrivateKey) {
-	prm.Key = &key
+// Creator of the session acquires the authorship of the request.
+// This may affect the execution of an operation (e.g. access control).
+//
+// Must be signed.
+func (x *PrmObjectHash) WithinSession(t session.Object) {
+	var tv2 v2session.Token
+	t.WriteToV2(&tv2)
+	x.meta.SetSessionToken(&tv2)
+}
+
+// WithBearerToken attaches bearer token to be used for the operation.
+//
+// If set, underlying eACL rules will be used in access control.
+//
+// Must be signed.
+func (x *PrmObjectHash) WithBearerToken(t bearer.Token) {
+	var v2token acl.BearerToken
+	t.WriteToV2(&v2token)
+	x.meta.SetBearerToken(&v2token)
+}
+
+// FromContainer specifies FrostFS container of the object.
+// Required parameter.
+func (x *PrmObjectHash) FromContainer(id cid.ID) {
+	var cidV2 v2refs.ContainerID
+	id.WriteToV2(&cidV2)
+	x.addr.SetContainerID(&cidV2)
+}
+
+// ByID specifies identifier of the requested object.
+// Required parameter.
+func (x *PrmObjectHash) ByID(id oid.ID) {
+	var idV2 v2refs.ObjectID
+	id.WriteToV2(&idV2)
+	x.addr.SetObjectID(&idV2)
+}
+
+// SetRangeList sets list of ranges in (offset, length) pair format.
+// Required parameter.
+//
+// If passed as slice, then it must not be mutated before the operation completes.
+func (x *PrmObjectHash) SetRangeList(r ...uint64) {
+	ln := len(r)
+	if ln%2 != 0 {
+		panic("odd number of range parameters")
+	}
+
+	rs := make([]v2object.Range, ln/2)
+
+	for i := 0; i < ln/2; i++ {
+		rs[i].SetOffset(r[2*i])
+		rs[i].SetLength(r[2*i+1])
+	}
+
+	x.body.SetRanges(rs)
 }

 // TillichZemorAlgo changes the hash function to Tillich-Zemor
 // (https://link.springer.com/content/pdf/10.1007/3-540-48658-5_5.pdf).
 //
-// By default, SHA256 hash function is used/.
+// By default, SHA256 hash function is used.
+func (x *PrmObjectHash) TillichZemorAlgo() {
+	x.csAlgo = v2refs.TillichZemor
+}
+
+// UseSalt sets the salt to XOR the data range before hashing.
 //
-// Deprecated: Use PrmObjectHash.ChecksumType instead.
-func (prm *PrmObjectHash) TillichZemorAlgo() {
-	prm.ChecksumType = checksum.TZ
+// Must not be mutated before the operation completes.
+func (x *PrmObjectHash) UseSalt(salt []byte) {
+	x.body.SetSalt(salt)
+}
+
+// WithXHeaders specifies list of extended headers (string key-value pairs)
+// to be attached to the request. Must have an even length.
+//
+// Slice must not be mutated until the operation completes.
+func (x *PrmObjectHash) WithXHeaders(hs ...string) {
+	writeXHeadersToMeta(hs, &x.meta)
 }

 // ResObjectHash groups resulting values of ObjectHash operation.
@@ -74,76 +142,6 @@ func (x ResObjectHash) Checksums() [][]byte {
 	return x.checksums
 }

-func (prm *PrmObjectHash) buildRequest(c *Client) (*v2object.GetRangeHashRequest, error) {
-	if prm.ContainerID == nil {
-		return nil, errorMissingContainer
-	}
-
-	if prm.ObjectID == nil {
-		return nil, errorMissingObject
-	}
-
-	if len(prm.XHeaders)%2 != 0 {
-		return nil, errorInvalidXHeaders
-	}
-
-	if len(prm.Ranges) == 0 {
-		return nil, errorMissingRanges
-	}
-
-	meta := new(v2session.RequestMetaHeader)
-	writeXHeadersToMeta(prm.XHeaders, meta)
-
-	if prm.BearerToken != nil {
-		v2BearerToken := new(acl.BearerToken)
-		prm.BearerToken.WriteToV2(v2BearerToken)
-		meta.SetBearerToken(v2BearerToken)
-	}
-
-	if prm.Session != nil {
-		v2SessionToken := new(v2session.Token)
-		prm.Session.WriteToV2(v2SessionToken)
-		meta.SetSessionToken(v2SessionToken)
-	}
-
-	if prm.Local {
-		meta.SetTTL(1)
-	}
-
-	addr := new(v2refs.Address)
-
-	cnrV2 := new(v2refs.ContainerID)
-	prm.ContainerID.WriteToV2(cnrV2)
-	addr.SetContainerID(cnrV2)
-
-	objV2 := new(v2refs.ObjectID)
-	prm.ObjectID.WriteToV2(objV2)
-	addr.SetObjectID(objV2)
-
-	rs := make([]v2object.Range, len(prm.Ranges))
-	for i := range prm.Ranges {
-		rs[i].SetOffset(prm.Ranges[i].GetOffset())
-		rs[i].SetLength(prm.Ranges[i].GetLength())
-	}
-
-	body := new(v2object.GetRangeHashRequestBody)
-	body.SetAddress(addr)
-	body.SetRanges(rs)
-	body.SetSalt(prm.Salt)
-
-	if prm.ChecksumType == checksum.Unknown {
-		body.SetType(v2refs.SHA256)
-	} else {
-		body.SetType(v2refs.ChecksumType(prm.ChecksumType))
-	}
-
-	req := new(v2object.GetRangeHashRequest)
-	req.SetBody(body)
-	c.prepareRequest(req, meta)
-
-	return req, nil
-}
-
 // ObjectHash requests checksum of the range list of the object payload using
 // FrostFS API protocol.
 //
@@ -167,22 +165,37 @@ func (prm *PrmObjectHash) buildRequest(c *Client) (*v2object.GetRangeHashRequest
 // - *apistatus.ObjectOutOfRange;
 // - *apistatus.SessionTokenExpired.
 func (c *Client) ObjectHash(ctx context.Context, prm PrmObjectHash) (*ResObjectHash, error) {
-	req, err := prm.buildRequest(c)
-	if err != nil {
-		return nil, err
+	switch {
+	case prm.addr.GetContainerID() == nil:
+		return nil, errorMissingContainer
+	case prm.addr.GetObjectID() == nil:
+		return nil, errorMissingObject
+	case len(prm.body.GetRanges()) == 0:
+		return nil, errorMissingRanges
 	}

+	prm.body.SetAddress(&prm.addr)
+
+	if prm.csAlgo == v2refs.UnknownChecksum {
+		prm.body.SetType(v2refs.SHA256)
+	} else {
+		prm.body.SetType(prm.csAlgo)
+	}
+
+	var req v2object.GetRangeHashRequest
+	c.prepareRequest(&req, &prm.meta)
+	req.SetBody(&prm.body)
+
 	key := c.prm.key
-	if prm.Key != nil {
-		key = *prm.Key
+	if prm.keySet {
+		key = prm.key
 	}

-	err = signature.SignServiceMessage(&key, req)
+	err := signature.SignServiceMessage(&key, &req)
 	if err != nil {
 		return nil, fmt.Errorf("sign request: %w", err)
 	}

-	resp, err := rpcapi.HashObjectRange(&c.c, req, client.WithContext(ctx))
+	resp, err := rpcapi.HashObjectRange(&c.c, &req, client.WithContext(ctx))
 	if err != nil {
 		return nil, fmt.Errorf("write request: %w", err)
 	}
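A sketch of the range-hash flow under the API above; the flattened (offset, length) pairs and the Tillich-Zemor switch come straight from the setters shown in this file, while the helper name is illustrative:

```go
package main

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// hashRanges requests Tillich-Zemor hashes of two payload ranges; ranges are
// passed to SetRangeList as flattened (offset, length) pairs.
func hashRanges(ctx context.Context, c *client.Client, cnr cid.ID, obj oid.ID) ([][]byte, error) {
	var prm client.PrmObjectHash
	prm.FromContainer(cnr)
	prm.ByID(obj)
	prm.SetRangeList(0, 256, 1024, 512) // ranges [0,256) and [1024,1536)
	prm.TillichZemorAlgo()              // default is SHA256 when unset

	res, err := c.ObjectHash(ctx, prm)
	if err != nil {
		return nil, err
	}

	return res.Checksums(), nil // one checksum per requested range
}
```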

View file

@@ -16,32 +16,30 @@ import (

 // PrmSessionCreate groups parameters of SessionCreate operation.
 type PrmSessionCreate struct {
-	XHeaders []string
+	prmCommonMeta

-	Expiration uint64
+	exp uint64

-	Key *ecdsa.PrivateKey
+	keySet bool
+	key    ecdsa.PrivateKey
 }

 // SetExp sets number of the last NepFS epoch in the lifetime of the session after which it will be expired.
-//
-// Deprecated: Use PrmSessionCreate.Expiration instead.
 func (x *PrmSessionCreate) SetExp(exp uint64) {
-	x.Expiration = exp
+	x.exp = exp
 }

 // UseKey specifies private key to sign the requests and compute token owner.
 // If key is not provided, then Client default key is used.
-//
-// Deprecated: Use PrmSessionCreate.Key instead.
 func (x *PrmSessionCreate) UseKey(key ecdsa.PrivateKey) {
-	x.Key = &key
+	x.keySet = true
+	x.key = key
 }

 func (x *PrmSessionCreate) buildRequest(c *Client) (*v2session.CreateRequest, error) {
 	ownerKey := c.prm.key.PublicKey
-	if x.Key != nil {
-		ownerKey = x.Key.PublicKey
+	if x.keySet {
+		ownerKey = x.key.PublicKey
 	}

 	var ownerID user.ID
 	user.IDFromKey(&ownerID, ownerKey)
@@ -51,10 +49,10 @@ func (x *PrmSessionCreate) buildRequest(c *Client) (*v2session.CreateRequest, er
 	reqBody := new(v2session.CreateRequestBody)
 	reqBody.SetOwnerID(&ownerIDV2)
-	reqBody.SetExpiration(x.Expiration)
+	reqBody.SetExpiration(x.exp)

 	var meta v2session.RequestMetaHeader
-	writeXHeadersToMeta(x.XHeaders, &meta)
+	writeXHeadersToMeta(x.xHeaders, &meta)

 	var req v2session.CreateRequest
 	req.SetBody(reqBody)
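A hedged sketch of opening a session with these parameters; SessionCreate is assumed to be the Client method that consumes PrmSessionCreate, and the epoch arithmetic is illustrative:

```go
package main

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
)

// openSession sketches session establishment; curEpoch would be supplied by
// the caller (e.g. obtained via NetworkInfo).
func openSession(ctx context.Context, c *client.Client, curEpoch uint64) error {
	var prm client.PrmSessionCreate
	prm.SetExp(curEpoch + 100) // last valid epoch: 100 epochs from now

	_, err := c.SessionCreate(ctx, prm) // assumed client method for this operation
	return err
}
```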

View file

@@ -1,8 +1,8 @@
 package cid_test

 import (
-	"crypto/rand"
 	"crypto/sha256"
+	"math/rand"
 	"testing"

 	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"

View file

@@ -1,8 +1,8 @@
 package cidtest

 import (
-	"crypto/rand"
 	"crypto/sha256"
+	"math/rand"

 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 )
@@ -11,7 +11,7 @@ import (
 func ID() cid.ID {
 	checksum := [sha256.Size]byte{}

-	_, _ = rand.Read(checksum[:])
+	rand.Read(checksum[:])

 	return IDWithChecksum(checksum)
 }

View file

@@ -1,7 +1,7 @@
 package frostfscrypto_test

 import (
-	"crypto/rand"
+	"math/rand"
 	"testing"

 	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"

Diffs suppressed for five SVG image files (lines too long); image dimensions and sizes are unchanged before and after: 12 KiB, 23 KiB, 8.8 KiB, 12 KiB, and 11 KiB.

View file

@@ -101,16 +101,16 @@ In a nutshell, a `SELECT` takes a filter result as input and outputs a specific
 Let's see some examples

 ```sql
 -- Selects exactly one node from the entire netmap
 SELECT 1 FROM *

 -- Same as above, but with an identifier for the selection
 SELECT 1 FROM * AS ONE

 -- Selects two nodes from the RedOrBlueNodes filter, such that both selected nodes
 -- share the same value for the Color attribute, i.e. both red or both blue.
 SELECT 2 IN SAME Color FROM RedOrBlueNodes

 -- Selects two nodes from the RedOrBlueNodes filter, such that the selected nodes
 -- have distinct values for the Color attribute, i.e. one red and one blue.
 -- The selection is also given an identifier.
 SELECT 2 IN DISTINCT Color FROM RedOrBlueNodes AS MyNodes
@@ -131,8 +131,7 @@ Its basic syntax is as follows:
 REP <count> {IN <select>}
 ```

-If a select is not specified, then the entire netmap is used as input. The only exception to this rule is when exactly 1 replica and 1 selector are being present: in this case the only selector is being used instead of the whole netmap.
-The resulting nodes will be used to actually store objects and they constitute a replica group (or simply, "a replica").
+If a select is not specified, then the entire netmap is used as input. The resulting nodes will be used to actually store objects and they constitute a replica group (or simply, "a replica").

 Examples
 ```sql
@@ -174,18 +173,18 @@ In additional to this basic syntax, there are a couple of additional useful opti
 ### The policy playground

 > This section assumes you have an up-to-date version of the `frostfs-cli`.

 While simple placement policies have predictable results that can be understood at a glance, more complex ones need careful consideration before deployment. In order to simplify understanding a policy's outcome and experimenting while learning, a builtin tool is provided as part of the `frostfs-cli` for this purpose: the policy playground.

 For the remainder of this guide, we will use the policy playground to setup a virtual netmap (that is, one that doesn't require any networking or deployment) and test various policies. In order to visualize this netmap easily, each node will have three attributes: a character, a shape and a color

 ![Sample Netmap](./image/sample_netmap.svg)

 We can start the policy playground as follows:

 ```sh
 $ frostfs-cli container policy-playground
 >
 ```

 Since we didn't pass any endpoint, the initial netmap is empty, which we can verify with the `ls` command (to list the nodes in the netmap):
@@ -401,7 +400,7 @@ FILTER Color EQ 'Green' AS GreenNodes
 #### Example #6

 ```sql
 REP 1 IN MyNodes
 REP 2
 CBF 2
 SELECT 1 FROM CuteNodes AS MyNodes
 FILTER (Color EQ 'Blue') AND NOT (Shape EQ 'Circle' OR Shape EQ 'Square') AS CuteNodes
@@ -443,4 +442,4 @@ Others:
 - `ls`: list nodes in the current netmap and their attributes
 - `add`: add a node to the current netmap. If it already exists, it will be overwritten.
 - `remove`: remove a node from the current netmap.
 - `eval`: evaluate a placement policy on the current netmap.

View file

@@ -2,7 +2,7 @@ package eacltest

 import (
 	"bytes"
-	"crypto/rand"
+	"math/rand"
 	"testing"

 	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"

View file

@@ -1,7 +1,7 @@
 package eacl

 import (
-	"crypto/rand"
+	"math/rand"
 	"testing"

 	"github.com/stretchr/testify/require"

View file

@@ -23,8 +23,7 @@ type (
 	}

 	minAgg struct {
-		min      float64
-		minFound bool
+		min float64
 	}

 	meanIQRAgg struct {
@@ -103,13 +102,7 @@ func (a *meanAgg) Compute() float64 {
 }

 func (a *minAgg) Add(n float64) {
-	if !a.minFound {
-		a.min = n
-		a.minFound = true
-
-		return
-	}
-
-	if n < a.min {
+	if a.min == 0 || n < a.min {
 		a.min = n
 	}
 }

View file

@@ -1,44 +0,0 @@
-package netmap
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/require"
-)
-
-func TestMinAgg(t *testing.T) {
-	tests := []struct {
-		vals []float64
-		res  float64
-	}{
-		{
-			vals: []float64{1, 2, 3, 0, 10},
-			res:  0,
-		},
-		{
-			vals: []float64{10, 0, 10, 0},
-			res:  0,
-		},
-		{
-			vals: []float64{0, 1, 2, 3, 10},
-			res:  0,
-		},
-		{
-			vals: []float64{0, 0, 0, 0},
-			res:  0,
-		},
-		{
-			vals: []float64{10, 10, 10, 10},
-			res:  10,
-		},
-	}
-
-	for _, test := range tests {
-		a := newMinAgg()
-		for _, val := range test.vals {
-			a.Add(val)
-		}
-
-		require.Equal(t, test.res, a.Compute())
-	}
-}

View file

@@ -40,10 +40,6 @@ type context struct {
 	// policy uses the UNIQUE flag. Nodes marked as used are not used in subsequent
 	// base selections.
 	usedNodes map[uint64]bool
-
-	// If true, returns an error when netmap does not contain enough nodes for selection.
-	// By default best effort is taken.
-	strict bool
 }

 // Various validation errors.

View file

@@ -2,7 +2,6 @@ package netmap

 import (
 	"encoding/json"
-	"fmt"
 	"os"
 	"path/filepath"
 	"testing"
@@ -48,8 +47,7 @@ func TestPlacementPolicy_Interopability(t *testing.T) {
 	require.NoError(t, err)

 	for i := range ds {
-		filename := filepath.Join(testsDir, ds[i].Name())
-		bs, err := os.ReadFile(filename)
+		bs, err := os.ReadFile(filepath.Join(testsDir, ds[i].Name()))
 		require.NoError(t, err)

 		var tc TestCase
@@ -58,7 +56,7 @@ func TestPlacementPolicy_Interopability(t *testing.T) {
 		srcNodes := make([]NodeInfo, len(tc.Nodes))
 		copy(srcNodes, tc.Nodes)

-		t.Run(fmt.Sprintf("%s:%s", filename, tc.Name), func(t *testing.T) {
+		t.Run(tc.Name, func(t *testing.T) {
 			var nm NetMap
 			nm.SetNodes(tc.Nodes)

View file

@@ -146,19 +146,19 @@
   "select 3 nodes in 3 distinct countries, same placement": {
     "policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":1,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]},
     "pivot": "Y29udGFpbmVySUQ=",
-    "result": [[0, 2, 3]],
+    "result": [[4, 0, 7]],
     "placement": {
       "pivot": "b2JqZWN0SUQ=",
-      "result": [[0, 2, 3]]
+      "result": [[4, 0, 7]]
     }
   },
   "select 6 nodes in 3 distinct countries, different placement": {
     "policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":2,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]},
     "pivot": "Y29udGFpbmVySUQ=",
-    "result": [[0, 1, 2, 6, 3, 4]],
+    "result": [[4, 3, 0, 1, 7, 2]],
     "placement": {
       "pivot": "b2JqZWN0SUQ=",
-      "result": [[0, 1, 2, 6, 3, 4]]
+      "result": [[4, 3, 0, 7, 2, 1]]
     }
   }
 }

View file

@@ -1,95 +0,0 @@
-{
-  "name": "non-strict selections",
-  "comment": "These test specify loose selection behaviour, to allow fetching already PUT objects even when there is not enough nodes to select from.",
-  "nodes": [
-    {
-      "attributes": [
-        {
-          "key": "Country",
-          "value": "Russia"
-        }
-      ]
-    },
-    {
-      "attributes": [
-        {
-          "key": "Country",
-          "value": "Germany"
-        }
-      ]
-    },
-    {
-      "attributes": [ ]
-    }
-  ],
-  "tests": {
-    "not enough nodes (backup factor)": {
-      "policy": {
-        "replicas": [
-          {
-            "count": 1,
-            "selector": "MyStore"
-          }
-        ],
-        "containerBackupFactor": 2,
-        "selectors": [
-          {
-            "name": "MyStore",
-            "count": 2,
-            "clause": "DISTINCT",
-            "attribute": "Country",
-            "filter": "FromRU"
-          }
-        ],
-        "filters": [
-          {
-            "name": "FromRU",
-            "key": "Country",
-            "op": "EQ",
-            "value": "Russia",
-            "filters": [ ]
-          }
-        ]
-      },
-      "result": [
-        [
-          0
-        ]
-      ]
-    },
-    "not enough nodes (buckets)": {
-      "policy": {
-        "replicas": [
-          {
-            "count": 1,
-            "selector": "MyStore"
-          }
-        ],
-        "containerBackupFactor": 1,
-        "selectors": [
-          {
-            "name": "MyStore",
-            "count": 2,
-            "clause": "DISTINCT",
-            "attribute": "Country",
-            "filter": "FromRU"
-          }
-        ],
-        "filters": [
-          {
-            "name": "FromRU",
-            "key": "Country",
-            "op": "EQ",
-            "value": "Russia",
-            "filters": [ ]
-          }
-        ]
-      },
-      "result": [
-        [
-          0
-        ]
-      ]
-    }
-  }
-}

View file

@@ -104,14 +104,7 @@
         "selectors": [],
         "filters": []
       },
-      "result": [
-        [
-          0,
-          1,
-          2,
-          3
-        ]
-      ]
+      "error": "not enough nodes"
     }
   }
 }

View file

@@ -24,12 +24,7 @@
   "tests": {
     "missing filter": {
       "policy": {
-        "replicas": [
-          {
-            "count": 1,
-            "selector": "MyStore"
-          }
-        ],
+        "replicas": [],
         "containerBackupFactor": 1,
         "selectors": [
           {
@@ -52,14 +47,9 @@
       },
       "error": "filter not found"
     },
-    "not enough nodes (filter results in empty set)": {
+    "not enough nodes (backup factor)": {
       "policy": {
-        "replicas": [
-          {
-            "count": 1,
-            "selector": "MyStore"
-          }
-        ],
+        "replicas": [],
         "containerBackupFactor": 2,
         "selectors": [
           {
@@ -67,15 +57,40 @@
             "count": 2,
             "clause": "DISTINCT",
             "attribute": "Country",
-            "filter": "FromMoon"
+            "filter": "FromRU"
           }
         ],
         "filters": [
           {
-            "name": "FromMoon",
+            "name": "FromRU",
             "key": "Country",
             "op": "EQ",
-            "value": "Moon",
+            "value": "Russia",
+            "filters": []
+          }
+        ]
+      },
+      "error": "not enough nodes"
+    },
+    "not enough nodes (buckets)": {
+      "policy": {
+        "replicas": [],
+        "containerBackupFactor": 1,
+        "selectors": [
+          {
+            "name": "MyStore",
+            "count": 2,
+            "clause": "DISTINCT",
+            "attribute": "Country",
+            "filter": "FromRU"
+          }
+        ],
+        "filters": [
+          {
+            "name": "FromRU",
+            "key": "Country",
+            "op": "EQ",
+            "value": "Russia",
             "filters": []
           }
         ]

View file

@@ -238,7 +238,7 @@ func (m NetMap) ContainerNodes(p PlacementPolicy, pivot []byte) ([][]NodeInfo, e
 	// marked as used by earlier replicas.
 	for i := range p.replicas {
 		sName := p.replicas[i].GetSelector()
-		if sName == "" && !(len(p.replicas) == 1 && len(p.selectors) == 1) {
+		if sName == "" {
 			var s netmap.Selector
 			s.SetCount(p.replicas[i].GetCount())
 			s.SetFilter(mainFilterName)
@@ -258,9 +258,6 @@ func (m NetMap) ContainerNodes(p PlacementPolicy, pivot []byte) ([][]NodeInfo, e
 		}

 		if p.unique {
-			if c.processedSelectors[sName] == nil {
-				return nil, fmt.Errorf("selector not found: '%s'", sName)
-			}
 			nodes, err := c.getSelection(*c.processedSelectors[sName])
 			if err != nil {
 				return nil, err

View file

@ -551,7 +551,7 @@ func (p *PlacementPolicy) DecodeString(s string) error {
return errors.New("parsed nil value") return errors.New("parsed nil value")
} }
if err := validatePolicy(*parsed); err != nil { if err := validatePolicy(*p); err != nil {
return fmt.Errorf("invalid policy: %w", err) return fmt.Errorf("invalid policy: %w", err)
} }
@ -605,13 +605,6 @@ var (
errUnknownSelector = errors.New("policy: selector not found") errUnknownSelector = errors.New("policy: selector not found")
// errSyntaxError is returned for errors found by ANTLR parser. // errSyntaxError is returned for errors found by ANTLR parser.
errSyntaxError = errors.New("policy: syntax error") errSyntaxError = errors.New("policy: syntax error")
// errRedundantSelector is returned for errors found by selectors policy validator.
errRedundantSelector = errors.New("policy: found redundant selector")
// errUnnamedSelector is returned for errors found by selectors policy validator.
errUnnamedSelector = errors.New("policy: unnamed selectors are useless, " +
"make sure to pair REP and SELECT clauses: \"REP .. IN X\" + \"SELECT ... AS X\"")
// errRedundantSelector is returned for errors found by filters policy validator.
errRedundantFilter = errors.New("policy: found redundant filter")
) )
type policyVisitor struct { type policyVisitor struct {
@ -852,52 +845,25 @@ func (p *policyVisitor) VisitExpr(ctx *parser.ExprContext) any {
// validatePolicy checks high-level constraints such as filter link in SELECT // validatePolicy checks high-level constraints such as filter link in SELECT
// being actually defined in FILTER section. // being actually defined in FILTER section.
func validatePolicy(p PlacementPolicy) error { func validatePolicy(p PlacementPolicy) error {
canOmitNames := len(p.selectors) == 1 && len(p.replicas) == 1
seenFilters := map[string]bool{} seenFilters := map[string]bool{}
expectedFilters := map[string]struct{}{}
for i := range p.filters { for i := range p.filters {
seenFilters[p.filters[i].GetName()] = true seenFilters[p.filters[i].GetName()] = true
for _, f := range p.filters[i].GetFilters() {
if f.GetName() != "" {
expectedFilters[f.GetName()] = struct{}{}
}
}
} }
seenSelectors := map[string]*netmap.Selector{} seenSelectors := map[string]bool{}
for i := range p.selectors { for i := range p.selectors {
if p.selectors[i].GetName() == "" && !canOmitNames { if flt := p.selectors[i].GetFilter(); flt != mainFilterName && !seenFilters[flt] {
return errUnnamedSelector return fmt.Errorf("%w: '%s'", errUnknownFilter, flt)
} }
if flt := p.selectors[i].GetFilter(); flt != mainFilterName {
expectedFilters[flt] = struct{}{} seenSelectors[p.selectors[i].GetName()] = true
if !seenFilters[flt] {
return fmt.Errorf("%w: '%s'", errUnknownFilter, flt)
}
}
seenSelectors[p.selectors[i].GetName()] = &p.selectors[i]
} }
for _, f := range p.filters {
if _, ok := expectedFilters[f.GetName()]; !ok {
return fmt.Errorf("%w: '%s'", errRedundantFilter, f.GetName())
}
}
expectedSelectors := map[string]struct{}{}
for i := range p.replicas { for i := range p.replicas {
selName := p.replicas[i].GetSelector() if sel := p.replicas[i].GetSelector(); sel != "" && !seenSelectors[sel] {
if selName != "" || canOmitNames { return fmt.Errorf("%w: '%s'", errUnknownSelector, sel)
expectedSelectors[selName] = struct{}{}
if seenSelectors[selName] == nil {
return fmt.Errorf("%w: '%s'", errUnknownSelector, selName)
}
}
}
for _, s := range p.selectors {
if _, ok := expectedSelectors[s.GetName()]; !ok {
return fmt.Errorf("%w: to use selector '%s' use keyword IN", errRedundantSelector, s.GetName())
} }
} }
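
For reference, a minimal runnable sketch of the cross-reference checks the left-hand validatePolicy performs: every SELECT must name a declared FILTER, every REP a declared SELECT, and declarations nothing references are rejected as redundant. The filter/selector/replica types below are illustrative stand-ins, not the SDK's:

package main

import (
	"errors"
	"fmt"
)

type filter struct{ name string }
type selector struct{ name, filter string }
type replica struct{ selector string }

var (
	errUnknownFilter     = errors.New("policy: filter not found")
	errUnknownSelector   = errors.New("policy: selector not found")
	errRedundantFilter   = errors.New("policy: found redundant filter")
	errRedundantSelector = errors.New("policy: found redundant selector")
)

// validate mirrors the left-hand cross-reference logic; "*" stands in for
// the implicit main filter name.
func validate(fs []filter, ss []selector, rs []replica) error {
	seenFilters := map[string]bool{}
	for _, f := range fs {
		seenFilters[f.name] = true
	}
	usedFilters := map[string]bool{}
	seenSelectors := map[string]bool{}
	for _, s := range ss {
		if s.filter != "*" {
			usedFilters[s.filter] = true
			if !seenFilters[s.filter] {
				return fmt.Errorf("%w: '%s'", errUnknownFilter, s.filter)
			}
		}
		seenSelectors[s.name] = true
	}
	for _, f := range fs {
		if !usedFilters[f.name] {
			return fmt.Errorf("%w: '%s'", errRedundantFilter, f.name)
		}
	}
	usedSelectors := map[string]bool{}
	for _, r := range rs {
		if r.selector != "" {
			usedSelectors[r.selector] = true
			if !seenSelectors[r.selector] {
				return fmt.Errorf("%w: '%s'", errUnknownSelector, r.selector)
			}
		}
	}
	for _, s := range ss {
		if !usedSelectors[s.name] {
			return fmt.Errorf("%w: to use selector '%s' use keyword IN", errRedundantSelector, s.name)
		}
	}
	return nil
}

func main() {
	// SELECT ... FROM F AS X is declared, but no "REP n IN X" uses it.
	err := validate(
		[]filter{{name: "F"}},
		[]selector{{name: "X", filter: "F"}},
		[]replica{{selector: ""}},
	)
	fmt.Println(err) // policy: found redundant selector: to use selector 'X' use keyword IN
}
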

View file

@ -1,65 +0,0 @@
package netmap
import (
"strings"
"testing"
"github.com/stretchr/testify/require"
)
func TestDecodeString(t *testing.T) {
testCases := []string{
`REP 2
CBF 2
SELECT 2 FROM *`,
`REP 1 IN X
CBF 1
SELECT 2 IN SAME Location FROM * AS X`,
`REP 1 IN X
REP 2 IN Y
CBF 1
SELECT 2 FROM * AS X
SELECT 3 FROM * AS Y`,
`REP 1 IN X
SELECT 2 IN City FROM Good AS X
FILTER Country EQ RU AS FromRU
FILTER Country EQ EN AS FromEN
FILTER @FromRU AND @FromEN AND Rating GT 7 AS Good`,
`REP 7 IN SPB
SELECT 1 IN City FROM SPBSSD AS SPB
FILTER City EQ SPB AND SSD EQ true OR City EQ SPB AND Rating GE 5 AS SPBSSD`,
`REP 7 IN SPB
SELECT 1 IN City FROM SPBSSD AS SPB
FILTER NOT (NOT (City EQ SPB) AND SSD EQ true OR City EQ SPB AND Rating GE 5) AS SPBSSD`,
`UNIQUE
REP 1
REP 1`,
}
var p PlacementPolicy
for _, testCase := range testCases {
require.NoError(t, p.DecodeString(testCase), "unable parse %s", testCase)
var b strings.Builder
require.NoError(t, p.WriteStringTo(&b))
require.Equal(t, testCase, b.String())
}
invalidTestCases := map[string]error{
`?REP 1`: errSyntaxError,
`REP 1 trailing garbage`: errSyntaxError,
`REP 1 REP 1 SELECT 4 FROM *`: errUnnamedSelector,
`REP 1 SELECT 4 FROM * SELECT 1 FROM *`: errUnnamedSelector,
`REP 1 IN X SELECT 4 FROM *`: errUnknownSelector,
`REP 1 IN X REP 2 SELECT 4 FROM * AS X FILTER 'UN-LOCODE' EQ 'RU LED' AS F`: errRedundantFilter,
}
for i := range invalidTestCases {
require.ErrorIs(t, p.DecodeString(i), invalidTestCases[i], "#%s", i)
}
}

View file

@ -1,6 +1,7 @@
package netmap_test package netmap_test
import ( import (
"strings"
"testing" "testing"
. "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" . "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@ -8,6 +9,55 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestEncode(t *testing.T) {
testCases := []string{
`REP 1 IN X
CBF 1
SELECT 2 IN SAME Location FROM * AS X`,
`REP 1
SELECT 2 IN City FROM Good
FILTER Country EQ RU AS FromRU
FILTER @FromRU AND Rating GT 7 AS Good`,
`REP 7 IN SPB
SELECT 1 IN City FROM SPBSSD AS SPB
FILTER City EQ SPB AND SSD EQ true OR City EQ SPB AND Rating GE 5 AS SPBSSD`,
`REP 7 IN SPB
SELECT 1 IN City FROM SPBSSD AS SPB
FILTER NOT (NOT (City EQ SPB) AND SSD EQ true OR City EQ SPB AND Rating GE 5) AS SPBSSD`,
`UNIQUE
REP 1
REP 1`,
`REP 1 IN X
SELECT 1 FROM F AS X
FILTER 'UN-LOCODE' EQ 'RU LED' AS F`,
}
var p PlacementPolicy
for _, testCase := range testCases {
require.NoError(t, p.DecodeString(testCase))
var b strings.Builder
require.NoError(t, p.WriteStringTo(&b))
require.Equal(t, testCase, b.String())
}
invalidTestCases := []string{
`?REP 1`,
`REP 1 trailing garbage`,
}
for i := range invalidTestCases {
require.Error(t, p.DecodeString(invalidTestCases[i]), "#%d", i)
}
}
func TestPlacementPolicyEncoding(t *testing.T) { func TestPlacementPolicyEncoding(t *testing.T) {
v := netmaptest.PlacementPolicy() v := netmaptest.PlacementPolicy()

View file

@ -60,7 +60,7 @@ func (c *context) getSelection(s netmap.Selector) ([]nodes, error) {
bucketCount, nodesInBucket := calcNodesCount(s) bucketCount, nodesInBucket := calcNodesCount(s)
buckets := c.getSelectionBase(s) buckets := c.getSelectionBase(s)
if c.strict && len(buckets) < bucketCount { if len(buckets) < bucketCount {
return nil, fmt.Errorf("%w: '%s'", errNotEnoughNodes, s.GetName()) return nil, fmt.Errorf("%w: '%s'", errNotEnoughNodes, s.GetName())
} }
@ -96,7 +96,7 @@ func (c *context) getSelection(s netmap.Selector) ([]nodes, error) {
if len(res) < bucketCount { if len(res) < bucketCount {
// Fallback to using minimum allowed backup factor (1). // Fallback to using minimum allowed backup factor (1).
res = append(res, fallback...) res = append(res, fallback...)
if c.strict && len(res) < bucketCount { if len(res) < bucketCount {
return nil, fmt.Errorf("%w: '%s'", errNotEnoughNodes, s.GetName()) return nil, fmt.Errorf("%w: '%s'", errNotEnoughNodes, s.GetName())
} }
} }
@ -110,13 +110,6 @@ func (c *context) getSelection(s netmap.Selector) ([]nodes, error) {
hrw.SortHasherSliceByWeightValue(res, weights, c.hrwSeedHash) hrw.SortHasherSliceByWeightValue(res, weights, c.hrwSeedHash)
} }
if len(res) < bucketCount {
if len(res) == 0 {
return nil, errNotEnoughNodes
}
bucketCount = len(res)
}
if s.GetAttribute() == "" { if s.GetAttribute() == "" {
res, fallback = res[:bucketCount], res[bucketCount:] res, fallback = res[:bucketCount], res[bucketCount:]
for i := range fallback { for i := range fallback {
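
The left-hand side only treats a short selection as fatal when the context is strict, and otherwise clamps the bucket count to what is actually available (failing only when nothing was selected at all). A hedged sketch of that relaxation; pick and the string-slice nodes are stand-ins for getSelection's internals:

package main

import (
	"errors"
	"fmt"
)

var errNotEnoughNodes = errors.New("not enough nodes")

// pick returns an error for a short result only in strict mode; in relaxed
// mode the requested bucket count is clamped to the available nodes.
func pick(res []string, bucketCount int, strict bool) ([]string, error) {
	if strict && len(res) < bucketCount {
		return nil, errNotEnoughNodes
	}
	if len(res) < bucketCount {
		if len(res) == 0 {
			return nil, errNotEnoughNodes
		}
		bucketCount = len(res)
	}
	return res[:bucketCount], nil
}

func main() {
	nodes := []string{"n1", "n2"}

	got, err := pick(nodes, 3, false) // relaxed: clamp to the 2 available nodes
	fmt.Println(got, err)             // [n1 n2] <nil>

	_, err = pick(nodes, 3, true) // strict: fail
	fmt.Println(err)              // not enough nodes
}
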

View file

@ -1,10 +1,9 @@
package netmap package netmap
import ( import (
"crypto/rand"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
mrand "math/rand" "math/rand"
"sort" "sort"
"strconv" "strconv"
"testing" "testing"
@ -29,10 +28,10 @@ func BenchmarkHRWSort(b *testing.B) {
node.SetPublicKey(key) node.SetPublicKey(key)
vectors[i] = nodes{node} vectors[i] = nodes{node}
weights[i] = float64(mrand.Uint32()%10) / 10.0 weights[i] = float64(rand.Uint32()%10) / 10.0
} }
pivot := mrand.Uint64() pivot := rand.Uint64()
b.Run("sort by index, no weight", func(b *testing.B) { b.Run("sort by index, no weight", func(b *testing.B) {
realNodes := make([]nodes, netmapSize) realNodes := make([]nodes, netmapSize)
b.ResetTimer() b.ResetTimer()
@ -283,55 +282,6 @@ func TestPlacementPolicy_Unique(t *testing.T) {
} }
} }
func TestPlacementPolicy_SingleOmitNames(t *testing.T) {
nodes := []NodeInfo{
nodeInfoFromAttributes("ID", "1", "Country", "Russia", "City", "SPB"),
nodeInfoFromAttributes("ID", "2", "Country", "Germany", "City", "Berlin"),
nodeInfoFromAttributes("ID", "3", "Country", "Russia", "City", "Moscow"),
nodeInfoFromAttributes("ID", "4", "Country", "France", "City", "Paris"),
nodeInfoFromAttributes("ID", "5", "Country", "France", "City", "Lyon"),
nodeInfoFromAttributes("ID", "6", "Country", "Russia", "City", "SPB"),
nodeInfoFromAttributes("ID", "7", "Country", "Russia", "City", "Moscow"),
nodeInfoFromAttributes("ID", "8", "Country", "Germany", "City", "Darmstadt"),
nodeInfoFromAttributes("ID", "9", "Country", "Germany", "City", "Frankfurt"),
nodeInfoFromAttributes("ID", "10", "Country", "Russia", "City", "SPB"),
nodeInfoFromAttributes("ID", "11", "Country", "Russia", "City", "Moscow"),
nodeInfoFromAttributes("ID", "12", "Country", "Germany", "City", "London"),
}
for i := range nodes {
pub := make([]byte, 33)
rand.Read(pub)
nodes[i].SetPublicKey(pub)
}
var nm NetMap
nm.SetNodes(nodes)
for _, unique := range []bool{false, true} {
t.Run(fmt.Sprintf("unique=%t", unique), func(t *testing.T) {
ssNamed := []Selector{newSelector("X", "City", 2, "FromRU", (*Selector).SelectDistinct)}
fsNamed := []Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}
rsNamed := []ReplicaDescriptor{newReplica(1, "X")}
pNamed := newPlacementPolicy(3, rsNamed, ssNamed, fsNamed)
pNamed.unique = unique
vNamed, err := nm.ContainerNodes(pNamed, []byte{1})
require.NoError(t, err)
ssUnnamed := []Selector{newSelector("", "City", 2, "FromRU", (*Selector).SelectDistinct)}
fsUnnamed := []Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}
rsUnnamed := []ReplicaDescriptor{newReplica(1, "")}
pUnnamed := newPlacementPolicy(3, rsUnnamed, ssUnnamed, fsUnnamed)
pUnnamed.unique = unique
vUnnamed, err := nm.ContainerNodes(pUnnamed, []byte{1})
require.NoError(t, err)
require.Equal(t, vNamed, vUnnamed)
})
}
}
func TestPlacementPolicy_MultiREP(t *testing.T) { func TestPlacementPolicy_MultiREP(t *testing.T) {
nodes := []NodeInfo{ nodes := []NodeInfo{
nodeInfoFromAttributes("ID", "1", "Country", "Russia", "City", "SPB"), nodeInfoFromAttributes("ID", "1", "Country", "Russia", "City", "SPB"),

View file

@ -1,7 +1,7 @@
package netmaptest package netmaptest
import ( import (
"crypto/rand" "math/rand"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
) )
@ -70,7 +70,7 @@ func NetworkInfo() (x netmap.NetworkInfo) {
// NodeInfo returns random netmap.NodeInfo. // NodeInfo returns random netmap.NodeInfo.
func NodeInfo() (x netmap.NodeInfo) { func NodeInfo() (x netmap.NodeInfo) {
key := make([]byte, 33) key := make([]byte, 33)
_, _ = rand.Read(key) rand.Read(key)
x.SetPublicKey(key) x.SetPublicKey(key)
x.SetNetworkEndpoints("1", "2", "3") x.SetNetworkEndpoints("1", "2", "3")

View file

@ -1,10 +1,10 @@
package ns package ns
import ( import (
"crypto/rand"
"errors" "errors"
"fmt" "fmt"
"math/big" "math/big"
"math/rand"
"strings" "strings"
"testing" "testing"

View file

@ -1,8 +1,8 @@
package oidtest package oidtest
import ( import (
"crypto/rand"
"crypto/sha256" "crypto/sha256"
"math/rand"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -12,7 +12,7 @@ import (
func ID() oid.ID { func ID() oid.ID {
checksum := [sha256.Size]byte{} checksum := [sha256.Size]byte{}
_, _ = rand.Read(checksum[:]) rand.Read(checksum[:])
return idWithChecksum(checksum) return idWithChecksum(checksum)
} }
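
The left-hand side moves the test helpers from math/rand to crypto/rand; crypto/rand.Read draws from the OS CSPRNG, and the `_, _ =` form deliberately discards its result, since a failure there is effectively fatal in tests. A minimal illustration:

package main

import (
	"crypto/rand"
	"fmt"
)

func main() {
	// crypto/rand.Read fills the slice from the OS CSPRNG; on supported
	// platforms an error here is effectively fatal, so test helpers may
	// deliberately ignore it with the `_, _ =` form.
	buf := make([]byte, 32)
	_, _ = rand.Read(buf)
	fmt.Printf("%x\n", buf)
}
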

View file

@ -1,8 +1,8 @@
package object_test package object_test
import ( import (
"crypto/rand"
"crypto/sha256" "crypto/sha256"
"math/rand"
"testing" "testing"
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"

View file

@ -1,8 +1,8 @@
package object package object
import ( import (
"crypto/rand"
"crypto/sha256" "crypto/sha256"
"math/rand"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/tombstone" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/tombstone"

View file

@ -69,7 +69,3 @@ func (c *sessionCache) expired(val *cacheValue) bool {
// use epoch+1 (clear cache beforehand) to prevent 'expired session token' error right after epoch tick // use epoch+1 (clear cache beforehand) to prevent 'expired session token' error right after epoch tick
return val.token.ExpiredAt(epoch + 1) return val.token.ExpiredAt(epoch + 1)
} }
func (c *sessionCache) Epoch() uint64 {
return c.currentEpoch.Load()
}

View file

@ -189,7 +189,3 @@ func (m *mockClient) restartIfUnhealthy(ctx context.Context) (healthy bool, chan
} }
return return
} }
func (m *mockClient) close() error {
return nil
}

View file

@ -1,172 +0,0 @@
package pool
import (
"context"
sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type logger interface {
log(level zapcore.Level, msg string, fields ...zap.Field)
}
type PrmObjectPutClientCutInit struct {
PrmObjectPut
}
func (c *clientWrapper) objectPutInitTransformer(prm PrmObjectPutClientCutInit) (*objectWriterTransformer, error) {
cl, err := c.getClient()
if err != nil {
return nil, err
}
var w objectWriterTransformer
w.it = internalTarget{
client: cl,
prm: prm,
address: c.address(),
logger: &c.clientStatusMonitor,
}
key := &c.prm.key
if prm.key != nil {
key = prm.key
}
w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{
Key: key,
NextTargetInit: func() transformer.ObjectWriter { return &w.it },
MaxSize: prm.networkInfo.MaxObjectSize(),
WithoutHomomorphicHash: prm.withoutHomomorphicHash,
NetworkState: prm.networkInfo,
SessionToken: prm.stoken,
})
return &w, nil
}
type objectWriterTransformer struct {
ot transformer.ChunkedObjectWriter
it internalTarget
err error
}
func (x *objectWriterTransformer) WriteHeader(ctx context.Context, hdr object.Object) bool {
x.err = x.ot.WriteHeader(ctx, &hdr)
return x.err == nil
}
func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk []byte) bool {
_, x.err = x.ot.Write(ctx, chunk)
return x.err == nil
}
// ResObjectPut groups the final result values of ObjectPutInit operation.
type ResObjectPut struct {
Status apistatus.Status
OID oid.ID
}
// Close returns a non-nil result in any case. If an error occurred, the result contains only a buffer for further reuse.
func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) {
ai, err := x.ot.Close(ctx)
if err != nil {
return nil, err
}
if ai != nil && ai.ParentID != nil {
x.it.res.OID = *ai.ParentID
}
return &x.it.res, nil
}
type internalTarget struct {
client *sdkClient.Client
res ResObjectPut
prm PrmObjectPutClientCutInit
useStream bool
address string
logger logger
resolveFrostFSErrors bool
}
func (it *internalTarget) WriteObject(ctx context.Context, o *object.Object) error {
putSingleImplemented, err := it.tryPutSingle(ctx, o)
if putSingleImplemented {
return err
}
it.logger.log(zapcore.DebugLevel, "putSingle not implemented, trying put as stream", zap.String("address", it.address))
it.useStream = true
return it.putAsStream(ctx, o)
}
func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error {
var cliPrm sdkClient.PrmObjectPutInit
cliPrm.SetCopiesNumberByVectors(it.prm.copiesNumber)
if it.prm.stoken != nil {
cliPrm.WithinSession(*it.prm.stoken)
}
if it.prm.key != nil {
cliPrm.UseKey(*it.prm.key)
}
if it.prm.btoken != nil {
cliPrm.WithBearerToken(*it.prm.btoken)
}
wrt, err := it.client.ObjectPutInit(ctx, cliPrm)
if err != nil {
return err
}
if wrt.WriteHeader(ctx, *o) {
wrt.WritePayloadChunk(ctx, o.Payload())
}
res, err := wrt.Close(ctx)
if res != nil {
it.res.Status = res.Status()
it.res.OID = res.StoredObjectID()
}
return err
}
func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (bool, error) {
if it.useStream {
return false, nil
}
var cliPrm sdkClient.PrmObjectPutSingle
cliPrm.SetCopiesNumber(it.prm.copiesNumber)
cliPrm.UseKey(it.prm.key)
if it.prm.stoken != nil {
cliPrm.WithinSession(*it.prm.stoken)
}
if it.prm.btoken != nil {
cliPrm.WithBearerToken(*it.prm.btoken)
}
cliPrm.SetObject(o.ToV2())
res, err := it.client.ObjectPutSingle(ctx, cliPrm)
if err != nil && status.Code(err) == codes.Unimplemented {
return false, err
}
if err == nil {
id, _ := o.ID()
it.res = ResObjectPut{
Status: res.Status(),
OID: id,
}
if !it.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.Status) {
return true, apistatus.ErrFromStatus(it.res.Status)
}
return true, nil
}
return true, err
}

View file

@ -78,16 +78,12 @@ type client interface {
dial(ctx context.Context) error dial(ctx context.Context) error
// see clientWrapper.restartIfUnhealthy. // see clientWrapper.restartIfUnhealthy.
restartIfUnhealthy(ctx context.Context) (bool, bool) restartIfUnhealthy(ctx context.Context) (bool, bool)
// see clientWrapper.close.
close() error
} }
// clientStatus provides access to some metrics for connection. // clientStatus provides access to some metrics for connection.
type clientStatus interface { type clientStatus interface {
// isHealthy checks if the connection can handle requests. // isHealthy checks if the connection can handle requests.
isHealthy() bool isHealthy() bool
// isDialed checks if the connection was created.
isDialed() bool
// setUnhealthy marks client as unhealthy. // setUnhealthy marks client as unhealthy.
setUnhealthy() setUnhealthy()
// address return address of endpoint. // address return address of endpoint.
@ -109,7 +105,7 @@ var errPoolClientUnhealthy = errors.New("pool client unhealthy")
type clientStatusMonitor struct { type clientStatusMonitor struct {
logger *zap.Logger logger *zap.Logger
addr string addr string
healthy *atomic.Uint32 healthy *atomic.Bool
errorThreshold uint32 errorThreshold uint32
mu sync.RWMutex // protect counters mu sync.RWMutex // protect counters
@ -118,22 +114,6 @@ type clientStatusMonitor struct {
methods []*methodStatus methods []*methodStatus
} }
// values for healthy status of clientStatusMonitor.
const (
// statusUnhealthyOnDial is set when dialing the endpoint has failed,
// so there is no connection to the endpoint, and the pool should not close it
// before re-establishing the connection once again.
statusUnhealthyOnDial = iota
// statusUnhealthyOnRequest is set when communication after dialing the
// endpoint has failed due to immediate or accumulated errors; the connection is
// available and the pool should close it before re-establishing the connection once again.
statusUnhealthyOnRequest
// statusHealthy is set when connection is ready to be used by the pool.
statusHealthy
)
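
The left-hand side widens healthy from atomic.Bool to atomic.Uint32 so that isDialed can tell the two unhealthy causes apart. A compact sketch of that tri-state; monitor is a stand-in for clientStatusMonitor:

package main

import (
	"fmt"
	"sync/atomic"
)

const (
	statusUnhealthyOnDial    = iota // no connection was established; nothing to close
	statusUnhealthyOnRequest        // connection exists but fails; close before redialing
	statusHealthy                   // connection is ready for use
)

type monitor struct{ healthy atomic.Uint32 }

func (m *monitor) isHealthy() bool { return m.healthy.Load() == statusHealthy }
func (m *monitor) isDialed() bool  { return m.healthy.Load() != statusUnhealthyOnDial }

func main() {
	var m monitor
	m.healthy.Store(statusUnhealthyOnDial)
	fmt.Println(m.isHealthy(), m.isDialed()) // false false

	m.healthy.Store(statusUnhealthyOnRequest)
	fmt.Println(m.isHealthy(), m.isDialed()) // false true
}
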
// methodStatus provides statistics for a specific method. // methodStatus provides statistics for a specific method.
type methodStatus struct { type methodStatus struct {
name string name string
@ -215,8 +195,8 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint
methods[i] = &methodStatus{name: i.String()} methods[i] = &methodStatus{name: i.String()}
} }
healthy := new(atomic.Uint32) healthy := new(atomic.Bool)
healthy.Store(statusHealthy) healthy.Store(true)
return clientStatusMonitor{ return clientStatusMonitor{
logger: logger, logger: logger,
@ -272,11 +252,6 @@ func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) {
x.key = key x.key = key
} }
// setLogger sets sdkClient.Client logger.
func (x *wrapperPrm) setLogger(logger *zap.Logger) {
x.logger = logger
}
// setDialTimeout sets the timeout for connection to be established. // setDialTimeout sets the timeout for connection to be established.
func (x *wrapperPrm) setDialTimeout(timeout time.Duration) { func (x *wrapperPrm) setDialTimeout(timeout time.Duration) {
x.dialTimeout = timeout x.dialTimeout = timeout
@ -342,7 +317,7 @@ func (c *clientWrapper) dial(ctx context.Context) error {
prmDial.SetGRPCDialOptions(c.prm.dialOptions...) prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
if err = cl.Dial(ctx, prmDial); err != nil { if err = cl.Dial(ctx, prmDial); err != nil {
c.setUnhealthyOnDial() c.setUnhealthy()
return err return err
} }
@ -359,12 +334,6 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
wasHealthy = true wasHealthy = true
} }
// if connection is dialed before, to avoid routine / connection leak,
// pool has to close it and then initialize once again.
if c.isDialed() {
_ = c.close()
}
var cl sdkClient.Client var cl sdkClient.Client
var prmInit sdkClient.PrmInit var prmInit sdkClient.PrmInit
prmInit.SetDefaultPrivateKey(c.prm.key) prmInit.SetDefaultPrivateKey(c.prm.key)
@ -379,7 +348,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
prmDial.SetGRPCDialOptions(c.prm.dialOptions...) prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
if err := cl.Dial(ctx, prmDial); err != nil { if err := cl.Dial(ctx, prmDial); err != nil {
c.setUnhealthyOnDial() c.setUnhealthy()
return false, wasHealthy return false, wasHealthy
} }
@ -580,9 +549,11 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
return err return err
} }
cliPrm := sdkClient.PrmContainerSetEACL{ var cliPrm sdkClient.PrmContainerSetEACL
Table: &prm.Table, cliPrm.SetTable(prm.table)
Session: prm.Session,
if prm.sessionSet {
cliPrm.WithinSession(prm.session)
} }
start := time.Now() start := time.Now()
@ -596,19 +567,16 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
return fmt.Errorf("set eacl on client: %w", err) return fmt.Errorf("set eacl on client: %w", err)
} }
if prm.WaitParams == nil { if !prm.waitParamsSet {
prm.WaitParams = defaultWaitParams() prm.waitParams.setDefaults()
}
if err := prm.WaitParams.CheckValidity(); err != nil {
return fmt.Errorf("invalid wait parameters: %w", err)
} }
var cIDp *cid.ID var cIDp *cid.ID
if cID, set := prm.Table.CID(); set { if cID, set := prm.table.CID(); set {
cIDp = &cID cIDp = &cID
} }
err = waitForEACLPresence(ctx, c, cIDp, &prm.Table, prm.WaitParams) err = waitForEACLPresence(ctx, c, cIDp, &prm.table, &prm.waitParams)
if err = c.handleError(ctx, nil, err); err != nil { if err = c.handleError(ctx, nil, err); err != nil {
return fmt.Errorf("wait eacl presence on client: %w", err) return fmt.Errorf("wait eacl presence on client: %w", err)
} }
@ -660,18 +628,6 @@ func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (netm
// objectPut writes object to FrostFS. // objectPut writes object to FrostFS.
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) { func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
if prm.bufferMaxSize == 0 {
prm.bufferMaxSize = defaultBufferMaxSizeForPut
}
if prm.clientCut {
return c.objectPutClientCut(ctx, prm)
}
return c.objectPutServerCut(ctx, prm)
}
func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
cl, err := c.getClient() cl, err := c.getClient()
if err != nil { if err != nil {
return oid.ID{}, err return oid.ID{}, err
@ -709,8 +665,10 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut
} }
if prm.payload != nil { if prm.payload != nil {
if sz == 0 || sz > prm.bufferMaxSize { const defaultBufferSizePut = 3 << 20 // configure?
sz = prm.bufferMaxSize
if sz == 0 || sz > defaultBufferSizePut {
sz = defaultBufferSizePut
} }
buf := make([]byte, sz) buf := make([]byte, sz)
@ -751,73 +709,6 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut
return res.StoredObjectID(), nil return res.StoredObjectID(), nil
} }
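
On the left, the fixed 3 MB put buffer becomes configurable through prm.bufferMaxSize, with defaultBufferMaxSizeForPut as the fallback. A sketch of the sizing rule; bufSize is a hypothetical helper extracted for illustration:

package main

import "fmt"

const defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB

// bufSize mirrors the buffer sizing around the payload reader: an explicitly
// known object size wins only when it is non-zero and below the cap.
func bufSize(objSize, maxSize uint64) uint64 {
	if maxSize == 0 {
		maxSize = defaultBufferMaxSizeForPut
	}
	if objSize == 0 || objSize > maxSize {
		return maxSize
	}
	return objSize
}

func main() {
	fmt.Println(bufSize(0, 0))          // 3145728: unknown size, default cap
	fmt.Println(bufSize(1024, 0))       // 1024: small object, exact buffer
	fmt.Println(bufSize(10<<20, 0))     // 3145728: large object, capped
	fmt.Println(bufSize(10<<20, 8<<20)) // 8388608: custom cap from SetBufferMaxSize
}
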
func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
putInitPrm := PrmObjectPutClientCutInit{
PrmObjectPut: prm,
}
start := time.Now()
wObj, err := c.objectPutInitTransformer(putInitPrm)
c.incRequests(time.Since(start), methodObjectPut)
if err = c.handleError(ctx, nil, err); err != nil {
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
}
if wObj.WriteHeader(ctx, prm.hdr) {
sz := prm.hdr.PayloadSize()
if data := prm.hdr.Payload(); len(data) > 0 {
if prm.payload != nil {
prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
} else {
prm.payload = bytes.NewReader(data)
sz = uint64(len(data))
}
}
if prm.payload != nil {
if sz == 0 || sz > prm.bufferMaxSize {
sz = prm.bufferMaxSize
}
buf := make([]byte, sz)
var n int
for {
n, err = prm.payload.Read(buf)
if n > 0 {
start = time.Now()
successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
c.incRequests(time.Since(start), methodObjectPut)
if !successWrite {
break
}
continue
}
if errors.Is(err, io.EOF) {
break
}
return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
}
}
}
res, err := wObj.Close(ctx)
var st apistatus.Status
if res != nil {
st = res.Status
}
if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
return oid.ID{}, fmt.Errorf("client failure: %w", err)
}
return res.OID, nil
}
// objectDelete invokes sdkClient.ObjectDelete and parses the response status into an error. // objectDelete invokes sdkClient.ObjectDelete and parses the response status into an error.
func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) error { func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) error {
cl, err := c.getClient() cl, err := c.getClient()
@ -825,15 +716,20 @@ func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) e
return err return err
} }
cnr := prm.addr.Container() var cliPrm sdkClient.PrmObjectDelete
obj := prm.addr.Object() cliPrm.FromContainer(prm.addr.Container())
cliPrm.ByID(prm.addr.Object())
cliPrm := sdkClient.PrmObjectDelete{ if prm.stoken != nil {
BearerToken: prm.btoken, cliPrm.WithinSession(*prm.stoken)
Session: prm.stoken, }
ContainerID: &cnr,
ObjectID: &obj, if prm.btoken != nil {
Key: prm.key, cliPrm.WithBearerToken(*prm.btoken)
}
if prm.key != nil {
cliPrm.UseKey(*prm.key)
} }
start := time.Now() start := time.Now()
@ -856,15 +752,20 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGet
return ResGetObject{}, err return ResGetObject{}, err
} }
prmCnr := prm.addr.Container() var cliPrm sdkClient.PrmObjectGet
prmObj := prm.addr.Object() cliPrm.FromContainer(prm.addr.Container())
cliPrm.ByID(prm.addr.Object())
cliPrm := sdkClient.PrmObjectGet{ if prm.stoken != nil {
BearerToken: prm.btoken, cliPrm.WithinSession(*prm.stoken)
Session: prm.stoken, }
ContainerID: &prmCnr,
ObjectID: &prmObj, if prm.btoken != nil {
Key: prm.key, cliPrm.WithBearerToken(*prm.btoken)
}
if prm.key != nil {
cliPrm.UseKey(*prm.key)
} }
var res ResGetObject var res ResGetObject
@ -904,16 +805,23 @@ func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (obje
return object.Object{}, err return object.Object{}, err
} }
prmCnr := prm.addr.Container() var cliPrm sdkClient.PrmObjectHead
prmObj := prm.addr.Object() cliPrm.FromContainer(prm.addr.Container())
cliPrm.ByID(prm.addr.Object())
if prm.raw {
cliPrm.MarkRaw()
}
cliPrm := sdkClient.PrmObjectHead{ if prm.stoken != nil {
BearerToken: prm.btoken, cliPrm.WithinSession(*prm.stoken)
Session: prm.stoken, }
Raw: prm.raw,
ContainerID: &prmCnr, if prm.btoken != nil {
ObjectID: &prmObj, cliPrm.WithBearerToken(*prm.btoken)
Key: prm.key, }
if prm.key != nil {
cliPrm.UseKey(*prm.key)
} }
var obj object.Object var obj object.Object
@ -942,17 +850,22 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (Re
return ResObjectRange{}, err return ResObjectRange{}, err
} }
prmCnr := prm.addr.Container() var cliPrm sdkClient.PrmObjectRange
prmObj := prm.addr.Object() cliPrm.FromContainer(prm.addr.Container())
cliPrm.ByID(prm.addr.Object())
cliPrm.SetOffset(prm.off)
cliPrm.SetLength(prm.ln)
cliPrm := sdkClient.PrmObjectRange{ if prm.stoken != nil {
BearerToken: prm.btoken, cliPrm.WithinSession(*prm.stoken)
Session: prm.stoken, }
ContainerID: &prmCnr,
ObjectID: &prmObj, if prm.btoken != nil {
Offset: prm.off, cliPrm.WithBearerToken(*prm.btoken)
Length: prm.ln, }
Key: prm.key,
if prm.key != nil {
cliPrm.UseKey(*prm.key)
} }
start := time.Now() start := time.Now()
@ -1009,10 +922,9 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
return resCreateSession{}, err return resCreateSession{}, err
} }
cliPrm := sdkClient.PrmSessionCreate{ var cliPrm sdkClient.PrmSessionCreate
Expiration: prm.exp, cliPrm.SetExp(prm.exp)
Key: &prm.key, cliPrm.UseKey(prm.key)
}
start := time.Now() start := time.Now()
res, err := cl.SessionCreate(ctx, cliPrm) res, err := cl.SessionCreate(ctx, cliPrm)
@ -1032,23 +944,15 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
} }
func (c *clientStatusMonitor) isHealthy() bool { func (c *clientStatusMonitor) isHealthy() bool {
return c.healthy.Load() == statusHealthy return c.healthy.Load()
}
func (c *clientStatusMonitor) isDialed() bool {
return c.healthy.Load() != statusUnhealthyOnDial
} }
func (c *clientStatusMonitor) setHealthy() { func (c *clientStatusMonitor) setHealthy() {
c.healthy.Store(statusHealthy) c.healthy.Store(true)
} }
func (c *clientStatusMonitor) setUnhealthy() { func (c *clientStatusMonitor) setUnhealthy() {
c.healthy.Store(statusUnhealthyOnRequest) c.healthy.Store(false)
}
func (c *clientStatusMonitor) setUnhealthyOnDial() {
c.healthy.Store(statusUnhealthyOnDial)
} }
func (c *clientStatusMonitor) address() string { func (c *clientStatusMonitor) address() string {
@ -1067,20 +971,12 @@ func (c *clientStatusMonitor) incErrorRate() {
} }
c.mu.Unlock() c.mu.Unlock()
if thresholdReached { if thresholdReached && c.logger != nil {
c.log(zapcore.WarnLevel, "error threshold reached", c.logger.Warn("error threshold reached",
zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold)) zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold))
} }
} }
func (c *clientStatusMonitor) log(level zapcore.Level, msg string, fields ...zap.Field) {
if c.logger == nil {
return
}
c.logger.Log(level, msg, fields...)
}
func (c *clientStatusMonitor) currentErrorRate() uint32 { func (c *clientStatusMonitor) currentErrorRate() uint32 {
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
@ -1114,13 +1010,6 @@ func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
} }
} }
func (c *clientWrapper) close() error {
if c.client != nil {
return c.client.Close()
}
return nil
}
func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error { func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error {
if err != nil { if err != nil {
if needCountError(ctx, err) { if needCountError(ctx, err) {
@ -1338,6 +1227,12 @@ func (x *WaitParams) SetPollInterval(tick time.Duration) {
x.PollInterval = tick x.PollInterval = tick
} }
// Deprecated: Use defaultWaitParams() instead.
func (x *WaitParams) setDefaults() {
x.Timeout = 120 * time.Second
x.PollInterval = 5 * time.Second
}
func defaultWaitParams() *WaitParams { func defaultWaitParams() *WaitParams {
return &WaitParams{ return &WaitParams{
Timeout: 120 * time.Second, Timeout: 120 * time.Second,
@ -1426,13 +1321,6 @@ type PrmObjectPut struct {
payload io.Reader payload io.Reader
copiesNumber []uint32 copiesNumber []uint32
clientCut bool
networkInfo netmap.NetworkInfo
withoutHomomorphicHash bool
bufferMaxSize uint64
} }
// SetHeader specifies header of the object. // SetHeader specifies header of the object.
@ -1457,32 +1345,6 @@ func (x *PrmObjectPut) SetCopiesNumberVector(copiesNumber []uint32) {
x.copiesNumber = copiesNumber x.copiesNumber = copiesNumber
} }
// SetClientCut enables client cut for objects. It means that the full object is prepared on the client side
// and retrying is possible, but this leads to additional memory usage for buffering object parts.
// The buffer size for every put is the MaxObjectSize value from the FrostFS network.
// There is a limit on total memory allocation for in-flight requests; it
// can be set by InitParameters.SetMaxClientCutMemory (default value is 1gb).
// Put requests will fail if this limit is reached.
func (x *PrmObjectPut) SetClientCut(clientCut bool) {
x.clientCut = clientCut
}
// WithoutHomomorphicHash, if set to true, disables the Tillich-Zémor hash for payload.
func (x *PrmObjectPut) WithoutHomomorphicHash(v bool) {
x.withoutHomomorphicHash = v
}
// SetBufferMaxSize sets the max buffer size to read payload.
// This value isn't used if the object size is set explicitly and is less than this value.
// Default value is 3MB.
func (x *PrmObjectPut) SetBufferMaxSize(size uint64) {
x.bufferMaxSize = size
}
func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) {
x.networkInfo = ni
}
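
At its core, client cut means splitting the payload into at most MaxObjectSize-sized parts on the client, so each part can be retried independently at the cost of client-side buffering. A toy illustration; chunk is not the SDK's transformer, which also builds part headers and hashes:

package main

import "fmt"

// chunk splits a payload into parts of at most maxObjectSize bytes.
func chunk(payload []byte, maxObjectSize int) [][]byte {
	var parts [][]byte
	for len(payload) > 0 {
		n := maxObjectSize
		if len(payload) < n {
			n = len(payload)
		}
		parts = append(parts, payload[:n])
		payload = payload[n:]
	}
	return parts
}

func main() {
	payload := make([]byte, 10)
	for _, p := range chunk(payload, 4) {
		fmt.Println(len(p)) // 4, 4, 2
	}
}
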
// PrmObjectDelete groups parameters of DeleteObject operation. // PrmObjectDelete groups parameters of DeleteObject operation.
type PrmObjectDelete struct { type PrmObjectDelete struct {
prmCommon prmCommon
@ -1670,41 +1532,39 @@ func (x *PrmContainerEACL) SetContainerID(cnrID cid.ID) {
// PrmContainerSetEACL groups parameters of SetEACL operation. // PrmContainerSetEACL groups parameters of SetEACL operation.
type PrmContainerSetEACL struct { type PrmContainerSetEACL struct {
Table eacl.Table table eacl.Table
Session *session.Container sessionSet bool
session session.Container
WaitParams *WaitParams waitParams WaitParams
waitParamsSet bool
} }
// SetTable sets structure of container's extended ACL to be used as a // SetTable sets structure of container's extended ACL to be used as a
// parameter of the base client's operation. // parameter of the base client's operation.
// //
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.SetTable. // See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.SetTable.
//
// Deprecated: Use PrmContainerSetEACL.Table instead.
func (x *PrmContainerSetEACL) SetTable(table eacl.Table) { func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
x.Table = table x.table = table
} }
// WithinSession specifies session to be used as a parameter of the base // WithinSession specifies session to be used as a parameter of the base
// client's operation. // client's operation.
// //
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.WithinSession. // See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.WithinSession.
//
// Deprecated: Use PrmContainerSetEACL.Session instead.
func (x *PrmContainerSetEACL) WithinSession(s session.Container) { func (x *PrmContainerSetEACL) WithinSession(s session.Container) {
x.Session = &s x.session = s
x.sessionSet = true
} }
// SetWaitParams specifies timeout params to complete operation. // SetWaitParams specifies timeout params to complete operation.
// If not provided, the default one will be used. // If not provided, the default one will be used.
// Panics if any of the wait params isn't positive. // Panics if any of the wait params isn't positive.
//
// Deprecated: Use PrmContainerSetEACL.WaitParams instead.
func (x *PrmContainerSetEACL) SetWaitParams(waitParams WaitParams) { func (x *PrmContainerSetEACL) SetWaitParams(waitParams WaitParams) {
waitParams.checkForPositive() waitParams.checkForPositive()
x.WaitParams = &waitParams x.waitParams = waitParams
x.waitParamsSet = true
} }
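
Both API shapes from this hunk, side by side: the left-hand variant exposes exported fields whose zero values mean "not set", making the sessionSet/waitParamsSet flags unnecessary, while the right-hand setters remain only as deprecated shims. A sketch with stand-in Table/Container types (not the eacl/session packages):

package main

import "fmt"

type Table struct{ CID string }
type Container struct{ ID string }

type PrmContainerSetEACL struct {
	Table   *Table
	Session *Container
}

// SetTable is the setter-style API kept only for compatibility.
//
// Deprecated: assign PrmContainerSetEACL.Table directly.
func (x *PrmContainerSetEACL) SetTable(t Table) { x.Table = &t }

func main() {
	table := Table{CID: "cnr-1"}

	// Field style (left side of the diff): a nil pointer means "not set".
	prm := PrmContainerSetEACL{Table: &table}
	fmt.Println(prm.Table != nil, prm.Session != nil) // true false

	// Setter style (right side of the diff), still compiling via the shim.
	var old PrmContainerSetEACL
	old.SetTable(table)
	fmt.Println(old.Table.CID) // cnr-1
}
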
// PrmBalanceGet groups parameters of Balance operation. // PrmBalanceGet groups parameters of Balance operation.
@ -1778,8 +1638,6 @@ type Pool struct {
rebalanceParams rebalanceParameters rebalanceParams rebalanceParameters
clientBuilder clientBuilder clientBuilder clientBuilder
logger *zap.Logger logger *zap.Logger
maxObjectSize uint64
} }
type innerPool struct { type innerPool struct {
@ -1796,8 +1654,6 @@ const (
defaultHealthcheckTimeout = 4 * time.Second defaultHealthcheckTimeout = 4 * time.Second
defaultDialTimeout = 5 * time.Second defaultDialTimeout = 5 * time.Second
defaultStreamTimeout = 10 * time.Second defaultStreamTimeout = 10 * time.Second
defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
) )
// NewPool creates connection pool using parameters. // NewPool creates connection pool using parameters.
@ -1857,7 +1713,7 @@ func (p *Pool) Dial(ctx context.Context) error {
} }
var st session.Object var st session.Object
err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key, false) err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key)
if err != nil { if err != nil {
clients[j].setUnhealthy() clients[j].setUnhealthy()
p.log(zap.WarnLevel, "failed to create frostfs session token for client", p.log(zap.WarnLevel, "failed to create frostfs session token for client",
@ -1865,7 +1721,7 @@ func (p *Pool) Dial(ctx context.Context) error {
continue continue
} }
_ = p.cache.Put(formCacheKey(addr, p.key, false), st) _ = p.cache.Put(formCacheKey(addr, p.key), st)
atLeastOneHealthy = true atLeastOneHealthy = true
} }
source := rand.NewSource(time.Now().UnixNano()) source := rand.NewSource(time.Now().UnixNano())
@ -1886,12 +1742,6 @@ func (p *Pool) Dial(ctx context.Context) error {
p.closedCh = make(chan struct{}) p.closedCh = make(chan struct{})
p.innerPools = inner p.innerPools = inner
ni, err := p.NetworkInfo(ctx)
if err != nil {
return fmt.Errorf("get network info for max object size: %w", err)
}
p.maxObjectSize = ni.MaxObjectSize()
go p.startRebalance(ctx) go p.startRebalance(ctx)
return nil return nil
} }
@ -1934,7 +1784,6 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
var prm wrapperPrm var prm wrapperPrm
prm.setAddress(addr) prm.setAddress(addr)
prm.setKey(*params.key) prm.setKey(*params.key)
prm.setLogger(params.logger)
prm.setDialTimeout(params.nodeDialTimeout) prm.setDialTimeout(params.nodeDialTimeout)
prm.setStreamTimeout(params.nodeStreamTimeout) prm.setStreamTimeout(params.nodeStreamTimeout)
prm.setErrorThreshold(params.errorThreshold) prm.setErrorThreshold(params.errorThreshold)
@ -2103,15 +1952,9 @@ func (p *innerPool) connection() (client, error) {
return nil, errors.New("no healthy client") return nil, errors.New("no healthy client")
} }
func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string { func formCacheKey(address string, key *ecdsa.PrivateKey) string {
k := keys.PrivateKey{PrivateKey: *key} k := keys.PrivateKey{PrivateKey: *key}
return address + k.String()
stype := "server"
if clientCut {
stype = "client"
}
return address + stype + k.String()
} }
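
The left-hand formCacheKey folds the cut mode into the session-cache key, so a client-cut session is never reused for a server-cut put and vice versa. A sketch with the serialized private key replaced by a plain string:

package main

import "fmt"

// formCacheKey keys the session cache by address, cut mode, and key.
func formCacheKey(address, key string, clientCut bool) string {
	stype := "server"
	if clientCut {
		stype = "client"
	}
	return address + stype + key
}

func main() {
	fmt.Println(formCacheKey("grpc://node:8080", "k1", false)) // grpc://node:8080serverk1
	fmt.Println(formCacheKey("grpc://node:8080", "k1", true))  // grpc://node:8080clientk1
}
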
func (p *Pool) checkSessionTokenErr(err error, address string) bool { func (p *Pool) checkSessionTokenErr(err error, address string) bool {
@ -2127,7 +1970,7 @@ func (p *Pool) checkSessionTokenErr(err error, address string) bool {
return false return false
} }
func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey, clientCut bool) error { func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey) error {
ni, err := c.networkInfo(ctx, prmNetworkInfo{}) ni, err := c.networkInfo(ctx, prmNetworkInfo{})
if err != nil { if err != nil {
return err return err
@ -2145,25 +1988,23 @@ func initSessionForDuration(ctx context.Context, dst *session.Object, c client,
prm.setExp(exp) prm.setExp(exp)
prm.useKey(ownerKey) prm.useKey(ownerKey)
var ( res, err := c.sessionCreate(ctx, prm)
id uuid.UUID if err != nil {
key frostfsecdsa.PublicKey return err
) }
if clientCut { var id uuid.UUID
id = uuid.New()
key = frostfsecdsa.PublicKey(ownerKey.PublicKey) err = id.UnmarshalBinary(res.id)
} else { if err != nil {
res, err := c.sessionCreate(ctx, prm) return fmt.Errorf("invalid session token ID: %w", err)
if err != nil { }
return err
} var key frostfsecdsa.PublicKey
if err = id.UnmarshalBinary(res.id); err != nil {
return fmt.Errorf("invalid session token ID: %w", err) err = key.Decode(res.sessionKey)
} if err != nil {
if err = key.Decode(res.sessionKey); err != nil { return fmt.Errorf("invalid public session key: %w", err)
return fmt.Errorf("invalid public session key: %w", err)
}
} }
dst.SetID(id) dst.SetID(id)
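
With client cut, the left-hand branch skips the SessionCreate RPC entirely: the token ID is a freshly generated UUID and the session key is the owner's own public key. A sketch assuming the github.com/google/uuid package referenced in the hunk; buildSessionIdentity is a hypothetical extraction, not an SDK function:

package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"fmt"

	"github.com/google/uuid"
)

// buildSessionIdentity returns the session token ID and key: locally
// generated for client cut, otherwise fetched from the remote node.
func buildSessionIdentity(owner *ecdsa.PrivateKey, clientCut bool,
	remote func() (uuid.UUID, ecdsa.PublicKey, error),
) (uuid.UUID, ecdsa.PublicKey, error) {
	if clientCut {
		return uuid.New(), owner.PublicKey, nil
	}
	return remote()
}

func main() {
	key, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	id, pub, _ := buildSessionIdentity(key, true, nil)
	fmt.Println(id, pub.Curve.Params().Name) // random UUID, P-256
}
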
@ -2189,8 +2030,6 @@ type callContext struct {
sessionCnr cid.ID sessionCnr cid.ID
sessionObjSet bool sessionObjSet bool
sessionObjs []oid.ID sessionObjs []oid.ID
sessionClientCut bool
} }
func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error { func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
@ -2227,12 +2066,12 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContex
// opens new session or uses cached one. // opens new session or uses cached one.
// Must be called only on initialized callContext with set sessionTarget. // Must be called only on initialized callContext with set sessionTarget.
func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error { func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error {
cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut) cacheKey := formCacheKey(cc.endpoint, cc.key)
tok, ok := p.cache.Get(cacheKey) tok, ok := p.cache.Get(cacheKey)
if !ok { if !ok {
// init new session // init new session
err := initSessionForDuration(ctx, &tok, cc.client, p.stokenDuration, *cc.key, cc.sessionClientCut) err := initSessionForDuration(ctx, &tok, cc.client, p.stokenDuration, *cc.key)
if err != nil { if err != nil {
return fmt.Errorf("session API client: %w", err) return fmt.Errorf("session API client: %w", err)
} }
@ -2297,7 +2136,6 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
p.fillAppropriateKey(&prm.prmCommon) p.fillAppropriateKey(&prm.prmCommon)
var ctxCall callContext var ctxCall callContext
ctxCall.sessionClientCut = prm.clientCut
if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil { if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil {
return oid.ID{}, fmt.Errorf("init call context: %w", err) return oid.ID{}, fmt.Errorf("init call context: %w", err)
} }
@ -2309,13 +2147,6 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
} }
} }
if prm.clientCut {
var ni netmap.NetworkInfo
ni.SetCurrentEpoch(p.cache.Epoch())
ni.SetMaxObjectSize(p.maxObjectSize) // we want to use initial max object size in PayloadSizeLimiter
prm.setNetworkInfo(ni)
}
id, err := ctxCall.client.objectPut(ctx, prm) id, err := ctxCall.client.objectPut(ctx, prm)
if err != nil { if err != nil {
// removes session token from cache in case of token error // removes session token from cache in case of token error
@ -2807,15 +2638,6 @@ func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
func (p *Pool) Close() { func (p *Pool) Close() {
p.cancel() p.cancel()
<-p.closedCh <-p.closedCh
// close all clients
for _, pools := range p.innerPools {
for _, cli := range pools.clients {
if cli.isDialed() {
_ = cli.close()
}
}
}
} }
// SyncContainerWithNetwork applies network configuration received via // SyncContainerWithNetwork applies network configuration received via

View file

@ -106,7 +106,7 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
if err != nil { if err != nil {
return false return false
} }
st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key, false)) st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key))
return st.AssertAuthKey(&expectedAuthKey) return st.AssertAuthKey(&expectedAuthKey)
} }
require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond) require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond)
@ -141,7 +141,7 @@ func TestOneNode(t *testing.T) {
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey) expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
require.True(t, st.AssertAuthKey(&expectedAuthKey)) require.True(t, st.AssertAuthKey(&expectedAuthKey))
} }
@ -171,7 +171,7 @@ func TestTwoNodes(t *testing.T) {
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
require.True(t, assertAuthKeyForAny(st, clientKeys)) require.True(t, assertAuthKeyForAny(st, clientKeys))
} }
@ -226,7 +226,7 @@ func TestOneOfTwoFailed(t *testing.T) {
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
require.True(t, assertAuthKeyForAny(st, clientKeys)) require.True(t, assertAuthKeyForAny(st, clientKeys))
} }
} }
@ -296,7 +296,7 @@ func TestSessionCache(t *testing.T) {
// cache must contain session token // cache must contain session token
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
require.True(t, st.AssertAuthKey(&expectedAuthKey)) require.True(t, st.AssertAuthKey(&expectedAuthKey))
var prm PrmObjectGet var prm PrmObjectGet
@ -309,7 +309,7 @@ func TestSessionCache(t *testing.T) {
// cache must not contain session token // cache must not contain session token
cp, err = pool.connection() cp, err = pool.connection()
require.NoError(t, err) require.NoError(t, err)
_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) _, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key))
require.False(t, ok) require.False(t, ok)
var prm2 PrmObjectPut var prm2 PrmObjectPut
@ -321,7 +321,7 @@ func TestSessionCache(t *testing.T) {
// cache must contain session token // cache must contain session token
cp, err = pool.connection() cp, err = pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key))
require.True(t, st.AssertAuthKey(&expectedAuthKey)) require.True(t, st.AssertAuthKey(&expectedAuthKey))
} }
@ -365,7 +365,7 @@ func TestPriority(t *testing.T) {
firstNode := func() bool { firstNode := func() bool {
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
return st.AssertAuthKey(&expectedAuthKey1) return st.AssertAuthKey(&expectedAuthKey1)
} }
@ -373,7 +373,7 @@ func TestPriority(t *testing.T) {
secondNode := func() bool { secondNode := func() bool {
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
return st.AssertAuthKey(&expectedAuthKey2) return st.AssertAuthKey(&expectedAuthKey2)
} }
require.Never(t, secondNode, time.Second, 200*time.Millisecond) require.Never(t, secondNode, time.Second, 200*time.Millisecond)
@ -410,7 +410,7 @@ func TestSessionCacheWithKey(t *testing.T) {
// cache must contain session token // cache must contain session token
cp, err := pool.connection() cp, err := pool.connection()
require.NoError(t, err) require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
require.True(t, st.AssertAuthKey(&expectedAuthKey)) require.True(t, st.AssertAuthKey(&expectedAuthKey))
var prm PrmObjectDelete var prm PrmObjectDelete
@ -420,7 +420,7 @@ func TestSessionCacheWithKey(t *testing.T) {
err = pool.DeleteObject(ctx, prm) err = pool.DeleteObject(ctx, prm)
require.NoError(t, err) require.NoError(t, err)
st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey, false)) st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey))
require.True(t, st.AssertAuthKey(&expectedAuthKey)) require.True(t, st.AssertAuthKey(&expectedAuthKey))
} }
@ -523,44 +523,6 @@ func TestStatusMonitor(t *testing.T) {
require.Equal(t, uint64(count), monitor.overallErrorRate()) require.Equal(t, uint64(count), monitor.overallErrorRate())
require.Equal(t, uint32(1), monitor.currentErrorRate()) require.Equal(t, uint32(1), monitor.currentErrorRate())
t.Run("healthy status", func(t *testing.T) {
cases := []struct {
action func(*clientStatusMonitor)
status uint32
isDialed bool
isHealthy bool
description string
}{
{
action: func(m *clientStatusMonitor) { m.setUnhealthyOnDial() },
status: statusUnhealthyOnDial,
isDialed: false,
isHealthy: false,
description: "set unhealthy on dial",
},
{
action: func(m *clientStatusMonitor) { m.setUnhealthy() },
status: statusUnhealthyOnRequest,
isDialed: true,
isHealthy: false,
description: "set unhealthy on request",
},
{
action: func(m *clientStatusMonitor) { m.setHealthy() },
status: statusHealthy,
isDialed: true,
isHealthy: true,
description: "set healthy",
},
}
for _, tc := range cases {
tc.action(&monitor)
require.Equal(t, tc.status, monitor.healthy.Load())
require.Equal(t, tc.isDialed, monitor.isDialed())
require.Equal(t, tc.isHealthy, monitor.isHealthy())
}
})
} }
func TestHandleError(t *testing.T) { func TestHandleError(t *testing.T) {

View file

@ -3,7 +3,6 @@ package tree
import ( import (
"context" "context"
"crypto/tls" "crypto/tls"
"errors"
"fmt" "fmt"
"sync" "sync"
@ -23,11 +22,6 @@ type treeClient struct {
healthy bool healthy bool
} }
var (
// ErrUnhealthyEndpoint is returned when a client in the pool is considered unavailable.
ErrUnhealthyEndpoint = errors.New("unhealthy endpoint")
)
// newTreeClient creates a new tree client with auto dial. // newTreeClient creates a new tree client with auto dial.
func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient { func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient {
return &treeClient{ return &treeClient{
@ -108,7 +102,7 @@ func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) {
defer c.mu.RUnlock() defer c.mu.RUnlock()
if c.conn == nil || !c.healthy { if c.conn == nil || !c.healthy {
return nil, fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, c.address) return nil, fmt.Errorf("unhealthy endpoint: '%s'", c.address)
} }
return c.service, nil return c.service, nil
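
Exporting ErrUnhealthyEndpoint lets callers match pool-health failures with errors.Is, while %w-wrapping keeps the endpoint address in the message. A minimal sketch:

package main

import (
	"errors"
	"fmt"
)

var ErrUnhealthyEndpoint = errors.New("unhealthy endpoint")

// serviceClient fails with the exported sentinel when the endpoint is down.
func serviceClient(healthy bool, address string) error {
	if !healthy {
		return fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, address)
	}
	return nil
}

func main() {
	err := serviceClient(false, "grpc://tree:8080")
	fmt.Println(errors.Is(err, ErrUnhealthyEndpoint)) // true
	fmt.Println(err)                                  // unhealthy endpoint: 'grpc://tree:8080'
}
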

View file

@ -32,9 +32,6 @@ var (
// ErrNodeAccessDenied is returned from Tree service in case of access denied error. // ErrNodeAccessDenied is returned from Tree service in case of access denied error.
ErrNodeAccessDenied = errors.New("access denied") ErrNodeAccessDenied = errors.New("access denied")
// errNodeEmptyResult is used to trigger a retry when 'GetNodeByPath' returns an empty result.
errNodeEmptyResult = errors.New("empty result")
) )
// client represents virtual connection to the single FrostFS tree service from which Pool is formed. // client represents virtual connection to the single FrostFS tree service from which Pool is formed.
@ -299,15 +296,8 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
var resp *grpcService.GetNodeByPathResponse var resp *grpcService.GetNodeByPathResponse
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.GetNodeByPath(ctx, request) resp, inErr = client.GetNodeByPath(ctx, request)
// Pool retries the 'GetNodeByPath' request if the result is empty.
// An empty result is expected due to delayed tree service sync.
// Return an error here to trigger a retry and ignore it afterwards,
// to keep compatibility with the 'GetNodeByPath' implementation.
if inErr == nil && len(resp.Body.Nodes) == 0 {
return errNodeEmptyResult
}
return handleError("failed to get node by path", inErr) return handleError("failed to get node by path", inErr)
}); err != nil && !errors.Is(err, errNodeEmptyResult) { }); err != nil {
return nil, err return nil, err
} }
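
The left-hand retry trick: a successful but empty GetNodeByPath response is converted into the errNodeEmptyResult sentinel so the generic retry loop advances to the next node, and the sentinel is swallowed once retries are exhausted. A sketch; retry is a simplified stand-in for requestWithRetry:

package main

import (
	"errors"
	"fmt"
)

var errNodeEmptyResult = errors.New("empty result")

// retry walks the candidate nodes until one returns a non-empty result,
// treating an empty success as a retryable sentinel error.
func retry(attempts []func() ([]string, error)) ([]string, error) {
	var (
		nodes []string
		err   error
	)
	for _, fn := range attempts {
		nodes, err = fn()
		if err == nil && len(nodes) == 0 {
			err = errNodeEmptyResult
		}
		if err == nil {
			return nodes, nil
		}
	}
	if errors.Is(err, errNodeEmptyResult) {
		return nil, nil // compatible with plain GetNodeByPath: empty, no error
	}
	return nil, err
}

func main() {
	res, err := retry([]func() ([]string, error){
		func() ([]string, error) { return nil, nil },               // delayed sync: empty
		func() ([]string, error) { return []string{"node1"}, nil }, // synced replica
	})
	fmt.Println(res, err) // [node1] <nil>
}
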
@ -730,8 +720,8 @@ func (p *Pool) setStartIndices(i, j int) {
func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error { func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error {
var ( var (
err, finErr error err error
cl grpcService.TreeServiceClient cl grpcService.TreeServiceClient
) )
startI, startJ := p.getStartIndices() startI, startJ := p.getStartIndices()
@ -750,44 +740,16 @@ func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) er
} }
return err return err
} }
finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err)) p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
} }
startJ = 0 startJ = 0
} }
return finErr return err
} }
func shouldTryAgain(err error) bool { func shouldTryAgain(err error) bool {
return !(err == nil || errors.Is(err, ErrNodeAccessDenied)) return !(err == nil ||
} errors.Is(err, ErrNodeNotFound) ||
errors.Is(err, ErrNodeAccessDenied))
func prioErr(err error) int {
switch {
case err == nil:
return -1
case errors.Is(err, ErrNodeAccessDenied):
return 100
case errors.Is(err, ErrNodeNotFound) ||
errors.Is(err, errNodeEmptyResult):
return 200
case errors.Is(err, ErrUnhealthyEndpoint):
return 300
default:
return 500
}
}
func finalError(current, candidate error) error {
if current == nil || candidate == nil {
return candidate
}
// lower priority error is more desirable
if prioErr(candidate) < prioErr(current) {
return candidate
}
return current
} }
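
Instead of returning whichever error the last endpoint produced, the left-hand requestWithRetry keeps the most conclusive error seen across all endpoints, ranked by prioErr. A sketch of that ranking; the priorities mirror the hunk, minus the tree-specific errNodeEmptyResult:

package main

import (
	"errors"
	"fmt"
)

var (
	ErrNodeAccessDenied  = errors.New("access denied")
	ErrNodeNotFound      = errors.New("not found")
	ErrUnhealthyEndpoint = errors.New("unhealthy endpoint")
)

// prio ranks errors from most to least conclusive; lower is more desirable.
func prio(err error) int {
	switch {
	case err == nil:
		return -1
	case errors.Is(err, ErrNodeAccessDenied):
		return 100
	case errors.Is(err, ErrNodeNotFound):
		return 200
	case errors.Is(err, ErrUnhealthyEndpoint):
		return 300
	default:
		return 500
	}
}

// finalError keeps the lower-priority (more desirable) of two errors.
func finalError(current, candidate error) error {
	if current == nil || candidate == nil {
		return candidate
	}
	if prio(candidate) < prio(current) {
		return candidate
	}
	return current
}

func main() {
	// A not-found answer from one node beats "unhealthy endpoint" from another.
	var fin error
	for _, err := range []error{ErrUnhealthyEndpoint, ErrNodeNotFound, errors.New("dial tcp: timeout")} {
		fin = finalError(fin, err)
	}
	fmt.Println(fin) // not found
}
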

View file

@ -156,43 +156,6 @@ func TestRetry(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
t.Run("error empty result", func(t *testing.T) {
errNodes, index := 2, 0
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
if index < errNodes {
index++
return errNodeEmptyResult
}
return nil
})
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, errNodes)
})
t.Run("error not found", func(t *testing.T) {
errNodes, index := 2, 0
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
if index < errNodes {
index++
return ErrNodeNotFound
}
return nil
})
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, errNodes)
})
t.Run("error access denied", func(t *testing.T) {
var index int
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
index++
return ErrNodeAccessDenied
})
require.ErrorIs(t, err, ErrNodeAccessDenied)
require.Equal(t, 1, index)
checkIndicesAndReset(t, p, 0, 0)
})
} }
func TestRebalance(t *testing.T) { func TestRebalance(t *testing.T) {

View file

@ -1,10 +1,9 @@
package session_test package session_test
import ( import (
"crypto/rand"
"fmt" "fmt"
"math" "math"
mrand "math/rand" "math/rand"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
@ -397,7 +396,7 @@ func TestContainer_AppliedTo(t *testing.T) {
func TestContainer_InvalidAt(t *testing.T) { func TestContainer_InvalidAt(t *testing.T) {
var x session.Container var x session.Container
nbf := mrand.Uint64() nbf := rand.Uint64()
if nbf == math.MaxUint64 { if nbf == math.MaxUint64 {
nbf-- nbf--
} }

View file

@ -1,7 +1,7 @@
package user_test package user_test
import ( import (
"crypto/rand" "math/rand"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"