risingwave_meta/barrier/context/
context_impl.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::Context;
18use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
19use risingwave_pb::common::WorkerNode;
20use risingwave_pb::hummock::HummockVersionStats;
21use risingwave_pb::meta::object::PbObjectInfo;
22use risingwave_pb::meta::subscribe_response::{Operation, PbInfo};
23use risingwave_pb::meta::{PbObject, PbObjectGroup};
24use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
25use risingwave_rpc_client::StreamingControlHandle;
26use thiserror_ext::AsReport;
27
28use crate::MetaResult;
29use crate::barrier::command::CommandContext;
30use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
31use crate::barrier::progress::TrackingJob;
32use crate::barrier::schedule::MarkReadyOptions;
33use crate::barrier::{
34    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
35    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
36    Scheduled,
37};
38use crate::hummock::CommitEpochInfo;
39use crate::model::FragmentDownstreamRelation;
40use crate::stream::SourceChange;
41
42impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
43    #[await_tree::instrument]
44    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
45        self.hummock_manager.commit_epoch(commit_info).await?;
46        Ok(self.hummock_manager.get_version_stats().await)
47    }
48
49    #[await_tree::instrument("next_scheduled_barrier")]
50    async fn next_scheduled(&self) -> Scheduled {
51        self.scheduled_barriers.next_scheduled().await
52    }
53
54    fn abort_and_mark_blocked(
55        &self,
56        database_id: Option<DatabaseId>,
57        recovery_reason: RecoveryReason,
58    ) {
59        if database_id.is_none() {
60            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
61        }
62
63        // Mark blocked and abort buffered schedules, they might be dirty already.
64        self.scheduled_barriers
65            .abort_and_mark_blocked(database_id, "cluster is under recovering");
66    }
67
68    fn mark_ready(&self, options: MarkReadyOptions) {
69        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
70        self.scheduled_barriers.mark_ready(options);
71        if is_global {
72            self.set_status(BarrierManagerStatus::Running);
73        }
74    }
75
76    #[await_tree::instrument("post_collect_command({command})")]
77    async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
78        command.post_collect(self).await
79    }
80
81    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
82        self.metadata_manager
83            .notify_finish_failed(database_id, err)
84            .await
85    }
86
87    #[await_tree::instrument("finish_creating_job({job})")]
88    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
89        job.finish(&self.metadata_manager).await
90    }
91
92    #[await_tree::instrument("finish_cdc_table_backfill({job})")]
93    async fn finish_cdc_table_backfill(&self, job: TableId) -> MetaResult<()> {
94        self.env.cdc_table_backfill_tracker.complete_job(job).await
95    }
96
97    #[await_tree::instrument("new_control_stream({})", node.id)]
98    async fn new_control_stream(
99        &self,
100        node: &WorkerNode,
101        init_request: &PbInitRequest,
102    ) -> MetaResult<StreamingControlHandle> {
103        self.new_control_stream_impl(node, init_request).await
104    }
105
106    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
107        self.reload_runtime_info_impl().await
108    }
109
110    async fn reload_database_runtime_info(
111        &self,
112        database_id: DatabaseId,
113    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
114        self.reload_database_runtime_info_impl(database_id).await
115    }
116
117    async fn handle_load_finished_source_ids(
118        &self,
119        load_finished_source_ids: Vec<u32>,
120    ) -> MetaResult<()> {
121        use risingwave_common::catalog::TableId;
122
123        tracing::info!(
124            "Handling load finished source IDs: {:?}",
125            load_finished_source_ids
126        );
127
128        use crate::barrier::Command;
129        for associated_source_id in load_finished_source_ids {
130            let res: MetaResult<()> = try {
131                tracing::info!(%associated_source_id, "Scheduling LoadFinish command for refreshable batch source");
132
133                // For refreshable batch sources, associated_source_id is the table_id
134                let table_id = TableId::new(associated_source_id);
135                let associated_source_id = table_id;
136
137                // Find the database ID for this table
138                let database_id = self
139                    .metadata_manager
140                    .catalog_controller
141                    .get_object_database_id(table_id.table_id() as _)
142                    .await
143                    .context("Failed to get database id for table")?;
144
145                // Create LoadFinish command
146                let load_finish_command = Command::LoadFinish {
147                    table_id,
148                    associated_source_id,
149                };
150
151                // Schedule the command through the barrier system without waiting
152                self.barrier_scheduler
153                    .run_command_no_wait(
154                        risingwave_common::catalog::DatabaseId::new(database_id as u32),
155                        load_finish_command,
156                    )
157                    .context("Failed to schedule LoadFinish command")?;
158
159                tracing::info!(%table_id, %associated_source_id, "LoadFinish command scheduled successfully");
160            };
161            if let Err(e) = res {
162                tracing::error!(error = %e.as_report(),%associated_source_id, "Failed to handle source load finished");
163            }
164        }
165
166        Ok(())
167    }
168}
169
170impl GlobalBarrierWorkerContextImpl {
171    fn set_status(&self, new_status: BarrierManagerStatus) {
172        self.status.store(Arc::new(new_status));
173    }
174}
175
176impl CommandContext {
177    /// Do some stuffs after barriers are collected and the new storage version is committed, for
178    /// the given command.
179    pub async fn post_collect(
180        &self,
181        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
182    ) -> MetaResult<()> {
183        let Some(command) = &self.command else {
184            return Ok(());
185        };
186        match command {
187            Command::Flush => {}
188
189            Command::Throttle(_) => {}
190
191            Command::Pause => {}
192
193            Command::Resume => {}
194
195            Command::SourceChangeSplit(split_assignment) => {
196                barrier_manager_context
197                    .metadata_manager
198                    .update_actor_splits_by_split_assignment(split_assignment)
199                    .await?;
200                barrier_manager_context
201                    .source_manager
202                    .apply_source_change(SourceChange::SplitChange(split_assignment.clone()))
203                    .await;
204            }
205
206            Command::DropStreamingJobs {
207                unregistered_state_table_ids,
208                ..
209            } => {
210                barrier_manager_context
211                    .hummock_manager
212                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
213                    .await?;
214                let tables = barrier_manager_context
215                    .metadata_manager
216                    .catalog_controller
217                    .complete_dropped_tables(
218                        unregistered_state_table_ids
219                            .iter()
220                            .map(|id| id.table_id as _),
221                    )
222                    .await;
223                let objects = tables
224                    .into_iter()
225                    .map(|t| PbObject {
226                        object_info: Some(PbObjectInfo::Table(t)),
227                    })
228                    .collect();
229                let group = PbInfo::ObjectGroup(PbObjectGroup { objects });
230                barrier_manager_context
231                    .env
232                    .notification_manager()
233                    .notify_hummock(Operation::Delete, group.clone())
234                    .await;
235                barrier_manager_context
236                    .env
237                    .notification_manager()
238                    .notify_compactor(Operation::Delete, group)
239                    .await;
240            }
241            Command::ConnectorPropsChange(obj_id_map_props) => {
242                // todo: we dont know the type of the object id, it can be a source or a sink. Should carry more info in the barrier command.
243                barrier_manager_context
244                    .source_manager
245                    .apply_source_change(SourceChange::UpdateSourceProps {
246                        source_id_map_new_props: obj_id_map_props.clone(),
247                    })
248                    .await;
249            }
250            Command::CreateStreamingJob {
251                info,
252                job_type,
253                cross_db_snapshot_backfill_info,
254            } => {
255                match job_type {
256                    CreateStreamingJobType::SinkIntoTable(_) | CreateStreamingJobType::Normal => {
257                        barrier_manager_context
258                            .metadata_manager
259                            .catalog_controller
260                            .fill_snapshot_backfill_epoch(
261                                info.stream_job_fragments.fragments.iter().filter_map(
262                                    |(fragment_id, fragment)| {
263                                        if fragment.fragment_type_mask.contains(
264                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
265                                        ) {
266                                            Some(*fragment_id as _)
267                                        } else {
268                                            None
269                                        }
270                                    },
271                                ),
272                                None,
273                                cross_db_snapshot_backfill_info,
274                            )
275                            .await?
276                    }
277                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
278                        barrier_manager_context
279                            .metadata_manager
280                            .catalog_controller
281                            .fill_snapshot_backfill_epoch(
282                                info.stream_job_fragments.fragments.iter().filter_map(
283                                    |(fragment_id, fragment)| {
284                                        if fragment.fragment_type_mask.contains_any([
285                                            FragmentTypeFlag::SnapshotBackfillStreamScan,
286                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
287                                        ]) {
288                                            Some(*fragment_id as _)
289                                        } else {
290                                            None
291                                        }
292                                    },
293                                ),
294                                Some(snapshot_backfill_info),
295                                cross_db_snapshot_backfill_info,
296                            )
297                            .await?
298                    }
299                }
300
301                // Do `post_collect_job_fragments` of the original streaming job in the end, so that in any previous failure,
302                // we won't mark the job as `Creating`, and then the job will be later clean by the recovery triggered by the returned error.
303                let CreateStreamingJobCommandInfo {
304                    stream_job_fragments,
305                    upstream_fragment_downstreams,
306                    init_split_assignment,
307                    ..
308                } = info;
309                let new_sink_downstream =
310                    if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
311                        let new_downstreams = ctx.new_sink_downstream.clone();
312                        let new_downstreams = FragmentDownstreamRelation::from([(
313                            ctx.sink_fragment_id,
314                            vec![new_downstreams],
315                        )]);
316                        Some(new_downstreams)
317                    } else {
318                        None
319                    };
320
321                barrier_manager_context
322                    .metadata_manager
323                    .catalog_controller
324                    .post_collect_job_fragments(
325                        stream_job_fragments.stream_job_id().table_id as _,
326                        stream_job_fragments.actor_ids(),
327                        upstream_fragment_downstreams,
328                        init_split_assignment,
329                        new_sink_downstream,
330                    )
331                    .await?;
332
333                let source_change = SourceChange::CreateJob {
334                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
335                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
336                    split_assignment: init_split_assignment.clone(),
337                };
338
339                barrier_manager_context
340                    .source_manager
341                    .apply_source_change(source_change)
342                    .await;
343            }
344            Command::RescheduleFragment {
345                reschedules,
346                post_updates,
347                ..
348            } => {
349                barrier_manager_context
350                    .scale_controller
351                    .post_apply_reschedule(reschedules, post_updates)
352                    .await?;
353            }
354
355            Command::ReplaceStreamJob(
356                replace_plan @ ReplaceStreamJobPlan {
357                    old_fragments,
358                    new_fragments,
359                    upstream_fragment_downstreams,
360                    init_split_assignment,
361                    to_drop_state_table_ids,
362                    auto_refresh_schema_sinks,
363                    ..
364                },
365            ) => {
366                // Update actors and actor_dispatchers for new table fragments.
367                barrier_manager_context
368                    .metadata_manager
369                    .catalog_controller
370                    .post_collect_job_fragments(
371                        new_fragments.stream_job_id.table_id as _,
372                        new_fragments.actor_ids(),
373                        upstream_fragment_downstreams,
374                        init_split_assignment,
375                        None,
376                    )
377                    .await?;
378
379                if let Some(sinks) = auto_refresh_schema_sinks {
380                    for sink in sinks {
381                        barrier_manager_context
382                            .metadata_manager
383                            .catalog_controller
384                            .post_collect_job_fragments(
385                                sink.tmp_sink_id,
386                                sink.actor_status.keys().cloned().collect(),
387                                &Default::default(), // upstream_fragment_downstreams is already inserted in the job of upstream table
388                                &Default::default(), // no split assignment
389                                None, // no replace plan
390                            )
391                            .await?;
392                    }
393                }
394
395                // Apply the split changes in source manager.
396                barrier_manager_context
397                    .source_manager
398                    .handle_replace_job(
399                        old_fragments,
400                        new_fragments.stream_source_fragments(),
401                        init_split_assignment.clone(),
402                        replace_plan,
403                    )
404                    .await;
405                barrier_manager_context
406                    .hummock_manager
407                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
408                    .await?;
409            }
410
411            Command::CreateSubscription {
412                subscription_id, ..
413            } => {
414                barrier_manager_context
415                    .metadata_manager
416                    .catalog_controller
417                    .finish_create_subscription_catalog(*subscription_id)
418                    .await?
419            }
420            Command::DropSubscription { .. } => {}
421            Command::MergeSnapshotBackfillStreamingJobs(_) => {}
422            Command::StartFragmentBackfill { .. } => {}
423            Command::Refresh { .. } => {}
424            Command::LoadFinish { .. } => {}
425        }
426
427        Ok(())
428    }
429}