forked from TrueCloudLab/frostfs-node
513 lines
16 KiB
Go
513 lines
16 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
|
|
metricsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/metrics"
|
|
policerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/policer"
|
|
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
|
morphClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
|
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache"
|
|
objectTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/object/grpc"
|
|
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
|
v2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl/v2"
|
|
objectAPE "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/ape"
|
|
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
|
deletesvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete"
|
|
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
|
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
|
getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2"
|
|
patchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/patch"
|
|
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
|
putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2"
|
|
searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search"
|
|
searchsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search/v2"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/policer"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
|
objectGRPC "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object/grpc"
|
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
type objectSvc struct {
|
|
put *putsvcV2.Service
|
|
|
|
search *searchsvcV2.Service
|
|
|
|
get *getsvcV2.Service
|
|
|
|
delete *deletesvcV2.Service
|
|
|
|
patch *patchsvc.Service
|
|
}
|
|
|
|
func (c *cfg) MaxObjectSize() uint64 {
|
|
sz, err := c.cfgNetmap.wrapper.MaxObjectSize()
|
|
if err != nil {
|
|
c.log.Error(context.Background(), logs.FrostFSNodeCouldNotGetMaxObjectSizeValue,
|
|
zap.String("error", err.Error()),
|
|
)
|
|
}
|
|
|
|
return sz
|
|
}
|
|
|
|
func (s *objectSvc) Put(_ context.Context) (objectService.PutObjectStream, error) {
|
|
return s.put.Put()
|
|
}
|
|
|
|
func (s *objectSvc) Patch(_ context.Context) (objectService.PatchObjectStream, error) {
|
|
return s.patch.Patch()
|
|
}
|
|
|
|
func (s *objectSvc) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
|
return s.put.PutSingle(ctx, req)
|
|
}
|
|
|
|
func (s *objectSvc) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
|
return s.get.Head(ctx, req)
|
|
}
|
|
|
|
func (s *objectSvc) Search(req *object.SearchRequest, stream objectService.SearchStream) error {
|
|
return s.search.Search(req, stream)
|
|
}
|
|
|
|
func (s *objectSvc) Get(req *object.GetRequest, stream objectService.GetObjectStream) error {
|
|
return s.get.Get(req, stream)
|
|
}
|
|
|
|
func (s *objectSvc) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
|
|
return s.delete.Delete(ctx, req)
|
|
}
|
|
|
|
func (s *objectSvc) GetRange(req *object.GetRangeRequest, stream objectService.GetObjectRangeStream) error {
|
|
return s.get.GetRange(req, stream)
|
|
}
|
|
|
|
func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
|
|
return s.get.GetRangeHash(ctx, req)
|
|
}
|
|
|
|
type delNetInfo struct {
|
|
netmap.State
|
|
|
|
cfg *cfg
|
|
}
|
|
|
|
func (i *delNetInfo) TombstoneLifetime() (uint64, error) {
|
|
return i.cfg.cfgObject.tombstoneLifetime.Load(), nil
|
|
}
|
|
|
|
// LocalNodeID returns node owner ID calculated from configured private key.
|
|
//
|
|
// Implements method needed for Object.Delete service.
|
|
func (i *delNetInfo) LocalNodeID() user.ID {
|
|
return i.cfg.ownerIDFromKey
|
|
}
|
|
|
|
type innerRingFetcherWithNotary struct {
|
|
sidechain *morphClient.Client
|
|
}
|
|
|
|
func (fn *innerRingFetcherWithNotary) InnerRingKeys() ([][]byte, error) {
|
|
keys, err := fn.sidechain.NeoFSAlphabetList()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("can't get inner ring keys from alphabet role: %w", err)
|
|
}
|
|
|
|
result := make([][]byte, 0, len(keys))
|
|
for i := range keys {
|
|
result = append(result, keys[i].Bytes())
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
type innerRingFetcherWithoutNotary struct {
|
|
nm *nmClient.Client
|
|
}
|
|
|
|
func (f *innerRingFetcherWithoutNotary) InnerRingKeys() ([][]byte, error) {
|
|
keys, err := f.nm.GetInnerRingList()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("can't get inner ring keys from netmap contract: %w", err)
|
|
}
|
|
|
|
result := make([][]byte, 0, len(keys))
|
|
for i := range keys {
|
|
result = append(result, keys[i].Bytes())
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func initObjectService(c *cfg) {
|
|
keyStorage := util.NewKeyStorage(&c.key.PrivateKey, c.privateTokenStore, c.cfgNetmap.state)
|
|
|
|
c.replicator = createReplicator(c, keyStorage, c.bgClientCache)
|
|
|
|
addPolicer(c, keyStorage, c.bgClientCache)
|
|
|
|
traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)
|
|
irFetcher := newCachedIRFetcher(createInnerRingFetcher(c))
|
|
|
|
sPut := createPutSvc(c, keyStorage, &irFetcher)
|
|
|
|
sPutV2 := createPutSvcV2(sPut, keyStorage)
|
|
|
|
sSearch := createSearchSvc(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
|
|
|
|
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
|
|
|
|
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource,
|
|
c.ObjectCfg.priorityMetrics)
|
|
|
|
*c.cfgObject.getSvc = *sGet // need smth better
|
|
|
|
sGetV2 := createGetServiceV2(c, sGet, keyStorage)
|
|
|
|
sDelete := createDeleteService(c, keyStorage, sGet, sSearch, sPut)
|
|
|
|
sDeleteV2 := createDeleteServiceV2(sDelete)
|
|
|
|
sPatch := createPatchSvc(sGet, sPut)
|
|
|
|
// build service pipeline
|
|
// grpc | audit | <metrics> | signature | response | acl | ape | split
|
|
|
|
splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2, sPatch)
|
|
|
|
apeSvc := createAPEService(c, splitSvc)
|
|
|
|
aclSvc := createACLServiceV2(c, apeSvc, &irFetcher)
|
|
|
|
var commonSvc objectService.Common
|
|
commonSvc.Init(&c.internals, aclSvc)
|
|
|
|
respSvc := objectService.NewResponseService(
|
|
&commonSvc,
|
|
c.respSvc,
|
|
)
|
|
|
|
signSvc := objectService.NewSignService(
|
|
&c.key.PrivateKey,
|
|
respSvc,
|
|
)
|
|
|
|
c.shared.metricsSvc = objectService.NewMetricCollector(
|
|
signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg))
|
|
auditSvc := objectService.NewAuditService(c.shared.metricsSvc, c.log, c.audit)
|
|
server := objectTransportGRPC.New(auditSvc)
|
|
|
|
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
|
objectGRPC.RegisterObjectServiceServer(s, server)
|
|
|
|
// TODO(@aarifullin): #1487 remove the dual service support.
|
|
s.RegisterService(frostFSServiceDesc(objectGRPC.ObjectService_ServiceDesc), server)
|
|
})
|
|
}
|
|
|
|
func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.ClientCache) {
|
|
if policerconfig.UnsafeDisable(c.appCfg) {
|
|
c.log.Warn(context.Background(), logs.FrostFSNodePolicerIsDisabled)
|
|
return
|
|
}
|
|
|
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
|
|
|
buryFn := func(ctx context.Context, addr oid.Address) error {
|
|
var prm engine.InhumePrm
|
|
prm.MarkAsGarbage(addr)
|
|
prm.WithForceRemoval()
|
|
|
|
_, err := ls.Inhume(ctx, prm)
|
|
return err
|
|
}
|
|
|
|
remoteReader := objectService.NewRemoteReader(keyStorage, clientConstructor)
|
|
|
|
pol := policer.New(
|
|
policer.WithLogger(c.log),
|
|
policer.WithKeySpaceIterator(&keySpaceIterator{ng: ls}),
|
|
policer.WithBuryFunc(buryFn),
|
|
policer.WithContainerSource(c.cfgObject.cnrSource),
|
|
policer.WithPlacementBuilder(
|
|
placement.NewNetworkMapSourceBuilder(c.netMapSource),
|
|
),
|
|
policer.WithRemoteObjectHeaderFunc(
|
|
func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
|
prm := new(objectService.RemoteRequestPrm).WithNodeInfo(ni).WithObjectAddress(a).WithRaw(raw)
|
|
return remoteReader.Head(ctx, prm)
|
|
},
|
|
),
|
|
policer.WithLocalObjectHeaderFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
|
|
var prm engine.HeadPrm
|
|
prm.WithAddress(a)
|
|
res, err := c.cfgObject.cfgLocalStorage.localStorage.Head(ctx, prm)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return res.Header(), nil
|
|
}),
|
|
policer.WithRemoteObjectGetFunc(func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
|
|
prm := new(objectService.RemoteRequestPrm).WithNodeInfo(ni).WithObjectAddress(a)
|
|
return remoteReader.Get(ctx, prm)
|
|
}),
|
|
policer.WithLocalObjectGetFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
|
|
var prm engine.GetPrm
|
|
prm.WithAddress(a)
|
|
res, err := c.cfgObject.cfgLocalStorage.localStorage.Get(ctx, prm)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return res.Object(), nil
|
|
}),
|
|
policer.WithNetmapKeys(c),
|
|
policer.WithHeadTimeout(
|
|
policerconfig.HeadTimeout(c.appCfg),
|
|
),
|
|
policer.WithReplicator(c.replicator),
|
|
policer.WithRedundantCopyCallback(func(ctx context.Context, addr oid.Address) {
|
|
var inhumePrm engine.InhumePrm
|
|
inhumePrm.MarkAsGarbage(addr)
|
|
|
|
_, err := ls.Inhume(ctx, inhumePrm)
|
|
if err != nil {
|
|
c.log.Warn(ctx, logs.FrostFSNodeCouldNotInhumeMarkRedundantCopyAsGarbage,
|
|
zap.String("error", err.Error()),
|
|
)
|
|
}
|
|
}),
|
|
policer.WithPool(c.cfgObject.pool.replication),
|
|
policer.WithMetrics(c.metricsCollector.PolicerMetrics()),
|
|
policer.WithKeyStorage(keyStorage),
|
|
)
|
|
|
|
c.workers = append(c.workers, worker{
|
|
fn: func(ctx context.Context) {
|
|
pol.Run(ctx)
|
|
},
|
|
})
|
|
}
|
|
|
|
func createInnerRingFetcher(c *cfg) v2.InnerRingFetcher {
|
|
if c.cfgMorph.client.ProbeNotary() {
|
|
return &innerRingFetcherWithNotary{
|
|
sidechain: c.cfgMorph.client,
|
|
}
|
|
}
|
|
return &innerRingFetcherWithoutNotary{
|
|
nm: c.cfgNetmap.wrapper,
|
|
}
|
|
}
|
|
|
|
func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCache) *replicator.Replicator {
|
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
|
|
|
return replicator.New(
|
|
replicator.WithLogger(c.log),
|
|
replicator.WithPutTimeout(
|
|
replicatorconfig.PutTimeout(c.appCfg),
|
|
),
|
|
replicator.WithLocalStorage(ls),
|
|
replicator.WithRemoteSender(
|
|
objectwriter.NewRemoteSender(keyStorage, cache),
|
|
),
|
|
replicator.WithRemoteGetter(
|
|
getsvc.NewRemoteGetter(c.clientCache, c.netMapSource, keyStorage),
|
|
),
|
|
replicator.WithMetrics(c.metricsCollector.Replicator()),
|
|
)
|
|
}
|
|
|
|
func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetcher) *putsvc.Service {
|
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
|
|
|
var os objectwriter.ObjectStorage = engineWithoutNotifications{
|
|
engine: ls,
|
|
}
|
|
|
|
return putsvc.NewService(
|
|
keyStorage,
|
|
c.putClientCache,
|
|
newCachedMaxObjectSizeSource(c),
|
|
os,
|
|
c.cfgObject.cnrSource,
|
|
c.netMapSource,
|
|
c,
|
|
c.cfgNetmap.state,
|
|
irFetcher,
|
|
objectwriter.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
|
|
objectwriter.WithLogger(c.log),
|
|
objectwriter.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification),
|
|
)
|
|
}
|
|
|
|
func createPutSvcV2(sPut *putsvc.Service, keyStorage *util.KeyStorage) *putsvcV2.Service {
|
|
return putsvcV2.NewService(sPut, keyStorage)
|
|
}
|
|
|
|
func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service) *patchsvc.Service {
|
|
return patchsvc.NewService(sPut.Config, sGet)
|
|
}
|
|
|
|
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache, containerSource containercore.Source) *searchsvc.Service {
|
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
|
|
|
return searchsvc.New(
|
|
ls,
|
|
coreConstructor,
|
|
traverseGen.WithTraverseOptions(
|
|
placement.WithoutSuccessTracking(),
|
|
),
|
|
c.netMapSource,
|
|
keyStorage,
|
|
containerSource,
|
|
searchsvc.WithLogger(c.log),
|
|
)
|
|
}
|
|
|
|
func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage) *searchsvcV2.Service {
|
|
return searchsvcV2.NewService(sSearch, keyStorage)
|
|
}
|
|
|
|
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
|
|
coreConstructor *cache.ClientCache,
|
|
containerSource containercore.Source,
|
|
priorityMetrics []placement.Metric,
|
|
) *getsvc.Service {
|
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
|
|
|
return getsvc.New(
|
|
keyStorage,
|
|
c.netMapSource,
|
|
ls,
|
|
traverseGen.WithTraverseOptions(
|
|
placement.SuccessAfter(1),
|
|
placement.WithPriorityMetrics(priorityMetrics),
|
|
placement.WithNodeState(c),
|
|
),
|
|
coreConstructor,
|
|
containerSource,
|
|
getsvc.WithLogger(c.log))
|
|
}
|
|
|
|
func createGetServiceV2(c *cfg, sGet *getsvc.Service, keyStorage *util.KeyStorage) *getsvcV2.Service {
|
|
return getsvcV2.NewService(
|
|
sGet,
|
|
keyStorage,
|
|
c.clientCache,
|
|
c.netMapSource,
|
|
c,
|
|
c.cfgObject.cnrSource,
|
|
getsvcV2.WithLogger(c.log),
|
|
)
|
|
}
|
|
|
|
func createDeleteService(c *cfg, keyStorage *util.KeyStorage, sGet *getsvc.Service,
|
|
sSearch *searchsvc.Service, sPut *putsvc.Service,
|
|
) *deletesvc.Service {
|
|
return deletesvc.New(
|
|
sGet,
|
|
sSearch,
|
|
sPut,
|
|
&delNetInfo{
|
|
State: c.cfgNetmap.state,
|
|
|
|
cfg: c,
|
|
},
|
|
keyStorage,
|
|
deletesvc.WithLogger(c.log),
|
|
)
|
|
}
|
|
|
|
func createDeleteServiceV2(sDelete *deletesvc.Service) *deletesvcV2.Service {
|
|
return deletesvcV2.NewService(sDelete)
|
|
}
|
|
|
|
func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Service,
|
|
sSearchV2 *searchsvcV2.Service, sDeleteV2 *deletesvcV2.Service, sPatch *patchsvc.Service,
|
|
) *objectService.TransportSplitter {
|
|
return objectService.NewTransportSplitter(
|
|
c.cfgGRPC.maxChunkSize,
|
|
c.cfgGRPC.maxAddrAmount,
|
|
&objectSvc{
|
|
put: sPutV2,
|
|
search: sSearchV2,
|
|
get: sGetV2,
|
|
delete: sDeleteV2,
|
|
patch: sPatch,
|
|
},
|
|
)
|
|
}
|
|
|
|
func createACLServiceV2(c *cfg, apeSvc *objectAPE.Service, irFetcher *cachedIRFetcher) v2.Service {
|
|
return v2.New(
|
|
apeSvc,
|
|
c.netMapSource,
|
|
irFetcher,
|
|
c.cfgObject.cnrSource,
|
|
v2.WithLogger(c.log),
|
|
)
|
|
}
|
|
|
|
func createAPEService(c *cfg, splitSvc *objectService.TransportSplitter) *objectAPE.Service {
|
|
return objectAPE.NewService(
|
|
objectAPE.NewChecker(
|
|
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage(),
|
|
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage(),
|
|
objectAPE.NewStorageEngineHeaderProvider(c.cfgObject.cfgLocalStorage.localStorage, c.cfgObject.getSvc),
|
|
c.shared.frostfsidClient,
|
|
c.netMapSource,
|
|
c.cfgNetmap.state,
|
|
c.cfgObject.cnrSource,
|
|
c.binPublicKey,
|
|
),
|
|
splitSvc,
|
|
)
|
|
}
|
|
|
|
type engineWithoutNotifications struct {
|
|
engine *engine.StorageEngine
|
|
}
|
|
|
|
func (e engineWithoutNotifications) IsLocked(ctx context.Context, address oid.Address) (bool, error) {
|
|
return e.engine.IsLocked(ctx, address)
|
|
}
|
|
|
|
func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error {
|
|
var prm engine.InhumePrm
|
|
|
|
addrs := make([]oid.Address, len(toDelete))
|
|
for i := range addrs {
|
|
addrs[i].SetContainer(tombstone.Container())
|
|
addrs[i].SetObject(toDelete[i])
|
|
}
|
|
|
|
prm.WithTarget(tombstone, addrs...)
|
|
|
|
_, err := e.engine.Inhume(ctx, prm)
|
|
return err
|
|
}
|
|
|
|
func (e engineWithoutNotifications) Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error {
|
|
return e.engine.Lock(ctx, locker.Container(), locker.Object(), toLock)
|
|
}
|
|
|
|
func (e engineWithoutNotifications) Put(ctx context.Context, o *objectSDK.Object, indexedContainer bool) error {
|
|
return engine.Put(ctx, e.engine, o, indexedContainer)
|
|
}
|