risingwave_meta/stream/refresh_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::LazyLock;

use anyhow::anyhow;
use parking_lot::Mutex;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_meta_model::ActorId;
use risingwave_meta_model::table::RefreshState;
use risingwave_pb::id::SourceId;
use risingwave_pb::meta::{RefreshRequest, RefreshResponse};
use thiserror_ext::AsReport;

use crate::barrier::{BarrierScheduler, Command, SharedActorInfos};
use crate::manager::MetadataManager;
use crate::{MetaError, MetaResult};

/// Global, per-table refresh progress tracker.
///
/// Lifecycle for each entry (keyed by `TableId`):
/// - Created in `RefreshManager::refresh_table` before the refresh command is
///   scheduled, populated with the expected source/fetch actor sets for that table.
/// - Updated on each barrier in checkpoint control when executors report
///   list/load progress; see `CheckpointControl::handle_refresh_table_info`.
/// - Removed when compute reports the table as refresh-finished on a barrier
///   (`refresh_finished_table_ids`).
///
/// Failure/retry notes:
/// - If scheduling the refresh fails, the table state is reset to `Idle` and
///   the tracker entry is removed, so a later refresh attempt can start cleanly.
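///
/// Usage sketch (illustrative only; the real call sites are `refresh_table`
/// below and checkpoint control in the barrier manager):
/// ```ignore
/// let mut guard = REFRESH_TABLE_PROGRESS_TRACKER.lock();
/// if let Some(progress) = guard.inner.get_mut(&table_id) {
///     progress.report_list_finished(finished_actors.into_iter());
///     if progress.is_list_finished()? {
///         // All expected source actors reported; the load phase may proceed.
///     }
/// }
/// ```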
pub static REFRESH_TABLE_PROGRESS_TRACKER: LazyLock<Mutex<GlobalRefreshTableProgressTracker>> =
    LazyLock::new(|| Mutex::new(GlobalRefreshTableProgressTracker::default()));

#[derive(Default, Debug)]
pub struct GlobalRefreshTableProgressTracker {
    /// Per-table progress, keyed by the table being refreshed.
    pub inner: HashMap<TableId, SingleTableRefreshProgressTracker>,
    /// Reverse index from database to its refreshing tables, used to remove
    /// all trackers of a database at once.
    pub table_id_by_database_id: HashMap<DatabaseId, HashSet<TableId>>,
}

impl GlobalRefreshTableProgressTracker {
    /// Drop all trackers that belong to the given database.
    pub fn remove_tracker_by_database_id(&mut self, database_id: DatabaseId) {
        let table_ids = self
            .table_id_by_database_id
            .remove(&database_id)
            .unwrap_or_default();
        for table_id in table_ids {
            self.inner.remove(&table_id);
        }
    }
}

/// Manager responsible for handling refresh operations on refreshable tables.
///
/// # High level design for refresh table
///
/// - Three tables:
///   - Main table: serves queries.
///   - Staging table: receives refreshed content during `Refreshing`.
///   - Progress table: per-VNode progress state for resumable refresh.
///
/// - Phased execution:
///   - Normal → Refreshing → Merging → Cleanup → Normal.
///   - Refreshing: load and write to staging.
///   - Merging: chunked sort-merge integrates staging into main; per-VNode progress persists checkpoints.
///   - Cleanup: purge staging and reset progress.
///
/// - Barrier-first responsiveness:
///   - The executor uses left-priority `select_with_strategy`, always handling upstream messages/barriers before background merge work.
///   - On barriers, the executor persists progress so restarts resume exactly where they left off.
///
/// - Meta-managed state:
///   - `refresh_state` on each table enforces no concurrent refresh and enables recovery after failures.
///   - Startup recovery resets lingering `Refreshing` tables to `Idle` and lets executors resume `Finishing` safely.
///
/// ## Progress Table (Conceptual)
/// Tracks, per VNode:
/// - the last processed position (e.g., the last PK),
/// - a completion flag,
/// - the processed row count,
/// - the last checkpoint epoch.
///
/// The executor initializes entries on `RefreshStart`, updates them during merge, and loads them at startup to resume from the last checkpoint.
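///
/// A conceptual sketch of one progress-table row (illustrative field names only;
/// the actual schema is defined by the refresh executor's progress state table):
/// ```ignore
/// struct RefreshProgressRow {
///     vnode: VirtualNode,         // key: which VNode this row tracks
///     last_pk: Option<OwnedRow>,  // last processed position
///     is_completed: bool,         // completion flag
///     processed_rows: u64,        // processed row count
///     checkpoint_epoch: u64,      // last checkpoint epoch
/// }
/// ```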
///
/// ## Barrier Coordination and Completion
/// - Compute reports:
///   - `refresh_finished_table_ids`: indicates a materialized view finished refreshing.
///   - `truncate_tables`: staging tables to be cleaned up.
/// - Checkpoint control aggregates these across barrier types; completion handlers in meta:
///   - update `refresh_state` to `Idle`,
///   - schedule/handle `LoadFinish`,
///   - drive cleanup work reliably after the storage version commit.
pub struct RefreshManager {
    metadata_manager: MetadataManager,
    barrier_scheduler: BarrierScheduler,
}

impl RefreshManager {
    /// Create a new `RefreshManager` instance.
    pub fn new(metadata_manager: MetadataManager, barrier_scheduler: BarrierScheduler) -> Self {
        Self {
            metadata_manager,
            barrier_scheduler,
        }
    }

    /// Execute a refresh operation for the specified table.
    ///
    /// This method:
    /// 1. Validates that the table exists and is refreshable.
    /// 2. Checks the current refresh state and rejects concurrent refreshes.
    /// 3. Registers the expected actor sets in the global progress tracker.
    /// 4. Sends a refresh command through the barrier system, which moves the
    ///    table to the `Refreshing` state.
    /// 5. Returns the result of the refresh operation.
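    ///
    /// Call sketch (illustrative; `RefreshRequest` is shown with just the two
    /// fields this method reads):
    /// ```ignore
    /// let resp = refresh_manager
    ///     .refresh_table(
    ///         RefreshRequest { table_id, associated_source_id },
    ///         &shared_actor_infos,
    ///     )
    ///     .await?;
    /// ```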
    pub async fn refresh_table(
        &self,
        request: RefreshRequest,
        shared_actor_infos: &SharedActorInfos,
    ) -> MetaResult<RefreshResponse> {
        let table_id = request.table_id;
        let associated_source_id = request.associated_source_id;

        // Validate that the table exists and is refreshable.
        self.validate_refreshable_table(table_id, associated_source_id)
            .await?;

        tracing::info!("Starting refresh operation for table {}", table_id);

        // Get the database_id of the table.
        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(table_id)
            .await?;

        // Load the job's fragments to determine the expected actor sets.
        let job_fragments = self
            .metadata_manager
            .get_job_fragments_by_id(table_id.as_job_id())
            .await?;

        {
            // Scope the shared-actor-info read guard so it is dropped before
            // the `await` points below.
            let fragment_to_actor_mapping = shared_actor_infos.read_guard();
            let mut tracker = SingleTableRefreshProgressTracker::default();
            for (fragment_id, fragment) in &job_fragments.fragments {
                if fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Source)
                    // Exclude DML fragments so we don't pick up actors that serve DML statements.
                    && !fragment.fragment_type_mask.contains(FragmentTypeFlag::Dml)
                {
                    let fragment_info = fragment_to_actor_mapping
                        .get_fragment(*fragment_id)
                        .ok_or_else(|| MetaError::fragment_not_found(*fragment_id))?;
                    tracker.expected_list_actors.extend(
                        fragment_info
                            .actors
                            .keys()
                            .map(|actor_id| *actor_id as ActorId),
                    );
                }
                if fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::FsFetch)
                    && let Some(fragment_info) =
                        fragment_to_actor_mapping.get_fragment(*fragment_id)
                {
                    tracker.expected_fetch_actors.extend(
                        fragment_info
                            .actors
                            .keys()
                            .map(|actor_id| *actor_id as ActorId),
                    );
                }
            }

            {
                // Store the tracker in the global tracker before the guard is dropped.
                let mut lock_handle = REFRESH_TABLE_PROGRESS_TRACKER.lock();
                lock_handle.inner.insert(table_id, tracker);
                lock_handle
                    .table_id_by_database_id
                    .entry(database_id)
                    .or_default()
                    .insert(table_id);
            }

            Ok::<_, MetaError>(())
        }?;

        // Create the refresh command.
        let refresh_command = Command::Refresh {
            table_id,
            associated_source_id,
        };

        // Send the refresh command through the barrier system.
        match self
            .barrier_scheduler
            .run_command(database_id, refresh_command)
            .await
        {
            Ok(_) => {
                tracing::info!(
                    table_id = %table_id,
                    "Refresh command completed successfully"
                );

                Ok(RefreshResponse { status: None })
            }
            Err(e) => {
                tracing::error!(
                    error = %e.as_report(),
                    table_id = %table_id,
                    "Failed to execute refresh command, resetting refresh state to Idle"
                );

                self.metadata_manager
                    .catalog_controller
                    .set_table_refresh_state(table_id, RefreshState::Idle)
                    .await?;

                {
                    // Roll back the tracker entry so a later refresh can start cleanly.
                    let mut lock_handle = REFRESH_TABLE_PROGRESS_TRACKER.lock();
                    lock_handle.inner.remove(&table_id);
                    if let Some(table_ids) =
                        lock_handle.table_id_by_database_id.get_mut(&database_id)
                    {
                        table_ids.remove(&table_id);
                    }
                }

                Err(anyhow!(e)
                    .context(format!("Failed to refresh table {}", table_id))
                    .into())
            }
        }
    }

    /// Validate that the specified table exists and supports refresh operations.
    async fn validate_refreshable_table(
        &self,
        table_id: TableId,
        associated_source_id: SourceId,
    ) -> MetaResult<()> {
        // Check that the table exists in the catalog.
        let table = self
            .metadata_manager
            .catalog_controller
            .get_table_by_id(table_id)
            .await?;

        // Check that the table is refreshable.
        if !table.refreshable {
            return Err(MetaError::invalid_parameter(format!(
                "Table '{}' is not refreshable. Only tables created with the REFRESHABLE flag support manual refresh.",
                table.name
            )));
        }

        if table.optional_associated_source_id != Some(associated_source_id.into()) {
            return Err(MetaError::invalid_parameter(format!(
                "Table '{}' is not associated with source '{}' (current associated source: {:?}).",
                table.name, associated_source_id, table.optional_associated_source_id
            )));
        }

        // Reject a new refresh while one is already in progress.
        let current_state = self
            .metadata_manager
            .catalog_controller
            .get_table_refresh_state(table_id)
            .await?;
        match current_state {
            Some(RefreshState::Idle) | None => {
                // The table is not refreshing; a refresh can be issued.
            }
            Some(state @ (RefreshState::Finishing | RefreshState::Refreshing)) => {
                return Err(MetaError::invalid_parameter(format!(
                    "Table '{}' is currently in state {:?}. Cannot start a new refresh operation.",
                    table.name, state
                )));
            }
        }

        tracing::debug!(
            table_id = %table_id,
            table_name = %table.name,
            "Table validation passed for refresh operation"
        );

        Ok(())
    }
}

/// Refresh progress for a single table: the actor sets expected to report for
/// the list and fetch phases, and the actors that have reported so far.
#[derive(Default, Debug)]
pub struct SingleTableRefreshProgressTracker {
    /// Source (list) actors expected to report for this table.
    pub expected_list_actors: HashSet<ActorId>,
    /// `FsFetch` actors expected to report for this table.
    pub expected_fetch_actors: HashSet<ActorId>,
    /// List actors that have reported completion so far.
    pub list_finished_actors: HashSet<ActorId>,
    /// Fetch actors that have reported completion so far.
    pub fetch_finished_actors: HashSet<ActorId>,
}

impl SingleTableRefreshProgressTracker {
    /// Record that the given actors have finished the list phase.
    pub fn report_list_finished(&mut self, actor_ids: impl Iterator<Item = ActorId>) {
        self.list_finished_actors.extend(actor_ids);
    }

    /// Returns `Ok(true)` once every expected list actor has reported,
    /// `Ok(false)` while reports are still outstanding, and an error if the
    /// reported set diverges from the expected set.
    pub fn is_list_finished(&self) -> MetaResult<bool> {
        if self.list_finished_actors.len() >= self.expected_list_actors.len() {
            if self.expected_list_actors == self.list_finished_actors {
                Ok(true)
            } else {
                Err(MetaError::from(anyhow!(
                    "list finished actors mismatch: expected: {:?}, actual: {:?}",
                    self.expected_list_actors,
                    self.list_finished_actors
                )))
            }
        } else {
            Ok(false)
        }
    }

    /// Record that the given actors have finished the load (fetch) phase.
    pub fn report_load_finished(&mut self, actor_ids: impl Iterator<Item = ActorId>) {
        self.fetch_finished_actors.extend(actor_ids);
    }

    /// Returns `Ok(true)` once every expected fetch actor has reported,
    /// `Ok(false)` while reports are still outstanding, and an error if the
    /// reported set diverges from the expected set.
    pub fn is_load_finished(&self) -> MetaResult<bool> {
        if self.fetch_finished_actors.len() >= self.expected_fetch_actors.len() {
            if self.expected_fetch_actors == self.fetch_finished_actors {
                Ok(true)
            } else {
                Err(MetaError::from(anyhow!(
                    "fetch finished actors mismatch: expected: {:?}, actual: {:?}",
                    self.expected_fetch_actors,
                    self.fetch_finished_actors
                )))
            }
        } else {
            Ok(false)
        }
    }
}
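
// A minimal sketch of how the progress-tracking protocol behaves, added for
// illustration (it assumes `ActorId` is a plain integer alias, as in
// `risingwave_meta_model`): progress is not finished until every expected
// actor reports, and a report set that diverges from the expected set is an error.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_list_progress_lifecycle() {
        let mut tracker = SingleTableRefreshProgressTracker::default();
        tracker.expected_list_actors.extend([1, 2]);

        // Nothing reported yet: not finished.
        assert!(!tracker.is_list_finished().unwrap());

        // One of two expected actors reported: still not finished.
        tracker.report_list_finished([1].into_iter());
        assert!(!tracker.is_list_finished().unwrap());

        // All expected actors reported: finished.
        tracker.report_list_finished([2].into_iter());
        assert!(tracker.is_list_finished().unwrap());

        // A report from an actor outside the expected set is a mismatch error.
        tracker.report_list_finished([3].into_iter());
        assert!(tracker.is_list_finished().is_err());
    }

    #[test]
    fn test_load_progress_empty_expectation() {
        // With no expected fetch actors (e.g., no `FsFetch` fragment), the
        // load phase is trivially finished.
        let tracker = SingleTableRefreshProgressTracker::default();
        assert!(tracker.is_load_finished().unwrap());
    }
}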