| 1 | import json |
| 2 | |
| 1 | 3 | class RestAPI: |
| 2 | 4 | def __init__(self, database: dict | None = None): |
| 3 | | pass |
| 4 | | |
| 5 | """ |
| 6 | Initialize the REST API with a database. |
| 7 | |
| 8 | Args: |
| 9 | database: A dictionary containing user data. |
| 10 | """ |
| 11 | self.database = database if database else {"users": []} |
| 12 | |
| 5 | 13 | def get(self, url: str, payload: str | None = None) -> str: |
| 6 | | pass |
| 7 | | |
| 14 | """ |
| 15 | Handle GET requests to the API. |
| 16 | |
| 17 | Args: |
| 18 | url: The endpoint URL. |
| 19 | payload: Optional JSON payload. |
| 20 | |
| 21 | Returns: |
| 22 | A JSON string response. |
| 23 | """ |
| 24 | if url == "/users": |
| 25 | if payload is None: |
| 26 | # Return all users sorted by name |
| 27 | users = self._get_all_users() |
| 28 | return json.dumps({"users": users}) |
| 29 | else: |
| 30 | # Return specific users sorted by name |
| 31 | data = json.loads(payload) |
| 32 | user_names = data.get("users", []) |
| 33 | |
| 34 | # Edge Case: Empty user list in payload |
| 35 | if not user_names: |
| 36 | return json.dumps({"users": []}) |
| 37 | |
| 38 | users = self._get_users_by_names(user_names) |
| 39 | return json.dumps({"users": users}) |
| 40 | |
| 41 | # Edge Case: Invalid URL |
| 42 | return json.dumps({"error": "Invalid URL"}) |
| 43 | |
| 8 | 44 | def post(self, url: str, payload: str | None = None) -> str: |
| 9 | | pass |
| 45 | """ |
| 46 | Handle POST requests to the API. |
| 47 | |
| 48 | Args: |
| 49 | url: The endpoint URL. |
| 50 | payload: JSON payload. |
| 51 | |
| 52 | Returns: |
| 53 | A JSON string response. |
| 54 | """ |
| 55 | if payload is None: |
| 56 | # Edge Case: Missing payload |
| 57 | return json.dumps({"error": "Payload required"}) |
| 58 | |
| 59 | data = json.loads(payload) |
| 60 | |
| 61 | if url == "/add": |
| 62 | user_name = data.get("user") |
| 63 | |
| 64 | # Edge Case: Missing user name |
| 65 | if not user_name: |
| 66 | return json.dumps({"error": "User name required"}) |
| 67 | |
| 68 | # Edge Case: User already exists |
| 69 | if self._user_exists(user_name): |
| 70 | return json.dumps({"error": "User already exists"}) |
| 71 | |
| 72 | new_user = { |
| 73 | "name": user_name, |
| 74 | "owes": {}, |
| 75 | "owed_by": {}, |
| 76 | "balance": 0.0 |
| 77 | } |
| 78 | |
| 79 | self.database["users"].append(new_user) |
| 80 | return json.dumps(new_user) |
| 81 | |
| 82 | elif url == "/iou": |
| 83 | lender_name = data.get("lender") |
| 84 | borrower_name = data.get("borrower") |
| 85 | amount = data.get("amount") |
| 86 | |
| 87 | # Edge Case: Missing required fields |
| 88 | if not lender_name or not borrower_name or amount is None: |
| 89 | return json.dumps({"error": "Lender, borrower, and amount required"}) |
| 90 | |
| 91 | # Edge Case: Invalid amount type |
| 92 | if not isinstance(amount, (int, float)): |
| 93 | return json.dumps({"error": "Amount must be a number"}) |
| 94 | |
| 95 | # Edge Case: Zero amount |
| 96 | if amount == 0: |
| 97 | return json.dumps({"error": "Amount must be non-zero"}) |
| 98 | |
| 99 | # Edge Case: Lender or borrower does not exist |
| 100 | if not self._user_exists(lender_name) or not self._user_exists(borrower_name): |
| 101 | return json.dumps({"error": "Both users must exist"}) |
| 102 | |
| 103 | # Edge Case: Lender and borrower are the same |
| 104 | if lender_name == borrower_name: |
| 105 | return json.dumps({"error": "Lender and borrower cannot be the same"}) |
| 106 | |
| 107 | # Update IOUs |
| 108 | self._update_iou(lender_name, borrower_name, amount) |
| 109 | |
| 110 | # Return updated user objects |
| 111 | lender = self._get_user_by_name(lender_name) |
| 112 | borrower = self._get_user_by_name(borrower_name) |
| 113 | |
| 114 | users = sorted([lender, borrower], key=lambda x: x["name"]) |
| 115 | return json.dumps({"users": users}) |
| 116 | |
| 117 | # Edge Case: Invalid URL |
| 118 | return json.dumps({"error": "Invalid URL"}) |
| 119 | |
| 120 | def _get_all_users(self) -> list: |
| 121 | """ |
| 122 | Get all users from the database, sorted by name. |
| 123 | |
| 124 | Returns: |
| 125 | A list of user objects sorted by name. |
| 126 | """ |
| 127 | return sorted(self.database["users"], key=lambda x: x["name"]) |
| 128 | |
| 129 | def _get_users_by_names(self, names: list) -> list: |
| 130 | """ |
| 131 | Get specific users by their names. |
| 132 | |
| 133 | Args: |
| 134 | names: A list of user names. |
| 135 | |
| 136 | Returns: |
| 137 | A list of user objects sorted by name. |
| 138 | """ |
| 139 | users = [user for user in self.database["users"] if user["name"] in names] |
| 140 | return sorted(users, key=lambda x: x["name"]) |
| 141 | |
| 142 | def _user_exists(self, name: str) -> bool: |
| 143 | """ |
| 144 | Check if a user exists in the database. |
| 145 | |
| 146 | Args: |
| 147 | name: The name of the user. |
| 148 | |
| 149 | Returns: |
| 150 | True if the user exists, False otherwise. |
| 151 | """ |
| 152 | return any(user["name"] == name for user in self.database["users"]) |
| 153 | |
| 154 | def _get_user_by_name(self, name: str) -> dict | None: |
| 155 | """ |
| 156 | Get a user by their name. |
| 157 | |
| 158 | Args: |
| 159 | name: The name of the user. |
| 160 | |
| 161 | Returns: |
| 162 | The user object if found, None otherwise. |
| 163 | """ |
| 164 | for user in self.database["users"]: |
| 165 | if user["name"] == name: |
| 166 | return user |
| 167 | return None |
| 168 | |
| 169 | def _update_iou(self, lender_name: str, borrower_name: str, amount: float) -> None: |
| 170 | """ |
| 171 | Update the IOU records between lender and borrower. |
| 172 | |
| 173 | Args: |
| 174 | lender_name: The name of the lender. |
| 175 | borrower_name: The name of the borrower. |
| 176 | amount: The amount of the IOU. |
| 177 | """ |
| 178 | # Handle negative amounts by reversing roles |
| 179 | if amount < 0: |
| 180 | lender_name, borrower_name = borrower_name, lender_name |
| 181 | amount = -amount |
| 182 | |
| 183 | lender = self._get_user_by_name(lender_name) |
| 184 | borrower = self._get_user_by_name(borrower_name) |
| 185 | |
| 186 | # Check if borrower already owes money to lender (reverse debt) |
| 187 | if lender_name in borrower["owed_by"]: |
| 188 | existing_debt = borrower["owed_by"][lender_name] |
| 189 | |
| 190 | if amount >= existing_debt: |
| 191 | # Overpaying: clear existing debt and create reverse debt |
| 192 | del borrower["owed_by"][lender_name] |
| 193 | del lender["owes"][borrower_name] |
| 194 | |
| 195 | remaining_amount = amount - existing_debt |
| 196 | if remaining_amount > 0: |
| 197 | # Create new debt in opposite direction |
| 198 | if borrower_name in lender["owed_by"]: |
| 199 | lender["owed_by"][borrower_name] += remaining_amount |
| 200 | else: |
| 201 | lender["owed_by"][borrower_name] = remaining_amount |
| 202 | |
| 203 | if lender_name in borrower["owes"]: |
| 204 | borrower["owes"][lender_name] += remaining_amount |
| 205 | else: |
| 206 | borrower["owes"][lender_name] = remaining_amount |
| 207 | else: |
| 208 | # Partial payment: reduce existing debt |
| 209 | borrower["owed_by"][lender_name] -= amount |
| 210 | lender["owes"][borrower_name] -= amount |
| 211 | |
| 212 | # Clean up if debt reaches zero |
| 213 | if borrower["owed_by"][lender_name] == 0: |
| 214 | del borrower["owed_by"][lender_name] |
| 215 | del lender["owes"][borrower_name] |
| 216 | else: |
| 217 | # No existing reverse debt, create new debt |
| 218 | if borrower_name in lender["owed_by"]: |
| 219 | lender["owed_by"][borrower_name] += amount |
| 220 | else: |
| 221 | lender["owed_by"][borrower_name] = amount |
| 222 | |
| 223 | if lender_name in borrower["owes"]: |
| 224 | borrower["owes"][lender_name] += amount |
| 225 | else: |
| 226 | borrower["owes"][lender_name] = amount |
| 227 | |
| 228 | # Update balances |
| 229 | self._update_balance(lender) |
| 230 | self._update_balance(borrower) |
| 231 | |
| 232 | def _update_balance(self, user: dict) -> None: |
| 233 | """ |
| 234 | Update the balance for a user based on their owes and owed_by. |
| 235 | |
| 236 | Args: |
| 237 | user: The user object to update. |
| 238 | """ |
| 239 | total_owed_by = sum(user["owed_by"].values()) |
| 240 | total_owes = sum(user["owes"].values()) |
| 241 | user["balance"] = round(total_owed_by - total_owes, 2) |
| 242 | |
| 243 | # Handled Edge Cases: Invalid URL, Missing payload, Missing user name, User already exists, Missing required fields, Invalid amount type, Negative amount, Lender or borrower does not exist, Lender and borrower are the same, Empty user list in payload |