risingwave_connector/sink/big_query.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::pin::Pin;
use core::time::Duration;
use std::collections::{BTreeMap, HashMap, VecDeque};

use anyhow::{Context, anyhow};
use futures::future::pending;
use futures::prelude::Future;
use futures::{Stream, StreamExt};
use futures_async_stream::try_stream;
use gcp_bigquery_client::Client;
use gcp_bigquery_client::error::BQError;
use gcp_bigquery_client::model::query_request::QueryRequest;
use gcp_bigquery_client::model::table::Table;
use gcp_bigquery_client::model::table_field_schema::TableFieldSchema;
use gcp_bigquery_client::model::table_schema::TableSchema;
use google_cloud_bigquery::grpc::apiv1::conn_pool::ConnectionManager;
use google_cloud_gax::conn::{ConnectionOptions, Environment};
use google_cloud_gax::grpc::Request;
use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request::{
    MissingValueInterpretation, ProtoData, Rows as AppendRowsRequestRows,
};
use google_cloud_googleapis::cloud::bigquery::storage::v1::{
    AppendRowsRequest, AppendRowsResponse, ProtoRows, ProtoSchema,
};
use google_cloud_pubsub::client::google_cloud_auth;
use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
use phf::{Set, phf_set};
use prost_reflect::{FieldDescriptor, MessageDescriptor};
use prost_types::{
    DescriptorProto, FieldDescriptorProto, FileDescriptorProto, FileDescriptorSet,
    field_descriptor_proto,
};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use serde_derive::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use simd_json::prelude::ArrayTrait;
use tokio::sync::mpsc;
use tonic::{Response, Status, async_trait};
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;
use yup_oauth2::ServiceAccountKey;

use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo};
use super::log_store::{LogStoreReadItem, TruncateOffset};
use super::{
    LogSinker, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkLogReader,
};
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::connector_common::AwsAuthProps;
use crate::enforce_secret::EnforceSecret;
use crate::sink::{Result, Sink, SinkParam, SinkWriterParam};

pub const BIGQUERY_SINK: &str = "bigquery";
pub const CHANGE_TYPE: &str = "_CHANGE_TYPE";
const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4;
const CONNECT_TIMEOUT: Option<Duration> = Some(Duration::from_secs(30));
const CONNECTION_TIMEOUT: Option<Duration> = None;
const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
// BigQuery limits each AppendRows request to under 10MB, so we cap requests at 8MB.
const MAX_ROW_SIZE: usize = 8 * 1024 * 1024;

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct BigQueryCommon {
    #[serde(rename = "bigquery.local.path")]
    pub local_path: Option<String>,
    #[serde(rename = "bigquery.s3.path")]
    pub s3_path: Option<String>,
    #[serde(rename = "bigquery.project")]
    pub project: String,
    #[serde(rename = "bigquery.dataset")]
    pub dataset: String,
    #[serde(rename = "bigquery.table")]
    pub table: String,
    #[serde(default)] // default false
    #[serde_as(as = "DisplayFromStr")]
    pub auto_create: bool,
    #[serde(rename = "bigquery.credentials")]
    pub credentials: Option<String>,
}

impl EnforceSecret for BigQueryCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "bigquery.credentials",
    };
}

struct BigQueryFutureManager {
    // `offset_queue` records, for each pending `TruncateOffset`, how many responses must still
    // be drained from `resp_stream` before the offset can be truncated.
    // When the `TruncateOffset` is a barrier, the count is 0: nothing was sent, so there is
    // nothing to wait for on `resp_stream`.
    // When the `TruncateOffset` is a chunk:
    // 1. The chunk has no rows: nothing was sent, the count is 0 and we don't wait on `resp_stream`.
    // 2. The chunk fits within `MAX_ROW_SIZE`: we sent one request, the count is 1 and we wait
    //    for one response from `resp_stream`.
    // 3. The chunk exceeds `MAX_ROW_SIZE`: we split it into n requests, the count is n and we
    //    wait for n responses from `resp_stream`.
    // A minimal sketch of this accounting is in the test module at the bottom of this file.
    offset_queue: VecDeque<(TruncateOffset, usize)>,
    resp_stream: Pin<Box<dyn Stream<Item = Result<()>> + Send>>,
}
impl BigQueryFutureManager {
    pub fn new(
        max_future_num: usize,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
    ) -> Self {
        let offset_queue = VecDeque::with_capacity(max_future_num);
        Self {
            offset_queue,
            resp_stream: Box::pin(resp_stream),
        }
    }

    pub fn add_offset(&mut self, offset: TruncateOffset, resp_num: usize) {
        self.offset_queue.push_back((offset, resp_num));
    }

    pub async fn next_offset(&mut self) -> Result<TruncateOffset> {
        if let Some((_offset, remaining_resp_num)) = self.offset_queue.front_mut() {
            if *remaining_resp_num == 0 {
                return Ok(self.offset_queue.pop_front().unwrap().0);
            }
            while *remaining_resp_num > 0 {
                self.resp_stream
                    .next()
                    .await
                    .ok_or_else(|| SinkError::BigQuery(anyhow::anyhow!("end of stream")))??;
                *remaining_resp_num -= 1;
            }
            Ok(self.offset_queue.pop_front().unwrap().0)
        } else {
            pending().await
        }
    }
}
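
/// Drives the sink: it concurrently waits for completed offsets from `BigQueryFutureManager`,
/// truncating the log as they resolve, and pulls new items from the log reader, pausing reads
/// once more than `future_num` offsets are still pending.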
pub struct BigQueryLogSinker {
    writer: BigQuerySinkWriter,
    bigquery_future_manager: BigQueryFutureManager,
    future_num: usize,
}
impl BigQueryLogSinker {
    pub fn new(
        writer: BigQuerySinkWriter,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
        future_num: usize,
    ) -> Self {
        Self {
            writer,
            bigquery_future_manager: BigQueryFutureManager::new(future_num, resp_stream),
            future_num,
        }
    }
}

#[async_trait]
impl LogSinker for BigQueryLogSinker {
    async fn consume_log_and_sink(mut self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        loop {
            tokio::select!(
                offset = self.bigquery_future_manager.next_offset() => {
                    log_reader.truncate(offset?)?;
                }
                item_result = log_reader.next_item(), if self.bigquery_future_manager.offset_queue.len() <= self.future_num => {
                    let (epoch, item) = item_result?;
                    match item {
                        LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
                            let resp_num = self.writer.write_chunk(chunk)?;
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Chunk { epoch, chunk_id }, resp_num);
                        }
                        LogStoreReadItem::Barrier { .. } => {
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Barrier { epoch }, 0);
                        }
                    }
                }
            )
        }
    }
}

impl BigQueryCommon {
    async fn build_client(&self, aws_auth_props: &AwsAuthProps) -> Result<Client> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let service_account = serde_json::from_str::<ServiceAccountKey>(&auth_json)
            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
        let client: Client = Client::from_service_account_key(service_account, false)
            .await
            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
        Ok(client)
    }

    async fn build_writer_client(
        &self,
        aws_auth_props: &AwsAuthProps,
    ) -> Result<(StorageWriterClient, impl Stream<Item = Result<()>> + use<>)> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let credentials_file = CredentialsFile::new_from_str(&auth_json)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        StorageWriterClient::new(credentials_file).await
    }

    async fn get_auth_json_from_path(&self, aws_auth_props: &AwsAuthProps) -> Result<String> {
        if let Some(credentials) = &self.credentials {
            Ok(credentials.clone())
        } else if let Some(local_path) = &self.local_path {
            std::fs::read_to_string(local_path)
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))
        } else if let Some(s3_path) = &self.s3_path {
            let url =
                Url::parse(s3_path).map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            let auth_vec = load_file_descriptor_from_s3(&url, aws_auth_props)
                .await
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            Ok(String::from_utf8(auth_vec).map_err(|e| SinkError::BigQuery(e.into()))?)
        } else {
            Err(SinkError::BigQuery(anyhow::anyhow!(
                "Set at least one of `bigquery.credentials`, `bigquery.local.path`, or `bigquery.s3.path`."
            )))
        }
    }
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct BigQueryConfig {
    #[serde(flatten)]
    pub common: BigQueryCommon,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,
    pub r#type: String, // accept "append-only" or "upsert"
}

impl EnforceSecret for BigQueryConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        BigQueryCommon::enforce_one(prop)?;
        AwsAuthProps::enforce_one(prop)?;
        Ok(())
    }
}

impl BigQueryConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<BigQueryConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
}

#[derive(Debug)]
pub struct BigQuerySink {
    pub config: BigQueryConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
}

impl EnforceSecret for BigQuerySink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            BigQueryConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl BigQuerySink {
    pub fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        Ok(Self {
            config,
            schema,
            pk_indices,
            is_append_only,
        })
    }
}

impl BigQuerySink {
    fn check_column_name_and_type(
        &self,
        big_query_columns_desc: HashMap<String, String>,
    ) -> Result<()> {
        let rw_fields_name = self.schema.fields();
        if big_query_columns_desc.is_empty() {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Cannot find the table in BigQuery"
            )));
        }
        if rw_fields_name.len().ne(&big_query_columns_desc.len()) {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "The number of columns in RisingWave ({}) must match the number of columns in BigQuery ({})",
                rw_fields_name.len(),
                big_query_columns_desc.len()
            )));
        }

        for i in rw_fields_name {
            let value = big_query_columns_desc.get(&i.name).ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Column `{:?}` on RisingWave side is not found on BigQuery side.",
                    i.name
                ))
            })?;
            let data_type_string = Self::get_string_and_check_support_from_datatype(&i.data_type)?;
            if data_type_string.ne(value) {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Data type mismatch for column `{:?}`. BigQuery side: `{:?}`, RisingWave side: `{:?}`.",
                    i.name,
                    value,
                    data_type_string
                )));
            };
        }
        Ok(())
    }

    fn get_string_and_check_support_from_datatype(rw_data_type: &DataType) -> Result<String> {
        match rw_data_type {
            DataType::Boolean => Ok("BOOL".to_owned()),
            DataType::Int16 => Ok("INT64".to_owned()),
            DataType::Int32 => Ok("INT64".to_owned()),
            DataType::Int64 => Ok("INT64".to_owned()),
            DataType::Float32 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "REAL is not supported for BigQuery sink. Please convert to FLOAT64 or other supported types."
            ))),
            DataType::Float64 => Ok("FLOAT64".to_owned()),
            DataType::Decimal => Ok("NUMERIC".to_owned()),
            DataType::Date => Ok("DATE".to_owned()),
            DataType::Varchar => Ok("STRING".to_owned()),
            DataType::Time => Ok("TIME".to_owned()),
            DataType::Timestamp => Ok("DATETIME".to_owned()),
            DataType::Timestamptz => Ok("TIMESTAMP".to_owned()),
            DataType::Interval => Ok("INTERVAL".to_owned()),
            DataType::Struct(structs) => {
                let mut elements_vec = vec![];
                for (name, datatype) in structs.iter() {
                    let element_string =
                        Self::get_string_and_check_support_from_datatype(datatype)?;
                    elements_vec.push(format!("{} {}", name, element_string));
                }
                Ok(format!("STRUCT<{}>", elements_vec.join(", ")))
            }
            DataType::List(l) => {
                let element_string = Self::get_string_and_check_support_from_datatype(l.as_ref())?;
                Ok(format!("ARRAY<{}>", element_string))
            }
            DataType::Bytea => Ok("BYTES".to_owned()),
            DataType::Jsonb => Ok("JSON".to_owned()),
            DataType::Serial => Ok("INT64".to_owned()),
            DataType::Int256 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "INT256 is not supported for BigQuery sink."
            ))),
            DataType::Map(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
                "MAP is not supported for BigQuery sink."
            ))),
            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
        }
    }

    fn map_field(rw_field: &Field) -> Result<TableFieldSchema> {
        let tfs = match &rw_field.data_type {
            DataType::Boolean => TableFieldSchema::bool(&rw_field.name),
            DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Serial => {
                TableFieldSchema::integer(&rw_field.name)
            }
            DataType::Float32 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "REAL is not supported for BigQuery sink. Please convert to FLOAT64 or other supported types."
                )));
            }
            DataType::Float64 => TableFieldSchema::float(&rw_field.name),
            DataType::Decimal => TableFieldSchema::numeric(&rw_field.name),
            DataType::Date => TableFieldSchema::date(&rw_field.name),
            DataType::Varchar => TableFieldSchema::string(&rw_field.name),
            DataType::Time => TableFieldSchema::time(&rw_field.name),
            DataType::Timestamp => TableFieldSchema::date_time(&rw_field.name),
            DataType::Timestamptz => TableFieldSchema::timestamp(&rw_field.name),
            DataType::Interval => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "INTERVAL is not supported for BigQuery sink. Please convert to VARCHAR or other supported types."
                )));
            }
            DataType::Struct(st) => {
                let mut sub_fields = Vec::with_capacity(st.len());
                for (name, dt) in st.iter() {
                    let rw_field = Field::with_name(dt.clone(), name);
                    let field = Self::map_field(&rw_field)?;
                    sub_fields.push(field);
                }
                TableFieldSchema::record(&rw_field.name, sub_fields)
            }
            DataType::List(dt) => {
                let inner_field = Self::map_field(&Field::with_name(*dt.clone(), &rw_field.name))?;
                TableFieldSchema {
                    mode: Some("REPEATED".to_owned()),
                    ..inner_field
                }
            }
            DataType::Bytea => TableFieldSchema::bytes(&rw_field.name),
            DataType::Jsonb => TableFieldSchema::json(&rw_field.name),
            DataType::Int256 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "INT256 is not supported for BigQuery sink."
                )));
            }
            DataType::Map(_) => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "MAP is not supported for BigQuery sink."
                )));
            }
            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
        };
        Ok(tfs)
    }

    async fn create_table(
        &self,
        client: &Client,
        project_id: &str,
        dataset_id: &str,
        table_id: &str,
        fields: &Vec<Field>,
    ) -> Result<Table> {
        let dataset = client
            .dataset()
            .get(project_id, dataset_id)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let fields: Vec<_> = fields.iter().map(Self::map_field).collect::<Result<_>>()?;
        let table = Table::from_dataset(&dataset, table_id, TableSchema::new(fields));

        client
            .table()
            .create(table)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))
    }
}

impl Sink for BigQuerySink {
    type LogSinker = BigQueryLogSinker;

    const SINK_NAME: &'static str = BIGQUERY_SINK;

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let (writer, resp_stream) = BigQuerySinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        Ok(BigQueryLogSinker::new(
            writer,
            resp_stream,
            BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE,
        ))
    }

    async fn validate(&self) -> Result<()> {
        risingwave_common::license::Feature::BigQuerySink
            .check_available()
            .map_err(|e| anyhow::anyhow!(e))?;
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert bigquery sink (please define in `primary_key` field)"
            )));
        }
        let client = self
            .config
            .common
            .build_client(&self.config.aws_auth_props)
            .await?;
        let BigQueryCommon {
            project: project_id,
            dataset: dataset_id,
            table: table_id,
            ..
        } = &self.config.common;

        if self.config.common.auto_create {
            match client
                .table()
                .get(project_id, dataset_id, table_id, None)
                .await
            {
                Err(BQError::RequestError(_)) => {
                    // early return: no need to query schema to check column and type
                    return self
                        .create_table(
                            &client,
                            project_id,
                            dataset_id,
                            table_id,
                            &self.schema.fields,
                        )
                        .await
                        .map(|_| ());
                }
                Err(e) => return Err(SinkError::BigQuery(e.into())),
                _ => {}
            }
        }

        let mut rs = client
            .job()
            .query(
                &self.config.common.project,
                QueryRequest::new(format!(
                    "SELECT column_name, data_type FROM `{}.{}.INFORMATION_SCHEMA.COLUMNS` WHERE table_name = '{}'",
                    project_id, dataset_id, table_id,
                )),
            ).await.map_err(|e| SinkError::BigQuery(e.into()))?;

        let mut big_query_schema = HashMap::default();
        while rs.next_row() {
            big_query_schema.insert(
                rs.get_string_by_name("column_name")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find column_name"))
                    })?,
                rs.get_string_by_name("data_type")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find data_type"))
                    })?,
            );
        }

        self.check_column_name_and_type(big_query_schema)?;
        Ok(())
    }
}

pub struct BigQuerySinkWriter {
    pub config: BigQueryConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: StorageWriterClient,
    is_append_only: bool,
    row_encoder: ProtoEncoder,
    writer_pb_schema: ProtoSchema,
    #[expect(dead_code)]
    message_descriptor: MessageDescriptor,
    write_stream: String,
    proto_field: Option<FieldDescriptor>,
}

impl TryFrom<SinkParam> for BigQuerySink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = BigQueryConfig::from_btreemap(param.properties)?;
        BigQuerySink::new(
            config,
            schema,
            param.downstream_pk,
            param.sink_type.is_append_only(),
        )
    }
}

impl BigQuerySinkWriter {
    pub async fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let (client, resp_stream) = config
            .common
            .build_writer_client(&config.aws_auth_props)
            .await?;
        let mut descriptor_proto = build_protobuf_schema(
            schema
                .fields()
                .iter()
                .map(|f| (f.name.as_str(), &f.data_type)),
            config.common.table.clone(),
        )?;

        if !is_append_only {
            let field = FieldDescriptorProto {
                name: Some(CHANGE_TYPE.to_owned()),
                number: Some((schema.len() + 1) as i32),
                r#type: Some(field_descriptor_proto::Type::String.into()),
                ..Default::default()
            };
            descriptor_proto.field.push(field);
        }

        let descriptor_pool = build_protobuf_descriptor_pool(&descriptor_proto)?;
        let message_descriptor = descriptor_pool
            .get_message_by_name(&config.common.table)
            .ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Can't find message proto {}",
                    &config.common.table
                ))
            })?;
        let proto_field = if !is_append_only {
            let proto_field = message_descriptor
                .get_field_by_name(CHANGE_TYPE)
                .ok_or_else(|| {
                    SinkError::BigQuery(anyhow::anyhow!("Can't find {}", CHANGE_TYPE))
                })?;
            Some(proto_field)
        } else {
            None
        };
        let row_encoder = ProtoEncoder::new(
            schema.clone(),
            None,
            message_descriptor.clone(),
            ProtoHeader::None,
        )?;
        Ok((
            Self {
                write_stream: format!(
                    "projects/{}/datasets/{}/tables/{}/streams/_default",
                    config.common.project, config.common.dataset, config.common.table
                ),
                config,
                schema,
                pk_indices,
                client,
                is_append_only,
                row_encoder,
                message_descriptor,
                proto_field,
                writer_pb_schema: ProtoSchema {
                    proto_descriptor: Some(descriptor_proto),
                },
            },
            resp_stream,
        ))
    }

    fn append_only(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op != Op::Insert {
                continue;
            }
            serialized_rows.push(self.row_encoder.encode(row)?.ser_to()?)
        }
        Ok(serialized_rows)
    }

    fn upsert(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op == Op::UpdateDelete {
                continue;
            }
            let mut pb_row = self.row_encoder.encode(row)?;
            match op {
                Op::Insert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::Delete => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("DELETE".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::UpdateDelete => continue,
                Op::UpdateInsert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
            };

            serialized_rows.push(pb_row.ser_to()?)
        }
        Ok(serialized_rows)
    }

    fn write_chunk(&mut self, chunk: StreamChunk) -> Result<usize> {
        let serialized_rows = if self.is_append_only {
            self.append_only(chunk)?
        } else {
            self.upsert(chunk)?
        };
        if serialized_rows.is_empty() {
            return Ok(0);
        }
        // Split the serialized rows into batches so that each `AppendRowsRequest` stays
        // under `MAX_ROW_SIZE`.
        let mut result = Vec::new();
        let mut result_inner = Vec::new();
        let mut size_count = 0;
        for i in serialized_rows {
            size_count += i.len();
            if size_count > MAX_ROW_SIZE {
                result.push(result_inner);
                result_inner = Vec::new();
                size_count = i.len();
            }
            result_inner.push(i);
        }
        if !result_inner.is_empty() {
            result.push(result_inner);
        }
        // Send one request per batch and report how many responses the caller should
        // expect on `resp_stream` for this chunk.
        let len = result.len();
        for serialized_rows in result {
            let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
                writer_schema: Some(self.writer_pb_schema.clone()),
                rows: Some(ProtoRows { serialized_rows }),
            });
            self.client.append_rows(rows, self.write_stream.clone())?;
        }
        Ok(len)
    }
}

#[try_stream(ok = (), error = SinkError)]
pub async fn resp_to_stream(
    resp_stream: impl Future<
        Output = std::result::Result<
            Response<google_cloud_gax::grpc::Streaming<AppendRowsResponse>>,
            Status,
        >,
    >
    + 'static
    + Send,
) {
    let mut resp_stream = resp_stream
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?
        .into_inner();
    loop {
        match resp_stream
            .message()
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?
        {
            Some(append_rows_response) => {
                if !append_rows_response.row_errors.is_empty() {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        append_rows_response.row_errors
                    )));
                }
                if let Some(google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(status)) =
                    append_rows_response.response
                {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        status
                    )));
                }
                yield ();
            }
            None => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "bigquery insert error: end of resp stream",
                )));
            }
        }
    }
}

struct StorageWriterClient {
    #[expect(dead_code)]
    environment: Environment,
    request_sender: mpsc::UnboundedSender<AppendRowsRequest>,
}
impl StorageWriterClient {
    pub async fn new(
        credentials: CredentialsFile,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
            Self::bigquery_grpc_auth_config(),
            Box::new(credentials),
        )
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?;
        let conn_options = ConnectionOptions {
            connect_timeout: CONNECT_TIMEOUT,
            timeout: CONNECTION_TIMEOUT,
        };
        let environment = Environment::GoogleCloud(Box::new(ts_grpc));
        let conn = ConnectionManager::new(DEFAULT_GRPC_CHANNEL_NUMS, &environment, &conn_options)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let mut client = conn.writer();

        let (tx, rx) = mpsc::unbounded_channel();
        let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);

        let resp = async move { client.append_rows(Request::new(stream)).await };
        let resp_stream = resp_to_stream(resp);

        Ok((
            StorageWriterClient {
                environment,
                request_sender: tx,
            },
            resp_stream,
        ))
    }

    pub fn append_rows(&mut self, row: AppendRowsRequestRows, write_stream: String) -> Result<()> {
        let append_req = AppendRowsRequest {
            write_stream: write_stream.clone(),
            offset: None,
            trace_id: Uuid::new_v4().hyphenated().to_string(),
            missing_value_interpretations: HashMap::default(),
            rows: Some(row),
            default_missing_value_interpretation: MissingValueInterpretation::DefaultValue as i32,
        };
        self.request_sender
            .send(append_req)
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        Ok(())
    }

    fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> {
        let mut auth_config = google_cloud_auth::project::Config::default();
        auth_config =
            auth_config.with_audience(google_cloud_bigquery::grpc::apiv1::conn_pool::AUDIENCE);
        auth_config =
            auth_config.with_scopes(&google_cloud_bigquery::grpc::apiv1::conn_pool::SCOPES);
        auth_config
    }
}

fn build_protobuf_descriptor_pool(desc: &DescriptorProto) -> Result<prost_reflect::DescriptorPool> {
    let file_descriptor = FileDescriptorProto {
        message_type: vec![desc.clone()],
        name: Some("bigquery".to_owned()),
        ..Default::default()
    };

    prost_reflect::DescriptorPool::from_file_descriptor_set(FileDescriptorSet {
        file: vec![file_descriptor],
    })
    .context("failed to build descriptor pool")
    .map_err(SinkError::BigQuery)
}

fn build_protobuf_schema<'a>(
    fields: impl Iterator<Item = (&'a str, &'a DataType)>,
    name: String,
) -> Result<DescriptorProto> {
    let mut proto = DescriptorProto {
        name: Some(name),
        ..Default::default()
    };
    let mut struct_vec = vec![];
    let field_vec = fields
        .enumerate()
        .map(|(index, (name, data_type))| {
            let (field, des_proto) =
                build_protobuf_field(data_type, (index + 1) as i32, name.to_owned())?;
            if let Some(sv) = des_proto {
                struct_vec.push(sv);
            }
            Ok(field)
        })
        .collect::<Result<Vec<_>>>()?;
    proto.field = field_vec;
    proto.nested_type = struct_vec;
    Ok(proto)
}

fn build_protobuf_field(
    data_type: &DataType,
    index: i32,
    name: String,
) -> Result<(FieldDescriptorProto, Option<DescriptorProto>)> {
    let mut field = FieldDescriptorProto {
        name: Some(name.clone()),
        number: Some(index),
        ..Default::default()
    };
    match data_type {
        DataType::Boolean => field.r#type = Some(field_descriptor_proto::Type::Bool.into()),
        DataType::Int32 => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Int16 | DataType::Int64 => {
            field.r#type = Some(field_descriptor_proto::Type::Int64.into())
        }
        DataType::Float64 => field.r#type = Some(field_descriptor_proto::Type::Double.into()),
        DataType::Decimal => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Date => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Varchar => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Time => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamp => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamptz => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Interval => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Struct(s) => {
            field.r#type = Some(field_descriptor_proto::Type::Message.into());
            let name = format!("Struct{}", name);
            let sub_proto = build_protobuf_schema(s.iter(), name.clone())?;
            field.type_name = Some(name);
            return Ok((field, Some(sub_proto)));
        }
        DataType::List(l) => {
            let (mut field, proto) = build_protobuf_field(l.as_ref(), index, name.clone())?;
            field.label = Some(field_descriptor_proto::Label::Repeated.into());
            return Ok((field, proto));
        }
        DataType::Bytea => field.r#type = Some(field_descriptor_proto::Type::Bytes.into()),
        DataType::Jsonb => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Serial => field.r#type = Some(field_descriptor_proto::Type::Int64.into()),
        DataType::Float32 | DataType::Int256 => {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "REAL and INT256 are not supported for BigQuery sink."
            )));
        }
        DataType::Map(_) => todo!(),
        DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
    }
    Ok((field, None))
}

#[cfg(test)]
mod test {

    use std::assert_matches::assert_matches;

    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::{DataType, StructType};

    use crate::sink::big_query::{
        BigQuerySink, build_protobuf_descriptor_pool, build_protobuf_schema,
    };

    #[tokio::test]
    async fn test_type_check() {
        let big_query_type_string = "ARRAY<STRUCT<v1 ARRAY<INT64>, v2 STRUCT<v1 INT64, v2 INT64>>>";
        let rw_datatype = DataType::List(Box::new(DataType::Struct(StructType::new(vec![
            ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
            (
                "v2".to_owned(),
                DataType::Struct(StructType::new(vec![
                    ("v1".to_owned(), DataType::Int64),
                    ("v2".to_owned(), DataType::Int64),
                ])),
            ),
        ]))));
        assert_eq!(
            BigQuerySink::get_string_and_check_support_from_datatype(&rw_datatype).unwrap(),
            big_query_type_string
        );
    }
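
    // A minimal sketch (with stand-in epochs, chunk ids, and response counts) of the response
    // accounting documented on `BigQueryFutureManager`: `next_offset` yields an offset only
    // after its recorded number of responses has been drained from `resp_stream`.
    #[tokio::test]
    async fn test_future_manager_offset_accounting() {
        use futures::stream;

        use crate::sink::big_query::BigQueryFutureManager;
        use crate::sink::log_store::TruncateOffset;

        // One chunk offset that waits for two responses, then a barrier offset that waits
        // for none.
        let responses: Vec<crate::sink::Result<()>> = vec![Ok(()), Ok(())];
        let mut manager = BigQueryFutureManager::new(16, stream::iter(responses));
        manager.add_offset(TruncateOffset::Chunk { epoch: 1, chunk_id: 0 }, 2);
        manager.add_offset(TruncateOffset::Barrier { epoch: 1 }, 0);

        assert!(matches!(
            manager.next_offset().await.unwrap(),
            TruncateOffset::Chunk { epoch: 1, chunk_id: 0 }
        ));
        assert!(matches!(
            manager.next_offset().await.unwrap(),
            TruncateOffset::Barrier { epoch: 1 }
        ));
    }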

    #[tokio::test]
    async fn test_schema_check() {
        let schema = Schema {
            fields: vec![
                Field::with_name(DataType::Int64, "v1"),
                Field::with_name(DataType::Float64, "v2"),
                Field::with_name(
                    DataType::List(Box::new(DataType::Struct(StructType::new(vec![
                        ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
                        (
                            "v3".to_owned(),
                            DataType::Struct(StructType::new(vec![
                                ("v1".to_owned(), DataType::Int64),
                                ("v2".to_owned(), DataType::Int64),
                            ])),
                        ),
                    ])))),
                    "v3",
                ),
            ],
        };
        let fields = schema
            .fields()
            .iter()
            .map(|f| (f.name.as_str(), &f.data_type));
        let desc = build_protobuf_schema(fields, "t1".to_owned()).unwrap();
        let pool = build_protobuf_descriptor_pool(&desc).unwrap();
        let t1_message = pool.get_message_by_name("t1").unwrap();
        assert_matches!(
            t1_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            t1_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Double
        );
        assert_matches!(
            t1_message.get_field_by_name("v3").unwrap().kind(),
            prost_reflect::Kind::Message(_)
        );

        let v3_message = pool.get_message_by_name("t1.Structv3").unwrap();
        assert_matches!(
            v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert!(v3_message.get_field_by_name("v1").unwrap().is_list());

        let v3_v3_message = pool.get_message_by_name("t1.Structv3.Structv3").unwrap();
        assert_matches!(
            v3_v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            v3_v3_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
    }
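
    // A minimal sketch of how the upsert path extends the generated protobuf descriptor with
    // the `_CHANGE_TYPE` column, mirroring what `BigQuerySinkWriter::new` does when
    // `is_append_only` is false. The single-column schema and the message name "t1" are
    // stand-ins for illustration only.
    #[tokio::test]
    async fn test_change_type_field_for_upsert() {
        use prost_types::{FieldDescriptorProto, field_descriptor_proto};

        use crate::sink::big_query::CHANGE_TYPE;

        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int64, "v1")],
        };
        let fields = schema
            .fields()
            .iter()
            .map(|f| (f.name.as_str(), &f.data_type));
        let mut desc = build_protobuf_schema(fields, "t1".to_owned()).unwrap();
        // Append the `_CHANGE_TYPE` string field after the user columns, as the writer does.
        desc.field.push(FieldDescriptorProto {
            name: Some(CHANGE_TYPE.to_owned()),
            number: Some((schema.len() + 1) as i32),
            r#type: Some(field_descriptor_proto::Type::String.into()),
            ..Default::default()
        });
        let pool = build_protobuf_descriptor_pool(&desc).unwrap();
        let message = pool.get_message_by_name("t1").unwrap();
        assert_matches!(
            message.get_field_by_name(CHANGE_TYPE).unwrap().kind(),
            prost_reflect::Kind::String
        );
    }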
}