source: trunk/src/allmydata/test/test_statistics.py

Last change on this file was 53084f7, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-27T23:49:07Z

remove more Python2 compatibility

  • Property mode set to 100644
File size: 5.8 KB
Line 
1"""
2Tests for allmydata.util.statistics.
3
4Ported to Python 3.
5"""
6
7from io import StringIO
8
9from twisted.trial import unittest
10
11from allmydata.util import statistics
12
13
14class Statistics(unittest.TestCase):
15    def should_assert(self, msg, func, *args, **kwargs):
16        try:
17            func(*args, **kwargs)
18            self.fail(msg)
19        except AssertionError:
20            pass
21
22    def failUnlessListEqual(self, a, b, msg = None):
23        self.failUnlessEqual(len(a), len(b))
24        for i in range(len(a)):
25            self.failUnlessEqual(a[i], b[i], msg)
26
27    def failUnlessListAlmostEqual(self, a, b, places = 7, msg = None):
28        self.failUnlessEqual(len(a), len(b))
29        for i in range(len(a)):
30            self.failUnlessAlmostEqual(a[i], b[i], places, msg)
31
32    def test_binomial_coeff(self):
33        f = statistics.binomial_coeff
34        self.failUnlessEqual(f(20, 0), 1)
35        self.failUnlessEqual(f(20, 1), 20)
36        self.failUnlessEqual(f(20, 2), 190)
37        self.failUnlessEqual(f(20, 8), f(20, 12))
38        self.should_assert("Should assert if n < k", f, 2, 3)
39        self.assertEqual(f(5, 3), f(5, 2))
40
41    def test_binomial_distribution_pmf(self):
42        f = statistics.binomial_distribution_pmf
43
44        pmf_comp = f(2, .1)
45        pmf_stat = [0.81, 0.18, 0.01]
46        self.failUnlessListAlmostEqual(pmf_comp, pmf_stat)
47
48        # Summing across a PMF should give the total probability 1
49        self.failUnlessAlmostEqual(sum(pmf_comp), 1)
50        self.should_assert("Should assert if not 0<=p<=1", f, 1, -1)
51        self.should_assert("Should assert if n < 1", f, 0, .1)
52
53        out = StringIO()
54        statistics.print_pmf(pmf_comp, out=out)
55        lines = out.getvalue().splitlines()
56        self.failUnlessEqual(lines[0], "i=0: 0.81")
57        self.failUnlessEqual(lines[1], "i=1: 0.18")
58        self.failUnlessEqual(lines[2], "i=2: 0.01")
59
60    def test_survival_pmf(self):
61        f = statistics.survival_pmf
62        # Cross-check binomial-distribution method against convolution
63        # method.
64        p_list = [.9999] * 100 + [.99] * 50 + [.8] * 20
65        pmf1 = statistics.survival_pmf_via_conv(p_list)
66        pmf2 = statistics.survival_pmf_via_bd(p_list)
67        self.failUnlessListAlmostEqual(pmf1, pmf2)
68        self.failUnlessTrue(statistics.valid_pmf(pmf1))
69        self.should_assert("Should assert if p_i > 1", f, [1.1]);
70        self.should_assert("Should assert if p_i < 0", f, [-.1]);
71
72    def test_repair_count_pmf(self):
73        survival_pmf = statistics.binomial_distribution_pmf(5, .9)
74        repair_pmf = statistics.repair_count_pmf(survival_pmf, 3)
75        # repair_pmf[0] == sum(survival_pmf[0,1,2,5])
76        # repair_pmf[1] == survival_pmf[4]
77        # repair_pmf[2] = survival_pmf[3]
78        self.failUnlessListAlmostEqual(repair_pmf,
79                                       [0.00001 + 0.00045 + 0.0081 + 0.59049,
80                                        .32805,
81                                        .0729,
82                                        0, 0, 0])
83
84    def test_repair_cost(self):
85        survival_pmf = statistics.binomial_distribution_pmf(5, .9)
86        bwcost = statistics.bandwidth_cost_function
87        cost = statistics.mean_repair_cost(bwcost, 1000,
88                                           survival_pmf, 3, ul_dl_ratio=1.0)
89        self.failUnlessAlmostEqual(cost, 558.90)
90        cost = statistics.mean_repair_cost(bwcost, 1000,
91                                           survival_pmf, 3, ul_dl_ratio=8.0)
92        self.failUnlessAlmostEqual(cost, 1664.55)
93
94        # I haven't manually checked the math beyond here -warner
95        cost = statistics.eternal_repair_cost(bwcost, 1000,
96                                              survival_pmf, 3,
97                                              discount_rate=0, ul_dl_ratio=1.0)
98        self.failUnlessAlmostEqual(cost, 65292.056074766246)
99        cost = statistics.eternal_repair_cost(bwcost, 1000,
100                                              survival_pmf, 3,
101                                              discount_rate=0.05,
102                                              ul_dl_ratio=1.0)
103        self.failUnlessAlmostEqual(cost, 9133.6097158191551)
104
105    def test_convolve(self):
106        f = statistics.convolve
107        v1 = [ 1, 2, 3 ]
108        v2 = [ 4, 5, 6 ]
109        v3 = [ 7, 8 ]
110        v1v2result = [ 4, 13, 28, 27, 18 ]
111        # Convolution is commutative
112        r1 = f(v1, v2)
113        r2 = f(v2, v1)
114        self.failUnlessListEqual(r1, r2, "Convolution should be commutative")
115        self.failUnlessListEqual(r1, v1v2result, "Didn't match known result")
116        # Convolution is associative
117        r1 = f(f(v1, v2), v3)
118        r2 = f(v1, f(v2, v3))
119        self.failUnlessListEqual(r1, r2, "Convolution should be associative")
120        # Convolution is distributive
121        r1 = f(v3, [ a + b for a, b in zip(v1, v2) ])
122        tmp1 = f(v3, v1)
123        tmp2 = f(v3, v2)
124        r2 = [ a + b for a, b in zip(tmp1, tmp2) ]
125        self.failUnlessListEqual(r1, r2, "Convolution should be distributive")
126        # Convolution is scalar multiplication associative
127        tmp1 = f(v1, v2)
128        r1 = [ a * 4 for a in tmp1 ]
129        tmp2 = [ a * 4 for a in v1 ]
130        r2 = f(tmp2, v2)
131        self.failUnlessListEqual(r1, r2, "Convolution should be scalar multiplication associative")
132
133    def test_find_k(self):
134        f = statistics.find_k
135        g = statistics.pr_file_loss
136        plist = [.9] * 10 + [.8] * 10 # N=20
137        t = .0001
138        k = f(plist, t)
139        self.failUnlessEqual(k, 10)
140        self.failUnless(g(plist, k) < t)
141
142    def test_pr_file_loss(self):
143        f = statistics.pr_file_loss
144        plist = [.5] * 10
145        self.failUnlessEqual(f(plist, 3), .0546875)
146
147    def test_pr_backup_file_loss(self):
148        f = statistics.pr_backup_file_loss
149        plist = [.5] * 10
150        self.failUnlessEqual(f(plist, .5, 3), .02734375)
Note: See TracBrowser for help on using the repository browser.