risingwave_meta_model_migration/
m20260518_000000_disable_unused_read_prefix_hints.rs1use std::collections::BTreeSet;
2
3use prost::Message;
4use risingwave_pb::catalog::Table as PbTable;
5use risingwave_pb::id::{FragmentId, TableId};
6use risingwave_pb::stream_plan::stream_node::NodeBody;
7use risingwave_pb::stream_plan::{SinkLogStoreType, StreamNode};
8use sea_orm::{FromQueryResult, Statement};
9use sea_orm_migration::prelude::*;
10use thiserror_ext::AsReport;
11
/// Migration that clears `read_prefix_len_hint` on the state tables of selected stream node
/// kinds, both inside the encoded fragment plans and in the `table` catalog.
///
/// This migration is irreversible: `down` always returns an error.
#[derive(DeriveMigrationName)]
pub struct Migration;
14
15#[async_trait::async_trait]
16impl MigrationTrait for Migration {
17 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
18 let connection = manager.get_connection();
19 let database_backend = connection.get_database_backend();
20
21 let (sql, values) = Query::select()
22 .columns([Fragment::FragmentId, Fragment::StreamNode])
23 .from(Fragment::Table)
24 .build_any(&*database_backend.get_query_builder());
25
26 let fragments = connection
27 .query_all(Statement::from_sql_and_values(
28 database_backend,
29 sql,
30 values,
31 ))
32 .await?;
33
34 let mut table_ids = BTreeSet::new();
35
36 for row in fragments {
37 let fragment = FragmentEntity::from_query_result(&row, "")?;
38 let mut stream_node =
39 StreamNode::decode(fragment.stream_node.as_slice()).map_err(|err| {
40 DbErr::Custom(format!("failed to decode stream node: {}", err.as_report()))
41 })?;
42
43 if disable_unused_read_prefix_hints(&mut stream_node, &mut table_ids) {
44 manager
45 .exec_stmt(
46 Query::update()
47 .table(Fragment::Table)
48 .value(Fragment::StreamNode, stream_node.encode_to_vec())
49 .and_where(Expr::col(Fragment::FragmentId).eq(fragment.fragment_id))
50 .to_owned(),
51 )
52 .await?;
53 }
54 }
55
56 if !table_ids.is_empty() {
57 let table_ids = table_ids.into_iter().collect::<Vec<_>>();
58 manager
59 .exec_stmt(
60 Query::update()
61 .table(Table::Table)
62 .value(Table::ReadPrefixLenHint, 0)
63 .and_where(Expr::col(Table::TableId).is_in(table_ids))
64 .and_where(Expr::col(Table::ReadPrefixLenHint).ne(0))
65 .to_owned(),
66 )
67 .await?;
68 }
69
70 Ok(())
71 }
72
73 async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> {
74 Err(DbErr::Custom(
75 "cannot rollback unused read prefix hint migration".to_owned(),
76 ))
77 }
78}
79
80fn disable_unused_read_prefix_hints(
81 stream_node: &mut StreamNode,
82 table_ids: &mut BTreeSet<TableId>,
83) -> bool {
84 let mut changed = false;
85
86 if let Some(node_body) = stream_node.node_body.as_mut() {
87 changed |= match node_body {
88 NodeBody::DynamicFilter(node) => {
89 disable_table_hint(node.left_table.as_mut(), table_ids)
90 }
91 NodeBody::Materialize(node) => {
92 disable_table_hint(node.staging_table.as_mut(), table_ids)
93 }
94 NodeBody::VectorIndexWrite(node) => disable_table_hint(node.table.as_mut(), table_ids),
95 NodeBody::Sink(node) if node.log_store_type == SinkLogStoreType::KvLogStore as i32 => {
96 disable_table_hint(node.table.as_mut(), table_ids)
97 }
98 NodeBody::SyncLogStore(node) => {
99 disable_table_hint(node.log_store_table.as_mut(), table_ids)
100 }
101 _ => false,
102 };
103 }
104
105 for input in &mut stream_node.input {
106 changed |= disable_unused_read_prefix_hints(input, table_ids);
107 }
108
109 changed
110}
111
112fn disable_table_hint(table: Option<&mut PbTable>, table_ids: &mut BTreeSet<TableId>) -> bool {
113 let Some(table) = table else {
114 return false;
115 };
116
117 table_ids.insert(table.id);
118 if table.read_prefix_len_hint == 0 {
119 return false;
120 }
121
122 table.read_prefix_len_hint = 0;
123 true
124}
125
/// Row projection read by `Migration::up`: a fragment's id and its protobuf-encoded
/// `StreamNode` plan.
///
/// NOTE(review): `up` calls `from_query_result` with an empty prefix, so the field names are
/// presumably what maps them to the selected `fragment_id` / `stream_node` columns. Do not
/// rename them without checking.
#[derive(Debug, FromQueryResult)]
#[sea_orm(entity = "Fragment")]
struct FragmentEntity {
    /// Used as the `WHERE` key when the rewritten plan is written back.
    fragment_id: FragmentId,
    /// Encoded `StreamNode`; decoded with `prost::Message::decode` in `up`.
    stream_node: Vec<u8>,
}
132
/// SQL identifiers for the fragment table and the two columns this migration reads/writes.
///
/// NOTE(review): under sea-query's `DeriveIden` convention, the `Table` variant presumably
/// renders as the table name and the other variants as snake_case column names. Verify
/// against the meta-store schema.
#[derive(DeriveIden)]
enum Fragment {
    Table,
    FragmentId,
    StreamNode,
}
139
/// SQL identifiers for the catalog table and the columns used to reset the stored
/// `read_prefix_len_hint`.
///
/// NOTE(review): this local `Table` likely shadows a `Table` item brought in by the
/// `sea_orm_migration::prelude::*` glob. Local items take precedence over glob imports, so
/// `Table::Table` here refers to this enum.
#[derive(DeriveIden)]
enum Table {
    Table,
    TableId,
    ReadPrefixLenHint,
}
146
#[cfg(test)]
mod tests {
    use risingwave_pb::stream_plan::{
        DynamicFilterNode, MaterializeNode, SinkNode, SyncLogStoreNode, VectorIndexWriteNode,
    };

    use super::*;

    /// Builds a catalog table with the given id and read prefix length hint.
    fn table(id: u32, read_prefix_len_hint: u32) -> PbTable {
        PbTable {
            id: TableId::new(id),
            read_prefix_len_hint,
            ..Default::default()
        }
    }

    /// Reads the hint of an embedded table, failing the test if the table is missing.
    fn hint(table: Option<&PbTable>) -> u32 {
        table
            .expect("embedded table should be present")
            .read_prefix_len_hint
    }

    #[test]
    fn disable_only_targeted_table_hints() {
        let mut stream_node = StreamNode {
            node_body: Some(NodeBody::DynamicFilter(Box::new(DynamicFilterNode {
                left_table: Some(table(1, 3)),
                right_table: Some(table(2, 1)),
                ..Default::default()
            }))),
            input: vec![
                StreamNode {
                    node_body: Some(NodeBody::Materialize(Box::new(MaterializeNode {
                        staging_table: Some(table(3, 2)),
                        refresh_progress_table: Some(table(4, 1)),
                        ..Default::default()
                    }))),
                    ..Default::default()
                },
                StreamNode {
                    node_body: Some(NodeBody::Sink(Box::new(SinkNode {
                        table: Some(table(5, 3)),
                        log_store_type: SinkLogStoreType::KvLogStore as i32,
                        ..Default::default()
                    }))),
                    ..Default::default()
                },
                StreamNode {
                    node_body: Some(NodeBody::Sink(Box::new(SinkNode {
                        table: Some(table(6, 3)),
                        log_store_type: SinkLogStoreType::InMemoryLogStore as i32,
                        ..Default::default()
                    }))),
                    ..Default::default()
                },
                StreamNode {
                    node_body: Some(NodeBody::SyncLogStore(Box::new(SyncLogStoreNode {
                        log_store_table: Some(table(7, 3)),
                        ..Default::default()
                    }))),
                    ..Default::default()
                },
                StreamNode {
                    node_body: Some(NodeBody::VectorIndexWrite(Box::new(VectorIndexWriteNode {
                        table: Some(table(8, 3)),
                    }))),
                    ..Default::default()
                },
            ],
            ..Default::default()
        };

        let mut table_ids = BTreeSet::new();
        assert!(disable_unused_read_prefix_hints(
            &mut stream_node,
            &mut table_ids
        ));
        assert_eq!(
            table_ids,
            BTreeSet::from([
                TableId::new(1),
                TableId::new(3),
                TableId::new(5),
                TableId::new(7),
                TableId::new(8),
            ])
        );

        // Dynamic filter: only the left table is targeted.
        let Some(NodeBody::DynamicFilter(dynamic_filter)) = &stream_node.node_body else {
            unreachable!()
        };
        assert_eq!(hint(dynamic_filter.left_table.as_ref()), 0);
        assert_eq!(hint(dynamic_filter.right_table.as_ref()), 1);

        // Materialize: only the staging table is targeted.
        let Some(NodeBody::Materialize(materialize)) = &stream_node.input[0].node_body else {
            unreachable!()
        };
        assert_eq!(hint(materialize.staging_table.as_ref()), 0);
        assert_eq!(hint(materialize.refresh_progress_table.as_ref()), 1);

        // KV-log-store sink: targeted.
        let Some(NodeBody::Sink(kv_sink)) = &stream_node.input[1].node_body else {
            unreachable!()
        };
        assert_eq!(hint(kv_sink.table.as_ref()), 0);

        // In-memory-log-store sink: untouched.
        let Some(NodeBody::Sink(in_memory_sink)) = &stream_node.input[2].node_body else {
            unreachable!()
        };
        assert_eq!(hint(in_memory_sink.table.as_ref()), 3);

        // Sync log store: targeted.
        let Some(NodeBody::SyncLogStore(sync_log_store)) = &stream_node.input[3].node_body
        else {
            unreachable!()
        };
        assert_eq!(hint(sync_log_store.log_store_table.as_ref()), 0);

        // Vector index write: targeted.
        let Some(NodeBody::VectorIndexWrite(vector_index_write)) =
            &stream_node.input[4].node_body
        else {
            unreachable!()
        };
        assert_eq!(hint(vector_index_write.table.as_ref()), 0);

        // A second pass has nothing left to rewrite but still reports the same tables.
        let mut second_pass_ids = BTreeSet::new();
        assert!(!disable_unused_read_prefix_hints(
            &mut stream_node,
            &mut second_pass_ids
        ));
        assert_eq!(second_pass_ids, table_ids);
    }

    #[test]
    fn collect_zero_hint_table_without_rewriting_fragment() {
        let mut stream_node = StreamNode {
            node_body: Some(NodeBody::SyncLogStore(Box::new(SyncLogStoreNode {
                log_store_table: Some(table(42, 0)),
                ..Default::default()
            }))),
            ..Default::default()
        };

        let mut table_ids = BTreeSet::new();
        assert!(!disable_unused_read_prefix_hints(
            &mut stream_node,
            &mut table_ids
        ));
        assert_eq!(table_ids, BTreeSet::from([TableId::new(42)]));
    }

    #[test]
    fn ignore_plans_without_targeted_nodes() {
        let mut stream_node = StreamNode {
            input: vec![StreamNode::default()],
            ..Default::default()
        };

        let mut table_ids = BTreeSet::new();
        assert!(!disable_unused_read_prefix_hints(
            &mut stream_node,
            &mut table_ids
        ));
        assert!(table_ids.is_empty());
    }
}