Coverage for src/scrilla/cloud/aws.py: 53%
119 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-18 18:14 +0000
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-18 18:14 +0000
1from typing import List, Union
2import boto3
3from botocore.exceptions import ClientError, ParamValidationError
4from datetime import date
5from scrilla import settings
6from scrilla.util import outputter, dater
7logger = outputter.Logger("scrilla.cloud.aws", settings.LOG_LEVEL)
9DYNAMO_STATEMENT_LIMIT = 25
12def dynamo_client():
13 return boto3.client('dynamodb')
16def dynamo_resource():
17 return boto3.resource('dynamodb')
20def dynamo_json_to_params(document: dict) -> list:
21 if document is None or len(document) == 0:
22 return None
24 dynamo_json = []
25 for entry in document.values():
26 if isinstance(entry, str):
27 dynamo_json.append({'S': str(entry)})
28 elif isinstance(entry, bool):
29 # NOTE: bool evaluation has to come before int/float evaluation
30 # because False/True also evaluate to ints in python
31 dynamo_json.append({'BOOL': str(entry)})
32 elif isinstance(entry, (int, float)):
33 dynamo_json.append({'N': str(entry)})
34 elif isinstance(entry, list):
35 if all(isinstance(el, str) for el in entry):
36 dynamo_json.append({'SS': entry})
37 if all(isinstance(el, (int, float)) for el in entry):
38 dynamo_json.append({'NS': [str(el) for el in entry]})
39 elif isinstance(entry, date):
40 dynamo_json.append({'S': dater.to_string(entry)})
41 elif entry is None: 41 ↛ 25line 41 didn't jump to line 25, because the condition on line 41 was never false
42 dynamo_json.append({'NULL': 'True'})
43 return dynamo_json
46def dynamo_params_to_json(document: dict) -> list:
47 if 'Items' in list(document.keys()): 47 ↛ 68line 47 didn't jump to line 68, because the condition on line 47 was never false
48 json_list = []
49 for doc in document['Items']:
50 json_dict = {}
51 for entry_key, entry_value in doc.items():
52 type_key = list(entry_value.keys())[0]
53 type_value = list(entry_value.values())[0]
54 if type_key == 'N':
55 json_dict[entry_key] = float(type_value)
56 elif type_key == 'S': 56 ↛ 58line 56 didn't jump to line 58, because the condition on line 56 was never false
57 json_dict[entry_key] = type_value
58 elif type_key == 'BOOL':
59 json_dict[entry_key] = type_value.lower() == 'true'
60 elif type_key == 'SS':
61 json_dict[entry_key] = type_value
62 elif type_key == 'NS':
63 json_dict[entry_key] = [float(el) for el in type_value]
64 elif type_key == 'NULL':
65 json_dict[entry_key] = None
66 json_list.append(json_dict)
67 return json_list
68 elif 'Responses' in list(document.keys()):
69 # TODO: Error handling?
70 pass
73def dynamo_table_conf(table_configuration) -> dict:
74 table_configuration.update(settings.DYNAMO_CONF)
75 return table_configuration
78def dynamo_statement_args(statement: str, params=None) -> dict:
79 if params is None:
80 return {
81 'Statement': statement
82 }
83 return {
84 'Statement': statement,
85 'Parameters': dynamo_json_to_params(params)
86 }
89def dynamo_table(table_configuration: dict):
90 try:
91 logger.debug(
92 f'Provisioning DynamoDB {table_configuration["TableName"]} table', 'dynamo_table')
93 return dynamo_client().create_table(**table_configuration)
94 except ClientError as e:
95 if not ( 95 ↛ 101line 95 didn't jump to line 101, because the condition on line 95 was never false
96 'Table already exists' in e.response['Error']['Message'] or
97 'Table is being created' in e.response['Error']['Message']
98 ):
99 logger.error(e, 'dynamo_table')
100 logger.verbose(f'\n\t\t{table_configuration}', 'dynamo_table')
101 return e
102 except ParamValidationError as e:
103 logger.error(e, 'dynamo_Table')
104 return e
105 except KeyError as e:
106 logger.error(e, 'dynamo_table')
107 return e
110def dynamo_transaction(transaction, formatter=None):
111 try:
112 if isinstance(formatter, list):
113 statements = [dynamo_statement_args(
114 transaction, params) for params in formatter]
115 if len(statements) > DYNAMO_STATEMENT_LIMIT:
116 loops = len(statements) // DYNAMO_STATEMENT_LIMIT + \
117 (1 if len(statements) % DYNAMO_STATEMENT_LIMIT != 0 else 0)
118 return [
119 dynamo_params_to_json(
120 dynamo_client().execute_transaction(
121 TransactStatements=statements[i*DYNAMO_STATEMENT_LIMIT:
122 (i+1)*DYNAMO_STATEMENT_LIMIT]
123 ))
124 for i in range(0, loops)
125 ]
126 return dynamo_params_to_json(
127 dynamo_client().execute_transaction(TransactStatements=statements
128 ))
129 return dynamo_params_to_json(
130 dynamo_client().execute_transaction(TransactStatements=[
131 dynamo_statement_args(transaction, formatter)]
132 ))
133 except (ClientError, ParamValidationError) as e:
134 logger.error(e, 'dynamo_transaction')
135 logger.debug(f'\n\t\t{transaction}', 'dynamo_transaction')
138def dynamo_statement(query, formatter=None):
139 try:
140 if isinstance(formatter, list):
141 statements = [dynamo_statement_args(
142 query, params) for params in formatter]
144 if len(statements) > DYNAMO_STATEMENT_LIMIT:
146 loops = len(statements) // DYNAMO_STATEMENT_LIMIT + \
147 (1 if len(statements) % DYNAMO_STATEMENT_LIMIT != 0 else 0)
149 return [
150 dynamo_params_to_json(
151 dynamo_client().batch_execute_statement(
152 Statements=statements[i*DYNAMO_STATEMENT_LIMIT:
153 (i+1)*DYNAMO_STATEMENT_LIMIT]
154 )) for i in range(0, loops)
155 ]
156 return dynamo_params_to_json(
157 dynamo_client().batch_execute_statement(Statements=statements)
158 )
159 return dynamo_params_to_json(
160 dynamo_client().execute_statement(**dynamo_statement_args(query, formatter)
161 ))
162 except (ClientError, ParamValidationError) as e:
163 logger.error(e, 'dynamo_statement')
164 logger.debug(f'\n\t\t{query}', 'dynamo_statement')
165 return e
168def dynamo_drop_table(tables: Union[str, List[str]]) -> bool:
169 try:
170 if isinstance(tables, list):
171 for tbl in tables:
172 db_table = dynamo_resource().Table(tbl)
173 db_table.delete()
174 return True
175 elif isinstance(tables, str):
176 db_table = dynamo_resource().Table(tables)
177 db_table.delete()
178 return True
179 else:
180 raise ValueError('`table` must be instance of `str` or `list`!')
181 except (ClientError, ParamValidationError) as e:
182 logger.error(e, 'dynamo_drop_table')
183 return False