risingwave_frontend/optimizer/plan_node/
utils.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::default::Default;
17use std::ops::Bound;
18use std::vec;
19
20use anyhow::anyhow;
21use chrono::{MappedLocalTime, TimeZone};
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use pretty_xmlish::{Pretty, Str, StrAssocArr, XmlNode};
25use risingwave_common::catalog::{
26    ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, FieldDisplay, Schema,
27    StreamJobStatus,
28};
29use risingwave_common::constants::log_store::v2::{
30    KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX,
31};
32use risingwave_common::hash::VnodeCount;
33use risingwave_common::license::Feature;
34use risingwave_common::types::{DataType, Interval, ScalarImpl, Timestamptz};
35use risingwave_common::util::iter_util::ZipEqFast;
36use risingwave_common::util::scan_range::{ScanRange, is_full_range};
37use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
38use risingwave_connector::source::iceberg::IcebergTimeTravelInfo;
39use risingwave_expr::aggregate::PbAggKind;
40use risingwave_expr::bail;
41use risingwave_sqlparser::ast::AsOf;
42
43use super::generic::{self, GenericPlanRef, PhysicalPlanRef};
44use super::{BatchPlanRef, StreamPlanRef, pretty_config};
45use crate::catalog::table_catalog::TableType;
46use crate::catalog::{ColumnId, FragmentId, TableCatalog, TableId};
47use crate::error::{ErrorCode, Result};
48use crate::expr::InputRef;
49use crate::optimizer::StreamScanType;
50use crate::optimizer::plan_node::generic::Agg;
51use crate::optimizer::plan_node::{BatchSimpleAgg, PlanAggCall};
52use crate::optimizer::property::{Cardinality, Order, RequiredDist, WatermarkColumns};
53use crate::utils::{Condition, IndexSet};
54
/// Incrementally assembles the [`TableCatalog`] of an internal table.
///
/// Columns are appended with `add_column` / `extend_columns`, primary-key entries with
/// `add_order_column`, optional settings through the `set_*` methods, and `build` finally
/// consumes the builder.
#[derive(Default)]
pub struct TableCatalogBuilder {
    /// All columns in this table
    columns: Vec<ColumnCatalog>,
    /// Primary-key (order) columns, in the order they were added.
    pk: Vec<ColumnOrder>,
    /// Explicit value column indices; when `None`, `build` uses every column index.
    value_indices: Option<Vec<usize>>,
    /// Index of the vnode column, if one was set.
    vnode_col_idx: Option<usize>,
    /// Column names already taken. For a name that collided, the value is the next numeric
    /// suffix to try when the same name is added again.
    column_names: HashMap<String, i32>,
    /// Watermark column bitset; when `None`, `build` substitutes a bitset created with the
    /// column count as its capacity.
    watermark_columns: Option<FixedBitSet>,
    /// Positions within `pk` that form the distribution key; `build` asserts that they map to
    /// the distribution key it is given.
    dist_key_in_pk: Option<Vec<usize>>,
    /// Indices of watermark columns in all columns that should be used for state cleaning.
    clean_watermark_indices: Vec<usize>,
}
68
69/// For DRY, mainly used for construct internal table catalog in stateful streaming executors.
70/// Be careful of the order of add column.
71impl TableCatalogBuilder {
72    /// Add a column from Field info, return the column index of the table
73    pub fn add_column(&mut self, field: &Field) -> usize {
74        let column_idx = self.columns.len();
75        let column_id = column_idx as i32;
76        // Add column desc.
77        let mut column_desc = ColumnDesc::from_field_with_column_id(field, column_id);
78
79        // Replace dot of the internal table column name with underline.
80        column_desc.name = column_desc.name.replace('.', "_");
81        // Avoid column name duplicate.
82        self.avoid_duplicate_col_name(&mut column_desc);
83
84        self.columns.push(ColumnCatalog {
85            column_desc,
86            // All columns in internal table are invisible to batch query.
87            is_hidden: false,
88        });
89        column_idx
90    }
91
92    /// Extend the columns with column ids reset. The input columns should NOT have duplicate names.
93    ///
94    /// Returns the indices of the extended columns.
95    pub fn extend_columns(&mut self, columns: &[ColumnCatalog]) -> Vec<usize> {
96        let base_idx = self.columns.len();
97        columns.iter().enumerate().for_each(|(i, col)| {
98            assert!(!self.column_names.contains_key(col.name()));
99            self.column_names.insert(col.name().to_owned(), 0);
100
101            // Reset the column id for the columns.
102            let mut new_col = col.clone();
103            new_col.column_desc.column_id = ColumnId::new((base_idx + i) as _);
104            self.columns.push(new_col);
105        });
106        Vec::from_iter(base_idx..(base_idx + columns.len()))
107    }
108
109    /// Check whether need to add a ordered column. Different from value, order desc equal pk in
110    /// semantics and they are encoded as storage key.
111    pub fn add_order_column(&mut self, column_index: usize, order_type: OrderType) {
112        self.pk.push(ColumnOrder::new(column_index, order_type));
113    }
114
115    /// get the current exist field number of the primary key.
116    pub fn get_current_pk_len(&self) -> usize {
117        self.pk.len()
118    }
119
120    pub fn set_vnode_col_idx(&mut self, vnode_col_idx: usize) {
121        self.vnode_col_idx = Some(vnode_col_idx);
122    }
123
124    pub fn set_value_indices(&mut self, value_indices: Vec<usize>) {
125        self.value_indices = Some(value_indices);
126    }
127
128    pub fn set_dist_key_in_pk(&mut self, dist_key_in_pk: Vec<usize>) {
129        self.dist_key_in_pk = Some(dist_key_in_pk);
130    }
131
132    /// Set the indices of watermark columns used for state cleaning.
133    /// These are column indices in the table schema.
134    pub fn set_clean_watermark_indices(&mut self, clean_watermark_indices: Vec<usize>) {
135        self.clean_watermark_indices = clean_watermark_indices;
136    }
137
138    /// Check the column name whether exist before. if true, record occurrence and change the name
139    /// to avoid duplicate.
140    fn avoid_duplicate_col_name(&mut self, column_desc: &mut ColumnDesc) {
141        if let Some(old_identity) = self.column_names.get(&column_desc.name) {
142            let column_name = column_desc.name.clone();
143            let mut identity = *old_identity;
144            loop {
145                column_desc.name = format!("{}_{}", column_name, identity);
146                identity += 1;
147                if !self.column_names.contains_key(&column_desc.name) {
148                    break;
149                }
150            }
151            *self.column_names.get_mut(&column_name).unwrap() = identity;
152        }
153        self.column_names.insert(column_desc.name.clone(), 0);
154    }
155
156    /// Consume builder and create `TableCatalog` (for proto). The `read_prefix_len_hint` is the
157    /// anticipated read prefix pattern (number of fields) for the table, which can be utilized for
158    /// implementing the table's bloom filter or other storage optimization techniques.
159    pub fn build(self, distribution_key: Vec<usize>, read_prefix_len_hint: usize) -> TableCatalog {
160        assert!(read_prefix_len_hint <= self.pk.len());
161        let watermark_columns = match self.watermark_columns {
162            Some(w) => w,
163            None => FixedBitSet::with_capacity(self.columns.len()),
164        };
165
166        // If `dist_key_in_pk` is set, check if it matches with `distribution_key`.
167        // Note that we cannot derive in the opposite direction, because there can be a column
168        // appearing multiple times in the PK.
169        if let Some(dist_key_in_pk) = &self.dist_key_in_pk {
170            let derived_dist_key = dist_key_in_pk
171                .iter()
172                .map(|idx| self.pk[*idx].column_index)
173                .collect_vec();
174            assert_eq!(
175                derived_dist_key, distribution_key,
176                "dist_key mismatch with dist_key_in_pk"
177            );
178        }
179
180        TableCatalog {
181            id: TableId::placeholder(),
182            schema_id: 0.into(),
183            database_id: 0.into(),
184            associated_source_id: None,
185            name: String::new(),
186            columns: self.columns.clone(),
187            pk: self.pk,
188            stream_key: vec![],
189            distribution_key,
190            // NOTE: This should be altered if `TableCatalogBuilder` is used to build something
191            // other than internal tables.
192            table_type: TableType::Internal,
193            append_only: false,
194            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
195            fragment_id: FragmentId::placeholder(),
196            dml_fragment_id: None,
197            vnode_col_index: self.vnode_col_idx,
198            row_id_index: None,
199            value_indices: self
200                .value_indices
201                .unwrap_or_else(|| (0..self.columns.len()).collect_vec()),
202            definition: "".into(),
203            conflict_behavior: ConflictBehavior::NoCheck,
204            version_column_indices: vec![],
205            read_prefix_len_hint,
206            version: None, // the internal table is not versioned and can't be schema changed
207            watermark_columns,
208            dist_key_in_pk: self.dist_key_in_pk.unwrap_or_default(),
209            cardinality: Cardinality::unknown(), // TODO(card): cardinality of internal table
210            created_at_epoch: None,
211            initialized_at_epoch: None,
212            // NOTE(kwannoel): This may not match the create type of the materialized table.
213            // It should be ignored for internal tables.
214            create_type: CreateType::Foreground,
215            stream_job_status: StreamJobStatus::Creating,
216            description: None,
217            initialized_at_cluster_version: None,
218            created_at_cluster_version: None,
219            retention_seconds: None,
220            cdc_table_id: None,
221            vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later
222            webhook_info: None,
223            job_id: None,
224            engine: Engine::Hummock,
225            clean_watermark_index_in_pk: None,
226            clean_watermark_indices: self.clean_watermark_indices,
227            refreshable: false, // Internal tables are not refreshable
228            vector_index_info: None,
229            cdc_table_type: None,
230        }
231    }
232
233    pub fn columns(&self) -> &[ColumnCatalog] {
234        &self.columns
235    }
236}
237
238/// See also [`super::generic::DistillUnit`].
239pub trait Distill {
240    fn distill<'a>(&self) -> XmlNode<'a>;
241
242    fn distill_to_string(&self) -> String {
243        let mut config = pretty_config();
244        let mut output = String::with_capacity(2048);
245        config.unicode(&mut output, &Pretty::Record(self.distill()));
246        output
247    }
248}
249
250pub(super) fn childless_record<'a>(
251    name: impl Into<Str<'a>>,
252    fields: StrAssocArr<'a>,
253) -> XmlNode<'a> {
254    XmlNode::simple_record(name, fields, Default::default())
255}
256
/// Implements `Distill` for `$ty` by delegating to the `distill_with_name` method of its
/// `$core` field, passing `$name` as the node name.
///
/// Besides the `impl`, the expansion emits `use` items for `XmlNode`, `DistillUnit` and
/// `Distill` at the invocation site.
macro_rules! impl_distill_by_unit {
    ($ty:ty, $core:ident, $name:expr) => {
        // Bring the names used by the generated impl into scope at the call site.
        use pretty_xmlish::XmlNode;
        use $crate::optimizer::plan_node::generic::DistillUnit;
        use $crate::optimizer::plan_node::utils::Distill;
        impl Distill for $ty {
            fn distill<'a>(&self) -> XmlNode<'a> {
                self.$core.distill_with_name($name)
            }
        }
    };
}
269pub(crate) use impl_distill_by_unit;
270
271pub(crate) fn column_names_pretty<'a>(schema: &Schema) -> Pretty<'a> {
272    let columns = (schema.fields.iter())
273        .map(|f| f.name.clone())
274        .map(Pretty::from)
275        .collect();
276    Pretty::Array(columns)
277}
278
279pub(crate) fn watermark_pretty<'a>(
280    watermark_columns: &WatermarkColumns,
281    schema: &Schema,
282) -> Option<Pretty<'a>> {
283    if watermark_columns.is_empty() {
284        None
285    } else {
286        let groups = watermark_columns.grouped();
287        let pretty_groups = groups
288            .values()
289            .map(|cols| {
290                Pretty::Array(
291                    cols.indices()
292                        .map(|idx| FieldDisplay(schema.fields.get(idx).unwrap()))
293                        .map(|d| Pretty::display(&d))
294                        .collect::<Vec<_>>(),
295                )
296            })
297            .collect();
298        Some(Pretty::Array(pretty_groups))
299    }
300}
301
302#[derive(Clone, Copy)]
303pub struct IndicesDisplay<'a> {
304    pub indices: &'a [usize],
305    pub schema: &'a Schema,
306}
307
308impl<'a> IndicesDisplay<'a> {
309    /// Returns `None` means all
310    pub fn from_join<'b>(
311        join: &'a generic::Join<impl GenericPlanRef>,
312        input_schema: &'a Schema,
313    ) -> Pretty<'b> {
314        let col_num = join.internal_column_num();
315        let id = Self::from(&join.output_indices, col_num, input_schema);
316        id.map_or_else(|| Pretty::from("all"), Self::distill)
317    }
318
319    /// Returns `None` means all
320    fn from(indices: &'a [usize], col_num: usize, schema: &'a Schema) -> Option<Self> {
321        if indices.iter().copied().eq(0..col_num) {
322            return None;
323        }
324        Some(Self { indices, schema })
325    }
326
327    pub fn distill<'b>(self) -> Pretty<'b> {
328        let vec = self.indices.iter().map(|&i| {
329            let name = self.schema.fields.get(i).unwrap().name.clone();
330            Pretty::from(name)
331        });
332        Pretty::Array(vec.collect())
333    }
334}
335
336pub(crate) fn sum_affected_row(dml: BatchPlanRef) -> Result<BatchPlanRef> {
337    let dml = RequiredDist::single().batch_enforce_if_not_satisfies(dml, &Order::any())?;
338    // Accumulate the affected rows.
339    let sum_agg = PlanAggCall {
340        agg_type: PbAggKind::Sum.into(),
341        return_type: DataType::Int64,
342        inputs: vec![InputRef::new(0, DataType::Int64)],
343        distinct: false,
344        order_by: vec![],
345        filter: Condition::true_cond(),
346        direct_args: vec![],
347    };
348    let agg = Agg::new(vec![sum_agg], IndexSet::empty(), dml);
349    let batch_agg = BatchSimpleAgg::new(agg);
350    Ok(batch_agg.into())
351}
352
/// Build the display name of a plan node as a `String`.
///
/// The result starts with the literal `$name`. If a property list is provided, every property
/// whose condition evaluates to `true` is appended, in order, as ` [prop1, prop2, ...]`; when
/// no condition holds the name is returned unchanged.
macro_rules! plan_node_name {
    ($name:literal $(, { $prop:literal, $cond:expr } )* $(,)?) => {
        {
            // `mut` is unused when the macro is invoked without any property.
            #[allow(unused_mut)]
            let mut enabled: Vec<&str> = Vec::new();
            $(
                if $cond {
                    enabled.push($prop);
                }
            )*
            if enabled.is_empty() {
                $name.to_string()
            } else {
                format!("{} [{}]", $name, enabled.join(", "))
            }
        }
    };
}
372pub(crate) use plan_node_name;
373use risingwave_pb::common::{PbBatchQueryEpoch, batch_query_epoch};
374
375pub fn infer_kv_log_store_table_catalog_inner(
376    input: &StreamPlanRef,
377    columns: &[ColumnCatalog],
378) -> TableCatalog {
379    let mut table_catalog_builder = TableCatalogBuilder::default();
380
381    let mut value_indices =
382        Vec::with_capacity(KV_LOG_STORE_PREDEFINED_COLUMNS.len() + input.schema().fields().len());
383
384    for (name, data_type) in KV_LOG_STORE_PREDEFINED_COLUMNS {
385        let indice = table_catalog_builder.add_column(&Field::with_name(data_type, name));
386        value_indices.push(indice);
387    }
388
389    table_catalog_builder.set_vnode_col_idx(VNODE_COLUMN_INDEX);
390
391    for (i, ordering) in PK_ORDERING.iter().enumerate() {
392        table_catalog_builder.add_order_column(i, *ordering);
393    }
394
395    let read_prefix_len_hint = table_catalog_builder.get_current_pk_len();
396
397    if columns.len() != input.schema().fields().len()
398        || columns
399            .iter()
400            .zip_eq_fast(input.schema().fields())
401            .any(|(c, f)| *c.data_type() != f.data_type())
402    {
403        tracing::warn!(
404            "sink schema different with upstream schema: sink columns: {:?}, input schema: {:?}.",
405            columns,
406            input.schema()
407        );
408    }
409    for field in input.schema().fields() {
410        let indice = table_catalog_builder.add_column(field);
411        value_indices.push(indice);
412    }
413    table_catalog_builder.set_value_indices(value_indices);
414
415    // Modify distribution key indices based on the pre-defined columns.
416    let dist_key = input
417        .distribution()
418        .dist_column_indices()
419        .iter()
420        .map(|idx| idx + KV_LOG_STORE_PREDEFINED_COLUMNS.len())
421        .collect_vec();
422
423    table_catalog_builder.build(dist_key, read_prefix_len_hint)
424}
425
426pub fn infer_synced_kv_log_store_table_catalog_inner(
427    input: &StreamPlanRef,
428    columns: &[Field],
429) -> TableCatalog {
430    let mut table_catalog_builder = TableCatalogBuilder::default();
431
432    let mut value_indices =
433        Vec::with_capacity(KV_LOG_STORE_PREDEFINED_COLUMNS.len() + columns.len());
434
435    for (name, data_type) in KV_LOG_STORE_PREDEFINED_COLUMNS {
436        let indice = table_catalog_builder.add_column(&Field::with_name(data_type, name));
437        value_indices.push(indice);
438    }
439
440    table_catalog_builder.set_vnode_col_idx(VNODE_COLUMN_INDEX);
441
442    for (i, ordering) in PK_ORDERING.iter().enumerate() {
443        table_catalog_builder.add_order_column(i, *ordering);
444    }
445
446    let read_prefix_len_hint = table_catalog_builder.get_current_pk_len();
447
448    let payload_indices = {
449        let mut payload_indices = Vec::with_capacity(columns.len());
450        for column in columns {
451            let payload_index = table_catalog_builder.add_column(column);
452            payload_indices.push(payload_index);
453        }
454        payload_indices
455    };
456
457    value_indices.extend(payload_indices);
458    table_catalog_builder.set_value_indices(value_indices);
459
460    // Modify distribution key indices based on the pre-defined columns.
461    let dist_key = input
462        .distribution()
463        .dist_column_indices()
464        .iter()
465        .map(|idx| idx + KV_LOG_STORE_PREDEFINED_COLUMNS.len())
466        .collect_vec();
467
468    table_catalog_builder.build(dist_key, read_prefix_len_hint)
469}
470
471/// Check that all leaf nodes must be stream table scan,
472/// since that plan node maps to `backfill` executor, which supports recovery.
473/// Some other leaf nodes like `StreamValues` do not support recovery, and they
474/// cannot use background ddl.
475pub(crate) fn plan_can_use_background_ddl(plan: &StreamPlanRef) -> bool {
476    if plan.inputs().is_empty() {
477        if plan.as_stream_source_scan().is_some()
478            || plan.as_stream_now().is_some()
479            || plan.as_stream_source().is_some()
480        {
481            true
482        } else if let Some(scan) = plan.as_stream_table_scan() {
483            scan.stream_scan_type() == StreamScanType::Backfill
484                || scan.stream_scan_type() == StreamScanType::ArrangementBackfill
485                || scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill
486                || scan.stream_scan_type() == StreamScanType::SnapshotBackfill
487        } else {
488            false
489        }
490    } else {
491        assert!(!plan.inputs().is_empty());
492        plan.inputs().iter().all(plan_can_use_background_ddl)
493    }
494}
495
496pub fn unix_timestamp_sec_to_epoch(ts: i64) -> risingwave_common::util::epoch::Epoch {
497    let ts = ts.checked_add(1).unwrap();
498    risingwave_common::util::epoch::Epoch::from_unix_millis_or_earliest(
499        u64::try_from(ts).unwrap_or(0).checked_mul(1000).unwrap(),
500    )
501}
502
503pub fn to_batch_query_epoch(a: &Option<AsOf>) -> Result<Option<PbBatchQueryEpoch>> {
504    let Some(a) = a else {
505        return Ok(None);
506    };
507    Feature::TimeTravel.check_available()?;
508    let timestamp = match a {
509        AsOf::ProcessTime => {
510            return Err(ErrorCode::NotSupported(
511                "do not support as of proctime".to_owned(),
512                "please use as of timestamp".to_owned(),
513            )
514            .into());
515        }
516        AsOf::TimestampNum(ts) => *ts,
517        AsOf::TimestampString(ts) => {
518            let date_time = speedate::DateTime::parse_str_rfc3339(ts)
519                .map_err(|_e| anyhow!("fail to parse timestamp"))?;
520            if date_time.time.tz_offset.is_none() {
521                // If the input does not specify a time zone, use the time zone set by the "SET TIME ZONE" command.
522                risingwave_expr::expr_context::TIME_ZONE::try_with(|set_time_zone| {
523                    let tz =
524                        Timestamptz::lookup_time_zone(set_time_zone).map_err(|e| anyhow!(e))?;
525                    match tz.with_ymd_and_hms(
526                        date_time.date.year.into(),
527                        date_time.date.month.into(),
528                        date_time.date.day.into(),
529                        date_time.time.hour.into(),
530                        date_time.time.minute.into(),
531                        date_time.time.second.into(),
532                    ) {
533                        MappedLocalTime::Single(d) => Ok(d.timestamp()),
534                        MappedLocalTime::Ambiguous(_, _) | MappedLocalTime::None => {
535                            Err(anyhow!(format!(
536                                "failed to parse the timestamp {ts} with the specified time zone {tz}"
537                            )))
538                        }
539                    }
540                })??
541            } else {
542                date_time.timestamp_tz()
543            }
544        }
545        AsOf::VersionNum(_) | AsOf::VersionString(_) => {
546            return Err(ErrorCode::NotSupported(
547                "do not support as of version".to_owned(),
548                "please use as of timestamp".to_owned(),
549            )
550            .into());
551        }
552        AsOf::ProcessTimeWithInterval((value, leading_field)) => {
553            let interval = Interval::parse_with_fields(
554                value,
555                Some(crate::Binder::bind_date_time_field(*leading_field)),
556            )
557            .map_err(|_| anyhow!("fail to parse interval"))?;
558            let interval_sec = (interval.epoch_in_micros() / 1_000_000) as i64;
559            chrono::Utc::now()
560                .timestamp()
561                .checked_sub(interval_sec)
562                .ok_or_else(|| anyhow!("invalid timestamp"))?
563        }
564    };
565    Ok(Some(PbBatchQueryEpoch {
566        epoch: Some(batch_query_epoch::PbEpoch::TimeTravel(
567            unix_timestamp_sec_to_epoch(timestamp).0,
568        )),
569    }))
570}
571
572pub fn to_iceberg_time_travel_as_of(
573    a: &Option<AsOf>,
574    timezone: &String,
575) -> Result<Option<IcebergTimeTravelInfo>> {
576    Ok(match a {
577        Some(AsOf::VersionNum(v)) => Some(IcebergTimeTravelInfo::Version(*v)),
578        Some(AsOf::TimestampNum(ts)) => Some(IcebergTimeTravelInfo::TimestampMs(ts * 1000)),
579        Some(AsOf::VersionString(_)) => {
580            bail!("Unsupported version string in iceberg time travel")
581        }
582        Some(AsOf::TimestampString(ts)) => {
583            let date_time = speedate::DateTime::parse_str_rfc3339(ts)
584                .map_err(|_e| anyhow!("fail to parse timestamp"))?;
585            let timestamp = if date_time.time.tz_offset.is_none() {
586                // If the input does not specify a time zone, use the time zone set by the "SET TIME ZONE" command.
587                let tz = Timestamptz::lookup_time_zone(timezone).map_err(|e| anyhow!(e))?;
588                match tz.with_ymd_and_hms(
589                    date_time.date.year.into(),
590                    date_time.date.month.into(),
591                    date_time.date.day.into(),
592                    date_time.time.hour.into(),
593                    date_time.time.minute.into(),
594                    date_time.time.second.into(),
595                ) {
596                    MappedLocalTime::Single(d) => Ok(d.timestamp()),
597                    MappedLocalTime::Ambiguous(_, _) | MappedLocalTime::None => {
598                        Err(anyhow!(format!(
599                            "failed to parse the timestamp {ts} with the specified time zone {tz}"
600                        )))
601                    }
602                }?
603            } else {
604                date_time.timestamp_tz()
605            };
606
607            Some(IcebergTimeTravelInfo::TimestampMs(
608                timestamp * 1000 + date_time.time.microsecond as i64 / 1000,
609            ))
610        }
611        Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
612            unreachable!()
613        }
614        None => None,
615    })
616}
617
618pub fn scan_ranges_as_strs(order_names: Vec<String>, scan_ranges: &Vec<ScanRange>) -> Vec<String> {
619    let mut range_strs = vec![];
620
621    let explain_max_range = 20;
622    for scan_range in scan_ranges.iter().take(explain_max_range) {
623        #[expect(clippy::disallowed_methods)]
624        let mut range_str = scan_range
625            .eq_conds
626            .iter()
627            .zip(order_names.iter())
628            .map(|(v, name)| match v {
629                Some(v) => format!("{} = {:?}", name, v),
630                None => format!("{} IS NULL", name),
631            })
632            .collect_vec();
633
634        let len = scan_range.eq_conds.len();
635        if !is_full_range(&scan_range.range) {
636            let bound_range_str = match (&scan_range.range.0, &scan_range.range.1) {
637                (Bound::Unbounded, Bound::Unbounded) => unreachable!(),
638                (Bound::Unbounded, ub) => ub_to_string(&order_names[len..], ub),
639                (lb, Bound::Unbounded) => lb_to_string(&order_names[len..], lb),
640                (lb, ub) => format!(
641                    "{} AND {}",
642                    lb_to_string(&order_names[len..], lb),
643                    ub_to_string(&order_names[len..], ub)
644                ),
645            };
646            range_str.push(bound_range_str);
647        }
648        range_strs.push(range_str.join(" AND "));
649    }
650    if scan_ranges.len() > explain_max_range {
651        range_strs.push("...".to_owned());
652    }
653    range_strs
654}
655
656pub fn ub_to_string(order_names: &[String], ub: &Bound<Vec<Option<ScalarImpl>>>) -> String {
657    match ub {
658        Bound::Included(v) => {
659            let (name, value) = row_to_string(order_names, v);
660            format!("{} <= {}", name, value)
661        }
662        Bound::Excluded(v) => {
663            let (name, value) = row_to_string(order_names, v);
664            format!("{} < {}", name, value)
665        }
666        Bound::Unbounded => unreachable!(),
667    }
668}
669
670pub fn lb_to_string(order_names: &[String], lb: &Bound<Vec<Option<ScalarImpl>>>) -> String {
671    match lb {
672        Bound::Included(v) => {
673            let (name, value) = row_to_string(order_names, v);
674            format!("{} >= {}", name, value)
675        }
676        Bound::Excluded(v) => {
677            let (name, value) = row_to_string(order_names, v);
678            format!("{} > {}", name, value)
679        }
680        Bound::Unbounded => unreachable!(),
681    }
682}
683
684pub fn row_to_string(
685    order_names: &[String],
686    struct_values: &Vec<Option<ScalarImpl>>,
687) -> (String, String) {
688    let mut names = vec![];
689    let mut values = vec![];
690    #[expect(clippy::disallowed_methods)]
691    for (name, value) in order_names.iter().zip(struct_values.iter()) {
692        names.push(name);
693        match value {
694            Some(v) => values.push(format!("{:?}", v)),
695            None => values.push("null".to_owned()),
696        }
697    }
698    if names.len() == 1 {
699        (names[0].clone(), values[0].clone())
700    } else {
701        (
702            format!("({})", names.iter().join(", ")),
703            format!("({})", values.iter().join(", ")),
704        )
705    }
706}