risingwave_meta/stream/
cdc.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::iter;
17use std::time::Duration;
18
19use anyhow::Context;
20use futures::pin_mut;
21use futures_async_stream::for_await;
22use itertools::Itertools;
23use risingwave_common::catalog::Schema;
24use risingwave_common::row::Row;
25use risingwave_common::util::iter_util::ZipEqDebug;
26use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
27use risingwave_connector::source::cdc::external::{
28    CdcTableSnapshotSplitOption, CdcTableType, ExternalTableConfig, ExternalTableReader,
29    SchemaTableName,
30};
31use risingwave_connector::source::cdc::{CdcScanOptions, CdcTableSnapshotSplitAssignment};
32use risingwave_connector::source::{CdcTableSnapshotSplit, CdcTableSnapshotSplitRaw};
33use risingwave_meta_model::{TableId, cdc_table_snapshot_split};
34use risingwave_pb::plan_common::ExternalTableDesc;
35use risingwave_pb::stream_plan::stream_node::NodeBody;
36use risingwave_pb::stream_plan::{StreamCdcScanNode, StreamCdcScanOptions};
37use sea_orm::{EntityTrait, Set, TransactionTrait};
38
39use crate::MetaResult;
40use crate::controller::SqlMetaStore;
41use crate::model::{Fragment, StreamJobFragments};
42
43/// A CDC table snapshot splits can only be successfully initialized once.
44/// Subsequent attempts to write to the metastore with the same primary key will be rejected.
45pub(crate) async fn try_init_parallel_cdc_table_snapshot_splits(
46    table_id: u32,
47    table_desc: &ExternalTableDesc,
48    meta_store: &SqlMetaStore,
49    per_table_options: &Option<StreamCdcScanOptions>,
50    insert_batch_size: u64,
51    sleep_split_interval: u64,
52    sleep_duration_millis: u64,
53) -> MetaResult<()> {
54    let split_options = if let Some(per_table_options) = per_table_options {
55        if !CdcScanOptions::from_proto(per_table_options).is_parallelized_backfill() {
56            return Ok(());
57        }
58        CdcTableSnapshotSplitOption {
59            backfill_num_rows_per_split: per_table_options.backfill_num_rows_per_split,
60            backfill_as_even_splits: per_table_options.backfill_as_even_splits,
61            backfill_split_pk_column_index: per_table_options.backfill_split_pk_column_index,
62        }
63    } else {
64        return Ok(());
65    };
66    let table_type = CdcTableType::from_properties(&table_desc.connect_properties);
67    // Filter out additional columns to construct the external table schema
68    let table_schema: Schema = table_desc
69        .columns
70        .iter()
71        .filter(|col| {
72            col.additional_column
73                .as_ref()
74                .is_none_or(|a_col| a_col.column_type.is_none())
75        })
76        .map(Into::into)
77        .collect();
78    let table_pk_indices = table_desc
79        .pk
80        .iter()
81        .map(|k| k.column_index as usize)
82        .collect_vec();
83    let table_config = ExternalTableConfig::try_from_btreemap(
84        table_desc.connect_properties.clone(),
85        table_desc.secret_refs.clone(),
86    )
87    .context("failed to parse external table config")?;
88    let schema_table_name = SchemaTableName::from_properties(&table_desc.connect_properties);
89    let reader = table_type
90        .create_table_reader(
91            table_config,
92            table_schema,
93            table_pk_indices,
94            schema_table_name,
95        )
96        .await?;
97    let stream = reader.get_parallel_cdc_splits(split_options);
98    let mut insert_batch = vec![];
99    let mut splits_num = 0;
100    let txn = meta_store.conn.begin().await?;
101    pin_mut!(stream);
102    #[for_await]
103    for split in stream {
104        let split: CdcTableSnapshotSplit = split?;
105        splits_num += 1;
106        insert_batch.push(cdc_table_snapshot_split::ActiveModel {
107            table_id: Set(table_id.try_into().unwrap()),
108            split_id: Set(split.split_id.to_owned()),
109            left: Set(split.left_bound_inclusive.value_serialize()),
110            right: Set(split.right_bound_exclusive.value_serialize()),
111        });
112        if insert_batch.len() >= insert_batch_size as usize {
113            cdc_table_snapshot_split::Entity::insert_many(std::mem::take(&mut insert_batch))
114                .exec(&txn)
115                .await?;
116        }
117        if splits_num % sleep_split_interval == 0 {
118            tokio::time::sleep(Duration::from_millis(sleep_duration_millis)).await;
119        }
120    }
121    if !insert_batch.is_empty() {
122        cdc_table_snapshot_split::Entity::insert_many(std::mem::take(&mut insert_batch))
123            .exec(&txn)
124            .await?;
125    }
126    txn.commit().await?;
127    // TODO(zw): handle metadata backup&restore
128    Ok(())
129}
130
131/// Returns true if the fragment is CDC scan and has parallelized backfill enabled.
132fn is_parallelized_backfill_enabled_cdc_scan_fragment(fragment: &Fragment) -> bool {
133    let mut b = false;
134    visit_stream_node_cont(&fragment.nodes, |node| {
135        if let Some(NodeBody::StreamCdcScan(node)) = &node.node_body {
136            b = is_parallelized_backfill_enabled(node);
137            false
138        } else {
139            true
140        }
141    });
142    b
143}
144
145pub fn is_parallelized_backfill_enabled(node: &StreamCdcScanNode) -> bool {
146    if let Some(options) = &node.options
147        && CdcScanOptions::from_proto(options).is_parallelized_backfill()
148    {
149        return true;
150    }
151    false
152}
153
154pub(crate) async fn assign_cdc_table_snapshot_splits(
155    table_fragments: impl Iterator<Item = &StreamJobFragments>,
156    meta_store: &SqlMetaStore,
157) -> MetaResult<CdcTableSnapshotSplitAssignment> {
158    let mut assignments = HashMap::default();
159    for job in table_fragments {
160        let mut stream_scan_fragments = job
161            .fragments
162            .values()
163            .filter(|f| is_parallelized_backfill_enabled_cdc_scan_fragment(f))
164            .collect_vec();
165        if stream_scan_fragments.is_empty() {
166            continue;
167        }
168        assert_eq!(stream_scan_fragments.len(), 1);
169        let stream_scan_fragment = stream_scan_fragments.swap_remove(0);
170        let assignment = assign_cdc_table_snapshot_splits_impl(
171            job.stream_job_id.table_id,
172            stream_scan_fragment
173                .actors
174                .iter()
175                .map(|a| a.actor_id)
176                .collect(),
177            meta_store,
178        )
179        .await?;
180        assignments.extend(assignment);
181    }
182    Ok(assignments)
183}
184
185pub(crate) async fn assign_cdc_table_snapshot_splits_pairs(
186    table_id_actor_ids: impl IntoIterator<Item = (u32, HashSet<u32>)>,
187    meta_store: &SqlMetaStore,
188) -> MetaResult<CdcTableSnapshotSplitAssignment> {
189    let mut assignments = HashMap::default();
190    for (table_id, actor_ids) in table_id_actor_ids {
191        assignments
192            .extend(assign_cdc_table_snapshot_splits_impl(table_id, actor_ids, meta_store).await?);
193    }
194    Ok(assignments)
195}
196
197pub(crate) async fn assign_cdc_table_snapshot_splits_impl(
198    table_id: u32,
199    actor_ids: HashSet<u32>,
200    meta_store: &SqlMetaStore,
201) -> MetaResult<CdcTableSnapshotSplitAssignment> {
202    if actor_ids.is_empty() {
203        return Err(anyhow::anyhow!("Expect at least 1 actor, 0 was found.").into());
204    }
205    let mut assignments = HashMap::default();
206    let splits = try_get_cdc_table_snapshot_splits(table_id, meta_store).await?;
207    if splits.is_empty() {
208        return Err(
209            anyhow::anyhow!("Expect at least 1 CDC table snapshot splits, 0 was found.").into(),
210        );
211    }
212    let splits_per_actor = splits.len().div_ceil(actor_ids.len());
213    for (actor_id, splits) in actor_ids.iter().copied().zip_eq_debug(
214        splits
215            .into_iter()
216            .chunks(splits_per_actor)
217            .into_iter()
218            .map(|c| c.collect_vec())
219            .chain(iter::repeat(Vec::default()))
220            .take(actor_ids.len()),
221    ) {
222        assignments.insert(actor_id, splits);
223    }
224    Ok(assignments)
225}
226
227pub(crate) async fn assign_cdc_table_snapshot_splits_for_replace_table(
228    original_table_id: u32,
229    job: &StreamJobFragments,
230    meta_store: &SqlMetaStore,
231) -> MetaResult<CdcTableSnapshotSplitAssignment> {
232    let mut stream_scan_fragments = job
233        .fragments
234        .values()
235        .filter(|f| is_parallelized_backfill_enabled_cdc_scan_fragment(f))
236        .collect_vec();
237    if stream_scan_fragments.is_empty() {
238        return Ok(HashMap::default());
239    }
240    assert_eq!(
241        stream_scan_fragments.len(),
242        1,
243        "Expect 1 scan fragment, {} was found.",
244        stream_scan_fragments.len()
245    );
246    let stream_scan_fragment = stream_scan_fragments.swap_remove(0);
247    let assignment = assign_cdc_table_snapshot_splits_impl(
248        original_table_id,
249        stream_scan_fragment
250            .actors
251            .iter()
252            .map(|a| a.actor_id)
253            .collect(),
254        meta_store,
255    )
256    .await?;
257    Ok(assignment)
258}
259
260pub async fn try_get_cdc_table_snapshot_splits(
261    table_id: u32,
262    meta_store: &SqlMetaStore,
263) -> MetaResult<Vec<CdcTableSnapshotSplitRaw>> {
264    use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
265    let splits: Vec<(i64, Vec<u8>, Vec<u8>)> = cdc_table_snapshot_split::Entity::find()
266        .select_only()
267        .columns([
268            cdc_table_snapshot_split::Column::SplitId,
269            cdc_table_snapshot_split::Column::Left,
270            cdc_table_snapshot_split::Column::Right,
271        ])
272        .filter(
273            cdc_table_snapshot_split::Column::TableId
274                .eq(TryInto::<TableId>::try_into(table_id).unwrap()),
275        )
276        .into_tuple()
277        .all(&meta_store.conn)
278        .await?;
279    let splits: Vec<_> = splits
280        .into_iter()
281        // The try_init_parallel_cdc_table_snapshot_splits ensures that split with a larger split_id will always have a larger left bound.
282        // Assigning consecutive splits to the same actor enables potential optimization in CDC backfill executor.
283        .sorted_by_key(|(split_id, _, _)| *split_id)
284        .map(
285            |(split_id, left_bound_inclusive, right_bound_exclusive)| CdcTableSnapshotSplitRaw {
286                split_id,
287                left_bound_inclusive,
288                right_bound_exclusive,
289            },
290        )
291        .collect();
292    Ok(splits)
293}