feature/integrate_tree_service #8

Merged
alexvanin merged 2 commits from :feature/integrate_tree_service into tcl/master 2024-09-04 19:51:22 +00:00
8 changed files with 1441 additions and 336 deletions

89
go.mod
View file

@ -37,6 +37,11 @@ require (
) )
require ( require (
github.com/benbjohnson/clock v1.1.0 // indirect
cloud.google.com/go v0.110.0 // indirect
cloud.google.com/go/compute v1.19.1 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v0.13.0 // indirect
dario.cat/mergo v1.0.0 // indirect dario.cat/mergo v1.0.0 // indirect
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230802075510-964c3edb3f44 // indirect git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230802075510-964c3edb3f44 // indirect
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb // indirect git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb // indirect
@ -44,23 +49,47 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230825064515-46a214d065f8 git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230825064515-46a214d065f8
git.frostfs.info/TrueCloudLab/hrw v1.2.1 // indirect git.frostfs.info/TrueCloudLab/hrw v1.2.1 // indirect
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bitly/go-simplejson v0.5.0 // indirect
github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b // indirect
github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/containerd v1.7.3 // indirect github.com/containerd/containerd v1.7.3 // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/cyphar/filepath-securejoin v0.2.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/docker/docker v24.0.5+incompatible // indirect github.com/docker/docker v24.0.5+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect
github.com/felixge/httpsnoop v1.0.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.3.0
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
github.com/gorilla/websocket v1.5.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/golang-lru v0.6.0 // indirect github.com/hashicorp/golang-lru v0.6.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect github.com/magiconair/properties v1.8.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f // indirect
github.com/moby/patternmatcher v0.5.0 // indirect github.com/moby/patternmatcher v0.5.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/term v0.5.0 // indirect github.com/moby/term v0.5.0 // indirect
@ -70,63 +99,35 @@ require (
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20230615193820-9185820289ce // indirect github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20230615193820-9185820289ce // indirect
github.com/nspcc-dev/rfc6979 v0.2.0 // indirect github.com/nspcc-dev/rfc6979 v0.2.0 // indirect
github.com/opencontainers/runc v1.1.5 // indirect github.com/opencontainers/runc v1.1.5 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect; updated to latest
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rogpeppe/go-internal v1.8.1 // indirect github.com/rogpeppe/go-internal v1.8.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/twmb/murmur3 v1.1.8 // indirect github.com/twmb/murmur3 v1.1.8 // indirect
github.com/urfave/cli v1.22.12 // indirect github.com/urfave/cli v1.22.12 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/atomic v1.10.0 // indirect go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
golang.org/x/mod v0.9.0 // indirect golang.org/x/mod v0.9.0 // indirect
golang.org/x/net v0.10.0 // indirect; updated for CVE-2022-27664, CVE-2022-41717
golang.org/x/sync v0.2.0 // indirect golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/tools v0.7.0 // indirect golang.org/x/tools v0.7.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
google.golang.org/grpc v1.57.0
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
cloud.google.com/go v0.110.0 // indirect
cloud.google.com/go/compute v1.19.1 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v0.13.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bitly/go-simplejson v0.5.0 // indirect
github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b // indirect
github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cyphar/filepath-securejoin v0.2.3 // indirect
github.com/felixge/httpsnoop v1.0.1 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.3.0
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect; updated to latest
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/net v0.10.0 // indirect; updated for CVE-2022-27664, CVE-2022-41717
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54 // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
) )

View file

@ -83,7 +83,7 @@ func newSizeLimiterWriter(ctx context.Context, d *driver, path string, splitInfo
previous: formPreviousChain(splitInfo, parts), previous: formPreviousChain(splitInfo, parts),
parentHashers: parentHashers, parentHashers: parentHashers,
targetInit: func() transformer.ChunkedObjectWriter { targetInit: func() transformer.ChunkedObjectWriter {
return d.newObjTarget() return d.newObjTarget(path, splitInfo.SplitID())
}, },
parent: parent, parent: parent,
} }
@ -233,6 +233,11 @@ func (w *writer) release(withParent bool) (*transformer.AccessIdentifiers, error
if _, err = w.release(false); err != nil { if _, err = w.release(false); err != nil {
return nil, fmt.Errorf("could not release linking object: %w", err) return nil, fmt.Errorf("could not release linking object: %w", err)
} }
err = w.driver.treeService.DeleteObjectsBySplitID(w.ctx, w.driver.containerID, w.path, w.splitInfo.SplitID())
if err != nil {
return nil, fmt.Errorf("could not delete split objects: %w", err)
}
} }
return ids, nil return ids, nil
@ -459,5 +464,10 @@ func (w *writer) deleteParts() error {
} }
} }
err := w.driver.treeService.DeleteObjectsBySplitID(w.ctx, w.driver.containerID, w.path, w.splitInfo.SplitID())
if err != nil {
return fmt.Errorf("could not delete split objects: %w", err)
}
return nil return nil
} }

View file

@ -3,9 +3,11 @@ package frostfs
import ( import (
"context" "context"
"crypto/ecdsa" "crypto/ecdsa"
"errors"
"fmt" "fmt"
"io" "io"
"path/filepath" "path/filepath"
"regexp"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
@ -18,12 +20,15 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
dcontext "github.com/distribution/distribution/v3/context" dcontext "github.com/distribution/distribution/v3/context"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/distribution/v3/registry/storage/driver/base" "github.com/distribution/distribution/v3/registry/storage/driver/base"
"github.com/distribution/distribution/v3/registry/storage/driver/factory" "github.com/distribution/distribution/v3/registry/storage/driver/factory"
"github.com/distribution/distribution/v3/registry/storage/driver/frostfs/tree"
"github.com/distribution/distribution/v3/registry/storage/driver/frostfs/wrapper"
"github.com/nspcc-dev/neo-go/cli/flags" "github.com/nspcc-dev/neo-go/cli/flags"
"github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neo-go/pkg/wallet"
) )
@ -56,6 +61,11 @@ const (
defaultSessionExpirationDuration = 100 // in epoch defaultSessionExpirationDuration = 100 // in epoch
) )
var (
filePathValidateRegex = regexp.MustCompile(`^/([^/]+/)*[^/]+$`)
prefixPathValidateRegex = regexp.MustCompile(`^/([^/]+/?)*$`)
)
// DriverParameters is a struct that encapsulates all of the driver parameters after all values have been set. // DriverParameters is a struct that encapsulates all of the driver parameters after all values have been set.
type DriverParameters struct { type DriverParameters struct {
ContainerID string ContainerID string
@ -94,6 +104,7 @@ func (n *frostfsDriverFactory) Create(parameters map[string]interface{}) (storag
type driver struct { type driver struct {
sdkPool *pool.Pool sdkPool *pool.Pool
treeService *tree.Tree
owner *user.ID owner *user.ID
key *ecdsa.PrivateKey key *ecdsa.PrivateKey
containerID cid.ID containerID cid.ID
@ -306,7 +317,7 @@ func New(params DriverParameters) (*Driver, error) {
var owner user.ID var owner user.ID
user.IDFromKey(&owner, acc.PrivateKey().PrivateKey.PublicKey) user.IDFromKey(&owner, acc.PrivateKey().PrivateKey.PublicKey)
sdkPool, err := createPool(ctx, acc, params) sdkPool, treePool, err := createPool(ctx, acc, params)
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't create sdk pool: %w", err) return nil, fmt.Errorf("couldn't create sdk pool: %w", err)
} }
@ -321,8 +332,11 @@ func New(params DriverParameters) (*Driver, error) {
return nil, fmt.Errorf("couldn't get container id: %w", err) return nil, fmt.Errorf("couldn't get container id: %w", err)
} }
treeService := wrapper.NewPoolWrapper(treePool)
d := &driver{ d := &driver{
sdkPool: sdkPool, sdkPool: sdkPool,
treeService: tree.NewTree(treeService, nil),
owner: &owner, owner: &owner,
key: &acc.PrivateKey().PrivateKey, key: &acc.PrivateKey().PrivateKey,
containerID: cnrID, containerID: cnrID,
@ -376,28 +390,49 @@ func getContainerID(params DriverParameters) (cid.ID, error) {
return cnrID, nil return cnrID, nil
} }
func createPool(ctx context.Context, acc *wallet.Account, param DriverParameters) (*pool.Pool, error) { func createPool(ctx context.Context, acc *wallet.Account, param DriverParameters) (*pool.Pool, *treepool.Pool, error) {
var prm pool.InitParameters var prm pool.InitParameters
var prmTree treepool.InitParameters
prm.SetKey(&acc.PrivateKey().PrivateKey) prm.SetKey(&acc.PrivateKey().PrivateKey)
prmTree.SetKey(acc.PrivateKey())
prm.SetNodeDialTimeout(param.ConnectionTimeout) prm.SetNodeDialTimeout(param.ConnectionTimeout)
prmTree.SetNodeDialTimeout(param.ConnectionTimeout)
prm.SetHealthcheckTimeout(param.RequestTimeout) prm.SetHealthcheckTimeout(param.RequestTimeout)
prmTree.SetHealthcheckTimeout(param.RequestTimeout)
prm.SetClientRebalanceInterval(param.RebalanceInterval) prm.SetClientRebalanceInterval(param.RebalanceInterval)
prmTree.SetClientRebalanceInterval(param.RebalanceInterval)
prm.SetSessionExpirationDuration(param.SessionExpirationDuration) prm.SetSessionExpirationDuration(param.SessionExpirationDuration)
for _, peer := range param.Peers { for _, peer := range param.Peers {
prm.AddNode(pool.NewNodeParam(peer.Priority, peer.Address, peer.Weight)) nodeParam := pool.NewNodeParam(peer.Priority, peer.Address, peer.Weight)
prm.AddNode(nodeParam)
prmTree.AddNode(nodeParam)
} }
p, err := pool.NewPool(prm) p, err := pool.NewPool(prm)
if err != nil { if err != nil {
return nil, fmt.Errorf("create pool: %w", err) return nil, nil, fmt.Errorf("create pool: %w", err)
} }
if err = p.Dial(ctx); err != nil { if err = p.Dial(ctx); err != nil {
return nil, fmt.Errorf("dial pool: %w", err) return nil, nil, fmt.Errorf("dial pool: %w", err)
} }
return p, nil treePool, err := treepool.NewPool(prmTree)
if err != nil {
return nil, nil, fmt.Errorf("create tree pool: %w", err)
}
if err = treePool.Dial(ctx); err != nil {
return nil, nil, fmt.Errorf("dial tree pool: %w", err)
}
return p, treePool, nil
} }
func createNnsResolver(params DriverParameters) (*resolver.NNS, error) { func createNnsResolver(params DriverParameters) (*resolver.NNS, error) {
@ -468,23 +503,34 @@ func (d *driver) Name() string {
} }
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
id, err := d.searchOne(ctx, path) if ok := validateFilePath(path); !ok {
return nil, storagedriver.InvalidPathError{Path: path}
}
treeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, path)
if err != nil { if err != nil {
return nil, err if errors.Is(err, tree.ErrNodeNotFound) {
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
}
return nil, fmt.Errorf("couldn't get object from tree: %w", err)
} }
var prm pool.PrmObjectGet var prm pool.PrmObjectGet
prm.SetAddress(d.objectAddress(id)) prm.SetAddress(d.objectAddress(treeObj.ObjID))
obj, err := d.sdkPool.GetObject(ctx, prm) obj, err := d.sdkPool.GetObject(ctx, prm)
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't get object '%s': %w", id, err) return nil, fmt.Errorf("couldn't get object '%s': %w", treeObj.ObjID, err)
} }
return io.ReadAll(obj.Payload) return io.ReadAll(obj.Payload)
} }
func (d *driver) PutContent(ctx context.Context, path string, content []byte) error { func (d *driver) PutContent(ctx context.Context, path string, content []byte) error {
if ok := validateFilePath(path); !ok {
return storagedriver.InvalidPathError{Path: path}
}
if err := d.Delete(ctx, path); err != nil { if err := d.Delete(ctx, path); err != nil {
return fmt.Errorf("couldn't delete '%s': %s", path, err) return fmt.Errorf("couldn't delete '%s': %s", path, err)
} }
@ -495,34 +541,35 @@ func (d *driver) PutContent(ctx context.Context, path string, content []byte) er
var prm pool.PrmObjectPut var prm pool.PrmObjectPut
prm.SetHeader(*obj) prm.SetHeader(*obj)
if _, err := d.sdkPool.PutObject(ctx, prm); err != nil { id, err := d.sdkPool.PutObject(ctx, prm)
if err != nil {
return fmt.Errorf("couldn't put object '%s': %w", path, err) return fmt.Errorf("couldn't put object '%s': %w", path, err)
} }
_, err = d.treeService.AddObject(ctx, d.containerID, path, id, uint64(len(content)))
if err != nil {
return fmt.Errorf("can't add object to tree: %w", err)
}
return nil return nil
} }
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
id, err := d.searchOne(ctx, path) if ok := validateFilePath(path); !ok {
return nil, storagedriver.InvalidPathError{Path: path}
}
treeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, path)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("couldn't get object from tree: %w", err)
} }
addr := d.objectAddress(id) addr := d.objectAddress(treeObj.ObjID)
var prmHead pool.PrmObjectHead if uint64(offset) >= treeObj.PayloadSize {
prmHead.SetAddress(addr) return nil, fmt.Errorf("invalid offset %d for object length %d", offset, treeObj.PayloadSize)
obj, err := d.sdkPool.HeadObject(ctx, prmHead)
alexvanin marked this conversation as resolved Outdated

Seems like we can store payload size in k/v pair in treeObj meta, so we can remove this head request. The same is applicable for other head requests.

Seems like we can store payload size in k/v pair in `treeObj` meta, so we can remove this head request. The same is applicable for other head requests.

Maybe I am wrong, because we upload object part-by-part and we don't know total size unless we use head request. Need to look into it.

Maybe I am wrong, because we upload object part-by-part and we don't know total size unless we use head request. Need to look into it.

If object isn't fully uploaded we shouldn't read it I suppose

If object isn't fully uploaded we shouldn't read it I suppose
if err != nil {
return nil, fmt.Errorf("couldn't head object '%s', id '%s': %w", path, id, err)
} }
if uint64(offset) >= obj.PayloadSize() { length := treeObj.PayloadSize - uint64(offset)
return nil, fmt.Errorf("invalid offset %d for object length %d", offset, obj.PayloadSize())
}
length := obj.PayloadSize() - uint64(offset)
var prmRange pool.PrmObjectRange var prmRange pool.PrmObjectRange
prmRange.SetAddress(addr) prmRange.SetAddress(addr)
@ -532,7 +579,7 @@ func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.Read
res, err := d.sdkPool.ObjectRange(ctx, prmRange) res, err := d.sdkPool.ObjectRange(ctx, prmRange)
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't get payload range of object '%s', offset %d, length %d, id '%s': %w", return nil, fmt.Errorf("couldn't get payload range of object '%s', offset %d, length %d, id '%s': %w",
path, offset, length, id, err) path, offset, length, treeObj.ObjID, err)
} }
return &res, nil return &res, nil
@ -543,6 +590,10 @@ func getUploadUUID(ctx context.Context) (uuid string) {
} }
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) { func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
if ok := validateFilePath(path); !ok {
return nil, storagedriver.InvalidPathError{Path: path}
}
splitID := object.NewSplitID() splitID := object.NewSplitID()
uploadUUID := getUploadUUID(ctx) uploadUUID := getUploadUUID(ctx)
if err := splitID.Parse(uploadUUID); err != nil { if err := splitID.Parse(uploadUUID); err != nil {
@ -591,56 +642,42 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo,
return newFileInfoDir(path), nil return newFileInfoDir(path), nil
} }
id, ok, err := d.searchOneBase(ctx, path, true) if ok := validatePrefixPath(path); !ok {
dkirillov marked this conversation as resolved Outdated

Methods d.searchOneBase and d.searchOne are not used anymore. Can be dropped.

Methods `d.searchOneBase` and `d.searchOne` are not used anymore. Can be dropped.
return nil, storagedriver.InvalidPathError{Path: path}
}
path = preparePrefixPath(path)
treeObj, err := d.treeService.GetObjectByPathDir(ctx, d.containerID, path)
if err != nil { if err != nil {
return nil, err if errors.Is(err, tree.ErrNodeNotFound) {
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
} else if errors.Is(err, tree.ErrOnlyDirFound) {
return newFileInfoDir(path), nil
}
return nil, fmt.Errorf("couldn't get object from tree: %w", err)
} }
// assume there is not object with directory name fileInfo := newFileInfo(*treeObj, "")
// e.g. if file '/a/b/c' exists, files '/a/b' and '/a' don't
if !ok {
return newFileInfoDir(path), nil
}
var prm pool.PrmObjectHead
prm.SetAddress(d.objectAddress(id))
obj, err := d.sdkPool.HeadObject(ctx, prm)
if err != nil {
return nil, fmt.Errorf("couldn't get head object '%s': %w", id, err)
}
fileInfo := newFileInfo(ctx, obj, "")
// e.g. search '/a/b' but because of prefix search we found '/a/b/c'
// so we should return directory
if fileInfo.Path() != path {
return newFileInfoDir(path), nil
}
return fileInfo, nil return fileInfo, nil
} }
func (d *driver) List(ctx context.Context, path string) ([]string, error) { func (d *driver) List(ctx context.Context, path string) ([]string, error) {
ids, err := d.searchByPrefix(ctx, path) if ok := validatePrefixPath(path); !ok {
return nil, storagedriver.InvalidPathError{Path: path}
}
path = preparePrefixPath(path)
treeObjList, err := d.treeService.GetListObjectByPrefix(ctx, d.containerID, path)
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't search by prefix '%s': %w", path, err) return nil, fmt.Errorf("couldn't get object from tree by prefix '%s': %w", path, err)
} }
added := make(map[string]bool) added := make(map[string]bool)
result := make([]string, 0, len(ids)) result := make([]string, 0, len(treeObjList))
for _, id := range ids { for _, treeObj := range treeObjList {
var prm pool.PrmObjectHead fileInf := newFileInfo(treeObj, path)
prm.SetAddress(d.objectAddress(id))
obj, err := d.sdkPool.HeadObject(ctx, prm)
if err != nil {
dcontext.GetLogger(ctx).Warnf("couldn't get list object '%s' in path '%s': %s", id, path, err)
continue
}
fileInf := newFileInfo(ctx, obj, path)
if !added[fileInf.Path()] { if !added[fileInf.Path()] {
result = append(result, fileInf.Path()) result = append(result, fileInf.Path())
added[fileInf.Path()] = true added[fileInf.Path()] = true
@ -652,106 +689,64 @@ func (d *driver) List(ctx context.Context, path string) ([]string, error) {
} }
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
alexvanin marked this conversation as resolved Outdated

As far as I see, move operation does not modify object payload. I guess it is sufficient to change tree node in tree service and do not fetch and reupload object into storage. Is it correct?

As far as I see, move operation does not modify object payload. I guess it is sufficient to change tree node in tree service and do not fetch and reupload object into storage. Is it correct?

Currently we store filepath in object attributes, so we have to reupload object to change this attribute

Currently we store filepath in object attributes, so we have to reupload object to change this attribute

Aside of restoring tree in case of pilorama faillure on all container nodes, do we need filepath attribute in the object?

Aside of restoring tree in case of pilorama faillure on all container nodes, do we need filepath attribute in the object?

Probably not

Probably not

We've decided to keep filepath attribute in the object in case of global tree failures. At the same time, Move operation will not update filepath attribute, therefore will not reupload object.

We've decided to keep filepath attribute in the object in case of global tree failures. At the same time, `Move` operation will not update filepath attribute, therefore will not reupload object.
sourceID, err := d.searchOne(ctx, sourcePath) if ok := validateFilePath(sourcePath); !ok {
return storagedriver.InvalidPathError{Path: sourcePath}
}
if ok := validateFilePath(destPath); !ok {
return storagedriver.InvalidPathError{Path: destPath}
}
sourceTreeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, sourcePath)
if err != nil { if err != nil {
return err return fmt.Errorf("couldn't get object from tree: %w", err)
} }
if err = d.Delete(ctx, destPath); err != nil { _, err = d.treeService.AddObject(ctx, d.containerID, destPath, sourceTreeObj.ObjID, sourceTreeObj.PayloadSize)
return fmt.Errorf("couldn't delete '%s' object: %w", destPath, err)
}
sourceAddr := d.objectAddress(sourceID)
var prmGet pool.PrmObjectGet
prmGet.SetAddress(sourceAddr)
obj, err := d.sdkPool.GetObject(ctx, prmGet)
if err != nil { if err != nil {
return fmt.Errorf("could not get source object '%s' by oid '%s': %w", sourcePath, sourceID, err) return fmt.Errorf("can't add object to tree: %w", err)
}
defer func() {
if err = obj.Payload.Close(); err != nil {
dcontext.GetLogger(ctx).Errorf("couldn't close object payload reader, path '%s' by oid '%s': %s",
sourcePath, sourceID, err.Error())
}
}()
objHeader := d.formObject(destPath)
var prmPut pool.PrmObjectPut
prmPut.SetHeader(*objHeader)
prmPut.SetPayload(obj.Payload)
if _, err = d.sdkPool.PutObject(ctx, prmPut); err != nil {
return fmt.Errorf("couldn't put object '%s': %w", destPath, err)
} }
var prmDelete pool.PrmObjectDelete if err = d.treeService.DeleteObject(ctx, d.containerID, sourceTreeObj.ID); err != nil {
prmDelete.SetAddress(sourceAddr) return fmt.Errorf("couldn't delete object from tree: %w", err)
if err = d.sdkPool.DeleteObject(ctx, prmDelete); err != nil {
return fmt.Errorf("couldn't remove source file '%s', id '%s': %w", sourcePath, sourceID, err)
} }
return nil return nil
} }
func (d *driver) Delete(ctx context.Context, path string) error { func (d *driver) Delete(ctx context.Context, path string) error {
filters := object.NewSearchFilters() if ok := validatePrefixPath(path); !ok {
filters.AddRootFilter() return storagedriver.InvalidPathError{Path: path}
filters.AddFilter(attributeFilePath, path, object.MatchCommonPrefix) }
path = preparePrefixPath(path)
var prmSearch pool.PrmObjectSearch treeObjList, err := d.treeService.GetListObjectByPrefix(ctx, d.containerID, path)
prmSearch.SetContainerID(d.containerID)
prmSearch.SetFilters(filters)
res, err := d.sdkPool.SearchObjects(ctx, prmSearch)
if err != nil { if err != nil {
return fmt.Errorf("init searching using client: %w", err) return fmt.Errorf("couldn't get object from tree by prefix '%s': %w", path, err)
} }
defer res.Close() rootTreeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, path)
var inErr error
err = res.Iterate(func(id oid.ID) bool {
// Check if a key is a subpath (so that deleting "/a" does not delete "/ab").
isSubPath, err := d.checkIsSubPath(ctx, path, id)
if err != nil {
inErr = err
return true
}
if isSubPath {
if err = d.delete(ctx, id); err != nil {
inErr = fmt.Errorf("couldn't delete object by path '%s': %w", path, err)
return true
}
}
return false
})
if err == nil {
err = inErr
}
if err != nil { if err != nil {
return fmt.Errorf("iterate objects: %w", err) if !errors.Is(err, tree.ErrNodeNotFound) {
return fmt.Errorf("couldn't get object from tree: %w", err)
}
} else {
treeObjList = append(treeObjList, *rootTreeObj)
}
for _, treeObj := range treeObjList {
if err := d.delete(ctx, treeObj.ObjID); err != nil {
return fmt.Errorf("error in delete object oid: %s : %w", treeObj.ObjID, err)
}
dkirillov marked this conversation as resolved Outdated

I'm not sure this is correct because we get all object and deleting "/a" does delete "/ab" but must not.
It's better to get all nodes by the exact names and list sub tree if one of that node is intermediate.

I'm not sure this is correct because we get all object and `deleting "/a" does delete "/ab"` but must not. It's better to get all nodes by the exact names and list sub tree if one of that node is intermediate.

Not quite. In this case, when requesting deletion with the path /a, the file /ab will not be deleted, because:
GetListObjectByPrefix will first find the root node (intermediate) and then take a subtree with no depth limit. Then a separate request is made to get a node for the /a file via getObjectByPath (in which GetNodes is called along the /a path). As a result, /ab will not be in the final list. But if, despite this, it is better to use your approach, no problem.

Not quite. In this case, when requesting deletion with the path `/a`, the file `/ab` will not be deleted, because: `GetListObjectByPrefix` will first find the root node (intermediate) and then take a subtree with no depth limit. Then a separate request is made to get a node for the `/a` file via `getObjectByPath` (in which `GetNodes` is called along the `/a` path). As a result, `/ab` will not be in the final list. But if, despite this, it is better to use your approach, no problem.

Please check the following test:

diff --git a/registry/storage/driver/frostfs/frostfs.go b/registry/storage/driver/frostfs/frostfs.go
index 2d899ff9..ac32f6ad 100644
--- a/registry/storage/driver/frostfs/frostfs.go
+++ b/registry/storage/driver/frostfs/frostfs.go
@@ -17,6 +17,7 @@ import (
 	resolver "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
 	treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
@@ -527,11 +528,7 @@ func (d *driver) PutContent(ctx context.Context, path string, content []byte) er
 	var prm pool.PrmObjectPut
 	prm.SetHeader(*obj)
 
-	id, err := d.sdkPool.PutObject(ctx, prm)
-	if err != nil {
-		return fmt.Errorf("couldn't put object '%s': %w", path, err)
-	}
-	_, err = d.treeService.AddObject(ctx, d.containerID, path, id, uint64(len(content)))
+	_, err := d.treeService.AddObject(ctx, d.containerID, path, oidtest.ID(), uint64(len(content)))
 	if err != nil {
 		return fmt.Errorf("can't add object to tree: %w", err)
 	}
@@ -690,10 +687,6 @@ func (d *driver) Delete(ctx context.Context, path string) error {
 	}
 
 	for _, treeObj := range treeObjList {
-		if err := d.delete(ctx, treeObj.ObjID); err != nil {
-			return fmt.Errorf("error in delete object oid: %s : %w", treeObj.ObjID, err)
-		}
-
 		if err = d.treeService.DeleteObject(ctx, d.containerID, treeObj.ID); err != nil {
 			return fmt.Errorf("couldn't delete object from tree: %w", err)
 		}
diff --git a/registry/storage/driver/frostfs/frostfs_test.go b/registry/storage/driver/frostfs/frostfs_test.go
index 201c97af..ed8fc934 100644
--- a/registry/storage/driver/frostfs/frostfs_test.go
+++ b/registry/storage/driver/frostfs/frostfs_test.go
@@ -12,16 +12,19 @@ import (
 
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
+	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
 	storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
+	"github.com/distribution/distribution/v3/registry/storage/driver/frostfs/tree"
 	"github.com/google/uuid"
 	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
 	"github.com/nspcc-dev/neo-go/pkg/wallet"
 	"github.com/stretchr/testify/require"
 	"github.com/testcontainers/testcontainers-go"
 	"github.com/testcontainers/testcontainers-go/wait"
+	"go.uber.org/zap/zaptest"
 )
 
 const (
@@ -44,6 +47,42 @@ func params(walletPath string, containerID cid.ID) map[string]interface{} {
 	}
 }
 
+func TestDelete(t *testing.T) {
+	ctx := context.Background()
+
+	key, err := keys.NewPrivateKeyFromHex("1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb")
+	require.NoError(t, err)
+
+	var owner user.ID
+	user.IDFromKey(&owner, key.PrivateKey.PublicKey)
+
+	log := zaptest.NewLogger(t)
+
+	treeCli, err := tree.NewTreeServiceClientMemory()
+	require.NoError(t, err)
+
+	d := driver{
+		treeService: tree.NewTree(treeCli, log),
+		owner:       &owner,
+		key:         &key.PrivateKey,
+		containerID: cidtest.ID(),
+	}
+
+	err = d.PutContent(ctx, "a", []byte("test"))
+	require.NoError(t, err)
+
+	err = d.PutContent(ctx, "ab", []byte("test"))
+	require.NoError(t, err)
+
+	err = d.Delete(ctx, "a")
+	require.NoError(t, err)
+
+	res, err := d.List(ctx, "")
+	require.NoError(t, err)
+	fmt.Println(res)
+	require.Contains(t, res, "ab")
+}
+
 func TestIntegration(t *testing.T) {
 	f, err := os.CreateTemp("", "wallet")
 	require.NoError(t, err)
diff --git a/registry/storage/driver/frostfs/tree/tree.go b/registry/storage/driver/frostfs/tree/tree.go
index a8e18337..0cccb787 100644
--- a/registry/storage/driver/frostfs/tree/tree.go
+++ b/registry/storage/driver/frostfs/tree/tree.go
@@ -196,6 +196,9 @@ func (c *Tree) GetListObjectByPrefix(ctx context.Context, containerID cid.ID, pr
 	nodes := make([]NodeResponse, 0)
 
 	for _, node := range subTree {
+		if node.GetNodeID() == 0 {
+			continue
+		}
 		if !isIntermediate(node) {
 			nodes = append(nodes, node)
 		}
Please check the following test: ```diff diff --git a/registry/storage/driver/frostfs/frostfs.go b/registry/storage/driver/frostfs/frostfs.go index 2d899ff9..ac32f6ad 100644 --- a/registry/storage/driver/frostfs/frostfs.go +++ b/registry/storage/driver/frostfs/frostfs.go @@ -17,6 +17,7 @@ import ( resolver "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree" @@ -527,11 +528,7 @@ func (d *driver) PutContent(ctx context.Context, path string, content []byte) er var prm pool.PrmObjectPut prm.SetHeader(*obj) - id, err := d.sdkPool.PutObject(ctx, prm) - if err != nil { - return fmt.Errorf("couldn't put object '%s': %w", path, err) - } - _, err = d.treeService.AddObject(ctx, d.containerID, path, id, uint64(len(content))) + _, err := d.treeService.AddObject(ctx, d.containerID, path, oidtest.ID(), uint64(len(content))) if err != nil { return fmt.Errorf("can't add object to tree: %w", err) } @@ -690,10 +687,6 @@ func (d *driver) Delete(ctx context.Context, path string) error { } for _, treeObj := range treeObjList { - if err := d.delete(ctx, treeObj.ObjID); err != nil { - return fmt.Errorf("error in delete object oid: %s : %w", treeObj.ObjID, err) - } - if err = d.treeService.DeleteObject(ctx, d.containerID, treeObj.ID); err != nil { return fmt.Errorf("couldn't delete object from tree: %w", err) } diff --git a/registry/storage/driver/frostfs/frostfs_test.go b/registry/storage/driver/frostfs/frostfs_test.go index 201c97af..ed8fc934 100644 --- a/registry/storage/driver/frostfs/frostfs_test.go +++ b/registry/storage/driver/frostfs/frostfs_test.go @@ -12,16 +12,19 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" + "github.com/distribution/distribution/v3/registry/storage/driver/frostfs/tree" "github.com/google/uuid" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" + "go.uber.org/zap/zaptest" ) const ( @@ -44,6 +47,42 @@ func params(walletPath string, containerID cid.ID) map[string]interface{} { } } +func TestDelete(t *testing.T) { + ctx := context.Background() + + key, err := keys.NewPrivateKeyFromHex("1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb") + require.NoError(t, err) + + var owner user.ID + user.IDFromKey(&owner, key.PrivateKey.PublicKey) + + log := zaptest.NewLogger(t) + + treeCli, err := tree.NewTreeServiceClientMemory() + require.NoError(t, err) + + d := driver{ + treeService: tree.NewTree(treeCli, log), + owner: &owner, + key: &key.PrivateKey, + containerID: cidtest.ID(), + } + + err = d.PutContent(ctx, "a", []byte("test")) + require.NoError(t, err) + + err = d.PutContent(ctx, "ab", []byte("test")) + require.NoError(t, err) + + err = d.Delete(ctx, "a") + require.NoError(t, err) + + res, err := d.List(ctx, "") + require.NoError(t, err) + fmt.Println(res) + require.Contains(t, res, "ab") +} + func TestIntegration(t *testing.T) { f, err := os.CreateTemp("", "wallet") require.NoError(t, err) diff --git a/registry/storage/driver/frostfs/tree/tree.go b/registry/storage/driver/frostfs/tree/tree.go index a8e18337..0cccb787 100644 --- a/registry/storage/driver/frostfs/tree/tree.go +++ b/registry/storage/driver/frostfs/tree/tree.go @@ -196,6 +196,9 @@ func (c *Tree) GetListObjectByPrefix(ctx context.Context, containerID cid.ID, pr nodes := make([]NodeResponse, 0) for _, node := range subTree { + if node.GetNodeID() == 0 { + continue + } if !isIntermediate(node) { nodes = append(nodes, node) } ```

This test will not pass due to the fact that paths must start with a slash (distribution, when calling driver methods, always passes paths starting with a slash). But now I have added path validation to the driver methods. I also made a change regarding skipping the root node in GetListObjectByPrefix.

An example of a test that works and considers the situation you described above:

func TestDelete(t *testing.T) {  
    ctx := context.Background()  
  
    key, err := keys.NewPrivateKeyFromHex("1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb")  
    require.NoError(t, err)  
  
    var owner user.ID  
    user.IDFromKey(&owner, key.PrivateKey.PublicKey)  
  
    treeCli, err := tree.NewTreeServiceClientMemory()  
    require.NoError(t, err)  
  
    d := &driver{  
       treeService: tree.NewTree(treeCli, nil),  
       owner:       &owner,  
       key:         &key.PrivateKey,  
       containerID: cidtest.ID(),  
    }  
  
    err = d.PutContent(ctx, "/a", []byte("test"))  
    require.NoError(t, err)  
  
    err = d.PutContent(ctx, "/ab", []byte("test"))  
    require.NoError(t, err)  
  
    err = d.Delete(ctx, "/a")  
    require.NoError(t, err)  
  
    res, err := d.List(ctx, "/")  
    require.NoError(t, err)  
    require.Contains(t, res, "/ab")  
}
This test will not pass due to the fact that paths must start with a slash (distribution, when calling driver methods, always passes paths starting with a slash). But now I have added path validation to the driver methods. I also made a change regarding skipping the root node in `GetListObjectByPrefix`. An example of a test that works and considers the situation you described above: ``` func TestDelete(t *testing.T) { ctx := context.Background() key, err := keys.NewPrivateKeyFromHex("1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb") require.NoError(t, err) var owner user.ID user.IDFromKey(&owner, key.PrivateKey.PublicKey) treeCli, err := tree.NewTreeServiceClientMemory() require.NoError(t, err) d := &driver{ treeService: tree.NewTree(treeCli, nil), owner: &owner, key: &key.PrivateKey, containerID: cidtest.ID(), } err = d.PutContent(ctx, "/a", []byte("test")) require.NoError(t, err) err = d.PutContent(ctx, "/ab", []byte("test")) require.NoError(t, err) err = d.Delete(ctx, "/a") require.NoError(t, err) res, err := d.List(ctx, "/") require.NoError(t, err) require.Contains(t, res, "/ab") } ```
if err = d.treeService.DeleteObject(ctx, d.containerID, treeObj.ID); err != nil {
return fmt.Errorf("couldn't delete object from tree: %w", err)
}
} }
return nil return nil
} }
func (d *driver) checkIsSubPath(ctx context.Context, path string, id oid.ID) (bool, error) {
var prmHead pool.PrmObjectHead
prmHead.SetAddress(d.objectAddress(id))
obj, err := d.sdkPool.HeadObject(ctx, prmHead)
if err != nil {
return false, fmt.Errorf("couldn't head object part '%s', id '%s': %w", path, id, err)
}
fileInf := newFileInfo(ctx, obj, "")
return fileInf.Path() <= path || fileInf.Path()[len(path)] == '/', nil
}
func (d *driver) delete(ctx context.Context, id oid.ID) error { func (d *driver) delete(ctx context.Context, id oid.ID) error {
var prm pool.PrmObjectDelete var prm pool.PrmObjectDelete
prm.SetAddress(d.objectAddress(id)) prm.SetAddress(d.objectAddress(id))
@ -770,166 +765,47 @@ func (d *driver) Walk(ctx context.Context, path string, fn storagedriver.WalkFn)
return storagedriver.WalkFallback(ctx, d, path, fn) return storagedriver.WalkFallback(ctx, d, path, fn)
} }
func (d *driver) searchByPrefix(ctx context.Context, prefix string) ([]oid.ID, error) {
filters := object.NewSearchFilters()
filters.AddRootFilter()
filters.AddFilter(attributeFilePath, prefix, object.MatchCommonPrefix)
return d.baseSearch(ctx, filters)
}
func (d *driver) headSplitParts(ctx context.Context, splitID *object.SplitID, path string, isAppend bool) ([]*object.Object, map[oid.ID]struct{}, error) { func (d *driver) headSplitParts(ctx context.Context, splitID *object.SplitID, path string, isAppend bool) ([]*object.Object, map[oid.ID]struct{}, error) {
filters := object.NewSearchFilters() ids, err := d.treeService.GetListOIDBySplitID(ctx, d.containerID, path, splitID)
filters.AddPhyFilter()
filters.AddSplitIDFilter(object.MatchStringEqual, splitID)
var prm pool.PrmObjectSearch
prm.SetContainerID(d.containerID)
prm.SetFilters(filters)
res, err := d.sdkPool.SearchObjects(ctx, prm)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("init searching using client: %w", err) return nil, nil, fmt.Errorf("couldn't get object from tree by split id '%s': %w", path, err)
} }
defer res.Close() var (
addr oid.Address
prmHead pool.PrmObjectHead
objects []*object.Object
)
var addr oid.Address
addr.SetContainer(d.containerID) addr.SetContainer(d.containerID)
var inErr error
var prmHead pool.PrmObjectHead
var objects []*object.Object
noChild := make(map[oid.ID]struct{}) noChild := make(map[oid.ID]struct{})
err = res.Iterate(func(id oid.ID) bool { for _, objID := range ids {
addr.SetObject(id) addr.SetObject(objID)
prmHead.SetAddress(addr) prmHead.SetAddress(addr)
obj, err := d.sdkPool.HeadObject(ctx, prmHead) obj, err := d.sdkPool.HeadObject(ctx, prmHead)
if err != nil { if err != nil {
inErr = fmt.Errorf("couldn't head object part '%s', id '%s', splitID '%s': %w", path, id, splitID, err) return nil, nil, fmt.Errorf("couldn't head object part '%s', id '%s', splitID '%s': %w", path, objID, splitID, err)
return true
} }
if isAppend { if isAppend {
objects = append(objects, &obj) objects = append(objects, &obj)
noChild[id] = struct{}{} noChild[objID] = struct{}{}
return false continue
} }
inErr = fmt.Errorf("init upload part '%s' already exist, splitID '%s'", path, splitID) return nil, nil, fmt.Errorf("init upload part '%s' already exist, splitID '%s'", path, splitID)
return true
})
if err == nil {
err = inErr
}
if err != nil {
return nil, nil, fmt.Errorf("iterate objects: %w", err)
} }
return objects, noChild, nil return objects, noChild, nil
} }
func (d *driver) baseSearch(ctx context.Context, filters object.SearchFilters) ([]oid.ID, error) { func newFileInfo(treeObj tree.TreeNode, prefix string) storagedriver.FileInfo {
var prm pool.PrmObjectSearch
prm.SetContainerID(d.containerID)
prm.SetFilters(filters)
res, err := d.sdkPool.SearchObjects(ctx, prm)
if err != nil {
return nil, fmt.Errorf("init searching using client: %w", err)
}
defer res.Close()
var buf []oid.ID
err = res.Iterate(func(id oid.ID) bool {
buf = append(buf, id)
return false
})
if err != nil {
return nil, fmt.Errorf("iterate objects: %w", err)
}
return buf, nil
}
func (d *driver) searchOne(ctx context.Context, path string) (oid.ID, error) {
id, ok, err := d.searchOneBase(ctx, path, false)
if err != nil {
return oid.ID{}, err
}
if !ok {
return oid.ID{}, fmt.Errorf("found more than one object by path '%s'", path)
}
return id, nil
}
func (d *driver) searchOneBase(ctx context.Context, path string, byPrefix bool) (resID oid.ID, ok bool, err error) {
filters := object.NewSearchFilters()
filters.AddRootFilter()
if byPrefix {
filters.AddFilter(attributeFilePath, path, object.MatchCommonPrefix)
} else {
filters.AddFilter(attributeFilePath, path, object.MatchStringEqual)
}
var prm pool.PrmObjectSearch
prm.SetContainerID(d.containerID)
prm.SetFilters(filters)
res, err := d.sdkPool.SearchObjects(ctx, prm)
if err != nil {
return oid.ID{}, false, fmt.Errorf("init searching using client: %w", err)
}
defer res.Close()
var found bool
err = res.Iterate(func(id oid.ID) bool {
if found {
ok = false
return true
}
found = true
resID = id
ok = true
return false
})
if err != nil {
return oid.ID{}, false, fmt.Errorf("iterate objects by path '%s': %w", path, err)
}
if !found {
return oid.ID{}, false, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
}
return resID, ok, nil
}
func newFileInfo(ctx context.Context, obj object.Object, prefix string) storagedriver.FileInfo {
fileInfoFields := storagedriver.FileInfoFields{ fileInfoFields := storagedriver.FileInfoFields{
Size: int64(obj.PayloadSize()), Path: treeObj.FullPath,
} Size: int64(treeObj.PayloadSize),
ModTime: treeObj.ModificationTime,
for _, attr := range obj.Attributes() {
switch attr.Key() {
case attributeFilePath:
fileInfoFields.Path = attr.Value()
case object.AttributeTimestamp:
timestamp, err := strconv.ParseInt(attr.Value(), 10, 64)
if err != nil {
objID, _ := obj.ID()
dcontext.GetLogger(ctx).Warnf("object '%s' has invalid timestamp '%s'", objID.EncodeToString(), attr.Value())
continue
}
fileInfoFields.ModTime = time.Unix(timestamp, 0)
}
} }
if len(prefix) > 0 { if len(prefix) > 0 {
@ -956,18 +832,39 @@ func newFileInfoDir(path string) storagedriver.FileInfo {
} }
} }
func (d *driver) newObjTarget() transformer.ChunkedObjectWriter { func validatePrefixPath(path string) bool {
return prefixPathValidateRegex.MatchString(path)
}
func validateFilePath(path string) bool {
return filePathValidateRegex.MatchString(path)
}
func preparePrefixPath(path string) string {
if strings.HasSuffix(path, "/") {
return path[:len(path)-1]
}
return path
}
func (d *driver) newObjTarget(path string, splitID *object.SplitID) transformer.ChunkedObjectWriter {
return &objTarget{ return &objTarget{
sdkPool: d.sdkPool, sdkPool: d.sdkPool,
key: d.key, treeService: d.treeService,
splitID: splitID,
key: d.key,
path: path,
} }
} }
type objTarget struct { type objTarget struct {
sdkPool *pool.Pool sdkPool *pool.Pool
key *ecdsa.PrivateKey treeService *tree.Tree
obj *object.Object path string
chunks [][]byte splitID *object.SplitID
key *ecdsa.PrivateKey
obj *object.Object
chunks [][]byte
} }
func (t *objTarget) WriteHeader(_ context.Context, obj *object.Object) error { func (t *objTarget) WriteHeader(_ context.Context, obj *object.Object) error {
@ -999,11 +896,14 @@ func (t *objTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers,
t.obj.SetCreationEpoch(currEpoch) t.obj.SetCreationEpoch(currEpoch)
var ( var (
parID oid.ID parID oid.ID
parHdr *object.Object parHdr *object.Object
isCommit bool
sizeFinalObject uint64
) )
if par := t.obj.Parent(); par != nil && par.Signature() == nil { if par := t.obj.Parent(); par != nil && par.Signature() == nil {
isCommit = true
objPar := object.NewFromV2(par.ToV2()) objPar := object.NewFromV2(par.ToV2())
objPar.SetCreationEpoch(currEpoch) objPar.SetCreationEpoch(currEpoch)
@ -1013,6 +913,7 @@ func (t *objTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers,
} }
parID, _ = objPar.ID() parID, _ = objPar.ID()
sizeFinalObject = objPar.PayloadSize()
t.obj.SetParent(objPar) t.obj.SetParent(objPar)
} }
@ -1029,12 +930,28 @@ func (t *objTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers,
var prm pool.PrmObjectPut var prm pool.PrmObjectPut
prm.SetHeader(*t.obj) prm.SetHeader(*t.obj)
id, err := t.sdkPool.PutObject(ctx, prm)
_, err = t.sdkPool.PutObject(ctx, prm)
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't put part: %w", err) return nil, fmt.Errorf("couldn't put part: %w", err)
} }
containerID, ok := t.obj.ContainerID()
if !ok {
return nil, fmt.Errorf("error in get container id from object")
}
if isCommit {
_, err := t.treeService.AddObject(ctx, containerID, t.path, parID, sizeFinalObject)
if err != nil {
return nil, fmt.Errorf("can't add object to tree: %w", err)
}
}
_, err = t.treeService.AddPHYObject(ctx, containerID, t.path, id, t.splitID)
if err != nil {
return nil, fmt.Errorf("can't add phy object to tree: %w", err)
}
objID, _ := t.obj.ID() objID, _ := t.obj.ID()
return &transformer.AccessIdentifiers{ return &transformer.AccessIdentifiers{

View file

@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"strings"
"testing" "testing"
"time" "time"
@ -75,8 +76,7 @@ func TestIntegration(t *testing.T) {
rootCtx := context.Background() rootCtx := context.Background()
aioImage := "truecloudlab/frostfs-aio:" aioImage := "truecloudlab/frostfs-aio:"
versions := []string{ versions := []string{
"1.2.7", "1.3.0",
"1.2.8",
} }
for _, aioVersion := range versions { for _, aioVersion := range versions {
@ -214,6 +214,7 @@ func testWriteRead(rootCtx context.Context, t *testing.T, drvr storagedriver.Sto
func testList(rootCtx context.Context, t *testing.T, drvr storagedriver.StorageDriver, version string) { func testList(rootCtx context.Context, t *testing.T, drvr storagedriver.StorageDriver, version string) {
ctx, path := formCtxAndPath(rootCtx, version) ctx, path := formCtxAndPath(rootCtx, version)
path = "/list" + path
fileWriter, err := drvr.Writer(ctx, path, false) fileWriter, err := drvr.Writer(ctx, path, false)
require.NoError(t, err) require.NoError(t, err)
@ -229,7 +230,7 @@ func testList(rootCtx context.Context, t *testing.T, drvr storagedriver.StorageD
err = fileWriter.Commit() err = fileWriter.Commit()
require.NoError(t, err) require.NoError(t, err)
res, err := drvr.List(ctx, path) res, err := drvr.List(ctx, getParentDirPath(path))
require.NoError(t, err) require.NoError(t, err)
require.Len(t, res, 1) require.Len(t, res, 1)
require.Contains(t, res, path) require.Contains(t, res, path)
@ -340,3 +341,12 @@ func createContainer(ctx context.Context, t *testing.T, clientPool *pool.Pool, o
return cnrID return cnrID
} }
func getParentDirPath(path string) string {
lastSlashIndex := strings.LastIndex(path, "/")
if lastSlashIndex != -1 {
path = path[:lastSlashIndex]
}
return path
}

View file

@ -0,0 +1,410 @@
package tree
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
type (
Tree struct {
service ServiceClient
log *zap.Logger
}
// ServiceClient is a client to interact with tree service.
// Each method must return ErrNodeNotFound or ErrNodeAccessDenied if relevant.
ServiceClient interface {
GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error)
GetSubTree(ctx context.Context, containerID cid.ID, treeID string, rootID uint64, depth uint32) ([]NodeResponse, error)
dkirillov marked this conversation as resolved Outdated

This is unused method (some others too). Can we drop it then?

This is unused method (some others too). Can we drop it then?
AddNodeByPath(ctx context.Context, containerID cid.ID, treeID string, path []string, meta map[string]string) (uint64, error)
RemoveNode(ctx context.Context, containerID cid.ID, treeID string, nodeID uint64) error
}
SubTreeStream interface {
Next() (NodeResponse, error)
}
TreeNode struct {
ID uint64
ParentID uint64
ObjID oid.ID
TimeStamp uint64
ModificationTime time.Time
PayloadSize uint64
FullPath string
Meta map[string]string
}
GetNodesParams struct {
ContainerID cid.ID
TreeID string
Path []string
Meta []string
LatestOnly bool
AllAttrs bool
}
)
type Meta interface {
GetKey() string
GetValue() []byte
}
type NodeResponse interface {
GetMeta() []Meta
GetNodeID() uint64
GetParentID() uint64
GetTimestamp() uint64
}
const (
FileNameKey = "FileName"
oidKV = "OID"
splitIDKV = "SplitID"
payloadSize = "PayloadSize"
fullPath = "FullPath"
modificationTime = "ModificationTime"
dkirillov marked this conversation as resolved Outdated

@alexvanin Did we decide use the same tree as for s3 objects?

@alexvanin Did we decide use the same tree as for s3 objects?

I think so, but there is no strong opinion on this.

I think so, but there is no strong opinion on this.

I have some concerns about using the same container/bucket in both ways: via s3 and distribution

I have some concerns about using the same container/bucket in both ways: via s3 and distribution

Decided to use different tree names for distribution.

Decided to use different tree names for distribution.
objectTree = "distribution-object"
splitTree = "distribution-split"
dkirillov marked this conversation as resolved Outdated

Can we name this more distribution specific?
Like distribution-object, distribution-split for example

Can we name this more distribution specific? Like `distribution-object`, `distribution-split` for example
separator = "/"
maxDepth = 0
)
var (
// ErrNodeNotFound is returned from ServiceClient in case of not found error.
ErrNodeNotFound = errors.New("not found")
ErrOnlyDirFound = errors.New("only directory found")
// ErrNodeAccessDenied is returned from ServiceClient service in case of access denied error.
ErrNodeAccessDenied = errors.New("access denied")
// ErrGatewayTimeout is returned from ServiceClient service in case of timeout error.
ErrGatewayTimeout = errors.New("gateway timeout")
)
func NewTree(service ServiceClient, log *zap.Logger) *Tree {
return &Tree{
service: service,
log: log,
}
}
func (c *Tree) GetObjectByPath(ctx context.Context, containerID cid.ID, path string) (*TreeNode, error) {
newPath := pathFromName(path)
p := &GetNodesParams{
ContainerID: containerID,
TreeID: objectTree,
Path: newPath,
LatestOnly: false,
AllAttrs: true,
}
nodes, err := c.service.GetNodes(ctx, p)
if err != nil {
return nil, err
}
var result NodeResponse
switch {
case len(nodes) == 0 || len(nodes) == 1 && isIntermediate(nodes[0]):
return nil, ErrNodeNotFound
case len(nodes) > 2:
return nil, fmt.Errorf("found more than two nodes")
default:
result = nodes[0]
if len(nodes) == 2 && isIntermediate(result) {
result = nodes[1]
}
}
treeNode, err := newTreeNode(result)
if err != nil {
return nil, fmt.Errorf("error parse tree node: %w", err)
}
return treeNode, nil
}
func (c *Tree) GetObjectByPathDir(ctx context.Context, containerID cid.ID, path string) (*TreeNode, error) {
newPath := pathFromName(path)
p := &GetNodesParams{
ContainerID: containerID,
TreeID: objectTree,
Path: newPath,
LatestOnly: false,
AllAttrs: true,
}
nodes, err := c.service.GetNodes(ctx, p)
if err != nil {
return nil, err
}
var result NodeResponse
switch {
case len(nodes) == 0:
return nil, ErrNodeNotFound
case len(nodes) == 1 && isIntermediate(nodes[0]):
return nil, ErrOnlyDirFound
case len(nodes) > 2:
return nil, fmt.Errorf("found more than two nodes")
default:
result = nodes[0]
if len(nodes) == 2 && isIntermediate(result) {
result = nodes[1]
}
}
treeNode, err := newTreeNode(result)
if err != nil {
return nil, fmt.Errorf("error parse tree node: %w", err)
}
return treeNode, nil
}
func (c *Tree) GetListObjectByPrefix(ctx context.Context, containerID cid.ID, prefixPath string) ([]TreeNode, error) {
rootID, err := c.determinePrefixNode(ctx, containerID, objectTree, prefixPath)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, nil
}
return nil, err
}
subTree, err := c.service.GetSubTree(ctx, containerID, objectTree, rootID, maxDepth)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, nil
}
return nil, err
}
nodes := make([]NodeResponse, 0)
for _, node := range subTree {
if node.GetNodeID() == rootID {
dkirillov marked this conversation as resolved Outdated

Actually we have to check for rootID:

if node.GetNodeID() == rootID {
Actually we have to check for `rootID`: ```golang if node.GetNodeID() == rootID { ```
continue
}
if !isIntermediate(node) {
nodes = append(nodes, node)
}
}
result := make([]TreeNode, 0, len(nodes))
for _, v := range nodes {
node, err := newTreeNode(v)
if err != nil {
return nil, fmt.Errorf("error parse tree node: %w", err)
}
result = append(result, *node)
}
return result, nil
}
func (c *Tree) GetListOIDBySplitID(ctx context.Context, containerID cid.ID, path string, splitID *object.SplitID) ([]oid.ID, error) {
nodes, err := c.getNodesBySplitID(ctx, containerID, path, splitID)
if err != nil {
return nil, err
}
ids := make([]oid.ID, len(nodes))
for i, node := range nodes {
ids[i] = node.ObjID
}
return ids, nil
}
func (c *Tree) AddObject(ctx context.Context, containerID cid.ID, filePath string, oid oid.ID, size uint64) (uint64, error) {
path := pathFromName(filePath)
meta := map[string]string{
oidKV: oid.String(),
FileNameKey: path[len(path)-1],
payloadSize: strconv.FormatUint(size, 10),
fullPath: filePath,
modificationTime: strconv.FormatInt(time.Now().Unix(), 10),
}
nodeID, err := c.service.AddNodeByPath(ctx, containerID, objectTree, path[:len(path)-1], meta)
return nodeID, err
}
func (c *Tree) AddPHYObject(ctx context.Context, containerID cid.ID, filePath string, oid oid.ID, splitID *object.SplitID) (uint64, error) {
meta := map[string]string{
oidKV: oid.String(),
splitIDKV: splitID.String(),
}
nodeID, err := c.service.AddNodeByPath(ctx, containerID, splitTree, pathFromName(filePath), meta)
return nodeID, err
}
func (c *Tree) DeleteObject(ctx context.Context, containerID cid.ID, nodeID uint64) error {
return c.service.RemoveNode(ctx, containerID, objectTree, nodeID)
}
func (c *Tree) DeleteObjectsBySplitID(ctx context.Context, containerID cid.ID, path string, splitID *object.SplitID) error {
splitNodes, err := c.getNodesBySplitID(ctx, containerID, path, splitID)
if err != nil {
return err
}
for _, node := range splitNodes {
err = c.service.RemoveNode(ctx, containerID, splitTree, node.ID)
if err != nil {
return err
}
}
return nil
}
func (c *Tree) determinePrefixNode(ctx context.Context, containerID cid.ID, treeID, prefixPath string) (uint64, error) {
var rootID uint64
path := pathFromName(prefixPath)
if len(path) > 1 {
var err error
rootID, err = c.getPrefixNodeID(ctx, containerID, treeID, path)
if err != nil {
return 0, err
}
}
return rootID, nil
}
func (c *Tree) getPrefixNodeID(ctx context.Context, containerID cid.ID, treeID string, prefixPath []string) (uint64, error) {
p := &GetNodesParams{
ContainerID: containerID,
TreeID: treeID,
Path: prefixPath,
LatestOnly: false,
AllAttrs: true,
}
nodes, err := c.service.GetNodes(ctx, p)
if err != nil {
return 0, err
}
var intermediateNodes []uint64
for _, node := range nodes {
if isIntermediate(node) {
intermediateNodes = append(intermediateNodes, node.GetNodeID())
}
}
if len(intermediateNodes) == 0 {
return 0, ErrNodeNotFound
}
if len(intermediateNodes) > 1 {
return 0, fmt.Errorf("found more than one intermediate nodes")
}
return intermediateNodes[0], nil
}
func (c *Tree) getNodesBySplitID(ctx context.Context, containerID cid.ID, filePath string, splitID *object.SplitID) ([]TreeNode, error) {
rootID, err := c.determinePrefixNode(ctx, containerID, splitTree, filePath)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, nil
}
return nil, err
}
subTree, err := c.service.GetSubTree(ctx, containerID, splitTree, rootID, 2)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, nil
}
return nil, err
}
nodes := make([]TreeNode, 0, len(subTree))
for _, v := range subTree {
node, err := newTreeNode(v)
if err != nil {
return nil, fmt.Errorf("error parse tree node: %w", err)
}
nodes = append(nodes, *node)
}
result := make([]TreeNode, 0)
for _, node := range nodes {
if val, ok := node.Meta[splitIDKV]; ok {
if val == splitID.String() {
result = append(result, node)
}
}
}
return result, nil
}
func isIntermediate(node NodeResponse) bool {
if len(node.GetMeta()) != 1 {
return false
}
return node.GetMeta()[0].GetKey() == FileNameKey
}
func newTreeNode(nodeInfo NodeResponse) (*TreeNode, error) {
treeNode := &TreeNode{
ID: nodeInfo.GetNodeID(),
ParentID: nodeInfo.GetParentID(),
TimeStamp: nodeInfo.GetTimestamp(),
Meta: make(map[string]string, len(nodeInfo.GetMeta())),
}
for _, kv := range nodeInfo.GetMeta() {
switch kv.GetKey() {
case oidKV:
if err := treeNode.ObjID.DecodeString(string(kv.GetValue())); err != nil {
return nil, err
}
case payloadSize:
if sizeStr := string(kv.GetValue()); len(sizeStr) > 0 {
var err error
if treeNode.PayloadSize, err = strconv.ParseUint(sizeStr, 10, 64); err != nil {
return nil, fmt.Errorf("invalid payload size value '%s': %w", sizeStr, err)
}
}
case modificationTime:
timestamp, err := strconv.ParseInt(string(kv.GetValue()), 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid modification time value '%s': %w", string(kv.GetValue()), err)
}
treeNode.ModificationTime = time.Unix(timestamp, 0)
case fullPath:
treeNode.FullPath = string(kv.GetValue())
default:
treeNode.Meta[kv.GetKey()] = string(kv.GetValue())
}
}
return treeNode, nil
}
func pathFromName(objectName string) []string {
return strings.Split(objectName, separator)
}

View file

@ -0,0 +1,347 @@
package tree
import (
"context"
"fmt"
"sort"
"time"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)
type nodeMeta struct {
key string
value []byte
}
func (m nodeMeta) GetKey() string {
return m.key
}
func (m nodeMeta) GetValue() []byte {
return m.value
}
type nodeResponse struct {
meta []nodeMeta
nodeID uint64
parentID uint64
timestamp uint64
}
func (n nodeResponse) GetNodeID() uint64 {
return n.nodeID
}
func (n nodeResponse) GetParentID() uint64 {
return n.parentID
}
func (n nodeResponse) GetTimestamp() uint64 {
return n.timestamp
}
func (n nodeResponse) GetMeta() []Meta {
res := make([]Meta, len(n.meta))
for i, value := range n.meta {
res[i] = value
}
return res
}
func (n nodeResponse) getValue(key string) string {
for _, value := range n.meta {
if value.key == key {
return string(value.value)
}
}
return ""
}
type ServiceClientMemory struct {
containers map[string]containerInfo
}
type containerInfo struct {
containerID cid.ID
trees map[string]memoryTree
}
type memoryTree struct {
idCounter uint64
treeData *treeNodeMemory
}
type treeNodeMemory struct {
data nodeResponse
parent *treeNodeMemory
children []*treeNodeMemory
}
func (t *treeNodeMemory) getNode(nodeID uint64) *treeNodeMemory {
if t.data.nodeID == nodeID {
return t
}
for _, child := range t.children {
if node := child.getNode(nodeID); node != nil {
return node
}
}
return nil
}
func (t *memoryTree) getNodesByPath(path []string) []nodeResponse {
if len(path) == 0 {
return nil
}
var res []nodeResponse
for _, child := range t.treeData.children {
res = child.listNodesByPath(res, path)
}
return res
}
func (t *treeNodeMemory) listNodesByPath(res []nodeResponse, path []string) []nodeResponse {
if len(path) == 0 || t.data.getValue(FileNameKey) != path[0] {
return res
}
if len(path) == 1 {
return append(res, t.data)
}
for _, ch := range t.children {
res = ch.listNodesByPath(res, path[1:])
}
return res
}
func (t *memoryTree) createPathIfNotExist(parent *treeNodeMemory, path []string) *treeNodeMemory {
if len(path) == 0 {
return parent
}
var node *treeNodeMemory
for _, child := range parent.children {
if len(child.data.meta) == 1 && child.data.getValue(FileNameKey) == path[0] {
node = child
break
}
}
if node == nil {
node = &treeNodeMemory{
data: nodeResponse{
meta: []nodeMeta{{key: FileNameKey, value: []byte(path[0])}},
nodeID: t.idCounter,
parentID: parent.data.nodeID,
timestamp: uint64(time.Now().UnixMicro()),
},
parent: parent,
}
t.idCounter++
parent.children = append(parent.children, node)
}
return t.createPathIfNotExist(node, path[1:])
}
func (t *treeNodeMemory) removeChild(nodeID uint64) {
ind := -1
for i, ch := range t.children {
if ch.data.nodeID == nodeID {
ind = i
break
}
}
if ind != -1 {
t.children = append(t.children[:ind], t.children[ind+1:]...)
}
}
func (t *treeNodeMemory) listNodes(res []NodeResponse, depth uint32) []NodeResponse {
res = append(res, t.data)
if depth == 0 {
return res
}
for _, ch := range t.children {
res = ch.listNodes(res, depth-1)
}
return res
}
func NewTreeServiceClientMemory() (*ServiceClientMemory, error) {
return &ServiceClientMemory{
containers: make(map[string]containerInfo),
}, nil
}
func (c *ServiceClientMemory) GetNodes(_ context.Context, p *GetNodesParams) ([]NodeResponse, error) {
cnr, ok := c.containers[p.ContainerID.EncodeToString()]
if !ok {
return nil, nil
}
tr, ok := cnr.trees[p.TreeID]
if !ok {
return nil, nil
}
res := tr.getNodesByPath(p.Path)
sort.Slice(res, func(i, j int) bool {
return res[i].timestamp < res[j].timestamp
})
if p.LatestOnly && len(res) != 0 {
res = res[len(res)-1:]
}
res2 := make([]NodeResponse, len(res))
for i, n := range res {
res2[i] = n
}
return res2, nil
}
func (c *ServiceClientMemory) GetSubTree(_ context.Context, containerID cid.ID, treeID string, rootID uint64, depth uint32) ([]NodeResponse, error) {
cnr, ok := c.containers[containerID.EncodeToString()]
if !ok {
return nil, nil
}
tr, ok := cnr.trees[treeID]
if !ok {
return nil, ErrNodeNotFound
}
sortNode(tr.treeData)
node := tr.treeData.getNode(rootID)
if node == nil {
return nil, ErrNodeNotFound
}
// we depth-1 in case of uint32 and 0 as mark to get all subtree leads to overflow and depth is getting quite big to walk all tree levels
return node.listNodes(nil, depth-1), nil
}
func newContainerInfo(containerID cid.ID, treeID string) containerInfo {
return containerInfo{
containerID: containerID,
trees: map[string]memoryTree{
treeID: {
idCounter: 1,
treeData: &treeNodeMemory{
data: nodeResponse{
timestamp: uint64(time.Now().UnixMicro()),
},
},
},
},
}
}
func newMemoryTree() memoryTree {
return memoryTree{
idCounter: 1,
dkirillov marked this conversation as resolved Outdated

Unused

Unused
treeData: &treeNodeMemory{
data: nodeResponse{
timestamp: uint64(time.Now().UnixMicro()),
},
},
}
}
func (c *ServiceClientMemory) AddNodeByPath(_ context.Context, containerID cid.ID, treeID string, path []string, meta map[string]string) (uint64, error) {
cnr, ok := c.containers[containerID.EncodeToString()]
if !ok {
cnr = newContainerInfo(containerID, treeID)
c.containers[containerID.EncodeToString()] = cnr
}
tr, ok := cnr.trees[treeID]
if !ok {
tr = newMemoryTree()
cnr.trees[treeID] = tr
}
parentNode := tr.createPathIfNotExist(tr.treeData, path)
if parentNode == nil {
return 0, fmt.Errorf("create path '%s'", path)
}
newID := tr.idCounter
tr.idCounter++
tn := &treeNodeMemory{
data: nodeResponse{
meta: metaToNodeMeta(meta),
nodeID: newID,
parentID: parentNode.data.nodeID,
timestamp: uint64(time.Now().UnixMicro()),
},
parent: parentNode,
}
parentNode.children = append(parentNode.children, tn)
cnr.trees[treeID] = tr
return newID, nil
}
func sortNode(node *treeNodeMemory) {
if node == nil {
return
}
sortNodes(node.children)
for _, child := range node.children {
sortNode(child)
}
}
func sortNodes(list []*treeNodeMemory) {
sort.Slice(list, func(i, j int) bool {
return list[i].data.getValue(FileNameKey) < list[j].data.getValue(FileNameKey)
})
}
func (c *ServiceClientMemory) RemoveNode(_ context.Context, containerID cid.ID, treeID string, nodeID uint64) error {
cnr, ok := c.containers[containerID.EncodeToString()]
if !ok {
return ErrNodeNotFound
}
tr, ok := cnr.trees[treeID]
if !ok {
return ErrNodeNotFound
}
node := tr.treeData.getNode(nodeID)
if node == nil {
return ErrNodeNotFound
}
node.parent.removeChild(nodeID)
return nil
}
func metaToNodeMeta(m map[string]string) []nodeMeta {
result := make([]nodeMeta, 0, len(m))
for key, value := range m {
result = append(result, nodeMeta{key: key, value: []byte(value)})
}
return result
}

View file

@ -0,0 +1,232 @@
package tree
import (
"context"
"testing"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
func TestGetObjectByPath(t *testing.T) {
ctx := context.Background()
memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err)
treeService := NewTree(memCli, zaptest.NewLogger(t))
cidTest := cidtest.ID()
oidTest1 := oidtest.ID()
oidTest2 := oidtest.ID()
testSize1 := uint64(10)
testSize2 := uint64(20)
nodeID1, err := treeService.AddObject(ctx, cidTest, "/a/b", oidTest1, testSize1)
require.NoError(t, err)
nodeID2, err := treeService.AddObject(ctx, cidTest, "/a/b/c/d", oidTest2, testSize2)
require.NoError(t, err)
node1, err := treeService.GetObjectByPath(ctx, cidTest, "/a/b")
require.NoError(t, err)
require.Equal(t, nodeID1, node1.ID)
require.Equal(t, oidTest1, node1.ObjID)
require.Equal(t, testSize1, node1.PayloadSize)
node2, err := treeService.GetObjectByPath(ctx, cidTest, "/a/b/c/d")
require.NoError(t, err)
require.Equal(t, nodeID2, node2.ID)
require.Equal(t, oidTest2, node2.ObjID)
require.Equal(t, testSize2, node2.PayloadSize)
_, err = treeService.GetObjectByPath(ctx, cidTest, "/g")
require.ErrorIs(t, err, ErrNodeNotFound)
_, err = treeService.GetObjectByPath(ctx, cidTest, "/a/b/c")
require.ErrorIs(t, err, ErrNodeNotFound)
}
func TestGetObjectByPathDir(t *testing.T) {
ctx := context.Background()
memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err)
treeService := NewTree(memCli, zaptest.NewLogger(t))
cidTest := cidtest.ID()
oidTest1 := oidtest.ID()
oidTest2 := oidtest.ID()
testSize1 := uint64(10)
testSize2 := uint64(20)
nodeID1, err := treeService.AddObject(ctx, cidTest, "/a/b", oidTest1, testSize1)
require.NoError(t, err)
nodeID2, err := treeService.AddObject(ctx, cidTest, "/a/b/c/d", oidTest2, testSize2)
require.NoError(t, err)
node1, err := treeService.GetObjectByPath(ctx, cidTest, "/a/b")
require.NoError(t, err)
require.Equal(t, nodeID1, node1.ID)
require.Equal(t, oidTest1, node1.ObjID)
require.Equal(t, testSize1, node1.PayloadSize)
node2, err := treeService.GetObjectByPath(ctx, cidTest, "/a/b/c/d")
require.NoError(t, err)
require.Equal(t, nodeID2, node2.ID)
require.Equal(t, oidTest2, node2.ObjID)
require.Equal(t, testSize2, node2.PayloadSize)
_, err = treeService.GetObjectByPathDir(ctx, cidTest, "/g")
require.ErrorIs(t, err, ErrNodeNotFound)
_, err = treeService.GetObjectByPathDir(ctx, cidTest, "/a/b/c")
require.ErrorIs(t, err, ErrOnlyDirFound)
}
func TestGetListObjectByPrefix(t *testing.T) {
ctx := context.Background()
memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err)
treeService := NewTree(memCli, zaptest.NewLogger(t))
cidTest := cidtest.ID()
oidTest1 := oidtest.ID()
oidTest2 := oidtest.ID()
oidTest3 := oidtest.ID()
oidTest4 := oidtest.ID()
testSize1 := uint64(10)
testSize2 := uint64(20)
testSize3 := uint64(30)
testSize4 := uint64(40)
_, err = treeService.AddObject(ctx, cidTest, "/a/b", oidTest1, testSize1)
require.NoError(t, err)
nodeID2, err := treeService.AddObject(ctx, cidTest, "/a/b/c", oidTest2, testSize2)
require.NoError(t, err)
nodeID3, err := treeService.AddObject(ctx, cidTest, "/a/b/c/d", oidTest3, testSize3)
require.NoError(t, err)
nodeID4, err := treeService.AddObject(ctx, cidTest, "/a/b/c/d/e", oidTest4, testSize4)
require.NoError(t, err)
nodes, err := treeService.GetListObjectByPrefix(ctx, cidTest, "/a/b")
require.NoError(t, err)
require.Equal(t, nodeID2, nodes[0].ID)
require.Equal(t, nodeID3, nodes[1].ID)
require.Equal(t, nodeID4, nodes[2].ID)
require.Equal(t, testSize2, nodes[0].PayloadSize)
require.Equal(t, testSize3, nodes[1].PayloadSize)
require.Equal(t, testSize4, nodes[2].PayloadSize)
nodes, err = treeService.GetListObjectByPrefix(ctx, cidTest, "/g/s")
require.NoError(t, err)
require.Equal(t, 0, len(nodes))
}
func TestGetListOIDBySplitID(t *testing.T) {
ctx := context.Background()
memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err)
treeService := NewTree(memCli, zaptest.NewLogger(t))
cidTest := cidtest.ID()
oidTest1 := oidtest.ID()
oidTest2 := oidtest.ID()
oidTest3 := oidtest.ID()
splitID := object.NewSplitID()
splitID.SetUUID(uuid.New())
_, err = treeService.AddPHYObject(ctx, cidTest, "/a/b", oidTest1, splitID)
require.NoError(t, err)
_, err = treeService.AddPHYObject(ctx, cidTest, "/a/b", oidTest2, splitID)
require.NoError(t, err)
_, err = treeService.AddPHYObject(ctx, cidTest, "/a/b", oidTest3, splitID)
require.NoError(t, err)
ids, err := treeService.GetListOIDBySplitID(ctx, cidTest, "/a/b", splitID)
require.NoError(t, err)
require.Equal(t, oidTest1, ids[0])
require.Equal(t, oidTest2, ids[1])
require.Equal(t, oidTest3, ids[2])
ids, err = treeService.GetListOIDBySplitID(ctx, cidTest, "/c/d", splitID)
require.NoError(t, err)
require.Equal(t, 0, len(ids))
}
func TestDeleteObject(t *testing.T) {
ctx := context.Background()
memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err)
treeService := NewTree(memCli, zaptest.NewLogger(t))
cidTest := cidtest.ID()
oidTest1 := oidtest.ID()
testSize := uint64(10)
nodeID1, err := treeService.AddObject(ctx, cidTest, "/a/b", oidTest1, testSize)
require.NoError(t, err)
err = treeService.DeleteObject(ctx, cidTest, nodeID1)
require.NoError(t, err)
_, err = treeService.GetObjectByPath(ctx, cidTest, "/a/b")
require.ErrorIs(t, err, ErrNodeNotFound)
err = treeService.DeleteObject(ctx, cidTest, nodeID1+1)
require.ErrorIs(t, err, ErrNodeNotFound)
}
func TestDeleteObjectsBySplitID(t *testing.T) {
ctx := context.Background()
memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err)
treeService := NewTree(memCli, zaptest.NewLogger(t))
cidTest := cidtest.ID()
oidTest1 := oidtest.ID()
oidTest2 := oidtest.ID()
oidTest3 := oidtest.ID()
splitID := object.NewSplitID()
splitID.SetUUID(uuid.New())
_, err = treeService.AddPHYObject(ctx, cidTest, "/a/b", oidTest1, splitID)
require.NoError(t, err)
_, err = treeService.AddPHYObject(ctx, cidTest, "/a/b", oidTest2, splitID)
require.NoError(t, err)
_, err = treeService.AddPHYObject(ctx, cidTest, "/a/b", oidTest3, splitID)
require.NoError(t, err)
ids, err := treeService.GetListOIDBySplitID(ctx, cidTest, "/a/b", splitID)
require.NoError(t, err)
require.Equal(t, 3, len(ids))
err = treeService.DeleteObjectsBySplitID(ctx, cidTest, "/a/b", splitID)
require.NoError(t, err)
ids, err = treeService.GetListOIDBySplitID(ctx, cidTest, "/a/b", splitID)
require.NoError(t, err)
require.Equal(t, 0, len(ids))
ids, err = treeService.GetListOIDBySplitID(ctx, cidTest, "/c/d", splitID)
require.NoError(t, err)
require.Equal(t, 0, len(ids))
}

View file

@ -0,0 +1,178 @@
package wrapper
import (
"context"
"errors"
"fmt"
"io"
"strings"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
dkirillov marked this conversation as resolved Outdated

It seems there isn't such package

It seems there isn't such package
"github.com/distribution/distribution/v3/registry/storage/driver/frostfs/tree"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type GetNodeByPathResponseInfoWrapper struct {
response *grpcService.GetNodeByPathResponse_Info
}
func (n GetNodeByPathResponseInfoWrapper) GetNodeID() uint64 {
return n.response.GetNodeId()
}
func (n GetNodeByPathResponseInfoWrapper) GetParentID() uint64 {
return n.response.GetParentId()
}
func (n GetNodeByPathResponseInfoWrapper) GetTimestamp() uint64 {
return n.response.GetTimestamp()
}
func (n GetNodeByPathResponseInfoWrapper) GetMeta() []tree.Meta {
res := make([]tree.Meta, len(n.response.Meta))
for i, value := range n.response.Meta {
res[i] = value
}
return res
}
type GetSubTreeResponseBodyWrapper struct {
response *grpcService.GetSubTreeResponse_Body
}
func (n GetSubTreeResponseBodyWrapper) GetNodeID() uint64 {
return n.response.GetNodeId()
}
func (n GetSubTreeResponseBodyWrapper) GetParentID() uint64 {
return n.response.GetParentId()
}
func (n GetSubTreeResponseBodyWrapper) GetTimestamp() uint64 {
return n.response.GetTimestamp()
}
func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta {
res := make([]tree.Meta, len(n.response.Meta))
for i, value := range n.response.Meta {
res[i] = value
}
return res
}
type PoolWrapper struct {
p *treepool.Pool
}
func NewPoolWrapper(p *treepool.Pool) *PoolWrapper {
return &PoolWrapper{p: p}
}
func (w *PoolWrapper) GetNodes(ctx context.Context, prm *tree.GetNodesParams) ([]tree.NodeResponse, error) {
poolPrm := treepool.GetNodesParams{
CID: prm.ContainerID,
TreeID: prm.TreeID,
Path: prm.Path,
Meta: prm.Meta,
PathAttribute: tree.FileNameKey,
LatestOnly: prm.LatestOnly,
AllAttrs: prm.AllAttrs,
}
nodes, err := w.p.GetNodes(ctx, poolPrm)
if err != nil {
return nil, handleError(err)
}
res := make([]tree.NodeResponse, len(nodes))
for i, info := range nodes {
res[i] = GetNodeByPathResponseInfoWrapper{info}
}
return res, nil
}
func (w *PoolWrapper) GetSubTree(ctx context.Context, containerID cid.ID, treeID string, rootID uint64, depth uint32) ([]tree.NodeResponse, error) {
poolPrm := treepool.GetSubTreeParams{
CID: containerID,
TreeID: treeID,
RootID: rootID,
Depth: depth,
}
subTreeReader, err := w.p.GetSubTree(ctx, poolPrm)
if err != nil {
return nil, handleError(err)
}
var subtree []tree.NodeResponse
node, err := subTreeReader.Next()
for err == nil {
subtree = append(subtree, GetSubTreeResponseBodyWrapper{node})
node, err = subTreeReader.Next()
}
if err != nil && err != io.EOF {
return nil, handleError(err)
}
return subtree, nil
}
func (w *PoolWrapper) AddNodeByPath(ctx context.Context, containerID cid.ID, treeID string, path []string, meta map[string]string) (uint64, error) {
nodeID, err := w.p.AddNodeByPath(ctx, treepool.AddNodeByPathParams{
CID: containerID,
TreeID: treeID,
Path: path,
Meta: meta,
PathAttribute: tree.FileNameKey,
})
return nodeID, handleError(err)
}
func (w *PoolWrapper) RemoveNode(ctx context.Context, containerID cid.ID, treeID string, nodeID uint64) error {
return handleError(w.p.RemoveNode(ctx, treepool.RemoveNodeParams{
CID: containerID,
TreeID: treeID,
NodeID: nodeID,
}))
}
func handleError(err error) error {
if err == nil {
return nil
}
if errors.Is(err, treepool.ErrNodeNotFound) {
return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
}
if errors.Is(err, treepool.ErrNodeAccessDenied) {
return fmt.Errorf("%w: %s", tree.ErrNodeAccessDenied, err.Error())
}
if isTimeoutError(err) {
return fmt.Errorf("%w: %s", tree.ErrGatewayTimeout, err.Error())
}
return err
}
func unwrapErr(err error) error {
unwrappedErr := errors.Unwrap(err)
for unwrappedErr != nil {
err = unwrappedErr
unwrappedErr = errors.Unwrap(err)
}
return err
}
func isTimeoutError(err error) bool {
if strings.Contains(err.Error(), "timeout") ||
errors.Is(err, context.DeadlineExceeded) {
return true
}
return status.Code(unwrapErr(err)) == codes.DeadlineExceeded
}