risingwave_stream/executor/eowc/
sort.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_common::array::Op;
16
17use super::sort_buffer::SortBuffer;
18use crate::executor::prelude::*;
19
20pub struct SortExecutor<S: StateStore> {
21    input: Executor,
22    inner: ExecutorInner<S>,
23}
24
25pub struct SortExecutorArgs<S: StateStore> {
26    pub actor_ctx: ActorContextRef,
27
28    pub input: Executor,
29
30    pub schema: Schema,
31    pub buffer_table: StateTable<S>,
32    pub chunk_size: usize,
33    pub sort_column_index: usize,
34}
35
36struct ExecutorInner<S: StateStore> {
37    actor_ctx: ActorContextRef,
38
39    schema: Schema,
40    buffer_table: StateTable<S>,
41    chunk_size: usize,
42    sort_column_index: usize,
43}
44
45struct ExecutionVars<S: StateStore> {
46    buffer: SortBuffer<S>,
47}
48
49impl<S: StateStore> Execute for SortExecutor<S> {
50    fn execute(self: Box<Self>) -> BoxedMessageStream {
51        self.executor_inner().boxed()
52    }
53}
54
55impl<S: StateStore> SortExecutor<S> {
56    pub fn new(args: SortExecutorArgs<S>) -> Self {
57        Self {
58            input: args.input,
59            inner: ExecutorInner {
60                actor_ctx: args.actor_ctx,
61                schema: args.schema,
62                buffer_table: args.buffer_table,
63                chunk_size: args.chunk_size,
64                sort_column_index: args.sort_column_index,
65            },
66        }
67    }
68
69    #[try_stream(ok = Message, error = StreamExecutorError)]
70    async fn executor_inner(self) {
71        let Self {
72            input,
73            inner: mut this,
74        } = self;
75
76        let mut input = input.execute();
77
78        let barrier = expect_first_barrier(&mut input).await?;
79        let first_epoch = barrier.epoch;
80        yield Message::Barrier(barrier);
81        this.buffer_table.init_epoch(first_epoch).await?;
82
83        let mut vars = ExecutionVars {
84            buffer: SortBuffer::new(this.sort_column_index, &this.buffer_table),
85        };
86
87        // Populate the sort buffer cache on initialization.
88        vars.buffer.refill_cache(None, &this.buffer_table).await?;
89
90        #[for_await]
91        for msg in input {
92            match msg? {
93                Message::Watermark(watermark @ Watermark { col_idx, .. })
94                    if col_idx == this.sort_column_index =>
95                {
96                    let mut chunk_builder =
97                        StreamChunkBuilder::new(this.chunk_size, this.schema.data_types());
98
99                    #[for_await]
100                    for row in vars
101                        .buffer
102                        .consume(watermark.val.clone(), &mut this.buffer_table)
103                    {
104                        let row = row?;
105                        if let Some(chunk) = chunk_builder.append_row(Op::Insert, row) {
106                            yield Message::Chunk(chunk);
107                        }
108                    }
109                    if let Some(chunk) = chunk_builder.take() {
110                        yield Message::Chunk(chunk);
111                    }
112
113                    yield Message::Watermark(watermark);
114                }
115                Message::Watermark(_) => {
116                    // ignore watermarks on other columns
117                    continue;
118                }
119                Message::Chunk(chunk) => {
120                    vars.buffer.apply_chunk(chunk, &mut this.buffer_table);
121                    this.buffer_table.try_flush().await?;
122                }
123                Message::Barrier(barrier) => {
124                    let post_commit = this.buffer_table.commit(barrier.epoch).await?;
125                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(this.actor_ctx.id);
126                    yield Message::Barrier(barrier);
127
128                    // Update the vnode bitmap for state tables of all agg calls if asked.
129                    if post_commit
130                        .post_yield_barrier(update_vnode_bitmap)
131                        .await?
132                        .is_some()
133                    {
134                        // `SortBuffer` may output data directly from its in-memory cache without
135                        // checking current vnode ownership. Therefore, we must rebuild the cache
136                        // whenever the vnode bitmap is updated to avoid emitting rows that no
137                        // longer belong to this actor.
138                        vars.buffer.refill_cache(None, &this.buffer_table).await?;
139                    }
140                }
141            }
142        }
143    }
144}
145
#[cfg(test)]
mod tests {
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};

    /// Builds a `SortExecutor` on top of a mock source with two `Int64` columns (column 0 is
    /// the stream key) and returns the sender feeding the source together with the executor's
    /// output stream. The buffer table lives in `store`.
    async fn create_executor<S: StateStore>(
        sort_column_index: usize,
        store: S,
    ) -> (MessageSender, BoxedMessageStream) {
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int64), // pk
            Field::unnamed(DataType::Int64),
        ]);
        let stream_key = vec![0];

        // The state table mirrors the input schema. Its pk starts with the sort column so
        // that the table is ordered by the sort column first.
        let catalog = gen_pbtable(
            TableId::new(1),
            vec![
                ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
            ],
            vec![OrderType::ascending(), OrderType::ascending()],
            vec![sort_column_index, 0],
            0,
        );
        let buffer_table = StateTable::from_table_catalog(&catalog, store, None).await;

        let (sender, source) = MockSource::channel();
        let input = source.into_executor(schema, stream_key);
        let output_schema = input.schema().clone();
        let executor = SortExecutor::new(SortExecutorArgs {
            actor_ctx: ActorContext::for_test(123),
            schema: output_schema,
            input,
            buffer_table,
            chunk_size: 1024,
            sort_column_index,
        });
        (sender, executor.boxed().execute())
    }

    /// Rows are held back until a watermark arrives on the sort column; the emitted chunk
    /// contains only rows whose sort value is below the watermark, ordered by sort value.
    /// Watermarks on the other column produce no output.
    #[tokio::test]
    async fn test_sort_executor() {
        const SORT_COLUMN: usize = 1;

        let (mut sender, mut output) =
            create_executor(SORT_COLUMN, MemoryStateStore::new()).await;

        // Initial barrier, passed straight through.
        sender.push_barrier(test_epoch(1), false);
        output.expect_barrier().await;

        // Initial watermarks: only the one on the sort column comes out.
        sender.push_int64_watermark(0, 0_i64); // expected to be ignored
        sender.push_int64_watermark(SORT_COLUMN, 0_i64);
        output.expect_watermark().await;

        // First batch of rows.
        sender.push_chunk(StreamChunk::from_pretty(
            " I I
            + 1 1
            + 2 2
            + 3 6
            + 4 7",
        ));

        // Watermark 3: first on an irrelevant column, then on the sort column.
        sender.push_int64_watermark(0, 3_i64); // expected to be ignored
        sender.push_int64_watermark(SORT_COLUMN, 3_i64);

        // Only the rows with sort value 1 and 2 are released.
        assert_eq!(
            output.expect_chunk().await,
            StreamChunk::from_pretty(
                " I I
                + 1 1
                + 2 2"
            )
        );
        output.expect_watermark().await;

        // Second batch of rows, followed by a barrier.
        sender.push_chunk(StreamChunk::from_pretty(
            " I I
            + 98 4
            + 37 5
            + 60 8",
        ));
        sender.push_barrier(test_epoch(2), false);
        output.expect_barrier().await;

        // Watermark 7: first on an irrelevant column, then on the sort column.
        sender.push_int64_watermark(0, 7_i64); // expected to be ignored
        sender.push_int64_watermark(SORT_COLUMN, 7_i64);

        // Rows from both batches are released in sort-column order.
        assert_eq!(
            output.expect_chunk().await,
            StreamChunk::from_pretty(
                " I I
                + 98 4
                + 37 5
                + 3 6"
            )
        );
        output.expect_watermark().await;
    }

    /// Rows committed by one executor instance are still emitted by a second instance
    /// created over the same store.
    #[tokio::test]
    async fn test_sort_executor_fail_over() {
        const SORT_COLUMN: usize = 1;

        let store = MemoryStateStore::new();
        let (mut sender, mut output) = create_executor(SORT_COLUMN, store.clone()).await;

        // Initial barrier, passed straight through.
        sender.push_barrier(test_epoch(1), false);
        output.expect_barrier().await;

        // Initial watermarks: only the one on the sort column comes out.
        sender.push_int64_watermark(0, 0_i64); // expected to be ignored
        sender.push_int64_watermark(SORT_COLUMN, 0_i64);
        output.expect_watermark().await;

        // Buffer some rows and commit them with a barrier.
        sender.push_chunk(StreamChunk::from_pretty(
            " I I
            + 1 1
            + 2 2
            + 3 6
            + 4 7",
        ));
        sender.push_barrier(test_epoch(2), false);
        output.expect_barrier().await;

        // Mock fail over: a fresh executor over the same store.
        let (mut recovered_sender, mut recovered_output) =
            create_executor(SORT_COLUMN, store).await;

        // First barrier for the recovered executor.
        recovered_sender.push_barrier(test_epoch(2), false);
        recovered_output.expect_barrier().await;

        // A watermark on the sort column releases the rows written before the fail over.
        recovered_sender.push_int64_watermark(SORT_COLUMN, 3_i64);
        assert_eq!(
            recovered_output.expect_chunk().await,
            StreamChunk::from_pretty(
                " I I
                + 1 1
                + 2 2"
            )
        );
        recovered_output.expect_watermark().await;
    }
}