forked from TrueCloudLab/frostfs-node
[#647] objsvc/search: Simplify error handling
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
56f841b022
commit
966ad22abf
4 changed files with 51 additions and 117 deletions
|
@ -3,6 +3,7 @@ package searchsvc
|
|||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
|
@ -10,12 +11,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (exec *execCtx) executeOnContainer(ctx context.Context) {
|
||||
if exec.isLocal() {
|
||||
exec.log.Debug(logs.SearchReturnResultDirectly)
|
||||
return
|
||||
}
|
||||
|
||||
func (exec *execCtx) executeOnContainer(ctx context.Context) error {
|
||||
lookupDepth := exec.netmapLookupDepth()
|
||||
|
||||
exec.log.Debug(logs.TryingToExecuteInContainer,
|
||||
|
@ -23,13 +19,12 @@ func (exec *execCtx) executeOnContainer(ctx context.Context) {
|
|||
)
|
||||
|
||||
// initialize epoch number
|
||||
ok := exec.initEpoch()
|
||||
if !ok {
|
||||
return
|
||||
if err := exec.initEpoch(); err != nil {
|
||||
return fmt.Errorf("%s: %w", logs.CouldNotGetCurrentEpochNumber, err)
|
||||
}
|
||||
|
||||
for {
|
||||
if exec.processCurrentEpoch(ctx) {
|
||||
if err := exec.processCurrentEpoch(ctx); err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
|
@ -44,18 +39,17 @@ func (exec *execCtx) executeOnContainer(ctx context.Context) {
|
|||
exec.curProcEpoch--
|
||||
}
|
||||
|
||||
exec.status = statusOK
|
||||
exec.err = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
|
||||
func (exec *execCtx) processCurrentEpoch(ctx context.Context) error {
|
||||
exec.log.Debug(logs.ProcessEpoch,
|
||||
zap.Uint64("number", exec.curProcEpoch),
|
||||
)
|
||||
|
||||
traverser, ok := exec.generateTraverser(exec.containerID())
|
||||
if !ok {
|
||||
return true
|
||||
traverser, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: %w", logs.SearchCouldNotGenerateContainerTraverser, err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
@ -91,12 +85,7 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
|
|||
|
||||
c, err := exec.svc.clientConstructor.get(info)
|
||||
if err != nil {
|
||||
mtx.Lock()
|
||||
exec.status = statusUndefined
|
||||
exec.err = err
|
||||
mtx.Unlock()
|
||||
|
||||
exec.log.Debug(logs.SearchCouldNotConstructRemoteNodeClient)
|
||||
exec.log.Debug(logs.SearchCouldNotConstructRemoteNodeClient, zap.String("error", err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -109,13 +98,17 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
|
|||
}
|
||||
|
||||
mtx.Lock()
|
||||
exec.writeIDList(ids)
|
||||
err = exec.writeIDList(ids)
|
||||
mtx.Unlock()
|
||||
if err != nil {
|
||||
exec.log.Debug(logs.SearchCouldNotWriteObjectIdentifiers, zap.String("error", err.Error()))
|
||||
return
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
package searchsvc
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -10,28 +8,16 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type statusError struct {
|
||||
status int
|
||||
err error
|
||||
}
|
||||
|
||||
type execCtx struct {
|
||||
svc *Service
|
||||
|
||||
prm Prm
|
||||
|
||||
statusError
|
||||
|
||||
log *logger.Logger
|
||||
|
||||
curProcEpoch uint64
|
||||
}
|
||||
|
||||
const (
|
||||
statusUndefined int = iota
|
||||
statusOK
|
||||
)
|
||||
|
||||
func (exec *execCtx) prepare() {
|
||||
if _, ok := exec.prm.writer.(*uniqueIDWriter); !ok {
|
||||
exec.prm.writer = newUniqueAddressWriter(exec.prm.writer)
|
||||
|
@ -68,64 +54,24 @@ func (exec *execCtx) netmapLookupDepth() uint64 {
|
|||
return exec.prm.common.NetmapLookupDepth()
|
||||
}
|
||||
|
||||
func (exec *execCtx) initEpoch() bool {
|
||||
func (exec *execCtx) initEpoch() error {
|
||||
exec.curProcEpoch = exec.netmapEpoch()
|
||||
if exec.curProcEpoch > 0 {
|
||||
return true
|
||||
return nil
|
||||
}
|
||||
|
||||
e, err := exec.svc.currentEpochReceiver.Epoch()
|
||||
|
||||
switch {
|
||||
default:
|
||||
exec.status = statusUndefined
|
||||
exec.err = err
|
||||
|
||||
exec.log.Debug(logs.CouldNotGetCurrentEpochNumber,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return false
|
||||
case err == nil:
|
||||
exec.curProcEpoch = e
|
||||
return true
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
exec.curProcEpoch = e
|
||||
return nil
|
||||
}
|
||||
|
||||
func (exec *execCtx) generateTraverser(cnr cid.ID) (*placement.Traverser, bool) {
|
||||
t, err := exec.svc.traverserGenerator.GenerateTraverser(cnr, nil, exec.curProcEpoch)
|
||||
|
||||
switch {
|
||||
default:
|
||||
exec.status = statusUndefined
|
||||
exec.err = err
|
||||
|
||||
exec.log.Debug(logs.SearchCouldNotGenerateContainerTraverser,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return nil, false
|
||||
case err == nil:
|
||||
return t, true
|
||||
}
|
||||
}
|
||||
|
||||
func (exec *execCtx) writeIDList(ids []oid.ID) {
|
||||
func (exec *execCtx) writeIDList(ids []oid.ID) error {
|
||||
ids = exec.filterAllowedObjectIDs(ids)
|
||||
err := exec.prm.writer.WriteIDs(ids)
|
||||
|
||||
switch {
|
||||
default:
|
||||
exec.status = statusUndefined
|
||||
exec.err = err
|
||||
|
||||
exec.log.Debug(logs.SearchCouldNotWriteObjectIdentifiers,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
case err == nil:
|
||||
exec.status = statusOK
|
||||
exec.err = nil
|
||||
}
|
||||
return exec.prm.writer.WriteIDs(ids)
|
||||
}
|
||||
|
||||
func (exec *execCtx) filterAllowedObjectIDs(objIDs []oid.ID) []oid.ID {
|
||||
|
|
|
@ -2,24 +2,22 @@ package searchsvc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (exec *execCtx) executeLocal(ctx context.Context) {
|
||||
func (exec *execCtx) executeLocal(ctx context.Context) error {
|
||||
ids, err := exec.svc.localStorage.search(ctx, exec)
|
||||
|
||||
if err != nil {
|
||||
exec.status = statusUndefined
|
||||
exec.err = err
|
||||
|
||||
exec.log.Debug(logs.SearchLocalOperationFailed,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
exec.log.Debug(logs.SearchLocalOperationFailed, zap.String("error", err.Error()))
|
||||
return err
|
||||
}
|
||||
|
||||
exec.writeIDList(ids)
|
||||
if err := exec.writeIDList(ids); err != nil {
|
||||
return fmt.Errorf("%s: %w", logs.SearchCouldNotWriteObjectIdentifiers, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -18,33 +18,30 @@ func (s *Service) Search(ctx context.Context, prm Prm) error {
|
|||
|
||||
exec.setLogger(s.log)
|
||||
|
||||
exec.execute(ctx)
|
||||
|
||||
return exec.statusError.err
|
||||
return exec.execute(ctx)
|
||||
}
|
||||
|
||||
func (exec *execCtx) execute(ctx context.Context) {
|
||||
func (exec *execCtx) execute(ctx context.Context) error {
|
||||
exec.log.Debug(logs.ServingRequest)
|
||||
|
||||
// perform local operation
|
||||
exec.executeLocal(ctx)
|
||||
err := exec.executeLocal(ctx)
|
||||
exec.logResult(err)
|
||||
|
||||
exec.analyzeStatus(ctx, true)
|
||||
if exec.isLocal() {
|
||||
exec.log.Debug(logs.SearchReturnResultDirectly)
|
||||
return err
|
||||
}
|
||||
|
||||
err = exec.executeOnContainer(ctx)
|
||||
exec.logResult(err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) {
|
||||
// analyze local result
|
||||
switch exec.status {
|
||||
func (exec *execCtx) logResult(err error) {
|
||||
switch {
|
||||
default:
|
||||
exec.log.Debug(logs.OperationFinishedWithError,
|
||||
zap.String("error", exec.err.Error()),
|
||||
)
|
||||
case statusOK:
|
||||
exec.log.Debug(logs.OperationFinishedWithError, zap.String("error", err.Error()))
|
||||
case err == nil:
|
||||
exec.log.Debug(logs.OperationFinishedSuccessfully)
|
||||
}
|
||||
|
||||
if execCnr {
|
||||
exec.executeOnContainer(ctx)
|
||||
exec.analyzeStatus(ctx, false)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue