package searchsvc

import (
	"context"
	"sync"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
	internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// uniqueIDWriter is an IDListWriter decorator that forwards every object ID
// to the wrapped writer at most once over the lifetime of the decorator.
type uniqueIDWriter struct {
	// mtx guards written.
	mtx sync.Mutex

	// written is the set of IDs that WriteIDs has already seen.
	written map[oid.ID]struct{}

	// writer receives the de-duplicated ID lists.
	writer IDListWriter
}

// clientConstructorWrapper adapts a ClientConstructor so that get returns
// a searchClient.
type clientConstructorWrapper struct {
	constructor ClientConstructor
}

// clientWrapper implements searchObjects on top of a MultiAddressClient.
type clientWrapper struct {
	client client.MultiAddressClient
}

// storageEngineWrapper implements search on top of the local storage engine.
type storageEngineWrapper struct {
	storage *engine.StorageEngine
}

// traverseGeneratorWrapper is util.TraverserGenerator under a local name so
// that generateTraverser can be declared on it.
type traverseGeneratorWrapper util.TraverserGenerator

// nmSrcWrapper exposes the epoch of a netmap.Source through currentEpoch.
type nmSrcWrapper struct {
	nmSrc netmap.Source
}

// newUniqueAddressWriter wraps w into a writer that drops object IDs which
// were already seen by a previous (or the same) WriteIDs call.
func newUniqueAddressWriter(w IDListWriter) IDListWriter {
	return &uniqueIDWriter{
		written: make(map[oid.ID]struct{}),
		writer:  w,
	}
}

// WriteIDs passes to the underlying writer only those elements of list that
// have not been seen before, preserving their order, and returns the
// underlying writer's error.
//
// IDs are recorded as seen before the underlying write is attempted, so they
// are not forwarded again even if that write fails.
//
// The filtering is done in place: the backing array of list is reused for the
// resulting slice, so the caller must not rely on list's contents afterwards.
func (w *uniqueIDWriter) WriteIDs(list []oid.ID) error {
	// fresh shares list's backing array. The write position never overtakes
	// the read position, so no element is overwritten before it is read.
	fresh := list[:0]

	w.mtx.Lock()

	for _, id := range list {
		if _, seen := w.written[id]; seen {
			// exclude already processed ID
			continue
		}

		// mark ID as processed
		w.written[id] = struct{}{}
		fresh = append(fresh, id)
	}

	// The mutex only guards the written set; it is released before the
	// downstream write so that the write is not serialized by it.
	w.mtx.Unlock()

	return w.writer.WriteIDs(fresh)
}

// get obtains a client for the given node from the wrapped constructor and
// returns it wrapped as a searchClient. The constructor's error is returned
// unchanged.
func (c *clientConstructorWrapper) get(info client.NodeInfo) (searchClient, error) {
	clt, err := c.constructor.Get(info)
	if err != nil {
		return nil, err
	}

	return &clientWrapper{
		client: clt,
	}, nil
}

// searchObjects performs the search described by exec through the wrapped
// client and returns the found object IDs.
//
// If the request parameters carry a forwarder, the call is delegated to it
// entirely (info is only used on this path). Otherwise a private key is
// fetched from the key store and the search is executed via
// internalclient.SearchObjects.
func (c *clientWrapper) searchObjects(ctx context.Context, exec *execCtx, info client.NodeInfo) ([]oid.ID, error) {
	if exec.prm.forwarder != nil {
		return exec.prm.forwarder(ctx, info, c.client)
	}

	// sessionInfo stays nil when the request has no session token.
	var sessionInfo *util.SessionInfo

	if tok := exec.prm.common.SessionToken(); tok != nil {
		sessionInfo = &util.SessionInfo{
			ID:    tok.ID(),
			Owner: tok.Issuer(),
		}
	}

	key, err := exec.svc.keyStore.GetKey(sessionInfo)
	if err != nil {
		return nil, err
	}

	var prm internalclient.SearchObjectsPrm

	prm.SetClient(c.client)
	prm.SetPrivateKey(key)
	prm.SetSessionToken(exec.prm.common.SessionToken())
	prm.SetBearerToken(exec.prm.common.BearerToken())
	prm.SetTTL(exec.prm.common.TTL())
	prm.SetXHeaders(exec.prm.common.XHeaders())
	prm.SetNetmapEpoch(exec.curProcEpoch)
	prm.SetContainerID(exec.containerID())
	prm.SetFilters(exec.searchFilters())

	res, err := internalclient.SearchObjects(ctx, prm)
	if err != nil {
		return nil, err
	}

	return res.IDList(), nil
}

// search selects objects from the local storage engine using the filters and
// container ID of exec and returns the IDs of the selected objects.
func (e *storageEngineWrapper) search(ctx context.Context, exec *execCtx) ([]oid.ID, error) {
	var selectPrm engine.SelectPrm

	selectPrm.WithFilters(exec.searchFilters())
	selectPrm.WithContainerID(exec.containerID())

	r, err := e.storage.Select(ctx, selectPrm)
	if err != nil {
		return nil, err
	}

	return idsFromAddresses(r.AddressList()), nil
}

// idsFromAddresses returns the object ID of every address in addrs, in the
// same order. The result always has len(addrs) elements.
func idsFromAddresses(addrs []oid.Address) []oid.ID {
	ids := make([]oid.ID, len(addrs))

	for i := range addrs {
		ids[i] = addrs[i].Object()
	}

	return ids
}

// generateTraverser builds a placement traverser for container cnr at the
// given epoch. It passes nil as the second argument of
// util.TraverserGenerator.GenerateTraverser.
func (e *traverseGeneratorWrapper) generateTraverser(cnr cid.ID, epoch uint64) (*placement.Traverser, error) {
	return (*util.TraverserGenerator)(e).GenerateTraverser(cnr, nil, epoch)
}

// currentEpoch returns the epoch reported by the wrapped netmap source.
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
	return n.nmSrc.Epoch()
}