risingwave_meta/barrier/context/context_impl.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::Context;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_meta_model::ActorId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::SourceId;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbListFinishedSource, PbLoadFinishedSource,
};
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;

use crate::MetaResult;
use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::command::CommandContext;
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::progress::TrackingJob;
use crate::barrier::schedule::MarkReadyOptions;
use crate::barrier::{
    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
    Scheduled,
};
use crate::hummock::CommitEpochInfo;
use crate::manager::LocalNotification;
use crate::model::FragmentDownstreamRelation;
use crate::stream::{SourceChange, SplitState};

impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
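    /// Commit the collected epoch to Hummock, then return the updated version statistics.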
    #[await_tree::instrument]
    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
        self.hummock_manager.commit_epoch(commit_info).await?;
        Ok(self.hummock_manager.get_version_stats().await)
    }

    #[await_tree::instrument("next_scheduled_barrier")]
    async fn next_scheduled(&self) -> Scheduled {
        self.scheduled_barriers.next_scheduled().await
    }

    fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        recovery_reason: RecoveryReason,
    ) {
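        // Only a global recovery (no specific database) changes the overall manager status.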
        if database_id.is_none() {
            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
        }

        // Mark blocked and abort buffered schedules; they might already be dirty.
        self.scheduled_barriers
            .abort_and_mark_blocked(database_id, "cluster is under recovering");
    }

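    /// Unblock scheduling; a global mark-ready also restores the `Running` status.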
    fn mark_ready(&self, options: MarkReadyOptions) {
        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
        self.scheduled_barriers.mark_ready(options);
        if is_global {
            self.set_status(BarrierManagerStatus::Running);
        }
    }

    #[await_tree::instrument("post_collect_command({command})")]
    async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
        command.post_collect(self).await
    }

    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
        self.metadata_manager
            .notify_finish_failed(database_id, err)
            .await
    }

    #[await_tree::instrument("finish_creating_job({job})")]
    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
        let job_id = job.job_id();
        job.finish(&self.metadata_manager, &self.source_manager)
            .await?;
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::StreamingJobBackfillFinished(job_id));
        Ok(())
    }

    #[await_tree::instrument("finish_cdc_table_backfill({job})")]
    async fn finish_cdc_table_backfill(&self, job: JobId) -> MetaResult<()> {
        CdcTableBackfillTracker::mark_complete_job(&self.env.meta_store().conn, job).await
    }

    #[await_tree::instrument("new_control_stream({})", node.id)]
    async fn new_control_stream(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        self.new_control_stream_impl(node, init_request).await
    }

    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.reload_runtime_info_impl().await
    }

    async fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.reload_database_runtime_info_impl(database_id).await
    }

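    /// Aggregate `ListFinished` reports per `(table, source)` pair; once every tracked
    /// actor has reported, schedule a `ListFinish` barrier command for the database.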
    async fn handle_list_finished_source_ids(
        &self,
        list_finished: Vec<PbListFinishedSource>,
    ) -> MetaResult<()> {
        let mut list_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for list_finished in list_finished {
            let table_id = list_finished.table_id;
            let associated_source_id = list_finished.associated_source_id;
            list_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(list_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in list_finished_info {
            let allow_yield = self
                .refresh_manager
                .mark_list_stage_finished(table_id, &actors)?;

            if !allow_yield {
                continue;
            }

            // Find the database ID for this table via its associated source object
            let database_id = self
                .metadata_manager
                .catalog_controller
                .get_object_database_id(associated_source_id)
                .await
                .context("Failed to get database id for table")?;

            // Create ListFinish command
            let list_finish_command = Command::ListFinish {
                table_id,
                associated_source_id,
            };

            // Schedule the command through the barrier system without waiting
            self.barrier_scheduler
                .run_command_no_wait(database_id, list_finish_command)
                .context("Failed to schedule ListFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "ListFinish command scheduled successfully"
            );
        }
        Ok(())
    }

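    /// Same flow as `handle_list_finished_source_ids`, but for the load stage: when
    /// every tracked actor has reported, schedule a `LoadFinish` barrier command.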
    async fn handle_load_finished_source_ids(
        &self,
        load_finished: Vec<PbLoadFinishedSource>,
    ) -> MetaResult<()> {
        let mut load_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for load_finished in load_finished {
            let table_id = load_finished.table_id;
            let associated_source_id = load_finished.associated_source_id;
            load_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(load_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in load_finished_info {
            let allow_yield = self
                .refresh_manager
                .mark_load_stage_finished(table_id, &actors)?;

            if !allow_yield {
                continue;
            }

            // Find the database ID for this table via its associated source object
            let database_id = self
                .metadata_manager
                .catalog_controller
                .get_object_database_id(associated_source_id)
                .await
                .context("Failed to get database id for table")?;

            // Create LoadFinish command
            let load_finish_command = Command::LoadFinish {
                table_id,
                associated_source_id,
            };

            // Schedule the command through the barrier system without waiting
            self.barrier_scheduler
                .run_command_no_wait(database_id, load_finish_command)
                .context("Failed to schedule LoadFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "LoadFinish command scheduled successfully"
            );
        }

        Ok(())
    }

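    /// Mark the refresh of each finished table job as complete in the refresh manager.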
    async fn handle_refresh_finished_table_ids(
        &self,
        refresh_finished_table_job_ids: Vec<JobId>,
    ) -> MetaResult<()> {
        for job_id in refresh_finished_table_job_ids {
            let table_id = job_id.as_mv_table_id();

            self.refresh_manager.mark_refresh_complete(table_id).await?;
        }

        Ok(())
    }
}

impl GlobalBarrierWorkerContextImpl {
    fn set_status(&self, new_status: BarrierManagerStatus) {
        self.status.store(Arc::new(new_status));
    }
}

impl CommandContext {
    /// Perform follow-up work for the given command after its barriers are collected
    /// and the new storage version is committed.
    pub async fn post_collect(
        &self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
        let Some(command) = &self.command else {
            return Ok(());
        };
        match command {
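            // These commands need no post-collection bookkeeping.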
            Command::Flush => {}

            Command::Throttle { .. } => {}

            Command::Pause => {}

            Command::Resume => {}

            Command::SourceChangeSplit(SplitState {
                split_assignment: assignment,
                ..
            }) => {
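                // Persist the new split assignment in the fragment metadata.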
                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(assignment)
                    .await?;
            }

            Command::DropStreamingJobs {
                streaming_job_ids,
                unregistered_state_table_ids,
                ..
            } => {
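                // Stop tracking refresh progress for the dropped jobs.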
                for job_id in streaming_job_ids {
                    barrier_manager_context
                        .refresh_manager
                        .remove_progress_tracker(job_id.as_mv_table_id(), "drop_streaming_jobs");
                }

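                // Release the dropped state tables from Hummock and finalize their removal from the catalog.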
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
                    .await?;
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .complete_dropped_tables(unregistered_state_table_ids.iter().copied())
                    .await;
            }
            Command::ConnectorPropsChange(obj_id_map_props) => {
                // TODO: we don't know the type of the object ID; it can be a source or a sink. The barrier command should carry more info.
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::UpdateSourceProps {
                        // Only sources are managed by the source manager. Convert object IDs to source IDs and let
                        // the source manager ignore unknown/unregistered sources.
                        source_id_map_new_props: obj_id_map_props
                            .iter()
                            .map(|(object_id, props)| (object_id.as_source_id(), props.clone()))
                            .collect(),
                    })
                    .await;
            }
            Command::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => {
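                // Fill in the snapshot backfill epoch for the relevant backfill fragments of the new job.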
                match job_type {
                    CreateStreamingJobType::SinkIntoTable(_) | CreateStreamingJobType::Normal => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains(
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains_any([
                                            FragmentTypeFlag::SnapshotBackfillStreamScan,
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ]) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

                // Run `post_collect_job_fragments` for the original streaming job last, so that if any earlier
                // step fails, the job is never marked as `Creating` and will later be cleaned up by the recovery
                // triggered by the returned error.
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    ..
                } = info;
                let new_sink_downstream =
                    if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                        let new_downstreams = ctx.new_sink_downstream.clone();
                        let new_downstreams = FragmentDownstreamRelation::from([(
                            ctx.sink_fragment_id,
                            vec![new_downstreams],
                        )]);
                        Some(new_downstreams)
                    } else {
                        None
                    };

                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        stream_job_fragments.stream_job_id(),
                        upstream_fragment_downstreams,
                        new_sink_downstream,
                        Some(&info.init_split_assignment),
                    )
                    .await?;

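                // Register the job's source and source-backfill fragments with the source manager.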
                let source_change = SourceChange::CreateJob {
                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
                };

                barrier_manager_context
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
            }
            Command::RescheduleFragment { reschedules, .. } => {
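                // Persist the post-reschedule actor split assignment for each fragment.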
                let fragment_splits = reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (*fragment_id, reschedule.actor_splits.clone())
                    })
                    .collect();

                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(&fragment_splits)
                    .await?;
            }

            Command::ReplaceStreamJob(
                replace_plan @ ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    to_drop_state_table_ids,
                    auto_refresh_schema_sinks,
                    init_split_assignment,
                    ..
                },
            ) => {
                // Update actors and actor_dispatchers for new table fragments.
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id,
                        upstream_fragment_downstreams,
                        None,
                        Some(init_split_assignment),
                    )
                    .await?;

                if let Some(sinks) = auto_refresh_schema_sinks {
                    for sink in sinks {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                sink.tmp_sink_id.as_job_id(),
                                &Default::default(), // upstream_fragment_downstreams is already inserted in the job of the upstream table
                                None, // no new sink downstream
                                None, // no init split assignment
                            )
                            .await?;
                    }
                }

                // Apply the split changes in source manager.
                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        replace_plan,
                    )
                    .await;
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
                    .await?;
            }

            Command::CreateSubscription {
                subscription_id, ..
            } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(*subscription_id)
                    .await?
            }
            Command::DropSubscription { .. } => {}
            Command::ListFinish { .. } | Command::LoadFinish { .. } | Command::Refresh { .. } => {}
            Command::ResetSource { .. } => {}
        }

        Ok(())
    }
}