risingwave_stream/executor/top_n/
group_top_n_appendonly.rs1use std::collections::HashMap;
16
17use risingwave_common::array::Op;
18use risingwave_common::hash::HashKey;
19use risingwave_common::row::{RowDeserializer, RowExt};
20use risingwave_common::util::epoch::EpochPair;
21use risingwave_common::util::iter_util::ZipEqDebug;
22use risingwave_common::util::sort_util::ColumnOrder;
23
24use super::group_top_n::GroupTopNCache;
25use super::top_n_cache::AppendOnlyTopNCacheTrait;
26use super::utils::*;
27use super::{ManagedTopNState, TopNCache};
28use crate::common::metrics::MetricsInfo;
29use crate::common::table::state_table::StateTablePostCommit;
30use crate::executor::monitor::GroupTopNMetrics;
31use crate::executor::prelude::*;
32
33pub type AppendOnlyGroupTopNExecutor<K, S, const WITH_TIES: bool> =
37 TopNExecutorWrapper<InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>>;
38
39impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
40 AppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
41{
42 #[allow(clippy::too_many_arguments)]
43 pub fn new(
44 input: Executor,
45 ctx: ActorContextRef,
46 schema: Schema,
47 storage_key: Vec<ColumnOrder>,
48 offset_and_limit: (usize, usize),
49 order_by: Vec<ColumnOrder>,
50 group_by: Vec<usize>,
51 state_table: StateTable<S>,
52 watermark_epoch: AtomicU64Ref,
53 ) -> StreamResult<Self> {
54 let inner = InnerAppendOnlyGroupTopNExecutor::new(
55 schema,
56 storage_key,
57 offset_and_limit,
58 order_by,
59 group_by,
60 state_table,
61 watermark_epoch,
62 &ctx,
63 )?;
64 Ok(TopNExecutorWrapper { input, ctx, inner })
65 }
66}
67
68pub struct InnerAppendOnlyGroupTopNExecutor<K: HashKey, S: StateStore, const WITH_TIES: bool> {
69 schema: Schema,
70
71 limit: usize,
73
74 offset: usize,
76
77 storage_key_indices: PkIndices,
79
80 managed_state: ManagedTopNState<S>,
81
82 group_by: Vec<usize>,
84
85 caches: GroupTopNCache<K, WITH_TIES>,
87
88 cache_key_serde: CacheKeySerde,
90
91 metrics: GroupTopNMetrics,
92}
93
94impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
95 InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
96{
97 #[allow(clippy::too_many_arguments)]
98 pub fn new(
99 schema: Schema,
100 storage_key: Vec<ColumnOrder>,
101 offset_and_limit: (usize, usize),
102 order_by: Vec<ColumnOrder>,
103 group_by: Vec<usize>,
104 state_table: StateTable<S>,
105 watermark_epoch: AtomicU64Ref,
106 ctx: &ActorContext,
107 ) -> StreamResult<Self> {
108 let metrics_info = MetricsInfo::new(
109 ctx.streaming_metrics.clone(),
110 state_table.table_id(),
111 ctx.id,
112 "AppendOnlyGroupTopN",
113 );
114 let metrics = ctx.streaming_metrics.new_append_only_group_top_n_metrics(
115 state_table.table_id(),
116 ctx.id,
117 ctx.fragment_id,
118 );
119
120 let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &group_by);
121 let managed_state = ManagedTopNState::<S>::new(state_table, cache_key_serde.clone());
122
123 Ok(Self {
124 schema,
125 offset: offset_and_limit.0,
126 limit: offset_and_limit.1,
127 managed_state,
128 storage_key_indices: storage_key.into_iter().map(|op| op.column_index).collect(),
129 group_by,
130 caches: GroupTopNCache::new(watermark_epoch, metrics_info),
131 cache_key_serde,
132 metrics,
133 })
134 }
135}
136
137impl<K: HashKey, S: StateStore, const WITH_TIES: bool> TopNExecutorBase
138 for InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
139where
140 TopNCache<WITH_TIES>: AppendOnlyTopNCacheTrait,
141{
142 type State = S;
143
144 async fn apply_chunk(
145 &mut self,
146 chunk: StreamChunk,
147 ) -> StreamExecutorResult<Option<StreamChunk>> {
148 let keys = K::build_many(&self.group_by, chunk.data_chunk());
149 let mut stagings = HashMap::new(); let data_types = self.schema.data_types();
152 let deserializer = RowDeserializer::new(data_types.clone());
153 for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
154 let Some((op, row_ref)) = r else {
155 continue;
156 };
157
158 let pk_row = row_ref.project(&self.storage_key_indices[self.group_by.len()..]);
160 let cache_key = serialize_pk_to_cache_key(pk_row, &self.cache_key_serde);
161
162 let group_key = row_ref.project(&self.group_by);
163 self.metrics.group_top_n_total_query_cache_count.inc();
164 if !self.caches.contains(group_cache_key) {
167 self.metrics.group_top_n_cache_miss_count.inc();
168 let mut topn_cache = TopNCache::new(self.offset, self.limit, data_types.clone());
169 self.managed_state
170 .init_topn_cache(Some(group_key), &mut topn_cache)
171 .await?;
172 self.caches.push(group_cache_key.clone(), topn_cache);
173 }
174
175 let mut cache = self.caches.get_mut(group_cache_key).unwrap();
176 let staging = stagings.entry(group_cache_key.clone()).or_default();
177
178 debug_assert_eq!(op, Op::Insert);
179 cache.insert(
180 cache_key,
181 row_ref,
182 staging,
183 &mut self.managed_state,
184 &deserializer,
185 )?;
186 }
187
188 self.metrics
189 .group_top_n_cached_entry_count
190 .set(self.caches.len() as i64);
191
192 let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(chunk.capacity()));
193 for staging in stagings.into_values() {
194 for res in staging.into_deserialized_changes(&deserializer) {
195 let (op, row) = res?;
196 let _none = chunk_builder.append_row(op, row);
197 }
198 }
199
200 Ok(chunk_builder.take())
201 }
202
203 async fn flush_data(
204 &mut self,
205 epoch: EpochPair,
206 ) -> StreamExecutorResult<StateTablePostCommit<'_, S>> {
207 self.managed_state.flush(epoch).await
208 }
209
210 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
211 self.managed_state.try_flush().await
212 }
213
214 fn clear_cache(&mut self) {
215 self.caches.clear();
216 }
217
218 fn evict(&mut self) {
219 self.caches.evict()
220 }
221
222 async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
223 self.managed_state.init_epoch(epoch).await
224 }
225
226 async fn handle_watermark(&mut self, watermark: Watermark) -> Option<Watermark> {
227 if watermark.col_idx == self.group_by[0] {
228 self.managed_state.update_watermark(watermark.val.clone());
229 Some(watermark)
230 } else {
231 None
232 }
233 }
234}