diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index ec1589c366..f30321839d 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -11,6 +11,8 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/netmap" crypto "github.com/nspcc-dev/neofs-crypto" "github.com/nspcc-dev/neofs-node/misc" + "github.com/nspcc-dev/neofs-node/pkg/core/container" + netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/nspcc-dev/neofs-node/pkg/network" tokenStorage "github.com/nspcc-dev/neofs-node/pkg/services/session/storage" @@ -78,6 +80,8 @@ type cfg struct { cfgNodeInfo cfgNodeInfo localAddr *network.Address + + cfgObject cfgObject } type cfgGRPC struct { @@ -115,6 +119,12 @@ type cfgNodeInfo struct { attributes []*netmap.Attribute } +type cfgObject struct { + netMapStorage netmapCore.Source + + cnrStorage container.Source +} + const ( _ BootstrapType = iota StorageNode diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index b287b012b7..70ea550d6e 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -6,22 +6,45 @@ import ( "sync" "github.com/mr-tron/base58" + "github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-api-go/v2/object" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" objectTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc" objectService "github.com/nspcc-dev/neofs-node/pkg/services/object" - headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head/v2" - putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put/v2" - searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search/v2" + deletesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/delete" + deletesvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/delete/v2" + getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" + getsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/get/v2" + headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" + headsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/head/v2" + putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" + putsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/put/v2" + rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range" + rangesvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/range/v2" + rangehashsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/rangehash" + rangehashsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/rangehash/v2" + searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" + searchsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/v2" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + "github.com/panjf2000/ants/v2" ) type objectSvc struct { - put *putsvc.Service + put *putsvcV2.Service - search *searchsvc.Service + search *searchsvcV2.Service - head *headsvc.Service + head *headsvcV2.Service + + get *getsvcV2.Service + + rng *rangesvcV2.Service + + rngHash *rangehashsvcV2.Service + + delete *deletesvcV2.Service } type inMemBucket struct { @@ -109,30 +132,136 @@ func (s *objectSvc) Search(ctx context.Context, req *object.SearchRequest) (obje return s.search.Search(ctx, req) } -func (*objectSvc) Get(context.Context, *object.GetRequest) (object.GetObjectStreamer, error) { - return nil, errors.New("unimplemented service call") +func (s *objectSvc) Get(ctx context.Context, req *object.GetRequest) (object.GetObjectStreamer, error) { + return s.get.Get(ctx, req) } -func (*objectSvc) Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error) { - return nil, errors.New("unimplemented service call") +func (s *objectSvc) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) { + return s.delete.Delete(ctx, req) } -func (*objectSvc) GetRange(context.Context, *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) { - return nil, errors.New("unimplemented service call") +func (s *objectSvc) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) { + return s.rng.GetRange(ctx, req) } -func (*objectSvc) GetRangeHash(context.Context, *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { - return nil, errors.New("unimplemented service call") +func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { + return s.rngHash.GetRangeHash(ctx, req) } func initObjectService(c *cfg) { - svc := new(objectSvc) + ls := localstore.New(newBucket(), newBucket()) + keyStorage := util.NewKeyStorage(c.key, c.privateTokenStore) + nodeOwner := owner.NewID() + + neo3Wallet, err := owner.NEO3WalletFromPublicKey(&c.key.PublicKey) + fatalOnErr(err) + + nodeOwner.SetNeo3Wallet(neo3Wallet) + + sPut := putsvc.NewService( + putsvc.WithKeyStorage(keyStorage), + putsvc.WithMaxSizeSource(&maxSzSrc{3}), + putsvc.WithLocalStorage(ls), + putsvc.WithContainerSource(c.cfgObject.cnrStorage), + putsvc.WithNetworkMapSource(c.cfgObject.netMapStorage), + putsvc.WithLocalAddressSource(c), + ) + + sPutV2 := putsvcV2.NewService( + putsvcV2.WithInternalService(sPut), + ) + + sSearch := searchsvc.NewService( + searchsvc.WithKeyStorage(keyStorage), + searchsvc.WithLocalStorage(ls), + searchsvc.WithContainerSource(c.cfgObject.cnrStorage), + searchsvc.WithNetworkMapSource(c.cfgObject.netMapStorage), + searchsvc.WithLocalAddressSource(c), + ) + + sSearchV2 := searchsvcV2.NewService( + searchsvcV2.WithInternalService(sSearch), + ) + + sHead := headsvc.NewService( + headsvc.WithKeyStorage(keyStorage), + headsvc.WithLocalStorage(ls), + headsvc.WithContainerSource(c.cfgObject.cnrStorage), + headsvc.WithNetworkMapSource(c.cfgObject.netMapStorage), + headsvc.WithLocalAddressSource(c), + headsvc.WithRightChildSearcher(searchsvc.NewRightChildSearcher(sSearch)), + ) + + sHeadV2 := headsvcV2.NewService( + headsvcV2.WithInternalService(sHead), + ) + + pool, err := ants.NewPool(10) + fatalOnErr(err) + + sRange := rangesvc.NewService( + rangesvc.WithKeyStorage(keyStorage), + rangesvc.WithLocalStorage(ls), + rangesvc.WithContainerSource(c.cfgObject.cnrStorage), + rangesvc.WithNetworkMapSource(c.cfgObject.netMapStorage), + rangesvc.WithLocalAddressSource(c), + rangesvc.WithWorkerPool(pool), + rangesvc.WithHeadService(sHead), + ) + + sRangeV2 := rangesvcV2.NewService( + rangesvcV2.WithInternalService(sRange), + ) + + sGet := getsvc.NewService( + getsvc.WithRangeService(sRange), + ) + + sGetV2 := getsvcV2.NewService( + getsvcV2.WithInternalService(sGet), + ) + + sRangeHash := rangehashsvc.NewService( + rangehashsvc.WithKeyStorage(keyStorage), + rangehashsvc.WithLocalStorage(ls), + rangehashsvc.WithContainerSource(c.cfgObject.cnrStorage), + rangehashsvc.WithNetworkMapSource(c.cfgObject.netMapStorage), + rangehashsvc.WithLocalAddressSource(c), + rangehashsvc.WithHeadService(sHead), + rangehashsvc.WithRangeService(sRange), + ) + + sRangeHashV2 := rangehashsvcV2.NewService( + rangehashsvcV2.WithInternalService(sRangeHash), + ) + + sDelete := deletesvc.NewService( + deletesvc.WithKeyStorage(keyStorage), + deletesvc.WitHeadService(sHead), + deletesvc.WithPutService(sPut), + deletesvc.WithOwnerID(nodeOwner), + deletesvc.WithLinkingHeader( + headsvc.NewRelationHeader(searchsvc.NewLinkingSearcher(sSearch), sHead), + ), + ) + + sDeleteV2 := deletesvcV2.NewService( + deletesvcV2.WithInternalService(sDelete), + ) objectGRPC.RegisterObjectServiceServer(c.cfgGRPC.server, objectTransportGRPC.New( objectService.NewSignService( c.key, - svc, + &objectSvc{ + put: sPutV2, + search: sSearchV2, + head: sHeadV2, + rng: sRangeV2, + get: sGetV2, + rngHash: sRangeHashV2, + delete: sDeleteV2, + }, ), ), )