risingwave_stream/executor/top_n/group_top_n_appendonly.rs

use std::collections::HashMap;

use risingwave_common::array::Op;
use risingwave_common::hash::HashKey;
use risingwave_common::row::{RowDeserializer, RowExt};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::sort_util::ColumnOrder;

use super::group_top_n::GroupTopNCache;
use super::top_n_cache::AppendOnlyTopNCacheTrait;
use super::utils::*;
use super::{ManagedTopNState, TopNCache};
use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTablePostCommit;
use crate::executor::monitor::GroupTopNMetrics;
use crate::executor::prelude::*;
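
/// Grouped TopN executor for append-only input. Since the input never produces
/// deletes or updates, rows outside each group's `[offset, offset + limit)` range
/// can be discarded from the state table immediately: they can never re-enter the
/// result.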
pub type AppendOnlyGroupTopNExecutor<K, S, const WITH_TIES: bool> =
    TopNExecutorWrapper<InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>>;

impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
    AppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
{
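    /// Creates the wrapped executor: builds the inner executor from the operator
    /// arguments and pairs it with the upstream `input` and the actor context.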
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        input: Executor,
        ctx: ActorContextRef,
        schema: Schema,
        storage_key: Vec<ColumnOrder>,
        offset_and_limit: (usize, usize),
        order_by: Vec<ColumnOrder>,
        group_by: Vec<usize>,
        state_table: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
    ) -> StreamResult<Self> {
        let inner = InnerAppendOnlyGroupTopNExecutor::new(
            schema,
            storage_key,
            offset_and_limit,
            order_by,
            group_by,
            state_table,
            watermark_epoch,
            &ctx,
        )?;
        Ok(TopNExecutorWrapper { input, ctx, inner })
    }
}
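
/// Inner executor of [`AppendOnlyGroupTopNExecutor`]: keeps one in-memory TopN
/// cache per group key on top of a single `ManagedTopNState` state table.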
pub struct InnerAppendOnlyGroupTopNExecutor<K: HashKey, S: StateStore, const WITH_TIES: bool> {
    schema: Schema,

    /// `LIMIT n` of the TopN query.
    limit: usize,

    /// `OFFSET n` of the TopN query.
    offset: usize,

    /// Indices of the storage key columns: the group-by columns first, then the
    /// remaining order/pk columns.
    storage_key_indices: PkIndices,

    /// State-table access shared by all groups.
    managed_state: ManagedTopNState<S>,

    /// Indices of the columns we group by.
    group_by: Vec<usize>,

    /// One in-memory TopN cache per group key.
    caches: GroupTopNCache<K, WITH_TIES>,

    /// Serde used to turn the storage pk into a `CacheKey`.
    cache_key_serde: CacheKeySerde,

    /// Minimum initial capacity of each newly created per-group cache.
    topn_cache_min_capacity: usize,

    metrics: GroupTopNMetrics,
}

impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
    InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
{
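    /// Sets up metrics, the cache-key serde, and the managed TopN state over the
    /// given state table, then assembles the inner executor.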
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        schema: Schema,
        storage_key: Vec<ColumnOrder>,
        offset_and_limit: (usize, usize),
        order_by: Vec<ColumnOrder>,
        group_by: Vec<usize>,
        state_table: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        ctx: &ActorContext,
    ) -> StreamResult<Self> {
        let metrics_info = MetricsInfo::new(
            ctx.streaming_metrics.clone(),
            state_table.table_id(),
            ctx.id,
            "AppendOnlyGroupTopN",
        );
        let metrics = ctx.streaming_metrics.new_append_only_group_top_n_metrics(
            state_table.table_id(),
            ctx.id,
            ctx.fragment_id,
        );

        let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &group_by);
        let managed_state = ManagedTopNState::<S>::new(state_table, cache_key_serde.clone());

        Ok(Self {
            schema,
            offset: offset_and_limit.0,
            limit: offset_and_limit.1,
            managed_state,
            storage_key_indices: storage_key.into_iter().map(|op| op.column_index).collect(),
            group_by,
            caches: GroupTopNCache::new(watermark_epoch, metrics_info),
            cache_key_serde,
            topn_cache_min_capacity: ctx.streaming_config.developer.topn_cache_min_capacity,
            metrics,
        })
    }
}
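
// The streaming logic lives in this `TopNExecutorBase` impl and is driven by the
// `TopNExecutorWrapper`; it is only available when the cache supports append-only
// maintenance, i.e. `TopNCache<WITH_TIES>: AppendOnlyTopNCacheTrait`.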
impl<K: HashKey, S: StateStore, const WITH_TIES: bool> TopNExecutorBase
    for InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
where
    TopNCache<WITH_TIES>: AppendOnlyTopNCacheTrait,
{
    type State = S;
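
    /// For each input row, this builds the group key, loads (or creates) that
    /// group's TopN cache from the state table on a miss, and inserts the row.
    /// Changes to each group's top-N are collected per group and emitted as one
    /// output chunk.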
    async fn apply_chunk(
        &mut self,
        chunk: StreamChunk,
    ) -> StreamExecutorResult<Option<StreamChunk>> {
        let keys = K::build_many(&self.group_by, chunk.data_chunk());
        // Staged output changes, keyed by group key; flushed into one chunk below.
        let mut stagings = HashMap::new();

        let data_types = self.schema.data_types();
        let deserializer = RowDeserializer::new(data_types.clone());
        for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            let Some((op, row_ref)) = r else {
                continue;
            };

            // The cache key is the storage pk with the group-by prefix stripped.
            let pk_row = row_ref.project(&self.storage_key_indices[self.group_by.len()..]);
            let cache_key = serialize_pk_to_cache_key(pk_row, &self.cache_key_serde);

            let group_key = row_ref.project(&self.group_by);
            self.metrics.group_top_n_total_query_cache_count.inc();
            // On a cache miss, rebuild this group's TopN cache from the state table.
            if !self.caches.contains(group_cache_key) {
                self.metrics.group_top_n_cache_miss_count.inc();
                let mut topn_cache = TopNCache::with_min_capacity(
                    self.offset,
                    self.limit,
                    data_types.clone(),
                    self.topn_cache_min_capacity,
                );
                self.managed_state
                    .init_append_only_topn_cache(Some(group_key), &mut topn_cache)
                    .await?;
                self.caches.put(group_cache_key.clone(), topn_cache);
            }

            let mut cache = self.caches.get_mut(group_cache_key).unwrap();
            let staging = stagings.entry(group_cache_key.clone()).or_default();

            // The input is append-only, so every op must be an `Insert`.
            debug_assert_eq!(op, Op::Insert);
            cache.insert(
                cache_key,
                row_ref,
                staging,
                &mut self.managed_state,
                &deserializer,
            )?;
        }

        self.metrics
            .group_top_n_cached_entry_count
            .set(self.caches.len() as i64);

        // Emit all staged per-group changes as a single output chunk.
        let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(chunk.capacity()));
        for staging in stagings.into_values() {
            for res in staging.into_deserialized_changes(&deserializer) {
                let (op, row) = res?;
                let _none = chunk_builder.append_row(op, row);
            }
        }

        Ok(chunk_builder.take())
    }

    async fn flush_data(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S>> {
        self.managed_state.flush(epoch).await
    }

    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
        self.managed_state.try_flush().await
    }

    fn clear_cache(&mut self) {
        self.caches.clear();
    }

    fn evict(&mut self) {
        self.caches.evict()
    }

    async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.managed_state.init_epoch(epoch).await
    }
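
    /// Only a watermark on the first group-by column is useful here: it advances
    /// state cleaning on the state table and is forwarded downstream; watermarks
    /// on other columns are swallowed.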
    async fn handle_watermark(&mut self, watermark: Watermark) -> Option<Watermark> {
        if watermark.col_idx == self.group_by[0] {
            self.managed_state.update_watermark(watermark.val.clone());
            Some(watermark)
        } else {
            None
        }
    }
}