risingwave_connector/sink/big_query.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::pin::Pin;
use core::time::Duration;
use std::collections::{BTreeMap, HashMap, VecDeque};

use anyhow::{Context, anyhow};
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use futures::future::pending;
use futures::prelude::Future;
use futures::{Stream, StreamExt};
use futures_async_stream::try_stream;
use gcp_bigquery_client::Client;
use gcp_bigquery_client::error::BQError;
use gcp_bigquery_client::model::query_request::QueryRequest;
use gcp_bigquery_client::model::table::Table;
use gcp_bigquery_client::model::table_field_schema::TableFieldSchema;
use gcp_bigquery_client::model::table_schema::TableSchema;
use google_cloud_bigquery::grpc::apiv1::conn_pool::ConnectionManager;
use google_cloud_gax::conn::{ConnectionOptions, Environment};
use google_cloud_gax::grpc::Request;
use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request::{
    MissingValueInterpretation, ProtoData, Rows as AppendRowsRequestRows,
};
use google_cloud_googleapis::cloud::bigquery::storage::v1::{
    AppendRowsRequest, AppendRowsResponse, ProtoRows, ProtoSchema,
};
use google_cloud_pubsub::client::google_cloud_auth;
use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
use phf::{Set, phf_set};
use prost_reflect::{FieldDescriptor, MessageDescriptor};
use prost_types::{
    DescriptorProto, FieldDescriptorProto, FileDescriptorProto, FileDescriptorSet,
    field_descriptor_proto,
};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use serde::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use simd_json::prelude::ArrayTrait;
use tokio::sync::mpsc;
use tonic::{Response, Status, async_trait};
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;
use yup_oauth2::ServiceAccountKey;

use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo};
use super::log_store::{LogStoreReadItem, TruncateOffset};
use super::{
    LogSinker, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkLogReader,
};
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::connector_common::AwsAuthProps;
use crate::enforce_secret::EnforceSecret;
use crate::sink::{Result, Sink, SinkParam, SinkWriterParam};

pub const BIGQUERY_SINK: &str = "bigquery";
pub const CHANGE_TYPE: &str = "_CHANGE_TYPE";
const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4;
const CONNECT_TIMEOUT: Option<Duration> = Some(Duration::from_secs(30));
const CONNECTION_TIMEOUT: Option<Duration> = None;
const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
// The AppendRows request payload must stay below 10 MB, so we cap each batch at 8 MB.
const MAX_ROW_SIZE: usize = 8 * 1024 * 1024;

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct BigQueryCommon {
    #[serde(rename = "bigquery.local.path")]
    pub local_path: Option<String>,
    #[serde(rename = "bigquery.s3.path")]
    pub s3_path: Option<String>,
    #[serde(rename = "bigquery.project")]
    pub project: String,
    #[serde(rename = "bigquery.dataset")]
    pub dataset: String,
    #[serde(rename = "bigquery.table")]
    pub table: String,
    #[serde(default)] // default false
    #[serde_as(as = "DisplayFromStr")]
    pub auto_create: bool,
    #[serde(rename = "bigquery.credentials")]
    pub credentials: Option<String>,
}
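// Illustrative only (added commentary, not from the original source): these options are
// usually supplied through the `WITH` clause of a `CREATE SINK` statement; the exact keys
// mirror the `serde(rename = ...)` attributes above, and the values here are placeholders.
//
//     CREATE SINK bq_sink FROM mv WITH (
//         connector = 'bigquery',
//         type = 'append-only',
//         bigquery.project = 'my_project',
//         bigquery.dataset = 'my_dataset',
//         bigquery.table = 'my_table',
//         bigquery.local.path = '/path/to/service-account.json'
//     );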

impl EnforceSecret for BigQueryCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "bigquery.credentials",
    };
}

struct BigQueryFutureManager {
    // `offset_queue` pairs each pending `TruncateOffset` with the number of responses still
    // expected from `resp_stream` before that offset may be truncated.
    // When the offset is a barrier, the count is 0: nothing was sent, so there is nothing to wait for.
    // When the offset is a chunk:
    // 1. The chunk had no rows: nothing was sent, the count is 0, and no response is awaited.
    // 2. The chunk fit into one batch (under `MAX_ROW_SIZE`): one request was sent, the count is 1,
    //    and we wait for one response from `resp_stream`.
    // 3. The chunk was split into n batches (it exceeded `MAX_ROW_SIZE`): n requests were sent, the
    //    count is n, and we wait for n responses from `resp_stream`.
    offset_queue: VecDeque<(TruncateOffset, usize)>,
    resp_stream: Pin<Box<dyn Stream<Item = Result<()>> + Send>>,
}
impl BigQueryFutureManager {
    pub fn new(
        max_future_num: usize,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
    ) -> Self {
        let offset_queue = VecDeque::with_capacity(max_future_num);
        Self {
            offset_queue,
            resp_stream: Box::pin(resp_stream),
        }
    }

    pub fn add_offset(&mut self, offset: TruncateOffset, resp_num: usize) {
        self.offset_queue.push_back((offset, resp_num));
    }

    pub async fn next_offset(&mut self) -> Result<TruncateOffset> {
        if let Some((_offset, remaining_resp_num)) = self.offset_queue.front_mut() {
            if *remaining_resp_num == 0 {
                return Ok(self.offset_queue.pop_front().unwrap().0);
            }
            while *remaining_resp_num > 0 {
                self.resp_stream
                    .next()
                    .await
                    .ok_or_else(|| SinkError::BigQuery(anyhow::anyhow!("end of stream")))??;
                *remaining_resp_num -= 1;
            }
            Ok(self.offset_queue.pop_front().unwrap().0)
        } else {
            pending().await
        }
    }
}
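// A minimal usage sketch (illustrative, not part of the original file), assuming a pre-filled
// response stream:
//
//     let resp_stream = futures::stream::iter(vec![Ok(()), Ok(())]);
//     let mut manager = BigQueryFutureManager::new(4, resp_stream);
//     // A barrier sends nothing, so it can be truncated without awaiting any response.
//     manager.add_offset(TruncateOffset::Barrier { epoch: 1 }, 0);
//     // A chunk that was split into two append requests must wait for two responses.
//     manager.add_offset(TruncateOffset::Chunk { epoch: 1, chunk_id: 0 }, 2);
//     // `next_offset().await` yields the barrier immediately, then yields the chunk offset
//     // only after both acknowledgements have arrived on `resp_stream`.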
pub struct BigQueryLogSinker {
    writer: BigQuerySinkWriter,
    bigquery_future_manager: BigQueryFutureManager,
    future_num: usize,
}
impl BigQueryLogSinker {
    pub fn new(
        writer: BigQuerySinkWriter,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
        future_num: usize,
    ) -> Self {
        Self {
            writer,
            bigquery_future_manager: BigQueryFutureManager::new(future_num, resp_stream),
            future_num,
        }
    }
}

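// Note (added commentary): `consume_log_and_sink` below interleaves two concurrent tasks with
// `tokio::select!`: draining acknowledged offsets so the log store can be truncated, and reading
// new items from the log. The guard on the second arm stops reading new items once `offset_queue`
// holds more than `future_num` pending offsets, which bounds the number of in-flight append
// requests.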
#[async_trait]
impl LogSinker for BigQueryLogSinker {
    async fn consume_log_and_sink(mut self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        loop {
            tokio::select!(
                offset = self.bigquery_future_manager.next_offset() => {
                    log_reader.truncate(offset?)?;
                }
                item_result = log_reader.next_item(), if self.bigquery_future_manager.offset_queue.len() <= self.future_num => {
                    let (epoch, item) = item_result?;
                    match item {
                        LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
                            let resp_num = self.writer.write_chunk(chunk)?;
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Chunk { epoch, chunk_id }, resp_num);
                        }
                        LogStoreReadItem::Barrier { .. } => {
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Barrier { epoch }, 0);
                        }
                    }
                }
            )
        }
    }
}

impl BigQueryCommon {
    async fn build_client(&self, aws_auth_props: &AwsAuthProps) -> Result<Client> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let service_account =
            if let Ok(auth_json_from_base64) = BASE64_STANDARD.decode(auth_json.clone()) {
                serde_json::from_slice::<ServiceAccountKey>(&auth_json_from_base64)
            } else {
                serde_json::from_str::<ServiceAccountKey>(&auth_json)
            }
            .map_err(|e| SinkError::BigQuery(e.into()))?;

        let client: Client = Client::from_service_account_key(service_account, false)
            .await
            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
        Ok(client)
    }

    async fn build_writer_client(
        &self,
        aws_auth_props: &AwsAuthProps,
    ) -> Result<(StorageWriterClient, impl Stream<Item = Result<()>> + use<>)> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let credentials_file =
            if let Ok(auth_json_from_base64) = BASE64_STANDARD.decode(auth_json.clone()) {
                serde_json::from_slice::<CredentialsFile>(&auth_json_from_base64)
            } else {
                serde_json::from_str::<CredentialsFile>(&auth_json)
            }
            .map_err(|e| SinkError::BigQuery(e.into()))?;

        StorageWriterClient::new(credentials_file).await
    }

    async fn get_auth_json_from_path(&self, aws_auth_props: &AwsAuthProps) -> Result<String> {
        if let Some(credentials) = &self.credentials {
            Ok(credentials.clone())
        } else if let Some(local_path) = &self.local_path {
            std::fs::read_to_string(local_path)
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))
        } else if let Some(s3_path) = &self.s3_path {
            let url =
                Url::parse(s3_path).map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            let auth_vec = load_file_descriptor_from_s3(&url, aws_auth_props)
                .await
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            Ok(String::from_utf8(auth_vec).map_err(|e| SinkError::BigQuery(e.into()))?)
        } else {
            Err(SinkError::BigQuery(anyhow::anyhow!(
                "At least one of `bigquery.credentials`, `bigquery.local.path` or `bigquery.s3.path` must be configured."
            )))
        }
    }
}
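// Note (added commentary): the credential sources above are tried in order. The value of
// `bigquery.credentials` (or the file read from the local/S3 path) may be either the raw
// service-account JSON or a base64-encoded copy of it; `build_client`/`build_writer_client`
// first attempt a base64 decode and fall back to parsing the string as JSON directly.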

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct BigQueryConfig {
    #[serde(flatten)]
    pub common: BigQueryCommon,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,
    pub r#type: String, // accept "append-only" or "upsert"
}

impl EnforceSecret for BigQueryConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        BigQueryCommon::enforce_one(prop)?;
        AwsAuthProps::enforce_one(prop)?;
        Ok(())
    }
}

impl BigQueryConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<BigQueryConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be either {} or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
}

#[derive(Debug)]
pub struct BigQuerySink {
    pub config: BigQueryConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
}

impl EnforceSecret for BigQuerySink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            BigQueryConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl BigQuerySink {
    pub fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        Ok(Self {
            config,
            schema,
            pk_indices,
            is_append_only,
        })
    }
}

impl BigQuerySink {
    fn check_column_name_and_type(
        &self,
        big_query_columns_desc: HashMap<String, String>,
    ) -> Result<()> {
        let rw_fields_name = self.schema.fields();
        if big_query_columns_desc.is_empty() {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Cannot find the table in BigQuery"
            )));
        }
        if rw_fields_name.len().ne(&big_query_columns_desc.len()) {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "The number of columns in RisingWave ({}) must equal the number of columns in BigQuery ({})",
                rw_fields_name.len(),
                big_query_columns_desc.len()
            )));
        }

        for i in rw_fields_name {
            let value = big_query_columns_desc.get(&i.name).ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Column `{:?}` on RisingWave side is not found on BigQuery side.",
                    i.name
                ))
            })?;
            let data_type_string = Self::get_string_and_check_support_from_datatype(&i.data_type)?;
            if data_type_string.ne(value) {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Data type mismatch for column `{:?}`. BigQuery side: `{:?}`, RisingWave side: `{:?}`.",
                    i.name,
                    value,
                    data_type_string
                )));
            };
        }
        Ok(())
    }

    fn get_string_and_check_support_from_datatype(rw_data_type: &DataType) -> Result<String> {
        match rw_data_type {
            DataType::Boolean => Ok("BOOL".to_owned()),
            DataType::Int16 => Ok("INT64".to_owned()),
            DataType::Int32 => Ok("INT64".to_owned()),
            DataType::Int64 => Ok("INT64".to_owned()),
            DataType::Float32 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "REAL is not supported for BigQuery sink. Please convert to FLOAT64 or other supported types."
            ))),
            DataType::Float64 => Ok("FLOAT64".to_owned()),
            DataType::Decimal => Ok("NUMERIC".to_owned()),
            DataType::Date => Ok("DATE".to_owned()),
            DataType::Varchar => Ok("STRING".to_owned()),
            DataType::Time => Ok("TIME".to_owned()),
            DataType::Timestamp => Ok("DATETIME".to_owned()),
            DataType::Timestamptz => Ok("TIMESTAMP".to_owned()),
            DataType::Interval => Ok("INTERVAL".to_owned()),
            DataType::Struct(structs) => {
                let mut elements_vec = vec![];
                for (name, datatype) in structs.iter() {
                    let element_string =
                        Self::get_string_and_check_support_from_datatype(datatype)?;
                    elements_vec.push(format!("{} {}", name, element_string));
                }
                Ok(format!("STRUCT<{}>", elements_vec.join(", ")))
            }
            DataType::List(l) => {
                let element_string = Self::get_string_and_check_support_from_datatype(l.elem())?;
                Ok(format!("ARRAY<{}>", element_string))
            }
            DataType::Bytea => Ok("BYTES".to_owned()),
            DataType::Jsonb => Ok("JSON".to_owned()),
            DataType::Serial => Ok("INT64".to_owned()),
            DataType::Int256 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "INT256 is not supported for BigQuery sink."
            ))),
            DataType::Map(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
                "MAP is not supported for BigQuery sink."
            ))),
            DataType::Vector(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
                "VECTOR is not supported for BigQuery sink."
            ))),
        }
    }

    fn map_field(rw_field: &Field) -> Result<TableFieldSchema> {
        let tfs = match &rw_field.data_type {
            DataType::Boolean => TableFieldSchema::bool(&rw_field.name),
            DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Serial => {
                TableFieldSchema::integer(&rw_field.name)
            }
            DataType::Float32 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "REAL is not supported for BigQuery sink. Please convert to FLOAT64 or other supported types."
                )));
            }
            DataType::Float64 => TableFieldSchema::float(&rw_field.name),
            DataType::Decimal => TableFieldSchema::numeric(&rw_field.name),
            DataType::Date => TableFieldSchema::date(&rw_field.name),
            DataType::Varchar => TableFieldSchema::string(&rw_field.name),
            DataType::Time => TableFieldSchema::time(&rw_field.name),
            DataType::Timestamp => TableFieldSchema::date_time(&rw_field.name),
            DataType::Timestamptz => TableFieldSchema::timestamp(&rw_field.name),
            DataType::Interval => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "INTERVAL is not supported for BigQuery sink. Please convert to VARCHAR or other supported types."
                )));
            }
            DataType::Struct(st) => {
                let mut sub_fields = Vec::with_capacity(st.len());
                for (name, dt) in st.iter() {
                    let rw_field = Field::with_name(dt.clone(), name);
                    let field = Self::map_field(&rw_field)?;
                    sub_fields.push(field);
                }
                TableFieldSchema::record(&rw_field.name, sub_fields)
            }
            DataType::List(lt) => {
                let inner_field =
                    Self::map_field(&Field::with_name(lt.elem().clone(), &rw_field.name))?;
                TableFieldSchema {
                    mode: Some("REPEATED".to_owned()),
                    ..inner_field
                }
            }

            DataType::Bytea => TableFieldSchema::bytes(&rw_field.name),
            DataType::Jsonb => TableFieldSchema::json(&rw_field.name),
            DataType::Int256 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "INT256 is not supported for BigQuery sink."
                )));
            }
            DataType::Map(_) => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "MAP is not supported for BigQuery sink."
                )));
            }
            DataType::Vector(_) => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "VECTOR is not supported for BigQuery sink."
                )));
            }
        };
        Ok(tfs)
    }

    async fn create_table(
        &self,
        client: &Client,
        project_id: &str,
        dataset_id: &str,
        table_id: &str,
        fields: &Vec<Field>,
    ) -> Result<Table> {
        let dataset = client
            .dataset()
            .get(project_id, dataset_id)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let fields: Vec<_> = fields.iter().map(Self::map_field).collect::<Result<_>>()?;
        let table = Table::from_dataset(&dataset, table_id, TableSchema::new(fields));

        client
            .table()
            .create(table)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))
    }
}
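// Note (added commentary): when `auto_create` is enabled and the table is missing, `validate()`
// below creates it from the RisingWave schema via `map_field`. For example, a hypothetical
// schema `(v1 BIGINT, v2 DOUBLE PRECISION, v3 VARCHAR)` would be created in BigQuery as columns
// of type INT64, FLOAT64 and STRING respectively.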

impl Sink for BigQuerySink {
    type LogSinker = BigQueryLogSinker;

    const SINK_NAME: &'static str = BIGQUERY_SINK;

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let (writer, resp_stream) = BigQuerySinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        Ok(BigQueryLogSinker::new(
            writer,
            resp_stream,
            BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE,
        ))
    }

    async fn validate(&self) -> Result<()> {
        risingwave_common::license::Feature::BigQuerySink
            .check_available()
            .map_err(|e| anyhow::anyhow!(e))?;
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert bigquery sink (please define in `primary_key` field)"
            )));
        }
        let client = self
            .config
            .common
            .build_client(&self.config.aws_auth_props)
            .await?;
        let BigQueryCommon {
            project: project_id,
            dataset: dataset_id,
            table: table_id,
            ..
        } = &self.config.common;

        if self.config.common.auto_create {
            match client
                .table()
                .get(project_id, dataset_id, table_id, None)
                .await
            {
                Err(BQError::RequestError(_)) => {
                    // early return: no need to query schema to check column and type
                    return self
                        .create_table(
                            &client,
                            project_id,
                            dataset_id,
                            table_id,
                            &self.schema.fields,
                        )
                        .await
                        .map(|_| ());
                }
                Err(e) => return Err(SinkError::BigQuery(e.into())),
                _ => {}
            }
        }

        let mut rs = client
            .job()
            .query(
                &self.config.common.project,
                QueryRequest::new(format!(
                    "SELECT column_name, data_type FROM `{}.{}.INFORMATION_SCHEMA.COLUMNS` WHERE table_name = '{}'",
                    project_id, dataset_id, table_id,
                )),
            ).await.map_err(|e| SinkError::BigQuery(e.into()))?;

        let mut big_query_schema = HashMap::default();
        while rs.next_row() {
            big_query_schema.insert(
                rs.get_string_by_name("column_name")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find column_name"))
                    })?,
                rs.get_string_by_name("data_type")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find data_type"))
                    })?,
            );
        }

        self.check_column_name_and_type(big_query_schema)?;
        Ok(())
    }
}

pub struct BigQuerySinkWriter {
    pub config: BigQueryConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: StorageWriterClient,
    is_append_only: bool,
    row_encoder: ProtoEncoder,
    writer_pb_schema: ProtoSchema,
    #[expect(dead_code)]
    message_descriptor: MessageDescriptor,
    write_stream: String,
    proto_field: Option<FieldDescriptor>,
}

impl TryFrom<SinkParam> for BigQuerySink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = BigQueryConfig::from_btreemap(param.properties)?;
        BigQuerySink::new(
            config,
            schema,
            param.downstream_pk,
            param.sink_type.is_append_only(),
        )
    }
}

impl BigQuerySinkWriter {
    pub async fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let (client, resp_stream) = config
            .common
            .build_writer_client(&config.aws_auth_props)
            .await?;
        let mut descriptor_proto = build_protobuf_schema(
            schema
                .fields()
                .iter()
                .map(|f| (f.name.as_str(), &f.data_type)),
            config.common.table.clone(),
        )?;

        if !is_append_only {
            let field = FieldDescriptorProto {
                name: Some(CHANGE_TYPE.to_owned()),
                number: Some((schema.len() + 1) as i32),
                r#type: Some(field_descriptor_proto::Type::String.into()),
                ..Default::default()
            };
            descriptor_proto.field.push(field);
        }

        let descriptor_pool = build_protobuf_descriptor_pool(&descriptor_proto)?;
        let message_descriptor = descriptor_pool
            .get_message_by_name(&config.common.table)
            .ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Can't find message proto {}",
                    &config.common.table
                ))
            })?;
        let proto_field = if !is_append_only {
            let proto_field = message_descriptor
                .get_field_by_name(CHANGE_TYPE)
                .ok_or_else(|| {
                    SinkError::BigQuery(anyhow::anyhow!("Can't find {}", CHANGE_TYPE))
                })?;
            Some(proto_field)
        } else {
            None
        };
        let row_encoder = ProtoEncoder::new(
            schema.clone(),
            None,
            message_descriptor.clone(),
            ProtoHeader::None,
        )?;
        Ok((
            Self {
                write_stream: format!(
                    "projects/{}/datasets/{}/tables/{}/streams/_default",
                    config.common.project, config.common.dataset, config.common.table
                ),
                config,
                schema,
                pk_indices,
                client,
                is_append_only,
                row_encoder,
                message_descriptor,
                proto_field,
                writer_pb_schema: ProtoSchema {
                    proto_descriptor: Some(descriptor_proto),
                },
            },
            resp_stream,
        ))
    }

    fn append_only(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op != Op::Insert {
                continue;
            }
            serialized_rows.push(self.row_encoder.encode(row)?.ser_to()?)
        }
        Ok(serialized_rows)
    }

    fn upsert(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op == Op::UpdateDelete {
                continue;
            }
            let mut pb_row = self.row_encoder.encode(row)?;
            match op {
                Op::Insert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::Delete => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("DELETE".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::UpdateDelete => continue,
                Op::UpdateInsert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
            };

            serialized_rows.push(pb_row.ser_to()?)
        }
        Ok(serialized_rows)
    }

    fn write_chunk(&mut self, chunk: StreamChunk) -> Result<usize> {
        let serialized_rows = if self.is_append_only {
            self.append_only(chunk)?
        } else {
            self.upsert(chunk)?
        };
        if serialized_rows.is_empty() {
            return Ok(0);
        }
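        // Added commentary: split the serialized rows into batches whose total encoded size
        // stays within `MAX_ROW_SIZE`, issue one `AppendRowsRequest` per batch, and return the
        // number of requests sent. The caller records this count as the number of responses to
        // await before the corresponding offset can be truncated.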
        let mut result = Vec::new();
        let mut result_inner = Vec::new();
        let mut size_count = 0;
        for i in serialized_rows {
            size_count += i.len();
            if size_count > MAX_ROW_SIZE {
                result.push(result_inner);
                result_inner = Vec::new();
                size_count = i.len();
            }
            result_inner.push(i);
        }
        if !result_inner.is_empty() {
            result.push(result_inner);
        }
        let len = result.len();
        for serialized_rows in result {
            let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
                writer_schema: Some(self.writer_pb_schema.clone()),
                rows: Some(ProtoRows { serialized_rows }),
            });
            self.client.append_rows(rows, self.write_stream.clone())?;
        }
        Ok(len)
    }
}

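// Added commentary: adapt the gRPC `AppendRows` response stream into a `Stream<Item = Result<()>>`.
// Each yielded `()` corresponds to one acknowledged `AppendRowsRequest`; any row-level or
// stream-level error is surfaced as a `SinkError`, and the stream ending early is also an error.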
#[try_stream(ok = (), error = SinkError)]
pub async fn resp_to_stream(
    resp_stream: impl Future<
        Output = std::result::Result<
            Response<google_cloud_gax::grpc::Streaming<AppendRowsResponse>>,
            Status,
        >,
    >
    + 'static
    + Send,
) {
    let mut resp_stream = resp_stream
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?
        .into_inner();
    loop {
        match resp_stream
            .message()
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?
        {
            Some(append_rows_response) => {
                if !append_rows_response.row_errors.is_empty() {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        append_rows_response.row_errors
                    )));
                }
                if let Some(
                    google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(
                        status,
                    ),
                ) = append_rows_response.response
                {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        status
                    )));
                }
                yield ();
            }
            None => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "bigquery insert error: end of resp stream",
                )));
            }
        }
    }
}

struct StorageWriterClient {
    #[expect(dead_code)]
    environment: Environment,
    request_sender: mpsc::UnboundedSender<AppendRowsRequest>,
}
impl StorageWriterClient {
    pub async fn new(
        credentials: CredentialsFile,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
            Self::bigquery_grpc_auth_config(),
            Box::new(credentials),
        )
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?;
        let conn_options = ConnectionOptions {
            connect_timeout: CONNECT_TIMEOUT,
            timeout: CONNECTION_TIMEOUT,
        };
        let environment = Environment::GoogleCloud(Box::new(ts_grpc));
        let conn = ConnectionManager::new(DEFAULT_GRPC_CHANNEL_NUMS, &environment, &conn_options)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let mut client = conn.writer();

        let (tx, rx) = mpsc::unbounded_channel();
        let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);

        let resp = async move { client.append_rows(Request::new(stream)).await };
        let resp_stream = resp_to_stream(resp);

        Ok((
            StorageWriterClient {
                environment,
                request_sender: tx,
            },
            resp_stream,
        ))
    }

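    // Added commentary: enqueue one `AppendRowsRequest` onto the unbounded channel that backs
    // the long-lived gRPC append stream; the acknowledgement arrives asynchronously on the
    // response stream produced by `resp_to_stream` above.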
    pub fn append_rows(&mut self, row: AppendRowsRequestRows, write_stream: String) -> Result<()> {
        let append_req = AppendRowsRequest {
            write_stream: write_stream.clone(),
            offset: None,
            trace_id: Uuid::new_v4().hyphenated().to_string(),
            missing_value_interpretations: HashMap::default(),
            rows: Some(row),
            default_missing_value_interpretation: MissingValueInterpretation::DefaultValue as i32,
        };
        self.request_sender
            .send(append_req)
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        Ok(())
    }

    fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> {
        let mut auth_config = google_cloud_auth::project::Config::default();
        auth_config =
            auth_config.with_audience(google_cloud_bigquery::grpc::apiv1::conn_pool::AUDIENCE);
        auth_config =
            auth_config.with_scopes(&google_cloud_bigquery::grpc::apiv1::conn_pool::SCOPES);
        auth_config
    }
}

fn build_protobuf_descriptor_pool(desc: &DescriptorProto) -> Result<prost_reflect::DescriptorPool> {
    let file_descriptor = FileDescriptorProto {
        message_type: vec![desc.clone()],
        name: Some("bigquery".to_owned()),
        ..Default::default()
    };

    prost_reflect::DescriptorPool::from_file_descriptor_set(FileDescriptorSet {
        file: vec![file_descriptor],
    })
    .context("failed to build descriptor pool")
    .map_err(SinkError::BigQuery)
}

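// Added commentary: build a protobuf `DescriptorProto` mirroring the sink schema, since the
// BigQuery Storage Write API expects rows encoded as protobuf messages. For instance, a
// hypothetical schema `(v1 BIGINT, v2 VARCHAR)` for table `t1` maps to roughly
// `message t1 { int64 v1 = 1; string v2 = 2; }` (see the tests at the bottom of this file).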
fn build_protobuf_schema<'a>(
    fields: impl Iterator<Item = (&'a str, &'a DataType)>,
    name: String,
) -> Result<DescriptorProto> {
    let mut proto = DescriptorProto {
        name: Some(name),
        ..Default::default()
    };
    let mut struct_vec = vec![];
    let field_vec = fields
        .enumerate()
        .map(|(index, (name, data_type))| {
            let (field, des_proto) =
                build_protobuf_field(data_type, (index + 1) as i32, name.to_owned())?;
            if let Some(sv) = des_proto {
                struct_vec.push(sv);
            }
            Ok(field)
        })
        .collect::<Result<Vec<_>>>()?;
    proto.field = field_vec;
    proto.nested_type = struct_vec;
    Ok(proto)
}

fn build_protobuf_field(
    data_type: &DataType,
    index: i32,
    name: String,
) -> Result<(FieldDescriptorProto, Option<DescriptorProto>)> {
    let mut field = FieldDescriptorProto {
        name: Some(name.clone()),
        number: Some(index),
        ..Default::default()
    };
    match data_type {
        DataType::Boolean => field.r#type = Some(field_descriptor_proto::Type::Bool.into()),
        DataType::Int32 => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Int16 | DataType::Int64 => {
            field.r#type = Some(field_descriptor_proto::Type::Int64.into())
        }
        DataType::Float64 => field.r#type = Some(field_descriptor_proto::Type::Double.into()),
        DataType::Decimal => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Date => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Varchar => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Time => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamp => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamptz => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Interval => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Struct(s) => {
            field.r#type = Some(field_descriptor_proto::Type::Message.into());
            let name = format!("Struct{}", name);
            let sub_proto = build_protobuf_schema(s.iter(), name.clone())?;
            field.type_name = Some(name);
            return Ok((field, Some(sub_proto)));
        }
        DataType::List(l) => {
            let (mut field, proto) = build_protobuf_field(l.elem(), index, name.clone())?;
            field.label = Some(field_descriptor_proto::Label::Repeated.into());
            return Ok((field, proto));
        }
        DataType::Bytea => field.r#type = Some(field_descriptor_proto::Type::Bytes.into()),
        DataType::Jsonb => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Serial => field.r#type = Some(field_descriptor_proto::Type::Int64.into()),
        DataType::Float32 | DataType::Int256 => {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "FLOAT32 and INT256 are not supported for BigQuery sink."
            )));
        }
        DataType::Map(_) => {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "MAP is not supported for BigQuery sink."
            )));
        }
        DataType::Vector(_) => {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "VECTOR is not supported for BigQuery sink."
            )));
        }
    }
    Ok((field, None))
}

#[cfg(test)]
mod test {

    use std::assert_matches::assert_matches;

    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::{DataType, StructType};

    use crate::sink::big_query::{
        BigQuerySink, build_protobuf_descriptor_pool, build_protobuf_schema,
    };

    #[tokio::test]
    async fn test_type_check() {
        let big_query_type_string = "ARRAY<STRUCT<v1 ARRAY<INT64>, v2 STRUCT<v1 INT64, v2 INT64>>>";
        let rw_datatype = DataType::list(DataType::Struct(StructType::new(vec![
            ("v1".to_owned(), DataType::Int64.list()),
            (
                "v2".to_owned(),
                DataType::Struct(StructType::new(vec![
                    ("v1".to_owned(), DataType::Int64),
                    ("v2".to_owned(), DataType::Int64),
                ])),
            ),
        ])));
        assert_eq!(
            BigQuerySink::get_string_and_check_support_from_datatype(&rw_datatype).unwrap(),
            big_query_type_string
        );
    }

    #[tokio::test]
    async fn test_schema_check() {
        let schema = Schema {
            fields: vec![
                Field::with_name(DataType::Int64, "v1"),
                Field::with_name(DataType::Float64, "v2"),
                Field::with_name(
                    DataType::list(DataType::Struct(StructType::new(vec![
                        ("v1".to_owned(), DataType::Int64.list()),
                        (
                            "v3".to_owned(),
                            DataType::Struct(StructType::new(vec![
                                ("v1".to_owned(), DataType::Int64),
                                ("v2".to_owned(), DataType::Int64),
                            ])),
                        ),
                    ]))),
                    "v3",
                ),
            ],
        };
        let fields = schema
            .fields()
            .iter()
            .map(|f| (f.name.as_str(), &f.data_type));
        let desc = build_protobuf_schema(fields, "t1".to_owned()).unwrap();
        let pool = build_protobuf_descriptor_pool(&desc).unwrap();
        let t1_message = pool.get_message_by_name("t1").unwrap();
        assert_matches!(
            t1_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            t1_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Double
        );
        assert_matches!(
            t1_message.get_field_by_name("v3").unwrap().kind(),
            prost_reflect::Kind::Message(_)
        );

        let v3_message = pool.get_message_by_name("t1.Structv3").unwrap();
        assert_matches!(
            v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert!(v3_message.get_field_by_name("v1").unwrap().is_list());

        let v3_v3_message = pool.get_message_by_name("t1.Structv3.Structv3").unwrap();
        assert_matches!(
            v3_v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            v3_v3_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
    }
}