fstree: Use O_TMPFILE for temporary files #970
9 changed files with 253 additions and 90 deletions
2
go.mod
2
go.mod
|
@ -37,6 +37,7 @@ require (
|
|||
go.uber.org/zap v1.26.0
|
||||
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
|
||||
golang.org/x/sync v0.6.0
|
||||
golang.org/x/sys v0.16.0
|
||||
golang.org/x/term v0.16.0
|
||||
google.golang.org/grpc v1.61.0
|
||||
google.golang.org/protobuf v1.32.0
|
||||
|
@ -120,7 +121,6 @@ require (
|
|||
go.uber.org/multierr v1.11.0 // indirect
|
||||
golang.org/x/crypto v0.18.0 // indirect
|
||||
golang.org/x/net v0.20.0 // indirect
|
||||
golang.org/x/sys v0.16.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
|
||||
|
|
|
@ -16,6 +16,13 @@ func (t *FSTree) Init() error {
|
|||
if err := util.MkdirAllX(t.RootPath, t.Permissions); err != nil {
|
||||
return err
|
||||
}
|
||||
if !t.readOnly {
|
||||
f := newSpecificWriteData(t.fileCounter, t.RootPath, t.Permissions, t.noSync)
|
||||
if f != nil {
|
||||
t.writer = f
|
||||
}
|
||||
}
|
||||
|
||||
return t.initFileCounter()
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,11 @@ func (c *noopCounter) Set(uint64) {}
|
|||
func (c *noopCounter) Inc() {}
|
||||
func (c *noopCounter) Dec() {}
|
||||
|
||||
func counterEnabled(c FileCounter) bool {
|
||||
_, noop := c.(*noopCounter)
|
||||
return !noop
|
||||
}
|
||||
|
||||
type SimpleCounter struct {
|
||||
v atomic.Uint64
|
||||
}
|
||||
|
|
|
@ -10,7 +10,6 @@ import (
|
|||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
|
@ -54,11 +53,9 @@ type FSTree struct {
|
|||
readOnly bool
|
||||
metrics Metrics
|
||||
|
||||
fileGuard keyLock
|
||||
fileCounter FileCounter
|
||||
fileCounterEnabled bool
|
||||
fileCounter FileCounter
|
||||
|
||||
suffix atomic.Uint64
|
||||
writer writer
|
||||
}
|
||||
|
||||
// Info groups the information about file storage.
|
||||
|
@ -89,13 +86,13 @@ func New(opts ...Option) *FSTree {
|
|||
Depth: 4,
|
||||
DirNameLen: DirNameLen,
|
||||
metrics: &noopMetrics{},
|
||||
fileGuard: &noopKeyLock{},
|
||||
fileCounter: &noopCounter{},
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
}
|
||||
for i := range opts {
|
||||
opts[i](f)
|
||||
}
|
||||
f.writer = newGenericWriteData(f.fileCounter, f.Permissions, f.noSync)
|
||||
|
||||
return f
|
||||
}
|
||||
|
@ -266,21 +263,7 @@ func (t *FSTree) Delete(ctx context.Context, prm common.DeletePrm) (common.Delet
|
|||
}
|
||||
|
||||
p := t.treePath(prm.Address)
|
||||
|
||||
if t.fileCounterEnabled {
|
||||
t.fileGuard.Lock(p)
|
||||
err = os.Remove(p)
|
||||
t.fileGuard.Unlock(p)
|
||||
if err == nil {
|
||||
t.fileCounter.Dec()
|
||||
}
|
||||
} else {
|
||||
err = os.Remove(p)
|
||||
}
|
||||
|
||||
if err != nil && os.IsNotExist(err) {
|
||||
err = logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
err = t.writer.removeFile(p)
|
||||
return common.DeleteRes{}, err
|
||||
}
|
||||
|
||||
|
@ -357,67 +340,7 @@ func (t *FSTree) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, err
|
|||
}
|
||||
|
||||
size = len(prm.RawData)
|
||||
tmpPath := p + "#" + strconv.FormatUint(t.suffix.Add(1), 10)
|
||||
err = t.writeAndRename(tmpPath, p, prm.RawData)
|
||||
return common.PutRes{StorageID: []byte{}}, err
|
||||
}
|
||||
|
||||
// writeAndRename opens tmpPath exclusively, writes data to it and renames it to p.
|
||||
func (t *FSTree) writeAndRename(tmpPath, p string, data []byte) error {
|
||||
if t.fileCounterEnabled {
|
||||
t.fileGuard.Lock(p)
|
||||
defer t.fileGuard.Unlock(p)
|
||||
}
|
||||
|
||||
err := t.writeFile(tmpPath, data)
|
||||
if err != nil {
|
||||
var pe *fs.PathError
|
||||
if errors.As(err, &pe) {
|
||||
switch pe.Err {
|
||||
case syscall.ENOSPC:
|
||||
err = common.ErrNoSpace
|
||||
_ = os.RemoveAll(tmpPath)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
if t.fileCounterEnabled {
|
||||
t.fileCounter.Inc()
|
||||
var targetFileExists bool
|
||||
if _, e := os.Stat(p); e == nil {
|
||||
targetFileExists = true
|
||||
}
|
||||
err = os.Rename(tmpPath, p)
|
||||
if err == nil && targetFileExists {
|
||||
t.fileCounter.Dec()
|
||||
}
|
||||
} else {
|
||||
err = os.Rename(tmpPath, p)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (t *FSTree) writeFlags() int {
|
||||
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC | os.O_EXCL
|
||||
if t.noSync {
|
||||
return flags
|
||||
}
|
||||
return flags | os.O_SYNC
|
||||
}
|
||||
|
||||
// writeFile writes data to a file with path p.
|
||||
// The code is copied from `os.WriteFile` with minor corrections for flags.
|
||||
func (t *FSTree) writeFile(p string, data []byte) error {
|
||||
f, err := os.OpenFile(p, t.writeFlags(), t.Permissions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = f.Write(data)
|
||||
if err1 := f.Close(); err1 != nil && err == nil {
|
||||
err = err1
|
||||
}
|
||||
return err
|
||||
return common.PutRes{StorageID: []byte{}}, t.writer.writeData(p, prm.RawData)
|
||||
}
|
||||
|
||||
// Get returns an object from the storage by address.
|
||||
|
@ -518,7 +441,7 @@ func (t *FSTree) GetRange(ctx context.Context, prm common.GetRangePrm) (common.G
|
|||
// initFileCounter walks the file tree rooted at FSTree's root,
|
||||
// counts total items count, inits counter and returns number of stored objects.
|
||||
func (t *FSTree) initFileCounter() error {
|
||||
if !t.fileCounterEnabled {
|
||||
if !counterEnabled(t.fileCounter) {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
127
pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go
Normal file
127
pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go
Normal file
|
@ -0,0 +1,127 @@
|
|||
package fstree
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io/fs"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
)
|
||||
|
||||
type writer interface {
|
||||
writeData(string, []byte) error
|
||||
removeFile(string) error
|
||||
}
|
||||
|
||||
type genericWriter struct {
|
||||
perm fs.FileMode
|
||||
flags int
|
||||
|
||||
fileGuard keyLock
|
||||
fileCounter FileCounter
|
||||
fileCounterEnabled bool
|
||||
suffix atomic.Uint64
|
||||
}
|
||||
|
||||
func newGenericWriteData(c FileCounter, perm fs.FileMode, noSync bool) writer {
|
||||
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC | os.O_EXCL
|
||||
if !noSync {
|
||||
flags |= os.O_SYNC
|
||||
}
|
||||
|
||||
var fileGuard keyLock = &noopKeyLock{}
|
||||
fileCounterEnabled := counterEnabled(c)
|
||||
if fileCounterEnabled {
|
||||
fileGuard = utilSync.NewKeyLocker[string]()
|
||||
}
|
||||
|
||||
var w = &genericWriter{
|
||||
perm: perm,
|
||||
flags: flags,
|
||||
|
||||
fileCounterEnabled: fileCounterEnabled,
|
||||
fileGuard: fileGuard,
|
||||
fileCounter: c,
|
||||
}
|
||||
return w
|
||||
}
|
||||
|
||||
func (w *genericWriter) writeData(p string, data []byte) error {
|
||||
tmpPath := p + "#" + strconv.FormatUint(w.suffix.Add(1), 10)
|
||||
return w.writeAndRename(tmpPath, p, data)
|
||||
}
|
||||
|
||||
// writeAndRename opens tmpPath exclusively, writes data to it and renames it to p.
|
||||
func (w *genericWriter) writeAndRename(tmpPath, p string, data []byte) error {
|
||||
if w.fileCounterEnabled {
|
||||
w.fileGuard.Lock(p)
|
||||
defer w.fileGuard.Unlock(p)
|
||||
}
|
||||
|
||||
err := w.writeFile(tmpPath, data)
|
||||
if err != nil {
|
||||
var pe *fs.PathError
|
||||
if errors.As(err, &pe) {
|
||||
switch pe.Err {
|
||||
case syscall.ENOSPC:
|
||||
err = common.ErrNoSpace
|
||||
_ = os.RemoveAll(tmpPath)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
if w.fileCounterEnabled {
|
||||
w.fileCounter.Inc()
|
||||
var targetFileExists bool
|
||||
if _, e := os.Stat(p); e == nil {
|
||||
targetFileExists = true
|
||||
}
|
||||
err = os.Rename(tmpPath, p)
|
||||
if err == nil && targetFileExists {
|
||||
w.fileCounter.Dec()
|
||||
}
|
||||
} else {
|
||||
err = os.Rename(tmpPath, p)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// writeFile writes data to a file with path p.
|
||||
// The code is copied from `os.WriteFile` with minor corrections for flags.
|
||||
func (w *genericWriter) writeFile(p string, data []byte) error {
|
||||
f, err := os.OpenFile(p, w.flags, w.perm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = f.Write(data)
|
||||
if err1 := f.Close(); err1 != nil && err == nil {
|
||||
err = err1
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *genericWriter) removeFile(p string) error {
|
||||
var err error
|
||||
if w.fileCounterEnabled {
|
||||
w.fileGuard.Lock(p)
|
||||
err = os.Remove(p)
|
||||
w.fileGuard.Unlock(p)
|
||||
if err == nil {
|
||||
w.fileCounter.Dec()
|
||||
}
|
||||
} else {
|
||||
err = os.Remove(p)
|
||||
}
|
||||
|
||||
if err != nil && os.IsNotExist(err) {
|
||||
err = logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
return err
|
||||
}
|
|
@ -0,0 +1,89 @@
|
|||
//go:build linux && !fstree_generic
|
||||
|
||||
package fstree
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io/fs"
|
||||
"strconv"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
type linuxWriter struct {
|
||||
root string
|
||||
perm uint32
|
||||
flags int
|
||||
|
||||
counter FileCounter
|
||||
}
|
||||
|
||||
func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync bool) writer {
|
||||
flags := unix.O_WRONLY | unix.O_TMPFILE | unix.O_CLOEXEC
|
||||
if !noSync {
|
||||
flags |= unix.O_DSYNC
|
||||
}
|
||||
fd, err := unix.Open(root, flags, uint32(perm))
|
||||
if err != nil {
|
||||
// Which means that OS-specific writeData can't be created
|
||||
acid-ant marked this conversation as resolved
Outdated
|
||||
// and FSTree should use the generic one.
|
||||
return nil
|
||||
}
|
||||
_ = unix.Close(fd) // Don't care about error.
|
||||
w := &linuxWriter{
|
||||
root: root,
|
||||
perm: uint32(perm),
|
||||
flags: flags,
|
||||
counter: c,
|
||||
}
|
||||
return w
|
||||
}
|
||||
|
||||
func (w *linuxWriter) writeData(p string, data []byte) error {
|
||||
err := w.writeFile(p, data)
|
||||
if errors.Is(err, unix.ENOSPC) {
|
||||
return common.ErrNoSpace
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *linuxWriter) writeFile(p string, data []byte) error {
|
||||
fd, err := unix.Open(w.root, w.flags, w.perm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tmpPath := "/proc/self/fd/" + strconv.FormatUint(uint64(fd), 10)
|
||||
n, err := unix.Write(fd, data)
|
||||
if err == nil {
|
||||
if n == len(data) {
|
||||
err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW)
|
||||
if err == nil {
|
||||
w.counter.Inc()
|
||||
}
|
||||
if errors.Is(err, unix.EEXIST) {
|
||||
err = nil
|
||||
}
|
||||
} else {
|
||||
err = errors.New("incomplete write")
|
||||
}
|
||||
}
|
||||
errClose := unix.Close(fd)
|
||||
if err != nil {
|
||||
return err // Close() error is ignored, we have a better one.
|
||||
}
|
||||
return errClose
|
||||
}
|
||||
|
||||
func (w *linuxWriter) removeFile(p string) error {
|
||||
err := unix.Unlink(p)
|
||||
if err != nil && err == unix.ENOENT {
|
||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
if err == nil {
|
||||
w.counter.Dec()
|
||||
}
|
||||
return err
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
//go:build !linux || fstree_generic
|
||||
|
||||
package fstree
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
)
|
||||
|
||||
func newSpecificWriteData(_ FileCounter, _ string, _ fs.FileMode, _ bool) writer {
|
||||
return nil
|
||||
}
|
|
@ -4,7 +4,6 @@ import (
|
|||
"io/fs"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -48,9 +47,7 @@ func WithMetrics(m Metrics) Option {
|
|||
|
||||
func WithFileCounter(c FileCounter) Option {
|
||||
return func(f *FSTree) {
|
||||
f.fileCounterEnabled = true
|
||||
f.fileCounter = c
|
||||
f.fileGuard = utilSync.NewKeyLocker[string]()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package shard
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"math"
|
||||
"os"
|
||||
|
@ -11,7 +12,6 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
|
@ -145,8 +145,12 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
|
|||
require.NoError(t, sh.Close())
|
||||
|
||||
addr := object.AddressOf(obj)
|
||||
_, err = fsTree.Put(context.Background(), common.PutPrm{Address: addr, RawData: []byte("not an object")})
|
||||
require.NoError(t, err)
|
||||
// This is copied from `fstree.treePath()` to avoid exporting function just for tests.
|
||||
{
|
||||
saddr := addr.Object().EncodeToString() + "." + addr.Container().EncodeToString()
|
||||
p := fmt.Sprintf("%s/%s/%s", fsTree.RootPath, saddr[:2], saddr[2:])
|
||||
require.NoError(t, os.WriteFile(p, []byte("not an object"), fsTree.Permissions))
|
||||
}
|
||||
|
||||
sh = New(
|
||||
WithID(NewIDFromBytes([]byte{})),
|
||||
|
|
Loading…
Reference in a new issue
Looks like it needs to be continued.
Fixed