// risingwave_meta/barrier/context/context_impl.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::Context;
19use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
20use risingwave_common::id::JobId;
21use risingwave_meta_model::ActorId;
22use risingwave_meta_model::streaming_job::BackfillOrders;
23use risingwave_pb::common::WorkerNode;
24use risingwave_pb::hummock::HummockVersionStats;
25use risingwave_pb::id::SourceId;
26use risingwave_pb::stream_service::barrier_complete_response::{
27    PbListFinishedSource, PbLoadFinishedSource,
28};
29use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
30use risingwave_rpc_client::StreamingControlHandle;
31
32use crate::barrier::cdc_progress::CdcTableBackfillTracker;
33use crate::barrier::command::{PostCollectCommand, ResumeBackfillTarget};
34use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
35use crate::barrier::progress::TrackingJob;
36use crate::barrier::schedule::MarkReadyOptions;
37use crate::barrier::{
38    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
39    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
40    Scheduled,
41};
42use crate::hummock::CommitEpochInfo;
43use crate::manager::LocalNotification;
44use crate::model::FragmentDownstreamRelation;
45use crate::stream::{SourceChange, cleanup_dropped_streaming_jobs};
46use crate::{MetaError, MetaResult};
47
impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
    /// Commit the collected epoch to hummock, then return the refreshed version stats.
    #[await_tree::instrument]
    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
        self.hummock_manager.commit_epoch(commit_info).await?;
        Ok(self.hummock_manager.get_version_stats().await)
    }

    /// Await the next barrier to inject, as produced by the scheduler queue.
    #[await_tree::instrument("next_scheduled_barrier")]
    async fn next_scheduled(&self) -> Scheduled {
        self.scheduled_barriers.next_scheduled().await
    }

    /// Abort buffered schedules and block further scheduling during recovery.
    ///
    /// `database_id = None` means a global recovery, which additionally flips the
    /// whole barrier manager into the `Recovering` status; a per-database recovery
    /// leaves the global status untouched.
    fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        recovery_reason: RecoveryReason,
    ) {
        if database_id.is_none() {
            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
        }

        // Mark blocked and abort buffered schedules, they might be dirty already.
        self.scheduled_barriers
            .abort_and_mark_blocked(database_id, "cluster is under recovering");
    }

    /// Unblock barrier scheduling after recovery. Only a global mark-ready
    /// transitions the manager status back to `Running`.
    fn mark_ready(&self, options: MarkReadyOptions) {
        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
        self.scheduled_barriers.mark_ready(options);
        if is_global {
            self.set_status(BarrierManagerStatus::Running);
        }
    }

    /// Run the post-collect hook of `command` (see `PostCollectCommand::post_collect`).
    #[await_tree::instrument("post_collect_command({command})")]
    async fn post_collect_command(&self, command: PostCollectCommand) -> MetaResult<()> {
        command.post_collect(self).await
    }

    /// Notify subscribers that a creating streaming job has failed with `err`.
    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
        self.metadata_manager
            .notify_finish_failed(database_id, err)
            .await
    }

    /// Finalize a tracked creating job, then broadcast a local notification that
    /// its backfill has finished.
    #[await_tree::instrument("finish_creating_job({job})")]
    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
        let job_id = job.job_id();
        job.finish(&self.metadata_manager, &self.source_manager)
            .await?;
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::StreamingJobBackfillFinished(job_id));
        Ok(())
    }

    /// Persist completion of a CDC table backfill job in the meta store.
    #[await_tree::instrument("finish_cdc_table_backfill({job})")]
    async fn finish_cdc_table_backfill(&self, job: JobId) -> MetaResult<()> {
        CdcTableBackfillTracker::mark_complete_job(&self.env.meta_store().conn, job).await
    }

    /// Establish a streaming control stream to worker `node`.
    #[await_tree::instrument("new_control_stream({})", node.id)]
    async fn new_control_stream(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        self.new_control_stream_impl(node, init_request).await
    }

    /// Reload the full runtime info snapshot (used on global recovery).
    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.reload_runtime_info_impl().await
    }

    /// Reload the runtime info snapshot of a single database (per-database recovery).
    async fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<DatabaseRuntimeInfoSnapshot> {
        self.reload_database_runtime_info_impl(database_id).await
    }

    /// Handle "list stage finished" reports from actors of refreshable sources.
    ///
    /// Reports are grouped by `(table_id, associated_source_id)`; once the refresh
    /// manager confirms all reporting actors of a table have finished the list
    /// stage, a `ListFinish` command is scheduled (fire-and-forget) on the source's
    /// database. Tables whose associated source was already dropped are skipped.
    async fn handle_list_finished_source_ids(
        &self,
        list_finished: Vec<PbListFinishedSource>,
    ) -> MetaResult<()> {
        // (table, source) -> set of actors that reported list completion.
        let mut list_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for list_finished in list_finished {
            let table_id = list_finished.table_id;
            let associated_source_id = list_finished.associated_source_id;
            list_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(list_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in list_finished_info {
            // Only yield (schedule the command) once the refresh manager has seen
            // all expected actors finish the list stage.
            let allow_yield = self
                .refresh_manager
                .mark_list_stage_finished(table_id, &actors)?;

            if !allow_yield {
                continue;
            }

            // Skip if the associated source has been dropped concurrently.
            let Some(database_id) = self
                .get_source_database_id_for_refresh_stage(table_id, associated_source_id, "list")
                .await?
            else {
                continue;
            };

            // Create ListFinish command
            let list_finish_command = Command::ListFinish {
                table_id,
                associated_source_id,
            };

            // Schedule the command through the barrier system without waiting
            self.barrier_scheduler
                .run_command_no_wait(database_id, list_finish_command)
                .context("Failed to schedule ListFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "ListFinish command scheduled successfully"
            );
        }
        Ok(())
    }

    /// Handle "load stage finished" reports from actors of refreshable sources.
    ///
    /// Mirrors `handle_list_finished_source_ids` but for the load stage: once all
    /// reporting actors of a table finish loading, a `LoadFinish` command is
    /// scheduled without waiting on the source's database.
    async fn handle_load_finished_source_ids(
        &self,
        load_finished: Vec<PbLoadFinishedSource>,
    ) -> MetaResult<()> {
        // (table, source) -> set of actors that reported load completion.
        let mut load_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for load_finished in load_finished {
            let table_id = load_finished.table_id;
            let associated_source_id = load_finished.associated_source_id;
            load_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(load_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in load_finished_info {
            // Only yield once the refresh manager has seen all expected actors
            // finish the load stage.
            let allow_yield = self
                .refresh_manager
                .mark_load_stage_finished(table_id, &actors)?;

            if !allow_yield {
                continue;
            }

            // Skip if the associated source has been dropped concurrently.
            let Some(database_id) = self
                .get_source_database_id_for_refresh_stage(table_id, associated_source_id, "load")
                .await?
            else {
                continue;
            };

            // Create LoadFinish command
            let load_finish_command = Command::LoadFinish {
                table_id,
                associated_source_id,
            };

            // Schedule the command through the barrier system without waiting
            self.barrier_scheduler
                .run_command_no_wait(database_id, load_finish_command)
                .context("Failed to schedule LoadFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "LoadFinish command scheduled successfully"
            );
        }

        Ok(())
    }

    /// Mark refresh as complete in the refresh manager for each finished table job.
    async fn handle_refresh_finished_table_ids(
        &self,
        refresh_finished_table_job_ids: Vec<JobId>,
    ) -> MetaResult<()> {
        for job_id in refresh_finished_table_job_ids {
            let table_id = job_id.as_mv_table_id();

            self.refresh_manager.mark_refresh_complete(table_id).await?;
        }

        Ok(())
    }
}
245
246impl GlobalBarrierWorkerContextImpl {
247    async fn get_source_database_id_for_refresh_stage(
248        &self,
249        table_id: TableId,
250        associated_source_id: SourceId,
251        stage: &'static str,
252    ) -> MetaResult<Option<DatabaseId>> {
253        match self
254            .metadata_manager
255            .catalog_controller
256            .get_object_database_id(associated_source_id)
257            .await
258        {
259            Ok(database_id) => Ok(Some(database_id)),
260            Err(err) if err.is_catalog_id_not_found("object") => {
261                tracing::warn!(
262                    %table_id,
263                    %associated_source_id,
264                    stage,
265                    "skip refresh finish command because associated source is already dropped"
266                );
267                Ok(None)
268            }
269            Err(err) => Err(err)
270                .with_context(|| {
271                    format!(
272                        "failed to get database id for refresh stage: table_id={}, associated_source_id={}, stage={stage}",
273                        table_id, associated_source_id
274                    )
275                })
276                .map_err(Into::into),
277        }
278    }
279
280    fn set_status(&self, new_status: BarrierManagerStatus) {
281        self.status.store(Arc::new(new_status));
282    }
283}
284
impl PostCollectCommand {
    /// Perform the follow-up metadata and source-manager updates for the given
    /// command after its barriers are collected and the new storage version is
    /// committed.
    ///
    /// Most variants persist catalog/fragment changes via the metadata manager
    /// and/or notify the source manager of split or property changes. Ordering
    /// within a variant can be significant (see the `CreateStreamingJob` arm).
    pub async fn post_collect(
        self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
        match self {
            // Plain commands need no post-collect work.
            PostCollectCommand::Command(_) => {}
            // Persist the new split assignment produced by a source split change.
            PostCollectCommand::SourceChangeSplit {
                split_assignment: assignment,
            } => {
                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(&assignment)
                    .await?;
            }

            PostCollectCommand::DropStreamingJobs => {}
            PostCollectCommand::ConnectorPropsChange(obj_id_map_props) => {
                // todo: we dont know the type of the object id, it can be a source or a sink. Should carry more info in the barrier command.
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::UpdateSourceProps {
                        // Only sources are managed in source manager. Convert object IDs to source IDs and let
                        // source manager ignore unknown/unregistered sources.
                        source_id_map_new_props: obj_id_map_props
                            .iter()
                            .map(|(object_id, props)| (object_id.as_source_id(), props.clone()))
                            .collect(),
                    })
                    .await;
            }
            PostCollectCommand::ResumeBackfill { target } => match target {
                // Resuming a whole job clears its backfill orders entirely.
                ResumeBackfillTarget::Job(job_id) => {
                    barrier_manager_context
                        .metadata_manager
                        .catalog_controller
                        .update_backfill_orders_by_job_id(job_id, None)
                        .await?;
                }
                // Resuming a single fragment removes it from the backfill order
                // graph of its owning job, then persists the pruned orders.
                ResumeBackfillTarget::Fragment(fragment_id) => {
                    let mut job_ids = barrier_manager_context
                        .metadata_manager
                        .catalog_controller
                        .get_fragment_job_id(vec![fragment_id])
                        .await?;
                    let job_id = job_ids.pop().ok_or_else(|| {
                        MetaError::invalid_parameter("fragment not found".to_owned())
                    })?;
                    let job_id = JobId::new(job_id.as_raw_id());

                    let extra_info = barrier_manager_context
                        .metadata_manager
                        .catalog_controller
                        .get_streaming_job_extra_info(vec![job_id])
                        .await?;
                    let mut backfill_orders: BackfillOrders = extra_info
                        .get(&job_id)
                        .cloned()
                        .ok_or_else(|| MetaError::invalid_parameter("job not found".to_owned()))?
                        .backfill_orders
                        .unwrap_or_default();

                    // Drop the resumed fragment from every dependency list, then
                    // discard entries whose dependency list became empty.
                    let resumed_fragment_id = fragment_id.as_raw_id();
                    for children in backfill_orders.0.values_mut() {
                        children.retain(|child| *child != resumed_fragment_id);
                    }
                    backfill_orders.0.retain(|_, children| !children.is_empty());

                    barrier_manager_context
                        .metadata_manager
                        .catalog_controller
                        .update_backfill_orders_by_job_id(job_id, Some(backfill_orders))
                        .await?;
                }
            },
            PostCollectCommand::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
                resolved_split_assignment,
            } => {
                // First record the snapshot backfill epochs for the relevant
                // backfill fragments; which fragments qualify depends on the job
                // type (cross-db only vs. snapshot + cross-db).
                match &job_type {
                    CreateStreamingJobType::SinkIntoTable(_) | CreateStreamingJobType::Normal => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains(
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                &cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains_any([
                                            FragmentTypeFlag::SnapshotBackfillStreamScan,
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ]) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                &cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

                // Do `post_collect_job_fragments` of the original streaming job in the end, so that in any previous failure,
                // we won't mark the job as `Creating`, and then the job will be later clean by the recovery triggered by the returned error.
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    ..
                } = info;
                // For sink-into-table, the new sink fragment gains a downstream
                // relation to the target table's fragment.
                let new_sink_downstream =
                    if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                        let new_downstreams = ctx.new_sink_downstream.clone();
                        let new_downstreams = FragmentDownstreamRelation::from([(
                            ctx.sink_fragment_id,
                            vec![new_downstreams],
                        )]);
                        Some(new_downstreams)
                    } else {
                        None
                    };

                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        stream_job_fragments.stream_job_id(),
                        &upstream_fragment_downstreams,
                        new_sink_downstream,
                        Some(&resolved_split_assignment),
                    )
                    .await?;

                // Finally, register the job's source and source-backfill
                // fragments with the source manager.
                let source_change = SourceChange::CreateJob {
                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
                };

                barrier_manager_context
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
            }
            // Persist the post-reschedule actor-split assignment per fragment.
            PostCollectCommand::Reschedule { reschedules, .. } => {
                let fragment_splits = reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (*fragment_id, reschedule.actor_splits.clone())
                    })
                    .collect();

                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(&fragment_splits)
                    .await?;
            }

            PostCollectCommand::ReplaceStreamJob {
                plan: replace_plan,
                resolved_split_assignment,
            } => {
                let ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    to_drop_state_table_ids,
                    auto_refresh_schema_sinks,
                    ..
                } = &replace_plan;
                // Update actors and actor_dispatchers for new table fragments.
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id,
                        upstream_fragment_downstreams,
                        None,
                        Some(&resolved_split_assignment),
                    )
                    .await?;

                // Sinks whose schema is auto-refreshed alongside the table get
                // their temporary jobs collected as well.
                if let Some(sinks) = auto_refresh_schema_sinks {
                    for sink in sinks {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                sink.tmp_sink_id.as_job_id(),
                                &Default::default(), // upstream_fragment_downstreams is already inserted in the job of upstream table
                                None, // no replace plan
                                None, // no init split assignment
                            )
                            .await?;
                    }
                }

                // Apply the split changes in source manager.
                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        &replace_plan,
                    )
                    .await;
                // Clean up state tables that the replaced job no longer uses.
                cleanup_dropped_streaming_jobs(
                    &barrier_manager_context.refresh_manager,
                    &barrier_manager_context.hummock_manager,
                    &barrier_manager_context.metadata_manager,
                    [],
                    to_drop_state_table_ids.clone(),
                    "replace_streaming_job",
                )
                .await?;
            }

            // Mark the subscription catalog entry as fully created.
            PostCollectCommand::CreateSubscription { subscription_id } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(subscription_id)
                    .await?
            }
        }

        Ok(())
    }
}
539
#[cfg(test)]
mod tests {
    use super::*;

    /// A not-found error for the generic `object` kind must be recognized, so
    /// the refresh-finish path can silently skip a dropped associated source.
    #[test]
    fn test_skip_refresh_finish_when_associated_source_missing() {
        let missing_object = MetaError::catalog_id_not_found("object", 42);
        assert!(missing_object.is_catalog_id_not_found("object"));
    }

    /// Not-found errors for other catalog kinds must NOT match the `object`
    /// check, and therefore must still surface as real errors.
    #[test]
    fn test_do_not_skip_refresh_finish_for_other_not_found_types() {
        let missing_table = MetaError::catalog_id_not_found("table", 42);
        assert!(!missing_table.is_catalog_id_not_found("object"));
    }
}
555}