Coverage for src/scrilla/cache.py: 79%
458 statements
coverage.py v6.4.2, created at 2022-07-18 18:14 +0000
1# This file is part of scrilla: https://github.com/chinchalinchin/scrilla.
3# scrilla is free software: you can redistribute it and/or modify
4# it under the terms of the GNU General Public License version 3
5# as published by the Free Software Foundation.
7# scrilla is distributed in the hope that it will be useful,
8# but WITHOUT ANY WARRANTY; without even the implied warranty of
9# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10# GNU General Public License for more details.
12# You should have received a copy of the GNU General Public License
13# along with scrilla. If not, see <https://www.gnu.org/licenses/>
14# or <https://github.com/chinchalinchin/scrilla/blob/develop/main/LICENSE>.
16"""
17This module provides a data access layer for a SQLite database maintained on the local file system at the location set by the environment variable **SQLITE_FILE**. If this environment variable is not set, the file location defaults to *installation_directory*/data/cache/scrilla.db. The database caches asset prices, statistical calculations and interest rates, which allows the program to avoid excessive API calls to external services for calculations that involve the same quantity. For instance, to calculate a correlation, the mean and variance of each individual asset must be calculated over its price history before the correlation is calculated over the combined price history; this involves four references to a sample of prices at different points in the program, points which do not necessarily share scope with the other calculations and therefore cannot share an in-memory copy of the prices.
19In addition to preventing excessive API calls, the cache prevents redundant calculations. For example, calculating the market beta for a series of assets requires the variance of the market proxy for each calculation. Rather than recalculate this quantity each time, the program will defer to the values stored in the cache.
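As a quick illustration (a minimal sketch; the ticker and dates are hypothetical), repeated requests for the same price history are served from the cache rather than an external API,

```python
import datetime
from scrilla.cache import PriceCache

cache = PriceCache()
start, end = datetime.date(2021, 1, 4), datetime.date(2021, 12, 31)

# a miss returns None; the service layer would then call the external API
# and persist the response through `PriceCache.save_rows`
if cache.filter('ALLY', start, end) is None:
    pass  # ... fetch from API, then cache.save_rows('ALLY', prices)
```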
20"""
21import itertools
22import datetime
23import sqlite3
24from typing import Union
25import uuid
27from scrilla import files, settings
28from scrilla.cloud import aws
29from scrilla.static import config, keys
30from scrilla.util import dater, errors, outputter
32logger = outputter.Logger("scrilla.cache", settings.LOG_LEVEL)
35class Singleton(type):
37 _instances = {}
39 def __call__(cls, *args, **kwargs):
40 if cls not in cls._instances:
41 cls._instances[cls] = super(
42 Singleton, cls).__call__(*args, **kwargs)
43 else:
44 cls._instances[cls].__init__(*args, **kwargs)
45 return cls._instances[cls]
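# A minimal sketch of the metaclass behavior (illustrative only, not part of
# the module): repeated instantiation returns the one shared instance, and
# `__init__` is re-invoked on that instance with the new arguments.
#
#   class Example(metaclass=Singleton):
#       def __init__(self, mode='sqlite'):
#           self.mode = mode
#
#   first, second = Example(), Example(mode='dynamodb')
#   assert first is second and first.mode == 'dynamodb'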
48class Cache():
49 """
50 Class with static methods that all other caches employ. This class tries to hide as much implementation detail as possible behind its methods, i.e. it is concerned with executing commits and transactions, whereas the other cache classes are concerned with the data structures created through these methods.
51 """
53 @staticmethod
54 def provision(table_configuration, mode=settings.CACHE_MODE):
55 if mode == 'dynamodb': 55 ↛ exit (line 55 didn't return from function 'provision', because the condition on line 55 was never false)
56 logger.debug(
57 f'Provisioning {table_configuration["TableName"]} DynamoDB Table', 'Cache.provision')
58 return aws.dynamo_table(table_configuration)
60 @staticmethod
61 def execute(query, formatter=None, mode=settings.CACHE_MODE):
62 """
63 Executes and commits a transaction against the cache.
65 Parameters
66 ----------
67 1. **query**: ``str``
68 Statement to be executed and committed.
69 2. **formatter**: ``Union[dict, List[dict]]``
70 Dictionary of parameters used to format the statement. Statements are formatted with DB-API's named substitution. See [sqlite3 documentation](https://docs.python.org/3/library/sqlite3.html) for more information. A list of dictionaries can be passed in to perform a batch execute transaction. If nothing is passed in, the method will assume the query is unparameterized.
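For example, a parameterized query against the price table might look like the following (illustrative only; the table must already have been provisioned),

```python
Cache.execute(
    query="SELECT date, close FROM prices WHERE ticker = :ticker",
    formatter={'ticker': 'ALLY'},
    mode='sqlite'
)
```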
71 """
72 if mode == 'sqlite': 72 ↛ 84 (line 72 didn't jump to line 84, because the condition on line 72 was never false)
73 con = sqlite3.connect(settings.CACHE_SQLITE_FILE)
74 executor = con.cursor()
75 if formatter is not None:
76 if isinstance(formatter, list):
77 response = executor.executemany(
78 query, formatter).fetchall()
79 else:
80 response = executor.execute(query, formatter).fetchall()
81 else:
82 response = executor.execute(query).fetchall()
83 con.commit(); con.close()
84 elif mode == 'dynamodb':
85 response = aws.dynamo_statement(query, formatter)
86 else:
87 raise errors.ConfigurationError(
88 'CACHE_MODE has not been set in "settings.py"')
89 return response
92class PriceCache(metaclass=Singleton):
93 """
94 `scrilla.cache.PriceCache` statically accesses *SQLite* functionality from `scrilla.cache.Cache`. It extends the basic functionality to cache price data in a table with columns ``(ticker, date, open, close)``. `scrilla.cache.PriceCache` has `scrilla.cache.Singleton` for its `metaclass`, meaning `PriceCache` is a singleton: it can only be created once, and any subsequent instantiations will return the same instance of `PriceCache`. This is done so that all instances of `PriceCache` share the same `self.internal_cache`, allowing frequently accessed data to be stored in memory.
96 Attributes
97 ----------
98 1. **internal_cache**: ``dict``
99 Dictionary used by `PriceCache` to store API responses in memory. Used to quickly access data that is requested frequently.
100 2. **inited**: ``bool``
101 Flag used to determine if `PriceCache` has been instantiated prior to current instantiation.
102 3. **sqlite_create_table_transaction**: ``str``
103 *SQLite* transaction passed to the super class used to create price cache table if it does not already exist.
104 4. **sqlite_insert_row_transaction**: ``str``
105 *SQLite* transaction used to insert row into price cache table.
106 5. **sqlite_price_query**: ``str``
107 *SQLite* query to retrieve prices from cache.
108 """
109 internal_cache = {}
110 inited = False
111 sqlite_create_table_transaction = "CREATE TABLE IF NOT EXISTS prices (ticker text, date text, open real, close real, UNIQUE(ticker, date))"
112 sqlite_insert_row_transaction = "INSERT OR IGNORE INTO prices (ticker, date, open, close) VALUES (:ticker, :date, :open, :close)"
113 sqlite_price_query = "SELECT date, open, close FROM prices WHERE ticker = :ticker AND date <= date(:end_date) AND date >= date(:start_date) ORDER BY date(date) DESC"
115 dynamodb_table_configuration = config.dynamo_price_table_conf
117 dynamodb_insert_transaction = "INSERT INTO \"prices\" VALUE {'ticker': ?, 'date': ?, 'open': ?, 'close': ? }"
118 dynamodb_price_query = "SELECT \"date\", \"open\", \"close\" FROM \"prices\" WHERE \"ticker\"=? AND \"date\">=? AND \"date\"<=?"
119 # No PartiQL ORDER BY clause yet: https://github.com/partiql/partiql-lang-kotlin/issues/47
120 dynamodb_identity_query = "EXISTS(SELECT ticker FROM \"prices\" WHERE ticker=? and date= ?)"
122 @staticmethod
123 def to_dict(query_results, mode=settings.CACHE_MODE):
124 """
125 Returns the cache query results formatted for the application.
127 Parameters
128 ----------
129 1. **query_results**: ``list``
130 Raw SQLite query results.
131 """
131 """
132 if mode == 'sqlite': 132 ↛ 133 (line 132 didn't jump to line 133, because the condition on line 132 was never true)
133 return {
134 result[0]: {
135 keys.keys['PRICES']['OPEN']: result[1],
136 keys.keys['PRICES']['CLOSE']: result[2]
137 } for result in query_results
138 }
139 elif mode == 'dynamodb': 139 ↛ exit (line 139 didn't return from function 'to_dict', because the condition on line 139 was never false)
140 dates = [result['date'] for result in query_results]
141 dates.sort(key=dater.parse)
142 dates.reverse()
143 formatted_results = {
144 result['date']: {
145 keys.keys['PRICES']['OPEN']: result[keys.keys['PRICES']['OPEN']],
146 keys.keys['PRICES']['CLOSE']: result[keys.keys['PRICES']['CLOSE']]
147 } for result in query_results
148 }
149 return {key: formatted_results[key] for key in dates}
151 @staticmethod
152 def _to_params(ticker, prices):
153 return [
154 {
155 'ticker': ticker,
156 'date': date,
157 'open': prices[date][keys.keys['PRICES']['OPEN']],
158 'close': prices[date][keys.keys['PRICES']['CLOSE']]
159 } for date in prices
160 ]
162 def __init__(self, mode=settings.CACHE_MODE):
163 """
164 Initializes `PriceCache`. A random UUID will be assigned to the `PriceCache` the first time it is created. Since `PriceCache` is a singleton, all subsequent instantiations of `PriceCache` will have the same UUID.
166 Parameters
167 ----------
168 1. **mode**: ``str``
169 Determines the data source that acts as the cache. Defaults to `scrilla.settings.CACHE_MODE`. Can be set to either `sqlite` or `dynamodb`.
170 """
171 if not self.inited:
172 self.uuid = uuid.uuid4()
173 self.inited = True
175 self.mode = mode
177 if not files.get_memory_json()['cache'][mode]['prices']: 177 ↛ exit (line 177 didn't return from function '__init__', because the condition on line 177 was never false)
178 self._table()
180 def _table(self):
181 if self.mode == 'sqlite':
182 Cache.execute(query=self.sqlite_create_table_transaction,
183 mode=self.mode)
184 elif self.mode == 'dynamodb': 184 ↛ exit (line 184 didn't return from function '_table', because the condition on line 184 was never false)
185 self.dynamodb_table_configuration = aws.dynamo_table_conf(
186 self.dynamodb_table_configuration)
187 Cache.provision(self.dynamodb_table_configuration, self.mode)
189 def _insert(self):
190 if self.mode == 'sqlite': 190 ↛ 192 (line 190 didn't jump to line 192, because the condition on line 190 was never false)
191 return self.sqlite_insert_row_transaction
192 elif self.mode == 'dynamodb':
193 return self.dynamodb_insert_transaction
195 def _query(self):
196 if self.mode == 'sqlite': 196 ↛ 198 (line 196 didn't jump to line 198, because the condition on line 196 was never false)
197 return self.sqlite_price_query
198 elif self.mode == 'dynamodb':
199 return self.dynamodb_price_query
201 def _update_internal_cache(self, ticker, prices):
202 if ticker not in list(self.internal_cache):
203 self.internal_cache[ticker] = prices
204 else:
205 self.internal_cache[ticker].update(prices)
207 def _retrieve_from_internal_cache(self, ticker, start_date, end_date):
208 dates = list(self.internal_cache[ticker].keys())
209 start_string = dater.to_string(start_date)
210 end_string = dater.to_string(end_date)
212 if start_string in dates and end_string in dates:
213 end_index = dates.index(start_string)
214 start_index = dates.index(end_string)
215 if start_index > end_index:
216 # NOTE: DynamoDB responses are not necessarily ordered
217 # `to_dict` will take care of ordering
218 start_index, end_index = end_index, start_index
219 prices = dict(itertools.islice(
220 self.internal_cache[ticker].items(), start_index, end_index+1))
221 return prices
222 return None
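# Illustrative sketch of the slice above (hypothetical dates): the internal
# cache is keyed by date in descending order, so with
#   dates = ['2021-01-08', '2021-01-07', '2021-01-06', '2021-01-05']
# a request for 2021-01-05 through 2021-01-07 resolves to indices 1 and 3,
# and `itertools.islice(..., 1, 4)` returns the three dates inclusively.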
224 def save_rows(self, ticker, prices):
225 self._update_internal_cache(ticker, prices)
226 logger.verbose(
227 f'Attempting to insert {ticker} prices into the cache', 'PriceCache.save_rows')
228 Cache.execute(
229 query=self._insert(),
230 formatter=self._to_params(ticker, prices),
231 mode=self.mode
232 )
234 def filter(self, ticker, start_date, end_date):
235 if ticker in list(self.internal_cache):
236 prices = self._retrieve_from_internal_cache(
237 ticker, start_date, end_date)
238 if prices is not None:
239 logger.debug(f'{ticker} prices found in memory',
240 'PriceCache.filter')
241 return prices
243 logger.debug(
244 f'Querying {self.mode} cache \n\t{self._query()}\n\t\t with :ticker={ticker}, :start_date={start_date}, :end_date={end_date}', 'PriceCache.filter')
245 formatter = {'ticker': ticker,
246 'start_date': start_date, 'end_date': end_date}
247 results = Cache.execute(
248 query=self._query(),
249 formatter=formatter,
250 mode=self.mode)
252 if len(results) > 0: 252 ↛ 253 (line 252 didn't jump to line 253, because the condition on line 252 was never true)
253 logger.debug(
254 f'Found {ticker} prices in the cache', 'PriceCache.filter')
255 prices = self.to_dict(results)
256 self._update_internal_cache(ticker, prices)
257 return prices
258 logger.debug(
259 f'No results found for {ticker} prices in the cache', 'PriceCache.filter')
260 return None
263class InterestCache(metaclass=Singleton):
264 """
265 `scrilla.cache.InterestCache` statically accesses *SQLite* functionality from `scrilla.cache.Cache`. It extends the basic functionality to cache interest rate data in a table with columns ``(maturity, date, value)``. `scrilla.cache.InterestCache` has `scrilla.cache.Singleton` for its `metaclass`, meaning `InterestCache` is a singleton: it can only be created once, and any subsequent instantiations will return the same instance of `InterestCache`. This is done so that all instances of `InterestCache` share the same `self.internal_cache`, allowing frequently accessed data to be stored in memory.
268 Attributes
269 ----------
270 1. **internal_cache**: ``dict``
271 Dictionary used by `InterestCache` to store API responses in memory. Used to quickly access data that is requested frequently.
272 2. **inited**: ``bool``
273 Flag used to determine if `InterestCache` has been instantiated prior to current instantiation.
274 3. **sqlite_create_table_transaction**: ``str``
275 *SQLite* transaction passed to `scrilla.cache.Cache` used to create the interest cache table if it does not already exist.
276 4. **sqlite_insert_row_transaction**: ``str``
277 *SQLite* transaction used to insert a row into the interest cache table.
278 5. **sqlite_interest_query**: ``str``
279 *SQLite* query to retrieve interest rates from the cache.
280 6. **dynamodb_table_configuration**: ``str``
Configuration posted to **DynamoDB** when provisioning the cache table.
281 7. **dynamodb_insert_transaction**: ``str``
**PartiQL** statement used to insert a new row into the **DynamoDB** interest table.
282 8. **dynamodb_query**: ``str``
**PartiQL** statement used to query the **DynamoDB** interest table.
283 9. **dynamodb_identity_query**: ``str``
**PartiQL** statement used to determine whether a row already exists in the **DynamoDB** interest table.
284 """
285 internal_cache = {}
286 inited = False
287 sqlite_create_table_transaction = "CREATE TABLE IF NOT EXISTS interest(maturity text, date text, value real, UNIQUE(maturity, date))"
288 sqlite_insert_row_transaction = "INSERT OR IGNORE INTO interest (maturity, date, value) VALUES (:maturity, :date, :value)"
289 sqlite_interest_query = "SELECT date, value FROM interest WHERE maturity=:maturity AND date <= date(:end_date) AND date >= date(:start_date) ORDER BY date(date) DESC"
291 dynamodb_table_configuration = config.dynamo_interest_table_conf
292 dynamodb_insert_transaction = "INSERT INTO \"interest\" VALUE {'maturity': ?, 'date': ?, 'value': ? }"
293 dynamodb_query = "SELECT \"date\", \"value\" FROM \"interest\" WHERE \"maturity\"=? AND \"date\">=? AND \"date\"<=?"
294 # NOTE: No PartiQL ORDER BY clause yet: https://github.com/partiql/partiql-lang-kotlin/issues/47
295 dynamodb_identity_query = "EXISTS(SELECT 'maturity' FROM \"interest\" WHERE 'maturity'=? AND 'date'<= ?)"
297 @staticmethod
298 def to_dict(query_results, mode=settings.CACHE_MODE):
299 """
300 Returns the cache query results formatted for the application.
302 Parameters
303 ----------
304 1. **query_results**: ``list``
305 Raw SQLite query results.
306 """
307 if mode == 'sqlite': 307 ↛ 308 (line 307 didn't jump to line 308, because the condition on line 307 was never true)
308 return {result[0]: result[1] for result in query_results}
309 elif mode == 'dynamodb': 309 ↛ exit (line 309 didn't return from function 'to_dict', because the condition on line 309 was never false)
310 # TODO: need to order by date!
311 dates = [result['date'] for result in query_results]
312 dates.sort(key=dater.parse)
313 dates.reverse()
314 formatted_results = {result['date']: result['value']
315 for result in query_results}
316 return {key: formatted_results[key] for key in dates}
318 @staticmethod
319 def _to_params(rates):
320 params = []
321 for date in rates:
322 for index, maturity in enumerate(keys.keys['YIELD_CURVE']):
323 entry = {
324 'maturity': maturity,
325 'date': date,
326 'value': rates[date][index]
327 }
328 params.append(entry)
329 return params
331 def __init__(self, mode=settings.CACHE_MODE):
332 """
333 Initializes `InterestCache`. A random UUID will be assigned to the `InterestCache` the first time it is created. Since `InterestCache` is a singleton, all subsequent instantiations of `InterestCache` will have the same UUID.
335 Parameters
336 ----------
337 1. **mode**: ``str``
338 Determines the data source that acts as the cache. Defaults to `scrilla.settings.CACHE_MODE`. Can be set to either `sqlite` or `dynamodb`.
339 """
340 if not self.inited:
341 self.uuid = uuid.uuid4()
342 self._init_internal_cache()
343 self.inited = True
345 self.mode = mode
347 if not files.get_memory_json()['cache'][self.mode]['interest']: 347 ↛ exit (line 347 didn't return from function '__init__', because the condition on line 347 was never false)
348 self._table()
350 def _table(self):
351 if self.mode == 'sqlite':
352 Cache.execute(query=self.sqlite_create_table_transaction,
353 mode=self.mode)
354 elif self.mode == 'dynamodb': 354 ↛ exit (line 354 didn't return from function '_table', because the condition on line 354 was never false)
355 self.dynamodb_table_configuration = aws.dynamo_table_conf(
356 self.dynamodb_table_configuration)
357 Cache.provision(self.dynamodb_table_configuration, self.mode)
359 def _init_internal_cache(self):
360 for maturity in keys.keys['YIELD_CURVE']:
361 self.internal_cache[maturity] = {}
363 def _insert(self):
364 if self.mode == 'sqlite': 364 ↛ 366 (line 364 didn't jump to line 366, because the condition on line 364 was never false)
365 return self.sqlite_insert_row_transaction
366 elif self.mode == 'dynamodb':
367 return self.dynamodb_insert_transaction
369 def _query(self):
370 if self.mode == 'sqlite': 370 ↛ 372 (line 370 didn't jump to line 372, because the condition on line 370 was never false)
371 return self.sqlite_interest_query
372 elif self.mode == 'dynamodb':
373 return self.dynamodb_query
375 def _save_internal_cache(self, rates):
376 """
377 Stores interest rate data in an internal cache, to minimize direct queries to the cache.
379 Parameters
380 ----------
381 1. **rates**: ``dict``
382 Dictionary containing interest rate data that needs to be persisted in memory.
384 .. notes::
385 - The internal cache data structure is as follows,
386 ```json
387 {
388 "maturity": {
389 "date": "value",
390 "date": "value"
391 },
392 ...
393 }
394 ```
395 """
396 for date in rates:
397 for index, maturity in enumerate(keys.keys['YIELD_CURVE']):
398 self.internal_cache[maturity][date] = rates[date][index]
400 def _update_internal_cache(self, values, maturity):
401 self.internal_cache[maturity].update(values)
403 def _retrieve_from_internal_cache(self, maturity, start_date, end_date):
404 dates = list(self.internal_cache[maturity].keys())
405 start_string, end_string = dater.to_string(
406 start_date), dater.to_string(end_date)
408 if start_string in dates and end_string in dates:
409 start_index = dates.index(start_string)
410 end_index = dates.index(end_string)
412 if start_index > end_index: 412 ↛ 415 (line 412 didn't jump to line 415, because the condition on line 412 was never true)
413 # NOTE: DynamoDB responses are not necessarily ordered
414 # `to_dict` will take care of ordering
415 start_index, end_index = end_index, start_index
417 rates = dict(itertools.islice(
418 self.internal_cache[maturity].items(), start_index, end_index+1))
420 if dater.business_days_between(start_date, end_date) == len(rates):
421 logger.debug('Found interest in memory',
422 'InterestCache._retrieve_from_internal_cache')
423 return rates
424 return None
426 def save_rows(self, rates):
427 """
429 .. notes::
430 - This is called with the response from `scrilla.services.StatManager.get_interest_rates()`. At this point, the data should be formatted as follows,
431 ```json
432 {
433 "date" : [ "value", "value", ... , "value" ],
434 "date" : [ "value", "value", ... , "value" ]
435 }
436 ```
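- Each date's list holds one value per maturity in `keys.keys['YIELD_CURVE']`, so `scrilla.cache.InterestCache._to_params()` flattens it into one row per (maturity, date) pair, roughly as follows (the maturity names are placeholders),
```python
# {'2021-01-04': [0.08, 0.09, ...]} becomes
# [{'maturity': <first maturity key>, 'date': '2021-01-04', 'value': 0.08},
#  {'maturity': <second maturity key>, 'date': '2021-01-04', 'value': 0.09},
#  ...]
```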
437 """
439 self._save_internal_cache(rates)
440 logger.verbose(
441 'Attempting to insert interest rates into cache', 'InterestCache.save_rows')
442 Cache.execute(
443 query=self._insert(),
444 formatter=self._to_params(rates), mode=self.mode
445 )
447 def filter(self, maturity, start_date, end_date):
448 """
450 .. notes::
451 - `scrilla.cache.InterestCache.filter()` is called in `scrilla.services.get_daily_interest_history()` _before_ the API response from the Treasury is saved, i.e. before `scrilla.cache.InterestCache.save_rows()` and thus `scrilla.cache.InterestCache._save_internal_cache()` are called. If the application has just been installed and the cache is empty, then nothing unusual happens. If the application has just been installed and the cache is not empty (perhaps the application was re-installed, or data has been inserted manually into the cache), then calling `filter` will return results, and those results will populate the internal cache through a `scrilla.cache.InterestCache._update_internal_cache()` call; in this case the internal cache is hydrated by the `update` method instead of the `save` method. In other words, the internal cache has two different entrypoints, and care must be taken so that both are taken into account when initializing the internal cache.
453 """
454 rates = self._retrieve_from_internal_cache(
455 maturity, start_date, end_date)
456 if rates is not None:
457 logger.debug(f'{maturity} interest found in memory',
458 'InterestCache.filter')
459 return rates
461 logger.debug(
462 f'Querying {self.mode} cache \n\t{self._query()}\n\t\t with :maturity={maturity}, :start_date={start_date}, :end_date={end_date}',
463 'InterestCache.filter')
464 formatter = {'maturity': maturity,
465 'start_date': start_date, 'end_date': end_date}
466 results = Cache.execute(
467 query=self._query(), formatter=formatter, mode=self.mode)
468 # NOTE: [ [ 'date', 'value' ] ] at this point
470 if len(results) > 0: 470 ↛ 471 (line 470 didn't jump to line 471, because the condition on line 470 was never true)
471 logger.debug(
472 f'Found {maturity} yield in the cache', 'InterestCache.filter')
473 rates = self.to_dict(results)
474 # NOTE: { 'date': 'value' } at this point
475 self._update_internal_cache(rates, maturity)
476 return rates
478 logger.debug(
479 f'No results found for {maturity} yield in cache', 'InterestCache.filter')
480 return None
483class CorrelationCache(metaclass=Singleton):
484 """
485 `scrilla.cache.CorrelationCache` statically accesses *SQLite* functionality from `scrilla.cache.Cache`. It extends the basic functionality to cache correlations in a table with columns ``(ticker_1, ticker_2, start_date, end_date, correlation, method, weekends)``. `scrilla.cache.CorrelationCache` has `scrilla.cache.Singleton` for its `metaclass`, meaning `CorrelationCache` is a singleton: it can only be created once, and any subsequent instantiations will return the same instance of `CorrelationCache`. This is done so that all instances of `CorrelationCache` share the same `self.internal_cache`, allowing frequently accessed data to be stored in memory.
487 Attributes
488 ----------
489 1. **internal_cache**: ``dict``
490 Dictionary used by `CorrelationCache` to store API responses in memory. Used to quickly access data that is requested frequently.
491 2. **inited**: ``bool``
492 Flag used to determine if `CorrelationCache` has been instantiated prior to current instantiation.
493 3. **sqlite_create_table_transaction**: ``str``
494 *SQLite* transaction passed to the super class used to create correlation cache table if it does not already exist.
495 4. **sqlite_insert_row_transaction**: ``str``
496 *SQLite* transaction used to insert row into correlation cache table.
497 5. **sqlite_correlation_query**: ``str``
498 *SQLite* query to retrieve correlation from cache.
500 .. notes::
501 * Results do not need to be ordered for `correlation_query` because a correlation is uniquely determined by the (`start_date`, `end_date`, `ticker_1`, `ticker_2`)-tuple. More or less. There is a bit of fuzziness, since the permutation of that tuple, (`start_date`, `end_date`, `ticker_2`, `ticker_1`), will also be associated with the same correlation value. No other mappings between a date's correlation value and the correlation's tickers are possible, though. In other words, for a given (`ticker_1`, `ticker_2`)-permutation, the query will only ever return one result.
502 * `method` corresponds to the estimation method used by the application to calculate a given statistic.
503 * `weekends` corresponds to a flag representing whether or not the calculation used weekends. This will always be 0 in the case of equities, but for cryptocurrencies, this flag is important and will affect the calculation.
504 """
505 internal_cache = {}
506 inited = False
507 sqlite_create_table_transaction = "CREATE TABLE IF NOT EXISTS correlations (ticker_1 TEXT, ticker_2 TEXT, start_date TEXT, end_date TEXT, correlation REAL, method TEXT, weekends INT)"
508 sqlite_insert_row_transaction = "INSERT INTO correlations (ticker_1, ticker_2, start_date, end_date, correlation, method, weekends) VALUES (:ticker_1, :ticker_2, :start_date, :end_date, :correlation, :method, :weekends)"
509 sqlite_correlation_query = "SELECT correlation FROM correlations WHERE ticker_1=:ticker_1 AND ticker_2=:ticker_2 AND start_date=date(:start_date) AND end_date=date(:end_date) AND method=:method AND weekends=:weekends"
511 dynamodb_table_configuration = config.dynamo_correlation_table_conf
512 dynamodb_insert_transaction = "INSERT INTO \"correlations\" VALUE { 'ticker_1': ?, 'ticker_2': ?, 'end_date': ?, 'start_date': ?, 'method': ?, 'weekends': ?, 'id': ?, 'correlation': ? }"
513 dynamodb_query = "SELECT correlation FROM \"correlations\" WHERE \"ticker_1\"=? AND \"ticker_2\"=? AND \"end_date\"=? AND \"start_date\"=? AND \"method\"=? AND \"weekends\"=?"
514 dynamodb_identity_query = "EXISTS(SELECT correlation FROM \"correlations\" WHERE \"ticker_1\"=? AND \"ticker_2\"=? AND \"end_date\"=? AND \"start_date\"=? AND \"method\"=? AND \"weekends\"=?)"
516 @staticmethod
517 def to_dict(query_results):
518 """
519 Returns the SQLite query results formatted for the application.
521 Parameters
522 ----------
523 1. **query_results**: ``list``
524 Raw SQLite query results.
525 """
526 return {keys.keys['STATISTICS']['CORRELATION']: query_results[0][0]}
528 @staticmethod
529 def generate_id(params):
530 hashish_key = ''
531 for param in params.values():
532 if isinstance(param, str):
533 hashish_key += param
534 elif isinstance(param, (float, int)):
535 hashish_key += str(param)
536 elif isinstance(param, datetime.date): 536 ↛ 531 (line 536 didn't jump to line 531, because the condition on line 536 was never false)
537 hashish_key += dater.to_string(param)
538 return hashish_key
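# Illustrative (hypothetical values): the id is the concatenation of the
# stringified parameter values, so
#   CorrelationCache.generate_id({'ticker_1': 'ALLY', 'ticker_2': 'BX', 'weekends': 0})
# yields 'ALLYBX0'.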
540 def __init__(self, mode=settings.CACHE_MODE):
541 """
542 Initializes `CorrelationCache`. A random UUID will be assigned to the `CorrelationCache` the first time it is created. Since `CorrelationCache` is a singleton, all subsequent instantiations of `CorrelationCache` will have the same UUID.
544 Parameters
545 ----------
546 1. **mode**: ``str``
547 Determines the data source that acts as the cache. Defaults to `scrilla.settings.CACHE_MODE`. Can be set to either `sqlite` or `dynamodb`.
548 """
549 if not self.inited:
550 self.uuid = uuid.uuid4()
551 self.inited = True
552 self.mode = mode
553 if not files.get_memory_json()['cache'][mode]['correlations']: 553 ↛ exit (line 553 didn't return from function '__init__', because the condition on line 553 was never false)
554 self._table()
556 def _table(self):
557 if self.mode == 'sqlite':
558 Cache.execute(query=self.sqlite_create_table_transaction,
559 mode=self.mode)
560 elif self.mode == 'dynamodb': 560 ↛ exit (line 560 didn't return from function '_table', because the condition on line 560 was never false)
561 self.dynamodb_table_configuration = aws.dynamo_table_conf(
562 self.dynamodb_table_configuration)
563 Cache.provision(self.dynamodb_table_configuration, self.mode)
565 def _insert(self):
566 if self.mode == 'sqlite': 566 ↛ 568 (line 566 didn't jump to line 568, because the condition on line 566 was never false)
567 return self.sqlite_insert_row_transaction
568 elif self.mode == 'dynamodb':
569 return self.dynamodb_insert_transaction
571 def _query(self):
572 if self.mode == 'sqlite': 572 ↛ 574 (line 572 didn't jump to line 574, because the condition on line 572 was never false)
573 return self.sqlite_correlation_query
574 elif self.mode == 'dynamodb':
575 return self.dynamodb_query
577 def _update_internal_cache(self, params, permuted_params, correlation):
578 correl_id = self.generate_id(params)
579 permuted_id = self.generate_id(permuted_params)
580 self.internal_cache[correl_id] = {'correlation': correlation}
581 self.internal_cache[permuted_id] = {'correlation': correlation}
584 def _retrieve_from_internal_cache(self, params, permuted_params):
585 first_id = self.generate_id(params)
586 second_id = self.generate_id(permuted_params)
587 if first_id in list(self.internal_cache):
588 return self.internal_cache[first_id]
589 if second_id in list(self.internal_cache): 589 ↛ 590 (line 589 didn't jump to line 590, because the condition on line 589 was never true)
590 return self.internal_cache[second_id]
591 return None
593 def save_row(self, ticker_1: str, ticker_2: str, start_date: datetime.date, end_date: datetime.date, correlation: float, weekends: bool, method: str = settings.ESTIMATION_METHOD):
594 """
595 Uses the insert transaction from `self._insert()` to save the passed-in information to the cache.
597 Parameters
598 ----------
599 1. **ticker_1**: ``str``
600 2. **ticker_2**: ``str``
601 3. **start_date**: ``datetime.date``
602 4. **end_date**: ``datetime.date``
603 5. **correlation**: ``float``
604 6. **weekends**: ``bool``
605 7. **method**: ``str``
606 *Optional*. Method used to calculate the correlation. Defaults to `scrilla.settings.ESTIMATION_METHOD`, which in turn is configured by the environment variable, *DEFAULT_ESTIMATION_METHOD*.
607 """
608 # TODO: it would probably make more sense passing in **kwargs...
609 logger.verbose(
610 f'Saving ({ticker_1}, {ticker_2}) correlation from {start_date} to {end_date} to the cache',
611 'CorrelationCache.save_row')
612 formatter_1 = {'ticker_1': ticker_1, 'ticker_2': ticker_2,
613 'end_date': end_date, 'start_date': start_date,
614 'method': method, 'weekends': weekends}
615 formatter_2 = {'ticker_1': ticker_2, 'ticker_2': ticker_1,
616 'end_date': end_date, 'start_date': start_date,
617 'method': method, 'weekends': weekends}
619 # NOTE: if correlation or id are in the dictionary, it screws up this call, so
620 # add them after this call. Either that, or add a conditional to the following
621 # method.
622 self._update_internal_cache(formatter_1, formatter_2, correlation)
624 key_1 = self.generate_id(formatter_1)
625 key_2 = self.generate_id(formatter_2)
627 formatter_1.update({'id': key_1, 'correlation': correlation})
628 formatter_2.update({'id': key_2, 'correlation': correlation})
630 Cache.execute(
631 query=self._insert(), formatter=[formatter_1, formatter_2], mode=self.mode)
635 def filter(self, ticker_1, ticker_2, start_date, end_date, weekends, method=settings.ESTIMATION_METHOD):
636 formatter_1 = {'ticker_1': ticker_1, 'ticker_2': ticker_2,
637 'end_date': end_date, 'start_date': start_date,
638 'method': method, 'weekends': weekends}
639 formatter_2 = {'ticker_1': ticker_2, 'ticker_2': ticker_1,
640 'end_date': end_date, 'start_date': start_date,
641 'method': method, 'weekends': weekends}
643 memory = self._retrieve_from_internal_cache(formatter_1, formatter_2)
644 if memory is not None:
645 return memory
647 logger.debug(
648 f'Querying {self.mode} cache \n\t{self._query()}\n\t\t with :ticker_1={ticker_1}, :ticker_2={ticker_2}, :start_date={start_date}, :end_date={end_date}', 'CorrelationCache.filter')
649 results = Cache.execute(
650 query=self._query(), formatter=formatter_1, mode=self.mode)
652 if len(results) > 0: 652 ↛ 653 (line 652 didn't jump to line 653, because the condition on line 652 was never true)
653 logger.debug(
654 f'Found ({ticker_1},{ticker_2}) correlation in the cache', 'CorrelationCache.filter')
655 if self.mode == 'sqlite':
656 correl = self.to_dict(results)
657 elif self.mode == 'dynamodb':
658 correl = results[0]
659 self._update_internal_cache(formatter_1, formatter_2, correl)
660 return correl
662 results = Cache.execute(
663 query=self._query(), formatter=formatter_2, mode=self.mode)
665 if len(results) > 0: 665 ↛ 666 (line 665 didn't jump to line 666, because the condition on line 665 was never true)
666 logger.debug(
667 f'Found ({ticker_1},{ticker_2}) correlation in the cache', 'CorrelationCache.filter')
668 if self.mode == 'sqlite':
669 correl = self.to_dict(results)
670 elif self.mode == 'dynamodb':
671 correl = results[0]
672 self._update_internal_cache(formatter_1, formatter_2, correl)
673 return correl
674 logger.debug(
675 f'No results found for ({ticker_1}, {ticker_2}) correlation in the cache', 'CorrelationCache.filter')
676 return None
679class ProfileCache(metaclass=Singleton):
680 """
681 `scrilla.cache.ProfileCache` statically accesses *SQLite* functionality from `scrilla.cache.Cache`. It extends the basic functionality to cache risk profiles in a table with columns ``(ticker, start_date, end_date, annual_return, annual_volatility, sharpe_ratio, asset_beta, equity_cost, method, weekends)``. `scrilla.cache.ProfileCache` has `scrilla.cache.Singleton` for its `metaclass`, meaning `ProfileCache` is a singleton: it can only be created once, and any subsequent instantiations will return the same instance of `ProfileCache`. This is done so that all instances of `ProfileCache` share the same `self.internal_cache`, allowing frequently accessed data to be stored in memory.
683 Attributes
684 ----------
685 1. **internal_cache**: ``dict``
686 Dictionary used by `ProfileCache` to store API responses in memory. Used to quickly access data that is requested frequently.
687 2. **inited**: ``bool``
688 Flag used to determine if `ProfileCache` has been instantiated prior to current instantiation.
689 3. **sqlite_create_table_transaction**: ``str``
690 *SQLite* transaction passed to `scrilla.cache.Cache` used to create the profile cache table if it does not already exist.
691 4. **sqlite_identity_query**: ``str``
692 *SQLite* query used to determine whether a profile row already exists in the cache.
693 5. **sqlite_profile_query**: ``str``
694 *SQLite* query to retrieve a risk profile from the cache.
695 6. **dynamodb_table_configuration**: ``str``
696 Configuration posted to **DynamoDB** when provisioning cache tables.
697 7. **dynamodb_profile_query**: ``str``
698 **PartiQL** statement used to query the **DynamoDB** profile table.
699 8. **dynamodb_identity_query**: ``str``
700 **PartiQL** statement used to determine whether a profile row already exists in the **DynamoDB** table.
702 .. notes::
703 * Results do not need to be ordered for `profile_query` because a profile is uniquely determined by the (`ticker`, `start_date`, `end_date`, `method`, `weekends`)-tuple; for a given set of filters, the query will only ever return one result.
704 * `method` corresponds to the estimation method used by the application to calculate a given statistic.
705 * `weekends` corresponds to a flag representing whether or not the calculation used weekends. This will always be 0 in the case of equities, but for cryptocurrencies, this flag is important and will affect the calculation.
706 """
707 internal_cache = {}
708 inited = False
709 sqlite_create_table_transaction = "CREATE TABLE IF NOT EXISTS profile (id INTEGER PRIMARY KEY, ticker TEXT, start_date TEXT, end_date TEXT, annual_return REAL, annual_volatility REAL, sharpe_ratio REAL, asset_beta REAL, equity_cost REAL, method TEXT, weekends INT)"
710 sqlite_filter = "ticker=:ticker AND start_date=date(:start_date) AND end_date=date(:end_date) AND :method=method AND weekends=:weekends"
711 sqlite_identity_query = "SELECT id FROM profile WHERE ticker=:ticker AND start_date=:start_date AND end_date=:end_date AND method=:method AND weekends=:weekends"
712 sqlite_profile_query = "SELECT ifnull(annual_return, 'empty'), ifnull(annual_volatility, 'empty'), ifnull(sharpe_ratio, 'empty'), ifnull(asset_beta, 'empty'), ifnull(equity_cost, 'empty') FROM profile WHERE {sqlite_filter}".format(
713 sqlite_filter=sqlite_filter)
715 dynamodb_table_configuration = config.dynamo_profile_table_conf
716 dynamodb_profile_query = "SELECT annual_return,annual_volatility,sharpe_ratio,asset_beta,equity_cost FROM \"profile\" WHERE ticker=? AND start_date=? AND end_date=? AND method=? AND weekends=?"
717 # dynamodb_identity_query = "EXISTS(SELECT * FROM \"profile\" WHERE ticker=? AND start_date=? AND end_date=? AND method=? AND weekends=?)"
718 # See NOTE in save_or_update_row
719 dynamodb_identity_query = "SELECT * FROM \"profile\" WHERE ticker=? AND start_date=? AND end_date=? AND method=? AND weekends=?"
721 @staticmethod
722 def to_dict(query_result, mode=settings.CACHE_MODE):
723 """
724 Returns the cache query results formatted for the application.
726 Parameters
727 ----------
728 1. **query_result**: ``list``
729 Raw SQLite query results.
730 """
731 if mode == 'sqlite':
732 return {
733 keys.keys['STATISTICS']['RETURN']: query_result[0][0] if query_result[0][0] != 'empty' else None,
734 keys.keys['STATISTICS']['VOLATILITY']: query_result[0][1] if query_result[0][1] != 'empty' else None,
735 keys.keys['STATISTICS']['SHARPE']: query_result[0][2] if query_result[0][2] != 'empty' else None,
736 keys.keys['STATISTICS']['BETA']: query_result[0][3] if query_result[0][3] != 'empty' else None,
737 keys.keys['STATISTICS']['EQUITY']: query_result[0][4] if query_result[0][4] != 'empty' else None
738 }
739 elif mode == 'dynamodb':
740 return query_result[0]
742 @staticmethod
743 def _construct_update(params, mode=settings.CACHE_MODE):
744 if mode == 'sqlite': 744 ↛ 752 (line 744 didn't jump to line 752, because the condition on line 744 was never false)
745 update_query = 'UPDATE profile SET '
746 for param in params.keys():
747 update_query += f'{param}=:{param}'
748 if list(params.keys()).index(param) != len(params)-1:
749 update_query += ','
750 update_query += " WHERE ticker=:ticker AND start_date=:start_date AND end_date=:end_date AND method=:method AND weekends=:weekends"
751 return update_query
752 elif mode == 'dynamodb':
753 update_query = 'UPDATE profile '
754 for param in params.keys():
755 update_query += f'SET {param}=? '
756 update_query += "WHERE ticker=? AND start_date=? AND end_date=? AND method=? AND weekends=?"
757 return update_query
759 @staticmethod
760 def _construct_insert(params_and_filter, mode=settings.CACHE_MODE):
761 if mode == 'sqlite': 761 ↛ 776 (line 761 didn't jump to line 776, because the condition on line 761 was never false)
762 insert_query = 'INSERT INTO profile ('
763 for param in params_and_filter.keys():
764 insert_query += f'{param}'
765 if list(params_and_filter.keys()).index(param) != len(params_and_filter) - 1:
766 insert_query += ","
767 else:
768 insert_query += ") VALUES ("
769 for param in params_and_filter.keys():
770 insert_query += f':{param}'
771 if list(params_and_filter.keys()).index(param) != len(params_and_filter) - 1:
772 insert_query += ","
773 else:
774 insert_query += ")"
775 return insert_query
776 elif mode == 'dynamodb':
777 insert_query = "INSERT INTO 'profile' VALUE {"
778 for param in params_and_filter.keys():
779 insert_query += f"'{param}': ?"
780 if list(params_and_filter.keys()).index(param) != len(params_and_filter)-1:
781 insert_query += ", "
782 else:
783 insert_query += "}"
784 return insert_query
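# Illustrative output (hypothetical parameters): in 'sqlite' mode, with
#   params = {'annual_return': 0.1}
#   filters = {'ticker': 'ALLY', 'start_date': ..., 'end_date': ..., 'method': ..., 'weekends': 0}
# _construct_update(params) builds
#   UPDATE profile SET annual_return=:annual_return WHERE ticker=:ticker AND start_date=:start_date AND end_date=:end_date AND method=:method AND weekends=:weekends
# and _construct_insert({**params, **filters}) builds
#   INSERT INTO profile (annual_return,ticker,start_date,end_date,method,weekends) VALUES (:annual_return,:ticker,:start_date,:end_date,:method,:weekends)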
786 @staticmethod
787 def _create_cache_key(filters):
788 hashish_key = ''
789 for filt in filters.values():
790 if isinstance(filt, str):
791 hashish_key += filt
792 elif isinstance(filt, (int, float)):
793 hashish_key += str(filt)
794 elif isinstance(filt, datetime.date):
795 hashish_key += dater.to_string(filt)
796 return hashish_key
798 def __init__(self, mode=settings.CACHE_MODE):
799 """
800 Initializes `ProfileCache`. A random UUID will be assigned to the `ProfileCache` the first time it is created. Since `ProfileCache` is a singleton, all subsequent instantiations of `ProfileCache` will have the same UUID.
802 Parameters
803 ----------
804 1. **mode**: ``str``
805 Determines the data source that acts as the cache. Defaults to `scrilla.settings.CACHE_MODE`. Can be set to either `sqlite` or `dynamodb`.
806 """
807 if not self.inited:
808 self.uuid = uuid.uuid4()
809 self.inited = True
811 self.mode = mode
813 if not files.get_memory_json()['cache'][mode]['profile']:
814 self._table()
816 def _table(self):
817 if self.mode == 'sqlite':
818 Cache.execute(query=self.sqlite_create_table_transaction,
819 mode=self.mode)
820 elif self.mode == 'dynamodb': 820 ↛ exit (line 820 didn't return from function '_table', because the condition on line 820 was never false)
821 self.dynamodb_table_configuration = aws.dynamo_table_conf(
822 self.dynamodb_table_configuration)
823 Cache.provision(self.dynamodb_table_configuration, self.mode)
825 def _query(self):
826 if self.mode == 'sqlite': 826 ↛ 828 (line 826 didn't jump to line 828, because the condition on line 826 was never false)
827 return self.sqlite_profile_query
828 elif self.mode == 'dynamodb':
829 return self.dynamodb_profile_query
831 def _identity(self):
832 if self.mode == 'sqlite': 832 ↛ 834 (line 832 didn't jump to line 834, because the condition on line 832 was never false)
833 return self.sqlite_identity_query
834 elif self.mode == 'dynamodb':
835 return self.dynamodb_identity_query
837 def _update_internal_cache(self, profile, profile_keys):
838 key = self._create_cache_key(profile_keys)
839 self.internal_cache[key] = profile
841 def _retrieve_from_internal_cache(self, profile_keys):
842 key = self._create_cache_key(profile_keys)
843 if key in list(self.internal_cache):
844 return self.internal_cache[key]
845 return None
847 def save_or_update_row(self, ticker: str, start_date: datetime.date, end_date: datetime.date, annual_return: Union[float, None] = None, annual_volatility: Union[float, None] = None, sharpe_ratio: Union[float, None] = None, asset_beta: Union[float, None] = None, equity_cost: Union[float, None] = None, weekends: int = 0, method: str = settings.ESTIMATION_METHOD):
848 filters = {'ticker': ticker, 'start_date': start_date,
849 'end_date': end_date, 'method': method, 'weekends': weekends}
850 params = {}
852 if annual_return is not None:
853 params['annual_return'] = annual_return
854 if annual_volatility is not None:
855 params['annual_volatility'] = annual_volatility
856 if sharpe_ratio is not None:
857 params['sharpe_ratio'] = sharpe_ratio
858 if asset_beta is not None:
859 params['asset_beta'] = asset_beta
860 if equity_cost is not None:
861 params['equity_cost'] = equity_cost
863 self._update_internal_cache(params, filters)
865 identity = Cache.execute(self._identity(), filters, self.mode)
866 # NOTE: in order to use the EXISTS function, need to execute the identity query as a transaction.
867 # will need to differentiate between sqlite and dynamodb mode here since all executes
868 # are passed through statement, not transaction...
869 # could add a flag to execute method to explicitly perform a transaction.
870 # not wild about that idea, though.
872 logger.verbose(
873 'Attempting to insert/update risk profile into cache', 'ProfileCache.save_or_update_row')
875 if len(identity) == 0:
876 return Cache.execute(self._construct_insert({**params, **filters}),
877 {**params, **filters}, self.mode)
878 return Cache.execute(self._construct_update(params),
879 {**params, **filters}, self.mode)
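# Illustrative call (hypothetical values): updates the cached Sharpe ratio if
# a row already exists for this (ticker, start_date, end_date, method,
# weekends) identity, and inserts a new row otherwise:
#
#   ProfileCache().save_or_update_row(
#       ticker='ALLY',
#       start_date=datetime.date(2021, 1, 4),
#       end_date=datetime.date(2021, 12, 31),
#       sharpe_ratio=1.25
#   )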
881 def filter(self, ticker: str, start_date: datetime.date, end_date: datetime.date, weekends: int = 0, method=settings.ESTIMATION_METHOD):
882 filters = {'ticker': ticker, 'start_date': start_date,
883 'end_date': end_date, 'method': method, 'weekends': weekends}
885 in_memory = self._retrieve_from_internal_cache(filters)
886 if in_memory:
887 logger.debug(f'{ticker} profile found in memory',
888 'ProfileCache.filter')
889 return in_memory
891 logger.debug(
892 f'Querying {self.mode} cache: \n\t{self._query()}\n\t\t with :ticker={ticker}, :start_date={start_date}, :end_date={end_date}', 'ProfileCache.filter')
894 result = Cache.execute(
895 query=self._query(), formatter=filters, mode=self.mode)
897 if len(result) > 0: 897 ↛ 898 (line 897 didn't jump to line 898, because the condition on line 897 was never true)
898 logger.debug(f'{ticker} profile found in cache',
899 'ProfileCache.filter')
900 self._update_internal_cache(self.to_dict(result), filters)
901 return self.to_dict(result)
902 logger.debug(
903 f'No results found for {ticker} profile in the cache', 'ProfileCache.filter')
904 return None
907def init_cache():
908 memory = files.get_memory_json()
909 if not memory['cache'][settings.CACHE_MODE]['prices']: 909 ↛ 912 (line 909 didn't jump to line 912, because the condition on line 909 was never false)
910 PriceCache()
911 memory['cache'][settings.CACHE_MODE]['prices'] = True
912 if not memory['cache'][settings.CACHE_MODE]['interest']: 912 ↛ 915 (line 912 didn't jump to line 915, because the condition on line 912 was never false)
913 InterestCache()
914 memory['cache'][settings.CACHE_MODE]['interest'] = True
915 if not memory['cache'][settings.CACHE_MODE]['profile']: 915 ↛ 918 (line 915 didn't jump to line 918, because the condition on line 915 was never false)
916 ProfileCache()
917 memory['cache'][settings.CACHE_MODE]['profile'] = True
918 if not memory['cache'][settings.CACHE_MODE]['correlations']: 918 ↛ 921 (line 918 didn't jump to line 921, because the condition on line 918 was never false)
919 CorrelationCache()
920 memory['cache'][settings.CACHE_MODE]['correlations'] = True
919 CorrelationCache()
920 memory['cache'][settings.CACHE_MODE]['correlations'] = True
921 files.save_memory_json(memory)