WIP: Blobtree substorage #645
7 changed files with 109 additions and 23 deletions
|
@ -2,17 +2,19 @@ package blobtree
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
func (b *BlobTree) Delete(_ context.Context, prm common.DeletePrm) (common.DeleteRes, error) {
|
||||
func (b *BlobTree) Delete(ctx context.Context, prm common.DeletePrm) (common.DeleteRes, error) {
|
||||
var (
|
||||
success = false
|
||||
startedAt = time.Now()
|
||||
|
@ -21,14 +23,22 @@ func (b *BlobTree) Delete(_ context.Context, prm common.DeletePrm) (common.Delet
|
|||
b.cfg.metrics.Delete(time.Since(startedAt), success, prm.StorageID != nil)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BlobTree.Delete",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.cfg.rootPath),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.String("storage_id", storageIDToIdxStringSafe(prm.StorageID)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
if b.cfg.readOnly {
|
||||
return common.DeleteRes{}, common.ErrReadOnly
|
||||
}
|
||||
|
||||
var res common.DeleteRes
|
||||
var err error
|
||||
if len(prm.StorageID) == storageIDLength {
|
||||
res, err = b.deleteFromIdx(prm.Address, binary.LittleEndian.Uint64(prm.StorageID))
|
||||
if idx, ok := tryParseIdxFromStorageID(prm.StorageID); ok {
|
||||
res, err = b.deleteFromIdx(prm.Address, idx)
|
||||
} else {
|
||||
res, err = b.findAndDelete(prm.Address)
|
||||
}
|
||||
|
|
|
@ -2,16 +2,18 @@ package blobtree
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
func (b *BlobTree) Exists(_ context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
|
||||
func (b *BlobTree) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
|
@ -20,10 +22,18 @@ func (b *BlobTree) Exists(_ context.Context, prm common.ExistsPrm) (common.Exist
|
|||
b.cfg.metrics.Exists(time.Since(startedAt), success, prm.StorageID != nil)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BlobTree.Exists",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.cfg.rootPath),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.String("storage_id", storageIDToIdxStringSafe(prm.StorageID)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
var res common.ExistsRes
|
||||
var err error
|
||||
if len(prm.StorageID) == storageIDLength {
|
||||
res, err = b.existsFromIdx(prm.Address, binary.LittleEndian.Uint64(prm.StorageID))
|
||||
if idx, ok := tryParseIdxFromStorageID(prm.StorageID); ok {
|
||||
res, err = b.existsFromIdx(prm.Address, idx)
|
||||
} else {
|
||||
res, err = b.findAndCheck(prm.Address)
|
||||
}
|
||||
|
|
|
@ -2,18 +2,20 @@ package blobtree
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
func (b *BlobTree) Get(_ context.Context, prm common.GetPrm) (common.GetRes, error) {
|
||||
func (b *BlobTree) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
|
@ -23,6 +25,15 @@ func (b *BlobTree) Get(_ context.Context, prm common.GetPrm) (common.GetRes, err
|
|||
b.cfg.metrics.Get(time.Since(startedAt), size, success, prm.StorageID != nil)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BlobTree.Get",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.cfg.rootPath),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.String("storage_id", storageIDToIdxStringSafe(prm.StorageID)),
|
||||
attribute.Bool("raw", prm.Raw),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
res, err := b.get(prm)
|
||||
success = err == nil
|
||||
size = len(res.RawData)
|
||||
|
@ -30,8 +41,8 @@ func (b *BlobTree) Get(_ context.Context, prm common.GetPrm) (common.GetRes, err
|
|||
}
|
||||
|
||||
func (b *BlobTree) get(prm common.GetPrm) (common.GetRes, error) {
|
||||
if len(prm.StorageID) == storageIDLength {
|
||||
return b.getFromIdx(prm.Address, binary.LittleEndian.Uint64(prm.StorageID))
|
||||
if idx, ok := tryParseIdxFromStorageID(prm.StorageID); ok {
|
||||
return b.getFromIdx(prm.Address, idx)
|
||||
}
|
||||
return b.findAndGet(prm.Address)
|
||||
}
|
||||
|
|
|
@ -2,11 +2,15 @@ package blobtree
|
|||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
func (b *BlobTree) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) {
|
||||
|
@ -19,6 +23,16 @@ func (b *BlobTree) GetRange(ctx context.Context, prm common.GetRangePrm) (common
|
|||
b.cfg.metrics.GetRange(time.Since(startedAt), size, success, prm.StorageID != nil)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BlobTree.GetRange",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.cfg.rootPath),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.String("storage_id", storageIDToIdxStringSafe(prm.StorageID)),
|
||||
attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)),
|
||||
attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
gRes, err := b.get(common.GetPrm{Address: prm.Address, StorageID: prm.StorageID})
|
||||
if err != nil {
|
||||
return common.GetRangeRes{}, err
|
||||
|
|
|
@ -2,15 +2,17 @@ package blobtree
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
func (b *BlobTree) Iterate(_ context.Context, prm common.IteratePrm) (common.IterateRes, error) {
|
||||
func (b *BlobTree) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
err error
|
||||
|
@ -19,6 +21,13 @@ func (b *BlobTree) Iterate(_ context.Context, prm common.IteratePrm) (common.Ite
|
|||
b.cfg.metrics.Iterate(time.Since(startedAt), err == nil)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BlobTree.Iterate",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.cfg.rootPath),
|
||||
attribute.Bool("ignore_errors", prm.IgnoreErrors),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
err = b.iterateDir(b.cfg.rootPath, 0, prm)
|
||||
return common.IterateRes{}, err
|
||||
}
|
||||
|
@ -91,12 +100,10 @@ func (b *BlobTree) iterateRecords(idx uint64, path string, prm common.IteratePrm
|
|||
return err
|
||||
}
|
||||
|
||||
storageID := make([]byte, storageIDLength)
|
||||
binary.LittleEndian.PutUint64(storageID, idx)
|
||||
err = prm.Handler(common.IterationElement{
|
||||
Address: record.Address,
|
||||
ObjectData: record.Data,
|
||||
StorageID: storageID,
|
||||
StorageID: idxToStorageID(idx),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -2,20 +2,21 @@ package blobtree
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
const (
|
||||
tempFileSymbols = "###"
|
||||
storageIDLength = 8
|
||||
)
|
||||
|
||||
func (b *BlobTree) Put(_ context.Context, prm common.PutPrm) (common.PutRes, error) {
|
||||
func (b *BlobTree) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
|
||||
var (
|
||||
success bool
|
||||
size int
|
||||
|
@ -25,6 +26,13 @@ func (b *BlobTree) Put(_ context.Context, prm common.PutPrm) (common.PutRes, err
|
|||
b.cfg.metrics.Put(time.Since(startedAt), size, success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BlobTree.Put",
|
||||
trace.WithAttributes(
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.Bool("dont_compress", prm.DontCompress),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
if b.cfg.readOnly {
|
||||
return common.PutRes{}, common.ErrReadOnly
|
||||
}
|
||||
|
@ -47,9 +55,7 @@ func (b *BlobTree) Put(_ context.Context, prm common.PutPrm) (common.PutRes, err
|
|||
success = true
|
||||
size = len(prm.RawData)
|
||||
|
||||
storageID := make([]byte, storageIDLength)
|
||||
binary.LittleEndian.PutUint64(storageID, idx)
|
||||
return common.PutRes{StorageID: storageID}, nil
|
||||
return common.PutRes{StorageID: idxToStorageID(idx)}, nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) saveToFile(prm common.PutPrm, dir string) (uint64, error) {
|
||||
|
|
28
pkg/local_object_storage/blobstor/blobtree/storage_id.go
Normal file
28
pkg/local_object_storage/blobstor/blobtree/storage_id.go
Normal file
|
@ -0,0 +1,28 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
const storageIDLength = 8
|
||||
|
||||
func tryParseIdxFromStorageID(storageID []byte) (uint64, bool) {
|
||||
if len(storageID) == storageIDLength {
|
||||
return binary.LittleEndian.Uint64(storageID), true
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
func storageIDToIdxStringSafe(storageID []byte) string {
|
||||
if len(storageID) == storageIDLength {
|
||||
return strconv.FormatUint(binary.LittleEndian.Uint64(storageID), 10)
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func idxToStorageID(idx uint64) []byte {
|
||||
storageID := make([]byte, storageIDLength)
|
||||
binary.LittleEndian.PutUint64(storageID, idx)
|
||||
return storageID
|
||||
}
|
Loading…
Reference in a new issue