Add context to the transformer #63

Merged
fyrchik merged 1 commit from fyrchik/frostfs-sdk-go:transformer-add-context into master 2023-07-26 21:08:02 +00:00

View file

@ -132,7 +132,6 @@ func (s *payloadSizeLimiter) initPayloadHashers() {
}
}
// nolint: funlen
func (s *payloadSizeLimiter) release(ctx context.Context, finalize bool) (*AccessIdentifiers, error) {
// Arg finalize is true only when called from Close method.
// We finalize parent and generate linking objects only if it is more
@ -152,6 +151,36 @@ func (s *payloadSizeLimiter) release(ctx context.Context, finalize bool) (*Acces
s.currentHashers[i].writeChecksum(s.current)
}
ids, err := s.fillHeader()
if err != nil {
return nil, fmt.Errorf("fillHeader: %w", err)
}
if err := s.NextTarget.WriteHeader(ctx, s.current); err != nil {
return nil, fmt.Errorf("could not write header to next target: %w", err)
}
if _, err := s.NextTarget.Close(ctx); err != nil {
return nil, fmt.Errorf("could not close next target: %w", err)
}
// save identifier of the released object
s.previous = append(s.previous, ids.SelfID)
if withParent {
// generate and release linking object
s.initializeLinking(ids.ParentHeader)
s.initializeCurrent()
if _, err := s.release(ctx, false); err != nil {
return nil, fmt.Errorf("could not release linking object: %w", err)
}
}
return ids, nil
}
func (s *payloadSizeLimiter) fillHeader() (*AccessIdentifiers, error) {
curEpoch := s.NetworkState.CurrentEpoch()
ver := version.Current()
@ -186,36 +215,12 @@ func (s *payloadSizeLimiter) release(ctx context.Context, finalize bool) (*Acces
return nil, fmt.Errorf("could not finalize object: %w", err)
}
if err := s.NextTarget.WriteHeader(ctx, s.current); err != nil {
return nil, fmt.Errorf("could not write header to next target: %w", err)
}
if _, err := s.NextTarget.Close(ctx); err != nil {
return nil, fmt.Errorf("could not close next target: %w", err)
}
id, _ := s.current.ID()
ids := &AccessIdentifiers{
return &AccessIdentifiers{
ParentID: parID,
SelfID: id,
ParentHeader: parHdr,
}
// save identifier of the released object
s.previous = append(s.previous, ids.SelfID)
if withParent {
// generate and release linking object
s.initializeLinking(ids.ParentHeader)
s.initializeCurrent()
if _, err := s.release(ctx, false); err != nil {
return nil, fmt.Errorf("could not release linking object: %w", err)
}
}
return ids, nil
}, nil
}
func (s *payloadSizeLimiter) initializeLinking(parHdr *object.Object) {