trying to capture segments info during upload to swift backend and
delete if there is error duing upload object.
This commit is contained in:
parent
9f4589a997
commit
ca324b5084
1 changed files with 26 additions and 0 deletions
|
@ -1127,6 +1127,7 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
|
||||||
uniquePrefix := fmt.Sprintf("%s/%d", swift.TimeToFloatString(time.Now()), size)
|
uniquePrefix := fmt.Sprintf("%s/%d", swift.TimeToFloatString(time.Now()), size)
|
||||||
segmentsPath := fmt.Sprintf("%s%s/%s", o.fs.root, o.remote, uniquePrefix)
|
segmentsPath := fmt.Sprintf("%s%s/%s", o.fs.root, o.remote, uniquePrefix)
|
||||||
in := bufio.NewReader(in0)
|
in := bufio.NewReader(in0)
|
||||||
|
segmentInfos := make([]string, 0, ((size / int64(o.fs.opt.ChunkSize)) + 1))
|
||||||
for {
|
for {
|
||||||
// can we read at least one byte?
|
// can we read at least one byte?
|
||||||
if _, err := in.Peek(1); err != nil {
|
if _, err := in.Peek(1); err != nil {
|
||||||
|
@ -1148,9 +1149,22 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
|
||||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||||
var rxHeaders swift.Headers
|
var rxHeaders swift.Headers
|
||||||
rxHeaders, err = o.fs.c.ObjectPut(o.fs.segmentsContainer, segmentPath, segmentReader, true, "", "", headers)
|
rxHeaders, err = o.fs.c.ObjectPut(o.fs.segmentsContainer, segmentPath, segmentReader, true, "", "", headers)
|
||||||
|
if err == nil {
|
||||||
|
segmentInfos = append(segmentInfos, segmentPath)
|
||||||
|
}
|
||||||
return shouldRetryHeaders(rxHeaders, err)
|
return shouldRetryHeaders(rxHeaders, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if len(segmentInfos) > 0 {
|
||||||
|
for _, v := range segmentInfos {
|
||||||
|
fs.Debugf(o, "Delete segment file %q on %q", v, o.fs.segmentsContainer)
|
||||||
|
e := o.fs.c.ObjectDelete(o.fs.segmentsContainer, v)
|
||||||
|
if e != nil {
|
||||||
|
fs.Errorf(o, "Error occured in delete segment file %q on %q , error: %q", v, o.fs.segmentsContainer, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
segmentInfos = nil
|
||||||
|
}
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
i++
|
i++
|
||||||
|
@ -1165,6 +1179,18 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
|
||||||
rxHeaders, err = o.fs.c.ObjectPut(o.fs.container, manifestName, emptyReader, true, "", contentType, headers)
|
rxHeaders, err = o.fs.c.ObjectPut(o.fs.container, manifestName, emptyReader, true, "", contentType, headers)
|
||||||
return shouldRetryHeaders(rxHeaders, err)
|
return shouldRetryHeaders(rxHeaders, err)
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
if segmentInfos != nil && len(segmentInfos) > 0 {
|
||||||
|
for _, v := range segmentInfos {
|
||||||
|
fs.Debugf(o, "Delete segment file %q on %q", v, o.fs.segmentsContainer)
|
||||||
|
e := o.fs.c.ObjectDelete(o.fs.segmentsContainer, v)
|
||||||
|
if e != nil {
|
||||||
|
fs.Errorf(o, "Error occured in delete segment file %q on %q , error: %q", v, o.fs.segmentsContainer, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
segmentInfos = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
return uniquePrefix + "/", err
|
return uniquePrefix + "/", err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue