package tree

import (
	"context"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"sort"
	"strconv"
	"strings"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"go.uber.org/zap"
	"golang.org/x/exp/maps"
)

type (
	Tree struct {
		service ServiceClient
		log     *zap.Logger
	}

	// ServiceClient is a client to interact with tree service.
	// Each method must return ErrNodeNotFound or ErrNodeAccessDenied if relevant.
	ServiceClient interface {
		GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error)
		GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) ([]NodeResponse, error)
		GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) (SubTreeStream, error)
		AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error)
		AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error)
		MoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID, parentID uint64, meta map[string]string) error
		RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error
	}

	SubTreeStream interface {
		Next() (NodeResponse, error)
	}

	treeNode struct {
		ID        []uint64
		ParentID  []uint64
		ObjID     oid.ID
		TimeStamp []uint64
		Size      uint64
		Meta      map[string]string
	}

	GetNodesParams struct {
		BktInfo    *data.BucketInfo
		TreeID     string
		Path       []string
		Meta       []string
		LatestOnly bool
		AllAttrs   bool
	}
)

const (
	FileNameKey = "FileName"
)

var (
	// ErrNodeNotFound is returned from ServiceClient in case of not found error.
	ErrNodeNotFound = layer.ErrNodeNotFound

	// ErrNodeAccessDenied is returned from ServiceClient service in case of access denied error.
	ErrNodeAccessDenied = layer.ErrNodeAccessDenied

	// ErrGatewayTimeout is returned from ServiceClient service in case of timeout error.
	ErrGatewayTimeout = layer.ErrGatewayTimeout
)

const (
	versioningKV        = "Versioning"
	cannedACLKV         = "cannedACL"
	ownerKeyKV          = "ownerKey"
	lockConfigurationKV = "LockConfiguration"
	oidKV               = "OID"

	isCombinedKV    = "IsCombined"
	isUnversionedKV = "IsUnversioned"
	isTagKV         = "IsTag"
	uploadIDKV      = "UploadId"
	partNumberKV    = "Number"
	sizeKV          = "Size"
	etagKV          = "ETag"
	md5KV           = "MD5"
	finishedKV      = "Finished"

	// keys for lock.
	isLockKV       = "IsLock"
	legalHoldOIDKV = "LegalHoldOID"
	retentionOIDKV = "RetentionOID"
	untilDateKV    = "UntilDate"
	isComplianceKV = "IsCompliance"

	// keys for delete marker nodes.
	isDeleteMarkerKV = "IsDeleteMarker"
	ownerKV          = "Owner"
	createdKV        = "Created"

	settingsFileName      = "bucket-settings"
	notifConfFileName     = "bucket-notifications"
	corsFilename          = "bucket-cors"
	bucketTaggingFilename = "bucket-tagging"

	// versionTree -- ID of a tree with object versions.
	versionTree = "version"

	// systemTree -- ID of a tree with system objects
	// i.e. bucket settings with versioning and lock configuration, cors, notifications.
	systemTree = "system"

	separator            = "/"
	userDefinedTagPrefix = "User-Tag-"

	maxGetSubTreeDepth = 0 // means all subTree
)

// NewTree creates instance of Tree using provided address and create grpc connection.
func NewTree(service ServiceClient, log *zap.Logger) *Tree {
	return &Tree{
		service: service,
		log:     log,
	}
}

type Meta interface {
	GetKey() string
	GetValue() []byte
}

type NodeResponse interface {
	GetMeta() []Meta
	GetNodeID() []uint64
	GetParentID() []uint64
	GetTimestamp() []uint64
}

func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
	tNode := &treeNode{
		ID:        nodeInfo.GetNodeID(),
		ParentID:  nodeInfo.GetParentID(),
		TimeStamp: nodeInfo.GetTimestamp(),
		Meta:      make(map[string]string, len(nodeInfo.GetMeta())),
	}

	if len(tNode.ID) == 0 || len(tNode.ParentID) == 0 || len(tNode.TimeStamp) == 0 {
		return nil, errors.New("invalid tree node: missing id")
	}

	if len(tNode.ID) != len(tNode.ParentID) || len(tNode.ID) != len(tNode.TimeStamp) {
		return nil, errors.New("invalid tree node: length multiple ids mismatch")
	}

	for _, kv := range nodeInfo.GetMeta() {
		switch kv.GetKey() {
		case oidKV:
			if err := tNode.ObjID.DecodeString(string(kv.GetValue())); err != nil {
				return nil, err
			}
		case sizeKV:
			if sizeStr := string(kv.GetValue()); len(sizeStr) > 0 {
				var err error
				if tNode.Size, err = strconv.ParseUint(sizeStr, 10, 64); err != nil {
					return nil, fmt.Errorf("invalid size value '%s': %w", sizeStr, err)
				}
			}
		default:
			tNode.Meta[kv.GetKey()] = string(kv.GetValue())
		}
	}

	return tNode, nil
}

func (n *treeNode) Get(key string) (string, bool) {
	value, ok := n.Meta[key]
	return value, ok
}

func (n *treeNode) FileName() (string, bool) {
	value, ok := n.Meta[FileNameKey]
	return value, ok
}

func (n *treeNode) IsSplit() bool {
	return len(n.ID) != 1 || len(n.ParentID) != 1 || len(n.TimeStamp) != 1
}

func (n *treeNode) GetLatestNodeIndex() int {
	var (
		maxTimestamp uint64
		index        int
	)

	for i, timestamp := range n.TimeStamp {
		if timestamp > maxTimestamp {
			maxTimestamp = timestamp
			index = i
		}
	}

	return index
}

func newNodeVersion(log *zap.Logger, filePath string, node NodeResponse) (*data.NodeVersion, error) {
	tNode, err := newTreeNode(node)
	if err != nil {
		return nil, fmt.Errorf("invalid tree node: %w", err)
	}

	return newNodeVersionFromTreeNode(log, filePath, tNode)
}

func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.NodeVersion, error) {
	_, isUnversioned := treeNode.Get(isUnversionedKV)
	_, isDeleteMarker := treeNode.Get(isDeleteMarkerKV)
	_, isCombined := treeNode.Get(isCombinedKV)
	eTag, _ := treeNode.Get(etagKV)
	md5, _ := treeNode.Get(md5KV)

	if treeNode.IsSplit() {
		return nil, errors.New("invalid version tree node: this is split node")
	}

	version := &data.NodeVersion{
		BaseNodeVersion: data.BaseNodeVersion{
			ID:             treeNode.ID[0],
			ParenID:        treeNode.ParentID[0],
			OID:            treeNode.ObjID,
			Timestamp:      treeNode.TimeStamp[0],
			ETag:           eTag,
			MD5:            md5,
			Size:           treeNode.Size,
			FilePath:       filePath,
			IsDeleteMarker: isDeleteMarker,
		},
		IsUnversioned: isUnversioned,
		IsCombined:    isCombined,
	}

	if createdStr, ok := treeNode.Get(createdKV); ok {
		if utcMilli, err := strconv.ParseInt(createdStr, 10, 64); err != nil {
			log.Warn(logs.InvalidTreeKV, zap.String(createdKV, createdStr), zap.Error(err))
		} else {
			created := time.UnixMilli(utcMilli)
			version.Created = &created
		}
	}

	if ownerStr, ok := treeNode.Get(ownerKV); ok {
		var owner user.ID
		if err := owner.DecodeString(ownerStr); err != nil {
			log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, ownerStr), zap.Error(err))
		} else {
			version.Owner = &owner
		}
	}

	return version, nil
}

func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.MultipartInfo, error) {
	uploadID, _ := treeNode.Get(uploadIDKV)
	if uploadID == "" {
		return nil, fmt.Errorf("it's not a multipart node: missing UploadId")
	}

	if treeNode.IsSplit() {
		return nil, fmt.Errorf("invalid multipart node '%s': tree node is split", filePath)
	}

	multipartInfo := &data.MultipartInfo{
		ID:       treeNode.ID[0],
		Key:      filePath,
		UploadID: uploadID,
		Meta:     treeNode.Meta,
	}

	if ownerID, ok := treeNode.Get(ownerKV); ok {
		if err := multipartInfo.Owner.DecodeString(ownerID); err != nil {
			log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, ownerID), zap.Error(err))
		}
	}

	if created, ok := treeNode.Get(createdKV); ok {
		if utcMilli, err := strconv.ParseInt(created, 10, 64); err != nil {
			log.Warn(logs.InvalidTreeKV, zap.String(createdKV, created), zap.Error(err))
		} else {
			multipartInfo.Created = time.UnixMilli(utcMilli)
		}
	}

	if finished, ok := treeNode.Get(finishedKV); ok {
		if flag, err := strconv.ParseBool(finished); err != nil {
			log.Warn(logs.InvalidTreeKV, zap.String(finishedKV, finished), zap.Error(err))
		} else {
			multipartInfo.Finished = flag
		}
	}

	return multipartInfo, nil
}

func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) {
	if len(node.GetNodeID()) != 1 {
		return nil, errors.New("invalid multipart node: this is split node")
	}

	multipartInfo := &data.MultipartInfo{
		ID:   node.GetNodeID()[0],
		Meta: make(map[string]string, len(node.GetMeta())),
	}

	for _, kv := range node.GetMeta() {
		switch kv.GetKey() {
		case uploadIDKV:
			multipartInfo.UploadID = string(kv.GetValue())
		case FileNameKey:
			multipartInfo.Key = string(kv.GetValue())
		case createdKV:
			if utcMilli, err := strconv.ParseInt(string(kv.GetValue()), 10, 64); err != nil {
				log.Warn(logs.InvalidTreeKV, zap.String(createdKV, string(kv.GetValue())), zap.Error(err))
			} else {
				multipartInfo.Created = time.UnixMilli(utcMilli)
			}
		case ownerKV:
			if err := multipartInfo.Owner.DecodeString(string(kv.GetValue())); err != nil {
				log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, string(kv.GetValue())), zap.Error(err))
			}
		case finishedKV:
			if isFinished, err := strconv.ParseBool(string(kv.GetValue())); err != nil {
				log.Warn(logs.InvalidTreeKV, zap.String(finishedKV, string(kv.GetValue())), zap.Error(err))
			} else {
				multipartInfo.Finished = isFinished
			}
		default:
			multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue())
		}
	}

	if multipartInfo.UploadID == "" {
		return nil, fmt.Errorf("it's not a multipart node")
	}

	return multipartInfo, nil
}

func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
	var err error
	partInfo := &data.PartInfo{}

	for _, kv := range node.GetMeta() {
		value := string(kv.GetValue())
		switch kv.GetKey() {
		case partNumberKV:
			if partInfo.Number, err = strconv.Atoi(value); err != nil {
				return nil, fmt.Errorf("invalid part number: %w", err)
			}
		case oidKV:
			if err = partInfo.OID.DecodeString(value); err != nil {
				return nil, fmt.Errorf("invalid oid: %w", err)
			}
		case etagKV:
			partInfo.ETag = value
		case sizeKV:
			if partInfo.Size, err = strconv.ParseUint(value, 10, 64); err != nil {
				return nil, fmt.Errorf("invalid part size: %w", err)
			}
		case createdKV:
			var utcMilli int64
			if utcMilli, err = strconv.ParseInt(value, 10, 64); err != nil {
				return nil, fmt.Errorf("invalid created timestamp: %w", err)
			}
			partInfo.Created = time.UnixMilli(utcMilli)
		case md5KV:
			partInfo.MD5 = value
		}
	}

	if partInfo.Number <= 0 {
		return nil, fmt.Errorf("it's not a part node")
	}

	return partInfo, nil
}

func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
	node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName})
	if err != nil {
		return nil, fmt.Errorf("couldn't get node: %w", err)
	}

	settings := &data.BucketSettings{Versioning: data.VersioningUnversioned}
	if versioningValue, ok := node.Get(versioningKV); ok {
		settings.Versioning = versioningValue
	}

	if lockConfigurationValue, ok := node.Get(lockConfigurationKV); ok {
		if settings.LockConfiguration, err = parseLockConfiguration(lockConfigurationValue); err != nil {
			return nil, fmt.Errorf("settings node: invalid lock configuration: %w", err)
		}
	}

	settings.CannedACL, _ = node.Get(cannedACLKV)

	if ownerKeyHex, ok := node.Get(ownerKeyKV); ok {
		if settings.OwnerKey, err = keys.NewPublicKeyFromString(ownerKeyHex); err != nil {
			c.reqLogger(ctx).Error(logs.SettingsNodeInvalidOwnerKey, zap.Error(err))
		}
	}

	return settings, nil
}

func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, settings *data.BucketSettings) error {
	node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName})
	isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
	if err != nil && !isErrNotFound {
		return fmt.Errorf("couldn't get node: %w", err)
	}

	meta := metaFromSettings(settings)

	if isErrNotFound {
		_, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta)
		return err
	}

	ind := node.GetLatestNodeIndex()
	if node.IsSplit() {
		c.reqLogger(ctx).Warn(logs.FoundSeveralBucketSettingsNodes)
	}

	return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta)
}

func (c *Tree) GetNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
	node, err := c.getSystemNode(ctx, bktInfo, []string{notifConfFileName})
	if err != nil {
		return oid.ID{}, err
	}

	return node.ObjID, nil
}

func (c *Tree) PutNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error) {
	node, err := c.getSystemNode(ctx, bktInfo, []string{notifConfFileName})
	isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
	if err != nil && !isErrNotFound {
		return oid.ID{}, fmt.Errorf("couldn't get node: %w", err)
	}

	meta := make(map[string]string)
	meta[FileNameKey] = notifConfFileName
	meta[oidKV] = objID.EncodeToString()

	if isErrNotFound {
		if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
			return oid.ID{}, err
		}
		return oid.ID{}, layer.ErrNoNodeToRemove
	}

	ind := node.GetLatestNodeIndex()
	if node.IsSplit() {
		c.reqLogger(ctx).Warn(logs.FoundSeveralNotificationConfigurationNodes)
	}

	return node.ObjID, c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta)
}

func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
	node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename})
	if err != nil {
		return oid.ID{}, err
	}

	return node.ObjID, nil
}

func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error) {
	node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename})
	isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
	if err != nil && !isErrNotFound {
		return oid.ID{}, fmt.Errorf("couldn't get node: %w", err)
	}

	meta := make(map[string]string)
	meta[FileNameKey] = corsFilename
	meta[oidKV] = objID.EncodeToString()

	if isErrNotFound {
		if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
			return oid.ID{}, err
		}
		return oid.ID{}, layer.ErrNoNodeToRemove
	}

	ind := node.GetLatestNodeIndex()
	if node.IsSplit() {
		c.reqLogger(ctx).Warn(logs.FoundSeveralBucketCorsNodes)
	}

	return node.ObjID, c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta)
}

func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
	node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename})
	if err != nil && !errors.Is(err, layer.ErrNodeNotFound) {
		return oid.ID{}, err
	}

	if node != nil {
		ind := node.GetLatestNodeIndex()
		if node.IsSplit() {
			c.reqLogger(ctx).Warn(logs.FoundSeveralBucketCorsNodes)
		}

		return node.ObjID, c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind])
	}

	return oid.ID{}, layer.ErrNoNodeToRemove
}

func (c *Tree) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) {
	tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV)
	if err != nil {
		return nil, err
	}

	return getObjectTagging(tagNode), nil
}

func getObjectTagging(tagNode *treeNode) map[string]string {
	if tagNode == nil {
		return nil
	}

	meta := make(map[string]string)

	for key, val := range tagNode.Meta {
		if strings.HasPrefix(key, userDefinedTagPrefix) {
			meta[strings.TrimPrefix(key, userDefinedTagPrefix)] = val
		}
	}

	return meta
}

func (c *Tree) PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion, tagSet map[string]string) error {
	tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV)
	if err != nil {
		return err
	}

	treeTagSet := make(map[string]string)
	treeTagSet[isTagKV] = "true"

	for key, val := range tagSet {
		treeTagSet[userDefinedTagPrefix+key] = val
	}

	if tagNode == nil {
		_, err = c.service.AddNode(ctx, bktInfo, versionTree, objVersion.ID, treeTagSet)
		return err
	}

	ind := tagNode.GetLatestNodeIndex()
	if tagNode.IsSplit() {
		c.reqLogger(ctx).Warn(logs.FoundSeveralObjectTaggingNodes)
	}

	return c.service.MoveNode(ctx, bktInfo, versionTree, tagNode.ID[ind], objVersion.ID, treeTagSet)
}

func (c *Tree) DeleteObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) error {
	return c.PutObjectTagging(ctx, bktInfo, objVersion, nil)
}

func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
	node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename})
	if err != nil {
		return nil, err
	}

	tags := make(map[string]string)

	for key, val := range node.Meta {
		if strings.HasPrefix(key, userDefinedTagPrefix) {
			tags[strings.TrimPrefix(key, userDefinedTagPrefix)] = val
		}
	}

	return tags, nil
}

func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error {
	node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename})
	isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
	if err != nil && !isErrNotFound {
		return fmt.Errorf("couldn't get node: %w", err)
	}

	treeTagSet := make(map[string]string)
	treeTagSet[FileNameKey] = bucketTaggingFilename

	for key, val := range tagSet {
		treeTagSet[userDefinedTagPrefix+key] = val
	}

	if isErrNotFound {
		_, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, treeTagSet)
		return err
	}

	ind := node.GetLatestNodeIndex()
	if node.IsSplit() {
		c.reqLogger(ctx).Warn(logs.FoundSeveralBucketTaggingNodes)
	}

	return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, treeTagSet)
}

func (c *Tree) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error {
	return c.PutBucketTagging(ctx, bktInfo, nil)
}

func (c *Tree) getTreeNode(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, key string) (*treeNode, error) {
	nodes, err := c.getTreeNodes(ctx, bktInfo, nodeID, key)
	if err != nil {
		return nil, err
	}
	// if there will be many allocations, consider having separate
	// implementations of 'getTreeNode' and 'getTreeNodes'
	return nodes[key], nil
}

func (c *Tree) getTreeNodes(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, keys ...string) (map[string]*treeNode, error) {
	subtree, err := c.service.GetSubTree(ctx, bktInfo, versionTree, []uint64{nodeID}, 2)
	if err != nil {
		return nil, err
	}

	treeNodes := make(map[string]*treeNode, len(keys))

	for _, s := range subtree {
		node, err := newTreeNode(s)
		if err != nil {
			return nil, err
		}
		for _, key := range keys {
			if _, ok := node.Get(key); ok {
				treeNodes[key] = node
				break
			}
		}
		if len(treeNodes) == len(keys) {
			break
		}
	}

	return treeNodes, nil
}

func (c *Tree) GetVersions(ctx context.Context, bktInfo *data.BucketInfo, filepath string) ([]*data.NodeVersion, error) {
	return c.getVersions(ctx, bktInfo, versionTree, filepath, false)
}

func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {
	meta := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV}
	path := pathFromName(objectName)

	p := &GetNodesParams{
		BktInfo:    bktInfo,
		TreeID:     versionTree,
		Path:       path,
		Meta:       meta,
		LatestOnly: false,
		AllAttrs:   false,
	}
	nodes, err := c.service.GetNodes(ctx, p)
	if err != nil {
		return nil, err
	}

	latestNode, err := getLatestVersionNode(nodes)
	if err != nil {
		return nil, err
	}

	return newNodeVersion(c.reqLogger(ctx), objectName, latestNode)
}

func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) {
	var (
		maxCreationTime uint64
		targetIndexNode = -1
	)

	for i, node := range nodes {
		if !checkExistOID(node.GetMeta()) {
			continue
		}

		if currentCreationTime := getMaxTimestamp(node); currentCreationTime > maxCreationTime {
			targetIndexNode = i
			maxCreationTime = currentCreationTime
		}
	}

	if targetIndexNode == -1 {
		return nil, layer.ErrNodeNotFound
	}

	return nodes[targetIndexNode], nil
}

func getLatestNode(nodes []NodeResponse) NodeResponse {
	if len(nodes) == 0 {
		return nil
	}

	var (
		index        int
		maxTimestamp uint64
	)

	for i, node := range nodes {
		if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp {
			index = i
			maxTimestamp = timestamp
		}
	}

	return nodes[index]
}

func getMaxTimestamp(node NodeResponse) uint64 {
	var maxTimestamp uint64

	for _, timestamp := range node.GetTimestamp() {
		if timestamp > maxTimestamp {
			maxTimestamp = timestamp
		}
	}

	return maxTimestamp
}

func checkExistOID(meta []Meta) bool {
	for _, kv := range meta {
		if kv.GetKey() == "OID" {
			return true
		}
	}

	return false
}

// pathFromName splits name by '/'.
func pathFromName(objectName string) []string {
	return strings.Split(objectName, separator)
}

type DummySubTreeStream struct {
	data NodeResponse
	read bool
}

func (s *DummySubTreeStream) Next() (NodeResponse, error) {
	if s.read {
		return nil, io.EOF
	}

	s.read = true
	return s.data, nil
}

type MultiID []uint64

func (m MultiID) Equal(id MultiID) bool {
	seen := make(map[uint64]struct{}, len(m))

	for i := range m {
		seen[m[i]] = struct{}{}
	}

	for i := range id {
		if _, ok := seen[id[i]]; !ok {
			return false
		}
	}

	return true
}

type VersionsByPrefixStreamImpl struct {
	ctx                context.Context
	rootID             MultiID
	intermediateRootID MultiID
	service            ServiceClient
	bktInfo            *data.BucketInfo
	mainStream         SubTreeStream
	innerStream        SubTreeStream
	headPrefix         string
	tailPrefix         string
	namesMap           map[uint64]string
	ended              bool
	latestOnly         bool
	currentLatest      *data.NodeVersion
	log                *zap.Logger
}

func (s *VersionsByPrefixStreamImpl) Next(context.Context) (*data.NodeVersion, error) {
	if s.ended {
		return nil, io.EOF
	}

	for {
		if s.innerStream == nil {
			node, err := s.getNodeFromMainStream()
			if err != nil {
				if errors.Is(err, io.EOF) {
					s.ended = true
					if s.currentLatest != nil {
						return s.currentLatest, nil
					}
				}
				return nil, fmt.Errorf("get node from main stream: %w", err)
			}

			if err = s.initInnerStream(node); err != nil {
				return nil, fmt.Errorf("init inner stream: %w", err)
			}
		}

		nodeVersion, err := s.getNodeVersionFromInnerStream()
		if err != nil {
			if errors.Is(err, io.EOF) {
				s.innerStream = nil
				maps.Clear(s.namesMap)
				if s.currentLatest != nil && !s.intermediateRootID.Equal([]uint64{s.currentLatest.ID}) {
					return s.currentLatest, nil
				}
				continue
			}
			return nil, fmt.Errorf("inner stream: %w", err)
		}
		return nodeVersion, nil
	}
}

func (s *VersionsByPrefixStreamImpl) getNodeFromMainStream() (NodeResponse, error) {
	for {
		node, err := s.mainStream.Next()
		if err != nil {
			if errors.Is(err, ErrNodeNotFound) {
				return nil, io.EOF
			}
			return nil, fmt.Errorf("main stream next: %w", err)
		}

		if !s.rootID.Equal(node.GetNodeID()) && strings.HasPrefix(getFilename(node), s.tailPrefix) {
			return node, nil
		}
	}
}

func (s *VersionsByPrefixStreamImpl) initInnerStream(node NodeResponse) (err error) {
	if s.rootID.Equal(node.GetParentID()) {
		s.intermediateRootID = node.GetNodeID()
	}

	if isIntermediate(node) {
		s.innerStream, err = s.service.GetSubTreeStream(s.ctx, s.bktInfo, versionTree, node.GetNodeID(), maxGetSubTreeDepth)
		if err != nil {
			return fmt.Errorf("get sub tree node from main stream: %w", err)
		}
	} else {
		s.innerStream = &DummySubTreeStream{data: node}
	}

	return nil
}

func (s *VersionsByPrefixStreamImpl) getNodeVersionFromInnerStream() (*data.NodeVersion, error) {
	for {
		node, err := s.innerStream.Next()
		if err != nil {
			return nil, fmt.Errorf("inner stream: %w", err)
		}

		nodeVersion, skip, err := s.parseNodeResponse(node)
		if err != nil {
			return nil, err
		}
		if skip {
			continue
		}

		if s.latestOnly {
			if s.currentLatest == nil {
				s.currentLatest = nodeVersion
				continue
			}

			if s.currentLatest.FilePath != nodeVersion.FilePath {
				res := s.currentLatest
				s.currentLatest = nodeVersion
				return res, nil
			}

			if s.currentLatest.Timestamp < nodeVersion.Timestamp {
				s.currentLatest = nodeVersion
			}

			continue
		}

		return nodeVersion, nil
	}
}

func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *data.NodeVersion, skip bool, err error) {
	trNode, fileName, err := parseTreeNode(node)
	if err != nil {
		s.log.Debug(logs.ParseTreeNode, zap.Error(err))
		return nil, true, nil
	}

	var parentPrefix string
	if s.headPrefix != "" { // The root of subTree can also have a parent
		parentPrefix = strings.TrimSuffix(s.headPrefix, separator) + separator // To avoid 'foo//bar'
	}

	var filepath string
	if !s.intermediateRootID.Equal(trNode.ID) {
		if filepath, err = formFilePath(node, fileName, s.namesMap); err != nil {
			return nil, false, fmt.Errorf("invalid node order: %w", err)
		}
	} else {
		filepath = parentPrefix + fileName
		for _, id := range trNode.ID {
			s.namesMap[id] = filepath
		}
	}

	if trNode.ObjID.Equals(oid.ID{}) { // The node can be intermediate, but we still want to update namesMap
		return nil, true, nil
	}

	nodeVersion, err := newNodeVersionFromTreeNode(s.log, filepath, trNode)
	return nodeVersion, false, err
}

func (c *Tree) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) {
	mainStream, tailPrefix, rootID, err := c.getSubTreeByPrefixMainStream(ctx, bktInfo, versionTree, prefix)
	if err != nil {
		if errors.Is(err, io.EOF) {
			return &VersionsByPrefixStreamImpl{ended: true}, nil
		}
		return nil, err
	}

	return &VersionsByPrefixStreamImpl{
		ctx:        ctx,
		namesMap:   map[uint64]string{},
		rootID:     rootID,
		service:    c.service,
		bktInfo:    bktInfo,
		mainStream: mainStream,
		headPrefix: strings.TrimSuffix(prefix, tailPrefix),
		tailPrefix: tailPrefix,
		latestOnly: latestOnly,
		log:        c.reqLogger(ctx),
	}, nil
}

func (c *Tree) getSubTreeByPrefixMainStream(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (SubTreeStream, string, []uint64, error) {
	rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
	if err != nil {
		if errors.Is(err, layer.ErrNodeNotFound) {
			return nil, "", nil, io.EOF
		}
		return nil, "", nil, err
	}

	subTree, err := c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2)
	if err != nil {
		if errors.Is(err, layer.ErrNodeNotFound) {
			return nil, "", nil, io.EOF
		}
		return nil, "", nil, err
	}

	return subTree, tailPrefix, rootID, nil
}

func (c *Tree) determinePrefixNode(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) ([]uint64, string, error) {
	rootID := []uint64{0}
	path := strings.Split(prefix, separator)
	tailPrefix := path[len(path)-1]

	if len(path) > 1 {
		var err error
		rootID, err = c.getPrefixNodeID(ctx, bktInfo, treeID, path[:len(path)-1])
		if err != nil {
			return nil, "", err
		}
	}

	return rootID, tailPrefix, nil
}

func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, treeID string, prefixPath []string) ([]uint64, error) {
	p := &GetNodesParams{
		BktInfo:    bktInfo,
		TreeID:     treeID,
		Path:       prefixPath,
		LatestOnly: false,
		AllAttrs:   true,
	}
	nodes, err := c.service.GetNodes(ctx, p)
	if err != nil {
		return nil, err
	}

	var intermediateNodes []uint64
	for _, node := range nodes {
		if isIntermediate(node) {
			intermediateNodes = append(intermediateNodes, node.GetNodeID()...)
		}
	}

	if len(intermediateNodes) == 0 {
		return nil, layer.ErrNodeNotFound
	}

	return intermediateNodes, nil
}

func (c *Tree) getSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string, latestOnly bool) ([]NodeResponse, string, error) {
	rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
	if err != nil {
		if errors.Is(err, layer.ErrNodeNotFound) {
			return nil, "", nil
		}
		return nil, "", err
	}

	subTree, err := c.service.GetSubTree(ctx, bktInfo, treeID, rootID, 2)
	if err != nil {
		if errors.Is(err, layer.ErrNodeNotFound) {
			return nil, "", nil
		}
		return nil, "", err
	}

	nodesMap := make(map[string][]NodeResponse, len(subTree))
	for _, node := range subTree {
		if MultiID(rootID).Equal(node.GetNodeID()) {
			continue
		}

		fileName := getFilename(node)
		if !strings.HasPrefix(fileName, tailPrefix) {
			continue
		}

		nodes := nodesMap[fileName]

		// Add all nodes if flag latestOnly is false.
		// Add all intermediate nodes
		// and only latest leaf (object) nodes. To do this store and replace last leaf (object) node in nodes[0]
		if len(nodes) == 0 {
			nodes = []NodeResponse{node}
		} else if !latestOnly || isIntermediate(node) {
			nodes = append(nodes, node)
		} else if isIntermediate(nodes[0]) {
			nodes = append([]NodeResponse{node}, nodes...)
		} else if getMaxTimestamp(node) > getMaxTimestamp(nodes[0]) {
			nodes[0] = node
		}

		nodesMap[fileName] = nodes
	}

	result := make([]NodeResponse, 0, len(subTree))
	for _, nodes := range nodesMap {
		result = append(result, nodes...)
	}

	return result, strings.TrimSuffix(prefix, tailPrefix), nil
}

func getFilename(node NodeResponse) string {
	for _, kv := range node.GetMeta() {
		if kv.GetKey() == FileNameKey {
			return string(kv.GetValue())
		}
	}

	return ""
}

func isIntermediate(node NodeResponse) bool {
	if len(node.GetMeta()) != 1 {
		return false
	}

	return node.GetMeta()[0].GetKey() == FileNameKey
}

func formFilePath(node NodeResponse, fileName string, namesMap map[uint64]string) (string, error) {
	var filepath string

	for i, id := range node.GetParentID() {
		parentPath, ok := namesMap[id]
		if !ok {
			return "", fmt.Errorf("couldn't get parent path")
		}

		filepath = parentPath + separator + fileName
		namesMap[node.GetNodeID()[i]] = filepath
	}

	return filepath, nil
}

func parseTreeNode(node NodeResponse) (*treeNode, string, error) {
	tNode, err := newTreeNode(node)
	if err != nil { // invalid OID attribute
		return nil, "", err
	}

	fileName, ok := tNode.FileName()
	if !ok {
		return nil, "", fmt.Errorf("doesn't contain FileName")
	}

	return tNode, fileName, nil
}

func formLatestNodeKey(parentID uint64, fileName string) string {
	return strconv.FormatUint(parentID, 10) + "." + fileName
}

func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, filepath string) (*data.NodeVersion, error) {
	return c.getUnversioned(ctx, bktInfo, versionTree, filepath)
}

func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string) (*data.NodeVersion, error) {
	nodes, err := c.getVersions(ctx, bktInfo, treeID, filepath, true)
	if err != nil {
		return nil, err
	}

	if len(nodes) == 0 {
		return nil, layer.ErrNodeNotFound
	}

	if len(nodes) > 1 {
		c.reqLogger(ctx).Debug(logs.FoundMoreThanOneUnversionedNode,
			zap.String("treeID", treeID), zap.String("filepath", filepath))
	}

	sort.Slice(nodes, func(i, j int) bool {
		return nodes[i].Timestamp > nodes[j].Timestamp
	})

	return nodes[0], nil
}

func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) {
	return c.addVersion(ctx, bktInfo, versionTree, version)
}

func (c *Tree) RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, id uint64) error {
	return c.service.RemoveNode(ctx, bktInfo, versionTree, id)
}

func (c *Tree) CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error {
	path := pathFromName(info.Key)
	meta := metaFromMultipart(info, path[len(path)-1])
	_, err := c.service.AddNodeByPath(ctx, bktInfo, systemTree, path[:len(path)-1], meta)

	return err
}

func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) {
	subTreeNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, systemTree, prefix, false)
	if err != nil {
		return nil, err
	}

	var result []*data.MultipartInfo
	for _, node := range subTreeNodes {
		multipartUploads, err := c.getSubTreeMultipartUploads(ctx, bktInfo, node.GetNodeID(), headPrefix)
		if err != nil {
			return nil, err
		}
		result = append(result, multipartUploads...)
	}

	return result, nil
}

func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.BucketInfo, nodeID []uint64, parentFilePath string) ([]*data.MultipartInfo, error) {
	subTree, err := c.service.GetSubTree(ctx, bktInfo, systemTree, nodeID, maxGetSubTreeDepth)
	if err != nil {
		return nil, err
	}

	var parentPrefix string
	if parentFilePath != "" { // The root of subTree can also have a parent
		parentPrefix = strings.TrimSuffix(parentFilePath, separator) + separator // To avoid 'foo//bar'
	}

	var filepath string
	namesMap := make(map[uint64]string, len(subTree))
	multiparts := make(map[string][]*data.MultipartInfo, len(subTree))

	for i, node := range subTree {
		tNode, fileName, err := parseTreeNode(node)
		if err != nil {
			continue
		}

		if i != 0 {
			if filepath, err = formFilePath(node, fileName, namesMap); err != nil {
				return nil, fmt.Errorf("invalid node order: %w", err)
			}
		} else {
			filepath = parentPrefix + fileName
			for _, id := range tNode.ID {
				namesMap[id] = filepath
			}
		}

		multipartInfo, err := newMultipartInfoFromTreeNode(c.reqLogger(ctx), filepath, tNode)
		if err != nil || multipartInfo.Finished {
			continue
		}

		for _, id := range node.GetParentID() {
			key := formLatestNodeKey(id, fileName)
			multipartInfos, ok := multiparts[key]
			if !ok {
				multipartInfos = []*data.MultipartInfo{multipartInfo}
			} else {
				multipartInfos = append(multipartInfos, multipartInfo)
			}

			multiparts[key] = multipartInfos
		}
	}

	result := make([]*data.MultipartInfo, 0, len(multiparts))
	for _, multipartInfo := range multiparts {
		result = append(result, multipartInfo...)
	}

	return result, nil
}

func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) {
	path := pathFromName(objectName)
	p := &GetNodesParams{
		BktInfo:  bktInfo,
		TreeID:   systemTree,
		Path:     path,
		AllAttrs: true,
	}

	nodes, err := c.service.GetNodes(ctx, p)
	if err != nil {
		return nil, err
	}

	log := c.reqLogger(ctx)
	for _, node := range nodes {
		info, err := newMultipartInfo(log, node)
		if err != nil {
			continue
		}
		if info.UploadID == uploadID {
			if info.Finished {
				break
			}
			return info, nil
		}
	}

	return nil, layer.ErrNodeNotFound
}

func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) {
	parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2)
	if err != nil {
		return oid.ID{}, err
	}

	meta := map[string]string{
		partNumberKV: strconv.Itoa(info.Number),
		oidKV:        info.OID.EncodeToString(),
		sizeKV:       strconv.FormatUint(info.Size, 10),
		createdKV:    strconv.FormatInt(info.Created.UTC().UnixMilli(), 10),
		etagKV:       info.ETag,
		md5KV:        info.MD5,
	}

	for _, part := range parts {
		if len(part.GetNodeID()) != 1 {
			// multipart parts nodeID shouldn't have multiple values
			c.reqLogger(ctx).Warn(logs.UnexpectedMultiNodeIDsInSubTreeMultiParts,
				zap.String("key", info.Key),
				zap.String("upload id", info.UploadID),
				zap.Uint64("multipart node id ", multipartNodeID),
				zap.Uint64s("node ids", part.GetNodeID()))
			continue
		}
		nodeID := part.GetNodeID()[0]
		if nodeID == multipartNodeID {
			continue
		}
		partInfo, err := newPartInfo(part)
		if err != nil {
			c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
				zap.String("key", info.Key),
				zap.String("upload id", info.UploadID),
				zap.Uint64("multipart node id ", multipartNodeID),
				zap.Error(err))
			continue
		}
		if partInfo.Number == info.Number {
			return partInfo.OID, c.service.MoveNode(ctx, bktInfo, systemTree, nodeID, multipartNodeID, meta)
		}
	}

	if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil {
		return oid.ID{}, err
	}

	return oid.ID{}, layer.ErrNoNodeToRemove
}

func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) {
	parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2)
	if err != nil {
		return nil, err
	}

	result := make([]*data.PartInfo, 0, len(parts))
	for _, part := range parts {
		if len(part.GetNodeID()) != 1 {
			// multipart parts nodeID shouldn't have multiple values
			c.reqLogger(ctx).Warn(logs.UnexpectedMultiNodeIDsInSubTreeMultiParts,
				zap.Uint64("multipart node id ", multipartNodeID),
				zap.Uint64s("node ids", part.GetNodeID()))
			continue
		}
		if part.GetNodeID()[0] == multipartNodeID {
			continue
		}
		partInfo, err := newPartInfo(part)
		if err != nil {
			c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
				zap.Uint64("multipart node id ", multipartNodeID),
				zap.Uint64s("node ids", part.GetNodeID()),
				zap.Error(err))
			continue
		}
		result = append(result, partInfo)
	}

	return result, nil
}

func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
	err := c.service.RemoveNode(ctx, bktInfo, systemTree, multipartInfo.ID)
	if err != nil {
		return err
	}

	multipartInfo.Finished = true

	return c.CreateMultipartUpload(ctx, bktInfo, multipartInfo)
}

func (c *Tree) PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error {
	meta := map[string]string{isLockKV: "true"}

	if lock.IsLegalHoldSet() {
		meta[legalHoldOIDKV] = lock.LegalHold().EncodeToString()
	}
	if lock.IsRetentionSet() {
		meta[retentionOIDKV] = lock.Retention().EncodeToString()
		meta[untilDateKV] = lock.UntilDate()
		if lock.IsCompliance() {
			meta[isComplianceKV] = "true"
		}
	}

	if lock.ID() == 0 {
		_, err := c.service.AddNode(ctx, bktInfo, versionTree, nodeID, meta)
		return err
	}

	return c.service.MoveNode(ctx, bktInfo, versionTree, lock.ID(), nodeID, meta)
}

func (c *Tree) GetLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) (*data.LockInfo, error) {
	lockNode, err := c.getTreeNode(ctx, bktInfo, nodeID, isLockKV)
	if err != nil {
		return nil, err
	}

	return getLock(lockNode)
}

func getLock(lockNode *treeNode) (*data.LockInfo, error) {
	if lockNode == nil {
		return &data.LockInfo{}, nil
	}
	if lockNode.IsSplit() {
		return nil, errors.New("invalid lock node: this is split node")
	}
	lockInfo := data.NewLockInfo(lockNode.ID[0])

	if legalHold, ok := lockNode.Get(legalHoldOIDKV); ok {
		var legalHoldOID oid.ID
		if err := legalHoldOID.DecodeString(legalHold); err != nil {
			return nil, fmt.Errorf("invalid legal hold object id: %w", err)
		}
		lockInfo.SetLegalHold(legalHoldOID)
	}

	if retention, ok := lockNode.Get(retentionOIDKV); ok {
		var retentionOID oid.ID
		if err := retentionOID.DecodeString(retention); err != nil {
			return nil, fmt.Errorf("invalid retention object id: %w", err)
		}
		_, isCompliance := lockNode.Get(isComplianceKV)
		untilDate, _ := lockNode.Get(untilDateKV)
		lockInfo.SetRetention(retentionOID, untilDate, isCompliance)
	}

	return lockInfo, nil
}

func (c *Tree) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) {
	nodes, err := c.getTreeNodes(ctx, bktInfo, objVersion.ID, isTagKV, isLockKV)
	if err != nil {
		return nil, nil, err
	}

	lockInfo, err := getLock(nodes[isLockKV])
	if err != nil {
		return nil, nil, err
	}

	return getObjectTagging(nodes[isTagKV]), lockInfo, nil
}

func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, error) {
	path := pathFromName(version.FilePath)
	meta := map[string]string{
		oidKV:       version.OID.EncodeToString(),
		FileNameKey: path[len(path)-1],
		ownerKV:     version.Owner.EncodeToString(),
		createdKV:   strconv.FormatInt(version.Created.UTC().UnixMilli(), 10),
	}

	if version.Size > 0 {
		meta[sizeKV] = strconv.FormatUint(version.Size, 10)
	}
	if len(version.ETag) > 0 {
		meta[etagKV] = version.ETag
	}
	if len(version.MD5) > 0 {
		meta[md5KV] = version.MD5
	}

	if version.IsDeleteMarker {
		meta[isDeleteMarkerKV] = "true"
	}

	if version.IsCombined {
		meta[isCombinedKV] = "true"
	}

	if version.IsUnversioned {
		meta[isUnversionedKV] = "true"

		node, err := c.getUnversioned(ctx, bktInfo, treeID, version.FilePath)
		if err == nil {
			if err = c.service.MoveNode(ctx, bktInfo, treeID, node.ID, node.ParenID, meta); err != nil {
				return 0, err
			}

			return node.ID, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, node.ID)
		}

		if !errors.Is(err, layer.ErrNodeNotFound) {
			return 0, err
		}
	}

	return c.service.AddNodeByPath(ctx, bktInfo, treeID, path[:len(path)-1], meta)
}

func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
	taggingNode, err := c.getTreeNode(ctx, bktInfo, nodeID, isTagKV)
	if err != nil {
		return err
	}
	if taggingNode != nil {
		return c.service.RemoveNode(ctx, bktInfo, treeID, taggingNode.ID[0])
	}

	return nil
}

func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) {
	keysToReturn := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV}
	path := pathFromName(filepath)
	p := &GetNodesParams{
		BktInfo:    bktInfo,
		TreeID:     treeID,
		Path:       path,
		Meta:       keysToReturn,
		LatestOnly: false,
		AllAttrs:   false,
	}
	nodes, err := c.service.GetNodes(ctx, p)
	if err != nil {
		if errors.Is(err, layer.ErrNodeNotFound) {
			return nil, nil
		}
		return nil, err
	}

	log := c.reqLogger(ctx)
	result := make([]*data.NodeVersion, 0, len(nodes))
	for _, node := range nodes {
		nodeVersion, err := newNodeVersion(log, filepath, node)
		if err != nil {
			return nil, err
		}

		if onlyUnversioned && !nodeVersion.IsUnversioned {
			continue
		}

		result = append(result, nodeVersion)
	}

	return result, nil
}

func metaFromSettings(settings *data.BucketSettings) map[string]string {
	results := make(map[string]string, 3)

	results[FileNameKey] = settingsFileName
	results[versioningKV] = settings.Versioning
	results[lockConfigurationKV] = encodeLockConfiguration(settings.LockConfiguration)
	results[cannedACLKV] = settings.CannedACL
	if settings.OwnerKey != nil {
		results[ownerKeyKV] = hex.EncodeToString(settings.OwnerKey.Bytes())
	}

	return results
}

func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]string {
	info.Meta[FileNameKey] = fileName
	info.Meta[uploadIDKV] = info.UploadID
	info.Meta[ownerKV] = info.Owner.EncodeToString()
	info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10)
	if info.Finished {
		info.Meta[finishedKV] = strconv.FormatBool(info.Finished)
	}

	return info.Meta
}

func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, path []string) (*treeNode, error) {
	p := &GetNodesParams{
		BktInfo:    bktInfo,
		TreeID:     systemTree,
		Path:       path,
		LatestOnly: false,
		AllAttrs:   true,
	}
	nodes, err := c.service.GetNodes(ctx, p)
	if err != nil {
		return nil, err
	}

	nodes = filterMultipartNodes(nodes)

	if len(nodes) == 0 {
		return nil, layer.ErrNodeNotFound
	}
	if len(nodes) != 1 {
		c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("path", strings.Join(path, "/")))
	}

	return newTreeNode(getLatestNode(nodes))
}

func filterMultipartNodes(nodes []NodeResponse) []NodeResponse {
	res := make([]NodeResponse, 0, len(nodes))

LOOP:
	for _, node := range nodes {
		for _, meta := range node.GetMeta() {
			if meta.GetKey() == uploadIDKV {
				continue LOOP
			}
		}

		res = append(res, node)
	}

	return res
}

func (c *Tree) reqLogger(ctx context.Context) *zap.Logger {
	reqLogger := middleware.GetReqLog(ctx)
	if reqLogger != nil {
		return reqLogger
	}
	return c.log
}

func parseLockConfiguration(value string) (*data.ObjectLockConfiguration, error) {
	result := &data.ObjectLockConfiguration{}
	if len(value) == 0 {
		return result, nil
	}

	lockValues := strings.Split(value, ",")
	result.ObjectLockEnabled = lockValues[0]

	if len(lockValues) == 1 {
		return result, nil
	}

	if len(lockValues) != 4 {
		return nil, fmt.Errorf("invalid lock configuration: %s", value)
	}

	var err error
	var days, years int64

	if len(lockValues[1]) > 0 {
		if days, err = strconv.ParseInt(lockValues[1], 10, 64); err != nil {
			return nil, fmt.Errorf("invalid lock configuration: %s", value)
		}
	}

	if len(lockValues[3]) > 0 {
		if years, err = strconv.ParseInt(lockValues[3], 10, 64); err != nil {
			return nil, fmt.Errorf("invalid lock configuration: %s", value)
		}
	}

	result.Rule = &data.ObjectLockRule{
		DefaultRetention: &data.DefaultRetention{
			Days:  days,
			Mode:  lockValues[2],
			Years: years,
		},
	}

	return result, nil
}

func encodeLockConfiguration(conf *data.ObjectLockConfiguration) string {
	if conf == nil {
		return ""
	}

	if conf.Rule == nil || conf.Rule.DefaultRetention == nil {
		return conf.ObjectLockEnabled
	}

	defaults := conf.Rule.DefaultRetention
	return fmt.Sprintf("%s,%d,%s,%d", conf.ObjectLockEnabled, defaults.Days, defaults.Mode, defaults.Years)
}