risingwave_stream/executor/backfill/cdc/cdc_backill_v2.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use either::Either;
18use futures::stream;
19use futures::stream::select_with_strategy;
20use itertools::Itertools;
21use risingwave_common::bitmap::BitmapBuilder;
22use risingwave_common::catalog::{ColumnDesc, Field};
23use risingwave_common::row::RowDeserializer;
24use risingwave_common::util::iter_util::ZipEqFast;
25use risingwave_common::util::sort_util::{OrderType, cmp_datum};
26use risingwave_connector::parser::{TimeHandling, TimestampHandling, TimestamptzHandling};
27use risingwave_connector::source::cdc::CdcScanOptions;
28use risingwave_connector::source::cdc::external::ExternalTableReaderImpl;
29use risingwave_connector::source::{CdcTableSnapshotSplit, CdcTableSnapshotSplitRaw};
30use rw_futures_util::pausable;
31use thiserror_ext::AsReport;
32use tracing::Instrument;
33
34use crate::executor::UpdateMutation;
35use crate::executor::backfill::cdc::cdc_backfill::{
36    build_reader_and_poll_upstream, transform_upstream,
37};
38use crate::executor::backfill::cdc::state_v2::ParallelizedCdcBackfillState;
39use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
40use crate::executor::backfill::cdc::upstream_table::snapshot::{
41    SplitSnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
42};
43use crate::executor::backfill::utils::{mapping_chunk, mapping_message};
44use crate::executor::prelude::*;
45use crate::executor::source::get_infinite_backoff_strategy;
46
47/// `split_id`, `is_finished`, `row_count` all occupy 1 column each.
48const METADATA_STATE_LEN: usize = 3;
49
50pub struct ParallelizedCdcBackfillExecutor<S: StateStore> {
51    actor_ctx: ActorContextRef,
52
53    /// The external table to be backfilled
54    external_table: ExternalStorageTable,
55
56    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
57    upstream: Executor,
58
59    /// The column indices need to be forwarded to the downstream from the upstream and table scan.
60    output_indices: Vec<usize>,
61
62    /// The schema of output chunk, including additional columns if any
63    output_columns: Vec<ColumnDesc>,
64
65    /// Rate limit in rows/s.
66    rate_limit_rps: Option<u32>,
67
68    options: CdcScanOptions,
69
70    state_table: StateTable<S>,
71
72    properties: BTreeMap<String, String>,
73}
74
75impl<S: StateStore> ParallelizedCdcBackfillExecutor<S> {
76    #[allow(clippy::too_many_arguments)]
77    pub fn new(
78        actor_ctx: ActorContextRef,
79        external_table: ExternalStorageTable,
80        upstream: Executor,
81        output_indices: Vec<usize>,
82        output_columns: Vec<ColumnDesc>,
83        _metrics: Arc<StreamingMetrics>,
84        state_table: StateTable<S>,
85        rate_limit_rps: Option<u32>,
86        options: CdcScanOptions,
87        properties: BTreeMap<String, String>,
88    ) -> Self {
89        Self {
90            actor_ctx,
91            external_table,
92            upstream,
93            output_indices,
94            output_columns,
95            rate_limit_rps,
96            options,
97            state_table,
98            properties,
99        }
100    }
101
102    #[try_stream(ok = Message, error = StreamExecutorError)]
103    async fn execute_inner(mut self) {
104        assert!(!self.options.disable_backfill);
105        // The indices to primary key columns
106        let pk_indices = self.external_table.pk_indices().to_vec();
107        let table_id = self.external_table.table_id().table_id;
108        let upstream_table_name = self.external_table.qualified_table_name();
109        let schema_table_name = self.external_table.schema_table_name().clone();
110        let external_database_name = self.external_table.database_name().to_owned();
111        let additional_columns = self
112            .output_columns
113            .iter()
114            .filter(|col| col.additional_column.column_type.is_some())
115            .cloned()
116            .collect_vec();
117        assert!(
118            (self.options.backfill_split_pk_column_index as usize) < pk_indices.len(),
119            "split pk column index {} out of bound",
120            self.options.backfill_split_pk_column_index
121        );
122        let snapshot_split_column_index =
123            pk_indices[self.options.backfill_split_pk_column_index as usize];
124        let cdc_table_snapshot_split_column =
125            vec![self.external_table.schema().fields[snapshot_split_column_index].clone()];
126
127        let mut upstream = self.upstream.execute();
128        // Poll the upstream to get the first barrier.
129        let first_barrier = expect_first_barrier(&mut upstream).await?;
130        // Make sure to use mapping_message after transform_upstream.
131
132        // If user sets debezium.time.precision.mode to "connect", it means the user can guarantee
133        // that the upstream data precision is MilliSecond. In this case, we don't use GuessNumberUnit
134        // mode to guess precision, but use Milli mode directly, which can handle extreme timestamps.
135        let timestamp_handling: Option<TimestampHandling> = self
136            .properties
137            .get("debezium.time.precision.mode")
138            .map(|v| v == "connect")
139            .unwrap_or(false)
140            .then_some(TimestampHandling::Milli);
141        let timestamptz_handling: Option<TimestamptzHandling> = self
142            .properties
143            .get("debezium.time.precision.mode")
144            .map(|v| v == "connect")
145            .unwrap_or(false)
146            .then_some(TimestamptzHandling::Milli);
147        let time_handling: Option<TimeHandling> = self
148            .properties
149            .get("debezium.time.precision.mode")
150            .map(|v| v == "connect")
151            .unwrap_or(false)
152            .then_some(TimeHandling::Milli);
153        let mut upstream = transform_upstream(
154            upstream,
155            self.output_columns.clone(),
156            timestamp_handling,
157            timestamptz_handling,
158            time_handling,
159        )
160        .boxed();
161        let mut next_reset_barrier = Some(first_barrier);
162        let mut is_reset = false;
163        let mut state_impl =
164            ParallelizedCdcBackfillState::new(self.state_table, METADATA_STATE_LEN);
165        let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
166        // Need reset on CDC table snapshot splits reschedule.
167        'with_cdc_table_snapshot_splits: loop {
168            assert!(upstream_chunk_buffer.is_empty());
169            let reset_barrier = next_reset_barrier.take().unwrap();
170            let all_snapshot_splits = match reset_barrier.mutation.as_deref() {
171                Some(Mutation::Add(add)) => &add.actor_cdc_table_snapshot_splits,
172                Some(Mutation::Update(update)) => &update.actor_cdc_table_snapshot_splits,
173                _ => {
174                    return Err(anyhow::anyhow!("ParallelizedCdcBackfillExecutor expects either Mutation::Add or Mutation::Update to initialize CDC table snapshot splits.").into());
175                }
176            };
177            let mut actor_snapshot_splits = vec![];
178            // TODO(zw): optimization: remove consumed splits to reduce barrier size for downstream.
179            if let Some(splits) = all_snapshot_splits.get(&self.actor_ctx.id) {
180                actor_snapshot_splits = splits
181                    .iter()
182                    .map(|s: &CdcTableSnapshotSplitRaw| {
183                        let de = RowDeserializer::new(
184                            cdc_table_snapshot_split_column
185                                .iter()
186                                .map(Field::data_type)
187                                .collect_vec(),
188                        );
189                        let left_bound_inclusive =
190                            de.deserialize(s.left_bound_inclusive.as_ref()).unwrap();
191                        let right_bound_exclusive =
192                            de.deserialize(s.right_bound_exclusive.as_ref()).unwrap();
193                        CdcTableSnapshotSplit {
194                            split_id: s.split_id,
195                            left_bound_inclusive,
196                            right_bound_exclusive,
197                        }
198                    })
199                    .collect();
200            }
201            tracing::debug!(?actor_snapshot_splits, "actor splits");
202            assert_consecutive_splits(&actor_snapshot_splits);
203
204            let mut is_snapshot_paused = reset_barrier.is_pause_on_startup();
205            let barrier_epoch = reset_barrier.epoch;
206            yield Message::Barrier(reset_barrier);
207            if !is_reset {
208                state_impl.init_epoch(barrier_epoch).await?;
209                is_reset = true;
210                tracing::info!(table_id, "Initialize executor.");
211            } else {
212                tracing::info!(table_id, "Reset executor.");
213            }
214
215            let mut current_actor_bounds = None;
216            // Find next split that need backfill.
217            let mut next_split_idx = actor_snapshot_splits.len();
218            for (idx, split) in actor_snapshot_splits.iter().enumerate() {
219                let state = state_impl.restore_state(split.split_id).await?;
220                if !state.is_finished {
221                    next_split_idx = idx;
222                    break;
223                }
224                extends_current_actor_bound(&mut current_actor_bounds, split);
225            }
226            for split in actor_snapshot_splits.iter().skip(next_split_idx) {
227                // Initialize state so that overall progress can be measured.
228                state_impl.mutate_state(split.split_id, false, 0).await?;
229            }
230
231            // After init the state table and forward the initial barrier to downstream,
232            // we now try to create the table reader with retry.
233            let mut table_reader: Option<ExternalTableReaderImpl> = None;
234            let external_table = self.external_table.clone();
235            let mut future = Box::pin(async move {
236                let backoff = get_infinite_backoff_strategy();
237                tokio_retry::Retry::spawn(backoff, || async {
238                    match external_table.create_table_reader().await {
239                        Ok(reader) => Ok(reader),
240                        Err(e) => {
241                            tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
242                            Err(e)
243                        }
244                    }
245                })
246                    .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
247                    .await
248                    .expect("Retry create cdc table reader until success.")
249            });
250            loop {
251                if let Some(msg) =
252                    build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
253                        .await?
254                {
255                    if let Some(msg) = mapping_message(msg, &self.output_indices) {
256                        match msg {
257                            Message::Barrier(barrier) => {
258                                state_impl.commit_state(barrier.epoch).await?;
259                                if is_reset_barrier(&barrier, self.actor_ctx.id) {
260                                    next_reset_barrier = Some(barrier);
261                                    continue 'with_cdc_table_snapshot_splits;
262                                }
263                                yield Message::Barrier(barrier);
264                            }
265                            Message::Chunk(chunk) => {
266                                if chunk.cardinality() == 0 {
267                                    continue;
268                                }
269                                if let Some(filtered_chunk) = filter_stream_chunk(
270                                    chunk,
271                                    &current_actor_bounds,
272                                    snapshot_split_column_index,
273                                ) && filtered_chunk.cardinality() > 0
274                                {
275                                    yield Message::Chunk(filtered_chunk);
276                                }
277                            }
278                            Message::Watermark(_) => {
279                                // Ignore watermark, like the `CdcBackfillExecutor`.
280                            }
281                        }
282                    }
283                } else {
284                    assert!(table_reader.is_some(), "table reader must created");
285                    tracing::info!(
286                        table_id,
287                        upstream_table_name,
288                        "table reader created successfully"
289                    );
290                    break;
291                }
292            }
293            let upstream_table_reader = UpstreamTableReader::new(
294                self.external_table.clone(),
295                table_reader.expect("table reader must created"),
296            );
297            // Backfill snapshot splits sequentially.
298            for split in actor_snapshot_splits.iter().skip(next_split_idx) {
299                tracing::info!(
300                    table_id,
301                    upstream_table_name,
302                    ?split,
303                    is_snapshot_paused,
304                    "start cdc backfill split"
305                );
306                extends_current_actor_bound(&mut current_actor_bounds, split);
307                let left_upstream = upstream.by_ref().map(Either::Left);
308                let read_args = SplitSnapshotReadArgs::new(
309                    split.left_bound_inclusive.clone(),
310                    split.right_bound_exclusive.clone(),
311                    cdc_table_snapshot_split_column.clone(),
312                    self.rate_limit_rps,
313                    additional_columns.clone(),
314                    schema_table_name.clone(),
315                    external_database_name.clone(),
316                );
317                let right_snapshot = pin!(
318                    upstream_table_reader
319                        .snapshot_read_table_split(read_args)
320                        .map(Either::Right)
321                );
322                let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
323                if is_snapshot_paused {
324                    snapshot_valve.pause();
325                }
326                let mut backfill_stream =
327                    select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
328                        stream::PollNext::Left
329                    });
330                let mut row_count: u64 = 0;
331                #[for_await]
332                for either in &mut backfill_stream {
333                    match either {
334                        // Upstream
335                        Either::Left(msg) => {
336                            match msg? {
337                                Message::Barrier(barrier) => {
338                                    state_impl.commit_state(barrier.epoch).await?;
339                                    if let Some(mutation) = barrier.mutation.as_deref() {
340                                        use crate::executor::Mutation;
341                                        match mutation {
342                                            Mutation::Pause => {
343                                                is_snapshot_paused = true;
344                                                snapshot_valve.pause();
345                                            }
346                                            Mutation::Resume => {
347                                                is_snapshot_paused = false;
348                                                snapshot_valve.resume();
349                                            }
350                                            Mutation::Throttle(some) => {
351                                                // TODO(zw): optimization: improve throttle.
352                                                // 1. Handle rate limit 0. Currently, to resume the process, the actor must be rebuilt.
353                                                // 2. Apply new rate limit immediately.
354                                                if let Some(new_rate_limit) =
355                                                    some.get(&self.actor_ctx.id)
356                                                    && *new_rate_limit != self.rate_limit_rps
357                                                {
358                                                    // The new rate limit will take effect since next split.
359                                                    self.rate_limit_rps = *new_rate_limit;
360                                                }
361                                            }
362                                            Mutation::Update(UpdateMutation {
363                                                dropped_actors,
364                                                ..
365                                            }) => {
366                                                if dropped_actors.contains(&self.actor_ctx.id) {
367                                                    tracing::info!(
368                                                        table_id,
369                                                        upstream_table_name,
370                                                        "CdcBackfill has been dropped due to config change"
371                                                    );
372                                                    for chunk in upstream_chunk_buffer.drain(..) {
373                                                        yield Message::Chunk(mapping_chunk(
374                                                            chunk,
375                                                            &self.output_indices,
376                                                        ));
377                                                    }
378                                                    yield Message::Barrier(barrier);
379                                                    let () = futures::future::pending().await;
380                                                    unreachable!();
381                                                }
382                                            }
383                                            _ => (),
384                                        }
385                                    }
386                                    if is_reset_barrier(&barrier, self.actor_ctx.id) {
387                                        next_reset_barrier = Some(barrier);
388                                        for chunk in upstream_chunk_buffer.drain(..) {
389                                            yield Message::Chunk(mapping_chunk(
390                                                chunk,
391                                                &self.output_indices,
392                                            ));
393                                        }
394                                        continue 'with_cdc_table_snapshot_splits;
395                                    }
396                                    // emit barrier and continue to consume the backfill stream
397                                    yield Message::Barrier(barrier);
398                                }
399                                Message::Chunk(chunk) => {
400                                    // skip empty upstream chunk
401                                    if chunk.cardinality() == 0 {
402                                        continue;
403                                    }
404                                    if let Some(filtered_chunk) = filter_stream_chunk(
405                                        chunk,
406                                        &current_actor_bounds,
407                                        snapshot_split_column_index,
408                                    ) && filtered_chunk.cardinality() > 0
409                                    {
410                                        // Buffer the upstream chunk.
411                                        upstream_chunk_buffer.push(filtered_chunk.compact());
412                                    }
413                                }
414                                Message::Watermark(_) => {
415                                    // Ignore watermark during backfill, like the `CdcBackfillExecutor`.
416                                }
417                            }
418                        }
419                        // Snapshot read
420                        Either::Right(msg) => {
421                            match msg? {
422                                None => {
423                                    tracing::info!(
424                                        table_id,
425                                        split_id = split.split_id,
426                                        "snapshot read stream ends"
427                                    );
428                                    for chunk in upstream_chunk_buffer.drain(..) {
429                                        yield Message::Chunk(mapping_chunk(
430                                            chunk,
431                                            &self.output_indices,
432                                        ));
433                                    }
434                                    // Next split.
435                                    break;
436                                }
437                                Some(chunk) => {
438                                    let chunk_cardinality = chunk.cardinality() as u64;
439                                    row_count = row_count.saturating_add(chunk_cardinality);
440                                    yield Message::Chunk(mapping_chunk(
441                                        chunk,
442                                        &self.output_indices,
443                                    ));
444                                }
445                            }
446                        }
447                    }
448                }
449                // Mark current split backfill as finished. The state will be persisted by next barrier.
450                state_impl
451                    .mutate_state(split.split_id, true, row_count)
452                    .await?;
453            }
454
455            upstream_table_reader.disconnect().await?;
456            tracing::info!(
457                table_id,
458                upstream_table_name,
459                "CdcBackfill has already finished and will forward messages directly to the downstream"
460            );
461
462            // After backfill progress finished
463            // we can forward messages directly to the downstream,
464            // as backfill is finished.
465            #[for_await]
466            for msg in &mut upstream {
467                let Some(msg) = mapping_message(msg?, &self.output_indices) else {
468                    continue;
469                };
470                match msg {
471                    Message::Barrier(barrier) => {
472                        state_impl.commit_state(barrier.epoch).await?;
473                        if is_reset_barrier(&barrier, self.actor_ctx.id) {
474                            next_reset_barrier = Some(barrier);
475                            continue 'with_cdc_table_snapshot_splits;
476                        }
477                        yield Message::Barrier(barrier);
478                    }
479                    Message::Chunk(chunk) => {
480                        if chunk.cardinality() == 0 {
481                            continue;
482                        }
483                        if let Some(filtered_chunk) = filter_stream_chunk(
484                            chunk,
485                            &current_actor_bounds,
486                            snapshot_split_column_index,
487                        ) && filtered_chunk.cardinality() > 0
488                        {
489                            yield Message::Chunk(filtered_chunk);
490                        }
491                    }
492                    msg @ Message::Watermark(_) => {
493                        yield msg;
494                    }
495                }
496            }
497        }
498    }
499}
500
501fn filter_stream_chunk(
502    chunk: StreamChunk,
503    bound: &Option<(OwnedRow, OwnedRow)>,
504    snapshot_split_column_index: usize,
505) -> Option<StreamChunk> {
506    let Some((left, right)) = bound else {
507        return None;
508    };
509    assert_eq!(left.len(), 1, "multiple split columns is not supported yet");
510    assert_eq!(
511        right.len(),
512        1,
513        "multiple split columns is not supported yet"
514    );
515    let left_split_key = left.datum_at(0);
516    let right_split_key = right.datum_at(0);
517    let is_leftmost_bound = is_leftmost_bound(left);
518    let is_rightmost_bound = is_rightmost_bound(right);
519    if is_leftmost_bound && is_rightmost_bound {
520        return Some(chunk);
521    }
522    let mut new_bitmap = BitmapBuilder::with_capacity(chunk.capacity());
523    let (ops, columns, visibility) = chunk.into_inner();
524    for (row_split_key, v) in columns[snapshot_split_column_index]
525        .iter()
526        .zip_eq_fast(visibility.iter())
527    {
528        if !v {
529            new_bitmap.append(false);
530            continue;
531        }
532        let mut is_in_range = true;
533        if !is_leftmost_bound {
534            is_in_range = cmp_datum(
535                row_split_key,
536                left_split_key,
537                OrderType::ascending_nulls_first(),
538            )
539            .is_ge();
540        }
541        if is_in_range && !is_rightmost_bound {
542            is_in_range = cmp_datum(
543                row_split_key,
544                right_split_key,
545                OrderType::ascending_nulls_first(),
546            )
547            .is_lt();
548        }
549        if !is_in_range {
550            tracing::trace!(?row_split_key, ?left_split_key, ?right_split_key, snapshot_split_column_index, data_type = ?columns[snapshot_split_column_index].data_type(), "filter out row")
551        }
552        new_bitmap.append(is_in_range);
553    }
554    Some(StreamChunk::with_visibility(
555        ops,
556        columns,
557        new_bitmap.finish(),
558    ))
559}
560
561fn is_leftmost_bound(row: &OwnedRow) -> bool {
562    row.iter().all(|d| d.is_none())
563}
564
565fn is_rightmost_bound(row: &OwnedRow) -> bool {
566    row.iter().all(|d| d.is_none())
567}
568
569impl<S: StateStore> Execute for ParallelizedCdcBackfillExecutor<S> {
570    fn execute(self: Box<Self>) -> BoxedMessageStream {
571        self.execute_inner().boxed()
572    }
573}
574
575fn extends_current_actor_bound(
576    current: &mut Option<(OwnedRow, OwnedRow)>,
577    split: &CdcTableSnapshotSplit,
578) {
579    if current.is_none() {
580        *current = Some((
581            split.left_bound_inclusive.clone(),
582            split.right_bound_exclusive.clone(),
583        ));
584    } else {
585        current.as_mut().unwrap().1 = split.right_bound_exclusive.clone();
586    }
587}
588
589fn is_reset_barrier(barrier: &Barrier, actor_id: ActorId) -> bool {
590    match barrier.mutation.as_deref() {
591        Some(Mutation::Update(update)) => update
592            .actor_cdc_table_snapshot_splits
593            .contains_key(&actor_id),
594        _ => false,
595    }
596}
597
598fn assert_consecutive_splits(actor_snapshot_splits: &[CdcTableSnapshotSplit]) {
599    for i in 1..actor_snapshot_splits.len() {
600        assert_eq!(
601            actor_snapshot_splits[i].split_id,
602            actor_snapshot_splits[i - 1].split_id + 1,
603            "{:?}",
604            actor_snapshot_splits
605        );
606        assert!(
607            cmp_datum(
608                actor_snapshot_splits[i - 1]
609                    .right_bound_exclusive
610                    .datum_at(0),
611                actor_snapshot_splits[i].right_bound_exclusive.datum_at(0),
612                OrderType::ascending_nulls_last(),
613            )
614            .is_lt()
615        );
616    }
617}
618
619#[cfg(test)]
620mod tests {
621    use risingwave_common::array::StreamChunk;
622    use risingwave_common::row::OwnedRow;
623    use risingwave_common::types::ScalarImpl;
624
625    use crate::executor::backfill::cdc::cdc_backill_v2::filter_stream_chunk;
626
627    #[test]
628    fn test_filter_stream_chunk() {
629        use risingwave_common::array::StreamChunkTestExt;
630        let chunk = StreamChunk::from_pretty(
631            "  I I
632             + 1 6
633             - 2 .
634            U- 3 7
635            U+ 4 .",
636        );
637        let bound = None;
638        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
639        assert!(c.is_none());
640
641        let bound = Some((OwnedRow::new(vec![None]), OwnedRow::new(vec![None])));
642        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
643        assert_eq!(c.unwrap().compact(), chunk);
644
645        let bound = Some((
646            OwnedRow::new(vec![None]),
647            OwnedRow::new(vec![Some(ScalarImpl::Int64(3))]),
648        ));
649        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
650        assert_eq!(
651            c.unwrap().compact(),
652            StreamChunk::from_pretty(
653                "  I I
654             + 1 6
655             - 2 .",
656            )
657        );
658
659        let bound = Some((
660            OwnedRow::new(vec![Some(ScalarImpl::Int64(3))]),
661            OwnedRow::new(vec![None]),
662        ));
663        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
664        assert_eq!(
665            c.unwrap().compact(),
666            StreamChunk::from_pretty(
667                "  I I
668            U- 3 7
669            U+ 4 .",
670            )
671        );
672
673        let bound = Some((
674            OwnedRow::new(vec![Some(ScalarImpl::Int64(2))]),
675            OwnedRow::new(vec![Some(ScalarImpl::Int64(4))]),
676        ));
677        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
678        assert_eq!(
679            c.unwrap().compact(),
680            StreamChunk::from_pretty(
681                "  I I
682             - 2 .
683            U- 3 7",
684            )
685        );
686
687        // Test NULL value.
688        let bound = None;
689        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
690        assert!(c.is_none());
691
692        let bound = Some((OwnedRow::new(vec![None]), OwnedRow::new(vec![None])));
693        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
694        assert_eq!(c.unwrap().compact(), chunk);
695
696        let bound = Some((
697            OwnedRow::new(vec![None]),
698            OwnedRow::new(vec![Some(ScalarImpl::Int64(7))]),
699        ));
700        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
701        assert_eq!(
702            c.unwrap().compact(),
703            StreamChunk::from_pretty(
704                "  I I
705             + 1 6
706             - 2 .
707            U+ 4 .",
708            )
709        );
710
711        let bound = Some((
712            OwnedRow::new(vec![Some(ScalarImpl::Int64(7))]),
713            OwnedRow::new(vec![None]),
714        ));
715        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
716        assert_eq!(
717            c.unwrap().compact(),
718            StreamChunk::from_pretty(
719                "  I I
720            U- 3 7",
721            )
722        );
723    }
724}