[#648] objsvc/delete: Handle errors in Go style

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2023-08-25 10:57:31 +03:00 committed by Evgenii Stratonikov
parent d2084ece41
commit f8ba60aa0c
4 changed files with 62 additions and 181 deletions

View file

@ -82,19 +82,12 @@ const (
PersistentCouldNotCleanUpExpiredTokens = "could not clean up expired tokens" PersistentCouldNotCleanUpExpiredTokens = "could not clean up expired tokens"
ControllerReportIsAlreadyStarted = "report is already started" ControllerReportIsAlreadyStarted = "report is already started"
TombstoneCouldNotGetTheTombstoneTheSource = "tombstone getter: could not get the tombstone the source" TombstoneCouldNotGetTheTombstoneTheSource = "tombstone getter: could not get the tombstone the source"
DeleteCouldNotComposeSplitInfo = "could not compose split info"
DeleteNoSplitInfoObjectIsPHY = "no split info, object is PHY" DeleteNoSplitInfoObjectIsPHY = "no split info, object is PHY"
DeleteAssemblingChain = "assembling chain..." DeleteAssemblingChain = "assembling chain..."
DeleteCouldNotGetPreviousSplitElement = "could not get previous split element"
DeleteCollectingChildren = "collecting children..." DeleteCollectingChildren = "collecting children..."
DeleteCouldNotCollectObjectChildren = "could not collect object children"
DeleteSupplementBySplitID = "supplement by split ID" DeleteSupplementBySplitID = "supplement by split ID"
DeleteCouldNotSearchForSplitChainMembers = "could not search for split chain members"
DeleteCouldNotMarshalTombstoneStructure = "could not marshal tombstone structure"
DeleteCouldNotSaveTheTombstone = "could not save the tombstone"
DeleteFormingTombstoneStructure = "forming tombstone structure..." DeleteFormingTombstoneStructure = "forming tombstone structure..."
DeleteTombstoneStructureSuccessfullyFormedSaving = "tombstone structure successfully formed, saving..." DeleteTombstoneStructureSuccessfullyFormedSaving = "tombstone structure successfully formed, saving..."
DeleteCouldNotReadTombstoneLifetimeConfig = "could not read tombstone lifetime config"
DeleteFormingSplitInfo = "forming split info..." DeleteFormingSplitInfo = "forming split info..."
DeleteSplitInfoSuccessfullyFormedCollectingMembers = "split info successfully formed, collecting members..." DeleteSplitInfoSuccessfullyFormedCollectingMembers = "split info successfully formed, collecting members..."
DeleteMembersSuccessfullyCollected = "members successfully collected" DeleteMembersSuccessfullyCollected = "members successfully collected"

View file

@ -29,27 +29,17 @@ func (s *Service) Delete(ctx context.Context, prm Prm) error {
exec.setLogger(s.log) exec.setLogger(s.log)
exec.execute(ctx) return exec.execute(ctx)
return exec.statusError.err
} }
func (exec *execCtx) execute(ctx context.Context) { func (exec *execCtx) execute(ctx context.Context) error {
exec.log.Debug(logs.ServingRequest) exec.log.Debug(logs.ServingRequest)
// perform local operation if err := exec.executeLocal(ctx); err != nil {
exec.executeLocal(ctx) exec.log.Debug(logs.OperationFinishedWithError, zap.String("error", err.Error()))
exec.analyzeStatus() return err
} }
func (exec *execCtx) analyzeStatus() {
// analyze local result
switch exec.status {
case statusOK:
exec.log.Debug(logs.OperationFinishedSuccessfully) exec.log.Debug(logs.OperationFinishedSuccessfully)
default: return nil
exec.log.Debug(logs.OperationFinishedWithError,
zap.String("error", exec.err.Error()),
)
}
} }

View file

@ -2,6 +2,7 @@ package deletesvc
import ( import (
"context" "context"
"fmt"
"strconv" "strconv"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
@ -15,18 +16,11 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
type statusError struct {
status int
err error
}
type execCtx struct { type execCtx struct {
svc *Service svc *Service
prm Prm prm Prm
statusError
log *logger.Logger log *logger.Logger
tombstone *objectSDK.Tombstone tombstone *objectSDK.Tombstone
@ -36,11 +30,6 @@ type execCtx struct {
tombstoneObj *objectSDK.Object tombstoneObj *objectSDK.Object
} }
const (
statusUndefined int = iota
statusOK
)
func (exec *execCtx) setLogger(l *logger.Logger) { func (exec *execCtx) setLogger(l *logger.Logger) {
exec.log = &logger.Logger{Logger: l.With( exec.log = &logger.Logger{Logger: l.With(
zap.String("request", "DELETE"), zap.String("request", "DELETE"),
@ -75,48 +64,34 @@ func (exec *execCtx) newAddress(id oid.ID) oid.Address {
return a return a
} }
func (exec *execCtx) formSplitInfo(ctx context.Context) bool { func (exec *execCtx) formSplitInfo(ctx context.Context) error {
success := false
var err error var err error
exec.splitInfo, err = exec.svc.header.splitInfo(ctx, exec) exec.splitInfo, err = exec.svc.header.splitInfo(ctx, exec)
if err != nil && !apiclient.IsErrObjectAlreadyRemoved(err) {
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug(logs.DeleteCouldNotComposeSplitInfo,
zap.String("error", err.Error()),
)
case err == nil, apiclient.IsErrObjectAlreadyRemoved(err):
// IsErrObjectAlreadyRemoved check is required because splitInfo // IsErrObjectAlreadyRemoved check is required because splitInfo
// implicitly performs Head request that may return ObjectAlreadyRemoved // implicitly performs Head request that may return ObjectAlreadyRemoved
// status that is not specified for Delete // status that is not specified for Delete.
return err
exec.status = statusOK
exec.err = nil
success = true
} }
return success return nil
} }
func (exec *execCtx) collectMembers(ctx context.Context) (ok bool) { func (exec *execCtx) collectMembers(ctx context.Context) error {
if exec.splitInfo == nil { if exec.splitInfo == nil {
exec.log.Debug(logs.DeleteNoSplitInfoObjectIsPHY) exec.log.Debug(logs.DeleteNoSplitInfoObjectIsPHY)
return true return nil
} }
var err error
if _, withLink := exec.splitInfo.Link(); withLink { if _, withLink := exec.splitInfo.Link(); withLink {
ok = exec.collectChildren(ctx) err = exec.collectChildren(ctx)
} }
if !ok { if err != nil {
if _, withLast := exec.splitInfo.LastPart(); withLast { if _, withLast := exec.splitInfo.LastPart(); withLast {
ok = exec.collectChain(ctx) if err := exec.collectChain(ctx); err != nil {
if !ok { return err
return
} }
} }
} // may be fail if neither right nor linking ID is set? } // may be fail if neither right nor linking ID is set?
@ -124,7 +99,7 @@ func (exec *execCtx) collectMembers(ctx context.Context) (ok bool) {
return exec.supplementBySplitID(ctx) return exec.supplementBySplitID(ctx)
} }
func (exec *execCtx) collectChain(ctx context.Context) bool { func (exec *execCtx) collectChain(ctx context.Context) error {
var chain []oid.ID var chain []oid.ID
exec.log.Debug(logs.DeleteAssemblingChain) exec.log.Debug(logs.DeleteAssemblingChain)
@ -133,84 +108,43 @@ func (exec *execCtx) collectChain(ctx context.Context) bool {
chain = append(chain, prev) chain = append(chain, prev)
p, err := exec.svc.header.previous(ctx, exec, prev) p, err := exec.svc.header.previous(ctx, exec, prev)
if err != nil {
switch { return fmt.Errorf("get previous split element for %s: %w", prev, err)
default: }
exec.status = statusUndefined
exec.err = err
exec.log.Debug(logs.DeleteCouldNotGetPreviousSplitElement,
zap.Stringer("id", prev),
zap.String("error", err.Error()),
)
return false
case err == nil:
exec.status = statusOK
exec.err = nil
withPrev = p != nil withPrev = p != nil
if withPrev { if withPrev {
prev = *p prev = *p
} }
} }
}
exec.addMembers(chain) exec.addMembers(chain)
return nil
return true
} }
func (exec *execCtx) collectChildren(ctx context.Context) bool { func (exec *execCtx) collectChildren(ctx context.Context) error {
exec.log.Debug(logs.DeleteCollectingChildren) exec.log.Debug(logs.DeleteCollectingChildren)
children, err := exec.svc.header.children(ctx, exec) children, err := exec.svc.header.children(ctx, exec)
if err != nil {
switch { return fmt.Errorf("collect children: %w", err)
default: }
exec.status = statusUndefined
exec.err = err
exec.log.Debug(logs.DeleteCouldNotCollectObjectChildren,
zap.String("error", err.Error()),
)
return false
case err == nil:
exec.status = statusOK
exec.err = nil
link, _ := exec.splitInfo.Link() link, _ := exec.splitInfo.Link()
exec.addMembers(append(children, link)) exec.addMembers(append(children, link))
return nil
return true
}
} }
func (exec *execCtx) supplementBySplitID(ctx context.Context) bool { func (exec *execCtx) supplementBySplitID(ctx context.Context) error {
exec.log.Debug(logs.DeleteSupplementBySplitID) exec.log.Debug(logs.DeleteSupplementBySplitID)
chain, err := exec.svc.searcher.splitMembers(ctx, exec) chain, err := exec.svc.searcher.splitMembers(ctx, exec)
if err != nil {
switch { return fmt.Errorf("search split chain members: %w", err)
default: }
exec.status = statusUndefined
exec.err = err
exec.log.Debug(logs.DeleteCouldNotSearchForSplitChainMembers,
zap.String("error", err.Error()),
)
return false
case err == nil:
exec.status = statusOK
exec.err = nil
exec.addMembers(chain) exec.addMembers(chain)
return nil
return true
}
} }
func (exec *execCtx) addMembers(incoming []oid.ID) { func (exec *execCtx) addMembers(incoming []oid.ID) {
@ -228,17 +162,10 @@ func (exec *execCtx) addMembers(incoming []oid.ID) {
exec.tombstone.SetMembers(append(members, incoming...)) exec.tombstone.SetMembers(append(members, incoming...))
} }
func (exec *execCtx) initTombstoneObject() bool { func (exec *execCtx) initTombstoneObject() error {
payload, err := exec.tombstone.Marshal() payload, err := exec.tombstone.Marshal()
if err != nil { if err != nil {
exec.status = statusUndefined return fmt.Errorf("marshal tombstone: %w", err)
exec.err = err
exec.log.Debug(logs.DeleteCouldNotMarshalTombstoneStructure,
zap.String("error", err.Error()),
)
return false
} }
exec.tombstoneObj = objectSDK.New() exec.tombstoneObj = objectSDK.New()
@ -262,29 +189,15 @@ func (exec *execCtx) initTombstoneObject() bool {
exec.tombstoneObj.SetAttributes(a) exec.tombstoneObj.SetAttributes(a)
return true return nil
} }
func (exec *execCtx) saveTombstone(ctx context.Context) bool { func (exec *execCtx) saveTombstone(ctx context.Context) error {
id, err := exec.svc.placer.put(ctx, exec) id, err := exec.svc.placer.put(ctx, exec)
if err != nil {
switch { return fmt.Errorf("save tombstone: %w", err)
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug(logs.DeleteCouldNotSaveTheTombstone,
zap.String("error", err.Error()),
)
return false
case err == nil:
exec.status = statusOK
exec.err = nil
exec.prm.tombAddrWriter.
SetAddress(exec.newAddress(*id))
} }
return true exec.prm.tombAddrWriter.SetAddress(exec.newAddress(*id))
return nil
} }

View file

@ -2,37 +2,29 @@ package deletesvc
import ( import (
"context" "context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
) )
func (exec *execCtx) executeLocal(ctx context.Context) { func (exec *execCtx) executeLocal(ctx context.Context) error {
exec.log.Debug(logs.DeleteFormingTombstoneStructure) exec.log.Debug(logs.DeleteFormingTombstoneStructure)
ok := exec.formTombstone(ctx) if err := exec.formTombstone(ctx); err != nil {
if !ok { return err
return
} }
exec.log.Debug(logs.DeleteTombstoneStructureSuccessfullyFormedSaving) exec.log.Debug(logs.DeleteTombstoneStructureSuccessfullyFormedSaving)
exec.saveTombstone(ctx) return exec.saveTombstone(ctx)
} }
func (exec *execCtx) formTombstone(ctx context.Context) (ok bool) { func (exec *execCtx) formTombstone(ctx context.Context) error {
tsLifetime, err := exec.svc.netInfo.TombstoneLifetime() tsLifetime, err := exec.svc.netInfo.TombstoneLifetime()
if err != nil { if err != nil {
exec.status = statusUndefined return fmt.Errorf("fetch tombstone lifetime: %w", err)
exec.err = err
exec.log.Debug(logs.DeleteCouldNotReadTombstoneLifetimeConfig,
zap.String("error", err.Error()),
)
return false
} }
exec.tombstone = objectSDK.NewTombstone() exec.tombstone = objectSDK.NewTombstone()
@ -43,26 +35,19 @@ func (exec *execCtx) formTombstone(ctx context.Context) (ok bool) {
exec.log.Debug(logs.DeleteFormingSplitInfo) exec.log.Debug(logs.DeleteFormingSplitInfo)
ok = exec.formSplitInfo(ctx) if err := exec.formSplitInfo(ctx); err != nil {
if !ok { return fmt.Errorf("form split info: %w", err)
return
} }
exec.log.Debug(logs.DeleteSplitInfoSuccessfullyFormedCollectingMembers) exec.log.Debug(logs.DeleteSplitInfoSuccessfullyFormedCollectingMembers)
exec.tombstone.SetSplitID(exec.splitInfo.SplitID()) exec.tombstone.SetSplitID(exec.splitInfo.SplitID())
ok = exec.collectMembers(ctx) if err := exec.collectMembers(ctx); err != nil {
if !ok { return err
return
} }
exec.log.Debug(logs.DeleteMembersSuccessfullyCollected) exec.log.Debug(logs.DeleteMembersSuccessfullyCollected)
ok = exec.initTombstoneObject() return exec.initTombstoneObject()
if !ok {
return
}
return true
} }