frostfs-node/pkg/services/object/search/container.go
Alexander Chuprov 9b113c3156
Some checks failed
DCO action / DCO (pull_request) Successful in 59s
Vulncheck / Vulncheck (pull_request) Successful in 1m4s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m55s
Build / Build Components (pull_request) Successful in 2m4s
Tests and linters / Staticcheck (pull_request) Successful in 2m38s
Tests and linters / Lint (pull_request) Successful in 3m16s
Tests and linters / Run gofumpt (pull_request) Successful in 3m54s
Tests and linters / Tests (pull_request) Successful in 4m12s
Tests and linters / gopls check (pull_request) Successful in 4m31s
Tests and linters / Tests with -race (pull_request) Successful in 4m38s
OCI image / Build container images (push) Failing after 18s
Vulncheck / Vulncheck (push) Successful in 1m2s
Pre-commit hooks / Pre-commit (push) Successful in 1m39s
Build / Build Components (push) Successful in 1m45s
Tests and linters / Staticcheck (push) Successful in 2m18s
Tests and linters / Run gofumpt (push) Successful in 2m46s
Tests and linters / Lint (push) Successful in 3m5s
Tests and linters / Tests with -race (push) Successful in 3m23s
Tests and linters / Tests (push) Successful in 3m52s
Tests and linters / gopls check (push) Successful in 4m18s
[#1613] morph: Add tracing for morph queries to neo-go
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-02-05 16:38:20 +03:00

124 lines
2.7 KiB
Go

package searchsvc
import (
"context"
"encoding/hex"
"fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"go.uber.org/zap"
)
// executeOnContainer initializes the epoch via initEpoch and then processes
// exec.curProcEpoch, stepping back one epoch at a time for at most
// netmapLookupDepth() additional epochs.
//
// It returns an error only when initEpoch fails. An error returned by
// processCurrentEpoch ends the walk but is not reported to the caller.
func (exec *execCtx) executeOnContainer(ctx context.Context) error {
	lookupDepth := exec.netmapLookupDepth()

	exec.log.Debug(ctx, logs.TryingToExecuteInContainer,
		zap.Uint64("netmap lookup depth", lookupDepth),
	)

	// initialize epoch number
	if err := exec.initEpoch(ctx); err != nil {
		return fmt.Errorf("%s: %w", logs.CouldNotGetCurrentEpochNumber, err)
	}

	for {
		if err := exec.processCurrentEpoch(ctx); err != nil {
			// NOTE(review): the error is dropped here without being logged
			// or returned; this looks like deliberate best-effort behaviour
			// (results already written are kept) — confirm.
			break
		}

		// Stop when the maximum depth has been reached, or when there is no
		// earlier epoch to step back to: curProcEpoch is an unsigned
		// integer, so decrementing it at zero would wrap around to the
		// maximum value instead of going negative.
		if lookupDepth == 0 || exec.curProcEpoch == 0 {
			break
		}

		lookupDepth--

		// go to the previous epoch
		exec.curProcEpoch--
	}

	return nil
}
// processCurrentEpoch runs the search on the nodes produced by a container
// traverser generated for exec.curProcEpoch and hands the identifiers found
// on each node to exec.writeIDList.
//
// The traverser yields nodes in batches. Every node of a batch is queried in
// its own goroutine, and the next batch is requested only after all
// goroutines of the current one have finished. Per-node failures (client
// construction, remote search, writing identifiers) are logged at debug level
// and do not stop the iteration.
//
// The only error returned is a failure to generate the container traverser.
func (exec *execCtx) processCurrentEpoch(ctx context.Context) error {
	exec.log.Debug(ctx, logs.ProcessEpoch,
		zap.Uint64("number", exec.curProcEpoch),
	)

	traverser, _, err := exec.svc.traverserGenerator.GenerateTraverser(ctx, exec.containerID(), nil, exec.curProcEpoch)
	if err != nil {
		return fmt.Errorf("%s: %w", logs.SearchCouldNotGenerateContainerTraverser, err)
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	for nodes := traverser.Next(); len(nodes) != 0; nodes = traverser.Next() {
		var (
			pending sync.WaitGroup
			// writeMtx serializes the writeIDList calls made by the
			// goroutines of this batch.
			writeMtx sync.Mutex
		)

		// searchNode queries the idx-th node of the current batch.
		searchNode := func(idx int) {
			defer pending.Done()

			// Skip the node if the context was cancelled before work began.
			if cause := ctx.Err(); cause != nil {
				exec.log.Debug(ctx, logs.InterruptPlacementIterationByContext,
					zap.Error(cause))
				return
			}

			var nodeInfo client.NodeInfo
			client.NodeInfoFromNetmapElement(&nodeInfo, nodes[idx])

			exec.log.Debug(ctx, logs.ProcessingNode, zap.String("key", hex.EncodeToString(nodes[idx].PublicKey())))

			remote, err := exec.svc.clientConstructor.get(nodeInfo)
			if err != nil {
				exec.log.Debug(ctx, logs.SearchCouldNotConstructRemoteNodeClient, zap.Error(err))
				return
			}

			ids, err := remote.searchObjects(ctx, exec, nodeInfo)
			if err != nil {
				exec.log.Debug(ctx, logs.SearchRemoteOperationFailed,
					zap.Error(err))
				return
			}

			writeMtx.Lock()
			err = exec.writeIDList(ids)
			writeMtx.Unlock()

			if err != nil {
				exec.log.Debug(ctx, logs.SearchCouldNotWriteObjectIdentifiers, zap.Error(err))
			}
		}

		for idx := range nodes {
			pending.Add(1)
			go searchNode(idx)
		}

		// Finish the whole batch before asking the traverser for more nodes.
		pending.Wait()
	}

	exec.log.Debug(ctx, logs.NoMoreNodesAbortPlacementIteration)

	return nil
}
// getContainer fetches the container identified by exec.containerID() from
// the service's container source and returns its Value field.
//
// On failure the source's error is returned unchanged together with a zero
// containerSDK.Container.
func (exec *execCtx) getContainer(ctx context.Context) (containerSDK.Container, error) {
	found, err := exec.svc.containerSource.Get(ctx, exec.containerID())
	if err != nil {
		var empty containerSDK.Container
		return empty, err
	}

	return found.Value, nil
}