package shard import ( "context" "strconv" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) var _ pilorama.Forest = (*Shard)(nil) // ErrPiloramaDisabled is returned when pilorama was disabled in the configuration. var ErrPiloramaDisabled = logicerr.New("pilorama is disabled") // TreeMove implements the pilorama.Forest interface. func (s *Shard) TreeMove(ctx context.Context, d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) (*pilorama.Move, error) { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeMove", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), attribute.String("container_id", d.CID.EncodeToString()), attribute.Int("position", d.Position), attribute.Int("size", d.Size), attribute.String("tree_id", treeID), ), ) defer span.End() if s.pilorama == nil { return nil, ErrPiloramaDisabled } s.m.RLock() defer s.m.RUnlock() if s.info.Mode.ReadOnly() { return nil, ErrReadOnlyMode } if s.info.Mode.NoMetabase() { return nil, ErrDegradedMode } return s.pilorama.TreeMove(ctx, d, treeID, m) } // TreeAddByPath implements the pilorama.Forest interface. func (s *Shard) TreeAddByPath(ctx context.Context, d pilorama.CIDDescriptor, treeID string, attr string, path []string, meta []pilorama.KeyValue) ([]pilorama.Move, error) { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeAddByPath", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), attribute.String("container_id", d.CID.EncodeToString()), attribute.Int("position", d.Position), attribute.Int("size", d.Size), attribute.String("tree_id", treeID), attribute.String("attr", attr), attribute.Int("path_count", len(path)), attribute.Int("meta_count", len(meta)), ), ) defer span.End() if s.pilorama == nil { return nil, ErrPiloramaDisabled } s.m.RLock() defer s.m.RUnlock() if s.info.Mode.ReadOnly() { return nil, ErrReadOnlyMode } if s.info.Mode.NoMetabase() { return nil, ErrDegradedMode } return s.pilorama.TreeAddByPath(ctx, d, treeID, attr, path, meta) } // TreeApply implements the pilorama.Forest interface. func (s *Shard) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *pilorama.Move, backgroundSync bool) error { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeApply", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), attribute.String("container_id", cnr.EncodeToString()), attribute.String("tree_id", treeID), attribute.Bool("background", backgroundSync), ), ) defer span.End() if s.pilorama == nil { return ErrPiloramaDisabled } s.m.RLock() defer s.m.RUnlock() if s.info.Mode.ReadOnly() { return ErrReadOnlyMode } if s.info.Mode.NoMetabase() { return ErrDegradedMode } return s.pilorama.TreeApply(ctx, cnr, treeID, m, backgroundSync) } // TreeGetByPath implements the pilorama.Forest interface. func (s *Shard) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetByPath", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), attribute.String("attr", attr), attribute.Int("path_count", len(path)), attribute.Bool("latest", latest), ), ) defer span.End() if s.pilorama == nil { return nil, ErrPiloramaDisabled } s.m.RLock() defer s.m.RUnlock() if s.info.Mode.NoMetabase() { return nil, ErrDegradedMode } return s.pilorama.TreeGetByPath(ctx, cid, treeID, attr, path, latest) } // TreeGetMeta implements the pilorama.Forest interface. func (s *Shard) TreeGetMeta(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.Node) (pilorama.Meta, uint64, error) { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetMeta", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), attribute.String("node_id", strconv.FormatUint(nodeID, 10)), ), ) defer span.End() if s.pilorama == nil { return pilorama.Meta{}, 0, ErrPiloramaDisabled } s.m.RLock() defer s.m.RUnlock() if s.info.Mode.NoMetabase() { return pilorama.Meta{}, 0, ErrDegradedMode } return s.pilorama.TreeGetMeta(ctx, cid, treeID, nodeID) } // TreeGetChildren implements the pilorama.Forest interface. func (s *Shard) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.Node) ([]pilorama.NodeInfo, error) { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetChildren", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), attribute.String("node_id", strconv.FormatUint(nodeID, 10)), ), ) defer span.End() if s.pilorama == nil { return nil, ErrPiloramaDisabled } s.m.RLock() defer s.m.RUnlock() if s.info.Mode.NoMetabase() { return nil, ErrDegradedMode } return s.pilorama.TreeGetChildren(ctx, cid, treeID, nodeID) } // TreeSortedByFilename implements the pilorama.Forest interface. func (s *Shard) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.Node, last *string, count int) ([]pilorama.NodeInfo, *string, error) { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeSortedByFilename", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), attribute.String("node_id", strconv.FormatUint(nodeID, 10)), ), ) defer span.End() if s.pilorama == nil { return nil, last, ErrPiloramaDisabled } s.m.RLock() defer s.m.RUnlock() if s.info.Mode.NoMetabase() { return nil, last, ErrDegradedMode } return s.pilorama.TreeSortedByFilename(ctx, cid, treeID, nodeID, last, count) } // TreeGetOpLog implements the pilorama.Forest interface. func (s *Shard) TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) (pilorama.Move, error) { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetOpLog", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), attribute.String("height", strconv.FormatUint(height, 10)), ), ) defer span.End() if s.pilorama == nil { return pilorama.Move{}, ErrPiloramaDisabled } s.m.RLock() defer s.m.RUnlock() if s.info.Mode.NoMetabase() { return pilorama.Move{}, ErrDegradedMode } return s.pilorama.TreeGetOpLog(ctx, cid, treeID, height) } // TreeDrop implements the pilorama.Forest interface. func (s *Shard) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID string) error { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeDrop", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), ), ) defer span.End() if s.pilorama == nil { return ErrPiloramaDisabled } s.m.RLock() defer s.m.RUnlock() if s.info.Mode.NoMetabase() { return ErrDegradedMode } return s.pilorama.TreeDrop(ctx, cid, treeID) } // TreeList implements the pilorama.Forest interface. func (s *Shard) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, error) { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeList", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), attribute.String("container_id", cid.EncodeToString()), ), ) defer span.End() if s.pilorama == nil { return nil, ErrPiloramaDisabled } s.m.RLock() defer s.m.RUnlock() if s.info.Mode.NoMetabase() { return nil, ErrDegradedMode } return s.pilorama.TreeList(ctx, cid) } func (s *Shard) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeHeight", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), ), ) defer span.End() s.m.RLock() defer s.m.RUnlock() if s.info.Mode.NoMetabase() { return 0, ErrDegradedMode } if s.pilorama == nil { return 0, ErrPiloramaDisabled } return s.pilorama.TreeHeight(ctx, cid, treeID) } // TreeExists implements the pilorama.Forest interface. func (s *Shard) TreeExists(ctx context.Context, cid cidSDK.ID, treeID string) (bool, error) { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeExists", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), ), ) defer span.End() if s.pilorama == nil { return false, ErrPiloramaDisabled } s.m.RLock() defer s.m.RUnlock() if s.info.Mode.NoMetabase() { return false, ErrDegradedMode } return s.pilorama.TreeExists(ctx, cid, treeID) } // TreeUpdateLastSyncHeight implements the pilorama.Forest interface. func (s *Shard) TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) error { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeUpdateLastSyncHeight", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), attribute.String("height", strconv.FormatUint(height, 10)), ), ) defer span.End() if s.pilorama == nil { return ErrPiloramaDisabled } s.m.RLock() defer s.m.RUnlock() if s.info.Mode.ReadOnly() { return ErrReadOnlyMode } if s.info.Mode.NoMetabase() { return ErrDegradedMode } return s.pilorama.TreeUpdateLastSyncHeight(ctx, cid, treeID, height) } // TreeLastSyncHeight implements the pilorama.Forest interface. func (s *Shard) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeLastSyncHeight", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), ), ) defer span.End() if s.pilorama == nil { return 0, ErrPiloramaDisabled } s.m.RLock() defer s.m.RUnlock() if s.info.Mode.NoMetabase() { return 0, ErrDegradedMode } return s.pilorama.TreeLastSyncHeight(ctx, cid, treeID) } func (s *Shard) TreeListTrees(ctx context.Context, prm pilorama.TreeListTreesPrm) (*pilorama.TreeListTreesResult, error) { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeListTrees", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), ), ) defer span.End() if s.pilorama == nil { return nil, ErrPiloramaDisabled } s.m.RLock() defer s.m.RUnlock() if s.info.Mode.NoMetabase() { return nil, ErrDegradedMode } return s.pilorama.TreeListTrees(ctx, prm) } func (s *Shard) PiloramaEnabled() bool { return s.pilorama != nil } func (s *Shard) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *pilorama.Move) error { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeApplyStream", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), attribute.String("container_id", cnr.EncodeToString()), attribute.String("tree_id", treeID)), ) defer span.End() if s.pilorama == nil { return ErrPiloramaDisabled } s.m.RLock() defer s.m.RUnlock() if s.info.Mode.ReadOnly() { return ErrReadOnlyMode } if s.info.Mode.NoMetabase() { return ErrDegradedMode } return s.pilorama.TreeApplyStream(ctx, cnr, treeID, source) }