feature/integrate_tree_service #8
89
go.mod
|
@ -37,6 +37,11 @@ require (
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/benbjohnson/clock v1.1.0 // indirect
|
||||||
|
cloud.google.com/go v0.110.0 // indirect
|
||||||
|
cloud.google.com/go/compute v1.19.1 // indirect
|
||||||
|
cloud.google.com/go/compute/metadata v0.2.3 // indirect
|
||||||
|
cloud.google.com/go/iam v0.13.0 // indirect
|
||||||
dario.cat/mergo v1.0.0 // indirect
|
dario.cat/mergo v1.0.0 // indirect
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230802075510-964c3edb3f44 // indirect
|
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230802075510-964c3edb3f44 // indirect
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb // indirect
|
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb // indirect
|
||||||
|
@ -44,23 +49,47 @@ require (
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230825064515-46a214d065f8
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230825064515-46a214d065f8
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 // indirect
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1 // indirect
|
||||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
|
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
|
||||||
|
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
|
||||||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
|
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
|
||||||
|
github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 // indirect
|
||||||
github.com/Microsoft/go-winio v0.6.1 // indirect
|
github.com/Microsoft/go-winio v0.6.1 // indirect
|
||||||
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
|
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
|
||||||
|
github.com/beorn7/perks v1.0.1 // indirect
|
||||||
|
github.com/bitly/go-simplejson v0.5.0 // indirect
|
||||||
|
github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b // indirect
|
||||||
|
github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0 // indirect
|
||||||
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
|
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
|
||||||
|
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||||
github.com/containerd/containerd v1.7.3 // indirect
|
github.com/containerd/containerd v1.7.3 // indirect
|
||||||
github.com/cpuguy83/dockercfg v0.3.1 // indirect
|
github.com/cpuguy83/dockercfg v0.3.1 // indirect
|
||||||
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
|
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
|
||||||
|
github.com/cyphar/filepath-securejoin v0.2.3 // indirect
|
||||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
|
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
|
||||||
github.com/docker/distribution v2.8.2+incompatible // indirect
|
github.com/docker/distribution v2.8.2+incompatible // indirect
|
||||||
github.com/docker/docker v24.0.5+incompatible // indirect
|
github.com/docker/docker v24.0.5+incompatible // indirect
|
||||||
github.com/docker/go-connections v0.4.0 // indirect
|
github.com/docker/go-connections v0.4.0 // indirect
|
||||||
github.com/docker/go-units v0.5.0 // indirect
|
github.com/docker/go-units v0.5.0 // indirect
|
||||||
|
github.com/felixge/httpsnoop v1.0.1 // indirect
|
||||||
github.com/gogo/protobuf v1.3.2 // indirect
|
github.com/gogo/protobuf v1.3.2 // indirect
|
||||||
|
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
|
||||||
|
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||||
|
github.com/golang/protobuf v1.5.3 // indirect
|
||||||
|
github.com/google/go-cmp v0.5.9 // indirect
|
||||||
|
github.com/google/uuid v1.3.0
|
||||||
|
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
|
||||||
|
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
|
||||||
github.com/gorilla/websocket v1.5.0 // indirect
|
github.com/gorilla/websocket v1.5.0 // indirect
|
||||||
github.com/hashicorp/golang-lru v0.6.0 // indirect
|
github.com/hashicorp/golang-lru v0.6.0 // indirect
|
||||||
|
github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect
|
||||||
|
github.com/inconshreveable/mousetrap v1.0.1 // indirect
|
||||||
|
github.com/jmespath/go-jmespath v0.4.0 // indirect
|
||||||
|
github.com/kr/pretty v0.3.0 // indirect
|
||||||
|
github.com/kr/text v0.2.0 // indirect
|
||||||
|
github.com/kylelemons/godebug v1.1.0 // indirect
|
||||||
github.com/magiconair/properties v1.8.7 // indirect
|
github.com/magiconair/properties v1.8.7 // indirect
|
||||||
|
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
|
||||||
|
github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f // indirect
|
||||||
github.com/moby/patternmatcher v0.5.0 // indirect
|
github.com/moby/patternmatcher v0.5.0 // indirect
|
||||||
github.com/moby/sys/sequential v0.5.0 // indirect
|
github.com/moby/sys/sequential v0.5.0 // indirect
|
||||||
github.com/moby/term v0.5.0 // indirect
|
github.com/moby/term v0.5.0 // indirect
|
||||||
|
@ -70,63 +99,35 @@ require (
|
||||||
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20230615193820-9185820289ce // indirect
|
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20230615193820-9185820289ce // indirect
|
||||||
github.com/nspcc-dev/rfc6979 v0.2.0 // indirect
|
github.com/nspcc-dev/rfc6979 v0.2.0 // indirect
|
||||||
github.com/opencontainers/runc v1.1.5 // indirect
|
github.com/opencontainers/runc v1.1.5 // indirect
|
||||||
|
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||||
|
github.com/prometheus/client_golang v1.14.0 // indirect; updated to latest
|
||||||
|
github.com/prometheus/client_model v0.3.0 // indirect
|
||||||
|
github.com/prometheus/common v0.37.0 // indirect
|
||||||
|
github.com/prometheus/procfs v0.8.0 // indirect
|
||||||
github.com/rogpeppe/go-internal v1.8.1 // indirect
|
github.com/rogpeppe/go-internal v1.8.1 // indirect
|
||||||
github.com/russross/blackfriday/v2 v2.1.0 // indirect
|
github.com/russross/blackfriday/v2 v2.1.0 // indirect
|
||||||
|
github.com/spf13/pflag v1.0.5 // indirect
|
||||||
github.com/twmb/murmur3 v1.1.8 // indirect
|
github.com/twmb/murmur3 v1.1.8 // indirect
|
||||||
github.com/urfave/cli v1.22.12 // indirect
|
github.com/urfave/cli v1.22.12 // indirect
|
||||||
|
go.opencensus.io v0.24.0 // indirect
|
||||||
go.uber.org/atomic v1.10.0 // indirect
|
go.uber.org/atomic v1.10.0 // indirect
|
||||||
go.uber.org/multierr v1.11.0 // indirect
|
go.uber.org/multierr v1.11.0 // indirect
|
||||||
go.uber.org/zap v1.24.0 // indirect
|
go.uber.org/zap v1.24.0
|
||||||
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
|
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
|
||||||
golang.org/x/mod v0.9.0 // indirect
|
golang.org/x/mod v0.9.0 // indirect
|
||||||
|
golang.org/x/net v0.10.0 // indirect; updated for CVE-2022-27664, CVE-2022-41717
|
||||||
golang.org/x/sync v0.2.0 // indirect
|
golang.org/x/sync v0.2.0 // indirect
|
||||||
|
golang.org/x/sys v0.11.0 // indirect
|
||||||
|
golang.org/x/text v0.9.0 // indirect
|
||||||
golang.org/x/tools v0.7.0 // indirect
|
golang.org/x/tools v0.7.0 // indirect
|
||||||
|
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
|
||||||
|
google.golang.org/appengine v1.6.7 // indirect
|
||||||
|
google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54 // indirect
|
||||||
google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 // indirect
|
google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 // indirect
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
|
||||||
|
google.golang.org/grpc v1.57.0
|
||||||
|
google.golang.org/protobuf v1.30.0 // indirect
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
cloud.google.com/go v0.110.0 // indirect
|
|
||||||
cloud.google.com/go/compute v1.19.1 // indirect
|
|
||||||
cloud.google.com/go/compute/metadata v0.2.3 // indirect
|
|
||||||
cloud.google.com/go/iam v0.13.0 // indirect
|
|
||||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
|
|
||||||
github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 // indirect
|
|
||||||
github.com/beorn7/perks v1.0.1 // indirect
|
|
||||||
github.com/bitly/go-simplejson v0.5.0 // indirect
|
|
||||||
github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b // indirect
|
|
||||||
github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0 // indirect
|
|
||||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
|
||||||
github.com/cyphar/filepath-securejoin v0.2.3 // indirect
|
|
||||||
github.com/felixge/httpsnoop v1.0.1 // indirect
|
|
||||||
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
|
|
||||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
|
||||||
github.com/golang/protobuf v1.5.3 // indirect
|
|
||||||
github.com/google/go-cmp v0.5.9 // indirect
|
|
||||||
github.com/google/uuid v1.3.0
|
|
||||||
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
|
|
||||||
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
|
|
||||||
github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect
|
|
||||||
github.com/inconshreveable/mousetrap v1.0.1 // indirect
|
|
||||||
github.com/jmespath/go-jmespath v0.4.0 // indirect
|
|
||||||
github.com/kr/pretty v0.3.0 // indirect
|
|
||||||
github.com/kr/text v0.2.0 // indirect
|
|
||||||
github.com/kylelemons/godebug v1.1.0 // indirect
|
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
|
|
||||||
github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f // indirect
|
|
||||||
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
|
|
||||||
github.com/prometheus/client_golang v1.14.0 // indirect; updated to latest
|
|
||||||
github.com/prometheus/client_model v0.3.0 // indirect
|
|
||||||
github.com/prometheus/common v0.37.0 // indirect
|
|
||||||
github.com/prometheus/procfs v0.8.0 // indirect
|
|
||||||
github.com/spf13/pflag v1.0.5 // indirect
|
|
||||||
go.opencensus.io v0.24.0 // indirect
|
|
||||||
golang.org/x/net v0.10.0 // indirect; updated for CVE-2022-27664, CVE-2022-41717
|
|
||||||
golang.org/x/sys v0.11.0 // indirect
|
|
||||||
golang.org/x/text v0.9.0 // indirect
|
|
||||||
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
|
|
||||||
google.golang.org/appengine v1.6.7 // indirect
|
|
||||||
google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54 // indirect
|
|
||||||
google.golang.org/grpc v1.57.0 // indirect
|
|
||||||
google.golang.org/protobuf v1.30.0 // indirect
|
|
||||||
)
|
)
|
||||||
|
|
|
@ -83,7 +83,7 @@ func newSizeLimiterWriter(ctx context.Context, d *driver, path string, splitInfo
|
||||||
previous: formPreviousChain(splitInfo, parts),
|
previous: formPreviousChain(splitInfo, parts),
|
||||||
parentHashers: parentHashers,
|
parentHashers: parentHashers,
|
||||||
targetInit: func() transformer.ChunkedObjectWriter {
|
targetInit: func() transformer.ChunkedObjectWriter {
|
||||||
return d.newObjTarget()
|
return d.newObjTarget(path, splitInfo.SplitID())
|
||||||
},
|
},
|
||||||
parent: parent,
|
parent: parent,
|
||||||
}
|
}
|
||||||
|
@ -233,6 +233,11 @@ func (w *writer) release(withParent bool) (*transformer.AccessIdentifiers, error
|
||||||
if _, err = w.release(false); err != nil {
|
if _, err = w.release(false); err != nil {
|
||||||
return nil, fmt.Errorf("could not release linking object: %w", err)
|
return nil, fmt.Errorf("could not release linking object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = w.driver.treeService.DeleteObjectsBySplitID(w.ctx, w.driver.containerID, w.path, w.splitInfo.SplitID())
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not delete split objects: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return ids, nil
|
return ids, nil
|
||||||
|
@ -459,5 +464,10 @@ func (w *writer) deleteParts() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err := w.driver.treeService.DeleteObjectsBySplitID(w.ctx, w.driver.containerID, w.path, w.splitInfo.SplitID())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not delete split objects: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,9 +3,11 @@ package frostfs
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"regexp"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -18,12 +20,15 @@ import (
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
|
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||||
dcontext "github.com/distribution/distribution/v3/context"
|
dcontext "github.com/distribution/distribution/v3/context"
|
||||||
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
|
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
|
||||||
"github.com/distribution/distribution/v3/registry/storage/driver/base"
|
"github.com/distribution/distribution/v3/registry/storage/driver/base"
|
||||||
"github.com/distribution/distribution/v3/registry/storage/driver/factory"
|
"github.com/distribution/distribution/v3/registry/storage/driver/factory"
|
||||||
|
"github.com/distribution/distribution/v3/registry/storage/driver/frostfs/tree"
|
||||||
|
"github.com/distribution/distribution/v3/registry/storage/driver/frostfs/wrapper"
|
||||||
"github.com/nspcc-dev/neo-go/cli/flags"
|
"github.com/nspcc-dev/neo-go/cli/flags"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
||||||
)
|
)
|
||||||
|
@ -56,6 +61,11 @@ const (
|
||||||
defaultSessionExpirationDuration = 100 // in epoch
|
defaultSessionExpirationDuration = 100 // in epoch
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
filePathValidateRegex = regexp.MustCompile(`^/([^/]+/)*[^/]+$`)
|
||||||
|
prefixPathValidateRegex = regexp.MustCompile(`^/([^/]+/?)*$`)
|
||||||
|
)
|
||||||
|
|
||||||
// DriverParameters is a struct that encapsulates all of the driver parameters after all values have been set.
|
// DriverParameters is a struct that encapsulates all of the driver parameters after all values have been set.
|
||||||
type DriverParameters struct {
|
type DriverParameters struct {
|
||||||
ContainerID string
|
ContainerID string
|
||||||
|
@ -94,6 +104,7 @@ func (n *frostfsDriverFactory) Create(parameters map[string]interface{}) (storag
|
||||||
|
|
||||||
type driver struct {
|
type driver struct {
|
||||||
sdkPool *pool.Pool
|
sdkPool *pool.Pool
|
||||||
|
treeService *tree.Tree
|
||||||
owner *user.ID
|
owner *user.ID
|
||||||
key *ecdsa.PrivateKey
|
key *ecdsa.PrivateKey
|
||||||
containerID cid.ID
|
containerID cid.ID
|
||||||
|
@ -306,7 +317,7 @@ func New(params DriverParameters) (*Driver, error) {
|
||||||
var owner user.ID
|
var owner user.ID
|
||||||
user.IDFromKey(&owner, acc.PrivateKey().PrivateKey.PublicKey)
|
user.IDFromKey(&owner, acc.PrivateKey().PrivateKey.PublicKey)
|
||||||
|
|
||||||
sdkPool, err := createPool(ctx, acc, params)
|
sdkPool, treePool, err := createPool(ctx, acc, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't create sdk pool: %w", err)
|
return nil, fmt.Errorf("couldn't create sdk pool: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -321,8 +332,11 @@ func New(params DriverParameters) (*Driver, error) {
|
||||||
return nil, fmt.Errorf("couldn't get container id: %w", err)
|
return nil, fmt.Errorf("couldn't get container id: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
treeService := wrapper.NewPoolWrapper(treePool)
|
||||||
|
|
||||||
d := &driver{
|
d := &driver{
|
||||||
sdkPool: sdkPool,
|
sdkPool: sdkPool,
|
||||||
|
treeService: tree.NewTree(treeService, nil),
|
||||||
owner: &owner,
|
owner: &owner,
|
||||||
key: &acc.PrivateKey().PrivateKey,
|
key: &acc.PrivateKey().PrivateKey,
|
||||||
containerID: cnrID,
|
containerID: cnrID,
|
||||||
|
@ -376,28 +390,49 @@ func getContainerID(params DriverParameters) (cid.ID, error) {
|
||||||
return cnrID, nil
|
return cnrID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func createPool(ctx context.Context, acc *wallet.Account, param DriverParameters) (*pool.Pool, error) {
|
func createPool(ctx context.Context, acc *wallet.Account, param DriverParameters) (*pool.Pool, *treepool.Pool, error) {
|
||||||
var prm pool.InitParameters
|
var prm pool.InitParameters
|
||||||
|
var prmTree treepool.InitParameters
|
||||||
|
|
||||||
prm.SetKey(&acc.PrivateKey().PrivateKey)
|
prm.SetKey(&acc.PrivateKey().PrivateKey)
|
||||||
|
prmTree.SetKey(acc.PrivateKey())
|
||||||
|
|
||||||
prm.SetNodeDialTimeout(param.ConnectionTimeout)
|
prm.SetNodeDialTimeout(param.ConnectionTimeout)
|
||||||
|
prmTree.SetNodeDialTimeout(param.ConnectionTimeout)
|
||||||
|
|
||||||
prm.SetHealthcheckTimeout(param.RequestTimeout)
|
prm.SetHealthcheckTimeout(param.RequestTimeout)
|
||||||
|
prmTree.SetHealthcheckTimeout(param.RequestTimeout)
|
||||||
|
|
||||||
prm.SetClientRebalanceInterval(param.RebalanceInterval)
|
prm.SetClientRebalanceInterval(param.RebalanceInterval)
|
||||||
|
prmTree.SetClientRebalanceInterval(param.RebalanceInterval)
|
||||||
|
|
||||||
prm.SetSessionExpirationDuration(param.SessionExpirationDuration)
|
prm.SetSessionExpirationDuration(param.SessionExpirationDuration)
|
||||||
|
|
||||||
for _, peer := range param.Peers {
|
for _, peer := range param.Peers {
|
||||||
prm.AddNode(pool.NewNodeParam(peer.Priority, peer.Address, peer.Weight))
|
nodeParam := pool.NewNodeParam(peer.Priority, peer.Address, peer.Weight)
|
||||||
|
prm.AddNode(nodeParam)
|
||||||
|
prmTree.AddNode(nodeParam)
|
||||||
}
|
}
|
||||||
|
|
||||||
p, err := pool.NewPool(prm)
|
p, err := pool.NewPool(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("create pool: %w", err)
|
return nil, nil, fmt.Errorf("create pool: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = p.Dial(ctx); err != nil {
|
if err = p.Dial(ctx); err != nil {
|
||||||
return nil, fmt.Errorf("dial pool: %w", err)
|
return nil, nil, fmt.Errorf("dial pool: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return p, nil
|
treePool, err := treepool.NewPool(prmTree)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("create tree pool: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = treePool.Dial(ctx); err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("dial tree pool: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return p, treePool, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func createNnsResolver(params DriverParameters) (*resolver.NNS, error) {
|
func createNnsResolver(params DriverParameters) (*resolver.NNS, error) {
|
||||||
|
@ -468,23 +503,34 @@ func (d *driver) Name() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
|
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
|
||||||
id, err := d.searchOne(ctx, path)
|
if ok := validateFilePath(path); !ok {
|
||||||
|
return nil, storagedriver.InvalidPathError{Path: path}
|
||||||
|
}
|
||||||
|
|
||||||
|
treeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
if errors.Is(err, tree.ErrNodeNotFound) {
|
||||||
|
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("couldn't get object from tree: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var prm pool.PrmObjectGet
|
var prm pool.PrmObjectGet
|
||||||
prm.SetAddress(d.objectAddress(id))
|
prm.SetAddress(d.objectAddress(treeObj.ObjID))
|
||||||
|
|
||||||
obj, err := d.sdkPool.GetObject(ctx, prm)
|
obj, err := d.sdkPool.GetObject(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't get object '%s': %w", id, err)
|
return nil, fmt.Errorf("couldn't get object '%s': %w", treeObj.ObjID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return io.ReadAll(obj.Payload)
|
return io.ReadAll(obj.Payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) PutContent(ctx context.Context, path string, content []byte) error {
|
func (d *driver) PutContent(ctx context.Context, path string, content []byte) error {
|
||||||
|
if ok := validateFilePath(path); !ok {
|
||||||
|
return storagedriver.InvalidPathError{Path: path}
|
||||||
|
}
|
||||||
|
|
||||||
if err := d.Delete(ctx, path); err != nil {
|
if err := d.Delete(ctx, path); err != nil {
|
||||||
return fmt.Errorf("couldn't delete '%s': %s", path, err)
|
return fmt.Errorf("couldn't delete '%s': %s", path, err)
|
||||||
}
|
}
|
||||||
|
@ -495,34 +541,35 @@ func (d *driver) PutContent(ctx context.Context, path string, content []byte) er
|
||||||
var prm pool.PrmObjectPut
|
var prm pool.PrmObjectPut
|
||||||
prm.SetHeader(*obj)
|
prm.SetHeader(*obj)
|
||||||
|
|
||||||
if _, err := d.sdkPool.PutObject(ctx, prm); err != nil {
|
id, err := d.sdkPool.PutObject(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
return fmt.Errorf("couldn't put object '%s': %w", path, err)
|
return fmt.Errorf("couldn't put object '%s': %w", path, err)
|
||||||
}
|
}
|
||||||
|
_, err = d.treeService.AddObject(ctx, d.containerID, path, id, uint64(len(content)))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't add object to tree: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
|
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
|
||||||
id, err := d.searchOne(ctx, path)
|
if ok := validateFilePath(path); !ok {
|
||||||
|
return nil, storagedriver.InvalidPathError{Path: path}
|
||||||
|
}
|
||||||
|
|
||||||
|
treeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("couldn't get object from tree: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
addr := d.objectAddress(id)
|
addr := d.objectAddress(treeObj.ObjID)
|
||||||
|
|
||||||
var prmHead pool.PrmObjectHead
|
if uint64(offset) >= treeObj.PayloadSize {
|
||||||
prmHead.SetAddress(addr)
|
return nil, fmt.Errorf("invalid offset %d for object length %d", offset, treeObj.PayloadSize)
|
||||||
|
|
||||||
obj, err := d.sdkPool.HeadObject(ctx, prmHead)
|
|
||||||
alexvanin marked this conversation as resolved
Outdated
|
|||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("couldn't head object '%s', id '%s': %w", path, id, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if uint64(offset) >= obj.PayloadSize() {
|
length := treeObj.PayloadSize - uint64(offset)
|
||||||
return nil, fmt.Errorf("invalid offset %d for object length %d", offset, obj.PayloadSize())
|
|
||||||
}
|
|
||||||
|
|
||||||
length := obj.PayloadSize() - uint64(offset)
|
|
||||||
|
|
||||||
var prmRange pool.PrmObjectRange
|
var prmRange pool.PrmObjectRange
|
||||||
prmRange.SetAddress(addr)
|
prmRange.SetAddress(addr)
|
||||||
|
@ -532,7 +579,7 @@ func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.Read
|
||||||
res, err := d.sdkPool.ObjectRange(ctx, prmRange)
|
res, err := d.sdkPool.ObjectRange(ctx, prmRange)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't get payload range of object '%s', offset %d, length %d, id '%s': %w",
|
return nil, fmt.Errorf("couldn't get payload range of object '%s', offset %d, length %d, id '%s': %w",
|
||||||
path, offset, length, id, err)
|
path, offset, length, treeObj.ObjID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &res, nil
|
return &res, nil
|
||||||
|
@ -543,6 +590,10 @@ func getUploadUUID(ctx context.Context) (uuid string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
|
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
|
||||||
|
if ok := validateFilePath(path); !ok {
|
||||||
|
return nil, storagedriver.InvalidPathError{Path: path}
|
||||||
|
}
|
||||||
|
|
||||||
splitID := object.NewSplitID()
|
splitID := object.NewSplitID()
|
||||||
uploadUUID := getUploadUUID(ctx)
|
uploadUUID := getUploadUUID(ctx)
|
||||||
if err := splitID.Parse(uploadUUID); err != nil {
|
if err := splitID.Parse(uploadUUID); err != nil {
|
||||||
|
@ -591,56 +642,42 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo,
|
||||||
return newFileInfoDir(path), nil
|
return newFileInfoDir(path), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
id, ok, err := d.searchOneBase(ctx, path, true)
|
if ok := validatePrefixPath(path); !ok {
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Methods Methods `d.searchOneBase` and `d.searchOne` are not used anymore. Can be dropped.
|
|||||||
|
return nil, storagedriver.InvalidPathError{Path: path}
|
||||||
|
}
|
||||||
|
path = preparePrefixPath(path)
|
||||||
|
|
||||||
|
treeObj, err := d.treeService.GetObjectByPathDir(ctx, d.containerID, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
if errors.Is(err, tree.ErrNodeNotFound) {
|
||||||
|
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
|
||||||
|
} else if errors.Is(err, tree.ErrOnlyDirFound) {
|
||||||
|
return newFileInfoDir(path), nil
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("couldn't get object from tree: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// assume there is not object with directory name
|
fileInfo := newFileInfo(*treeObj, "")
|
||||||
// e.g. if file '/a/b/c' exists, files '/a/b' and '/a' don't
|
|
||||||
if !ok {
|
|
||||||
return newFileInfoDir(path), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var prm pool.PrmObjectHead
|
|
||||||
prm.SetAddress(d.objectAddress(id))
|
|
||||||
|
|
||||||
obj, err := d.sdkPool.HeadObject(ctx, prm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("couldn't get head object '%s': %w", id, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fileInfo := newFileInfo(ctx, obj, "")
|
|
||||||
|
|
||||||
// e.g. search '/a/b' but because of prefix search we found '/a/b/c'
|
|
||||||
// so we should return directory
|
|
||||||
if fileInfo.Path() != path {
|
|
||||||
return newFileInfoDir(path), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return fileInfo, nil
|
return fileInfo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) List(ctx context.Context, path string) ([]string, error) {
|
func (d *driver) List(ctx context.Context, path string) ([]string, error) {
|
||||||
ids, err := d.searchByPrefix(ctx, path)
|
if ok := validatePrefixPath(path); !ok {
|
||||||
|
return nil, storagedriver.InvalidPathError{Path: path}
|
||||||
|
}
|
||||||
|
path = preparePrefixPath(path)
|
||||||
|
|
||||||
|
treeObjList, err := d.treeService.GetListObjectByPrefix(ctx, d.containerID, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't search by prefix '%s': %w", path, err)
|
return nil, fmt.Errorf("couldn't get object from tree by prefix '%s': %w", path, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
added := make(map[string]bool)
|
added := make(map[string]bool)
|
||||||
|
|
||||||
result := make([]string, 0, len(ids))
|
result := make([]string, 0, len(treeObjList))
|
||||||
for _, id := range ids {
|
for _, treeObj := range treeObjList {
|
||||||
var prm pool.PrmObjectHead
|
fileInf := newFileInfo(treeObj, path)
|
||||||
prm.SetAddress(d.objectAddress(id))
|
|
||||||
|
|
||||||
obj, err := d.sdkPool.HeadObject(ctx, prm)
|
|
||||||
if err != nil {
|
|
||||||
dcontext.GetLogger(ctx).Warnf("couldn't get list object '%s' in path '%s': %s", id, path, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
fileInf := newFileInfo(ctx, obj, path)
|
|
||||||
if !added[fileInf.Path()] {
|
if !added[fileInf.Path()] {
|
||||||
result = append(result, fileInf.Path())
|
result = append(result, fileInf.Path())
|
||||||
added[fileInf.Path()] = true
|
added[fileInf.Path()] = true
|
||||||
|
@ -652,106 +689,64 @@ func (d *driver) List(ctx context.Context, path string) ([]string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
|
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
|
||||||
alexvanin marked this conversation as resolved
Outdated
alexvanin
commented
As far as I see, move operation does not modify object payload. I guess it is sufficient to change tree node in tree service and do not fetch and reupload object into storage. Is it correct? As far as I see, move operation does not modify object payload. I guess it is sufficient to change tree node in tree service and do not fetch and reupload object into storage. Is it correct?
dkirillov
commented
Currently we store filepath in object attributes, so we have to reupload object to change this attribute Currently we store filepath in object attributes, so we have to reupload object to change this attribute
alexvanin
commented
Aside of restoring tree in case of pilorama faillure on all container nodes, do we need filepath attribute in the object? Aside of restoring tree in case of pilorama faillure on all container nodes, do we need filepath attribute in the object?
dkirillov
commented
Probably not Probably not
alexvanin
commented
We've decided to keep filepath attribute in the object in case of global tree failures. At the same time, We've decided to keep filepath attribute in the object in case of global tree failures. At the same time, `Move` operation will not update filepath attribute, therefore will not reupload object.
|
|||||||
sourceID, err := d.searchOne(ctx, sourcePath)
|
if ok := validateFilePath(sourcePath); !ok {
|
||||||
|
return storagedriver.InvalidPathError{Path: sourcePath}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ok := validateFilePath(destPath); !ok {
|
||||||
|
return storagedriver.InvalidPathError{Path: destPath}
|
||||||
|
}
|
||||||
|
|
||||||
|
sourceTreeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, sourcePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("couldn't get object from tree: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = d.Delete(ctx, destPath); err != nil {
|
_, err = d.treeService.AddObject(ctx, d.containerID, destPath, sourceTreeObj.ObjID, sourceTreeObj.PayloadSize)
|
||||||
return fmt.Errorf("couldn't delete '%s' object: %w", destPath, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
sourceAddr := d.objectAddress(sourceID)
|
|
||||||
|
|
||||||
var prmGet pool.PrmObjectGet
|
|
||||||
prmGet.SetAddress(sourceAddr)
|
|
||||||
|
|
||||||
obj, err := d.sdkPool.GetObject(ctx, prmGet)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not get source object '%s' by oid '%s': %w", sourcePath, sourceID, err)
|
return fmt.Errorf("can't add object to tree: %w", err)
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
if err = obj.Payload.Close(); err != nil {
|
|
||||||
dcontext.GetLogger(ctx).Errorf("couldn't close object payload reader, path '%s' by oid '%s': %s",
|
|
||||||
sourcePath, sourceID, err.Error())
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
objHeader := d.formObject(destPath)
|
|
||||||
var prmPut pool.PrmObjectPut
|
|
||||||
prmPut.SetHeader(*objHeader)
|
|
||||||
prmPut.SetPayload(obj.Payload)
|
|
||||||
|
|
||||||
if _, err = d.sdkPool.PutObject(ctx, prmPut); err != nil {
|
|
||||||
return fmt.Errorf("couldn't put object '%s': %w", destPath, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var prmDelete pool.PrmObjectDelete
|
if err = d.treeService.DeleteObject(ctx, d.containerID, sourceTreeObj.ID); err != nil {
|
||||||
prmDelete.SetAddress(sourceAddr)
|
return fmt.Errorf("couldn't delete object from tree: %w", err)
|
||||||
|
|
||||||
if err = d.sdkPool.DeleteObject(ctx, prmDelete); err != nil {
|
|
||||||
return fmt.Errorf("couldn't remove source file '%s', id '%s': %w", sourcePath, sourceID, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) Delete(ctx context.Context, path string) error {
|
func (d *driver) Delete(ctx context.Context, path string) error {
|
||||||
filters := object.NewSearchFilters()
|
if ok := validatePrefixPath(path); !ok {
|
||||||
filters.AddRootFilter()
|
return storagedriver.InvalidPathError{Path: path}
|
||||||
filters.AddFilter(attributeFilePath, path, object.MatchCommonPrefix)
|
}
|
||||||
|
path = preparePrefixPath(path)
|
||||||
|
|
||||||
var prmSearch pool.PrmObjectSearch
|
treeObjList, err := d.treeService.GetListObjectByPrefix(ctx, d.containerID, path)
|
||||||
prmSearch.SetContainerID(d.containerID)
|
|
||||||
prmSearch.SetFilters(filters)
|
|
||||||
|
|
||||||
res, err := d.sdkPool.SearchObjects(ctx, prmSearch)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("init searching using client: %w", err)
|
return fmt.Errorf("couldn't get object from tree by prefix '%s': %w", path, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
defer res.Close()
|
rootTreeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, path)
|
||||||
|
|
||||||
var inErr error
|
|
||||||
err = res.Iterate(func(id oid.ID) bool {
|
|
||||||
// Check if a key is a subpath (so that deleting "/a" does not delete "/ab").
|
|
||||||
isSubPath, err := d.checkIsSubPath(ctx, path, id)
|
|
||||||
if err != nil {
|
|
||||||
inErr = err
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
if isSubPath {
|
|
||||||
if err = d.delete(ctx, id); err != nil {
|
|
||||||
inErr = fmt.Errorf("couldn't delete object by path '%s': %w", path, err)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
if err == nil {
|
|
||||||
err = inErr
|
|
||||||
}
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("iterate objects: %w", err)
|
if !errors.Is(err, tree.ErrNodeNotFound) {
|
||||||
|
return fmt.Errorf("couldn't get object from tree: %w", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
treeObjList = append(treeObjList, *rootTreeObj)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, treeObj := range treeObjList {
|
||||||
|
if err := d.delete(ctx, treeObj.ObjID); err != nil {
|
||||||
|
return fmt.Errorf("error in delete object oid: %s : %w", treeObj.ObjID, err)
|
||||||
|
}
|
||||||
|
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
I'm not sure this is correct because we get all object and I'm not sure this is correct because we get all object and `deleting "/a" does delete "/ab"` but must not.
It's better to get all nodes by the exact names and list sub tree if one of that node is intermediate.
r.loginov
commented
Not quite. In this case, when requesting deletion with the path Not quite. In this case, when requesting deletion with the path `/a`, the file `/ab` will not be deleted, because:
`GetListObjectByPrefix` will first find the root node (intermediate) and then take a subtree with no depth limit. Then a separate request is made to get a node for the `/a` file via `getObjectByPath` (in which `GetNodes` is called along the `/a` path). As a result, `/ab` will not be in the final list. But if, despite this, it is better to use your approach, no problem.
dkirillov
commented
Please check the following test:
Please check the following test:
```diff
diff --git a/registry/storage/driver/frostfs/frostfs.go b/registry/storage/driver/frostfs/frostfs.go
index 2d899ff9..ac32f6ad 100644
--- a/registry/storage/driver/frostfs/frostfs.go
+++ b/registry/storage/driver/frostfs/frostfs.go
@@ -17,6 +17,7 @@ import (
resolver "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+ oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
@@ -527,11 +528,7 @@ func (d *driver) PutContent(ctx context.Context, path string, content []byte) er
var prm pool.PrmObjectPut
prm.SetHeader(*obj)
- id, err := d.sdkPool.PutObject(ctx, prm)
- if err != nil {
- return fmt.Errorf("couldn't put object '%s': %w", path, err)
- }
- _, err = d.treeService.AddObject(ctx, d.containerID, path, id, uint64(len(content)))
+ _, err := d.treeService.AddObject(ctx, d.containerID, path, oidtest.ID(), uint64(len(content)))
if err != nil {
return fmt.Errorf("can't add object to tree: %w", err)
}
@@ -690,10 +687,6 @@ func (d *driver) Delete(ctx context.Context, path string) error {
}
for _, treeObj := range treeObjList {
- if err := d.delete(ctx, treeObj.ObjID); err != nil {
- return fmt.Errorf("error in delete object oid: %s : %w", treeObj.ObjID, err)
- }
-
if err = d.treeService.DeleteObject(ctx, d.containerID, treeObj.ID); err != nil {
return fmt.Errorf("couldn't delete object from tree: %w", err)
}
diff --git a/registry/storage/driver/frostfs/frostfs_test.go b/registry/storage/driver/frostfs/frostfs_test.go
index 201c97af..ed8fc934 100644
--- a/registry/storage/driver/frostfs/frostfs_test.go
+++ b/registry/storage/driver/frostfs/frostfs_test.go
@@ -12,16 +12,19 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
+ cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
+ "github.com/distribution/distribution/v3/registry/storage/driver/frostfs/tree"
"github.com/google/uuid"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
+ "go.uber.org/zap/zaptest"
)
const (
@@ -44,6 +47,42 @@ func params(walletPath string, containerID cid.ID) map[string]interface{} {
}
}
+func TestDelete(t *testing.T) {
+ ctx := context.Background()
+
+ key, err := keys.NewPrivateKeyFromHex("1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb")
+ require.NoError(t, err)
+
+ var owner user.ID
+ user.IDFromKey(&owner, key.PrivateKey.PublicKey)
+
+ log := zaptest.NewLogger(t)
+
+ treeCli, err := tree.NewTreeServiceClientMemory()
+ require.NoError(t, err)
+
+ d := driver{
+ treeService: tree.NewTree(treeCli, log),
+ owner: &owner,
+ key: &key.PrivateKey,
+ containerID: cidtest.ID(),
+ }
+
+ err = d.PutContent(ctx, "a", []byte("test"))
+ require.NoError(t, err)
+
+ err = d.PutContent(ctx, "ab", []byte("test"))
+ require.NoError(t, err)
+
+ err = d.Delete(ctx, "a")
+ require.NoError(t, err)
+
+ res, err := d.List(ctx, "")
+ require.NoError(t, err)
+ fmt.Println(res)
+ require.Contains(t, res, "ab")
+}
+
func TestIntegration(t *testing.T) {
f, err := os.CreateTemp("", "wallet")
require.NoError(t, err)
diff --git a/registry/storage/driver/frostfs/tree/tree.go b/registry/storage/driver/frostfs/tree/tree.go
index a8e18337..0cccb787 100644
--- a/registry/storage/driver/frostfs/tree/tree.go
+++ b/registry/storage/driver/frostfs/tree/tree.go
@@ -196,6 +196,9 @@ func (c *Tree) GetListObjectByPrefix(ctx context.Context, containerID cid.ID, pr
nodes := make([]NodeResponse, 0)
for _, node := range subTree {
+ if node.GetNodeID() == 0 {
+ continue
+ }
if !isIntermediate(node) {
nodes = append(nodes, node)
}
```
r.loginov
commented
This test will not pass due to the fact that paths must start with a slash (distribution, when calling driver methods, always passes paths starting with a slash). But now I have added path validation to the driver methods. I also made a change regarding skipping the root node in An example of a test that works and considers the situation you described above:
This test will not pass due to the fact that paths must start with a slash (distribution, when calling driver methods, always passes paths starting with a slash). But now I have added path validation to the driver methods. I also made a change regarding skipping the root node in `GetListObjectByPrefix`.
An example of a test that works and considers the situation you described above:
```
func TestDelete(t *testing.T) {
ctx := context.Background()
key, err := keys.NewPrivateKeyFromHex("1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb")
require.NoError(t, err)
var owner user.ID
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
treeCli, err := tree.NewTreeServiceClientMemory()
require.NoError(t, err)
d := &driver{
treeService: tree.NewTree(treeCli, nil),
owner: &owner,
key: &key.PrivateKey,
containerID: cidtest.ID(),
}
err = d.PutContent(ctx, "/a", []byte("test"))
require.NoError(t, err)
err = d.PutContent(ctx, "/ab", []byte("test"))
require.NoError(t, err)
err = d.Delete(ctx, "/a")
require.NoError(t, err)
res, err := d.List(ctx, "/")
require.NoError(t, err)
require.Contains(t, res, "/ab")
}
```
|
|||||||
|
if err = d.treeService.DeleteObject(ctx, d.containerID, treeObj.ID); err != nil {
|
||||||
|
return fmt.Errorf("couldn't delete object from tree: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) checkIsSubPath(ctx context.Context, path string, id oid.ID) (bool, error) {
|
|
||||||
var prmHead pool.PrmObjectHead
|
|
||||||
prmHead.SetAddress(d.objectAddress(id))
|
|
||||||
|
|
||||||
obj, err := d.sdkPool.HeadObject(ctx, prmHead)
|
|
||||||
if err != nil {
|
|
||||||
return false, fmt.Errorf("couldn't head object part '%s', id '%s': %w", path, id, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fileInf := newFileInfo(ctx, obj, "")
|
|
||||||
return fileInf.Path() <= path || fileInf.Path()[len(path)] == '/', nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *driver) delete(ctx context.Context, id oid.ID) error {
|
func (d *driver) delete(ctx context.Context, id oid.ID) error {
|
||||||
var prm pool.PrmObjectDelete
|
var prm pool.PrmObjectDelete
|
||||||
prm.SetAddress(d.objectAddress(id))
|
prm.SetAddress(d.objectAddress(id))
|
||||||
|
@ -770,166 +765,47 @@ func (d *driver) Walk(ctx context.Context, path string, fn storagedriver.WalkFn)
|
||||||
return storagedriver.WalkFallback(ctx, d, path, fn)
|
return storagedriver.WalkFallback(ctx, d, path, fn)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) searchByPrefix(ctx context.Context, prefix string) ([]oid.ID, error) {
|
|
||||||
filters := object.NewSearchFilters()
|
|
||||||
filters.AddRootFilter()
|
|
||||||
filters.AddFilter(attributeFilePath, prefix, object.MatchCommonPrefix)
|
|
||||||
|
|
||||||
return d.baseSearch(ctx, filters)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *driver) headSplitParts(ctx context.Context, splitID *object.SplitID, path string, isAppend bool) ([]*object.Object, map[oid.ID]struct{}, error) {
|
func (d *driver) headSplitParts(ctx context.Context, splitID *object.SplitID, path string, isAppend bool) ([]*object.Object, map[oid.ID]struct{}, error) {
|
||||||
filters := object.NewSearchFilters()
|
ids, err := d.treeService.GetListOIDBySplitID(ctx, d.containerID, path, splitID)
|
||||||
filters.AddPhyFilter()
|
|
||||||
filters.AddSplitIDFilter(object.MatchStringEqual, splitID)
|
|
||||||
|
|
||||||
var prm pool.PrmObjectSearch
|
|
||||||
prm.SetContainerID(d.containerID)
|
|
||||||
prm.SetFilters(filters)
|
|
||||||
|
|
||||||
res, err := d.sdkPool.SearchObjects(ctx, prm)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("init searching using client: %w", err)
|
return nil, nil, fmt.Errorf("couldn't get object from tree by split id '%s': %w", path, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
defer res.Close()
|
var (
|
||||||
|
addr oid.Address
|
||||||
|
prmHead pool.PrmObjectHead
|
||||||
|
objects []*object.Object
|
||||||
|
)
|
||||||
|
|
||||||
var addr oid.Address
|
|
||||||
addr.SetContainer(d.containerID)
|
addr.SetContainer(d.containerID)
|
||||||
|
|
||||||
var inErr error
|
|
||||||
var prmHead pool.PrmObjectHead
|
|
||||||
|
|
||||||
var objects []*object.Object
|
|
||||||
noChild := make(map[oid.ID]struct{})
|
noChild := make(map[oid.ID]struct{})
|
||||||
|
|
||||||
err = res.Iterate(func(id oid.ID) bool {
|
for _, objID := range ids {
|
||||||
addr.SetObject(id)
|
addr.SetObject(objID)
|
||||||
prmHead.SetAddress(addr)
|
prmHead.SetAddress(addr)
|
||||||
|
|
||||||
obj, err := d.sdkPool.HeadObject(ctx, prmHead)
|
obj, err := d.sdkPool.HeadObject(ctx, prmHead)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
inErr = fmt.Errorf("couldn't head object part '%s', id '%s', splitID '%s': %w", path, id, splitID, err)
|
return nil, nil, fmt.Errorf("couldn't head object part '%s', id '%s', splitID '%s': %w", path, objID, splitID, err)
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if isAppend {
|
if isAppend {
|
||||||
objects = append(objects, &obj)
|
objects = append(objects, &obj)
|
||||||
noChild[id] = struct{}{}
|
noChild[objID] = struct{}{}
|
||||||
return false
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
inErr = fmt.Errorf("init upload part '%s' already exist, splitID '%s'", path, splitID)
|
return nil, nil, fmt.Errorf("init upload part '%s' already exist, splitID '%s'", path, splitID)
|
||||||
return true
|
|
||||||
})
|
|
||||||
if err == nil {
|
|
||||||
err = inErr
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("iterate objects: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return objects, noChild, nil
|
return objects, noChild, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) baseSearch(ctx context.Context, filters object.SearchFilters) ([]oid.ID, error) {
|
func newFileInfo(treeObj tree.TreeNode, prefix string) storagedriver.FileInfo {
|
||||||
var prm pool.PrmObjectSearch
|
|
||||||
prm.SetContainerID(d.containerID)
|
|
||||||
prm.SetFilters(filters)
|
|
||||||
|
|
||||||
res, err := d.sdkPool.SearchObjects(ctx, prm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("init searching using client: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
defer res.Close()
|
|
||||||
|
|
||||||
var buf []oid.ID
|
|
||||||
|
|
||||||
err = res.Iterate(func(id oid.ID) bool {
|
|
||||||
buf = append(buf, id)
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("iterate objects: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return buf, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *driver) searchOne(ctx context.Context, path string) (oid.ID, error) {
|
|
||||||
id, ok, err := d.searchOneBase(ctx, path, false)
|
|
||||||
if err != nil {
|
|
||||||
return oid.ID{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if !ok {
|
|
||||||
return oid.ID{}, fmt.Errorf("found more than one object by path '%s'", path)
|
|
||||||
}
|
|
||||||
|
|
||||||
return id, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *driver) searchOneBase(ctx context.Context, path string, byPrefix bool) (resID oid.ID, ok bool, err error) {
|
|
||||||
filters := object.NewSearchFilters()
|
|
||||||
filters.AddRootFilter()
|
|
||||||
if byPrefix {
|
|
||||||
filters.AddFilter(attributeFilePath, path, object.MatchCommonPrefix)
|
|
||||||
} else {
|
|
||||||
filters.AddFilter(attributeFilePath, path, object.MatchStringEqual)
|
|
||||||
}
|
|
||||||
|
|
||||||
var prm pool.PrmObjectSearch
|
|
||||||
prm.SetContainerID(d.containerID)
|
|
||||||
prm.SetFilters(filters)
|
|
||||||
|
|
||||||
res, err := d.sdkPool.SearchObjects(ctx, prm)
|
|
||||||
if err != nil {
|
|
||||||
return oid.ID{}, false, fmt.Errorf("init searching using client: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
defer res.Close()
|
|
||||||
|
|
||||||
var found bool
|
|
||||||
err = res.Iterate(func(id oid.ID) bool {
|
|
||||||
if found {
|
|
||||||
ok = false
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
found = true
|
|
||||||
resID = id
|
|
||||||
ok = true
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return oid.ID{}, false, fmt.Errorf("iterate objects by path '%s': %w", path, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !found {
|
|
||||||
return oid.ID{}, false, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
|
|
||||||
}
|
|
||||||
|
|
||||||
return resID, ok, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func newFileInfo(ctx context.Context, obj object.Object, prefix string) storagedriver.FileInfo {
|
|
||||||
fileInfoFields := storagedriver.FileInfoFields{
|
fileInfoFields := storagedriver.FileInfoFields{
|
||||||
Size: int64(obj.PayloadSize()),
|
Path: treeObj.FullPath,
|
||||||
}
|
Size: int64(treeObj.PayloadSize),
|
||||||
|
ModTime: treeObj.ModificationTime,
|
||||||
for _, attr := range obj.Attributes() {
|
|
||||||
switch attr.Key() {
|
|
||||||
case attributeFilePath:
|
|
||||||
fileInfoFields.Path = attr.Value()
|
|
||||||
case object.AttributeTimestamp:
|
|
||||||
timestamp, err := strconv.ParseInt(attr.Value(), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
objID, _ := obj.ID()
|
|
||||||
dcontext.GetLogger(ctx).Warnf("object '%s' has invalid timestamp '%s'", objID.EncodeToString(), attr.Value())
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
fileInfoFields.ModTime = time.Unix(timestamp, 0)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(prefix) > 0 {
|
if len(prefix) > 0 {
|
||||||
|
@ -956,18 +832,39 @@ func newFileInfoDir(path string) storagedriver.FileInfo {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) newObjTarget() transformer.ChunkedObjectWriter {
|
func validatePrefixPath(path string) bool {
|
||||||
|
return prefixPathValidateRegex.MatchString(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateFilePath(path string) bool {
|
||||||
|
return filePathValidateRegex.MatchString(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
func preparePrefixPath(path string) string {
|
||||||
|
if strings.HasSuffix(path, "/") {
|
||||||
|
return path[:len(path)-1]
|
||||||
|
}
|
||||||
|
return path
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *driver) newObjTarget(path string, splitID *object.SplitID) transformer.ChunkedObjectWriter {
|
||||||
return &objTarget{
|
return &objTarget{
|
||||||
sdkPool: d.sdkPool,
|
sdkPool: d.sdkPool,
|
||||||
key: d.key,
|
treeService: d.treeService,
|
||||||
|
splitID: splitID,
|
||||||
|
key: d.key,
|
||||||
|
path: path,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type objTarget struct {
|
type objTarget struct {
|
||||||
sdkPool *pool.Pool
|
sdkPool *pool.Pool
|
||||||
key *ecdsa.PrivateKey
|
treeService *tree.Tree
|
||||||
obj *object.Object
|
path string
|
||||||
chunks [][]byte
|
splitID *object.SplitID
|
||||||
|
key *ecdsa.PrivateKey
|
||||||
|
obj *object.Object
|
||||||
|
chunks [][]byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *objTarget) WriteHeader(_ context.Context, obj *object.Object) error {
|
func (t *objTarget) WriteHeader(_ context.Context, obj *object.Object) error {
|
||||||
|
@ -999,11 +896,14 @@ func (t *objTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers,
|
||||||
t.obj.SetCreationEpoch(currEpoch)
|
t.obj.SetCreationEpoch(currEpoch)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
parID oid.ID
|
parID oid.ID
|
||||||
parHdr *object.Object
|
parHdr *object.Object
|
||||||
|
isCommit bool
|
||||||
|
sizeFinalObject uint64
|
||||||
)
|
)
|
||||||
|
|
||||||
if par := t.obj.Parent(); par != nil && par.Signature() == nil {
|
if par := t.obj.Parent(); par != nil && par.Signature() == nil {
|
||||||
|
isCommit = true
|
||||||
objPar := object.NewFromV2(par.ToV2())
|
objPar := object.NewFromV2(par.ToV2())
|
||||||
|
|
||||||
objPar.SetCreationEpoch(currEpoch)
|
objPar.SetCreationEpoch(currEpoch)
|
||||||
|
@ -1013,6 +913,7 @@ func (t *objTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers,
|
||||||
}
|
}
|
||||||
|
|
||||||
parID, _ = objPar.ID()
|
parID, _ = objPar.ID()
|
||||||
|
sizeFinalObject = objPar.PayloadSize()
|
||||||
|
|
||||||
t.obj.SetParent(objPar)
|
t.obj.SetParent(objPar)
|
||||||
}
|
}
|
||||||
|
@ -1029,12 +930,28 @@ func (t *objTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers,
|
||||||
|
|
||||||
var prm pool.PrmObjectPut
|
var prm pool.PrmObjectPut
|
||||||
prm.SetHeader(*t.obj)
|
prm.SetHeader(*t.obj)
|
||||||
|
id, err := t.sdkPool.PutObject(ctx, prm)
|
||||||
_, err = t.sdkPool.PutObject(ctx, prm)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't put part: %w", err)
|
return nil, fmt.Errorf("couldn't put part: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
containerID, ok := t.obj.ContainerID()
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("error in get container id from object")
|
||||||
|
}
|
||||||
|
|
||||||
|
if isCommit {
|
||||||
|
_, err := t.treeService.AddObject(ctx, containerID, t.path, parID, sizeFinalObject)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("can't add object to tree: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = t.treeService.AddPHYObject(ctx, containerID, t.path, id, t.splitID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("can't add phy object to tree: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
objID, _ := t.obj.ID()
|
objID, _ := t.obj.ID()
|
||||||
|
|
||||||
return &transformer.AccessIdentifiers{
|
return &transformer.AccessIdentifiers{
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -75,8 +76,7 @@ func TestIntegration(t *testing.T) {
|
||||||
rootCtx := context.Background()
|
rootCtx := context.Background()
|
||||||
aioImage := "truecloudlab/frostfs-aio:"
|
aioImage := "truecloudlab/frostfs-aio:"
|
||||||
versions := []string{
|
versions := []string{
|
||||||
"1.2.7",
|
"1.3.0",
|
||||||
"1.2.8",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, aioVersion := range versions {
|
for _, aioVersion := range versions {
|
||||||
|
@ -214,6 +214,7 @@ func testWriteRead(rootCtx context.Context, t *testing.T, drvr storagedriver.Sto
|
||||||
|
|
||||||
func testList(rootCtx context.Context, t *testing.T, drvr storagedriver.StorageDriver, version string) {
|
func testList(rootCtx context.Context, t *testing.T, drvr storagedriver.StorageDriver, version string) {
|
||||||
ctx, path := formCtxAndPath(rootCtx, version)
|
ctx, path := formCtxAndPath(rootCtx, version)
|
||||||
|
path = "/list" + path
|
||||||
|
|
||||||
fileWriter, err := drvr.Writer(ctx, path, false)
|
fileWriter, err := drvr.Writer(ctx, path, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -229,7 +230,7 @@ func testList(rootCtx context.Context, t *testing.T, drvr storagedriver.StorageD
|
||||||
err = fileWriter.Commit()
|
err = fileWriter.Commit()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
res, err := drvr.List(ctx, path)
|
res, err := drvr.List(ctx, getParentDirPath(path))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Len(t, res, 1)
|
require.Len(t, res, 1)
|
||||||
require.Contains(t, res, path)
|
require.Contains(t, res, path)
|
||||||
|
@ -340,3 +341,12 @@ func createContainer(ctx context.Context, t *testing.T, clientPool *pool.Pool, o
|
||||||
|
|
||||||
return cnrID
|
return cnrID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getParentDirPath(path string) string {
|
||||||
|
lastSlashIndex := strings.LastIndex(path, "/")
|
||||||
|
if lastSlashIndex != -1 {
|
||||||
|
path = path[:lastSlashIndex]
|
||||||
|
}
|
||||||
|
|
||||||
|
return path
|
||||||
|
}
|
||||||
|
|
410
registry/storage/driver/frostfs/tree/tree.go
Normal file
|
@ -0,0 +1,410 @@
|
||||||
|
package tree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
Tree struct {
|
||||||
|
service ServiceClient
|
||||||
|
log *zap.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServiceClient is a client to interact with tree service.
|
||||||
|
// Each method must return ErrNodeNotFound or ErrNodeAccessDenied if relevant.
|
||||||
|
ServiceClient interface {
|
||||||
|
GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error)
|
||||||
|
GetSubTree(ctx context.Context, containerID cid.ID, treeID string, rootID uint64, depth uint32) ([]NodeResponse, error)
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
This is unused method (some others too). Can we drop it then? This is unused method (some others too). Can we drop it then?
|
|||||||
|
AddNodeByPath(ctx context.Context, containerID cid.ID, treeID string, path []string, meta map[string]string) (uint64, error)
|
||||||
|
RemoveNode(ctx context.Context, containerID cid.ID, treeID string, nodeID uint64) error
|
||||||
|
}
|
||||||
|
|
||||||
|
SubTreeStream interface {
|
||||||
|
Next() (NodeResponse, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
TreeNode struct {
|
||||||
|
ID uint64
|
||||||
|
ParentID uint64
|
||||||
|
ObjID oid.ID
|
||||||
|
TimeStamp uint64
|
||||||
|
ModificationTime time.Time
|
||||||
|
PayloadSize uint64
|
||||||
|
FullPath string
|
||||||
|
Meta map[string]string
|
||||||
|
}
|
||||||
|
|
||||||
|
GetNodesParams struct {
|
||||||
|
ContainerID cid.ID
|
||||||
|
TreeID string
|
||||||
|
Path []string
|
||||||
|
Meta []string
|
||||||
|
LatestOnly bool
|
||||||
|
AllAttrs bool
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
type Meta interface {
|
||||||
|
GetKey() string
|
||||||
|
GetValue() []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type NodeResponse interface {
|
||||||
|
GetMeta() []Meta
|
||||||
|
GetNodeID() uint64
|
||||||
|
GetParentID() uint64
|
||||||
|
GetTimestamp() uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
FileNameKey = "FileName"
|
||||||
|
oidKV = "OID"
|
||||||
|
splitIDKV = "SplitID"
|
||||||
|
payloadSize = "PayloadSize"
|
||||||
|
fullPath = "FullPath"
|
||||||
|
modificationTime = "ModificationTime"
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
@alexvanin Did we decide use the same tree as for s3 objects? @alexvanin Did we decide use the same tree as for s3 objects?
alexvanin
commented
I think so, but there is no strong opinion on this. I think so, but there is no strong opinion on this.
dkirillov
commented
I have some concerns about using the same container/bucket in both ways: via s3 and distribution I have some concerns about using the same container/bucket in both ways: via s3 and distribution
alexvanin
commented
Decided to use different tree names for distribution. Decided to use different tree names for distribution.
|
|||||||
|
|
||||||
|
objectTree = "distribution-object"
|
||||||
|
splitTree = "distribution-split"
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Can we name this more distribution specific? Can we name this more distribution specific?
Like `distribution-object`, `distribution-split` for example
|
|||||||
|
|
||||||
|
separator = "/"
|
||||||
|
|
||||||
|
maxDepth = 0
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// ErrNodeNotFound is returned from ServiceClient in case of not found error.
|
||||||
|
ErrNodeNotFound = errors.New("not found")
|
||||||
|
|
||||||
|
ErrOnlyDirFound = errors.New("only directory found")
|
||||||
|
|
||||||
|
// ErrNodeAccessDenied is returned from ServiceClient service in case of access denied error.
|
||||||
|
ErrNodeAccessDenied = errors.New("access denied")
|
||||||
|
|
||||||
|
// ErrGatewayTimeout is returned from ServiceClient service in case of timeout error.
|
||||||
|
ErrGatewayTimeout = errors.New("gateway timeout")
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewTree(service ServiceClient, log *zap.Logger) *Tree {
|
||||||
|
return &Tree{
|
||||||
|
service: service,
|
||||||
|
log: log,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Tree) GetObjectByPath(ctx context.Context, containerID cid.ID, path string) (*TreeNode, error) {
|
||||||
|
newPath := pathFromName(path)
|
||||||
|
p := &GetNodesParams{
|
||||||
|
ContainerID: containerID,
|
||||||
|
TreeID: objectTree,
|
||||||
|
Path: newPath,
|
||||||
|
LatestOnly: false,
|
||||||
|
AllAttrs: true,
|
||||||
|
}
|
||||||
|
nodes, err := c.service.GetNodes(ctx, p)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var result NodeResponse
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case len(nodes) == 0 || len(nodes) == 1 && isIntermediate(nodes[0]):
|
||||||
|
return nil, ErrNodeNotFound
|
||||||
|
case len(nodes) > 2:
|
||||||
|
return nil, fmt.Errorf("found more than two nodes")
|
||||||
|
default:
|
||||||
|
result = nodes[0]
|
||||||
|
if len(nodes) == 2 && isIntermediate(result) {
|
||||||
|
result = nodes[1]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
treeNode, err := newTreeNode(result)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error parse tree node: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return treeNode, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Tree) GetObjectByPathDir(ctx context.Context, containerID cid.ID, path string) (*TreeNode, error) {
|
||||||
|
newPath := pathFromName(path)
|
||||||
|
p := &GetNodesParams{
|
||||||
|
ContainerID: containerID,
|
||||||
|
TreeID: objectTree,
|
||||||
|
Path: newPath,
|
||||||
|
LatestOnly: false,
|
||||||
|
AllAttrs: true,
|
||||||
|
}
|
||||||
|
nodes, err := c.service.GetNodes(ctx, p)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var result NodeResponse
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case len(nodes) == 0:
|
||||||
|
return nil, ErrNodeNotFound
|
||||||
|
case len(nodes) == 1 && isIntermediate(nodes[0]):
|
||||||
|
return nil, ErrOnlyDirFound
|
||||||
|
case len(nodes) > 2:
|
||||||
|
return nil, fmt.Errorf("found more than two nodes")
|
||||||
|
default:
|
||||||
|
result = nodes[0]
|
||||||
|
if len(nodes) == 2 && isIntermediate(result) {
|
||||||
|
result = nodes[1]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
treeNode, err := newTreeNode(result)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error parse tree node: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return treeNode, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Tree) GetListObjectByPrefix(ctx context.Context, containerID cid.ID, prefixPath string) ([]TreeNode, error) {
|
||||||
|
rootID, err := c.determinePrefixNode(ctx, containerID, objectTree, prefixPath)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, ErrNodeNotFound) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
subTree, err := c.service.GetSubTree(ctx, containerID, objectTree, rootID, maxDepth)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, ErrNodeNotFound) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes := make([]NodeResponse, 0)
|
||||||
|
|
||||||
|
for _, node := range subTree {
|
||||||
|
if node.GetNodeID() == rootID {
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Actually we have to check for
Actually we have to check for `rootID`:
```golang
if node.GetNodeID() == rootID {
```
|
|||||||
|
continue
|
||||||
|
}
|
||||||
|
if !isIntermediate(node) {
|
||||||
|
nodes = append(nodes, node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result := make([]TreeNode, 0, len(nodes))
|
||||||
|
for _, v := range nodes {
|
||||||
|
node, err := newTreeNode(v)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error parse tree node: %w", err)
|
||||||
|
}
|
||||||
|
result = append(result, *node)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Tree) GetListOIDBySplitID(ctx context.Context, containerID cid.ID, path string, splitID *object.SplitID) ([]oid.ID, error) {
|
||||||
|
nodes, err := c.getNodesBySplitID(ctx, containerID, path, splitID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ids := make([]oid.ID, len(nodes))
|
||||||
|
for i, node := range nodes {
|
||||||
|
ids[i] = node.ObjID
|
||||||
|
}
|
||||||
|
|
||||||
|
return ids, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Tree) AddObject(ctx context.Context, containerID cid.ID, filePath string, oid oid.ID, size uint64) (uint64, error) {
|
||||||
|
path := pathFromName(filePath)
|
||||||
|
meta := map[string]string{
|
||||||
|
oidKV: oid.String(),
|
||||||
|
FileNameKey: path[len(path)-1],
|
||||||
|
payloadSize: strconv.FormatUint(size, 10),
|
||||||
|
fullPath: filePath,
|
||||||
|
modificationTime: strconv.FormatInt(time.Now().Unix(), 10),
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeID, err := c.service.AddNodeByPath(ctx, containerID, objectTree, path[:len(path)-1], meta)
|
||||||
|
|
||||||
|
return nodeID, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Tree) AddPHYObject(ctx context.Context, containerID cid.ID, filePath string, oid oid.ID, splitID *object.SplitID) (uint64, error) {
|
||||||
|
meta := map[string]string{
|
||||||
|
oidKV: oid.String(),
|
||||||
|
splitIDKV: splitID.String(),
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeID, err := c.service.AddNodeByPath(ctx, containerID, splitTree, pathFromName(filePath), meta)
|
||||||
|
|
||||||
|
return nodeID, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Tree) DeleteObject(ctx context.Context, containerID cid.ID, nodeID uint64) error {
|
||||||
|
return c.service.RemoveNode(ctx, containerID, objectTree, nodeID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Tree) DeleteObjectsBySplitID(ctx context.Context, containerID cid.ID, path string, splitID *object.SplitID) error {
|
||||||
|
splitNodes, err := c.getNodesBySplitID(ctx, containerID, path, splitID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, node := range splitNodes {
|
||||||
|
err = c.service.RemoveNode(ctx, containerID, splitTree, node.ID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Tree) determinePrefixNode(ctx context.Context, containerID cid.ID, treeID, prefixPath string) (uint64, error) {
|
||||||
|
var rootID uint64
|
||||||
|
path := pathFromName(prefixPath)
|
||||||
|
|
||||||
|
if len(path) > 1 {
|
||||||
|
var err error
|
||||||
|
rootID, err = c.getPrefixNodeID(ctx, containerID, treeID, path)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rootID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Tree) getPrefixNodeID(ctx context.Context, containerID cid.ID, treeID string, prefixPath []string) (uint64, error) {
|
||||||
|
p := &GetNodesParams{
|
||||||
|
ContainerID: containerID,
|
||||||
|
TreeID: treeID,
|
||||||
|
Path: prefixPath,
|
||||||
|
LatestOnly: false,
|
||||||
|
AllAttrs: true,
|
||||||
|
}
|
||||||
|
nodes, err := c.service.GetNodes(ctx, p)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var intermediateNodes []uint64
|
||||||
|
for _, node := range nodes {
|
||||||
|
if isIntermediate(node) {
|
||||||
|
intermediateNodes = append(intermediateNodes, node.GetNodeID())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(intermediateNodes) == 0 {
|
||||||
|
return 0, ErrNodeNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(intermediateNodes) > 1 {
|
||||||
|
return 0, fmt.Errorf("found more than one intermediate nodes")
|
||||||
|
}
|
||||||
|
|
||||||
|
return intermediateNodes[0], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Tree) getNodesBySplitID(ctx context.Context, containerID cid.ID, filePath string, splitID *object.SplitID) ([]TreeNode, error) {
|
||||||
|
rootID, err := c.determinePrefixNode(ctx, containerID, splitTree, filePath)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, ErrNodeNotFound) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
subTree, err := c.service.GetSubTree(ctx, containerID, splitTree, rootID, 2)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, ErrNodeNotFound) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes := make([]TreeNode, 0, len(subTree))
|
||||||
|
for _, v := range subTree {
|
||||||
|
node, err := newTreeNode(v)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error parse tree node: %w", err)
|
||||||
|
}
|
||||||
|
nodes = append(nodes, *node)
|
||||||
|
}
|
||||||
|
|
||||||
|
result := make([]TreeNode, 0)
|
||||||
|
for _, node := range nodes {
|
||||||
|
if val, ok := node.Meta[splitIDKV]; ok {
|
||||||
|
if val == splitID.String() {
|
||||||
|
result = append(result, node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func isIntermediate(node NodeResponse) bool {
|
||||||
|
if len(node.GetMeta()) != 1 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return node.GetMeta()[0].GetKey() == FileNameKey
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTreeNode(nodeInfo NodeResponse) (*TreeNode, error) {
|
||||||
|
treeNode := &TreeNode{
|
||||||
|
ID: nodeInfo.GetNodeID(),
|
||||||
|
ParentID: nodeInfo.GetParentID(),
|
||||||
|
TimeStamp: nodeInfo.GetTimestamp(),
|
||||||
|
Meta: make(map[string]string, len(nodeInfo.GetMeta())),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, kv := range nodeInfo.GetMeta() {
|
||||||
|
switch kv.GetKey() {
|
||||||
|
case oidKV:
|
||||||
|
if err := treeNode.ObjID.DecodeString(string(kv.GetValue())); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
case payloadSize:
|
||||||
|
if sizeStr := string(kv.GetValue()); len(sizeStr) > 0 {
|
||||||
|
var err error
|
||||||
|
if treeNode.PayloadSize, err = strconv.ParseUint(sizeStr, 10, 64); err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid payload size value '%s': %w", sizeStr, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case modificationTime:
|
||||||
|
timestamp, err := strconv.ParseInt(string(kv.GetValue()), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid modification time value '%s': %w", string(kv.GetValue()), err)
|
||||||
|
}
|
||||||
|
treeNode.ModificationTime = time.Unix(timestamp, 0)
|
||||||
|
case fullPath:
|
||||||
|
treeNode.FullPath = string(kv.GetValue())
|
||||||
|
default:
|
||||||
|
treeNode.Meta[kv.GetKey()] = string(kv.GetValue())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return treeNode, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func pathFromName(objectName string) []string {
|
||||||
|
return strings.Split(objectName, separator)
|
||||||
|
}
|
347
registry/storage/driver/frostfs/tree/tree_client_in_memory.go
Normal file
|
@ -0,0 +1,347 @@
|
||||||
|
package tree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sort"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
)
|
||||||
|
|
||||||
|
type nodeMeta struct {
|
||||||
|
key string
|
||||||
|
value []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m nodeMeta) GetKey() string {
|
||||||
|
return m.key
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m nodeMeta) GetValue() []byte {
|
||||||
|
return m.value
|
||||||
|
}
|
||||||
|
|
||||||
|
type nodeResponse struct {
|
||||||
|
meta []nodeMeta
|
||||||
|
nodeID uint64
|
||||||
|
parentID uint64
|
||||||
|
timestamp uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n nodeResponse) GetNodeID() uint64 {
|
||||||
|
return n.nodeID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n nodeResponse) GetParentID() uint64 {
|
||||||
|
return n.parentID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n nodeResponse) GetTimestamp() uint64 {
|
||||||
|
return n.timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n nodeResponse) GetMeta() []Meta {
|
||||||
|
res := make([]Meta, len(n.meta))
|
||||||
|
for i, value := range n.meta {
|
||||||
|
res[i] = value
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n nodeResponse) getValue(key string) string {
|
||||||
|
for _, value := range n.meta {
|
||||||
|
if value.key == key {
|
||||||
|
return string(value.value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
type ServiceClientMemory struct {
|
||||||
|
containers map[string]containerInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
type containerInfo struct {
|
||||||
|
containerID cid.ID
|
||||||
|
trees map[string]memoryTree
|
||||||
|
}
|
||||||
|
|
||||||
|
type memoryTree struct {
|
||||||
|
idCounter uint64
|
||||||
|
treeData *treeNodeMemory
|
||||||
|
}
|
||||||
|
|
||||||
|
type treeNodeMemory struct {
|
||||||
|
data nodeResponse
|
||||||
|
parent *treeNodeMemory
|
||||||
|
children []*treeNodeMemory
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *treeNodeMemory) getNode(nodeID uint64) *treeNodeMemory {
|
||||||
|
if t.data.nodeID == nodeID {
|
||||||
|
return t
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, child := range t.children {
|
||||||
|
if node := child.getNode(nodeID); node != nil {
|
||||||
|
return node
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *memoryTree) getNodesByPath(path []string) []nodeResponse {
|
||||||
|
if len(path) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var res []nodeResponse
|
||||||
|
for _, child := range t.treeData.children {
|
||||||
|
res = child.listNodesByPath(res, path)
|
||||||
|
}
|
||||||
|
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *treeNodeMemory) listNodesByPath(res []nodeResponse, path []string) []nodeResponse {
|
||||||
|
if len(path) == 0 || t.data.getValue(FileNameKey) != path[0] {
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(path) == 1 {
|
||||||
|
return append(res, t.data)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ch := range t.children {
|
||||||
|
res = ch.listNodesByPath(res, path[1:])
|
||||||
|
}
|
||||||
|
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *memoryTree) createPathIfNotExist(parent *treeNodeMemory, path []string) *treeNodeMemory {
|
||||||
|
if len(path) == 0 {
|
||||||
|
return parent
|
||||||
|
}
|
||||||
|
|
||||||
|
var node *treeNodeMemory
|
||||||
|
for _, child := range parent.children {
|
||||||
|
if len(child.data.meta) == 1 && child.data.getValue(FileNameKey) == path[0] {
|
||||||
|
node = child
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if node == nil {
|
||||||
|
node = &treeNodeMemory{
|
||||||
|
data: nodeResponse{
|
||||||
|
meta: []nodeMeta{{key: FileNameKey, value: []byte(path[0])}},
|
||||||
|
nodeID: t.idCounter,
|
||||||
|
parentID: parent.data.nodeID,
|
||||||
|
timestamp: uint64(time.Now().UnixMicro()),
|
||||||
|
},
|
||||||
|
parent: parent,
|
||||||
|
}
|
||||||
|
t.idCounter++
|
||||||
|
parent.children = append(parent.children, node)
|
||||||
|
}
|
||||||
|
|
||||||
|
return t.createPathIfNotExist(node, path[1:])
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *treeNodeMemory) removeChild(nodeID uint64) {
|
||||||
|
ind := -1
|
||||||
|
for i, ch := range t.children {
|
||||||
|
if ch.data.nodeID == nodeID {
|
||||||
|
ind = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ind != -1 {
|
||||||
|
t.children = append(t.children[:ind], t.children[ind+1:]...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *treeNodeMemory) listNodes(res []NodeResponse, depth uint32) []NodeResponse {
|
||||||
|
res = append(res, t.data)
|
||||||
|
|
||||||
|
if depth == 0 {
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ch := range t.children {
|
||||||
|
res = ch.listNodes(res, depth-1)
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTreeServiceClientMemory() (*ServiceClientMemory, error) {
|
||||||
|
return &ServiceClientMemory{
|
||||||
|
containers: make(map[string]containerInfo),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ServiceClientMemory) GetNodes(_ context.Context, p *GetNodesParams) ([]NodeResponse, error) {
|
||||||
|
cnr, ok := c.containers[p.ContainerID.EncodeToString()]
|
||||||
|
if !ok {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
tr, ok := cnr.trees[p.TreeID]
|
||||||
|
if !ok {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
res := tr.getNodesByPath(p.Path)
|
||||||
|
sort.Slice(res, func(i, j int) bool {
|
||||||
|
return res[i].timestamp < res[j].timestamp
|
||||||
|
})
|
||||||
|
|
||||||
|
if p.LatestOnly && len(res) != 0 {
|
||||||
|
res = res[len(res)-1:]
|
||||||
|
}
|
||||||
|
|
||||||
|
res2 := make([]NodeResponse, len(res))
|
||||||
|
for i, n := range res {
|
||||||
|
res2[i] = n
|
||||||
|
}
|
||||||
|
|
||||||
|
return res2, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ServiceClientMemory) GetSubTree(_ context.Context, containerID cid.ID, treeID string, rootID uint64, depth uint32) ([]NodeResponse, error) {
|
||||||
|
cnr, ok := c.containers[containerID.EncodeToString()]
|
||||||
|
if !ok {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
tr, ok := cnr.trees[treeID]
|
||||||
|
if !ok {
|
||||||
|
return nil, ErrNodeNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
sortNode(tr.treeData)
|
||||||
|
|
||||||
|
node := tr.treeData.getNode(rootID)
|
||||||
|
if node == nil {
|
||||||
|
return nil, ErrNodeNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
// we depth-1 in case of uint32 and 0 as mark to get all subtree leads to overflow and depth is getting quite big to walk all tree levels
|
||||||
|
return node.listNodes(nil, depth-1), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newContainerInfo(containerID cid.ID, treeID string) containerInfo {
|
||||||
|
return containerInfo{
|
||||||
|
containerID: containerID,
|
||||||
|
trees: map[string]memoryTree{
|
||||||
|
treeID: {
|
||||||
|
idCounter: 1,
|
||||||
|
treeData: &treeNodeMemory{
|
||||||
|
data: nodeResponse{
|
||||||
|
timestamp: uint64(time.Now().UnixMicro()),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMemoryTree() memoryTree {
|
||||||
|
return memoryTree{
|
||||||
|
idCounter: 1,
|
||||||
dkirillov marked this conversation as resolved
Outdated
|
|||||||
|
treeData: &treeNodeMemory{
|
||||||
|
data: nodeResponse{
|
||||||
|
timestamp: uint64(time.Now().UnixMicro()),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ServiceClientMemory) AddNodeByPath(_ context.Context, containerID cid.ID, treeID string, path []string, meta map[string]string) (uint64, error) {
|
||||||
|
cnr, ok := c.containers[containerID.EncodeToString()]
|
||||||
|
if !ok {
|
||||||
|
cnr = newContainerInfo(containerID, treeID)
|
||||||
|
c.containers[containerID.EncodeToString()] = cnr
|
||||||
|
}
|
||||||
|
|
||||||
|
tr, ok := cnr.trees[treeID]
|
||||||
|
if !ok {
|
||||||
|
tr = newMemoryTree()
|
||||||
|
cnr.trees[treeID] = tr
|
||||||
|
}
|
||||||
|
|
||||||
|
parentNode := tr.createPathIfNotExist(tr.treeData, path)
|
||||||
|
if parentNode == nil {
|
||||||
|
return 0, fmt.Errorf("create path '%s'", path)
|
||||||
|
}
|
||||||
|
|
||||||
|
newID := tr.idCounter
|
||||||
|
tr.idCounter++
|
||||||
|
|
||||||
|
tn := &treeNodeMemory{
|
||||||
|
data: nodeResponse{
|
||||||
|
meta: metaToNodeMeta(meta),
|
||||||
|
nodeID: newID,
|
||||||
|
parentID: parentNode.data.nodeID,
|
||||||
|
timestamp: uint64(time.Now().UnixMicro()),
|
||||||
|
},
|
||||||
|
parent: parentNode,
|
||||||
|
}
|
||||||
|
|
||||||
|
parentNode.children = append(parentNode.children, tn)
|
||||||
|
cnr.trees[treeID] = tr
|
||||||
|
|
||||||
|
return newID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func sortNode(node *treeNodeMemory) {
|
||||||
|
if node == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
sortNodes(node.children)
|
||||||
|
|
||||||
|
for _, child := range node.children {
|
||||||
|
sortNode(child)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func sortNodes(list []*treeNodeMemory) {
|
||||||
|
sort.Slice(list, func(i, j int) bool {
|
||||||
|
return list[i].data.getValue(FileNameKey) < list[j].data.getValue(FileNameKey)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ServiceClientMemory) RemoveNode(_ context.Context, containerID cid.ID, treeID string, nodeID uint64) error {
|
||||||
|
cnr, ok := c.containers[containerID.EncodeToString()]
|
||||||
|
if !ok {
|
||||||
|
return ErrNodeNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
tr, ok := cnr.trees[treeID]
|
||||||
|
if !ok {
|
||||||
|
return ErrNodeNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
node := tr.treeData.getNode(nodeID)
|
||||||
|
if node == nil {
|
||||||
|
return ErrNodeNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
node.parent.removeChild(nodeID)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func metaToNodeMeta(m map[string]string) []nodeMeta {
|
||||||
|
result := make([]nodeMeta, 0, len(m))
|
||||||
|
|
||||||
|
for key, value := range m {
|
||||||
|
result = append(result, nodeMeta{key: key, value: []byte(value)})
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
232
registry/storage/driver/frostfs/tree/tree_test.go
Normal file
|
@ -0,0 +1,232 @@
|
||||||
|
package tree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.uber.org/zap/zaptest"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGetObjectByPath(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
memCli, err := NewTreeServiceClientMemory()
|
||||||
|
require.NoError(t, err)
|
||||||
|
treeService := NewTree(memCli, zaptest.NewLogger(t))
|
||||||
|
|
||||||
|
cidTest := cidtest.ID()
|
||||||
|
oidTest1 := oidtest.ID()
|
||||||
|
oidTest2 := oidtest.ID()
|
||||||
|
testSize1 := uint64(10)
|
||||||
|
testSize2 := uint64(20)
|
||||||
|
|
||||||
|
nodeID1, err := treeService.AddObject(ctx, cidTest, "/a/b", oidTest1, testSize1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
nodeID2, err := treeService.AddObject(ctx, cidTest, "/a/b/c/d", oidTest2, testSize2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
node1, err := treeService.GetObjectByPath(ctx, cidTest, "/a/b")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, nodeID1, node1.ID)
|
||||||
|
require.Equal(t, oidTest1, node1.ObjID)
|
||||||
|
require.Equal(t, testSize1, node1.PayloadSize)
|
||||||
|
|
||||||
|
node2, err := treeService.GetObjectByPath(ctx, cidTest, "/a/b/c/d")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, nodeID2, node2.ID)
|
||||||
|
require.Equal(t, oidTest2, node2.ObjID)
|
||||||
|
require.Equal(t, testSize2, node2.PayloadSize)
|
||||||
|
|
||||||
|
_, err = treeService.GetObjectByPath(ctx, cidTest, "/g")
|
||||||
|
require.ErrorIs(t, err, ErrNodeNotFound)
|
||||||
|
|
||||||
|
_, err = treeService.GetObjectByPath(ctx, cidTest, "/a/b/c")
|
||||||
|
require.ErrorIs(t, err, ErrNodeNotFound)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetObjectByPathDir(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
memCli, err := NewTreeServiceClientMemory()
|
||||||
|
require.NoError(t, err)
|
||||||
|
treeService := NewTree(memCli, zaptest.NewLogger(t))
|
||||||
|
|
||||||
|
cidTest := cidtest.ID()
|
||||||
|
oidTest1 := oidtest.ID()
|
||||||
|
oidTest2 := oidtest.ID()
|
||||||
|
testSize1 := uint64(10)
|
||||||
|
testSize2 := uint64(20)
|
||||||
|
|
||||||
|
nodeID1, err := treeService.AddObject(ctx, cidTest, "/a/b", oidTest1, testSize1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
nodeID2, err := treeService.AddObject(ctx, cidTest, "/a/b/c/d", oidTest2, testSize2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
node1, err := treeService.GetObjectByPath(ctx, cidTest, "/a/b")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, nodeID1, node1.ID)
|
||||||
|
require.Equal(t, oidTest1, node1.ObjID)
|
||||||
|
require.Equal(t, testSize1, node1.PayloadSize)
|
||||||
|
|
||||||
|
node2, err := treeService.GetObjectByPath(ctx, cidTest, "/a/b/c/d")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, nodeID2, node2.ID)
|
||||||
|
require.Equal(t, oidTest2, node2.ObjID)
|
||||||
|
require.Equal(t, testSize2, node2.PayloadSize)
|
||||||
|
|
||||||
|
_, err = treeService.GetObjectByPathDir(ctx, cidTest, "/g")
|
||||||
|
require.ErrorIs(t, err, ErrNodeNotFound)
|
||||||
|
|
||||||
|
_, err = treeService.GetObjectByPathDir(ctx, cidTest, "/a/b/c")
|
||||||
|
require.ErrorIs(t, err, ErrOnlyDirFound)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetListObjectByPrefix(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
memCli, err := NewTreeServiceClientMemory()
|
||||||
|
require.NoError(t, err)
|
||||||
|
treeService := NewTree(memCli, zaptest.NewLogger(t))
|
||||||
|
|
||||||
|
cidTest := cidtest.ID()
|
||||||
|
oidTest1 := oidtest.ID()
|
||||||
|
oidTest2 := oidtest.ID()
|
||||||
|
oidTest3 := oidtest.ID()
|
||||||
|
oidTest4 := oidtest.ID()
|
||||||
|
testSize1 := uint64(10)
|
||||||
|
testSize2 := uint64(20)
|
||||||
|
testSize3 := uint64(30)
|
||||||
|
testSize4 := uint64(40)
|
||||||
|
|
||||||
|
_, err = treeService.AddObject(ctx, cidTest, "/a/b", oidTest1, testSize1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
nodeID2, err := treeService.AddObject(ctx, cidTest, "/a/b/c", oidTest2, testSize2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
nodeID3, err := treeService.AddObject(ctx, cidTest, "/a/b/c/d", oidTest3, testSize3)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
nodeID4, err := treeService.AddObject(ctx, cidTest, "/a/b/c/d/e", oidTest4, testSize4)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
nodes, err := treeService.GetListObjectByPrefix(ctx, cidTest, "/a/b")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, nodeID2, nodes[0].ID)
|
||||||
|
require.Equal(t, nodeID3, nodes[1].ID)
|
||||||
|
require.Equal(t, nodeID4, nodes[2].ID)
|
||||||
|
require.Equal(t, testSize2, nodes[0].PayloadSize)
|
||||||
|
require.Equal(t, testSize3, nodes[1].PayloadSize)
|
||||||
|
require.Equal(t, testSize4, nodes[2].PayloadSize)
|
||||||
|
|
||||||
|
nodes, err = treeService.GetListObjectByPrefix(ctx, cidTest, "/g/s")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, len(nodes))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetListOIDBySplitID(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
memCli, err := NewTreeServiceClientMemory()
|
||||||
|
require.NoError(t, err)
|
||||||
|
treeService := NewTree(memCli, zaptest.NewLogger(t))
|
||||||
|
|
||||||
|
cidTest := cidtest.ID()
|
||||||
|
oidTest1 := oidtest.ID()
|
||||||
|
oidTest2 := oidtest.ID()
|
||||||
|
oidTest3 := oidtest.ID()
|
||||||
|
|
||||||
|
splitID := object.NewSplitID()
|
||||||
|
splitID.SetUUID(uuid.New())
|
||||||
|
|
||||||
|
_, err = treeService.AddPHYObject(ctx, cidTest, "/a/b", oidTest1, splitID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = treeService.AddPHYObject(ctx, cidTest, "/a/b", oidTest2, splitID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = treeService.AddPHYObject(ctx, cidTest, "/a/b", oidTest3, splitID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ids, err := treeService.GetListOIDBySplitID(ctx, cidTest, "/a/b", splitID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, oidTest1, ids[0])
|
||||||
|
require.Equal(t, oidTest2, ids[1])
|
||||||
|
require.Equal(t, oidTest3, ids[2])
|
||||||
|
|
||||||
|
ids, err = treeService.GetListOIDBySplitID(ctx, cidTest, "/c/d", splitID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, len(ids))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeleteObject(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
memCli, err := NewTreeServiceClientMemory()
|
||||||
|
require.NoError(t, err)
|
||||||
|
treeService := NewTree(memCli, zaptest.NewLogger(t))
|
||||||
|
|
||||||
|
cidTest := cidtest.ID()
|
||||||
|
oidTest1 := oidtest.ID()
|
||||||
|
testSize := uint64(10)
|
||||||
|
|
||||||
|
nodeID1, err := treeService.AddObject(ctx, cidTest, "/a/b", oidTest1, testSize)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = treeService.DeleteObject(ctx, cidTest, nodeID1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = treeService.GetObjectByPath(ctx, cidTest, "/a/b")
|
||||||
|
require.ErrorIs(t, err, ErrNodeNotFound)
|
||||||
|
|
||||||
|
err = treeService.DeleteObject(ctx, cidTest, nodeID1+1)
|
||||||
|
require.ErrorIs(t, err, ErrNodeNotFound)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeleteObjectsBySplitID(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
memCli, err := NewTreeServiceClientMemory()
|
||||||
|
require.NoError(t, err)
|
||||||
|
treeService := NewTree(memCli, zaptest.NewLogger(t))
|
||||||
|
|
||||||
|
cidTest := cidtest.ID()
|
||||||
|
oidTest1 := oidtest.ID()
|
||||||
|
oidTest2 := oidtest.ID()
|
||||||
|
oidTest3 := oidtest.ID()
|
||||||
|
|
||||||
|
splitID := object.NewSplitID()
|
||||||
|
splitID.SetUUID(uuid.New())
|
||||||
|
|
||||||
|
_, err = treeService.AddPHYObject(ctx, cidTest, "/a/b", oidTest1, splitID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = treeService.AddPHYObject(ctx, cidTest, "/a/b", oidTest2, splitID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = treeService.AddPHYObject(ctx, cidTest, "/a/b", oidTest3, splitID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ids, err := treeService.GetListOIDBySplitID(ctx, cidTest, "/a/b", splitID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 3, len(ids))
|
||||||
|
|
||||||
|
err = treeService.DeleteObjectsBySplitID(ctx, cidTest, "/a/b", splitID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ids, err = treeService.GetListOIDBySplitID(ctx, cidTest, "/a/b", splitID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, len(ids))
|
||||||
|
|
||||||
|
ids, err = treeService.GetListOIDBySplitID(ctx, cidTest, "/c/d", splitID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, len(ids))
|
||||||
|
}
|
178
registry/storage/driver/frostfs/wrapper/pool_wrapper.go
Normal file
|
@ -0,0 +1,178 @@
|
||||||
|
package wrapper
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
|
||||||
|
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
It seems there isn't such package It seems there isn't such package
|
|||||||
|
"github.com/distribution/distribution/v3/registry/storage/driver/frostfs/tree"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
)
|
||||||
|
|
||||||
|
type GetNodeByPathResponseInfoWrapper struct {
|
||||||
|
response *grpcService.GetNodeByPathResponse_Info
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n GetNodeByPathResponseInfoWrapper) GetNodeID() uint64 {
|
||||||
|
return n.response.GetNodeId()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n GetNodeByPathResponseInfoWrapper) GetParentID() uint64 {
|
||||||
|
return n.response.GetParentId()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n GetNodeByPathResponseInfoWrapper) GetTimestamp() uint64 {
|
||||||
|
return n.response.GetTimestamp()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n GetNodeByPathResponseInfoWrapper) GetMeta() []tree.Meta {
|
||||||
|
res := make([]tree.Meta, len(n.response.Meta))
|
||||||
|
for i, value := range n.response.Meta {
|
||||||
|
res[i] = value
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetSubTreeResponseBodyWrapper struct {
|
||||||
|
response *grpcService.GetSubTreeResponse_Body
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n GetSubTreeResponseBodyWrapper) GetNodeID() uint64 {
|
||||||
|
return n.response.GetNodeId()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n GetSubTreeResponseBodyWrapper) GetParentID() uint64 {
|
||||||
|
return n.response.GetParentId()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n GetSubTreeResponseBodyWrapper) GetTimestamp() uint64 {
|
||||||
|
return n.response.GetTimestamp()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta {
|
||||||
|
res := make([]tree.Meta, len(n.response.Meta))
|
||||||
|
for i, value := range n.response.Meta {
|
||||||
|
res[i] = value
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
type PoolWrapper struct {
|
||||||
|
p *treepool.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPoolWrapper(p *treepool.Pool) *PoolWrapper {
|
||||||
|
return &PoolWrapper{p: p}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *PoolWrapper) GetNodes(ctx context.Context, prm *tree.GetNodesParams) ([]tree.NodeResponse, error) {
|
||||||
|
poolPrm := treepool.GetNodesParams{
|
||||||
|
CID: prm.ContainerID,
|
||||||
|
TreeID: prm.TreeID,
|
||||||
|
Path: prm.Path,
|
||||||
|
Meta: prm.Meta,
|
||||||
|
PathAttribute: tree.FileNameKey,
|
||||||
|
LatestOnly: prm.LatestOnly,
|
||||||
|
AllAttrs: prm.AllAttrs,
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes, err := w.p.GetNodes(ctx, poolPrm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, handleError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
res := make([]tree.NodeResponse, len(nodes))
|
||||||
|
for i, info := range nodes {
|
||||||
|
res[i] = GetNodeByPathResponseInfoWrapper{info}
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *PoolWrapper) GetSubTree(ctx context.Context, containerID cid.ID, treeID string, rootID uint64, depth uint32) ([]tree.NodeResponse, error) {
|
||||||
|
poolPrm := treepool.GetSubTreeParams{
|
||||||
|
CID: containerID,
|
||||||
|
TreeID: treeID,
|
||||||
|
RootID: rootID,
|
||||||
|
Depth: depth,
|
||||||
|
}
|
||||||
|
|
||||||
|
subTreeReader, err := w.p.GetSubTree(ctx, poolPrm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, handleError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var subtree []tree.NodeResponse
|
||||||
|
|
||||||
|
node, err := subTreeReader.Next()
|
||||||
|
for err == nil {
|
||||||
|
subtree = append(subtree, GetSubTreeResponseBodyWrapper{node})
|
||||||
|
node, err = subTreeReader.Next()
|
||||||
|
}
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
return nil, handleError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return subtree, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *PoolWrapper) AddNodeByPath(ctx context.Context, containerID cid.ID, treeID string, path []string, meta map[string]string) (uint64, error) {
|
||||||
|
nodeID, err := w.p.AddNodeByPath(ctx, treepool.AddNodeByPathParams{
|
||||||
|
CID: containerID,
|
||||||
|
TreeID: treeID,
|
||||||
|
Path: path,
|
||||||
|
Meta: meta,
|
||||||
|
PathAttribute: tree.FileNameKey,
|
||||||
|
})
|
||||||
|
return nodeID, handleError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *PoolWrapper) RemoveNode(ctx context.Context, containerID cid.ID, treeID string, nodeID uint64) error {
|
||||||
|
return handleError(w.p.RemoveNode(ctx, treepool.RemoveNodeParams{
|
||||||
|
CID: containerID,
|
||||||
|
TreeID: treeID,
|
||||||
|
NodeID: nodeID,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleError(err error) error {
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if errors.Is(err, treepool.ErrNodeNotFound) {
|
||||||
|
return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
|
||||||
|
}
|
||||||
|
if errors.Is(err, treepool.ErrNodeAccessDenied) {
|
||||||
|
return fmt.Errorf("%w: %s", tree.ErrNodeAccessDenied, err.Error())
|
||||||
|
}
|
||||||
|
if isTimeoutError(err) {
|
||||||
|
return fmt.Errorf("%w: %s", tree.ErrGatewayTimeout, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func unwrapErr(err error) error {
|
||||||
|
unwrappedErr := errors.Unwrap(err)
|
||||||
|
for unwrappedErr != nil {
|
||||||
|
err = unwrappedErr
|
||||||
|
unwrappedErr = errors.Unwrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func isTimeoutError(err error) bool {
|
||||||
|
if strings.Contains(err.Error(), "timeout") ||
|
||||||
|
errors.Is(err, context.DeadlineExceeded) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return status.Code(unwrapErr(err)) == codes.DeadlineExceeded
|
||||||
|
}
|
Seems like we can store payload size in k/v pair in
treeObj
meta, so we can remove this head request. The same is applicable for other head requests.Maybe I am wrong, because we upload object part-by-part and we don't know total size unless we use head request. Need to look into it.
If object isn't fully uploaded we shouldn't read it I suppose