package pool import ( "context" "crypto/ecdsa" "errors" "fmt" "io" "math" "time" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ape" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/google/uuid" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "go.uber.org/zap" "google.golang.org/grpc" ) // client represents virtual connection to the single FrostFS network endpoint from which Pool is formed. // This interface is expected to have exactly one production implementation - clientWrapper. // Others are expected to be for test purposes only. type client interface { // see clientWrapper.balanceGet. balanceGet(context.Context, PrmBalanceGet) (accounting.Decimal, error) // see clientWrapper.containerPut. containerPut(context.Context, PrmContainerPut) (cid.ID, error) // see clientWrapper.containerGet. containerGet(context.Context, PrmContainerGet) (container.Container, error) // see clientWrapper.containerList. containerList(context.Context, PrmContainerList) ([]cid.ID, error) // see clientWrapper.containerListStream. containerListStream(context.Context, PrmListStream) (ResListStream, error) // see clientWrapper.containerDelete. containerDelete(context.Context, PrmContainerDelete) error // see clientWrapper.apeManagerAddChain. apeManagerAddChain(context.Context, PrmAddAPEChain) error // see clientWrapper.apeManagerRemoveChain. apeManagerRemoveChain(context.Context, PrmRemoveAPEChain) error // see clientWrapper.apeManagerListChains. apeManagerListChains(context.Context, PrmListAPEChains) ([]ape.Chain, error) // see clientWrapper.endpointInfo. endpointInfo(context.Context, prmEndpointInfo) (netmap.NodeInfo, error) // see clientWrapper.healthcheck. healthcheck(ctx context.Context) (netmap.NodeInfo, error) // see clientWrapper.networkInfo. networkInfo(context.Context, prmNetworkInfo) (netmap.NetworkInfo, error) // see clientWrapper.netMapSnapshot netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap.NetMap, error) // see clientWrapper.objectPut. objectPut(context.Context, PrmObjectPut) (ResPutObject, error) // see clientWrapper.objectPatch. objectPatch(context.Context, PrmObjectPatch) (ResPatchObject, error) // see clientWrapper.objectDelete. objectDelete(context.Context, PrmObjectDelete) error // see clientWrapper.objectGet. objectGet(context.Context, PrmObjectGet) (ResGetObject, error) // see clientWrapper.objectHead. objectHead(context.Context, PrmObjectHead) (object.Object, error) // see clientWrapper.objectRange. objectRange(context.Context, PrmObjectRange) (ResObjectRange, error) // see clientWrapper.objectSearch. objectSearch(context.Context, PrmObjectSearch) (ResObjectSearch, error) // see clientWrapper.sessionCreate. sessionCreate(context.Context, prmCreateSession) (resCreateSession, error) clientStatus // see clientWrapper.dial. dial(ctx context.Context) error // see clientWrapper.restart. restart(ctx context.Context) error // see clientWrapper.close. close() error } // clientStatus provide access to some metrics for connection. type clientStatus interface { // isHealthy checks if the connection can handle requests. isHealthy() bool // setUnhealthy marks client as unhealthy. setUnhealthy() // setHealthy marks client as healthy. setHealthy() // address return address of endpoint. address() string // currentErrorRate returns current errors rate. // After specific threshold connection is considered as unhealthy. // Pool.startRebalance routine can make this connection healthy again. currentErrorRate() uint32 // overallErrorRate returns the number of all happened errors. overallErrorRate() uint64 // methodsStatus returns statistic for all used methods. methodsStatus() []StatusSnapshot // incRequestCount increases the counter of ongoing operations. incRequestCount() bool // decRequestCount decreases the counter of ongoing operations. decRequestCount() // markClientClosed marks the client as closed. markClientClosed() } // InitParameters contains values used to initialize connection Pool. type InitParameters struct { key *ecdsa.PrivateKey logger *zap.Logger nodeDialTimeout time.Duration nodeStreamTimeout time.Duration healthcheckTimeout time.Duration clientRebalanceInterval time.Duration sessionExpirationDuration uint64 errorThreshold uint32 nodeParams []NodeParam requestCallback func(RequestInfo) dialOptions []grpc.DialOption clientBuilder clientBuilder gracefulCloseOnSwitchTimeout time.Duration } // SetKey specifies default key to be used for the protocol communication by default. func (x *InitParameters) SetKey(key *ecdsa.PrivateKey) { x.key = key } // SetLogger specifies logger. func (x *InitParameters) SetLogger(logger *zap.Logger) { x.logger = logger } // SetNodeDialTimeout specifies the timeout for connection to be established. func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration) { x.nodeDialTimeout = timeout } // SetNodeStreamTimeout specifies the timeout for individual operations in streaming RPC. func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration) { x.nodeStreamTimeout = timeout } // SetHealthcheckTimeout specifies the timeout for request to node to decide if it is alive. // // See also Pool.Dial. func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) { x.healthcheckTimeout = timeout } // SetClientRebalanceInterval specifies the interval for updating nodes health status. // // See also Pool.Dial. func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) { x.clientRebalanceInterval = interval } // SetGracefulCloseOnSwitchTimeout specifies the timeout after which unhealthy client be closed during rebalancing // if it will become healthy back. // Generally this param should be less than client rebalance interval (see SetClientRebalanceInterval). // // See also SetErrorThreshold. func (x *InitParameters) SetGracefulCloseOnSwitchTimeout(timeout time.Duration) { x.gracefulCloseOnSwitchTimeout = timeout } // SetSessionExpirationDuration specifies the session token lifetime in epochs. func (x *InitParameters) SetSessionExpirationDuration(expirationDuration uint64) { x.sessionExpirationDuration = expirationDuration } // SetErrorThreshold specifies the number of errors on connection after which node is considered as unhealthy. func (x *InitParameters) SetErrorThreshold(threshold uint32) { x.errorThreshold = threshold } // SetRequestCallback makes the pool client to pass RequestInfo for each // request to f. Nil (default) means ignore RequestInfo. func (x *InitParameters) SetRequestCallback(f func(RequestInfo)) { x.requestCallback = f } // AddNode append information about the node to which you want to connect. func (x *InitParameters) AddNode(nodeParam NodeParam) { x.nodeParams = append(x.nodeParams, nodeParam) } // SetGRPCDialOptions sets the gRPC dial options for new gRPC client connection. func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) { x.dialOptions = opts } // setClientBuilder sets clientBuilder used for client construction. // Wraps setClientBuilderContext without a context. func (x *InitParameters) setClientBuilder(builder clientBuilder) { x.clientBuilder = builder } // isMissingClientBuilder checks if client constructor was not specified. func (x *InitParameters) isMissingClientBuilder() bool { return x.clientBuilder == nil } type rebalanceParameters struct { nodesParams []*nodesParam nodeRequestTimeout time.Duration clientRebalanceInterval time.Duration sessionExpirationDuration uint64 } type nodesParam struct { priority int addresses []string weights []float64 } // NodeParam groups parameters of remote node. type NodeParam struct { priority int address string weight float64 } // NewNodeParam creates NodeParam using parameters. func NewNodeParam(priority int, address string, weight float64) (prm NodeParam) { prm.SetPriority(priority) prm.SetAddress(address) prm.SetWeight(weight) return } // SetPriority specifies priority of the node. // Negative value is allowed. In the result node groups // with the same priority will be sorted by descent. func (x *NodeParam) SetPriority(priority int) { x.priority = priority } // Priority returns priority of the node. // Requests will be served by nodes subset with the highest priority (the smaller value - the higher priority). // If there are no healthy nodes in subsets with current or higher priority, requests will be served // by nodes subset with lower priority. func (x *NodeParam) Priority() int { return x.priority } // SetAddress specifies address of the node. func (x *NodeParam) SetAddress(address string) { x.address = address } // Address returns address of the node. func (x *NodeParam) Address() string { return x.address } // SetWeight specifies weight of the node. // Weights used to adjust requests' distribution between nodes with the same priority. func (x *NodeParam) SetWeight(weight float64) { x.weight = weight } // Weight returns weight of the node. func (x *NodeParam) Weight() float64 { return x.weight } // WaitParams contains parameters used in polling is a something applied on FrostFS network. type WaitParams struct { Timeout time.Duration PollInterval time.Duration } // SetTimeout specifies the time to wait for the operation to complete. // // Deprecated: Use WaitParams.Timeout instead. func (x *WaitParams) SetTimeout(timeout time.Duration) { x.Timeout = timeout } // SetPollInterval specifies the interval, once it will check the completion of the operation. // // Deprecated: Use WaitParams.PollInterval instead. func (x *WaitParams) SetPollInterval(tick time.Duration) { x.PollInterval = tick } func defaultWaitParams() *WaitParams { return &WaitParams{ Timeout: 120 * time.Second, PollInterval: 5 * time.Second, } } // checkForPositive panics if any of the wait params isn't positive. func (x *WaitParams) checkForPositive() { if x.Timeout <= 0 || x.PollInterval <= 0 { panic("all wait params must be positive") } } // CheckValidity checks if all wait params are non-negative. func (x *WaitParams) CheckValidity() error { if x.Timeout <= 0 { return errors.New("timeout cannot be negative") } if x.PollInterval <= 0 { return errors.New("poll interval cannot be negative") } return nil } type prmContext struct { defaultSession bool verb session.ObjectVerb cnr cid.ID objSet bool objs []oid.ID } func (x *prmContext) useDefaultSession() { x.defaultSession = true } func (x *prmContext) useContainer(cnr cid.ID) { x.cnr = cnr } func (x *prmContext) useObjects(ids []oid.ID) { x.objs = ids x.objSet = true } func (x *prmContext) useAddress(addr oid.Address) { x.cnr = addr.Container() x.objs = []oid.ID{addr.Object()} x.objSet = true } func (x *prmContext) useVerb(verb session.ObjectVerb) { x.verb = verb } type prmCommon struct { key *ecdsa.PrivateKey btoken *bearer.Token stoken *session.Object } // UseKey specifies private key to sign the requests. // If key is not provided, then Pool default key is used. func (x *prmCommon) UseKey(key *ecdsa.PrivateKey) { x.key = key } // UseBearer attaches bearer token to be used for the operation. func (x *prmCommon) UseBearer(token bearer.Token) { x.btoken = &token } // UseSession specifies session within which operation should be performed. func (x *prmCommon) UseSession(token session.Object) { x.stoken = &token } // PrmObjectPut groups parameters of PutObject operation. type PrmObjectPut struct { prmCommon hdr object.Object payload io.Reader copiesNumber []uint32 clientCut bool networkInfo netmap.NetworkInfo withoutHomomorphicHash bool bufferMaxSize uint64 } // SetHeader specifies header of the object. func (x *PrmObjectPut) SetHeader(hdr object.Object) { x.hdr = hdr } // SetPayload specifies payload of the object. func (x *PrmObjectPut) SetPayload(payload io.Reader) { x.payload = payload } // SetCopiesNumber sets number of object copies that is enough to consider put successful. // Zero means using default behavior. func (x *PrmObjectPut) SetCopiesNumber(copiesNumber uint32) { x.copiesNumber = []uint32{copiesNumber} } // SetCopiesNumberVector sets number of object copies that is enough to consider put successful, provided as array. // Nil/empty vector means using default behavior. func (x *PrmObjectPut) SetCopiesNumberVector(copiesNumber []uint32) { x.copiesNumber = copiesNumber } // SetClientCut enables client cut for objects. It means that full object is prepared on client side // and retrying is possible. But this leads to additional memory using for buffering object parts. // Buffer size for every put is MaxObjectSize value from FrostFS network. // There is limit for total memory allocation for in-flight request and // can be set by InitParameters.SetMaxClientCutMemory (default value is 1gb). // Put requests will fail if this limit be reached. func (x *PrmObjectPut) SetClientCut(clientCut bool) { x.clientCut = clientCut } // WithoutHomomorphicHash if set to true do not use Tillich-ZĂ©mor hash for payload. func (x *PrmObjectPut) WithoutHomomorphicHash(v bool) { x.withoutHomomorphicHash = v } // SetBufferMaxSize sets max buffer size to read payload. // This value isn't used if object size is set explicitly and less than this value. // Default value 3MB. func (x *PrmObjectPut) SetBufferMaxSize(size uint64) { x.bufferMaxSize = size } func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) { x.networkInfo = ni } // PrmObjectPatch groups parameters of PatchObject operation. type PrmObjectPatch struct { prmCommon addr oid.Address rng *object.Range payload io.Reader newAttrs []object.Attribute replaceAttrs bool maxPayloadPatchChunkLength int } // SetAddress sets the address of the object that is patched. func (x *PrmObjectPatch) SetAddress(addr oid.Address) { x.addr = addr } // SetRange sets the patch's range. func (x *PrmObjectPatch) SetRange(rng *object.Range) { x.rng = rng } // SetPayloadReader sets a payload reader. func (x *PrmObjectPatch) SetPayloadReader(payload io.Reader) { x.payload = payload } // SetRange sets the new attributes to the patch. func (x *PrmObjectPatch) SetNewAttributes(newAttrs []object.Attribute) { x.newAttrs = newAttrs } // SetRange sets the replace attributes flag to the patch. func (x *PrmObjectPatch) SetReplaceAttributes(replaceAttrs bool) { x.replaceAttrs = replaceAttrs } // SetMaxPayloadPatchChunkSize sets a max buf size to read the patch's payload. func (x *PrmObjectPatch) SetMaxPayloadPatchChunkSize(maxPayloadPatchChunkSize int) { x.maxPayloadPatchChunkLength = maxPayloadPatchChunkSize } // PrmObjectDelete groups parameters of DeleteObject operation. type PrmObjectDelete struct { prmCommon addr oid.Address } // SetAddress specifies FrostFS address of the object. func (x *PrmObjectDelete) SetAddress(addr oid.Address) { x.addr = addr } // PrmObjectGet groups parameters of GetObject operation. type PrmObjectGet struct { prmCommon addr oid.Address } // SetAddress specifies FrostFS address of the object. func (x *PrmObjectGet) SetAddress(addr oid.Address) { x.addr = addr } // PrmObjectHead groups parameters of HeadObject operation. type PrmObjectHead struct { prmCommon addr oid.Address raw bool } // SetAddress specifies FrostFS address of the object. func (x *PrmObjectHead) SetAddress(addr oid.Address) { x.addr = addr } // MarkRaw marks an intent to read physically stored object. func (x *PrmObjectHead) MarkRaw() { x.raw = true } // PrmObjectRange groups parameters of RangeObject operation. type PrmObjectRange struct { prmCommon addr oid.Address off, ln uint64 } // SetAddress specifies FrostFS address of the object. func (x *PrmObjectRange) SetAddress(addr oid.Address) { x.addr = addr } // SetOffset sets offset of the payload range to be read. func (x *PrmObjectRange) SetOffset(offset uint64) { x.off = offset } // SetLength sets length of the payload range to be read. func (x *PrmObjectRange) SetLength(length uint64) { x.ln = length } // PrmObjectSearch groups parameters of SearchObjects operation. type PrmObjectSearch struct { prmCommon cnrID cid.ID filters object.SearchFilters } // SetContainerID specifies the container in which to look for objects. func (x *PrmObjectSearch) SetContainerID(cnrID cid.ID) { x.cnrID = cnrID } // SetFilters specifies filters by which to select objects. func (x *PrmObjectSearch) SetFilters(filters object.SearchFilters) { x.filters = filters } // PrmContainerPut groups parameters of PutContainer operation. type PrmContainerPut struct { ClientParams sdkClient.PrmContainerPut WaitParams *WaitParams } // SetContainer container structure to be used as a parameter of the base // client's operation. // // See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerPut.SetContainer. // // Deprecated: Use PrmContainerPut.ClientParams.Container instead. func (x *PrmContainerPut) SetContainer(cnr container.Container) { x.ClientParams.SetContainer(cnr) } // WithinSession specifies session to be used as a parameter of the base // client's operation. // // See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerPut.WithinSession. // // Deprecated: Use PrmContainerPut.ClientParams.Session instead. func (x *PrmContainerPut) WithinSession(s session.Container) { x.ClientParams.WithinSession(s) } // SetWaitParams specifies timeout params to complete operation. // If not provided the default one will be used. // Panics if any of the wait params isn't positive. // // Deprecated: Use PrmContainerPut.ClientParams.WaitParams instead. func (x *PrmContainerPut) SetWaitParams(waitParams WaitParams) { waitParams.checkForPositive() x.WaitParams = &waitParams } // PrmContainerGet groups parameters of GetContainer operation. type PrmContainerGet struct { ContainerID cid.ID Session *session.Container } // SetContainerID specifies identifier of the container to be read. // // Deprecated: Use PrmContainerGet.ContainerID instead. func (prm *PrmContainerGet) SetContainerID(cnrID cid.ID) { prm.ContainerID = cnrID } // PrmContainerList groups parameters of ListContainers operation. type PrmContainerList struct { OwnerID user.ID Session *session.Container } // SetOwnerID specifies identifier of the FrostFS account to list the containers. // // Deprecated: Use PrmContainerList.OwnerID instead. func (x *PrmContainerList) SetOwnerID(ownerID user.ID) { x.OwnerID = ownerID } // PrmContainerDelete groups parameters of DeleteContainer operation. type PrmContainerDelete struct { ContainerID cid.ID Session *session.Container WaitParams *WaitParams } // SetContainerID specifies identifier of the FrostFS container to be removed. // // Deprecated: Use PrmContainerDelete.ContainerID instead. func (x *PrmContainerDelete) SetContainerID(cnrID cid.ID) { x.ContainerID = cnrID } // SetSessionToken specifies session within which operation should be performed. // // Deprecated: Use PrmContainerDelete.Session instead. func (x *PrmContainerDelete) SetSessionToken(token session.Container) { x.Session = &token } // SetWaitParams specifies timeout params to complete operation. // If not provided the default one will be used. // Panics if any of the wait params isn't positive. // // Deprecated: Use PrmContainerDelete.WaitParams instead. func (x *PrmContainerDelete) SetWaitParams(waitParams WaitParams) { waitParams.checkForPositive() x.WaitParams = &waitParams } type PrmAddAPEChain struct { Target ape.ChainTarget Chain ape.Chain } type PrmRemoveAPEChain struct { Target ape.ChainTarget ChainID ape.ChainID } type PrmListAPEChains struct { Target ape.ChainTarget } // PrmBalanceGet groups parameters of Balance operation. type PrmBalanceGet struct { account user.ID } // SetAccount specifies identifier of the FrostFS account for which the balance is requested. func (x *PrmBalanceGet) SetAccount(id user.ID) { x.account = id } // prmEndpointInfo groups parameters of sessionCreate operation. type prmCreateSession struct { exp uint64 key ecdsa.PrivateKey } // setExp sets number of the last FrostFS epoch in the lifetime of the session after which it will be expired. func (x *prmCreateSession) setExp(exp uint64) { x.exp = exp } // useKey specifies owner private key for session token. // If key is not provided, then Pool default key is used. func (x *prmCreateSession) useKey(key ecdsa.PrivateKey) { x.key = key } // prmEndpointInfo groups parameters of endpointInfo operation. type prmEndpointInfo struct{} // prmNetworkInfo groups parameters of networkInfo operation. type prmNetworkInfo struct{} // prmNetMapSnapshot groups parameters of netMapSnapshot operation. type prmNetMapSnapshot struct{} // resCreateSession groups resulting values of sessionCreate operation. type resCreateSession struct { id []byte sessionKey []byte } // Pool represents virtual connection to the FrostFS network to communicate // with multiple FrostFS servers without thinking about switching between servers // due to load balancing proportions or their unavailability. // It is designed to provide a convenient abstraction from the multiple sdkClient.client types. // // Pool can be created and initialized using NewPool function. // Before executing the FrostFS operations using the Pool, connection to the // servers MUST BE correctly established (see Dial method). // Using the Pool before connecting have been established can lead to a panic. // After the work, the Pool SHOULD BE closed (see Close method): it frees internal // and system resources which were allocated for the period of work of the Pool. // Calling Dial/Close methods during the communication process step strongly discouraged // as it leads to undefined behavior. // // Each method which produces a FrostFS API call may return an error. // Status of underlying server response is casted to built-in error instance. // Certain statuses can be checked using `sdkClient` and standard `errors` packages. // Note that package provides some helper functions to work with status returns // (e.g. sdkClient.IsErrContainerNotFound, sdkClient.IsErrObjectNotFound). // // See pool package overview to get some examples. type Pool struct { manager *connectionManager logger *zap.Logger key *ecdsa.PrivateKey cache *sessionCache stokenDuration uint64 maxObjectSize uint64 } const ( defaultSessionTokenExpirationDuration = 100 // in epochs defaultErrorThreshold = 100 defaultGracefulCloseOnSwitchTimeout = 10 * time.Second defaultRebalanceInterval = 15 * time.Second defaultHealthcheckTimeout = 4 * time.Second defaultDialTimeout = 5 * time.Second defaultStreamTimeout = 10 * time.Second defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB ) // NewPool returns an instance of Pool configured according to the parameters. // // Before using Pool, you MUST call Dial. func NewPool(options InitParameters) (*Pool, error) { cache, err := newCache(options.sessionExpirationDuration) if err != nil { return nil, fmt.Errorf("couldn't create cache: %w", err) } fillDefaultInitParams(&options, cache) manager, err := newConnectionManager(options) if err != nil { return nil, err } pool := &Pool{ cache: cache, key: options.key, logger: options.logger, manager: manager, stokenDuration: options.sessionExpirationDuration, } return pool, nil } // Dial establishes a connection to the servers from the FrostFS network. // It also starts a routine that checks the health of the nodes and // updates the weights of the nodes for balancing. // Returns an error describing failure reason. // // If failed, the Pool SHOULD NOT be used. // // See also InitParameters.SetClientRebalanceInterval. func (p *Pool) Dial(ctx context.Context) error { err := p.manager.dial(ctx) if err != nil { return err } var atLeastOneHealthy bool p.manager.iterate(func(cl client) { var st session.Object err := initSessionForDuration(ctx, &st, cl, p.manager.rebalanceParams.sessionExpirationDuration, *p.key, false) if err != nil { if p.logger != nil { p.logger.Log(zap.WarnLevel, "failed to create frostfs session token for client", zap.String("address", cl.address()), zap.Error(err)) } return } _ = p.cache.Put(formCacheKey(cl.address(), p.key, false), st) atLeastOneHealthy = true }) if !atLeastOneHealthy { return fmt.Errorf("at least one node must be healthy") } ni, err := p.NetworkInfo(ctx) if err != nil { return fmt.Errorf("get network info for max object size: %w", err) } p.maxObjectSize = ni.MaxObjectSize() return nil } func fillDefaultInitParams(params *InitParameters, cache *sessionCache) { if params.sessionExpirationDuration == 0 { params.sessionExpirationDuration = defaultSessionTokenExpirationDuration } if params.errorThreshold == 0 { params.errorThreshold = defaultErrorThreshold } if params.clientRebalanceInterval <= 0 { params.clientRebalanceInterval = defaultRebalanceInterval } if params.gracefulCloseOnSwitchTimeout <= 0 { params.gracefulCloseOnSwitchTimeout = defaultGracefulCloseOnSwitchTimeout } if params.healthcheckTimeout <= 0 { params.healthcheckTimeout = defaultHealthcheckTimeout } if params.nodeDialTimeout <= 0 { params.nodeDialTimeout = defaultDialTimeout } if params.nodeStreamTimeout <= 0 { params.nodeStreamTimeout = defaultStreamTimeout } if cache.tokenDuration == 0 { cache.tokenDuration = defaultSessionTokenExpirationDuration } if params.isMissingClientBuilder() { params.setClientBuilder(func(addr string) client { var prm wrapperPrm prm.setAddress(addr) prm.setKey(*params.key) prm.setLogger(params.logger) prm.setDialTimeout(params.nodeDialTimeout) prm.setStreamTimeout(params.nodeStreamTimeout) prm.setErrorThreshold(params.errorThreshold) prm.setGracefulCloseOnSwitchTimeout(params.gracefulCloseOnSwitchTimeout) prm.setPoolRequestCallback(params.requestCallback) prm.setGRPCDialOptions(params.dialOptions) prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error { cache.updateEpoch(info.Epoch()) return nil }) return newWrapper(prm) }) } } func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string { k := keys.PrivateKey{PrivateKey: *key} stype := "server" if clientCut { stype = "client" } return address + stype + k.String() } func (p *Pool) checkSessionTokenErr(err error, address string) bool { if err == nil { return false } if sdkClient.IsErrSessionNotFound(err) || sdkClient.IsErrSessionExpired(err) { p.cache.DeleteByPrefix(address) return true } return false } func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey, clientCut bool) error { ni, err := c.networkInfo(ctx, prmNetworkInfo{}) if err != nil { return err } epoch := ni.CurrentEpoch() var exp uint64 if math.MaxUint64-epoch < dur { exp = math.MaxUint64 } else { exp = epoch + dur } var prm prmCreateSession prm.setExp(exp) prm.useKey(ownerKey) var ( id uuid.UUID key frostfsecdsa.PublicKey ) if clientCut { id = uuid.New() key = frostfsecdsa.PublicKey(ownerKey.PublicKey) } else { res, err := c.sessionCreate(ctx, prm) if err != nil { return err } if err = id.UnmarshalBinary(res.id); err != nil { return fmt.Errorf("invalid session token ID: %w", err) } if err = key.Decode(res.sessionKey); err != nil { return fmt.Errorf("invalid public session key: %w", err) } } dst.SetID(id) dst.SetAuthKey(&key) dst.SetExp(exp) return nil } type callContext struct { client client // client endpoint endpoint string // request signer key *ecdsa.PrivateKey // flag to open default session if session token is missing sessionDefault bool sessionTarget func(session.Object) sessionVerb session.ObjectVerb sessionCnr cid.ID sessionObjSet bool sessionObjs []oid.ID sessionClientCut bool } func (p *Pool) initCall(ctxCall *callContext, cfg prmCommon, prmCtx prmContext) error { p.fillAppropriateKey(&cfg) cp, err := p.manager.connection() if err != nil { return err } ctxCall.key = cfg.key if ctxCall.key == nil { // use pool key if caller didn't specify its own ctxCall.key = p.key } ctxCall.endpoint = cp.address() ctxCall.client = cp if ctxCall.sessionTarget != nil && cfg.stoken != nil { ctxCall.sessionTarget(*cfg.stoken) } // note that we don't override session provided by the caller ctxCall.sessionDefault = cfg.stoken == nil && prmCtx.defaultSession if ctxCall.sessionDefault { ctxCall.sessionVerb = prmCtx.verb ctxCall.sessionCnr = prmCtx.cnr ctxCall.sessionObjSet = prmCtx.objSet ctxCall.sessionObjs = prmCtx.objs } return err } // opens new session or uses cached one. // Must be called only on initialized callContext with set sessionTarget. func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error { cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut) tok, ok := p.cache.Get(cacheKey) if !ok { // init new session err := initSessionForDuration(ctx, &tok, cc.client, p.stokenDuration, *cc.key, cc.sessionClientCut) if err != nil { return fmt.Errorf("session API client: %w", err) } // cache the opened session p.cache.Put(cacheKey, tok) } tok.ForVerb(cc.sessionVerb) tok.BindContainer(cc.sessionCnr) if cc.sessionObjSet { tok.LimitByObjects(cc.sessionObjs...) } // sign the token if err := tok.Sign(*cc.key); err != nil { return fmt.Errorf("sign token of the opened session: %w", err) } cc.sessionTarget(tok) return nil } // opens default session (if sessionDefault is set), and calls f. If f returns // session-related error then cached token is removed. func (p *Pool) call(ctx context.Context, cc *callContext, f func() error) error { var err error if cc.sessionDefault { err = p.openDefaultSession(ctx, cc) if err != nil { return fmt.Errorf("open default session: %w", err) } } err = f() _ = p.checkSessionTokenErr(err, cc.endpoint) return err } // fillAppropriateKey use pool key if caller didn't specify its own. func (p *Pool) fillAppropriateKey(prm *prmCommon) { if prm.key == nil { prm.key = p.key } } // ResPutObject is designed to provide identifier and creation epoch of the saved object. type ResPutObject struct { ObjectID oid.ID Epoch uint64 } // ResPatchObject is designed to provide identifier for the saved patched object. type ResPatchObject struct { ObjectID oid.ID } // PatchObject patches an object through a remote server using FrostFS API protocol. func (p *Pool) PatchObject(ctx context.Context, prm PrmObjectPatch) (ResPatchObject, error) { var prmCtx prmContext prmCtx.useDefaultSession() prmCtx.useVerb(session.VerbObjectPatch) prmCtx.useContainer(prm.addr.Container()) var ctxCall callContext err := p.initCall(&ctxCall, prm.prmCommon, prmCtx) defer p.manager.returnConnection(ctxCall.client) if err != nil { return ResPatchObject{}, fmt.Errorf("init call context: %w", err) } if ctxCall.sessionDefault { ctxCall.sessionTarget = prm.UseSession if err := p.openDefaultSession(ctx, &ctxCall); err != nil { return ResPatchObject{}, fmt.Errorf("open default session: %w", err) } } res, err := ctxCall.client.objectPatch(ctx, prm) if err != nil { // removes session token from cache in case of token error p.checkSessionTokenErr(err, ctxCall.endpoint) return ResPatchObject{}, fmt.Errorf("init patching on API client %s: %w", ctxCall.endpoint, err) } return res, nil } // PutObject writes an object through a remote server using FrostFS API protocol. func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) { cnr, _ := prm.hdr.ContainerID() var prmCtx prmContext prmCtx.useDefaultSession() prmCtx.useVerb(session.VerbObjectPut) prmCtx.useContainer(cnr) var ctxCall callContext ctxCall.sessionClientCut = prm.clientCut err := p.initCall(&ctxCall, prm.prmCommon, prmCtx) defer p.manager.returnConnection(ctxCall.client) if err != nil { return ResPutObject{}, fmt.Errorf("init call context: %w", err) } if ctxCall.sessionDefault { ctxCall.sessionTarget = prm.UseSession if err := p.openDefaultSession(ctx, &ctxCall); err != nil { return ResPutObject{}, fmt.Errorf("open default session: %w", err) } } if prm.clientCut { var ni netmap.NetworkInfo ni.SetCurrentEpoch(p.cache.Epoch()) ni.SetMaxObjectSize(p.maxObjectSize) // we want to use initial max object size in PayloadSizeLimiter prm.setNetworkInfo(ni) } res, err := ctxCall.client.objectPut(ctx, prm) if err != nil { // removes session token from cache in case of token error p.checkSessionTokenErr(err, ctxCall.endpoint) return ResPutObject{}, fmt.Errorf("init writing on API client %s: %w", ctxCall.endpoint, err) } return res, nil } // DeleteObject marks an object for deletion from the container using FrostFS API protocol. // As a marker, a special unit called a tombstone is placed in the container. // It confirms the user's intent to delete the object, and is itself a container object. // Explicit deletion is done asynchronously, and is generally not guaranteed. func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error { var prmCtx prmContext prmCtx.useDefaultSession() prmCtx.useVerb(session.VerbObjectDelete) prmCtx.useAddress(prm.addr) if prm.stoken == nil { // collect phy objects only if we are about to open default session var tokens relations.Tokens tokens.Bearer = prm.btoken relatives, err := relations.ListAllRelations(ctx, p, prm.addr.Container(), prm.addr.Object(), tokens) if err != nil { return fmt.Errorf("failed to collect relatives: %w", err) } if len(relatives) != 0 { prmCtx.useContainer(prm.addr.Container()) prmCtx.useObjects(append(relatives, prm.addr.Object())) } } var cc callContext cc.sessionTarget = prm.UseSession err := p.initCall(&cc, prm.prmCommon, prmCtx) defer p.manager.returnConnection(cc.client) if err != nil { return err } return p.call(ctx, &cc, func() error { if err = cc.client.objectDelete(ctx, prm); err != nil { return fmt.Errorf("remove object via client %s: %w", cc.endpoint, err) } return nil }) } type objectReadCloser struct { reader *sdkClient.ObjectReader elapsedTimeCallback func(time.Duration) } // Read implements io.Reader of the object payload. func (x *objectReadCloser) Read(p []byte) (int, error) { start := time.Now() n, err := x.reader.Read(p) x.elapsedTimeCallback(time.Since(start)) return n, err } // Close implements io.Closer of the object payload. func (x *objectReadCloser) Close() error { _, err := x.reader.Close() return err } // ResGetObject is designed to provide object header nad read one object payload from FrostFS system. type ResGetObject struct { Header object.Object Payload io.ReadCloser } // GetObject reads object header and initiates reading an object payload through a remote server using FrostFS API protocol. func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) { var cc callContext cc.sessionTarget = prm.UseSession var res ResGetObject err := p.initCall(&cc, prm.prmCommon, prmContext{}) defer p.manager.returnConnection(cc.client) if err != nil { return res, err } return res, p.call(ctx, &cc, func() error { res, err = cc.client.objectGet(ctx, prm) if err != nil { return fmt.Errorf("get object via client %s: %w", cc.endpoint, err) } return nil }) } // HeadObject reads object header through a remote server using FrostFS API protocol. func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) { var cc callContext cc.sessionTarget = prm.UseSession var obj object.Object err := p.initCall(&cc, prm.prmCommon, prmContext{}) defer p.manager.returnConnection(cc.client) if err != nil { return obj, err } return obj, p.call(ctx, &cc, func() error { obj, err = cc.client.objectHead(ctx, prm) if err != nil { return fmt.Errorf("head object via client %s: %w", cc.endpoint, err) } return nil }) } // ResObjectRange is designed to read payload range of one object // from FrostFS system. // // Must be initialized using Pool.ObjectRange, any other // usage is unsafe. type ResObjectRange struct { payload *sdkClient.ObjectRangeReader elapsedTimeCallback func(time.Duration) } // Read implements io.Reader of the object payload. func (x *ResObjectRange) Read(p []byte) (int, error) { start := time.Now() n, err := x.payload.Read(p) x.elapsedTimeCallback(time.Since(start)) return n, err } // Close ends reading the payload range and returns the result of the operation // along with the final results. Must be called after using the ResObjectRange. func (x *ResObjectRange) Close() error { _, err := x.payload.Close() return err } // ObjectRange initiates reading an object's payload range through a remote // server using FrostFS API protocol. func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) { var cc callContext cc.sessionTarget = prm.UseSession var res ResObjectRange err := p.initCall(&cc, prm.prmCommon, prmContext{}) defer p.manager.returnConnection(cc.client) if err != nil { return res, err } return res, p.call(ctx, &cc, func() error { res, err = cc.client.objectRange(ctx, prm) if err != nil { return fmt.Errorf("object range via client %s: %w", cc.endpoint, err) } return nil }) } // ResObjectSearch is designed to read list of object identifiers from FrostFS system. // // Must be initialized using Pool.SearchObjects, any other usage is unsafe. type ResObjectSearch struct { r *sdkClient.ObjectListReader handleError func(context.Context, apistatus.Status, error) error } // Read reads another list of the object identifiers. func (x *ResObjectSearch) Read(buf []oid.ID) (int, error) { n, ok := x.r.Read(buf) if !ok { res, err := x.r.Close() if err == nil { return n, io.EOF } var status apistatus.Status if res != nil { status = res.Status() } err = x.handleError(nil, status, err) return n, err } return n, nil } // Iterate iterates over the list of found object identifiers. // f can return true to stop iteration earlier. // // Returns an error if object can't be read. func (x *ResObjectSearch) Iterate(f func(oid.ID) bool) error { return x.r.Iterate(f) } // Close ends reading list of the matched objects and returns the result of the operation // along with the final results. Must be called after using the ResObjectSearch. func (x *ResObjectSearch) Close() { _, _ = x.r.Close() } // SearchObjects initiates object selection through a remote server using FrostFS API protocol. // // The call only opens the transmission channel, explicit fetching of matched objects // is done using the ResObjectSearch. Resulting reader must be finally closed. func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) { var cc callContext cc.sessionTarget = prm.UseSession var res ResObjectSearch err := p.initCall(&cc, prm.prmCommon, prmContext{}) if err != nil { return res, err } return res, p.call(ctx, &cc, func() error { res, err = cc.client.objectSearch(ctx, prm) if err != nil { return fmt.Errorf("search object via client %s: %w", cc.endpoint, err) } return nil }) } // PutContainer sends request to save container in FrostFS and waits for the operation to complete. // // Waiting parameters can be specified using SetWaitParams. If not called, defaults are used: // // polling interval: 5s // waiting timeout: 120s // // Success can be verified by reading by identifier (see GetContainer). func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) { cp, err := p.manager.connection() defer p.manager.returnConnection(cp) if err != nil { return cid.ID{}, err } cnrID, err := cp.containerPut(ctx, prm) if err != nil { return cid.ID{}, fmt.Errorf("put container via client '%s': %w", cp.address(), err) } return cnrID, nil } // GetContainer reads FrostFS container by ID. func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) { cp, err := p.manager.connection() defer p.manager.returnConnection(cp) if err != nil { return container.Container{}, err } cnrs, err := cp.containerGet(ctx, prm) if err != nil { return container.Container{}, fmt.Errorf("get container via client '%s': %w", cp.address(), err) } return cnrs, nil } // ListContainers requests identifiers of the account-owned containers. func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) { cp, err := p.manager.connection() defer p.manager.returnConnection(cp) if err != nil { return nil, err } cnrIDs, err := cp.containerList(ctx, prm) if err != nil { return []cid.ID{}, fmt.Errorf("list containers via client '%s': %w", cp.address(), err) } return cnrIDs, nil } // ListContainersStream requests identifiers of the account-owned containers. func (p *Pool) ListContainersStream(ctx context.Context, prm PrmListStream) (ResListStream, error) { var res ResListStream cp, err := p.manager.connection() defer p.manager.returnConnection(cp) if err != nil { return res, err } res, err = cp.containerListStream(ctx, prm) if err != nil { return res, fmt.Errorf("list containers stream via client '%s': %w", cp.address(), err) } return res, nil } // DeleteContainer sends request to remove the FrostFS container and waits for the operation to complete. // // Waiting parameters can be specified using SetWaitParams. If not called, defaults are used: // // polling interval: 5s // waiting timeout: 120s // // Success can be verified by reading by identifier (see GetContainer). func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error { cp, err := p.manager.connection() defer p.manager.returnConnection(cp) if err != nil { return err } err = cp.containerDelete(ctx, prm) if err != nil { return fmt.Errorf("delete container via client '%s': %w", cp.address(), err) } return nil } // AddAPEChain sends a request to set APE chain rules for a target (basically, for a container). func (p *Pool) AddAPEChain(ctx context.Context, prm PrmAddAPEChain) error { cp, err := p.manager.connection() defer p.manager.returnConnection(cp) if err != nil { return err } err = cp.apeManagerAddChain(ctx, prm) if err != nil { return fmt.Errorf("add ape chain via client '%s': %w", cp.address(), err) } return nil } // RemoveAPEChain sends a request to remove APE chain rules for a target. func (p *Pool) RemoveAPEChain(ctx context.Context, prm PrmRemoveAPEChain) error { cp, err := p.manager.connection() defer p.manager.returnConnection(cp) if err != nil { return err } err = cp.apeManagerRemoveChain(ctx, prm) if err != nil { return fmt.Errorf("remove ape chain via client '%s': %w", cp.address(), err) } return nil } // ListAPEChains sends a request to list APE chains rules for a target. func (p *Pool) ListAPEChains(ctx context.Context, prm PrmListAPEChains) ([]ape.Chain, error) { cp, err := p.manager.connection() defer p.manager.returnConnection(cp) if err != nil { return nil, err } chains, err := cp.apeManagerListChains(ctx, prm) if err != nil { return nil, fmt.Errorf("list ape chains via client '%s': %w", cp.address(), err) } return chains, nil } // Balance requests current balance of the FrostFS account. func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) { cp, err := p.manager.connection() defer p.manager.returnConnection(cp) if err != nil { return accounting.Decimal{}, err } balance, err := cp.balanceGet(ctx, prm) if err != nil { return accounting.Decimal{}, fmt.Errorf("get balance via client '%s': %w", cp.address(), err) } return balance, nil } // Statistic returns connection statistics. func (p Pool) Statistic() Statistic { return p.manager.Statistic() } // waitForContainerPresence waits until the container is found on the FrostFS network. func waitForContainerPresence(ctx context.Context, cli client, prm PrmContainerGet, waitParams *WaitParams) error { return waitFor(ctx, waitParams, func(ctx context.Context) bool { _, err := cli.containerGet(ctx, prm) return err == nil }) } // waitForContainerRemoved waits until the container is removed from the FrostFS network. func waitForContainerRemoved(ctx context.Context, cli client, prm PrmContainerGet, waitParams *WaitParams) error { return waitFor(ctx, waitParams, func(ctx context.Context) bool { _, err := cli.containerGet(ctx, prm) return sdkClient.IsErrContainerNotFound(err) }) } // waitFor await that given condition will be met in waitParams time. func waitFor(ctx context.Context, params *WaitParams, condition func(context.Context) bool) error { wctx, cancel := context.WithTimeout(ctx, params.Timeout) defer cancel() ticker := time.NewTimer(params.PollInterval) defer ticker.Stop() wdone := wctx.Done() done := ctx.Done() for { select { case <-done: return ctx.Err() case <-wdone: return wctx.Err() case <-ticker.C: if condition(ctx) { return nil } ticker.Reset(params.PollInterval) } } } // NetworkInfo requests information about the FrostFS network of which the remote server is a part. func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) { cp, err := p.manager.connection() defer p.manager.returnConnection(cp) if err != nil { return netmap.NetworkInfo{}, err } netInfo, err := cp.networkInfo(ctx, prmNetworkInfo{}) if err != nil { return netmap.NetworkInfo{}, fmt.Errorf("get network info via client '%s': %w", cp.address(), err) } return netInfo, nil } // NetMapSnapshot requests information about the FrostFS network map. func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) { cp, err := p.manager.connection() defer p.manager.returnConnection(cp) if err != nil { return netmap.NetMap{}, err } netMap, err := cp.netMapSnapshot(ctx, prmNetMapSnapshot{}) if err != nil { return netmap.NetMap{}, fmt.Errorf("get network map via client '%s': %w", cp.address(), err) } return netMap, nil } // Close closes the Pool and releases all the associated resources. func (p *Pool) Close() { p.manager.close() } // SyncContainerWithNetwork applies network configuration received via // the Pool to the container. Changes the container if it does not satisfy // network configuration. // // Pool and container MUST not be nil. // // Returns any error that does not allow reading configuration // from the network. func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *Pool) error { ni, err := p.NetworkInfo(ctx) if err != nil { return fmt.Errorf("network info: %w", err) } container.ApplyNetworkConfig(cnr, ni) return nil } // GetSplitInfo implements relations.Relations. func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) { var addr oid.Address addr.SetContainer(cnrID) addr.SetObject(objID) var prm PrmObjectHead prm.SetAddress(addr) if tokens.Bearer != nil { prm.UseBearer(*tokens.Bearer) } if tokens.Session != nil { prm.UseSession(*tokens.Session) } prm.MarkRaw() _, err := p.HeadObject(ctx, prm) var errSplit *object.SplitInfoError var errECInfo *object.ECInfoError switch { case errors.As(err, &errSplit): return errSplit.SplitInfo(), nil case err == nil || errors.As(err, &errECInfo): return nil, relations.ErrNoSplitInfo default: return nil, fmt.Errorf("failed to get raw object header: %w", err) } } // ListChildrenByLinker implements relations.Relations. func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) { var addr oid.Address addr.SetContainer(cnrID) addr.SetObject(objID) var prm PrmObjectHead prm.SetAddress(addr) if tokens.Bearer != nil { prm.UseBearer(*tokens.Bearer) } if tokens.Session != nil { prm.UseSession(*tokens.Session) } res, err := p.HeadObject(ctx, prm) if err != nil { return nil, fmt.Errorf("failed to get linking object's header: %w", err) } return res.Children(), nil } // GetLeftSibling implements relations.Relations. func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) { var addr oid.Address addr.SetContainer(cnrID) addr.SetObject(objID) var prm PrmObjectHead prm.SetAddress(addr) if tokens.Bearer != nil { prm.UseBearer(*tokens.Bearer) } if tokens.Session != nil { prm.UseSession(*tokens.Session) } res, err := p.HeadObject(ctx, prm) if err != nil { return oid.ID{}, fmt.Errorf("failed to read split chain member's header: %w", err) } idMember, ok := res.PreviousID() if !ok { return oid.ID{}, relations.ErrNoLeftSibling } return idMember, nil } // FindSiblingBySplitID implements relations.Relations. func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) { var query object.SearchFilters query.AddSplitIDFilter(object.MatchStringEqual, splitID) var prm PrmObjectSearch prm.SetContainerID(cnrID) prm.SetFilters(query) if tokens.Bearer != nil { prm.UseBearer(*tokens.Bearer) } if tokens.Session != nil { prm.UseSession(*tokens.Session) } res, err := p.SearchObjects(ctx, prm) if err != nil { return nil, fmt.Errorf("failed to search objects by split ID: %w", err) } var members []oid.ID err = res.Iterate(func(id oid.ID) bool { members = append(members, id) return false }) if err != nil { return nil, fmt.Errorf("failed to iterate found objects: %w", err) } return members, nil } // FindSiblingByParentID implements relations.Relations. func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) { var query object.SearchFilters query.AddParentIDFilter(object.MatchStringEqual, objID) var prm PrmObjectSearch prm.SetContainerID(cnrID) prm.SetFilters(query) if tokens.Bearer != nil { prm.UseBearer(*tokens.Bearer) } if tokens.Session != nil { prm.UseSession(*tokens.Session) } resSearch, err := p.SearchObjects(ctx, prm) if err != nil { return nil, fmt.Errorf("failed to find object children: %w", err) } var res []oid.ID err = resSearch.Iterate(func(id oid.ID) bool { res = append(res, id) return false }) if err != nil { return nil, fmt.Errorf("failed to iterate found objects: %w", err) } return res, nil }