frostfs-node/pkg/services/object/search/util.go

150 lines
3.5 KiB
Go
Raw Permalink Normal View History

package searchsvc
import (
"context"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
// uniqueIDWriter is an IDListWriter decorator that remembers every object ID
// it has already passed on and drops repeats before delegating to the
// wrapped writer.
type uniqueIDWriter struct {
// mtx guards access to written.
mtx sync.Mutex
// written is the set of object IDs that have already been forwarded.
written map[oid.ID]struct{}
// writer is the underlying destination that receives the deduplicated IDs.
writer IDListWriter
}
// clientConstructorWrapper wraps a ClientConstructor; its get method returns
// the constructed client wrapped in a clientWrapper.
type clientConstructorWrapper struct {
// constructor builds the multi-address client for a given node.
constructor ClientConstructor
}
// clientWrapper wraps a client.MultiAddressClient and exposes the
// searchObjects operation on top of it.
type clientWrapper struct {
// client is the connection used to reach the remote node.
client client.MultiAddressClient
}
// storageEngineWrapper wraps the local storage engine and exposes the search
// operation, which is implemented through the engine's Select call.
type storageEngineWrapper struct {
// storage is the engine that is queried for matching objects.
storage *engine.StorageEngine
}
// traverseGeneratorWrapper is a defined type over util.TraverserGenerator;
// its generateTraverser method converts back to the underlying type and
// delegates to GenerateTraverser.
type traverseGeneratorWrapper util.TraverserGenerator
// nmSrcWrapper wraps a netmap.Source and exposes its Epoch value through the
// currentEpoch method.
type nmSrcWrapper struct {
// nmSrc is the network map source that is asked for the epoch.
nmSrc netmap.Source
}
// newUniqueAddressWriter decorates w with a uniqueIDWriter that starts with
// an empty set of already-written IDs.
func newUniqueAddressWriter(w IDListWriter) IDListWriter {
	uw := new(uniqueIDWriter)
	uw.written = make(map[oid.ID]struct{})
	uw.writer = w

	return uw
}
// WriteIDs forwards to the wrapped writer only those IDs from list that have
// not been seen by this writer before, preserving their relative order.
// Every forwarded ID is recorded, so repeats within the same list or in
// later calls are dropped.
//
// The filtering is done in place: list's backing array is reused for the
// result, so the caller must not rely on the contents of list afterwards.
//
// It returns whatever error the wrapped writer returns.
func (w *uniqueIDWriter) WriteIDs(list []oid.ID) error {
	w.mtx.Lock()

	// Single-pass in-place filter. The write position never overtakes the
	// read position, so sharing the backing array with list is safe.
	fresh := list[:0]

	for _, id := range list {
		if _, seen := w.written[id]; seen {
			// exclude an already processed ID
			continue
		}

		// mark the ID as processed and keep it
		w.written[id] = struct{}{}
		fresh = append(fresh, id)
	}

	// Release the lock before the downstream write so that concurrent callers
	// are not blocked for its duration.
	w.mtx.Unlock()

	return w.writer.WriteIDs(fresh)
}
// get asks the wrapped constructor for a client to the node described by
// info and returns it wrapped as a searchClient. A constructor error is
// returned unchanged together with a nil client.
func (c *clientConstructorWrapper) get(info client.NodeInfo) (searchClient, error) {
	multiAddrClient, err := c.constructor.Get(info)
	if err != nil {
		return nil, err
	}

	wrapped := &clientWrapper{client: multiAddrClient}

	return wrapped, nil
}
// searchObjects performs the search described by exec through the wrapped
// client and returns the resulting list of object IDs.
//
// When the request parameters carry a forwarder, the whole call is delegated
// to it. Otherwise a private key is fetched from the service key store
// (looked up by the session token's ID and issuer when a session token is
// present, or with nil session info when it is not), the search parameters
// are filled from the request, and internalclient.SearchObjects is invoked.
// Any error from the key store or from the search is returned unchanged.
func (c *clientWrapper) searchObjects(ctx context.Context, exec *execCtx, info client.NodeInfo) ([]oid.ID, error) {
	if forward := exec.prm.forwarder; forward != nil {
		return forward(ctx, info, c.client)
	}

	// Session info stays nil unless the request carries a session token.
	var sess *util.SessionInfo

	if token := exec.prm.common.SessionToken(); token != nil {
		sess = new(util.SessionInfo)
		sess.ID = token.ID()
		sess.Owner = token.Issuer()
	}

	privKey, err := exec.svc.keyStore.GetKey(sess)
	if err != nil {
		return nil, err
	}

	var searchPrm internalclient.SearchObjectsPrm

	searchPrm.SetClient(c.client)
	searchPrm.SetPrivateKey(privKey)
	searchPrm.SetSessionToken(exec.prm.common.SessionToken())
	searchPrm.SetBearerToken(exec.prm.common.BearerToken())
	searchPrm.SetTTL(exec.prm.common.TTL())
	searchPrm.SetXHeaders(exec.prm.common.XHeaders())
	searchPrm.SetNetmapEpoch(exec.curProcEpoch)
	searchPrm.SetContainerID(exec.containerID())
	searchPrm.SetFilters(exec.searchFilters())

	result, err := internalclient.SearchObjects(ctx, searchPrm)
	if err != nil {
		return nil, err
	}

	return result.IDList(), nil
}
// search selects objects from the wrapped storage engine using the filters
// and container ID taken from exec, and returns the object IDs of the
// selected addresses. An error from the engine is returned unchanged.
func (e *storageEngineWrapper) search(exec *execCtx) ([]oid.ID, error) {
	var prm engine.SelectPrm

	prm.WithFilters(exec.searchFilters())
	prm.WithContainerID(exec.containerID())

	res, err := e.storage.Select(prm)
	if err != nil {
		return nil, err
	}

	addrs := res.AddressList()

	return idsFromAddresses(addrs), nil
}
// idsFromAddresses returns a new slice holding the object ID of every
// address in addrs, in the same order. The result is always non-nil and has
// the same length as addrs.
func idsFromAddresses(addrs []oid.Address) []oid.ID {
	ids := make([]oid.ID, 0, len(addrs))

	for _, addr := range addrs {
		ids = append(ids, addr.Object())
	}

	return ids
}
// generateTraverser converts the receiver back to util.TraverserGenerator
// and asks it for a placement traverser for container cnr at the given
// epoch, passing nil as the object argument.
func (e *traverseGeneratorWrapper) generateTraverser(cnr cid.ID, epoch uint64) (*placement.Traverser, error) {
	gen := (*util.TraverserGenerator)(e)

	return gen.GenerateTraverser(cnr, nil, epoch)
}
// currentEpoch returns the epoch reported by the wrapped network map source,
// along with any error that source produced.
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
	epoch, err := n.nmSrc.Epoch()

	return epoch, err
}