risingwave_stream/executor/aggregate/
hash_agg.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::marker::PhantomData;
17
18use futures::future::try_join_all;
19use futures::stream;
20use itertools::Itertools;
21use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
22use risingwave_common::hash::{HashKey, PrecomputedBuildHasher};
23use risingwave_common::util::epoch::EpochPair;
24use risingwave_common::util::iter_util::ZipEqFast;
25use risingwave_common_estimate_size::EstimateSize;
26use risingwave_common_estimate_size::collections::EstimatedHashMap;
27use risingwave_expr::aggregate::{AggCall, BoxedAggregateFunction, build_retractable};
28use risingwave_pb::stream_plan::PbAggNodeVersion;
29
30use super::agg_group::{
31    AggGroup as GenericAggGroup, AggStateCacheStats, GroupKey, OnlyOutputIfHasInput,
32};
33use super::agg_state::AggStateStorage;
34use super::distinct::DistinctDeduplicater;
35use super::{AggExecutorArgs, HashAggExecutorExtraArgs, agg_call_filter_res, iter_table_storage};
36use crate::cache::ManagedLruCache;
37use crate::common::metrics::MetricsInfo;
38use crate::common::table::state_table::StateTablePostCommit;
39use crate::executor::eowc::SortBuffer;
40use crate::executor::monitor::HashAggMetrics;
41use crate::executor::prelude::*;
42
43type AggGroup<S> = GenericAggGroup<S, OnlyOutputIfHasInput>;
44type BoxedAggGroup<S> = Box<AggGroup<S>>;
45
46impl<S: StateStore> EstimateSize for BoxedAggGroup<S> {
47    fn estimated_heap_size(&self) -> usize {
48        self.as_ref().estimated_size()
49    }
50}
51
52type AggGroupCache<K, S> = ManagedLruCache<K, Option<BoxedAggGroup<S>>, PrecomputedBuildHasher>;
53
54/// [`HashAggExecutor`] could process large amounts of data using a state backend. It works as
55/// follows:
56///
57/// * The executor pulls data from the upstream, and apply the data chunks to the corresponding
58///   aggregation states.
59/// * While processing, it will record which keys have been modified in this epoch using
60///   `group_change_set`.
61/// * Upon a barrier is received, the executor will call `.flush` on the storage backend, so that
62///   all modifications will be flushed to the storage backend. Meanwhile, the executor will go
63///   through `group_change_set`, and produce a stream chunk based on the state changes.
64pub struct HashAggExecutor<K: HashKey, S: StateStore> {
65    input: Executor,
66    inner: ExecutorInner<K, S>,
67}
68
69struct ExecutorInner<K: HashKey, S: StateStore> {
70    _phantom: PhantomData<K>,
71
72    /// Version of aggregation executors.
73    version: PbAggNodeVersion,
74
75    actor_ctx: ActorContextRef,
76    info: ExecutorInfo,
77
78    /// Pk indices from input. Only used by `AggNodeVersion` before `ISSUE_13465`.
79    input_pk_indices: Vec<usize>,
80
81    /// Schema from input.
82    input_schema: Schema,
83
84    /// Indices of the columns
85    /// all of the aggregation functions in this executor should depend on same group of keys
86    group_key_indices: Vec<usize>,
87
88    // The projection from group key in table schema to table pk.
89    group_key_table_pk_projection: Arc<[usize]>,
90
91    /// A [`HashAggExecutor`] may have multiple [`AggCall`]s.
92    agg_calls: Vec<AggCall>,
93
94    /// Aggregate functions.
95    agg_funcs: Vec<BoxedAggregateFunction>,
96
97    /// Index of row count agg call (`count(*)`) in the call list.
98    row_count_index: usize,
99
100    /// State storages for each aggregation calls.
101    /// `None` means the agg call need not to maintain a state table by itself.
102    storages: Vec<AggStateStorage<S>>,
103
104    /// Intermediate state table for value-state agg calls.
105    /// The state of all value-state aggregates are collected and stored in this
106    /// table when `flush_data` is called.
107    /// Also serves as EOWC sort buffer table.
108    intermediate_state_table: StateTable<S>,
109
110    /// State tables for deduplicating rows on distinct key for distinct agg calls.
111    /// One table per distinct column (may be shared by multiple agg calls).
112    distinct_dedup_tables: HashMap<usize, StateTable<S>>,
113
114    /// Watermark epoch.
115    watermark_sequence: AtomicU64Ref,
116
117    /// State cache size for extreme agg.
118    extreme_cache_size: usize,
119
120    /// The maximum size of the chunk produced by executor at a time.
121    chunk_size: usize,
122
123    /// The maximum heap size of dirty groups. If exceeds, the executor should flush dirty groups.
124    max_dirty_groups_heap_size: usize,
125
126    /// Should emit on window close according to watermark?
127    emit_on_window_close: bool,
128}
129
130impl<K: HashKey, S: StateStore> ExecutorInner<K, S> {
131    fn all_state_tables_mut(&mut self) -> impl Iterator<Item = &mut StateTable<S>> {
132        iter_table_storage(&mut self.storages)
133            .chain(self.distinct_dedup_tables.values_mut())
134            .chain(std::iter::once(&mut self.intermediate_state_table))
135    }
136}
137
138struct ExecutionVars<K: HashKey, S: StateStore> {
139    metrics: HashAggMetrics,
140
141    // Stats collected during execution, will be flushed to metrics at the end of each barrier.
142    stats: ExecutionStats,
143
144    /// Cache for [`AggGroup`]s. `HashKey` -> `AggGroup`.
145    agg_group_cache: AggGroupCache<K, S>,
146
147    /// Changed [`AggGroup`]s in the current epoch (before next flush).
148    dirty_groups: EstimatedHashMap<K, BoxedAggGroup<S>>,
149
150    /// Distinct deduplicater to deduplicate input rows for each distinct agg call.
151    distinct_dedup: DistinctDeduplicater<S>,
152
153    /// Buffer watermarks on group keys received since last barrier.
154    buffered_watermarks: Vec<Option<Watermark>>,
155
156    /// Latest watermark on window column.
157    window_watermark: Option<ScalarImpl>,
158
159    /// Stream chunk builder.
160    chunk_builder: StreamChunkBuilder,
161
162    buffer: SortBuffer<S>,
163}
164
/// Counters accumulated while executing; they are drained into the metrics on each barrier.
#[derive(Debug, Default)]
struct ExecutionStats {
    /// Number of agg group cache (LRU) lookups that missed.
    lookup_miss_count: u64,
    /// Total number of agg group cache (LRU) lookups.
    total_lookup_count: u64,

    /// Number of chunks for which at least one group was missing from the agg group cache.
    chunk_lookup_miss_count: u64,
    /// Total number of chunk-granularity lookups of the agg group cache.
    chunk_total_lookup_count: u64,

    /// Number of agg state cache lookups.
    agg_state_cache_lookup_count: u64,
    /// Number of agg state cache lookups that missed.
    agg_state_cache_miss_count: u64,
}
179
180impl ExecutionStats {
181    fn merge_state_cache_stats(&mut self, other: AggStateCacheStats) {
182        self.agg_state_cache_lookup_count += other.agg_state_cache_lookup_count;
183        self.agg_state_cache_miss_count += other.agg_state_cache_miss_count;
184    }
185}
186
187impl<K: HashKey, S: StateStore> Execute for HashAggExecutor<K, S> {
188    fn execute(self: Box<Self>) -> BoxedMessageStream {
189        self.execute_inner().boxed()
190    }
191}
192
193impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
194    pub fn new(args: AggExecutorArgs<S, HashAggExecutorExtraArgs>) -> StreamResult<Self> {
195        let input_info = args.input.info().clone();
196
197        let group_key_len = args.extra.group_key_indices.len();
198        // NOTE: we assume the prefix of table pk is exactly the group key
199        let group_key_table_pk_projection =
200            &args.intermediate_state_table.pk_indices()[..group_key_len];
201        assert!(
202            group_key_table_pk_projection
203                .iter()
204                .sorted()
205                .copied()
206                .eq(0..group_key_len)
207        );
208
209        Ok(Self {
210            input: args.input,
211            inner: ExecutorInner {
212                _phantom: PhantomData,
213                version: args.version,
214                actor_ctx: args.actor_ctx,
215                info: args.info,
216                input_pk_indices: input_info.pk_indices,
217                input_schema: input_info.schema,
218                group_key_indices: args.extra.group_key_indices,
219                group_key_table_pk_projection: group_key_table_pk_projection.to_vec().into(),
220                agg_funcs: args.agg_calls.iter().map(build_retractable).try_collect()?,
221                agg_calls: args.agg_calls,
222                row_count_index: args.row_count_index,
223                storages: args.storages,
224                intermediate_state_table: args.intermediate_state_table,
225                distinct_dedup_tables: args.distinct_dedup_tables,
226                watermark_sequence: args.watermark_epoch,
227                extreme_cache_size: args.extreme_cache_size,
228                chunk_size: args.extra.chunk_size,
229                max_dirty_groups_heap_size: args.extra.max_dirty_groups_heap_size,
230                emit_on_window_close: args.extra.emit_on_window_close,
231            },
232        })
233    }
234
235    /// Get visibilities that mask rows in the chunk for each group. The returned visibility
236    /// is a `Bitmap` rather than `Option<Bitmap>` because it's likely to have multiple groups
237    /// in one chunk.
238    ///
239    /// * `keys`: Hash Keys of rows.
240    /// * `base_visibility`: Visibility of rows.
241    fn get_group_visibilities(keys: Vec<K>, base_visibility: &Bitmap) -> Vec<(K, Bitmap)> {
242        let n_rows = keys.len();
243        let mut vis_builders = HashMap::new();
244        for (row_idx, key) in keys
245            .into_iter()
246            .enumerate()
247            .filter(|(row_idx, _)| base_visibility.is_set(*row_idx))
248        {
249            vis_builders
250                .entry(key)
251                .or_insert_with(|| BitmapBuilder::zeroed(n_rows))
252                .set(row_idx, true);
253        }
254        vis_builders
255            .into_iter()
256            .map(|(key, vis_builder)| (key, vis_builder.finish()))
257            .collect()
258    }
259
260    /// Touch the [`AggGroup`]s for the given keys, which means move them from cache to the `dirty_groups` map.
261    /// If the [`AggGroup`] doesn't exist in the cache before, it will be created or recovered from state table.
262    async fn touch_agg_groups(
263        this: &ExecutorInner<K, S>,
264        vars: &mut ExecutionVars<K, S>,
265        keys: impl IntoIterator<Item = &K>,
266    ) -> StreamExecutorResult<()> {
267        let group_key_types = &this.info.schema.data_types()[..this.group_key_indices.len()];
268        let futs = keys
269            .into_iter()
270            .filter_map(|key| {
271                vars.stats.total_lookup_count += 1;
272                if vars.dirty_groups.contains_key(key) {
273                    // already dirty
274                    return None;
275                }
276                match vars.agg_group_cache.get_mut(key) {
277                    Some(mut agg_group) => {
278                        let agg_group: &mut Option<_> = &mut agg_group;
279                        assert!(
280                            agg_group.is_some(),
281                            "invalid state: AggGroup is None in cache but not dirty"
282                        );
283                        // move from cache to `dirty_groups`
284                        vars.dirty_groups
285                            .insert(key.clone(), agg_group.take().unwrap());
286                        None // no need to create
287                    }
288                    None => {
289                        vars.stats.lookup_miss_count += 1;
290                        Some(async {
291                            // Create `AggGroup` for the current group if not exists. This will
292                            // restore agg states from the intermediate state table.
293                            let agg_group = AggGroup::create(
294                                this.version,
295                                Some(GroupKey::new(
296                                    key.deserialize(group_key_types)?,
297                                    Some(this.group_key_table_pk_projection.clone()),
298                                )),
299                                &this.agg_calls,
300                                &this.agg_funcs,
301                                &this.storages,
302                                &this.intermediate_state_table,
303                                &this.input_pk_indices,
304                                this.row_count_index,
305                                this.emit_on_window_close,
306                                this.extreme_cache_size,
307                                &this.input_schema,
308                            )
309                            .await?;
310                            Ok::<_, StreamExecutorError>((key.clone(), Box::new(agg_group)))
311                        })
312                    }
313                }
314            })
315            .collect_vec(); // collect is necessary to avoid lifetime issue of `agg_group_cache`
316
317        vars.stats.chunk_total_lookup_count += 1;
318        if !futs.is_empty() {
319            // If not all the required states/keys are in the cache, it's a chunk-level cache miss.
320            vars.stats.chunk_lookup_miss_count += 1;
321            let mut buffered = stream::iter(futs).buffer_unordered(10).fuse();
322            while let Some(result) = buffered.next().await {
323                let (key, agg_group) = result?;
324                let none = vars.dirty_groups.insert(key, agg_group);
325                debug_assert!(none.is_none());
326            }
327        }
328        Ok(())
329    }
330
331    async fn apply_chunk(
332        this: &mut ExecutorInner<K, S>,
333        vars: &mut ExecutionVars<K, S>,
334        chunk: StreamChunk,
335    ) -> StreamExecutorResult<()> {
336        // Find groups in this chunk and generate visibility for each group key.
337        let keys = K::build_many(&this.group_key_indices, chunk.data_chunk());
338        let group_visibilities = Self::get_group_visibilities(keys, chunk.visibility());
339
340        // Ensure all `AggGroup`s are in `dirty_groups`.
341        Self::touch_agg_groups(this, vars, group_visibilities.iter().map(|(k, _)| k)).await?;
342
343        // Calculate the row visibility for every agg call.
344        let mut call_visibilities = Vec::with_capacity(this.agg_calls.len());
345        for agg_call in &this.agg_calls {
346            let agg_call_filter_res = agg_call_filter_res(agg_call, &chunk).await?;
347            call_visibilities.push(agg_call_filter_res);
348        }
349
350        // Materialize input chunk if needed and possible.
351        // For aggregations without distinct, we can materialize before grouping.
352        for ((call, storage), visibility) in (this.agg_calls.iter())
353            .zip_eq_fast(&mut this.storages)
354            .zip_eq_fast(call_visibilities.iter())
355        {
356            if let AggStateStorage::MaterializedInput { table, mapping, .. } = storage
357                && !call.distinct
358            {
359                let chunk = chunk.project_with_vis(mapping.upstream_columns(), visibility.clone());
360                table.write_chunk(chunk);
361            }
362        }
363
364        // Apply chunk to each of the state (per agg_call), for each group.
365        for (key, visibility) in group_visibilities {
366            let agg_group: &mut BoxedAggGroup<_> = &mut vars.dirty_groups.get_mut(&key).unwrap();
367
368            let visibilities = call_visibilities
369                .iter()
370                .map(|call_vis| call_vis & &visibility)
371                .collect();
372            let visibilities = vars
373                .distinct_dedup
374                .dedup_chunk(
375                    chunk.ops(),
376                    chunk.columns(),
377                    visibilities,
378                    &mut this.distinct_dedup_tables,
379                    agg_group.group_key(),
380                )
381                .await?;
382            for ((call, storage), visibility) in (this.agg_calls.iter())
383                .zip_eq_fast(&mut this.storages)
384                .zip_eq_fast(visibilities.iter())
385            {
386                if let AggStateStorage::MaterializedInput { table, mapping, .. } = storage
387                    && call.distinct
388                {
389                    let chunk =
390                        chunk.project_with_vis(mapping.upstream_columns(), visibility.clone());
391                    table.write_chunk(chunk);
392                }
393            }
394            agg_group
395                .apply_chunk(&chunk, &this.agg_calls, &this.agg_funcs, visibilities)
396                .await?;
397        }
398
399        // Update the metrics.
400        vars.metrics
401            .agg_dirty_groups_count
402            .set(vars.dirty_groups.len() as i64);
403        vars.metrics
404            .agg_dirty_groups_heap_size
405            .set(vars.dirty_groups.estimated_heap_size() as i64);
406
407        Ok(())
408    }
409
410    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
411    async fn flush_data<'a>(this: &'a mut ExecutorInner<K, S>, vars: &'a mut ExecutionVars<K, S>) {
412        let window_watermark = vars.window_watermark.take();
413
414        // flush changed states into intermediate state table
415        for mut agg_group in vars.dirty_groups.values_mut() {
416            let Some(inter_states_change) = agg_group.build_states_change(&this.agg_funcs)? else {
417                continue;
418            };
419
420            if this.emit_on_window_close {
421                vars.buffer
422                    .apply_change(inter_states_change, &mut this.intermediate_state_table);
423            } else {
424                this.intermediate_state_table
425                    .write_record(inter_states_change);
426            }
427        }
428
429        if this.emit_on_window_close {
430            // remove all groups under watermark and emit their results
431            if let Some(watermark) = window_watermark.as_ref() {
432                #[for_await]
433                for row in vars
434                    .buffer
435                    .consume(watermark.clone(), &mut this.intermediate_state_table)
436                {
437                    let row = row?;
438                    let group_key = row
439                        .clone()
440                        .into_iter()
441                        .take(this.group_key_indices.len())
442                        .collect();
443                    let inter_states = row.into_iter().skip(this.group_key_indices.len()).collect();
444
445                    let mut agg_group = AggGroup::<S>::for_eowc_output(
446                        this.version,
447                        Some(GroupKey::new(
448                            group_key,
449                            Some(this.group_key_table_pk_projection.clone()),
450                        )),
451                        &this.agg_calls,
452                        &this.agg_funcs,
453                        &this.storages,
454                        &inter_states,
455                        &this.input_pk_indices,
456                        this.row_count_index,
457                        this.emit_on_window_close,
458                        this.extreme_cache_size,
459                        &this.input_schema,
460                    )?;
461
462                    let (change, stats) = agg_group
463                        .build_outputs_change(&this.storages, &this.agg_funcs)
464                        .await?;
465                    vars.stats.merge_state_cache_stats(stats);
466
467                    if let Some(change) = change {
468                        if let Some(chunk) = vars.chunk_builder.append_record(change) {
469                            yield chunk;
470                        }
471                    }
472                }
473            }
474        } else {
475            // emit on update
476            // TODO(wrj,rc): we may need to parallelize it and set a reasonable concurrency limit.
477            for mut agg_group in vars.dirty_groups.values_mut() {
478                let agg_group = agg_group.as_mut();
479                let (change, stats) = agg_group
480                    .build_outputs_change(&this.storages, &this.agg_funcs)
481                    .await?;
482                vars.stats.merge_state_cache_stats(stats);
483
484                if let Some(change) = change {
485                    if let Some(chunk) = vars.chunk_builder.append_record(change) {
486                        yield chunk;
487                    }
488                }
489            }
490        }
491
492        // move dirty groups back to cache
493        for (key, agg_group) in vars.dirty_groups.drain() {
494            vars.agg_group_cache.put(key, Some(agg_group));
495        }
496
497        // Yield the remaining rows in chunk builder.
498        if let Some(chunk) = vars.chunk_builder.take() {
499            yield chunk;
500        }
501
502        if let Some(watermark) = window_watermark {
503            // Update watermark of state tables, for state cleaning.
504            this.all_state_tables_mut()
505                .for_each(|table| table.update_watermark(watermark.clone()));
506        }
507
508        // Flush distinct dedup state.
509        vars.distinct_dedup.flush(&mut this.distinct_dedup_tables)?;
510
511        // Evict cache to target capacity.
512        vars.agg_group_cache.evict();
513    }
514
515    fn flush_metrics(_this: &ExecutorInner<K, S>, vars: &mut ExecutionVars<K, S>) {
516        vars.metrics
517            .agg_lookup_miss_count
518            .inc_by(std::mem::take(&mut vars.stats.lookup_miss_count));
519        vars.metrics
520            .agg_total_lookup_count
521            .inc_by(std::mem::take(&mut vars.stats.total_lookup_count));
522        vars.metrics
523            .agg_cached_entry_count
524            .set(vars.agg_group_cache.len() as i64);
525        vars.metrics
526            .agg_chunk_lookup_miss_count
527            .inc_by(std::mem::take(&mut vars.stats.chunk_lookup_miss_count));
528        vars.metrics
529            .agg_chunk_total_lookup_count
530            .inc_by(std::mem::take(&mut vars.stats.chunk_total_lookup_count));
531        vars.metrics
532            .agg_state_cache_lookup_count
533            .inc_by(std::mem::take(&mut vars.stats.agg_state_cache_lookup_count));
534        vars.metrics
535            .agg_state_cache_miss_count
536            .inc_by(std::mem::take(&mut vars.stats.agg_state_cache_miss_count));
537    }
538
539    async fn commit_state_tables(
540        this: &mut ExecutorInner<K, S>,
541        epoch: EpochPair,
542    ) -> StreamExecutorResult<Vec<StateTablePostCommit<'_, S>>> {
543        futures::future::try_join_all(
544            this.all_state_tables_mut()
545                .map(|table| async { table.commit(epoch).await }),
546        )
547        .await
548    }
549
550    async fn try_flush_data(this: &mut ExecutorInner<K, S>) -> StreamExecutorResult<()> {
551        futures::future::try_join_all(
552            this.all_state_tables_mut()
553                .map(|table| async { table.try_flush().await }),
554        )
555        .await?;
556        Ok(())
557    }
558
559    #[try_stream(ok = Message, error = StreamExecutorError)]
560    async fn execute_inner(self) {
561        let HashAggExecutor {
562            input,
563            inner: mut this,
564        } = self;
565
566        let actor_id = this.actor_ctx.id;
567
568        let window_col_idx_in_group_key = this.intermediate_state_table.pk_indices()[0];
569        let window_col_idx = this.group_key_indices[window_col_idx_in_group_key];
570
571        let agg_group_cache_metrics_info = MetricsInfo::new(
572            this.actor_ctx.streaming_metrics.clone(),
573            this.intermediate_state_table.table_id(),
574            this.actor_ctx.id,
575            "agg intermediate state table",
576        );
577        let metrics = this.actor_ctx.streaming_metrics.new_hash_agg_metrics(
578            this.intermediate_state_table.table_id(),
579            this.actor_ctx.id,
580            this.actor_ctx.fragment_id,
581        );
582
583        let mut vars = ExecutionVars {
584            metrics,
585            stats: ExecutionStats::default(),
586            agg_group_cache: ManagedLruCache::unbounded_with_hasher(
587                this.watermark_sequence.clone(),
588                agg_group_cache_metrics_info,
589                PrecomputedBuildHasher,
590            ),
591            dirty_groups: Default::default(),
592            distinct_dedup: DistinctDeduplicater::new(
593                &this.agg_calls,
594                this.watermark_sequence.clone(),
595                &this.distinct_dedup_tables,
596                &this.actor_ctx,
597            ),
598            buffered_watermarks: vec![None; this.group_key_indices.len()],
599            window_watermark: None,
600            chunk_builder: StreamChunkBuilder::new(this.chunk_size, this.info.schema.data_types()),
601            buffer: SortBuffer::new(window_col_idx_in_group_key, &this.intermediate_state_table),
602        };
603
604        // TODO(rc): use something like a `ColumnMapping` type
605        let group_key_invert_idx = {
606            let mut group_key_invert_idx = vec![None; input.info().schema.len()];
607            for (group_key_seq, group_key_idx) in this.group_key_indices.iter().enumerate() {
608                group_key_invert_idx[*group_key_idx] = Some(group_key_seq);
609            }
610            group_key_invert_idx
611        };
612
613        // First barrier
614        let mut input = input.execute();
615        let barrier = expect_first_barrier(&mut input).await?;
616        let first_epoch = barrier.epoch;
617        yield Message::Barrier(barrier);
618        for table in this.all_state_tables_mut() {
619            table.init_epoch(first_epoch).await?;
620        }
621
622        #[for_await]
623        for msg in input {
624            let msg = msg?;
625            vars.agg_group_cache.evict();
626            match msg {
627                Message::Watermark(watermark) => {
628                    let group_key_seq = group_key_invert_idx[watermark.col_idx];
629                    if let Some(group_key_seq) = group_key_seq {
630                        if watermark.col_idx == window_col_idx {
631                            vars.window_watermark = Some(watermark.val.clone());
632                        }
633                        vars.buffered_watermarks[group_key_seq] =
634                            Some(watermark.with_idx(group_key_seq));
635                    }
636                }
637                Message::Chunk(chunk) => {
638                    Self::apply_chunk(&mut this, &mut vars, chunk).await?;
639
640                    if vars.dirty_groups.estimated_heap_size() >= this.max_dirty_groups_heap_size {
641                        // flush dirty groups if heap size is too large, to better prevent from OOM
642                        #[for_await]
643                        for chunk in Self::flush_data(&mut this, &mut vars) {
644                            yield Message::Chunk(chunk?);
645                        }
646                    }
647
648                    Self::try_flush_data(&mut this).await?;
649                }
650                Message::Barrier(barrier) => {
651                    #[for_await]
652                    for chunk in Self::flush_data(&mut this, &mut vars) {
653                        yield Message::Chunk(chunk?);
654                    }
655                    Self::flush_metrics(&this, &mut vars);
656                    let emit_on_window_close = this.emit_on_window_close;
657                    let post_commits = Self::commit_state_tables(&mut this, barrier.epoch).await?;
658
659                    if emit_on_window_close {
660                        // ignore watermarks on other columns
661                        if let Some(watermark) =
662                            vars.buffered_watermarks[window_col_idx_in_group_key].take()
663                        {
664                            yield Message::Watermark(watermark);
665                        }
666                    } else {
667                        for buffered_watermark in &mut vars.buffered_watermarks {
668                            if let Some(watermark) = buffered_watermark.take() {
669                                yield Message::Watermark(watermark);
670                            }
671                        }
672                    }
673
674                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
675                    yield Message::Barrier(barrier);
676
677                    // Update the vnode bitmap for state tables of all agg calls if asked.
678                    if let Some(cache_may_stale) =
679                        try_join_all(post_commits.into_iter().map(|post_commit| {
680                            post_commit.post_yield_barrier(update_vnode_bitmap.clone())
681                        }))
682                        .await?
683                        .pop()
684                        .expect("should have at least one table")
685                        .map(|(_, cache_may_stale)| cache_may_stale)
686                    {
687                        // Manipulate the cache if necessary.
688                        if cache_may_stale {
689                            vars.agg_group_cache.clear();
690                            vars.distinct_dedup.dedup_caches_mut().for_each(|cache| {
691                                cache.clear();
692                            });
693                        }
694                    }
695                }
696            }
697        }
698    }
699}