From ec07fda97bd192db44f0fcc1b1311908ad9cd03e Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Fri, 1 Jul 2022 21:16:28 +0300 Subject: [PATCH] [#1402] pkg: Move `SGSource` to the `core` directory Signed-off-by: Pavel Karpy --- pkg/core/storagegroup/storagegroup.go | 48 +++++++++++++++ pkg/innerring/processors/audit/process.go | 15 ++--- pkg/innerring/processors/audit/processor.go | 67 +-------------------- pkg/innerring/rpc.go | 14 ++--- 4 files changed, 66 insertions(+), 78 deletions(-) create mode 100644 pkg/core/storagegroup/storagegroup.go diff --git a/pkg/core/storagegroup/storagegroup.go b/pkg/core/storagegroup/storagegroup.go new file mode 100644 index 000000000..06ab6ad3f --- /dev/null +++ b/pkg/core/storagegroup/storagegroup.go @@ -0,0 +1,48 @@ +package storagegroup + +import ( + "context" + + "github.com/nspcc-dev/neofs-node/pkg/core/client" + "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/neofs-sdk-go/storagegroup" +) + +// SearchSGPrm groups the parameters which are formed by Processor to search the storage group objects. +type SearchSGPrm struct { + Context context.Context + + Container cid.ID + + NodeInfo client.NodeInfo +} + +// SearchSGDst groups the target values which Processor expects from SG searching to process. +type SearchSGDst struct { + Objects []oid.ID +} + +// GetSGPrm groups parameter of GetSG operation. +type GetSGPrm struct { + Context context.Context + + OID oid.ID + CID cid.ID + + NetMap netmap.NetMap + Container [][]netmap.NodeInfo +} + +// SGSource is a storage group information source interface. +type SGSource interface { + // ListSG must list storage group objects in the container. Formed list must be written to destination. + // + // Must return any error encountered which did not allow to form the list. + ListSG(*SearchSGDst, SearchSGPrm) error + + // GetSG must return storage group object for the provided CID, OID, + // container and netmap state. + GetSG(GetSGPrm) (*storagegroup.StorageGroup, error) +} diff --git a/pkg/innerring/processors/audit/process.go b/pkg/innerring/processors/audit/process.go index f751cbe49..29596de52 100644 --- a/pkg/innerring/processors/audit/process.go +++ b/pkg/innerring/processors/audit/process.go @@ -7,6 +7,7 @@ import ( clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" netmapcore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + "github.com/nspcc-dev/neofs-node/pkg/core/storagegroup" cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" "github.com/nspcc-dev/neofs-node/pkg/services/audit" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" @@ -126,10 +127,10 @@ func (ap *Processor) findStorageGroups(cnr cid.ID, shuffled netmapcore.Nodes) [] var ( info clientcore.NodeInfo - prm SearchSGPrm + prm storagegroup.SearchSGPrm ) - prm.id = cnr + prm.Container = cnr for i := range shuffled { // consider iterating over some part of container log := ap.log.With( @@ -148,10 +149,10 @@ func (ap *Processor) findStorageGroups(cnr cid.ID, shuffled netmapcore.Nodes) [] ctx, cancel := context.WithTimeout(context.Background(), ap.searchTimeout) - prm.ctx = ctx - prm.info = info + prm.Context = ctx + prm.NodeInfo = info - var dst SearchSGDst + var dst storagegroup.SearchSGDst err = ap.sgSrc.ListSG(&dst, prm) @@ -162,7 +163,7 @@ func (ap *Processor) findStorageGroups(cnr cid.ID, shuffled netmapcore.Nodes) [] continue } - sg = append(sg, dst.ids...) + sg = append(sg, dst.Objects...) break // we found storage groups, so break loop } @@ -174,7 +175,7 @@ func (ap *Processor) filterExpiredSG(cid cid.ID, sgIDs []oid.ID, cnr [][]netmap.NodeInfo, nm netmap.NetMap) map[oid.ID]storagegroupSDK.StorageGroup { sgs := make(map[oid.ID]storagegroupSDK.StorageGroup, len(sgIDs)) - var getSGPrm GetSGPrm + var getSGPrm storagegroup.GetSGPrm getSGPrm.CID = cid getSGPrm.Container = cnr getSGPrm.NetMap = nm diff --git a/pkg/innerring/processors/audit/processor.go b/pkg/innerring/processors/audit/processor.go index a4c3d50cb..71923a59f 100644 --- a/pkg/innerring/processors/audit/processor.go +++ b/pkg/innerring/processors/audit/processor.go @@ -7,15 +7,11 @@ import ( "fmt" "time" - "github.com/nspcc-dev/neofs-node/pkg/core/client" + "github.com/nspcc-dev/neofs-node/pkg/core/storagegroup" cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" nmClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap" "github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/services/audit" - cid "github.com/nspcc-dev/neofs-sdk-go/container/id" - "github.com/nspcc-dev/neofs-sdk-go/netmap" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - storagegroupSDK "github.com/nspcc-dev/neofs-sdk-go/storagegroup" "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -47,7 +43,7 @@ type ( log *zap.Logger pool *ants.Pool irList Indexer - sgSrc SGSource + sgSrc storagegroup.SGSource epochSrc EpochSource searchTimeout time.Duration @@ -65,7 +61,7 @@ type ( NetmapClient *nmClient.Client ContainerClient *cntClient.Client IRList Indexer - SGSource SGSource + SGSource storagegroup.SGSource RPCSearchTimeout time.Duration TaskManager TaskManager Reporter audit.Reporter @@ -74,63 +70,6 @@ type ( } ) -// SearchSGPrm groups the parameters which are formed by Processor to search the storage group objects. -type SearchSGPrm struct { - ctx context.Context - - id cid.ID - - info client.NodeInfo -} - -// Context returns context to use for network communication. -func (x SearchSGPrm) Context() context.Context { - return x.ctx -} - -// CID returns the identifier of the container to search SG in. -func (x SearchSGPrm) CID() cid.ID { - return x.id -} - -// NodeInfo returns information about a storage node to communicate with. -func (x SearchSGPrm) NodeInfo() client.NodeInfo { - return x.info -} - -// SearchSGDst groups the target values which Processor expects from SG searching to process. -type SearchSGDst struct { - ids []oid.ID -} - -// WriteIDList writes a list of identifiers of storage group objects stored in the container. -func (x *SearchSGDst) WriteIDList(ids []oid.ID) { - x.ids = ids -} - -// GetSGPrm groups parameter of GetSG operation. -type GetSGPrm struct { - Context context.Context - - OID oid.ID - CID cid.ID - - NetMap netmap.NetMap - Container [][]netmap.NodeInfo -} - -// SGSource is a storage group information source interface. -type SGSource interface { - // ListSG must list storage group objects in the container. Formed list must be written to destination. - // - // Must return any error encountered which did not allow to form the list. - ListSG(*SearchSGDst, SearchSGPrm) error - - // GetSG must return storage group object for the provided CID, OID, - // container and netmap state. - GetSG(GetSGPrm) (*storagegroupSDK.StorageGroup, error) -} - type epochAuditReporter struct { epoch uint64 diff --git a/pkg/innerring/rpc.go b/pkg/innerring/rpc.go index bc7928154..4e3aa1774 100644 --- a/pkg/innerring/rpc.go +++ b/pkg/innerring/rpc.go @@ -8,8 +8,8 @@ import ( clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" netmapcore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + storagegroup2 "github.com/nspcc-dev/neofs-node/pkg/core/storagegroup" neofsapiclient "github.com/nspcc-dev/neofs-node/pkg/innerring/internal/client" - auditproc "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit" "github.com/nspcc-dev/neofs-node/pkg/network/cache" "github.com/nspcc-dev/neofs-node/pkg/services/audit/auditor" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" @@ -62,7 +62,7 @@ func (c *ClientCache) Get(info clientcore.NodeInfo) (clientcore.Client, error) { // Returns storage groups structure from received object. // // Returns an error of type apistatus.ObjectNotFound if storage group is missing. -func (c *ClientCache) GetSG(prm auditproc.GetSGPrm) (*storagegroup.StorageGroup, error) { +func (c *ClientCache) GetSG(prm storagegroup2.GetSGPrm) (*storagegroup.StorageGroup, error) { var sgAddress oid.Address sgAddress.SetContainer(prm.CID) sgAddress.SetObject(prm.OID) @@ -212,23 +212,23 @@ func (c *ClientCache) getWrappedClient(info clientcore.NodeInfo) (neofsapiclient return cInternal, nil } -func (c ClientCache) ListSG(dst *auditproc.SearchSGDst, prm auditproc.SearchSGPrm) error { - cli, err := c.getWrappedClient(prm.NodeInfo()) +func (c ClientCache) ListSG(dst *storagegroup2.SearchSGDst, prm storagegroup2.SearchSGPrm) error { + cli, err := c.getWrappedClient(prm.NodeInfo) if err != nil { return fmt.Errorf("could not get API client from cache") } var cliPrm neofsapiclient.SearchSGPrm - cliPrm.SetContext(prm.Context()) - cliPrm.SetContainerID(prm.CID()) + cliPrm.SetContext(prm.Context) + cliPrm.SetContainerID(prm.Container) res, err := cli.SearchSG(cliPrm) if err != nil { return err } - dst.WriteIDList(res.IDList()) + dst.Objects = res.IDList() return nil }