risingwave_batch_executors/executor/
delete.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures_async_stream::try_stream;
16use itertools::Itertools;
17use risingwave_common::array::{
18    Array, ArrayBuilder, DataChunk, Op, PrimitiveArrayBuilder, StreamChunk,
19};
20use risingwave_common::catalog::{Field, Schema, TableId, TableVersionId};
21use risingwave_common::transaction::transaction_id::TxnId;
22use risingwave_common::types::DataType;
23use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
24use risingwave_dml::dml_manager::DmlManagerRef;
25use risingwave_pb::batch_plan::plan_node::NodeBody;
26
27use crate::error::{BatchError, Result};
28use crate::executor::{
29    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
30};
31
/// [`DeleteExecutor`] implements table deletion with values from its child executor.
// Note: multiple `DELETE`s in a single epoch, or concurrent `DELETE`s may lead to conflicting
// records. This is validated and filtered on the first `Materialize`.
pub struct DeleteExecutor {
    /// Target table id.
    table_id: TableId,
    /// Version of the target table, passed along when acquiring the DML handle.
    table_version_id: TableVersionId,
    /// DML manager used to obtain the write channel for the target table.
    dml_manager: DmlManagerRef,
    /// Child executor producing the rows to be deleted.
    child: BoxedExecutor,
    // NOTE(review): currently unused — `do_execute` builds chunks with a
    // hard-coded size of 1024 instead of this value; consider unifying.
    #[expect(dead_code)]
    chunk_size: usize,
    /// Output schema: the child's schema when `returning`, otherwise a single
    /// `Int64` column carrying the affected-row count.
    schema: Schema,
    /// Identity string reported through [`Executor::identity`].
    identity: String,
    /// Whether to yield the deleted rows back (`DELETE ... RETURNING`).
    returning: bool,
    /// Transaction id generated at construction; shared by all writes of this executor.
    txn_id: TxnId,
    /// Id of the issuing session, forwarded to the write handle.
    session_id: u32,
}
49
50impl DeleteExecutor {
51    pub fn new(
52        table_id: TableId,
53        table_version_id: TableVersionId,
54        dml_manager: DmlManagerRef,
55        child: BoxedExecutor,
56        chunk_size: usize,
57        identity: String,
58        returning: bool,
59        session_id: u32,
60    ) -> Self {
61        let table_schema = child.schema().clone();
62        let txn_id = dml_manager.gen_txn_id();
63        Self {
64            table_id,
65            table_version_id,
66            dml_manager,
67            child,
68            chunk_size,
69            schema: if returning {
70                table_schema
71            } else {
72                Schema {
73                    fields: vec![Field::unnamed(DataType::Int64)],
74                }
75            },
76            identity,
77            returning,
78            txn_id,
79            session_id,
80        }
81    }
82}
83
84impl Executor for DeleteExecutor {
85    fn schema(&self) -> &Schema {
86        &self.schema
87    }
88
89    fn identity(&self) -> &str {
90        &self.identity
91    }
92
93    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
94        self.do_execute()
95    }
96}
97
impl DeleteExecutor {
    /// Drives the deletion: streams chunks from the child, writes them to the
    /// table's DML channel as `Delete` ops inside a single transaction
    /// (`begin` .. `end`), and finally yields either the deleted rows
    /// themselves (when `returning`) or a one-row chunk with the delete count.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        let data_types = self.child.schema().data_types();
        // NOTE(review): chunk size hard-coded to 1024 while `self.chunk_size`
        // is `#[expect(dead_code)]` — consider unifying; TODO confirm intent.
        let mut builder = DataChunkBuilder::new(data_types, 1024);

        let table_dml_handle = self
            .dml_manager
            .table_dml_handle(self.table_id, self.table_version_id)?;
        // Sanity-check that the child's output matches the table's visible
        // (non-generated) columns; a mismatch here is a planner bug.
        assert_eq!(
            table_dml_handle
                .column_descs()
                .iter()
                .filter_map(|c| (!c.is_generated()).then_some(c.data_type.clone()))
                .collect_vec(),
            self.child.schema().data_types(),
            "bad delete schema"
        );
        let mut write_handle = table_dml_handle.write_handle(self.session_id, self.txn_id)?;

        write_handle.begin()?;

        // Transform the data chunk to a stream chunk, then write to the source.
        // The closure mutably captures `write_handle`, so it must not be alive
        // when `write_handle.end()` is called below.
        let write_txn_data = |chunk: DataChunk| async {
            let cap = chunk.capacity();
            // Every row in the chunk becomes a `Delete` op.
            let stream_chunk = StreamChunk::from_parts(vec![Op::Delete; cap], chunk);

            #[cfg(debug_assertions)]
            table_dml_handle.check_chunk_schema(&stream_chunk);

            // Count visible rows only (cardinality, not capacity).
            let cardinality = stream_chunk.cardinality();
            write_handle.write_chunk(stream_chunk).await?;

            Result::Ok(cardinality)
        };

        let mut rows_deleted = 0;

        #[for_await]
        for data_chunk in self.child.execute() {
            let data_chunk = data_chunk?;
            // For `RETURNING`, echo the rows back before writing them out.
            if self.returning {
                yield data_chunk.clone();
            }
            // The builder coalesces input into fixed-size chunks; it may emit
            // zero or more full chunks per append.
            for chunk in builder.append_chunk(data_chunk) {
                rows_deleted += write_txn_data(chunk).await?;
            }
        }

        // Flush whatever remains in the builder after the child is exhausted.
        if let Some(chunk) = builder.consume_all() {
            rows_deleted += write_txn_data(chunk).await?;
        }

        write_handle.end().await?;

        // create ret value
        if !self.returning {
            let mut array_builder = PrimitiveArrayBuilder::<i64>::new(1);
            array_builder.append(Some(rows_deleted as i64));

            let array = array_builder.finish();
            let ret_chunk = DataChunk::new(vec![array.into_ref()], 1);

            yield ret_chunk
        }
    }
}
165
166impl BoxedExecutorBuilder for DeleteExecutor {
167    async fn new_boxed_executor(
168        source: &ExecutorBuilder<'_>,
169        inputs: Vec<BoxedExecutor>,
170    ) -> Result<BoxedExecutor> {
171        let [child]: [_; 1] = inputs.try_into().unwrap();
172        let delete_node = try_match_expand!(
173            source.plan_node().get_node_body().unwrap(),
174            NodeBody::Delete
175        )?;
176
177        let table_id = TableId::new(delete_node.table_id);
178
179        Ok(Box::new(Self::new(
180            table_id,
181            delete_node.table_version_id,
182            source.context().dml_manager(),
183            child,
184            source.context().get_config().developer.chunk_size,
185            source.plan_node().get_identity().clone(),
186            delete_node.returning,
187            delete_node.session_id,
188        )))
189    }
190}
191
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures::StreamExt;
    use risingwave_common::catalog::{
        ColumnDesc, ColumnId, INITIAL_TABLE_VERSION_ID, schema_test_utils,
    };
    use risingwave_common::test_prelude::DataChunkTestExt;
    use risingwave_dml::dml_manager::DmlManager;

    use super::*;
    use crate::executor::test_utils::MockExecutor;
    use crate::*;

    /// End-to-end test: runs a `DeleteExecutor` over a mock child and checks
    /// both the row-count result and the `Delete` stream chunk observed on the
    /// DML reader side.
    #[tokio::test]
    async fn test_delete_executor() -> Result<()> {
        let dml_manager = Arc::new(DmlManager::for_test());

        // Schema for mock executor.
        let schema = schema_test_utils::ii();
        let mut mock_executor = MockExecutor::new(schema.clone());

        // Schema of the table
        let schema = schema_test_utils::ii();

        mock_executor.add(DataChunk::from_pretty(
            "i  i
             1  2
             3  4
             5  6
             7  8
             9 10",
        ));

        // Create reader
        let table_id = TableId::new(0);
        let column_descs = schema
            .fields
            .iter()
            .enumerate()
            .map(|(i, field)| ColumnDesc::unnamed(ColumnId::new(i as _), field.data_type.clone()))
            .collect_vec();
        // We must create a variable to hold this `Arc<TableDmlHandle>` here, or it will be dropped
        // due to the `Weak` reference in `DmlManager`.
        let reader = dml_manager
            .register_reader(table_id, INITIAL_TABLE_VERSION_ID, &column_descs)
            .unwrap();
        let mut reader = reader.stream_reader().into_stream();

        // Delete
        let delete_executor = Box::new(DeleteExecutor::new(
            table_id,
            INITIAL_TABLE_VERSION_ID,
            dml_manager,
            Box::new(mock_executor),
            1024,
            "DeleteExecutor".to_owned(),
            false, // not `RETURNING`: expect a single count row
            0,
        ));

        // Run the executor on a separate task: it blocks on the DML channel
        // until this test consumes the messages via `reader` below.
        let handle = tokio::spawn(async move {
            let fields = &delete_executor.schema().fields;
            assert_eq!(fields[0].data_type, DataType::Int64);

            let mut stream = delete_executor.execute();
            let result = stream.next().await.unwrap().unwrap();

            assert_eq!(
                result.column_at(0).as_int64().iter().collect::<Vec<_>>(),
                vec![Some(5)] // deleted rows
            );
        });

        // Read: first message is the transaction `begin`.
        reader.next().await.unwrap()?.into_begin().unwrap();

        // Second message carries all 5 rows as one chunk of `Delete` ops.
        let txn_msg = reader.next().await.unwrap()?;
        let chunk = txn_msg.as_stream_chunk().unwrap();
        assert_eq!(chunk.ops().to_vec(), vec![Op::Delete; 5]);

        assert_eq!(
            chunk.columns()[0].as_int32().iter().collect::<Vec<_>>(),
            vec![Some(1), Some(3), Some(5), Some(7), Some(9)]
        );

        assert_eq!(
            chunk.columns()[1].as_int32().iter().collect::<Vec<_>>(),
            vec![Some(2), Some(4), Some(6), Some(8), Some(10)]
        );

        // Final message is the transaction `end`.
        reader.next().await.unwrap()?.into_end().unwrap();

        handle.await.unwrap();

        Ok(())
    }
}