node: Stop flushing big object when termination signal received #379

Merged
fyrchik merged 1 commit from acid-ant/frostfs-node:bugfix/364-fix-flush into master 2023-07-26 21:07:58 +00:00
11 changed files with 39 additions and 21 deletions

View file

@ -1,6 +1,7 @@
package blobovniczatree package blobovniczatree
import ( import (
"context"
"fmt" "fmt"
"path/filepath" "path/filepath"
@ -11,7 +12,7 @@ import (
) )
// Iterate iterates over all objects in b. // Iterate iterates over all objects in b.
func (b *Blobovniczas) Iterate(prm common.IteratePrm) (common.IterateRes, error) { func (b *Blobovniczas) Iterate(_ context.Context, prm common.IteratePrm) (common.IterateRes, error) {

Shouldn't it respect context too?

Shouldn't it respect context too?

I prefer to do this in separate PR - another bunch of files need to refactor. Created #394 for tracking.

I prefer to do this in separate PR - another bunch of files need to refactor. Created #394 for tracking.
return common.IterateRes{}, b.iterateBlobovniczas(prm.IgnoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error { return common.IterateRes{}, b.iterateBlobovniczas(prm.IgnoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error {
var subPrm blobovnicza.IteratePrm var subPrm blobovnicza.IteratePrm
subPrm.SetHandler(func(elem blobovnicza.IterationElement) error { subPrm.SetHandler(func(elem blobovnicza.IterationElement) error {

View file

@ -25,5 +25,5 @@ type Storage interface {
Exists(context.Context, ExistsPrm) (ExistsRes, error) Exists(context.Context, ExistsPrm) (ExistsRes, error)
Put(context.Context, PutPrm) (PutRes, error) Put(context.Context, PutPrm) (PutRes, error)
Delete(context.Context, DeletePrm) (DeleteRes, error) Delete(context.Context, DeletePrm) (DeleteRes, error)
Iterate(IteratePrm) (IterateRes, error) Iterate(context.Context, IteratePrm) (IterateRes, error)
} }

View file

@ -100,11 +100,11 @@ func addressFromString(s string) (oid.Address, error) {
} }
// Iterate iterates over all stored objects. // Iterate iterates over all stored objects.
func (t *FSTree) Iterate(prm common.IteratePrm) (common.IterateRes, error) { func (t *FSTree) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
return common.IterateRes{}, t.iterate(0, []string{t.RootPath}, prm) return common.IterateRes{}, t.iterate(ctx, 0, []string{t.RootPath}, prm)
} }
func (t *FSTree) iterate(depth uint64, curPath []string, prm common.IteratePrm) error { func (t *FSTree) iterate(ctx context.Context, depth uint64, curPath []string, prm common.IteratePrm) error {
curName := strings.Join(curPath[1:], "") curName := strings.Join(curPath[1:], "")
des, err := os.ReadDir(filepath.Join(curPath...)) des, err := os.ReadDir(filepath.Join(curPath...))
if err != nil { if err != nil {
@ -119,10 +119,15 @@ func (t *FSTree) iterate(depth uint64, curPath []string, prm common.IteratePrm)
curPath = append(curPath, "") curPath = append(curPath, "")
for i := range des { for i := range des {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
curPath[l] = des[i].Name() curPath[l] = des[i].Name()
if !isLast && des[i].IsDir() { if !isLast && des[i].IsDir() {
err := t.iterate(depth+1, curPath, prm) err := t.iterate(ctx, depth+1, curPath, prm)
if err != nil { if err != nil {
// Must be error from handler in case errors are ignored. // Must be error from handler in case errors are ignored.
// Need to report. // Need to report.

View file

@ -49,7 +49,7 @@ func runTestNormalHandler(t *testing.T, s common.Storage, objects []objectDesc)
return nil return nil
} }
_, err := s.Iterate(iterPrm) _, err := s.Iterate(context.Background(), iterPrm)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, len(objects), len(seen)) require.Equal(t, len(objects), len(seen))
for i := range objects { for i := range objects {
@ -72,7 +72,7 @@ func runTestLazyHandler(t *testing.T, s common.Storage, objects []objectDesc) {
return nil return nil
} }
_, err := s.Iterate(iterPrm) _, err := s.Iterate(context.Background(), iterPrm)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, len(objects), len(seen)) require.Equal(t, len(objects), len(seen))
for i := range objects { for i := range objects {
@ -107,7 +107,7 @@ func runTestIgnoreLogicalErrors(t *testing.T, s common.Storage, objects []object
return nil return nil
} }
_, err := s.Iterate(iterPrm) _, err := s.Iterate(context.Background(), iterPrm)
require.Equal(t, err, logicErr) require.Equal(t, err, logicErr)
require.Equal(t, len(objects)/2, len(seen)) require.Equal(t, len(objects)/2, len(seen))
for i := range objects { for i := range objects {

View file

@ -1,6 +1,7 @@
package blobstor package blobstor
import ( import (
"context"
"fmt" "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -16,12 +17,12 @@ import (
// did not allow to completely iterate over the storage. // did not allow to completely iterate over the storage.
// //
// If handler returns an error, method wraps and returns it immediately. // If handler returns an error, method wraps and returns it immediately.
func (b *BlobStor) Iterate(prm common.IteratePrm) (common.IterateRes, error) { func (b *BlobStor) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
b.modeMtx.RLock() b.modeMtx.RLock()
defer b.modeMtx.RUnlock() defer b.modeMtx.RUnlock()
for i := range b.storage { for i := range b.storage {
_, err := b.storage[i].Storage.Iterate(prm) _, err := b.storage[i].Storage.Iterate(ctx, prm)
if err != nil && !prm.IgnoreErrors { if err != nil && !prm.IgnoreErrors {
return common.IterateRes{}, fmt.Errorf("blobstor iterator failure: %w", err) return common.IterateRes{}, fmt.Errorf("blobstor iterator failure: %w", err)
} }
@ -31,7 +32,7 @@ func (b *BlobStor) Iterate(prm common.IteratePrm) (common.IterateRes, error) {
// IterateBinaryObjects is a helper function which iterates over BlobStor and passes binary objects to f. // IterateBinaryObjects is a helper function which iterates over BlobStor and passes binary objects to f.
// Errors related to object reading and unmarshaling are logged and skipped. // Errors related to object reading and unmarshaling are logged and skipped.
func IterateBinaryObjects(blz *BlobStor, f func(addr oid.Address, data []byte, descriptor []byte) error) error { func IterateBinaryObjects(ctx context.Context, blz *BlobStor, f func(addr oid.Address, data []byte, descriptor []byte) error) error {
var prm common.IteratePrm var prm common.IteratePrm
prm.Handler = func(elem common.IterationElement) error { prm.Handler = func(elem common.IterationElement) error {
@ -45,7 +46,7 @@ func IterateBinaryObjects(blz *BlobStor, f func(addr oid.Address, data []byte, d
return nil return nil
} }
_, err := blz.Iterate(prm) _, err := blz.Iterate(ctx, prm)
return err return err
} }

View file

@ -68,7 +68,7 @@ func TestIterateObjects(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
err := IterateBinaryObjects(blobStor, func(addr oid.Address, data []byte, descriptor []byte) error { err := IterateBinaryObjects(context.Background(), blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
v, ok := mObjs[string(data)] v, ok := mObjs[string(data)]
require.True(t, ok) require.True(t, ok)

View file

@ -126,7 +126,7 @@ func (s *memstoreImpl) Delete(_ context.Context, req common.DeletePrm) (common.D
return common.DeleteRes{}, logicerr.Wrap(apistatus.ObjectNotFound{}) return common.DeleteRes{}, logicerr.Wrap(apistatus.ObjectNotFound{})
} }
func (s *memstoreImpl) Iterate(req common.IteratePrm) (common.IterateRes, error) { func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common.IterateRes, error) {
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() defer s.mu.RUnlock()
for k, v := range s.objs { for k, v := range s.objs {

View file

@ -207,7 +207,7 @@ func BenchmarkSubstorageIteratePerf(b *testing.B) {
// Benchmark iterate // Benchmark iterate
cnt := 0 cnt := 0
b.ResetTimer() b.ResetTimer()
_, err := st.Iterate(common.IteratePrm{ _, err := st.Iterate(context.Background(), common.IteratePrm{
Handler: func(elem common.IterationElement) error { Handler: func(elem common.IterationElement) error {
cnt++ cnt++
return nil return nil

View file

@ -202,14 +202,14 @@ func (s *TestStore) Delete(ctx context.Context, req common.DeletePrm) (common.De
} }
} }
func (s *TestStore) Iterate(req common.IteratePrm) (common.IterateRes, error) { func (s *TestStore) Iterate(ctx context.Context, req common.IteratePrm) (common.IterateRes, error) {
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() defer s.mu.RUnlock()
switch { switch {
case s.overrides.Iterate != nil: case s.overrides.Iterate != nil:
return s.overrides.Iterate(req) return s.overrides.Iterate(req)
case s.st != nil: case s.st != nil:
return s.st.Iterate(req) return s.st.Iterate(ctx, req)
default: default:
panic(fmt.Sprintf("unexpected storage call: Iterate(%+v)", req)) panic(fmt.Sprintf("unexpected storage call: Iterate(%+v)", req))
} }

View file

@ -170,7 +170,7 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
obj := objectSDK.New() obj := objectSDK.New()
err = blobstor.IterateBinaryObjects(s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error { err = blobstor.IterateBinaryObjects(ctx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
if err := obj.Unmarshal(data); err != nil { if err := obj.Unmarshal(data); err != nil {
s.log.Warn(logs.ShardCouldNotUnmarshalObject, s.log.Warn(logs.ShardCouldNotUnmarshalObject,
zap.Stringer("address", addr), zap.Stringer("address", addr),

View file

@ -35,6 +35,17 @@ const (
// runFlushLoop starts background workers which periodically flush objects to the blobstor. // runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop() { func (c *cache) runFlushLoop() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := c.closeCh
c.wg.Add(1)
go func() {
<-ch
cancel()
c.wg.Done()
}()
for i := 0; i < c.workersCount; i++ { for i := 0; i < c.workersCount; i++ {
c.wg.Add(1) c.wg.Add(1)
go c.workerFlushSmall() go c.workerFlushSmall()
@ -42,7 +53,7 @@ func (c *cache) runFlushLoop() {
c.wg.Add(1) c.wg.Add(1)
go func() { go func() {
c.workerFlushBig(context.TODO()) c.workerFlushBig(ctx)
c.wg.Done() c.wg.Done()
}() }()
@ -211,7 +222,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
return nil return nil
} }
_, err := c.fsTree.Iterate(prm) _, err := c.fsTree.Iterate(ctx, prm)
return err return err
} }