1use std::collections::{HashMap, HashSet};
16use std::iter;
17use std::time::Duration;
18
19use anyhow::Context;
20use futures::pin_mut;
21use futures_async_stream::for_await;
22use itertools::Itertools;
23use risingwave_common::bail;
24use risingwave_common::catalog::Schema;
25use risingwave_common::row::{OwnedRow, Row};
26use risingwave_common::util::iter_util::ZipEqDebug;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
28use risingwave_connector::source::cdc::external::{
29 CdcTableSnapshotSplitOption, ExternalCdcTableType, ExternalTableConfig, ExternalTableReader,
30 SchemaTableName,
31};
32use risingwave_connector::source::cdc::{CdcScanOptions, CdcTableSnapshotSplitAssignment};
33use risingwave_connector::source::{CdcTableSnapshotSplit, CdcTableSnapshotSplitRaw};
34use risingwave_meta_model::{TableId, cdc_table_snapshot_split};
35use risingwave_pb::plan_common::ExternalTableDesc;
36use risingwave_pb::stream_plan::stream_node::NodeBody;
37use risingwave_pb::stream_plan::{StreamCdcScanNode, StreamCdcScanOptions};
38use sea_orm::{EntityTrait, Set, TransactionTrait};
39
40use crate::MetaResult;
41use crate::controller::SqlMetaStore;
42use crate::model::{Fragment, StreamJobFragments};
43
44pub(crate) async fn try_init_parallel_cdc_table_snapshot_splits(
47 table_id: u32,
48 table_desc: &ExternalTableDesc,
49 meta_store: &SqlMetaStore,
50 per_table_options: &Option<StreamCdcScanOptions>,
51 insert_batch_size: u64,
52 sleep_split_interval: u64,
53 sleep_duration_millis: u64,
54) -> MetaResult<()> {
55 let split_options = if let Some(per_table_options) = per_table_options {
56 if !CdcScanOptions::from_proto(per_table_options).is_parallelized_backfill() {
57 return Ok(());
58 }
59 CdcTableSnapshotSplitOption {
60 backfill_num_rows_per_split: per_table_options.backfill_num_rows_per_split,
61 backfill_as_even_splits: per_table_options.backfill_as_even_splits,
62 backfill_split_pk_column_index: per_table_options.backfill_split_pk_column_index,
63 }
64 } else {
65 return Ok(());
66 };
67 let table_type = ExternalCdcTableType::from_properties(&table_desc.connect_properties);
68 let table_schema: Schema = table_desc
70 .columns
71 .iter()
72 .filter(|col| {
73 col.additional_column
74 .as_ref()
75 .is_none_or(|a_col| a_col.column_type.is_none())
76 })
77 .map(Into::into)
78 .collect();
79 let table_pk_indices = table_desc
80 .pk
81 .iter()
82 .map(|k| k.column_index as usize)
83 .collect_vec();
84 let table_config = ExternalTableConfig::try_from_btreemap(
85 table_desc.connect_properties.clone(),
86 table_desc.secret_refs.clone(),
87 )
88 .context("failed to parse external table config")?;
89 let schema_table_name = SchemaTableName::from_properties(&table_desc.connect_properties);
90 let reader = table_type
91 .create_table_reader(
92 table_config,
93 table_schema,
94 table_pk_indices,
95 schema_table_name,
96 )
97 .await?;
98 let stream = reader.get_parallel_cdc_splits(split_options);
99 let mut insert_batch = vec![];
100 let mut splits_num = 0;
101 let txn = meta_store.conn.begin().await?;
102 pin_mut!(stream);
103 #[for_await]
104 for split in stream {
105 let split: CdcTableSnapshotSplit = split?;
106 splits_num += 1;
107 insert_batch.push(cdc_table_snapshot_split::ActiveModel {
108 table_id: Set(table_id.try_into().unwrap()),
109 split_id: Set(split.split_id.to_owned()),
110 left: Set(split.left_bound_inclusive.value_serialize()),
111 right: Set(split.right_bound_exclusive.value_serialize()),
112 is_backfill_finished: Set(0),
113 });
114 if insert_batch.len() >= insert_batch_size as usize {
115 cdc_table_snapshot_split::Entity::insert_many(std::mem::take(&mut insert_batch))
116 .exec(&txn)
117 .await?;
118 }
119 if splits_num % sleep_split_interval == 0 {
120 tokio::time::sleep(Duration::from_millis(sleep_duration_millis)).await;
121 }
122 }
123 if !insert_batch.is_empty() {
124 cdc_table_snapshot_split::Entity::insert_many(std::mem::take(&mut insert_batch))
125 .exec(&txn)
126 .await?;
127 }
128 txn.commit().await?;
129 Ok(())
131}
132
133pub(crate) fn is_parallelized_backfill_enabled_cdc_scan_fragment(fragment: &Fragment) -> bool {
135 let mut b = false;
136 visit_stream_node_cont(&fragment.nodes, |node| {
137 if let Some(NodeBody::StreamCdcScan(node)) = &node.node_body {
138 b = is_parallelized_backfill_enabled(node);
139 false
140 } else {
141 true
142 }
143 });
144 b
145}
146
147pub fn is_parallelized_backfill_enabled(node: &StreamCdcScanNode) -> bool {
148 if let Some(options) = &node.options
149 && CdcScanOptions::from_proto(options).is_parallelized_backfill()
150 {
151 return true;
152 }
153 false
154}
155
156pub(crate) async fn assign_cdc_table_snapshot_splits(
157 original_table_id: u32,
158 job: &StreamJobFragments,
159 meta_store: &SqlMetaStore,
160) -> MetaResult<CdcTableSnapshotSplitAssignment> {
161 let mut stream_scan_fragments = job
162 .fragments
163 .values()
164 .filter(|f| is_parallelized_backfill_enabled_cdc_scan_fragment(f))
165 .collect_vec();
166 if stream_scan_fragments.is_empty() {
167 return Ok(HashMap::default());
168 }
169 assert_eq!(
170 stream_scan_fragments.len(),
171 1,
172 "Expect 1 scan fragment, {} was found.",
173 stream_scan_fragments.len()
174 );
175 let stream_scan_fragment = stream_scan_fragments.swap_remove(0);
176 assign_cdc_table_snapshot_splits_impl(
177 original_table_id,
178 stream_scan_fragment
179 .actors
180 .iter()
181 .map(|a| a.actor_id)
182 .collect(),
183 meta_store,
184 None,
185 )
186 .await
187}
188
189pub(crate) async fn assign_cdc_table_snapshot_splits_pairs(
190 table_id_actor_ids: impl IntoIterator<Item = (u32, HashSet<u32>)>,
191 meta_store: &SqlMetaStore,
192 completed_cdc_job_ids: HashSet<u32>,
193) -> MetaResult<CdcTableSnapshotSplitAssignment> {
194 let mut assignments = HashMap::default();
195 for (table_id, actor_ids) in table_id_actor_ids {
196 assignments.extend(
197 assign_cdc_table_snapshot_splits_impl(
198 table_id,
199 actor_ids,
200 meta_store,
201 Some(&completed_cdc_job_ids),
202 )
203 .await?,
204 );
205 }
206 Ok(assignments)
207}
208
209pub(crate) async fn assign_cdc_table_snapshot_splits_impl(
210 table_id: u32,
211 actor_ids: HashSet<u32>,
212 meta_store: &SqlMetaStore,
213 completed_cdc_job_ids: Option<&HashSet<u32>>,
214) -> MetaResult<CdcTableSnapshotSplitAssignment> {
215 if actor_ids.is_empty() {
216 return Err(anyhow::anyhow!("Expect at least 1 actor, 0 was found.").into());
217 }
218 let splits = if let Some(completed_cdc_job_ids) = completed_cdc_job_ids
220 && completed_cdc_job_ids.contains(&table_id)
221 {
222 vec![single_merged_split()]
223 } else {
224 try_get_cdc_table_snapshot_splits(table_id, meta_store).await?
225 };
226 if splits.is_empty() {
227 return Err(
228 anyhow::anyhow!("Expect at least 1 CDC table snapshot splits, 0 was found.").into(),
229 );
230 }
231 let splits_per_actor = splits.len().div_ceil(actor_ids.len());
232 let mut assignments: HashMap<
233 u32,
234 Vec<risingwave_connector::source::CdcTableSnapshotSplitCommon<Vec<u8>>>,
235 _,
236 > = HashMap::default();
237 for (actor_id, splits) in actor_ids.iter().copied().zip_eq_debug(
238 splits
239 .into_iter()
240 .chunks(splits_per_actor)
241 .into_iter()
242 .map(|c| c.collect_vec())
243 .chain(iter::repeat(Vec::default()))
244 .take(actor_ids.len()),
245 ) {
246 assignments.insert(actor_id, splits);
247 }
248 Ok(assignments)
249}
250
251pub async fn try_get_cdc_table_snapshot_splits(
252 table_id: u32,
253 meta_store: &SqlMetaStore,
254) -> MetaResult<Vec<CdcTableSnapshotSplitRaw>> {
255 use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
256 let splits: Vec<(i64, Vec<u8>, Vec<u8>, i16)> = cdc_table_snapshot_split::Entity::find()
257 .select_only()
258 .columns([
259 cdc_table_snapshot_split::Column::SplitId,
260 cdc_table_snapshot_split::Column::Left,
261 cdc_table_snapshot_split::Column::Right,
262 cdc_table_snapshot_split::Column::IsBackfillFinished,
263 ])
264 .filter(
265 cdc_table_snapshot_split::Column::TableId
266 .eq(TryInto::<TableId>::try_into(table_id).unwrap()),
267 )
268 .into_tuple()
269 .all(&meta_store.conn)
270 .await?;
271 let split_completed_count = splits
272 .iter()
273 .filter(|(_, _, _, is_backfill_finished)| *is_backfill_finished == 1)
274 .count();
275 assert!(
276 split_completed_count <= 1,
277 "split_completed_count = {}",
278 split_completed_count
279 );
280 let is_backfill_finished = split_completed_count == 1;
281 if is_backfill_finished && splits.len() != 1 {
282 tracing::error!(table_id, ?splits, "unexpected split count");
285 bail!(
286 "unexpected split count: table_id={table_id}, split_total_count={}, split_completed_count={split_completed_count}",
287 splits.len()
288 );
289 }
290 let splits: Vec<_> = splits
291 .into_iter()
292 .sorted_by_key(|(split_id, _, _, _)| *split_id)
295 .map(
296 |(split_id, left_bound_inclusive, right_bound_exclusive, _)| CdcTableSnapshotSplitRaw {
297 split_id,
298 left_bound_inclusive,
299 right_bound_exclusive,
300 },
301 )
302 .collect();
303 Ok(splits)
304}
305
306fn single_merged_split() -> CdcTableSnapshotSplitRaw {
307 CdcTableSnapshotSplitRaw {
308 split_id: 1,
310 left_bound_inclusive: OwnedRow::new(vec![None]).value_serialize(),
311 right_bound_exclusive: OwnedRow::new(vec![None]).value_serialize(),
312 }
313}