package client import ( "context" "crypto/ecdsa" "errors" "fmt" "io" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl" v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" v2refs "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" ) // PrmObjectGet groups parameters of ObjectGetInit operation. type PrmObjectGet struct { XHeaders []string BearerToken *bearer.Token Session *session.Object Raw bool Local bool ContainerID *cid.ID ObjectID *oid.ID Key *ecdsa.PrivateKey } func (prm *PrmObjectGet) buildRequest(c *Client) (*v2object.GetRequest, error) { if prm.ContainerID == nil { return nil, errorMissingContainer } if prm.ObjectID == nil { return nil, errorMissingObject } if len(prm.XHeaders)%2 != 0 { return nil, errorInvalidXHeaders } meta := new(v2session.RequestMetaHeader) writeXHeadersToMeta(prm.XHeaders, meta) if prm.BearerToken != nil { v2BearerToken := new(acl.BearerToken) prm.BearerToken.WriteToV2(v2BearerToken) meta.SetBearerToken(v2BearerToken) } if prm.Session != nil { v2SessionToken := new(v2session.Token) prm.Session.WriteToV2(v2SessionToken) meta.SetSessionToken(v2SessionToken) } if prm.Local { meta.SetTTL(1) } addr := new(v2refs.Address) cnrV2 := new(v2refs.ContainerID) prm.ContainerID.WriteToV2(cnrV2) addr.SetContainerID(cnrV2) objV2 := new(v2refs.ObjectID) prm.ObjectID.WriteToV2(objV2) addr.SetObjectID(objV2) body := new(v2object.GetRequestBody) body.SetRaw(prm.Raw) body.SetAddress(addr) req := new(v2object.GetRequest) req.SetBody(body) c.prepareRequest(req, meta) return req, nil } // ResObjectGet groups the final result values of ObjectGetInit operation. type ResObjectGet struct { statusRes } // ObjectReader is designed to read one object from FrostFS system. // // Must be initialized using Client.ObjectGetInit, any other // usage is unsafe. type ObjectReader struct { cancelCtxStream context.CancelFunc client *Client stream interface { Read(resp *v2object.GetResponse) error } res ResObjectGet err error tailPayload []byte remainingPayloadLen int } // UseKey specifies private key to sign the requests. // If key is not provided, then Client default key is used. // // Deprecated: Use PrmObjectGet.Key instead. func (prm *PrmObjectGet) UseKey(key ecdsa.PrivateKey) { prm.Key = &key } // ReadHeader reads header of the object. Result means success. // Failure reason can be received via Close. func (x *ObjectReader) ReadHeader(dst *object.Object) bool { var resp v2object.GetResponse x.err = x.stream.Read(&resp) if x.err != nil { return false } x.res.st, x.err = x.client.processResponse(&resp) if x.err != nil || !apistatus.IsSuccessful(x.res.st) { return false } var partInit *v2object.GetObjectPartInit switch v := resp.GetBody().GetObjectPart().(type) { default: x.err = fmt.Errorf("unexpected message instead of heading part: %T", v) return false case *v2object.SplitInfo: x.err = object.NewSplitInfoError(object.NewSplitInfoFromV2(v)) return false case *v2object.GetObjectPartInit: partInit = v } var objv2 v2object.Object objv2.SetObjectID(partInit.GetObjectID()) objv2.SetHeader(partInit.GetHeader()) objv2.SetSignature(partInit.GetSignature()) x.remainingPayloadLen = int(objv2.GetHeader().GetPayloadLength()) *dst = *object.NewFromV2(&objv2) // need smth better return true } func (x *ObjectReader) readChunk(buf []byte) (int, bool) { var read int // read remaining tail read = copy(buf, x.tailPayload) x.tailPayload = x.tailPayload[read:] if len(buf) == read { return read, true } var chunk []byte var lastRead int for { var resp v2object.GetResponse x.err = x.stream.Read(&resp) if x.err != nil { return read, false } x.res.st, x.err = x.client.processResponse(&resp) if x.err != nil || !apistatus.IsSuccessful(x.res.st) { return read, false } part := resp.GetBody().GetObjectPart() partChunk, ok := part.(*v2object.GetObjectPartChunk) if !ok { x.err = fmt.Errorf("unexpected message instead of chunk part: %T", part) return read, false } // read new chunk chunk = partChunk.GetChunk() if len(chunk) == 0 { // just skip empty chunks since they are not prohibited by protocol continue } lastRead = copy(buf[read:], chunk) read += lastRead if read == len(buf) { // save the tail x.tailPayload = append(x.tailPayload, chunk[lastRead:]...) return read, true } } } // ReadChunk reads another chunk of the object payload. Works similar to // io.Reader.Read but returns success flag instead of error. // // Failure reason can be received via Close. func (x *ObjectReader) ReadChunk(buf []byte) (int, bool) { return x.readChunk(buf) } func (x *ObjectReader) close(ignoreEOF bool) (*ResObjectGet, error) { defer x.cancelCtxStream() if x.err != nil { if !errors.Is(x.err, io.EOF) { return nil, x.err } else if !ignoreEOF { if x.remainingPayloadLen > 0 { return nil, io.ErrUnexpectedEOF } return nil, io.EOF } } return &x.res, nil } // Close ends reading the object and returns the result of the operation // along with the final results. Must be called after using the ObjectReader. // // Exactly one return value is non-nil. By default, server status is returned in res structure. // Any client's internal or transport errors are returned as Go built-in error. // If Client is tuned to resolve FrostFS API statuses, then FrostFS failures // codes are returned as error. // // Return errors: // // *object.SplitInfoError (returned on virtual objects with PrmObjectGet.MakeRaw). // // Return statuses: // - global (see Client docs); // - *apistatus.ContainerNotFound; // - *apistatus.ObjectNotFound; // - *apistatus.ObjectAccessDenied; // - *apistatus.ObjectAlreadyRemoved; // - *apistatus.SessionTokenExpired. func (x *ObjectReader) Close() (*ResObjectGet, error) { return x.close(true) } // Read implements io.Reader of the object payload. func (x *ObjectReader) Read(p []byte) (int, error) { n, ok := x.readChunk(p) x.remainingPayloadLen -= n if !ok { res, err := x.close(false) if err != nil { return n, err } return n, apistatus.ErrFromStatus(res.Status()) } if x.remainingPayloadLen < 0 { return n, errors.New("payload size overflow") } return n, nil } // ObjectGetInit initiates reading an object through a remote server using FrostFS API protocol. // // The call only opens the transmission channel, explicit fetching is done using the ObjectReader. // Exactly one return value is non-nil. Resulting reader must be finally closed. // // Returns an error if parameters are set incorrectly (see PrmObjectGet docs). // Context is required and must not be nil. It is used for network communication. func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectReader, error) { req, err := prm.buildRequest(c) if err != nil { return nil, err } key := prm.Key if key == nil { key = &c.prm.key } err = signature.SignServiceMessage(key, req) if err != nil { return nil, fmt.Errorf("sign request: %w", err) } ctx, cancel := context.WithCancel(ctx) stream, err := rpcapi.GetObject(&c.c, req, client.WithContext(ctx)) if err != nil { cancel() return nil, fmt.Errorf("open stream: %w", err) } var r ObjectReader r.cancelCtxStream = cancel r.stream = stream r.client = c return &r, nil } // PrmObjectHead groups parameters of ObjectHead operation. type PrmObjectHead struct { XHeaders []string BearerToken *bearer.Token Session *session.Object Raw bool Local bool ContainerID *cid.ID ObjectID *oid.ID Key *ecdsa.PrivateKey } // UseKey specifies private key to sign the requests. // If key is not provided, then Client default key is used. // // Deprecated: Use PrmObjectHead.Key instead. func (prm *PrmObjectHead) UseKey(key ecdsa.PrivateKey) { prm.Key = &key } // ResObjectHead groups resulting values of ObjectHead operation. type ResObjectHead struct { statusRes // requested object (response doesn't carry the ID) idObj oid.ID hdr *v2object.HeaderWithSignature } // ReadHeader reads header of the requested object. // Returns false if header is missing in the response (not read). func (x *ResObjectHead) ReadHeader(dst *object.Object) bool { if x.hdr == nil { return false } var objv2 v2object.Object objv2.SetHeader(x.hdr.GetHeader()) objv2.SetSignature(x.hdr.GetSignature()) obj := object.NewFromV2(&objv2) obj.SetID(x.idObj) *dst = *obj return true } func (prm *PrmObjectHead) buildRequest(c *Client) (*v2object.HeadRequest, error) { if prm.ContainerID == nil { return nil, errorMissingContainer } if prm.ObjectID == nil { return nil, errorMissingObject } if len(prm.XHeaders)%2 != 0 { return nil, errorInvalidXHeaders } meta := new(v2session.RequestMetaHeader) writeXHeadersToMeta(prm.XHeaders, meta) if prm.BearerToken != nil { v2BearerToken := new(acl.BearerToken) prm.BearerToken.WriteToV2(v2BearerToken) meta.SetBearerToken(v2BearerToken) } if prm.Session != nil { v2SessionToken := new(v2session.Token) prm.Session.WriteToV2(v2SessionToken) meta.SetSessionToken(v2SessionToken) } if prm.Local { meta.SetTTL(1) } addr := new(v2refs.Address) cnrV2 := new(v2refs.ContainerID) prm.ContainerID.WriteToV2(cnrV2) addr.SetContainerID(cnrV2) objV2 := new(v2refs.ObjectID) prm.ObjectID.WriteToV2(objV2) addr.SetObjectID(objV2) body := new(v2object.HeadRequestBody) body.SetRaw(prm.Raw) body.SetAddress(addr) req := new(v2object.HeadRequest) req.SetBody(body) c.prepareRequest(req, meta) return req, nil } // ObjectHead reads object header through a remote server using FrostFS API protocol. // // Exactly one return value is non-nil. By default, server status is returned in res structure. // Any client's internal or transport errors are returned as `error`, // If PrmInit.DontResolveFrostFSFailures has been called, unsuccessful // FrostFS status codes are included in the returned result structure, // otherwise, are also returned as `error`. // // Returns an error if parameters are set incorrectly (see PrmObjectHead docs). // Context is required and must not be nil. It is used for network communication. // // Return errors: // // *object.SplitInfoError (returned on virtual objects with PrmObjectHead.MakeRaw). // // Return statuses: // - global (see Client docs); // - *apistatus.ContainerNotFound; // - *apistatus.ObjectNotFound; // - *apistatus.ObjectAccessDenied; // - *apistatus.ObjectAlreadyRemoved; // - *apistatus.SessionTokenExpired. func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectHead, error) { req, err := prm.buildRequest(c) if err != nil { return nil, err } key := c.prm.key if prm.Key != nil { key = *prm.Key } // sign the request err = signature.SignServiceMessage(&key, req) if err != nil { return nil, fmt.Errorf("sign request: %w", err) } resp, err := rpcapi.HeadObject(&c.c, req, client.WithContext(ctx)) if err != nil { return nil, fmt.Errorf("write request: %w", err) } var res ResObjectHead res.st, err = c.processResponse(resp) if err != nil { return nil, err } if !apistatus.IsSuccessful(res.st) { return &res, nil } res.idObj = *prm.ObjectID switch v := resp.GetBody().GetHeaderPart().(type) { default: return nil, fmt.Errorf("unexpected header type %T", v) case *v2object.SplitInfo: return nil, object.NewSplitInfoError(object.NewSplitInfoFromV2(v)) case *v2object.HeaderWithSignature: res.hdr = v } return &res, nil } // PrmObjectRange groups parameters of ObjectRange operation. type PrmObjectRange struct { XHeaders []string BearerToken *bearer.Token Session *session.Object Raw bool Local bool ContainerID *cid.ID ObjectID *oid.ID Key *ecdsa.PrivateKey Offset uint64 Length uint64 } func (prm *PrmObjectRange) buildRequest(c *Client) (*v2object.GetRangeRequest, error) { if prm.Length == 0 { return nil, errorZeroRangeLength } if prm.ContainerID == nil { return nil, errorMissingContainer } if prm.ObjectID == nil { return nil, errorMissingObject } if len(prm.XHeaders)%2 != 0 { return nil, errorInvalidXHeaders } meta := new(v2session.RequestMetaHeader) writeXHeadersToMeta(prm.XHeaders, meta) if prm.BearerToken != nil { v2BearerToken := new(acl.BearerToken) prm.BearerToken.WriteToV2(v2BearerToken) meta.SetBearerToken(v2BearerToken) } if prm.Session != nil { v2SessionToken := new(v2session.Token) prm.Session.WriteToV2(v2SessionToken) meta.SetSessionToken(v2SessionToken) } if prm.Local { meta.SetTTL(1) } addr := new(v2refs.Address) cnrV2 := new(v2refs.ContainerID) prm.ContainerID.WriteToV2(cnrV2) addr.SetContainerID(cnrV2) objV2 := new(v2refs.ObjectID) prm.ObjectID.WriteToV2(objV2) addr.SetObjectID(objV2) rng := new(v2object.Range) rng.SetLength(prm.Length) rng.SetOffset(prm.Offset) body := new(v2object.GetRangeRequestBody) body.SetRaw(prm.Raw) body.SetAddress(addr) body.SetRange(rng) req := new(v2object.GetRangeRequest) req.SetBody(body) c.prepareRequest(req, meta) return req, nil } // UseKey specifies private key to sign the requests. // If key is not provided, then Client default key is used. // // Deprecated: Use PrmObjectRange.Key instead. func (prm *PrmObjectRange) UseKey(key ecdsa.PrivateKey) { prm.Key = &key } // ResObjectRange groups the final result values of ObjectRange operation. type ResObjectRange struct { statusRes } // ObjectRangeReader is designed to read payload range of one object // from FrostFS system. // // Must be initialized using Client.ObjectRangeInit, any other // usage is unsafe. type ObjectRangeReader struct { cancelCtxStream context.CancelFunc client *Client res ResObjectRange err error stream interface { Read(resp *v2object.GetRangeResponse) error } tailPayload []byte remainingPayloadLen int } func (x *ObjectRangeReader) readChunk(buf []byte) (int, bool) { var read int // read remaining tail read = copy(buf, x.tailPayload) x.tailPayload = x.tailPayload[read:] if len(buf) == read { return read, true } var partChunk *v2object.GetRangePartChunk var chunk []byte var lastRead int for { var resp v2object.GetRangeResponse x.err = x.stream.Read(&resp) if x.err != nil { return read, false } x.res.st, x.err = x.client.processResponse(&resp) if x.err != nil || !apistatus.IsSuccessful(x.res.st) { return read, false } // get chunk message switch v := resp.GetBody().GetRangePart().(type) { default: x.err = fmt.Errorf("unexpected message received: %T", v) return read, false case *v2object.SplitInfo: x.err = object.NewSplitInfoError(object.NewSplitInfoFromV2(v)) return read, false case *v2object.GetRangePartChunk: partChunk = v } chunk = partChunk.GetChunk() if len(chunk) == 0 { // just skip empty chunks since they are not prohibited by protocol continue } lastRead = copy(buf[read:], chunk) read += lastRead if read == len(buf) { // save the tail x.tailPayload = append(x.tailPayload, chunk[lastRead:]...) return read, true } } } // ReadChunk reads another chunk of the object payload range. // Works similar to io.Reader.Read but returns success flag instead of error. // // Failure reason can be received via Close. func (x *ObjectRangeReader) ReadChunk(buf []byte) (int, bool) { return x.readChunk(buf) } func (x *ObjectRangeReader) close(ignoreEOF bool) (*ResObjectRange, error) { defer x.cancelCtxStream() if x.err != nil { if !errors.Is(x.err, io.EOF) { return nil, x.err } else if !ignoreEOF { if x.remainingPayloadLen > 0 { return nil, io.ErrUnexpectedEOF } return nil, io.EOF } } return &x.res, nil } // Close ends reading the payload range and returns the result of the operation // along with the final results. Must be called after using the ObjectRangeReader. // // Exactly one return value is non-nil. By default, server status is returned in res structure. // Any client's internal or transport errors are returned as Go built-in error. // If Client is tuned to resolve FrostFS API statuses, then FrostFS failures // codes are returned as error. // // Return errors: // // *object.SplitInfoError (returned on virtual objects with PrmObjectRange.MakeRaw). // // Return statuses: // - global (see Client docs); // - *apistatus.ContainerNotFound; // - *apistatus.ObjectNotFound; // - *apistatus.ObjectAccessDenied; // - *apistatus.ObjectAlreadyRemoved; // - *apistatus.ObjectOutOfRange; // - *apistatus.SessionTokenExpired. func (x *ObjectRangeReader) Close() (*ResObjectRange, error) { return x.close(true) } // Read implements io.Reader of the object payload. func (x *ObjectRangeReader) Read(p []byte) (int, error) { n, ok := x.readChunk(p) x.remainingPayloadLen -= n if !ok { res, err := x.close(false) if err != nil { return n, err } return n, apistatus.ErrFromStatus(res.Status()) } if x.remainingPayloadLen < 0 { return n, errors.New("payload range size overflow") } return n, nil } // ObjectRangeInit initiates reading an object's payload range through a remote // server using FrostFS API protocol. // // The call only opens the transmission channel, explicit fetching is done using the ObjectRangeReader. // Exactly one return value is non-nil. Resulting reader must be finally closed. // // Returns an error if parameters are set incorrectly (see PrmObjectRange docs). // Context is required and must not be nil. It is used for network communication. func (c *Client) ObjectRangeInit(ctx context.Context, prm PrmObjectRange) (*ObjectRangeReader, error) { req, err := prm.buildRequest(c) if err != nil { return nil, err } key := prm.Key if key == nil { key = &c.prm.key } err = signature.SignServiceMessage(key, req) if err != nil { return nil, fmt.Errorf("sign request: %w", err) } ctx, cancel := context.WithCancel(ctx) stream, err := rpcapi.GetObjectRange(&c.c, req, client.WithContext(ctx)) if err != nil { cancel() return nil, fmt.Errorf("open stream: %w", err) } var r ObjectRangeReader r.remainingPayloadLen = int(prm.Length) r.cancelCtxStream = cancel r.stream = stream r.client = c return &r, nil }