frostfs-node/pkg/services/object/search/container.go

122 lines
2.3 KiB
Go
Raw Normal View History

package searchsvc
import (
"context"
"encoding/hex"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"go.uber.org/zap"
)
// executeOnContainer runs the search on the container nodes.
//
// Nothing is done for local requests. Otherwise the epoch is initialized via
// exec.initEpoch and processed with processCurrentEpoch; while that call
// returns false, the previous epoch is tried, going back at most
// exec.netmapLookupDepth() epochs and never below epoch 0.
//
// Once the epoch walk finishes, exec.status is set to statusOK and exec.err is
// cleared. If exec.initEpoch fails, the method returns without touching them.
func (exec *execCtx) executeOnContainer(ctx context.Context) {
	if exec.isLocal() {
		exec.log.Debug(logs.SearchReturnResultDirectly)
		return
	}

	lookupDepth := exec.netmapLookupDepth()

	exec.log.Debug(logs.SearchTryingToExecuteInContainer,
		zap.Uint64("netmap lookup depth", lookupDepth),
	)

	// initialize epoch number
	if ok := exec.initEpoch(); !ok {
		return
	}

	for {
		if exec.processCurrentEpoch(ctx) {
			break
		}

		// Stop when the maximum depth has been reached or when there is no
		// earlier epoch to look at: the epoch counter is unsigned, so
		// decrementing it at zero would wrap around to the maximum value.
		if lookupDepth == 0 || exec.curProcEpoch == 0 {
			break
		}

		lookupDepth--

		// go to the previous epoch
		exec.curProcEpoch--
	}

	exec.status = statusOK
	exec.err = nil
}
// processCurrentEpoch performs the search on the container nodes selected for
// exec.curProcEpoch.
//
// Nodes are requested from the traverser batch by batch. Every node of a batch
// is queried in its own goroutine, and the next batch is requested only after
// all goroutines of the current one have finished. Object IDs returned by a
// node are handed to exec.writeIDList under a mutex.
//
// It returns true only when the traverser could not be generated, and false
// once the traverser has no more nodes to offer.
func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
	exec.log.Debug(logs.SearchProcessEpoch, zap.Uint64("number", exec.curProcEpoch))

	placement, built := exec.generateTraverser(exec.containerID())
	if !built {
		return true
	}

	ctx, stop := context.WithCancel(ctx)
	defer stop()

	for batch := placement.Next(); len(batch) != 0; batch = placement.Next() {
		var (
			pending sync.WaitGroup
			guard   sync.Mutex
		)

		// queryNode searches on the idx-th node of the current batch.
		queryNode := func(idx int) {
			defer pending.Done()

			// Skip the node if the context is already done.
			if ctxErr := ctx.Err(); ctxErr != nil {
				exec.log.Debug(logs.SearchInterruptPlacementIterationByContext,
					zap.String("error", ctxErr.Error()))
				return
			}

			var nodeInfo client.NodeInfo
			client.NodeInfoFromNetmapElement(&nodeInfo, batch[idx])

			exec.log.Debug(logs.SearchProcessingNode,
				zap.String("key", hex.EncodeToString(batch[idx].PublicKey())))

			remote, err := exec.svc.clientConstructor.get(nodeInfo)
			if err != nil {
				// status and err are shared between the goroutines of the batch
				guard.Lock()
				exec.status = statusUndefined
				exec.err = err
				guard.Unlock()

				exec.log.Debug(logs.SearchCouldNotConstructRemoteNodeClient)
				return
			}

			found, err := remote.searchObjects(ctx, exec, nodeInfo)
			if err != nil {
				exec.log.Debug(logs.SearchRemoteOperationFailed,
					zap.String("error", err.Error()))
				return
			}

			guard.Lock()
			exec.writeIDList(found)
			guard.Unlock()
		}

		pending.Add(len(batch))
		for idx := range batch {
			go queryNode(idx)
		}

		pending.Wait()
	}

	exec.log.Debug(logs.SearchNoMoreNodesAbortPlacementIteration)

	return false
}