# EventLoader

EventLoader is a component that loads DML events (INSERT/UPDATE/DELETE) captured from the binlog and stored in DB tables, then delivers them to Redis for Worker processing.

## Basic Operation

```
EventHandler (binlog parsing)
        │
        ▼
inserted_pk / updated_pk / deleted_pk tables (DB)
        │
        ▼
EventLoader (loads from DB)
        │
        ▼
Redis updated_pk_set / removed_pk_set
        │
        ▼
Worker (actual data synchronization)
```

### Core Functions

1. **`get_start_timestamp()`**: Retrieves the last loaded timestamp from the `apply_dml_events_status` table
2. **`get_max_timestamp()`**: Retrieves the maximum timestamp across all event tables
3. **`get_end_timestamp()`**: Dynamically adjusts the batch size, halving the window (`batch_duration //= 2`) when too many events fall within it
4. **`get_pk_batch()`**: Loads PKs within a timestamp range
5. **`load_events_from_db()`**: Main loading logic

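The dynamic batch sizing in `get_end_timestamp()` can be sketched as a pure function. This is a simplified illustration, not the real method: the actual implementation queries the event tables, and `count_events` and `MAX_EVENTS` are assumed names.

```python
# Minimal sketch of dynamic batch sizing. count_events(start, end) is a
# hypothetical callback returning the number of events in the range.
MAX_EVENTS = 10_000  # assumed per-batch limit, not the real config value

def get_end_timestamp(start_timestamp, batch_duration, count_events):
    """Shrink the window until the event count fits, halving each time."""
    while batch_duration > 1:
        end_timestamp = start_timestamp + batch_duration
        if count_events(start_timestamp, end_timestamp) <= MAX_EVENTS:
            return end_timestamp, batch_duration
        batch_duration //= 2  # too many events: halve the window
    return start_timestamp + 1, 1  # floor: a single 1-second window
```

Halving rather than shrinking linearly keeps the adjustment logarithmic in the window size, so even a very dense second-by-second burst settles on a workable window quickly.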
---

## Challenging Part 1: Stage Transition with EventHandler

### Stage Flow

```
APPLY_DML_EVENTS → APPLY_DML_EVENTS_PRE_VALIDATION → APPLY_DML_EVENTS_VALIDATION
```

### APPLY_DML_EVENTS → PRE_VALIDATION Transition

`eventhandler.py:231-235`:
```python
def apply_dml_events(self):
    self.start_event_loader()
    if len(self.redis_data.updated_pk_set) == 0 and len(self.redis_data.removed_pk_set) == 0 and \
            self.event_store.last_event_timestamp - self.event_loader.last_loaded_timestamp < 60:
        self.redis_data.set_current_stage(Stage.APPLY_DML_EVENTS_PRE_VALIDATION)
```
| 51 | + |
| 52 | +Transition conditions: |
| 53 | +1. `updated_pk_set == 0`: Worker has processed all updated PKs |
| 54 | +2. `removed_pk_set == 0`: Worker has processed all deleted PKs |
| 55 | +3. `last_event_timestamp - last_loaded_timestamp < 60`: EventLoader has nearly caught up with binlog (within 60 seconds) |
| 56 | + |
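The three transition conditions combine into a single predicate, sketched here as a pure function (an illustration; the names and signature are not the actual method API):

```python
CATCH_UP_THRESHOLD = 60  # seconds; the tolerance used in apply_dml_events()

def ready_for_pre_validation(updated_pk_set, removed_pk_set,
                             last_event_timestamp, last_loaded_timestamp):
    """True when both Worker queues are drained and EventLoader is within
    CATCH_UP_THRESHOLD seconds of the EventHandler's binlog position."""
    return (len(updated_pk_set) == 0
            and len(removed_pk_set) == 0
            and last_event_timestamp - last_loaded_timestamp < CATCH_UP_THRESHOLD)
```

Note that all three conditions must hold at once: drained Redis sets alone are not enough if the loader is still far behind the binlog.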
### PRE_VALIDATION → VALIDATION Transition

`eventhandler.py:237-258`:
```python
def apply_dml_events_pre_validation(self):
    self.start_event_loader()
    self.save()
    # ... count queries ...
    if inserted_count + updated_count + deleted_count > 0:
        while self.event_store.last_event_timestamp != self.event_loader.last_loaded_timestamp:
            if self.stop_flag:
                return
            time.sleep(60)
    self.event_loader.set_stop_flag()
    if self.are_indexes_created():
        self.live_mode = True
        self.redis_data.set_current_stage(Stage.APPLY_DML_EVENTS_VALIDATION)
```

Transition conditions:
1. `last_event_timestamp == last_loaded_timestamp`: EventLoader has fully loaded all events
2. Index creation completed

### Key: last_loaded_timestamp Initialization Issue

`eventloader.py:174-188`:
```python
if start_timestamp == 0 or start_timestamp > max_timestamp:
    self.logger.info("No events to load")
    if self.last_loaded_timestamp == 1:
        # Set last loaded timestamp to initial timestamp
        # By updating it here, eventhandler can move to next stage
        # Also it will prevent eventhandler from moving to next stage too early even before loading events
        with self.db.cursor(role='reader') as cursor:
            cursor.execute(f'''
                SELECT last_event_timestamp FROM {config.SBOSC_DB}.event_handler_status
                WHERE migration_id = %s ORDER BY id LIMIT 1
            ''', (self.migration_id,))
            if cursor.rowcount > 0:
                self.last_loaded_timestamp = cursor.fetchone()[0]
```

**Why is this needed?**

To handle the case where there are no DML events at all:

1. EventHandler parses the binlog and updates `event_handler_status.last_event_timestamp`
2. But if there are no actual DML events, the `inserted_pk`, `updated_pk`, `deleted_pk` tables remain empty
3. EventLoader has nothing to load, so `last_loaded_timestamp` stays at its initial value (1)
4. The condition `last_event_timestamp - last_loaded_timestamp < 60` in `apply_dml_events()` is never satisfied

**Solution:**
- When there are no events and `last_loaded_timestamp` is still the initial value (1), fetch EventHandler's `last_event_timestamp`
- This allows the condition `last_event_timestamp - last_loaded_timestamp < 60` to be satisfied, enabling progression to the next stage

**Two purposes:**
1. Enables progression to the next stage even when there are no events
2. Prevents premature stage transition before event loading has started (the initial value 1 fails the condition)

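Both purposes follow from plain arithmetic on the catch-up condition (a minimal sketch; the timestamp value is an arbitrary example):

```python
INITIAL_TIMESTAMP = 1  # last_loaded_timestamp before any event is loaded

def caught_up(last_event_timestamp, last_loaded_timestamp, threshold=60):
    """The catch-up check used by apply_dml_events()."""
    return last_event_timestamp - last_loaded_timestamp < threshold

binlog_ts = 1_700_000_000  # example wall-clock timestamp from the binlog

# Purpose 2: with the initial value, the gap to any real binlog timestamp
# is enormous, so the stage transition stays blocked.
assert not caught_up(binlog_ts, INITIAL_TIMESTAMP)

# Purpose 1: after the fallback copies EventHandler's last_event_timestamp,
# the gap is zero and the transition can proceed.
assert caught_up(binlog_ts, binlog_ts)
```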
---

## Challenging Part 2: Empty Range Jump Logic

### Problem Scenario

Binlog timestamps are recorded in 1-second units. When there are no DML events in a specific timestamp range and events exist much later:

```
Timeline:
[3000] ─── event exists (start_timestamp)
[3001-9999] ─── no events
[10000] ─── next event
```

**Issue with the previous logic:**
1. `get_pk_batch(3000, 6000)` is called → loads only the event at `start_timestamp=3000`
2. No events greater than `start_timestamp` exist in the range → `max_timestamp_in_batch = start_timestamp = 3000`
3. Saves `last_loaded_timestamp = 3000` to `apply_dml_events_status`
4. Next loop: `get_start_timestamp()` → returns `3000`
5. **The loader processes the same range forever (stuck in an infinite loop)**

### Solution: Jump to next_timestamp

`eventloader.py:204-208`:
```python
# Save last loaded event timestamp
if max_timestamp_in_batch == start_timestamp and max_timestamp > start_timestamp:
    last_loaded_timestamp = next_timestamp
else:
    last_loaded_timestamp = max_timestamp_in_batch
```

**Condition Analysis:**

| max_timestamp_in_batch | max_timestamp | Meaning | Action |
|------------------------|---------------|---------|--------|
| == start_timestamp | > start_timestamp | No events in current batch, events exist later | Jump to next_timestamp |
| == start_timestamp | == start_timestamp | Events only in current batch, none later | Retry same range (wait for new events) |
| > start_timestamp | - | Events exist in current batch | Normal progression |

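The three rows of the table correspond directly to the branches of the snippet above; as a pure-function sketch (illustrative signature, not the module's actual API):

```python
def next_last_loaded_timestamp(start_timestamp, max_timestamp_in_batch,
                               max_timestamp, next_timestamp):
    """Decide what to persist as last_loaded_timestamp after a batch."""
    if max_timestamp_in_batch == start_timestamp and max_timestamp > start_timestamp:
        # Row 1: empty batch, but events exist further ahead → jump the gap.
        return next_timestamp
    # Row 2: nothing beyond start anywhere yet → returns start (retry range).
    # Row 3: batch contained newer events → returns batch max (normal progress).
    return max_timestamp_in_batch
```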
**Example:**

```
Scenario: start=3000, batch_duration=3000, next event=10000

1. get_pk_batch(3000, 6000) → max_timestamp_in_batch = 3000
2. max_timestamp = 10000 (max across all tables)
3. Check conditions:
   - max_timestamp_in_batch == start_timestamp? → True (3000 == 3000)
   - max_timestamp > start_timestamp? → True (10000 > 3000)
4. last_loaded_timestamp = next_timestamp = 6000 (jump!)
5. Next loop: start_timestamp = 6000
6. get_pk_batch(6000, 9000) → empty range again → jump to 9000
7. get_pk_batch(9000, 12000) → loads event at 10000!
```
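
The walkthrough above can be reproduced with a small simulation of the loop (a sketch of the control flow only: event storage is a plain list of timestamps, and all names are illustrative):

```python
def load_all(events, start, batch_duration):
    """Simulate the loader until it catches up to the global max timestamp."""
    max_timestamp = max(events)
    loaded, last_loaded = [], start
    while last_loaded < max_timestamp:
        end = last_loaded + batch_duration                 # next_timestamp
        batch = [e for e in events if last_loaded <= e <= end]
        batch_max = max(batch) if batch else last_loaded
        if batch_max == last_loaded and max_timestamp > last_loaded:
            last_loaded = end                              # empty range: jump
        else:
            last_loaded = batch_max                        # normal progression
        loaded += [e for e in batch if e not in loaded]
    return loaded, last_loaded

# Events at 3000 and 10000, nothing in between: the loader jumps
# 3000 → 6000 → 9000, then picks up 10000 in the third batch.
loaded, last = load_all([3000, 10000], start=3000, batch_duration=3000)
```

Without the jump branch, `last_loaded` would stay pinned at 3000 and the loop would never terminate.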

### Why Batches Overlap at Timestamp Boundaries

Binlog timestamps have 1-second granularity, so multiple events can share the same timestamp. New events can also arrive at a timestamp that was already processed:

```
Batch 1: [3000, 6000] → max_timestamp_in_batch = 5000 → save last_loaded_timestamp = 5000
Batch 2: [5000, 8000] → starts from 5000, not 5001
```

**Key design: the next batch starts from `last_loaded_timestamp`, not `last_loaded_timestamp + 1`.**

This ensures that if new events arrive at timestamp 5000 after Batch 1 completed, Batch 2 will still pick them up.
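
A minimal sketch of why the inclusive start matters (the real implementation queries the PK tables; `pk_batch` is a hypothetical stand-in):

```python
def pk_batch(events, start, end):
    """Events with start <= timestamp <= end; the start is intentionally inclusive."""
    return [ts for ts in events if start <= ts <= end]

events = [3000, 4500, 5000]
batch1 = pk_batch(events, 3000, 6000)   # loads everything up to 5000

events.append(5000)                     # a late event arrives at the boundary
batch2 = pk_batch(events, 5000, 8000)   # inclusive start still sees ts=5000

# An exclusive start (5001) would have silently dropped the late event:
missed = pk_batch(events, 5001, 8000)
```

The cost of the overlap is re-reading one timestamp's worth of rows per batch; the benefit is that no event sharing a boundary timestamp can ever be skipped.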

**When `max_timestamp_in_batch == start_timestamp`:**
- No events with a timestamp greater than `start_timestamp` in the current batch
- If `max_timestamp == start_timestamp`: no events beyond this timestamp in the entire table yet
  - New events may still arrive at this timestamp from the binlog
  - Save `last_loaded_timestamp = start_timestamp` and retry the same range
- If `max_timestamp > start_timestamp`: events exist further ahead
  - Safe to jump to `next_timestamp` since the binlog has moved past the current timestamp
  - No more events will arrive at `start_timestamp`

---

## Related Tables

```sql
-- EventLoader progress status
CREATE TABLE apply_dml_events_status (
    id int PRIMARY KEY AUTO_INCREMENT,
    migration_id int,
    last_loaded_timestamp bigint, -- last loaded event_timestamp
    created_at datetime
);

-- EventHandler binlog position
CREATE TABLE event_handler_status (
    id int PRIMARY KEY AUTO_INCREMENT,
    migration_id int,
    log_file varchar(128),
    log_pos bigint,
    last_event_timestamp bigint, -- max timestamp processed from binlog
    created_at datetime
);

-- DML event storage
CREATE TABLE inserted_pk_{migration_id} (source_pk bigint PRIMARY KEY, event_timestamp bigint);
CREATE TABLE updated_pk_{migration_id} (source_pk bigint PRIMARY KEY, event_timestamp bigint);
CREATE TABLE deleted_pk_{migration_id} (source_pk bigint PRIMARY KEY, event_timestamp bigint);
```