// risingwave_stream/executor/top_n/group_top_n_appendonly.rs

use std::collections::HashMap;

use risingwave_common::array::Op;
use risingwave_common::hash::HashKey;
use risingwave_common::row::{RowDeserializer, RowExt};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::sort_util::ColumnOrder;

use super::group_top_n::GroupTopNCache;
use super::top_n_cache::AppendOnlyTopNCacheTrait;
use super::utils::*;
use super::{ManagedTopNState, TopNCache};
use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTablePostCommit;
use crate::executor::monitor::GroupTopNMetrics;
use crate::executor::prelude::*;
use crate::executor::top_n::top_n_cache::TopNStaging;

/// Group Top-N executor for an append-only input stream: the generic
/// [`TopNExecutorWrapper`] drives [`InnerAppendOnlyGroupTopNExecutor`], which
/// maintains one Top-N cache per group key backed by a state table.
/// `WITH_TIES` is forwarded to the underlying [`TopNCache`].
pub type AppendOnlyGroupTopNExecutor<K, S, const WITH_TIES: bool> =
    TopNExecutorWrapper<InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>>;
39
40impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
41 AppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
42{
43 #[allow(clippy::too_many_arguments)]
44 pub fn new(
45 input: Executor,
46 ctx: ActorContextRef,
47 schema: Schema,
48 storage_key: Vec<ColumnOrder>,
49 offset_and_limit: (usize, usize),
50 order_by: Vec<ColumnOrder>,
51 group_by: Vec<usize>,
52 state_table: StateTable<S>,
53 watermark_epoch: AtomicU64Ref,
54 ) -> StreamResult<Self> {
55 let inner = InnerAppendOnlyGroupTopNExecutor::new(
56 schema,
57 storage_key,
58 offset_and_limit,
59 order_by,
60 group_by,
61 state_table,
62 watermark_epoch,
63 &ctx,
64 )?;
65 Ok(TopNExecutorWrapper { input, ctx, inner })
66 }
67}
68
/// Inner executor of [`AppendOnlyGroupTopNExecutor`]: holds all state needed
/// to maintain a per-group Top-N over an append-only input stream.
pub struct InnerAppendOnlyGroupTopNExecutor<K: HashKey, S: StateStore, const WITH_TIES: bool> {
    /// Output schema; also supplies the data types used for row (de)serialization.
    schema: Schema,

    /// `LIMIT` of the Top-N clause (second element of `offset_and_limit`).
    limit: usize,

    /// `OFFSET` of the Top-N clause (first element of `offset_and_limit`).
    offset: usize,

    /// Column indices of the storage key. The leading `group_by.len()`
    /// entries are the group columns; the remainder forms the per-group
    /// cache key (see `apply_chunk`).
    storage_key_indices: PkIndices,

    /// State-table wrapper used to persist Top-N rows and to rebuild a
    /// group's cache on cache miss.
    managed_state: ManagedTopNState<S>,

    /// Indices of the group-by columns inside input rows.
    group_by: Vec<usize>,

    /// Per-group `TopNCache`s keyed by the hashed group key; evictable via
    /// `evict()` / clearable via `clear_cache()`.
    caches: GroupTopNCache<K, WITH_TIES>,

    /// Serde used to turn storage-key rows into cache keys; shared with
    /// `managed_state`.
    cache_key_serde: CacheKeySerde,

    /// Minimum capacity for each newly created `TopNCache`
    /// (from `streaming_config.developer.topn_cache_min_capacity`).
    topn_cache_min_capacity: usize,

    /// Query-cache hit/miss and cached-entry-count metrics.
    metrics: GroupTopNMetrics,
}
97
98impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
99 InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
100{
101 #[allow(clippy::too_many_arguments)]
102 pub fn new(
103 schema: Schema,
104 storage_key: Vec<ColumnOrder>,
105 offset_and_limit: (usize, usize),
106 order_by: Vec<ColumnOrder>,
107 group_by: Vec<usize>,
108 state_table: StateTable<S>,
109 watermark_epoch: AtomicU64Ref,
110 ctx: &ActorContext,
111 ) -> StreamResult<Self> {
112 let metrics_info = MetricsInfo::new(
113 ctx.streaming_metrics.clone(),
114 state_table.table_id(),
115 ctx.id,
116 "AppendOnlyGroupTopN",
117 );
118 let metrics = ctx.streaming_metrics.new_append_only_group_top_n_metrics(
119 state_table.table_id(),
120 ctx.id,
121 ctx.fragment_id,
122 );
123
124 let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &group_by);
125 let managed_state = ManagedTopNState::<S>::new(state_table, cache_key_serde.clone());
126
127 Ok(Self {
128 schema,
129 offset: offset_and_limit.0,
130 limit: offset_and_limit.1,
131 managed_state,
132 storage_key_indices: storage_key.into_iter().map(|op| op.column_index).collect(),
133 group_by,
134 caches: GroupTopNCache::new(watermark_epoch, metrics_info),
135 cache_key_serde,
136 topn_cache_min_capacity: ctx.streaming_config.developer.topn_cache_min_capacity,
137 metrics,
138 })
139 }
140}
141
impl<K: HashKey, S: StateStore, const WITH_TIES: bool> TopNExecutorBase
    for InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
where
    TopNCache<WITH_TIES>: AppendOnlyTopNCacheTrait,
{
    type State = S;

    /// Applies one input chunk: for each visible row, loads (or rebuilds from
    /// the state table) the Top-N cache of its group, inserts the row, and
    /// stages the resulting changes per group. All staged changes are then
    /// deserialized into a single output chunk.
    async fn apply_chunk(
        &mut self,
        chunk: StreamChunk,
    ) -> StreamExecutorResult<Option<StreamChunk>> {
        // One hashed group key per input row, built from the group-by columns.
        let keys = K::build_many(&self.group_by, chunk.data_chunk());
        // Changes accumulated per group within this chunk.
        let mut stagings: HashMap<K, TopNStaging> = HashMap::new();
        let data_types = self.schema.data_types();
        let deserializer = RowDeserializer::new(data_types.clone());
        for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            // Skip invisible rows ("holes") in the chunk.
            let Some((op, row_ref)) = r else {
                continue;
            };

            // Cache key = storage key minus the group columns, which occupy
            // the leading `group_by.len()` positions of the storage key.
            let pk_row = row_ref.project(&self.storage_key_indices[self.group_by.len()..]);
            let cache_key = serialize_pk_to_cache_key(pk_row, &self.cache_key_serde);

            let group_key = row_ref.project(&self.group_by);
            self.metrics.group_top_n_total_query_cache_count.inc();
            // On cache miss, rebuild this group's Top-N cache from the state table.
            if !self.caches.contains(group_cache_key) {
                self.metrics.group_top_n_cache_miss_count.inc();
                let mut topn_cache = TopNCache::with_min_capacity(
                    self.offset,
                    self.limit,
                    data_types.clone(),
                    self.topn_cache_min_capacity,
                );
                self.managed_state
                    .init_append_only_topn_cache(Some(group_key), &mut topn_cache)
                    .await?;
                self.caches.put(group_cache_key.clone(), topn_cache);
            }

            // Present: either it was already cached or we just inserted it.
            let mut cache = self.caches.get_mut(group_cache_key).unwrap();
            let staging = stagings.entry(group_cache_key.clone()).or_default();

            // The input is append-only, so only `Insert` ops are expected.
            debug_assert_eq!(op, Op::Insert);
            cache.insert(
                cache_key,
                row_ref,
                staging,
                &mut self.managed_state,
                &deserializer,
            )?;
        }

        self.metrics
            .group_top_n_cached_entry_count
            .set(self.caches.len() as i64);

        // Emit all staged changes. The builder is unlimited, so `append_record`
        // never yields an intermediate chunk; `take()` returns the final chunk,
        // or `None` when nothing was appended.
        let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(chunk.capacity()));
        for staging in stagings.into_values() {
            for res in staging.into_deserialized_changes(&deserializer) {
                let record = res?;
                let _none = chunk_builder.append_record(record);
            }
        }

        Ok(chunk_builder.take())
    }

    /// Commits buffered state-table writes at the epoch barrier.
    async fn flush_data(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S>> {
        self.managed_state.flush(epoch).await
    }

    /// Best-effort flush between barriers.
    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
        self.managed_state.try_flush().await
    }

    /// Drops all per-group caches (e.g. on recovery); they are rebuilt lazily
    /// from the state table on the next access.
    fn clear_cache(&mut self) {
        self.caches.clear();
    }

    /// Evicts entries from the per-group cache.
    fn evict(&mut self) {
        self.caches.evict()
    }

    async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.managed_state.init_epoch(epoch).await
    }

    /// Forwards a watermark only when it is on the first group-by column, in
    /// which case the state table's watermark is also advanced (enabling
    /// cleanup of stale state); watermarks on other columns are swallowed.
    async fn handle_watermark(&mut self, watermark: Watermark) -> Option<Watermark> {
        if watermark.col_idx == self.group_by[0] {
            self.managed_state.update_watermark(watermark.val.clone());
            Some(watermark)
        } else {
            None
        }
    }
}