use std::collections::{HashMap, HashSet};
use std::iter;
use std::time::Duration;

use anyhow::Context;
use futures::pin_mut;
use futures_async_stream::for_await;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, Schema};
use risingwave_common::id::{ActorId, JobId};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::external::{
    CdcTableSnapshotSplitOption, ExternalCdcTableType, ExternalTableConfig, ExternalTableReader,
    SchemaTableName,
};
use risingwave_connector::source::cdc::{CdcScanOptions, build_cdc_table_snapshot_split};
use risingwave_connector::source::{CdcTableSnapshotSplit, CdcTableSnapshotSplitRaw};
use risingwave_meta_model::cdc_table_snapshot_split::Relation::Object;
use risingwave_meta_model::{cdc_table_snapshot_split, object};
use risingwave_meta_model_migration::JoinType;
use risingwave_pb::id::{DatabaseId, TableId};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::source::PbCdcTableSnapshotSplits;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{StreamCdcScanNode, StreamCdcScanOptions, StreamNode};
use sea_orm::{
    ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, QuerySelect, RelationTrait, Set,
    TransactionTrait,
};

use crate::MetaResult;
use crate::controller::SqlMetaStore;
use crate::model::Fragment;

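/// Computes parallel CDC snapshot splits for the given external table and persists
/// them to the meta store within a single transaction, returning the splits in
/// serialized form.
///
/// Inserts are flushed in batches of `insert_batch_size`, and the loop sleeps for
/// `sleep_duration_millis` after every `sleep_split_interval` splits to throttle
/// load. Assumes `sleep_split_interval > 0` (it is used as a modulus).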
pub(crate) async fn try_init_parallel_cdc_table_snapshot_splits(
    table_id: TableId,
    table_desc: &ExternalTableDesc,
    meta_store: &SqlMetaStore,
    per_table_options: &StreamCdcScanOptions,
    insert_batch_size: u64,
    sleep_split_interval: u64,
    sleep_duration_millis: u64,
) -> MetaResult<Vec<CdcTableSnapshotSplitRaw>> {
    let split_options = CdcTableSnapshotSplitOption {
        backfill_num_rows_per_split: per_table_options.backfill_num_rows_per_split,
        backfill_as_even_splits: per_table_options.backfill_as_even_splits,
        backfill_split_pk_column_index: per_table_options.backfill_split_pk_column_index,
    };
    let table_type = ExternalCdcTableType::from_properties(&table_desc.connect_properties);
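    // Keep only columns that exist in the upstream table: columns carrying an
    // additional (CDC metadata) column type are excluded from the schema handed
    // to the external table reader.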
    let table_schema: Schema = table_desc
        .columns
        .iter()
        .filter(|col| {
            col.additional_column
                .as_ref()
                .is_none_or(|a_col| a_col.column_type.is_none())
        })
        .map(Into::into)
        .collect();
    let table_pk_indices = table_desc
        .pk
        .iter()
        .map(|k| k.column_index as usize)
        .collect_vec();
    let table_config = ExternalTableConfig::try_from_btreemap(
        table_desc.connect_properties.clone(),
        table_desc.secret_refs.clone(),
    )
    .context("failed to parse external table config")?;
    let schema_table_name = SchemaTableName::from_properties(&table_desc.connect_properties);
    let reader = table_type
        .create_table_reader(
            table_config,
            table_schema,
            table_pk_indices,
            schema_table_name,
        )
        .await?;
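    // Stream splits from the external reader, persisting them in batches inside one
    // transaction and sleeping periodically to throttle the scan.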
    let stream = reader.get_parallel_cdc_splits(split_options);
    let mut insert_batch = vec![];
    let mut splits_num = 0;
    let mut splits = vec![];
    let txn = meta_store.conn.begin().await?;
    pin_mut!(stream);
    #[for_await]
    for split in stream {
        let split: CdcTableSnapshotSplit = split?;
        splits_num += 1;
        let left = split.left_bound_inclusive.value_serialize();
        let right = split.right_bound_exclusive.value_serialize();
        insert_batch.push(cdc_table_snapshot_split::ActiveModel {
            table_id: Set(table_id.as_job_id()),
            split_id: Set(split.split_id.to_owned()),
            left: Set(left.clone()),
            right: Set(right.clone()),
            is_backfill_finished: Set(0),
        });
        splits.push(CdcTableSnapshotSplitRaw {
            split_id: split.split_id,
            left_bound_inclusive: left,
            right_bound_exclusive: right,
        });
        if insert_batch.len() >= insert_batch_size as usize {
            cdc_table_snapshot_split::Entity::insert_many(std::mem::take(&mut insert_batch))
                .exec(&txn)
                .await?;
        }
        if splits_num % sleep_split_interval == 0 {
            tokio::time::sleep(Duration::from_millis(sleep_duration_millis)).await;
        }
    }
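    // Flush any splits left in the final partial batch before committing.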
    if !insert_batch.is_empty() {
        cdc_table_snapshot_split::Entity::insert_many(std::mem::take(&mut insert_batch))
            .exec(&txn)
            .await?;
    }
    txn.commit().await?;
    Ok(splits)
}

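/// Returns the `StreamCdcScanNode` of the fragment if it is a CDC scan fragment with
/// parallelized backfill enabled; otherwise returns `None`.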
pub(crate) fn is_parallelized_backfill_enabled_cdc_scan_fragment(
    fragment_type_mask: FragmentTypeMask,
    nodes: &StreamNode,
) -> Option<&StreamCdcScanNode> {
    if !fragment_type_mask.contains(FragmentTypeFlag::StreamCdcScan) {
        return None;
    }
    let mut stream_cdc_scan = None;
    visit_stream_node_cont(nodes, |node| {
        if let Some(NodeBody::StreamCdcScan(node)) = &node.node_body {
            if is_parallelized_backfill_enabled(node) {
                stream_cdc_scan = Some(&**node);
            }
            false
        } else {
            true
        }
    });
    stream_cdc_scan
}

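/// Whether the CDC scan node's options enable parallelized backfill.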
fn is_parallelized_backfill_enabled(node: &StreamCdcScanNode) -> bool {
    if let Some(options) = &node.options
        && CdcScanOptions::from_proto(options).is_parallelized_backfill()
    {
        return true;
    }
    false
}

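/// Finds the parallelized-backfill CDC scan fragment among `fragments`, returning it
/// together with its `StreamCdcScanNode`. Asserts that at most one such fragment
/// exists.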
pub(crate) fn parallel_cdc_table_backfill_fragment<'a>(
    fragments: impl Iterator<Item = &'a Fragment>,
) -> Option<(&'a Fragment, &'a StreamCdcScanNode)> {
    let mut stream_scan_fragments = fragments.filter_map(|f| {
        is_parallelized_backfill_enabled_cdc_scan_fragment(f.fragment_type_mask, &f.nodes)
            .map(|cdc_scan| (f, cdc_scan))
    });
    let fragment = stream_scan_fragments.next()?;
    assert_eq!(
        stream_scan_fragments.count(),
        0,
        "Expected no remaining CDC scan fragment",
    );
    Some(fragment)
}

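/// Assigns snapshot splits to actors in contiguous chunks: each actor receives up to
/// `ceil(splits.len() / actor_ids.len())` consecutive splits, and trailing actors
/// receive empty assignments once the splits are exhausted. For example, 5 splits
/// over 3 actors are distributed as chunks of sizes 2, 2, and 1.
///
/// Errors if `actor_ids` or `splits` is empty.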
pub(crate) fn assign_cdc_table_snapshot_splits(
    actor_ids: HashSet<ActorId>,
    splits: &[CdcTableSnapshotSplitRaw],
    generation: u64,
) -> MetaResult<HashMap<ActorId, PbCdcTableSnapshotSplits>> {
    if actor_ids.is_empty() {
        return Err(anyhow::anyhow!("Expected at least 1 actor, found 0.").into());
    }
    if splits.is_empty() {
        return Err(
            anyhow::anyhow!("Expected at least 1 CDC table snapshot split, found 0.").into(),
        );
    }
    let splits_per_actor = splits.len().div_ceil(actor_ids.len());
    let mut assignments = HashMap::new();
    for (actor_id, splits) in actor_ids.iter().copied().zip_eq_debug(
        splits
            .iter()
            .map(build_cdc_table_snapshot_split)
            .chunks(splits_per_actor)
            .into_iter()
            .map(|c| c.collect_vec())
            .chain(iter::repeat(Vec::default()))
            .take(actor_ids.len()),
    ) {
        assignments.insert(actor_id, PbCdcTableSnapshotSplits { splits, generation });
    }
    Ok(assignments)
}

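/// Snapshot-split state of a CDC table backfill job, as reloaded from the meta store.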
#[derive(Debug)]
pub enum CdcTableSnapshotSplits {
    Backfilling(Vec<CdcTableSnapshotSplitRaw>),
    Finished,
}

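/// Reloads persisted CDC table snapshot splits from the meta store, optionally
/// restricted to a single database, and groups them per job.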
pub async fn reload_cdc_table_snapshot_splits(
    txn: &impl ConnectionTrait,
    database_id: Option<DatabaseId>,
) -> MetaResult<HashMap<JobId, CdcTableSnapshotSplits>> {
    let columns = [
        cdc_table_snapshot_split::Column::TableId,
        cdc_table_snapshot_split::Column::SplitId,
        cdc_table_snapshot_split::Column::Left,
        cdc_table_snapshot_split::Column::Right,
        cdc_table_snapshot_split::Column::IsBackfillFinished,
    ];
    #[expect(clippy::type_complexity)]
    let all_splits: Vec<(JobId, i64, Vec<u8>, Vec<u8>, i16)> =
        if let Some(database_id) = database_id {
            cdc_table_snapshot_split::Entity::find()
                .join(JoinType::LeftJoin, Object.def())
                .select_only()
                .columns(columns)
                .filter(object::Column::DatabaseId.eq(database_id))
                .into_tuple()
                .all(txn)
                .await?
        } else {
            cdc_table_snapshot_split::Entity::find()
                .select_only()
                .columns(columns)
                .into_tuple()
                .all(txn)
                .await?
        };

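    // Group the flat rows by job id so each job's splits can be composed independently.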
    let mut job_splits = HashMap::<_, Vec<_>>::new();
    for (job_id, split_id, left, right, is_backfill_finished) in all_splits {
        job_splits
            .entry(job_id)
            .or_default()
            .push((split_id, left, right, is_backfill_finished));
    }

    job_splits
        .into_iter()
        .map(|(job_id, splits)| {
            let splits = compose_job_splits(job_id, splits)?;
            Ok((job_id, splits))
        })
        .try_collect()
}

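/// Composes a job's persisted split rows into a `CdcTableSnapshotSplits` state.
///
/// A job is considered finished when its splits have been merged into a single split
/// marked backfill-finished; a finished split coexisting with other splits is an
/// error. Otherwise the splits are returned sorted by split id as `Backfilling`.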
pub fn compose_job_splits(
    job_id: JobId,
    splits: Vec<(i64, Vec<u8>, Vec<u8>, i16)>,
) -> MetaResult<CdcTableSnapshotSplits> {
    let split_completed_count = splits
        .iter()
        .filter(|(_, _, _, is_backfill_finished)| *is_backfill_finished == 1)
        .count();
    assert!(
        split_completed_count <= 1,
        "split_completed_count = {}",
        split_completed_count
    );
    let is_backfill_finished = split_completed_count == 1;
    let splits = if is_backfill_finished {
        if splits.len() != 1 {
            tracing::error!(%job_id, ?splits, "unexpected split count");
            bail!(
                "unexpected split count: job_id={job_id}, split_total_count={}, split_completed_count={split_completed_count}",
                splits.len()
            );
        }
        CdcTableSnapshotSplits::Finished
    } else {
        let splits: Vec<_> = splits
            .into_iter()
            .sorted_by_key(|(split_id, _, _, _)| *split_id)
            .map(
                |(split_id, left_bound_inclusive, right_bound_exclusive, _)| {
                    CdcTableSnapshotSplitRaw {
                        split_id,
                        left_bound_inclusive,
                        right_bound_exclusive,
                    }
                },
            )
            .collect();
        CdcTableSnapshotSplits::Backfilling(splits)
    };
    Ok(splits)
}

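/// Returns the single merged split covering the entire table: split id 1 with
/// serialized NULL rows as its (unbounded) left and right bounds.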
pub fn single_merged_split() -> CdcTableSnapshotSplitRaw {
    CdcTableSnapshotSplitRaw {
        split_id: 1,
        left_bound_inclusive: OwnedRow::new(vec![None]).value_serialize(),
        right_bound_exclusive: OwnedRow::new(vec![None]).value_serialize(),
    }
}