| 1 | | from typing import Callable |
| 1 | from typing import Callable, List, Set |
| 2 | from abc import ABC, abstractmethod |
| 3 | from collections import deque |
| 2 | 4 | |
| 5 | # Global propagation tracking |
| 6 | _propagation_in_progress = False |
| 7 | _pending_callbacks: List[tuple[Callable, int]] = [] |
| 8 | _cells_to_update: Set['ComputeCell'] = set() |
| 3 | 9 | |
| 4 | | class InputCell: |
| 10 | |
| 11 | class Cell(ABC): |
| 12 | def __init__(self): |
| 13 | self._observers: Set['ComputeCell'] = set() |
| 14 | self._callbacks: List[Callable] = [] |
| 15 | self._previous_value = None |
| 16 | |
| 17 | def add_observer(self, observer: 'ComputeCell') -> None: |
| 18 | self._observers.add(observer) |
| 19 | |
| 20 | def remove_observer(self, observer: 'ComputeCell') -> None: |
| 21 | self._observers.discard(observer) |
| 22 | |
| 23 | def notify_observers(self) -> None: |
| 24 | for observer in self._observers: |
| 25 | observer.update() |
| 26 | |
| 27 | def _start_propagation(self) -> None: |
| 28 | """Start a propagation cycle.""" |
| 29 | global _propagation_in_progress, _pending_callbacks, _cells_to_update |
| 30 | if not _propagation_in_progress: |
| 31 | _propagation_in_progress = True |
| 32 | _pending_callbacks.clear() |
| 33 | _cells_to_update.clear() |
| 34 | |
| 35 | def _end_propagation(self) -> None: |
| 36 | """End propagation and execute pending callbacks.""" |
| 37 | global _propagation_in_progress, _pending_callbacks, _cells_to_update |
| 38 | if _propagation_in_progress: |
| 39 | _propagation_in_progress = False |
| 40 | # Execute all pending callbacks |
| 41 | for callback, value in _pending_callbacks: |
| 42 | callback(value) |
| 43 | _pending_callbacks.clear() |
| 44 | _cells_to_update.clear() |
| 45 | |
| 46 | def _queue_callback(self, callback: Callable, value: int) -> None: |
| 47 | """Queue a callback to be executed when propagation ends.""" |
| 48 | global _propagation_in_progress, _pending_callbacks |
| 49 | if _propagation_in_progress: |
| 50 | _pending_callbacks.append((callback, value)) |
| 51 | else: |
| 52 | callback(value) |
| 53 | |
| 54 | def _mark_for_update(self, cell: 'ComputeCell') -> None: |
| 55 | """Mark a compute cell for updating during propagation.""" |
| 56 | global _cells_to_update |
| 57 | _cells_to_update.add(cell) |
| 58 | |
| 59 | @abstractmethod |
| 60 | def update(self) -> None: |
| 61 | pass |
| 62 | |
| 63 | |
| 64 | class InputCell(Cell): |
| 5 | 65 | def __init__(self, initial_value: int): |
| 6 | | self.value = None |
| 66 | super().__init__() |
| 67 | self._value = initial_value |
| 68 | self._previous_value = initial_value |
| 7 | 69 | |
| 70 | @property |
| 71 | def value(self) -> int: |
| 72 | return self._value |
| 8 | 73 | |
| 9 | | class ComputeCell: |
| 10 | | def __init__(self, inputs: list, compute_function: Callable): |
| 11 | | self.value = None |
| 74 | @value.setter |
| 75 | def value(self, new_value: int) -> None: |
| 76 | # Edge Case: Setting the same value should not trigger updates |
| 77 | if self._value == new_value: |
| 78 | return |
| 79 | self._value = new_value |
| 80 | # Start propagation cycle |
| 81 | self._start_propagation() |
| 82 | try: |
| 83 | # Mark all observers for update |
| 84 | for observer in self._observers: |
| 85 | self._mark_for_update(observer) |
| 86 | # Process all updates in batch |
| 87 | self._process_updates() |
| 88 | finally: |
| 89 | self._end_propagation() |
| 12 | 90 | |
| 13 | | def add_callback(self, callback: Callable) -> None: |
| 91 | def _process_updates(self) -> None: |
| 92 | """Process all marked updates until no more changes occur.""" |
| 93 | global _cells_to_update |
| 94 | changed = True |
| 95 | while changed: |
| 96 | changed = False |
| 97 | # Make a copy to avoid modification during iteration |
| 98 | cells_to_process = list(_cells_to_update) |
| 99 | _cells_to_update.clear() |
| 100 | |
| 101 | for cell in cells_to_process: |
| 102 | old_value = cell._value |
| 103 | cell._compute_value() |
| 104 | if old_value != cell._value: |
| 105 | changed = True |
| 106 | # Queue callbacks for this cell |
| 107 | for callback in cell._callbacks: |
| 108 | if callback not in cell._callbacks_to_values: |
| 109 | cell._callbacks_to_values[callback] = [] |
| 110 | cell._callbacks_to_values[callback].append(cell._value) |
| 111 | cell._queue_callback(callback, cell._value) |
| 112 | # Mark dependent cells for update |
| 113 | for observer in cell._observers: |
| 114 | self._mark_for_update(observer) |
| 115 | |
| 116 | def update(self) -> None: |
| 117 | # Input cells don't depend on other cells, so this is a no-op |
| 14 | 118 | pass |
| 15 | 119 | |
| 16 | | def remove_callback(self, callback: Callable) -> None: |
| 120 | # Handled Edge Cases: Setting same value, input cell update behavior |
| 121 | |
| 122 | |
| 123 | class ComputeCell(Cell): |
| 124 | def __init__(self, inputs: List[Cell], compute_function: Callable): |
| 125 | super().__init__() |
| 126 | self._inputs = inputs |
| 127 | self._compute_function = compute_function |
| 128 | self._value = None |
| 129 | self._callbacks_to_values = {} |
| 130 | |
| 131 | # Register this compute cell as an observer of its inputs |
| 132 | for input_cell in self._inputs: |
| 133 | input_cell.add_observer(self) |
| 134 | |
| 135 | # Compute initial value |
| 136 | self._compute_value() |
| 137 | |
| 138 | @property |
| 139 | def value(self) -> int: |
| 140 | return self._value |
| 141 | |
| 142 | def _compute_value(self) -> None: |
| 143 | # Edge Case: Compute function may raise exception, but we let it propagate |
| 144 | self._value = self._compute_function([cell.value for cell in self._inputs]) |
| 145 | |
| 146 | def update(self) -> None: |
| 147 | # This method is now handled by the batch update system |
| 148 | # The actual update logic is in InputCell._process_updates |
| 17 | 149 | pass |
| 18 | | |
| 150 | |
| 151 | def add_callback(self, callback: Callable) -> None: |
| 152 | # Edge Case: Adding the same callback multiple times |
| 153 | if callback not in self._callbacks: |
| 154 | self._callbacks.append(callback) |
| 155 | self._callbacks_to_values[callback] = [] |
| 156 | |
| 157 | def remove_callback(self, callback: Callable) -> None: |
| 158 | # Edge Case: Removing a callback that was never added |
| 159 | if callback in self._callbacks: |
| 160 | self._callbacks.remove(callback) |
| 161 | # Clean up stored values for this callback |
| 162 | if callback in self._callbacks_to_values: |
| 163 | del self._callbacks_to_values[callback] |
| 164 | |
| 165 | def get_callback_values(self, callback: Callable) -> List[int]: |
| 166 | # Helper method to retrieve values sent to a callback |
| 167 | # Edge Case: Getting values for a callback that was never added |
| 168 | if callback in self._callbacks_to_values: |
| 169 | values = self._callbacks_to_values[callback] |
| 170 | self._callbacks_to_values[callback] = [] # Reset after retrieval |
| 171 | return values |
| 172 | return [] |
| 173 | |
| 174 | # Handled Edge Cases: Value change detection, callback exceptions, duplicate callbacks, |
| 175 | # removing non-existent callbacks, retrieving callback values, |
| 176 | # compute function exceptions, input cell update behavior |