package internal import ( "bytes" "context" "crypto/ecdsa" "errors" "fmt" "io" coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" ) type commonPrm struct { cli coreclient.Client key *ecdsa.PrivateKey tokenSession *session.Object tokenBearer *bearer.Token local bool xHeaders []string } // SetClient sets base client for ForstFS API communication. // // Required parameter. func (x *commonPrm) SetClient(cli coreclient.Client) { x.cli = cli } // SetPrivateKey sets private key to sign the request(s). // // Required parameter. func (x *commonPrm) SetPrivateKey(key *ecdsa.PrivateKey) { x.key = key } // SetSessionToken sets token of the session within which request should be sent. // // By default the request will be sent outside the session. func (x *commonPrm) SetSessionToken(tok *session.Object) { x.tokenSession = tok } // SetBearerToken sets bearer token to be attached to the request. // // By default token is not attached to the request. func (x *commonPrm) SetBearerToken(tok *bearer.Token) { x.tokenBearer = tok } // SetTTL sets time-to-live call option. func (x *commonPrm) SetTTL(ttl uint32) { x.local = ttl < 2 } // SetXHeaders sets request X-Headers. // // By default X-Headers will not be attached to the request. func (x *commonPrm) SetXHeaders(hs []string) { x.xHeaders = hs } type readPrmCommon struct { commonPrm } // SetNetmapEpoch sets the epoch number to be used to locate the objectSDK. // // By default current epoch on the server will be used. func (x *readPrmCommon) SetNetmapEpoch(_ uint64) { // FIXME(@fyrchik): https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/465 } // GetObjectPrm groups parameters of GetObject operation. type GetObjectPrm struct { readPrmCommon ClientParams client.PrmObjectGet obj oid.ID } // SetRawFlag sets raw flag of the request. // // By default request will not be raw. func (x *GetObjectPrm) SetRawFlag() { x.ClientParams.Raw = true } // SetAddress sets object address. // // Required parameter. func (x *GetObjectPrm) SetAddress(addr oid.Address) { x.obj = addr.Object() cnr := addr.Container() x.ClientParams.ContainerID = &cnr x.ClientParams.ObjectID = &x.obj } // GetObjectRes groups the resulting values of GetObject operation. type GetObjectRes struct { obj *objectSDK.Object } // Object returns requested objectSDK. func (x GetObjectRes) Object() *objectSDK.Object { return x.obj } // GetObject reads the object by address. // // Client, context and key must be set. // // Returns any error which prevented the operation from completing correctly in error return. // Returns: // - error of type *objectSDK.SplitInfoError if object raw flag is set and requested object is virtual; // - error of type *apistatus.ObjectAlreadyRemoved if the requested object is marked to be removed. // // GetObject ignores the provided session if it is not related to the requested objectSDK. func GetObject(ctx context.Context, prm GetObjectPrm) (*GetObjectRes, error) { // here we ignore session if it is opened for other object since such // request will almost definitely fail. The case can occur, for example, // when session is bound to the parent object and child object is requested. if prm.tokenSession != nil && prm.tokenSession.AssertObject(prm.obj) { prm.ClientParams.Session = prm.tokenSession } prm.ClientParams.XHeaders = prm.xHeaders prm.ClientParams.BearerToken = prm.tokenBearer prm.ClientParams.Local = prm.local prm.ClientParams.Key = prm.key rdr, err := prm.cli.ObjectGetInit(ctx, prm.ClientParams) if err != nil { return nil, fmt.Errorf("init object reading: %w", err) } var obj objectSDK.Object if !rdr.ReadHeader(&obj) { res, err := rdr.Close() if err == nil { // pull out an error from status err = apistatus.ErrFromStatus(res.Status()) } else { ReportError(prm.cli, err) } return nil, fmt.Errorf("read object header: %w", err) } buf := make([]byte, obj.PayloadSize()) _, err = rdr.Read(buf) if err != nil && !errors.Is(err, io.EOF) { return nil, fmt.Errorf("read payload: %w", err) } obj.SetPayload(buf) return &GetObjectRes{ obj: &obj, }, nil } // HeadObjectPrm groups parameters of HeadObject operation. type HeadObjectPrm struct { readPrmCommon ClientParams client.PrmObjectHead obj oid.ID } // SetRawFlag sets raw flag of the request. // // By default request will not be raw. func (x *HeadObjectPrm) SetRawFlag() { x.ClientParams.Raw = true } // SetAddress sets object address. // // Required parameter. func (x *HeadObjectPrm) SetAddress(addr oid.Address) { x.obj = addr.Object() cnr := addr.Container() x.ClientParams.ContainerID = &cnr x.ClientParams.ObjectID = &x.obj } // HeadObjectRes groups the resulting values of GetObject operation. type HeadObjectRes struct { hdr *objectSDK.Object } // Header returns requested object header. func (x HeadObjectRes) Header() *objectSDK.Object { return x.hdr } // HeadObject reads object header by address. // // Client and key must be set. // // Returns any error which prevented the operation from completing correctly in error return. // Returns: // // error of type *objectSDK.SplitInfoError if object raw flag is set and requested object is virtual; // error of type *apistatus.ObjectAlreadyRemoved if the requested object is marked to be removed. // // HeadObject ignores the provided session if it is not related to the requested objectSDK. func HeadObject(ctx context.Context, prm HeadObjectPrm) (*HeadObjectRes, error) { // see details in same statement of GetObject if prm.tokenSession != nil && prm.tokenSession.AssertObject(prm.obj) { prm.ClientParams.Session = prm.tokenSession } prm.ClientParams.BearerToken = prm.tokenBearer prm.ClientParams.Local = prm.local prm.ClientParams.XHeaders = prm.xHeaders cliRes, err := prm.cli.ObjectHead(ctx, prm.ClientParams) if err == nil { // pull out an error from status err = apistatus.ErrFromStatus(cliRes.Status()) } if err != nil { return nil, fmt.Errorf("read object header from FrostFS: %w", err) } var hdr objectSDK.Object if !cliRes.ReadHeader(&hdr) { return nil, errors.New("missing object header in the response") } return &HeadObjectRes{ hdr: &hdr, }, nil } // PayloadRangePrm groups parameters of PayloadRange operation. type PayloadRangePrm struct { readPrmCommon ln uint64 ClientParams client.PrmObjectRange obj oid.ID } // SetRawFlag sets raw flag of the request. // // By default request will not be raw. func (x *PayloadRangePrm) SetRawFlag() { x.ClientParams.Raw = true } // SetAddress sets object address. // // Required parameter. func (x *PayloadRangePrm) SetAddress(addr oid.Address) { x.obj = addr.Object() cnr := addr.Container() x.ClientParams.ContainerID = &cnr x.ClientParams.ObjectID = &x.obj } // SetRange range of the object payload to be read. // // Required parameter. func (x *PayloadRangePrm) SetRange(rng *objectSDK.Range) { x.ClientParams.Offset = rng.GetOffset() x.ln = rng.GetLength() } // PayloadRangeRes groups the resulting values of GetObject operation. type PayloadRangeRes struct { data []byte } // PayloadRange returns data of the requested payload range. func (x PayloadRangeRes) PayloadRange() []byte { return x.data } // maxInitialBufferSize is the maximum initial buffer size for PayloadRange result. // We don't want to allocate a lot of space in advance because a query can // fail with apistatus.ObjectOutOfRange status. const maxInitialBufferSize = 1024 * 1024 // 1 MiB // PayloadRange reads object payload range by address. // // Client and key must be set. // // Returns any error which prevented the operation from completing correctly in error return. // Returns: // // error of type *objectSDK.SplitInfoError if object raw flag is set and requested object is virtual; // error of type *apistatus.ObjectAlreadyRemoved if the requested object is marked to be removed; // error of type *apistatus.ObjectOutOfRange if the requested range is too big. // // PayloadRange ignores the provided session if it is not related to the requested objectSDK. func PayloadRange(ctx context.Context, prm PayloadRangePrm) (*PayloadRangeRes, error) { // see details in same statement of GetObject if prm.tokenSession != nil && prm.tokenSession.AssertObject(prm.obj) { prm.ClientParams.Session = prm.tokenSession } prm.ClientParams.XHeaders = prm.xHeaders prm.ClientParams.BearerToken = prm.tokenBearer prm.ClientParams.Local = prm.local prm.ClientParams.Length = prm.ln rdr, err := prm.cli.ObjectRangeInit(ctx, prm.ClientParams) if err != nil { return nil, fmt.Errorf("init payload reading: %w", err) } if int64(prm.ln) < 0 { // `CopyN` expects `int64`, this check ensures that the result is positive. // On practice this means that we can return incorrect results for objects // with size > 8_388 Petabytes, this will be fixed later with support for streaming. return nil, new(apistatus.ObjectOutOfRange) } ln := prm.ln if ln > maxInitialBufferSize { ln = maxInitialBufferSize } w := bytes.NewBuffer(make([]byte, ln)) _, err = io.CopyN(w, rdr, int64(prm.ln)) if err != nil { return nil, fmt.Errorf("read payload: %w", err) } return &PayloadRangeRes{ data: w.Bytes(), }, nil } // PutObjectPrm groups parameters of PutObject operation. type PutObjectPrm struct { commonPrm obj *objectSDK.Object } // SetObject sets object to be stored. // // Required parameter. func (x *PutObjectPrm) SetObject(obj *objectSDK.Object) { x.obj = obj } // PutObjectRes groups the resulting values of PutObject operation. type PutObjectRes struct { id oid.ID } // ID returns identifier of the stored objectSDK. func (x PutObjectRes) ID() oid.ID { return x.id } // PutObject saves the object in local storage of the remote node. // // Client and key must be set. // // Returns any error which prevented the operation from completing correctly in error return. func PutObject(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) { ctx, span := tracing.StartSpanFromContext(ctx, "client.PutObject") defer span.End() prmCli := client.PrmObjectPutInit{ XHeaders: prm.xHeaders, BearerToken: prm.tokenBearer, Session: prm.tokenSession, Local: true, Key: prm.key, } w, err := prm.cli.ObjectPutInit(ctx, prmCli) if err != nil { return nil, fmt.Errorf("init object writing on client: %w", err) } if w.WriteHeader(ctx, *prm.obj) { w.WritePayloadChunk(ctx, prm.obj.Payload()) } cliRes, err := w.Close(ctx) if err == nil { err = apistatus.ErrFromStatus(cliRes.Status()) } else { ReportError(prm.cli, err) } if err != nil { return nil, fmt.Errorf("write object via client: %w", err) } return &PutObjectRes{ id: cliRes.StoredObjectID(), }, nil } // PutObjectSingle saves the object in local storage of the remote node with PutSingle RPC. // // Client and key must be set. // // Returns any error which prevented the operation from completing correctly in error return. func PutObjectSingle(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) { ctx, span := tracing.StartSpanFromContext(ctx, "client.PutObjectSingle") defer span.End() objID, isSet := prm.obj.ID() if !isSet { return nil, errors.New("missing object id") } prmCli := client.PrmObjectPutSingle{ XHeaders: prm.xHeaders, BearerToken: prm.tokenBearer, Session: prm.tokenSession, Local: true, Key: prm.key, Object: prm.obj, } res, err := prm.cli.ObjectPutSingle(ctx, prmCli) if err != nil { ReportError(prm.cli, err) return nil, fmt.Errorf("put single object on client: %w", err) } if err = apistatus.ErrFromStatus(res.Status()); err != nil { return nil, fmt.Errorf("put single object via client: %w", err) } return &PutObjectRes{ id: objID, }, nil } // SearchObjectsPrm groups parameters of SearchObjects operation. type SearchObjectsPrm struct { readPrmCommon cliPrm client.PrmObjectSearch } // SetContainerID sets identifier of the container to search the objects. // // Required parameter. func (x *SearchObjectsPrm) SetContainerID(id cid.ID) { x.cliPrm.ContainerID = &id } // SetFilters sets search filters. func (x *SearchObjectsPrm) SetFilters(fs objectSDK.SearchFilters) { x.cliPrm.Filters = fs } // SearchObjectsRes groups the resulting values of SearchObjects operation. type SearchObjectsRes struct { ids []oid.ID } // IDList returns identifiers of the matched objects. func (x SearchObjectsRes) IDList() []oid.ID { return x.ids } // SearchObjects selects objects from container which match the filters. // // Returns any error which prevented the operation from completing correctly in error return. func SearchObjects(ctx context.Context, prm SearchObjectsPrm) (*SearchObjectsRes, error) { prm.cliPrm.Local = prm.local prm.cliPrm.Session = prm.tokenSession prm.cliPrm.BearerToken = prm.tokenBearer prm.cliPrm.XHeaders = prm.xHeaders prm.cliPrm.Key = prm.key rdr, err := prm.cli.ObjectSearchInit(ctx, prm.cliPrm) if err != nil { return nil, fmt.Errorf("init object searching in client: %w", err) } buf := make([]oid.ID, 10) var ids []oid.ID var n int var ok bool for { n, ok = rdr.Read(buf) if n > 0 { for i := range buf[:n] { v := buf[i] ids = append(ids, v) } } if !ok { break } } res, err := rdr.Close() if err == nil { // pull out an error from status err = apistatus.ErrFromStatus(res.Status()) } if err != nil { return nil, fmt.Errorf("read object list: %w", err) } return &SearchObjectsRes{ ids: ids, }, nil }