risingwave_meta_model_migration/
m20260518_000000_disable_unused_read_prefix_hints.rs

1use std::collections::BTreeSet;
2
3use prost::Message;
4use risingwave_pb::catalog::Table as PbTable;
5use risingwave_pb::id::{FragmentId, TableId};
6use risingwave_pb::stream_plan::stream_node::NodeBody;
7use risingwave_pb::stream_plan::{SinkLogStoreType, StreamNode};
8use sea_orm::{FromQueryResult, Statement};
9use sea_orm_migration::prelude::*;
10use thiserror_ext::AsReport;
11
12#[derive(DeriveMigrationName)]
13pub struct Migration;
14
15#[async_trait::async_trait]
16impl MigrationTrait for Migration {
17    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
18        let connection = manager.get_connection();
19        let database_backend = connection.get_database_backend();
20
21        let (sql, values) = Query::select()
22            .columns([Fragment::FragmentId, Fragment::StreamNode])
23            .from(Fragment::Table)
24            .build_any(&*database_backend.get_query_builder());
25
26        let fragments = connection
27            .query_all(Statement::from_sql_and_values(
28                database_backend,
29                sql,
30                values,
31            ))
32            .await?;
33
34        let mut table_ids = BTreeSet::new();
35
36        for row in fragments {
37            let fragment = FragmentEntity::from_query_result(&row, "")?;
38            let mut stream_node =
39                StreamNode::decode(fragment.stream_node.as_slice()).map_err(|err| {
40                    DbErr::Custom(format!("failed to decode stream node: {}", err.as_report()))
41                })?;
42
43            if disable_unused_read_prefix_hints(&mut stream_node, &mut table_ids) {
44                manager
45                    .exec_stmt(
46                        Query::update()
47                            .table(Fragment::Table)
48                            .value(Fragment::StreamNode, stream_node.encode_to_vec())
49                            .and_where(Expr::col(Fragment::FragmentId).eq(fragment.fragment_id))
50                            .to_owned(),
51                    )
52                    .await?;
53            }
54        }
55
56        if !table_ids.is_empty() {
57            let table_ids = table_ids.into_iter().collect::<Vec<_>>();
58            manager
59                .exec_stmt(
60                    Query::update()
61                        .table(Table::Table)
62                        .value(Table::ReadPrefixLenHint, 0)
63                        .and_where(Expr::col(Table::TableId).is_in(table_ids))
64                        .and_where(Expr::col(Table::ReadPrefixLenHint).ne(0))
65                        .to_owned(),
66                )
67                .await?;
68        }
69
70        Ok(())
71    }
72
73    async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> {
74        Err(DbErr::Custom(
75            "cannot rollback unused read prefix hint migration".to_owned(),
76        ))
77    }
78}
79
80fn disable_unused_read_prefix_hints(
81    stream_node: &mut StreamNode,
82    table_ids: &mut BTreeSet<TableId>,
83) -> bool {
84    let mut changed = false;
85
86    if let Some(node_body) = stream_node.node_body.as_mut() {
87        changed |= match node_body {
88            NodeBody::DynamicFilter(node) => {
89                disable_table_hint(node.left_table.as_mut(), table_ids)
90            }
91            NodeBody::Materialize(node) => {
92                disable_table_hint(node.staging_table.as_mut(), table_ids)
93            }
94            NodeBody::VectorIndexWrite(node) => disable_table_hint(node.table.as_mut(), table_ids),
95            NodeBody::Sink(node) if node.log_store_type == SinkLogStoreType::KvLogStore as i32 => {
96                disable_table_hint(node.table.as_mut(), table_ids)
97            }
98            NodeBody::SyncLogStore(node) => {
99                disable_table_hint(node.log_store_table.as_mut(), table_ids)
100            }
101            _ => false,
102        };
103    }
104
105    for input in &mut stream_node.input {
106        changed |= disable_unused_read_prefix_hints(input, table_ids);
107    }
108
109    changed
110}
111
112fn disable_table_hint(table: Option<&mut PbTable>, table_ids: &mut BTreeSet<TableId>) -> bool {
113    let Some(table) = table else {
114        return false;
115    };
116
117    table_ids.insert(table.id);
118    if table.read_prefix_len_hint == 0 {
119        return false;
120    }
121
122    table.read_prefix_len_hint = 0;
123    true
124}
125
/// The two columns of a fragment row that `Migration::up` reads via
/// `FragmentEntity::from_query_result`.
126#[derive(Debug, FromQueryResult)]
127#[sea_orm(entity = "Fragment")]
128struct FragmentEntity {
    /// Identifies the row when the rewritten stream node is written back.
129    fragment_id: FragmentId,
    /// Encoded `StreamNode` bytes; decoded with `StreamNode::decode` in `Migration::up`.
130    stream_node: Vec<u8>,
131}
132
/// Identifiers for the fragment table and the columns this migration selects and updates.
133#[derive(DeriveIden)]
134enum Fragment {
    /// The table itself; passed to `from(..)` and `table(..)` when building queries.
135    Table,
    /// Column used to address a single fragment row.
136    FragmentId,
    /// Column holding the encoded stream node.
137    StreamNode,
138}
139
/// Identifiers for the table catalog table and the columns this migration filters on and updates.
140#[derive(DeriveIden)]
141enum Table {
    /// The table itself; passed to `table(..)` when building the update.
142    Table,
    /// Column matched against the collected table ids.
143    TableId,
    /// Column that is reset to `0` where it is currently non-zero.
144    ReadPrefixLenHint,
145}
146
#[cfg(test)]
mod tests {
    use risingwave_pb::stream_plan::{
        DynamicFilterNode, MaterializeNode, SinkNode, SyncLogStoreNode, VectorIndexWriteNode,
    };

    use super::*;

    /// Builds a table catalog with only its id and read prefix hint set.
    fn table(id: u32, read_prefix_len_hint: u32) -> PbTable {
        PbTable {
            id: TableId::new(id),
            read_prefix_len_hint,
            ..Default::default()
        }
    }

    /// Wraps `body` in a stream node with the given inputs.
    fn node(body: NodeBody, input: Vec<StreamNode>) -> StreamNode {
        StreamNode {
            node_body: Some(body),
            input,
            ..Default::default()
        }
    }

    /// Builds a childless sink node with the given table and log store type.
    fn sink(table: PbTable, log_store_type: SinkLogStoreType) -> StreamNode {
        node(
            NodeBody::Sink(Box::new(SinkNode {
                table: Some(table),
                log_store_type: log_store_type as i32,
                ..Default::default()
            })),
            vec![],
        )
    }

    /// Reads the hint of a table that the test expects to be present.
    fn hint_of(table: Option<&PbTable>) -> u32 {
        table.unwrap().read_prefix_len_hint
    }

    /// Collects raw ids into the set type produced by the migration helpers.
    fn id_set(ids: &[u32]) -> BTreeSet<TableId> {
        ids.iter().copied().map(TableId::new).collect()
    }

    #[test]
    fn disable_only_targeted_table_hints() {
        let materialize = node(
            NodeBody::Materialize(Box::new(MaterializeNode {
                staging_table: Some(table(3, 2)),
                refresh_progress_table: Some(table(4, 1)),
                ..Default::default()
            })),
            vec![],
        );
        let kv_sink = sink(table(5, 3), SinkLogStoreType::KvLogStore);
        let in_memory_sink = sink(table(6, 3), SinkLogStoreType::InMemoryLogStore);
        let sync_log_store = node(
            NodeBody::SyncLogStore(Box::new(SyncLogStoreNode {
                log_store_table: Some(table(7, 3)),
                ..Default::default()
            })),
            vec![],
        );
        let vector_index_write = node(
            NodeBody::VectorIndexWrite(Box::new(VectorIndexWriteNode {
                table: Some(table(8, 3)),
            })),
            vec![],
        );

        let mut root = node(
            NodeBody::DynamicFilter(Box::new(DynamicFilterNode {
                left_table: Some(table(1, 3)),
                right_table: Some(table(2, 1)),
                ..Default::default()
            })),
            vec![
                materialize,
                kv_sink,
                in_memory_sink,
                sync_log_store,
                vector_index_write,
            ],
        );

        let mut collected = BTreeSet::new();
        let changed = disable_unused_read_prefix_hints(&mut root, &mut collected);
        assert!(changed);
        assert_eq!(collected, id_set(&[1, 3, 5, 7, 8]));

        // Only the left table of the dynamic filter is targeted.
        let Some(NodeBody::DynamicFilter(dynamic_filter)) = &root.node_body else {
            unreachable!()
        };
        assert_eq!(hint_of(dynamic_filter.left_table.as_ref()), 0);
        assert_eq!(hint_of(dynamic_filter.right_table.as_ref()), 1);

        // Only the staging table of the materialize node is targeted.
        let Some(NodeBody::Materialize(materialize)) = &root.input[0].node_body else {
            unreachable!()
        };
        assert_eq!(hint_of(materialize.staging_table.as_ref()), 0);
        assert_eq!(hint_of(materialize.refresh_progress_table.as_ref()), 1);

        // A sink that does not use the kv log store keeps its hint.
        let Some(NodeBody::Sink(in_memory_sink)) = &root.input[2].node_body else {
            unreachable!()
        };
        assert_eq!(hint_of(in_memory_sink.table.as_ref()), 3);
    }

    #[test]
    fn collect_zero_hint_table_without_rewriting_fragment() {
        let mut root = node(
            NodeBody::SyncLogStore(Box::new(SyncLogStoreNode {
                log_store_table: Some(table(42, 0)),
                ..Default::default()
            })),
            vec![],
        );

        let mut collected = BTreeSet::new();
        let changed = disable_unused_read_prefix_hints(&mut root, &mut collected);

        // The id is still collected, but the fragment is reported as unchanged.
        assert!(!changed);
        assert_eq!(collected, id_set(&[42]));
    }
}