frostfs-node/pkg/local_object_storage/engine/tree.go
Evgenii Stratonikov 9d1c915c42 [#1251] pilorama: Allow traversing multiple branches in parallel
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-07-18 14:18:06 +03:00

442 lines
14 KiB
Go

package engine
import (
"context"
"errors"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
// Compile-time check that *StorageEngine satisfies the pilorama.Forest interface.
var _ pilorama.Forest = (*StorageEngine)(nil)
// TreeMove implements the pilorama.Forest interface.
//
// The target shard is chosen by getTreeShard. A pilorama.ErrTreeNotFound
// result is tolerated: in that case getTreeShard yields index 0, so the move
// is applied to the first shard of the sorted list. Any other lookup error is
// returned as is.
//
// A failure of the shard operation is returned to the caller; unless it is
// shard.ErrReadOnlyMode or shard.ErrPiloramaDisabled it is additionally passed
// to reportShardError.
func (e *StorageEngine) TreeMove(ctx context.Context, d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) (*pilorama.Move, error) {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeMove",
		trace.WithAttributes(
			attribute.String("container_id", d.CID.EncodeToString()),
			attribute.Int("position", d.Position),
			attribute.Int("size", d.Size),
			attribute.String("tree_id", treeID),
		),
	)
	defer span.End()

	index, lst, err := e.getTreeShard(ctx, d.CID, treeID)
	if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
		return nil, err
	}
	// With no shards available getTreeShard reports ErrTreeNotFound together
	// with an empty list; indexing it would panic, so surface the error.
	if len(lst) == 0 {
		return nil, err
	}

	lm, err := lst[index].TreeMove(ctx, d, treeID, m)
	if err != nil {
		// Use errors.Is for both sentinels so that wrapped errors are
		// recognised as well.
		if !errors.Is(err, shard.ErrReadOnlyMode) && !errors.Is(err, shard.ErrPiloramaDisabled) {
			e.reportShardError(lst[index], "can't perform `TreeMove`", err,
				zap.Stringer("cid", d.CID),
				zap.String("tree", treeID),
				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		}
		return nil, err
	}
	return lm, nil
}
// TreeAddByPath implements the pilorama.Forest interface.
//
// The target shard is chosen by getTreeShard. A pilorama.ErrTreeNotFound
// result is tolerated: in that case getTreeShard yields index 0, so the
// operation is applied to the first shard of the sorted list. Any other
// lookup error is returned as is.
//
// A failure of the shard operation is returned to the caller; unless it is
// shard.ErrReadOnlyMode or shard.ErrPiloramaDisabled it is additionally passed
// to reportShardError.
func (e *StorageEngine) TreeAddByPath(ctx context.Context, d pilorama.CIDDescriptor, treeID string, attr string, path []string, m []pilorama.KeyValue) ([]pilorama.Move, error) {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeAddByPath",
		trace.WithAttributes(
			attribute.String("container_id", d.CID.EncodeToString()),
			attribute.Int("position", d.Position),
			attribute.Int("size", d.Size),
			attribute.String("tree_id", treeID),
			attribute.String("attr", attr),
			attribute.Int("path_count", len(path)),
			attribute.Int("meta_count", len(m)),
		),
	)
	defer span.End()

	index, lst, err := e.getTreeShard(ctx, d.CID, treeID)
	if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
		return nil, err
	}
	// With no shards available getTreeShard reports ErrTreeNotFound together
	// with an empty list; indexing it would panic, so surface the error.
	if len(lst) == 0 {
		return nil, err
	}

	lm, err := lst[index].TreeAddByPath(ctx, d, treeID, attr, path, m)
	if err != nil {
		// Use errors.Is for both sentinels so that wrapped errors are
		// recognised as well.
		if !errors.Is(err, shard.ErrReadOnlyMode) && !errors.Is(err, shard.ErrPiloramaDisabled) {
			e.reportShardError(lst[index], "can't perform `TreeAddByPath`", err,
				zap.Stringer("cid", d.CID),
				zap.String("tree", treeID),
				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		}
		return nil, err
	}
	return lm, nil
}
// TreeApply implements the pilorama.Forest interface.
//
// The target shard is chosen by getTreeShard. A pilorama.ErrTreeNotFound
// result is tolerated: in that case getTreeShard yields index 0, so the
// operation is applied to the first shard of the sorted list. Any other
// lookup error is returned as is.
//
// A failure of the shard operation is returned to the caller; unless it is
// shard.ErrReadOnlyMode or shard.ErrPiloramaDisabled it is additionally passed
// to reportShardError.
func (e *StorageEngine) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *pilorama.Move, backgroundSync bool) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeApply",
		trace.WithAttributes(
			attribute.String("container_id", cnr.EncodeToString()),
			attribute.String("tree_id", treeID),
			attribute.Bool("background", backgroundSync),
		),
	)
	defer span.End()

	index, lst, err := e.getTreeShard(ctx, cnr, treeID)
	if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
		return err
	}
	// With no shards available getTreeShard reports ErrTreeNotFound together
	// with an empty list; indexing it would panic, so surface the error.
	if len(lst) == 0 {
		return err
	}

	err = lst[index].TreeApply(ctx, cnr, treeID, m, backgroundSync)
	if err != nil {
		// Use errors.Is for both sentinels so that wrapped errors are
		// recognised as well.
		if !errors.Is(err, shard.ErrReadOnlyMode) && !errors.Is(err, shard.ErrPiloramaDisabled) {
			e.reportShardError(lst[index], "can't perform `TreeApply`", err,
				zap.Stringer("cid", cnr),
				zap.String("tree", treeID),
				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		}
		return err
	}
	return nil
}
// TreeGetByPath implements the pilorama.Forest interface.
//
// Shards are queried in the order produced by sortShards and the result of
// the first shard that answers without an error is returned. Iteration stops
// early on shard.ErrPiloramaDisabled. Errors other than
// pilorama.ErrTreeNotFound are passed to reportShardError before the next
// shard is tried. If no shard succeeds, the last error seen is returned.
func (e *StorageEngine) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeGetByPath",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
			attribute.String("attr", attr),
			attribute.Int("path_count", len(path)),
			attribute.Bool("latest", latest),
		),
	)
	defer span.End()

	var err error
	var nodes []pilorama.Node
	for _, sh := range e.sortShards(cid) {
		nodes, err = sh.TreeGetByPath(ctx, cid, treeID, attr, path, latest)
		if err == nil {
			return nodes, nil
		}
		// errors.Is (rather than ==) also recognises a wrapped sentinel.
		if errors.Is(err, shard.ErrPiloramaDisabled) {
			break
		}
		if !errors.Is(err, pilorama.ErrTreeNotFound) {
			e.reportShardError(sh, "can't perform `TreeGetByPath`", err,
				zap.Stringer("cid", cid),
				zap.String("tree", treeID),
				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		}
	}
	return nil, err
}
// TreeGetMeta implements the pilorama.Forest interface.
//
// Shards are queried in the order produced by sortShards and the result of
// the first shard that answers without an error is returned. Iteration stops
// early on shard.ErrPiloramaDisabled. Errors other than
// pilorama.ErrTreeNotFound are passed to reportShardError before the next
// shard is tried. If no shard succeeds, an empty pilorama.Meta, 0 and the
// last error seen are returned.
func (e *StorageEngine) TreeGetMeta(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.Node) (pilorama.Meta, uint64, error) {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeGetMeta",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
			attribute.String("node_id", strconv.FormatUint(nodeID, 10)),
		),
	)
	defer span.End()

	var err error
	var m pilorama.Meta
	var p uint64
	for _, sh := range e.sortShards(cid) {
		m, p, err = sh.TreeGetMeta(ctx, cid, treeID, nodeID)
		if err == nil {
			return m, p, nil
		}
		// errors.Is (rather than ==) also recognises a wrapped sentinel.
		if errors.Is(err, shard.ErrPiloramaDisabled) {
			break
		}
		if !errors.Is(err, pilorama.ErrTreeNotFound) {
			e.reportShardError(sh, "can't perform `TreeGetMeta`", err,
				zap.Stringer("cid", cid),
				zap.String("tree", treeID),
				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		}
	}
	return pilorama.Meta{}, 0, err
}
// TreeGetChildren implements the pilorama.Forest interface.
//
// Shards are queried in the order produced by sortShards and the result of
// the first shard that answers without an error is returned. Iteration stops
// early on shard.ErrPiloramaDisabled. Errors other than
// pilorama.ErrTreeNotFound are passed to reportShardError before the next
// shard is tried. If no shard succeeds, the last error seen is returned.
func (e *StorageEngine) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.Node) ([]pilorama.NodeInfo, error) {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeGetChildren",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
			attribute.String("node_id", strconv.FormatUint(nodeID, 10)),
		),
	)
	defer span.End()

	var err error
	var nodes []pilorama.NodeInfo
	for _, sh := range e.sortShards(cid) {
		nodes, err = sh.TreeGetChildren(ctx, cid, treeID, nodeID)
		if err == nil {
			return nodes, nil
		}
		// errors.Is (rather than ==) also recognises a wrapped sentinel.
		if errors.Is(err, shard.ErrPiloramaDisabled) {
			break
		}
		if !errors.Is(err, pilorama.ErrTreeNotFound) {
			e.reportShardError(sh, "can't perform `TreeGetChildren`", err,
				zap.Stringer("cid", cid),
				zap.String("tree", treeID),
				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		}
	}
	return nil, err
}
// TreeSortedByFilename implements the pilorama.Forest interface.
//
// Shards are queried in the order produced by sortShards; nodeID, last and
// count are forwarded unchanged. The nodes and cursor of the first shard that
// answers without an error are returned. Iteration stops early on
// shard.ErrPiloramaDisabled. Errors other than pilorama.ErrTreeNotFound are
// passed to reportShardError before the next shard is tried. If no shard
// succeeds, nil nodes, the caller-supplied last cursor and the last error
// seen are returned.
func (e *StorageEngine) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.MultiNode, last *string, count int) ([]pilorama.MultiNodeInfo, *string, error) {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeSortedByFilename",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
		),
	)
	defer span.End()

	var err error
	var nodes []pilorama.MultiNodeInfo
	var cursor *string
	for _, sh := range e.sortShards(cid) {
		nodes, cursor, err = sh.TreeSortedByFilename(ctx, cid, treeID, nodeID, last, count)
		if err == nil {
			return nodes, cursor, nil
		}
		// errors.Is (rather than ==) also recognises a wrapped sentinel.
		if errors.Is(err, shard.ErrPiloramaDisabled) {
			break
		}
		if !errors.Is(err, pilorama.ErrTreeNotFound) {
			e.reportShardError(sh, "can't perform `TreeSortedByFilename`", err,
				zap.Stringer("cid", cid),
				zap.String("tree", treeID),
				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		}
	}
	return nil, last, err
}
// TreeGetOpLog implements the pilorama.Forest interface.
//
// Shards are queried in the order produced by sortShards and the operation
// returned by the first shard that answers without an error is used.
// Iteration stops early on shard.ErrPiloramaDisabled. Errors other than
// pilorama.ErrTreeNotFound are passed to reportShardError before the next
// shard is tried. If no shard succeeds, the value produced by the last
// attempt is returned together with the last error seen.
func (e *StorageEngine) TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) (pilorama.Move, error) {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeGetOpLog",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
			attribute.String("height", strconv.FormatUint(height, 10)),
		),
	)
	defer span.End()

	var err error
	var lm pilorama.Move
	for _, sh := range e.sortShards(cid) {
		lm, err = sh.TreeGetOpLog(ctx, cid, treeID, height)
		if err == nil {
			return lm, nil
		}
		// errors.Is (rather than ==) also recognises a wrapped sentinel.
		if errors.Is(err, shard.ErrPiloramaDisabled) {
			break
		}
		if !errors.Is(err, pilorama.ErrTreeNotFound) {
			e.reportShardError(sh, "can't perform `TreeGetOpLog`", err,
				zap.Stringer("cid", cid),
				zap.String("tree", treeID),
				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		}
	}
	return lm, err
}
// TreeDrop implements the pilorama.Forest interface.
//
// Shards are visited in the order produced by sortShards. The method returns
// nil as soon as one shard drops the tree without an error; the remaining
// shards are not visited. Iteration stops early on shard.ErrPiloramaDisabled.
// Errors other than pilorama.ErrTreeNotFound and shard.ErrReadOnlyMode are
// passed to reportShardError before the next shard is tried. If no shard
// succeeds, the last error seen is returned.
func (e *StorageEngine) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID string) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeDrop",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
		),
	)
	defer span.End()

	var err error
	for _, sh := range e.sortShards(cid) {
		err = sh.TreeDrop(ctx, cid, treeID)
		if err == nil {
			return nil
		}
		// errors.Is (rather than ==) also recognises a wrapped sentinel.
		if errors.Is(err, shard.ErrPiloramaDisabled) {
			break
		}
		if !errors.Is(err, pilorama.ErrTreeNotFound) && !errors.Is(err, shard.ErrReadOnlyMode) {
			e.reportShardError(sh, "can't perform `TreeDrop`", err,
				zap.Stringer("cid", cid),
				zap.String("tree", treeID),
				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		}
	}
	return err
}
// TreeList implements the pilorama.Forest interface.
//
// The tree identifiers reported by each shard from unsortedShards are
// appended to a single result slice. shard.ErrPiloramaDisabled and
// shard.ErrReadOnlyMode abort the listing and are returned to the caller.
// Any other shard error is passed to reportShardError and that shard is
// skipped, so the result may be partial.
func (e *StorageEngine) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, error) {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeList",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
		),
	)
	defer span.End()

	var treeIDs []string
	for _, sh := range e.unsortedShards() {
		ids, err := sh.TreeList(ctx, cid)
		switch {
		case err == nil:
			treeIDs = append(treeIDs, ids...)
		case errors.Is(err, shard.ErrPiloramaDisabled), errors.Is(err, shard.ErrReadOnlyMode):
			return nil, err
		default:
			// Keep going: return as much information about trees as
			// possible even if a single shard misbehaves.
			e.reportShardError(sh, "can't perform `TreeList`", err,
				zap.Stringer("cid", cid),
				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		}
	}
	return treeIDs, nil
}
// TreeExists implements the pilorama.Forest interface.
//
// It reports true when getTreeShard locates the tree on some shard, false
// with a nil error when the lookup ends with pilorama.ErrTreeNotFound, and
// false together with the error for any other lookup failure.
func (e *StorageEngine) TreeExists(ctx context.Context, cid cidSDK.ID, treeID string) (bool, error) {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeExists",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
		),
	)
	defer span.End()

	_, _, lookupErr := e.getTreeShard(ctx, cid, treeID)
	switch {
	case lookupErr == nil:
		return true, nil
	case errors.Is(lookupErr, pilorama.ErrTreeNotFound):
		return false, nil
	default:
		return false, lookupErr
	}
}
// TreeHeight returns the height reported by the shard that getTreeShard
// selects for the given container and tree.
//
// When the shard lookup fails for any reason, the method returns 0 with a nil
// error.
func (e *StorageEngine) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeHeight",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
		),
	)
	defer span.End()

	pos, shards, lookupErr := e.getTreeShard(ctx, cid, treeID)
	if lookupErr == nil {
		return shards[pos].TreeHeight(ctx, cid, treeID)
	}
	// NOTE(review): every lookup error, not only pilorama.ErrTreeNotFound, is
	// swallowed here and turned into height 0 — confirm this is intentional.
	return 0, nil
}
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
//
// The target shard is chosen by getTreeShard. A pilorama.ErrTreeNotFound
// result is tolerated: in that case getTreeShard yields index 0, so the
// height is stored on the first shard of the sorted list. Any other lookup
// error is returned as is.
//
// A failure of the shard operation is returned to the caller; unless it is
// shard.ErrReadOnlyMode or shard.ErrPiloramaDisabled it is additionally passed
// to reportShardError.
func (e *StorageEngine) TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeUpdateLastSyncHeight",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
			attribute.String("height", strconv.FormatUint(height, 10)),
		),
	)
	defer span.End()

	index, lst, err := e.getTreeShard(ctx, cid, treeID)
	if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
		return err
	}
	// With no shards available getTreeShard reports ErrTreeNotFound together
	// with an empty list; indexing it would panic, so surface the error.
	if len(lst) == 0 {
		return err
	}

	err = lst[index].TreeUpdateLastSyncHeight(ctx, cid, treeID, height)
	// Use errors.Is for both sentinels so that wrapped errors are recognised
	// as well.
	if err != nil && !errors.Is(err, shard.ErrReadOnlyMode) && !errors.Is(err, shard.ErrPiloramaDisabled) {
		e.reportShardError(lst[index], "can't update tree synchronization height", err,
			zap.Stringer("cid", cid),
			zap.String("tree", treeID),
			zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
	}
	return err
}
// TreeLastSyncHeight implements the pilorama.Forest interface.
//
// Shards are queried in the order produced by sortShards and the height of
// the first shard that answers without an error is returned. Iteration stops
// early on shard.ErrPiloramaDisabled. Errors other than
// pilorama.ErrTreeNotFound are passed to reportShardError before the next
// shard is tried. If no shard succeeds, the value produced by the last
// attempt is returned together with the last error seen.
func (e *StorageEngine) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeLastSyncHeight",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
		),
	)
	defer span.End()

	var err error
	var height uint64
	for _, sh := range e.sortShards(cid) {
		height, err = sh.TreeLastSyncHeight(ctx, cid, treeID)
		if err == nil {
			return height, nil
		}
		// errors.Is (rather than ==) also recognises a wrapped sentinel.
		if errors.Is(err, shard.ErrPiloramaDisabled) {
			break
		}
		if !errors.Is(err, pilorama.ErrTreeNotFound) {
			e.reportShardError(sh, "can't read tree synchronization height", err,
				zap.Stringer("cid", cid),
				zap.String("tree", treeID),
				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		}
	}
	return height, err
}
// getTreeShard walks the shards in sortShards order and looks for the first
// one on which the tree exists.
//
// Results:
//   - tree found: the shard's position, the sorted shard list and a nil error;
//   - a shard's TreeExists call fails: 0, a nil list and that error (the
//     remaining shards are not checked);
//   - tree found nowhere: 0, the sorted shard list and pilorama.ErrTreeNotFound.
func (e *StorageEngine) getTreeShard(ctx context.Context, cid cidSDK.ID, treeID string) (int, []hashedShard, error) {
	shards := e.sortShards(cid)
	for pos, candidate := range shards {
		found, err := candidate.TreeExists(ctx, cid, treeID)
		if err != nil {
			return 0, nil, err
		}
		if !found {
			continue
		}
		return pos, shards, nil
	}
	return 0, shards, pilorama.ErrTreeNotFound
}