risingwave_frontend/optimizer/plan_node/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::default::Default;
17use std::ops::Bound;
18use std::vec;
19
20use anyhow::anyhow;
21use chrono::{MappedLocalTime, TimeZone};
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use pretty_xmlish::{Pretty, Str, StrAssocArr, XmlNode};
25use risingwave_common::catalog::{
26    ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, FieldDisplay,
27    OBJECT_ID_PLACEHOLDER, Schema, StreamJobStatus,
28};
29use risingwave_common::constants::log_store::v2::{
30    KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX,
31};
32use risingwave_common::hash::VnodeCount;
33use risingwave_common::license::Feature;
34use risingwave_common::types::{DataType, Interval, ScalarImpl, Timestamptz};
35use risingwave_common::util::iter_util::ZipEqFast;
36use risingwave_common::util::scan_range::{ScanRange, is_full_range};
37use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
38use risingwave_connector::source::iceberg::IcebergTimeTravelInfo;
39use risingwave_expr::aggregate::PbAggKind;
40use risingwave_expr::bail;
41use risingwave_pb::plan_common::as_of::AsOfType;
42use risingwave_pb::plan_common::{PbAsOf, as_of};
43use risingwave_sqlparser::ast::AsOf;
44
45use super::generic::{self, GenericPlanRef, PhysicalPlanRef};
46use super::pretty_config;
47use crate::PlanRef;
48use crate::catalog::table_catalog::TableType;
49use crate::catalog::{ColumnId, TableCatalog, TableId};
50use crate::error::{ErrorCode, Result};
51use crate::expr::InputRef;
52use crate::optimizer::StreamScanType;
53use crate::optimizer::plan_node::generic::Agg;
54use crate::optimizer::plan_node::{BatchSimpleAgg, PlanAggCall};
55use crate::optimizer::property::{Cardinality, Order, RequiredDist, WatermarkColumns};
56use crate::utils::{Condition, IndexSet};
57
/// Incrementally assembles the [`TableCatalog`] of an internal table.
///
/// Columns, primary-key order columns and the optional settings below are accumulated through the
/// builder methods and turned into a catalog by `build`.
#[derive(Default)]
pub struct TableCatalogBuilder {
    /// All columns in this table
    columns: Vec<ColumnCatalog>,
    /// Primary-key columns with their ordering, in the order `add_order_column` was called.
    pk: Vec<ColumnOrder>,
    /// Explicit value column indices; when left unset, `build` uses every column.
    value_indices: Option<Vec<usize>>,
    /// Index of the vnode column, if one was set via `set_vnode_col_idx`.
    vnode_col_idx: Option<usize>,
    /// Column names already taken, mapped to the next numeric suffix to try when the same name is
    /// added again through `add_column`.
    column_names: HashMap<String, i32>,
    /// Watermark column bitset; when left unset, `build` falls back to a bitset created with
    /// capacity for every column.
    watermark_columns: Option<FixedBitSet>,
    /// Positions inside `pk` that make up the distribution key. When set, `build` asserts that it
    /// agrees with the distribution key passed to it.
    dist_key_in_pk: Option<Vec<usize>>,
}
69
70/// For DRY, mainly used for construct internal table catalog in stateful streaming executors.
71/// Be careful of the order of add column.
72impl TableCatalogBuilder {
73    /// Add a column from Field info, return the column index of the table
74    pub fn add_column(&mut self, field: &Field) -> usize {
75        let column_idx = self.columns.len();
76        let column_id = column_idx as i32;
77        // Add column desc.
78        let mut column_desc = ColumnDesc::from_field_with_column_id(field, column_id);
79
80        // Replace dot of the internal table column name with underline.
81        column_desc.name = column_desc.name.replace('.', "_");
82        // Avoid column name duplicate.
83        self.avoid_duplicate_col_name(&mut column_desc);
84
85        self.columns.push(ColumnCatalog {
86            column_desc,
87            // All columns in internal table are invisible to batch query.
88            is_hidden: false,
89        });
90        column_idx
91    }
92
93    /// Extend the columns with column ids reset. The input columns should NOT have duplicate names.
94    ///
95    /// Returns the indices of the extended columns.
96    pub fn extend_columns(&mut self, columns: &[ColumnCatalog]) -> Vec<usize> {
97        let base_idx = self.columns.len();
98        columns.iter().enumerate().for_each(|(i, col)| {
99            assert!(!self.column_names.contains_key(col.name()));
100            self.column_names.insert(col.name().to_owned(), 0);
101
102            // Reset the column id for the columns.
103            let mut new_col = col.clone();
104            new_col.column_desc.column_id = ColumnId::new((base_idx + i) as _);
105            self.columns.push(new_col);
106        });
107        Vec::from_iter(base_idx..(base_idx + columns.len()))
108    }
109
110    /// Check whether need to add a ordered column. Different from value, order desc equal pk in
111    /// semantics and they are encoded as storage key.
112    pub fn add_order_column(&mut self, column_index: usize, order_type: OrderType) {
113        self.pk.push(ColumnOrder::new(column_index, order_type));
114    }
115
116    /// get the current exist field number of the primary key.
117    pub fn get_current_pk_len(&self) -> usize {
118        self.pk.len()
119    }
120
121    pub fn set_vnode_col_idx(&mut self, vnode_col_idx: usize) {
122        self.vnode_col_idx = Some(vnode_col_idx);
123    }
124
125    pub fn set_value_indices(&mut self, value_indices: Vec<usize>) {
126        self.value_indices = Some(value_indices);
127    }
128
129    pub fn set_dist_key_in_pk(&mut self, dist_key_in_pk: Vec<usize>) {
130        self.dist_key_in_pk = Some(dist_key_in_pk);
131    }
132
133    /// Check the column name whether exist before. if true, record occurrence and change the name
134    /// to avoid duplicate.
135    fn avoid_duplicate_col_name(&mut self, column_desc: &mut ColumnDesc) {
136        if let Some(old_identity) = self.column_names.get(&column_desc.name) {
137            let column_name = column_desc.name.clone();
138            let mut identity = *old_identity;
139            loop {
140                column_desc.name = format!("{}_{}", column_name, identity);
141                identity += 1;
142                if !self.column_names.contains_key(&column_desc.name) {
143                    break;
144                }
145            }
146            *self.column_names.get_mut(&column_name).unwrap() = identity;
147        }
148        self.column_names.insert(column_desc.name.clone(), 0);
149    }
150
151    /// Consume builder and create `TableCatalog` (for proto). The `read_prefix_len_hint` is the
152    /// anticipated read prefix pattern (number of fields) for the table, which can be utilized for
153    /// implementing the table's bloom filter or other storage optimization techniques.
154    pub fn build(self, distribution_key: Vec<usize>, read_prefix_len_hint: usize) -> TableCatalog {
155        assert!(read_prefix_len_hint <= self.pk.len());
156        let watermark_columns = match self.watermark_columns {
157            Some(w) => w,
158            None => FixedBitSet::with_capacity(self.columns.len()),
159        };
160
161        // If `dist_key_in_pk` is set, check if it matches with `distribution_key`.
162        // Note that we cannot derive in the opposite direction, because there can be a column
163        // appearing multiple times in the PK.
164        if let Some(dist_key_in_pk) = &self.dist_key_in_pk {
165            let derived_dist_key = dist_key_in_pk
166                .iter()
167                .map(|idx| self.pk[*idx].column_index)
168                .collect_vec();
169            assert_eq!(
170                derived_dist_key, distribution_key,
171                "dist_key mismatch with dist_key_in_pk"
172            );
173        }
174
175        TableCatalog {
176            id: TableId::placeholder(),
177            schema_id: 0,
178            database_id: 0,
179            associated_source_id: None,
180            name: String::new(),
181            dependent_relations: vec![],
182            columns: self.columns.clone(),
183            pk: self.pk,
184            stream_key: vec![],
185            distribution_key,
186            // NOTE: This should be altered if `TableCatalogBuilder` is used to build something
187            // other than internal tables.
188            table_type: TableType::Internal,
189            append_only: false,
190            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
191            fragment_id: OBJECT_ID_PLACEHOLDER,
192            dml_fragment_id: None,
193            vnode_col_index: self.vnode_col_idx,
194            row_id_index: None,
195            value_indices: self
196                .value_indices
197                .unwrap_or_else(|| (0..self.columns.len()).collect_vec()),
198            definition: "".into(),
199            conflict_behavior: ConflictBehavior::NoCheck,
200            version_column_index: None,
201            read_prefix_len_hint,
202            version: None, // the internal table is not versioned and can't be schema changed
203            watermark_columns,
204            dist_key_in_pk: self.dist_key_in_pk.unwrap_or_default(),
205            cardinality: Cardinality::unknown(), // TODO(card): cardinality of internal table
206            created_at_epoch: None,
207            initialized_at_epoch: None,
208            cleaned_by_watermark: false,
209            // NOTE(kwannoel): This may not match the create type of the materialized table.
210            // It should be ignored for internal tables.
211            create_type: CreateType::Foreground,
212            stream_job_status: StreamJobStatus::Creating,
213            description: None,
214            incoming_sinks: vec![],
215            initialized_at_cluster_version: None,
216            created_at_cluster_version: None,
217            retention_seconds: None,
218            cdc_table_id: None,
219            vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later
220            webhook_info: None,
221            job_id: None,
222            engine: Engine::Hummock,
223            clean_watermark_index_in_pk: None, // TODO: fill this field
224        }
225    }
226
227    pub fn columns(&self) -> &[ColumnCatalog] {
228        &self.columns
229    }
230}
231
232/// See also [`super::generic::DistillUnit`].
233pub trait Distill {
234    fn distill<'a>(&self) -> XmlNode<'a>;
235
236    fn distill_to_string(&self) -> String {
237        let mut config = pretty_config();
238        let mut output = String::with_capacity(2048);
239        config.unicode(&mut output, &Pretty::Record(self.distill()));
240        output
241    }
242}
243
244pub(super) fn childless_record<'a>(
245    name: impl Into<Str<'a>>,
246    fields: StrAssocArr<'a>,
247) -> XmlNode<'a> {
248    XmlNode::simple_record(name, fields, Default::default())
249}
250
/// Implements `Distill` for `$ty` by delegating to the `distill_with_name` method of its `$core`
/// field, passing `$name` as the node name.
///
/// Note that the expansion also emits `use` items for `XmlNode`, `DistillUnit` and `Distill` at
/// the invocation site.
macro_rules! impl_distill_by_unit {
    ($ty:ty, $core:ident, $name:expr) => {
        use pretty_xmlish::XmlNode;
        use $crate::optimizer::plan_node::generic::DistillUnit;
        use $crate::optimizer::plan_node::utils::Distill;
        impl Distill for $ty {
            fn distill<'a>(&self) -> XmlNode<'a> {
                // Delegate to the core's unit-level distill, supplying the display name.
                self.$core.distill_with_name($name)
            }
        }
    };
}
263pub(crate) use impl_distill_by_unit;
264
265pub(crate) fn column_names_pretty<'a>(schema: &Schema) -> Pretty<'a> {
266    let columns = (schema.fields.iter())
267        .map(|f| f.name.clone())
268        .map(Pretty::from)
269        .collect();
270    Pretty::Array(columns)
271}
272
273pub(crate) fn watermark_pretty<'a>(
274    watermark_columns: &WatermarkColumns,
275    schema: &Schema,
276) -> Option<Pretty<'a>> {
277    if watermark_columns.is_empty() {
278        None
279    } else {
280        let groups = watermark_columns.grouped();
281        let pretty_groups = groups
282            .values()
283            .map(|cols| {
284                Pretty::Array(
285                    cols.indices()
286                        .map(|idx| FieldDisplay(schema.fields.get(idx).unwrap()))
287                        .map(|d| Pretty::display(&d))
288                        .collect::<Vec<_>>(),
289                )
290            })
291            .collect();
292        Some(Pretty::Array(pretty_groups))
293    }
294}
295
296#[derive(Clone, Copy)]
297pub struct IndicesDisplay<'a> {
298    pub indices: &'a [usize],
299    pub schema: &'a Schema,
300}
301
302impl<'a> IndicesDisplay<'a> {
303    /// Returns `None` means all
304    pub fn from_join<'b, PlanRef: GenericPlanRef>(
305        join: &'a generic::Join<PlanRef>,
306        input_schema: &'a Schema,
307    ) -> Pretty<'b> {
308        let col_num = join.internal_column_num();
309        let id = Self::from(&join.output_indices, col_num, input_schema);
310        id.map_or_else(|| Pretty::from("all"), Self::distill)
311    }
312
313    /// Returns `None` means all
314    fn from(indices: &'a [usize], col_num: usize, schema: &'a Schema) -> Option<Self> {
315        if indices.iter().copied().eq(0..col_num) {
316            return None;
317        }
318        Some(Self { indices, schema })
319    }
320
321    pub fn distill<'b>(self) -> Pretty<'b> {
322        let vec = self.indices.iter().map(|&i| {
323            let name = self.schema.fields.get(i).unwrap().name.clone();
324            Pretty::from(name)
325        });
326        Pretty::Array(vec.collect())
327    }
328}
329
330pub(crate) fn sum_affected_row(dml: PlanRef) -> Result<PlanRef> {
331    let dml = RequiredDist::single().enforce_if_not_satisfies(dml, &Order::any())?;
332    // Accumulate the affected rows.
333    let sum_agg = PlanAggCall {
334        agg_type: PbAggKind::Sum.into(),
335        return_type: DataType::Int64,
336        inputs: vec![InputRef::new(0, DataType::Int64)],
337        distinct: false,
338        order_by: vec![],
339        filter: Condition::true_cond(),
340        direct_args: vec![],
341    };
342    let agg = Agg::new(vec![sum_agg], IndexSet::empty(), dml);
343    let batch_agg = BatchSimpleAgg::new(agg);
344    Ok(batch_agg.into())
345}
346
/// Builds the display name of a plan node as a `String`.
///
/// Takes the base name followed by an optional list of `{ property, condition }` pairs. Every
/// property whose condition evaluates to `true` is appended to the name in the form
/// `Name [prop1, prop2]`; if none applies, the base name is returned unchanged.
macro_rules! plan_node_name {
    ($name:literal $(, { $prop:literal, $cond:expr } )* $(,)?) => {
        {
            #[allow(unused_mut)]
            let mut enabled: Vec<&str> = Vec::new();
            $(
                if $cond {
                    enabled.push($prop);
                }
            )*
            if enabled.is_empty() {
                $name.to_string()
            } else {
                format!("{} [{}]", $name, enabled.join(", "))
            }
        }
    };
}
366pub(crate) use plan_node_name;
367
368pub fn infer_kv_log_store_table_catalog_inner(
369    input: &PlanRef,
370    columns: &[ColumnCatalog],
371) -> TableCatalog {
372    let mut table_catalog_builder = TableCatalogBuilder::default();
373
374    let mut value_indices =
375        Vec::with_capacity(KV_LOG_STORE_PREDEFINED_COLUMNS.len() + input.schema().fields().len());
376
377    for (name, data_type) in KV_LOG_STORE_PREDEFINED_COLUMNS {
378        let indice = table_catalog_builder.add_column(&Field::with_name(data_type, name));
379        value_indices.push(indice);
380    }
381
382    table_catalog_builder.set_vnode_col_idx(VNODE_COLUMN_INDEX);
383
384    for (i, ordering) in PK_ORDERING.iter().enumerate() {
385        table_catalog_builder.add_order_column(i, *ordering);
386    }
387
388    let read_prefix_len_hint = table_catalog_builder.get_current_pk_len();
389
390    if columns.len() != input.schema().fields().len()
391        || columns
392            .iter()
393            .zip_eq_fast(input.schema().fields())
394            .any(|(c, f)| *c.data_type() != f.data_type())
395    {
396        tracing::warn!(
397            "sink schema different with upstream schema: sink columns: {:?}, input schema: {:?}.",
398            columns,
399            input.schema()
400        );
401    }
402    for field in input.schema().fields() {
403        let indice = table_catalog_builder.add_column(field);
404        value_indices.push(indice);
405    }
406    table_catalog_builder.set_value_indices(value_indices);
407
408    // Modify distribution key indices based on the pre-defined columns.
409    let dist_key = input
410        .distribution()
411        .dist_column_indices()
412        .iter()
413        .map(|idx| idx + KV_LOG_STORE_PREDEFINED_COLUMNS.len())
414        .collect_vec();
415
416    table_catalog_builder.build(dist_key, read_prefix_len_hint)
417}
418
419pub fn infer_synced_kv_log_store_table_catalog_inner(
420    input: &PlanRef,
421    columns: &[Field],
422) -> TableCatalog {
423    let mut table_catalog_builder = TableCatalogBuilder::default();
424
425    let mut value_indices =
426        Vec::with_capacity(KV_LOG_STORE_PREDEFINED_COLUMNS.len() + columns.len());
427
428    for (name, data_type) in KV_LOG_STORE_PREDEFINED_COLUMNS {
429        let indice = table_catalog_builder.add_column(&Field::with_name(data_type, name));
430        value_indices.push(indice);
431    }
432
433    table_catalog_builder.set_vnode_col_idx(VNODE_COLUMN_INDEX);
434
435    for (i, ordering) in PK_ORDERING.iter().enumerate() {
436        table_catalog_builder.add_order_column(i, *ordering);
437    }
438
439    let read_prefix_len_hint = table_catalog_builder.get_current_pk_len();
440
441    let payload_indices = {
442        let mut payload_indices = Vec::with_capacity(columns.len());
443        for column in columns {
444            let payload_index = table_catalog_builder.add_column(column);
445            payload_indices.push(payload_index);
446        }
447        payload_indices
448    };
449
450    value_indices.extend(payload_indices);
451    table_catalog_builder.set_value_indices(value_indices);
452
453    // Modify distribution key indices based on the pre-defined columns.
454    let dist_key = input
455        .distribution()
456        .dist_column_indices()
457        .iter()
458        .map(|idx| idx + KV_LOG_STORE_PREDEFINED_COLUMNS.len())
459        .collect_vec();
460
461    table_catalog_builder.build(dist_key, read_prefix_len_hint)
462}
463
464/// Check that all leaf nodes must be stream table scan,
465/// since that plan node maps to `backfill` executor, which supports recovery.
466/// Some other leaf nodes like `StreamValues` do not support recovery, and they
467/// cannot use background ddl.
468pub(crate) fn plan_can_use_background_ddl(plan: &PlanRef) -> bool {
469    if plan.inputs().is_empty() {
470        if plan.as_stream_source_scan().is_some()
471            || plan.as_stream_now().is_some()
472            || plan.as_stream_source().is_some()
473        {
474            true
475        } else if let Some(scan) = plan.as_stream_table_scan() {
476            scan.stream_scan_type() == StreamScanType::Backfill
477                || scan.stream_scan_type() == StreamScanType::ArrangementBackfill
478                || scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill
479                || scan.stream_scan_type() == StreamScanType::SnapshotBackfill
480        } else {
481            false
482        }
483    } else {
484        assert!(!plan.inputs().is_empty());
485        plan.inputs().iter().all(plan_can_use_background_ddl)
486    }
487}
488
489pub fn to_pb_time_travel_as_of(a: &Option<AsOf>) -> Result<Option<PbAsOf>> {
490    let Some(a) = a else {
491        return Ok(None);
492    };
493    Feature::TimeTravel
494        .check_available()
495        .map_err(|e| anyhow::anyhow!(e))?;
496    let as_of_type = match a {
497        AsOf::ProcessTime => {
498            return Err(ErrorCode::NotSupported(
499                "do not support as of proctime".to_owned(),
500                "please use as of timestamp".to_owned(),
501            )
502            .into());
503        }
504        AsOf::TimestampNum(ts) => AsOfType::Timestamp(as_of::Timestamp { timestamp: *ts }),
505        AsOf::TimestampString(ts) => {
506            let date_time = speedate::DateTime::parse_str_rfc3339(ts)
507                .map_err(|_e| anyhow!("fail to parse timestamp"))?;
508            let timestamp = if date_time.time.tz_offset.is_none() {
509                // If the input does not specify a time zone, use the time zone set by the "SET TIME ZONE" command.
510                risingwave_expr::expr_context::TIME_ZONE::try_with(|set_time_zone| {
511                    let tz =
512                        Timestamptz::lookup_time_zone(set_time_zone).map_err(|e| anyhow!(e))?;
513                    match tz.with_ymd_and_hms(
514                        date_time.date.year.into(),
515                        date_time.date.month.into(),
516                        date_time.date.day.into(),
517                        date_time.time.hour.into(),
518                        date_time.time.minute.into(),
519                        date_time.time.second.into(),
520                    ) {
521                        MappedLocalTime::Single(d) => Ok(d.timestamp()),
522                        MappedLocalTime::Ambiguous(_, _) | MappedLocalTime::None => {
523                            Err(anyhow!(format!(
524                                "failed to parse the timestamp {ts} with the specified time zone {tz}"
525                            )))
526                        }
527                    }
528                })??
529            } else {
530                date_time.timestamp_tz()
531            };
532            AsOfType::Timestamp(as_of::Timestamp { timestamp })
533        }
534        AsOf::VersionNum(_) | AsOf::VersionString(_) => {
535            return Err(ErrorCode::NotSupported(
536                "do not support as of version".to_owned(),
537                "please use as of timestamp".to_owned(),
538            )
539            .into());
540        }
541        AsOf::ProcessTimeWithInterval((value, leading_field)) => {
542            let interval = Interval::parse_with_fields(
543                value,
544                Some(crate::Binder::bind_date_time_field(leading_field.clone())),
545            )
546            .map_err(|_| anyhow!("fail to parse interval"))?;
547            let interval_sec = (interval.epoch_in_micros() / 1_000_000) as i64;
548            let timestamp = chrono::Utc::now()
549                .timestamp()
550                .checked_sub(interval_sec)
551                .ok_or_else(|| anyhow!("invalid timestamp"))?;
552            AsOfType::Timestamp(as_of::Timestamp { timestamp })
553        }
554    };
555    Ok(Some(PbAsOf {
556        as_of_type: Some(as_of_type),
557    }))
558}
559
560pub fn to_iceberg_time_travel_as_of(
561    a: &Option<AsOf>,
562    timezone: &String,
563) -> Result<Option<IcebergTimeTravelInfo>> {
564    Ok(match a {
565        Some(AsOf::VersionNum(v)) => Some(IcebergTimeTravelInfo::Version(*v)),
566        Some(AsOf::TimestampNum(ts)) => Some(IcebergTimeTravelInfo::TimestampMs(ts * 1000)),
567        Some(AsOf::VersionString(_)) => {
568            bail!("Unsupported version string in iceberg time travel")
569        }
570        Some(AsOf::TimestampString(ts)) => {
571            let date_time = speedate::DateTime::parse_str_rfc3339(ts)
572                .map_err(|_e| anyhow!("fail to parse timestamp"))?;
573            let timestamp = if date_time.time.tz_offset.is_none() {
574                // If the input does not specify a time zone, use the time zone set by the "SET TIME ZONE" command.
575                let tz = Timestamptz::lookup_time_zone(timezone).map_err(|e| anyhow!(e))?;
576                match tz.with_ymd_and_hms(
577                    date_time.date.year.into(),
578                    date_time.date.month.into(),
579                    date_time.date.day.into(),
580                    date_time.time.hour.into(),
581                    date_time.time.minute.into(),
582                    date_time.time.second.into(),
583                ) {
584                    MappedLocalTime::Single(d) => Ok(d.timestamp()),
585                    MappedLocalTime::Ambiguous(_, _) | MappedLocalTime::None => {
586                        Err(anyhow!(format!(
587                            "failed to parse the timestamp {ts} with the specified time zone {tz}"
588                        )))
589                    }
590                }?
591            } else {
592                date_time.timestamp_tz()
593            };
594
595            Some(IcebergTimeTravelInfo::TimestampMs(
596                timestamp * 1000 + date_time.time.microsecond as i64 / 1000,
597            ))
598        }
599        Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
600            unreachable!()
601        }
602        None => None,
603    })
604}
605
606pub fn scan_ranges_as_strs(order_names: Vec<String>, scan_ranges: &Vec<ScanRange>) -> Vec<String> {
607    let mut range_strs = vec![];
608
609    let explain_max_range = 20;
610    for scan_range in scan_ranges.iter().take(explain_max_range) {
611        #[expect(clippy::disallowed_methods)]
612        let mut range_str = scan_range
613            .eq_conds
614            .iter()
615            .zip(order_names.iter())
616            .map(|(v, name)| match v {
617                Some(v) => format!("{} = {:?}", name, v),
618                None => format!("{} IS NULL", name),
619            })
620            .collect_vec();
621
622        let len = scan_range.eq_conds.len();
623        if !is_full_range(&scan_range.range) {
624            let bound_range_str = match (&scan_range.range.0, &scan_range.range.1) {
625                (Bound::Unbounded, Bound::Unbounded) => unreachable!(),
626                (Bound::Unbounded, ub) => ub_to_string(&order_names[len..], ub),
627                (lb, Bound::Unbounded) => lb_to_string(&order_names[len..], lb),
628                (lb, ub) => format!(
629                    "{} AND {}",
630                    lb_to_string(&order_names[len..], lb),
631                    ub_to_string(&order_names[len..], ub)
632                ),
633            };
634            range_str.push(bound_range_str);
635        }
636        range_strs.push(range_str.join(" AND "));
637    }
638    if scan_ranges.len() > explain_max_range {
639        range_strs.push("...".to_owned());
640    }
641    range_strs
642}
643
644pub fn ub_to_string(order_names: &[String], ub: &Bound<Vec<Option<ScalarImpl>>>) -> String {
645    match ub {
646        Bound::Included(v) => {
647            let (name, value) = row_to_string(order_names, v);
648            format!("{} <= {}", name, value)
649        }
650        Bound::Excluded(v) => {
651            let (name, value) = row_to_string(order_names, v);
652            format!("{} < {}", name, value)
653        }
654        Bound::Unbounded => unreachable!(),
655    }
656}
657
658pub fn lb_to_string(order_names: &[String], lb: &Bound<Vec<Option<ScalarImpl>>>) -> String {
659    match lb {
660        Bound::Included(v) => {
661            let (name, value) = row_to_string(order_names, v);
662            format!("{} >= {}", name, value)
663        }
664        Bound::Excluded(v) => {
665            let (name, value) = row_to_string(order_names, v);
666            format!("{} > {}", name, value)
667        }
668        Bound::Unbounded => unreachable!(),
669    }
670}
671
672pub fn row_to_string(
673    order_names: &[String],
674    struct_values: &Vec<Option<ScalarImpl>>,
675) -> (String, String) {
676    let mut names = vec![];
677    let mut values = vec![];
678    #[expect(clippy::disallowed_methods)]
679    for (name, value) in order_names.iter().zip(struct_values.iter()) {
680        names.push(name);
681        match value {
682            Some(v) => values.push(format!("{:?}", v)),
683            None => values.push("null".to_owned()),
684        }
685    }
686    if names.len() == 1 {
687        (names[0].clone(), values[0].clone())
688    } else {
689        (
690            format!("({})", names.iter().join(", ")),
691            format!("({})", values.iter().join(", ")),
692        )
693    }
694}