diff --git a/client/object_patch.go b/client/object_patch.go new file mode 100644 index 0000000..1e731c5 --- /dev/null +++ b/client/object_patch.go @@ -0,0 +1,238 @@ +package client + +import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + "io" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl" + v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" +) + +// ObjectPatcher is designed to patch an object. +// +// Must be initialized using Client.ObjectPatchInit, any other +// usage is unsafe. +type ObjectPatcher interface { + // Patch patches the object. + // + // If object.Patch parameter sets `NewAttributes`, `ReplaceAttributes` or both fields, then the + // request is taken for patching the object's attributes. Attributes can be patched no more than once, + // otherwise, the server returns an error. + // + // If object.Patch parameter sets `PayloadPatch` with a large `Chunk`, then this patch is splitted + // into a few patches. After split each `PayloadPatch` has a `Chunk` with the size <= `MaxChunkLength`. + // + // Result means success. Failure reason can be received via Close. + Patch(_ context.Context, patch *object.Patch) bool + // Close ends patching the object and returns the result of the operation + // along with the final results. Must be called after using the ObjectPatcher. + // + // Exactly one return value is non-nil. By default, server status is returned in res structure. + // Any client's internal or transport errors are returned as Go built-in error. + // If Client is tuned to resolve FrostFS API statuses, then FrostFS failures + // codes are returned as error. + // + // Return statuses: + // - global (see Client docs); + // - *apistatus.ContainerNotFound; + // - *apistatus.ContainerAccessDenied; + // - *apistatus.ObjectAccessDenied; + // - *apistatus.ObjectAlreadyRemoved; + // - *apistatus.ObjectLocked; + // - *apistatus.ObjectOutOfRange; + // - *apistatus.SessionTokenNotFound; + // - *apistatus.SessionTokenExpired. + Close(_ context.Context) (*ResObjectPatch, error) +} + +// ResObjectPatch groups resulting values of ObjectPatch operation. +type ResObjectPatch struct { + statusRes + + obj oid.ID +} + +// ObjectID returns an object ID of the patched object. +func (r ResObjectPatch) ObjectID() oid.ID { + return r.obj +} + +// PrmObjectPatch groups parameters of ObjectPatch operation. +type PrmObjectPatch struct { + XHeaders []string + + BearerToken *bearer.Token + + Session *session.Object + + Key *ecdsa.PrivateKey + + MaxChunkLength int +} + +// ObjectPatchInit initializes object patcher. +func (c *Client) ObjectPatchInit(ctx context.Context, prm PrmObjectPatch) (ObjectPatcher, error) { + if len(prm.XHeaders)%2 != 0 { + return nil, errorInvalidXHeaders + } + + var objectPatcher objectPatcher + stream, err := rpcapi.Patch(&c.c, &objectPatcher.respV2, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("open stream: %w", err) + } + + objectPatcher.key = &c.prm.Key + if prm.Key != nil { + objectPatcher.key = prm.Key + } + objectPatcher.client = c + objectPatcher.stream = stream + + if prm.MaxChunkLength > 0 { + objectPatcher.maxChunkLen = prm.MaxChunkLength + } else { + objectPatcher.maxChunkLen = defaultGRPCPayloadChunkLen + } + + objectPatcher.req.SetBody(&v2object.PatchRequestBody{}) + + meta := new(v2session.RequestMetaHeader) + writeXHeadersToMeta(prm.XHeaders, meta) + + if prm.BearerToken != nil { + v2BearerToken := new(acl.BearerToken) + prm.BearerToken.WriteToV2(v2BearerToken) + meta.SetBearerToken(v2BearerToken) + } + + if prm.Session != nil { + v2SessionToken := new(v2session.Token) + prm.Session.WriteToV2(v2SessionToken) + meta.SetSessionToken(v2SessionToken) + } + + c.prepareRequest(&objectPatcher.req, meta) + + return &objectPatcher, nil +} + +type objectPatcher struct { + client *Client + + stream interface { + Write(*v2object.PatchRequest) error + Close() error + } + + key *ecdsa.PrivateKey + res ResObjectPatch + err error + + req v2object.PatchRequest + respV2 v2object.PatchResponse + + maxChunkLen int +} + +func (x *objectPatcher) Patch(_ context.Context, patch *object.Patch) bool { + if patch == nil { + x.err = errors.New("nil patch") + return false + } + + patches := []*object.Patch{patch} + if patch.PayloadPatch != nil && len(patch.PayloadPatch.Chunk) > x.maxChunkLen { + patches = x.splitPatch(patch) + } + + for _, p := range patches { + if !x.send(p) { + return false + } + } + + return true +} + +// splitPatch splits Patch into a few ones if the original Patch has a large PayloadPatch that is required to be splitted. +func (x *objectPatcher) splitPatch(patch *object.Patch) []*object.Patch { + splitted := patch.PayloadPatch.SplitByMaxChunkLength(x.maxChunkLen) + result := make([]*object.Patch, len(splitted)) + + for i, pp := range splitted { + result[i] = &object.Patch{ + Address: patch.Address, + PayloadPatch: pp, + } + } + + if len(result) > 0 { + result[0].NewAttributes = patch.NewAttributes + result[0].ReplaceAttributes = patch.ReplaceAttributes + } + + return result +} + +func (x *objectPatcher) send(patch *object.Patch) bool { + x.req.SetBody(patch.ToV2()) + x.req.SetVerificationHeader(nil) + + x.err = signature.SignServiceMessage(x.key, &x.req) + if x.err != nil { + x.err = fmt.Errorf("sign message: %w", x.err) + return false + } + + x.err = x.stream.Write(&x.req) + return x.err == nil +} + +func (x *objectPatcher) Close(_ context.Context) (*ResObjectPatch, error) { + // Ignore io.EOF error, because it is expected error for client-side + // stream termination by the server. E.g. when stream contains invalid + // message. Server returns an error in response message (in status). + if x.err != nil && !errors.Is(x.err, io.EOF) { + return nil, x.err + } + + if x.err = x.stream.Close(); x.err != nil { + return nil, x.err + } + + x.res.st, x.err = x.client.processResponse(&x.respV2) + if x.err != nil { + return nil, x.err + } + + if !apistatus.IsSuccessful(x.res.st) { + return &x.res, nil + } + + const fieldID = "ID" + + idV2 := x.respV2.Body.ObjectID + if idV2 == nil { + return nil, newErrMissingResponseField(fieldID) + } + + x.err = x.res.obj.ReadFromV2(*idV2) + if x.err != nil { + x.err = newErrInvalidResponseField(fieldID, x.err) + } + + return &x.res, nil +}