[#574] Check if the container has been really removed from neo-go #624
8 changed files with 96 additions and 1 deletions
|
@ -31,6 +31,10 @@ func (c cnrSource) Get(id cid.ID) (*container.Container, error) {
|
||||||
return c.src.Get(id)
|
return c.src.Get(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c cnrSource) DeletionInfo(cid cid.ID) (*container.DelInfo, error) {
|
||||||
|
return c.cli.DeletionInfo(cid)
|
||||||
|
}
|
||||||
|
|
||||||
func (c cnrSource) List() ([]cid.ID, error) {
|
func (c cnrSource) List() ([]cid.ID, error) {
|
||||||
return c.cli.ContainersOf(nil)
|
return c.cli.ContainersOf(nil)
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,6 +70,7 @@ const (
|
||||||
TreeContainerTreesHaveBeenSynced = "container trees have been synced"
|
TreeContainerTreesHaveBeenSynced = "container trees have been synced"
|
||||||
TreeCouldNotQueryTreesForSynchronization = "could not query trees for synchronization"
|
TreeCouldNotQueryTreesForSynchronization = "could not query trees for synchronization"
|
||||||
TreeRemovingRedundantTrees = "removing redundant trees..."
|
TreeRemovingRedundantTrees = "removing redundant trees..."
|
||||||
|
TreeCouldNotCheckIfContainerExisted = "could not check if the container ever existed"
|
||||||
TreeCouldNotRemoveRedundantTree = "could not remove redundant tree"
|
TreeCouldNotRemoveRedundantTree = "could not remove redundant tree"
|
||||||
TreeCouldNotCalculateContainerNodes = "could not calculate container nodes"
|
TreeCouldNotCalculateContainerNodes = "could not calculate container nodes"
|
||||||
TreeFailedToApplyReplicatedOperation = "failed to apply replicated operation"
|
TreeFailedToApplyReplicatedOperation = "failed to apply replicated operation"
|
||||||
|
|
|
@ -20,6 +20,15 @@ type Container struct {
|
||||||
Session *session.Container
|
Session *session.Container
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DelInfo contains info about removed container.
|
||||||
|
type DelInfo struct {
|
||||||
|
// Container owner.
|
||||||
|
Owner []byte
|
||||||
|
|
||||||
|
// Epoch indicates when the container was removed.
|
||||||
|
Epoch int
|
||||||
|
}
|
||||||
|
|
||||||
// Source is an interface that wraps
|
// Source is an interface that wraps
|
||||||
// basic container receiving method.
|
// basic container receiving method.
|
||||||
type Source interface {
|
type Source interface {
|
||||||
|
|
|
@ -29,6 +29,7 @@ const (
|
||||||
containersOfMethod = "containersOf"
|
containersOfMethod = "containersOf"
|
||||||
eaclMethod = "eACL"
|
eaclMethod = "eACL"
|
||||||
setEACLMethod = "setEACL"
|
setEACLMethod = "setEACL"
|
||||||
|
deletionInfoMethod = "deletionInfo"
|
||||||
|
|
||||||
startEstimationMethod = "startContainerEstimation"
|
startEstimationMethod = "startContainerEstimation"
|
||||||
stopEstimationMethod = "stopContainerEstimation"
|
stopEstimationMethod = "stopContainerEstimation"
|
||||||
|
|
52
pkg/morph/client/container/deletion_info.go
Normal file
52
pkg/morph/client/container/deletion_info.go
Normal file
|
@ -0,0 +1,52 @@
|
||||||
|
package container
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
containerContract "git.frostfs.info/TrueCloudLab/frostfs-contract/container"
|
||||||
|
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (c *Client) DeletionInfo(cid cid.ID) (*containercore.DelInfo, error) {
|
||||||
|
prm := client.TestInvokePrm{}
|
||||||
|
prm.SetMethod(deletionInfoMethod)
|
||||||
|
prm.SetArgs(cid)
|
||||||
|
|
||||||
|
res, err := c.client.TestInvoke(prm)
|
||||||
|
if err != nil {
|
||||||
|
if strings.Contains(err.Error(), containerContract.NotFoundError) {
|
||||||
|
return nil, new(apistatus.ContainerNotFound)
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("could not perform test invocation (%s): %w", deletionInfoMethod, err)
|
||||||
|
} else if ln := len(res); ln != 1 {
|
||||||
|
return nil, fmt.Errorf("unexpected stack item count (%s): %d", deletionInfoMethod, ln)
|
||||||
|
}
|
||||||
|
|
||||||
|
arr, err := client.ArrayFromStackItem(res[0])
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not get item array of container (%s): %w", deletionInfoMethod, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(arr) != 2 {
|
||||||
|
return nil, fmt.Errorf("unexpected container stack item count (%s): %d", deletionInfoMethod, len(arr))
|
||||||
|
}
|
||||||
|
|
||||||
|
owner, err := client.BytesFromStackItem(arr[0])
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not get byte array of container (%s): %w", deletionInfoMethod, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
epoch, err := client.IntFromStackItem(arr[1])
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not get byte array of container signature (%s): %w", deletionInfoMethod, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &containercore.DelInfo{
|
||||||
|
Owner: owner,
|
||||||
|
Epoch: int(epoch),
|
||||||
|
}, nil
|
||||||
|
}
|
|
@ -14,6 +14,9 @@ import (
|
||||||
|
|
||||||
type ContainerSource interface {
|
type ContainerSource interface {
|
||||||
container.Source
|
container.Source
|
||||||
|
|
||||||
|
DeletionInfo(cid.ID) (*container.DelInfo, error)
|
||||||
|
|
||||||
// List must return list of all the containers in the FrostFS network
|
// List must return list of all the containers in the FrostFS network
|
||||||
// at the moment of a call and any error that does not allow fetching
|
// at the moment of a call and any error that does not allow fetching
|
||||||
// container information.
|
// container information.
|
||||||
|
|
|
@ -53,6 +53,10 @@ func (s dummyContainerSource) Get(id cid.ID) (*containercore.Container, error) {
|
||||||
return cnt, nil
|
return cnt, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s dummyContainerSource) DeletionInfo(id cid.ID) (*containercore.DelInfo, error) {
|
||||||
|
return &containercore.DelInfo{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
type dummyEACLSource map[string]*containercore.EACL
|
type dummyEACLSource map[string]*containercore.EACL
|
||||||
|
|
||||||
func (s dummyEACLSource) GetEACL(id cid.ID) (*containercore.EACL, error) {
|
func (s dummyEACLSource) GetEACL(id cid.ID) (*containercore.EACL, error) {
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
tracing_grpc "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
tracing_grpc "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
|
@ -440,6 +441,18 @@ func (s *Service) syncContainers(ctx context.Context, cnrs []cid.ID) {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Service) containerEverExisted(cid cid.ID) (bool, error) {
|
||||||
|
_, err := s.cnrSource.DeletionInfo(cid)
|
||||||
|
if err == nil {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
var errContainerNotFound *apistatus.ContainerNotFound
|
||||||
|
if errors.As(err, &errContainerNotFound) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) {
|
func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.removeContainers")
|
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.removeContainers")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
@ -452,7 +465,15 @@ func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID
|
||||||
if _, ok := newContainers[cnr]; ok {
|
if _, ok := newContainers[cnr]; ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
removed = append(removed, cnr)
|
|
||||||
|
existed, err := s.containerEverExisted(cnr)
|
||||||
|
if err != nil {
|
||||||
|
s.log.Error(logs.TreeCouldNotCheckIfContainerExisted,
|
||||||
|
zap.Stringer("cid", cnr),
|
||||||
|
zap.Error(err))
|
||||||
|
} else if existed {
|
||||||
|
removed = append(removed, cnr)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for i := range removed {
|
for i := range removed {
|
||||||
delete(s.cnrMap, removed[i])
|
delete(s.cnrMap, removed[i])
|
||||||
|
|
Loading…
Add table
Reference in a new issue