Add context to the transformer #63
1 changed files with 32 additions and 27 deletions
|
@ -132,7 +132,6 @@ func (s *payloadSizeLimiter) initPayloadHashers() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// nolint: funlen
|
|
||||||
func (s *payloadSizeLimiter) release(ctx context.Context, finalize bool) (*AccessIdentifiers, error) {
|
func (s *payloadSizeLimiter) release(ctx context.Context, finalize bool) (*AccessIdentifiers, error) {
|
||||||
// Arg finalize is true only when called from Close method.
|
// Arg finalize is true only when called from Close method.
|
||||||
// We finalize parent and generate linking objects only if it is more
|
// We finalize parent and generate linking objects only if it is more
|
||||||
|
@ -152,6 +151,36 @@ func (s *payloadSizeLimiter) release(ctx context.Context, finalize bool) (*Acces
|
||||||
s.currentHashers[i].writeChecksum(s.current)
|
s.currentHashers[i].writeChecksum(s.current)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ids, err := s.fillHeader()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("fillHeader: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.NextTarget.WriteHeader(ctx, s.current); err != nil {
|
||||||
|
return nil, fmt.Errorf("could not write header to next target: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := s.NextTarget.Close(ctx); err != nil {
|
||||||
|
return nil, fmt.Errorf("could not close next target: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// save identifier of the released object
|
||||||
|
s.previous = append(s.previous, ids.SelfID)
|
||||||
|
|
||||||
|
if withParent {
|
||||||
|
// generate and release linking object
|
||||||
|
s.initializeLinking(ids.ParentHeader)
|
||||||
|
s.initializeCurrent()
|
||||||
|
|
||||||
|
if _, err := s.release(ctx, false); err != nil {
|
||||||
|
return nil, fmt.Errorf("could not release linking object: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ids, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *payloadSizeLimiter) fillHeader() (*AccessIdentifiers, error) {
|
||||||
curEpoch := s.NetworkState.CurrentEpoch()
|
curEpoch := s.NetworkState.CurrentEpoch()
|
||||||
ver := version.Current()
|
ver := version.Current()
|
||||||
|
|
||||||
|
@ -186,36 +215,12 @@ func (s *payloadSizeLimiter) release(ctx context.Context, finalize bool) (*Acces
|
||||||
return nil, fmt.Errorf("could not finalize object: %w", err)
|
return nil, fmt.Errorf("could not finalize object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.NextTarget.WriteHeader(ctx, s.current); err != nil {
|
|
||||||
return nil, fmt.Errorf("could not write header to next target: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := s.NextTarget.Close(ctx); err != nil {
|
|
||||||
return nil, fmt.Errorf("could not close next target: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
id, _ := s.current.ID()
|
id, _ := s.current.ID()
|
||||||
|
return &AccessIdentifiers{
|
||||||
ids := &AccessIdentifiers{
|
|
||||||
ParentID: parID,
|
ParentID: parID,
|
||||||
SelfID: id,
|
SelfID: id,
|
||||||
ParentHeader: parHdr,
|
ParentHeader: parHdr,
|
||||||
}
|
}, nil
|
||||||
|
|
||||||
// save identifier of the released object
|
|
||||||
s.previous = append(s.previous, ids.SelfID)
|
|
||||||
|
|
||||||
if withParent {
|
|
||||||
// generate and release linking object
|
|
||||||
s.initializeLinking(ids.ParentHeader)
|
|
||||||
s.initializeCurrent()
|
|
||||||
|
|
||||||
if _, err := s.release(ctx, false); err != nil {
|
|
||||||
return nil, fmt.Errorf("could not release linking object: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return ids, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *payloadSizeLimiter) initializeLinking(parHdr *object.Object) {
|
func (s *payloadSizeLimiter) initializeLinking(parHdr *object.Object) {
|
||||||
|
|
Loading…
Reference in a new issue