From b8e93d4c08962aa54dfd9f345f75853053ea555b Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 9 Mar 2023 11:02:27 +0300 Subject: [PATCH] [#85] get-service: Use assembler to assemble LOB Signed-off-by: Dmitrii Stepanov --- pkg/services/object/get/assemble.go | 300 ++++++------------ pkg/services/object/get/assembler.go | 15 +- pkg/services/object/get/exec.go | 88 +---- pkg/services/object/get/get_test.go | 20 +- pkg/services/object/get/prm.go | 5 +- pkg/services/object/get/util.go | 15 +- pkg/services/object/get/v2/streamer.go | 8 +- pkg/services/object/get/v2/util.go | 8 +- .../object_manager/tombstone/source/source.go | 2 +- 9 files changed, 126 insertions(+), 335 deletions(-) diff --git a/pkg/services/object/get/assemble.go b/pkg/services/object/get/assemble.go index b6e1c4fb7..ed77b5693 100644 --- a/pkg/services/object/get/assemble.go +++ b/pkg/services/object/get/assemble.go @@ -1,6 +1,7 @@ package getsvc import ( + "context" "errors" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" @@ -35,223 +36,102 @@ func (exec *execCtx) assemble() { exec.log.Debug("trying to assemble the object...") - splitInfo := exec.splitInfo() + assembler := newAssembler(exec.address(), exec.splitInfo(), exec.ctxRange(), exec) - childID, ok := splitInfo.Link() - if !ok { - childID, ok = splitInfo.LastPart() - if !ok { - exec.log.Debug("neither linking nor last part of split-chain is presented in split info") - return - } - } - - prev, children := exec.initFromChild(childID) - - if len(children) > 0 { - if exec.ctxRange() == nil { - if ok := exec.writeCollectedHeader(); ok { - exec.overtakePayloadDirectly(children, nil, true) - } - } else { - // TODO: #1155 choose one-by-one restoring algorithm according to size - // * if size > MAX => go right-to-left with HEAD and back with GET - // * else go right-to-left with GET and compose in single object before writing - - if ok := exec.overtakePayloadInReverse(children[len(children)-1]); ok { - // payload of all children except the last are written, write last payload - exec.writeObjectPayload(exec.collectedObject) - } - } - } else if prev != nil { - if ok := exec.writeCollectedHeader(); ok { - // TODO: #1155 choose one-by-one restoring algorithm according to size - // * if size > MAX => go right-to-left with HEAD and back with GET - // * else go right-to-left with GET and compose in single object before writing - - if ok := exec.overtakePayloadInReverse(*prev); ok { - // payload of all children except the last are written, write last payloa - exec.writeObjectPayload(exec.collectedObject) - } - } - } else { - exec.log.Debug("could not init parent from child") - } -} - -func (exec *execCtx) initFromChild(obj oid.ID) (prev *oid.ID, children []oid.ID) { - log := exec.log.With(zap.Stringer("child ID", obj)) - - log.Debug("starting assembling from child") - - child, ok := exec.getChild(obj, nil, true) - if !ok { - return - } - - par := child.Parent() - if par == nil { - exec.status = statusUndefined - exec.err = errors.New("received child with empty parent") - - log.Debug("received child with empty parent") - - return - } - - exec.collectedObject = par - - var payload []byte - - if rng := exec.ctxRange(); rng != nil { - seekOff := rng.GetOffset() - seekLen := rng.GetLength() - seekTo := seekOff + seekLen - parSize := par.PayloadSize() - - if seekTo < seekOff || parSize < seekOff || parSize < seekTo { - var errOutOfRange apistatus.ObjectOutOfRange - - exec.err = &errOutOfRange - exec.status = statusOutOfRange - - return - } - - childSize := child.PayloadSize() - - exec.curOff = parSize - childSize - - from := uint64(0) - if exec.curOff < seekOff { - from = seekOff - exec.curOff - } - - to := uint64(0) - if seekOff+seekLen > exec.curOff+from { - to = seekOff + seekLen - exec.curOff - } - - payload = child.Payload()[from:to] - rng.SetLength(rng.GetLength() - to + from) - } else { - payload = child.Payload() - } - - exec.collectedObject.SetPayload(payload) - - idPrev, ok := child.PreviousID() - if ok { - return &idPrev, child.Children() - } - - return nil, child.Children() -} - -func (exec *execCtx) overtakePayloadDirectly(children []oid.ID, rngs []objectSDK.Range, checkRight bool) { - withRng := len(rngs) > 0 && exec.ctxRange() != nil - - for i := range children { - var r *objectSDK.Range - if withRng { - r = &rngs[i] - } - - child, ok := exec.getChild(children[i], r, !withRng && checkRight) - if !ok { - return - } - - if ok := exec.writeObjectPayload(child); !ok { - return - } - } - - exec.status = statusOK - exec.err = nil -} - -func (exec *execCtx) overtakePayloadInReverse(prev oid.ID) bool { - chain, rngs, ok := exec.buildChainInReverse(prev) - if !ok { - return false - } - - reverseRngs := len(rngs) > 0 - - // reverse chain - for left, right := 0, len(chain)-1; left < right; left, right = left+1, right-1 { - chain[left], chain[right] = chain[right], chain[left] - - if reverseRngs { - rngs[left], rngs[right] = rngs[right], rngs[left] - } - } - - exec.overtakePayloadDirectly(chain, rngs, false) - - return exec.status == statusOK -} - -func (exec *execCtx) buildChainInReverse(prev oid.ID) ([]oid.ID, []objectSDK.Range, bool) { - var ( - chain = make([]oid.ID, 0) - rngs = make([]objectSDK.Range, 0) - seekRng = exec.ctxRange() - from = seekRng.GetOffset() - to = from + seekRng.GetLength() - - withPrev = true + exec.log.Debug("assembling splitted object...", + zap.Stringer("address", exec.address()), + zap.Uint64("range_offset", exec.ctxRange().GetOffset()), + zap.Uint64("range_length", exec.ctxRange().GetLength()), + ) + defer exec.log.Debug("assembling splitted object completed", + zap.Stringer("address", exec.address()), + zap.Uint64("range_offset", exec.ctxRange().GetOffset()), + zap.Uint64("range_length", exec.ctxRange().GetLength()), ) - // fill the chain end-to-start - for withPrev { - // check that only for "range" requests, - // for `GET` it stops via the false `withPrev` - if seekRng != nil && exec.curOff <= from { - break - } - - head, ok := exec.headChild(prev) - if !ok { - return nil, nil, false - } - - if seekRng != nil { - sz := head.PayloadSize() - - exec.curOff -= sz - - if exec.curOff < to { - off := uint64(0) - if from > exec.curOff { - off = from - exec.curOff - sz -= from - exec.curOff - } - - if to < exec.curOff+off+sz { - sz = to - off - exec.curOff - } - - index := len(rngs) - rngs = append(rngs, objectSDK.Range{}) - rngs[index].SetOffset(off) - rngs[index].SetLength(sz) - - id, _ := head.ID() - chain = append(chain, id) - } - } else { - id, _ := head.ID() - chain = append(chain, id) - } - - prev, withPrev = head.PreviousID() + obj, err := assembler.Assemble(exec.context(), exec.prm.objWriter) + if err != nil { + exec.log.Warn("failed to assemble splitted object", + zap.Error(err), + zap.Stringer("address", exec.address()), + zap.Uint64("range_offset", exec.ctxRange().GetOffset()), + zap.Uint64("range_length", exec.ctxRange().GetLength()), + ) } - return chain, rngs, true + var errSplitInfo *objectSDK.SplitInfoError + var errRemovedRemote *apistatus.ObjectAlreadyRemoved + var errOutOfRangeRemote *apistatus.ObjectOutOfRange + var errRemovedLocal apistatus.ObjectAlreadyRemoved + var errOutOfRangeLocal apistatus.ObjectOutOfRange + + switch { + default: + exec.status = statusUndefined + exec.err = err + case err == nil: + exec.status = statusOK + exec.err = nil + exec.collectedObject = obj + case errors.As(err, &errRemovedRemote): + exec.status = statusINHUMED + exec.err = errRemovedRemote + case errors.As(err, &errRemovedLocal): + exec.status = statusINHUMED + exec.err = errRemovedLocal + case errors.As(err, &errSplitInfo): + exec.status = statusVIRTUAL + exec.err = errSplitInfo + case errors.As(err, &errOutOfRangeRemote): + exec.status = statusOutOfRange + exec.err = errOutOfRangeRemote + case errors.As(err, &errOutOfRangeLocal): + exec.status = statusOutOfRange + exec.err = errOutOfRangeLocal + } } func equalAddresses(a, b oid.Address) bool { return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object()) } + +func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) { + p := exec.prm + p.common = p.common.WithLocalOnly(false) + p.addr.SetContainer(exec.containerID()) + p.addr.SetObject(id) + + prm := HeadPrm{ + commonPrm: p.commonPrm, + } + + w := NewSimpleObjectWriter() + prm.SetHeaderWriter(w) + + err := exec.svc.Head(exec.context(), prm) + + if err != nil { + return nil, err + } + + return w.Object(), nil +} + +func (exec *execCtx) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Range) (*objectSDK.Object, error) { + w := NewSimpleObjectWriter() + + p := exec.prm + p.common = p.common.WithLocalOnly(false) + p.objWriter = w + p.SetRange(rng) + + p.addr.SetContainer(exec.containerID()) + p.addr.SetObject(id) + + statusError := exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng)) + + if statusError.err != nil { + return nil, statusError.err + } + return w.Object(), nil +} diff --git a/pkg/services/object/get/assembler.go b/pkg/services/object/get/assembler.go index 97f88cc9c..4ae1981b1 100644 --- a/pkg/services/object/get/assembler.go +++ b/pkg/services/object/get/assembler.go @@ -15,11 +15,6 @@ type objectGetter interface { HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) } -type objectWriter interface { - WriteChunk(context.Context, []byte) error - WriteHeader(context.Context, *objectSDK.Object) error -} - var ( errParentAddressDiffers = errors.New("parent address in child object differs") ) @@ -50,7 +45,7 @@ func newAssembler( // Assemble assembles splitted large object and writes it's content to ObjectWriter. // It returns parent object. -func (a *assembler) Assemble(ctx context.Context, writer objectWriter) (*objectSDK.Object, error) { +func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { sourceObjectID, ok := a.getLastPartOrLinkObjectID() if !ok { return nil, objectSDK.NewSplitInfoError(a.splitInfo) @@ -153,7 +148,7 @@ func (a *assembler) getChildObject(ctx context.Context, id oid.ID, rng *objectSD return obj, nil } -func (a *assembler) assembleObjectByChildrenList(ctx context.Context, childrenIDs []oid.ID, writer objectWriter) error { +func (a *assembler) assembleObjectByChildrenList(ctx context.Context, childrenIDs []oid.ID, writer ObjectWriter) error { if a.rng == nil { if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil { return err @@ -170,7 +165,7 @@ func (a *assembler) assembleObjectByChildrenList(ctx context.Context, childrenID return nil } -func (a *assembler) assemleObjectByPreviousIDInReverse(ctx context.Context, prevID oid.ID, writer objectWriter) error { +func (a *assembler) assemleObjectByPreviousIDInReverse(ctx context.Context, prevID oid.ID, writer ObjectWriter) error { if a.rng == nil { if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil { return err @@ -186,7 +181,7 @@ func (a *assembler) assemleObjectByPreviousIDInReverse(ctx context.Context, prev return nil } -func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer objectWriter, partIDs []oid.ID, partRanges []objectSDK.Range, verifyIsChild bool) error { +func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer ObjectWriter, partIDs []oid.ID, partRanges []objectSDK.Range, verifyIsChild bool) error { withRng := len(partRanges) > 0 && a.rng != nil for i := range partIDs { @@ -207,7 +202,7 @@ func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer objec return nil } -func (a *assembler) assemblePayloadInReverse(ctx context.Context, writer objectWriter, prevID oid.ID) error { +func (a *assembler) assemblePayloadInReverse(ctx context.Context, writer ObjectWriter, prevID oid.ID) error { chain, rngs, err := a.buildChain(ctx, prevID) if err != nil { return err diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 505ff043c..ac26eac34 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -3,10 +3,8 @@ package getsvc import ( "context" "crypto/ecdsa" - "errors" clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" @@ -36,8 +34,6 @@ type execCtx struct { collectedObject *objectSDK.Object - curOff uint64 - head bool curProcEpoch uint64 @@ -99,15 +95,6 @@ func (exec execCtx) address() oid.Address { return exec.prm.addr } -// isChild checks if reading object is a parent of the given object. -// Object without reference to the parent (only children with the parent header -// have it) is automatically considered as child: this should be guaranteed by -// upper level logic. -func (exec execCtx) isChild(obj *objectSDK.Object) bool { - par := obj.Parent() - return par == nil || equalAddresses(exec.address(), object.AddressOf(par)) -} - func (exec execCtx) key() (*ecdsa.PrivateKey, error) { if exec.prm.signerKey != nil { // the key has already been requested and @@ -199,78 +186,6 @@ func (exec *execCtx) generateTraverser(addr oid.Address) (*placement.Traverser, } } -func (exec *execCtx) getChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (*objectSDK.Object, bool) { - w := NewSimpleObjectWriter() - - p := exec.prm - p.common = p.common.WithLocalOnly(false) - p.objWriter = w - p.SetRange(rng) - - p.addr.SetContainer(exec.containerID()) - p.addr.SetObject(id) - - exec.statusError = exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng)) - - child := w.Object() - ok := exec.status == statusOK - - if ok && withHdr && !exec.isChild(child) { - exec.status = statusUndefined - exec.err = errors.New("wrong child header") - - exec.log.Debug("parent address in child object differs") - - return nil, false - } - - return child, ok -} - -func (exec *execCtx) headChild(id oid.ID) (*objectSDK.Object, bool) { - p := exec.prm - p.common = p.common.WithLocalOnly(false) - p.addr.SetContainer(exec.containerID()) - p.addr.SetObject(id) - - prm := HeadPrm{ - commonPrm: p.commonPrm, - } - - w := NewSimpleObjectWriter() - prm.SetHeaderWriter(w) - - err := exec.svc.Head(exec.context(), prm) - - switch { - default: - exec.status = statusUndefined - exec.err = err - - exec.log.Debug("could not get child object header", - zap.Stringer("child ID", id), - zap.String("error", err.Error()), - ) - - return nil, false - case err == nil: - child := w.Object() - - if !exec.isChild(child) { - exec.status = statusUndefined - exec.err = errors.New("parent address in child object differs") - - exec.log.Debug("parent address in child object differs") - return nil, false - } else { - exec.status = statusOK - exec.err = nil - } - - return child, true - } -} - func (exec execCtx) remoteClient(info clientcore.NodeInfo) (getClient, bool) { c, err := exec.svc.clientCache.get(info) @@ -307,6 +222,7 @@ func (exec *execCtx) writeCollectedHeader() bool { } err := exec.prm.objWriter.WriteHeader( + exec.context(), exec.collectedObject.CutPayload(), ) @@ -331,7 +247,7 @@ func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool { return true } - err := exec.prm.objWriter.WriteChunk(obj.Payload()) + err := exec.prm.objWriter.WriteChunk(exec.context(), obj.Payload()) switch { default: diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index 8329a82a7..530e7054d 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -216,11 +216,11 @@ func (whe *writeHeaderError) Error() string { type writeHeaderErrorObjectWriter struct { } -func (w *writeHeaderErrorObjectWriter) WriteHeader(_ *objectSDK.Object) error { +func (w *writeHeaderErrorObjectWriter) WriteHeader(_ context.Context, _ *objectSDK.Object) error { return &writeHeaderError{} } -func (w *writeHeaderErrorObjectWriter) WriteChunk(p []byte) error { +func (w *writeHeaderErrorObjectWriter) WriteChunk(_ context.Context, _ []byte) error { return nil } @@ -233,11 +233,11 @@ func (whe *writePayloadError) Error() string { type writePayloadErrorObjectWriter struct { } -func (w *writePayloadErrorObjectWriter) WriteHeader(_ *objectSDK.Object) error { +func (w *writePayloadErrorObjectWriter) WriteHeader(_ context.Context, _ *objectSDK.Object) error { return nil } -func (w *writePayloadErrorObjectWriter) WriteChunk(p []byte) error { +func (w *writePayloadErrorObjectWriter) WriteChunk(_ context.Context, _ []byte) error { return &writePayloadError{} } @@ -1094,8 +1094,7 @@ func TestGetRemoteSmall(t *testing.T) { p.WithAddress(addr) err := svc.Get(ctx, p) - require.Error(t, err) - require.Equal(t, err.Error(), "wrong child header") + require.ErrorIs(t, err, errParentAddressDiffers) w = NewSimpleObjectWriter() payloadSz := srcObj.PayloadSize() @@ -1107,8 +1106,7 @@ func TestGetRemoteSmall(t *testing.T) { rngPrm.WithAddress(addr) err = svc.GetRange(ctx, rngPrm) - require.Error(t, err) - require.Equal(t, err.Error(), "wrong child header") + require.ErrorIs(t, err, errParentAddressDiffers) }) t.Run("linked object with parent udefined", func(t *testing.T) { @@ -1464,8 +1462,7 @@ func TestGetRemoteSmall(t *testing.T) { p.WithAddress(addr) err := svc.Get(ctx, p) - require.Error(t, err) - require.Equal(t, err.Error(), "parent address in child object differs") + require.ErrorIs(t, err, errParentAddressDiffers) w = NewSimpleObjectWriter() payloadSz := srcObj.PayloadSize() @@ -1477,8 +1474,7 @@ func TestGetRemoteSmall(t *testing.T) { rngPrm.WithAddress(addr) err = svc.GetRange(ctx, rngPrm) - require.Error(t, err) - require.Equal(t, err.Error(), "parent address in child object differs") + require.ErrorIs(t, err, errParentAddressDiffers) }) t.Run("OK", func(t *testing.T) { diff --git a/pkg/services/object/get/prm.go b/pkg/services/object/get/prm.go index f548aecfa..88848264e 100644 --- a/pkg/services/object/get/prm.go +++ b/pkg/services/object/get/prm.go @@ -1,6 +1,7 @@ package getsvc import ( + "context" "crypto/ecdsa" "errors" "hash" @@ -85,13 +86,13 @@ type commonPrm struct { // ChunkWriter is an interface of target component // to write payload chunk. type ChunkWriter interface { - WriteChunk([]byte) error + WriteChunk(context.Context, []byte) error } // HeaderWriter is an interface of target component // to write object header. type HeaderWriter interface { - WriteHeader(*object.Object) error + WriteHeader(context.Context, *object.Object) error } // ObjectWriter is an interface of target component to write object. diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index bfd79bde9..dc98e6c58 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -1,6 +1,7 @@ package getsvc import ( + "context" "crypto/ecdsa" "errors" "io" @@ -54,7 +55,7 @@ func NewSimpleObjectWriter() *SimpleObjectWriter { } } -func (s *SimpleObjectWriter) WriteHeader(obj *object.Object) error { +func (s *SimpleObjectWriter) WriteHeader(_ context.Context, obj *object.Object) error { s.obj = obj s.pld = make([]byte, 0, obj.PayloadSize()) @@ -62,7 +63,7 @@ func (s *SimpleObjectWriter) WriteHeader(obj *object.Object) error { return nil } -func (s *SimpleObjectWriter) WriteChunk(p []byte) error { +func (s *SimpleObjectWriter) WriteChunk(_ context.Context, p []byte) error { s.pld = append(s.pld, p...) return nil } @@ -231,12 +232,12 @@ func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) { } } -func (w *partWriter) WriteChunk(p []byte) error { - return w.chunkWriter.WriteChunk(p) +func (w *partWriter) WriteChunk(ctx context.Context, p []byte) error { + return w.chunkWriter.WriteChunk(ctx, p) } -func (w *partWriter) WriteHeader(o *object.Object) error { - return w.headWriter.WriteHeader(o) +func (w *partWriter) WriteHeader(ctx context.Context, o *object.Object) error { + return w.headWriter.WriteHeader(ctx, o) } func payloadOnlyObject(payload []byte) *object.Object { @@ -246,7 +247,7 @@ func payloadOnlyObject(payload []byte) *object.Object { return obj } -func (h *hasherWrapper) WriteChunk(p []byte) error { +func (h *hasherWrapper) WriteChunk(_ context.Context, p []byte) error { _, err := h.hash.Write(p) return err } diff --git a/pkg/services/object/get/v2/streamer.go b/pkg/services/object/get/v2/streamer.go index bccc409ce..4347ef416 100644 --- a/pkg/services/object/get/v2/streamer.go +++ b/pkg/services/object/get/v2/streamer.go @@ -1,6 +1,8 @@ package getsvc import ( + "context" + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -14,7 +16,7 @@ type streamObjectRangeWriter struct { objectSvc.GetObjectRangeStream } -func (s *streamObjectWriter) WriteHeader(obj *object.Object) error { +func (s *streamObjectWriter) WriteHeader(_ context.Context, obj *object.Object) error { p := new(objectV2.GetObjectPartInit) objV2 := obj.ToV2() @@ -25,7 +27,7 @@ func (s *streamObjectWriter) WriteHeader(obj *object.Object) error { return s.GetObjectStream.Send(newResponse(p)) } -func (s *streamObjectWriter) WriteChunk(chunk []byte) error { +func (s *streamObjectWriter) WriteChunk(_ context.Context, chunk []byte) error { p := new(objectV2.GetObjectPartChunk) p.SetChunk(chunk) @@ -43,7 +45,7 @@ func newResponse(p objectV2.GetObjectPart) *objectV2.GetResponse { return r } -func (s *streamObjectRangeWriter) WriteChunk(chunk []byte) error { +func (s *streamObjectRangeWriter) WriteChunk(_ context.Context, chunk []byte) error { return s.GetObjectRangeStream.Send(newRangeResponse(chunk)) } diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index d3190d6d9..e0393e56f 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -162,7 +162,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre obj.SetHeader(v.GetHeader()) onceHeaderSending.Do(func() { - err = streamWrapper.WriteHeader(object.NewFromV2(obj)) + err = streamWrapper.WriteHeader(stream.Context(), object.NewFromV2(obj)) }) if err != nil { return nil, fmt.Errorf("could not write object header in Get forwarder: %w", err) @@ -180,7 +180,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre continue } - if err = streamWrapper.WriteChunk(chunk); err != nil { + if err = streamWrapper.WriteChunk(stream.Context(), chunk); err != nil { return nil, fmt.Errorf("could not write object chunk in Get forwarder: %w", err) } @@ -320,7 +320,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get continue } - if err = streamWrapper.WriteChunk(chunk); err != nil { + if err = streamWrapper.WriteChunk(stream.Context(), chunk); err != nil { return nil, fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err) } @@ -414,7 +414,7 @@ type headResponseWriter struct { body *objectV2.HeadResponseBody } -func (w *headResponseWriter) WriteHeader(hdr *object.Object) error { +func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *object.Object) error { if w.mainOnly { w.body.SetHeaderPart(toShortObjectHeader(hdr)) } else { diff --git a/pkg/services/object_manager/tombstone/source/source.go b/pkg/services/object_manager/tombstone/source/source.go index 1cb408399..8dd0dcabb 100644 --- a/pkg/services/object_manager/tombstone/source/source.go +++ b/pkg/services/object_manager/tombstone/source/source.go @@ -51,7 +51,7 @@ type headerWriter struct { o *objectSDK.Object } -func (h *headerWriter) WriteHeader(o *objectSDK.Object) error { +func (h *headerWriter) WriteHeader(_ context.Context, o *objectSDK.Object) error { h.o = o return nil }