package client import ( "context" "errors" "fmt" "io" v2object "github.com/nspcc-dev/neofs-api-go/v2/object" v2refs "github.com/nspcc-dev/neofs-api-go/v2/refs" rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" v2session "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-api-go/v2/signature" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/object/address" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" signer "github.com/nspcc-dev/neofs-sdk-go/util/signature" ) // ObjectAddressWriter is an interface of the // component that writes the object address. type ObjectAddressWriter interface { SetAddress(*address.Address) } type DeleteObjectParams struct { addr *address.Address tombTgt ObjectAddressWriter } type RangeDataParams struct { addr *address.Address raw bool r *object.Range w io.Writer } type RangeChecksumParams struct { tz bool addr *address.Address rs []*object.Range salt []byte } type SearchObjectParams struct { cid *cid.ID filters object.SearchFilters } type checksumType int const ( _ checksumType = iota checksumSHA256 checksumTZ ) const TZSize = 64 const searchQueryVersion uint32 = 1 func rangesToV2(rs []*object.Range) []*v2object.Range { r2 := make([]*v2object.Range, 0, len(rs)) for i := range rs { r2 = append(r2, rs[i].ToV2()) } return r2 } func (t checksumType) toV2() v2refs.ChecksumType { switch t { case checksumSHA256: return v2refs.SHA256 case checksumTZ: return v2refs.TillichZemor default: panic(fmt.Sprintf("invalid checksum type %d", t)) } } func (p *DeleteObjectParams) WithAddress(v *address.Address) *DeleteObjectParams { if p != nil { p.addr = v } return p } func (p *DeleteObjectParams) Address() *address.Address { if p != nil { return p.addr } return nil } // WithTombstoneAddressTarget sets target component to write tombstone address. func (p *DeleteObjectParams) WithTombstoneAddressTarget(v ObjectAddressWriter) *DeleteObjectParams { if p != nil { p.tombTgt = v } return p } // TombstoneAddressTarget returns target component to write tombstone address. func (p *DeleteObjectParams) TombstoneAddressTarget() ObjectAddressWriter { if p != nil { return p.tombTgt } return nil } type ObjectDeleteRes struct { statusRes tombAddr *address.Address } func (x ObjectDeleteRes) TombstoneAddress() *address.Address { return x.tombAddr } func (x *ObjectDeleteRes) setTombstoneAddress(addr *address.Address) { x.tombAddr = addr } // DeleteObject removes object by address. // // If target of tombstone address is not set, the address is ignored. // // Any client's internal or transport errors are returned as `error`. // If WithNeoFSErrorParsing option has been provided, unsuccessful // NeoFS status codes are returned as `error`, otherwise, are included // in the returned result structure. func (c *Client) DeleteObject(ctx context.Context, p *DeleteObjectParams, opts ...CallOption) (*ObjectDeleteRes, error) { callOpts := c.defaultCallOptions() for i := range opts { if opts[i] != nil { opts[i](callOpts) } } // create request req := new(v2object.DeleteRequest) // initialize request body body := new(v2object.DeleteRequestBody) req.SetBody(body) // set meta header meta := v2MetaHeaderFromOpts(callOpts) if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ addr: p.addr.ToV2(), verb: v2session.ObjectVerbDelete, }); err != nil { return nil, fmt.Errorf("could not attach session token: %w", err) } req.SetMetaHeader(meta) // fill body fields body.SetAddress(p.addr.ToV2()) // sign the request if err := signature.SignServiceMessage(callOpts.key, req); err != nil { return nil, fmt.Errorf("signing the request failed: %w", err) } // send request resp, err := rpcapi.DeleteObject(c.Raw(), req, client.WithContext(ctx)) if err != nil { return nil, fmt.Errorf("sending the request failed: %w", err) } var ( res = new(ObjectDeleteRes) procPrm processResponseV2Prm procRes processResponseV2Res ) procPrm.callOpts = callOpts procPrm.resp = resp procRes.statusRes = res // process response in general if c.processResponseV2(&procRes, procPrm) { if procRes.cliErr != nil { return nil, procRes.cliErr } return res, nil } addrv2 := resp.GetBody().GetTombstone() res.setTombstoneAddress(address.NewAddressFromV2(addrv2)) return res, nil } var errWrongMessageSeq = errors.New("incorrect message sequence") type ObjectGetRes struct { statusRes objectRes } type objectRes struct { obj *object.Object } func (x *objectRes) setObject(obj *object.Object) { x.obj = obj } func (x objectRes) Object() *object.Object { return x.obj } func writeUnexpectedMessageTypeErr(res resCommon, val interface{}) { var st apistatus.ServerInternal // specific API status should be used apistatus.WriteInternalServerErr(&st, fmt.Errorf("unexpected message type %T", val)) res.setStatus(st) } func (p *RangeDataParams) WithAddress(v *address.Address) *RangeDataParams { if p != nil { p.addr = v } return p } func (p *RangeDataParams) Address() *address.Address { if p != nil { return p.addr } return nil } func (p *RangeDataParams) WithRaw(v bool) *RangeDataParams { if p != nil { p.raw = v } return p } func (p *RangeDataParams) Raw() bool { if p != nil { return p.raw } return false } func (p *RangeDataParams) WithRange(v *object.Range) *RangeDataParams { if p != nil { p.r = v } return p } func (p *RangeDataParams) Range() *object.Range { if p != nil { return p.r } return nil } func (p *RangeDataParams) WithDataWriter(v io.Writer) *RangeDataParams { if p != nil { p.w = v } return p } func (p *RangeDataParams) DataWriter() io.Writer { if p != nil { return p.w } return nil } type ObjectRangeRes struct { statusRes data []byte } func (x *ObjectRangeRes) setData(data []byte) { x.data = data } func (x ObjectRangeRes) Data() []byte { return x.data } // ObjectPayloadRangeData receives object's range payload data through NeoFS API call. // // Any client's internal or transport errors are returned as `error`. // If WithNeoFSErrorParsing option has been provided, unsuccessful // NeoFS status codes are returned as `error`, otherwise, are included // in the returned result structure. func (c *Client) ObjectPayloadRangeData(ctx context.Context, p *RangeDataParams, opts ...CallOption) (*ObjectRangeRes, error) { callOpts := c.defaultCallOptions() for i := range opts { if opts[i] != nil { opts[i](callOpts) } } // create request req := new(v2object.GetRangeRequest) // initialize request body body := new(v2object.GetRangeRequestBody) req.SetBody(body) // set meta header meta := v2MetaHeaderFromOpts(callOpts) if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ addr: p.addr.ToV2(), verb: v2session.ObjectVerbRange, }); err != nil { return nil, fmt.Errorf("could not attach session token: %w", err) } req.SetMetaHeader(meta) // fill body fields body.SetAddress(p.addr.ToV2()) body.SetRange(p.r.ToV2()) body.SetRaw(p.raw) // sign the request if err := signature.SignServiceMessage(callOpts.key, req); err != nil { return nil, fmt.Errorf("signing the request failed: %w", err) } // open stream stream, err := rpcapi.GetObjectRange(c.Raw(), req, client.WithContext(ctx)) if err != nil { return nil, fmt.Errorf("could not create Get payload range stream: %w", err) } var payload []byte if p.w != nil { payload = make([]byte, 0, p.r.GetLength()) } var ( resp = new(v2object.GetRangeResponse) chunkWas, messageWas bool res = new(ObjectRangeRes) procPrm processResponseV2Prm procRes processResponseV2Res ) procPrm.callOpts = callOpts procPrm.resp = resp procRes.statusRes = res for { // receive message from server stream err := stream.Read(resp) if err != nil { if errors.Is(err, io.EOF) { if !messageWas { return nil, errWrongMessageSeq } break } return nil, fmt.Errorf("reading the response failed: %w", err) } messageWas = true // process response in general if c.processResponseV2(&procRes, procPrm) { if procRes.cliErr != nil { return nil, procRes.cliErr } return res, nil } switch v := resp.GetBody().GetRangePart().(type) { case nil: writeUnexpectedMessageTypeErr(res, v) return res, nil case *v2object.GetRangePartChunk: chunkWas = true if p.w != nil { if _, err = p.w.Write(v.GetChunk()); err != nil { return nil, fmt.Errorf("could not write payload chunk: %w", err) } } else { payload = append(payload, v.GetChunk()...) } case *v2object.SplitInfo: if chunkWas { return nil, errWrongMessageSeq } si := object.NewSplitInfoFromV2(v) return nil, object.NewSplitInfoError(si) } } res.setData(payload) return res, nil } func (p *RangeChecksumParams) WithAddress(v *address.Address) *RangeChecksumParams { if p != nil { p.addr = v } return p } func (p *RangeChecksumParams) Address() *address.Address { if p != nil { return p.addr } return nil } func (p *RangeChecksumParams) WithRangeList(rs ...*object.Range) *RangeChecksumParams { if p != nil { p.rs = rs } return p } func (p *RangeChecksumParams) RangeList() []*object.Range { if p != nil { return p.rs } return nil } func (p *RangeChecksumParams) WithSalt(v []byte) *RangeChecksumParams { if p != nil { p.salt = v } return p } func (p *RangeChecksumParams) Salt() []byte { if p != nil { return p.salt } return nil } func (p *RangeChecksumParams) TZ() *RangeChecksumParams { p.tz = true return p } type ObjectRangeHashRes struct { statusRes hashes [][]byte } func (x *ObjectRangeHashRes) setHashes(v [][]byte) { x.hashes = v } func (x ObjectRangeHashRes) Hashes() [][]byte { return x.hashes } // HashObjectPayloadRanges receives range hash of the object // payload data through NeoFS API call. // // Any client's internal or transport errors are returned as `error`. // If WithNeoFSErrorParsing option has been provided, unsuccessful // NeoFS status codes are returned as `error`, otherwise, are included // in the returned result structure. func (c *Client) HashObjectPayloadRanges(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) (*ObjectRangeHashRes, error) { callOpts := c.defaultCallOptions() for i := range opts { if opts[i] != nil { opts[i](callOpts) } } // create request req := new(v2object.GetRangeHashRequest) // initialize request body body := new(v2object.GetRangeHashRequestBody) req.SetBody(body) // set meta header meta := v2MetaHeaderFromOpts(callOpts) if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ addr: p.addr.ToV2(), verb: v2session.ObjectVerbRangeHash, }); err != nil { return nil, fmt.Errorf("could not attach session token: %w", err) } req.SetMetaHeader(meta) // fill body fields body.SetAddress(p.addr.ToV2()) body.SetSalt(p.salt) typ := checksumSHA256 if p.tz { typ = checksumTZ } typV2 := typ.toV2() body.SetType(typV2) rsV2 := rangesToV2(p.rs) body.SetRanges(rsV2) // sign the request if err := signature.SignServiceMessage(callOpts.key, req); err != nil { return nil, fmt.Errorf("signing the request failed: %w", err) } // send request resp, err := rpcapi.HashObjectRange(c.Raw(), req, client.WithContext(ctx)) if err != nil { return nil, fmt.Errorf("sending the request failed: %w", err) } var ( res = new(ObjectRangeHashRes) procPrm processResponseV2Prm procRes processResponseV2Res ) procPrm.callOpts = callOpts procPrm.resp = resp procRes.statusRes = res // process response in general if c.processResponseV2(&procRes, procPrm) { if procRes.cliErr != nil { return nil, procRes.cliErr } return res, nil } res.setHashes(resp.GetBody().GetHashList()) return res, nil } func (p *SearchObjectParams) WithContainerID(v *cid.ID) *SearchObjectParams { if p != nil { p.cid = v } return p } func (p *SearchObjectParams) ContainerID() *cid.ID { if p != nil { return p.cid } return nil } func (p *SearchObjectParams) WithSearchFilters(v object.SearchFilters) *SearchObjectParams { if p != nil { p.filters = v } return p } func (p *SearchObjectParams) SearchFilters() object.SearchFilters { if p != nil { return p.filters } return nil } type ObjectSearchRes struct { statusRes ids []*oid.ID } func (x *ObjectSearchRes) setIDList(v []*oid.ID) { x.ids = v } func (x ObjectSearchRes) IDList() []*oid.ID { return x.ids } // SearchObjects searches for the objects through NeoFS API call. // // Any client's internal or transport errors are returned as `error`. // If WithNeoFSErrorParsing option has been provided, unsuccessful // NeoFS status codes are returned as `error`, otherwise, are included // in the returned result structure. func (c *Client) SearchObjects(ctx context.Context, p *SearchObjectParams, opts ...CallOption) (*ObjectSearchRes, error) { callOpts := c.defaultCallOptions() for i := range opts { if opts[i] != nil { opts[i](callOpts) } } // create request req := new(v2object.SearchRequest) // initialize request body body := new(v2object.SearchRequestBody) req.SetBody(body) v2Addr := new(v2refs.Address) v2Addr.SetContainerID(p.cid.ToV2()) // set meta header meta := v2MetaHeaderFromOpts(callOpts) if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ addr: v2Addr, verb: v2session.ObjectVerbSearch, }); err != nil { return nil, fmt.Errorf("could not attach session token: %w", err) } req.SetMetaHeader(meta) // fill body fields body.SetContainerID(v2Addr.GetContainerID()) body.SetVersion(searchQueryVersion) body.SetFilters(p.filters.ToV2()) // sign the request if err := signature.SignServiceMessage(callOpts.key, req); err != nil { return nil, fmt.Errorf("signing the request failed: %w", err) } // create search stream stream, err := rpcapi.SearchObjects(c.Raw(), req, client.WithContext(ctx)) if err != nil { return nil, fmt.Errorf("stream opening failed: %w", err) } var ( searchResult []*oid.ID resp = new(v2object.SearchResponse) messageWas bool res = new(ObjectSearchRes) procPrm processResponseV2Prm procRes processResponseV2Res ) procPrm.callOpts = callOpts procPrm.resp = resp procRes.statusRes = res for { // receive message from server stream err := stream.Read(resp) if err != nil { if errors.Is(err, io.EOF) { if !messageWas { return nil, errWrongMessageSeq } break } return nil, fmt.Errorf("reading the response failed: %w", err) } messageWas = true // process response in general if c.processResponseV2(&procRes, procPrm) { if procRes.cliErr != nil { return nil, procRes.cliErr } return res, nil } chunk := resp.GetBody().GetIDList() for i := range chunk { searchResult = append(searchResult, oid.NewIDFromV2(chunk[i])) } } res.setIDList(searchResult) return res, nil } func (c *Client) attachV2SessionToken(opts *callOptions, hdr *v2session.RequestMetaHeader, info v2SessionReqInfo) error { if opts.session == nil { return nil } // Do not resign already prepared session token if opts.session.Signature() != nil { hdr.SetSessionToken(opts.session.ToV2()) return nil } opCtx := new(v2session.ObjectSessionContext) opCtx.SetAddress(info.addr) opCtx.SetVerb(info.verb) body := new(v2session.SessionTokenBody) body.SetID(opts.session.ID()) body.SetOwnerID(opts.session.OwnerID().ToV2()) body.SetSessionKey(opts.session.SessionKey()) body.SetContext(opCtx) token := new(v2session.SessionToken) token.SetBody(body) signWrapper := signature.StableMarshalerWrapper{SM: token.GetBody()} err := signer.SignDataWithHandler(opts.key, signWrapper, func(key []byte, sig []byte) { sessionTokenSignature := new(v2refs.Signature) sessionTokenSignature.SetKey(key) sessionTokenSignature.SetSign(sig) token.SetSignature(sessionTokenSignature) }) if err != nil { return err } hdr.SetSessionToken(token) return nil }