risingwave_frontend/optimizer/plan_node/utils.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::default::Default;
use std::ops::Bound;
use std::vec;

use anyhow::anyhow;
use chrono::{MappedLocalTime, TimeZone};
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, Str, StrAssocArr, XmlNode};
use risingwave_common::catalog::{
    ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, FieldDisplay,
    OBJECT_ID_PLACEHOLDER, Schema, StreamJobStatus,
};
use risingwave_common::constants::log_store::v2::{
    KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::license::Feature;
use risingwave_common::types::{DataType, Interval, ScalarImpl, Timestamptz};
use risingwave_common::util::scan_range::{ScanRange, is_full_range};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::iceberg::IcebergTimeTravelInfo;
use risingwave_expr::aggregate::PbAggKind;
use risingwave_expr::bail;
use risingwave_pb::plan_common::as_of::AsOfType;
use risingwave_pb::plan_common::{PbAsOf, as_of};
use risingwave_sqlparser::ast::AsOf;

use super::generic::{self, GenericPlanRef, PhysicalPlanRef};
use super::pretty_config;
use crate::PlanRef;
use crate::catalog::table_catalog::TableType;
use crate::catalog::{ColumnId, TableCatalog, TableId};
use crate::error::{ErrorCode, Result};
use crate::expr::InputRef;
use crate::optimizer::StreamScanType;
use crate::optimizer::plan_node::generic::Agg;
use crate::optimizer::plan_node::{BatchSimpleAgg, PlanAggCall};
use crate::optimizer::property::{Cardinality, Order, RequiredDist, WatermarkColumns};
use crate::utils::{Condition, IndexSet};

#[derive(Default)]
pub struct TableCatalogBuilder {
    /// All columns in this table.
    columns: Vec<ColumnCatalog>,
    pk: Vec<ColumnOrder>,
    value_indices: Option<Vec<usize>>,
    vnode_col_idx: Option<usize>,
    column_names: HashMap<String, i32>,
    watermark_columns: Option<FixedBitSet>,
    dist_key_in_pk: Option<Vec<usize>>,
}

/// A helper (for DRY) mainly used to construct internal table catalogs in stateful streaming
/// executors. Be careful about the order in which columns are added.
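///
/// A minimal usage sketch (illustrative only, hence not compiled as a doctest):
///
/// ```ignore
/// let mut builder = TableCatalogBuilder::default();
/// let k = builder.add_column(&Field::with_name(DataType::Int64, "k"));
/// let _v = builder.add_column(&Field::with_name(DataType::Varchar, "v"));
/// // Make `k` the (single-field) storage key.
/// builder.add_order_column(k, OrderType::ascending());
/// // Distribute by `k`; the read prefix covers the single pk field.
/// let _table = builder.build(vec![k], 1);
/// ```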
impl TableCatalogBuilder {
    /// Add a column from `Field` info. Returns the column index in the table.
    pub fn add_column(&mut self, field: &Field) -> usize {
        let column_idx = self.columns.len();
        let column_id = column_idx as i32;
        // Add column desc.
        let mut column_desc = ColumnDesc::from_field_with_column_id(field, column_id);

        // Replace dots in the internal table column name with underscores.
        column_desc.name = column_desc.name.replace('.', "_");
        // Avoid duplicate column names.
        self.avoid_duplicate_col_name(&mut column_desc);

        self.columns.push(ColumnCatalog {
            column_desc,
            // All columns in an internal table are invisible to batch queries.
            is_hidden: false,
        });
        column_idx
    }

    /// Extend the columns with column ids reset. The input columns should NOT have duplicate names.
    ///
    /// Returns the indices of the extended columns.
    pub fn extend_columns(&mut self, columns: &[ColumnCatalog]) -> Vec<usize> {
        let base_idx = self.columns.len();
        columns.iter().enumerate().for_each(|(i, col)| {
            assert!(!self.column_names.contains_key(col.name()));
            self.column_names.insert(col.name().to_owned(), 0);

            // Reset the column id for the columns.
            let mut new_col = col.clone();
            new_col.column_desc.column_id = ColumnId::new((base_idx + i) as _);
            self.columns.push(new_col);
        });
        Vec::from_iter(base_idx..(base_idx + columns.len()))
    }

    /// Add an ordered (primary key) column. Unlike value columns, order descriptions are
    /// semantically equal to the primary key, and they are encoded as the storage key.
    pub fn add_order_column(&mut self, column_index: usize, order_type: OrderType) {
        self.pk.push(ColumnOrder::new(column_index, order_type));
    }

    /// Get the current number of fields in the primary key.
    pub fn get_current_pk_len(&self) -> usize {
        self.pk.len()
    }

    pub fn set_vnode_col_idx(&mut self, vnode_col_idx: usize) {
        self.vnode_col_idx = Some(vnode_col_idx);
    }

    pub fn set_value_indices(&mut self, value_indices: Vec<usize>) {
        self.value_indices = Some(value_indices);
    }

    pub fn set_dist_key_in_pk(&mut self, dist_key_in_pk: Vec<usize>) {
        self.dist_key_in_pk = Some(dist_key_in_pk);
    }

    /// Check whether the column name already exists. If it does, record the occurrence and
    /// rename the new column to avoid duplicates.
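    ///
    /// For example, adding three columns all named `a` yields `a`, `a_0`, and `a_1`.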
    fn avoid_duplicate_col_name(&mut self, column_desc: &mut ColumnDesc) {
        if let Some(old_identity) = self.column_names.get(&column_desc.name) {
            let column_name = column_desc.name.clone();
            let mut identity = *old_identity;
            loop {
                column_desc.name = format!("{}_{}", column_name, identity);
                identity += 1;
                if !self.column_names.contains_key(&column_desc.name) {
                    break;
                }
            }
            *self.column_names.get_mut(&column_name).unwrap() = identity;
        }
        self.column_names.insert(column_desc.name.clone(), 0);
    }

    /// Consume the builder and create a `TableCatalog` (for proto). `read_prefix_len_hint` is the
    /// anticipated read prefix pattern (number of fields) for the table, which can be used for
    /// the table's bloom filter or other storage optimizations.
    pub fn build(self, distribution_key: Vec<usize>, read_prefix_len_hint: usize) -> TableCatalog {
        assert!(read_prefix_len_hint <= self.pk.len());
        let watermark_columns = match self.watermark_columns {
            Some(w) => w,
            None => FixedBitSet::with_capacity(self.columns.len()),
        };

        // If `dist_key_in_pk` is set, check if it matches with `distribution_key`.
        // Note that we cannot derive in the opposite direction, because there can be a column
        // appearing multiple times in the PK.
        if let Some(dist_key_in_pk) = &self.dist_key_in_pk {
            let derived_dist_key = dist_key_in_pk
                .iter()
                .map(|idx| self.pk[*idx].column_index)
                .collect_vec();
            assert_eq!(
                derived_dist_key, distribution_key,
                "dist_key mismatch with dist_key_in_pk"
            );
        }

        TableCatalog {
            id: TableId::placeholder(),
            schema_id: 0,
            database_id: 0,
            associated_source_id: None,
            name: String::new(),
            dependent_relations: vec![],
            columns: self.columns.clone(),
            pk: self.pk,
            stream_key: vec![],
            distribution_key,
            // NOTE: This should be altered if `TableCatalogBuilder` is used to build something
            // other than internal tables.
            table_type: TableType::Internal,
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            fragment_id: OBJECT_ID_PLACEHOLDER,
            dml_fragment_id: None,
            vnode_col_index: self.vnode_col_idx,
            row_id_index: None,
            value_indices: self
                .value_indices
                .unwrap_or_else(|| (0..self.columns.len()).collect_vec()),
            definition: "".into(),
            conflict_behavior: ConflictBehavior::NoCheck,
            version_column_index: None,
            read_prefix_len_hint,
            version: None, // the internal table is not versioned and can't be schema changed
            watermark_columns,
            dist_key_in_pk: self.dist_key_in_pk.unwrap_or_default(),
            cardinality: Cardinality::unknown(), // TODO(card): cardinality of internal table
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: false,
            // NOTE(kwannoel): This may not match the create type of the materialized table.
            // It should be ignored for internal tables.
            create_type: CreateType::Foreground,
            stream_job_status: StreamJobStatus::Creating,
            description: None,
            incoming_sinks: vec![],
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            retention_seconds: None,
            cdc_table_id: None,
            vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later
            webhook_info: None,
            job_id: None,
            engine: Engine::Hummock,
            clean_watermark_index_in_pk: None, // TODO: fill this field
        }
    }

    pub fn columns(&self) -> &[ColumnCatalog] {
        &self.columns
    }
}

/// See also [`super::generic::DistillUnit`].
pub trait Distill {
    fn distill<'a>(&self) -> XmlNode<'a>;

    fn distill_to_string(&self) -> String {
        let mut config = pretty_config();
        let mut output = String::with_capacity(2048);
        config.unicode(&mut output, &Pretty::Record(self.distill()));
        output
    }
}

pub(super) fn childless_record<'a>(
    name: impl Into<Str<'a>>,
    fields: StrAssocArr<'a>,
) -> XmlNode<'a> {
    XmlNode::simple_record(name, fields, Default::default())
}

macro_rules! impl_distill_by_unit {
    ($ty:ty, $core:ident, $name:expr) => {
        use pretty_xmlish::XmlNode;
        use $crate::optimizer::plan_node::generic::DistillUnit;
        use $crate::optimizer::plan_node::utils::Distill;
        impl Distill for $ty {
            fn distill<'a>(&self) -> XmlNode<'a> {
                self.$core.distill_with_name($name)
            }
        }
    };
}
pub(crate) use impl_distill_by_unit;
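// Illustrative usage (`SomePlanNode` is a placeholder for a plan node whose `core`
// field implements `DistillUnit`):
//
//     impl_distill_by_unit!(SomePlanNode, core, "SomePlanNode");
//
// This expands to a `Distill` impl that delegates to
// `self.core.distill_with_name("SomePlanNode")`.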

pub(crate) fn column_names_pretty<'a>(schema: &Schema) -> Pretty<'a> {
    let columns = (schema.fields.iter())
        .map(|f| f.name.clone())
        .map(Pretty::from)
        .collect();
    Pretty::Array(columns)
}

pub(crate) fn watermark_pretty<'a>(
    watermark_columns: &WatermarkColumns,
    schema: &Schema,
) -> Option<Pretty<'a>> {
    if watermark_columns.is_empty() {
        None
    } else {
        let groups = watermark_columns.grouped();
        let pretty_groups = groups
            .values()
            .map(|cols| {
                Pretty::Array(
                    cols.indices()
                        .map(|idx| FieldDisplay(schema.fields.get(idx).unwrap()))
                        .map(|d| Pretty::display(&d))
                        .collect::<Vec<_>>(),
                )
            })
            .collect();
        Some(Pretty::Array(pretty_groups))
    }
}

#[derive(Clone, Copy)]
pub struct IndicesDisplay<'a> {
    pub indices: &'a [usize],
    pub schema: &'a Schema,
}

impl<'a> IndicesDisplay<'a> {
    /// Renders the output indices of a join, or `"all"` if they are trivial
    /// (all internal columns in order).
    pub fn from_join<'b, PlanRef: GenericPlanRef>(
        join: &'a generic::Join<PlanRef>,
        input_schema: &'a Schema,
    ) -> Pretty<'b> {
        let col_num = join.internal_column_num();
        let id = Self::from(&join.output_indices, col_num, input_schema);
        id.map_or_else(|| Pretty::from("all"), Self::distill)
    }

    /// Returns `None` if the indices are trivial (all columns in order).
    fn from(indices: &'a [usize], col_num: usize, schema: &'a Schema) -> Option<Self> {
        if indices.iter().copied().eq(0..col_num) {
            return None;
        }
        Some(Self { indices, schema })
    }

    pub fn distill<'b>(self) -> Pretty<'b> {
        let vec = self.indices.iter().map(|&i| {
            let name = self.schema.fields.get(i).unwrap().name.clone();
            Pretty::from(name)
        });
        Pretty::Array(vec.collect())
    }
}

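/// Enforce the DML plan onto a single node, then sum its affected-row counts
/// (the `Int64` in column 0) into one value via a simple batch aggregation.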
pub(crate) fn sum_affected_row(dml: PlanRef) -> Result<PlanRef> {
    let dml = RequiredDist::single().enforce_if_not_satisfies(dml, &Order::any())?;
    // Accumulate the affected rows.
    let sum_agg = PlanAggCall {
        agg_type: PbAggKind::Sum.into(),
        return_type: DataType::Int64,
        inputs: vec![InputRef::new(0, DataType::Int64)],
        distinct: false,
        order_by: vec![],
        filter: Condition::true_cond(),
        direct_args: vec![],
    };
    let agg = Agg::new(vec![sum_agg], IndexSet::empty(), dml);
    let batch_agg = BatchSimpleAgg::new(agg);
    Ok(batch_agg.into())
}

/// Build a plan node name string. If a property list is provided, each property whose
/// condition holds is appended to the name in brackets, e.g. `Name [prop1, prop2]`.
macro_rules! plan_node_name {
    ($name:literal $(, { $prop:literal, $cond:expr } )* $(,)?) => {
        {
            #[allow(unused_mut)]
            let mut properties: Vec<&str> = vec![];
            $( if $cond { properties.push($prop); } )*
            let mut name = $name.to_string();
            if !properties.is_empty() {
                name += " [";
                name += &properties.join(", ");
                name += "]";
            }
            name
        }
    };
}
pub(crate) use plan_node_name;
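// Illustrative expansion: `plan_node_name!("StreamHashJoin", { "append_only", true })`
// evaluates to the string `StreamHashJoin [append_only]`; if the condition were false,
// the name would stay plain `StreamHashJoin`.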
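/// Infer the internal table catalog for a KV log store.
///
/// The table layout is: the predefined log-store columns
/// (`KV_LOG_STORE_PREDEFINED_COLUMNS`) first, followed by the payload columns.
/// The primary key follows `PK_ORDERING`, the vnode column sits at
/// `VNODE_COLUMN_INDEX`, and the input's distribution key indices are shifted
/// past the predefined columns.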
pub fn infer_kv_log_store_table_catalog_inner(
    input: &PlanRef,
    columns: &[ColumnCatalog],
) -> TableCatalog {
    let mut table_catalog_builder = TableCatalogBuilder::default();

    let mut value_indices =
        Vec::with_capacity(KV_LOG_STORE_PREDEFINED_COLUMNS.len() + columns.len());

    for (name, data_type) in KV_LOG_STORE_PREDEFINED_COLUMNS {
        let idx = table_catalog_builder.add_column(&Field::with_name(data_type, name));
        value_indices.push(idx);
    }

    table_catalog_builder.set_vnode_col_idx(VNODE_COLUMN_INDEX);

    for (i, ordering) in PK_ORDERING.iter().enumerate() {
        table_catalog_builder.add_order_column(i, *ordering);
    }

    let read_prefix_len_hint = table_catalog_builder.get_current_pk_len();

    let payload_indices = table_catalog_builder.extend_columns(columns);

    value_indices.extend(payload_indices);
    table_catalog_builder.set_value_indices(value_indices);

    // Shift the distribution key indices past the predefined columns.
    let dist_key = input
        .distribution()
        .dist_column_indices()
        .iter()
        .map(|idx| idx + KV_LOG_STORE_PREDEFINED_COLUMNS.len())
        .collect_vec();

    table_catalog_builder.build(dist_key, read_prefix_len_hint)
}

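/// Infer the internal table catalog for a synced KV log store.
///
/// Same layout as [`infer_kv_log_store_table_catalog_inner`], except that the
/// payload columns are given as plain `Field`s and are added one by one instead
/// of being extended as `ColumnCatalog`s.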
pub fn infer_synced_kv_log_store_table_catalog_inner(
    input: &PlanRef,
    columns: &[Field],
) -> TableCatalog {
    let mut table_catalog_builder = TableCatalogBuilder::default();

    let mut value_indices =
        Vec::with_capacity(KV_LOG_STORE_PREDEFINED_COLUMNS.len() + columns.len());

    for (name, data_type) in KV_LOG_STORE_PREDEFINED_COLUMNS {
        let idx = table_catalog_builder.add_column(&Field::with_name(data_type, name));
        value_indices.push(idx);
    }

    table_catalog_builder.set_vnode_col_idx(VNODE_COLUMN_INDEX);

    for (i, ordering) in PK_ORDERING.iter().enumerate() {
        table_catalog_builder.add_order_column(i, *ordering);
    }

    let read_prefix_len_hint = table_catalog_builder.get_current_pk_len();

    let payload_indices = {
        let mut payload_indices = Vec::with_capacity(columns.len());
        for column in columns {
            let payload_index = table_catalog_builder.add_column(column);
            payload_indices.push(payload_index);
        }
        payload_indices
    };

    value_indices.extend(payload_indices);
    table_catalog_builder.set_value_indices(value_indices);

    // Shift the distribution key indices past the predefined columns.
    let dist_key = input
        .distribution()
        .dist_column_indices()
        .iter()
        .map(|idx| idx + KV_LOG_STORE_PREDEFINED_COLUMNS.len())
        .collect_vec();

    table_catalog_builder.build(dist_key, read_prefix_len_hint)
}

/// Check that every leaf node supports recovery: stream table scans map to the
/// `backfill` executor, which supports recovery, and source/`Now` leaf nodes are
/// also allowed. Other leaf nodes, like `StreamValues`, do not support recovery
/// and therefore cannot use background DDL.
pub(crate) fn plan_can_use_background_ddl(plan: &PlanRef) -> bool {
    if plan.inputs().is_empty() {
        if plan.as_stream_source_scan().is_some()
            || plan.as_stream_now().is_some()
            || plan.as_stream_source().is_some()
        {
            true
        } else if let Some(scan) = plan.as_stream_table_scan() {
            scan.stream_scan_type() == StreamScanType::Backfill
                || scan.stream_scan_type() == StreamScanType::ArrangementBackfill
                || scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill
        } else {
            false
        }
    } else {
        assert!(!plan.inputs().is_empty());
        plan.inputs().iter().all(plan_can_use_background_ddl)
    }
}

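/// Convert a parsed `AS OF` clause into its protobuf form for time travel.
///
/// Requires the `TimeTravel` feature. Only timestamp-based `AS OF` is supported:
/// `PROCTIME()` and version-based clauses are rejected, and a timestamp string
/// without an explicit offset is interpreted in the session time zone.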
pub fn to_pb_time_travel_as_of(a: &Option<AsOf>) -> Result<Option<PbAsOf>> {
    let Some(a) = a else {
        return Ok(None);
    };
    Feature::TimeTravel
        .check_available()
        .map_err(|e| anyhow::anyhow!(e))?;
    let as_of_type = match a {
        AsOf::ProcessTime => {
            return Err(ErrorCode::NotSupported(
                "do not support as of proctime".to_owned(),
                "please use as of timestamp".to_owned(),
            )
            .into());
        }
        AsOf::TimestampNum(ts) => AsOfType::Timestamp(as_of::Timestamp { timestamp: *ts }),
        AsOf::TimestampString(ts) => {
            let date_time = speedate::DateTime::parse_str_rfc3339(ts)
                .map_err(|_e| anyhow!("fail to parse timestamp"))?;
            let timestamp = if date_time.time.tz_offset.is_none() {
                // If the input does not specify a time zone, use the time zone set by the "SET TIME ZONE" command.
                risingwave_expr::expr_context::TIME_ZONE::try_with(|set_time_zone| {
                    let tz =
                        Timestamptz::lookup_time_zone(set_time_zone).map_err(|e| anyhow!(e))?;
                    match tz.with_ymd_and_hms(
                        date_time.date.year.into(),
                        date_time.date.month.into(),
                        date_time.date.day.into(),
                        date_time.time.hour.into(),
                        date_time.time.minute.into(),
                        date_time.time.second.into(),
                    ) {
                        MappedLocalTime::Single(d) => Ok(d.timestamp()),
                        MappedLocalTime::Ambiguous(_, _) | MappedLocalTime::None => {
                            Err(anyhow!(format!(
                                "failed to parse the timestamp {ts} with the specified time zone {tz}"
                            )))
                        }
                    }
                })??
            } else {
                date_time.timestamp_tz()
            };
            AsOfType::Timestamp(as_of::Timestamp { timestamp })
        }
        AsOf::VersionNum(_) | AsOf::VersionString(_) => {
            return Err(ErrorCode::NotSupported(
                "do not support as of version".to_owned(),
                "please use as of timestamp".to_owned(),
            )
            .into());
        }
        AsOf::ProcessTimeWithInterval((value, leading_field)) => {
            let interval = Interval::parse_with_fields(
                value,
                Some(crate::Binder::bind_date_time_field(leading_field.clone())),
            )
            .map_err(|_| anyhow!("fail to parse interval"))?;
            let interval_sec = (interval.epoch_in_micros() / 1_000_000) as i64;
            let timestamp = chrono::Utc::now()
                .timestamp()
                .checked_sub(interval_sec)
                .ok_or_else(|| anyhow!("invalid timestamp"))?;
            AsOfType::Timestamp(as_of::Timestamp { timestamp })
        }
    };
    Ok(Some(PbAsOf {
        as_of_type: Some(as_of_type),
    }))
}

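/// Convert a parsed `AS OF` clause into Iceberg time travel info.
///
/// Version numbers map to [`IcebergTimeTravelInfo::Version`] and timestamps to
/// [`IcebergTimeTravelInfo::TimestampMs`]; a timestamp string without an
/// explicit offset is interpreted in the given `timezone`.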
pub fn to_iceberg_time_travel_as_of(
    a: &Option<AsOf>,
    timezone: &String,
) -> Result<Option<IcebergTimeTravelInfo>> {
    Ok(match a {
        Some(AsOf::VersionNum(v)) => Some(IcebergTimeTravelInfo::Version(*v)),
        Some(AsOf::TimestampNum(ts)) => Some(IcebergTimeTravelInfo::TimestampMs(ts * 1000)),
        Some(AsOf::VersionString(_)) => {
            bail!("Unsupported version string in iceberg time travel")
        }
        Some(AsOf::TimestampString(ts)) => {
            let date_time = speedate::DateTime::parse_str_rfc3339(ts)
                .map_err(|_e| anyhow!("fail to parse timestamp"))?;
            let timestamp = if date_time.time.tz_offset.is_none() {
                // If the input does not specify a time zone, use the time zone set by the "SET TIME ZONE" command.
                let tz = Timestamptz::lookup_time_zone(timezone).map_err(|e| anyhow!(e))?;
                match tz.with_ymd_and_hms(
                    date_time.date.year.into(),
                    date_time.date.month.into(),
                    date_time.date.day.into(),
                    date_time.time.hour.into(),
                    date_time.time.minute.into(),
                    date_time.time.second.into(),
                ) {
                    MappedLocalTime::Single(d) => Ok(d.timestamp()),
                    MappedLocalTime::Ambiguous(_, _) | MappedLocalTime::None => {
                        Err(anyhow!(format!(
                            "failed to parse the timestamp {ts} with the specified time zone {tz}"
                        )))
                    }
                }?
            } else {
                date_time.timestamp_tz()
            };

            Some(IcebergTimeTravelInfo::TimestampMs(
                timestamp * 1000 + date_time.time.microsecond as i64 / 1000,
            ))
        }
        Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
            unreachable!()
        }
        None => None,
    })
}

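/// Render scan ranges as human-readable predicates for `EXPLAIN` output, e.g.
/// `v1 = 1 AND v2 >= 2 AND v2 < 5` (value rendering is illustrative). At most
/// 20 ranges are rendered; any remainder is elided as `...`.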
pub fn scan_ranges_as_strs(order_names: Vec<String>, scan_ranges: &Vec<ScanRange>) -> Vec<String> {
    let mut range_strs = vec![];

    let explain_max_range = 20;
    for scan_range in scan_ranges.iter().take(explain_max_range) {
        #[expect(clippy::disallowed_methods)]
        let mut range_str = scan_range
            .eq_conds
            .iter()
            .zip(order_names.iter())
            .map(|(v, name)| match v {
                Some(v) => format!("{} = {:?}", name, v),
                None => format!("{} IS NULL", name),
            })
            .collect_vec();

        let len = scan_range.eq_conds.len();
        if !is_full_range(&scan_range.range) {
            let bound_range_str = match (&scan_range.range.0, &scan_range.range.1) {
                (Bound::Unbounded, Bound::Unbounded) => unreachable!(),
                (Bound::Unbounded, ub) => ub_to_string(&order_names[len..], ub),
                (lb, Bound::Unbounded) => lb_to_string(&order_names[len..], lb),
                (lb, ub) => format!(
                    "{} AND {}",
                    lb_to_string(&order_names[len..], lb),
                    ub_to_string(&order_names[len..], ub)
                ),
            };
            range_str.push(bound_range_str);
        }
        range_strs.push(range_str.join(" AND "));
    }
    if scan_ranges.len() > explain_max_range {
        range_strs.push("...".to_owned());
    }
    range_strs
}

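/// Render an upper bound as `name <= value` (inclusive) or `name < value` (exclusive).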
pub fn ub_to_string(order_names: &[String], ub: &Bound<Vec<Option<ScalarImpl>>>) -> String {
    match ub {
        Bound::Included(v) => {
            let (name, value) = row_to_string(order_names, v);
            format!("{} <= {}", name, value)
        }
        Bound::Excluded(v) => {
            let (name, value) = row_to_string(order_names, v);
            format!("{} < {}", name, value)
        }
        Bound::Unbounded => unreachable!(),
    }
}

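/// Render a lower bound as `name >= value` (inclusive) or `name > value` (exclusive).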
pub fn lb_to_string(order_names: &[String], lb: &Bound<Vec<Option<ScalarImpl>>>) -> String {
    match lb {
        Bound::Included(v) => {
            let (name, value) = row_to_string(order_names, v);
            format!("{} >= {}", name, value)
        }
        Bound::Excluded(v) => {
            let (name, value) = row_to_string(order_names, v);
            format!("{} > {}", name, value)
        }
        Bound::Unbounded => unreachable!(),
    }
}

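/// Render the column names and values of a (possibly composite) bound as a pair of
/// strings: e.g. `("a", "1")` for a single column, or `("(a, b)", "(1, 2)")` for
/// multiple columns (value rendering is illustrative); `None` renders as `null`.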
pub fn row_to_string(
    order_names: &[String],
    struct_values: &Vec<Option<ScalarImpl>>,
) -> (String, String) {
    let mut names = vec![];
    let mut values = vec![];
    #[expect(clippy::disallowed_methods)]
    for (name, value) in order_names.iter().zip(struct_values.iter()) {
        names.push(name);
        match value {
            Some(v) => values.push(format!("{:?}", v)),
            None => values.push("null".to_owned()),
        }
    }
    if names.len() == 1 {
        (names[0].clone(), values[0].clone())
    } else {
        (
            format!("({})", names.iter().join(", ")),
            format!("({})", values.iter().join(", ")),
        )
    }
}