risingwave_batch_executors/executor/
delete.rs1use futures_async_stream::try_stream;
16use itertools::Itertools;
17use risingwave_common::array::{
18 Array, ArrayBuilder, DataChunk, Op, PrimitiveArrayBuilder, StreamChunk,
19};
20use risingwave_common::catalog::{Field, Schema, TableId, TableVersionId};
21use risingwave_common::transaction::transaction_id::TxnId;
22use risingwave_common::types::DataType;
23use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
24use risingwave_dml::dml_manager::DmlManagerRef;
25use risingwave_pb::batch_plan::plan_node::NodeBody;
26
27use crate::error::{BatchError, Result};
28use crate::executor::{
29 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
30};
31
32pub struct DeleteExecutor {
36 table_id: TableId,
38 table_version_id: TableVersionId,
39 dml_manager: DmlManagerRef,
40 child: BoxedExecutor,
41 #[expect(dead_code)]
42 chunk_size: usize,
43 schema: Schema,
44 identity: String,
45 returning: bool,
46 txn_id: TxnId,
47 session_id: u32,
48}
49
50impl DeleteExecutor {
51 pub fn new(
52 table_id: TableId,
53 table_version_id: TableVersionId,
54 dml_manager: DmlManagerRef,
55 child: BoxedExecutor,
56 chunk_size: usize,
57 identity: String,
58 returning: bool,
59 session_id: u32,
60 ) -> Self {
61 let table_schema = child.schema().clone();
62 let txn_id = dml_manager.gen_txn_id();
63 Self {
64 table_id,
65 table_version_id,
66 dml_manager,
67 child,
68 chunk_size,
69 schema: if returning {
70 table_schema
71 } else {
72 Schema {
73 fields: vec![Field::unnamed(DataType::Int64)],
74 }
75 },
76 identity,
77 returning,
78 txn_id,
79 session_id,
80 }
81 }
82}
83
84impl Executor for DeleteExecutor {
85 fn schema(&self) -> &Schema {
86 &self.schema
87 }
88
89 fn identity(&self) -> &str {
90 &self.identity
91 }
92
93 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
94 self.do_execute()
95 }
96}
97
98impl DeleteExecutor {
99 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
100 async fn do_execute(self: Box<Self>) {
101 let data_types = self.child.schema().data_types();
102 let mut builder = DataChunkBuilder::new(data_types, 1024);
103
104 let table_dml_handle = self
105 .dml_manager
106 .table_dml_handle(self.table_id, self.table_version_id)?;
107 assert_eq!(
108 table_dml_handle
109 .column_descs()
110 .iter()
111 .filter_map(|c| (!c.is_generated()).then_some(c.data_type.clone()))
112 .collect_vec(),
113 self.child.schema().data_types(),
114 "bad delete schema"
115 );
116 let mut write_handle = table_dml_handle.write_handle(self.session_id, self.txn_id)?;
117
118 write_handle.begin()?;
119
120 let write_txn_data = |chunk: DataChunk| async {
122 let cap = chunk.capacity();
123 let stream_chunk = StreamChunk::from_parts(vec![Op::Delete; cap], chunk);
124
125 #[cfg(debug_assertions)]
126 table_dml_handle.check_chunk_schema(&stream_chunk);
127
128 let cardinality = stream_chunk.cardinality();
129 write_handle.write_chunk(stream_chunk).await?;
130
131 Result::Ok(cardinality)
132 };
133
134 let mut rows_deleted = 0;
135
136 #[for_await]
137 for data_chunk in self.child.execute() {
138 let data_chunk = data_chunk?;
139 if self.returning {
140 yield data_chunk.clone();
141 }
142 for chunk in builder.append_chunk(data_chunk) {
143 rows_deleted += write_txn_data(chunk).await?;
144 }
145 }
146
147 if let Some(chunk) = builder.consume_all() {
148 rows_deleted += write_txn_data(chunk).await?;
149 }
150
151 write_handle.end().await?;
152
153 if !self.returning {
155 let mut array_builder = PrimitiveArrayBuilder::<i64>::new(1);
156 array_builder.append(Some(rows_deleted as i64));
157
158 let array = array_builder.finish();
159 let ret_chunk = DataChunk::new(vec![array.into_ref()], 1);
160
161 yield ret_chunk
162 }
163 }
164}
165
166impl BoxedExecutorBuilder for DeleteExecutor {
167 async fn new_boxed_executor(
168 source: &ExecutorBuilder<'_>,
169 inputs: Vec<BoxedExecutor>,
170 ) -> Result<BoxedExecutor> {
171 let [child]: [_; 1] = inputs.try_into().unwrap();
172 let delete_node = try_match_expand!(
173 source.plan_node().get_node_body().unwrap(),
174 NodeBody::Delete
175 )?;
176
177 let table_id = TableId::new(delete_node.table_id);
178
179 Ok(Box::new(Self::new(
180 table_id,
181 delete_node.table_version_id,
182 source.context().dml_manager(),
183 child,
184 source.context().get_config().developer.chunk_size,
185 source.plan_node().get_identity().clone(),
186 delete_node.returning,
187 delete_node.session_id,
188 )))
189 }
190}
191
192#[cfg(test)]
193mod tests {
194 use std::sync::Arc;
195
196 use futures::StreamExt;
197 use risingwave_common::catalog::{
198 ColumnDesc, ColumnId, INITIAL_TABLE_VERSION_ID, schema_test_utils,
199 };
200 use risingwave_common::test_prelude::DataChunkTestExt;
201 use risingwave_dml::dml_manager::DmlManager;
202
203 use super::*;
204 use crate::executor::test_utils::MockExecutor;
205 use crate::*;
206
207 #[tokio::test]
208 async fn test_delete_executor() -> Result<()> {
209 let dml_manager = Arc::new(DmlManager::for_test());
210
211 let schema = schema_test_utils::ii();
213 let mut mock_executor = MockExecutor::new(schema.clone());
214
215 let schema = schema_test_utils::ii();
217
218 mock_executor.add(DataChunk::from_pretty(
219 "i i
220 1 2
221 3 4
222 5 6
223 7 8
224 9 10",
225 ));
226
227 let table_id = TableId::new(0);
229 let column_descs = schema
230 .fields
231 .iter()
232 .enumerate()
233 .map(|(i, field)| ColumnDesc::unnamed(ColumnId::new(i as _), field.data_type.clone()))
234 .collect_vec();
235 let reader = dml_manager
238 .register_reader(table_id, INITIAL_TABLE_VERSION_ID, &column_descs)
239 .unwrap();
240 let mut reader = reader.stream_reader().into_stream();
241
242 let delete_executor = Box::new(DeleteExecutor::new(
244 table_id,
245 INITIAL_TABLE_VERSION_ID,
246 dml_manager,
247 Box::new(mock_executor),
248 1024,
249 "DeleteExecutor".to_owned(),
250 false,
251 0,
252 ));
253
254 let handle = tokio::spawn(async move {
255 let fields = &delete_executor.schema().fields;
256 assert_eq!(fields[0].data_type, DataType::Int64);
257
258 let mut stream = delete_executor.execute();
259 let result = stream.next().await.unwrap().unwrap();
260
261 assert_eq!(
262 result.column_at(0).as_int64().iter().collect::<Vec<_>>(),
263 vec![Some(5)] );
265 });
266
267 reader.next().await.unwrap()?.into_begin().unwrap();
269
270 let txn_msg = reader.next().await.unwrap()?;
271 let chunk = txn_msg.as_stream_chunk().unwrap();
272 assert_eq!(chunk.ops().to_vec(), vec![Op::Delete; 5]);
273
274 assert_eq!(
275 chunk.columns()[0].as_int32().iter().collect::<Vec<_>>(),
276 vec![Some(1), Some(3), Some(5), Some(7), Some(9)]
277 );
278
279 assert_eq!(
280 chunk.columns()[1].as_int32().iter().collect::<Vec<_>>(),
281 vec![Some(2), Some(4), Some(6), Some(8), Some(10)]
282 );
283
284 reader.next().await.unwrap()?.into_end().unwrap();
285
286 handle.await.unwrap();
287
288 Ok(())
289 }
290}