risingwave_meta/stream/
cdc.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::iter;
use std::time::Duration;

use anyhow::Context;
use futures::pin_mut;
use futures_async_stream::for_await;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::external::{
    CdcTableSnapshotSplitOption, ExternalCdcTableType, ExternalTableConfig, ExternalTableReader,
    SchemaTableName,
};
use risingwave_connector::source::cdc::{CdcScanOptions, CdcTableSnapshotSplitAssignment};
use risingwave_connector::source::{CdcTableSnapshotSplit, CdcTableSnapshotSplitRaw};
use risingwave_meta_model::{TableId, cdc_table_snapshot_split};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{StreamCdcScanNode, StreamCdcScanOptions};
use sea_orm::{EntityTrait, Set, TransactionTrait};

use crate::MetaResult;
use crate::controller::SqlMetaStore;
use crate::model::{Fragment, StreamJobFragments};
/// A CDC table's snapshot splits can only be successfully initialized once.
/// Subsequent attempts to write the same primary key to the metastore will be rejected.
pub(crate) async fn try_init_parallel_cdc_table_snapshot_splits(
    table_id: u32,
    table_desc: &ExternalTableDesc,
    meta_store: &SqlMetaStore,
    per_table_options: &Option<StreamCdcScanOptions>,
    insert_batch_size: u64,
    sleep_split_interval: u64,
    sleep_duration_millis: u64,
) -> MetaResult<()> {
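    // Snapshot splits are only needed when parallelized backfill is enabled for
    // this table; otherwise there is nothing to initialize.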
    let split_options = if let Some(per_table_options) = per_table_options {
        if !CdcScanOptions::from_proto(per_table_options).is_parallelized_backfill() {
            return Ok(());
        }
        CdcTableSnapshotSplitOption {
            backfill_num_rows_per_split: per_table_options.backfill_num_rows_per_split,
            backfill_as_even_splits: per_table_options.backfill_as_even_splits,
            backfill_split_pk_column_index: per_table_options.backfill_split_pk_column_index,
        }
    } else {
        return Ok(());
    };
    let table_type = ExternalCdcTableType::from_properties(&table_desc.connect_properties);
    // Filter out additional columns to construct the external table schema.
    let table_schema: Schema = table_desc
        .columns
        .iter()
        .filter(|col| {
            col.additional_column
                .as_ref()
                .is_none_or(|a_col| a_col.column_type.is_none())
        })
        .map(Into::into)
        .collect();
    let table_pk_indices = table_desc
        .pk
        .iter()
        .map(|k| k.column_index as usize)
        .collect_vec();
    let table_config = ExternalTableConfig::try_from_btreemap(
        table_desc.connect_properties.clone(),
        table_desc.secret_refs.clone(),
    )
    .context("failed to parse external table config")?;
    let schema_table_name = SchemaTableName::from_properties(&table_desc.connect_properties);
    let reader = table_type
        .create_table_reader(
            table_config,
            table_schema,
            table_pk_indices,
            schema_table_name,
        )
        .await?;
    let stream = reader.get_parallel_cdc_splits(split_options);
    let mut insert_batch = vec![];
    let mut splits_num = 0;
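    // Write all splits within a single transaction so that initialization is
    // all-or-nothing: a concurrent or repeated attempt fails on the primary-key
    // constraint instead of leaving a partially initialized split set.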
    let txn = meta_store.conn.begin().await?;
    pin_mut!(stream);
    #[for_await]
    for split in stream {
        let split: CdcTableSnapshotSplit = split?;
        splits_num += 1;
        insert_batch.push(cdc_table_snapshot_split::ActiveModel {
            table_id: Set(table_id.try_into().unwrap()),
            split_id: Set(split.split_id.to_owned()),
            left: Set(split.left_bound_inclusive.value_serialize()),
            right: Set(split.right_bound_exclusive.value_serialize()),
            is_backfill_finished: Set(0),
        });
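        // Flush a full batch to bound the size of a single INSERT statement.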
        if insert_batch.len() >= insert_batch_size as usize {
            cdc_table_snapshot_split::Entity::insert_many(std::mem::take(&mut insert_batch))
                .exec(&txn)
                .await?;
        }
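        // Periodically yield so that split generation doesn't saturate the
        // external table reader or the meta store.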
        if splits_num % sleep_split_interval == 0 {
            tokio::time::sleep(Duration::from_millis(sleep_duration_millis)).await;
        }
    }
    if !insert_batch.is_empty() {
        cdc_table_snapshot_split::Entity::insert_many(std::mem::take(&mut insert_batch))
            .exec(&txn)
            .await?;
    }
    txn.commit().await?;
    // TODO(zw): handle metadata backup & restore
    Ok(())
}

/// Returns true if the fragment contains a CDC scan node with parallelized backfill enabled.
pub(crate) fn is_parallelized_backfill_enabled_cdc_scan_fragment(fragment: &Fragment) -> bool {
    let mut enabled = false;
    visit_stream_node_cont(&fragment.nodes, |node| {
        if let Some(NodeBody::StreamCdcScan(node)) = &node.node_body {
            enabled = is_parallelized_backfill_enabled(node);
            false
        } else {
            true
        }
    });
    enabled
}

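/// Returns true if the scan node's options enable parallelized backfill.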
pub fn is_parallelized_backfill_enabled(node: &StreamCdcScanNode) -> bool {
    if let Some(options) = &node.options
        && CdcScanOptions::from_proto(options).is_parallelized_backfill()
    {
        return true;
    }
    false
}

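/// Assigns the snapshot splits of `original_table_id` to the actors of the job's
/// single parallelized-backfill CDC scan fragment. Returns an empty assignment if
/// the job has no such fragment.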
pub(crate) async fn assign_cdc_table_snapshot_splits(
    original_table_id: u32,
    job: &StreamJobFragments,
    meta_store: &SqlMetaStore,
) -> MetaResult<CdcTableSnapshotSplitAssignment> {
    let mut stream_scan_fragments = job
        .fragments
        .values()
        .filter(|f| is_parallelized_backfill_enabled_cdc_scan_fragment(f))
        .collect_vec();
    if stream_scan_fragments.is_empty() {
        return Ok(HashMap::default());
    }
    assert_eq!(
        stream_scan_fragments.len(),
        1,
        "Expected exactly 1 scan fragment, but found {}.",
        stream_scan_fragments.len()
    );
    let stream_scan_fragment = stream_scan_fragments.swap_remove(0);
    assign_cdc_table_snapshot_splits_impl(
        original_table_id,
        stream_scan_fragment
            .actors
            .iter()
            .map(|a| a.actor_id)
            .collect(),
        meta_store,
        None,
    )
    .await
}

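/// Batch variant of [`assign_cdc_table_snapshot_splits_impl`]: computes an
/// assignment for each `(table_id, actor_ids)` pair and merges the results.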
pub(crate) async fn assign_cdc_table_snapshot_splits_pairs(
    table_id_actor_ids: impl IntoIterator<Item = (u32, HashSet<u32>)>,
    meta_store: &SqlMetaStore,
    completed_cdc_job_ids: HashSet<u32>,
) -> MetaResult<CdcTableSnapshotSplitAssignment> {
    let mut assignments = HashMap::default();
    for (table_id, actor_ids) in table_id_actor_ids {
        assignments.extend(
            assign_cdc_table_snapshot_splits_impl(
                table_id,
                actor_ids,
                meta_store,
                Some(&completed_cdc_job_ids),
            )
            .await?,
        );
    }
    Ok(assignments)
}

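/// Assigns the snapshot splits of `table_id` to the given actors in contiguous,
/// `split_id`-ordered chunks. Actors left over after all splits are handed out
/// receive an empty assignment.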
pub(crate) async fn assign_cdc_table_snapshot_splits_impl(
    table_id: u32,
    actor_ids: HashSet<u32>,
    meta_store: &SqlMetaStore,
    completed_cdc_job_ids: Option<&HashSet<u32>>,
) -> MetaResult<CdcTableSnapshotSplitAssignment> {
    if actor_ids.is_empty() {
        return Err(anyhow::anyhow!("Expected at least 1 actor, but found 0.").into());
    }
    // For completed jobs, skip the meta store access in
    // try_get_cdc_table_snapshot_splits and use the single merged split directly.
    let splits = if let Some(completed_cdc_job_ids) = completed_cdc_job_ids
        && completed_cdc_job_ids.contains(&table_id)
    {
        vec![single_merged_split()]
    } else {
        try_get_cdc_table_snapshot_splits(table_id, meta_store).await?
    };
    if splits.is_empty() {
        return Err(
            anyhow::anyhow!("Expected at least 1 CDC table snapshot split, but found 0.").into(),
        );
    }
    let splits_per_actor = splits.len().div_ceil(actor_ids.len());
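    // Hand out contiguous chunks of `splits_per_actor` splits; padding with
    // empty `Vec`s ensures every actor receives a (possibly empty) assignment.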
    let mut assignments: CdcTableSnapshotSplitAssignment = HashMap::default();
    for (actor_id, splits) in actor_ids.iter().copied().zip_eq_debug(
        splits
            .into_iter()
            .chunks(splits_per_actor)
            .into_iter()
            .map(|c| c.collect_vec())
            .chain(iter::repeat(Vec::default()))
            .take(actor_ids.len()),
    ) {
        assignments.insert(actor_id, splits);
    }
    Ok(assignments)
}

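/// Fetches the snapshot splits of `table_id` from the metastore, sorted by
/// `split_id` in ascending order.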
pub async fn try_get_cdc_table_snapshot_splits(
    table_id: u32,
    meta_store: &SqlMetaStore,
) -> MetaResult<Vec<CdcTableSnapshotSplitRaw>> {
    use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
    let splits: Vec<(i64, Vec<u8>, Vec<u8>, i16)> = cdc_table_snapshot_split::Entity::find()
        .select_only()
        .columns([
            cdc_table_snapshot_split::Column::SplitId,
            cdc_table_snapshot_split::Column::Left,
            cdc_table_snapshot_split::Column::Right,
            cdc_table_snapshot_split::Column::IsBackfillFinished,
        ])
        .filter(
            cdc_table_snapshot_split::Column::TableId
                .eq(TryInto::<TableId>::try_into(table_id).unwrap()),
        )
        .into_tuple()
        .all(&meta_store.conn)
        .await?;
    let split_completed_count = splits
        .iter()
        .filter(|(_, _, _, is_backfill_finished)| *is_backfill_finished == 1)
        .count();
    assert!(
        split_completed_count <= 1,
        "split_completed_count = {}",
        split_completed_count
    );
    let is_backfill_finished = split_completed_count == 1;
    if is_backfill_finished && splits.len() != 1 {
        // CdcTableBackfillTracker::complete_job rewrites the splits in a transaction,
        // so this error should only occur when the meta store reads uncommitted data.
        tracing::error!(table_id, ?splits, "unexpected split count");
        bail!(
            "unexpected split count: table_id={table_id}, split_total_count={}, split_completed_count={split_completed_count}",
            splits.len()
        );
    }
    let splits: Vec<_> = splits
        .into_iter()
        // `try_init_parallel_cdc_table_snapshot_splits` ensures that a split with a larger
        // `split_id` always has a larger left bound, so assigning consecutive splits to the
        // same actor enables potential optimizations in the CDC backfill executor.
        .sorted_by_key(|(split_id, _, _, _)| *split_id)
        .map(
            |(split_id, left_bound_inclusive, right_bound_exclusive, _)| CdcTableSnapshotSplitRaw {
                split_id,
                left_bound_inclusive,
                right_bound_exclusive,
            },
        )
        .collect();
    Ok(splits)
}

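/// Returns the single placeholder split used once backfill has completed: an
/// unbounded range whose left and right bounds are both serialized NULL rows.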
fn single_merged_split() -> CdcTableSnapshotSplitRaw {
    CdcTableSnapshotSplitRaw {
        // TODO(zw): remove magic number
        split_id: 1,
        left_bound_inclusive: OwnedRow::new(vec![None]).value_serialize(),
        right_bound_exclusive: OwnedRow::new(vec![None]).value_serialize(),
    }
}
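
// A minimal, test-only sketch of the chunked distribution performed in
// `assign_cdc_table_snapshot_splits_impl`, reproduced over plain integers so the
// arithmetic is easy to verify. The `distribute` helper below is illustrative
// and not part of the production code path.
#[cfg(test)]
mod tests {
    use std::iter;

    use itertools::Itertools;

    /// Distributes `num_splits` split indices over `num_actors` actors in
    /// contiguous chunks of `num_splits.div_ceil(num_actors)`, padding surplus
    /// actors with empty assignments, mirroring the logic in
    /// `assign_cdc_table_snapshot_splits_impl`.
    fn distribute(num_splits: usize, num_actors: usize) -> Vec<Vec<usize>> {
        let splits_per_actor = num_splits.div_ceil(num_actors);
        (0..num_splits)
            .chunks(splits_per_actor)
            .into_iter()
            .map(|c| c.collect_vec())
            .chain(iter::repeat(Vec::default()))
            .take(num_actors)
            .collect_vec()
    }

    #[test]
    fn splits_are_assigned_in_contiguous_chunks() {
        // 5 splits over 2 actors: ceil(5 / 2) = 3, so the chunks are [0, 1, 2] and [3, 4].
        assert_eq!(distribute(5, 2), vec![vec![0, 1, 2], vec![3, 4]]);
        // 2 splits over 4 actors: two actors each get one split, the rest get none.
        assert_eq!(distribute(2, 4), vec![vec![0], vec![1], vec![], vec![]]);
        // Exact division: 4 splits over 2 actors yields two chunks of two.
        assert_eq!(distribute(4, 2), vec![vec![0, 1], vec![2, 3]]);
    }
}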