risingwave_meta/stream/cdc.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::iter;
use std::time::Duration;

use anyhow::Context;
use futures::pin_mut;
use futures_async_stream::for_await;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::id::{ActorId, JobId};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::external::{
    CdcTableSnapshotSplitOption, ExternalCdcTableType, ExternalTableConfig, ExternalTableReader,
    SchemaTableName,
};
use risingwave_connector::source::cdc::{CdcScanOptions, CdcTableSnapshotSplitAssignment};
use risingwave_connector::source::{CdcTableSnapshotSplit, CdcTableSnapshotSplitRaw};
use risingwave_meta_model::cdc_table_snapshot_split;
use risingwave_pb::id::TableId;
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{StreamCdcScanNode, StreamCdcScanOptions};
use sea_orm::{EntityTrait, Set, TransactionTrait};

use crate::MetaResult;
use crate::controller::SqlMetaStore;
use crate::model::{Fragment, StreamJobFragments};

/// A CDC table's snapshot splits can only be successfully initialized once.
/// Subsequent attempts to write to the meta store under the same primary key will be rejected.
pub(crate) async fn try_init_parallel_cdc_table_snapshot_splits(
    table_id: TableId,
    table_desc: &ExternalTableDesc,
    meta_store: &SqlMetaStore,
    per_table_options: &Option<StreamCdcScanOptions>,
    insert_batch_size: u64,
    sleep_split_interval: u64,
    sleep_duration_millis: u64,
) -> MetaResult<()> {
    let split_options = if let Some(per_table_options) = per_table_options {
        if !CdcScanOptions::from_proto(per_table_options).is_parallelized_backfill() {
            return Ok(());
        }
        CdcTableSnapshotSplitOption {
            backfill_num_rows_per_split: per_table_options.backfill_num_rows_per_split,
            backfill_as_even_splits: per_table_options.backfill_as_even_splits,
            backfill_split_pk_column_index: per_table_options.backfill_split_pk_column_index,
        }
    } else {
        return Ok(());
    };
    let table_type = ExternalCdcTableType::from_properties(&table_desc.connect_properties);
    // Filter out additional columns to construct the external table schema.
    let table_schema: Schema = table_desc
        .columns
        .iter()
        .filter(|col| {
            col.additional_column
                .as_ref()
                .is_none_or(|a_col| a_col.column_type.is_none())
        })
        .map(Into::into)
        .collect();
    let table_pk_indices = table_desc
        .pk
        .iter()
        .map(|k| k.column_index as usize)
        .collect_vec();
    let table_config = ExternalTableConfig::try_from_btreemap(
        table_desc.connect_properties.clone(),
        table_desc.secret_refs.clone(),
    )
    .context("failed to parse external table config")?;
    let schema_table_name = SchemaTableName::from_properties(&table_desc.connect_properties);
    let reader = table_type
        .create_table_reader(
            table_config,
            table_schema,
            table_pk_indices,
            schema_table_name,
        )
        .await?;
    // Stream splits from the external table reader and persist them in batches
    // within a single transaction.
    let stream = reader.get_parallel_cdc_splits(split_options);
    let mut insert_batch = vec![];
    let mut splits_num = 0;
    let txn = meta_store.conn.begin().await?;
    pin_mut!(stream);
    #[for_await]
    for split in stream {
        let split: CdcTableSnapshotSplit = split?;
        splits_num += 1;
        insert_batch.push(cdc_table_snapshot_split::ActiveModel {
            table_id: Set(table_id.as_job_id()),
            split_id: Set(split.split_id.to_owned()),
            left: Set(split.left_bound_inclusive.value_serialize()),
            right: Set(split.right_bound_exclusive.value_serialize()),
            is_backfill_finished: Set(0),
        });
        if insert_batch.len() >= insert_batch_size as usize {
            cdc_table_snapshot_split::Entity::insert_many(std::mem::take(&mut insert_batch))
                .exec(&txn)
                .await?;
        }
        // Periodically yield to throttle split generation.
        if splits_num % sleep_split_interval == 0 {
            tokio::time::sleep(Duration::from_millis(sleep_duration_millis)).await;
        }
    }
    // Flush the final partial batch, if any.
    if !insert_batch.is_empty() {
        cdc_table_snapshot_split::Entity::insert_many(std::mem::take(&mut insert_batch))
            .exec(&txn)
            .await?;
    }
    txn.commit().await?;
    // TODO(zw): handle metadata backup & restore
    Ok(())
}
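
// A minimal, self-contained sketch of the batch-flush pattern used by
// `try_init_parallel_cdc_table_snapshot_splits`: full batches are flushed
// inside the loop and the final partial batch is flushed after it. The batch
// size below is a hypothetical value, not the real configuration default, and
// the periodic sleep that throttles split generation is elided.
#[cfg(test)]
mod batch_flush_tests {
    #[test]
    fn flushes_full_batches_then_the_final_partial_batch() {
        const BATCH_SIZE: usize = 4; // hypothetical, not the real default
        let mut batch = Vec::new();
        let mut flushed: Vec<Vec<u32>> = Vec::new();
        for item in 0..10u32 {
            batch.push(item);
            if batch.len() >= BATCH_SIZE {
                // Stand-in for `insert_many(...).exec(&txn)`.
                flushed.push(std::mem::take(&mut batch));
            }
        }
        if !batch.is_empty() {
            flushed.push(std::mem::take(&mut batch));
        }
        assert_eq!(
            flushed,
            vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7], vec![8, 9]]
        );
    }
}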

/// Returns true if the fragment is a CDC scan fragment with parallelized backfill enabled.
pub(crate) fn is_parallelized_backfill_enabled_cdc_scan_fragment(fragment: &Fragment) -> bool {
    let mut enabled = false;
    visit_stream_node_cont(&fragment.nodes, |node| {
        if let Some(NodeBody::StreamCdcScan(node)) = &node.node_body {
            enabled = is_parallelized_backfill_enabled(node);
            // Stop visiting once the CDC scan node is found.
            false
        } else {
            true
        }
    });
    enabled
}

pub fn is_parallelized_backfill_enabled(node: &StreamCdcScanNode) -> bool {
    node.options
        .as_ref()
        .is_some_and(|options| CdcScanOptions::from_proto(options).is_parallelized_backfill())
}

pub(crate) async fn assign_cdc_table_snapshot_splits(
    original_table_job_id: JobId,
    job: &StreamJobFragments,
    meta_store: &SqlMetaStore,
) -> MetaResult<CdcTableSnapshotSplitAssignment> {
    let mut stream_scan_fragments = job
        .fragments
        .values()
        .filter(|f| is_parallelized_backfill_enabled_cdc_scan_fragment(f))
        .collect_vec();
    if stream_scan_fragments.is_empty() {
        return Ok(HashMap::default());
    }
    assert_eq!(
        stream_scan_fragments.len(),
        1,
        "Expected exactly 1 CDC scan fragment, but {} were found.",
        stream_scan_fragments.len()
    );
    let stream_scan_fragment = stream_scan_fragments.swap_remove(0);
    assign_cdc_table_snapshot_splits_impl(
        original_table_job_id,
        stream_scan_fragment
            .actors
            .iter()
            .map(|a| a.actor_id)
            .collect(),
        meta_store,
        None,
    )
    .await
}

pub(crate) async fn assign_cdc_table_snapshot_splits_pairs(
    table_id_actor_ids: impl IntoIterator<Item = (JobId, HashSet<ActorId>)>,
    meta_store: &SqlMetaStore,
    completed_cdc_job_ids: HashSet<JobId>,
) -> MetaResult<CdcTableSnapshotSplitAssignment> {
    let mut assignments = HashMap::default();
    for (table_id, actor_ids) in table_id_actor_ids {
        assignments.extend(
            assign_cdc_table_snapshot_splits_impl(
                table_id,
                actor_ids,
                meta_store,
                Some(&completed_cdc_job_ids),
            )
            .await?,
        );
    }
    Ok(assignments)
}

pub(crate) async fn assign_cdc_table_snapshot_splits_impl(
    table_job_id: JobId,
    actor_ids: HashSet<ActorId>,
    meta_store: &SqlMetaStore,
    completed_cdc_job_ids: Option<&HashSet<JobId>>,
) -> MetaResult<CdcTableSnapshotSplitAssignment> {
    if actor_ids.is_empty() {
        return Err(anyhow::anyhow!("Expected at least 1 actor, but none were found.").into());
    }
    // If the job is already known to have completed backfill, skip the meta
    // store lookup in `try_get_cdc_table_snapshot_splits`.
    let splits = if let Some(completed_cdc_job_ids) = completed_cdc_job_ids
        && completed_cdc_job_ids.contains(&table_job_id)
    {
        vec![single_merged_split()]
    } else {
        try_get_cdc_table_snapshot_splits(table_job_id, meta_store).await?
    };
    if splits.is_empty() {
        return Err(
            anyhow::anyhow!("Expected at least 1 CDC table snapshot split, but none were found.")
                .into(),
        );
    }
    // Distribute splits evenly: each actor gets at most `ceil(splits / actors)`
    // consecutive splits; actors beyond the last chunk get empty assignments.
    let splits_per_actor = splits.len().div_ceil(actor_ids.len());
    let mut assignments: HashMap<
        ActorId,
        Vec<risingwave_connector::source::CdcTableSnapshotSplitCommon<Vec<u8>>>,
        _,
    > = HashMap::default();
    for (actor_id, splits) in actor_ids.iter().copied().zip_eq_debug(
        splits
            .into_iter()
            .chunks(splits_per_actor)
            .into_iter()
            .map(|c| c.collect_vec())
            .chain(iter::repeat(Vec::default()))
            .take(actor_ids.len()),
    ) {
        assignments.insert(actor_id, splits);
    }
    Ok(assignments)
}
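
// A minimal sketch, over plain integers, of the chunking scheme used by
// `assign_cdc_table_snapshot_splits_impl` above: `div_ceil` sizes the chunks
// and trailing actors are padded with empty vectors, so every actor receives
// an assignment, possibly an empty one. The `distribute` helper is a test-only
// stand-in, not part of the assignment API.
#[cfg(test)]
mod split_distribution_tests {
    use super::*;

    fn distribute(splits: Vec<u64>, num_actors: usize) -> Vec<Vec<u64>> {
        let splits_per_actor = splits.len().div_ceil(num_actors);
        splits
            .into_iter()
            .chunks(splits_per_actor)
            .into_iter()
            .map(|c| c.collect_vec())
            .chain(iter::repeat(Vec::default()))
            .take(num_actors)
            .collect()
    }

    #[test]
    fn uneven_split_counts_pad_with_empty_assignments() {
        // 5 splits over 3 actors: ceil(5 / 3) = 2, so the actors get 2, 2, and 1.
        assert_eq!(
            distribute(vec![1, 2, 3, 4, 5], 3),
            vec![vec![1, 2], vec![3, 4], vec![5]]
        );
        // A completed job is reduced to a single merged split: with 3 actors,
        // the first actor gets that split and the rest get empty assignments.
        assert_eq!(distribute(vec![1], 3), vec![vec![1], vec![], vec![]]);
    }
}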

pub async fn try_get_cdc_table_snapshot_splits(
    job_id: JobId,
    meta_store: &SqlMetaStore,
) -> MetaResult<Vec<CdcTableSnapshotSplitRaw>> {
    use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
    let splits: Vec<(i64, Vec<u8>, Vec<u8>, i16)> = cdc_table_snapshot_split::Entity::find()
        .select_only()
        .columns([
            cdc_table_snapshot_split::Column::SplitId,
            cdc_table_snapshot_split::Column::Left,
            cdc_table_snapshot_split::Column::Right,
            cdc_table_snapshot_split::Column::IsBackfillFinished,
        ])
        .filter(cdc_table_snapshot_split::Column::TableId.eq(job_id.as_mv_table_id()))
        .into_tuple()
        .all(&meta_store.conn)
        .await?;
    // At most one split may be marked completed, since completing a job
    // rewrites all splits into a single merged one.
    let split_completed_count = splits
        .iter()
        .filter(|(_, _, _, is_backfill_finished)| *is_backfill_finished == 1)
        .count();
    assert!(
        split_completed_count <= 1,
        "split_completed_count = {}",
        split_completed_count
    );
    let is_backfill_finished = split_completed_count == 1;
    if is_backfill_finished && splits.len() != 1 {
        // `CdcTableBackfillTracker::complete_job` rewrites splits in a transaction,
        // so this error should only occur when the meta store reads uncommitted data.
        tracing::error!(%job_id, ?splits, "unexpected split count");
        bail!(
            "unexpected split count: job_id={job_id}, split_total_count={}, split_completed_count={split_completed_count}",
            splits.len()
        );
    }
    let splits: Vec<_> = splits
        .into_iter()
        // `try_init_parallel_cdc_table_snapshot_splits` ensures that a split with a
        // larger split_id always has a larger left bound, so sorting by split_id yields
        // consecutive key ranges. Assigning consecutive splits to the same actor
        // enables potential optimizations in the CDC backfill executor.
        .sorted_by_key(|(split_id, _, _, _)| *split_id)
        .map(
            |(split_id, left_bound_inclusive, right_bound_exclusive, _)| CdcTableSnapshotSplitRaw {
                split_id,
                left_bound_inclusive,
                right_bound_exclusive,
            },
        )
        .collect();
    Ok(splits)
}

fn single_merged_split() -> CdcTableSnapshotSplitRaw {
    CdcTableSnapshotSplitRaw {
        // TODO(zw): remove magic number
        split_id: 1,
        // Single-column NULL rows serve as both bounds of the merged split.
        left_bound_inclusive: OwnedRow::new(vec![None]).value_serialize(),
        right_bound_exclusive: OwnedRow::new(vec![None]).value_serialize(),
    }
}
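
// A minimal sketch of what `single_merged_split` encodes: both bounds are
// single-column NULL rows serialized with the value encoding, assumed here to
// denote an unbounded (full-table) key range once backfill has completed.
#[cfg(test)]
mod merged_split_tests {
    use super::*;

    #[test]
    fn merged_split_bounds_are_serialized_null_rows() {
        let split = single_merged_split();
        let null_row_bytes = OwnedRow::new(vec![None]).value_serialize();
        assert_eq!(split.left_bound_inclusive, null_row_bytes);
        assert_eq!(split.right_bound_exclusive, null_row_bytes);
    }
}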