package fstree import ( "context" "crypto/sha256" "errors" "fmt" "io/fs" "os" "path/filepath" "strconv" "strings" "sync/atomic" "syscall" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) type keyLock interface { Lock(string) Unlock(string) } type noopKeyLock struct{} func (l *noopKeyLock) Lock(string) {} func (l *noopKeyLock) Unlock(string) {} // FSTree represents an object storage as a filesystem tree. type FSTree struct { Info *compression.Config Depth uint64 DirNameLen int noSync bool readOnly bool metrics Metrics fileGuard keyLock fileCounter FileCounter fileCounterEnabled bool suffix atomic.Uint64 } // Info groups the information about file storage. type Info struct { // Permission bits of the root directory. Permissions fs.FileMode // Full path to the root directory. RootPath string } const ( // DirNameLen is how many bytes is used to group keys into directories. DirNameLen = 1 // in bytes // MaxDepth is maximum depth of nested directories. MaxDepth = (sha256.Size - 1) / DirNameLen ) var _ common.Storage = (*FSTree)(nil) func New(opts ...Option) *FSTree { f := &FSTree{ Info: Info{ Permissions: 0o700, RootPath: "./", }, Config: nil, Depth: 4, DirNameLen: DirNameLen, metrics: &noopMetrics{}, fileGuard: &noopKeyLock{}, fileCounter: &noopCounter{}, } for i := range opts { opts[i](f) } return f } func stringifyAddress(addr oid.Address) string { return addr.Object().EncodeToString() + "." + addr.Container().EncodeToString() } func addressFromString(s string) (oid.Address, error) { before, after, found := strings.Cut(s, ".") if !found { return oid.Address{}, errors.New("invalid address") } var obj oid.ID if err := obj.DecodeString(before); err != nil { return oid.Address{}, err } var cnr cid.ID if err := cnr.DecodeString(after); err != nil { return oid.Address{}, err } var addr oid.Address addr.SetObject(obj) addr.SetContainer(cnr) return addr, nil } // Iterate iterates over all stored objects. func (t *FSTree) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) { var ( err error startedAt = time.Now() ) defer func() { t.metrics.Iterate(time.Since(startedAt), err == nil) }() _, span := tracing.StartSpanFromContext(ctx, "FSTree.Iterate", trace.WithAttributes( attribute.String("path", t.RootPath), attribute.Bool("ignore_errors", prm.IgnoreErrors), )) defer span.End() err = t.iterate(ctx, 0, []string{t.RootPath}, prm) return common.IterateRes{}, err } func (t *FSTree) iterate(ctx context.Context, depth uint64, curPath []string, prm common.IteratePrm) error { curName := strings.Join(curPath[1:], "") des, err := os.ReadDir(filepath.Join(curPath...)) if err != nil { if prm.IgnoreErrors { return nil } return err } isLast := depth >= t.Depth l := len(curPath) curPath = append(curPath, "") for i := range des { select { case <-ctx.Done(): return ctx.Err() default: } curPath[l] = des[i].Name() if !isLast && des[i].IsDir() { err := t.iterate(ctx, depth+1, curPath, prm) if err != nil { // Must be error from handler in case errors are ignored. // Need to report. return err } } if depth != t.Depth { continue } addr, err := addressFromString(curName + des[i].Name()) if err != nil { continue } data, err := os.ReadFile(filepath.Join(curPath...)) if err != nil && os.IsNotExist(err) { continue } if err == nil { data, err = t.Decompress(data) } if err != nil { if prm.IgnoreErrors { if prm.ErrorHandler != nil { return prm.ErrorHandler(addr, err) } continue } return err } err = prm.Handler(common.IterationElement{ Address: addr, ObjectData: data, StorageID: []byte{}, }) if err != nil { return err } } return nil } func (t *FSTree) treePath(addr oid.Address) string { sAddr := stringifyAddress(addr) var sb strings.Builder sb.Grow(len(t.RootPath) + len(sAddr) + int(t.Depth) + 1) sb.WriteString(t.RootPath) for i := 0; uint64(i) < t.Depth; i++ { sb.WriteRune(filepath.Separator) sb.WriteString(sAddr[:t.DirNameLen]) sAddr = sAddr[t.DirNameLen:] } sb.WriteRune(filepath.Separator) sb.WriteString(sAddr) return sb.String() } // Delete removes the object with the specified address from the storage. func (t *FSTree) Delete(ctx context.Context, prm common.DeletePrm) (common.DeleteRes, error) { var ( err error startedAt = time.Now() ) defer func() { t.metrics.Delete(time.Since(startedAt), err == nil) }() _, span := tracing.StartSpanFromContext(ctx, "FSTree.Delete", trace.WithAttributes( attribute.String("path", t.RootPath), attribute.String("address", prm.Address.EncodeToString()), )) defer span.End() if t.readOnly { err = common.ErrReadOnly return common.DeleteRes{}, err } p := t.treePath(prm.Address) if t.fileCounterEnabled { t.fileGuard.Lock(p) err = os.Remove(p) t.fileGuard.Unlock(p) if err == nil { t.fileCounter.Dec() } } else { err = os.Remove(p) } if err != nil && os.IsNotExist(err) { err = logicerr.Wrap(new(apistatus.ObjectNotFound)) } return common.DeleteRes{}, err } // Exists returns the path to the file with object contents if it exists in the storage // and an error otherwise. func (t *FSTree) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) { var ( success = false startedAt = time.Now() ) defer func() { t.metrics.Exists(time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "FSTree.Exists", trace.WithAttributes( attribute.String("path", t.RootPath), attribute.String("address", prm.Address.EncodeToString()), )) defer span.End() p := t.treePath(prm.Address) _, err := os.Stat(p) found := err == nil if os.IsNotExist(err) { err = nil } success = err == nil return common.ExistsRes{Exists: found}, err } // Put puts an object in the storage. func (t *FSTree) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) { var ( size int startedAt = time.Now() err error ) defer func() { t.metrics.Put(time.Since(startedAt), size, err == nil) }() _, span := tracing.StartSpanFromContext(ctx, "FSTree.Put", trace.WithAttributes( attribute.String("path", t.RootPath), attribute.String("address", prm.Address.EncodeToString()), attribute.Bool("dont_compress", prm.DontCompress), )) defer span.End() if t.readOnly { err = common.ErrReadOnly return common.PutRes{}, err } p := t.treePath(prm.Address) if err = util.MkdirAllX(filepath.Dir(p), t.Permissions); err != nil { if errors.Is(err, syscall.ENOSPC) { err = common.ErrNoSpace return common.PutRes{}, err } return common.PutRes{}, err } if !prm.DontCompress { prm.RawData = t.Compress(prm.RawData) } size = len(prm.RawData) tmpPath := p + "#" + strconv.FormatUint(t.suffix.Add(1), 10) err = t.writeAndRename(tmpPath, p, prm.RawData) return common.PutRes{StorageID: []byte{}}, err } // writeAndRename opens tmpPath exclusively, writes data to it and renames it to p. func (t *FSTree) writeAndRename(tmpPath, p string, data []byte) error { if t.fileCounterEnabled { t.fileGuard.Lock(p) defer t.fileGuard.Unlock(p) } err := t.writeFile(tmpPath, data) if err != nil { var pe *fs.PathError if errors.As(err, &pe) { switch pe.Err { case syscall.ENOSPC: err = common.ErrNoSpace _ = os.RemoveAll(tmpPath) } } return err } if t.fileCounterEnabled { t.fileCounter.Inc() var targetFileExists bool if _, e := os.Stat(p); e == nil { targetFileExists = true } err = os.Rename(tmpPath, p) if err == nil && targetFileExists { t.fileCounter.Dec() } } else { err = os.Rename(tmpPath, p) } return err } func (t *FSTree) writeFlags() int { flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC | os.O_EXCL if t.noSync { return flags } return flags | os.O_SYNC } // writeFile writes data to a file with path p. // The code is copied from `os.WriteFile` with minor corrections for flags. func (t *FSTree) writeFile(p string, data []byte) error { f, err := os.OpenFile(p, t.writeFlags(), t.Permissions) if err != nil { return err } _, err = f.Write(data) if err1 := f.Close(); err1 != nil && err == nil { err = err1 } return err } // Get returns an object from the storage by address. func (t *FSTree) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) { var ( startedAt = time.Now() success = false size = 0 ) defer func() { t.metrics.Get(time.Since(startedAt), size, success) }() ctx, span := tracing.StartSpanFromContext(ctx, "FSTree.Get", trace.WithAttributes( attribute.String("path", t.RootPath), attribute.Bool("raw", prm.Raw), attribute.String("address", prm.Address.EncodeToString()), )) defer span.End() p := t.treePath(prm.Address) if _, err := os.Stat(p); os.IsNotExist(err) { return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) } var data []byte var err error { _, span := tracing.StartSpanFromContext(ctx, "FSTree.Get.ReadFile") defer span.End() data, err = os.ReadFile(p) if err != nil { if os.IsNotExist(err) { return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) } return common.GetRes{}, err } } data, err = t.Decompress(data) if err != nil { return common.GetRes{}, err } size = len(data) obj := objectSDK.New() if err := obj.Unmarshal(data); err != nil { return common.GetRes{}, err } success = true return common.GetRes{Object: obj, RawData: data}, nil } // GetRange implements common.Storage. func (t *FSTree) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) { var ( startedAt = time.Now() success = false size = 0 ) defer func() { t.metrics.GetRange(time.Since(startedAt), size, success) }() ctx, span := tracing.StartSpanFromContext(ctx, "FSTree.GetRange", trace.WithAttributes( attribute.String("path", t.RootPath), attribute.String("address", prm.Address.EncodeToString()), attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)), attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)), )) defer span.End() res, err := t.Get(ctx, common.GetPrm{Address: prm.Address}) if err != nil { return common.GetRangeRes{}, err } payload := res.Object.Payload() from := prm.Range.GetOffset() to := from + prm.Range.GetLength() if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectOutOfRange)) } success = true data := payload[from:to] size = len(data) return common.GetRangeRes{ Data: data, }, nil } // initFileCounter walks the file tree rooted at FSTree's root, // counts total items count, inits counter and returns number of stored objects. func (t *FSTree) initFileCounter() error { if !t.fileCounterEnabled { return nil } counter, err := t.countFiles() if err != nil { return err } t.fileCounter.Set(counter) return nil } func (t *FSTree) countFiles() (uint64, error) { var counter uint64 // it is simpler to just consider every file // that is not directory as an object err := filepath.WalkDir(t.RootPath, func(_ string, d fs.DirEntry, _ error) error { if !d.IsDir() { counter++ } return nil }, ) if err != nil { return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) } return counter, nil } // Type is fstree storage type used in logs and configuration. const Type = "fstree" // Type implements common.Storage. func (*FSTree) Type() string { return Type } // Path implements common.Storage. func (t *FSTree) Path() string { return t.RootPath } // SetCompressor implements common.Storage. func (t *FSTree) SetCompressor(cc *compression.Config) { t.Config = cc } func (t *FSTree) Compressor() *compression.Config { return t.Config } // SetReportErrorFunc implements common.Storage. func (t *FSTree) SetReportErrorFunc(_ func(string, error)) { // Do nothing, FSTree can encounter only one error which is returned. } func (t *FSTree) SetParentID(parentID string) { t.metrics.SetParentID(parentID) }