From cd64f41ce83ebf3ac2693167bdded567d93c681e Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 1 Mar 2022 22:02:24 +0300 Subject: [PATCH] [#346] *: Refactor communication with NeoFS at the protocol level Make `tokens`, `authmate` and `layer` packages to depend from locally defined `NeoFS` interface of the virtual connection to NeoFS network. Create internal `neofs` package and implement these interfaces through `pool.Pool` there. Implement mediators between `NeoFS` interfaces and `neofs.NeoFS` implementation. Signed-off-by: Leonard Lyubich --- api/auth/center.go | 12 +- api/layer/container.go | 82 +++-- api/layer/layer.go | 249 ++++++++++++-- api/layer/multipart_upload.go | 4 +- api/layer/object.go | 215 +++++-------- api/layer/system_object.go | 53 ++- api/layer/versioning_test.go | 343 ++++++++++---------- api/resolver/resolver.go | 52 +-- authmate/authmate.go | 139 ++++---- cmd/authmate/main.go | 25 +- cmd/s3-gw/app.go | 47 +-- cmd/s3-gw/neofs.go | 42 +++ creds/tokens/credentials.go | 102 +++--- internal/neofs/neofs.go | 589 ++++++++++++++++++++++++++++++++++ 14 files changed, 1348 insertions(+), 606 deletions(-) create mode 100644 cmd/s3-gw/neofs.go create mode 100644 internal/neofs/neofs.go diff --git a/api/auth/center.go b/api/auth/center.go index f469636..56c5992 100644 --- a/api/auth/center.go +++ b/api/auth/center.go @@ -22,8 +22,6 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/creds/accessbox" "github.com/nspcc-dev/neofs-s3-gw/creds/tokens" "github.com/nspcc-dev/neofs-sdk-go/object/address" - "github.com/nspcc-dev/neofs-sdk-go/pool" - "go.uber.org/zap" ) // authorizationFieldRegexp -- is regexp for credentials with Base58 encoded cid and oid and '0' (zero) as delimiter. @@ -44,12 +42,6 @@ type ( cli tokens.Credentials } - // Params stores node connection parameters. - Params struct { - Pool pool.Pool - Logger *zap.Logger - } - prs int authHeader struct { @@ -82,9 +74,9 @@ func (p prs) Seek(_ int64, _ int) (int64, error) { var _ io.ReadSeeker = prs(0) // New creates an instance of AuthCenter. -func New(conns pool.Pool, key *keys.PrivateKey, config *cache.Config) Center { +func New(neoFS tokens.NeoFS, key *keys.PrivateKey, config *cache.Config) Center { return ¢er{ - cli: tokens.New(conns, key, config), + cli: tokens.New(neoFS, key, config), reg: ®expSubmatcher{re: authorizationFieldRegexp}, postReg: ®expSubmatcher{re: postPolicyCredentialRegexp}, } diff --git a/api/layer/container.go b/api/layer/container.go index a6fc5fc..4d4581f 100644 --- a/api/layer/container.go +++ b/api/layer/container.go @@ -15,7 +15,6 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/container" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/eacl" - "github.com/nspcc-dev/neofs-sdk-go/pool" "github.com/nspcc-dev/neofs-sdk-go/session" "go.uber.org/zap" ) @@ -30,21 +29,21 @@ type ( const locationConstraintAttr = ".s3-location-constraint" -func (n *layer) containerInfo(ctx context.Context, cid *cid.ID) (*data.BucketInfo, error) { +func (n *layer) containerInfo(ctx context.Context, idCnr *cid.ID) (*data.BucketInfo, error) { var ( err error res *container.Container rid = api.GetRequestID(ctx) info = &data.BucketInfo{ - CID: cid, - Name: cid.String(), + CID: idCnr, + Name: idCnr.String(), } ) - res, err = n.pool.GetContainer(ctx, cid, n.CallOptions(ctx)...) + res, err = n.neoFS.Container(ctx, *idCnr) if err != nil { n.log.Error("could not fetch container", - zap.Stringer("cid", cid), + zap.Stringer("cid", idCnr), zap.String("request_id", rid), zap.Error(err)) @@ -65,7 +64,7 @@ func (n *layer) containerInfo(ctx context.Context, cid *cid.ID) (*data.BucketInf unix, err := strconv.ParseInt(attr.Value(), 10, 64) if err != nil { n.log.Error("could not parse container creation time", - zap.Stringer("cid", cid), + zap.Stringer("cid", idCnr), zap.String("request_id", rid), zap.String("created_at", val), zap.Error(err)) @@ -81,7 +80,7 @@ func (n *layer) containerInfo(ctx context.Context, cid *cid.ID) (*data.BucketInf if err := n.bucketCache.Put(info); err != nil { n.log.Warn("could not put bucket info into cache", - zap.Stringer("cid", cid), + zap.Stringer("cid", idCnr), zap.String("bucket_name", info.Name), zap.Error(err)) } @@ -93,20 +92,20 @@ func (n *layer) containerList(ctx context.Context) ([]*data.BucketInfo, error) { var ( err error own = n.Owner(ctx) - res []*cid.ID + res []cid.ID rid = api.GetRequestID(ctx) ) - res, err = n.pool.ListContainers(ctx, own, n.CallOptions(ctx)...) + res, err = n.neoFS.UserContainers(ctx, *own) if err != nil { - n.log.Error("could not fetch container", + n.log.Error("could not list user containers", zap.String("request_id", rid), zap.Error(err)) return nil, err } list := make([]*data.BucketInfo, 0, len(res)) - for _, cid := range res { - info, err := n.containerInfo(ctx, cid) + for i := range res { + info, err := n.containerInfo(ctx, &res[i]) if err != nil { n.log.Error("could not fetch container info", zap.String("request_id", rid), @@ -130,28 +129,23 @@ func (n *layer) createContainer(ctx context.Context, p *CreateBucketParams) (*ci LocationConstraint: p.LocationConstraint, } - options := []container.Option{ - container.WithPolicy(p.Policy), - container.WithCustomBasicACL(acl.BasicACL(p.ACL)), - container.WithAttribute(container.AttributeName, p.Name), - container.WithAttribute(container.AttributeTimestamp, strconv.FormatInt(bktInfo.Created.Unix(), 10)), - } + var locConstAttr *container.Attribute if p.LocationConstraint != "" { - options = append(options, container.WithAttribute(locationConstraintAttr, p.LocationConstraint)) + locConstAttr = container.NewAttribute() + locConstAttr.SetKey(locationConstraintAttr) + locConstAttr.SetValue(p.LocationConstraint) } - cnr := container.New(options...) - container.SetNativeName(cnr, p.Name) - - cnr.SetSessionToken(p.SessionToken) - cnr.SetOwnerID(bktInfo.Owner) - - if bktInfo.CID, err = n.pool.PutContainer(ctx, cnr); err != nil { - return nil, err - } - - if err = n.pool.WaitForContainerPresence(ctx, bktInfo.CID, pool.DefaultPollingParams()); err != nil { + if bktInfo.CID, err = n.neoFS.CreateContainer(ctx, PrmContainerCreate{ + Creator: *bktInfo.Owner, + Policy: *p.Policy, + Name: p.Name, + SessionToken: p.SessionToken, + Time: bktInfo.Created, + BasicACL: acl.BasicACL(p.ACL), + LocationConstraintAttribute: locConstAttr, + }); err != nil { return nil, err } @@ -172,21 +166,20 @@ func (n *layer) createContainer(ctx context.Context, p *CreateBucketParams) (*ci func (n *layer) setContainerEACLTable(ctx context.Context, cid *cid.ID, table *eacl.Table) error { table.SetCID(cid) - var sessionToken *session.Token boxData, err := GetBoxData(ctx) if err == nil { - sessionToken = boxData.Gate.SessionTokenForSetEACL() + table.SetSessionToken(boxData.Gate.SessionTokenForSetEACL()) } - if err := n.pool.SetEACL(ctx, table, pool.WithSession(sessionToken)); err != nil { + if err := n.neoFS.SetContainerEACL(ctx, *table); err != nil { return err } - return n.waitEACLPresence(ctx, cid, table, defaultWaitParams()) + return n.waitEACLPresence(ctx, *cid, table, defaultWaitParams()) } func (n *layer) GetContainerEACL(ctx context.Context, cid *cid.ID) (*eacl.Table, error) { - return n.pool.GetEACL(ctx, cid) + return n.neoFS.ContainerEACL(ctx, *cid) } type waitParams struct { @@ -201,7 +194,7 @@ func defaultWaitParams() *waitParams { } } -func (n *layer) waitEACLPresence(ctx context.Context, cid *cid.ID, table *eacl.Table, params *waitParams) error { +func (n *layer) waitEACLPresence(ctx context.Context, cid cid.ID, table *eacl.Table, params *waitParams) error { exp, err := table.Marshal() if err != nil { return fmt.Errorf("couldn't marshal eacl: %w", err) @@ -213,6 +206,8 @@ func (n *layer) waitEACLPresence(ctx context.Context, cid *cid.ID, table *eacl.T defer ticker.Stop() wdone := wctx.Done() done := ctx.Done() + var eaclTable *eacl.Table + var got []byte for { select { case <-done: @@ -220,10 +215,13 @@ func (n *layer) waitEACLPresence(ctx context.Context, cid *cid.ID, table *eacl.T case <-wdone: return wctx.Err() case <-ticker.C: - eaclTable, err := n.pool.GetEACL(ctx, cid) + eaclTable, err = n.neoFS.ContainerEACL(ctx, cid) if err == nil { - got, err := eaclTable.Marshal() - if err == nil && bytes.Equal(exp, got) { + got, err = eaclTable.Marshal() + if err != nil { + // not expected, but if occurred - doesn't make sense to continue + return fmt.Errorf("marshal received eACL: %w", err) + } else if bytes.Equal(exp, got) { return nil } } @@ -232,11 +230,11 @@ func (n *layer) waitEACLPresence(ctx context.Context, cid *cid.ID, table *eacl.T } } -func (n *layer) deleteContainer(ctx context.Context, cid *cid.ID) error { +func (n *layer) deleteContainer(ctx context.Context, idCnr *cid.ID) error { var sessionToken *session.Token boxData, err := GetBoxData(ctx) if err == nil { sessionToken = boxData.Gate.SessionTokenForDelete() } - return n.pool.DeleteContainer(ctx, cid, pool.WithSession(sessionToken)) + return n.neoFS.DeleteContainer(ctx, *idCnr, sessionToken) } diff --git a/api/layer/layer.go b/api/layer/layer.go index 9d697a5..df8b9f8 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/ecdsa" + stderrors "errors" "fmt" "io" "net/url" @@ -18,6 +19,8 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/notifications" "github.com/nspcc-dev/neofs-s3-gw/api/resolver" "github.com/nspcc-dev/neofs-s3-gw/creds/accessbox" + "github.com/nspcc-dev/neofs-sdk-go/acl" + "github.com/nspcc-dev/neofs-sdk-go/container" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/netmap" @@ -27,12 +30,216 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/owner" "github.com/nspcc-dev/neofs-sdk-go/pool" "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/token" "go.uber.org/zap" ) +// PrmContainerCreate groups parameters of NeoFS.CreateContainer operation. +type PrmContainerCreate struct { + // NeoFS identifier of the container creator. + Creator owner.ID + + // Container placement policy. + Policy netmap.PlacementPolicy + + // Name for the container. + Name string + + // Token of the container's creation session. Nil means session absence. + SessionToken *session.Token + + // Time when container is created. + Time time.Time + + // Basic ACL of the container. + BasicACL acl.BasicACL + + // Attribute for LocationConstraint parameter (optional). + LocationConstraintAttribute *container.Attribute +} + +// PrmAuth groups authentication parameters for the NeoFS operation. +type PrmAuth struct { + // Bearer token to be used for the operation. Overlaps PrivateKey. Optional. + BearerToken *token.BearerToken + + // Private key used for the operation if BearerToken is missing (in this case non-nil). + PrivateKey *ecdsa.PrivateKey +} + +// PrmObjectSelect groups parameters of NeoFS.SelectObjects operation. +type PrmObjectSelect struct { + // Authentication parameters. + PrmAuth + + // Container to select the objects from. + Container cid.ID + + // Key-value object attribute which should exactly be + // presented in selected objects. Optional, empty key means any. + ExactAttribute [2]string + + // File prefix of the selected objects. Optional, empty value means any. + FilePrefix string +} + +// PrmObjectRead groups parameters of NeoFS.ReadObject operation. +type PrmObjectRead struct { + // Authentication parameters. + PrmAuth + + // Container to read the object header from. + Container cid.ID + + // ID of the object for which to read the header. + Object oid.ID + + // Flag to read object header. + WithHeader bool + + // Flag to read object payload. False overlaps payload range. + WithPayload bool + + // Offset-length range of the object payload to be read. + PayloadRange [2]uint64 +} + +// ObjectPart represents partially read NeoFS object. +type ObjectPart struct { + // Object header with optional in-memory payload part. + Head *object.Object + + // Object payload part encapsulated in io.Reader primitive. + // Returns ErrAccessDenied on read access violation. + Payload io.ReadCloser +} + +// PrmObjectCreate groups parameters of NeoFS.CreateObject operation. +type PrmObjectCreate struct { + // Authentication parameters. + PrmAuth + + // Container to store the object. + Container cid.ID + + // NeoFS identifier of the object creator. + Creator owner.ID + + // Key-value object attributes. + Attributes [][2]string + + // Full payload size (optional). + PayloadSize uint64 + + // Associated filename (optional). + Filename string + + // Object payload encapsulated in io.Reader primitive. + Payload io.Reader +} + +// PrmObjectDelete groups parameters of NeoFS.DeleteObject operation. +type PrmObjectDelete struct { + // Authentication parameters. + PrmAuth + + // Container to delete the object from. + Container cid.ID + + // Identifier of the removed object. + Object oid.ID +} + +// ErrAccessDenied is returned from NeoFS in case of access violation. +var ErrAccessDenied = stderrors.New("access denied") + +// NeoFS represents virtual connection to NeoFS network. +type NeoFS interface { + // CreateContainer creates and saves parameterized container in NeoFS. + // Returns ID of the saved container. + // + // Returns exactly one non-nil value. Returns any error encountered which + // prevented the container to be created. + CreateContainer(context.Context, PrmContainerCreate) (*cid.ID, error) + + // Container reads container from NeoFS by ID. + // + // Returns exactly one non-nil value. Returns any error encountered which + // prevented the container to be read. + Container(context.Context, cid.ID) (*container.Container, error) + + // UserContainers reads list of the containers owned by specified user. + // + // Returns exactly one non-nil value. Returns any error encountered which + // prevented the containers to be listed. + UserContainers(context.Context, owner.ID) ([]cid.ID, error) + + // SetContainerEACL saves eACL table of the container in NeoFS. + // + // Returns any error encountered which prevented the eACL to be saved. + SetContainerEACL(context.Context, eacl.Table) error + + // ContainerEACL reads container eACL from NeoFS by container ID. + // + // Returns exactly one non-nil value. Returns any error encountered which + // prevented the eACL to be read. + ContainerEACL(context.Context, cid.ID) (*eacl.Table, error) + + // DeleteContainer marks the container to be removed from NeoFS by ID. + // Request is sent within session if the session token is specified. + // Successful return does not guarantee the actual removal. + // + // Returns any error encountered which prevented the removal request to be sent. + DeleteContainer(context.Context, cid.ID, *session.Token) error + + // SelectObjects perform object selection from the NeoFS container according + // to specified parameters. Selects user objects only. + // + // Returns ErrAccessDenied on selection access violation. + // + // Returns exactly one non-nil value. Returns any error encountered which + // prevented the objects to be selected. + SelectObjects(context.Context, PrmObjectSelect) ([]oid.ID, error) + + // ReadObject reads part of the object from the NeoFS container by identifier. + // Exact part is returned according to the parameters: + // * with header only: empty payload (both in-mem and reader parts are nil); + // * with payload only: header is nil (zero range means full payload); + // * with header and payload: full in-mem object, payload reader is nil. + // + // WithHeader or WithPayload is true. Range length is positive if offset is positive. + // + // Payload reader should be closed if it is no longer needed. + // + // Returns ErrAccessDenied on read access violation. + // + // Returns exactly one non-nil value. Returns any error encountered which + // prevented the object header to be read. + ReadObject(context.Context, PrmObjectRead) (*ObjectPart, error) + + // CreateObject creates and saves parameterized object in the NeoFS container. + // Returns ID of the saved object. + // + // Creation time should be written into object (UTC). + // + // Returns ErrAccessDenied on write access violation. + // + // Returns exactly one non-nil value. Returns any error encountered which + // prevented the container to be created. + CreateObject(context.Context, PrmObjectCreate) (*oid.ID, error) + + // DeleteObject marks the object to be removed from the NeoFS container by identifier. + // Successful return does not guarantee the actual removal. + // + // Returns ErrAccessDenied on remove access violation. + // + // Returns any error encountered which prevented the removal request to be sent. + DeleteObject(context.Context, PrmObjectDelete) error +} + type ( layer struct { - pool pool.Pool + neoFS NeoFS log *zap.Logger anonKey AnonymousKey resolver *resolver.BucketResolver @@ -66,14 +273,6 @@ type ( System *cache.Config } - // Params stores basic API parameters. - Params struct { - Pool pool.Pool - Logger *zap.Logger - Timeout time.Duration - Key *ecdsa.PrivateKey - } - // GetObjectParams stores object get request parameters. GetObjectParams struct { Range *RangeParams @@ -183,15 +382,8 @@ type ( TagSet map[string]string } - // NeoFS provides basic NeoFS interface. - NeoFS interface { - Get(ctx context.Context, address *address.Address) (*object.Object, error) - } - // Client provides S3 API client interface. Client interface { - NeoFS - EphemeralKey() *keys.PublicKey PutBucketVersioning(ctx context.Context, p *PutVersioningParams) (*data.ObjectInfo, error) @@ -262,9 +454,9 @@ func DefaultCachesConfigs() *CachesConfig { // NewLayer creates instance of layer. It checks credentials // and establishes gRPC connection with node. -func NewLayer(log *zap.Logger, conns pool.Pool, config *Config) Client { +func NewLayer(log *zap.Logger, neoFS NeoFS, config *Config) Client { return &layer{ - pool: conns, + neoFS: neoFS, log: log, anonKey: config.AnonKey, resolver: config.Resolver, @@ -293,17 +485,26 @@ func IsAuthenticatedRequest(ctx context.Context) bool { // Owner returns owner id from BearerToken (context) or from client owner. func (n *layer) Owner(ctx context.Context) *owner.ID { - if data, ok := ctx.Value(api.BoxData).(*accessbox.Box); ok && data != nil && data.Gate != nil { - return data.Gate.BearerToken.Issuer() + if bd, ok := ctx.Value(api.BoxData).(*accessbox.Box); ok && bd != nil && bd.Gate != nil { + return bd.Gate.BearerToken.Issuer() } return owner.NewIDFromPublicKey((*ecdsa.PublicKey)(n.EphemeralKey())) } +func (n *layer) prepareAuthParameters(ctx context.Context, prm *PrmAuth) { + if bd, ok := ctx.Value(api.BoxData).(*accessbox.Box); ok && bd != nil && bd.Gate != nil { + prm.BearerToken = bd.Gate.BearerToken + return + } + + prm.PrivateKey = &n.anonKey.Key.PrivateKey +} + // CallOptions returns []pool.CallOption options: client.WithBearer or client.WithKey (if request is anonymous). func (n *layer) CallOptions(ctx context.Context) []pool.CallOption { - if data, ok := ctx.Value(api.BoxData).(*accessbox.Box); ok && data != nil && data.Gate != nil { - return []pool.CallOption{pool.WithBearer(data.Gate.BearerToken)} + if bd, ok := ctx.Value(api.BoxData).(*accessbox.Box); ok && bd != nil && bd.Gate != nil { + return []pool.CallOption{pool.WithBearer(bd.Gate.BearerToken)} } return []pool.CallOption{pool.WithKey(&n.anonKey.Key.PrivateKey)} @@ -341,14 +542,14 @@ func (n *layer) GetBucketACL(ctx context.Context, name string) (*BucketACL, erro return nil, err } - eacl, err := n.GetContainerEACL(ctx, inf.CID) + eACL, err := n.GetContainerEACL(ctx, inf.CID) if err != nil { return nil, err } return &BucketACL{ Info: inf, - EACL: eacl, + EACL: eACL, }, nil } diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index f0f8f70..4bd55ac 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -308,8 +308,8 @@ func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload } f := &findParams{ - filters: []filter{{attr: UploadPartNumberAttributeName, val: "0"}}, - cid: p.Bkt.CID, + attr: [2]string{UploadPartNumberAttributeName, "0"}, + cid: p.Bkt.CID, } ids, err := n.objectSearch(ctx, f) diff --git a/api/layer/object.go b/api/layer/object.go index 1fe2e53..105b90f 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -3,10 +3,8 @@ package layer import ( "context" "errors" - "fmt" "io" "sort" - "strconv" "strings" "time" @@ -18,20 +16,14 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/object/address" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "github.com/nspcc-dev/neofs-sdk-go/owner" "go.uber.org/zap" ) type ( findParams struct { - filters []filter - cid *cid.ID - prefix string - } - - filter struct { - attr string - val string + attr [2]string + cid *cid.ID + prefix string } getParams struct { @@ -76,54 +68,25 @@ type ( func (n *layer) objectSearchByName(ctx context.Context, cid *cid.ID, filename string) ([]oid.ID, error) { f := &findParams{ - filters: []filter{{attr: object.AttributeFileName, val: filename}}, - cid: cid, - prefix: "", + attr: [2]string{object.AttributeFileName, filename}, + cid: cid, } return n.objectSearch(ctx, f) } // objectSearch returns all available objects by search params. func (n *layer) objectSearch(ctx context.Context, p *findParams) ([]oid.ID, error) { - var filters object.SearchFilters - filters.AddRootFilter() - - for _, filter := range p.filters { - filters.AddFilter(filter.attr, filter.val, object.MatchStringEqual) + prm := PrmObjectSelect{ + Container: *p.cid, + ExactAttribute: p.attr, + FilePrefix: p.prefix, } - if p.prefix != "" { - filters.AddFilter(object.AttributeFileName, p.prefix, object.MatchCommonPrefix) - } + n.prepareAuthParameters(ctx, &prm.PrmAuth) - res, err := n.pool.SearchObjects(ctx, *p.cid, filters, n.CallOptions(ctx)...) - if err != nil { - return nil, fmt.Errorf("init searching using client: %w", err) - } + res, err := n.neoFS.SelectObjects(ctx, prm) - defer res.Close() - - var num, read int - buf := make([]oid.ID, 10) - - for { - num, err = res.Read(buf[read:]) - if num > 0 { - read += num - buf = append(buf, oid.ID{}) - buf = buf[:cap(buf)] - } - - if err != nil { - if errors.Is(err, io.EOF) { - break - } - - return nil, n.transformNeofsError(ctx, err) - } - } - - return buf[:read], nil + return res, n.transformNeofsError(ctx, err) } func newAddress(cid *cid.ID, oid *oid.ID) *address.Address { @@ -135,83 +98,69 @@ func newAddress(cid *cid.ID, oid *oid.ID) *address.Address { // objectHead returns all object's headers. func (n *layer) objectHead(ctx context.Context, idCnr *cid.ID, idObj *oid.ID) (*object.Object, error) { - var addr address.Address + prm := PrmObjectRead{ + Container: *idCnr, + Object: *idObj, + WithHeader: true, + } - addr.SetContainerID(idCnr) - addr.SetObjectID(idObj) + n.prepareAuthParameters(ctx, &prm.PrmAuth) - obj, err := n.pool.HeadObject(ctx, addr, n.CallOptions(ctx)...) - return obj, n.transformNeofsError(ctx, err) + res, err := n.neoFS.ReadObject(ctx, prm) + if err != nil { + return nil, n.transformNeofsError(ctx, err) + } + + return res.Head, nil } // writes payload part of the NeoFS object to the provided io.Writer. // Zero range corresponds to full payload (panics if only offset is set). func (n *layer) objectWritePayload(ctx context.Context, p getParams) error { - // form object address - var a address.Address - - a.SetContainerID(p.cid) - a.SetObjectID(p.oid) - - fmt.Println("objectWritePayload", p.cid, p.oid) - - // init payload reader - var r io.ReadCloser - - if p.ln+p.off == 0 { - res, err := n.pool.GetObject(ctx, a, n.CallOptions(ctx)...) - if err != nil { - return n.transformNeofsError(ctx, fmt.Errorf("get object using client: %w", err)) - } - - p.ln = res.Header.PayloadSize() - r = res.Payload - } else { - res, err := n.pool.ObjectRange(ctx, a, p.off, p.ln, n.CallOptions(ctx)...) - if err != nil { - return n.transformNeofsError(ctx, fmt.Errorf("range object payload using client: %w", err)) - } - - r = res + prm := PrmObjectRead{ + Container: *p.cid, + Object: *p.oid, + WithPayload: true, + PayloadRange: [2]uint64{p.off, p.ln}, } - defer r.Close() + n.prepareAuthParameters(ctx, &prm.PrmAuth) - if p.ln > 0 { - if p.ln > 4096 { // configure? - p.ln = 4096 + res, err := n.neoFS.ReadObject(ctx, prm) + if err == nil { + defer res.Payload.Close() + + if p.ln == 0 { + p.ln = 4096 // configure? } // alloc buffer for copying buf := make([]byte, p.ln) // sync-pool it? // copy full payload - _, err := io.CopyBuffer(p.w, r, buf) - if err != nil { - return n.transformNeofsError(ctx, fmt.Errorf("copy payload range: %w", err)) - } + _, err = io.CopyBuffer(p.w, res.Payload, buf) } - return nil + return n.transformNeofsError(ctx, err) } // objectGet returns an object with payload in the object. func (n *layer) objectGet(ctx context.Context, addr *address.Address) (*object.Object, error) { - res, err := n.pool.GetObject(ctx, *addr, n.CallOptions(ctx)...) + prm := PrmObjectRead{ + Container: *addr.ContainerID(), + Object: *addr.ObjectID(), + WithHeader: true, + WithPayload: true, + } + + n.prepareAuthParameters(ctx, &prm.PrmAuth) + + res, err := n.neoFS.ReadObject(ctx, prm) if err != nil { return nil, n.transformNeofsError(ctx, err) } - defer res.Payload.Close() - - payload, err := io.ReadAll(res.Payload) - if err != nil { - return nil, fmt.Errorf("read payload: %w", err) - } - - object.NewRawFrom(&res.Header).SetPayload(payload) - - return &res.Header, nil + return res.Head, nil } // objectPut into NeoFS, took payload from io.Reader. @@ -235,9 +184,24 @@ func (n *layer) objectPut(ctx context.Context, bkt *data.BucketInfo, p *PutObjec r = d.MultiReader() } } - rawObject := formRawObject(p, bkt.CID, own, p.Object) - id, err := n.pool.PutObject(ctx, *rawObject.Object(), r, n.CallOptions(ctx)...) + prm := PrmObjectCreate{ + Container: *bkt.CID, + Creator: *own, + PayloadSize: uint64(p.Size), + Filename: p.Object, + Payload: r, + } + + prm.Attributes = make([][2]string, 0, len(p.Header)) + + for k, v := range p.Header { + prm.Attributes = append(prm.Attributes, [2]string{k, v}) + } + + n.prepareAuthParameters(ctx, &prm.PrmAuth) + + id, err := n.neoFS.CreateObject(ctx, prm) if err != nil { return nil, n.transformNeofsError(ctx, err) } @@ -292,35 +256,6 @@ func (n *layer) objectPut(ctx context.Context, bkt *data.BucketInfo, p *PutObjec }, nil } -func formRawObject(p *PutObjectParams, bktID *cid.ID, own *owner.ID, obj string) *object.RawObject { - attributes := make([]*object.Attribute, 0, len(p.Header)+2) - filename := object.NewAttribute() - filename.SetKey(object.AttributeFileName) - filename.SetValue(obj) - - createdAt := object.NewAttribute() - createdAt.SetKey(object.AttributeTimestamp) - createdAt.SetValue(strconv.FormatInt(time.Now().UTC().Unix(), 10)) - - attributes = append(attributes, filename, createdAt) - - for k, v := range p.Header { - ua := object.NewAttribute() - ua.SetKey(k) - ua.SetValue(v) - - attributes = append(attributes, ua) - } - - raw := object.NewRaw() - raw.SetOwnerID(own) - raw.SetContainerID(bktID) - raw.SetAttributes(attributes...) - raw.SetPayloadSize(uint64(p.Size)) - - return raw -} - func updateCRDT2PSetHeaders(header map[string]string, versions *objectVersions, versioningEnabled bool) []*oid.ID { if !versioningEnabled { header[versionsUnversionedAttr] = "true" @@ -483,11 +418,17 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb } // objectDelete puts tombstone object into neofs. -func (n *layer) objectDelete(ctx context.Context, cid *cid.ID, oid *oid.ID) error { - addr := newAddress(cid, oid) - n.objCache.Delete(addr) - err := n.pool.DeleteObject(ctx, *addr, n.CallOptions(ctx)...) - return n.transformNeofsError(ctx, err) +func (n *layer) objectDelete(ctx context.Context, idCnr *cid.ID, idObj *oid.ID) error { + prm := PrmObjectDelete{ + Container: *idCnr, + Object: *idObj, + } + + n.prepareAuthParameters(ctx, &prm.PrmAuth) + + n.objCache.Delete(newAddress(idCnr, idObj)) + + return n.transformNeofsError(ctx, n.neoFS.DeleteObject(ctx, prm)) } // ListObjectsV1 returns objects in a bucket for requests of Version 1. @@ -737,7 +678,7 @@ func (n *layer) transformNeofsError(ctx context.Context, err error) error { return nil } - if strings.Contains(err.Error(), "access to operation") && strings.Contains(err.Error(), "is denied by") { + if errors.Is(err, ErrAccessDenied) { n.log.Debug("error was transformed", zap.String("request_id", api.GetRequestID(ctx)), zap.Error(err)) return apiErrors.GetAPIError(apiErrors.ErrAccessDenied) } diff --git a/api/layer/system_object.go b/api/layer/system_object.go index 960ddd5..9bedc52 100644 --- a/api/layer/system_object.go +++ b/api/layer/system_object.go @@ -3,8 +3,6 @@ package layer import ( "context" "encoding/xml" - "strconv" - "time" "github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/errors" @@ -45,8 +43,8 @@ func (n *layer) headSystemObject(ctx context.Context, bkt *data.BucketInfo, objN func (n *layer) deleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error { f := &findParams{ - filters: []filter{{attr: objectSystemAttributeName, val: name}}, - cid: bktInfo.CID, + attr: [2]string{objectSystemAttributeName, name}, + cid: bktInfo.CID, } ids, err := n.objectSearch(ctx, f) if err != nil { @@ -68,52 +66,41 @@ func (n *layer) putSystemObjectIntoNeoFS(ctx context.Context, p *PutSystemObject if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) { return nil, err } - idsToDeleteArr := updateCRDT2PSetHeaders(p.Metadata, versions, false) // false means "last write wins" - attributes := make([]*object.Attribute, 0, 3) + prm := PrmObjectCreate{ + Container: *p.BktInfo.CID, + Creator: *p.BktInfo.Owner, + Attributes: make([][2]string, 2, 2+len(p.Metadata)), + Payload: p.Reader, + } - filename := object.NewAttribute() - filename.SetKey(objectSystemAttributeName) - filename.SetValue(p.ObjName) - - createdAt := object.NewAttribute() - createdAt.SetKey(object.AttributeTimestamp) - createdAt.SetValue(strconv.FormatInt(time.Now().UTC().Unix(), 10)) - - versioningIgnore := object.NewAttribute() - versioningIgnore.SetKey(attrVersionsIgnore) - versioningIgnore.SetValue(strconv.FormatBool(true)) - - attributes = append(attributes, filename, createdAt, versioningIgnore) + prm.Attributes[0][0], prm.Attributes[0][1] = objectSystemAttributeName, p.ObjName + prm.Attributes[1][0], prm.Attributes[1][1] = attrVersionsIgnore, "true" for k, v := range p.Metadata { - attr := object.NewAttribute() if !IsSystemHeader(k) { k = p.Prefix + k } - attr.SetKey(k) - if p.Prefix == tagPrefix && v == "" { + + if v == "" && p.Prefix == tagPrefix { v = tagEmptyMark } - attr.SetValue(v) - attributes = append(attributes, attr) + + prm.Attributes = append(prm.Attributes, [2]string{k, v}) } - raw := object.NewRaw() - raw.SetOwnerID(p.BktInfo.Owner) - raw.SetContainerID(p.BktInfo.CID) - raw.SetAttributes(attributes...) - - oid, err := n.pool.PutObject(ctx, *raw.Object(), p.Reader, n.CallOptions(ctx)...) + id, err := n.neoFS.CreateObject(ctx, prm) if err != nil { return nil, n.transformNeofsError(ctx, err) } - meta, err := n.objectHead(ctx, p.BktInfo.CID, oid) + meta, err := n.objectHead(ctx, p.BktInfo.CID, id) if err != nil { return nil, err } + idsToDeleteArr := updateCRDT2PSetHeaders(p.Metadata, versions, false) // false means "last write wins" + for _, id := range idsToDeleteArr { if err = n.objectDelete(ctx, p.BktInfo.CID, id); err != nil { n.log.Warn("couldn't delete system object", @@ -178,8 +165,8 @@ func (n *layer) getCORS(ctx context.Context, bkt *data.BucketInfo, sysName strin func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sysName string) (*objectVersions, error) { f := &findParams{ - filters: []filter{{attr: objectSystemAttributeName, val: sysName}}, - cid: bkt.CID, + attr: [2]string{objectSystemAttributeName, sysName}, + cid: bkt.CID, } ids, err := n.objectSearch(ctx, f) if err != nil { diff --git a/api/layer/versioning_test.go b/api/layer/versioning_test.go index fcc6074..ac486c6 100644 --- a/api/layer/versioning_test.go +++ b/api/layer/versioning_test.go @@ -15,47 +15,168 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/creds/accessbox" - "github.com/nspcc-dev/neofs-sdk-go/accounting" "github.com/nspcc-dev/neofs-sdk-go/container" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" - "github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/logger" "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/object/address" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/object/id/test" "github.com/nspcc-dev/neofs-sdk-go/owner" - "github.com/nspcc-dev/neofs-sdk-go/pool" - "github.com/nspcc-dev/neofs-sdk-go/session" - "github.com/nspcc-dev/neofs-sdk-go/token" + tokentest "github.com/nspcc-dev/neofs-sdk-go/token/test" "github.com/stretchr/testify/require" ) -type testPool struct { - pool.Pool +type testNeoFS struct { + NeoFS objects map[string]*object.Object containers map[string]*container.Container currentEpoch uint64 } -func newTestPool() *testPool { - return &testPool{ - objects: make(map[string]*object.Object), - containers: make(map[string]*container.Container), +func (t *testNeoFS) CreateContainer(_ context.Context, prm PrmContainerCreate) (*cid.ID, error) { + var opts []container.Option + + opts = append(opts, + container.WithOwnerID(&prm.Creator), + container.WithPolicy(&prm.Policy), + container.WithCustomBasicACL(prm.BasicACL), + container.WithAttribute(container.AttributeTimestamp, strconv.FormatInt(prm.Time.Unix(), 10)), + ) + + if prm.Name != "" { + opts = append(opts, container.WithAttribute(container.AttributeName, prm.Name)) } + + cnr := container.New(opts...) + cnr.SetSessionToken(prm.SessionToken) + + if prm.Name != "" { + container.SetNativeName(cnr, prm.Name) + } + + b := make([]byte, 32) + if _, err := io.ReadFull(rand.Reader, b); err != nil { + return nil, err + } + + id := cid.New() + id.SetSHA256(sha256.Sum256(b)) + t.containers[id.String()] = cnr + + return id, nil } -func (t *testPool) PutObject(_ context.Context, hdr object.Object, payload io.Reader, _ ...pool.CallOption) (*oid.ID, error) { +func (t *testNeoFS) Container(_ context.Context, id cid.ID) (*container.Container, error) { + for k, v := range t.containers { + if k == id.String() { + return v, nil + } + } + + return nil, fmt.Errorf("container not found " + id.String()) +} + +func (t *testNeoFS) UserContainers(_ context.Context, _ owner.ID) ([]cid.ID, error) { + var res []cid.ID + for k := range t.containers { + var idCnr cid.ID + if err := idCnr.Parse(k); err != nil { + return nil, err + } + res = append(res, idCnr) + } + + return res, nil +} + +func (t *testNeoFS) SelectObjects(_ context.Context, prm PrmObjectSelect) ([]oid.ID, error) { + var filters object.SearchFilters + filters.AddRootFilter() + + if prm.FilePrefix != "" { + filters.AddFilter(object.AttributeFileName, prm.FilePrefix, object.MatchCommonPrefix) + } + + if prm.ExactAttribute[0] != "" { + filters.AddFilter(prm.ExactAttribute[0], prm.ExactAttribute[1], object.MatchStringEqual) + } + + cidStr := prm.Container.String() + + var res []oid.ID + + if len(filters) == 1 { + for k, v := range t.objects { + if strings.Contains(k, cidStr) { + res = append(res, *v.ID()) + } + } + return res, nil + } + + filter := filters[1] + if len(filters) != 2 || filter.Operation() != object.MatchStringEqual || + (filter.Header() != object.AttributeFileName && filter.Header() != objectSystemAttributeName) { + return nil, fmt.Errorf("usupported filters") + } + + for k, v := range t.objects { + if strings.Contains(k, cidStr) && isMatched(v.Attributes(), filter) { + res = append(res, *v.ID()) + } + } + + return res, nil +} + +func (t *testNeoFS) ReadObject(_ context.Context, prm PrmObjectRead) (*ObjectPart, error) { + var addr address.Address + addr.SetContainerID(&prm.Container) + addr.SetObjectID(&prm.Object) + + sAddr := addr.String() + + if obj, ok := t.objects[sAddr]; ok { + return &ObjectPart{ + Head: obj, + Payload: io.NopCloser(bytes.NewReader(obj.Payload())), + }, nil + } + + return nil, fmt.Errorf("object not found " + addr.String()) +} + +func (t *testNeoFS) CreateObject(_ context.Context, prm PrmObjectCreate) (*oid.ID, error) { id := test.ID() - raw := object.NewRawFrom(&hdr) + attrs := make([]*object.Attribute, 0) + + if prm.Filename != "" { + a := object.NewAttribute() + a.SetKey(object.AttributeFileName) + a.SetValue(prm.Filename) + attrs = append(attrs, a) + } + + for i := range prm.Attributes { + a := object.NewAttribute() + a.SetKey(prm.Attributes[i][0]) + a.SetValue(prm.Attributes[i][1]) + attrs = append(attrs, a) + } + + raw := object.NewRaw() + raw.SetContainerID(&prm.Container) raw.SetID(id) + raw.SetPayloadSize(prm.PayloadSize) + raw.SetAttributes(attrs...) raw.SetCreationEpoch(t.currentEpoch) t.currentEpoch++ - if payload != nil { - all, err := io.ReadAll(payload) + if prm.Payload != nil { + all, err := io.ReadAll(prm.Payload) if err != nil { return nil, err } @@ -67,60 +188,21 @@ func (t *testPool) PutObject(_ context.Context, hdr object.Object, payload io.Re return raw.ID(), nil } -func (t *testPool) DeleteObject(ctx context.Context, addr address.Address, option ...pool.CallOption) error { +func (t *testNeoFS) DeleteObject(_ context.Context, prm PrmObjectDelete) error { + var addr address.Address + addr.SetContainerID(&prm.Container) + addr.SetObjectID(&prm.Object) + delete(t.objects, addr.String()) + return nil } -func (t *testPool) GetObject(_ context.Context, addr address.Address, _ ...pool.CallOption) (*pool.ResGetObject, error) { - sAddr := addr.String() - - if obj, ok := t.objects[sAddr]; ok { - return &pool.ResGetObject{ - Header: *obj, - Payload: io.NopCloser(bytes.NewReader(obj.Payload())), - }, nil +func newTestPool() *testNeoFS { + return &testNeoFS{ + objects: make(map[string]*object.Object), + containers: make(map[string]*container.Container), } - - return nil, fmt.Errorf("object not found " + addr.String()) -} - -func (t *testPool) HeadObject(ctx context.Context, addr address.Address, _ ...pool.CallOption) (*object.Object, error) { - res, err := t.GetObject(ctx, addr) - if err != nil { - return nil, err - } - - return &res.Header, nil -} - -func (t *testPool) SearchObjects(_ context.Context, idCnr cid.ID, filters object.SearchFilters, _ ...pool.CallOption) (*pool.ResObjectSearch, error) { - cidStr := idCnr.String() - - var res []*oid.ID - - if len(filters) == 1 { - for k, v := range t.objects { - if strings.Contains(k, cidStr) { - res = append(res, v.ID()) - } - } - return nil, nil - } - - filter := filters[1] - if len(filters) != 2 || filter.Operation() != object.MatchStringEqual || - (filter.Header() != object.AttributeFileName && filter.Header() != objectSystemAttributeName) { - return nil, fmt.Errorf("usupported filters") - } - - for k, v := range t.objects { - if strings.Contains(k, cidStr) && isMatched(v.Attributes(), filter) { - res = append(res, v.ID()) - } - } - - return nil, nil } func isMatched(attributes []*object.Attribute, filter object.SearchFilter) bool { @@ -133,79 +215,6 @@ func isMatched(attributes []*object.Attribute, filter object.SearchFilter) bool return false } -func (t *testPool) PutContainer(ctx context.Context, container *container.Container, option ...pool.CallOption) (*cid.ID, error) { - b := make([]byte, 32) - if _, err := io.ReadFull(rand.Reader, b); err != nil { - return nil, err - } - - id := cid.New() - id.SetSHA256(sha256.Sum256(b)) - t.containers[id.String()] = container - - return id, nil -} - -func (t *testPool) GetContainer(ctx context.Context, id *cid.ID, option ...pool.CallOption) (*container.Container, error) { - for k, v := range t.containers { - if k == id.String() { - return v, nil - } - } - - return nil, fmt.Errorf("container not found " + id.String()) -} - -func (t *testPool) ListContainers(ctx context.Context, id *owner.ID, option ...pool.CallOption) ([]*cid.ID, error) { - var res []*cid.ID - for k := range t.containers { - cID := cid.New() - if err := cID.Parse(k); err != nil { - return nil, err - } - res = append(res, cID) - } - - return res, nil -} - -func (t *testPool) DeleteContainer(ctx context.Context, id *cid.ID, option ...pool.CallOption) error { - delete(t.containers, id.String()) - return nil -} - -func (t *testPool) GetEACL(ctx context.Context, id *cid.ID, option ...pool.CallOption) (*eacl.Table, error) { - panic("implement me") -} - -func (t *testPool) Balance(ctx context.Context, owner *owner.ID, opts ...pool.CallOption) (*accounting.Decimal, error) { - panic("implement me") -} - -func (t *testPool) SetEACL(ctx context.Context, table *eacl.Table, option ...pool.CallOption) error { - panic("implement me") -} - -func (t *testPool) AnnounceContainerUsedSpace(ctx context.Context, announcements []container.UsedSpaceAnnouncement, option ...pool.CallOption) error { - panic("implement me") -} - -func (t *testPool) Connection() (pool.Client, *session.Token, error) { - panic("implement me") -} - -func (t *testPool) Close() { - panic("implement me") -} - -func (t *testPool) OwnerID() *owner.ID { - return nil -} - -func (t *testPool) WaitForContainerPresence(ctx context.Context, id *cid.ID, params *pool.ContainerPollingParams) error { - return nil -} - func (tc *testContext) putObject(content []byte) *data.ObjectInfo { objInfo, err := tc.layer.PutObject(tc.ctx, &PutObjectParams{ Bucket: tc.bktID.String(), @@ -298,7 +307,7 @@ func (tc *testContext) checkListObjects(ids ...*oid.ID) { } func (tc *testContext) getSystemObject(objectName string) *object.Object { - for _, obj := range tc.testPool.objects { + for _, obj := range tc.testNeoFS.objects { for _, attr := range obj.Attributes() { if attr.Key() == objectSystemAttributeName && attr.Value() == objectName { return obj @@ -309,22 +318,25 @@ func (tc *testContext) getSystemObject(objectName string) *object.Object { } type testContext struct { - t *testing.T - ctx context.Context - layer Client - bkt string - bktID *cid.ID - obj string - testPool *testPool + t *testing.T + ctx context.Context + layer Client + bkt string + bktID *cid.ID + obj string + testNeoFS *testNeoFS } func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext { key, err := keys.NewPrivateKey() require.NoError(t, err) + bearerToken := tokentest.BearerToken() + require.NoError(t, bearerToken.SignToken(&key.PrivateKey)) + ctx := context.WithValue(context.Background(), api.BoxData, &accessbox.Box{ Gate: &accessbox.GateData{ - BearerToken: token.NewBearerToken(), + BearerToken: bearerToken, GateKey: key.PublicKey(), }, }) @@ -333,8 +345,9 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext { tp := newTestPool() bktName := "testbucket1" - cnr := container.New(container.WithAttribute(container.AttributeName, bktName)) - bktID, err := tp.PutContainer(ctx, cnr) + bktID, err := tp.CreateContainer(ctx, PrmContainerCreate{ + Name: bktName, + }) require.NoError(t, err) config := DefaultCachesConfigs() @@ -348,19 +361,17 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext { } return &testContext{ - ctx: ctx, - layer: NewLayer(l, tp, layerCfg), - bkt: bktName, - bktID: bktID, - obj: "obj1", - t: t, - testPool: tp, + ctx: ctx, + layer: NewLayer(l, tp, layerCfg), + bkt: bktName, + bktID: bktID, + obj: "obj1", + t: t, + testNeoFS: tp, } } func TestSimpleVersioning(t *testing.T) { - // https://github.com/nspcc-dev/neofs-s3-gw/issues/349 - t.Skip("pool.Pool does not support overriding") tc := prepareContext(t) _, err := tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{ Bucket: tc.bktID.String(), @@ -385,8 +396,6 @@ func TestSimpleVersioning(t *testing.T) { } func TestSimpleNoVersioning(t *testing.T) { - // https://github.com/nspcc-dev/neofs-s3-gw/issues/349 - t.Skip("pool.Pool does not support overriding") tc := prepareContext(t) obj1Content1 := []byte("content obj1 v1") @@ -404,8 +413,6 @@ func TestSimpleNoVersioning(t *testing.T) { } func TestVersioningDeleteObject(t *testing.T) { - // https://github.com/nspcc-dev/neofs-s3-gw/issues/349 - t.Skip("pool.Pool does not support overriding") tc := prepareContext(t) _, err := tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{ Bucket: tc.bktID.String(), @@ -423,8 +430,6 @@ func TestVersioningDeleteObject(t *testing.T) { } func TestVersioningDeleteSpecificObjectVersion(t *testing.T) { - // https://github.com/nspcc-dev/neofs-s3-gw/issues/349 - t.Skip("pool.Pool does not support overriding") tc := prepareContext(t) _, err := tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{ Bucket: tc.bktID.String(), @@ -532,8 +537,6 @@ func TestGetLastVersion(t *testing.T) { } func TestNoVersioningDeleteObject(t *testing.T) { - // https://github.com/nspcc-dev/neofs-s3-gw/issues/349 - t.Skip("pool.Pool does not support overriding") tc := prepareContext(t) tc.putObject([]byte("content obj1 v1")) @@ -789,8 +792,6 @@ func TestUpdateCRDT2PSetHeaders(t *testing.T) { } func TestSystemObjectsVersioning(t *testing.T) { - // https://github.com/nspcc-dev/neofs-s3-gw/issues/349 - t.Skip("pool.Pool does not support overriding") cacheConfig := DefaultCachesConfigs() cacheConfig.System.Lifetime = 0 @@ -801,7 +802,7 @@ func TestSystemObjectsVersioning(t *testing.T) { }) require.NoError(t, err) - objMeta, ok := tc.testPool.objects[objInfo.Address().String()] + objMeta, ok := tc.testNeoFS.objects[objInfo.Address().String()] require.True(t, ok) _, err = tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{ @@ -811,7 +812,7 @@ func TestSystemObjectsVersioning(t *testing.T) { require.NoError(t, err) // simulate failed deletion - tc.testPool.objects[objInfo.Address().String()] = objMeta + tc.testNeoFS.objects[objInfo.Address().String()] = objMeta versioning, err := tc.layer.GetBucketVersioning(tc.ctx, tc.bkt) require.NoError(t, err) @@ -819,8 +820,6 @@ func TestSystemObjectsVersioning(t *testing.T) { } func TestDeleteSystemObjectsVersioning(t *testing.T) { - // https://github.com/nspcc-dev/neofs-s3-gw/issues/349 - t.Skip("pool.Pool does not support overriding") cacheConfig := DefaultCachesConfigs() cacheConfig.System.Lifetime = 0 @@ -840,7 +839,7 @@ func TestDeleteSystemObjectsVersioning(t *testing.T) { require.NoError(t, err) // simulate failed deletion - tc.testPool.objects[newAddress(objMeta.ContainerID(), objMeta.ID()).String()] = objMeta + tc.testNeoFS.objects[newAddress(objMeta.ContainerID(), objMeta.ID()).String()] = objMeta tagging, err := tc.layer.GetBucketTagging(tc.ctx, tc.bkt) require.NoError(t, err) diff --git a/api/resolver/resolver.go b/api/resolver/resolver.go index b515e4a..02fb8e3 100644 --- a/api/resolver/resolver.go +++ b/api/resolver/resolver.go @@ -5,24 +5,27 @@ import ( "fmt" "github.com/nspcc-dev/neo-go/pkg/rpc/client" - neofsclient "github.com/nspcc-dev/neofs-sdk-go/client" - apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" - "github.com/nspcc-dev/neofs-sdk-go/netmap" - "github.com/nspcc-dev/neofs-sdk-go/pool" "github.com/nspcc-dev/neofs-sdk-go/resolver" ) const ( NNSResolver = "nns" DNSResolver = "dns" - - networkSystemDNSParam = "SystemDNS" ) +// NeoFS represents virtual connection to the NeoFS network. +type NeoFS interface { + // SystemDNS reads system DNS network parameters of the NeoFS. + // + // Returns exactly on non-zero value. Returns any error encountered + // which prevented the parameter to be read. + SystemDNS(context.Context) (string, error) +} + type Config struct { - Pool pool.Pool - RPC *client.Client + NeoFS NeoFS + RPC *client.Client } type BucketResolver struct { @@ -69,7 +72,7 @@ func NewResolver(order []string, cfg *Config) (*BucketResolver, error) { func newResolver(name string, cfg *Config, next *BucketResolver) (*BucketResolver, error) { switch name { case DNSResolver: - return NewDNSResolver(cfg.Pool, next) + return NewDNSResolver(cfg.NeoFS, next) case NNSResolver: return NewNNSResolver(cfg.RPC, next) default: @@ -77,38 +80,15 @@ func newResolver(name string, cfg *Config, next *BucketResolver) (*BucketResolve } } -func NewDNSResolver(p pool.Pool, next *BucketResolver) (*BucketResolver, error) { - if p == nil { +func NewDNSResolver(neoFS NeoFS, next *BucketResolver) (*BucketResolver, error) { + if neoFS == nil { return nil, fmt.Errorf("pool must not be nil for DNS resolver") } resolveFunc := func(ctx context.Context, name string) (*cid.ID, error) { - conn, _, err := p.Connection() + domain, err := neoFS.SystemDNS(ctx) if err != nil { - return nil, err - } - - networkInfoRes, err := conn.NetworkInfo(ctx, neofsclient.PrmNetworkInfo{}) - if err == nil { - err = apistatus.ErrFromStatus(networkInfoRes.Status()) - } - if err != nil { - return nil, err - } - - networkInfo := networkInfoRes.Info() - - var domain string - networkInfo.NetworkConfig().IterateParameters(func(parameter *netmap.NetworkParameter) bool { - if string(parameter.Key()) == networkSystemDNSParam { - domain = string(parameter.Value()) - return true - } - return false - }) - - if domain == "" { - return nil, fmt.Errorf("couldn't resolve container '%s': not found", name) + return nil, fmt.Errorf("read system DNS parameter of the NeoFS: %w", err) } domain = name + "." + domain diff --git a/authmate/authmate.go b/authmate/authmate.go index 7cb626e..105015a 100644 --- a/authmate/authmate.go +++ b/authmate/authmate.go @@ -3,14 +3,12 @@ package authmate import ( "context" "crypto/ecdsa" - "encoding/binary" "encoding/hex" "encoding/json" "fmt" "io" "math" "os" - "strconv" "time" "github.com/google/uuid" @@ -18,35 +16,75 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/cache" "github.com/nspcc-dev/neofs-s3-gw/creds/accessbox" "github.com/nspcc-dev/neofs-s3-gw/creds/tokens" - "github.com/nspcc-dev/neofs-sdk-go/acl" - "github.com/nspcc-dev/neofs-sdk-go/client" - apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" - "github.com/nspcc-dev/neofs-sdk-go/container" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object/address" "github.com/nspcc-dev/neofs-sdk-go/owner" "github.com/nspcc-dev/neofs-sdk-go/policy" - "github.com/nspcc-dev/neofs-sdk-go/pool" "github.com/nspcc-dev/neofs-sdk-go/session" "github.com/nspcc-dev/neofs-sdk-go/token" "go.uber.org/zap" ) -const ( - defaultAuthContainerBasicACL acl.BasicACL = 0b00111100100011001000110011001110 // 0x3C8C8CCE - private container with only GET allowed to others -) +// PrmContainerCreate groups parameters of containers created by authmate. +type PrmContainerCreate struct { + // NeoFS identifier of the container creator. + Owner owner.ID + + // Container placement policy. + Policy netmap.PlacementPolicy + + // Friendly name for the container (optional). + FriendlyName string +} + +// NetworkState represents NeoFS network state which is needed for authmate processing. +type NetworkState struct { + // Current NeoFS time. + Epoch uint64 + // Duration of the Morph chain block in ms. + BlockDuration int64 + // Duration of the NeoFS epoch in Morph chain blocks. + EpochDuration uint64 +} + +// NeoFS represents virtual connection to NeoFS network. +type NeoFS interface { + // NeoFS interface required by credential tool. + tokens.NeoFS + + // ContainerExists checks container presence in NeoFS by identifier. + // Returns nil iff container exists. + ContainerExists(context.Context, cid.ID) error + + // CreateContainer creates and saves parameterized container in NeoFS. + // Returns ID of the saved container. + // + // The container must be private with GET access of OTHERS group. + // Creation time should also be stamped. + // + // Returns exactly one non-nil value. Returns any error encountered which + // prevented the container to be created. + CreateContainer(context.Context, PrmContainerCreate) (*cid.ID, error) + + // NetworkState returns current state of the NeoFS network. + // Returns any error encountered which prevented state to be read. + // + // Returns exactly one non-nil value. Returns any error encountered which + // prevented the state to be read. + NetworkState(context.Context) (*NetworkState, error) +} // Agent contains client communicating with NeoFS and logger. type Agent struct { - pool pool.Pool - log *zap.Logger + neoFS NeoFS + log *zap.Logger } // New creates an object of type Agent that consists of Client and logger. -func New(log *zap.Logger, conns pool.Pool) *Agent { - return &Agent{log: log, pool: conns} +func New(log *zap.Logger, neoFS NeoFS) *Agent { + return &Agent{log: log, neoFS: neoFS} } type ( @@ -85,12 +123,6 @@ type lifetimeOptions struct { Exp uint64 } -type epochDurations struct { - currentEpoch uint64 - msPerBlock int64 - blocksInEpoch uint64 -} - type ( issuingResult struct { AccessKeyID string `json:"access_key_id"` @@ -108,8 +140,7 @@ type ( func (a *Agent) checkContainer(ctx context.Context, opts ContainerOptions, idOwner *owner.ID) (*cid.ID, error) { if opts.ID != nil { // check that container exists - _, err := a.pool.GetContainer(ctx, opts.ID) - return opts.ID, err + return opts.ID, a.neoFS.ContainerExists(ctx, *opts.ID) } pp, err := policy.Parse(opts.PlacementPolicy) @@ -117,62 +148,18 @@ func (a *Agent) checkContainer(ctx context.Context, opts ContainerOptions, idOwn return nil, fmt.Errorf("failed to build placement policy: %w", err) } - cnrOptions := []container.Option{ - container.WithPolicy(pp), - container.WithCustomBasicACL(defaultAuthContainerBasicACL), - container.WithAttribute(container.AttributeTimestamp, strconv.FormatInt(time.Now().Unix(), 10)), - container.WithOwnerID(idOwner), - } - if opts.FriendlyName != "" { - cnrOptions = append(cnrOptions, container.WithAttribute(container.AttributeName, opts.FriendlyName)) - } - - cnr := container.New(cnrOptions...) - if opts.FriendlyName != "" { - container.SetNativeName(cnr, opts.FriendlyName) - } - - cnrID, err := a.pool.PutContainer(ctx, cnr) + cnrID, err := a.neoFS.CreateContainer(ctx, PrmContainerCreate{ + Owner: *idOwner, + Policy: *pp, + FriendlyName: opts.FriendlyName, + }) if err != nil { return nil, err } - if err := a.pool.WaitForContainerPresence(ctx, cnrID, pool.DefaultPollingParams()); err != nil { - return nil, err - } return cnrID, nil } -func (a *Agent) getEpochDurations(ctx context.Context) (*epochDurations, error) { - if conn, _, err := a.pool.Connection(); err != nil { - return nil, err - } else if networkInfoRes, err := conn.NetworkInfo(ctx, client.PrmNetworkInfo{}); err != nil { - return nil, err - } else if err = apistatus.ErrFromStatus(networkInfoRes.Status()); err != nil { - return nil, err - } else { - networkInfo := networkInfoRes.Info() - res := &epochDurations{ - currentEpoch: networkInfo.CurrentEpoch(), - msPerBlock: networkInfo.MsPerBlock(), - } - - networkInfo.NetworkConfig().IterateParameters(func(parameter *netmap.NetworkParameter) bool { - if string(parameter.Key()) == "EpochDuration" { - data := make([]byte, 8) - copy(data, parameter.Value()) - res.blocksInEpoch = binary.LittleEndian.Uint64(data) - return true - } - return false - }) - if res.blocksInEpoch == 0 { - return nil, fmt.Errorf("not found param: EpochDuration") - } - return res, nil - } -} - func checkPolicy(policyString string) (*netmap.PlacementPolicy, error) { result, err := policy.Parse(policyString) if err == nil { @@ -226,12 +213,12 @@ func (a *Agent) IssueSecret(ctx context.Context, w io.Writer, options *IssueSecr return err } - durations, err := a.getEpochDurations(ctx) + netState, err := a.neoFS.NetworkState(ctx) if err != nil { return err } - lifetime.Iat = durations.currentEpoch - msPerEpoch := durations.blocksInEpoch * uint64(durations.msPerBlock) + lifetime.Iat = netState.Epoch + msPerEpoch := netState.EpochDuration * uint64(netState.BlockDuration) epochLifetime := uint64(options.Lifetime.Milliseconds()) / msPerEpoch if uint64(options.Lifetime.Milliseconds())%msPerEpoch != 0 { epochLifetime++ @@ -268,7 +255,7 @@ func (a *Agent) IssueSecret(ctx context.Context, w io.Writer, options *IssueSecr zap.Stringer("owner_tkn", idOwner)) addr, err := tokens. - New(a.pool, secrets.EphemeralKey, cache.DefaultAccessBoxConfig()). + New(a.neoFS, secrets.EphemeralKey, cache.DefaultAccessBoxConfig()). Put(ctx, id, idOwner, box, lifetime.Exp, options.GatesPublicKeys...) if err != nil { return fmt.Errorf("failed to put bearer token: %w", err) @@ -310,7 +297,7 @@ func (a *Agent) IssueSecret(ctx context.Context, w io.Writer, options *IssueSecr // ObtainSecret receives an existing secret access key from NeoFS and // writes to io.Writer the secret access key. func (a *Agent) ObtainSecret(ctx context.Context, w io.Writer, options *ObtainSecretOptions) error { - bearerCreds := tokens.New(a.pool, options.GatePrivateKey, cache.DefaultAccessBoxConfig()) + bearerCreds := tokens.New(a.neoFS, options.GatePrivateKey, cache.DefaultAccessBoxConfig()) addr := address.NewAddress() if err := addr.Parse(options.SecretAddress); err != nil { return fmt.Errorf("failed to parse secret address: %w", err) diff --git a/cmd/authmate/main.go b/cmd/authmate/main.go index cf7b826..6f28cce 100644 --- a/cmd/authmate/main.go +++ b/cmd/authmate/main.go @@ -13,6 +13,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neofs-s3-gw/authmate" + "github.com/nspcc-dev/neofs-s3-gw/internal/neofs" "github.com/nspcc-dev/neofs-s3-gw/internal/version" "github.com/nspcc-dev/neofs-s3-gw/internal/wallet" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -238,12 +239,12 @@ It will be ceil rounded to the nearest amount of epoch.`, ctx, cancel := context.WithCancel(ctx) defer cancel() - client, err := createSDKClient(ctx, log, &key.PrivateKey, peerAddressFlag) + neoFS, err := createNeoFS(ctx, log, &key.PrivateKey, peerAddressFlag) if err != nil { - return cli.Exit(fmt.Sprintf("failed to create sdk client: %s", err), 2) + return cli.Exit(fmt.Sprintf("failed to create NeoFS component: %s", err), 2) } - agent := authmate.New(log, client) + agent := authmate.New(log, neoFS) var containerID *cid.ID if len(containerIDFlag) > 0 { containerID = cid.New() @@ -388,12 +389,12 @@ func obtainSecret() *cli.Command { ctx, cancel := context.WithCancel(ctx) defer cancel() - client, err := createSDKClient(ctx, log, &key.PrivateKey, peerAddressFlag) + neoFS, err := createNeoFS(ctx, log, &key.PrivateKey, peerAddressFlag) if err != nil { - return cli.Exit(fmt.Sprintf("failed to create sdk client: %s", err), 2) + return cli.Exit(fmt.Sprintf("failed to create NeoFS component: %s", err), 2) } - agent := authmate.New(log, client) + agent := authmate.New(log, neoFS) var _ = agent @@ -424,7 +425,7 @@ func obtainSecret() *cli.Command { return command } -func createSDKClient(ctx context.Context, log *zap.Logger, key *ecdsa.PrivateKey, peerAddress string) (pool.Pool, error) { +func createNeoFS(ctx context.Context, log *zap.Logger, key *ecdsa.PrivateKey, peerAddress string) (authmate.NeoFS, error) { log.Debug("prepare connection pool") pb := new(pool.Builder) @@ -435,5 +436,13 @@ func createSDKClient(ctx context.Context, log *zap.Logger, key *ecdsa.PrivateKey NodeConnectionTimeout: poolConnectTimeout, NodeRequestTimeout: poolRequestTimeout, } - return pb.Build(ctx, opts) + p, err := pb.Build(ctx, opts) + if err != nil { + return nil, err + } + + var neoFS neofs.AuthmateNeoFS + neoFS.SetConnectionPool(p) + + return &neoFS, nil } diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 98fe939..bdbc102 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -17,6 +17,7 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/layer" "github.com/nspcc-dev/neofs-s3-gw/api/notifications" "github.com/nspcc-dev/neofs-s3-gw/api/resolver" + "github.com/nspcc-dev/neofs-s3-gw/internal/neofs" "github.com/nspcc-dev/neofs-s3-gw/internal/version" "github.com/nspcc-dev/neofs-s3-gw/internal/wallet" "github.com/nspcc-dev/neofs-sdk-go/policy" @@ -28,13 +29,13 @@ import ( type ( // App is the main application structure. App struct { - pool pool.Pool - ctr auth.Center - log *zap.Logger - cfg *viper.Viper - tls *tlsConfig - obj layer.Client - api api.Handler + ctr auth.Center + log *zap.Logger + cfg *viper.Viper + tls *tlsConfig + obj layer.Client + api api.Handler + nc *notifications.Controller maxClients api.MaxClients @@ -50,7 +51,6 @@ type ( func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App { var ( - conns pool.Pool key *keys.PrivateKey err error tls *tlsConfig @@ -109,7 +109,7 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App { NodeRequestTimeout: reqTimeout, ClientRebalanceInterval: reBalance, } - conns, err = poolPeers.Build(ctx, opts) + conns, err := poolPeers.Build(ctx, opts) if err != nil { l.Fatal("failed to create connection pool", zap.Error(err)) } @@ -120,8 +120,11 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App { l.Fatal("couldn't generate random key", zap.Error(err)) } + var neoFS neofs.NeoFS + neoFS.SetConnectionPool(conns) + resolveCfg := &resolver.Config{ - Pool: conns, + NeoFS: &neoFS, } if rpcEndpoint := v.GetString(cfgRPCEndpoint); rpcEndpoint != "" { @@ -155,12 +158,16 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App { NotificationController: nc, } + var n neofs.NeoFS + n.SetConnectionPool(conns) + // prepare object layer - obj = layer.NewLayer(l, conns, layerCfg) + obj = layer.NewLayer(l, &layerNeoFS{&n}, layerCfg) // prepare auth center - ctr = auth.New(conns, key, getAccessBoxCacheConfig(v, l)) - + ctr = auth.New(&neofs.AuthmateNeoFS{ + NeoFS: n, + }, key, getAccessBoxCacheConfig(v, l)) handlerOptions := getHandlerOptions(v, l) if caller, err = handler.New(l, obj, handlerOptions); err != nil { @@ -168,13 +175,13 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App { } return &App{ - ctr: ctr, - pool: conns, - log: l, - cfg: v, - obj: obj, - tls: tls, - api: caller, + ctr: ctr, + log: l, + cfg: v, + obj: obj, + tls: tls, + api: caller, + nc: nc, webDone: make(chan struct{}, 1), wrkDone: make(chan struct{}, 1), diff --git a/cmd/s3-gw/neofs.go b/cmd/s3-gw/neofs.go new file mode 100644 index 0000000..ea213fd --- /dev/null +++ b/cmd/s3-gw/neofs.go @@ -0,0 +1,42 @@ +package main + +import ( + "context" + "time" + + "github.com/nspcc-dev/neofs-s3-gw/api/layer" + "github.com/nspcc-dev/neofs-s3-gw/internal/neofs" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" +) + +// mediator which implements layer.NeoFS through neofs.NeoFS. +type layerNeoFS struct { + *neofs.NeoFS +} + +func (x *layerNeoFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCreate) (*cid.ID, error) { + return x.NeoFS.CreateContainer(ctx, neofs.PrmContainerCreate{ + Creator: prm.Creator, + Policy: prm.Policy, + Name: prm.Name, + Time: prm.Time, + BasicACL: prm.BasicACL, + SessionToken: prm.SessionToken, + LocationConstraintAttribute: prm.LocationConstraintAttribute, + }) +} + +func (x *layerNeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (*oid.ID, error) { + return x.NeoFS.CreateObject(ctx, neofs.PrmObjectCreate{ + Creator: prm.Creator, + Container: prm.Container, + Time: time.Now().UTC(), + Filename: prm.Filename, + PayloadSize: prm.PayloadSize, + Attributes: prm.Attributes, + Payload: prm.Payload, + BearerToken: prm.BearerToken, + PrivateKey: prm.PrivateKey, + }) +} diff --git a/creds/tokens/credentials.go b/creds/tokens/credentials.go index ef818d8..e250ec2 100644 --- a/creds/tokens/credentials.go +++ b/creds/tokens/credentials.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "io" "strconv" "time" @@ -12,10 +11,9 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/cache" "github.com/nspcc-dev/neofs-s3-gw/creds/accessbox" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" - "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/object/address" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/owner" - "github.com/nspcc-dev/neofs-sdk-go/pool" ) type ( @@ -27,11 +25,49 @@ type ( cred struct { key *keys.PrivateKey - pool pool.Pool + neoFS NeoFS cache *cache.AccessBoxCache } ) +// PrmObjectCreate groups parameters of objects created by credential tool. +type PrmObjectCreate struct { + // NeoFS identifier of the object creator. + Creator owner.ID + + // NeoFS container to store the object. + Container cid.ID + + // Object creation time. + Time time.Time + + // File name. + Filename string + + // Last NeoFS epoch of the object lifetime. + ExpirationEpoch uint64 + + // Object payload. + Payload []byte +} + +// NeoFS represents virtual connection to NeoFS network. +type NeoFS interface { + // CreateObject creates and saves a parameterized object in the specified + // NeoFS container from a specific user. Returns ID of the saved object. + // + // Returns exactly one non-nil value. Returns any error encountered which + // prevented the object to be created. + CreateObject(context.Context, PrmObjectCreate) (*oid.ID, error) + + // ReadObjectPayload reads payload of the object from NeoFS network by address + // into memory. + // + // Returns exactly one non-nil value. Returns any error encountered which + // prevented the object payload to be read. + ReadObjectPayload(context.Context, address.Address) ([]byte, error) +} + var ( // ErrEmptyPublicKeys is returned when no HCS keys are provided. ErrEmptyPublicKeys = errors.New("HCS public keys could not be empty") @@ -42,8 +78,8 @@ var ( var _ = New // New creates new Credentials instance using given cli and key. -func New(conns pool.Pool, key *keys.PrivateKey, config *cache.Config) Credentials { - return &cred{pool: conns, key: key, cache: cache.NewAccessBoxCache(config)} +func New(neoFS NeoFS, key *keys.PrivateKey, config *cache.Config) Credentials { + return &cred{neoFS: neoFS, key: key, cache: cache.NewAccessBoxCache(config)} } func (c *cred) GetBox(ctx context.Context, addr *address.Address) (*accessbox.Box, error) { @@ -70,24 +106,9 @@ func (c *cred) GetBox(ctx context.Context, addr *address.Address) (*accessbox.Bo } func (c *cred) getAccessBox(ctx context.Context, addr *address.Address) (*accessbox.AccessBox, error) { - // init payload reader - res, err := c.pool.GetObject(ctx, *addr) + data, err := c.neoFS.ReadObjectPayload(ctx, *addr) if err != nil { - return nil, fmt.Errorf("client pool failure: %w", err) - } - - defer res.Payload.Close() - - // read payload - var data []byte - - if sz := res.Header.PayloadSize(); sz > 0 { - data = make([]byte, sz) - - _, err = io.ReadFull(res.Payload, data) - if err != nil { - return nil, fmt.Errorf("read payload: %w", err) - } + return nil, fmt.Errorf("read payload: %w", err) } // decode access box @@ -99,10 +120,10 @@ func (c *cred) getAccessBox(ctx context.Context, addr *address.Address) (*access return &box, nil } -func (c *cred) Put(ctx context.Context, cid *cid.ID, issuer *owner.ID, box *accessbox.AccessBox, expiration uint64, keys ...*keys.PublicKey) (*address.Address, error) { +func (c *cred) Put(ctx context.Context, idCnr *cid.ID, issuer *owner.ID, box *accessbox.AccessBox, expiration uint64, keys ...*keys.PublicKey) (*address.Address, error) { var ( err error - created = strconv.FormatInt(time.Now().Unix(), 10) + created = time.Now() ) if len(keys) == 0 { @@ -115,31 +136,20 @@ func (c *cred) Put(ctx context.Context, cid *cid.ID, issuer *owner.ID, box *acce return nil, err } - timestamp := object.NewAttribute() - timestamp.SetKey(object.AttributeTimestamp) - timestamp.SetValue(created) - - filename := object.NewAttribute() - filename.SetKey(object.AttributeFileName) - filename.SetValue(created + "_access.box") - - expirationAttr := object.NewAttribute() - expirationAttr.SetKey("__NEOFS__EXPIRATION_EPOCH") - expirationAttr.SetValue(strconv.FormatUint(expiration, 10)) - - raw := object.NewRaw() - raw.SetContainerID(cid) - raw.SetOwnerID(issuer) - raw.SetAttributes(filename, timestamp, expirationAttr) - raw.SetPayload(data) - - oid, err := c.pool.PutObject(ctx, *raw.Object(), nil) + idObj, err := c.neoFS.CreateObject(ctx, PrmObjectCreate{ + Creator: *issuer, + Container: *idCnr, + Time: created, + Filename: strconv.FormatInt(created.Unix(), 10) + "_access.box", + ExpirationEpoch: expiration, + Payload: data, + }) if err != nil { return nil, err } addr := address.NewAddress() - addr.SetObjectID(oid) - addr.SetContainerID(cid) + addr.SetObjectID(idObj) + addr.SetContainerID(idCnr) return addr, nil } diff --git a/internal/neofs/neofs.go b/internal/neofs/neofs.go new file mode 100644 index 0000000..4cf6dc5 --- /dev/null +++ b/internal/neofs/neofs.go @@ -0,0 +1,589 @@ +package neofs + +import ( + "bytes" + "context" + "crypto/ecdsa" + "encoding/binary" + "errors" + "fmt" + "io" + "strconv" + "strings" + "time" + + "github.com/nspcc-dev/neofs-s3-gw/api/layer" + "github.com/nspcc-dev/neofs-s3-gw/authmate" + "github.com/nspcc-dev/neofs-s3-gw/creds/tokens" + "github.com/nspcc-dev/neofs-sdk-go/acl" + "github.com/nspcc-dev/neofs-sdk-go/client" + "github.com/nspcc-dev/neofs-sdk-go/container" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/eacl" + "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/object" + "github.com/nspcc-dev/neofs-sdk-go/object/address" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/neofs-sdk-go/owner" + "github.com/nspcc-dev/neofs-sdk-go/pool" + "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/token" +) + +// NeoFS represents virtual connection to the NeoFS network. +// It is used to provide an interface to dependent packages +// which work with NeoFS. +type NeoFS struct { + pool pool.Pool +} + +// SetConnectionPool binds underlying pool.Pool. Must be +// called on initialization stage before any usage. +func (x *NeoFS) SetConnectionPool(p pool.Pool) { + x.pool = p +} + +// NetworkState implements authmate.NeoFS interface method. +func (x *NeoFS) NetworkState(ctx context.Context) (*authmate.NetworkState, error) { + conn, _, err := x.pool.Connection() + if err != nil { + return nil, fmt.Errorf("get connection from pool: %w", err) + } + + res, err := conn.NetworkInfo(ctx, client.PrmNetworkInfo{}) + if err != nil { + return nil, fmt.Errorf("get network info via client: %w", err) + } + + networkInfo := res.Info() + var durEpoch uint64 + + networkInfo.NetworkConfig().IterateParameters(func(parameter *netmap.NetworkParameter) bool { + if string(parameter.Key()) == "EpochDuration" { + data := make([]byte, 8) + + copy(data, parameter.Value()) + + durEpoch = binary.LittleEndian.Uint64(data) + + return true + } + + return false + }) + + if durEpoch == 0 { + return nil, errors.New("epoch duration is missing or zero") + } + + return &authmate.NetworkState{ + Epoch: networkInfo.CurrentEpoch(), + BlockDuration: networkInfo.MsPerBlock(), + EpochDuration: durEpoch, + }, nil +} + +// Container reads container by ID using connection pool. Returns exact one non-nil value. +func (x *NeoFS) Container(ctx context.Context, idCnr cid.ID) (*container.Container, error) { + res, err := x.pool.GetContainer(ctx, &idCnr) + if err != nil { + return nil, fmt.Errorf("read container via connection pool: %w", err) + } + + return res, nil +} + +// ContainerExists implements authmate.NeoFS interface method. +func (x *NeoFS) ContainerExists(ctx context.Context, idCnr cid.ID) error { + _, err := x.pool.GetContainer(ctx, &idCnr) + if err != nil { + return fmt.Errorf("get container via connection pool: %w", err) + } + + return nil +} + +// PrmContainerCreate groups parameters of CreateContainer operation. +type PrmContainerCreate struct { + // NeoFS identifier of the container creator. + Creator owner.ID + + // Container placement policy. + Policy netmap.PlacementPolicy + + // Name for the container. + Name string + + // Time when container is created. + Time time.Time + + // Basic ACL of the container. + BasicACL acl.BasicACL + + // Token of the container's creation session (optional, nil means session absence). + SessionToken *session.Token + + // Attribute for LocationConstraint parameter (optional). + LocationConstraintAttribute *container.Attribute +} + +// CreateContainer constructs new container from the parameters and saves it in NeoFS +// using connection pool. Returns any error encountered which prevent the container +// to be saved. +func (x *NeoFS) CreateContainer(ctx context.Context, prm PrmContainerCreate) (*cid.ID, error) { + // fill container structure + cnrOptions := []container.Option{ + container.WithPolicy(&prm.Policy), + container.WithOwnerID(&prm.Creator), + container.WithCustomBasicACL(prm.BasicACL), + container.WithAttribute(container.AttributeTimestamp, strconv.FormatInt(time.Now().Unix(), 10)), + } + + if prm.Name != "" { + cnrOptions = append(cnrOptions, container.WithAttribute(container.AttributeName, prm.Name)) + } + + if prm.LocationConstraintAttribute != nil { + cnrOptions = append(cnrOptions, container.WithAttribute( + prm.LocationConstraintAttribute.Key(), + prm.LocationConstraintAttribute.Value(), + )) + } + + cnr := container.New(cnrOptions...) + cnr.SetSessionToken(prm.SessionToken) + + if prm.Name != "" { + container.SetNativeName(cnr, prm.Name) + } + + // send request to save the container + idCnr, err := x.pool.PutContainer(ctx, cnr) + if err != nil { + return nil, fmt.Errorf("save container via connection pool: %w", err) + } + + // wait the container to be persisted + err = x.pool.WaitForContainerPresence(ctx, idCnr, pool.DefaultPollingParams()) + if err != nil { + return nil, fmt.Errorf("wait for container to be saved: %w", err) + } + + return idCnr, nil +} + +// UserContainers reads list of user containers from NeoFS using connection pool. +// Returns any error encountered which prevent the containers to be listed. +func (x *NeoFS) UserContainers(ctx context.Context, id owner.ID) ([]cid.ID, error) { + r, err := x.pool.ListContainers(ctx, &id) + if err != nil { + return nil, fmt.Errorf("list user containers via connection pool: %w", err) + } + + res := make([]cid.ID, len(r)) + for i := range r { + res[i] = *r[i] + } + + return res, nil +} + +// SetContainerEACL saves eACL table of the container in NeoFS using connection pool. +// Returns any error encountered which prevented the eACL to be saved. +func (x *NeoFS) SetContainerEACL(ctx context.Context, table eacl.Table) error { + err := x.pool.SetEACL(ctx, &table) + if err != nil { + return fmt.Errorf("save eACL via connection pool: %w", err) + } + + return err +} + +// ContainerEACL reads eACL table of the container from NeoFS using connection pool. +// Returns any error encountered which prevented the eACL to be read. +func (x *NeoFS) ContainerEACL(ctx context.Context, id cid.ID) (*eacl.Table, error) { + res, err := x.pool.GetEACL(ctx, &id) + if err != nil { + return nil, fmt.Errorf("read eACL via connection pool: %w", err) + } + + return res, nil +} + +// DeleteContainer marks container to be removed from NeoFS using connection pool. +// Returns any error encountered which prevented removal request to be sent. +func (x *NeoFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Token) error { + err := x.pool.DeleteContainer(ctx, &id, pool.WithSession(token)) + if err != nil { + return fmt.Errorf("delete container via connection pool: %w", err) + } + + return nil +} + +type PrmObjectCreate struct { + // NeoFS identifier of the object creator. + Creator owner.ID + + // NeoFS container to store the object. + Container cid.ID + + // Object creation time. + Time time.Time + + // Associated filename (optional). + Filename string + + // Last NeoFS epoch of the object lifetime (optional). + ExpirationEpoch uint64 + + // Full payload size (optional). + PayloadSize uint64 + + // Key-value object attributes. + Attributes [][2]string + + // Object payload encapsulated in io.Reader primitive. + Payload io.Reader + + // Bearer token to be used for the operation. Overlaps PrivateKey. Optional. + BearerToken *token.BearerToken + + // Private key used for the operation if BearerToken is missing (in this case non-nil). + PrivateKey *ecdsa.PrivateKey +} + +// CreateObject creates and saves a parameterized object in the specified +// NeoFS container from a specific user. Returns ID of the saved object. +// +// Returns exactly one non-nil value. Returns any error encountered which +// prevented the object to be created. +func (x *NeoFS) CreateObject(ctx context.Context, prm PrmObjectCreate) (*oid.ID, error) { + attrNum := len(prm.Attributes) + 1 // + creation time + + if prm.Filename != "" { + attrNum++ + } + + if prm.ExpirationEpoch > 0 { + attrNum++ + } + + attrs := make([]*object.Attribute, 0, attrNum) + var a *object.Attribute + + a = object.NewAttribute() + a.SetKey(object.AttributeTimestamp) + a.SetValue(strconv.FormatInt(prm.Time.Unix(), 10)) + attrs = append(attrs, a) + + for i := range prm.Attributes { + a = object.NewAttribute() + a.SetKey(prm.Attributes[i][0]) + a.SetValue(prm.Attributes[i][1]) + attrs = append(attrs, a) + } + + if prm.Filename != "" { + a = object.NewAttribute() + a.SetKey(object.AttributeFileName) + a.SetValue(prm.Filename) + attrs = append(attrs, a) + } + + if prm.ExpirationEpoch > 0 { + a = object.NewAttribute() + a.SetKey("__NEOFS__EXPIRATION_EPOCH") + a.SetValue(strconv.FormatUint(prm.ExpirationEpoch, 10)) + attrs = append(attrs, a) + } + + raw := object.NewRaw() + raw.SetContainerID(&prm.Container) + raw.SetOwnerID(&prm.Creator) + raw.SetAttributes(attrs...) + raw.SetPayloadSize(prm.PayloadSize) + + var callOpt pool.CallOption + + if prm.BearerToken != nil { + callOpt = pool.WithBearer(prm.BearerToken) + } else { + callOpt = pool.WithKey(prm.PrivateKey) + } + + idObj, err := x.pool.PutObject(ctx, *raw.Object(), prm.Payload, callOpt) + if err != nil { + return nil, fmt.Errorf("save object via connection pool: %w", err) + } + + return idObj, nil +} + +// SelectObjects selects user objects which match specified filters from the NeoFS container +// using connection pool. +// +// Returns any error encountered which prevented the selection to be finished. +// Returns layer.ErrAccessDenied on access violation. +func (x *NeoFS) SelectObjects(ctx context.Context, prm layer.PrmObjectSelect) ([]oid.ID, error) { + var filters object.SearchFilters + filters.AddRootFilter() + + if prm.ExactAttribute[0] != "" { + filters.AddFilter(prm.ExactAttribute[0], prm.ExactAttribute[1], object.MatchStringEqual) + } + + if prm.FilePrefix != "" { + filters.AddFilter(object.AttributeFileName, prm.FilePrefix, object.MatchCommonPrefix) + } + + var callOpt pool.CallOption + + if prm.BearerToken != nil { + callOpt = pool.WithBearer(prm.BearerToken) + } else { + callOpt = pool.WithKey(prm.PrivateKey) + } + + res, err := x.pool.SearchObjects(ctx, prm.Container, filters, callOpt) + if err != nil { + return nil, fmt.Errorf("init object search via connection pool: %w", err) + } + + defer res.Close() + + var num, read int + buf := make([]oid.ID, 10) + + for { + num, err = res.Read(buf[read:]) + if num > 0 { + read += num + buf = append(buf, oid.ID{}) + buf = buf[:cap(buf)] + } + + if err != nil { + if errors.Is(err, io.EOF) { + break + } + + // TODO: (neofs-s3-gw#367) use NeoFS SDK API to check the status return + if strings.Contains(err.Error(), "access to operation") && strings.Contains(err.Error(), "is denied by") { + return nil, layer.ErrAccessDenied + } + + return nil, fmt.Errorf("read object list: %w", err) + } + } + + return buf[:read], nil +} + +// wraps io.ReadCloser and transforms Read errors related to access violation +// to layer.ErrAccessDenied. +type payloadReader struct { + io.ReadCloser +} + +func (x payloadReader) Read(p []byte) (int, error) { + n, err := x.ReadCloser.Read(p) + if err != nil { + // TODO: (neofs-s3-gw#367) use NeoFS SDK API to check the status return + if strings.Contains(err.Error(), "access to operation") && strings.Contains(err.Error(), "is denied by") { + return n, layer.ErrAccessDenied + } + } + + return n, err +} + +// ReadObject reads object part from the NeoFS container by identifier using connection pool: +// * if with header only, then HeadObject is called; +// * if with non-zero payload range only, then ObjectRange is called; +// * else GetObject is called. +// +// Returns any error encountered which prevented the object to be read. +// Returns layer.ErrAccessDenied on access violation. +func (x *NeoFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*layer.ObjectPart, error) { + var addr address.Address + addr.SetContainerID(&prm.Container) + addr.SetObjectID(&prm.Object) + + var callOpt pool.CallOption + + if prm.BearerToken != nil { + callOpt = pool.WithBearer(prm.BearerToken) + } else { + callOpt = pool.WithKey(prm.PrivateKey) + } + + if prm.WithHeader { + if prm.WithPayload { + res, err := x.pool.GetObject(ctx, addr, callOpt) + if err != nil { + // TODO: (neofs-s3-gw#367) use NeoFS SDK API to check the status return + if strings.Contains(err.Error(), "access to operation") && strings.Contains(err.Error(), "is denied by") { + return nil, layer.ErrAccessDenied + } + + return nil, fmt.Errorf("init full object reading via connection pool: %w", err) + } + + return &layer.ObjectPart{ + Head: &res.Header, + Payload: res.Payload, + }, nil + } + + hdr, err := x.pool.HeadObject(ctx, addr, callOpt) + if err != nil { + // TODO: (neofs-s3-gw#367) use NeoFS SDK API to check the status return + if strings.Contains(err.Error(), "access to operation") && strings.Contains(err.Error(), "is denied by") { + return nil, layer.ErrAccessDenied + } + + return nil, fmt.Errorf("read object header via connection pool: %w", err) + } + + return &layer.ObjectPart{ + Head: hdr, + }, nil + } else if prm.PayloadRange[0]+prm.PayloadRange[1] == 0 { + res, err := x.pool.GetObject(ctx, addr, callOpt) + if err != nil { + // TODO: (neofs-s3-gw#367) use NeoFS SDK API to check the status return + if strings.Contains(err.Error(), "access to operation") && strings.Contains(err.Error(), "is denied by") { + return nil, layer.ErrAccessDenied + } + + return nil, fmt.Errorf("init full payload range reading via connection pool: %w", err) + } + + return &layer.ObjectPart{ + Payload: res.Payload, + }, nil + } + + res, err := x.pool.ObjectRange(ctx, addr, prm.PayloadRange[0], prm.PayloadRange[1], callOpt) + if err != nil { + // TODO: (neofs-s3-gw#367) use NeoFS SDK API to check the status return + if strings.Contains(err.Error(), "access to operation") && strings.Contains(err.Error(), "is denied by") { + return nil, layer.ErrAccessDenied + } + + return nil, fmt.Errorf("init payload range reading via connection pool: %w", err) + } + + return &layer.ObjectPart{ + Payload: payloadReader{res}, + }, nil +} + +// DeleteObject marks the object to be removed from the NeoFS container by identifier. +// Successful return does not guarantee the actual removal. +// +// Returns ErrAccessDenied on remove access violation. +// +// Returns any error encountered which prevented the removal request to be sent. +func (x *NeoFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) error { + var addr address.Address + addr.SetContainerID(&prm.Container) + addr.SetObjectID(&prm.Object) + + var callOpt pool.CallOption + + if prm.BearerToken != nil { + callOpt = pool.WithBearer(prm.BearerToken) + } else { + callOpt = pool.WithKey(prm.PrivateKey) + } + + err := x.pool.DeleteObject(ctx, addr, callOpt) + if err != nil { + // TODO: (neofs-s3-gw#367) use NeoFS SDK API to check the status return + if strings.Contains(err.Error(), "access to operation") && strings.Contains(err.Error(), "is denied by") { + return layer.ErrAccessDenied + } + + return fmt.Errorf("mark object removal via connection pool: %w", err) + } + + return nil +} + +// AuthmateNeoFS is a mediator which implements authmate.NeoFS through NeoFS. +type AuthmateNeoFS struct { + NeoFS +} + +func (x *AuthmateNeoFS) CreateContainer(ctx context.Context, prm authmate.PrmContainerCreate) (*cid.ID, error) { + return x.NeoFS.CreateContainer(ctx, PrmContainerCreate{ + Creator: prm.Owner, + Policy: prm.Policy, + Name: prm.FriendlyName, + Time: time.Now(), + BasicACL: 0b0011_1100_1000_1100_1000_1100_1100_1110, // 0x3C8C8CCE + }) +} + +func (x *AuthmateNeoFS) ReadObjectPayload(ctx context.Context, addr address.Address) ([]byte, error) { + res, err := x.NeoFS.ReadObject(ctx, layer.PrmObjectRead{ + Container: *addr.ContainerID(), + Object: *addr.ObjectID(), + WithPayload: true, + }) + if err != nil { + return nil, err + } + + defer res.Payload.Close() + + return io.ReadAll(res.Payload) +} + +func (x *AuthmateNeoFS) CreateObject(ctx context.Context, prm tokens.PrmObjectCreate) (*oid.ID, error) { + return x.NeoFS.CreateObject(ctx, PrmObjectCreate{ + Creator: prm.Creator, + Container: prm.Container, + Time: prm.Time, + Filename: prm.Filename, + ExpirationEpoch: prm.ExpirationEpoch, + Payload: bytes.NewReader(prm.Payload), + }) +} + +// SystemDNS reads "SystemDNS" network parameter of the NeoFS. +// +// Returns exactly on non-zero value. Returns any error encountered +// which prevented the parameter to be read. +func (x *NeoFS) SystemDNS(ctx context.Context) (string, error) { + conn, _, err := x.pool.Connection() + if err != nil { + return "", fmt.Errorf("get connection from the pool: %w", err) + } + + var prmCli client.PrmNetworkInfo + + res, err := conn.NetworkInfo(ctx, prmCli) + if err != nil { + return "", fmt.Errorf("read network info via client: %w", err) + } + + var domain string + + res.Info().NetworkConfig().IterateParameters(func(parameter *netmap.NetworkParameter) bool { + if string(parameter.Key()) == "SystemDNS" { + domain = string(parameter.Value()) + return true + } + + return false + }) + + if domain == "" { + return "", errors.New("system DNS parameter not found or empty") + } + + return domain, nil +}