Coverage for src/scrilla/util/outputter.py: 63%
197 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-18 18:14 +0000
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-18 18:14 +0000
1import datetime
2from typing import Dict, List, Union
4from scrilla.static import constants, formats, definitions
7# GENERAL OUTPUT FUNCTIONS
8def space(n: int):
9 print('\n'*n)
12def title_line(title: str, line_length: int = formats.formats['LINE_LENGTH'], separator: str = formats.formats['separator'],
13 display: bool = True):
14 buff = int((line_length - len(title))/2)
15 result = separator*buff + title + separator*buff
16 if display: 16 ↛ 18line 16 didn't jump to line 18, because the condition on line 16 was never false
17 print(result)
18 return result
21def separator_line(line_length=formats.formats['LINE_LENGTH'], separator=formats.formats['separator'], display=True) -> str:
22 if display: 22 ↛ 24line 22 didn't jump to line 24, because the condition on line 22 was never false
23 print(separator*line_length)
24 return separator*line_length
27def break_lines(msg: str, line_length: int = formats.formats['LINE_LENGTH']) -> List[str]:
28 """
29 Generates a list of strings where each string is less than or equal to a specified line length. All elements in the array will be equal length except the last element; in other words, the inputted string will be read into an array, line by line, until there is nothing left to be read.
31 Parameters
32 ----------
33 1. ``msg``: ``str``
34 String to be broken into an array with each string no longer than `line_length`.
35 2. **line_length**: ``int``
36 *Optional*. Defaults to the value set by `static.formats.formats['LINE_LENGTH]`. The maximum length of a line.
38 Returns
39 -------
40 `list[str]`
41 A list of `line`'s with `len(line)<line_length or len(line)==line_length`.
42 """
44 if len(msg) > line_length:
45 return [msg[i:i+line_length] for i in range(0, len(msg), line_length)]
46 return [msg]
49def center(this_line: str, line_length: int = formats.formats['LINE_LENGTH'], display: bool = True) -> str:
50 """
51 Centers an inputted line relative to the line length.
53 Parameters
54 ----------
55 1. **this_line**: ``str``
56 the string to centered.
57 2. **line_length**: ``int``
58 *Optional*. Defaults to the value set by `static.formats.formats['LINE_LENGTH]`. The length of the line that is being centered over.
59 3. **display**: ``bool``
60 *Optional*. Defaults to `True`. If set to `True`, function will print result to screen. If set to `False`, the function will return the value as a string.
62 Returns
63 -------
64 `str`
65 The line with the difference between `line_length` and `this_line` distributed evenly on either side of the content as white space.
67 Raises
68 ------
69 1. **ValueError**
70 If the inputted line is larged than the line length, this error will be thrown.
71 """
73 if len(this_line) > line_length: 73 ↛ 74line 73 didn't jump to line 74, because the condition on line 73 was never true
74 raise ValueError(
75 'Line to be centered is larger than length being centered over.')
77 buff = int((line_length - len(this_line))/2)
78 output = ' '*buff + this_line + ' '*buff
79 if display: 79 ↛ 82line 79 didn't jump to line 82, because the condition on line 79 was never false
80 print(output)
81 return
82 return output
85def print_list(list_to_print, indent=formats.formats['INDENT']):
86 for i, item in enumerate(list_to_print):
87 print(' '*indent, f'{i}. {item}')
90def string_result(operation, result, indent=formats.formats['INDENT'], display=True):
91 output = ' '*indent + operation + ' = ' + result
92 if display: 92 ↛ 95line 92 didn't jump to line 95, because the condition on line 92 was never false
93 print(output)
94 return
95 return output
98def scalar_result(calculation, result, currency=True, indent=formats.formats['INDENT']):
99 if currency:
100 print(' '*indent, calculation, ' = $', round(float(result), 2))
101 else:
102 print(' '*indent, calculation, ' = ', round(float(result), 4))
105def percent_result(calculation, result, indent=formats.formats['INDENT']):
106 print(' '*indent, calculation, ' = ', round(float(result), 4), '%')
109def equivalent_result(right_hand, left_hand, value, indent=formats.formats['INDENT']):
110 print(' '*indent, f'{right_hand} = {left_hand} = {value}')
113def help_msg(indent: int = formats.formats['INDENT'], function_filter: Union[List[str], None] = None):
114 func_dict, arg_dict = definitions.FUNC_DICT, definitions.ARG_DICT
116 title_line('scrilla')
117 space(1)
119 for paragraph in definitions.HELP_MSG:
120 for line in break_lines(paragraph):
121 center(line)
122 space(1)
124 title_line('SYNTAX')
125 center(definitions.SYNTAX)
126 space(1)
128 for func_name in func_dict:
129 if function_filter is None or len(function_filter) == 0 or \
130 any(filt in func_dict[func_name]['values'] for filt in function_filter):
132 title_line(func_dict[func_name]['name'])
133 for line in break_lines(func_dict[func_name]['description']):
134 center(line)
135 separator_line()
137 commands = func_dict[func_name]['values']
138 print(' ', f'COMMAND: {commands[0]}, {commands[1]}')
140 if func_dict[func_name]['args'] is not None:
141 for arg_name in func_dict[func_name]['args']:
142 aliases = arg_dict[arg_name]['values']
144 print(
145 ' '*indent, f'OPTION: {aliases[0]}, {aliases[1]}, {aliases[2]}, {aliases[3]}')
147 if arg_dict[arg_name]['required']:
148 print(' '*2*indent, 'REQUIRED')
150 print(' '*2*indent, f'NAME: {arg_dict[arg_name]["name"]}')
152 if arg_dict[arg_name]['default'] is not None:
153 print(' '*2*indent,
154 f'DEFAULT: {arg_dict[arg_name]["default"]}')
156 if arg_dict[arg_name]['syntax'] is not None:
157 print(' '*2*indent,
158 f'FORMAT: {arg_dict[arg_name]["syntax"]}')
159 separator_line()
161# ANALYSIS SPECIFIC OUTPUT FUNCTIONS
164def portfolio_percent_result(result, tickers, indent=formats.formats['INDENT']):
165 for i, item in enumerate(tickers):
166 print(' '*indent, f'{item} =', round(100*result[i], 2), '%')
169def portfolio_shares_result(result, tickers, indent=formats.formats['INDENT']):
170 for i, item in enumerate(tickers):
171 print(' '*indent, f'{item} =', result[i])
174def spot_price(ticker, this_spot_price):
175 formatted_price = round(float(this_spot_price), 2)
176 scalar_result(f'{ticker} spot price', formatted_price)
179def model_price(ticker: str, this_model_price: Union[str, float], model: str) -> None:
180 formatted_price = round(float(this_model_price), 2)
181 scalar_result(f'{ticker} {str(model).upper()} price', formatted_price)
184def risk_profile(profiles: Dict[str, Dict[str, float]]) -> None:
185 for key, value in profiles.items():
186 title_line(f'{key} Risk Profile')
187 for subkey, subvalue in value.items():
188 scalar_result(f'{subkey}', f'{subvalue}', currency=False)
191def moving_average_result(ticker: str, averages: Dict[str, Dict[str, float]]) -> None:
192 """
193 Prints the results of `scrilla.analysis.models.geometric.statistics.calculate_moving_averages` or `scrilla.analysis.models.reversion.statistics.calculate_moving_averages` to *stdout*.
195 Parameters
196 ----------
197 1. **averages**: ``Dict[str, Dict[str,float]]``
198 The dictionary returned from a call to `scrilla.analysis.models.geometric.statistics.calculate_moving_averages` or `scrilla.analysis.models.reversion.statistics.calculate_moving_averages`.
199 """
200 title_line(f'{ticker} Moving Averages')
201 for this_date, average_dict in averages.items():
202 center(this_date)
203 for avg_key, average in average_dict.items():
204 scalar_result(calculation=avg_key, result=average, currency=False)
207def screen_results(info, model):
208 for ticker in info:
209 title_line(f'{ticker} {str(model).upper()} Model vs. Spot Price ')
210 spot_price(ticker=ticker, this_spot_price=info[ticker]['spot_price'])
211 model_price(ticker=ticker,
212 this_model_price=info[ticker]['model_price'], model=model)
213 scalar_result(f'{ticker} discount', info[ticker]['discount'])
214 separator_line()
216# TODO: can probably combine optimal_result and efficient_frontier into a single function
217# by wrapping the optimal_results in an array so when it iterates through frontier
218# in efficient_frontier, it will only pick up the single allocation array for the
219# optimal result.
222def optimal_result(portfolio, allocation, investment=None, latest_prices=None):
223 title_line('Optimal Percentage Allocation')
224 portfolio_percent_result(allocation, portfolio.tickers)
226 if investment is not None:
227 shares = portfolio.calculate_approximate_shares(
228 allocation, investment, latest_prices)
229 total = portfolio.calculate_actual_total(
230 allocation, investment, latest_prices)
232 title_line('Optimal Share Allocation')
233 portfolio_shares_result(shares, portfolio.tickers)
234 title_line('Optimal Portfolio Value')
235 scalar_result('Total', round(total, 2))
237 title_line('Risk-Return Profile')
238 scalar_result(calculation='Return', result=portfolio.return_function(
239 allocation), currency=False)
240 scalar_result(calculation='Volatility', result=portfolio.volatility_function(
241 allocation), currency=False)
244def efficient_frontier(portfolio, frontier, investment=None, latest_prices=None):
245 title_line('(Annual Return %, Annual Volatility %) Portfolio')
247 # TODO: edit title to include dates
249 for allocation in frontier:
250 separator_line()
251 return_string = str(
252 round(round(portfolio.return_function(allocation), 4)*100, 2))
253 vol_string = str(
254 round(round(portfolio.volatility_function(allocation), 4)*100, 2))
255 title_line(f'({return_string} %, {vol_string}%) Portfolio')
256 separator_line()
258 title_line('Optimal Percentage Allocation')
259 portfolio_percent_result(allocation, portfolio.tickers)
261 if investment is not None:
262 shares = portfolio.calculate_approximate_shares(
263 allocation, investment, latest_prices)
264 total = portfolio.calculate_actual_total(
265 allocation, investment, latest_prices)
267 title_line('Optimal Share Allocation')
268 portfolio_shares_result(shares, portfolio.tickers)
269 title_line('Optimal Portfolio Value')
270 scalar_result('Total', round(total, 2))
272 title_line('Risk-Return Profile')
273 scalar_result('Return', portfolio.return_function(
274 allocation), currency=False)
275 scalar_result('Volatility', portfolio.volatility_function(
276 allocation), currency=False)
277 print('\n')
280def correlation_matrix(tickers: List[str], correl_matrix: List[List[float]], display: bool = True):
281 """
282 Parameters
283 ----------
284 1. **tickers**: ``list``
285 Array of tickers for which the correlation matrix was calculated and formatted.
286 2. **indent**: ``int``
287 Amount of indent on each new line of the correlation matrix.
288 3. **start_date**: ``datetime.date``
289 Start date of the time period over which correlation was calculated.
290 4. **end_date**: ``datetime.date``
291 End date of the time period over which correlation was calculated.
293 Returns
294 ------
295 A correlation matrix string formatted with new lines and spaces.
296 """
297 entire_formatted_result, formatted_subtitle, formatted_title = "", "", ""
299 line_length, first_symbol_length = 0, 0
300 new_line = ""
301 no_symbols = len(tickers)
303 for i in range(no_symbols):
304 this_symbol = tickers[i]
305 symbol_string = ' '*formats.formats['INDENT'] + f'{this_symbol} '
307 if i != 0:
308 this_line = symbol_string + ' ' * \
309 (line_length - len(symbol_string) - 7*(no_symbols - i))
310 # NOTE: seven is number of chars in ' 100.0%'
311 else:
312 this_line = symbol_string
313 first_symbol_length = len(this_symbol)
315 new_line = this_line
317 for j in range(i, no_symbols):
318 if j == i:
319 new_line += " 100.0%"
321 else:
322 result = correl_matrix[i][j]
323 formatted_result = str(
324 100*result)[:(constants.constants['SIG_FIGS']+1)]
325 new_line += f' {formatted_result}%'
327 entire_formatted_result += new_line + '\n'
329 if i == 0:
330 line_length = len(new_line)
332 formatted_subtitle += ' ' * \
333 (formats.formats['INDENT'] + first_symbol_length+1)
334 for i, ticker in enumerate(tickers):
335 sym_len = len(ticker)
336 formatted_subtitle += f' {ticker}' + ' '*(7-sym_len)
337 # NOTE: seven is number of chars in ' 100.0%'
338 if i == 0:
339 formatted_title += f'({ticker},'
340 elif i < len(tickers)-1: 340 ↛ 341line 340 didn't jump to line 341, because the condition on line 340 was never true
341 formatted_title += f'{ticker},'
342 else:
343 formatted_title += f'{ticker}) correlation matrix'
345 formatted_subtitle += '\n'
347 whole_thing = formatted_subtitle + entire_formatted_result
349 if display: 349 ↛ 354line 349 didn't jump to line 354, because the condition on line 349 was never false
350 title_line(formatted_title)
351 print(f'\n{whole_thing}')
352 return
354 return whole_thing
357class Logger():
359 def __init__(self, location, log_level="info"):
360 self.location = location
361 self.log_level = log_level
363 # LOGGING FUNCTIONS
364 def comment(self, msg, method, level="INFO"):
365 now = datetime.datetime.now()
366 dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
367 print(dt_string, ' : ', level, ':',
368 f'{self.location}.{method}', ' : ', msg)
370 def error(self, msg, method):
371 self.comment(msg, method, 'ERROR',)
373 def info(self, msg, method):
374 if self.log_level in [constants.constants['LOG_LEVEL']['INFO'], 374 ↛ 377line 374 didn't jump to line 377, because the condition on line 374 was never true
375 constants.constants['LOG_LEVEL']['DEBUG'],
376 constants.constants['LOG_LEVEL']['VERBOSE']]:
377 self.comment(msg, method, 'INFO')
379 def debug(self, msg, method):
380 if self.log_level in [constants.constants['LOG_LEVEL']['DEBUG'], 380 ↛ 382line 380 didn't jump to line 382, because the condition on line 380 was never true
381 constants.constants['LOG_LEVEL']['VERBOSE']]:
382 self.comment(msg, method, 'DEBUG')
384 def verbose(self, msg, method):
385 if self.log_level == constants.constants['LOG_LEVEL']['VERBOSE']: 385 ↛ 386line 385 didn't jump to line 386, because the condition on line 385 was never true
386 self.comment(msg, method, 'VERBOSE')