risingwave_batch_executors/executor/
delete.rs1use std::collections::HashSet;
16
17use futures_async_stream::try_stream;
18use itertools::Itertools;
19use risingwave_common::array::{
20 Array, ArrayBuilder, DataChunk, Op, PrimitiveArrayBuilder, StreamChunk,
21};
22use risingwave_common::catalog::{Field, Schema, TableId, TableVersionId};
23use risingwave_common::transaction::transaction_id::TxnId;
24use risingwave_common::types::DataType;
25use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
26use risingwave_dml::dml_manager::DmlManagerRef;
27use risingwave_pb::batch_plan::plan_node::NodeBody;
28
29use crate::error::{BatchError, Result};
30use crate::executor::{
31 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
32};
33
34pub struct DeleteExecutor {
38 table_id: TableId,
40 table_version_id: TableVersionId,
41 pk_indices: Vec<usize>,
42 dml_manager: DmlManagerRef,
43 child: BoxedExecutor,
44 chunk_size: usize,
45 schema: Schema,
46 identity: String,
47 returning: bool,
48 txn_id: TxnId,
49 session_id: u32,
50 upsert: bool,
51 wait_for_persistence: bool,
52}
53
54impl DeleteExecutor {
55 #[expect(clippy::too_many_arguments)]
56 pub fn new(
57 table_id: TableId,
58 table_version_id: TableVersionId,
59 pk_indices: Vec<usize>,
60 dml_manager: DmlManagerRef,
61 child: BoxedExecutor,
62 chunk_size: usize,
63 identity: String,
64 returning: bool,
65 session_id: u32,
66 upsert: bool,
67 wait_for_persistence: bool,
68 ) -> Self {
69 let table_schema = child.schema().clone();
70 let txn_id = dml_manager.gen_txn_id();
71 Self {
72 table_id,
73 table_version_id,
74 pk_indices,
75 dml_manager,
76 child,
77 chunk_size,
78 schema: if returning {
79 table_schema
80 } else {
81 Schema {
82 fields: vec![Field::unnamed(DataType::Int64)],
83 }
84 },
85 identity,
86 returning,
87 txn_id,
88 session_id,
89 upsert,
90 wait_for_persistence,
91 }
92 }
93}
94
95impl Executor for DeleteExecutor {
96 fn schema(&self) -> &Schema {
97 &self.schema
98 }
99
100 fn identity(&self) -> &str {
101 &self.identity
102 }
103
104 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
105 self.do_execute()
106 }
107}
108
109impl DeleteExecutor {
110 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
111 async fn do_execute(self: Box<Self>) {
112 let pk_indices: HashSet<_> = self.pk_indices.into_iter().collect();
113 let data_types = self.child.schema().data_types();
114 let mut builder = DataChunkBuilder::new(data_types, self.chunk_size);
115
116 let table_dml_handle = self
117 .dml_manager
118 .table_dml_handle(self.table_id, self.table_version_id)?;
119 assert_eq!(
120 table_dml_handle
121 .column_descs()
122 .iter()
123 .filter_map(|c| (!c.is_generated()).then_some(c.data_type.clone()))
124 .collect_vec(),
125 self.child.schema().data_types(),
126 "bad delete schema"
127 );
128 let mut write_handle = table_dml_handle.write_handle(self.session_id, self.txn_id)?;
129
130 write_handle.begin()?;
131
132 let write_txn_data = |chunk: DataChunk| async {
134 let cap = chunk.capacity();
135 let stream_chunk = StreamChunk::from_parts(vec![Op::Delete; cap], chunk);
136
137 #[cfg(debug_assertions)]
138 table_dml_handle.check_chunk_schema(&stream_chunk);
139
140 let cardinality = stream_chunk.cardinality();
141 write_handle.write_chunk(stream_chunk).await?;
142
143 Result::Ok(cardinality)
144 };
145
146 let mut rows_deleted = 0;
147
148 #[for_await]
149 for data_chunk in self.child.execute() {
150 let mut data_chunk = data_chunk?;
151 if self.returning {
152 yield data_chunk.clone();
153 }
154 if self.upsert {
155 let (cols, vis) = data_chunk.into_parts();
156 let cap = vis.len();
157 let mut new_cols = Vec::with_capacity(cols.len());
158 for (i, col) in cols.into_iter().enumerate() {
160 if pk_indices.contains(&i) {
161 new_cols.push(col);
162 } else {
163 let mut builder = col.create_builder(cap);
164 builder.append_n_null(cap);
165 new_cols.push(builder.finish().into());
166 }
167 }
168 data_chunk = DataChunk::new(new_cols, vis);
169 }
170 for chunk in builder.append_chunk(data_chunk) {
171 rows_deleted += write_txn_data(chunk).await?;
172 }
173 }
174
175 if let Some(chunk) = builder.consume_all() {
176 rows_deleted += write_txn_data(chunk).await?;
177 }
178
179 if self.wait_for_persistence {
180 write_handle.end_wait_persistence()?.await?;
181 } else {
182 write_handle.end().await?;
183 }
184
185 if !self.returning {
187 let mut array_builder = PrimitiveArrayBuilder::<i64>::new(1);
188 array_builder.append(Some(rows_deleted as i64));
189
190 let array = array_builder.finish();
191 let ret_chunk = DataChunk::new(vec![array.into_ref()], 1);
192
193 yield ret_chunk
194 }
195 }
196}
197
198impl BoxedExecutorBuilder for DeleteExecutor {
199 async fn new_boxed_executor(
200 source: &ExecutorBuilder<'_>,
201 inputs: Vec<BoxedExecutor>,
202 ) -> Result<BoxedExecutor> {
203 let [child]: [_; 1] = inputs.try_into().unwrap();
204 let delete_node = try_match_expand!(
205 source.plan_node().get_node_body().unwrap(),
206 NodeBody::Delete
207 )?;
208
209 let table_id = delete_node.table_id;
210
211 Ok(Box::new(Self::new(
212 table_id,
213 delete_node.table_version_id,
214 delete_node
215 .pk_indices
216 .iter()
217 .map(|&idx| idx as usize)
218 .collect(),
219 source.context().dml_manager(),
220 child,
221 source.context().get_config().developer.chunk_size,
222 source.plan_node().get_identity().clone(),
223 delete_node.returning,
224 delete_node.session_id,
225 delete_node.upsert,
226 delete_node.wait_for_persistence,
227 )))
228 }
229}
230
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_common::catalog::{
        ColumnDesc, ColumnId, INITIAL_TABLE_VERSION_ID, schema_test_utils,
    };
    use risingwave_common::test_prelude::DataChunkTestExt;
    use risingwave_common::transaction::transaction_message::TxnMsg;
    use risingwave_dml::dml_manager::DmlManager;

    use super::*;
    use crate::executor::test_utils::MockExecutor;
    use crate::*;

    /// Deletes five rows without RETURNING or upsert and checks both ends of the pipeline.
    ///
    /// On the executor side, exactly one chunk holding the count `5` is yielded and then the
    /// stream finishes. On the table side, the DML reader sees `Begin`, then one chunk whose
    /// ops are all `Delete` and whose values match the input, then `End`.
    #[tokio::test]
    async fn test_delete_executor() -> Result<()> {
        let dml_manager = Arc::new(DmlManager::for_test());

        // One schema (two integer columns) serves both the mock input and the table columns.
        let schema = schema_test_utils::ii();
        let mut mock_executor = MockExecutor::new(schema.clone());
        mock_executor.add(DataChunk::from_pretty(
            "i i
             1 2
             3 4
             5 6
             7 8
             9 10",
        ));

        let table_id = TableId::new(0);

        // Register a reader so the executor has a DML channel to write into.
        let column_descs = schema
            .fields
            .iter()
            .enumerate()
            .map(|(i, field)| ColumnDesc::unnamed(ColumnId::new(i as _), field.data_type.clone()))
            .collect_vec();
        let reader = dml_manager
            .register_reader(table_id, INITIAL_TABLE_VERSION_ID, &column_descs)
            .unwrap();
        let mut reader = reader.stream_reader().into_stream();

        let delete_executor = Box::new(DeleteExecutor::new(
            table_id,
            INITIAL_TABLE_VERSION_ID,
            vec![0], // pk_indices
            dml_manager,
            Box::new(mock_executor),
            1024, // chunk_size
            "DeleteExecutor".to_owned(),
            false, // returning
            0,     // session_id
            false, // upsert
            false, // wait_for_persistence
        ));

        let handle = tokio::spawn(async move {
            let fields = &delete_executor.schema().fields;
            assert_eq!(fields[0].data_type, DataType::Int64);

            let mut stream = delete_executor.execute();
            let result = stream.next().await.unwrap().unwrap();
            assert_eq!(
                result.column_at(0).as_int64().iter().collect::<Vec<_>>(),
                vec![Some(5)]
            );
            // Without RETURNING the count chunk must be the only output.
            assert!(stream.next().await.is_none());
        });

        reader.next().await.unwrap()?.into_begin().unwrap();

        let txn_msg = reader.next().await.unwrap()?;
        let chunk = txn_msg.as_stream_chunk().unwrap();
        assert_eq!(chunk.ops().to_vec(), vec![Op::Delete; 5]);

        assert_eq!(
            chunk.columns()[0].as_int32().iter().collect::<Vec<_>>(),
            vec![Some(1), Some(3), Some(5), Some(7), Some(9)]
        );

        assert_eq!(
            chunk.columns()[1].as_int32().iter().collect::<Vec<_>>(),
            vec![Some(2), Some(4), Some(6), Some(8), Some(10)]
        );

        assert_matches!(reader.next().await.unwrap()?, TxnMsg::End(_, None));

        handle.await.unwrap();

        Ok(())
    }
}