* Added new method for listing containers to container service. It opens a stream and sends containers in batches. * Added TransportSplitter wrapper around ExecutionService to split the container ID list read from the contract into parts that are smaller than the gRPC max message size. Batch size can be changed in the node configuration file (as in the example config file). * Changed `container list` implementation in cli: now ListStream is called by default. Old List is called only if ListStream is not implemented. * Changed `internalclient.ListContainersPrm`.`Account` to `OwnerID` since `client.PrmContainerList`.`Account` was renamed to `OwnerID` in sdk. Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
250 lines
8.9 KiB
Go
250 lines
8.9 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"net"
|
|
|
|
containerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/container"
|
|
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
|
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
|
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
|
|
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
|
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
|
|
containerTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/container/grpc"
|
|
containerService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container"
|
|
containerMorph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph"
|
|
containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container/grpc"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
func initContainerService(_ context.Context, c *cfg) {
|
|
// container wrapper that tries to invoke notary
|
|
// requests if chain is configured so
|
|
wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0)
|
|
fatalOnErr(err)
|
|
|
|
c.shared.cnrClient = wrap
|
|
|
|
cnrSrc := cntClient.AsContainerSource(wrap)
|
|
|
|
cnrRdr, cnrWrt := configureEACLAndContainerSources(c, wrap, cnrSrc)
|
|
|
|
var frostfsIDSubjectProvider frostfsidcore.SubjectProvider
|
|
frostfsIDSubjectProvider, err = frostfsid.NewFromMorph(c.cfgMorph.client, c.cfgFrostfsID.scriptHash, 0)
|
|
fatalOnErr(err)
|
|
|
|
cacheSize := morphconfig.FrostfsIDCacheSize(c.appCfg)
|
|
if cacheSize > 0 {
|
|
frostfsIDSubjectProvider = newMorphFrostfsIDCache(frostfsIDSubjectProvider, int(cacheSize), c.cfgMorph.cacheTTL, metrics.NewCacheMetrics("frostfs_id"))
|
|
}
|
|
|
|
c.shared.frostfsidClient = frostfsIDSubjectProvider
|
|
c.cfgContainer.containerBatchSize = containerconfig.ContainerBatchSize(c.appCfg)
|
|
|
|
defaultChainRouter := engine.NewDefaultChainRouterWithLocalOverrides(
|
|
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage(),
|
|
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage(),
|
|
)
|
|
service := containerService.NewSignService(
|
|
&c.key.PrivateKey,
|
|
containerService.NewAPEServer(defaultChainRouter, cnrRdr,
|
|
newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient,
|
|
containerService.NewSplitterService(
|
|
c.cfgContainer.containerBatchSize, c.respSvc,
|
|
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc)),
|
|
),
|
|
)
|
|
service = containerService.NewAuditService(service, c.log, c.audit)
|
|
server := containerTransportGRPC.New(service)
|
|
|
|
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
|
containerGRPC.RegisterContainerServiceServer(s, server)
|
|
|
|
// TODO(@aarifullin): #1487 remove the dual service support.
|
|
s.RegisterService(frostFSServiceDesc(containerGRPC.ContainerService_ServiceDesc), server)
|
|
})
|
|
|
|
c.cfgObject.cfgLocalStorage.localStorage.SetContainerSource(cnrRdr)
|
|
}
|
|
|
|
func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {
|
|
cnrRdr := new(morphContainerReader)
|
|
|
|
cnrWrt := &morphContainerWriter{
|
|
neoClient: client,
|
|
}
|
|
|
|
if c.cfgMorph.cacheTTL <= 0 {
|
|
c.cfgObject.cnrSource = cnrSrc
|
|
cnrRdr.src = cnrSrc
|
|
cnrRdr.lister = client
|
|
} else {
|
|
// use RPC node as source of Container contract items (with caching)
|
|
c.cfgObject.cnrSource = cnrSrc
|
|
if c.cfgMorph.containerCacheSize > 0 {
|
|
containerCache := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL, c.cfgMorph.containerCacheSize)
|
|
|
|
subscribeToContainerCreation(c, func(ctx context.Context, e event.Event) {
|
|
ev := e.(containerEvent.PutSuccess)
|
|
|
|
// read owner of the created container in order to update the reading cache.
|
|
// TODO: use owner directly from the event after neofs-contract#256 will become resolved
|
|
// but don't forget about the profit of reading the new container and caching it:
|
|
// creation success are most commonly tracked by polling GET op.
|
|
cnr, err := cnrSrc.Get(ev.ID)
|
|
if err == nil {
|
|
containerCache.containerCache.set(ev.ID, cnr, nil)
|
|
} else {
|
|
// unlike removal, we expect successful receive of the container
|
|
// after successful creation, so logging can be useful
|
|
c.log.Error(ctx, logs.FrostFSNodeReadNewlyCreatedContainerAfterTheNotification,
|
|
zap.Stringer("id", ev.ID),
|
|
zap.Error(err),
|
|
)
|
|
}
|
|
|
|
c.log.Debug(ctx, logs.FrostFSNodeContainerCreationEventsReceipt,
|
|
zap.Stringer("id", ev.ID),
|
|
)
|
|
})
|
|
|
|
subscribeToContainerRemoval(c, func(ctx context.Context, e event.Event) {
|
|
ev := e.(containerEvent.DeleteSuccess)
|
|
containerCache.handleRemoval(ev.ID)
|
|
c.log.Debug(ctx, logs.FrostFSNodeContainerRemovalEventsReceipt,
|
|
zap.Stringer("id", ev.ID),
|
|
)
|
|
})
|
|
c.cfgObject.cnrSource = containerCache
|
|
}
|
|
|
|
cnrRdr.lister = client
|
|
cnrRdr.src = c.cfgObject.cnrSource
|
|
}
|
|
|
|
return cnrRdr, cnrWrt
|
|
}
|
|
|
|
// addContainerNotificationHandler adds handler that will be executed synchronously.
|
|
func addContainerNotificationHandler(c *cfg, sTyp string, h event.Handler) {
|
|
typ := event.TypeFromString(sTyp)
|
|
|
|
if c.cfgContainer.subscribers == nil {
|
|
c.cfgContainer.subscribers = make(map[event.Type][]event.Handler, 1)
|
|
}
|
|
|
|
c.cfgContainer.subscribers[typ] = append(c.cfgContainer.subscribers[typ], h)
|
|
}
|
|
|
|
// addContainerAsyncNotificationHandler adds handler that will be executed asynchronously via container workerPool.
|
|
func addContainerAsyncNotificationHandler(c *cfg, sTyp string, h event.Handler) {
|
|
addContainerNotificationHandler(
|
|
c,
|
|
sTyp,
|
|
event.WorkerPoolHandler(
|
|
c.cfgContainer.workerPool,
|
|
h,
|
|
c.log,
|
|
),
|
|
)
|
|
}
|
|
|
|
// mRegisteredParsersContainer is the set of names of Container contract
// notification events whose parsers have already been registered.
// MUST NOT be used concurrently.
var mRegisteredParsersContainer = map[string]struct{}{}
|
|
|
|
// registers event parser by name once. MUST NOT be called concurrently.
|
|
func registerEventParserOnceContainer(c *cfg, name string, p event.NotificationParser) {
|
|
if _, ok := mRegisteredParsersContainer[name]; !ok {
|
|
setContainerNotificationParser(c, name, p)
|
|
mRegisteredParsersContainer[name] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// subscribes to successful container creation. Provided handler is called asynchronously
|
|
// on corresponding routine pool. MUST NOT be called concurrently with itself and other
|
|
// similar functions.
|
|
func subscribeToContainerCreation(c *cfg, h event.Handler) {
|
|
const eventNameContainerCreated = "PutSuccess"
|
|
registerEventParserOnceContainer(c, eventNameContainerCreated, containerEvent.ParsePutSuccess)
|
|
addContainerAsyncNotificationHandler(c, eventNameContainerCreated, h)
|
|
}
|
|
|
|
// like subscribeToContainerCreation but for removal.
|
|
func subscribeToContainerRemoval(c *cfg, h event.Handler) {
|
|
const eventNameContainerRemoved = "DeleteSuccess"
|
|
registerEventParserOnceContainer(c, eventNameContainerRemoved, containerEvent.ParseDeleteSuccess)
|
|
addContainerAsyncNotificationHandler(c, eventNameContainerRemoved, h)
|
|
}
|
|
|
|
func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationParser) {
|
|
typ := event.TypeFromString(sTyp)
|
|
|
|
if c.cfgContainer.parsers == nil {
|
|
c.cfgContainer.parsers = make(map[event.Type]event.NotificationParser, 1)
|
|
}
|
|
|
|
c.cfgContainer.parsers[typ] = p
|
|
}
|
|
|
|
func (c *cfg) PublicKey() []byte {
|
|
return nodeKeyFromNetmap(c)
|
|
}
|
|
|
|
func (c *cfg) IsLocalKey(key []byte) bool {
|
|
return bytes.Equal(key, c.PublicKey())
|
|
}
|
|
|
|
func (c *cfg) IterateAddresses(f func(string) bool) {
|
|
c.iterateNetworkAddresses(f)
|
|
}
|
|
|
|
func (c *cfg) NumberOfAddresses() int {
|
|
return c.addressNum()
|
|
}
|
|
|
|
func (c *cfg) ExternalAddresses() []string {
|
|
return c.cfgNodeInfo.localInfo.ExternalAddresses()
|
|
}
|
|
|
|
// implements interface required by container service provided by morph executor.
|
|
type morphContainerReader struct {
|
|
src containerCore.Source
|
|
|
|
lister interface {
|
|
ContainersOf(*user.ID) ([]cid.ID, error)
|
|
}
|
|
}
|
|
|
|
func (x *morphContainerReader) Get(id cid.ID) (*containerCore.Container, error) {
|
|
return x.src.Get(id)
|
|
}
|
|
|
|
func (x *morphContainerReader) DeletionInfo(id cid.ID) (*containerCore.DelInfo, error) {
|
|
return x.src.DeletionInfo(id)
|
|
}
|
|
|
|
func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
|
|
return x.lister.ContainersOf(id)
|
|
}
|
|
|
|
type morphContainerWriter struct {
|
|
neoClient *cntClient.Client
|
|
}
|
|
|
|
func (m morphContainerWriter) Put(ctx context.Context, cnr containerCore.Container) (*cid.ID, error) {
|
|
return cntClient.Put(ctx, m.neoClient, cnr)
|
|
}
|
|
|
|
func (m morphContainerWriter) Delete(ctx context.Context, witness containerCore.RemovalWitness) error {
|
|
return cntClient.Delete(ctx, m.neoClient, witness)
|
|
}
|