frostfs-node/cmd/frostfs-node/object.go

546 lines
16 KiB
Go

package main
import (
"context"
"errors"
"fmt"
"net"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
objectGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object/grpc"
metricsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/metrics"
policerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/policer"
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
morphClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache"
objectTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/object/grpc"
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl"
v2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl/v2"
objectAPE "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/ape"
deletesvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete"
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2"
headsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/head"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2"
searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search"
searchsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search/v2"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/policer"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
eaclSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap"
"google.golang.org/grpc"
)
type objectSvc struct {
put *putsvcV2.Service
search *searchsvcV2.Service
get *getsvcV2.Service
delete *deletesvcV2.Service
}
func (c *cfg) MaxObjectSize() uint64 {
sz, err := c.cfgNetmap.wrapper.MaxObjectSize()
if err != nil {
c.log.Error(logs.FrostFSNodeCouldNotGetMaxObjectSizeValue,
zap.String("error", err.Error()),
)
}
return sz
}
func (s *objectSvc) Put() (objectService.PutObjectStream, error) {
return s.put.Put()
}
func (s *objectSvc) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
return s.put.PutSingle(ctx, req)
}
func (s *objectSvc) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
return s.get.Head(ctx, req)
}
func (s *objectSvc) Search(req *object.SearchRequest, stream objectService.SearchStream) error {
return s.search.Search(req, stream)
}
func (s *objectSvc) Get(req *object.GetRequest, stream objectService.GetObjectStream) error {
return s.get.Get(req, stream)
}
func (s *objectSvc) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
return s.delete.Delete(ctx, req)
}
func (s *objectSvc) GetRange(req *object.GetRangeRequest, stream objectService.GetObjectRangeStream) error {
return s.get.GetRange(req, stream)
}
func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
return s.get.GetRangeHash(ctx, req)
}
type delNetInfo struct {
netmap.State
tsLifetime uint64
cfg *cfg
}
func (i *delNetInfo) TombstoneLifetime() (uint64, error) {
return i.tsLifetime, nil
}
// returns node owner ID calculated from configured private key.
//
// Implements method needed for Object.Delete service.
func (i *delNetInfo) LocalNodeID() user.ID {
return i.cfg.ownerIDFromKey
}
type innerRingFetcherWithNotary struct {
sidechain *morphClient.Client
}
func (fn *innerRingFetcherWithNotary) InnerRingKeys() ([][]byte, error) {
keys, err := fn.sidechain.NeoFSAlphabetList()
if err != nil {
return nil, fmt.Errorf("can't get inner ring keys from alphabet role: %w", err)
}
result := make([][]byte, 0, len(keys))
for i := range keys {
result = append(result, keys[i].Bytes())
}
return result, nil
}
type innerRingFetcherWithoutNotary struct {
nm *nmClient.Client
}
func (f *innerRingFetcherWithoutNotary) InnerRingKeys() ([][]byte, error) {
keys, err := f.nm.GetInnerRingList()
if err != nil {
return nil, fmt.Errorf("can't get inner ring keys from netmap contract: %w", err)
}
result := make([][]byte, 0, len(keys))
for i := range keys {
result = append(result, keys[i].Bytes())
}
return result, nil
}
func initObjectService(c *cfg) {
keyStorage := util.NewKeyStorage(&c.key.PrivateKey, c.privateTokenStore, c.cfgNetmap.state)
c.replicator = createReplicator(c, keyStorage, c.bgClientCache)
addPolicer(c, keyStorage, c.bgClientCache)
traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)
irFetcher := newCachedIRFetcher(createInnerRingFetcher(c))
sPut := createPutSvc(c, keyStorage, &irFetcher)
sPutV2 := createPutSvcV2(sPut, keyStorage)
sSearch := createSearchSvc(c, keyStorage, traverseGen, c.clientCache)
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache)
*c.cfgObject.getSvc = *sGet // need smth better
sGetV2 := createGetServiceV2(c, sGet, keyStorage)
sDelete := createDeleteService(c, keyStorage, sGet, sSearch, sPut)
sDeleteV2 := createDeleteServiceV2(sDelete)
// build service pipeline
// grpc | <metrics> | signature | response | acl | ape | split
splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2)
apeSvc := createAPEService(c, splitSvc)
aclSvc := createACLServiceV2(c, apeSvc, &irFetcher)
var commonSvc objectService.Common
commonSvc.Init(&c.internals, aclSvc)
respSvc := objectService.NewResponseService(
&commonSvc,
c.respSvc,
)
signSvc := objectService.NewSignService(
&c.key.PrivateKey,
respSvc,
)
c.shared.metricsSvc = objectService.NewMetricCollector(
signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg))
server := objectTransportGRPC.New(c.shared.metricsSvc)
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
objectGRPC.RegisterObjectServiceServer(s, server)
})
}
func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.ClientCache) {
if policerconfig.UnsafeDisable(c.appCfg) {
c.log.Warn(logs.FrostFSNodePolicerIsDisabled)
return
}
ls := c.cfgObject.cfgLocalStorage.localStorage
buryFn := func(ctx context.Context, addr oid.Address) error {
var prm engine.InhumePrm
prm.MarkAsGarbage(addr)
prm.WithForceRemoval()
_, err := ls.Inhume(ctx, prm)
return err
}
remoteHeader := headsvc.NewRemoteHeader(keyStorage, clientConstructor)
pol := policer.New(
policer.WithLogger(c.log),
policer.WithKeySpaceIterator(&keySpaceIterator{ng: ls}),
policer.WithBuryFunc(buryFn),
policer.WithContainerSource(c.cfgObject.cnrSource),
policer.WithPlacementBuilder(
placement.NewNetworkMapSourceBuilder(c.netMapSource),
),
policer.WithRemoteObjectHeaderFunc(
func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
prm := new(headsvc.RemoteHeadPrm).WithNodeInfo(ni).WithObjectAddress(a)
return remoteHeader.Head(ctx, prm)
},
),
policer.WithNetmapKeys(c),
policer.WithHeadTimeout(
policerconfig.HeadTimeout(c.appCfg),
),
policer.WithReplicator(c.replicator),
policer.WithRedundantCopyCallback(func(ctx context.Context, addr oid.Address) {
var inhumePrm engine.InhumePrm
inhumePrm.MarkAsGarbage(addr)
_, err := ls.Inhume(ctx, inhumePrm)
if err != nil {
c.log.Warn(logs.FrostFSNodeCouldNotInhumeMarkRedundantCopyAsGarbage,
zap.String("error", err.Error()),
)
}
}),
policer.WithPool(c.cfgObject.pool.replication),
policer.WithMetrics(c.metricsCollector.PolicerMetrics()),
)
c.workers = append(c.workers, worker{
fn: func(ctx context.Context) {
pol.Run(ctx)
},
})
}
func createInnerRingFetcher(c *cfg) v2.InnerRingFetcher {
if c.cfgMorph.client.ProbeNotary() {
return &innerRingFetcherWithNotary{
sidechain: c.cfgMorph.client,
}
}
return &innerRingFetcherWithoutNotary{
nm: c.cfgNetmap.wrapper,
}
}
func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCache) *replicator.Replicator {
ls := c.cfgObject.cfgLocalStorage.localStorage
return replicator.New(
replicator.WithLogger(c.log),
replicator.WithPutTimeout(
replicatorconfig.PutTimeout(c.appCfg),
),
replicator.WithLocalStorage(ls),
replicator.WithRemoteSender(
putsvc.NewRemoteSender(keyStorage, cache),
),
replicator.WithMetrics(c.metricsCollector.Replicator()),
)
}
func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetcher) *putsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage
var os putsvc.ObjectStorage = engineWithoutNotifications{
engine: ls,
}
if c.cfgNotifications.enabled {
os = engineWithNotifications{
base: os,
nw: c.cfgNotifications.nw,
ns: c.cfgNetmap.state,
defaultTopic: c.cfgNotifications.defaultTopic,
}
}
return putsvc.NewService(
keyStorage,
c.putClientCache,
newCachedMaxObjectSizeSource(c),
os,
c.cfgObject.cnrSource,
c.netMapSource,
c,
c.cfgNetmap.state,
irFetcher,
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
putsvc.WithLogger(c.log),
putsvc.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification),
)
}
func createPutSvcV2(sPut *putsvc.Service, keyStorage *util.KeyStorage) *putsvcV2.Service {
return putsvcV2.NewService(sPut, keyStorage)
}
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage
return searchsvc.New(
ls,
coreConstructor,
traverseGen.WithTraverseOptions(
placement.WithoutSuccessTracking(),
),
c.netMapSource,
keyStorage,
searchsvc.WithLogger(c.log),
)
}
func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage) *searchsvcV2.Service {
return searchsvcV2.NewService(sSearch, keyStorage)
}
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
coreConstructor *cache.ClientCache,
) *getsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage
return getsvc.New(
keyStorage,
c.netMapSource,
ls,
traverseGen.WithTraverseOptions(
placement.SuccessAfter(1),
),
coreConstructor,
getsvc.WithLogger(c.log))
}
func createGetServiceV2(c *cfg, sGet *getsvc.Service, keyStorage *util.KeyStorage) *getsvcV2.Service {
return getsvcV2.NewService(
sGet,
keyStorage,
c.clientCache,
c.netMapSource,
c,
c.cfgObject.cnrSource,
getsvcV2.WithLogger(c.log),
)
}
func createDeleteService(c *cfg, keyStorage *util.KeyStorage, sGet *getsvc.Service,
sSearch *searchsvc.Service, sPut *putsvc.Service,
) *deletesvc.Service {
return deletesvc.New(
sGet,
sSearch,
sPut,
&delNetInfo{
State: c.cfgNetmap.state,
tsLifetime: c.cfgObject.tombstoneLifetime,
cfg: c,
},
keyStorage,
deletesvc.WithLogger(c.log),
)
}
func createDeleteServiceV2(sDelete *deletesvc.Service) *deletesvcV2.Service {
return deletesvcV2.NewService(sDelete)
}
func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Service,
sSearchV2 *searchsvcV2.Service, sDeleteV2 *deletesvcV2.Service,
) *objectService.TransportSplitter {
return objectService.NewTransportSplitter(
c.cfgGRPC.maxChunkSize,
c.cfgGRPC.maxAddrAmount,
&objectSvc{
put: sPutV2,
search: sSearchV2,
get: sGetV2,
delete: sDeleteV2,
},
)
}
func createACLServiceV2(c *cfg, apeSvc *objectAPE.Service, irFetcher *cachedIRFetcher) v2.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage
return v2.New(
apeSvc,
c.netMapSource,
irFetcher,
acl.NewChecker(
c.cfgNetmap.state,
c.cfgObject.eaclSource,
eaclSDK.NewValidator(),
ls),
c.cfgObject.cnrSource,
v2.WithLogger(c.log),
)
}
func createAPEService(c *cfg, splitSvc *objectService.TransportSplitter) *objectAPE.Service {
return objectAPE.NewService(
c.log,
objectAPE.NewChecker(
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.chainRouter,
objectAPE.NewStorageEngineHeaderProvider(c.cfgObject.cfgLocalStorage.localStorage),
),
splitSvc,
)
}
type morphEACLFetcher struct {
w *cntClient.Client
}
func (s *morphEACLFetcher) GetEACL(cnr cid.ID) (*containercore.EACL, error) {
eaclInfo, err := s.w.GetEACL(cnr)
if err != nil {
return nil, err
}
binTable, err := eaclInfo.Value.Marshal()
if err != nil {
return nil, fmt.Errorf("marshal eACL table: %w", err)
}
if !eaclInfo.Signature.Verify(binTable) {
// TODO(@cthulhu-rider): #468 use "const" error
return nil, errors.New("invalid signature of the eACL table")
}
return eaclInfo, nil
}
type engineWithNotifications struct {
base putsvc.ObjectStorage
nw notificationWriter
ns netmap.State
defaultTopic string
}
func (e engineWithNotifications) IsLocked(ctx context.Context, address oid.Address) (bool, error) {
return e.base.IsLocked(ctx, address)
}
func (e engineWithNotifications) Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error {
return e.base.Delete(ctx, tombstone, toDelete)
}
func (e engineWithNotifications) Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error {
return e.base.Lock(ctx, locker, toLock)
}
func (e engineWithNotifications) Put(ctx context.Context, o *objectSDK.Object) error {
if err := e.base.Put(ctx, o); err != nil {
return err
}
ni, err := o.NotificationInfo()
if err == nil {
if epoch := ni.Epoch(); epoch == 0 || epoch == e.ns.CurrentEpoch() {
topic := ni.Topic()
if topic == "" {
topic = e.defaultTopic
}
e.nw.Notify(topic, objectCore.AddressOf(o))
}
}
return nil
}
type engineWithoutNotifications struct {
engine *engine.StorageEngine
}
func (e engineWithoutNotifications) IsLocked(ctx context.Context, address oid.Address) (bool, error) {
return e.engine.IsLocked(ctx, address)
}
func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error {
var prm engine.InhumePrm
addrs := make([]oid.Address, len(toDelete))
for i := range addrs {
addrs[i].SetContainer(tombstone.Container())
addrs[i].SetObject(toDelete[i])
}
prm.WithTarget(tombstone, addrs...)
_, err := e.engine.Inhume(ctx, prm)
return err
}
func (e engineWithoutNotifications) Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error {
return e.engine.Lock(ctx, locker.Container(), locker.Object(), toLock)
}
func (e engineWithoutNotifications) Put(ctx context.Context, o *objectSDK.Object) error {
return engine.Put(ctx, e.engine, o)
}