1use std::collections::HashMap;
16use std::marker::PhantomData;
17
18use futures::future::try_join_all;
19use futures::stream;
20use itertools::Itertools;
21use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
22use risingwave_common::hash::{HashKey, PrecomputedBuildHasher};
23use risingwave_common::util::epoch::EpochPair;
24use risingwave_common::util::iter_util::ZipEqFast;
25use risingwave_common_estimate_size::EstimateSize;
26use risingwave_common_estimate_size::collections::EstimatedHashMap;
27use risingwave_expr::aggregate::{AggCall, BoxedAggregateFunction, build_retractable};
28use risingwave_pb::stream_plan::PbAggNodeVersion;
29
30use super::agg_group::{
31 AggGroup as GenericAggGroup, AggStateCacheStats, GroupKey, OnlyOutputIfHasInput,
32};
33use super::agg_state::AggStateStorage;
34use super::distinct::DistinctDeduplicater;
35use super::{AggExecutorArgs, HashAggExecutorExtraArgs, agg_call_filter_res, iter_table_storage};
36use crate::cache::ManagedLruCache;
37use crate::common::metrics::MetricsInfo;
38use crate::common::table::state_table::StateTablePostCommit;
39use crate::executor::eowc::SortBuffer;
40use crate::executor::monitor::HashAggMetrics;
41use crate::executor::prelude::*;
42
43type AggGroup<S> = GenericAggGroup<S, OnlyOutputIfHasInput>;
44type BoxedAggGroup<S> = Box<AggGroup<S>>;
45
46impl<S: StateStore> EstimateSize for BoxedAggGroup<S> {
47 fn estimated_heap_size(&self) -> usize {
48 self.as_ref().estimated_size()
49 }
50}
51
52type AggGroupCache<K, S> = ManagedLruCache<K, Option<BoxedAggGroup<S>>, PrecomputedBuildHasher>;
53
54pub struct HashAggExecutor<K: HashKey, S: StateStore> {
65 input: Executor,
66 inner: ExecutorInner<K, S>,
67}
68
69struct ExecutorInner<K: HashKey, S: StateStore> {
70 _phantom: PhantomData<K>,
71
72 version: PbAggNodeVersion,
74
75 actor_ctx: ActorContextRef,
76 info: ExecutorInfo,
77
78 input_pk_indices: Vec<usize>,
80
81 input_schema: Schema,
83
84 group_key_indices: Vec<usize>,
87
88 group_key_table_pk_projection: Arc<[usize]>,
90
91 agg_calls: Vec<AggCall>,
93
94 agg_funcs: Vec<BoxedAggregateFunction>,
96
97 row_count_index: usize,
99
100 storages: Vec<AggStateStorage<S>>,
103
104 intermediate_state_table: StateTable<S>,
109
110 distinct_dedup_tables: HashMap<usize, StateTable<S>>,
113
114 watermark_sequence: AtomicU64Ref,
116
117 extreme_cache_size: usize,
119
120 chunk_size: usize,
122
123 max_dirty_groups_heap_size: usize,
125
126 emit_on_window_close: bool,
128}
129
130impl<K: HashKey, S: StateStore> ExecutorInner<K, S> {
131 fn all_state_tables_mut(&mut self) -> impl Iterator<Item = &mut StateTable<S>> {
132 iter_table_storage(&mut self.storages)
133 .chain(self.distinct_dedup_tables.values_mut())
134 .chain(std::iter::once(&mut self.intermediate_state_table))
135 }
136}
137
138struct ExecutionVars<K: HashKey, S: StateStore> {
139 metrics: HashAggMetrics,
140
141 stats: ExecutionStats,
143
144 agg_group_cache: AggGroupCache<K, S>,
146
147 dirty_groups: EstimatedHashMap<K, BoxedAggGroup<S>>,
149
150 distinct_dedup: DistinctDeduplicater<S>,
152
153 buffered_watermarks: Vec<Option<Watermark>>,
155
156 window_watermark: Option<ScalarImpl>,
158
159 chunk_builder: StreamChunkBuilder,
161
162 buffer: SortBuffer<S>,
163}
164
/// Counters collected during execution; all of them start at zero.
#[derive(Default, Debug)]
struct ExecutionStats {
    /// Number of group lookups that found the group neither dirty nor cached.
    lookup_miss_count: u64,
    /// Number of group lookups performed, one per touched key.
    total_lookup_count: u64,

    /// Number of `touch_agg_groups` calls in which at least one group had to be created.
    chunk_lookup_miss_count: u64,
    /// Number of `touch_agg_groups` calls.
    chunk_total_lookup_count: u64,

    /// Lookup count merged in from `AggStateCacheStats`.
    agg_state_cache_lookup_count: u64,
    /// Miss count merged in from `AggStateCacheStats`.
    agg_state_cache_miss_count: u64,
}
179
180impl ExecutionStats {
181 fn merge_state_cache_stats(&mut self, other: AggStateCacheStats) {
182 self.agg_state_cache_lookup_count += other.agg_state_cache_lookup_count;
183 self.agg_state_cache_miss_count += other.agg_state_cache_miss_count;
184 }
185}
186
187impl<K: HashKey, S: StateStore> Execute for HashAggExecutor<K, S> {
188 fn execute(self: Box<Self>) -> BoxedMessageStream {
189 self.execute_inner().boxed()
190 }
191}
192
193impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
194 pub fn new(args: AggExecutorArgs<S, HashAggExecutorExtraArgs>) -> StreamResult<Self> {
195 let input_info = args.input.info().clone();
196
197 let group_key_len = args.extra.group_key_indices.len();
198 let group_key_table_pk_projection =
200 &args.intermediate_state_table.pk_indices()[..group_key_len];
201 assert!(
202 group_key_table_pk_projection
203 .iter()
204 .sorted()
205 .copied()
206 .eq(0..group_key_len)
207 );
208
209 Ok(Self {
210 input: args.input,
211 inner: ExecutorInner {
212 _phantom: PhantomData,
213 version: args.version,
214 actor_ctx: args.actor_ctx,
215 info: args.info,
216 input_pk_indices: input_info.pk_indices,
217 input_schema: input_info.schema,
218 group_key_indices: args.extra.group_key_indices,
219 group_key_table_pk_projection: group_key_table_pk_projection.to_vec().into(),
220 agg_funcs: args.agg_calls.iter().map(build_retractable).try_collect()?,
221 agg_calls: args.agg_calls,
222 row_count_index: args.row_count_index,
223 storages: args.storages,
224 intermediate_state_table: args.intermediate_state_table,
225 distinct_dedup_tables: args.distinct_dedup_tables,
226 watermark_sequence: args.watermark_epoch,
227 extreme_cache_size: args.extreme_cache_size,
228 chunk_size: args.extra.chunk_size,
229 max_dirty_groups_heap_size: args.extra.max_dirty_groups_heap_size,
230 emit_on_window_close: args.extra.emit_on_window_close,
231 },
232 })
233 }
234
235 fn get_group_visibilities(keys: Vec<K>, base_visibility: &Bitmap) -> Vec<(K, Bitmap)> {
242 let n_rows = keys.len();
243 let mut vis_builders = HashMap::new();
244 for (row_idx, key) in keys
245 .into_iter()
246 .enumerate()
247 .filter(|(row_idx, _)| base_visibility.is_set(*row_idx))
248 {
249 vis_builders
250 .entry(key)
251 .or_insert_with(|| BitmapBuilder::zeroed(n_rows))
252 .set(row_idx, true);
253 }
254 vis_builders
255 .into_iter()
256 .map(|(key, vis_builder)| (key, vis_builder.finish()))
257 .collect()
258 }
259
260 async fn touch_agg_groups(
263 this: &ExecutorInner<K, S>,
264 vars: &mut ExecutionVars<K, S>,
265 keys: impl IntoIterator<Item = &K>,
266 ) -> StreamExecutorResult<()> {
267 let group_key_types = &this.info.schema.data_types()[..this.group_key_indices.len()];
268 let futs = keys
269 .into_iter()
270 .filter_map(|key| {
271 vars.stats.total_lookup_count += 1;
272 if vars.dirty_groups.contains_key(key) {
273 return None;
275 }
276 match vars.agg_group_cache.get_mut(key) {
277 Some(mut agg_group) => {
278 let agg_group: &mut Option<_> = &mut agg_group;
279 assert!(
280 agg_group.is_some(),
281 "invalid state: AggGroup is None in cache but not dirty"
282 );
283 vars.dirty_groups
285 .insert(key.clone(), agg_group.take().unwrap());
286 None }
288 None => {
289 vars.stats.lookup_miss_count += 1;
290 Some(async {
291 let agg_group = AggGroup::create(
294 this.version,
295 Some(GroupKey::new(
296 key.deserialize(group_key_types)?,
297 Some(this.group_key_table_pk_projection.clone()),
298 )),
299 &this.agg_calls,
300 &this.agg_funcs,
301 &this.storages,
302 &this.intermediate_state_table,
303 &this.input_pk_indices,
304 this.row_count_index,
305 this.emit_on_window_close,
306 this.extreme_cache_size,
307 &this.input_schema,
308 )
309 .await?;
310 Ok::<_, StreamExecutorError>((key.clone(), Box::new(agg_group)))
311 })
312 }
313 }
314 })
315 .collect_vec(); vars.stats.chunk_total_lookup_count += 1;
318 if !futs.is_empty() {
319 vars.stats.chunk_lookup_miss_count += 1;
321 let mut buffered = stream::iter(futs).buffer_unordered(10).fuse();
322 while let Some(result) = buffered.next().await {
323 let (key, agg_group) = result?;
324 let none = vars.dirty_groups.insert(key, agg_group);
325 debug_assert!(none.is_none());
326 }
327 }
328 Ok(())
329 }
330
331 async fn apply_chunk(
332 this: &mut ExecutorInner<K, S>,
333 vars: &mut ExecutionVars<K, S>,
334 chunk: StreamChunk,
335 ) -> StreamExecutorResult<()> {
336 let keys = K::build_many(&this.group_key_indices, chunk.data_chunk());
338 let group_visibilities = Self::get_group_visibilities(keys, chunk.visibility());
339
340 Self::touch_agg_groups(this, vars, group_visibilities.iter().map(|(k, _)| k)).await?;
342
343 let mut call_visibilities = Vec::with_capacity(this.agg_calls.len());
345 for agg_call in &this.agg_calls {
346 let agg_call_filter_res = agg_call_filter_res(agg_call, &chunk).await?;
347 call_visibilities.push(agg_call_filter_res);
348 }
349
350 for ((call, storage), visibility) in (this.agg_calls.iter())
353 .zip_eq_fast(&mut this.storages)
354 .zip_eq_fast(call_visibilities.iter())
355 {
356 if let AggStateStorage::MaterializedInput { table, mapping, .. } = storage
357 && !call.distinct
358 {
359 let chunk = chunk.project_with_vis(mapping.upstream_columns(), visibility.clone());
360 table.write_chunk(chunk);
361 }
362 }
363
364 for (key, visibility) in group_visibilities {
366 let agg_group: &mut BoxedAggGroup<_> = &mut vars.dirty_groups.get_mut(&key).unwrap();
367
368 let visibilities = call_visibilities
369 .iter()
370 .map(|call_vis| call_vis & &visibility)
371 .collect();
372 let visibilities = vars
373 .distinct_dedup
374 .dedup_chunk(
375 chunk.ops(),
376 chunk.columns(),
377 visibilities,
378 &mut this.distinct_dedup_tables,
379 agg_group.group_key(),
380 )
381 .await?;
382 for ((call, storage), visibility) in (this.agg_calls.iter())
383 .zip_eq_fast(&mut this.storages)
384 .zip_eq_fast(visibilities.iter())
385 {
386 if let AggStateStorage::MaterializedInput { table, mapping, .. } = storage
387 && call.distinct
388 {
389 let chunk =
390 chunk.project_with_vis(mapping.upstream_columns(), visibility.clone());
391 table.write_chunk(chunk);
392 }
393 }
394 agg_group
395 .apply_chunk(&chunk, &this.agg_calls, &this.agg_funcs, visibilities)
396 .await?;
397 }
398
399 vars.metrics
401 .agg_dirty_groups_count
402 .set(vars.dirty_groups.len() as i64);
403 vars.metrics
404 .agg_dirty_groups_heap_size
405 .set(vars.dirty_groups.estimated_heap_size() as i64);
406
407 Ok(())
408 }
409
410 #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
411 async fn flush_data<'a>(this: &'a mut ExecutorInner<K, S>, vars: &'a mut ExecutionVars<K, S>) {
412 let window_watermark = vars.window_watermark.take();
413
414 for mut agg_group in vars.dirty_groups.values_mut() {
416 let Some(inter_states_change) = agg_group.build_states_change(&this.agg_funcs)? else {
417 continue;
418 };
419
420 if this.emit_on_window_close {
421 vars.buffer
422 .apply_change(inter_states_change, &mut this.intermediate_state_table);
423 } else {
424 this.intermediate_state_table
425 .write_record(inter_states_change);
426 }
427 }
428
429 if this.emit_on_window_close {
430 if let Some(watermark) = window_watermark.as_ref() {
432 #[for_await]
433 for row in vars
434 .buffer
435 .consume(watermark.clone(), &mut this.intermediate_state_table)
436 {
437 let row = row?;
438 let group_key = row
439 .clone()
440 .into_iter()
441 .take(this.group_key_indices.len())
442 .collect();
443 let inter_states = row.into_iter().skip(this.group_key_indices.len()).collect();
444
445 let mut agg_group = AggGroup::<S>::for_eowc_output(
446 this.version,
447 Some(GroupKey::new(
448 group_key,
449 Some(this.group_key_table_pk_projection.clone()),
450 )),
451 &this.agg_calls,
452 &this.agg_funcs,
453 &this.storages,
454 &inter_states,
455 &this.input_pk_indices,
456 this.row_count_index,
457 this.emit_on_window_close,
458 this.extreme_cache_size,
459 &this.input_schema,
460 )?;
461
462 let (change, stats) = agg_group
463 .build_outputs_change(&this.storages, &this.agg_funcs)
464 .await?;
465 vars.stats.merge_state_cache_stats(stats);
466
467 if let Some(change) = change {
468 if let Some(chunk) = vars.chunk_builder.append_record(change) {
469 yield chunk;
470 }
471 }
472 }
473 }
474 } else {
475 for mut agg_group in vars.dirty_groups.values_mut() {
478 let agg_group = agg_group.as_mut();
479 let (change, stats) = agg_group
480 .build_outputs_change(&this.storages, &this.agg_funcs)
481 .await?;
482 vars.stats.merge_state_cache_stats(stats);
483
484 if let Some(change) = change {
485 if let Some(chunk) = vars.chunk_builder.append_record(change) {
486 yield chunk;
487 }
488 }
489 }
490 }
491
492 for (key, agg_group) in vars.dirty_groups.drain() {
494 vars.agg_group_cache.put(key, Some(agg_group));
495 }
496
497 if let Some(chunk) = vars.chunk_builder.take() {
499 yield chunk;
500 }
501
502 if let Some(watermark) = window_watermark {
503 this.all_state_tables_mut()
505 .for_each(|table| table.update_watermark(watermark.clone()));
506 }
507
508 vars.distinct_dedup.flush(&mut this.distinct_dedup_tables)?;
510
511 vars.agg_group_cache.evict();
513 }
514
515 fn flush_metrics(_this: &ExecutorInner<K, S>, vars: &mut ExecutionVars<K, S>) {
516 vars.metrics
517 .agg_lookup_miss_count
518 .inc_by(std::mem::take(&mut vars.stats.lookup_miss_count));
519 vars.metrics
520 .agg_total_lookup_count
521 .inc_by(std::mem::take(&mut vars.stats.total_lookup_count));
522 vars.metrics
523 .agg_cached_entry_count
524 .set(vars.agg_group_cache.len() as i64);
525 vars.metrics
526 .agg_chunk_lookup_miss_count
527 .inc_by(std::mem::take(&mut vars.stats.chunk_lookup_miss_count));
528 vars.metrics
529 .agg_chunk_total_lookup_count
530 .inc_by(std::mem::take(&mut vars.stats.chunk_total_lookup_count));
531 vars.metrics
532 .agg_state_cache_lookup_count
533 .inc_by(std::mem::take(&mut vars.stats.agg_state_cache_lookup_count));
534 vars.metrics
535 .agg_state_cache_miss_count
536 .inc_by(std::mem::take(&mut vars.stats.agg_state_cache_miss_count));
537 }
538
539 async fn commit_state_tables(
540 this: &mut ExecutorInner<K, S>,
541 epoch: EpochPair,
542 ) -> StreamExecutorResult<Vec<StateTablePostCommit<'_, S>>> {
543 futures::future::try_join_all(
544 this.all_state_tables_mut()
545 .map(|table| async { table.commit(epoch).await }),
546 )
547 .await
548 }
549
550 async fn try_flush_data(this: &mut ExecutorInner<K, S>) -> StreamExecutorResult<()> {
551 futures::future::try_join_all(
552 this.all_state_tables_mut()
553 .map(|table| async { table.try_flush().await }),
554 )
555 .await?;
556 Ok(())
557 }
558
559 #[try_stream(ok = Message, error = StreamExecutorError)]
560 async fn execute_inner(self) {
561 let HashAggExecutor {
562 input,
563 inner: mut this,
564 } = self;
565
566 let actor_id = this.actor_ctx.id;
567
568 let window_col_idx_in_group_key = this.intermediate_state_table.pk_indices()[0];
569 let window_col_idx = this.group_key_indices[window_col_idx_in_group_key];
570
571 let agg_group_cache_metrics_info = MetricsInfo::new(
572 this.actor_ctx.streaming_metrics.clone(),
573 this.intermediate_state_table.table_id(),
574 this.actor_ctx.id,
575 "agg intermediate state table",
576 );
577 let metrics = this.actor_ctx.streaming_metrics.new_hash_agg_metrics(
578 this.intermediate_state_table.table_id(),
579 this.actor_ctx.id,
580 this.actor_ctx.fragment_id,
581 );
582
583 let mut vars = ExecutionVars {
584 metrics,
585 stats: ExecutionStats::default(),
586 agg_group_cache: ManagedLruCache::unbounded_with_hasher(
587 this.watermark_sequence.clone(),
588 agg_group_cache_metrics_info,
589 PrecomputedBuildHasher,
590 ),
591 dirty_groups: Default::default(),
592 distinct_dedup: DistinctDeduplicater::new(
593 &this.agg_calls,
594 this.watermark_sequence.clone(),
595 &this.distinct_dedup_tables,
596 &this.actor_ctx,
597 ),
598 buffered_watermarks: vec![None; this.group_key_indices.len()],
599 window_watermark: None,
600 chunk_builder: StreamChunkBuilder::new(this.chunk_size, this.info.schema.data_types()),
601 buffer: SortBuffer::new(window_col_idx_in_group_key, &this.intermediate_state_table),
602 };
603
604 let group_key_invert_idx = {
606 let mut group_key_invert_idx = vec![None; input.info().schema.len()];
607 for (group_key_seq, group_key_idx) in this.group_key_indices.iter().enumerate() {
608 group_key_invert_idx[*group_key_idx] = Some(group_key_seq);
609 }
610 group_key_invert_idx
611 };
612
613 let mut input = input.execute();
615 let barrier = expect_first_barrier(&mut input).await?;
616 let first_epoch = barrier.epoch;
617 yield Message::Barrier(barrier);
618 for table in this.all_state_tables_mut() {
619 table.init_epoch(first_epoch).await?;
620 }
621
622 #[for_await]
623 for msg in input {
624 let msg = msg?;
625 vars.agg_group_cache.evict();
626 match msg {
627 Message::Watermark(watermark) => {
628 let group_key_seq = group_key_invert_idx[watermark.col_idx];
629 if let Some(group_key_seq) = group_key_seq {
630 if watermark.col_idx == window_col_idx {
631 vars.window_watermark = Some(watermark.val.clone());
632 }
633 vars.buffered_watermarks[group_key_seq] =
634 Some(watermark.with_idx(group_key_seq));
635 }
636 }
637 Message::Chunk(chunk) => {
638 Self::apply_chunk(&mut this, &mut vars, chunk).await?;
639
640 if vars.dirty_groups.estimated_heap_size() >= this.max_dirty_groups_heap_size {
641 #[for_await]
643 for chunk in Self::flush_data(&mut this, &mut vars) {
644 yield Message::Chunk(chunk?);
645 }
646 }
647
648 Self::try_flush_data(&mut this).await?;
649 }
650 Message::Barrier(barrier) => {
651 #[for_await]
652 for chunk in Self::flush_data(&mut this, &mut vars) {
653 yield Message::Chunk(chunk?);
654 }
655 Self::flush_metrics(&this, &mut vars);
656 let emit_on_window_close = this.emit_on_window_close;
657 let post_commits = Self::commit_state_tables(&mut this, barrier.epoch).await?;
658
659 if emit_on_window_close {
660 if let Some(watermark) =
662 vars.buffered_watermarks[window_col_idx_in_group_key].take()
663 {
664 yield Message::Watermark(watermark);
665 }
666 } else {
667 for buffered_watermark in &mut vars.buffered_watermarks {
668 if let Some(watermark) = buffered_watermark.take() {
669 yield Message::Watermark(watermark);
670 }
671 }
672 }
673
674 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
675 yield Message::Barrier(barrier);
676
677 if let Some(cache_may_stale) =
679 try_join_all(post_commits.into_iter().map(|post_commit| {
680 post_commit.post_yield_barrier(update_vnode_bitmap.clone())
681 }))
682 .await?
683 .pop()
684 .expect("should have at least one table")
685 .map(|(_, cache_may_stale)| cache_may_stale)
686 {
687 if cache_may_stale {
689 vars.agg_group_cache.clear();
690 vars.distinct_dedup.dedup_caches_mut().for_each(|cache| {
691 cache.clear();
692 });
693 }
694 }
695 }
696 }
697 }
698 }
699}