fs/accounting: add option to delete stats

Removed PruneAllTransfers because it had no use. startedTransfers are
set to nil in ResetCounters.
This commit is contained in:
Aleksandar Jankovic 2019-12-19 11:16:22 +01:00 committed by Nick Craig-Wood
parent 0e64df4b4c
commit b9fb313f71
4 changed files with 158 additions and 41 deletions

View file

@ -634,20 +634,6 @@ func (s *StatsInfo) RemoveTransfer(transfer *Transfer) {
s.mu.Unlock()
}
// PruneAllTransfers removes all finished transfers.
func (s *StatsInfo) PruneAllTransfers() {
s.mu.Lock()
for i := 0; i < len(s.startedTransfers); i++ {
tr := s.startedTransfers[i]
if tr.IsDone() {
s.removeTransfer(tr, i)
// i'th element is removed, recover iterator to not skip next element.
i--
}
}
s.mu.Unlock()
}
// PruneTransfers makes sure there aren't too many old transfers by removing
// single finished transfer.
func (s *StatsInfo) PruneTransfers() {

View file

@ -184,9 +184,8 @@ func rcResetStats(ctx context.Context, in rc.Params) (rc.Params, error) {
if group != "" {
stats := groups.get(group)
stats.ResetCounters()
stats.ResetErrors()
stats.PruneAllTransfers()
stats.ResetCounters()
} else {
groups.reset()
}
@ -210,6 +209,35 @@ Parameters
})
}
func rcDeleteStats(ctx context.Context, in rc.Params) (rc.Params, error) {
// Group name required because we only do single group.
group, err := in.GetString("group")
if rc.NotErrParamNotFound(err) {
return rc.Params{}, err
}
if group != "" {
groups.delete(group)
}
return rc.Params{}, nil
}
func init() {
rc.Add(rc.Call{
Path: "core/stats-delete",
Fn: rcDeleteStats,
Title: "Delete stats group.",
Help: `
This deletes entire stats group
Parameters
- group - name of the stats group (string)
`,
})
}
type statsGroupCtx int64
const statsGroupKey statsGroupCtx = 1
@ -282,13 +310,13 @@ func (sg *statsGroups) set(group string, stats *StatsInfo) {
// Limit number of groups kept in memory.
if len(sg.order) >= fs.Config.MaxStatsGroups {
group := sg.order[0]
fs.LogPrintf(fs.LogLevelInfo, nil, "Max number of stats groups reached removing %s", group)
//fs.LogPrintf(fs.LogLevelInfo, nil, "Max number of stats groups reached removing %s", group)
delete(sg.m, group)
r := (len(sg.order) - fs.Config.MaxStatsGroups) + 1
sg.order = sg.order[r:]
}
// Exclude global stats from
// Exclude global stats from listing
if group != globalStats {
sg.order = append(sg.order, group)
}
@ -343,9 +371,30 @@ func (sg *statsGroups) reset() {
for _, stats := range sg.m {
stats.ResetErrors()
stats.ResetCounters()
stats.PruneAllTransfers()
}
sg.m = make(map[string]*StatsInfo)
sg.order = nil
}
// delete removes all references to the group.
func (sg *statsGroups) delete(group string) {
sg.mu.Lock()
defer sg.mu.Unlock()
stats := sg.m[group]
if stats == nil {
return
}
stats.ResetErrors()
stats.ResetCounters()
delete(sg.m, group)
// Remove group reference from the ordering slice.
tmp := sg.order[:0]
for _, g := range sg.order {
if g != group {
tmp = append(tmp, g)
}
}
sg.order = tmp
}

View file

@ -0,0 +1,104 @@
package accounting
import (
"fmt"
"runtime"
"testing"
)
func TestStatsGroupOperations(t *testing.T) {
t.Run("empty group returns nil", func(t *testing.T) {
t.Parallel()
sg := newStatsGroups()
sg.get("invalid-group")
})
t.Run("set assigns stats to group", func(t *testing.T) {
t.Parallel()
stats := NewStats()
sg := newStatsGroups()
sg.set("test", stats)
sg.set("test1", stats)
if len(sg.m) != len(sg.names()) || len(sg.m) != 2 {
t.Fatalf("Expected two stats got %d, %d", len(sg.m), len(sg.order))
}
})
t.Run("get returns correct group", func(t *testing.T) {
t.Parallel()
stats := NewStats()
sg := newStatsGroups()
sg.set("test", stats)
sg.set("test1", stats)
got := sg.get("test")
if got != stats {
t.Fatal("get returns incorrect stats")
}
})
t.Run("sum returns correct values", func(t *testing.T) {
t.Parallel()
stats1 := NewStats()
stats1.bytes = 5
stats1.errors = 5
stats2 := NewStats()
sg := newStatsGroups()
sg.set("test1", stats1)
sg.set("test2", stats2)
sum := sg.sum()
if sum.bytes != stats1.bytes+stats2.bytes {
t.Fatalf("sum() => bytes %d, expected %d", sum.bytes, stats1.bytes+stats2.bytes)
}
if sum.errors != stats1.errors+stats2.errors {
t.Fatalf("sum() => errors %d, expected %d", sum.errors, stats1.errors+stats2.errors)
}
})
t.Run("delete removes stats", func(t *testing.T) {
t.Parallel()
stats := NewStats()
sg := newStatsGroups()
sg.set("test", stats)
sg.set("test1", stats)
sg.delete("test1")
if sg.get("test1") != nil {
t.Fatal("stats not deleted")
}
if len(sg.m) != len(sg.names()) || len(sg.m) != 1 {
t.Fatalf("Expected two stats got %d, %d", len(sg.m), len(sg.order))
}
})
t.Run("memory is reclaimed", func(t *testing.T) {
var (
count = 1000
start, end runtime.MemStats
sg = newStatsGroups()
)
runtime.GC()
runtime.ReadMemStats(&start)
for i := 0; i < count; i++ {
sg.set(fmt.Sprintf("test-%d", i), NewStats())
}
for i := 0; i < count; i++ {
sg.delete(fmt.Sprintf("test-%d", i))
}
runtime.GC()
runtime.ReadMemStats(&end)
t.Log(fmt.Sprintf("%+v\n%+v", start, end))
diff := percentDiff(start.HeapObjects, end.HeapObjects)
if diff > 1 || diff < 0 {
t.Errorf("HeapObjects = %d, expected %d", end.HeapObjects, start.HeapObjects)
}
})
}
func percentDiff(start, end uint64) uint64 {
return (start - end) * 100 / start
}

View file

@ -431,25 +431,3 @@ func TestPruneTransfers(t *testing.T) {
})
}
}
func TestPruneAllTransfers(t *testing.T) {
const transfers = 10
s := NewStats()
for i := int64(1); i <= int64(transfers); i++ {
s.AddTransfer(&Transfer{
startedAt: time.Unix(i, 0),
completedAt: time.Unix(i+1, 0),
})
}
s.mu.Lock()
assert.Equal(t, transfers, len(s.startedTransfers))
s.mu.Unlock()
s.PruneAllTransfers()
s.mu.Lock()
assert.Empty(t, s.startedTransfers)
s.mu.Unlock()
}