package main import ( "context" "crypto/sha256" eaclSDK "github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl" "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/container" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-api-go/util/signature" "github.com/nspcc-dev/neofs-api-go/v2/object" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/network/cache" objectTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc" objectService "github.com/nspcc-dev/neofs-node/pkg/services/object" "github.com/nspcc-dev/neofs-node/pkg/services/object/acl" "github.com/nspcc-dev/neofs-node/pkg/services/object/acl/eacl" deletesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/delete" deletesvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/delete/v2" getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" getsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/get/v2" headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" putsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/put/v2" searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" searchsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/v2" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/gc" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/services/policer" "github.com/nspcc-dev/neofs-node/pkg/services/replicator" "github.com/nspcc-dev/neofs-node/pkg/services/reputation" truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/pkg/errors" "go.uber.org/zap" ) type objectSvc struct { put *putsvcV2.Service search *searchsvcV2.Service get *getsvcV2.Service delete *deletesvcV2.Service } func (c *cfg) MaxObjectSize() uint64 { sz, err := c.cfgNetmap.wrapper.MaxObjectSize() if err != nil { c.log.Error("could not get max object size value", zap.String("error", err.Error()), ) } return sz } func (s *objectSvc) Put(ctx context.Context) (objectService.PutObjectStream, error) { return s.put.Put(ctx) } func (s *objectSvc) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { return s.get.Head(ctx, req) } func (s *objectSvc) Search(req *object.SearchRequest, stream objectService.SearchStream) error { return s.search.Search(req, stream) } func (s *objectSvc) Get(req *object.GetRequest, stream objectService.GetObjectStream) error { return s.get.Get(req, stream) } func (s *objectSvc) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) { return s.delete.Delete(ctx, req) } func (s *objectSvc) GetRange(req *object.GetRangeRequest, stream objectService.GetObjectRangeStream) error { return s.get.GetRange(req, stream) } func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { return s.get.GetRangeHash(ctx, req) } type localObjectRemover struct { storage *engine.StorageEngine log *logger.Logger } type localObjectInhumer struct { storage *engine.StorageEngine log *logger.Logger } func (r *localObjectRemover) Delete(addr ...*objectSDK.Address) error { _, err := r.storage.Delete(new(engine.DeletePrm). WithAddresses(addr...), ) return err } func (r *localObjectInhumer) DeleteObjects(ts *objectSDK.Address, addr ...*objectSDK.Address) { prm := new(engine.InhumePrm) for _, a := range addr { prm.WithTarget(ts, a) if _, err := r.storage.Inhume(prm); err != nil { r.log.Error("could not delete object", zap.Stringer("address", a), zap.String("error", err.Error()), ) } } } type delNetInfo struct { netmap.State tsLifetime uint64 } func (i *delNetInfo) TombstoneLifetime() (uint64, error) { return i.tsLifetime, nil } func initObjectService(c *cfg) { ls := c.cfgObject.cfgLocalStorage.localStorage keyStorage := util.NewKeyStorage(c.key, c.privateTokenStore) nodeOwner := owner.NewID() neo3Wallet, err := owner.NEO3WalletFromPublicKey(&c.key.PublicKey) fatalOnErr(err) nodeOwner.SetNeo3Wallet(neo3Wallet) clientCache := cache.NewSDKClientCache( client.WithDialTimeout(c.viper.GetDuration(cfgDialTimeout))) clientConstructor := &reputationClientConstructor{ log: c.log, nmSrc: c.cfgObject.netMapStorage, netState: c.cfgNetmap.state, trustStorage: c.cfgReputation.localTrustStorage, basicConstructor: clientCache, } objRemover := &localObjectRemover{ storage: ls, log: c.log, } objInhumer := &localObjectInhumer{ storage: ls, log: c.log, } objGC := gc.New( gc.WithLogger(c.log), gc.WithRemover(objRemover), gc.WithQueueCapacity(c.viper.GetUint32(cfgGCQueueSize)), gc.WithSleepInterval(c.viper.GetDuration(cfgGCQueueTick)), gc.WithWorkingInterval(c.viper.GetDuration(cfgGCTimeout)), ) c.workers = append(c.workers, objGC) repl := replicator.New( replicator.WithLogger(c.log), replicator.WithPutTimeout( c.viper.GetDuration(cfgReplicatorPutTimeout), ), replicator.WithLocalStorage(ls), replicator.WithRemoteSender( putsvc.NewRemoteSender(keyStorage, clientConstructor), ), ) c.workers = append(c.workers, repl) ch := make(chan *policer.Task, 1) pol := policer.New( policer.WithLogger(c.log), policer.WithLocalStorage(ls), policer.WithContainerSource(c.cfgObject.cnrStorage), policer.WithPlacementBuilder( placement.NewNetworkMapSourceBuilder(c.cfgObject.netMapStorage), ), policer.WithWorkScope( c.viper.GetInt(cfgPolicerWorkScope), ), policer.WithExpansionRate( c.viper.GetInt(cfgPolicerExpRate), ), policer.WithTrigger(ch), policer.WithRemoteHeader( headsvc.NewRemoteHeader(keyStorage, clientConstructor), ), policer.WithLocalAddressSource(c), policer.WithHeadTimeout( c.viper.GetDuration(cfgPolicerHeadTimeout), ), policer.WithReplicator(repl), policer.WithRedundantCopyCallback(func(addr *objectSDK.Address) { _, err := ls.Inhume(new(engine.InhumePrm).MarkAsGarbage(addr)) if err != nil { c.log.Warn("could not inhume mark redundant copy as garbage", zap.String("error", err.Error()), ) } }), ) addNewEpochNotificationHandler(c, func(ev event.Event) { select { case ch <- new(policer.Task): case <-c.ctx.Done(): close(ch) default: c.log.Info("policer is busy") } }) traverseGen := util.NewTraverserGenerator(c.cfgObject.netMapStorage, c.cfgObject.cnrStorage, c) c.workers = append(c.workers, pol) sPut := putsvc.NewService( putsvc.WithKeyStorage(keyStorage), putsvc.WithClientConstructor(clientConstructor), putsvc.WithMaxSizeSource(c), putsvc.WithLocalStorage(ls), putsvc.WithContainerSource(c.cfgObject.cnrStorage), putsvc.WithNetworkMapSource(c.cfgObject.netMapStorage), putsvc.WithLocalAddressSource(c), putsvc.WithFormatValidatorOpts( objectCore.WithDeleteHandler(objInhumer), ), putsvc.WithNetworkState(c.cfgNetmap.state), putsvc.WithWorkerPool(c.cfgObject.pool.put), putsvc.WithLogger(c.log), ) sPutV2 := putsvcV2.NewService( putsvcV2.WithInternalService(sPut), ) sSearch := searchsvc.New( searchsvc.WithLogger(c.log), searchsvc.WithLocalStorageEngine(ls), searchsvc.WithClientConstructor(clientConstructor), searchsvc.WithTraverserGenerator( traverseGen.WithTraverseOptions( placement.WithoutSuccessTracking(), ), ), searchsvc.WithNetMapSource(c.cfgNetmap.wrapper), ) sSearchV2 := searchsvcV2.NewService( searchsvcV2.WithInternalService(sSearch), searchsvcV2.WithKeyStorage(keyStorage), ) sGet := getsvc.New( getsvc.WithLogger(c.log), getsvc.WithLocalStorageEngine(ls), getsvc.WithClientConstructor(clientConstructor), getsvc.WithTraverserGenerator( traverseGen.WithTraverseOptions( placement.SuccessAfter(1), ), ), getsvc.WithNetMapSource(c.cfgNetmap.wrapper), ) sGetV2 := getsvcV2.NewService( getsvcV2.WithInternalService(sGet), getsvcV2.WithKeyStorage(keyStorage), ) sDelete := deletesvc.New( deletesvc.WithLogger(c.log), deletesvc.WithHeadService(sGet), deletesvc.WithSearchService(sSearch), deletesvc.WithPutService(sPut), deletesvc.WithNetworkInfo(&delNetInfo{ State: c.cfgNetmap.state, tsLifetime: c.viper.GetUint64(cfgTombstoneLifetime), }), ) sDeleteV2 := deletesvcV2.NewService( deletesvcV2.WithInternalService(sDelete), deletesvcV2.WithKeyStorage(keyStorage), ) // build service pipeline // grpc | | acl | signature | response | split splitSvc := objectService.NewTransportSplitter( c.cfgGRPC.maxChunkSize, c.cfgGRPC.maxAddrAmount, &objectSvc{ put: sPutV2, search: sSearchV2, get: sGetV2, delete: sDeleteV2, }, ) respSvc := objectService.NewResponseService( splitSvc, c.respSvc, ) signSvc := objectService.NewSignService( c.key, respSvc, ) aclSvc := acl.New( acl.WithSenderClassifier( acl.NewSenderClassifier( c.log, c.cfgNetmap.wrapper, c.cfgNetmap.wrapper, ), ), acl.WithContainerSource( c.cfgObject.cnrStorage, ), acl.WithNextService(signSvc), acl.WithLocalStorage(ls), acl.WithEACLValidatorOptions( eacl.WithEACLStorage(newCachedEACLStorage(&morphEACLStorage{ w: c.cfgObject.cnrClient, })), eacl.WithLogger(c.log), ), acl.WithNetmapState(c.cfgNetmap.state), ) var firstSvc objectService.ServiceServer = aclSvc if c.metricsCollector != nil { firstSvc = objectService.NewMetricCollector(aclSvc, c.metricsCollector) } objectGRPC.RegisterObjectServiceServer(c.cfgGRPC.server, objectTransportGRPC.New(firstSvc), ) } type morphEACLStorage struct { w *wrapper.Wrapper } type signedEACLTable eaclSDK.Table func (s *signedEACLTable) ReadSignedData(buf []byte) ([]byte, error) { return (*eaclSDK.Table)(s).Marshal(buf) } func (s *signedEACLTable) SignedDataSize() int { // TODO: add eacl.Table.Size method return (*eaclSDK.Table)(s).ToV2().StableSize() } func (s *morphEACLStorage) GetEACL(cid *container.ID) (*eaclSDK.Table, error) { table, sig, err := s.w.GetEACL(cid) if err != nil { return nil, err } if err := signature.VerifyDataWithSource( (*signedEACLTable)(table), func() ([]byte, []byte) { return sig.Key(), sig.Sign() }, signature.SignWithRFC6979(), ); err != nil { return nil, errors.Wrap(err, "incorrect signature") } return table, nil } type reputationClientConstructor struct { log *logger.Logger nmSrc netmap.Source netState netmap.State trustStorage *truststorage.Storage basicConstructor interface { Get(string) (client.Client, error) } } type reputationClient struct { client.Client prm truststorage.UpdatePrm cons *reputationClientConstructor } func (c *reputationClient) submitResult(err error) { prm := c.prm prm.SetSatisfactory(err == nil) prm.SetEpoch(c.cons.netState.CurrentEpoch()) c.cons.trustStorage.Update(prm) } func (c *reputationClient) PutObject(ctx context.Context, prm *client.PutObjectParams, opts ...client.CallOption) (*objectSDK.ID, error) { id, err := c.Client.PutObject(ctx, prm, opts...) c.submitResult(err) return id, err } func (c *reputationClient) DeleteObject(ctx context.Context, prm *client.DeleteObjectParams, opts ...client.CallOption) error { err := c.Client.DeleteObject(ctx, prm, opts...) c.submitResult(err) return err } func (c *reputationClient) GetObject(ctx context.Context, prm *client.GetObjectParams, opts ...client.CallOption) (*objectSDK.Object, error) { obj, err := c.Client.GetObject(ctx, prm, opts...) c.submitResult(err) return obj, err } func (c *reputationClient) GetObjectHeader(ctx context.Context, prm *client.ObjectHeaderParams, opts ...client.CallOption) (*objectSDK.Object, error) { obj, err := c.Client.GetObjectHeader(ctx, prm, opts...) c.submitResult(err) return obj, err } func (c *reputationClient) ObjectPayloadRangeData(ctx context.Context, prm *client.RangeDataParams, opts ...client.CallOption) ([]byte, error) { rng, err := c.Client.ObjectPayloadRangeData(ctx, prm, opts...) c.submitResult(err) return rng, err } func (c *reputationClient) ObjectPayloadRangeSHA256(ctx context.Context, prm *client.RangeChecksumParams, opts ...client.CallOption) ([][sha256.Size]byte, error) { hashes, err := c.Client.ObjectPayloadRangeSHA256(ctx, prm, opts...) c.submitResult(err) return hashes, err } func (c *reputationClient) ObjectPayloadRangeTZ(ctx context.Context, prm *client.RangeChecksumParams, opts ...client.CallOption) ([][client.TZSize]byte, error) { hashes, err := c.Client.ObjectPayloadRangeTZ(ctx, prm, opts...) c.submitResult(err) return hashes, err } func (c *reputationClient) SearchObject(ctx context.Context, prm *client.SearchObjectParams, opts ...client.CallOption) ([]*objectSDK.ID, error) { ids, err := c.Client.SearchObject(ctx, prm, opts...) c.submitResult(err) return ids, err } func (c *reputationClientConstructor) Get(addr string) (client.Client, error) { cl, err := c.basicConstructor.Get(addr) if err != nil { return nil, err } nm, err := netmap.GetLatestNetworkMap(c.nmSrc) if err == nil { for i := range nm.Nodes { ipAddr, err := network.IPAddrFromMultiaddr(nm.Nodes[i].Address()) if err == nil { if ipAddr == addr { prm := truststorage.UpdatePrm{} prm.SetPeer(reputation.PeerIDFromBytes(nm.Nodes[i].PublicKey())) return &reputationClient{ Client: cl, prm: prm, cons: c, }, nil } } } } else { c.log.Warn("could not get latest network map to overload the client", zap.String("error", err.Error()), ) } return cl, nil }