package tree

import (
	"context"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"sort"
	"strconv"
	"strings"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"go.uber.org/zap"
	"golang.org/x/exp/maps"
)

type (
	// Tree provides bucket/object metadata operations on top of a ServiceClient.
	Tree struct {
		service ServiceClient
		log     *zap.Logger
	}

	// ServiceClient is a client to interact with tree service.
	// Each method must return ErrNodeNotFound or ErrNodeAccessDenied if relevant.
	ServiceClient interface {
		GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error)
		// GetSubTree and GetSubTreeStream accept several root IDs at once;
		// a depth of maxGetSubTreeDepth (0) requests the whole subtree.
		GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) ([]NodeResponse, error)
		GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) (SubTreeStream, error)
		AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error)
		AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error)
		MoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID, parentID uint64, meta map[string]string) error
		RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error
	}

	// SubTreeStream yields subtree nodes one by one.
	// Consumers in this package treat io.EOF from Next as the end of the stream.
	SubTreeStream interface {
		Next() (NodeResponse, error)
	}

	// treeNode is the parsed form of a NodeResponse.
	// ID, ParentID and TimeStamp are parallel slices of equal, non-zero length
	// (enforced by newTreeNode); more than one element means the node is "split".
	// ObjID and Size are decoded from the OID and Size meta keys; every other
	// meta pair is kept verbatim in Meta.
	treeNode struct {
		ID        []uint64
		ParentID  []uint64
		ObjID     oid.ID
		TimeStamp []uint64
		Size      uint64
		Meta      map[string]string
	}

	// GetNodesParams groups the arguments of ServiceClient.GetNodes.
	// NOTE(review): the exact meaning of Meta, LatestOnly and AllAttrs is defined
	// by the ServiceClient implementation, which is not part of this file.
	GetNodesParams struct {
		BktInfo    *data.BucketInfo
		TreeID     string
		Path       []string
		Meta       []string
		LatestOnly bool
		AllAttrs   bool
	}
)

const (
	// FileNameKey is the meta key that holds the name of a tree node.
	FileNameKey = "FileName"
)

var (
	// ErrNodeNotFound is returned from ServiceClient in case of not found error.
	ErrNodeNotFound = layer.ErrNodeNotFound

	// ErrNodeAccessDenied is returned from ServiceClient service in case of access denied error.
	ErrNodeAccessDenied = layer.ErrNodeAccessDenied

	// ErrGatewayTimeout is returned from ServiceClient service in case of timeout error.
	ErrGatewayTimeout = layer.ErrGatewayTimeout
)

// Meta keys and well-known names used when reading and writing tree nodes.
const (
	versioningKV        = "Versioning"
	cannedACLKV         = "cannedACL"
	ownerKeyKV          = "ownerKey"
	lockConfigurationKV = "LockConfiguration"
	oidKV               = "OID"

	isCombinedKV    = "IsCombined"
	isUnversionedKV = "IsUnversioned"
	isTagKV         = "IsTag"
	uploadIDKV      = "UploadId"
	partNumberKV    = "Number"
	sizeKV          = "Size"
	etagKV          = "ETag"
	md5KV           = "MD5"
	finishedKV      = "Finished"

	// keys for lock.
	isLockKV       = "IsLock"
	legalHoldOIDKV = "LegalHoldOID"
	retentionOIDKV = "RetentionOID"
	untilDateKV    = "UntilDate"
	isComplianceKV = "IsCompliance"

	// keys for delete marker nodes.
	isDeleteMarkerKV = "IsDeleteMarker"
	ownerKV          = "Owner"
	createdKV        = "Created"

	// File names of the well-known nodes stored in the system tree.
	settingsFileName      = "bucket-settings"
	corsFilename          = "bucket-cors"
	bucketTaggingFilename = "bucket-tagging"

	// versionTree -- ID of a tree with object versions.
	versionTree = "version"

	// systemTree -- ID of a tree with system objects
	// i.e. bucket settings with versioning and lock configuration, cors.
	systemTree = "system"

	// separator splits object names into tree path elements.
	separator = "/"
	// userDefinedTagPrefix is prepended to user tag keys when they are stored as node meta.
	userDefinedTagPrefix = "User-Tag-"

	maxGetSubTreeDepth = 0 // means all subTree
)

// NewTree creates an instance of Tree that uses the provided tree service client and logger.
func NewTree(service ServiceClient, log *zap.Logger) *Tree { return &Tree{ service: service, log: log, } } type Meta interface { GetKey() string GetValue() []byte } type NodeResponse interface { GetMeta() []Meta GetNodeID() []uint64 GetParentID() []uint64 GetTimestamp() []uint64 } func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) { tNode := &treeNode{ ID: nodeInfo.GetNodeID(), ParentID: nodeInfo.GetParentID(), TimeStamp: nodeInfo.GetTimestamp(), Meta: make(map[string]string, len(nodeInfo.GetMeta())), } if len(tNode.ID) == 0 || len(tNode.ParentID) == 0 || len(tNode.TimeStamp) == 0 { return nil, errors.New("invalid tree node: missing id") } if len(tNode.ID) != len(tNode.ParentID) || len(tNode.ID) != len(tNode.TimeStamp) { return nil, errors.New("invalid tree node: length multiple ids mismatch") } for _, kv := range nodeInfo.GetMeta() { switch kv.GetKey() { case oidKV: if err := tNode.ObjID.DecodeString(string(kv.GetValue())); err != nil { return nil, err } case sizeKV: if sizeStr := string(kv.GetValue()); len(sizeStr) > 0 { var err error if tNode.Size, err = strconv.ParseUint(sizeStr, 10, 64); err != nil { return nil, fmt.Errorf("invalid size value '%s': %w", sizeStr, err) } } default: tNode.Meta[kv.GetKey()] = string(kv.GetValue()) } } return tNode, nil } func (n *treeNode) Get(key string) (string, bool) { value, ok := n.Meta[key] return value, ok } func (n *treeNode) FileName() (string, bool) { value, ok := n.Meta[FileNameKey] return value, ok } func (n *treeNode) IsSplit() bool { return len(n.ID) != 1 || len(n.ParentID) != 1 || len(n.TimeStamp) != 1 } func (n *treeNode) GetLatestNodeIndex() int { var ( maxTimestamp uint64 index int ) for i, timestamp := range n.TimeStamp { if timestamp > maxTimestamp { maxTimestamp = timestamp index = i } } return index } func newNodeVersion(log *zap.Logger, filePath string, node NodeResponse) (*data.NodeVersion, error) { tNode, err := newTreeNode(node) if err != nil { return nil, fmt.Errorf("invalid tree node: %w", err) 
} return newNodeVersionFromTreeNode(log, filePath, tNode) } func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.NodeVersion, error) { _, isUnversioned := treeNode.Get(isUnversionedKV) _, isDeleteMarker := treeNode.Get(isDeleteMarkerKV) _, isCombined := treeNode.Get(isCombinedKV) eTag, _ := treeNode.Get(etagKV) md5, _ := treeNode.Get(md5KV) if treeNode.IsSplit() { return nil, errors.New("invalid version tree node: this is split node") } version := &data.NodeVersion{ BaseNodeVersion: data.BaseNodeVersion{ ID: treeNode.ID[0], ParenID: treeNode.ParentID[0], OID: treeNode.ObjID, Timestamp: treeNode.TimeStamp[0], ETag: eTag, MD5: md5, Size: treeNode.Size, FilePath: filePath, IsDeleteMarker: isDeleteMarker, }, IsUnversioned: isUnversioned, IsCombined: isCombined, } if createdStr, ok := treeNode.Get(createdKV); ok { if utcMilli, err := strconv.ParseInt(createdStr, 10, 64); err != nil { log.Warn(logs.InvalidTreeKV, zap.String(createdKV, createdStr), zap.Error(err)) } else { created := time.UnixMilli(utcMilli) version.Created = &created } } if ownerStr, ok := treeNode.Get(ownerKV); ok { var owner user.ID if err := owner.DecodeString(ownerStr); err != nil { log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, ownerStr), zap.Error(err)) } else { version.Owner = &owner } } return version, nil } func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.MultipartInfo, error) { uploadID, _ := treeNode.Get(uploadIDKV) if uploadID == "" { return nil, fmt.Errorf("it's not a multipart node: missing UploadId") } if treeNode.IsSplit() { return nil, fmt.Errorf("invalid multipart node '%s': tree node is split", filePath) } multipartInfo := &data.MultipartInfo{ ID: treeNode.ID[0], Key: filePath, UploadID: uploadID, Meta: treeNode.Meta, } if ownerID, ok := treeNode.Get(ownerKV); ok { if err := multipartInfo.Owner.DecodeString(ownerID); err != nil { log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, ownerID), 
zap.Error(err)) } } if created, ok := treeNode.Get(createdKV); ok { if utcMilli, err := strconv.ParseInt(created, 10, 64); err != nil { log.Warn(logs.InvalidTreeKV, zap.String(createdKV, created), zap.Error(err)) } else { multipartInfo.Created = time.UnixMilli(utcMilli) } } if finished, ok := treeNode.Get(finishedKV); ok { if flag, err := strconv.ParseBool(finished); err != nil { log.Warn(logs.InvalidTreeKV, zap.String(finishedKV, finished), zap.Error(err)) } else { multipartInfo.Finished = flag } } return multipartInfo, nil } func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) { if len(node.GetNodeID()) != 1 { return nil, errors.New("invalid multipart node: this is split node") } multipartInfo := &data.MultipartInfo{ ID: node.GetNodeID()[0], Meta: make(map[string]string, len(node.GetMeta())), } for _, kv := range node.GetMeta() { switch kv.GetKey() { case uploadIDKV: multipartInfo.UploadID = string(kv.GetValue()) case FileNameKey: multipartInfo.Key = string(kv.GetValue()) case createdKV: if utcMilli, err := strconv.ParseInt(string(kv.GetValue()), 10, 64); err != nil { log.Warn(logs.InvalidTreeKV, zap.String(createdKV, string(kv.GetValue())), zap.Error(err)) } else { multipartInfo.Created = time.UnixMilli(utcMilli) } case ownerKV: if err := multipartInfo.Owner.DecodeString(string(kv.GetValue())); err != nil { log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, string(kv.GetValue())), zap.Error(err)) } case finishedKV: if isFinished, err := strconv.ParseBool(string(kv.GetValue())); err != nil { log.Warn(logs.InvalidTreeKV, zap.String(finishedKV, string(kv.GetValue())), zap.Error(err)) } else { multipartInfo.Finished = isFinished } default: multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue()) } } if multipartInfo.UploadID == "" { return nil, fmt.Errorf("it's not a multipart node") } return multipartInfo, nil } func newPartInfo(node NodeResponse) (*data.PartInfo, error) { var err error partInfo := &data.PartInfo{} for _, kv := range 
node.GetMeta() { value := string(kv.GetValue()) switch kv.GetKey() { case partNumberKV: if partInfo.Number, err = strconv.Atoi(value); err != nil { return nil, fmt.Errorf("invalid part number: %w", err) } case oidKV: if err = partInfo.OID.DecodeString(value); err != nil { return nil, fmt.Errorf("invalid oid: %w", err) } case etagKV: partInfo.ETag = value case sizeKV: if partInfo.Size, err = strconv.ParseUint(value, 10, 64); err != nil { return nil, fmt.Errorf("invalid part size: %w", err) } case createdKV: var utcMilli int64 if utcMilli, err = strconv.ParseInt(value, 10, 64); err != nil { return nil, fmt.Errorf("invalid created timestamp: %w", err) } partInfo.Created = time.UnixMilli(utcMilli) case md5KV: partInfo.MD5 = value } } if partInfo.Number <= 0 { return nil, fmt.Errorf("it's not a part node") } return partInfo, nil } func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) { node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName}) if err != nil { return nil, fmt.Errorf("couldn't get node: %w", err) } settings := &data.BucketSettings{Versioning: data.VersioningUnversioned} if versioningValue, ok := node.Get(versioningKV); ok { settings.Versioning = versioningValue } if lockConfigurationValue, ok := node.Get(lockConfigurationKV); ok { if settings.LockConfiguration, err = parseLockConfiguration(lockConfigurationValue); err != nil { return nil, fmt.Errorf("settings node: invalid lock configuration: %w", err) } } settings.CannedACL, _ = node.Get(cannedACLKV) if ownerKeyHex, ok := node.Get(ownerKeyKV); ok { if settings.OwnerKey, err = keys.NewPublicKeyFromString(ownerKeyHex); err != nil { c.reqLogger(ctx).Error(logs.SettingsNodeInvalidOwnerKey, zap.Error(err)) } } return settings, nil } func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, settings *data.BucketSettings) error { node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName}) isErrNotFound := 
errors.Is(err, layer.ErrNodeNotFound) if err != nil && !isErrNotFound { return fmt.Errorf("couldn't get node: %w", err) } meta := metaFromSettings(settings) if isErrNotFound { _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta) return err } ind := node.GetLatestNodeIndex() if node.IsSplit() { c.reqLogger(ctx).Warn(logs.FoundSeveralBucketSettingsNodes) } return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta) } func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) { node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}) if err != nil { return oid.ID{}, err } return node.ObjID, nil } func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error) { node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) if err != nil && !isErrNotFound { return oid.ID{}, fmt.Errorf("couldn't get node: %w", err) } meta := make(map[string]string) meta[FileNameKey] = corsFilename meta[oidKV] = objID.EncodeToString() if isErrNotFound { if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil { return oid.ID{}, err } return oid.ID{}, layer.ErrNoNodeToRemove } ind := node.GetLatestNodeIndex() if node.IsSplit() { c.reqLogger(ctx).Warn(logs.FoundSeveralBucketCorsNodes) } return node.ObjID, c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta) } func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) { node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}) if err != nil && !errors.Is(err, layer.ErrNodeNotFound) { return oid.ID{}, err } if node != nil { ind := node.GetLatestNodeIndex() if node.IsSplit() { c.reqLogger(ctx).Warn(logs.FoundSeveralBucketCorsNodes) } return node.ObjID, c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]) } return oid.ID{}, layer.ErrNoNodeToRemove } func (c *Tree) GetObjectTagging(ctx 
context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) { tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV) if err != nil { return nil, err } return getObjectTagging(tagNode), nil } func getObjectTagging(tagNode *treeNode) map[string]string { if tagNode == nil { return nil } meta := make(map[string]string) for key, val := range tagNode.Meta { if strings.HasPrefix(key, userDefinedTagPrefix) { meta[strings.TrimPrefix(key, userDefinedTagPrefix)] = val } } return meta } func (c *Tree) PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion, tagSet map[string]string) error { tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV) if err != nil { return err } treeTagSet := make(map[string]string) treeTagSet[isTagKV] = "true" for key, val := range tagSet { treeTagSet[userDefinedTagPrefix+key] = val } if tagNode == nil { _, err = c.service.AddNode(ctx, bktInfo, versionTree, objVersion.ID, treeTagSet) return err } ind := tagNode.GetLatestNodeIndex() if tagNode.IsSplit() { c.reqLogger(ctx).Warn(logs.FoundSeveralObjectTaggingNodes) } return c.service.MoveNode(ctx, bktInfo, versionTree, tagNode.ID[ind], objVersion.ID, treeTagSet) } func (c *Tree) DeleteObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) error { return c.PutObjectTagging(ctx, bktInfo, objVersion, nil) } func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) { node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename}) if err != nil { return nil, err } tags := make(map[string]string) for key, val := range node.Meta { if strings.HasPrefix(key, userDefinedTagPrefix) { tags[strings.TrimPrefix(key, userDefinedTagPrefix)] = val } } return tags, nil } func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error { node, err := c.getSystemNode(ctx, bktInfo, 
[]string{bucketTaggingFilename}) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) if err != nil && !isErrNotFound { return fmt.Errorf("couldn't get node: %w", err) } treeTagSet := make(map[string]string) treeTagSet[FileNameKey] = bucketTaggingFilename for key, val := range tagSet { treeTagSet[userDefinedTagPrefix+key] = val } if isErrNotFound { _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, treeTagSet) return err } ind := node.GetLatestNodeIndex() if node.IsSplit() { c.reqLogger(ctx).Warn(logs.FoundSeveralBucketTaggingNodes) } return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, treeTagSet) } func (c *Tree) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error { return c.PutBucketTagging(ctx, bktInfo, nil) } func (c *Tree) getTreeNode(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, key string) (*treeNode, error) { nodes, err := c.getTreeNodes(ctx, bktInfo, nodeID, key) if err != nil { return nil, err } // if there will be many allocations, consider having separate // implementations of 'getTreeNode' and 'getTreeNodes' return nodes[key], nil } func (c *Tree) getTreeNodes(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, keys ...string) (map[string]*treeNode, error) { subtree, err := c.service.GetSubTree(ctx, bktInfo, versionTree, []uint64{nodeID}, 2) if err != nil { return nil, err } treeNodes := make(map[string]*treeNode, len(keys)) for _, s := range subtree { node, err := newTreeNode(s) if err != nil { return nil, err } for _, key := range keys { if _, ok := node.Get(key); ok { treeNodes[key] = node break } } if len(treeNodes) == len(keys) { break } } return treeNodes, nil } func (c *Tree) GetVersions(ctx context.Context, bktInfo *data.BucketInfo, filepath string) ([]*data.NodeVersion, error) { return c.getVersions(ctx, bktInfo, versionTree, filepath, false) } func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) { meta 
:= []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV} path := pathFromName(objectName) p := &GetNodesParams{ BktInfo: bktInfo, TreeID: versionTree, Path: path, Meta: meta, LatestOnly: false, AllAttrs: false, } nodes, err := c.service.GetNodes(ctx, p) if err != nil { return nil, err } latestNode, err := getLatestVersionNode(nodes) if err != nil { return nil, err } return newNodeVersion(c.reqLogger(ctx), objectName, latestNode) } func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) { var ( maxCreationTime uint64 targetIndexNode = -1 ) for i, node := range nodes { if !checkExistOID(node.GetMeta()) { continue } if currentCreationTime := getMaxTimestamp(node); currentCreationTime > maxCreationTime { targetIndexNode = i maxCreationTime = currentCreationTime } } if targetIndexNode == -1 { return nil, layer.ErrNodeNotFound } return nodes[targetIndexNode], nil } func getLatestNode(nodes []NodeResponse) NodeResponse { if len(nodes) == 0 { return nil } var ( index int maxTimestamp uint64 ) for i, node := range nodes { if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp { index = i maxTimestamp = timestamp } } return nodes[index] } func getMaxTimestamp(node NodeResponse) uint64 { var maxTimestamp uint64 for _, timestamp := range node.GetTimestamp() { if timestamp > maxTimestamp { maxTimestamp = timestamp } } return maxTimestamp } func checkExistOID(meta []Meta) bool { for _, kv := range meta { if kv.GetKey() == "OID" { return true } } return false } // pathFromName splits name by '/'. 
func pathFromName(objectName string) []string {
	return strings.Split(objectName, separator)
}

// DummySubTreeStream is a SubTreeStream over a single, already known node.
type DummySubTreeStream struct {
	data NodeResponse
	read bool
}

// Next returns the wrapped node on the first call and io.EOF afterwards.
func (s *DummySubTreeStream) Next() (NodeResponse, error) {
	if s.read {
		return nil, io.EOF
	}

	s.read = true
	return s.data, nil
}

// MultiID is a set of node IDs that together identify one logical node.
type MultiID []uint64

// Equal reports whether every element of id is present in m.
// Despite the name this is a containment check: it is not symmetric, and an
// empty id always matches.
func (m MultiID) Equal(id MultiID) bool {
	seen := make(map[uint64]struct{}, len(m))

	for i := range m {
		seen[m[i]] = struct{}{}
	}

	for i := range id {
		if _, ok := seen[id[i]]; !ok {
			return false
		}
	}

	return true
}

// VersionsByPrefixStreamImpl lazily walks object versions under a prefix.
//
// mainStream iterates over the depth-2 subtree of rootID; for every matching
// node an innerStream is opened that yields the node itself (leaf) or its
// whole subtree (intermediate node). namesMap maps node IDs to the file paths
// reconstructed so far and is cleared whenever an inner stream is exhausted.
// In latestOnly mode currentLatest holds the newest version seen for the file
// path currently being processed.
type VersionsByPrefixStreamImpl struct {
	ctx                context.Context
	rootID             MultiID
	intermediateRootID MultiID
	service            ServiceClient
	bktInfo            *data.BucketInfo
	mainStream         SubTreeStream
	innerStream        SubTreeStream
	headPrefix         string
	tailPrefix         string
	namesMap           map[uint64]string
	ended              bool
	latestOnly         bool
	currentLatest      *data.NodeVersion
	log                *zap.Logger
}

// Next returns the next node version, or an error wrapping io.EOF when the
// stream is exhausted. The context argument is ignored; the context captured
// at construction time is used for service calls.
func (s *VersionsByPrefixStreamImpl) Next(context.Context) (*data.NodeVersion, error) {
	if s.ended {
		return nil, io.EOF
	}

	for {
		if s.innerStream == nil {
			node, err := s.getNodeFromMainStream()
			if err != nil {
				if errors.Is(err, io.EOF) {
					s.ended = true
					// Flush the pending latest version before reporting the end.
					if s.currentLatest != nil {
						return s.currentLatest, nil
					}
				}
				return nil, fmt.Errorf("get node from main stream: %w", err)
			}

			if err = s.initInnerStream(node); err != nil {
				return nil, fmt.Errorf("init inner stream: %w", err)
			}
		}

		nodeVersion, err := s.getNodeVersionFromInnerStream()
		if err != nil {
			if errors.Is(err, io.EOF) {
				// Inner stream is drained: drop it and the paths collected for it.
				s.innerStream = nil
				maps.Clear(s.namesMap)
				// Emit the pending latest version unless its ID belongs to the
				// current intermediate root.
				if s.currentLatest != nil && !s.intermediateRootID.Equal([]uint64{s.currentLatest.ID}) {
					return s.currentLatest, nil
				}
				continue
			}
			return nil, fmt.Errorf("inner stream: %w", err)
		}

		return nodeVersion, nil
	}
}

// getNodeFromMainStream returns the next main-stream node that is not the
// root itself and whose file name starts with tailPrefix.
// ErrNodeNotFound from the stream is translated to io.EOF; any other error
// (including the stream's own io.EOF) is returned wrapped.
func (s *VersionsByPrefixStreamImpl) getNodeFromMainStream() (NodeResponse, error) {
	for {
		node, err := s.mainStream.Next()
		if err != nil {
			if errors.Is(err, ErrNodeNotFound) {
				return nil, io.EOF
			}
			return nil, fmt.Errorf("main stream next: %w", err)
		}

		if !s.rootID.Equal(node.GetNodeID()) && strings.HasPrefix(getFilename(node), s.tailPrefix) {
			return node, nil
		}
	}
}

// initInnerStream prepares innerStream for a node taken from the main stream.
// Direct children of the root become the new intermediateRootID. Intermediate
// nodes are expanded into a full-depth subtree stream; any other node is
// wrapped in a single-element DummySubTreeStream.
func (s *VersionsByPrefixStreamImpl) initInnerStream(node NodeResponse) (err error) {
	if s.rootID.Equal(node.GetParentID()) {
		s.intermediateRootID = node.GetNodeID()
	}

	if isIntermediate(node) {
		s.innerStream, err = s.service.GetSubTreeStream(s.ctx, s.bktInfo, versionTree, node.GetNodeID(), maxGetSubTreeDepth)
		if err != nil {
			return fmt.Errorf("get sub tree node from main stream: %w", err)
		}
	} else {
		s.innerStream = &DummySubTreeStream{data: node}
	}

	return nil
}

// getNodeVersionFromInnerStream reads the inner stream until it can return a
// version. Nodes that parseNodeResponse marks as skipped are passed over.
//
// In latestOnly mode versions are buffered in currentLatest: the newest
// version per file path is kept, and the buffered one is returned only when a
// version with a different file path shows up.
func (s *VersionsByPrefixStreamImpl) getNodeVersionFromInnerStream() (*data.NodeVersion, error) {
	for {
		node, err := s.innerStream.Next()
		if err != nil {
			return nil, fmt.Errorf("inner stream: %w", err)
		}

		nodeVersion, skip, err := s.parseNodeResponse(node)
		if err != nil {
			return nil, err
		}
		if skip {
			continue
		}

		if s.latestOnly {
			if s.currentLatest == nil {
				s.currentLatest = nodeVersion
				continue
			}

			if s.currentLatest.FilePath != nodeVersion.FilePath {
				res := s.currentLatest
				s.currentLatest = nodeVersion
				return res, nil
			}

			if s.currentLatest.Timestamp < nodeVersion.Timestamp {
				s.currentLatest = nodeVersion
			}

			continue
		}

		return nodeVersion, nil
	}
}

// parseNodeResponse converts an inner-stream node into a NodeVersion.
//
// skip is true for nodes that cannot be parsed (logged at debug level) and
// for nodes without an object ID. The full file path of every processed node
// is recorded in namesMap so that descendants can resolve their own paths.
func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *data.NodeVersion, skip bool, err error) {
	trNode, fileName, err := parseTreeNode(node)
	if err != nil {
		s.log.Debug(logs.ParseTreeNode, zap.Error(err))
		return nil, true, nil
	}

	var parentPrefix string
	if s.headPrefix != "" { // The root of subTree can also have a parent
		parentPrefix = strings.TrimSuffix(s.headPrefix, separator) + separator // To avoid 'foo//bar'
	}

	var filepath string
	if !s.intermediateRootID.Equal(trNode.ID) {
		if filepath, err = formFilePath(node, fileName, s.namesMap); err != nil {
			return nil, false, fmt.Errorf("invalid node order: %w", err)
		}
	} else {
		// The node is the root of the inner stream: its path comes from the prefix.
		filepath = parentPrefix + fileName
		for _, id := range trNode.ID {
			s.namesMap[id] = filepath
		}
	}

	if trNode.ObjID.Equals(oid.ID{}) { // The node can be intermediate, but we still want to update namesMap
		return nil, true, nil
	}

	nodeVersion, err := newNodeVersionFromTreeNode(s.log, filepath, trNode)
	return nodeVersion, false, err
}

// InitVersionsByPrefixStream creates a stream over the object versions whose
// names start with prefix. When the prefix node cannot be found, an already
// ended stream is returned instead of an error.
func (c *Tree) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) {
	mainStream, tailPrefix, rootID, err := c.getSubTreeByPrefixMainStream(ctx, bktInfo, versionTree, prefix)
	if err != nil {
		if errors.Is(err, io.EOF) {
			return &VersionsByPrefixStreamImpl{ended: true}, nil
		}
		return nil, err
	}

	return &VersionsByPrefixStreamImpl{
		ctx:        ctx,
		namesMap:   map[uint64]string{},
		rootID:     rootID,
		service:    c.service,
		bktInfo:    bktInfo,
		mainStream: mainStream,
		headPrefix: strings.TrimSuffix(prefix, tailPrefix),
		tailPrefix: tailPrefix,
		latestOnly: latestOnly,
		log:        c.reqLogger(ctx),
	}, nil
}

// getSubTreeByPrefixMainStream resolves the node(s) for the directory part of
// prefix and opens a depth-2 subtree stream rooted there. It returns the
// stream, the last path element of prefix and the root IDs.
// layer.ErrNodeNotFound from either step is reported as io.EOF.
func (c *Tree) getSubTreeByPrefixMainStream(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (SubTreeStream, string, []uint64, error) {
	rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
	if err != nil {
		if errors.Is(err, layer.ErrNodeNotFound) {
			return nil, "", nil, io.EOF
		}
		return nil, "", nil, err
	}

	subTree, err := c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2)
	if err != nil {
		if errors.Is(err, layer.ErrNodeNotFound) {
			return nil, "", nil, io.EOF
		}
		return nil, "", nil, err
	}

	return subTree, tailPrefix, rootID, nil
}

// determinePrefixNode splits prefix by separator and returns the IDs of the
// node(s) addressed by all but the last element, together with that last
// element. A prefix without a separator resolves to the tree root (ID 0).
func (c *Tree) determinePrefixNode(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) ([]uint64, string, error) {
	rootID := []uint64{0}
	path := strings.Split(prefix, separator)
	tailPrefix := path[len(path)-1]

	if len(path) > 1 {
		var err error
		rootID, err = c.getPrefixNodeID(ctx, bktInfo, treeID, path[:len(path)-1])
		if err != nil {
			return nil, "", err
		}
	}

	return rootID, tailPrefix, nil
}

// getPrefixNodeID returns the IDs of all intermediate nodes found at
// prefixPath, or layer.ErrNodeNotFound when there are none.
func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, treeID string, prefixPath []string) ([]uint64, error) {
	p := &GetNodesParams{
		BktInfo:    bktInfo,
		TreeID:     treeID,
		Path:       prefixPath,
		LatestOnly: false,
		AllAttrs:   true,
	}
	nodes, err := c.service.GetNodes(ctx, p)
	if err != nil {
		return nil, err
	}

	var intermediateNodes []uint64
	for _, node := range nodes {
		if isIntermediate(node) {
			intermediateNodes = append(intermediateNodes, node.GetNodeID()...)
		}
	}

	if len(intermediateNodes) == 0 {
		return nil, layer.ErrNodeNotFound
	}

	return intermediateNodes, nil
}

// getSubTreeByPrefix returns the direct children of the prefix node whose
// file names start with the last element of prefix, along with the head part
// of the prefix (prefix without that last element).
//
// A missing prefix node or subtree yields an empty result without error.
// Nodes are grouped by file name through a map, so the order of the result is
// not deterministic.
func (c *Tree) getSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string, latestOnly bool) ([]NodeResponse, string, error) {
	rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
	if err != nil {
		if errors.Is(err, layer.ErrNodeNotFound) {
			return nil, "", nil
		}
		return nil, "", err
	}

	subTree, err := c.service.GetSubTree(ctx, bktInfo, treeID, rootID, 2)
	if err != nil {
		if errors.Is(err, layer.ErrNodeNotFound) {
			return nil, "", nil
		}
		return nil, "", err
	}

	nodesMap := make(map[string][]NodeResponse, len(subTree))
	for _, node := range subTree {
		// Skip the root node(s) themselves.
		if MultiID(rootID).Equal(node.GetNodeID()) {
			continue
		}

		fileName := getFilename(node)
		if !strings.HasPrefix(fileName, tailPrefix) {
			continue
		}

		nodes := nodesMap[fileName]

		// Add all nodes if flag latestOnly is false.
		// Add all intermediate nodes
		// and only latest leaf (object) nodes. To do this store and replace last leaf (object) node in nodes[0]
		if len(nodes) == 0 {
			nodes = []NodeResponse{node}
		} else if !latestOnly || isIntermediate(node) {
			nodes = append(nodes, node)
		} else if isIntermediate(nodes[0]) {
			nodes = append([]NodeResponse{node}, nodes...)
		} else if getMaxTimestamp(node) > getMaxTimestamp(nodes[0]) {
			nodes[0] = node
		}

		nodesMap[fileName] = nodes
	}

	result := make([]NodeResponse, 0, len(subTree))
	for _, nodes := range nodesMap {
		result = append(result, nodes...)
	}

	return result, strings.TrimSuffix(prefix, tailPrefix), nil
}

// getFilename returns the value of the node's FileName meta key, or "" when absent.
func getFilename(node NodeResponse) string {
	for _, kv := range node.GetMeta() {
		if kv.GetKey() == FileNameKey {
			return string(kv.GetValue())
		}
	}

	return ""
}

// isIntermediate reports whether the node's only meta attribute is FileName.
func isIntermediate(node NodeResponse) bool {
	if len(node.GetMeta()) != 1 {
		return false
	}

	return node.GetMeta()[0].GetKey() == FileNameKey
}

// formFilePath builds the node's full path from its parent's path in namesMap
// and records the result in namesMap under the node's own ID(s).
// It fails when a parent ID is unknown, i.e. the parent was not processed
// before the child. Node and parent IDs are indexed in parallel.
// NOTE(review): callers in this file validate equal slice lengths through
// newTreeNode before calling; confirm for any other caller.
func formFilePath(node NodeResponse, fileName string, namesMap map[uint64]string) (string, error) {
	var filepath string

	for i, id := range node.GetParentID() {
		parentPath, ok := namesMap[id]
		if !ok {
			return "", fmt.Errorf("couldn't get parent path")
		}

		filepath = parentPath + separator + fileName
		namesMap[node.GetNodeID()[i]] = filepath
	}

	return filepath, nil
}

// parseTreeNode parses a raw node and returns it together with its file name.
// Nodes without a FileName attribute are rejected.
func parseTreeNode(node NodeResponse) (*treeNode, string, error) {
	tNode, err := newTreeNode(node)
	if err != nil { // invalid OID attribute
		return nil, "", err
	}

	fileName, ok := tNode.FileName()
	if !ok {
		return nil, "", fmt.Errorf("doesn't contain FileName")
	}

	return tNode, fileName, nil
}

// formLatestNodeKey builds the "<parentID>.<fileName>" grouping key.
func formLatestNodeKey(parentID uint64, fileName string) string {
	return strconv.FormatUint(parentID, 10) + "." + fileName
}

// GetUnversioned returns the unversioned node of filepath from the version tree.
func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, filepath string) (*data.NodeVersion, error) {
	return c.getUnversioned(ctx, bktInfo, versionTree, filepath)
}

// getUnversioned returns the newest (by timestamp) of the nodes that
// getVersions yields for filepath with its last argument set to true.
// layer.ErrNodeNotFound is returned when there are none; finding more than one
// is logged at debug level.
func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string) (*data.NodeVersion, error) {
	nodes, err := c.getVersions(ctx, bktInfo, treeID, filepath, true)
	if err != nil {
		return nil, err
	}

	if len(nodes) == 0 {
		return nil, layer.ErrNodeNotFound
	}

	if len(nodes) > 1 {
		c.reqLogger(ctx).Debug(logs.FoundMoreThanOneUnversionedNode, zap.String("treeID", treeID), zap.String("filepath", filepath))
	}

	// Newest first.
	sort.Slice(nodes, func(i, j int) bool {
		return nodes[i].Timestamp > nodes[j].Timestamp
	})

	return nodes[0], nil
}

// AddVersion stores a new object version in the version tree and returns the
// ID reported by addVersion.
func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) {
	return c.addVersion(ctx, bktInfo, versionTree, version)
}

// RemoveVersion removes the node with the given ID from the version tree.
func (c *Tree) RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, id uint64) error {
	return c.service.RemoveNode(ctx, bktInfo, versionTree, id)
}

// CreateMultipartUpload adds a multipart upload node to the system tree at the
// path derived from info.Key; the last path element is passed to
// metaFromMultipart as the node's file name.
func (c *Tree) CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error {
	path := pathFromName(info.Key)
	meta := metaFromMultipart(info, path[len(path)-1])
	_, err := c.service.AddNodeByPath(ctx, bktInfo, systemTree, path[:len(path)-1], meta)

	return err
}

// GetMultipartUploadsByPrefix returns the unfinished multipart uploads whose
// keys start with prefix.
func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) {
	subTreeNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, systemTree, prefix, false)
	if err != nil {
		return nil, err
	}

	var result []*data.MultipartInfo
	for _, node := range subTreeNodes {
		multipartUploads, err := c.getSubTreeMultipartUploads(ctx, bktInfo, node.GetNodeID(), headPrefix)
		if err != nil {
			return nil, err
		}
		result = append(result, multipartUploads...)
	}

	return result, nil
}

// getSubTreeMultipartUploads collects the unfinished multipart uploads found
// in the full-depth subtree of nodeID in the system tree.
//
// parentFilePath is the path of the subtree root's parent. The first subtree
// element is treated as the root, and every later node must appear after its
// parent. Nodes that fail to parse, are not multipart nodes or are finished
// are skipped. Results are grouped through a map, so their order is not
// deterministic.
func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.BucketInfo, nodeID []uint64, parentFilePath string) ([]*data.MultipartInfo, error) {
	subTree, err := c.service.GetSubTree(ctx, bktInfo, systemTree, nodeID, maxGetSubTreeDepth)
	if err != nil {
		return nil, err
	}

	var parentPrefix string
	if parentFilePath != "" { // The root of subTree can also have a parent
		parentPrefix = strings.TrimSuffix(parentFilePath, separator) + separator // To avoid 'foo//bar'
	}

	var filepath string
	namesMap := make(map[uint64]string, len(subTree))
	multiparts := make(map[string][]*data.MultipartInfo, len(subTree))

	for i, node := range subTree {
		tNode, fileName, err := parseTreeNode(node)
		if err != nil {
			continue
		}

		if i != 0 {
			if filepath, err = formFilePath(node, fileName, namesMap); err != nil {
				return nil, fmt.Errorf("invalid node order: %w", err)
			}
		} else {
			filepath = parentPrefix + fileName
			for _, id := range tNode.ID {
				namesMap[id] = filepath
			}
		}

		multipartInfo, err := newMultipartInfoFromTreeNode(c.reqLogger(ctx), filepath, tNode)
		if err != nil || multipartInfo.Finished {
			continue
		}

		for _, id := range node.GetParentID() {
			key := formLatestNodeKey(id, fileName)
			multipartInfos, ok := multiparts[key]
			if !ok {
				multipartInfos = []*data.MultipartInfo{multipartInfo}
			} else {
				multipartInfos = append(multipartInfos, multipartInfo)
			}

			multiparts[key] = multipartInfos
		}
	}

	result := make([]*data.MultipartInfo, 0, len(multiparts))
	for _, multipartInfo := range multiparts {
		result = append(result, multipartInfo...)
	}

	return result, nil
}

// GetMultipartUpload returns the unfinished multipart upload of objectName
// with the given uploadID. layer.ErrNodeNotFound is returned when no such
// upload exists or when the matching upload is already finished.
func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) {
	path := pathFromName(objectName)
	p := &GetNodesParams{
		BktInfo:  bktInfo,
		TreeID:   systemTree,
		Path:     path,
		AllAttrs: true,
	}

	nodes, err := c.service.GetNodes(ctx, p)
	if err != nil {
		return nil, err
	}

	log := c.reqLogger(ctx)
	for _, node := range nodes {
		info, err := newMultipartInfo(log, node)
		if err != nil {
			continue
		}
		if info.UploadID == uploadID {
			if info.Finished {
				break
			}
			return info, nil
		}
	}

	return nil, layer.ErrNodeNotFound
}

// AddPart stores part info under the multipart upload node.
//
// If a part with the same number already exists, its node is overwritten and
// the object ID it previously referenced is returned so the caller can delete
// that object. Otherwise a new node is added and layer.ErrNoNodeToRemove is
// returned to signal that there is nothing to clean up. Child nodes with
// several IDs or unparsable part info are logged and skipped.
func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) {
	parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2)
	if err != nil {
		return oid.ID{}, err
	}

	meta := map[string]string{
		partNumberKV: strconv.Itoa(info.Number),
		oidKV:        info.OID.EncodeToString(),
		sizeKV:       strconv.FormatUint(info.Size, 10),
		createdKV:    strconv.FormatInt(info.Created.UTC().UnixMilli(), 10),
		etagKV:       info.ETag,
		md5KV:        info.MD5,
	}

	for _, part := range parts {
		if len(part.GetNodeID()) != 1 {
			// multipart parts nodeID shouldn't have multiple values
			c.reqLogger(ctx).Warn(logs.UnexpectedMultiNodeIDsInSubTreeMultiParts,
				zap.String("key", info.Key),
				zap.String("upload id", info.UploadID),
				zap.Uint64("multipart node id ", multipartNodeID),
				zap.Uint64s("node ids", part.GetNodeID()))
			continue
		}
		nodeID := part.GetNodeID()[0]
		// The subtree includes the multipart node itself; skip it.
		if nodeID == multipartNodeID {
			continue
		}
		partInfo, err := newPartInfo(part)
		if err != nil {
			c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
				zap.String("key", info.Key),
				zap.String("upload id", info.UploadID),
				zap.Uint64("multipart node id ", multipartNodeID),
				zap.Error(err))
			continue
		}
		if partInfo.Number == info.Number {
			return partInfo.OID, c.service.MoveNode(ctx, bktInfo, systemTree, nodeID, multipartNodeID, meta)
		}
	}

	if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil {
		return oid.ID{}, err
	}

	return oid.ID{}, layer.ErrNoNodeToRemove
}

// GetParts returns the parts stored under the multipart upload node.
// Child nodes with several IDs or unparsable part info are logged and skipped.
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) {
	parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2)
	if err != nil {
		return nil, err
	}

	result := make([]*data.PartInfo, 0, len(parts))
	for _, part := range parts {
		if len(part.GetNodeID()) != 1 {
			// multipart parts nodeID shouldn't have multiple values
			c.reqLogger(ctx).Warn(logs.UnexpectedMultiNodeIDsInSubTreeMultiParts,
				zap.Uint64("multipart node id ", multipartNodeID),
				zap.Uint64s("node ids", part.GetNodeID()))
			continue
		}
		// The subtree includes the multipart node itself; skip it.
		if part.GetNodeID()[0] == multipartNodeID {
			continue
		}
		partInfo, err := newPartInfo(part)
		if err != nil {
			c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
				zap.Uint64("multipart node id ", multipartNodeID),
				zap.Uint64s("node ids", part.GetNodeID()),
				zap.Error(err))
			continue
		}
		result = append(result, partInfo)
	}

	return result, nil
}

// DeleteMultipartUpload removes the multipart upload node and then re-creates
// it with Finished set to true. Note that the passed multipartInfo is
// modified in place.
func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
	err := c.service.RemoveNode(ctx, bktInfo, systemTree, multipartInfo.ID)
	if err != nil {
		return err
	}

	multipartInfo.Finished = true

	return c.CreateMultipartUpload(ctx, bktInfo, multipartInfo)
}

// PutLock stores lock info as a child of the version node nodeID.
// A lock with zero ID is added as a new node; otherwise the existing lock
// node is overwritten.
func (c *Tree) PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error {
	meta := map[string]string{isLockKV: "true"}

	if lock.IsLegalHoldSet() {
		meta[legalHoldOIDKV] = lock.LegalHold().EncodeToString()
	}
	if lock.IsRetentionSet() {
		meta[retentionOIDKV] = lock.Retention().EncodeToString()
		meta[untilDateKV] = lock.UntilDate()
		if lock.IsCompliance() {
			meta[isComplianceKV] = "true"
		}
	}

	if lock.ID() == 0 {
		_, err := c.service.AddNode(ctx, bktInfo, versionTree, nodeID, meta)
		return err
	}

	return c.service.MoveNode(ctx, bktInfo, versionTree, lock.ID(), nodeID, meta)
}

// GetLock returns the lock info attached to the version node nodeID.
// An empty LockInfo is returned when the version has no lock node.
func (c *Tree) GetLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) (*data.LockInfo, error) {
	lockNode, err := c.getTreeNode(ctx, bktInfo, nodeID, isLockKV)
	if err != nil {
		return nil, err
	}

	return getLock(lockNode)
}

// getLock converts a lock node into data.LockInfo.
// A nil node yields an empty LockInfo; split nodes and undecodable legal hold
// or retention object IDs are errors. Compliance mode is derived from the
// presence of the IsCompliance key.
func getLock(lockNode *treeNode) (*data.LockInfo, error) {
	if lockNode == nil {
		return &data.LockInfo{}, nil
	}

	if lockNode.IsSplit() {
		return nil, errors.New("invalid lock node: this is split node")
	}

	lockInfo := data.NewLockInfo(lockNode.ID[0])

	if legalHold, ok := lockNode.Get(legalHoldOIDKV); ok {
		var legalHoldOID oid.ID
		if err := legalHoldOID.DecodeString(legalHold); err != nil {
			return nil, fmt.Errorf("invalid legal hold object id: %w", err)
		}
		lockInfo.SetLegalHold(legalHoldOID)
	}

	if retention, ok := lockNode.Get(retentionOIDKV); ok {
		var retentionOID oid.ID
		if err := retentionOID.DecodeString(retention); err != nil {
			return nil, fmt.Errorf("invalid retention object id: %w", err)
		}
		_, isCompliance := lockNode.Get(isComplianceKV)
		untilDate, _ := lockNode.Get(untilDateKV)
		lockInfo.SetRetention(retentionOID, untilDate, isCompliance)
	}

	return lockInfo, nil
}

// GetObjectTaggingAndLock returns both the user tags and the lock info of an
// object version using a single subtree request.
func (c *Tree) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) {
	nodes, err := c.getTreeNodes(ctx, bktInfo, objVersion.ID, isTagKV, isLockKV)
	if err != nil {
		return nil, nil, err
	}

	lockInfo, err := getLock(nodes[isLockKV])
	if err != nil {
		return nil, nil, err
	}

	return getObjectTagging(nodes[isTagKV]), lockInfo, nil
}

func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, error) {
	path := pathFromName(version.FilePath)
	meta := map[string]string{
		oidKV:       version.OID.EncodeToString(),
		FileNameKey: path[len(path)-1],
		ownerKV:     version.Owner.EncodeToString(),
		createdKV:   strconv.FormatInt(version.Created.UTC().UnixMilli(), 10),
	}

	if version.Size > 0 {
		meta[sizeKV] = strconv.FormatUint(version.Size, 10)
	}
	if len(version.ETag) > 0 {
		meta[etagKV] =
version.ETag } if len(version.MD5) > 0 { meta[md5KV] = version.MD5 } if version.IsDeleteMarker { meta[isDeleteMarkerKV] = "true" } if version.IsCombined { meta[isCombinedKV] = "true" } if version.IsUnversioned { meta[isUnversionedKV] = "true" node, err := c.getUnversioned(ctx, bktInfo, treeID, version.FilePath) if err == nil { if err = c.service.MoveNode(ctx, bktInfo, treeID, node.ID, node.ParenID, meta); err != nil { return 0, err } return node.ID, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, node.ID) } if !errors.Is(err, layer.ErrNodeNotFound) { return 0, err } } return c.service.AddNodeByPath(ctx, bktInfo, treeID, path[:len(path)-1], meta) } func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error { taggingNode, err := c.getTreeNode(ctx, bktInfo, nodeID, isTagKV) if err != nil { return err } if taggingNode != nil { return c.service.RemoveNode(ctx, bktInfo, treeID, taggingNode.ID[0]) } return nil } func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) { keysToReturn := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV} path := pathFromName(filepath) p := &GetNodesParams{ BktInfo: bktInfo, TreeID: treeID, Path: path, Meta: keysToReturn, LatestOnly: false, AllAttrs: false, } nodes, err := c.service.GetNodes(ctx, p) if err != nil { if errors.Is(err, layer.ErrNodeNotFound) { return nil, nil } return nil, err } log := c.reqLogger(ctx) result := make([]*data.NodeVersion, 0, len(nodes)) for _, node := range nodes { nodeVersion, err := newNodeVersion(log, filepath, node) if err != nil { return nil, err } if onlyUnversioned && !nodeVersion.IsUnversioned { continue } result = append(result, nodeVersion) } return result, nil } func metaFromSettings(settings *data.BucketSettings) map[string]string { results := make(map[string]string, 3) results[FileNameKey] = settingsFileName 
results[versioningKV] = settings.Versioning results[lockConfigurationKV] = encodeLockConfiguration(settings.LockConfiguration) results[cannedACLKV] = settings.CannedACL if settings.OwnerKey != nil { results[ownerKeyKV] = hex.EncodeToString(settings.OwnerKey.Bytes()) } return results } func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]string { info.Meta[FileNameKey] = fileName info.Meta[uploadIDKV] = info.UploadID info.Meta[ownerKV] = info.Owner.EncodeToString() info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10) if info.Finished { info.Meta[finishedKV] = strconv.FormatBool(info.Finished) } return info.Meta } func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, path []string) (*treeNode, error) { p := &GetNodesParams{ BktInfo: bktInfo, TreeID: systemTree, Path: path, LatestOnly: false, AllAttrs: true, } nodes, err := c.service.GetNodes(ctx, p) if err != nil { return nil, err } nodes = filterMultipartNodes(nodes) if len(nodes) == 0 { return nil, layer.ErrNodeNotFound } if len(nodes) != 1 { c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("path", strings.Join(path, "/"))) } return newTreeNode(getLatestNode(nodes)) } func filterMultipartNodes(nodes []NodeResponse) []NodeResponse { res := make([]NodeResponse, 0, len(nodes)) LOOP: for _, node := range nodes { for _, meta := range node.GetMeta() { if meta.GetKey() == uploadIDKV { continue LOOP } } res = append(res, node) } return res } func (c *Tree) reqLogger(ctx context.Context) *zap.Logger { reqLogger := middleware.GetReqLog(ctx) if reqLogger != nil { return reqLogger } return c.log } func parseLockConfiguration(value string) (*data.ObjectLockConfiguration, error) { result := &data.ObjectLockConfiguration{} if len(value) == 0 { return result, nil } lockValues := strings.Split(value, ",") result.ObjectLockEnabled = lockValues[0] if len(lockValues) == 1 { return result, nil } if len(lockValues) != 4 { return nil, fmt.Errorf("invalid 
lock configuration: %s", value) } var err error var days, years int64 if len(lockValues[1]) > 0 { if days, err = strconv.ParseInt(lockValues[1], 10, 64); err != nil { return nil, fmt.Errorf("invalid lock configuration: %s", value) } } if len(lockValues[3]) > 0 { if years, err = strconv.ParseInt(lockValues[3], 10, 64); err != nil { return nil, fmt.Errorf("invalid lock configuration: %s", value) } } result.Rule = &data.ObjectLockRule{ DefaultRetention: &data.DefaultRetention{ Days: days, Mode: lockValues[2], Years: years, }, } return result, nil } func encodeLockConfiguration(conf *data.ObjectLockConfiguration) string { if conf == nil { return "" } if conf.Rule == nil || conf.Rule.DefaultRetention == nil { return conf.ObjectLockEnabled } defaults := conf.Rule.DefaultRetention return fmt.Sprintf("%s,%d,%s,%d", conf.ObjectLockEnabled, defaults.Days, defaults.Mode, defaults.Years) }