forked from TrueCloudLab/frostfs-node
[#574] policer: Check if the container was really removed
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
This commit is contained in:
parent
554ff2c06b
commit
4ea0df77d0
7 changed files with 16 additions and 13 deletions
|
@ -165,8 +165,7 @@ func newCachedContainerStorage(v container.Source, ttl time.Duration) ttlContain
|
|||
func (s ttlContainerStorage) handleRemoval(cnr cid.ID) {
|
||||
s.containerCache.set(cnr, nil, new(apistatus.ContainerNotFound))
|
||||
|
||||
// The removal causes the cache miss and thus deletion info (that contains
|
||||
// ownerID and epoch) for the container will be updated from sidechain.
|
||||
// The removal invalidates possibly stored error response.
|
||||
s.delInfoCache.remove(cnr)
|
||||
}
|
||||
|
||||
|
|
|
@ -28,7 +28,6 @@ import (
|
|||
containerMorph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
apiClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
|
@ -133,7 +132,6 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
|||
if err == nil {
|
||||
cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
|
||||
cachedContainerStorage.containerCache.set(ev.ID, cnr, nil)
|
||||
cachedContainerStorage.delInfoCache.set(ev.ID, nil, new(apistatus.ContainerNotFound))
|
||||
} else {
|
||||
// unlike removal, we expect successful receive of the container
|
||||
// after successful creation, so logging can be useful
|
||||
|
|
|
@ -42,6 +42,7 @@ const (
|
|||
NotificatorNotificatorStartProcessingObjectNotifications = "notificator: start processing object notifications"
|
||||
NotificatorNotificatorProcessingObjectNotification = "notificator: processing object notification"
|
||||
PolicerCouldNotGetContainer = "could not get container"
|
||||
PolicerCouldNotConfirmContainerRemoval = "could not confirm container removal"
|
||||
PolicerCouldNotInhumeObjectWithMissingContainer = "could not inhume object with missing container"
|
||||
PolicerCouldNotBuildPlacementVectorForObject = "could not build placement vector for object"
|
||||
PolicerRedundantLocalObjectCopyDetected = "redundant local object copy detected"
|
||||
|
|
|
@ -7,9 +7,9 @@ import (
|
|||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
)
|
||||
|
||||
// EverExisted checks whether the container ever existed or
|
||||
// WasRemoved checks whether the container ever existed or
|
||||
// it just has not been created yet at the current epoch.
|
||||
func EverExisted(s Source, cid cid.ID) (bool, error) {
|
||||
func WasRemoved(s Source, cid cid.ID) (bool, error) {
|
||||
_, err := s.DeletionInfo(cid)
|
||||
if err == nil {
|
||||
return true, nil
|
||||
|
|
|
@ -20,10 +20,6 @@ type deletionInfo interface {
|
|||
DeletionInfo(cid []byte) (*containercore.DelInfo, error)
|
||||
}
|
||||
|
||||
func AsContainerSpecInfoProvider(w *Client) containercore.Source {
|
||||
return (*containerSource)(w)
|
||||
}
|
||||
|
||||
func DeletionInfo(c deletionInfo, cnr cid.ID) (*containercore.DelInfo, error) {
|
||||
binCnr := make([]byte, sha256.Size)
|
||||
cnr.Encode(binCnr)
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
|
@ -27,6 +28,13 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
|
|||
zap.String("error", err.Error()),
|
||||
)
|
||||
if client.IsErrContainerNotFound(err) {
|
||||
existed, err := containercore.WasRemoved(p.cnrSrc, idCnr)
|
||||
if err != nil {
|
||||
p.log.Error(logs.PolicerCouldNotConfirmContainerRemoval,
|
||||
zap.Stringer("cid", idCnr),
|
||||
zap.Stringer("oid", idObj),
|
||||
zap.String("error", err.Error()))
|
||||
} else if existed {
|
||||
err := p.buryFn(ctx, addrWithType.Address)
|
||||
if err != nil {
|
||||
p.log.Error(logs.PolicerCouldNotInhumeObjectWithMissingContainer,
|
||||
|
@ -35,6 +43,7 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
|
|||
zap.String("error", err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
@ -454,7 +454,7 @@ func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID
|
|||
continue
|
||||
}
|
||||
|
||||
existed, err := containerCore.EverExisted(s.cnrSource, cnr)
|
||||
existed, err := containerCore.WasRemoved(s.cnrSource, cnr)
|
||||
if err != nil {
|
||||
s.log.Error(logs.TreeCouldNotCheckIfContainerExisted,
|
||||
zap.Stringer("cid", cnr),
|
||||
|
|
Loading…
Reference in a new issue