[#574] core: Extend Source interface with DeletionInfo method

* Introduce common method EverExisted
* Define DeletionInfo for struct that must implement Source
* Refactor tree srv

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
This commit is contained in:
Airat Arifullin 2023-08-24 15:27:24 +03:00 committed by Evgenii Stratonikov
parent 9072772a09
commit 554ff2c06b
8 changed files with 115 additions and 43 deletions

View file

@ -34,8 +34,13 @@ func TestBuryObjectWithoutContainer(t *testing.T) {
// Container source and bury function
buryCh := make(chan oid.Address)
containerSrc := func(id cid.ID) (*container.Container, error) {
return nil, new(apistatus.ContainerNotFound)
containerSrc := containerSrc{
get: func(id cid.ID) (*container.Container, error) {
return nil, new(apistatus.ContainerNotFound)
},
deletionInfo: func(id cid.ID) (*container.DelInfo, error) {
return &container.DelInfo{}, nil
},
}
buryFn := func(ctx context.Context, a oid.Address) error {
buryCh <- a
@ -49,7 +54,7 @@ func TestBuryObjectWithoutContainer(t *testing.T) {
// Policer instance
p := New(
WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}),
WithContainerSource(containerSrcFunc(containerSrc)),
WithContainerSource(containerSrc),
WithBuryFunc(buryFn),
WithPool(pool),
)
@ -194,12 +199,17 @@ func TestProcessObject(t *testing.T) {
cnr := &container.Container{}
cnr.Value.Init()
cnr.Value.SetPlacementPolicy(policy)
containerSrc := func(id cid.ID) (*container.Container, error) {
if id.Equals(addr.Container()) {
return cnr, nil
}
t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container())
return nil, new(apistatus.ContainerNotFound)
containerSrc := containerSrc{
get: func(id cid.ID) (*container.Container, error) {
if id.Equals(addr.Container()) {
return cnr, nil
}
t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container())
return nil, new(apistatus.ContainerNotFound)
},
deletionInfo: func(id cid.ID) (*container.DelInfo, error) {
return &container.DelInfo{}, nil
},
}
buryFn := func(ctx context.Context, a oid.Address) error {
t.Errorf("unexpected object buried: %v", a)
@ -211,7 +221,7 @@ func TestProcessObject(t *testing.T) {
var gotReplicateTo []int
p := New(
WithContainerSource(containerSrcFunc(containerSrc)),
WithContainerSource(containerSrc),
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
return bytes.Equal(k, nodes[0].PublicKey())
@ -251,9 +261,6 @@ func TestIteratorContract(t *testing.T) {
Type: objectSDK.TypeRegular,
}}
containerSrc := func(id cid.ID) (*container.Container, error) {
return nil, new(apistatus.ContainerNotFound)
}
buryFn := func(ctx context.Context, a oid.Address) error {
return nil
}
@ -273,9 +280,18 @@ func TestIteratorContract(t *testing.T) {
finishCh: make(chan struct{}),
}
containerSrc := containerSrc{
get: func(id cid.ID) (*container.Container, error) {
return nil, new(apistatus.ContainerNotFound)
},
deletionInfo: func(id cid.ID) (*container.DelInfo, error) {
return &container.DelInfo{}, nil
},
}
p := New(
WithKeySpaceIterator(it),
WithContainerSource(containerSrcFunc(containerSrc)),
WithContainerSource(containerSrc),
WithBuryFunc(buryFn),
WithPool(pool),
func(c *cfg) {
@ -353,10 +369,14 @@ func (it *sliceKeySpaceIterator) Rewind() {
it.cur = 0
}
// containerSrcFunc is a container.Source backed by a function.
type containerSrcFunc func(cid.ID) (*container.Container, error)
type containerSrc struct {
get func(id cid.ID) (*container.Container, error)
deletionInfo func(id cid.ID) (*container.DelInfo, error)
}
func (f containerSrcFunc) Get(id cid.ID) (*container.Container, error) { return f(id) }
func (f containerSrc) Get(id cid.ID) (*container.Container, error) { return f.get(id) }
func (f containerSrc) DeletionInfo(id cid.ID) (*container.DelInfo, error) { return f.deletionInfo(id) }
// placementBuilderFunc is a placement.Builder backed by a function
type placementBuilderFunc func(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)

View file

@ -12,13 +12,13 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
tracing_grpc "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/panjf2000/ants/v2"
@ -441,18 +441,6 @@ func (s *Service) syncContainers(ctx context.Context, cnrs []cid.ID) {
wg.Wait()
}
func (s *Service) containerEverExisted(cid cid.ID) (bool, error) {
_, err := s.cnrSource.DeletionInfo(cid)
if err == nil {
return true, nil
}
var errContainerNotFound *apistatus.ContainerNotFound
if errors.As(err, &errContainerNotFound) {
return false, nil
}
return false, err
}
func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) {
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.removeContainers")
defer span.End()
@ -466,7 +454,7 @@ func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID
continue
}
existed, err := s.containerEverExisted(cnr)
existed, err := containerCore.EverExisted(s.cnrSource, cnr)
if err != nil {
s.log.Error(logs.TreeCouldNotCheckIfContainerExisted,
zap.Stringer("cid", cnr),