| 1 | import json |
| 2 | import sqlite3 |
| 3 | from typing import Dict, List, Optional |
| 4 | |
| 1 | 5 | class RestAPI: |
| 2 | 6 | def __init__(self, database: dict | None = None): |
| 3 | | pass |
| 4 | | |
| 7 | # Create an in-memory SQLite database |
| 8 | self.conn = sqlite3.connect(':memory:', check_same_thread=False) |
| 9 | self.conn.row_factory = sqlite3.Row # Enable column access by name |
| 10 | self.create_tables() |
| 11 | |
| 12 | # If initial database provided, populate it |
| 13 | if database and 'users' in database: |
| 14 | self.populate_initial_data(database['users']) |
| 15 | |
| 16 | def create_tables(self): |
| 17 | # Create users table |
| 18 | self.conn.execute(''' |
| 19 | CREATE TABLE IF NOT EXISTS users ( |
| 20 | id INTEGER PRIMARY KEY AUTOINCREMENT, |
| 21 | name TEXT UNIQUE NOT NULL |
| 22 | ) |
| 23 | ''') |
| 24 | |
| 25 | # Create ious table |
| 26 | self.conn.execute(''' |
| 27 | CREATE TABLE IF NOT EXISTS ious ( |
| 28 | id INTEGER PRIMARY KEY AUTOINCREMENT, |
| 29 | lender_id INTEGER NOT NULL, |
| 30 | borrower_id INTEGER NOT NULL, |
| 31 | amount REAL NOT NULL, |
| 32 | FOREIGN KEY (lender_id) REFERENCES users (id), |
| 33 | FOREIGN KEY (borrower_id) REFERENCES users (id) |
| 34 | ) |
| 35 | ''') |
| 36 | |
| 37 | self.conn.commit() |
| 38 | |
| 39 | def populate_initial_data(self, users: List[Dict]): |
| 40 | for user_data in users: |
| 41 | # Add user |
| 42 | self.conn.execute('INSERT OR IGNORE INTO users (name) VALUES (?)', (user_data['name'],)) |
| 43 | |
| 44 | # Add owed amounts (this user is owed by others) |
| 45 | if 'owed_by' in user_data: |
| 46 | lender_id = self.get_user_id(user_data['name']) |
| 47 | for borrower_name, amount in user_data['owed_by'].items(): |
| 48 | # Ensure borrower exists |
| 49 | self.conn.execute('INSERT OR IGNORE INTO users (name) VALUES (?)', (borrower_name,)) |
| 50 | borrower_id = self.get_user_id(borrower_name) |
| 51 | # Add IOU record |
| 52 | self.conn.execute( |
| 53 | 'INSERT INTO ious (lender_id, borrower_id, amount) VALUES (?, ?, ?)', |
| 54 | (lender_id, borrower_id, amount) |
| 55 | ) |
| 56 | self.conn.commit() |
| 57 | |
| 58 | def get_user_id(self, name: str) -> Optional[int]: |
| 59 | cursor = self.conn.execute('SELECT id FROM users WHERE name = ?', (name,)) |
| 60 | row = cursor.fetchone() |
| 61 | return row['id'] if row else None |
| 62 | |
| 63 | def get_user_name(self, user_id: int) -> Optional[str]: |
| 64 | cursor = self.conn.execute('SELECT name FROM users WHERE id = ?', (user_id,)) |
| 65 | row = cursor.fetchone() |
| 66 | return row['name'] if row else None |
| 67 | |
| 68 | def get_user_object(self, name: str) -> Optional[Dict]: |
| 69 | user_id = self.get_user_id(name) |
| 70 | if not user_id: |
| 71 | return None |
| 72 | |
| 73 | # Calculate amounts owed by this user to others |
| 74 | owes_cursor = self.conn.execute(''' |
| 75 | SELECT u.name, SUM(i.amount) as amount |
| 76 | FROM ious i |
| 77 | JOIN users u ON u.id = i.lender_id |
| 78 | WHERE i.borrower_id = ? |
| 79 | GROUP BY u.id |
| 80 | ''', (user_id,)) |
| 81 | |
| 82 | owes = {} |
| 83 | total_owes = 0 |
| 84 | for row in owes_cursor: |
| 85 | owes[row['name']] = row['amount'] |
| 86 | total_owes += row['amount'] |
| 87 | |
| 88 | # Calculate amounts owed to this user by others |
| 89 | owed_by_cursor = self.conn.execute(''' |
| 90 | SELECT u.name, SUM(i.amount) as amount |
| 91 | FROM ious i |
| 92 | JOIN users u ON u.id = i.borrower_id |
| 93 | WHERE i.lender_id = ? |
| 94 | GROUP BY u.id |
| 95 | ''', (user_id,)) |
| 96 | |
| 97 | owed_by = {} |
| 98 | total_owed_by = 0 |
| 99 | for row in owed_by_cursor: |
| 100 | owed_by[row['name']] = row['amount'] |
| 101 | total_owed_by += row['amount'] |
| 102 | |
| 103 | balance = round(total_owed_by - total_owes, 2) |
| 104 | |
| 105 | return { |
| 106 | 'name': name, |
| 107 | 'owes': owes, |
| 108 | 'owed_by': owed_by, |
| 109 | 'balance': balance |
| 110 | } |
| 111 | |
| 5 | 112 | def get(self, url: str, payload: str | None = None) -> str: |
| 6 | | pass |
| 7 | | |
| 113 | if url == '/users': |
| 114 | if payload is None: |
| 115 | # Return all users |
| 116 | cursor = self.conn.execute('SELECT name FROM users ORDER BY name') |
| 117 | user_names = [row['name'] for row in cursor.fetchall()] |
| 118 | users = [self.get_user_object(name) for name in user_names] |
| 119 | return json.dumps({'users': users}) |
| 120 | else: |
| 121 | # Return specified users |
| 122 | payload_data = json.loads(payload) |
| 123 | if 'users' not in payload_data: |
| 124 | return json.dumps({'error': 'Invalid payload'}) |
| 125 | |
| 126 | # Edge Case: Empty users list in payload |
| 127 | if not payload_data['users']: |
| 128 | return json.dumps({'users': []}) |
| 129 | |
| 130 | # Edge Case: Non-list users in payload |
| 131 | if not isinstance(payload_data['users'], list): |
| 132 | return json.dumps({'error': 'Invalid users format'}) |
| 133 | |
| 134 | user_names = sorted(payload_data['users']) |
| 135 | users = [] |
| 136 | for name in user_names: |
| 137 | user_obj = self.get_user_object(name) |
| 138 | # Edge Case: Requested user does not exist |
| 139 | if user_obj is None: |
| 140 | continue |
| 141 | users.append(user_obj) |
| 142 | return json.dumps({'users': users}) |
| 143 | else: |
| 144 | # Edge Case: Invalid URL for GET request |
| 145 | return json.dumps({'error': 'Invalid URL'}) |
| 146 | |
| 8 | 147 | def post(self, url: str, payload: str | None = None) -> str: |
| 9 | | pass |
| 148 | if payload is None: |
| 149 | # Edge Case: Missing payload for POST request |
| 150 | return json.dumps({'error': 'Missing payload'}) |
| 151 | |
| 152 | try: |
| 153 | payload_data = json.loads(payload) |
| 154 | except json.JSONDecodeError: |
| 155 | # Edge Case: Invalid JSON in payload |
| 156 | return json.dumps({'error': 'Invalid JSON'}) |
| 157 | |
| 158 | if url == '/add': |
| 159 | if 'user' not in payload_data: |
| 160 | # Edge Case: Missing user field in add request |
| 161 | return json.dumps({'error': 'Missing user field'}) |
| 162 | |
| 163 | name = payload_data['user'] |
| 164 | |
| 165 | # Edge Case: User already exists |
| 166 | if self.get_user_id(name) is not None: |
| 167 | return json.dumps({'error': 'User already exists'}) |
| 168 | |
| 169 | try: |
| 170 | self.conn.execute('INSERT INTO users (name) VALUES (?)', (name,)) |
| 171 | self.conn.commit() |
| 172 | except sqlite3.Error: |
| 173 | # Edge Case: Database error during user creation |
| 174 | return json.dumps({'error': 'Database error'}) |
| 175 | |
| 176 | return json.dumps(self.get_user_object(name)) |
| 177 | |
| 178 | elif url == '/iou': |
| 179 | required_fields = ['lender', 'borrower', 'amount'] |
| 180 | for field in required_fields: |
| 181 | if field not in payload_data: |
| 182 | # Edge Case: Missing required field in IOU request |
| 183 | return json.dumps({'error': f'Missing {field} field'}) |
| 184 | |
| 185 | lender_name = payload_data['lender'] |
| 186 | borrower_name = payload_data['borrower'] |
| 187 | amount = payload_data['amount'] |
| 188 | |
| 189 | # Edge Case: Lender and borrower are the same person |
| 190 | if lender_name == borrower_name: |
| 191 | return json.dumps({'error': 'Lender and borrower cannot be the same'}) |
| 192 | |
| 193 | # Edge Case: Negative or zero amount |
| 194 | if amount <= 0: |
| 195 | return json.dumps({'error': 'Amount must be positive'}) |
| 196 | |
| 197 | # Edge Case: Lender does not exist |
| 198 | lender_id = self.get_user_id(lender_name) |
| 199 | if lender_id is None: |
| 200 | return json.dumps({'error': 'Lender does not exist'}) |
| 201 | |
| 202 | # Edge Case: Borrower does not exist |
| 203 | borrower_id = self.get_user_id(borrower_name) |
| 204 | if borrower_id is None: |
| 205 | return json.dumps({'error': 'Borrower does not exist'}) |
| 206 | |
| 207 | try: |
| 208 | self.conn.execute( |
| 209 | 'INSERT INTO ious (lender_id, borrower_id, amount) VALUES (?, ?, ?)', |
| 210 | (lender_id, borrower_id, amount) |
| 211 | ) |
| 212 | self.conn.commit() |
| 213 | except sqlite3.Error: |
| 214 | # Edge Case: Database error during IOU creation |
| 215 | return json.dumps({'error': 'Database error'}) |
| 216 | |
| 217 | # Return updated user objects |
| 218 | lender_obj = self.get_user_object(lender_name) |
| 219 | borrower_obj = self.get_user_object(borrower_name) |
| 220 | |
| 221 | # Edge Case: Could not retrieve updated user objects |
| 222 | if lender_obj is None or borrower_obj is None: |
| 223 | return json.dumps({'error': 'Could not retrieve updated user data'}) |
| 224 | |
| 225 | users = sorted([lender_obj, borrower_obj], key=lambda x: x['name']) |
| 226 | return json.dumps({'users': users}) |
| 227 | else: |
| 228 | # Edge Case: Invalid URL for POST request |
| 229 | return json.dumps({'error': 'Invalid URL'}) |
| 230 | |
| 231 | def __del__(self): |
| 232 | if hasattr(self, 'conn'): |
| 233 | self.conn.close() |
| 234 | |
| 235 | # Handled Edge Cases: Invalid URL for GET request, Empty users list in payload, Non-list users in payload, Requested user does not exist, Missing payload for POST request, Invalid JSON in payload, Missing user field in add request, User already exists, Database error during user creation, Missing required field in IOU request, Lender and borrower are the same person, Negative or zero amount, Lender does not exist, Borrower does not exist, Database error during IOU creation, Could not retrieve updated user objects, Invalid URL for POST request |