package transformer import ( "context" "crypto/ecdsa" "crypto/sha256" "fmt" buffPool "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/util/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" "git.frostfs.info/TrueCloudLab/tzhash/tz" ) type payloadSizeLimiter struct { Params written, writtenCurrent uint64 current, parent *object.Object payload []byte currentHashers, parentHashers []payloadChecksumHasher previous []oid.ID splitID *object.SplitID parAttrs []object.Attribute nextTarget ObjectWriter } type Params struct { Key *ecdsa.PrivateKey NextTargetInit TargetInitializer SessionToken *session.Object NetworkState EpochSource MaxSize uint64 WithoutHomomorphicHash bool // SizeHint is a hint for the total payload size to be processed. // It is used primarily to optimize allocations and doesn't affect // functionality. Primary usecases are providing file size when putting an object // with the frostfs-cli or using Content-Length header in gateways. SizeHint uint64 Pool *buffPool.BufferPool } // NewPayloadSizeLimiter returns ObjectTarget instance that restricts payload length // of the writing object and writes generated objects to targets from initializer. // // Calculates and adds homomorphic hash to resulting objects only if withoutHomomorphicHash // is false. // // Objects w/ payload size less or equal than max size remain untouched. func NewPayloadSizeLimiter(p Params) ChunkedObjectWriter { return &payloadSizeLimiter{ Params: p, splitID: object.NewSplitID(), } } func (s *payloadSizeLimiter) WriteHeader(_ context.Context, hdr *object.Object) error { s.current = fromObject(hdr) s.initialize() return nil } func (s *payloadSizeLimiter) Write(ctx context.Context, p []byte) (int, error) { if err := s.writeChunk(ctx, p); err != nil { return 0, err } return len(p), nil } func (s *payloadSizeLimiter) Close(ctx context.Context) (*AccessIdentifiers, error) { return s.release(ctx, true) } func (s *payloadSizeLimiter) initialize() { s.current = fromObject(s.current) // if it is an object after the 1st if ln := len(s.previous); ln > 0 { // initialize parent object once (after 1st object) if ln == 1 { ver := version.Current() s.parent = fromObject(s.current) s.parent.ResetRelations() s.parent.SetSignature(nil) s.parent.SetAttributes(s.parAttrs...) s.parent.SetVersion(&ver) s.parentHashers = append(s.parentHashers[:0], s.currentHashers...) } // set previous object to the last previous identifier s.current.SetPreviousID(s.previous[ln-1]) } s.initializeCurrent() } func fromObject(obj *object.Object) *object.Object { cnr, _ := obj.ContainerID() res := object.New() res.SetContainerID(cnr) res.SetOwnerID(obj.OwnerID()) res.SetAttributes(obj.Attributes()...) res.SetType(obj.Type()) // obj.SetSplitID creates splitHeader but we don't need to do it in case // of small objects, so we should make nil check. if obj.SplitID() != nil { res.SetSplitID(obj.SplitID()) } return res } func (s *payloadSizeLimiter) initializeCurrent() { s.nextTarget = s.NextTargetInit() s.writtenCurrent = 0 s.initPayloadHashers() var payloadSize uint64 // Check whether SizeHint is valid. if remaining := s.SizeHint - s.written; remaining <= s.SizeHint { if remaining >= s.MaxSize { payloadSize = s.MaxSize } else { payloadSize = remaining % s.MaxSize } } if s.Pool == nil { s.payload = make([]byte, 0, payloadSize) } else { buffer := s.Pool.Get(uint32(payloadSize)) s.payload = buffer.Data[:0] } } func (s *payloadSizeLimiter) initPayloadHashers() { s.currentHashers = append(s.currentHashers[:0], payloadChecksumHasher{ hasher: sha256.New(), typ: checksum.SHA256, }) if !s.WithoutHomomorphicHash { s.currentHashers = append(s.currentHashers, payloadChecksumHasher{ hasher: tz.New(), typ: checksum.TZ, }) } } func (s *payloadSizeLimiter) release(ctx context.Context, finalize bool) (*AccessIdentifiers, error) { // Arg finalize is true only when called from Close method. // We finalize parent and generate linking objects only if it is more // than 1 object in split-chain. withParent := finalize && len(s.previous) > 0 if withParent { for i := range s.parentHashers { s.parentHashers[i].writeChecksum(s.parent) } s.parent.SetPayloadSize(s.written) s.current.SetParent(s.parent) } // release current object for i := range s.currentHashers { s.currentHashers[i].writeChecksum(s.current) } ids, err := s.fillHeader() if err != nil { return nil, fmt.Errorf("fillHeader: %w", err) } s.current.SetPayload(s.payload) if err := s.nextTarget.WriteObject(ctx, s.current); err != nil { return nil, fmt.Errorf("could not write to next target: %w", err) } // save identifier of the released object s.previous = append(s.previous, ids.SelfID) if withParent { // generate and release linking object s.initializeLinking(ids.ParentHeader) s.initializeCurrent() if _, err := s.release(ctx, false); err != nil { return nil, fmt.Errorf("could not release linking object: %w", err) } } return ids, nil } func (s *payloadSizeLimiter) fillHeader() (*AccessIdentifiers, error) { curEpoch := s.NetworkState.CurrentEpoch() ver := version.Current() s.current.SetVersion(&ver) s.current.SetPayloadSize(s.writtenCurrent) s.current.SetSessionToken(s.SessionToken) s.current.SetCreationEpoch(curEpoch) var ( parID *oid.ID parHdr *object.Object ) if par := s.current.Parent(); par != nil && par.Signature() == nil { rawPar := object.NewFromV2(par.ToV2()) rawPar.SetSessionToken(s.SessionToken) rawPar.SetCreationEpoch(curEpoch) if err := object.SetIDWithSignature(*s.Key, rawPar); err != nil { return nil, fmt.Errorf("could not finalize parent object: %w", err) } id, _ := rawPar.ID() parID = &id parHdr = rawPar s.current.SetParent(parHdr) } if err := object.SetIDWithSignature(*s.Key, s.current); err != nil { return nil, fmt.Errorf("could not finalize object: %w", err) } id, _ := s.current.ID() return &AccessIdentifiers{ ParentID: parID, SelfID: id, ParentHeader: parHdr, Epoch: curEpoch, }, nil } func (s *payloadSizeLimiter) initializeLinking(parHdr *object.Object) { s.current = fromObject(s.current) s.current.SetParent(parHdr) s.current.SetChildren(s.previous...) s.current.SetSplitID(s.splitID) } func (s *payloadSizeLimiter) writeChunk(ctx context.Context, chunk []byte) error { for { // statement is true if the previous write of bytes reached exactly the boundary. if s.written > 0 && s.written%s.MaxSize == 0 { if s.written == s.MaxSize { s.prepareFirstChild() } // we need to release current object if _, err := s.release(ctx, false); err != nil { return fmt.Errorf("could not release object: %w", err) } // initialize another object s.initialize() } var ( ln = uint64(len(chunk)) cut = ln leftToEdge = s.MaxSize - s.written%s.MaxSize ) // write bytes no further than the boundary of the current object if ln > leftToEdge { cut = leftToEdge } if err := s.writeHashes(chunk[:cut]); err != nil { return fmt.Errorf("could not write chunk to target: %w", err) } // increase written bytes counter s.writtenCurrent += cut s.written += cut if cut == ln { return nil } // if there are more bytes in buffer we call method again to start filling another object chunk = chunk[cut:] } } func (s *payloadSizeLimiter) writeHashes(chunk []byte) error { s.payload = append(s.payload, chunk...) // The `Write` method of `hash.Hash` never returns an error. for i := range s.currentHashers { _, _ = s.currentHashers[i].hasher.Write(chunk) } for i := range s.parentHashers { _, _ = s.parentHashers[i].hasher.Write(chunk) } return nil } func (s *payloadSizeLimiter) prepareFirstChild() { // initialize split header with split ID on first object in chain s.current.InitRelations() s.current.SetSplitID(s.splitID) // cut source attributes s.parAttrs = s.current.Attributes() s.current.SetAttributes() // attributes will be added to parent in detachParent }