forked from TrueCloudLab/frostfs-node
28fb8c971c
There is a need to track the results of Object interactions with remote peers during the node's lifetime. Each successful operation should increment the number of satisfactory interactions with the remote peer, and each failed one should, on the contrary, decrement it. The collected numbers of satisfactory interactions are going to be used for calculating the normalized local trust values according to the original EigenTrust algorithm. Implement a wrapper over the local trust storage that calls the `Update` method on it with the result of any object operation. The wrapper provides the interface of the remote client constructor. It is used as the `ClientConstructor` component in the Object service server of the app. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
527 lines
14 KiB
Go
527 lines
14 KiB
Go
package main
|
|
|
|
import (
	"context"
	"crypto/sha256"
	"sync"

	eaclSDK "github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl"
	"github.com/nspcc-dev/neofs-api-go/pkg/client"
	"github.com/nspcc-dev/neofs-api-go/pkg/container"
	objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
	"github.com/nspcc-dev/neofs-api-go/pkg/owner"
	"github.com/nspcc-dev/neofs-api-go/util/signature"
	"github.com/nspcc-dev/neofs-api-go/v2/object"
	objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
	"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
	objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
	"github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
	"github.com/nspcc-dev/neofs-node/pkg/morph/event"
	"github.com/nspcc-dev/neofs-node/pkg/network"
	"github.com/nspcc-dev/neofs-node/pkg/network/cache"
	objectTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc"
	objectService "github.com/nspcc-dev/neofs-node/pkg/services/object"
	"github.com/nspcc-dev/neofs-node/pkg/services/object/acl"
	"github.com/nspcc-dev/neofs-node/pkg/services/object/acl/eacl"
	deletesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/delete"
	deletesvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/delete/v2"
	getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
	getsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/get/v2"
	headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
	putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
	putsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/put/v2"
	searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
	searchsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/v2"
	"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
	"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/gc"
	"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
	"github.com/nspcc-dev/neofs-node/pkg/services/policer"
	"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
	"github.com/nspcc-dev/neofs-node/pkg/services/reputation"
	truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage"
	"github.com/nspcc-dev/neofs-node/pkg/util/logger"
	"github.com/pkg/errors"
	"go.uber.org/zap"
)
|
|
|
|
type objectSvc struct {
|
|
put *putsvcV2.Service
|
|
|
|
search *searchsvcV2.Service
|
|
|
|
get *getsvcV2.Service
|
|
|
|
delete *deletesvcV2.Service
|
|
}
|
|
|
|
func (c *cfg) MaxObjectSize() uint64 {
|
|
sz, err := c.cfgNetmap.wrapper.MaxObjectSize()
|
|
if err != nil {
|
|
c.log.Error("could not get max object size value",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
}
|
|
|
|
return sz
|
|
}
|
|
|
|
func (s *objectSvc) Put(ctx context.Context) (objectService.PutObjectStream, error) {
|
|
return s.put.Put(ctx)
|
|
}
|
|
|
|
func (s *objectSvc) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
|
return s.get.Head(ctx, req)
|
|
}
|
|
|
|
func (s *objectSvc) Search(req *object.SearchRequest, stream objectService.SearchStream) error {
|
|
return s.search.Search(req, stream)
|
|
}
|
|
|
|
func (s *objectSvc) Get(req *object.GetRequest, stream objectService.GetObjectStream) error {
|
|
return s.get.Get(req, stream)
|
|
}
|
|
|
|
func (s *objectSvc) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
|
|
return s.delete.Delete(ctx, req)
|
|
}
|
|
|
|
func (s *objectSvc) GetRange(req *object.GetRangeRequest, stream objectService.GetObjectRangeStream) error {
|
|
return s.get.GetRange(req, stream)
|
|
}
|
|
|
|
func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
|
|
return s.get.GetRangeHash(ctx, req)
|
|
}
|
|
|
|
type localObjectRemover struct {
|
|
storage *engine.StorageEngine
|
|
|
|
log *logger.Logger
|
|
}
|
|
|
|
type localObjectInhumer struct {
|
|
storage *engine.StorageEngine
|
|
|
|
log *logger.Logger
|
|
}
|
|
|
|
func (r *localObjectRemover) Delete(addr ...*objectSDK.Address) error {
|
|
_, err := r.storage.Delete(new(engine.DeletePrm).
|
|
WithAddresses(addr...),
|
|
)
|
|
|
|
return err
|
|
}
|
|
|
|
func (r *localObjectInhumer) DeleteObjects(ts *objectSDK.Address, addr ...*objectSDK.Address) {
|
|
prm := new(engine.InhumePrm)
|
|
|
|
for _, a := range addr {
|
|
prm.WithTarget(ts, a)
|
|
|
|
if _, err := r.storage.Inhume(prm); err != nil {
|
|
r.log.Error("could not delete object",
|
|
zap.Stringer("address", a),
|
|
zap.String("error", err.Error()),
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
type delNetInfo struct {
|
|
netmap.State
|
|
|
|
tsLifetime uint64
|
|
}
|
|
|
|
func (i *delNetInfo) TombstoneLifetime() (uint64, error) {
|
|
return i.tsLifetime, nil
|
|
}
|
|
|
|
func initObjectService(c *cfg) {
|
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
|
keyStorage := util.NewKeyStorage(c.key, c.privateTokenStore)
|
|
nodeOwner := owner.NewID()
|
|
|
|
neo3Wallet, err := owner.NEO3WalletFromPublicKey(&c.key.PublicKey)
|
|
fatalOnErr(err)
|
|
|
|
nodeOwner.SetNeo3Wallet(neo3Wallet)
|
|
|
|
clientCache := cache.NewSDKClientCache(
|
|
client.WithDialTimeout(c.viper.GetDuration(cfgDialTimeout)))
|
|
|
|
clientConstructor := &reputationClientConstructor{
|
|
log: c.log,
|
|
nmSrc: c.cfgObject.netMapStorage,
|
|
netState: c.cfgNetmap.state,
|
|
trustStorage: c.cfgReputation.localTrustStorage,
|
|
basicConstructor: clientCache,
|
|
}
|
|
|
|
objRemover := &localObjectRemover{
|
|
storage: ls,
|
|
log: c.log,
|
|
}
|
|
|
|
objInhumer := &localObjectInhumer{
|
|
storage: ls,
|
|
log: c.log,
|
|
}
|
|
|
|
objGC := gc.New(
|
|
gc.WithLogger(c.log),
|
|
gc.WithRemover(objRemover),
|
|
gc.WithQueueCapacity(c.viper.GetUint32(cfgGCQueueSize)),
|
|
gc.WithSleepInterval(c.viper.GetDuration(cfgGCQueueTick)),
|
|
gc.WithWorkingInterval(c.viper.GetDuration(cfgGCTimeout)),
|
|
)
|
|
|
|
c.workers = append(c.workers, objGC)
|
|
|
|
repl := replicator.New(
|
|
replicator.WithLogger(c.log),
|
|
replicator.WithPutTimeout(
|
|
c.viper.GetDuration(cfgReplicatorPutTimeout),
|
|
),
|
|
replicator.WithLocalStorage(ls),
|
|
replicator.WithRemoteSender(
|
|
putsvc.NewRemoteSender(keyStorage, clientConstructor),
|
|
),
|
|
)
|
|
|
|
c.workers = append(c.workers, repl)
|
|
|
|
ch := make(chan *policer.Task, 1)
|
|
|
|
pol := policer.New(
|
|
policer.WithLogger(c.log),
|
|
policer.WithLocalStorage(ls),
|
|
policer.WithContainerSource(c.cfgObject.cnrStorage),
|
|
policer.WithPlacementBuilder(
|
|
placement.NewNetworkMapSourceBuilder(c.cfgObject.netMapStorage),
|
|
),
|
|
policer.WithWorkScope(
|
|
c.viper.GetInt(cfgPolicerWorkScope),
|
|
),
|
|
policer.WithExpansionRate(
|
|
c.viper.GetInt(cfgPolicerExpRate),
|
|
),
|
|
policer.WithTrigger(ch),
|
|
policer.WithRemoteHeader(
|
|
headsvc.NewRemoteHeader(keyStorage, clientConstructor),
|
|
),
|
|
policer.WithLocalAddressSource(c),
|
|
policer.WithHeadTimeout(
|
|
c.viper.GetDuration(cfgPolicerHeadTimeout),
|
|
),
|
|
policer.WithReplicator(repl),
|
|
policer.WithRedundantCopyCallback(func(addr *objectSDK.Address) {
|
|
_, err := ls.Inhume(new(engine.InhumePrm).MarkAsGarbage(addr))
|
|
if err != nil {
|
|
c.log.Warn("could not inhume mark redundant copy as garbage",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
}
|
|
}),
|
|
)
|
|
|
|
addNewEpochNotificationHandler(c, func(ev event.Event) {
|
|
select {
|
|
case ch <- new(policer.Task):
|
|
case <-c.ctx.Done():
|
|
close(ch)
|
|
default:
|
|
c.log.Info("policer is busy")
|
|
}
|
|
})
|
|
|
|
traverseGen := util.NewTraverserGenerator(c.cfgObject.netMapStorage, c.cfgObject.cnrStorage, c)
|
|
|
|
c.workers = append(c.workers, pol)
|
|
|
|
sPut := putsvc.NewService(
|
|
putsvc.WithKeyStorage(keyStorage),
|
|
putsvc.WithClientConstructor(clientConstructor),
|
|
putsvc.WithMaxSizeSource(c),
|
|
putsvc.WithLocalStorage(ls),
|
|
putsvc.WithContainerSource(c.cfgObject.cnrStorage),
|
|
putsvc.WithNetworkMapSource(c.cfgObject.netMapStorage),
|
|
putsvc.WithLocalAddressSource(c),
|
|
putsvc.WithFormatValidatorOpts(
|
|
objectCore.WithDeleteHandler(objInhumer),
|
|
),
|
|
putsvc.WithNetworkState(c.cfgNetmap.state),
|
|
putsvc.WithWorkerPool(c.cfgObject.pool.put),
|
|
putsvc.WithLogger(c.log),
|
|
)
|
|
|
|
sPutV2 := putsvcV2.NewService(
|
|
putsvcV2.WithInternalService(sPut),
|
|
)
|
|
|
|
sSearch := searchsvc.New(
|
|
searchsvc.WithLogger(c.log),
|
|
searchsvc.WithLocalStorageEngine(ls),
|
|
searchsvc.WithClientConstructor(clientConstructor),
|
|
searchsvc.WithTraverserGenerator(
|
|
traverseGen.WithTraverseOptions(
|
|
placement.WithoutSuccessTracking(),
|
|
),
|
|
),
|
|
searchsvc.WithNetMapSource(c.cfgNetmap.wrapper),
|
|
)
|
|
|
|
sSearchV2 := searchsvcV2.NewService(
|
|
searchsvcV2.WithInternalService(sSearch),
|
|
searchsvcV2.WithKeyStorage(keyStorage),
|
|
)
|
|
|
|
sGet := getsvc.New(
|
|
getsvc.WithLogger(c.log),
|
|
getsvc.WithLocalStorageEngine(ls),
|
|
getsvc.WithClientConstructor(clientConstructor),
|
|
getsvc.WithTraverserGenerator(
|
|
traverseGen.WithTraverseOptions(
|
|
placement.SuccessAfter(1),
|
|
),
|
|
),
|
|
getsvc.WithNetMapSource(c.cfgNetmap.wrapper),
|
|
)
|
|
|
|
sGetV2 := getsvcV2.NewService(
|
|
getsvcV2.WithInternalService(sGet),
|
|
getsvcV2.WithKeyStorage(keyStorage),
|
|
)
|
|
|
|
sDelete := deletesvc.New(
|
|
deletesvc.WithLogger(c.log),
|
|
deletesvc.WithHeadService(sGet),
|
|
deletesvc.WithSearchService(sSearch),
|
|
deletesvc.WithPutService(sPut),
|
|
deletesvc.WithNetworkInfo(&delNetInfo{
|
|
State: c.cfgNetmap.state,
|
|
tsLifetime: c.viper.GetUint64(cfgTombstoneLifetime),
|
|
}),
|
|
)
|
|
|
|
sDeleteV2 := deletesvcV2.NewService(
|
|
deletesvcV2.WithInternalService(sDelete),
|
|
deletesvcV2.WithKeyStorage(keyStorage),
|
|
)
|
|
|
|
// build service pipeline
|
|
// grpc | <metrics> | acl | signature | response | split
|
|
|
|
splitSvc := objectService.NewTransportSplitter(
|
|
c.cfgGRPC.maxChunkSize,
|
|
c.cfgGRPC.maxAddrAmount,
|
|
&objectSvc{
|
|
put: sPutV2,
|
|
search: sSearchV2,
|
|
get: sGetV2,
|
|
delete: sDeleteV2,
|
|
},
|
|
)
|
|
|
|
respSvc := objectService.NewResponseService(
|
|
splitSvc,
|
|
c.respSvc,
|
|
)
|
|
|
|
signSvc := objectService.NewSignService(
|
|
c.key,
|
|
respSvc,
|
|
)
|
|
|
|
aclSvc := acl.New(
|
|
acl.WithSenderClassifier(
|
|
acl.NewSenderClassifier(
|
|
c.log,
|
|
c.cfgNetmap.wrapper,
|
|
c.cfgNetmap.wrapper,
|
|
),
|
|
),
|
|
acl.WithContainerSource(
|
|
c.cfgObject.cnrStorage,
|
|
),
|
|
acl.WithNextService(signSvc),
|
|
acl.WithLocalStorage(ls),
|
|
acl.WithEACLValidatorOptions(
|
|
eacl.WithEACLStorage(newCachedEACLStorage(&morphEACLStorage{
|
|
w: c.cfgObject.cnrClient,
|
|
})),
|
|
eacl.WithLogger(c.log),
|
|
),
|
|
acl.WithNetmapState(c.cfgNetmap.state),
|
|
)
|
|
|
|
var firstSvc objectService.ServiceServer = aclSvc
|
|
if c.metricsCollector != nil {
|
|
firstSvc = objectService.NewMetricCollector(aclSvc, c.metricsCollector)
|
|
}
|
|
|
|
objectGRPC.RegisterObjectServiceServer(c.cfgGRPC.server,
|
|
objectTransportGRPC.New(firstSvc),
|
|
)
|
|
}
|
|
|
|
type morphEACLStorage struct {
|
|
w *wrapper.Wrapper
|
|
}
|
|
|
|
// signedEACLTable is eaclSDK.Table equipped with the ReadSignedData and
// SignedDataSize methods, so that a table can be passed as the data source
// to signature.VerifyDataWithSource.
type signedEACLTable eaclSDK.Table
|
|
|
|
func (s *signedEACLTable) ReadSignedData(buf []byte) ([]byte, error) {
|
|
return (*eaclSDK.Table)(s).Marshal(buf)
|
|
}
|
|
|
|
func (s *signedEACLTable) SignedDataSize() int {
|
|
// TODO: add eacl.Table.Size method
|
|
return (*eaclSDK.Table)(s).ToV2().StableSize()
|
|
}
|
|
|
|
func (s *morphEACLStorage) GetEACL(cid *container.ID) (*eaclSDK.Table, error) {
|
|
table, sig, err := s.w.GetEACL(cid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := signature.VerifyDataWithSource(
|
|
(*signedEACLTable)(table),
|
|
func() ([]byte, []byte) {
|
|
return sig.Key(), sig.Sign()
|
|
},
|
|
signature.SignWithRFC6979(),
|
|
); err != nil {
|
|
return nil, errors.Wrap(err, "incorrect signature")
|
|
}
|
|
|
|
return table, nil
|
|
}
|
|
|
|
type reputationClientConstructor struct {
|
|
log *logger.Logger
|
|
|
|
nmSrc netmap.Source
|
|
|
|
netState netmap.State
|
|
|
|
trustStorage *truststorage.Storage
|
|
|
|
basicConstructor interface {
|
|
Get(string) (client.Client, error)
|
|
}
|
|
}
|
|
|
|
type reputationClient struct {
|
|
client.Client
|
|
|
|
prm truststorage.UpdatePrm
|
|
|
|
cons *reputationClientConstructor
|
|
}
|
|
|
|
func (c *reputationClient) submitResult(err error) {
|
|
prm := c.prm
|
|
prm.SetSatisfactory(err == nil)
|
|
prm.SetEpoch(c.cons.netState.CurrentEpoch())
|
|
|
|
c.cons.trustStorage.Update(prm)
|
|
}
|
|
|
|
func (c *reputationClient) PutObject(ctx context.Context, prm *client.PutObjectParams, opts ...client.CallOption) (*objectSDK.ID, error) {
|
|
id, err := c.Client.PutObject(ctx, prm, opts...)
|
|
|
|
c.submitResult(err)
|
|
|
|
return id, err
|
|
}
|
|
|
|
func (c *reputationClient) DeleteObject(ctx context.Context, prm *client.DeleteObjectParams, opts ...client.CallOption) error {
|
|
err := c.Client.DeleteObject(ctx, prm, opts...)
|
|
|
|
c.submitResult(err)
|
|
|
|
return err
|
|
}
|
|
|
|
func (c *reputationClient) GetObject(ctx context.Context, prm *client.GetObjectParams, opts ...client.CallOption) (*objectSDK.Object, error) {
|
|
obj, err := c.Client.GetObject(ctx, prm, opts...)
|
|
|
|
c.submitResult(err)
|
|
|
|
return obj, err
|
|
}
|
|
|
|
func (c *reputationClient) GetObjectHeader(ctx context.Context, prm *client.ObjectHeaderParams, opts ...client.CallOption) (*objectSDK.Object, error) {
|
|
obj, err := c.Client.GetObjectHeader(ctx, prm, opts...)
|
|
|
|
c.submitResult(err)
|
|
|
|
return obj, err
|
|
}
|
|
|
|
func (c *reputationClient) ObjectPayloadRangeData(ctx context.Context, prm *client.RangeDataParams, opts ...client.CallOption) ([]byte, error) {
|
|
rng, err := c.Client.ObjectPayloadRangeData(ctx, prm, opts...)
|
|
|
|
c.submitResult(err)
|
|
|
|
return rng, err
|
|
}
|
|
|
|
func (c *reputationClient) ObjectPayloadRangeSHA256(ctx context.Context, prm *client.RangeChecksumParams, opts ...client.CallOption) ([][sha256.Size]byte, error) {
|
|
hashes, err := c.Client.ObjectPayloadRangeSHA256(ctx, prm, opts...)
|
|
|
|
c.submitResult(err)
|
|
|
|
return hashes, err
|
|
}
|
|
|
|
func (c *reputationClient) ObjectPayloadRangeTZ(ctx context.Context, prm *client.RangeChecksumParams, opts ...client.CallOption) ([][client.TZSize]byte, error) {
|
|
hashes, err := c.Client.ObjectPayloadRangeTZ(ctx, prm, opts...)
|
|
|
|
c.submitResult(err)
|
|
|
|
return hashes, err
|
|
}
|
|
|
|
func (c *reputationClient) SearchObject(ctx context.Context, prm *client.SearchObjectParams, opts ...client.CallOption) ([]*objectSDK.ID, error) {
|
|
ids, err := c.Client.SearchObject(ctx, prm, opts...)
|
|
|
|
c.submitResult(err)
|
|
|
|
return ids, err
|
|
}
|
|
|
|
func (c *reputationClientConstructor) Get(addr string) (client.Client, error) {
|
|
cl, err := c.basicConstructor.Get(addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
nm, err := netmap.GetLatestNetworkMap(c.nmSrc)
|
|
if err == nil {
|
|
for i := range nm.Nodes {
|
|
ipAddr, err := network.IPAddrFromMultiaddr(nm.Nodes[i].Address())
|
|
if err == nil {
|
|
if ipAddr == addr {
|
|
prm := truststorage.UpdatePrm{}
|
|
prm.SetPeer(reputation.PeerIDFromBytes(nm.Nodes[i].PublicKey()))
|
|
|
|
return &reputationClient{
|
|
Client: cl,
|
|
prm: prm,
|
|
cons: c,
|
|
}, nil
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
c.log.Warn("could not get latest network map to overload the client",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
}
|
|
|
|
return cl, nil
|
|
}
|