blockfetcher: add retry for GET and SEARCH operations

Close #3564

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
Ekaterina Pavlova 2024-11-26 00:25:38 +03:00
parent 9fa07d8d6d
commit 92ff8409a9

View file

@ -44,6 +44,18 @@ const (
defaultHealthcheckTimeout = 10 * time.Second defaultHealthcheckTimeout = 10 * time.Second
) )
// Constants related to retry mechanism.
const (
// maxRetries is the maximum number of retries for a single operation.
maxRetries = 5
// initialBackoff is the initial backoff duration.
initialBackoff = 500 * time.Millisecond
// backoffFactor is the factor by which the backoff duration is multiplied.
backoffFactor = 2
// maxBackoff is the maximum backoff duration.
maxBackoff = 20 * time.Second
)
// Ledger is an interface to Blockchain sufficient for Service. // Ledger is an interface to Blockchain sufficient for Service.
type Ledger interface { type Ledger interface {
GetConfig() config.Blockchain GetConfig() config.Blockchain
@ -215,7 +227,9 @@ func (bfs *Service) oidDownloader() {
} }
var force bool var force bool
if err != nil { if err != nil {
bfs.log.Error("NeoFS BlockFetcher service: OID downloading routine failed", zap.Error(err)) if !isContextCanceledErr(err) {
bfs.log.Error("NeoFS BlockFetcher service: OID downloading routine failed", zap.Error(err))
}
force = true force = true
} }
// Stop the service since there's nothing to do anymore. // Stop the service since there's nothing to do anymore.
@ -489,21 +503,64 @@ func (bfs *Service) IsActive() bool {
return bfs.isActive.Load() return bfs.isActive.Load()
} }
// retry function with exponential backoff.
func (bfs *Service) retry(action func() error) error {
var (
err error
backoff = initialBackoff
timer = time.NewTimer(0)
)
defer func() {
if !timer.Stop() {
<-timer.C
}
}()
for i := range maxRetries {
if err = action(); err == nil {
return nil
}
if i == maxRetries-1 {
break
}
timer.Reset(backoff)
select {
case <-timer.C:
case <-bfs.ctx.Done():
return bfs.ctx.Err()
}
backoff *= time.Duration(backoffFactor)
if backoff > maxBackoff {
backoff = maxBackoff
}
}
return err
}
func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) { func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) {
u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.cfg.ContainerID, oid)) u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.cfg.ContainerID, oid))
if err != nil { if err != nil {
return nil, err return nil, err
} }
rc, err := neofs.GetWithClient(ctx, bfs.pool, bfs.account.PrivateKey(), u, false) var rc io.ReadCloser
if err != nil { err = bfs.retry(func() error {
return nil, err rc, err = neofs.GetWithClient(ctx, bfs.pool, bfs.account.PrivateKey(), u, false)
} return err
})
return rc, nil return rc, err
} }
func (bfs *Service) objectSearch(ctx context.Context, prm client.PrmObjectSearch) ([]oid.ID, error) { func (bfs *Service) objectSearch(ctx context.Context, prm client.PrmObjectSearch) ([]oid.ID, error) {
return neofs.ObjectSearch(ctx, bfs.pool, bfs.account.PrivateKey(), bfs.cfg.ContainerID, prm) var (
oids []oid.ID
err error
)
err = bfs.retry(func() error {
oids, err = neofs.ObjectSearch(ctx, bfs.pool, bfs.account.PrivateKey(), bfs.cfg.ContainerID, prm)
return err
})
return oids, err
} }
// isContextCanceledErr returns whether error is a wrapped [context.Canceled]. // isContextCanceledErr returns whether error is a wrapped [context.Canceled].