use std::collections::{HashMap, HashSet};
use std::iter;
use std::time::Duration;

use anyhow::Context;
use futures::pin_mut;
use futures_async_stream::for_await;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::id::{ActorId, JobId};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::external::{
    CdcTableSnapshotSplitOption, ExternalCdcTableType, ExternalTableConfig, ExternalTableReader,
    SchemaTableName,
};
use risingwave_connector::source::cdc::{CdcScanOptions, CdcTableSnapshotSplitAssignment};
use risingwave_connector::source::{CdcTableSnapshotSplit, CdcTableSnapshotSplitRaw};
use risingwave_meta_model::cdc_table_snapshot_split;
use risingwave_pb::id::TableId;
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{StreamCdcScanNode, StreamCdcScanOptions};
use sea_orm::{EntityTrait, Set, TransactionTrait};

use crate::MetaResult;
use crate::controller::SqlMetaStore;
use crate::model::{Fragment, StreamJobFragments};

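/// Computes CDC table snapshot splits via the external table reader and persists them
/// to the meta store in a single transaction. Returns early without writing anything
/// unless parallelized backfill is enabled in `per_table_options`. Inserts are batched
/// (`insert_batch_size`) and throttled (`sleep_split_interval` / `sleep_duration_millis`)
/// so that tables producing many splits do not overwhelm the meta store.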
pub(crate) async fn try_init_parallel_cdc_table_snapshot_splits(
    table_id: TableId,
    table_desc: &ExternalTableDesc,
    meta_store: &SqlMetaStore,
    per_table_options: &Option<StreamCdcScanOptions>,
    insert_batch_size: u64,
    sleep_split_interval: u64,
    sleep_duration_millis: u64,
) -> MetaResult<()> {
    let split_options = if let Some(per_table_options) = per_table_options {
        if !CdcScanOptions::from_proto(per_table_options).is_parallelized_backfill() {
            return Ok(());
        }
        CdcTableSnapshotSplitOption {
            backfill_num_rows_per_split: per_table_options.backfill_num_rows_per_split,
            backfill_as_even_splits: per_table_options.backfill_as_even_splits,
            backfill_split_pk_column_index: per_table_options.backfill_split_pk_column_index,
        }
    } else {
        return Ok(());
    };
    let table_type = ExternalCdcTableType::from_properties(&table_desc.connect_properties);
    // Exclude additional (non-upstream) columns from the schema handed to the external
    // table reader: only columns without an additional column type belong to the
    // upstream table.
    let table_schema: Schema = table_desc
        .columns
        .iter()
        .filter(|col| {
            col.additional_column
                .as_ref()
                .is_none_or(|a_col| a_col.column_type.is_none())
        })
        .map(Into::into)
        .collect();
    let table_pk_indices = table_desc
        .pk
        .iter()
        .map(|k| k.column_index as usize)
        .collect_vec();
    let table_config = ExternalTableConfig::try_from_btreemap(
        table_desc.connect_properties.clone(),
        table_desc.secret_refs.clone(),
    )
    .context("failed to parse external table config")?;
    let schema_table_name = SchemaTableName::from_properties(&table_desc.connect_properties);
    let reader = table_type
        .create_table_reader(
            table_config,
            table_schema,
            table_pk_indices,
            schema_table_name,
        )
        .await?;
    let stream = reader.get_parallel_cdc_splits(split_options);
    let mut insert_batch = vec![];
    let mut splits_num = 0;
    let txn = meta_store.conn.begin().await?;
    pin_mut!(stream);
    #[for_await]
    for split in stream {
        let split: CdcTableSnapshotSplit = split?;
        splits_num += 1;
        insert_batch.push(cdc_table_snapshot_split::ActiveModel {
            table_id: Set(table_id.as_job_id()),
            split_id: Set(split.split_id.to_owned()),
            left: Set(split.left_bound_inclusive.value_serialize()),
            right: Set(split.right_bound_exclusive.value_serialize()),
            is_backfill_finished: Set(0),
        });
        if insert_batch.len() >= insert_batch_size as usize {
            cdc_table_snapshot_split::Entity::insert_many(std::mem::take(&mut insert_batch))
                .exec(&txn)
                .await?;
        }
        if splits_num % sleep_split_interval == 0 {
            tokio::time::sleep(Duration::from_millis(sleep_duration_millis)).await;
        }
    }
    // Flush any remaining splits before committing the transaction.
    if !insert_batch.is_empty() {
        cdc_table_snapshot_split::Entity::insert_many(std::mem::take(&mut insert_batch))
            .exec(&txn)
            .await?;
    }
    txn.commit().await?;
    Ok(())
}

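/// Returns true if the fragment contains a `StreamCdcScan` node with parallelized
/// backfill enabled. The traversal stops at the first `StreamCdcScan` it encounters.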
pub(crate) fn is_parallelized_backfill_enabled_cdc_scan_fragment(fragment: &Fragment) -> bool {
    let mut enabled = false;
    visit_stream_node_cont(&fragment.nodes, |node| {
        if let Some(NodeBody::StreamCdcScan(node)) = &node.node_body {
            enabled = is_parallelized_backfill_enabled(node);
            // Returning `false` stops the traversal.
            false
        } else {
            true
        }
    });
    enabled
}

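/// Returns true if the scan node's options request parallelized backfill.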
pub fn is_parallelized_backfill_enabled(node: &StreamCdcScanNode) -> bool {
    node.options
        .as_ref()
        .is_some_and(|options| CdcScanOptions::from_proto(options).is_parallelized_backfill())
}

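/// Assigns persisted snapshot splits to the actors of the job's CDC scan fragment.
/// A job is expected to contain exactly one such fragment; jobs without a
/// parallelized-backfill CDC scan get an empty assignment.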
pub(crate) async fn assign_cdc_table_snapshot_splits(
    original_table_job_id: JobId,
    job: &StreamJobFragments,
    meta_store: &SqlMetaStore,
) -> MetaResult<CdcTableSnapshotSplitAssignment> {
    let mut stream_scan_fragments = job
        .fragments
        .values()
        .filter(|f| is_parallelized_backfill_enabled_cdc_scan_fragment(f))
        .collect_vec();
    if stream_scan_fragments.is_empty() {
        return Ok(HashMap::default());
    }
    assert_eq!(
        stream_scan_fragments.len(),
        1,
        "expected exactly 1 CDC scan fragment, found {}",
        stream_scan_fragments.len()
    );
    let stream_scan_fragment = stream_scan_fragments.swap_remove(0);
    assign_cdc_table_snapshot_splits_impl(
        original_table_job_id,
        stream_scan_fragment
            .actors
            .iter()
            .map(|a| a.actor_id)
            .collect(),
        meta_store,
        None,
    )
    .await
}

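/// Batch variant of [`assign_cdc_table_snapshot_splits_impl`]: computes an assignment
/// for each `(job, actors)` pair and merges the results into one map. Jobs listed in
/// `completed_cdc_job_ids` receive a single merged split instead of per-split ranges.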
pub(crate) async fn assign_cdc_table_snapshot_splits_pairs(
    table_id_actor_ids: impl IntoIterator<Item = (JobId, HashSet<ActorId>)>,
    meta_store: &SqlMetaStore,
    completed_cdc_job_ids: HashSet<JobId>,
) -> MetaResult<CdcTableSnapshotSplitAssignment> {
    let mut assignments = HashMap::default();
    for (table_id, actor_ids) in table_id_actor_ids {
        assignments.extend(
            assign_cdc_table_snapshot_splits_impl(
                table_id,
                actor_ids,
                meta_store,
                Some(&completed_cdc_job_ids),
            )
            .await?,
        );
    }
    Ok(assignments)
}

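/// Distributes a job's snapshot splits across its actors in contiguous chunks of
/// `ceil(splits / actors)`; actors beyond the last chunk receive an empty vector.
/// For example, 10 splits over 4 actors yields chunks of sizes 3, 3, 3 and 1.
/// If the job has already completed backfill, all splits collapse into a single
/// merged split covering the whole table.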
pub(crate) async fn assign_cdc_table_snapshot_splits_impl(
    table_job_id: JobId,
    actor_ids: HashSet<ActorId>,
    meta_store: &SqlMetaStore,
    completed_cdc_job_ids: Option<&HashSet<JobId>>,
) -> MetaResult<CdcTableSnapshotSplitAssignment> {
    if actor_ids.is_empty() {
        return Err(anyhow::anyhow!("expected at least 1 actor, found 0").into());
    }
    let splits = if let Some(completed_cdc_job_ids) = completed_cdc_job_ids
        && completed_cdc_job_ids.contains(&table_job_id)
    {
        vec![single_merged_split()]
    } else {
        try_get_cdc_table_snapshot_splits(table_job_id, meta_store).await?
    };
    if splits.is_empty() {
        return Err(
            anyhow::anyhow!("expected at least 1 CDC table snapshot split, found 0").into(),
        );
    }
    let splits_per_actor = splits.len().div_ceil(actor_ids.len());
    let mut assignments: CdcTableSnapshotSplitAssignment = HashMap::default();
    // Pad with empty vectors so that every actor receives an entry; `zip_eq_debug`
    // checks that the lengths match in debug builds.
    for (actor_id, splits) in actor_ids.iter().copied().zip_eq_debug(
        splits
            .into_iter()
            .chunks(splits_per_actor)
            .into_iter()
            .map(|c| c.collect_vec())
            .chain(iter::repeat(Vec::default()))
            .take(actor_ids.len()),
    ) {
        assignments.insert(actor_id, splits);
    }
    Ok(assignments)
}

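/// Loads a job's snapshot splits from the meta store, ordered by split id.
/// At most one split may be marked backfill-finished; if one is, it must be the
/// only remaining split for the job, otherwise this bails out.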
pub async fn try_get_cdc_table_snapshot_splits(
    job_id: JobId,
    meta_store: &SqlMetaStore,
) -> MetaResult<Vec<CdcTableSnapshotSplitRaw>> {
    use sea_orm::{ColumnTrait, QueryFilter, QuerySelect};
    let splits: Vec<(i64, Vec<u8>, Vec<u8>, i16)> = cdc_table_snapshot_split::Entity::find()
        .select_only()
        .columns([
            cdc_table_snapshot_split::Column::SplitId,
            cdc_table_snapshot_split::Column::Left,
            cdc_table_snapshot_split::Column::Right,
            cdc_table_snapshot_split::Column::IsBackfillFinished,
        ])
        .filter(cdc_table_snapshot_split::Column::TableId.eq(job_id.as_mv_table_id()))
        .into_tuple()
        .all(&meta_store.conn)
        .await?;
    let split_completed_count = splits
        .iter()
        .filter(|(_, _, _, is_backfill_finished)| *is_backfill_finished == 1)
        .count();
    assert!(
        split_completed_count <= 1,
        "split_completed_count = {}",
        split_completed_count
    );
    let is_backfill_finished = split_completed_count == 1;
    // A completed backfill must have exactly one (merged) split in the meta store.
    if is_backfill_finished && splits.len() != 1 {
        tracing::error!(%job_id, ?splits, "unexpected split count");
        bail!(
            "unexpected split count: job_id={job_id}, split_total_count={}, split_completed_count={split_completed_count}",
            splits.len()
        );
    }
    let splits: Vec<_> = splits
        .into_iter()
        .sorted_by_key(|(split_id, _, _, _)| *split_id)
        .map(
            |(split_id, left_bound_inclusive, right_bound_exclusive, _)| CdcTableSnapshotSplitRaw {
                split_id,
                left_bound_inclusive,
                right_bound_exclusive,
            },
        )
        .collect();
    Ok(splits)
}

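/// A placeholder split assigned to jobs whose backfill has already completed.
/// Both bounds are serialized NULL rows, representing an unbounded range.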
fn single_merged_split() -> CdcTableSnapshotSplitRaw {
    CdcTableSnapshotSplitRaw {
        split_id: 1,
        left_bound_inclusive: OwnedRow::new(vec![None]).value_serialize(),
        right_bound_exclusive: OwnedRow::new(vec![None]).value_serialize(),
    }
}