forked from TrueCloudLab/frostfs-node
270 lines
8.7 KiB
Go
270 lines
8.7 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"net"
|
|
|
|
containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
|
|
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
|
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
|
|
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
|
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
|
|
containerTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/container/grpc"
|
|
containerService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container"
|
|
containerMorph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
func initContainerService(_ context.Context, c *cfg) {
|
|
// container wrapper that tries to invoke notary
|
|
// requests if chain is configured so
|
|
wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary())
|
|
fatalOnErr(err)
|
|
|
|
c.shared.cnrClient = wrap
|
|
|
|
cnrSrc := cntClient.AsContainerSource(wrap)
|
|
|
|
cnrRdr, cnrWrt := configureEACLAndContainerSources(c, wrap, cnrSrc)
|
|
|
|
var frostfsIDSubjectProvider frostfsidcore.SubjectProvider
|
|
frostfsIDSubjectProvider, err = frostfsid.NewFromMorph(c.cfgMorph.client, c.cfgFrostfsID.scriptHash, 0)
|
|
fatalOnErr(err)
|
|
|
|
cacheSize := morphconfig.FrostfsIDCacheSize(c.appCfg)
|
|
if cacheSize > 0 {
|
|
frostfsIDSubjectProvider = newMorphFrostfsIDCache(frostfsIDSubjectProvider, int(cacheSize), c.cfgMorph.cacheTTL)
|
|
}
|
|
|
|
c.shared.frostfsidClient = frostfsIDSubjectProvider
|
|
|
|
server := containerTransportGRPC.New(
|
|
containerService.NewSignService(
|
|
&c.key.PrivateKey,
|
|
containerService.NewAPEServer(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine, cnrRdr,
|
|
newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient,
|
|
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
|
|
),
|
|
),
|
|
)
|
|
|
|
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
|
containerGRPC.RegisterContainerServiceServer(s, server)
|
|
})
|
|
|
|
c.cfgObject.cfgLocalStorage.localStorage.SetContainerSource(cnrRdr)
|
|
}
|
|
|
|
func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {
|
|
eACLFetcher := &morphEACLFetcher{
|
|
w: client,
|
|
}
|
|
|
|
cnrRdr := new(morphContainerReader)
|
|
|
|
cnrWrt := &morphContainerWriter{
|
|
neoClient: client,
|
|
}
|
|
|
|
if c.cfgMorph.cacheTTL <= 0 {
|
|
c.cfgObject.eaclSource = eACLFetcher
|
|
cnrRdr.eacl = eACLFetcher
|
|
c.cfgObject.cnrSource = cnrSrc
|
|
cnrRdr.src = cnrSrc
|
|
cnrRdr.lister = client
|
|
} else {
|
|
// use RPC node as source of Container contract items (with caching)
|
|
cachedContainerStorage := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL)
|
|
cachedEACLStorage := newCachedEACLStorage(eACLFetcher, c.cfgMorph.cacheTTL)
|
|
|
|
subscribeToContainerCreation(c, func(e event.Event) {
|
|
ev := e.(containerEvent.PutSuccess)
|
|
|
|
// read owner of the created container in order to update the reading cache.
|
|
// TODO: use owner directly from the event after neofs-contract#256 will become resolved
|
|
// but don't forget about the profit of reading the new container and caching it:
|
|
// creation success are most commonly tracked by polling GET op.
|
|
cnr, err := cnrSrc.Get(ev.ID)
|
|
if err == nil {
|
|
cachedContainerStorage.containerCache.set(ev.ID, cnr, nil)
|
|
} else {
|
|
// unlike removal, we expect successful receive of the container
|
|
// after successful creation, so logging can be useful
|
|
c.log.Error(logs.FrostFSNodeReadNewlyCreatedContainerAfterTheNotification,
|
|
zap.Stringer("id", ev.ID),
|
|
zap.Error(err),
|
|
)
|
|
}
|
|
|
|
c.log.Debug(logs.FrostFSNodeContainerCreationEventsReceipt,
|
|
zap.Stringer("id", ev.ID),
|
|
)
|
|
})
|
|
|
|
subscribeToContainerRemoval(c, func(e event.Event) {
|
|
ev := e.(containerEvent.DeleteSuccess)
|
|
cachedContainerStorage.handleRemoval(ev.ID)
|
|
c.log.Debug(logs.FrostFSNodeContainerRemovalEventsReceipt,
|
|
zap.Stringer("id", ev.ID),
|
|
)
|
|
})
|
|
|
|
c.cfgObject.eaclSource = cachedEACLStorage
|
|
c.cfgObject.cnrSource = cachedContainerStorage
|
|
|
|
cnrRdr.lister = client
|
|
cnrRdr.eacl = c.cfgObject.eaclSource
|
|
cnrRdr.src = c.cfgObject.cnrSource
|
|
|
|
cnrWrt.cacheEnabled = true
|
|
cnrWrt.eacls = cachedEACLStorage
|
|
}
|
|
|
|
return cnrRdr, cnrWrt
|
|
}
|
|
|
|
// addContainerNotificationHandler adds handler that will be executed synchronously.
|
|
func addContainerNotificationHandler(c *cfg, sTyp string, h event.Handler) {
|
|
typ := event.TypeFromString(sTyp)
|
|
|
|
if c.cfgContainer.subscribers == nil {
|
|
c.cfgContainer.subscribers = make(map[event.Type][]event.Handler, 1)
|
|
}
|
|
|
|
c.cfgContainer.subscribers[typ] = append(c.cfgContainer.subscribers[typ], h)
|
|
}
|
|
|
|
// addContainerAsyncNotificationHandler adds handler that will be executed asynchronously via container workerPool.
|
|
func addContainerAsyncNotificationHandler(c *cfg, sTyp string, h event.Handler) {
|
|
addContainerNotificationHandler(
|
|
c,
|
|
sTyp,
|
|
event.WorkerPoolHandler(
|
|
c.cfgContainer.workerPool,
|
|
h,
|
|
c.log,
|
|
),
|
|
)
|
|
}
|
|
|
|
// mRegisteredParsersContainer is the set of names of notification event
// parsers (for events thrown by the Container contract) that have already
// been registered.
//
// MUST NOT be used concurrently.
var mRegisteredParsersContainer = map[string]struct{}{}
|
|
|
|
// registers event parser by name once. MUST NOT be called concurrently.
|
|
func registerEventParserOnceContainer(c *cfg, name string, p event.NotificationParser) {
|
|
if _, ok := mRegisteredParsersContainer[name]; !ok {
|
|
setContainerNotificationParser(c, name, p)
|
|
mRegisteredParsersContainer[name] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// subscribes to successful container creation. Provided handler is called asynchronously
|
|
// on corresponding routine pool. MUST NOT be called concurrently with itself and other
|
|
// similar functions.
|
|
func subscribeToContainerCreation(c *cfg, h event.Handler) {
|
|
const eventNameContainerCreated = "PutSuccess"
|
|
registerEventParserOnceContainer(c, eventNameContainerCreated, containerEvent.ParsePutSuccess)
|
|
addContainerAsyncNotificationHandler(c, eventNameContainerCreated, h)
|
|
}
|
|
|
|
// like subscribeToContainerCreation but for removal.
|
|
func subscribeToContainerRemoval(c *cfg, h event.Handler) {
|
|
const eventNameContainerRemoved = "DeleteSuccess"
|
|
registerEventParserOnceContainer(c, eventNameContainerRemoved, containerEvent.ParseDeleteSuccess)
|
|
addContainerAsyncNotificationHandler(c, eventNameContainerRemoved, h)
|
|
}
|
|
|
|
func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationParser) {
|
|
typ := event.TypeFromString(sTyp)
|
|
|
|
if c.cfgContainer.parsers == nil {
|
|
c.cfgContainer.parsers = make(map[event.Type]event.NotificationParser, 1)
|
|
}
|
|
|
|
c.cfgContainer.parsers[typ] = p
|
|
}
|
|
|
|
func (c *cfg) PublicKey() []byte {
|
|
return nodeKeyFromNetmap(c)
|
|
}
|
|
|
|
func (c *cfg) IsLocalKey(key []byte) bool {
|
|
return bytes.Equal(key, c.PublicKey())
|
|
}
|
|
|
|
func (c *cfg) IterateAddresses(f func(string) bool) {
|
|
c.iterateNetworkAddresses(f)
|
|
}
|
|
|
|
func (c *cfg) NumberOfAddresses() int {
|
|
return c.addressNum()
|
|
}
|
|
|
|
func (c *cfg) ExternalAddresses() []string {
|
|
return c.cfgNodeInfo.localInfo.ExternalAddresses()
|
|
}
|
|
|
|
// implements interface required by container service provided by morph executor.
|
|
type morphContainerReader struct {
|
|
eacl containerCore.EACLSource
|
|
|
|
src containerCore.Source
|
|
|
|
lister interface {
|
|
ContainersOf(*user.ID) ([]cid.ID, error)
|
|
}
|
|
}
|
|
|
|
func (x *morphContainerReader) Get(id cid.ID) (*containerCore.Container, error) {
|
|
return x.src.Get(id)
|
|
}
|
|
|
|
func (x *morphContainerReader) DeletionInfo(id cid.ID) (*containerCore.DelInfo, error) {
|
|
return x.src.DeletionInfo(id)
|
|
}
|
|
|
|
func (x *morphContainerReader) GetEACL(id cid.ID) (*containerCore.EACL, error) {
|
|
return x.eacl.GetEACL(id)
|
|
}
|
|
|
|
func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
|
|
return x.lister.ContainersOf(id)
|
|
}
|
|
|
|
type morphContainerWriter struct {
|
|
neoClient *cntClient.Client
|
|
|
|
cacheEnabled bool
|
|
eacls ttlEACLStorage
|
|
}
|
|
|
|
func (m morphContainerWriter) Put(cnr containerCore.Container) (*cid.ID, error) {
|
|
return cntClient.Put(m.neoClient, cnr)
|
|
}
|
|
|
|
func (m morphContainerWriter) Delete(witness containerCore.RemovalWitness) error {
|
|
return cntClient.Delete(m.neoClient, witness)
|
|
}
|
|
|
|
func (m morphContainerWriter) PutEACL(eaclInfo containerCore.EACL) error {
|
|
err := cntClient.PutEACL(m.neoClient, eaclInfo)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if m.cacheEnabled {
|
|
id, _ := eaclInfo.Value.CID()
|
|
m.eacls.InvalidateEACL(id)
|
|
}
|
|
|
|
return nil
|
|
}
|