Use FSTree only for writecache #1367
|
@ -8,6 +8,7 @@ import (
|
||||||
type DeletePrm struct {
|
type DeletePrm struct {
|
||||||
Address oid.Address
|
Address oid.Address
|
||||||
StorageID []byte
|
StorageID []byte
|
||||||
|
Size uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteRes groups the resulting values of Delete operation.
|
// DeleteRes groups the resulting values of Delete operation.
|
||||||
|
|
|
@ -338,7 +338,7 @@ func (t *FSTree) Delete(ctx context.Context, prm common.DeletePrm) (common.Delet
|
||||||
}
|
}
|
||||||
|
|
||||||
p := t.treePath(prm.Address)
|
p := t.treePath(prm.Address)
|
||||||
err = t.writer.removeFile(p)
|
err = t.writer.removeFile(p, prm.Size)
|
||||||
return common.DeleteRes{}, err
|
return common.DeleteRes{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -68,34 +68,70 @@ func TestObjectCounter(t *testing.T) {
|
||||||
var delPrm common.DeletePrm
|
var delPrm common.DeletePrm
|
||||||
delPrm.Address = addr
|
delPrm.Address = addr
|
||||||
|
|
||||||
eg, egCtx := errgroup.WithContext(context.Background())
|
t.Run("without size hint", func(t *testing.T) {
|
||||||
|
eg, egCtx := errgroup.WithContext(context.Background())
|
||||||
|
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
for range 1_000 {
|
for range 1_000 {
|
||||||
_, err := fst.Put(egCtx, putPrm)
|
_, err := fst.Put(egCtx, putPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
return nil
|
||||||
return nil
|
})
|
||||||
|
|
||||||
|
eg.Go(func() error {
|
||||||
|
var le logicerr.Logical
|
||||||
|
for range 1_000 {
|
||||||
|
_, err := fst.Delete(egCtx, delPrm)
|
||||||
|
if err != nil && !errors.As(err, &le) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, eg.Wait())
|
||||||
|
|
||||||
|
count, size = counter.CountSize()
|
||||||
|
realCount, realSize, err := fst.countFiles()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, realCount, count, "real %d, actual %d", realCount, count)
|
||||||
|
require.Equal(t, realSize, size, "real %d, actual %d", realSize, size)
|
||||||
})
|
})
|
||||||
|
|
||||||
eg.Go(func() error {
|
t.Run("with size hint", func(t *testing.T) {
|
||||||
var le logicerr.Logical
|
delPrm.Size = uint64(len(putPrm.RawData))
|
||||||
for range 1_000 {
|
eg, egCtx := errgroup.WithContext(context.Background())
|
||||||
_, err := fst.Delete(egCtx, delPrm)
|
|
||||||
if err != nil && !errors.As(err, &le) {
|
eg.Go(func() error {
|
||||||
return err
|
for range 1_000 {
|
||||||
|
_, err := fst.Put(egCtx, putPrm)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
return nil
|
||||||
return nil
|
})
|
||||||
|
|
||||||
|
eg.Go(func() error {
|
||||||
|
var le logicerr.Logical
|
||||||
|
for range 1_000 {
|
||||||
|
_, err := fst.Delete(egCtx, delPrm)
|
||||||
|
if err != nil && !errors.As(err, &le) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, eg.Wait())
|
||||||
|
|
||||||
|
count, size = counter.CountSize()
|
||||||
|
realCount, realSize, err := fst.countFiles()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, realCount, count, "real %d, actual %d", realCount, count)
|
||||||
|
require.Equal(t, realSize, size, "real %d, actual %d", realSize, size)
|
||||||
})
|
})
|
||||||
|
|
||||||
require.NoError(t, eg.Wait())
|
|
||||||
|
|
||||||
count, size = counter.CountSize()
|
|
||||||
realCount, realSize, err := fst.countFiles()
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, realCount, count, "real %d, actual %d", realCount, count)
|
|
||||||
require.Equal(t, realSize, size, "real %d, actual %d", realSize, size)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@ import (
|
||||||
|
|
||||||
type writer interface {
|
type writer interface {
|
||||||
writeData(string, []byte) error
|
writeData(string, []byte) error
|
||||||
removeFile(string) error
|
removeFile(string, uint64) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type genericWriter struct {
|
type genericWriter struct {
|
||||||
|
@ -107,10 +107,10 @@ func (w *genericWriter) writeFile(p string, data []byte) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *genericWriter) removeFile(p string) error {
|
func (w *genericWriter) removeFile(p string, size uint64) error {
|
||||||
var err error
|
var err error
|
||||||
if w.fileCounterEnabled {
|
if w.fileCounterEnabled {
|
||||||
err = w.removeWithCounter(p)
|
err = w.removeWithCounter(p, size)
|
||||||
} else {
|
} else {
|
||||||
err = os.Remove(p)
|
err = os.Remove(p)
|
||||||
}
|
}
|
||||||
|
@ -121,18 +121,21 @@ func (w *genericWriter) removeFile(p string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
|
|
||||||
func (w *genericWriter) removeWithCounter(p string) error {
|
func (w *genericWriter) removeWithCounter(p string, size uint64) error {
|
||||||
w.fileGuard.Lock(p)
|
w.fileGuard.Lock(p)
|
||||||
defer w.fileGuard.Unlock(p)
|
defer w.fileGuard.Unlock(p)
|
||||||
|
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Honestly, I do not like the idea of using a separate syscall just to be able to track size. Honestly, I do not like the idea of using a separate syscall just to be able to track size.
Precise estimation is obviously a new feature with non-zero cost, do we need it?
dstepanov-yadro
commented
`Bbolt + fstree` implementation estimates writecache size as `dbCount * small_object_size + fstreeCount * max_object_size`. Such estimate is not very accurate.
If the same estimate were applied for the current implementation, then the cache size limit would depend only on the objects count, and not on the actual size of the data.
fyrchik
commented
In the writecache flush thread you actually read the file before removing it, so you know the size exactly. In the writecache flush thread you actually read the file before removing it, so you know the size exactly.
Do we need to extend fstree?
dstepanov-yadro
commented
Without fstree extend (adding filelocker) it is required to move filelocker to writecache and to add Without fstree extend (adding filelocker) it is required to move filelocker to writecache and to add `fstree.Exists` check on `writecache.Put` to handle save the same object twice. `fstree.Exists` on `writecache.Put` adds `os.Stat` call on direct client request.
So I prefer to have extra syscall for background flushing but not for Put request handler.
Also now writecache passes size to `fstree.Delete` to not to call `os.Stat`
|
|||||||
stat, err := os.Stat(p)
|
if size == 0 {
|
||||||
if err != nil {
|
stat, err := os.Stat(p)
|
||||||
return err
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
size = uint64(stat.Size())
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := os.Remove(p); err != nil {
|
if err := os.Remove(p); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
w.fileCounter.Dec(uint64(stat.Size()))
|
w.fileCounter.Dec(uint64(size))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,25 +91,30 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
|
||||||
return errClose
|
return errClose
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *linuxWriter) removeFile(p string) error {
|
func (w *linuxWriter) removeFile(p string, size uint64) error {
|
||||||
if w.fileCounterEnabled {
|
if w.fileCounterEnabled {
|
||||||
w.fileGuard.Lock(p)
|
w.fileGuard.Lock(p)
|
||||||
defer w.fileGuard.Unlock(p)
|
defer w.fileGuard.Unlock(p)
|
||||||
}
|
|
||||||
var stat unix.Stat_t
|
if size == 0 {
|
||||||
err := unix.Stat(p, &stat)
|
var stat unix.Stat_t
|
||||||
if err != nil {
|
err := unix.Stat(p, &stat)
|
||||||
if err == unix.ENOENT {
|
if err != nil {
|
||||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
if err == unix.ENOENT {
|
||||||
|
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
size = uint64(stat.Size)
|
||||||
}
|
}
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
err = unix.Unlink(p)
|
|
||||||
|
err := unix.Unlink(p)
|
||||||
if err != nil && err == unix.ENOENT {
|
if err != nil && err == unix.ENOENT {
|
||||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
w.fileCounter.Dec(uint64(stat.Size))
|
w.fileCounter.Dec(uint64(size))
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -123,7 +123,7 @@ func (c *cache) flushIfAnObjectExistsWorker(ctx context.Context, objInfo objectI
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.deleteFromDisk(ctx, objInfo.addr)
|
c.deleteFromDisk(ctx, objInfo.addr, uint64(len(res.RawData)))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) reportFlushError(msg string, addr string, err error) {
|
func (c *cache) reportFlushError(msg string, addr string, err error) {
|
||||||
|
@ -157,7 +157,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
c.deleteFromDisk(ctx, e.Address)
|
c.deleteFromDisk(ctx, e.Address, uint64(len(e.ObjectData)))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,8 +40,8 @@ func (c *cache) openStore(mod mode.ComponentMode) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address) {
|
func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address, size uint64) {
|
||||||
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
|
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr, Size: size})
|
||||||
if err != nil && !client.IsErrObjectNotFound(err) {
|
if err != nil && !client.IsErrObjectNotFound(err) {
|
||||||
c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err))
|
c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err))
|
||||||
} else if err == nil {
|
} else if err == nil {
|
||||||
|
|
unrelated
fixed