forked from TrueCloudLab/frostfs-node
Evgenii Stratonikov
a6c9a337cd
ContainersOf() is better in almost every aspect, besides creating a session when the containers number is between 1024 and 2048 (prefetch script does limited unwrapping). Making List() private helps to ensure it is no longer used and can be safely removed in future. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
260 lines
8.2 KiB
Go
260 lines
8.2 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"net"
|
|
|
|
containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
|
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
|
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
|
|
containerTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/container/grpc"
|
|
containerService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container"
|
|
containerMorph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
func initContainerService(_ context.Context, c *cfg) {
|
|
// container wrapper that tries to invoke notary
|
|
// requests if chain is configured so
|
|
wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary())
|
|
fatalOnErr(err)
|
|
|
|
c.shared.cnrClient = wrap
|
|
|
|
cnrSrc := cntClient.AsContainerSource(wrap)
|
|
|
|
cnrRdr, cnrWrt := configureEACLAndContainerSources(c, wrap, cnrSrc)
|
|
|
|
frostFSIDClient, err := frostfsid.NewFromMorph(c.cfgMorph.client, c.cfgFrostfsID.scriptHash, 0)
|
|
fatalOnErr(err)
|
|
|
|
server := containerTransportGRPC.New(
|
|
containerService.NewSignService(
|
|
&c.key.PrivateKey,
|
|
containerService.NewAPEServer(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine, cnrRdr,
|
|
newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, frostFSIDClient,
|
|
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
|
|
),
|
|
),
|
|
)
|
|
|
|
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
|
containerGRPC.RegisterContainerServiceServer(s, server)
|
|
})
|
|
|
|
c.cfgObject.cfgLocalStorage.localStorage.SetContainerSource(cnrRdr)
|
|
}
|
|
|
|
func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {
|
|
eACLFetcher := &morphEACLFetcher{
|
|
w: client,
|
|
}
|
|
|
|
cnrRdr := new(morphContainerReader)
|
|
|
|
cnrWrt := &morphContainerWriter{
|
|
neoClient: client,
|
|
}
|
|
|
|
if c.cfgMorph.cacheTTL <= 0 {
|
|
c.cfgObject.eaclSource = eACLFetcher
|
|
cnrRdr.eacl = eACLFetcher
|
|
c.cfgObject.cnrSource = cnrSrc
|
|
cnrRdr.src = cnrSrc
|
|
cnrRdr.lister = client
|
|
} else {
|
|
// use RPC node as source of Container contract items (with caching)
|
|
cachedContainerStorage := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL)
|
|
cachedEACLStorage := newCachedEACLStorage(eACLFetcher, c.cfgMorph.cacheTTL)
|
|
|
|
subscribeToContainerCreation(c, func(e event.Event) {
|
|
ev := e.(containerEvent.PutSuccess)
|
|
|
|
// read owner of the created container in order to update the reading cache.
|
|
// TODO: use owner directly from the event after neofs-contract#256 will become resolved
|
|
// but don't forget about the profit of reading the new container and caching it:
|
|
// creation success are most commonly tracked by polling GET op.
|
|
cnr, err := cnrSrc.Get(ev.ID)
|
|
if err == nil {
|
|
cachedContainerStorage.containerCache.set(ev.ID, cnr, nil)
|
|
} else {
|
|
// unlike removal, we expect successful receive of the container
|
|
// after successful creation, so logging can be useful
|
|
c.log.Error(logs.FrostFSNodeReadNewlyCreatedContainerAfterTheNotification,
|
|
zap.Stringer("id", ev.ID),
|
|
zap.Error(err),
|
|
)
|
|
}
|
|
|
|
c.log.Debug(logs.FrostFSNodeContainerCreationEventsReceipt,
|
|
zap.Stringer("id", ev.ID),
|
|
)
|
|
})
|
|
|
|
subscribeToContainerRemoval(c, func(e event.Event) {
|
|
ev := e.(containerEvent.DeleteSuccess)
|
|
cachedContainerStorage.handleRemoval(ev.ID)
|
|
c.log.Debug(logs.FrostFSNodeContainerRemovalEventsReceipt,
|
|
zap.Stringer("id", ev.ID),
|
|
)
|
|
})
|
|
|
|
c.cfgObject.eaclSource = cachedEACLStorage
|
|
c.cfgObject.cnrSource = cachedContainerStorage
|
|
|
|
cnrRdr.lister = client
|
|
cnrRdr.eacl = c.cfgObject.eaclSource
|
|
cnrRdr.src = c.cfgObject.cnrSource
|
|
|
|
cnrWrt.cacheEnabled = true
|
|
cnrWrt.eacls = cachedEACLStorage
|
|
}
|
|
|
|
return cnrRdr, cnrWrt
|
|
}
|
|
|
|
// addContainerNotificationHandler adds handler that will be executed synchronously.
|
|
func addContainerNotificationHandler(c *cfg, sTyp string, h event.Handler) {
|
|
typ := event.TypeFromString(sTyp)
|
|
|
|
if c.cfgContainer.subscribers == nil {
|
|
c.cfgContainer.subscribers = make(map[event.Type][]event.Handler, 1)
|
|
}
|
|
|
|
c.cfgContainer.subscribers[typ] = append(c.cfgContainer.subscribers[typ], h)
|
|
}
|
|
|
|
// addContainerAsyncNotificationHandler adds handler that will be executed asynchronously via container workerPool.
|
|
func addContainerAsyncNotificationHandler(c *cfg, sTyp string, h event.Handler) {
|
|
addContainerNotificationHandler(
|
|
c,
|
|
sTyp,
|
|
event.WorkerPoolHandler(
|
|
c.cfgContainer.workerPool,
|
|
h,
|
|
c.log,
|
|
),
|
|
)
|
|
}
|
|
|
|
// stores already registered parsers of the notification events thrown by Container contract.
|
|
// MUST NOT be used concurrently.
|
|
var mRegisteredParsersContainer = make(map[string]struct{})
|
|
|
|
// registers event parser by name once. MUST NOT be called concurrently.
|
|
func registerEventParserOnceContainer(c *cfg, name string, p event.NotificationParser) {
|
|
if _, ok := mRegisteredParsersContainer[name]; !ok {
|
|
setContainerNotificationParser(c, name, p)
|
|
mRegisteredParsersContainer[name] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// subscribes to successful container creation. Provided handler is called asynchronously
|
|
// on corresponding routine pool. MUST NOT be called concurrently with itself and other
|
|
// similar functions.
|
|
func subscribeToContainerCreation(c *cfg, h event.Handler) {
|
|
const eventNameContainerCreated = "PutSuccess"
|
|
registerEventParserOnceContainer(c, eventNameContainerCreated, containerEvent.ParsePutSuccess)
|
|
addContainerAsyncNotificationHandler(c, eventNameContainerCreated, h)
|
|
}
|
|
|
|
// like subscribeToContainerCreation but for removal.
|
|
func subscribeToContainerRemoval(c *cfg, h event.Handler) {
|
|
const eventNameContainerRemoved = "DeleteSuccess"
|
|
registerEventParserOnceContainer(c, eventNameContainerRemoved, containerEvent.ParseDeleteSuccess)
|
|
addContainerAsyncNotificationHandler(c, eventNameContainerRemoved, h)
|
|
}
|
|
|
|
func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationParser) {
|
|
typ := event.TypeFromString(sTyp)
|
|
|
|
if c.cfgContainer.parsers == nil {
|
|
c.cfgContainer.parsers = make(map[event.Type]event.NotificationParser, 1)
|
|
}
|
|
|
|
c.cfgContainer.parsers[typ] = p
|
|
}
|
|
|
|
func (c *cfg) PublicKey() []byte {
|
|
return nodeKeyFromNetmap(c)
|
|
}
|
|
|
|
func (c *cfg) IsLocalKey(key []byte) bool {
|
|
return bytes.Equal(key, c.PublicKey())
|
|
}
|
|
|
|
func (c *cfg) IterateAddresses(f func(string) bool) {
|
|
c.iterateNetworkAddresses(f)
|
|
}
|
|
|
|
func (c *cfg) NumberOfAddresses() int {
|
|
return c.addressNum()
|
|
}
|
|
|
|
func (c *cfg) ExternalAddresses() []string {
|
|
return c.cfgNodeInfo.localInfo.ExternalAddresses()
|
|
}
|
|
|
|
// implements interface required by container service provided by morph executor.
|
|
type morphContainerReader struct {
|
|
eacl containerCore.EACLSource
|
|
|
|
src containerCore.Source
|
|
|
|
lister interface {
|
|
ContainersOf(*user.ID) ([]cid.ID, error)
|
|
}
|
|
}
|
|
|
|
func (x *morphContainerReader) Get(id cid.ID) (*containerCore.Container, error) {
|
|
return x.src.Get(id)
|
|
}
|
|
|
|
func (x *morphContainerReader) DeletionInfo(id cid.ID) (*containerCore.DelInfo, error) {
|
|
return x.src.DeletionInfo(id)
|
|
}
|
|
|
|
func (x *morphContainerReader) GetEACL(id cid.ID) (*containerCore.EACL, error) {
|
|
return x.eacl.GetEACL(id)
|
|
}
|
|
|
|
func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
|
|
return x.lister.ContainersOf(id)
|
|
}
|
|
|
|
type morphContainerWriter struct {
|
|
neoClient *cntClient.Client
|
|
|
|
cacheEnabled bool
|
|
eacls ttlEACLStorage
|
|
}
|
|
|
|
func (m morphContainerWriter) Put(cnr containerCore.Container) (*cid.ID, error) {
|
|
return cntClient.Put(m.neoClient, cnr)
|
|
}
|
|
|
|
func (m morphContainerWriter) Delete(witness containerCore.RemovalWitness) error {
|
|
return cntClient.Delete(m.neoClient, witness)
|
|
}
|
|
|
|
func (m morphContainerWriter) PutEACL(eaclInfo containerCore.EACL) error {
|
|
err := cntClient.PutEACL(m.neoClient, eaclInfo)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if m.cacheEnabled {
|
|
id, _ := eaclInfo.Value.CID()
|
|
m.eacls.InvalidateEACL(id)
|
|
}
|
|
|
|
return nil
|
|
}
|