package writecachebitcask import ( "bytes" "context" "encoding/binary" "errors" "fmt" "io" "os" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error { var lastErr error // Forcibly rotate all active membuffers and log files for _, r := range c.regions { r.flushWriteBatch() r.Lock() if err := r.rotateLogFile(ctx); err != nil && !ignoreErrors { lastErr = err } r.Unlock() } // Wait for all flush channels to drain for _, r := range c.regions { for len(r.flushCh) > 0 { time.Sleep(1 * time.Second) } } return lastErr } func (r *region) flushWorker() { for logIndex := range r.flushCh { again: // Read the whole log file contents in memory b, err := os.ReadFile(r.logFilePath(logIndex)) if err != nil { r.opts.log.Error(logs.WritecacheBitcaskReadingLogFile, zap.Int("region", r.index), zap.Uint32("logIndex", logIndex), zap.Error(err)) time.Sleep(1 * time.Second) goto again } // Flush the log file contents for { err := r.flushBytes(logIndex, b) if err == nil { break } r.opts.log.Error(logs.WritecacheBitcaskFlushingLogBytes, zap.Int("region", r.index), zap.Uint32("logIndex", logIndex), zap.Error(err)) time.Sleep(1 * time.Second) } // Delete the log file if err := os.Remove(r.logFilePath(logIndex)); err != nil { r.opts.log.Error(logs.WritecacheBitcaskRemovingLogFile, zap.Int("region", r.index), zap.Uint32("logIndex", logIndex), zap.Error(err)) } } } func (r *region) flushEntry(logIndex uint32, offset int, addr oid.Address, obj *objectSDK.Object) error { // Put the object to the underlying storage and store its storageID var storageID []byte if obj != nil { var prm common.PutPrm prm.Object = obj res, err := r.opts.blobstor.Put(context.TODO(), prm) if err != nil { return fmt.Errorf("putting object in main storage: %w", err) } storageID = res.StorageID } r.Lock() // Find the current log index and offset of the entry in the key directory. bucket := r.locateBucket(addr) var curLogIndex uint32 curOffset := -1 bucketIndex := -1 for i, e := range r.keyDir[bucket] { if e.addr.Equals(addr) { bucketIndex = i curLogIndex = e.logIndex curOffset = e.offset break } } // If the log file entry is up-to-date, then update the object metadata as well. if curLogIndex == logIndex && curOffset == offset && storageID != nil { var updPrm meta.UpdateStorageIDPrm updPrm.SetAddress(addr) updPrm.SetStorageID(storageID) if _, err := r.opts.metabase.UpdateStorageID(updPrm); err != nil { r.Unlock() return fmt.Errorf("updating object metadata: %w", err) } } // If the entry is currently in the key directory, remove it. if bucketIndex != -1 { last := len(r.keyDir[bucket]) - 1 r.keyDir[bucket][bucketIndex], r.keyDir[bucket][last] = r.keyDir[bucket][last], r.keyDir[bucket][bucketIndex] r.keyDir[bucket] = r.keyDir[bucket][:last] } r.Unlock() return nil } func (r *region) flushBytes(logIndex uint32, b []byte) error { rd := bytes.NewReader(b) for offset := 0; ; { addr, obj, n, err := readLogFileEntry(rd) if err != nil { if errors.Is(err, io.EOF) { break } return fmt.Errorf("reading log file entry: %w", err) } if err := r.flushEntry(logIndex, offset, addr, obj); err != nil { if rf := r.opts.reportError; rf != nil { rf(addr.EncodeToString(), err) } return fmt.Errorf("flushing log entry: %w", err) } offset += n } return nil } // readLogFileEntry reads an log file entry from the given reader. // It returns the corresponding address, object and entry size. // A nil object is returned if the entry correspond to object deletion. func readLogFileEntry(r io.Reader) (oid.Address, *objectSDK.Object, int, error) { // Read address header var addr oid.Address var c cid.ID var o oid.ID if _, err := r.Read(c[:]); err != nil { return addr, nil, 0, fmt.Errorf("reading container ID: %w", err) } if _, err := r.Read(o[:]); err != nil { return addr, nil, 0, fmt.Errorf("reading object ID: %w", err) } addr.SetContainer(c) addr.SetObject(o) // Read payload size var sizeBytes [sizeLen]byte if _, err := r.Read(sizeBytes[:]); err != nil { return addr, nil, 0, fmt.Errorf("reading object size: %w", err) } size := binary.LittleEndian.Uint32(sizeBytes[:]) // Read and unmarshal object, if needed var data []byte var obj *objectSDK.Object if size != tombstone { data = make([]byte, size) if _, err := r.Read(data); err != nil { return addr, nil, 0, fmt.Errorf("reading object data: %w", err) } obj = objectSDK.New() if err := obj.Unmarshal(data); err != nil { return addr, nil, 0, fmt.Errorf("unmarshaling object: %w", err) } } return addr, obj, keyLen + sizeLen + len(data), nil }