Add context to the transformer #63
1 changed files with 32 additions and 27 deletions
|
@ -132,7 +132,6 @@ func (s *payloadSizeLimiter) initPayloadHashers() {
|
|||
}
|
||||
}
|
||||
|
||||
// nolint: funlen
|
||||
func (s *payloadSizeLimiter) release(ctx context.Context, finalize bool) (*AccessIdentifiers, error) {
|
||||
// Arg finalize is true only when called from Close method.
|
||||
// We finalize parent and generate linking objects only if it is more
|
||||
|
@ -152,6 +151,36 @@ func (s *payloadSizeLimiter) release(ctx context.Context, finalize bool) (*Acces
|
|||
s.currentHashers[i].writeChecksum(s.current)
|
||||
}
|
||||
|
||||
ids, err := s.fillHeader()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("fillHeader: %w", err)
|
||||
}
|
||||
|
||||
if err := s.NextTarget.WriteHeader(ctx, s.current); err != nil {
|
||||
return nil, fmt.Errorf("could not write header to next target: %w", err)
|
||||
}
|
||||
|
||||
if _, err := s.NextTarget.Close(ctx); err != nil {
|
||||
return nil, fmt.Errorf("could not close next target: %w", err)
|
||||
}
|
||||
|
||||
// save identifier of the released object
|
||||
s.previous = append(s.previous, ids.SelfID)
|
||||
|
||||
if withParent {
|
||||
// generate and release linking object
|
||||
s.initializeLinking(ids.ParentHeader)
|
||||
s.initializeCurrent()
|
||||
|
||||
if _, err := s.release(ctx, false); err != nil {
|
||||
return nil, fmt.Errorf("could not release linking object: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
func (s *payloadSizeLimiter) fillHeader() (*AccessIdentifiers, error) {
|
||||
curEpoch := s.NetworkState.CurrentEpoch()
|
||||
ver := version.Current()
|
||||
|
||||
|
@ -186,36 +215,12 @@ func (s *payloadSizeLimiter) release(ctx context.Context, finalize bool) (*Acces
|
|||
return nil, fmt.Errorf("could not finalize object: %w", err)
|
||||
}
|
||||
|
||||
if err := s.NextTarget.WriteHeader(ctx, s.current); err != nil {
|
||||
return nil, fmt.Errorf("could not write header to next target: %w", err)
|
||||
}
|
||||
|
||||
if _, err := s.NextTarget.Close(ctx); err != nil {
|
||||
return nil, fmt.Errorf("could not close next target: %w", err)
|
||||
}
|
||||
|
||||
id, _ := s.current.ID()
|
||||
|
||||
ids := &AccessIdentifiers{
|
||||
return &AccessIdentifiers{
|
||||
ParentID: parID,
|
||||
SelfID: id,
|
||||
ParentHeader: parHdr,
|
||||
}
|
||||
|
||||
// save identifier of the released object
|
||||
s.previous = append(s.previous, ids.SelfID)
|
||||
|
||||
if withParent {
|
||||
// generate and release linking object
|
||||
s.initializeLinking(ids.ParentHeader)
|
||||
s.initializeCurrent()
|
||||
|
||||
if _, err := s.release(ctx, false); err != nil {
|
||||
return nil, fmt.Errorf("could not release linking object: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return ids, nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *payloadSizeLimiter) initializeLinking(parHdr *object.Object) {
|
||||
|
|
Loading…
Reference in a new issue