2020-12-03 12:01:21 +00:00
|
|
|
package shard
|
|
|
|
|
|
|
|
import (
|
2023-04-12 14:01:29 +00:00
|
|
|
"context"
|
2020-12-03 12:01:21 +00:00
|
|
|
"fmt"
|
|
|
|
|
2023-04-12 14:35:10 +00:00
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
2023-03-07 13:38:26 +00:00
|
|
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
|
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
2023-09-27 08:02:06 +00:00
|
|
|
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
2023-06-06 09:27:19 +00:00
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
2023-03-07 13:38:26 +00:00
|
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
2023-07-06 12:36:41 +00:00
|
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
2023-06-06 09:27:19 +00:00
|
|
|
"go.opentelemetry.io/otel/attribute"
|
|
|
|
"go.opentelemetry.io/otel/trace"
|
2020-12-03 12:01:21 +00:00
|
|
|
"go.uber.org/zap"
|
|
|
|
)
|
|
|
|
|
2021-11-11 14:27:11 +00:00
|
|
|
// Cursor is a type for continuous object listing.
|
|
|
|
type Cursor = meta.Cursor
|
|
|
|
|
2021-11-17 11:31:31 +00:00
|
|
|
// ErrEndOfListing is returned from object listing with cursor
|
|
|
|
// when storage can't return any more objects after provided
|
|
|
|
// cursor. Use nil cursor object to start listing again.
|
|
|
|
var ErrEndOfListing = meta.ErrEndOfListing
|
|
|
|
|
2021-01-22 13:21:45 +00:00
|
|
|
type ListContainersPrm struct{}
|
|
|
|
|
|
|
|
type ListContainersRes struct {
|
2022-05-31 17:00:41 +00:00
|
|
|
containers []cid.ID
|
2021-01-22 13:21:45 +00:00
|
|
|
}
|
|
|
|
|
2022-05-31 11:50:39 +00:00
|
|
|
func (r ListContainersRes) Containers() []cid.ID {
|
2021-01-22 13:21:45 +00:00
|
|
|
return r.containers
|
|
|
|
}
|
|
|
|
|
2024-09-18 09:15:32 +00:00
|
|
|
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
|
|
|
|
type IterateOverContainersPrm struct {
|
|
|
|
// Handler function executed upon containers in db.
|
|
|
|
Handler func(context.Context, []byte, cid.ID) error
|
|
|
|
}
|
|
|
|
|
|
|
|
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
|
|
|
|
type IterateOverObjectsInContainerPrm struct {
|
|
|
|
// BucketName container's bucket name.
|
|
|
|
BucketName []byte
|
2024-09-03 12:42:38 +00:00
|
|
|
// Handler function executed upon objects in db.
|
2024-09-18 09:15:32 +00:00
|
|
|
Handler func(context.Context, *objectcore.Info) error
|
|
|
|
}
|
|
|
|
|
2024-09-03 12:42:38 +00:00
|
|
|
// CountAliveObjectsInBucketPrm contains parameters for CountAliveObjectsInBucket operation.
|
|
|
|
type CountAliveObjectsInBucketPrm struct {
|
|
|
|
// BucketName container's bucket name.
|
|
|
|
BucketName []byte
|
|
|
|
}
|
|
|
|
|
2021-10-27 13:33:26 +00:00
|
|
|
// ListWithCursorPrm contains parameters for ListWithCursor operation.
|
|
|
|
type ListWithCursorPrm struct {
|
|
|
|
count uint32
|
2021-11-11 14:27:11 +00:00
|
|
|
cursor *Cursor
|
2021-10-27 13:33:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// ListWithCursorRes contains values returned from ListWithCursor operation.
|
|
|
|
type ListWithCursorRes struct {
|
2024-05-13 13:50:21 +00:00
|
|
|
addrList []objectcore.Info
|
2021-11-11 14:27:11 +00:00
|
|
|
cursor *Cursor
|
2021-10-27 13:33:26 +00:00
|
|
|
}
|
|
|
|
|
2021-11-12 09:52:19 +00:00
|
|
|
// WithCount sets maximum amount of addresses that ListWithCursor should return.
|
2022-05-20 18:08:59 +00:00
|
|
|
func (p *ListWithCursorPrm) WithCount(count uint32) {
|
2021-10-27 13:33:26 +00:00
|
|
|
p.count = count
|
|
|
|
}
|
|
|
|
|
|
|
|
// WithCursor sets cursor for ListWithCursor operation. For initial request,
|
2021-11-12 09:52:19 +00:00
|
|
|
// ignore this param or use nil value. For consecutive requests, use value
|
2021-10-27 13:33:26 +00:00
|
|
|
// from ListWithCursorRes.
|
2022-05-20 18:08:59 +00:00
|
|
|
func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
|
2021-10-27 13:33:26 +00:00
|
|
|
p.cursor = cursor
|
|
|
|
}
|
|
|
|
|
|
|
|
// AddressList returns addresses selected by ListWithCursor operation.
|
2024-05-13 13:50:21 +00:00
|
|
|
func (r ListWithCursorRes) AddressList() []objectcore.Info {
|
2021-10-27 13:33:26 +00:00
|
|
|
return r.addrList
|
|
|
|
}
|
|
|
|
|
|
|
|
// Cursor returns cursor for consecutive listing requests.
|
2021-11-11 14:27:11 +00:00
|
|
|
func (r ListWithCursorRes) Cursor() *Cursor {
|
2021-10-27 13:33:26 +00:00
|
|
|
return r.cursor
|
|
|
|
}
|
|
|
|
|
2021-09-24 15:04:00 +00:00
|
|
|
// List returns all objects physically stored in the Shard.
|
2023-06-06 09:27:19 +00:00
|
|
|
func (s *Shard) List(ctx context.Context) (res SelectRes, err error) {
|
|
|
|
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.List",
|
|
|
|
trace.WithAttributes(
|
|
|
|
attribute.String("shard_id", s.ID().String()),
|
|
|
|
))
|
|
|
|
defer span.End()
|
|
|
|
|
2022-12-07 17:42:35 +00:00
|
|
|
s.m.RLock()
|
|
|
|
defer s.m.RUnlock()
|
|
|
|
|
|
|
|
if s.info.Mode.NoMetabase() {
|
2022-10-26 06:12:09 +00:00
|
|
|
return SelectRes{}, ErrDegradedMode
|
|
|
|
}
|
|
|
|
|
2023-06-06 09:27:19 +00:00
|
|
|
lst, err := s.metaBase.Containers(ctx)
|
2020-12-03 12:01:21 +00:00
|
|
|
if err != nil {
|
2022-05-31 11:50:39 +00:00
|
|
|
return res, fmt.Errorf("can't list stored containers: %w", err)
|
2020-12-03 12:01:21 +00:00
|
|
|
}
|
|
|
|
|
2023-07-06 12:36:41 +00:00
|
|
|
filters := objectSDK.NewSearchFilters()
|
2021-09-24 15:04:00 +00:00
|
|
|
filters.AddPhyFilter()
|
2020-12-03 12:01:21 +00:00
|
|
|
|
|
|
|
for i := range lst {
|
2022-07-12 14:42:55 +00:00
|
|
|
var sPrm meta.SelectPrm
|
2022-07-12 14:59:37 +00:00
|
|
|
sPrm.SetContainerID(lst[i])
|
|
|
|
sPrm.SetFilters(filters)
|
2022-07-12 14:42:55 +00:00
|
|
|
|
2023-06-06 09:27:19 +00:00
|
|
|
sRes, err := s.metaBase.Select(ctx, sPrm) // consider making List in metabase
|
2020-12-03 12:01:21 +00:00
|
|
|
if err != nil {
|
2023-04-12 14:35:10 +00:00
|
|
|
s.log.Debug(logs.ShardCantSelectAllObjects,
|
2020-12-03 12:01:21 +00:00
|
|
|
zap.Stringer("cid", lst[i]),
|
2023-09-27 08:02:06 +00:00
|
|
|
zap.String("error", err.Error()),
|
|
|
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
2020-12-03 12:01:21 +00:00
|
|
|
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2022-07-12 14:42:55 +00:00
|
|
|
res.addrList = append(res.addrList, sRes.AddressList()...)
|
2020-12-03 12:01:21 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return res, nil
|
|
|
|
}
|
2021-01-22 13:21:45 +00:00
|
|
|
|
2023-06-06 09:27:19 +00:00
|
|
|
func (s *Shard) ListContainers(ctx context.Context, _ ListContainersPrm) (ListContainersRes, error) {
|
|
|
|
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.ListContainers",
|
|
|
|
trace.WithAttributes(
|
|
|
|
attribute.String("shard_id", s.ID().String()),
|
|
|
|
))
|
|
|
|
defer span.End()
|
|
|
|
|
2022-10-26 06:12:09 +00:00
|
|
|
if s.GetMode().NoMetabase() {
|
|
|
|
return ListContainersRes{}, ErrDegradedMode
|
|
|
|
}
|
|
|
|
|
2023-06-06 09:27:19 +00:00
|
|
|
containers, err := s.metaBase.Containers(ctx)
|
2021-01-22 13:21:45 +00:00
|
|
|
if err != nil {
|
2022-05-31 11:50:39 +00:00
|
|
|
return ListContainersRes{}, fmt.Errorf("could not get list of containers: %w", err)
|
2021-01-22 13:21:45 +00:00
|
|
|
}
|
|
|
|
|
2022-05-31 11:50:39 +00:00
|
|
|
return ListContainersRes{
|
2021-01-22 13:21:45 +00:00
|
|
|
containers: containers,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2021-10-27 13:33:26 +00:00
|
|
|
// ListWithCursor lists physical objects available in shard starting from
|
|
|
|
// cursor. Includes regular, tombstone and storage group objects. Does not
|
|
|
|
// include inhumed objects. Use cursor value from response for consecutive requests.
|
2021-11-12 09:52:19 +00:00
|
|
|
//
|
|
|
|
// Returns ErrEndOfListing if there are no more objects to return or count
|
|
|
|
// parameter set to zero.
|
2023-06-06 09:27:19 +00:00
|
|
|
func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (ListWithCursorRes, error) {
|
|
|
|
_, span := tracing.StartSpanFromContext(ctx, "shard.ListWithCursor",
|
|
|
|
trace.WithAttributes(
|
|
|
|
attribute.Int64("count", int64(prm.count)),
|
|
|
|
attribute.Bool("has_cursor", prm.cursor != nil),
|
|
|
|
))
|
|
|
|
defer span.End()
|
|
|
|
|
2022-06-29 11:27:36 +00:00
|
|
|
if s.GetMode().NoMetabase() {
|
|
|
|
return ListWithCursorRes{}, ErrDegradedMode
|
|
|
|
}
|
|
|
|
|
2022-05-20 16:48:14 +00:00
|
|
|
var metaPrm meta.ListPrm
|
2022-07-12 14:59:37 +00:00
|
|
|
metaPrm.SetCount(prm.count)
|
|
|
|
metaPrm.SetCursor(prm.cursor)
|
2023-06-06 09:27:19 +00:00
|
|
|
res, err := s.metaBase.ListWithCursor(ctx, metaPrm)
|
2021-10-27 13:33:26 +00:00
|
|
|
if err != nil {
|
2022-05-31 11:50:39 +00:00
|
|
|
return ListWithCursorRes{}, fmt.Errorf("could not get list of objects: %w", err)
|
2021-10-27 13:33:26 +00:00
|
|
|
}
|
|
|
|
|
2022-05-31 11:50:39 +00:00
|
|
|
return ListWithCursorRes{
|
2021-10-27 13:33:26 +00:00
|
|
|
addrList: res.AddressList(),
|
|
|
|
cursor: res.Cursor(),
|
|
|
|
}, nil
|
|
|
|
}
|
2024-09-18 09:15:32 +00:00
|
|
|
|
|
|
|
// IterateOverContainers lists physical containers presented in shard.
|
|
|
|
func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error {
|
|
|
|
_, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverContainers",
|
|
|
|
trace.WithAttributes(
|
|
|
|
attribute.Bool("has_handler", prm.Handler != nil),
|
|
|
|
))
|
|
|
|
defer span.End()
|
|
|
|
|
|
|
|
s.m.RLock()
|
|
|
|
defer s.m.RUnlock()
|
|
|
|
|
|
|
|
if s.info.Mode.NoMetabase() {
|
|
|
|
return ErrDegradedMode
|
|
|
|
}
|
|
|
|
|
|
|
|
var metaPrm meta.IterateOverContainersPrm
|
|
|
|
metaPrm.Handler = prm.Handler
|
|
|
|
err := s.metaBase.IterateOverContainers(ctx, metaPrm)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("could not iterate over containers: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// IterateOverObjectsInContainer lists physical objects presented in shard for provided container's bucket name.
|
|
|
|
func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error {
|
|
|
|
_, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverObjectsInContainer",
|
|
|
|
trace.WithAttributes(
|
|
|
|
attribute.Bool("has_handler", prm.Handler != nil),
|
|
|
|
))
|
|
|
|
defer span.End()
|
|
|
|
|
|
|
|
s.m.RLock()
|
|
|
|
defer s.m.RUnlock()
|
|
|
|
|
|
|
|
if s.info.Mode.NoMetabase() {
|
|
|
|
return ErrDegradedMode
|
|
|
|
}
|
|
|
|
|
|
|
|
var metaPrm meta.IterateOverObjectsInContainerPrm
|
|
|
|
metaPrm.BucketName = prm.BucketName
|
|
|
|
metaPrm.Handler = prm.Handler
|
|
|
|
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("could not iterate over objects: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
2024-09-03 12:42:38 +00:00
|
|
|
|
|
|
|
// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage.
|
|
|
|
func (s *Shard) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) {
|
|
|
|
_, span := tracing.StartSpanFromContext(ctx, "shard.CountAliveObjectsInBucket")
|
|
|
|
defer span.End()
|
|
|
|
|
|
|
|
s.m.RLock()
|
|
|
|
defer s.m.RUnlock()
|
|
|
|
|
|
|
|
if s.info.Mode.NoMetabase() {
|
|
|
|
return 0, ErrDegradedMode
|
|
|
|
}
|
|
|
|
|
|
|
|
var metaPrm meta.CountAliveObjectsInBucketPrm
|
|
|
|
metaPrm.BucketName = prm.BucketName
|
|
|
|
count, err := s.metaBase.CountAliveObjectsInBucket(ctx, metaPrm)
|
|
|
|
if err != nil {
|
|
|
|
return 0, fmt.Errorf("could not count alive objects in bucket: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return count, nil
|
|
|
|
}
|