From 82a30c0775cd46824299d61d293467778cf1bf8d Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 24 Aug 2023 17:44:09 +0300 Subject: [PATCH] [#645] blobstor: Add simple blobtree impl Signed-off-by: Dmitrii Stepanov --- .../blobstor/blobtree/blobtree.go | 80 ++++++++ .../blobstor/blobtree/config.go | 14 ++ .../blobstor/blobtree/content.go | 191 ++++++++++++++++++ .../blobstor/blobtree/control.go | 98 +++++++++ .../blobstor/blobtree/delete.go | 124 ++++++++++++ .../blobstor/blobtree/dispatcher.go | 94 +++++++++ .../blobstor/blobtree/dispatcher_test.go | 29 +++ .../blobstor/blobtree/exists.go | 54 +++++ .../blobstor/blobtree/generic_test.go | 39 ++++ .../blobstor/blobtree/get.go | 107 ++++++++++ .../blobstor/blobtree/get_range.go | 28 +++ .../blobstor/blobtree/iterate.go | 96 +++++++++ .../blobstor/blobtree/option.go | 29 +++ .../blobstor/blobtree/put.go | 88 ++++++++ .../blobstor/perf_test.go | 10 + pkg/util/sync/key_locker.go | 46 ++++- pkg/util/sync/key_locker_test.go | 16 +- 17 files changed, 1133 insertions(+), 10 deletions(-) create mode 100644 pkg/local_object_storage/blobstor/blobtree/blobtree.go create mode 100644 pkg/local_object_storage/blobstor/blobtree/config.go create mode 100644 pkg/local_object_storage/blobstor/blobtree/content.go create mode 100644 pkg/local_object_storage/blobstor/blobtree/control.go create mode 100644 pkg/local_object_storage/blobstor/blobtree/delete.go create mode 100644 pkg/local_object_storage/blobstor/blobtree/dispatcher.go create mode 100644 pkg/local_object_storage/blobstor/blobtree/dispatcher_test.go create mode 100644 pkg/local_object_storage/blobstor/blobtree/exists.go create mode 100644 pkg/local_object_storage/blobstor/blobtree/generic_test.go create mode 100644 pkg/local_object_storage/blobstor/blobtree/get.go create mode 100644 pkg/local_object_storage/blobstor/blobtree/get_range.go create mode 100644 pkg/local_object_storage/blobstor/blobtree/iterate.go create mode 100644 pkg/local_object_storage/blobstor/blobtree/option.go create mode 100644 pkg/local_object_storage/blobstor/blobtree/put.go diff --git a/pkg/local_object_storage/blobstor/blobtree/blobtree.go b/pkg/local_object_storage/blobstor/blobtree/blobtree.go new file mode 100644 index 000000000..3f51c345f --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/blobtree.go @@ -0,0 +1,80 @@ +package blobtree + +import ( + "errors" + "path/filepath" + "strings" + "sync/atomic" + "syscall" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" + utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +var _ common.Storage = &BlobTree{} + +type BlobTree struct { + cfg cfg + dirLock *utilSync.KeyLocker[string] + fileLock *utilSync.KeyLocker[string] + compressor *compression.Config + dispatcher *rootDispatcher + suffix atomic.Uint64 +} + +func New(opts ...Option) *BlobTree { + b := &BlobTree{ + cfg: cfg{ + targetFileSizeBytes: 4 * 1024 * 1024, + rootPath: "./", + depth: 3, + permissions: 0700, + initWorkersCount: 1000, + }, + dirLock: utilSync.NewKeyLocker[string](), + fileLock: utilSync.NewKeyLocker[string](), + } + + for _, opt := range opts { + opt(&b.cfg) + } + + b.dispatcher = newRootDispatcher() + + return b +} + +func (b *BlobTree) getDirectoryPath(addr oid.Address) string { + sAddr := addr.Object().EncodeToString() + "." + addr.Container().EncodeToString() + var sb strings.Builder + size := int(1+b.cfg.depth*(directoryLength+1)) + len(b.cfg.rootPath) // /path + slash + (character + slash for every level) + sb.Grow(size) + sb.WriteString(b.cfg.rootPath) + + for i := uint64(0); i < b.cfg.depth; i++ { + sb.WriteRune(filepath.Separator) + sb.WriteString(sAddr[:directoryLength]) + sAddr = sAddr[directoryLength:] + } + + sb.WriteRune(filepath.Separator) + return sb.String() +} + +func (b *BlobTree) createDir(dir string) error { + b.dirLock.Lock(dir) + defer b.dirLock.Unlock(dir) + + if err := util.MkdirAllX(dir, b.cfg.permissions); err != nil { + if errors.Is(err, syscall.ENOSPC) { + err = common.ErrNoSpace + return err + } + return err + } + + return nil +} diff --git a/pkg/local_object_storage/blobstor/blobtree/config.go b/pkg/local_object_storage/blobstor/blobtree/config.go new file mode 100644 index 000000000..823c5b6f7 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/config.go @@ -0,0 +1,14 @@ +package blobtree + +import "io/fs" + +var directoryLength uint64 = 1 + +type cfg struct { + rootPath string + depth uint64 + targetFileSizeBytes uint64 + permissions fs.FileMode + readOnly bool + initWorkersCount int +} diff --git a/pkg/local_object_storage/blobstor/blobtree/content.go b/pkg/local_object_storage/blobstor/blobtree/content.go new file mode 100644 index 000000000..3760d1648 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/content.go @@ -0,0 +1,191 @@ +package blobtree + +import ( + "crypto/sha256" + "encoding/binary" + "errors" + "fmt" + "os" + "path/filepath" + "strconv" + + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +const ( + defaultVersion = 0 + + sizeOfVersion = 1 + sizeOfCount = 8 + sizeOfDataLength = 8 + sizeOfContainerID = sha256.Size + sizeOfObjectID = sha256.Size +) + +var ( + errFileToSmall = errors.New("invalid file content: not enough bytes to read count of records") + errInvalidFileContentVersion = errors.New("invalid file content: not enough bytes to read record version") + errInvalidFileContentContainerID = errors.New("invalid file content: not enough bytes to read container ID") + errInvalidFileContentObjectID = errors.New("invalid file content: not enough bytes to read object ID") + errInvalidFileContentLength = errors.New("invalid file content: not enough bytes to read data length") + errInvalidFileContentData = errors.New("invalid file content: not enough bytes to read data") +) + +type objectData struct { + Version byte + Address oid.Address + Data []byte +} + +func (b *BlobTree) readFileContent(path string) ([]objectData, error) { + rawData, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + return []objectData{}, nil + } + return nil, err + } + return b.unmarshalSlice(rawData) +} + +func (b *BlobTree) unmarshalSlice(data []byte) ([]objectData, error) { + if len(data) < sizeOfCount { + return nil, errFileToSmall + } + count := binary.LittleEndian.Uint64(data[:8]) + result := make([]objectData, 0, count) + + data = data[sizeOfCount:] + var idx uint64 + for idx = 0; idx < count; idx++ { + record, read, err := b.unmarshalRecord(data) + if err != nil { + return nil, err + } + result = append(result, record) + data = data[read:] + } + + return result, nil +} + +func (b *BlobTree) unmarshalRecord(data []byte) (objectData, uint64, error) { + if len(data) < sizeOfVersion { + return objectData{}, 0, errInvalidFileContentVersion + } + var result objectData + var read uint64 + result.Version = data[0] + if result.Version != defaultVersion { + return objectData{}, 0, fmt.Errorf("invalid file content: unknown version %d", result.Version) + } + read += sizeOfVersion + + if len(data[read:]) < sizeOfContainerID { + return objectData{}, 0, errInvalidFileContentContainerID + } + var contID cid.ID + if err := contID.Decode(data[read : read+sizeOfContainerID]); err != nil { + return objectData{}, 0, fmt.Errorf("invalid file content: failed to read container ID: %w", err) + } + read += sizeOfContainerID + + if len(data[read:]) < sizeOfObjectID { + return objectData{}, 0, errInvalidFileContentObjectID + } + var objID oid.ID + if err := objID.Decode(data[read : read+sizeOfObjectID]); err != nil { + return objectData{}, 0, fmt.Errorf("invalid file content: failed to read object ID: %w", err) + } + read += sizeOfObjectID + + result.Address.SetContainer(contID) + result.Address.SetObject(objID) + + if len(data[read:]) < sizeOfDataLength { + return objectData{}, 0, errInvalidFileContentLength + } + dataLength := binary.LittleEndian.Uint64(data[read : read+sizeOfDataLength]) + read += sizeOfDataLength + + if uint64(len(data[read:])) < dataLength { + return objectData{}, 0, errInvalidFileContentData + } + result.Data = make([]byte, dataLength) + copy(result.Data, data[read:read+dataLength]) + read += dataLength + + return result, read, nil +} + +func (b *BlobTree) saveContentToFile(records []objectData, path string) (uint64, error) { + data, err := b.marshalSlice(records) + if err != nil { + return 0, err + } + return uint64(len(data)), b.writeFile(path, data) +} + +func (b *BlobTree) writeFile(p string, data []byte) error { + f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_EXCL|os.O_SYNC, b.cfg.permissions) + if err != nil { + return err + } + _, err = f.Write(data) + if err1 := f.Close(); err1 != nil && err == nil { + err = err1 + } + return err +} + +func (b *BlobTree) marshalSlice(records []objectData) ([]byte, error) { + buf := make([]byte, b.estimateSize(records)) + result := buf + binary.LittleEndian.PutUint64(buf, uint64(len(records))) + buf = buf[sizeOfCount:] + for _, record := range records { + written := b.marshalRecord(record, buf) + buf = buf[written:] + } + return result, nil +} + +func (b *BlobTree) marshalRecord(record objectData, dst []byte) uint64 { + var written uint64 + + dst[0] = record.Version + dst = dst[sizeOfVersion:] + written += sizeOfVersion + + record.Address.Container().Encode(dst) + dst = dst[sizeOfContainerID:] + written += sizeOfContainerID + + record.Address.Object().Encode(dst) + dst = dst[sizeOfObjectID:] + written += sizeOfObjectID + + binary.LittleEndian.PutUint64(dst, uint64(len(record.Data))) + dst = dst[sizeOfDataLength:] + written += sizeOfDataLength + + copy(dst, record.Data) + written += uint64(len(record.Data)) + + return written +} + +func (b *BlobTree) estimateSize(records []objectData) uint64 { + var result uint64 + result += sizeOfCount + for _, record := range records { + result += (sizeOfVersion + sizeOfContainerID + sizeOfObjectID + sizeOfDataLength) + result += uint64(len(record.Data)) + } + return result +} + +func (b *BlobTree) getFilePath(dir string, idx uint64) string { + return filepath.Join(dir, strconv.FormatUint(idx, 16)) +} diff --git a/pkg/local_object_storage/blobstor/blobtree/control.go b/pkg/local_object_storage/blobstor/blobtree/control.go new file mode 100644 index 000000000..ec33ef2cf --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/control.go @@ -0,0 +1,98 @@ +package blobtree + +import ( + "os" + "path/filepath" + "strconv" + "strings" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression" + "golang.org/x/sync/errgroup" +) + +var Type = "blobtree" + +func (b *BlobTree) Open(readOnly bool) error { + b.cfg.readOnly = readOnly + return nil +} + +func (b *BlobTree) Init() error { + if err := b.createDir(b.cfg.rootPath); err != nil { + return err + } + + var eg errgroup.Group + eg.SetLimit(b.cfg.initWorkersCount) + eg.Go(func() error { + return b.initDir(&eg, b.cfg.rootPath, 0) + }) + return eg.Wait() +} + +func (b *BlobTree) initDir(eg *errgroup.Group, dir string, depth uint64) error { + entities, err := os.ReadDir(dir) + if err != nil { + return err + } + for _, entity := range entities { + if depth < b.cfg.depth && entity.IsDir() { + eg.Go(func() error { + return b.initDir(eg, filepath.Join(dir, entity.Name()), depth+1) + }) + continue + } + + if depth != b.cfg.depth { + continue + } + + if b.isTempFile(entity.Name()) { + if err = os.Remove(filepath.Join(dir, entity.Name())); err != nil { + return err + } + continue + } + + idx, err := b.parseIdx(entity.Name()) + if err != nil { + continue + } + b.dispatcher.Init(dir, idx) + + stat, err := os.Stat(filepath.Join(dir, entity.Name())) + if err != nil { + return err + } + if stat.Size() < int64(b.cfg.targetFileSizeBytes) { + b.dispatcher.ReturnIdx(dir, idx) + } + } + return nil +} + +func (b *BlobTree) isTempFile(name string) bool { + return strings.Contains(name, tempFileSymbols) +} + +func (b *BlobTree) parseIdx(name string) (uint64, error) { + return strconv.ParseUint(name, 16, 64) +} + +func (b *BlobTree) Close() error { + return nil +} + +func (b *BlobTree) Type() string { return Type } +func (b *BlobTree) Path() string { return b.cfg.rootPath } + +func (b *BlobTree) SetCompressor(cc *compression.Config) { + b.compressor = cc +} + +func (b *BlobTree) Compressor() *compression.Config { + return b.compressor +} + +func (b *BlobTree) SetReportErrorFunc(_ func(string, error)) {} +func (b *BlobTree) SetParentID(_ string) {} diff --git a/pkg/local_object_storage/blobstor/blobtree/delete.go b/pkg/local_object_storage/blobstor/blobtree/delete.go new file mode 100644 index 000000000..482a8855a --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/delete.go @@ -0,0 +1,124 @@ +package blobtree + +import ( + "context" + "encoding/binary" + "os" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +func (b *BlobTree) Delete(_ context.Context, prm common.DeletePrm) (common.DeleteRes, error) { + if b.cfg.readOnly { + return common.DeleteRes{}, common.ErrReadOnly + } + + if len(prm.StorageID) == storageIDLength { + return b.deleteFromIdx(prm.Address, binary.LittleEndian.Uint64(prm.StorageID)) + } + return b.findAndDelete(prm.Address) +} + +func (b *BlobTree) deleteFromIdx(addr oid.Address, idx uint64) (common.DeleteRes, error) { + dir := b.getDirectoryPath(addr) + path := b.getFilePath(dir, idx) + + b.fileLock.Lock(path) + defer b.fileLock.Unlock(path) + + records, err := b.readFileContent(path) + if err != nil { + return common.DeleteRes{}, err + } + + deleteIdx := -1 + for i := range records { + if records[i].Address.Equals(addr) { + deleteIdx = i + break + } + } + + if deleteIdx == -1 { + return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + + if len(records) == 1 { + err = os.Remove(path) + if err == nil { + b.dispatcher.ReturnIdx(dir, idx) + // decrease files metric + } + return common.DeleteRes{}, err + } + + records = append(records[:idx], records[idx+1:]...) + size, err := b.writeToTmpAndRename(records, path) + if err != nil { + return common.DeleteRes{}, err + } + if size < b.cfg.targetFileSizeBytes { + b.dispatcher.ReturnIdx(dir, idx) + } + return common.DeleteRes{}, nil +} + +func (b *BlobTree) findAndDelete(addr oid.Address) (common.DeleteRes, error) { + dir := b.getDirectoryPath(addr) + idx, err := b.findFileIdx(dir, addr) + if err != nil { + return common.DeleteRes{}, err + } + return b.deleteFromIdx(addr, idx) +} + +func (b *BlobTree) findFileIdx(dir string, addr oid.Address) (uint64, error) { + entities, err := os.ReadDir(dir) + if err != nil { + if os.IsNotExist(err) { + return 0, logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + return 0, err + } + for _, entity := range entities { + if entity.IsDir() { + continue + } + if b.isTempFile(entity.Name()) { + continue + } + idx, err := b.parseIdx(entity.Name()) + if err != nil { + continue + } + path := b.getFilePath(dir, idx) + contains, err := b.fileContainsObject(path, addr) + if err != nil { + return 0, err + } + if contains { + return idx, nil + } + } + return 0, logicerr.Wrap(new(apistatus.ObjectNotFound)) +} + +func (b *BlobTree) fileContainsObject(path string, addr oid.Address) (bool, error) { + b.fileLock.RLock(path) + defer b.fileLock.RUnlock(path) + + records, err := b.readFileContent(path) + if err != nil { + return false, err + } + + for i := range records { + if records[i].Address.Equals(addr) { + return true, nil + } + } + return false, nil +} diff --git a/pkg/local_object_storage/blobstor/blobtree/dispatcher.go b/pkg/local_object_storage/blobstor/blobtree/dispatcher.go new file mode 100644 index 000000000..1ce5d9da9 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/dispatcher.go @@ -0,0 +1,94 @@ +package blobtree + +import ( + "sync" +) + +type rootDispatcher struct { + dispatchers map[string]*dirDispatcher + guard sync.Mutex +} + +func newRootDispatcher() *rootDispatcher { + return &rootDispatcher{ + dispatchers: make(map[string]*dirDispatcher), + } +} + +func (d *rootDispatcher) GetIdxForWrite(dir string) uint64 { + return d.getDirDispatcher(dir).GetIdxForWrite() +} + +func (d *rootDispatcher) ReturnIdx(dir string, idx uint64) { + d.getDirDispatcher(dir).ReturnIdx(idx) +} + +func (d *rootDispatcher) Init(dir string, idx uint64) { + d.getDirDispatcher(dir).Init(idx) +} + +func (d *rootDispatcher) getDirDispatcher(dir string) *dirDispatcher { + d.guard.Lock() + defer d.guard.Unlock() + + if result, ok := d.dispatchers[dir]; ok { + return result + } + + result := newDirDispatcher(dir) + d.dispatchers[dir] = result + return result +} + +type dirDispatcher struct { + dir string + guard sync.Mutex + indicies map[uint64]struct{} + nextIndex uint64 +} + +func newDirDispatcher(dir string) *dirDispatcher { + return &dirDispatcher{ + dir: dir, + indicies: make(map[uint64]struct{}), + } +} + +func (d *dirDispatcher) GetIdxForWrite() uint64 { + d.guard.Lock() + defer d.guard.Unlock() + + var result uint64 + var found bool + + for idx := range d.indicies { + result = idx + found = true + break + } + + if found { + delete(d.indicies, result) + return result + } + + result = d.nextIndex + d.nextIndex++ + return result +} + +func (d *dirDispatcher) ReturnIdx(idx uint64) { + d.guard.Lock() + defer d.guard.Unlock() + + d.indicies[idx] = struct{}{} +} + +func (d *dirDispatcher) Init(idx uint64) { + d.guard.Lock() + defer d.guard.Unlock() + + if d.nextIndex <= idx { + d.nextIndex = idx + 1 + } +} diff --git a/pkg/local_object_storage/blobstor/blobtree/dispatcher_test.go b/pkg/local_object_storage/blobstor/blobtree/dispatcher_test.go new file mode 100644 index 000000000..ed72bf948 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/dispatcher_test.go @@ -0,0 +1,29 @@ +package blobtree + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDispatcher(t *testing.T) { + t.Parallel() + + d := newRootDispatcher() + idx := d.GetIdxForWrite("/dir1") + require.Equal(t, uint64(0), idx) + d.ReturnIdx("/dir1", idx) + + idx = d.GetIdxForWrite("/dir1") + require.Equal(t, uint64(0), idx) + + idx = d.GetIdxForWrite("/dir1") + require.Equal(t, uint64(1), idx) + + d.Init("/dir2", 5) + idx = d.GetIdxForWrite("/dir2") + require.Equal(t, uint64(6), idx) + + idx = d.GetIdxForWrite("/dir2") + require.Equal(t, uint64(7), idx) +} diff --git a/pkg/local_object_storage/blobstor/blobtree/exists.go b/pkg/local_object_storage/blobstor/blobtree/exists.go new file mode 100644 index 000000000..d42dfa766 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/exists.go @@ -0,0 +1,54 @@ +package blobtree + +import ( + "context" + "encoding/binary" + "errors" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +func (b *BlobTree) Exists(_ context.Context, prm common.ExistsPrm) (common.ExistsRes, error) { + if len(prm.StorageID) == storageIDLength { + return b.existsFromIdx(prm.Address, binary.LittleEndian.Uint64(prm.StorageID)) + } + return b.findAndCheck(prm.Address) +} + +func (b *BlobTree) existsFromIdx(addr oid.Address, idx uint64) (common.ExistsRes, error) { + dir := b.getDirectoryPath(addr) + path := b.getFilePath(dir, idx) + + b.fileLock.RLock(path) + defer b.fileLock.RUnlock(path) + + records, err := b.readFileContent(path) + if err != nil { + return common.ExistsRes{}, err + } + + for i := range records { + if records[i].Address.Equals(addr) { + return common.ExistsRes{ + Exists: true, + }, nil + } + } + + return common.ExistsRes{}, nil +} + +func (b *BlobTree) findAndCheck(addr oid.Address) (common.ExistsRes, error) { + dir := b.getDirectoryPath(addr) + _, err := b.findFileIdx(dir, addr) + if err != nil { + var notFound *apistatus.ObjectNotFound + if errors.As(err, ¬Found) { + return common.ExistsRes{}, nil + } + return common.ExistsRes{}, err + } + return common.ExistsRes{Exists: true}, nil +} diff --git a/pkg/local_object_storage/blobstor/blobtree/generic_test.go b/pkg/local_object_storage/blobstor/blobtree/generic_test.go new file mode 100644 index 000000000..f0c24aee5 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/generic_test.go @@ -0,0 +1,39 @@ +package blobtree + +import ( + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest" +) + +func TestGeneric(t *testing.T) { + newTreeFromPath := func(path string) common.Storage { + return New( + WithPath(path), + WithDepth(2)) + } + + newTree := func(t *testing.T) common.Storage { + return newTreeFromPath(t.TempDir()) + } + + blobstortest.TestAll(t, newTree, 2048, 16*1024) + + t.Run("info", func(t *testing.T) { + path := t.TempDir() + blobstortest.TestInfo(t, func(*testing.T) common.Storage { + return newTreeFromPath(path) + }, Type, path) + }) +} + +func TestControl(t *testing.T) { + newTree := func(t *testing.T) common.Storage { + return New( + WithPath(t.TempDir()), + WithDepth(2)) + } + + blobstortest.TestControl(t, newTree, 2048, 2048) +} diff --git a/pkg/local_object_storage/blobstor/blobtree/get.go b/pkg/local_object_storage/blobstor/blobtree/get.go new file mode 100644 index 000000000..41ee462c9 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/get.go @@ -0,0 +1,107 @@ +package blobtree + +import ( + "context" + "encoding/binary" + "os" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +func (b *BlobTree) Get(_ context.Context, prm common.GetPrm) (common.GetRes, error) { + if len(prm.StorageID) == storageIDLength { + return b.getFromIdx(prm.Address, binary.LittleEndian.Uint64(prm.StorageID)) + } + return b.findAndGet(prm.Address) +} + +func (b *BlobTree) getFromIdx(addr oid.Address, idx uint64) (common.GetRes, error) { + dir := b.getDirectoryPath(addr) + path := b.getFilePath(dir, idx) + + b.fileLock.RLock(path) + defer b.fileLock.RUnlock(path) + + records, err := b.readFileContent(path) + if err != nil { + return common.GetRes{}, err + } + + for _, record := range records { + if record.Address.Equals(addr) { + return b.unmarshalGetRes(record) + } + } + + return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) +} + +func (b *BlobTree) unmarshalGetRes(record objectData) (common.GetRes, error) { + data, err := b.compressor.Decompress(record.Data) + if err != nil { + return common.GetRes{}, err + } + + obj := objectSDK.New() + if err := obj.Unmarshal(data); err != nil { + return common.GetRes{}, err + } + return common.GetRes{Object: obj, RawData: data}, nil +} + +func (b *BlobTree) findAndGet(addr oid.Address) (common.GetRes, error) { + dir := b.getDirectoryPath(addr) + entities, err := os.ReadDir(dir) + if err != nil { + if os.IsNotExist(err) { + return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + return common.GetRes{}, err + } + for _, entity := range entities { + if entity.IsDir() { + continue + } + if b.isTempFile(entity.Name()) { + continue + } + idx, err := b.parseIdx(entity.Name()) + if err != nil { + continue + } + path := b.getFilePath(dir, idx) + res, err := b.tryReadObject(path, addr) + if err != nil { + return common.GetRes{}, err + } + if res.Object != nil { + return res, nil + } + } + return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) +} + +func (b *BlobTree) tryReadObject(path string, addr oid.Address) (common.GetRes, error) { + b.fileLock.RLock(path) + defer b.fileLock.RUnlock(path) + + records, err := b.readFileContent(path) + if err != nil { + return common.GetRes{}, err + } + + for _, record := range records { + if record.Address.Equals(addr) { + res, err := b.unmarshalGetRes(record) + if err != nil { + return common.GetRes{}, err + } + return res, nil + } + } + return common.GetRes{}, nil +} diff --git a/pkg/local_object_storage/blobstor/blobtree/get_range.go b/pkg/local_object_storage/blobstor/blobtree/get_range.go new file mode 100644 index 000000000..8740d33f9 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/get_range.go @@ -0,0 +1,28 @@ +package blobtree + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" +) + +func (b *BlobTree) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) { + res, err := b.Get(ctx, common.GetPrm{Address: prm.Address, StorageID: prm.StorageID}) + if err != nil { + return common.GetRangeRes{}, err + } + + payload := res.Object.Payload() + from := prm.Range.GetOffset() + to := from + prm.Range.GetLength() + + if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { + return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectOutOfRange)) + } + + return common.GetRangeRes{ + Data: payload[from:to], + }, nil +} diff --git a/pkg/local_object_storage/blobstor/blobtree/iterate.go b/pkg/local_object_storage/blobstor/blobtree/iterate.go new file mode 100644 index 000000000..af39a4a5f --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/iterate.go @@ -0,0 +1,96 @@ +package blobtree + +import ( + "context" + "encoding/binary" + "os" + "path/filepath" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" +) + +func (b *BlobTree) Iterate(_ context.Context, prm common.IteratePrm) (common.IterateRes, error) { + return common.IterateRes{}, b.iterateDir(b.cfg.rootPath, 0, prm) +} + +func (b *BlobTree) iterateDir(dir string, depth uint64, prm common.IteratePrm) error { + entities, err := os.ReadDir(dir) + if err != nil { + if prm.IgnoreErrors { + return nil + } + return err + } + + for _, entity := range entities { + if depth < b.cfg.depth && entity.IsDir() { + err := b.iterateDir(filepath.Join(dir, entity.Name()), depth+1, prm) + if err != nil { + return err + } + } + if depth != b.cfg.depth { + continue + } + if b.isTempFile(entity.Name()) { + continue + } + idx, err := b.parseIdx(entity.Name()) + if err != nil { + continue + } + path := b.getFilePath(dir, idx) + err = b.iterateRecords(idx, path, prm) + if err != nil { + return err + } + } + return nil +} + +func (b *BlobTree) iterateRecords(idx uint64, path string, prm common.IteratePrm) error { + b.fileLock.RLock(path) + defer b.fileLock.RUnlock(path) + + records, err := b.readFileContent(path) + if err != nil { + if prm.IgnoreErrors { + return nil + } + return err + } + + for _, record := range records { + if prm.LazyHandler != nil { + if err = prm.LazyHandler(record.Address, func() ([]byte, error) { + return record.Data, nil + }); err != nil { + return err + } + continue + } + + record.Data, err = b.compressor.Decompress(record.Data) + if err != nil { + if prm.IgnoreErrors { + if prm.ErrorHandler != nil { + return prm.ErrorHandler(record.Address, err) + } + continue + } + return err + } + + storageID := make([]byte, storageIDLength) + binary.LittleEndian.PutUint64(storageID, idx) + err = prm.Handler(common.IterationElement{ + Address: record.Address, + ObjectData: record.Data, + StorageID: storageID, + }) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/local_object_storage/blobstor/blobtree/option.go b/pkg/local_object_storage/blobstor/blobtree/option.go new file mode 100644 index 000000000..9bd8557ea --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/option.go @@ -0,0 +1,29 @@ +package blobtree + +import "io/fs" + +type Option func(*cfg) + +func WithPath(path string) Option { + return func(c *cfg) { + c.rootPath = path + } +} + +func WithDepth(depth uint64) Option { + return func(c *cfg) { + c.depth = depth + } +} + +func WithPerm(p fs.FileMode) Option { + return func(c *cfg) { + c.permissions = p + } +} + +func WithTargetSize(size uint64) Option { + return func(c *cfg) { + c.targetFileSizeBytes = size + } +} diff --git a/pkg/local_object_storage/blobstor/blobtree/put.go b/pkg/local_object_storage/blobstor/blobtree/put.go new file mode 100644 index 000000000..9c2c6779b --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/put.go @@ -0,0 +1,88 @@ +package blobtree + +import ( + "context" + "encoding/binary" + "os" + "strconv" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" +) + +const ( + tempFileSymbols = "###" + storageIDLength = 8 +) + +func (b *BlobTree) Put(_ context.Context, prm common.PutPrm) (common.PutRes, error) { + if b.cfg.readOnly { + return common.PutRes{}, common.ErrReadOnly + } + + dir := b.getDirectoryPath(prm.Address) + + if err := b.createDir(dir); err != nil { + return common.PutRes{}, err + } + + if !prm.DontCompress { + prm.RawData = b.compressor.Compress(prm.RawData) + } + + idx, err := b.saveToFile(prm, dir) + if err != nil { + return common.PutRes{}, err + } + + storageID := make([]byte, storageIDLength) + binary.LittleEndian.PutUint64(storageID, idx) + return common.PutRes{StorageID: storageID}, nil +} + +func (b *BlobTree) saveToFile(prm common.PutPrm, dir string) (uint64, error) { + returnIdx := true + idx := b.dispatcher.GetIdxForWrite(dir) + path := b.getFilePath(dir, idx) + + b.fileLock.Lock(path) + defer b.fileLock.Unlock(path) + + defer func() { + if returnIdx { + b.dispatcher.ReturnIdx(dir, idx) + } + }() + + currentContent, err := b.readFileContent(path) + if err != nil { + return 0, err + } + var newRecord objectData + newRecord.Address = prm.Address + newRecord.Data = prm.RawData + + size, err := b.writeToTmpAndRename(append(currentContent, newRecord), path) + if err != nil { + return 0, err + } + returnIdx = size < b.cfg.targetFileSizeBytes + + return idx, nil +} + +func (b *BlobTree) writeToTmpAndRename(records []objectData, path string) (uint64, error) { + tmpFile := path + tempFileSymbols + strconv.FormatUint(b.suffix.Add(1), 16) + + size, err := b.saveContentToFile(records, tmpFile) + if err != nil { + _ = os.Remove(tmpFile) + return 0, err + } + + if err := os.Rename(tmpFile, path); err != nil { + _ = os.Remove(tmpFile) + return 0, err + } + + return size, nil +} diff --git a/pkg/local_object_storage/blobstor/perf_test.go b/pkg/local_object_storage/blobstor/perf_test.go index c773ea0ee..9b1c8a1ec 100644 --- a/pkg/local_object_storage/blobstor/perf_test.go +++ b/pkg/local_object_storage/blobstor/perf_test.go @@ -6,6 +6,7 @@ import ( "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobtree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/memstore" @@ -81,6 +82,15 @@ var storages = []storage{ ) }, }, + { + desc: "blobtree", + create: func(dir string) common.Storage { + return blobtree.New( + blobtree.WithDepth(2), + blobtree.WithPath(dir), + ) + }, + }, } func BenchmarkSubstorageReadPerf(b *testing.B) { diff --git a/pkg/util/sync/key_locker.go b/pkg/util/sync/key_locker.go index 97de0386d..2a5545569 100644 --- a/pkg/util/sync/key_locker.go +++ b/pkg/util/sync/key_locker.go @@ -3,8 +3,8 @@ package sync import "sync" type locker struct { - mtx sync.Mutex - waiters int // not protected by mtx, must used outer mutex to update concurrently + mtx sync.RWMutex + userCount int // not protected by mtx, must used outer mutex to update concurrently } type KeyLocker[K comparable] struct { @@ -19,26 +19,50 @@ func NewKeyLocker[K comparable]() *KeyLocker[K] { } func (l *KeyLocker[K]) Lock(key K) { + l.lock(key, false) +} + +func (l *KeyLocker[K]) RLock(key K) { + l.lock(key, true) +} + +func (l *KeyLocker[K]) lock(key K, read bool) { l.lockersMtx.Lock() if locker, found := l.lockers[key]; found { - locker.waiters++ + locker.userCount++ l.lockersMtx.Unlock() - locker.mtx.Lock() + if read { + locker.mtx.RLock() + } else { + locker.mtx.Lock() + } return } locker := &locker{ - waiters: 1, + userCount: 1, + } + if read { + locker.mtx.RLock() + } else { + locker.mtx.Lock() } - locker.mtx.Lock() l.lockers[key] = locker l.lockersMtx.Unlock() } func (l *KeyLocker[K]) Unlock(key K) { + l.unlock(key, false) +} + +func (l *KeyLocker[K]) RUnlock(key K) { + l.unlock(key, true) +} + +func (l *KeyLocker[K]) unlock(key K, read bool) { l.lockersMtx.Lock() defer l.lockersMtx.Unlock() @@ -47,10 +71,14 @@ func (l *KeyLocker[K]) Unlock(key K) { return } - if locker.waiters == 1 { + if locker.userCount == 1 { delete(l.lockers, key) } - locker.waiters-- + locker.userCount-- - locker.mtx.Unlock() + if read { + locker.mtx.RUnlock() + } else { + locker.mtx.Unlock() + } } diff --git a/pkg/util/sync/key_locker_test.go b/pkg/util/sync/key_locker_test.go index 3b3e6a694..f4ba3e19d 100644 --- a/pkg/util/sync/key_locker_test.go +++ b/pkg/util/sync/key_locker_test.go @@ -9,7 +9,7 @@ import ( "golang.org/x/sync/errgroup" ) -func TestKeyLocker(t *testing.T) { +func TestKeyLockerWrite(t *testing.T) { taken := false eg, _ := errgroup.WithContext(context.Background()) keyLocker := NewKeyLocker[int]() @@ -30,3 +30,17 @@ func TestKeyLocker(t *testing.T) { } require.NoError(t, eg.Wait()) } + +func TestKeyLockerRead(t *testing.T) { + eg, _ := errgroup.WithContext(context.Background()) + keyLocker := NewKeyLocker[int]() + for i := 0; i < 100; i++ { + eg.Go(func() error { + keyLocker.RLock(0) + defer keyLocker.RUnlock(0) + + return nil + }) + } + require.NoError(t, eg.Wait()) +}