2020-11-17 12:26:03 +00:00
|
|
|
package engine
|
|
|
|
|
|
|
|
import (
|
2020-12-10 14:12:26 +00:00
|
|
|
"errors"
|
|
|
|
|
|
|
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
2020-11-17 12:26:03 +00:00
|
|
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
2021-11-10 07:08:33 +00:00
|
|
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
|
|
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
2020-11-17 12:26:03 +00:00
|
|
|
"go.uber.org/zap"
|
|
|
|
)
|
|
|
|
|
|
|
|
// SelectPrm groups the parameters of Select operation.
|
|
|
|
type SelectPrm struct {
|
2021-05-31 11:03:17 +00:00
|
|
|
cid *cid.ID
|
2020-11-17 12:26:03 +00:00
|
|
|
filters object.SearchFilters
|
|
|
|
}
|
|
|
|
|
|
|
|
// SelectRes groups resulting values of Select operation.
|
|
|
|
type SelectRes struct {
|
|
|
|
addrList []*object.Address
|
|
|
|
}
|
|
|
|
|
2020-12-10 13:24:27 +00:00
|
|
|
// WithContainerID is a Select option to set the container id to search in.
|
2021-05-31 11:03:17 +00:00
|
|
|
func (p *SelectPrm) WithContainerID(cid *cid.ID) *SelectPrm {
|
2020-12-10 13:24:27 +00:00
|
|
|
if p != nil {
|
|
|
|
p.cid = cid
|
|
|
|
}
|
|
|
|
|
|
|
|
return p
|
|
|
|
}
|
|
|
|
|
2020-11-17 12:26:03 +00:00
|
|
|
// WithFilters is a Select option to set the object filters.
|
2020-11-18 12:45:15 +00:00
|
|
|
func (p *SelectPrm) WithFilters(fs object.SearchFilters) *SelectPrm {
|
2020-11-17 12:26:03 +00:00
|
|
|
if p != nil {
|
|
|
|
p.filters = fs
|
|
|
|
}
|
|
|
|
|
|
|
|
return p
|
|
|
|
}
|
|
|
|
|
|
|
|
// AddressList returns list of addresses of the selected objects.
|
2020-11-18 12:45:15 +00:00
|
|
|
func (r *SelectRes) AddressList() []*object.Address {
|
2020-11-17 12:26:03 +00:00
|
|
|
return r.addrList
|
|
|
|
}
|
|
|
|
|
|
|
|
// Select selects the objects from local storage that match select parameters.
|
|
|
|
//
|
|
|
|
// Returns any error encountered that did not allow to completely select the objects.
|
2021-11-09 15:46:12 +00:00
|
|
|
//
|
|
|
|
// Returns an error if executions are blocked (see BlockExecution).
|
|
|
|
func (e *StorageEngine) Select(prm *SelectPrm) (res *SelectRes, err error) {
|
2021-11-10 15:00:30 +00:00
|
|
|
err = e.execIfNotBlocked(func() error {
|
2021-11-09 15:46:12 +00:00
|
|
|
res, err = e._select(prm)
|
|
|
|
return err
|
|
|
|
})
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (e *StorageEngine) _select(prm *SelectPrm) (*SelectRes, error) {
|
2021-03-16 08:14:56 +00:00
|
|
|
if e.metrics != nil {
|
|
|
|
defer elapsed(e.metrics.AddSearchDuration)()
|
2021-03-15 13:09:27 +00:00
|
|
|
}
|
|
|
|
|
2020-11-17 12:26:03 +00:00
|
|
|
addrList := make([]*object.Address, 0)
|
2020-12-01 10:54:29 +00:00
|
|
|
uniqueMap := make(map[string]struct{})
|
2020-11-17 12:26:03 +00:00
|
|
|
|
2020-12-10 14:12:26 +00:00
|
|
|
var outError error
|
|
|
|
|
2020-11-17 12:26:03 +00:00
|
|
|
shPrm := new(shard.SelectPrm).
|
2020-12-10 13:24:27 +00:00
|
|
|
WithContainerID(prm.cid).
|
2020-11-17 12:26:03 +00:00
|
|
|
WithFilters(prm.filters)
|
|
|
|
|
2020-12-01 10:54:29 +00:00
|
|
|
e.iterateOverUnsortedShards(func(sh *shard.Shard) (stop bool) {
|
2020-11-17 12:26:03 +00:00
|
|
|
res, err := sh.Select(shPrm)
|
|
|
|
if err != nil {
|
2020-12-10 14:12:26 +00:00
|
|
|
switch {
|
|
|
|
case errors.Is(err, meta.ErrMissingContainerID): // should never happen
|
|
|
|
e.log.Error("missing container ID parameter")
|
|
|
|
outError = err
|
|
|
|
|
|
|
|
return true
|
|
|
|
default:
|
|
|
|
// TODO: smth wrong with shard, need to be processed
|
|
|
|
e.log.Warn("could not select objects from shard",
|
|
|
|
zap.Stringer("shard", sh.ID()),
|
|
|
|
zap.String("error", err.Error()),
|
|
|
|
)
|
|
|
|
return false
|
|
|
|
}
|
2020-11-17 12:26:03 +00:00
|
|
|
} else {
|
2020-12-01 10:54:29 +00:00
|
|
|
for _, addr := range res.AddressList() { // save only unique values
|
|
|
|
if _, ok := uniqueMap[addr.String()]; !ok {
|
|
|
|
uniqueMap[addr.String()] = struct{}{}
|
|
|
|
addrList = append(addrList, addr)
|
|
|
|
}
|
|
|
|
}
|
2020-11-17 12:26:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return false
|
|
|
|
})
|
|
|
|
|
|
|
|
return &SelectRes{
|
|
|
|
addrList: addrList,
|
2020-12-10 14:12:26 +00:00
|
|
|
}, outError
|
2020-11-17 12:26:03 +00:00
|
|
|
}
|
2020-11-19 08:15:49 +00:00
|
|
|
|
2020-12-03 12:01:45 +00:00
|
|
|
// List returns `limit` available physically storage object addresses in engine.
|
|
|
|
// If limit is zero, then returns all available object addresses.
|
2021-11-09 15:46:12 +00:00
|
|
|
//
|
|
|
|
// Returns an error if executions are blocked (see BlockExecution).
|
|
|
|
func (e *StorageEngine) List(limit uint64) (res *SelectRes, err error) {
|
2021-11-10 15:00:30 +00:00
|
|
|
err = e.execIfNotBlocked(func() error {
|
2021-11-09 15:46:12 +00:00
|
|
|
res, err = e.list(limit)
|
|
|
|
return err
|
|
|
|
})
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (e *StorageEngine) list(limit uint64) (*SelectRes, error) {
|
2021-03-16 08:14:56 +00:00
|
|
|
if e.metrics != nil {
|
|
|
|
defer elapsed(e.metrics.AddListObjectsDuration)()
|
2021-03-15 13:09:27 +00:00
|
|
|
}
|
|
|
|
|
2021-08-24 10:51:00 +00:00
|
|
|
addrList := make([]*object.Address, 0, limit)
|
2020-12-03 12:01:45 +00:00
|
|
|
uniqueMap := make(map[string]struct{})
|
|
|
|
ln := uint64(0)
|
|
|
|
|
|
|
|
// consider iterating over shuffled shards
|
|
|
|
e.iterateOverUnsortedShards(func(sh *shard.Shard) (stop bool) {
|
|
|
|
res, err := sh.List() // consider limit result of shard iterator
|
|
|
|
if err != nil {
|
|
|
|
// TODO: smth wrong with shard, need to be processed
|
|
|
|
e.log.Warn("could not select objects from shard",
|
|
|
|
zap.Stringer("shard", sh.ID()),
|
|
|
|
zap.String("error", err.Error()),
|
|
|
|
)
|
|
|
|
} else {
|
|
|
|
for _, addr := range res.AddressList() { // save only unique values
|
|
|
|
if _, ok := uniqueMap[addr.String()]; !ok {
|
|
|
|
uniqueMap[addr.String()] = struct{}{}
|
|
|
|
addrList = append(addrList, addr)
|
|
|
|
|
|
|
|
ln++
|
|
|
|
if limit > 0 && ln >= limit {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return false
|
|
|
|
})
|
|
|
|
|
|
|
|
return &SelectRes{
|
|
|
|
addrList: addrList,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2020-11-19 08:15:49 +00:00
|
|
|
// Select selects objects from local storage using provided filters.
|
2021-05-31 11:03:17 +00:00
|
|
|
func Select(storage *StorageEngine, cid *cid.ID, fs object.SearchFilters) ([]*object.Address, error) {
|
2020-11-19 08:15:49 +00:00
|
|
|
res, err := storage.Select(new(SelectPrm).
|
2020-12-10 13:24:27 +00:00
|
|
|
WithContainerID(cid).
|
2020-11-19 08:15:49 +00:00
|
|
|
WithFilters(fs),
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return res.AddressList(), nil
|
|
|
|
}
|
|
|
|
|
2020-12-03 12:01:45 +00:00
|
|
|
// List returns `limit` available physically storage object addresses in
|
|
|
|
// engine. If limit is zero, then returns all available object addresses.
|
|
|
|
func List(storage *StorageEngine, limit uint64) ([]*object.Address, error) {
|
|
|
|
res, err := storage.List(limit)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return res.AddressList(), nil
|
2020-11-19 08:15:49 +00:00
|
|
|
}
|