Denis Kirillov
456319d2f1
Update tree service to fix split tree problem. Tree intermediate nodes can be duplicated, so we must handle this. Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
257 lines
6.7 KiB
Go
257 lines
6.7 KiB
Go
package services
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
|
errorsFrost "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/errors"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
|
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
|
|
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
|
|
)
|
|
|
|
type GetNodeByPathResponseInfoWrapper struct {
|
|
response *grpcService.GetNodeByPathResponse_Info
|
|
}
|
|
|
|
func (n GetNodeByPathResponseInfoWrapper) GetNodeID() []uint64 {
|
|
return []uint64{n.response.GetNodeId()}
|
|
}
|
|
|
|
func (n GetNodeByPathResponseInfoWrapper) GetParentID() []uint64 {
|
|
return []uint64{n.response.GetParentId()}
|
|
}
|
|
|
|
func (n GetNodeByPathResponseInfoWrapper) GetTimestamp() []uint64 {
|
|
return []uint64{n.response.GetTimestamp()}
|
|
}
|
|
|
|
func (n GetNodeByPathResponseInfoWrapper) GetMeta() []tree.Meta {
|
|
res := make([]tree.Meta, len(n.response.Meta))
|
|
for i, value := range n.response.Meta {
|
|
res[i] = value
|
|
}
|
|
return res
|
|
}
|
|
|
|
type GetSubTreeResponseBodyWrapper struct {
|
|
response *grpcService.GetSubTreeResponse_Body
|
|
}
|
|
|
|
func (n GetSubTreeResponseBodyWrapper) GetNodeID() []uint64 {
|
|
return n.response.GetNodeId()
|
|
}
|
|
|
|
func (n GetSubTreeResponseBodyWrapper) GetParentID() []uint64 {
|
|
return n.response.GetParentId()
|
|
}
|
|
|
|
func (n GetSubTreeResponseBodyWrapper) GetTimestamp() []uint64 {
|
|
return n.response.GetTimestamp()
|
|
}
|
|
|
|
func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta {
|
|
res := make([]tree.Meta, len(n.response.Meta))
|
|
for i, value := range n.response.Meta {
|
|
res[i] = value
|
|
}
|
|
return res
|
|
}
|
|
|
|
type PoolWrapper struct {
|
|
p *treepool.Pool
|
|
}
|
|
|
|
func NewPoolWrapper(p *treepool.Pool) *PoolWrapper {
|
|
return &PoolWrapper{p: p}
|
|
}
|
|
|
|
func (w *PoolWrapper) GetNodes(ctx context.Context, prm *tree.GetNodesParams) ([]tree.NodeResponse, error) {
|
|
poolPrm := treepool.GetNodesParams{
|
|
CID: prm.BktInfo.CID,
|
|
TreeID: prm.TreeID,
|
|
Path: prm.Path,
|
|
Meta: prm.Meta,
|
|
PathAttribute: tree.FileNameKey,
|
|
LatestOnly: prm.LatestOnly,
|
|
AllAttrs: prm.AllAttrs,
|
|
BearerToken: getBearer(ctx, prm.BktInfo),
|
|
}
|
|
|
|
nodes, err := w.p.GetNodes(ctx, poolPrm)
|
|
if err != nil {
|
|
return nil, handleError(err)
|
|
}
|
|
|
|
res := make([]tree.NodeResponse, len(nodes))
|
|
for i, info := range nodes {
|
|
res[i] = GetNodeByPathResponseInfoWrapper{info}
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) ([]tree.NodeResponse, error) {
|
|
poolPrm := treepool.GetSubTreeParams{
|
|
CID: bktInfo.CID,
|
|
TreeID: treeID,
|
|
RootID: rootID,
|
|
Depth: depth,
|
|
BearerToken: getBearer(ctx, bktInfo),
|
|
Order: treepool.AscendingOrder,
|
|
}
|
|
|
|
subTreeReader, err := w.p.GetSubTree(ctx, poolPrm)
|
|
if err != nil {
|
|
return nil, handleError(err)
|
|
}
|
|
|
|
var subtree []tree.NodeResponse
|
|
|
|
node, err := subTreeReader.Next()
|
|
for err == nil {
|
|
subtree = append(subtree, GetSubTreeResponseBodyWrapper{node})
|
|
node, err = subTreeReader.Next()
|
|
}
|
|
if err != nil && err != io.EOF {
|
|
return nil, handleError(err)
|
|
}
|
|
|
|
return subtree, nil
|
|
}
|
|
|
|
type SubTreeStreamImpl struct {
|
|
r *treepool.SubTreeReader
|
|
buffer []*grpcService.GetSubTreeResponse_Body
|
|
eof bool
|
|
index int
|
|
ln int
|
|
}
|
|
|
|
// bufSize is the length of the node buffer GetSubTreeStream allocates for
// each SubTreeStreamImpl.
const bufSize = 1000
|
|
|
|
func (s *SubTreeStreamImpl) Next() (tree.NodeResponse, error) {
|
|
if s.index != -1 {
|
|
node := s.buffer[s.index]
|
|
s.index++
|
|
if s.index >= s.ln {
|
|
s.index = -1
|
|
}
|
|
return GetSubTreeResponseBodyWrapper{response: node}, nil
|
|
}
|
|
if s.eof {
|
|
return nil, io.EOF
|
|
}
|
|
|
|
var err error
|
|
s.ln, err = s.r.Read(s.buffer)
|
|
if err != nil {
|
|
if err != io.EOF {
|
|
return nil, fmt.Errorf("sub tree stream impl pool wrap: %w", handleError(err))
|
|
}
|
|
s.eof = true
|
|
}
|
|
if s.ln > 0 {
|
|
s.index = 0
|
|
}
|
|
|
|
return s.Next()
|
|
}
|
|
|
|
func (w *PoolWrapper) GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) (tree.SubTreeStream, error) {
|
|
poolPrm := treepool.GetSubTreeParams{
|
|
CID: bktInfo.CID,
|
|
TreeID: treeID,
|
|
RootID: rootID,
|
|
Depth: depth,
|
|
BearerToken: getBearer(ctx, bktInfo),
|
|
Order: treepool.AscendingOrder,
|
|
}
|
|
|
|
subTreeReader, err := w.p.GetSubTree(ctx, poolPrm)
|
|
if err != nil {
|
|
return nil, handleError(err)
|
|
}
|
|
|
|
return &SubTreeStreamImpl{
|
|
r: subTreeReader,
|
|
buffer: make([]*grpcService.GetSubTreeResponse_Body, bufSize),
|
|
index: -1,
|
|
}, nil
|
|
}
|
|
|
|
func (w *PoolWrapper) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) {
|
|
nodeID, err := w.p.AddNode(ctx, treepool.AddNodeParams{
|
|
CID: bktInfo.CID,
|
|
TreeID: treeID,
|
|
Parent: parent,
|
|
Meta: meta,
|
|
BearerToken: getBearer(ctx, bktInfo),
|
|
})
|
|
return nodeID, handleError(err)
|
|
}
|
|
|
|
func (w *PoolWrapper) AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error) {
|
|
nodeID, err := w.p.AddNodeByPath(ctx, treepool.AddNodeByPathParams{
|
|
CID: bktInfo.CID,
|
|
TreeID: treeID,
|
|
Path: path,
|
|
Meta: meta,
|
|
PathAttribute: tree.FileNameKey,
|
|
BearerToken: getBearer(ctx, bktInfo),
|
|
})
|
|
return nodeID, handleError(err)
|
|
}
|
|
|
|
func (w *PoolWrapper) MoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID, parentID uint64, meta map[string]string) error {
|
|
return handleError(w.p.MoveNode(ctx, treepool.MoveNodeParams{
|
|
CID: bktInfo.CID,
|
|
TreeID: treeID,
|
|
NodeID: nodeID,
|
|
ParentID: parentID,
|
|
Meta: meta,
|
|
BearerToken: getBearer(ctx, bktInfo),
|
|
}))
|
|
}
|
|
|
|
func (w *PoolWrapper) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
|
|
return handleError(w.p.RemoveNode(ctx, treepool.RemoveNodeParams{
|
|
CID: bktInfo.CID,
|
|
TreeID: treeID,
|
|
NodeID: nodeID,
|
|
BearerToken: getBearer(ctx, bktInfo),
|
|
}))
|
|
}
|
|
|
|
func getBearer(ctx context.Context, bktInfo *data.BucketInfo) []byte {
|
|
if bd, err := middleware.GetBoxData(ctx); err == nil {
|
|
if bd.Gate.BearerToken != nil {
|
|
if bd.Gate.BearerToken.Impersonate() || bktInfo.Owner.Equals(bearer.ResolveIssuer(*bd.Gate.BearerToken)) {
|
|
return bd.Gate.BearerToken.Marshal()
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func handleError(err error) error {
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
if errors.Is(err, treepool.ErrNodeNotFound) {
|
|
return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
|
|
}
|
|
if errors.Is(err, treepool.ErrNodeAccessDenied) {
|
|
return fmt.Errorf("%w: %s", tree.ErrNodeAccessDenied, err.Error())
|
|
}
|
|
if errorsFrost.IsTimeoutError(err) {
|
|
return fmt.Errorf("%w: %s", tree.ErrGatewayTimeout, err.Error())
|
|
}
|
|
|
|
return err
|
|
}
|