risingwave_stream/executor/aggregate/
distinct.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::marker::PhantomData;
17use std::sync::atomic::AtomicU64;
18
19use itertools::Itertools;
20use risingwave_common::array::{ArrayRef, Op};
21use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
22use risingwave_common::row::{self, CompactedRow, RowExt};
23use risingwave_common::util::iter_util::ZipEqFast;
24use risingwave_expr::aggregate::AggCall;
25
26use super::agg_group::GroupKey;
27use crate::cache::ManagedLruCache;
28use crate::common::metrics::MetricsInfo;
29use crate::executor::monitor::AggDistinctDedupMetrics;
30use crate::executor::prelude::*;
31
/// Cache from a compacted distinct key (the group key, if any, followed by the distinct value)
/// to the occurrence counts of that key, one count per agg call that is DISTINCT on the column.
type DedupCache = ManagedLruCache<CompactedRow, Box<[i64]>>;
33
/// Deduplicater for one distinct column.
///
/// For every distinct key (group key + distinct value) it tracks how many times the key has
/// been seen by each agg call that is DISTINCT on this column. The counts are cached in memory
/// and persisted in the column's dedup state table.
struct ColumnDeduplicater<S: StateStore> {
    /// Distinct key -> per-agg-call occurrence counts.
    cache: DedupCache,
    /// Cache access / miss / size metrics of this deduplicater.
    metrics: AggDistinctDedupMetrics,
    /// Ties the struct to the state store type used by its methods' `StateTable<S>` arguments.
    _phantom: PhantomData<S>,
}
40
impl<S: StateStore> ColumnDeduplicater<S> {
    /// Creates a deduplicater for the column at index `distinct_col`.
    ///
    /// `dedup_table` is only read here to label the cache and the dedup metrics with its table
    /// id. The cache is built with `ManagedLruCache::unbounded` and given `watermark_sequence`,
    /// which presumably drives its eviction — TODO confirm against `ManagedLruCache`.
    fn new(
        distinct_col: usize,
        dedup_table: &StateTable<S>,
        watermark_sequence: Arc<AtomicU64>,
        actor_ctx: &ActorContext,
    ) -> Self {
        let metrics_info = MetricsInfo::new(
            actor_ctx.streaming_metrics.clone(),
            dedup_table.table_id(),
            actor_ctx.id,
            format!("distinct dedup column {}", distinct_col),
        );
        let metrics = actor_ctx.streaming_metrics.new_agg_distinct_dedup_metrics(
            dedup_table.table_id(),
            actor_ctx.id,
            actor_ctx.fragment_id,
        );

        Self {
            cache: ManagedLruCache::unbounded(watermark_sequence, metrics_info),
            metrics,
            _phantom: PhantomData,
        }
    }

    /// Deduplicates one chunk for all agg calls that are DISTINCT on this column.
    ///
    /// Rows hidden to every call are skipped. For each other row, the per-call counts of its
    /// distinct key are looked up in the cache. On a miss they are loaded from `dedup_table`, or
    /// an all-zero row is inserted there when the key is absent. Then, for every call to which
    /// the row is visible:
    /// - `Insert` / `UpdateInsert` increments the count. The row stays visible only if the count
    ///   became 1, i.e. this is the first occurrence of the key.
    /// - `Delete` / `UpdateDelete` decrements the count. The row stays visible only if the count
    ///   became 0, i.e. the last occurrence of the key was removed.
    ///
    /// After the whole chunk is processed, every touched key is written back to `dedup_table`.
    /// A key whose counts all dropped to 0 is deleted from both the table and the cache; any
    /// other key is updated from its pre-chunk counts to its new counts.
    ///
    /// # Arguments
    /// - `ops`, `column`: operations of the chunk and values of the distinct column. They are
    ///   iterated in lockstep with `zip_eq_fast`, so they are expected to have equal length.
    /// - `visibilities`: one bitmap per agg call on this column, updated in place so that
    ///   duplicate rows are hidden.
    /// - `dedup_table`: the dedup state table of this column.
    /// - `group_key`: prefix of table rows and cache keys; with `None` the key is the datum alone.
    ///
    /// # Errors
    /// Returns an error if reading a row from `dedup_table` fails.
    async fn dedup(
        &mut self,
        ops: &[Op],
        column: &ArrayRef,
        mut visibilities: Vec<&mut Bitmap>,
        dedup_table: &mut StateTable<S>,
        group_key: Option<&GroupKey>,
    ) -> StreamExecutorResult<()> {
        let n_calls = visibilities.len();

        // datum -> counts before this chunk; also serves as changeset. The group key is fixed
        // for the whole call, so the datum alone identifies the distinct key here.
        let mut prev_counts_map = HashMap::new(); // also serves as changeset

        // inverted masks for visibilities, 1 means hidden, 0 means visible
        let mut vis_masks_inv = (0..visibilities.len())
            .map(|_| BitmapBuilder::zeroed(column.len()))
            .collect_vec();
        for (datum_idx, (op, datum)) in ops.iter().zip_eq_fast(column.iter()).enumerate() {
            // skip if this item is hidden to all agg calls (this is likely to happen)
            if !visibilities.iter().any(|vis| vis.is_set(datum_idx)) {
                continue;
            }

            // get counts of the distinct key of all agg calls that distinct on this column
            let row_prefix = group_key.map(GroupKey::table_row).chain(row::once(datum));
            let table_pk = group_key.map(GroupKey::table_pk).chain(row::once(datum));
            let cache_key =
                CompactedRow::from(group_key.map(GroupKey::cache_key).chain(row::once(datum)));

            self.metrics.agg_distinct_total_cache_count.inc();
            // TODO(yuhao): avoid this `contains`.
            // https://github.com/risingwavelabs/risingwave/issues/9233
            let mut counts = if self.cache.contains(&cache_key) {
                self.cache.get_mut(&cache_key).unwrap()
            } else {
                self.metrics.agg_distinct_cache_miss_count.inc();
                // load from table into the cache; NULL counts in the stored row are read as 0
                let counts = if let Some(counts_row) =
                    dedup_table.get_row(&table_pk).await? as Option<OwnedRow>
                {
                    counts_row
                        .iter()
                        .map(|v| v.map_or(0, ScalarRefImpl::into_int64))
                        .collect()
                } else {
                    // ensure there is a row in the dedup table for this distinct key, so that
                    // the flush below can always `update`/`delete` from a known old row
                    dedup_table.insert(
                        (&row_prefix).chain(row::repeat_n(Some(ScalarImpl::from(0i64)), n_calls)),
                    );
                    vec![0; n_calls].into_boxed_slice()
                };
                self.cache.put(cache_key.clone(), counts); // TODO(rc): can we avoid this clone?

                self.cache.get_mut(&cache_key).unwrap()
            };
            debug_assert_eq!(counts.len(), visibilities.len());

            // snapshot the counts as prev counts when first time seeing this distinct key
            prev_counts_map
                .entry(datum)
                .or_insert_with(|| counts.to_owned());

            match op {
                Op::Insert | Op::UpdateInsert => {
                    // iterate over vis of each distinct agg call, count up for visible datum
                    for (i, vis) in visibilities.iter().enumerate() {
                        if vis.is_set(datum_idx) {
                            counts[i] += 1;
                            if counts[i] > 1 {
                                // duplicate, hide this one
                                vis_masks_inv[i].set(datum_idx, true);
                            }
                        }
                    }
                }
                Op::Delete | Op::UpdateDelete => {
                    // iterate over vis of each distinct agg call, count down for visible datum
                    for (i, vis) in visibilities.iter().enumerate() {
                        if vis.is_set(datum_idx) {
                            counts[i] -= 1;
                            debug_assert!(counts[i] >= 0);
                            if counts[i] > 0 {
                                // still exists at least one duplicate, hide this one
                                vis_masks_inv[i].set(datum_idx, true);
                            }
                        }
                    }
                }
            }
        }

        // flush changes to dedup table: for every touched key, replace the row holding the
        // pre-chunk counts with the current cached counts (or delete it when all are 0)
        prev_counts_map
            .into_iter()
            .for_each(|(datum, prev_counts)| {
                let row_prefix = group_key.map(GroupKey::table_row).chain(row::once(datum));
                let cache_key =
                    CompactedRow::from(group_key.map(GroupKey::cache_key).chain(row::once(datum)));
                let new_counts = OwnedRow::new(
                    self.cache
                        .get(&cache_key)
                        .expect("distinct key in `prev_counts_map` must also exist in `self.cache`")
                        .iter()
                        .map(|&v| Some(v.into()))
                        .collect(),
                );
                let old_counts =
                    OwnedRow::new(prev_counts.iter().map(|&v| Some(v.into())).collect());
                let old_row = row_prefix.chain(old_counts);
                if new_counts
                    .iter()
                    .all(|v| v.map_or(0, ScalarRefImpl::into_int64) == 0)
                {
                    // if new counts all dropped to 0, we need to delete the row from the dedup table
                    dedup_table.delete(old_row);
                    self.cache.remove(&cache_key);
                } else {
                    dedup_table.update(old_row, row_prefix.chain(new_counts));
                }
            });

        for (vis, vis_mask_inv) in visibilities.iter_mut().zip_eq(vis_masks_inv.into_iter()) {
            // update visibility: keep a row only if it was visible and not marked as duplicate
            **vis &= !vis_mask_inv.finish();
        }

        // if we determine to flush to the table when processing every chunk instead of barrier
        // coming, we can evict all including current epoch data.
        self.cache.evict();

        Ok(())
    }

    /// Flush the deduplication table.
    ///
    /// All table writes already happen in `dedup`, so this only evicts the cache and reports the
    /// number of cached entries to the metrics. `_dedup_table` is currently unused.
    fn flush(&mut self, _dedup_table: &StateTable<S>) {
        // TODO(rc): now we flush the table in `dedup` method.
        // WARN: if you want to change to batching the write to table. please remember to change
        // `self.cache.evict()` too.
        self.cache.evict();

        self.metrics
            .agg_distinct_cached_entry_count
            .set(self.cache.len() as i64);
    }
}
211
/// Returns mutable references to the elements of `slice` at the positions listed in `indices`,
/// in the same order as `indices`.
///
/// # Panics
///
/// Panics if any index is out of bounds for `slice` (checked before any pointer arithmetic, so
/// a bad index can never produce an out-of-bounds reference).
///
/// # Safety
///
/// There must not be duplicate items in `indices`; otherwise the returned vector would contain
/// aliasing mutable references to the same element. This is checked in debug builds only.
unsafe fn get_many_mut_from_slice<'a, T>(slice: &'a mut [T], indices: &[usize]) -> Vec<&'a mut T> {
    debug_assert!(
        indices
            .iter()
            .enumerate()
            .all(|(i, idx)| !indices[..i].contains(idx)),
        "duplicate indices passed to `get_many_mut_from_slice`"
    );
    let len = slice.len();
    let ptr = slice.as_mut_ptr();
    let mut res = Vec::with_capacity(indices.len());
    for &idx in indices {
        // `ptr.add` with an out-of-bounds offset is UB, so reject such indices up front.
        assert!(
            idx < len,
            "index {idx} out of bounds for slice of length {len}"
        );
        // SAFETY: `idx < len` was checked above, so the pointer stays inside `slice`; the caller
        // guarantees `indices` has no duplicates, so every reference points to a distinct element
        // and none of them alias for the lifetime `'a` of the exclusive borrow of `slice`.
        res.push(unsafe { &mut *ptr.add(idx) });
    }
    res
}
225
/// Deduplicates the input of all DISTINCT agg calls of one aggregation.
///
/// Agg calls that are DISTINCT on the same column share one column deduplicater (and one dedup
/// state table); non-distinct calls are not tracked at all.
pub struct DistinctDeduplicater<S: StateStore> {
    /// Key: distinct column index;
    /// Value: (agg call indices that distinct on the column, deduplicater for the column).
    deduplicaters: HashMap<usize, (Box<[usize]>, ColumnDeduplicater<S>)>,
}
231
232impl<S: StateStore> DistinctDeduplicater<S> {
233    pub fn new(
234        agg_calls: &[AggCall],
235        watermark_epoch: Arc<AtomicU64>,
236        distinct_dedup_tables: &HashMap<usize, StateTable<S>>,
237        actor_ctx: &ActorContext,
238    ) -> Self {
239        let deduplicaters: HashMap<_, _> = agg_calls
240            .iter()
241            .enumerate()
242            .filter(|(_, call)| call.distinct) // only distinct agg calls need dedup table
243            .into_group_map_by(|(_, call)| call.args.val_indices()[0])
244            .into_iter()
245            .map(|(distinct_col, indices_and_calls)| {
246                let dedup_table = distinct_dedup_tables.get(&distinct_col).unwrap();
247                let call_indices: Box<[_]> = indices_and_calls.into_iter().map(|v| v.0).collect();
248                let deduplicater = ColumnDeduplicater::new(
249                    distinct_col,
250                    dedup_table,
251                    watermark_epoch.clone(),
252                    actor_ctx,
253                );
254                (distinct_col, (call_indices, deduplicater))
255            })
256            .collect();
257        Self { deduplicaters }
258    }
259
260    pub fn dedup_caches_mut(&mut self) -> impl Iterator<Item = &mut DedupCache> {
261        self.deduplicaters
262            .values_mut()
263            .map(|(_, deduplicater)| &mut deduplicater.cache)
264    }
265
266    /// Deduplicate the chunk for each agg call, by returning new visibilities
267    /// that hide duplicate rows.
268    pub async fn dedup_chunk(
269        &mut self,
270        ops: &[Op],
271        columns: &[ArrayRef],
272        mut visibilities: Vec<Bitmap>,
273        dedup_tables: &mut HashMap<usize, StateTable<S>>,
274        group_key: Option<&GroupKey>,
275    ) -> StreamExecutorResult<Vec<Bitmap>> {
276        for (distinct_col, (call_indices, deduplicater)) in &mut self.deduplicaters {
277            let column = &columns[*distinct_col];
278            let dedup_table = dedup_tables.get_mut(distinct_col).unwrap();
279            // Select visibilities (as mutable references) of distinct agg calls that distinct on
280            // `distinct_col` so that `Deduplicater` doesn't need to care about index mapping.
281            // SAFETY: all items in `agg_call_indices` are unique by nature, see `new`.
282            let visibilities = unsafe { get_many_mut_from_slice(&mut visibilities, call_indices) };
283            deduplicater
284                .dedup(ops, column, visibilities, dedup_table, group_key)
285                .await?;
286        }
287        Ok(visibilities)
288    }
289
290    /// Flush dedup state caches to dedup tables.
291    pub fn flush(
292        &mut self,
293        dedup_tables: &mut HashMap<usize, StateTable<S>>,
294    ) -> StreamExecutorResult<()> {
295        for (distinct_col, (_, deduplicater)) in &mut self.deduplicaters {
296            let dedup_table = dedup_tables.get_mut(distinct_col).unwrap();
297            deduplicater.flush(dedup_table);
298        }
299        Ok(())
300    }
301}
302
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::{EpochPair, test_epoch};
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable_with_value_indices;

    /// Builds one dedup state table per distinct column of `agg_calls`.
    ///
    /// Table layout: the group key columns, then the distinct key column (together they form
    /// the ascending primary key), then one `Int64` count column per distinct agg call on that
    /// column.
    async fn infer_dedup_tables<S: StateStore>(
        agg_calls: &[AggCall],
        group_key_types: &[DataType],
        store: S,
    ) -> HashMap<usize, StateTable<S>> {
        // corresponding to `Agg::infer_distinct_dedup_table` in frontend
        let mut dedup_tables = HashMap::new();

        for (distinct_col, indices_and_calls) in agg_calls
            .iter()
            .enumerate()
            .filter(|(_, call)| call.distinct) // only distinct agg calls need dedup table
            .into_group_map_by(|(_, call)| call.args.val_indices()[0])
        {
            let mut columns = vec![];
            let mut order_types = vec![];

            let mut next_column_id = 0;
            let mut add_column_desc = |data_type: DataType| {
                columns.push(ColumnDesc::unnamed(
                    ColumnId::new(next_column_id),
                    data_type,
                ));
                next_column_id += 1;
            };

            // group key columns
            for data_type in group_key_types {
                add_column_desc(data_type.clone());
                order_types.push(OrderType::ascending());
            }

            // distinct key column
            add_column_desc(indices_and_calls[0].1.args.arg_types()[0].clone());
            order_types.push(OrderType::ascending());

            // count columns, one per agg call that distinct on this column
            for (_, _) in indices_and_calls {
                add_column_desc(DataType::Int64);
            }

            let pk_indices = (0..(group_key_types.len() + 1)).collect::<Vec<_>>();
            let value_indices = ((group_key_types.len() + 1)..columns.len()).collect::<Vec<_>>();
            let table_pb = gen_pbtable_with_value_indices(
                TableId::new(2333 + distinct_col as u32),
                columns,
                order_types,
                pk_indices,
                0,
                value_indices,
            );
            let table = StateTable::from_table_catalog(&table_pb, store.clone(), None).await;
            dedup_tables.insert(distinct_col, table);
        }

        dedup_tables
    }

    /// Inserts, updates and deletes without group key, including recovery from the tables.
    #[tokio::test]
    async fn test_distinct_deduplicater() {
        // Schema:
        // a: int, b: int, c: int
        // Agg calls:
        // count(a), count(distinct a), sum(distinct a), count(distinct b)
        // Group keys:
        // empty

        let agg_calls = [
            AggCall::from_pretty("(count:int8 $0:int8)"), // count(a)
            AggCall::from_pretty("(count:int8 $0:int8 distinct)"), // count(distinct a)
            AggCall::from_pretty("(  sum:int8 $0:int8 distinct)"), // sum(distinct a)
            AggCall::from_pretty("(count:int8 $1:int8 distinct)"), // count(distinct b)
        ];

        let store = MemoryStateStore::new();
        let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
        let mut dedup_tables = infer_dedup_tables(&agg_calls, &[], store).await;
        for table in dedup_tables.values_mut() {
            table.init_epoch(epoch).await.unwrap()
        }

        let mut deduplicater = DistinctDeduplicater::new(
            &agg_calls,
            Arc::new(AtomicU64::new(0)),
            &dedup_tables,
            &ActorContext::for_test(0),
        );

        // --- chunk 1 ---

        let chunk = StreamChunk::from_pretty(
            " I   I     I
            + 1  10   100
            + 1  11   101",
        );
        let (ops, columns, visibility) = chunk.into_inner();

        let visibilities = std::iter::repeat_n(visibility, agg_calls.len()).collect_vec();
        let visibilities = deduplicater
            .dedup_chunk(&ops, &columns, visibilities, &mut dedup_tables, None)
            .await
            .unwrap();
        assert_eq!(
            visibilities[0].iter().collect_vec(),
            vec![true, true] // same as original chunk
        );
        assert_eq!(
            visibilities[1].iter().collect_vec(),
            vec![true, false] // distinct on a
        );
        assert_eq!(
            visibilities[2].iter().collect_vec(),
            vec![true, false] // distinct on a, same as above
        );
        assert_eq!(
            visibilities[3].iter().collect_vec(),
            vec![true, true] // distinct on b
        );

        deduplicater.flush(&mut dedup_tables).unwrap();

        epoch.inc_for_test();
        for table in dedup_tables.values_mut() {
            table.commit_for_test(epoch).await.unwrap();
        }

        // --- chunk 2 ---

        let chunk = StreamChunk::from_pretty(
            " I   I     I
            + 1  11  -102
            + 2  12   103  D
            + 2  12  -104",
        );
        let (ops, columns, visibility) = chunk.into_inner();

        let visibilities = std::iter::repeat_n(visibility, agg_calls.len()).collect_vec();
        let visibilities = deduplicater
            .dedup_chunk(&ops, &columns, visibilities, &mut dedup_tables, None)
            .await
            .unwrap();
        assert_eq!(
            visibilities[0].iter().collect_vec(),
            vec![true, false, true] // same as original chunk
        );
        assert_eq!(
            visibilities[1].iter().collect_vec(),
            vec![false, false, true] // distinct on a
        );
        assert_eq!(
            visibilities[2].iter().collect_vec(),
            vec![false, false, true] // distinct on a, same as above
        );
        assert_eq!(
            visibilities[3].iter().collect_vec(),
            vec![false, false, true] // distinct on b
        );

        deduplicater.flush(&mut dedup_tables).unwrap();

        epoch.inc_for_test();
        for table in dedup_tables.values_mut() {
            table.commit_for_test(epoch).await.unwrap();
        }

        drop(deduplicater);

        // test recovery: a fresh deduplicater starts with empty caches and must reload the
        // counts persisted in the dedup tables by the previous chunks
        let mut deduplicater = DistinctDeduplicater::new(
            &agg_calls,
            Arc::new(AtomicU64::new(0)),
            &dedup_tables,
            &ActorContext::for_test(0),
        );

        // --- chunk 3 ---

        let chunk = StreamChunk::from_pretty(
            " I   I     I
            - 1  10   100  D
            - 1  11   101
            - 1  11  -102",
        );
        let (ops, columns, visibility) = chunk.into_inner();

        let visibilities = std::iter::repeat_n(visibility, agg_calls.len()).collect_vec();
        let visibilities = deduplicater
            .dedup_chunk(&ops, &columns, visibilities, &mut dedup_tables, None)
            .await
            .unwrap();
        assert_eq!(
            visibilities[0].iter().collect_vec(),
            vec![false, true, true] // same as original chunk
        );
        assert_eq!(
            visibilities[1].iter().collect_vec(),
            // distinct on a
            vec![
                false, // hidden in original chunk
                false, // not the last one
                false, // not the last one
            ]
        );
        assert_eq!(
            visibilities[2].iter().collect_vec(),
            // distinct on a, same as above
            vec![
                false, // hidden in original chunk
                false, // not the last one
                false, // not the last one
            ]
        );
        assert_eq!(
            visibilities[3].iter().collect_vec(),
            // distinct on b
            vec![
                false, // hidden in original chunk
                false, // not the last one
                true,  // is the last one
            ]
        );

        deduplicater.flush(&mut dedup_tables).unwrap();

        epoch.inc_for_test();
        for table in dedup_tables.values_mut() {
            table.commit_for_test(epoch).await.unwrap();
        }
    }

    /// Rows whose counts all drop to 0 must be removed from the dedup table.
    #[tokio::test]
    async fn test_distinct_deduplicater_delete() {
        // Schema:
        // a: int, b: int, c: int
        // Agg calls:
        // count(distinct a)
        // Group keys:
        // empty

        let agg_calls = [
            AggCall::from_pretty("(count:int8 $0:int8 distinct)"), // count(distinct a)
        ];

        let store = MemoryStateStore::new();
        let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
        let mut dedup_tables = infer_dedup_tables(&agg_calls, &[], store).await;
        for table in dedup_tables.values_mut() {
            table.init_epoch(epoch).await.unwrap()
        }

        let mut deduplicater = DistinctDeduplicater::new(
            &agg_calls,
            Arc::new(AtomicU64::new(0)),
            &dedup_tables,
            &ActorContext::for_test(0),
        );

        // --- chunk 1 ---

        let chunk = StreamChunk::from_pretty(
            " I   I     I
            + 1  10   100
            - 1  10   100
            + 2  21   201
            + 2  22   202",
        );
        let (ops, columns, visibility) = chunk.into_inner();

        let visibilities = std::iter::repeat_n(visibility, agg_calls.len()).collect_vec();
        let visibilities = deduplicater
            .dedup_chunk(&ops, &columns, visibilities, &mut dedup_tables, None)
            .await
            .unwrap();
        // `a = 1`: first insert and last delete are both visible; `a = 2`: only the first insert
        assert_eq!(
            visibilities[0].iter().collect_vec(),
            vec![true, true, true, false]
        );

        deduplicater.flush(&mut dedup_tables).unwrap();

        epoch.inc_for_test();
        for table in dedup_tables.values_mut() {
            table.commit_for_test(epoch).await.unwrap();
        }

        // the `a = 1` row should be deleted because all counts dropped to 0
        let counts = dedup_tables[&0]
            .get_row(OwnedRow::new(vec![Some(1i64.into())]))
            .await
            .unwrap();
        assert!(counts.is_none());

        let counts = dedup_tables[&0]
            .get_row(OwnedRow::new(vec![Some(2i64.into())]))
            .await
            .unwrap();
        assert_eq!(
            counts.unwrap().iter().collect_vec(),
            vec![Some(2i64.into())] // there are 2 rows with `a = 2`
        );

        // --- chunk 2 ---

        let chunk = StreamChunk::from_pretty(
            " I   I     I
            - 2  21   201
            - 2  22   202",
        );
        let (ops, columns, visibility) = chunk.into_inner();

        let visibilities = std::iter::repeat_n(visibility, agg_calls.len()).collect_vec();
        let visibilities = deduplicater
            .dedup_chunk(&ops, &columns, visibilities, &mut dedup_tables, None)
            .await
            .unwrap();
        assert_eq!(visibilities[0].iter().collect_vec(), vec![false, true]);
        // Note that the `true` row here (2, 22, 202) is not the `true` row in the previous chunk (2, 21, 201),
        // but this is not a problem because we only care about the distinct column.

        deduplicater.flush(&mut dedup_tables).unwrap();

        epoch.inc_for_test();
        for table in dedup_tables.values_mut() {
            table.commit_for_test(epoch).await.unwrap();
        }

        let counts = dedup_tables[&0]
            .get_row(OwnedRow::new(vec![Some(2i64.into())]))
            .await
            .unwrap();
        assert!(counts.is_none());
    }

    /// Same scenarios with a group key, which prefixes every dedup table row and cache key.
    #[tokio::test]
    async fn test_distinct_deduplicater_with_group() {
        // Schema:
        // a: int, b: int, c: int
        // Agg calls:
        // count(a), count(distinct a), count(distinct b)
        // Group keys:
        // c

        let agg_calls = [
            AggCall::from_pretty("(count:int8 $0:int8)"), // count(a)
            AggCall::from_pretty("(count:int8 $0:int8 distinct)"), // count(distinct a)
            AggCall::from_pretty("(count:int8 $1:int8 distinct)"), // count(distinct b)
        ];

        let group_key_types = [DataType::Int64];
        let group_key = GroupKey::new(OwnedRow::new(vec![Some(100.into())]), None);

        let store = MemoryStateStore::new();
        let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
        let mut dedup_tables = infer_dedup_tables(&agg_calls, &group_key_types, store).await;
        for table in dedup_tables.values_mut() {
            table.init_epoch(epoch).await.unwrap()
        }

        let mut deduplicater = DistinctDeduplicater::new(
            &agg_calls,
            Arc::new(AtomicU64::new(0)),
            &dedup_tables,
            &ActorContext::for_test(0),
        );

        let chunk = StreamChunk::from_pretty(
            " I   I     I
            + 1  10   100
            + 1  11   100
            + 1  11   100
            + 2  12   200  D
            + 2  12   100",
        );
        let (ops, columns, visibility) = chunk.into_inner();

        let visibilities = std::iter::repeat_n(visibility, agg_calls.len()).collect_vec();
        let visibilities = deduplicater
            .dedup_chunk(
                &ops,
                &columns,
                visibilities,
                &mut dedup_tables,
                Some(&group_key),
            )
            .await
            .unwrap();
        assert_eq!(
            visibilities[0].iter().collect_vec(),
            vec![true, true, true, false, true] // same as original chunk
        );
        assert_eq!(
            visibilities[1].iter().collect_vec(),
            vec![true, false, false, false, true] // distinct on a
        );
        assert_eq!(
            visibilities[2].iter().collect_vec(),
            vec![true, true, false, false, true] // distinct on b
        );

        deduplicater.flush(&mut dedup_tables).unwrap();

        epoch.inc_for_test();
        for table in dedup_tables.values_mut() {
            table.commit_for_test(epoch).await.unwrap();
        }

        let chunk = StreamChunk::from_pretty(
            " I   I     I
            - 1  10   100  D
            - 1  11   100
            - 1  11   100",
        );
        let (ops, columns, visibility) = chunk.into_inner();

        let visibilities = std::iter::repeat_n(visibility, agg_calls.len()).collect_vec();
        let visibilities = deduplicater
            .dedup_chunk(
                &ops,
                &columns,
                visibilities,
                &mut dedup_tables,
                Some(&group_key),
            )
            .await
            .unwrap();
        assert_eq!(
            visibilities[0].iter().collect_vec(),
            vec![false, true, true] // same as original chunk
        );
        assert_eq!(
            visibilities[1].iter().collect_vec(),
            // distinct on a
            vec![
                false, // hidden in original chunk
                false, // not the last one
                false, // not the last one
            ]
        );
        assert_eq!(
            visibilities[2].iter().collect_vec(),
            // distinct on b
            vec![
                false, // hidden in original chunk
                false, // not the last one
                true,  // is the last one
            ]
        );

        deduplicater.flush(&mut dedup_tables).unwrap();

        epoch.inc_for_test();
        for table in dedup_tables.values_mut() {
            table.commit_for_test(epoch).await.unwrap();
        }
    }
}