[#574] policer: Check the container has been really removed #640

Merged
fyrchik merged 2 commits from aarifullin/frostfs-node:fix/574-cnt_ever_existed into master 2023-08-28 14:21:40 +00:00
10 changed files with 120 additions and 45 deletions

View file

@ -142,7 +142,8 @@ func (c *lruNetCache) get(key uint64) (*netmapSDK.NetMap, error) {
// wrapper over TTL cache of values read from the network
// that implements container storage.
type ttlContainerStorage struct {
*ttlNetCache[cid.ID, *container.Container]
containerCache *ttlNetCache[cid.ID, *container.Container]
delInfoCache *ttlNetCache[cid.ID, *container.DelInfo]
}
func newCachedContainerStorage(v container.Source, ttl time.Duration) ttlContainerStorage {
@ -151,18 +152,31 @@ func newCachedContainerStorage(v container.Source, ttl time.Duration) ttlContain
lruCnrCache := newNetworkTTLCache(containerCacheSize, ttl, func(id cid.ID) (*container.Container, error) {
return v.Get(id)
})
lruDelInfoCache := newNetworkTTLCache(containerCacheSize, ttl, func(id cid.ID) (*container.DelInfo, error) {
return v.DeletionInfo(id)
})
return ttlContainerStorage{lruCnrCache}
return ttlContainerStorage{
containerCache: lruCnrCache,
delInfoCache: lruDelInfoCache,
}
}
func (s ttlContainerStorage) handleRemoval(cnr cid.ID) {
s.set(cnr, nil, new(apistatus.ContainerNotFound))
s.containerCache.set(cnr, nil, new(apistatus.ContainerNotFound))
// The removal invalidates possibly stored error response.
s.delInfoCache.remove(cnr)
}
// Get returns container value from the cache. If value is missing in the cache
// or expired, then it returns value from side chain and updates the cache.
func (s ttlContainerStorage) Get(cnr cid.ID) (*container.Container, error) {
return s.get(cnr)
return s.containerCache.get(cnr)
}
// DeletionInfo returns the deletion info of the container cnr as provided
// by the deletion-info cache of the storage.
func (s ttlContainerStorage) DeletionInfo(cnr cid.ID) (*container.DelInfo, error) {
	delInfo, err := s.delInfoCache.get(cnr)
	return delInfo, err
}
type ttlEACLStorage struct {

View file

@ -113,7 +113,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
c.cfgObject.eaclSource = eACLFetcher
cnrRdr.eacl = eACLFetcher
c.cfgObject.cnrSource = cnrSrc
cnrRdr.get = cnrSrc
cnrRdr.src = cnrSrc
cnrRdr.lister = client
} else {
// use RPC node as source of Container contract items (with caching)
@ -131,7 +131,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
cnr, err := cnrSrc.Get(ev.ID)
if err == nil {
cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
cachedContainerStorage.set(ev.ID, cnr, nil)
cachedContainerStorage.containerCache.set(ev.ID, cnr, nil)
} else {
// unlike removal, we expect successful receive of the container
// after successful creation, so logging can be useful
@ -159,7 +159,6 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
}
cachedContainerStorage.handleRemoval(ev.ID)
c.log.Debug(logs.FrostFSNodeContainerRemovalEventsReceipt,
zap.Stringer("id", ev.ID),
)
@ -170,7 +169,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
cnrRdr.lister = cachedContainerLister
cnrRdr.eacl = c.cfgObject.eaclSource
cnrRdr.get = c.cfgObject.cnrSource
cnrRdr.src = c.cfgObject.cnrSource
cnrWrt.cacheEnabled = true
cnrWrt.eacls = cachedEACLStorage
@ -641,7 +640,7 @@ func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.Si
type morphContainerReader struct {
eacl containerCore.EACLSource
get containerCore.Source
src containerCore.Source
lister interface {
List(*user.ID) ([]cid.ID, error)
@ -649,7 +648,11 @@ type morphContainerReader struct {
}
func (x *morphContainerReader) Get(id cid.ID) (*containerCore.Container, error) {
return x.get.Get(id)
return x.src.Get(id)
}
// DeletionInfo forwards the deletion info request for the container id
// to the underlying container source.
func (x *morphContainerReader) DeletionInfo(id cid.ID) (*containerCore.DelInfo, error) {
	info, err := x.src.DeletionInfo(id)
	return info, err
}
func (x *morphContainerReader) GetEACL(id cid.ID) (*containerCore.EACL, error) {

View file

@ -32,7 +32,7 @@ func (c cnrSource) Get(id cid.ID) (*container.Container, error) {
}
func (c cnrSource) DeletionInfo(cid cid.ID) (*container.DelInfo, error) {
return c.cli.DeletionInfo(cid)
return c.src.DeletionInfo(cid)
}
func (c cnrSource) List() ([]cid.ID, error) {

View file

@ -42,6 +42,7 @@ const (
NotificatorNotificatorStartProcessingObjectNotifications = "notificator: start processing object notifications"
NotificatorNotificatorProcessingObjectNotification = "notificator: processing object notification"
PolicerCouldNotGetContainer = "could not get container"
PolicerCouldNotConfirmContainerRemoval = "could not confirm container removal"
PolicerCouldNotInhumeObjectWithMissingContainer = "could not inhume object with missing container"
PolicerCouldNotBuildPlacementVectorForObject = "could not build placement vector for object"
PolicerRedundantLocalObjectCopyDetected = "redundant local object copy detected"

View file

@ -41,6 +41,8 @@ type Source interface {
// Implementations must not retain the container pointer and modify
// the container through it.
Get(cid.ID) (*Container, error)
DeletionInfo(cid.ID) (*DelInfo, error)
}
// EACL groups information about the FrostFS container's extended ACL stored in

View file

@ -0,0 +1,22 @@
package container
import (
"errors"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)
// WasRemoved reports whether deletion info is available for the container cid.
//
// It queries s.DeletionInfo and interprets the outcome as follows:
//   - no error: the container has deletion info, the result is (true, nil);
//   - an error matching *apistatus.ContainerNotFound (via errors.As): no
//     deletion info is known for the container, the result is (false, nil);
//   - any other error: the check could not be completed, the result is
//     (false, err) with the error passed through unchanged.
func WasRemoved(s Source, cid cid.ID) (bool, error) {
	_, err := s.DeletionInfo(cid)

	var notFound *apistatus.ContainerNotFound
	switch {
	case err == nil:
		return true, nil
	case errors.As(err, &notFound):
		return false, nil
	default:
		return false, err
	}
}

View file

@ -1,6 +1,7 @@
package container
import (
"crypto/sha256"
"fmt"
"strings"
@ -11,7 +12,22 @@ import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)
func (c *Client) DeletionInfo(cid cid.ID) (*containercore.DelInfo, error) {
func (x *containerSource) DeletionInfo(cnr cid.ID) (*containercore.DelInfo, error) {
return DeletionInfo((*Client)(x), cnr)
}
// deletionInfo is the dependency of the package-level DeletionInfo helper:
// anything able to return deletion info for a binary-encoded container ID.
type deletionInfo interface {
DeletionInfo(cid []byte) (*containercore.DelInfo, error)
}
func DeletionInfo(c deletionInfo, cnr cid.ID) (*containercore.DelInfo, error) {
fyrchik marked this conversation as resolved
Review

The title should be adjusted (do we already have AsContainerSource?)

The title should be adjusted (do we already have `AsContainerSource`?)
Review

Sorry, my fault, bad refactoring

Sorry, my fault, bad refactoring
binCnr := make([]byte, sha256.Size)
cnr.Encode(binCnr)
return c.DeletionInfo(binCnr)
}
func (c *Client) DeletionInfo(cid []byte) (*containercore.DelInfo, error) {
prm := client.TestInvokePrm{}
prm.SetMethod(deletionInfoMethod)
prm.SetArgs(cid)

View file

@ -5,6 +5,7 @@ import (
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
@ -27,12 +28,20 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
zap.String("error", err.Error()),
)
if client.IsErrContainerNotFound(err) {
err := p.buryFn(ctx, addrWithType.Address)
existed, err := containercore.WasRemoved(p.cnrSrc, idCnr)
if err != nil {
p.log.Error(logs.PolicerCouldNotInhumeObjectWithMissingContainer,
p.log.Error(logs.PolicerCouldNotConfirmContainerRemoval,
zap.Stringer("cid", idCnr),
zap.Stringer("oid", idObj),
zap.String("error", err.Error()))
} else if existed {
err := p.buryFn(ctx, addrWithType.Address)
if err != nil {
p.log.Error(logs.PolicerCouldNotInhumeObjectWithMissingContainer,
zap.Stringer("cid", idCnr),
zap.Stringer("oid", idObj),
zap.String("error", err.Error()))
}
}
}

View file

@ -34,8 +34,13 @@ func TestBuryObjectWithoutContainer(t *testing.T) {
// Container source and bury function
buryCh := make(chan oid.Address)
containerSrc := func(id cid.ID) (*container.Container, error) {
return nil, new(apistatus.ContainerNotFound)
containerSrc := containerSrc{
get: func(id cid.ID) (*container.Container, error) {
return nil, new(apistatus.ContainerNotFound)
},
deletionInfo: func(id cid.ID) (*container.DelInfo, error) {
return &container.DelInfo{}, nil
Review

Do we check the behaviour we introduce in this task, namely ContainerNotFound in both methods?

Do we check the behaviour we introduce in this task, namely ContainerNotFound in both methods?
Review

We don't, but no good idea comes to mind for how to check this case.

The scenario where the container is really removed can be tested by checking the bury channel. In the case where the container never existed in the chain, nothing happens. To check that scenario I would probably need to refactor processObject so that it emits messages like (object won't be deleted) and check them from the unit-test side. WDYT?

We don't, but no good idea comes to mind for how to check this case. The scenario where the container is _really_ removed can be tested by checking the bury channel. In the case where the container never existed in the chain, nothing happens. To check that scenario I would probably need to refactor `processObject` so that it emits messages like (`object won't be deleted`) and check them from the unit-test side. WDYT?
Review

Just check that bury channel is empty (maybe wait for 2 seconds)? I mean if there is no definite info, we do not expect anything there.

Just check that bury channel is empty (maybe wait for 2 seconds)? I mean if there is no definite info, we do not expect anything there.
},
}
buryFn := func(ctx context.Context, a oid.Address) error {
buryCh <- a
@ -49,7 +54,7 @@ func TestBuryObjectWithoutContainer(t *testing.T) {
// Policer instance
p := New(
WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}),
WithContainerSource(containerSrcFunc(containerSrc)),
WithContainerSource(containerSrc),
WithBuryFunc(buryFn),
WithPool(pool),
)
@ -194,12 +199,17 @@ func TestProcessObject(t *testing.T) {
cnr := &container.Container{}
cnr.Value.Init()
cnr.Value.SetPlacementPolicy(policy)
containerSrc := func(id cid.ID) (*container.Container, error) {
if id.Equals(addr.Container()) {
return cnr, nil
}
t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container())
return nil, new(apistatus.ContainerNotFound)
containerSrc := containerSrc{
get: func(id cid.ID) (*container.Container, error) {
if id.Equals(addr.Container()) {
return cnr, nil
}
t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container())
return nil, new(apistatus.ContainerNotFound)
},
deletionInfo: func(id cid.ID) (*container.DelInfo, error) {
return &container.DelInfo{}, nil
},
}
buryFn := func(ctx context.Context, a oid.Address) error {
t.Errorf("unexpected object buried: %v", a)
@ -211,7 +221,7 @@ func TestProcessObject(t *testing.T) {
var gotReplicateTo []int
p := New(
WithContainerSource(containerSrcFunc(containerSrc)),
WithContainerSource(containerSrc),
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
return bytes.Equal(k, nodes[0].PublicKey())
@ -251,9 +261,6 @@ func TestIteratorContract(t *testing.T) {
Type: objectSDK.TypeRegular,
}}
containerSrc := func(id cid.ID) (*container.Container, error) {
return nil, new(apistatus.ContainerNotFound)
}
buryFn := func(ctx context.Context, a oid.Address) error {
return nil
}
@ -273,9 +280,18 @@ func TestIteratorContract(t *testing.T) {
finishCh: make(chan struct{}),
}
containerSrc := containerSrc{
get: func(id cid.ID) (*container.Container, error) {
return nil, new(apistatus.ContainerNotFound)
},
deletionInfo: func(id cid.ID) (*container.DelInfo, error) {
return &container.DelInfo{}, nil
},
}
p := New(
WithKeySpaceIterator(it),
WithContainerSource(containerSrcFunc(containerSrc)),
WithContainerSource(containerSrc),
WithBuryFunc(buryFn),
WithPool(pool),
func(c *cfg) {
@ -353,10 +369,14 @@ func (it *sliceKeySpaceIterator) Rewind() {
it.cur = 0
}
// containerSrcFunc is a container.Source backed by a function.
type containerSrcFunc func(cid.ID) (*container.Container, error)
// containerSrc is a test double whose Get and DeletionInfo methods
// delegate to the get and deletionInfo function fields respectively.
type containerSrc struct {
get func(id cid.ID) (*container.Container, error)
deletionInfo func(id cid.ID) (*container.DelInfo, error)
}
func (f containerSrcFunc) Get(id cid.ID) (*container.Container, error) { return f(id) }
func (f containerSrc) Get(id cid.ID) (*container.Container, error) { return f.get(id) }
func (f containerSrc) DeletionInfo(id cid.ID) (*container.DelInfo, error) { return f.deletionInfo(id) }
// placementBuilderFunc is a placement.Builder backed by a function
type placementBuilderFunc func(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)

View file

@ -12,13 +12,13 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
tracing_grpc "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/panjf2000/ants/v2"
@ -441,18 +441,6 @@ func (s *Service) syncContainers(ctx context.Context, cnrs []cid.ID) {
wg.Wait()
}
func (s *Service) containerEverExisted(cid cid.ID) (bool, error) {
_, err := s.cnrSource.DeletionInfo(cid)
if err == nil {
return true, nil
}
var errContainerNotFound *apistatus.ContainerNotFound
if errors.As(err, &errContainerNotFound) {
return false, nil
}
return false, err
}
func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) {
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.removeContainers")
defer span.End()
@ -466,7 +454,7 @@ func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID
continue
}
existed, err := s.containerEverExisted(cnr)
existed, err := containerCore.WasRemoved(s.cnrSource, cnr)
if err != nil {
s.log.Error(logs.TreeCouldNotCheckIfContainerExisted,
zap.Stringer("cid", cnr),