risingwave_meta/barrier/context/
context_impl.rs1use std::sync::Arc;
16
17use anyhow::Context;
18use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
19use risingwave_meta_model::table::RefreshState;
20use risingwave_pb::common::WorkerNode;
21use risingwave_pb::hummock::HummockVersionStats;
22use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
23use risingwave_rpc_client::StreamingControlHandle;
24use thiserror_ext::AsReport;
25
26use crate::MetaResult;
27use crate::barrier::command::CommandContext;
28use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
29use crate::barrier::progress::TrackingJob;
30use crate::barrier::schedule::MarkReadyOptions;
31use crate::barrier::{
32    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
33    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
34    Scheduled,
35};
36use crate::hummock::CommitEpochInfo;
37use crate::model::FragmentDownstreamRelation;
38use crate::stream::{SourceChange, SplitState};
39
40impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
41    #[await_tree::instrument]
42    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
43        self.hummock_manager.commit_epoch(commit_info).await?;
44        Ok(self.hummock_manager.get_version_stats().await)
45    }
46
47    #[await_tree::instrument("next_scheduled_barrier")]
48    async fn next_scheduled(&self) -> Scheduled {
49        self.scheduled_barriers.next_scheduled().await
50    }
51
52    fn abort_and_mark_blocked(
53        &self,
54        database_id: Option<DatabaseId>,
55        recovery_reason: RecoveryReason,
56    ) {
57        if database_id.is_none() {
58            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
59        }
60
61        self.scheduled_barriers
63            .abort_and_mark_blocked(database_id, "cluster is under recovering");
64    }
65
66    fn mark_ready(&self, options: MarkReadyOptions) {
67        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
68        self.scheduled_barriers.mark_ready(options);
69        if is_global {
70            self.set_status(BarrierManagerStatus::Running);
71        }
72    }
73
74    #[await_tree::instrument("post_collect_command({command})")]
75    async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
76        command.post_collect(self).await
77    }
78
79    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
80        self.metadata_manager
81            .notify_finish_failed(database_id, err)
82            .await
83    }
84
85    #[await_tree::instrument("finish_creating_job({job})")]
86    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
87        job.finish(&self.metadata_manager, &self.source_manager)
88            .await
89    }
90
91    #[await_tree::instrument("finish_cdc_table_backfill({job})")]
92    async fn finish_cdc_table_backfill(&self, job: TableId) -> MetaResult<()> {
93        self.env.cdc_table_backfill_tracker.complete_job(job).await
94    }
95
96    #[await_tree::instrument("new_control_stream({})", node.id)]
97    async fn new_control_stream(
98        &self,
99        node: &WorkerNode,
100        init_request: &PbInitRequest,
101    ) -> MetaResult<StreamingControlHandle> {
102        self.new_control_stream_impl(node, init_request).await
103    }
104
105    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
106        self.reload_runtime_info_impl().await
107    }
108
109    async fn reload_database_runtime_info(
110        &self,
111        database_id: DatabaseId,
112    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
113        self.reload_database_runtime_info_impl(database_id).await
114    }
115
116    async fn handle_list_finished_source_ids(
117        &self,
118        list_finished_source_ids: Vec<u32>,
119    ) -> MetaResult<()> {
120        use risingwave_common::catalog::TableId;
121
122        tracing::info!(
123            "Handling list finished source IDs: {:?}",
124            list_finished_source_ids
125        );
126
127        use crate::barrier::Command;
128        for associated_source_id in list_finished_source_ids {
129            let res: MetaResult<()> = try {
130                tracing::info!(%associated_source_id, "Scheduling ListFinish command for refreshable batch source");
131
132                let associated_source_id = TableId::new(associated_source_id);
134                let table_id = self
136                    .metadata_manager
137                    .catalog_controller
138                    .get_table_by_associate_source_id(associated_source_id.table_id() as _)
139                    .await
140                    .context("Failed to get table id for source")?
141                    .id
142                    .into();
143
144                let database_id = self
146                    .metadata_manager
147                    .catalog_controller
148                    .get_object_database_id(associated_source_id.table_id() as _)
149                    .await
150                    .context("Failed to get database id for table")?;
151
152                let list_finish_command = Command::ListFinish {
154                    table_id,
155                    associated_source_id,
156                };
157
158                self.barrier_scheduler
160                    .run_command_no_wait(
161                        risingwave_common::catalog::DatabaseId::new(database_id as u32),
162                        list_finish_command,
163                    )
164                    .context("Failed to schedule ListFinish command")?;
165
166                tracing::info!(%associated_source_id, %table_id, "ListFinish command scheduled successfully");
167            };
168            if let Err(e) = res {
169                tracing::error!(error = %e.as_report(), %associated_source_id, "Failed to handle source list finished");
170            }
171        }
172        Ok(())
173    }
174
175    async fn handle_load_finished_source_ids(
176        &self,
177        load_finished_source_ids: Vec<u32>,
178    ) -> MetaResult<()> {
179        use risingwave_common::catalog::TableId;
180
181        tracing::info!(
182            "Handling load finished source IDs: {:?}",
183            load_finished_source_ids
184        );
185
186        use crate::barrier::Command;
187        for associated_source_id in load_finished_source_ids {
188            let res: MetaResult<()> = try {
189                tracing::info!(%associated_source_id, "Scheduling LoadFinish command for refreshable batch source");
190
191                let associated_source_id = TableId::new(associated_source_id);
193                let table_id = self
195                    .metadata_manager
196                    .catalog_controller
197                    .get_table_by_associate_source_id(associated_source_id.table_id() as _)
198                    .await
199                    .context("Failed to get table id for source")?
200                    .id
201                    .into();
202
203                let database_id = self
205                    .metadata_manager
206                    .catalog_controller
207                    .get_object_database_id(associated_source_id.table_id() as _)
208                    .await
209                    .context("Failed to get database id for table")?;
210
211                let load_finish_command = Command::LoadFinish {
213                    table_id,
214                    associated_source_id,
215                };
216
217                self.barrier_scheduler
219                    .run_command_no_wait(
220                        risingwave_common::catalog::DatabaseId::new(database_id as u32),
221                        load_finish_command,
222                    )
223                    .context("Failed to schedule LoadFinish command")?;
224
225                tracing::info!(%associated_source_id, %associated_source_id, "LoadFinish command scheduled successfully");
226            };
227            if let Err(e) = res {
228                tracing::error!(error = %e.as_report(), %associated_source_id, "Failed to handle source load finished");
229            }
230        }
231
232        Ok(())
233    }
234
235    async fn handle_refresh_finished_table_ids(
236        &self,
237        refresh_finished_table_ids: Vec<u32>,
238    ) -> MetaResult<()> {
239        use risingwave_meta_model::table::RefreshState;
240
241        tracing::info!(
242            "Handling refresh finished table IDs: {:?}",
243            refresh_finished_table_ids
244        );
245
246        for table_id in refresh_finished_table_ids {
247            let res: MetaResult<()> = try {
248                tracing::info!(%table_id, "Processing refresh finished for materialized view");
249
250                self.metadata_manager
252                    .catalog_controller
253                    .set_table_refresh_state(table_id as i32, RefreshState::Idle)
254                    .await
255                    .context("Failed to set table refresh state to Idle")?;
256
257                tracing::info!(%table_id, "Table refresh completed, state updated to Idle");
258            };
259            if let Err(e) = res {
260                tracing::error!(error = %e.as_report(), %table_id, "Failed to handle refresh finished table");
261            }
262        }
263
264        Ok(())
265    }
266}
267
268impl GlobalBarrierWorkerContextImpl {
269    fn set_status(&self, new_status: BarrierManagerStatus) {
270        self.status.store(Arc::new(new_status));
271    }
272}
273
impl CommandContext {
    /// Applies the meta-side effects of this command after all of its
    /// barriers have been collected: catalog updates, hummock table
    /// (un)registration, and source-manager notifications. Commands with no
    /// post-collect work are explicit no-op arms. Returns `Ok(())` when the
    /// context carries no command at all.
    pub async fn post_collect(
        &self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
        let Some(command) = &self.command else {
            return Ok(());
        };
        match command {
            Command::Flush => {}

            Command::Throttle(_) => {}

            Command::Pause => {}

            Command::Resume => {}

            // Persist the updated split assignment for the affected fragments.
            Command::SourceChangeSplit(SplitState {
                split_assignment: assignment,
                ..
            }) => {
                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(assignment)
                    .await?;
            }

            // First unregister the dropped state tables from hummock, then
            // finalize their removal in the catalog.
            Command::DropStreamingJobs {
                unregistered_state_table_ids,
                ..
            } => {
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
                    .await?;
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .complete_dropped_tables(
                        unregistered_state_table_ids
                            .iter()
                            .map(|id| id.table_id as _),
                    )
                    .await;
            }
            // Push the new connector properties to the source manager.
            Command::ConnectorPropsChange(obj_id_map_props) => {
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::UpdateSourceProps {
                        source_id_map_new_props: obj_id_map_props.clone(),
                    })
                    .await;
            }
            Command::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => {
                // Record backfill epochs in the catalog. The fragment filter
                // differs per job type: normal / sink-into-table jobs only
                // consider cross-DB snapshot backfill scans, while snapshot
                // backfill jobs also include same-DB snapshot backfill scans
                // and pass the snapshot backfill info along.
                match job_type {
                    CreateStreamingJobType::SinkIntoTable(_) | CreateStreamingJobType::Normal => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains(
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains_any([
                                            FragmentTypeFlag::SnapshotBackfillStreamScan,
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ]) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    ..
                } = info;
                // For sink-into-table, the sink fragment gains the table as a
                // new downstream; other job types add no extra downstream.
                let new_sink_downstream =
                    if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                        let new_downstreams = ctx.new_sink_downstream.clone();
                        let new_downstreams = FragmentDownstreamRelation::from([(
                            ctx.sink_fragment_id,
                            vec![new_downstreams],
                        )]);
                        Some(new_downstreams)
                    } else {
                        None
                    };

                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        stream_job_fragments.stream_job_id().table_id as _,
                        upstream_fragment_downstreams,
                        new_sink_downstream,
                        Some(&info.init_split_assignment),
                    )
                    .await?;

                // Register the job's source and source-backfill fragments with
                // the source manager after the catalog update succeeds.
                let source_change = SourceChange::CreateJob {
                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
                };

                barrier_manager_context
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
            }
            // Persist the per-fragment actor split assignments resulting from
            // the reschedule.
            Command::RescheduleFragment { reschedules, .. } => {
                let fragment_splits = reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (*fragment_id, reschedule.actor_splits.clone())
                    })
                    .collect();

                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(&fragment_splits)
                    .await?;
            }

            Command::ReplaceStreamJob(
                replace_plan @ ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    to_drop_state_table_ids,
                    auto_refresh_schema_sinks,
                    init_split_assignment,
                    ..
                },
            ) => {
                // Commit the replacement fragments to the catalog.
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id.table_id as _,
                        upstream_fragment_downstreams,
                        None,
                        Some(init_split_assignment),
                    )
                    .await?;

                // Auto-refresh-schema sinks use temporary sink jobs that must
                // also be post-collected (with no downstreams or splits).
                if let Some(sinks) = auto_refresh_schema_sinks {
                    for sink in sinks {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                sink.tmp_sink_id,
                                &Default::default(), None, None, )
                            .await?;
                    }
                }

                // Let the source manager swap old source fragments for new
                // ones, then drop the replaced state tables from hummock.
                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        replace_plan,
                    )
                    .await;
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
                    .await?;
            }

            Command::CreateSubscription {
                subscription_id, ..
            } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(*subscription_id)
                    .await?
            }
            Command::DropSubscription { .. } => {}
            Command::MergeSnapshotBackfillStreamingJobs(_) => {}
            Command::StartFragmentBackfill { .. } => {}
            // A refresh has started: mark the table as refreshing.
            Command::Refresh { table_id, .. } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .set_table_refresh_state(table_id.table_id() as i32, RefreshState::Refreshing)
                    .await?;
            }
            // Currently no post-collect work for ListFinish; the refresh state
            // transition happens on LoadFinish below.
            Command::ListFinish { .. } => {
                }
            // Loading is done: move the table's refresh state to Finishing.
            Command::LoadFinish { table_id, .. } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .set_table_refresh_state(table_id.table_id() as i32, RefreshState::Finishing)
                    .await?;
            }
        }

        Ok(())
    }
}