[#574] policer: Check the container has been really removed #640
10 changed files with 120 additions and 45 deletions
|
@ -142,7 +142,8 @@ func (c *lruNetCache) get(key uint64) (*netmapSDK.NetMap, error) {
|
|||
// wrapper over TTL cache of values read from the network
|
||||
// that implements container storage.
|
||||
type ttlContainerStorage struct {
|
||||
*ttlNetCache[cid.ID, *container.Container]
|
||||
containerCache *ttlNetCache[cid.ID, *container.Container]
|
||||
delInfoCache *ttlNetCache[cid.ID, *container.DelInfo]
|
||||
}
|
||||
|
||||
func newCachedContainerStorage(v container.Source, ttl time.Duration) ttlContainerStorage {
|
||||
|
@ -151,18 +152,31 @@ func newCachedContainerStorage(v container.Source, ttl time.Duration) ttlContain
|
|||
lruCnrCache := newNetworkTTLCache(containerCacheSize, ttl, func(id cid.ID) (*container.Container, error) {
|
||||
return v.Get(id)
|
||||
})
|
||||
lruDelInfoCache := newNetworkTTLCache(containerCacheSize, ttl, func(id cid.ID) (*container.DelInfo, error) {
|
||||
return v.DeletionInfo(id)
|
||||
})
|
||||
|
||||
return ttlContainerStorage{lruCnrCache}
|
||||
return ttlContainerStorage{
|
||||
containerCache: lruCnrCache,
|
||||
delInfoCache: lruDelInfoCache,
|
||||
}
|
||||
}
|
||||
|
||||
func (s ttlContainerStorage) handleRemoval(cnr cid.ID) {
|
||||
s.set(cnr, nil, new(apistatus.ContainerNotFound))
|
||||
s.containerCache.set(cnr, nil, new(apistatus.ContainerNotFound))
|
||||
|
||||
// The removal invalidates possibly stored error response.
|
||||
s.delInfoCache.remove(cnr)
|
||||
}
|
||||
|
||||
// Get returns container value from the cache. If value is missing in the cache
|
||||
// or expired, then it returns value from side chain and updates the cache.
|
||||
func (s ttlContainerStorage) Get(cnr cid.ID) (*container.Container, error) {
|
||||
return s.get(cnr)
|
||||
return s.containerCache.get(cnr)
|
||||
}
|
||||
|
||||
func (s ttlContainerStorage) DeletionInfo(cnr cid.ID) (*container.DelInfo, error) {
|
||||
return s.delInfoCache.get(cnr)
|
||||
}
|
||||
|
||||
type ttlEACLStorage struct {
|
||||
|
|
|
@ -113,7 +113,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
|||
c.cfgObject.eaclSource = eACLFetcher
|
||||
cnrRdr.eacl = eACLFetcher
|
||||
c.cfgObject.cnrSource = cnrSrc
|
||||
cnrRdr.get = cnrSrc
|
||||
cnrRdr.src = cnrSrc
|
||||
cnrRdr.lister = client
|
||||
} else {
|
||||
// use RPC node as source of Container contract items (with caching)
|
||||
|
@ -131,7 +131,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
|||
cnr, err := cnrSrc.Get(ev.ID)
|
||||
if err == nil {
|
||||
cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
|
||||
cachedContainerStorage.set(ev.ID, cnr, nil)
|
||||
cachedContainerStorage.containerCache.set(ev.ID, cnr, nil)
|
||||
} else {
|
||||
// unlike removal, we expect successful receive of the container
|
||||
// after successful creation, so logging can be useful
|
||||
|
@ -159,7 +159,6 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
|||
}
|
||||
|
||||
cachedContainerStorage.handleRemoval(ev.ID)
|
||||
|
||||
c.log.Debug(logs.FrostFSNodeContainerRemovalEventsReceipt,
|
||||
zap.Stringer("id", ev.ID),
|
||||
)
|
||||
|
@ -170,7 +169,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
|||
|
||||
cnrRdr.lister = cachedContainerLister
|
||||
cnrRdr.eacl = c.cfgObject.eaclSource
|
||||
cnrRdr.get = c.cfgObject.cnrSource
|
||||
cnrRdr.src = c.cfgObject.cnrSource
|
||||
|
||||
cnrWrt.cacheEnabled = true
|
||||
cnrWrt.eacls = cachedEACLStorage
|
||||
|
@ -641,7 +640,7 @@ func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.Si
|
|||
type morphContainerReader struct {
|
||||
eacl containerCore.EACLSource
|
||||
|
||||
get containerCore.Source
|
||||
src containerCore.Source
|
||||
|
||||
lister interface {
|
||||
List(*user.ID) ([]cid.ID, error)
|
||||
|
@ -649,7 +648,11 @@ type morphContainerReader struct {
|
|||
}
|
||||
|
||||
func (x *morphContainerReader) Get(id cid.ID) (*containerCore.Container, error) {
|
||||
return x.get.Get(id)
|
||||
return x.src.Get(id)
|
||||
}
|
||||
|
||||
func (x *morphContainerReader) DeletionInfo(id cid.ID) (*containerCore.DelInfo, error) {
|
||||
return x.src.DeletionInfo(id)
|
||||
}
|
||||
|
||||
func (x *morphContainerReader) GetEACL(id cid.ID) (*containerCore.EACL, error) {
|
||||
|
|
|
@ -32,7 +32,7 @@ func (c cnrSource) Get(id cid.ID) (*container.Container, error) {
|
|||
}
|
||||
|
||||
func (c cnrSource) DeletionInfo(cid cid.ID) (*container.DelInfo, error) {
|
||||
return c.cli.DeletionInfo(cid)
|
||||
return c.src.DeletionInfo(cid)
|
||||
}
|
||||
|
||||
func (c cnrSource) List() ([]cid.ID, error) {
|
||||
|
|
|
@ -42,6 +42,7 @@ const (
|
|||
NotificatorNotificatorStartProcessingObjectNotifications = "notificator: start processing object notifications"
|
||||
NotificatorNotificatorProcessingObjectNotification = "notificator: processing object notification"
|
||||
PolicerCouldNotGetContainer = "could not get container"
|
||||
PolicerCouldNotConfirmContainerRemoval = "could not confirm container removal"
|
||||
PolicerCouldNotInhumeObjectWithMissingContainer = "could not inhume object with missing container"
|
||||
PolicerCouldNotBuildPlacementVectorForObject = "could not build placement vector for object"
|
||||
PolicerRedundantLocalObjectCopyDetected = "redundant local object copy detected"
|
||||
|
|
|
@ -41,6 +41,8 @@ type Source interface {
|
|||
// Implementations must not retain the container pointer and modify
|
||||
// the container through it.
|
||||
Get(cid.ID) (*Container, error)
|
||||
|
||||
DeletionInfo(cid.ID) (*DelInfo, error)
|
||||
}
|
||||
|
||||
// EACL groups information about the FrostFS container's extended ACL stored in
|
||||
|
|
22
pkg/core/container/util.go
Normal file
22
pkg/core/container/util.go
Normal file
|
@ -0,0 +1,22 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
)
|
||||
|
||||
// WasRemoved checks whether the container ever existed or
|
||||
// it just has not been created yet at the current epoch.
|
||||
func WasRemoved(s Source, cid cid.ID) (bool, error) {
|
||||
_, err := s.DeletionInfo(cid)
|
||||
if err == nil {
|
||||
return true, nil
|
||||
}
|
||||
var errContainerNotFound *apistatus.ContainerNotFound
|
||||
if errors.As(err, &errContainerNotFound) {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
|
@ -11,7 +12,22 @@ import (
|
|||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
)
|
||||
|
||||
func (c *Client) DeletionInfo(cid cid.ID) (*containercore.DelInfo, error) {
|
||||
func (x *containerSource) DeletionInfo(cnr cid.ID) (*containercore.DelInfo, error) {
|
||||
return DeletionInfo((*Client)(x), cnr)
|
||||
}
|
||||
|
||||
type deletionInfo interface {
|
||||
DeletionInfo(cid []byte) (*containercore.DelInfo, error)
|
||||
}
|
||||
|
||||
func DeletionInfo(c deletionInfo, cnr cid.ID) (*containercore.DelInfo, error) {
|
||||
fyrchik marked this conversation as resolved
|
||||
binCnr := make([]byte, sha256.Size)
|
||||
cnr.Encode(binCnr)
|
||||
|
||||
return c.DeletionInfo(binCnr)
|
||||
}
|
||||
|
||||
func (c *Client) DeletionInfo(cid []byte) (*containercore.DelInfo, error) {
|
||||
prm := client.TestInvokePrm{}
|
||||
prm.SetMethod(deletionInfoMethod)
|
||||
prm.SetArgs(cid)
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
|
@ -27,12 +28,20 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
|
|||
zap.String("error", err.Error()),
|
||||
)
|
||||
if client.IsErrContainerNotFound(err) {
|
||||
err := p.buryFn(ctx, addrWithType.Address)
|
||||
existed, err := containercore.WasRemoved(p.cnrSrc, idCnr)
|
||||
if err != nil {
|
||||
p.log.Error(logs.PolicerCouldNotInhumeObjectWithMissingContainer,
|
||||
p.log.Error(logs.PolicerCouldNotConfirmContainerRemoval,
|
||||
zap.Stringer("cid", idCnr),
|
||||
zap.Stringer("oid", idObj),
|
||||
zap.String("error", err.Error()))
|
||||
} else if existed {
|
||||
err := p.buryFn(ctx, addrWithType.Address)
|
||||
if err != nil {
|
||||
p.log.Error(logs.PolicerCouldNotInhumeObjectWithMissingContainer,
|
||||
zap.Stringer("cid", idCnr),
|
||||
zap.Stringer("oid", idObj),
|
||||
zap.String("error", err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -34,8 +34,13 @@ func TestBuryObjectWithoutContainer(t *testing.T) {
|
|||
|
||||
// Container source and bury function
|
||||
buryCh := make(chan oid.Address)
|
||||
containerSrc := func(id cid.ID) (*container.Container, error) {
|
||||
return nil, new(apistatus.ContainerNotFound)
|
||||
containerSrc := containerSrc{
|
||||
get: func(id cid.ID) (*container.Container, error) {
|
||||
return nil, new(apistatus.ContainerNotFound)
|
||||
},
|
||||
deletionInfo: func(id cid.ID) (*container.DelInfo, error) {
|
||||
return &container.DelInfo{}, nil
|
||||
fyrchik
commented
Do we check the behaviour we introduce in this task, namely ContainerNotFound in both methods? Do we check the behaviour we introduce in this task, namely ContainerNotFound in both methods?
aarifullin
commented
We don't but actually no good idea comes to my mind how I would check this case This scenario when container is really removed can be tested by checking bury channel. In the case when container never existed in chain - nothing happens. Probably, to check the scenario I need to make some refactoring for We don't but actually no good idea comes to my mind how I would check this case
This scenario when container is _really_ removed can be tested by checking bury channel. In the case when container never existed in chain - nothing happens. Probably, to check the scenario I need to make some refactoring for `processObject` and put messages like (`object won't be deleted`) and check them from the unit-test side. WDYT?
fyrchik
commented
Just check that bury channel is empty (maybe wait for 2 seconds)? I mean if there is no definite info, we do not expect anything there. Just check that bury channel is empty (maybe wait for 2 seconds)? I mean if there is no definite info, we do not expect anything there.
|
||||
},
|
||||
}
|
||||
buryFn := func(ctx context.Context, a oid.Address) error {
|
||||
buryCh <- a
|
||||
|
@ -49,7 +54,7 @@ func TestBuryObjectWithoutContainer(t *testing.T) {
|
|||
// Policer instance
|
||||
p := New(
|
||||
WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}),
|
||||
WithContainerSource(containerSrcFunc(containerSrc)),
|
||||
WithContainerSource(containerSrc),
|
||||
WithBuryFunc(buryFn),
|
||||
WithPool(pool),
|
||||
)
|
||||
|
@ -194,12 +199,17 @@ func TestProcessObject(t *testing.T) {
|
|||
cnr := &container.Container{}
|
||||
cnr.Value.Init()
|
||||
cnr.Value.SetPlacementPolicy(policy)
|
||||
containerSrc := func(id cid.ID) (*container.Container, error) {
|
||||
if id.Equals(addr.Container()) {
|
||||
return cnr, nil
|
||||
}
|
||||
t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container())
|
||||
return nil, new(apistatus.ContainerNotFound)
|
||||
containerSrc := containerSrc{
|
||||
get: func(id cid.ID) (*container.Container, error) {
|
||||
if id.Equals(addr.Container()) {
|
||||
return cnr, nil
|
||||
}
|
||||
t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container())
|
||||
return nil, new(apistatus.ContainerNotFound)
|
||||
},
|
||||
deletionInfo: func(id cid.ID) (*container.DelInfo, error) {
|
||||
return &container.DelInfo{}, nil
|
||||
},
|
||||
}
|
||||
buryFn := func(ctx context.Context, a oid.Address) error {
|
||||
t.Errorf("unexpected object buried: %v", a)
|
||||
|
@ -211,7 +221,7 @@ func TestProcessObject(t *testing.T) {
|
|||
var gotReplicateTo []int
|
||||
|
||||
p := New(
|
||||
WithContainerSource(containerSrcFunc(containerSrc)),
|
||||
WithContainerSource(containerSrc),
|
||||
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||
return bytes.Equal(k, nodes[0].PublicKey())
|
||||
|
@ -251,9 +261,6 @@ func TestIteratorContract(t *testing.T) {
|
|||
Type: objectSDK.TypeRegular,
|
||||
}}
|
||||
|
||||
containerSrc := func(id cid.ID) (*container.Container, error) {
|
||||
return nil, new(apistatus.ContainerNotFound)
|
||||
}
|
||||
buryFn := func(ctx context.Context, a oid.Address) error {
|
||||
return nil
|
||||
}
|
||||
|
@ -273,9 +280,18 @@ func TestIteratorContract(t *testing.T) {
|
|||
finishCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
containerSrc := containerSrc{
|
||||
get: func(id cid.ID) (*container.Container, error) {
|
||||
return nil, new(apistatus.ContainerNotFound)
|
||||
},
|
||||
deletionInfo: func(id cid.ID) (*container.DelInfo, error) {
|
||||
return &container.DelInfo{}, nil
|
||||
},
|
||||
}
|
||||
|
||||
p := New(
|
||||
WithKeySpaceIterator(it),
|
||||
WithContainerSource(containerSrcFunc(containerSrc)),
|
||||
WithContainerSource(containerSrc),
|
||||
WithBuryFunc(buryFn),
|
||||
WithPool(pool),
|
||||
func(c *cfg) {
|
||||
|
@ -353,10 +369,14 @@ func (it *sliceKeySpaceIterator) Rewind() {
|
|||
it.cur = 0
|
||||
}
|
||||
|
||||
// containerSrcFunc is a container.Source backed by a function.
|
||||
type containerSrcFunc func(cid.ID) (*container.Container, error)
|
||||
type containerSrc struct {
|
||||
get func(id cid.ID) (*container.Container, error)
|
||||
deletionInfo func(id cid.ID) (*container.DelInfo, error)
|
||||
}
|
||||
|
||||
func (f containerSrcFunc) Get(id cid.ID) (*container.Container, error) { return f(id) }
|
||||
func (f containerSrc) Get(id cid.ID) (*container.Container, error) { return f.get(id) }
|
||||
|
||||
func (f containerSrc) DeletionInfo(id cid.ID) (*container.DelInfo, error) { return f.deletionInfo(id) }
|
||||
|
||||
// placementBuilderFunc is a placement.Builder backed by a function
|
||||
type placementBuilderFunc func(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
|
||||
|
|
|
@ -12,13 +12,13 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
tracing_grpc "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
|
@ -441,18 +441,6 @@ func (s *Service) syncContainers(ctx context.Context, cnrs []cid.ID) {
|
|||
wg.Wait()
|
||||
}
|
||||
|
||||
func (s *Service) containerEverExisted(cid cid.ID) (bool, error) {
|
||||
_, err := s.cnrSource.DeletionInfo(cid)
|
||||
if err == nil {
|
||||
return true, nil
|
||||
}
|
||||
var errContainerNotFound *apistatus.ContainerNotFound
|
||||
if errors.As(err, &errContainerNotFound) {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
|
||||
func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.removeContainers")
|
||||
defer span.End()
|
||||
|
@ -466,7 +454,7 @@ func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID
|
|||
continue
|
||||
}
|
||||
|
||||
existed, err := s.containerEverExisted(cnr)
|
||||
existed, err := containerCore.WasRemoved(s.cnrSource, cnr)
|
||||
if err != nil {
|
||||
s.log.Error(logs.TreeCouldNotCheckIfContainerExisted,
|
||||
zap.Stringer("cid", cnr),
|
||||
|
|
Loading…
Add table
Reference in a new issue
The title should be adjusted (do we already have
AsContainerSource
?Sorry, my fault, bad refactoring