diff --git a/pool/client.go b/pool/client.go new file mode 100644 index 0000000..f1abbf2 --- /dev/null +++ b/pool/client.go @@ -0,0 +1,1283 @@ +package pool + +import ( + "bytes" + "context" + "crypto/ecdsa" + "errors" + "fmt" + "io" + "sync" + "sync/atomic" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ape" + sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "google.golang.org/grpc" +) + +// errPoolClientUnhealthy is an error to indicate that client in pool is unhealthy. +var errPoolClientUnhealthy = errors.New("pool client unhealthy") + +// clientStatusMonitor count error rate and other statistics for connection. +type clientStatusMonitor struct { + logger *zap.Logger + addr string + healthy *atomic.Uint32 + errorThreshold uint32 + + mu sync.RWMutex // protect counters + currentErrorCount uint32 + overallErrorCount uint64 + methods []*MethodStatus +} + +// values for healthy status of clientStatusMonitor. +const ( + // statusUnhealthyOnRequest is set when communication after dialing to the + // endpoint is failed due to immediate or accumulated errors, connection is + // available and pool should close it before re-establishing connection once again. + statusUnhealthyOnRequest = iota + + // statusHealthy is set when connection is ready to be used by the pool. + statusHealthy +) + +// MethodIndex index of method in list of statuses in clientStatusMonitor. +type MethodIndex int + +const ( + methodBalanceGet MethodIndex = iota + methodContainerPut + methodContainerGet + methodContainerList + methodContainerListStream + methodContainerDelete + methodEndpointInfo + methodNetworkInfo + methodNetMapSnapshot + methodObjectPut + methodObjectDelete + methodObjectGet + methodObjectHead + methodObjectRange + methodObjectPatch + methodSessionCreate + methodAPEManagerAddChain + methodAPEManagerRemoveChain + methodAPEManagerListChains + methodLast +) + +// String implements fmt.Stringer. +func (m MethodIndex) String() string { + switch m { + case methodBalanceGet: + return "balanceGet" + case methodContainerPut: + return "containerPut" + case methodContainerGet: + return "containerGet" + case methodContainerList: + return "containerList" + case methodContainerDelete: + return "containerDelete" + case methodEndpointInfo: + return "endpointInfo" + case methodNetworkInfo: + return "networkInfo" + case methodNetMapSnapshot: + return "netMapSnapshot" + case methodObjectPut: + return "objectPut" + case methodObjectPatch: + return "objectPatch" + case methodObjectDelete: + return "objectDelete" + case methodObjectGet: + return "objectGet" + case methodObjectHead: + return "objectHead" + case methodObjectRange: + return "objectRange" + case methodSessionCreate: + return "sessionCreate" + case methodAPEManagerAddChain: + return "apeManagerAddChain" + case methodAPEManagerRemoveChain: + return "apeManagerRemoveChain" + case methodAPEManagerListChains: + return "apeManagerListChains" + case methodLast: + return "it's a system name rather than a method" + default: + return "unknown" + } +} + +func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor { + methods := make([]*MethodStatus, methodLast) + for i := methodBalanceGet; i < methodLast; i++ { + methods[i] = &MethodStatus{name: i.String()} + } + + healthy := new(atomic.Uint32) + healthy.Store(statusHealthy) + + return clientStatusMonitor{ + logger: logger, + addr: addr, + healthy: healthy, + errorThreshold: errorThreshold, + methods: methods, + } +} + +// clientWrapper is used by default, alternative implementations are intended for testing purposes only. +type clientWrapper struct { + clientMutex sync.RWMutex + client *sdkClient.Client + dialed bool + prm wrapperPrm + + clientStatusMonitor +} + +// wrapperPrm is params to create clientWrapper. +type wrapperPrm struct { + logger *zap.Logger + address string + key ecdsa.PrivateKey + dialTimeout time.Duration + streamTimeout time.Duration + errorThreshold uint32 + responseInfoCallback func(sdkClient.ResponseMetaInfo) error + poolRequestInfoCallback func(RequestInfo) + dialOptions []grpc.DialOption + + gracefulCloseOnSwitchTimeout time.Duration +} + +// setAddress sets endpoint to connect in FrostFS network. +func (x *wrapperPrm) setAddress(address string) { + x.address = address +} + +// setKey sets sdkClient.Client private key to be used for the protocol communication by default. +func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) { + x.key = key +} + +// setLogger sets sdkClient.Client logger. +func (x *wrapperPrm) setLogger(logger *zap.Logger) { + x.logger = logger +} + +// setDialTimeout sets the timeout for connection to be established. +func (x *wrapperPrm) setDialTimeout(timeout time.Duration) { + x.dialTimeout = timeout +} + +// setStreamTimeout sets the timeout for individual operations in streaming RPC. +func (x *wrapperPrm) setStreamTimeout(timeout time.Duration) { + x.streamTimeout = timeout +} + +// setErrorThreshold sets threshold after reaching which connection is considered unhealthy +// until Pool.startRebalance routing updates its status. +func (x *wrapperPrm) setErrorThreshold(threshold uint32) { + x.errorThreshold = threshold +} + +// setGracefulCloseOnSwitchTimeout specifies the timeout after which unhealthy client be closed during rebalancing +// if it will become healthy back. +// +// See also setErrorThreshold. +func (x *wrapperPrm) setGracefulCloseOnSwitchTimeout(timeout time.Duration) { + x.gracefulCloseOnSwitchTimeout = timeout +} + +// setPoolRequestCallback sets callback that will be invoked after every pool response. +func (x *wrapperPrm) setPoolRequestCallback(f func(RequestInfo)) { + x.poolRequestInfoCallback = f +} + +// setResponseInfoCallback sets callback that will be invoked after every response. +func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) error) { + x.responseInfoCallback = f +} + +// setGRPCDialOptions sets the gRPC dial options for new gRPC client connection. +func (x *wrapperPrm) setGRPCDialOptions(opts []grpc.DialOption) { + x.dialOptions = opts +} + +// newWrapper creates a clientWrapper that implements the client interface. +func newWrapper(prm wrapperPrm) *clientWrapper { + var cl sdkClient.Client + prmInit := sdkClient.PrmInit{ + Key: prm.key, + ResponseInfoCallback: prm.responseInfoCallback, + } + + cl.Init(prmInit) + + res := &clientWrapper{ + client: &cl, + clientStatusMonitor: newClientStatusMonitor(prm.logger, prm.address, prm.errorThreshold), + prm: prm, + } + + return res +} + +// dial establishes a connection to the server from the FrostFS network. +// Returns an error describing failure reason. If failed, the client +// SHOULD NOT be used. +func (c *clientWrapper) dial(ctx context.Context) error { + cl, err := c.getClient() + if err != nil { + return err + } + + prmDial := sdkClient.PrmDial{ + Endpoint: c.prm.address, + DialTimeout: c.prm.dialTimeout, + StreamTimeout: c.prm.streamTimeout, + GRPCDialOptions: c.prm.dialOptions, + } + + err = cl.Dial(ctx, prmDial) + c.setDialed(err == nil) + if err != nil { + return err + } + + return nil +} + +// restart recreates and redial inner sdk client. +func (c *clientWrapper) restart(ctx context.Context) error { + var cl sdkClient.Client + prmInit := sdkClient.PrmInit{ + Key: c.prm.key, + ResponseInfoCallback: c.prm.responseInfoCallback, + } + + cl.Init(prmInit) + + prmDial := sdkClient.PrmDial{ + Endpoint: c.prm.address, + DialTimeout: c.prm.dialTimeout, + StreamTimeout: c.prm.streamTimeout, + GRPCDialOptions: c.prm.dialOptions, + } + + // if connection is dialed before, to avoid routine / connection leak, + // pool has to close it and then initialize once again. + if c.isDialed() { + c.scheduleGracefulClose() + } + + err := cl.Dial(ctx, prmDial) + c.setDialed(err == nil) + if err != nil { + return err + } + + c.clientMutex.Lock() + c.client = &cl + c.clientMutex.Unlock() + + return nil +} + +func (c *clientWrapper) isDialed() bool { + c.mu.RLock() + defer c.mu.RUnlock() + return c.dialed +} + +func (c *clientWrapper) setDialed(dialed bool) { + c.mu.Lock() + c.dialed = dialed + c.mu.Unlock() +} + +func (c *clientWrapper) getClient() (*sdkClient.Client, error) { + c.clientMutex.RLock() + defer c.clientMutex.RUnlock() + if c.isHealthy() { + return c.client, nil + } + return nil, errPoolClientUnhealthy +} + +func (c *clientWrapper) getClientRaw() *sdkClient.Client { + c.clientMutex.RLock() + defer c.clientMutex.RUnlock() + return c.client +} + +// balanceGet invokes sdkClient.BalanceGet parse response status to error and return result as is. +func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) { + cl, err := c.getClient() + if err != nil { + return accounting.Decimal{}, err + } + + cliPrm := sdkClient.PrmBalanceGet{ + Account: prm.account, + } + + start := time.Now() + res, err := cl.BalanceGet(ctx, cliPrm) + c.incRequests(time.Since(start), methodBalanceGet) + var st apistatus.Status + if res != nil { + st = res.Status() + } + if err = c.handleError(ctx, st, err); err != nil { + return accounting.Decimal{}, fmt.Errorf("balance get on client: %w", err) + } + + return res.Amount(), nil +} + +// containerPut invokes sdkClient.ContainerPut parse response status to error and return result as is. +// It also waits for the container to appear on the network. +func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (cid.ID, error) { + cl, err := c.getClient() + if err != nil { + return cid.ID{}, err + } + + start := time.Now() + res, err := cl.ContainerPut(ctx, prm.ClientParams) + c.incRequests(time.Since(start), methodContainerPut) + var st apistatus.Status + if res != nil { + st = res.Status() + } + if err = c.handleError(ctx, st, err); err != nil { + return cid.ID{}, fmt.Errorf("container put on client: %w", err) + } + + if prm.WaitParams == nil { + prm.WaitParams = defaultWaitParams() + } + if err = prm.WaitParams.CheckValidity(); err != nil { + return cid.ID{}, fmt.Errorf("invalid wait parameters: %w", err) + } + + idCnr := res.ID() + + getPrm := PrmContainerGet{ + ContainerID: idCnr, + Session: prm.ClientParams.Session, + } + + err = waitForContainerPresence(ctx, c, getPrm, prm.WaitParams) + if err = c.handleError(ctx, nil, err); err != nil { + return cid.ID{}, fmt.Errorf("wait container presence on client: %w", err) + } + + return idCnr, nil +} + +// containerGet invokes sdkClient.ContainerGet parse response status to error and return result as is. +func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) (container.Container, error) { + cl, err := c.getClient() + if err != nil { + return container.Container{}, err + } + + cliPrm := sdkClient.PrmContainerGet{ + ContainerID: &prm.ContainerID, + Session: prm.Session, + } + + start := time.Now() + res, err := cl.ContainerGet(ctx, cliPrm) + c.incRequests(time.Since(start), methodContainerGet) + var st apistatus.Status + if res != nil { + st = res.Status() + } + if err = c.handleError(ctx, st, err); err != nil { + return container.Container{}, fmt.Errorf("container get on client: %w", err) + } + + return res.Container(), nil +} + +// containerList invokes sdkClient.ContainerList parse response status to error and return result as is. +func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) { + cl, err := c.getClient() + if err != nil { + return nil, err + } + + cliPrm := sdkClient.PrmContainerList{ + OwnerID: prm.OwnerID, + Session: prm.Session, + } + + start := time.Now() + res, err := cl.ContainerList(ctx, cliPrm) + c.incRequests(time.Since(start), methodContainerList) + var st apistatus.Status + if res != nil { + st = res.Status() + } + if err = c.handleError(ctx, st, err); err != nil { + return nil, fmt.Errorf("container list on client: %w", err) + } + return res.Containers(), nil +} + +// PrmListStream groups parameters of ListContainersStream operation. +type PrmListStream struct { + OwnerID user.ID + + Session *session.Container +} + +// ResListStream is designed to read list of object identifiers from FrostFS system. +// +// Must be initialized using Pool.ListContainersStream, any other usage is unsafe. +type ResListStream struct { + r *sdkClient.ContainerListReader + handleError func(context.Context, apistatus.Status, error) error +} + +// Read reads another list of the container identifiers. +func (x *ResListStream) Read(buf []cid.ID) (int, error) { + n, ok := x.r.Read(buf) + if !ok { + res, err := x.r.Close() + if err == nil { + return n, io.EOF + } + + var status apistatus.Status + if res != nil { + status = res.Status() + } + err = x.handleError(nil, status, err) + + return n, err + } + + return n, nil +} + +// Iterate iterates over the list of found container identifiers. +// f can return true to stop iteration earlier. +// +// Returns an error if container can't be read. +func (x *ResListStream) Iterate(f func(cid.ID) bool) error { + return x.r.Iterate(f) +} + +// Close ends reading list of the matched containers and returns the result of the operation +// along with the final results. Must be called after using the ResListStream. +func (x *ResListStream) Close() { + _, _ = x.r.Close() +} + +// containerList invokes sdkClient.ContainerList parse response status to error and return result as is. +func (c *clientWrapper) containerListStream(ctx context.Context, prm PrmListStream) (ResListStream, error) { + cl, err := c.getClient() + if err != nil { + return ResListStream{}, err + } + + cliPrm := sdkClient.PrmContainerListStream{ + OwnerID: prm.OwnerID, + Session: prm.Session, + } + + res, err := cl.ContainerListInit(ctx, cliPrm) + if err = c.handleError(ctx, nil, err); err != nil { + return ResListStream{}, fmt.Errorf("init container listing on client: %w", err) + } + return ResListStream{r: res, handleError: c.handleError}, nil +} + +// containerDelete invokes sdkClient.ContainerDelete parse response status to error. +// It also waits for the container to be removed from the network. +func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDelete) error { + cl, err := c.getClient() + if err != nil { + return err + } + + cliPrm := sdkClient.PrmContainerDelete{ + ContainerID: &prm.ContainerID, + Session: prm.Session, + } + + start := time.Now() + res, err := cl.ContainerDelete(ctx, cliPrm) + c.incRequests(time.Since(start), methodContainerDelete) + var st apistatus.Status + if res != nil { + st = res.Status() + } + if err = c.handleError(ctx, st, err); err != nil { + return fmt.Errorf("container delete on client: %w", err) + } + + if prm.WaitParams == nil { + prm.WaitParams = defaultWaitParams() + } + if err := prm.WaitParams.CheckValidity(); err != nil { + return fmt.Errorf("invalid wait parameters: %w", err) + } + + getPrm := PrmContainerGet{ + ContainerID: prm.ContainerID, + Session: prm.Session, + } + + return waitForContainerRemoved(ctx, c, getPrm, prm.WaitParams) +} + +// apeManagerAddChain invokes sdkClient.APEManagerAddChain and parse response status to error. +func (c *clientWrapper) apeManagerAddChain(ctx context.Context, prm PrmAddAPEChain) error { + cl, err := c.getClient() + if err != nil { + return err + } + + cliPrm := sdkClient.PrmAPEManagerAddChain{ + ChainTarget: prm.Target, + Chain: prm.Chain, + } + + start := time.Now() + res, err := cl.APEManagerAddChain(ctx, cliPrm) + c.incRequests(time.Since(start), methodAPEManagerAddChain) + var st apistatus.Status + if res != nil { + st = res.Status() + } + if err = c.handleError(ctx, st, err); err != nil { + return fmt.Errorf("add chain error: %w", err) + } + + return nil +} + +// apeManagerRemoveChain invokes sdkClient.APEManagerRemoveChain and parse response status to error. +func (c *clientWrapper) apeManagerRemoveChain(ctx context.Context, prm PrmRemoveAPEChain) error { + cl, err := c.getClient() + if err != nil { + return err + } + + cliPrm := sdkClient.PrmAPEManagerRemoveChain{ + ChainTarget: prm.Target, + ChainID: prm.ChainID, + } + + start := time.Now() + res, err := cl.APEManagerRemoveChain(ctx, cliPrm) + c.incRequests(time.Since(start), methodAPEManagerRemoveChain) + var st apistatus.Status + if res != nil { + st = res.Status() + } + if err = c.handleError(ctx, st, err); err != nil { + return fmt.Errorf("remove chain error: %w", err) + } + + return nil +} + +// apeManagerListChains invokes sdkClient.APEManagerListChains. Returns chains and parsed response status to error. +func (c *clientWrapper) apeManagerListChains(ctx context.Context, prm PrmListAPEChains) ([]ape.Chain, error) { + cl, err := c.getClient() + if err != nil { + return nil, err + } + + cliPrm := sdkClient.PrmAPEManagerListChains{ + ChainTarget: prm.Target, + } + + start := time.Now() + res, err := cl.APEManagerListChains(ctx, cliPrm) + c.incRequests(time.Since(start), methodAPEManagerListChains) + var st apistatus.Status + if res != nil { + st = res.Status() + } + if err = c.handleError(ctx, st, err); err != nil { + return nil, fmt.Errorf("list chains error: %w", err) + } + + return res.Chains, nil +} + +// endpointInfo invokes sdkClient.EndpointInfo parse response status to error and return result as is. +func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (netmap.NodeInfo, error) { + cl, err := c.getClient() + if err != nil { + return netmap.NodeInfo{}, err + } + + return c.endpointInfoRaw(ctx, cl) +} + +func (c *clientWrapper) healthcheck(ctx context.Context) (netmap.NodeInfo, error) { + cl := c.getClientRaw() + return c.endpointInfoRaw(ctx, cl) +} + +func (c *clientWrapper) endpointInfoRaw(ctx context.Context, cl *sdkClient.Client) (netmap.NodeInfo, error) { + start := time.Now() + res, err := cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{}) + c.incRequests(time.Since(start), methodEndpointInfo) + var st apistatus.Status + if res != nil { + st = res.Status() + } + if err = c.handleError(ctx, st, err); err != nil { + return netmap.NodeInfo{}, fmt.Errorf("endpoint info on client: %w", err) + } + + return res.NodeInfo(), nil +} + +// networkInfo invokes sdkClient.NetworkInfo parse response status to error and return result as is. +func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (netmap.NetworkInfo, error) { + cl, err := c.getClient() + if err != nil { + return netmap.NetworkInfo{}, err + } + + start := time.Now() + res, err := cl.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{}) + c.incRequests(time.Since(start), methodNetworkInfo) + var st apistatus.Status + if res != nil { + st = res.Status() + } + if err = c.handleError(ctx, st, err); err != nil { + return netmap.NetworkInfo{}, fmt.Errorf("network info on client: %w", err) + } + + return res.Info(), nil +} + +// networkInfo invokes sdkClient.NetworkInfo parse response status to error and return result as is. +func (c *clientWrapper) netMapSnapshot(ctx context.Context, _ prmNetMapSnapshot) (netmap.NetMap, error) { + cl, err := c.getClient() + if err != nil { + return netmap.NetMap{}, err + } + + start := time.Now() + res, err := cl.NetMapSnapshot(ctx, sdkClient.PrmNetMapSnapshot{}) + c.incRequests(time.Since(start), methodNetMapSnapshot) + var st apistatus.Status + if res != nil { + st = res.Status() + } + if err = c.handleError(ctx, st, err); err != nil { + return netmap.NetMap{}, fmt.Errorf("network map snapshot on client: %w", err) + } + + return res.NetMap(), nil +} + +// objectPatch patches object in FrostFS. +func (c *clientWrapper) objectPatch(ctx context.Context, prm PrmObjectPatch) (ResPatchObject, error) { + cl, err := c.getClient() + if err != nil { + return ResPatchObject{}, err + } + + start := time.Now() + pObj, err := cl.ObjectPatchInit(ctx, sdkClient.PrmObjectPatch{ + Address: prm.addr, + Session: prm.stoken, + Key: prm.key, + BearerToken: prm.btoken, + MaxChunkLength: prm.maxPayloadPatchChunkLength, + }) + if err = c.handleError(ctx, nil, err); err != nil { + return ResPatchObject{}, fmt.Errorf("init patching on API client: %w", err) + } + c.incRequests(time.Since(start), methodObjectPatch) + + start = time.Now() + attrPatchSuccess := pObj.PatchAttributes(ctx, prm.newAttrs, prm.replaceAttrs) + c.incRequests(time.Since(start), methodObjectPatch) + + if attrPatchSuccess { + start = time.Now() + _ = pObj.PatchPayload(ctx, prm.rng, prm.payload) + c.incRequests(time.Since(start), methodObjectPatch) + } + + res, err := pObj.Close(ctx) + var st apistatus.Status + if res != nil { + st = res.Status() + } + if err = c.handleError(ctx, st, err); err != nil { + return ResPatchObject{}, fmt.Errorf("client failure: %w", err) + } + + return ResPatchObject{ObjectID: res.ObjectID()}, nil +} + +// objectPut writes object to FrostFS. +func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) { + if prm.bufferMaxSize == 0 { + prm.bufferMaxSize = defaultBufferMaxSizeForPut + } + + if prm.clientCut { + return c.objectPutClientCut(ctx, prm) + } + + return c.objectPutServerCut(ctx, prm) +} + +func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) { + cl, err := c.getClient() + if err != nil { + return ResPutObject{}, err + } + + cliPrm := sdkClient.PrmObjectPutInit{ + CopiesNumber: prm.copiesNumber, + Session: prm.stoken, + Key: prm.key, + BearerToken: prm.btoken, + } + + start := time.Now() + wObj, err := cl.ObjectPutInit(ctx, cliPrm) + c.incRequests(time.Since(start), methodObjectPut) + if err = c.handleError(ctx, nil, err); err != nil { + return ResPutObject{}, fmt.Errorf("init writing on API client: %w", err) + } + + if wObj.WriteHeader(ctx, prm.hdr) { + sz := prm.hdr.PayloadSize() + + if data := prm.hdr.Payload(); len(data) > 0 { + if prm.payload != nil { + prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload) + } else { + prm.payload = bytes.NewReader(data) + sz = uint64(len(data)) + } + } + + if prm.payload != nil { + if sz == 0 || sz > prm.bufferMaxSize { + sz = prm.bufferMaxSize + } + + buf := make([]byte, sz) + + var n int + + for { + n, err = prm.payload.Read(buf) + if n > 0 { + start = time.Now() + successWrite := wObj.WritePayloadChunk(ctx, buf[:n]) + c.incRequests(time.Since(start), methodObjectPut) + if !successWrite { + break + } + + continue + } + + if errors.Is(err, io.EOF) { + break + } + + return ResPutObject{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err)) + } + } + } + + res, err := wObj.Close(ctx) + var st apistatus.Status + if res != nil { + st = res.Status() + } + if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors + return ResPutObject{}, fmt.Errorf("client failure: %w", err) + } + + return ResPutObject{ + ObjectID: res.StoredObjectID(), + Epoch: res.StoredEpoch(), + }, nil +} + +func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) { + putInitPrm := PrmObjectPutClientCutInit{ + PrmObjectPut: prm, + } + + start := time.Now() + wObj, err := c.objectPutInitTransformer(putInitPrm) + c.incRequests(time.Since(start), methodObjectPut) + if err = c.handleError(ctx, nil, err); err != nil { + return ResPutObject{}, fmt.Errorf("init writing on API client: %w", err) + } + + if wObj.WriteHeader(ctx, prm.hdr) { + sz := prm.hdr.PayloadSize() + + if data := prm.hdr.Payload(); len(data) > 0 { + if prm.payload != nil { + prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload) + } else { + prm.payload = bytes.NewReader(data) + sz = uint64(len(data)) + } + } + + if prm.payload != nil { + if sz == 0 || sz > prm.bufferMaxSize { + sz = prm.bufferMaxSize + } + + buf := make([]byte, sz) + + var n int + + for { + n, err = prm.payload.Read(buf) + if n > 0 { + start = time.Now() + successWrite := wObj.WritePayloadChunk(ctx, buf[:n]) + c.incRequests(time.Since(start), methodObjectPut) + if !successWrite { + break + } + + continue + } + + if errors.Is(err, io.EOF) { + break + } + + return ResPutObject{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err)) + } + } + } + + res, err := wObj.Close(ctx) + var st apistatus.Status + if res != nil { + st = res.Status + } + if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors + return ResPutObject{}, fmt.Errorf("client failure: %w", err) + } + + return ResPutObject{ + ObjectID: res.OID, + Epoch: res.Epoch, + }, nil +} + +// objectDelete invokes sdkClient.ObjectDelete parse response status to error. +func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) error { + cl, err := c.getClient() + if err != nil { + return err + } + + cnr := prm.addr.Container() + obj := prm.addr.Object() + + cliPrm := sdkClient.PrmObjectDelete{ + BearerToken: prm.btoken, + Session: prm.stoken, + ContainerID: &cnr, + ObjectID: &obj, + Key: prm.key, + } + + start := time.Now() + res, err := cl.ObjectDelete(ctx, cliPrm) + c.incRequests(time.Since(start), methodObjectDelete) + var st apistatus.Status + if res != nil { + st = res.Status() + } + if err = c.handleError(ctx, st, err); err != nil { + return fmt.Errorf("delete object on client: %w", err) + } + return nil +} + +// objectGet returns reader for object. +func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) { + cl, err := c.getClient() + if err != nil { + return ResGetObject{}, err + } + + prmCnr := prm.addr.Container() + prmObj := prm.addr.Object() + + cliPrm := sdkClient.PrmObjectGet{ + BearerToken: prm.btoken, + Session: prm.stoken, + ContainerID: &prmCnr, + ObjectID: &prmObj, + Key: prm.key, + } + + var res ResGetObject + + rObj, err := cl.ObjectGetInit(ctx, cliPrm) + if err = c.handleError(ctx, nil, err); err != nil { + return ResGetObject{}, fmt.Errorf("init object reading on client: %w", err) + } + + start := time.Now() + successReadHeader := rObj.ReadHeader(&res.Header) + c.incRequests(time.Since(start), methodObjectGet) + if !successReadHeader { + rObjRes, err := rObj.Close() + var st apistatus.Status + if rObjRes != nil { + st = rObjRes.Status() + } + err = c.handleError(ctx, st, err) + return res, fmt.Errorf("read header: %w", err) + } + + res.Payload = &objectReadCloser{ + reader: rObj, + elapsedTimeCallback: func(elapsed time.Duration) { + c.incRequests(elapsed, methodObjectGet) + }, + } + + return res, nil +} + +// objectHead invokes sdkClient.ObjectHead parse response status to error and return result as is. +func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (object.Object, error) { + cl, err := c.getClient() + if err != nil { + return object.Object{}, err + } + + prmCnr := prm.addr.Container() + prmObj := prm.addr.Object() + + cliPrm := sdkClient.PrmObjectHead{ + BearerToken: prm.btoken, + Session: prm.stoken, + Raw: prm.raw, + ContainerID: &prmCnr, + ObjectID: &prmObj, + Key: prm.key, + } + + var obj object.Object + + start := time.Now() + res, err := cl.ObjectHead(ctx, cliPrm) + c.incRequests(time.Since(start), methodObjectHead) + var st apistatus.Status + if res != nil { + st = res.Status() + } + if err = c.handleError(ctx, st, err); err != nil { + return obj, fmt.Errorf("read object header via client: %w", err) + } + if !res.ReadHeader(&obj) { + return obj, errors.New("missing object header in response") + } + + return obj, nil +} + +// objectRange returns object range reader. +func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) { + cl, err := c.getClient() + if err != nil { + return ResObjectRange{}, err + } + + prmCnr := prm.addr.Container() + prmObj := prm.addr.Object() + + cliPrm := sdkClient.PrmObjectRange{ + BearerToken: prm.btoken, + Session: prm.stoken, + ContainerID: &prmCnr, + ObjectID: &prmObj, + Offset: prm.off, + Length: prm.ln, + Key: prm.key, + } + + start := time.Now() + res, err := cl.ObjectRangeInit(ctx, cliPrm) + c.incRequests(time.Since(start), methodObjectRange) + if err = c.handleError(ctx, nil, err); err != nil { + return ResObjectRange{}, fmt.Errorf("init payload range reading on client: %w", err) + } + + return ResObjectRange{ + payload: res, + elapsedTimeCallback: func(elapsed time.Duration) { + c.incRequests(elapsed, methodObjectRange) + }, + }, nil +} + +// objectSearch invokes sdkClient.ObjectSearchInit parse response status to error and return result as is. +func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) { + cl, err := c.getClient() + if err != nil { + return ResObjectSearch{}, err + } + + cliPrm := sdkClient.PrmObjectSearch{ + ContainerID: &prm.cnrID, + Filters: prm.filters, + Session: prm.stoken, + BearerToken: prm.btoken, + Key: prm.key, + } + + res, err := cl.ObjectSearchInit(ctx, cliPrm) + if err = c.handleError(ctx, nil, err); err != nil { + return ResObjectSearch{}, fmt.Errorf("init object searching on client: %w", err) + } + + return ResObjectSearch{r: res, handleError: c.handleError}, nil +} + +// sessionCreate invokes sdkClient.SessionCreate parse response status to error and return result as is. +func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession) (resCreateSession, error) { + cl, err := c.getClient() + if err != nil { + return resCreateSession{}, err + } + + cliPrm := sdkClient.PrmSessionCreate{ + Expiration: prm.exp, + Key: &prm.key, + } + + start := time.Now() + res, err := cl.SessionCreate(ctx, cliPrm) + c.incRequests(time.Since(start), methodSessionCreate) + var st apistatus.Status + if res != nil { + st = res.Status() + } + if err = c.handleError(ctx, st, err); err != nil { + return resCreateSession{}, fmt.Errorf("session creation on client: %w", err) + } + + return resCreateSession{ + id: res.ID(), + sessionKey: res.PublicKey(), + }, nil +} + +func (c *clientStatusMonitor) isHealthy() bool { + return c.healthy.Load() == statusHealthy +} + +func (c *clientStatusMonitor) setHealthy() { + c.healthy.Store(statusHealthy) +} + +func (c *clientStatusMonitor) setUnhealthy() { + c.healthy.Store(statusUnhealthyOnRequest) +} + +func (c *clientStatusMonitor) address() string { + return c.addr +} + +func (c *clientStatusMonitor) incErrorRate() { + c.mu.Lock() + c.currentErrorCount++ + c.overallErrorCount++ + + thresholdReached := c.currentErrorCount >= c.errorThreshold + if thresholdReached { + c.setUnhealthy() + c.currentErrorCount = 0 + } + c.mu.Unlock() + + if thresholdReached { + c.log(zapcore.WarnLevel, "error threshold reached", + zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold)) + } +} + +func (c *clientStatusMonitor) incErrorRateToUnhealthy(err error) { + c.mu.Lock() + c.currentErrorCount = 0 + c.overallErrorCount++ + c.setUnhealthy() + c.mu.Unlock() + + c.log(zapcore.WarnLevel, "explicitly mark node unhealthy", zap.String("address", c.addr), zap.Error(err)) +} + +func (c *clientStatusMonitor) log(level zapcore.Level, msg string, fields ...zap.Field) { + if c.logger == nil { + return + } + + c.logger.Log(level, msg, fields...) +} + +func (c *clientStatusMonitor) currentErrorRate() uint32 { + c.mu.RLock() + defer c.mu.RUnlock() + return c.currentErrorCount +} + +func (c *clientStatusMonitor) overallErrorRate() uint64 { + c.mu.RLock() + defer c.mu.RUnlock() + return c.overallErrorCount +} + +func (c *clientStatusMonitor) methodsStatus() []StatusSnapshot { + result := make([]StatusSnapshot, len(c.methods)) + for i, val := range c.methods { + result[i] = val.Snapshot() + } + + return result +} + +func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) { + methodStat := c.methods[method] + methodStat.IncRequests(elapsed) + if c.prm.poolRequestInfoCallback != nil { + c.prm.poolRequestInfoCallback(RequestInfo{ + Address: c.prm.address, + Method: method, + Elapsed: elapsed, + }) + } +} + +func (c *clientWrapper) close() error { + if !c.isDialed() { + return nil + } + if cl := c.getClientRaw(); cl != nil { + return cl.Close() + } + return nil +} + +func (c *clientWrapper) scheduleGracefulClose() { + cl := c.getClientRaw() + if cl == nil { + return + } + + time.AfterFunc(c.prm.gracefulCloseOnSwitchTimeout, func() { + if err := cl.Close(); err != nil { + c.log(zap.DebugLevel, "close unhealthy client during rebalance", zap.String("address", c.address()), zap.Error(err)) + } + }) +} + +func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error { + if stErr := apistatus.ErrFromStatus(st); stErr != nil { + switch stErr.(type) { + case *apistatus.ServerInternal, + *apistatus.WrongMagicNumber, + *apistatus.SignatureVerification: + c.incErrorRate() + case *apistatus.NodeUnderMaintenance: + c.incErrorRateToUnhealthy(stErr) + } + + if err == nil { + err = stErr + } + + return err + } + + if err != nil { + if needCountError(ctx, err) { + if sdkClient.IsErrNodeUnderMaintenance(err) { + c.incErrorRateToUnhealthy(err) + } else { + c.incErrorRate() + } + } + + return err + } + + return nil +} + +func needCountError(ctx context.Context, err error) bool { + // non-status logic error that could be returned + // from the SDK client; should not be considered + // as a connection error + var siErr *object.SplitInfoError + if errors.As(err, &siErr) { + return false + } + var eiErr *object.ECInfoError + if errors.As(err, &eiErr) { + return false + } + + if ctx != nil && errors.Is(ctx.Err(), context.Canceled) { + return false + } + + return true +} + +// clientBuilder is a type alias of client constructors which open connection +// to the given endpoint. +type clientBuilder = func(endpoint string) client + +// RequestInfo groups info about pool request. +type RequestInfo struct { + Address string + Method MethodIndex + Elapsed time.Duration +} diff --git a/pool/pool.go b/pool/pool.go index bbbebba..869ff2c 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -1,15 +1,12 @@ package pool import ( - "bytes" "context" "crypto/ecdsa" "errors" "fmt" "io" "math" - "sync" - "sync/atomic" "time" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting" @@ -29,7 +26,6 @@ import ( "github.com/google/uuid" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "google.golang.org/grpc" ) @@ -110,1262 +106,6 @@ type clientStatus interface { methodsStatus() []StatusSnapshot } -// errPoolClientUnhealthy is an error to indicate that client in pool is unhealthy. -var errPoolClientUnhealthy = errors.New("pool client unhealthy") - -// clientStatusMonitor count error rate and other statistics for connection. -type clientStatusMonitor struct { - logger *zap.Logger - addr string - healthy *atomic.Uint32 - errorThreshold uint32 - - mu sync.RWMutex // protect counters - currentErrorCount uint32 - overallErrorCount uint64 - methods []*MethodStatus -} - -// values for healthy status of clientStatusMonitor. -const ( - // statusUnhealthyOnRequest is set when communication after dialing to the - // endpoint is failed due to immediate or accumulated errors, connection is - // available and pool should close it before re-establishing connection once again. - statusUnhealthyOnRequest = iota - - // statusHealthy is set when connection is ready to be used by the pool. - statusHealthy -) - -// MethodIndex index of method in list of statuses in clientStatusMonitor. -type MethodIndex int - -const ( - methodBalanceGet MethodIndex = iota - methodContainerPut - methodContainerGet - methodContainerList - methodContainerListStream - methodContainerDelete - methodEndpointInfo - methodNetworkInfo - methodNetMapSnapshot - methodObjectPut - methodObjectDelete - methodObjectGet - methodObjectHead - methodObjectRange - methodObjectPatch - methodSessionCreate - methodAPEManagerAddChain - methodAPEManagerRemoveChain - methodAPEManagerListChains - methodLast -) - -// String implements fmt.Stringer. -func (m MethodIndex) String() string { - switch m { - case methodBalanceGet: - return "balanceGet" - case methodContainerPut: - return "containerPut" - case methodContainerGet: - return "containerGet" - case methodContainerList: - return "containerList" - case methodContainerDelete: - return "containerDelete" - case methodEndpointInfo: - return "endpointInfo" - case methodNetworkInfo: - return "networkInfo" - case methodNetMapSnapshot: - return "netMapSnapshot" - case methodObjectPut: - return "objectPut" - case methodObjectPatch: - return "objectPatch" - case methodObjectDelete: - return "objectDelete" - case methodObjectGet: - return "objectGet" - case methodObjectHead: - return "objectHead" - case methodObjectRange: - return "objectRange" - case methodSessionCreate: - return "sessionCreate" - case methodAPEManagerAddChain: - return "apeManagerAddChain" - case methodAPEManagerRemoveChain: - return "apeManagerRemoveChain" - case methodAPEManagerListChains: - return "apeManagerListChains" - case methodLast: - return "it's a system name rather than a method" - default: - return "unknown" - } -} - -func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor { - methods := make([]*MethodStatus, methodLast) - for i := methodBalanceGet; i < methodLast; i++ { - methods[i] = &MethodStatus{name: i.String()} - } - - healthy := new(atomic.Uint32) - healthy.Store(statusHealthy) - - return clientStatusMonitor{ - logger: logger, - addr: addr, - healthy: healthy, - errorThreshold: errorThreshold, - methods: methods, - } -} - -// clientWrapper is used by default, alternative implementations are intended for testing purposes only. -type clientWrapper struct { - clientMutex sync.RWMutex - client *sdkClient.Client - dialed bool - prm wrapperPrm - - clientStatusMonitor -} - -// wrapperPrm is params to create clientWrapper. -type wrapperPrm struct { - logger *zap.Logger - address string - key ecdsa.PrivateKey - dialTimeout time.Duration - streamTimeout time.Duration - errorThreshold uint32 - responseInfoCallback func(sdkClient.ResponseMetaInfo) error - poolRequestInfoCallback func(RequestInfo) - dialOptions []grpc.DialOption - - gracefulCloseOnSwitchTimeout time.Duration -} - -// setAddress sets endpoint to connect in FrostFS network. -func (x *wrapperPrm) setAddress(address string) { - x.address = address -} - -// setKey sets sdkClient.Client private key to be used for the protocol communication by default. -func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) { - x.key = key -} - -// setLogger sets sdkClient.Client logger. -func (x *wrapperPrm) setLogger(logger *zap.Logger) { - x.logger = logger -} - -// setDialTimeout sets the timeout for connection to be established. -func (x *wrapperPrm) setDialTimeout(timeout time.Duration) { - x.dialTimeout = timeout -} - -// setStreamTimeout sets the timeout for individual operations in streaming RPC. -func (x *wrapperPrm) setStreamTimeout(timeout time.Duration) { - x.streamTimeout = timeout -} - -// setErrorThreshold sets threshold after reaching which connection is considered unhealthy -// until Pool.startRebalance routing updates its status. -func (x *wrapperPrm) setErrorThreshold(threshold uint32) { - x.errorThreshold = threshold -} - -// setGracefulCloseOnSwitchTimeout specifies the timeout after which unhealthy client be closed during rebalancing -// if it will become healthy back. -// -// See also setErrorThreshold. -func (x *wrapperPrm) setGracefulCloseOnSwitchTimeout(timeout time.Duration) { - x.gracefulCloseOnSwitchTimeout = timeout -} - -// setPoolRequestCallback sets callback that will be invoked after every pool response. -func (x *wrapperPrm) setPoolRequestCallback(f func(RequestInfo)) { - x.poolRequestInfoCallback = f -} - -// setResponseInfoCallback sets callback that will be invoked after every response. -func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) error) { - x.responseInfoCallback = f -} - -// setGRPCDialOptions sets the gRPC dial options for new gRPC client connection. -func (x *wrapperPrm) setGRPCDialOptions(opts []grpc.DialOption) { - x.dialOptions = opts -} - -// newWrapper creates a clientWrapper that implements the client interface. -func newWrapper(prm wrapperPrm) *clientWrapper { - var cl sdkClient.Client - prmInit := sdkClient.PrmInit{ - Key: prm.key, - ResponseInfoCallback: prm.responseInfoCallback, - } - - cl.Init(prmInit) - - res := &clientWrapper{ - client: &cl, - clientStatusMonitor: newClientStatusMonitor(prm.logger, prm.address, prm.errorThreshold), - prm: prm, - } - - return res -} - -// dial establishes a connection to the server from the FrostFS network. -// Returns an error describing failure reason. If failed, the client -// SHOULD NOT be used. -func (c *clientWrapper) dial(ctx context.Context) error { - cl, err := c.getClient() - if err != nil { - return err - } - - prmDial := sdkClient.PrmDial{ - Endpoint: c.prm.address, - DialTimeout: c.prm.dialTimeout, - StreamTimeout: c.prm.streamTimeout, - GRPCDialOptions: c.prm.dialOptions, - } - - err = cl.Dial(ctx, prmDial) - c.setDialed(err == nil) - if err != nil { - return err - } - - return nil -} - -// restart recreates and redial inner sdk client. -func (c *clientWrapper) restart(ctx context.Context) error { - var cl sdkClient.Client - prmInit := sdkClient.PrmInit{ - Key: c.prm.key, - ResponseInfoCallback: c.prm.responseInfoCallback, - } - - cl.Init(prmInit) - - prmDial := sdkClient.PrmDial{ - Endpoint: c.prm.address, - DialTimeout: c.prm.dialTimeout, - StreamTimeout: c.prm.streamTimeout, - GRPCDialOptions: c.prm.dialOptions, - } - - // if connection is dialed before, to avoid routine / connection leak, - // pool has to close it and then initialize once again. - if c.isDialed() { - c.scheduleGracefulClose() - } - - err := cl.Dial(ctx, prmDial) - c.setDialed(err == nil) - if err != nil { - return err - } - - c.clientMutex.Lock() - c.client = &cl - c.clientMutex.Unlock() - - return nil -} - -func (c *clientWrapper) isDialed() bool { - c.mu.RLock() - defer c.mu.RUnlock() - return c.dialed -} - -func (c *clientWrapper) setDialed(dialed bool) { - c.mu.Lock() - c.dialed = dialed - c.mu.Unlock() -} - -func (c *clientWrapper) getClient() (*sdkClient.Client, error) { - c.clientMutex.RLock() - defer c.clientMutex.RUnlock() - if c.isHealthy() { - return c.client, nil - } - return nil, errPoolClientUnhealthy -} - -func (c *clientWrapper) getClientRaw() *sdkClient.Client { - c.clientMutex.RLock() - defer c.clientMutex.RUnlock() - return c.client -} - -// balanceGet invokes sdkClient.BalanceGet parse response status to error and return result as is. -func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) { - cl, err := c.getClient() - if err != nil { - return accounting.Decimal{}, err - } - - cliPrm := sdkClient.PrmBalanceGet{ - Account: prm.account, - } - - start := time.Now() - res, err := cl.BalanceGet(ctx, cliPrm) - c.incRequests(time.Since(start), methodBalanceGet) - var st apistatus.Status - if res != nil { - st = res.Status() - } - if err = c.handleError(ctx, st, err); err != nil { - return accounting.Decimal{}, fmt.Errorf("balance get on client: %w", err) - } - - return res.Amount(), nil -} - -// containerPut invokes sdkClient.ContainerPut parse response status to error and return result as is. -// It also waits for the container to appear on the network. -func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (cid.ID, error) { - cl, err := c.getClient() - if err != nil { - return cid.ID{}, err - } - - start := time.Now() - res, err := cl.ContainerPut(ctx, prm.ClientParams) - c.incRequests(time.Since(start), methodContainerPut) - var st apistatus.Status - if res != nil { - st = res.Status() - } - if err = c.handleError(ctx, st, err); err != nil { - return cid.ID{}, fmt.Errorf("container put on client: %w", err) - } - - if prm.WaitParams == nil { - prm.WaitParams = defaultWaitParams() - } - if err = prm.WaitParams.CheckValidity(); err != nil { - return cid.ID{}, fmt.Errorf("invalid wait parameters: %w", err) - } - - idCnr := res.ID() - - getPrm := PrmContainerGet{ - ContainerID: idCnr, - Session: prm.ClientParams.Session, - } - - err = waitForContainerPresence(ctx, c, getPrm, prm.WaitParams) - if err = c.handleError(ctx, nil, err); err != nil { - return cid.ID{}, fmt.Errorf("wait container presence on client: %w", err) - } - - return idCnr, nil -} - -// containerGet invokes sdkClient.ContainerGet parse response status to error and return result as is. -func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) (container.Container, error) { - cl, err := c.getClient() - if err != nil { - return container.Container{}, err - } - - cliPrm := sdkClient.PrmContainerGet{ - ContainerID: &prm.ContainerID, - Session: prm.Session, - } - - start := time.Now() - res, err := cl.ContainerGet(ctx, cliPrm) - c.incRequests(time.Since(start), methodContainerGet) - var st apistatus.Status - if res != nil { - st = res.Status() - } - if err = c.handleError(ctx, st, err); err != nil { - return container.Container{}, fmt.Errorf("container get on client: %w", err) - } - - return res.Container(), nil -} - -// containerList invokes sdkClient.ContainerList parse response status to error and return result as is. -func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) { - cl, err := c.getClient() - if err != nil { - return nil, err - } - - cliPrm := sdkClient.PrmContainerList{ - OwnerID: prm.OwnerID, - Session: prm.Session, - } - - start := time.Now() - res, err := cl.ContainerList(ctx, cliPrm) - c.incRequests(time.Since(start), methodContainerList) - var st apistatus.Status - if res != nil { - st = res.Status() - } - if err = c.handleError(ctx, st, err); err != nil { - return nil, fmt.Errorf("container list on client: %w", err) - } - return res.Containers(), nil -} - -// PrmListStream groups parameters of ListContainersStream operation. -type PrmListStream struct { - OwnerID user.ID - - Session *session.Container -} - -// ResListStream is designed to read list of object identifiers from FrostFS system. -// -// Must be initialized using Pool.ListContainersStream, any other usage is unsafe. -type ResListStream struct { - r *sdkClient.ContainerListReader - handleError func(context.Context, apistatus.Status, error) error -} - -// Read reads another list of the container identifiers. -func (x *ResListStream) Read(buf []cid.ID) (int, error) { - n, ok := x.r.Read(buf) - if !ok { - res, err := x.r.Close() - if err == nil { - return n, io.EOF - } - - var status apistatus.Status - if res != nil { - status = res.Status() - } - err = x.handleError(nil, status, err) - - return n, err - } - - return n, nil -} - -// Iterate iterates over the list of found container identifiers. -// f can return true to stop iteration earlier. -// -// Returns an error if container can't be read. -func (x *ResListStream) Iterate(f func(cid.ID) bool) error { - return x.r.Iterate(f) -} - -// Close ends reading list of the matched containers and returns the result of the operation -// along with the final results. Must be called after using the ResListStream. -func (x *ResListStream) Close() { - _, _ = x.r.Close() -} - -// containerList invokes sdkClient.ContainerList parse response status to error and return result as is. -func (c *clientWrapper) containerListStream(ctx context.Context, prm PrmListStream) (ResListStream, error) { - cl, err := c.getClient() - if err != nil { - return ResListStream{}, err - } - - cliPrm := sdkClient.PrmContainerListStream{ - OwnerID: prm.OwnerID, - Session: prm.Session, - } - - res, err := cl.ContainerListInit(ctx, cliPrm) - if err = c.handleError(ctx, nil, err); err != nil { - return ResListStream{}, fmt.Errorf("init container listing on client: %w", err) - } - return ResListStream{r: res, handleError: c.handleError}, nil -} - -// containerDelete invokes sdkClient.ContainerDelete parse response status to error. -// It also waits for the container to be removed from the network. -func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDelete) error { - cl, err := c.getClient() - if err != nil { - return err - } - - cliPrm := sdkClient.PrmContainerDelete{ - ContainerID: &prm.ContainerID, - Session: prm.Session, - } - - start := time.Now() - res, err := cl.ContainerDelete(ctx, cliPrm) - c.incRequests(time.Since(start), methodContainerDelete) - var st apistatus.Status - if res != nil { - st = res.Status() - } - if err = c.handleError(ctx, st, err); err != nil { - return fmt.Errorf("container delete on client: %w", err) - } - - if prm.WaitParams == nil { - prm.WaitParams = defaultWaitParams() - } - if err := prm.WaitParams.CheckValidity(); err != nil { - return fmt.Errorf("invalid wait parameters: %w", err) - } - - getPrm := PrmContainerGet{ - ContainerID: prm.ContainerID, - Session: prm.Session, - } - - return waitForContainerRemoved(ctx, c, getPrm, prm.WaitParams) -} - -// apeManagerAddChain invokes sdkClient.APEManagerAddChain and parse response status to error. -func (c *clientWrapper) apeManagerAddChain(ctx context.Context, prm PrmAddAPEChain) error { - cl, err := c.getClient() - if err != nil { - return err - } - - cliPrm := sdkClient.PrmAPEManagerAddChain{ - ChainTarget: prm.Target, - Chain: prm.Chain, - } - - start := time.Now() - res, err := cl.APEManagerAddChain(ctx, cliPrm) - c.incRequests(time.Since(start), methodAPEManagerAddChain) - var st apistatus.Status - if res != nil { - st = res.Status() - } - if err = c.handleError(ctx, st, err); err != nil { - return fmt.Errorf("add chain error: %w", err) - } - - return nil -} - -// apeManagerRemoveChain invokes sdkClient.APEManagerRemoveChain and parse response status to error. -func (c *clientWrapper) apeManagerRemoveChain(ctx context.Context, prm PrmRemoveAPEChain) error { - cl, err := c.getClient() - if err != nil { - return err - } - - cliPrm := sdkClient.PrmAPEManagerRemoveChain{ - ChainTarget: prm.Target, - ChainID: prm.ChainID, - } - - start := time.Now() - res, err := cl.APEManagerRemoveChain(ctx, cliPrm) - c.incRequests(time.Since(start), methodAPEManagerRemoveChain) - var st apistatus.Status - if res != nil { - st = res.Status() - } - if err = c.handleError(ctx, st, err); err != nil { - return fmt.Errorf("remove chain error: %w", err) - } - - return nil -} - -// apeManagerListChains invokes sdkClient.APEManagerListChains. Returns chains and parsed response status to error. -func (c *clientWrapper) apeManagerListChains(ctx context.Context, prm PrmListAPEChains) ([]ape.Chain, error) { - cl, err := c.getClient() - if err != nil { - return nil, err - } - - cliPrm := sdkClient.PrmAPEManagerListChains{ - ChainTarget: prm.Target, - } - - start := time.Now() - res, err := cl.APEManagerListChains(ctx, cliPrm) - c.incRequests(time.Since(start), methodAPEManagerListChains) - var st apistatus.Status - if res != nil { - st = res.Status() - } - if err = c.handleError(ctx, st, err); err != nil { - return nil, fmt.Errorf("list chains error: %w", err) - } - - return res.Chains, nil -} - -// endpointInfo invokes sdkClient.EndpointInfo parse response status to error and return result as is. -func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (netmap.NodeInfo, error) { - cl, err := c.getClient() - if err != nil { - return netmap.NodeInfo{}, err - } - - return c.endpointInfoRaw(ctx, cl) -} - -func (c *clientWrapper) healthcheck(ctx context.Context) (netmap.NodeInfo, error) { - cl := c.getClientRaw() - return c.endpointInfoRaw(ctx, cl) -} - -func (c *clientWrapper) endpointInfoRaw(ctx context.Context, cl *sdkClient.Client) (netmap.NodeInfo, error) { - start := time.Now() - res, err := cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{}) - c.incRequests(time.Since(start), methodEndpointInfo) - var st apistatus.Status - if res != nil { - st = res.Status() - } - if err = c.handleError(ctx, st, err); err != nil { - return netmap.NodeInfo{}, fmt.Errorf("endpoint info on client: %w", err) - } - - return res.NodeInfo(), nil -} - -// networkInfo invokes sdkClient.NetworkInfo parse response status to error and return result as is. -func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (netmap.NetworkInfo, error) { - cl, err := c.getClient() - if err != nil { - return netmap.NetworkInfo{}, err - } - - start := time.Now() - res, err := cl.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{}) - c.incRequests(time.Since(start), methodNetworkInfo) - var st apistatus.Status - if res != nil { - st = res.Status() - } - if err = c.handleError(ctx, st, err); err != nil { - return netmap.NetworkInfo{}, fmt.Errorf("network info on client: %w", err) - } - - return res.Info(), nil -} - -// networkInfo invokes sdkClient.NetworkInfo parse response status to error and return result as is. -func (c *clientWrapper) netMapSnapshot(ctx context.Context, _ prmNetMapSnapshot) (netmap.NetMap, error) { - cl, err := c.getClient() - if err != nil { - return netmap.NetMap{}, err - } - - start := time.Now() - res, err := cl.NetMapSnapshot(ctx, sdkClient.PrmNetMapSnapshot{}) - c.incRequests(time.Since(start), methodNetMapSnapshot) - var st apistatus.Status - if res != nil { - st = res.Status() - } - if err = c.handleError(ctx, st, err); err != nil { - return netmap.NetMap{}, fmt.Errorf("network map snapshot on client: %w", err) - } - - return res.NetMap(), nil -} - -// objectPatch patches object in FrostFS. -func (c *clientWrapper) objectPatch(ctx context.Context, prm PrmObjectPatch) (ResPatchObject, error) { - cl, err := c.getClient() - if err != nil { - return ResPatchObject{}, err - } - - start := time.Now() - pObj, err := cl.ObjectPatchInit(ctx, sdkClient.PrmObjectPatch{ - Address: prm.addr, - Session: prm.stoken, - Key: prm.key, - BearerToken: prm.btoken, - MaxChunkLength: prm.maxPayloadPatchChunkLength, - }) - if err = c.handleError(ctx, nil, err); err != nil { - return ResPatchObject{}, fmt.Errorf("init patching on API client: %w", err) - } - c.incRequests(time.Since(start), methodObjectPatch) - - start = time.Now() - attrPatchSuccess := pObj.PatchAttributes(ctx, prm.newAttrs, prm.replaceAttrs) - c.incRequests(time.Since(start), methodObjectPatch) - - if attrPatchSuccess { - start = time.Now() - _ = pObj.PatchPayload(ctx, prm.rng, prm.payload) - c.incRequests(time.Since(start), methodObjectPatch) - } - - res, err := pObj.Close(ctx) - var st apistatus.Status - if res != nil { - st = res.Status() - } - if err = c.handleError(ctx, st, err); err != nil { - return ResPatchObject{}, fmt.Errorf("client failure: %w", err) - } - - return ResPatchObject{ObjectID: res.ObjectID()}, nil -} - -// objectPut writes object to FrostFS. -func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) { - if prm.bufferMaxSize == 0 { - prm.bufferMaxSize = defaultBufferMaxSizeForPut - } - - if prm.clientCut { - return c.objectPutClientCut(ctx, prm) - } - - return c.objectPutServerCut(ctx, prm) -} - -func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) { - cl, err := c.getClient() - if err != nil { - return ResPutObject{}, err - } - - cliPrm := sdkClient.PrmObjectPutInit{ - CopiesNumber: prm.copiesNumber, - Session: prm.stoken, - Key: prm.key, - BearerToken: prm.btoken, - } - - start := time.Now() - wObj, err := cl.ObjectPutInit(ctx, cliPrm) - c.incRequests(time.Since(start), methodObjectPut) - if err = c.handleError(ctx, nil, err); err != nil { - return ResPutObject{}, fmt.Errorf("init writing on API client: %w", err) - } - - if wObj.WriteHeader(ctx, prm.hdr) { - sz := prm.hdr.PayloadSize() - - if data := prm.hdr.Payload(); len(data) > 0 { - if prm.payload != nil { - prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload) - } else { - prm.payload = bytes.NewReader(data) - sz = uint64(len(data)) - } - } - - if prm.payload != nil { - if sz == 0 || sz > prm.bufferMaxSize { - sz = prm.bufferMaxSize - } - - buf := make([]byte, sz) - - var n int - - for { - n, err = prm.payload.Read(buf) - if n > 0 { - start = time.Now() - successWrite := wObj.WritePayloadChunk(ctx, buf[:n]) - c.incRequests(time.Since(start), methodObjectPut) - if !successWrite { - break - } - - continue - } - - if errors.Is(err, io.EOF) { - break - } - - return ResPutObject{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err)) - } - } - } - - res, err := wObj.Close(ctx) - var st apistatus.Status - if res != nil { - st = res.Status() - } - if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors - return ResPutObject{}, fmt.Errorf("client failure: %w", err) - } - - return ResPutObject{ - ObjectID: res.StoredObjectID(), - Epoch: res.StoredEpoch(), - }, nil -} - -func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) { - putInitPrm := PrmObjectPutClientCutInit{ - PrmObjectPut: prm, - } - - start := time.Now() - wObj, err := c.objectPutInitTransformer(putInitPrm) - c.incRequests(time.Since(start), methodObjectPut) - if err = c.handleError(ctx, nil, err); err != nil { - return ResPutObject{}, fmt.Errorf("init writing on API client: %w", err) - } - - if wObj.WriteHeader(ctx, prm.hdr) { - sz := prm.hdr.PayloadSize() - - if data := prm.hdr.Payload(); len(data) > 0 { - if prm.payload != nil { - prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload) - } else { - prm.payload = bytes.NewReader(data) - sz = uint64(len(data)) - } - } - - if prm.payload != nil { - if sz == 0 || sz > prm.bufferMaxSize { - sz = prm.bufferMaxSize - } - - buf := make([]byte, sz) - - var n int - - for { - n, err = prm.payload.Read(buf) - if n > 0 { - start = time.Now() - successWrite := wObj.WritePayloadChunk(ctx, buf[:n]) - c.incRequests(time.Since(start), methodObjectPut) - if !successWrite { - break - } - - continue - } - - if errors.Is(err, io.EOF) { - break - } - - return ResPutObject{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err)) - } - } - } - - res, err := wObj.Close(ctx) - var st apistatus.Status - if res != nil { - st = res.Status - } - if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors - return ResPutObject{}, fmt.Errorf("client failure: %w", err) - } - - return ResPutObject{ - ObjectID: res.OID, - Epoch: res.Epoch, - }, nil -} - -// objectDelete invokes sdkClient.ObjectDelete parse response status to error. -func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) error { - cl, err := c.getClient() - if err != nil { - return err - } - - cnr := prm.addr.Container() - obj := prm.addr.Object() - - cliPrm := sdkClient.PrmObjectDelete{ - BearerToken: prm.btoken, - Session: prm.stoken, - ContainerID: &cnr, - ObjectID: &obj, - Key: prm.key, - } - - start := time.Now() - res, err := cl.ObjectDelete(ctx, cliPrm) - c.incRequests(time.Since(start), methodObjectDelete) - var st apistatus.Status - if res != nil { - st = res.Status() - } - if err = c.handleError(ctx, st, err); err != nil { - return fmt.Errorf("delete object on client: %w", err) - } - return nil -} - -// objectGet returns reader for object. -func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) { - cl, err := c.getClient() - if err != nil { - return ResGetObject{}, err - } - - prmCnr := prm.addr.Container() - prmObj := prm.addr.Object() - - cliPrm := sdkClient.PrmObjectGet{ - BearerToken: prm.btoken, - Session: prm.stoken, - ContainerID: &prmCnr, - ObjectID: &prmObj, - Key: prm.key, - } - - var res ResGetObject - - rObj, err := cl.ObjectGetInit(ctx, cliPrm) - if err = c.handleError(ctx, nil, err); err != nil { - return ResGetObject{}, fmt.Errorf("init object reading on client: %w", err) - } - - start := time.Now() - successReadHeader := rObj.ReadHeader(&res.Header) - c.incRequests(time.Since(start), methodObjectGet) - if !successReadHeader { - rObjRes, err := rObj.Close() - var st apistatus.Status - if rObjRes != nil { - st = rObjRes.Status() - } - err = c.handleError(ctx, st, err) - return res, fmt.Errorf("read header: %w", err) - } - - res.Payload = &objectReadCloser{ - reader: rObj, - elapsedTimeCallback: func(elapsed time.Duration) { - c.incRequests(elapsed, methodObjectGet) - }, - } - - return res, nil -} - -// objectHead invokes sdkClient.ObjectHead parse response status to error and return result as is. -func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (object.Object, error) { - cl, err := c.getClient() - if err != nil { - return object.Object{}, err - } - - prmCnr := prm.addr.Container() - prmObj := prm.addr.Object() - - cliPrm := sdkClient.PrmObjectHead{ - BearerToken: prm.btoken, - Session: prm.stoken, - Raw: prm.raw, - ContainerID: &prmCnr, - ObjectID: &prmObj, - Key: prm.key, - } - - var obj object.Object - - start := time.Now() - res, err := cl.ObjectHead(ctx, cliPrm) - c.incRequests(time.Since(start), methodObjectHead) - var st apistatus.Status - if res != nil { - st = res.Status() - } - if err = c.handleError(ctx, st, err); err != nil { - return obj, fmt.Errorf("read object header via client: %w", err) - } - if !res.ReadHeader(&obj) { - return obj, errors.New("missing object header in response") - } - - return obj, nil -} - -// objectRange returns object range reader. -func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) { - cl, err := c.getClient() - if err != nil { - return ResObjectRange{}, err - } - - prmCnr := prm.addr.Container() - prmObj := prm.addr.Object() - - cliPrm := sdkClient.PrmObjectRange{ - BearerToken: prm.btoken, - Session: prm.stoken, - ContainerID: &prmCnr, - ObjectID: &prmObj, - Offset: prm.off, - Length: prm.ln, - Key: prm.key, - } - - start := time.Now() - res, err := cl.ObjectRangeInit(ctx, cliPrm) - c.incRequests(time.Since(start), methodObjectRange) - if err = c.handleError(ctx, nil, err); err != nil { - return ResObjectRange{}, fmt.Errorf("init payload range reading on client: %w", err) - } - - return ResObjectRange{ - payload: res, - elapsedTimeCallback: func(elapsed time.Duration) { - c.incRequests(elapsed, methodObjectRange) - }, - }, nil -} - -// objectSearch invokes sdkClient.ObjectSearchInit parse response status to error and return result as is. -func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) { - cl, err := c.getClient() - if err != nil { - return ResObjectSearch{}, err - } - - cliPrm := sdkClient.PrmObjectSearch{ - ContainerID: &prm.cnrID, - Filters: prm.filters, - Session: prm.stoken, - BearerToken: prm.btoken, - Key: prm.key, - } - - res, err := cl.ObjectSearchInit(ctx, cliPrm) - if err = c.handleError(ctx, nil, err); err != nil { - return ResObjectSearch{}, fmt.Errorf("init object searching on client: %w", err) - } - - return ResObjectSearch{r: res, handleError: c.handleError}, nil -} - -// sessionCreate invokes sdkClient.SessionCreate parse response status to error and return result as is. -func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession) (resCreateSession, error) { - cl, err := c.getClient() - if err != nil { - return resCreateSession{}, err - } - - cliPrm := sdkClient.PrmSessionCreate{ - Expiration: prm.exp, - Key: &prm.key, - } - - start := time.Now() - res, err := cl.SessionCreate(ctx, cliPrm) - c.incRequests(time.Since(start), methodSessionCreate) - var st apistatus.Status - if res != nil { - st = res.Status() - } - if err = c.handleError(ctx, st, err); err != nil { - return resCreateSession{}, fmt.Errorf("session creation on client: %w", err) - } - - return resCreateSession{ - id: res.ID(), - sessionKey: res.PublicKey(), - }, nil -} - -func (c *clientStatusMonitor) isHealthy() bool { - return c.healthy.Load() == statusHealthy -} - -func (c *clientStatusMonitor) setHealthy() { - c.healthy.Store(statusHealthy) -} - -func (c *clientStatusMonitor) setUnhealthy() { - c.healthy.Store(statusUnhealthyOnRequest) -} - -func (c *clientStatusMonitor) address() string { - return c.addr -} - -func (c *clientStatusMonitor) incErrorRate() { - c.mu.Lock() - c.currentErrorCount++ - c.overallErrorCount++ - - thresholdReached := c.currentErrorCount >= c.errorThreshold - if thresholdReached { - c.setUnhealthy() - c.currentErrorCount = 0 - } - c.mu.Unlock() - - if thresholdReached { - c.log(zapcore.WarnLevel, "error threshold reached", - zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold)) - } -} - -func (c *clientStatusMonitor) incErrorRateToUnhealthy(err error) { - c.mu.Lock() - c.currentErrorCount = 0 - c.overallErrorCount++ - c.setUnhealthy() - c.mu.Unlock() - - c.log(zapcore.WarnLevel, "explicitly mark node unhealthy", zap.String("address", c.addr), zap.Error(err)) -} - -func (c *clientStatusMonitor) log(level zapcore.Level, msg string, fields ...zap.Field) { - if c.logger == nil { - return - } - - c.logger.Log(level, msg, fields...) -} - -func (c *clientStatusMonitor) currentErrorRate() uint32 { - c.mu.RLock() - defer c.mu.RUnlock() - return c.currentErrorCount -} - -func (c *clientStatusMonitor) overallErrorRate() uint64 { - c.mu.RLock() - defer c.mu.RUnlock() - return c.overallErrorCount -} - -func (c *clientStatusMonitor) methodsStatus() []StatusSnapshot { - result := make([]StatusSnapshot, len(c.methods)) - for i, val := range c.methods { - result[i] = val.Snapshot() - } - - return result -} - -func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) { - methodStat := c.methods[method] - methodStat.IncRequests(elapsed) - if c.prm.poolRequestInfoCallback != nil { - c.prm.poolRequestInfoCallback(RequestInfo{ - Address: c.prm.address, - Method: method, - Elapsed: elapsed, - }) - } -} - -func (c *clientWrapper) close() error { - if !c.isDialed() { - return nil - } - if cl := c.getClientRaw(); cl != nil { - return cl.Close() - } - return nil -} - -func (c *clientWrapper) scheduleGracefulClose() { - cl := c.getClientRaw() - if cl == nil { - return - } - - time.AfterFunc(c.prm.gracefulCloseOnSwitchTimeout, func() { - if err := cl.Close(); err != nil { - c.log(zap.DebugLevel, "close unhealthy client during rebalance", zap.String("address", c.address()), zap.Error(err)) - } - }) -} - -func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error { - if stErr := apistatus.ErrFromStatus(st); stErr != nil { - switch stErr.(type) { - case *apistatus.ServerInternal, - *apistatus.WrongMagicNumber, - *apistatus.SignatureVerification: - c.incErrorRate() - case *apistatus.NodeUnderMaintenance: - c.incErrorRateToUnhealthy(stErr) - } - - if err == nil { - err = stErr - } - - return err - } - - if err != nil { - if needCountError(ctx, err) { - if sdkClient.IsErrNodeUnderMaintenance(err) { - c.incErrorRateToUnhealthy(err) - } else { - c.incErrorRate() - } - } - - return err - } - - return nil -} - -func needCountError(ctx context.Context, err error) bool { - // non-status logic error that could be returned - // from the SDK client; should not be considered - // as a connection error - var siErr *object.SplitInfoError - if errors.As(err, &siErr) { - return false - } - var eiErr *object.ECInfoError - if errors.As(err, &eiErr) { - return false - } - - if ctx != nil && errors.Is(ctx.Err(), context.Canceled) { - return false - } - - return true -} - -// clientBuilder is a type alias of client constructors which open connection -// to the given endpoint. -type clientBuilder = func(endpoint string) client - -// RequestInfo groups info about pool request. -type RequestInfo struct { - Address string - Method MethodIndex - Elapsed time.Duration -} - // InitParameters contains values used to initialize connection Pool. type InitParameters struct { key *ecdsa.PrivateKey