1use std::collections::HashMap;
16use std::marker::PhantomData;
17
18use futures::future::try_join_all;
19use futures::stream;
20use itertools::Itertools;
21use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
22use risingwave_common::hash::{HashKey, PrecomputedBuildHasher};
23use risingwave_common::util::epoch::EpochPair;
24use risingwave_common::util::iter_util::ZipEqFast;
25use risingwave_common_estimate_size::EstimateSize;
26use risingwave_common_estimate_size::collections::EstimatedHashMap;
27use risingwave_expr::aggregate::{AggCall, BoxedAggregateFunction, build_retractable};
28use risingwave_pb::stream_plan::PbAggNodeVersion;
29
30use super::agg_group::{
31 AggGroup as GenericAggGroup, AggStateCacheStats, GroupKey, OnlyOutputIfHasInput,
32};
33use super::agg_state::AggStateStorage;
34use super::distinct::DistinctDeduplicater;
35use super::{AggExecutorArgs, HashAggExecutorExtraArgs, agg_call_filter_res, iter_table_storage};
36use crate::cache::ManagedLruCache;
37use crate::common::metrics::MetricsInfo;
38use crate::common::table::state_table::StateTablePostCommit;
39use crate::executor::eowc::SortBuffer;
40use crate::executor::monitor::HashAggMetrics;
41use crate::executor::prelude::*;
42
/// The aggregation-group type used by this executor: the generic agg group
/// instantiated with `OnlyOutputIfHasInput` as its second type parameter.
type AggGroup<S> = GenericAggGroup<S, OnlyOutputIfHasInput>;
/// Heap-allocated [`AggGroup`]. This is the form stored both in the group cache
/// and in the dirty-group map.
type BoxedAggGroup<S> = Box<AggGroup<S>>;
45
46impl<S: StateStore> EstimateSize for BoxedAggGroup<S> {
47 fn estimated_heap_size(&self) -> usize {
48 self.as_ref().estimated_size()
49 }
50}
51
/// LRU cache from hash key to aggregation group, hashed with
/// `PrecomputedBuildHasher`.
///
/// The value is an `Option` so that a group can be taken out of its cache slot
/// (leaving `None` behind) while it is held in the dirty-group map; see
/// `touch_agg_groups` and `flush_data`.
type AggGroupCache<K, S> = ManagedLruCache<K, Option<BoxedAggGroup<S>>, PrecomputedBuildHasher>;
53
/// Streaming executor that groups input rows by a hash key of type `K` and
/// maintains aggregation state in state store `S`.
pub struct HashAggExecutor<K: HashKey, S: StateStore> {
    /// Upstream executor whose messages are consumed by `execute_inner`.
    input: Executor,
    /// Configuration and state tables; moved out of `self` when execution starts.
    inner: ExecutorInner<K, S>,
}
68
/// Configuration and state tables of a [`HashAggExecutor`], i.e. everything
/// except the upstream input and the per-execution variables.
struct ExecutorInner<K: HashKey, S: StateStore> {
    /// Marker for the hash key type `K`, which is not stored in any field.
    _phantom: PhantomData<K>,

    /// Aggregation node version, forwarded to `AggGroup::create` and
    /// `AggGroup::for_eowc_output`.
    version: PbAggNodeVersion,

    /// Context of the owning actor; supplies the actor id, fragment id and the
    /// streaming metrics registry.
    actor_ctx: ActorContextRef,
    /// Info of this executor. Its schema supplies the group key data types
    /// (the leading columns) and the data types of the output chunk builder.
    info: ExecutorInfo,

    /// Primary key indices copied from the input executor's info.
    input_pk_indices: Vec<usize>,

    /// Schema copied from the input executor's info.
    input_schema: Schema,

    /// Indices of the group key columns within an input chunk.
    group_key_indices: Vec<usize>,

    /// The first `group_key_indices.len()` pk indices of the intermediate state
    /// table. `new` asserts that they form a permutation of `0..group_key_len`.
    group_key_table_pk_projection: Arc<[usize]>,

    /// Aggregate calls evaluated for every group.
    agg_calls: Vec<AggCall>,

    /// Retractable aggregate functions built from `agg_calls`, in the same order.
    agg_funcs: Vec<BoxedAggregateFunction>,

    /// Forwarded to agg group construction.
    /// NOTE(review): presumably the position of the row-count call inside
    /// `agg_calls` — confirm against `AggGroup`.
    row_count_index: usize,

    /// State storage per aggregate call; zipped one-to-one with `agg_calls`.
    storages: Vec<AggStateStorage<S>>,

    /// State table that receives the intermediate-state changes of the groups.
    intermediate_state_table: StateTable<S>,

    /// State tables used by the distinct deduplicater.
    /// NOTE(review): the `usize` key is presumably the distinct column index —
    /// confirm against `DistinctDeduplicater`.
    distinct_dedup_tables: HashMap<usize, StateTable<S>>,

    /// Shared atomic sequence handed to the group cache and to the distinct
    /// deduplicater when they are created.
    watermark_sequence: AtomicU64Ref,

    /// Forwarded to agg group construction.
    /// NOTE(review): presumably the cache size for extreme (min/max) agg
    /// states — confirm against `AggGroup`.
    extreme_cache_size: usize,

    /// Chunk size passed to the output `StreamChunkBuilder`.
    chunk_size: usize,

    /// Threshold on the estimated heap size of the dirty groups. When it is
    /// reached after applying a chunk, the dirty groups are flushed without
    /// waiting for the next barrier.
    max_dirty_groups_heap_size: usize,

    /// When `true`, state changes are routed through the sort buffer and
    /// results are emitted only for rows the buffer releases for the window
    /// watermark; when `false`, every flush emits the changes of all dirty
    /// groups.
    emit_on_window_close: bool,
}
129
130impl<K: HashKey, S: StateStore> ExecutorInner<K, S> {
131 fn all_state_tables_mut(&mut self) -> impl Iterator<Item = &mut StateTable<S>> {
132 iter_table_storage(&mut self.storages)
133 .chain(self.distinct_dedup_tables.values_mut())
134 .chain(std::iter::once(&mut self.intermediate_state_table))
135 }
136}
137
/// Mutable variables that live for one run of `execute_inner`.
struct ExecutionVars<K: HashKey, S: StateStore> {
    /// Metric handles of this executor instance.
    metrics: HashAggMetrics,

    /// Counters accumulated locally and drained into `metrics` by `flush_metrics`.
    stats: ExecutionStats,

    /// Cache of aggregation groups that are not currently dirty.
    agg_group_cache: AggGroupCache<K, S>,

    /// Groups touched since the last flush. `flush_data` drains this map and
    /// puts the groups back into `agg_group_cache`.
    dirty_groups: EstimatedHashMap<K, BoxedAggGroup<S>>,

    /// Deduplicater applied to the per-call visibilities before a chunk is
    /// applied to a group.
    distinct_dedup: DistinctDeduplicater<S>,

    /// Latest watermark received per group key column, indexed by the column's
    /// position in the group key. Entries are taken when emitted at a barrier.
    buffered_watermarks: Vec<Option<Watermark>>,

    /// Value of the latest watermark received on the window column; taken by
    /// `flush_data`.
    window_watermark: Option<ScalarImpl>,

    /// Builder that batches output records into chunks.
    chunk_builder: StreamChunkBuilder,

    /// Sort buffer used in emit-on-window-close mode: state changes are applied
    /// to it and rows are consumed from it with the window watermark.
    buffer: SortBuffer<S>,
}
164
/// Locally accumulated counters. `flush_metrics` adds them to the metrics and
/// resets them to zero.
#[derive(Debug, Default)]
struct ExecutionStats {
    /// Number of group lookups where the group was neither dirty nor cached
    /// and therefore had to be created.
    lookup_miss_count: u64,
    /// Number of group lookups performed by `touch_agg_groups`.
    total_lookup_count: u64,

    /// Number of `touch_agg_groups` calls in which at least one group had to
    /// be created.
    chunk_lookup_miss_count: u64,
    /// Number of `touch_agg_groups` calls (one per applied chunk).
    chunk_total_lookup_count: u64,

    /// Sum of the lookup counts reported through `AggStateCacheStats`.
    agg_state_cache_lookup_count: u64,
    /// Sum of the miss counts reported through `AggStateCacheStats`.
    agg_state_cache_miss_count: u64,
}
179
180impl ExecutionStats {
181 fn merge_state_cache_stats(&mut self, other: AggStateCacheStats) {
182 self.agg_state_cache_lookup_count += other.agg_state_cache_lookup_count;
183 self.agg_state_cache_miss_count += other.agg_state_cache_miss_count;
184 }
185}
186
187impl<K: HashKey, S: StateStore> Execute for HashAggExecutor<K, S> {
188 fn execute(self: Box<Self>) -> BoxedMessageStream {
189 self.execute_inner().boxed()
190 }
191}
192
193impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
194 pub fn new(args: AggExecutorArgs<S, HashAggExecutorExtraArgs>) -> StreamResult<Self> {
195 let input_info = args.input.info().clone();
196
197 let group_key_len = args.extra.group_key_indices.len();
198 let group_key_table_pk_projection =
200 &args.intermediate_state_table.pk_indices()[..group_key_len];
201 assert!(
202 group_key_table_pk_projection
203 .iter()
204 .sorted()
205 .copied()
206 .eq(0..group_key_len)
207 );
208
209 Ok(Self {
210 input: args.input,
211 inner: ExecutorInner {
212 _phantom: PhantomData,
213 version: args.version,
214 actor_ctx: args.actor_ctx,
215 info: args.info,
216 input_pk_indices: input_info.pk_indices,
217 input_schema: input_info.schema,
218 group_key_indices: args.extra.group_key_indices,
219 group_key_table_pk_projection: group_key_table_pk_projection.to_vec().into(),
220 agg_funcs: args.agg_calls.iter().map(build_retractable).try_collect()?,
221 agg_calls: args.agg_calls,
222 row_count_index: args.row_count_index,
223 storages: args.storages,
224 intermediate_state_table: args.intermediate_state_table,
225 distinct_dedup_tables: args.distinct_dedup_tables,
226 watermark_sequence: args.watermark_epoch,
227 extreme_cache_size: args.extreme_cache_size,
228 chunk_size: args.extra.chunk_size,
229 max_dirty_groups_heap_size: args.extra.max_dirty_groups_heap_size,
230 emit_on_window_close: args.extra.emit_on_window_close,
231 },
232 })
233 }
234
235 fn get_group_visibilities(keys: Vec<K>, base_visibility: &Bitmap) -> Vec<(K, Bitmap)> {
242 let n_rows = keys.len();
243 let mut vis_builders = HashMap::new();
244 for (row_idx, key) in keys
245 .into_iter()
246 .enumerate()
247 .filter(|(row_idx, _)| base_visibility.is_set(*row_idx))
248 {
249 vis_builders
250 .entry(key)
251 .or_insert_with(|| BitmapBuilder::zeroed(n_rows))
252 .set(row_idx, true);
253 }
254 vis_builders
255 .into_iter()
256 .map(|(key, vis_builder)| (key, vis_builder.finish()))
257 .collect()
258 }
259
/// Ensures that every group in `keys` has an entry in `vars.dirty_groups`.
///
/// For each key:
/// - if the group is already dirty, nothing is done;
/// - if the group is in `vars.agg_group_cache`, it is taken out of its cache
///   slot (leaving `None` there) and inserted into `vars.dirty_groups`;
/// - otherwise a new group is built with `AggGroup::create`. The pending
///   creations are driven through `buffer_unordered(10)`.
///
/// Lookup statistics in `vars.stats` are updated along the way.
///
/// # Errors
/// Fails if a key cannot be deserialized into the group key columns or if
/// `AggGroup::create` fails.
///
/// # Panics
/// Panics if the cache slot of a non-dirty key holds `None`.
async fn touch_agg_groups(
    this: &ExecutorInner<K, S>,
    vars: &mut ExecutionVars<K, S>,
    keys: impl IntoIterator<Item = &K>,
) -> StreamExecutorResult<()> {
    // The group key columns are the leading columns of this executor's schema.
    let group_key_types = &this.info.schema.data_types()[..this.group_key_indices.len()];
    let futs = keys
        .into_iter()
        .filter_map(|key| {
            vars.stats.total_lookup_count += 1;
            if vars.dirty_groups.contains_key(key) {
                // Already dirty: nothing to do for this key.
                return None;
            }
            match vars.agg_group_cache.get_mut(key) {
                Some(mut agg_group) => {
                    let agg_group: &mut Option<_> = &mut agg_group;
                    assert!(
                        agg_group.is_some(),
                        "invalid state: AggGroup is None in cache but not dirty"
                    );
                    // Cache hit: move the group from the cache slot into
                    // `dirty_groups`.
                    vars.dirty_groups
                        .insert(key.clone(), agg_group.take().unwrap());
                    // No creation future is needed.
                    None
                }
                None => {
                    vars.stats.lookup_miss_count += 1;
                    // Cache miss: defer the creation of the group to a future
                    // that is driven after all keys have been classified.
                    Some(async {
                        let (agg_group, stats) = AggGroup::create(
                            this.version,
                            Some(GroupKey::new(
                                key.deserialize(group_key_types)?,
                                Some(this.group_key_table_pk_projection.clone()),
                            )),
                            &this.agg_calls,
                            &this.agg_funcs,
                            &this.storages,
                            &this.intermediate_state_table,
                            &this.input_pk_indices,
                            this.row_count_index,
                            this.emit_on_window_close,
                            this.extreme_cache_size,
                            &this.input_schema,
                        )
                        .await?;
                        Ok::<_, StreamExecutorError>((key.clone(), Box::new(agg_group), stats))
                    })
                }
            }
        })
        // The closure mutates `vars`, so the iterator is fully driven here,
        // before `vars` is used again below.
        .collect_vec();
    vars.stats.chunk_total_lookup_count += 1;
    if !futs.is_empty() {
        // At least one group was missing, which counts as a miss for this call.
        vars.stats.chunk_lookup_miss_count += 1;
        let mut buffered = stream::iter(futs).buffer_unordered(10).fuse();
        while let Some(result) = buffered.next().await {
            let (key, agg_group, stats) = result?;
            vars.stats.merge_state_cache_stats(stats);
            let none = vars.dirty_groups.insert(key, agg_group);
            // Keys that were already dirty were filtered out above.
            debug_assert!(none.is_none());
        }
    }
    Ok(())
}
331
/// Applies one input chunk to the aggregation state.
///
/// Steps, in order:
/// 1. Build one hash key per row and split the chunk visibility per group.
/// 2. Make sure every touched group is present in `vars.dirty_groups`.
/// 3. Compute the filter visibility of every agg call.
/// 4. For non-distinct calls backed by `MaterializedInput` storage, write the
///    projected chunk to the storage table.
/// 5. For every group: intersect the call visibilities with the group
///    visibility, run distinct deduplication, write the materialized input of
///    distinct calls, and apply the chunk to the group.
/// 6. Update the dirty-group gauges.
///
/// # Errors
/// Propagates errors from group creation, filter evaluation, distinct
/// deduplication and `AggGroup::apply_chunk`.
async fn apply_chunk(
    this: &mut ExecutorInner<K, S>,
    vars: &mut ExecutionVars<K, S>,
    chunk: StreamChunk,
) -> StreamExecutorResult<()> {
    // One hash key per row of the chunk.
    let keys = K::build_many(&this.group_key_indices, chunk.data_chunk());
    let group_visibilities = Self::get_group_visibilities(keys, chunk.visibility());

    // After this call every key in `group_visibilities` is in `dirty_groups`.
    Self::touch_agg_groups(this, vars, group_visibilities.iter().map(|(k, _)| k)).await?;

    // Row visibility per agg call, before splitting by group.
    let mut call_visibilities = Vec::with_capacity(this.agg_calls.len());
    for agg_call in &this.agg_calls {
        let agg_call_filter_res = agg_call_filter_res(agg_call, &chunk).await?;
        call_visibilities.push(agg_call_filter_res);
    }

    // Materialize the input of non-distinct calls. This is group independent,
    // so it is done once for the whole chunk.
    for ((call, storage), visibility) in (this.agg_calls.iter())
        .zip_eq_fast(&mut this.storages)
        .zip_eq_fast(call_visibilities.iter())
    {
        if let AggStateStorage::MaterializedInput { table, mapping, .. } = storage
            && !call.distinct
        {
            let chunk = chunk.project_with_vis(mapping.upstream_columns(), visibility.clone());
            table.write_chunk(chunk);
        }
    }

    for (key, visibility) in group_visibilities {
        // `touch_agg_groups` guarantees the entry exists, hence the `unwrap`.
        let agg_group: &mut BoxedAggGroup<_> = &mut vars.dirty_groups.get_mut(&key).unwrap();

        // Restrict every call's visibility to the rows of this group.
        let visibilities = call_visibilities
            .iter()
            .map(|call_vis| call_vis & &visibility)
            .collect();
        let visibilities = vars
            .distinct_dedup
            .dedup_chunk(
                chunk.ops(),
                chunk.columns(),
                visibilities,
                &mut this.distinct_dedup_tables,
                agg_group.group_key(),
            )
            .await?;
        // Distinct calls are materialized per group, using the deduplicated
        // visibilities.
        for ((call, storage), visibility) in (this.agg_calls.iter())
            .zip_eq_fast(&mut this.storages)
            .zip_eq_fast(visibilities.iter())
        {
            if let AggStateStorage::MaterializedInput { table, mapping, .. } = storage
                && call.distinct
            {
                let chunk =
                    chunk.project_with_vis(mapping.upstream_columns(), visibility.clone());
                table.write_chunk(chunk);
            }
        }
        agg_group
            .apply_chunk(&chunk, &this.agg_calls, &this.agg_funcs, visibilities)
            .await?;
    }

    // Report the current number and estimated heap size of the dirty groups.
    vars.metrics
        .agg_dirty_groups_count
        .set(vars.dirty_groups.len() as i64);
    vars.metrics
        .agg_dirty_groups_heap_size
        .set(vars.dirty_groups.estimated_heap_size() as i64);

    Ok(())
}
410
/// Flushes all dirty groups and yields the resulting output chunks.
///
/// Steps, in order:
/// 1. Take the pending window watermark out of `vars`.
/// 2. For every dirty group, build the change of its intermediate states and
///    either apply it to the sort buffer (emit-on-window-close) or write it to
///    the intermediate state table.
/// 3. Produce output. In emit-on-window-close mode and only if a window
///    watermark is pending, rows consumed from the sort buffer are turned into
///    temporary groups whose output change is emitted. Otherwise the output
///    change of every dirty group is emitted.
/// 4. Move the dirty groups back into the cache and yield the partially
///    filled chunk, if any.
/// 5. Propagate the window watermark to all state tables, flush the distinct
///    dedup state and evict the group cache.
#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
async fn flush_data<'a>(this: &'a mut ExecutorInner<K, S>, vars: &'a mut ExecutionVars<K, S>) {
    // Taking the watermark means each received window watermark is handled by
    // at most one flush.
    let window_watermark = vars.window_watermark.take();

    // Persist the changed intermediate states of all dirty groups.
    for mut agg_group in vars.dirty_groups.values_mut() {
        // `None` means there is no state change to persist for this group.
        let Some(inter_states_change) = agg_group.build_states_change(&this.agg_funcs)? else {
            continue;
        };

        if this.emit_on_window_close {
            vars.buffer
                .apply_change(inter_states_change, &mut this.intermediate_state_table);
        } else {
            this.intermediate_state_table
                .write_record(inter_states_change);
        }
    }

    if this.emit_on_window_close {
        // Without a pending window watermark nothing is emitted in this mode.
        if let Some(watermark) = window_watermark.as_ref() {
            #[for_await]
            for row in vars
                .buffer
                .consume(watermark.clone(), &mut this.intermediate_state_table)
            {
                let row = row?;
                // A buffered row is the group key columns followed by the
                // intermediate states.
                let group_key = row
                    .clone()
                    .into_iter()
                    .take(this.group_key_indices.len())
                    .collect();
                let inter_states = row.into_iter().skip(this.group_key_indices.len()).collect();

                // Temporary group rebuilt from the buffered states; it is only
                // used to compute the output and is not inserted into the cache.
                let mut agg_group = AggGroup::<S>::for_eowc_output(
                    this.version,
                    Some(GroupKey::new(
                        group_key,
                        Some(this.group_key_table_pk_projection.clone()),
                    )),
                    &this.agg_calls,
                    &this.agg_funcs,
                    &this.storages,
                    &inter_states,
                    &this.input_pk_indices,
                    this.row_count_index,
                    this.emit_on_window_close,
                    this.extreme_cache_size,
                    &this.input_schema,
                )?;

                let (change, stats) = agg_group
                    .build_outputs_change(&this.storages, &this.agg_funcs)
                    .await?;
                vars.stats.merge_state_cache_stats(stats);

                // The builder returns a chunk only once it is full.
                if let Some(change) = change
                    && let Some(chunk) = vars.chunk_builder.append_record(change)
                {
                    yield chunk;
                }
            }
        }
    } else {
        // Emit the output change of every dirty group, one group at a time.
        for mut agg_group in vars.dirty_groups.values_mut() {
            let agg_group = agg_group.as_mut();
            let (change, stats) = agg_group
                .build_outputs_change(&this.storages, &this.agg_funcs)
                .await?;
            vars.stats.merge_state_cache_stats(stats);

            if let Some(change) = change
                && let Some(chunk) = vars.chunk_builder.append_record(change)
            {
                yield chunk;
            }
        }
    }

    // Move the dirty groups back into the cache.
    for (key, agg_group) in vars.dirty_groups.drain() {
        vars.agg_group_cache.put(key, Some(agg_group));
    }

    // Yield whatever is left in the chunk builder.
    if let Some(chunk) = vars.chunk_builder.take() {
        yield chunk;
    }

    // Hand the window watermark to every state table.
    if let Some(watermark) = window_watermark {
        this.all_state_tables_mut()
            .for_each(|table| table.update_watermark(watermark.clone()));
    }

    // Flush the distinct dedup state into its tables.
    vars.distinct_dedup.flush(&mut this.distinct_dedup_tables)?;

    // Give the cache a chance to evict entries now that the groups are back.
    vars.agg_group_cache.evict();
}
515
516 fn flush_metrics(_this: &ExecutorInner<K, S>, vars: &mut ExecutionVars<K, S>) {
517 vars.metrics
518 .agg_lookup_miss_count
519 .inc_by(std::mem::take(&mut vars.stats.lookup_miss_count));
520 vars.metrics
521 .agg_total_lookup_count
522 .inc_by(std::mem::take(&mut vars.stats.total_lookup_count));
523 vars.metrics
524 .agg_cached_entry_count
525 .set(vars.agg_group_cache.len() as i64);
526 vars.metrics
527 .agg_chunk_lookup_miss_count
528 .inc_by(std::mem::take(&mut vars.stats.chunk_lookup_miss_count));
529 vars.metrics
530 .agg_chunk_total_lookup_count
531 .inc_by(std::mem::take(&mut vars.stats.chunk_total_lookup_count));
532 vars.metrics
533 .agg_state_cache_lookup_count
534 .inc_by(std::mem::take(&mut vars.stats.agg_state_cache_lookup_count));
535 vars.metrics
536 .agg_state_cache_miss_count
537 .inc_by(std::mem::take(&mut vars.stats.agg_state_cache_miss_count));
538 }
539
540 async fn commit_state_tables(
541 this: &mut ExecutorInner<K, S>,
542 epoch: EpochPair,
543 ) -> StreamExecutorResult<Vec<StateTablePostCommit<'_, S>>> {
544 futures::future::try_join_all(
545 this.all_state_tables_mut()
546 .map(|table| async { table.commit(epoch).await }),
547 )
548 .await
549 }
550
551 async fn try_flush_data(this: &mut ExecutorInner<K, S>) -> StreamExecutorResult<()> {
552 futures::future::try_join_all(
553 this.all_state_tables_mut()
554 .map(|table| async { table.try_flush().await }),
555 )
556 .await?;
557 Ok(())
558 }
559
/// Main loop of the executor; yields the output message stream.
///
/// After forwarding the first barrier and initializing all state tables, each
/// input message is handled as follows:
/// - `Watermark`: buffered if it is on a group key column; additionally
///   recorded as the window watermark if it is on the window column.
/// - `Chunk`: applied to the groups; dirty groups are flushed early if their
///   estimated heap size reaches `max_dirty_groups_heap_size`.
/// - `Barrier`: dirty groups are flushed, metrics are published, state tables
///   are committed, buffered watermarks are emitted, the barrier is forwarded
///   and finally the post-commit step runs (clearing caches if they may be
///   stale).
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(self) {
    let HashAggExecutor {
        input,
        inner: mut this,
    } = self;

    let actor_id = this.actor_ctx.id;

    // The window column is the group key column that comes first in the pk of
    // the intermediate state table.
    let window_col_idx_in_group_key = this.intermediate_state_table.pk_indices()[0];
    let window_col_idx = this.group_key_indices[window_col_idx_in_group_key];

    let agg_group_cache_metrics_info = MetricsInfo::new(
        this.actor_ctx.streaming_metrics.clone(),
        this.intermediate_state_table.table_id(),
        this.actor_ctx.id,
        "agg intermediate state table",
    );
    let metrics = this.actor_ctx.streaming_metrics.new_hash_agg_metrics(
        this.intermediate_state_table.table_id(),
        this.actor_ctx.id,
        this.actor_ctx.fragment_id,
    );

    let mut vars = ExecutionVars {
        metrics,
        stats: ExecutionStats::default(),
        agg_group_cache: ManagedLruCache::unbounded_with_hasher(
            this.watermark_sequence.clone(),
            agg_group_cache_metrics_info,
            PrecomputedBuildHasher,
        ),
        dirty_groups: Default::default(),
        distinct_dedup: DistinctDeduplicater::new(
            &this.agg_calls,
            this.watermark_sequence.clone(),
            &this.distinct_dedup_tables,
            &this.actor_ctx,
        ),
        buffered_watermarks: vec![None; this.group_key_indices.len()],
        window_watermark: None,
        chunk_builder: StreamChunkBuilder::new(this.chunk_size, this.info.schema.data_types()),
        buffer: SortBuffer::new(window_col_idx_in_group_key, &this.intermediate_state_table),
    };

    // Maps an input column index to its position in the group key, or `None`
    // if the column is not part of the group key.
    let group_key_invert_idx = {
        let mut group_key_invert_idx = vec![None; input.info().schema.len()];
        for (group_key_seq, group_key_idx) in this.group_key_indices.iter().enumerate() {
            group_key_invert_idx[*group_key_idx] = Some(group_key_seq);
        }
        group_key_invert_idx
    };

    // The first message must be a barrier; it is forwarded before the state
    // tables are initialized with its epoch.
    let mut input = input.execute();
    let barrier = expect_first_barrier(&mut input).await?;
    let first_epoch = barrier.epoch;
    yield Message::Barrier(barrier);
    for table in this.all_state_tables_mut() {
        table.init_epoch(first_epoch).await?;
    }

    #[for_await]
    for msg in input {
        let msg = msg?;
        // Evict from the group cache before handling each message.
        vars.agg_group_cache.evict();
        match msg {
            Message::Watermark(watermark) => {
                let group_key_seq = group_key_invert_idx[watermark.col_idx];
                // Watermarks on columns outside the group key are dropped.
                if let Some(group_key_seq) = group_key_seq {
                    if watermark.col_idx == window_col_idx {
                        vars.window_watermark = Some(watermark.val.clone());
                    }
                    // Re-index the watermark to the column's position in the
                    // group key; a newer watermark replaces the buffered one.
                    vars.buffered_watermarks[group_key_seq] =
                        Some(watermark.with_idx(group_key_seq));
                }
            }
            Message::Chunk(chunk) => {
                Self::apply_chunk(&mut this, &mut vars, chunk).await?;

                // Flush early once the dirty groups reach the heap size limit.
                if vars.dirty_groups.estimated_heap_size() >= this.max_dirty_groups_heap_size {
                    #[for_await]
                    for chunk in Self::flush_data(&mut this, &mut vars) {
                        yield Message::Chunk(chunk?);
                    }
                }

                Self::try_flush_data(&mut this).await?;
            }
            Message::Barrier(barrier) => {
                #[for_await]
                for chunk in Self::flush_data(&mut this, &mut vars) {
                    yield Message::Chunk(chunk?);
                }
                Self::flush_metrics(&this, &mut vars);
                // Copied out because `post_commits` keeps `this` mutably
                // borrowed until the post-commit step below.
                let emit_on_window_close = this.emit_on_window_close;
                let post_commits = Self::commit_state_tables(&mut this, barrier.epoch).await?;

                if emit_on_window_close {
                    // Only the window column's watermark is forwarded in this
                    // mode; watermarks buffered for other columns stay buffered.
                    if let Some(watermark) =
                        vars.buffered_watermarks[window_col_idx_in_group_key].take()
                    {
                        yield Message::Watermark(watermark);
                    }
                } else {
                    for buffered_watermark in &mut vars.buffered_watermarks {
                        if let Some(watermark) = buffered_watermark.take() {
                            yield Message::Watermark(watermark);
                        }
                    }
                }

                // Extract the vnode bitmap update, if any, before the barrier
                // is moved downstream.
                let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                yield Message::Barrier(barrier);

                // Only the result of the last table (the intermediate state
                // table, by the order of `all_state_tables_mut`) is inspected.
                if let Some(cache_may_stale) =
                    try_join_all(post_commits.into_iter().map(|post_commit| {
                        post_commit.post_yield_barrier(update_vnode_bitmap.clone())
                    }))
                    .await?
                    .pop()
                    .expect("should have at least one table")
                    .map(|(_, cache_may_stale)| cache_may_stale)
                {
                    if cache_may_stale {
                        // Drop cached groups and dedup caches that may be stale.
                        vars.agg_group_cache.clear();
                        vars.distinct_dedup.dedup_caches_mut().for_each(|cache| {
                            cache.clear();
                        });
                    }
                }
            }
        }
    }
}
700}