frostfs-node/cmd/neofs-node/container.go
Leonard Lyubich d4569946c5 [#1313] container: Do not invalidate cache on Container.Delete
In previous implementation `Container.Delete` operation caused local
node's cache invalidation (container itself, eACL and listings). Any
subsequent `Container.Get` operation reversed invalidation. Given the
low latency sensitivity of deleting a container, there is no need to
touch the cache. With this approach, all pending deletion operations on
the node via the NeoFS API protocol will be delayed by the cache TTL.

Do not call cache invalidation ops in `morphContainerWriter.Delete`.
Remove no longer needed `InvalidateContainerListByCID` and
`InvalidateContainer` methods.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2022-04-25 10:34:12 +03:00

598 lines
15 KiB
Go

package main
import (
"bytes"
"context"
"crypto/ecdsa"
"errors"
"fmt"
"strconv"
containerV2 "github.com/nspcc-dev/neofs-api-go/v2/container"
containerGRPC "github.com/nspcc-dev/neofs-api-go/v2/container/grpc"
"github.com/nspcc-dev/neofs-node/pkg/core/client"
containerCore "github.com/nspcc-dev/neofs-node/pkg/core/container"
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container"
containerTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/container/grpc"
containerService "github.com/nspcc-dev/neofs-node/pkg/services/container"
loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller"
loadroute "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/route"
placementrouter "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/route/placement"
loadstorage "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/storage"
containerMorph "github.com/nspcc-dev/neofs-node/pkg/services/container/morph"
"github.com/nspcc-dev/neofs-node/pkg/services/object/acl/eacl"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
apiClient "github.com/nspcc-dev/neofs-sdk-go/client"
containerSDK "github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
eaclSDK "github.com/nspcc-dev/neofs-sdk-go/eacl"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/owner"
"go.uber.org/zap"
)
const (
startEstimationNotifyEvent = "StartEstimation"
stopEstimationNotifyEvent = "StopEstimation"
)
func initContainerService(c *cfg) {
// container wrapper that tries to invoke notary
// requests if chain is configured so
wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary())
fatalOnErr(err)
// container wrapper that always sends non-notary
// requests
wrapperNoNotary, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0)
fatalOnErr(err)
cnrSrc := cntClient.AsContainerSource(wrap)
eACLFetcher := &morphEACLFetcher{
w: wrap,
}
cnrRdr := new(morphContainerReader)
cnrWrt := &morphContainerWriter{
neoClient: wrap,
}
if c.cfgMorph.disableCache {
c.cfgObject.eaclSource = eACLFetcher
cnrRdr.eacl = eACLFetcher
c.cfgObject.cnrSource = cnrSrc
cnrRdr.get = cnrSrc
cnrRdr.lister = wrap
} else {
// use RPC node as source of Container contract items (with caching)
cachedContainerStorage := newCachedContainerStorage(cnrSrc)
cachedEACLStorage := newCachedEACLStorage(eACLFetcher)
cachedContainerLister := newCachedContainerLister(wrap)
c.cfgObject.eaclSource = cachedEACLStorage
c.cfgObject.cnrSource = cachedContainerStorage
cnrRdr.lister = cachedContainerLister
cnrRdr.eacl = c.cfgObject.eaclSource
cnrRdr.get = c.cfgObject.cnrSource
cnrWrt.cacheEnabled = true
cnrWrt.lists = cachedContainerLister
cnrWrt.eacls = cachedEACLStorage
}
localMetrics := &localStorageLoad{
log: c.log,
engine: c.cfgObject.cfgLocalStorage.localStorage,
}
pubKey := c.key.PublicKey().Bytes()
resultWriter := &morphLoadWriter{
log: c.log,
cnrMorphClient: wrapperNoNotary,
key: pubKey,
}
loadAccumulator := loadstorage.New(loadstorage.Prm{})
loadPlacementBuilder := &loadPlacementBuilder{
log: c.log,
nmSrc: c.netMapSource,
cnrSrc: cnrSrc,
}
routeBuilder := placementrouter.New(placementrouter.Prm{
PlacementBuilder: loadPlacementBuilder,
})
loadRouter := loadroute.New(
loadroute.Prm{
LocalServerInfo: c,
RemoteWriterProvider: &remoteLoadAnnounceProvider{
key: &c.key.PrivateKey,
netmapKeys: c,
clientCache: c.clientCache,
deadEndProvider: loadcontroller.SimpleWriterProvider(loadAccumulator),
},
Builder: routeBuilder,
},
loadroute.WithLogger(c.log),
)
ctrl := loadcontroller.New(
loadcontroller.Prm{
LocalMetrics: loadcontroller.SimpleIteratorProvider(localMetrics),
AnnouncementAccumulator: loadcontroller.SimpleIteratorProvider(loadAccumulator),
LocalAnnouncementTarget: loadRouter,
ResultReceiver: loadcontroller.SimpleWriterProvider(resultWriter),
},
loadcontroller.WithLogger(c.log),
)
setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation)
addContainerAsyncNotificationHandler(c, startEstimationNotifyEvent, func(ev event.Event) {
ctrl.Start(loadcontroller.StartPrm{
Epoch: ev.(containerEvent.StartEstimation).Epoch(),
})
})
setContainerNotificationParser(c, stopEstimationNotifyEvent, containerEvent.ParseStopEstimation)
addContainerAsyncNotificationHandler(c, stopEstimationNotifyEvent, func(ev event.Event) {
ctrl.Stop(loadcontroller.StopPrm{
Epoch: ev.(containerEvent.StopEstimation).Epoch(),
})
})
server := containerTransportGRPC.New(
containerService.NewSignService(
&c.key.PrivateKey,
containerService.NewResponseService(
&usedSpaceService{
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt)),
loadWriterProvider: loadRouter,
loadPlacementBuilder: loadPlacementBuilder,
routeBuilder: routeBuilder,
cfg: c,
},
c.respSvc,
),
),
)
for _, srv := range c.cfgGRPC.servers {
containerGRPC.RegisterContainerServiceServer(srv, server)
}
}
// addContainerNotificationHandler adds handler that will be executed synchronously
func addContainerNotificationHandler(c *cfg, sTyp string, h event.Handler) {
typ := event.TypeFromString(sTyp)
if c.cfgContainer.subscribers == nil {
c.cfgContainer.subscribers = make(map[event.Type][]event.Handler, 1)
}
c.cfgContainer.subscribers[typ] = append(c.cfgContainer.subscribers[typ], h)
}
// addContainerAsyncNotificationHandler adds handler that will be executed asynchronously via container workerPool
func addContainerAsyncNotificationHandler(c *cfg, sTyp string, h event.Handler) {
addContainerNotificationHandler(
c,
sTyp,
event.WorkerPoolHandler(
c.cfgContainer.workerPool,
h,
c.log,
),
)
}
func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationParser) {
typ := event.TypeFromString(sTyp)
if c.cfgContainer.parsers == nil {
c.cfgContainer.parsers = make(map[event.Type]event.NotificationParser, 1)
}
c.cfgContainer.parsers[typ] = p
}
type morphLoadWriter struct {
log *logger.Logger
cnrMorphClient *cntClient.Client
key []byte
}
func (w *morphLoadWriter) Put(a containerSDK.UsedSpaceAnnouncement) error {
w.log.Debug("save used space announcement in contract",
zap.Uint64("epoch", a.Epoch()),
zap.Stringer("cid", a.ContainerID()),
zap.Uint64("size", a.UsedSpace()),
)
prm := cntClient.AnnounceLoadPrm{}
prm.SetAnnouncement(a)
prm.SetReporter(w.key)
return w.cnrMorphClient.AnnounceLoad(prm)
}
func (*morphLoadWriter) Close() error {
return nil
}
type nopLoadWriter struct{}
func (nopLoadWriter) Put(containerSDK.UsedSpaceAnnouncement) error {
return nil
}
func (nopLoadWriter) Close() error {
return nil
}
type remoteLoadAnnounceProvider struct {
key *ecdsa.PrivateKey
netmapKeys netmapCore.AnnouncedKeys
clientCache interface {
Get(client.NodeInfo) (client.Client, error)
}
deadEndProvider loadcontroller.WriterProvider
}
func (r *remoteLoadAnnounceProvider) InitRemote(srv loadroute.ServerInfo) (loadcontroller.WriterProvider, error) {
if srv == nil {
return r.deadEndProvider, nil
}
if r.netmapKeys.IsLocalKey(srv.PublicKey()) {
// if local => return no-op writer
return loadcontroller.SimpleWriterProvider(new(nopLoadWriter)), nil
}
var info client.NodeInfo
err := client.NodeInfoFromRawNetmapElement(&info, srv)
if err != nil {
return nil, fmt.Errorf("parse client node info: %w", err)
}
c, err := r.clientCache.Get(info)
if err != nil {
return nil, fmt.Errorf("could not initialize API client: %w", err)
}
return &remoteLoadAnnounceWriterProvider{
client: c,
}, nil
}
type remoteLoadAnnounceWriterProvider struct {
client client.Client
}
func (p *remoteLoadAnnounceWriterProvider) InitWriter(ctx context.Context) (loadcontroller.Writer, error) {
return &remoteLoadAnnounceWriter{
ctx: ctx,
client: p.client,
}, nil
}
type remoteLoadAnnounceWriter struct {
ctx context.Context
client client.Client
buf []containerSDK.UsedSpaceAnnouncement
}
func (r *remoteLoadAnnounceWriter) Put(a containerSDK.UsedSpaceAnnouncement) error {
r.buf = append(r.buf, a)
return nil
}
func (r *remoteLoadAnnounceWriter) Close() error {
var cliPrm apiClient.PrmAnnounceSpace
cliPrm.SetValues(r.buf)
_, err := r.client.ContainerAnnounceUsedSpace(r.ctx, cliPrm)
return err
}
type loadPlacementBuilder struct {
log *logger.Logger
nmSrc netmapCore.Source
cnrSrc containerCore.Source
}
func (l *loadPlacementBuilder) BuildPlacement(epoch uint64, cid *cid.ID) ([]netmap.Nodes, error) {
cnrNodes, nm, err := l.buildPlacement(epoch, cid)
if err != nil {
return nil, err
}
const pivotPrefix = "load_announcement_"
pivot := []byte(
pivotPrefix + strconv.FormatUint(epoch, 10),
)
placement, err := nm.GetPlacementVectors(cnrNodes, pivot)
if err != nil {
return nil, fmt.Errorf("could not build placement vectors: %w", err)
}
return placement, nil
}
func (l *loadPlacementBuilder) buildPlacement(epoch uint64, cid *cid.ID) (netmap.ContainerNodes, *netmap.Netmap, error) {
cnr, err := l.cnrSrc.Get(cid)
if err != nil {
return nil, nil, err
}
nm, err := l.nmSrc.GetNetMapByEpoch(epoch)
if err != nil {
return nil, nil, fmt.Errorf("could not get network map: %w", err)
}
cnrNodes, err := nm.GetContainerNodes(cnr.PlacementPolicy(), cid.ToV2().GetValue())
if err != nil {
return nil, nil, fmt.Errorf("could not build container nodes: %w", err)
}
return cnrNodes, nm, nil
}
type localStorageLoad struct {
log *logger.Logger
engine *engine.StorageEngine
}
func (d *localStorageLoad) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error {
idList, err := engine.ListContainers(d.engine)
if err != nil {
return fmt.Errorf("list containers on engine failure: %w", err)
}
for i := range idList {
sz, err := engine.ContainerSize(d.engine, idList[i])
if err != nil {
d.log.Debug("failed to calculate container size in storage engine",
zap.Stringer("cid", idList[i]),
zap.String("error", err.Error()),
)
continue
}
d.log.Debug("container size in storage engine calculated successfully",
zap.Uint64("size", sz),
zap.Stringer("cid", idList[i]),
)
a := containerSDK.NewAnnouncement()
a.SetContainerID(idList[i])
a.SetUsedSpace(sz)
if f != nil && !f(*a) {
continue
}
if err := h(*a); err != nil {
return err
}
}
return nil
}
type usedSpaceService struct {
containerService.Server
loadWriterProvider loadcontroller.WriterProvider
loadPlacementBuilder *loadPlacementBuilder
routeBuilder loadroute.Builder
cfg *cfg
}
func (c *cfg) PublicKey() []byte {
return nodeKeyFromNetmap(c)
}
func (c *cfg) IsLocalKey(key []byte) bool {
return bytes.Equal(key, c.PublicKey())
}
func (c *cfg) IterateAddresses(f func(string) bool) {
c.iterateNetworkAddresses(f)
}
func (c *cfg) NumberOfAddresses() int {
return c.addressNum()
}
func (c *usedSpaceService) PublicKey() []byte {
return nodeKeyFromNetmap(c.cfg)
}
func (c *usedSpaceService) IterateAddresses(f func(string) bool) {
c.cfg.iterateNetworkAddresses(f)
}
func (c *usedSpaceService) NumberOfAddresses() int {
return c.cfg.addressNum()
}
func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *containerV2.AnnounceUsedSpaceRequest) (*containerV2.AnnounceUsedSpaceResponse, error) {
var passedRoute []loadroute.ServerInfo
for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() {
passedRoute = append(passedRoute, &containerOnlyKeyRemoteServerInfo{
key: hdr.GetBodySignature().GetKey(),
})
}
for left, right := 0, len(passedRoute)-1; left < right; left, right = left+1, right-1 {
passedRoute[left], passedRoute[right] = passedRoute[right], passedRoute[left]
}
passedRoute = append(passedRoute, c)
w, err := c.loadWriterProvider.InitWriter(loadroute.NewRouteContext(ctx, passedRoute))
if err != nil {
return nil, fmt.Errorf("could not initialize container's used space writer: %w", err)
}
for _, aV2 := range req.GetBody().GetAnnouncements() {
if err := c.processLoadValue(ctx, *containerSDK.NewAnnouncementFromV2(&aV2), passedRoute, w); err != nil {
return nil, err
}
}
respBody := new(containerV2.AnnounceUsedSpaceResponseBody)
resp := new(containerV2.AnnounceUsedSpaceResponse)
resp.SetBody(respBody)
return resp, nil
}
var errNodeOutsideContainer = errors.New("node outside the container")
type containerOnlyKeyRemoteServerInfo struct {
key []byte
}
func (i *containerOnlyKeyRemoteServerInfo) PublicKey() []byte {
return i.key
}
func (*containerOnlyKeyRemoteServerInfo) IterateAddresses(func(string) bool) {
}
func (*containerOnlyKeyRemoteServerInfo) NumberOfAddresses() int {
return 0
}
func (l *loadPlacementBuilder) isNodeFromContainerKey(epoch uint64, cid *cid.ID, key []byte) (bool, error) {
cnrNodes, _, err := l.buildPlacement(epoch, cid)
if err != nil {
return false, err
}
for _, vector := range cnrNodes.Replicas() {
for _, node := range vector {
if bytes.Equal(node.PublicKey(), key) {
return true, nil
}
}
}
return false, nil
}
func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.UsedSpaceAnnouncement,
route []loadroute.ServerInfo, w loadcontroller.Writer) error {
fromCnr, err := c.loadPlacementBuilder.isNodeFromContainerKey(a.Epoch(), a.ContainerID(), route[0].PublicKey())
if err != nil {
return fmt.Errorf("could not verify that the sender belongs to the container: %w", err)
} else if !fromCnr {
return errNodeOutsideContainer
}
err = loadroute.CheckRoute(c.routeBuilder, a, route)
if err != nil {
return fmt.Errorf("wrong route of container's used space value: %w", err)
}
err = w.Put(a)
if err != nil {
return fmt.Errorf("could not write container's used space value: %w", err)
}
return nil
}
// implements interface required by container service provided by morph executor.
type morphContainerReader struct {
eacl eacl.Source
get containerCore.Source
lister interface {
List(*owner.ID) ([]*cid.ID, error)
}
}
func (x *morphContainerReader) Get(id *cid.ID) (*containerSDK.Container, error) {
return x.get.Get(id)
}
func (x *morphContainerReader) GetEACL(id *cid.ID) (*eaclSDK.Table, error) {
return x.eacl.GetEACL(id)
}
func (x *morphContainerReader) List(id *owner.ID) ([]*cid.ID, error) {
return x.lister.List(id)
}
type morphContainerWriter struct {
neoClient *cntClient.Client
cacheEnabled bool
eacls *ttlEACLStorage
lists *ttlContainerLister
}
func (m morphContainerWriter) Put(cnr *containerSDK.Container) (*cid.ID, error) {
containerID, err := cntClient.Put(m.neoClient, cnr)
if err != nil {
return nil, err
}
if m.cacheEnabled {
m.lists.InvalidateContainerList(cnr.OwnerID())
}
return containerID, nil
}
func (m morphContainerWriter) Delete(witness containerCore.RemovalWitness) error {
return cntClient.Delete(m.neoClient, witness)
}
func (m morphContainerWriter) PutEACL(table *eaclSDK.Table) error {
err := cntClient.PutEACL(m.neoClient, table)
if err != nil {
return err
}
if m.cacheEnabled {
m.eacls.InvalidateEACL(table.CID())
}
return nil
}