package pool import ( "bytes" "context" "crypto/ecdsa" "crypto/sha256" "errors" "fmt" "io" "math" "math/rand" "sort" "strings" "sync" "time" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" apiclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-sdk-go/accounting" "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/container" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/object/address" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/owner" "github.com/nspcc-dev/neofs-sdk-go/session" "github.com/nspcc-dev/neofs-sdk-go/token" "go.uber.org/zap" ) // Client is a wrapper for client.Client to generate mock. type Client interface { GetBalance(context.Context, client.GetBalancePrm) (*client.GetBalanceRes, error) PutContainer(context.Context, client.ContainerPutPrm) (*client.ContainerPutRes, error) GetContainer(context.Context, client.ContainerGetPrm) (*client.ContainerGetRes, error) ListContainers(context.Context, client.ContainerListPrm) (*client.ContainerListRes, error) DeleteContainer(context.Context, client.ContainerDeletePrm) (*client.ContainerDeleteRes, error) EACL(context.Context, client.EACLPrm) (*client.EACLRes, error) SetEACL(context.Context, client.SetEACLPrm) (*client.SetEACLRes, error) AnnounceContainerUsedSpace(context.Context, client.AnnounceSpacePrm) (*client.AnnounceSpaceRes, error) EndpointInfo(context.Context, client.EndpointInfoPrm) (*client.EndpointInfoRes, error) NetworkInfo(context.Context, client.NetworkInfoPrm) (*client.NetworkInfoRes, error) ObjectPutInit(context.Context, client.PrmObjectPutInit) (*client.ObjectWriter, error) DeleteObject(context.Context, *client.DeleteObjectParams, ...client.CallOption) (*client.ObjectDeleteRes, error) ObjectGetInit(context.Context, client.PrmObjectGet) (*client.ObjectReader, error) ObjectHead(context.Context, client.PrmObjectHead) (*client.ResObjectHead, error) ObjectRangeInit(context.Context, client.PrmObjectRange) (*client.ObjectRangeReader, error) HashObjectPayloadRanges(context.Context, *client.RangeChecksumParams, ...client.CallOption) (*client.ObjectRangeHashRes, error) SearchObjects(context.Context, *client.SearchObjectParams, ...client.CallOption) (*client.ObjectSearchRes, error) AnnounceLocalTrust(context.Context, client.AnnounceLocalTrustPrm) (*client.AnnounceLocalTrustRes, error) AnnounceIntermediateTrust(context.Context, client.AnnounceIntermediateTrustPrm) (*client.AnnounceIntermediateTrustRes, error) CreateSession(context.Context, client.CreateSessionPrm) (*client.CreateSessionRes, error) Raw() *apiclient.Client Conn() io.Closer } // BuilderOptions contains options used to build connection pool. type BuilderOptions struct { Key *ecdsa.PrivateKey Logger *zap.Logger NodeConnectionTimeout time.Duration NodeRequestTimeout time.Duration ClientRebalanceInterval time.Duration SessionTokenThreshold time.Duration SessionExpirationDuration uint64 nodesParams []*NodesParam clientBuilder func(opts ...client.Option) (Client, error) } type NodesParam struct { priority int addresses []string weights []float64 } type NodeParam struct { priority int address string weight float64 } // Builder is an interim structure used to collect node addresses/weights and // build connection pool subsequently. type Builder struct { nodeParams []NodeParam } // ContainerPollingParams contains parameters used in polling is a container created or not. type ContainerPollingParams struct { CreationTimeout time.Duration PollInterval time.Duration } // DefaultPollingParams creates ContainerPollingParams with default values. func DefaultPollingParams() *ContainerPollingParams { return &ContainerPollingParams{ CreationTimeout: 120 * time.Second, PollInterval: 5 * time.Second, } } // AddNode adds address/weight pair to node PoolBuilder list. func (pb *Builder) AddNode(address string, priority int, weight float64) *Builder { pb.nodeParams = append(pb.nodeParams, NodeParam{ address: address, priority: priority, weight: weight, }) return pb } // Build creates new pool based on current PoolBuilder state and options. func (pb *Builder) Build(ctx context.Context, options *BuilderOptions) (Pool, error) { if len(pb.nodeParams) == 0 { return nil, errors.New("no NeoFS peers configured") } nodesParams := make(map[int]*NodesParam) for _, param := range pb.nodeParams { nodes, ok := nodesParams[param.priority] if !ok { nodes = &NodesParam{priority: param.priority} } nodes.addresses = append(nodes.addresses, param.address) nodes.weights = append(nodes.weights, param.weight) nodesParams[param.priority] = nodes } for _, nodes := range nodesParams { nodes.weights = adjustWeights(nodes.weights) options.nodesParams = append(options.nodesParams, nodes) } sort.Slice(options.nodesParams, func(i, j int) bool { return options.nodesParams[i].priority < options.nodesParams[j].priority }) if options.clientBuilder == nil { options.clientBuilder = func(opts ...client.Option) (Client, error) { return client.New(opts...) } } return newPool(ctx, options) } // Pool is an interface providing connection artifacts on request. type Pool interface { Object Container Accounting Connection() (Client, *session.Token, error) OwnerID() *owner.ID WaitForContainerPresence(context.Context, *cid.ID, *ContainerPollingParams) error Close() } type Object interface { PutObject(ctx context.Context, hdr object.Object, payload io.Reader, opts ...CallOption) (*oid.ID, error) DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error GetObject(ctx context.Context, addr address.Address, opts ...CallOption) (*ResGetObject, error) HeadObject(context.Context, address.Address, ...CallOption) (*object.Object, error) ObjectRange(ctx context.Context, addr address.Address, off, ln uint64, opts ...CallOption) (*ResObjectRange, error) ObjectPayloadRangeSHA256(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][32]byte, error) ObjectPayloadRangeTZ(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][64]byte, error) SearchObject(ctx context.Context, params *client.SearchObjectParams, opts ...CallOption) ([]*oid.ID, error) } type Container interface { PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error) GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error) DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*eacl.Table, error) SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error } type Accounting interface { Balance(ctx context.Context, owner *owner.ID, opts ...CallOption) (*accounting.Decimal, error) } type clientPack struct { client Client healthy bool address string } type CallOption func(config *callConfig) type callConfig struct { isRetry bool useDefaultSession bool key *ecdsa.PrivateKey btoken *token.BearerToken stoken *session.Token } func WithKey(key *ecdsa.PrivateKey) CallOption { return func(config *callConfig) { config.key = key } } func WithBearer(token *token.BearerToken) CallOption { return func(config *callConfig) { config.btoken = token } } func WithSession(token *session.Token) CallOption { return func(config *callConfig) { config.stoken = token } } func retry() CallOption { return func(config *callConfig) { config.isRetry = true } } func useDefaultSession() CallOption { return func(config *callConfig) { config.useDefaultSession = true } } func cfgFromOpts(opts ...CallOption) *callConfig { var cfg = new(callConfig) for _, opt := range opts { opt(cfg) } return cfg } var _ Pool = (*pool)(nil) type pool struct { innerPools []*innerPool key *ecdsa.PrivateKey owner *owner.ID cancel context.CancelFunc closedCh chan struct{} cache *SessionCache stokenDuration uint64 stokenThreshold time.Duration } type innerPool struct { lock sync.RWMutex sampler *Sampler clientPacks []*clientPack } const ( defaultSessionTokenExpirationDuration = 100 // in blocks defaultSessionTokenThreshold = 5 * time.Second ) func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { cache, err := NewCache() if err != nil { return nil, fmt.Errorf("couldn't create cache: %w", err) } if options.SessionExpirationDuration == 0 { options.SessionExpirationDuration = defaultSessionTokenExpirationDuration } if options.SessionTokenThreshold <= 0 { options.SessionTokenThreshold = defaultSessionTokenThreshold } ownerID := owner.NewIDFromPublicKey(&options.Key.PublicKey) inner := make([]*innerPool, len(options.nodesParams)) var atLeastOneHealthy bool for i, params := range options.nodesParams { clientPacks := make([]*clientPack, len(params.weights)) for j, addr := range params.addresses { c, err := options.clientBuilder(client.WithDefaultPrivateKey(options.Key), client.WithURIAddress(addr, nil), client.WithDialTimeout(options.NodeConnectionTimeout), client.WithNeoFSErrorParsing()) if err != nil { return nil, err } var healthy bool cliRes, err := createSessionTokenForDuration(ctx, c, options.SessionExpirationDuration) if err != nil && options.Logger != nil { options.Logger.Warn("failed to create neofs session token for client", zap.String("address", addr), zap.Error(err)) } else if err == nil { healthy, atLeastOneHealthy = true, true st := sessionTokenForOwner(ownerID, cliRes) _ = cache.Put(formCacheKey(addr, options.Key), st) } clientPacks[j] = &clientPack{client: c, healthy: healthy, address: addr} } source := rand.NewSource(time.Now().UnixNano()) sampler := NewSampler(params.weights, source) inner[i] = &innerPool{ sampler: sampler, clientPacks: clientPacks, } } if !atLeastOneHealthy { return nil, fmt.Errorf("at least one node must be healthy") } ctx, cancel := context.WithCancel(ctx) pool := &pool{ innerPools: inner, key: options.Key, owner: ownerID, cancel: cancel, closedCh: make(chan struct{}), cache: cache, stokenDuration: options.SessionExpirationDuration, stokenThreshold: options.SessionTokenThreshold, } go startRebalance(ctx, pool, options) return pool, nil } func startRebalance(ctx context.Context, p *pool, options *BuilderOptions) { ticker := time.NewTimer(options.ClientRebalanceInterval) buffers := make([][]float64, len(options.nodesParams)) for i, params := range options.nodesParams { buffers[i] = make([]float64, len(params.weights)) } for { select { case <-ctx.Done(): close(p.closedCh) return case <-ticker.C: updateNodesHealth(ctx, p, options, buffers) ticker.Reset(options.ClientRebalanceInterval) } } } func updateNodesHealth(ctx context.Context, p *pool, options *BuilderOptions, buffers [][]float64) { wg := sync.WaitGroup{} for i, inner := range p.innerPools { wg.Add(1) bufferWeights := buffers[i] go func(i int, innerPool *innerPool) { defer wg.Done() updateInnerNodesHealth(ctx, p, i, options, bufferWeights) }(i, inner) } wg.Wait() } func updateInnerNodesHealth(ctx context.Context, pool *pool, i int, options *BuilderOptions, bufferWeights []float64) { if i > len(pool.innerPools)-1 { return } p := pool.innerPools[i] healthyChanged := false wg := sync.WaitGroup{} var prmEndpoint client.EndpointInfoPrm for j, cPack := range p.clientPacks { wg.Add(1) go func(j int, cli Client) { defer wg.Done() ok := true tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout) defer c() if _, err := cli.EndpointInfo(tctx, prmEndpoint); err != nil { ok = false bufferWeights[j] = 0 } p.lock.RLock() cp := *p.clientPacks[j] p.lock.RUnlock() if ok { bufferWeights[j] = options.nodesParams[i].weights[j] if !cp.healthy { cliRes, err := createSessionTokenForDuration(ctx, cli, options.SessionExpirationDuration) if err != nil { ok = false bufferWeights[j] = 0 } else { tkn := pool.newSessionToken(cliRes) _ = pool.cache.Put(formCacheKey(cp.address, pool.key), tkn) } } } else { pool.cache.DeleteByPrefix(cp.address) } p.lock.Lock() if p.clientPacks[j].healthy != ok { p.clientPacks[j].healthy = ok healthyChanged = true } p.lock.Unlock() }(j, cPack.client) } wg.Wait() if healthyChanged { probabilities := adjustWeights(bufferWeights) source := rand.NewSource(time.Now().UnixNano()) p.lock.Lock() p.sampler = NewSampler(probabilities, source) p.lock.Unlock() } } func adjustWeights(weights []float64) []float64 { adjusted := make([]float64, len(weights)) sum := 0.0 for _, weight := range weights { sum += weight } if sum > 0 { for i, weight := range weights { adjusted[i] = weight / sum } } return adjusted } func (p *pool) Connection() (Client, *session.Token, error) { cp, err := p.connection() if err != nil { return nil, nil, err } tok := p.cache.Get(formCacheKey(cp.address, p.key)) return cp.client, tok, nil } func (p *pool) connection() (*clientPack, error) { for _, inner := range p.innerPools { cp, err := inner.connection() if err == nil { return cp, nil } } return nil, errors.New("no healthy client") } func (p *innerPool) connection() (*clientPack, error) { p.lock.RLock() defer p.lock.RUnlock() if len(p.clientPacks) == 1 { cp := p.clientPacks[0] if cp.healthy { return cp, nil } return nil, errors.New("no healthy client") } attempts := 3 * len(p.clientPacks) for k := 0; k < attempts; k++ { i := p.sampler.Next() if cp := p.clientPacks[i]; cp.healthy { return cp, nil } } return nil, errors.New("no healthy client") } func (p *pool) OwnerID() *owner.ID { return p.owner } func formCacheKey(address string, key *ecdsa.PrivateKey) string { k := keys.PrivateKey{PrivateKey: *key} return address + k.String() } func (p *pool) conn(ctx context.Context, cfg *callConfig) (*clientPack, []client.CallOption, error) { cp, err := p.connection() if err != nil { return nil, nil, err } clientCallOptions := make([]client.CallOption, 0, 3) key := p.key if cfg.key != nil { key = cfg.key } clientCallOptions = append(clientCallOptions, client.WithKey(key)) sessionToken := cfg.stoken if sessionToken == nil && cfg.useDefaultSession { cacheKey := formCacheKey(cp.address, key) sessionToken = p.cache.Get(cacheKey) if sessionToken == nil { cliRes, err := createSessionTokenForDuration(ctx, cp.client, p.stokenDuration) if err != nil { return nil, nil, err } ownerID := owner.NewIDFromPublicKey(&key.PublicKey) sessionToken = sessionTokenForOwner(ownerID, cliRes) cfg.stoken = sessionToken _ = p.cache.Put(cacheKey, sessionToken) } } clientCallOptions = append(clientCallOptions, client.WithSession(sessionToken)) if cfg.btoken != nil { clientCallOptions = append(clientCallOptions, client.WithBearer(cfg.btoken)) } return cp, clientCallOptions, nil } func (p *pool) checkSessionTokenErr(err error, address string) bool { if err == nil { return false } if strings.Contains(err.Error(), "session token does not exist") || strings.Contains(err.Error(), "session token has been expired") { p.cache.DeleteByPrefix(address) return true } return false } func createSessionTokenForDuration(ctx context.Context, c Client, dur uint64) (*client.CreateSessionRes, error) { ni, err := c.NetworkInfo(ctx, client.NetworkInfoPrm{}) if err != nil { return nil, err } epoch := ni.Info().CurrentEpoch() var prm client.CreateSessionPrm if math.MaxUint64-epoch < dur { prm.SetExp(math.MaxUint64) } else { prm.SetExp(epoch + dur) } return c.CreateSession(ctx, prm) } func (p *pool) removeSessionTokenAfterThreshold(cfg *callConfig) error { cp, err := p.connection() if err != nil { return err } key := p.key if cfg.key != nil { key = cfg.key } ts, ok := p.cache.GetAccessTime(formCacheKey(cp.address, key)) if ok && time.Since(ts) > p.stokenThreshold { p.cache.DeleteByPrefix(cp.address) } return nil } type callContext struct { // base context for RPC context.Context client Client // client endpoint endpoint string // request signer key *ecdsa.PrivateKey // flag to open default session if session token is missing sessionDefault bool sessionTarget func(session.Token) } func (p *pool) initCallContext(ctx *callContext, cfg *callConfig) error { cp, err := p.connection() if err != nil { return err } ctx.key = cfg.key if ctx.key == nil { // use pool key if caller didn't specify its own ctx.key = p.key } ctx.endpoint = cp.address ctx.client = cp.client if ctx.sessionTarget == nil && cfg.stoken != nil { ctx.sessionTarget(*cfg.stoken) } // note that we don't override session provided by the caller ctx.sessionDefault = cfg.stoken == nil && cfg.useDefaultSession return err } type callContextWithRetry struct { callContext noRetry bool } func (p *pool) initCallContextWithRetry(ctx *callContextWithRetry, cfg *callConfig) error { err := p.initCallContext(&ctx.callContext, cfg) if err != nil { return err } // don't retry if session was specified by the caller ctx.noRetry = cfg.stoken != nil return nil } // opens new session or uses cached one. // Must be called only on initialized callContext with set sessionTarget. func (p *pool) openDefaultSession(ctx *callContext) error { cacheKey := formCacheKey(ctx.endpoint, ctx.key) tok := p.cache.Get(cacheKey) if tok != nil { // use cached token ctx.sessionTarget(*tok) return nil } // open new session var cliPrm client.CreateSessionPrm cliPrm.SetExp(math.MaxUint32) cliRes, err := ctx.client.CreateSession(ctx, cliPrm) if err != nil { return fmt.Errorf("session API client: %w", err) } tok = sessionTokenForOwner(owner.NewIDFromPublicKey(&ctx.key.PublicKey), cliRes) // sign the token err = tok.Sign(ctx.key) if err != nil { return fmt.Errorf("sign token of the opened session: %w", err) } // cache the opened session p.cache.Put(cacheKey, tok) ctx.sessionTarget(*tok) return nil } // opens default session (if sessionDefault is set), and calls f. If f returns // session-related error (*), and retrying is enabled, then f is called once more. // // (*) in this case cached token is removed. func (p *pool) callWithRetry(ctx *callContextWithRetry, f func() error) error { var err error if ctx.sessionDefault { err = p.openDefaultSession(&ctx.callContext) if err != nil { return fmt.Errorf("open default session: %w", err) } } err = f() if p.checkSessionTokenErr(err, ctx.endpoint) && !ctx.noRetry { // don't retry anymore ctx.noRetry = true return p.callWithRetry(ctx, f) } return err } func (p *pool) PutObject(ctx context.Context, hdr object.Object, payload io.Reader, opts ...CallOption) (*oid.ID, error) { cfg := cfgFromOpts(append(opts, useDefaultSession())...) // Put object is different from other object service methods. Put request // can't be resent in case of session token failures (i.e. session token is // invalid due to lifetime expiration or server restart). The reason is that // object's payload can be provided as a stream that should be read only once. // // To solve this issue, pool regenerates session tokens upon each request. // In case of subsequent requests, pool avoids session token initialization // by checking when the session token was accessed for the last time. If it // hits a threshold, session token is removed from cache for a new one to be // issued. err := p.removeSessionTokenAfterThreshold(cfg) if err != nil { return nil, err } var ctxCall callContext ctxCall.Context = ctx err = p.initCallContext(&ctxCall, cfg) if err != nil { return nil, fmt.Errorf("init call context") } var prm client.PrmObjectPutInit wObj, err := ctxCall.client.ObjectPutInit(ctx, prm) if err != nil { return nil, fmt.Errorf("init writing on API client: %w", err) } if ctxCall.sessionDefault { ctxCall.sessionTarget = wObj.WithinSession err = p.openDefaultSession(&ctxCall) if err != nil { return nil, fmt.Errorf("open default session: %w", err) } } wObj.UseKey(*ctxCall.key) if cfg.btoken != nil { wObj.WithBearerToken(*cfg.btoken) } if wObj.WriteHeader(hdr) { sz := hdr.PayloadSize() if data := hdr.Payload(); len(data) > 0 { if payload != nil { payload = io.MultiReader(bytes.NewReader(data), payload) } else { payload = bytes.NewReader(data) sz = uint64(len(data)) } } if payload != nil { const defaultBufferSizePut = 4096 // configure? if sz == 0 || sz > defaultBufferSizePut { sz = defaultBufferSizePut } buf := make([]byte, sz) var n int for { n, err = payload.Read(buf) if n > 0 { if !wObj.WritePayloadChunk(buf[:n]) { break } continue } if errors.Is(err, io.EOF) { break } return nil, fmt.Errorf("read payload: %w", err) } } } res, err := wObj.Close() if err != nil { // here err already carries both status and client errors // removes session token from cache in case of token error p.checkSessionTokenErr(err, ctxCall.endpoint) return nil, fmt.Errorf("client failure: %w", err) } var id oid.ID if !res.ReadStoredObjectID(&id) { return nil, errors.New("missing ID of the stored object") } return &id, nil } func (p *pool) DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error { cfg := cfgFromOpts(append(opts, useDefaultSession())...) cp, options, err := p.conn(ctx, cfg) if err != nil { return err } _, err = cp.client.DeleteObject(ctx, params, options...) if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.DeleteObject(ctx, params, opts...) } // here err already carries both status and client errors return err } type objectReadCloser client.ObjectReader func (x *objectReadCloser) Read(p []byte) (int, error) { return (*client.ObjectReader)(x).Read(p) } func (x *objectReadCloser) Close() error { _, err := (*client.ObjectReader)(x).Close() return err } type ResGetObject struct { Header object.Object Payload io.ReadCloser } func (p *pool) GetObject(ctx context.Context, addr address.Address, opts ...CallOption) (*ResGetObject, error) { cfg := cfgFromOpts(append(opts, useDefaultSession())...) var prm client.PrmObjectGet var cc callContextWithRetry cc.Context = ctx cc.sessionTarget = prm.WithinSession err := p.initCallContextWithRetry(&cc, cfg) if err != nil { return nil, err } if cnr := addr.ContainerID(); cnr != nil { prm.FromContainer(*cnr) } if obj := addr.ObjectID(); obj != nil { prm.ByID(*obj) } var res ResGetObject err = p.callWithRetry(&cc, func() error { rObj, err := cc.client.ObjectGetInit(ctx, prm) if err != nil { return fmt.Errorf("init object reading on client: %w", err) } rObj.UseKey(*cc.key) if !rObj.ReadHeader(&res.Header) { _, err = rObj.Close() return fmt.Errorf("read header: %w", err) } res.Payload = (*objectReadCloser)(rObj) return nil }) if err != nil { return nil, err } return &res, nil } func (p *pool) HeadObject(ctx context.Context, addr address.Address, opts ...CallOption) (*object.Object, error) { cfg := cfgFromOpts(append(opts, useDefaultSession())...) var prm client.PrmObjectHead var cc callContextWithRetry cc.Context = ctx cc.sessionTarget = prm.WithinSession err := p.initCallContextWithRetry(&cc, cfg) if err != nil { return nil, err } if cnr := addr.ContainerID(); cnr != nil { prm.FromContainer(*cnr) } if obj := addr.ObjectID(); obj != nil { prm.ByID(*obj) } var obj object.Object err = p.callWithRetry(&cc, func() error { res, err := cc.client.ObjectHead(ctx, prm) if err != nil { return fmt.Errorf("read object header via client: %w", err) } if !res.ReadHeader(&obj) { return errors.New("missing object header in response") } return nil }) if err != nil { return nil, err } return &obj, nil } type ResObjectRange struct { payload *client.ObjectRangeReader } func (x *ResObjectRange) Read(p []byte) (int, error) { return x.payload.Read(p) } func (x *ResObjectRange) Close() error { _, err := x.payload.Close() return err } func (p *pool) ObjectRange(ctx context.Context, addr address.Address, off, ln uint64, opts ...CallOption) (*ResObjectRange, error) { cfg := cfgFromOpts(append(opts, useDefaultSession())...) var prm client.PrmObjectRange prm.SetOffset(off) prm.SetLength(ln) var cc callContextWithRetry cc.Context = ctx cc.sessionTarget = prm.WithinSession err := p.initCallContextWithRetry(&cc, cfg) if err != nil { return nil, err } if cnr := addr.ContainerID(); cnr != nil { prm.FromContainer(*cnr) } if obj := addr.ObjectID(); obj != nil { prm.ByID(*obj) } var res ResObjectRange err = p.callWithRetry(&cc, func() error { var err error res.payload, err = cc.client.ObjectRangeInit(ctx, prm) if err != nil { return fmt.Errorf("init payload range reading on client: %w", err) } res.payload.UseKey(*cc.key) return nil }) if err != nil { return nil, err } return &res, nil } func copyRangeChecksumParams(prm *client.RangeChecksumParams) *client.RangeChecksumParams { var prmCopy client.RangeChecksumParams prmCopy.WithAddress(prm.Address()) prmCopy.WithSalt(prm.Salt()) prmCopy.WithRangeList(prm.RangeList()...) return &prmCopy } func (p *pool) ObjectPayloadRangeSHA256(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][32]byte, error) { cfg := cfgFromOpts(append(opts, useDefaultSession())...) cp, options, err := p.conn(ctx, cfg) if err != nil { return nil, err } // FIXME: pretty bad approach but we should not mutate params through the pointer // If non-SHA256 algo is set then we need to reset it. params = copyRangeChecksumParams(params) // SHA256 by default, no need to do smth res, err := cp.client.HashObjectPayloadRanges(ctx, params, options...) if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.ObjectPayloadRangeSHA256(ctx, params, opts...) } if err != nil { // here err already carries both status and client errors return nil, err } cliHashes := res.Hashes() hs := make([][sha256.Size]byte, len(cliHashes)) for i := range cliHashes { if ln := len(cliHashes[i]); ln != sha256.Size { return nil, fmt.Errorf("invalid SHA256 checksum size %d", ln) } copy(hs[i][:], cliHashes[i]) } return hs, nil } func (p *pool) ObjectPayloadRangeTZ(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][64]byte, error) { cfg := cfgFromOpts(append(opts, useDefaultSession())...) cp, options, err := p.conn(ctx, cfg) if err != nil { return nil, err } // FIXME: pretty bad approach but we should not mutate params through the pointer // We need to set Tillich-Zemor algo. params = copyRangeChecksumParams(params) params.TZ() res, err := cp.client.HashObjectPayloadRanges(ctx, params, options...) if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.ObjectPayloadRangeTZ(ctx, params, opts...) } if err != nil { // here err already carries both status and client errors return nil, err } cliHashes := res.Hashes() hs := make([][client.TZSize]byte, len(cliHashes)) for i := range cliHashes { if ln := len(cliHashes[i]); ln != client.TZSize { return nil, fmt.Errorf("invalid TZ checksum size %d", ln) } copy(hs[i][:], cliHashes[i]) } return hs, nil } func (p *pool) SearchObject(ctx context.Context, params *client.SearchObjectParams, opts ...CallOption) ([]*oid.ID, error) { cfg := cfgFromOpts(append(opts, useDefaultSession())...) cp, options, err := p.conn(ctx, cfg) if err != nil { return nil, err } res, err := cp.client.SearchObjects(ctx, params, options...) if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.SearchObject(ctx, params, opts...) } if err != nil { // here err already carries both status and client errors return nil, err } return res.IDList(), nil } func (p *pool) PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error) { cfg := cfgFromOpts(opts...) cp, _, err := p.conn(ctx, cfg) if err != nil { return nil, err } var cliPrm client.ContainerPutPrm if cnr != nil { cliPrm.SetContainer(*cnr) } res, err := cp.client.PutContainer(ctx, cliPrm) if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.PutContainer(ctx, cnr, opts...) } if err != nil { // here err already carries both status and client errors return nil, err } return res.ID(), nil } func (p *pool) GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error) { cfg := cfgFromOpts(opts...) cp, _, err := p.conn(ctx, cfg) if err != nil { return nil, err } var cliPrm client.ContainerGetPrm if cid != nil { cliPrm.SetContainer(*cid) } res, err := cp.client.GetContainer(ctx, cliPrm) if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.GetContainer(ctx, cid, opts...) } if err != nil { // here err already carries both status and client errors return nil, err } return res.Container(), nil } func (p *pool) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error) { cfg := cfgFromOpts(opts...) cp, _, err := p.conn(ctx, cfg) if err != nil { return nil, err } var cliPrm client.ContainerListPrm if ownerID != nil { cliPrm.SetAccount(*ownerID) } res, err := cp.client.ListContainers(ctx, cliPrm) if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.ListContainers(ctx, ownerID, opts...) } if err != nil { // here err already carries both status and client errors return nil, err } return res.Containers(), nil } func (p *pool) DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error { cfg := cfgFromOpts(opts...) cp, _, err := p.conn(ctx, cfg) if err != nil { return err } var cliPrm client.ContainerDeletePrm if cid != nil { cliPrm.SetContainer(*cid) } if cfg.stoken != nil { cliPrm.SetSessionToken(*cfg.stoken) } _, err = cp.client.DeleteContainer(ctx, cliPrm) if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.DeleteContainer(ctx, cid, opts...) } // here err already carries both status and client errors return err } func (p *pool) GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*eacl.Table, error) { cfg := cfgFromOpts(opts...) cp, _, err := p.conn(ctx, cfg) if err != nil { return nil, err } var cliPrm client.EACLPrm if cid != nil { cliPrm.SetContainer(*cid) } res, err := cp.client.EACL(ctx, cliPrm) if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.GetEACL(ctx, cid, opts...) } if err != nil { // here err already carries both status and client errors return nil, err } return res.Table(), nil } func (p *pool) SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error { cfg := cfgFromOpts(opts...) cp, _, err := p.conn(ctx, cfg) if err != nil { return err } var cliPrm client.SetEACLPrm if table != nil { cliPrm.SetTable(*table) } if cfg.stoken != nil { cliPrm.SetSessionToken(*cfg.stoken) } _, err = cp.client.SetEACL(ctx, cliPrm) if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.SetEACL(ctx, table, opts...) } // here err already carries both status and client errors return err } func (p *pool) Balance(ctx context.Context, o *owner.ID, opts ...CallOption) (*accounting.Decimal, error) { cfg := cfgFromOpts(opts...) cp, _, err := p.conn(ctx, cfg) if err != nil { return nil, err } var cliPrm client.GetBalancePrm if o != nil { cliPrm.SetAccount(*o) } res, err := cp.client.GetBalance(ctx, cliPrm) if err != nil { // here err already carries both status and client errors return nil, err } return res.Amount(), nil } func (p *pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollParams *ContainerPollingParams) error { conn, _, err := p.Connection() if err != nil { return err } wctx, cancel := context.WithTimeout(ctx, pollParams.CreationTimeout) defer cancel() ticker := time.NewTimer(pollParams.PollInterval) defer ticker.Stop() wdone := wctx.Done() done := ctx.Done() var cliPrm client.ContainerGetPrm if cid != nil { cliPrm.SetContainer(*cid) } for { select { case <-done: return ctx.Err() case <-wdone: return wctx.Err() case <-ticker.C: _, err = conn.GetContainer(ctx, cliPrm) if err == nil { return nil } ticker.Reset(pollParams.PollInterval) } } } // Close closes the pool and releases all the associated resources. func (p *pool) Close() { p.cancel() <-p.closedCh } // creates new session token from CreateSession call result. func (p *pool) newSessionToken(cliRes *client.CreateSessionRes) *session.Token { return sessionTokenForOwner(p.owner, cliRes) } // creates new session token with specified owner from CreateSession call result. func sessionTokenForOwner(id *owner.ID, cliRes *client.CreateSessionRes) *session.Token { st := session.NewToken() st.SetOwnerID(id) st.SetID(cliRes.ID()) st.SetSessionKey(cliRes.PublicKey()) return st }