Coverage for src/scrilla/analysis/estimators.py: 75%

163 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-07-18 18:14 +0000

1# This file is part of scrilla: https://github.com/chinchalinchin/scrilla. 

2 

3# scrilla is free software: you can redistribute it and/or modify 

4# it under the terms of the GNU General Public License version 3 

5# as published by the Free Software Foundation. 

6 

7# scrilla is distributed in the hope that it will be useful, 

8# but WITHOUT ANY WARRANTY; without even the implied warranty of 

9# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

10# GNU General Public License for more details. 

11 

12# You should have received a copy of the GNU General Public License 

13# along with scrilla. If not, see <https://www.gnu.org/licenses/> 

14# or <https://github.com/chinchalinchin/scrilla/blob/develop/main/LICENSE>. 

15 

16""" 

17A module of statistical point estimators and likelihood functions. 

18""" 

19 

20from os import path 

21from sys import path as sys_path 

22from typing import List, Union 

23from numpy import inf 

24from math import log, sqrt, exp 

25from scipy.stats import norm, multivariate_normal 

26 

27from scrilla.util import errors 

28 

# When the module is executed directly as a script, prepend the package root to
# the import path so the `scrilla.*` imports below resolve outside an install.
if __name__ == "__main__":
    APP_DIR = path.dirname(path.dirname(path.abspath(__file__)))
    sys_path.append(APP_DIR)

from scrilla import settings, cache
from scrilla.static.constants import constants
import scrilla.util.outputter as outputter

# Module-level singletons: a namespaced logger plus caches for previously
# computed risk profiles and correlations, shared by every caller of this module.
logger = outputter.Logger('scrilla.analysis.estimators', settings.LOG_LEVEL)
profile_cache = cache.ProfileCache()
correlation_cache = cache.CorrelationCache()

40 

41 

def univariate_normal_likelihood_function(params: list, data: list) -> float:
    """
    Returns the log-likelihood of a vector of parameters being observed from a univariate sample of normal data. It can be used as objective function input for `scipy.optimize`'s optimization methods.

    Parameters
    ----------
    1. **params** : ``list``
        Array representing a vector of parameters, in this case the mean rate of return and volatility from a sample of data: `params[0]` is the mean and `params[1]` is the standard deviation.
    2. **data** : ``list``
        A list of data that has been drawn from a univariate normal population.

    .. notes::
        * Returns 0 for an empty sample, i.e. an empty product of densities.
    """
    # sum of log-densities == log of the product of densities, i.e. the log-likelihood
    return sum(norm.logpdf(x=point, loc=params[0], scale=params[1]) for point in data)

57 

58 

def bivariate_normal_likelihood_function(params: list, data: list) -> float:
    r"""
    Returns the log-likelihood of a vector of parameters being observed from a bivariate sample of normal data. It can be used as objective function input for `scipy.optimize`'s optimization methods.

    Parameters
    ----------
    1. **params** : ``list``
        Array representing a vector of parameters, in this case the mean rates of return, volatilities and covariance for a bivariate normal distribution. *Important*: The vector must be ordered: 1. params[0] = \\(\mu_x\\), params[1]=\\(\mu_y\\), params[2] = \\(\sigma_x\\), params[3] = \\(\sigma_y\\), params[4] = \\(\rho_{xy} \cdot \sigma_x \cdot \sigma_y\\). The matrix is parameterized in this manner in order to interface more easily with `scipy.optimize.minimize`.
    2. **data** : ``list``
        A list of data that has been drawn from a bivariate normal population. Must be formatted in the following manner: `[ [x1,y1], [x2,y2],...]`

    .. notes::
        * the covariance matrix of a bivariate normal distribution must be positive semi-definite (PSD) and non-singular. PSD can be checked with the [Sylvester Criterion](https://en.wikipedia.org/wiki/Sylvester%27s_criterion) or [Cauchy-Schwarz Inequality](https://en.wikipedia.org/wiki/Cauchy%E2%80%93Schwarz_inequality#Probability_theory). Since sample variance will always be positive, this reduces to checking the determinant of the covariance matrix is greater than 0. This function will return `numpy.inf` if the covariance matrix is singular or non-positive semi-definite.
    """
    mean = [params[0], params[1]]
    cov = [[params[2], params[4]], [params[4], params[3]]]

    # A determinant at or below the accuracy threshold means the covariance
    # matrix is singular or not positive definite (the threshold is positive,
    # so this single comparison also covers determinant <= 0).
    determinant = params[2]*params[3] - params[4]**2
    if determinant < (10**(-constants['ACCURACY'])):
        return inf

    # sum of log-densities == log-likelihood of the whole sample
    return sum(multivariate_normal.logpdf(x=point, mean=mean, cov=cov) for point in data)

85 

86 

def sample_percentile(data: List[float], percentile: float):
    """
    Returns the observation in a sample of data corresponding to the given percentile, i.e. the observation from a sorted sample where the percentage of the observations below that point is specified by the percentile. If the percentile falls between data points, the result is linearly interpolated from the two adjoining observations, weighted by the fractional distance between them.

    Parameters
    ----------
    1. **data** : ``list``
        Array representing the set of data whose percentile is to be calculated.
    2. **percentile**: ``float``
        The percentile corresponding to the desired observation, on [0, 1].

    .. notes::
        * Percentiles that map below the first (or above the last) observation are clamped to the minimum (maximum) of the sample.
    """
    # operate on a sorted copy so the caller's list is not reordered in place
    data = sorted(data)
    n = len(data)

    # ordinal rank of the desired observation; fractional part drives interpolation
    obs_number = (n + 1) * percentile
    extrapolate = obs_number - int(obs_number)

    # clamp ranks that fall outside the sample (previously a rank < 1 produced
    # first_index == -1, which silently wrapped around to the sample maximum)
    if obs_number < 1:
        return data[0]
    if obs_number > n:
        return data[-1]

    if extrapolate == 0:
        # rank lands exactly on an observation
        return data[int(obs_number) - 1]

    first_index = int(obs_number) - 1
    second_index = first_index + 1
    # interpolate between the two adjoining observations
    return (1 - extrapolate)*data[first_index] + extrapolate*data[second_index]

113 

114 

def empirical_copula(sample: List[List[float]], x_order: float, y_order: float):
    r"""
    Computes an empirical estimate of the copula distribution for a bivariate sample, i.e.

    $$ C(u_{1}, u_{2}) = F_{c}(U_{1} < F_1^{-1}(u_{1}), U_{2} < F_2^{-1}(u_{2})) $$

    Using the empirical estimate defined by,

    $$ C_{n} = \frac{\# \{ (x,y) \mid x \leq x_{p}, y \leq y_{p} \}}{n} $$

    where \\(x_p\\) and \\(y_p\\) are the *p*-th percentiles of their respective univariate samples.
    """
    # fraction of sample points dominated by the point (x_order, y_order)
    within_bounds = sum(
        1 for point in sample
        if point[0] <= x_order and point[1] <= y_order
    )
    return within_bounds / len(sample)

138 

139 

def sample_correlation(x: List[float], y: List[float]):
    """
    Returns the sample correlation calculated using the Pearson correlation coefficient estimator,

    .. todo:: Pearson coefficient formula here

    Parameters
    ----------
    1. **x**: ``list``
        The *x* sample of paired data (*x*, *y*). Must preserve order with **y**.
    2. **y**: ``list``
        The *y* sample of paired data (*x*, *y*). Must preserve order with **x**.

    Raises
    ------
    1. `scrilla.errors.SampleSizeError` :
        If the sample sizes do not meet the requirements for estimation, this error will be thrown.
    2. **ValueError** :
        If the denominator of the correlation coefficient becomes too small for floating point arithmetic, this error is thrown.

    .. todos ::
        * Possibly wrap the correlation coefficient numerator and denominator in `Decimal` class before calculation to bypass the **ValueError** that occurs in some samples where the denominator is too small for the arithmetic to detect.
    """
    if len(x) != len(y):
        raise errors.SampleSizeError('Samples are not of comparable lengths')

    if len(x) in [0, 1]:
        raise errors.SampleSizeError(
            'Sample correlation cannot be computed for a sample size less than or equal to 1.')

    n = len(x)
    sum_xy, sum_xx, sum_x, sum_y, sum_yy = 0, 0, 0, 0, 0
    for x_i, y_i in zip(x, y):
        sum_xy += x_i*y_i
        sum_x += x_i
        sum_xx += x_i**2
        sum_y += y_i
        sum_yy += y_i**2

    numerator = ((n*sum_xy) - sum_x*sum_y)
    denominator = sqrt((n*sum_xx - sum_x**2)*(n*sum_yy - sum_y**2))

    # When both terms are positive, route the quotient through logarithms:
    # exp(log(A) - log(B)) == A/B, which sidesteps a denominator too small
    # for direct division. Only valid for A, B > 0, since log of a negative
    # number only exists in the complex plane.
    if numerator > 0 and denominator > 0:
        return exp(log(numerator) - log(denominator))

    if denominator != 0:
        return numerator / denominator

    raise ValueError(
        'Denominator for correlation formula to small for division')

199 

200 

def recursive_rolling_correlation(correl_previous, new_x_observation, lost_x_obs,
                                  new_y_obs, lost_y_obs, n=settings.DEFAULT_ANALYSIS_PERIOD):
    """
    Placeholder for an incremental update of a rolling correlation over a
    fixed-width window of size ``n``. Not yet implemented; always returns
    ``None``.
    """
    # TODO: after other rolling functions work...
    pass

205 

206 

def sample_mean(x: List[float]) -> float:
    r"""
    Returns the sample mean from a sample of data \\(\{x_1 , x_2, ... , x_n \}\\),

    $$ \bar{x} = \frac{\sum_{i=1}^{n} x_i}{n} $$

    Parameters
    ----------
    1. **x**: ``List[Union[float,int]]``
        List containing a sample of numerical data.

    Raises
    ------
    1. **scrilla.errors.SampleSizeError**
        If ``len(x)==0``, this error will be thrown.
    2. **ValueError**
        If the sample contains null or non-numerical data, this error will be thrown.
    """
    # validate contents first so nulls surface as ValueError, matching the
    # original precedence of the checks
    if not all(this_x is not None and isinstance(this_x, (float, int)) for this_x in x):
        raise ValueError(
            'Sample contains null values')

    if not x:
        raise errors.SampleSizeError(
            'Sample mean cannot be computed for a sample size of 0.')

    # summing first and dividing once avoids the rounding error accumulated by
    # adding n separate quotients x_i / n
    return sum(x) / len(x)

238 

239 

def recursive_rolling_mean(xbar_previous, new_obs, lost_obs, n=settings.DEFAULT_ANALYSIS_PERIOD):
    """
    Incrementally updates a rolling mean over a fixed-width window of size
    ``n``: the lost observation drops out of the average while the new
    observation enters it.
    """
    # this should be done in terms of the sample arrays, not the observations themselves, i think.
    return xbar_previous + (new_obs - lost_obs) / n

244 

245 

def sample_variance(x: List[float]):
    r"""
    Returns the sample variance from a sample of data \\(\{x_1 , x_2, ... , x_n \}\\),

    $$ s^2=\frac{\sum_{i=1}^{n} (x_i - \bar{x})^2}{n-1} $$

    Parameters
    ----------
    1. **x**: ``list``
        List containing a sample of numerical data.

    Raises
    ------
    1. `scrilla.errors.SampleSizeError`
        If ``len(x) == 0``, this error will be thrown.
    2. **ValueError**
        If the sample contains null or non-numerical data, this error will be thrown.

    .. notes::
        * This a naive two-pass implementation of sample variance. The function does not shift data around the mean, resulting in skewed calculations if the variance is numerically small and the sample is large; if you are concerned about the accuracy of your variance calculation in these instances, a better method is to use `scrilla.analysis.estimators.recursive_sum_of_squares` and then compute the variance by dividing by _(n-1)_, where _n_ is the number of samples.
    """
    # TODO: this is a 'naive' estimation of variance: https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    n = len(x)

    # validate before any computation; previously these guards ran after the
    # mean was computed, so sample_mean raised first and they were unreachable
    if not all(this_x is not None and isinstance(this_x, (float, int)) for this_x in x):
        raise ValueError(
            'Sample contains null values')

    if n == 0:
        raise errors.SampleSizeError(
            'Sample variance cannot be computed for a sample size of 0.')

    if n == 1:
        # no variance for a sample of 1.
        return 0

    # mean inlined (identical to sample_mean for validated input); sum the
    # squared deviations first and divide once to limit rounding error
    mu = sum(x) / n
    return sum((i - mu)**2 for i in x) / (n - 1)

283 

284 

def recursive_rolling_variance(var_previous, xbar_previous, new_obs, lost_obs, n=settings.DEFAULT_ANALYSIS_PERIOD):
    """
    Attempts to incrementally update a rolling sample variance over a
    fixed-width window of size ``n`` from the previous variance, the previous
    mean and the observations entering and leaving the window.

    NOTE(review): per the TODO below, the author observed this formula
    produces slightly incorrect results; treat its output as unverified.
    """
    # TODO: slightly off for some reason...Formula not correct? Rework it out.
    xbar_new = recursive_rolling_mean(xbar_previous=xbar_previous, new_obs=new_obs,
                                      lost_obs=lost_obs, n=n)
    var_new = var_previous + \
        (n/(n-1))*((new_obs**2 - lost_obs**2)/n + (xbar_previous**2-xbar_new**2))
    return var_new

292 

293 

def recursive_sum_of_squares(x: List[float], checked: bool = False):
    r"""
    Returns the sum of squared deviations from the sample mean,
    \\(\sum_{i=1}^{n} (x_i - \bar{x})^2\\), computed with the Youngs-Cramer
    style updating formula. Dividing the result by _(n-1)_ yields the sample
    variance; this route is numerically preferable to the naive two-pass
    formula when the variance is small relative to the data.

    Parameters
    ----------
    1. **x**: ``List[float]``
        List containing a sample of numerical data.
    2. **checked**: ``bool``
        Skips input validation when ``True``; used internally when the sample
        has already been validated.

    Raises
    ------
    1. `scrilla.errors.SampleSizeError`
        If ``len(x) == 0`` and ``checked`` is ``False``.
    2. **ValueError**
        If the sample contains null or non-numerical data and ``checked`` is ``False``.
    """
    n = len(x)

    if not checked:
        if not all(this_x is not None and isinstance(this_x, (float, int)) for this_x in x):
            raise ValueError(
                'Sample contains null values')

        if n == 0:
            raise errors.SampleSizeError(
                'Sample variance cannot be computed for a sample size of 0.')

    if n == 1:
        return 0

    # Iterative form of the original recursion: accumulate the update term for
    # each prefix length k = 2..n in the same order and with the same partial
    # sums, but with a running total instead of re-summing each prefix. This
    # turns O(n^2) work (and n list slices) into O(n) and cannot hit the
    # recursion limit on large samples.
    sum_of_squares = 0
    running_sum = x[0]
    for k in range(2, n + 1):
        running_sum += x[k-1]
        sum_of_squares += (k*x[k-1] - running_sum)**2/(k*(k-1))
    return sum_of_squares

311 

312 

def sample_covariance(x: list, y: list):
    r"""
    Returns the sample covariance of paired samples,

    $$ s_{xy}=\frac{\sum_{i=1}^{n} (x_i - \bar{x})(y_i - \bar{y})}{n-1} $$

    Parameters
    ----------
    1. **x**: ``list``
        The *x* sample of paired data (*x*, *y*). Must preserve order with **y**.
    2. **y**: ``list``
        The *y* sample of paired data (*x*, *y*). Must preserve order with **x**.

    Raises
    ------
    1. `scrilla.errors.SampleSizeError`
        If ``len(x) != len(y)`` (samples of incomparable length) or ``len(x) in [0,1]`` (insufficient data/degrees of freedom), this error will be thrown.
    2. **ValueError**
        If either sample contains null or non-numerical data, this error will be thrown.
    """
    if len(x) != len(y):
        raise errors.SampleSizeError('Samples are not of comparable length')

    if len(x) in [0, 1]:
        # error message previously said "Sample correlation"
        raise errors.SampleSizeError(
            'Sample covariance cannot be computed for a sample size less than or equal to 1.')

    n = len(x)

    # validate both samples up front (previously delegated to sample_mean)
    if not all(obs is not None and isinstance(obs, (float, int)) for obs in x + y):
        raise ValueError(
            'Sample contains null values')

    x_mean, y_mean = sum(x) / n, sum(y) / n

    # sum the cross-deviations first and divide once to limit rounding error
    return sum((x_i - x_mean)*(y_i - y_mean) for x_i, y_i in zip(x, y)) / (n - 1)

343 

344 

def recursive_rolling_covariance(covar_previous: float, new_x_obs: float, lost_x_obs: float, previous_x_bar: float, new_y_obs: float, lost_y_obs: float, previous_y_bar: float, n: int = settings.DEFAULT_ANALYSIS_PERIOD):
    """
    Attempts to incrementally update a rolling sample covariance over a
    fixed-width window of size ``n`` from the previous covariance, the
    previous means of both series and the observations entering and leaving
    the window.

    NOTE(review): per the TODO below, the author marked this formula as not
    working; treat its output as unverified.
    """
    # TODO: no work.
    # change in the raw cross-product term from swapping one observation pair
    new_sum_term = new_x_obs*new_y_obs - lost_x_obs*lost_y_obs
    # corrections for the shift in each series' mean
    xy_cross_term = previous_x_bar*(new_y_obs-lost_y_obs)
    yx_cross_term = previous_y_bar*(new_x_obs-lost_x_obs)
    perturbation = (new_x_obs-lost_x_obs)*(new_y_obs-lost_y_obs) / n
    numerator = new_sum_term - xy_cross_term - yx_cross_term - perturbation
    covar_new = covar_previous + numerator / (n-1)
    return covar_new

354 

355 

def simple_regression_beta(x: List[float], y: List[float]):
    """
    Returns the slope coefficient of a simple univariate regression of *y*
    against *x*: the sample correlation scaled by the ratio of the two
    samples' volatilities.

    Parameters
    ----------
    1. **x**: ``list``
        The *x* sample of paired data (*x*, *y*). Must preserve order with **y**.
    2. **y**: ``list``
        The *y* sample of paired data (*x*, *y*). Must preserve order with **x**.

    Raises
    ------
    1. `scrilla.errors.statistics.SampleSizeError`
        If ``len(x) != len(y)`` (samples of incomparable length) or ``len(x) < 3`` (insufficient data/degrees of freedom), this error will be thrown.
    """
    if len(x) != len(y):
        raise errors.SampleSizeError(f'len(x) = {len(x)} != len(y) = {len(y)}')
    if len(x) < 3:
        raise errors.SampleSizeError(
            f'Sample size of {len(x)} is less than the necessary degrees of freedom (n > 2) for regression estimation.')

    rho = sample_correlation(x=x, y=y)
    # beta = correlation * (vol of y / vol of x)
    return rho * sqrt(sample_variance(x=y)) / sqrt(sample_variance(x=x))

383 

384 

def simple_regression_alpha(x: List[float], y: List[float]):
    """
    Returns the intercept of a simple univariate regression of *y* against
    *x*: the mean of *y* less the regression slope applied to the mean of *x*.

    Parameters
    ----------
    1. **x**: ``list``
        The *x* sample of paired data (*x*, *y*). Must preserve order with **y**.
    2. **y**: ``list``
        The *y* sample of paired data (*x*, *y*). Must preserve order with **x**.

    Raises
    ------
    1. `scrilla.errors.SampleSizeError`
        If ``len(x) != len(y)`` (samples of incomparable length) or ``len(x) < 3`` (insufficient data/degrees of freedom), this error will be thrown.
    """
    if len(x) != len(y):
        raise errors.SampleSizeError(
            f'len(x) == {len(x)} != len(y) == {len(y)}')

    if len(x) < 3:
        raise errors.SampleSizeError(
            f'Sample size of {len(x)} is less than the necessary degrees of freedom (n > 2) for regression estimation.')

    # alpha = ybar - beta * xbar
    slope = simple_regression_beta(x=x, y=y)
    return sample_mean(y) - slope*sample_mean(x)

412 

413 

def qq_series_for_sample(sample: List[float]) -> List[list]:
    """
    Calculates the QQ series for a sample of data, i.e. the set defined by the ordered pair of sample percentiles and theoretical normal percentiles. A sample's normality can be assessed by how linear the result graph is.

    Parameters
    ----------
    1. **sample**: ``list``
        A sample of numerical data.
    """
    n = len(sample)
    # pair the theoretical normal quantile with the sample quantile at each
    # plotting position (i + 0.5) / n
    return [
        [
            norm.ppf(q=(i + 0.5)/n),
            sample_percentile(data=sample, percentile=(i + 0.5)/n),
        ]
        for i in range(n)
    ]

433 

434 

def standardize(x: List[float]):
    """
    Returns the z-scores of the sample: each observation shifted by the sample
    mean and scaled by the sample standard deviation.
    """
    center = sample_mean(x)
    scale = sqrt(sample_variance(x))
    return [(observation - center)/scale for observation in x]