risingwave_meta/stream/
cdc.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::iter;
17use std::time::Duration;
18
19use anyhow::Context;
20use futures::pin_mut;
21use futures_async_stream::for_await;
22use itertools::Itertools;
23use risingwave_common::bail;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, Schema};
25use risingwave_common::id::{ActorId, JobId};
26use risingwave_common::row::{OwnedRow, Row};
27use risingwave_common::util::iter_util::ZipEqDebug;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
29use risingwave_connector::source::cdc::external::{
30    CdcTableSnapshotSplitOption, ExternalCdcTableType, ExternalTableConfig, ExternalTableReader,
31    SchemaTableName,
32};
33use risingwave_connector::source::cdc::{CdcScanOptions, build_cdc_table_snapshot_split};
34use risingwave_connector::source::{CdcTableSnapshotSplit, CdcTableSnapshotSplitRaw};
35use risingwave_meta_model::cdc_table_snapshot_split::Relation::Object;
36use risingwave_meta_model::{cdc_table_snapshot_split, object};
37use risingwave_meta_model_migration::JoinType;
38use risingwave_pb::id::{DatabaseId, TableId};
39use risingwave_pb::plan_common::ExternalTableDesc;
40use risingwave_pb::source::PbCdcTableSnapshotSplits;
41use risingwave_pb::stream_plan::stream_node::NodeBody;
42use risingwave_pb::stream_plan::{StreamCdcScanNode, StreamCdcScanOptions, StreamNode};
43use sea_orm::{
44    ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, QuerySelect, RelationTrait, Set,
45    TransactionTrait,
46};
47
48use crate::MetaResult;
49use crate::controller::SqlMetaStore;
50use crate::model::Fragment;
51
52/// A CDC table snapshot splits can only be successfully initialized once.
53/// Subsequent attempts to write to the metastore with the same primary key will be rejected.
54pub(crate) async fn try_init_parallel_cdc_table_snapshot_splits(
55    table_id: TableId,
56    table_desc: &ExternalTableDesc,
57    meta_store: &SqlMetaStore,
58    per_table_options: &StreamCdcScanOptions,
59    insert_batch_size: u64,
60    sleep_split_interval: u64,
61    sleep_duration_millis: u64,
62) -> MetaResult<Vec<CdcTableSnapshotSplitRaw>> {
63    let split_options = CdcTableSnapshotSplitOption {
64        backfill_num_rows_per_split: per_table_options.backfill_num_rows_per_split,
65        backfill_as_even_splits: per_table_options.backfill_as_even_splits,
66        backfill_split_pk_column_index: per_table_options.backfill_split_pk_column_index,
67    };
68    let table_type = ExternalCdcTableType::from_properties(&table_desc.connect_properties);
69    // Filter out additional columns to construct the external table schema
70    let table_schema: Schema = table_desc
71        .columns
72        .iter()
73        .filter(|col| {
74            col.additional_column
75                .as_ref()
76                .is_none_or(|a_col| a_col.column_type.is_none())
77        })
78        .map(Into::into)
79        .collect();
80    let table_pk_indices = table_desc
81        .pk
82        .iter()
83        .map(|k| k.column_index as usize)
84        .collect_vec();
85    let table_config = ExternalTableConfig::try_from_btreemap(
86        table_desc.connect_properties.clone(),
87        table_desc.secret_refs.clone(),
88    )
89    .context("failed to parse external table config")?;
90    let schema_table_name = SchemaTableName::from_properties(&table_desc.connect_properties);
91    let reader = table_type
92        .create_table_reader(
93            table_config,
94            table_schema,
95            table_pk_indices,
96            schema_table_name,
97        )
98        .await?;
99    let stream = reader.get_parallel_cdc_splits(split_options);
100    let mut insert_batch = vec![];
101    let mut splits_num = 0;
102    let mut splits = vec![];
103    let txn = meta_store.conn.begin().await?;
104    pin_mut!(stream);
105    #[for_await]
106    for split in stream {
107        let split: CdcTableSnapshotSplit = split?;
108        splits_num += 1;
109        let left = split.left_bound_inclusive.value_serialize();
110        let right = split.right_bound_exclusive.value_serialize();
111        insert_batch.push(cdc_table_snapshot_split::ActiveModel {
112            table_id: Set(table_id.as_job_id()),
113            split_id: Set(split.split_id.to_owned()),
114            left: Set(left.clone()),
115            right: Set(right.clone()),
116            is_backfill_finished: Set(0),
117        });
118        splits.push(CdcTableSnapshotSplitRaw {
119            split_id: split.split_id,
120            left_bound_inclusive: left,
121            right_bound_exclusive: right,
122        });
123        if insert_batch.len() >= insert_batch_size as usize {
124            cdc_table_snapshot_split::Entity::insert_many(std::mem::take(&mut insert_batch))
125                .exec(&txn)
126                .await?;
127        }
128        if splits_num % sleep_split_interval == 0 {
129            tokio::time::sleep(Duration::from_millis(sleep_duration_millis)).await;
130        }
131    }
132    if !insert_batch.is_empty() {
133        cdc_table_snapshot_split::Entity::insert_many(std::mem::take(&mut insert_batch))
134            .exec(&txn)
135            .await?;
136    }
137    txn.commit().await?;
138    // TODO(zw): handle metadata backup&restore
139    Ok(splits)
140}
141
142/// Returns some cdc scan node if the fragment is CDC scan and has parallelized backfill enabled.
143pub(crate) fn is_parallelized_backfill_enabled_cdc_scan_fragment(
144    fragment_type_mask: FragmentTypeMask,
145    nodes: &StreamNode,
146) -> Option<&StreamCdcScanNode> {
147    if !fragment_type_mask.contains(FragmentTypeFlag::StreamCdcScan) {
148        return None;
149    }
150    let mut stream_cdc_scan = None;
151    visit_stream_node_cont(nodes, |node| {
152        if let Some(NodeBody::StreamCdcScan(node)) = &node.node_body {
153            if is_parallelized_backfill_enabled(node) {
154                stream_cdc_scan = Some(&**node);
155            }
156            false
157        } else {
158            true
159        }
160    });
161    stream_cdc_scan
162}
163
164fn is_parallelized_backfill_enabled(node: &StreamCdcScanNode) -> bool {
165    if let Some(options) = &node.options
166        && CdcScanOptions::from_proto(options).is_parallelized_backfill()
167    {
168        return true;
169    }
170    false
171}
172
173pub(crate) fn parallel_cdc_table_backfill_fragment<'a>(
174    fragments: impl Iterator<Item = &'a Fragment>,
175) -> Option<(&'a Fragment, &'a StreamCdcScanNode)> {
176    let mut stream_scan_fragments = fragments.filter_map(|f| {
177        is_parallelized_backfill_enabled_cdc_scan_fragment(f.fragment_type_mask, &f.nodes)
178            .map(|cdc_scan| (f, cdc_scan))
179    });
180    let fragment = stream_scan_fragments.next()?;
181    assert_eq!(
182        stream_scan_fragments.count(),
183        0,
184        "Expect no remaining scan fragment",
185    );
186    Some(fragment)
187}
188
189pub(crate) fn assign_cdc_table_snapshot_splits(
190    actor_ids: HashSet<ActorId>,
191    splits: &[CdcTableSnapshotSplitRaw],
192    generation: u64,
193) -> MetaResult<HashMap<ActorId, PbCdcTableSnapshotSplits>> {
194    if actor_ids.is_empty() {
195        return Err(anyhow::anyhow!("Expect at least 1 actor, 0 was found.").into());
196    }
197    if splits.is_empty() {
198        return Err(
199            anyhow::anyhow!("Expect at least 1 CDC table snapshot splits, 0 was found.").into(),
200        );
201    }
202    let splits_per_actor = splits.len().div_ceil(actor_ids.len());
203    let mut assignments = HashMap::new();
204    for (actor_id, splits) in actor_ids.iter().copied().zip_eq_debug(
205        splits
206            .iter()
207            .map(build_cdc_table_snapshot_split)
208            .chunks(splits_per_actor)
209            .into_iter()
210            .map(|c| c.collect_vec())
211            .chain(iter::repeat(Vec::default()))
212            .take(actor_ids.len()),
213    ) {
214        assignments.insert(actor_id, PbCdcTableSnapshotSplits { splits, generation });
215    }
216    Ok(assignments)
217}
218
219#[derive(Debug)]
220pub enum CdcTableSnapshotSplits {
221    Backfilling(Vec<CdcTableSnapshotSplitRaw>),
222    Finished,
223}
224
225pub async fn reload_cdc_table_snapshot_splits(
226    txn: &impl ConnectionTrait,
227    database_id: Option<DatabaseId>,
228) -> MetaResult<HashMap<JobId, CdcTableSnapshotSplits>> {
229    let columns = [
230        cdc_table_snapshot_split::Column::TableId,
231        cdc_table_snapshot_split::Column::SplitId,
232        cdc_table_snapshot_split::Column::Left,
233        cdc_table_snapshot_split::Column::Right,
234        cdc_table_snapshot_split::Column::IsBackfillFinished,
235    ];
236    #[expect(clippy::type_complexity)]
237    let all_splits: Vec<(JobId, i64, Vec<u8>, Vec<u8>, i16)> =
238        if let Some(database_id) = database_id {
239            cdc_table_snapshot_split::Entity::find()
240                .join(JoinType::LeftJoin, Object.def())
241                .select_only()
242                .columns(columns)
243                .filter(object::Column::DatabaseId.eq(database_id))
244                .into_tuple()
245                .all(txn)
246                .await?
247        } else {
248            cdc_table_snapshot_split::Entity::find()
249                .select_only()
250                .columns(columns)
251                .into_tuple()
252                .all(txn)
253                .await?
254        };
255
256    let mut job_splits = HashMap::<_, Vec<_>>::new();
257    for (job_id, split_id, left, right, is_backfill_finished) in all_splits {
258        job_splits
259            .entry(job_id)
260            .or_default()
261            .push((split_id, left, right, is_backfill_finished));
262    }
263
264    job_splits
265        .into_iter()
266        .map(|(job_id, splits)| {
267            let splits = compose_job_splits(job_id, splits)?;
268            Ok((job_id, splits))
269        })
270        .try_collect()
271}
272
273pub fn compose_job_splits(
274    job_id: JobId,
275    splits: Vec<(i64, Vec<u8>, Vec<u8>, i16)>,
276) -> MetaResult<CdcTableSnapshotSplits> {
277    let split_completed_count = splits
278        .iter()
279        .filter(|(_, _, _, is_backfill_finished)| *is_backfill_finished == 1)
280        .count();
281    assert!(
282        split_completed_count <= 1,
283        "split_completed_count = {}",
284        split_completed_count
285    );
286    let is_backfill_finished = split_completed_count == 1;
287    let splits = if is_backfill_finished {
288        if splits.len() != 1 {
289            // CdcTableBackfillTracker::complete_job rewrites splits in a transaction.
290            // This error should only happen when the meta store reads uncommitted data.
291            tracing::error!(%job_id, ?splits, "unexpected split count");
292            bail!(
293                "unexpected split count: job_id={job_id}, split_total_count={}, split_completed_count={split_completed_count}",
294                splits.len()
295            );
296        }
297        CdcTableSnapshotSplits::Finished
298    } else {
299        let splits: Vec<_> = splits
300            .into_iter()
301            // The try_init_parallel_cdc_table_snapshot_splits ensures that split with a larger split_id will always have a larger left bound.
302            // Assigning consecutive splits to the same actor enables potential optimization in CDC backfill executor.
303            .sorted_by_key(|(split_id, _, _, _)| *split_id)
304            .map(
305                |(split_id, left_bound_inclusive, right_bound_exclusive, _)| {
306                    CdcTableSnapshotSplitRaw {
307                        split_id,
308                        left_bound_inclusive,
309                        right_bound_exclusive,
310                    }
311                },
312            )
313            .collect();
314        CdcTableSnapshotSplits::Backfilling(splits)
315    };
316    Ok(splits)
317}
318
319pub fn single_merged_split() -> CdcTableSnapshotSplitRaw {
320    CdcTableSnapshotSplitRaw {
321        // TODO(zw): remove magic number
322        split_id: 1,
323        left_bound_inclusive: OwnedRow::new(vec![None]).value_serialize(),
324        right_bound_exclusive: OwnedRow::new(vec![None]).value_serialize(),
325    }
326}