risingwave_connector/parser/chunk_builder.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::LazyLock;

use risingwave_common::array::stream_record::RecordType;
use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk};
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::log::LogSuppressor;
use risingwave_common::types::{
    Datum, DatumCow, ScalarRefImpl, ToDatumRef, ToText, datum_ref_type_match,
};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector_codec::decoder::{AccessError, AccessResult};
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use smallvec::SmallVec;
use thiserror_ext::AsReport;

use super::MessageMeta;
use crate::parser::utils::{
    extract_cdc_meta_column, extract_header_inner_from_meta, extract_headers_from_meta,
    extract_pulsar_message_id_data_from_meta, extract_subject_from_meta,
    extract_timestamp_from_meta,
};
use crate::source::{SourceColumnDesc, SourceColumnType, SourceCtrlOpts, SourceMeta};

/// Maximum number of rows in a transaction. If a transaction is larger than this, it will be
/// force-committed to avoid potential OOM.
const MAX_TRANSACTION_SIZE: usize = 4096;

/// Represents an ongoing transaction.
struct Transaction {
    id: Box<str>,
    len: usize,
}

/// A builder for building a [`StreamChunk`] from [`SourceColumnDesc`].
///
/// The output chunk size is controlled by `source_ctrl_opts.chunk_size` and `source_ctrl_opts.split_txn`.
/// During the building process, multiple chunks may be built even without any explicit call to
/// `finish_current_chunk`. This mainly happens when more than one record is found in a single
/// `SourceMessage` while parsing it. Users of this builder should call `consume_ready_chunks`
/// from time to time to prevent the buffer from growing too large.
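///
/// # Example
///
/// A minimal usage sketch (marked `ignore`: the column descriptors, control options,
/// and real parsing are elided):
///
/// ```ignore
/// let mut builder = SourceStreamChunkBuilder::new(column_descs, source_ctrl_opts);
/// // Write one `Insert` record; here every column is simply filled with `NULL`.
/// builder.row_writer().do_insert(|_desc| Ok(Datum::None))?;
/// // Flush pending rows and drain all chunks built so far.
/// builder.finish_current_chunk();
/// for chunk in builder.consume_ready_chunks() {
///     // emit `chunk` downstream
/// }
/// ```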
pub struct SourceStreamChunkBuilder {
    column_descs: Vec<SourceColumnDesc>,
    source_ctrl_opts: SourceCtrlOpts,
    builders: Vec<ArrayBuilderImpl>,
    op_builder: Vec<Op>,
    vis_builder: BitmapBuilder,
    ongoing_txn: Option<Transaction>,
    ready_chunks: SmallVec<[StreamChunk; 1]>,
}

impl SourceStreamChunkBuilder {
    pub fn new(column_descs: Vec<SourceColumnDesc>, source_ctrl_opts: SourceCtrlOpts) -> Self {
        let (builders, op_builder, vis_builder) =
            Self::create_builders(&column_descs, source_ctrl_opts.chunk_size);

        Self {
            column_descs,
            source_ctrl_opts,
            builders,
            op_builder,
            vis_builder,
            ongoing_txn: None,
            ready_chunks: SmallVec::new(),
        }
    }

    fn create_builders(
        column_descs: &[SourceColumnDesc],
        chunk_size: usize,
    ) -> (Vec<ArrayBuilderImpl>, Vec<Op>, BitmapBuilder) {
        let reserved_capacity = chunk_size + 1; // it's possible to have an additional `U-` at the end
        let builders = column_descs
            .iter()
            .map(|desc| desc.data_type.create_array_builder(reserved_capacity))
            .collect();
        let op_builder = Vec::with_capacity(reserved_capacity);
        let vis_builder = BitmapBuilder::with_capacity(reserved_capacity);
        (builders, op_builder, vis_builder)
    }

    /// Begin a (CDC) transaction with the given `txn_id`.
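    ///
    /// # Example
    ///
    /// A sketch of the expected call sequence (marked `ignore`; `txn_messages` and
    /// `parse_one` are placeholders for the surrounding parser loop):
    ///
    /// ```ignore
    /// builder.begin_transaction("txn-1".into());
    /// for msg in txn_messages {
    ///     parse_one(msg, builder.row_writer())?;
    /// }
    /// builder.commit_transaction("txn-1".into());
    /// // Unless `split_txn` is set, the transaction's rows stay in one chunk,
    /// // subject to the `MAX_TRANSACTION_SIZE` safeguard.
    /// ```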
    pub fn begin_transaction(&mut self, txn_id: Box<str>) {
        if let Some(ref txn) = self.ongoing_txn {
            tracing::warn!(
                ongoing_txn_id = txn.id,
                new_txn_id = txn_id,
                "already in a transaction"
            );
        }
        tracing::debug!(txn_id, "begin upstream transaction");
        self.ongoing_txn = Some(Transaction { id: txn_id, len: 0 });
    }

    /// Commit the ongoing transaction with the given `txn_id`.
    pub fn commit_transaction(&mut self, txn_id: Box<str>) {
        if let Some(txn) = self.ongoing_txn.take() {
            if txn.id != txn_id {
                tracing::warn!(
                    expected_txn_id = txn.id,
                    actual_txn_id = txn_id,
                    "unexpected transaction id"
                );
            }
            tracing::debug!(txn_id, "commit upstream transaction");

            if self.current_chunk_len() >= self.source_ctrl_opts.chunk_size {
                // if `split_txn` is on, we should've finished the chunk already
                assert!(!self.source_ctrl_opts.split_txn);
                self.finish_current_chunk();
            }
        } else {
            tracing::warn!(txn_id, "no ongoing transaction to commit");
        }
    }

    /// Check if the builder is in an ongoing transaction.
    pub fn is_in_transaction(&self) -> bool {
        self.ongoing_txn.is_some()
    }

    /// Get a row writer for the parser to write records to the builder.
    pub fn row_writer(&mut self) -> SourceStreamChunkRowWriter<'_> {
        SourceStreamChunkRowWriter {
            builder: self,
            visible: true, // write visible rows by default
            row_meta: None,
        }
    }

    /// Write a heartbeat record to the builder. The builder decides whether to finish the
    /// current chunk; currently, it ensures that heartbeats always occupy separate chunks.
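    ///
    /// A sketch of the resulting chunk shape (marked `ignore`; assumes
    /// `StreamChunk::cardinality` counts only visible rows):
    ///
    /// ```ignore
    /// builder.heartbeat(meta);
    /// let chunk = builder.consume_ready_chunks().last().unwrap();
    /// assert_eq!(chunk.cardinality(), 0); // the single heartbeat row is invisible
    /// ```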
    pub fn heartbeat(&mut self, meta: MessageMeta<'_>) {
        if self.current_chunk_len() > 0 {
            // If there are records in the chunk, finish it first.
            // If there's an ongoing transaction, `finish_current_chunk` will handle it properly.
            // Note this may split the transaction, in which case a warning will be logged.
            self.finish_current_chunk();
        }

        _ = self
            .row_writer()
            .invisible()
            .with_meta(meta)
            .do_insert(|_| Ok(Datum::None));
        self.finish_current_chunk(); // each heartbeat should be a separate chunk
    }

    /// Finish and build a [`StreamChunk`] from the records currently pending in the builder,
    /// regardless of whether the builder is in a transaction and whether `split_txn` is set.
    /// The built chunk is appended to `ready_chunks` and the builder is reset.
    pub fn finish_current_chunk(&mut self) {
        if self.op_builder.is_empty() {
            return;
        }

        let (builders, op_builder, vis_builder) =
            Self::create_builders(&self.column_descs, self.source_ctrl_opts.chunk_size);
        let chunk = StreamChunk::with_visibility(
            std::mem::replace(&mut self.op_builder, op_builder),
            std::mem::replace(&mut self.builders, builders)
                .into_iter()
                .map(|builder| builder.finish().into())
                .collect(),
            std::mem::replace(&mut self.vis_builder, vis_builder).finish(),
        );
        self.ready_chunks.push(chunk);

        if let Some(ref mut txn) = self.ongoing_txn {
            tracing::warn!(
                txn_id = txn.id,
                len = txn.len,
                "splitting an ongoing transaction"
            );
            txn.len = 0;
        }
    }

    /// Consumes and returns the ready [`StreamChunk`]s.
    pub fn consume_ready_chunks(&mut self) -> impl ExactSizeIterator<Item = StreamChunk> + '_ {
        self.ready_chunks.drain(..)
    }

    fn current_chunk_len(&self) -> usize {
        self.op_builder.len()
    }

    /// Commit a newly-written record by appending `op` and `vis` to the corresponding builders.
    /// This is supposed to be called via the `row_writer` only.
    fn commit_record(&mut self, op: Op, vis: bool) {
        self.op_builder.push(op);
        self.vis_builder.append(vis);

        let curr_chunk_size = self.current_chunk_len();
        let max_chunk_size = self.source_ctrl_opts.chunk_size;

        if let Some(ref mut txn) = self.ongoing_txn {
            txn.len += 1;

            // Force-commit an oversized transaction to avoid OOM, or split eagerly
            // if `split_txn` allows emitting chunks mid-transaction.
            if txn.len >= MAX_TRANSACTION_SIZE
                || (self.source_ctrl_opts.split_txn && curr_chunk_size >= max_chunk_size)
            {
                self.finish_current_chunk();
            }
        } else if curr_chunk_size >= max_chunk_size {
            // Not in a transaction: finish the chunk as soon as it's full.
            self.finish_current_chunk();
        }
    }
}

/// `SourceStreamChunkRowWriter` is responsible for writing one or more records to the [`StreamChunk`],
/// each of which contains either one row (Insert/Delete) or two rows (Update) that are written atomically.
///
/// Callers are supposed to call one of the `insert`, `delete` or `update` methods to write a record,
/// providing a closure that produces one or two [`Datum`]s for each corresponding [`SourceColumnDesc`].
/// Specifically,
/// - only columns with [`SourceColumnType::Normal`] need to be handled;
/// - errors for non-primary-key columns are ignored and the fields are filled with `NULL` instead;
/// - other errors are propagated.
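///
/// # Example
///
/// An illustrative `do_insert` call (marked `ignore`; `access_field` is a
/// placeholder for the parser's actual field accessor over the payload):
///
/// ```ignore
/// writer.do_insert(|desc| {
///     // Look up the field by column name and type. Errors on non-pk columns
///     // are downgraded to `NULL` by the writer itself.
///     access_field(&payload, &desc.name, &desc.data_type)
/// })?;
/// ```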
pub struct SourceStreamChunkRowWriter<'a> {
    builder: &'a mut SourceStreamChunkBuilder,

    /// Whether the rows written by this writer should be visible in the output `StreamChunk`.
    visible: bool,

    /// Optional metadata of the original message.
    ///
    /// When set via `with_meta`, it is used to fill the columns of types other than [`SourceColumnType::Normal`].
    row_meta: Option<MessageMeta<'a>>,
}

impl<'a> SourceStreamChunkRowWriter<'a> {
    /// Set the metadata of the original message for this row writer.
    ///
    /// This should always be called, except in tests.
    pub fn with_meta(mut self, row_meta: MessageMeta<'a>) -> Self {
        self.row_meta = Some(row_meta);
        self
    }

    pub fn row_meta(&self) -> Option<MessageMeta<'a>> {
        self.row_meta
    }

    pub fn source_meta(&self) -> &'a SourceMeta {
        self.row_meta
            .map(|m| m.source_meta)
            .unwrap_or(&SourceMeta::Empty)
    }

    /// Convert the row writer to an invisible row writer.
    pub fn invisible(mut self) -> Self {
        self.visible = false;
        self
    }
}

impl SourceStreamChunkRowWriter<'_> {
    fn do_action<'a, A: RowWriterAction>(
        &'a mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<A::Output<'a>>,
    ) -> AccessResult<()> {
        let mut parse_field = |desc: &SourceColumnDesc| {
            match f(desc) {
                Ok(output) => {
                    // Defensive: validate that the datum's shape matches the expected column type before appending.
                    A::type_check(desc, &output)?;
                    Ok(output)
                }

                // Throw an error for failed access to primary key columns.
                Err(e) if desc.is_pk => Err(e),
                // Ignore errors for other columns and fill in `NULL` instead.
                Err(error) => {
                    // TODO: figure out a way to fill in a not-null default value if the user specifies one
                    // TODO: decide whether the error should not be ignored (e.g., even not a valid Debezium message)
                    // TODO: not using tracing span to provide `split_id` and `offset` due to performance concern,
                    //       see #13105
                    static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
                        LazyLock::new(LogSuppressor::default);
                    if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
                        tracing::warn!(
                            error = %error.as_report(),
                            split_id = self.row_meta.as_ref().map(|m| m.split_id),
                            offset = self.row_meta.as_ref().map(|m| m.offset),
                            column = desc.name,
                            suppressed_count,
                            "failed to parse non-pk column, padding with `NULL`"
                        );
                    }
                    Ok(A::output_for(Datum::None))
                }
            }
        };

        let mut wrapped_f = |desc: &SourceColumnDesc| {
            match (&desc.column_type, &desc.additional_column.column_type) {
                (&SourceColumnType::Offset | &SourceColumnType::RowId, _) => {
                    // SourceColumnType is for CDC source only.
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|row_meta| row_meta.value_for_column(desc)),
                    ))
                }
                (&SourceColumnType::Meta, _)
                    if matches!(
                        &self.row_meta.map(|ele| ele.source_meta),
                        &Some(SourceMeta::Kafka(_) | SourceMeta::DebeziumCdc(_))
                    ) =>
                {
                    // SourceColumnType is for CDC source only.
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|row_meta| row_meta.value_for_column(desc)),
                    ))
                }

                (
                    _, // for cdc tables
                    &Some(ref col @ AdditionalColumnType::DatabaseName(_))
                    | &Some(ref col @ AdditionalColumnType::TableName(_)),
                ) => {
                    // For MongoDB CDC, both database_name and collection_name should be parsed from the payload.
                    // Check if this is MongoDB CDC by looking for a CollectionName column in the schema.
                    let is_mongodb_cdc = self.builder.column_descs.iter().any(|d| {
                        matches!(
                            d.additional_column.column_type,
                            Some(AdditionalColumnType::CollectionName(_))
                        )
                    });

                    if matches!(col, AdditionalColumnType::DatabaseName(_)) && is_mongodb_cdc {
                        // MongoDB CDC database_name should be parsed from payload.source.db
                        parse_field(desc)
                    } else {
                        match self.row_meta {
                            Some(row_meta) => {
                                if let SourceMeta::DebeziumCdc(cdc_meta) = row_meta.source_meta {
                                    Ok(A::output_for(extract_cdc_meta_column(
                                        cdc_meta,
                                        col,
                                        desc.name.as_str(),
                                    )?))
                                } else {
                                    Err(AccessError::Uncategorized {
                                        message: "CDC metadata not found in the message".to_owned(),
                                    })
                                }
                            }
                            None => parse_field(desc), // parse from payload
                        }
                    }
                }
                (_, &Some(AdditionalColumnType::Timestamp(_))) => match self.row_meta {
                    Some(row_meta) => Ok(A::output_for(extract_timestamp_from_meta(
                        row_meta.source_meta,
                    ))),
                    None => parse_field(desc), // parse from payload
                },
                (_, &Some(AdditionalColumnType::CollectionName(_))) => {
                    // collection name for `mongodb-cdc` should be parsed from the message payload
                    parse_field(desc)
                }
                (_, &Some(AdditionalColumnType::Subject(_))) => Ok(A::output_for(
                    self.row_meta
                        .as_ref()
                        .and_then(|ele| extract_subject_from_meta(ele.source_meta))
                        .unwrap_or(None),
                )),
                (_, &Some(AdditionalColumnType::Partition(_))) => {
                    // this meta info does not depend on a specific connector
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .map(|ele| ScalarRefImpl::Utf8(ele.split_id)),
                    ))
                }
                (_, &Some(AdditionalColumnType::Offset(_))) => {
                    // this meta info does not depend on a specific connector
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .map(|ele| ScalarRefImpl::Utf8(ele.offset)),
                    ))
                }
                (_, &Some(AdditionalColumnType::HeaderInner(ref header_inner))) => {
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|ele| {
                                extract_header_inner_from_meta(
                                    ele.source_meta,
                                    header_inner.inner_field.as_ref(),
                                    header_inner.data_type.as_ref(),
                                )
                            })
                            .unwrap_or(Datum::None.into()),
                    ))
                }
                (_, &Some(AdditionalColumnType::Headers(_))) => Ok(A::output_for(
                    self.row_meta
                        .as_ref()
                        .and_then(|ele| extract_headers_from_meta(ele.source_meta))
                        .unwrap_or(None),
                )),
                (_, &Some(AdditionalColumnType::PulsarMessageIdData(_))) => {
                    // message_id_data is derived internally, so it's not included here
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|ele| {
                                extract_pulsar_message_id_data_from_meta(ele.source_meta)
                            })
                            .unwrap_or(None),
                    ))
                }
                (_, &Some(AdditionalColumnType::Filename(_))) => {
                    // Filename is used as the partition in FS connectors
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .map(|ele| ScalarRefImpl::Utf8(ele.split_id)),
                    ))
                }
                (_, &Some(AdditionalColumnType::Payload(_))) => {
                    // ingest the whole payload as a single column;
                    // the special logic is handled in `KvEvent::access_field`
                    parse_field(desc)
                }
                (_, _) => {
                    // For normal columns, call the user-provided closure.
                    parse_field(desc)
                }
            }
        };

        // The number of columns that changes have been applied to. Used to roll back when an error occurs.
        let mut applied_columns = 0;

        let result = (self.builder.column_descs.iter())
            .zip_eq_fast(self.builder.builders.iter_mut())
            .try_for_each(|(desc, builder)| {
                wrapped_f(desc).map(|output| {
                    A::apply(builder, output);
                    applied_columns += 1;
                })
            });

        match result {
            Ok(_) => {
                // commit the action by appending `Op`s and visibility
                for op in A::RECORD_TYPE.ops() {
                    self.builder.commit_record(*op, self.visible);
                }

                Ok(())
            }
            Err(e) => {
                // roll back the columns that were already written
                for i in 0..applied_columns {
                    A::rollback(&mut self.builder.builders[i]);
                }
                Err(e)
            }
        }
    }

    /// Write an `Insert` record to the [`StreamChunk`], with the given fallible closure that
    /// produces one [`Datum`] for each corresponding [`SourceColumnDesc`].
    ///
    /// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
    #[inline(always)]
    pub fn do_insert<'a, D>(
        &mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<D>,
    ) -> AccessResult<()>
    where
        D: Into<DatumCow<'a>>,
    {
        self.do_action::<InsertAction>(|desc| f(desc).map(Into::into))
    }

    /// Write a `Delete` record to the [`StreamChunk`], with the given fallible closure that
    /// produces one [`Datum`] for each corresponding [`SourceColumnDesc`].
    ///
    /// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
    #[inline(always)]
    pub fn do_delete<'a, D>(
        &mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<D>,
    ) -> AccessResult<()>
    where
        D: Into<DatumCow<'a>>,
    {
        self.do_action::<DeleteAction>(|desc| f(desc).map(Into::into))
    }

    /// Write an `Update` record to the [`StreamChunk`], with the given fallible closure that
    /// produces two [`Datum`]s as the old and new values for each corresponding [`SourceColumnDesc`].
    ///
    /// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
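    ///
    /// An illustrative closure (marked `ignore`; `old_access`/`new_access` are
    /// placeholders for accessors over the record's "before"/"after" images):
    ///
    /// ```ignore
    /// writer.do_update(|desc| {
    ///     let old = old_access(&desc.name)?;
    ///     let new = new_access(&desc.name)?;
    ///     Ok((old, new))
    /// })?;
    /// ```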
    #[inline(always)]
    pub fn do_update<'a, D1, D2>(
        &mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<(D1, D2)>,
    ) -> AccessResult<()>
    where
        D1: Into<DatumCow<'a>>,
        D2: Into<DatumCow<'a>>,
    {
        self.do_action::<UpdateAction>(|desc| f(desc).map(|(old, new)| (old.into(), new.into())))
    }
}

/// Abstraction over the three kinds of record writes (insert/delete/update),
/// dispatched statically by [`SourceStreamChunkRowWriter::do_action`].
trait RowWriterAction {
    type Output<'a>;
    const RECORD_TYPE: RecordType;

    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a>;

    fn type_check(desc: &SourceColumnDesc, output: &Self::Output<'_>) -> AccessResult<()>;

    fn apply(builder: &mut ArrayBuilderImpl, output: Self::Output<'_>);

    fn rollback(builder: &mut ArrayBuilderImpl);
}

#[inline(always)]
fn check_datum_type(
    desc: &SourceColumnDesc,
    datum: risingwave_common::types::DatumRef<'_>,
) -> AccessResult<()> {
    if datum_ref_type_match(&desc.data_type, datum) {
        Ok(())
    } else {
        let (got, value) = match datum {
            None => unreachable!("datum_ref_type_match returned false for NULL"),
            // Use `to_text` (regardless of type) since we are in the mismatch path.
            Some(s) => (s.get_ident().to_owned(), s.to_text()),
        };
        Err(AccessError::TypeError {
            expected: desc.data_type.to_string(),
            got,
            value,
        })
    }
}

struct InsertAction;

impl RowWriterAction for InsertAction {
    type Output<'a> = DatumCow<'a>;

    const RECORD_TYPE: RecordType = RecordType::Insert;

    #[inline(always)]
    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
        datum.into()
    }

    #[inline(always)]
    fn type_check(desc: &SourceColumnDesc, output: &DatumCow<'_>) -> AccessResult<()> {
        check_datum_type(desc, output.to_datum_ref())
    }

    #[inline(always)]
    fn apply(builder: &mut ArrayBuilderImpl, output: DatumCow<'_>) {
        builder.append(output)
    }

    #[inline(always)]
    fn rollback(builder: &mut ArrayBuilderImpl) {
        builder.pop().unwrap()
    }
}

struct DeleteAction;

impl RowWriterAction for DeleteAction {
    type Output<'a> = DatumCow<'a>;

    const RECORD_TYPE: RecordType = RecordType::Delete;

    #[inline(always)]
    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
        datum.into()
    }

    #[inline(always)]
    fn type_check(desc: &SourceColumnDesc, output: &DatumCow<'_>) -> AccessResult<()> {
        check_datum_type(desc, output.to_datum_ref())
    }

    #[inline(always)]
    fn apply(builder: &mut ArrayBuilderImpl, output: DatumCow<'_>) {
        builder.append(output)
    }

    #[inline(always)]
    fn rollback(builder: &mut ArrayBuilderImpl) {
        builder.pop().unwrap()
    }
}

struct UpdateAction;

impl RowWriterAction for UpdateAction {
    type Output<'a> = (DatumCow<'a>, DatumCow<'a>);

    const RECORD_TYPE: RecordType = RecordType::Update;

    #[inline(always)]
    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
        let datum = datum.into();
        (datum.clone(), datum)
    }

    #[inline(always)]
    fn type_check(
        desc: &SourceColumnDesc,
        output: &(DatumCow<'_>, DatumCow<'_>),
    ) -> AccessResult<()> {
        check_datum_type(desc, output.0.to_datum_ref())?;
        check_datum_type(desc, output.1.to_datum_ref())
    }

    #[inline(always)]
    fn apply(builder: &mut ArrayBuilderImpl, output: (DatumCow<'_>, DatumCow<'_>)) {
        // append the old value (`U-`) followed by the new value (`U+`)
        builder.append(output.0);
        builder.append(output.1);
    }

    #[inline(always)]
    fn rollback(builder: &mut ArrayBuilderImpl) {
        // an update appended two rows, so pop both
        builder.pop().unwrap();
        builder.pop().unwrap();
    }
}