package main import ( "bytes" "context" "net" containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container" containerTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/container/grpc" containerService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container" containerMorph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "go.uber.org/zap" "google.golang.org/grpc" ) func initContainerService(_ context.Context, c *cfg) { // container wrapper that tries to invoke notary // requests if chain is configured so wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary()) fatalOnErr(err) c.shared.cnrClient = wrap cnrSrc := cntClient.AsContainerSource(wrap) cnrRdr, cnrWrt := configureEACLAndContainerSources(c, wrap, cnrSrc) frostFSIDClient, err := frostfsid.NewFromMorph(c.cfgMorph.client, c.cfgFrostfsID.scriptHash, 0) fatalOnErr(err) server := containerTransportGRPC.New( containerService.NewSignService( &c.key.PrivateKey, containerService.NewAPEServer(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine, cnrRdr, newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, frostFSIDClient, containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc), ), ), ) c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { containerGRPC.RegisterContainerServiceServer(s, server) }) c.cfgObject.cfgLocalStorage.localStorage.SetContainerSource(cnrRdr) } func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) { eACLFetcher := &morphEACLFetcher{ w: client, } cnrRdr := new(morphContainerReader) cnrWrt := &morphContainerWriter{ neoClient: client, } if c.cfgMorph.cacheTTL <= 0 { c.cfgObject.eaclSource = eACLFetcher cnrRdr.eacl = eACLFetcher c.cfgObject.cnrSource = cnrSrc cnrRdr.src = cnrSrc cnrRdr.lister = client } else { // use RPC node as source of Container contract items (with caching) cachedContainerStorage := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL) cachedEACLStorage := newCachedEACLStorage(eACLFetcher, c.cfgMorph.cacheTTL) subscribeToContainerCreation(c, func(e event.Event) { ev := e.(containerEvent.PutSuccess) // read owner of the created container in order to update the reading cache. // TODO: use owner directly from the event after neofs-contract#256 will become resolved // but don't forget about the profit of reading the new container and caching it: // creation success are most commonly tracked by polling GET op. cnr, err := cnrSrc.Get(ev.ID) if err == nil { cachedContainerStorage.containerCache.set(ev.ID, cnr, nil) } else { // unlike removal, we expect successful receive of the container // after successful creation, so logging can be useful c.log.Error(logs.FrostFSNodeReadNewlyCreatedContainerAfterTheNotification, zap.Stringer("id", ev.ID), zap.Error(err), ) } c.log.Debug(logs.FrostFSNodeContainerCreationEventsReceipt, zap.Stringer("id", ev.ID), ) }) subscribeToContainerRemoval(c, func(e event.Event) { ev := e.(containerEvent.DeleteSuccess) cachedContainerStorage.handleRemoval(ev.ID) c.log.Debug(logs.FrostFSNodeContainerRemovalEventsReceipt, zap.Stringer("id", ev.ID), ) }) c.cfgObject.eaclSource = cachedEACLStorage c.cfgObject.cnrSource = cachedContainerStorage cnrRdr.lister = client cnrRdr.eacl = c.cfgObject.eaclSource cnrRdr.src = c.cfgObject.cnrSource cnrWrt.cacheEnabled = true cnrWrt.eacls = cachedEACLStorage } return cnrRdr, cnrWrt } // addContainerNotificationHandler adds handler that will be executed synchronously. func addContainerNotificationHandler(c *cfg, sTyp string, h event.Handler) { typ := event.TypeFromString(sTyp) if c.cfgContainer.subscribers == nil { c.cfgContainer.subscribers = make(map[event.Type][]event.Handler, 1) } c.cfgContainer.subscribers[typ] = append(c.cfgContainer.subscribers[typ], h) } // addContainerAsyncNotificationHandler adds handler that will be executed asynchronously via container workerPool. func addContainerAsyncNotificationHandler(c *cfg, sTyp string, h event.Handler) { addContainerNotificationHandler( c, sTyp, event.WorkerPoolHandler( c.cfgContainer.workerPool, h, c.log, ), ) } // stores already registered parsers of the notification events thrown by Container contract. // MUST NOT be used concurrently. var mRegisteredParsersContainer = make(map[string]struct{}) // registers event parser by name once. MUST NOT be called concurrently. func registerEventParserOnceContainer(c *cfg, name string, p event.NotificationParser) { if _, ok := mRegisteredParsersContainer[name]; !ok { setContainerNotificationParser(c, name, p) mRegisteredParsersContainer[name] = struct{}{} } } // subscribes to successful container creation. Provided handler is called asynchronously // on corresponding routine pool. MUST NOT be called concurrently with itself and other // similar functions. func subscribeToContainerCreation(c *cfg, h event.Handler) { const eventNameContainerCreated = "PutSuccess" registerEventParserOnceContainer(c, eventNameContainerCreated, containerEvent.ParsePutSuccess) addContainerAsyncNotificationHandler(c, eventNameContainerCreated, h) } // like subscribeToContainerCreation but for removal. func subscribeToContainerRemoval(c *cfg, h event.Handler) { const eventNameContainerRemoved = "DeleteSuccess" registerEventParserOnceContainer(c, eventNameContainerRemoved, containerEvent.ParseDeleteSuccess) addContainerAsyncNotificationHandler(c, eventNameContainerRemoved, h) } func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationParser) { typ := event.TypeFromString(sTyp) if c.cfgContainer.parsers == nil { c.cfgContainer.parsers = make(map[event.Type]event.NotificationParser, 1) } c.cfgContainer.parsers[typ] = p } func (c *cfg) PublicKey() []byte { return nodeKeyFromNetmap(c) } func (c *cfg) IsLocalKey(key []byte) bool { return bytes.Equal(key, c.PublicKey()) } func (c *cfg) IterateAddresses(f func(string) bool) { c.iterateNetworkAddresses(f) } func (c *cfg) NumberOfAddresses() int { return c.addressNum() } func (c *cfg) ExternalAddresses() []string { return c.cfgNodeInfo.localInfo.ExternalAddresses() } // implements interface required by container service provided by morph executor. type morphContainerReader struct { eacl containerCore.EACLSource src containerCore.Source lister interface { ContainersOf(*user.ID) ([]cid.ID, error) } } func (x *morphContainerReader) Get(id cid.ID) (*containerCore.Container, error) { return x.src.Get(id) } func (x *morphContainerReader) DeletionInfo(id cid.ID) (*containerCore.DelInfo, error) { return x.src.DeletionInfo(id) } func (x *morphContainerReader) GetEACL(id cid.ID) (*containerCore.EACL, error) { return x.eacl.GetEACL(id) } func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) { return x.lister.ContainersOf(id) } type morphContainerWriter struct { neoClient *cntClient.Client cacheEnabled bool eacls ttlEACLStorage } func (m morphContainerWriter) Put(cnr containerCore.Container) (*cid.ID, error) { return cntClient.Put(m.neoClient, cnr) } func (m morphContainerWriter) Delete(witness containerCore.RemovalWitness) error { return cntClient.Delete(m.neoClient, witness) } func (m morphContainerWriter) PutEACL(eaclInfo containerCore.EACL) error { err := cntClient.PutEACL(m.neoClient, eaclInfo) if err != nil { return err } if m.cacheEnabled { id, _ := eaclInfo.Value.CID() m.eacls.InvalidateEACL(id) } return nil }