// risingwave_meta/stream/refresh_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use anyhow::anyhow;
20use chrono::{DateTime, Duration as ChronoDuration, Utc};
21use parking_lot::Mutex;
22use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
23use risingwave_common::util::epoch::Epoch;
24use risingwave_meta_model::ActorId;
25use risingwave_meta_model::refresh_job::{self, RefreshState};
26use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
27use risingwave_pb::id::SourceId;
28use risingwave_pb::meta::{RefreshRequest, RefreshResponse};
29use thiserror_ext::AsReport;
30use tokio::sync::{Notify, oneshot};
31use tokio::task::JoinHandle;
32
33use crate::barrier::{BarrierScheduler, Command, SharedActorInfos};
34use crate::manager::{MetaSrvEnv, MetadataManager};
35use crate::rpc::metrics::GLOBAL_META_METRICS;
36use crate::{MetaError, MetaResult};
37
38pub type GlobalRefreshManagerRef = Arc<GlobalRefreshManager>;
39
40pub struct GlobalRefreshManager {
41    metadata_manager: MetadataManager,
42    barrier_scheduler: BarrierScheduler,
43    shared_actor_infos: SharedActorInfos,
44    progress_trackers: Mutex<GlobalRefreshTableProgressTracker>,
45    scheduler_notify: Notify,
46    scheduler_interval: Duration,
47}
48
49impl GlobalRefreshManager {
50    pub async fn start(
51        metadata_manager: MetadataManager,
52        barrier_scheduler: BarrierScheduler,
53        env: &MetaSrvEnv,
54        scheduler_interval: Duration,
55    ) -> MetaResult<(GlobalRefreshManagerRef, JoinHandle<()>, oneshot::Sender<()>)> {
56        let shared_actor_infos = env.shared_actor_infos().clone();
57        let manager = Arc::new(Self {
58            metadata_manager: metadata_manager.clone(),
59            barrier_scheduler,
60            shared_actor_infos,
61            progress_trackers: Mutex::new(GlobalRefreshTableProgressTracker::default()),
62            scheduler_notify: Notify::new(),
63            scheduler_interval,
64        });
65
66        manager
67            .metadata_manager
68            .reset_all_refresh_jobs_to_idle()
69            .await?;
70        manager.sync_refreshable_jobs().await?;
71
72        let (shutdown_tx, shutdown_rx) = oneshot::channel();
73        let join_handle = Self::spawn_scheduler(manager.clone(), shutdown_rx);
74
75        Ok((manager, join_handle, shutdown_tx))
76    }
77
78    fn spawn_scheduler(
79        manager: GlobalRefreshManagerRef,
80        mut shutdown_rx: oneshot::Receiver<()>,
81    ) -> JoinHandle<()> {
82        let scheduler_interval = manager.scheduler_interval;
83        tokio::spawn(async move {
84            let mut interval = tokio::time::interval(scheduler_interval);
85            loop {
86                tokio::select! {
87                    _ = interval.tick() => {
88                        if let Err(err) = manager.handle_scheduler_tick().await {
89                            tracing::warn!(error = %err.as_report(), "refresh scheduler tick failed");
90                        }
91                    }
92                    _ = manager.scheduler_notify.notified() => {
93                        if let Err(err) = manager.handle_scheduler_tick().await {
94                            tracing::warn!(error = %err.as_report(), "refresh scheduler tick failed");
95                        }
96                    }
97                    _ = &mut shutdown_rx => {
98                        tracing::info!("refresh scheduler shutting down");
99                        break;
100                    }
101                }
102            }
103        })
104    }
105
106    pub async fn trigger_manual_refresh(
107        self: &Arc<Self>,
108        request: RefreshRequest,
109        shared_actor_infos: &SharedActorInfos,
110    ) -> MetaResult<RefreshResponse> {
111        let table_id = request.table_id;
112        let associated_source_id = request.associated_source_id;
113        tracing::info!(%table_id, %associated_source_id, "trigger manual refresh");
114
115        self.ensure_refreshable(table_id, associated_source_id)
116            .await?;
117
118        let result = self
119            .execute_refresh(table_id, associated_source_id, shared_actor_infos)
120            .await;
121
122        match result {
123            Ok(_) => Ok(RefreshResponse { status: None }),
124            Err(err) => Err(err),
125        }
126    }
127
128    pub async fn mark_refresh_complete(&self, table_id: TableId) -> MetaResult<()> {
129        self.metadata_manager
130            .update_refresh_job_status(table_id, RefreshState::Idle, None, true)
131            .await?;
132        self.remove_progress_tracker(table_id, "success");
133        tracing::info!(%table_id, "Table refresh completed, state updated to Idle");
134        Ok(())
135    }
136
137    pub fn mark_list_stage_finished(
138        &self,
139        table_id: TableId,
140        actors: &HashSet<ActorId>,
141    ) -> MetaResult<bool> {
142        let mut guard = self.progress_trackers.lock();
143        let tracker = guard.inner.get_mut(&table_id).ok_or_else(|| {
144            MetaError::from(anyhow!("Table tracker not found for table {}", table_id))
145        })?;
146        tracker.report_list_finished(actors.iter().copied());
147        tracker.is_list_finished()
148    }
149
150    pub fn mark_load_stage_finished(
151        &self,
152        table_id: TableId,
153        actors: &HashSet<ActorId>,
154    ) -> MetaResult<bool> {
155        let mut guard = self.progress_trackers.lock();
156        let tracker = guard.inner.get_mut(&table_id).ok_or_else(|| {
157            MetaError::from(anyhow!("Table tracker not found for table {}", table_id))
158        })?;
159        tracker.report_load_finished(actors.iter().copied());
160        tracker.is_load_finished()
161    }
162
163    pub fn remove_trackers_by_database(&self, database_id: DatabaseId) {
164        let mut guard = self.progress_trackers.lock();
165        guard.remove_tracker_by_database_id(database_id);
166    }
167
168    pub fn notify_scheduler(&self) {
169        self.scheduler_notify.notify_one();
170    }
171
172    async fn handle_scheduler_tick(self: &Arc<Self>) -> MetaResult<()> {
173        let jobs = self.metadata_manager.list_refresh_jobs().await?;
174        for job in jobs {
175            if let Err(err) = self.try_trigger_scheduled_refresh(&job).await {
176                tracing::warn!(
177                    table_id = %job.table_id,
178                    error = %err.as_report(),
179                    "failed to trigger scheduled refresh"
180                );
181            }
182        }
183        Ok(())
184    }
185
186    async fn sync_refreshable_jobs(&self) -> MetaResult<()> {
187        let table_ids = self.metadata_manager.list_refreshable_table_ids().await?;
188        for table_id in table_ids {
189            self.metadata_manager.ensure_refresh_job(table_id).await?;
190        }
191        Ok(())
192    }
193
194    async fn try_trigger_scheduled_refresh(
195        self: &Arc<Self>,
196        job: &refresh_job::Model,
197    ) -> MetaResult<()> {
198        if job.current_status != RefreshState::Idle {
199            GLOBAL_META_METRICS
200                .refresh_cron_job_miss_cnt
201                .with_guarded_label_values(&[&job.table_id.to_string()])
202                .inc();
203            tracing::warn!(table_id = %job.table_id, "skip scheduled refresh: current status is not idle: {:?}", job.current_status);
204            return Ok(());
205        }
206        let Some(interval_secs) = job.trigger_interval_secs else {
207            return Ok(());
208        };
209        if interval_secs <= 0 {
210            return Ok(());
211        }
212
213        let interval = ChronoDuration::seconds(interval_secs);
214        let last_run = if let Some(last_run) = job.last_trigger_time {
215            last_run
216        } else {
217            self.metadata_manager
218                .get_table_catalog_by_ids(&[job.table_id])
219                .await?
220                .first()
221                .map(|t| {
222                    Epoch(t.created_at_epoch())
223                        .as_timestamptz()
224                        .to_datetime_utc()
225                        .timestamp_millis()
226                })
227                .unwrap()
228        };
229        let now = Utc::now().naive_utc();
230        if now.signed_duration_since(
231            DateTime::from_timestamp_millis(last_run)
232                .unwrap()
233                .naive_utc(),
234        ) < interval
235        {
236            return Ok(());
237        }
238
239        let table = self
240            .metadata_manager
241            .catalog_controller
242            .get_table_by_id(job.table_id)
243            .await?;
244        if !table.refreshable {
245            return Ok(());
246        }
247
248        let Some(OptionalAssociatedSourceId::AssociatedSourceId(src_id)) =
249            table.optional_associated_source_id
250        else {
251            tracing::warn!(
252                table_id = %job.table_id,
253                "skip scheduled refresh: missing associated source id"
254            );
255            return Ok(());
256        };
257        let associated_source_id = SourceId::new(src_id);
258
259        // Increment cron job trigger counter
260        let table_id_str = job.table_id.to_string();
261        GLOBAL_META_METRICS
262            .refresh_cron_job_trigger_cnt
263            .with_guarded_label_values(&[&table_id_str])
264            .inc();
265        tracing::info!(table_id = %job.table_id, "trigger scheduled refresh at interval {:?}", interval);
266
267        self.ensure_refreshable(job.table_id, associated_source_id)
268            .await?;
269        self.execute_refresh(job.table_id, associated_source_id, &self.shared_actor_infos)
270            .await?;
271        Ok(())
272    }
273
274    async fn execute_refresh(
275        self: &Arc<Self>,
276        table_id: TableId,
277        associated_source_id: SourceId,
278        shared_actor_infos: &SharedActorInfos,
279    ) -> MetaResult<()> {
280        let trigger_time = Utc::now().naive_utc();
281        let database_id = self
282            .metadata_manager
283            .catalog_controller
284            .get_object_database_id(table_id)
285            .await?;
286
287        let job_fragments = self
288            .metadata_manager
289            .get_job_fragments_by_id(table_id.as_job_id())
290            .await?;
291
292        let mut tracker = SingleTableRefreshProgressTracker::new();
293        {
294            let fragment_info_guard = shared_actor_infos.read_guard();
295            for (fragment_id, fragment) in &job_fragments.fragments {
296                if fragment
297                    .fragment_type_mask
298                    .contains(FragmentTypeFlag::Source)
299                    && !fragment.fragment_type_mask.contains(FragmentTypeFlag::Dml)
300                {
301                    let fragment_info = fragment_info_guard
302                        .get_fragment(*fragment_id)
303                        .ok_or_else(|| MetaError::fragment_not_found(*fragment_id))?;
304                    tracker.expected_list_actors.extend(
305                        fragment_info
306                            .actors
307                            .keys()
308                            .map(|actor_id| *actor_id as ActorId),
309                    );
310                }
311
312                if fragment
313                    .fragment_type_mask
314                    .contains(FragmentTypeFlag::FsFetch)
315                    && let Some(fragment_info) = fragment_info_guard.get_fragment(*fragment_id)
316                {
317                    tracker.expected_fetch_actors.extend(
318                        fragment_info
319                            .actors
320                            .keys()
321                            .map(|actor_id| *actor_id as ActorId),
322                    );
323                }
324            }
325        }
326
327        self.register_progress_tracker(table_id, database_id, tracker);
328
329        self.metadata_manager
330            .update_refresh_job_status(
331                table_id,
332                RefreshState::Refreshing,
333                Some(trigger_time),
334                false,
335            )
336            .await?;
337
338        let refresh_command = Command::Refresh {
339            table_id,
340            associated_source_id,
341        };
342
343        let result = self
344            .barrier_scheduler
345            .run_command(database_id, refresh_command)
346            .await;
347
348        match result {
349            Ok(_) => {
350                tracing::info!(table_id = %table_id, "refresh command scheduled");
351                Ok(())
352            }
353            Err(err) => {
354                tracing::error!(
355                    error = %err.as_report(),
356                    table_id = %table_id,
357                    "failed to execute refresh command"
358                );
359                self.metadata_manager
360                    .update_refresh_job_status(table_id, RefreshState::Idle, None, false)
361                    .await?;
362                self.remove_progress_tracker(table_id, "failure");
363                Err(anyhow!(err)
364                    .context(format!("Failed to refresh table {}", table_id))
365                    .into())
366            }
367        }
368    }
369
370    async fn ensure_refreshable(
371        &self,
372        table_id: TableId,
373        associated_source_id: SourceId,
374    ) -> MetaResult<()> {
375        let table = self
376            .metadata_manager
377            .catalog_controller
378            .get_table_by_id(table_id)
379            .await?;
380
381        if !table.refreshable {
382            return Err(MetaError::invalid_parameter(format!(
383                "Table '{}' is not refreshable. Only tables created with REFRESHABLE flag support refresh.",
384                table.name
385            )));
386        }
387
388        if table.optional_associated_source_id != Some(associated_source_id.into()) {
389            return Err(MetaError::invalid_parameter(format!(
390                "Table '{}' is not associated with source '{}'. table.optional_associated_source_id: {:?}",
391                table.name, associated_source_id, table.optional_associated_source_id
392            )));
393        }
394
395        let refresh_job_state = self
396            .metadata_manager
397            .catalog_controller
398            .get_refresh_job_state_by_table_id(table_id)
399            .await?;
400        if refresh_job_state != RefreshState::Idle {
401            return Err(MetaError::invalid_parameter(format!(
402                "Table '{}' is not in idle state. Current state: {:?}",
403                table.name, refresh_job_state
404            )));
405        }
406
407        Ok(())
408    }
409
410    fn register_progress_tracker(
411        &self,
412        table_id: TableId,
413        database_id: DatabaseId,
414        tracker: SingleTableRefreshProgressTracker,
415    ) {
416        let mut guard = self.progress_trackers.lock();
417        guard.inner.insert(table_id, tracker);
418        guard
419            .table_id_by_database_id
420            .entry(database_id)
421            .or_default()
422            .insert(table_id);
423    }
424
425    pub fn remove_progress_tracker(&self, table_id: TableId, status: &str) {
426        let mut guard = self.progress_trackers.lock();
427        if let Some(entry) = guard.inner.remove(&table_id) {
428            let status = status.to_owned();
429            GLOBAL_META_METRICS
430                .refresh_job_duration
431                .with_guarded_label_values(&[&table_id.to_string(), &status])
432                .set(entry.start_time.elapsed().as_secs());
433            GLOBAL_META_METRICS
434                .refresh_job_finish_cnt
435                .with_guarded_label_values(&[&table_id.to_string(), &status])
436                .inc();
437        }
438        guard.table_id_by_database_id.values_mut().for_each(|set| {
439            set.remove(&table_id);
440        });
441    }
442}
443
444#[derive(Default, Debug)]
445pub struct GlobalRefreshTableProgressTracker {
446    pub inner: HashMap<TableId, SingleTableRefreshProgressTracker>,
447    pub table_id_by_database_id: HashMap<DatabaseId, HashSet<TableId>>,
448}
449
450impl GlobalRefreshTableProgressTracker {
451    pub fn remove_tracker_by_database_id(&mut self, database_id: DatabaseId) {
452        if let Some(table_ids) = self.table_id_by_database_id.remove(&database_id) {
453            for table_id in table_ids {
454                self.inner.remove(&table_id);
455            }
456        }
457    }
458}
459
460#[derive(Debug)]
461pub struct SingleTableRefreshProgressTracker {
462    pub expected_list_actors: HashSet<ActorId>,
463    pub expected_fetch_actors: HashSet<ActorId>,
464    pub list_finished_actors: HashSet<ActorId>,
465    pub fetch_finished_actors: HashSet<ActorId>,
466
467    pub start_time: Instant,
468}
469
470impl SingleTableRefreshProgressTracker {
471    pub fn new() -> Self {
472        Self {
473            expected_list_actors: HashSet::new(),
474            expected_fetch_actors: HashSet::new(),
475            list_finished_actors: HashSet::new(),
476            fetch_finished_actors: HashSet::new(),
477            start_time: Instant::now(),
478        }
479    }
480
481    pub fn report_list_finished(&mut self, actor_ids: impl Iterator<Item = ActorId>) {
482        self.list_finished_actors.extend(actor_ids);
483    }
484
485    pub fn is_list_finished(&self) -> MetaResult<bool> {
486        if self.list_finished_actors.len() >= self.expected_list_actors.len() {
487            if self.expected_list_actors == self.list_finished_actors {
488                Ok(true)
489            } else {
490                Err(MetaError::from(anyhow!(
491                    "list finished actors mismatch: expected: {:?}, actual: {:?}",
492                    self.expected_list_actors,
493                    self.list_finished_actors
494                )))
495            }
496        } else {
497            Ok(false)
498        }
499    }
500
501    pub fn report_load_finished(&mut self, actor_ids: impl Iterator<Item = ActorId>) {
502        self.fetch_finished_actors.extend(actor_ids);
503    }
504
505    pub fn is_load_finished(&self) -> MetaResult<bool> {
506        if self.fetch_finished_actors.len() >= self.expected_fetch_actors.len() {
507            if self.expected_fetch_actors == self.fetch_finished_actors {
508                Ok(true)
509            } else {
510                Err(MetaError::from(anyhow!(
511                    "fetch finished actors mismatch: expected: {:?}, actual: {:?}",
512                    self.expected_fetch_actors,
513                    self.fetch_finished_actors
514                )))
515            }
516        } else {
517            Ok(false)
518        }
519    }
520}