diff --git a/backend/press/alg_gzip.go b/backend/press/alg_gzip.go deleted file mode 100644 index 8649f5ec8..000000000 --- a/backend/press/alg_gzip.go +++ /dev/null @@ -1,75 +0,0 @@ -package press - -import ( - "bufio" - "io" - - "github.com/klauspost/compress/gzip" -) - -// AlgGzip represents gzip compression algorithm -type AlgGzip struct { - level int - blockSize uint32 -} - -// InitializeGzip initializes the gzip compression Algorithm -func InitializeGzip(bs uint32, level int) Algorithm { - a := new(AlgGzip) - a.blockSize = bs - a.level = level - return a -} - -// GetFileExtension returns file extension -func (a *AlgGzip) GetFileExtension() string { - return ".gz" -} - -// GetHeader returns the Lz4 compression header -func (a *AlgGzip) GetHeader() []byte { - return []byte{} -} - -// GetFooter returns -func (a *AlgGzip) GetFooter() []byte { - return []byte{} -} - -// CompressBlock that compresses a block using gzip -func (a *AlgGzip) CompressBlock(in []byte, out io.Writer) (compressedSize uint32, uncompressedSize uint64, err error) { - // Initialize buffer - bufw := bufio.NewWriterSize(out, int(a.blockSize+(a.blockSize)>>4)) - - // Initialize block writer - outw, err := gzip.NewWriterLevel(bufw, a.level) - if err != nil { - return 0, 0, err - } - - // Compress block - _, err = outw.Write(in) - if err != nil { - return 0, 0, err - } - - // Finalize gzip file, flush buffer and return - err = outw.Close() - if err != nil { - return 0, 0, err - } - blockSize := uint32(bufw.Buffered()) - err = bufw.Flush() - - return blockSize, uint64(len(in)), err -} - -// DecompressBlock decompresses Lz4 compressed block -func (a *AlgGzip) DecompressBlock(in io.Reader, out io.Writer, BlockSize uint32) (n int, err error) { - gzipReader, err := gzip.NewReader(in) - if err != nil { - return 0, err - } - written, err := io.Copy(out, gzipReader) - return int(written), err -} diff --git a/backend/press/alg_lz4.go b/backend/press/alg_lz4.go deleted file mode 100644 index cf43716d2..000000000 --- a/backend/press/alg_lz4.go +++ /dev/null @@ -1,223 +0,0 @@ -package press - -// This file implements the LZ4 algorithm. 
-import ( - "bytes" - "encoding/binary" - "fmt" - "io" - "math/bits" - - "github.com/buengese/xxh32" - lz4 "github.com/pierrec/lz4" -) - -/* -Structure of LZ4 header: -Flags: - Version = 01 - Independent = 1 - Block Checksum = 1 - Content Size = 0 - Content Checksum = 0 - Reserved = 0 - Dictionary ID = 0 - -BD byte: - Reserved = 0 - Block Max Size = 101 (or 5; 256kb) - Reserved = 0000 - -Header checksum byte (xxhash(flags and bd byte) >> 1) & 0xff -*/ - -// LZ4Header - Header of our LZ4 file -//var LZ4Header = []byte{0x04, 0x22, 0x4d, 0x18, 0x70, 0x50, 0x84} - -// LZ4Footer - Footer of our LZ4 file -var LZ4Footer = []byte{0x00, 0x00, 0x00, 0x00} // This is just an empty block - -const ( - frameMagic uint32 = 0x184D2204 - - compressedBlockFlag = 1 << 31 - compressedBlockMask = compressedBlockFlag - 1 -) - -// AlgLz4 is the Lz4 Compression algorithm -type AlgLz4 struct { - Header lz4.Header - buf [19]byte // magic number(4) + header(flags(2)+[Size(8)+DictID(4)]+checksum(1)) does not exceed 19 bytes -} - -// InitializeLz4 creates an Lz4 compression algorithm -func InitializeLz4(bs uint32, blockChecksum bool) Algorithm { - a := new(AlgLz4) - a.Header.Reset() - a.Header = lz4.Header{ - BlockChecksum: blockChecksum, - BlockMaxSize: int(bs), - } - return a -} - -// GetFileExtension returns file extension -func (a *AlgLz4) GetFileExtension() string { - return ".lz4" -} - -// GetHeader returns the Lz4 compression header -func (a *AlgLz4) GetHeader() []byte { - // Size is optional. - buf := a.buf[:] - - // Set the fixed size data: magic number, block max size and flags. - binary.LittleEndian.PutUint32(buf[0:], frameMagic) - flg := byte(lz4.Version << 6) - flg |= 1 << 5 // No block dependency. - if a.Header.BlockChecksum { - flg |= 1 << 4 - } - if a.Header.Size > 0 { - flg |= 1 << 3 - } - buf[4] = flg - buf[5] = blockSizeValueToIndex(a.Header.BlockMaxSize) << 4 - - // Current buffer size: magic(4) + flags(1) + block max size (1). - n := 6 - if a.Header.Size > 0 { - binary.LittleEndian.PutUint64(buf[n:], a.Header.Size) - n += 8 - } - - // The header checksum includes the flags, block max size and optional Size. - buf[n] = byte(xxh32.ChecksumZero(buf[4:n]) >> 8 & 0xFF) - - // Header ready, write it out. - return buf[0 : n+1] -} - -// GetFooter returns -func (a *AlgLz4) GetFooter() []byte { - return LZ4Footer -} - -// CompressBlock that compresses a block using lz4 -func (a *AlgLz4) CompressBlock(in []byte, out io.Writer) (compressedSize uint32, uncompressedSize uint64, err error) { - if len(in) > 0 { - n, err := a.compressBlock(in, out) - if err != nil { - return 0, 0, err - } - return n, uint64(len(in)), nil - } - - return 0, 0, nil -} - -// compressBlock compresses a block. -func (a *AlgLz4) compressBlock(data []byte, dst io.Writer) (uint32, error) { - zdata := make([]byte, a.Header.BlockMaxSize) // The compressed block size cannot exceed the input's. - var zn int - if level := a.Header.CompressionLevel; level != 0 { - zn, _ = lz4.CompressBlockHC(data, zdata, level) - } else { - var hashTable [1 << 16]int - zn, _ = lz4.CompressBlock(data, zdata, hashTable[:]) - } - - var bLen uint32 - if zn > 0 && zn < len(data) { - // Compressible and compressed size smaller than uncompressed: ok! - bLen = uint32(zn) - zdata = zdata[:zn] - } else { - // Uncompressed block. - bLen = uint32(len(data)) | compressedBlockFlag - zdata = data - } - - // Write the block. 
- if err := a.writeUint32(bLen, dst); err != nil { - return 0, err - } - _, err := dst.Write(zdata) - if err != nil { - return 0, err - } - - if !a.Header.BlockChecksum { - return bLen, nil - } - checksum := xxh32.ChecksumZero(zdata) - if err := a.writeUint32(checksum, dst); err != nil { - return 0, err - } - return bLen, nil -} - -// writeUint32 writes a uint32 to the underlying writer. -func (a *AlgLz4) writeUint32(x uint32, dst io.Writer) error { - buf := make([]byte, 4) - binary.LittleEndian.PutUint32(buf, x) - _, err := dst.Write(buf) - return err -} - -func blockSizeValueToIndex(size int) byte { - return 4 + byte(bits.TrailingZeros(uint(size)>>16)/2) -} - -// DecompressBlock decompresses Lz4 compressed block -func (a *AlgLz4) DecompressBlock(in io.Reader, out io.Writer, BlockSize uint32) (n int, err error) { - // Get our compressed data - var b bytes.Buffer - _, err = io.Copy(&b, in) - if err != nil { - return 0, err - } - zdata := b.Bytes() - bLen := binary.LittleEndian.Uint32(zdata[:4]) - - if bLen&compressedBlockFlag > 0 { - // Uncompressed block. - bLen &= compressedBlockMask - - if bLen > BlockSize { - return 0, fmt.Errorf("lz4: invalid block size: %d", bLen) - } - data := zdata[4 : bLen+4] - - if a.Header.BlockChecksum { - checksum := binary.LittleEndian.Uint32(zdata[4+bLen:]) - - if h := xxh32.ChecksumZero(data); h != checksum { - return 0, fmt.Errorf("lz4: invalid block checksum: got %x; expected %x", h, checksum) - } - } - _, err := out.Write(data) - return len(data), err - } - - // compressed block - if bLen > BlockSize { - return 0, fmt.Errorf("lz4: invalid block size: %d", bLen) - } - - if a.Header.BlockChecksum { - checksum := binary.LittleEndian.Uint32(zdata[4+bLen:]) - - if h := xxh32.ChecksumZero(zdata[4 : bLen+4]); h != checksum { - return 0, fmt.Errorf("lz4: invalid block checksum: got %x; expected %x", h, checksum) - } - } - - data := make([]byte, BlockSize) - n, err = lz4.UncompressBlock(zdata[4:bLen+4], data) - if err != nil { - return 0, err - } - _, err = out.Write(data[:n]) - return n, err -} diff --git a/backend/press/alg_xz.go b/backend/press/alg_xz.go deleted file mode 100644 index 415bbb117..000000000 --- a/backend/press/alg_xz.go +++ /dev/null @@ -1,75 +0,0 @@ -package press - -import ( - "bufio" - "io" - - "github.com/ulikunitz/xz" -) - -// AlgXZ represents the XZ compression algorithm -type AlgXZ struct { - blockSize uint32 - config xz.WriterConfig -} - -// InitializeXZ creates an Lz4 compression algorithm -func InitializeXZ(bs uint32) Algorithm { - a := new(AlgXZ) - a.blockSize = bs - a.config = xz.WriterConfig{} - return a -} - -// GetFileExtension returns file extension -func (a *AlgXZ) GetFileExtension() string { - return ".xz" -} - -// GetHeader returns the Lz4 compression header -func (a *AlgXZ) GetHeader() []byte { - return []byte{} -} - -// GetFooter returns -func (a *AlgXZ) GetFooter() []byte { - return []byte{} -} - -// CompressBlock that compresses a block using lz4 -func (a *AlgXZ) CompressBlock(in []byte, out io.Writer) (compressedSize uint32, uncompressedSize uint64, err error) { - // Initialize buffer - bufw := bufio.NewWriterSize(out, int(a.blockSize+(a.blockSize)>>4)) - - // Initialize block writer - outw, err := a.config.NewWriter(bufw) - if err != nil { - return 0, 0, err - } - - // Compress block - _, err = outw.Write(in) - if err != nil { - return 0, 0, err - } - - // Finalize gzip file, flush buffer and return - err = outw.Close() - if err != nil { - return 0, 0, err - } - blockSize := uint32(bufw.Buffered()) - err = bufw.Flush() - 
- return blockSize, uint64(len(in)), err -} - -// DecompressBlock decompresses Lz4 compressed block -func (a *AlgXZ) DecompressBlock(in io.Reader, out io.Writer, BlockSize uint32) (n int, err error) { - xzReader, err := xz.NewReader(in) - if err != nil { - return 0, err - } - written, err := io.Copy(out, xzReader) - return int(written), err -} diff --git a/backend/press/compression.go b/backend/press/compression.go deleted file mode 100644 index b60e1e4b1..000000000 --- a/backend/press/compression.go +++ /dev/null @@ -1,526 +0,0 @@ -// Package press provides wrappers for Fs and Object which implement compression. -// This file is the backend implementation for seekable compression. -package press - -import ( - "bufio" - "bytes" - "errors" - "fmt" - "io" - "io/ioutil" - "log" -) - -// Compression modes -const ( - Uncompressed = -1 - LZ4 = 2 - Gzip = 4 - XZ = 8 -) - -// Errors -var ( - ErrMetadataCorrupted = errors.New("metadata may have been corrupted") -) - -// DEBUG - flag for debug mode -const DEBUG = false - -// Compression is a struct containing configurable variables (what used to be constants) -type Compression struct { - CompressionMode int // Compression mode - Algorithm Algorithm - BlockSize uint32 // Size of blocks. Higher block size means better compression but more download bandwidth needed for small downloads - // ~1MB is recommended for xz, while ~128KB is recommended for gzip and lz4 - HeuristicBytes int64 // Bytes to perform gzip heuristic on to determine whether a file should be compressed - NumThreads int // Number of threads to use for compression - MaxCompressionRatio float64 // Maximum compression ratio for a file to be considered compressible - BinPath string // Path to compression binary. This is used for all non-gzip compression. 
-} - -// Algorithm is the main compression Algorithm Interface -type Algorithm interface { - GetHeader() []byte - - GetFileExtension() string - - CompressBlock(in []byte, out io.Writer) (compressedSize uint32, uncompressedSize uint64, err error) - - DecompressBlock(in io.Reader, out io.Writer, BlockSize uint32) (n int, err error) - - GetFooter() []byte -} - -// NewCompressionPreset creates a Compression object with a preset mode/bs -func NewCompressionPreset(preset string) (*Compression, error) { - switch preset { - case "lz4": - alg := InitializeLz4(262144, true) - return NewCompression(LZ4, alg, 262144) // LZ4 compression (very fast) - case "gzip": - alg := InitializeGzip(131072, 6) - return NewCompression(Gzip, alg, 131070) // GZIP-default compression (medium)*/ - case "xz": - alg := InitializeXZ(1048576) - return NewCompression(XZ, alg, 1048576) // XZ compression (strong compression)*/ - } - return nil, errors.New("Compression mode doesn't exist") -} - -// NewCompressionPresetNumber creates a Compression object with a preset mode/bs -func NewCompressionPresetNumber(preset int) (*Compression, error) { - switch preset { - case LZ4: - alg := InitializeLz4(262144, true) - return NewCompression(LZ4, alg, 262144) // LZ4 compression (very fast) - case Gzip: - alg := InitializeGzip(131072, 6) - return NewCompression(Gzip, alg, 131070) // GZIP-default compression (medium)*/ - case XZ: - alg := InitializeXZ(1048576) - return NewCompression(XZ, alg, 1048576) // XZ compression (strong compression)*/ - } - return nil, errors.New("Compression mode doesn't exist") -} - -// NewCompression creates a Compression object with some default configuration values -func NewCompression(mode int, alg Algorithm, bs uint32) (*Compression, error) { - return NewCompressionAdvanced(mode, alg, bs, 1048576, 12, 0.9) -} - -// NewCompressionAdvanced creates a Compression object -func NewCompressionAdvanced(mode int, alg Algorithm, bs uint32, hb int64, threads int, mcr float64) (c *Compression, err error) { - // Set vars - c = new(Compression) - c.Algorithm = alg - c.CompressionMode = mode - c.BlockSize = bs - c.HeuristicBytes = hb - c.NumThreads = threads - c.MaxCompressionRatio = mcr - return c, err -} - -/*** UTILITY FUNCTIONS ***/ - -// GetFileExtension gets a file extension for current compression mode -func (c *Compression) GetFileExtension() string { - return c.Algorithm.GetFileExtension() -} - -// GetFileCompressionInfo gets a file extension along with compressibility of file -func (c *Compression) GetFileCompressionInfo(reader io.Reader) (compressable bool, extension string, err error) { - // Use our compression algorithm to do a heuristic on the first few bytes - var emulatedBlock, emulatedBlockCompressed bytes.Buffer - _, err = io.CopyN(&emulatedBlock, reader, c.HeuristicBytes) - if err != nil && err != io.EOF { - return false, "", err - } - compressedSize, uncompressedSize, err := c.Algorithm.CompressBlock(emulatedBlock.Bytes(), &emulatedBlockCompressed) - if err != nil { - return false, "", err - } - compressionRatio := float64(compressedSize) / float64(uncompressedSize) - - // If the data is not compressible, return so - if compressionRatio > c.MaxCompressionRatio { - return false, ".bin", nil - } - - // If the file is compressible, select file extension based on compression mode - return true, c.Algorithm.GetFileExtension(), nil -} - -/*** MAIN COMPRESSION INTERFACE ***/ -// compressionResult represents the result of compression for a single block (gotten by a single thread) -type compressionResult struct { 
- buffer *bytes.Buffer - n uint64 - err error -} - -// CompressFileReturningBlockData compresses a file returning the block data for that file. -func (c *Compression) CompressFileReturningBlockData(in io.Reader, out io.Writer) (blockData []uint32, err error) { - // Initialize buffered writer - bufw := bufio.NewWriterSize(out, int((c.BlockSize+(c.BlockSize)>>4)*uint32(c.NumThreads))) - - // Get blockData, copy over header, add length of header to blockData - blockData = make([]uint32, 0) - header := c.Algorithm.GetHeader() - _, err = bufw.Write(header) - if err != nil { - return nil, err - } - blockData = append(blockData, uint32(len(header))) - - // Compress blocks - for { - // Loop through threads, spawning a go procedure for each thread. If we get eof on one thread, set eofAt to that thread and break - compressionResults := make([]chan compressionResult, c.NumThreads) - eofAt := -1 - for i := 0; i < c.NumThreads; i++ { - // Create thread channel and allocate buffer to pass to thread - compressionResults[i] = make(chan compressionResult) - var inputBuffer bytes.Buffer - _, err = io.CopyN(&inputBuffer, in, int64(c.BlockSize)) - if err == io.EOF { - eofAt = i - } else if err != nil { - return nil, err - } - // Run thread - go func(i int, in []byte) { - // Initialize thread writer and result struct - var res compressionResult - var buffer bytes.Buffer - - // Compress block - _, n, err := c.Algorithm.CompressBlock(in, &buffer) - if err != nil && err != io.EOF { // This errored out. - res.buffer = nil - res.n = 0 - res.err = err - compressionResults[i] <- res - return - } - // Pass our data back to the main thread as a compression result - res.buffer = &buffer - res.n = n - res.err = err - compressionResults[i] <- res - }(i, inputBuffer.Bytes()) - // If we have reached eof, we don't need more threads - if eofAt != -1 { - break - } - } - - // Process writers in order - for i := 0; i < c.NumThreads; i++ { - if compressionResults[i] != nil { - // Get current compression result, get buffer, and copy buffer over to output - res := <-compressionResults[i] - close(compressionResults[i]) - if res.buffer == nil { - return nil, res.err - } - blockSize := uint32(res.buffer.Len()) - - _, err = io.Copy(bufw, res.buffer) - if err != nil { - return nil, err - } - if DEBUG { - fmt.Printf("%d %d\n", res.n, blockSize) - } - - // Append block size to block data - blockData = append(blockData, blockSize) - - // If this is the last block, add the raw size of the last block to the end of blockData and break - if eofAt == i { - if DEBUG { - log.Printf("%d %d %d\n", res.n, byte(res.n%256), byte(res.n/256)) - } - - blockData = append(blockData, uint32(res.n)) - break - } - } - } - - // Get number of bytes written in this block (they should all be in the bufio buffer), then close gzip and flush buffer - err = bufw.Flush() - if err != nil { - return nil, err - } - - // If eof happened, break - if eofAt != -1 { - if DEBUG { - log.Printf("%d", eofAt) - log.Printf("%v", blockData) - } - break - } - } - - // Write footer and flush - footer := c.Algorithm.GetFooter() - - _, err = bufw.Write(footer) - if err != nil { - return nil, err - } - err = bufw.Flush() - - // Return - return blockData, err -} - -/*** BLOCK DECOMPRESSION FUNCTIONS ***/ - -// Wrapper function for decompressBlock that implements multithreading -// decompressionResult represents the result of decompressing a block -type decompressionResult struct { - err error - buffer *bytes.Buffer -} - -func (d *Decompressor) decompressBlockRangeMultithreaded(in 
io.Reader, out io.Writer, startingBlock uint32) (n int, err error) { - // First, use bufio.Reader to reduce the number of reads and bufio.Writer to reduce the number of writes - bufin := bufio.NewReader(in) - bufout := bufio.NewWriter(out) - - // Decompress each block individually. - currBatch := startingBlock // Block # of start of current batch of blocks - totalBytesCopied := 0 - for { - // Loop through threads - eofAt := -1 - decompressionResults := make([]chan decompressionResult, d.c.NumThreads) - - for i := 0; i < d.c.NumThreads; i++ { - // Get currBlock - currBlock := currBatch + uint32(i) - - // Create channel - decompressionResults[i] = make(chan decompressionResult) - - // Check if we've reached EOF - if currBlock >= d.numBlocks { - eofAt = i - break - } - - // Get block to decompress - var compressedBlock bytes.Buffer - var err error - n, err := io.CopyN(&compressedBlock, bufin, d.blockStarts[currBlock+1]-d.blockStarts[currBlock]) - if err != nil || n == 0 { // End of stream - eofAt = i - break - } - - // Spawn thread to decompress block - if DEBUG { - log.Printf("Spawning %d", i) - } - go func(i int, currBlock uint32, in io.Reader) { - var block bytes.Buffer - var res decompressionResult - - // Decompress block - _, res.err = d.c.Algorithm.DecompressBlock(in, &block, d.c.BlockSize) - res.buffer = &block - decompressionResults[i] <- res - }(i, currBlock, &compressedBlock) - } - if DEBUG { - log.Printf("Eof at %d", eofAt) - } - - // Process results - for i := 0; i < d.c.NumThreads; i++ { - // If we got EOF, return - if eofAt == i { - return totalBytesCopied, bufout.Flush() // Flushing bufout is needed to prevent us from getting all nulls - } - - // Get result and close - res := <-decompressionResults[i] - close(decompressionResults[i]) - if res.err != nil { - return totalBytesCopied, res.err - } - - // Copy to output and add to total bytes copied - n, err := io.Copy(bufout, res.buffer) - totalBytesCopied += int(n) - if err != nil { - return totalBytesCopied, err - } - } - - // Add NumThreads to currBatch - currBatch += uint32(d.c.NumThreads) - } -} - -/*** MAIN DECOMPRESSION INTERFACE ***/ - -// Decompressor is the ReadSeeker implementation for decompression -type Decompressor struct { - cursorPos *int64 // The current location we have seeked to - blockStarts []int64 // The start of each block. These will be recovered from the block sizes - numBlocks uint32 // Number of blocks - decompressedSize int64 // Decompressed size of the file. - in io.ReadSeeker // Input - c *Compression // Compression options -} - -// Parses block data. Returns the number of blocks, the block start locations for each block, and the decompressed size of the entire file. -func parseBlockData(blockData []uint32, BlockSize uint32) (numBlocks uint32, blockStarts []int64, decompressedSize int64) { - // Parse the block data - blockDataLen := len(blockData) - numBlocks = uint32(blockDataLen - 1) - if DEBUG { - log.Printf("%v\n", blockData) - log.Printf("metadata len, numblocks = %d, %d", blockDataLen, numBlocks) - } - blockStarts = make([]int64, numBlocks+1) // Starts with start of first block (and end of header), ends with end of last block - currentBlockPosition := int64(0) - for i := uint32(0); i < numBlocks; i++ { // Loop through block data, getting starts of blocks. 
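// blockData layout (as produced by CompressFileReturningBlockData): entry 0 is the
// header length, the entries after it are the compressed size of each block, and the
// final entry is the uncompressed size of the last block. The running sum below turns
// those sizes into absolute offsets, so blockStarts[i] is where block i begins in the
// compressed file and blockStarts[numBlocks] is the end of the last block.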
- currentBlockSize := blockData[i] - currentBlockPosition += int64(currentBlockSize) - blockStarts[i] = currentBlockPosition - } - blockStarts[numBlocks] = currentBlockPosition // End of last block - - //log.Printf("Block Starts: %v\n", d.blockStarts) - - numBlocks-- // Subtract 1 from number of blocks because our header technically isn't a block - - // Get uncompressed size of last block and derive uncompressed size of file - lastBlockRawSize := blockData[blockDataLen-1] - decompressedSize = int64(numBlocks-1)*int64(BlockSize) + int64(lastBlockRawSize) - if DEBUG { - log.Printf("Decompressed size = %d", decompressedSize) - } - - return numBlocks, blockStarts, decompressedSize -} - -// Initializes decompressor with the block data specified. -func (d *Decompressor) initWithBlockData(c *Compression, in io.ReadSeeker, size int64, blockData []uint32) (err error) { - // Copy over compression object - d.c = c - - // Initialize cursor position - d.cursorPos = new(int64) - - // Parse the block data - d.numBlocks, d.blockStarts, d.decompressedSize = parseBlockData(blockData, d.c.BlockSize) - - // Initialize cursor position value and copy over reader - *d.cursorPos = 0 - _, err = in.Seek(0, io.SeekStart) - d.in = in - - return err -} - -// Read reads data using a decompressor -func (d Decompressor) Read(p []byte) (int, error) { - if DEBUG { - log.Printf("Cursor pos before: %d\n", *d.cursorPos) - } - // Check if we're at the end of the file or before the beginning of the file - if *d.cursorPos >= d.decompressedSize || *d.cursorPos < 0 { - if DEBUG { - log.Println("Out of bounds EOF") - } - return 0, io.EOF - } - - // Get block range to read - blockNumber := *d.cursorPos / int64(d.c.BlockSize) - blockStart := d.blockStarts[blockNumber] // Start position of blocks to read - dataOffset := *d.cursorPos % int64(d.c.BlockSize) // Offset of data to read in blocks to read - bytesToRead := len(p) // Number of bytes to read - blocksToRead := (int64(bytesToRead)+dataOffset)/int64(d.c.BlockSize) + 1 // Number of blocks to read - returnEOF := false - if blockNumber+blocksToRead > int64(d.numBlocks) { // Overflowed the last block - blocksToRead = int64(d.numBlocks) - blockNumber - returnEOF = true - } - blockEnd := d.blockStarts[blockNumber+blocksToRead] // Start of the block after the last block we want to get is the end of the last block we want to get - blockLen := blockEnd - blockStart - - // Read compressed block range into buffer - var compressedBlocks bytes.Buffer - _, err := d.in.Seek(blockStart, io.SeekStart) - if err != nil { - return 0, err - } - n1, err := io.CopyN(&compressedBlocks, d.in, blockLen) - if DEBUG { - log.Printf("block # = %d @ %d <- %d, len %d, copied %d bytes", blockNumber, blockStart, *d.cursorPos, blockLen, n1) - } - if err != nil { - if DEBUG { - log.Println("Copy Error") - } - return 0, err - } - - // Decompress block range - var b bytes.Buffer - n, err := d.decompressBlockRangeMultithreaded(&compressedBlocks, &b, uint32(blockNumber)) - if err != nil { - log.Println("Decompression error") - return n, err - } - - // Calculate bytes read - readOverflow := *d.cursorPos + int64(bytesToRead) - d.decompressedSize - if readOverflow < 0 { - readOverflow = 0 - } - bytesRead := int64(bytesToRead) - readOverflow - if DEBUG { - log.Printf("Read offset = %d, overflow = %d", dataOffset, readOverflow) - log.Printf("Decompressed %d bytes; read %d out of %d bytes\n", n, bytesRead, bytesToRead) - // log.Printf("%v", b.Bytes()) - } - - // If we read 0 bytes, we reached the end of the file - if 
bytesRead == 0 { - log.Println("EOF") - return 0, io.EOF - } - - // Copy from buffer+offset to p - _, err = io.CopyN(ioutil.Discard, &b, dataOffset) - if err != nil { - return 0, err - } - n, err = b.Read(p) // Note: everything after bytesToRead bytes will be discarded; we are returning bytesToRead instead of n - if err != nil { - return n, err - } - - // Increment cursor position and return - *d.cursorPos += bytesRead - if returnEOF { - if DEBUG { - log.Println("EOF") - } - return int(bytesRead), io.EOF - } - return int(bytesRead), nil -} - -// Seek seeks to a location in compressed stream -func (d Decompressor) Seek(offset int64, whence int) (int64, error) { - // Seek to offset in cursorPos - if whence == io.SeekStart { - *d.cursorPos = offset - } else if whence == io.SeekCurrent { - *d.cursorPos += offset - } else if whence == io.SeekEnd { - *d.cursorPos = d.decompressedSize + offset - } - - // Return - return offset, nil -} - -// DecompressFileExtData decompresses a file using external block data. Argument "size" is very useful here. -func (c *Compression) DecompressFileExtData(in io.ReadSeeker, size int64, blockData []uint32) (FileHandle io.ReadSeeker, decompressedSize int64, err error) { - var decompressor Decompressor - err = decompressor.initWithBlockData(c, in, size, blockData) - return decompressor, decompressor.decompressedSize, err -} diff --git a/backend/press/compression_test.go b/backend/press/compression_test.go deleted file mode 100644 index ff625985c..000000000 --- a/backend/press/compression_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package press - -import ( - "bufio" - "bytes" - "crypto/md5" - "encoding/base64" - "io" - "io/ioutil" - "math/rand" - "os" - "strings" - "testing" -) - -const TestStringSmall = "The quick brown fox jumps over the lazy dog." 
-const TestSizeLarge = 2097152 // 2 megabytes - -// Tests compression and decompression for a preset -func testCompressDecompress(t *testing.T, preset string, testString string) { - // Create compression instance - comp, err := NewCompressionPreset(preset) - if err != nil { - t.Fatal(err) - } - - // Open files and hashers - testFile := strings.NewReader(testString) - testFileHasher := md5.New() - if err != nil { - t.Fatal(err) - } - compressedFile, err := ioutil.TempFile(os.TempDir(), "rclone_compression_test") - if err != nil { - t.Fatal(err) - } - outHasher := md5.New() - - // Compress file and hash it (size doesn't matter here) - testFileReader, testFileWriter := io.Pipe() - go func() { - _, err := io.Copy(io.MultiWriter(testFileHasher, testFileWriter), testFile) - if err != nil { - t.Fatal("Failed to write compressed file") - } - err = testFileWriter.Close() - if err != nil { - t.Log("Failed to close compressed file") - } - }() - var blockData []uint32 - blockData, err = comp.CompressFileReturningBlockData(testFileReader, compressedFile) - if err != nil { - t.Fatalf("Compression failed with error: %v", err) - } - testFileHash := testFileHasher.Sum(nil) - - // Get the size, seek to the beginning of the compressed file - size, err := compressedFile.Seek(0, io.SeekEnd) - if err != nil { - t.Fatal(err) - } - _, err = compressedFile.Seek(0, io.SeekStart) - if err != nil { - t.Fatal(err) - } - t.Logf("Compressed size: %d\n", size) - - // Decompress file into a hasher - var FileHandle io.ReadSeeker - var decompressedSize int64 - FileHandle, decompressedSize, err = comp.DecompressFileExtData(compressedFile, size, blockData) - if err != nil { - t.Fatal(err) - } - t.Logf("Decompressed size: %d\n", decompressedSize) - bufr := bufio.NewReaderSize(FileHandle, 12345678) - _, err = io.Copy(outHasher, bufr) - if err != nil && err != io.EOF { - t.Fatal(err) - } - decompressedFileHash := outHasher.Sum(nil) - - // Clean up - err = compressedFile.Close() - if err != nil { - t.Log("Warning: cannot close compressed test file") - } - err = os.Remove(compressedFile.Name()) - if err != nil { - t.Log("Warning: cannot remove compressed test file") - } - - // Compare hashes - if !bytes.Equal(testFileHash, decompressedFileHash) { - t.Logf("Hash of original file: %x\n", testFileHash) - t.Logf("Hash of recovered file: %x\n", decompressedFileHash) - t.Fatal("Hashes do not match!") - } -} - -// Tests both small and large strings for a preset -func testSmallLarge(t *testing.T, preset string) { - testStringLarge := getCompressibleString(TestSizeLarge) - t.Run("TestSmall", func(t *testing.T) { - testCompressDecompress(t, preset, TestStringSmall) - }) - t.Run("TestLarge", func(t *testing.T) { - testCompressDecompress(t, preset, testStringLarge) - }) -} - -// Gets a compressible string -func getCompressibleString(size int) string { - // Get pseudorandom bytes - prbytes := make([]byte, size*3/4+16) - prsource := rand.New(rand.NewSource(0)) - prsource.Read(prbytes) - // Encode in base64 - encoding := base64.NewEncoding("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/") - return encoding.EncodeToString(prbytes)[:size] -} - -func TestCompression(t *testing.T) { - testCases := []string{"lz4", "gzip", "xz"} - for _, tc := range testCases { - t.Run(tc, func(t *testing.T) { - testSmallLarge(t, tc) - }) - } -} diff --git a/backend/press/press.go b/backend/press/press.go index 4045ebab4..3c42fbaf2 100644 --- a/backend/press/press.go +++ b/backend/press/press.go @@ -13,9 +13,11 @@ import ( "fmt" "io" "io/ioutil" + 
"regexp" "strings" "time" + "github.com/buengese/sgzip" "github.com/gabriel-vasile/mimetype" "github.com/pkg/errors" @@ -27,23 +29,36 @@ import ( "github.com/rclone/rclone/fs/fspath" "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/operations" - // Used for Rcat ) // Globals +const ( + initialChunkSize = 262144 // Initial and max sizes of chunks when reading parts of the file. Currently + maxChunkSize = 8388608 // at 256KB and 8 MB. + + bufferSize = 8388608 + heuristicBytes = 1048576 + + metaFileExt = ".meta" + uncompressedFileExt = ".bin" +) + +// Compression modes +const ( + Uncompressed = 0 + Gzip = 2 +) + +var unconpressibleRegexp = regexp.MustCompile("(^(video|image|audio)/.*)|(^.*?/(x-7z-compressed|zip|gzip|x-rar-compressed|zstd|x-xz|lzip|warc))") + // Register with Fs func init() { - // Build compression mode options. Show XZ options only if they're supported on the current system. - compressionModeOptions := []fs.OptionExample{{ // Default compression mode options - Value: "lz4", - Help: "Fast, real-time compression with reasonable compression ratios.", - }, { - Value: "gzip", - Help: "Standard gzip compression with fastest parameters.", - }, { - Value: "xz", - Help: "Standard xz compression with fastest parameters.", - }, + // Build compression mode options. + compressionModeOptions := []fs.OptionExample{ + { // Default compression mode options { + Value: "gzip", + Help: "Standard gzip compression with fastest parameters.", + }, } // Register our remote @@ -60,24 +75,33 @@ func init() { Help: "Compression mode.", Default: "gzip", Examples: compressionModeOptions, + }, { + Name: "compression_level", + Help: "gzip compression level -2 to 9", + Default: sgzip.DefaultCompression, + Advanced: true, }}, }) } -// Constants -const bufferSize = 8388608 // Size of buffer when compressing or decompressing the entire file. -// Larger size means more multithreading with larger block sizes and thread counts. -// Currently at 8MB. -const initialChunkSize = 262144 // Initial and max sizes of chunks when reading parts of the file. Currently -const maxChunkSize = 8388608 // at 256KB and 8 MB. +// Options defines the configuration for this backend +type Options struct { + Remote string `config:"remote"` + CompressionMode string `config:"compression_mode"` + CompressionLevel int `config:"compression_level"` +} -const metaFileExt = ".meta" -const uncompressedFileExt = ".bin" +/*** FILESYSTEM FUNCTIONS ***/ -// newCompressionForConfig constructs a Compression object for the given config name -func newCompressionForConfig(opt *Options) (*Compression, error) { - c, err := NewCompressionPreset(opt.CompressionMode) - return c, err +// Fs represents a wrapped fs.Fs +type Fs struct { + fs.Fs + wrapper fs.Fs + name string + root string + opt Options + mode int // compression mode id + features *fs.Features // optional features } // NewFs contstructs an Fs from the path, container:path @@ -99,16 +123,12 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) { return nil, errors.Wrapf(err, "failed to parse remote %q to wrap", remote) } - c, err := newCompressionForConfig(opt) - if err != nil { - return nil, err - } // Strip trailing slashes if they exist in rpath rpath = strings.TrimRight(rpath, "\\/") // First, check for a file // If a metadata file was found, return an error. 
Otherwise, check for a directory - remotePath := fspath.JoinRootPath(wPath, generateMetadataName(rpath)) + remotePath := fspath.JoinRootPath(wPath, makeMetadataName(rpath)) wrappedFs, err := wInfo.NewFs(wName, remotePath, wConfig) if err != fs.ErrorIsFile { remotePath = fspath.JoinRootPath(wPath, rpath) @@ -124,7 +144,7 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) { name: name, root: rpath, opt: *opt, - c: c, + mode: compressionModeFromName(opt.CompressionMode), } // the features here are ones we could support, and they are // ANDed with the ones from wrappedFs @@ -140,6 +160,7 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) { }).Fill(f).Mask(wrappedFs).WrapsFs(f, wrappedFs) // We support reading MIME types no matter the wrapped fs f.features.ReadMimeType = true + // We can only support putstream if we have serverside copy or move if wrappedFs.Features().Move == nil && wrappedFs.Features().Copy == nil { f.features.PutStream = nil } @@ -147,6 +168,15 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) { return f, err } +func compressionModeFromName(name string) int { + switch name { + case "gzip": + return Gzip + default: + return Uncompressed + } +} + // Converts an int64 to hex func int64ToHex(number int64) string { intBytes := make([]byte, 8) @@ -192,7 +222,7 @@ func processFileName(compressedFileName string) (origFileName string, extension } // Generates the file name for a metadata file -func generateMetadataName(remote string) (newRemote string) { +func makeMetadataName(remote string) (newRemote string) { return remote + metaFileExt } @@ -202,43 +232,20 @@ func isMetadataFile(filename string) bool { } // Generates the file name for a data file -func (c *Compression) generateDataName(remote string, size int64, compressed bool) (newRemote string) { - if compressed { - newRemote = remote + int64ToHex(size) + c.GetFileExtension() +func makeDataName(remote string, size int64, mode int) (newRemote string) { + if mode > 0 { + newRemote = remote + int64ToHex(size) + ".gz" } else { newRemote = remote + uncompressedFileExt } return newRemote } -// Generates the file name from a compression mode -func generateDataNameFromCompressionMode(remote string, size int64, mode int) (newRemote string) { - if mode != Uncompressed { - c, _ := NewCompressionPresetNumber(mode) - newRemote = c.generateDataName(remote, size, true) - } else { - newRemote = remote + uncompressedFileExt +func (f *Fs) dataName(remote string, size int64, compressed bool) (name string) { + if !compressed { + return makeDataName(remote, size, Uncompressed) } - return newRemote -} - -// Options defines the configuration for this backend -type Options struct { - Remote string `config:"remote"` - CompressionMode string `config:"compression_mode"` -} - -/*** FILESYSTEM FUNCTIONS ***/ - -// Fs represents a wrapped fs.Fs -type Fs struct { - fs.Fs - wrapper fs.Fs - name string - root string - opt Options - features *fs.Features // optional features - c *Compression + return makeDataName(remote, size, f.mode) } // Get an Object from a data DirEntry @@ -251,7 +258,7 @@ func (f *Fs) addData(entries *fs.DirEntries, o fs.Object) { if size == -2 { // File is uncompressed size = o.Size() } - metaName := generateMetadataName(origFileName) + metaName := makeMetadataName(origFileName) *entries = append(*entries, f.newObjectSizeAndNameOnly(o, metaName, size)) } @@ -271,9 +278,6 @@ func (f *Fs) processEntries(entries fs.DirEntries) (newEntries fs.DirEntries, er for _, entry := range entries { 
switch x := entry.(type) { case fs.Object: - // if isMetadataFile(x.Remote()) { - // f.addMeta(&newEntries, x) // Only care about metadata files; non-metadata files are redundant. - // } if !isMetadataFile(x.Remote()) { f.addData(&newEntries, x) // Only care about data files for now; metadata files are redundant. } @@ -333,7 +337,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( // NewObject finds the Object at remote. func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { // Read metadata from metadata object - mo, err := f.Fs.NewObject(ctx, generateMetadataName(remote)) + mo, err := f.Fs.NewObject(ctx, makeMetadataName(remote)) if err != nil { return nil, err } @@ -342,34 +346,32 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { return nil, errors.New("error decoding metadata") } // Create our Object - o, err := f.Fs.NewObject(ctx, generateDataNameFromCompressionMode(remote, meta.Size, meta.CompressionMode)) + o, err := f.Fs.NewObject(ctx, makeDataName(remote, meta.Size, meta.Mode)) return f.newObject(o, mo, meta), err } -// Checks the compressibility and mime type of a file. Returns a rewinded reader, whether the file is compressible, and an error code. -func (c *Compression) checkFileCompressibilityAndType(in io.Reader) (newReader io.Reader, compressible bool, mimeType string, err error) { - // Unwrap accounting, get compressibility of file, rewind reader, then wrap accounting back on +// findMimeType attempts to find the mime type of the object so we can determine compressibility +// returns a multireader with the bytes that were read to determine mime type +func findMimeType(in io.Reader) (newReader io.Reader, mimeType string, err error) { in, wrap := accounting.UnWrap(in) var b bytes.Buffer - _, err = io.CopyN(&b, in, c.HeuristicBytes) + _, err = io.CopyN(&b, in, heuristicBytes) if err != nil && err != io.EOF { - return nil, false, "", err - } - compressible, _, err = c.GetFileCompressionInfo(bytes.NewReader(b.Bytes())) - if err != nil { - return nil, false, "", err + return nil, "", err } mime := mimetype.Detect(b.Bytes()) in = io.MultiReader(bytes.NewReader(b.Bytes()), in) - in = wrap(in) - return in, compressible, mime.String(), nil + return wrap(in), mime.String(), nil +} + +func isCompressible(mime string) bool { + return !unconpressibleRegexp.MatchString(mime) } // Verifies an object hash -func (f *Fs) verifyObjectHash(ctx context.Context, o fs.Object, hasher *hash.MultiHasher, ht hash.Type) (err error) { +func (f *Fs) verifyObjectHash(ctx context.Context, o fs.Object, hasher *hash.MultiHasher, ht hash.Type) error { srcHash := hasher.Sums()[ht] - var dstHash string - dstHash, err = o.Hash(ctx, ht) + dstHash, err := o.Hash(ctx, ht) if err != nil { return errors.Wrap(err, "failed to read destination hash") } @@ -386,9 +388,9 @@ func (f *Fs) verifyObjectHash(ctx context.Context, o fs.Object, hasher *hash.Mul type putFn func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) -type blockDataAndError struct { - err error - blockData []uint32 +type compressionResult struct { + err error + meta sgzip.GzipMetadata } // Put a compressed version of a file. Returns a wrappable object and metadata. 
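// A minimal sketch (illustrative names only) of the pipe-based pattern the new
// putCompress below relies on: one goroutine streams the source through an sgzip
// writer into an io.Pipe while the caller consumes the read end, and the goroutine
// reports its error plus the sgzip.GzipMetadata needed later for seeking. It uses
// only the io and sgzip imports already present in press.go.
func compressToPipe(in io.Reader, level int) (io.Reader, <-chan compressionResult) {
	pr, pw := io.Pipe()
	results := make(chan compressionResult, 1)
	go func() {
		defer func() { _ = pw.Close() }() // always unblock the reader
		gz, err := sgzip.NewWriterLevel(pw, level)
		if err != nil {
			results <- compressionResult{err: err}
			return
		}
		_, err = io.Copy(gz, in) // compress everything into the pipe
		if closeErr := gz.Close(); err == nil {
			err = closeErr // flushing the gzip trailer can also fail
		}
		results <- compressionResult{err: err, meta: gz.MetaData()}
	}()
	return pr, results
}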
@@ -401,21 +403,29 @@ func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, o in = io.TeeReader(in, metaHasher) // Compress the file - var wrappedIn io.Reader pipeReader, pipeWriter := io.Pipe() - compressionResult := make(chan blockDataAndError) + results := make(chan compressionResult) go func() { - blockData, err := f.c.CompressFileReturningBlockData(in, pipeWriter) - closeErr := pipeWriter.Close() - if closeErr != nil { - fs.Errorf(nil, "Failed to close compression pipe: %v", err) + gz, err := sgzip.NewWriterLevel(pipeWriter, f.opt.CompressionLevel) + if err != nil { + results <- compressionResult{err: err, meta: sgzip.GzipMetadata{}} + return + } + _, err = io.Copy(gz, in) + gzErr := gz.Close() + if gzErr != nil { + fs.Errorf(nil, "Failed to close compress: %v", gzErr) if err == nil { - err = closeErr + err = gzErr } } - compressionResult <- blockDataAndError{err: err, blockData: blockData} + closeErr := pipeWriter.Close() + if closeErr != nil { + fs.Errorf(nil, "Failed to close pipe: %v", closeErr) + } + results <- compressionResult{err: err, meta: gz.MetaData()} }() - wrappedIn = wrap(bufio.NewReaderSize(pipeReader, bufferSize)) // Bufio required for multithreading + wrappedIn := wrap(bufio.NewReaderSize(pipeReader, bufferSize)) // Probably no longer needed as sgzip has it's own buffering // If verifyCompressedObject is on, find a hash the destination supports to compute a hash of // the compressed data. @@ -435,8 +445,7 @@ func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, o } // Transfer the data - //o, err := put(ctx, wrappedIn, f.wrapInfo(src, f.c.generateDataName(src.Remote(), src.Size(), true), src.Size()), options...) - o, err := operations.Rcat(ctx, f.Fs, f.c.generateDataName(src.Remote(), src.Size(), true), ioutil.NopCloser(wrappedIn), src.ModTime(ctx)) + o, err := operations.Rcat(ctx, f.Fs, makeDataName(src.Remote(), src.Size(), f.mode), ioutil.NopCloser(wrappedIn), src.ModTime(ctx)) if err != nil { if o != nil { removeErr := o.Remove(ctx) @@ -447,7 +456,7 @@ func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, o return nil, nil, err } // Check whether we got an error during compression - result := <-compressionResult + result := <-results err = result.err if err != nil { if o != nil { @@ -460,13 +469,11 @@ func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, o } // Generate metadata - blockData := result.blockData - _, _, decompressedSize := parseBlockData(blockData, f.c.BlockSize) - meta := newMetadata(decompressedSize, f.c.CompressionMode, blockData, metaHasher.Sum(nil), mimeType) + meta := newMetadata(result.meta.Size, f.mode, result.meta, metaHasher.Sum(nil), mimeType) // Check the hashes of the compressed data if we were comparing them if ht != hash.None && hasher != nil { - err := f.verifyObjectHash(ctx, o, hasher, ht) + err = f.verifyObjectHash(ctx, o, hasher, ht) if err != nil { return nil, nil, err } @@ -479,28 +486,23 @@ func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, o func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string, verifyCompressedObject bool) (fs.Object, *ObjectMetadata, error) { // Unwrap the accounting, add our metadata hasher, then wrap it back on in, wrap := accounting.UnWrap(in) - metaHasher := md5.New() + + hs := hash.NewHashSet(hash.MD5) + ht := f.Fs.Hashes().GetOne() + if verifyCompressedObject { + if !hs.Contains(ht) { + hs.Add(ht) 
+ } + } + metaHasher, err := hash.NewMultiHasherTypes(hs) + if err != nil { + return nil, nil, err + } in = io.TeeReader(in, metaHasher) wrappedIn := wrap(in) - // If verifyCompressedObject is on, find a hash the destination supports to compute a hash of - // the compressed data. - ht := f.Fs.Hashes().GetOne() - var hasher *hash.MultiHasher - var err error - if ht != hash.None && verifyCompressedObject { - // unwrap the accounting again - wrappedIn, wrap = accounting.UnWrap(wrappedIn) - hasher, err = hash.NewMultiHasherTypes(hash.NewHashSet(ht)) - if err != nil { - return nil, nil, err - } - // add the hasher and re-wrap the accounting - wrappedIn = io.TeeReader(wrappedIn, hasher) - wrappedIn = wrap(wrappedIn) - } + // Put the object - o, err := put(ctx, wrappedIn, f.wrapInfo(src, f.c.generateDataName(src.Remote(), src.Size(), false), src.Size()), options...) - //o, err := operations.Rcat(f, f.c.generateDataName(src.Remote(), src.Size(), false), wrappedIn, src.ModTime()) + o, err := put(ctx, wrappedIn, f.wrapInfo(src, makeDataName(src.Remote(), src.Size(), Uncompressed), src.Size()), options...) if err != nil { if o != nil { removeErr := o.Remove(ctx) @@ -511,14 +513,19 @@ func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, return nil, nil, err } // Check the hashes of the compressed data if we were comparing them - if ht != hash.None && hasher != nil { - err := f.verifyObjectHash(ctx, o, hasher, ht) + if ht != hash.None && verifyCompressedObject { + err := f.verifyObjectHash(ctx, o, metaHasher, ht) if err != nil { return nil, nil, err } } + // Return our object and metadata - return o, newMetadata(o.Size(), Uncompressed, []uint32{}, metaHasher.Sum([]byte{}), mimeType), nil + sum, err := metaHasher.Sum(hash.MD5) + if err != nil { + return nil, nil, err + } + return o, newMetadata(o.Size(), Uncompressed, sgzip.GzipMetadata{}, sum, mimeType), nil } // This function will write a metadata struct to a metadata Object for an src. Returns a wrappable metadata object. @@ -538,7 +545,7 @@ func (f *Fs) putMetadata(ctx context.Context, meta *ObjectMetadata, src fs.Objec metaReader := bytes.NewReader(b.Bytes()) // Put the data - mo, err = put(ctx, metaReader, f.wrapInfo(src, generateMetadataName(src.Remote()), int64(b.Len())), options...) + mo, err = put(ctx, metaReader, f.wrapInfo(src, makeMetadataName(src.Remote()), int64(b.Len())), options...) if err != nil { removeErr := mo.Remove(ctx) if removeErr != nil { @@ -592,11 +599,11 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options . 
o, err := f.NewObject(ctx, src.Remote()) if err == fs.ErrorObjectNotFound { // Get our file compressibility - in, compressible, mimeType, err := f.c.checkFileCompressibilityAndType(in) + in, mimeType, err := findMimeType(in) if err != nil { return nil, err } - return f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Put, f.Fs.Put, compressible, mimeType, true) + return f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Put, f.Fs.Put, isCompressible(mimeType), mimeType, true) } if err != nil { return nil, err @@ -612,10 +619,11 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt } found := err == nil - in, compressible, mimeType, err := f.c.checkFileCompressibilityAndType(in) + in, mimeType, err := findMimeType(in) if err != nil { return nil, err } + compressible := isCompressible(mimeType) newObj, err := f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Features().PutStream, f.Fs.Put, compressible, mimeType, true) if err != nil { return nil, err @@ -633,16 +641,18 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt moveFs, ok := f.Fs.(fs.Mover) var wrapObj fs.Object if ok { - wrapObj, err = moveFs.Move(ctx, newObj.Object, f.c.generateDataName(src.Remote(), newObj.size, compressible)) + wrapObj, err = moveFs.Move(ctx, newObj.Object, f.dataName(src.Remote(), newObj.size, compressible)) if err != nil { return nil, errors.Wrap(err, "Couldn't rename streamed object.") } + newObj.Object = wrapObj + return newObj, nil } // If we don't have move we'll need to resort to serverside copy and remove copyFs, ok := f.Fs.(fs.Copier) if ok { - wrapObj, err := copyFs.Copy(ctx, newObj.Object, f.c.generateDataName(src.Remote(), newObj.size, compressible)) + wrapObj, err := copyFs.Copy(ctx, newObj.Object, f.dataName(src.Remote(), newObj.size, compressible)) if err != nil { return nil, errors.Wrap(err, "Could't copy streamed object.") } @@ -652,9 +662,7 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt return wrapObj, errors.Wrap(err, "Couldn't remove original streamed object. Remote may be in an incositent state.") } } - newObj.Object = wrapObj - return newObj, nil } @@ -665,15 +673,6 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt // // This will create a duplicate if we upload a new file without // checking to see if there is one already - use Put() for that. -/*func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { - // If PutUnchecked is supported, do it. - // I'm unsure about this. With the current metadata model this might actually break things. Needs some manual testing. - do := f.Fs.Features().PutUnchecked - if do == nil { - return nil, errors.New("can't PutUnchecked") - } - return f.putWithCustomFunctions(ctx, in, src, options, do, do, false) -}*/ // Hashes returns the supported hash sets. 
func (f *Fs) Hashes() hash.Set { @@ -744,13 +743,13 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, if err != nil { return nil, err } - newFilename := generateMetadataName(remote) + newFilename := makeMetadataName(remote) moResult, err := do(ctx, o.mo, newFilename) if err != nil { return nil, err } // Copy over data - newFilename = generateDataNameFromCompressionMode(remote, src.Size(), o.meta.CompressionMode) + newFilename = makeDataName(remote, src.Size(), o.meta.Mode) oResult, err := do(ctx, o.Object, newFilename) if err != nil { return nil, err @@ -794,14 +793,14 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, if err != nil { return nil, err } - newFilename := generateMetadataName(remote) + newFilename := makeMetadataName(remote) moResult, err := do(ctx, o.mo, newFilename) if err != nil { return nil, err } // Move data - newFilename = generateDataNameFromCompressionMode(remote, src.Size(), o.meta.CompressionMode) + newFilename = makeDataName(remote, src.Size(), o.meta.Mode) oResult, err := do(ctx, o.Object, newFilename) if err != nil { return nil, err @@ -908,7 +907,7 @@ func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryT case fs.EntryObject: // Note: All we really need to do to monitor the object is to check whether the metadata changed, // as the metadata contains the hash. This will work unless there's a hash collision and the sizes stay the same. - wrappedPath = generateMetadataName(path) + wrappedPath = makeMetadataName(path) default: fs.Errorf(path, "press ChangeNotify: ignoring unknown EntryType %d", entryType) return @@ -936,11 +935,11 @@ func (f *Fs) PublicLink(ctx context.Context, remote string, duration fs.Duration // ObjectMetadata describes the metadata for an Object. type ObjectMetadata struct { - Size int64 // Uncompressed size of the file. - CompressionMode int // Compression mode of the file. - BlockData []uint32 // Block indexing data for the file. - Hash []byte // MD5 hash of the file. - MimeType string // Mime type of the file + Size int64 // Uncompressed size of the file. + Mode int // Compression mode of the file. + Hash []byte // MD5 hash of the file. + MimeType string // Mime type of the file + CompressionMetadata sgzip.GzipMetadata } // Object with external metadata @@ -954,11 +953,11 @@ type Object struct { } // This function generates a metadata object -func newMetadata(size int64, compressionMode int, blockData []uint32, hash []byte, mimeType string) *ObjectMetadata { +func newMetadata(size int64, mode int, cmeta sgzip.GzipMetadata, hash []byte, mimeType string) *ObjectMetadata { meta := new(ObjectMetadata) meta.Size = size - meta.CompressionMode = compressionMode - meta.BlockData = blockData + meta.Mode = mode + meta.CompressionMetadata = cmeta meta.Hash = hash meta.MimeType = mimeType return meta @@ -1042,35 +1041,20 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op return o.mo, o.mo.Update(ctx, in, src, options...) 
} - // Get our file compressibility - in, compressible, mimeType, err := o.f.c.checkFileCompressibilityAndType(in) + in, mimeType, err := findMimeType(in) if err != nil { return err } + compressible := isCompressible(mimeType) - // Since we're encoding the original filesize in the name we'll need to make sure that this name is updated before the actual update + // Since we are storing the filesize in the name the new object may have different name than the old + // We'll make sure to delete the old object in this case var newObject *Object origName := o.Remote() - if o.meta.CompressionMode != Uncompressed || compressible { - // If we aren't, we must either move-then-update or reupload-then-remove the object, and update the metadata. - // Check if this FS supports moving - moveFs, ok := o.f.Fs.(fs.Mover) - if ok { // If this fs supports moving, use move-then-update. This may help keep some versioning alive. - // First, move the object - var movedObject fs.Object - movedObject, err = moveFs.Move(ctx, o.Object, o.f.c.generateDataName(o.Remote(), src.Size(), compressible)) - if err != nil { - return err - } - // Create function that updates moved object, then update - update := func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { - return movedObject, movedObject.Update(ctx, in, src, options...) - } - newObject, err = o.f.putWithCustomFunctions(ctx, in, src, options, update, updateMeta, compressible, mimeType, true) - } else { // If this fs does not support moving, fall back to reuploading the object then removing the old one. - newObject, err = o.f.putWithCustomFunctions(ctx, in, o.f.wrapInfo(src, origName, src.Size()), options, o.f.Fs.Put, updateMeta, compressible, mimeType, true) - removeErr := o.Object.Remove(ctx) // Note: We must do remove later so a failed update doesn't destroy data. - if removeErr != nil { + if o.meta.Mode != Uncompressed || compressible { + newObject, err = o.f.putWithCustomFunctions(ctx, in, o.f.wrapInfo(src, origName, src.Size()), options, o.f.Fs.Put, updateMeta, compressible, mimeType, true) + if newObject.Object.Remote() != o.Object.Remote() { + if removeErr := o.Object.Remove(ctx); removeErr != nil { return removeErr } } @@ -1220,7 +1204,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (rc io.Read return nil, err } // If we're uncompressed, just pass this to the underlying object - if o.meta.CompressionMode == Uncompressed { + if o.meta.Mode == Uncompressed { return o.Object.Open(ctx, options...) } // Get offset and limit from OpenOptions, pass the rest to the underlying remote @@ -1239,27 +1223,21 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (rc io.Read // Get a chunkedreader for the wrapped object chunkedReader := chunkedreader.New(ctx, o.Object, initialChunkSize, maxChunkSize) // Get file handle - c, err := NewCompressionPresetNumber(o.meta.CompressionMode) + var file io.Reader + if offset != 0 { + file, err = sgzip.NewReaderAt(chunkedReader, &o.meta.CompressionMetadata, offset) + } else { + file, err = sgzip.NewReader(chunkedReader) + } if err != nil { return nil, err } - FileHandle, _, err := c.DecompressFileExtData(chunkedReader, o.Object.Size(), o.meta.BlockData) - if err != nil { - return nil, err - } - // Seek and limit according to the options given - // Note: This if statement is not required anymore because all 0-size files will be uncompressed. I'm leaving this here just in case I come back here debugging. 
- if offset != 0 { // Note: this if statement is only required because seeking to 0 on a 0-size file makes chunkedReader complain about an "invalid seek position". - _, err = FileHandle.Seek(offset, io.SeekStart) - if err != nil { - return nil, err - } - } + var fileReader io.Reader if limit != -1 { - fileReader = io.LimitReader(FileHandle, limit) + fileReader = io.LimitReader(file, limit) } else { - fileReader = FileHandle + fileReader = file } // Return a ReadCloser return combineReaderAndCloser(fileReader, chunkedReader), nil diff --git a/backend/press/press_test.go b/backend/press/press_test.go index 1f40d0498..eb0fdc373 100644 --- a/backend/press/press_test.go +++ b/backend/press/press_test.go @@ -6,6 +6,7 @@ import ( "path/filepath" "testing" + _ "github.com/rclone/rclone/backend/dropbox" _ "github.com/rclone/rclone/backend/local" "github.com/rclone/rclone/fstest" "github.com/rclone/rclone/fstest/fstests" @@ -35,37 +36,6 @@ func TestIntegration(t *testing.T) { }) } -// TestRemoteLz4 tests LZ4 compression -func TestRemoteLz4(t *testing.T) { - if *fstest.RemoteName != "" { - t.Skip("Skipping as -remote set") - } - tempdir := filepath.Join(os.TempDir(), "rclone-press-test-lz4") - name := "TestPressLz4" - fstests.Run(t, &fstests.Opt{ - RemoteName: name + ":", - NilObject: (*Object)(nil), - UnimplementableFsMethods: []string{ - "OpenWriterAt", - "MergeDirs", - "DirCacheFlush", - "PutUnchecked", - "PutStream", - "UserInfo", - "Disconnect", - }, - UnimplementableObjectMethods: []string{ - "GetTier", - "SetTier", - }, - ExtraConfig: []fstests.ExtraConfigItem{ - {Name: name, Key: "type", Value: "press"}, - {Name: name, Key: "remote", Value: tempdir}, - {Name: name, Key: "compression_mode", Value: "lz4"}, - }, - }) -} - // TestRemoteGzip tests GZIP compression func TestRemoteGzip(t *testing.T) { if *fstest.RemoteName != "" { @@ -96,34 +66,3 @@ func TestRemoteGzip(t *testing.T) { }, }) } - -// TestRemoteXz tests XZ compression -func TestRemoteXz(t *testing.T) { - if *fstest.RemoteName != "" { - t.Skip("Skipping as -remote set") - } - tempdir := filepath.Join(os.TempDir(), "rclone-press-test-xz") - name := "TestPressXz" - fstests.Run(t, &fstests.Opt{ - RemoteName: name + ":", - NilObject: (*Object)(nil), - UnimplementableFsMethods: []string{ - "OpenWriterAt", - "MergeDirs", - "DirCacheFlush", - "PutUnchecked", - "PutStream", - "UserInfo", - "Disconnect", - }, - UnimplementableObjectMethods: []string{ - "GetTier", - "SetTier", - }, - ExtraConfig: []fstests.ExtraConfigItem{ - {Name: name, Key: "type", Value: "press"}, - {Name: name, Key: "remote", Value: tempdir}, - {Name: name, Key: "compression_mode", Value: "xz"}, - }, - }) -}
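// For reference, a minimal sketch of the read path this rewrite enables: the
// sgzip.GzipMetadata captured at upload time lets decompression start at an
// arbitrary offset instead of from the top of the stream. As in Open above, the
// source is assumed to satisfy io.ReadSeeker (the chunked reader does); openAt is
// an illustrative helper, not part of the backend, and uses only the io and sgzip
// imports already present in press.go.
func openAt(in io.ReadSeeker, meta *sgzip.GzipMetadata, offset, limit int64) (io.Reader, error) {
	var file io.Reader
	var err error
	if offset != 0 {
		// Resume mid-stream using the index stored in the metadata object.
		file, err = sgzip.NewReaderAt(in, meta, offset)
	} else {
		// Plain sequential read; no index needed.
		file, err = sgzip.NewReader(in)
	}
	if err != nil {
		return nil, err
	}
	if limit != -1 {
		// Honour a range request by capping the decompressed output.
		return io.LimitReader(file, limit), nil
	}
	return file, nil
}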