Use FSTree only for writecache #1367
|
@ -8,6 +8,7 @@ import (
|
|||
type DeletePrm struct {
|
||||
Address oid.Address
|
||||
StorageID []byte
|
||||
Size uint64
|
||||
}
|
||||
|
||||
// DeleteRes groups the resulting values of Delete operation.
|
||||
|
|
|
@ -338,7 +338,7 @@ func (t *FSTree) Delete(ctx context.Context, prm common.DeletePrm) (common.Delet
|
|||
}
|
||||
|
||||
p := t.treePath(prm.Address)
|
||||
err = t.writer.removeFile(p)
|
||||
err = t.writer.removeFile(p, prm.Size)
|
||||
return common.DeleteRes{}, err
|
||||
}
|
||||
|
||||
|
|
|
@ -68,34 +68,70 @@ func TestObjectCounter(t *testing.T) {
|
|||
var delPrm common.DeletePrm
|
||||
delPrm.Address = addr
|
||||
|
||||
eg, egCtx := errgroup.WithContext(context.Background())
|
||||
t.Run("without size hint", func(t *testing.T) {
|
||||
eg, egCtx := errgroup.WithContext(context.Background())
|
||||
|
||||
eg.Go(func() error {
|
||||
for range 1_000 {
|
||||
_, err := fst.Put(egCtx, putPrm)
|
||||
if err != nil {
|
||||
return err
|
||||
eg.Go(func() error {
|
||||
for range 1_000 {
|
||||
_, err := fst.Put(egCtx, putPrm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return nil
|
||||
})
|
||||
|
||||
eg.Go(func() error {
|
||||
var le logicerr.Logical
|
||||
for range 1_000 {
|
||||
_, err := fst.Delete(egCtx, delPrm)
|
||||
if err != nil && !errors.As(err, &le) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
require.NoError(t, eg.Wait())
|
||||
|
||||
count, size = counter.CountSize()
|
||||
realCount, realSize, err := fst.countFiles()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, realCount, count, "real %d, actual %d", realCount, count)
|
||||
require.Equal(t, realSize, size, "real %d, actual %d", realSize, size)
|
||||
})
|
||||
|
||||
eg.Go(func() error {
|
||||
var le logicerr.Logical
|
||||
for range 1_000 {
|
||||
_, err := fst.Delete(egCtx, delPrm)
|
||||
if err != nil && !errors.As(err, &le) {
|
||||
return err
|
||||
t.Run("with size hint", func(t *testing.T) {
|
||||
delPrm.Size = uint64(len(putPrm.RawData))
|
||||
eg, egCtx := errgroup.WithContext(context.Background())
|
||||
|
||||
eg.Go(func() error {
|
||||
for range 1_000 {
|
||||
_, err := fst.Put(egCtx, putPrm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return nil
|
||||
})
|
||||
|
||||
eg.Go(func() error {
|
||||
var le logicerr.Logical
|
||||
for range 1_000 {
|
||||
_, err := fst.Delete(egCtx, delPrm)
|
||||
if err != nil && !errors.As(err, &le) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
require.NoError(t, eg.Wait())
|
||||
|
||||
count, size = counter.CountSize()
|
||||
realCount, realSize, err := fst.countFiles()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, realCount, count, "real %d, actual %d", realCount, count)
|
||||
require.Equal(t, realSize, size, "real %d, actual %d", realSize, size)
|
||||
})
|
||||
|
||||
require.NoError(t, eg.Wait())
|
||||
|
||||
count, size = counter.CountSize()
|
||||
realCount, realSize, err := fst.countFiles()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, realCount, count, "real %d, actual %d", realCount, count)
|
||||
require.Equal(t, realSize, size, "real %d, actual %d", realSize, size)
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@ import (
|
|||
|
||||
type writer interface {
|
||||
writeData(string, []byte) error
|
||||
removeFile(string) error
|
||||
removeFile(string, uint64) error
|
||||
}
|
||||
|
||||
type genericWriter struct {
|
||||
|
@ -107,10 +107,10 @@ func (w *genericWriter) writeFile(p string, data []byte) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (w *genericWriter) removeFile(p string) error {
|
||||
func (w *genericWriter) removeFile(p string, size uint64) error {
|
||||
var err error
|
||||
if w.fileCounterEnabled {
|
||||
err = w.removeWithCounter(p)
|
||||
err = w.removeWithCounter(p, size)
|
||||
} else {
|
||||
err = os.Remove(p)
|
||||
}
|
||||
|
@ -121,18 +121,21 @@ func (w *genericWriter) removeFile(p string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
|
||||
func (w *genericWriter) removeWithCounter(p string) error {
|
||||
func (w *genericWriter) removeWithCounter(p string, size uint64) error {
|
||||
w.fileGuard.Lock(p)
|
||||
defer w.fileGuard.Unlock(p)
|
||||
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Honestly, I do not like the idea of using a separate syscall just to be able to track size. Honestly, I do not like the idea of using a separate syscall just to be able to track size.
Precise estimation is obviously a new feature with non-zero cost, do we need it?
dstepanov-yadro
commented
`Bbolt + fstree` implementation estimates writecache size as `dbCount * small_object_size + fstreeCount * max_object_size`. Such estimate is not very accurate.
If the same estimate were applied for the current implementation, then the cache size limit would depend only on the objects count, and not on the actual size of the data.
fyrchik
commented
In the writecache flush thread you actually read the file before removing it, so you know the size exactly. In the writecache flush thread you actually read the file before removing it, so you know the size exactly.
Do we need to extend fstree?
dstepanov-yadro
commented
Without fstree extend (adding filelocker) it is required to move filelocker to writecache and to add Without fstree extend (adding filelocker) it is required to move filelocker to writecache and to add `fstree.Exists` check on `writecache.Put` to handle save the same object twice. `fstree.Exists` on `writecache.Put` adds `os.Stat` call on direct client request.
So I prefer to have extra syscall for background flushing but not for Put request handler.
Also now writecache passes size to `fstree.Delete` to not to call `os.Stat`
|
||||
stat, err := os.Stat(p)
|
||||
if err != nil {
|
||||
return err
|
||||
if size == 0 {
|
||||
stat, err := os.Stat(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
size = uint64(stat.Size())
|
||||
}
|
||||
|
||||
if err := os.Remove(p); err != nil {
|
||||
return err
|
||||
}
|
||||
w.fileCounter.Dec(uint64(stat.Size()))
|
||||
w.fileCounter.Dec(uint64(size))
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -91,25 +91,30 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
|
|||
return errClose
|
||||
}
|
||||
|
||||
func (w *linuxWriter) removeFile(p string) error {
|
||||
func (w *linuxWriter) removeFile(p string, size uint64) error {
|
||||
if w.fileCounterEnabled {
|
||||
w.fileGuard.Lock(p)
|
||||
defer w.fileGuard.Unlock(p)
|
||||
}
|
||||
var stat unix.Stat_t
|
||||
err := unix.Stat(p, &stat)
|
||||
if err != nil {
|
||||
if err == unix.ENOENT {
|
||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
|
||||
if size == 0 {
|
||||
var stat unix.Stat_t
|
||||
err := unix.Stat(p, &stat)
|
||||
if err != nil {
|
||||
if err == unix.ENOENT {
|
||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
return err
|
||||
}
|
||||
size = uint64(stat.Size)
|
||||
}
|
||||
return err
|
||||
}
|
||||
err = unix.Unlink(p)
|
||||
|
||||
err := unix.Unlink(p)
|
||||
if err != nil && err == unix.ENOENT {
|
||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
if err == nil {
|
||||
w.fileCounter.Dec(uint64(stat.Size))
|
||||
w.fileCounter.Dec(uint64(size))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -123,7 +123,7 @@ func (c *cache) flushIfAnObjectExistsWorker(ctx context.Context, objInfo objectI
|
|||
return
|
||||
}
|
||||
|
||||
c.deleteFromDisk(ctx, objInfo.addr)
|
||||
c.deleteFromDisk(ctx, objInfo.addr, uint64(len(res.RawData)))
|
||||
}
|
||||
|
||||
func (c *cache) reportFlushError(msg string, addr string, err error) {
|
||||
|
@ -157,7 +157,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
|||
return err
|
||||
}
|
||||
|
||||
c.deleteFromDisk(ctx, e.Address)
|
||||
c.deleteFromDisk(ctx, e.Address, uint64(len(e.ObjectData)))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -40,8 +40,8 @@ func (c *cache) openStore(mod mode.ComponentMode) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address) {
|
||||
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
|
||||
func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address, size uint64) {
|
||||
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr, Size: size})
|
||||
if err != nil && !client.IsErrObjectNotFound(err) {
|
||||
c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err))
|
||||
} else if err == nil {
|
||||
|
|
unrelated
fixed