1use std::collections::HashMap;
16use std::marker::PhantomData;
17use std::sync::atomic::AtomicU64;
18
19use itertools::Itertools;
20use risingwave_common::array::{ArrayRef, Op};
21use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
22use risingwave_common::row::{self, CompactedRow, RowExt};
23use risingwave_common::util::iter_util::ZipEqFast;
24use risingwave_expr::aggregate::AggCall;
25
26use super::agg_group::GroupKey;
27use crate::cache::ManagedLruCache;
28use crate::common::metrics::MetricsInfo;
29use crate::executor::monitor::AggDistinctDedupMetrics;
30use crate::executor::prelude::*;
31
/// LRU cache from a compacted `(group key, distinct value)` row to the per-agg-call counts of
/// that distinct value (one `i64` per distinct agg call on the column).
type DedupCache = ManagedLruCache<CompactedRow, Box<[i64]>>;
33
/// Deduplicates a single input column on behalf of every distinct agg call that distincts on it.
///
/// For each distinct key it keeps one count per such agg call, cached in `cache` and persisted in
/// the column's dedup state table.
struct ColumnDeduplicater<S: StateStore> {
    /// Cached per-agg-call counts of each distinct key.
    cache: DedupCache,
    /// Cache access / miss counters and the cached-entry gauge.
    metrics: AggDistinctDedupMetrics,
    /// Ties the state store type parameter to the struct without storing a value of it.
    _phantom: PhantomData<S>,
}
40
impl<S: StateStore> ColumnDeduplicater<S> {
    /// Creates a deduplicater for column `distinct_col`.
    ///
    /// The cache is an unbounded `ManagedLruCache` that receives `watermark_sequence`
    /// (presumably consulted by its eviction policy — see `ManagedLruCache`). Metrics are
    /// labelled with the dedup table's id and the actor's id/fragment id.
    fn new(
        distinct_col: usize,
        dedup_table: &StateTable<S>,
        watermark_sequence: Arc<AtomicU64>,
        actor_ctx: &ActorContext,
    ) -> Self {
        let metrics_info = MetricsInfo::new(
            actor_ctx.streaming_metrics.clone(),
            dedup_table.table_id(),
            actor_ctx.id,
            format!("distinct dedup column {}", distinct_col),
        );
        let metrics = actor_ctx.streaming_metrics.new_agg_distinct_dedup_metrics(
            dedup_table.table_id(),
            actor_ctx.id,
            actor_ctx.fragment_id,
        );

        Self {
            cache: ManagedLruCache::unbounded(watermark_sequence, metrics_info),
            metrics,
            _phantom: PhantomData,
        }
    }

    /// Deduplicates `column` for every agg call that distincts on it, in place.
    ///
    /// `visibilities` holds one bitmap per such agg call; position `i` corresponds to count `i`
    /// of each distinct key. For every row visible to at least one call:
    /// - an insert op increments the key's count for each call that sees the row, and the row
    ///   stays visible to that call only if the count is now at most 1 (first occurrence);
    /// - a delete op decrements the count, and the row stays visible only if the count is now at
    ///   most 0 (last occurrence removed).
    ///
    /// Every other visible row is cleared from that call's bitmap.
    ///
    /// Counts are read through the cache, falling back to `dedup_table` on a miss. Every key
    /// touched by the chunk is written back to `dedup_table` at the end. A key whose counts are
    /// all zero has its row deleted and its cache entry removed.
    ///
    /// # Errors
    ///
    /// Returns an error if reading a row from `dedup_table` fails.
    async fn dedup(
        &mut self,
        ops: &[Op],
        column: &ArrayRef,
        mut visibilities: Vec<&mut Bitmap>,
        dedup_table: &mut StateTable<S>,
        group_key: Option<&GroupKey>,
    ) -> StreamExecutorResult<()> {
        let n_calls = visibilities.len();

        // Counts of each distinct key as they were before this chunk. The keys of this map are
        // also the set of table rows that must be written back at the end.
        let mut prev_counts_map = HashMap::new();
        // Inverted visibility masks, one per agg call: a set bit means "hide this row".
        let mut vis_masks_inv = (0..visibilities.len())
            .map(|_| BitmapBuilder::zeroed(column.len()))
            .collect_vec();
        for (datum_idx, (op, datum)) in ops.iter().zip_eq_fast(column.iter()).enumerate() {
            // Skip rows that no agg call can see.
            if !visibilities.iter().any(|vis| vis.is_set(datum_idx)) {
                continue;
            }

            // For this distinct key: the non-count prefix of its table row, its table primary
            // key, and its cache key.
            let row_prefix = group_key.map(GroupKey::table_row).chain(row::once(datum));
            let table_pk = group_key.map(GroupKey::table_pk).chain(row::once(datum));
            let cache_key =
                CompactedRow::from(group_key.map(GroupKey::cache_key).chain(row::once(datum)));

            self.metrics.agg_distinct_total_cache_count.inc();
            // `contains` followed by `get_mut` costs two lookups.
            // NOTE(review): presumably kept because `if let Some(..) = get_mut(..)` with a `put`
            // in the else-branch is rejected by the borrow checker — confirm before refactoring.
            let mut counts = if self.cache.contains(&cache_key) {
                self.cache.get_mut(&cache_key).unwrap()
            } else {
                self.metrics.agg_distinct_cache_miss_count.inc();
                // Cache miss: load the counts from the dedup table.
                let counts = if let Some(counts_row) =
                    dedup_table.get_row(&table_pk).await? as Option<OwnedRow>
                {
                    counts_row
                        .iter()
                        .map(|v| v.map_or(0, ScalarRefImpl::into_int64))
                        .collect()
                } else {
                    // Key not in the table yet: insert an all-zero row. The zeros also become
                    // this key's "previous counts", so the `update`/`delete` issued at the end
                    // of this call targets exactly this row.
                    dedup_table.insert(
                        (&row_prefix).chain(row::repeat_n(Some(ScalarImpl::from(0i64)), n_calls)),
                    );
                    vec![0; n_calls].into_boxed_slice()
                };
                self.cache.put(cache_key.clone(), counts);
                self.cache.get_mut(&cache_key).unwrap()
            };
            debug_assert_eq!(counts.len(), visibilities.len());

            // Snapshot the counts the first time this distinct key is seen in the chunk.
            prev_counts_map
                .entry(datum)
                .or_insert_with(|| counts.to_owned());

            match op {
                Op::Insert | Op::UpdateInsert => {
                    for (i, vis) in visibilities.iter().enumerate() {
                        if vis.is_set(datum_idx) {
                            counts[i] += 1;
                            if counts[i] > 1 {
                                // Value already present for this call: hide the duplicate.
                                vis_masks_inv[i].set(datum_idx, true);
                            }
                        }
                    }
                }
                Op::Delete | Op::UpdateDelete => {
                    for (i, vis) in visibilities.iter().enumerate() {
                        if vis.is_set(datum_idx) {
                            counts[i] -= 1;
                            debug_assert!(counts[i] >= 0);
                            if counts[i] > 0 {
                                // Other occurrences remain for this call: hide this removal.
                                vis_masks_inv[i].set(datum_idx, true);
                            }
                        }
                    }
                }
            }
        }

        // Write back every distinct key touched by this chunk, using the snapshot as the old row.
        prev_counts_map
            .into_iter()
            .for_each(|(datum, prev_counts)| {
                let row_prefix = group_key.map(GroupKey::table_row).chain(row::once(datum));
                let cache_key =
                    CompactedRow::from(group_key.map(GroupKey::cache_key).chain(row::once(datum)));
                let new_counts = OwnedRow::new(
                    self.cache
                        .get(&cache_key)
                        .expect("distinct key in `prev_counts_map` must also exist in `self.cache`")
                        .iter()
                        .map(|&v| Some(v.into()))
                        .collect(),
                );
                let old_counts =
                    OwnedRow::new(prev_counts.iter().map(|&v| Some(v.into())).collect());
                let old_row = row_prefix.chain(old_counts);
                if new_counts
                    .iter()
                    .all(|v| v.map_or(0, ScalarRefImpl::into_int64) == 0)
                {
                    // No agg call sees this value any more: drop the row and the cache entry.
                    dedup_table.delete(old_row);
                    self.cache.remove(&cache_key);
                } else {
                    dedup_table.update(old_row, row_prefix.chain(new_counts));
                }
            });

        // Apply the inverted masks: keep only the rows that were not marked as hidden.
        for (vis, vis_mask_inv) in visibilities.iter_mut().zip_eq(vis_masks_inv.into_iter()) {
            **vis &= !vis_mask_inv.finish();
        }

        self.cache.evict();

        Ok(())
    }

    /// Evicts the cache and publishes the number of cached entries to the metrics.
    ///
    /// `_dedup_table` is not used by this method.
    fn flush(&mut self, _dedup_table: &StateTable<S>) {
        self.cache.evict();

        self.metrics
            .agg_distinct_cached_entry_count
            .set(self.cache.len() as i64);
    }
}
211
/// Returns mutable references to the elements of `slice` at `indices`, in the order of
/// `indices`.
///
/// # Safety
///
/// No caller-upheld invariant is required any more: every index is bounds-checked, and each
/// element can be handed out at most once, so the returned references never alias. The function
/// keeps its `unsafe` signature only so that existing call sites remain source-compatible.
///
/// # Panics
///
/// Panics if an index is out of bounds of `slice`, or if `indices` contains the same index more
/// than once.
unsafe fn get_many_mut_from_slice<'a, T>(slice: &'a mut [T], indices: &[usize]) -> Vec<&'a mut T> {
    let len = slice.len();
    // One slot per element. Taking a slot's reference leaves `None` behind, so a repeated index
    // is detected instead of producing two aliasing `&mut T`.
    let mut slots: Vec<Option<&'a mut T>> = slice.iter_mut().map(Some).collect();
    indices
        .iter()
        .map(|&idx| {
            slots
                .get_mut(idx)
                .unwrap_or_else(|| panic!("index {idx} out of bounds for slice of length {len}"))
                .take()
                .unwrap_or_else(|| panic!("index {idx} requested more than once"))
        })
        .collect()
}
225
/// Deduplicates the input of all distinct agg calls of an aggregation, with one
/// `ColumnDeduplicater` per column that some distinct agg call distincts on.
pub struct DistinctDeduplicater<S: StateStore> {
    /// Distinct column index -> (indices into the agg call list of the calls that distinct on
    /// that column, the deduplicater for that column).
    deduplicaters: HashMap<usize, (Box<[usize]>, ColumnDeduplicater<S>)>,
}
231
232impl<S: StateStore> DistinctDeduplicater<S> {
233 pub fn new(
234 agg_calls: &[AggCall],
235 watermark_epoch: Arc<AtomicU64>,
236 distinct_dedup_tables: &HashMap<usize, StateTable<S>>,
237 actor_ctx: &ActorContext,
238 ) -> Self {
239 let deduplicaters: HashMap<_, _> = agg_calls
240 .iter()
241 .enumerate()
242 .filter(|(_, call)| call.distinct) .into_group_map_by(|(_, call)| call.args.val_indices()[0])
244 .into_iter()
245 .map(|(distinct_col, indices_and_calls)| {
246 let dedup_table = distinct_dedup_tables.get(&distinct_col).unwrap();
247 let call_indices: Box<[_]> = indices_and_calls.into_iter().map(|v| v.0).collect();
248 let deduplicater = ColumnDeduplicater::new(
249 distinct_col,
250 dedup_table,
251 watermark_epoch.clone(),
252 actor_ctx,
253 );
254 (distinct_col, (call_indices, deduplicater))
255 })
256 .collect();
257 Self { deduplicaters }
258 }
259
260 pub fn dedup_caches_mut(&mut self) -> impl Iterator<Item = &mut DedupCache> {
261 self.deduplicaters
262 .values_mut()
263 .map(|(_, deduplicater)| &mut deduplicater.cache)
264 }
265
266 pub async fn dedup_chunk(
269 &mut self,
270 ops: &[Op],
271 columns: &[ArrayRef],
272 mut visibilities: Vec<Bitmap>,
273 dedup_tables: &mut HashMap<usize, StateTable<S>>,
274 group_key: Option<&GroupKey>,
275 ) -> StreamExecutorResult<Vec<Bitmap>> {
276 for (distinct_col, (call_indices, deduplicater)) in &mut self.deduplicaters {
277 let column = &columns[*distinct_col];
278 let dedup_table = dedup_tables.get_mut(distinct_col).unwrap();
279 let visibilities = unsafe { get_many_mut_from_slice(&mut visibilities, call_indices) };
283 deduplicater
284 .dedup(ops, column, visibilities, dedup_table, group_key)
285 .await?;
286 }
287 Ok(visibilities)
288 }
289
290 pub fn flush(
292 &mut self,
293 dedup_tables: &mut HashMap<usize, StateTable<S>>,
294 ) -> StreamExecutorResult<()> {
295 for (distinct_col, (_, deduplicater)) in &mut self.deduplicaters {
296 let dedup_table = dedup_tables.get_mut(distinct_col).unwrap();
297 deduplicater.flush(dedup_table);
298 }
299 Ok(())
300 }
301}
302
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::{EpochPair, test_epoch};
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable_with_value_indices;

    /// Builds one dedup state table per distinct column referenced by `agg_calls`.
    ///
    /// Each table's columns are, in order:
    /// - the group key columns;
    /// - the distinct value;
    /// - one `Int64` count column per distinct agg call on that column.
    ///
    /// The group key columns plus the distinct value form the (ascending) primary key. The count
    /// columns are the value columns.
    async fn infer_dedup_tables<S: StateStore>(
        agg_calls: &[AggCall],
        group_key_types: &[DataType],
        store: S,
    ) -> HashMap<usize, StateTable<S>> {
        let mut dedup_tables = HashMap::new();

        for (distinct_col, indices_and_calls) in agg_calls
            .iter()
            .enumerate()
            .filter(|(_, call)| call.distinct)
            .into_group_map_by(|(_, call)| call.args.val_indices()[0])
        {
            let mut columns = vec![];
            let mut order_types = vec![];

            let mut next_column_id = 0;
            let mut add_column_desc = |data_type: DataType| {
                columns.push(ColumnDesc::unnamed(
                    ColumnId::new(next_column_id),
                    data_type,
                ));
                next_column_id += 1;
            };

            // Group key columns (part of the primary key).
            for data_type in group_key_types {
                add_column_desc(data_type.clone());
                order_types.push(OrderType::ascending());
            }

            // The distinct value column (last primary key column).
            add_column_desc(indices_and_calls[0].1.args.arg_types()[0].clone());
            order_types.push(OrderType::ascending());

            // One count column per distinct agg call on this column.
            for (_, _) in indices_and_calls {
                add_column_desc(DataType::Int64);
            }

            let pk_indices = (0..(group_key_types.len() + 1)).collect::<Vec<_>>();
            let value_indices = ((group_key_types.len() + 1)..columns.len()).collect::<Vec<_>>();
            let table_pb = gen_pbtable_with_value_indices(
                TableId::new(2333 + distinct_col as u32),
                columns,
                order_types,
                pk_indices,
                0,
                value_indices,
            );
            let table = StateTable::from_table_catalog(&table_pb, store.clone(), None).await;
            dedup_tables.insert(distinct_col, table);
        }

        dedup_tables
    }

    /// Inserts, then inserts again, then deletes with a freshly created deduplicater, checking
    /// the per-call visibilities each time.
    #[tokio::test]
    async fn test_distinct_deduplicater() {
        let agg_calls = [
            AggCall::from_pretty("(count:int8 $0:int8)"), // non-distinct
            AggCall::from_pretty("(count:int8 $0:int8 distinct)"), // distinct on $0
            AggCall::from_pretty("( sum:int8 $0:int8 distinct)"), // distinct on $0
            AggCall::from_pretty("(count:int8 $1:int8 distinct)"), // distinct on $1
        ];

        let store = MemoryStateStore::new();
        let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
        let mut dedup_tables = infer_dedup_tables(&agg_calls, &[], store).await;
        for table in dedup_tables.values_mut() {
            table.init_epoch(epoch).await.unwrap()
        }

        let mut deduplicater = DistinctDeduplicater::new(
            &agg_calls,
            Arc::new(AtomicU64::new(0)),
            &dedup_tables,
            &ActorContext::for_test(0),
        );

        // First chunk: $0 = 1 appears twice; $1 values are distinct.
        let chunk = StreamChunk::from_pretty(
            " I I I
 + 1 10 100
 + 1 11 101",
        );
        let (ops, columns, visibility) = chunk.into_inner();

        let visibilities = std::iter::repeat_n(visibility, agg_calls.len()).collect_vec();
        let visibilities = deduplicater
            .dedup_chunk(&ops, &columns, visibilities, &mut dedup_tables, None)
            .await
            .unwrap();
        assert_eq!(
            visibilities[0].iter().collect_vec(),
            vec![true, true] // non-distinct: unchanged
        );
        assert_eq!(
            visibilities[1].iter().collect_vec(),
            vec![true, false] // second 1 is a duplicate
        );
        assert_eq!(
            visibilities[2].iter().collect_vec(),
            vec![true, false] // second 1 is a duplicate
        );
        assert_eq!(
            visibilities[3].iter().collect_vec(),
            vec![true, true] // 10 and 11 are both new
        );

        deduplicater.flush(&mut dedup_tables).unwrap();

        epoch.inc_for_test();
        for table in dedup_tables.values_mut() {
            table.commit_for_test(epoch).await.unwrap();
        }

        // Second chunk: the middle row is invisible in the chunk itself.
        let chunk = StreamChunk::from_pretty(
            " I I I
 + 1 11 -102
 + 2 12 103 D
 + 2 12 -104",
        );
        let (ops, columns, visibility) = chunk.into_inner();

        let visibilities = std::iter::repeat_n(visibility, agg_calls.len()).collect_vec();
        let visibilities = deduplicater
            .dedup_chunk(&ops, &columns, visibilities, &mut dedup_tables, None)
            .await
            .unwrap();
        assert_eq!(
            visibilities[0].iter().collect_vec(),
            vec![true, false, true] // non-distinct: chunk visibility only
        );
        assert_eq!(
            visibilities[1].iter().collect_vec(),
            vec![false, false, true] // 1 already seen; 2 is new
        );
        assert_eq!(
            visibilities[2].iter().collect_vec(),
            vec![false, false, true] // 1 already seen; 2 is new
        );
        assert_eq!(
            visibilities[3].iter().collect_vec(),
            vec![false, false, true] // 11 already seen; 12 is new
        );

        deduplicater.flush(&mut dedup_tables).unwrap();

        epoch.inc_for_test();
        for table in dedup_tables.values_mut() {
            table.commit_for_test(epoch).await.unwrap();
        }

        drop(deduplicater);

        // A new deduplicater starts with an empty cache, so counts must be read back from the
        // dedup tables.
        let mut deduplicater = DistinctDeduplicater::new(
            &agg_calls,
            Arc::new(AtomicU64::new(0)),
            &dedup_tables,
            &ActorContext::for_test(0),
        );

        // Third chunk: deletes.
        let chunk = StreamChunk::from_pretty(
            " I I I
 - 1 10 100 D
 - 1 11 101
 - 1 11 -102",
        );
        let (ops, columns, visibility) = chunk.into_inner();

        let visibilities = std::iter::repeat_n(visibility, agg_calls.len()).collect_vec();
        let visibilities = deduplicater
            .dedup_chunk(&ops, &columns, visibilities, &mut dedup_tables, None)
            .await
            .unwrap();
        assert_eq!(
            visibilities[0].iter().collect_vec(),
            vec![false, true, true] // non-distinct: chunk visibility only
        );
        assert_eq!(
            visibilities[1].iter().collect_vec(),
            vec![
                false, // invisible in the chunk
                false, // 1 still has other occurrences
                false, // 1 still has other occurrences
            ]
        );
        assert_eq!(
            visibilities[2].iter().collect_vec(),
            vec![
                false, // invisible in the chunk
                false, // 1 still has other occurrences
                false, // 1 still has other occurrences
            ]
        );
        assert_eq!(
            visibilities[3].iter().collect_vec(),
            vec![
                false, // invisible in the chunk
                false, // 11 still has one occurrence left
                true,  // last occurrence of 11 removed
            ]
        );

        deduplicater.flush(&mut dedup_tables).unwrap();

        epoch.inc_for_test();
        for table in dedup_tables.values_mut() {
            table.commit_for_test(epoch).await.unwrap();
        }
    }

    /// Checks visibilities and that rows whose counts drop to zero are deleted from the table.
    #[tokio::test]
    async fn test_distinct_deduplicater_delete() {
        let agg_calls = [
            AggCall::from_pretty("(count:int8 $0:int8 distinct)"), // distinct on $0
        ];

        let store = MemoryStateStore::new();
        let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
        let mut dedup_tables = infer_dedup_tables(&agg_calls, &[], store).await;
        for table in dedup_tables.values_mut() {
            table.init_epoch(epoch).await.unwrap()
        }

        let mut deduplicater = DistinctDeduplicater::new(
            &agg_calls,
            Arc::new(AtomicU64::new(0)),
            &dedup_tables,
            &ActorContext::for_test(0),
        );

        // 1 is inserted then deleted within the chunk; 2 is inserted twice.
        let chunk = StreamChunk::from_pretty(
            " I I I
 + 1 10 100
 - 1 10 100
 + 2 21 201
 + 2 22 202",
        );
        let (ops, columns, visibility) = chunk.into_inner();

        let visibilities = std::iter::repeat_n(visibility, agg_calls.len()).collect_vec();
        let visibilities = deduplicater
            .dedup_chunk(&ops, &columns, visibilities, &mut dedup_tables, None)
            .await
            .unwrap();
        assert_eq!(
            visibilities[0].iter().collect_vec(),
            vec![true, true, true, false]
        );

        deduplicater.flush(&mut dedup_tables).unwrap();

        epoch.inc_for_test();
        for table in dedup_tables.values_mut() {
            table.commit_for_test(epoch).await.unwrap();
        }

        // The count of 1 dropped back to zero, so its row must not exist.
        let counts = dedup_tables[&0]
            .get_row(OwnedRow::new(vec![Some(1i64.into())]))
            .await
            .unwrap();
        assert!(counts.is_none());

        let counts = dedup_tables[&0]
            .get_row(OwnedRow::new(vec![Some(2i64.into())]))
            .await
            .unwrap();
        assert_eq!(
            counts.unwrap().iter().collect_vec(),
            vec![Some(2i64.into())] // 2 was inserted twice
        );

        // Remove both occurrences of 2.
        let chunk = StreamChunk::from_pretty(
            " I I I
 - 2 21 201
 - 2 22 202",
        );
        let (ops, columns, visibility) = chunk.into_inner();

        let visibilities = std::iter::repeat_n(visibility, agg_calls.len()).collect_vec();
        let visibilities = deduplicater
            .dedup_chunk(&ops, &columns, visibilities, &mut dedup_tables, None)
            .await
            .unwrap();
        assert_eq!(visibilities[0].iter().collect_vec(), vec![false, true]);
        deduplicater.flush(&mut dedup_tables).unwrap();

        epoch.inc_for_test();
        for table in dedup_tables.values_mut() {
            table.commit_for_test(epoch).await.unwrap();
        }

        let counts = dedup_tables[&0]
            .get_row(OwnedRow::new(vec![Some(2i64.into())]))
            .await
            .unwrap();
        assert!(counts.is_none());
    }

    /// Same as `test_distinct_deduplicater`, but with a group key prefixed to the dedup rows.
    #[tokio::test]
    async fn test_distinct_deduplicater_with_group() {
        let agg_calls = [
            AggCall::from_pretty("(count:int8 $0:int8)"), // non-distinct
            AggCall::from_pretty("(count:int8 $0:int8 distinct)"), // distinct on $0
            AggCall::from_pretty("(count:int8 $1:int8 distinct)"), // distinct on $1
        ];

        let group_key_types = [DataType::Int64];
        let group_key = GroupKey::new(OwnedRow::new(vec![Some(100.into())]), None);

        let store = MemoryStateStore::new();
        let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
        let mut dedup_tables = infer_dedup_tables(&agg_calls, &group_key_types, store).await;
        for table in dedup_tables.values_mut() {
            table.init_epoch(epoch).await.unwrap()
        }

        let mut deduplicater = DistinctDeduplicater::new(
            &agg_calls,
            Arc::new(AtomicU64::new(0)),
            &dedup_tables,
            &ActorContext::for_test(0),
        );

        let chunk = StreamChunk::from_pretty(
            " I I I
 + 1 10 100
 + 1 11 100
 + 1 11 100
 + 2 12 200 D
 + 2 12 100",
        );
        let (ops, columns, visibility) = chunk.into_inner();

        let visibilities = std::iter::repeat_n(visibility, agg_calls.len()).collect_vec();
        let visibilities = deduplicater
            .dedup_chunk(
                &ops,
                &columns,
                visibilities,
                &mut dedup_tables,
                Some(&group_key),
            )
            .await
            .unwrap();
        assert_eq!(
            visibilities[0].iter().collect_vec(),
            vec![true, true, true, false, true] // non-distinct: chunk visibility only
        );
        assert_eq!(
            visibilities[1].iter().collect_vec(),
            vec![true, false, false, false, true] // repeated 1s hidden; 2 is new
        );
        assert_eq!(
            visibilities[2].iter().collect_vec(),
            vec![true, true, false, false, true] // repeated 11 hidden; 12 is new
        );

        deduplicater.flush(&mut dedup_tables).unwrap();

        epoch.inc_for_test();
        for table in dedup_tables.values_mut() {
            table.commit_for_test(epoch).await.unwrap();
        }

        let chunk = StreamChunk::from_pretty(
            " I I I
 - 1 10 100 D
 - 1 11 100
 - 1 11 100",
        );
        let (ops, columns, visibility) = chunk.into_inner();

        let visibilities = std::iter::repeat_n(visibility, agg_calls.len()).collect_vec();
        let visibilities = deduplicater
            .dedup_chunk(
                &ops,
                &columns,
                visibilities,
                &mut dedup_tables,
                Some(&group_key),
            )
            .await
            .unwrap();
        assert_eq!(
            visibilities[0].iter().collect_vec(),
            vec![false, true, true] // non-distinct: chunk visibility only
        );
        assert_eq!(
            visibilities[1].iter().collect_vec(),
            vec![
                false, // invisible in the chunk
                false, // 1 still has other occurrences
                false, // 1 still has other occurrences
            ]
        );
        assert_eq!(
            visibilities[2].iter().collect_vec(),
            vec![
                false, // invisible in the chunk
                false, // 11 still has one occurrence left
                true,  // last occurrence of 11 removed
            ]
        );

        deduplicater.flush(&mut dedup_tables).unwrap();

        epoch.inc_for_test();
        for table in dedup_tables.values_mut() {
            table.commit_for_test(epoch).await.unwrap();
        }
    }
}