risingwave_batch_executors/executor/
merge_sort.rs1use std::mem;
16use std::sync::Arc;
17
18use futures_async_stream::try_stream;
19use futures_util::StreamExt;
20use itertools::Itertools;
21use risingwave_common::array::DataChunk;
22use risingwave_common::catalog::Schema;
23use risingwave_common::memory::{MemMonitoredHeap, MemoryContext, MonitoredGlobalAlloc};
24use risingwave_common::types::ToOwnedDatum;
25use risingwave_common::util::sort_util::{ColumnOrder, HeapElem};
26use risingwave_common_estimate_size::EstimateSize;
27
28use super::{BoxedDataChunkStream, BoxedExecutor, Executor};
29use crate::error::{BatchError, Result};
30
31pub struct MergeSortExecutor {
32 inputs: Vec<BoxedExecutor>,
33 column_orders: Arc<Vec<ColumnOrder>>,
34 identity: String,
35 schema: Schema,
36 chunk_size: usize,
37 mem_context: MemoryContext,
38 min_heap: MemMonitoredHeap<HeapElem>,
39 current_chunks: Vec<Option<DataChunk>, MonitoredGlobalAlloc>,
40}
41
42impl Executor for MergeSortExecutor {
43 fn schema(&self) -> &Schema {
44 &self.schema
45 }
46
47 fn identity(&self) -> &str {
48 &self.identity
49 }
50
51 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
52 self.do_execute()
53 }
54}
55
56impl MergeSortExecutor {
57 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
58 async fn do_execute(mut self: Box<Self>) {
59 let mut inputs = vec![];
60 mem::swap(&mut inputs, &mut self.inputs);
61 let mut input_streams = inputs
62 .into_iter()
63 .map(|input| input.execute())
64 .collect_vec();
65 for (input_idx, input_stream) in input_streams.iter_mut().enumerate() {
66 match input_stream.next().await {
67 Some(chunk) => {
68 let chunk = chunk?;
69 self.current_chunks.push(Some(chunk));
70 if let Some(chunk) = &self.current_chunks[input_idx] {
71 let next_row_idx = chunk.next_visible_row_idx(0);
75 self.push_row_into_heap(input_idx, next_row_idx.unwrap());
76 }
77 }
78 None => {
79 self.current_chunks.push(None);
80 }
81 }
82 }
83
84 while !self.min_heap.is_empty() {
85 let mut want_to_produce = self.chunk_size;
88
89 let mut builders: Vec<_> = self
90 .schema
91 .fields
92 .iter()
93 .map(|field| field.data_type.create_array_builder(self.chunk_size))
94 .collect();
95 let mut array_len = 0;
96 while want_to_produce > 0 && !self.min_heap.is_empty() {
97 let top_elem = self.min_heap.pop().unwrap();
98 let child_idx = top_elem.chunk_idx();
99 let cur_chunk = top_elem.chunk();
100 let row_idx = top_elem.elem_idx();
101 for (idx, builder) in builders.iter_mut().enumerate() {
102 let chunk_arr = cur_chunk.column_at(idx);
103 let chunk_arr = chunk_arr.as_ref();
104 let datum = chunk_arr.value_at(row_idx).to_owned_datum();
105 builder.append(&datum);
106 }
107 want_to_produce -= 1;
108 array_len += 1;
109 let possible_next_row_idx = cur_chunk.next_visible_row_idx(row_idx + 1);
111 match possible_next_row_idx {
112 Some(next_row_idx) => {
113 self.push_row_into_heap(child_idx, next_row_idx);
114 }
115 None => {
116 self.get_input_chunk(&mut input_streams, child_idx).await?;
117 if let Some(chunk) = &self.current_chunks[child_idx] {
118 let next_row_idx = chunk.next_visible_row_idx(0);
119 self.push_row_into_heap(child_idx, next_row_idx.unwrap());
120 }
121 }
122 }
123 }
124
125 let columns = builders
126 .into_iter()
127 .map(|builder| builder.finish().into())
128 .collect::<Vec<_>>();
129 let chunk = DataChunk::new(columns, array_len);
130 yield chunk
131 }
132 }
133
134 async fn get_input_chunk(
135 &mut self,
136 input_streams: &mut Vec<BoxedDataChunkStream>,
137 input_idx: usize,
138 ) -> Result<()> {
139 assert!(input_idx < input_streams.len());
140 let res = input_streams[input_idx].next().await;
141 let old = match res {
142 Some(chunk) => {
143 let chunk = chunk?;
144 assert_ne!(chunk.cardinality(), 0);
145 let new_chunk_size = chunk.estimated_heap_size() as i64;
146 let old = self.current_chunks[input_idx].replace(chunk);
147 self.mem_context.add(new_chunk_size);
148 old
149 }
150 None => std::mem::take(&mut self.current_chunks[input_idx]),
151 };
152
153 if let Some(chunk) = old {
154 self.mem_context.add(-(chunk.estimated_heap_size() as i64));
156 }
157
158 Ok(())
159 }
160
161 fn push_row_into_heap(&mut self, input_idx: usize, row_idx: usize) {
162 assert!(input_idx < self.current_chunks.len());
163 let chunk_ref = self.current_chunks[input_idx].as_ref().unwrap();
164 self.min_heap.push(HeapElem::new(
165 self.column_orders.clone(),
166 chunk_ref.clone(),
167 input_idx,
168 row_idx,
169 None,
170 ));
171 }
172}
173
174impl MergeSortExecutor {
175 pub fn new(
176 inputs: Vec<BoxedExecutor>,
177 column_orders: Arc<Vec<ColumnOrder>>,
178 schema: Schema,
179 identity: String,
180 chunk_size: usize,
181 mem_context: MemoryContext,
182 ) -> Self {
183 let inputs_num = inputs.len();
184 Self {
185 inputs,
186 column_orders,
187 identity,
188 schema,
189 chunk_size,
190 min_heap: MemMonitoredHeap::with_capacity(inputs_num, mem_context.clone()),
191 current_chunks: Vec::with_capacity_in(inputs_num, mem_context.global_allocator()),
192 mem_context,
193 }
194 }
195}