package main import ( "context" "errors" "fmt" "net" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" objectGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object/grpc" metricsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/metrics" policerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/policer" replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" morphClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache" objectTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/object/grpc" objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl" v2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl/v2" objectAPE "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/ape" deletesvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete" deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2" headsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/head" putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2" searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search" searchsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search/v2" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/policer" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" eaclSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl" netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "go.uber.org/zap" "google.golang.org/grpc" ) type objectSvc struct { put *putsvcV2.Service search *searchsvcV2.Service get *getsvcV2.Service delete *deletesvcV2.Service } func (c *cfg) MaxObjectSize() uint64 { sz, err := c.cfgNetmap.wrapper.MaxObjectSize() if err != nil { c.log.Error(logs.FrostFSNodeCouldNotGetMaxObjectSizeValue, zap.String("error", err.Error()), ) } return sz } func (s *objectSvc) Put() (objectService.PutObjectStream, error) { return s.put.Put() } func (s *objectSvc) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) { return s.put.PutSingle(ctx, req) } func (s *objectSvc) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { return s.get.Head(ctx, req) } func (s *objectSvc) Search(req *object.SearchRequest, stream objectService.SearchStream) error { return s.search.Search(req, stream) } func (s *objectSvc) Get(req *object.GetRequest, stream objectService.GetObjectStream) error { return s.get.Get(req, stream) } func (s *objectSvc) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) { return s.delete.Delete(ctx, req) } func (s *objectSvc) GetRange(req *object.GetRangeRequest, stream objectService.GetObjectRangeStream) error { return s.get.GetRange(req, stream) } func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { return s.get.GetRangeHash(ctx, req) } type delNetInfo struct { netmap.State tsLifetime uint64 cfg *cfg } func (i *delNetInfo) TombstoneLifetime() (uint64, error) { return i.tsLifetime, nil } // returns node owner ID calculated from configured private key. // // Implements method needed for Object.Delete service. func (i *delNetInfo) LocalNodeID() user.ID { return i.cfg.ownerIDFromKey } type innerRingFetcherWithNotary struct { sidechain *morphClient.Client } func (fn *innerRingFetcherWithNotary) InnerRingKeys() ([][]byte, error) { keys, err := fn.sidechain.NeoFSAlphabetList() if err != nil { return nil, fmt.Errorf("can't get inner ring keys from alphabet role: %w", err) } result := make([][]byte, 0, len(keys)) for i := range keys { result = append(result, keys[i].Bytes()) } return result, nil } type innerRingFetcherWithoutNotary struct { nm *nmClient.Client } func (f *innerRingFetcherWithoutNotary) InnerRingKeys() ([][]byte, error) { keys, err := f.nm.GetInnerRingList() if err != nil { return nil, fmt.Errorf("can't get inner ring keys from netmap contract: %w", err) } result := make([][]byte, 0, len(keys)) for i := range keys { result = append(result, keys[i].Bytes()) } return result, nil } func initObjectService(c *cfg) { keyStorage := util.NewKeyStorage(&c.key.PrivateKey, c.privateTokenStore, c.cfgNetmap.state) c.replicator = createReplicator(c, keyStorage, c.bgClientCache) addPolicer(c, keyStorage, c.bgClientCache) traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c) irFetcher := newCachedIRFetcher(createInnerRingFetcher(c)) sPut := createPutSvc(c, keyStorage, &irFetcher) sPutV2 := createPutSvcV2(sPut, keyStorage) sSearch := createSearchSvc(c, keyStorage, traverseGen, c.clientCache) sSearchV2 := createSearchSvcV2(sSearch, keyStorage) sGet := createGetService(c, keyStorage, traverseGen, c.clientCache) *c.cfgObject.getSvc = *sGet // need smth better sGetV2 := createGetServiceV2(c, sGet, keyStorage) sDelete := createDeleteService(c, keyStorage, sGet, sSearch, sPut) sDeleteV2 := createDeleteServiceV2(sDelete) // build service pipeline // grpc | | signature | response | acl | ape | split splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2) apeSvc := createAPEService(c, splitSvc) aclSvc := createACLServiceV2(c, apeSvc, &irFetcher) var commonSvc objectService.Common commonSvc.Init(&c.internals, aclSvc) respSvc := objectService.NewResponseService( &commonSvc, c.respSvc, ) signSvc := objectService.NewSignService( &c.key.PrivateKey, respSvc, ) c.shared.metricsSvc = objectService.NewMetricCollector( signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg)) server := objectTransportGRPC.New(c.shared.metricsSvc) c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { objectGRPC.RegisterObjectServiceServer(s, server) }) } func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.ClientCache) { if policerconfig.UnsafeDisable(c.appCfg) { c.log.Warn(logs.FrostFSNodePolicerIsDisabled) return } ls := c.cfgObject.cfgLocalStorage.localStorage buryFn := func(ctx context.Context, addr oid.Address) error { var prm engine.InhumePrm prm.MarkAsGarbage(addr) prm.WithForceRemoval() _, err := ls.Inhume(ctx, prm) return err } remoteHeader := headsvc.NewRemoteHeader(keyStorage, clientConstructor) pol := policer.New( policer.WithLogger(c.log), policer.WithKeySpaceIterator(&keySpaceIterator{ng: ls}), policer.WithBuryFunc(buryFn), policer.WithContainerSource(c.cfgObject.cnrSource), policer.WithPlacementBuilder( placement.NewNetworkMapSourceBuilder(c.netMapSource), ), policer.WithRemoteObjectHeaderFunc( func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) { prm := new(headsvc.RemoteHeadPrm).WithNodeInfo(ni).WithObjectAddress(a) return remoteHeader.Head(ctx, prm) }, ), policer.WithNetmapKeys(c), policer.WithHeadTimeout( policerconfig.HeadTimeout(c.appCfg), ), policer.WithReplicator(c.replicator), policer.WithRedundantCopyCallback(func(ctx context.Context, addr oid.Address) { var inhumePrm engine.InhumePrm inhumePrm.MarkAsGarbage(addr) _, err := ls.Inhume(ctx, inhumePrm) if err != nil { c.log.Warn(logs.FrostFSNodeCouldNotInhumeMarkRedundantCopyAsGarbage, zap.String("error", err.Error()), ) } }), policer.WithPool(c.cfgObject.pool.replication), policer.WithMetrics(c.metricsCollector.PolicerMetrics()), ) c.workers = append(c.workers, worker{ fn: func(ctx context.Context) { pol.Run(ctx) }, }) } func createInnerRingFetcher(c *cfg) v2.InnerRingFetcher { if c.cfgMorph.client.ProbeNotary() { return &innerRingFetcherWithNotary{ sidechain: c.cfgMorph.client, } } return &innerRingFetcherWithoutNotary{ nm: c.cfgNetmap.wrapper, } } func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCache) *replicator.Replicator { ls := c.cfgObject.cfgLocalStorage.localStorage return replicator.New( replicator.WithLogger(c.log), replicator.WithPutTimeout( replicatorconfig.PutTimeout(c.appCfg), ), replicator.WithLocalStorage(ls), replicator.WithRemoteSender( putsvc.NewRemoteSender(keyStorage, cache), ), replicator.WithMetrics(c.metricsCollector.Replicator()), ) } func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetcher) *putsvc.Service { ls := c.cfgObject.cfgLocalStorage.localStorage var os putsvc.ObjectStorage = engineWithoutNotifications{ engine: ls, } if c.cfgNotifications.enabled { os = engineWithNotifications{ base: os, nw: c.cfgNotifications.nw, ns: c.cfgNetmap.state, defaultTopic: c.cfgNotifications.defaultTopic, } } return putsvc.NewService( keyStorage, c.putClientCache, newCachedMaxObjectSizeSource(c), os, c.cfgObject.cnrSource, c.netMapSource, c, c.cfgNetmap.state, irFetcher, putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal), putsvc.WithLogger(c.log), putsvc.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification), ) } func createPutSvcV2(sPut *putsvc.Service, keyStorage *util.KeyStorage) *putsvcV2.Service { return putsvcV2.NewService(sPut, keyStorage) } func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service { ls := c.cfgObject.cfgLocalStorage.localStorage return searchsvc.New( ls, coreConstructor, traverseGen.WithTraverseOptions( placement.WithoutSuccessTracking(), ), c.netMapSource, keyStorage, searchsvc.WithLogger(c.log), ) } func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage) *searchsvcV2.Service { return searchsvcV2.NewService(sSearch, keyStorage) } func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache, ) *getsvc.Service { ls := c.cfgObject.cfgLocalStorage.localStorage return getsvc.New( keyStorage, c.netMapSource, ls, traverseGen.WithTraverseOptions( placement.SuccessAfter(1), ), coreConstructor, getsvc.WithLogger(c.log)) } func createGetServiceV2(c *cfg, sGet *getsvc.Service, keyStorage *util.KeyStorage) *getsvcV2.Service { return getsvcV2.NewService( sGet, keyStorage, c.clientCache, c.netMapSource, c, c.cfgObject.cnrSource, getsvcV2.WithLogger(c.log), ) } func createDeleteService(c *cfg, keyStorage *util.KeyStorage, sGet *getsvc.Service, sSearch *searchsvc.Service, sPut *putsvc.Service, ) *deletesvc.Service { return deletesvc.New( sGet, sSearch, sPut, &delNetInfo{ State: c.cfgNetmap.state, tsLifetime: c.cfgObject.tombstoneLifetime, cfg: c, }, keyStorage, deletesvc.WithLogger(c.log), ) } func createDeleteServiceV2(sDelete *deletesvc.Service) *deletesvcV2.Service { return deletesvcV2.NewService(sDelete) } func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Service, sSearchV2 *searchsvcV2.Service, sDeleteV2 *deletesvcV2.Service, ) *objectService.TransportSplitter { return objectService.NewTransportSplitter( c.cfgGRPC.maxChunkSize, c.cfgGRPC.maxAddrAmount, &objectSvc{ put: sPutV2, search: sSearchV2, get: sGetV2, delete: sDeleteV2, }, ) } func createACLServiceV2(c *cfg, apeSvc *objectAPE.Service, irFetcher *cachedIRFetcher) v2.Service { ls := c.cfgObject.cfgLocalStorage.localStorage return v2.New( apeSvc, c.netMapSource, irFetcher, acl.NewChecker( c.cfgNetmap.state, c.cfgObject.eaclSource, eaclSDK.NewValidator(), ls), c.cfgObject.cnrSource, v2.WithLogger(c.log), ) } func createAPEService(c *cfg, splitSvc *objectService.TransportSplitter) *objectAPE.Service { return objectAPE.NewService( c.log, objectAPE.NewChecker( c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.chainRouter, objectAPE.NewStorageEngineHeaderProvider(c.cfgObject.cfgLocalStorage.localStorage), ), splitSvc, ) } type morphEACLFetcher struct { w *cntClient.Client } func (s *morphEACLFetcher) GetEACL(cnr cid.ID) (*containercore.EACL, error) { eaclInfo, err := s.w.GetEACL(cnr) if err != nil { return nil, err } binTable, err := eaclInfo.Value.Marshal() if err != nil { return nil, fmt.Errorf("marshal eACL table: %w", err) } if !eaclInfo.Signature.Verify(binTable) { // TODO(@cthulhu-rider): #468 use "const" error return nil, errors.New("invalid signature of the eACL table") } return eaclInfo, nil } type engineWithNotifications struct { base putsvc.ObjectStorage nw notificationWriter ns netmap.State defaultTopic string } func (e engineWithNotifications) IsLocked(ctx context.Context, address oid.Address) (bool, error) { return e.base.IsLocked(ctx, address) } func (e engineWithNotifications) Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error { return e.base.Delete(ctx, tombstone, toDelete) } func (e engineWithNotifications) Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error { return e.base.Lock(ctx, locker, toLock) } func (e engineWithNotifications) Put(ctx context.Context, o *objectSDK.Object) error { if err := e.base.Put(ctx, o); err != nil { return err } ni, err := o.NotificationInfo() if err == nil { if epoch := ni.Epoch(); epoch == 0 || epoch == e.ns.CurrentEpoch() { topic := ni.Topic() if topic == "" { topic = e.defaultTopic } e.nw.Notify(topic, objectCore.AddressOf(o)) } } return nil } type engineWithoutNotifications struct { engine *engine.StorageEngine } func (e engineWithoutNotifications) IsLocked(ctx context.Context, address oid.Address) (bool, error) { return e.engine.IsLocked(ctx, address) } func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error { var prm engine.InhumePrm addrs := make([]oid.Address, len(toDelete)) for i := range addrs { addrs[i].SetContainer(tombstone.Container()) addrs[i].SetObject(toDelete[i]) } prm.WithTarget(tombstone, addrs...) _, err := e.engine.Inhume(ctx, prm) return err } func (e engineWithoutNotifications) Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error { return e.engine.Lock(ctx, locker.Container(), locker.Object(), toLock) } func (e engineWithoutNotifications) Put(ctx context.Context, o *objectSDK.Object) error { return engine.Put(ctx, e.engine, o) }