forked from TrueCloudLab/frostfs-s3-gw
Merge pull request #6 from nspcc-dev/layer-get-neofs-object
Add NeoFS helper to fetch object by refs.Address
This commit is contained in:
commit
8101161e51
1 changed files with 82 additions and 0 deletions
|
@ -5,10 +5,12 @@ import (
|
|||
"crypto/ecdsa"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/neofs/pool"
|
||||
"github.com/nspcc-dev/neofs-api-go/object"
|
||||
"github.com/nspcc-dev/neofs-api-go/refs"
|
||||
"github.com/nspcc-dev/neofs-api-go/service"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -44,7 +46,13 @@ type (
|
|||
DstObject string
|
||||
}
|
||||
|
||||
NeoFS interface {
|
||||
Get(ctx context.Context, address refs.Address) (*object.Object, error)
|
||||
}
|
||||
|
||||
Client interface {
|
||||
NeoFS
|
||||
|
||||
ListBuckets(ctx context.Context) ([]BucketInfo, error)
|
||||
GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error)
|
||||
|
||||
|
@ -80,6 +88,80 @@ func NewLayer(log *zap.Logger, cli pool.Client, key *ecdsa.PrivateKey) (Client,
|
|||
}, nil
|
||||
}
|
||||
|
||||
// Get NeoFS Object by refs.Address (should be used by auth.Center)
|
||||
func (n *layer) Get(ctx context.Context, address refs.Address) (*object.Object, error) {
|
||||
conn, err := n.cli.GetConnection(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
token, err := n.cli.SessionToken(ctx, &pool.SessionParams{
|
||||
Conn: conn,
|
||||
Addr: address,
|
||||
Verb: service.Token_Info_Get,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req := new(object.GetRequest)
|
||||
req.Address = address
|
||||
req.SetTTL(service.SingleForwardingTTL)
|
||||
req.SetToken(token)
|
||||
|
||||
err = service.SignRequestData(n.key, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// todo: think about timeout
|
||||
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
cli, err := object.NewServiceClient(conn).Get(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var (
|
||||
off int
|
||||
obj *object.Object
|
||||
)
|
||||
|
||||
for {
|
||||
resp, err := cli.Recv()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch o := resp.R.(type) {
|
||||
case *object.GetResponse_Object:
|
||||
obj = o.Object
|
||||
|
||||
_, hdr := obj.LastHeader(object.HeaderType(object.TombstoneHdr))
|
||||
if hdr != nil {
|
||||
return nil, errors.New("object already removed")
|
||||
}
|
||||
|
||||
obj.Payload = make([]byte, obj.SystemHeader.PayloadLength)
|
||||
case *object.GetResponse_Chunk:
|
||||
if obj == nil {
|
||||
return nil, errors.New("object headers not received")
|
||||
}
|
||||
off += copy(obj.Payload[off:], o.Chunk)
|
||||
default:
|
||||
return nil, errors.Errorf("unknown response %T", o)
|
||||
}
|
||||
}
|
||||
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
// GetBucketInfo returns bucket name.
|
||||
func (n *layer) GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error) {
|
||||
list, err := n.containerList(ctx)
|
||||
|
|
Loading…
Reference in a new issue