frostfs-node/pkg/local_object_storage/blobstor/blobtree/put.go

122 lines
2.5 KiB
Go
Raw Normal View History

package blobtree
import (
"context"
"os"
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// tempFileSymbols is the marker placed between a data file path and the
// unique hexadecimal suffix when the name of a temporary file is built.
const tempFileSymbols = "###"
// Put saves the object described by prm into the blob tree and returns the
// storage ID computed from the path of the file the object was written to.
//
// Unless prm.DontCompress is set, the payload is run through the compressor
// before it is written. If the storage is configured as read-only,
// common.ErrReadOnly is returned and nothing is written.
//
// Every call is reported to the metrics together with its duration; the
// reported size is the length of the stored payload (after compression, if
// applied) and is zero when the operation fails.
func (b *BlobTree) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
	begin := time.Now()
	stored := false
	storedBytes := 0
	// Registered first so that it runs last, after the span has been ended.
	defer func() {
		b.cfg.metrics.Put(time.Since(begin), storedBytes, stored)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "BlobTree.Put",
		trace.WithAttributes(
			attribute.String("address", prm.Address.EncodeToString()),
			attribute.Bool("dont_compress", prm.DontCompress),
		))
	defer span.End()

	// Result returned on every failure path.
	var noResult common.PutRes

	if b.cfg.readOnly {
		return noResult, common.ErrReadOnly
	}

	dir := b.getDir(prm.Address)
	if err := b.createDir(dir, false); err != nil {
		return noResult, err
	}

	if !prm.DontCompress {
		prm.RawData = b.compressor.Compress(prm.RawData)
	}

	path, err := b.saveToLocalDir(prm, dir)
	if err != nil {
		return noResult, err
	}

	// Only now is the operation known to have succeeded; let the deferred
	// metrics call see the final values.
	stored, storedBytes = true, len(prm.RawData)
	return common.PutRes{StorageID: pathToStorageID(path)}, nil
}
// saveToLocalDir appends the object from prm to a data file inside dir and
// returns the path of that file.
//
// The file index is obtained from the dispatcher and the file is locked for
// the whole read-append-rewrite cycle. The index is handed back to the
// dispatcher on failure, or when the rewritten file is still smaller than the
// configured target file size; once the file reaches the target size the
// index is not returned.
func (b *BlobTree) saveToLocalDir(prm common.PutPrm, dir string) (string, error) {
	idx := b.dispatcher.GetIdxForWrite(dir)
	path := b.getFilePath(dir, idx)

	b.fileLock.Lock(path)
	defer b.fileLock.Unlock(path)

	// Stays false on every error path, so the index is returned unless the
	// write succeeded and the file reached its target size.
	fileIsFull := false
	defer func() {
		if !fileIsFull {
			b.dispatcher.ReturnIdx(dir, idx)
		}
	}()

	records, err := b.readFileContent(path)
	if err != nil {
		return "", err
	}

	records = append(records, objectData{
		Address: prm.Address,
		Data:    prm.RawData,
	})

	written, err := b.writeToTmpAndRename(records, path)
	if err != nil {
		return "", err
	}

	fileIsFull = written >= b.cfg.targetFileSizeBytes
	return path, nil
}
// writeToTmpAndRename writes records to a uniquely named temporary file and
// then renames that file to path, so path is replaced by a single rename
// instead of being rewritten in place.
//
// The temporary name is path followed by tempFileSymbols and the next value
// of the suffix counter formatted in base 16.
//
// It returns the size reported by saveContentToFile for the written content.
// If path did not exist before the rename, the files-count metric is
// incremented. On any failure the temporary file is removed (best effort) and
// the error is returned together with a zero size.
func (b *BlobTree) writeToTmpAndRename(records []objectData, path string) (uint64, error) {
	tmpPath := path + tempFileSymbols + strconv.FormatUint(b.suffix.Add(1), 16)

	// Cleanup is best effort: the error that made us clean up is the one the
	// caller needs to see, so a failed removal is deliberately ignored.
	removeTmp := func() {
		_ = os.Remove(b.getSystemPath(tmpPath))
	}

	size, err := b.saveContentToFile(records, tmpPath)
	if err != nil {
		removeTmp()
		return 0, err
	}

	// Probe the destination before the rename so that only files that did not
	// exist yet are counted by the metric.
	newFile := false
	if _, err := os.Stat(b.getSystemPath(path)); err != nil {
		if !os.IsNotExist(err) {
			// The temporary file is already on disk; do not leave it orphaned.
			removeTmp()
			return 0, err
		}
		newFile = true
	}

	if err := os.Rename(b.getSystemPath(tmpPath), b.getSystemPath(path)); err != nil {
		removeTmp()
		return 0, err
	}

	if newFile {
		b.cfg.metrics.IncFilesCount()
	}
	return size, nil
}