// risingwave_stream/executor/top_n/top_n_appendonly.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::Op;
16use risingwave_common::row::{RowDeserializer, RowExt};
17use risingwave_common::util::epoch::EpochPair;
18use risingwave_common::util::sort_util::ColumnOrder;
19
20use super::top_n_cache::{AppendOnlyTopNCacheTrait, TopNStaging};
21use super::utils::*;
22use super::{ManagedTopNState, TopNCache};
23use crate::common::table::state_table::StateTablePostCommit;
24use crate::executor::prelude::*;
25
/// `TopN` executor specialized for append-only input streams.
///
/// If the input is append-only, `AppendOnlyTopNExecutor` does not need
/// to keep all the rows seen so far. As soon as a record
/// is no longer in the result set, it can be deleted from state.
///
/// TODO: Optimization: primary key may contain several columns and is used to determine
/// the order, therefore the value part should not contain the same columns to save space.
pub type AppendOnlyTopNExecutor<S, const WITH_TIES: bool> =
    TopNExecutorWrapper<InnerAppendOnlyTopNExecutor<S, WITH_TIES>>;
34
35impl<S: StateStore, const WITH_TIES: bool> AppendOnlyTopNExecutor<S, WITH_TIES> {
36    pub fn new(
37        input: Executor,
38        ctx: ActorContextRef,
39        schema: Schema,
40        storage_key: Vec<ColumnOrder>,
41        offset_and_limit: (usize, usize),
42        order_by: Vec<ColumnOrder>,
43        state_table: StateTable<S>,
44    ) -> StreamResult<Self> {
45        Ok(TopNExecutorWrapper {
46            input,
47            ctx,
48            inner: InnerAppendOnlyTopNExecutor::new(
49                schema,
50                storage_key,
51                offset_and_limit,
52                order_by,
53                state_table,
54            )?,
55        })
56    }
57}
58
pub struct InnerAppendOnlyTopNExecutor<S: StateStore, const WITH_TIES: bool> {
    /// Schema of the rows flowing through this executor; used to build the
    /// row deserializer and the output chunk builder in `apply_chunk`.
    schema: Schema,

    /// The storage key indices of the `TopNExecutor`
    storage_key_indices: Vec<usize>,

    /// Managed state backing the cache; we are interested in which element is
    /// in the range of [offset, offset+limit).
    managed_state: ManagedTopNState<S>,

    /// In-memory cache of top (N + N * `TOPN_CACHE_HIGH_CAPACITY_FACTOR`) rows
    /// TODO: support WITH TIES
    cache: TopNCache<WITH_TIES>,

    /// Used for serializing pk into `CacheKey`.
    cache_key_serde: CacheKeySerde,
}
75
76impl<S: StateStore, const WITH_TIES: bool> InnerAppendOnlyTopNExecutor<S, WITH_TIES> {
77    pub fn new(
78        schema: Schema,
79        storage_key: Vec<ColumnOrder>,
80        offset_and_limit: (usize, usize),
81        order_by: Vec<ColumnOrder>,
82        state_table: StateTable<S>,
83    ) -> StreamResult<Self> {
84        let num_offset = offset_and_limit.0;
85        let num_limit = offset_and_limit.1;
86
87        let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &[]);
88        let managed_state = ManagedTopNState::<S>::new(state_table, cache_key_serde.clone());
89        let data_types = schema.data_types();
90
91        Ok(Self {
92            schema,
93            managed_state,
94            storage_key_indices: storage_key.into_iter().map(|op| op.column_index).collect(),
95            cache: TopNCache::new(num_offset, num_limit, data_types),
96            cache_key_serde,
97        })
98    }
99}
100
101impl<S: StateStore, const WITH_TIES: bool> TopNExecutorBase
102    for InnerAppendOnlyTopNExecutor<S, WITH_TIES>
103where
104    TopNCache<WITH_TIES>: AppendOnlyTopNCacheTrait,
105{
106    type State = S;
107
108    async fn apply_chunk(
109        &mut self,
110        chunk: StreamChunk,
111    ) -> StreamExecutorResult<Option<StreamChunk>> {
112        let mut staging = TopNStaging::new();
113        let data_types = self.schema.data_types();
114        let deserializer = RowDeserializer::new(data_types.clone());
115        // apply the chunk to state table
116        for (op, row_ref) in chunk.rows() {
117            debug_assert_eq!(op, Op::Insert);
118            let pk_row = row_ref.project(&self.storage_key_indices);
119            let cache_key = serialize_pk_to_cache_key(pk_row, &self.cache_key_serde);
120            self.cache.insert(
121                cache_key,
122                row_ref,
123                &mut staging,
124                &mut self.managed_state,
125                &deserializer,
126            )?;
127        }
128
129        if staging.is_empty() {
130            return Ok(None);
131        }
132        let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(staging.len()));
133        for res in staging.into_deserialized_changes(&deserializer) {
134            let record = res?;
135            let _none = chunk_builder.append_record(record);
136        }
137        Ok(chunk_builder.take())
138    }
139
140    async fn flush_data(
141        &mut self,
142        epoch: EpochPair,
143    ) -> StreamExecutorResult<StateTablePostCommit<'_, S>> {
144        self.managed_state.flush(epoch).await
145    }
146
147    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
148        self.managed_state.try_flush().await
149    }
150
151    async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
152        self.managed_state.init_epoch(epoch).await?;
153        self.managed_state
154            .init_topn_cache(NO_GROUP_KEY, &mut self.cache)
155            .await
156    }
157
158    async fn handle_watermark(&mut self, _: Watermark) -> Option<Watermark> {
159        // TODO(yuhao): handle watermark
160        None
161    }
162}
163
// Unit tests: drive the executor with a mock source of append-only chunks
// separated by barriers, and check the emitted TopN change chunks.
#[cfg(test)]
mod tests {

    use risingwave_common::array::StreamChunk;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::{ColumnOrder, OrderType};

    use super::AppendOnlyTopNExecutor;
    use crate::executor::test_utils::top_n_executor::create_in_memory_state_table;
    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};
    use crate::executor::{ActorContext, Barrier, Execute, Executor, Message, StreamKey};

    // Three insert-only chunks; column 0 is the ordering value, column 1 is a
    // unique id that makes every row distinct (pk = both columns).
    fn create_stream_chunks() -> Vec<StreamChunk> {
        let chunk1 = StreamChunk::from_pretty(
            "  I I
            +  1 0
            +  2 1
            +  3 2
            + 10 3
            +  9 4
            +  8 5",
        );
        let chunk2 = StreamChunk::from_pretty(
            "  I I
            +  7 6
            +  3 7
            +  1 8
            +  9 9",
        );
        let chunk3 = StreamChunk::from_pretty(
            " I  I
            + 1 12
            + 1 13
            + 2 14
            + 3 15",
        );
        vec![chunk1, chunk2, chunk3]
    }

    // Two int64 columns, matching the chunks above.
    fn create_schema() -> Schema {
        Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        }
    }

    // Storage key is identical to the ORDER BY key in these tests.
    fn storage_key() -> Vec<ColumnOrder> {
        order_by()
    }

    // ORDER BY (col0 ASC, col1 ASC).
    fn order_by() -> Vec<ColumnOrder> {
        vec![
            ColumnOrder::new(0, OrderType::ascending()),
            ColumnOrder::new(1, OrderType::ascending()),
        ]
    }

    // Both columns together form the stream key.
    fn stream_key() -> StreamKey {
        vec![0, 1]
    }

    // Interleave the three chunks with barriers so each chunk is flushed in
    // its own epoch: Barrier(1), chunk1, Barrier(2), chunk2, Barrier(3), chunk3.
    fn create_source() -> Executor {
        let mut chunks = create_stream_chunks();
        MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(std::mem::take(&mut chunks[0])),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(std::mem::take(&mut chunks[1])),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(std::mem::take(&mut chunks[2])),
        ])
        .into_executor(create_schema(), stream_key())
    }

    // LIMIT 5, no OFFSET: the output tracks the 5 smallest rows.
    #[tokio::test]
    async fn test_append_only_top_n_executor_with_limit() {
        let storage_key = storage_key();
        let source = create_source();
        let state_table = create_in_memory_state_table(
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &stream_key(),
        )
        .await;

        let schema = source.schema().clone();
        let top_n = AppendOnlyTopNExecutor::<_, false>::new(
            source,
            ActorContext::for_test(0),
            schema,
            storage_key,
            (0, 5),
            order_by(),
            state_table,
        )
        .unwrap();
        let mut top_n = top_n.boxed().execute();

        // consume the init epoch
        top_n.expect_barrier().await;
        // First chunk: (10, 3) exceeds the limit and is never emitted.
        assert_eq!(
            top_n.expect_chunk().await.sort_rows(),
            StreamChunk::from_pretty(
                "  I I
                +  1 0
                +  2 1
                +  3 2
                +  9 4
                +  8 5"
            )
            .sort_rows(),
        );
        // We added (1, 2, 3, 10, 9, 8).
        // Now (1, 2, 3, 8, 9)
        // Barrier
        top_n.expect_barrier().await;
        // Second chunk: new small rows evict 9 and 8 from the top 5.
        assert_eq!(
            top_n.expect_chunk().await.sort_rows(),
            StreamChunk::from_pretty(
                " I I
                - 9 4
                - 8 5
                + 3 7
                + 1 8"
            )
            .sort_rows(),
        );
        // We added (7, 3, 1, 9).
        // Now (1, 1, 2, 3, 3)
        // Barrier
        top_n.expect_barrier().await;
        // Third chunk: more 1s push both 3s out of the top 5.
        assert_eq!(
            top_n.expect_chunk().await.sort_rows(),
            StreamChunk::from_pretty(
                " I  I
                - 3  7
                + 1 12
                - 3  2
                + 1 13"
            )
            .sort_rows(),
        );
        // We added (1, 1, 2, 3).
        // Now (1, 1, 1, 1, 2)
    }

    // OFFSET 3, LIMIT 4: the output tracks ranks 4..=7 of the ordering.
    #[tokio::test]
    async fn test_append_only_top_n_executor_with_offset_and_limit() {
        let source = create_source();
        let state_table = create_in_memory_state_table(
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &stream_key(),
        )
        .await;

        let schema = source.schema().clone();
        let top_n = AppendOnlyTopNExecutor::<_, false>::new(
            source,
            ActorContext::for_test(0),
            schema,
            storage_key(),
            (3, 4),
            order_by(),
            state_table,
        )
        .unwrap();
        let mut top_n = top_n.boxed().execute();

        // consume the init epoch
        top_n.expect_barrier().await;
        // First chunk: only the rows past the offset (rank > 3) are emitted.
        assert_eq!(
            top_n.expect_chunk().await.sort_rows(),
            StreamChunk::from_pretty(
                "  I I
                + 10 3
                +  9 4
                +  8 5"
            )
            .sort_rows(),
        );
        // We added (1, 2, 3, 10, 9, 8).
        // Now (1, 2, 3) -> (8, 9, 10)
        // barrier
        top_n.expect_barrier().await;
        // Second chunk: inserts shift rows across the offset boundary in both
        // directions, producing paired -/+ changes.
        assert_eq!(
            top_n.expect_chunk().await.sort_rows(),
            StreamChunk::from_pretty(
                "  I I
                +  7 6
                - 10 3
                +  3 7
                -  9 4
                +  3 2"
            )
            .sort_rows(),
        );
        // We added (7, 3, 1, 9).
        // Now (1, 1, 2) -> (3, 3, 7, 8)
        // barrier
        top_n.expect_barrier().await;
        // Third chunk: more small values rotate the window again.
        assert_eq!(
            top_n.expect_chunk().await.sort_rows(),
            StreamChunk::from_pretty(
                " I  I
                - 8  5
                + 2  1
                - 7  6
                + 1 13
                - 3  7
                + 2 14"
            )
            .sort_rows(),
        );
        // We added (1, 1, 2, 3).
        // Now (1, 1, 1) -> (1, 2, 2, 3)
    }
}