risingwave_batch_executors/executor/
delete.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use futures_async_stream::try_stream;
18use itertools::Itertools;
19use risingwave_common::array::{
20    Array, ArrayBuilder, DataChunk, Op, PrimitiveArrayBuilder, StreamChunk,
21};
22use risingwave_common::catalog::{Field, Schema, TableId, TableVersionId};
23use risingwave_common::transaction::transaction_id::TxnId;
24use risingwave_common::types::DataType;
25use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
26use risingwave_dml::dml_manager::DmlManagerRef;
27use risingwave_pb::batch_plan::plan_node::NodeBody;
28
29use crate::error::{BatchError, Result};
30use crate::executor::{
31    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
32};
33
34/// [`DeleteExecutor`] implements table deletion with values from its child executor.
35// Note: multiple `DELETE`s in a single epoch, or concurrent `DELETE`s may lead to conflicting
36// records. This is validated and filtered on the first `Materialize`.
pub struct DeleteExecutor {
    /// Target table id.
    table_id: TableId,
    /// Version of the target table; checked when resolving the DML handle.
    table_version_id: TableVersionId,
    /// Indices of the primary-key columns in the child's output. Used to keep
    /// only pk columns (null-padding the rest) when `upsert` is set.
    pk_indices: Vec<usize>,
    /// DML manager used to obtain the table's write handle and generate `txn_id`.
    dml_manager: DmlManagerRef,
    /// Child executor producing the rows to delete.
    child: BoxedExecutor,
    /// Maximum cardinality of each chunk written to the DML channel.
    chunk_size: usize,
    /// Output schema: the child's schema if `returning`, otherwise a single
    /// `Int64` column carrying the number of deleted rows.
    schema: Schema,
    /// Executor identity string, for `EXPLAIN`/debugging.
    identity: String,
    /// Whether to yield the deleted rows back (`DELETE ... RETURNING`).
    returning: bool,
    /// Transaction id generated from the DML manager at construction time.
    txn_id: TxnId,
    /// Session id forwarded to the table write handle.
    session_id: u32,
    /// If true, non-pk columns are replaced with nulls before writing.
    upsert: bool,
    /// If true, wait until the deletion is persisted before finishing.
    wait_for_persistence: bool,
}
53
54impl DeleteExecutor {
55    #[expect(clippy::too_many_arguments)]
56    pub fn new(
57        table_id: TableId,
58        table_version_id: TableVersionId,
59        pk_indices: Vec<usize>,
60        dml_manager: DmlManagerRef,
61        child: BoxedExecutor,
62        chunk_size: usize,
63        identity: String,
64        returning: bool,
65        session_id: u32,
66        upsert: bool,
67        wait_for_persistence: bool,
68    ) -> Self {
69        let table_schema = child.schema().clone();
70        let txn_id = dml_manager.gen_txn_id();
71        Self {
72            table_id,
73            table_version_id,
74            pk_indices,
75            dml_manager,
76            child,
77            chunk_size,
78            schema: if returning {
79                table_schema
80            } else {
81                Schema {
82                    fields: vec![Field::unnamed(DataType::Int64)],
83                }
84            },
85            identity,
86            returning,
87            txn_id,
88            session_id,
89            upsert,
90            wait_for_persistence,
91        }
92    }
93}
94
95impl Executor for DeleteExecutor {
96    fn schema(&self) -> &Schema {
97        &self.schema
98    }
99
100    fn identity(&self) -> &str {
101        &self.identity
102    }
103
104    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
105        self.do_execute()
106    }
107}
108
impl DeleteExecutor {
    /// Streams chunks from the child, writes each as a `Delete` stream chunk to the
    /// table's DML channel inside a single transaction, then yields either the
    /// deleted rows (`returning`) or a one-row chunk holding the deleted-row count.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        // Set for O(1) membership tests when null-padding non-pk columns below.
        let pk_indices: HashSet<_> = self.pk_indices.into_iter().collect();
        let data_types = self.child.schema().data_types();
        // Re-coalesces (possibly transformed) child chunks into `chunk_size`-capped chunks.
        let mut builder = DataChunkBuilder::new(data_types, self.chunk_size);

        let table_dml_handle = self
            .dml_manager
            .table_dml_handle(self.table_id, self.table_version_id)?;
        // Sanity check: the child's schema must match the table's non-generated columns.
        assert_eq!(
            table_dml_handle
                .column_descs()
                .iter()
                .filter_map(|c| (!c.is_generated()).then_some(c.data_type.clone()))
                .collect_vec(),
            self.child.schema().data_types(),
            "bad delete schema"
        );
        let mut write_handle = table_dml_handle.write_handle(self.session_id, self.txn_id)?;

        // Open the transaction before any chunk is written; closed below via `end*`.
        write_handle.begin()?;

        // Transform the data chunk to a stream chunk, then write to the source.
        let write_txn_data = |chunk: DataChunk| async {
            let cap = chunk.capacity();
            let stream_chunk = StreamChunk::from_parts(vec![Op::Delete; cap], chunk);

            #[cfg(debug_assertions)]
            table_dml_handle.check_chunk_schema(&stream_chunk);

            // Record `cardinality` (not `capacity`) before the chunk is moved,
            // so the reported count reflects visible rows only.
            let cardinality = stream_chunk.cardinality();
            write_handle.write_chunk(stream_chunk).await?;

            Result::Ok(cardinality)
        };

        let mut rows_deleted = 0;

        #[for_await]
        for data_chunk in self.child.execute() {
            let mut data_chunk = data_chunk?;
            // For `RETURNING`, yield the rows as-is, before any pk-only transformation.
            if self.returning {
                yield data_chunk.clone();
            }
            if self.upsert {
                let (cols, vis) = data_chunk.into_parts();
                let cap = vis.len();
                let mut new_cols = Vec::with_capacity(cols.len());
                // Only keep the primary key columns, pad the rest with null.
                for (i, col) in cols.into_iter().enumerate() {
                    if pk_indices.contains(&i) {
                        new_cols.push(col);
                    } else {
                        // Same-type column of the same length, all nulls.
                        let mut builder = col.create_builder(cap);
                        builder.append_n_null(cap);
                        new_cols.push(builder.finish().into());
                    }
                }
                data_chunk = DataChunk::new(new_cols, vis);
            }
            for chunk in builder.append_chunk(data_chunk) {
                rows_deleted += write_txn_data(chunk).await?;
            }
        }

        // Flush whatever is still buffered in the chunk builder.
        if let Some(chunk) = builder.consume_all() {
            rows_deleted += write_txn_data(chunk).await?;
        }

        // Close the transaction, optionally waiting until the data is persisted.
        if self.wait_for_persistence {
            write_handle.end_wait_persistence()?.await?;
        } else {
            write_handle.end().await?;
        }

        // create ret value
        if !self.returning {
            let mut array_builder = PrimitiveArrayBuilder::<i64>::new(1);
            array_builder.append(Some(rows_deleted as i64));

            let array = array_builder.finish();
            let ret_chunk = DataChunk::new(vec![array.into_ref()], 1);

            yield ret_chunk
        }
    }
}
197
198impl BoxedExecutorBuilder for DeleteExecutor {
199    async fn new_boxed_executor(
200        source: &ExecutorBuilder<'_>,
201        inputs: Vec<BoxedExecutor>,
202    ) -> Result<BoxedExecutor> {
203        let [child]: [_; 1] = inputs.try_into().unwrap();
204        let delete_node = try_match_expand!(
205            source.plan_node().get_node_body().unwrap(),
206            NodeBody::Delete
207        )?;
208
209        let table_id = delete_node.table_id;
210
211        Ok(Box::new(Self::new(
212            table_id,
213            delete_node.table_version_id,
214            delete_node
215                .pk_indices
216                .iter()
217                .map(|&idx| idx as usize)
218                .collect(),
219            source.context().dml_manager(),
220            child,
221            source.context().get_config().developer.chunk_size,
222            source.plan_node().get_identity().clone(),
223            delete_node.returning,
224            delete_node.session_id,
225            delete_node.upsert,
226            delete_node.wait_for_persistence,
227        )))
228    }
229}
230
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_common::catalog::{
        ColumnDesc, ColumnId, INITIAL_TABLE_VERSION_ID, schema_test_utils,
    };
    use risingwave_common::test_prelude::DataChunkTestExt;
    use risingwave_common::transaction::transaction_message::TxnMsg;
    use risingwave_dml::dml_manager::DmlManager;

    use super::*;
    use crate::executor::test_utils::MockExecutor;
    use crate::*;

    /// End-to-end check: 5 rows flow from a mock child through `DeleteExecutor`
    /// into the DML channel as `Delete` ops, and the executor reports a count of 5.
    #[tokio::test]
    async fn test_delete_executor() -> Result<()> {
        let dml_manager = Arc::new(DmlManager::for_test());

        // Schema for mock executor.
        let schema = schema_test_utils::ii();
        let mut mock_executor = MockExecutor::new(schema.clone());

        // Schema of the table
        let schema = schema_test_utils::ii();

        mock_executor.add(DataChunk::from_pretty(
            "i  i
             1  2
             3  4
             5  6
             7  8
             9 10",
        ));

        // Create reader
        let table_id = TableId::new(0);
        let column_descs = schema
            .fields
            .iter()
            .enumerate()
            .map(|(i, field)| ColumnDesc::unnamed(ColumnId::new(i as _), field.data_type.clone()))
            .collect_vec();
        // We must create a variable to hold this `Arc<TableDmlHandle>` here, or it will be dropped
        // due to the `Weak` reference in `DmlManager`.
        let reader = dml_manager
            .register_reader(table_id, INITIAL_TABLE_VERSION_ID, &column_descs)
            .unwrap();
        let mut reader = reader.stream_reader().into_stream();

        // Delete
        let delete_executor = Box::new(DeleteExecutor::new(
            table_id,
            INITIAL_TABLE_VERSION_ID,
            vec![0],
            dml_manager,
            Box::new(mock_executor),
            1024,
            "DeleteExecutor".to_owned(),
            false, // returning
            0,     // session_id
            false, // upsert
            false, // wait_for_persistence
        ));

        // Run the executor on a separate task: it blocks on the bounded DML
        // channel until the reader below consumes the messages.
        let handle = tokio::spawn(async move {
            // With `returning == false`, the schema is the single Int64 count column.
            let fields = &delete_executor.schema().fields;
            assert_eq!(fields[0].data_type, DataType::Int64);

            let mut stream = delete_executor.execute();
            let result = stream.next().await.unwrap().unwrap();

            assert_eq!(
                result.column_at(0).as_int64().iter().collect::<Vec<_>>(),
                vec![Some(5)] // deleted rows
            );
        });

        // Read
        // First message opens the transaction.
        reader.next().await.unwrap()?.into_begin().unwrap();

        // Second message carries all 5 rows as one chunk of `Delete` ops.
        let txn_msg = reader.next().await.unwrap()?;
        let chunk = txn_msg.as_stream_chunk().unwrap();
        assert_eq!(chunk.ops().to_vec(), vec![Op::Delete; 5]);

        assert_eq!(
            chunk.columns()[0].as_int32().iter().collect::<Vec<_>>(),
            vec![Some(1), Some(3), Some(5), Some(7), Some(9)]
        );

        assert_eq!(
            chunk.columns()[1].as_int32().iter().collect::<Vec<_>>(),
            vec![Some(2), Some(4), Some(6), Some(8), Some(10)]
        );

        // Final message ends the transaction (no persistence notifier expected).
        assert_matches!(reader.next().await.unwrap()?, TxnMsg::End(_, None));

        handle.await.unwrap();

        Ok(())
    }
}