risingwave_meta/stream/
cdc.rs1use std::collections::{HashMap, HashSet};
16use std::iter;
17use std::time::Duration;
18
19use anyhow::Context;
20use futures::pin_mut;
21use futures_async_stream::for_await;
22use itertools::Itertools;
23use risingwave_common::catalog::Schema;
24use risingwave_common::row::Row;
25use risingwave_common::util::iter_util::ZipEqDebug;
26use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
27use risingwave_connector::source::cdc::external::{
28 CdcTableSnapshotSplitOption, CdcTableType, ExternalTableConfig, ExternalTableReader,
29 SchemaTableName,
30};
31use risingwave_connector::source::cdc::{CdcScanOptions, CdcTableSnapshotSplitAssignment};
32use risingwave_connector::source::{CdcTableSnapshotSplit, CdcTableSnapshotSplitRaw};
33use risingwave_meta_model::{TableId, cdc_table_snapshot_split};
34use risingwave_pb::plan_common::ExternalTableDesc;
35use risingwave_pb::stream_plan::stream_node::NodeBody;
36use risingwave_pb::stream_plan::{StreamCdcScanNode, StreamCdcScanOptions};
37use sea_orm::{EntityTrait, Set, TransactionTrait};
38
39use crate::MetaResult;
40use crate::controller::SqlMetaStore;
41use crate::model::{Fragment, StreamJobFragments};
42
43pub(crate) async fn try_init_parallel_cdc_table_snapshot_splits(
46 table_id: u32,
47 table_desc: &ExternalTableDesc,
48 meta_store: &SqlMetaStore,
49 per_table_options: &Option<StreamCdcScanOptions>,
50 insert_batch_size: u64,
51 sleep_split_interval: u64,
52 sleep_duration_millis: u64,
53) -> MetaResult<()> {
54 let split_options = if let Some(per_table_options) = per_table_options {
55 if !CdcScanOptions::from_proto(per_table_options).is_parallelized_backfill() {
56 return Ok(());
57 }
58 CdcTableSnapshotSplitOption {
59 backfill_num_rows_per_split: per_table_options.backfill_num_rows_per_split,
60 backfill_as_even_splits: per_table_options.backfill_as_even_splits,
61 backfill_split_pk_column_index: per_table_options.backfill_split_pk_column_index,
62 }
63 } else {
64 return Ok(());
65 };
66 let table_type = CdcTableType::from_properties(&table_desc.connect_properties);
67 let table_schema: Schema = table_desc
69 .columns
70 .iter()
71 .filter(|col| {
72 col.additional_column
73 .as_ref()
74 .is_none_or(|a_col| a_col.column_type.is_none())
75 })
76 .map(Into::into)
77 .collect();
78 let table_pk_indices = table_desc
79 .pk
80 .iter()
81 .map(|k| k.column_index as usize)
82 .collect_vec();
83 let table_config = ExternalTableConfig::try_from_btreemap(
84 table_desc.connect_properties.clone(),
85 table_desc.secret_refs.clone(),
86 )
87 .context("failed to parse external table config")?;
88 let schema_table_name = SchemaTableName::from_properties(&table_desc.connect_properties);
89 let reader = table_type
90 .create_table_reader(
91 table_config,
92 table_schema,
93 table_pk_indices,
94 schema_table_name,
95 )
96 .await?;
97 let stream = reader.get_parallel_cdc_splits(split_options);
98 let mut insert_batch = vec![];
99 let mut splits_num = 0;
100 let txn = meta_store.conn.begin().await?;
101 pin_mut!(stream);
102 #[for_await]
103 for split in stream {
104 let split: CdcTableSnapshotSplit = split?;
105 splits_num += 1;
106 insert_batch.push(cdc_table_snapshot_split::ActiveModel {
107 table_id: Set(table_id.try_into().unwrap()),
108 split_id: Set(split.split_id.to_owned()),
109 left: Set(split.left_bound_inclusive.value_serialize()),
110 right: Set(split.right_bound_exclusive.value_serialize()),
111 });
112 if insert_batch.len() >= insert_batch_size as usize {
113 cdc_table_snapshot_split::Entity::insert_many(std::mem::take(&mut insert_batch))
114 .exec(&txn)
115 .await?;
116 }
117 if splits_num % sleep_split_interval == 0 {
118 tokio::time::sleep(Duration::from_millis(sleep_duration_millis)).await;
119 }
120 }
121 if !insert_batch.is_empty() {
122 cdc_table_snapshot_split::Entity::insert_many(std::mem::take(&mut insert_batch))
123 .exec(&txn)
124 .await?;
125 }
126 txn.commit().await?;
127 Ok(())
129}
130
131fn is_parallelized_backfill_enabled_cdc_scan_fragment(fragment: &Fragment) -> bool {
133 let mut b = false;
134 visit_stream_node_cont(&fragment.nodes, |node| {
135 if let Some(NodeBody::StreamCdcScan(node)) = &node.node_body {
136 b = is_parallelized_backfill_enabled(node);
137 false
138 } else {
139 true
140 }
141 });
142 b
143}
144
145pub fn is_parallelized_backfill_enabled(node: &StreamCdcScanNode) -> bool {
146 if let Some(options) = &node.options
147 && CdcScanOptions::from_proto(options).is_parallelized_backfill()
148 {
149 return true;
150 }
151 false
152}
153
154pub(crate) async fn assign_cdc_table_snapshot_splits(
155 table_fragments: impl Iterator<Item = &StreamJobFragments>,
156 meta_store: &SqlMetaStore,
157) -> MetaResult<CdcTableSnapshotSplitAssignment> {
158 let mut assignments = HashMap::default();
159 for job in table_fragments {
160 let mut stream_scan_fragments = job
161 .fragments
162 .values()
163 .filter(|f| is_parallelized_backfill_enabled_cdc_scan_fragment(f))
164 .collect_vec();
165 if stream_scan_fragments.is_empty() {
166 continue;
167 }
168 assert_eq!(stream_scan_fragments.len(), 1);
169 let stream_scan_fragment = stream_scan_fragments.swap_remove(0);
170 let assignment = assign_cdc_table_snapshot_splits_impl(
171 job.stream_job_id.table_id,
172 stream_scan_fragment
173 .actors
174 .iter()
175 .map(|a| a.actor_id)
176 .collect(),
177 meta_store,
178 )
179 .await?;
180 assignments.extend(assignment);
181 }
182 Ok(assignments)
183}
184
185pub(crate) async fn assign_cdc_table_snapshot_splits_pairs(
186 table_id_actor_ids: impl IntoIterator<Item = (u32, HashSet<u32>)>,
187 meta_store: &SqlMetaStore,
188) -> MetaResult<CdcTableSnapshotSplitAssignment> {
189 let mut assignments = HashMap::default();
190 for (table_id, actor_ids) in table_id_actor_ids {
191 assignments
192 .extend(assign_cdc_table_snapshot_splits_impl(table_id, actor_ids, meta_store).await?);
193 }
194 Ok(assignments)
195}
196
197pub(crate) async fn assign_cdc_table_snapshot_splits_impl(
198 table_id: u32,
199 actor_ids: HashSet<u32>,
200 meta_store: &SqlMetaStore,
201) -> MetaResult<CdcTableSnapshotSplitAssignment> {
202 if actor_ids.is_empty() {
203 return Err(anyhow::anyhow!("Expect at least 1 actor, 0 was found.").into());
204 }
205 let mut assignments = HashMap::default();
206 let splits = try_get_cdc_table_snapshot_splits(table_id, meta_store).await?;
207 if splits.is_empty() {
208 return Err(
209 anyhow::anyhow!("Expect at least 1 CDC table snapshot splits, 0 was found.").into(),
210 );
211 }
212 let splits_per_actor = splits.len().div_ceil(actor_ids.len());
213 for (actor_id, splits) in actor_ids.iter().copied().zip_eq_debug(
214 splits
215 .into_iter()
216 .chunks(splits_per_actor)
217 .into_iter()
218 .map(|c| c.collect_vec())
219 .chain(iter::repeat(Vec::default()))
220 .take(actor_ids.len()),
221 ) {
222 assignments.insert(actor_id, splits);
223 }
224 Ok(assignments)
225}
226
227pub(crate) async fn assign_cdc_table_snapshot_splits_for_replace_table(
228 original_table_id: u32,
229 job: &StreamJobFragments,
230 meta_store: &SqlMetaStore,
231) -> MetaResult<CdcTableSnapshotSplitAssignment> {
232 let mut stream_scan_fragments = job
233 .fragments
234 .values()
235 .filter(|f| is_parallelized_backfill_enabled_cdc_scan_fragment(f))
236 .collect_vec();
237 if stream_scan_fragments.is_empty() {
238 return Ok(HashMap::default());
239 }
240 assert_eq!(
241 stream_scan_fragments.len(),
242 1,
243 "Expect 1 scan fragment, {} was found.",
244 stream_scan_fragments.len()
245 );
246 let stream_scan_fragment = stream_scan_fragments.swap_remove(0);
247 let assignment = assign_cdc_table_snapshot_splits_impl(
248 original_table_id,
249 stream_scan_fragment
250 .actors
251 .iter()
252 .map(|a| a.actor_id)
253 .collect(),
254 meta_store,
255 )
256 .await?;
257 Ok(assignment)
258}
259
260pub async fn try_get_cdc_table_snapshot_splits(
261 table_id: u32,
262 meta_store: &SqlMetaStore,
263) -> MetaResult<Vec<CdcTableSnapshotSplitRaw>> {
264 use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
265 let splits: Vec<(i64, Vec<u8>, Vec<u8>)> = cdc_table_snapshot_split::Entity::find()
266 .select_only()
267 .columns([
268 cdc_table_snapshot_split::Column::SplitId,
269 cdc_table_snapshot_split::Column::Left,
270 cdc_table_snapshot_split::Column::Right,
271 ])
272 .filter(
273 cdc_table_snapshot_split::Column::TableId
274 .eq(TryInto::<TableId>::try_into(table_id).unwrap()),
275 )
276 .into_tuple()
277 .all(&meta_store.conn)
278 .await?;
279 let splits: Vec<_> = splits
280 .into_iter()
281 .sorted_by_key(|(split_id, _, _)| *split_id)
284 .map(
285 |(split_id, left_bound_inclusive, right_bound_exclusive)| CdcTableSnapshotSplitRaw {
286 split_id,
287 left_bound_inclusive,
288 right_bound_exclusive,
289 },
290 )
291 .collect();
292 Ok(splits)
293}