forked from TrueCloudLab/frostfs-node
Move to frostfs-node
Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
This commit is contained in:
parent
42554a9298
commit
923f84722a
934 changed files with 3470 additions and 3451 deletions
684
cmd/frostfs-node/container.go
Normal file
684
cmd/frostfs-node/container.go
Normal file
|
@ -0,0 +1,684 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
containerV2 "github.com/TrueCloudLab/frostfs-api-go/v2/container"
|
||||
containerGRPC "github.com/TrueCloudLab/frostfs-api-go/v2/container/grpc"
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
containerCore "github.com/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
netmapCore "github.com/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
cntClient "github.com/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
containerEvent "github.com/TrueCloudLab/frostfs-node/pkg/morph/event/container"
|
||||
containerTransportGRPC "github.com/TrueCloudLab/frostfs-node/pkg/network/transport/container/grpc"
|
||||
containerService "github.com/TrueCloudLab/frostfs-node/pkg/services/container"
|
||||
loadcontroller "github.com/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
|
||||
loadroute "github.com/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/route"
|
||||
placementrouter "github.com/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/route/placement"
|
||||
loadstorage "github.com/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/storage"
|
||||
containerMorph "github.com/TrueCloudLab/frostfs-node/pkg/services/container/morph"
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
apiClient "github.com/TrueCloudLab/frostfs-sdk-go/client"
|
||||
containerSDK "github.com/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"github.com/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"github.com/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
startEstimationNotifyEvent = "StartEstimation"
|
||||
stopEstimationNotifyEvent = "StopEstimation"
|
||||
)
|
||||
|
||||
func initContainerService(c *cfg) {
|
||||
// container wrapper that tries to invoke notary
|
||||
// requests if chain is configured so
|
||||
wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary())
|
||||
fatalOnErr(err)
|
||||
|
||||
c.shared.cnrClient = wrap
|
||||
|
||||
// container wrapper that always sends non-notary
|
||||
// requests
|
||||
wrapperNoNotary, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0)
|
||||
fatalOnErr(err)
|
||||
|
||||
cnrSrc := cntClient.AsContainerSource(wrap)
|
||||
|
||||
eACLFetcher := &morphEACLFetcher{
|
||||
w: wrap,
|
||||
}
|
||||
|
||||
cnrRdr := new(morphContainerReader)
|
||||
|
||||
cnrWrt := &morphContainerWriter{
|
||||
neoClient: wrap,
|
||||
}
|
||||
|
||||
if c.cfgMorph.cacheTTL <= 0 {
|
||||
c.cfgObject.eaclSource = eACLFetcher
|
||||
cnrRdr.eacl = eACLFetcher
|
||||
c.cfgObject.cnrSource = cnrSrc
|
||||
cnrRdr.get = cnrSrc
|
||||
cnrRdr.lister = wrap
|
||||
} else {
|
||||
// use RPC node as source of Container contract items (with caching)
|
||||
cachedContainerStorage := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL)
|
||||
cachedEACLStorage := newCachedEACLStorage(eACLFetcher, c.cfgMorph.cacheTTL)
|
||||
cachedContainerLister := newCachedContainerLister(wrap, c.cfgMorph.cacheTTL)
|
||||
|
||||
subscribeToContainerCreation(c, func(e event.Event) {
|
||||
ev := e.(containerEvent.PutSuccess)
|
||||
|
||||
// read owner of the created container in order to update the reading cache.
|
||||
// TODO: use owner directly from the event after neofs-contract#256 will become resolved
|
||||
// but don't forget about the profit of reading the new container and caching it:
|
||||
// creation success are most commonly tracked by polling GET op.
|
||||
cnr, err := cachedContainerStorage.Get(ev.ID)
|
||||
if err == nil {
|
||||
cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
|
||||
} else {
|
||||
// unlike removal, we expect successful receive of the container
|
||||
// after successful creation, so logging can be useful
|
||||
c.log.Error("read newly created container after the notification",
|
||||
zap.Stringer("id", ev.ID),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
|
||||
c.log.Debug("container creation event's receipt",
|
||||
zap.Stringer("id", ev.ID),
|
||||
)
|
||||
})
|
||||
|
||||
subscribeToContainerRemoval(c, func(e event.Event) {
|
||||
ev := e.(containerEvent.DeleteSuccess)
|
||||
|
||||
// read owner of the removed container in order to update the listing cache.
|
||||
// It's strange to read already removed container, but we can successfully hit
|
||||
// the cache.
|
||||
// TODO: use owner directly from the event after neofs-contract#256 will become resolved
|
||||
cnr, err := cachedContainerStorage.Get(ev.ID)
|
||||
if err == nil {
|
||||
cachedContainerLister.update(cnr.Value.Owner(), ev.ID, false)
|
||||
}
|
||||
|
||||
cachedContainerStorage.handleRemoval(ev.ID)
|
||||
|
||||
c.log.Debug("container removal event's receipt",
|
||||
zap.Stringer("id", ev.ID),
|
||||
)
|
||||
})
|
||||
|
||||
c.cfgObject.eaclSource = cachedEACLStorage
|
||||
c.cfgObject.cnrSource = cachedContainerStorage
|
||||
|
||||
cnrRdr.lister = cachedContainerLister
|
||||
cnrRdr.eacl = c.cfgObject.eaclSource
|
||||
cnrRdr.get = c.cfgObject.cnrSource
|
||||
|
||||
cnrWrt.cacheEnabled = true
|
||||
cnrWrt.lists = cachedContainerLister
|
||||
cnrWrt.eacls = cachedEACLStorage
|
||||
}
|
||||
|
||||
localMetrics := &localStorageLoad{
|
||||
log: c.log,
|
||||
engine: c.cfgObject.cfgLocalStorage.localStorage,
|
||||
}
|
||||
|
||||
pubKey := c.key.PublicKey().Bytes()
|
||||
|
||||
resultWriter := &morphLoadWriter{
|
||||
log: c.log,
|
||||
cnrMorphClient: wrapperNoNotary,
|
||||
key: pubKey,
|
||||
}
|
||||
|
||||
loadAccumulator := loadstorage.New(loadstorage.Prm{})
|
||||
|
||||
loadPlacementBuilder := &loadPlacementBuilder{
|
||||
log: c.log,
|
||||
nmSrc: c.netMapSource,
|
||||
cnrSrc: cnrSrc,
|
||||
}
|
||||
|
||||
routeBuilder := placementrouter.New(placementrouter.Prm{
|
||||
PlacementBuilder: loadPlacementBuilder,
|
||||
})
|
||||
|
||||
loadRouter := loadroute.New(
|
||||
loadroute.Prm{
|
||||
LocalServerInfo: c,
|
||||
RemoteWriterProvider: &remoteLoadAnnounceProvider{
|
||||
key: &c.key.PrivateKey,
|
||||
netmapKeys: c,
|
||||
clientCache: c.bgClientCache,
|
||||
deadEndProvider: loadcontroller.SimpleWriterProvider(loadAccumulator),
|
||||
},
|
||||
Builder: routeBuilder,
|
||||
},
|
||||
loadroute.WithLogger(c.log),
|
||||
)
|
||||
|
||||
ctrl := loadcontroller.New(
|
||||
loadcontroller.Prm{
|
||||
LocalMetrics: loadcontroller.SimpleIteratorProvider(localMetrics),
|
||||
AnnouncementAccumulator: loadcontroller.SimpleIteratorProvider(loadAccumulator),
|
||||
LocalAnnouncementTarget: loadRouter,
|
||||
ResultReceiver: loadcontroller.SimpleWriterProvider(resultWriter),
|
||||
},
|
||||
loadcontroller.WithLogger(c.log),
|
||||
)
|
||||
|
||||
setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation)
|
||||
addContainerAsyncNotificationHandler(c, startEstimationNotifyEvent, func(ev event.Event) {
|
||||
ctrl.Start(loadcontroller.StartPrm{
|
||||
Epoch: ev.(containerEvent.StartEstimation).Epoch(),
|
||||
})
|
||||
})
|
||||
|
||||
setContainerNotificationParser(c, stopEstimationNotifyEvent, containerEvent.ParseStopEstimation)
|
||||
addContainerAsyncNotificationHandler(c, stopEstimationNotifyEvent, func(ev event.Event) {
|
||||
ctrl.Stop(loadcontroller.StopPrm{
|
||||
Epoch: ev.(containerEvent.StopEstimation).Epoch(),
|
||||
})
|
||||
})
|
||||
|
||||
server := containerTransportGRPC.New(
|
||||
containerService.NewSignService(
|
||||
&c.key.PrivateKey,
|
||||
containerService.NewResponseService(
|
||||
&usedSpaceService{
|
||||
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt)),
|
||||
loadWriterProvider: loadRouter,
|
||||
loadPlacementBuilder: loadPlacementBuilder,
|
||||
routeBuilder: routeBuilder,
|
||||
cfg: c,
|
||||
},
|
||||
c.respSvc,
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
for _, srv := range c.cfgGRPC.servers {
|
||||
containerGRPC.RegisterContainerServiceServer(srv, server)
|
||||
}
|
||||
}
|
||||
|
||||
// addContainerNotificationHandler adds handler that will be executed synchronously.
|
||||
func addContainerNotificationHandler(c *cfg, sTyp string, h event.Handler) {
|
||||
typ := event.TypeFromString(sTyp)
|
||||
|
||||
if c.cfgContainer.subscribers == nil {
|
||||
c.cfgContainer.subscribers = make(map[event.Type][]event.Handler, 1)
|
||||
}
|
||||
|
||||
c.cfgContainer.subscribers[typ] = append(c.cfgContainer.subscribers[typ], h)
|
||||
}
|
||||
|
||||
// addContainerAsyncNotificationHandler adds handler that will be executed asynchronously via container workerPool.
|
||||
func addContainerAsyncNotificationHandler(c *cfg, sTyp string, h event.Handler) {
|
||||
addContainerNotificationHandler(
|
||||
c,
|
||||
sTyp,
|
||||
event.WorkerPoolHandler(
|
||||
c.cfgContainer.workerPool,
|
||||
h,
|
||||
c.log,
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
// stores already registered parsers of the notification events thrown by Container contract.
|
||||
// MUST NOT be used concurrently.
|
||||
var mRegisteredParsersContainer = make(map[string]struct{})
|
||||
|
||||
// registers event parser by name once. MUST NOT be called concurrently.
|
||||
func registerEventParserOnceContainer(c *cfg, name string, p event.NotificationParser) {
|
||||
if _, ok := mRegisteredParsersContainer[name]; !ok {
|
||||
setContainerNotificationParser(c, name, p)
|
||||
mRegisteredParsersContainer[name] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// subscribes to successful container creation. Provided handler is called asynchronously
|
||||
// on corresponding routine pool. MUST NOT be called concurrently with itself and other
|
||||
// similar functions.
|
||||
func subscribeToContainerCreation(c *cfg, h event.Handler) {
|
||||
const eventNameContainerCreated = "PutSuccess"
|
||||
registerEventParserOnceContainer(c, eventNameContainerCreated, containerEvent.ParsePutSuccess)
|
||||
addContainerAsyncNotificationHandler(c, eventNameContainerCreated, h)
|
||||
}
|
||||
|
||||
// like subscribeToContainerCreation but for removal.
|
||||
func subscribeToContainerRemoval(c *cfg, h event.Handler) {
|
||||
const eventNameContainerRemoved = "DeleteSuccess"
|
||||
registerEventParserOnceContainer(c, eventNameContainerRemoved, containerEvent.ParseDeleteSuccess)
|
||||
addContainerAsyncNotificationHandler(c, eventNameContainerRemoved, h)
|
||||
}
|
||||
|
||||
func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationParser) {
|
||||
typ := event.TypeFromString(sTyp)
|
||||
|
||||
if c.cfgContainer.parsers == nil {
|
||||
c.cfgContainer.parsers = make(map[event.Type]event.NotificationParser, 1)
|
||||
}
|
||||
|
||||
c.cfgContainer.parsers[typ] = p
|
||||
}
|
||||
|
||||
type morphLoadWriter struct {
|
||||
log *logger.Logger
|
||||
|
||||
cnrMorphClient *cntClient.Client
|
||||
|
||||
key []byte
|
||||
}
|
||||
|
||||
func (w *morphLoadWriter) Put(a containerSDK.SizeEstimation) error {
|
||||
w.log.Debug("save used space announcement in contract",
|
||||
zap.Uint64("epoch", a.Epoch()),
|
||||
zap.Stringer("cid", a.Container()),
|
||||
zap.Uint64("size", a.Value()),
|
||||
)
|
||||
|
||||
prm := cntClient.AnnounceLoadPrm{}
|
||||
|
||||
prm.SetAnnouncement(a)
|
||||
prm.SetReporter(w.key)
|
||||
|
||||
return w.cnrMorphClient.AnnounceLoad(prm)
|
||||
}
|
||||
|
||||
func (*morphLoadWriter) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type nopLoadWriter struct{}
|
||||
|
||||
func (nopLoadWriter) Put(containerSDK.SizeEstimation) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (nopLoadWriter) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type remoteLoadAnnounceProvider struct {
|
||||
key *ecdsa.PrivateKey
|
||||
|
||||
netmapKeys netmapCore.AnnouncedKeys
|
||||
|
||||
clientCache interface {
|
||||
Get(client.NodeInfo) (client.Client, error)
|
||||
}
|
||||
|
||||
deadEndProvider loadcontroller.WriterProvider
|
||||
}
|
||||
|
||||
func (r *remoteLoadAnnounceProvider) InitRemote(srv loadroute.ServerInfo) (loadcontroller.WriterProvider, error) {
|
||||
if srv == nil {
|
||||
return r.deadEndProvider, nil
|
||||
}
|
||||
|
||||
if r.netmapKeys.IsLocalKey(srv.PublicKey()) {
|
||||
// if local => return no-op writer
|
||||
return loadcontroller.SimpleWriterProvider(new(nopLoadWriter)), nil
|
||||
}
|
||||
|
||||
var info client.NodeInfo
|
||||
|
||||
err := client.NodeInfoFromRawNetmapElement(&info, srv)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse client node info: %w", err)
|
||||
}
|
||||
|
||||
c, err := r.clientCache.Get(info)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not initialize API client: %w", err)
|
||||
}
|
||||
|
||||
return &remoteLoadAnnounceWriterProvider{
|
||||
client: c,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type remoteLoadAnnounceWriterProvider struct {
|
||||
client client.Client
|
||||
}
|
||||
|
||||
func (p *remoteLoadAnnounceWriterProvider) InitWriter(ctx context.Context) (loadcontroller.Writer, error) {
|
||||
return &remoteLoadAnnounceWriter{
|
||||
ctx: ctx,
|
||||
client: p.client,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type remoteLoadAnnounceWriter struct {
|
||||
ctx context.Context
|
||||
|
||||
client client.Client
|
||||
|
||||
buf []containerSDK.SizeEstimation
|
||||
}
|
||||
|
||||
func (r *remoteLoadAnnounceWriter) Put(a containerSDK.SizeEstimation) error {
|
||||
r.buf = append(r.buf, a)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *remoteLoadAnnounceWriter) Close() error {
|
||||
var cliPrm apiClient.PrmAnnounceSpace
|
||||
|
||||
cliPrm.SetValues(r.buf)
|
||||
|
||||
_, err := r.client.ContainerAnnounceUsedSpace(r.ctx, cliPrm)
|
||||
return err
|
||||
}
|
||||
|
||||
type loadPlacementBuilder struct {
|
||||
log *logger.Logger
|
||||
|
||||
nmSrc netmapCore.Source
|
||||
|
||||
cnrSrc containerCore.Source
|
||||
}
|
||||
|
||||
func (l *loadPlacementBuilder) BuildPlacement(epoch uint64, cnr cid.ID) ([][]netmap.NodeInfo, error) {
|
||||
cnrNodes, nm, err := l.buildPlacement(epoch, cnr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
const pivotPrefix = "load_announcement_"
|
||||
|
||||
pivot := []byte(
|
||||
pivotPrefix + strconv.FormatUint(epoch, 10),
|
||||
)
|
||||
|
||||
placement, err := nm.PlacementVectors(cnrNodes, pivot)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not build placement vectors: %w", err)
|
||||
}
|
||||
|
||||
return placement, nil
|
||||
}
|
||||
|
||||
func (l *loadPlacementBuilder) buildPlacement(epoch uint64, idCnr cid.ID) ([][]netmap.NodeInfo, *netmap.NetMap, error) {
|
||||
cnr, err := l.cnrSrc.Get(idCnr)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
nm, err := l.nmSrc.GetNetMapByEpoch(epoch)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("could not get network map: %w", err)
|
||||
}
|
||||
|
||||
binCnr := make([]byte, sha256.Size)
|
||||
idCnr.Encode(binCnr)
|
||||
|
||||
cnrNodes, err := nm.ContainerNodes(cnr.Value.PlacementPolicy(), binCnr)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("could not build container nodes: %w", err)
|
||||
}
|
||||
|
||||
return cnrNodes, nm, nil
|
||||
}
|
||||
|
||||
type localStorageLoad struct {
|
||||
log *logger.Logger
|
||||
|
||||
engine *engine.StorageEngine
|
||||
}
|
||||
|
||||
func (d *localStorageLoad) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error {
|
||||
idList, err := engine.ListContainers(d.engine)
|
||||
if err != nil {
|
||||
return fmt.Errorf("list containers on engine failure: %w", err)
|
||||
}
|
||||
|
||||
for i := range idList {
|
||||
sz, err := engine.ContainerSize(d.engine, idList[i])
|
||||
if err != nil {
|
||||
d.log.Debug("failed to calculate container size in storage engine",
|
||||
zap.Stringer("cid", idList[i]),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
d.log.Debug("container size in storage engine calculated successfully",
|
||||
zap.Uint64("size", sz),
|
||||
zap.Stringer("cid", idList[i]),
|
||||
)
|
||||
|
||||
var a containerSDK.SizeEstimation
|
||||
a.SetContainer(idList[i])
|
||||
a.SetValue(sz)
|
||||
|
||||
if f != nil && !f(a) {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := h(a); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type usedSpaceService struct {
|
||||
containerService.Server
|
||||
|
||||
loadWriterProvider loadcontroller.WriterProvider
|
||||
|
||||
loadPlacementBuilder *loadPlacementBuilder
|
||||
|
||||
routeBuilder loadroute.Builder
|
||||
|
||||
cfg *cfg
|
||||
}
|
||||
|
||||
func (c *cfg) PublicKey() []byte {
|
||||
return nodeKeyFromNetmap(c)
|
||||
}
|
||||
|
||||
func (c *cfg) IsLocalKey(key []byte) bool {
|
||||
return bytes.Equal(key, c.PublicKey())
|
||||
}
|
||||
|
||||
func (c *cfg) IterateAddresses(f func(string) bool) {
|
||||
c.iterateNetworkAddresses(f)
|
||||
}
|
||||
|
||||
func (c *cfg) NumberOfAddresses() int {
|
||||
return c.addressNum()
|
||||
}
|
||||
|
||||
func (c *cfg) ExternalAddresses() []string {
|
||||
return c.cfgNodeInfo.localInfo.ExternalAddresses()
|
||||
}
|
||||
|
||||
func (c *usedSpaceService) PublicKey() []byte {
|
||||
return nodeKeyFromNetmap(c.cfg)
|
||||
}
|
||||
|
||||
func (c *usedSpaceService) IterateAddresses(f func(string) bool) {
|
||||
c.cfg.iterateNetworkAddresses(f)
|
||||
}
|
||||
|
||||
func (c *usedSpaceService) NumberOfAddresses() int {
|
||||
return c.cfg.addressNum()
|
||||
}
|
||||
|
||||
func (c *usedSpaceService) ExternalAddresses() []string {
|
||||
return c.cfg.ExternalAddresses()
|
||||
}
|
||||
|
||||
func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *containerV2.AnnounceUsedSpaceRequest) (*containerV2.AnnounceUsedSpaceResponse, error) {
|
||||
var passedRoute []loadroute.ServerInfo
|
||||
|
||||
for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() {
|
||||
passedRoute = append(passedRoute, &containerOnlyKeyRemoteServerInfo{
|
||||
key: hdr.GetBodySignature().GetKey(),
|
||||
})
|
||||
}
|
||||
|
||||
for left, right := 0, len(passedRoute)-1; left < right; left, right = left+1, right-1 {
|
||||
passedRoute[left], passedRoute[right] = passedRoute[right], passedRoute[left]
|
||||
}
|
||||
|
||||
passedRoute = append(passedRoute, c)
|
||||
|
||||
w, err := c.loadWriterProvider.InitWriter(loadroute.NewRouteContext(ctx, passedRoute))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not initialize container's used space writer: %w", err)
|
||||
}
|
||||
|
||||
var est containerSDK.SizeEstimation
|
||||
|
||||
for _, aV2 := range req.GetBody().GetAnnouncements() {
|
||||
err = est.ReadFromV2(aV2)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid size announcement: %w", err)
|
||||
}
|
||||
|
||||
if err := c.processLoadValue(ctx, est, passedRoute, w); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
respBody := new(containerV2.AnnounceUsedSpaceResponseBody)
|
||||
|
||||
resp := new(containerV2.AnnounceUsedSpaceResponse)
|
||||
resp.SetBody(respBody)
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
var errNodeOutsideContainer = errors.New("node outside the container")
|
||||
|
||||
type containerOnlyKeyRemoteServerInfo struct {
|
||||
key []byte
|
||||
}
|
||||
|
||||
func (i *containerOnlyKeyRemoteServerInfo) PublicKey() []byte {
|
||||
return i.key
|
||||
}
|
||||
|
||||
func (*containerOnlyKeyRemoteServerInfo) IterateAddresses(func(string) bool) {
|
||||
}
|
||||
|
||||
func (*containerOnlyKeyRemoteServerInfo) NumberOfAddresses() int {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (*containerOnlyKeyRemoteServerInfo) ExternalAddresses() []string {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *loadPlacementBuilder) isNodeFromContainerKey(epoch uint64, cnr cid.ID, key []byte) (bool, error) {
|
||||
cnrNodes, _, err := l.buildPlacement(epoch, cnr)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for i := range cnrNodes {
|
||||
for j := range cnrNodes[i] {
|
||||
if bytes.Equal(cnrNodes[i][j].PublicKey(), key) {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.SizeEstimation,
|
||||
route []loadroute.ServerInfo, w loadcontroller.Writer) error {
|
||||
fromCnr, err := c.loadPlacementBuilder.isNodeFromContainerKey(a.Epoch(), a.Container(), route[0].PublicKey())
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not verify that the sender belongs to the container: %w", err)
|
||||
} else if !fromCnr {
|
||||
return errNodeOutsideContainer
|
||||
}
|
||||
|
||||
err = loadroute.CheckRoute(c.routeBuilder, a, route)
|
||||
if err != nil {
|
||||
return fmt.Errorf("wrong route of container's used space value: %w", err)
|
||||
}
|
||||
|
||||
err = w.Put(a)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not write container's used space value: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// implements interface required by container service provided by morph executor.
|
||||
type morphContainerReader struct {
|
||||
eacl containerCore.EACLSource
|
||||
|
||||
get containerCore.Source
|
||||
|
||||
lister interface {
|
||||
List(*user.ID) ([]cid.ID, error)
|
||||
}
|
||||
}
|
||||
|
||||
func (x *morphContainerReader) Get(id cid.ID) (*containerCore.Container, error) {
|
||||
return x.get.Get(id)
|
||||
}
|
||||
|
||||
func (x *morphContainerReader) GetEACL(id cid.ID) (*containerCore.EACL, error) {
|
||||
return x.eacl.GetEACL(id)
|
||||
}
|
||||
|
||||
func (x *morphContainerReader) List(id *user.ID) ([]cid.ID, error) {
|
||||
return x.lister.List(id)
|
||||
}
|
||||
|
||||
type morphContainerWriter struct {
|
||||
neoClient *cntClient.Client
|
||||
|
||||
cacheEnabled bool
|
||||
eacls *ttlEACLStorage
|
||||
lists *ttlContainerLister
|
||||
}
|
||||
|
||||
func (m morphContainerWriter) Put(cnr containerCore.Container) (*cid.ID, error) {
|
||||
return cntClient.Put(m.neoClient, cnr)
|
||||
}
|
||||
|
||||
func (m morphContainerWriter) Delete(witness containerCore.RemovalWitness) error {
|
||||
return cntClient.Delete(m.neoClient, witness)
|
||||
}
|
||||
|
||||
func (m morphContainerWriter) PutEACL(eaclInfo containerCore.EACL) error {
|
||||
err := cntClient.PutEACL(m.neoClient, eaclInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if m.cacheEnabled {
|
||||
id, _ := eaclInfo.Value.CID()
|
||||
m.eacls.InvalidateEACL(id)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue