package pool import ( "bytes" "context" "crypto/ecdsa" "errors" "fmt" "io" "math" "math/rand" "sort" "sync" "sync/atomic" "time" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ape" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/google/uuid" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "go.uber.org/zap" "go.uber.org/zap/zapcore" "google.golang.org/grpc" ) // client represents virtual connection to the single FrostFS network endpoint from which Pool is formed. // This interface is expected to have exactly one production implementation - clientWrapper. // Others are expected to be for test purposes only. type client interface { // see clientWrapper.balanceGet. balanceGet(context.Context, PrmBalanceGet) (accounting.Decimal, error) // see clientWrapper.containerPut. containerPut(context.Context, PrmContainerPut) (cid.ID, error) // see clientWrapper.containerGet. containerGet(context.Context, PrmContainerGet) (container.Container, error) // see clientWrapper.containerList. containerList(context.Context, PrmContainerList) ([]cid.ID, error) // see clientWrapper.containerListStream. containerListStream(context.Context, PrmListStream) (ResListStream, error) // see clientWrapper.containerDelete. containerDelete(context.Context, PrmContainerDelete) error // see clientWrapper.apeManagerAddChain. apeManagerAddChain(context.Context, PrmAddAPEChain) error // see clientWrapper.apeManagerRemoveChain. apeManagerRemoveChain(context.Context, PrmRemoveAPEChain) error // see clientWrapper.apeManagerListChains. apeManagerListChains(context.Context, PrmListAPEChains) ([]ape.Chain, error) // see clientWrapper.endpointInfo. endpointInfo(context.Context, prmEndpointInfo) (netmap.NodeInfo, error) // see clientWrapper.healthcheck. healthcheck(ctx context.Context) (netmap.NodeInfo, error) // see clientWrapper.networkInfo. networkInfo(context.Context, prmNetworkInfo) (netmap.NetworkInfo, error) // see clientWrapper.netMapSnapshot netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap.NetMap, error) // see clientWrapper.objectPut. objectPut(context.Context, PrmObjectPut) (ResPutObject, error) // see clientWrapper.objectPatch. objectPatch(context.Context, PrmObjectPatch) (ResPatchObject, error) // see clientWrapper.objectDelete. objectDelete(context.Context, PrmObjectDelete) error // see clientWrapper.objectGet. objectGet(context.Context, PrmObjectGet) (ResGetObject, error) // see clientWrapper.objectHead. objectHead(context.Context, PrmObjectHead) (object.Object, error) // see clientWrapper.objectRange. objectRange(context.Context, PrmObjectRange) (ResObjectRange, error) // see clientWrapper.objectSearch. objectSearch(context.Context, PrmObjectSearch) (ResObjectSearch, error) // see clientWrapper.sessionCreate. sessionCreate(context.Context, prmCreateSession) (resCreateSession, error) clientStatus // see clientWrapper.dial. dial(ctx context.Context) error // see clientWrapper.restart. restart(ctx context.Context) error // see clientWrapper.close. close() error } // clientStatus provide access to some metrics for connection. type clientStatus interface { // isHealthy checks if the connection can handle requests. isHealthy() bool // setUnhealthy marks client as unhealthy. setUnhealthy() // setHealthy marks client as healthy. setHealthy() // address return address of endpoint. address() string // currentErrorRate returns current errors rate. // After specific threshold connection is considered as unhealthy. // Pool.startRebalance routine can make this connection healthy again. currentErrorRate() uint32 // overallErrorRate returns the number of all happened errors. overallErrorRate() uint64 // methodsStatus returns statistic for all used methods. methodsStatus() []StatusSnapshot } // errPoolClientUnhealthy is an error to indicate that client in pool is unhealthy. var errPoolClientUnhealthy = errors.New("pool client unhealthy") // clientStatusMonitor count error rate and other statistics for connection. type clientStatusMonitor struct { logger *zap.Logger addr string healthy *atomic.Uint32 errorThreshold uint32 mu sync.RWMutex // protect counters currentErrorCount uint32 overallErrorCount uint64 methods []*MethodStatus } // values for healthy status of clientStatusMonitor. const ( // statusUnhealthyOnRequest is set when communication after dialing to the // endpoint is failed due to immediate or accumulated errors, connection is // available and pool should close it before re-establishing connection once again. statusUnhealthyOnRequest = iota // statusHealthy is set when connection is ready to be used by the pool. statusHealthy ) // MethodIndex index of method in list of statuses in clientStatusMonitor. type MethodIndex int const ( methodBalanceGet MethodIndex = iota methodContainerPut methodContainerGet methodContainerList methodContainerListStream methodContainerDelete methodEndpointInfo methodNetworkInfo methodNetMapSnapshot methodObjectPut methodObjectDelete methodObjectGet methodObjectHead methodObjectRange methodObjectPatch methodSessionCreate methodAPEManagerAddChain methodAPEManagerRemoveChain methodAPEManagerListChains methodLast ) // String implements fmt.Stringer. func (m MethodIndex) String() string { switch m { case methodBalanceGet: return "balanceGet" case methodContainerPut: return "containerPut" case methodContainerGet: return "containerGet" case methodContainerList: return "containerList" case methodContainerDelete: return "containerDelete" case methodEndpointInfo: return "endpointInfo" case methodNetworkInfo: return "networkInfo" case methodNetMapSnapshot: return "netMapSnapshot" case methodObjectPut: return "objectPut" case methodObjectPatch: return "objectPatch" case methodObjectDelete: return "objectDelete" case methodObjectGet: return "objectGet" case methodObjectHead: return "objectHead" case methodObjectRange: return "objectRange" case methodSessionCreate: return "sessionCreate" case methodAPEManagerAddChain: return "apeManagerAddChain" case methodAPEManagerRemoveChain: return "apeManagerRemoveChain" case methodAPEManagerListChains: return "apeManagerListChains" case methodLast: return "it's a system name rather than a method" default: return "unknown" } } func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor { methods := make([]*MethodStatus, methodLast) for i := methodBalanceGet; i < methodLast; i++ { methods[i] = &MethodStatus{name: i.String()} } healthy := new(atomic.Uint32) healthy.Store(statusHealthy) return clientStatusMonitor{ logger: logger, addr: addr, healthy: healthy, errorThreshold: errorThreshold, methods: methods, } } // clientWrapper is used by default, alternative implementations are intended for testing purposes only. type clientWrapper struct { clientMutex sync.RWMutex client *sdkClient.Client dialed bool prm wrapperPrm clientStatusMonitor } // wrapperPrm is params to create clientWrapper. type wrapperPrm struct { logger *zap.Logger address string key ecdsa.PrivateKey dialTimeout time.Duration streamTimeout time.Duration errorThreshold uint32 responseInfoCallback func(sdkClient.ResponseMetaInfo) error poolRequestInfoCallback func(RequestInfo) dialOptions []grpc.DialOption gracefulCloseOnSwitchTimeout time.Duration } // setAddress sets endpoint to connect in FrostFS network. func (x *wrapperPrm) setAddress(address string) { x.address = address } // setKey sets sdkClient.Client private key to be used for the protocol communication by default. func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) { x.key = key } // setLogger sets sdkClient.Client logger. func (x *wrapperPrm) setLogger(logger *zap.Logger) { x.logger = logger } // setDialTimeout sets the timeout for connection to be established. func (x *wrapperPrm) setDialTimeout(timeout time.Duration) { x.dialTimeout = timeout } // setStreamTimeout sets the timeout for individual operations in streaming RPC. func (x *wrapperPrm) setStreamTimeout(timeout time.Duration) { x.streamTimeout = timeout } // setErrorThreshold sets threshold after reaching which connection is considered unhealthy // until Pool.startRebalance routing updates its status. func (x *wrapperPrm) setErrorThreshold(threshold uint32) { x.errorThreshold = threshold } // setGracefulCloseOnSwitchTimeout specifies the timeout after which unhealthy client be closed during rebalancing // if it will become healthy back. // // See also setErrorThreshold. func (x *wrapperPrm) setGracefulCloseOnSwitchTimeout(timeout time.Duration) { x.gracefulCloseOnSwitchTimeout = timeout } // setPoolRequestCallback sets callback that will be invoked after every pool response. func (x *wrapperPrm) setPoolRequestCallback(f func(RequestInfo)) { x.poolRequestInfoCallback = f } // setResponseInfoCallback sets callback that will be invoked after every response. func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) error) { x.responseInfoCallback = f } // setGRPCDialOptions sets the gRPC dial options for new gRPC client connection. func (x *wrapperPrm) setGRPCDialOptions(opts []grpc.DialOption) { x.dialOptions = opts } // newWrapper creates a clientWrapper that implements the client interface. func newWrapper(prm wrapperPrm) *clientWrapper { var cl sdkClient.Client prmInit := sdkClient.PrmInit{ Key: prm.key, ResponseInfoCallback: prm.responseInfoCallback, } cl.Init(prmInit) res := &clientWrapper{ client: &cl, clientStatusMonitor: newClientStatusMonitor(prm.logger, prm.address, prm.errorThreshold), prm: prm, } return res } // dial establishes a connection to the server from the FrostFS network. // Returns an error describing failure reason. If failed, the client // SHOULD NOT be used. func (c *clientWrapper) dial(ctx context.Context) error { cl, err := c.getClient() if err != nil { return err } prmDial := sdkClient.PrmDial{ Endpoint: c.prm.address, DialTimeout: c.prm.dialTimeout, StreamTimeout: c.prm.streamTimeout, GRPCDialOptions: c.prm.dialOptions, } err = cl.Dial(ctx, prmDial) c.setDialed(err == nil) if err != nil { return err } return nil } // restart recreates and redial inner sdk client. func (c *clientWrapper) restart(ctx context.Context) error { var cl sdkClient.Client prmInit := sdkClient.PrmInit{ Key: c.prm.key, ResponseInfoCallback: c.prm.responseInfoCallback, } cl.Init(prmInit) prmDial := sdkClient.PrmDial{ Endpoint: c.prm.address, DialTimeout: c.prm.dialTimeout, StreamTimeout: c.prm.streamTimeout, GRPCDialOptions: c.prm.dialOptions, } // if connection is dialed before, to avoid routine / connection leak, // pool has to close it and then initialize once again. if c.isDialed() { c.scheduleGracefulClose() } err := cl.Dial(ctx, prmDial) c.setDialed(err == nil) if err != nil { return err } c.clientMutex.Lock() c.client = &cl c.clientMutex.Unlock() return nil } func (c *clientWrapper) isDialed() bool { c.mu.RLock() defer c.mu.RUnlock() return c.dialed } func (c *clientWrapper) setDialed(dialed bool) { c.mu.Lock() c.dialed = dialed c.mu.Unlock() } func (c *clientWrapper) getClient() (*sdkClient.Client, error) { c.clientMutex.RLock() defer c.clientMutex.RUnlock() if c.isHealthy() { return c.client, nil } return nil, errPoolClientUnhealthy } func (c *clientWrapper) getClientRaw() *sdkClient.Client { c.clientMutex.RLock() defer c.clientMutex.RUnlock() return c.client } // balanceGet invokes sdkClient.BalanceGet parse response status to error and return result as is. func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) { cl, err := c.getClient() if err != nil { return accounting.Decimal{}, err } cliPrm := sdkClient.PrmBalanceGet{ Account: prm.account, } start := time.Now() res, err := cl.BalanceGet(ctx, cliPrm) c.incRequests(time.Since(start), methodBalanceGet) var st apistatus.Status if res != nil { st = res.Status() } if err = c.handleError(ctx, st, err); err != nil { return accounting.Decimal{}, fmt.Errorf("balance get on client: %w", err) } return res.Amount(), nil } // containerPut invokes sdkClient.ContainerPut parse response status to error and return result as is. // It also waits for the container to appear on the network. func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (cid.ID, error) { cl, err := c.getClient() if err != nil { return cid.ID{}, err } start := time.Now() res, err := cl.ContainerPut(ctx, prm.ClientParams) c.incRequests(time.Since(start), methodContainerPut) var st apistatus.Status if res != nil { st = res.Status() } if err = c.handleError(ctx, st, err); err != nil { return cid.ID{}, fmt.Errorf("container put on client: %w", err) } if prm.WaitParams == nil { prm.WaitParams = defaultWaitParams() } if err = prm.WaitParams.CheckValidity(); err != nil { return cid.ID{}, fmt.Errorf("invalid wait parameters: %w", err) } idCnr := res.ID() getPrm := PrmContainerGet{ ContainerID: idCnr, Session: prm.ClientParams.Session, } err = waitForContainerPresence(ctx, c, getPrm, prm.WaitParams) if err = c.handleError(ctx, nil, err); err != nil { return cid.ID{}, fmt.Errorf("wait container presence on client: %w", err) } return idCnr, nil } // containerGet invokes sdkClient.ContainerGet parse response status to error and return result as is. func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) (container.Container, error) { cl, err := c.getClient() if err != nil { return container.Container{}, err } cliPrm := sdkClient.PrmContainerGet{ ContainerID: &prm.ContainerID, Session: prm.Session, } start := time.Now() res, err := cl.ContainerGet(ctx, cliPrm) c.incRequests(time.Since(start), methodContainerGet) var st apistatus.Status if res != nil { st = res.Status() } if err = c.handleError(ctx, st, err); err != nil { return container.Container{}, fmt.Errorf("container get on client: %w", err) } return res.Container(), nil } // containerList invokes sdkClient.ContainerList parse response status to error and return result as is. func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) { cl, err := c.getClient() if err != nil { return nil, err } cliPrm := sdkClient.PrmContainerList{ OwnerID: prm.OwnerID, Session: prm.Session, } start := time.Now() res, err := cl.ContainerList(ctx, cliPrm) c.incRequests(time.Since(start), methodContainerList) var st apistatus.Status if res != nil { st = res.Status() } if err = c.handleError(ctx, st, err); err != nil { return nil, fmt.Errorf("container list on client: %w", err) } return res.Containers(), nil } // PrmListStream groups parameters of ListContainersStream operation. type PrmListStream struct { OwnerID user.ID Session *session.Container } // ResListStream is designed to read list of object identifiers from FrostFS system. // // Must be initialized using Pool.ListContainersStream, any other usage is unsafe. type ResListStream struct { r *sdkClient.ContainerListReader handleError func(context.Context, apistatus.Status, error) error } // Read reads another list of the container identifiers. func (x *ResListStream) Read(buf []cid.ID) (int, error) { n, ok := x.r.Read(buf) if !ok { res, err := x.r.Close() if err == nil { return n, io.EOF } var status apistatus.Status if res != nil { status = res.Status() } err = x.handleError(nil, status, err) return n, err } return n, nil } // Iterate iterates over the list of found container identifiers. // f can return true to stop iteration earlier. // // Returns an error if container can't be read. func (x *ResListStream) Iterate(f func(cid.ID) bool) error { return x.r.Iterate(f) } // Close ends reading list of the matched containers and returns the result of the operation // along with the final results. Must be called after using the ResListStream. func (x *ResListStream) Close() { _, _ = x.r.Close() } // containerList invokes sdkClient.ContainerList parse response status to error and return result as is. func (c *clientWrapper) containerListStream(ctx context.Context, prm PrmListStream) (ResListStream, error) { cl, err := c.getClient() if err != nil { return ResListStream{}, err } cliPrm := sdkClient.PrmContainerListStream{ OwnerID: prm.OwnerID, Session: prm.Session, } res, err := cl.ContainerListInit(ctx, cliPrm) if err = c.handleError(ctx, nil, err); err != nil { return ResListStream{}, fmt.Errorf("init container listing on client: %w", err) } return ResListStream{r: res, handleError: c.handleError}, nil } // containerDelete invokes sdkClient.ContainerDelete parse response status to error. // It also waits for the container to be removed from the network. func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDelete) error { cl, err := c.getClient() if err != nil { return err } cliPrm := sdkClient.PrmContainerDelete{ ContainerID: &prm.ContainerID, Session: prm.Session, } start := time.Now() res, err := cl.ContainerDelete(ctx, cliPrm) c.incRequests(time.Since(start), methodContainerDelete) var st apistatus.Status if res != nil { st = res.Status() } if err = c.handleError(ctx, st, err); err != nil { return fmt.Errorf("container delete on client: %w", err) } if prm.WaitParams == nil { prm.WaitParams = defaultWaitParams() } if err := prm.WaitParams.CheckValidity(); err != nil { return fmt.Errorf("invalid wait parameters: %w", err) } getPrm := PrmContainerGet{ ContainerID: prm.ContainerID, Session: prm.Session, } return waitForContainerRemoved(ctx, c, getPrm, prm.WaitParams) } // apeManagerAddChain invokes sdkClient.APEManagerAddChain and parse response status to error. func (c *clientWrapper) apeManagerAddChain(ctx context.Context, prm PrmAddAPEChain) error { cl, err := c.getClient() if err != nil { return err } cliPrm := sdkClient.PrmAPEManagerAddChain{ ChainTarget: prm.Target, Chain: prm.Chain, } start := time.Now() res, err := cl.APEManagerAddChain(ctx, cliPrm) c.incRequests(time.Since(start), methodAPEManagerAddChain) var st apistatus.Status if res != nil { st = res.Status() } if err = c.handleError(ctx, st, err); err != nil { return fmt.Errorf("add chain error: %w", err) } return nil } // apeManagerRemoveChain invokes sdkClient.APEManagerRemoveChain and parse response status to error. func (c *clientWrapper) apeManagerRemoveChain(ctx context.Context, prm PrmRemoveAPEChain) error { cl, err := c.getClient() if err != nil { return err } cliPrm := sdkClient.PrmAPEManagerRemoveChain{ ChainTarget: prm.Target, ChainID: prm.ChainID, } start := time.Now() res, err := cl.APEManagerRemoveChain(ctx, cliPrm) c.incRequests(time.Since(start), methodAPEManagerRemoveChain) var st apistatus.Status if res != nil { st = res.Status() } if err = c.handleError(ctx, st, err); err != nil { return fmt.Errorf("remove chain error: %w", err) } return nil } // apeManagerListChains invokes sdkClient.APEManagerListChains. Returns chains and parsed response status to error. func (c *clientWrapper) apeManagerListChains(ctx context.Context, prm PrmListAPEChains) ([]ape.Chain, error) { cl, err := c.getClient() if err != nil { return nil, err } cliPrm := sdkClient.PrmAPEManagerListChains{ ChainTarget: prm.Target, } start := time.Now() res, err := cl.APEManagerListChains(ctx, cliPrm) c.incRequests(time.Since(start), methodAPEManagerListChains) var st apistatus.Status if res != nil { st = res.Status() } if err = c.handleError(ctx, st, err); err != nil { return nil, fmt.Errorf("list chains error: %w", err) } return res.Chains, nil } // endpointInfo invokes sdkClient.EndpointInfo parse response status to error and return result as is. func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (netmap.NodeInfo, error) { cl, err := c.getClient() if err != nil { return netmap.NodeInfo{}, err } return c.endpointInfoRaw(ctx, cl) } func (c *clientWrapper) healthcheck(ctx context.Context) (netmap.NodeInfo, error) { cl := c.getClientRaw() return c.endpointInfoRaw(ctx, cl) } func (c *clientWrapper) endpointInfoRaw(ctx context.Context, cl *sdkClient.Client) (netmap.NodeInfo, error) { start := time.Now() res, err := cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{}) c.incRequests(time.Since(start), methodEndpointInfo) var st apistatus.Status if res != nil { st = res.Status() } if err = c.handleError(ctx, st, err); err != nil { return netmap.NodeInfo{}, fmt.Errorf("endpoint info on client: %w", err) } return res.NodeInfo(), nil } // networkInfo invokes sdkClient.NetworkInfo parse response status to error and return result as is. func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (netmap.NetworkInfo, error) { cl, err := c.getClient() if err != nil { return netmap.NetworkInfo{}, err } start := time.Now() res, err := cl.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{}) c.incRequests(time.Since(start), methodNetworkInfo) var st apistatus.Status if res != nil { st = res.Status() } if err = c.handleError(ctx, st, err); err != nil { return netmap.NetworkInfo{}, fmt.Errorf("network info on client: %w", err) } return res.Info(), nil } // networkInfo invokes sdkClient.NetworkInfo parse response status to error and return result as is. func (c *clientWrapper) netMapSnapshot(ctx context.Context, _ prmNetMapSnapshot) (netmap.NetMap, error) { cl, err := c.getClient() if err != nil { return netmap.NetMap{}, err } start := time.Now() res, err := cl.NetMapSnapshot(ctx, sdkClient.PrmNetMapSnapshot{}) c.incRequests(time.Since(start), methodNetMapSnapshot) var st apistatus.Status if res != nil { st = res.Status() } if err = c.handleError(ctx, st, err); err != nil { return netmap.NetMap{}, fmt.Errorf("network map snapshot on client: %w", err) } return res.NetMap(), nil } // objectPatch patches object in FrostFS. func (c *clientWrapper) objectPatch(ctx context.Context, prm PrmObjectPatch) (ResPatchObject, error) { cl, err := c.getClient() if err != nil { return ResPatchObject{}, err } start := time.Now() pObj, err := cl.ObjectPatchInit(ctx, sdkClient.PrmObjectPatch{ Address: prm.addr, Session: prm.stoken, Key: prm.key, BearerToken: prm.btoken, MaxChunkLength: prm.maxPayloadPatchChunkLength, }) if err = c.handleError(ctx, nil, err); err != nil { return ResPatchObject{}, fmt.Errorf("init patching on API client: %w", err) } c.incRequests(time.Since(start), methodObjectPatch) start = time.Now() attrPatchSuccess := pObj.PatchAttributes(ctx, prm.newAttrs, prm.replaceAttrs) c.incRequests(time.Since(start), methodObjectPatch) if attrPatchSuccess { start = time.Now() _ = pObj.PatchPayload(ctx, prm.rng, prm.payload) c.incRequests(time.Since(start), methodObjectPatch) } res, err := pObj.Close(ctx) var st apistatus.Status if res != nil { st = res.Status() } if err = c.handleError(ctx, st, err); err != nil { return ResPatchObject{}, fmt.Errorf("client failure: %w", err) } return ResPatchObject{ObjectID: res.ObjectID()}, nil } // objectPut writes object to FrostFS. func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) { if prm.bufferMaxSize == 0 { prm.bufferMaxSize = defaultBufferMaxSizeForPut } if prm.clientCut { return c.objectPutClientCut(ctx, prm) } return c.objectPutServerCut(ctx, prm) } func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) { cl, err := c.getClient() if err != nil { return ResPutObject{}, err } cliPrm := sdkClient.PrmObjectPutInit{ CopiesNumber: prm.copiesNumber, Session: prm.stoken, Key: prm.key, BearerToken: prm.btoken, } start := time.Now() wObj, err := cl.ObjectPutInit(ctx, cliPrm) c.incRequests(time.Since(start), methodObjectPut) if err = c.handleError(ctx, nil, err); err != nil { return ResPutObject{}, fmt.Errorf("init writing on API client: %w", err) } if wObj.WriteHeader(ctx, prm.hdr) { sz := prm.hdr.PayloadSize() if data := prm.hdr.Payload(); len(data) > 0 { if prm.payload != nil { prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload) } else { prm.payload = bytes.NewReader(data) sz = uint64(len(data)) } } if prm.payload != nil { if sz == 0 || sz > prm.bufferMaxSize { sz = prm.bufferMaxSize } buf := make([]byte, sz) var n int for { n, err = prm.payload.Read(buf) if n > 0 { start = time.Now() successWrite := wObj.WritePayloadChunk(ctx, buf[:n]) c.incRequests(time.Since(start), methodObjectPut) if !successWrite { break } continue } if errors.Is(err, io.EOF) { break } return ResPutObject{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err)) } } } res, err := wObj.Close(ctx) var st apistatus.Status if res != nil { st = res.Status() } if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors return ResPutObject{}, fmt.Errorf("client failure: %w", err) } return ResPutObject{ ObjectID: res.StoredObjectID(), Epoch: res.StoredEpoch(), }, nil } func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) { putInitPrm := PrmObjectPutClientCutInit{ PrmObjectPut: prm, } start := time.Now() wObj, err := c.objectPutInitTransformer(putInitPrm) c.incRequests(time.Since(start), methodObjectPut) if err = c.handleError(ctx, nil, err); err != nil { return ResPutObject{}, fmt.Errorf("init writing on API client: %w", err) } if wObj.WriteHeader(ctx, prm.hdr) { sz := prm.hdr.PayloadSize() if data := prm.hdr.Payload(); len(data) > 0 { if prm.payload != nil { prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload) } else { prm.payload = bytes.NewReader(data) sz = uint64(len(data)) } } if prm.payload != nil { if sz == 0 || sz > prm.bufferMaxSize { sz = prm.bufferMaxSize } buf := make([]byte, sz) var n int for { n, err = prm.payload.Read(buf) if n > 0 { start = time.Now() successWrite := wObj.WritePayloadChunk(ctx, buf[:n]) c.incRequests(time.Since(start), methodObjectPut) if !successWrite { break } continue } if errors.Is(err, io.EOF) { break } return ResPutObject{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err)) } } } res, err := wObj.Close(ctx) var st apistatus.Status if res != nil { st = res.Status } if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors return ResPutObject{}, fmt.Errorf("client failure: %w", err) } return ResPutObject{ ObjectID: res.OID, Epoch: res.Epoch, }, nil } // objectDelete invokes sdkClient.ObjectDelete parse response status to error. func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) error { cl, err := c.getClient() if err != nil { return err } cnr := prm.addr.Container() obj := prm.addr.Object() cliPrm := sdkClient.PrmObjectDelete{ BearerToken: prm.btoken, Session: prm.stoken, ContainerID: &cnr, ObjectID: &obj, Key: prm.key, } start := time.Now() res, err := cl.ObjectDelete(ctx, cliPrm) c.incRequests(time.Since(start), methodObjectDelete) var st apistatus.Status if res != nil { st = res.Status() } if err = c.handleError(ctx, st, err); err != nil { return fmt.Errorf("delete object on client: %w", err) } return nil } // objectGet returns reader for object. func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) { cl, err := c.getClient() if err != nil { return ResGetObject{}, err } prmCnr := prm.addr.Container() prmObj := prm.addr.Object() cliPrm := sdkClient.PrmObjectGet{ BearerToken: prm.btoken, Session: prm.stoken, ContainerID: &prmCnr, ObjectID: &prmObj, Key: prm.key, } var res ResGetObject rObj, err := cl.ObjectGetInit(ctx, cliPrm) if err = c.handleError(ctx, nil, err); err != nil { return ResGetObject{}, fmt.Errorf("init object reading on client: %w", err) } start := time.Now() successReadHeader := rObj.ReadHeader(&res.Header) c.incRequests(time.Since(start), methodObjectGet) if !successReadHeader { rObjRes, err := rObj.Close() var st apistatus.Status if rObjRes != nil { st = rObjRes.Status() } err = c.handleError(ctx, st, err) return res, fmt.Errorf("read header: %w", err) } res.Payload = &objectReadCloser{ reader: rObj, elapsedTimeCallback: func(elapsed time.Duration) { c.incRequests(elapsed, methodObjectGet) }, } return res, nil } // objectHead invokes sdkClient.ObjectHead parse response status to error and return result as is. func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (object.Object, error) { cl, err := c.getClient() if err != nil { return object.Object{}, err } prmCnr := prm.addr.Container() prmObj := prm.addr.Object() cliPrm := sdkClient.PrmObjectHead{ BearerToken: prm.btoken, Session: prm.stoken, Raw: prm.raw, ContainerID: &prmCnr, ObjectID: &prmObj, Key: prm.key, } var obj object.Object start := time.Now() res, err := cl.ObjectHead(ctx, cliPrm) c.incRequests(time.Since(start), methodObjectHead) var st apistatus.Status if res != nil { st = res.Status() } if err = c.handleError(ctx, st, err); err != nil { return obj, fmt.Errorf("read object header via client: %w", err) } if !res.ReadHeader(&obj) { return obj, errors.New("missing object header in response") } return obj, nil } // objectRange returns object range reader. func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) { cl, err := c.getClient() if err != nil { return ResObjectRange{}, err } prmCnr := prm.addr.Container() prmObj := prm.addr.Object() cliPrm := sdkClient.PrmObjectRange{ BearerToken: prm.btoken, Session: prm.stoken, ContainerID: &prmCnr, ObjectID: &prmObj, Offset: prm.off, Length: prm.ln, Key: prm.key, } start := time.Now() res, err := cl.ObjectRangeInit(ctx, cliPrm) c.incRequests(time.Since(start), methodObjectRange) if err = c.handleError(ctx, nil, err); err != nil { return ResObjectRange{}, fmt.Errorf("init payload range reading on client: %w", err) } return ResObjectRange{ payload: res, elapsedTimeCallback: func(elapsed time.Duration) { c.incRequests(elapsed, methodObjectRange) }, }, nil } // objectSearch invokes sdkClient.ObjectSearchInit parse response status to error and return result as is. func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) { cl, err := c.getClient() if err != nil { return ResObjectSearch{}, err } cliPrm := sdkClient.PrmObjectSearch{ ContainerID: &prm.cnrID, Filters: prm.filters, Session: prm.stoken, BearerToken: prm.btoken, Key: prm.key, } res, err := cl.ObjectSearchInit(ctx, cliPrm) if err = c.handleError(ctx, nil, err); err != nil { return ResObjectSearch{}, fmt.Errorf("init object searching on client: %w", err) } return ResObjectSearch{r: res, handleError: c.handleError}, nil } // sessionCreate invokes sdkClient.SessionCreate parse response status to error and return result as is. func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession) (resCreateSession, error) { cl, err := c.getClient() if err != nil { return resCreateSession{}, err } cliPrm := sdkClient.PrmSessionCreate{ Expiration: prm.exp, Key: &prm.key, } start := time.Now() res, err := cl.SessionCreate(ctx, cliPrm) c.incRequests(time.Since(start), methodSessionCreate) var st apistatus.Status if res != nil { st = res.Status() } if err = c.handleError(ctx, st, err); err != nil { return resCreateSession{}, fmt.Errorf("session creation on client: %w", err) } return resCreateSession{ id: res.ID(), sessionKey: res.PublicKey(), }, nil } func (c *clientStatusMonitor) isHealthy() bool { return c.healthy.Load() == statusHealthy } func (c *clientStatusMonitor) setHealthy() { c.healthy.Store(statusHealthy) } func (c *clientStatusMonitor) setUnhealthy() { c.healthy.Store(statusUnhealthyOnRequest) } func (c *clientStatusMonitor) address() string { return c.addr } func (c *clientStatusMonitor) incErrorRate() { c.mu.Lock() c.currentErrorCount++ c.overallErrorCount++ thresholdReached := c.currentErrorCount >= c.errorThreshold if thresholdReached { c.setUnhealthy() c.currentErrorCount = 0 } c.mu.Unlock() if thresholdReached { c.log(zapcore.WarnLevel, "error threshold reached", zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold)) } } func (c *clientStatusMonitor) incErrorRateToUnhealthy(err error) { c.mu.Lock() c.currentErrorCount = 0 c.overallErrorCount++ c.setUnhealthy() c.mu.Unlock() c.log(zapcore.WarnLevel, "explicitly mark node unhealthy", zap.String("address", c.addr), zap.Error(err)) } func (c *clientStatusMonitor) log(level zapcore.Level, msg string, fields ...zap.Field) { if c.logger == nil { return } c.logger.Log(level, msg, fields...) } func (c *clientStatusMonitor) currentErrorRate() uint32 { c.mu.RLock() defer c.mu.RUnlock() return c.currentErrorCount } func (c *clientStatusMonitor) overallErrorRate() uint64 { c.mu.RLock() defer c.mu.RUnlock() return c.overallErrorCount } func (c *clientStatusMonitor) methodsStatus() []StatusSnapshot { result := make([]StatusSnapshot, len(c.methods)) for i, val := range c.methods { result[i] = val.Snapshot() } return result } func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) { methodStat := c.methods[method] methodStat.IncRequests(elapsed) if c.prm.poolRequestInfoCallback != nil { c.prm.poolRequestInfoCallback(RequestInfo{ Address: c.prm.address, Method: method, Elapsed: elapsed, }) } } func (c *clientWrapper) close() error { if !c.isDialed() { return nil } if cl := c.getClientRaw(); cl != nil { return cl.Close() } return nil } func (c *clientWrapper) scheduleGracefulClose() { cl := c.getClientRaw() if cl == nil { return } time.AfterFunc(c.prm.gracefulCloseOnSwitchTimeout, func() { if err := cl.Close(); err != nil { c.log(zap.DebugLevel, "close unhealthy client during rebalance", zap.String("address", c.address()), zap.Error(err)) } }) } func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error { if stErr := apistatus.ErrFromStatus(st); stErr != nil { switch stErr.(type) { case *apistatus.ServerInternal, *apistatus.WrongMagicNumber, *apistatus.SignatureVerification: c.incErrorRate() case *apistatus.NodeUnderMaintenance: c.incErrorRateToUnhealthy(stErr) } if err == nil { err = stErr } return err } if err != nil { if needCountError(ctx, err) { if sdkClient.IsErrNodeUnderMaintenance(err) { c.incErrorRateToUnhealthy(err) } else { c.incErrorRate() } } return err } return nil } func needCountError(ctx context.Context, err error) bool { // non-status logic error that could be returned // from the SDK client; should not be considered // as a connection error var siErr *object.SplitInfoError if errors.As(err, &siErr) { return false } var eiErr *object.ECInfoError if errors.As(err, &eiErr) { return false } if ctx != nil && errors.Is(ctx.Err(), context.Canceled) { return false } return true } // clientBuilder is a type alias of client constructors which open connection // to the given endpoint. type clientBuilder = func(endpoint string) client // RequestInfo groups info about pool request. type RequestInfo struct { Address string Method MethodIndex Elapsed time.Duration } // InitParameters contains values used to initialize connection Pool. type InitParameters struct { key *ecdsa.PrivateKey logger *zap.Logger nodeDialTimeout time.Duration nodeStreamTimeout time.Duration healthcheckTimeout time.Duration clientRebalanceInterval time.Duration sessionExpirationDuration uint64 errorThreshold uint32 nodeParams []NodeParam requestCallback func(RequestInfo) dialOptions []grpc.DialOption clientBuilder clientBuilder gracefulCloseOnSwitchTimeout time.Duration } // SetKey specifies default key to be used for the protocol communication by default. func (x *InitParameters) SetKey(key *ecdsa.PrivateKey) { x.key = key } // SetLogger specifies logger. func (x *InitParameters) SetLogger(logger *zap.Logger) { x.logger = logger } // SetNodeDialTimeout specifies the timeout for connection to be established. func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration) { x.nodeDialTimeout = timeout } // SetNodeStreamTimeout specifies the timeout for individual operations in streaming RPC. func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration) { x.nodeStreamTimeout = timeout } // SetHealthcheckTimeout specifies the timeout for request to node to decide if it is alive. // // See also Pool.Dial. func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) { x.healthcheckTimeout = timeout } // SetClientRebalanceInterval specifies the interval for updating nodes health status. // // See also Pool.Dial. func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) { x.clientRebalanceInterval = interval } // SetGracefulCloseOnSwitchTimeout specifies the timeout after which unhealthy client be closed during rebalancing // if it will become healthy back. // Generally this param should be less than client rebalance interval (see SetClientRebalanceInterval). // // See also SetErrorThreshold. func (x *InitParameters) SetGracefulCloseOnSwitchTimeout(timeout time.Duration) { x.gracefulCloseOnSwitchTimeout = timeout } // SetSessionExpirationDuration specifies the session token lifetime in epochs. func (x *InitParameters) SetSessionExpirationDuration(expirationDuration uint64) { x.sessionExpirationDuration = expirationDuration } // SetErrorThreshold specifies the number of errors on connection after which node is considered as unhealthy. func (x *InitParameters) SetErrorThreshold(threshold uint32) { x.errorThreshold = threshold } // SetRequestCallback makes the pool client to pass RequestInfo for each // request to f. Nil (default) means ignore RequestInfo. func (x *InitParameters) SetRequestCallback(f func(RequestInfo)) { x.requestCallback = f } // AddNode append information about the node to which you want to connect. func (x *InitParameters) AddNode(nodeParam NodeParam) { x.nodeParams = append(x.nodeParams, nodeParam) } // SetGRPCDialOptions sets the gRPC dial options for new gRPC client connection. func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) { x.dialOptions = opts } // setClientBuilder sets clientBuilder used for client construction. // Wraps setClientBuilderContext without a context. func (x *InitParameters) setClientBuilder(builder clientBuilder) { x.clientBuilder = builder } // isMissingClientBuilder checks if client constructor was not specified. func (x *InitParameters) isMissingClientBuilder() bool { return x.clientBuilder == nil } type rebalanceParameters struct { nodesParams []*nodesParam nodeRequestTimeout time.Duration clientRebalanceInterval time.Duration sessionExpirationDuration uint64 } type nodesParam struct { priority int addresses []string weights []float64 } // NodeParam groups parameters of remote node. type NodeParam struct { priority int address string weight float64 } // NewNodeParam creates NodeParam using parameters. func NewNodeParam(priority int, address string, weight float64) (prm NodeParam) { prm.SetPriority(priority) prm.SetAddress(address) prm.SetWeight(weight) return } // SetPriority specifies priority of the node. // Negative value is allowed. In the result node groups // with the same priority will be sorted by descent. func (x *NodeParam) SetPriority(priority int) { x.priority = priority } // Priority returns priority of the node. // Requests will be served by nodes subset with the highest priority (the smaller value - the higher priority). // If there are no healthy nodes in subsets with current or higher priority, requests will be served // by nodes subset with lower priority. func (x *NodeParam) Priority() int { return x.priority } // SetAddress specifies address of the node. func (x *NodeParam) SetAddress(address string) { x.address = address } // Address returns address of the node. func (x *NodeParam) Address() string { return x.address } // SetWeight specifies weight of the node. // Weights used to adjust requests' distribution between nodes with the same priority. func (x *NodeParam) SetWeight(weight float64) { x.weight = weight } // Weight returns weight of the node. func (x *NodeParam) Weight() float64 { return x.weight } // WaitParams contains parameters used in polling is a something applied on FrostFS network. type WaitParams struct { Timeout time.Duration PollInterval time.Duration } // SetTimeout specifies the time to wait for the operation to complete. // // Deprecated: Use WaitParams.Timeout instead. func (x *WaitParams) SetTimeout(timeout time.Duration) { x.Timeout = timeout } // SetPollInterval specifies the interval, once it will check the completion of the operation. // // Deprecated: Use WaitParams.PollInterval instead. func (x *WaitParams) SetPollInterval(tick time.Duration) { x.PollInterval = tick } func defaultWaitParams() *WaitParams { return &WaitParams{ Timeout: 120 * time.Second, PollInterval: 5 * time.Second, } } // checkForPositive panics if any of the wait params isn't positive. func (x *WaitParams) checkForPositive() { if x.Timeout <= 0 || x.PollInterval <= 0 { panic("all wait params must be positive") } } // CheckValidity checks if all wait params are non-negative. func (x *WaitParams) CheckValidity() error { if x.Timeout <= 0 { return errors.New("timeout cannot be negative") } if x.PollInterval <= 0 { return errors.New("poll interval cannot be negative") } return nil } type prmContext struct { defaultSession bool verb session.ObjectVerb cnr cid.ID objSet bool objs []oid.ID } func (x *prmContext) useDefaultSession() { x.defaultSession = true } func (x *prmContext) useContainer(cnr cid.ID) { x.cnr = cnr } func (x *prmContext) useObjects(ids []oid.ID) { x.objs = ids x.objSet = true } func (x *prmContext) useAddress(addr oid.Address) { x.cnr = addr.Container() x.objs = []oid.ID{addr.Object()} x.objSet = true } func (x *prmContext) useVerb(verb session.ObjectVerb) { x.verb = verb } type prmCommon struct { key *ecdsa.PrivateKey btoken *bearer.Token stoken *session.Object } // UseKey specifies private key to sign the requests. // If key is not provided, then Pool default key is used. func (x *prmCommon) UseKey(key *ecdsa.PrivateKey) { x.key = key } // UseBearer attaches bearer token to be used for the operation. func (x *prmCommon) UseBearer(token bearer.Token) { x.btoken = &token } // UseSession specifies session within which operation should be performed. func (x *prmCommon) UseSession(token session.Object) { x.stoken = &token } // PrmObjectPut groups parameters of PutObject operation. type PrmObjectPut struct { prmCommon hdr object.Object payload io.Reader copiesNumber []uint32 clientCut bool networkInfo netmap.NetworkInfo withoutHomomorphicHash bool bufferMaxSize uint64 } // SetHeader specifies header of the object. func (x *PrmObjectPut) SetHeader(hdr object.Object) { x.hdr = hdr } // SetPayload specifies payload of the object. func (x *PrmObjectPut) SetPayload(payload io.Reader) { x.payload = payload } // SetCopiesNumber sets number of object copies that is enough to consider put successful. // Zero means using default behavior. func (x *PrmObjectPut) SetCopiesNumber(copiesNumber uint32) { x.copiesNumber = []uint32{copiesNumber} } // SetCopiesNumberVector sets number of object copies that is enough to consider put successful, provided as array. // Nil/empty vector means using default behavior. func (x *PrmObjectPut) SetCopiesNumberVector(copiesNumber []uint32) { x.copiesNumber = copiesNumber } // SetClientCut enables client cut for objects. It means that full object is prepared on client side // and retrying is possible. But this leads to additional memory using for buffering object parts. // Buffer size for every put is MaxObjectSize value from FrostFS network. // There is limit for total memory allocation for in-flight request and // can be set by InitParameters.SetMaxClientCutMemory (default value is 1gb). // Put requests will fail if this limit be reached. func (x *PrmObjectPut) SetClientCut(clientCut bool) { x.clientCut = clientCut } // WithoutHomomorphicHash if set to true do not use Tillich-ZĂ©mor hash for payload. func (x *PrmObjectPut) WithoutHomomorphicHash(v bool) { x.withoutHomomorphicHash = v } // SetBufferMaxSize sets max buffer size to read payload. // This value isn't used if object size is set explicitly and less than this value. // Default value 3MB. func (x *PrmObjectPut) SetBufferMaxSize(size uint64) { x.bufferMaxSize = size } func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) { x.networkInfo = ni } // PrmObjectPatch groups parameters of PatchObject operation. type PrmObjectPatch struct { prmCommon addr oid.Address rng *object.Range payload io.Reader newAttrs []object.Attribute replaceAttrs bool maxPayloadPatchChunkLength int } // SetAddress sets the address of the object that is patched. func (x *PrmObjectPatch) SetAddress(addr oid.Address) { x.addr = addr } // SetRange sets the patch's range. func (x *PrmObjectPatch) SetRange(rng *object.Range) { x.rng = rng } // SetPayloadReader sets a payload reader. func (x *PrmObjectPatch) SetPayloadReader(payload io.Reader) { x.payload = payload } // SetRange sets the new attributes to the patch. func (x *PrmObjectPatch) SetNewAttributes(newAttrs []object.Attribute) { x.newAttrs = newAttrs } // SetRange sets the replace attributes flag to the patch. func (x *PrmObjectPatch) SetReplaceAttributes(replaceAttrs bool) { x.replaceAttrs = replaceAttrs } // SetMaxPayloadPatchChunkSize sets a max buf size to read the patch's payload. func (x *PrmObjectPatch) SetMaxPayloadPatchChunkSize(maxPayloadPatchChunkSize int) { x.maxPayloadPatchChunkLength = maxPayloadPatchChunkSize } // PrmObjectDelete groups parameters of DeleteObject operation. type PrmObjectDelete struct { prmCommon addr oid.Address } // SetAddress specifies FrostFS address of the object. func (x *PrmObjectDelete) SetAddress(addr oid.Address) { x.addr = addr } // PrmObjectGet groups parameters of GetObject operation. type PrmObjectGet struct { prmCommon addr oid.Address } // SetAddress specifies FrostFS address of the object. func (x *PrmObjectGet) SetAddress(addr oid.Address) { x.addr = addr } // PrmObjectHead groups parameters of HeadObject operation. type PrmObjectHead struct { prmCommon addr oid.Address raw bool } // SetAddress specifies FrostFS address of the object. func (x *PrmObjectHead) SetAddress(addr oid.Address) { x.addr = addr } // MarkRaw marks an intent to read physically stored object. func (x *PrmObjectHead) MarkRaw() { x.raw = true } // PrmObjectRange groups parameters of RangeObject operation. type PrmObjectRange struct { prmCommon addr oid.Address off, ln uint64 } // SetAddress specifies FrostFS address of the object. func (x *PrmObjectRange) SetAddress(addr oid.Address) { x.addr = addr } // SetOffset sets offset of the payload range to be read. func (x *PrmObjectRange) SetOffset(offset uint64) { x.off = offset } // SetLength sets length of the payload range to be read. func (x *PrmObjectRange) SetLength(length uint64) { x.ln = length } // PrmObjectSearch groups parameters of SearchObjects operation. type PrmObjectSearch struct { prmCommon cnrID cid.ID filters object.SearchFilters } // SetContainerID specifies the container in which to look for objects. func (x *PrmObjectSearch) SetContainerID(cnrID cid.ID) { x.cnrID = cnrID } // SetFilters specifies filters by which to select objects. func (x *PrmObjectSearch) SetFilters(filters object.SearchFilters) { x.filters = filters } // PrmContainerPut groups parameters of PutContainer operation. type PrmContainerPut struct { ClientParams sdkClient.PrmContainerPut WaitParams *WaitParams } // SetContainer container structure to be used as a parameter of the base // client's operation. // // See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerPut.SetContainer. // // Deprecated: Use PrmContainerPut.ClientParams.Container instead. func (x *PrmContainerPut) SetContainer(cnr container.Container) { x.ClientParams.SetContainer(cnr) } // WithinSession specifies session to be used as a parameter of the base // client's operation. // // See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerPut.WithinSession. // // Deprecated: Use PrmContainerPut.ClientParams.Session instead. func (x *PrmContainerPut) WithinSession(s session.Container) { x.ClientParams.WithinSession(s) } // SetWaitParams specifies timeout params to complete operation. // If not provided the default one will be used. // Panics if any of the wait params isn't positive. // // Deprecated: Use PrmContainerPut.ClientParams.WaitParams instead. func (x *PrmContainerPut) SetWaitParams(waitParams WaitParams) { waitParams.checkForPositive() x.WaitParams = &waitParams } // PrmContainerGet groups parameters of GetContainer operation. type PrmContainerGet struct { ContainerID cid.ID Session *session.Container } // SetContainerID specifies identifier of the container to be read. // // Deprecated: Use PrmContainerGet.ContainerID instead. func (prm *PrmContainerGet) SetContainerID(cnrID cid.ID) { prm.ContainerID = cnrID } // PrmContainerList groups parameters of ListContainers operation. type PrmContainerList struct { OwnerID user.ID Session *session.Container } // SetOwnerID specifies identifier of the FrostFS account to list the containers. // // Deprecated: Use PrmContainerList.OwnerID instead. func (x *PrmContainerList) SetOwnerID(ownerID user.ID) { x.OwnerID = ownerID } // PrmContainerDelete groups parameters of DeleteContainer operation. type PrmContainerDelete struct { ContainerID cid.ID Session *session.Container WaitParams *WaitParams } // SetContainerID specifies identifier of the FrostFS container to be removed. // // Deprecated: Use PrmContainerDelete.ContainerID instead. func (x *PrmContainerDelete) SetContainerID(cnrID cid.ID) { x.ContainerID = cnrID } // SetSessionToken specifies session within which operation should be performed. // // Deprecated: Use PrmContainerDelete.Session instead. func (x *PrmContainerDelete) SetSessionToken(token session.Container) { x.Session = &token } // SetWaitParams specifies timeout params to complete operation. // If not provided the default one will be used. // Panics if any of the wait params isn't positive. // // Deprecated: Use PrmContainerDelete.WaitParams instead. func (x *PrmContainerDelete) SetWaitParams(waitParams WaitParams) { waitParams.checkForPositive() x.WaitParams = &waitParams } type PrmAddAPEChain struct { Target ape.ChainTarget Chain ape.Chain } type PrmRemoveAPEChain struct { Target ape.ChainTarget ChainID ape.ChainID } type PrmListAPEChains struct { Target ape.ChainTarget } // PrmBalanceGet groups parameters of Balance operation. type PrmBalanceGet struct { account user.ID } // SetAccount specifies identifier of the FrostFS account for which the balance is requested. func (x *PrmBalanceGet) SetAccount(id user.ID) { x.account = id } // prmEndpointInfo groups parameters of sessionCreate operation. type prmCreateSession struct { exp uint64 key ecdsa.PrivateKey } // setExp sets number of the last FrostFS epoch in the lifetime of the session after which it will be expired. func (x *prmCreateSession) setExp(exp uint64) { x.exp = exp } // useKey specifies owner private key for session token. // If key is not provided, then Pool default key is used. func (x *prmCreateSession) useKey(key ecdsa.PrivateKey) { x.key = key } // prmEndpointInfo groups parameters of endpointInfo operation. type prmEndpointInfo struct{} // prmNetworkInfo groups parameters of networkInfo operation. type prmNetworkInfo struct{} // prmNetMapSnapshot groups parameters of netMapSnapshot operation. type prmNetMapSnapshot struct{} // resCreateSession groups resulting values of sessionCreate operation. type resCreateSession struct { id []byte sessionKey []byte } // Pool represents virtual connection to the FrostFS network to communicate // with multiple FrostFS servers without thinking about switching between servers // due to load balancing proportions or their unavailability. // It is designed to provide a convenient abstraction from the multiple sdkClient.client types. // // Pool can be created and initialized using NewPool function. // Before executing the FrostFS operations using the Pool, connection to the // servers MUST BE correctly established (see Dial method). // Using the Pool before connecting have been established can lead to a panic. // After the work, the Pool SHOULD BE closed (see Close method): it frees internal // and system resources which were allocated for the period of work of the Pool. // Calling Dial/Close methods during the communication process step strongly discouraged // as it leads to undefined behavior. // // Each method which produces a FrostFS API call may return an error. // Status of underlying server response is casted to built-in error instance. // Certain statuses can be checked using `sdkClient` and standard `errors` packages. // Note that package provides some helper functions to work with status returns // (e.g. sdkClient.IsErrContainerNotFound, sdkClient.IsErrObjectNotFound). // // See pool package overview to get some examples. type Pool struct { innerPools []*innerPool key *ecdsa.PrivateKey cancel context.CancelFunc closedCh chan struct{} cache *sessionCache stokenDuration uint64 rebalanceParams rebalanceParameters clientBuilder clientBuilder logger *zap.Logger maxObjectSize uint64 } type innerPool struct { lock sync.RWMutex sampler *sampler clients []client } const ( defaultSessionTokenExpirationDuration = 100 // in epochs defaultErrorThreshold = 100 defaultGracefulCloseOnSwitchTimeout = 10 * time.Second defaultRebalanceInterval = 15 * time.Second defaultHealthcheckTimeout = 4 * time.Second defaultDialTimeout = 5 * time.Second defaultStreamTimeout = 10 * time.Second defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB ) // NewPool creates connection pool using parameters. func NewPool(options InitParameters) (*Pool, error) { if options.key == nil { return nil, fmt.Errorf("missed required parameter 'Key'") } nodesParams, err := adjustNodeParams(options.nodeParams) if err != nil { return nil, err } cache, err := newCache(options.sessionExpirationDuration) if err != nil { return nil, fmt.Errorf("couldn't create cache: %w", err) } fillDefaultInitParams(&options, cache) pool := &Pool{ key: options.key, cache: cache, logger: options.logger, stokenDuration: options.sessionExpirationDuration, rebalanceParams: rebalanceParameters{ nodesParams: nodesParams, nodeRequestTimeout: options.healthcheckTimeout, clientRebalanceInterval: options.clientRebalanceInterval, sessionExpirationDuration: options.sessionExpirationDuration, }, clientBuilder: options.clientBuilder, } return pool, nil } // Dial establishes a connection to the servers from the FrostFS network. // It also starts a routine that checks the health of the nodes and // updates the weights of the nodes for balancing. // Returns an error describing failure reason. // // If failed, the Pool SHOULD NOT be used. // // See also InitParameters.SetClientRebalanceInterval. func (p *Pool) Dial(ctx context.Context) error { inner := make([]*innerPool, len(p.rebalanceParams.nodesParams)) var atLeastOneHealthy bool for i, params := range p.rebalanceParams.nodesParams { clients := make([]client, len(params.weights)) for j, addr := range params.addresses { clients[j] = p.clientBuilder(addr) if err := clients[j].dial(ctx); err != nil { p.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err)) continue } var st session.Object err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key, false) if err != nil { clients[j].setUnhealthy() p.log(zap.WarnLevel, "failed to create frostfs session token for client", zap.String("address", addr), zap.Error(err)) continue } _ = p.cache.Put(formCacheKey(addr, p.key, false), st) atLeastOneHealthy = true } source := rand.NewSource(time.Now().UnixNano()) sampl := newSampler(params.weights, source) inner[i] = &innerPool{ sampler: sampl, clients: clients, } } if !atLeastOneHealthy { return fmt.Errorf("at least one node must be healthy") } ctx, cancel := context.WithCancel(ctx) p.cancel = cancel p.closedCh = make(chan struct{}) p.innerPools = inner ni, err := p.NetworkInfo(ctx) if err != nil { return fmt.Errorf("get network info for max object size: %w", err) } p.maxObjectSize = ni.MaxObjectSize() go p.startRebalance(ctx) return nil } func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) { if p.logger == nil { return } p.logger.Log(level, msg, fields...) } func fillDefaultInitParams(params *InitParameters, cache *sessionCache) { if params.sessionExpirationDuration == 0 { params.sessionExpirationDuration = defaultSessionTokenExpirationDuration } if params.errorThreshold == 0 { params.errorThreshold = defaultErrorThreshold } if params.clientRebalanceInterval <= 0 { params.clientRebalanceInterval = defaultRebalanceInterval } if params.gracefulCloseOnSwitchTimeout <= 0 { params.gracefulCloseOnSwitchTimeout = defaultGracefulCloseOnSwitchTimeout } if params.healthcheckTimeout <= 0 { params.healthcheckTimeout = defaultHealthcheckTimeout } if params.nodeDialTimeout <= 0 { params.nodeDialTimeout = defaultDialTimeout } if params.nodeStreamTimeout <= 0 { params.nodeStreamTimeout = defaultStreamTimeout } if cache.tokenDuration == 0 { cache.tokenDuration = defaultSessionTokenExpirationDuration } if params.isMissingClientBuilder() { params.setClientBuilder(func(addr string) client { var prm wrapperPrm prm.setAddress(addr) prm.setKey(*params.key) prm.setLogger(params.logger) prm.setDialTimeout(params.nodeDialTimeout) prm.setStreamTimeout(params.nodeStreamTimeout) prm.setErrorThreshold(params.errorThreshold) prm.setGracefulCloseOnSwitchTimeout(params.gracefulCloseOnSwitchTimeout) prm.setPoolRequestCallback(params.requestCallback) prm.setGRPCDialOptions(params.dialOptions) prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error { cache.updateEpoch(info.Epoch()) return nil }) return newWrapper(prm) }) } } func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) { if len(nodeParams) == 0 { return nil, errors.New("no FrostFS peers configured") } nodesParamsMap := make(map[int]*nodesParam) for _, param := range nodeParams { nodes, ok := nodesParamsMap[param.priority] if !ok { nodes = &nodesParam{priority: param.priority} } nodes.addresses = append(nodes.addresses, param.address) nodes.weights = append(nodes.weights, param.weight) nodesParamsMap[param.priority] = nodes } nodesParams := make([]*nodesParam, 0, len(nodesParamsMap)) for _, nodes := range nodesParamsMap { nodes.weights = adjustWeights(nodes.weights) nodesParams = append(nodesParams, nodes) } sort.Slice(nodesParams, func(i, j int) bool { return nodesParams[i].priority < nodesParams[j].priority }) return nodesParams, nil } // startRebalance runs loop to monitor connection healthy status. func (p *Pool) startRebalance(ctx context.Context) { ticker := time.NewTicker(p.rebalanceParams.clientRebalanceInterval) defer ticker.Stop() buffers := make([][]float64, len(p.rebalanceParams.nodesParams)) for i, params := range p.rebalanceParams.nodesParams { buffers[i] = make([]float64, len(params.weights)) } for { select { case <-ctx.Done(): close(p.closedCh) return case <-ticker.C: p.updateNodesHealth(ctx, buffers) ticker.Reset(p.rebalanceParams.clientRebalanceInterval) } } } func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]float64) { wg := sync.WaitGroup{} for i, inner := range p.innerPools { wg.Add(1) bufferWeights := buffers[i] go func(i int, _ *innerPool) { defer wg.Done() p.updateInnerNodesHealth(ctx, i, bufferWeights) }(i, inner) } wg.Wait() } func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) { if i > len(p.innerPools)-1 { return } pool := p.innerPools[i] options := p.rebalanceParams healthyChanged := new(atomic.Bool) wg := sync.WaitGroup{} for j, cli := range pool.clients { wg.Add(1) go func(j int, cli client) { defer wg.Done() tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout) defer c() changed, err := restartIfUnhealthy(tctx, cli) healthy := err == nil if healthy { bufferWeights[j] = options.nodesParams[i].weights[j] } else { bufferWeights[j] = 0 p.cache.DeleteByPrefix(cli.address()) } if changed { fields := []zap.Field{zap.String("address", cli.address()), zap.Bool("healthy", healthy)} if err != nil { fields = append(fields, zap.String("reason", err.Error())) } p.log(zap.DebugLevel, "health has changed", fields...) healthyChanged.Store(true) } }(j, cli) } wg.Wait() if healthyChanged.Load() { probabilities := adjustWeights(bufferWeights) source := rand.NewSource(time.Now().UnixNano()) pool.lock.Lock() pool.sampler = newSampler(probabilities, source) pool.lock.Unlock() } } // restartIfUnhealthy checks healthy status of client and recreate it if status is unhealthy. // Indicating if status was changed by this function call and returns error that caused unhealthy status. func restartIfUnhealthy(ctx context.Context, c client) (changed bool, err error) { defer func() { if err != nil { c.setUnhealthy() } else { c.setHealthy() } }() wasHealthy := c.isHealthy() if res, err := c.healthcheck(ctx); err == nil { if res.Status().IsMaintenance() { return wasHealthy, new(apistatus.NodeUnderMaintenance) } return !wasHealthy, nil } if err = c.restart(ctx); err != nil { return wasHealthy, err } res, err := c.healthcheck(ctx) if err != nil { return wasHealthy, err } if res.Status().IsMaintenance() { return wasHealthy, new(apistatus.NodeUnderMaintenance) } return !wasHealthy, nil } func adjustWeights(weights []float64) []float64 { adjusted := make([]float64, len(weights)) sum := 0.0 for _, weight := range weights { sum += weight } if sum > 0 { for i, weight := range weights { adjusted[i] = weight / sum } } return adjusted } func (p *Pool) connection() (client, error) { for _, inner := range p.innerPools { cp, err := inner.connection() if err == nil { return cp, nil } } return nil, errors.New("no healthy client") } func (p *innerPool) connection() (client, error) { p.lock.RLock() // need lock because of using p.sampler defer p.lock.RUnlock() if len(p.clients) == 1 { cp := p.clients[0] if cp.isHealthy() { return cp, nil } return nil, errors.New("no healthy client") } attempts := 3 * len(p.clients) for range attempts { i := p.sampler.Next() if cp := p.clients[i]; cp.isHealthy() { return cp, nil } } return nil, errors.New("no healthy client") } func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string { k := keys.PrivateKey{PrivateKey: *key} stype := "server" if clientCut { stype = "client" } return address + stype + k.String() } func (p *Pool) checkSessionTokenErr(err error, address string) bool { if err == nil { return false } if sdkClient.IsErrSessionNotFound(err) || sdkClient.IsErrSessionExpired(err) { p.cache.DeleteByPrefix(address) return true } return false } func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey, clientCut bool) error { ni, err := c.networkInfo(ctx, prmNetworkInfo{}) if err != nil { return err } epoch := ni.CurrentEpoch() var exp uint64 if math.MaxUint64-epoch < dur { exp = math.MaxUint64 } else { exp = epoch + dur } var prm prmCreateSession prm.setExp(exp) prm.useKey(ownerKey) var ( id uuid.UUID key frostfsecdsa.PublicKey ) if clientCut { id = uuid.New() key = frostfsecdsa.PublicKey(ownerKey.PublicKey) } else { res, err := c.sessionCreate(ctx, prm) if err != nil { return err } if err = id.UnmarshalBinary(res.id); err != nil { return fmt.Errorf("invalid session token ID: %w", err) } if err = key.Decode(res.sessionKey); err != nil { return fmt.Errorf("invalid public session key: %w", err) } } dst.SetID(id) dst.SetAuthKey(&key) dst.SetExp(exp) return nil } type callContext struct { client client // client endpoint endpoint string // request signer key *ecdsa.PrivateKey // flag to open default session if session token is missing sessionDefault bool sessionTarget func(session.Object) sessionVerb session.ObjectVerb sessionCnr cid.ID sessionObjSet bool sessionObjs []oid.ID sessionClientCut bool } func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error { cp, err := p.connection() if err != nil { return err } ctx.key = cfg.key if ctx.key == nil { // use pool key if caller didn't specify its own ctx.key = p.key } ctx.endpoint = cp.address() ctx.client = cp if ctx.sessionTarget != nil && cfg.stoken != nil { ctx.sessionTarget(*cfg.stoken) } // note that we don't override session provided by the caller ctx.sessionDefault = cfg.stoken == nil && prmCtx.defaultSession if ctx.sessionDefault { ctx.sessionVerb = prmCtx.verb ctx.sessionCnr = prmCtx.cnr ctx.sessionObjSet = prmCtx.objSet ctx.sessionObjs = prmCtx.objs } return err } // opens new session or uses cached one. // Must be called only on initialized callContext with set sessionTarget. func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error { cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut) tok, ok := p.cache.Get(cacheKey) if !ok { // init new session err := initSessionForDuration(ctx, &tok, cc.client, p.stokenDuration, *cc.key, cc.sessionClientCut) if err != nil { return fmt.Errorf("session API client: %w", err) } // cache the opened session p.cache.Put(cacheKey, tok) } tok.ForVerb(cc.sessionVerb) tok.BindContainer(cc.sessionCnr) if cc.sessionObjSet { tok.LimitByObjects(cc.sessionObjs...) } // sign the token if err := tok.Sign(*cc.key); err != nil { return fmt.Errorf("sign token of the opened session: %w", err) } cc.sessionTarget(tok) return nil } // opens default session (if sessionDefault is set), and calls f. If f returns // session-related error then cached token is removed. func (p *Pool) call(ctx context.Context, cc *callContext, f func() error) error { var err error if cc.sessionDefault { err = p.openDefaultSession(ctx, cc) if err != nil { return fmt.Errorf("open default session: %w", err) } } err = f() _ = p.checkSessionTokenErr(err, cc.endpoint) return err } // fillAppropriateKey use pool key if caller didn't specify its own. func (p *Pool) fillAppropriateKey(prm *prmCommon) { if prm.key == nil { prm.key = p.key } } // ResPutObject is designed to provide identifier and creation epoch of the saved object. type ResPutObject struct { ObjectID oid.ID Epoch uint64 } // ResPatchObject is designed to provide identifier for the saved patched object. type ResPatchObject struct { ObjectID oid.ID } // PatchObject patches an object through a remote server using FrostFS API protocol. // // Main return value MUST NOT be processed on an erroneous return. func (p *Pool) PatchObject(ctx context.Context, prm PrmObjectPatch) (ResPatchObject, error) { var prmCtx prmContext prmCtx.useDefaultSession() prmCtx.useVerb(session.VerbObjectPatch) prmCtx.useContainer(prm.addr.Container()) p.fillAppropriateKey(&prm.prmCommon) var ctxCall callContext if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil { return ResPatchObject{}, fmt.Errorf("init call context: %w", err) } if ctxCall.sessionDefault { ctxCall.sessionTarget = prm.UseSession if err := p.openDefaultSession(ctx, &ctxCall); err != nil { return ResPatchObject{}, fmt.Errorf("open default session: %w", err) } } res, err := ctxCall.client.objectPatch(ctx, prm) if err != nil { // removes session token from cache in case of token error p.checkSessionTokenErr(err, ctxCall.endpoint) return ResPatchObject{}, fmt.Errorf("init patching on API client %s: %w", ctxCall.endpoint, err) } return res, nil } // PutObject writes an object through a remote server using FrostFS API protocol. // // Main return value MUST NOT be processed on an erroneous return. func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) { cnr, _ := prm.hdr.ContainerID() var prmCtx prmContext prmCtx.useDefaultSession() prmCtx.useVerb(session.VerbObjectPut) prmCtx.useContainer(cnr) p.fillAppropriateKey(&prm.prmCommon) var ctxCall callContext ctxCall.sessionClientCut = prm.clientCut if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil { return ResPutObject{}, fmt.Errorf("init call context: %w", err) } if ctxCall.sessionDefault { ctxCall.sessionTarget = prm.UseSession if err := p.openDefaultSession(ctx, &ctxCall); err != nil { return ResPutObject{}, fmt.Errorf("open default session: %w", err) } } if prm.clientCut { var ni netmap.NetworkInfo ni.SetCurrentEpoch(p.cache.Epoch()) ni.SetMaxObjectSize(p.maxObjectSize) // we want to use initial max object size in PayloadSizeLimiter prm.setNetworkInfo(ni) } res, err := ctxCall.client.objectPut(ctx, prm) if err != nil { // removes session token from cache in case of token error p.checkSessionTokenErr(err, ctxCall.endpoint) return ResPutObject{}, fmt.Errorf("init writing on API client %s: %w", ctxCall.endpoint, err) } return res, nil } // DeleteObject marks an object for deletion from the container using FrostFS API protocol. // As a marker, a special unit called a tombstone is placed in the container. // It confirms the user's intent to delete the object, and is itself a container object. // Explicit deletion is done asynchronously, and is generally not guaranteed. func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error { var prmCtx prmContext prmCtx.useDefaultSession() prmCtx.useVerb(session.VerbObjectDelete) prmCtx.useAddress(prm.addr) if prm.stoken == nil { // collect phy objects only if we are about to open default session var tokens relations.Tokens tokens.Bearer = prm.btoken relatives, err := relations.ListAllRelations(ctx, p, prm.addr.Container(), prm.addr.Object(), tokens) if err != nil { return fmt.Errorf("failed to collect relatives: %w", err) } if len(relatives) != 0 { prmCtx.useContainer(prm.addr.Container()) prmCtx.useObjects(append(relatives, prm.addr.Object())) } } p.fillAppropriateKey(&prm.prmCommon) var cc callContext cc.sessionTarget = prm.UseSession err := p.initCallContext(&cc, prm.prmCommon, prmCtx) if err != nil { return err } return p.call(ctx, &cc, func() error { if err = cc.client.objectDelete(ctx, prm); err != nil { return fmt.Errorf("remove object via client %s: %w", cc.endpoint, err) } return nil }) } type objectReadCloser struct { reader *sdkClient.ObjectReader elapsedTimeCallback func(time.Duration) } // Read implements io.Reader of the object payload. func (x *objectReadCloser) Read(p []byte) (int, error) { start := time.Now() n, err := x.reader.Read(p) x.elapsedTimeCallback(time.Since(start)) return n, err } // Close implements io.Closer of the object payload. func (x *objectReadCloser) Close() error { _, err := x.reader.Close() return err } // ResGetObject is designed to provide object header nad read one object payload from FrostFS system. type ResGetObject struct { Header object.Object Payload io.ReadCloser } // GetObject reads object header and initiates reading an object payload through a remote server using FrostFS API protocol. // // Main return value MUST NOT be processed on an erroneous return. func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) { p.fillAppropriateKey(&prm.prmCommon) var cc callContext cc.sessionTarget = prm.UseSession var res ResGetObject err := p.initCallContext(&cc, prm.prmCommon, prmContext{}) if err != nil { return res, err } return res, p.call(ctx, &cc, func() error { res, err = cc.client.objectGet(ctx, prm) if err != nil { return fmt.Errorf("get object via client %s: %w", cc.endpoint, err) } return nil }) } // HeadObject reads object header through a remote server using FrostFS API protocol. // // Main return value MUST NOT be processed on an erroneous return. func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) { p.fillAppropriateKey(&prm.prmCommon) var cc callContext cc.sessionTarget = prm.UseSession var obj object.Object err := p.initCallContext(&cc, prm.prmCommon, prmContext{}) if err != nil { return obj, err } return obj, p.call(ctx, &cc, func() error { obj, err = cc.client.objectHead(ctx, prm) if err != nil { return fmt.Errorf("head object via client %s: %w", cc.endpoint, err) } return nil }) } // ResObjectRange is designed to read payload range of one object // from FrostFS system. // // Must be initialized using Pool.ObjectRange, any other // usage is unsafe. type ResObjectRange struct { payload *sdkClient.ObjectRangeReader elapsedTimeCallback func(time.Duration) } // Read implements io.Reader of the object payload. func (x *ResObjectRange) Read(p []byte) (int, error) { start := time.Now() n, err := x.payload.Read(p) x.elapsedTimeCallback(time.Since(start)) return n, err } // Close ends reading the payload range and returns the result of the operation // along with the final results. Must be called after using the ResObjectRange. func (x *ResObjectRange) Close() error { _, err := x.payload.Close() return err } // ObjectRange initiates reading an object's payload range through a remote // server using FrostFS API protocol. // // Main return value MUST NOT be processed on an erroneous return. func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) { p.fillAppropriateKey(&prm.prmCommon) var cc callContext cc.sessionTarget = prm.UseSession var res ResObjectRange err := p.initCallContext(&cc, prm.prmCommon, prmContext{}) if err != nil { return res, err } return res, p.call(ctx, &cc, func() error { res, err = cc.client.objectRange(ctx, prm) if err != nil { return fmt.Errorf("object range via client %s: %w", cc.endpoint, err) } return nil }) } // ResObjectSearch is designed to read list of object identifiers from FrostFS system. // // Must be initialized using Pool.SearchObjects, any other usage is unsafe. type ResObjectSearch struct { r *sdkClient.ObjectListReader handleError func(context.Context, apistatus.Status, error) error } // Read reads another list of the object identifiers. func (x *ResObjectSearch) Read(buf []oid.ID) (int, error) { n, ok := x.r.Read(buf) if !ok { res, err := x.r.Close() if err == nil { return n, io.EOF } var status apistatus.Status if res != nil { status = res.Status() } err = x.handleError(nil, status, err) return n, err } return n, nil } // Iterate iterates over the list of found object identifiers. // f can return true to stop iteration earlier. // // Returns an error if object can't be read. func (x *ResObjectSearch) Iterate(f func(oid.ID) bool) error { return x.r.Iterate(f) } // Close ends reading list of the matched objects and returns the result of the operation // along with the final results. Must be called after using the ResObjectSearch. func (x *ResObjectSearch) Close() { _, _ = x.r.Close() } // SearchObjects initiates object selection through a remote server using FrostFS API protocol. // // The call only opens the transmission channel, explicit fetching of matched objects // is done using the ResObjectSearch. Resulting reader must be finally closed. // // Main return value MUST NOT be processed on an erroneous return. func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) { p.fillAppropriateKey(&prm.prmCommon) var cc callContext cc.sessionTarget = prm.UseSession var res ResObjectSearch err := p.initCallContext(&cc, prm.prmCommon, prmContext{}) if err != nil { return res, err } return res, p.call(ctx, &cc, func() error { res, err = cc.client.objectSearch(ctx, prm) if err != nil { return fmt.Errorf("search object via client %s: %w", cc.endpoint, err) } return nil }) } // PutContainer sends request to save container in FrostFS and waits for the operation to complete. // // Waiting parameters can be specified using SetWaitParams. If not called, defaults are used: // // polling interval: 5s // waiting timeout: 120s // // Success can be verified by reading by identifier (see GetContainer). // // Main return value MUST NOT be processed on an erroneous return. func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) { cp, err := p.connection() if err != nil { return cid.ID{}, err } cnrID, err := cp.containerPut(ctx, prm) if err != nil { return cid.ID{}, fmt.Errorf("put container via client '%s': %w", cp.address(), err) } return cnrID, nil } // GetContainer reads FrostFS container by ID. // // Main return value MUST NOT be processed on an erroneous return. func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) { cp, err := p.connection() if err != nil { return container.Container{}, err } cnrs, err := cp.containerGet(ctx, prm) if err != nil { return container.Container{}, fmt.Errorf("get container via client '%s': %w", cp.address(), err) } return cnrs, nil } // ListContainers requests identifiers of the account-owned containers. func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) { cp, err := p.connection() if err != nil { return nil, err } cnrIDs, err := cp.containerList(ctx, prm) if err != nil { return []cid.ID{}, fmt.Errorf("list containers via client '%s': %w", cp.address(), err) } return cnrIDs, nil } // ListContainersStream requests identifiers of the account-owned containers. func (p *Pool) ListContainersStream(ctx context.Context, prm PrmListStream) (ResListStream, error) { var res ResListStream cp, err := p.connection() if err != nil { return res, err } res, err = cp.containerListStream(ctx, prm) if err != nil { return res, fmt.Errorf("list containers stream via client '%s': %w", cp.address(), err) } return res, nil } // DeleteContainer sends request to remove the FrostFS container and waits for the operation to complete. // // Waiting parameters can be specified using SetWaitParams. If not called, defaults are used: // // polling interval: 5s // waiting timeout: 120s // // Success can be verified by reading by identifier (see GetContainer). func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error { cp, err := p.connection() if err != nil { return err } err = cp.containerDelete(ctx, prm) if err != nil { return fmt.Errorf("delete container via client '%s': %w", cp.address(), err) } return nil } // AddAPEChain sends a request to set APE chain rules for a target (basically, for a container). func (p *Pool) AddAPEChain(ctx context.Context, prm PrmAddAPEChain) error { cp, err := p.connection() if err != nil { return err } err = cp.apeManagerAddChain(ctx, prm) if err != nil { return fmt.Errorf("add ape chain via client '%s': %w", cp.address(), err) } return nil } // RemoveAPEChain sends a request to remove APE chain rules for a target. func (p *Pool) RemoveAPEChain(ctx context.Context, prm PrmRemoveAPEChain) error { cp, err := p.connection() if err != nil { return err } err = cp.apeManagerRemoveChain(ctx, prm) if err != nil { return fmt.Errorf("remove ape chain via client '%s': %w", cp.address(), err) } return nil } // ListAPEChains sends a request to list APE chains rules for a target. func (p *Pool) ListAPEChains(ctx context.Context, prm PrmListAPEChains) ([]ape.Chain, error) { cp, err := p.connection() if err != nil { return nil, err } chains, err := cp.apeManagerListChains(ctx, prm) if err != nil { return nil, fmt.Errorf("list ape chains via client '%s': %w", cp.address(), err) } return chains, nil } // Balance requests current balance of the FrostFS account. // // Main return value MUST NOT be processed on an erroneous return. func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) { cp, err := p.connection() if err != nil { return accounting.Decimal{}, err } balance, err := cp.balanceGet(ctx, prm) if err != nil { return accounting.Decimal{}, fmt.Errorf("get balance via client '%s': %w", cp.address(), err) } return balance, nil } // Statistic returns connection statistics. func (p Pool) Statistic() Statistic { stat := Statistic{} for _, inner := range p.innerPools { nodes := make([]string, 0, len(inner.clients)) inner.lock.RLock() for _, cl := range inner.clients { if cl.isHealthy() { nodes = append(nodes, cl.address()) } node := NodeStatistic{ address: cl.address(), methods: cl.methodsStatus(), overallErrors: cl.overallErrorRate(), currentErrors: cl.currentErrorRate(), } stat.nodes = append(stat.nodes, node) stat.overallErrors += node.overallErrors } inner.lock.RUnlock() if len(stat.currentNodes) == 0 { stat.currentNodes = nodes } } return stat } // waitForContainerPresence waits until the container is found on the FrostFS network. func waitForContainerPresence(ctx context.Context, cli client, prm PrmContainerGet, waitParams *WaitParams) error { return waitFor(ctx, waitParams, func(ctx context.Context) bool { _, err := cli.containerGet(ctx, prm) return err == nil }) } // waitForContainerRemoved waits until the container is removed from the FrostFS network. func waitForContainerRemoved(ctx context.Context, cli client, prm PrmContainerGet, waitParams *WaitParams) error { return waitFor(ctx, waitParams, func(ctx context.Context) bool { _, err := cli.containerGet(ctx, prm) return sdkClient.IsErrContainerNotFound(err) }) } // waitFor await that given condition will be met in waitParams time. func waitFor(ctx context.Context, params *WaitParams, condition func(context.Context) bool) error { wctx, cancel := context.WithTimeout(ctx, params.Timeout) defer cancel() ticker := time.NewTimer(params.PollInterval) defer ticker.Stop() wdone := wctx.Done() done := ctx.Done() for { select { case <-done: return ctx.Err() case <-wdone: return wctx.Err() case <-ticker.C: if condition(ctx) { return nil } ticker.Reset(params.PollInterval) } } } // NetworkInfo requests information about the FrostFS network of which the remote server is a part. // // Main return value MUST NOT be processed on an erroneous return. func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) { cp, err := p.connection() if err != nil { return netmap.NetworkInfo{}, err } netInfo, err := cp.networkInfo(ctx, prmNetworkInfo{}) if err != nil { return netmap.NetworkInfo{}, fmt.Errorf("get network info via client '%s': %w", cp.address(), err) } return netInfo, nil } // NetMapSnapshot requests information about the FrostFS network map. // // Main return value MUST NOT be processed on an erroneous return. func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) { cp, err := p.connection() if err != nil { return netmap.NetMap{}, err } netMap, err := cp.netMapSnapshot(ctx, prmNetMapSnapshot{}) if err != nil { return netmap.NetMap{}, fmt.Errorf("get network map via client '%s': %w", cp.address(), err) } return netMap, nil } // Close closes the Pool and releases all the associated resources. func (p *Pool) Close() { p.cancel() <-p.closedCh // close all clients for _, pools := range p.innerPools { for _, cli := range pools.clients { _ = cli.close() } } } // SyncContainerWithNetwork applies network configuration received via // the Pool to the container. Changes the container if it does not satisfy // network configuration. // // Pool and container MUST not be nil. // // Returns any error that does not allow reading configuration // from the network. func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *Pool) error { ni, err := p.NetworkInfo(ctx) if err != nil { return fmt.Errorf("network info: %w", err) } container.ApplyNetworkConfig(cnr, ni) return nil } // GetSplitInfo implements relations.Relations. func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) { var addr oid.Address addr.SetContainer(cnrID) addr.SetObject(objID) var prm PrmObjectHead prm.SetAddress(addr) if tokens.Bearer != nil { prm.UseBearer(*tokens.Bearer) } if tokens.Session != nil { prm.UseSession(*tokens.Session) } prm.MarkRaw() _, err := p.HeadObject(ctx, prm) var errSplit *object.SplitInfoError switch { case errors.As(err, &errSplit): return errSplit.SplitInfo(), nil case err == nil: return nil, relations.ErrNoSplitInfo default: return nil, fmt.Errorf("failed to get raw object header: %w", err) } } // ListChildrenByLinker implements relations.Relations. func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) { var addr oid.Address addr.SetContainer(cnrID) addr.SetObject(objID) var prm PrmObjectHead prm.SetAddress(addr) if tokens.Bearer != nil { prm.UseBearer(*tokens.Bearer) } if tokens.Session != nil { prm.UseSession(*tokens.Session) } res, err := p.HeadObject(ctx, prm) if err != nil { return nil, fmt.Errorf("failed to get linking object's header: %w", err) } return res.Children(), nil } // GetLeftSibling implements relations.Relations. func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) { var addr oid.Address addr.SetContainer(cnrID) addr.SetObject(objID) var prm PrmObjectHead prm.SetAddress(addr) if tokens.Bearer != nil { prm.UseBearer(*tokens.Bearer) } if tokens.Session != nil { prm.UseSession(*tokens.Session) } res, err := p.HeadObject(ctx, prm) if err != nil { return oid.ID{}, fmt.Errorf("failed to read split chain member's header: %w", err) } idMember, ok := res.PreviousID() if !ok { return oid.ID{}, relations.ErrNoLeftSibling } return idMember, nil } // FindSiblingBySplitID implements relations.Relations. func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) { var query object.SearchFilters query.AddSplitIDFilter(object.MatchStringEqual, splitID) var prm PrmObjectSearch prm.SetContainerID(cnrID) prm.SetFilters(query) if tokens.Bearer != nil { prm.UseBearer(*tokens.Bearer) } if tokens.Session != nil { prm.UseSession(*tokens.Session) } res, err := p.SearchObjects(ctx, prm) if err != nil { return nil, fmt.Errorf("failed to search objects by split ID: %w", err) } var members []oid.ID err = res.Iterate(func(id oid.ID) bool { members = append(members, id) return false }) if err != nil { return nil, fmt.Errorf("failed to iterate found objects: %w", err) } return members, nil } // FindSiblingByParentID implements relations.Relations. func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) { var query object.SearchFilters query.AddParentIDFilter(object.MatchStringEqual, objID) var prm PrmObjectSearch prm.SetContainerID(cnrID) prm.SetFilters(query) if tokens.Bearer != nil { prm.UseBearer(*tokens.Bearer) } if tokens.Session != nil { prm.UseSession(*tokens.Session) } resSearch, err := p.SearchObjects(ctx, prm) if err != nil { return nil, fmt.Errorf("failed to find object children: %w", err) } var res []oid.ID err = resSearch.Iterate(func(id oid.ID) bool { res = append(res, id) return false }) if err != nil { return nil, fmt.Errorf("failed to iterate found objects: %w", err) } return res, nil }