Search service improvements #647
8 changed files with 155 additions and 178 deletions
|
@ -3,6 +3,7 @@ package searchsvc
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
@ -10,12 +11,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (exec *execCtx) executeOnContainer(ctx context.Context) {
|
func (exec *execCtx) executeOnContainer(ctx context.Context) error {
|
||||||
if exec.isLocal() {
|
|
||||||
exec.log.Debug(logs.SearchReturnResultDirectly)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
lookupDepth := exec.netmapLookupDepth()
|
lookupDepth := exec.netmapLookupDepth()
|
||||||
|
|
||||||
exec.log.Debug(logs.TryingToExecuteInContainer,
|
exec.log.Debug(logs.TryingToExecuteInContainer,
|
||||||
|
@ -23,13 +19,12 @@ func (exec *execCtx) executeOnContainer(ctx context.Context) {
|
||||||
)
|
)
|
||||||
|
|
||||||
// initialize epoch number
|
// initialize epoch number
|
||||||
ok := exec.initEpoch()
|
if err := exec.initEpoch(); err != nil {
|
||||||
if !ok {
|
return fmt.Errorf("%s: %w", logs.CouldNotGetCurrentEpochNumber, err)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if exec.processCurrentEpoch(ctx) {
|
if err := exec.processCurrentEpoch(ctx); err != nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,18 +39,17 @@ func (exec *execCtx) executeOnContainer(ctx context.Context) {
|
||||||
exec.curProcEpoch--
|
exec.curProcEpoch--
|
||||||
}
|
}
|
||||||
|
|
||||||
exec.status = statusOK
|
return nil
|
||||||
exec.err = nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
|
func (exec *execCtx) processCurrentEpoch(ctx context.Context) error {
|
||||||
exec.log.Debug(logs.ProcessEpoch,
|
exec.log.Debug(logs.ProcessEpoch,
|
||||||
zap.Uint64("number", exec.curProcEpoch),
|
zap.Uint64("number", exec.curProcEpoch),
|
||||||
)
|
)
|
||||||
|
|
||||||
traverser, ok := exec.generateTraverser(exec.containerID())
|
traverser, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch)
|
||||||
if !ok {
|
if err != nil {
|
||||||
return true
|
return fmt.Errorf("%s: %w", logs.SearchCouldNotGenerateContainerTraverser, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
@ -91,12 +85,7 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
|
||||||
|
|
||||||
c, err := exec.svc.clientConstructor.get(info)
|
c, err := exec.svc.clientConstructor.get(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
mtx.Lock()
|
exec.log.Debug(logs.SearchCouldNotConstructRemoteNodeClient, zap.String("error", err.Error()))
|
||||||
exec.status = statusUndefined
|
|
||||||
exec.err = err
|
|
||||||
mtx.Unlock()
|
|
||||||
|
|
||||||
exec.log.Debug(logs.SearchCouldNotConstructRemoteNodeClient)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,13 +98,17 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
mtx.Lock()
|
mtx.Lock()
|
||||||
exec.writeIDList(ids)
|
err = exec.writeIDList(ids)
|
||||||
mtx.Unlock()
|
mtx.Unlock()
|
||||||
|
if err != nil {
|
||||||
|
exec.log.Debug(logs.SearchCouldNotWriteObjectIdentifiers, zap.String("error", err.Error()))
|
||||||
|
return
|
||||||
|
}
|
||||||
}(i)
|
}(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
return false
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,6 @@
|
||||||
package searchsvc
|
package searchsvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
@ -10,34 +8,16 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type statusError struct {
|
|
||||||
status int
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
type execCtx struct {
|
type execCtx struct {
|
||||||
svc *Service
|
svc *Service
|
||||||
|
|
||||||
prm Prm
|
prm Prm
|
||||||
|
|
||||||
statusError
|
|
||||||
|
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
|
|
||||||
curProcEpoch uint64
|
curProcEpoch uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
|
||||||
statusUndefined int = iota
|
|
||||||
statusOK
|
|
||||||
)
|
|
||||||
|
|
||||||
func (exec *execCtx) prepare() {
|
|
||||||
if _, ok := exec.prm.writer.(*uniqueIDWriter); !ok {
|
|
||||||
exec.prm.writer = newUniqueAddressWriter(exec.prm.writer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec *execCtx) setLogger(l *logger.Logger) {
|
func (exec *execCtx) setLogger(l *logger.Logger) {
|
||||||
exec.log = &logger.Logger{Logger: l.With(
|
exec.log = &logger.Logger{Logger: l.With(
|
||||||
zap.String("request", "SEARCH"),
|
zap.String("request", "SEARCH"),
|
||||||
|
@ -68,64 +48,24 @@ func (exec *execCtx) netmapLookupDepth() uint64 {
|
||||||
return exec.prm.common.NetmapLookupDepth()
|
return exec.prm.common.NetmapLookupDepth()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *execCtx) initEpoch() bool {
|
func (exec *execCtx) initEpoch() error {
|
||||||
exec.curProcEpoch = exec.netmapEpoch()
|
exec.curProcEpoch = exec.netmapEpoch()
|
||||||
if exec.curProcEpoch > 0 {
|
if exec.curProcEpoch > 0 {
|
||||||
return true
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
e, err := exec.svc.currentEpochReceiver.currentEpoch()
|
e, err := exec.svc.currentEpochReceiver.Epoch()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
switch {
|
|
||||||
default:
|
|
||||||
exec.status = statusUndefined
|
|
||||||
exec.err = err
|
|
||||||
|
|
||||||
exec.log.Debug(logs.CouldNotGetCurrentEpochNumber,
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
|
|
||||||
return false
|
|
||||||
case err == nil:
|
|
||||||
exec.curProcEpoch = e
|
exec.curProcEpoch = e
|
||||||
return true
|
return nil
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *execCtx) generateTraverser(cnr cid.ID) (*placement.Traverser, bool) {
|
func (exec *execCtx) writeIDList(ids []oid.ID) error {
|
||||||
t, err := exec.svc.traverserGenerator.generateTraverser(cnr, exec.curProcEpoch)
|
|
||||||
|
|
||||||
switch {
|
|
||||||
default:
|
|
||||||
exec.status = statusUndefined
|
|
||||||
exec.err = err
|
|
||||||
|
|
||||||
exec.log.Debug(logs.SearchCouldNotGenerateContainerTraverser,
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
|
|
||||||
return nil, false
|
|
||||||
case err == nil:
|
|
||||||
return t, true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec *execCtx) writeIDList(ids []oid.ID) {
|
|
||||||
ids = exec.filterAllowedObjectIDs(ids)
|
ids = exec.filterAllowedObjectIDs(ids)
|
||||||
err := exec.prm.writer.WriteIDs(ids)
|
return exec.prm.writer.WriteIDs(ids)
|
||||||
|
|
||||||
switch {
|
|
||||||
default:
|
|
||||||
exec.status = statusUndefined
|
|
||||||
exec.err = err
|
|
||||||
|
|
||||||
exec.log.Debug(logs.SearchCouldNotWriteObjectIdentifiers,
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
case err == nil:
|
|
||||||
exec.status = statusOK
|
|
||||||
exec.err = nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *execCtx) filterAllowedObjectIDs(objIDs []oid.ID) []oid.ID {
|
func (exec *execCtx) filterAllowedObjectIDs(objIDs []oid.ID) []oid.ID {
|
||||||
|
|
|
@ -2,24 +2,22 @@ package searchsvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (exec *execCtx) executeLocal(ctx context.Context) {
|
func (exec *execCtx) executeLocal(ctx context.Context) error {
|
||||||
ids, err := exec.svc.localStorage.search(ctx, exec)
|
ids, err := exec.svc.localStorage.search(ctx, exec)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
exec.status = statusUndefined
|
exec.log.Debug(logs.SearchLocalOperationFailed, zap.String("error", err.Error()))
|
||||||
exec.err = err
|
return err
|
||||||
|
|
||||||
exec.log.Debug(logs.SearchLocalOperationFailed,
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
exec.writeIDList(ids)
|
if err := exec.writeIDList(ids); err != nil {
|
||||||
|
return fmt.Errorf("%s: %w", logs.SearchCouldNotWriteObjectIdentifiers, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,7 @@ import (
|
||||||
|
|
||||||
// Prm groups parameters of Get service call.
|
// Prm groups parameters of Get service call.
|
||||||
type Prm struct {
|
type Prm struct {
|
||||||
writer IDListWriter
|
writer *uniqueIDWriter
|
||||||
|
|
||||||
common *util.CommonPrm
|
common *util.CommonPrm
|
||||||
|
|
||||||
|
@ -40,7 +40,7 @@ func (p *Prm) SetCommonParameters(common *util.CommonPrm) {
|
||||||
|
|
||||||
// SetWriter sets target component to write list of object identifiers.
|
// SetWriter sets target component to write list of object identifiers.
|
||||||
func (p *Prm) SetWriter(w IDListWriter) {
|
func (p *Prm) SetWriter(w IDListWriter) {
|
||||||
p.writer = w
|
p.writer = newUniqueAddressWriter(w)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetRequestForwarder sets callback for forwarding
|
// SetRequestForwarder sets callback for forwarding
|
||||||
|
|
|
@ -14,37 +14,32 @@ func (s *Service) Search(ctx context.Context, prm Prm) error {
|
||||||
prm: prm,
|
prm: prm,
|
||||||
}
|
}
|
||||||
|
|
||||||
exec.prepare()
|
|
||||||
|
|
||||||
exec.setLogger(s.log)
|
exec.setLogger(s.log)
|
||||||
|
|
||||||
exec.execute(ctx)
|
return exec.execute(ctx)
|
||||||
|
|
||||||
return exec.statusError.err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *execCtx) execute(ctx context.Context) {
|
func (exec *execCtx) execute(ctx context.Context) error {
|
||||||
exec.log.Debug(logs.ServingRequest)
|
exec.log.Debug(logs.ServingRequest)
|
||||||
|
|
||||||
// perform local operation
|
err := exec.executeLocal(ctx)
|
||||||
exec.executeLocal(ctx)
|
exec.logResult(err)
|
||||||
|
|
||||||
exec.analyzeStatus(ctx, true)
|
if exec.isLocal() {
|
||||||
|
exec.log.Debug(logs.SearchReturnResultDirectly)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) {
|
err = exec.executeOnContainer(ctx)
|
||||||
// analyze local result
|
exec.logResult(err)
|
||||||
switch exec.status {
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (exec *execCtx) logResult(err error) {
|
||||||
|
switch {
|
||||||
default:
|
default:
|
||||||
exec.log.Debug(logs.OperationFinishedWithError,
|
exec.log.Debug(logs.OperationFinishedWithError, zap.String("error", err.Error()))
|
||||||
zap.String("error", exec.err.Error()),
|
case err == nil:
|
||||||
)
|
|
||||||
case statusOK:
|
|
||||||
exec.log.Debug(logs.OperationFinishedSuccessfully)
|
exec.log.Debug(logs.OperationFinishedSuccessfully)
|
||||||
}
|
}
|
||||||
|
|
||||||
if execCnr {
|
|
||||||
exec.executeOnContainer(ctx)
|
|
||||||
exec.analyzeStatus(ctx, false)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import (
|
||||||
frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
|
frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
sessionsdk "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
sessionsdk "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
@ -56,10 +57,18 @@ type simpleIDWriter struct {
|
||||||
|
|
||||||
type testEpochReceiver uint64
|
type testEpochReceiver uint64
|
||||||
|
|
||||||
func (e testEpochReceiver) currentEpoch() (uint64, error) {
|
func (e testEpochReceiver) Epoch() (uint64, error) {
|
||||||
return uint64(e), nil
|
return uint64(e), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type errIDWriter struct {
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e errIDWriter) WriteIDs(ids []oid.ID) error {
|
||||||
|
return e.err
|
||||||
|
}
|
||||||
|
|
||||||
func (s *simpleIDWriter) WriteIDs(ids []oid.ID) error {
|
func (s *simpleIDWriter) WriteIDs(ids []oid.ID) error {
|
||||||
s.ids = append(s.ids, ids...)
|
s.ids = append(s.ids, ids...)
|
||||||
return nil
|
return nil
|
||||||
|
@ -71,7 +80,7 @@ func newTestStorage() *testStorage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *testTraverserGenerator) generateTraverser(_ cid.ID, epoch uint64) (*placement.Traverser, error) {
|
func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, error) {
|
||||||
return placement.NewTraverser(
|
return placement.NewTraverser(
|
||||||
placement.ForContainer(g.c),
|
placement.ForContainer(g.c),
|
||||||
placement.UseBuilder(g.b[epoch]),
|
placement.UseBuilder(g.b[epoch]),
|
||||||
|
@ -194,6 +203,20 @@ func TestGetLocalOnly(t *testing.T) {
|
||||||
w := new(simpleIDWriter)
|
w := new(simpleIDWriter)
|
||||||
p := newPrm(cnr, w)
|
p := newPrm(cnr, w)
|
||||||
|
|
||||||
|
err := svc.Search(ctx, p)
|
||||||
|
require.ErrorIs(t, err, testErr)
|
||||||
|
})
|
||||||
|
t.Run("FAIL while writing ID", func(t *testing.T) {
|
||||||
|
storage := newTestStorage()
|
||||||
|
svc := newSvc(storage)
|
||||||
|
|
||||||
|
cnr := cidtest.ID()
|
||||||
|
storage.addResult(cnr, []oid.ID{oidtest.ID()}, nil)
|
||||||
|
|
||||||
|
testErr := errors.New("any error")
|
||||||
|
w := errIDWriter{testErr}
|
||||||
|
p := newPrm(cnr, w)
|
||||||
|
|
||||||
err := svc.Search(ctx, p)
|
err := svc.Search(ctx, p)
|
||||||
require.ErrorIs(t, err, testErr)
|
require.ErrorIs(t, err, testErr)
|
||||||
})
|
})
|
||||||
|
@ -280,7 +303,6 @@ func TestGetRemoteSmall(t *testing.T) {
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("OK", func(t *testing.T) {
|
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
addr.SetContainer(id)
|
addr.SetContainer(id)
|
||||||
|
|
||||||
|
@ -294,11 +316,9 @@ func TestGetRemoteSmall(t *testing.T) {
|
||||||
|
|
||||||
c1 := newTestStorage()
|
c1 := newTestStorage()
|
||||||
ids1 := generateIDs(10)
|
ids1 := generateIDs(10)
|
||||||
c1.addResult(id, ids1, nil)
|
|
||||||
|
|
||||||
c2 := newTestStorage()
|
c2 := newTestStorage()
|
||||||
ids2 := generateIDs(10)
|
ids2 := generateIDs(10)
|
||||||
c2.addResult(id, ids2, nil)
|
|
||||||
|
|
||||||
svc := newSvc(builder, &testClientCache{
|
svc := newSvc(builder, &testClientCache{
|
||||||
clients: map[string]*testStorage{
|
clients: map[string]*testStorage{
|
||||||
|
@ -307,6 +327,10 @@ func TestGetRemoteSmall(t *testing.T) {
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("OK", func(t *testing.T) {
|
||||||
|
c1.addResult(id, ids1, nil)
|
||||||
|
c2.addResult(id, ids2, nil)
|
||||||
|
|
||||||
w := new(simpleIDWriter)
|
w := new(simpleIDWriter)
|
||||||
|
|
||||||
p := newPrm(id, w)
|
p := newPrm(id, w)
|
||||||
|
@ -319,6 +343,49 @@ func TestGetRemoteSmall(t *testing.T) {
|
||||||
require.Contains(t, w.ids, id)
|
require.Contains(t, w.ids, id)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
t.Run("non-local fail is not a FAIL", func(t *testing.T) {
|
||||||
|
testErr := errors.New("opaque")
|
||||||
|
|
||||||
|
c1.addResult(id, ids1, nil)
|
||||||
|
c2.addResult(id, nil, testErr)
|
||||||
|
|
||||||
|
w := new(simpleIDWriter)
|
||||||
|
p := newPrm(id, w)
|
||||||
|
|
||||||
|
err := svc.Search(ctx, p)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, ids1, w.ids)
|
||||||
|
})
|
||||||
|
t.Run("client init fail is not a FAIL", func(t *testing.T) {
|
||||||
|
svc := newSvc(builder, &testClientCache{
|
||||||
|
clients: map[string]*testStorage{
|
||||||
|
as[0][0]: c1,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
c1.addResult(id, ids1, nil)
|
||||||
|
c2.addResult(id, ids2, nil)
|
||||||
|
|
||||||
|
w := new(simpleIDWriter)
|
||||||
|
p := newPrm(id, w)
|
||||||
|
|
||||||
|
err := svc.Search(ctx, p)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, ids1, w.ids)
|
||||||
|
})
|
||||||
|
t.Run("context is respected", func(t *testing.T) {
|
||||||
|
c1.addResult(id, ids1, nil)
|
||||||
|
c2.addResult(id, ids2, nil)
|
||||||
|
|
||||||
|
w := new(simpleIDWriter)
|
||||||
|
p := newPrm(id, w)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
err := svc.Search(ctx, p)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Empty(t, w.ids)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetFromPastEpoch(t *testing.T) {
|
func TestGetFromPastEpoch(t *testing.T) {
|
||||||
|
|
|
@ -45,11 +45,11 @@ type cfg struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
traverserGenerator interface {
|
traverserGenerator interface {
|
||||||
generateTraverser(cid.ID, uint64) (*placement.Traverser, error)
|
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
currentEpochReceiver interface {
|
currentEpochReceiver interface {
|
||||||
currentEpoch() (uint64, error)
|
Epoch() (uint64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
keyStore *util.KeyStorage
|
keyStore *util.KeyStorage
|
||||||
|
@ -71,10 +71,8 @@ func New(e *engine.StorageEngine,
|
||||||
localStorage: &storageEngineWrapper{
|
localStorage: &storageEngineWrapper{
|
||||||
storage: e,
|
storage: e,
|
||||||
},
|
},
|
||||||
traverserGenerator: (*traverseGeneratorWrapper)(tg),
|
traverserGenerator: tg,
|
||||||
currentEpochReceiver: &nmSrcWrapper{
|
currentEpochReceiver: ns,
|
||||||
nmSrc: ns,
|
|
||||||
},
|
|
||||||
keyStore: ks,
|
keyStore: ks,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,12 +5,9 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -34,13 +31,10 @@ type storageEngineWrapper struct {
|
||||||
storage *engine.StorageEngine
|
storage *engine.StorageEngine
|
||||||
}
|
}
|
||||||
|
|
||||||
type traverseGeneratorWrapper util.TraverserGenerator
|
func newUniqueAddressWriter(w IDListWriter) *uniqueIDWriter {
|
||||||
|
if w, ok := w.(*uniqueIDWriter); ok {
|
||||||
type nmSrcWrapper struct {
|
return w
|
||||||
nmSrc netmap.Source
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newUniqueAddressWriter(w IDListWriter) IDListWriter {
|
|
||||||
return &uniqueIDWriter{
|
return &uniqueIDWriter{
|
||||||
written: make(map[oid.ID]struct{}),
|
written: make(map[oid.ID]struct{}),
|
||||||
writer: w,
|
writer: w,
|
||||||
|
@ -139,11 +133,3 @@ func idsFromAddresses(addrs []oid.Address) []oid.ID {
|
||||||
|
|
||||||
return ids
|
return ids
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *traverseGeneratorWrapper) generateTraverser(cnr cid.ID, epoch uint64) (*placement.Traverser, error) {
|
|
||||||
return (*util.TraverserGenerator)(e).GenerateTraverser(cnr, nil, epoch)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
|
|
||||||
return n.nmSrc.Epoch()
|
|
||||||
}
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue