frostfs-sdk-go/pool/pool.go
Alexander Chuprov 7017ae6b28
[#300] pool: Remove obvious comments
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-03-06 17:07:39 +03:00

1767 lines
50 KiB
Go

package pool
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"math"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ape"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/google/uuid"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
"google.golang.org/grpc"
)
// client represents a virtual connection to a single FrostFS network endpoint; Pool is formed from such connections.
// This interface is expected to have exactly one production implementation - clientWrapper.
// Others are expected to be for test purposes only.
type client interface {
// see clientWrapper.balanceGet.
balanceGet(context.Context, PrmBalanceGet) (accounting.Decimal, error)
// see clientWrapper.containerPut.
containerPut(context.Context, PrmContainerPut) (cid.ID, error)
// see clientWrapper.containerGet.
containerGet(context.Context, PrmContainerGet) (container.Container, error)
// see clientWrapper.containerList.
containerList(context.Context, PrmContainerList) ([]cid.ID, error)
// see clientWrapper.containerListStream.
containerListStream(context.Context, PrmListStream) (ResListStream, error)
// see clientWrapper.containerDelete.
containerDelete(context.Context, PrmContainerDelete) error
// see clientWrapper.apeManagerAddChain.
apeManagerAddChain(context.Context, PrmAddAPEChain) error
// see clientWrapper.apeManagerRemoveChain.
apeManagerRemoveChain(context.Context, PrmRemoveAPEChain) error
// see clientWrapper.apeManagerListChains.
apeManagerListChains(context.Context, PrmListAPEChains) ([]ape.Chain, error)
// see clientWrapper.endpointInfo.
endpointInfo(context.Context, prmEndpointInfo) (netmap.NodeInfo, error)
// see clientWrapper.healthcheck.
healthcheck(ctx context.Context) (netmap.NodeInfo, error)
// see clientWrapper.networkInfo.
networkInfo(context.Context, prmNetworkInfo) (netmap.NetworkInfo, error)
// see clientWrapper.netMapSnapshot.
netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap.NetMap, error)
// see clientWrapper.objectPut.
objectPut(context.Context, PrmObjectPut) (ResPutObject, error)
// see clientWrapper.objectPatch.
objectPatch(context.Context, PrmObjectPatch) (ResPatchObject, error)
// see clientWrapper.objectDelete.
objectDelete(context.Context, PrmObjectDelete) error
// see clientWrapper.objectGet.
objectGet(context.Context, PrmObjectGet) (ResGetObject, error)
// see clientWrapper.objectHead.
objectHead(context.Context, PrmObjectHead) (object.Object, error)
// see clientWrapper.objectRange.
objectRange(context.Context, PrmObjectRange) (ResObjectRange, error)
// see clientWrapper.objectSearch.
objectSearch(context.Context, PrmObjectSearch) (ResObjectSearch, error)
// see clientWrapper.sessionCreate.
sessionCreate(context.Context, prmCreateSession) (resCreateSession, error)
// clientStatus embeds the health state and error statistics accessors of the connection.
clientStatus
// see clientWrapper.dial.
dial(ctx context.Context) error
// see clientWrapper.restart.
restart(ctx context.Context) error
// see clientWrapper.close.
close() error
}
// clientStatus provides access to the health state and error metrics of a connection.
type clientStatus interface {
// isHealthy checks if the connection can handle requests.
isHealthy() bool
// setUnhealthy marks client as unhealthy.
setUnhealthy()
// setHealthy marks client as healthy.
setHealthy()
// address returns the address of the endpoint.
address() string
// currentErrorRate returns the current error rate.
// After a specific threshold the connection is considered unhealthy.
// Pool.startRebalance routine can make this connection healthy again.
currentErrorRate() uint32
// overallErrorRate returns the number of all errors that have happened.
overallErrorRate() uint64
// methodsStatus returns statistics for all used methods.
methodsStatus() []StatusSnapshot
}
// InitParameters contains values used to initialize connection Pool.
//
// Zero values of most fields are replaced with package defaults when the Pool
// is created (see fillDefaultInitParams).
type InitParameters struct {
// key is the default private key, see SetKey.
key *ecdsa.PrivateKey
// logger is an optional logger, see SetLogger.
logger *zap.Logger
// nodeDialTimeout, see SetNodeDialTimeout.
nodeDialTimeout time.Duration
// nodeStreamTimeout, see SetNodeStreamTimeout.
nodeStreamTimeout time.Duration
// healthcheckTimeout, see SetHealthcheckTimeout.
healthcheckTimeout time.Duration
// clientRebalanceInterval, see SetClientRebalanceInterval.
clientRebalanceInterval time.Duration
// sessionExpirationDuration is the session token lifetime in epochs, see SetSessionExpirationDuration.
sessionExpirationDuration uint64
// errorThreshold, see SetErrorThreshold.
errorThreshold uint32
// nodeParams accumulates the nodes registered via AddNode.
nodeParams []NodeParam
// requestCallback, see SetRequestCallback.
requestCallback func(RequestInfo)
// dialOptions, see SetGRPCDialOptions.
dialOptions []grpc.DialOption
// clientBuilder constructs a client for a node address; when nil, a default
// builder is installed by fillDefaultInitParams.
clientBuilder clientBuilder
// gracefulCloseOnSwitchTimeout, see SetGracefulCloseOnSwitchTimeout.
gracefulCloseOnSwitchTimeout time.Duration
}
// SetKey specifies the default key to be used for the protocol communication.
func (x *InitParameters) SetKey(key *ecdsa.PrivateKey) {
x.key = key
}
// SetLogger specifies logger.
func (x *InitParameters) SetLogger(logger *zap.Logger) {
x.logger = logger
}
// SetNodeDialTimeout specifies the timeout for connection to be established.
// A non-positive value is replaced with the default (5s) on Pool creation.
func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration) {
x.nodeDialTimeout = timeout
}
// SetNodeStreamTimeout specifies the timeout for individual operations in streaming RPC.
// A non-positive value is replaced with the default (10s) on Pool creation.
func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration) {
x.nodeStreamTimeout = timeout
}
// SetHealthcheckTimeout specifies the timeout for request to node to decide if it is alive.
// A non-positive value is replaced with the default (4s) on Pool creation.
//
// See also Pool.Dial.
func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) {
x.healthcheckTimeout = timeout
}
// SetClientRebalanceInterval specifies the interval for updating nodes health status.
// A non-positive value is replaced with the default (15s) on Pool creation.
//
// See also Pool.Dial.
func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) {
x.clientRebalanceInterval = interval
}
// SetGracefulCloseOnSwitchTimeout specifies the timeout after which an unhealthy client
// is closed during rebalancing if it becomes healthy again.
// Generally this param should be less than client rebalance interval (see SetClientRebalanceInterval).
// A non-positive value is replaced with the default (10s) on Pool creation.
//
// See also SetErrorThreshold.
func (x *InitParameters) SetGracefulCloseOnSwitchTimeout(timeout time.Duration) {
x.gracefulCloseOnSwitchTimeout = timeout
}
// SetSessionExpirationDuration specifies the session token lifetime in epochs.
// Zero is replaced with the default (100 epochs) on Pool creation.
func (x *InitParameters) SetSessionExpirationDuration(expirationDuration uint64) {
x.sessionExpirationDuration = expirationDuration
}
// SetErrorThreshold specifies the number of errors on connection after which node is considered as unhealthy.
// Zero is replaced with the default (100) on Pool creation.
func (x *InitParameters) SetErrorThreshold(threshold uint32) {
x.errorThreshold = threshold
}
// SetRequestCallback makes the pool client to pass RequestInfo for each
// request to f. Nil (default) means ignore RequestInfo.
func (x *InitParameters) SetRequestCallback(f func(RequestInfo)) {
x.requestCallback = f
}
// AddNode appends information about the node to which you want to connect.
func (x *InitParameters) AddNode(nodeParam NodeParam) {
x.nodeParams = append(x.nodeParams, nodeParam)
}
// SetGRPCDialOptions sets the gRPC dial options for new gRPC client connection.
// Each call replaces the previously set options.
func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) {
x.dialOptions = opts
}
// setClientBuilder sets clientBuilder used for client construction.
func (x *InitParameters) setClientBuilder(builder clientBuilder) {
x.clientBuilder = builder
}
// isMissingClientBuilder checks if client constructor was not specified.
func (x *InitParameters) isMissingClientBuilder() bool {
return x.clientBuilder == nil
}
// rebalanceParameters groups the node set and timing settings kept by the
// connection manager (Pool.Dial reads sessionExpirationDuration from it).
// NOTE(review): presumably consumed by the rebalance routine — confirm against connectionManager.
type rebalanceParameters struct {
nodesParams []*nodesParam
nodeRequestTimeout time.Duration
clientRebalanceInterval time.Duration
// sessionExpirationDuration is the session token lifetime in epochs.
sessionExpirationDuration uint64
}
// nodesParam describes a set of node addresses with their weights under one priority value.
// NOTE(review): addresses and weights are presumably parallel slices (same index = same node) — confirm.
type nodesParam struct {
priority int
addresses []string
weights []float64
}
// NodeParam describes a single remote node: where it is reachable, which
// priority group it belongs to and how much traffic it should receive
// within that group.
type NodeParam struct {
	priority int
	address  string
	weight   float64
}

// NewNodeParam builds a NodeParam from the given priority, address and weight.
func NewNodeParam(priority int, address string, weight float64) (prm NodeParam) {
	prm = NodeParam{
		priority: priority,
		address:  address,
		weight:   weight,
	}
	return prm
}

// SetPriority assigns the priority of the node. Negative values are accepted.
// See Priority for how the value is interpreted.
func (x *NodeParam) SetPriority(priority int) {
	x.priority = priority
}

// Priority reports the priority of the node.
// Requests are served by the subset of nodes with the highest priority
// (the smaller the value, the higher the priority). When that subset and all
// subsets with higher priority contain no healthy node, requests are served
// by a subset with lower priority.
func (x *NodeParam) Priority() int {
	return x.priority
}

// SetAddress assigns the network address of the node.
func (x *NodeParam) SetAddress(address string) {
	x.address = address
}

// Address reports the network address of the node.
func (x *NodeParam) Address() string {
	return x.address
}

// SetWeight assigns the weight of the node.
// Weights tune how requests are distributed between nodes sharing a priority.
func (x *NodeParam) SetWeight(weight float64) {
	x.weight = weight
}

// Weight reports the weight of the node.
func (x *NodeParam) Weight() float64 {
	return x.weight
}
// WaitParams contains parameters used when polling the FrostFS network
// until an operation is applied.
type WaitParams struct {
	// Timeout is the total time to wait for the operation to complete.
	Timeout time.Duration
	// PollInterval is the interval between completion checks.
	PollInterval time.Duration
}

// SetTimeout specifies the time to wait for the operation to complete.
//
// Deprecated: Use WaitParams.Timeout instead.
func (x *WaitParams) SetTimeout(timeout time.Duration) {
	x.Timeout = timeout
}

// SetPollInterval specifies the interval between checks for completion of the operation.
//
// Deprecated: Use WaitParams.PollInterval instead.
func (x *WaitParams) SetPollInterval(tick time.Duration) {
	x.PollInterval = tick
}

// defaultWaitParams returns wait params with a 120s timeout and a 5s poll interval.
func defaultWaitParams() *WaitParams {
	return &WaitParams{
		Timeout:      120 * time.Second,
		PollInterval: 5 * time.Second,
	}
}

// checkForPositive panics if any of the wait params isn't positive.
func (x *WaitParams) checkForPositive() {
	if x.Timeout <= 0 || x.PollInterval <= 0 {
		panic("all wait params must be positive")
	}
}

// CheckValidity checks that all wait params are positive.
// Zero is rejected as well as negative values, so the returned error
// says "must be positive" rather than "cannot be negative".
func (x *WaitParams) CheckValidity() error {
	if x.Timeout <= 0 {
		return errors.New("timeout must be positive")
	}
	if x.PollInterval <= 0 {
		return errors.New("poll interval must be positive")
	}
	return nil
}
// prmContext collects what a default (pool-managed) session must be bound to:
// the operation verb, the container and, optionally, a set of objects.
type prmContext struct {
	// defaultSession requests opening a default session when the caller gave none.
	defaultSession bool
	verb           session.ObjectVerb
	cnr            cid.ID
	// objSet reports whether objs was explicitly provided.
	objSet bool
	objs   []oid.ID
}

// useDefaultSession turns on the default session for the operation.
func (x *prmContext) useDefaultSession() {
	x.defaultSession = true
}

// useContainer binds the context to the given container.
func (x *prmContext) useContainer(cnr cid.ID) {
	x.cnr = cnr
}

// useObjects binds the context to the given objects and marks the set as provided.
func (x *prmContext) useObjects(ids []oid.ID) {
	x.objSet = true
	x.objs = ids
}

// useAddress binds the context to the container and the single object of addr.
func (x *prmContext) useAddress(addr oid.Address) {
	x.useContainer(addr.Container())
	x.useObjects([]oid.ID{addr.Object()})
}

// useVerb sets the object operation verb of the context.
func (x *prmContext) useVerb(verb session.ObjectVerb) {
	x.verb = verb
}
// prmCommon groups the optional signing key and tokens that are embedded
// into the object operation parameters.
type prmCommon struct {
// key is the request signing key; nil means the Pool default key.
key *ecdsa.PrivateKey
// btoken is the bearer token attached via UseBearer (nil if not set).
btoken *bearer.Token
// stoken is the session token attached via UseSession (nil if not set).
stoken *session.Object
}
// UseKey specifies private key to sign the requests.
// If key is not provided, then Pool default key is used.
func (x *prmCommon) UseKey(key *ecdsa.PrivateKey) {
x.key = key
}
// UseBearer attaches bearer token to be used for the operation.
// A pointer to the function's own copy of the token is stored.
func (x *prmCommon) UseBearer(token bearer.Token) {
x.btoken = &token
}
// UseSession specifies session within which operation should be performed.
// A pointer to the function's own copy of the token is stored.
func (x *prmCommon) UseSession(token session.Object) {
x.stoken = &token
}
// PrmObjectPut groups parameters of PutObject operation.
type PrmObjectPut struct {
prmCommon
// hdr is the object header, see SetHeader.
hdr object.Object
// payload is the source of the object payload, see SetPayload.
payload io.Reader
// copiesNumber, see SetCopiesNumber and SetCopiesNumberVector.
copiesNumber []uint32
// clientCut, see SetClientCut.
clientCut bool
// networkInfo is filled by Pool.PutObject when client cut is enabled.
networkInfo netmap.NetworkInfo
// withoutHomomorphicHash, see WithoutHomomorphicHash.
withoutHomomorphicHash bool
// bufferMaxSize, see SetBufferMaxSize.
bufferMaxSize uint64
}
// SetHeader specifies header of the object.
func (x *PrmObjectPut) SetHeader(hdr object.Object) {
x.hdr = hdr
}
// SetPayload specifies payload of the object.
func (x *PrmObjectPut) SetPayload(payload io.Reader) {
x.payload = payload
}
// SetCopiesNumber sets number of object copies that is enough to consider put successful.
// Zero means using default behavior.
// The value is stored as a single-element vector, replacing any vector set before.
func (x *PrmObjectPut) SetCopiesNumber(copiesNumber uint32) {
x.copiesNumber = []uint32{copiesNumber}
}
// SetCopiesNumberVector sets number of object copies that is enough to consider put successful, provided as array.
// Nil/empty vector means using default behavior.
// The slice is stored as is, without copying.
func (x *PrmObjectPut) SetCopiesNumberVector(copiesNumber []uint32) {
x.copiesNumber = copiesNumber
}
// SetClientCut enables client cut for objects. It means that full object is prepared on client side
// and retrying is possible. But this leads to additional memory using for buffering object parts.
// Buffer size for every put is MaxObjectSize value from FrostFS network.
// There is limit for total memory allocation for in-flight request and
// can be set by InitParameters.SetMaxClientCutMemory (default value is 1gb).
// Put requests will fail if this limit be reached.
//
// NOTE(review): InitParameters.SetMaxClientCutMemory is not declared next to the
// other InitParameters setters in this file — confirm that it still exists.
func (x *PrmObjectPut) SetClientCut(clientCut bool) {
x.clientCut = clientCut
}
// WithoutHomomorphicHash if set to true do not use Tillich-Zémor hash for payload.
func (x *PrmObjectPut) WithoutHomomorphicHash(v bool) {
x.withoutHomomorphicHash = v
}
// SetBufferMaxSize sets max buffer size to read payload.
// This value isn't used if object size is set explicitly and less than this value.
// Default value 3MB.
func (x *PrmObjectPut) SetBufferMaxSize(size uint64) {
x.bufferMaxSize = size
}
// setNetworkInfo stores the network info to be used for the put.
func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) {
x.networkInfo = ni
}
// PrmObjectPatch groups parameters of PatchObject operation.
type PrmObjectPatch struct {
prmCommon
// addr is the address of the object being patched.
addr oid.Address
// rng is the payload range the patch applies to.
rng *object.Range
// payload is the source of the patch payload.
payload io.Reader
// newAttrs are the attributes carried by the patch.
newAttrs []object.Attribute
// replaceAttrs is the replace-attributes flag of the patch.
replaceAttrs bool
// maxPayloadPatchChunkLength is the max buffer size used to read the patch payload.
maxPayloadPatchChunkLength int
}
// SetAddress sets the address of the object that is patched.
func (x *PrmObjectPatch) SetAddress(addr oid.Address) {
x.addr = addr
}
// SetRange sets the patch's range.
func (x *PrmObjectPatch) SetRange(rng *object.Range) {
x.rng = rng
}
// SetPayloadReader sets a payload reader.
func (x *PrmObjectPatch) SetPayloadReader(payload io.Reader) {
x.payload = payload
}
// SetNewAttributes sets the new attributes to the patch.
func (x *PrmObjectPatch) SetNewAttributes(newAttrs []object.Attribute) {
x.newAttrs = newAttrs
}
// SetReplaceAttributes sets the replace attributes flag to the patch.
func (x *PrmObjectPatch) SetReplaceAttributes(replaceAttrs bool) {
x.replaceAttrs = replaceAttrs
}
// SetMaxPayloadPatchChunkSize sets a max buf size to read the patch's payload.
func (x *PrmObjectPatch) SetMaxPayloadPatchChunkSize(maxPayloadPatchChunkSize int) {
x.maxPayloadPatchChunkLength = maxPayloadPatchChunkSize
}
// PrmObjectDelete groups parameters of DeleteObject operation.
type PrmObjectDelete struct {
prmCommon
// addr is the address of the object to be removed.
addr oid.Address
}
// SetAddress specifies FrostFS address of the object.
func (x *PrmObjectDelete) SetAddress(addr oid.Address) {
x.addr = addr
}
// PrmObjectGet groups parameters of GetObject operation.
type PrmObjectGet struct {
prmCommon
// addr is the address of the object to be read.
addr oid.Address
}
// SetAddress specifies FrostFS address of the object.
func (x *PrmObjectGet) SetAddress(addr oid.Address) {
x.addr = addr
}
// PrmObjectHead groups parameters of HeadObject operation.
type PrmObjectHead struct {
prmCommon
// addr is the address of the object whose header is requested.
addr oid.Address
// raw is set by MarkRaw.
raw bool
}
// SetAddress specifies FrostFS address of the object.
func (x *PrmObjectHead) SetAddress(addr oid.Address) {
x.addr = addr
}
// MarkRaw marks an intent to read physically stored object.
// The mark cannot be cleared once set.
func (x *PrmObjectHead) MarkRaw() {
x.raw = true
}
// PrmObjectRange groups parameters of RangeObject operation.
type PrmObjectRange struct {
prmCommon
// addr is the address of the object whose payload is read.
addr oid.Address
// off and ln are the offset and the length of the payload range.
off, ln uint64
}
// SetAddress specifies FrostFS address of the object.
func (x *PrmObjectRange) SetAddress(addr oid.Address) {
x.addr = addr
}
// SetOffset sets offset of the payload range to be read.
func (x *PrmObjectRange) SetOffset(offset uint64) {
x.off = offset
}
// SetLength sets length of the payload range to be read.
func (x *PrmObjectRange) SetLength(length uint64) {
x.ln = length
}
// PrmObjectSearch groups parameters of SearchObjects operation.
type PrmObjectSearch struct {
prmCommon
// cnrID is the container in which to look for objects.
cnrID cid.ID
// filters select the objects to be returned.
filters object.SearchFilters
}
// SetContainerID specifies the container in which to look for objects.
func (x *PrmObjectSearch) SetContainerID(cnrID cid.ID) {
x.cnrID = cnrID
}
// SetFilters specifies filters by which to select objects.
func (x *PrmObjectSearch) SetFilters(filters object.SearchFilters) {
x.filters = filters
}
// PrmContainerPut groups parameters of PutContainer operation.
type PrmContainerPut struct {
// ClientParams are the parameters of the base client's operation.
ClientParams sdkClient.PrmContainerPut
// WaitParams are the polling parameters used to wait for completion;
// see SetWaitParams for the behavior when they are not provided.
WaitParams *WaitParams
}
// SetContainer sets the container structure to be used as a parameter of the base
// client's operation.
//
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerPut.SetContainer.
//
// Deprecated: Use PrmContainerPut.ClientParams.Container instead.
func (x *PrmContainerPut) SetContainer(cnr container.Container) {
x.ClientParams.SetContainer(cnr)
}
// WithinSession specifies session to be used as a parameter of the base
// client's operation.
//
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerPut.WithinSession.
//
// Deprecated: Use PrmContainerPut.ClientParams.Session instead.
func (x *PrmContainerPut) WithinSession(s session.Container) {
x.ClientParams.WithinSession(s)
}
// SetWaitParams specifies timeout params to complete operation.
// If not provided the default one will be used.
// Panics if any of the wait params isn't positive.
//
// Deprecated: Use PrmContainerPut.WaitParams instead.
func (x *PrmContainerPut) SetWaitParams(waitParams WaitParams) {
waitParams.checkForPositive()
x.WaitParams = &waitParams
}
// PrmContainerGet carries the parameters of the GetContainer operation.
type PrmContainerGet struct {
	// ContainerID identifies the container to be read.
	ContainerID cid.ID

	// Session is an optional container session token.
	// NOTE(review): presumably the session the read is performed within — confirm against clientWrapper.containerGet.
	Session *session.Container
}

// SetContainerID stores the identifier of the container to be read.
//
// Deprecated: Use PrmContainerGet.ContainerID instead.
func (x *PrmContainerGet) SetContainerID(cnrID cid.ID) {
	x.ContainerID = cnrID
}
// PrmContainerList groups parameters of ListContainers operation.
type PrmContainerList struct {
// OwnerID identifies the FrostFS account whose containers are listed.
OwnerID user.ID
// Session is an optional container session token.
Session *session.Container
}
// SetOwnerID specifies identifier of the FrostFS account to list the containers.
//
// Deprecated: Use PrmContainerList.OwnerID instead.
func (x *PrmContainerList) SetOwnerID(ownerID user.ID) {
x.OwnerID = ownerID
}
// PrmContainerDelete groups parameters of DeleteContainer operation.
type PrmContainerDelete struct {
// ContainerID identifies the FrostFS container to be removed.
ContainerID cid.ID
// Session is the session within which the operation should be performed (optional).
Session *session.Container
// WaitParams are the polling parameters used to wait for completion;
// see SetWaitParams for the behavior when they are not provided.
WaitParams *WaitParams
}
// SetContainerID specifies identifier of the FrostFS container to be removed.
//
// Deprecated: Use PrmContainerDelete.ContainerID instead.
func (x *PrmContainerDelete) SetContainerID(cnrID cid.ID) {
x.ContainerID = cnrID
}
// SetSessionToken specifies session within which operation should be performed.
// A pointer to the function's own copy of the token is stored.
//
// Deprecated: Use PrmContainerDelete.Session instead.
func (x *PrmContainerDelete) SetSessionToken(token session.Container) {
x.Session = &token
}
// SetWaitParams specifies timeout params to complete operation.
// If not provided the default one will be used.
// Panics if any of the wait params isn't positive.
//
// Deprecated: Use PrmContainerDelete.WaitParams instead.
func (x *PrmContainerDelete) SetWaitParams(waitParams WaitParams) {
waitParams.checkForPositive()
x.WaitParams = &waitParams
}
// PrmAddAPEChain groups parameters of the APE manager's add-chain operation
// (see client.apeManagerAddChain).
type PrmAddAPEChain struct {
// Target is the chain target the operation applies to.
Target ape.ChainTarget
// Chain is the chain to be added.
Chain ape.Chain
}
// PrmRemoveAPEChain groups parameters of the APE manager's remove-chain operation
// (see client.apeManagerRemoveChain).
type PrmRemoveAPEChain struct {
// Target is the chain target the operation applies to.
Target ape.ChainTarget
// ChainID identifies the chain to be removed.
ChainID ape.ChainID
}
// PrmListAPEChains groups parameters of the APE manager's list-chains operation
// (see client.apeManagerListChains).
type PrmListAPEChains struct {
// Target is the chain target whose chains are listed.
Target ape.ChainTarget
}
// PrmBalanceGet groups parameters of Balance operation.
type PrmBalanceGet struct {
// account identifies the FrostFS account whose balance is requested.
account user.ID
}
// SetAccount specifies identifier of the FrostFS account for which the balance is requested.
func (x *PrmBalanceGet) SetAccount(id user.ID) {
x.account = id
}
// prmCreateSession groups parameters of sessionCreate operation.
type prmCreateSession struct {
// exp is the last epoch of the session lifetime.
exp uint64
// key is the session owner's private key, stored by value.
key ecdsa.PrivateKey
}
// setExp sets number of the last FrostFS epoch in the lifetime of the session after which it will be expired.
func (x *prmCreateSession) setExp(exp uint64) {
x.exp = exp
}
// useKey specifies owner private key for session token.
// NOTE(review): the original comment says the Pool default key is used when the key
// is not provided; that fallback is not implemented here — confirm in clientWrapper.sessionCreate.
func (x *prmCreateSession) useKey(key ecdsa.PrivateKey) {
x.key = key
}
// prmEndpointInfo groups parameters of endpointInfo operation.
// It currently carries no options.
type prmEndpointInfo struct{}
// prmNetworkInfo groups parameters of networkInfo operation.
// It currently carries no options.
type prmNetworkInfo struct{}
// prmNetMapSnapshot groups parameters of netMapSnapshot operation.
// It currently carries no options.
type prmNetMapSnapshot struct{}
// resCreateSession groups resulting values of sessionCreate operation.
type resCreateSession struct {
// id is the binary session ID; initSessionForDuration decodes it as a UUID.
id []byte
// sessionKey is the encoded public session key; initSessionForDuration decodes it
// into a frostfsecdsa.PublicKey.
sessionKey []byte
}
// Pool represents virtual connection to the FrostFS network to communicate
// with multiple FrostFS servers without thinking about switching between servers
// due to load balancing proportions or their unavailability.
// It is designed to provide a convenient abstraction from the multiple sdkClient.client types.
//
// Pool can be created and initialized using NewPool function.
// Before executing the FrostFS operations using the Pool, connection to the
// servers MUST BE correctly established (see Dial method).
// Using the Pool before connection has been established can lead to a panic.
// After the work, the Pool SHOULD BE closed (see Close method): it frees internal
// and system resources which were allocated for the period of work of the Pool.
// Calling Dial/Close methods during the communication process is strongly discouraged
// as it leads to undefined behavior.
//
// Each method which produces a FrostFS API call may return an error.
// Status of underlying server response is cast to built-in error instance.
// Certain statuses can be checked using `sdkClient` and standard `errors` packages.
// Note that package provides some helper functions to work with status returns
// (e.g. sdkClient.IsErrContainerNotFound, sdkClient.IsErrObjectNotFound).
//
// See pool package overview to get some examples.
type Pool struct {
// manager dials the nodes and selects the connection for each call.
manager *connectionManager
// logger is optional; Dial checks it for nil before logging.
logger *zap.Logger
// key is the default private key used when the caller does not provide one.
key *ecdsa.PrivateKey
// cache stores the opened session tokens.
cache *sessionCache
// stokenDuration is the lifetime (in epochs) of sessions opened by default.
stokenDuration uint64
// maxObjectSize is the network's max object size read once in Dial;
// it is used for puts with client cut.
maxObjectSize uint64
}
// Defaults applied by fillDefaultInitParams to InitParameters fields that were left unset.
const (
// defaultSessionTokenExpirationDuration is used when the session expiration duration is zero.
defaultSessionTokenExpirationDuration = 100 // in epochs
// defaultErrorThreshold is used when the error threshold is zero.
defaultErrorThreshold = 100
// defaultGracefulCloseOnSwitchTimeout is used when the graceful close timeout is not positive.
defaultGracefulCloseOnSwitchTimeout = 10 * time.Second
// defaultRebalanceInterval is used when the client rebalance interval is not positive.
defaultRebalanceInterval = 15 * time.Second
// defaultHealthcheckTimeout is used when the healthcheck timeout is not positive.
defaultHealthcheckTimeout = 4 * time.Second
// defaultDialTimeout is used when the node dial timeout is not positive.
defaultDialTimeout = 5 * time.Second
// defaultStreamTimeout is used when the node stream timeout is not positive.
defaultStreamTimeout = 10 * time.Second
// defaultBufferMaxSizeForPut is not applied by fillDefaultInitParams.
// NOTE(review): presumably the default for PrmObjectPut.SetBufferMaxSize — confirm at the use site.
defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
)
// NewPool builds a Pool from the given parameters: it creates the session
// cache, fills unset parameters with defaults and constructs the connection
// manager. The returned Pool is not connected yet.
//
// Before using Pool, you MUST call Dial.
func NewPool(options InitParameters) (*Pool, error) {
	sessions, err := newCache(options.sessionExpirationDuration)
	if err != nil {
		return nil, fmt.Errorf("couldn't create cache: %w", err)
	}

	// options is a private copy, so defaults written here do not leak to the caller.
	fillDefaultInitParams(&options, sessions)

	connManager, err := newConnectionManager(options)
	if err != nil {
		return nil, err
	}

	return &Pool{
		manager:        connManager,
		logger:         options.logger,
		key:            options.key,
		cache:          sessions,
		stokenDuration: options.sessionExpirationDuration,
	}, nil
}
// Dial connects the Pool to the FrostFS servers.
//
// After the connection manager has dialed the nodes, a session token is
// requested from every client and cached; a failure for a single client is
// only logged (when a logger is set). At least one client must succeed.
// Finally the network info is fetched to remember the max object size.
// The connection manager's dial is expected to also start the routine that
// checks node health and updates balancing weights.
//
// If an error is returned, the Pool SHOULD NOT be used.
//
// See also InitParameters.SetClientRebalanceInterval.
func (p *Pool) Dial(ctx context.Context) error {
	if err := p.manager.dial(ctx); err != nil {
		return err
	}

	healthy := false
	p.manager.iterate(func(cl client) {
		var token session.Object
		sessErr := initSessionForDuration(ctx, &token, cl, p.manager.rebalanceParams.sessionExpirationDuration, *p.key, false)
		if sessErr == nil {
			_ = p.cache.Put(formCacheKey(cl.address(), p.key, false), token)
			healthy = true
			return
		}

		if p.logger != nil {
			p.logger.Log(zap.WarnLevel, "failed to create frostfs session token for client",
				zap.String("address", cl.address()), zap.Error(sessErr))
		}
	})

	if !healthy {
		return errors.New("at least one node must be healthy")
	}

	netInfo, err := p.NetworkInfo(ctx)
	if err != nil {
		return fmt.Errorf("get network info for max object size: %w", err)
	}
	p.maxObjectSize = netInfo.MaxObjectSize()

	return nil
}
// fillDefaultInitParams replaces unset (zero or non-positive) values in params
// with the package defaults and, when no client builder was provided, installs
// one that creates clientWrapper instances via newWrapper.
// The cache's token duration is defaulted the same way.
func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
if params.sessionExpirationDuration == 0 {
params.sessionExpirationDuration = defaultSessionTokenExpirationDuration
}
if params.errorThreshold == 0 {
params.errorThreshold = defaultErrorThreshold
}
if params.clientRebalanceInterval <= 0 {
params.clientRebalanceInterval = defaultRebalanceInterval
}
if params.gracefulCloseOnSwitchTimeout <= 0 {
params.gracefulCloseOnSwitchTimeout = defaultGracefulCloseOnSwitchTimeout
}
if params.healthcheckTimeout <= 0 {
params.healthcheckTimeout = defaultHealthcheckTimeout
}
if params.nodeDialTimeout <= 0 {
params.nodeDialTimeout = defaultDialTimeout
}
if params.nodeStreamTimeout <= 0 {
params.nodeStreamTimeout = defaultStreamTimeout
}
// the cache may have been created before the defaults were applied
if cache.tokenDuration == 0 {
cache.tokenDuration = defaultSessionTokenExpirationDuration
}
if params.isMissingClientBuilder() {
// The builder captures the params pointer, so the fields are read when a
// client is built rather than here; params.key is dereferenced at that
// moment and must be non-nil by then.
params.setClientBuilder(func(addr string) client {
var prm wrapperPrm
prm.setAddress(addr)
prm.setKey(*params.key)
prm.setLogger(params.logger)
prm.setDialTimeout(params.nodeDialTimeout)
prm.setStreamTimeout(params.nodeStreamTimeout)
prm.setErrorThreshold(params.errorThreshold)
prm.setGracefulCloseOnSwitchTimeout(params.gracefulCloseOnSwitchTimeout)
prm.setPoolRequestCallback(params.requestCallback)
prm.setGRPCDialOptions(params.dialOptions)
// every response updates the epoch known to the session cache
prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error {
cache.updateEpoch(info.Epoch())
return nil
})
return newWrapper(prm)
})
}
}
// formCacheKey builds the session cache key for the given endpoint address and
// private key. The key starts with the address, followed by the session kind
// ("client" for client-cut sessions, "server" otherwise) and the string form
// of the private key.
func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string {
	sessionKind := "client"
	if !clientCut {
		sessionKind = "server"
	}

	priv := keys.PrivateKey{PrivateKey: *key}

	return address + sessionKind + priv.String()
}
// checkSessionTokenErr reports whether err is a session-related failure
// (session not found or expired). In that case all cached tokens whose key
// starts with address are dropped from the session cache.
func (p *Pool) checkSessionTokenErr(err error, address string) bool {
	sessionFailure := err != nil &&
		(sdkClient.IsErrSessionNotFound(err) || sdkClient.IsErrSessionExpired(err))
	if !sessionFailure {
		return false
	}

	p.cache.DeleteByPrefix(address)
	return true
}
// initSessionForDuration fills dst with the ID, auth key and expiration epoch
// of a session that lives for dur epochs starting from the current epoch
// reported by c.
//
// With clientCut set, the session is formed locally: a fresh random ID and the
// owner's public key. Otherwise the session is created on the node through
// c.sessionCreate and the returned ID and public session key are decoded.
// The expiration epoch saturates at math.MaxUint64 instead of overflowing.
func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey, clientCut bool) error {
	netInfo, err := c.networkInfo(ctx, prmNetworkInfo{})
	if err != nil {
		return err
	}

	current := netInfo.CurrentEpoch()
	exp := uint64(math.MaxUint64)
	if dur <= math.MaxUint64-current {
		exp = current + dur
	}

	var (
		sessionID uuid.UUID
		authKey   frostfsecdsa.PublicKey
	)

	switch {
	case clientCut:
		sessionID = uuid.New()
		authKey = frostfsecdsa.PublicKey(ownerKey.PublicKey)
	default:
		var prm prmCreateSession
		prm.setExp(exp)
		prm.useKey(ownerKey)

		res, err := c.sessionCreate(ctx, prm)
		if err != nil {
			return err
		}
		if err := sessionID.UnmarshalBinary(res.id); err != nil {
			return fmt.Errorf("invalid session token ID: %w", err)
		}
		if err := authKey.Decode(res.sessionKey); err != nil {
			return fmt.Errorf("invalid public session key: %w", err)
		}
	}

	dst.SetID(sessionID)
	dst.SetAuthKey(&authKey)
	dst.SetExp(exp)

	return nil
}
// callContext carries everything a single Pool operation needs: the selected
// client, the signing key and the description of the default session.
type callContext struct {
// client is the connection selected for the call
client client
// client endpoint
endpoint string
// request signer
key *ecdsa.PrivateKey
// flag to open default session if session token is missing
sessionDefault bool
// sessionTarget receives the session token to be used for the operation
sessionTarget func(session.Object)
// sessionVerb, sessionCnr, sessionObjSet and sessionObjs are copied from
// prmContext by initCall when the default session is going to be opened
sessionVerb session.ObjectVerb
sessionCnr cid.ID
sessionObjSet bool
sessionObjs []oid.ID
// sessionClientCut selects the client-cut kind of session
sessionClientCut bool
}
// initCall prepares ctxCall for an operation: it picks a connection from the
// manager, selects the signing key (caller's key or, if absent, the pool key),
// forwards a caller-provided session token to sessionTarget and decides
// whether the default session has to be opened.
// It returns an error only when no connection can be obtained; ctxCall is left
// untouched in that case.
func (p *Pool) initCall(ctxCall *callContext, cfg prmCommon, prmCtx prmContext) error {
	// use pool key if caller didn't specify its own
	signer := cfg.key
	if signer == nil {
		signer = p.key
	}

	conn, err := p.manager.connection()
	if err != nil {
		return err
	}

	ctxCall.client = conn
	ctxCall.endpoint = conn.address()
	ctxCall.key = signer

	callerSession := cfg.stoken
	if callerSession != nil && ctxCall.sessionTarget != nil {
		ctxCall.sessionTarget(*callerSession)
	}

	// a session provided by the caller is never overridden
	ctxCall.sessionDefault = prmCtx.defaultSession && callerSession == nil
	if !ctxCall.sessionDefault {
		return nil
	}

	ctxCall.sessionVerb = prmCtx.verb
	ctxCall.sessionCnr = prmCtx.cnr
	ctxCall.sessionObjSet = prmCtx.objSet
	ctxCall.sessionObjs = prmCtx.objs

	return nil
}
// openDefaultSession takes the session token for the call's endpoint and key
// from the cache or, on a cache miss, opens a new session and caches it.
// The token is then bound to the call's verb, container and (if set) objects,
// signed with the call's key and handed to cc.sessionTarget.
// Must be called only on initialized callContext with set sessionTarget.
func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error {
	key := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut)

	token, cached := p.cache.Get(key)
	if !cached {
		if err := initSessionForDuration(ctx, &token, cc.client, p.stokenDuration, *cc.key, cc.sessionClientCut); err != nil {
			return fmt.Errorf("session API client: %w", err)
		}
		// remember the freshly opened session for subsequent calls
		p.cache.Put(key, token)
	}

	token.ForVerb(cc.sessionVerb)
	token.BindContainer(cc.sessionCnr)
	if cc.sessionObjSet {
		token.LimitByObjects(cc.sessionObjs...)
	}

	err := token.Sign(*cc.key)
	if err != nil {
		return fmt.Errorf("sign token of the opened session: %w", err)
	}

	cc.sessionTarget(token)

	return nil
}
// call opens the default session when cc.sessionDefault is set and then runs f.
// The error returned by f is passed through unchanged; if it is
// session-related, the cached tokens of the endpoint are removed.
func (p *Pool) call(ctx context.Context, cc *callContext, f func() error) error {
	if cc.sessionDefault {
		if err := p.openDefaultSession(ctx, cc); err != nil {
			return fmt.Errorf("open default session: %w", err)
		}
	}

	err := f()
	// only the cache clean-up matters here, the verdict itself is not needed
	p.checkSessionTokenErr(err, cc.endpoint)

	return err
}
// fillAppropriateKey substitutes the pool key when the caller did not
// specify its own key in prm.
func (p *Pool) fillAppropriateKey(prm *prmCommon) {
	if prm.key != nil {
		return
	}
	prm.key = p.key
}
// ResPutObject is designed to provide identifier and creation epoch of the saved object.
type ResPutObject struct {
// ObjectID is the identifier of the saved object.
ObjectID oid.ID
// Epoch is the creation epoch of the saved object.
Epoch uint64
}
// ResPatchObject is designed to provide identifier for the saved patched object.
type ResPatchObject struct {
// ObjectID is the identifier of the saved patched object.
ObjectID oid.ID
}
// PatchObject patches an object through a remote server using FrostFS API protocol.
//
// Unless the caller attached its own session token, a default session bound to
// the patch verb and the object's container is opened and attached to prm.
// A session-related failure of the request drops the endpoint's cached tokens.
func (p *Pool) PatchObject(ctx context.Context, prm PrmObjectPatch) (ResPatchObject, error) {
	var sessionCtx prmContext
	sessionCtx.useDefaultSession()
	sessionCtx.useVerb(session.VerbObjectPatch)
	sessionCtx.useContainer(prm.addr.Container())

	var cc callContext
	if err := p.initCall(&cc, prm.prmCommon, sessionCtx); err != nil {
		return ResPatchObject{}, fmt.Errorf("init call context: %w", err)
	}

	if cc.sessionDefault {
		// the opened session is written into the local copy of prm
		cc.sessionTarget = prm.UseSession
		if err := p.openDefaultSession(ctx, &cc); err != nil {
			return ResPatchObject{}, fmt.Errorf("open default session: %w", err)
		}
	}

	res, err := cc.client.objectPatch(ctx, prm)
	if err == nil {
		return res, nil
	}

	// removes session token from cache in case of token error
	p.checkSessionTokenErr(err, cc.endpoint)

	return ResPatchObject{}, fmt.Errorf("init patching on API client %s: %w", cc.endpoint, err)
}
// PutObject writes an object through a remote server using FrostFS API protocol.
//
// Unless the caller attached its own session token, a default session bound to
// the put verb and the container from the object header is opened and attached
// to prm. With client cut enabled, prm additionally receives network info made
// of the cached epoch and the max object size remembered by Dial.
// A session-related failure of the request drops the endpoint's cached tokens.
func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
	cnrID, _ := prm.hdr.ContainerID()

	var sessionCtx prmContext
	sessionCtx.useDefaultSession()
	sessionCtx.useVerb(session.VerbObjectPut)
	sessionCtx.useContainer(cnrID)

	cc := callContext{sessionClientCut: prm.clientCut}
	if err := p.initCall(&cc, prm.prmCommon, sessionCtx); err != nil {
		return ResPutObject{}, fmt.Errorf("init call context: %w", err)
	}

	if cc.sessionDefault {
		// the opened session is written into the local copy of prm
		cc.sessionTarget = prm.UseSession
		if err := p.openDefaultSession(ctx, &cc); err != nil {
			return ResPutObject{}, fmt.Errorf("open default session: %w", err)
		}
	}

	if prm.clientCut {
		var netInfo netmap.NetworkInfo
		netInfo.SetCurrentEpoch(p.cache.Epoch())
		// we want to use initial max object size in PayloadSizeLimiter
		netInfo.SetMaxObjectSize(p.maxObjectSize)
		prm.setNetworkInfo(netInfo)
	}

	res, err := cc.client.objectPut(ctx, prm)
	if err == nil {
		return res, nil
	}

	// removes session token from cache in case of token error
	p.checkSessionTokenErr(err, cc.endpoint)

	return ResPutObject{}, fmt.Errorf("init writing on API client %s: %w", cc.endpoint, err)
}
// DeleteObject marks an object for deletion from the container using FrostFS API protocol.
// As a marker, a special unit called a tombstone is placed in the container.
// It confirms the user's intent to delete the object, and is itself a container object.
// Explicit deletion is done asynchronously, and is generally not guaranteed.
//
// When prm carries no session token, the object's relatives are collected first
// and, if any were found, added to the session context together with the object
// itself.
func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
	var sessCtx prmContext
	sessCtx.useDefaultSession()
	sessCtx.useVerb(session.VerbObjectDelete)
	sessCtx.useAddress(prm.addr)

	// collect phy objects only if we are about to open default session
	if prm.stoken == nil {
		tokens := relations.Tokens{Bearer: prm.btoken}

		related, err := relations.ListAllRelations(ctx, p, prm.addr.Container(), prm.addr.Object(), tokens)
		if err != nil {
			return fmt.Errorf("failed to collect relatives: %w", err)
		}

		if len(related) > 0 {
			sessCtx.useContainer(prm.addr.Container())
			sessCtx.useObjects(append(related, prm.addr.Object()))
		}
	}

	var cc callContext
	cc.sessionTarget = prm.UseSession

	if err := p.initCall(&cc, prm.prmCommon, sessCtx); err != nil {
		return err
	}

	remove := func() error {
		if err := cc.client.objectDelete(ctx, prm); err != nil {
			return fmt.Errorf("remove object via client %s: %w", cc.endpoint, err)
		}
		return nil
	}

	return p.call(ctx, &cc, remove)
}
// objectReadCloser wraps an SDK object reader with Read and Close methods and
// reports the duration of every Read call through elapsedTimeCallback.
type objectReadCloser struct {
// reader is the underlying SDK reader the payload is read from.
reader *sdkClient.ObjectReader
// elapsedTimeCallback is invoked after each Read with the time spent in it.
elapsedTimeCallback func(time.Duration)
}
// Read implements io.Reader of the object payload.
//
// The time spent in the underlying reader is passed to elapsedTimeCallback
// after every call, whether or not the read succeeded.
func (x *objectReadCloser) Read(p []byte) (int, error) {
	begin := time.Now()

	read, readErr := x.reader.Read(p)

	spent := time.Since(begin)
	x.elapsedTimeCallback(spent)

	return read, readErr
}
// Close implements io.Closer of the object payload.
//
// Only the error of closing the underlying reader is reported; its result
// value is discarded.
func (x *objectReadCloser) Close() error {
	_, closeErr := x.reader.Close()

	return closeErr
}
// ResGetObject is designed to provide object header and read one object payload from FrostFS system.
// It is the result type returned by Pool.GetObject.
type ResGetObject struct {
// Header is the header of the requested object.
Header object.Object
// Payload is the reader of the object payload.
Payload io.ReadCloser
}
// GetObject reads object header and initiates reading an object payload through a remote server using FrostFS API protocol.
//
// The call context is initialized from the common parameters of prm with
// prm.UseSession as the session target, and the request is executed through
// Pool.call. If initialization fails, its error is returned unchanged together
// with a zero ResGetObject.
func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
	var cc callContext
	cc.sessionTarget = prm.UseSession

	var res ResGetObject

	if err := p.initCall(&cc, prm.prmCommon, prmContext{}); err != nil {
		return res, err
	}

	// The call is deliberately a separate statement: in `return res, p.call(...)`
	// the Go spec does not define whether res is read before or after the
	// closure assigns to it.
	callErr := p.call(ctx, &cc, func() error {
		got, getErr := cc.client.objectGet(ctx, prm)
		res = got
		if getErr != nil {
			return fmt.Errorf("get object via client %s: %w", cc.endpoint, getErr)
		}
		return nil
	})

	return res, callErr
}
// HeadObject reads object header through a remote server using FrostFS API protocol.
//
// The call context is initialized from the common parameters of prm with
// prm.UseSession as the session target, and the request is executed through
// Pool.call. If initialization fails, its error is returned unchanged together
// with a zero object.Object.
func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
	var cc callContext
	cc.sessionTarget = prm.UseSession

	var obj object.Object

	if err := p.initCall(&cc, prm.prmCommon, prmContext{}); err != nil {
		return obj, err
	}

	// The call is deliberately a separate statement: in `return obj, p.call(...)`
	// the Go spec does not define whether obj is read before or after the
	// closure assigns to it.
	callErr := p.call(ctx, &cc, func() error {
		hdr, headErr := cc.client.objectHead(ctx, prm)
		obj = hdr
		if headErr != nil {
			return fmt.Errorf("head object via client %s: %w", cc.endpoint, headErr)
		}
		return nil
	})

	return obj, callErr
}
// ResObjectRange is designed to read payload range of one object
// from FrostFS system.
//
// Must be initialized using Pool.ObjectRange, any other
// usage is unsafe.
type ResObjectRange struct {
// payload is the underlying SDK reader of the payload range.
payload *sdkClient.ObjectRangeReader
// elapsedTimeCallback is invoked after each Read with the time spent in it.
elapsedTimeCallback func(time.Duration)
}
// Read implements io.Reader of the object payload.
//
// The time spent in the underlying range reader is passed to
// elapsedTimeCallback after every call, whether or not the read succeeded.
func (x *ResObjectRange) Read(p []byte) (int, error) {
	begin := time.Now()

	read, readErr := x.payload.Read(p)

	spent := time.Since(begin)
	x.elapsedTimeCallback(spent)

	return read, readErr
}
// Close ends reading the payload range and returns the error of the operation,
// if any. The result value of the underlying reader is discarded.
// Must be called after using the ResObjectRange.
func (x *ResObjectRange) Close() error {
	_, closeErr := x.payload.Close()

	return closeErr
}
// ObjectRange initiates reading an object's payload range through a remote
// server using FrostFS API protocol.
//
// The call context is initialized from the common parameters of prm with
// prm.UseSession as the session target, and the request is executed through
// Pool.call. If initialization fails, its error is returned unchanged together
// with a zero ResObjectRange.
func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
	var cc callContext
	cc.sessionTarget = prm.UseSession

	var res ResObjectRange

	if err := p.initCall(&cc, prm.prmCommon, prmContext{}); err != nil {
		return res, err
	}

	// The call is deliberately a separate statement: in `return res, p.call(...)`
	// the Go spec does not define whether res is read before or after the
	// closure assigns to it.
	callErr := p.call(ctx, &cc, func() error {
		got, rangeErr := cc.client.objectRange(ctx, prm)
		res = got
		if rangeErr != nil {
			return fmt.Errorf("object range via client %s: %w", cc.endpoint, rangeErr)
		}
		return nil
	})

	return res, callErr
}
// ResObjectSearch is designed to read list of object identifiers from FrostFS system.
//
// Must be initialized using Pool.SearchObjects, any other usage is unsafe.
type ResObjectSearch struct {
// r is the underlying SDK reader of the matched object identifiers.
r *sdkClient.ObjectListReader
// handleError is applied by Read to the status and error obtained when
// closing r fails; its result is what Read returns in that case.
handleError func(context.Context, apistatus.Status, error) error
}
// Read reads another list of the object identifiers.
//
// It returns the number of identifiers written to buf. Once the underlying
// reader reports that nothing more can be read, the reader is closed: io.EOF
// is returned if closing succeeds, otherwise the close error is passed through
// handleError together with the response status (when a response is present)
// and the outcome is returned.
func (x *ResObjectSearch) Read(buf []oid.ID) (int, error) {
	n, ok := x.r.Read(buf)
	if ok {
		return n, nil
	}

	res, err := x.r.Close()
	if err == nil {
		return n, io.EOF
	}

	var status apistatus.Status
	if res != nil {
		status = res.Status()
	}

	// No request context is available at this point; context.TODO is used
	// instead of a nil Context, which must never be passed.
	return n, x.handleError(context.TODO(), status, err)
}
// Iterate iterates over the list of found object identifiers.
// f can return true to stop iteration earlier.
//
// Returns an error if object can't be read. The iteration itself is delegated
// to the underlying reader.
func (x *ResObjectSearch) Iterate(f func(oid.ID) bool) error {
	iterErr := x.r.Iterate(f)

	return iterErr
}
// Close ends reading list of the matched objects. Must be called after using
// the ResObjectSearch.
//
// The method has no result, so both values returned by closing the underlying
// reader are intentionally dropped.
func (x *ResObjectSearch) Close() {
	reader := x.r

	_, _ = reader.Close()
}
// SearchObjects initiates object selection through a remote server using FrostFS API protocol.
//
// The call only opens the transmission channel, explicit fetching of matched objects
// is done using the ResObjectSearch. Resulting reader must be finally closed.
//
// The call context is initialized from the common parameters of prm with
// prm.UseSession as the session target, and the request is executed through
// Pool.call. If initialization fails, its error is returned unchanged together
// with a zero ResObjectSearch.
func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
	var cc callContext
	cc.sessionTarget = prm.UseSession

	var res ResObjectSearch

	if err := p.initCall(&cc, prm.prmCommon, prmContext{}); err != nil {
		return res, err
	}

	// The call is deliberately a separate statement: in `return res, p.call(...)`
	// the Go spec does not define whether res is read before or after the
	// closure assigns to it.
	callErr := p.call(ctx, &cc, func() error {
		got, searchErr := cc.client.objectSearch(ctx, prm)
		res = got
		if searchErr != nil {
			return fmt.Errorf("search object via client %s: %w", cc.endpoint, searchErr)
		}
		return nil
	})

	return res, callErr
}
// PutContainer sends request to save container in FrostFS and waits for the operation to complete.
//
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
//
//	polling interval: 5s
//	waiting timeout: 120s
//
// Success can be verified by reading by identifier (see GetContainer).
//
// The request is sent over a connection obtained from the pool's connection
// manager; a request failure is wrapped with the address of that connection.
func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
	conn, err := p.manager.connection()
	if err != nil {
		return cid.ID{}, err
	}

	id, putErr := conn.containerPut(ctx, prm)
	if putErr == nil {
		return id, nil
	}

	return cid.ID{}, fmt.Errorf("put container via client '%s': %w", conn.address(), putErr)
}
// GetContainer reads FrostFS container by ID.
//
// The request is sent over a connection obtained from the pool's connection
// manager; a request failure is wrapped with the address of that connection.
func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
	conn, err := p.manager.connection()
	if err != nil {
		return container.Container{}, err
	}

	cnr, getErr := conn.containerGet(ctx, prm)
	if getErr == nil {
		return cnr, nil
	}

	return container.Container{}, fmt.Errorf("get container via client '%s': %w", conn.address(), getErr)
}
// ListContainers requests identifiers of the account-owned containers.
//
// The request is sent over a connection obtained from the pool's connection
// manager; a request failure is wrapped with the address of that connection.
// On any error the returned slice is nil.
func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
	cp, err := p.manager.connection()
	if err != nil {
		return nil, err
	}

	cnrIDs, err := cp.containerList(ctx, prm)
	if err != nil {
		// nil, not an empty slice: keeps both error paths consistent.
		return nil, fmt.Errorf("list containers via client '%s': %w", cp.address(), err)
	}

	return cnrIDs, nil
}
// ListContainersStream requests identifiers of the account-owned containers.
//
// The request is sent over a connection obtained from the pool's connection
// manager. If the request fails, the value produced by the client is returned
// as is, along with the error wrapped with the address of that connection.
func (p *Pool) ListContainersStream(ctx context.Context, prm PrmListStream) (ResListStream, error) {
	conn, err := p.manager.connection()
	if err != nil {
		var none ResListStream
		return none, err
	}

	stream, listErr := conn.containerListStream(ctx, prm)
	if listErr == nil {
		return stream, nil
	}

	return stream, fmt.Errorf("list containers stream via client '%s': %w", conn.address(), listErr)
}
// DeleteContainer sends request to remove the FrostFS container and waits for the operation to complete.
//
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
//
//	polling interval: 5s
//	waiting timeout: 120s
//
// Success can be verified by reading by identifier (see GetContainer).
//
// The request is sent over a connection obtained from the pool's connection
// manager; a request failure is wrapped with the address of that connection.
func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
	conn, err := p.manager.connection()
	if err != nil {
		return err
	}

	if delErr := conn.containerDelete(ctx, prm); delErr != nil {
		return fmt.Errorf("delete container via client '%s': %w", conn.address(), delErr)
	}

	return nil
}
// AddAPEChain sends a request to set APE chain rules for a target (basically, for a container).
//
// The request is sent over a connection obtained from the pool's connection
// manager; a request failure is wrapped with the address of that connection.
func (p *Pool) AddAPEChain(ctx context.Context, prm PrmAddAPEChain) error {
	conn, err := p.manager.connection()
	if err != nil {
		return err
	}

	if addErr := conn.apeManagerAddChain(ctx, prm); addErr != nil {
		return fmt.Errorf("add ape chain via client '%s': %w", conn.address(), addErr)
	}

	return nil
}
// RemoveAPEChain sends a request to remove APE chain rules for a target.
//
// The request is sent over a connection obtained from the pool's connection
// manager; a request failure is wrapped with the address of that connection.
func (p *Pool) RemoveAPEChain(ctx context.Context, prm PrmRemoveAPEChain) error {
	conn, err := p.manager.connection()
	if err != nil {
		return err
	}

	if rmErr := conn.apeManagerRemoveChain(ctx, prm); rmErr != nil {
		return fmt.Errorf("remove ape chain via client '%s': %w", conn.address(), rmErr)
	}

	return nil
}
// ListAPEChains sends a request to list APE chains rules for a target.
//
// The request is sent over a connection obtained from the pool's connection
// manager; a request failure is wrapped with the address of that connection.
// On any error the returned slice is nil.
func (p *Pool) ListAPEChains(ctx context.Context, prm PrmListAPEChains) ([]ape.Chain, error) {
	conn, err := p.manager.connection()
	if err != nil {
		return nil, err
	}

	listed, listErr := conn.apeManagerListChains(ctx, prm)
	if listErr == nil {
		return listed, nil
	}

	return nil, fmt.Errorf("list ape chains via client '%s': %w", conn.address(), listErr)
}
// Balance requests current balance of the FrostFS account.
//
// The request is sent over a connection obtained from the pool's connection
// manager; a request failure is wrapped with the address of that connection.
func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
	conn, err := p.manager.connection()
	if err != nil {
		return accounting.Decimal{}, err
	}

	amount, getErr := conn.balanceGet(ctx, prm)
	if getErr == nil {
		return amount, nil
	}

	return accounting.Decimal{}, fmt.Errorf("get balance via client '%s': %w", conn.address(), getErr)
}
// Statistic returns connection statistics.
//
// The statistics are obtained from the pool's connection manager.
func (p Pool) Statistic() Statistic {
	stats := p.manager.Statistic()

	return stats
}
// waitForContainerPresence waits until the container is found on the FrostFS network.
//
// The container counts as present as soon as reading it via cli succeeds.
// Polling and timeout handling are delegated to waitFor.
func waitForContainerPresence(ctx context.Context, cli client, prm PrmContainerGet, waitParams *WaitParams) error {
	isPresent := func(ctx context.Context) bool {
		if _, err := cli.containerGet(ctx, prm); err != nil {
			return false
		}
		return true
	}

	return waitFor(ctx, waitParams, isPresent)
}
// waitForContainerRemoved waits until the container is removed from the FrostFS network.
//
// The container counts as removed once reading it via cli fails with an error
// recognized by sdkClient.IsErrContainerNotFound. Polling and timeout handling
// are delegated to waitFor.
func waitForContainerRemoved(ctx context.Context, cli client, prm PrmContainerGet, waitParams *WaitParams) error {
	isGone := func(ctx context.Context) bool {
		_, getErr := cli.containerGet(ctx, prm)
		return sdkClient.IsErrContainerNotFound(getErr)
	}

	return waitFor(ctx, waitParams, isGone)
}
// waitFor polls condition every params.PollInterval until it reports true or
// params.Timeout elapses.
//
// The first check happens after one polling interval. The context handed to
// condition is derived from ctx and bounded by params.Timeout, so a single
// slow check cannot outlive the waiting timeout.
//
// Returns nil once condition is met. Otherwise returns the error of the
// derived context: context.DeadlineExceeded when the timeout expires, or the
// error of ctx when the parent context is done first.
func waitFor(ctx context.Context, params *WaitParams, condition func(context.Context) bool) error {
	wctx, cancel := context.WithTimeout(ctx, params.Timeout)
	defer cancel()

	timer := time.NewTimer(params.PollInterval)
	defer timer.Stop()

	for {
		select {
		// wctx is derived from ctx, so this case also covers the parent
		// context being cancelled or reaching its own deadline.
		case <-wctx.Done():
			return wctx.Err()
		case <-timer.C:
			if condition(wctx) {
				return nil
			}
			// Re-arm only after the check so that the interval is measured
			// between the end of one check and the start of the next.
			timer.Reset(params.PollInterval)
		}
	}
}
// NetworkInfo requests information about the FrostFS network of which the remote server is a part.
//
// The request is sent over a connection obtained from the pool's connection
// manager; a request failure is wrapped with the address of that connection.
func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
	conn, err := p.manager.connection()
	if err != nil {
		return netmap.NetworkInfo{}, err
	}

	info, infoErr := conn.networkInfo(ctx, prmNetworkInfo{})
	if infoErr == nil {
		return info, nil
	}

	return netmap.NetworkInfo{}, fmt.Errorf("get network info via client '%s': %w", conn.address(), infoErr)
}
// NetMapSnapshot requests information about the FrostFS network map.
//
// The request is sent over a connection obtained from the pool's connection
// manager; a request failure is wrapped with the address of that connection.
func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
	conn, err := p.manager.connection()
	if err != nil {
		return netmap.NetMap{}, err
	}

	snapshot, snapErr := conn.netMapSnapshot(ctx, prmNetMapSnapshot{})
	if snapErr == nil {
		return snapshot, nil
	}

	return netmap.NetMap{}, fmt.Errorf("get network map via client '%s': %w", conn.address(), snapErr)
}
// Close closes the Pool and releases all the associated resources.
//
// The work is delegated to the pool's connection manager. No error is reported.
func (p *Pool) Close() {
p.manager.close()
}
// SyncContainerWithNetwork applies network configuration received via
// the Pool to the container. Changes the container if it does not satisfy
// network configuration.
//
// Pool and container MUST not be nil.
//
// Returns any error that does not allow reading configuration
// from the network. In that case the container is left untouched.
func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *Pool) error {
	netInfo, infoErr := p.NetworkInfo(ctx)
	if infoErr != nil {
		return fmt.Errorf("network info: %w", infoErr)
	}

	container.ApplyNetworkConfig(cnr, netInfo)

	return nil
}
// GetSplitInfo implements relations.Relations.
//
// A raw HEAD request is issued for the object. Split info is extracted from a
// split-info error of that request. A successful request or an EC-info error
// is reported as relations.ErrNoSplitInfo; any other error is wrapped and
// returned.
func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) {
	var target oid.Address
	target.SetContainer(cnrID)
	target.SetObject(objID)

	var headPrm PrmObjectHead
	headPrm.SetAddress(target)
	if tokens.Bearer != nil {
		headPrm.UseBearer(*tokens.Bearer)
	}
	if tokens.Session != nil {
		headPrm.UseSession(*tokens.Session)
	}
	headPrm.MarkRaw()

	_, err := p.HeadObject(ctx, headPrm)
	if err == nil {
		return nil, relations.ErrNoSplitInfo
	}

	var splitErr *object.SplitInfoError
	if errors.As(err, &splitErr) {
		return splitErr.SplitInfo(), nil
	}

	var ecErr *object.ECInfoError
	if errors.As(err, &ecErr) {
		return nil, relations.ErrNoSplitInfo
	}

	return nil, fmt.Errorf("failed to get raw object header: %w", err)
}
// ListChildrenByLinker implements relations.Relations.
//
// The header of the object addressed by cnrID and objID is read via HeadObject
// and the children recorded in it are returned.
func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
	var target oid.Address
	target.SetContainer(cnrID)
	target.SetObject(objID)

	var headPrm PrmObjectHead
	headPrm.SetAddress(target)
	if tokens.Bearer != nil {
		headPrm.UseBearer(*tokens.Bearer)
	}
	if tokens.Session != nil {
		headPrm.UseSession(*tokens.Session)
	}

	linker, headErr := p.HeadObject(ctx, headPrm)
	if headErr != nil {
		return nil, fmt.Errorf("failed to get linking object's header: %w", headErr)
	}

	return linker.Children(), nil
}
// GetLeftSibling implements relations.Relations.
//
// The header of the object addressed by cnrID and objID is read via HeadObject
// and its previous ID is returned. If the header carries no previous ID,
// relations.ErrNoLeftSibling is returned.
func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) {
	var target oid.Address
	target.SetContainer(cnrID)
	target.SetObject(objID)

	var headPrm PrmObjectHead
	headPrm.SetAddress(target)
	if tokens.Bearer != nil {
		headPrm.UseBearer(*tokens.Bearer)
	}
	if tokens.Session != nil {
		headPrm.UseSession(*tokens.Session)
	}

	member, headErr := p.HeadObject(ctx, headPrm)
	if headErr != nil {
		return oid.ID{}, fmt.Errorf("failed to read split chain member's header: %w", headErr)
	}

	if prevID, found := member.PreviousID(); found {
		return prevID, nil
	}

	return oid.ID{}, relations.ErrNoLeftSibling
}
// FindSiblingBySplitID implements relations.Relations.
//
// Objects of the container are searched with a string-equality filter on the
// split ID, and every found identifier is collected into the result.
func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) {
	var filters object.SearchFilters
	filters.AddSplitIDFilter(object.MatchStringEqual, splitID)

	var searchPrm PrmObjectSearch
	searchPrm.SetContainerID(cnrID)
	searchPrm.SetFilters(filters)
	if tokens.Bearer != nil {
		searchPrm.UseBearer(*tokens.Bearer)
	}
	if tokens.Session != nil {
		searchPrm.UseSession(*tokens.Session)
	}

	found, searchErr := p.SearchObjects(ctx, searchPrm)
	if searchErr != nil {
		return nil, fmt.Errorf("failed to search objects by split ID: %w", searchErr)
	}

	var siblings []oid.ID

	// The callback never asks to stop, so the whole result set is collected.
	collect := func(id oid.ID) bool {
		siblings = append(siblings, id)
		return false
	}

	if iterErr := found.Iterate(collect); iterErr != nil {
		return nil, fmt.Errorf("failed to iterate found objects: %w", iterErr)
	}

	return siblings, nil
}
// FindSiblingByParentID implements relations.Relations.
//
// Objects of the container are searched with a string-equality filter on the
// parent ID, and every found identifier is collected into the result.
func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
	var filters object.SearchFilters
	filters.AddParentIDFilter(object.MatchStringEqual, objID)

	var searchPrm PrmObjectSearch
	searchPrm.SetContainerID(cnrID)
	searchPrm.SetFilters(filters)
	if tokens.Bearer != nil {
		searchPrm.UseBearer(*tokens.Bearer)
	}
	if tokens.Session != nil {
		searchPrm.UseSession(*tokens.Session)
	}

	found, searchErr := p.SearchObjects(ctx, searchPrm)
	if searchErr != nil {
		return nil, fmt.Errorf("failed to find object children: %w", searchErr)
	}

	var children []oid.ID

	// The callback never asks to stop, so the whole result set is collected.
	collect := func(id oid.ID) bool {
		children = append(children, id)
		return false
	}

	if iterErr := found.Iterate(collect); iterErr != nil {
		return nil, fmt.Errorf("failed to iterate found objects: %w", iterErr)
	}

	return children, nil
}