forked from TrueCloudLab/frostfs-node
[#1402] pkg: Move SGSource
to the core
directory
Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
27304455bf
commit
ec07fda97b
4 changed files with 66 additions and 78 deletions
48
pkg/core/storagegroup/storagegroup.go
Normal file
48
pkg/core/storagegroup/storagegroup.go
Normal file
|
@ -0,0 +1,48 @@
|
|||
package storagegroup
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/netmap"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/storagegroup"
|
||||
)
|
||||
|
||||
// SearchSGPrm groups the parameters which are formed by Processor to search the storage group objects.
|
||||
type SearchSGPrm struct {
|
||||
Context context.Context
|
||||
|
||||
Container cid.ID
|
||||
|
||||
NodeInfo client.NodeInfo
|
||||
}
|
||||
|
||||
// SearchSGDst groups the target values which Processor expects from SG searching to process.
|
||||
type SearchSGDst struct {
|
||||
Objects []oid.ID
|
||||
}
|
||||
|
||||
// GetSGPrm groups parameter of GetSG operation.
|
||||
type GetSGPrm struct {
|
||||
Context context.Context
|
||||
|
||||
OID oid.ID
|
||||
CID cid.ID
|
||||
|
||||
NetMap netmap.NetMap
|
||||
Container [][]netmap.NodeInfo
|
||||
}
|
||||
|
||||
// SGSource is a storage group information source interface.
|
||||
type SGSource interface {
|
||||
// ListSG must list storage group objects in the container. Formed list must be written to destination.
|
||||
//
|
||||
// Must return any error encountered which did not allow to form the list.
|
||||
ListSG(*SearchSGDst, SearchSGPrm) error
|
||||
|
||||
// GetSG must return storage group object for the provided CID, OID,
|
||||
// container and netmap state.
|
||||
GetSG(GetSGPrm) (*storagegroup.StorageGroup, error)
|
||||
}
|
|
@ -7,6 +7,7 @@ import (
|
|||
|
||||
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||
netmapcore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/storagegroup"
|
||||
cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/audit"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||
|
@ -126,10 +127,10 @@ func (ap *Processor) findStorageGroups(cnr cid.ID, shuffled netmapcore.Nodes) []
|
|||
|
||||
var (
|
||||
info clientcore.NodeInfo
|
||||
prm SearchSGPrm
|
||||
prm storagegroup.SearchSGPrm
|
||||
)
|
||||
|
||||
prm.id = cnr
|
||||
prm.Container = cnr
|
||||
|
||||
for i := range shuffled { // consider iterating over some part of container
|
||||
log := ap.log.With(
|
||||
|
@ -148,10 +149,10 @@ func (ap *Processor) findStorageGroups(cnr cid.ID, shuffled netmapcore.Nodes) []
|
|||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), ap.searchTimeout)
|
||||
|
||||
prm.ctx = ctx
|
||||
prm.info = info
|
||||
prm.Context = ctx
|
||||
prm.NodeInfo = info
|
||||
|
||||
var dst SearchSGDst
|
||||
var dst storagegroup.SearchSGDst
|
||||
|
||||
err = ap.sgSrc.ListSG(&dst, prm)
|
||||
|
||||
|
@ -162,7 +163,7 @@ func (ap *Processor) findStorageGroups(cnr cid.ID, shuffled netmapcore.Nodes) []
|
|||
continue
|
||||
}
|
||||
|
||||
sg = append(sg, dst.ids...)
|
||||
sg = append(sg, dst.Objects...)
|
||||
|
||||
break // we found storage groups, so break loop
|
||||
}
|
||||
|
@ -174,7 +175,7 @@ func (ap *Processor) filterExpiredSG(cid cid.ID, sgIDs []oid.ID,
|
|||
cnr [][]netmap.NodeInfo, nm netmap.NetMap) map[oid.ID]storagegroupSDK.StorageGroup {
|
||||
sgs := make(map[oid.ID]storagegroupSDK.StorageGroup, len(sgIDs))
|
||||
|
||||
var getSGPrm GetSGPrm
|
||||
var getSGPrm storagegroup.GetSGPrm
|
||||
getSGPrm.CID = cid
|
||||
getSGPrm.Container = cnr
|
||||
getSGPrm.NetMap = nm
|
||||
|
|
|
@ -7,15 +7,11 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/storagegroup"
|
||||
cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
|
||||
nmClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/audit"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/netmap"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
storagegroupSDK "github.com/nspcc-dev/neofs-sdk-go/storagegroup"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -47,7 +43,7 @@ type (
|
|||
log *zap.Logger
|
||||
pool *ants.Pool
|
||||
irList Indexer
|
||||
sgSrc SGSource
|
||||
sgSrc storagegroup.SGSource
|
||||
epochSrc EpochSource
|
||||
searchTimeout time.Duration
|
||||
|
||||
|
@ -65,7 +61,7 @@ type (
|
|||
NetmapClient *nmClient.Client
|
||||
ContainerClient *cntClient.Client
|
||||
IRList Indexer
|
||||
SGSource SGSource
|
||||
SGSource storagegroup.SGSource
|
||||
RPCSearchTimeout time.Duration
|
||||
TaskManager TaskManager
|
||||
Reporter audit.Reporter
|
||||
|
@ -74,63 +70,6 @@ type (
|
|||
}
|
||||
)
|
||||
|
||||
// SearchSGPrm groups the parameters which are formed by Processor to search the storage group objects.
|
||||
type SearchSGPrm struct {
|
||||
ctx context.Context
|
||||
|
||||
id cid.ID
|
||||
|
||||
info client.NodeInfo
|
||||
}
|
||||
|
||||
// Context returns context to use for network communication.
|
||||
func (x SearchSGPrm) Context() context.Context {
|
||||
return x.ctx
|
||||
}
|
||||
|
||||
// CID returns the identifier of the container to search SG in.
|
||||
func (x SearchSGPrm) CID() cid.ID {
|
||||
return x.id
|
||||
}
|
||||
|
||||
// NodeInfo returns information about a storage node to communicate with.
|
||||
func (x SearchSGPrm) NodeInfo() client.NodeInfo {
|
||||
return x.info
|
||||
}
|
||||
|
||||
// SearchSGDst groups the target values which Processor expects from SG searching to process.
|
||||
type SearchSGDst struct {
|
||||
ids []oid.ID
|
||||
}
|
||||
|
||||
// WriteIDList writes a list of identifiers of storage group objects stored in the container.
|
||||
func (x *SearchSGDst) WriteIDList(ids []oid.ID) {
|
||||
x.ids = ids
|
||||
}
|
||||
|
||||
// GetSGPrm groups parameter of GetSG operation.
|
||||
type GetSGPrm struct {
|
||||
Context context.Context
|
||||
|
||||
OID oid.ID
|
||||
CID cid.ID
|
||||
|
||||
NetMap netmap.NetMap
|
||||
Container [][]netmap.NodeInfo
|
||||
}
|
||||
|
||||
// SGSource is a storage group information source interface.
|
||||
type SGSource interface {
|
||||
// ListSG must list storage group objects in the container. Formed list must be written to destination.
|
||||
//
|
||||
// Must return any error encountered which did not allow to form the list.
|
||||
ListSG(*SearchSGDst, SearchSGPrm) error
|
||||
|
||||
// GetSG must return storage group object for the provided CID, OID,
|
||||
// container and netmap state.
|
||||
GetSG(GetSGPrm) (*storagegroupSDK.StorageGroup, error)
|
||||
}
|
||||
|
||||
type epochAuditReporter struct {
|
||||
epoch uint64
|
||||
|
||||
|
|
|
@ -8,8 +8,8 @@ import (
|
|||
|
||||
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||
netmapcore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||
storagegroup2 "github.com/nspcc-dev/neofs-node/pkg/core/storagegroup"
|
||||
neofsapiclient "github.com/nspcc-dev/neofs-node/pkg/innerring/internal/client"
|
||||
auditproc "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/audit/auditor"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||
|
@ -62,7 +62,7 @@ func (c *ClientCache) Get(info clientcore.NodeInfo) (clientcore.Client, error) {
|
|||
// Returns storage groups structure from received object.
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if storage group is missing.
|
||||
func (c *ClientCache) GetSG(prm auditproc.GetSGPrm) (*storagegroup.StorageGroup, error) {
|
||||
func (c *ClientCache) GetSG(prm storagegroup2.GetSGPrm) (*storagegroup.StorageGroup, error) {
|
||||
var sgAddress oid.Address
|
||||
sgAddress.SetContainer(prm.CID)
|
||||
sgAddress.SetObject(prm.OID)
|
||||
|
@ -212,23 +212,23 @@ func (c *ClientCache) getWrappedClient(info clientcore.NodeInfo) (neofsapiclient
|
|||
return cInternal, nil
|
||||
}
|
||||
|
||||
func (c ClientCache) ListSG(dst *auditproc.SearchSGDst, prm auditproc.SearchSGPrm) error {
|
||||
cli, err := c.getWrappedClient(prm.NodeInfo())
|
||||
func (c ClientCache) ListSG(dst *storagegroup2.SearchSGDst, prm storagegroup2.SearchSGPrm) error {
|
||||
cli, err := c.getWrappedClient(prm.NodeInfo)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get API client from cache")
|
||||
}
|
||||
|
||||
var cliPrm neofsapiclient.SearchSGPrm
|
||||
|
||||
cliPrm.SetContext(prm.Context())
|
||||
cliPrm.SetContainerID(prm.CID())
|
||||
cliPrm.SetContext(prm.Context)
|
||||
cliPrm.SetContainerID(prm.Container)
|
||||
|
||||
res, err := cli.SearchSG(cliPrm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dst.WriteIDList(res.IDList())
|
||||
dst.Objects = res.IDList()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue