frostfs-node/cmd/neofs-node/object.go
Leonard Lyubich fdd123ad98 [#607] network: Prevent potential panic in Address.Equal method
Make `Address.Equal` method to accept value instead of pointer in order to
prevent NPE.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2021-06-18 18:09:50 +03:00

525 lines
14 KiB
Go

package main
import (
"context"
"crypto/ecdsa"
"crypto/sha256"
"fmt"
eaclSDK "github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
"github.com/nspcc-dev/neofs-api-go/util/signature"
"github.com/nspcc-dev/neofs-api-go/v2/object"
objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
apiclientconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/apiclient"
policerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/policer"
replicatorconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/replicator"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
morphClient "github.com/nspcc-dev/neofs-node/pkg/morph/client"
"github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
objectTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc"
objectService "github.com/nspcc-dev/neofs-node/pkg/services/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object/acl"
"github.com/nspcc-dev/neofs-node/pkg/services/object/acl/eacl"
deletesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/delete"
deletesvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/delete/v2"
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
getsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/get/v2"
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
putsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/put/v2"
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
searchsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/v2"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/services/policer"
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
"github.com/nspcc-dev/neofs-node/pkg/services/reputation"
truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"go.uber.org/zap"
)
type objectSvc struct {
put *putsvcV2.Service
search *searchsvcV2.Service
get *getsvcV2.Service
delete *deletesvcV2.Service
}
func (c *cfg) MaxObjectSize() uint64 {
sz, err := c.cfgNetmap.wrapper.MaxObjectSize()
if err != nil {
c.log.Error("could not get max object size value",
zap.String("error", err.Error()),
)
}
return sz
}
func (s *objectSvc) Put(ctx context.Context) (objectService.PutObjectStream, error) {
return s.put.Put(ctx)
}
func (s *objectSvc) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
return s.get.Head(ctx, req)
}
func (s *objectSvc) Search(req *object.SearchRequest, stream objectService.SearchStream) error {
return s.search.Search(req, stream)
}
func (s *objectSvc) Get(req *object.GetRequest, stream objectService.GetObjectStream) error {
return s.get.Get(req, stream)
}
func (s *objectSvc) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
return s.delete.Delete(ctx, req)
}
func (s *objectSvc) GetRange(req *object.GetRangeRequest, stream objectService.GetObjectRangeStream) error {
return s.get.GetRange(req, stream)
}
func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
return s.get.GetRangeHash(ctx, req)
}
type localObjectInhumer struct {
storage *engine.StorageEngine
log *logger.Logger
}
func (r *localObjectInhumer) DeleteObjects(ts *objectSDK.Address, addr ...*objectSDK.Address) {
prm := new(engine.InhumePrm)
for _, a := range addr {
prm.WithTarget(ts, a)
if _, err := r.storage.Inhume(prm); err != nil {
r.log.Error("could not delete object",
zap.Stringer("address", a),
zap.String("error", err.Error()),
)
}
}
}
type delNetInfo struct {
netmap.State
tsLifetime uint64
}
func (i *delNetInfo) TombstoneLifetime() (uint64, error) {
return i.tsLifetime, nil
}
type innerRingFetcher struct {
sidechain *morphClient.Client
}
func (n *innerRingFetcher) InnerRingKeys() ([][]byte, error) {
keys, err := n.sidechain.NeoFSAlphabetList()
if err != nil {
return nil, fmt.Errorf("can't get inner ring keys: %w", err)
}
result := make([][]byte, 0, len(keys))
for i := range keys {
result = append(result, keys[i].Bytes())
}
return result, nil
}
func initObjectService(c *cfg) {
ls := c.cfgObject.cfgLocalStorage.localStorage
keyStorage := util.NewKeyStorage(&c.key.PrivateKey, c.privateTokenStore)
nodeOwner := owner.NewID()
neo3Wallet, err := owner.NEO3WalletFromPublicKey((*ecdsa.PublicKey)(c.key.PublicKey()))
fatalOnErr(err)
nodeOwner.SetNeo3Wallet(neo3Wallet)
clientCache := cache.NewSDKClientCache(
client.WithDialTimeout(apiclientconfig.DialTimeout(c.appCfg)))
c.onShutdown(clientCache.CloseAll)
clientConstructor := &reputationClientConstructor{
log: c.log,
nmSrc: c.cfgObject.netMapStorage,
netState: c.cfgNetmap.state,
trustStorage: c.cfgReputation.localTrustStorage,
basicConstructor: clientCache,
}
irFetcher := &innerRingFetcher{
sidechain: c.cfgMorph.client,
}
objInhumer := &localObjectInhumer{
storage: ls,
log: c.log,
}
repl := replicator.New(
replicator.WithLogger(c.log),
replicator.WithPutTimeout(
replicatorconfig.PutTimeout(c.appCfg),
),
replicator.WithLocalStorage(ls),
replicator.WithRemoteSender(
putsvc.NewRemoteSender(keyStorage, clientConstructor),
),
)
c.workers = append(c.workers, repl)
ch := make(chan *policer.Task, 1)
pol := policer.New(
policer.WithLogger(c.log),
policer.WithLocalStorage(ls),
policer.WithContainerSource(c.cfgObject.cnrStorage),
policer.WithPlacementBuilder(
placement.NewNetworkMapSourceBuilder(c.cfgObject.netMapStorage),
),
policer.WithWorkScope(100),
policer.WithExpansionRate(10),
policer.WithTrigger(ch),
policer.WithRemoteHeader(
headsvc.NewRemoteHeader(keyStorage, clientConstructor),
),
policer.WithLocalAddressSource(c),
policer.WithHeadTimeout(
policerconfig.HeadTimeout(c.appCfg),
),
policer.WithReplicator(repl),
policer.WithRedundantCopyCallback(func(addr *objectSDK.Address) {
_, err := ls.Inhume(new(engine.InhumePrm).MarkAsGarbage(addr))
if err != nil {
c.log.Warn("could not inhume mark redundant copy as garbage",
zap.String("error", err.Error()),
)
}
}),
)
addNewEpochNotificationHandler(c, func(ev event.Event) {
select {
case ch <- new(policer.Task):
case <-c.ctx.Done():
close(ch)
default:
c.log.Info("policer is busy")
}
})
traverseGen := util.NewTraverserGenerator(c.cfgObject.netMapStorage, c.cfgObject.cnrStorage, c)
c.workers = append(c.workers, pol)
sPut := putsvc.NewService(
putsvc.WithKeyStorage(keyStorage),
putsvc.WithClientConstructor(clientConstructor),
putsvc.WithMaxSizeSource(c),
putsvc.WithLocalStorage(ls),
putsvc.WithContainerSource(c.cfgObject.cnrStorage),
putsvc.WithNetworkMapSource(c.cfgObject.netMapStorage),
putsvc.WithLocalAddressSource(c),
putsvc.WithFormatValidatorOpts(
objectCore.WithDeleteHandler(objInhumer),
),
putsvc.WithNetworkState(c.cfgNetmap.state),
putsvc.WithWorkerPool(c.cfgObject.pool.put),
putsvc.WithLogger(c.log),
)
sPutV2 := putsvcV2.NewService(
putsvcV2.WithInternalService(sPut),
putsvcV2.WithKeyStorage(keyStorage),
)
sSearch := searchsvc.New(
searchsvc.WithLogger(c.log),
searchsvc.WithLocalStorageEngine(ls),
searchsvc.WithClientConstructor(clientConstructor),
searchsvc.WithTraverserGenerator(
traverseGen.WithTraverseOptions(
placement.WithoutSuccessTracking(),
),
),
searchsvc.WithNetMapSource(c.cfgNetmap.wrapper),
)
sSearchV2 := searchsvcV2.NewService(
searchsvcV2.WithInternalService(sSearch),
searchsvcV2.WithKeyStorage(keyStorage),
)
sGet := getsvc.New(
getsvc.WithLogger(c.log),
getsvc.WithLocalStorageEngine(ls),
getsvc.WithClientConstructor(clientConstructor),
getsvc.WithTraverserGenerator(
traverseGen.WithTraverseOptions(
placement.SuccessAfter(1),
),
),
getsvc.WithNetMapSource(c.cfgNetmap.wrapper),
)
sGetV2 := getsvcV2.NewService(
getsvcV2.WithInternalService(sGet),
getsvcV2.WithKeyStorage(keyStorage),
)
sDelete := deletesvc.New(
deletesvc.WithLogger(c.log),
deletesvc.WithHeadService(sGet),
deletesvc.WithSearchService(sSearch),
deletesvc.WithPutService(sPut),
deletesvc.WithNetworkInfo(&delNetInfo{
State: c.cfgNetmap.state,
tsLifetime: 5,
}),
)
sDeleteV2 := deletesvcV2.NewService(
deletesvcV2.WithInternalService(sDelete),
deletesvcV2.WithKeyStorage(keyStorage),
)
// build service pipeline
// grpc | <metrics> | acl | signature | response | split
splitSvc := objectService.NewTransportSplitter(
c.cfgGRPC.maxChunkSize,
c.cfgGRPC.maxAddrAmount,
&objectSvc{
put: sPutV2,
search: sSearchV2,
get: sGetV2,
delete: sDeleteV2,
},
)
respSvc := objectService.NewResponseService(
splitSvc,
c.respSvc,
)
signSvc := objectService.NewSignService(
&c.key.PrivateKey,
respSvc,
)
aclSvc := acl.New(
acl.WithSenderClassifier(
acl.NewSenderClassifier(
c.log,
irFetcher,
c.cfgNetmap.wrapper,
),
),
acl.WithContainerSource(
c.cfgObject.cnrStorage,
),
acl.WithNextService(signSvc),
acl.WithLocalStorage(ls),
acl.WithEACLValidatorOptions(
eacl.WithEACLStorage(newCachedEACLStorage(&morphEACLStorage{
w: c.cfgObject.cnrClient,
})),
eacl.WithLogger(c.log),
),
acl.WithNetmapState(c.cfgNetmap.state),
)
var firstSvc objectService.ServiceServer = aclSvc
if c.metricsCollector != nil {
firstSvc = objectService.NewMetricCollector(aclSvc, c.metricsCollector)
}
objectGRPC.RegisterObjectServiceServer(c.cfgGRPC.server,
objectTransportGRPC.New(firstSvc),
)
}
type morphEACLStorage struct {
w *wrapper.Wrapper
}
type signedEACLTable eaclSDK.Table
func (s *signedEACLTable) ReadSignedData(buf []byte) ([]byte, error) {
return (*eaclSDK.Table)(s).Marshal(buf)
}
func (s *signedEACLTable) SignedDataSize() int {
// TODO: add eacl.Table.Size method
return (*eaclSDK.Table)(s).ToV2().StableSize()
}
func (s *morphEACLStorage) GetEACL(cid *cid.ID) (*eaclSDK.Table, error) {
table, err := s.w.GetEACL(cid)
if err != nil {
return nil, err
}
sig := table.Signature()
if err := signature.VerifyDataWithSource(
(*signedEACLTable)(table),
func() ([]byte, []byte) {
return sig.Key(), sig.Sign()
},
signature.SignWithRFC6979(),
); err != nil {
return nil, fmt.Errorf("incorrect signature: %w", err)
}
return table, nil
}
type reputationClientConstructor struct {
log *logger.Logger
nmSrc netmap.Source
netState netmap.State
trustStorage *truststorage.Storage
basicConstructor interface {
Get(*network.Address) (client.Client, error)
}
}
type reputationClient struct {
client.Client
prm truststorage.UpdatePrm
cons *reputationClientConstructor
}
func (c *reputationClient) submitResult(err error) {
prm := c.prm
prm.SetSatisfactory(err == nil)
prm.SetEpoch(c.cons.netState.CurrentEpoch())
c.cons.trustStorage.Update(prm)
}
func (c *reputationClient) PutObject(ctx context.Context, prm *client.PutObjectParams, opts ...client.CallOption) (*objectSDK.ID, error) {
id, err := c.Client.PutObject(ctx, prm, opts...)
c.submitResult(err)
return id, err
}
func (c *reputationClient) DeleteObject(ctx context.Context, prm *client.DeleteObjectParams, opts ...client.CallOption) error {
err := c.Client.DeleteObject(ctx, prm, opts...)
c.submitResult(err)
return err
}
func (c *reputationClient) GetObject(ctx context.Context, prm *client.GetObjectParams, opts ...client.CallOption) (*objectSDK.Object, error) {
obj, err := c.Client.GetObject(ctx, prm, opts...)
c.submitResult(err)
return obj, err
}
func (c *reputationClient) GetObjectHeader(ctx context.Context, prm *client.ObjectHeaderParams, opts ...client.CallOption) (*objectSDK.Object, error) {
obj, err := c.Client.GetObjectHeader(ctx, prm, opts...)
c.submitResult(err)
return obj, err
}
func (c *reputationClient) ObjectPayloadRangeData(ctx context.Context, prm *client.RangeDataParams, opts ...client.CallOption) ([]byte, error) {
rng, err := c.Client.ObjectPayloadRangeData(ctx, prm, opts...)
c.submitResult(err)
return rng, err
}
func (c *reputationClient) ObjectPayloadRangeSHA256(ctx context.Context, prm *client.RangeChecksumParams, opts ...client.CallOption) ([][sha256.Size]byte, error) {
hashes, err := c.Client.ObjectPayloadRangeSHA256(ctx, prm, opts...)
c.submitResult(err)
return hashes, err
}
func (c *reputationClient) ObjectPayloadRangeTZ(ctx context.Context, prm *client.RangeChecksumParams, opts ...client.CallOption) ([][client.TZSize]byte, error) {
hashes, err := c.Client.ObjectPayloadRangeTZ(ctx, prm, opts...)
c.submitResult(err)
return hashes, err
}
func (c *reputationClient) SearchObject(ctx context.Context, prm *client.SearchObjectParams, opts ...client.CallOption) ([]*objectSDK.ID, error) {
ids, err := c.Client.SearchObject(ctx, prm, opts...)
c.submitResult(err)
return ids, err
}
func (c *reputationClientConstructor) Get(addr *network.Address) (client.Client, error) {
cl, err := c.basicConstructor.Get(addr)
if err != nil {
return nil, err
}
nm, err := netmap.GetLatestNetworkMap(c.nmSrc)
if err == nil {
for i := range nm.Nodes {
netAddr, err := network.AddressFromString(nm.Nodes[i].Address())
if err == nil {
if netAddr.Equal(*addr) {
prm := truststorage.UpdatePrm{}
prm.SetPeer(reputation.PeerIDFromBytes(nm.Nodes[i].PublicKey()))
return &reputationClient{
Client: cl,
prm: prm,
cons: c,
}, nil
}
}
}
} else {
c.log.Warn("could not get latest network map to overload the client",
zap.String("error", err.Error()),
)
}
return cl, nil
}