risingwave_stream/executor/eowc/
sort.rs1use risingwave_common::array::Op;
16
17use super::sort_buffer::SortBuffer;
18use crate::executor::prelude::*;
19
/// Executor that buffers incoming rows in a [`SortBuffer`] backed by a state table and
/// releases them downstream when a watermark arrives on the sort column.
pub struct SortExecutor<S: StateStore> {
    /// Upstream executor whose message stream is consumed.
    input: Executor,
    /// Configuration and state table used by the execution loop.
    inner: ExecutorInner<S>,
}
24
/// Construction arguments for [`SortExecutor::new`].
pub struct SortExecutorArgs<S: StateStore> {
    /// Actor context; its `id` is used to read vnode bitmap updates carried by barriers.
    pub actor_ctx: ActorContextRef,

    /// Upstream executor.
    pub input: Executor,

    /// Output schema; its data types configure the output chunk builder.
    pub schema: Schema,
    /// State table backing the sort buffer; committed on every barrier.
    pub buffer_table: StateTable<S>,
    /// Capacity passed to `StreamChunkBuilder` when building output chunks.
    pub chunk_size: usize,
    /// Index of the column whose watermarks trigger emission of buffered rows.
    pub sort_column_index: usize,
}
35
/// Everything from [`SortExecutorArgs`] except the input, owned by the execution loop.
struct ExecutorInner<S: StateStore> {
    /// Actor context; its `id` selects this actor's vnode bitmap update from a barrier.
    actor_ctx: ActorContextRef,

    /// Output schema; its data types configure the output chunk builder.
    schema: Schema,
    /// State table backing the sort buffer.
    buffer_table: StateTable<S>,
    /// Capacity passed to `StreamChunkBuilder` for output chunks.
    chunk_size: usize,
    /// Index of the column whose watermarks trigger emission.
    sort_column_index: usize,
}
44
/// Mutable per-run state, created inside `executor_inner` after the first barrier.
struct ExecutionVars<S: StateStore> {
    /// In-memory sort buffer layered over `ExecutorInner::buffer_table`.
    buffer: SortBuffer<S>,
}
48
49impl<S: StateStore> Execute for SortExecutor<S> {
50 fn execute(self: Box<Self>) -> BoxedMessageStream {
51 self.executor_inner().boxed()
52 }
53}
54
impl<S: StateStore> SortExecutor<S> {
    /// Builds the executor from [`SortExecutorArgs`], moving every argument except `input`
    /// into the inner configuration.
    pub fn new(args: SortExecutorArgs<S>) -> Self {
        Self {
            input: args.input,
            inner: ExecutorInner {
                actor_ctx: args.actor_ctx,
                schema: args.schema,
                buffer_table: args.buffer_table,
                chunk_size: args.chunk_size,
                sort_column_index: args.sort_column_index,
            },
        }
    }

    /// Drives the executor, turning the input message stream into the output stream.
    ///
    /// Message handling:
    /// - `Chunk`: rows are applied to the sort buffer and its state table, then
    ///   `try_flush` is called on the table.
    /// - `Watermark` on `sort_column_index`: rows released by `SortBuffer::consume` for the
    ///   watermark value are emitted as `Insert` rows, batched by a `StreamChunkBuilder`
    ///   configured with `chunk_size`; the watermark itself is forwarded afterwards.
    /// - `Watermark` on any other column: dropped (not forwarded).
    /// - `Barrier`: the state table is committed, the barrier is forwarded, and the sort
    ///   buffer cache is refilled when the post-commit step reports it may be stale.
    ///
    /// # Errors
    ///
    /// Propagates errors from the input stream, from the first-barrier check, and from
    /// state-table / sort-buffer operations.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn executor_inner(self) {
        let Self {
            input,
            inner: mut this,
        } = self;

        let mut input = input.execute();

        // The first input message is expected to be a barrier. It is forwarded downstream
        // before the state table is initialized with its epoch.
        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
        this.buffer_table.init_epoch(first_epoch).await?;

        let mut vars = ExecutionVars {
            buffer: SortBuffer::new(this.sort_column_index, &this.buffer_table),
        };

        // Load the sort buffer cache from the state table, so that rows persisted by a
        // previous run (see the fail-over test) are visible to this one.
        vars.buffer.refill_cache(None, &this.buffer_table).await?;

        #[for_await]
        for msg in input {
            match msg? {
                Message::Watermark(watermark @ Watermark { col_idx, .. })
                    if col_idx == this.sort_column_index =>
                {
                    let mut chunk_builder =
                        StreamChunkBuilder::new(this.chunk_size, this.schema.data_types());

                    // Stream out every row the buffer releases for this watermark value,
                    // yielding a chunk whenever the builder hands one back.
                    #[for_await]
                    for row in vars
                        .buffer
                        .consume(watermark.val.clone(), &mut this.buffer_table)
                    {
                        let row = row?;
                        if let Some(chunk) = chunk_builder.append_row(Op::Insert, row) {
                            yield Message::Chunk(chunk);
                        }
                    }
                    // Emit whatever is still held by the builder.
                    if let Some(chunk) = chunk_builder.take() {
                        yield Message::Chunk(chunk);
                    }

                    // Forward the watermark only after the released rows.
                    yield Message::Watermark(watermark);
                }
                Message::Watermark(_) => {
                    // Watermarks on columns other than the sort column are not forwarded.
                    continue;
                }
                Message::Chunk(chunk) => {
                    vars.buffer.apply_chunk(chunk, &mut this.buffer_table);
                    this.buffer_table.try_flush().await?;
                }
                Message::Barrier(barrier) => {
                    // Commit the state table for this barrier's epoch before forwarding it.
                    let post_commit = this.buffer_table.commit(barrier.epoch).await?;
                    // Read this actor's vnode bitmap update (if any) now, because the barrier
                    // is moved into the yielded message below.
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(this.actor_ctx.id);
                    yield Message::Barrier(barrier);

                    // `post_yield_barrier` receives the vnode bitmap update after the barrier
                    // has been yielded; when it returns `Some`, the flag says whether the
                    // buffer cache may be stale.
                    if let Some((_, cache_may_stale)) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                    {
                        // Rebuild the cache from the state table when it may be stale
                        // (presumably after a vnode ownership change).
                        if cache_may_stale {
                            vars.buffer.refill_cache(None, &this.buffer_table).await?;
                        }
                    }
                }
            }
        }
    }
}
142
#[cfg(test)]
mod tests {
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};

    /// Pushes an `Int64` watermark with value `val` on column 0 and then on the sort column.
    fn push_watermark_pair(sender: &mut MessageSender, sort_column_index: usize, val: i64) {
        sender.push_int64_watermark(0, val);
        sender.push_int64_watermark(sort_column_index, val);
    }

    /// Builds a `SortExecutor` over a mock two-column `Int64` source. The buffer table's
    /// primary key is `(sort column, column 0)`, both ascending.
    ///
    /// Returns the sender that feeds the mock source and the executor's output stream.
    async fn create_executor<S: StateStore>(
        sort_column_index: usize,
        store: S,
    ) -> (MessageSender, BoxedMessageStream) {
        let input_schema = Schema::new(vec![Field::unnamed(DataType::Int64); 2]);
        let input_pk_indices = vec![0];

        let table_columns: Vec<ColumnDesc> = (0..2)
            .map(|id| ColumnDesc::unnamed(ColumnId::new(id), DataType::Int64))
            .collect();
        let table_pk_indices = vec![sort_column_index, 0];
        let table_order_types = vec![OrderType::ascending(); 2];

        let table_catalog = gen_pbtable(
            TableId::new(1),
            table_columns,
            table_order_types,
            table_pk_indices,
            0,
        );
        let buffer_table = StateTable::from_table_catalog(&table_catalog, store, None).await;

        let (sender, mock_source) = MockSource::channel();
        let input = mock_source.into_executor(input_schema, input_pk_indices);
        let schema = input.schema().clone();

        let executor = SortExecutor::new(SortExecutorArgs {
            actor_ctx: ActorContext::for_test(123),
            schema,
            input,
            buffer_table,
            chunk_size: 1024,
            sort_column_index,
        });
        (sender, executor.boxed().execute())
    }

    #[tokio::test]
    async fn test_sort_executor() {
        let sort_column_index = 1;
        let (mut sender, mut output) =
            create_executor(sort_column_index, MemoryStateStore::new()).await;

        sender.push_barrier(test_epoch(1), false);
        output.expect_barrier().await;

        // Only the watermark on the sort column comes out.
        push_watermark_pair(&mut sender, sort_column_index, 0);
        output.expect_watermark().await;

        sender.push_chunk(StreamChunk::from_pretty(
            " I I
            + 1 1
            + 2 2
            + 3 6
            + 4 7",
        ));

        push_watermark_pair(&mut sender, sort_column_index, 3);
        let expected = StreamChunk::from_pretty(
            " I I
            + 1 1
            + 2 2",
        );
        assert_eq!(output.expect_chunk().await, expected);
        output.expect_watermark().await;

        sender.push_chunk(StreamChunk::from_pretty(
            " I I
            + 98 4
            + 37 5
            + 60 8",
        ));

        sender.push_barrier(test_epoch(2), false);
        output.expect_barrier().await;

        push_watermark_pair(&mut sender, sort_column_index, 7);
        let expected = StreamChunk::from_pretty(
            " I I
            + 98 4
            + 37 5
            + 3 6",
        );
        assert_eq!(output.expect_chunk().await, expected);
        output.expect_watermark().await;
    }

    #[tokio::test]
    async fn test_sort_executor_fail_over() {
        let sort_column_index = 1;
        let store = MemoryStateStore::new();

        let (mut sender, mut output) = create_executor(sort_column_index, store.clone()).await;

        sender.push_barrier(test_epoch(1), false);
        output.expect_barrier().await;

        push_watermark_pair(&mut sender, sort_column_index, 0);
        output.expect_watermark().await;

        sender.push_chunk(StreamChunk::from_pretty(
            " I I
            + 1 1
            + 2 2
            + 3 6
            + 4 7",
        ));

        // Commit the buffered rows so that a fresh executor can see them.
        sender.push_barrier(test_epoch(2), false);
        output.expect_barrier().await;

        // Simulate recovery: a brand-new executor over the same store.
        let (mut recovered_sender, mut recovered_output) =
            create_executor(sort_column_index, store).await;

        recovered_sender.push_barrier(test_epoch(2), false);
        recovered_output.expect_barrier().await;

        recovered_sender.push_int64_watermark(sort_column_index, 3_i64);
        let expected = StreamChunk::from_pretty(
            " I I
            + 1 1
            + 2 2",
        );
        assert_eq!(recovered_output.expect_chunk().await, expected);
        recovered_output.expect_watermark().await;
    }
}