use std::cmp::Ordering;

use anyhow::Context;
use futures::stream::BoxStream;
use futures::{StreamExt, pin_mut};
use futures_async_stream::{for_await, try_stream};
use itertools::Itertools;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, Datum, ScalarImpl, ToOwnedDatum};
use risingwave_common::util::iter_util::ZipEqFast;
use serde_derive::{Deserialize, Serialize};
use tokio_postgres::types::PgLsn;

use crate::connector_common::create_pg_client;
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::scalar_adapter::ScalarAdapter;
use crate::parser::{postgres_cell_to_scalar_impl, postgres_row_to_owned_row};
use crate::source::CdcTableSnapshotSplit;
use crate::source::cdc::external::{
    CdcOffset, CdcOffsetParseFunc, CdcTableSnapshotSplitOption, DebeziumOffset,
    ExternalTableConfig, ExternalTableReader, SchemaTableName,
};

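/// A Postgres CDC offset: the transaction id and WAL LSN reported by the upstream
/// Debezium connector.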
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct PostgresOffset {
    pub txid: i64,
    pub lsn: u64,
}

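// Offsets are compared by LSN only; `txid` does not participate in the ordering.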
impl PartialOrd for PostgresOffset {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.lsn.partial_cmp(&other.lsn)
    }
}

impl PostgresOffset {
    pub fn parse_debezium_offset(offset: &str) -> ConnectorResult<Self> {
        let dbz_offset: DebeziumOffset = serde_json::from_str(offset)
            .with_context(|| format!("invalid upstream offset: {}", offset))?;

        Ok(Self {
            txid: dbz_offset
                .source_offset
                .txid
                .context("invalid postgres txid")?,
            lsn: dbz_offset
                .source_offset
                .lsn
                .context("invalid postgres lsn")?,
        })
    }
}

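/// Snapshot reader for an upstream Postgres table used by CDC backfill. It holds a
/// dedicated `tokio_postgres` client behind a mutex, the RisingWave-side schema, and
/// the primary-key column indices.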
pub struct PostgresExternalTableReader {
    rw_schema: Schema,
    field_names: String,
    pk_indices: Vec<usize>,
    client: tokio::sync::Mutex<tokio_postgres::Client>,
    schema_table_name: SchemaTableName,
}

impl ExternalTableReader for PostgresExternalTableReader {
    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
        let mut client = self.client.lock().await;
        let trxn = client.transaction().await?;
        let row = trxn.query_one("SELECT pg_current_wal_lsn()", &[]).await?;
        let mut pg_offset = PostgresOffset::default();
        let pg_lsn = row.get::<_, PgLsn>(0);
        tracing::debug!("current lsn: {}", pg_lsn);
        pg_offset.lsn = pg_lsn.into();

        let txid_row = trxn.query_one("SELECT txid_current()", &[]).await?;
        let txid: i64 = txid_row.get::<_, i64>(0);
        pg_offset.txid = txid;

        trxn.commit().await?;

        Ok(CdcOffset::Postgres(pg_offset))
    }

    fn snapshot_read(
        &self,
        table_name: SchemaTableName,
        start_pk: Option<OwnedRow>,
        primary_keys: Vec<String>,
        limit: u32,
    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
        assert_eq!(table_name, self.schema_table_name);
        self.snapshot_read_inner(table_name, start_pk, primary_keys, limit)
    }

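    /// Splits the table snapshot into parallel backfill splits. If `backfill_as_even_splits`
    /// is set and the split column has an integer type, the key range is divided into
    /// fixed-size ranges (`as_even_splits`); otherwise the splits are derived by probing the
    /// actual data distribution (`as_uneven_splits`).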
    #[try_stream(boxed, ok = CdcTableSnapshotSplit, error = ConnectorError)]
    async fn get_parallel_cdc_splits(&self, options: CdcTableSnapshotSplitOption) {
        let backfill_num_rows_per_split = options.backfill_num_rows_per_split;
        if backfill_num_rows_per_split == 0 {
            return Err(anyhow::anyhow!(
                "invalid backfill_num_rows_per_split, must be greater than 0"
            )
            .into());
        }
        if options.backfill_split_pk_column_index as usize >= self.pk_indices.len() {
            return Err(anyhow::anyhow!(format!(
                "invalid backfill_split_pk_column_index {}, out of bounds",
                options.backfill_split_pk_column_index
            ))
            .into());
        }
        let split_column = self.split_column(&options);
        let row_stream = if options.backfill_as_even_splits
            && is_supported_even_split_data_type(&split_column.data_type)
        {
            tracing::info!(?self.schema_table_name, ?self.rw_schema, ?self.pk_indices, ?split_column, "Get parallel cdc table snapshot even splits.");
            self.as_even_splits(options)
        } else {
            tracing::info!(?self.schema_table_name, ?self.rw_schema, ?self.pk_indices, ?split_column, "Get parallel cdc table snapshot uneven splits.");
            self.as_uneven_splits(options)
        };
        pin_mut!(row_stream);
        #[for_await]
        for row in row_stream {
            let row = row?;
            yield row;
        }
    }

    fn split_snapshot_read(
        &self,
        table_name: SchemaTableName,
        left: OwnedRow,
        right: OwnedRow,
        split_columns: Vec<Field>,
    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
        assert_eq!(table_name, self.schema_table_name);
        self.split_snapshot_read_inner(table_name, left, right, split_columns)
    }
}

impl PostgresExternalTableReader {
    pub async fn new(
        config: ExternalTableConfig,
        rw_schema: Schema,
        pk_indices: Vec<usize>,
        schema_table_name: SchemaTableName,
    ) -> ConnectorResult<Self> {
        tracing::info!(
            ?rw_schema,
            ?pk_indices,
            "create postgres external table reader"
        );

        let client = create_pg_client(
            &config.username,
            &config.password,
            &config.host,
            &config.port,
            &config.database,
            &config.ssl_mode,
            &config.ssl_root_cert,
        )
        .await?;

        let field_names = rw_schema
            .fields
            .iter()
            .map(|f| Self::quote_column(&f.name))
            .join(",");

        Ok(Self {
            rw_schema,
            field_names,
            pk_indices,
            client: tokio::sync::Mutex::new(client),
            schema_table_name,
        })
    }

    pub fn get_normalized_table_name(table_name: &SchemaTableName) -> String {
        format!(
            "\"{}\".\"{}\"",
            table_name.schema_name, table_name.table_name
        )
    }

    pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
        Box::new(move |offset| {
            Ok(CdcOffset::Postgres(PostgresOffset::parse_debezium_offset(
                offset,
            )?))
        })
    }

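    /// Streams rows ordered by primary key, optionally resuming after `start_pk_row` and
    /// returning at most `scan_limit` rows. The session time zone is set to UTC
    /// (`+00:00`) before scanning.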
    #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
    async fn snapshot_read_inner(
        &self,
        table_name: SchemaTableName,
        start_pk_row: Option<OwnedRow>,
        primary_keys: Vec<String>,
        scan_limit: u32,
    ) {
        let order_key = Self::get_order_key(&primary_keys);
        let client = self.client.lock().await;
        client.execute("set time zone '+00:00'", &[]).await?;

        let stream = match start_pk_row {
            Some(ref pk_row) => {
                let prepared_scan_stmt = {
                    let primary_keys = self
                        .pk_indices
                        .iter()
                        .map(|i| self.rw_schema.fields[*i].name.clone())
                        .collect_vec();

                    let order_key = Self::get_order_key(&primary_keys);
                    let scan_sql = format!(
                        "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {scan_limit}",
                        self.field_names,
                        Self::get_normalized_table_name(&table_name),
                        Self::filter_expression(&primary_keys),
                        order_key,
                    );
                    client.prepare(&scan_sql).await?
                };

                let params: Vec<Option<ScalarAdapter>> = pk_row
                    .iter()
                    .zip_eq_fast(prepared_scan_stmt.params())
                    .map(|(datum, ty)| {
                        datum
                            .map(|scalar| ScalarAdapter::from_scalar(scalar, ty))
                            .transpose()
                    })
                    .try_collect()?;

                client.query_raw(&prepared_scan_stmt, &params).await?
            }
            None => {
                let sql = format!(
                    "SELECT {} FROM {} ORDER BY {} LIMIT {scan_limit}",
                    self.field_names,
                    Self::get_normalized_table_name(&table_name),
                    order_key,
                );
                let params: Vec<Option<ScalarAdapter>> = vec![];
                client.query_raw(&sql, &params).await?
            }
        };

        let row_stream = stream.map(|row| {
            let row = row?;
            Ok::<_, crate::error::ConnectorError>(postgres_row_to_owned_row(row, &self.rw_schema))
        });

        pin_mut!(row_stream);
        #[for_await]
        for row in row_stream {
            let row = row?;
            yield row;
        }
    }

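    /// Builds a row-value comparison over the primary-key columns, e.g.
    /// `("v1", "v2") > ($1, $2)`, used to resume a snapshot scan after the last-seen key.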
    fn filter_expression(columns: &[String]) -> String {
        let mut col_expr = String::new();
        let mut arg_expr = String::new();
        for (i, column) in columns.iter().enumerate() {
            if i > 0 {
                col_expr.push_str(", ");
                arg_expr.push_str(", ");
            }
            col_expr.push_str(&Self::quote_column(column));
            arg_expr.push_str(format!("${}", i + 1).as_str());
        }
        format!("({}) > ({})", col_expr, arg_expr)
    }

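    /// Builds the WHERE clause for one snapshot split: `1 = 1` when it is the only split,
    /// `(cols) < (params)` for the first split, `(cols) >= (params)` for the last split,
    /// and `(cols) >= (..) AND (cols) < (..)` for a middle split. Left-bound parameters
    /// are numbered before right-bound parameters.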
    fn split_filter_expression(
        columns: &[String],
        is_first_split: bool,
        is_last_split: bool,
    ) -> String {
        let mut left_col_expr = String::new();
        let mut left_arg_expr = String::new();
        let mut right_col_expr = String::new();
        let mut right_arg_expr = String::new();
        let mut c = 1;
        if !is_first_split {
            for (i, column) in columns.iter().enumerate() {
                if i > 0 {
                    left_col_expr.push_str(", ");
                    left_arg_expr.push_str(", ");
                }
                left_col_expr.push_str(&Self::quote_column(column));
                left_arg_expr.push_str(format!("${}", c).as_str());
                c += 1;
            }
        }
        if !is_last_split {
            for (i, column) in columns.iter().enumerate() {
                if i > 0 {
                    right_col_expr.push_str(", ");
                    right_arg_expr.push_str(", ");
                }
                right_col_expr.push_str(&Self::quote_column(column));
                right_arg_expr.push_str(format!("${}", c).as_str());
                c += 1;
            }
        }
        if is_first_split && is_last_split {
            "1 = 1".to_owned()
        } else if is_first_split {
            format!("({}) < ({})", right_col_expr, right_arg_expr)
        } else if is_last_split {
            format!("({}) >= ({})", left_col_expr, left_arg_expr)
        } else {
            format!(
                "({}) >= ({}) AND ({}) < ({})",
                left_col_expr, left_arg_expr, right_col_expr, right_arg_expr,
            )
        }
    }

    fn get_order_key(primary_keys: &Vec<String>) -> String {
        primary_keys
            .iter()
            .map(|col| Self::quote_column(col))
            .join(",")
    }

    fn quote_column(column: &str) -> String {
        format!("\"{}\"", column)
    }

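    /// Returns the minimum and maximum values of the split column, or `None` when the table
    /// is empty or either bound is NULL.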
    async fn min_and_max(
        &self,
        split_column: &Field,
    ) -> ConnectorResult<Option<(ScalarImpl, ScalarImpl)>> {
        let sql = format!(
            "SELECT MIN({}), MAX({}) FROM {}",
            Self::quote_column(&split_column.name),
            Self::quote_column(&split_column.name),
            Self::get_normalized_table_name(&self.schema_table_name),
        );
        let client = self.client.lock().await;
        let rows = client.query(&sql, &[]).await?;
        if rows.is_empty() {
            Ok(None)
        } else {
            let row = &rows[0];
            let min =
                postgres_cell_to_scalar_impl(row, &split_column.data_type, 0, &split_column.name);
            let max =
                postgres_cell_to_scalar_impl(row, &split_column.data_type, 1, &split_column.name);
            match (min, max) {
                (Some(min), Some(max)) => Ok(Some((min, max))),
                _ => Ok(None),
            }
        }
    }

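    /// Probes the exclusive right bound of the next uneven split: the largest split-column
    /// value among the first `max_split_size` rows whose value is `>= left_value`. The query
    /// yields NULL once that bound reaches `max_value`, which marks the final,
    /// right-unbounded split.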
    async fn next_split_right_bound_exclusive(
        &self,
        left_value: &ScalarImpl,
        max_value: &ScalarImpl,
        max_split_size: u64,
        split_column: &Field,
    ) -> ConnectorResult<Option<Datum>> {
        let sql = format!(
            "WITH t as (SELECT {} FROM {} WHERE {} >= $1 ORDER BY {} ASC LIMIT {}) SELECT CASE WHEN MAX({}) < $2 THEN MAX({}) ELSE NULL END FROM t",
            Self::quote_column(&split_column.name),
            Self::get_normalized_table_name(&self.schema_table_name),
            Self::quote_column(&split_column.name),
            Self::quote_column(&split_column.name),
            max_split_size,
            Self::quote_column(&split_column.name),
            Self::quote_column(&split_column.name),
        );
        let client = self.client.lock().await;
        let prepared_stmt = client.prepare(&sql).await?;
        let params: Vec<Option<ScalarAdapter>> = vec![
            Some(ScalarAdapter::from_scalar(
                left_value.as_scalar_ref_impl(),
                &prepared_stmt.params()[0],
            )?),
            Some(ScalarAdapter::from_scalar(
                max_value.as_scalar_ref_impl(),
                &prepared_stmt.params()[1],
            )?),
        ];
        let stream = client.query_raw(&prepared_stmt, &params).await?;
        let datum_stream = stream.map(|row| {
            let row = row?;
            Ok::<_, ConnectorError>(postgres_cell_to_scalar_impl(
                &row,
                &split_column.data_type,
                0,
                &split_column.name,
            ))
        });
        pin_mut!(datum_stream);
        #[for_await]
        for datum in datum_stream {
            let right = datum?;
            return Ok(Some(right.to_owned_datum()));
        }
        Ok(None)
    }

431
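    /// Returns the smallest split-column value strictly greater than `start_offset` and
    /// smaller than `max_value`. Used when a probed right bound equals the current left
    /// bound (i.e. more than `max_split_size` rows share the same value), so the next
    /// split can still make progress.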
    async fn next_greater_bound(
        &self,
        start_offset: &ScalarImpl,
        max_value: &ScalarImpl,
        split_column: &Field,
    ) -> ConnectorResult<Option<Datum>> {
        let sql = format!(
            "SELECT MIN({}) FROM {} WHERE {} > $1 AND {} < $2",
            Self::quote_column(&split_column.name),
            Self::get_normalized_table_name(&self.schema_table_name),
            Self::quote_column(&split_column.name),
            Self::quote_column(&split_column.name),
        );
        let client = self.client.lock().await;
        let prepared_stmt = client.prepare(&sql).await?;
        let params: Vec<Option<ScalarAdapter>> = vec![
            Some(ScalarAdapter::from_scalar(
                start_offset.as_scalar_ref_impl(),
                &prepared_stmt.params()[0],
            )?),
            Some(ScalarAdapter::from_scalar(
                max_value.as_scalar_ref_impl(),
                &prepared_stmt.params()[1],
            )?),
        ];
        let stream = client.query_raw(&prepared_stmt, &params).await?;
        let datum_stream = stream.map(|row| {
            let row = row?;
            Ok::<_, ConnectorError>(postgres_cell_to_scalar_impl(
                &row,
                &split_column.data_type,
                0,
                &split_column.name,
            ))
        });
        pin_mut!(datum_stream);
        #[for_await]
        for datum in datum_stream {
            let right = datum?;
            return Ok(Some(right));
        }
        Ok(None)
    }

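    /// Streams all rows of one snapshot split. `left` and `right` each carry a single
    /// datum: a NULL left bound marks the first split and a NULL right bound marks the
    /// last split, and the corresponding comparison is omitted from the WHERE clause.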
    #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
    async fn split_snapshot_read_inner(
        &self,
        table_name: SchemaTableName,
        left: OwnedRow,
        right: OwnedRow,
        split_columns: Vec<Field>,
    ) {
        assert_eq!(
            split_columns.len(),
            1,
            "multiple split columns are not supported yet"
        );
        assert_eq!(left.len(), 1, "multiple split columns are not supported yet");
        assert_eq!(
            right.len(),
            1,
            "multiple split columns are not supported yet"
        );
        let is_first_split = left[0].is_none();
        let is_last_split = right[0].is_none();
        let split_column_names = split_columns.iter().map(|c| c.name.clone()).collect_vec();
        let client = self.client.lock().await;
        client.execute("set time zone '+00:00'", &[]).await?;
        let prepared_scan_stmt = {
            let scan_sql = format!(
                "SELECT {} FROM {} WHERE {}",
                self.field_names,
                Self::get_normalized_table_name(&table_name),
                Self::split_filter_expression(&split_column_names, is_first_split, is_last_split),
            );
            client.prepare(&scan_sql).await?
        };

        let mut params: Vec<Option<ScalarAdapter>> = vec![];
        if !is_first_split {
            let left_params: Vec<Option<ScalarAdapter>> = left
                .iter()
                .zip_eq_fast(prepared_scan_stmt.params().iter().take(left.len()))
                .map(|(datum, ty)| {
                    datum
                        .map(|scalar| ScalarAdapter::from_scalar(scalar, ty))
                        .transpose()
                })
                .try_collect()?;
            params.extend(left_params);
        }
        if !is_last_split {
            let right_params: Vec<Option<ScalarAdapter>> = right
                .iter()
                .zip_eq_fast(prepared_scan_stmt.params().iter().skip(params.len()))
                .map(|(datum, ty)| {
                    datum
                        .map(|scalar| ScalarAdapter::from_scalar(scalar, ty))
                        .transpose()
                })
                .try_collect()?;
            params.extend(right_params);
        }

        let stream = client.query_raw(&prepared_scan_stmt, &params).await?;
        let row_stream = stream.map(|row| {
            let row = row?;
            Ok::<_, crate::error::ConnectorError>(postgres_row_to_owned_row(row, &self.rw_schema))
        });

        pin_mut!(row_stream);
        #[for_await]
        for row in row_stream {
            let row = row?;
            yield row;
        }
    }

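    /// Produces uneven splits by probing the data distribution: starting from the minimum
    /// value, each split's right bound is chosen so that the split covers roughly
    /// `backfill_num_rows_per_split` rows at probe time (more if many rows share one value).
    /// An empty table yields a single unbounded split.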
    #[try_stream(boxed, ok = CdcTableSnapshotSplit, error = ConnectorError)]
    async fn as_uneven_splits(&self, options: CdcTableSnapshotSplitOption) {
        let split_column = self.split_column(&options);
        let mut split_id = 1;
        let Some((min_value, max_value)) = self.min_and_max(&split_column).await? else {
            let left_bound_row = OwnedRow::new(vec![None]);
            let right_bound_row = OwnedRow::new(vec![None]);
            let split = CdcTableSnapshotSplit {
                split_id,
                left_bound_inclusive: left_bound_row,
                right_bound_exclusive: right_bound_row,
            };
            yield split;
            return Ok(());
        };
        let mut next_left_bound_inclusive = min_value.clone();
        loop {
            let left_bound_inclusive: Datum = if next_left_bound_inclusive == min_value {
                None
            } else {
                Some(next_left_bound_inclusive.clone())
            };
            let right_bound_exclusive;
            let mut next_right = self
                .next_split_right_bound_exclusive(
                    &next_left_bound_inclusive,
                    &max_value,
                    options.backfill_num_rows_per_split,
                    &split_column,
                )
                .await?;
            if let Some(Some(ref inner)) = next_right
                && *inner == next_left_bound_inclusive
            {
                next_right = self
                    .next_greater_bound(&next_left_bound_inclusive, &max_value, &split_column)
                    .await?;
            }
            if let Some(next_right) = next_right {
                match next_right {
                    None => {
                        right_bound_exclusive = None;
                    }
                    Some(next_right) => {
                        next_left_bound_inclusive = next_right.to_owned();
                        right_bound_exclusive = Some(next_right);
                    }
                }
            } else {
                right_bound_exclusive = None;
            };
            let is_completed = right_bound_exclusive.is_none();
            if is_completed && left_bound_inclusive.is_none() {
                assert_eq!(split_id, 1);
            }
            tracing::info!(
                split_id,
                ?left_bound_inclusive,
                ?right_bound_exclusive,
                "New CDC table snapshot split."
            );
            let left_bound_row = OwnedRow::new(vec![left_bound_inclusive]);
            let right_bound_row = OwnedRow::new(vec![right_bound_exclusive]);
            let split = CdcTableSnapshotSplit {
                split_id,
                left_bound_inclusive: left_bound_row,
                right_bound_exclusive: right_bound_row,
            };
            try_increase_split_id(&mut split_id)?;
            yield split;
            if is_completed {
                break;
            }
        }
    }

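    /// Produces evenly sized splits by dividing the integer key range `[min, max]` into
    /// consecutive ranges of `backfill_num_rows_per_split` keys each, without probing row
    /// counts. Only integer split columns are supported (see
    /// `is_supported_even_split_data_type`). An empty table yields a single unbounded split.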
    #[try_stream(boxed, ok = CdcTableSnapshotSplit, error = ConnectorError)]
    async fn as_even_splits(&self, options: CdcTableSnapshotSplitOption) {
        let split_column = self.split_column(&options);
        let mut split_id = 1;
        let Some((min_value, max_value)) = self.min_and_max(&split_column).await? else {
            let left_bound_row = OwnedRow::new(vec![None]);
            let right_bound_row = OwnedRow::new(vec![None]);
            let split = CdcTableSnapshotSplit {
                split_id,
                left_bound_inclusive: left_bound_row,
                right_bound_exclusive: right_bound_row,
            };
            yield split;
            return Ok(());
        };
        let min_value = min_value.as_integral();
        let max_value = max_value.as_integral();
        let saturated_split_max_size = options
            .backfill_num_rows_per_split
            .try_into()
            .unwrap_or(i64::MAX);
        let mut left = None;
        let mut right = Some(min_value.saturating_add(saturated_split_max_size));
        loop {
            let mut is_completed = false;
            if right.as_ref().map(|r| *r >= max_value).unwrap_or(true) {
                right = None;
                is_completed = true;
            }
            let split = CdcTableSnapshotSplit {
                split_id,
                left_bound_inclusive: OwnedRow::new(vec![
                    left.map(|l| to_int_scalar(l, &split_column.data_type)),
                ]),
                right_bound_exclusive: OwnedRow::new(vec![
                    right.map(|r| to_int_scalar(r, &split_column.data_type)),
                ]),
            };
            try_increase_split_id(&mut split_id)?;
            yield split;
            if is_completed {
                break;
            }
            left = right;
            right = left.map(|l| l.saturating_add(saturated_split_max_size));
        }
    }

    fn split_column(&self, options: &CdcTableSnapshotSplitOption) -> Field {
        self.rw_schema.fields[self.pk_indices[options.backfill_split_pk_column_index as usize]]
            .clone()
    }
}

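/// Converts an `i64` split bound back into a scalar of the split column's integer type.
/// Panics if the value does not fit in the target type or the type is not an integer.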
fn to_int_scalar(i: i64, data_type: &DataType) -> ScalarImpl {
    match data_type {
        DataType::Int16 => ScalarImpl::Int16(i.try_into().unwrap()),
        DataType::Int32 => ScalarImpl::Int32(i.try_into().unwrap()),
        DataType::Int64 => ScalarImpl::Int64(i),
        _ => {
            panic!("Can't convert int {} to ScalarImpl::{}", i, data_type)
        }
    }
}

fn try_increase_split_id(split_id: &mut i64) -> ConnectorResult<()> {
    match split_id.checked_add(1) {
        Some(s) => {
            *split_id = s;
            Ok(())
        }
        None => Err(anyhow::anyhow!("too many CDC snapshot splits").into()),
    }
}

fn is_supported_even_split_data_type(data_type: &DataType) -> bool {
    matches!(
        data_type,
        DataType::Int16 | DataType::Int32 | DataType::Int64
    )
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::pin_mut;
    use futures_async_stream::for_await;
    use maplit::{convert_args, hashmap};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{DataType, ScalarImpl};

    use crate::connector_common::PostgresExternalTable;
    use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset};
    use crate::source::cdc::external::{ExternalTableConfig, ExternalTableReader, SchemaTableName};

    #[ignore]
    #[tokio::test]
    async fn test_postgres_schema() {
        let config = ExternalTableConfig {
            connector: "postgres-cdc".to_owned(),
            host: "localhost".to_owned(),
            port: "8432".to_owned(),
            username: "myuser".to_owned(),
            password: "123456".to_owned(),
            database: "mydb".to_owned(),
            schema: "public".to_owned(),
            table: "mytest".to_owned(),
            ssl_mode: Default::default(),
            ssl_root_cert: None,
            encrypt: "false".to_owned(),
        };

        let table = PostgresExternalTable::connect(
            &config.username,
            &config.password,
            &config.host,
            config.port.parse::<u16>().unwrap(),
            &config.database,
            &config.schema,
            &config.table,
            &config.ssl_mode,
            &config.ssl_root_cert,
            false,
        )
        .await
        .unwrap();

        println!("columns: {:?}", &table.column_descs());
        println!("primary keys: {:?}", &table.pk_names());
    }

    #[test]
    fn test_postgres_offset() {
        let off1 = PostgresOffset { txid: 4, lsn: 2 };
        let off2 = PostgresOffset { txid: 1, lsn: 3 };
        let off3 = PostgresOffset { txid: 5, lsn: 1 };

        assert!(off1 < off2);
        assert!(off3 < off1);
        assert!(off2 > off3);
    }

    #[test]
    fn test_filter_expression() {
        let cols = vec!["v1".to_owned()];
        let expr = PostgresExternalTableReader::filter_expression(&cols);
        assert_eq!(expr, "(\"v1\") > ($1)");

        let cols = vec!["v1".to_owned(), "v2".to_owned()];
        let expr = PostgresExternalTableReader::filter_expression(&cols);
        assert_eq!(expr, "(\"v1\", \"v2\") > ($1, $2)");

        let cols = vec!["v1".to_owned(), "v2".to_owned(), "v3".to_owned()];
        let expr = PostgresExternalTableReader::filter_expression(&cols);
        assert_eq!(expr, "(\"v1\", \"v2\", \"v3\") > ($1, $2, $3)");
    }

    #[test]
    fn test_split_filter_expression() {
        let cols = vec!["v1".to_owned()];
        let expr = PostgresExternalTableReader::split_filter_expression(&cols, true, true);
        assert_eq!(expr, "1 = 1");

        let expr = PostgresExternalTableReader::split_filter_expression(&cols, true, false);
        assert_eq!(expr, "(\"v1\") < ($1)");

        let expr = PostgresExternalTableReader::split_filter_expression(&cols, false, true);
        assert_eq!(expr, "(\"v1\") >= ($1)");

        let expr = PostgresExternalTableReader::split_filter_expression(&cols, false, false);
        assert_eq!(expr, "(\"v1\") >= ($1) AND (\"v1\") < ($2)");
    }

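    // Added check (not part of the original tests): with a composite split key,
    // `split_filter_expression` numbers the left-bound parameters before the right-bound
    // ones. Note that `split_snapshot_read_inner` currently asserts a single split column,
    // so this only exercises the expression builder itself. Expected strings are derived
    // from the implementation above.
    #[test]
    fn test_split_filter_expression_multi_column() {
        let cols = vec!["v1".to_owned(), "v2".to_owned()];

        // Only split: no bounds at all.
        let expr = PostgresExternalTableReader::split_filter_expression(&cols, true, true);
        assert_eq!(expr, "1 = 1");

        // First split: exclusive right bound only, binding $1..$2.
        let expr = PostgresExternalTableReader::split_filter_expression(&cols, true, false);
        assert_eq!(expr, "(\"v1\", \"v2\") < ($1, $2)");

        // Last split: inclusive left bound only, binding $1..$2.
        let expr = PostgresExternalTableReader::split_filter_expression(&cols, false, true);
        assert_eq!(expr, "(\"v1\", \"v2\") >= ($1, $2)");

        // Middle split: left bound binds $1..$2, right bound binds $3..$4.
        let expr = PostgresExternalTableReader::split_filter_expression(&cols, false, false);
        assert_eq!(
            expr,
            "(\"v1\", \"v2\") >= ($1, $2) AND (\"v1\", \"v2\") < ($3, $4)"
        );
    }
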
    #[ignore]
    #[tokio::test]
    async fn test_pg_table_reader() {
        let columns = vec![
            ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
            ColumnDesc::named("v2", ColumnId::new(2), DataType::Varchar),
            ColumnDesc::named("v3", ColumnId::new(3), DataType::Decimal),
            ColumnDesc::named("v4", ColumnId::new(4), DataType::Date),
        ];
        let rw_schema = Schema {
            fields: columns.iter().map(Field::from).collect(),
        };

        let props: HashMap<String, String> = convert_args!(hashmap!(
            "hostname" => "localhost",
            "port" => "8432",
            "username" => "myuser",
            "password" => "123456",
            "database.name" => "mydb",
            "schema.name" => "public",
            "table.name" => "t1"));

        let config =
            serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
                .unwrap();
        let schema_table_name = SchemaTableName {
            schema_name: "public".to_owned(),
            table_name: "t1".to_owned(),
        };
        let reader = PostgresExternalTableReader::new(
            config,
            rw_schema,
            vec![0, 1],
            schema_table_name.clone(),
        )
        .await
        .unwrap();

        let offset = reader.current_cdc_offset().await.unwrap();
        println!("CdcOffset: {:?}", offset);

        let start_pk = OwnedRow::new(vec![Some(ScalarImpl::from(3)), Some(ScalarImpl::from("c"))]);
        let stream = reader.snapshot_read(
            schema_table_name,
            Some(start_pk),
            vec!["v1".to_owned(), "v2".to_owned()],
            1000,
        );

        pin_mut!(stream);
        #[for_await]
        for row in stream {
            println!("OwnedRow: {:?}", row);
        }
    }
}