// risingwave_stream/executor/top_n/top_n_appendonly.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::Op;
16use risingwave_common::row::{RowDeserializer, RowExt};
17use risingwave_common::util::epoch::EpochPair;
18use risingwave_common::util::sort_util::ColumnOrder;
19
20use super::top_n_cache::{AppendOnlyTopNCacheTrait, TopNStaging};
21use super::utils::*;
22use super::{ManagedTopNState, TopNCache};
23use crate::common::table::state_table::StateTablePostCommit;
24use crate::executor::prelude::*;
25
/// TopN executor for an append-only input stream.
///
/// If the input is append-only, the executor does not need to keep all the rows
/// seen so far: once a record falls out of the result set it can never re-enter
/// (no deletes/updates arrive), so it can be deleted from state immediately.
/// NOTE(review): the original doc says `AppendOnlyGroupTopNExecutor`, but this
/// alias wraps the non-grouped inner executor — presumably a copy-paste slip.
///
/// `WITH_TIES` selects the `WITH TIES` semantics of the underlying [`TopNCache`].
///
/// TODO: Optimization: primary key may contain several columns and is used to determine
/// the order, therefore the value part should not contain the same columns to save space.
pub type AppendOnlyTopNExecutor<S, const WITH_TIES: bool> =
    TopNExecutorWrapper<InnerAppendOnlyTopNExecutor<S, WITH_TIES>>;
34
35impl<S: StateStore, const WITH_TIES: bool> AppendOnlyTopNExecutor<S, WITH_TIES> {
36    #[allow(clippy::too_many_arguments)]
37    pub fn new(
38        input: Executor,
39        ctx: ActorContextRef,
40        schema: Schema,
41        storage_key: Vec<ColumnOrder>,
42        offset_and_limit: (usize, usize),
43        order_by: Vec<ColumnOrder>,
44        state_table: StateTable<S>,
45    ) -> StreamResult<Self> {
46        Ok(TopNExecutorWrapper {
47            input,
48            ctx,
49            inner: InnerAppendOnlyTopNExecutor::new(
50                schema,
51                storage_key,
52                offset_and_limit,
53                order_by,
54                state_table,
55            )?,
56        })
57    }
58}
59
pub struct InnerAppendOnlyTopNExecutor<S: StateStore, const WITH_TIES: bool> {
    /// Schema of the input/output rows; used to derive data types for
    /// (de)serialization and chunk building in `apply_chunk`.
    schema: Schema,

    /// The storage key indices of the `TopNExecutor`
    storage_key_indices: PkIndices,

    /// We are interested in which element is in the range of [offset, offset+limit).
    managed_state: ManagedTopNState<S>,

    /// In-memory cache of top (N + N * `TOPN_CACHE_HIGH_CAPACITY_FACTOR`) rows
    /// TODO: support WITH TIES
    cache: TopNCache<WITH_TIES>,

    /// Used for serializing pk into `CacheKey`.
    cache_key_serde: CacheKeySerde,
}
76
77impl<S: StateStore, const WITH_TIES: bool> InnerAppendOnlyTopNExecutor<S, WITH_TIES> {
78    #[allow(clippy::too_many_arguments)]
79    pub fn new(
80        schema: Schema,
81        storage_key: Vec<ColumnOrder>,
82        offset_and_limit: (usize, usize),
83        order_by: Vec<ColumnOrder>,
84        state_table: StateTable<S>,
85    ) -> StreamResult<Self> {
86        let num_offset = offset_and_limit.0;
87        let num_limit = offset_and_limit.1;
88
89        let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &[]);
90        let managed_state = ManagedTopNState::<S>::new(state_table, cache_key_serde.clone());
91        let data_types = schema.data_types();
92
93        Ok(Self {
94            schema,
95            managed_state,
96            storage_key_indices: storage_key.into_iter().map(|op| op.column_index).collect(),
97            cache: TopNCache::new(num_offset, num_limit, data_types),
98            cache_key_serde,
99        })
100    }
101}
102
103impl<S: StateStore, const WITH_TIES: bool> TopNExecutorBase
104    for InnerAppendOnlyTopNExecutor<S, WITH_TIES>
105where
106    TopNCache<WITH_TIES>: AppendOnlyTopNCacheTrait,
107{
108    type State = S;
109
110    async fn apply_chunk(
111        &mut self,
112        chunk: StreamChunk,
113    ) -> StreamExecutorResult<Option<StreamChunk>> {
114        let mut staging = TopNStaging::new();
115        let data_types = self.schema.data_types();
116        let deserializer = RowDeserializer::new(data_types.clone());
117        // apply the chunk to state table
118        for (op, row_ref) in chunk.rows() {
119            debug_assert_eq!(op, Op::Insert);
120            let pk_row = row_ref.project(&self.storage_key_indices);
121            let cache_key = serialize_pk_to_cache_key(pk_row, &self.cache_key_serde);
122            self.cache.insert(
123                cache_key,
124                row_ref,
125                &mut staging,
126                &mut self.managed_state,
127                &deserializer,
128            )?;
129        }
130
131        if staging.is_empty() {
132            return Ok(None);
133        }
134        let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(staging.len()));
135        for res in staging.into_deserialized_changes(&deserializer) {
136            let (op, row) = res?;
137            let _none = chunk_builder.append_row(op, row);
138        }
139        Ok(chunk_builder.take())
140    }
141
142    async fn flush_data(
143        &mut self,
144        epoch: EpochPair,
145    ) -> StreamExecutorResult<StateTablePostCommit<'_, S>> {
146        self.managed_state.flush(epoch).await
147    }
148
149    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
150        self.managed_state.try_flush().await
151    }
152
153    async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
154        self.managed_state.init_epoch(epoch).await?;
155        self.managed_state
156            .init_topn_cache(NO_GROUP_KEY, &mut self.cache)
157            .await
158    }
159
160    async fn handle_watermark(&mut self, _: Watermark) -> Option<Watermark> {
161        // TODO(yuhao): handle watermark
162        None
163    }
164}
165
#[cfg(test)]
mod tests {

    use risingwave_common::array::StreamChunk;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::{ColumnOrder, OrderType};

    use super::AppendOnlyTopNExecutor;
    use crate::executor::test_utils::top_n_executor::create_in_memory_state_table;
    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};
    use crate::executor::{ActorContext, Barrier, Execute, Executor, Message, PkIndices};

    /// Three insert-only chunks of `(value, id)` rows; the second column makes
    /// every row's storage key unique even when values repeat.
    fn create_stream_chunks() -> Vec<StreamChunk> {
        let chunk1 = StreamChunk::from_pretty(
            "  I I
            +  1 0
            +  2 1
            +  3 2
            + 10 3
            +  9 4
            +  8 5",
        );
        let chunk2 = StreamChunk::from_pretty(
            "  I I
            +  7 6
            +  3 7
            +  1 8
            +  9 9",
        );
        let chunk3 = StreamChunk::from_pretty(
            " I  I
            + 1 12
            + 1 13
            + 2 14
            + 3 15",
        );
        vec![chunk1, chunk2, chunk3]
    }

    /// Two `Int64` columns matching the chunks above.
    fn create_schema() -> Schema {
        Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        }
    }

    // Storage key and ORDER BY coincide in these tests.
    fn storage_key() -> Vec<ColumnOrder> {
        order_by()
    }

    /// Ascending on both columns: value first, id as tie-breaker.
    fn order_by() -> Vec<ColumnOrder> {
        vec![
            ColumnOrder::new(0, OrderType::ascending()),
            ColumnOrder::new(1, OrderType::ascending()),
        ]
    }

    fn pk_indices() -> PkIndices {
        vec![0, 1]
    }

    /// Mock upstream: barrier, chunk1, barrier, chunk2, barrier, chunk3.
    fn create_source() -> Executor {
        let mut chunks = create_stream_chunks();
        MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(std::mem::take(&mut chunks[0])),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(std::mem::take(&mut chunks[1])),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(std::mem::take(&mut chunks[2])),
        ])
        .into_executor(create_schema(), pk_indices())
    }

    /// OFFSET 0 / LIMIT 5: the output tracks the 5 smallest rows, emitting
    /// delete+insert pairs as newcomers displace old members.
    #[tokio::test]
    async fn test_append_only_top_n_executor_with_limit() {
        let storage_key = storage_key();
        let source = create_source();
        let state_table = create_in_memory_state_table(
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &pk_indices(),
        )
        .await;

        let schema = source.schema().clone();
        let top_n = AppendOnlyTopNExecutor::<_, false>::new(
            source,
            ActorContext::for_test(0),
            schema,
            storage_key,
            (0, 5),
            order_by(),
            state_table,
        )
        .unwrap();
        let mut top_n = top_n.boxed().execute();

        // consume the init epoch
        top_n.expect_barrier().await;
        assert_eq!(
            top_n.expect_chunk().await.sort_rows(),
            StreamChunk::from_pretty(
                "  I I
                +  1 0
                +  2 1
                +  3 2
                +  9 4
                +  8 5"
            )
            .sort_rows(),
        );
        // We added (1, 2, 3, 10, 9, 8).
        // Now (1, 2, 3, 8, 9)
        // Barrier
        top_n.expect_barrier().await;
        assert_eq!(
            top_n.expect_chunk().await.sort_rows(),
            StreamChunk::from_pretty(
                " I I
                - 9 4
                - 8 5
                + 3 7
                + 1 8"
            )
            .sort_rows(),
        );
        // We added (7, 3, 1, 9).
        // Now (1, 1, 2, 3, 3)
        // Barrier
        top_n.expect_barrier().await;
        assert_eq!(
            top_n.expect_chunk().await.sort_rows(),
            StreamChunk::from_pretty(
                " I  I
                - 3  7
                + 1 12
                - 3  2
                + 1 13"
            )
            .sort_rows(),
        );
        // We added (1, 1, 2, 3).
        // Now (1, 1, 1, 1, 2)
    }

    /// OFFSET 3 / LIMIT 4: only rows ranked 4..8 are emitted; rows entering
    /// or leaving that window produce the insert/delete churn asserted below.
    #[tokio::test]
    async fn test_append_only_top_n_executor_with_offset_and_limit() {
        let source = create_source();
        let state_table = create_in_memory_state_table(
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &pk_indices(),
        )
        .await;

        let schema = source.schema().clone();
        let top_n = AppendOnlyTopNExecutor::<_, false>::new(
            source,
            ActorContext::for_test(0),
            schema,
            storage_key(),
            (3, 4),
            order_by(),
            state_table,
        )
        .unwrap();
        let mut top_n = top_n.boxed().execute();

        // consume the init epoch
        top_n.expect_barrier().await;
        assert_eq!(
            top_n.expect_chunk().await.sort_rows(),
            StreamChunk::from_pretty(
                "  I I
                + 10 3
                +  9 4
                +  8 5"
            )
            .sort_rows(),
        );
        // We added (1, 2, 3, 10, 9, 8).
        // Now (1, 2, 3) -> (8, 9, 10)
        // barrier
        top_n.expect_barrier().await;
        assert_eq!(
            top_n.expect_chunk().await.sort_rows(),
            StreamChunk::from_pretty(
                "  I I
                +  7 6
                - 10 3
                +  3 7
                -  9 4
                +  3 2"
            )
            .sort_rows(),
        );
        // We added (7, 3, 1, 9).
        // Now (1, 1, 2) -> (3, 3, 7, 8)
        // barrier
        top_n.expect_barrier().await;
        assert_eq!(
            top_n.expect_chunk().await.sort_rows(),
            StreamChunk::from_pretty(
                " I  I
                - 8  5
                + 2  1
                - 7  6
                + 1 13
                - 3  7
                + 2 14"
            )
            .sort_rows(),
        );
        // We added (1, 1, 2, 3).
        // Now (1, 1, 1) -> (1, 2, 2, 3)
    }
}