risingwave_stream/executor/top_n/group_top_n_appendonly.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use risingwave_common::array::Op;
use risingwave_common::hash::HashKey;
use risingwave_common::row::{RowDeserializer, RowExt};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::sort_util::ColumnOrder;

use super::group_top_n::GroupTopNCache;
use super::top_n_cache::AppendOnlyTopNCacheTrait;
use super::utils::*;
use super::{ManagedTopNState, TopNCache};
use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTablePostCommit;
use crate::executor::monitor::GroupTopNMetrics;
use crate::executor::prelude::*;

/// If the input is append-only, `AppendOnlyGroupTopNExecutor` does not need to
/// keep all the rows it has seen: once a record is no longer in the per-group
/// result set, it can be deleted from the state table.
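///
/// A minimal construction sketch (illustrative only; it assumes an already
/// prepared `input`, `ctx`, `schema`, `state_table`, and so on, uses
/// `SerializedKey` as one possible `HashKey` implementation, and picks
/// arbitrary offset/limit values):
///
/// ```ignore
/// let top_n = AppendOnlyGroupTopNExecutor::<SerializedKey, _, false>::new(
///     input,
///     ctx,
///     schema,
///     storage_key,     // group-by columns first, then order-by columns, then pk
///     (0, 3),          // (OFFSET, LIMIT): keep the top 3 rows per group
///     order_by,
///     vec![0],         // group by the first column
///     state_table,
///     watermark_epoch,
/// )?;
/// ```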
pub type AppendOnlyGroupTopNExecutor<K, S, const WITH_TIES: bool> =
    TopNExecutorWrapper<InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>>;

impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
    AppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
{
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        input: Executor,
        ctx: ActorContextRef,
        schema: Schema,
        storage_key: Vec<ColumnOrder>,
        offset_and_limit: (usize, usize),
        order_by: Vec<ColumnOrder>,
        group_by: Vec<usize>,
        state_table: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
    ) -> StreamResult<Self> {
        let inner = InnerAppendOnlyGroupTopNExecutor::new(
            schema,
            storage_key,
            offset_and_limit,
            order_by,
            group_by,
            state_table,
            watermark_epoch,
            &ctx,
        )?;
        Ok(TopNExecutorWrapper { input, ctx, inner })
    }
}

pub struct InnerAppendOnlyGroupTopNExecutor<K: HashKey, S: StateStore, const WITH_TIES: bool> {
    schema: Schema,

    /// `LIMIT XXX`.
    limit: usize,

    /// `OFFSET XXX`. `0` means no offset.
    offset: usize,

    /// The storage key indices of the `AppendOnlyGroupTopNExecutor`.
    storage_key_indices: PkIndices,

    managed_state: ManagedTopNState<S>,

    /// The columns used to group the data.
    group_by: Vec<usize>,

    /// group key -> cache for this group
    caches: GroupTopNCache<K, WITH_TIES>,

    /// Used for serializing pk into `CacheKey`.
    cache_key_serde: CacheKeySerde,

    metrics: GroupTopNMetrics,
}

impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
    InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
{
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        schema: Schema,
        storage_key: Vec<ColumnOrder>,
        offset_and_limit: (usize, usize),
        order_by: Vec<ColumnOrder>,
        group_by: Vec<usize>,
        state_table: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        ctx: &ActorContext,
    ) -> StreamResult<Self> {
        let metrics_info = MetricsInfo::new(
            ctx.streaming_metrics.clone(),
            state_table.table_id(),
            ctx.id,
            "AppendOnlyGroupTopN",
        );
        let metrics = ctx.streaming_metrics.new_append_only_group_top_n_metrics(
            state_table.table_id(),
            ctx.id,
            ctx.fragment_id,
        );

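        // The cache key serializes, per group, the order-by columns followed by
        // the remaining pk columns, so entries sort in Top-N order (see
        // `create_cache_key_serde` in `utils` for the exact layout).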
        let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &group_by);
        let managed_state = ManagedTopNState::<S>::new(state_table, cache_key_serde.clone());

        Ok(Self {
            schema,
            offset: offset_and_limit.0,
            limit: offset_and_limit.1,
            managed_state,
            storage_key_indices: storage_key.into_iter().map(|op| op.column_index).collect(),
            group_by,
            caches: GroupTopNCache::new(watermark_epoch, metrics_info),
            cache_key_serde,
            metrics,
        })
    }
}

impl<K: HashKey, S: StateStore, const WITH_TIES: bool> TopNExecutorBase
    for InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
where
    TopNCache<WITH_TIES>: AppendOnlyTopNCacheTrait,
{
    type State = S;

    async fn apply_chunk(
        &mut self,
        chunk: StreamChunk,
    ) -> StreamExecutorResult<Option<StreamChunk>> {
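        // Build hash keys over the group-by columns for the whole chunk; each
        // key indexes the per-group cache below.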
        let keys = K::build_many(&self.group_by, chunk.data_chunk());
        let mut stagings = HashMap::new(); // K -> `TopNStaging`

        let data_types = self.schema.data_types();
        let deserializer = RowDeserializer::new(data_types.clone());
        for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
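            // `rows_with_holes` yields `None` for rows hidden by the chunk's
            // visibility bitmap; skip them.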
            let Some((op, row_ref)) = r else {
                continue;
            };

            // The storage key columns after the group-by prefix; they identify
            // and order the row within its group.
            let pk_row = row_ref.project(&self.storage_key_indices[self.group_by.len()..]);
            let cache_key = serialize_pk_to_cache_key(pk_row, &self.cache_key_serde);

            let group_key = row_ref.project(&self.group_by);
            self.metrics.group_top_n_total_query_cache_count.inc();
            // If `self.caches` does not already have a cache for the current group,
            // initialize one from the state table and insert it into `self.caches`,
            // so the lookup below cannot fail.
            if !self.caches.contains(group_cache_key) {
                self.metrics.group_top_n_cache_miss_count.inc();
                let mut topn_cache = TopNCache::new(self.offset, self.limit, data_types.clone());
                self.managed_state
                    .init_topn_cache(Some(group_key), &mut topn_cache)
                    .await?;
                self.caches.push(group_cache_key.clone(), topn_cache);
            }

            let mut cache = self.caches.get_mut(group_cache_key).unwrap();
            let staging = stagings.entry(group_cache_key.clone()).or_default();

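            // The input stream is append-only, so every visible row must be an
            // `Insert`.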
            debug_assert_eq!(op, Op::Insert);
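            // A row below the current Top-N boundary is discarded; a row that
            // enters the Top-N may evict the old boundary row, staging a
            // `Delete` for it and removing it from the state table.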
            cache.insert(
                cache_key,
                row_ref,
                staging,
                &mut self.managed_state,
                &deserializer,
            )?;
        }

        self.metrics
            .group_top_n_cached_entry_count
            .set(self.caches.len() as i64);

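        // Assemble one output chunk from all per-group staged changes.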
        let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(chunk.capacity()));
        for staging in stagings.into_values() {
            for res in staging.into_deserialized_changes(&deserializer) {
                let (op, row) = res?;
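                // The builder is unlimited, so `append_row` never yields a
                // finished chunk here.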
                let _none = chunk_builder.append_row(op, row);
            }
        }

        Ok(chunk_builder.take())
    }

    async fn flush_data(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S>> {
        self.managed_state.flush(epoch).await
    }

    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
        self.managed_state.try_flush().await
    }

    fn clear_cache(&mut self) {
        self.caches.clear();
    }

    fn evict(&mut self) {
        self.caches.evict()
    }

    async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.managed_state.init_epoch(epoch).await
    }

    async fn handle_watermark(&mut self, watermark: Watermark) -> Option<Watermark> {
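        // Only a watermark on the first group-by column can be used to clean
        // up expired per-group state; watermarks on other columns are
        // swallowed and not propagated.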
        if watermark.col_idx == self.group_by[0] {
            self.managed_state.update_watermark(watermark.val.clone());
            Some(watermark)
        } else {
            None
        }
    }
}