Engine implementation

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
Evgenii Stratonikov 2024-03-22 22:16:50 +03:00
parent 29708b78d7
commit 3a77ed1e7a
18 changed files with 1804 additions and 2 deletions


@@ -58,6 +58,7 @@ func initTreeService(c *cfg) {
tree.WithPrivateKey(&c.key.PrivateKey),
tree.WithLogger(c.log),
tree.WithStorage(c.cfgObject.cfgLocalStorage.localStorage),
tree.WithBurnedStorage(c.cfgObject.cfgLocalStorage.localStorage),
tree.WithContainerCacheSize(treeConfig.CacheSize()),
tree.WithReplicationTimeout(treeConfig.ReplicationTimeout()),
tree.WithReplicationChannelCapacity(treeConfig.ReplicationChannelCapacity()),

go.mod

@@ -4,7 +4,7 @@ go 1.22
require (
code.gitea.io/sdk/gitea v0.17.1
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20241007120543-29c522d5d8a3
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20241011114054-f0fc40e116d1
git.frostfs.info/TrueCloudLab/frostfs-contract v0.20.0
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d

go.sum

@@ -2,6 +2,8 @@ code.gitea.io/sdk/gitea v0.17.1 h1:3jCPOG2ojbl8AcfaUCRYLT5MUcBMFwS0OSK2mA5Zok8=
code.gitea.io/sdk/gitea v0.17.1/go.mod h1:aCnBqhHpoEWA180gMbaCtdX9Pl6BWBAuuP2miadoTNM=
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20241007120543-29c522d5d8a3 h1:6QXNnfBgYx81UZsBdpPnQY+ZMSKGFbFc29wV7DJ/UG4=
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20241007120543-29c522d5d8a3/go.mod h1:F5GS7hRb62PUy5sTYDC4ajVdeffoAfjHSSHTKUJEaYU=
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20241011114054-f0fc40e116d1 h1:ivcdxQeQDnx4srF2ezoaeVlF0FAycSAztwfIUJnUI4s=
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20241011114054-f0fc40e116d1/go.mod h1:F5GS7hRb62PUy5sTYDC4ajVdeffoAfjHSSHTKUJEaYU=
git.frostfs.info/TrueCloudLab/frostfs-contract v0.20.0 h1:8Z5iPhieCrbcdhxBuY/Bajh6V5fki7Whh0b4S2zYJYU=
git.frostfs.info/TrueCloudLab/frostfs-contract v0.20.0/go.mod h1:Y2Xorxc8SBO4phoek7n3XxaPZz5rIrFgDsU4TOjmlGA=
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSVCB8JNSfPG7Uk4r2oSk=


@@ -9,6 +9,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@@ -440,3 +441,32 @@ func (e *StorageEngine) getTreeShard(ctx context.Context, cid cidSDK.ID, treeID
return 0, lst, pilorama.ErrTreeNotFound
}
func (e *StorageEngine) Add(d pilorama.CIDDescriptor, treeID string, key []byte, meta pilorama.Meta) (pilorama.Operation, error) {
lst := e.sortShards(d.CID)
return lst[0].Add(d, treeID, key, meta)
}
func (e *StorageEngine) Remove(d pilorama.CIDDescriptor, treeID string, key []byte) (pilorama.Operation, error) {
lst := e.sortShards(d.CID)
return lst[0].Remove(d, treeID, key)
}
func (e *StorageEngine) Apply(cnr cid.ID, treeID string, op pilorama.Operation) error {
lst := e.sortShards(cnr)
return lst[0].Apply(cnr, treeID, op)
}
func (e *StorageEngine) GetLatestByPrefix(cnr cid.ID, treeID string, key []byte) (pilorama.MetaVersion, error) {
lst := e.sortShards(cnr)
return lst[0].GetLatestByPrefix(cnr, treeID, key)
}
func (e *StorageEngine) GetLatest(cnr cid.ID, treeID string, key []byte) (pilorama.Meta, error) {
lst := e.sortShards(cnr)
return lst[0].GetLatest(cnr, treeID, key)
}
func (e *StorageEngine) ListByPrefix(cnr cid.ID, treeID string, key []byte) ([]pilorama.MetaVersion, error) {
lst := e.sortShards(cnr)
return lst[0].ListByPrefix(cnr, treeID, key)
}
func (e *StorageEngine) ListAll(cnr cid.ID, treeID string, param pilorama.ListParam) ([][]byte, error) {
lst := e.sortShards(cnr)
return lst[0].ListAll(cnr, treeID, param)
}


@@ -0,0 +1,420 @@
package pilorama
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"github.com/nspcc-dev/neo-go/pkg/io"
"go.etcd.io/bbolt"
)
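// BurnedForest is the full interface of the flat ("burned") tree storage:
// the component lifecycle (mode switching, open/init/close) plus the data
// operations of BurnedFully.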
type BurnedForest interface {
SetMode(m mode.Mode) error
Open(_ context.Context, mode mode.Mode) error
Init() error
Close() error
BurnedFully
}
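// BurnedFully contains the data operations of the burned tree storage.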
type BurnedFully interface {
Add(d CIDDescriptor, treeID string, key []byte, meta Meta) (Operation, error)
Remove(d CIDDescriptor, treeID string, key []byte) (Operation, error)
Apply(cnr cid.ID, treeID string, op Operation) error
GetLatestByPrefix(cnr cid.ID, treeID string, prefix []byte) (MetaVersion, error)
GetLatest(cnr cid.ID, treeID string, key []byte) (Meta, error)
ListByPrefix(cnr cid.ID, treeID string, key []byte) ([]MetaVersion, error)
ListAll(cnr cid.ID, treeID string, param ListParam) ([][]byte, error)
}
type newPilorama struct {
db *bbolt.DB
modeMtx sync.RWMutex
mode mode.Mode
cfg
}
var ErrValueNotFound = metaerr.New("value not found")
type Kind byte
const (
OpInsert Kind = iota
OpDelete
)
type Operation struct {
Kind Kind
Key []byte
Meta Meta
}
func NewPilorama(opts ...Option) *newPilorama {
p := &newPilorama{
cfg: cfg{
perm: os.ModePerm,
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
maxBatchSize: bbolt.DefaultMaxBatchSize,
openFile: os.OpenFile,
metrics: &noopMetrics{},
},
}
for i := range opts {
opts[i](&p.cfg)
}
// p.cfg.path = filepath.Join(p.cfg.path, ".new")
return p
}
func (p *newPilorama) SetMode(m mode.Mode) error {
p.modeMtx.Lock()
defer p.modeMtx.Unlock()
if p.mode == m {
return nil
}
err := p.Close()
if err == nil && !m.NoMetabase() {
if err = p.openBolt(m); err == nil {
err = p.Init()
}
}
if err != nil {
return fmt.Errorf("can't set pilorama mode (old=%s, new=%s): %w", p.mode, m, err)
}
p.mode = m
p.metrics.SetMode(mode.ConvertToComponentMode(m))
return nil
}
func (p *newPilorama) Open(_ context.Context, mode mode.Mode) error {
p.modeMtx.Lock()
defer p.modeMtx.Unlock()
p.mode = mode
if mode.NoMetabase() {
return nil
}
return p.openBolt(mode)
}
func (p *newPilorama) openBolt(m mode.Mode) error {
readOnly := m.ReadOnly()
err := util.MkdirAllX(filepath.Dir(p.path), p.perm)
if err != nil {
return metaerr.Wrap(fmt.Errorf("can't create dir %s for the pilorama: %w", p.path, err))
}
opts := *bbolt.DefaultOptions
opts.ReadOnly = readOnly
opts.NoSync = p.noSync
opts.Timeout = 100 * time.Millisecond
opts.OpenFile = p.openFile
p.db, err = bbolt.Open(p.path, p.perm, &opts)
if err != nil {
return metaerr.Wrap(fmt.Errorf("can't open the pilorama DB: %w", err))
}
p.db.MaxBatchSize = p.maxBatchSize
p.db.MaxBatchDelay = p.maxBatchDelay
p.metrics.SetMode(mode.ConvertToComponentMode(m))
return nil
}
func (p *newPilorama) Init() error {
if p.mode.NoMetabase() || p.db.IsReadOnly() {
return nil
}
return nil
}
func (p *newPilorama) Close() error {
var err error
if p.db != nil {
err = p.db.Close()
}
if err == nil {
p.metrics.Close()
}
return err
}
var ErrInvalidOperation = errors.New("invalid operation")
func (op *Operation) Bytes() []byte {
w := io.NewBufBinWriter()
w.WriteB(byte(op.Kind))
w.WriteVarBytes(op.Key)
op.Meta.EncodeBinary(w.BinWriter)
return w.Bytes()
}
func (op *Operation) FromBytes(data []byte) error {
r := io.NewBinReaderFromBuf(data)
op.Kind = Kind(r.ReadB())
op.Key = r.ReadVarBytes()
op.Meta.DecodeBinary(r)
return r.Err
}
func (p *newPilorama) Add(d CIDDescriptor, treeID string, key []byte, meta Meta) (Operation, error) {
op := Operation{
Kind: OpInsert,
Key: key,
Meta: meta,
}
err := p.db.Batch(func(tx *bbolt.Tx) error {
blog, btree, err := p.getTreeBuckets(tx, d.CID, treeID)
if err != nil {
return err
}
ts := p.getLatestTimestamp(blog, d.Position, d.Size)
op.Meta.Time = ts
return p.apply(blog, btree, op)
})
return op, err
}
func (p *newPilorama) Remove(d CIDDescriptor, treeID string, key []byte) (Operation, error) {
op := Operation{
Kind: OpDelete,
Key: key,
}
err := p.db.Batch(func(tx *bbolt.Tx) error {
blog, btree, err := p.getTreeBuckets(tx, d.CID, treeID)
if err != nil {
return err
}
ts := p.getLatestTimestamp(blog, d.Position, d.Size)
op.Meta.Time = ts
return p.apply(blog, btree, op)
})
return op, err
}
func (p *newPilorama) getLatestTimestamp(b *bbolt.Bucket, pos, size int) uint64 {
var ts uint64
c := b.Cursor()
key, _ := c.Last()
if len(key) != 0 {
ts = binary.BigEndian.Uint64(key)
}
return nextTimestamp(ts, uint64(pos), uint64(size))
}
func (p *newPilorama) Apply(cnr cid.ID, treeID string, op Operation) error {
return p.db.Batch(func(tx *bbolt.Tx) error {
blog, btree, err := p.getTreeBuckets(tx, cnr, treeID)
if err != nil {
return err
}
return p.apply(blog, btree, op)
})
}
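// apply appends the operation to the log bucket under its big-endian
// timestamp and updates the tree bucket with last-write-wins semantics:
// the key is rewritten only if the incoming timestamp is newer than the
// one currently stored.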
func (p *newPilorama) apply(blog, btree *bbolt.Bucket, op Operation) error {
ts := make([]byte, 8)
binary.BigEndian.PutUint64(ts, op.Meta.Time)
if err := blog.Put(ts, op.Bytes()); err != nil {
return err
}
current := btree.Get(op.Key)
if len(current) == 8 && op.Meta.Time <= binary.BigEndian.Uint64(current) {
return nil
}
return btree.Put(op.Key, ts)
}
func (p *newPilorama) getTreeBuckets(tx *bbolt.Tx, cnr cid.ID, treeID string) (*bbolt.Bucket, *bbolt.Bucket, error) {
namelog, nametree := burnedNames(cnr, treeID)
blog := tx.Bucket(namelog)
if blog == nil {
var err error
blog, err = tx.CreateBucketIfNotExists(namelog)
if err != nil {
return nil, nil, err
}
}
btree := tx.Bucket(nametree)
if btree == nil {
var err error
btree, err = tx.CreateBucketIfNotExists(nametree)
if err != nil {
return nil, nil, err
}
}
return blog, btree, nil
}
func burnedName(cnr cid.ID, treeID string, typ byte) []byte {
b := make([]byte, 32+len(treeID)+1)
cnr.Encode(b)
copy(b[32:], treeID)
b[len(b)-1] = typ
return b
}
func burnedNames(cnr cid.ID, treeID string) ([]byte, []byte) {
blog := burnedName(cnr, treeID, 'L')
btree := burnedName(cnr, treeID, 'T')
return blog, btree
}
func (p *newPilorama) GetLatestByPrefix(cnr cid.ID, treeID string, prefix []byte) (MetaVersion, error) {
// TODO(@fyrchik): can be more optimal
ms, err := p.ListByPrefix(cnr, treeID, prefix)
if err != nil {
return MetaVersion{}, err
}
if len(ms) == 0 {
return MetaVersion{}, ErrValueNotFound
}
var maxIndex int
for i := 1; i < len(ms); i++ {
if ms[i].Meta.Time > ms[maxIndex].Meta.Time {
maxIndex = i
}
}
return ms[maxIndex], nil
}
func (p *newPilorama) GetLatest(cnr cid.ID, treeID string, key []byte) (Meta, error) {
var rawMeta []byte
nlog, ntree := burnedNames(cnr, treeID)
err := p.db.View(func(tx *bbolt.Tx) error {
blog := tx.Bucket(nlog)
btree := tx.Bucket(ntree)
if blog == nil || btree == nil {
return ErrTreeNotFound
}
ts := btree.Get(key)
if ts == nil {
return ErrValueNotFound
}
rawMeta = bytes.Clone(blog.Get(ts))
return nil
})
if err != nil {
return Meta{}, err
}
var m Meta
if err := m.FromBytes(rawMeta); err != nil {
return Meta{}, err
}
return m, nil
}
type MetaVersion struct {
Key []byte
Meta Meta
}
func (p *newPilorama) ListByPrefix(cnr cid.ID, treeID string, prefix []byte) ([]MetaVersion, error) {
var rawMetas [][]byte
nlog, ntree := burnedNames(cnr, treeID)
err := p.db.View(func(tx *bbolt.Tx) error {
blog := tx.Bucket(nlog)
btree := tx.Bucket(ntree)
if blog == nil || btree == nil {
return ErrTreeNotFound
}
c := btree.Cursor()
for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() {
rawMetas = append(rawMetas, bytes.Clone(blog.Get(v)))
}
return nil
})
if err != nil {
return nil, err
}
res := make([]MetaVersion, 0, len(rawMetas))
for i := range rawMetas {
var op Operation
if err := op.FromBytes(rawMetas[i]); err != nil {
return nil, err
}
res = append(res, MetaVersion{
Key: op.Key,
Meta: op.Meta,
})
}
return res, nil
}
type ListParam struct {
Inclusive bool // If true, Start itself is included in the listing.
Start []byte
Count int
}
func (p *newPilorama) ListAll(cnr cid.ID, treeID string, param ListParam) ([][]byte, error) {
var keys [][]byte
_, ntree := burnedNames(cnr, treeID)
exact := make([]byte, len(param.Start))
copy(exact, param.Start)
err := p.db.View(func(tx *bbolt.Tx) error {
btree := tx.Bucket(ntree)
if btree == nil {
return ErrTreeNotFound
}
c := btree.Cursor()
k, _ := c.Seek(exact)
if !param.Inclusive && bytes.Equal(k, exact) {
k, _ = c.Next()
}
for ; k != nil; k, _ = c.Next() {
keys = append(keys, bytes.Clone(k))
if len(keys) == param.Count {
return nil
}
}
return nil
})
if err != nil {
return nil, err
}
return keys, nil
}
func makeFilenameKey(key []byte, version []byte) []byte {
filenameKey := make([]byte, len(key)+len(version)+1)
n := copy(filenameKey, key)
filenameKey[n] = 0
n++
copy(filenameKey[n:], version)
return filenameKey
}
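For orientation, here is a minimal usage sketch of the API above. It is illustrative only and not part of the commit: dir stands for any writable directory, cnr for an arbitrary cid.ID, and errors are ignored for brevity.

    p := NewPilorama(WithPath(filepath.Join(dir, "burned.db")))
    _ = p.Open(context.Background(), mode.ReadWrite)
    _ = p.Init()
    defer p.Close()

    d := CIDDescriptor{CID: cnr, Position: 0, Size: 1}
    // Add assigns the next timestamp itself and returns the logged operation...
    op, _ := p.Add(d, "version", []byte("my/dir/object1\x00null"), Meta{
        Items: []KeyValue{{Key: "Owner", Value: []byte("owner-id")}},
    })
    // ...which can be replicated and applied verbatim on another node.
    _ = p.Apply(cnr, "version", op)
    // Reads resolve the key through the tree bucket to the latest log entry.
    m, _ := p.GetLatest(cnr, "version", []byte("my/dir/object1\x00null"))
    _ = m.Items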


@@ -0,0 +1,151 @@
package pilorama
import (
"context"
mrand "math/rand"
"path/filepath"
"strconv"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"github.com/stretchr/testify/require"
)
func newTestNewPilorama(t testing.TB, opts ...Option) BurnedForest {
f := NewPilorama(
append([]Option{
WithPath(filepath.Join(t.TempDir(), "test.db")),
WithMaxBatchSize(1),
}, opts...)...)
require.NoError(t, f.Open(context.Background(), mode.ReadWrite))
require.NoError(t, f.Init())
return f
}
func BenchmarkBurnedForestApply(b *testing.B) {
for _, bs := range batchSizes {
b.Run("batchsize="+strconv.Itoa(bs), func(b *testing.B) {
r := mrand.New(mrand.NewSource(time.Now().Unix()))
s := newTestNewPilorama(b, WithMaxBatchSize(bs))
defer func() { require.NoError(b, s.Close()) }()
benchmarkApplyBurned(b, s, func(opCount int) []Operation {
ops := make([]Operation, opCount)
for i := range ops {
ops[i] = randomOp(b, r)
ops[i].Meta.Time = Timestamp(i)
}
return ops
})
})
}
}
func benchmarkApplyBurned(b *testing.B, s BurnedForest, genFunc func(int) []Operation) {
ops := genFunc(b.N)
cid := cidtest.ID()
treeID := "version"
ch := make(chan int, b.N)
for i := 0; i < b.N; i++ {
ch <- i
}
b.ResetTimer()
b.ReportAllocs()
b.SetParallelism(10)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
if err := s.Apply(cid, treeID, ops[<-ch]); err != nil {
b.Fatalf("error in `Apply`: %v", err)
}
}
})
}
func TestBurnedForestApplyRandom(t *testing.T) {
r := mrand.New(mrand.NewSource(42))
const (
nodeCount = 5
opCount = 20
)
ops := prepareRandomBurnedForest(t, nodeCount, opCount)
cnr := cidtest.ID()
treeID := "version"
expected := newTestNewPilorama(t, WithNoSync(true))
defer func() { require.NoError(t, expected.Close()) }()
for i := range ops {
require.NoError(t, expected.Apply(cnr, treeID, ops[i]))
}
const iterCount = 200
for i := 0; i < iterCount; i++ {
// Shuffle all operations; the resulting state must not depend on the apply order.
r.Shuffle(len(ops), func(i, j int) { ops[i], ops[j] = ops[j], ops[i] })
actual := newTestNewPilorama(t, WithNoSync(true))
for i := range ops {
require.NoError(t, actual.Apply(cnr, treeID, ops[i]))
}
compareBurnedForests(t, expected, actual, cnr, treeID, nodeCount)
require.NoError(t, actual.Close())
}
}
func compareBurnedForests(t testing.TB, expected, actual BurnedForest, cnr cid.ID, treeID string, nodeCount int) {
for i := 0; i < nodeCount; i++ {
expectedMetas, err := expected.ListByPrefix(cnr, treeID, []byte(strconv.Itoa(i)))
require.NoError(t, err)
actualMetas, err := actual.ListByPrefix(cnr, treeID, []byte(strconv.Itoa(i)))
require.NoError(t, err)
require.Equal(t, expectedMetas, actualMetas)
r1, err := expected.ListAll(cnr, treeID, ListParam{Start: nil, Count: 100})
require.NoError(t, err)
r2, err := actual.ListAll(cnr, treeID, ListParam{Start: nil, Count: 100})
require.NoError(t, err)
require.Equal(t, r1, r2)
}
}
func prepareRandomBurnedForest(t testing.TB, nodeCount, opCount int) []Operation {
r := mrand.New(mrand.NewSource(42))
ops := make([]Operation, nodeCount+opCount)
for i := 0; i < nodeCount; i++ {
ops[i] = randomOp(t, r)
ops[i].Key = []byte(strconv.Itoa(i))
}
for i := nodeCount; i < len(ops); i++ {
ops[i] = randomOp(t, r)
ops[i].Key = ops[r.Intn(nodeCount)].Key
}
return ops
}
func randomOp(t testing.TB, r *mrand.Rand) Operation {
kv := make([][]byte, 4)
for i := range kv {
kv[i] = make([]byte, r.Intn(10)+1)
r.Read(kv[i])
}
return Operation{
Key: []byte(strconv.Itoa(r.Int())),
Meta: Meta{
Time: Timestamp(r.Uint64()),
Items: []KeyValue{
{Key: string(kv[0]), Value: kv[1]},
{Key: string(kv[2]), Value: kv[3]},
},
},
}
}


@@ -0,0 +1,326 @@
package pilorama
import (
"encoding/binary"
"errors"
"fmt"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.etcd.io/bbolt"
)
const (
systemTreeID = "system"
versionTreeID = "version"
versionAttribute = "OID"
isUnversionedAttribute = "IsUnversioned"
nullVersionValue = "null"
versionTagAttribute = "IsTag"
versionLockAttribute = "IsLock"
multipartInfoNumberAttribute = "Number"
multipartInfoUploadIDAttribute = "UploadId"
multipartInfoNewKeySuffix = "Info"
systemSettingsKey = "bucket-settings"
systemCorsKey = "bucket-cors"
systemTaggingKey = "bucket-tagging"
systemLifecycleKey = "bucket-lifecycle"
)
type keyValue struct {
timestamp uint64
key []byte
value []byte
}
type migrationContext struct {
treeID string
blog *bbolt.Bucket
btree *bbolt.Bucket
}
func (t *boltForest) migrateVersionAndSystem(tx *bbolt.Tx, cnr cid.ID) error {
if err := t.iterateFull(tx, cnr, versionTreeID); err != nil {
return err
}
if err := t.iterateFull(tx, cnr, systemTreeID); err != nil {
return err
}
return nil
}
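// iterateFull walks the whole old tree of the given container depth-first,
// maintaining the current file path on an explicit stack, and hands every
// visited node to duplicateOnUpdate, which writes its flattened key/value
// form into the new log and tree buckets.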
func (t *boltForest) iterateFull(tx *bbolt.Tx, cnr cid.ID, treeID string) error {
switch treeID {
case versionTreeID, systemTreeID:
default:
return fmt.Errorf("unexpected tree ID: '%s'", treeID)
}
_, oldTree, err := t.getTreeBuckets(tx, bucketName(cnr, treeID))
if err != nil {
return err
}
logName, treeName := burnedNames(cnr, treeID)
blog, err := tx.CreateBucketIfNotExists(logName)
if err != nil {
return err
}
btree, err := tx.CreateBucketIfNotExists(treeName)
if err != nil {
return err
}
ctx := migrationContext{
treeID: treeID,
blog: blog,
btree: btree,
}
var filePath []string
stack := [][]Node{{RootID}}
for len(stack) != 0 {
if len(stack[len(stack)-1]) == 0 {
stack = stack[:len(stack)-1]
if len(filePath) != 0 {
filePath = filePath[:len(filePath)-1]
}
continue
}
var next Node
next, stack[len(stack)-1] = stack[len(stack)-1][0], stack[len(stack)-1][1:]
_, _, rawMeta, _ := t.getState(oldTree, stateKey(make([]byte, 9), next))
var m Meta
if err := m.FromBytes(rawMeta); err != nil {
return err
}
fileName := m.GetAttr(AttributeFilename)
filePath = append(filePath, string(fileName))
descend, err := t.duplicateOnUpdate(ctx, oldTree, next, filePath)
if err != nil {
return err
}
var childIDs []Node
if descend {
key := make([]byte, 9)
key[0] = 'c'
binary.LittleEndian.PutUint64(key[1:], next)
c := oldTree.Cursor()
for k, _ := c.Seek(key); len(k) == childrenKeySize && binary.LittleEndian.Uint64(k[1:]) == next; k, _ = c.Next() {
childID := binary.LittleEndian.Uint64(k[9:])
childIDs = append(childIDs, childID)
}
}
stack = append(stack, childIDs)
}
return nil
}
func (t *boltForest) duplicateOnUpdate(ctx migrationContext, oldTree *bbolt.Bucket, nodeID Node, filePath []string) (bool, error) {
var kvs []keyValue
var err error
switch ctx.treeID {
case systemTreeID:
kvs, err = t.systemRouter(oldTree, nodeID)
case versionTreeID:
kvs, err = t.versionRouter(oldTree, nodeID)
default:
return false, fmt.Errorf("unexpected tree ID: '%s'", ctx.treeID)
}
if err != nil {
return false, err
}
for _, kv := range kvs {
ts := make([]byte, 8)
binary.BigEndian.PutUint64(ts, kv.timestamp)
if err := ctx.blog.Put(ts, kv.value); err != nil {
return false, err
}
current := ctx.btree.Get(kv.key)
if len(current) == 8 && kv.timestamp <= binary.BigEndian.Uint64(current) {
continue
}
if err := ctx.btree.Put(kv.key, ts); err != nil {
return false, err
}
}
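// Descend into children only when this node produced no key/value pairs,
// i.e. it is an intermediate path component rather than a leaf entry.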
return len(kvs) == 0, nil
}
func (t *boltForest) buildPathInReverse(bTree *bbolt.Bucket, nodeID Node) (bool, Timestamp, Meta, []byte, error) {
// 1. Get filepath to the current node.
var components [][]byte
var size int
parent, ts, rawMeta, _ := t.getState(bTree, stateKey(make([]byte, 9), nodeID))
var meta Meta
if err := meta.FromBytes(rawMeta); err != nil {
return false, 0, Meta{}, nil, err
}
if len(meta.Items) <= 1 {
return false, 0, Meta{}, nil, nil
}
for {
var m Meta
if err := m.FromBytes(rawMeta); err != nil {
return false, 0, Meta{}, nil, err
}
fname := m.GetAttr(AttributeFilename)
if fname == nil {
return false, 0, Meta{}, nil, fmt.Errorf("empty '%s' was found in ancestors", AttributeFilename)
}
size += len(fname)
components = append(components, fname)
if parent == RootID {
break
}
nodeID = parent
parent, _, rawMeta, _ = t.getState(bTree, stateKey(make([]byte, 9), nodeID))
}
var prefix []byte
for i := len(components) - 1; i >= 0; i-- {
prefix = append(prefix, components[i]...)
if i > 0 {
prefix = append(prefix, '/')
}
}
return true, ts, meta, prefix, nil
}
func (t *boltForest) systemRouter(bTree *bbolt.Bucket, nodeID Node) ([]keyValue, error) {
// 1. Get filepath to the current node.
isExternalNode, ts, systemMeta, prefix, err := t.buildPathInReverse(bTree, nodeID)
if err != nil || !isExternalNode {
return nil, err
}
uploadID := systemMeta.GetAttr(multipartInfoUploadIDAttribute)
if uploadID == nil {
attr := systemMeta.GetAttr(AttributeFilename)
switch fname := string(attr); fname {
case systemSettingsKey, systemCorsKey, systemTaggingKey, systemLifecycleKey:
return []keyValue{{
timestamp: ts,
key: attr,
value: systemMeta.Bytes(),
}}, nil
default:
return nil, fmt.Errorf("neither %s nor %s ('%s') are found in system tree", multipartInfoUploadIDAttribute, AttributeFilename, fname)
}
}
// The key is '<key>\u0000<UploadId>\u0000Info'.
prefix = append(prefix, 0)
prefix = append(prefix, uploadID...)
prefix = append(prefix, 0)
prefix = prefix[:len(prefix):len(prefix)]
kvs := []keyValue{{
timestamp: ts,
key: append(prefix, multipartInfoNewKeySuffix...),
value: systemMeta.Bytes(),
}}
err = t.forEachChild(bTree, nodeID, func(info childInfo) error {
number := info.meta.GetAttr(multipartInfoNumberAttribute)
if len(number) == 0 {
return errors.New("empty 'Number' attribute in multipart info")
}
// The key is '<key>\u0000<UploadId>\u0000<Number>', where `<Number>` is the `Number` attribute.
newKey := make([]byte, len(prefix)+len(number))
copy(newKey, prefix)
copy(newKey[len(prefix):], number)
kvs = append(kvs, keyValue{
timestamp: info.time,
key: newKey,
value: info.rawMeta,
})
return nil
})
if err != nil {
return nil, err
}
return kvs, nil
}
func (t *boltForest) versionRouter(bTree *bbolt.Bucket, nodeID Node) ([]keyValue, error) {
// 1. Get filepath to the current node.
isExternalNode, ts, versionMeta, prefix, err := t.buildPathInReverse(bTree, nodeID)
if err != nil || !isExternalNode {
return nil, err
}
version := versionMeta.GetAttr(versionAttribute)
if version == nil {
if len(versionMeta.GetAttr(isUnversionedAttribute)) == 0 {
return nil, fmt.Errorf("both '%s' and '%s' attributes are empty", versionAttribute, isUnversionedAttribute)
}
version = []byte(nullVersionValue)
}
err = t.forEachChild(bTree, nodeID, func(info childInfo) error {
if info.meta.GetAttr(versionTagAttribute) != nil || info.meta.GetAttr(versionLockAttribute) != nil {
versionMeta.Items = append(versionMeta.Items, info.meta.Items...)
}
return nil
})
if err != nil {
return nil, err
}
// The key is '<key>\u0000<versionId>'.
prefix = append(prefix, 0)
prefix = append(prefix, version...)
prefix = prefix[:len(prefix):len(prefix)]
return []keyValue{{
timestamp: ts,
key: prefix,
value: versionMeta.Bytes(),
}}, nil
}
type childInfo struct {
time Timestamp
meta Meta
rawMeta []byte
}
func (t *boltForest) forEachChild(bTree *bbolt.Bucket, nodeID Node, f func(childInfo) error) error {
key := make([]byte, 9)
key[0] = 'c'
binary.LittleEndian.PutUint64(key[1:], nodeID)
c := bTree.Cursor()
for k, _ := c.Seek(key); len(k) == childrenKeySize && binary.LittleEndian.Uint64(k[1:]) == nodeID; k, _ = c.Next() {
childID := binary.LittleEndian.Uint64(k[9:])
_, ts, rawMeta, inTree := t.getState(bTree, stateKey(make([]byte, 9), childID))
if !inTree {
continue
}
var m Meta
if err := m.FromBytes(rawMeta); err != nil {
return err
}
if err := f(childInfo{time: ts, rawMeta: rawMeta, meta: m}); err != nil {
return err
}
}
return nil
}


@@ -0,0 +1,371 @@
package pilorama
import (
"context"
"crypto/rand"
"encoding/base64"
"encoding/hex"
"fmt"
randv2 "math/rand/v2"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
"github.com/davecgh/go-spew/spew"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)
/*
Below are the current data layouts in the pilorama and their new counterparts.
system tree - bucket settings and multipart upload info
settings (attributes are not changed by the migration; the key in the new tree is bucket-settings):
FileName: bucket-settings
Versioning
LockConfiguration
cannedACL
ownerKey
cors (attributes are not changed by the migration; the key in the new tree is bucket-cors):
FileName: bucket-cors
OID
bucket-tagging (attributes are not changed by the migration; the key in the new tree is bucket-tagging):
FileName: bucket-tagging
User-Tag-<key>
multipart info (attributes are not changed by the migration; the key in the new tree is <key>\u0000<UploadId>\u0000Info):
FileName
UploadId
Owner
Created
Finished
multipart parts (previously child nodes of the multipart info; attributes are not changed by the migration; the key in the new tree is <key>\u0000<UploadId>\u0000<Number>)
Number
OID
Size
Created
ETag
MD5
version tree - stores information about specific object versions and their additional metadata (including tags/locks)
version
old attrs:
OID
FileName
Owner
Created
Size
ETag
MD5
IdDeleteMarker
IsCombined
IsUnversioned
new attrs:
OID
FilePath
Key
Owner
Created
Size
ETag
MD5
IdDeleteMarker
IsCombined
IsUnversioned
User-Tag-<key>
LegalHoldOID
RetentionOID
UntilDate
IsCompliance
The key under which version info is stored is <key>\u0000<versionId> (where version-id is the OID, or null if the IsUnversioned flag is set).
Some of the new attributes in the version meta are taken from the tags and locks that previously were separate child nodes of the version.
tags:
IsTag
User-Tag-<key>
locks:
IsLock
LegalHoldOID
RetentionOID
UntilDate
IsCompliance
*/
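// An illustrative sketch of how the flat keys described above are formed
// (the helpers are hypothetical and exist only for this example):
//
//	func versionKey(filePath, versionID string) []byte {
//		// '<key>\u0000<versionId>'; versionID is the OID, or "null"
//		// when the IsUnversioned flag is set.
//		return []byte(filePath + "\x00" + versionID)
//	}
//
//	func multipartInfoKey(filePath, uploadID string) []byte {
//		// '<key>\u0000<UploadId>\u0000Info'
//		return []byte(filePath + "\x00" + uploadID + "\x00" + multipartInfoNewKeySuffix)
//	}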
func toKeyValue(m map[string]string) []KeyValue {
kvs := make([]KeyValue, 0, len(m))
for k, v := range m {
kvs = append(kvs, KeyValue{Key: k, Value: []byte(v)})
}
return kvs
}
func randBytes(n int) []byte {
a := make([]byte, n)
rand.Read(a)
return a
}
func newMultipartInfo(filename string) (string, []KeyValue) {
uploadID := base64.StdEncoding.EncodeToString(randBytes(10))
return uploadID, toKeyValue(map[string]string{
AttributeFilename: filename,
multipartInfoUploadIDAttribute: uploadID,
"Owner": usertest.ID().EncodeToString(),
"Created": time.Now().String(),
"Finished": time.Now().Add(time.Hour).String(),
})
}
func newVersionKV(filename string) (string, []KeyValue) {
objID := oidtest.ID().EncodeToString()
return objID, toKeyValue(map[string]string{
versionAttribute: objID,
AttributeFilename: filename,
"Owner": usertest.ID().EncodeToString(),
"Created": time.Now().String(),
"Size": strconv.Itoa(randv2.Int()),
"ETag": hex.EncodeToString(randBytes(32)),
"MD5": hex.EncodeToString(randBytes(16)),
"IdDeleteMarker": "false",
"IsCombined": "false",
"IsUnversioned": "false",
})
}
func TestUpgradeReal(t *testing.T) {
ctx := context.Background()
filename := "./testpilorama.db"
f := NewBoltForest(WithPath(filename))
require.NoError(t, f.Open(ctx, mode.ReadWrite))
require.NoError(t, f.Init())
seen := make(map[cid.ID]struct{})
err := f.(*boltForest).db.Update(func(tx *bbolt.Tx) error {
c := tx.Cursor()
for key, _ := c.First(); key != nil; key, _ = c.Next() {
if len(key) < 32 {
spew.Dump(key)
continue
}
var cnr cid.ID
if err := cnr.Decode(key[:32]); err != nil {
return fmt.Errorf("(%T).Decode: %w", cnr, err)
}
fmt.Println(cnr.String(), string(key[32:]))
if _, ok := seen[cnr]; ok {
continue
}
seen[cnr] = struct{}{}
if err := f.(*boltForest).migrateVersionAndSystem(tx, cnr); err != nil {
return err
}
}
return nil
})
require.NoError(t, err)
require.NoError(t, f.Close())
nf := NewPilorama(WithPath(filename))
require.NoError(t, nf.Open(ctx, mode.ReadOnly))
require.NoError(t, nf.Init())
defer nf.Close()
}
func TestUpgrade(t *testing.T) {
ctx := context.Background()
filename := filepath.Join(t.TempDir(), "pilorama")
f := NewBoltForest(WithPath(filename))
require.NoError(t, f.Open(ctx, mode.ReadWrite))
require.NoError(t, f.Init())
cnr := cidtest.ID()
d := CIDDescriptor{CID: cnr, Position: 1, Size: 4}
addByPath := func(treeID string, filePath []string, meta []KeyValue) []Move {
res, err := f.TreeAddByPath(ctx, d, treeID, AttributeFilename, filePath, meta)
require.NoError(t, err)
return res
}
move := func(treeID string, parent Node, meta []KeyValue) {
_, err := f.TreeMove(ctx, d, treeID, &Move{
Parent: parent,
Meta: Meta{Items: meta},
})
require.NoError(t, err)
}
type addOp struct {
filepath string
version string
meta []KeyValue
unversionedMeta []KeyValue
}
ops := []addOp{
{filepath: "my/dir/object1"},
{filepath: "my/dir/object2"},
{filepath: "my/dir1/object1"},
{filepath: "toplevel"},
}
for i := range ops {
fp := strings.Split(ops[i].filepath, "/")
version, m := newVersionKV(fp[len(fp)-1])
res := addByPath(versionTreeID, fp[:len(fp)-1], m)
switch i {
case 0:
// Attach lock info to the first object.
aux := toKeyValue(map[string]string{
versionLockAttribute: "true",
"LegalHoldOID": "garbage1",
"RetentionOID": "garbage2",
"UntilDate": time.Now().String(),
"IsCompliance": "false",
})
move(versionTreeID, res[len(res)-1].Child, aux)
m = append(m, aux...)
case 1:
// Attach user tag info to the second object.
aux := toKeyValue(map[string]string{
versionTagAttribute: "true",
"User-Tag-kek": "my-user-tag",
})
move(versionTreeID, res[len(res)-1].Child, aux)
m = append(m, aux...)
case 2:
// Add multiple unversioned objects. The last one should remain.
aux := toKeyValue(map[string]string{
isUnversionedAttribute: "true",
AttributeFilename: fp[len(fp)-1],
"attempt": "1",
})
addByPath(versionTreeID, fp[:len(fp)-1], aux)
aux = toKeyValue(map[string]string{
isUnversionedAttribute: "true",
AttributeFilename: fp[len(fp)-1],
"attempt": "2",
})
addByPath(versionTreeID, fp[:len(fp)-1], aux)
ops[i].unversionedMeta = aux
}
ops[i].version = version
ops[i].meta = m
}
// System tree.
type multipart struct {
filepath string
uploadID string
meta []KeyValue
parts []int
partsMeta [][]KeyValue
}
multipartOps := []multipart{
{filepath: "path1/object1"},
{filepath: "path1/object2"},
{filepath: "path2"},
{filepath: systemSettingsKey}, // Dirty test for naming conflict.
}
settingsMeta := toKeyValue(map[string]string{
AttributeFilename: systemSettingsKey,
"Versioning": "enabled",
"LockConfiguration": "something",
})
addByPath(systemTreeID, nil, settingsMeta)
lifecycleMeta := toKeyValue(map[string]string{
AttributeFilename: systemLifecycleKey,
"LockConfiguration": "something",
})
addByPath(systemTreeID, nil, lifecycleMeta)
corsMeta := toKeyValue(map[string]string{
AttributeFilename: systemCorsKey,
"OID": oidtest.ID().String(),
})
addByPath(systemTreeID, nil, corsMeta)
taggingMeta := toKeyValue(map[string]string{
AttributeFilename: systemTaggingKey,
"User-Tag-mytag": "abc",
})
addByPath(systemTreeID, nil, taggingMeta)
for i := range multipartOps {
fp := strings.Split(multipartOps[i].filepath, "/")
uploadID, m := newMultipartInfo(fp[len(fp)-1])
res := addByPath(systemTreeID, fp[:len(fp)-1], m)
for j := 0; j < 4; j++ {
if randv2.Int()%2 == 0 {
aux := toKeyValue(map[string]string{
multipartInfoNumberAttribute: strconv.Itoa(j),
"OID": oidtest.ID().EncodeToString(),
"Size": strconv.Itoa(randv2.Int()),
"Created": time.Now().String(),
"ETag": hex.EncodeToString(randBytes(10)),
"MD5": hex.EncodeToString(randBytes(16)),
})
move(systemTreeID, res[len(res)-1].Child, aux)
multipartOps[i].parts = append(multipartOps[i].parts, j)
multipartOps[i].partsMeta = append(multipartOps[i].partsMeta, aux)
}
}
multipartOps[i].uploadID = uploadID
multipartOps[i].meta = m
}
err := f.(*boltForest).db.Update(func(tx *bbolt.Tx) error {
return f.(*boltForest).migrateVersionAndSystem(tx, cnr)
})
require.NoError(t, err)
require.NoError(t, f.Close())
nf := NewPilorama(WithPath(filename))
require.NoError(t, nf.Open(ctx, mode.ReadOnly))
require.NoError(t, nf.Init())
checkMeta := func(treeID string, key []byte, expected []KeyValue) {
meta, err := nf.GetLatest(cnr, treeID, key)
require.NoError(t, err)
require.Equal(t, expected, meta.Items)
}
for i := range ops {
key := []byte(ops[i].filepath + "\x00" + ops[i].version)
checkMeta(versionTreeID, key, ops[i].meta)
if ops[i].unversionedMeta != nil {
key := []byte(ops[i].filepath + "\x00" + nullVersionValue)
checkMeta(versionTreeID, key, ops[i].unversionedMeta)
}
}
checkMeta(systemTreeID, []byte(systemSettingsKey), settingsMeta)
checkMeta(systemTreeID, []byte(systemCorsKey), corsMeta)
checkMeta(systemTreeID, []byte(systemLifecycleKey), lifecycleMeta)
checkMeta(systemTreeID, []byte(systemTaggingKey), taggingMeta)
for i := range multipartOps {
key := []byte(multipartOps[i].filepath + "\x00" + multipartOps[i].uploadID + "\x00" + "Info")
checkMeta(systemTreeID, key, multipartOps[i].meta)
for j, number := range multipartOps[i].parts {
key := []byte(multipartOps[i].filepath + "\x00" + multipartOps[i].uploadID + "\x00" + strconv.Itoa(number))
checkMeta(systemTreeID, key, multipartOps[i].partsMeta[j])
}
}
}


@@ -63,6 +63,9 @@ func (s *Shard) Open(ctx context.Context) error {
if s.pilorama != nil {
components = append(components, s.pilorama)
}
if s.burned != nil {
components = append(components, s.burned)
}
for i, component := range components {
if err := component.Open(ctx, m); err != nil {
@@ -168,6 +171,9 @@ func (s *Shard) initializeComponents(m mode.Mode) error {
if s.pilorama != nil {
components = append(components, s.pilorama)
}
if s.burned != nil {
components = append(components, s.burned)
}
for _, component := range components {
if err := component.Init(); err != nil {
@@ -373,6 +379,9 @@ func (s *Shard) Close() error {
if s.pilorama != nil {
components = append(components, s.pilorama)
}
if s.burned != nil {
components = append(components, s.burned)
}
if s.hasWriteCache() {
prev := s.writecacheSealCancel.Swap(notInitializedCancel)


@@ -41,6 +41,9 @@ func (s *Shard) setMode(m mode.Mode) error {
if s.pilorama != nil {
components = append(components, s.pilorama)
}
if s.burned != nil {
components = append(components, s.burned)
}
// The usual flow of the requests (pilorama is independent):
// writecache -> blobstor -> metabase


@@ -32,6 +32,8 @@ type Shard struct {
pilorama pilorama.ForestStorage
burned pilorama.BurnedForest
metaBase *meta.DB
tsSource TombstoneSource
@@ -146,7 +148,8 @@ func New(opts ...Option) *Shard {
}
if s.piloramaOpts != nil {
s.pilorama = pilorama.NewBoltForest(c.piloramaOpts...)
//s.pilorama = pilorama.NewBoltForest(c.piloramaOpts...)
s.burned = pilorama.NewPilorama(c.piloramaOpts...)
}
s.fillInfo()


@@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@@ -427,3 +428,110 @@ func (s *Shard) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID strin
}
return s.pilorama.TreeApplyStream(ctx, cnr, treeID, source)
}
func (s *Shard) Add(d pilorama.CIDDescriptor, treeID string, key []byte, meta pilorama.Meta) (pilorama.Operation, error) {
if s.burned == nil {
return pilorama.Operation{}, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return pilorama.Operation{}, ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return pilorama.Operation{}, ErrDegradedMode
}
return s.burned.Add(d, treeID, key, meta)
}
func (s *Shard) Remove(d pilorama.CIDDescriptor, treeID string, key []byte) (pilorama.Operation, error) {
if s.burned == nil {
return pilorama.Operation{}, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return pilorama.Operation{}, ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return pilorama.Operation{}, ErrDegradedMode
}
return s.burned.Remove(d, treeID, key)
}
func (s *Shard) Apply(cnr cid.ID, treeID string, op pilorama.Operation) error {
if s.burned == nil {
return ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
return s.burned.Apply(cnr, treeID, op)
}
func (s *Shard) GetLatest(cnr cid.ID, treeID string, key []byte) (pilorama.Meta, error) {
if s.burned == nil {
return pilorama.Meta{}, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return pilorama.Meta{}, ErrDegradedMode
}
return s.burned.GetLatest(cnr, treeID, key)
}
func (s *Shard) GetLatestByPrefix(cnr cid.ID, treeID string, prefix []byte) (pilorama.MetaVersion, error) {
if s.burned == nil {
return pilorama.MetaVersion{}, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return pilorama.MetaVersion{}, ErrDegradedMode
}
return s.burned.GetLatestByPrefix(cnr, treeID, prefix)
}
func (s *Shard) ListByPrefix(cnr cid.ID, treeID string, key []byte) ([]pilorama.MetaVersion, error) {
if s.burned == nil {
return nil, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
return s.burned.ListByPrefix(cnr, treeID, key)
}
func (s *Shard) ListAll(cnr cid.ID, treeID string, param pilorama.ListParam) ([][]byte, error) {
if s.burned == nil {
return nil, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
return s.burned.ListAll(cnr, treeID, param)
}


@@ -35,6 +35,8 @@ type cfg struct {
cnrSource ContainerSource
frostfsidSubjectProvider frostfsidcore.SubjectProvider
forest pilorama.Forest
burned pilorama.BurnedFully
// replication-related parameters
replicatorChannelCapacity int
replicatorWorkerCount int
@@ -97,6 +99,13 @@ func WithStorage(s pilorama.Forest) Option {
}
}
// WithBurnedStorage sets tree storage for a service.
func WithBurnedStorage(s pilorama.BurnedFully) Option {
return func(c *cfg) {
c.burned = s
}
}
func WithReplicationChannelCapacity(n int) Option {
return func(c *cfg) {
if n > 0 {


@@ -51,6 +51,146 @@ service TreeService {
rpc GetOpLog(GetOpLogRequest) returns (stream GetOpLogResponse);
// Healthcheck is a dummy rpc to check service availability
rpc Healthcheck(HealthcheckRequest) returns (HealthcheckResponse);
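// The Burned* RPCs expose the flat ("burned") tree: BurnedAdd, BurnedRemove
// and BurnedApply mutate it, while the BurnedGet*/BurnedList* family reads it.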
rpc BurnedAdd(BurnedAddRequest) returns (BurnedAddResponse);
rpc BurnedRemove(BurnedRemoveRequest) returns (BurnedRemoveResponse);
rpc BurnedApply(BurnedApplyRequest) returns (BurnedApplyResponse);
rpc BurnedGet(BurnedGetRequest) returns (BurnedGetResponse);
rpc BurnedGetLatestByPrefix(BurnedGetLatestByPrefixRequest)
returns (BurnedGetLatestByPrefixResponse);
rpc BurnedListByPrefix(BurnedListByPrefixRequest)
returns (BurnedListByPrefixResponse);
rpc BurnedList(BurnedListRequest) returns (stream BurnedListResponse);
}
message BurnedAddRequest {
message Body {
bytes container_id = 1;
string tree_id = 2;
string key = 3;
repeated KeyValue meta = 4;
}
Body body = 1;
Signature signature = 2;
}
message BurnedAddResponse {
message Body {}
Body body = 1;
Signature signature = 2;
}
message BurnedRemoveRequest {
message Body {
bytes container_id = 1;
string tree_id = 2;
string key = 3;
}
Body body = 1;
Signature signature = 2;
}
message BurnedRemoveResponse {
message Body {}
Body body = 1;
Signature signature = 2;
}
message BurnedApplyRequest {
message Body {
bytes container_id = 1;
string tree_id = 2;
bytes op = 3;
}
Body body = 1;
Signature signature = 2;
}
message BurnedApplyResponse {
message Body {}
Body body = 1;
Signature signature = 2;
}
message BurnedGetRequest {
message Body {
bytes container_id = 1;
string tree_id = 2;
string key = 3;
bytes bearer_token = 5;
}
Body body = 1;
Signature signature = 2;
}
message BurnedGetResponse {
message Body {
uint64 timestamp = 1;
repeated KeyValue meta = 2;
}
Body body = 1;
Signature signature = 2;
}
message BurnedGetLatestByPrefixRequest {
message Body {
bytes container_id = 1;
string tree_id = 2;
string prefix = 3;
bytes bearer_token = 5;
}
Body body = 1;
Signature signature = 2;
}
message BurnedGetLatestByPrefixResponse {
message Body {
string key = 1;
uint64 timestamp = 2;
repeated KeyValue meta = 3;
}
Body body = 1;
Signature signature = 2;
}
message BurnedListByPrefixRequest {
message Body {
bytes container_id = 1;
string tree_id = 2;
string key = 3;
}
Body body = 1;
Signature signature = 2;
}
message BurnedListByPrefixResponse {
message Body {
message Info {
string key = 1;
uint64 timestamp = 2;
repeated KeyValue meta = 3;
}
repeated Info list = 1;
}
Body body = 1;
Signature signature = 2;
}
message BurnedListRequest {
message Body {
bytes container_id = 1;
string tree_id = 2;
string start = 3;
}
Body body = 1;
Signature signature = 2;
}
message BurnedListResponse {
message Body {
string key = 1;
uint64 timestamp = 2;
repeated KeyValue meta = 3;
}
Body body = 1;
Signature signature = 2;
}
message AddRequest {

Binary file not shown.

Binary file not shown.


@@ -0,0 +1,229 @@
package tree
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)
func (s *Service) BurnedAdd(ctx context.Context, req *BurnedAddRequest) (*BurnedAddResponse, error) {
b := req.GetBody()
var cid cidSDK.ID
if err := cid.Decode(b.GetContainerId()); err != nil {
return nil, err
}
ns, pos, err := s.getContainerNodes(cid)
if err != nil {
return nil, err
}
if pos < 0 {
return nil, errors.New("the node is not in the container")
}
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
meta := protoToMeta(b.GetMeta())
key := b.GetKey()
if key == "" {
for i := range meta {
if meta[i].Key == pilorama.AttributeFilename {
key = string(meta[i].Value)
}
}
}
log, err := s.burned.Add(d, b.GetTreeId(), []byte(key), pilorama.Meta{
Items: meta,
})
if err != nil {
return nil, err
}
_ = log
// s.pushToQueue(cid, b.GetTreeId(), log)
return &BurnedAddResponse{Body: &BurnedAddResponse_Body{}}, nil
}
func (s *Service) BurnedRemove(ctx context.Context, req *BurnedRemoveRequest) (*BurnedRemoveResponse, error) {
b := req.GetBody()
var cid cidSDK.ID
if err := cid.Decode(b.GetContainerId()); err != nil {
return nil, err
}
ns, pos, err := s.getContainerNodes(cid)
if err != nil {
return nil, err
}
if pos < 0 {
return nil, errors.New("the node is not in the container")
}
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
key := b.GetKey()
log, err := s.burned.Remove(d, b.GetTreeId(), []byte(key))
if err != nil {
return nil, err
}
_ = log
// s.pushToQueue(cid, b.GetTreeId(), log)
return &BurnedRemoveResponse{Body: &BurnedRemoveResponse_Body{}}, nil
}
func (s *Service) BurnedApply(ctx context.Context, req *BurnedApplyRequest) (*BurnedApplyResponse, error) {
var cid cidSDK.ID
if err := cid.Decode(req.GetBody().GetContainerId()); err != nil {
return nil, err
}
key := req.GetSignature().GetKey()
_, pos, _, err := s.getContainerInfo(cid, key)
if err != nil {
return nil, err
}
if pos < 0 {
return nil, errors.New("`Apply` request must be signed by a container node")
}
rawOp := req.GetBody().GetOp()
var op pilorama.Operation
if err := op.FromBytes(rawOp); err != nil {
return nil, err
}
// select {
// case s.replicateLocalCh <- applyOp{
// treeID: req.GetBody().GetTreeId(),
// cid: cid,
// Move: pilorama.Move{
// Parent: op.GetParentId(),
// Child: op.GetChildId(),
// Meta: meta,
// },
// }:
// default:
// }
return &BurnedApplyResponse{Body: &BurnedApplyResponse_Body{}, Signature: &Signature{}}, nil
}
func (s *Service) BurnedGet(ctx context.Context, req *BurnedGetRequest) (*BurnedGetResponse, error) {
b := req.GetBody()
var cid cidSDK.ID
if err := cid.Decode(req.GetBody().GetContainerId()); err != nil {
return nil, err
}
meta, err := s.burned.GetLatest(cid, b.GetTreeId(), []byte(b.GetKey()))
if err != nil {
return nil, err
}
res := metaToProto(meta.Items)
return &BurnedGetResponse{
Body: &BurnedGetResponse_Body{
Timestamp: meta.Time,
Meta: res,
},
}, nil
}
func (s *Service) BurnedGetLatestByPrefix(ctx context.Context, req *BurnedGetLatestByPrefixRequest) (*BurnedGetLatestByPrefixResponse, error) {
b := req.GetBody()
var cid cidSDK.ID
if err := cid.Decode(req.GetBody().GetContainerId()); err != nil {
return nil, err
}
meta, err := s.burned.GetLatestByPrefix(cid, b.GetTreeId(), []byte(b.GetPrefix()))
if err != nil {
return nil, err
}
res := metaToProto(meta.Meta.Items)
return &BurnedGetLatestByPrefixResponse{
Body: &BurnedGetLatestByPrefixResponse_Body{
Timestamp: meta.Meta.Time,
Meta: res,
},
}, nil
}
func (s *Service) BurnedListByPrefix(ctx context.Context, req *BurnedListByPrefixRequest) (*BurnedListByPrefixResponse, error) {
b := req.GetBody()
var cid cidSDK.ID
if err := cid.Decode(req.GetBody().GetContainerId()); err != nil {
return nil, err
}
vs, err := s.burned.ListByPrefix(cid, b.GetTreeId(), []byte(b.GetKey()))
if err != nil {
return nil, err
}
res := &BurnedListByPrefixResponse{Body: &BurnedListByPrefixResponse_Body{}}
for i := range vs {
res.Body.List = append(res.Body.List, &BurnedListByPrefixResponse_Body_Info{
Key: string(vs[i].Key),
Timestamp: vs[i].Meta.Time,
Meta: metaToProto(vs[i].Meta.Items),
})
}
return res, nil
}
func (s *Service) BurnedList(req *BurnedListRequest, srv TreeService_BurnedListServer) error {
b := req.GetBody()
var cid cidSDK.ID
if err := cid.Decode(req.GetBody().GetContainerId()); err != nil {
return err
}
const count = 100
var start []byte
var inclusive bool
if len(b.GetStart()) != 0 {
start = []byte(b.GetStart())
inclusive = true
}
for {
res, err := s.burned.ListAll(cid, b.GetTreeId(), pilorama.ListParam{
Inclusive: inclusive,
Start: start,
Count: count,
})
if err != nil {
return err
}
for i := range res {
meta, err := s.burned.GetLatest(cid, b.GetTreeId(), res[i])
if err != nil {
return err
}
err = srv.Send(&BurnedListResponse{
Body: &BurnedListResponse_Body{
Key: string(res[i]),
Timestamp: meta.Time,
Meta: metaToProto(meta.Items),
},
})
if err != nil {
return err
}
}
if len(res) < count {
return nil
}
inclusive = false
start = res[len(res)-1]
}
}
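On the consumer side, BurnedList is a server-side stream that the handler above fills in pages of 100 keys. A hedged client sketch, assuming c is the generated TreeServiceClient and req a prepared, signed *BurnedListRequest:

    stream, err := c.BurnedList(ctx, req)
    if err != nil {
        return err
    }
    for {
        resp, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            // To resume after a broken stream, reissue the request with
            // Body.Start set to the last received key; the server treats a
            // non-empty Start as inclusive, so that key arrives again.
            return err
        }
        _ = resp.GetBody().GetKey() // process one entry
    }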

Binary file not shown.