// risingwave_frontend/optimizer/plan_node/generic/log_scan.rs
use std::collections::HashMap;
use std::rc::Rc;
use educe::Educe;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::Pretty;
use risingwave_common::catalog::{Field, Schema, TableDesc};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_hummock_sdk::HummockVersionId;
use crate::catalog::ColumnId;
use crate::optimizer::optimizer_context::OptimizerContextRef;
/// Name of the extra column that `LogScan` appends after the selected table
/// columns in every column-name list and in its schema.
const OP_NAME: &str = "op";
/// Data type of the `OP_NAME` column in the schema built by `LogScan::schema`.
const OP_TYPE: DataType = DataType::Varchar;
/// Generic description of a log scan over a single table.
///
/// Equality and hashing are derived through `educe`; the `ctx` field is
/// excluded from both.
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct LogScan {
/// Table name; used as the `<table>.` prefix of qualified column names.
pub table_name: String,
/// Indices into `table_desc.columns`. Output columns whose index also
/// appears here are the ones flagged by `out_fields` and listed by
/// `column_names_without_hidden`.
// NOTE(review): presumably the output columns with hidden columns removed — confirm with callers.
pub output_col_idx_with_out_hidden: Vec<usize>,
/// Indices into `table_desc.columns` of the columns produced by the scan,
/// in output order.
pub output_col_idx: Vec<usize>,
/// Descriptor of the scanned table (columns, primary key, distribution key).
pub table_desc: Rc<TableDesc>,
/// Optional chunk size; `LogScan::new` always initialises it to `None`.
pub chunk_size: Option<u32>,
/// Optimizer context; ignored when comparing or hashing a `LogScan`.
#[educe(PartialEq(ignore))]
#[educe(Hash(ignore))]
pub ctx: OptimizerContextRef,
// NOTE(review): `old_epoch`/`new_epoch` presumably bound the epoch range of the log to read — not used in this file, confirm.
pub old_epoch: u64,
pub new_epoch: u64,
// NOTE(review): Hummock version id supplied by the caller of `new`; its use is not visible in this file.
pub version_id: HummockVersionId,
}
impl LogScan {
    /// Returns the `ColumnId` of every column selected by `output_col_idx`,
    /// in output order.
    pub fn output_column_ids(&self) -> Vec<ColumnId> {
        let columns = &self.table_desc.columns;
        self.output_col_idx
            .iter()
            .map(|&table_col_idx| columns[table_col_idx].column_id)
            .collect()
    }

    /// Returns the primary key column orders stored in the table descriptor.
    pub fn primary_key(&self) -> &[ColumnOrder] {
        let pk: &[ColumnOrder] = &self.table_desc.pk;
        pk
    }

    /// Returns the output column names qualified as `<table_name>.<column>`,
    /// followed by the qualified `OP_NAME` column.
    fn column_names_with_table_prefix(&self) -> Vec<String> {
        let table = &self.table_name;
        let mut qualified_names = Vec::with_capacity(self.output_col_idx.len() + 1);
        for &table_col_idx in &self.output_col_idx {
            let column = &self.table_desc.columns[table_col_idx];
            qualified_names.push(format!("{}.{}", table, column.name));
        }
        qualified_names.push(format!("{}.{}", table, OP_NAME));
        qualified_names
    }

    /// Returns the unqualified output column names, followed by `OP_NAME`.
    pub(crate) fn column_names(&self) -> Vec<String> {
        self.output_col_idx
            .iter()
            .map(|&table_col_idx| self.table_desc.columns[table_col_idx].name.clone())
            .chain(std::iter::once(OP_NAME.to_string()))
            .collect()
    }

    /// Returns the unqualified names of the columns selected by
    /// `output_col_idx_with_out_hidden`, followed by `OP_NAME`.
    pub(crate) fn column_names_without_hidden(&self) -> Vec<String> {
        let columns = &self.table_desc.columns;
        let mut names: Vec<String> =
            Vec::with_capacity(self.output_col_idx_with_out_hidden.len() + 1);
        names.extend(
            self.output_col_idx_with_out_hidden
                .iter()
                .map(|&table_col_idx| columns[table_col_idx].name.clone()),
        );
        names.push(OP_NAME.to_string());
        names
    }

    /// Maps the table's distribution key onto positions in the scan output.
    ///
    /// Returns `None` as soon as a distribution-key column is not among the
    /// columns listed in `output_col_idx`.
    pub fn distribution_key(&self) -> Option<Vec<usize>> {
        // Table column index -> position of that column in the output.
        // A later duplicate overwrites an earlier one.
        let mut output_pos_of: HashMap<usize, usize> = HashMap::new();
        for (output_pos, &table_col_idx) in self.output_col_idx.iter().enumerate() {
            output_pos_of.insert(table_col_idx, output_pos);
        }

        let mut dist_key = Vec::new();
        for table_col_idx in self.table_desc.distribution_key.iter() {
            dist_key.push(*output_pos_of.get(table_col_idx)?);
        }
        Some(dist_key)
    }

    /// Builds a `LogScan` from its parts; `chunk_size` starts out as `None`.
    pub(crate) fn new(
        table_name: String,
        output_col_idx_with_out_hidden: Vec<usize>,
        output_col_idx: Vec<usize>,
        table_desc: Rc<TableDesc>,
        ctx: OptimizerContextRef,
        old_epoch: u64,
        new_epoch: u64,
        version_id: HummockVersionId,
    ) -> Self {
        Self {
            chunk_size: None,
            table_name,
            table_desc,
            output_col_idx,
            output_col_idx_with_out_hidden,
            old_epoch,
            new_epoch,
            version_id,
            ctx,
        }
    }

    /// Renders the output column names as a `Pretty::Array`.
    ///
    /// With `verbose` set the names carry the `<table_name>.` prefix;
    /// otherwise the bare column names are used.
    pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
        let names = if verbose {
            self.column_names_with_table_prefix()
        } else {
            self.column_names()
        };
        Pretty::Array(names.into_iter().map(Pretty::from).collect())
    }

    /// Builds the output schema: one table-prefixed field per output column,
    /// then a trailing `<table_name>.<OP_NAME>` field of type `OP_TYPE`.
    pub(crate) fn schema(&self) -> Schema {
        let mut fields = Vec::with_capacity(self.output_col_idx.len() + 1);
        for &table_col_idx in &self.output_col_idx {
            let col = &self.table_desc.columns[table_col_idx];
            fields.push(Field::from_with_table_name_prefix(col, &self.table_name));
        }
        let op_field_name = format!("{}.{}", &self.table_name, OP_NAME);
        fields.push(Field::with_name(OP_TYPE, op_field_name));
        Schema { fields }
    }

    /// Returns a bit set over output positions.
    ///
    /// A bit is set for every position in `output_col_idx` whose table column
    /// index also occurs in `output_col_idx_with_out_hidden`, and for the
    /// position `output_col_idx.len()`, which is where `schema` places the
    /// `OP_NAME` field.
    pub(crate) fn out_fields(&self) -> FixedBitSet {
        let kept_table_cols = &self.output_col_idx_with_out_hidden;
        let op_pos = self.output_col_idx.len();
        let kept_positions = self
            .output_col_idx
            .iter()
            .enumerate()
            .filter(|&(_, table_col_idx)| kept_table_cols.contains(table_col_idx))
            .map(|(output_pos, _)| output_pos);
        FixedBitSet::from_iter(kept_positions.chain(std::iter::once(op_pos)))
    }

    /// Returns a clone of the optimizer context handle.
    pub(crate) fn ctx(&self) -> OptimizerContextRef {
        Clone::clone(&self.ctx)
    }
}