risingwave_frontend/optimizer/plan_node/generic/
log_scan.rs1use std::collections::HashMap;
16use std::rc::Rc;
17
18use educe::Educe;
19use fixedbitset::FixedBitSet;
20use itertools::Itertools;
21use pretty_xmlish::Pretty;
22use risingwave_common::catalog::{Field, Schema, TableDesc};
23use risingwave_common::types::DataType;
24use risingwave_common::util::sort_util::ColumnOrder;
25use risingwave_hummock_sdk::HummockVersionId;
26
27use crate::catalog::ColumnId;
28use crate::optimizer::optimizer_context::OptimizerContextRef;
29
30const OP_NAME: &str = "op";
31const OP_TYPE: DataType = DataType::Varchar;
32
33#[derive(Debug, Clone, Educe)]
34#[educe(PartialEq, Eq, Hash)]
35pub struct LogScan {
36 pub table_name: String,
37 pub output_col_idx_with_out_hidden: Vec<usize>,
39 pub output_col_idx: Vec<usize>,
41 pub table_desc: Rc<TableDesc>,
43 pub chunk_size: Option<u32>,
45
46 #[educe(PartialEq(ignore))]
47 #[educe(Hash(ignore))]
48 pub ctx: OptimizerContextRef,
49
50 pub old_epoch: u64,
51 pub new_epoch: u64,
52 pub version_id: HummockVersionId,
53}
54
55impl LogScan {
56 pub fn output_column_ids(&self) -> Vec<ColumnId> {
58 self.output_col_idx
59 .iter()
60 .map(|i| self.table_desc.columns[*i].column_id)
61 .collect()
62 }
63
64 pub fn primary_key(&self) -> &[ColumnOrder] {
65 &self.table_desc.pk
66 }
67
68 fn column_names_with_table_prefix(&self) -> Vec<String> {
69 let mut out_column_names: Vec<_> = self
70 .output_col_idx
71 .iter()
72 .map(|&i| format!("{}.{}", self.table_name, self.table_desc.columns[i].name))
73 .collect();
74 out_column_names.push(format!("{}.{}", self.table_name, OP_NAME));
75 out_column_names
76 }
77
78 pub(crate) fn column_names(&self) -> Vec<String> {
79 let mut out_column_names: Vec<_> = self
80 .output_col_idx
81 .iter()
82 .map(|&i| self.table_desc.columns[i].name.clone())
83 .collect();
84 out_column_names.push(OP_NAME.to_owned());
85 out_column_names
86 }
87
88 pub(crate) fn column_names_without_hidden(&self) -> Vec<String> {
89 let mut out_column_names: Vec<_> = self
90 .output_col_idx_with_out_hidden
91 .iter()
92 .map(|&i| self.table_desc.columns[i].name.clone())
93 .collect();
94 out_column_names.push(OP_NAME.to_owned());
95 out_column_names
96 }
97
98 pub fn distribution_key(&self) -> Option<Vec<usize>> {
99 let tb_idx_to_op_idx = self
100 .output_col_idx
101 .iter()
102 .enumerate()
103 .map(|(op_idx, tb_idx)| (*tb_idx, op_idx))
104 .collect::<HashMap<_, _>>();
105 self.table_desc
106 .distribution_key
107 .iter()
108 .map(|&tb_idx| tb_idx_to_op_idx.get(&tb_idx).cloned())
109 .collect()
110 }
111
112 pub(crate) fn new(
114 table_name: String,
115 output_col_idx_with_out_hidden: Vec<usize>,
116 output_col_idx: Vec<usize>,
117 table_desc: Rc<TableDesc>,
118 ctx: OptimizerContextRef,
119 old_epoch: u64,
120 new_epoch: u64,
121 version_id: HummockVersionId,
122 ) -> Self {
123 Self {
124 table_name,
125 output_col_idx_with_out_hidden,
126 output_col_idx,
127 table_desc,
128 chunk_size: None,
129 ctx,
130 old_epoch,
131 new_epoch,
132 version_id,
133 }
134 }
135
136 pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
137 Pretty::Array(
138 match verbose {
139 true => self.column_names_with_table_prefix(),
140 false => self.column_names(),
141 }
142 .into_iter()
143 .map(Pretty::from)
144 .collect(),
145 )
146 }
147
148 pub(crate) fn schema(&self) -> Schema {
149 let mut fields: Vec<_> = self
150 .output_col_idx
151 .iter()
152 .map(|tb_idx| {
153 let col = &self.table_desc.columns[*tb_idx];
154 Field::from_with_table_name_prefix(col, &self.table_name)
155 })
156 .collect();
157 fields.push(Field::with_name(
158 OP_TYPE,
159 format!("{}.{}", &self.table_name, OP_NAME),
160 ));
161 Schema { fields }
162 }
163
164 pub(crate) fn out_fields(&self) -> FixedBitSet {
165 let mut out_fields_vec = self
166 .output_col_idx
167 .iter()
168 .enumerate()
169 .filter_map(|(index, idx)| {
170 if self.output_col_idx_with_out_hidden.contains(idx) {
171 Some(index)
172 } else {
173 None
174 }
175 })
176 .collect_vec();
177 out_fields_vec.push(self.output_col_idx.len());
179 FixedBitSet::from_iter(out_fields_vec)
180 }
181
182 pub(crate) fn ctx(&self) -> OptimizerContextRef {
183 self.ctx.clone()
184 }
185}