Search service improvements #647

Merged
fyrchik merged 5 commits from fyrchik/frostfs-node:service-simplification-part6 into master 2024-09-04 19:51:03 +00:00
8 changed files with 155 additions and 178 deletions

View file

@ -3,6 +3,7 @@ package searchsvc
import ( import (
"context" "context"
"encoding/hex" "encoding/hex"
"fmt"
"sync" "sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -10,12 +11,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (exec *execCtx) executeOnContainer(ctx context.Context) { func (exec *execCtx) executeOnContainer(ctx context.Context) error {
if exec.isLocal() {
exec.log.Debug(logs.SearchReturnResultDirectly)
return
}
lookupDepth := exec.netmapLookupDepth() lookupDepth := exec.netmapLookupDepth()
exec.log.Debug(logs.TryingToExecuteInContainer, exec.log.Debug(logs.TryingToExecuteInContainer,
@ -23,13 +19,12 @@ func (exec *execCtx) executeOnContainer(ctx context.Context) {
) )
// initialize epoch number // initialize epoch number
ok := exec.initEpoch() if err := exec.initEpoch(); err != nil {
if !ok { return fmt.Errorf("%s: %w", logs.CouldNotGetCurrentEpochNumber, err)
return
} }
for { for {
if exec.processCurrentEpoch(ctx) { if err := exec.processCurrentEpoch(ctx); err != nil {
break break
} }
@ -44,18 +39,17 @@ func (exec *execCtx) executeOnContainer(ctx context.Context) {
exec.curProcEpoch-- exec.curProcEpoch--
} }
exec.status = statusOK return nil
exec.err = nil
} }
func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool { func (exec *execCtx) processCurrentEpoch(ctx context.Context) error {
exec.log.Debug(logs.ProcessEpoch, exec.log.Debug(logs.ProcessEpoch,
zap.Uint64("number", exec.curProcEpoch), zap.Uint64("number", exec.curProcEpoch),
) )
traverser, ok := exec.generateTraverser(exec.containerID()) traverser, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch)
if !ok { if err != nil {
return true return fmt.Errorf("%s: %w", logs.SearchCouldNotGenerateContainerTraverser, err)
} }
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
@ -91,12 +85,7 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
c, err := exec.svc.clientConstructor.get(info) c, err := exec.svc.clientConstructor.get(info)
if err != nil { if err != nil {
mtx.Lock() exec.log.Debug(logs.SearchCouldNotConstructRemoteNodeClient, zap.String("error", err.Error()))
exec.status = statusUndefined
exec.err = err
mtx.Unlock()
exec.log.Debug(logs.SearchCouldNotConstructRemoteNodeClient)
return return
} }
@ -109,13 +98,17 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
} }
mtx.Lock() mtx.Lock()
exec.writeIDList(ids) err = exec.writeIDList(ids)
mtx.Unlock() mtx.Unlock()
if err != nil {
exec.log.Debug(logs.SearchCouldNotWriteObjectIdentifiers, zap.String("error", err.Error()))
return
}
}(i) }(i)
} }
wg.Wait() wg.Wait()
} }
return false return nil
} }

View file

@ -1,8 +1,6 @@
package searchsvc package searchsvc
import ( import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -10,34 +8,16 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
type statusError struct {
status int
err error
}
type execCtx struct { type execCtx struct {
svc *Service svc *Service
prm Prm prm Prm
statusError
log *logger.Logger log *logger.Logger
curProcEpoch uint64 curProcEpoch uint64
} }
const (
statusUndefined int = iota
statusOK
)
func (exec *execCtx) prepare() {
if _, ok := exec.prm.writer.(*uniqueIDWriter); !ok {
exec.prm.writer = newUniqueAddressWriter(exec.prm.writer)
}
}
func (exec *execCtx) setLogger(l *logger.Logger) { func (exec *execCtx) setLogger(l *logger.Logger) {
exec.log = &logger.Logger{Logger: l.With( exec.log = &logger.Logger{Logger: l.With(
zap.String("request", "SEARCH"), zap.String("request", "SEARCH"),
@ -68,64 +48,24 @@ func (exec *execCtx) netmapLookupDepth() uint64 {
return exec.prm.common.NetmapLookupDepth() return exec.prm.common.NetmapLookupDepth()
} }
func (exec *execCtx) initEpoch() bool { func (exec *execCtx) initEpoch() error {
exec.curProcEpoch = exec.netmapEpoch() exec.curProcEpoch = exec.netmapEpoch()
if exec.curProcEpoch > 0 { if exec.curProcEpoch > 0 {
return true return nil
} }
e, err := exec.svc.currentEpochReceiver.currentEpoch() e, err := exec.svc.currentEpochReceiver.Epoch()
if err != nil {
return err
}
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug(logs.CouldNotGetCurrentEpochNumber,
zap.String("error", err.Error()),
)
return false
case err == nil:
exec.curProcEpoch = e exec.curProcEpoch = e
return true return nil
}
} }
func (exec *execCtx) generateTraverser(cnr cid.ID) (*placement.Traverser, bool) { func (exec *execCtx) writeIDList(ids []oid.ID) error {
t, err := exec.svc.traverserGenerator.generateTraverser(cnr, exec.curProcEpoch)
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug(logs.SearchCouldNotGenerateContainerTraverser,
zap.String("error", err.Error()),
)
return nil, false
case err == nil:
return t, true
}
}
func (exec *execCtx) writeIDList(ids []oid.ID) {
ids = exec.filterAllowedObjectIDs(ids) ids = exec.filterAllowedObjectIDs(ids)
err := exec.prm.writer.WriteIDs(ids) return exec.prm.writer.WriteIDs(ids)
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug(logs.SearchCouldNotWriteObjectIdentifiers,
zap.String("error", err.Error()),
)
case err == nil:
exec.status = statusOK
exec.err = nil
}
} }
func (exec *execCtx) filterAllowedObjectIDs(objIDs []oid.ID) []oid.ID { func (exec *execCtx) filterAllowedObjectIDs(objIDs []oid.ID) []oid.ID {

View file

@ -2,24 +2,22 @@ package searchsvc
import ( import (
"context" "context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"go.uber.org/zap" "go.uber.org/zap"
) )
func (exec *execCtx) executeLocal(ctx context.Context) { func (exec *execCtx) executeLocal(ctx context.Context) error {
ids, err := exec.svc.localStorage.search(ctx, exec) ids, err := exec.svc.localStorage.search(ctx, exec)
if err != nil { if err != nil {
exec.status = statusUndefined exec.log.Debug(logs.SearchLocalOperationFailed, zap.String("error", err.Error()))
exec.err = err return err
exec.log.Debug(logs.SearchLocalOperationFailed,
zap.String("error", err.Error()),
)
return
} }
exec.writeIDList(ids) if err := exec.writeIDList(ids); err != nil {
return fmt.Errorf("%s: %w", logs.SearchCouldNotWriteObjectIdentifiers, err)
}
return nil
} }

View file

@ -12,7 +12,7 @@ import (
// Prm groups parameters of Get service call. // Prm groups parameters of Get service call.
type Prm struct { type Prm struct {
writer IDListWriter writer *uniqueIDWriter
common *util.CommonPrm common *util.CommonPrm
@ -40,7 +40,7 @@ func (p *Prm) SetCommonParameters(common *util.CommonPrm) {
// SetWriter sets target component to write list of object identifiers. // SetWriter sets target component to write list of object identifiers.
func (p *Prm) SetWriter(w IDListWriter) { func (p *Prm) SetWriter(w IDListWriter) {
p.writer = w p.writer = newUniqueAddressWriter(w)
} }
// SetRequestForwarder sets callback for forwarding // SetRequestForwarder sets callback for forwarding

View file

@ -14,37 +14,32 @@ func (s *Service) Search(ctx context.Context, prm Prm) error {
prm: prm, prm: prm,
} }
exec.prepare()
exec.setLogger(s.log) exec.setLogger(s.log)
exec.execute(ctx) return exec.execute(ctx)
return exec.statusError.err
} }
func (exec *execCtx) execute(ctx context.Context) { func (exec *execCtx) execute(ctx context.Context) error {
exec.log.Debug(logs.ServingRequest) exec.log.Debug(logs.ServingRequest)
// perform local operation err := exec.executeLocal(ctx)
exec.executeLocal(ctx) exec.logResult(err)
exec.analyzeStatus(ctx, true) if exec.isLocal() {
exec.log.Debug(logs.SearchReturnResultDirectly)
return err
} }
func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) { err = exec.executeOnContainer(ctx)
// analyze local result exec.logResult(err)
switch exec.status { return err
}
func (exec *execCtx) logResult(err error) {
switch {
default: default:
exec.log.Debug(logs.OperationFinishedWithError, exec.log.Debug(logs.OperationFinishedWithError, zap.String("error", err.Error()))
zap.String("error", exec.err.Error()), case err == nil:
)
case statusOK:
exec.log.Debug(logs.OperationFinishedSuccessfully) exec.log.Debug(logs.OperationFinishedSuccessfully)
} }
if execCnr {
exec.executeOnContainer(ctx)
exec.analyzeStatus(ctx, false)
}
} }

View file

@ -22,6 +22,7 @@ import (
frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa" frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
sessionsdk "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" sessionsdk "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -56,10 +57,18 @@ type simpleIDWriter struct {
type testEpochReceiver uint64 type testEpochReceiver uint64
func (e testEpochReceiver) currentEpoch() (uint64, error) { func (e testEpochReceiver) Epoch() (uint64, error) {
return uint64(e), nil return uint64(e), nil
} }
type errIDWriter struct {
err error
}
func (e errIDWriter) WriteIDs(ids []oid.ID) error {
return e.err
}
func (s *simpleIDWriter) WriteIDs(ids []oid.ID) error { func (s *simpleIDWriter) WriteIDs(ids []oid.ID) error {
s.ids = append(s.ids, ids...) s.ids = append(s.ids, ids...)
return nil return nil
@ -71,7 +80,7 @@ func newTestStorage() *testStorage {
} }
} }
func (g *testTraverserGenerator) generateTraverser(_ cid.ID, epoch uint64) (*placement.Traverser, error) { func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, error) {
return placement.NewTraverser( return placement.NewTraverser(
placement.ForContainer(g.c), placement.ForContainer(g.c),
placement.UseBuilder(g.b[epoch]), placement.UseBuilder(g.b[epoch]),
@ -194,6 +203,20 @@ func TestGetLocalOnly(t *testing.T) {
w := new(simpleIDWriter) w := new(simpleIDWriter)
p := newPrm(cnr, w) p := newPrm(cnr, w)
err := svc.Search(ctx, p)
require.ErrorIs(t, err, testErr)
})
t.Run("FAIL while writing ID", func(t *testing.T) {
storage := newTestStorage()
svc := newSvc(storage)
cnr := cidtest.ID()
storage.addResult(cnr, []oid.ID{oidtest.ID()}, nil)
testErr := errors.New("any error")
w := errIDWriter{testErr}
p := newPrm(cnr, w)
err := svc.Search(ctx, p) err := svc.Search(ctx, p)
require.ErrorIs(t, err, testErr) require.ErrorIs(t, err, testErr)
}) })
@ -280,7 +303,6 @@ func TestGetRemoteSmall(t *testing.T) {
return p return p
} }
t.Run("OK", func(t *testing.T) {
var addr oid.Address var addr oid.Address
addr.SetContainer(id) addr.SetContainer(id)
@ -294,11 +316,9 @@ func TestGetRemoteSmall(t *testing.T) {
c1 := newTestStorage() c1 := newTestStorage()
ids1 := generateIDs(10) ids1 := generateIDs(10)
c1.addResult(id, ids1, nil)
c2 := newTestStorage() c2 := newTestStorage()
ids2 := generateIDs(10) ids2 := generateIDs(10)
c2.addResult(id, ids2, nil)
svc := newSvc(builder, &testClientCache{ svc := newSvc(builder, &testClientCache{
clients: map[string]*testStorage{ clients: map[string]*testStorage{
@ -307,6 +327,10 @@ func TestGetRemoteSmall(t *testing.T) {
}, },
}) })
t.Run("OK", func(t *testing.T) {
c1.addResult(id, ids1, nil)
c2.addResult(id, ids2, nil)
w := new(simpleIDWriter) w := new(simpleIDWriter)
p := newPrm(id, w) p := newPrm(id, w)
@ -319,6 +343,49 @@ func TestGetRemoteSmall(t *testing.T) {
require.Contains(t, w.ids, id) require.Contains(t, w.ids, id)
} }
}) })
t.Run("non-local fail is not a FAIL", func(t *testing.T) {
testErr := errors.New("opaque")
c1.addResult(id, ids1, nil)
c2.addResult(id, nil, testErr)
w := new(simpleIDWriter)
p := newPrm(id, w)
err := svc.Search(ctx, p)
require.NoError(t, err)
require.Equal(t, ids1, w.ids)
})
t.Run("client init fail is not a FAIL", func(t *testing.T) {
svc := newSvc(builder, &testClientCache{
clients: map[string]*testStorage{
as[0][0]: c1,
},
})
c1.addResult(id, ids1, nil)
c2.addResult(id, ids2, nil)
w := new(simpleIDWriter)
p := newPrm(id, w)
err := svc.Search(ctx, p)
require.NoError(t, err)
require.Equal(t, ids1, w.ids)
})
t.Run("context is respected", func(t *testing.T) {
c1.addResult(id, ids1, nil)
c2.addResult(id, ids2, nil)
w := new(simpleIDWriter)
p := newPrm(id, w)
ctx, cancel := context.WithCancel(context.Background())
cancel()
err := svc.Search(ctx, p)
require.NoError(t, err)
require.Empty(t, w.ids)
})
} }
func TestGetFromPastEpoch(t *testing.T) { func TestGetFromPastEpoch(t *testing.T) {

View file

@ -45,11 +45,11 @@ type cfg struct {
} }
traverserGenerator interface { traverserGenerator interface {
generateTraverser(cid.ID, uint64) (*placement.Traverser, error) GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
} }
currentEpochReceiver interface { currentEpochReceiver interface {
currentEpoch() (uint64, error) Epoch() (uint64, error)
} }
keyStore *util.KeyStorage keyStore *util.KeyStorage
@ -71,10 +71,8 @@ func New(e *engine.StorageEngine,
localStorage: &storageEngineWrapper{ localStorage: &storageEngineWrapper{
storage: e, storage: e,
}, },
traverserGenerator: (*traverseGeneratorWrapper)(tg), traverserGenerator: tg,
currentEpochReceiver: &nmSrcWrapper{ currentEpochReceiver: ns,
nmSrc: ns,
},
keyStore: ks, keyStore: ks,
} }

View file

@ -5,12 +5,9 @@ import (
"sync" "sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
) )
@ -34,13 +31,10 @@ type storageEngineWrapper struct {
storage *engine.StorageEngine storage *engine.StorageEngine
} }
type traverseGeneratorWrapper util.TraverserGenerator func newUniqueAddressWriter(w IDListWriter) *uniqueIDWriter {
if w, ok := w.(*uniqueIDWriter); ok {
type nmSrcWrapper struct { return w
nmSrc netmap.Source
} }
func newUniqueAddressWriter(w IDListWriter) IDListWriter {
return &uniqueIDWriter{ return &uniqueIDWriter{
written: make(map[oid.ID]struct{}), written: make(map[oid.ID]struct{}),
writer: w, writer: w,
@ -139,11 +133,3 @@ func idsFromAddresses(addrs []oid.Address) []oid.ID {
return ids return ids
} }
func (e *traverseGeneratorWrapper) generateTraverser(cnr cid.ID, epoch uint64) (*placement.Traverser, error) {
return (*util.TraverserGenerator)(e).GenerateTraverser(cnr, nil, epoch)
}
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
return n.nmSrc.Epoch()
}