frostfs-node/pkg/local_object_storage/shard/list.go

266 lines
7.7 KiB
Go
Raw Normal View History

package shard
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
// Cursor is a type for continuous object listing.
//
// It is declared as an alias of meta.Cursor, so a shard Cursor and a
// metabase Cursor are the same type and can be passed between the two
// packages without conversion.
type Cursor = meta.Cursor
// ErrEndOfListing is returned from object listing with cursor
// when the storage can't return any more objects after the provided
// cursor. Use a nil cursor to start listing again.
//
// The variable is initialized from meta.ErrEndOfListing.
var ErrEndOfListing = meta.ErrEndOfListing
// ListContainersPrm contains parameters for ListContainers operation.
// It currently declares no fields.
type ListContainersPrm struct{}
// ListContainersRes contains values returned from ListContainers operation.
type ListContainersRes struct {
// containers holds the container IDs reported by the metabase.
containers []cid.ID
}
// Containers returns the container IDs stored in the result.
// The returned slice is the result's own slice, not a copy.
func (r ListContainersRes) Containers() []cid.ID {
return r.containers
}
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
type IterateOverContainersPrm struct {
// Handler is the callback forwarded unchanged to the metabase iteration.
// It receives a context, an object type and a container ID.
// NOTE(review): presumably a non-nil error returned from Handler stops
// the iteration — confirm against the metabase implementation.
Handler func(context.Context, objectSDK.Type, cid.ID) error
}
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
type IterateOverObjectsInContainerPrm struct {
// ObjectType is the type of objects to iterate over.
ObjectType objectSDK.Type
// ContainerID is the container whose objects are iterated over.
ContainerID cid.ID
// Handler is the callback forwarded unchanged to the metabase iteration.
// It receives a context and a pointer to the object info.
// NOTE(review): presumably a non-nil error returned from Handler stops
// the iteration — confirm against the metabase implementation.
Handler func(context.Context, *objectcore.Info) error
}
// CountAliveObjectsInContainerPrm contains parameters for CountAliveObjectsInContainer operation.
type CountAliveObjectsInContainerPrm struct {
// ObjectType is the type of objects to count.
ObjectType objectSDK.Type
// ContainerID is the container whose objects are counted.
ContainerID cid.ID
}
// ListWithCursorPrm contains parameters for ListWithCursor operation.
// Fields are unexported; populate them with WithCount and WithCursor.
type ListWithCursorPrm struct {
// count is the maximum number of entries to return (see WithCount).
count uint32
// cursor is the position to continue listing from; nil for the
// initial request (see WithCursor).
cursor *Cursor
}
// ListWithCursorRes contains values returned from ListWithCursor operation.
// Fields are unexported; read them with AddressList and Cursor.
type ListWithCursorRes struct {
// addrList holds the object infos selected by the listing.
addrList []objectcore.Info
// cursor is the position to pass to the next ListWithCursor request.
cursor *Cursor
}
// WithCount sets the maximum number of entries that ListWithCursor should return.
// It overwrites any previously set value.
func (p *ListWithCursorPrm) WithCount(count uint32) {
p.count = count
}
// WithCursor sets the cursor for ListWithCursor operation. For the initial
// request, skip this call or pass nil. For consecutive requests, pass the
// value returned by ListWithCursorRes.Cursor.
//
// The pointer is stored as-is; the pointed-to Cursor is not copied.
func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
p.cursor = cursor
}
// AddressList returns the object infos selected by ListWithCursor operation.
// The returned slice is the result's own slice, not a copy.
func (r ListWithCursorRes) AddressList() []objectcore.Info {
return r.addrList
}
// Cursor returns the cursor to pass to ListWithCursorPrm.WithCursor for
// consecutive listing requests.
func (r ListWithCursorRes) Cursor() *Cursor {
return r.cursor
}
// List returns all objects physically stored in the Shard.
//
// The shard's read lock is held for the whole call. ErrDegradedMode is
// returned when the current mode has no metabase. A failure to list the
// containers aborts the call; a failure to select objects of a single
// container is only logged at debug level and that container is skipped,
// so the result may be partial.
func (s *Shard) List(ctx context.Context) (res SelectRes, err error) {
	shardID := attribute.String("shard_id", s.ID().String())
	ctx, span := tracing.StartSpanFromContext(ctx, "Shard.List", trace.WithAttributes(shardID))
	defer span.End()

	s.m.RLock()
	defer s.m.RUnlock()

	if s.info.Mode.NoMetabase() {
		return SelectRes{}, ErrDegradedMode
	}

	cnrs, err := s.metaBase.Containers(ctx)
	if err != nil {
		return res, fmt.Errorf("list stored containers: %w", err)
	}

	// One filter set, shared by every per-container selection.
	phyFilters := objectSDK.NewSearchFilters()
	phyFilters.AddPhyFilter()

	for _, cnr := range cnrs {
		var selPrm meta.SelectPrm
		selPrm.SetContainerID(cnr)
		selPrm.SetFilters(phyFilters)

		selected, selErr := s.metaBase.Select(ctx, selPrm) // consider making List in metabase
		if selErr == nil {
			res.addrList = append(res.addrList, selected.AddressList()...)
			continue
		}

		// Best effort: report the failing container and move on.
		s.log.Debug(ctx, logs.ShardCantSelectAllObjects,
			zap.Stringer("cid", cnr),
			zap.Error(selErr),
			zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
	}

	return res, nil
}
// ListContainers returns the IDs of the containers known to the shard's
// metabase. The parameter value is ignored.
//
// ErrDegradedMode is returned when the current mode has no metabase; a
// metabase failure is returned wrapped.
func (s *Shard) ListContainers(ctx context.Context, _ ListContainersPrm) (ListContainersRes, error) {
	shardID := attribute.String("shard_id", s.ID().String())
	ctx, span := tracing.StartSpanFromContext(ctx, "Shard.ListContainers", trace.WithAttributes(shardID))
	defer span.End()

	if mode := s.GetMode(); mode.NoMetabase() {
		return ListContainersRes{}, ErrDegradedMode
	}

	var (
		out ListContainersRes
		err error
	)
	if out.containers, err = s.metaBase.Containers(ctx); err != nil {
		return ListContainersRes{}, fmt.Errorf("get list of containers: %w", err)
	}

	return out, nil
}
// ListWithCursor lists physical objects available in shard starting from
// cursor. Includes regular, tombstone and storage group objects. Does not
// include inhumed objects. Use cursor value from response for consecutive requests.
//
// Returns ErrEndOfListing if there are no more objects to return or count
// parameter set to zero. Errors coming from the metabase are wrapped, so
// compare them with errors.Is rather than ==.
//
// Returns ErrDegradedMode if the current shard mode has no metabase.
func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (ListWithCursorRes, error) {
	// Keep the context returned together with the span: passing it down makes
	// any span started further along the call chain a child of this one.
	ctx, span := tracing.StartSpanFromContext(ctx, "shard.ListWithCursor",
		trace.WithAttributes(
			attribute.Int64("count", int64(prm.count)),
			attribute.Bool("has_cursor", prm.cursor != nil),
		))
	defer span.End()

	if s.GetMode().NoMetabase() {
		return ListWithCursorRes{}, ErrDegradedMode
	}

	var metaPrm meta.ListPrm
	metaPrm.SetCount(prm.count)
	metaPrm.SetCursor(prm.cursor)

	res, err := s.metaBase.ListWithCursor(ctx, metaPrm)
	if err != nil {
		return ListWithCursorRes{}, fmt.Errorf("get list of objects: %w", err)
	}

	return ListWithCursorRes{
		addrList: res.AddressList(),
		cursor:   res.Cursor(),
	}, nil
}
// IterateOverContainers iterates over the containers present in the shard's
// metabase, forwarding prm.Handler to the metabase iteration.
//
// The shard's read lock is held for the whole call, including every handler
// invocation. Returns ErrDegradedMode if the current shard mode has no
// metabase; a metabase failure is returned wrapped.
func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error {
	// Keep the context returned together with the span: passing it down makes
	// any span started further along the call chain a child of this one.
	ctx, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverContainers",
		trace.WithAttributes(
			attribute.Bool("has_handler", prm.Handler != nil),
		))
	defer span.End()

	s.m.RLock()
	defer s.m.RUnlock()

	if s.info.Mode.NoMetabase() {
		return ErrDegradedMode
	}

	var metaPrm meta.IterateOverContainersPrm
	metaPrm.Handler = prm.Handler

	if err := s.metaBase.IterateOverContainers(ctx, metaPrm); err != nil {
		return fmt.Errorf("iterate over containers: %w", err)
	}

	return nil
}
// IterateOverObjectsInContainer iterates over the objects of the given type
// in the given container, forwarding prm.Handler to the metabase iteration.
//
// The shard's read lock is held for the whole call, including every handler
// invocation. Returns ErrDegradedMode if the current shard mode has no
// metabase; a metabase failure is returned wrapped.
func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error {
	// Keep the context returned together with the span: passing it down makes
	// any span started further along the call chain a child of this one.
	ctx, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverObjectsInContainer",
		trace.WithAttributes(
			attribute.Bool("has_handler", prm.Handler != nil),
		))
	defer span.End()

	s.m.RLock()
	defer s.m.RUnlock()

	if s.info.Mode.NoMetabase() {
		return ErrDegradedMode
	}

	var metaPrm meta.IterateOverObjectsInContainerPrm
	metaPrm.ContainerID = prm.ContainerID
	metaPrm.ObjectType = prm.ObjectType
	metaPrm.Handler = prm.Handler

	if err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm); err != nil {
		return fmt.Errorf("iterate over objects: %w", err)
	}

	return nil
}
// CountAliveObjectsInContainer counts objects of the given type in the given
// container which aren't in graveyard or garbage. The counting itself is
// delegated to the metabase.
//
// The shard's read lock is held for the whole call. Returns ErrDegradedMode
// if the current shard mode has no metabase; a metabase failure is returned
// wrapped.
func (s *Shard) CountAliveObjectsInContainer(ctx context.Context, prm CountAliveObjectsInContainerPrm) (uint64, error) {
	// Keep the context returned together with the span: passing it down makes
	// any span started further along the call chain a child of this one.
	// NOTE(review): the span name and the error text below say "Bucket" while
	// the method says "Container"; the strings are kept unchanged because
	// external tooling may match on them.
	ctx, span := tracing.StartSpanFromContext(ctx, "shard.CountAliveObjectsInBucket")
	defer span.End()

	s.m.RLock()
	defer s.m.RUnlock()

	if s.info.Mode.NoMetabase() {
		return 0, ErrDegradedMode
	}

	var metaPrm meta.CountAliveObjectsInContainerPrm
	metaPrm.ObjectType = prm.ObjectType
	metaPrm.ContainerID = prm.ContainerID

	count, err := s.metaBase.CountAliveObjectsInContainer(ctx, metaPrm)
	if err != nil {
		return 0, fmt.Errorf("count alive objects in bucket: %w", err)
	}

	return count, nil
}