forked from TrueCloudLab/frostfs-s3-gw
[#346] Upgrade NeoFS SDK Go library
Core changes: - `object.ID` moved to new package `oid`; - `object.Address` moved to new package `address`; - `pool.Object` interface changes. Additionally: - Set container owner in `Agent.IssueSecret`. - Remove no longer needed fields from `GetObjectParams` - `Length` and `Offset` are never assigned. These values are set in `Range` field. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
6a4fba4d09
commit
34a221c5c9
28 changed files with 430 additions and 518 deletions
|
@ -2,6 +2,8 @@ package layer
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sort"
|
||||
"strconv"
|
||||
|
@ -12,9 +14,10 @@ import (
|
|||
"github.com/nspcc-dev/neofs-s3-gw/api/cache"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||
apiErrors "github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/client"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object/address"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/owner"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -32,13 +35,13 @@ type (
|
|||
}
|
||||
|
||||
getParams struct {
|
||||
io.Writer
|
||||
*object.Range
|
||||
w io.Writer
|
||||
|
||||
offset int64
|
||||
length int64
|
||||
cid *cid.ID
|
||||
oid *object.ID
|
||||
// payload range
|
||||
off, ln uint64
|
||||
|
||||
cid *cid.ID
|
||||
oid *oid.ID
|
||||
}
|
||||
|
||||
// ListObjectsParamsCommon contains common parameters for ListObjectsV1 and ListObjectsV2.
|
||||
|
@ -71,7 +74,7 @@ type (
|
|||
}
|
||||
)
|
||||
|
||||
func (n *layer) objectSearchByName(ctx context.Context, cid *cid.ID, filename string) ([]*object.ID, error) {
|
||||
func (n *layer) objectSearchByName(ctx context.Context, cid *cid.ID, filename string) ([]oid.ID, error) {
|
||||
f := &findParams{
|
||||
filters: []filter{{attr: object.AttributeFileName, val: filename}},
|
||||
cid: cid,
|
||||
|
@ -81,60 +84,134 @@ func (n *layer) objectSearchByName(ctx context.Context, cid *cid.ID, filename st
|
|||
}
|
||||
|
||||
// objectSearch returns all available objects by search params.
|
||||
func (n *layer) objectSearch(ctx context.Context, p *findParams) ([]*object.ID, error) {
|
||||
var opts object.SearchFilters
|
||||
|
||||
opts.AddRootFilter()
|
||||
func (n *layer) objectSearch(ctx context.Context, p *findParams) ([]oid.ID, error) {
|
||||
var filters object.SearchFilters
|
||||
filters.AddRootFilter()
|
||||
|
||||
for _, filter := range p.filters {
|
||||
opts.AddFilter(filter.attr, filter.val, object.MatchStringEqual)
|
||||
filters.AddFilter(filter.attr, filter.val, object.MatchStringEqual)
|
||||
}
|
||||
|
||||
if p.prefix != "" {
|
||||
opts.AddFilter(object.AttributeFileName, p.prefix, object.MatchCommonPrefix)
|
||||
filters.AddFilter(object.AttributeFileName, p.prefix, object.MatchCommonPrefix)
|
||||
}
|
||||
searchParams := new(client.SearchObjectParams).WithContainerID(p.cid).WithSearchFilters(opts)
|
||||
|
||||
ids, err := n.pool.SearchObject(ctx, searchParams, n.CallOptions(ctx)...)
|
||||
return ids, n.transformNeofsError(ctx, err)
|
||||
res, err := n.pool.SearchObjects(ctx, *p.cid, filters, n.CallOptions(ctx)...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("init searching using client: %w", err)
|
||||
}
|
||||
|
||||
defer res.Close()
|
||||
|
||||
var num, read int
|
||||
buf := make([]oid.ID, 10)
|
||||
|
||||
for {
|
||||
num, err = res.Read(buf[read:])
|
||||
if num > 0 {
|
||||
read += num
|
||||
buf = append(buf, oid.ID{})
|
||||
buf = buf[:cap(buf)]
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
|
||||
return nil, n.transformNeofsError(ctx, err)
|
||||
}
|
||||
}
|
||||
|
||||
return buf[:read], nil
|
||||
}
|
||||
|
||||
func newAddress(cid *cid.ID, oid *object.ID) *object.Address {
|
||||
address := object.NewAddress()
|
||||
address.SetContainerID(cid)
|
||||
address.SetObjectID(oid)
|
||||
return address
|
||||
func newAddress(cid *cid.ID, oid *oid.ID) *address.Address {
|
||||
addr := address.NewAddress()
|
||||
addr.SetContainerID(cid)
|
||||
addr.SetObjectID(oid)
|
||||
return addr
|
||||
}
|
||||
|
||||
// objectHead returns all object's headers.
|
||||
func (n *layer) objectHead(ctx context.Context, cid *cid.ID, oid *object.ID) (*object.Object, error) {
|
||||
ops := new(client.ObjectHeaderParams).WithAddress(newAddress(cid, oid)).WithAllFields()
|
||||
obj, err := n.pool.GetObjectHeader(ctx, ops, n.CallOptions(ctx)...)
|
||||
func (n *layer) objectHead(ctx context.Context, idCnr *cid.ID, idObj *oid.ID) (*object.Object, error) {
|
||||
var addr address.Address
|
||||
|
||||
addr.SetContainerID(idCnr)
|
||||
addr.SetObjectID(idObj)
|
||||
|
||||
obj, err := n.pool.HeadObject(ctx, addr, n.CallOptions(ctx)...)
|
||||
return obj, n.transformNeofsError(ctx, err)
|
||||
}
|
||||
|
||||
// objectGetWithPayloadWriter and write it into provided io.Reader.
|
||||
func (n *layer) objectGetWithPayloadWriter(ctx context.Context, p *getParams) (*object.Object, error) {
|
||||
// prepare length/offset writer
|
||||
w := newWriter(p.Writer, p.offset, p.length)
|
||||
ops := new(client.GetObjectParams).WithAddress(newAddress(p.cid, p.oid)).WithPayloadWriter(w)
|
||||
obj, err := n.pool.GetObject(ctx, ops, n.CallOptions(ctx)...)
|
||||
return obj, n.transformNeofsError(ctx, err)
|
||||
// writes payload part of the NeoFS object to the provided io.Writer.
|
||||
// Zero range corresponds to full payload (panics if only offset is set).
|
||||
func (n *layer) objectWritePayload(ctx context.Context, p getParams) error {
|
||||
// form object address
|
||||
var a address.Address
|
||||
|
||||
a.SetContainerID(p.cid)
|
||||
a.SetObjectID(p.oid)
|
||||
|
||||
fmt.Println("objectWritePayload", p.cid, p.oid)
|
||||
|
||||
// init payload reader
|
||||
var r io.ReadCloser
|
||||
|
||||
if p.ln+p.off == 0 {
|
||||
res, err := n.pool.GetObject(ctx, a, n.CallOptions(ctx)...)
|
||||
if err != nil {
|
||||
return n.transformNeofsError(ctx, fmt.Errorf("get object using client: %w", err))
|
||||
}
|
||||
|
||||
p.ln = res.Header.PayloadSize()
|
||||
r = res.Payload
|
||||
} else {
|
||||
res, err := n.pool.ObjectRange(ctx, a, p.off, p.ln, n.CallOptions(ctx)...)
|
||||
if err != nil {
|
||||
return n.transformNeofsError(ctx, fmt.Errorf("range object payload using client: %w", err))
|
||||
}
|
||||
|
||||
r = res
|
||||
}
|
||||
|
||||
defer r.Close()
|
||||
|
||||
if p.ln > 0 {
|
||||
if p.ln > 4096 { // configure?
|
||||
p.ln = 4096
|
||||
}
|
||||
|
||||
// alloc buffer for copying
|
||||
buf := make([]byte, p.ln) // sync-pool it?
|
||||
|
||||
// copy full payload
|
||||
_, err := io.CopyBuffer(p.w, r, buf)
|
||||
if err != nil {
|
||||
return n.transformNeofsError(ctx, fmt.Errorf("copy payload range: %w", err))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// objectGet returns an object with payload in the object.
|
||||
func (n *layer) objectGet(ctx context.Context, cid *cid.ID, oid *object.ID) (*object.Object, error) {
|
||||
ops := new(client.GetObjectParams).WithAddress(newAddress(cid, oid))
|
||||
obj, err := n.pool.GetObject(ctx, ops, n.CallOptions(ctx)...)
|
||||
return obj, n.transformNeofsError(ctx, err)
|
||||
}
|
||||
func (n *layer) objectGet(ctx context.Context, addr *address.Address) (*object.Object, error) {
|
||||
res, err := n.pool.GetObject(ctx, *addr, n.CallOptions(ctx)...)
|
||||
if err != nil {
|
||||
return nil, n.transformNeofsError(ctx, err)
|
||||
}
|
||||
|
||||
// objectRange gets object range and writes it into provided io.Writer.
|
||||
func (n *layer) objectRange(ctx context.Context, p *getParams) ([]byte, error) {
|
||||
w := newWriter(p.Writer, p.offset, p.length)
|
||||
ops := new(client.RangeDataParams).WithAddress(newAddress(p.cid, p.oid)).WithDataWriter(w).WithRange(p.Range)
|
||||
payload, err := n.pool.ObjectPayloadRangeData(ctx, ops, n.CallOptions(ctx)...)
|
||||
return payload, n.transformNeofsError(ctx, err)
|
||||
defer res.Payload.Close()
|
||||
|
||||
payload, err := io.ReadAll(res.Payload)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read payload: %w", err)
|
||||
}
|
||||
|
||||
object.NewRawFrom(&res.Header).SetPayload(payload)
|
||||
|
||||
return &res.Header, nil
|
||||
}
|
||||
|
||||
// objectPut into NeoFS, took payload from io.Reader.
|
||||
|
@ -160,8 +237,7 @@ func (n *layer) objectPut(ctx context.Context, bkt *data.BucketInfo, p *PutObjec
|
|||
}
|
||||
rawObject := formRawObject(p, bkt.CID, own, p.Object)
|
||||
|
||||
ops := new(client.PutObjectParams).WithObject(rawObject.Object()).WithPayloadReader(r)
|
||||
oid, err := n.pool.PutObject(ctx, ops, n.CallOptions(ctx)...)
|
||||
id, err := n.pool.PutObject(ctx, *rawObject.Object(), r, n.CallOptions(ctx)...)
|
||||
if err != nil {
|
||||
return nil, n.transformNeofsError(ctx, err)
|
||||
}
|
||||
|
@ -172,7 +248,7 @@ func (n *layer) objectPut(ctx context.Context, bkt *data.BucketInfo, p *PutObjec
|
|||
}
|
||||
}
|
||||
|
||||
meta, err := n.objectHead(ctx, bkt.CID, oid)
|
||||
meta, err := n.objectHead(ctx, bkt.CID, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -201,7 +277,7 @@ func (n *layer) objectPut(ctx context.Context, bkt *data.BucketInfo, p *PutObjec
|
|||
}
|
||||
|
||||
return &data.ObjectInfo{
|
||||
ID: oid,
|
||||
ID: id,
|
||||
CID: bkt.CID,
|
||||
|
||||
Owner: own,
|
||||
|
@ -240,16 +316,17 @@ func formRawObject(p *PutObjectParams, bktID *cid.ID, own *owner.ID, obj string)
|
|||
raw.SetOwnerID(own)
|
||||
raw.SetContainerID(bktID)
|
||||
raw.SetAttributes(attributes...)
|
||||
raw.SetPayloadSize(uint64(p.Size))
|
||||
|
||||
return raw
|
||||
}
|
||||
|
||||
func updateCRDT2PSetHeaders(header map[string]string, versions *objectVersions, versioningEnabled bool) []*object.ID {
|
||||
func updateCRDT2PSetHeaders(header map[string]string, versions *objectVersions, versioningEnabled bool) []*oid.ID {
|
||||
if !versioningEnabled {
|
||||
header[versionsUnversionedAttr] = "true"
|
||||
}
|
||||
|
||||
var idsToDeleteArr []*object.ID
|
||||
var idsToDeleteArr []*oid.ID
|
||||
if versions.isEmpty() {
|
||||
return idsToDeleteArr
|
||||
}
|
||||
|
@ -298,8 +375,8 @@ func updateCRDT2PSetHeaders(header map[string]string, versions *objectVersions,
|
|||
}
|
||||
|
||||
func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.BucketInfo, objectName string) (*data.ObjectInfo, error) {
|
||||
if address := n.namesCache.Get(bkt.Name + "/" + objectName); address != nil {
|
||||
if headInfo := n.objCache.Get(address); headInfo != nil {
|
||||
if addr := n.namesCache.Get(bkt.Name + "/" + objectName); addr != nil {
|
||||
if headInfo := n.objCache.Get(addr); headInfo != nil {
|
||||
return objInfoFromMeta(bkt, headInfo), nil
|
||||
}
|
||||
}
|
||||
|
@ -334,18 +411,18 @@ func (n *layer) headVersions(ctx context.Context, bkt *data.BucketInfo, objectNa
|
|||
return versions, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey)
|
||||
}
|
||||
|
||||
for _, id := range ids {
|
||||
meta, err := n.objectHead(ctx, bkt.CID, id)
|
||||
for i := range ids {
|
||||
meta, err := n.objectHead(ctx, bkt.CID, &ids[i])
|
||||
if err != nil {
|
||||
n.log.Warn("couldn't head object",
|
||||
zap.Stringer("object id", id),
|
||||
zap.Stringer("object id", &ids[i]),
|
||||
zap.Stringer("bucket id", bkt.CID),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
if err = n.objCache.Put(*meta); err != nil {
|
||||
n.log.Warn("couldn't put meta to objects cache",
|
||||
zap.Stringer("object id", id),
|
||||
zap.Stringer("object id", &ids[i]),
|
||||
zap.Stringer("bucket id", bkt.CID),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
@ -375,16 +452,16 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb
|
|||
return objInfo, nil
|
||||
}
|
||||
|
||||
oid := object.NewID()
|
||||
if err := oid.Parse(p.VersionID); err != nil {
|
||||
id := oid.NewID()
|
||||
if err := id.Parse(p.VersionID); err != nil {
|
||||
return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidVersion)
|
||||
}
|
||||
|
||||
if headInfo := n.objCache.Get(newAddress(bkt.CID, oid)); headInfo != nil {
|
||||
if headInfo := n.objCache.Get(newAddress(bkt.CID, id)); headInfo != nil {
|
||||
return objInfoFromMeta(bkt, headInfo), nil
|
||||
}
|
||||
|
||||
meta, err := n.objectHead(ctx, bkt.CID, oid)
|
||||
meta, err := n.objectHead(ctx, bkt.CID, id)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "not found") {
|
||||
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchVersion)
|
||||
|
@ -406,12 +483,10 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb
|
|||
}
|
||||
|
||||
// objectDelete puts tombstone object into neofs.
|
||||
func (n *layer) objectDelete(ctx context.Context, cid *cid.ID, oid *object.ID) error {
|
||||
address := newAddress(cid, oid)
|
||||
dop := new(client.DeleteObjectParams)
|
||||
dop.WithAddress(address)
|
||||
n.objCache.Delete(address)
|
||||
err := n.pool.DeleteObject(ctx, dop, n.CallOptions(ctx)...)
|
||||
func (n *layer) objectDelete(ctx context.Context, cid *cid.ID, oid *oid.ID) error {
|
||||
addr := newAddress(cid, oid)
|
||||
n.objCache.Delete(addr)
|
||||
err := n.pool.DeleteObject(ctx, *addr, n.CallOptions(ctx)...)
|
||||
return n.transformNeofsError(ctx, err)
|
||||
}
|
||||
|
||||
|
@ -529,7 +604,7 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo,
|
|||
versions := make(map[string]*objectVersions, len(ids)/2)
|
||||
|
||||
for i := 0; i < len(ids); i++ {
|
||||
obj := n.objectFromObjectsCacheOrNeoFS(ctx, bkt.CID, ids[i])
|
||||
obj := n.objectFromObjectsCacheOrNeoFS(ctx, bkt.CID, &ids[i])
|
||||
if obj == nil {
|
||||
continue
|
||||
}
|
||||
|
@ -638,7 +713,7 @@ func (n *layer) isVersioningEnabled(ctx context.Context, bktInfo *data.BucketInf
|
|||
return settings.VersioningEnabled
|
||||
}
|
||||
|
||||
func (n *layer) objectFromObjectsCacheOrNeoFS(ctx context.Context, cid *cid.ID, oid *object.ID) *object.Object {
|
||||
func (n *layer) objectFromObjectsCacheOrNeoFS(ctx context.Context, cid *cid.ID, oid *oid.ID) *object.Object {
|
||||
var (
|
||||
err error
|
||||
meta = n.objCache.Get(newAddress(cid, oid))
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue