feat: add option to disable combining the pending part

Signed-off-by: Libo Huang <huanglibo2010@gmail.com>
pull/3556/head
libo.huang 2021-12-31 14:13:16 +08:00 committed by Libo Huang
parent 77f2180378
commit 117757a5cb
2 changed files with 28 additions and 1 deletions

View File

@ -97,6 +97,7 @@ type DriverParameters struct {
MultipartCopyChunkSize int64
MultipartCopyMaxConcurrency int64
MultipartCopyThresholdSize int64
MultipartCombineSmallPart bool
RootDirectory string
StorageClass string
UserAgent string
@ -146,6 +147,7 @@ type driver struct {
MultipartCopyChunkSize int64
MultipartCopyMaxConcurrency int64
MultipartCopyThresholdSize int64
MultipartCombineSmallPart bool
RootDirectory string
StorageClass string
ObjectACL string
@ -356,6 +358,23 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
return nil, fmt.Errorf("the useDualStack parameter should be a boolean")
}
mutlipartCombineSmallPart := true
combine := parameters["multipartcombinesmallpart"]
switch combine := combine.(type) {
case string:
b, err := strconv.ParseBool(combine)
if err != nil {
return nil, fmt.Errorf("the multipartcombinesmallpart parameter should be a boolean")
}
mutlipartCombineSmallPart = b
case bool:
mutlipartCombineSmallPart = combine
case nil:
// do nothing
default:
return nil, fmt.Errorf("the multipartcombinesmallpart parameter should be a boolean")
}
sessionToken := ""
params := DriverParameters{
@ -373,6 +392,7 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
multipartCopyChunkSize,
multipartCopyMaxConcurrency,
multipartCopyThresholdSize,
mutlipartCombineSmallPart,
fmt.Sprint(rootDirectory),
storageClass,
fmt.Sprint(userAgent),
@ -497,6 +517,7 @@ func New(params DriverParameters) (*Driver, error) {
MultipartCopyChunkSize: params.MultipartCopyChunkSize,
MultipartCopyMaxConcurrency: params.MultipartCopyMaxConcurrency,
MultipartCopyThresholdSize: params.MultipartCopyThresholdSize,
MultipartCombineSmallPart: params.MultipartCombineSmallPart,
RootDirectory: params.RootDirectory,
StorageClass: params.StorageClass,
ObjectACL: params.ObjectACL,
@ -1386,7 +1407,7 @@ func (w *writer) flushPart() error {
// nothing to write
return nil
}
if len(w.pendingPart) < int(w.driver.ChunkSize) {
if w.driver.MultipartCombineSmallPart && len(w.pendingPart) < int(w.driver.ChunkSize) {
// closing with a small pending part
// combine ready and pending to avoid writing a small part
w.readyPart = append(w.readyPart, w.pendingPart...)

View File

@ -43,6 +43,7 @@ func init() {
regionEndpoint := os.Getenv("REGION_ENDPOINT")
sessionToken := os.Getenv("AWS_SESSION_TOKEN")
useDualStack := os.Getenv("S3_USE_DUALSTACK")
combineSmallPart := os.Getenv("MULTIPART_COMBINE_SMALL_PART")
if err != nil {
panic(err)
}
@ -84,6 +85,10 @@ func init() {
useDualStackBool := false
if useDualStack != "" {
useDualStackBool, err = strconv.ParseBool(useDualStack)
}
multipartCombineSmallPart := true
if combineSmallPart != "" {
multipartCombineSmallPart, err = strconv.ParseBool(combineSmallPart)
if err != nil {
return nil, err
}
@ -104,6 +109,7 @@ func init() {
defaultMultipartCopyChunkSize,
defaultMultipartCopyMaxConcurrency,
defaultMultipartCopyThresholdSize,
multipartCombineSmallPart,
rootDirectory,
storageClass,
driverName + "-test",