risingwave_connector/source/cdc/external/postgres.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;

use anyhow::Context;
use futures::stream::BoxStream;
use futures::{StreamExt, pin_mut};
use futures_async_stream::{for_await, try_stream};
use itertools::Itertools;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, Datum, ScalarImpl, ToOwnedDatum};
use risingwave_common::util::iter_util::ZipEqFast;
use serde_derive::{Deserialize, Serialize};
use tokio_postgres::types::PgLsn;

use crate::connector_common::create_pg_client;
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::scalar_adapter::ScalarAdapter;
use crate::parser::{postgres_cell_to_scalar_impl, postgres_row_to_owned_row};
use crate::source::CdcTableSnapshotSplit;
use crate::source::cdc::external::{
    CdcOffset, CdcOffsetParseFunc, CdcTableSnapshotSplitOption, DebeziumOffset,
    ExternalTableConfig, ExternalTableReader, SchemaTableName,
};

#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct PostgresOffset {
    pub txid: i64,
    // In postgres, an LSN is a 64-bit integer, representing a byte position in the write-ahead log stream.
    // It is printed as two hexadecimal numbers of up to 8 digits each, separated by a slash; for example, 16/B374D848
    pub lsn: u64,
}

// only compare the lsn field
impl PartialOrd for PostgresOffset {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.lsn.partial_cmp(&other.lsn)
    }
}

impl PostgresOffset {
    pub fn parse_debezium_offset(offset: &str) -> ConnectorResult<Self> {
        let dbz_offset: DebeziumOffset = serde_json::from_str(offset)
            .with_context(|| format!("invalid upstream offset: {}", offset))?;

        Ok(Self {
            txid: dbz_offset
                .source_offset
                .txid
                .context("invalid postgres txid")?,
            lsn: dbz_offset
                .source_offset
                .lsn
                .context("invalid postgres lsn")?,
        })
    }
}

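/// Reads rows from an upstream Postgres table for CDC backfill.
///
/// All queries go through a single `tokio_postgres` client guarded by a mutex, so reads
/// issued by one reader are serialized over one connection.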
pub struct PostgresExternalTableReader {
    rw_schema: Schema,
    field_names: String,
    pk_indices: Vec<usize>,
    client: tokio::sync::Mutex<tokio_postgres::Client>,
    schema_table_name: SchemaTableName,
}

impl ExternalTableReader for PostgresExternalTableReader {
    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
        let mut client = self.client.lock().await;
        // start a transaction to read current lsn and txid
        let trxn = client.transaction().await?;
        let row = trxn.query_one("SELECT pg_current_wal_lsn()", &[]).await?;
        let mut pg_offset = PostgresOffset::default();
        let pg_lsn = row.get::<_, PgLsn>(0);
        tracing::debug!("current lsn: {}", pg_lsn);
        pg_offset.lsn = pg_lsn.into();

        let txid_row = trxn.query_one("SELECT txid_current()", &[]).await?;
        let txid: i64 = txid_row.get::<_, i64>(0);
        pg_offset.txid = txid;

        // commit the transaction
        trxn.commit().await?;

        Ok(CdcOffset::Postgres(pg_offset))
    }

    fn snapshot_read(
        &self,
        table_name: SchemaTableName,
        start_pk: Option<OwnedRow>,
        primary_keys: Vec<String>,
        limit: u32,
    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
        assert_eq!(table_name, self.schema_table_name);
        self.snapshot_read_inner(table_name, start_pk, primary_keys, limit)
    }

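    /// Computes snapshot splits for parallel CDC backfill.
    ///
    /// When `backfill_as_even_splits` is enabled and the split column has an integer type,
    /// the value range is divided into evenly-sized splits; otherwise split bounds are
    /// discovered by probing the upstream table.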
    #[try_stream(boxed, ok = CdcTableSnapshotSplit, error = ConnectorError)]
    async fn get_parallel_cdc_splits(&self, options: CdcTableSnapshotSplitOption) {
        let backfill_num_rows_per_split = options.backfill_num_rows_per_split;
        if backfill_num_rows_per_split == 0 {
            return Err(anyhow::anyhow!(
                "invalid backfill_num_rows_per_split, must be greater than 0"
            )
            .into());
        }
        if options.backfill_split_pk_column_index as usize >= self.pk_indices.len() {
            return Err(anyhow::anyhow!(format!(
                "invalid backfill_split_pk_column_index {}, out of bound",
                options.backfill_split_pk_column_index
            ))
            .into());
        }
        let split_column = self.split_column(&options);
        let row_stream = if options.backfill_as_even_splits
            && is_supported_even_split_data_type(&split_column.data_type)
        {
            // For certain types, use evenly-sized partitions to optimize performance.
            tracing::info!(?self.schema_table_name, ?self.rw_schema, ?self.pk_indices, ?split_column, "Get parallel cdc table snapshot even splits.");
            self.as_even_splits(options)
        } else {
            tracing::info!(?self.schema_table_name, ?self.rw_schema, ?self.pk_indices, ?split_column, "Get parallel cdc table snapshot uneven splits.");
            self.as_uneven_splits(options)
        };
        pin_mut!(row_stream);
        #[for_await]
        for row in row_stream {
            let row = row?;
            yield row;
        }
    }

    fn split_snapshot_read(
        &self,
        table_name: SchemaTableName,
        left: OwnedRow,
        right: OwnedRow,
        split_columns: Vec<Field>,
    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
        assert_eq!(table_name, self.schema_table_name);
        self.split_snapshot_read_inner(table_name, left, right, split_columns)
    }
}

impl PostgresExternalTableReader {
    pub async fn new(
        config: ExternalTableConfig,
        rw_schema: Schema,
        pk_indices: Vec<usize>,
        schema_table_name: SchemaTableName,
    ) -> ConnectorResult<Self> {
        tracing::info!(
            ?rw_schema,
            ?pk_indices,
            "create postgres external table reader"
        );

        let client = create_pg_client(
            &config.username,
            &config.password,
            &config.host,
            &config.port,
            &config.database,
            &config.ssl_mode,
            &config.ssl_root_cert,
        )
        .await?;

        let field_names = rw_schema
            .fields
            .iter()
            .map(|f| Self::quote_column(&f.name))
            .join(",");

        Ok(Self {
            rw_schema,
            field_names,
            pk_indices,
            client: tokio::sync::Mutex::new(client),
            schema_table_name,
        })
    }

    pub fn get_normalized_table_name(table_name: &SchemaTableName) -> String {
        format!(
            "\"{}\".\"{}\"",
            table_name.schema_name, table_name.table_name
        )
    }

    pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
        Box::new(move |offset| {
            Ok(CdcOffset::Postgres(PostgresOffset::parse_debezium_offset(
                offset,
            )?))
        })
    }

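    /// Streams rows of the upstream table ordered by the primary key, resuming after
    /// `start_pk_row` when it is provided and returning at most `scan_limit` rows.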
    #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
    async fn snapshot_read_inner(
        &self,
        table_name: SchemaTableName,
        start_pk_row: Option<OwnedRow>,
        primary_keys: Vec<String>,
        scan_limit: u32,
    ) {
        let order_key = Self::get_order_key(&primary_keys);
        let client = self.client.lock().await;
        client.execute("set time zone '+00:00'", &[]).await?;

        let stream = match start_pk_row {
            Some(ref pk_row) => {
                // prepare the scan statement, since we may need to convert the RW data type to postgres data type
                // e.g. varchar to uuid
                let prepared_scan_stmt = {
                    let primary_keys = self
                        .pk_indices
                        .iter()
                        .map(|i| self.rw_schema.fields[*i].name.clone())
                        .collect_vec();

                    let order_key = Self::get_order_key(&primary_keys);
                    let scan_sql = format!(
                        "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {scan_limit}",
                        self.field_names,
                        Self::get_normalized_table_name(&table_name),
                        Self::filter_expression(&primary_keys),
                        order_key,
                    );
                    client.prepare(&scan_sql).await?
                };

                let params: Vec<Option<ScalarAdapter>> = pk_row
                    .iter()
                    .zip_eq_fast(prepared_scan_stmt.params())
                    .map(|(datum, ty)| {
                        datum
                            .map(|scalar| ScalarAdapter::from_scalar(scalar, ty))
                            .transpose()
                    })
                    .try_collect()?;

                client.query_raw(&prepared_scan_stmt, &params).await?
            }
            None => {
                let sql = format!(
                    "SELECT {} FROM {} ORDER BY {} LIMIT {scan_limit}",
                    self.field_names,
                    Self::get_normalized_table_name(&table_name),
                    order_key,
                );
                let params: Vec<Option<ScalarAdapter>> = vec![];
                client.query_raw(&sql, &params).await?
            }
        };

        let row_stream = stream.map(|row| {
            let row = row?;
            Ok::<_, crate::error::ConnectorError>(postgres_row_to_owned_row(row, &self.rw_schema))
        });

        pin_mut!(row_stream);
        #[for_await]
        for row in row_stream {
            let row = row?;
            yield row;
        }
    }

    // row filter expression: (v1, v2, v3) > ($1, $2, $3)
    fn filter_expression(columns: &[String]) -> String {
        let mut col_expr = String::new();
        let mut arg_expr = String::new();
        for (i, column) in columns.iter().enumerate() {
            if i > 0 {
                col_expr.push_str(", ");
                arg_expr.push_str(", ");
            }
            col_expr.push_str(&Self::quote_column(column));
            arg_expr.push_str(format!("${}", i + 1).as_str());
        }
        format!("({}) > ({})", col_expr, arg_expr)
    }

    // row filter expression: (v1, v2, v3) >= ($1, $2, $3) AND (v1, v2, v3) < ($1, $2, $3)
    fn split_filter_expression(
        columns: &[String],
        is_first_split: bool,
        is_last_split: bool,
    ) -> String {
        let mut left_col_expr = String::new();
        let mut left_arg_expr = String::new();
        let mut right_col_expr = String::new();
        let mut right_arg_expr = String::new();
        let mut c = 1;
        if !is_first_split {
            for (i, column) in columns.iter().enumerate() {
                if i > 0 {
                    left_col_expr.push_str(", ");
                    left_arg_expr.push_str(", ");
                }
                left_col_expr.push_str(&Self::quote_column(column));
                left_arg_expr.push_str(format!("${}", c).as_str());
                c += 1;
            }
        }
        if !is_last_split {
            for (i, column) in columns.iter().enumerate() {
                if i > 0 {
                    right_col_expr.push_str(", ");
                    right_arg_expr.push_str(", ");
                }
                right_col_expr.push_str(&Self::quote_column(column));
                right_arg_expr.push_str(format!("${}", c).as_str());
                c += 1;
            }
        }
        if is_first_split && is_last_split {
            "1 = 1".to_owned()
        } else if is_first_split {
            format!("({}) < ({})", right_col_expr, right_arg_expr,)
        } else if is_last_split {
            format!("({}) >= ({})", left_col_expr, left_arg_expr,)
        } else {
            format!(
                "({}) >= ({}) AND ({}) < ({})",
                left_col_expr, left_arg_expr, right_col_expr, right_arg_expr,
            )
        }
    }

    fn get_order_key(primary_keys: &Vec<String>) -> String {
        primary_keys
            .iter()
            .map(|col| Self::quote_column(col))
            .join(",")
    }

    fn quote_column(column: &str) -> String {
        format!("\"{}\"", column)
    }

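    /// Returns the minimum and maximum values of the split column, or `None` when the table
    /// is empty or either bound is NULL.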
    async fn min_and_max(
        &self,
        split_column: &Field,
    ) -> ConnectorResult<Option<(ScalarImpl, ScalarImpl)>> {
        // Quote the column name to handle case-sensitive identifiers, consistent with the other queries.
        let sql = format!(
            "SELECT MIN({}), MAX({}) FROM {}",
            Self::quote_column(&split_column.name),
            Self::quote_column(&split_column.name),
            Self::get_normalized_table_name(&self.schema_table_name),
        );
        let client = self.client.lock().await;
        let rows = client.query(&sql, &[]).await?;
        if rows.is_empty() {
            Ok(None)
        } else {
            let row = &rows[0];
            let min =
                postgres_cell_to_scalar_impl(row, &split_column.data_type, 0, &split_column.name);
            let max =
                postgres_cell_to_scalar_impl(row, &split_column.data_type, 1, &split_column.name);
            match (min, max) {
                (Some(min), Some(max)) => Ok(Some((min, max))),
                _ => Ok(None),
            }
        }
    }

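    /// Probes the exclusive right bound of the next split: scans the next `max_split_size`
    /// rows starting from `left_value` and returns the largest split-column value among them,
    /// or a NULL datum once that value reaches `max_value` (the remaining rows form the last
    /// split).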
    async fn next_split_right_bound_exclusive(
        &self,
        left_value: &ScalarImpl,
        max_value: &ScalarImpl,
        max_split_size: u64,
        split_column: &Field,
    ) -> ConnectorResult<Option<Datum>> {
        let sql = format!(
            "WITH t as (SELECT {} FROM {} WHERE {} >= $1 ORDER BY {} ASC LIMIT {}) SELECT CASE WHEN MAX({}) < $2 THEN MAX({}) ELSE NULL END FROM t",
            Self::quote_column(&split_column.name),
            Self::get_normalized_table_name(&self.schema_table_name),
            Self::quote_column(&split_column.name),
            Self::quote_column(&split_column.name),
            max_split_size,
            Self::quote_column(&split_column.name),
            Self::quote_column(&split_column.name),
        );
        let client = self.client.lock().await;
        let prepared_stmt = client.prepare(&sql).await?;
        let params: Vec<Option<ScalarAdapter>> = vec![
            Some(ScalarAdapter::from_scalar(
                left_value.as_scalar_ref_impl(),
                &prepared_stmt.params()[0],
            )?),
            Some(ScalarAdapter::from_scalar(
                max_value.as_scalar_ref_impl(),
                &prepared_stmt.params()[1],
            )?),
        ];
        let stream = client.query_raw(&prepared_stmt, &params).await?;
        let datum_stream = stream.map(|row| {
            let row = row?;
            Ok::<_, ConnectorError>(postgres_cell_to_scalar_impl(
                &row,
                &split_column.data_type,
                0,
                &split_column.name,
            ))
        });
        pin_mut!(datum_stream);
        #[for_await]
        for datum in datum_stream {
            let right = datum?;
            return Ok(Some(right.to_owned_datum()));
        }
        Ok(None)
    }

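    /// Returns the smallest split-column value strictly between `start_offset` and `max_value`.
    /// Used to make progress when a candidate right bound equals its left bound, e.g. when the
    /// split column contains many duplicate values.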
    async fn next_greater_bound(
        &self,
        start_offset: &ScalarImpl,
        max_value: &ScalarImpl,
        split_column: &Field,
    ) -> ConnectorResult<Option<Datum>> {
        let sql = format!(
            "SELECT MIN({}) FROM {} WHERE {} > $1 AND {} < $2",
            Self::quote_column(&split_column.name),
            Self::get_normalized_table_name(&self.schema_table_name),
            Self::quote_column(&split_column.name),
            Self::quote_column(&split_column.name),
        );
        let client = self.client.lock().await;
        let prepared_stmt = client.prepare(&sql).await?;
        let params: Vec<Option<ScalarAdapter>> = vec![
            Some(ScalarAdapter::from_scalar(
                start_offset.as_scalar_ref_impl(),
                &prepared_stmt.params()[0],
            )?),
            Some(ScalarAdapter::from_scalar(
                max_value.as_scalar_ref_impl(),
                &prepared_stmt.params()[1],
            )?),
        ];
        let stream = client.query_raw(&prepared_stmt, &params).await?;
        let datum_stream = stream.map(|row| {
            let row = row?;
            Ok::<_, ConnectorError>(postgres_cell_to_scalar_impl(
                &row,
                &split_column.data_type,
                0,
                &split_column.name,
            ))
        });
        pin_mut!(datum_stream);
        #[for_await]
        for datum in datum_stream {
            let right = datum?;
            return Ok(Some(right));
        }
        Ok(None)
    }

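    /// Streams all rows whose split-column value falls in `[left, right)`; a NULL bound means
    /// the range is unbounded on that side.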
    #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
    async fn split_snapshot_read_inner(
        &self,
        table_name: SchemaTableName,
        left: OwnedRow,
        right: OwnedRow,
        split_columns: Vec<Field>,
    ) {
        assert_eq!(
            split_columns.len(),
            1,
            "multiple split columns are not supported yet"
        );
        assert_eq!(left.len(), 1, "multiple split columns are not supported yet");
        assert_eq!(
            right.len(),
            1,
            "multiple split columns are not supported yet"
        );
        let is_first_split = left[0].is_none();
        let is_last_split = right[0].is_none();
        let split_column_names = split_columns.iter().map(|c| c.name.clone()).collect_vec();
        let client = self.client.lock().await;
        client.execute("set time zone '+00:00'", &[]).await?;
        // prepare the scan statement, since we may need to convert the RW data type to postgres data type
        // e.g. varchar to uuid
        let prepared_scan_stmt = {
            let scan_sql = format!(
                "SELECT {} FROM {} WHERE {}",
                self.field_names,
                Self::get_normalized_table_name(&table_name),
                Self::split_filter_expression(&split_column_names, is_first_split, is_last_split),
            );
            client.prepare(&scan_sql).await?
        };

        let mut params: Vec<Option<ScalarAdapter>> = vec![];
        if !is_first_split {
            let left_params: Vec<Option<ScalarAdapter>> = left
                .iter()
                .zip_eq_fast(prepared_scan_stmt.params().iter().take(left.len()))
                .map(|(datum, ty)| {
                    datum
                        .map(|scalar| ScalarAdapter::from_scalar(scalar, ty))
                        .transpose()
                })
                .try_collect()?;
            params.extend(left_params);
        }
        if !is_last_split {
            let right_params: Vec<Option<ScalarAdapter>> = right
                .iter()
                .zip_eq_fast(prepared_scan_stmt.params().iter().skip(params.len()))
                .map(|(datum, ty)| {
                    datum
                        .map(|scalar| ScalarAdapter::from_scalar(scalar, ty))
                        .transpose()
                })
                .try_collect()?;
            params.extend(right_params);
        }

        let stream = client.query_raw(&prepared_scan_stmt, &params).await?;
        let row_stream = stream.map(|row| {
            let row = row?;
            Ok::<_, crate::error::ConnectorError>(postgres_row_to_owned_row(row, &self.rw_schema))
        });

        pin_mut!(row_stream);
        #[for_await]
        for row in row_stream {
            let row = row?;
            yield row;
        }
    }

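    /// Generates splits by repeatedly probing the upstream table for the next right bound, so
    /// each split covers roughly `backfill_num_rows_per_split` rows regardless of how the
    /// split-column values are distributed.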
    #[try_stream(boxed, ok = CdcTableSnapshotSplit, error = ConnectorError)]
    async fn as_uneven_splits(&self, options: CdcTableSnapshotSplitOption) {
        let split_column = self.split_column(&options);
        let mut split_id = 1;
        let Some((min_value, max_value)) = self.min_and_max(&split_column).await? else {
            let left_bound_row = OwnedRow::new(vec![None]);
            let right_bound_row = OwnedRow::new(vec![None]);
            let split = CdcTableSnapshotSplit {
                split_id,
                left_bound_inclusive: left_bound_row,
                right_bound_exclusive: right_bound_row,
            };
            yield split;
            return Ok(());
        };
        // The left bound will never be a NULL value.
        let mut next_left_bound_inclusive = min_value.clone();
        loop {
            let left_bound_inclusive: Datum = if next_left_bound_inclusive == min_value {
                None
            } else {
                Some(next_left_bound_inclusive.clone())
            };
            let right_bound_exclusive;
            let mut next_right = self
                .next_split_right_bound_exclusive(
                    &next_left_bound_inclusive,
                    &max_value,
                    options.backfill_num_rows_per_split,
                    &split_column,
                )
                .await?;
            if let Some(Some(ref inner)) = next_right
                && *inner == next_left_bound_inclusive
            {
                next_right = self
                    .next_greater_bound(&next_left_bound_inclusive, &max_value, &split_column)
                    .await?;
            }
            if let Some(next_right) = next_right {
                match next_right {
                    None => {
                        // NULL found.
                        right_bound_exclusive = None;
                    }
                    Some(next_right) => {
                        next_left_bound_inclusive = next_right.to_owned();
                        right_bound_exclusive = Some(next_right);
                    }
                }
            } else {
                // Not found.
                right_bound_exclusive = None;
            };
            let is_completed = right_bound_exclusive.is_none();
            if is_completed && left_bound_inclusive.is_none() {
                assert_eq!(split_id, 1);
            }
            tracing::info!(
                split_id,
                ?left_bound_inclusive,
                ?right_bound_exclusive,
                "New CDC table snapshot split."
            );
            let left_bound_row = OwnedRow::new(vec![left_bound_inclusive]);
            let right_bound_row = OwnedRow::new(vec![right_bound_exclusive]);
            let split = CdcTableSnapshotSplit {
                split_id,
                left_bound_inclusive: left_bound_row,
                right_bound_exclusive: right_bound_row,
            };
            try_increase_split_id(&mut split_id)?;
            yield split;
            if is_completed {
                break;
            }
        }
    }

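    /// Generates splits by dividing the integer range `[min, max]` of the split column into
    /// evenly-sized value ranges of `backfill_num_rows_per_split`, without further probing of
    /// the upstream table.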
    #[try_stream(boxed, ok = CdcTableSnapshotSplit, error = ConnectorError)]
    async fn as_even_splits(&self, options: CdcTableSnapshotSplitOption) {
        let split_column = self.split_column(&options);
        let mut split_id = 1;
        let Some((min_value, max_value)) = self.min_and_max(&split_column).await? else {
            let left_bound_row = OwnedRow::new(vec![None]);
            let right_bound_row = OwnedRow::new(vec![None]);
            let split = CdcTableSnapshotSplit {
                split_id,
                left_bound_inclusive: left_bound_row,
                right_bound_exclusive: right_bound_row,
            };
            yield split;
            return Ok(());
        };
        let min_value = min_value.as_integral();
        let max_value = max_value.as_integral();
        let saturated_split_max_size = options
            .backfill_num_rows_per_split
            .try_into()
            .unwrap_or(i64::MAX);
        let mut left = None;
        let mut right = Some(min_value.saturating_add(saturated_split_max_size));
        loop {
            let mut is_completed = false;
            if right.as_ref().map(|r| *r >= max_value).unwrap_or(true) {
                right = None;
                is_completed = true;
            }
            let split = CdcTableSnapshotSplit {
                split_id,
                left_bound_inclusive: OwnedRow::new(vec![
                    left.map(|l| to_int_scalar(l, &split_column.data_type)),
                ]),
                right_bound_exclusive: OwnedRow::new(vec![
                    right.map(|r| to_int_scalar(r, &split_column.data_type)),
                ]),
            };
            try_increase_split_id(&mut split_id)?;
            yield split;
            if is_completed {
                break;
            }
            left = right;
            right = left.map(|l| l.saturating_add(saturated_split_max_size));
        }
    }

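    /// Returns the primary-key column selected by `backfill_split_pk_column_index` as the
    /// split column.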
    fn split_column(&self, options: &CdcTableSnapshotSplitOption) -> Field {
        self.rw_schema.fields[self.pk_indices[options.backfill_split_pk_column_index as usize]]
            .clone()
    }
}

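/// Converts an `i64` split bound back into the split column's integer type; panics if the
/// value does not fit or the type is not an integer.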
fn to_int_scalar(i: i64, data_type: &DataType) -> ScalarImpl {
    match data_type {
        DataType::Int16 => ScalarImpl::Int16(i.try_into().unwrap()),
        DataType::Int32 => ScalarImpl::Int32(i.try_into().unwrap()),
        DataType::Int64 => ScalarImpl::Int64(i),
        _ => {
            panic!("Can't convert int {} to ScalarImpl::{}", i, data_type)
        }
    }
}

fn try_increase_split_id(split_id: &mut i64) -> ConnectorResult<()> {
    match split_id.checked_add(1) {
        Some(s) => {
            *split_id = s;
            Ok(())
        }
        None => Err(anyhow::anyhow!("too many CDC snapshot splits").into()),
    }
}

/// Whether the data type of the split column (taken from the primary key) supports
/// evenly-sized splits.
fn is_supported_even_split_data_type(data_type: &DataType) -> bool {
    matches!(
        data_type,
        DataType::Int16 | DataType::Int32 | DataType::Int64
    )
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::pin_mut;
    use futures_async_stream::for_await;
    use maplit::{convert_args, hashmap};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{DataType, ScalarImpl};

    use crate::connector_common::PostgresExternalTable;
    use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset};
    use crate::source::cdc::external::{ExternalTableConfig, ExternalTableReader, SchemaTableName};

    #[ignore]
    #[tokio::test]
    async fn test_postgres_schema() {
        let config = ExternalTableConfig {
            connector: "postgres-cdc".to_owned(),
            host: "localhost".to_owned(),
            port: "8432".to_owned(),
            username: "myuser".to_owned(),
            password: "123456".to_owned(),
            database: "mydb".to_owned(),
            schema: "public".to_owned(),
            table: "mytest".to_owned(),
            ssl_mode: Default::default(),
            ssl_root_cert: None,
            encrypt: "false".to_owned(),
        };

        let table = PostgresExternalTable::connect(
            &config.username,
            &config.password,
            &config.host,
            config.port.parse::<u16>().unwrap(),
            &config.database,
            &config.schema,
            &config.table,
            &config.ssl_mode,
            &config.ssl_root_cert,
            false,
        )
        .await
        .unwrap();

        println!("columns: {:?}", &table.column_descs());
        println!("primary keys: {:?}", &table.pk_names());
    }

    #[test]
    fn test_postgres_offset() {
        let off1 = PostgresOffset { txid: 4, lsn: 2 };
        let off2 = PostgresOffset { txid: 1, lsn: 3 };
        let off3 = PostgresOffset { txid: 5, lsn: 1 };

        assert!(off1 < off2);
        assert!(off3 < off1);
        assert!(off2 > off3);
    }

    #[test]
    fn test_filter_expression() {
        let cols = vec!["v1".to_owned()];
        let expr = PostgresExternalTableReader::filter_expression(&cols);
        assert_eq!(expr, "(\"v1\") > ($1)");

        let cols = vec!["v1".to_owned(), "v2".to_owned()];
        let expr = PostgresExternalTableReader::filter_expression(&cols);
        assert_eq!(expr, "(\"v1\", \"v2\") > ($1, $2)");

        let cols = vec!["v1".to_owned(), "v2".to_owned(), "v3".to_owned()];
        let expr = PostgresExternalTableReader::filter_expression(&cols);
        assert_eq!(expr, "(\"v1\", \"v2\", \"v3\") > ($1, $2, $3)");
    }

    #[test]
    fn test_split_filter_expression() {
        let cols = vec!["v1".to_owned()];
        let expr = PostgresExternalTableReader::split_filter_expression(&cols, true, true);
        assert_eq!(expr, "1 = 1");

        let expr = PostgresExternalTableReader::split_filter_expression(&cols, true, false);
        assert_eq!(expr, "(\"v1\") < ($1)");

        let expr = PostgresExternalTableReader::split_filter_expression(&cols, false, true);
        assert_eq!(expr, "(\"v1\") >= ($1)");

        let expr = PostgresExternalTableReader::split_filter_expression(&cols, false, false);
        assert_eq!(expr, "(\"v1\") >= ($1) AND (\"v1\") < ($2)");
    }

    // manual test
    #[ignore]
    #[tokio::test]
    async fn test_pg_table_reader() {
        let columns = vec![
            ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
            ColumnDesc::named("v2", ColumnId::new(2), DataType::Varchar),
            ColumnDesc::named("v3", ColumnId::new(3), DataType::Decimal),
            ColumnDesc::named("v4", ColumnId::new(4), DataType::Date),
        ];
        let rw_schema = Schema {
            fields: columns.iter().map(Field::from).collect(),
        };

        let props: HashMap<String, String> = convert_args!(hashmap!(
                "hostname" => "localhost",
                "port" => "8432",
                "username" => "myuser",
                "password" => "123456",
                "database.name" => "mydb",
                "schema.name" => "public",
                "table.name" => "t1"));

        let config =
            serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
                .unwrap();
        let schema_table_name = SchemaTableName {
            schema_name: "public".to_owned(),
            table_name: "t1".to_owned(),
        };
        let reader = PostgresExternalTableReader::new(
            config,
            rw_schema,
            vec![0, 1],
            schema_table_name.clone(),
        )
        .await
        .unwrap();

        let offset = reader.current_cdc_offset().await.unwrap();
        println!("CdcOffset: {:?}", offset);

        let start_pk = OwnedRow::new(vec![Some(ScalarImpl::from(3)), Some(ScalarImpl::from("c"))]);
        let stream = reader.snapshot_read(
            schema_table_name,
            Some(start_pk),
            vec!["v1".to_owned(), "v2".to_owned()],
            1000,
        );

        pin_mut!(stream);
        #[for_await]
        for row in stream {
            println!("OwnedRow: {:?}", row);
        }
    }
}