node: Stop flushing big object when termination signal received #379
11 changed files with 39 additions and 21 deletions
|
@ -1,6 +1,7 @@
|
|||
package blobovniczatree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
|
@ -11,7 +12,7 @@ import (
|
|||
)
|
||||
|
||||
// Iterate iterates over all objects in b.
|
||||
func (b *Blobovniczas) Iterate(prm common.IteratePrm) (common.IterateRes, error) {
|
||||
func (b *Blobovniczas) Iterate(_ context.Context, prm common.IteratePrm) (common.IterateRes, error) {
|
||||
|
||||
return common.IterateRes{}, b.iterateBlobovniczas(prm.IgnoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error {
|
||||
var subPrm blobovnicza.IteratePrm
|
||||
subPrm.SetHandler(func(elem blobovnicza.IterationElement) error {
|
||||
|
|
|
@ -25,5 +25,5 @@ type Storage interface {
|
|||
Exists(context.Context, ExistsPrm) (ExistsRes, error)
|
||||
Put(context.Context, PutPrm) (PutRes, error)
|
||||
Delete(context.Context, DeletePrm) (DeleteRes, error)
|
||||
Iterate(IteratePrm) (IterateRes, error)
|
||||
Iterate(context.Context, IteratePrm) (IterateRes, error)
|
||||
}
|
||||
|
|
|
@ -100,11 +100,11 @@ func addressFromString(s string) (oid.Address, error) {
|
|||
}
|
||||
|
||||
// Iterate iterates over all stored objects.
|
||||
func (t *FSTree) Iterate(prm common.IteratePrm) (common.IterateRes, error) {
|
||||
return common.IterateRes{}, t.iterate(0, []string{t.RootPath}, prm)
|
||||
func (t *FSTree) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
|
||||
return common.IterateRes{}, t.iterate(ctx, 0, []string{t.RootPath}, prm)
|
||||
}
|
||||
|
||||
func (t *FSTree) iterate(depth uint64, curPath []string, prm common.IteratePrm) error {
|
||||
func (t *FSTree) iterate(ctx context.Context, depth uint64, curPath []string, prm common.IteratePrm) error {
|
||||
curName := strings.Join(curPath[1:], "")
|
||||
des, err := os.ReadDir(filepath.Join(curPath...))
|
||||
if err != nil {
|
||||
|
@ -119,10 +119,15 @@ func (t *FSTree) iterate(depth uint64, curPath []string, prm common.IteratePrm)
|
|||
curPath = append(curPath, "")
|
||||
|
||||
for i := range des {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
curPath[l] = des[i].Name()
|
||||
|
||||
if !isLast && des[i].IsDir() {
|
||||
err := t.iterate(depth+1, curPath, prm)
|
||||
err := t.iterate(ctx, depth+1, curPath, prm)
|
||||
if err != nil {
|
||||
// Must be error from handler in case errors are ignored.
|
||||
// Need to report.
|
||||
|
|
|
@ -49,7 +49,7 @@ func runTestNormalHandler(t *testing.T, s common.Storage, objects []objectDesc)
|
|||
return nil
|
||||
}
|
||||
|
||||
_, err := s.Iterate(iterPrm)
|
||||
_, err := s.Iterate(context.Background(), iterPrm)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(objects), len(seen))
|
||||
for i := range objects {
|
||||
|
@ -72,7 +72,7 @@ func runTestLazyHandler(t *testing.T, s common.Storage, objects []objectDesc) {
|
|||
return nil
|
||||
}
|
||||
|
||||
_, err := s.Iterate(iterPrm)
|
||||
_, err := s.Iterate(context.Background(), iterPrm)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(objects), len(seen))
|
||||
for i := range objects {
|
||||
|
@ -107,7 +107,7 @@ func runTestIgnoreLogicalErrors(t *testing.T, s common.Storage, objects []object
|
|||
return nil
|
||||
}
|
||||
|
||||
_, err := s.Iterate(iterPrm)
|
||||
_, err := s.Iterate(context.Background(), iterPrm)
|
||||
require.Equal(t, err, logicErr)
|
||||
require.Equal(t, len(objects)/2, len(seen))
|
||||
for i := range objects {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package blobstor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
|
@ -16,12 +17,12 @@ import (
|
|||
// did not allow to completely iterate over the storage.
|
||||
//
|
||||
// If handler returns an error, method wraps and returns it immediately.
|
||||
func (b *BlobStor) Iterate(prm common.IteratePrm) (common.IterateRes, error) {
|
||||
func (b *BlobStor) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
|
||||
b.modeMtx.RLock()
|
||||
defer b.modeMtx.RUnlock()
|
||||
|
||||
for i := range b.storage {
|
||||
_, err := b.storage[i].Storage.Iterate(prm)
|
||||
_, err := b.storage[i].Storage.Iterate(ctx, prm)
|
||||
if err != nil && !prm.IgnoreErrors {
|
||||
return common.IterateRes{}, fmt.Errorf("blobstor iterator failure: %w", err)
|
||||
}
|
||||
|
@ -31,7 +32,7 @@ func (b *BlobStor) Iterate(prm common.IteratePrm) (common.IterateRes, error) {
|
|||
|
||||
// IterateBinaryObjects is a helper function which iterates over BlobStor and passes binary objects to f.
|
||||
// Errors related to object reading and unmarshaling are logged and skipped.
|
||||
func IterateBinaryObjects(blz *BlobStor, f func(addr oid.Address, data []byte, descriptor []byte) error) error {
|
||||
func IterateBinaryObjects(ctx context.Context, blz *BlobStor, f func(addr oid.Address, data []byte, descriptor []byte) error) error {
|
||||
var prm common.IteratePrm
|
||||
|
||||
prm.Handler = func(elem common.IterationElement) error {
|
||||
|
@ -45,7 +46,7 @@ func IterateBinaryObjects(blz *BlobStor, f func(addr oid.Address, data []byte, d
|
|||
return nil
|
||||
}
|
||||
|
||||
_, err := blz.Iterate(prm)
|
||||
_, err := blz.Iterate(ctx, prm)
|
||||
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ func TestIterateObjects(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
err := IterateBinaryObjects(blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
|
||||
err := IterateBinaryObjects(context.Background(), blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
|
||||
v, ok := mObjs[string(data)]
|
||||
require.True(t, ok)
|
||||
|
||||
|
|
|
@ -126,7 +126,7 @@ func (s *memstoreImpl) Delete(_ context.Context, req common.DeletePrm) (common.D
|
|||
return common.DeleteRes{}, logicerr.Wrap(apistatus.ObjectNotFound{})
|
||||
}
|
||||
|
||||
func (s *memstoreImpl) Iterate(req common.IteratePrm) (common.IterateRes, error) {
|
||||
func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common.IterateRes, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
for k, v := range s.objs {
|
||||
|
|
|
@ -207,7 +207,7 @@ func BenchmarkSubstorageIteratePerf(b *testing.B) {
|
|||
// Benchmark iterate
|
||||
cnt := 0
|
||||
b.ResetTimer()
|
||||
_, err := st.Iterate(common.IteratePrm{
|
||||
_, err := st.Iterate(context.Background(), common.IteratePrm{
|
||||
Handler: func(elem common.IterationElement) error {
|
||||
cnt++
|
||||
return nil
|
||||
|
|
|
@ -202,14 +202,14 @@ func (s *TestStore) Delete(ctx context.Context, req common.DeletePrm) (common.De
|
|||
}
|
||||
}
|
||||
|
||||
func (s *TestStore) Iterate(req common.IteratePrm) (common.IterateRes, error) {
|
||||
func (s *TestStore) Iterate(ctx context.Context, req common.IteratePrm) (common.IterateRes, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
switch {
|
||||
case s.overrides.Iterate != nil:
|
||||
return s.overrides.Iterate(req)
|
||||
case s.st != nil:
|
||||
return s.st.Iterate(req)
|
||||
return s.st.Iterate(ctx, req)
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected storage call: Iterate(%+v)", req))
|
||||
}
|
||||
|
|
|
@ -170,7 +170,7 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
|
|||
|
||||
obj := objectSDK.New()
|
||||
|
||||
err = blobstor.IterateBinaryObjects(s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
|
||||
err = blobstor.IterateBinaryObjects(ctx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
|
||||
if err := obj.Unmarshal(data); err != nil {
|
||||
s.log.Warn(logs.ShardCouldNotUnmarshalObject,
|
||||
zap.Stringer("address", addr),
|
||||
|
|
|
@ -35,6 +35,17 @@ const (
|
|||
|
||||
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
||||
func (c *cache) runFlushLoop() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
ch := c.closeCh
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
<-ch
|
||||
cancel()
|
||||
c.wg.Done()
|
||||
}()
|
||||
|
||||
for i := 0; i < c.workersCount; i++ {
|
||||
c.wg.Add(1)
|
||||
go c.workerFlushSmall()
|
||||
|
@ -42,7 +53,7 @@ func (c *cache) runFlushLoop() {
|
|||
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
c.workerFlushBig(context.TODO())
|
||||
c.workerFlushBig(ctx)
|
||||
c.wg.Done()
|
||||
}()
|
||||
|
||||
|
@ -211,7 +222,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
_, err := c.fsTree.Iterate(prm)
|
||||
_, err := c.fsTree.Iterate(ctx, prm)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue
Shouldn't it respect context too?
I prefer to do this in separate PR - another bunch of files need to refactor. Created #394 for tracking.