node: Stop flushing big object when termination signal received #379

Merged
fyrchik merged 1 commits from acid-ant/frostfs-node:bugfix/364-fix-flush into master 2023-07-26 21:07:58 +00:00
11 changed files with 39 additions and 21 deletions

View File

@ -1,6 +1,7 @@
package blobovniczatree
import (
"context"
"fmt"
"path/filepath"
@ -11,7 +12,7 @@ import (
)
// Iterate iterates over all objects in b.
func (b *Blobovniczas) Iterate(prm common.IteratePrm) (common.IterateRes, error) {
func (b *Blobovniczas) Iterate(_ context.Context, prm common.IteratePrm) (common.IterateRes, error) {
return common.IterateRes{}, b.iterateBlobovniczas(prm.IgnoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error {
var subPrm blobovnicza.IteratePrm
subPrm.SetHandler(func(elem blobovnicza.IterationElement) error {

View File

@ -25,5 +25,5 @@ type Storage interface {
Exists(context.Context, ExistsPrm) (ExistsRes, error)
Put(context.Context, PutPrm) (PutRes, error)
Delete(context.Context, DeletePrm) (DeleteRes, error)
Iterate(IteratePrm) (IterateRes, error)
Iterate(context.Context, IteratePrm) (IterateRes, error)
}

View File

@ -100,11 +100,11 @@ func addressFromString(s string) (oid.Address, error) {
}
// Iterate iterates over all stored objects.
func (t *FSTree) Iterate(prm common.IteratePrm) (common.IterateRes, error) {
return common.IterateRes{}, t.iterate(0, []string{t.RootPath}, prm)
func (t *FSTree) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
return common.IterateRes{}, t.iterate(ctx, 0, []string{t.RootPath}, prm)
}
func (t *FSTree) iterate(depth uint64, curPath []string, prm common.IteratePrm) error {
func (t *FSTree) iterate(ctx context.Context, depth uint64, curPath []string, prm common.IteratePrm) error {
curName := strings.Join(curPath[1:], "")
des, err := os.ReadDir(filepath.Join(curPath...))
if err != nil {
@ -119,10 +119,15 @@ func (t *FSTree) iterate(depth uint64, curPath []string, prm common.IteratePrm)
curPath = append(curPath, "")
for i := range des {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
curPath[l] = des[i].Name()
if !isLast && des[i].IsDir() {
err := t.iterate(depth+1, curPath, prm)
err := t.iterate(ctx, depth+1, curPath, prm)
if err != nil {
// Must be error from handler in case errors are ignored.
// Need to report.

View File

@ -49,7 +49,7 @@ func runTestNormalHandler(t *testing.T, s common.Storage, objects []objectDesc)
return nil
}
_, err := s.Iterate(iterPrm)
_, err := s.Iterate(context.Background(), iterPrm)
require.NoError(t, err)
require.Equal(t, len(objects), len(seen))
for i := range objects {
@ -72,7 +72,7 @@ func runTestLazyHandler(t *testing.T, s common.Storage, objects []objectDesc) {
return nil
}
_, err := s.Iterate(iterPrm)
_, err := s.Iterate(context.Background(), iterPrm)
require.NoError(t, err)
require.Equal(t, len(objects), len(seen))
for i := range objects {
@ -107,7 +107,7 @@ func runTestIgnoreLogicalErrors(t *testing.T, s common.Storage, objects []object
return nil
}
_, err := s.Iterate(iterPrm)
_, err := s.Iterate(context.Background(), iterPrm)
require.Equal(t, err, logicErr)
require.Equal(t, len(objects)/2, len(seen))
for i := range objects {

View File

@ -1,6 +1,7 @@
package blobstor
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -16,12 +17,12 @@ import (
// did not allow to completely iterate over the storage.
//
// If handler returns an error, method wraps and returns it immediately.
func (b *BlobStor) Iterate(prm common.IteratePrm) (common.IterateRes, error) {
func (b *BlobStor) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
b.modeMtx.RLock()
defer b.modeMtx.RUnlock()
for i := range b.storage {
_, err := b.storage[i].Storage.Iterate(prm)
_, err := b.storage[i].Storage.Iterate(ctx, prm)
if err != nil && !prm.IgnoreErrors {
return common.IterateRes{}, fmt.Errorf("blobstor iterator failure: %w", err)
}
@ -31,7 +32,7 @@ func (b *BlobStor) Iterate(prm common.IteratePrm) (common.IterateRes, error) {
// IterateBinaryObjects is a helper function which iterates over BlobStor and passes binary objects to f.
// Errors related to object reading and unmarshaling are logged and skipped.
func IterateBinaryObjects(blz *BlobStor, f func(addr oid.Address, data []byte, descriptor []byte) error) error {
func IterateBinaryObjects(ctx context.Context, blz *BlobStor, f func(addr oid.Address, data []byte, descriptor []byte) error) error {
var prm common.IteratePrm
prm.Handler = func(elem common.IterationElement) error {
@ -45,7 +46,7 @@ func IterateBinaryObjects(blz *BlobStor, f func(addr oid.Address, data []byte, d
return nil
}
_, err := blz.Iterate(prm)
_, err := blz.Iterate(ctx, prm)
return err
}

View File

@ -68,7 +68,7 @@ func TestIterateObjects(t *testing.T) {
require.NoError(t, err)
}
err := IterateBinaryObjects(blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
err := IterateBinaryObjects(context.Background(), blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
v, ok := mObjs[string(data)]
require.True(t, ok)

View File

@ -126,7 +126,7 @@ func (s *memstoreImpl) Delete(_ context.Context, req common.DeletePrm) (common.D
return common.DeleteRes{}, logicerr.Wrap(apistatus.ObjectNotFound{})
}
func (s *memstoreImpl) Iterate(req common.IteratePrm) (common.IterateRes, error) {
func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common.IterateRes, error) {
s.mu.RLock()
defer s.mu.RUnlock()
for k, v := range s.objs {

View File

@ -207,7 +207,7 @@ func BenchmarkSubstorageIteratePerf(b *testing.B) {
// Benchmark iterate
cnt := 0
b.ResetTimer()
_, err := st.Iterate(common.IteratePrm{
_, err := st.Iterate(context.Background(), common.IteratePrm{
Handler: func(elem common.IterationElement) error {
cnt++
return nil

View File

@ -202,14 +202,14 @@ func (s *TestStore) Delete(ctx context.Context, req common.DeletePrm) (common.De
}
}
func (s *TestStore) Iterate(req common.IteratePrm) (common.IterateRes, error) {
func (s *TestStore) Iterate(ctx context.Context, req common.IteratePrm) (common.IterateRes, error) {
s.mu.RLock()
defer s.mu.RUnlock()
switch {
case s.overrides.Iterate != nil:
return s.overrides.Iterate(req)
case s.st != nil:
return s.st.Iterate(req)
return s.st.Iterate(ctx, req)
default:
panic(fmt.Sprintf("unexpected storage call: Iterate(%+v)", req))
}

View File

@ -170,7 +170,7 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
obj := objectSDK.New()
err = blobstor.IterateBinaryObjects(s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
err = blobstor.IterateBinaryObjects(ctx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
if err := obj.Unmarshal(data); err != nil {
s.log.Warn(logs.ShardCouldNotUnmarshalObject,
zap.Stringer("address", addr),

View File

@ -35,6 +35,17 @@ const (
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := c.closeCh
c.wg.Add(1)
go func() {
<-ch
cancel()
c.wg.Done()
}()
for i := 0; i < c.workersCount; i++ {
c.wg.Add(1)
go c.workerFlushSmall()
@ -42,7 +53,7 @@ func (c *cache) runFlushLoop() {
c.wg.Add(1)
go func() {
c.workerFlushBig(context.TODO())
c.workerFlushBig(ctx)
c.wg.Done()
}()
@ -211,7 +222,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
return nil
}
_, err := c.fsTree.Iterate(prm)
_, err := c.fsTree.Iterate(ctx, prm)
return err
}