risingwave_stream/executor/eowc/
sort.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::Op;
16
17use super::sort_buffer::SortBuffer;
18use crate::executor::prelude::*;
19
20pub struct SortExecutor<S: StateStore> {
21    input: Executor,
22    inner: ExecutorInner<S>,
23}
24
25pub struct SortExecutorArgs<S: StateStore> {
26    pub actor_ctx: ActorContextRef,
27
28    pub input: Executor,
29
30    pub schema: Schema,
31    pub buffer_table: StateTable<S>,
32    pub chunk_size: usize,
33    pub sort_column_index: usize,
34}
35
36struct ExecutorInner<S: StateStore> {
37    actor_ctx: ActorContextRef,
38
39    schema: Schema,
40    buffer_table: StateTable<S>,
41    chunk_size: usize,
42    sort_column_index: usize,
43}
44
45struct ExecutionVars<S: StateStore> {
46    buffer: SortBuffer<S>,
47}
48
49impl<S: StateStore> Execute for SortExecutor<S> {
50    fn execute(self: Box<Self>) -> BoxedMessageStream {
51        self.executor_inner().boxed()
52    }
53}
54
55impl<S: StateStore> SortExecutor<S> {
56    pub fn new(args: SortExecutorArgs<S>) -> Self {
57        Self {
58            input: args.input,
59            inner: ExecutorInner {
60                actor_ctx: args.actor_ctx,
61                schema: args.schema,
62                buffer_table: args.buffer_table,
63                chunk_size: args.chunk_size,
64                sort_column_index: args.sort_column_index,
65            },
66        }
67    }
68
69    #[try_stream(ok = Message, error = StreamExecutorError)]
70    async fn executor_inner(self) {
71        let Self {
72            input,
73            inner: mut this,
74        } = self;
75
76        let mut input = input.execute();
77
78        let barrier = expect_first_barrier(&mut input).await?;
79        let first_epoch = barrier.epoch;
80        yield Message::Barrier(barrier);
81        this.buffer_table.init_epoch(first_epoch).await?;
82
83        let mut vars = ExecutionVars {
84            buffer: SortBuffer::new(this.sort_column_index, &this.buffer_table),
85        };
86
87        // Populate the sort buffer cache on initialization.
88        vars.buffer.refill_cache(None, &this.buffer_table).await?;
89
90        #[for_await]
91        for msg in input {
92            match msg? {
93                Message::Watermark(watermark @ Watermark { col_idx, .. })
94                    if col_idx == this.sort_column_index =>
95                {
96                    let mut chunk_builder =
97                        StreamChunkBuilder::new(this.chunk_size, this.schema.data_types());
98
99                    #[for_await]
100                    for row in vars
101                        .buffer
102                        .consume(watermark.val.clone(), &mut this.buffer_table)
103                    {
104                        let row = row?;
105                        if let Some(chunk) = chunk_builder.append_row(Op::Insert, row) {
106                            yield Message::Chunk(chunk);
107                        }
108                    }
109                    if let Some(chunk) = chunk_builder.take() {
110                        yield Message::Chunk(chunk);
111                    }
112
113                    yield Message::Watermark(watermark);
114                }
115                Message::Watermark(_) => {
116                    // ignore watermarks on other columns
117                    continue;
118                }
119                Message::Chunk(chunk) => {
120                    vars.buffer.apply_chunk(chunk, &mut this.buffer_table);
121                    this.buffer_table.try_flush().await?;
122                }
123                Message::Barrier(barrier) => {
124                    let post_commit = this.buffer_table.commit(barrier.epoch).await?;
125                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(this.actor_ctx.id);
126                    yield Message::Barrier(barrier);
127
128                    // Update the vnode bitmap for state tables of all agg calls if asked.
129                    if let Some((_, cache_may_stale)) =
130                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
131                    {
132                        // Manipulate the cache if necessary.
133                        if cache_may_stale {
134                            vars.buffer.refill_cache(None, &this.buffer_table).await?;
135                        }
136                    }
137                }
138            }
139        }
140    }
141}
142
#[cfg(test)]
mod tests {
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};

    /// Column the tests sort on (column 0 is the input primary key).
    const SORT_COLUMN_INDEX: usize = 1;

    /// Builds a sort executor over a mock source with two `Int64` columns and
    /// returns the sender feeding the source together with the executor's
    /// output stream.
    async fn create_executor<S>(
        sort_column_index: usize,
        store: S,
    ) -> (MessageSender, BoxedMessageStream)
    where
        S: StateStore,
    {
        // Input: (pk: Int64, value: Int64), keyed by the first column.
        let input_schema = Schema::new(vec![
            Field::unnamed(DataType::Int64),
            Field::unnamed(DataType::Int64),
        ]);
        let input_pk_indices = vec![0];

        // The state table mirrors the input schema.
        let table_columns = vec![
            ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
            ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
        ];

        // The sort column leads the table pk so that rows are ordered by it.
        let table_pk_indices = vec![sort_column_index, 0];
        let table_order_types = vec![OrderType::ascending(), OrderType::ascending()];
        let table_catalog = gen_pbtable(
            TableId::new(1),
            table_columns,
            table_order_types,
            table_pk_indices,
            0,
        );
        let buffer_table = StateTable::from_table_catalog(&table_catalog, store, None).await;

        let (tx, source) = MockSource::channel();
        let source = source.into_executor(input_schema, input_pk_indices);
        let args = SortExecutorArgs {
            actor_ctx: ActorContext::for_test(123),
            schema: source.schema().clone(),
            input: source,
            buffer_table,
            chunk_size: 1024,
            sort_column_index,
        };
        let output = SortExecutor::new(args).boxed().execute();
        (tx, output)
    }

    /// Runs the start-up sequence shared by the tests: the init barrier is
    /// pushed and consumed, then a watermark on an unrelated column (expected
    /// to be ignored) and one on the sort column are pushed, and the latter is
    /// consumed.
    async fn start_up(
        tx: &mut MessageSender,
        executor: &mut BoxedMessageStream,
        sort_column_index: usize,
    ) {
        tx.push_barrier(test_epoch(1), false);
        executor.expect_barrier().await;

        tx.push_int64_watermark(0, 0_i64); // expected to be ignored
        tx.push_int64_watermark(sort_column_index, 0_i64);
        executor.expect_watermark().await;
    }

    /// The first data chunk fed to the executor in both tests.
    fn first_input_chunk() -> StreamChunk {
        StreamChunk::from_pretty(
            " I I
            + 1 1
            + 2 2
            + 3 6
            + 4 7",
        )
    }

    #[tokio::test]
    async fn test_sort_executor() {
        let (mut tx, mut sort_executor) =
            create_executor(SORT_COLUMN_INDEX, MemoryStateStore::new()).await;

        start_up(&mut tx, &mut sort_executor, SORT_COLUMN_INDEX).await;

        // Round 1: feed data, then watermarks on an unrelated column
        // (ignored) and on the sort column.
        tx.push_chunk(first_input_chunk());
        tx.push_int64_watermark(0, 3_i64);
        tx.push_int64_watermark(SORT_COLUMN_INDEX, 3_i64);

        // Only the rows below the watermark are released, followed by the
        // watermark itself.
        assert_eq!(
            sort_executor.expect_chunk().await,
            StreamChunk::from_pretty(
                " I I
                + 1 1
                + 2 2"
            )
        );
        sort_executor.expect_watermark().await;

        // Round 2: more data, then a barrier that passes through.
        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 98 4
            + 37 5
            + 60 8",
        ));
        tx.push_barrier(test_epoch(2), false);
        sort_executor.expect_barrier().await;

        // Again an ignored watermark followed by one on the sort column.
        tx.push_int64_watermark(0, 7_i64);
        tx.push_int64_watermark(SORT_COLUMN_INDEX, 7_i64);

        // Rows from both rounds come out ordered by the sort column.
        assert_eq!(
            sort_executor.expect_chunk().await,
            StreamChunk::from_pretty(
                " I I
                + 98 4
                + 37 5
                + 3 6"
            )
        );
        sort_executor.expect_watermark().await;
    }

    #[tokio::test]
    async fn test_sort_executor_fail_over() {
        let store = MemoryStateStore::new();
        let (mut tx, mut sort_executor) =
            create_executor(SORT_COLUMN_INDEX, store.clone()).await;

        start_up(&mut tx, &mut sort_executor, SORT_COLUMN_INDEX).await;

        // Buffer some rows and let a barrier pass so they are committed.
        tx.push_chunk(first_input_chunk());
        tx.push_barrier(test_epoch(2), false);
        sort_executor.expect_barrier().await;

        // Simulate fail-over: a fresh executor on top of the same store.
        let (mut recovered_tx, mut recovered_sort_executor) =
            create_executor(SORT_COLUMN_INDEX, store).await;

        recovered_tx.push_barrier(test_epoch(2), false);
        recovered_sort_executor.expect_barrier().await;

        // A watermark on the sort column must release the rows buffered
        // before the fail-over.
        recovered_tx.push_int64_watermark(SORT_COLUMN_INDEX, 3_i64);

        assert_eq!(
            recovered_sort_executor.expect_chunk().await,
            StreamChunk::from_pretty(
                " I I
                + 1 1
                + 2 2"
            )
        );
        recovered_sort_executor.expect_watermark().await;
    }
}