package blobovnicza import ( "context" "errors" "fmt" "syscall" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) // PutPrm groups the parameters of Put operation. type PutPrm struct { addr oid.Address objData []byte force bool } // PutRes groups the resulting values of Put operation. type PutRes struct{} // SetAddress sets the address of the saving object. func (p *PutPrm) SetAddress(addr oid.Address) { p.addr = addr } // SetMarshaledObject sets binary representation of the object. func (p *PutPrm) SetMarshaledObject(data []byte) { p.objData = data } // SetForce sets force option. func (p *PutPrm) SetForce(f bool) { p.force = f } // Put saves an object in Blobovnicza. // // If binary representation of the object is not set, // it is calculated via Marshal method. // // The size of the object MUST BE less that or equal to // the size specified in WithObjectSizeLimit option. // // Returns any error encountered that // did not allow to completely save the object. // // Returns ErrFull if blobovnicza is filled. // // Should not be called in read-only configuration. func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) { _, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.Put", trace.WithAttributes( attribute.String("path", b.path), attribute.String("address", prm.addr.EncodeToString()), attribute.Int("size", len(prm.objData)), )) defer span.End() b.controlMtx.RLock() defer b.controlMtx.RUnlock() sz := uint64(len(prm.objData)) bucketName := bucketForSize(sz) key := addressKey(prm.addr) recordSize := sz + uint64(len(key)) err := b.boltDB.Batch(func(tx *bbolt.Tx) error { buck := tx.Bucket(bucketName) if buck == nil { // expected to happen: // - before initialization step (incorrect usage by design) // - if DB is corrupted (in future this case should be handled) // - blobovnicza's object size changed before rebuild (handled if prm.force flag specified) if !prm.force { return logicerr.Wrap(fmt.Errorf("(%T) bucket for size %d not created", b, sz)) } var err error buck, err = tx.CreateBucket(bucketName) if err != nil { return fmt.Errorf("(%T) failed to create bucket for size %d: %w", b, sz, err) } } // save the object in bucket if err := buck.Put(key, prm.objData); err != nil { return fmt.Errorf("(%T) could not save object in bucket: %w", b, err) } return updateMeta(tx, func(count, size uint64) (uint64, uint64) { return count + 1, size + recordSize }) }) if err == nil { b.itemAdded(recordSize) } else if errors.Is(err, syscall.ENOSPC) { err = ErrNoSpace } return PutRes{}, err } func addressKey(addr oid.Address) []byte { return []byte(addr.EncodeToString()) } func addressFromKey(dst *oid.Address, data []byte) error { return dst.DecodeString(string(data)) }