diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 394a51ae..4bb7bd90 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -544,7 +544,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error NetmapClient: server.netmapClient, ContainerClient: cnrClient, IRList: server, - ClientCache: clientCache, + SGSource: clientCache, Key: &server.key.PrivateKey, RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"), TaskManager: auditTaskManager, diff --git a/pkg/innerring/internal/client/client.go b/pkg/innerring/internal/client/client.go new file mode 100644 index 00000000..f55f5953 --- /dev/null +++ b/pkg/innerring/internal/client/client.go @@ -0,0 +1,249 @@ +package neofsapiclient + +import ( + "context" + "crypto/ecdsa" + "fmt" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" + cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/storagegroup" +) + +// Client represents NeoFS API client cut down to the needs of a purely IR application. +type Client struct { + key *ecdsa.PrivateKey + + c client.Client +} + +// WrapBasicClient wraps client.Client instance to use it for NeoFS API RPC. +func (x *Client) WrapBasicClient(c client.Client) { + x.c = c +} + +// SetPrivateKey sets private key to sign RPC requests. +func (x *Client) SetPrivateKey(key *ecdsa.PrivateKey) { + x.key = key +} + +// SearchSGPrm groups parameters of SearchSG operation. +type SearchSGPrm struct { + contextPrm + + cnrID *cid.ID +} + +// SetContainerID sets ID of the container to search for storage groups. +func (x *SearchSGPrm) SetContainerID(id *cid.ID) { + x.cnrID = id +} + +// SearchSGRes groups resulting values of SearchSG operation. +type SearchSGRes struct { + cliRes []*object.ID +} + +// IDList returns list of IDs of storage groups in container. +func (x SearchSGRes) IDList() []*object.ID { + return x.cliRes +} + +var sgFilter = storagegroup.SearchQuery() + +// SearchSG lists objects of storage group type in the container. +func (x Client) SearchSG(prm SearchSGPrm) (res SearchSGRes, err error) { + var cliPrm client.SearchObjectParams + + cliPrm.WithContainerID(prm.cnrID) + cliPrm.WithSearchFilters(sgFilter) + + res.cliRes, err = x.c.SearchObject(prm.ctx, &cliPrm, + client.WithKey(x.key), + ) + + return +} + +// GetObjectPrm groups parameters of GetObject operation. +type GetObjectPrm struct { + getObjectPrm +} + +// GetObjectRes groups resulting values of GetObject operation. +type GetObjectRes struct { + cliRes *object.Object +} + +// Object returns received object. +func (x GetObjectRes) Object() *object.Object { + return x.cliRes +} + +// GetObject reads the object by address. +func (x Client) GetObject(prm GetObjectPrm) (res GetObjectRes, err error) { + var cliPrm client.GetObjectParams + + cliPrm.WithAddress(prm.objAddr) + + res.cliRes, err = x.c.GetObject(prm.ctx, &cliPrm, + client.WithKey(x.key), + ) + + return +} + +// HeadObjectPrm groups parameters of HeadObject operation. +type HeadObjectPrm struct { + getObjectPrm + + raw bool + + ttl uint32 +} + +// SetRawFlag sets flag of raw request. +func (x *HeadObjectPrm) SetRawFlag() { + x.raw = true +} + +// SetTTL sets request TTL value. +func (x *HeadObjectPrm) SetTTL(ttl uint32) { + x.ttl = ttl +} + +// HeadObjectRes groups resulting values of HeadObject operation. +type HeadObjectRes struct { + cliRes *object.Object +} + +// Header returns received object header. +func (x HeadObjectRes) Header() *object.Object { + return x.cliRes +} + +// HeadObject reads short object header by address. +// +// For raw requests, returns an error if requested object is virtual. +func (x Client) HeadObject(prm HeadObjectPrm) (res HeadObjectRes, err error) { + var cliPrm client.ObjectHeaderParams + + cliPrm.WithAddress(prm.objAddr) + cliPrm.WithRawFlag(prm.raw) + cliPrm.WithMainFields() + + res.cliRes, err = x.c.GetObjectHeader(prm.ctx, &cliPrm, + client.WithKey(x.key), + client.WithTTL(prm.ttl), + ) + + return +} + +// GetObjectPayload reads object by address from NeoFS via Client and returns its payload. +func GetObjectPayload(ctx context.Context, c Client, addr *object.Address) ([]byte, error) { + var prm GetObjectPrm + + prm.SetContext(ctx) + prm.SetAddress(addr) + + obj, err := c.GetObject(prm) + if err != nil { + return nil, err + } + + return obj.Object().Payload(), nil +} + +func headObject(ctx context.Context, c Client, addr *object.Address, raw bool, ttl uint32) (*object.Object, error) { + var prm HeadObjectPrm + + prm.SetContext(ctx) + prm.SetAddress(addr) + prm.SetTTL(ttl) + + if raw { + prm.SetRawFlag() + } + + obj, err := c.HeadObject(prm) + if err != nil { + return nil, err + } + + return obj.Header(), nil +} + +// GetRawObjectHeaderLocally reads raw short object header from server's local storage by address via Client. +func GetRawObjectHeaderLocally(ctx context.Context, c Client, addr *object.Address) (*object.Object, error) { + return headObject(ctx, c, addr, true, 1) +} + +// GetObjectHeaderFromContainer reads short object header by address via Client with TTL = 10 +// for deep traversal of the container. +func GetObjectHeaderFromContainer(ctx context.Context, c Client, addr *object.Address) (*object.Object, error) { + return headObject(ctx, c, addr, false, 10) +} + +// HashPayloadRangePrm groups parameters of HashPayloadRange operation. +type HashPayloadRangePrm struct { + getObjectPrm + + rng *object.Range +} + +// SetRange sets payload range to calculate the hash. +func (x *HashPayloadRangePrm) SetRange(rng *object.Range) { + x.rng = rng +} + +// HashPayloadRangeRes groups resulting values of HashPayloadRange operation. +type HashPayloadRangeRes struct { + h []byte +} + +// Hash returns hash of the object payload range. +func (x HashPayloadRangeRes) Hash() []byte { + return x.h +} + +// HashObjectRange requests to calculate Tillich-Zemor hash of the payload range of the object +// from the remote server's local storage. +func (x Client) HashPayloadRange(prm HashPayloadRangePrm) (res HashPayloadRangeRes, err error) { + var cliPrm client.RangeChecksumParams + + cliPrm.WithAddress(prm.objAddr) + cliPrm.WithRangeList(prm.rng) + + hs, err := x.c.ObjectPayloadRangeTZ(prm.ctx, &cliPrm, + client.WithKey(x.key), + client.WithTTL(1), + ) + if err == nil { + if ln := len(hs); ln != 1 { + err = fmt.Errorf("wrong number of hashes %d", ln) + } else { + res.h = hs[0][:] + } + } + + return +} + +// HashObjectRange reads Tillich-Zemor hash of the object payload range by address +// from the remote server's local storage via Client. +func HashObjectRange(ctx context.Context, c Client, addr *object.Address, rng *object.Range) ([]byte, error) { + var prm HashPayloadRangePrm + + prm.SetContext(ctx) + prm.SetAddress(addr) + prm.SetRange(rng) + + res, err := c.HashPayloadRange(prm) + if err != nil { + return nil, err + } + + return res.Hash(), nil +} diff --git a/pkg/innerring/internal/client/doc.go b/pkg/innerring/internal/client/doc.go new file mode 100644 index 00000000..8596c66d --- /dev/null +++ b/pkg/innerring/internal/client/doc.go @@ -0,0 +1,12 @@ +// Package neofsapiclient provides functionality for IR application communication with NeoFS network. +// +// The basic client for accessing remote nodes via NeoFS API is a NeoFS SDK Go API client. +// However, although it encapsulates a useful piece of business logic (e.g. the signature mechanism), +// the IR application does not fully use the client's flexible interface. +// +// In this regard, this package represents an abstraction - a type-wrapper over the base client. +// The type provides the minimum interface necessary for the application, and also allows you to concentrate +// the entire spectrum of the client's use in one place (this will be convenient both when updating the base client +// and for evaluating the UX of SDK library). So it is expected that all application packages will be limited +// to this package for the development of functionality requiring NeoFS API communication. +package neofsapiclient diff --git a/pkg/innerring/internal/client/prm.go b/pkg/innerring/internal/client/prm.go new file mode 100644 index 00000000..0e1c08d1 --- /dev/null +++ b/pkg/innerring/internal/client/prm.go @@ -0,0 +1,30 @@ +package neofsapiclient + +import ( + "context" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" +) + +type contextPrm struct { + ctx context.Context +} + +// SetContext sets context.Context used for network communication. +func (x *contextPrm) SetContext(ctx context.Context) { + x.ctx = ctx +} + +type objectAddressPrm struct { + objAddr *object.Address +} + +// SetAddress sets address of the object. +func (x *objectAddressPrm) SetAddress(addr *object.Address) { + x.objAddr = addr +} + +type getObjectPrm struct { + contextPrm + objectAddressPrm +} diff --git a/pkg/innerring/processors/audit/process.go b/pkg/innerring/processors/audit/process.go index 90392ee3..998cbac7 100644 --- a/pkg/innerring/processors/audit/process.go +++ b/pkg/innerring/processors/audit/process.go @@ -4,20 +4,16 @@ import ( "context" "encoding/hex" - "github.com/nspcc-dev/neofs-api-go/pkg/client" cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" "github.com/nspcc-dev/neofs-api-go/pkg/netmap" "github.com/nspcc-dev/neofs-api-go/pkg/object" clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" "github.com/nspcc-dev/neofs-node/pkg/services/audit" - "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/storagegroup" "github.com/nspcc-dev/neofs-node/pkg/util/rand" "go.uber.org/zap" ) -var sgFilter = storagegroup.SearchQuery() - func (ap *Processor) processStartAudit(epoch uint64) { log := ap.log.With(zap.Uint64("epoch", epoch)) @@ -117,7 +113,12 @@ func (ap *Processor) findStorageGroups(cid *cid.ID, shuffled netmap.Nodes) []*ob ln := len(shuffled) - var info clientcore.NodeInfo + var ( + info clientcore.NodeInfo + prm SearchSGPrm + ) + + prm.id = cid for i := range shuffled { // consider iterating over some part of container log := ap.log.With( @@ -134,19 +135,15 @@ func (ap *Processor) findStorageGroups(cid *cid.ID, shuffled netmap.Nodes) []*ob continue } - cli, err := ap.clientCache.Get(info) - if err != nil { - log.Warn("can't setup remote connection", zap.String("error", err.Error())) - - continue - } - - sgSearchParams := &client.SearchObjectParams{} - sgSearchParams.WithContainerID(cid) - sgSearchParams.WithSearchFilters(sgFilter) - ctx, cancel := context.WithTimeout(context.Background(), ap.searchTimeout) - result, err := cli.SearchObject(ctx, sgSearchParams, client.WithKey(ap.key)) + + prm.ctx = ctx + prm.info = info + + var dst SearchSGDst + + err = ap.sgSrc.ListSG(&dst, prm) + cancel() if err != nil { @@ -154,7 +151,7 @@ func (ap *Processor) findStorageGroups(cid *cid.ID, shuffled netmap.Nodes) []*ob continue } - sg = append(sg, result...) + sg = append(sg, dst.ids...) break // we found storage groups, so break loop } diff --git a/pkg/innerring/processors/audit/processor.go b/pkg/innerring/processors/audit/processor.go index 4163aea2..25dff192 100644 --- a/pkg/innerring/processors/audit/processor.go +++ b/pkg/innerring/processors/audit/processor.go @@ -7,7 +7,8 @@ import ( "fmt" "time" - SDKClient "github.com/nspcc-dev/neofs-api-go/pkg/client" + cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" + "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/core/client" wrapContainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" wrapNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" @@ -24,11 +25,6 @@ type ( InnerRingSize() int } - // NeoFSClientCache is an interface for cache of neofs RPC clients - NeoFSClientCache interface { - Get(client.NodeInfo) (SDKClient.Client, error) - } - TaskManager interface { PushTask(*audit.Task) error @@ -42,8 +38,7 @@ type ( log *zap.Logger pool *ants.Pool irList Indexer - clientCache NeoFSClientCache - key *ecdsa.PrivateKey + sgSrc SGSource searchTimeout time.Duration containerClient *wrapContainer.Wrapper @@ -60,7 +55,7 @@ type ( NetmapClient *wrapNetmap.Wrapper ContainerClient *wrapContainer.Wrapper IRList Indexer - ClientCache NeoFSClientCache + SGSource SGSource RPCSearchTimeout time.Duration TaskManager TaskManager Reporter audit.Reporter @@ -68,6 +63,48 @@ type ( } ) +// SearchSGPrm groups the parameters which are formed by Processor to search the storage group objects. +type SearchSGPrm struct { + ctx context.Context + + id *cid.ID + + info client.NodeInfo +} + +// Context returns context to use for network communication. +func (x SearchSGPrm) Context() context.Context { + return x.ctx +} + +// CID returns identifier of the container to search SG in. +func (x SearchSGPrm) CID() *cid.ID { + return x.id +} + +// NodeInfo returns information about storage node to communicate with. +func (x SearchSGPrm) NodeInfo() client.NodeInfo { + return x.info +} + +// SearchSGDst groups target values which Processor expects from SG searching to process. +type SearchSGDst struct { + ids []*object.ID +} + +// WriteIDList writes list of identifiers of storage group objects stored in the container. +func (x *SearchSGDst) WriteIDList(ids []*object.ID) { + x.ids = ids +} + +// SGSource is a storage group information source interface. +type SGSource interface { + // Lists storage group objects in the container. Formed list must be written to destination. + // + // Must return any error encountered which did not allow to form the list. + ListSG(*SearchSGDst, SearchSGPrm) error +} + type epochAuditReporter struct { epoch uint64 @@ -86,8 +123,8 @@ func New(p *Params) (*Processor, error) { return nil, errors.New("ir/audit: logger is not set") case p.IRList == nil: return nil, errors.New("ir/audit: global state is not set") - case p.ClientCache == nil: - return nil, errors.New("ir/audit: neofs RPC client cache is not set") + case p.SGSource == nil: + return nil, errors.New("ir/audit: SG source is not set") case p.TaskManager == nil: return nil, errors.New("ir/audit: audit task manager is not set") case p.Reporter == nil: @@ -106,8 +143,7 @@ func New(p *Params) (*Processor, error) { pool: pool, containerClient: p.ContainerClient, irList: p.IRList, - clientCache: p.ClientCache, - key: p.Key, + sgSrc: p.SGSource, searchTimeout: p.RPCSearchTimeout, netmapClient: p.NetmapClient, taskManager: p.TaskManager, diff --git a/pkg/innerring/rpc.go b/pkg/innerring/rpc.go index 5421a531..3bebee09 100644 --- a/pkg/innerring/rpc.go +++ b/pkg/innerring/rpc.go @@ -12,6 +12,8 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/storagegroup" clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" coreObject "github.com/nspcc-dev/neofs-node/pkg/core/object" + neofsapiclient "github.com/nspcc-dev/neofs-node/pkg/innerring/internal/client" + auditproc "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit" "github.com/nspcc-dev/neofs-node/pkg/network/cache" "github.com/nspcc-dev/neofs-node/pkg/services/audit" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" @@ -82,7 +84,7 @@ func (c *ClientCache) getSG(ctx context.Context, addr *object.Address, nm *netma return nil, fmt.Errorf("parse client node info: %w", err) } - cli, err := c.Get(info) + cli, err := c.getWrappedClient(info) if err != nil { c.log.Warn("can't setup remote connection", zap.String("error", err.Error())) @@ -91,12 +93,15 @@ func (c *ClientCache) getSG(ctx context.Context, addr *object.Address, nm *netma } cctx, cancel := context.WithTimeout(ctx, c.sgTimeout) - obj, err := cli.GetObject(cctx, getParams, client.WithKey(c.key)) + + // NOTE: we use the function which does not verify object integrity (checksums, signature), + // but it would be useful to do as part of a data audit. + payload, err := neofsapiclient.GetObjectPayload(cctx, cli, addr) cancel() if err != nil { - c.log.Warn("can't get storage group object", + c.log.Warn("can't get payload of storage group object", zap.String("error", err.Error())) continue @@ -104,7 +109,7 @@ func (c *ClientCache) getSG(ctx context.Context, addr *object.Address, nm *netma sg := storagegroup.New() - err = sg.Unmarshal(obj.Payload()) + err = sg.Unmarshal(payload) if err != nil { return nil, fmt.Errorf("can't parse storage group payload: %w", err) } @@ -117,23 +122,10 @@ func (c *ClientCache) getSG(ctx context.Context, addr *object.Address, nm *netma // GetHeader requests node from the container under audit to return object header by id. func (c *ClientCache) GetHeader(task *audit.Task, node *netmap.Node, id *object.ID, relay bool) (*object.Object, error) { - raw := true - ttl := uint32(1) - - if relay { - ttl = 10 // todo: instead of hardcode value we can set TTL based on container length - raw = false - } - objAddress := new(object.Address) objAddress.SetContainerID(task.ContainerID()) objAddress.SetObjectID(id) - headParams := new(client.ObjectHeaderParams) - headParams.WithRawFlag(raw) - headParams.WithMainFields() - headParams.WithAddress(objAddress) - var info clientcore.NodeInfo err := clientcore.NodeInfoFromRawNetmapElement(&info, node) @@ -141,15 +133,21 @@ func (c *ClientCache) GetHeader(task *audit.Task, node *netmap.Node, id *object. return nil, fmt.Errorf("parse client node info: %w", err) } - cli, err := c.Get(info) + cli, err := c.getWrappedClient(info) if err != nil { return nil, fmt.Errorf("can't setup remote connection with %s: %w", info.AddressGroup(), err) } cctx, cancel := context.WithTimeout(task.AuditContext(), c.headTimeout) - head, err := cli.GetObjectHeader(cctx, headParams, - client.WithTTL(ttl), - client.WithKey(c.key)) + + var obj *object.Object + + if relay { + // todo: function sets hardcoded TTL value, but instead we can set TTL based on container length + obj, err = neofsapiclient.GetObjectHeaderFromContainer(cctx, cli, objAddress) + } else { + obj, err = neofsapiclient.GetRawObjectHeaderLocally(cctx, cli, objAddress) + } cancel() @@ -157,7 +155,7 @@ func (c *ClientCache) GetHeader(task *audit.Task, node *netmap.Node, id *object. return nil, fmt.Errorf("object head error: %w", err) } - return head, nil + return obj, nil } // GetRangeHash requests node from the container under audit to return Tillich-Zemor hash of the @@ -167,11 +165,6 @@ func (c *ClientCache) GetRangeHash(task *audit.Task, node *netmap.Node, id *obje objAddress.SetContainerID(task.ContainerID()) objAddress.SetObjectID(id) - rangeParams := new(client.RangeChecksumParams) - rangeParams.WithAddress(objAddress) - rangeParams.WithRangeList(rng) - rangeParams.WithSalt(nil) // it MUST be nil for correct hash concatenation in PDP game - var info clientcore.NodeInfo err := clientcore.NodeInfoFromRawNetmapElement(&info, node) @@ -179,15 +172,14 @@ func (c *ClientCache) GetRangeHash(task *audit.Task, node *netmap.Node, id *obje return nil, fmt.Errorf("parse client node info: %w", err) } - cli, err := c.Get(info) + cli, err := c.getWrappedClient(info) if err != nil { return nil, fmt.Errorf("can't setup remote connection with %s: %w", info.AddressGroup(), err) } cctx, cancel := context.WithTimeout(task.AuditContext(), c.rangeTimeout) - result, err := cli.ObjectPayloadRangeTZ(cctx, rangeParams, - client.WithTTL(1), - client.WithKey(c.key)) + + h, err := neofsapiclient.HashObjectRange(cctx, cli, objAddress, rng) cancel() @@ -195,7 +187,41 @@ func (c *ClientCache) GetRangeHash(task *audit.Task, node *netmap.Node, id *obje return nil, fmt.Errorf("object rangehash error: %w", err) } - // client guarantees that request and response have equal amount of ranges - - return result[0][:], nil + return h, nil +} + +func (c *ClientCache) getWrappedClient(info clientcore.NodeInfo) (neofsapiclient.Client, error) { + // can be also cached + var cInternal neofsapiclient.Client + + cli, err := c.Get(info) + if err != nil { + return cInternal, fmt.Errorf("could not get API client from cache") + } + + cInternal.WrapBasicClient(cli) + cInternal.SetPrivateKey(c.key) + + return cInternal, nil +} + +func (c ClientCache) ListSG(dst *auditproc.SearchSGDst, prm auditproc.SearchSGPrm) error { + cli, err := c.getWrappedClient(prm.NodeInfo()) + if err != nil { + return fmt.Errorf("could not get API client from cache") + } + + var cliPrm neofsapiclient.SearchSGPrm + + cliPrm.SetContext(prm.Context()) + cliPrm.SetContainerID(prm.CID()) + + res, err := cli.SearchSG(cliPrm) + if err != nil { + return err + } + + dst.WriteIDList(res.IDList()) + + return nil }