risingwave_frontend/optimizer/property/
func_dep.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16
17use fixedbitset::FixedBitSet;
18use itertools::Itertools;
19
20use crate::optimizer::property::Order;
21
/// [`FunctionalDependency`] represents a single dependency `from --> to`: the columns in `from`
/// together determine the columns in `to`.
///
/// Both sides are bitsets indexed by column position. For columns ABCD, the FD AC --> B is
/// represented as {0, 2} --> {1} using `FixedBitset`.
#[derive(Debug, PartialEq, Eq, Clone, Default, Hash)]
pub struct FunctionalDependency {
    /// Determinant columns (left-hand side of the dependency).
    from: FixedBitSet,
    /// Dependent columns (right-hand side of the dependency).
    to: FixedBitSet,
}
30
31impl fmt::Display for FunctionalDependency {
32    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33        let from = self.from.ones().collect_vec();
34        let to = self.to.ones().collect_vec();
35        f.write_fmt(format_args!("{:?} --> {:?}", from, to))
36    }
37}
38
39impl FunctionalDependency {
40    /// Create a [`FunctionalDependency`] with bitset.
41    /// This indicate a from --> to dependency.
42    pub fn new(from: FixedBitSet, to: FixedBitSet) -> Self {
43        assert_eq!(
44            from.len(),
45            to.len(),
46            "from and to should have the same length"
47        );
48        assert!(!to.is_clear(), "`to` should contains at least one element");
49        FunctionalDependency { from, to }
50    }
51
52    pub fn from(&self) -> &FixedBitSet {
53        &self.from
54    }
55
56    pub fn to(&self) -> &FixedBitSet {
57        &self.to
58    }
59
60    /// Grow the capacity of [`FunctionalDependency`] to **columns**, all new columns initialized to
61    /// zero.
62    pub fn grow(&mut self, columns: usize) {
63        self.from.grow(columns);
64        self.to.grow(columns);
65    }
66
67    pub fn set_from(&mut self, column_index: usize, enabled: bool) {
68        self.from.set(column_index, enabled);
69    }
70
71    pub fn set_to(&mut self, column_index: usize, enabled: bool) {
72        self.to.set(column_index, enabled);
73    }
74
75    /// Create a [`FunctionalDependency`] with column indices. The order of the indices doesn't
76    /// matter. It is treated as a combination of columns.
77    pub fn with_indices(column_cnt: usize, from: &[usize], to: &[usize]) -> Self {
78        let from = {
79            let mut tmp = FixedBitSet::with_capacity(column_cnt);
80            for &i in from {
81                tmp.set(i, true);
82            }
83            tmp
84        };
85        let to = {
86            let mut tmp = FixedBitSet::with_capacity(column_cnt);
87            for &i in to {
88                tmp.set(i, true);
89            }
90            tmp
91        };
92        FunctionalDependency { from, to }
93    }
94
95    /// Create a [`FunctionalDependency`] with a key. The combination of these columns can determine
96    /// all other columns.
97    fn with_key(column_cnt: usize, key_indices: &[usize]) -> Self {
98        let mut from = FixedBitSet::with_capacity(column_cnt);
99        for &idx in key_indices {
100            from.set(idx, true);
101        }
102        let mut to = from.clone();
103        to.toggle_range(0..to.len());
104        FunctionalDependency { from, to }
105    }
106
107    /// Create a [`FunctionalDependency`] for constant columns.
108    /// These columns can be determined by any column.
109    pub fn with_constant(column_cnt: usize, constant_indices: &[usize]) -> Self {
110        let mut to = FixedBitSet::with_capacity(column_cnt);
111        for &i in constant_indices {
112            to.set(i, true);
113        }
114        FunctionalDependency {
115            from: FixedBitSet::with_capacity(column_cnt),
116            to,
117        }
118    }
119
120    pub fn into_parts(self) -> (FixedBitSet, FixedBitSet) {
121        (self.from, self.to)
122    }
123}
124
/// [`FunctionalDependencySet`] contains the functional dependencies.
///
/// It is used in optimizer to track the dependencies between columns.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Default)]
pub struct FunctionalDependencySet {
    /// The number of columns. Every dependency added to the set is expected to have bitsets of
    /// exactly this length.
    column_count: usize,
    /// `strict` contains all strict functional dependencies.
    ///
    /// A strict functional dependency uses the **NULL=** semantic, which means that all NULLs
    /// are considered equal to each other. Under this semantic the following table is **NOT**
    /// allowed when A --> B holds, because both rows have the same value of A (NULL) but
    /// different values of B:
    /// ```text
    ///   A   | B
    /// ------|---
    ///  NULL | 1
    ///  NULL | 2
    /// ```
    ///
    /// Currently we only have strict dependencies, but we may also have lax dependencies in the
    /// future.
    strict: Vec<FunctionalDependency>,
}
148
149impl fmt::Display for FunctionalDependencySet {
150    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
151        f.write_str("{")?;
152        self.strict.iter().format(", ").fmt(f)?;
153        f.write_str("}")
154    }
155}
156
157impl FunctionalDependencySet {
158    /// Create an empty [`FunctionalDependencySet`]
159    pub fn new(column_count: usize) -> Self {
160        Self {
161            strict: Vec::new(),
162            column_count,
163        }
164    }
165
166    /// Create a [`FunctionalDependencySet`] with the indices of a key.
167    ///
168    /// The **combination** of these columns can determine all other columns.
169    pub fn with_key(column_cnt: usize, key_indices: &[usize]) -> Self {
170        let mut tmp = Self::new(column_cnt);
171        tmp.add_key(key_indices);
172        tmp
173    }
174
175    /// Create a [`FunctionalDependencySet`] with a dependency [`Vec`]
176    pub fn with_dependencies(
177        column_count: usize,
178        strict_dependencies: Vec<FunctionalDependency>,
179    ) -> Self {
180        for i in &strict_dependencies {
181            assert_eq!(column_count, i.from().len())
182        }
183        Self {
184            strict: strict_dependencies,
185            column_count,
186        }
187    }
188
189    pub fn as_dependencies_mut(&mut self) -> &mut Vec<FunctionalDependency> {
190        &mut self.strict
191    }
192
193    pub fn as_dependencies(&self) -> &Vec<FunctionalDependency> {
194        &self.strict
195    }
196
197    pub fn into_dependencies(self) -> Vec<FunctionalDependency> {
198        self.strict
199    }
200
201    /// Add a functional dependency to a [`FunctionalDependencySet`].
202    pub fn add_functional_dependency(&mut self, fd: FunctionalDependency) {
203        let FunctionalDependency { from, to } = fd;
204        assert_eq!(self.column_count, from.len());
205        if !to.is_clear() {
206            match self.strict.iter().position(|elem| elem.from == from) {
207                Some(idx) => self.strict[idx].to.union_with(&to),
208                None => self.strict.push(FunctionalDependency::new(from, to)),
209            }
210        }
211    }
212
213    /// Add key columns to a  [`FunctionalDependencySet`].
214    pub fn add_key(&mut self, key_indices: &[usize]) {
215        self.add_functional_dependency(FunctionalDependency::with_key(
216            self.column_count,
217            key_indices,
218        ));
219    }
220
221    /// Add constant columns to a  [`FunctionalDependencySet`].
222    pub fn add_constant_columns(&mut self, constant_columns: &[usize]) {
223        self.add_functional_dependency(FunctionalDependency::with_constant(
224            self.column_count,
225            constant_columns,
226        ));
227    }
228
229    /// Add a dependency to [`FunctionalDependencySet`] using column indices.
230    pub fn add_functional_dependency_by_column_indices(&mut self, from: &[usize], to: &[usize]) {
231        self.add_functional_dependency(FunctionalDependency::with_indices(
232            self.column_count,
233            from,
234            to,
235        ))
236    }
237
238    /// O(d), where `d` is the number of functional dependencies.
239    /// The call to `is_subset` is technically O(n),
240    /// but we can consider it O(1) since the constant factor is 1/usize.
241    fn get_closure(&self, columns: &FixedBitSet) -> FixedBitSet {
242        let mut closure = columns.clone();
243        let mut no_updates;
244        loop {
245            no_updates = true;
246            for FunctionalDependency { from, to } in &self.strict {
247                if from.is_subset(&closure) && !to.is_subset(&closure) {
248                    closure.union_with(to);
249                    no_updates = false;
250                }
251            }
252            if no_updates {
253                break;
254            }
255        }
256        closure
257    }
258
259    /// Return `true` if the dependency determinant -> dependant exists.
260    /// O(d), where the dominant cost is from `Self::get_closure`, which has O(d) complexity.
261    pub fn is_determined_by(&self, determinant: &FixedBitSet, dependant: &FixedBitSet) -> bool {
262        self.get_closure(determinant).is_superset(dependant)
263    }
264
265    /// This just checks if the columns specified by the `columns` bitset
266    /// determines each other column.
267    fn is_key_inner(&self, columns: &FixedBitSet) -> bool {
268        let all_columns = {
269            let mut tmp = columns.clone();
270            tmp.set_range(.., true);
271            tmp
272        };
273        self.is_determined_by(columns, &all_columns)
274    }
275
276    /// Return true if the combination of `columns` can fully determine other columns.
277    pub fn is_key(&self, columns: &[usize]) -> bool {
278        let mut key_bitset = FixedBitSet::from_iter(columns.iter().copied());
279        key_bitset.grow(self.column_count);
280        self.is_key_inner(&key_bitset)
281    }
282
283    /// This is just a wrapper around `Self::minimize_key_bitset` to minimize `key_indices`.
284    pub fn minimize_key(&self, key_indices: &[usize]) -> Vec<usize> {
285        let mut key_bitset = FixedBitSet::from_iter(key_indices.iter().copied());
286        key_bitset.grow(self.column_count);
287        let res = self.minimize_key_bitset(key_bitset);
288        res.ones().collect_vec()
289    }
290
291    /// -------
292    /// Overview
293    /// -------
294    /// Remove redundant columns from the given set.
295    ///
296    /// Redundant columns can be functionally determined by other columns so there is no need to
297    /// keep them in a key.
298    ///
299    /// Note that the accurate minimization algorithm can take O(2^n) time,
300    /// so we use a approximate algorithm which use O(cn) time. `c` is the number of columns, and
301    /// `n` is the number of functional dependency rules.
302    ///
303    /// This algorithm may not necessarily find the key with the least number of columns.
304    /// But it will ensure that no redundant columns will be preserved.
305    ///
306    /// ---------
307    /// Algorithm
308    /// ---------
309    /// This algorithm removes columns one by one and check
310    /// whether the remaining columns can form a key or not. If the remaining columns can form a
311    /// key, then this column can be removed.
312    fn minimize_key_bitset(&self, key: FixedBitSet) -> FixedBitSet {
313        assert!(
314            self.is_key_inner(&key),
315            "{:?} is not a key!",
316            key.ones().collect_vec()
317        );
318        let mut new_key = key.clone();
319        for i in key.ones() {
320            new_key.set(i, false);
321            if !self.is_key_inner(&new_key) {
322                new_key.set(i, true);
323            }
324        }
325        new_key
326    }
327
328    /// Wrapper around `Self::minimize_order_key_bitset` to minimize `order_key`.
329    /// View the documentation of `Self::minimize_order_key_bitset` for more information.
330    /// In the process of minimizing the order key,
331    /// we must ensure that if the indices are part of
332    /// distribution key, they must not be pruned.
333    pub fn minimize_order_key(&self, order_key: Order, dist_key_indices: &[usize]) -> Order {
334        let dist_key_bitset = FixedBitSet::from_iter(dist_key_indices.iter().copied());
335        let order_key_indices = order_key
336            .column_orders
337            .iter()
338            .map(|o| o.column_index)
339            .collect();
340        let min_bitset = self.minimize_order_key_bitset(order_key_indices);
341        let order = order_key
342            .column_orders
343            .iter()
344            .filter(|o| {
345                min_bitset.contains(o.column_index) || dist_key_bitset.contains(o.column_index)
346            })
347            .cloned()
348            .collect_vec();
349        Order::new(order)
350    }
351
352    /// 1. Iterate over the prefixes of the order key.
353    /// 2. If some continuous subset of columns and the next
354    ///    column of the order key form a functional dependency,
355    ///    we can prune that column.
356    /// 3. This function has O(dn) complexity, where:
357    ///    1. `d` is the number of functional dependencies,
358    ///       because each iteration in the loop calls `Self::is_determined_by`.
359    ///    2. `n` is the number of columns.
360    fn minimize_order_key_bitset(&self, order_key: Vec<usize>) -> FixedBitSet {
361        if order_key.is_empty() {
362            return FixedBitSet::new();
363        }
364
365        // Initialize current_prefix.
366        let mut prefix = FixedBitSet::with_capacity(self.column_count);
367
368        // Grow current_prefix.
369        for i in order_key {
370            let mut next = FixedBitSet::with_capacity(self.column_count);
371            next.set(i, true);
372
373            // Check if prefix -> next_column
374            if !self.is_determined_by(&prefix, &next) {
375                prefix.set(i, true);
376            }
377        }
378        prefix
379    }
380}
381
#[cfg(test)]
mod tests {
    use fixedbitset::FixedBitSet;
    use itertools::Itertools;

    use super::FunctionalDependencySet;

    /// A five-column set holding the single dependency (1, 2) --> (0).
    fn fd_1_2_determines_0() -> FunctionalDependencySet {
        let mut fd = FunctionalDependencySet::new(5);
        fd.add_functional_dependency_by_column_indices(&[1, 2], &[0]); // (1, 2) --> (0)
        fd
    }

    /// Asserts that `fd` holds exactly one dependency and returns the set column indices of
    /// its `from` and `to` sides.
    fn sole_dependency_indices(fd: FunctionalDependencySet) -> (Vec<usize>, Vec<usize>) {
        let fd_inner = fd.into_dependencies();

        assert_eq!(fd_inner.len(), 1);

        let (from, to) = fd_inner[0].clone().into_parts();
        (from.ones().collect_vec(), to.ones().collect_vec())
    }

    /// Minimizes `order_key` against `fd` and checks that exactly the `expected` columns (out
    /// of five) are kept. Both bitsets are printed in binary to ease debugging on failure.
    fn assert_minimized_order_key(
        fd: &FunctionalDependencySet,
        order_key: Vec<usize>,
        expected: &[usize],
    ) {
        let actual_key = fd.minimize_order_key_bitset(order_key);
        let mut expected_key = FixedBitSet::with_capacity(5);
        for &col in expected {
            expected_key.set(col, true);
        }
        println!("{:b}", actual_key);
        println!("{:b}", expected_key);
        assert_eq!(actual_key, expected_key);
    }

    #[test]
    fn test_minimize_key() {
        let mut fd_set = FunctionalDependencySet::new(5);
        // [0, 2, 3, 4] is a key
        fd_set.add_key(&[0, 2, 3, 4]);
        // 0 is constant
        fd_set.add_constant_columns(&[0]);
        // 1, 2 --> 3
        fd_set.add_functional_dependency_by_column_indices(&[1, 2], &[3]);
        // 0, 2 --> 3
        fd_set.add_functional_dependency_by_column_indices(&[0, 2], &[3]);
        // 3, 4 --> 2
        fd_set.add_functional_dependency_by_column_indices(&[3, 4], &[2]);
        // therefore, column 0 and column 2 can be removed from key
        let minimized = fd_set.minimize_key(&[0, 2, 3, 4]);
        assert_eq!(minimized, &[3, 4]);
    }

    #[test]
    fn test_with_key() {
        let fd = FunctionalDependencySet::with_key(4, &[1]);
        let (from, to) = sole_dependency_indices(fd);
        // 1 --> 0, 2, 3
        assert_eq!(from, &[1]);
        assert_eq!(to, &[0, 2, 3]);
    }

    #[test]
    fn test_add_key() {
        let mut fd = FunctionalDependencySet::new(4);
        fd.add_key(&[1]);
        let (from, to) = sole_dependency_indices(fd);
        assert_eq!(from, &[1]);
        assert_eq!(to, &[0, 2, 3]);
    }

    #[test]
    fn test_add_constant_columns() {
        let mut fd = FunctionalDependencySet::new(4);
        fd.add_constant_columns(&[1]);
        let (from, to) = sole_dependency_indices(fd);
        assert!(from.is_empty());
        assert_eq!(to, &[1]);
    }

    #[test]
    fn test_add_fd_by_indices() {
        let mut fd = FunctionalDependencySet::new(4);
        fd.add_functional_dependency_by_column_indices(&[1, 2], &[0]); // (1, 2) --> (0), 4 columns
        let (from, to) = sole_dependency_indices(fd);
        assert_eq!(from, &[1, 2]);
        assert_eq!(to, &[0]);
    }

    #[test]
    fn test_determined_by() {
        let mut fd = FunctionalDependencySet::new(5);
        fd.add_functional_dependency_by_column_indices(&[1, 2], &[0]); // (1, 2) --> (0)
        fd.add_functional_dependency_by_column_indices(&[0, 1], &[3]); // (0, 1) --> (3)
        fd.add_functional_dependency_by_column_indices(&[3], &[4]); // (3) --> (4)
        let determinant = FixedBitSet::from_iter([1usize, 2usize]);
        let dependant = FixedBitSet::from_iter([4usize]);
        assert!(fd.is_determined_by(&determinant, &dependant)); // (1, 2) --> (4) holds
    }

    // (1, 2) -> (0)
    // [1, 2, 0] -> [1, 2] (prune, since 0 is after continuous (1, 2))
    #[test]
    fn test_minimize_order_by_prefix() {
        let fd = fd_1_2_determines_0();
        assert_minimized_order_key(&fd, vec![1usize, 2usize, 0usize], &[1, 2]);
    }

    // (1, 2) -> (0)
    // [3, 1, 2, 0] -> [3, 1, 2] (prune, since 0 is after continuous (1, 2))
    #[test]
    fn test_minimize_order_by_tail_subset_prefix() {
        let fd = fd_1_2_determines_0();
        assert_minimized_order_key(&fd, vec![3usize, 1usize, 2usize, 0usize], &[1, 2, 3]);
    }

    // (1, 2) -> (0)
    // [3, 1, 2, 4, 0] -> [3, 1, 2, 4] (prune, since continuous (1, 2) is before 0)
    #[test]
    fn test_minimize_order_by_middle_subset_prefix() {
        let fd = fd_1_2_determines_0();
        assert_minimized_order_key(
            &fd,
            vec![3usize, 1usize, 2usize, 4usize, 0usize],
            &[1, 2, 3, 4],
        );
    }

    // (1, 2) -> (0)
    // [0, 1, 2] -> [0, 1, 2] (no pruning)
    #[test]
    fn test_minimize_order_by_suffix_cant_prune_prefix() {
        let fd = fd_1_2_determines_0();
        assert_minimized_order_key(&fd, vec![0usize, 1usize, 2usize], &[0, 1, 2]);
    }

    #[test]
    fn test_minimize_order_by_with_empty_key() {
        let mut fd = FunctionalDependencySet::new(5);
        fd.add_functional_dependency_by_column_indices(&[], &[0]); // () --> (0)
        assert_minimized_order_key(&fd, vec![0], &[]);
    }
}