frostfs-node/pkg/services/object/search/util.go
Leonard Lyubich 1e170c3812 [#234] services/object: Support netmap epoch and lookup dead in read ops
Support processing of NetmapEpoch and NetmapLookupDepth X-headers when
processing object read operations. Placement for operations
Get/Head/GetRange/GetRangeHash/Search is built for the epoch specified in
NetmapEpoch X-header (by default latest). Also the specified operations are
processed until success is achieved for network maps from the past up to
NetmapLookupDepth value. Behavior for default values (zero or missing) left
unchanged.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2021-01-12 19:19:11 +03:00

119 lines
2.7 KiB
Go

package searchsvc
import (
"crypto/ecdsa"
"sync"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
)
type uniqueIDWriter struct {
mtx sync.Mutex
written map[string]struct{}
writer IDListWriter
}
type clientCacheWrapper struct {
cache *cache.ClientCache
opts []client.Option
}
type clientWrapper struct {
client *client.Client
}
type storageEngineWrapper engine.StorageEngine
type traverseGeneratorWrapper util.TraverserGenerator
type nmSrcWrapper struct {
nmSrc netmap.Source
}
func newUniqueAddressWriter(w IDListWriter) IDListWriter {
return &uniqueIDWriter{
written: make(map[string]struct{}),
writer: w,
}
}
func (w *uniqueIDWriter) WriteIDs(list []*objectSDK.ID) error {
w.mtx.Lock()
for i := 0; i < len(list); i++ { // don't use range, slice mutates in body
s := list[i].String()
// standard stringer is quite costly, it is better
// to facilitate the calculation of the key
if _, ok := w.written[s]; !ok {
// mark address as processed
w.written[s] = struct{}{}
continue
}
// exclude processed address
list = append(list[:i], list[i+1:]...)
i--
}
w.mtx.Unlock()
return w.writer.WriteIDs(list)
}
func (c *clientCacheWrapper) get(key *ecdsa.PrivateKey, addr string) (searchClient, error) {
clt, err := c.cache.Get(key, addr, c.opts...)
return &clientWrapper{
client: clt,
}, err
}
func (c *clientWrapper) searchObjects(exec *execCtx) ([]*objectSDK.ID, error) {
return c.client.SearchObject(exec.context(),
exec.remotePrm(),
exec.callOptions()...)
}
func (e *storageEngineWrapper) search(exec *execCtx) ([]*objectSDK.ID, error) {
r, err := (*engine.StorageEngine)(e).Select(new(engine.SelectPrm).
WithFilters(exec.searchFilters()).
WithContainerID(exec.containerID()),
)
if err != nil {
return nil, err
}
return idsFromAddresses(r.AddressList()), nil
}
func idsFromAddresses(addrs []*objectSDK.Address) []*objectSDK.ID {
ids := make([]*objectSDK.ID, len(addrs))
for i := range addrs {
ids[i] = addrs[i].ObjectID()
}
return ids
}
func (e *traverseGeneratorWrapper) generateTraverser(cid *container.ID, epoch uint64) (*placement.Traverser, error) {
a := objectSDK.NewAddress()
a.SetContainerID(cid)
return (*util.TraverserGenerator)(e).GenerateTraverser(a, epoch)
}
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
return n.nmSrc.Epoch()
}