[#574] Check if the container has been really removed from neo-go #624
8 changed files with 96 additions and 1 deletions
|
@ -31,6 +31,10 @@ func (c cnrSource) Get(id cid.ID) (*container.Container, error) {
|
|||
return c.src.Get(id)
|
||||
}
|
||||
|
||||
func (c cnrSource) DeletionInfo(cid cid.ID) (*container.DelInfo, error) {
|
||||
return c.cli.DeletionInfo(cid)
|
||||
}
|
||||
|
||||
func (c cnrSource) List() ([]cid.ID, error) {
|
||||
return c.cli.ContainersOf(nil)
|
||||
}
|
||||
|
|
|
@ -70,6 +70,7 @@ const (
|
|||
TreeContainerTreesHaveBeenSynced = "container trees have been synced"
|
||||
TreeCouldNotQueryTreesForSynchronization = "could not query trees for synchronization"
|
||||
TreeRemovingRedundantTrees = "removing redundant trees..."
|
||||
TreeCouldNotCheckIfContainerExisted = "could not check if the container ever existed"
|
||||
TreeCouldNotRemoveRedundantTree = "could not remove redundant tree"
|
||||
TreeCouldNotCalculateContainerNodes = "could not calculate container nodes"
|
||||
TreeFailedToApplyReplicatedOperation = "failed to apply replicated operation"
|
||||
|
|
|
@ -20,6 +20,15 @@ type Container struct {
|
|||
Session *session.Container
|
||||
}
|
||||
|
||||
// DelInfo contains info about removed container.
|
||||
type DelInfo struct {
|
||||
// Container owner.
|
||||
Owner []byte
|
||||
|
||||
// Epoch indicates when the container was removed.
|
||||
Epoch int
|
||||
}
|
||||
|
||||
// Source is an interface that wraps
|
||||
// basic container receiving method.
|
||||
type Source interface {
|
||||
|
|
|
@ -29,6 +29,7 @@ const (
|
|||
containersOfMethod = "containersOf"
|
||||
eaclMethod = "eACL"
|
||||
setEACLMethod = "setEACL"
|
||||
deletionInfoMethod = "deletionInfo"
|
||||
|
||||
startEstimationMethod = "startContainerEstimation"
|
||||
stopEstimationMethod = "stopContainerEstimation"
|
||||
|
|
52
pkg/morph/client/container/deletion_info.go
Normal file
52
pkg/morph/client/container/deletion_info.go
Normal file
|
@ -0,0 +1,52 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
containerContract "git.frostfs.info/TrueCloudLab/frostfs-contract/container"
|
||||
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
)
|
||||
|
||||
func (c *Client) DeletionInfo(cid cid.ID) (*containercore.DelInfo, error) {
|
||||
prm := client.TestInvokePrm{}
|
||||
prm.SetMethod(deletionInfoMethod)
|
||||
prm.SetArgs(cid)
|
||||
|
||||
res, err := c.client.TestInvoke(prm)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), containerContract.NotFoundError) {
|
||||
return nil, new(apistatus.ContainerNotFound)
|
||||
}
|
||||
return nil, fmt.Errorf("could not perform test invocation (%s): %w", deletionInfoMethod, err)
|
||||
} else if ln := len(res); ln != 1 {
|
||||
return nil, fmt.Errorf("unexpected stack item count (%s): %d", deletionInfoMethod, ln)
|
||||
}
|
||||
|
||||
arr, err := client.ArrayFromStackItem(res[0])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get item array of container (%s): %w", deletionInfoMethod, err)
|
||||
}
|
||||
|
||||
if len(arr) != 2 {
|
||||
return nil, fmt.Errorf("unexpected container stack item count (%s): %d", deletionInfoMethod, len(arr))
|
||||
}
|
||||
|
||||
owner, err := client.BytesFromStackItem(arr[0])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get byte array of container (%s): %w", deletionInfoMethod, err)
|
||||
}
|
||||
|
||||
epoch, err := client.IntFromStackItem(arr[1])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get byte array of container signature (%s): %w", deletionInfoMethod, err)
|
||||
}
|
||||
|
||||
return &containercore.DelInfo{
|
||||
Owner: owner,
|
||||
Epoch: int(epoch),
|
||||
}, nil
|
||||
}
|
|
@ -14,6 +14,9 @@ import (
|
|||
|
||||
type ContainerSource interface {
|
||||
container.Source
|
||||
|
||||
DeletionInfo(cid.ID) (*container.DelInfo, error)
|
||||
|
||||
// List must return list of all the containers in the FrostFS network
|
||||
// at the moment of a call and any error that does not allow fetching
|
||||
// container information.
|
||||
|
|
|
@ -53,6 +53,10 @@ func (s dummyContainerSource) Get(id cid.ID) (*containercore.Container, error) {
|
|||
return cnt, nil
|
||||
}
|
||||
|
||||
func (s dummyContainerSource) DeletionInfo(id cid.ID) (*containercore.DelInfo, error) {
|
||||
return &containercore.DelInfo{}, nil
|
||||
}
|
||||
|
||||
type dummyEACLSource map[string]*containercore.EACL
|
||||
|
||||
func (s dummyEACLSource) GetEACL(id cid.ID) (*containercore.EACL, error) {
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
tracing_grpc "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
|
@ -440,6 +441,18 @@ func (s *Service) syncContainers(ctx context.Context, cnrs []cid.ID) {
|
|||
wg.Wait()
|
||||
}
|
||||
|
||||
func (s *Service) containerEverExisted(cid cid.ID) (bool, error) {
|
||||
_, err := s.cnrSource.DeletionInfo(cid)
|
||||
if err == nil {
|
||||
return true, nil
|
||||
}
|
||||
var errContainerNotFound *apistatus.ContainerNotFound
|
||||
if errors.As(err, &errContainerNotFound) {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
|
||||
func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.removeContainers")
|
||||
defer span.End()
|
||||
|
@ -452,8 +465,16 @@ func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID
|
|||
if _, ok := newContainers[cnr]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
existed, err := s.containerEverExisted(cnr)
|
||||
if err != nil {
|
||||
s.log.Error(logs.TreeCouldNotCheckIfContainerExisted,
|
||||
zap.Stringer("cid", cnr),
|
||||
zap.Error(err))
|
||||
} else if existed {
|
||||
removed = append(removed, cnr)
|
||||
}
|
||||
}
|
||||
for i := range removed {
|
||||
delete(s.cnrMap, removed[i])
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue