[#346] *: Refactor communication with NeoFS at the protocol level

Make `tokens`, `authmate` and `layer` packages to depend from locally
defined `NeoFS` interface of the virtual connection to NeoFS network.
Create internal `neofs` package and implement these interfaces through
`pool.Pool` there. Implement mediators between `NeoFS` interfaces and
`neofs.NeoFS` implementation.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2022-03-01 22:02:24 +03:00 committed by LeL
parent 34a221c5c9
commit cd64f41ce8
14 changed files with 1348 additions and 606 deletions

View file

@ -3,10 +3,8 @@ package layer
import (
"context"
"errors"
"fmt"
"io"
"sort"
"strconv"
"strings"
"time"
@ -18,20 +16,14 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/object"
"github.com/nspcc-dev/neofs-sdk-go/object/address"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/owner"
"go.uber.org/zap"
)
type (
findParams struct {
filters []filter
cid *cid.ID
prefix string
}
filter struct {
attr string
val string
attr [2]string
cid *cid.ID
prefix string
}
getParams struct {
@ -76,54 +68,25 @@ type (
func (n *layer) objectSearchByName(ctx context.Context, cid *cid.ID, filename string) ([]oid.ID, error) {
f := &findParams{
filters: []filter{{attr: object.AttributeFileName, val: filename}},
cid: cid,
prefix: "",
attr: [2]string{object.AttributeFileName, filename},
cid: cid,
}
return n.objectSearch(ctx, f)
}
// objectSearch returns all available objects by search params.
func (n *layer) objectSearch(ctx context.Context, p *findParams) ([]oid.ID, error) {
var filters object.SearchFilters
filters.AddRootFilter()
for _, filter := range p.filters {
filters.AddFilter(filter.attr, filter.val, object.MatchStringEqual)
prm := PrmObjectSelect{
Container: *p.cid,
ExactAttribute: p.attr,
FilePrefix: p.prefix,
}
if p.prefix != "" {
filters.AddFilter(object.AttributeFileName, p.prefix, object.MatchCommonPrefix)
}
n.prepareAuthParameters(ctx, &prm.PrmAuth)
res, err := n.pool.SearchObjects(ctx, *p.cid, filters, n.CallOptions(ctx)...)
if err != nil {
return nil, fmt.Errorf("init searching using client: %w", err)
}
res, err := n.neoFS.SelectObjects(ctx, prm)
defer res.Close()
var num, read int
buf := make([]oid.ID, 10)
for {
num, err = res.Read(buf[read:])
if num > 0 {
read += num
buf = append(buf, oid.ID{})
buf = buf[:cap(buf)]
}
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return nil, n.transformNeofsError(ctx, err)
}
}
return buf[:read], nil
return res, n.transformNeofsError(ctx, err)
}
func newAddress(cid *cid.ID, oid *oid.ID) *address.Address {
@ -135,83 +98,69 @@ func newAddress(cid *cid.ID, oid *oid.ID) *address.Address {
// objectHead returns all object's headers.
func (n *layer) objectHead(ctx context.Context, idCnr *cid.ID, idObj *oid.ID) (*object.Object, error) {
var addr address.Address
prm := PrmObjectRead{
Container: *idCnr,
Object: *idObj,
WithHeader: true,
}
addr.SetContainerID(idCnr)
addr.SetObjectID(idObj)
n.prepareAuthParameters(ctx, &prm.PrmAuth)
obj, err := n.pool.HeadObject(ctx, addr, n.CallOptions(ctx)...)
return obj, n.transformNeofsError(ctx, err)
res, err := n.neoFS.ReadObject(ctx, prm)
if err != nil {
return nil, n.transformNeofsError(ctx, err)
}
return res.Head, nil
}
// writes payload part of the NeoFS object to the provided io.Writer.
// Zero range corresponds to full payload (panics if only offset is set).
func (n *layer) objectWritePayload(ctx context.Context, p getParams) error {
// form object address
var a address.Address
a.SetContainerID(p.cid)
a.SetObjectID(p.oid)
fmt.Println("objectWritePayload", p.cid, p.oid)
// init payload reader
var r io.ReadCloser
if p.ln+p.off == 0 {
res, err := n.pool.GetObject(ctx, a, n.CallOptions(ctx)...)
if err != nil {
return n.transformNeofsError(ctx, fmt.Errorf("get object using client: %w", err))
}
p.ln = res.Header.PayloadSize()
r = res.Payload
} else {
res, err := n.pool.ObjectRange(ctx, a, p.off, p.ln, n.CallOptions(ctx)...)
if err != nil {
return n.transformNeofsError(ctx, fmt.Errorf("range object payload using client: %w", err))
}
r = res
prm := PrmObjectRead{
Container: *p.cid,
Object: *p.oid,
WithPayload: true,
PayloadRange: [2]uint64{p.off, p.ln},
}
defer r.Close()
n.prepareAuthParameters(ctx, &prm.PrmAuth)
if p.ln > 0 {
if p.ln > 4096 { // configure?
p.ln = 4096
res, err := n.neoFS.ReadObject(ctx, prm)
if err == nil {
defer res.Payload.Close()
if p.ln == 0 {
p.ln = 4096 // configure?
}
// alloc buffer for copying
buf := make([]byte, p.ln) // sync-pool it?
// copy full payload
_, err := io.CopyBuffer(p.w, r, buf)
if err != nil {
return n.transformNeofsError(ctx, fmt.Errorf("copy payload range: %w", err))
}
_, err = io.CopyBuffer(p.w, res.Payload, buf)
}
return nil
return n.transformNeofsError(ctx, err)
}
// objectGet returns an object with payload in the object.
func (n *layer) objectGet(ctx context.Context, addr *address.Address) (*object.Object, error) {
res, err := n.pool.GetObject(ctx, *addr, n.CallOptions(ctx)...)
prm := PrmObjectRead{
Container: *addr.ContainerID(),
Object: *addr.ObjectID(),
WithHeader: true,
WithPayload: true,
}
n.prepareAuthParameters(ctx, &prm.PrmAuth)
res, err := n.neoFS.ReadObject(ctx, prm)
if err != nil {
return nil, n.transformNeofsError(ctx, err)
}
defer res.Payload.Close()
payload, err := io.ReadAll(res.Payload)
if err != nil {
return nil, fmt.Errorf("read payload: %w", err)
}
object.NewRawFrom(&res.Header).SetPayload(payload)
return &res.Header, nil
return res.Head, nil
}
// objectPut into NeoFS, took payload from io.Reader.
@ -235,9 +184,24 @@ func (n *layer) objectPut(ctx context.Context, bkt *data.BucketInfo, p *PutObjec
r = d.MultiReader()
}
}
rawObject := formRawObject(p, bkt.CID, own, p.Object)
id, err := n.pool.PutObject(ctx, *rawObject.Object(), r, n.CallOptions(ctx)...)
prm := PrmObjectCreate{
Container: *bkt.CID,
Creator: *own,
PayloadSize: uint64(p.Size),
Filename: p.Object,
Payload: r,
}
prm.Attributes = make([][2]string, 0, len(p.Header))
for k, v := range p.Header {
prm.Attributes = append(prm.Attributes, [2]string{k, v})
}
n.prepareAuthParameters(ctx, &prm.PrmAuth)
id, err := n.neoFS.CreateObject(ctx, prm)
if err != nil {
return nil, n.transformNeofsError(ctx, err)
}
@ -292,35 +256,6 @@ func (n *layer) objectPut(ctx context.Context, bkt *data.BucketInfo, p *PutObjec
}, nil
}
func formRawObject(p *PutObjectParams, bktID *cid.ID, own *owner.ID, obj string) *object.RawObject {
attributes := make([]*object.Attribute, 0, len(p.Header)+2)
filename := object.NewAttribute()
filename.SetKey(object.AttributeFileName)
filename.SetValue(obj)
createdAt := object.NewAttribute()
createdAt.SetKey(object.AttributeTimestamp)
createdAt.SetValue(strconv.FormatInt(time.Now().UTC().Unix(), 10))
attributes = append(attributes, filename, createdAt)
for k, v := range p.Header {
ua := object.NewAttribute()
ua.SetKey(k)
ua.SetValue(v)
attributes = append(attributes, ua)
}
raw := object.NewRaw()
raw.SetOwnerID(own)
raw.SetContainerID(bktID)
raw.SetAttributes(attributes...)
raw.SetPayloadSize(uint64(p.Size))
return raw
}
func updateCRDT2PSetHeaders(header map[string]string, versions *objectVersions, versioningEnabled bool) []*oid.ID {
if !versioningEnabled {
header[versionsUnversionedAttr] = "true"
@ -483,11 +418,17 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb
}
// objectDelete puts tombstone object into neofs.
func (n *layer) objectDelete(ctx context.Context, cid *cid.ID, oid *oid.ID) error {
addr := newAddress(cid, oid)
n.objCache.Delete(addr)
err := n.pool.DeleteObject(ctx, *addr, n.CallOptions(ctx)...)
return n.transformNeofsError(ctx, err)
func (n *layer) objectDelete(ctx context.Context, idCnr *cid.ID, idObj *oid.ID) error {
prm := PrmObjectDelete{
Container: *idCnr,
Object: *idObj,
}
n.prepareAuthParameters(ctx, &prm.PrmAuth)
n.objCache.Delete(newAddress(idCnr, idObj))
return n.transformNeofsError(ctx, n.neoFS.DeleteObject(ctx, prm))
}
// ListObjectsV1 returns objects in a bucket for requests of Version 1.
@ -737,7 +678,7 @@ func (n *layer) transformNeofsError(ctx context.Context, err error) error {
return nil
}
if strings.Contains(err.Error(), "access to operation") && strings.Contains(err.Error(), "is denied by") {
if errors.Is(err, ErrAccessDenied) {
n.log.Debug("error was transformed", zap.String("request_id", api.GetRequestID(ctx)), zap.Error(err))
return apiErrors.GetAPIError(apiErrors.ErrAccessDenied)
}