frostfs-node/pkg/morph/client/container/wrapper/container.go
Evgenii Stratonikov 02be6c83a6 morph/client: update morph container wrapper
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
2021-12-02 14:16:23 +03:00

424 lines
10 KiB
Go

package wrapper
import (
"crypto/sha256"
"errors"
"fmt"
"strings"
v2refs "github.com/nspcc-dev/neofs-api-go/v2/refs"
core "github.com/nspcc-dev/neofs-node/pkg/core/container"
staticli "github.com/nspcc-dev/neofs-node/pkg/morph/client"
client "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
"github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/owner"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/signature"
)
var (
errNilArgument = errors.New("empty argument")
errUnsupported = errors.New("unsupported structure version")
)
// Put marshals container, and passes it to Wrapper's Put method
// along with sig.Key() and sig.Sign().
//
// Returns error if container is nil.
func Put(w *Wrapper, cnr *container.Container) (*cid.ID, error) {
if cnr == nil {
return nil, errNilArgument
}
data, err := cnr.Marshal()
if err != nil {
return nil, fmt.Errorf("can't marshal container: %w", err)
}
binToken, err := cnr.SessionToken().Marshal()
if err != nil {
return nil, fmt.Errorf("could not marshal session token: %w", err)
}
sig := cnr.Signature()
name, zone := container.GetNativeNameWithZone(cnr)
err = w.Put(PutPrm{
cnr: data,
key: sig.Key(),
sig: sig.Sign(),
token: binToken,
name: name,
zone: zone,
})
if err != nil {
return nil, err
}
id := cid.New()
id.SetSHA256(sha256.Sum256(data))
return id, nil
}
// PutPrm groups parameters of Put operation.
type PutPrm struct {
cnr []byte
key []byte
sig []byte
token []byte
name string
zone string
staticli.InvokePrmOptional
}
// SetContainer sets container data.
func (p *PutPrm) SetContainer(cnr []byte) {
p.cnr = cnr
}
// SetKey sets public key.
func (p *PutPrm) SetKey(key []byte) {
p.key = key
}
// SetSignature sets signature.
func (p *PutPrm) SetSignature(sig []byte) {
p.sig = sig
}
// SetToken sets session token.
func (p *PutPrm) SetToken(token []byte) {
p.token = token
}
// SetName sets native name.
func (p *PutPrm) SetName(name string) {
p.name = name
}
// SetZone sets zone.
func (p *PutPrm) SetZone(zone string) {
p.zone = zone
}
// Put saves binary container with its session token, key and signature
// in NeoFS system through Container contract call.
//
// Returns calculated container identifier and any error
// encountered that caused the saving to interrupt.
//
// If TryNotary is provided, calls notary contract.
func (w *Wrapper) Put(prm PutPrm) error {
if len(prm.sig) == 0 || len(prm.key) == 0 {
return errNilArgument
}
var args client.PutArgs
args.SetContainer(prm.cnr)
args.SetSignature(prm.sig)
args.SetPublicKey(prm.key)
args.SetSessionToken(prm.token)
args.SetNativeNameWithZone(prm.name, prm.zone)
args.InvokePrmOptional = prm.InvokePrmOptional
err := w.client.Put(args)
if err != nil {
return err
}
return nil
}
type containerSource Wrapper
func (x *containerSource) Get(cid *cid.ID) (*container.Container, error) {
return Get((*Wrapper)(x), cid)
}
// AsContainerSource provides container Source interface
// from Wrapper instance.
func AsContainerSource(w *Wrapper) core.Source {
return (*containerSource)(w)
}
// Get marshals container ID, and passes it to Wrapper's Get method.
//
// Returns error if cid is nil.
func Get(w *Wrapper, cid *cid.ID) (*container.Container, error) {
return w.Get(cid.ToV2().GetValue())
}
// Get reads the container from NeoFS system by binary identifier
// through Container contract call.
//
// If an empty slice is returned for the requested identifier,
// storage.ErrNotFound error is returned.
func (w *Wrapper) Get(cid []byte) (*container.Container, error) {
var args client.GetArgs
args.SetCID(cid)
// ask RPC neo node to get serialized container
rpcAnswer, err := w.client.Get(args)
if err != nil {
// TODO(fyrchik): reuse messages from container contract.
// Currently there are some dependency problems:
// github.com/nspcc-dev/neofs-node/pkg/innerring imports
// github.com/nspcc-dev/neofs-sdk-go/audit imports
// github.com/nspcc-dev/neofs-api-go/v2/audit: ambiguous import: found package github.com/nspcc-dev/neofs-api-go/v2/audit in multiple modules:
// github.com/nspcc-dev/neofs-api-go v1.27.1 (/home/dzeta/go/pkg/mod/github.com/nspcc-dev/neofs-api-go@v1.27.1/v2/audit)
// github.com/nspcc-dev/neofs-api-go/v2 v2.11.0-pre.0.20211201134523-3604d96f3fe1 (/home/dzeta/go/pkg/mod/github.com/nspcc-dev/neofs-api-go/v2@v2.11.0-pre.0.20211201134523-3604d96f3fe1/audit)
if strings.Contains(err.Error(), "container does not exist") {
return nil, core.ErrNotFound
}
return nil, err
}
// unmarshal container
cnr := container.New()
if err := cnr.Unmarshal(rpcAnswer.Container()); err != nil {
// use other major version if there any
return nil, fmt.Errorf("can't unmarshal container: %w", err)
}
binToken := rpcAnswer.SessionToken()
if len(binToken) > 0 {
tok := session.NewToken()
err = tok.Unmarshal(binToken)
if err != nil {
return nil, fmt.Errorf("could not unmarshal session token: %w", err)
}
cnr.SetSessionToken(tok)
}
sig := signature.New()
sig.SetKey(rpcAnswer.PublicKey())
sig.SetSign(rpcAnswer.Signature())
cnr.SetSignature(sig)
return cnr, nil
}
// Delete marshals container ID, and passes it to Wrapper's Delete method
// along with signature and session token.
//
// Returns error if container ID is nil.
func Delete(w *Wrapper, witness core.RemovalWitness) error {
id := witness.ContainerID()
if id == nil {
return errNilArgument
}
binToken, err := witness.SessionToken().Marshal()
if err != nil {
return fmt.Errorf("could not marshal session token: %w", err)
}
return w.Delete(
DeletePrm{
cid: id.ToV2().GetValue(),
signature: witness.Signature(),
token: binToken,
})
}
// DeletePrm groups parameters of Delete client operation.
type DeletePrm struct {
cid []byte
signature []byte
token []byte
staticli.InvokePrmOptional
}
// SetCID sets container ID.
func (d *DeletePrm) SetCID(cid []byte) {
d.cid = cid
}
// SetSignature sets signature.
func (d *DeletePrm) SetSignature(signature []byte) {
d.signature = signature
}
// SetToken sets session token.
func (d *DeletePrm) SetToken(token []byte) {
d.token = token
}
// Delete removes the container from NeoFS system
// through Container contract call.
//
// Returns any error encountered that caused
// the removal to interrupt.
//
// If TryNotary is provided, calls notary contract.
func (w *Wrapper) Delete(prm DeletePrm) error {
if len(prm.signature) == 0 {
return errNilArgument
}
var args client.DeleteArgs
args.SetSignature(prm.signature)
args.SetCID(prm.cid)
args.SetSessionToken(prm.token)
args.InvokePrmOptional = prm.InvokePrmOptional
return w.client.Delete(args)
}
// List returns a list of container identifiers belonging
// to the specified owner of NeoFS system. The list is composed
// through Container contract call.
//
// Returns the identifiers of all NeoFS containers if pointer
// to owner identifier is nil.
func (w *Wrapper) List(ownerID *owner.ID) ([]*cid.ID, error) {
args := client.ListArgs{}
if ownerID == nil {
args.SetOwnerID([]byte{})
} else if v2 := ownerID.ToV2(); v2 == nil {
return nil, errUnsupported // use other major version if there any
} else {
args.SetOwnerID(v2.GetValue())
}
// ask RPC neo node to get serialized container
rpcAnswer, err := w.client.List(args)
if err != nil {
return nil, err
}
rawIDs := rpcAnswer.CIDList()
result := make([]*cid.ID, 0, len(rawIDs))
for i := range rawIDs {
v2 := new(v2refs.ContainerID)
v2.SetValue(rawIDs[i])
id := cid.NewFromV2(v2)
result = append(result, id)
}
return result, nil
}
// AnnounceLoadPrm groups parameters of AnnounceLoad operation.
type AnnounceLoadPrm struct {
a container.UsedSpaceAnnouncement
key []byte
staticli.InvokePrmOptional
}
// SetAnnouncement sets announcement.
func (a2 *AnnounceLoadPrm) SetAnnouncement(a container.UsedSpaceAnnouncement) {
a2.a = a
}
// SetReporter sets public key of the reporter.
func (a2 *AnnounceLoadPrm) SetReporter(key []byte) {
a2.key = key
}
// AnnounceLoad saves container size estimation calculated by storage node
// with key in NeoFS system through Container contract call.
//
// Returns any error encountered that caused the saving to interrupt.
func (w *Wrapper) AnnounceLoad(prm AnnounceLoadPrm) error {
v2 := prm.a.ContainerID().ToV2()
if v2 == nil {
return errUnsupported // use other major version if there any
}
args := client.PutSizeArgs{}
args.SetContainerID(v2.GetValue())
args.SetEpoch(prm.a.Epoch())
args.SetSize(prm.a.UsedSpace())
args.SetReporterKey(prm.key)
args.InvokePrmOptional = prm.InvokePrmOptional
return w.client.PutSize(args)
}
// EstimationID is an identity of container load estimation inside Container contract.
type EstimationID []byte
// ListLoadEstimationsByEpoch returns a list of container load estimations for to the specified epoch.
// The list is composed through Container contract call.
func (w *Wrapper) ListLoadEstimationsByEpoch(epoch uint64) ([]EstimationID, error) {
args := client.ListSizesArgs{}
args.SetEpoch(epoch)
// ask RPC neo node to get serialized container
rpcAnswer, err := w.client.ListSizes(args)
if err != nil {
return nil, err
}
rawIDs := rpcAnswer.IDList()
result := make([]EstimationID, 0, len(rawIDs))
for i := range rawIDs {
result = append(result, rawIDs[i])
}
return result, nil
}
// Estimation is a structure of single container load estimation
// reported by storage node.
type Estimation struct {
Size uint64
Reporter []byte
}
// Estimations is a structure of grouped container load estimation inside Container contract.
type Estimations struct {
ContainerID *cid.ID
Values []Estimation
}
// GetUsedSpaceEstimations returns a list of container load estimations by ID.
// The list is composed through Container contract call.
func (w *Wrapper) GetUsedSpaceEstimations(id EstimationID) (*Estimations, error) {
args := client.GetSizeArgs{}
args.SetID(id)
rpcAnswer, err := w.client.GetContainerSize(args)
if err != nil {
return nil, err
}
es := rpcAnswer.Estimations()
v2 := new(v2refs.ContainerID)
v2.SetValue(es.ContainerID)
res := &Estimations{
ContainerID: cid.NewFromV2(v2),
Values: make([]Estimation, 0, len(es.Estimations)),
}
for i := range es.Estimations {
res.Values = append(res.Values, Estimation{
Size: uint64(es.Estimations[i].Size),
Reporter: es.Estimations[i].Reporter,
})
}
return res, nil
}