frostfs-node/pkg/local_object_storage/shard/tree.go

406 lines
11 KiB
Go
Raw Permalink Normal View History

package shard
import (
"context"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
var _ pilorama.Forest = (*Shard)(nil)
// ErrPiloramaDisabled is returned when pilorama was disabled in the configuration.
var ErrPiloramaDisabled = logicerr.New("pilorama is disabled")
// TreeMove implements the pilorama.Forest interface.
func (s *Shard) TreeMove(ctx context.Context, d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) (*pilorama.Move, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeMove",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", d.CID.EncodeToString()),
attribute.Int("position", d.Position),
attribute.Int("size", d.Size),
attribute.String("tree_id", treeID),
),
)
defer span.End()
if s.pilorama == nil {
return nil, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return nil, ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
return s.pilorama.TreeMove(ctx, d, treeID, m)
}
// TreeAddByPath implements the pilorama.Forest interface.
func (s *Shard) TreeAddByPath(ctx context.Context, d pilorama.CIDDescriptor, treeID string, attr string, path []string, meta []pilorama.KeyValue) ([]pilorama.Move, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeAddByPath",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", d.CID.EncodeToString()),
attribute.Int("position", d.Position),
attribute.Int("size", d.Size),
attribute.String("tree_id", treeID),
attribute.String("attr", attr),
attribute.Int("path_count", len(path)),
attribute.Int("meta_count", len(meta)),
),
)
defer span.End()
if s.pilorama == nil {
return nil, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return nil, ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
return s.pilorama.TreeAddByPath(ctx, d, treeID, attr, path, meta)
}
// TreeApply implements the pilorama.Forest interface.
func (s *Shard) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *pilorama.Move, backgroundSync bool) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeApply",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cnr.EncodeToString()),
attribute.String("tree_id", treeID),
attribute.Bool("background", backgroundSync),
),
)
defer span.End()
if s.pilorama == nil {
return ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
return s.pilorama.TreeApply(ctx, cnr, treeID, m, backgroundSync)
}
// TreeGetByPath implements the pilorama.Forest interface.
func (s *Shard) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetByPath",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cid.EncodeToString()),
attribute.String("tree_id", treeID),
attribute.String("attr", attr),
attribute.Int("path_count", len(path)),
attribute.Bool("latest", latest),
),
)
defer span.End()
if s.pilorama == nil {
return nil, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
return s.pilorama.TreeGetByPath(ctx, cid, treeID, attr, path, latest)
}
// TreeGetMeta implements the pilorama.Forest interface.
func (s *Shard) TreeGetMeta(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.Node) (pilorama.Meta, uint64, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetMeta",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cid.EncodeToString()),
attribute.String("tree_id", treeID),
attribute.String("node_id", strconv.FormatUint(nodeID, 10)),
),
)
defer span.End()
if s.pilorama == nil {
return pilorama.Meta{}, 0, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return pilorama.Meta{}, 0, ErrDegradedMode
}
return s.pilorama.TreeGetMeta(ctx, cid, treeID, nodeID)
}
// TreeGetChildren implements the pilorama.Forest interface.
func (s *Shard) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.Node) ([]pilorama.NodeInfo, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetChildren",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cid.EncodeToString()),
attribute.String("tree_id", treeID),
attribute.String("node_id", strconv.FormatUint(nodeID, 10)),
),
)
defer span.End()
if s.pilorama == nil {
return nil, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
return s.pilorama.TreeGetChildren(ctx, cid, treeID, nodeID)
}
// TreeGetOpLog implements the pilorama.Forest interface.
func (s *Shard) TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) (pilorama.Move, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetOpLog",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cid.EncodeToString()),
attribute.String("tree_id", treeID),
attribute.String("height", strconv.FormatUint(height, 10)),
),
)
defer span.End()
if s.pilorama == nil {
return pilorama.Move{}, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return pilorama.Move{}, ErrDegradedMode
}
return s.pilorama.TreeGetOpLog(ctx, cid, treeID, height)
}
// TreeDrop implements the pilorama.Forest interface.
func (s *Shard) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID string) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeDrop",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cid.EncodeToString()),
attribute.String("tree_id", treeID),
),
)
defer span.End()
if s.pilorama == nil {
return ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
return s.pilorama.TreeDrop(ctx, cid, treeID)
}
// TreeList implements the pilorama.Forest interface.
func (s *Shard) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeList",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cid.EncodeToString()),
),
)
defer span.End()
if s.pilorama == nil {
return nil, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
return s.pilorama.TreeList(ctx, cid)
}
func (s *Shard) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeHeight",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cid.EncodeToString()),
attribute.String("tree_id", treeID),
),
)
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return 0, ErrDegradedMode
}
if s.pilorama == nil {
return 0, ErrPiloramaDisabled
}
return s.pilorama.TreeHeight(ctx, cid, treeID)
}
// TreeExists implements the pilorama.Forest interface.
func (s *Shard) TreeExists(ctx context.Context, cid cidSDK.ID, treeID string) (bool, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeExists",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cid.EncodeToString()),
attribute.String("tree_id", treeID),
),
)
defer span.End()
if s.pilorama == nil {
return false, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return false, ErrDegradedMode
}
return s.pilorama.TreeExists(ctx, cid, treeID)
}
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
func (s *Shard) TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeUpdateLastSyncHeight",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cid.EncodeToString()),
attribute.String("tree_id", treeID),
attribute.String("height", strconv.FormatUint(height, 10)),
),
)
defer span.End()
if s.pilorama == nil {
return ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
return s.pilorama.TreeUpdateLastSyncHeight(ctx, cid, treeID, height)
}
// TreeLastSyncHeight implements the pilorama.Forest interface.
func (s *Shard) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeLastSyncHeight",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cid.EncodeToString()),
attribute.String("tree_id", treeID),
),
)
defer span.End()
if s.pilorama == nil {
return 0, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return 0, ErrDegradedMode
}
return s.pilorama.TreeLastSyncHeight(ctx, cid, treeID)
}
func (s *Shard) TreeListTrees(ctx context.Context, prm pilorama.TreeListTreesPrm) (*pilorama.TreeListTreesResult, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeListTrees",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
),
)
defer span.End()
if s.pilorama == nil {
return nil, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
return s.pilorama.TreeListTrees(ctx, prm)
}
func (s *Shard) PiloramaEnabled() bool {
return s.pilorama != nil
}
func (s *Shard) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *pilorama.Move) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeApplyStream",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cnr.EncodeToString()),
attribute.String("tree_id", treeID)),
)
defer span.End()
if s.pilorama == nil {
return ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
return s.pilorama.TreeApplyStream(ctx, cnr, treeID, source)
}