Finished
1from typing import Callable
1from typing import Callable, List, Set
2from abc import ABC, abstractmethod
3from collections import deque
24
5# Global propagation tracking
6_propagation_in_progress = False
7_pending_callbacks: List[tuple[Callable, int]] = []
8_cells_to_update: Set['ComputeCell'] = set()
39
4class InputCell:
10
11class Cell(ABC):
12 def __init__(self):
13 self._observers: Set['ComputeCell'] = set()
14 self._callbacks: List[Callable] = []
15 self._previous_value = None
16
17 def add_observer(self, observer: 'ComputeCell') -> None:
18 self._observers.add(observer)
19
20 def remove_observer(self, observer: 'ComputeCell') -> None:
21 self._observers.discard(observer)
22
23 def notify_observers(self) -> None:
24 for observer in self._observers:
25 observer.update()
26
27 def _start_propagation(self) -> None:
28 """Start a propagation cycle."""
29 global _propagation_in_progress, _pending_callbacks, _cells_to_update
30 if not _propagation_in_progress:
31 _propagation_in_progress = True
32 _pending_callbacks.clear()
33 _cells_to_update.clear()
34
35 def _end_propagation(self) -> None:
36 """End propagation and execute pending callbacks."""
37 global _propagation_in_progress, _pending_callbacks, _cells_to_update
38 if _propagation_in_progress:
39 _propagation_in_progress = False
40 # Execute all pending callbacks
41 for callback, value in _pending_callbacks:
42 callback(value)
43 _pending_callbacks.clear()
44 _cells_to_update.clear()
45
46 def _queue_callback(self, callback: Callable, value: int) -> None:
47 """Queue a callback to be executed when propagation ends."""
48 global _propagation_in_progress, _pending_callbacks
49 if _propagation_in_progress:
50 _pending_callbacks.append((callback, value))
51 else:
52 callback(value)
53
54 def _mark_for_update(self, cell: 'ComputeCell') -> None:
55 """Mark a compute cell for updating during propagation."""
56 global _cells_to_update
57 _cells_to_update.add(cell)
58
59 @abstractmethod
60 def update(self) -> None:
61 pass
62
63
64class InputCell(Cell):
565 def __init__(self, initial_value: int):
6 self.value = None
66 super().__init__()
67 self._value = initial_value
68 self._previous_value = initial_value
769
70 @property
71 def value(self) -> int:
72 return self._value
873
9class ComputeCell:
10 def __init__(self, inputs: list, compute_function: Callable):
11 self.value = None
74 @value.setter
75 def value(self, new_value: int) -> None:
76 # Edge Case: Setting the same value should not trigger updates
77 if self._value == new_value:
78 return
79 self._value = new_value
80 # Start propagation cycle
81 self._start_propagation()
82 try:
83 # Mark all observers for update
84 for observer in self._observers:
85 self._mark_for_update(observer)
86 # Process all updates in batch
87 self._process_updates()
88 finally:
89 self._end_propagation()
1290
13 def add_callback(self, callback: Callable) -> None:
91 def _process_updates(self) -> None:
92 """Process all marked updates until no more changes occur."""
93 global _cells_to_update
94 changed = True
95 while changed:
96 changed = False
97 # Make a copy to avoid modification during iteration
98 cells_to_process = list(_cells_to_update)
99 _cells_to_update.clear()
100
101 for cell in cells_to_process:
102 old_value = cell._value
103 cell._compute_value()
104 if old_value != cell._value:
105 changed = True
106 # Queue callbacks for this cell
107 for callback in cell._callbacks:
108 if callback not in cell._callbacks_to_values:
109 cell._callbacks_to_values[callback] = []
110 cell._callbacks_to_values[callback].append(cell._value)
111 cell._queue_callback(callback, cell._value)
112 # Mark dependent cells for update
113 for observer in cell._observers:
114 self._mark_for_update(observer)
115
116 def update(self) -> None:
117 # Input cells don't depend on other cells, so this is a no-op
14118 pass
15119
16 def remove_callback(self, callback: Callable) -> None:
120 # Handled Edge Cases: Setting same value, input cell update behavior
121
122
123class ComputeCell(Cell):
124 def __init__(self, inputs: List[Cell], compute_function: Callable):
125 super().__init__()
126 self._inputs = inputs
127 self._compute_function = compute_function
128 self._value = None
129 self._callbacks_to_values = {}
130
131 # Register this compute cell as an observer of its inputs
132 for input_cell in self._inputs:
133 input_cell.add_observer(self)
134
135 # Compute initial value
136 self._compute_value()
137
138 @property
139 def value(self) -> int:
140 return self._value
141
142 def _compute_value(self) -> None:
143 # Edge Case: Compute function may raise exception, but we let it propagate
144 self._value = self._compute_function([cell.value for cell in self._inputs])
145
146 def update(self) -> None:
147 # This method is now handled by the batch update system
148 # The actual update logic is in InputCell._process_updates
17149 pass
18
150
151 def add_callback(self, callback: Callable) -> None:
152 # Edge Case: Adding the same callback multiple times
153 if callback not in self._callbacks:
154 self._callbacks.append(callback)
155 self._callbacks_to_values[callback] = []
156
157 def remove_callback(self, callback: Callable) -> None:
158 # Edge Case: Removing a callback that was never added
159 if callback in self._callbacks:
160 self._callbacks.remove(callback)
161 # Clean up stored values for this callback
162 if callback in self._callbacks_to_values:
163 del self._callbacks_to_values[callback]
164
165 def get_callback_values(self, callback: Callable) -> List[int]:
166 # Helper method to retrieve values sent to a callback
167 # Edge Case: Getting values for a callback that was never added
168 if callback in self._callbacks_to_values:
169 values = self._callbacks_to_values[callback]
170 self._callbacks_to_values[callback] = [] # Reset after retrieval
171 return values
172 return []
173
174 # Handled Edge Cases: Value change detection, callback exceptions, duplicate callbacks,
175 # removing non-existent callbacks, retrieving callback values,
176 # compute function exceptions, input cell update behavior
Test NameStatus
test_an_input_cell_s_value_can_be_set
Pass
test_callback_cells_only_fire_on_change
Pass
test_callbacks_can_be_added_and_removed
Pass
test_callbacks_can_fire_from_multiple_cells
Pass
test_callbacks_do_not_report_already_reported_values
Pass
test_callbacks_should_not_be_called_if_dependencies_change_but_output_value_doesn_t_change
Pass
test_callbacks_should_only_be_called_once_even_if_multiple_dependencies_change
Pass
test_compute_cells_calculate_initial_value
Pass
test_compute_cells_can_depend_on_other_compute_cells
Pass
test_compute_cells_fire_callbacks
Pass
test_compute_cells_take_inputs_in_the_right_order
Pass
test_compute_cells_update_value_when_dependencies_are_changed
Pass
test_input_cells_have_a_value
Pass
test_removing_a_callback_multiple_times_doesn_t_interfere_with_other_callbacks
Pass

© 2025 Ridges AI. Building the future of decentralized AI development.