package shard import ( "context" "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) // Cursor is a type for continuous object listing. type Cursor = meta.Cursor // ErrEndOfListing is returned from object listing with cursor // when storage can't return any more objects after provided // cursor. Use nil cursor object to start listing again. var ErrEndOfListing = meta.ErrEndOfListing type ListContainersPrm struct{} type ListContainersRes struct { containers []cid.ID } func (r ListContainersRes) Containers() []cid.ID { return r.containers } // IterateOverContainersPrm contains parameters for IterateOverContainers operation. type IterateOverContainersPrm struct { // Handler function executed upon containers in db. Handler func(context.Context, objectSDK.Type, cid.ID) error } // IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation. type IterateOverObjectsInContainerPrm struct { // ObjectType type of objects to iterate over. ObjectType objectSDK.Type // ContainerID container for objects to iterate over. ContainerID cid.ID // Handler function executed upon objects in db. Handler func(context.Context, *objectcore.Info) error } // CountAliveObjectsInContainerPrm contains parameters for CountAliveObjectsInContainer operation. type CountAliveObjectsInContainerPrm struct { // ObjectType type of objects to iterate over. ObjectType objectSDK.Type // ContainerID container for objects to iterate over. ContainerID cid.ID } // ListWithCursorPrm contains parameters for ListWithCursor operation. type ListWithCursorPrm struct { count uint32 cursor *Cursor } // ListWithCursorRes contains values returned from ListWithCursor operation. type ListWithCursorRes struct { addrList []objectcore.Info cursor *Cursor } // WithCount sets maximum amount of addresses that ListWithCursor should return. func (p *ListWithCursorPrm) WithCount(count uint32) { p.count = count } // WithCursor sets cursor for ListWithCursor operation. For initial request, // ignore this param or use nil value. For consecutive requests, use value // from ListWithCursorRes. func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) { p.cursor = cursor } // AddressList returns addresses selected by ListWithCursor operation. func (r ListWithCursorRes) AddressList() []objectcore.Info { return r.addrList } // Cursor returns cursor for consecutive listing requests. func (r ListWithCursorRes) Cursor() *Cursor { return r.cursor } // List returns all objects physically stored in the Shard. func (s *Shard) List(ctx context.Context) (res SelectRes, err error) { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.List", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), )) defer span.End() s.m.RLock() defer s.m.RUnlock() if s.info.Mode.NoMetabase() { return SelectRes{}, ErrDegradedMode } lst, err := s.metaBase.Containers(ctx) if err != nil { return res, fmt.Errorf("can't list stored containers: %w", err) } filters := objectSDK.NewSearchFilters() filters.AddPhyFilter() for i := range lst { var sPrm meta.SelectPrm sPrm.SetContainerID(lst[i]) sPrm.SetFilters(filters) sRes, err := s.metaBase.Select(ctx, sPrm) // consider making List in metabase if err != nil { s.log.Debug(ctx, logs.ShardCantSelectAllObjects, zap.Stringer("cid", lst[i]), zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx))) continue } res.addrList = append(res.addrList, sRes.AddressList()...) } return res, nil } func (s *Shard) ListContainers(ctx context.Context, _ ListContainersPrm) (ListContainersRes, error) { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.ListContainers", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), )) defer span.End() if s.GetMode().NoMetabase() { return ListContainersRes{}, ErrDegradedMode } containers, err := s.metaBase.Containers(ctx) if err != nil { return ListContainersRes{}, fmt.Errorf("could not get list of containers: %w", err) } return ListContainersRes{ containers: containers, }, nil } // ListWithCursor lists physical objects available in shard starting from // cursor. Includes regular, tombstone and storage group objects. Does not // include inhumed objects. Use cursor value from response for consecutive requests. // // Returns ErrEndOfListing if there are no more objects to return or count // parameter set to zero. func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (ListWithCursorRes, error) { _, span := tracing.StartSpanFromContext(ctx, "shard.ListWithCursor", trace.WithAttributes( attribute.Int64("count", int64(prm.count)), attribute.Bool("has_cursor", prm.cursor != nil), )) defer span.End() if s.GetMode().NoMetabase() { return ListWithCursorRes{}, ErrDegradedMode } var metaPrm meta.ListPrm metaPrm.SetCount(prm.count) metaPrm.SetCursor(prm.cursor) res, err := s.metaBase.ListWithCursor(ctx, metaPrm) if err != nil { return ListWithCursorRes{}, fmt.Errorf("could not get list of objects: %w", err) } return ListWithCursorRes{ addrList: res.AddressList(), cursor: res.Cursor(), }, nil } // IterateOverContainers lists physical containers presented in shard. func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error { _, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverContainers", trace.WithAttributes( attribute.Bool("has_handler", prm.Handler != nil), )) defer span.End() s.m.RLock() defer s.m.RUnlock() if s.info.Mode.NoMetabase() { return ErrDegradedMode } var metaPrm meta.IterateOverContainersPrm metaPrm.Handler = prm.Handler err := s.metaBase.IterateOverContainers(ctx, metaPrm) if err != nil { return fmt.Errorf("could not iterate over containers: %w", err) } return nil } // IterateOverObjectsInContainer lists physical objects presented in shard for provided container's bucket name. func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error { _, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverObjectsInContainer", trace.WithAttributes( attribute.Bool("has_handler", prm.Handler != nil), )) defer span.End() s.m.RLock() defer s.m.RUnlock() if s.info.Mode.NoMetabase() { return ErrDegradedMode } var metaPrm meta.IterateOverObjectsInContainerPrm metaPrm.ContainerID = prm.ContainerID metaPrm.ObjectType = prm.ObjectType metaPrm.Handler = prm.Handler err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm) if err != nil { return fmt.Errorf("could not iterate over objects: %w", err) } return nil } // CountAliveObjectsInContainer count objects in bucket which aren't in graveyard or garbage. func (s *Shard) CountAliveObjectsInContainer(ctx context.Context, prm CountAliveObjectsInContainerPrm) (uint64, error) { _, span := tracing.StartSpanFromContext(ctx, "shard.CountAliveObjectsInBucket") defer span.End() s.m.RLock() defer s.m.RUnlock() if s.info.Mode.NoMetabase() { return 0, ErrDegradedMode } var metaPrm meta.CountAliveObjectsInContainerPrm metaPrm.ObjectType = prm.ObjectType metaPrm.ContainerID = prm.ContainerID count, err := s.metaBase.CountAliveObjectsInContainer(ctx, metaPrm) if err != nil { return 0, fmt.Errorf("could not count alive objects in bucket: %w", err) } return count, nil }