frostfs-node/cmd/frostfs-node/container.go
Dmitrii Stepanov ddbc9e255f [#231] node: Invalidate container cache on PutSuccess event
For example: frostfs-cli creates container and makes polling
GetContainer requests. These requests go through container cache,
so not found error stores in container cache.
So container cache can contain not found error when PutSuccess event received.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-04-25 09:42:02 +03:00

691 lines
19 KiB
Go

package main
import (
"bytes"
"context"
"crypto/ecdsa"
"crypto/sha256"
"errors"
"fmt"
"strconv"
containerV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
containerTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/container/grpc"
containerService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
loadroute "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/route"
placementrouter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/route/placement"
loadstorage "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/storage"
containerMorph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apiClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap"
)
const (
startEstimationNotifyEvent = "StartEstimation"
stopEstimationNotifyEvent = "StopEstimation"
)
func initContainerService(ctx context.Context, c *cfg) {
// container wrapper that tries to invoke notary
// requests if chain is configured so
wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary())
fatalOnErr(err)
c.shared.cnrClient = wrap
cnrSrc := cntClient.AsContainerSource(wrap)
cnrRdr, cnrWrt := configureEACLAndContainerSources(c, wrap, cnrSrc)
loadAccumulator := loadstorage.New(loadstorage.Prm{})
loadPlacementBuilder := &loadPlacementBuilder{
log: c.log,
nmSrc: c.netMapSource,
cnrSrc: cnrSrc,
}
routeBuilder := placementrouter.New(placementrouter.Prm{
PlacementBuilder: loadPlacementBuilder,
})
loadRouter := loadroute.New(
loadroute.Prm{
LocalServerInfo: c,
RemoteWriterProvider: &remoteLoadAnnounceProvider{
key: &c.key.PrivateKey,
netmapKeys: c,
clientCache: c.bgClientCache,
deadEndProvider: loadcontroller.SimpleWriterProvider(loadAccumulator),
},
Builder: routeBuilder,
},
loadroute.WithLogger(c.log),
)
setLoadController(ctx, c, loadRouter, loadAccumulator)
server := containerTransportGRPC.New(
containerService.NewSignService(
&c.key.PrivateKey,
containerService.NewResponseService(
&usedSpaceService{
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt)),
loadWriterProvider: loadRouter,
loadPlacementBuilder: loadPlacementBuilder,
routeBuilder: routeBuilder,
cfg: c,
},
c.respSvc,
),
),
)
for _, srv := range c.cfgGRPC.servers {
containerGRPC.RegisterContainerServiceServer(srv, server)
}
}
func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {
eACLFetcher := &morphEACLFetcher{
w: client,
}
cnrRdr := new(morphContainerReader)
cnrWrt := &morphContainerWriter{
neoClient: client,
}
if c.cfgMorph.cacheTTL <= 0 {
c.cfgObject.eaclSource = eACLFetcher
cnrRdr.eacl = eACLFetcher
c.cfgObject.cnrSource = cnrSrc
cnrRdr.get = cnrSrc
cnrRdr.lister = client
} else {
// use RPC node as source of Container contract items (with caching)
cachedContainerStorage := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL)
cachedEACLStorage := newCachedEACLStorage(eACLFetcher, c.cfgMorph.cacheTTL)
cachedContainerLister := newCachedContainerLister(client, c.cfgMorph.cacheTTL)
subscribeToContainerCreation(c, func(e event.Event) {
ev := e.(containerEvent.PutSuccess)
// read owner of the created container in order to update the reading cache.
// TODO: use owner directly from the event after neofs-contract#256 will become resolved
// but don't forget about the profit of reading the new container and caching it:
// creation success are most commonly tracked by polling GET op.
cnr, err := cnrSrc.Get(ev.ID)
if err == nil {
cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
cachedContainerStorage.set(ev.ID, cnr, nil)
} else {
// unlike removal, we expect successful receive of the container
// after successful creation, so logging can be useful
c.log.Error(logs.FrostFSNodeReadNewlyCreatedContainerAfterTheNotification,
zap.Stringer("id", ev.ID),
zap.Error(err),
)
}
c.log.Debug(logs.FrostFSNodeContainerCreationEventsReceipt,
zap.Stringer("id", ev.ID),
)
})
subscribeToContainerRemoval(c, func(e event.Event) {
ev := e.(containerEvent.DeleteSuccess)
// read owner of the removed container in order to update the listing cache.
// It's strange to read already removed container, but we can successfully hit
// the cache.
// TODO: use owner directly from the event after neofs-contract#256 will become resolved
cnr, err := cachedContainerStorage.Get(ev.ID)
if err == nil {
cachedContainerLister.update(cnr.Value.Owner(), ev.ID, false)
}
cachedContainerStorage.handleRemoval(ev.ID)
c.log.Debug(logs.FrostFSNodeContainerRemovalEventsReceipt,
zap.Stringer("id", ev.ID),
)
})
c.cfgObject.eaclSource = cachedEACLStorage
c.cfgObject.cnrSource = cachedContainerStorage
cnrRdr.lister = cachedContainerLister
cnrRdr.eacl = c.cfgObject.eaclSource
cnrRdr.get = c.cfgObject.cnrSource
cnrWrt.cacheEnabled = true
cnrWrt.eacls = cachedEACLStorage
}
return cnrRdr, cnrWrt
}
func setLoadController(ctx context.Context, c *cfg, loadRouter *loadroute.Router, loadAccumulator *loadstorage.Storage) {
pubKey := c.key.PublicKey().Bytes()
// container wrapper that always sends non-notary
// requests
wrapperNoNotary, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0)
fatalOnErr(err)
resultWriter := &morphLoadWriter{
log: c.log,
cnrMorphClient: wrapperNoNotary,
key: pubKey,
}
localMetrics := &localStorageLoad{
log: c.log,
engine: c.cfgObject.cfgLocalStorage.localStorage,
}
ctrl := loadcontroller.New(
loadcontroller.Prm{
LocalMetrics: loadcontroller.SimpleIteratorProvider(localMetrics),
AnnouncementAccumulator: loadcontroller.SimpleIteratorProvider(loadAccumulator),
LocalAnnouncementTarget: loadRouter,
ResultReceiver: loadcontroller.SimpleWriterProvider(resultWriter),
},
loadcontroller.WithLogger(c.log),
)
setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation)
addContainerAsyncNotificationHandler(c, startEstimationNotifyEvent, func(ev event.Event) {
ctrl.Start(ctx, loadcontroller.StartPrm{
Epoch: ev.(containerEvent.StartEstimation).Epoch(),
})
})
setContainerNotificationParser(c, stopEstimationNotifyEvent, containerEvent.ParseStopEstimation)
addContainerAsyncNotificationHandler(c, stopEstimationNotifyEvent, func(ev event.Event) {
ctrl.Stop(ctx, loadcontroller.StopPrm{
Epoch: ev.(containerEvent.StopEstimation).Epoch(),
})
})
}
// addContainerNotificationHandler adds handler that will be executed synchronously.
func addContainerNotificationHandler(c *cfg, sTyp string, h event.Handler) {
typ := event.TypeFromString(sTyp)
if c.cfgContainer.subscribers == nil {
c.cfgContainer.subscribers = make(map[event.Type][]event.Handler, 1)
}
c.cfgContainer.subscribers[typ] = append(c.cfgContainer.subscribers[typ], h)
}
// addContainerAsyncNotificationHandler adds handler that will be executed asynchronously via container workerPool.
func addContainerAsyncNotificationHandler(c *cfg, sTyp string, h event.Handler) {
addContainerNotificationHandler(
c,
sTyp,
event.WorkerPoolHandler(
c.cfgContainer.workerPool,
h,
c.log,
),
)
}
// stores already registered parsers of the notification events thrown by Container contract.
// MUST NOT be used concurrently.
var mRegisteredParsersContainer = make(map[string]struct{})
// registers event parser by name once. MUST NOT be called concurrently.
func registerEventParserOnceContainer(c *cfg, name string, p event.NotificationParser) {
if _, ok := mRegisteredParsersContainer[name]; !ok {
setContainerNotificationParser(c, name, p)
mRegisteredParsersContainer[name] = struct{}{}
}
}
// subscribes to successful container creation. Provided handler is called asynchronously
// on corresponding routine pool. MUST NOT be called concurrently with itself and other
// similar functions.
func subscribeToContainerCreation(c *cfg, h event.Handler) {
const eventNameContainerCreated = "PutSuccess"
registerEventParserOnceContainer(c, eventNameContainerCreated, containerEvent.ParsePutSuccess)
addContainerAsyncNotificationHandler(c, eventNameContainerCreated, h)
}
// like subscribeToContainerCreation but for removal.
func subscribeToContainerRemoval(c *cfg, h event.Handler) {
const eventNameContainerRemoved = "DeleteSuccess"
registerEventParserOnceContainer(c, eventNameContainerRemoved, containerEvent.ParseDeleteSuccess)
addContainerAsyncNotificationHandler(c, eventNameContainerRemoved, h)
}
func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationParser) {
typ := event.TypeFromString(sTyp)
if c.cfgContainer.parsers == nil {
c.cfgContainer.parsers = make(map[event.Type]event.NotificationParser, 1)
}
c.cfgContainer.parsers[typ] = p
}
type morphLoadWriter struct {
log *logger.Logger
cnrMorphClient *cntClient.Client
key []byte
}
func (w *morphLoadWriter) Put(a containerSDK.SizeEstimation) error {
w.log.Debug(logs.FrostFSNodeSaveUsedSpaceAnnouncementInContract,
zap.Uint64("epoch", a.Epoch()),
zap.Stringer("cid", a.Container()),
zap.Uint64("size", a.Value()),
)
prm := cntClient.AnnounceLoadPrm{}
prm.SetAnnouncement(a)
prm.SetReporter(w.key)
return w.cnrMorphClient.AnnounceLoad(prm)
}
func (*morphLoadWriter) Close(context.Context) error {
return nil
}
type nopLoadWriter struct{}
func (nopLoadWriter) Put(containerSDK.SizeEstimation) error {
return nil
}
func (nopLoadWriter) Close(context.Context) error {
return nil
}
type remoteLoadAnnounceProvider struct {
key *ecdsa.PrivateKey
netmapKeys netmapCore.AnnouncedKeys
clientCache interface {
Get(client.NodeInfo) (client.MultiAddressClient, error)
}
deadEndProvider loadcontroller.WriterProvider
}
func (r *remoteLoadAnnounceProvider) InitRemote(srv loadcontroller.ServerInfo) (loadcontroller.WriterProvider, error) {
if srv == nil {
return r.deadEndProvider, nil
}
if r.netmapKeys.IsLocalKey(srv.PublicKey()) {
// if local => return no-op writer
return loadcontroller.SimpleWriterProvider(new(nopLoadWriter)), nil
}
var info client.NodeInfo
err := client.NodeInfoFromRawNetmapElement(&info, srv)
if err != nil {
return nil, fmt.Errorf("parse client node info: %w", err)
}
c, err := r.clientCache.Get(info)
if err != nil {
return nil, fmt.Errorf("could not initialize API client: %w", err)
}
return &remoteLoadAnnounceWriterProvider{
client: c,
}, nil
}
type remoteLoadAnnounceWriterProvider struct {
client client.Client
}
func (p *remoteLoadAnnounceWriterProvider) InitWriter([]loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
return &remoteLoadAnnounceWriter{
client: p.client,
}, nil
}
type remoteLoadAnnounceWriter struct {
client client.Client
buf []containerSDK.SizeEstimation
}
func (r *remoteLoadAnnounceWriter) Put(a containerSDK.SizeEstimation) error {
r.buf = append(r.buf, a)
return nil
}
func (r *remoteLoadAnnounceWriter) Close(ctx context.Context) error {
var cliPrm apiClient.PrmAnnounceSpace
cliPrm.SetValues(r.buf)
_, err := r.client.ContainerAnnounceUsedSpace(ctx, cliPrm)
return err
}
type loadPlacementBuilder struct {
log *logger.Logger
nmSrc netmapCore.Source
cnrSrc containerCore.Source
}
func (l *loadPlacementBuilder) BuildPlacement(epoch uint64, cnr cid.ID) ([][]netmap.NodeInfo, error) {
cnrNodes, nm, err := l.buildPlacement(epoch, cnr)
if err != nil {
return nil, err
}
const pivotPrefix = "load_announcement_"
pivot := []byte(
pivotPrefix + strconv.FormatUint(epoch, 10),
)
placement, err := nm.PlacementVectors(cnrNodes, pivot)
if err != nil {
return nil, fmt.Errorf("could not build placement vectors: %w", err)
}
return placement, nil
}
func (l *loadPlacementBuilder) buildPlacement(epoch uint64, idCnr cid.ID) ([][]netmap.NodeInfo, *netmap.NetMap, error) {
cnr, err := l.cnrSrc.Get(idCnr)
if err != nil {
return nil, nil, err
}
nm, err := l.nmSrc.GetNetMapByEpoch(epoch)
if err != nil {
return nil, nil, fmt.Errorf("could not get network map: %w", err)
}
binCnr := make([]byte, sha256.Size)
idCnr.Encode(binCnr)
cnrNodes, err := nm.ContainerNodes(cnr.Value.PlacementPolicy(), binCnr)
if err != nil {
return nil, nil, fmt.Errorf("could not build container nodes: %w", err)
}
return cnrNodes, nm, nil
}
type localStorageLoad struct {
log *logger.Logger
engine *engine.StorageEngine
}
func (d *localStorageLoad) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error {
idList, err := engine.ListContainers(d.engine)
if err != nil {
return fmt.Errorf("list containers on engine failure: %w", err)
}
for i := range idList {
sz, err := engine.ContainerSize(d.engine, idList[i])
if err != nil {
d.log.Debug(logs.FrostFSNodeFailedToCalculateContainerSizeInStorageEngine,
zap.Stringer("cid", idList[i]),
zap.String("error", err.Error()),
)
continue
}
d.log.Debug(logs.FrostFSNodeContainerSizeInStorageEngineCalculatedSuccessfully,
zap.Uint64("size", sz),
zap.Stringer("cid", idList[i]),
)
var a containerSDK.SizeEstimation
a.SetContainer(idList[i])
a.SetValue(sz)
if f != nil && !f(a) {
continue
}
if err := h(a); err != nil {
return err
}
}
return nil
}
type usedSpaceService struct {
containerService.Server
loadWriterProvider loadcontroller.WriterProvider
loadPlacementBuilder *loadPlacementBuilder
routeBuilder loadroute.Builder
cfg *cfg
}
func (c *cfg) PublicKey() []byte {
return nodeKeyFromNetmap(c)
}
func (c *cfg) IsLocalKey(key []byte) bool {
return bytes.Equal(key, c.PublicKey())
}
func (c *cfg) IterateAddresses(f func(string) bool) {
c.iterateNetworkAddresses(f)
}
func (c *cfg) NumberOfAddresses() int {
return c.addressNum()
}
func (c *cfg) ExternalAddresses() []string {
return c.cfgNodeInfo.localInfo.ExternalAddresses()
}
func (c *usedSpaceService) PublicKey() []byte {
return nodeKeyFromNetmap(c.cfg)
}
func (c *usedSpaceService) IterateAddresses(f func(string) bool) {
c.cfg.iterateNetworkAddresses(f)
}
func (c *usedSpaceService) NumberOfAddresses() int {
return c.cfg.addressNum()
}
func (c *usedSpaceService) ExternalAddresses() []string {
return c.cfg.ExternalAddresses()
}
func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *containerV2.AnnounceUsedSpaceRequest) (*containerV2.AnnounceUsedSpaceResponse, error) {
var passedRoute []loadcontroller.ServerInfo
for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() {
passedRoute = append(passedRoute, &containerOnlyKeyRemoteServerInfo{
key: hdr.GetBodySignature().GetKey(),
})
}
for left, right := 0, len(passedRoute)-1; left < right; left, right = left+1, right-1 {
passedRoute[left], passedRoute[right] = passedRoute[right], passedRoute[left]
}
passedRoute = append(passedRoute, c)
w, err := c.loadWriterProvider.InitWriter(passedRoute)
if err != nil {
return nil, fmt.Errorf("could not initialize container's used space writer: %w", err)
}
var est containerSDK.SizeEstimation
for _, aV2 := range req.GetBody().GetAnnouncements() {
err = est.ReadFromV2(aV2)
if err != nil {
return nil, fmt.Errorf("invalid size announcement: %w", err)
}
if err := c.processLoadValue(ctx, est, passedRoute, w); err != nil {
return nil, err
}
}
respBody := new(containerV2.AnnounceUsedSpaceResponseBody)
resp := new(containerV2.AnnounceUsedSpaceResponse)
resp.SetBody(respBody)
return resp, nil
}
var errNodeOutsideContainer = errors.New("node outside the container")
type containerOnlyKeyRemoteServerInfo struct {
key []byte
}
func (i *containerOnlyKeyRemoteServerInfo) PublicKey() []byte {
return i.key
}
func (*containerOnlyKeyRemoteServerInfo) IterateAddresses(func(string) bool) {
}
func (*containerOnlyKeyRemoteServerInfo) NumberOfAddresses() int {
return 0
}
func (*containerOnlyKeyRemoteServerInfo) ExternalAddresses() []string {
return nil
}
func (l *loadPlacementBuilder) isNodeFromContainerKey(epoch uint64, cnr cid.ID, key []byte) (bool, error) {
cnrNodes, _, err := l.buildPlacement(epoch, cnr)
if err != nil {
return false, err
}
for i := range cnrNodes {
for j := range cnrNodes[i] {
if bytes.Equal(cnrNodes[i][j].PublicKey(), key) {
return true, nil
}
}
}
return false, nil
}
func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.SizeEstimation,
route []loadcontroller.ServerInfo, w loadcontroller.Writer) error {
fromCnr, err := c.loadPlacementBuilder.isNodeFromContainerKey(a.Epoch(), a.Container(), route[0].PublicKey())
if err != nil {
return fmt.Errorf("could not verify that the sender belongs to the container: %w", err)
} else if !fromCnr {
return errNodeOutsideContainer
}
err = loadroute.CheckRoute(c.routeBuilder, a, route)
if err != nil {
return fmt.Errorf("wrong route of container's used space value: %w", err)
}
err = w.Put(a)
if err != nil {
return fmt.Errorf("could not write container's used space value: %w", err)
}
return nil
}
// implements interface required by container service provided by morph executor.
type morphContainerReader struct {
eacl containerCore.EACLSource
get containerCore.Source
lister interface {
List(*user.ID) ([]cid.ID, error)
}
}
func (x *morphContainerReader) Get(id cid.ID) (*containerCore.Container, error) {
return x.get.Get(id)
}
func (x *morphContainerReader) GetEACL(id cid.ID) (*containerCore.EACL, error) {
return x.eacl.GetEACL(id)
}
func (x *morphContainerReader) List(id *user.ID) ([]cid.ID, error) {
return x.lister.List(id)
}
type morphContainerWriter struct {
neoClient *cntClient.Client
cacheEnabled bool
eacls ttlEACLStorage
}
func (m morphContainerWriter) Put(cnr containerCore.Container) (*cid.ID, error) {
return cntClient.Put(m.neoClient, cnr)
}
func (m morphContainerWriter) Delete(witness containerCore.RemovalWitness) error {
return cntClient.Delete(m.neoClient, witness)
}
func (m morphContainerWriter) PutEACL(eaclInfo containerCore.EACL) error {
err := cntClient.PutEACL(m.neoClient, eaclInfo)
if err != nil {
return err
}
if m.cacheEnabled {
id, _ := eaclInfo.Value.CID()
m.eacls.InvalidateEACL(id)
}
return nil
}