From a8c8bdaadf955698795c6574c728ad0bef471ee4 Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Mon, 3 Aug 2020 18:08:55 +0300 Subject: [PATCH] Add NeoFS helper to fetch object by refs.Address Signed-off-by: Evgeniy Kulikov --- neofs/layer/layer.go | 82 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/neofs/layer/layer.go b/neofs/layer/layer.go index 9d43e67f..e02b5bd7 100644 --- a/neofs/layer/layer.go +++ b/neofs/layer/layer.go @@ -5,10 +5,12 @@ import ( "crypto/ecdsa" "io" "strings" + "time" "github.com/minio/minio/neofs/pool" "github.com/nspcc-dev/neofs-api-go/object" "github.com/nspcc-dev/neofs-api-go/refs" + "github.com/nspcc-dev/neofs-api-go/service" "github.com/pkg/errors" "go.uber.org/zap" ) @@ -44,7 +46,13 @@ type ( DstObject string } + NeoFS interface { + Get(ctx context.Context, address refs.Address) (*object.Object, error) + } + Client interface { + NeoFS + ListBuckets(ctx context.Context) ([]BucketInfo, error) GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error) @@ -80,6 +88,80 @@ func NewLayer(log *zap.Logger, cli pool.Client, key *ecdsa.PrivateKey) (Client, }, nil } +// Get NeoFS Object by refs.Address (should be used by auth.Center) +func (n *layer) Get(ctx context.Context, address refs.Address) (*object.Object, error) { + conn, err := n.cli.GetConnection(ctx) + if err != nil { + return nil, err + } + + token, err := n.cli.SessionToken(ctx, &pool.SessionParams{ + Conn: conn, + Addr: address, + Verb: service.Token_Info_Get, + }) + + if err != nil { + return nil, err + } + + req := new(object.GetRequest) + req.Address = address + req.SetTTL(service.SingleForwardingTTL) + req.SetToken(token) + + err = service.SignRequestData(n.key, req) + if err != nil { + return nil, err + } + + // todo: think about timeout + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + cli, err := object.NewServiceClient(conn).Get(ctx, req) + if err != nil { + return nil, err + } + + var ( + off int + obj *object.Object + ) + + for { + resp, err := cli.Recv() + if err != nil { + if err == io.EOF { + break + } + + return nil, err + } + + switch o := resp.R.(type) { + case *object.GetResponse_Object: + obj = o.Object + + _, hdr := obj.LastHeader(object.HeaderType(object.TombstoneHdr)) + if hdr != nil { + return nil, errors.New("object already removed") + } + + obj.Payload = make([]byte, obj.SystemHeader.PayloadLength) + case *object.GetResponse_Chunk: + if obj == nil { + return nil, errors.New("object headers not received") + } + off += copy(obj.Payload[off:], o.Chunk) + default: + return nil, errors.Errorf("unknown response %T", o) + } + } + + return obj, nil +} + // GetBucketInfo returns bucket name. func (n *layer) GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error) { list, err := n.containerList(ctx)