fs/accounting: use transferMap instead of stringSet

This commit is contained in:
Max Sum 2020-06-14 02:37:42 +08:00 committed by Nick Craig-Wood
parent e2201689cf
commit e2183ad661
3 changed files with 131 additions and 135 deletions

View file

@ -26,11 +26,11 @@ type StatsInfo struct {
retryError bool retryError bool
retryAfter time.Time retryAfter time.Time
checks int64 checks int64
checking *stringSet checking *transferMap
checkQueue int checkQueue int
checkQueueSize int64 checkQueueSize int64
transfers int64 transfers int64
transferring *stringSet transferring *transferMap
transferQueue int transferQueue int
transferQueueSize int64 transferQueueSize int64
renames int64 renames int64
@ -47,8 +47,8 @@ type StatsInfo struct {
// NewStats creates an initialised StatsInfo // NewStats creates an initialised StatsInfo
func NewStats() *StatsInfo { func NewStats() *StatsInfo {
return &StatsInfo{ return &StatsInfo{
checking: newStringSet(fs.Config.Checkers, "checking"), checking: newTransferMap(fs.Config.Checkers, "checking"),
transferring: newStringSet(fs.Config.Transfers, "transferring"), transferring: newTransferMap(fs.Config.Transfers, "transferring"),
inProgress: newInProgress(), inProgress: newInProgress(),
} }
} }
@ -81,11 +81,11 @@ func (s *StatsInfo) RemoteStats() (out rc.Params, err error) {
s.transferring.mu.RLock() s.transferring.mu.RLock()
var t []rc.Params var t []rc.Params
for name := range s.transferring.items { for name, tr := range s.transferring.items {
if acc := s.inProgress.get(name); acc != nil { if acc := s.inProgress.get(name); acc != nil {
t = append(t, acc.RemoteStats()) t = append(t, acc.RemoteStats())
} else { } else {
t = append(t, s.transferRemoteStats(name)) t = append(t, s.transferRemoteStats(tr))
} }
} }
out["transferring"] = t out["transferring"] = t
@ -108,18 +108,11 @@ func (s *StatsInfo) Speed() float64 {
return speed return speed
} }
func (s *StatsInfo) transferRemoteStats(name string) rc.Params { func (s *StatsInfo) transferRemoteStats(tr *Transfer) rc.Params {
s.mu.RLock()
defer s.mu.RUnlock()
for _, tr := range s.startedTransfers {
if tr.remote == name {
return rc.Params{ return rc.Params{
"name": name, "name": tr.remote,
"size": tr.size, "size": tr.size,
} }
}
}
return rc.Params{"name": name}
} }
// timeRange is a start and end time of a transfer // timeRange is a start and end time of a transfer
@ -558,8 +551,9 @@ func (s *StatsInfo) RetryAfter() time.Time {
// NewCheckingTransfer adds a checking transfer to the stats, from the object. // NewCheckingTransfer adds a checking transfer to the stats, from the object.
func (s *StatsInfo) NewCheckingTransfer(obj fs.Object) *Transfer { func (s *StatsInfo) NewCheckingTransfer(obj fs.Object) *Transfer {
s.checking.add(obj.Remote()) tr := newCheckingTransfer(s, obj)
return newCheckingTransfer(s, obj) s.checking.add(tr)
return tr
} }
// DoneChecking removes a check from the stats // DoneChecking removes a check from the stats
@ -579,14 +573,16 @@ func (s *StatsInfo) GetTransfers() int64 {
// NewTransfer adds a transfer to the stats from the object. // NewTransfer adds a transfer to the stats from the object.
func (s *StatsInfo) NewTransfer(obj fs.Object) *Transfer { func (s *StatsInfo) NewTransfer(obj fs.Object) *Transfer {
s.transferring.add(obj.Remote()) tr := newTransfer(s, obj)
return newTransfer(s, obj) s.transferring.add(tr)
return tr
} }
// NewTransferRemoteSize adds a transfer to the stats based on remote and size. // NewTransferRemoteSize adds a transfer to the stats based on remote and size.
func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64) *Transfer { func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64) *Transfer {
s.transferring.add(remote) tr := newTransferRemoteSize(s, remote, size, false)
return newTransferRemoteSize(s, remote, size, false) s.transferring.add(tr)
return tr
} }
// DoneTransferring removes a transfer from the stats // DoneTransferring removes a transfer from the stats

View file

@ -1,112 +0,0 @@
package accounting
import (
"fmt"
"sort"
"strings"
"sync"
"github.com/rclone/rclone/fs"
)
// stringSet holds a set of strings
type stringSet struct {
mu sync.RWMutex
items map[string]struct{}
name string
}
// newStringSet creates a new empty string set of capacity size
func newStringSet(size int, name string) *stringSet {
return &stringSet{
items: make(map[string]struct{}, size),
name: name,
}
}
// add adds remote to the set
func (ss *stringSet) add(remote string) {
ss.mu.Lock()
ss.items[remote] = struct{}{}
ss.mu.Unlock()
}
// del removes remote from the set
func (ss *stringSet) del(remote string) {
ss.mu.Lock()
delete(ss.items, remote)
ss.mu.Unlock()
}
// merge adds items from another set
func (ss *stringSet) merge(m *stringSet) {
ss.mu.Lock()
m.mu.Lock()
for item := range m.items {
ss.items[item] = struct{}{}
}
m.mu.Unlock()
ss.mu.Unlock()
}
// empty returns whether the set has any items
func (ss *stringSet) empty() bool {
ss.mu.RLock()
defer ss.mu.RUnlock()
return len(ss.items) == 0
}
// count returns the number of items in the set
func (ss *stringSet) count() int {
ss.mu.RLock()
defer ss.mu.RUnlock()
return len(ss.items)
}
// String returns string representation of set items excluding any in
// exclude (if set).
func (ss *stringSet) String(progress *inProgress, exclude *stringSet) string {
ss.mu.RLock()
defer ss.mu.RUnlock()
strngs := make([]string, 0, len(ss.items))
for name := range ss.items {
if exclude != nil {
exclude.mu.RLock()
_, found := exclude.items[name]
exclude.mu.RUnlock()
if found {
continue
}
}
var out string
if acc := progress.get(name); acc != nil {
out = acc.String()
} else {
out = fmt.Sprintf("%*s: %s",
fs.Config.StatsFileNameLength,
shortenName(name, fs.Config.StatsFileNameLength),
ss.name,
)
}
strngs = append(strngs, " * "+out)
}
sorted := sort.StringSlice(strngs)
sorted.Sort()
return strings.Join(sorted, "\n")
}
// progress returns total bytes read as well as the size.
func (ss *stringSet) progress(stats *StatsInfo) (totalBytes, totalSize int64) {
ss.mu.RLock()
defer ss.mu.RUnlock()
for name := range ss.items {
if acc := stats.inProgress.get(name); acc != nil {
bytes, size := acc.progress()
if size >= 0 && bytes >= 0 {
totalBytes += bytes
totalSize += size
}
}
}
return totalBytes, totalSize
}

View file

@ -0,0 +1,112 @@
package accounting
import (
"fmt"
"sort"
"strings"
"sync"
"github.com/rclone/rclone/fs"
)
// transferMap holds name to transfer map
type transferMap struct {
mu sync.RWMutex
items map[string]*Transfer
name string
}
// newTransferMap creates a new empty transfer map of capacity size
func newTransferMap(size int, name string) *transferMap {
return &transferMap{
items: make(map[string]*Transfer, size),
name: name,
}
}
// add adds a new transfer to the map
func (tm *transferMap) add(tr *Transfer) {
tm.mu.Lock()
tm.items[tr.remote] = tr
tm.mu.Unlock()
}
// del removes a transfer from the map by name
func (tm *transferMap) del(remote string) {
tm.mu.Lock()
delete(tm.items, remote)
tm.mu.Unlock()
}
// merge adds items from another map
func (tm *transferMap) merge(m *transferMap) {
tm.mu.Lock()
m.mu.Lock()
for name, tr := range m.items {
tm.items[name] = tr
}
m.mu.Unlock()
tm.mu.Unlock()
}
// empty returns whether the map has any items
func (tm *transferMap) empty() bool {
tm.mu.RLock()
defer tm.mu.RUnlock()
return len(tm.items) == 0
}
// count returns the number of items in the map
func (tm *transferMap) count() int {
tm.mu.RLock()
defer tm.mu.RUnlock()
return len(tm.items)
}
// String returns string representation of map items excluding any in
// exclude (if set).
func (tm *transferMap) String(progress *inProgress, exclude *transferMap) string {
tm.mu.RLock()
defer tm.mu.RUnlock()
strngs := make([]string, 0, len(tm.items))
for name, _ := range tm.items {
if exclude != nil {
exclude.mu.RLock()
_, found := exclude.items[name]
exclude.mu.RUnlock()
if found {
continue
}
}
var out string
if acc := progress.get(name); acc != nil {
out = acc.String()
} else {
out = fmt.Sprintf("%*s: %s",
fs.Config.StatsFileNameLength,
shortenName(name, fs.Config.StatsFileNameLength),
tm.name,
)
}
strngs = append(strngs, " * "+out)
}
sorted := sort.StringSlice(strngs)
sorted.Sort()
return strings.Join(sorted, "\n")
}
// progress returns total bytes read as well as the size.
func (tm *transferMap) progress(stats *StatsInfo) (totalBytes, totalSize int64) {
tm.mu.RLock()
defer tm.mu.RUnlock()
for name := range tm.items {
if acc := stats.inProgress.get(name); acc != nil {
bytes, size := acc.progress()
if size >= 0 && bytes >= 0 {
totalBytes += bytes
totalSize += size
}
}
}
return totalBytes, totalSize
}