From b19e3a48db1033053dde93205939d6a65337aec1 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Fri, 4 Sep 2020 13:05:57 +0300 Subject: [PATCH] [#135] sdk/client: Implement object operations Signed-off-by: Leonard Lyubich --- pkg/client/object.go | 881 +++++++++++++++++++++++++++++++++++++++++++ pkg/client/opts.go | 3 + 2 files changed, 884 insertions(+) create mode 100644 pkg/client/object.go diff --git a/pkg/client/object.go b/pkg/client/object.go new file mode 100644 index 0000000..3bedecf --- /dev/null +++ b/pkg/client/object.go @@ -0,0 +1,881 @@ +package client + +import ( + "bytes" + "context" + "crypto/ecdsa" + "crypto/sha256" + "fmt" + "io" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-api-go/v2/client" + v2object "github.com/nspcc-dev/neofs-api-go/v2/object" + v2refs "github.com/nspcc-dev/neofs-api-go/v2/refs" + "github.com/nspcc-dev/neofs-api-go/v2/signature" + "github.com/pkg/errors" +) + +type Range struct { + ln, off uint64 +} + +type PutObjectParams struct { + obj *object.Object + + r io.Reader +} + +type DeleteObjectParams struct { + addr *object.Address +} + +type GetObjectParams struct { + addr *object.Address + + w io.Writer +} + +type ObjectHeaderParams struct { + addr *object.Address + + short bool +} + +type RangeDataParams struct { + addr *object.Address + + r *Range + + w io.Writer +} + +type RangeChecksumParams struct { + typ checksumType + + addr *object.Address + + rs []*Range + + salt []byte +} + +type putObjectV2Writer struct { + key *ecdsa.PrivateKey + + chunkPart *v2object.PutObjectPartChunk + + req *v2object.PutRequest + + stream v2object.PutObjectStreamer +} + +type checksumType int + +const ( + _ checksumType = iota + checksumSHA256 + checksumTZ +) + +const chunkSize = 3 * (1 << 20) + +const TZSize = 64 + +func rangesToV2(rs []*Range) []*v2object.Range { + r2 := make([]*v2object.Range, 0, len(rs)) + + for i := range rs { + r2 = append(r2, rs[i].toV2()) + } + + return r2 +} + +func (t checksumType) toV2() v2refs.ChecksumType { + switch t { + case checksumSHA256: + return v2refs.SHA256 + case checksumTZ: + return v2refs.TillichZemor + default: + panic(fmt.Sprintf("invalid checksum type %d", t)) + } +} + +func (r *Range) WithLength(v uint64) *Range { + if r != nil { + r.ln = v + } + + return r +} + +func (r *Range) WithOffset(v uint64) *Range { + if r != nil { + r.off = v + } + + return r +} + +func (r *Range) toV2() *v2object.Range { + if r != nil { + r2 := new(v2object.Range) + r2.SetOffset(r.off) + r2.SetLength(r.ln) + + return r2 + } + + return nil +} + +func (w *putObjectV2Writer) Write(p []byte) (int, error) { + w.chunkPart.SetChunk(p) + + w.req.SetVerificationHeader(nil) + + if err := signature.SignServiceMessage(w.key, w.req); err != nil { + return 0, errors.Wrap(err, "could not sign chunk request message") + } + + if err := w.stream.Send(w.req); err != nil { + return 0, errors.Wrap(err, "could not send chunk request message") + } + + return len(p), nil +} + +func (p *PutObjectParams) WithObject(v *object.Object) *PutObjectParams { + if p != nil { + p.obj = v + } + + return p +} + +func (p *PutObjectParams) WithPayloadReader(v io.Reader) *PutObjectParams { + if p != nil { + p.r = v + } + + return p +} + +func (c *Client) PutObject(ctx context.Context, p *PutObjectParams, opts ...CallOption) (*object.ID, error) { + // check remote node version + switch c.remoteNode.Version.Major { + case 2: + return c.putObjectV2(ctx, p, opts...) + default: + return nil, unsupportedProtocolErr + } +} + +func (c *Client) putObjectV2(ctx context.Context, p *PutObjectParams, opts ...CallOption) (*object.ID, error) { + // convert object to V2 + obj, err := p.obj.ToV2(c.key) + if err != nil { + return nil, errors.Wrap(err, "could not convert object to V2") + } + + // create V2 Object client + cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts) + if err != nil { + return nil, errors.Wrap(err, "could not create Object V2 client") + } + + stream, err := cli.Put(ctx) + if err != nil { + return nil, errors.Wrap(err, "could not open Put object stream") + } + + callOpts := defaultCallOptions() + + for i := range opts { + if opts[i] != nil { + opts[i].apply(&callOpts) + } + } + + // create request + req := new(v2object.PutRequest) + + // initialize request body + body := new(v2object.PutRequestBody) + req.SetBody(body) + + // set meta header + req.SetMetaHeader(v2MetaHeaderFromOpts(callOpts)) + + // initialize init part + initPart := new(v2object.PutObjectPartInit) + body.SetObjectPart(initPart) + + // set init part fields + initPart.SetObjectID(obj.GetObjectID()) + initPart.SetSignature(obj.GetSignature()) + initPart.SetHeader(obj.GetHeader()) + + // sign the request + if err := signature.SignServiceMessage(c.key, req); err != nil { + return nil, errors.Wrapf(err, "could not sign %T", req) + } + + // send init part + if err := stream.Send(req); err != nil { + return nil, errors.Wrapf(err, "could not send %T", req) + } + + // create payload bytes reader + var rPayload io.Reader = bytes.NewReader(obj.GetPayload()) + if p.r != nil { + rPayload = io.MultiReader(rPayload, p.r) + } + + // create v2 payload stream writer + chunkPart := new(v2object.PutObjectPartChunk) + body.SetObjectPart(chunkPart) + + w := &putObjectV2Writer{ + key: c.key, + chunkPart: chunkPart, + req: req, + stream: stream, + } + + // copy payload from reader to stream writer + if _, err := io.CopyBuffer(w, rPayload, make([]byte, chunkSize)); err != nil { + return nil, errors.Wrap(err, "could not send payload bytes to Put object stream") + } + + // close object stream and receive response from remote node + resp, err := stream.CloseAndRecv() + if err != nil { + return nil, errors.Wrapf(err, "could not close %T", stream) + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return nil, errors.Wrapf(err, "could not verify %T", resp) + } + + // convert object identifier + id, err := object.IDFromV2(resp.GetBody().GetObjectID()) + if err != nil { + return nil, errors.Wrap(err, "could not convert object identifier") + } + + return id, nil +} + +func (p *DeleteObjectParams) WithAddress(v *object.Address) *DeleteObjectParams { + if p != nil { + p.addr = v + } + + return p +} + +func (c *Client) DeleteObject(ctx context.Context, p *DeleteObjectParams, opts ...CallOption) error { + // check remote node version + switch c.remoteNode.Version.Major { + case 2: + return c.deleteObjectV2(ctx, p, opts...) + default: + return unsupportedProtocolErr + } +} + +func (c *Client) deleteObjectV2(ctx context.Context, p *DeleteObjectParams, opts ...CallOption) error { + // create V2 Object client + cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts) + if err != nil { + return errors.Wrap(err, "could not create Object V2 client") + } + + callOpts := defaultCallOptions() + + for i := range opts { + if opts[i] != nil { + opts[i].apply(&callOpts) + } + } + + // create request + req := new(v2object.DeleteRequest) + + // initialize request body + body := new(v2object.DeleteRequestBody) + req.SetBody(body) + + // set meta header + req.SetMetaHeader(v2MetaHeaderFromOpts(callOpts)) + + // fill body fields + body.SetAddress(p.addr.ToV2()) + + // sign the request + if err := signature.SignServiceMessage(c.key, req); err != nil { + return errors.Wrapf(err, "could not sign %T", req) + } + + // send request + resp, err := cli.Delete(ctx, req) + if err != nil { + return errors.Wrapf(err, "could not send %T", req) + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return errors.Wrapf(err, "could not verify %T", resp) + } + + return nil +} + +func (p *GetObjectParams) WithAddress(v *object.Address) *GetObjectParams { + if p != nil { + p.addr = v + } + + return p +} + +func (p *GetObjectParams) WithPayloadWriter(w io.Writer) *GetObjectParams { + if p != nil { + p.w = w + } + + return p +} + +func (c *Client) GetObject(ctx context.Context, p *GetObjectParams, opts ...CallOption) (*object.Object, error) { + // check remote node version + switch c.remoteNode.Version.Major { + case 2: + return c.getObjectV2(ctx, p, opts...) + default: + return nil, unsupportedProtocolErr + } +} + +func (c *Client) getObjectV2(ctx context.Context, p *GetObjectParams, opts ...CallOption) (*object.Object, error) { + // create V2 Object client + cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts) + if err != nil { + return nil, errors.Wrap(err, "could not create Object V2 client") + } + + callOpts := defaultCallOptions() + + for i := range opts { + if opts[i] != nil { + opts[i].apply(&callOpts) + } + } + + // create request + req := new(v2object.GetRequest) + + // initialize request body + body := new(v2object.GetRequestBody) + req.SetBody(body) + + // set meta header + req.SetMetaHeader(v2MetaHeaderFromOpts(callOpts)) + + // fill body fields + body.SetAddress(p.addr.ToV2()) + + // sign the request + if err := signature.SignServiceMessage(c.key, req); err != nil { + return nil, errors.Wrapf(err, "could not sign %T", req) + } + + // create Get object stream + stream, err := cli.Get(ctx, req) + if err != nil { + return nil, errors.Wrap(err, "could not create Get object stream") + } + + var ( + payload []byte + obj = new(v2object.Object) + ) + + for { + // receive message from server stream + resp, err := stream.Recv() + if err != nil { + if errors.Is(errors.Cause(err), io.EOF) { + break + } + + return nil, errors.Wrap(err, "could not receive Get response") + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return nil, errors.Wrapf(err, "could not verify %T", resp) + } + + switch v := resp.GetBody().GetObjectPart().(type) { + case nil: + return nil, errors.New("received nil object part") + case *v2object.GetObjectPartInit: + obj.SetObjectID(v.GetObjectID()) + obj.SetSignature(v.GetSignature()) + + hdr := v.GetHeader() + obj.SetHeader(hdr) + + if p.w == nil { + payload = make([]byte, 0, hdr.GetPayloadLength()) + } + case *v2object.GetObjectPartChunk: + if p.w != nil { + if _, err := p.w.Write(v.GetChunk()); err != nil { + return nil, errors.Wrap(err, "could not write payload chunk") + } + } else { + payload = append(payload, v.GetChunk()...) + } + default: + panic(fmt.Sprintf("unexpected Get object part type %T", v)) + } + } + + obj.SetPayload(payload) + + // convert the object + res, err := object.FromV2(obj) + if err != nil { + return nil, errors.Wrap(err, "could not convert V2 object") + } + + return res, nil +} + +func (p *ObjectHeaderParams) WithAddress(v *object.Address) *ObjectHeaderParams { + if p != nil { + p.addr = v + } + + return p +} + +func (p *ObjectHeaderParams) WithAllFields() *ObjectHeaderParams { + if p != nil { + p.short = false + } + + return p +} + +func (p *ObjectHeaderParams) WithMainFields() *ObjectHeaderParams { + if p != nil { + p.short = true + } + + return p +} + +func (c *Client) GetObjectHeader(ctx context.Context, p *ObjectHeaderParams, opts ...CallOption) (*object.Object, error) { + // check remote node version + switch c.remoteNode.Version.Major { + case 2: + return c.getObjectHeaderV2(ctx, p, opts...) + default: + return nil, unsupportedProtocolErr + } +} + +func (c *Client) getObjectHeaderV2(ctx context.Context, p *ObjectHeaderParams, opts ...CallOption) (*object.Object, error) { + // create V2 Object client + cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts) + if err != nil { + return nil, errors.Wrap(err, "could not create Object V2 client") + } + + callOpts := defaultCallOptions() + + for i := range opts { + if opts[i] != nil { + opts[i].apply(&callOpts) + } + } + + // create request + req := new(v2object.HeadRequest) + + // initialize request body + body := new(v2object.HeadRequestBody) + req.SetBody(body) + + // set meta header + req.SetMetaHeader(v2MetaHeaderFromOpts(callOpts)) + + // fill body fields + body.SetAddress(p.addr.ToV2()) + body.SetMainOnly(p.short) + + // sign the request + if err := signature.SignServiceMessage(c.key, req); err != nil { + return nil, errors.Wrapf(err, "could not sign %T", req) + } + + // send Head request + resp, err := cli.Head(ctx, req) + if err != nil { + return nil, errors.Wrapf(err, "could not send %T", req) + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return nil, errors.Wrapf(err, "could not verify %T", resp) + } + + var hdr *v2object.Header + + switch v := resp.GetBody().GetHeaderPart().(type) { + case nil: + return nil, errors.New("received nil object header part") + case *v2object.GetHeaderPartShort: + if !p.short { + return nil, errors.Errorf("wrong header part type: expected %T, received %T", + (*v2object.GetHeaderPartFull)(nil), (*v2object.GetHeaderPartShort)(nil), + ) + } + + h := v.GetShortHeader() + + hdr = new(v2object.Header) + hdr.SetPayloadLength(h.GetPayloadLength()) + hdr.SetVersion(h.GetVersion()) + hdr.SetOwnerID(h.GetOwnerID()) + hdr.SetObjectType(h.GetObjectType()) + hdr.SetCreationEpoch(h.GetCreationEpoch()) + case *v2object.GetHeaderPartFull: + if p.short { + return nil, errors.Errorf("wrong header part type: expected %T, received %T", + (*v2object.GetHeaderPartShort)(nil), (*v2object.GetHeaderPartFull)(nil), + ) + } + + hdr = v.GetHeader() + default: + panic(fmt.Sprintf("unexpected Head object type %T", v)) + } + + obj := new(v2object.Object) + obj.SetHeader(hdr) + + // convert the object + res, err := object.FromV2(obj) + if err != nil { + return nil, errors.Wrap(err, "could not convert object") + } + + return res, nil +} + +func (p *RangeDataParams) WithAddress(v *object.Address) *RangeDataParams { + if p != nil { + p.addr = v + } + + return p +} + +func (p *RangeDataParams) WithRange(v *Range) *RangeDataParams { + if p != nil { + p.r = v + } + + return p +} + +func (p *RangeDataParams) WithDataWriter(v io.Writer) *RangeDataParams { + if p != nil { + p.w = v + } + + return p +} + +func (c *Client) ObjectPayloadRangeData(ctx context.Context, p *RangeDataParams, opts ...CallOption) ([]byte, error) { + // check remote node version + switch c.remoteNode.Version.Major { + case 2: + return c.objectPayloadRangeV2(ctx, p, opts...) + default: + return nil, unsupportedProtocolErr + } +} + +func (c *Client) objectPayloadRangeV2(ctx context.Context, p *RangeDataParams, opts ...CallOption) ([]byte, error) { + // create V2 Object client + cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts) + if err != nil { + return nil, errors.Wrap(err, "could not create Object V2 client") + } + + callOpts := defaultCallOptions() + + for i := range opts { + if opts[i] != nil { + opts[i].apply(&callOpts) + } + } + + // create request + req := new(v2object.GetRangeRequest) + + // initialize request body + body := new(v2object.GetRangeRequestBody) + req.SetBody(body) + + // set meta header + req.SetMetaHeader(v2MetaHeaderFromOpts(callOpts)) + + // fill body fields + body.SetAddress(p.addr.ToV2()) + body.SetRange(p.r.toV2()) + + // sign the request + if err := signature.SignServiceMessage(c.key, req); err != nil { + return nil, errors.Wrapf(err, "could not sign %T", req) + } + + // create Get payload range stream + stream, err := cli.GetRange(ctx, req) + if err != nil { + return nil, errors.Wrap(err, "could not create Get payload range stream") + } + + var payload []byte + if p.w != nil { + payload = make([]byte, p.r.ln) + } + + for { + // receive message from server stream + resp, err := stream.Recv() + if err != nil { + if errors.Is(errors.Cause(err), io.EOF) { + break + } + + return nil, errors.Wrap(err, "could not receive Get payload range response") + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return nil, errors.Wrapf(err, "could not verify %T", resp) + } + + chunk := resp.GetBody().GetChunk() + + if p.w != nil { + if _, err := p.w.Write(chunk); err != nil { + return nil, errors.Wrap(err, "could not write payload chunk") + } + } else { + payload = append(payload, chunk...) + } + } + + return payload, nil +} + +func (p *RangeChecksumParams) WithAddress(v *object.Address) *RangeChecksumParams { + if p != nil { + p.addr = v + } + + return p +} + +func (p *RangeChecksumParams) WithRangeList(rs ...*Range) *RangeChecksumParams { + if p != nil { + p.rs = rs + } + + return p +} + +func (p *RangeChecksumParams) WithSalt(v []byte) *RangeChecksumParams { + if p != nil { + p.salt = v + } + + return p +} + +func (p *RangeChecksumParams) withChecksumType(t checksumType) *RangeChecksumParams { + if p != nil { + p.typ = t + } + + return p +} + +func (c *Client) ObjectPayloadRangeSHA256(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) ([][sha256.Size]byte, error) { + res, err := c.objectPayloadRangeHash(ctx, p.withChecksumType(checksumSHA256), opts...) + if err != nil { + return nil, err + } + + return res.([][sha256.Size]byte), nil +} + +func (c *Client) ObjectPayloadRangeTZ(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) ([][TZSize]byte, error) { + res, err := c.objectPayloadRangeHash(ctx, p.withChecksumType(checksumTZ), opts...) + if err != nil { + return nil, err + } + + return res.([][TZSize]byte), nil +} + +func (c *Client) objectPayloadRangeHash(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) (interface{}, error) { + // check remote node version + switch c.remoteNode.Version.Major { + case 2: + return c.objectPayloadRangeHashV2(ctx, p, opts...) + default: + return nil, unsupportedProtocolErr + } +} + +func (c *Client) objectPayloadRangeHashV2(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) (interface{}, error) { + // create V2 Object client + cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts) + if err != nil { + return nil, errors.Wrap(err, "could not create Object V2 client") + } + + callOpts := defaultCallOptions() + + for i := range opts { + if opts[i] != nil { + opts[i].apply(&callOpts) + } + } + + // create request + req := new(v2object.GetRangeHashRequest) + + // initialize request body + body := new(v2object.GetRangeHashRequestBody) + req.SetBody(body) + + // set meta header + req.SetMetaHeader(v2MetaHeaderFromOpts(callOpts)) + + // fill body fields + body.SetAddress(p.addr.ToV2()) + body.SetSalt(p.salt) + + typV2 := p.typ.toV2() + body.SetType(typV2) + + rsV2 := rangesToV2(p.rs) + body.SetRanges(rsV2) + + // sign the request + if err := signature.SignServiceMessage(c.key, req); err != nil { + return nil, errors.Wrapf(err, "could not sign %T", req) + } + + // send request + resp, err := cli.GetRangeHash(ctx, req) + if err != nil { + return nil, errors.Wrapf(err, "could not send %T", req) + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return nil, errors.Wrapf(err, "could not verify %T", resp) + } + + respBody := resp.GetBody() + respType := respBody.GetType() + respHashes := respBody.GetHashList() + + if t := p.typ.toV2(); respType != t { + return nil, errors.Errorf("invalid checksum type: expected %v, received %v", t, respType) + } else if reqLn, respLn := len(rsV2), len(respHashes); reqLn != respLn { + return nil, errors.Errorf("wrong checksum number: expected %d, received %d", reqLn, respLn) + } + + var res interface{} + + switch p.typ { + case checksumSHA256: + r := make([][sha256.Size]byte, 0, len(respHashes)) + + for i := range respHashes { + if ln := len(respHashes[i]); ln != sha256.Size { + return nil, errors.Errorf("invalid checksum length: expected %d, received %d", sha256.Size, ln) + } + + cs := [sha256.Size]byte{} + copy(cs[:], respHashes[i]) + + r = append(r, cs) + } + + res = r + case checksumTZ: + r := make([][TZSize]byte, 0, len(respHashes)) + + for i := range respHashes { + if ln := len(respHashes[i]); ln != TZSize { + return nil, errors.Errorf("invalid checksum length: expected %d, received %d", TZSize, ln) + } + + cs := [TZSize]byte{} + copy(cs[:], respHashes[i]) + + r = append(r, cs) + } + + res = r + } + + return res, nil +} + +func v2ObjectClient(proto TransportProtocol, opts *clientOptions) (*v2object.Client, error) { + switch proto { + case GRPC: + var err error + + if opts.grpcOpts.objectClientV2 == nil { + var optsV2 []v2object.Option + + if opts.grpcOpts.conn != nil { + optsV2 = []v2object.Option{ + v2object.WithGlobalOpts( + client.WithGRPCConn(opts.grpcOpts.conn), + ), + } + } else { + optsV2 = []v2object.Option{ + v2object.WithGlobalOpts( + client.WithNetworkAddress(opts.addr), + ), + } + } + + opts.grpcOpts.objectClientV2, err = v2object.NewClient(optsV2...) + } + + return opts.grpcOpts.objectClientV2, err + default: + return nil, unsupportedProtocolErr + } +} diff --git a/pkg/client/opts.go b/pkg/client/opts.go index 93f72d0..ba7202c 100644 --- a/pkg/client/opts.go +++ b/pkg/client/opts.go @@ -4,6 +4,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg" v2accounting "github.com/nspcc-dev/neofs-api-go/v2/accounting" v2container "github.com/nspcc-dev/neofs-api-go/v2/container" + v2object "github.com/nspcc-dev/neofs-api-go/v2/object" v2session "github.com/nspcc-dev/neofs-api-go/v2/session" "google.golang.org/grpc" ) @@ -40,6 +41,8 @@ type ( conn *grpc.ClientConn v2ContainerClient *v2container.Client v2AccountingClient *v2accounting.Client + + objectClientV2 *v2object.Client } )