node: Stop flushing big object when termination signal received #379
11 changed files with 39 additions and 21 deletions
|
@ -1,6 +1,7 @@
|
||||||
package blobovniczatree
|
package blobovniczatree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
|
@ -11,7 +12,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Iterate iterates over all objects in b.
|
// Iterate iterates over all objects in b.
|
||||||
func (b *Blobovniczas) Iterate(prm common.IteratePrm) (common.IterateRes, error) {
|
func (b *Blobovniczas) Iterate(_ context.Context, prm common.IteratePrm) (common.IterateRes, error) {
|
||||||
return common.IterateRes{}, b.iterateBlobovniczas(prm.IgnoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error {
|
return common.IterateRes{}, b.iterateBlobovniczas(prm.IgnoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error {
|
||||||
var subPrm blobovnicza.IteratePrm
|
var subPrm blobovnicza.IteratePrm
|
||||||
subPrm.SetHandler(func(elem blobovnicza.IterationElement) error {
|
subPrm.SetHandler(func(elem blobovnicza.IterationElement) error {
|
||||||
|
|
|
@ -25,5 +25,5 @@ type Storage interface {
|
||||||
Exists(context.Context, ExistsPrm) (ExistsRes, error)
|
Exists(context.Context, ExistsPrm) (ExistsRes, error)
|
||||||
Put(context.Context, PutPrm) (PutRes, error)
|
Put(context.Context, PutPrm) (PutRes, error)
|
||||||
Delete(context.Context, DeletePrm) (DeleteRes, error)
|
Delete(context.Context, DeletePrm) (DeleteRes, error)
|
||||||
Iterate(IteratePrm) (IterateRes, error)
|
Iterate(context.Context, IteratePrm) (IterateRes, error)
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,11 +100,11 @@ func addressFromString(s string) (oid.Address, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterate iterates over all stored objects.
|
// Iterate iterates over all stored objects.
|
||||||
func (t *FSTree) Iterate(prm common.IteratePrm) (common.IterateRes, error) {
|
func (t *FSTree) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
|
||||||
return common.IterateRes{}, t.iterate(0, []string{t.RootPath}, prm)
|
return common.IterateRes{}, t.iterate(ctx, 0, []string{t.RootPath}, prm)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *FSTree) iterate(depth uint64, curPath []string, prm common.IteratePrm) error {
|
func (t *FSTree) iterate(ctx context.Context, depth uint64, curPath []string, prm common.IteratePrm) error {
|
||||||
curName := strings.Join(curPath[1:], "")
|
curName := strings.Join(curPath[1:], "")
|
||||||
des, err := os.ReadDir(filepath.Join(curPath...))
|
des, err := os.ReadDir(filepath.Join(curPath...))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -119,10 +119,15 @@ func (t *FSTree) iterate(depth uint64, curPath []string, prm common.IteratePrm)
|
||||||
curPath = append(curPath, "")
|
curPath = append(curPath, "")
|
||||||
|
|
||||||
for i := range des {
|
for i := range des {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
curPath[l] = des[i].Name()
|
curPath[l] = des[i].Name()
|
||||||
|
|
||||||
if !isLast && des[i].IsDir() {
|
if !isLast && des[i].IsDir() {
|
||||||
err := t.iterate(depth+1, curPath, prm)
|
err := t.iterate(ctx, depth+1, curPath, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Must be error from handler in case errors are ignored.
|
// Must be error from handler in case errors are ignored.
|
||||||
// Need to report.
|
// Need to report.
|
||||||
|
|
|
@ -49,7 +49,7 @@ func runTestNormalHandler(t *testing.T, s common.Storage, objects []objectDesc)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := s.Iterate(iterPrm)
|
_, err := s.Iterate(context.Background(), iterPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, len(objects), len(seen))
|
require.Equal(t, len(objects), len(seen))
|
||||||
for i := range objects {
|
for i := range objects {
|
||||||
|
@ -72,7 +72,7 @@ func runTestLazyHandler(t *testing.T, s common.Storage, objects []objectDesc) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := s.Iterate(iterPrm)
|
_, err := s.Iterate(context.Background(), iterPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, len(objects), len(seen))
|
require.Equal(t, len(objects), len(seen))
|
||||||
for i := range objects {
|
for i := range objects {
|
||||||
|
@ -107,7 +107,7 @@ func runTestIgnoreLogicalErrors(t *testing.T, s common.Storage, objects []object
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := s.Iterate(iterPrm)
|
_, err := s.Iterate(context.Background(), iterPrm)
|
||||||
require.Equal(t, err, logicErr)
|
require.Equal(t, err, logicErr)
|
||||||
require.Equal(t, len(objects)/2, len(seen))
|
require.Equal(t, len(objects)/2, len(seen))
|
||||||
for i := range objects {
|
for i := range objects {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package blobstor
|
package blobstor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
@ -16,12 +17,12 @@ import (
|
||||||
// did not allow to completely iterate over the storage.
|
// did not allow to completely iterate over the storage.
|
||||||
//
|
//
|
||||||
// If handler returns an error, method wraps and returns it immediately.
|
// If handler returns an error, method wraps and returns it immediately.
|
||||||
func (b *BlobStor) Iterate(prm common.IteratePrm) (common.IterateRes, error) {
|
func (b *BlobStor) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
|
||||||
b.modeMtx.RLock()
|
b.modeMtx.RLock()
|
||||||
defer b.modeMtx.RUnlock()
|
defer b.modeMtx.RUnlock()
|
||||||
|
|
||||||
for i := range b.storage {
|
for i := range b.storage {
|
||||||
_, err := b.storage[i].Storage.Iterate(prm)
|
_, err := b.storage[i].Storage.Iterate(ctx, prm)
|
||||||
if err != nil && !prm.IgnoreErrors {
|
if err != nil && !prm.IgnoreErrors {
|
||||||
return common.IterateRes{}, fmt.Errorf("blobstor iterator failure: %w", err)
|
return common.IterateRes{}, fmt.Errorf("blobstor iterator failure: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -31,7 +32,7 @@ func (b *BlobStor) Iterate(prm common.IteratePrm) (common.IterateRes, error) {
|
||||||
|
|
||||||
// IterateBinaryObjects is a helper function which iterates over BlobStor and passes binary objects to f.
|
// IterateBinaryObjects is a helper function which iterates over BlobStor and passes binary objects to f.
|
||||||
// Errors related to object reading and unmarshaling are logged and skipped.
|
// Errors related to object reading and unmarshaling are logged and skipped.
|
||||||
func IterateBinaryObjects(blz *BlobStor, f func(addr oid.Address, data []byte, descriptor []byte) error) error {
|
func IterateBinaryObjects(ctx context.Context, blz *BlobStor, f func(addr oid.Address, data []byte, descriptor []byte) error) error {
|
||||||
var prm common.IteratePrm
|
var prm common.IteratePrm
|
||||||
|
|
||||||
prm.Handler = func(elem common.IterationElement) error {
|
prm.Handler = func(elem common.IterationElement) error {
|
||||||
|
@ -45,7 +46,7 @@ func IterateBinaryObjects(blz *BlobStor, f func(addr oid.Address, data []byte, d
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := blz.Iterate(prm)
|
_, err := blz.Iterate(ctx, prm)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,7 +68,7 @@ func TestIterateObjects(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := IterateBinaryObjects(blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
|
err := IterateBinaryObjects(context.Background(), blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
|
||||||
v, ok := mObjs[string(data)]
|
v, ok := mObjs[string(data)]
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
|
|
||||||
|
|
|
@ -126,7 +126,7 @@ func (s *memstoreImpl) Delete(_ context.Context, req common.DeletePrm) (common.D
|
||||||
return common.DeleteRes{}, logicerr.Wrap(apistatus.ObjectNotFound{})
|
return common.DeleteRes{}, logicerr.Wrap(apistatus.ObjectNotFound{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *memstoreImpl) Iterate(req common.IteratePrm) (common.IterateRes, error) {
|
func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common.IterateRes, error) {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
defer s.mu.RUnlock()
|
defer s.mu.RUnlock()
|
||||||
for k, v := range s.objs {
|
for k, v := range s.objs {
|
||||||
|
|
|
@ -207,7 +207,7 @@ func BenchmarkSubstorageIteratePerf(b *testing.B) {
|
||||||
// Benchmark iterate
|
// Benchmark iterate
|
||||||
cnt := 0
|
cnt := 0
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
_, err := st.Iterate(common.IteratePrm{
|
_, err := st.Iterate(context.Background(), common.IteratePrm{
|
||||||
Handler: func(elem common.IterationElement) error {
|
Handler: func(elem common.IterationElement) error {
|
||||||
cnt++
|
cnt++
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -202,14 +202,14 @@ func (s *TestStore) Delete(ctx context.Context, req common.DeletePrm) (common.De
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *TestStore) Iterate(req common.IteratePrm) (common.IterateRes, error) {
|
func (s *TestStore) Iterate(ctx context.Context, req common.IteratePrm) (common.IterateRes, error) {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
defer s.mu.RUnlock()
|
defer s.mu.RUnlock()
|
||||||
switch {
|
switch {
|
||||||
case s.overrides.Iterate != nil:
|
case s.overrides.Iterate != nil:
|
||||||
return s.overrides.Iterate(req)
|
return s.overrides.Iterate(req)
|
||||||
case s.st != nil:
|
case s.st != nil:
|
||||||
return s.st.Iterate(req)
|
return s.st.Iterate(ctx, req)
|
||||||
default:
|
default:
|
||||||
panic(fmt.Sprintf("unexpected storage call: Iterate(%+v)", req))
|
panic(fmt.Sprintf("unexpected storage call: Iterate(%+v)", req))
|
||||||
}
|
}
|
||||||
|
|
|
@ -170,7 +170,7 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
|
||||||
|
|
||||||
obj := objectSDK.New()
|
obj := objectSDK.New()
|
||||||
|
|
||||||
err = blobstor.IterateBinaryObjects(s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
|
err = blobstor.IterateBinaryObjects(ctx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
|
||||||
if err := obj.Unmarshal(data); err != nil {
|
if err := obj.Unmarshal(data); err != nil {
|
||||||
s.log.Warn(logs.ShardCouldNotUnmarshalObject,
|
s.log.Warn(logs.ShardCouldNotUnmarshalObject,
|
||||||
zap.Stringer("address", addr),
|
zap.Stringer("address", addr),
|
||||||
|
|
|
@ -35,6 +35,17 @@ const (
|
||||||
|
|
||||||
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
||||||
func (c *cache) runFlushLoop() {
|
func (c *cache) runFlushLoop() {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
ch := c.closeCh
|
||||||
|
c.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
<-ch
|
||||||
|
cancel()
|
||||||
|
c.wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
for i := 0; i < c.workersCount; i++ {
|
for i := 0; i < c.workersCount; i++ {
|
||||||
c.wg.Add(1)
|
c.wg.Add(1)
|
||||||
go c.workerFlushSmall()
|
go c.workerFlushSmall()
|
||||||
|
@ -42,7 +53,7 @@ func (c *cache) runFlushLoop() {
|
||||||
|
|
||||||
c.wg.Add(1)
|
c.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
c.workerFlushBig(context.TODO())
|
c.workerFlushBig(ctx)
|
||||||
c.wg.Done()
|
c.wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -211,7 +222,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := c.fsTree.Iterate(prm)
|
_, err := c.fsTree.Iterate(ctx, prm)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue