risingwave_meta/stream/
refresh_manager.rs1use std::collections::{HashMap, HashSet};
16use std::sync::LazyLock;
17
18use anyhow::anyhow;
19use parking_lot::Mutex;
20use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
21use risingwave_meta_model::ActorId;
22use risingwave_meta_model::table::RefreshState;
23use risingwave_pb::id::SourceId;
24use risingwave_pb::meta::{RefreshRequest, RefreshResponse};
25use thiserror_ext::AsReport;
26
27use crate::barrier::{BarrierScheduler, Command, SharedActorInfos};
28use crate::manager::MetadataManager;
29use crate::{MetaError, MetaResult};
30
31pub static REFRESH_TABLE_PROGRESS_TRACKER: LazyLock<Mutex<GlobalRefreshTableProgressTracker>> =
44 LazyLock::new(|| Mutex::new(GlobalRefreshTableProgressTracker::default()));
45
46#[derive(Default, Debug)]
47pub struct GlobalRefreshTableProgressTracker {
48 pub inner: HashMap<TableId, SingleTableRefreshProgressTracker>,
49 pub table_id_by_database_id: HashMap<DatabaseId, HashSet<TableId>>,
50}
51
52impl GlobalRefreshTableProgressTracker {
53 pub fn remove_tracker_by_database_id(&mut self, database_id: DatabaseId) {
54 let table_ids = self
55 .table_id_by_database_id
56 .remove(&database_id)
57 .unwrap_or_default();
58 for table_id in table_ids {
59 self.inner.remove(&table_id);
60 }
61 }
62}
63
64pub struct RefreshManager {
110 metadata_manager: MetadataManager,
111 barrier_scheduler: BarrierScheduler,
112}
113
114impl RefreshManager {
115 pub fn new(metadata_manager: MetadataManager, barrier_scheduler: BarrierScheduler) -> Self {
117 Self {
118 metadata_manager,
119 barrier_scheduler,
120 }
121 }
122
123 pub async fn refresh_table(
132 &self,
133 request: RefreshRequest,
134 shared_actor_infos: &SharedActorInfos,
135 ) -> MetaResult<RefreshResponse> {
136 let table_id = request.table_id;
137 let associated_source_id = request.associated_source_id;
138
139 self.validate_refreshable_table(table_id, associated_source_id)
141 .await?;
142
143 tracing::info!("Starting refresh operation for table {}", table_id);
144
145 let database_id = self
147 .metadata_manager
148 .catalog_controller
149 .get_object_database_id(table_id)
150 .await?;
151
152 let job_fragments = self
154 .metadata_manager
155 .get_job_fragments_by_id(table_id.as_job_id())
156 .await?;
157
158 {
159 let fragment_to_actor_mapping = shared_actor_infos.read_guard();
160 let mut tracker = SingleTableRefreshProgressTracker::default();
161 for (fragment_id, fragment) in &job_fragments.fragments {
162 if fragment
163 .fragment_type_mask
164 .contains(FragmentTypeFlag::Source)
165 && !fragment.fragment_type_mask.contains(FragmentTypeFlag::Dml)
167 {
168 let fragment_info = fragment_to_actor_mapping
169 .get_fragment(*fragment_id)
170 .ok_or_else(|| MetaError::fragment_not_found(*fragment_id))?;
171 tracker.expected_list_actors.extend(
172 fragment_info
173 .actors
174 .keys()
175 .map(|actor_id| *actor_id as ActorId),
176 );
177 }
178 if fragment
179 .fragment_type_mask
180 .contains(FragmentTypeFlag::FsFetch)
181 && let Some(fragment_info) =
182 fragment_to_actor_mapping.get_fragment(*fragment_id)
183 {
184 tracker.expected_fetch_actors.extend(
185 fragment_info
186 .actors
187 .keys()
188 .map(|actor_id| *actor_id as ActorId),
189 );
190 }
191 }
192
193 {
194 let mut lock_handle = REFRESH_TABLE_PROGRESS_TRACKER.lock();
196 lock_handle.inner.insert(table_id, tracker);
197 lock_handle
198 .table_id_by_database_id
199 .entry(database_id)
200 .or_default()
201 .insert(table_id);
202 }
203
204 Ok::<_, MetaError>(())
205 }?;
206
207 let refresh_command = Command::Refresh {
209 table_id,
210 associated_source_id,
211 };
212
213 match self
215 .barrier_scheduler
216 .run_command(database_id, refresh_command)
217 .await
218 {
219 Ok(_) => {
220 tracing::info!(
221 table_id = %table_id,
222 "Refresh command completed successfully"
223 );
224
225 Ok(RefreshResponse { status: None })
226 }
227 Err(e) => {
228 tracing::error!(
229 error = %e.as_report(),
230 table_id = %table_id,
231 "Failed to execute refresh command, resetting refresh state to Idle"
232 );
233
234 self.metadata_manager
235 .catalog_controller
236 .set_table_refresh_state(table_id, RefreshState::Idle)
237 .await?;
238
239 {
240 let mut lock_handle = REFRESH_TABLE_PROGRESS_TRACKER.lock();
241 lock_handle.inner.remove(&table_id);
242 if let Some(table_ids) =
243 lock_handle.table_id_by_database_id.get_mut(&database_id)
244 {
245 table_ids.remove(&table_id);
246 }
247 }
248
249 Err(anyhow!(e)
250 .context(format!("Failed to refresh table {}", table_id))
251 .into())
252 }
253 }
254 }
255
256 async fn validate_refreshable_table(
258 &self,
259 table_id: TableId,
260 associated_source_id: SourceId,
261 ) -> MetaResult<()> {
262 let table = self
264 .metadata_manager
265 .catalog_controller
266 .get_table_by_id(table_id)
267 .await?;
268
269 if !table.refreshable {
271 return Err(MetaError::invalid_parameter(format!(
272 "Table '{}' is not refreshable. Only tables created with REFRESHABLE flag support manual refresh.",
273 table.name
274 )));
275 }
276
277 if table.optional_associated_source_id != Some(associated_source_id.into()) {
278 return Err(MetaError::invalid_parameter(format!(
279 "Table '{}' is not associated with source '{}'. table.optional_associated_source_id: {:?}",
280 table.name, associated_source_id, table.optional_associated_source_id
281 )));
282 }
283
284 let current_state = self
285 .metadata_manager
286 .catalog_controller
287 .get_table_refresh_state(table_id)
288 .await?;
289 match current_state {
290 Some(RefreshState::Idle) | None => {
291 }
293 state @ (Some(RefreshState::Finishing) | Some(RefreshState::Refreshing)) => {
294 return Err(MetaError::invalid_parameter(format!(
295 "Table '{}' is currently in state {:?}. Cannot start a new refresh operation.",
296 table.name,
297 state.unwrap()
298 )));
299 }
300 }
301
302 tracing::debug!(
303 table_id = %table_id,
304 table_name = %table.name,
305 "Table validation passed for refresh operation"
306 );
307
308 Ok(())
309 }
310}
311
312#[derive(Default, Debug)]
313pub struct SingleTableRefreshProgressTracker {
314 pub expected_list_actors: HashSet<ActorId>,
315 pub expected_fetch_actors: HashSet<ActorId>,
316 pub list_finished_actors: HashSet<ActorId>,
317 pub fetch_finished_actors: HashSet<ActorId>,
318}
319
320impl SingleTableRefreshProgressTracker {
321 pub fn report_list_finished(&mut self, actor_ids: impl Iterator<Item = ActorId>) {
322 self.list_finished_actors.extend(actor_ids);
323 }
324
325 pub fn is_list_finished(&self) -> MetaResult<bool> {
326 if self.list_finished_actors.len() >= self.expected_list_actors.len() {
327 if self.expected_list_actors == self.list_finished_actors {
328 Ok(true)
329 } else {
330 Err(MetaError::from(anyhow!(
331 "list finished actors mismatch: expected: {:?}, actual: {:?}",
332 self.expected_list_actors,
333 self.list_finished_actors
334 )))
335 }
336 } else {
337 Ok(false)
338 }
339 }
340
341 pub fn report_load_finished(&mut self, actor_ids: impl Iterator<Item = ActorId>) {
342 self.fetch_finished_actors.extend(actor_ids);
343 }
344
345 pub fn is_load_finished(&self) -> MetaResult<bool> {
346 if self.fetch_finished_actors.len() >= self.expected_fetch_actors.len() {
347 if self.expected_fetch_actors == self.fetch_finished_actors {
348 Ok(true)
349 } else {
350 Err(MetaError::from(anyhow!(
351 "fetch finished actors mismatch: expected: {:?}, actual: {:?}",
352 self.expected_fetch_actors,
353 self.fetch_finished_actors
354 )))
355 }
356 } else {
357 Ok(false)
358 }
359 }
360}