forked from TrueCloudLab/frostfs-node
[#574] core: Extend Source interface with DeletionInfo method
* Introduce common method EverExisted * Define DeletionInfo for struct that must implement Source * Refactor tree srv Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
This commit is contained in:
parent
c9e3c9956e
commit
4bc7655f65
8 changed files with 115 additions and 43 deletions
|
@ -142,7 +142,8 @@ func (c *lruNetCache) get(key uint64) (*netmapSDK.NetMap, error) {
|
|||
// wrapper over TTL cache of values read from the network
|
||||
// that implements container storage.
|
||||
type ttlContainerStorage struct {
|
||||
*ttlNetCache[cid.ID, *container.Container]
|
||||
containerCache *ttlNetCache[cid.ID, *container.Container]
|
||||
delInfoCache *ttlNetCache[cid.ID, *container.DelInfo]
|
||||
}
|
||||
|
||||
func newCachedContainerStorage(v container.Source, ttl time.Duration) ttlContainerStorage {
|
||||
|
@ -151,18 +152,32 @@ func newCachedContainerStorage(v container.Source, ttl time.Duration) ttlContain
|
|||
lruCnrCache := newNetworkTTLCache(containerCacheSize, ttl, func(id cid.ID) (*container.Container, error) {
|
||||
return v.Get(id)
|
||||
})
|
||||
lruDelInfoCache := newNetworkTTLCache(containerCacheSize, ttl, func(id cid.ID) (*container.DelInfo, error) {
|
||||
return v.DeletionInfo(id)
|
||||
})
|
||||
|
||||
return ttlContainerStorage{lruCnrCache}
|
||||
return ttlContainerStorage{
|
||||
containerCache: lruCnrCache,
|
||||
delInfoCache: lruDelInfoCache,
|
||||
}
|
||||
}
|
||||
|
||||
func (s ttlContainerStorage) handleRemoval(cnr cid.ID) {
|
||||
s.set(cnr, nil, new(apistatus.ContainerNotFound))
|
||||
s.containerCache.set(cnr, nil, new(apistatus.ContainerNotFound))
|
||||
|
||||
// The removal causes the cache miss and thus deletion info (that contains
|
||||
// ownerID and epoch) for the container will be updated from sidechain.
|
||||
s.delInfoCache.remove(cnr)
|
||||
}
|
||||
|
||||
// Get returns container value from the cache. If value is missing in the cache
|
||||
// or expired, then it returns value from side chain and updates the cache.
|
||||
func (s ttlContainerStorage) Get(cnr cid.ID) (*container.Container, error) {
|
||||
return s.get(cnr)
|
||||
return s.containerCache.get(cnr)
|
||||
}
|
||||
|
||||
func (s ttlContainerStorage) DeletionInfo(cnr cid.ID) (*container.DelInfo, error) {
|
||||
return s.delInfoCache.get(cnr)
|
||||
}
|
||||
|
||||
type ttlEACLStorage struct {
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
containerMorph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
apiClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
|
@ -113,7 +114,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
|||
c.cfgObject.eaclSource = eACLFetcher
|
||||
cnrRdr.eacl = eACLFetcher
|
||||
c.cfgObject.cnrSource = cnrSrc
|
||||
cnrRdr.get = cnrSrc
|
||||
cnrRdr.src = cnrSrc
|
||||
cnrRdr.lister = client
|
||||
} else {
|
||||
// use RPC node as source of Container contract items (with caching)
|
||||
|
@ -131,7 +132,8 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
|||
cnr, err := cnrSrc.Get(ev.ID)
|
||||
if err == nil {
|
||||
cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
|
||||
cachedContainerStorage.set(ev.ID, cnr, nil)
|
||||
cachedContainerStorage.containerCache.set(ev.ID, cnr, nil)
|
||||
cachedContainerStorage.delInfoCache.set(ev.ID, nil, new(apistatus.ContainerNotFound))
|
||||
} else {
|
||||
// unlike removal, we expect successful receive of the container
|
||||
// after successful creation, so logging can be useful
|
||||
|
@ -159,7 +161,6 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
|||
}
|
||||
|
||||
cachedContainerStorage.handleRemoval(ev.ID)
|
||||
|
||||
c.log.Debug(logs.FrostFSNodeContainerRemovalEventsReceipt,
|
||||
zap.Stringer("id", ev.ID),
|
||||
)
|
||||
|
@ -170,7 +171,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
|||
|
||||
cnrRdr.lister = cachedContainerLister
|
||||
cnrRdr.eacl = c.cfgObject.eaclSource
|
||||
cnrRdr.get = c.cfgObject.cnrSource
|
||||
cnrRdr.src = c.cfgObject.cnrSource
|
||||
|
||||
cnrWrt.cacheEnabled = true
|
||||
cnrWrt.eacls = cachedEACLStorage
|
||||
|
@ -641,7 +642,7 @@ func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.Si
|
|||
type morphContainerReader struct {
|
||||
eacl containerCore.EACLSource
|
||||
|
||||
get containerCore.Source
|
||||
src containerCore.Source
|
||||
|
||||
lister interface {
|
||||
List(*user.ID) ([]cid.ID, error)
|
||||
|
@ -649,7 +650,11 @@ type morphContainerReader struct {
|
|||
}
|
||||
|
||||
func (x *morphContainerReader) Get(id cid.ID) (*containerCore.Container, error) {
|
||||
return x.get.Get(id)
|
||||
return x.src.Get(id)
|
||||
}
|
||||
|
||||
func (x *morphContainerReader) DeletionInfo(id cid.ID) (*containerCore.DelInfo, error) {
|
||||
return x.src.DeletionInfo(id)
|
||||
}
|
||||
|
||||
func (x *morphContainerReader) GetEACL(id cid.ID) (*containerCore.EACL, error) {
|
||||
|
|
|
@ -32,7 +32,7 @@ func (c cnrSource) Get(id cid.ID) (*container.Container, error) {
|
|||
}
|
||||
|
||||
func (c cnrSource) DeletionInfo(cid cid.ID) (*container.DelInfo, error) {
|
||||
return c.cli.DeletionInfo(cid)
|
||||
return c.src.DeletionInfo(cid)
|
||||
}
|
||||
|
||||
func (c cnrSource) List() ([]cid.ID, error) {
|
||||
|
|
|
@ -41,6 +41,8 @@ type Source interface {
|
|||
// Implementations must not retain the container pointer and modify
|
||||
// the container through it.
|
||||
Get(cid.ID) (*Container, error)
|
||||
|
||||
DeletionInfo(cid.ID) (*DelInfo, error)
|
||||
}
|
||||
|
||||
// EACL groups information about the FrostFS container's extended ACL stored in
|
||||
|
|
22
pkg/core/container/util.go
Normal file
22
pkg/core/container/util.go
Normal file
|
@ -0,0 +1,22 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
)
|
||||
|
||||
// EverExisted checks whether the container ever existed or
|
||||
// it just has not been created yet at the current epoch.
|
||||
func EverExisted(s Source, cid cid.ID) (bool, error) {
|
||||
_, err := s.DeletionInfo(cid)
|
||||
if err == nil {
|
||||
return true, nil
|
||||
}
|
||||
var errContainerNotFound *apistatus.ContainerNotFound
|
||||
if errors.As(err, &errContainerNotFound) {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
|
@ -11,7 +12,26 @@ import (
|
|||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
)
|
||||
|
||||
func (c *Client) DeletionInfo(cid cid.ID) (*containercore.DelInfo, error) {
|
||||
func (x *containerSource) DeletionInfo(cnr cid.ID) (*containercore.DelInfo, error) {
|
||||
return DeletionInfo((*Client)(x), cnr)
|
||||
}
|
||||
|
||||
type deletionInfo interface {
|
||||
DeletionInfo(cid []byte) (*containercore.DelInfo, error)
|
||||
}
|
||||
|
||||
func AsContainerSpecInfoProvider(w *Client) containercore.Source {
|
||||
return (*containerSource)(w)
|
||||
}
|
||||
|
||||
func DeletionInfo(c deletionInfo, cnr cid.ID) (*containercore.DelInfo, error) {
|
||||
binCnr := make([]byte, sha256.Size)
|
||||
cnr.Encode(binCnr)
|
||||
|
||||
return c.DeletionInfo(binCnr)
|
||||
}
|
||||
|
||||
func (c *Client) DeletionInfo(cid []byte) (*containercore.DelInfo, error) {
|
||||
prm := client.TestInvokePrm{}
|
||||
prm.SetMethod(deletionInfoMethod)
|
||||
prm.SetArgs(cid)
|
||||
|
|
|
@ -34,8 +34,13 @@ func TestBuryObjectWithoutContainer(t *testing.T) {
|
|||
|
||||
// Container source and bury function
|
||||
buryCh := make(chan oid.Address)
|
||||
containerSrc := func(id cid.ID) (*container.Container, error) {
|
||||
return nil, new(apistatus.ContainerNotFound)
|
||||
containerSrc := containerSrc{
|
||||
get: func(id cid.ID) (*container.Container, error) {
|
||||
return nil, new(apistatus.ContainerNotFound)
|
||||
},
|
||||
deletionInfo: func(id cid.ID) (*container.DelInfo, error) {
|
||||
return &container.DelInfo{}, nil
|
||||
},
|
||||
}
|
||||
buryFn := func(ctx context.Context, a oid.Address) error {
|
||||
buryCh <- a
|
||||
|
@ -49,7 +54,7 @@ func TestBuryObjectWithoutContainer(t *testing.T) {
|
|||
// Policer instance
|
||||
p := New(
|
||||
WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}),
|
||||
WithContainerSource(containerSrcFunc(containerSrc)),
|
||||
WithContainerSource(containerSrc),
|
||||
WithBuryFunc(buryFn),
|
||||
WithPool(pool),
|
||||
)
|
||||
|
@ -194,12 +199,17 @@ func TestProcessObject(t *testing.T) {
|
|||
cnr := &container.Container{}
|
||||
cnr.Value.Init()
|
||||
cnr.Value.SetPlacementPolicy(policy)
|
||||
containerSrc := func(id cid.ID) (*container.Container, error) {
|
||||
if id.Equals(addr.Container()) {
|
||||
return cnr, nil
|
||||
}
|
||||
t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container())
|
||||
return nil, new(apistatus.ContainerNotFound)
|
||||
containerSrc := containerSrc{
|
||||
get: func(id cid.ID) (*container.Container, error) {
|
||||
if id.Equals(addr.Container()) {
|
||||
return cnr, nil
|
||||
}
|
||||
t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container())
|
||||
return nil, new(apistatus.ContainerNotFound)
|
||||
},
|
||||
deletionInfo: func(id cid.ID) (*container.DelInfo, error) {
|
||||
return &container.DelInfo{}, nil
|
||||
},
|
||||
}
|
||||
buryFn := func(ctx context.Context, a oid.Address) error {
|
||||
t.Errorf("unexpected object buried: %v", a)
|
||||
|
@ -211,7 +221,7 @@ func TestProcessObject(t *testing.T) {
|
|||
var gotReplicateTo []int
|
||||
|
||||
p := New(
|
||||
WithContainerSource(containerSrcFunc(containerSrc)),
|
||||
WithContainerSource(containerSrc),
|
||||
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||
return bytes.Equal(k, nodes[0].PublicKey())
|
||||
|
@ -251,9 +261,6 @@ func TestIteratorContract(t *testing.T) {
|
|||
Type: objectSDK.TypeRegular,
|
||||
}}
|
||||
|
||||
containerSrc := func(id cid.ID) (*container.Container, error) {
|
||||
return nil, new(apistatus.ContainerNotFound)
|
||||
}
|
||||
buryFn := func(ctx context.Context, a oid.Address) error {
|
||||
return nil
|
||||
}
|
||||
|
@ -273,9 +280,18 @@ func TestIteratorContract(t *testing.T) {
|
|||
finishCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
containerSrc := containerSrc{
|
||||
get: func(id cid.ID) (*container.Container, error) {
|
||||
return nil, new(apistatus.ContainerNotFound)
|
||||
},
|
||||
deletionInfo: func(id cid.ID) (*container.DelInfo, error) {
|
||||
return &container.DelInfo{}, nil
|
||||
},
|
||||
}
|
||||
|
||||
p := New(
|
||||
WithKeySpaceIterator(it),
|
||||
WithContainerSource(containerSrcFunc(containerSrc)),
|
||||
WithContainerSource(containerSrc),
|
||||
WithBuryFunc(buryFn),
|
||||
WithPool(pool),
|
||||
func(c *cfg) {
|
||||
|
@ -353,10 +369,14 @@ func (it *sliceKeySpaceIterator) Rewind() {
|
|||
it.cur = 0
|
||||
}
|
||||
|
||||
// containerSrcFunc is a container.Source backed by a function.
|
||||
type containerSrcFunc func(cid.ID) (*container.Container, error)
|
||||
type containerSrc struct {
|
||||
get func(id cid.ID) (*container.Container, error)
|
||||
deletionInfo func(id cid.ID) (*container.DelInfo, error)
|
||||
}
|
||||
|
||||
func (f containerSrcFunc) Get(id cid.ID) (*container.Container, error) { return f(id) }
|
||||
func (f containerSrc) Get(id cid.ID) (*container.Container, error) { return f.get(id) }
|
||||
|
||||
func (f containerSrc) DeletionInfo(id cid.ID) (*container.DelInfo, error) { return f.deletionInfo(id) }
|
||||
|
||||
// placementBuilderFunc is a placement.Builder backed by a function
|
||||
type placementBuilderFunc func(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
|
||||
|
|
|
@ -12,13 +12,13 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
tracing_grpc "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
|
@ -441,18 +441,6 @@ func (s *Service) syncContainers(ctx context.Context, cnrs []cid.ID) {
|
|||
wg.Wait()
|
||||
}
|
||||
|
||||
func (s *Service) containerEverExisted(cid cid.ID) (bool, error) {
|
||||
_, err := s.cnrSource.DeletionInfo(cid)
|
||||
if err == nil {
|
||||
return true, nil
|
||||
}
|
||||
var errContainerNotFound *apistatus.ContainerNotFound
|
||||
if errors.As(err, &errContainerNotFound) {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
|
||||
func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.removeContainers")
|
||||
defer span.End()
|
||||
|
@ -466,7 +454,7 @@ func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID
|
|||
continue
|
||||
}
|
||||
|
||||
existed, err := s.containerEverExisted(cnr)
|
||||
existed, err := containerCore.EverExisted(s.cnrSource, cnr)
|
||||
if err != nil {
|
||||
s.log.Error(logs.TreeCouldNotCheckIfContainerExisted,
|
||||
zap.Stringer("cid", cnr),
|
||||
|
|
Loading…
Reference in a new issue