Coverage for src/scrilla/cloud/aws.py: 53%

119 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-07-18 18:14 +0000

1from typing import List, Union 

2import boto3 

3from botocore.exceptions import ClientError, ParamValidationError 

4from datetime import date 

5from scrilla import settings 

6from scrilla.util import outputter, dater 

# Logger shared by every helper in this module; its level comes from the
# project settings.
logger = outputter.Logger("scrilla.cloud.aws", settings.LOG_LEVEL)

# Chunk size used below when a list of statements has to be split across
# several DynamoDB batch / transaction requests.
DYNAMO_STATEMENT_LIMIT = 25

10 

11 

def dynamo_client():
    """Build a low-level boto3 client for the DynamoDB service.

    A fresh client is created on every call. No explicit configuration is
    passed, so boto3's own defaults apply.

    :return: the object returned by ``boto3.client('dynamodb')``.
    """
    client = boto3.client('dynamodb')
    return client

14 

15 

def dynamo_resource():
    """Build a boto3 service resource for DynamoDB.

    A fresh resource is created on every call. No explicit configuration is
    passed, so boto3's own defaults apply.

    :return: the object returned by ``boto3.resource('dynamodb')``.
    """
    resource = boto3.resource('dynamodb')
    return resource

18 

19 

def dynamo_json_to_params(document: dict) -> Union[list, None]:
    """Convert a flat dictionary into a list of DynamoDB attribute values.

    Only the dictionary's *values* are used, in iteration order, so the
    result lines up positionally with the ``?`` placeholders of a statement.

    Supported values and the attribute value produced for each:

    - ``bool`` -> ``{'BOOL': <bool>}``
    - ``str`` -> ``{'S': <str>}``
    - ``int`` / ``float`` -> ``{'N': '<number as string>'}``
    - ``list`` of ``str`` -> ``{'SS': [...]}``
    - ``list`` of numbers -> ``{'NS': ['<number as string>', ...]}``
    - ``date`` -> ``{'S': dater.to_string(value)}``
    - ``None`` -> ``{'NULL': True}``

    Values of any other type, and lists mixing strings with numbers, are
    skipped without an entry (unchanged from the previous behaviour).

    :param document: mapping whose values are to be converted.
    :return: list of attribute-value dictionaries, or ``None`` when
        ``document`` is ``None`` or empty.
    """
    if not document:
        return None

    dynamo_json = []
    for entry in document.values():
        # NOTE: bool is tested before int/float because bool is a subclass
        # of int in Python.
        if isinstance(entry, bool):
            # boto3 requires a native boolean here; a string such as 'True'
            # is rejected by botocore's parameter validation.
            dynamo_json.append({'BOOL': entry})
        elif isinstance(entry, str):
            dynamo_json.append({'S': entry})
        elif isinstance(entry, (int, float)):
            dynamo_json.append({'N': str(entry)})
        elif isinstance(entry, list):
            # The two checks are mutually exclusive so that exactly one
            # parameter is emitted per list (an empty list satisfies both
            # predicates and previously produced two entries).
            if all(isinstance(el, str) for el in entry):
                dynamo_json.append({'SS': entry})
            elif all(isinstance(el, (int, float)) for el in entry):
                dynamo_json.append({'NS': [str(el) for el in entry]})
        elif isinstance(entry, date):
            dynamo_json.append({'S': dater.to_string(entry)})
        elif entry is None:
            # NULL is also a native boolean in the boto3 API.
            dynamo_json.append({'NULL': True})
    return dynamo_json

44 

45 

def dynamo_params_to_json(document: dict) -> Union[list, None]:
    """Convert a DynamoDB response containing ``Items`` into plain dictionaries.

    Each item maps attribute names to single-entry attribute-value
    dictionaries (``{'N': '1.5'}``, ``{'S': 'abc'}``, ...). The type tag is
    decoded as follows:

    - ``N`` -> ``float``
    - ``S`` / ``SS`` -> value returned unchanged
    - ``BOOL`` -> ``bool``
    - ``NS`` -> list of ``float``
    - ``NULL`` -> ``None``

    Attributes carrying any other type tag are left out of the result.

    :param document: response dictionary.
    :return: one dictionary per item, or ``None`` when the response has no
        ``Items`` key (including responses that carry ``Responses``).
    """
    if 'Items' not in document:
        # TODO: 'Responses' payloads are not parsed yet; error handling?
        return None

    json_list = []
    for doc in document['Items']:
        json_dict = {}
        for entry_key, entry_value in doc.items():
            # An attribute value holds exactly one (type tag, payload) pair.
            type_key, type_value = next(iter(entry_value.items()))
            if type_key == 'N':
                json_dict[entry_key] = float(type_value)
            elif type_key in ('S', 'SS'):
                json_dict[entry_key] = type_value
            elif type_key == 'BOOL':
                # boto3 delivers a native bool; the textual form is still
                # accepted for payloads built by older code.
                if isinstance(type_value, str):
                    json_dict[entry_key] = type_value.lower() == 'true'
                else:
                    json_dict[entry_key] = bool(type_value)
            elif type_key == 'NS':
                json_dict[entry_key] = [float(el) for el in type_value]
            elif type_key == 'NULL':
                json_dict[entry_key] = None
        json_list.append(json_dict)
    return json_list

71 

72 

def dynamo_table_conf(table_configuration) -> dict:
    """Overlay the project-wide DynamoDB settings onto a table definition.

    The argument is modified in place through its ``update`` method, so keys
    present in ``settings.DYNAMO_CONF`` replace keys of the same name. The
    same object is then handed back to the caller.

    :param table_configuration: table definition to extend.
    :return: ``table_configuration`` itself, after the update.
    """
    shared_conf = settings.DYNAMO_CONF
    table_configuration.update(shared_conf)
    return table_configuration

76 

77 

def dynamo_statement_args(statement: str, params=None) -> dict:
    """Assemble the keyword arguments describing one statement.

    :param statement: statement text, stored under ``'Statement'``.
    :param params: optional dictionary of parameter values. When it is not
        ``None`` it is converted with ``dynamo_json_to_params`` and stored
        under ``'Parameters'``.
    :return: dictionary with ``'Statement'`` and, if parameters were given,
        ``'Parameters'``.
    """
    arguments = {'Statement': statement}
    if params is not None:
        arguments['Parameters'] = dynamo_json_to_params(params)
    return arguments

87 

88 

def dynamo_table(table_configuration: dict):
    """Create a DynamoDB table from the given configuration.

    Errors are never raised to the caller: the exception object is returned
    instead of the service response.

    :param table_configuration: keyword arguments forwarded to the client's
        ``create_table`` call; must contain ``'TableName'``.
    :return: the ``create_table`` response on success, otherwise the caught
        ``ClientError``, ``ParamValidationError`` or ``KeyError``.
    """
    try:
        logger.debug(
            f'Provisioning DynamoDB {table_configuration["TableName"]} table', 'dynamo_table')
        return dynamo_client().create_table(**table_configuration)
    except ClientError as e:
        message = e.response['Error']['Message']
        # A table that already exists or is still being created is not
        # logged as an error; the exception is still returned.
        already_provisioned = (
            'Table already exists' in message
            or 'Table is being created' in message
        )
        if not already_provisioned:
            logger.error(e, 'dynamo_table')
            logger.verbose(f'\n\t\t{table_configuration}', 'dynamo_table')
        return e
    except (ParamValidationError, KeyError) as e:
        # Both failures are logged under the same function label as the rest
        # of this function (the label was previously misspelled
        # 'dynamo_Table' for validation errors).
        logger.error(e, 'dynamo_table')
        return e

108 

109 

def dynamo_transaction(transaction, formatter=None):
    """Run a statement through the client's ``execute_transaction`` call.

    :param transaction: statement text.
    :param formatter: either a single parameter dictionary (or ``None``), or
        a list of parameter dictionaries. A list yields one statement per
        element.
    :return:
        - for a non-list ``formatter``: the parsed response of a
          one-statement transaction;
        - for a list of at most ``DYNAMO_STATEMENT_LIMIT`` entries: the
          parsed response of a single transaction;
        - for a longer list: a list with one parsed response per chunk of
          ``DYNAMO_STATEMENT_LIMIT`` statements;
        - ``None`` when a ``ClientError`` or ``ParamValidationError`` was
          caught (the error is logged, not returned).
    """
    try:
        if not isinstance(formatter, list):
            response = dynamo_client().execute_transaction(
                TransactStatements=[dynamo_statement_args(transaction, formatter)])
            return dynamo_params_to_json(response)

        statements = [
            dynamo_statement_args(transaction, params) for params in formatter
        ]
        total = len(statements)
        if total <= DYNAMO_STATEMENT_LIMIT:
            response = dynamo_client().execute_transaction(
                TransactStatements=statements)
            return dynamo_params_to_json(response)

        # Too many statements for one request: submit them chunk by chunk.
        results = []
        for start in range(0, total, DYNAMO_STATEMENT_LIMIT):
            chunk = statements[start:start + DYNAMO_STATEMENT_LIMIT]
            response = dynamo_client().execute_transaction(
                TransactStatements=chunk)
            results.append(dynamo_params_to_json(response))
        return results
    except (ClientError, ParamValidationError) as e:
        logger.error(e, 'dynamo_transaction')
        logger.debug(f'\n\t\t{transaction}', 'dynamo_transaction')

136 

137 

def dynamo_statement(query, formatter=None):
    """Execute a statement, batching when several parameter sets are given.

    :param query: statement text.
    :param formatter: either a single parameter dictionary (or ``None``), or
        a list of parameter dictionaries. A list yields one statement per
        element and is sent through ``batch_execute_statement``.
    :return:
        - for a non-list ``formatter``: the parsed response of
          ``execute_statement``;
        - for a list of at most ``DYNAMO_STATEMENT_LIMIT`` entries: the
          parsed response of a single batch call;
        - for a longer list: a list with one parsed response per chunk of
          ``DYNAMO_STATEMENT_LIMIT`` statements;
        - the caught ``ClientError`` or ``ParamValidationError`` object when
          a call fails (after logging it).
    """
    try:
        if not isinstance(formatter, list):
            response = dynamo_client().execute_statement(
                **dynamo_statement_args(query, formatter))
            return dynamo_params_to_json(response)

        statements = [
            dynamo_statement_args(query, params) for params in formatter
        ]
        total = len(statements)
        if total <= DYNAMO_STATEMENT_LIMIT:
            response = dynamo_client().batch_execute_statement(
                Statements=statements)
            return dynamo_params_to_json(response)

        # Too many statements for one request: submit them chunk by chunk.
        results = []
        for start in range(0, total, DYNAMO_STATEMENT_LIMIT):
            chunk = statements[start:start + DYNAMO_STATEMENT_LIMIT]
            response = dynamo_client().batch_execute_statement(
                Statements=chunk)
            results.append(dynamo_params_to_json(response))
        return results
    except (ClientError, ParamValidationError) as e:
        logger.error(e, 'dynamo_statement')
        logger.debug(f'\n\t\t{query}', 'dynamo_statement')
        return e

166 

167 

def dynamo_drop_table(tables: Union[str, List[str]]) -> bool:
    """Delete one table or a list of tables.

    :param tables: a table name, or a list of table names.
    :return: ``True`` when every delete call completed, ``False`` when a
        ``ClientError`` or ``ParamValidationError`` was caught (the error is
        logged and any remaining tables are left untouched).
    :raises ValueError: if ``tables`` is neither a ``str`` nor a ``list``.
    """
    if not isinstance(tables, (list, str)):
        raise ValueError('`table` must be instance of `str` or `list`!')

    # Treat a single name as a one-element list so both cases share a loop.
    names = tables if isinstance(tables, list) else [tables]
    try:
        for name in names:
            dynamo_resource().Table(name).delete()
    except (ClientError, ParamValidationError) as e:
        logger.error(e, 'dynamo_drop_table')
        return False
    return True