// frostfs-node/cmd/frostfs-node/container.go
//
// 261 lines
// 8.2 KiB
// Go

package main
import (
"bytes"
"context"
"net"
containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
containerTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/container/grpc"
containerService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container"
containerMorph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap"
"google.golang.org/grpc"
)
// initContainerService builds the container service stack, registers it on
// every gRPC server kept in c.cfgGRPC and hands the container reader to the
// local storage. Failures to create the contract clients are passed to
// fatalOnErr.
func initContainerService(_ context.Context, c *cfg) {
	// The container client is created with TryNotary, i.e. it tries to
	// invoke notary requests if the chain is configured so.
	cnrClient, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary())
	fatalOnErr(err)

	c.shared.cnrClient = cnrClient

	reader, writer := configureEACLAndContainerSources(c, cnrClient, cntClient.AsContainerSource(cnrClient))

	idClient, err := frostfsid.NewFromMorph(c.cfgMorph.client, c.cfgFrostfsID.scriptHash, 0)
	fatalOnErr(err)

	// Assemble the service chain from the innermost layer outwards:
	// morph executor -> execution service -> APE server -> sign service -> gRPC transport.
	irFetcher := newCachedIRFetcher(createInnerRingFetcher(c))
	execSvc := containerService.NewExecutionService(containerMorph.NewExecutor(reader, writer), c.respSvc)
	apeSvc := containerService.NewAPEServer(
		c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine,
		reader,
		irFetcher,
		c.netMapSource,
		idClient,
		execSvc,
	)
	signSvc := containerService.NewSignService(&c.key.PrivateKey, apeSvc)
	server := containerTransportGRPC.New(signSvc)

	c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
		containerGRPC.RegisterContainerServiceServer(s, server)
	})

	c.cfgObject.cfgLocalStorage.localStorage.SetContainerSource(reader)
}
// configureEACLAndContainerSources wires the container and eACL sources used
// by the node and returns the reader/writer pair for the container service.
//
// With a non-positive c.cfgMorph.cacheTTL the sources read directly through
// client and cnrSrc. Otherwise both sources are wrapped into TTL caches, the
// container cache is kept up to date via creation/removal notifications and
// the returned writer is told to invalidate cached eACLs.
//
// In both modes c.cfgObject.eaclSource and c.cfgObject.cnrSource are set.
func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {
	fetcher := &morphEACLFetcher{
		w: client,
	}
	// Container listing always goes through the contract client.
	reader := &morphContainerReader{
		lister: client,
	}
	writer := &morphContainerWriter{
		neoClient: client,
	}

	ttl := c.cfgMorph.cacheTTL
	if ttl <= 0 {
		// Caching is disabled: use the uncached sources as is.
		c.cfgObject.eaclSource = fetcher
		c.cfgObject.cnrSource = cnrSrc

		reader.eacl = fetcher
		reader.src = cnrSrc

		return reader, writer
	}

	// use RPC node as source of Container contract items (with caching)
	cnrCache := newCachedContainerStorage(cnrSrc, ttl)
	eaclCache := newCachedEACLStorage(fetcher, ttl)

	onCreated := func(e event.Event) {
		created := e.(containerEvent.PutSuccess)

		// Read the freshly created container to warm up the reading cache.
		// TODO: use owner directly from the event after neofs-contract#256 will become resolved
		// but don't forget about the profit of reading the new container and caching it:
		// creation success are most commonly tracked by polling GET op.
		cnr, err := cnrSrc.Get(created.ID)
		if err != nil {
			// Unlike removal, the container is expected to be readable right
			// after successful creation, so the failure is worth logging.
			c.log.Error(logs.FrostFSNodeReadNewlyCreatedContainerAfterTheNotification,
				zap.Stringer("id", created.ID),
				zap.Error(err),
			)
		} else {
			cnrCache.containerCache.set(created.ID, cnr, nil)
		}

		c.log.Debug(logs.FrostFSNodeContainerCreationEventsReceipt,
			zap.Stringer("id", created.ID),
		)
	}

	onRemoved := func(e event.Event) {
		removed := e.(containerEvent.DeleteSuccess)

		cnrCache.handleRemoval(removed.ID)

		c.log.Debug(logs.FrostFSNodeContainerRemovalEventsReceipt,
			zap.Stringer("id", removed.ID),
		)
	}

	subscribeToContainerCreation(c, onCreated)
	subscribeToContainerRemoval(c, onRemoved)

	c.cfgObject.eaclSource = eaclCache
	c.cfgObject.cnrSource = cnrCache

	reader.eacl = c.cfgObject.eaclSource
	reader.src = c.cfgObject.cnrSource

	writer.cacheEnabled = true
	writer.eacls = eaclCache

	return reader, writer
}
// addContainerNotificationHandler appends h to the handlers registered in
// c.cfgContainer.subscribers for the event type named sTyp. The handler will
// be executed synchronously. The subscribers map is created on first use.
func addContainerNotificationHandler(c *cfg, sTyp string, h event.Handler) {
	evType := event.TypeFromString(sTyp)

	subs := c.cfgContainer.subscribers
	if subs == nil {
		subs = make(map[event.Type][]event.Handler, 1)
		c.cfgContainer.subscribers = subs
	}

	subs[evType] = append(subs[evType], h)
}
// addContainerAsyncNotificationHandler registers h for the event type named
// sTyp after wrapping it with event.WorkerPoolHandler, so that it is executed
// asynchronously via the container workerPool.
func addContainerAsyncNotificationHandler(c *cfg, sTyp string, h event.Handler) {
	pooled := event.WorkerPoolHandler(c.cfgContainer.workerPool, h, c.log)

	addContainerNotificationHandler(c, sTyp, pooled)
}
// mRegisteredParsersContainer is the set of names of Container contract
// notification events whose parsers have already been registered.
// MUST NOT be used concurrently.
var mRegisteredParsersContainer = map[string]struct{}{}
// registerEventParserOnceContainer sets p as the parser of the event called
// name unless a parser for that name has been registered through this function
// before; repeated calls with the same name are no-ops.
// MUST NOT be called concurrently.
func registerEventParserOnceContainer(c *cfg, name string, p event.NotificationParser) {
	if _, registered := mRegisteredParsersContainer[name]; registered {
		return
	}

	setContainerNotificationParser(c, name, p)
	mRegisteredParsersContainer[name] = struct{}{}
}
// subscribeToContainerCreation subscribes h to successful container creation
// ("PutSuccess" notifications). The parser for the event is registered once;
// the handler is called asynchronously on the corresponding routine pool.
// MUST NOT be called concurrently with itself and other similar functions.
func subscribeToContainerCreation(c *cfg, h event.Handler) {
	const evName = "PutSuccess"

	registerEventParserOnceContainer(c, evName, containerEvent.ParsePutSuccess)
	addContainerAsyncNotificationHandler(c, evName, h)
}
// subscribeToContainerRemoval subscribes h to successful container removal
// ("DeleteSuccess" notifications). The parser for the event is registered
// once; the handler is added via addContainerAsyncNotificationHandler.
func subscribeToContainerRemoval(c *cfg, h event.Handler) {
	const evName = "DeleteSuccess"

	registerEventParserOnceContainer(c, evName, containerEvent.ParseDeleteSuccess)
	addContainerAsyncNotificationHandler(c, evName, h)
}
// setContainerNotificationParser stores p in c.cfgContainer.parsers as the
// parser for the event type named sTyp, replacing any parser stored for that
// type before. The parsers map is created on first use.
func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationParser) {
	evType := event.TypeFromString(sTyp)

	parsers := c.cfgContainer.parsers
	if parsers == nil {
		parsers = make(map[event.Type]event.NotificationParser, 1)
		c.cfgContainer.parsers = parsers
	}

	parsers[evType] = p
}
// PublicKey returns the node key bytes obtained from nodeKeyFromNetmap.
func (c *cfg) PublicKey() []byte {
	key := nodeKeyFromNetmap(c)
	return key
}
// IsLocalKey reports whether key is byte-wise equal to the key returned by
// PublicKey.
func (c *cfg) IsLocalKey(key []byte) bool {
	local := c.PublicKey()
	return bytes.Equal(key, local)
}
// IterateAddresses passes f to c.iterateNetworkAddresses.
// NOTE(review): the meaning of f's bool result is defined by
// iterateNetworkAddresses, which is not visible here — confirm before relying on it.
func (c *cfg) IterateAddresses(f func(string) bool) {
c.iterateNetworkAddresses(f)
}
// NumberOfAddresses returns the value reported by c.addressNum.
func (c *cfg) NumberOfAddresses() int {
	n := c.addressNum()
	return n
}
// ExternalAddresses returns the external addresses recorded in the local node
// info (c.cfgNodeInfo.localInfo).
func (c *cfg) ExternalAddresses() []string {
	info := c.cfgNodeInfo.localInfo
	return info.ExternalAddresses()
}
// morphContainerReader implements the read interface required by the container
// service provided by morph executor. Every method delegates to one of the
// fields below, so all of them must be set before use.
type morphContainerReader struct {
// eacl serves GetEACL calls.
eacl containerCore.EACLSource
// src serves Get and DeletionInfo calls.
src containerCore.Source
// lister serves ContainersOf calls.
lister interface {
ContainersOf(*user.ID) ([]cid.ID, error)
}
}
// Get returns the container with the given id as read from the underlying
// container source.
func (x *morphContainerReader) Get(id cid.ID) (*containerCore.Container, error) {
	cnr, err := x.src.Get(id)
	return cnr, err
}
// DeletionInfo returns the deletion info of the container with the given id as
// read from the underlying container source.
func (x *morphContainerReader) DeletionInfo(id cid.ID) (*containerCore.DelInfo, error) {
	info, err := x.src.DeletionInfo(id)
	return info, err
}
// GetEACL returns the eACL of the container with the given id as read from the
// underlying eACL source.
func (x *morphContainerReader) GetEACL(id cid.ID) (*containerCore.EACL, error) {
	table, err := x.eacl.GetEACL(id)
	return table, err
}
// ContainersOf returns the container identifiers that the underlying lister
// reports for the user id.
func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
	ids, err := x.lister.ContainersOf(id)
	return ids, err
}
// morphContainerWriter performs container write operations (Put, Delete,
// PutEACL) through the container contract client.
type morphContainerWriter struct {
// neoClient is the client passed to cntClient.Put, cntClient.Delete and
// cntClient.PutEACL.
neoClient *cntClient.Client
// cacheEnabled makes PutEACL invalidate the corresponding eacls entry after
// a successful write.
cacheEnabled bool
// eacls is the eACL cache invalidated by PutEACL; it is only accessed when
// cacheEnabled is true.
eacls ttlEACLStorage
}
// Put saves cnr via cntClient.Put using the writer's contract client and
// returns that call's results unchanged.
func (m morphContainerWriter) Put(cnr containerCore.Container) (*cid.ID, error) {
	id, err := cntClient.Put(m.neoClient, cnr)
	return id, err
}
// Delete requests container removal via cntClient.Delete using the writer's
// contract client and returns that call's error unchanged.
func (m morphContainerWriter) Delete(witness containerCore.RemovalWitness) error {
	err := cntClient.Delete(m.neoClient, witness)
	return err
}
// PutEACL saves eaclInfo via cntClient.PutEACL and, when caching is enabled,
// invalidates the cached eACL of the container the table is bound to.
//
// An error from cntClient.PutEACL is returned as is and leaves the cache
// untouched.
func (m morphContainerWriter) PutEACL(eaclInfo containerCore.EACL) error {
	if err := cntClient.PutEACL(m.neoClient, eaclInfo); err != nil {
		return err
	}

	if !m.cacheEnabled {
		return nil
	}

	// Invalidate only when the table actually carries a container ID: the
	// identifier returned together with a false flag is just the zero value
	// and does not name a container whose cache entry could be stale.
	if id, ok := eaclInfo.Value.CID(); ok {
		m.eacls.InvalidateEACL(id)
	}

	return nil
}