risingwave_meta/barrier/context/
context_impl.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::Context;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_meta_model::ActorId;
use risingwave_meta_model::streaming_job::BackfillOrders;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::SourceId;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbListFinishedSource, PbLoadFinishedSource,
};
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;

use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::command::{PostCollectCommand, ResumeBackfillTarget};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::progress::TrackingJob;
use crate::barrier::schedule::MarkReadyOptions;
use crate::barrier::{
    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
    Scheduled,
};
use crate::hummock::CommitEpochInfo;
use crate::manager::LocalNotification;
use crate::model::FragmentDownstreamRelation;
use crate::stream::SourceChange;
use crate::{MetaError, MetaResult};

impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
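    // Commit the collected epoch to Hummock, then report the latest Hummock version stats.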
    #[await_tree::instrument]
    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
        self.hummock_manager.commit_epoch(commit_info).await?;
        Ok(self.hummock_manager.get_version_stats().await)
    }

    #[await_tree::instrument("next_scheduled_barrier")]
    async fn next_scheduled(&self) -> Scheduled {
        self.scheduled_barriers.next_scheduled().await
    }

    fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        recovery_reason: RecoveryReason,
    ) {
        if database_id.is_none() {
            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
        }

        // Mark blocked and abort buffered schedules; they might already be dirty.
        self.scheduled_barriers
            .abort_and_mark_blocked(database_id, "cluster is under recovery");
    }

    fn mark_ready(&self, options: MarkReadyOptions) {
        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
        self.scheduled_barriers.mark_ready(options);
        if is_global {
            self.set_status(BarrierManagerStatus::Running);
        }
    }

    #[await_tree::instrument("post_collect_command({command})")]
    async fn post_collect_command(&self, command: PostCollectCommand) -> MetaResult<()> {
        command.post_collect(self).await
    }

    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
        self.metadata_manager
            .notify_finish_failed(database_id, err)
            .await
    }

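    // Finalize a tracked streaming job whose backfill has completed, then notify local
    // subscribers that the job is finished.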
    #[await_tree::instrument("finish_creating_job({job})")]
    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
        let job_id = job.job_id();
        job.finish(&self.metadata_manager, &self.source_manager)
            .await?;
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::StreamingJobBackfillFinished(job_id));
        Ok(())
    }

    #[await_tree::instrument("finish_cdc_table_backfill({job})")]
    async fn finish_cdc_table_backfill(&self, job: JobId) -> MetaResult<()> {
        CdcTableBackfillTracker::mark_complete_job(&self.env.meta_store().conn, job).await
    }

    #[await_tree::instrument("new_control_stream({})", node.id)]
    async fn new_control_stream(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        self.new_control_stream_impl(node, init_request).await
    }

    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.reload_runtime_info_impl().await
    }

    async fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<DatabaseRuntimeInfoSnapshot> {
        self.reload_database_runtime_info_impl(database_id).await
    }

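    // Aggregate per-actor "list finished" reports by (table, associated source). Once the
    // refresh manager reports that all listing actors of a table are done, schedule a
    // `ListFinish` barrier command against that table's database.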
    async fn handle_list_finished_source_ids(
        &self,
        list_finished: Vec<PbListFinishedSource>,
    ) -> MetaResult<()> {
        let mut list_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for list_finished in list_finished {
            let table_id = list_finished.table_id;
            let associated_source_id = list_finished.associated_source_id;
            list_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(list_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in list_finished_info {
            let allow_yield = self
                .refresh_manager
                .mark_list_stage_finished(table_id, &actors)?;

            if !allow_yield {
                continue;
            }

            // Look up the database ID through the table's associated source
            let database_id = self
                .metadata_manager
                .catalog_controller
                .get_object_database_id(associated_source_id)
                .await
                .context("Failed to get database id for table")?;

            // Create ListFinish command
            let list_finish_command = Command::ListFinish {
                table_id,
                associated_source_id,
            };

            // Schedule the command through the barrier system without waiting
            self.barrier_scheduler
                .run_command_no_wait(database_id, list_finish_command)
                .context("Failed to schedule ListFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "ListFinish command scheduled successfully"
            );
        }
        Ok(())
    }

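    // Same flow as `handle_list_finished_source_ids`, but for the load stage: once all
    // reporting actors of a table have finished loading, schedule a `LoadFinish` command.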
    async fn handle_load_finished_source_ids(
        &self,
        load_finished: Vec<PbLoadFinishedSource>,
    ) -> MetaResult<()> {
        let mut load_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for load_finished in load_finished {
            let table_id = load_finished.table_id;
            let associated_source_id = load_finished.associated_source_id;
            load_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(load_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in load_finished_info {
            let allow_yield = self
                .refresh_manager
                .mark_load_stage_finished(table_id, &actors)?;

            if !allow_yield {
                continue;
            }

            // Look up the database ID through the table's associated source
            let database_id = self
                .metadata_manager
                .catalog_controller
                .get_object_database_id(associated_source_id)
                .await
                .context("Failed to get database id for table")?;

            // Create LoadFinish command
            let load_finish_command = Command::LoadFinish {
                table_id,
                associated_source_id,
            };

            // Schedule the command through the barrier system without waiting
            self.barrier_scheduler
                .run_command_no_wait(database_id, load_finish_command)
                .context("Failed to schedule LoadFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "LoadFinish command scheduled successfully"
            );
        }

        Ok(())
    }

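    // Mark the refresh of each finished job's materialized table as complete in the refresh manager.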
    async fn handle_refresh_finished_table_ids(
        &self,
        refresh_finished_table_job_ids: Vec<JobId>,
    ) -> MetaResult<()> {
        for job_id in refresh_finished_table_job_ids {
            let table_id = job_id.as_mv_table_id();

            self.refresh_manager.mark_refresh_complete(table_id).await?;
        }

        Ok(())
    }
}

impl GlobalBarrierWorkerContextImpl {
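    /// Replace the currently stored barrier manager status.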
    fn set_status(&self, new_status: BarrierManagerStatus) {
        self.status.store(Arc::new(new_status));
    }
}

impl PostCollectCommand {
    /// Perform follow-up work for the given command after its barriers have been collected and
    /// the new storage version has been committed.
    pub async fn post_collect(
        self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
        match self {
            PostCollectCommand::Command(_) => {}
            PostCollectCommand::SourceChangeSplit {
                split_assignment: assignment,
            } => {
                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(&assignment)
                    .await?;
            }

            PostCollectCommand::DropStreamingJobs {
                streaming_job_ids,
                unregistered_state_table_ids,
            } => {
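                // Stop tracking refresh progress for each dropped streaming job.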
                for job_id in streaming_job_ids {
                    barrier_manager_context
                        .refresh_manager
                        .remove_progress_tracker(job_id.as_mv_table_id(), "drop_streaming_jobs");
                }

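                // Unregister the dropped jobs' state tables from Hummock and finalize their
                // removal from the catalog.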
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
                    .await?;
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .complete_dropped_tables(unregistered_state_table_ids.iter().copied())
                    .await;
            }
            PostCollectCommand::ConnectorPropsChange(obj_id_map_props) => {
                // TODO: we don't know the type of the object ID here; it can be a source or a sink.
                // The barrier command should carry more info.
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::UpdateSourceProps {
                        // Only sources are managed by the source manager. Convert object IDs to source IDs
                        // and let the source manager ignore unknown/unregistered sources.
                        source_id_map_new_props: obj_id_map_props
                            .iter()
                            .map(|(object_id, props)| (object_id.as_source_id(), props.clone()))
                            .collect(),
                    })
                    .await;
            }
            PostCollectCommand::ResumeBackfill { target } => match target {
                ResumeBackfillTarget::Job(job_id) => {
                    barrier_manager_context
                        .metadata_manager
                        .catalog_controller
                        .update_backfill_orders_by_job_id(job_id, None)
                        .await?;
                }
                ResumeBackfillTarget::Fragment(fragment_id) => {
                    let mut job_ids = barrier_manager_context
                        .metadata_manager
                        .catalog_controller
                        .get_fragment_job_id(vec![fragment_id])
                        .await?;
                    let job_id = job_ids.pop().ok_or_else(|| {
                        MetaError::invalid_parameter("fragment not found".to_owned())
                    })?;
                    let job_id = JobId::new(job_id.as_raw_id());

                    let extra_info = barrier_manager_context
                        .metadata_manager
                        .catalog_controller
                        .get_streaming_job_extra_info(vec![job_id])
                        .await?;
                    let mut backfill_orders: BackfillOrders = extra_info
                        .get(&job_id)
                        .cloned()
                        .ok_or_else(|| MetaError::invalid_parameter("job not found".to_owned()))?
                        .backfill_orders
                        .unwrap_or_default();

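                    // Remove the resumed fragment from every backfill dependency list and drop
                    // entries that become empty, e.g. {A: [B, C], D: [C]} becomes {A: [B]} when
                    // fragment C is resumed.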
                    let resumed_fragment_id = fragment_id.as_raw_id();
                    for children in backfill_orders.0.values_mut() {
                        children.retain(|child| *child != resumed_fragment_id);
                    }
                    backfill_orders.0.retain(|_, children| !children.is_empty());

                    barrier_manager_context
                        .metadata_manager
                        .catalog_controller
                        .update_backfill_orders_by_job_id(job_id, Some(backfill_orders))
                        .await?;
                }
            },
            PostCollectCommand::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => {
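                // Fill the snapshot backfill epochs for the new job's backfill fragments; which
                // fragments qualify depends on the job type.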
                match &job_type {
                    CreateStreamingJobType::SinkIntoTable(_) | CreateStreamingJobType::Normal => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains(
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                &cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains_any([
                                            FragmentTypeFlag::SnapshotBackfillStreamScan,
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ]) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                &cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

                // Run `post_collect_job_fragments` for the original streaming job last, so that if any
                // previous step fails, we won't have marked the job as `Creating`, and the job will later
                // be cleaned up by the recovery triggered by the returned error.
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    ..
                } = info;
                let new_sink_downstream =
                    if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                        let new_downstreams = ctx.new_sink_downstream.clone();
                        let new_downstreams = FragmentDownstreamRelation::from([(
                            ctx.sink_fragment_id,
                            vec![new_downstreams],
                        )]);
                        Some(new_downstreams)
                    } else {
                        None
                    };

                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        stream_job_fragments.stream_job_id(),
                        &upstream_fragment_downstreams,
                        new_sink_downstream,
                        Some(&info.init_split_assignment),
                    )
                    .await?;

                let source_change = SourceChange::CreateJob {
                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
                };

                barrier_manager_context
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
            }
            PostCollectCommand::Reschedule { reschedules, .. } => {
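                // Persist the post-reschedule actor split assignment of each affected fragment.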
                let fragment_splits = reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (*fragment_id, reschedule.actor_splits.clone())
                    })
                    .collect();

                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(&fragment_splits)
                    .await?;
            }

            PostCollectCommand::ReplaceStreamJob(replace_plan) => {
                let ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    to_drop_state_table_ids,
                    auto_refresh_schema_sinks,
                    init_split_assignment,
                    ..
                } = &replace_plan;
                // Update actors and actor_dispatchers for new table fragments.
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id,
                        upstream_fragment_downstreams,
                        None,
                        Some(init_split_assignment),
                    )
                    .await?;

                if let Some(sinks) = auto_refresh_schema_sinks {
                    for sink in sinks {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                sink.tmp_sink_id.as_job_id(),
                                &Default::default(), // upstream_fragment_downstreams were already inserted with the upstream table's job
                                None, // no new sink downstream
                                None, // no init split assignment
                            )
                            .await?;
                    }
                }

                // Apply the split changes in the source manager.
                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        &replace_plan,
                    )
                    .await;
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
                    .await?;
            }

            PostCollectCommand::CreateSubscription { subscription_id } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(subscription_id)
                    .await?
            }
        }

        Ok(())
    }
}