frostfs-node/pkg/local_object_storage/engine/select.go

166 lines
4.7 KiB
Go
Raw Normal View History

package engine
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// SelectPrm groups the parameters of Select operation.
type SelectPrm struct {
cnr cid.ID
filters objectSDK.SearchFilters
indexedContainer bool
}
// SelectRes groups the resulting values of Select operation.
type SelectRes struct {
addrList []oid.Address
}
// WithContainerID is a Select option to set the container id to search in.
func (p *SelectPrm) WithContainerID(cnr cid.ID, indexedContainer bool) {
p.cnr = cnr
p.indexedContainer = indexedContainer
}
// WithFilters is a Select option to set the object filters.
func (p *SelectPrm) WithFilters(fs objectSDK.SearchFilters) {
p.filters = fs
}
// AddressList returns list of addresses of the selected objects.
func (r SelectRes) AddressList() []oid.Address {
return r.addrList
}
// Select selects the objects from local storage that match select parameters.
//
// Returns any error encountered that did not allow to completely select the objects.
//
// Returns an error if executions are blocked (see BlockExecution).
func (e *StorageEngine) Select(ctx context.Context, prm SelectPrm) (res SelectRes, err error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Select",
trace.WithAttributes(
attribute.String("container_id", prm.cnr.EncodeToString()),
))
defer span.End()
defer elapsed("Select", e.metrics.AddMethodDuration)()
err = e.execIfNotBlocked(func() error {
res, err = e._select(ctx, prm)
return err
})
return
}
func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes, error) {
addrList := make([]oid.Address, 0)
uniqueMap := make(map[string]struct{})
var outError error
var shPrm shard.SelectPrm
shPrm.SetContainerID(prm.cnr, prm.indexedContainer)
shPrm.SetFilters(prm.filters)
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
res, err := sh.Select(ctx, shPrm)
if err != nil {
e.reportShardError(ctx, sh, "could not select objects from shard", err)
return false
}
for _, addr := range res.AddressList() { // save only unique values
if _, ok := uniqueMap[addr.EncodeToString()]; !ok {
uniqueMap[addr.EncodeToString()] = struct{}{}
addrList = append(addrList, addr)
}
}
return false
})
return SelectRes{
addrList: addrList,
}, outError
}
// List returns `limit` available physically storage object addresses in engine.
// If limit is zero, then returns all available object addresses.
//
// Returns an error if executions are blocked (see BlockExecution).
func (e *StorageEngine) List(ctx context.Context, limit uint64) (res SelectRes, err error) {
defer elapsed("List", e.metrics.AddMethodDuration)()
err = e.execIfNotBlocked(func() error {
res, err = e.list(ctx, limit)
return err
})
return
}
func (e *StorageEngine) list(ctx context.Context, limit uint64) (SelectRes, error) {
addrList := make([]oid.Address, 0, limit)
uniqueMap := make(map[string]struct{})
ln := uint64(0)
// consider iterating over shuffled shards
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
res, err := sh.List(ctx) // consider limit result of shard iterator
if err != nil {
e.reportShardError(ctx, sh, "could not select objects from shard", err)
} else {
for _, addr := range res.AddressList() { // save only unique values
if _, ok := uniqueMap[addr.EncodeToString()]; !ok {
uniqueMap[addr.EncodeToString()] = struct{}{}
addrList = append(addrList, addr)
ln++
if limit > 0 && ln >= limit {
return true
}
}
}
}
return false
})
return SelectRes{
addrList: addrList,
}, nil
}
// Select selects objects from local storage using provided filters.
func Select(ctx context.Context, storage *StorageEngine, cnr cid.ID, isIndexedContainer bool, fs objectSDK.SearchFilters) ([]oid.Address, error) {
var selectPrm SelectPrm
selectPrm.WithContainerID(cnr, isIndexedContainer)
selectPrm.WithFilters(fs)
res, err := storage.Select(ctx, selectPrm)
if err != nil {
return nil, err
}
return res.AddressList(), nil
}
// List returns `limit` available physically storage object addresses in
// engine. If limit is zero, then returns all available object addresses.
func List(ctx context.Context, storage *StorageEngine, limit uint64) ([]oid.Address, error) {
res, err := storage.List(ctx, limit)
if err != nil {
return nil, err
}
return res.AddressList(), nil
}