Compare commits

...

3 commits

Author SHA1 Message Date
51c5c227c2 [#31] Add force bucket delete flag
Signed-off-by: Pavel Pogodaev <p.pogodaev@yadro.com>
2024-07-25 14:04:54 +03:00
c506620199 [#439] Update SDK version
New SDK version supports extended log records in pool component during inner nodes health check.

Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
2024-07-24 12:16:37 +00:00
6cb0026007 [#427] layer: Split FrostFS ReadObject to separate methods
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-07-23 16:53:59 +03:00
19 changed files with 306 additions and 153 deletions

View file

@ -237,9 +237,18 @@ func (h *handler) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) {
sessionToken = boxData.Gate.SessionTokenForDelete()
}
skipObjCheck := false
if value, ok := r.Header[api.AmzForceBucketDelete]; ok {
s := value[0]
if s == "true" {
skipObjCheck = true
}
}
if err = h.obj.DeleteBucket(r.Context(), &layer.DeleteBucketParams{
BktInfo: bktInfo,
SessionToken: sessionToken,
SkipCheck: skipObjCheck,
}); err != nil {
h.logAndSendError(w, "couldn't delete bucket", reqInfo, err)
return

View file

@ -85,6 +85,24 @@ func TestDeleteBucketOnNotFoundError(t *testing.T) {
deleteBucket(t, hc, bktName, http.StatusNoContent)
}
func TestForceDeleteBucket(t *testing.T) {
hc := prepareHandlerContext(t)
bktName, objName := "bucket-for-removal", "object-to-delete"
bktInfo := createTestBucket(hc, bktName)
putObject(hc, bktName, objName)
nodeVersion, err := hc.tree.GetUnversioned(hc.context, bktInfo, objName)
require.NoError(t, err)
var addr oid.Address
addr.SetContainer(bktInfo.CID)
addr.SetObject(nodeVersion.OID)
deleteBucketForce(t, hc, bktName, http.StatusConflict, "false")
deleteBucketForce(t, hc, bktName, http.StatusNoContent, "true")
}
func TestDeleteMultipleObjectCheckUniqueness(t *testing.T) {
hc := prepareHandlerContext(t)
@ -517,6 +535,13 @@ func deleteObjectsBase(hc *handlerContext, bktName string, objVersions [][2]stri
return w
}
func deleteBucketForce(t *testing.T, tc *handlerContext, bktName string, code int, value string) {
w, r := prepareTestRequest(tc, bktName, "", nil)
r.Header.Set(api.AmzForceBucketDelete, value)
tc.Handler().DeleteBucketHandler(w, r)
assertStatus(t, w, code)
}
func deleteBucket(t *testing.T, tc *handlerContext, bktName string, code int) {
w, r := prepareTestRequest(tc, bktName, "", nil)
tc.Handler().DeleteBucketHandler(w, r)

View file

@ -37,7 +37,7 @@ func TestSimpleGetEncrypted(t *testing.T) {
objInfo, err := tc.Layer().GetObjectInfo(tc.Context(), &layer.HeadObjectParams{BktInfo: bktInfo, Object: objName})
require.NoError(t, err)
obj, err := tc.MockedPool().ReadObject(tc.Context(), layer.PrmObjectRead{Container: bktInfo.CID, Object: objInfo.ID})
obj, err := tc.MockedPool().GetObject(tc.Context(), layer.PrmObjectGet{Container: bktInfo.CID, Object: objInfo.ID})
require.NoError(t, err)
encryptedContent, err := io.ReadAll(obj.Payload)
require.NoError(t, err)

View file

@ -62,6 +62,7 @@ const (
AmzMaxParts = "X-Amz-Max-Parts"
AmzPartNumberMarker = "X-Amz-Part-Number-Marker"
AmzStorageClass = "X-Amz-Storage-Class"
AmzForceBucketDelete = "X-Amz-Force-Delete-Bucket"
AmzServerSideEncryptionCustomerAlgorithm = "x-amz-server-side-encryption-customer-algorithm"
AmzServerSideEncryptionCustomerKey = "x-amz-server-side-encryption-customer-key"

View file

@ -78,8 +78,32 @@ type PrmAuth struct {
PrivateKey *ecdsa.PrivateKey
}
// PrmObjectRead groups parameters of FrostFS.ReadObject operation.
type PrmObjectRead struct {
// PrmObjectHead groups parameters of FrostFS.HeadObject operation.
type PrmObjectHead struct {
// Authentication parameters.
PrmAuth
// Container to read the object header from.
Container cid.ID
// ID of the object for which to read the header.
Object oid.ID
}
// PrmObjectGet groups parameters of FrostFS.GetObject operation.
type PrmObjectGet struct {
// Authentication parameters.
PrmAuth
// Container to read the object header from.
Container cid.ID
// ID of the object for which to read the header.
Object oid.ID
}
// PrmObjectRange groups parameters of FrostFS.RangeObject operation.
type PrmObjectRange struct {
// Authentication parameters.
PrmAuth
@ -89,20 +113,14 @@ type PrmObjectRead struct {
// ID of the object for which to read the header.
Object oid.ID
// Flag to read object header.
WithHeader bool
// Flag to read object payload. False overlaps payload range.
WithPayload bool
// Offset-length range of the object payload to be read.
PayloadRange [2]uint64
}
// ObjectPart represents partially read FrostFS object.
type ObjectPart struct {
// Object header with optional in-memory payload part.
Head *object.Object
// Object represents full read FrostFS object.
type Object struct {
// Object header (doesn't contain payload).
Header object.Object
// Object payload part encapsulated in io.Reader primitive.
// Returns ErrAccessDenied on read access violation.
@ -213,13 +231,15 @@ type FrostFS interface {
// It returns any error encountered which prevented the removal request from being sent.
DeleteContainer(context.Context, cid.ID, *session.Container) error
// ReadObject reads a part of the object from the FrostFS container by identifier.
// Exact part is returned according to the parameters:
// * with header only: empty payload (both in-mem and reader parts are nil);
// * with payload only: header is nil (zero range means full payload);
// * with header and payload: full in-mem object, payload reader is nil.
// HeadObject reads an info of the object from the FrostFS container by identifier.
//
// WithHeader or WithPayload is true. Range length is positive if offset is positive.
// It returns ErrAccessDenied on read access violation.
//
// It returns exactly one non-nil value. It returns any error encountered which
// prevented the object header from being read.
HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Object, error)
// GetObject reads an object from the FrostFS container by identifier.
//
// Payload reader should be closed if it is no longer needed.
//
@ -227,7 +247,17 @@ type FrostFS interface {
//
// It returns exactly one non-nil value. It returns any error encountered which
// prevented the object header from being read.
ReadObject(context.Context, PrmObjectRead) (*ObjectPart, error)
GetObject(ctx context.Context, prm PrmObjectGet) (*Object, error)
// RangeObject reads a part of object from the FrostFS container by identifier.
//
// Payload reader should be closed if it is no longer needed.
//
// It returns ErrAccessDenied on read access violation.
//
// It returns exactly one non-nil value. It returns any error encountered which
// prevented the object header from being read.
RangeObject(ctx context.Context, prm PrmObjectRange) (io.ReadCloser, error)
// CreateObject creates and saves a parameterized object in the FrostFS container.
// It sets 'Timestamp' attribute to the current time.

View file

@ -204,10 +204,10 @@ func (t *TestFrostFS) UserContainers(context.Context, PrmUserContainers) ([]cid.
return res, nil
}
func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*ObjectPart, error) {
func (t *TestFrostFS) retrieveObject(ctx context.Context, cnrID cid.ID, objID oid.ID) (*object.Object, error) {
var addr oid.Address
addr.SetContainer(prm.Container)
addr.SetObject(prm.Object)
addr.SetContainer(cnrID)
addr.SetObject(objID)
sAddr := addr.EncodeToString()
@ -217,26 +217,44 @@ func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*Objec
if obj, ok := t.objects[sAddr]; ok {
owner := getBearerOwner(ctx)
if !t.checkAccess(prm.Container, owner) {
if !t.checkAccess(cnrID, owner) {
return nil, ErrAccessDenied
}
payload := obj.Payload()
if prm.PayloadRange[0]+prm.PayloadRange[1] > 0 {
off := prm.PayloadRange[0]
payload = payload[off : off+prm.PayloadRange[1]]
}
return &ObjectPart{
Head: obj,
Payload: io.NopCloser(bytes.NewReader(payload)),
}, nil
return obj, nil
}
return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, addr)
}
func (t *TestFrostFS) HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Object, error) {
return t.retrieveObject(ctx, prm.Container, prm.Object)
}
func (t *TestFrostFS) GetObject(ctx context.Context, prm PrmObjectGet) (*Object, error) {
obj, err := t.retrieveObject(ctx, prm.Container, prm.Object)
if err != nil {
return nil, err
}
return &Object{
Header: *obj,
Payload: io.NopCloser(bytes.NewReader(obj.Payload())),
}, nil
}
func (t *TestFrostFS) RangeObject(ctx context.Context, prm PrmObjectRange) (io.ReadCloser, error) {
obj, err := t.retrieveObject(ctx, prm.Container, prm.Object)
if err != nil {
return nil, err
}
off := prm.PayloadRange[0]
payload := obj.Payload()[off : off+prm.PayloadRange[1]]
return io.NopCloser(bytes.NewReader(payload)), nil
}
func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) {
b := make([]byte, 32)
if _, err := io.ReadFull(rand.Reader, b); err != nil {

View file

@ -170,6 +170,7 @@ type (
DeleteBucketParams struct {
BktInfo *data.BucketInfo
SessionToken *session.Container
SkipCheck bool
}
// ListObjectVersionsParams stores list objects versions parameters.
@ -742,7 +743,7 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
}
var parts []*data.PartInfo
if err = json.Unmarshal(combinedObj.Payload(), &parts); err != nil {
if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
return fmt.Errorf("unmarshal combined object parts: %w", err)
}
@ -804,16 +805,18 @@ func (n *Layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error)
}
func (n *Layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
res, _, err := n.getAllObjectsVersions(ctx, commonVersionsListingParams{
BktInfo: p.BktInfo,
MaxKeys: 1,
})
if !p.SkipCheck {
res, _, err := n.getAllObjectsVersions(ctx, commonVersionsListingParams{
BktInfo: p.BktInfo,
MaxKeys: 1,
})
if err != nil {
return err
}
if len(res) != 0 {
return errors.GetAPIError(errors.ErrBucketNotEmpty)
if err != nil {
return err
}
if len(res) != 0 {
return errors.GetAPIError(errors.ErrBucketNotEmpty)
}
}
n.cache.DeleteBucket(p.BktInfo)

View file

@ -68,20 +68,14 @@ func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
// objectHead returns all object's headers.
func (n *Layer) objectHead(ctx context.Context, bktInfo *data.BucketInfo, idObj oid.ID) (*object.Object, error) {
prm := PrmObjectRead{
Container: bktInfo.CID,
Object: idObj,
WithHeader: true,
prm := PrmObjectHead{
Container: bktInfo.CID,
Object: idObj,
}
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
res, err := n.frostFS.ReadObject(ctx, prm)
if err != nil {
return nil, err
}
return res.Head, nil
return n.frostFS.HeadObject(ctx, prm)
}
func (n *Layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Reader, error) {
@ -100,7 +94,7 @@ func (n *Layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Re
}
var parts []*data.PartInfo
if err = json.Unmarshal(combinedObj.Payload(), &parts); err != nil {
if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
return nil, fmt.Errorf("unmarshal combined object parts: %w", err)
}
@ -132,16 +126,27 @@ func (n *Layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Re
// initializes payload reader of the FrostFS object.
// Zero range corresponds to full payload (panics if only offset is set).
func (n *Layer) initFrostFSObjectPayloadReader(ctx context.Context, p getFrostFSParams) (io.Reader, error) {
prm := PrmObjectRead{
Container: p.bktInfo.CID,
Object: p.oid,
WithPayload: true,
PayloadRange: [2]uint64{p.off, p.ln},
var prmAuth PrmAuth
n.prepareAuthParameters(ctx, &prmAuth, p.bktInfo.Owner)
if p.off+p.ln != 0 {
prm := PrmObjectRange{
PrmAuth: prmAuth,
Container: p.bktInfo.CID,
Object: p.oid,
PayloadRange: [2]uint64{p.off, p.ln},
}
return n.frostFS.RangeObject(ctx, prm)
}
n.prepareAuthParameters(ctx, &prm.PrmAuth, p.bktInfo.Owner)
prm := PrmObjectGet{
PrmAuth: prmAuth,
Container: p.bktInfo.CID,
Object: p.oid,
}
res, err := n.frostFS.ReadObject(ctx, prm)
res, err := n.frostFS.GetObject(ctx, prm)
if err != nil {
return nil, err
}
@ -150,32 +155,25 @@ func (n *Layer) initFrostFSObjectPayloadReader(ctx context.Context, p getFrostFS
}
// objectGet returns an object with payload in the object.
func (n *Layer) objectGet(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (*object.Object, error) {
func (n *Layer) objectGet(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (*Object, error) {
return n.objectGetBase(ctx, bktInfo, objID, PrmAuth{})
}
// objectGetWithAuth returns an object with payload in the object. Uses provided PrmAuth.
func (n *Layer) objectGetWithAuth(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, auth PrmAuth) (*object.Object, error) {
func (n *Layer) objectGetWithAuth(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, auth PrmAuth) (*Object, error) {
return n.objectGetBase(ctx, bktInfo, objID, auth)
}
func (n *Layer) objectGetBase(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, auth PrmAuth) (*object.Object, error) {
prm := PrmObjectRead{
PrmAuth: auth,
Container: bktInfo.CID,
Object: objID,
WithHeader: true,
WithPayload: true,
func (n *Layer) objectGetBase(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, auth PrmAuth) (*Object, error) {
prm := PrmObjectGet{
PrmAuth: auth,
Container: bktInfo.CID,
Object: objID,
}
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
res, err := n.frostFS.ReadObject(ctx, prm)
if err != nil {
return nil, err
}
return res.Head, nil
return n.frostFS.GetObject(ctx, prm)
}
// MimeByFilePath detect mime type by file path extension.

View file

@ -183,8 +183,7 @@ func (n *Layer) getCORS(ctx context.Context, bkt *data.BucketInfo) (*data.CORSCo
}
cors := &data.CORSConfiguration{}
if err = xml.Unmarshal(obj.Payload(), &cors); err != nil {
if err = xml.NewDecoder(obj.Payload).Decode(&cors); err != nil {
return nil, fmt.Errorf("unmarshal cors: %w", err)
}

View file

@ -50,7 +50,7 @@ func createFrostFS(ctx context.Context, log *zap.Logger, cfg PoolConfig) (*frost
return nil, fmt.Errorf("dial pool: %w", err)
}
return frostfs.NewAuthmateFrostFS(frostfs.NewFrostFS(p, cfg.Key)), nil
return frostfs.NewAuthmateFrostFS(frostfs.NewFrostFS(p, cfg.Key), log), nil
}
func parsePolicies(val string) (authmate.ContainerPolicies, error) {

View file

@ -124,7 +124,7 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
objPool, treePool, key := getPools(ctx, log.logger, v)
cfg := tokens.Config{
FrostFS: frostfs.NewAuthmateFrostFS(frostfs.NewFrostFS(objPool, key)),
FrostFS: frostfs.NewAuthmateFrostFS(frostfs.NewFrostFS(objPool, key), log.logger),
Key: key,
CacheConfig: getAccessBoxCacheConfig(v, log.logger),
RemovingCheckAfterDurations: fetchRemovingCheckInterval(v, log.logger),

View file

@ -92,6 +92,7 @@ type FrostFS interface {
//
// It returns exactly one non-nil value. It returns any error encountered which
// prevented the object payload from being read.
// Object must contain full payload.
GetCredsObject(context.Context, oid.Address) (*object.Object, error)
}

52
docs/extensions.md Normal file
View file

@ -0,0 +1,52 @@
# S3 API Extension
## Bucket operations management
### Action to delete bucket (DeleteBucket)
Deletes bucket with all objects in it.
#### Request Parameters
- **Bucket**
Specifies the bucket being deleted.
#### Errors
- **NoSuchEntity**
The request was rejected because it referenced a resource entity that does not exist.
HTTP Status Code: 404
- **ServiceFailure**
The request processing has failed because of an unknown error, exception or failure.
HTTP Status Code: 500
#### Example
Sample Request
```text
DELETE / HTTP/1.1
X-Amz-Force-Delete-Bucket: true
Host: data.s3.<Region>.frostfs-s3-gw.com
Date: Wed, 01 Mar 2024 12:00:00 GMT
Authorization: authorization string
```
Sample Response
```text
HTTP/1.1 204 No Content
x-amz-id-2: JuKZqmXuiwFeDQxhD7M8KtsKobSzWA1QEjLbTMTagkKdBX2z7Il/jGhDeJ3j6s80
x-amz-request-id: 32FE2CEB32F5EE25
Date: Wed, 01 Mar 2006 12:00:00 GMT
Connection: close
Server: AmazonS3
```

2
go.mod
View file

@ -6,7 +6,7 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240716113920-f517e3949164
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240718141740-ce8270568d36
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240722121227-fa89999d919c
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240611102930-ac965e8d176a
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
github.com/aws/aws-sdk-go v1.44.6

4
go.sum
View file

@ -44,8 +44,8 @@ git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSV
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 h1:aGQ6QaAnTerQ5Dq5b2/f9DUQtSqPkZZ/bkMx/HKuLCo=
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6/go.mod h1:W8Nn08/l6aQ7UlIbpF7FsQou7TVpcRD1ZT1KG4TrFhE=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240718141740-ce8270568d36 h1:MV/vKJWLQT34RRbXYvkNKFYGNjL5bRNuCQMXkbC7fLI=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240718141740-ce8270568d36/go.mod h1:vluJ/+yQMcq8ZIZZSA7Te+JKClr0lgtRErjICvb8wto=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240722121227-fa89999d919c h1:8ZS6eUFnOhzUo9stFqwq1Zyq+Y5YNcYAidCGICcZVL4=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240722121227-fa89999d919c/go.mod h1:vluJ/+yQMcq8ZIZZSA7Te+JKClr0lgtRErjICvb8wto=
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240611102930-ac965e8d176a h1:Bk1fB4cQASPKgAVGCdlBOEp5ohZfDxqK6fZM8eP+Emo=

View file

@ -4,18 +4,22 @@ import (
"bytes"
"context"
"fmt"
"io"
"strconv"
"time"
objectv2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/authmate"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/tokens"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/crdt"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
const (
@ -25,11 +29,12 @@ const (
// AuthmateFrostFS is a mediator which implements authmate.FrostFS through pool.Pool.
type AuthmateFrostFS struct {
frostFS layer.FrostFS
log *zap.Logger
}
// NewAuthmateFrostFS creates new AuthmateFrostFS using provided pool.Pool.
func NewAuthmateFrostFS(frostFS layer.FrostFS) *AuthmateFrostFS {
return &AuthmateFrostFS{frostFS: frostFS}
func NewAuthmateFrostFS(frostFS layer.FrostFS, log *zap.Logger) *AuthmateFrostFS {
return &AuthmateFrostFS{frostFS: frostFS, log: log}
}
// ContainerExists implements authmate.FrostFS interface method.
@ -79,17 +84,27 @@ func (x *AuthmateFrostFS) GetCredsObject(ctx context.Context, addr oid.Address)
credObjID = last.ObjID
}
res, err := x.frostFS.ReadObject(ctx, layer.PrmObjectRead{
Container: addr.Container(),
Object: credObjID,
WithPayload: true,
WithHeader: true,
res, err := x.frostFS.GetObject(ctx, layer.PrmObjectGet{
Container: addr.Container(),
Object: credObjID,
})
if err != nil {
return nil, err
}
return res.Head, err
defer func() {
if closeErr := res.Payload.Close(); closeErr != nil {
x.reqLogger(ctx).Warn(logs.CloseCredsObjectPayload, zap.Error(closeErr))
}
}()
data, err := io.ReadAll(res.Payload)
if err != nil {
return nil, err
}
res.Header.SetPayload(data)
return &res.Header, err
}
// CreateObject implements authmate.FrostFS interface method.
@ -143,21 +158,28 @@ func (x *AuthmateFrostFS) getCredVersions(ctx context.Context, addr oid.Address)
versions := crdt.NewObjectVersions(objCredSystemName)
for _, id := range credVersions {
objVersion, err := x.frostFS.ReadObject(ctx, layer.PrmObjectRead{
Container: addr.Container(),
Object: id,
WithHeader: true,
objVersion, err := x.frostFS.HeadObject(ctx, layer.PrmObjectHead{
Container: addr.Container(),
Object: id,
})
if err != nil {
return nil, fmt.Errorf("head crdt access box '%s': %w", id.EncodeToString(), err)
}
versions.AppendVersion(crdt.NewObjectVersion(objVersion.Head))
versions.AppendVersion(crdt.NewObjectVersion(objVersion))
}
return versions, nil
}
func (x *AuthmateFrostFS) reqLogger(ctx context.Context) *zap.Logger {
reqLogger := middleware.GetReqLog(ctx)
if reqLogger != nil {
return reqLogger
}
return x.log
}
func credVersionSysName(cnrID cid.ID, objID oid.ID) string {
return cnrID.EncodeToString() + "0" + objID.EncodeToString()
}

View file

@ -14,6 +14,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
func TestGetCredsObject(t *testing.T) {
@ -35,7 +36,7 @@ func TestGetCredsObject(t *testing.T) {
},
}})
frostfs := NewAuthmateFrostFS(layer.NewTestFrostFS(key))
frostfs := NewAuthmateFrostFS(layer.NewTestFrostFS(key), zaptest.NewLogger(t))
cid, err := frostfs.CreateContainer(ctx, authmate.PrmContainerCreate{
FriendlyName: bktName,

View file

@ -237,8 +237,12 @@ func (x *FrostFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (
prmPut.UseKey(prm.PrivateKey)
}
idObj, err := x.pool.PutObject(ctx, prmPut)
return idObj, handleObjectError("save object via connection pool", err)
res, err := x.pool.PutObject(ctx, prmPut)
if err = handleObjectError("save object via connection pool", err); err != nil {
return oid.ID{}, err
}
return res.ObjectID, nil
}
// wraps io.ReadCloser and transforms Read errors related to access violation
@ -255,8 +259,31 @@ func (x payloadReader) Read(p []byte) (int, error) {
return n, handleObjectError("read payload", err)
}
// ReadObject implements frostfs.FrostFS interface method.
func (x *FrostFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*layer.ObjectPart, error) {
// HeadObject implements frostfs.FrostFS interface method.
func (x *FrostFS) HeadObject(ctx context.Context, prm layer.PrmObjectHead) (*object.Object, error) {
var addr oid.Address
addr.SetContainer(prm.Container)
addr.SetObject(prm.Object)
var prmHead pool.PrmObjectHead
prmHead.SetAddress(addr)
if prm.BearerToken != nil {
prmHead.UseBearer(*prm.BearerToken)
} else {
prmHead.UseKey(prm.PrivateKey)
}
res, err := x.pool.HeadObject(ctx, prmHead)
if err != nil {
return nil, handleObjectError("read object header via connection pool", err)
}
return &res, nil
}
// GetObject implements frostfs.FrostFS interface method.
func (x *FrostFS) GetObject(ctx context.Context, prm layer.PrmObjectGet) (*layer.Object, error) {
var addr oid.Address
addr.SetContainer(prm.Container)
addr.SetObject(prm.Object)
@ -270,55 +297,23 @@ func (x *FrostFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*lay
prmGet.UseKey(prm.PrivateKey)
}
if prm.WithHeader {
if prm.WithPayload {
res, err := x.pool.GetObject(ctx, prmGet)
if err != nil {
return nil, handleObjectError("init full object reading via connection pool", err)
}
defer res.Payload.Close()
payload, err := io.ReadAll(res.Payload)
if err != nil {
return nil, handleObjectError("read full object payload", err)
}
res.Header.SetPayload(payload)
return &layer.ObjectPart{
Head: &res.Header,
}, nil
}
var prmHead pool.PrmObjectHead
prmHead.SetAddress(addr)
if prm.BearerToken != nil {
prmHead.UseBearer(*prm.BearerToken)
} else {
prmHead.UseKey(prm.PrivateKey)
}
hdr, err := x.pool.HeadObject(ctx, prmHead)
if err != nil {
return nil, handleObjectError("read object header via connection pool", err)
}
return &layer.ObjectPart{
Head: &hdr,
}, nil
} else if prm.PayloadRange[0]+prm.PayloadRange[1] == 0 {
res, err := x.pool.GetObject(ctx, prmGet)
if err != nil {
return nil, handleObjectError("init full payload range reading via connection pool", err)
}
return &layer.ObjectPart{
Payload: res.Payload,
}, nil
res, err := x.pool.GetObject(ctx, prmGet)
if err != nil {
return nil, handleObjectError("init full object reading via connection pool", err)
}
return &layer.Object{
Header: res.Header,
Payload: res.Payload,
}, nil
}
// RangeObject implements frostfs.FrostFS interface method.
func (x *FrostFS) RangeObject(ctx context.Context, prm layer.PrmObjectRange) (io.ReadCloser, error) {
var addr oid.Address
addr.SetContainer(prm.Container)
addr.SetObject(prm.Object)
var prmRange pool.PrmObjectRange
prmRange.SetAddress(addr)
prmRange.SetOffset(prm.PayloadRange[0])
@ -335,9 +330,7 @@ func (x *FrostFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*lay
return nil, handleObjectError("init payload range reading via connection pool", err)
}
return &layer.ObjectPart{
Payload: payloadReader{&res},
}, nil
return payloadReader{&res}, nil
}
// DeleteObject implements frostfs.FrostFS interface method.

View file

@ -150,4 +150,5 @@ const (
FoundSeveralSystemNodes = "found several system nodes, latest be used"
FailedToParsePartInfo = "failed to parse part info"
CouldNotFetchCORSContainerInfo = "couldn't fetch CORS container info"
CloseCredsObjectPayload = "close creds object payload"
)