@ -37,6 +37,11 @@ require (
require (
require (
github.com/benbjohnson/clock v1.1.0 // indirect
cloud.google.com/go v0.110.0 // indirect
cloud.google.com/go/compute v1.19.1 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v0.13.0 // indirect
dario.cat/mergo v1.0.0 // indirect
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230802075510-964c3edb3f44 // indirect
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb // indirect
@ -44,23 +49,47 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230825064515-46a214d065f8
git.frostfs.info/TrueCloudLab/hrw v1.2.1 // indirect
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bitly/go-simplejson v0.5.0 // indirect
github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b // indirect
github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/containerd v1.7.3 // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/cyphar/filepath-securejoin v0.2.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/docker/docker v24.0.5+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/felixge/httpsnoop v1.0.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.3.0
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/golang-lru v0.6.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f // indirect
github.com/moby/patternmatcher v0.5.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/term v0.5.0 // indirect
@ -70,63 +99,35 @@ require (
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20230615193820-9185820289ce // indirect
github.com/nspcc-dev/rfc6979 v0.2.0 // indirect
github.com/opencontainers/runc v1.1.5 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rogpeppe/go-internal v1.8.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/twmb/murmur3 v1.1.8 // indirect
github.com/twmb/murmur3 v1.1.8 // indirect
github.com/urfave/cli v1.22.12 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/tools v0.7.0 // indirect
golang.org/x/tools v0.7.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
google.golang.org/grpc v1.57.0
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
@ -83,7 +83,7 @@ func newSizeLimiterWriter(ctx context.Context, d *driver, path string, splitInfo
previous: formPreviousChain(splitInfo, parts),
previous: formPreviousChain(splitInfo, parts),
parentHashers: parentHashers,
parentHashers: parentHashers,
targetInit: func() transformer.ChunkedObjectWriter {
targetInit: func() transformer.ChunkedObjectWriter {
return d.newObjTarget()
return d.newObjTarget(path, splitInfo.SplitID())
parent: parent,
parent: parent,
@ -233,6 +233,11 @@ func (w *writer) release(withParent bool) (*transformer.AccessIdentifiers, error
if _, err = w.release(false); err != nil {
if _, err = w.release(false); err != nil {
return nil, fmt.Errorf("could not release linking object: %w", err)
return nil, fmt.Errorf("could not release linking object: %w", err)
err = w.driver.treeService.DeleteObjectsBySplitID(w.ctx, w.driver.containerID, w.path, w.splitInfo.SplitID())
if err != nil {
return nil, fmt.Errorf("could not delete split objects: %w", err)
return ids, nil
return ids, nil
@ -459,5 +464,10 @@ func (w *writer) deleteParts() error {
err := w.driver.treeService.DeleteObjectsBySplitID(w.ctx, w.driver.containerID, w.path, w.splitInfo.SplitID())
if err != nil {
return fmt.Errorf("could not delete split objects: %w", err)
return nil
return nil
@ -3,9 +3,11 @@ package frostfs
import (
import (
@ -18,12 +20,15 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
dcontext "github.com/distribution/distribution/v3/context"
dcontext "github.com/distribution/distribution/v3/context"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
@ -56,6 +61,11 @@ const (
defaultSessionExpirationDuration = 100 // in epoch
defaultSessionExpirationDuration = 100 // in epoch
var (
filePathValidateRegex = regexp.MustCompile(`^/([^/]+/)*[^/]+$`)
prefixPathValidateRegex = regexp.MustCompile(`^/([^/]+/?)*$`)
// DriverParameters is a struct that encapsulates all of the driver parameters after all values have been set.
// DriverParameters is a struct that encapsulates all of the driver parameters after all values have been set.
type DriverParameters struct {
type DriverParameters struct {
ContainerID string
ContainerID string
@ -94,6 +104,7 @@ func (n *frostfsDriverFactory) Create(parameters map[string]interface{}) (storag
type driver struct {
type driver struct {
sdkPool *pool.Pool
sdkPool *pool.Pool
treeService *tree.Tree
owner *user.ID
owner *user.ID
key *ecdsa.PrivateKey
key *ecdsa.PrivateKey
containerID cid.ID
containerID cid.ID
@ -306,7 +317,7 @@ func New(params DriverParameters) (*Driver, error) {
var owner user.ID
var owner user.ID
user.IDFromKey(&owner, acc.PrivateKey().PrivateKey.PublicKey)
user.IDFromKey(&owner, acc.PrivateKey().PrivateKey.PublicKey)
sdkPool, err := createPool(ctx, acc, params)
sdkPool, treePool, err := createPool(ctx, acc, params)
if err != nil {
if err != nil {
return nil, fmt.Errorf("couldn't create sdk pool: %w", err)
return nil, fmt.Errorf("couldn't create sdk pool: %w", err)
@ -321,8 +332,11 @@ func New(params DriverParameters) (*Driver, error) {
return nil, fmt.Errorf("couldn't get container id: %w", err)
return nil, fmt.Errorf("couldn't get container id: %w", err)
treeService := wrapper.NewPoolWrapper(treePool)
d := &driver{
d := &driver{
sdkPool: sdkPool,
sdkPool: sdkPool,
treeService: tree.NewTree(treeService, nil),
owner: &owner,
owner: &owner,
key: &acc.PrivateKey().PrivateKey,
key: &acc.PrivateKey().PrivateKey,
containerID: cnrID,
containerID: cnrID,
@ -376,28 +390,49 @@ func getContainerID(params DriverParameters) (cid.ID, error) {
return cnrID, nil
return cnrID, nil
func createPool(ctx context.Context, acc *wallet.Account, param DriverParameters) (*pool.Pool, error) {
func createPool(ctx context.Context, acc *wallet.Account, param DriverParameters) (*pool.Pool, *treepool.Pool, error) {
var prm pool.InitParameters
var prm pool.InitParameters
var prmTree treepool.InitParameters
for _, peer := range param.Peers {
for _, peer := range param.Peers {
prm.AddNode(pool.NewNodeParam(peer.Priority, peer.Address, peer.Weight))
nodeParam := pool.NewNodeParam(peer.Priority, peer.Address, peer.Weight)
p, err := pool.NewPool(prm)
p, err := pool.NewPool(prm)
if err != nil {
if err != nil {
return nil, fmt.Errorf("create pool: %w", err)
return nil, nil, fmt.Errorf("create pool: %w", err)
if err = p.Dial(ctx); err != nil {
if err = p.Dial(ctx); err != nil {
return nil, fmt.Errorf("dial pool: %w", err)
return nil, nil, fmt.Errorf("dial pool: %w", err)
return p, nil
treePool, err := treepool.NewPool(prmTree)
if err != nil {
return nil, nil, fmt.Errorf("create tree pool: %w", err)
if err = treePool.Dial(ctx); err != nil {
return nil, nil, fmt.Errorf("dial tree pool: %w", err)
return p, treePool, nil
func createNnsResolver(params DriverParameters) (*resolver.NNS, error) {
func createNnsResolver(params DriverParameters) (*resolver.NNS, error) {
@ -468,23 +503,34 @@ func (d *driver) Name() string {
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
id, err := d.searchOne(ctx, path)
if ok := validateFilePath(path); !ok {
return nil, storagedriver.InvalidPathError{Path: path}
treeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, path)
if err != nil {
if err != nil {
return nil, err
if errors.Is(err, tree.ErrNodeNotFound) {
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
return nil, fmt.Errorf("couldn't get object from tree: %w", err)
var prm pool.PrmObjectGet
var prm pool.PrmObjectGet
obj, err := d.sdkPool.GetObject(ctx, prm)
obj, err := d.sdkPool.GetObject(ctx, prm)
if err != nil {
if err != nil {
return nil, fmt.Errorf("couldn't get object '%s': %w", id, err)
return nil, fmt.Errorf("couldn't get object '%s': %w", treeObj.ObjID, err)
return io.ReadAll(obj.Payload)
return io.ReadAll(obj.Payload)
func (d *driver) PutContent(ctx context.Context, path string, content []byte) error {
func (d *driver) PutContent(ctx context.Context, path string, content []byte) error {
if ok := validateFilePath(path); !ok {
return storagedriver.InvalidPathError{Path: path}
if err := d.Delete(ctx, path); err != nil {
if err := d.Delete(ctx, path); err != nil {
return fmt.Errorf("couldn't delete '%s': %s", path, err)
return fmt.Errorf("couldn't delete '%s': %s", path, err)
@ -495,34 +541,35 @@ func (d *driver) PutContent(ctx context.Context, path string, content []byte) er
var prm pool.PrmObjectPut
var prm pool.PrmObjectPut
if _, err := d.sdkPool.PutObject(ctx, prm); err != nil {
id, err := d.sdkPool.PutObject(ctx, prm)
if err != nil {
return fmt.Errorf("couldn't put object '%s': %w", path, err)
return fmt.Errorf("couldn't put object '%s': %w", path, err)
_, err = d.treeService.AddObject(ctx, d.containerID, path, id, uint64(len(content)))
if err != nil {
return fmt.Errorf("can't add object to tree: %w", err)
return nil
return nil
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
id, err := d.searchOne(ctx, path)
if ok := validateFilePath(path); !ok {
return nil, storagedriver.InvalidPathError{Path: path}
treeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, path)
if err != nil {
if err != nil {
return nil, err
return nil, fmt.Errorf("couldn't get object from tree: %w", err)
addr := d.objectAddress(id)
addr := d.objectAddress(treeObj.ObjID)
var prmHead pool.PrmObjectHead
if uint64(offset) >= treeObj.PayloadSize {
return nil, fmt.Errorf("invalid offset %d for object length %d", offset, treeObj.PayloadSize)
obj, err := d.sdkPool.HeadObject(ctx, prmHead)
if err != nil {
return nil, fmt.Errorf("couldn't head object '%s', id '%s': %w", path, id, err)
if uint64(offset) >= obj.PayloadSize() {
length := treeObj.PayloadSize - uint64(offset)
return nil, fmt.Errorf("invalid offset %d for object length %d", offset, obj.PayloadSize())
length := obj.PayloadSize() - uint64(offset)
var prmRange pool.PrmObjectRange
var prmRange pool.PrmObjectRange
@ -532,7 +579,7 @@ func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.Read
res, err := d.sdkPool.ObjectRange(ctx, prmRange)
res, err := d.sdkPool.ObjectRange(ctx, prmRange)
if err != nil {
if err != nil {
return nil, fmt.Errorf("couldn't get payload range of object '%s', offset %d, length %d, id '%s': %w",
return nil, fmt.Errorf("couldn't get payload range of object '%s', offset %d, length %d, id '%s': %w",
path, offset, length, id, err)
path, offset, length, treeObj.ObjID, err)
return &res, nil
return &res, nil
@ -543,6 +590,10 @@ func getUploadUUID(ctx context.Context) (uuid string) {
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
if ok := validateFilePath(path); !ok {
return nil, storagedriver.InvalidPathError{Path: path}
splitID := object.NewSplitID()
splitID := object.NewSplitID()
uploadUUID := getUploadUUID(ctx)
uploadUUID := getUploadUUID(ctx)
if err := splitID.Parse(uploadUUID); err != nil {
if err := splitID.Parse(uploadUUID); err != nil {
@ -591,56 +642,42 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo,
return newFileInfoDir(path), nil
return newFileInfoDir(path), nil
id, ok, err := d.searchOneBase(ctx, path, true)
if ok := validatePrefixPath(path); !ok {
sourceID, err := d.searchOne(ctx, sourcePath)
if ok := validateFilePath(sourcePath); !ok {
return storagedriver.InvalidPathError{Path: sourcePath}
if ok := validateFilePath(destPath); !ok {
return storagedriver.InvalidPathError{Path: destPath}
sourceTreeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, sourcePath)
if err != nil {
if err != nil {
return err
return fmt.Errorf("couldn't get object from tree: %w", err)
if err = d.Delete(ctx, destPath); err != nil {
_, err = d.treeService.AddObject(ctx, d.containerID, destPath, sourceTreeObj.ObjID, sourceTreeObj.PayloadSize)
return fmt.Errorf("couldn't delete '%s' object: %w", destPath, err)
sourceAddr := d.objectAddress(sourceID)
var prmGet pool.PrmObjectGet
obj, err := d.sdkPool.GetObject(ctx, prmGet)
if err != nil {
if err != nil {
return fmt.Errorf("could not get source object '%s' by oid '%s': %w", sourcePath, sourceID, err)
return fmt.Errorf("can't add object to tree: %w", err)
defer func() {
if err = obj.Payload.Close(); err != nil {
dcontext.GetLogger(ctx).Errorf("couldn't close object payload reader, path '%s' by oid '%s': %s",
sourcePath, sourceID, err.Error())
objHeader := d.formObject(destPath)
var prmPut pool.PrmObjectPut
if _, err = d.sdkPool.PutObject(ctx, prmPut); err != nil {
return fmt.Errorf("couldn't put object '%s': %w", destPath, err)
var prmDelete pool.PrmObjectDelete
if err = d.treeService.DeleteObject(ctx, d.containerID, sourceTreeObj.ID); err != nil {
return fmt.Errorf("couldn't delete object from tree: %w", err)
if err = d.sdkPool.DeleteObject(ctx, prmDelete); err != nil {
return fmt.Errorf("couldn't remove source file '%s', id '%s': %w", sourcePath, sourceID, err)
return nil
return nil
func (d *driver) Delete(ctx context.Context, path string) error {
func (d *driver) Delete(ctx context.Context, path string) error {
filters := object.NewSearchFilters()
if ok := validatePrefixPath(path); !ok {
return storagedriver.InvalidPathError{Path: path}
filters.AddFilter(attributeFilePath, path, object.MatchCommonPrefix)
path = preparePrefixPath(path)
var prmSearch pool.PrmObjectSearch
treeObjList, err := d.treeService.GetListObjectByPrefix(ctx, d.containerID, path)
res, err := d.sdkPool.SearchObjects(ctx, prmSearch)
if err != nil {
if err != nil {
return fmt.Errorf("init searching using client: %w", err)
return fmt.Errorf("couldn't get object from tree by prefix '%s': %w", path, err)
defer res.Close()
rootTreeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, path)
var inErr error
err = res.Iterate(func(id oid.ID) bool {
// Check if a key is a subpath (so that deleting "/a" does not delete "/ab").
isSubPath, err := d.checkIsSubPath(ctx, path, id)
if err != nil {
inErr = err
return true
if isSubPath {
if err = d.delete(ctx, id); err != nil {
inErr = fmt.Errorf("couldn't delete object by path '%s': %w", path, err)
return true
return false
if err == nil {
err = inErr
if err != nil {
if err != nil {
return fmt.Errorf("iterate objects: %w", err)
if !errors.Is(err, tree.ErrNodeNotFound) {
return fmt.Errorf("couldn't get object from tree: %w", err)
} else {
treeObjList = append(treeObjList, *rootTreeObj)
for _, treeObj := range treeObjList {
if err := d.delete(ctx, treeObj.ObjID); err != nil {
return fmt.Errorf("error in delete object oid: %s : %w", treeObj.ObjID, err)
if err = d.treeService.DeleteObject(ctx, d.containerID, treeObj.ID); err != nil {
return fmt.Errorf("couldn't delete object from tree: %w", err)
return nil
return nil
func (d *driver) checkIsSubPath(ctx context.Context, path string, id oid.ID) (bool, error) {
var prmHead pool.PrmObjectHead
obj, err := d.sdkPool.HeadObject(ctx, prmHead)
if err != nil {
return false, fmt.Errorf("couldn't head object part '%s', id '%s': %w", path, id, err)
fileInf := newFileInfo(ctx, obj, "")
return fileInf.Path() <= path || fileInf.Path()[len(path)] == '/', nil
func (d *driver) delete(ctx context.Context, id oid.ID) error {
func (d *driver) delete(ctx context.Context, id oid.ID) error {
var prm pool.PrmObjectDelete
var prm pool.PrmObjectDelete
@ -770,166 +765,47 @@ func (d *driver) Walk(ctx context.Context, path string, fn storagedriver.WalkFn)
return storagedriver.WalkFallback(ctx, d, path, fn)
return storagedriver.WalkFallback(ctx, d, path, fn)
func (d *driver) searchByPrefix(ctx context.Context, prefix string) ([]oid.ID, error) {
filters := object.NewSearchFilters()
filters.AddFilter(attributeFilePath, prefix, object.MatchCommonPrefix)
return d.baseSearch(ctx, filters)
func (d *driver) headSplitParts(ctx context.Context, splitID *object.SplitID, path string, isAppend bool) ([]*object.Object, map[oid.ID]struct{}, error) {
func (d *driver) headSplitParts(ctx context.Context, splitID *object.SplitID, path string, isAppend bool) ([]*object.Object, map[oid.ID]struct{}, error) {
filters := object.NewSearchFilters()
ids, err := d.treeService.GetListOIDBySplitID(ctx, d.containerID, path, splitID)
filters.AddSplitIDFilter(object.MatchStringEqual, splitID)
var prm pool.PrmObjectSearch
res, err := d.sdkPool.SearchObjects(ctx, prm)
if err != nil {
if err != nil {
return nil, nil, fmt.Errorf("init searching using client: %w", err)
return nil, nil, fmt.Errorf("couldn't get object from tree by split id '%s': %w", path, err)
defer res.Close()
var (
addr oid.Address
prmHead pool.PrmObjectHead
objects []*object.Object
var addr oid.Address
var inErr error
var prmHead pool.PrmObjectHead
var objects []*object.Object
noChild := make(map[oid.ID]struct{})
noChild := make(map[oid.ID]struct{})
err = res.Iterate(func(id oid.ID) bool {
for _, objID := range ids {
obj, err := d.sdkPool.HeadObject(ctx, prmHead)
obj, err := d.sdkPool.HeadObject(ctx, prmHead)
if err != nil {
if err != nil {
inErr = fmt.Errorf("couldn't head object part '%s', id '%s', splitID '%s': %w", path, id, splitID, err)
return nil, nil, fmt.Errorf("couldn't head object part '%s', id '%s', splitID '%s': %w", path, objID, splitID, err)
return true
if isAppend {
if isAppend {
objects = append(objects, &obj)
objects = append(objects, &obj)
noChild[id] = struct{}{}
noChild[objID] = struct{}{}
return false
inErr = fmt.Errorf("init upload part '%s' already exist, splitID '%s'", path, splitID)
return nil, nil, fmt.Errorf("init upload part '%s' already exist, splitID '%s'", path, splitID)
return true
if err == nil {
err = inErr
if err != nil {
return nil, nil, fmt.Errorf("iterate objects: %w", err)
return objects, noChild, nil
return objects, noChild, nil
func (d *driver) baseSearch(ctx context.Context, filters object.SearchFilters) ([]oid.ID, error) {
func newFileInfo(treeObj tree.TreeNode, prefix string) storagedriver.FileInfo {
var prm pool.PrmObjectSearch
res, err := d.sdkPool.SearchObjects(ctx, prm)
if err != nil {
return nil, fmt.Errorf("init searching using client: %w", err)
defer res.Close()
var buf []oid.ID
err = res.Iterate(func(id oid.ID) bool {
buf = append(buf, id)
return false
if err != nil {
return nil, fmt.Errorf("iterate objects: %w", err)
return buf, nil
func (d *driver) searchOne(ctx context.Context, path string) (oid.ID, error) {
id, ok, err := d.searchOneBase(ctx, path, false)
if err != nil {
return oid.ID{}, err
if !ok {
return oid.ID{}, fmt.Errorf("found more than one object by path '%s'", path)
return id, nil
func (d *driver) searchOneBase(ctx context.Context, path string, byPrefix bool) (resID oid.ID, ok bool, err error) {
filters := object.NewSearchFilters()
if byPrefix {
filters.AddFilter(attributeFilePath, path, object.MatchCommonPrefix)
} else {
filters.AddFilter(attributeFilePath, path, object.MatchStringEqual)
var prm pool.PrmObjectSearch
res, err := d.sdkPool.SearchObjects(ctx, prm)
if err != nil {
return oid.ID{}, false, fmt.Errorf("init searching using client: %w", err)
defer res.Close()
var found bool
err = res.Iterate(func(id oid.ID) bool {
if found {
ok = false
return true
found = true
resID = id
ok = true
return false
if err != nil {
return oid.ID{}, false, fmt.Errorf("iterate objects by path '%s': %w", path, err)
if !found {
return oid.ID{}, false, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
return resID, ok, nil
func newFileInfo(ctx context.Context, obj object.Object, prefix string) storagedriver.FileInfo {
fileInfoFields := storagedriver.FileInfoFields{
fileInfoFields := storagedriver.FileInfoFields{
Size: int64(obj.PayloadSize()),
Path: treeObj.FullPath,
Size: int64(treeObj.PayloadSize),
ModTime: treeObj.ModificationTime,
for _, attr := range obj.Attributes() {
switch attr.Key() {
case attributeFilePath:
fileInfoFields.Path = attr.Value()
case object.AttributeTimestamp:
timestamp, err := strconv.ParseInt(attr.Value(), 10, 64)
if err != nil {
objID, _ := obj.ID()
dcontext.GetLogger(ctx).Warnf("object '%s' has invalid timestamp '%s'", objID.EncodeToString(), attr.Value())
fileInfoFields.ModTime = time.Unix(timestamp, 0)
if len(prefix) > 0 {
if len(prefix) > 0 {
@ -956,18 +832,39 @@ func newFileInfoDir(path string) storagedriver.FileInfo {
func (d *driver) newObjTarget() transformer.ChunkedObjectWriter {
func validatePrefixPath(path string) bool {
return prefixPathValidateRegex.MatchString(path)
func validateFilePath(path string) bool {
return filePathValidateRegex.MatchString(path)
func preparePrefixPath(path string) string {
if strings.HasSuffix(path, "/") {
return path[:len(path)-1]
return path
func (d *driver) newObjTarget(path string, splitID *object.SplitID) transformer.ChunkedObjectWriter {
return &objTarget{
return &objTarget{
sdkPool: d.sdkPool,
sdkPool: d.sdkPool,
key: d.key,
treeService: d.treeService,
splitID: splitID,
key: d.key,
path: path,
type objTarget struct {
type objTarget struct {
sdkPool *pool.Pool
sdkPool *pool.Pool
key *ecdsa.PrivateKey
treeService *tree.Tree
obj *object.Object
path string
chunks [][]byte
splitID *object.SplitID
key *ecdsa.PrivateKey
obj *object.Object
chunks [][]byte
func (t *objTarget) WriteHeader(_ context.Context, obj *object.Object) error {
func (t *objTarget) WriteHeader(_ context.Context, obj *object.Object) error {
@ -999,11 +896,14 @@ func (t *objTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers,
var (
var (
parID oid.ID
parID oid.ID
parHdr *object.Object
parHdr *object.Object
isCommit bool
sizeFinalObject uint64
if par := t.obj.Parent(); par != nil && par.Signature() == nil {
if par := t.obj.Parent(); par != nil && par.Signature() == nil {
isCommit = true
objPar := object.NewFromV2(par.ToV2())
objPar := object.NewFromV2(par.ToV2())
@ -1013,6 +913,7 @@ func (t *objTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers,
parID, _ = objPar.ID()
parID, _ = objPar.ID()
sizeFinalObject = objPar.PayloadSize()
@ -1029,12 +930,28 @@ func (t *objTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers,
var prm pool.PrmObjectPut
var prm pool.PrmObjectPut
id, err := t.sdkPool.PutObject(ctx, prm)
_, err = t.sdkPool.PutObject(ctx, prm)
if err != nil {
if err != nil {
return nil, fmt.Errorf("couldn't put part: %w", err)
return nil, fmt.Errorf("couldn't put part: %w", err)
containerID, ok := t.obj.ContainerID()
if !ok {
return nil, fmt.Errorf("error in get container id from object")
if isCommit {
_, err := t.treeService.AddObject(ctx, containerID, t.path, parID, sizeFinalObject)
if err != nil {
return nil, fmt.Errorf("can't add object to tree: %w", err)
_, err = t.treeService.AddPHYObject(ctx, containerID, t.path, id, t.splitID)
if err != nil {
return nil, fmt.Errorf("can't add phy object to tree: %w", err)
objID, _ := t.obj.ID()
objID, _ := t.obj.ID()
return &transformer.AccessIdentifiers{
return &transformer.AccessIdentifiers{
@ -7,6 +7,7 @@ import (
@ -75,8 +76,7 @@ func TestIntegration(t *testing.T) {
rootCtx := context.Background()
rootCtx := context.Background()
aioImage := "truecloudlab/frostfs-aio:"
aioImage := "truecloudlab/frostfs-aio:"
versions := []string{
versions := []string{
for _, aioVersion := range versions {
for _, aioVersion := range versions {
@ -214,6 +214,7 @@ func testWriteRead(rootCtx context.Context, t *testing.T, drvr storagedriver.Sto
func testList(rootCtx context.Context, t *testing.T, drvr storagedriver.StorageDriver, version string) {
func testList(rootCtx context.Context, t *testing.T, drvr storagedriver.StorageDriver, version string) {
ctx, path := formCtxAndPath(rootCtx, version)
ctx, path := formCtxAndPath(rootCtx, version)
path = "/list" + path
fileWriter, err := drvr.Writer(ctx, path, false)
fileWriter, err := drvr.Writer(ctx, path, false)
require.NoError(t, err)
require.NoError(t, err)
@ -229,7 +230,7 @@ func testList(rootCtx context.Context, t *testing.T, drvr storagedriver.StorageD
err = fileWriter.Commit()
err = fileWriter.Commit()
require.NoError(t, err)
require.NoError(t, err)
res, err := drvr.List(ctx, path)
res, err := drvr.List(ctx, getParentDirPath(path))
require.NoError(t, err)
require.NoError(t, err)
require.Len(t, res, 1)
require.Len(t, res, 1)
require.Contains(t, res, path)
require.Contains(t, res, path)
@ -340,3 +341,12 @@ func createContainer(ctx context.Context, t *testing.T, clientPool *pool.Pool, o
return cnrID
return cnrID
func getParentDirPath(path string) string {
lastSlashIndex := strings.LastIndex(path, "/")
if lastSlashIndex != -1 {
path = path[:lastSlashIndex]
return path
@ -0,0 +1,410 @@
package tree
import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
type (
Tree struct {
service ServiceClient
log *zap.Logger
// ServiceClient is a client to interact with tree service.
// Each method must return ErrNodeNotFound or ErrNodeAccessDenied if relevant.
ServiceClient interface {
GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error)
GetSubTree(ctx context.Context, containerID cid.ID, treeID string, rootID uint64, depth uint32) ([]NodeResponse, error)
dkirillov marked this conversation as resolved
This is unused method (some others too). Can we drop it then? This is unused method (some others too). Can we drop it then?
AddNodeByPath(ctx context.Context, containerID cid.ID, treeID string, path []string, meta map[string]string) (uint64, error)
RemoveNode(ctx context.Context, containerID cid.ID, treeID string, nodeID uint64) error
SubTreeStream interface {
Next() (NodeResponse, error)
TreeNode struct {
ID uint64
ParentID uint64
ObjID oid.ID
TimeStamp uint64
ModificationTime time.Time
PayloadSize uint64
FullPath string
Meta map[string]string
GetNodesParams struct {
ContainerID cid.ID
TreeID string
Path []string
Meta []string
LatestOnly bool
AllAttrs bool
type Meta interface {
GetKey() string
GetValue() []byte
type NodeResponse interface {
GetMeta() []Meta
GetNodeID() uint64
GetParentID() uint64
GetTimestamp() uint64
const (
FileNameKey = "FileName"
oidKV = "OID"
splitIDKV = "SplitID"
payloadSize = "PayloadSize"
fullPath = "FullPath"
modificationTime = "ModificationTime"
dkirillov marked this conversation as resolved
@alexvanin Did we decide use the same tree as for s3 objects? @alexvanin Did we decide use the same tree as for s3 objects?
I think so, but there is no strong opinion on this. I think so, but there is no strong opinion on this.
I have some concerns about using the same container/bucket in both ways: via s3 and distribution I have some concerns about using the same container/bucket in both ways: via s3 and distribution
Decided to use different tree names for distribution. Decided to use different tree names for distribution.
objectTree = "distribution-object"
splitTree = "distribution-split"
dkirillov marked this conversation as resolved
Can we name this more distribution specific? Can we name this more distribution specific?
Like `distribution-object`, `distribution-split` for example
separator = "/"
maxDepth = 0
var (
// ErrNodeNotFound is returned from ServiceClient in case of not found error.
ErrNodeNotFound = errors.New("not found")
ErrOnlyDirFound = errors.New("only directory found")
// ErrNodeAccessDenied is returned from ServiceClient service in case of access denied error.
ErrNodeAccessDenied = errors.New("access denied")
// ErrGatewayTimeout is returned from ServiceClient service in case of timeout error.
ErrGatewayTimeout = errors.New("gateway timeout")
func NewTree(service ServiceClient, log *zap.Logger) *Tree {
return &Tree{
service: service,
log: log,
func (c *Tree) GetObjectByPath(ctx context.Context, containerID cid.ID, path string) (*TreeNode, error) {
newPath := pathFromName(path)
p := &GetNodesParams{
ContainerID: containerID,
TreeID: objectTree,
Path: newPath,
LatestOnly: false,
AllAttrs: true,
nodes, err := c.service.GetNodes(ctx, p)
if err != nil {
return nil, err
var result NodeResponse
switch {
case len(nodes) == 0 || len(nodes) == 1 && isIntermediate(nodes[0]):
return nil, ErrNodeNotFound
case len(nodes) > 2:
return nil, fmt.Errorf("found more than two nodes")
result = nodes[0]
if len(nodes) == 2 && isIntermediate(result) {
result = nodes[1]
treeNode, err := newTreeNode(result)
if err != nil {
return nil, fmt.Errorf("error parse tree node: %w", err)
return treeNode, nil
func (c *Tree) GetObjectByPathDir(ctx context.Context, containerID cid.ID, path string) (*TreeNode, error) {
newPath := pathFromName(path)
p := &GetNodesParams{
ContainerID: containerID,
TreeID: objectTree,
Path: newPath,
LatestOnly: false,
AllAttrs: true,
nodes, err := c.service.GetNodes(ctx, p)
if err != nil {
return nil, err
var result NodeResponse
switch {
case len(nodes) == 0:
return nil, ErrNodeNotFound
case len(nodes) == 1 && isIntermediate(nodes[0]):
return nil, ErrOnlyDirFound
case len(nodes) > 2:
return nil, fmt.Errorf("found more than two nodes")
result = nodes[0]
if len(nodes) == 2 && isIntermediate(result) {
result = nodes[1]
treeNode, err := newTreeNode(result)
if err != nil {
return nil, fmt.Errorf("error parse tree node: %w", err)
return treeNode, nil
func (c *Tree) GetListObjectByPrefix(ctx context.Context, containerID cid.ID, prefixPath string) ([]TreeNode, error) {
rootID, err := c.determinePrefixNode(ctx, containerID, objectTree, prefixPath)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, nil
return nil, err
subTree, err := c.service.GetSubTree(ctx, containerID, objectTree, rootID, maxDepth)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, nil
return nil, err
nodes := make([]NodeResponse, 0)
for _, node := range subTree {
if node.GetNodeID() == rootID {
dkirillov marked this conversation as resolved
Actually we have to check for
Actually we have to check for `rootID`:
if node.GetNodeID() == rootID {
if !isIntermediate(node) {
nodes = append(nodes, node)
result := make([]TreeNode, 0, len(nodes))
for _, v := range nodes {
node, err := newTreeNode(v)
if err != nil {
return nil, fmt.Errorf("error parse tree node: %w", err)
result = append(result, *node)
return result, nil
func (c *Tree) GetListOIDBySplitID(ctx context.Context, containerID cid.ID, path string, splitID *object.SplitID) ([]oid.ID, error) {
nodes, err := c.getNodesBySplitID(ctx, containerID, path, splitID)
if err != nil {
return nil, err
ids := make([]oid.ID, len(nodes))
for i, node := range nodes {
ids[i] = node.ObjID
return ids, nil
func (c *Tree) AddObject(ctx context.Context, containerID cid.ID, filePath string, oid oid.ID, size uint64) (uint64, error) {
path := pathFromName(filePath)
meta := map[string]string{
oidKV: oid.String(),
FileNameKey: path[len(path)-1],
payloadSize: strconv.FormatUint(size, 10),
fullPath: filePath,
modificationTime: strconv.FormatInt(time.Now().Unix(), 10),
nodeID, err := c.service.AddNodeByPath(ctx, containerID, objectTree, path[:len(path)-1], meta)
return nodeID, err
func (c *Tree) AddPHYObject(ctx context.Context, containerID cid.ID, filePath string, oid oid.ID, splitID *object.SplitID) (uint64, error) {
meta := map[string]string{
oidKV: oid.String(),
splitIDKV: splitID.String(),
nodeID, err := c.service.AddNodeByPath(ctx, containerID, splitTree, pathFromName(filePath), meta)
return nodeID, err
func (c *Tree) DeleteObject(ctx context.Context, containerID cid.ID, nodeID uint64) error {
return c.service.RemoveNode(ctx, containerID, objectTree, nodeID)
func (c *Tree) DeleteObjectsBySplitID(ctx context.Context, containerID cid.ID, path string, splitID *object.SplitID) error {
splitNodes, err := c.getNodesBySplitID(ctx, containerID, path, splitID)
if err != nil {
return err
for _, node := range splitNodes {
err = c.service.RemoveNode(ctx, containerID, splitTree, node.ID)
if err != nil {
return err
return nil
func (c *Tree) determinePrefixNode(ctx context.Context, containerID cid.ID, treeID, prefixPath string) (uint64, error) {
var rootID uint64
path := pathFromName(prefixPath)
if len(path) > 1 {
var err error
rootID, err = c.getPrefixNodeID(ctx, containerID, treeID, path)
if err != nil {
return 0, err
return rootID, nil
func (c *Tree) getPrefixNodeID(ctx context.Context, containerID cid.ID, treeID string, prefixPath []string) (uint64, error) {
p := &GetNodesParams{
ContainerID: containerID,
TreeID: treeID,
Path: prefixPath,
LatestOnly: false,
AllAttrs: true,
nodes, err := c.service.GetNodes(ctx, p)
if err != nil {
return 0, err
var intermediateNodes []uint64
for _, node := range nodes {
if isIntermediate(node) {
intermediateNodes = append(intermediateNodes, node.GetNodeID())
if len(intermediateNodes) == 0 {
return 0, ErrNodeNotFound
if len(intermediateNodes) > 1 {
return 0, fmt.Errorf("found more than one intermediate nodes")
return intermediateNodes[0], nil
func (c *Tree) getNodesBySplitID(ctx context.Context, containerID cid.ID, filePath string, splitID *object.SplitID) ([]TreeNode, error) {
rootID, err := c.determinePrefixNode(ctx, containerID, splitTree, filePath)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, nil
return nil, err
subTree, err := c.service.GetSubTree(ctx, containerID, splitTree, rootID, 2)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, nil
return nil, err
nodes := make([]TreeNode, 0, len(subTree))
for _, v := range subTree {
node, err := newTreeNode(v)
if err != nil {
return nil, fmt.Errorf("error parse tree node: %w", err)
nodes = append(nodes, *node)
result := make([]TreeNode, 0)
for _, node := range nodes {
if val, ok := node.Meta[splitIDKV]; ok {
if val == splitID.String() {
result = append(result, node)
return result, nil
func isIntermediate(node NodeResponse) bool {
if len(node.GetMeta()) != 1 {
return false
return node.GetMeta()[0].GetKey() == FileNameKey
func newTreeNode(nodeInfo NodeResponse) (*TreeNode, error) {
treeNode := &TreeNode{
ID: nodeInfo.GetNodeID(),
ParentID: nodeInfo.GetParentID(),
TimeStamp: nodeInfo.GetTimestamp(),
Meta: make(map[string]string, len(nodeInfo.GetMeta())),
for _, kv := range nodeInfo.GetMeta() {
switch kv.GetKey() {
case oidKV:
if err := treeNode.ObjID.DecodeString(string(kv.GetValue())); err != nil {
return nil, err
case payloadSize:
if sizeStr := string(kv.GetValue()); len(sizeStr) > 0 {
var err error
if treeNode.PayloadSize, err = strconv.ParseUint(sizeStr, 10, 64); err != nil {
return nil, fmt.Errorf("invalid payload size value '%s': %w", sizeStr, err)
case modificationTime:
timestamp, err := strconv.ParseInt(string(kv.GetValue()), 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid modification time value '%s': %w", string(kv.GetValue()), err)
treeNode.ModificationTime = time.Unix(timestamp, 0)
case fullPath:
treeNode.FullPath = string(kv.GetValue())
treeNode.Meta[kv.GetKey()] = string(kv.GetValue())
return treeNode, nil
func pathFromName(objectName string) []string {
return strings.Split(objectName, separator)
@ -0,0 +1,347 @@
package tree
import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
type nodeMeta struct {
key string
value []byte
func (m nodeMeta) GetKey() string {
return m.key
func (m nodeMeta) GetValue() []byte {
return m.value
type nodeResponse struct {
meta []nodeMeta
nodeID uint64
parentID uint64
timestamp uint64
func (n nodeResponse) GetNodeID() uint64 {
return n.nodeID
func (n nodeResponse) GetParentID() uint64 {
return n.parentID
func (n nodeResponse) GetTimestamp() uint64 {
return n.timestamp
func (n nodeResponse) GetMeta() []Meta {
res := make([]Meta, len(n.meta))
for i, value := range n.meta {
res[i] = value
return res
func (n nodeResponse) getValue(key string) string {
for _, value := range n.meta {
if value.key == key {
return string(value.value)
return ""
type ServiceClientMemory struct {
containers map[string]containerInfo
type containerInfo struct {
containerID cid.ID
trees map[string]memoryTree
type memoryTree struct {
idCounter uint64
treeData *treeNodeMemory
type treeNodeMemory struct {
data nodeResponse
parent *treeNodeMemory
children []*treeNodeMemory
func (t *treeNodeMemory) getNode(nodeID uint64) *treeNodeMemory {
if t.data.nodeID == nodeID {
return t
for _, child := range t.children {
if node := child.getNode(nodeID); node != nil {
return node
return nil
func (t *memoryTree) getNodesByPath(path []string) []nodeResponse {
if len(path) == 0 {
return nil
var res []nodeResponse
for _, child := range t.treeData.children {
res = child.listNodesByPath(res, path)
return res
func (t *treeNodeMemory) listNodesByPath(res []nodeResponse, path []string) []nodeResponse {
if len(path) == 0 || t.data.getValue(FileNameKey) != path[0] {
return res
if len(path) == 1 {
return append(res, t.data)
for _, ch := range t.children {
res = ch.listNodesByPath(res, path[1:])
return res
func (t *memoryTree) createPathIfNotExist(parent *treeNodeMemory, path []string) *treeNodeMemory {
if len(path) == 0 {
return parent
var node *treeNodeMemory
for _, child := range parent.children {
if len(child.data.meta) == 1 && child.data.getValue(FileNameKey) == path[0] {
node = child
if node == nil {
node = &treeNodeMemory{
data: nodeResponse{
meta: []nodeMeta{{key: FileNameKey, value: []byte(path[0])}},
nodeID: t.idCounter,
parentID: parent.data.nodeID,
timestamp: uint64(time.Now().UnixMicro()),
parent: parent,
parent.children = append(parent.children, node)
return t.createPathIfNotExist(node, path[1:])
func (t *treeNodeMemory) removeChild(nodeID uint64) {
ind := -1
for i, ch := range t.children {
if ch.data.nodeID == nodeID {
ind = i
if ind != -1 {
t.children = append(t.children[:ind], t.children[ind+1:]...)
func (t *treeNodeMemory) listNodes(res []NodeResponse, depth uint32) []NodeResponse {
res = append(res, t.data)
if depth == 0 {
return res
for _, ch := range t.children {
res = ch.listNodes(res, depth-1)
return res
func NewTreeServiceClientMemory() (*ServiceClientMemory, error) {
return &ServiceClientMemory{
containers: make(map[string]containerInfo),
}, nil
func (c *ServiceClientMemory) GetNodes(_ context.Context, p *GetNodesParams) ([]NodeResponse, error) {
cnr, ok := c.containers[p.ContainerID.EncodeToString()]
if !ok {
return nil, nil
tr, ok := cnr.trees[p.TreeID]
if !ok {
return nil, nil
res := tr.getNodesByPath(p.Path)
sort.Slice(res, func(i, j int) bool {
return res[i].timestamp < res[j].timestamp
if p.LatestOnly && len(res) != 0 {
res = res[len(res)-1:]
res2 := make([]NodeResponse, len(res))
for i, n := range res {
res2[i] = n
return res2, nil
func (c *ServiceClientMemory) GetSubTree(_ context.Context, containerID cid.ID, treeID string, rootID uint64, depth uint32) ([]NodeResponse, error) {
cnr, ok := c.containers[containerID.EncodeToString()]
if !ok {
return nil, nil
tr, ok := cnr.trees[treeID]
if !ok {
return nil, ErrNodeNotFound
node := tr.treeData.getNode(rootID)
if node == nil {
return nil, ErrNodeNotFound
// we depth-1 in case of uint32 and 0 as mark to get all subtree leads to overflow and depth is getting quite big to walk all tree levels
return node.listNodes(nil, depth-1), nil
func newContainerInfo(containerID cid.ID, treeID string) containerInfo {
return containerInfo{
containerID: containerID,
trees: map[string]memoryTree{
treeID: {
idCounter: 1,
treeData: &treeNodeMemory{
data: nodeResponse{
timestamp: uint64(time.Now().UnixMicro()),
func newMemoryTree() memoryTree {
return memoryTree{
idCounter: 1,
dkirillov marked this conversation as resolved
treeData: &treeNodeMemory{
data: nodeResponse{
timestamp: uint64(time.Now().UnixMicro()),
func (c *ServiceClientMemory) AddNodeByPath(_ context.Context, containerID cid.ID, treeID string, path []string, meta map[string]string) (uint64, error) {
cnr, ok := c.containers[containerID.EncodeToString()]
if !ok {
cnr = newContainerInfo(containerID, treeID)
c.containers[containerID.EncodeToString()] = cnr
tr, ok := cnr.trees[treeID]
if !ok {
tr = newMemoryTree()
cnr.trees[treeID] = tr
parentNode := tr.createPathIfNotExist(tr.treeData, path)
if parentNode == nil {
return 0, fmt.Errorf("create path '%s'", path)
newID := tr.idCounter
tn := &treeNodeMemory{
data: nodeResponse{
meta: metaToNodeMeta(meta),
nodeID: newID,
parentID: parentNode.data.nodeID,
timestamp: uint64(time.Now().UnixMicro()),
parent: parentNode,
parentNode.children = append(parentNode.children, tn)
cnr.trees[treeID] = tr
return newID, nil
func sortNode(node *treeNodeMemory) {
if node == nil {
for _, child := range node.children {
func sortNodes(list []*treeNodeMemory) {
sort.Slice(list, func(i, j int) bool {
return list[i].data.getValue(FileNameKey) < list[j].data.getValue(FileNameKey)
func (c *ServiceClientMemory) RemoveNode(_ context.Context, containerID cid.ID, treeID string, nodeID uint64) error {
cnr, ok := c.containers[containerID.EncodeToString()]
if !ok {
return ErrNodeNotFound
tr, ok := cnr.trees[treeID]
if !ok {
return ErrNodeNotFound
node := tr.treeData.getNode(nodeID)
if node == nil {
return ErrNodeNotFound
return nil
func metaToNodeMeta(m map[string]string) []nodeMeta {
result := make([]nodeMeta, 0, len(m))
for key, value := range m {
result = append(result, nodeMeta{key: key, value: []byte(value)})
return result
@ -0,0 +1,232 @@
package tree
import (
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
func TestGetObjectByPath(t *testing.T) {
ctx := context.Background()
memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err)
treeService := NewTree(memCli, zaptest.NewLogger(t))
cidTest := cidtest.ID()
oidTest1 := oidtest.ID()
oidTest2 := oidtest.ID()
testSize1 := uint64(10)
testSize2 := uint64(20)
nodeID1, err := treeService.AddObject(ctx, cidTest, "/a/b", oidTest1, testSize1)
require.NoError(t, err)
nodeID2, err := treeService.AddObject(ctx, cidTest, "/a/b/c/d", oidTest2, testSize2)
require.NoError(t, err)
node1, err := treeService.GetObjectByPath(ctx, cidTest, "/a/b")
require.NoError(t, err)
require.Equal(t, nodeID1, node1.ID)
require.Equal(t, oidTest1, node1.ObjID)
require.Equal(t, testSize1, node1.PayloadSize)
node2, err := treeService.GetObjectByPath(ctx, cidTest, "/a/b/c/d")
require.NoError(t, err)
require.Equal(t, nodeID2, node2.ID)
require.Equal(t, oidTest2, node2.ObjID)
require.Equal(t, testSize2, node2.PayloadSize)
_, err = treeService.GetObjectByPath(ctx, cidTest, "/g")
require.ErrorIs(t, err, ErrNodeNotFound)
_, err = treeService.GetObjectByPath(ctx, cidTest, "/a/b/c")
require.ErrorIs(t, err, ErrNodeNotFound)
func TestGetObjectByPathDir(t *testing.T) {
ctx := context.Background()
memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err)
treeService := NewTree(memCli, zaptest.NewLogger(t))
cidTest := cidtest.ID()
oidTest1 := oidtest.ID()
oidTest2 := oidtest.ID()
testSize1 := uint64(10)
testSize2 := uint64(20)
nodeID1, err := treeService.AddObject(ctx, cidTest, "/a/b", oidTest1, testSize1)
require.NoError(t, err)
nodeID2, err := treeService.AddObject(ctx, cidTest, "/a/b/c/d", oidTest2, testSize2)
require.NoError(t, err)
node1, err := treeService.GetObjectByPath(ctx, cidTest, "/a/b")
require.NoError(t, err)
require.Equal(t, nodeID1, node1.ID)
require.Equal(t, oidTest1, node1.ObjID)
require.Equal(t, testSize1, node1.PayloadSize)
node2, err := treeService.GetObjectByPath(ctx, cidTest, "/a/b/c/d")
require.NoError(t, err)
require.Equal(t, nodeID2, node2.ID)
require.Equal(t, oidTest2, node2.ObjID)
require.Equal(t, testSize2, node2.PayloadSize)
_, err = treeService.GetObjectByPathDir(ctx, cidTest, "/g")
require.ErrorIs(t, err, ErrNodeNotFound)
_, err = treeService.GetObjectByPathDir(ctx, cidTest, "/a/b/c")
require.ErrorIs(t, err, ErrOnlyDirFound)
func TestGetListObjectByPrefix(t *testing.T) {
ctx := context.Background()
memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err)
treeService := NewTree(memCli, zaptest.NewLogger(t))
cidTest := cidtest.ID()
oidTest1 := oidtest.ID()
oidTest2 := oidtest.ID()
oidTest3 := oidtest.ID()
oidTest4 := oidtest.ID()
testSize1 := uint64(10)
testSize2 := uint64(20)
testSize3 := uint64(30)
testSize4 := uint64(40)
_, err = treeService.AddObject(ctx, cidTest, "/a/b", oidTest1, testSize1)
require.NoError(t, err)
nodeID2, err := treeService.AddObject(ctx, cidTest, "/a/b/c", oidTest2, testSize2)
require.NoError(t, err)
nodeID3, err := treeService.AddObject(ctx, cidTest, "/a/b/c/d", oidTest3, testSize3)
require.NoError(t, err)
nodeID4, err := treeService.AddObject(ctx, cidTest, "/a/b/c/d/e", oidTest4, testSize4)
require.NoError(t, err)
nodes, err := treeService.GetListObjectByPrefix(ctx, cidTest, "/a/b")
require.NoError(t, err)
require.Equal(t, nodeID2, nodes[0].ID)
require.Equal(t, nodeID3, nodes[1].ID)
require.Equal(t, nodeID4, nodes[2].ID)
require.Equal(t, testSize2, nodes[0].PayloadSize)
require.Equal(t, testSize3, nodes[1].PayloadSize)
require.Equal(t, testSize4, nodes[2].PayloadSize)
nodes, err = treeService.GetListObjectByPrefix(ctx, cidTest, "/g/s")
require.NoError(t, err)
require.Equal(t, 0, len(nodes))
func TestGetListOIDBySplitID(t *testing.T) {
ctx := context.Background()
memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err)
treeService := NewTree(memCli, zaptest.NewLogger(t))
cidTest := cidtest.ID()
oidTest1 := oidtest.ID()
oidTest2 := oidtest.ID()
oidTest3 := oidtest.ID()
splitID := object.NewSplitID()
_, err = treeService.AddPHYObject(ctx, cidTest, "/a/b", oidTest1, splitID)
require.NoError(t, err)
_, err = treeService.AddPHYObject(ctx, cidTest, "/a/b", oidTest2, splitID)
require.NoError(t, err)
_, err = treeService.AddPHYObject(ctx, cidTest, "/a/b", oidTest3, splitID)
require.NoError(t, err)
ids, err := treeService.GetListOIDBySplitID(ctx, cidTest, "/a/b", splitID)
require.NoError(t, err)
require.Equal(t, oidTest1, ids[0])
require.Equal(t, oidTest2, ids[1])
require.Equal(t, oidTest3, ids[2])
ids, err = treeService.GetListOIDBySplitID(ctx, cidTest, "/c/d", splitID)
require.NoError(t, err)
require.Equal(t, 0, len(ids))
func TestDeleteObject(t *testing.T) {
ctx := context.Background()
memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err)
treeService := NewTree(memCli, zaptest.NewLogger(t))
cidTest := cidtest.ID()
oidTest1 := oidtest.ID()
testSize := uint64(10)
nodeID1, err := treeService.AddObject(ctx, cidTest, "/a/b", oidTest1, testSize)
require.NoError(t, err)
err = treeService.DeleteObject(ctx, cidTest, nodeID1)
require.NoError(t, err)
_, err = treeService.GetObjectByPath(ctx, cidTest, "/a/b")
require.ErrorIs(t, err, ErrNodeNotFound)
err = treeService.DeleteObject(ctx, cidTest, nodeID1+1)
require.ErrorIs(t, err, ErrNodeNotFound)
func TestDeleteObjectsBySplitID(t *testing.T) {
ctx := context.Background()
memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err)
treeService := NewTree(memCli, zaptest.NewLogger(t))
cidTest := cidtest.ID()
oidTest1 := oidtest.ID()
oidTest2 := oidtest.ID()
oidTest3 := oidtest.ID()
splitID := object.NewSplitID()
_, err = treeService.AddPHYObject(ctx, cidTest, "/a/b", oidTest1, splitID)
require.NoError(t, err)
_, err = treeService.AddPHYObject(ctx, cidTest, "/a/b", oidTest2, splitID)
require.NoError(t, err)
_, err = treeService.AddPHYObject(ctx, cidTest, "/a/b", oidTest3, splitID)
require.NoError(t, err)
ids, err := treeService.GetListOIDBySplitID(ctx, cidTest, "/a/b", splitID)
require.NoError(t, err)
require.Equal(t, 3, len(ids))
err = treeService.DeleteObjectsBySplitID(ctx, cidTest, "/a/b", splitID)
require.NoError(t, err)
ids, err = treeService.GetListOIDBySplitID(ctx, cidTest, "/a/b", splitID)
require.NoError(t, err)
require.Equal(t, 0, len(ids))
ids, err = treeService.GetListOIDBySplitID(ctx, cidTest, "/c/d", splitID)
require.NoError(t, err)
require.Equal(t, 0, len(ids))
@ -0,0 +1,178 @@
package wrapper
import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
dkirillov marked this conversation as resolved
It seems there isn't such package It seems there isn't such package
type GetNodeByPathResponseInfoWrapper struct {
response *grpcService.GetNodeByPathResponse_Info
func (n GetNodeByPathResponseInfoWrapper) GetNodeID() uint64 {
return n.response.GetNodeId()
func (n GetNodeByPathResponseInfoWrapper) GetParentID() uint64 {
return n.response.GetParentId()
func (n GetNodeByPathResponseInfoWrapper) GetTimestamp() uint64 {
return n.response.GetTimestamp()
func (n GetNodeByPathResponseInfoWrapper) GetMeta() []tree.Meta {
res := make([]tree.Meta, len(n.response.Meta))
for i, value := range n.response.Meta {
res[i] = value
return res
type GetSubTreeResponseBodyWrapper struct {
response *grpcService.GetSubTreeResponse_Body
func (n GetSubTreeResponseBodyWrapper) GetNodeID() uint64 {
return n.response.GetNodeId()
func (n GetSubTreeResponseBodyWrapper) GetParentID() uint64 {
return n.response.GetParentId()
func (n GetSubTreeResponseBodyWrapper) GetTimestamp() uint64 {
return n.response.GetTimestamp()
func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta {
res := make([]tree.Meta, len(n.response.Meta))
for i, value := range n.response.Meta {
res[i] = value
return res
type PoolWrapper struct {
p *treepool.Pool
func NewPoolWrapper(p *treepool.Pool) *PoolWrapper {
return &PoolWrapper{p: p}
func (w *PoolWrapper) GetNodes(ctx context.Context, prm *tree.GetNodesParams) ([]tree.NodeResponse, error) {
poolPrm := treepool.GetNodesParams{
CID: prm.ContainerID,
TreeID: prm.TreeID,
Path: prm.Path,
Meta: prm.Meta,
PathAttribute: tree.FileNameKey,
LatestOnly: prm.LatestOnly,
AllAttrs: prm.AllAttrs,
nodes, err := w.p.GetNodes(ctx, poolPrm)
if err != nil {
return nil, handleError(err)
res := make([]tree.NodeResponse, len(nodes))
for i, info := range nodes {
res[i] = GetNodeByPathResponseInfoWrapper{info}
return res, nil
func (w *PoolWrapper) GetSubTree(ctx context.Context, containerID cid.ID, treeID string, rootID uint64, depth uint32) ([]tree.NodeResponse, error) {
poolPrm := treepool.GetSubTreeParams{
CID: containerID,
TreeID: treeID,
RootID: rootID,
Depth: depth,
subTreeReader, err := w.p.GetSubTree(ctx, poolPrm)
if err != nil {
return nil, handleError(err)
var subtree []tree.NodeResponse
node, err := subTreeReader.Next()
for err == nil {
subtree = append(subtree, GetSubTreeResponseBodyWrapper{node})
node, err = subTreeReader.Next()
if err != nil && err != io.EOF {
return nil, handleError(err)
return subtree, nil
func (w *PoolWrapper) AddNodeByPath(ctx context.Context, containerID cid.ID, treeID string, path []string, meta map[string]string) (uint64, error) {
nodeID, err := w.p.AddNodeByPath(ctx, treepool.AddNodeByPathParams{
CID: containerID,
TreeID: treeID,
Path: path,
Meta: meta,
PathAttribute: tree.FileNameKey,
return nodeID, handleError(err)
func (w *PoolWrapper) RemoveNode(ctx context.Context, containerID cid.ID, treeID string, nodeID uint64) error {
return handleError(w.p.RemoveNode(ctx, treepool.RemoveNodeParams{
CID: containerID,
TreeID: treeID,
NodeID: nodeID,
func handleError(err error) error {
if err == nil {
return nil
if errors.Is(err, treepool.ErrNodeNotFound) {
return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
if errors.Is(err, treepool.ErrNodeAccessDenied) {
return fmt.Errorf("%w: %s", tree.ErrNodeAccessDenied, err.Error())
if isTimeoutError(err) {
return fmt.Errorf("%w: %s", tree.ErrGatewayTimeout, err.Error())
return err
func unwrapErr(err error) error {
unwrappedErr := errors.Unwrap(err)
for unwrappedErr != nil {
err = unwrappedErr
unwrappedErr = errors.Unwrap(err)
return err
func isTimeoutError(err error) bool {
if strings.Contains(err.Error(), "timeout") ||
errors.Is(err, context.DeadlineExceeded) {
return true
return status.Code(unwrapErr(err)) == codes.DeadlineExceeded
Seems like we can store payload size in k/v pair in
meta, so we can remove this head request. The same is applicable for other head requests.Maybe I am wrong, because we upload object part-by-part and we don't know total size unless we use head request. Need to look into it.
If object isn't fully uploaded we shouldn't read it I suppose