risingwave_batch_executors/executor/
merge_sort.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::mem;
16use std::sync::Arc;
17
18use futures_async_stream::try_stream;
19use futures_util::StreamExt;
20use itertools::Itertools;
21use risingwave_common::array::DataChunk;
22use risingwave_common::catalog::Schema;
23use risingwave_common::memory::{MemMonitoredHeap, MemoryContext, MonitoredGlobalAlloc};
24use risingwave_common::types::ToOwnedDatum;
25use risingwave_common::util::sort_util::{ColumnOrder, HeapElem};
26use risingwave_common_estimate_size::EstimateSize;
27
28use super::{BoxedDataChunkStream, BoxedExecutor, Executor};
29use crate::error::{BatchError, Result};
30
/// Batch executor that merges multiple *already sorted* input streams into a
/// single sorted output stream (k-way merge) using a min-heap.
pub struct MergeSortExecutor {
    /// Upstream executors; each is expected to yield rows already sorted by
    /// `column_orders` (assumed from the merge logic — not checked here).
    inputs: Vec<BoxedExecutor>,
    /// Sort keys shared with every `HeapElem` pushed onto `min_heap`.
    column_orders: Arc<Vec<ColumnOrder>>,
    /// Executor identity string reported via `Executor::identity`.
    identity: String,
    /// Output schema reported via `Executor::schema`; also used to build
    /// array builders for output chunks.
    schema: Schema,
    /// Maximum number of rows per output `DataChunk`.
    chunk_size: usize,
    /// Memory context charged with the estimated heap size of chunks held in
    /// `current_chunks`.
    mem_context: MemoryContext,
    /// Min-heap holding at most one pending row per input, ordered by
    /// `column_orders`.
    min_heap: MemMonitoredHeap<HeapElem>,
    /// Per-input chunk currently being consumed; `None` once that input is
    /// exhausted. Indexed by input position. Allocated via the monitored
    /// global allocator so its buffer is memory-tracked.
    current_chunks: Vec<Option<DataChunk>, MonitoredGlobalAlloc>,
}
41
impl Executor for MergeSortExecutor {
    /// Returns the output schema of the merged stream.
    fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Returns the identity string of this executor instance.
    fn identity(&self) -> &str {
        &self.identity
    }

    /// Consumes the executor and returns the merged, sorted chunk stream.
    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.do_execute()
    }
}
55
56impl MergeSortExecutor {
57    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
58    async fn do_execute(mut self: Box<Self>) {
59        let mut inputs = vec![];
60        mem::swap(&mut inputs, &mut self.inputs);
61        let mut input_streams = inputs
62            .into_iter()
63            .map(|input| input.execute())
64            .collect_vec();
65        for (input_idx, input_stream) in input_streams.iter_mut().enumerate() {
66            match input_stream.next().await {
67                Some(chunk) => {
68                    let chunk = chunk?;
69                    self.current_chunks.push(Some(chunk));
70                    if let Some(chunk) = &self.current_chunks[input_idx] {
71                        // We assume that we would always get a non-empty chunk from the upstream of
72                        // exchange, therefore we are sure that there is at least
73                        // one visible row.
74                        let next_row_idx = chunk.next_visible_row_idx(0);
75                        self.push_row_into_heap(input_idx, next_row_idx.unwrap());
76                    }
77                }
78                None => {
79                    self.current_chunks.push(None);
80                }
81            }
82        }
83
84        while !self.min_heap.is_empty() {
85            // It is possible that we cannot produce this much as
86            // we may run out of input data chunks from sources.
87            let mut want_to_produce = self.chunk_size;
88
89            let mut builders: Vec<_> = self
90                .schema
91                .fields
92                .iter()
93                .map(|field| field.data_type.create_array_builder(self.chunk_size))
94                .collect();
95            let mut array_len = 0;
96            while want_to_produce > 0 && !self.min_heap.is_empty() {
97                let top_elem = self.min_heap.pop().unwrap();
98                let child_idx = top_elem.chunk_idx();
99                let cur_chunk = top_elem.chunk();
100                let row_idx = top_elem.elem_idx();
101                for (idx, builder) in builders.iter_mut().enumerate() {
102                    let chunk_arr = cur_chunk.column_at(idx);
103                    let chunk_arr = chunk_arr.as_ref();
104                    let datum = chunk_arr.value_at(row_idx).to_owned_datum();
105                    builder.append(&datum);
106                }
107                want_to_produce -= 1;
108                array_len += 1;
109                // check whether we have another row from the same chunk being popped
110                let possible_next_row_idx = cur_chunk.next_visible_row_idx(row_idx + 1);
111                match possible_next_row_idx {
112                    Some(next_row_idx) => {
113                        self.push_row_into_heap(child_idx, next_row_idx);
114                    }
115                    None => {
116                        self.get_input_chunk(&mut input_streams, child_idx).await?;
117                        if let Some(chunk) = &self.current_chunks[child_idx] {
118                            let next_row_idx = chunk.next_visible_row_idx(0);
119                            self.push_row_into_heap(child_idx, next_row_idx.unwrap());
120                        }
121                    }
122                }
123            }
124
125            let columns = builders
126                .into_iter()
127                .map(|builder| builder.finish().into())
128                .collect::<Vec<_>>();
129            let chunk = DataChunk::new(columns, array_len);
130            yield chunk
131        }
132    }
133
134    async fn get_input_chunk(
135        &mut self,
136        input_streams: &mut Vec<BoxedDataChunkStream>,
137        input_idx: usize,
138    ) -> Result<()> {
139        assert!(input_idx < input_streams.len());
140        let res = input_streams[input_idx].next().await;
141        let old = match res {
142            Some(chunk) => {
143                let chunk = chunk?;
144                assert_ne!(chunk.cardinality(), 0);
145                let new_chunk_size = chunk.estimated_heap_size() as i64;
146                let old = self.current_chunks[input_idx].replace(chunk);
147                self.mem_context.add(new_chunk_size);
148                old
149            }
150            None => std::mem::take(&mut self.current_chunks[input_idx]),
151        };
152
153        if let Some(chunk) = old {
154            // Reduce the heap size of retired chunk
155            self.mem_context.add(-(chunk.estimated_heap_size() as i64));
156        }
157
158        Ok(())
159    }
160
161    fn push_row_into_heap(&mut self, input_idx: usize, row_idx: usize) {
162        assert!(input_idx < self.current_chunks.len());
163        let chunk_ref = self.current_chunks[input_idx].as_ref().unwrap();
164        self.min_heap.push(HeapElem::new(
165            self.column_orders.clone(),
166            chunk_ref.clone(),
167            input_idx,
168            row_idx,
169            None,
170        ));
171    }
172}
173
174impl MergeSortExecutor {
175    pub fn new(
176        inputs: Vec<BoxedExecutor>,
177        column_orders: Arc<Vec<ColumnOrder>>,
178        schema: Schema,
179        identity: String,
180        chunk_size: usize,
181        mem_context: MemoryContext,
182    ) -> Self {
183        let inputs_num = inputs.len();
184        Self {
185            inputs,
186            column_orders,
187            identity,
188            schema,
189            chunk_size,
190            min_heap: MemMonitoredHeap::with_capacity(inputs_num, mem_context.clone()),
191            current_chunks: Vec::with_capacity_in(inputs_num, mem_context.global_allocator()),
192            mem_context,
193        }
194    }
195}