diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 0f6e19d7..35b9e4e2 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -2,6 +2,7 @@ package main import ( "context" + "crypto/sha256" eaclSDK "github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl" "github.com/nspcc-dev/neofs-api-go/pkg/client" @@ -16,6 +17,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/event" + "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/network/cache" objectTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc" objectService "github.com/nspcc-dev/neofs-node/pkg/services/object" @@ -35,6 +37,8 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/services/policer" "github.com/nspcc-dev/neofs-node/pkg/services/replicator" + "github.com/nspcc-dev/neofs-node/pkg/services/reputation" + truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/pkg/errors" "go.uber.org/zap" @@ -147,6 +151,14 @@ func initObjectService(c *cfg) { clientCache := cache.NewSDKClientCache( client.WithDialTimeout(c.viper.GetDuration(cfgDialTimeout))) + clientConstructor := &reputationClientConstructor{ + log: c.log, + nmSrc: c.cfgObject.netMapStorage, + netState: c.cfgNetmap.state, + trustStorage: c.cfgReputation.localTrustStorage, + basicConstructor: clientCache, + } + objRemover := &localObjectRemover{ storage: ls, log: c.log, @@ -174,7 +186,7 @@ func initObjectService(c *cfg) { ), replicator.WithLocalStorage(ls), replicator.WithRemoteSender( - putsvc.NewRemoteSender(keyStorage, clientCache), + putsvc.NewRemoteSender(keyStorage, clientConstructor), ), ) @@ -197,7 +209,7 @@ func initObjectService(c *cfg) { ), policer.WithTrigger(ch), policer.WithRemoteHeader( - headsvc.NewRemoteHeader(keyStorage, clientCache), + headsvc.NewRemoteHeader(keyStorage, clientConstructor), ), policer.WithLocalAddressSource(c), policer.WithHeadTimeout( @@ -230,7 +242,7 @@ func initObjectService(c *cfg) { sPut := putsvc.NewService( putsvc.WithKeyStorage(keyStorage), - putsvc.WithClientConstructor(clientCache), + putsvc.WithClientConstructor(clientConstructor), putsvc.WithMaxSizeSource(c), putsvc.WithLocalStorage(ls), putsvc.WithContainerSource(c.cfgObject.cnrStorage), @@ -251,7 +263,7 @@ func initObjectService(c *cfg) { sSearch := searchsvc.New( searchsvc.WithLogger(c.log), searchsvc.WithLocalStorageEngine(ls), - searchsvc.WithClientConstructor(clientCache), + searchsvc.WithClientConstructor(clientConstructor), searchsvc.WithTraverserGenerator( traverseGen.WithTraverseOptions( placement.WithoutSuccessTracking(), @@ -268,7 +280,7 @@ func initObjectService(c *cfg) { sGet := getsvc.New( getsvc.WithLogger(c.log), getsvc.WithLocalStorageEngine(ls), - getsvc.WithClientConstructor(clientCache), + getsvc.WithClientConstructor(clientConstructor), getsvc.WithTraverserGenerator( traverseGen.WithTraverseOptions( placement.SuccessAfter(1), @@ -387,3 +399,129 @@ func (s *morphEACLStorage) GetEACL(cid *container.ID) (*eaclSDK.Table, error) { return table, nil } + +type reputationClientConstructor struct { + log *logger.Logger + + nmSrc netmap.Source + + netState netmap.State + + trustStorage *truststorage.Storage + + basicConstructor interface { + Get(string) (client.Client, error) + } +} + +type reputationClient struct { + client.Client + + prm truststorage.UpdatePrm + + cons *reputationClientConstructor +} + +func (c *reputationClient) submitResult(err error) { + prm := c.prm + prm.SetSatisfactory(err == nil) + prm.SetEpoch(c.cons.netState.CurrentEpoch()) + + c.cons.trustStorage.Update(prm) +} + +func (c *reputationClient) PutObject(ctx context.Context, prm *client.PutObjectParams, opts ...client.CallOption) (*objectSDK.ID, error) { + id, err := c.Client.PutObject(ctx, prm, opts...) + + c.submitResult(err) + + return id, err +} + +func (c *reputationClient) DeleteObject(ctx context.Context, prm *client.DeleteObjectParams, opts ...client.CallOption) error { + err := c.Client.DeleteObject(ctx, prm, opts...) + + c.submitResult(err) + + return err +} + +func (c *reputationClient) GetObject(ctx context.Context, prm *client.GetObjectParams, opts ...client.CallOption) (*objectSDK.Object, error) { + obj, err := c.Client.GetObject(ctx, prm, opts...) + + c.submitResult(err) + + return obj, err +} + +func (c *reputationClient) GetObjectHeader(ctx context.Context, prm *client.ObjectHeaderParams, opts ...client.CallOption) (*objectSDK.Object, error) { + obj, err := c.Client.GetObjectHeader(ctx, prm, opts...) + + c.submitResult(err) + + return obj, err +} + +func (c *reputationClient) ObjectPayloadRangeData(ctx context.Context, prm *client.RangeDataParams, opts ...client.CallOption) ([]byte, error) { + rng, err := c.Client.ObjectPayloadRangeData(ctx, prm, opts...) + + c.submitResult(err) + + return rng, err +} + +func (c *reputationClient) ObjectPayloadRangeSHA256(ctx context.Context, prm *client.RangeChecksumParams, opts ...client.CallOption) ([][sha256.Size]byte, error) { + hashes, err := c.Client.ObjectPayloadRangeSHA256(ctx, prm, opts...) + + c.submitResult(err) + + return hashes, err +} + +func (c *reputationClient) ObjectPayloadRangeTZ(ctx context.Context, prm *client.RangeChecksumParams, opts ...client.CallOption) ([][client.TZSize]byte, error) { + hashes, err := c.Client.ObjectPayloadRangeTZ(ctx, prm, opts...) + + c.submitResult(err) + + return hashes, err +} + +func (c *reputationClient) SearchObject(ctx context.Context, prm *client.SearchObjectParams, opts ...client.CallOption) ([]*objectSDK.ID, error) { + ids, err := c.Client.SearchObject(ctx, prm, opts...) + + c.submitResult(err) + + return ids, err +} + +func (c *reputationClientConstructor) Get(addr string) (client.Client, error) { + cl, err := c.basicConstructor.Get(addr) + if err != nil { + return nil, err + } + + nm, err := netmap.GetLatestNetworkMap(c.nmSrc) + if err == nil { + for i := range nm.Nodes { + ipAddr, err := network.IPAddrFromMultiaddr(nm.Nodes[i].Address()) + if err == nil { + if ipAddr == addr { + prm := truststorage.UpdatePrm{} + prm.SetPeer(reputation.PeerIDFromBytes(nm.Nodes[i].PublicKey())) + + return &reputationClient{ + Client: cl, + prm: prm, + cons: c, + }, nil + } + } + } + } else { + c.log.Warn("could not get latest network map to overload the client", + zap.String("error", err.Error()), + ) + } + + return cl, nil +}