WIP: Process EC container for Get/GetRange/Head concurrently #1237
8 changed files with 28 additions and 30 deletions
|
@ -58,21 +58,11 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool {
|
|||
zap.Uint64("number", r.curProcEpoch),
|
||||
)
|
||||
|
||||
traverser, ok := r.generateTraverser(r.address())
|
||||
traverser, cnr, ok := r.generateTraverser(r.address())
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
|
||||
// In most cases, the value from the cache will be returned.
|
||||
// Container appears in the cache when traverser is generated.
|
||||
cnr, err := r.containerSource.Get(r.address().Container())
|
||||
if err != nil {
|
||||
r.status = statusUndefined
|
||||
r.err = err
|
||||
r.log.Debug(logs.GetCouldNotGetContainer, zap.Error(err))
|
||||
return true
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
|
|
|
@ -79,7 +79,7 @@ func newTestStorage() *testStorage {
|
|||
}
|
||||
}
|
||||
|
||||
func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, error) {
|
||||
func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, *containerCore.Container, error) {
|
||||
opts := make([]placement.Option, 0, 4)
|
||||
opts = append(opts,
|
||||
placement.ForContainer(g.c),
|
||||
|
@ -91,7 +91,10 @@ func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e ui
|
|||
opts = append(opts, placement.ForObject(*obj))
|
||||
}
|
||||
|
||||
return placement.NewTraverser(opts...)
|
||||
t, err := placement.NewTraverser(opts...)
|
||||
return t, &containerCore.Container{
|
||||
Value: g.c,
|
||||
}, err
|
||||
}
|
||||
|
||||
func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
|
||||
|
|
|
@ -138,22 +138,19 @@ func (r *request) initEpoch() bool {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
|
||||
func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, *container.Container, bool) {
|
||||
obj := addr.Object()
|
||||
|
||||
t, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch)
|
||||
|
||||
switch {
|
||||
default:
|
||||
t, cnr, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch)
|
||||
if err != nil {
|
||||
r.status = statusUndefined
|
||||
r.err = err
|
||||
|
||||
r.log.Debug(logs.GetCouldNotGenerateContainerTraverser, zap.Error(err))
|
||||
|
||||
return nil, false
|
||||
case err == nil:
|
||||
return t, true
|
||||
return nil, nil, false
|
||||
}
|
||||
return t, cnr, true
|
||||
}
|
||||
|
||||
func (r *request) getRemoteStorage(info clientcore.NodeInfo) (remoteStorage, bool) {
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"errors"
|
||||
|
||||
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
|
@ -23,7 +24,7 @@ type epochSource interface {
|
|||
}
|
||||
|
||||
type traverserGenerator interface {
|
||||
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
|
||||
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error)
|
||||
}
|
||||
|
||||
type keyStorage interface {
|
||||
|
|
|
@ -47,7 +47,7 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) error {
|
|||
zap.Uint64("number", exec.curProcEpoch),
|
||||
)
|
||||
|
||||
traverser, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch)
|
||||
traverser, _, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: %w", logs.SearchCouldNotGenerateContainerTraverser, err)
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
|
@ -80,12 +81,13 @@ func newTestStorage() *testStorage {
|
|||
}
|
||||
}
|
||||
|
||||
func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, error) {
|
||||
return placement.NewTraverser(
|
||||
func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, *containerCore.Container, error) {
|
||||
t, err := placement.NewTraverser(
|
||||
placement.ForContainer(g.c),
|
||||
placement.UseBuilder(g.b[epoch]),
|
||||
placement.WithoutSuccessTracking(),
|
||||
)
|
||||
return t, &containerCore.Container{Value: g.c}, err
|
||||
}
|
||||
|
||||
func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
|
@ -45,7 +46,7 @@ type cfg struct {
|
|||
}
|
||||
|
||||
traverserGenerator interface {
|
||||
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
|
||||
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error)
|
||||
}
|
||||
|
||||
currentEpochReceiver interface {
|
||||
|
|
|
@ -122,17 +122,17 @@ func (g *TraverserGenerator) WithTraverseOptions(opts ...placement.Option) *Trav
|
|||
|
||||
// GenerateTraverser generates placement Traverser for provided object address
|
||||
// using epoch-th network map.
|
||||
func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, error) {
|
||||
func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, *container.Container, error) {
|
||||
// get network map by epoch
|
||||
nm, err := g.netMapSrc.GetNetMapByEpoch(epoch)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get network map #%d: %w", epoch, err)
|
||||
return nil, nil, fmt.Errorf("could not get network map #%d: %w", epoch, err)
|
||||
}
|
||||
|
||||
// get container related container
|
||||
cnr, err := g.cnrSrc.Get(idCnr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get container: %w", err)
|
||||
return nil, nil, fmt.Errorf("could not get container: %w", err)
|
||||
}
|
||||
|
||||
// allocate placement traverser options
|
||||
|
@ -160,5 +160,9 @@ func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoc
|
|||
)
|
||||
}
|
||||
|
||||
return placement.NewTraverser(traverseOpts...)
|
||||
t, err := placement.NewTraverser(traverseOpts...)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return t, cnr, nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue