risingwave_storage/row_serde/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use risingwave_common::catalog::{ColumnDesc, ColumnId};
18use risingwave_common::row::{OwnedRow, Project, RowExt};
19
20pub mod row_serde_util;
21
22pub mod value_serde;
23
24/// Find out the [`ColumnDesc`] selected with a list of [`ColumnId`].
25///
26/// # Returns
27///
28/// A pair of columns and their indexes in input columns
29pub fn find_columns_by_ids(
30    table_columns: &[ColumnDesc],
31    column_ids: &[ColumnId],
32) -> (Vec<ColumnDesc>, Vec<usize>) {
33    if column_ids.is_empty() {
34        // shortcut
35        return (vec![], vec![]);
36    }
37    let id_to_columns = table_columns
38        .iter()
39        .enumerate()
40        .map(|(index, c)| (c.column_id, (c.clone(), index)))
41        .collect::<HashMap<_, _>>();
42    column_ids
43        .iter()
44        .map(|id| id_to_columns.get(id).expect("column id not found").clone())
45        .unzip()
46}
47
48#[derive(Clone)]
49pub struct ColumnMapping {
50    output_indices: Vec<usize>,
51}
52
53#[allow(clippy::len_without_is_empty)]
54impl ColumnMapping {
55    /// Create a mapping with given `table_columns` projected on the `column_ids`.
56    pub fn new(output_indices: Vec<usize>) -> Self {
57        Self { output_indices }
58    }
59
60    /// Project a row with this mapping
61    pub fn project(&self, origin_row: OwnedRow) -> Project<'_, OwnedRow> {
62        origin_row.project(&self.output_indices)
63    }
64}
65
66#[cfg(test)]
67mod test {
68    use std::fmt::Debug;
69
70    use expect_test::{Expect, expect};
71    use risingwave_common::types::DataType;
72
73    use super::*;
74
75    fn check(actual: impl Debug, expect: Expect) {
76        let actual = format!("{:#?}", actual);
77        expect.assert_eq(&actual);
78    }
79
80    #[test]
81    fn test_find_columns_by_ids() {
82        let table_columns = vec![
83            ColumnDesc::unnamed(1.into(), DataType::Varchar),
84            ColumnDesc::unnamed(2.into(), DataType::Int64),
85            ColumnDesc::unnamed(3.into(), DataType::Int16),
86        ];
87        let column_ids = vec![2.into(), 3.into()];
88        let result = find_columns_by_ids(&table_columns, &column_ids);
89        check(
90            result,
91            expect![[r#"
92                (
93                    [
94                        ColumnDesc {
95                            data_type: Int64,
96                            column_id: #2,
97                            name: "",
98                            generated_or_default_column: None,
99                            description: None,
100                            additional_column: AdditionalColumn {
101                                column_type: None,
102                            },
103                            version: Pr13707,
104                            system_column: None,
105                            nullable: true,
106                        },
107                        ColumnDesc {
108                            data_type: Int16,
109                            column_id: #3,
110                            name: "",
111                            generated_or_default_column: None,
112                            description: None,
113                            additional_column: AdditionalColumn {
114                                column_type: None,
115                            },
116                            version: Pr13707,
117                            system_column: None,
118                            nullable: true,
119                        },
120                    ],
121                    [
122                        1,
123                        2,
124                    ],
125                )"#]],
126        );
127
128        let table_columns = vec![
129            ColumnDesc::unnamed(2.into(), DataType::Int64),
130            ColumnDesc::unnamed(1.into(), DataType::Varchar),
131            ColumnDesc::unnamed(3.into(), DataType::Int16),
132        ];
133        let column_ids = vec![2.into(), 1.into()];
134        let result = find_columns_by_ids(&table_columns, &column_ids);
135        check(
136            result,
137            expect![[r#"
138                (
139                    [
140                        ColumnDesc {
141                            data_type: Int64,
142                            column_id: #2,
143                            name: "",
144                            generated_or_default_column: None,
145                            description: None,
146                            additional_column: AdditionalColumn {
147                                column_type: None,
148                            },
149                            version: Pr13707,
150                            system_column: None,
151                            nullable: true,
152                        },
153                        ColumnDesc {
154                            data_type: Varchar,
155                            column_id: #1,
156                            name: "",
157                            generated_or_default_column: None,
158                            description: None,
159                            additional_column: AdditionalColumn {
160                                column_type: None,
161                            },
162                            version: Pr13707,
163                            system_column: None,
164                            nullable: true,
165                        },
166                    ],
167                    [
168                        0,
169                        1,
170                    ],
171                )"#]],
172        );
173    }
174}