d4569946c5
In the previous implementation, the `Container.Delete` operation invalidated the local node's cache (the container itself, its eACL and listings), and any subsequent `Container.Get` operation reversed that invalidation. Given the low latency sensitivity of container deletion, there is no need to touch the cache. With this approach, all pending deletion operations performed on the node via the NeoFS API protocol will be delayed by the cache TTL. Do not call cache invalidation ops in `morphContainerWriter.Delete`. Remove the no longer needed `InvalidateContainerListByCID` and `InvalidateContainer` methods. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
598 lines
15 KiB
Go
598 lines
15 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"errors"
|
|
"fmt"
|
|
"strconv"
|
|
|
|
containerV2 "github.com/nspcc-dev/neofs-api-go/v2/container"
|
|
containerGRPC "github.com/nspcc-dev/neofs-api-go/v2/container/grpc"
|
|
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
|
containerCore "github.com/nspcc-dev/neofs-node/pkg/core/container"
|
|
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
|
cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
|
|
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
|
containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container"
|
|
containerTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/container/grpc"
|
|
containerService "github.com/nspcc-dev/neofs-node/pkg/services/container"
|
|
loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller"
|
|
loadroute "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/route"
|
|
placementrouter "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/route/placement"
|
|
loadstorage "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/storage"
|
|
containerMorph "github.com/nspcc-dev/neofs-node/pkg/services/container/morph"
|
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/acl/eacl"
|
|
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
|
apiClient "github.com/nspcc-dev/neofs-sdk-go/client"
|
|
containerSDK "github.com/nspcc-dev/neofs-sdk-go/container"
|
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
|
eaclSDK "github.com/nspcc-dev/neofs-sdk-go/eacl"
|
|
"github.com/nspcc-dev/neofs-sdk-go/netmap"
|
|
"github.com/nspcc-dev/neofs-sdk-go/owner"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Notification names used when registering the parsers and handlers
// of the used space estimation events.
const (
	// startEstimationNotifyEvent is registered together with
	// containerEvent.ParseStartEstimation.
	startEstimationNotifyEvent = "StartEstimation"
	// stopEstimationNotifyEvent is registered together with
	// containerEvent.ParseStopEstimation.
	stopEstimationNotifyEvent = "StopEstimation"
)
|
|
|
|
func initContainerService(c *cfg) {
|
|
// container wrapper that tries to invoke notary
|
|
// requests if chain is configured so
|
|
wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary())
|
|
fatalOnErr(err)
|
|
|
|
// container wrapper that always sends non-notary
|
|
// requests
|
|
wrapperNoNotary, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0)
|
|
fatalOnErr(err)
|
|
|
|
cnrSrc := cntClient.AsContainerSource(wrap)
|
|
|
|
eACLFetcher := &morphEACLFetcher{
|
|
w: wrap,
|
|
}
|
|
|
|
cnrRdr := new(morphContainerReader)
|
|
|
|
cnrWrt := &morphContainerWriter{
|
|
neoClient: wrap,
|
|
}
|
|
|
|
if c.cfgMorph.disableCache {
|
|
c.cfgObject.eaclSource = eACLFetcher
|
|
cnrRdr.eacl = eACLFetcher
|
|
c.cfgObject.cnrSource = cnrSrc
|
|
cnrRdr.get = cnrSrc
|
|
cnrRdr.lister = wrap
|
|
} else {
|
|
// use RPC node as source of Container contract items (with caching)
|
|
cachedContainerStorage := newCachedContainerStorage(cnrSrc)
|
|
cachedEACLStorage := newCachedEACLStorage(eACLFetcher)
|
|
cachedContainerLister := newCachedContainerLister(wrap)
|
|
|
|
c.cfgObject.eaclSource = cachedEACLStorage
|
|
c.cfgObject.cnrSource = cachedContainerStorage
|
|
|
|
cnrRdr.lister = cachedContainerLister
|
|
cnrRdr.eacl = c.cfgObject.eaclSource
|
|
cnrRdr.get = c.cfgObject.cnrSource
|
|
|
|
cnrWrt.cacheEnabled = true
|
|
cnrWrt.lists = cachedContainerLister
|
|
cnrWrt.eacls = cachedEACLStorage
|
|
}
|
|
|
|
localMetrics := &localStorageLoad{
|
|
log: c.log,
|
|
engine: c.cfgObject.cfgLocalStorage.localStorage,
|
|
}
|
|
|
|
pubKey := c.key.PublicKey().Bytes()
|
|
|
|
resultWriter := &morphLoadWriter{
|
|
log: c.log,
|
|
cnrMorphClient: wrapperNoNotary,
|
|
key: pubKey,
|
|
}
|
|
|
|
loadAccumulator := loadstorage.New(loadstorage.Prm{})
|
|
|
|
loadPlacementBuilder := &loadPlacementBuilder{
|
|
log: c.log,
|
|
nmSrc: c.netMapSource,
|
|
cnrSrc: cnrSrc,
|
|
}
|
|
|
|
routeBuilder := placementrouter.New(placementrouter.Prm{
|
|
PlacementBuilder: loadPlacementBuilder,
|
|
})
|
|
|
|
loadRouter := loadroute.New(
|
|
loadroute.Prm{
|
|
LocalServerInfo: c,
|
|
RemoteWriterProvider: &remoteLoadAnnounceProvider{
|
|
key: &c.key.PrivateKey,
|
|
netmapKeys: c,
|
|
clientCache: c.clientCache,
|
|
deadEndProvider: loadcontroller.SimpleWriterProvider(loadAccumulator),
|
|
},
|
|
Builder: routeBuilder,
|
|
},
|
|
loadroute.WithLogger(c.log),
|
|
)
|
|
|
|
ctrl := loadcontroller.New(
|
|
loadcontroller.Prm{
|
|
LocalMetrics: loadcontroller.SimpleIteratorProvider(localMetrics),
|
|
AnnouncementAccumulator: loadcontroller.SimpleIteratorProvider(loadAccumulator),
|
|
LocalAnnouncementTarget: loadRouter,
|
|
ResultReceiver: loadcontroller.SimpleWriterProvider(resultWriter),
|
|
},
|
|
loadcontroller.WithLogger(c.log),
|
|
)
|
|
|
|
setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation)
|
|
addContainerAsyncNotificationHandler(c, startEstimationNotifyEvent, func(ev event.Event) {
|
|
ctrl.Start(loadcontroller.StartPrm{
|
|
Epoch: ev.(containerEvent.StartEstimation).Epoch(),
|
|
})
|
|
})
|
|
|
|
setContainerNotificationParser(c, stopEstimationNotifyEvent, containerEvent.ParseStopEstimation)
|
|
addContainerAsyncNotificationHandler(c, stopEstimationNotifyEvent, func(ev event.Event) {
|
|
ctrl.Stop(loadcontroller.StopPrm{
|
|
Epoch: ev.(containerEvent.StopEstimation).Epoch(),
|
|
})
|
|
})
|
|
|
|
server := containerTransportGRPC.New(
|
|
containerService.NewSignService(
|
|
&c.key.PrivateKey,
|
|
containerService.NewResponseService(
|
|
&usedSpaceService{
|
|
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt)),
|
|
loadWriterProvider: loadRouter,
|
|
loadPlacementBuilder: loadPlacementBuilder,
|
|
routeBuilder: routeBuilder,
|
|
cfg: c,
|
|
},
|
|
c.respSvc,
|
|
),
|
|
),
|
|
)
|
|
|
|
for _, srv := range c.cfgGRPC.servers {
|
|
containerGRPC.RegisterContainerServiceServer(srv, server)
|
|
}
|
|
}
|
|
|
|
// addContainerNotificationHandler adds handler that will be executed synchronously
|
|
func addContainerNotificationHandler(c *cfg, sTyp string, h event.Handler) {
|
|
typ := event.TypeFromString(sTyp)
|
|
|
|
if c.cfgContainer.subscribers == nil {
|
|
c.cfgContainer.subscribers = make(map[event.Type][]event.Handler, 1)
|
|
}
|
|
|
|
c.cfgContainer.subscribers[typ] = append(c.cfgContainer.subscribers[typ], h)
|
|
}
|
|
|
|
// addContainerAsyncNotificationHandler adds handler that will be executed asynchronously via container workerPool
|
|
func addContainerAsyncNotificationHandler(c *cfg, sTyp string, h event.Handler) {
|
|
addContainerNotificationHandler(
|
|
c,
|
|
sTyp,
|
|
event.WorkerPoolHandler(
|
|
c.cfgContainer.workerPool,
|
|
h,
|
|
c.log,
|
|
),
|
|
)
|
|
}
|
|
|
|
func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationParser) {
|
|
typ := event.TypeFromString(sTyp)
|
|
|
|
if c.cfgContainer.parsers == nil {
|
|
c.cfgContainer.parsers = make(map[event.Type]event.NotificationParser, 1)
|
|
}
|
|
|
|
c.cfgContainer.parsers[typ] = p
|
|
}
|
|
|
|
type morphLoadWriter struct {
|
|
log *logger.Logger
|
|
|
|
cnrMorphClient *cntClient.Client
|
|
|
|
key []byte
|
|
}
|
|
|
|
func (w *morphLoadWriter) Put(a containerSDK.UsedSpaceAnnouncement) error {
|
|
w.log.Debug("save used space announcement in contract",
|
|
zap.Uint64("epoch", a.Epoch()),
|
|
zap.Stringer("cid", a.ContainerID()),
|
|
zap.Uint64("size", a.UsedSpace()),
|
|
)
|
|
|
|
prm := cntClient.AnnounceLoadPrm{}
|
|
|
|
prm.SetAnnouncement(a)
|
|
prm.SetReporter(w.key)
|
|
|
|
return w.cnrMorphClient.AnnounceLoad(prm)
|
|
}
|
|
|
|
func (*morphLoadWriter) Close() error {
|
|
return nil
|
|
}
|
|
|
|
// nopLoadWriter is a stateless announcement writer whose methods do nothing.
type nopLoadWriter struct{}
|
|
|
|
func (nopLoadWriter) Put(containerSDK.UsedSpaceAnnouncement) error {
|
|
return nil
|
|
}
|
|
|
|
func (nopLoadWriter) Close() error {
|
|
return nil
|
|
}
|
|
|
|
type remoteLoadAnnounceProvider struct {
|
|
key *ecdsa.PrivateKey
|
|
|
|
netmapKeys netmapCore.AnnouncedKeys
|
|
|
|
clientCache interface {
|
|
Get(client.NodeInfo) (client.Client, error)
|
|
}
|
|
|
|
deadEndProvider loadcontroller.WriterProvider
|
|
}
|
|
|
|
func (r *remoteLoadAnnounceProvider) InitRemote(srv loadroute.ServerInfo) (loadcontroller.WriterProvider, error) {
|
|
if srv == nil {
|
|
return r.deadEndProvider, nil
|
|
}
|
|
|
|
if r.netmapKeys.IsLocalKey(srv.PublicKey()) {
|
|
// if local => return no-op writer
|
|
return loadcontroller.SimpleWriterProvider(new(nopLoadWriter)), nil
|
|
}
|
|
|
|
var info client.NodeInfo
|
|
|
|
err := client.NodeInfoFromRawNetmapElement(&info, srv)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse client node info: %w", err)
|
|
}
|
|
|
|
c, err := r.clientCache.Get(info)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not initialize API client: %w", err)
|
|
}
|
|
|
|
return &remoteLoadAnnounceWriterProvider{
|
|
client: c,
|
|
}, nil
|
|
}
|
|
|
|
type remoteLoadAnnounceWriterProvider struct {
|
|
client client.Client
|
|
}
|
|
|
|
func (p *remoteLoadAnnounceWriterProvider) InitWriter(ctx context.Context) (loadcontroller.Writer, error) {
|
|
return &remoteLoadAnnounceWriter{
|
|
ctx: ctx,
|
|
client: p.client,
|
|
}, nil
|
|
}
|
|
|
|
type remoteLoadAnnounceWriter struct {
|
|
ctx context.Context
|
|
|
|
client client.Client
|
|
|
|
buf []containerSDK.UsedSpaceAnnouncement
|
|
}
|
|
|
|
func (r *remoteLoadAnnounceWriter) Put(a containerSDK.UsedSpaceAnnouncement) error {
|
|
r.buf = append(r.buf, a)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *remoteLoadAnnounceWriter) Close() error {
|
|
var cliPrm apiClient.PrmAnnounceSpace
|
|
|
|
cliPrm.SetValues(r.buf)
|
|
|
|
_, err := r.client.ContainerAnnounceUsedSpace(r.ctx, cliPrm)
|
|
return err
|
|
}
|
|
|
|
type loadPlacementBuilder struct {
|
|
log *logger.Logger
|
|
|
|
nmSrc netmapCore.Source
|
|
|
|
cnrSrc containerCore.Source
|
|
}
|
|
|
|
func (l *loadPlacementBuilder) BuildPlacement(epoch uint64, cid *cid.ID) ([]netmap.Nodes, error) {
|
|
cnrNodes, nm, err := l.buildPlacement(epoch, cid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
const pivotPrefix = "load_announcement_"
|
|
|
|
pivot := []byte(
|
|
pivotPrefix + strconv.FormatUint(epoch, 10),
|
|
)
|
|
|
|
placement, err := nm.GetPlacementVectors(cnrNodes, pivot)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not build placement vectors: %w", err)
|
|
}
|
|
|
|
return placement, nil
|
|
}
|
|
|
|
func (l *loadPlacementBuilder) buildPlacement(epoch uint64, cid *cid.ID) (netmap.ContainerNodes, *netmap.Netmap, error) {
|
|
cnr, err := l.cnrSrc.Get(cid)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
nm, err := l.nmSrc.GetNetMapByEpoch(epoch)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("could not get network map: %w", err)
|
|
}
|
|
|
|
cnrNodes, err := nm.GetContainerNodes(cnr.PlacementPolicy(), cid.ToV2().GetValue())
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("could not build container nodes: %w", err)
|
|
}
|
|
|
|
return cnrNodes, nm, nil
|
|
}
|
|
|
|
type localStorageLoad struct {
|
|
log *logger.Logger
|
|
|
|
engine *engine.StorageEngine
|
|
}
|
|
|
|
func (d *localStorageLoad) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error {
|
|
idList, err := engine.ListContainers(d.engine)
|
|
if err != nil {
|
|
return fmt.Errorf("list containers on engine failure: %w", err)
|
|
}
|
|
|
|
for i := range idList {
|
|
sz, err := engine.ContainerSize(d.engine, idList[i])
|
|
if err != nil {
|
|
d.log.Debug("failed to calculate container size in storage engine",
|
|
zap.Stringer("cid", idList[i]),
|
|
zap.String("error", err.Error()),
|
|
)
|
|
|
|
continue
|
|
}
|
|
|
|
d.log.Debug("container size in storage engine calculated successfully",
|
|
zap.Uint64("size", sz),
|
|
zap.Stringer("cid", idList[i]),
|
|
)
|
|
|
|
a := containerSDK.NewAnnouncement()
|
|
a.SetContainerID(idList[i])
|
|
a.SetUsedSpace(sz)
|
|
|
|
if f != nil && !f(*a) {
|
|
continue
|
|
}
|
|
|
|
if err := h(*a); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type usedSpaceService struct {
|
|
containerService.Server
|
|
|
|
loadWriterProvider loadcontroller.WriterProvider
|
|
|
|
loadPlacementBuilder *loadPlacementBuilder
|
|
|
|
routeBuilder loadroute.Builder
|
|
|
|
cfg *cfg
|
|
}
|
|
|
|
func (c *cfg) PublicKey() []byte {
|
|
return nodeKeyFromNetmap(c)
|
|
}
|
|
|
|
func (c *cfg) IsLocalKey(key []byte) bool {
|
|
return bytes.Equal(key, c.PublicKey())
|
|
}
|
|
|
|
func (c *cfg) IterateAddresses(f func(string) bool) {
|
|
c.iterateNetworkAddresses(f)
|
|
}
|
|
|
|
func (c *cfg) NumberOfAddresses() int {
|
|
return c.addressNum()
|
|
}
|
|
|
|
func (c *usedSpaceService) PublicKey() []byte {
|
|
return nodeKeyFromNetmap(c.cfg)
|
|
}
|
|
|
|
func (c *usedSpaceService) IterateAddresses(f func(string) bool) {
|
|
c.cfg.iterateNetworkAddresses(f)
|
|
}
|
|
|
|
func (c *usedSpaceService) NumberOfAddresses() int {
|
|
return c.cfg.addressNum()
|
|
}
|
|
|
|
func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *containerV2.AnnounceUsedSpaceRequest) (*containerV2.AnnounceUsedSpaceResponse, error) {
|
|
var passedRoute []loadroute.ServerInfo
|
|
|
|
for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() {
|
|
passedRoute = append(passedRoute, &containerOnlyKeyRemoteServerInfo{
|
|
key: hdr.GetBodySignature().GetKey(),
|
|
})
|
|
}
|
|
|
|
for left, right := 0, len(passedRoute)-1; left < right; left, right = left+1, right-1 {
|
|
passedRoute[left], passedRoute[right] = passedRoute[right], passedRoute[left]
|
|
}
|
|
|
|
passedRoute = append(passedRoute, c)
|
|
|
|
w, err := c.loadWriterProvider.InitWriter(loadroute.NewRouteContext(ctx, passedRoute))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not initialize container's used space writer: %w", err)
|
|
}
|
|
|
|
for _, aV2 := range req.GetBody().GetAnnouncements() {
|
|
if err := c.processLoadValue(ctx, *containerSDK.NewAnnouncementFromV2(&aV2), passedRoute, w); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
respBody := new(containerV2.AnnounceUsedSpaceResponseBody)
|
|
|
|
resp := new(containerV2.AnnounceUsedSpaceResponse)
|
|
resp.SetBody(respBody)
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// errNodeOutsideContainer is returned by processLoadValue when the key of
// the first route server is not found among the container nodes.
var errNodeOutsideContainer = errors.New("node outside the container")
|
|
|
|
// containerOnlyKeyRemoteServerInfo is server info that carries only a
// public key and no network addresses.
type containerOnlyKeyRemoteServerInfo struct {
	// key is the value returned by PublicKey.
	key []byte
}
|
|
|
|
func (i *containerOnlyKeyRemoteServerInfo) PublicKey() []byte {
|
|
return i.key
|
|
}
|
|
|
|
func (*containerOnlyKeyRemoteServerInfo) IterateAddresses(func(string) bool) {
|
|
}
|
|
|
|
func (*containerOnlyKeyRemoteServerInfo) NumberOfAddresses() int {
|
|
return 0
|
|
}
|
|
|
|
func (l *loadPlacementBuilder) isNodeFromContainerKey(epoch uint64, cid *cid.ID, key []byte) (bool, error) {
|
|
cnrNodes, _, err := l.buildPlacement(epoch, cid)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
for _, vector := range cnrNodes.Replicas() {
|
|
for _, node := range vector {
|
|
if bytes.Equal(node.PublicKey(), key) {
|
|
return true, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.UsedSpaceAnnouncement,
|
|
route []loadroute.ServerInfo, w loadcontroller.Writer) error {
|
|
fromCnr, err := c.loadPlacementBuilder.isNodeFromContainerKey(a.Epoch(), a.ContainerID(), route[0].PublicKey())
|
|
if err != nil {
|
|
return fmt.Errorf("could not verify that the sender belongs to the container: %w", err)
|
|
} else if !fromCnr {
|
|
return errNodeOutsideContainer
|
|
}
|
|
|
|
err = loadroute.CheckRoute(c.routeBuilder, a, route)
|
|
if err != nil {
|
|
return fmt.Errorf("wrong route of container's used space value: %w", err)
|
|
}
|
|
|
|
err = w.Put(a)
|
|
if err != nil {
|
|
return fmt.Errorf("could not write container's used space value: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// implements interface required by container service provided by morph executor.
|
|
type morphContainerReader struct {
|
|
eacl eacl.Source
|
|
|
|
get containerCore.Source
|
|
|
|
lister interface {
|
|
List(*owner.ID) ([]*cid.ID, error)
|
|
}
|
|
}
|
|
|
|
func (x *morphContainerReader) Get(id *cid.ID) (*containerSDK.Container, error) {
|
|
return x.get.Get(id)
|
|
}
|
|
|
|
func (x *morphContainerReader) GetEACL(id *cid.ID) (*eaclSDK.Table, error) {
|
|
return x.eacl.GetEACL(id)
|
|
}
|
|
|
|
func (x *morphContainerReader) List(id *owner.ID) ([]*cid.ID, error) {
|
|
return x.lister.List(id)
|
|
}
|
|
|
|
type morphContainerWriter struct {
|
|
neoClient *cntClient.Client
|
|
|
|
cacheEnabled bool
|
|
eacls *ttlEACLStorage
|
|
lists *ttlContainerLister
|
|
}
|
|
|
|
func (m morphContainerWriter) Put(cnr *containerSDK.Container) (*cid.ID, error) {
|
|
containerID, err := cntClient.Put(m.neoClient, cnr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if m.cacheEnabled {
|
|
m.lists.InvalidateContainerList(cnr.OwnerID())
|
|
}
|
|
|
|
return containerID, nil
|
|
}
|
|
|
|
func (m morphContainerWriter) Delete(witness containerCore.RemovalWitness) error {
|
|
return cntClient.Delete(m.neoClient, witness)
|
|
}
|
|
|
|
func (m morphContainerWriter) PutEACL(table *eaclSDK.Table) error {
|
|
err := cntClient.PutEACL(m.neoClient, table)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if m.cacheEnabled {
|
|
m.eacls.InvalidateEACL(table.CID())
|
|
}
|
|
|
|
return nil
|
|
}
|