// risingwave_batch_executors/executor/delete.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{
    Array, ArrayBuilder, DataChunk, Op, PrimitiveArrayBuilder, StreamChunk,
};
use risingwave_common::catalog::{Field, Schema, TableId, TableVersionId};
use risingwave_common::transaction::transaction_id::TxnId;
use risingwave_common::types::DataType;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_dml::dml_manager::DmlManagerRef;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::{BatchError, Result};
use crate::executor::{
    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
};
31
/// [`DeleteExecutor`] implements table deletion with values from its child executor.
// Note: multiple `DELETE`s in a single epoch, or concurrent `DELETE`s may lead to conflicting
// records. This is validated and filtered on the first `Materialize`.
pub struct DeleteExecutor {
    /// Target table id.
    table_id: TableId,
    /// Version of the target table, checked when acquiring the DML handle.
    table_version_id: TableVersionId,
    /// Manager used to obtain the table's DML write handle.
    dml_manager: DmlManagerRef,
    /// Child executor producing the rows to delete.
    child: BoxedExecutor,
    /// Max number of rows per chunk written to the DML channel.
    chunk_size: usize,
    /// Output schema: the child's schema when `returning`, otherwise a single `Int64` field
    /// carrying the deleted-row count (see `new`).
    schema: Schema,
    /// Executor identity string, used for debugging/tracing.
    identity: String,
    /// Whether this is a `DELETE ... RETURNING` statement.
    returning: bool,
    /// Transaction id generated from the DML manager at construction time.
    txn_id: TxnId,
    /// Id of the session issuing this statement.
    session_id: u32,
}
48
49impl DeleteExecutor {
50    pub fn new(
51        table_id: TableId,
52        table_version_id: TableVersionId,
53        dml_manager: DmlManagerRef,
54        child: BoxedExecutor,
55        chunk_size: usize,
56        identity: String,
57        returning: bool,
58        session_id: u32,
59    ) -> Self {
60        let table_schema = child.schema().clone();
61        let txn_id = dml_manager.gen_txn_id();
62        Self {
63            table_id,
64            table_version_id,
65            dml_manager,
66            child,
67            chunk_size,
68            schema: if returning {
69                table_schema
70            } else {
71                Schema {
72                    fields: vec![Field::unnamed(DataType::Int64)],
73                }
74            },
75            identity,
76            returning,
77            txn_id,
78            session_id,
79        }
80    }
81}
82
impl Executor for DeleteExecutor {
    /// Output schema: deleted rows for `RETURNING`, otherwise a one-column `Int64` count.
    fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Identity string for debugging/tracing.
    fn identity(&self) -> &str {
        &self.identity
    }

    /// Consumes the executor and returns the boxed output stream.
    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.do_execute()
    }
}
96
impl DeleteExecutor {
    /// Streams chunks from the child, writes them as `Delete` stream chunks to the table's
    /// DML channel within one transaction (`begin` .. `end`), and yields either the deleted
    /// rows themselves (`RETURNING`) or a single one-row `Int64` chunk with the row count.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        let data_types = self.child.schema().data_types();
        // Coalesce child output into chunks of at most `chunk_size` rows before writing.
        let mut builder = DataChunkBuilder::new(data_types, self.chunk_size);

        let table_dml_handle = self
            .dml_manager
            .table_dml_handle(self.table_id, self.table_version_id)?;
        // Sanity check: the child's schema must match the table's non-generated columns.
        assert_eq!(
            table_dml_handle
                .column_descs()
                .iter()
                .filter_map(|c| (!c.is_generated()).then_some(c.data_type.clone()))
                .collect_vec(),
            self.child.schema().data_types(),
            "bad delete schema"
        );
        let mut write_handle = table_dml_handle.write_handle(self.session_id, self.txn_id)?;

        // All chunk writes below happen inside this transaction.
        write_handle.begin()?;

        // Transform the data chunk to a stream chunk, then write to the source.
        // Returns the number of (visible) rows written, i.e. the chunk's cardinality.
        let write_txn_data = |chunk: DataChunk| async {
            let cap = chunk.capacity();
            // Mark every row in the chunk as a `Delete` op.
            let stream_chunk = StreamChunk::from_parts(vec![Op::Delete; cap], chunk);

            #[cfg(debug_assertions)]
            table_dml_handle.check_chunk_schema(&stream_chunk);

            let cardinality = stream_chunk.cardinality();
            write_handle.write_chunk(stream_chunk).await?;

            Result::Ok(cardinality)
        };

        let mut rows_deleted = 0;

        #[for_await]
        for data_chunk in self.child.execute() {
            let data_chunk = data_chunk?;
            // For `RETURNING`, echo the to-be-deleted rows back to the client unchanged.
            if self.returning {
                yield data_chunk.clone();
            }
            // `append_chunk` emits only full-sized chunks; leftovers stay in the builder.
            for chunk in builder.append_chunk(data_chunk) {
                rows_deleted += write_txn_data(chunk).await?;
            }
        }

        // Flush the final, partially-filled chunk, if any.
        if let Some(chunk) = builder.consume_all() {
            rows_deleted += write_txn_data(chunk).await?;
        }

        write_handle.end().await?;

        // create ret value
        if !self.returning {
            // Without `RETURNING`, the result is a single `Int64` cell: the deleted-row count.
            let mut array_builder = PrimitiveArrayBuilder::<i64>::new(1);
            array_builder.append(Some(rows_deleted as i64));

            let array = array_builder.finish();
            let ret_chunk = DataChunk::new(vec![array.into_ref()], 1);

            yield ret_chunk
        }
    }
}
164
165impl BoxedExecutorBuilder for DeleteExecutor {
166    async fn new_boxed_executor(
167        source: &ExecutorBuilder<'_>,
168        inputs: Vec<BoxedExecutor>,
169    ) -> Result<BoxedExecutor> {
170        let [child]: [_; 1] = inputs.try_into().unwrap();
171        let delete_node = try_match_expand!(
172            source.plan_node().get_node_body().unwrap(),
173            NodeBody::Delete
174        )?;
175
176        let table_id = TableId::new(delete_node.table_id);
177
178        Ok(Box::new(Self::new(
179            table_id,
180            delete_node.table_version_id,
181            source.context().dml_manager(),
182            child,
183            source.context().get_config().developer.chunk_size,
184            source.plan_node().get_identity().clone(),
185            delete_node.returning,
186            delete_node.session_id,
187        )))
188    }
189}
190
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures::StreamExt;
    use risingwave_common::catalog::{
        ColumnDesc, ColumnId, INITIAL_TABLE_VERSION_ID, schema_test_utils,
    };
    use risingwave_common::test_prelude::DataChunkTestExt;
    use risingwave_dml::dml_manager::DmlManager;

    use super::*;
    use crate::executor::test_utils::MockExecutor;
    use crate::*;

    /// End-to-end test: runs a `DeleteExecutor` over 5 mock rows and verifies both
    /// the DML channel side (begin / one `Delete` chunk / end) and the executor's
    /// returned row count. The executor runs in a spawned task because the bounded
    /// DML channel must be drained by the reader concurrently.
    #[tokio::test]
    async fn test_delete_executor() -> Result<()> {
        let dml_manager = Arc::new(DmlManager::for_test());

        // Schema for mock executor.
        let schema = schema_test_utils::ii();
        let mut mock_executor = MockExecutor::new(schema.clone());

        // Schema of the table
        let schema = schema_test_utils::ii();

        // 5 rows of (int, int) to be deleted.
        mock_executor.add(DataChunk::from_pretty(
            "i  i
             1  2
             3  4
             5  6
             7  8
             9 10",
        ));

        // Create reader
        let table_id = TableId::new(0);
        let column_descs = schema
            .fields
            .iter()
            .enumerate()
            .map(|(i, field)| ColumnDesc::unnamed(ColumnId::new(i as _), field.data_type.clone()))
            .collect_vec();
        // We must create a variable to hold this `Arc<TableDmlHandle>` here, or it will be dropped
        // due to the `Weak` reference in `DmlManager`.
        let reader = dml_manager
            .register_reader(table_id, INITIAL_TABLE_VERSION_ID, &column_descs)
            .unwrap();
        let mut reader = reader.stream_reader().into_stream();

        // Delete
        let delete_executor = Box::new(DeleteExecutor::new(
            table_id,
            INITIAL_TABLE_VERSION_ID,
            dml_manager,
            Box::new(mock_executor),
            1024,
            "DeleteExecutor".to_owned(),
            false, // not `RETURNING`: expect a row-count result
            0,
        ));

        let handle = tokio::spawn(async move {
            // Non-returning delete exposes a single Int64 (row count) column.
            let fields = &delete_executor.schema().fields;
            assert_eq!(fields[0].data_type, DataType::Int64);

            let mut stream = delete_executor.execute();
            let result = stream.next().await.unwrap().unwrap();

            assert_eq!(
                result.column_at(0).as_int64().iter().collect::<Vec<_>>(),
                vec![Some(5)] // deleted rows
            );
        });

        // Read
        // The transaction starts with a `begin` message...
        reader.next().await.unwrap()?.into_begin().unwrap();

        // ...followed by one stream chunk where every row is a `Delete` op.
        let txn_msg = reader.next().await.unwrap()?;
        let chunk = txn_msg.as_stream_chunk().unwrap();
        assert_eq!(chunk.ops().to_vec(), vec![Op::Delete; 5]);

        assert_eq!(
            chunk.columns()[0].as_int32().iter().collect::<Vec<_>>(),
            vec![Some(1), Some(3), Some(5), Some(7), Some(9)]
        );

        assert_eq!(
            chunk.columns()[1].as_int32().iter().collect::<Vec<_>>(),
            vec![Some(2), Some(4), Some(6), Some(8), Some(10)]
        );

        // ...and closes with an `end` message.
        reader.next().await.unwrap()?.into_end().unwrap();

        handle.await.unwrap();

        Ok(())
    }
}