package pool

import (
	"bytes"
	"context"
	"crypto/ecdsa"
	"errors"
	"fmt"
	"io"
	"sync"
	"sync/atomic"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ape"
	sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"google.golang.org/grpc"
)

// errPoolClientUnhealthy is an error to indicate that a client in the pool is unhealthy.
var errPoolClientUnhealthy = errors.New("pool client unhealthy")

// clientStatusMonitor counts the error rate and other statistics for a connection.
type clientStatusMonitor struct {
	logger         *zap.Logger
	addr           string
	healthy        *atomic.Uint32
	errorThreshold uint32

	mu                sync.RWMutex // protects counters
	currentErrorCount uint32
	overallErrorCount uint64
	methods           []*MethodStatus

	// RLock means the client is being used for an operation.
	// Lock means the client is marked as closed.
	status sync.RWMutex
}

// values for healthy status of clientStatusMonitor.
const (
	// statusUnhealthyOnRequest is set when communication with the endpoint fails after
	// dialing due to immediate or accumulated errors; the connection is still available
	// and the pool should close it before re-establishing the connection once again.
	statusUnhealthyOnRequest = iota

	// statusHealthy is set when the connection is ready to be used by the pool.
	statusHealthy
)

// MethodIndex index of method in list of statuses in clientStatusMonitor.
type MethodIndex int

const (
	methodBalanceGet MethodIndex = iota
	methodContainerPut
	methodContainerGet
	methodContainerList
	methodContainerListStream
	methodContainerDelete
	methodEndpointInfo
	methodNetworkInfo
	methodNetMapSnapshot
	methodObjectPut
	methodObjectDelete
	methodObjectGet
	methodObjectHead
	methodObjectRange
	methodObjectPatch
	methodSessionCreate
	methodAPEManagerAddChain
	methodAPEManagerRemoveChain
	methodAPEManagerListChains
	methodLast
)

// String implements fmt.Stringer.
func (m MethodIndex) String() string {
	switch m {
	case methodBalanceGet:
		return "balanceGet"
	case methodContainerPut:
		return "containerPut"
	case methodContainerGet:
		return "containerGet"
	case methodContainerList:
		return "containerList"
	case methodContainerListStream:
		return "containerListStream"
	case methodContainerDelete:
		return "containerDelete"
	case methodEndpointInfo:
		return "endpointInfo"
	case methodNetworkInfo:
		return "networkInfo"
	case methodNetMapSnapshot:
		return "netMapSnapshot"
	case methodObjectPut:
		return "objectPut"
	case methodObjectPatch:
		return "objectPatch"
	case methodObjectDelete:
		return "objectDelete"
	case methodObjectGet:
		return "objectGet"
	case methodObjectHead:
		return "objectHead"
	case methodObjectRange:
		return "objectRange"
	case methodSessionCreate:
		return "sessionCreate"
	case methodAPEManagerAddChain:
		return "apeManagerAddChain"
	case methodAPEManagerRemoveChain:
		return "apeManagerRemoveChain"
	case methodAPEManagerListChains:
		return "apeManagerListChains"
	case methodLast:
		return "it's a system name rather than a method"
	default:
		return "unknown"
	}
}

func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor {
	methods := make([]*MethodStatus, methodLast)
	for i := methodBalanceGet; i < methodLast; i++ {
		methods[i] = &MethodStatus{name: i.String()}
	}

	healthy := new(atomic.Uint32)
	healthy.Store(statusHealthy)

	return clientStatusMonitor{
		logger:         logger,
		addr:           addr,
		healthy:        healthy,
		errorThreshold: errorThreshold,
		methods:        methods,
	}
}

// clientWrapper is used by default, alternative implementations are intended for testing purposes only.
type clientWrapper struct {
	clientMutex sync.RWMutex
	client      *sdkClient.Client
	dialed      bool
	prm         wrapperPrm

	clientStatusMonitor
}

// wrapperPrm is params to create clientWrapper.
type wrapperPrm struct {
	logger                  *zap.Logger
	address                 string
	key                     ecdsa.PrivateKey
	dialTimeout             time.Duration
	streamTimeout           time.Duration
	errorThreshold          uint32
	responseInfoCallback    func(sdkClient.ResponseMetaInfo) error
	poolRequestInfoCallback func(RequestInfo)
	dialOptions             []grpc.DialOption

	gracefulCloseOnSwitchTimeout time.Duration
}

// setAddress sets the endpoint to connect to in the FrostFS network.
func (x *wrapperPrm) setAddress(address string) {
	x.address = address
}

// setKey sets the sdkClient.Client private key to be used for the protocol communication by default.
func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) {
	x.key = key
}

// setLogger sets sdkClient.Client logger.
func (x *wrapperPrm) setLogger(logger *zap.Logger) {
	x.logger = logger
}

// setDialTimeout sets the timeout for the connection to be established.
func (x *wrapperPrm) setDialTimeout(timeout time.Duration) {
	x.dialTimeout = timeout
}

// setStreamTimeout sets the timeout for individual operations in streaming RPC.
func (x *wrapperPrm) setStreamTimeout(timeout time.Duration) {
	x.streamTimeout = timeout
}

// setErrorThreshold sets the threshold after reaching which the connection is considered
// unhealthy until the Pool.startRebalance routine updates its status.
func (x *wrapperPrm) setErrorThreshold(threshold uint32) {
	x.errorThreshold = threshold
}

// setGracefulCloseOnSwitchTimeout specifies the timeout after which an unhealthy client is closed
// during rebalancing if it becomes healthy again.
//
// See also setErrorThreshold.
func (x *wrapperPrm) setGracefulCloseOnSwitchTimeout(timeout time.Duration) {
	x.gracefulCloseOnSwitchTimeout = timeout
}
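
// Illustrative sketch (added comment, not part of the package API): the setters above are
// typically combined to fill a wrapperPrm, which is then passed to newWrapper and dialed
// (both defined below). The endpoint, key and numeric values here are placeholders.
//
//	var prm wrapperPrm
//	prm.setAddress("node.example:8080") // hypothetical endpoint
//	prm.setKey(key)                     // ecdsa.PrivateKey assumed to be in scope
//	prm.setDialTimeout(5 * time.Second)
//	prm.setErrorThreshold(100)
//
//	cw := newWrapper(prm)
//	if err := cw.dial(ctx); err != nil {
//		// the connection could not be established; cw SHOULD NOT be used
//	}
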
// setPoolRequestCallback sets the callback that will be invoked after every pool response.
func (x *wrapperPrm) setPoolRequestCallback(f func(RequestInfo)) {
	x.poolRequestInfoCallback = f
}

// setResponseInfoCallback sets the callback that will be invoked after every response.
func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) error) {
	x.responseInfoCallback = f
}

// setGRPCDialOptions sets the gRPC dial options for a new gRPC client connection.
func (x *wrapperPrm) setGRPCDialOptions(opts []grpc.DialOption) {
	x.dialOptions = opts
}

// newWrapper creates a clientWrapper that implements the client interface.
func newWrapper(prm wrapperPrm) *clientWrapper {
	var cl sdkClient.Client
	prmInit := sdkClient.PrmInit{
		Key:                  prm.key,
		ResponseInfoCallback: prm.responseInfoCallback,
	}

	cl.Init(prmInit)

	res := &clientWrapper{
		client:              &cl,
		clientStatusMonitor: newClientStatusMonitor(prm.logger, prm.address, prm.errorThreshold),
		prm:                 prm,
	}

	return res
}

// dial establishes a connection to the server from the FrostFS network.
// Returns an error describing failure reason. If failed, the client
// SHOULD NOT be used.
func (c *clientWrapper) dial(ctx context.Context) error {
	cl, err := c.getClient()
	if err != nil {
		return err
	}

	prmDial := sdkClient.PrmDial{
		Endpoint:        c.prm.address,
		DialTimeout:     c.prm.dialTimeout,
		StreamTimeout:   c.prm.streamTimeout,
		GRPCDialOptions: c.prm.dialOptions,
	}

	err = cl.Dial(ctx, prmDial)
	c.setDialed(err == nil)
	if err != nil {
		return err
	}

	return nil
}

// restart recreates and redials the inner SDK client.
func (c *clientWrapper) restart(ctx context.Context) error {
	var cl sdkClient.Client
	prmInit := sdkClient.PrmInit{
		Key:                  c.prm.key,
		ResponseInfoCallback: c.prm.responseInfoCallback,
	}

	cl.Init(prmInit)

	prmDial := sdkClient.PrmDial{
		Endpoint:        c.prm.address,
		DialTimeout:     c.prm.dialTimeout,
		StreamTimeout:   c.prm.streamTimeout,
		GRPCDialOptions: c.prm.dialOptions,
	}

	// if the connection was dialed before, the pool has to close it and then
	// initialize it once again to avoid a routine / connection leak.
	if c.isDialed() {
		c.scheduleGracefulClose()
	}

	err := cl.Dial(ctx, prmDial)
	c.setDialed(err == nil)
	if err != nil {
		return err
	}

	c.clientMutex.Lock()
	c.client = &cl
	c.clientMutex.Unlock()

	return nil
}

func (c *clientWrapper) isDialed() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.dialed
}

func (c *clientWrapper) setDialed(dialed bool) {
	c.mu.Lock()
	c.dialed = dialed
	c.mu.Unlock()
}

func (c *clientWrapper) getClient() (*sdkClient.Client, error) {
	c.clientMutex.RLock()
	defer c.clientMutex.RUnlock()
	if c.isHealthy() {
		return c.client, nil
	}
	return nil, errPoolClientUnhealthy
}

func (c *clientWrapper) getClientRaw() *sdkClient.Client {
	c.clientMutex.RLock()
	defer c.clientMutex.RUnlock()
	return c.client
}

// balanceGet invokes sdkClient.BalanceGet, parses the response status to an error and returns the result as is.
func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
	cl, err := c.getClient()
	if err != nil {
		return accounting.Decimal{}, err
	}

	cliPrm := sdkClient.PrmBalanceGet{
		Account: prm.account,
	}

	start := time.Now()
	res, err := cl.BalanceGet(ctx, cliPrm)
	c.incRequests(time.Since(start), methodBalanceGet)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return accounting.Decimal{}, fmt.Errorf("balance get on client: %w", err)
	}

	return res.Amount(), nil
}
// containerPut invokes sdkClient.ContainerPut, parses the response status to an error and returns the result as is.
// It also waits for the container to appear on the network.
func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
	cl, err := c.getClient()
	if err != nil {
		return cid.ID{}, err
	}

	start := time.Now()
	res, err := cl.ContainerPut(ctx, prm.ClientParams)
	c.incRequests(time.Since(start), methodContainerPut)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return cid.ID{}, fmt.Errorf("container put on client: %w", err)
	}

	if prm.WaitParams == nil {
		prm.WaitParams = defaultWaitParams()
	}
	if err = prm.WaitParams.CheckValidity(); err != nil {
		return cid.ID{}, fmt.Errorf("invalid wait parameters: %w", err)
	}

	idCnr := res.ID()

	getPrm := PrmContainerGet{
		ContainerID: idCnr,
		Session:     prm.ClientParams.Session,
	}

	err = waitForContainerPresence(ctx, c, getPrm, prm.WaitParams)
	if err = c.handleError(ctx, nil, err); err != nil {
		return cid.ID{}, fmt.Errorf("wait container presence on client: %w", err)
	}

	return idCnr, nil
}

// containerGet invokes sdkClient.ContainerGet, parses the response status to an error and returns the result as is.
func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
	cl, err := c.getClient()
	if err != nil {
		return container.Container{}, err
	}

	cliPrm := sdkClient.PrmContainerGet{
		ContainerID: &prm.ContainerID,
		Session:     prm.Session,
	}

	start := time.Now()
	res, err := cl.ContainerGet(ctx, cliPrm)
	c.incRequests(time.Since(start), methodContainerGet)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return container.Container{}, fmt.Errorf("container get on client: %w", err)
	}

	return res.Container(), nil
}

// containerList invokes sdkClient.ContainerList, parses the response status to an error and returns the result as is.
func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
	cl, err := c.getClient()
	if err != nil {
		return nil, err
	}

	cliPrm := sdkClient.PrmContainerList{
		OwnerID: prm.OwnerID,
		Session: prm.Session,
	}

	start := time.Now()
	res, err := cl.ContainerList(ctx, cliPrm)
	c.incRequests(time.Since(start), methodContainerList)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return nil, fmt.Errorf("container list on client: %w", err)
	}

	return res.Containers(), nil
}

func (c *clientStatusMonitor) incRequestCount() bool {
	return c.status.TryRLock()
}

func (c *clientStatusMonitor) decRequestCount() {
	c.status.RUnlock()
}

func (c *clientStatusMonitor) markClientClosed() {
	c.status.Lock()
}

// PrmListStream groups parameters of ListContainersStream operation.
type PrmListStream struct {
	OwnerID user.ID
	Session *session.Container
}

// ResListStream is designed to read a list of container identifiers from the FrostFS system.
//
// Must be initialized using Pool.ListContainersStream, any other usage is unsafe.
type ResListStream struct {
	r                   *sdkClient.ContainerListReader
	elapsedTimeCallback func(time.Duration)
	handleError         func(context.Context, apistatus.Status, error) error
}

// Read reads another list of the container identifiers.
func (x *ResListStream) Read(buf []cid.ID) (int, error) {
	start := time.Now()
	n, ok := x.r.Read(buf)
	x.elapsedTimeCallback(time.Since(start))
	if !ok {
		res, err := x.r.Close()
		if err == nil {
			return n, io.EOF
		}

		var status apistatus.Status
		if res != nil {
			status = res.Status()
		}
		err = x.handleError(nil, status, err)

		return n, err
	}

	return n, nil
}
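
// A usage sketch (illustrative; "stream" is a hypothetical ResListStream variable): Read fills
// the provided buffer until the listing is exhausted, signalling the end with io.EOF, so callers
// can loop like this:
//
//	buf := make([]cid.ID, 32)
//	for {
//		n, err := stream.Read(buf)
//		_ = buf[:n] // process the identifiers read so far
//		if errors.Is(err, io.EOF) {
//			break
//		}
//		if err != nil {
//			// handle listing error
//		}
//	}
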
// Iterate iterates over the list of found container identifiers.
// f can return true to stop iteration earlier.
//
// Returns an error if the containers can't be read.
func (x *ResListStream) Iterate(f func(cid.ID) bool) error {
	start := time.Now()
	err := x.r.Iterate(func(id cid.ID) bool {
		x.elapsedTimeCallback(time.Since(start))
		stop := f(id)
		start = time.Now()
		return stop
	})
	return err
}

// Close ends reading the list of the matched containers. Must be called after using the ResListStream.
func (x *ResListStream) Close() {
	_, _ = x.r.Close()
}

// containerListStream invokes sdkClient.ContainerListInit, parses the response status to an error and returns the result as is.
func (c *clientWrapper) containerListStream(ctx context.Context, prm PrmListStream) (ResListStream, error) {
	cl, err := c.getClient()
	if err != nil {
		return ResListStream{}, err
	}

	cliPrm := sdkClient.PrmContainerListStream{
		OwnerID: prm.OwnerID,
		Session: prm.Session,
	}

	start := time.Now()
	cnrRdr, err := cl.ContainerListInit(ctx, cliPrm)
	c.incRequests(time.Since(start), methodContainerListStream)
	if err = c.handleError(ctx, nil, err); err != nil {
		return ResListStream{}, fmt.Errorf("init container listing on client: %w", err)
	}

	return ResListStream{
		r: cnrRdr,
		elapsedTimeCallback: func(elapsed time.Duration) {
			c.incRequests(elapsed, methodContainerListStream)
		},
		handleError: c.handleError,
	}, nil
}

// containerDelete invokes sdkClient.ContainerDelete and parses the response status to an error.
// It also waits for the container to be removed from the network.
func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDelete) error {
	cl, err := c.getClient()
	if err != nil {
		return err
	}

	cliPrm := sdkClient.PrmContainerDelete{
		ContainerID: &prm.ContainerID,
		Session:     prm.Session,
	}

	start := time.Now()
	res, err := cl.ContainerDelete(ctx, cliPrm)
	c.incRequests(time.Since(start), methodContainerDelete)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return fmt.Errorf("container delete on client: %w", err)
	}

	if prm.WaitParams == nil {
		prm.WaitParams = defaultWaitParams()
	}
	if err := prm.WaitParams.CheckValidity(); err != nil {
		return fmt.Errorf("invalid wait parameters: %w", err)
	}

	getPrm := PrmContainerGet{
		ContainerID: prm.ContainerID,
		Session:     prm.Session,
	}

	return waitForContainerRemoved(ctx, c, getPrm, prm.WaitParams)
}

// apeManagerAddChain invokes sdkClient.APEManagerAddChain and parses the response status to an error.
func (c *clientWrapper) apeManagerAddChain(ctx context.Context, prm PrmAddAPEChain) error {
	cl, err := c.getClient()
	if err != nil {
		return err
	}

	cliPrm := sdkClient.PrmAPEManagerAddChain{
		ChainTarget: prm.Target,
		Chain:       prm.Chain,
	}

	start := time.Now()
	res, err := cl.APEManagerAddChain(ctx, cliPrm)
	c.incRequests(time.Since(start), methodAPEManagerAddChain)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return fmt.Errorf("add chain error: %w", err)
	}

	return nil
}
// apeManagerRemoveChain invokes sdkClient.APEManagerRemoveChain and parses the response status to an error.
func (c *clientWrapper) apeManagerRemoveChain(ctx context.Context, prm PrmRemoveAPEChain) error {
	cl, err := c.getClient()
	if err != nil {
		return err
	}

	cliPrm := sdkClient.PrmAPEManagerRemoveChain{
		ChainTarget: prm.Target,
		ChainID:     prm.ChainID,
	}

	start := time.Now()
	res, err := cl.APEManagerRemoveChain(ctx, cliPrm)
	c.incRequests(time.Since(start), methodAPEManagerRemoveChain)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return fmt.Errorf("remove chain error: %w", err)
	}

	return nil
}

// apeManagerListChains invokes sdkClient.APEManagerListChains. Returns chains and parses the response status to an error.
func (c *clientWrapper) apeManagerListChains(ctx context.Context, prm PrmListAPEChains) ([]ape.Chain, error) {
	cl, err := c.getClient()
	if err != nil {
		return nil, err
	}

	cliPrm := sdkClient.PrmAPEManagerListChains{
		ChainTarget: prm.Target,
	}

	start := time.Now()
	res, err := cl.APEManagerListChains(ctx, cliPrm)
	c.incRequests(time.Since(start), methodAPEManagerListChains)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return nil, fmt.Errorf("list chains error: %w", err)
	}

	return res.Chains, nil
}

// endpointInfo invokes sdkClient.EndpointInfo, parses the response status to an error and returns the result as is.
func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (netmap.NodeInfo, error) {
	cl, err := c.getClient()
	if err != nil {
		return netmap.NodeInfo{}, err
	}

	return c.endpointInfoRaw(ctx, cl)
}

func (c *clientWrapper) healthcheck(ctx context.Context) (netmap.NodeInfo, error) {
	cl := c.getClientRaw()
	return c.endpointInfoRaw(ctx, cl)
}

func (c *clientWrapper) endpointInfoRaw(ctx context.Context, cl *sdkClient.Client) (netmap.NodeInfo, error) {
	start := time.Now()
	res, err := cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{})
	c.incRequests(time.Since(start), methodEndpointInfo)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return netmap.NodeInfo{}, fmt.Errorf("endpoint info on client: %w", err)
	}

	return res.NodeInfo(), nil
}

// networkInfo invokes sdkClient.NetworkInfo, parses the response status to an error and returns the result as is.
func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (netmap.NetworkInfo, error) {
	cl, err := c.getClient()
	if err != nil {
		return netmap.NetworkInfo{}, err
	}

	start := time.Now()
	res, err := cl.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{})
	c.incRequests(time.Since(start), methodNetworkInfo)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return netmap.NetworkInfo{}, fmt.Errorf("network info on client: %w", err)
	}

	return res.Info(), nil
}

// netMapSnapshot invokes sdkClient.NetMapSnapshot, parses the response status to an error and returns the result as is.
func (c *clientWrapper) netMapSnapshot(ctx context.Context, _ prmNetMapSnapshot) (netmap.NetMap, error) {
	cl, err := c.getClient()
	if err != nil {
		return netmap.NetMap{}, err
	}

	start := time.Now()
	res, err := cl.NetMapSnapshot(ctx, sdkClient.PrmNetMapSnapshot{})
	c.incRequests(time.Since(start), methodNetMapSnapshot)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return netmap.NetMap{}, fmt.Errorf("network map snapshot on client: %w", err)
	}

	return res.NetMap(), nil
}
// objectPatch patches an object in FrostFS.
func (c *clientWrapper) objectPatch(ctx context.Context, prm PrmObjectPatch) (ResPatchObject, error) {
	cl, err := c.getClient()
	if err != nil {
		return ResPatchObject{}, err
	}

	start := time.Now()
	pObj, err := cl.ObjectPatchInit(ctx, sdkClient.PrmObjectPatch{
		Address:        prm.addr,
		Session:        prm.stoken,
		Key:            prm.key,
		BearerToken:    prm.btoken,
		MaxChunkLength: prm.maxPayloadPatchChunkLength,
	})
	if err = c.handleError(ctx, nil, err); err != nil {
		return ResPatchObject{}, fmt.Errorf("init patching on API client: %w", err)
	}
	c.incRequests(time.Since(start), methodObjectPatch)

	start = time.Now()
	attrPatchSuccess := pObj.PatchAttributes(ctx, prm.newAttrs, prm.replaceAttrs)
	c.incRequests(time.Since(start), methodObjectPatch)

	if attrPatchSuccess {
		start = time.Now()
		_ = pObj.PatchPayload(ctx, prm.rng, prm.payload)
		c.incRequests(time.Since(start), methodObjectPatch)
	}

	res, err := pObj.Close(ctx)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return ResPatchObject{}, fmt.Errorf("client failure: %w", err)
	}

	return ResPatchObject{ObjectID: res.ObjectID()}, nil
}

// objectPut writes an object to FrostFS.
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
	if prm.bufferMaxSize == 0 {
		prm.bufferMaxSize = defaultBufferMaxSizeForPut
	}

	if prm.clientCut {
		return c.objectPutClientCut(ctx, prm)
	}

	return c.objectPutServerCut(ctx, prm)
}

func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
	cl, err := c.getClient()
	if err != nil {
		return ResPutObject{}, err
	}

	cliPrm := sdkClient.PrmObjectPutInit{
		CopiesNumber: prm.copiesNumber,
		Session:      prm.stoken,
		Key:          prm.key,
		BearerToken:  prm.btoken,
	}

	start := time.Now()
	wObj, err := cl.ObjectPutInit(ctx, cliPrm)
	c.incRequests(time.Since(start), methodObjectPut)
	if err = c.handleError(ctx, nil, err); err != nil {
		return ResPutObject{}, fmt.Errorf("init writing on API client: %w", err)
	}

	if wObj.WriteHeader(ctx, prm.hdr) {
		sz := prm.hdr.PayloadSize()

		if data := prm.hdr.Payload(); len(data) > 0 {
			if prm.payload != nil {
				prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
			} else {
				prm.payload = bytes.NewReader(data)
				sz = uint64(len(data))
			}
		}

		if prm.payload != nil {
			if sz == 0 || sz > prm.bufferMaxSize {
				sz = prm.bufferMaxSize
			}

			buf := make([]byte, sz)

			var n int
			for {
				n, err = prm.payload.Read(buf)
				if n > 0 {
					start = time.Now()
					successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
					c.incRequests(time.Since(start), methodObjectPut)
					if !successWrite {
						break
					}

					continue
				}

				if errors.Is(err, io.EOF) {
					break
				}

				return ResPutObject{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
			}
		}
	}

	res, err := wObj.Close(ctx)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		// here err already carries both status and client errors
		return ResPutObject{}, fmt.Errorf("client failure: %w", err)
	}

	return ResPutObject{
		ObjectID: res.StoredObjectID(),
		Epoch:    res.StoredEpoch(),
	}, nil
}

func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
	putInitPrm := PrmObjectPutClientCutInit{
		PrmObjectPut: prm,
	}

	start := time.Now()
	wObj, err := c.objectPutInitTransformer(putInitPrm)
	c.incRequests(time.Since(start), methodObjectPut)
	if err = c.handleError(ctx, nil, err); err != nil {
		return ResPutObject{}, fmt.Errorf("init writing on API client: %w", err)
	}

	if wObj.WriteHeader(ctx, prm.hdr) {
		sz := prm.hdr.PayloadSize()

		if data := prm.hdr.Payload(); len(data) > 0 {
			if prm.payload != nil {
				prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
			} else {
				prm.payload = bytes.NewReader(data)
				sz = uint64(len(data))
			}
		}

		if prm.payload != nil {
			if sz == 0 || sz > prm.bufferMaxSize {
				sz = prm.bufferMaxSize
			}

			buf := make([]byte, sz)

			var n int
			for {
				n, err = prm.payload.Read(buf)
				if n > 0 {
					start = time.Now()
					successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
					c.incRequests(time.Since(start), methodObjectPut)
					if !successWrite {
						break
					}

					continue
				}

				if errors.Is(err, io.EOF) {
					break
				}

				return ResPutObject{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
			}
		}
	}

	res, err := wObj.Close(ctx)
	var st apistatus.Status
	if res != nil {
		st = res.Status
	}
	if err = c.handleError(ctx, st, err); err != nil {
		// here err already carries both status and client errors
		return ResPutObject{}, fmt.Errorf("client failure: %w", err)
	}

	return ResPutObject{
		ObjectID: res.OID,
		Epoch:    res.Epoch,
	}, nil
}

// objectDelete invokes sdkClient.ObjectDelete and parses the response status to an error.
func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) error {
	cl, err := c.getClient()
	if err != nil {
		return err
	}

	cnr := prm.addr.Container()
	obj := prm.addr.Object()

	cliPrm := sdkClient.PrmObjectDelete{
		BearerToken: prm.btoken,
		Session:     prm.stoken,
		ContainerID: &cnr,
		ObjectID:    &obj,
		Key:         prm.key,
	}

	start := time.Now()
	res, err := cl.ObjectDelete(ctx, cliPrm)
	c.incRequests(time.Since(start), methodObjectDelete)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return fmt.Errorf("delete object on client: %w", err)
	}
	return nil
}

// objectGet returns a reader for the object.
func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
	cl, err := c.getClient()
	if err != nil {
		return ResGetObject{}, err
	}

	prmCnr := prm.addr.Container()
	prmObj := prm.addr.Object()

	cliPrm := sdkClient.PrmObjectGet{
		BearerToken: prm.btoken,
		Session:     prm.stoken,
		ContainerID: &prmCnr,
		ObjectID:    &prmObj,
		Key:         prm.key,
	}

	var res ResGetObject

	rObj, err := cl.ObjectGetInit(ctx, cliPrm)
	if err = c.handleError(ctx, nil, err); err != nil {
		return ResGetObject{}, fmt.Errorf("init object reading on client: %w", err)
	}

	start := time.Now()
	successReadHeader := rObj.ReadHeader(&res.Header)
	c.incRequests(time.Since(start), methodObjectGet)
	if !successReadHeader {
		rObjRes, err := rObj.Close()
		var st apistatus.Status
		if rObjRes != nil {
			st = rObjRes.Status()
		}
		err = c.handleError(ctx, st, err)
		return res, fmt.Errorf("read header: %w", err)
	}

	res.Payload = &objectReadCloser{
		reader: rObj,
		elapsedTimeCallback: func(elapsed time.Duration) {
			c.incRequests(elapsed, methodObjectGet)
		},
	}

	return res, nil
}
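
// A usage sketch (illustrative): the header of the result returned by objectGet is already read,
// while the payload is streamed through res.Payload and must be closed by the caller:
//
//	res, err := c.objectGet(ctx, prm)
//	if err != nil {
//		// handle error
//	}
//	defer res.Payload.Close()
//	_ = res.Header                          // object header is fully populated here
//	payload, err := io.ReadAll(res.Payload) // read the payload stream
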
// objectHead invokes sdkClient.ObjectHead, parses the response status to an error and returns the result as is.
func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
	cl, err := c.getClient()
	if err != nil {
		return object.Object{}, err
	}

	prmCnr := prm.addr.Container()
	prmObj := prm.addr.Object()

	cliPrm := sdkClient.PrmObjectHead{
		BearerToken: prm.btoken,
		Session:     prm.stoken,
		Raw:         prm.raw,
		ContainerID: &prmCnr,
		ObjectID:    &prmObj,
		Key:         prm.key,
	}

	var obj object.Object

	start := time.Now()
	res, err := cl.ObjectHead(ctx, cliPrm)
	c.incRequests(time.Since(start), methodObjectHead)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return obj, fmt.Errorf("read object header via client: %w", err)
	}
	if !res.ReadHeader(&obj) {
		return obj, errors.New("missing object header in response")
	}

	return obj, nil
}

// objectRange returns an object range reader.
func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
	cl, err := c.getClient()
	if err != nil {
		return ResObjectRange{}, err
	}

	prmCnr := prm.addr.Container()
	prmObj := prm.addr.Object()

	cliPrm := sdkClient.PrmObjectRange{
		BearerToken: prm.btoken,
		Session:     prm.stoken,
		ContainerID: &prmCnr,
		ObjectID:    &prmObj,
		Offset:      prm.off,
		Length:      prm.ln,
		Key:         prm.key,
	}

	start := time.Now()
	res, err := cl.ObjectRangeInit(ctx, cliPrm)
	c.incRequests(time.Since(start), methodObjectRange)
	if err = c.handleError(ctx, nil, err); err != nil {
		return ResObjectRange{}, fmt.Errorf("init payload range reading on client: %w", err)
	}

	return ResObjectRange{
		payload: res,
		elapsedTimeCallback: func(elapsed time.Duration) {
			c.incRequests(elapsed, methodObjectRange)
		},
	}, nil
}

// objectSearch invokes sdkClient.ObjectSearchInit, parses the response status to an error and returns the result as is.
func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
	cl, err := c.getClient()
	if err != nil {
		return ResObjectSearch{}, err
	}

	cliPrm := sdkClient.PrmObjectSearch{
		ContainerID: &prm.cnrID,
		Filters:     prm.filters,
		Session:     prm.stoken,
		BearerToken: prm.btoken,
		Key:         prm.key,
	}

	res, err := cl.ObjectSearchInit(ctx, cliPrm)
	if err = c.handleError(ctx, nil, err); err != nil {
		return ResObjectSearch{}, fmt.Errorf("init object searching on client: %w", err)
	}

	return ResObjectSearch{r: res, handleError: c.handleError}, nil
}
// sessionCreate invokes sdkClient.SessionCreate, parses the response status to an error and returns the result as is.
func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession) (resCreateSession, error) {
	cl, err := c.getClient()
	if err != nil {
		return resCreateSession{}, err
	}

	cliPrm := sdkClient.PrmSessionCreate{
		Expiration: prm.exp,
		Key:        &prm.key,
	}

	start := time.Now()
	res, err := cl.SessionCreate(ctx, cliPrm)
	c.incRequests(time.Since(start), methodSessionCreate)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return resCreateSession{}, fmt.Errorf("session creation on client: %w", err)
	}

	return resCreateSession{
		id:         res.ID(),
		sessionKey: res.PublicKey(),
	}, nil
}

func (c *clientStatusMonitor) isHealthy() bool {
	return c.healthy.Load() == statusHealthy
}

func (c *clientStatusMonitor) setHealthy() {
	c.healthy.Store(statusHealthy)
}

func (c *clientStatusMonitor) setUnhealthy() {
	c.healthy.Store(statusUnhealthyOnRequest)
}

func (c *clientStatusMonitor) address() string {
	return c.addr
}

func (c *clientStatusMonitor) incErrorRate() {
	c.mu.Lock()
	c.currentErrorCount++
	c.overallErrorCount++

	thresholdReached := c.currentErrorCount >= c.errorThreshold

	if thresholdReached {
		c.setUnhealthy()
		c.currentErrorCount = 0
	}

	c.mu.Unlock()

	if thresholdReached {
		c.log(zapcore.WarnLevel, "error threshold reached",
			zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold))
	}
}

func (c *clientStatusMonitor) incErrorRateToUnhealthy(err error) {
	c.mu.Lock()
	c.currentErrorCount = 0
	c.overallErrorCount++
	c.setUnhealthy()
	c.mu.Unlock()

	c.log(zapcore.WarnLevel, "explicitly mark node unhealthy", zap.String("address", c.addr), zap.Error(err))
}

func (c *clientStatusMonitor) log(level zapcore.Level, msg string, fields ...zap.Field) {
	if c.logger == nil {
		return
	}

	c.logger.Log(level, msg, fields...)
}
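
// Illustrative note (added): every counted failure goes through incErrorRate or
// incErrorRateToUnhealthy above. Once currentErrorCount reaches errorThreshold, the monitor
// switches to statusUnhealthyOnRequest and resets the counter; the pool's rebalance routine
// (see setErrorThreshold) is then expected to restore the client, e.g. via restart/setHealthy.
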
func (c *clientStatusMonitor) currentErrorRate() uint32 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentErrorCount
}

func (c *clientStatusMonitor) overallErrorRate() uint64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.overallErrorCount
}

func (c *clientStatusMonitor) methodsStatus() []StatusSnapshot {
	result := make([]StatusSnapshot, len(c.methods))
	for i, val := range c.methods {
		result[i] = val.Snapshot()
	}

	return result
}

func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
	methodStat := c.methods[method]
	methodStat.IncRequests(elapsed)
	if c.prm.poolRequestInfoCallback != nil {
		c.prm.poolRequestInfoCallback(RequestInfo{
			Address: c.prm.address,
			Method:  method,
			Elapsed: elapsed,
		})
	}
}

func (c *clientWrapper) close() error {
	if !c.isDialed() {
		return nil
	}
	if cl := c.getClientRaw(); cl != nil {
		return cl.Close()
	}
	return nil
}

func (c *clientWrapper) scheduleGracefulClose() {
	cl := c.getClientRaw()
	if cl == nil {
		return
	}

	time.AfterFunc(c.prm.gracefulCloseOnSwitchTimeout, func() {
		if err := cl.Close(); err != nil {
			c.log(zap.DebugLevel, "close unhealthy client during rebalance",
				zap.String("address", c.address()), zap.Error(err))
		}
	})
}

func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error {
	if stErr := apistatus.ErrFromStatus(st); stErr != nil {
		switch stErr.(type) {
		case *apistatus.ServerInternal,
			*apistatus.WrongMagicNumber,
			*apistatus.SignatureVerification:
			c.incErrorRate()
		case *apistatus.NodeUnderMaintenance:
			c.incErrorRateToUnhealthy(stErr)
		}

		if err == nil {
			err = stErr
		}

		return err
	}

	if err != nil {
		if needCountError(ctx, err) {
			if sdkClient.IsErrNodeUnderMaintenance(err) {
				c.incErrorRateToUnhealthy(err)
			} else {
				c.incErrorRate()
			}
		}

		return err
	}

	return nil
}

func needCountError(ctx context.Context, err error) bool {
	// non-status logic error that could be returned
	// from the SDK client; should not be considered
	// as a connection error
	var siErr *object.SplitInfoError
	if errors.As(err, &siErr) {
		return false
	}
	var eiErr *object.ECInfoError
	if errors.As(err, &eiErr) {
		return false
	}

	if ctx != nil && errors.Is(ctx.Err(), context.Canceled) {
		return false
	}

	return true
}

// clientBuilder is a type alias of client constructors which open connection
// to the given endpoint.
type clientBuilder = func(endpoint string) client

// RequestInfo groups info about pool request.
type RequestInfo struct {
	Address string
	Method  MethodIndex
	Elapsed time.Duration
}
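
// Illustrative sketch (assumption, not part of this file): a clientBuilder the pool could use to
// construct the default clientWrapper for an endpoint; key and the timeout/threshold values are
// placeholders supplied by the surrounding pool configuration.
//
//	builder := clientBuilder(func(endpoint string) client {
//		var prm wrapperPrm
//		prm.setAddress(endpoint)
//		prm.setKey(key) // ecdsa.PrivateKey assumed to be in scope
//		prm.setDialTimeout(5 * time.Second)
//		prm.setErrorThreshold(100)
//		return newWrapper(prm)
//	})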