Coverage for src/scrilla/util/outputter.py: 63%

197 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-07-18 18:14 +0000

1import datetime 

2from typing import Dict, List, Union 

3 

4from scrilla.static import constants, formats, definitions 

5 

6 

7# GENERAL OUTPUT FUNCTIONS 

8def space(n: int): 

9 print('\n'*n) 

10 

11 

12def title_line(title: str, line_length: int = formats.formats['LINE_LENGTH'], separator: str = formats.formats['separator'], 

13 display: bool = True): 

14 buff = int((line_length - len(title))/2) 

15 result = separator*buff + title + separator*buff 

16 if display: 16 ↛ 18line 16 didn't jump to line 18, because the condition on line 16 was never false

17 print(result) 

18 return result 

19 

20 

21def separator_line(line_length=formats.formats['LINE_LENGTH'], separator=formats.formats['separator'], display=True) -> str: 

22 if display: 22 ↛ 24line 22 didn't jump to line 24, because the condition on line 22 was never false

23 print(separator*line_length) 

24 return separator*line_length 

25 

26 

27def break_lines(msg: str, line_length: int = formats.formats['LINE_LENGTH']) -> List[str]: 

28 """ 

29 Generates a list of strings where each string is less than or equal to a specified line length. All elements in the array will be equal length except the last element; in other words, the inputted string will be read into an array, line by line, until there is nothing left to be read. 

30 

31 Parameters 

32 ---------- 

33 1. ``msg``: ``str`` 

34 String to be broken into an array with each string no longer than `line_length`. 

35 2. **line_length**: ``int`` 

36 *Optional*. Defaults to the value set by `static.formats.formats['LINE_LENGTH]`. The maximum length of a line.  

37 

38 Returns 

39 ------- 

40 `list[str]` 

41 A list of `line`'s with `len(line)<line_length or len(line)==line_length`. 

42 """ 

43 

44 if len(msg) > line_length: 

45 return [msg[i:i+line_length] for i in range(0, len(msg), line_length)] 

46 return [msg] 

47 

48 

49def center(this_line: str, line_length: int = formats.formats['LINE_LENGTH'], display: bool = True) -> str: 

50 """ 

51 Centers an inputted line relative to the line length.  

52 

53 Parameters 

54 ---------- 

55 1. **this_line**: ``str`` 

56 the string to centered. 

57 2. **line_length**: ``int`` 

58 *Optional*. Defaults to the value set by `static.formats.formats['LINE_LENGTH]`. The length of the line that is being centered over.  

59 3. **display**: ``bool`` 

60 *Optional*. Defaults to `True`. If set to `True`, function will print result to screen. If set to `False`, the function will return the value as a string. 

61 

62 Returns 

63 ------- 

64 `str` 

65 The line with the difference between `line_length` and `this_line` distributed evenly on either side of the content as white space.  

66 

67 Raises 

68 ------ 

69 1. **ValueError** 

70 If the inputted line is larged than the line length, this error will be thrown. 

71 """ 

72 

73 if len(this_line) > line_length: 73 ↛ 74line 73 didn't jump to line 74, because the condition on line 73 was never true

74 raise ValueError( 

75 'Line to be centered is larger than length being centered over.') 

76 

77 buff = int((line_length - len(this_line))/2) 

78 output = ' '*buff + this_line + ' '*buff 

79 if display: 79 ↛ 82line 79 didn't jump to line 82, because the condition on line 79 was never false

80 print(output) 

81 return 

82 return output 

83 

84 

85def print_list(list_to_print, indent=formats.formats['INDENT']): 

86 for i, item in enumerate(list_to_print): 

87 print(' '*indent, f'{i}. {item}') 

88 

89 

90def string_result(operation, result, indent=formats.formats['INDENT'], display=True): 

91 output = ' '*indent + operation + ' = ' + result 

92 if display: 92 ↛ 95line 92 didn't jump to line 95, because the condition on line 92 was never false

93 print(output) 

94 return 

95 return output 

96 

97 

98def scalar_result(calculation, result, currency=True, indent=formats.formats['INDENT']): 

99 if currency: 

100 print(' '*indent, calculation, ' = $', round(float(result), 2)) 

101 else: 

102 print(' '*indent, calculation, ' = ', round(float(result), 4)) 

103 

104 

105def percent_result(calculation, result, indent=formats.formats['INDENT']): 

106 print(' '*indent, calculation, ' = ', round(float(result), 4), '%') 

107 

108 

109def equivalent_result(right_hand, left_hand, value, indent=formats.formats['INDENT']): 

110 print(' '*indent, f'{right_hand} = {left_hand} = {value}') 

111 

112 

113def help_msg(indent: int = formats.formats['INDENT'], function_filter: Union[List[str], None] = None): 

114 func_dict, arg_dict = definitions.FUNC_DICT, definitions.ARG_DICT 

115 

116 title_line('scrilla') 

117 space(1) 

118 

119 for paragraph in definitions.HELP_MSG: 

120 for line in break_lines(paragraph): 

121 center(line) 

122 space(1) 

123 

124 title_line('SYNTAX') 

125 center(definitions.SYNTAX) 

126 space(1) 

127 

128 for func_name in func_dict: 

129 if function_filter is None or len(function_filter) == 0 or \ 

130 any(filt in func_dict[func_name]['values'] for filt in function_filter): 

131 

132 title_line(func_dict[func_name]['name']) 

133 for line in break_lines(func_dict[func_name]['description']): 

134 center(line) 

135 separator_line() 

136 

137 commands = func_dict[func_name]['values'] 

138 print(' ', f'COMMAND: {commands[0]}, {commands[1]}') 

139 

140 if func_dict[func_name]['args'] is not None: 

141 for arg_name in func_dict[func_name]['args']: 

142 aliases = arg_dict[arg_name]['values'] 

143 

144 print( 

145 ' '*indent, f'OPTION: {aliases[0]}, {aliases[1]}, {aliases[2]}, {aliases[3]}') 

146 

147 if arg_dict[arg_name]['required']: 

148 print(' '*2*indent, 'REQUIRED') 

149 

150 print(' '*2*indent, f'NAME: {arg_dict[arg_name]["name"]}') 

151 

152 if arg_dict[arg_name]['default'] is not None: 

153 print(' '*2*indent, 

154 f'DEFAULT: {arg_dict[arg_name]["default"]}') 

155 

156 if arg_dict[arg_name]['syntax'] is not None: 

157 print(' '*2*indent, 

158 f'FORMAT: {arg_dict[arg_name]["syntax"]}') 

159 separator_line() 

160 

161# ANALYSIS SPECIFIC OUTPUT FUNCTIONS 

162 

163 

164def portfolio_percent_result(result, tickers, indent=formats.formats['INDENT']): 

165 for i, item in enumerate(tickers): 

166 print(' '*indent, f'{item} =', round(100*result[i], 2), '%') 

167 

168 

169def portfolio_shares_result(result, tickers, indent=formats.formats['INDENT']): 

170 for i, item in enumerate(tickers): 

171 print(' '*indent, f'{item} =', result[i]) 

172 

173 

174def spot_price(ticker, this_spot_price): 

175 formatted_price = round(float(this_spot_price), 2) 

176 scalar_result(f'{ticker} spot price', formatted_price) 

177 

178 

179def model_price(ticker: str, this_model_price: Union[str, float], model: str) -> None: 

180 formatted_price = round(float(this_model_price), 2) 

181 scalar_result(f'{ticker} {str(model).upper()} price', formatted_price) 

182 

183 

184def risk_profile(profiles: Dict[str, Dict[str, float]]) -> None: 

185 for key, value in profiles.items(): 

186 title_line(f'{key} Risk Profile') 

187 for subkey, subvalue in value.items(): 

188 scalar_result(f'{subkey}', f'{subvalue}', currency=False) 

189 

190 

191def moving_average_result(ticker: str, averages: Dict[str, Dict[str, float]]) -> None: 

192 """ 

193 Prints the results of `scrilla.analysis.models.geometric.statistics.calculate_moving_averages` or `scrilla.analysis.models.reversion.statistics.calculate_moving_averages` to *stdout*. 

194 

195 Parameters 

196 ---------- 

197 1. **averages**: ``Dict[str, Dict[str,float]]`` 

198 The dictionary returned from a call to `scrilla.analysis.models.geometric.statistics.calculate_moving_averages` or `scrilla.analysis.models.reversion.statistics.calculate_moving_averages`. 

199 """ 

200 title_line(f'{ticker} Moving Averages') 

201 for this_date, average_dict in averages.items(): 

202 center(this_date) 

203 for avg_key, average in average_dict.items(): 

204 scalar_result(calculation=avg_key, result=average, currency=False) 

205 

206 

207def screen_results(info, model): 

208 for ticker in info: 

209 title_line(f'{ticker} {str(model).upper()} Model vs. Spot Price ') 

210 spot_price(ticker=ticker, this_spot_price=info[ticker]['spot_price']) 

211 model_price(ticker=ticker, 

212 this_model_price=info[ticker]['model_price'], model=model) 

213 scalar_result(f'{ticker} discount', info[ticker]['discount']) 

214 separator_line() 

215 

216# TODO: can probably combine optimal_result and efficient_frontier into a single function 

217# by wrapping the optimal_results in an array so when it iterates through frontier 

218# in efficient_frontier, it will only pick up the single allocation array for the 

219# optimal result. 

220 

221 

222def optimal_result(portfolio, allocation, investment=None, latest_prices=None): 

223 title_line('Optimal Percentage Allocation') 

224 portfolio_percent_result(allocation, portfolio.tickers) 

225 

226 if investment is not None: 

227 shares = portfolio.calculate_approximate_shares( 

228 allocation, investment, latest_prices) 

229 total = portfolio.calculate_actual_total( 

230 allocation, investment, latest_prices) 

231 

232 title_line('Optimal Share Allocation') 

233 portfolio_shares_result(shares, portfolio.tickers) 

234 title_line('Optimal Portfolio Value') 

235 scalar_result('Total', round(total, 2)) 

236 

237 title_line('Risk-Return Profile') 

238 scalar_result(calculation='Return', result=portfolio.return_function( 

239 allocation), currency=False) 

240 scalar_result(calculation='Volatility', result=portfolio.volatility_function( 

241 allocation), currency=False) 

242 

243 

244def efficient_frontier(portfolio, frontier, investment=None, latest_prices=None): 

245 title_line('(Annual Return %, Annual Volatility %) Portfolio') 

246 

247 # TODO: edit title to include dates 

248 

249 for allocation in frontier: 

250 separator_line() 

251 return_string = str( 

252 round(round(portfolio.return_function(allocation), 4)*100, 2)) 

253 vol_string = str( 

254 round(round(portfolio.volatility_function(allocation), 4)*100, 2)) 

255 title_line(f'({return_string} %, {vol_string}%) Portfolio') 

256 separator_line() 

257 

258 title_line('Optimal Percentage Allocation') 

259 portfolio_percent_result(allocation, portfolio.tickers) 

260 

261 if investment is not None: 

262 shares = portfolio.calculate_approximate_shares( 

263 allocation, investment, latest_prices) 

264 total = portfolio.calculate_actual_total( 

265 allocation, investment, latest_prices) 

266 

267 title_line('Optimal Share Allocation') 

268 portfolio_shares_result(shares, portfolio.tickers) 

269 title_line('Optimal Portfolio Value') 

270 scalar_result('Total', round(total, 2)) 

271 

272 title_line('Risk-Return Profile') 

273 scalar_result('Return', portfolio.return_function( 

274 allocation), currency=False) 

275 scalar_result('Volatility', portfolio.volatility_function( 

276 allocation), currency=False) 

277 print('\n') 

278 

279 

280def correlation_matrix(tickers: List[str], correl_matrix: List[List[float]], display: bool = True): 

281 """ 

282 Parameters 

283 ---------- 

284 1. **tickers**: ``list`` 

285 Array of tickers for which the correlation matrix was calculated and formatted. 

286 2. **indent**: ``int`` 

287 Amount of indent on each new line of the correlation matrix. 

288 3. **start_date**: ``datetime.date``  

289 Start date of the time period over which correlation was calculated.  

290 4. **end_date**: ``datetime.date``  

291 End date of the time period over which correlation was calculated.  

292 

293 Returns 

294 ------ 

295 A correlation matrix string formatted with new lines and spaces. 

296 """ 

297 entire_formatted_result, formatted_subtitle, formatted_title = "", "", "" 

298 

299 line_length, first_symbol_length = 0, 0 

300 new_line = "" 

301 no_symbols = len(tickers) 

302 

303 for i in range(no_symbols): 

304 this_symbol = tickers[i] 

305 symbol_string = ' '*formats.formats['INDENT'] + f'{this_symbol} ' 

306 

307 if i != 0: 

308 this_line = symbol_string + ' ' * \ 

309 (line_length - len(symbol_string) - 7*(no_symbols - i)) 

310 # NOTE: seven is number of chars in ' 100.0%' 

311 else: 

312 this_line = symbol_string 

313 first_symbol_length = len(this_symbol) 

314 

315 new_line = this_line 

316 

317 for j in range(i, no_symbols): 

318 if j == i: 

319 new_line += " 100.0%" 

320 

321 else: 

322 result = correl_matrix[i][j] 

323 formatted_result = str( 

324 100*result)[:(constants.constants['SIG_FIGS']+1)] 

325 new_line += f' {formatted_result}%' 

326 

327 entire_formatted_result += new_line + '\n' 

328 

329 if i == 0: 

330 line_length = len(new_line) 

331 

332 formatted_subtitle += ' ' * \ 

333 (formats.formats['INDENT'] + first_symbol_length+1) 

334 for i, ticker in enumerate(tickers): 

335 sym_len = len(ticker) 

336 formatted_subtitle += f' {ticker}' + ' '*(7-sym_len) 

337 # NOTE: seven is number of chars in ' 100.0%' 

338 if i == 0: 

339 formatted_title += f'({ticker},' 

340 elif i < len(tickers)-1: 340 ↛ 341line 340 didn't jump to line 341, because the condition on line 340 was never true

341 formatted_title += f'{ticker},' 

342 else: 

343 formatted_title += f'{ticker}) correlation matrix' 

344 

345 formatted_subtitle += '\n' 

346 

347 whole_thing = formatted_subtitle + entire_formatted_result 

348 

349 if display: 349 ↛ 354line 349 didn't jump to line 354, because the condition on line 349 was never false

350 title_line(formatted_title) 

351 print(f'\n{whole_thing}') 

352 return 

353 

354 return whole_thing 

355 

356 

357class Logger(): 

358 

359 def __init__(self, location, log_level="info"): 

360 self.location = location 

361 self.log_level = log_level 

362 

363 # LOGGING FUNCTIONS 

364 def comment(self, msg, method, level="INFO"): 

365 now = datetime.datetime.now() 

366 dt_string = now.strftime("%d/%m/%Y %H:%M:%S") 

367 print(dt_string, ' : ', level, ':', 

368 f'{self.location}.{method}', ' : ', msg) 

369 

370 def error(self, msg, method): 

371 self.comment(msg, method, 'ERROR',) 

372 

373 def info(self, msg, method): 

374 if self.log_level in [constants.constants['LOG_LEVEL']['INFO'], 374 ↛ 377line 374 didn't jump to line 377, because the condition on line 374 was never true

375 constants.constants['LOG_LEVEL']['DEBUG'], 

376 constants.constants['LOG_LEVEL']['VERBOSE']]: 

377 self.comment(msg, method, 'INFO') 

378 

379 def debug(self, msg, method): 

380 if self.log_level in [constants.constants['LOG_LEVEL']['DEBUG'], 380 ↛ 382line 380 didn't jump to line 382, because the condition on line 380 was never true

381 constants.constants['LOG_LEVEL']['VERBOSE']]: 

382 self.comment(msg, method, 'DEBUG') 

383 

384 def verbose(self, msg, method): 

385 if self.log_level == constants.constants['LOG_LEVEL']['VERBOSE']: 385 ↛ 386line 385 didn't jump to line 386, because the condition on line 385 was never true

386 self.comment(msg, method, 'VERBOSE')