1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use anyhow::anyhow;
20use chrono::{DateTime, Duration as ChronoDuration, Utc};
21use parking_lot::Mutex;
22use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
23use risingwave_common::util::epoch::Epoch;
24use risingwave_meta_model::ActorId;
25use risingwave_meta_model::refresh_job::{self, RefreshState};
26use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
27use risingwave_pb::id::SourceId;
28use risingwave_pb::meta::{RefreshRequest, RefreshResponse};
29use thiserror_ext::AsReport;
30use tokio::sync::{Notify, oneshot};
31use tokio::task::JoinHandle;
32
33use crate::barrier::{BarrierScheduler, Command, SharedActorInfos};
34use crate::manager::{MetaSrvEnv, MetadataManager};
35use crate::rpc::metrics::GLOBAL_META_METRICS;
36use crate::{MetaError, MetaResult};
37
/// Shared, reference-counted handle to a [`GlobalRefreshManager`].
pub type GlobalRefreshManagerRef = Arc<GlobalRefreshManager>;
39
/// Coordinates table refresh jobs: validates and launches manual refreshes,
/// periodically checks persisted refresh jobs for scheduled runs, and tracks
/// per-table refresh progress in memory.
pub struct GlobalRefreshManager {
    /// Used for refresh-job persistence (list / ensure / update status) and catalog lookups.
    metadata_manager: MetadataManager,
    /// Used to submit `Command::Refresh` for a database.
    barrier_scheduler: BarrierScheduler,
    /// Actor info snapshot source; passed to `execute_refresh` for scheduled refreshes.
    shared_actor_infos: SharedActorInfos,
    /// In-memory progress trackers for in-flight refreshes, guarded by a mutex.
    progress_trackers: Mutex<GlobalRefreshTableProgressTracker>,
    /// Wakes the scheduler loop outside of its regular interval (see `notify_scheduler`).
    scheduler_notify: Notify,
    /// Period of the scheduler loop's timer.
    scheduler_interval: Duration,
}
48
49impl GlobalRefreshManager {
50 pub async fn start(
51 metadata_manager: MetadataManager,
52 barrier_scheduler: BarrierScheduler,
53 env: &MetaSrvEnv,
54 scheduler_interval: Duration,
55 ) -> MetaResult<(GlobalRefreshManagerRef, JoinHandle<()>, oneshot::Sender<()>)> {
56 let shared_actor_infos = env.shared_actor_infos().clone();
57 let manager = Arc::new(Self {
58 metadata_manager: metadata_manager.clone(),
59 barrier_scheduler,
60 shared_actor_infos,
61 progress_trackers: Mutex::new(GlobalRefreshTableProgressTracker::default()),
62 scheduler_notify: Notify::new(),
63 scheduler_interval,
64 });
65
66 manager
67 .metadata_manager
68 .reset_all_refresh_jobs_to_idle()
69 .await?;
70 manager.sync_refreshable_jobs().await?;
71
72 let (shutdown_tx, shutdown_rx) = oneshot::channel();
73 let join_handle = Self::spawn_scheduler(manager.clone(), shutdown_rx);
74
75 Ok((manager, join_handle, shutdown_tx))
76 }
77
78 fn spawn_scheduler(
79 manager: GlobalRefreshManagerRef,
80 mut shutdown_rx: oneshot::Receiver<()>,
81 ) -> JoinHandle<()> {
82 let scheduler_interval = manager.scheduler_interval;
83 tokio::spawn(async move {
84 let mut interval = tokio::time::interval(scheduler_interval);
85 loop {
86 tokio::select! {
87 _ = interval.tick() => {
88 if let Err(err) = manager.handle_scheduler_tick().await {
89 tracing::warn!(error = %err.as_report(), "refresh scheduler tick failed");
90 }
91 }
92 _ = manager.scheduler_notify.notified() => {
93 if let Err(err) = manager.handle_scheduler_tick().await {
94 tracing::warn!(error = %err.as_report(), "refresh scheduler tick failed");
95 }
96 }
97 _ = &mut shutdown_rx => {
98 tracing::info!("refresh scheduler shutting down");
99 break;
100 }
101 }
102 }
103 })
104 }
105
106 pub async fn trigger_manual_refresh(
107 self: &Arc<Self>,
108 request: RefreshRequest,
109 shared_actor_infos: &SharedActorInfos,
110 ) -> MetaResult<RefreshResponse> {
111 let table_id = request.table_id;
112 let associated_source_id = request.associated_source_id;
113 tracing::info!(%table_id, %associated_source_id, "trigger manual refresh");
114
115 self.ensure_refreshable(table_id, associated_source_id)
116 .await?;
117
118 let result = self
119 .execute_refresh(table_id, associated_source_id, shared_actor_infos)
120 .await;
121
122 match result {
123 Ok(_) => Ok(RefreshResponse { status: None }),
124 Err(err) => Err(err),
125 }
126 }
127
128 pub async fn mark_refresh_complete(&self, table_id: TableId) -> MetaResult<()> {
129 self.metadata_manager
130 .update_refresh_job_status(table_id, RefreshState::Idle, None, true)
131 .await?;
132 self.remove_progress_tracker(table_id, "success");
133 tracing::info!(%table_id, "Table refresh completed, state updated to Idle");
134 Ok(())
135 }
136
137 pub fn mark_list_stage_finished(
138 &self,
139 table_id: TableId,
140 actors: &HashSet<ActorId>,
141 ) -> MetaResult<bool> {
142 let mut guard = self.progress_trackers.lock();
143 let tracker = guard.inner.get_mut(&table_id).ok_or_else(|| {
144 MetaError::from(anyhow!("Table tracker not found for table {}", table_id))
145 })?;
146 tracker.report_list_finished(actors.iter().copied());
147 tracker.is_list_finished()
148 }
149
150 pub fn mark_load_stage_finished(
151 &self,
152 table_id: TableId,
153 actors: &HashSet<ActorId>,
154 ) -> MetaResult<bool> {
155 let mut guard = self.progress_trackers.lock();
156 let tracker = guard.inner.get_mut(&table_id).ok_or_else(|| {
157 MetaError::from(anyhow!("Table tracker not found for table {}", table_id))
158 })?;
159 tracker.report_load_finished(actors.iter().copied());
160 tracker.is_load_finished()
161 }
162
163 pub fn remove_trackers_by_database(&self, database_id: DatabaseId) {
164 let mut guard = self.progress_trackers.lock();
165 guard.remove_tracker_by_database_id(database_id);
166 }
167
168 pub fn notify_scheduler(&self) {
169 self.scheduler_notify.notify_one();
170 }
171
172 async fn handle_scheduler_tick(self: &Arc<Self>) -> MetaResult<()> {
173 let jobs = self.metadata_manager.list_refresh_jobs().await?;
174 for job in jobs {
175 if let Err(err) = self.try_trigger_scheduled_refresh(&job).await {
176 tracing::warn!(
177 table_id = %job.table_id,
178 error = %err.as_report(),
179 "failed to trigger scheduled refresh"
180 );
181 }
182 }
183 Ok(())
184 }
185
186 async fn sync_refreshable_jobs(&self) -> MetaResult<()> {
187 let table_ids = self.metadata_manager.list_refreshable_table_ids().await?;
188 for table_id in table_ids {
189 self.metadata_manager.ensure_refresh_job(table_id).await?;
190 }
191 Ok(())
192 }
193
194 async fn try_trigger_scheduled_refresh(
195 self: &Arc<Self>,
196 job: &refresh_job::Model,
197 ) -> MetaResult<()> {
198 if job.current_status != RefreshState::Idle {
199 GLOBAL_META_METRICS
200 .refresh_cron_job_miss_cnt
201 .with_guarded_label_values(&[&job.table_id.to_string()])
202 .inc();
203 tracing::warn!(table_id = %job.table_id, "skip scheduled refresh: current status is not idle: {:?}", job.current_status);
204 return Ok(());
205 }
206 let Some(interval_secs) = job.trigger_interval_secs else {
207 return Ok(());
208 };
209 if interval_secs <= 0 {
210 return Ok(());
211 }
212
213 let interval = ChronoDuration::seconds(interval_secs);
214 let last_run = if let Some(last_run) = job.last_trigger_time {
215 last_run
216 } else {
217 self.metadata_manager
218 .get_table_catalog_by_ids(&[job.table_id])
219 .await?
220 .first()
221 .map(|t| {
222 Epoch(t.created_at_epoch())
223 .as_timestamptz()
224 .to_datetime_utc()
225 .timestamp_millis()
226 })
227 .unwrap()
228 };
229 let now = Utc::now().naive_utc();
230 if now.signed_duration_since(
231 DateTime::from_timestamp_millis(last_run)
232 .unwrap()
233 .naive_utc(),
234 ) < interval
235 {
236 return Ok(());
237 }
238
239 let table = self
240 .metadata_manager
241 .catalog_controller
242 .get_table_by_id(job.table_id)
243 .await?;
244 if !table.refreshable {
245 return Ok(());
246 }
247
248 let Some(OptionalAssociatedSourceId::AssociatedSourceId(src_id)) =
249 table.optional_associated_source_id
250 else {
251 tracing::warn!(
252 table_id = %job.table_id,
253 "skip scheduled refresh: missing associated source id"
254 );
255 return Ok(());
256 };
257 let associated_source_id = SourceId::new(src_id);
258
259 let table_id_str = job.table_id.to_string();
261 GLOBAL_META_METRICS
262 .refresh_cron_job_trigger_cnt
263 .with_guarded_label_values(&[&table_id_str])
264 .inc();
265 tracing::info!(table_id = %job.table_id, "trigger scheduled refresh at interval {:?}", interval);
266
267 self.ensure_refreshable(job.table_id, associated_source_id)
268 .await?;
269 self.execute_refresh(job.table_id, associated_source_id, &self.shared_actor_infos)
270 .await?;
271 Ok(())
272 }
273
274 async fn execute_refresh(
275 self: &Arc<Self>,
276 table_id: TableId,
277 associated_source_id: SourceId,
278 shared_actor_infos: &SharedActorInfos,
279 ) -> MetaResult<()> {
280 let trigger_time = Utc::now().naive_utc();
281 let database_id = self
282 .metadata_manager
283 .catalog_controller
284 .get_object_database_id(table_id)
285 .await?;
286
287 let job_fragments = self
288 .metadata_manager
289 .get_job_fragments_by_id(table_id.as_job_id())
290 .await?;
291
292 let mut tracker = SingleTableRefreshProgressTracker::new();
293 {
294 let fragment_info_guard = shared_actor_infos.read_guard();
295 for (fragment_id, fragment) in &job_fragments.fragments {
296 if fragment
297 .fragment_type_mask
298 .contains(FragmentTypeFlag::Source)
299 && !fragment.fragment_type_mask.contains(FragmentTypeFlag::Dml)
300 {
301 let fragment_info = fragment_info_guard
302 .get_fragment(*fragment_id)
303 .ok_or_else(|| MetaError::fragment_not_found(*fragment_id))?;
304 tracker.expected_list_actors.extend(
305 fragment_info
306 .actors
307 .keys()
308 .map(|actor_id| *actor_id as ActorId),
309 );
310 }
311
312 if fragment
313 .fragment_type_mask
314 .contains(FragmentTypeFlag::FsFetch)
315 && let Some(fragment_info) = fragment_info_guard.get_fragment(*fragment_id)
316 {
317 tracker.expected_fetch_actors.extend(
318 fragment_info
319 .actors
320 .keys()
321 .map(|actor_id| *actor_id as ActorId),
322 );
323 }
324 }
325 }
326
327 self.register_progress_tracker(table_id, database_id, tracker);
328
329 self.metadata_manager
330 .update_refresh_job_status(
331 table_id,
332 RefreshState::Refreshing,
333 Some(trigger_time),
334 false,
335 )
336 .await?;
337
338 let refresh_command = Command::Refresh {
339 table_id,
340 associated_source_id,
341 };
342
343 let result = self
344 .barrier_scheduler
345 .run_command(database_id, refresh_command)
346 .await;
347
348 match result {
349 Ok(_) => {
350 tracing::info!(table_id = %table_id, "refresh command scheduled");
351 Ok(())
352 }
353 Err(err) => {
354 tracing::error!(
355 error = %err.as_report(),
356 table_id = %table_id,
357 "failed to execute refresh command"
358 );
359 self.metadata_manager
360 .update_refresh_job_status(table_id, RefreshState::Idle, None, false)
361 .await?;
362 self.remove_progress_tracker(table_id, "failure");
363 Err(anyhow!(err)
364 .context(format!("Failed to refresh table {}", table_id))
365 .into())
366 }
367 }
368 }
369
370 async fn ensure_refreshable(
371 &self,
372 table_id: TableId,
373 associated_source_id: SourceId,
374 ) -> MetaResult<()> {
375 let table = self
376 .metadata_manager
377 .catalog_controller
378 .get_table_by_id(table_id)
379 .await?;
380
381 if !table.refreshable {
382 return Err(MetaError::invalid_parameter(format!(
383 "Table '{}' is not refreshable. Only tables created with REFRESHABLE flag support refresh.",
384 table.name
385 )));
386 }
387
388 if table.optional_associated_source_id != Some(associated_source_id.into()) {
389 return Err(MetaError::invalid_parameter(format!(
390 "Table '{}' is not associated with source '{}'. table.optional_associated_source_id: {:?}",
391 table.name, associated_source_id, table.optional_associated_source_id
392 )));
393 }
394
395 let refresh_job_state = self
396 .metadata_manager
397 .catalog_controller
398 .get_refresh_job_state_by_table_id(table_id)
399 .await?;
400 if refresh_job_state != RefreshState::Idle {
401 return Err(MetaError::invalid_parameter(format!(
402 "Table '{}' is not in idle state. Current state: {:?}",
403 table.name, refresh_job_state
404 )));
405 }
406
407 Ok(())
408 }
409
410 fn register_progress_tracker(
411 &self,
412 table_id: TableId,
413 database_id: DatabaseId,
414 tracker: SingleTableRefreshProgressTracker,
415 ) {
416 let mut guard = self.progress_trackers.lock();
417 guard.inner.insert(table_id, tracker);
418 guard
419 .table_id_by_database_id
420 .entry(database_id)
421 .or_default()
422 .insert(table_id);
423 }
424
425 pub fn remove_progress_tracker(&self, table_id: TableId, status: &str) {
426 let mut guard = self.progress_trackers.lock();
427 if let Some(entry) = guard.inner.remove(&table_id) {
428 let status = status.to_owned();
429 GLOBAL_META_METRICS
430 .refresh_job_duration
431 .with_guarded_label_values(&[&table_id.to_string(), &status])
432 .set(entry.start_time.elapsed().as_secs());
433 GLOBAL_META_METRICS
434 .refresh_job_finish_cnt
435 .with_guarded_label_values(&[&table_id.to_string(), &status])
436 .inc();
437 }
438 guard.table_id_by_database_id.values_mut().for_each(|set| {
439 set.remove(&table_id);
440 });
441 }
442}
443
/// In-memory registry of refresh progress trackers for all tables.
#[derive(Default, Debug)]
pub struct GlobalRefreshTableProgressTracker {
    /// Progress tracker per table with a refresh in flight.
    pub inner: HashMap<TableId, SingleTableRefreshProgressTracker>,
    /// Index from database to the tables registered under it, used to drop
    /// all trackers of a database at once.
    pub table_id_by_database_id: HashMap<DatabaseId, HashSet<TableId>>,
}
449
450impl GlobalRefreshTableProgressTracker {
451 pub fn remove_tracker_by_database_id(&mut self, database_id: DatabaseId) {
452 if let Some(table_ids) = self.table_id_by_database_id.remove(&database_id) {
453 for table_id in table_ids {
454 self.inner.remove(&table_id);
455 }
456 }
457 }
458}
459
/// Progress of a single table's refresh, tracked per actor for the list and
/// fetch stages.
#[derive(Debug)]
pub struct SingleTableRefreshProgressTracker {
    /// Actors that must report the list stage as finished.
    pub expected_list_actors: HashSet<ActorId>,
    /// Actors that must report the fetch (load) stage as finished.
    pub expected_fetch_actors: HashSet<ActorId>,
    /// Actors that have reported the list stage as finished so far.
    pub list_finished_actors: HashSet<ActorId>,
    /// Actors that have reported the fetch (load) stage as finished so far.
    pub fetch_finished_actors: HashSet<ActorId>,

    /// When this tracker was created; used to compute the refresh duration.
    pub start_time: Instant,
}
469
470impl SingleTableRefreshProgressTracker {
471 pub fn new() -> Self {
472 Self {
473 expected_list_actors: HashSet::new(),
474 expected_fetch_actors: HashSet::new(),
475 list_finished_actors: HashSet::new(),
476 fetch_finished_actors: HashSet::new(),
477 start_time: Instant::now(),
478 }
479 }
480
481 pub fn report_list_finished(&mut self, actor_ids: impl Iterator<Item = ActorId>) {
482 self.list_finished_actors.extend(actor_ids);
483 }
484
485 pub fn is_list_finished(&self) -> MetaResult<bool> {
486 if self.list_finished_actors.len() >= self.expected_list_actors.len() {
487 if self.expected_list_actors == self.list_finished_actors {
488 Ok(true)
489 } else {
490 Err(MetaError::from(anyhow!(
491 "list finished actors mismatch: expected: {:?}, actual: {:?}",
492 self.expected_list_actors,
493 self.list_finished_actors
494 )))
495 }
496 } else {
497 Ok(false)
498 }
499 }
500
501 pub fn report_load_finished(&mut self, actor_ids: impl Iterator<Item = ActorId>) {
502 self.fetch_finished_actors.extend(actor_ids);
503 }
504
505 pub fn is_load_finished(&self) -> MetaResult<bool> {
506 if self.fetch_finished_actors.len() >= self.expected_fetch_actors.len() {
507 if self.expected_fetch_actors == self.fetch_finished_actors {
508 Ok(true)
509 } else {
510 Err(MetaError::from(anyhow!(
511 "fetch finished actors mismatch: expected: {:?}, actual: {:?}",
512 self.expected_fetch_actors,
513 self.fetch_finished_actors
514 )))
515 }
516 } else {
517 Ok(false)
518 }
519 }
520}