Coverage for src/scrilla/analysis/estimators.py: 75%
163 statements
coverage.py v6.4.2, created at 2022-07-18 18:14 +0000
# This file is part of scrilla: https://github.com/chinchalinchin/scrilla.
#
# scrilla is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3
# as published by the Free Software Foundation.
#
# scrilla is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with scrilla. If not, see <https://www.gnu.org/licenses/>
# or <https://github.com/chinchalinchin/scrilla/blob/develop/main/LICENSE>.

"""
A module of statistical point estimators and likelihood functions.
"""
from os import path
from sys import path as sys_path
from typing import List, Union
from numpy import inf
from math import log, sqrt, exp
from scipy.stats import norm, multivariate_normal

from scrilla.util import errors

if __name__ == "__main__":
    APP_DIR = path.dirname(path.dirname(path.abspath(__file__)))
    sys_path.append(APP_DIR)

from scrilla import settings, cache
from scrilla.static.constants import constants
import scrilla.util.outputter as outputter

logger = outputter.Logger('scrilla.analysis.estimators', settings.LOG_LEVEL)
profile_cache = cache.ProfileCache()
correlation_cache = cache.CorrelationCache()

def univariate_normal_likelihood_function(params: list, data: list) -> float:
    """
    Returns the log-likelihood of a vector of parameters being observed in a sample of univariate normal data. It can be used as an objective function input for `scipy.optimize`'s optimization methods.

    Parameters
    ----------
    1. **params** : ``list``
        Array representing a vector of parameters, in this case the mean rate of return and volatility of a sample of data.
    2. **data** : ``list``
        A list of data drawn from a univariate normal population.
    """
    likelihood = 0
    for point in data:
        likelihood += norm.logpdf(x=point, loc=params[0], scale=params[1])
    return likelihood
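
# Illustrative sketch (not part of the original module): maximum likelihood
# estimation by minimizing the negated log-likelihood with
# `scipy.optimize.minimize`. The sample and starting guess are made up; the
# lower bound keeps the optimizer away from non-positive volatilities.
def _example_univariate_mle():
    from scipy.optimize import minimize
    returns = [0.01, -0.02, 0.005, 0.015, -0.01]
    result = minimize(
        lambda params: -univariate_normal_likelihood_function(params, returns),
        x0=[0.0, 0.1],
        method='L-BFGS-B',
        bounds=[(None, None), (1e-6, None)])
    mu_mle, sigma_mle = result.x
    return mu_mle, sigma_mle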

def bivariate_normal_likelihood_function(params: list, data: list) -> float:
    r"""
    Returns the log-likelihood of a vector of parameters being observed in a sample of bivariate normal data. It can be used as an objective function input for `scipy.optimize`'s optimization methods.

    Parameters
    ----------
    1. **params** : ``list``
        Array representing a vector of parameters, in this case the mean rates of return, variances and covariance for a bivariate normal distribution. *Important*: The vector must be ordered: params[0] = \\(\mu_x\\), params[1] = \\(\mu_y\\), params[2] = \\(\sigma_x^2\\), params[3] = \\(\sigma_y^2\\), params[4] = \\(\rho_{xy} \cdot \sigma_x \cdot \sigma_y\\). The matrix is parameterized in this manner in order to interface more easily with `scipy.optimize.minimize`.
    2. **data** : ``list``
        A list of data drawn from a bivariate normal population. Must be formatted in the following manner: `[ [x1,y1], [x2,y2], ... ]`

    .. notes::
        * The covariance matrix of a bivariate normal distribution must be positive semi-definite (PSD) and non-singular. PSD can be checked with [Sylvester's Criterion](https://en.wikipedia.org/wiki/Sylvester%27s_criterion) or the [Cauchy-Schwarz Inequality](https://en.wikipedia.org/wiki/Cauchy%E2%80%93Schwarz_inequality#Probability_theory). Since sample variance will always be positive, this reduces to checking that the determinant of the covariance matrix is greater than 0. This function will return `numpy.inf` if the covariance matrix is singular or not positive semi-definite.
    """
    mean = [params[0], params[1]]
    cov = [[params[2], params[4]], [params[4], params[3]]]

    determinant = params[2]*params[3] - params[4]**2
    if determinant <= 0 or determinant < (10**(-constants['ACCURACY'])):
        return inf

    likelihood = 0
    for point in data:
        likelihood += multivariate_normal.logpdf(x=point, mean=mean, cov=cov)
    return likelihood
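
# Illustrative sketch (not part of the original module): evaluating the
# log-likelihood at a made-up parameter vector, ordered as
# [mu_x, mu_y, var_x, var_y, cov_xy]. The determinant here (0.0035) is
# assumed to exceed the module's ACCURACY threshold.
def _example_bivariate_likelihood():
    sample = [[0.01, 0.02], [-0.01, -0.005], [0.02, 0.015]]
    params = [0.005, 0.01, 0.04, 0.09, 0.01]
    return bivariate_normal_likelihood_function(params, sample)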

def sample_percentile(data: List[float], percentile: float):
    r"""
    Returns the observation in a sample of data corresponding to the given percentile, i.e. the observation from a sorted sample where the percentage of observations below that point is specified by the percentile. If the percentile falls between data points, the observation is smoothed by linearly interpolating between the adjoining observations,

    $$ x_{p} = (1 - w) \cdot x_{(k)} + w \cdot x_{(k+1)} $$

    where \\(k = \lfloor (n+1) \cdot p \rfloor\\), \\(w = (n+1) \cdot p - k\\) and \\(x_{(k)}\\) denotes the *k*-th order statistic.

    Parameters
    ----------
    1. **data** : ``list``
        Array representing the set of data whose percentile is to be calculated.
    2. **percentile**: ``float``
        The percentile corresponding to the desired observation.
    """
    data.sort()

    obs_number = (len(data) + 1)*percentile
    extrapolate = obs_number - int(obs_number)

    if obs_number > len(data):
        return data[-1]
    if obs_number < 1:
        return data[0]
    if extrapolate == 0:
        return data[int(obs_number)-1]
    first_index = int(obs_number) - 1
    second_index = first_index + 1
    return (1-extrapolate)*data[first_index] + extrapolate*data[second_index]
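
# Worked example (illustrative, not in the original module): for the sample
# [1, 2, 3, 4] and percentile 0.5, obs_number = 5 * 0.5 = 2.5, so the estimate
# interpolates halfway between the 2nd and 3rd order statistics:
# 0.5 * 2 + 0.5 * 3 = 2.5.
def _example_sample_percentile():
    assert sample_percentile(data=[4, 2, 1, 3], percentile=0.5) == 2.5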

def empirical_copula(sample: List[List[float]], x_order: float, y_order: float):
    r"""
    Computes an empirical estimate of the copula distribution for a bivariate sample, i.e.

    $$ C(u_{1}, u_{2}) = F_{c}(F_1^{-1}(u_{1}), F_2^{-1}(u_{2})) $$

    where \\(F_{c}\\) is the joint distribution function, using the empirical estimate defined by,

    $$ C_{n} = \frac{\#\{ (x,y) \mid x \leq x_{p} \land y \leq y_{p} \}}{n} $$

    where \\(x_p\\) and \\(y_p\\) are the *p*-th percentiles of their respective univariate samples.
    """
    n = len(sample)

    def x_order_bounds(test_point):
        return test_point <= x_order

    def y_order_bounds(test_point):
        return test_point <= y_order

    copula = [1 for point in sample if x_order_bounds(
        point[0]) and y_order_bounds(point[1])]
    return len(copula) / n
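
# Illustrative sketch (made-up sample): two of the three points satisfy
# x <= 2 and y <= 2, so the empirical copula estimate is 2/3.
def _example_empirical_copula():
    sample = [[1, 1], [2, 2], [3, 3]]
    assert empirical_copula(sample=sample, x_order=2, y_order=2) == 2 / 3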

def sample_correlation(x: List[float], y: List[float]):
    r"""
    Returns the sample correlation calculated using the Pearson correlation coefficient estimator,

    $$ r_{xy} = \frac{n \sum_{i=1}^{n} x_i y_i - \sum_{i=1}^{n} x_i \sum_{i=1}^{n} y_i}{\sqrt{(n \sum_{i=1}^{n} x_i^2 - (\sum_{i=1}^{n} x_i)^2)(n \sum_{i=1}^{n} y_i^2 - (\sum_{i=1}^{n} y_i)^2)}} $$

    Parameters
    ----------
    1. **x**: ``list``
        The *x* sample of paired data (*x*, *y*). Must preserve order with **y**.
    2. **y**: ``list``
        The *y* sample of paired data (*x*, *y*). Must preserve order with **x**.

    Raises
    ------
    1. `scrilla.errors.SampleSizeError` :
        If the sample sizes do not meet the requirements for estimation, this error will be thrown.
    2. **ValueError** :
        If the denominator of the correlation coefficient becomes too small for floating point arithmetic, this error is thrown.

    .. todo::
        * Possibly wrap the correlation coefficient numerator and denominator in the `Decimal` class before calculation to bypass the **ValueError** that occurs in some samples where the denominator is too small for the arithmetic to detect.
    """
    if len(x) != len(y):
        raise errors.SampleSizeError('Samples are not of comparable lengths')

    if len(x) in [0, 1]:
        raise errors.SampleSizeError(
            'Sample correlation cannot be computed for a sample size less than or equal to 1.')

    sumproduct, sum_x_squared, sum_x, sum_y, sum_y_squared = 0, 0, 0, 0, 0
    n = len(x)
    for i, item in enumerate(x):
        sumproduct += item*y[i]
        sum_x += item
        sum_x_squared += item**2
        sum_y += y[i]
        sum_y_squared += y[i]**2
    correl_num = ((n*sumproduct) - sum_x*sum_y)
    correl_den = sqrt((n*sum_x_squared-sum_x**2)*(n*sum_y_squared-sum_y**2))

    # LET'S DO SOME MATHEMATICS! (to get around division by zero!)
    # Unfortunately, this only works when A and B > 0 because the log
    # of a negative number only exists in the complex plane.
    #   1. correl = A/B
    #   2. log(correl) = log(A/B) = log(A) - log(B)
    #   3. exp(log(correl)) = exp(log(A/B))
    #   4. correl = exp(log(A) - log(B))
    if correl_num > 0 and correl_den > 0:
        log_correl = log(correl_num) - log(correl_den)
        correlation = exp(log_correl)
    else:
        if correl_den != 0:
            correlation = correl_num / correl_den
        else:
            raise ValueError(
                'Denominator for correlation formula too small for division')

    return correlation
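
# Worked example (illustrative): y = 2x is perfectly correlated with x.
# The numerator is 3*28 - 6*12 = 12 and the denominator is
# sqrt((3*14 - 36)*(3*56 - 144)) = sqrt(144) = 12, so the coefficient is 1.
def _example_sample_correlation():
    assert abs(sample_correlation(x=[1, 2, 3], y=[2, 4, 6]) - 1.0) < 1e-9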

def recursive_rolling_correlation(correl_previous, new_x_observation, lost_x_obs,
                                  new_y_obs, lost_y_obs, n=settings.DEFAULT_ANALYSIS_PERIOD):
    # TODO: implement after the other rolling functions work...
    pass

def sample_mean(x: List[float]) -> float:
    r"""
    Returns the sample mean from a sample of data \\(\{x_1 , x_2, ... , x_n \}\\),

    $$ \bar{x} = \frac{\sum_{i=1}^{n} x_i}{n} $$

    Parameters
    ----------
    1. **x**: ``List[Union[float,int]]``
        List containing a sample of numerical data.

    Raises
    ------
    1. **scrilla.errors.SampleSizeError**
        If ``len(x)==0``, this error will be thrown.
    2. **ValueError**
        If the sample contains null or non-numerical data, this error will be thrown.
    """
    xbar, n = 0, len(x)

    if not all(this_x is not None and isinstance(this_x, (float, int)) for this_x in x):
        raise ValueError(
            'Sample contains null values')

    if n == 0:
        raise errors.SampleSizeError(
            'Sample mean cannot be computed for a sample size of 0.')

    for i in x:
        xbar += i/n
    return xbar

def recursive_rolling_mean(xbar_previous, new_obs, lost_obs, n=settings.DEFAULT_ANALYSIS_PERIOD):
    # NOTE: this should perhaps be done in terms of the sample arrays, not the
    # observations themselves.
    xbar_next = xbar_previous + (new_obs - lost_obs)/n
    return xbar_next
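
# Sanity check (illustrative): rolling a window of size 3 from [1, 2, 3] to
# [2, 3, 4] updates the mean from 2 to 2 + (4 - 1)/3 = 3, which matches
# recomputing the mean of the new window directly.
def _example_recursive_rolling_mean():
    assert recursive_rolling_mean(xbar_previous=2, new_obs=4, lost_obs=1, n=3) == 3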

def sample_variance(x: List[float]):
    r"""
    Returns the sample variance from a sample of data \\(\{x_1 , x_2, ... , x_n \}\\),

    $$ s^2 = \frac{\sum_{i=1}^{n} (x_i - \bar{x})^2}{n-1} $$

    Parameters
    ----------
    1. **x**: ``list``
        List containing a sample of numerical data.

    Raises
    ------
    1. `scrilla.errors.SampleSizeError`

    .. notes::
        * This is a naive two-pass implementation of sample variance. The function does not shift data around the mean, resulting in skewed calculations if the variance is numerically small and the sample is large; if you are concerned about the accuracy of your variance calculation in these instances, a better method is to use `scrilla.analysis.estimators.recursive_sum_of_squares` and then compute the variance by dividing by _(n-1)_, where _n_ is the number of samples (see the sketch following that function below).
    """
    # TODO: this is a 'naive' estimation of variance: https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    mu, sigma, n = sample_mean(x=x), 0, len(x)

    if not all(this_x is not None and isinstance(this_x, (float, int)) for this_x in x):
        raise ValueError(
            'Sample contains null values')

    if n == 0:
        raise errors.SampleSizeError(
            'Sample variance cannot be computed for a sample size of 0.')

    if n == 1:
        # no variance for a sample of 1.
        return 0

    for i in x:
        sigma += ((i-mu)**2)/(n-1)
    return sigma

def recursive_rolling_variance(var_previous, xbar_previous, new_obs, lost_obs, n=settings.DEFAULT_ANALYSIS_PERIOD):
    # TODO: slightly off for some reason... Formula not correct? Rework it out.
    xbar_new = recursive_rolling_mean(xbar_previous=xbar_previous, new_obs=new_obs,
                                      lost_obs=lost_obs, n=n)
    var_new = var_previous + \
        (n/(n-1))*((new_obs**2 - lost_obs**2)/n + (xbar_previous**2-xbar_new**2))
    return var_new

def recursive_sum_of_squares(x: List[float], checked: bool = False):
    # Recursively accumulates the sum of squared deviations from the mean: the
    # final term adds (n*x_n - sum(x))^2 / (n*(n-1)) to the sum of squares of
    # the first (n-1) observations.
    n = len(x)

    if not checked:
        if not all(this_x is not None and isinstance(this_x, (float, int)) for this_x in x):
            raise ValueError(
                'Sample contains null values')

        if n == 0:
            raise errors.SampleSizeError(
                'Sample variance cannot be computed for a sample size of 0.')

    if n == 1:
        return 0

    term_variance = (n*x[-1] - sum(x))**2/(n*(n-1))
    return recursive_sum_of_squares(x[:-1], True) + term_variance
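
# Illustrative sketch referenced in the `sample_variance` notes: dividing the
# recursive sum of squares by (n - 1) reproduces the sample variance. For
# [1, 2, 3] the sum of squared deviations is 2, so the variance is 1.
def _example_variance_via_sum_of_squares():
    x = [1.0, 2.0, 3.0]
    variance = recursive_sum_of_squares(x) / (len(x) - 1)
    assert abs(variance - sample_variance(x)) < 1e-9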

def sample_covariance(x: list, y: list):
    """
    Returns the sample covariance of the paired data (*x*, *y*).

    Parameters
    ----------
    1. **x**: ``list``
        The *x* sample of paired data (*x*, *y*). Must preserve order with **y**.
    2. **y**: ``list``
        The *y* sample of paired data (*x*, *y*). Must preserve order with **x**.

    Raises
    ------
    1. `scrilla.errors.SampleSizeError`
        If ``len(x) != len(y)`` (samples of incomparable length) or ``len(x) in [0,1]`` (insufficient data/degrees of freedom), this error will be thrown.
    """
    if len(x) != len(y):
        raise errors.SampleSizeError('Samples are not of comparable length')

    if len(x) in [0, 1]:
        raise errors.SampleSizeError(
            'Sample covariance cannot be computed for a sample size less than or equal to 1.')

    n, covariance = len(x), 0

    x_mean, y_mean = sample_mean(x=x), sample_mean(x=y)

    for i, item in enumerate(x):
        covariance += (item - x_mean)*(y[i] - y_mean) / (n - 1)

    return covariance
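
# Worked example (illustrative): deviations from the means are (-1, 0, 1) and
# (-2, 0, 2), so the sample covariance is (2 + 0 + 2)/(3 - 1) = 2.
def _example_sample_covariance():
    assert abs(sample_covariance(x=[1, 2, 3], y=[2, 4, 6]) - 2.0) < 1e-9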

def recursive_rolling_covariance(covar_previous: float, new_x_obs: float, lost_x_obs: float,
                                 previous_x_bar: float, new_y_obs: float, lost_y_obs: float,
                                 previous_y_bar: float, n: int = settings.DEFAULT_ANALYSIS_PERIOD):
    # TODO: doesn't work yet.
    new_sum_term = new_x_obs*new_y_obs - lost_x_obs*lost_y_obs
    xy_cross_term = previous_x_bar*(new_y_obs-lost_y_obs)
    yx_cross_term = previous_y_bar*(new_x_obs-lost_x_obs)
    perturbation = (new_x_obs-lost_x_obs)*(new_y_obs-lost_y_obs) / n
    numerator = new_sum_term - xy_cross_term - yx_cross_term - perturbation
    covar_new = covar_previous + numerator / (n-1)
    return covar_new

def simple_regression_beta(x: List[float], y: List[float]):
    """
    Returns the slope coefficient of a simple linear regression of *y* against *x*.

    Parameters
    ----------
    1. **x**: ``list``
        The *x* sample of paired data (*x*, *y*). Must preserve order with **y**.
    2. **y**: ``list``
        The *y* sample of paired data (*x*, *y*). Must preserve order with **x**.

    Raises
    ------
    1. `scrilla.errors.SampleSizeError`
        If ``len(x) != len(y)`` (samples of incomparable length) or ``len(x) < 3`` (insufficient data/degrees of freedom), this error will be thrown.
    """
    if len(x) != len(y):
        raise errors.SampleSizeError(f'len(x) = {len(x)} != len(y) = {len(y)}')
    if len(x) < 3:
        raise errors.SampleSizeError(
            f'Sample size of {len(x)} is less than the necessary degrees of freedom (n > 2) for regression estimation.')

    correl = sample_correlation(x=x, y=y)
    vol_x = sqrt(sample_variance(x=x))
    vol_y = sqrt(sample_variance(x=y))

    beta = correl * vol_y / vol_x
    return beta

def simple_regression_alpha(x: List[float], y: List[float]):
    """
    Returns the intercept coefficient of a simple linear regression of *y* against *x*.

    Parameters
    ----------
    1. **x**: ``list``
        The *x* sample of paired data (*x*, *y*). Must preserve order with **y**.
    2. **y**: ``list``
        The *y* sample of paired data (*x*, *y*). Must preserve order with **x**.

    Raises
    ------
    1. `scrilla.errors.SampleSizeError`
        If ``len(x) != len(y)`` (samples of incomparable length) or ``len(x) < 3`` (insufficient data/degrees of freedom), this error will be thrown.
    """
    if len(x) != len(y):
        raise errors.SampleSizeError(
            f'len(x) == {len(x)} != len(y) == {len(y)}')

    if len(x) < 3:
        raise errors.SampleSizeError(
            f'Sample size of {len(x)} is less than the necessary degrees of freedom (n > 2) for regression estimation.')

    y_mean, x_mean = sample_mean(y), sample_mean(x)

    alpha = y_mean - simple_regression_beta(x=x, y=y)*x_mean
    return alpha
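
# Worked example (illustrative): for y = 2x the regression recovers
# beta = correl * vol_y / vol_x = 1 * 2 / 1 = 2 and
# alpha = mean(y) - beta * mean(x) = 4 - 2 * 2 = 0.
def _example_simple_regression():
    x, y = [1, 2, 3], [2, 4, 6]
    assert abs(simple_regression_beta(x=x, y=y) - 2.0) < 1e-9
    assert abs(simple_regression_alpha(x=x, y=y)) < 1e-9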

def qq_series_for_sample(sample: List[float]) -> List[list]:
    """
    Calculates the QQ series for a sample of data, i.e. the set of ordered pairs of sample percentiles and theoretical normal percentiles. A sample's normality can be assessed by how linear the resulting graph is.

    Parameters
    ----------
    1. **sample**: ``list``
        A sample of numerical data.
    """
    qq_series = []
    n = len(sample)
    for i in range(n):
        percentile = (i + 0.5)/n
        percentile_sample = sample_percentile(
            data=sample, percentile=percentile)
        percentile_norm = norm.ppf(q=percentile)
        qq_series += [[percentile_norm, percentile_sample]]

    return qq_series
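
# Illustrative sketch (made-up sample): each entry in the QQ series pairs a
# theoretical normal percentile with the matching sample percentile; roughly
# normal data should produce an approximately linear series.
def _example_qq_series():
    series = qq_series_for_sample([-1.5, -0.5, 0.0, 0.5, 1.5])
    assert len(series) == 5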

def standardize(x: List[float]):
    # Shifts and scales the sample so it has mean 0 and sample standard deviation 1.
    mu = sample_mean(x)
    sigma = sqrt(sample_variance(x))
    return [(this_x - mu)/sigma for this_x in x]
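
# Worked example (illustrative): [1, 2, 3] has mean 2 and sample standard
# deviation 1, so standardizing yields approximately [-1.0, 0.0, 1.0].
def _example_standardize():
    standardized = standardize([1, 2, 3])
    assert all(abs(a - b) < 1e-9 for a, b in zip(standardized, [-1.0, 0.0, 1.0]))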