risingwave_stream/executor/eowc/
sort.rs1use risingwave_common::array::Op;
16
17use super::sort_buffer::SortBuffer;
18use crate::executor::prelude::*;
19
20pub struct SortExecutor<S: StateStore> {
21 input: Executor,
22 inner: ExecutorInner<S>,
23}
24
25pub struct SortExecutorArgs<S: StateStore> {
26 pub actor_ctx: ActorContextRef,
27
28 pub input: Executor,
29
30 pub schema: Schema,
31 pub buffer_table: StateTable<S>,
32 pub chunk_size: usize,
33 pub sort_column_index: usize,
34}
35
36struct ExecutorInner<S: StateStore> {
37 actor_ctx: ActorContextRef,
38
39 schema: Schema,
40 buffer_table: StateTable<S>,
41 chunk_size: usize,
42 sort_column_index: usize,
43}
44
45struct ExecutionVars<S: StateStore> {
46 buffer: SortBuffer<S>,
47}
48
49impl<S: StateStore> Execute for SortExecutor<S> {
50 fn execute(self: Box<Self>) -> BoxedMessageStream {
51 self.executor_inner().boxed()
52 }
53}
54
55impl<S: StateStore> SortExecutor<S> {
56 pub fn new(args: SortExecutorArgs<S>) -> Self {
57 Self {
58 input: args.input,
59 inner: ExecutorInner {
60 actor_ctx: args.actor_ctx,
61 schema: args.schema,
62 buffer_table: args.buffer_table,
63 chunk_size: args.chunk_size,
64 sort_column_index: args.sort_column_index,
65 },
66 }
67 }
68
69 #[try_stream(ok = Message, error = StreamExecutorError)]
70 async fn executor_inner(self) {
71 let Self {
72 input,
73 inner: mut this,
74 } = self;
75
76 let mut input = input.execute();
77
78 let barrier = expect_first_barrier(&mut input).await?;
79 let first_epoch = barrier.epoch;
80 yield Message::Barrier(barrier);
81 this.buffer_table.init_epoch(first_epoch).await?;
82
83 let mut vars = ExecutionVars {
84 buffer: SortBuffer::new(this.sort_column_index, &this.buffer_table),
85 };
86
87 vars.buffer.refill_cache(None, &this.buffer_table).await?;
89
90 #[for_await]
91 for msg in input {
92 match msg? {
93 Message::Watermark(watermark @ Watermark { col_idx, .. })
94 if col_idx == this.sort_column_index =>
95 {
96 let mut chunk_builder =
97 StreamChunkBuilder::new(this.chunk_size, this.schema.data_types());
98
99 #[for_await]
100 for row in vars
101 .buffer
102 .consume(watermark.val.clone(), &mut this.buffer_table)
103 {
104 let row = row?;
105 if let Some(chunk) = chunk_builder.append_row(Op::Insert, row) {
106 yield Message::Chunk(chunk);
107 }
108 }
109 if let Some(chunk) = chunk_builder.take() {
110 yield Message::Chunk(chunk);
111 }
112
113 yield Message::Watermark(watermark);
114 }
115 Message::Watermark(_) => {
116 continue;
118 }
119 Message::Chunk(chunk) => {
120 vars.buffer.apply_chunk(chunk, &mut this.buffer_table);
121 this.buffer_table.try_flush().await?;
122 }
123 Message::Barrier(barrier) => {
124 let post_commit = this.buffer_table.commit(barrier.epoch).await?;
125 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(this.actor_ctx.id);
126 yield Message::Barrier(barrier);
127
128 if post_commit
130 .post_yield_barrier(update_vnode_bitmap)
131 .await?
132 .is_some()
133 {
134 vars.buffer.refill_cache(None, &this.buffer_table).await?;
139 }
140 }
141 }
142 }
143 }
144}
145
#[cfg(test)]
mod tests {
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};

    /// Builds a `SortExecutor` over a two-column `Int64` input driven by a mock source.
    ///
    /// The buffer table has the same two `Int64` columns and is keyed by
    /// `(sort_column_index, 0)`, both ascending. Returns the sender that feeds the input and the
    /// executor's output stream.
    async fn create_executor<S: StateStore>(
        sort_column_index: usize,
        store: S,
    ) -> (MessageSender, BoxedMessageStream) {
        let input_schema = Schema::new(vec![
            Field::unnamed(DataType::Int64),
            Field::unnamed(DataType::Int64),
        ]);
        let input_stream_key = vec![0];

        let table_columns: Vec<ColumnDesc> = (0..2)
            .map(|id| ColumnDesc::unnamed(ColumnId::new(id), DataType::Int64))
            .collect();
        let table_pk_indices = vec![sort_column_index, 0];
        let table_order_types = vec![OrderType::ascending(); 2];

        let table_catalog = gen_pbtable(
            TableId::new(1),
            table_columns,
            table_order_types,
            table_pk_indices,
            0,
        );
        let buffer_table = StateTable::from_table_catalog(&table_catalog, store, None).await;

        let (tx, source) = MockSource::channel();
        let source = source.into_executor(input_schema, input_stream_key);
        let schema = source.schema().clone();
        let sort_executor = SortExecutor::new(SortExecutorArgs {
            actor_ctx: ActorContext::for_test(123),
            input: source,
            schema,
            buffer_table,
            chunk_size: 1024,
            sort_column_index,
        });
        (tx, sort_executor.boxed().execute())
    }

    /// Pushes a watermark on column 0 (which the executor is expected to drop) followed by one
    /// with the same value on the sort column.
    fn push_watermarks(tx: &mut MessageSender, sort_column_index: usize, val: i64) {
        tx.push_int64_watermark(0, val);
        tx.push_int64_watermark(sort_column_index, val);
    }

    #[tokio::test]
    async fn test_sort_executor() {
        let sort_column_index = 1;

        let store = MemoryStateStore::new();
        let (mut tx, mut sort_executor) = create_executor(sort_column_index, store).await;

        tx.push_barrier(test_epoch(1), false);
        sort_executor.expect_barrier().await;

        // Nothing is buffered yet: only the sort-column watermark is forwarded.
        push_watermarks(&mut tx, sort_column_index, 0);
        sort_executor.expect_watermark().await;

        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 1 1
            + 2 2
            + 3 6
            + 4 7",
        ));

        // Watermark 3 releases only the rows whose sort key is below 3.
        push_watermarks(&mut tx, sort_column_index, 3);
        let released = sort_executor.expect_chunk().await;
        assert_eq!(
            released,
            StreamChunk::from_pretty(
                " I I
                + 1 1
                + 2 2"
            )
        );
        sort_executor.expect_watermark().await;

        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 98 4
            + 37 5
            + 60 8",
        ));

        tx.push_barrier(test_epoch(2), false);
        sort_executor.expect_barrier().await;

        // Watermark 7 releases rows from both chunks, ordered by sort key.
        push_watermarks(&mut tx, sort_column_index, 7);
        let released = sort_executor.expect_chunk().await;
        assert_eq!(
            released,
            StreamChunk::from_pretty(
                " I I
                + 98 4
                + 37 5
                + 3 6"
            )
        );
        sort_executor.expect_watermark().await;
    }

    #[tokio::test]
    async fn test_sort_executor_fail_over() {
        let sort_column_index = 1;

        let store = MemoryStateStore::new();
        let (mut tx, mut sort_executor) = create_executor(sort_column_index, store.clone()).await;

        tx.push_barrier(test_epoch(1), false);
        sort_executor.expect_barrier().await;

        push_watermarks(&mut tx, sort_column_index, 0);
        sort_executor.expect_watermark().await;

        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 1 1
            + 2 2
            + 3 6
            + 4 7",
        ));

        // The barrier makes the executor commit its buffer table at epoch 2.
        tx.push_barrier(test_epoch(2), false);
        sort_executor.expect_barrier().await;

        // A fresh executor over the same store must see the rows buffered by the first one.
        let (mut recovered_tx, mut recovered_sort_executor) =
            create_executor(sort_column_index, store).await;

        recovered_tx.push_barrier(test_epoch(2), false);
        recovered_sort_executor.expect_barrier().await;

        recovered_tx.push_int64_watermark(sort_column_index, 3_i64);
        let released = recovered_sort_executor.expect_chunk().await;
        assert_eq!(
            released,
            StreamChunk::from_pretty(
                " I I
                + 1 1
                + 2 2"
            )
        );
        recovered_sort_executor.expect_watermark().await;
    }
}