risingwave_connector/sink/big_query.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::pin::Pin;
16use core::time::Duration;
17use std::collections::{BTreeMap, HashMap, VecDeque};
18
19use anyhow::{Context, anyhow};
20use base64::Engine;
21use base64::prelude::BASE64_STANDARD;
22use futures::future::pending;
23use futures::prelude::Future;
24use futures::{Stream, StreamExt};
25use futures_async_stream::try_stream;
26use gcp_bigquery_client::Client;
27use gcp_bigquery_client::error::BQError;
28use gcp_bigquery_client::model::query_request::QueryRequest;
29use gcp_bigquery_client::model::table::Table;
30use gcp_bigquery_client::model::table_field_schema::TableFieldSchema;
31use gcp_bigquery_client::model::table_schema::TableSchema;
32use google_cloud_bigquery::grpc::apiv1::conn_pool::ConnectionManager;
33use google_cloud_gax::conn::{ConnectionOptions, Environment};
34use google_cloud_gax::grpc::Request;
35use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request::{
36    MissingValueInterpretation, ProtoData, Rows as AppendRowsRequestRows,
37};
38use google_cloud_googleapis::cloud::bigquery::storage::v1::{
39    AppendRowsRequest, AppendRowsResponse, ProtoRows, ProtoSchema,
40};
41use google_cloud_pubsub::client::google_cloud_auth;
42use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
43use phf::{Set, phf_set};
44use prost_reflect::{FieldDescriptor, MessageDescriptor};
45use prost_types::{
46    DescriptorProto, FieldDescriptorProto, FileDescriptorProto, FileDescriptorSet,
47    field_descriptor_proto,
48};
49use risingwave_common::array::{Op, StreamChunk};
50use risingwave_common::catalog::{Field, Schema};
51use risingwave_common::types::DataType;
52use serde::Deserialize;
53use serde_with::{DisplayFromStr, serde_as};
54use simd_json::prelude::ArrayTrait;
55use tokio::sync::mpsc;
56use tonic::{Response, Status, async_trait};
57use url::Url;
58use uuid::Uuid;
59use with_options::WithOptions;
60use yup_oauth2::ServiceAccountKey;
61
62use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo};
63use super::log_store::{LogStoreReadItem, TruncateOffset};
64use super::{
65    LogSinker, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkLogReader,
66};
67use crate::aws_utils::load_file_descriptor_from_s3;
68use crate::connector_common::AwsAuthProps;
69use crate::enforce_secret::EnforceSecret;
70use crate::sink::{Result, Sink, SinkParam, SinkWriterParam};
71
72pub const BIGQUERY_SINK: &str = "bigquery";
73pub const CHANGE_TYPE: &str = "_CHANGE_TYPE";
74const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4;
75const CONNECT_TIMEOUT: Option<Duration> = Some(Duration::from_secs(30));
76const CONNECTION_TIMEOUT: Option<Duration> = None;
77const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
// AppendRows requests must stay below 10 MB, so we cap each batch at 8 MB.
79const MAX_ROW_SIZE: usize = 8 * 1024 * 1024;
80
81#[serde_as]
82#[derive(Deserialize, Debug, Clone, WithOptions)]
83pub struct BigQueryCommon {
84    #[serde(rename = "bigquery.local.path")]
85    pub local_path: Option<String>,
86    #[serde(rename = "bigquery.s3.path")]
87    pub s3_path: Option<String>,
88    #[serde(rename = "bigquery.project")]
89    pub project: String,
90    #[serde(rename = "bigquery.dataset")]
91    pub dataset: String,
92    #[serde(rename = "bigquery.table")]
93    pub table: String,
94    #[serde(default)] // default false
95    #[serde_as(as = "DisplayFromStr")]
96    pub auto_create: bool,
97    #[serde(rename = "bigquery.credentials")]
98    pub credentials: Option<String>,
99}
100
101impl EnforceSecret for BigQueryCommon {
102    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
103        "bigquery.credentials",
104    };
105}
106
107struct BigQueryFutureManager {
    // `offset_queue` pairs each pending `TruncateOffset` with the number of responses
    // still expected from `resp_stream` before that offset may be truncated.
    // When the `TruncateOffset` is a barrier, the count is 0 and there is nothing to wait for.
    // When the `TruncateOffset` is a chunk:
    // 1. The chunk has no rows: nothing was sent, the count is 0 and there is nothing to wait for.
    // 2. The chunk fits in `MAX_ROW_SIZE`: it was sent as a single request, the count is 1 and we wait for one response from `resp_stream`.
    // 3. The chunk exceeds `MAX_ROW_SIZE`: it was split into n requests, the count is n and we wait for n responses from `resp_stream`.
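    // For example (illustrative only): a chunk whose rows are split into three AppendRows
    // requests is recorded as `(TruncateOffset::Chunk { .. }, 3)`, so `next_offset` awaits
    // three acks before handing the offset back for truncation, while a barrier is recorded
    // as `(TruncateOffset::Barrier { .. }, 0)` and is returned immediately.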
114    offset_queue: VecDeque<(TruncateOffset, usize)>,
115    resp_stream: Pin<Box<dyn Stream<Item = Result<()>> + Send>>,
116}
117impl BigQueryFutureManager {
118    pub fn new(
119        max_future_num: usize,
120        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
121    ) -> Self {
122        let offset_queue = VecDeque::with_capacity(max_future_num);
123        Self {
124            offset_queue,
125            resp_stream: Box::pin(resp_stream),
126        }
127    }
128
129    pub fn add_offset(&mut self, offset: TruncateOffset, resp_num: usize) {
130        self.offset_queue.push_back((offset, resp_num));
131    }
132
133    pub async fn next_offset(&mut self) -> Result<TruncateOffset> {
134        if let Some((_offset, remaining_resp_num)) = self.offset_queue.front_mut() {
135            if *remaining_resp_num == 0 {
136                return Ok(self.offset_queue.pop_front().unwrap().0);
137            }
138            while *remaining_resp_num > 0 {
139                self.resp_stream
140                    .next()
141                    .await
142                    .ok_or_else(|| SinkError::BigQuery(anyhow::anyhow!("end of stream")))??;
143                *remaining_resp_num -= 1;
144            }
145            Ok(self.offset_queue.pop_front().unwrap().0)
146        } else {
147            pending().await
148        }
149    }
150}
151pub struct BigQueryLogSinker {
152    writer: BigQuerySinkWriter,
153    bigquery_future_manager: BigQueryFutureManager,
154    future_num: usize,
155}
156impl BigQueryLogSinker {
157    pub fn new(
158        writer: BigQuerySinkWriter,
159        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
160        future_num: usize,
161    ) -> Self {
162        Self {
163            writer,
164            bigquery_future_manager: BigQueryFutureManager::new(future_num, resp_stream),
165            future_num,
166        }
167    }
168}
169
170#[async_trait]
171impl LogSinker for BigQueryLogSinker {
172    async fn consume_log_and_sink(mut self, mut log_reader: impl SinkLogReader) -> Result<!> {
173        log_reader.start_from(None).await?;
174        loop {
175            tokio::select!(
176                offset = self.bigquery_future_manager.next_offset() => {
177                        log_reader.truncate(offset?)?;
178                }
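                // Only pull the next item while the number of in-flight offsets stays within
                // `future_num`; otherwise we pause reading and drain `resp_stream` first.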
179                item_result = log_reader.next_item(), if self.bigquery_future_manager.offset_queue.len() <= self.future_num => {
180                    let (epoch, item) = item_result?;
181                    match item {
182                        LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
183                            let resp_num = self.writer.write_chunk(chunk)?;
184                            self.bigquery_future_manager
185                                .add_offset(TruncateOffset::Chunk { epoch, chunk_id },resp_num);
186                        }
187                        LogStoreReadItem::Barrier { .. } => {
188                            self.bigquery_future_manager
189                                .add_offset(TruncateOffset::Barrier { epoch },0);
190                        }
191                    }
192                }
193            )
194        }
195    }
196}
197
198impl BigQueryCommon {
199    async fn build_client(&self, aws_auth_props: &AwsAuthProps) -> Result<Client> {
200        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;
201
202        let service_account =
203            if let Ok(auth_json_from_base64) = BASE64_STANDARD.decode(auth_json.clone()) {
204                serde_json::from_slice::<ServiceAccountKey>(&auth_json_from_base64)
205            } else {
206                serde_json::from_str::<ServiceAccountKey>(&auth_json)
207            }
208            .map_err(|e| SinkError::BigQuery(e.into()))?;
209
210        let client: Client = Client::from_service_account_key(service_account, false)
211            .await
212            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
213        Ok(client)
214    }
215
216    async fn build_writer_client(
217        &self,
218        aws_auth_props: &AwsAuthProps,
219    ) -> Result<(StorageWriterClient, impl Stream<Item = Result<()>> + use<>)> {
220        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;
221
222        let credentials_file =
223            if let Ok(auth_json_from_base64) = BASE64_STANDARD.decode(auth_json.clone()) {
224                serde_json::from_slice::<CredentialsFile>(&auth_json_from_base64)
225            } else {
226                serde_json::from_str::<CredentialsFile>(&auth_json)
227            }
228            .map_err(|e| SinkError::BigQuery(e.into()))?;
229
230        StorageWriterClient::new(credentials_file).await
231    }
232
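    /// Resolves the credential JSON, in order of precedence: the inline
    /// `bigquery.credentials` value, then a file at `bigquery.local.path`, then an object
    /// downloaded from `bigquery.s3.path`.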
233    async fn get_auth_json_from_path(&self, aws_auth_props: &AwsAuthProps) -> Result<String> {
234        if let Some(credentials) = &self.credentials {
235            Ok(credentials.clone())
236        } else if let Some(local_path) = &self.local_path {
237            std::fs::read_to_string(local_path)
238                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))
239        } else if let Some(s3_path) = &self.s3_path {
240            let url =
241                Url::parse(s3_path).map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
242            let auth_vec = load_file_descriptor_from_s3(&url, aws_auth_props)
243                .await
244                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
245            Ok(String::from_utf8(auth_vec).map_err(|e| SinkError::BigQuery(e.into()))?)
246        } else {
247            Err(SinkError::BigQuery(anyhow::anyhow!(
                "none of `bigquery.credentials`, `bigquery.local.path`, or `bigquery.s3.path` is set; please configure at least one of them."
249            )))
250        }
251    }
252}
253
254#[serde_as]
255#[derive(Clone, Debug, Deserialize, WithOptions)]
256pub struct BigQueryConfig {
257    #[serde(flatten)]
258    pub common: BigQueryCommon,
259    #[serde(flatten)]
260    pub aws_auth_props: AwsAuthProps,
261    pub r#type: String, // accept "append-only" or "upsert"
262}
263
264impl EnforceSecret for BigQueryConfig {
265    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
266        BigQueryCommon::enforce_one(prop)?;
267        AwsAuthProps::enforce_one(prop)?;
268        Ok(())
269    }
270}
271
272impl BigQueryConfig {
273    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
274        let config =
275            serde_json::from_value::<BigQueryConfig>(serde_json::to_value(properties).unwrap())
276                .map_err(|e| SinkError::Config(anyhow!(e)))?;
277        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
278            return Err(SinkError::Config(anyhow!(
                "`{}` must be either {} or {}",
280                SINK_TYPE_OPTION,
281                SINK_TYPE_APPEND_ONLY,
282                SINK_TYPE_UPSERT
283            )));
284        }
285        Ok(config)
286    }
287}
288
289#[derive(Debug)]
290pub struct BigQuerySink {
291    pub config: BigQueryConfig,
292    schema: Schema,
293    pk_indices: Vec<usize>,
294    is_append_only: bool,
295}
296
297impl EnforceSecret for BigQuerySink {
298    fn enforce_secret<'a>(
299        prop_iter: impl Iterator<Item = &'a str>,
300    ) -> crate::error::ConnectorResult<()> {
301        for prop in prop_iter {
302            BigQueryConfig::enforce_one(prop)?;
303        }
304        Ok(())
305    }
306}
307
308impl BigQuerySink {
309    pub fn new(
310        config: BigQueryConfig,
311        schema: Schema,
312        pk_indices: Vec<usize>,
313        is_append_only: bool,
314    ) -> Result<Self> {
315        Ok(Self {
316            config,
317            schema,
318            pk_indices,
319            is_append_only,
320        })
321    }
322}
323
324impl BigQuerySink {
325    fn check_column_name_and_type(
326        &self,
327        big_query_columns_desc: HashMap<String, String>,
328    ) -> Result<()> {
329        let rw_fields_name = self.schema.fields();
330        if big_query_columns_desc.is_empty() {
331            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Cannot find the table in BigQuery"
333            )));
334        }
335        if rw_fields_name.len().ne(&big_query_columns_desc.len()) {
336            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "The number of columns in RisingWave ({}) must match the number of columns in BigQuery ({})",
338                rw_fields_name.len(),
339                big_query_columns_desc.len()
340            )));
341        }
342
343        for i in rw_fields_name {
344            let value = big_query_columns_desc.get(&i.name).ok_or_else(|| {
345                SinkError::BigQuery(anyhow::anyhow!(
346                    "Column `{:?}` on RisingWave side is not found on BigQuery side.",
347                    i.name
348                ))
349            })?;
350            let data_type_string = Self::get_string_and_check_support_from_datatype(&i.data_type)?;
351            if data_type_string.ne(value) {
352                return Err(SinkError::BigQuery(anyhow::anyhow!(
353                    "Data type mismatch for column `{:?}`. BigQuery side: `{:?}`, RisingWave side: `{:?}`. ",
354                    i.name,
355                    value,
356                    data_type_string
357                )));
358            };
359        }
360        Ok(())
361    }
362
363    fn get_string_and_check_support_from_datatype(rw_data_type: &DataType) -> Result<String> {
364        match rw_data_type {
365            DataType::Boolean => Ok("BOOL".to_owned()),
366            DataType::Int16 => Ok("INT64".to_owned()),
367            DataType::Int32 => Ok("INT64".to_owned()),
368            DataType::Int64 => Ok("INT64".to_owned()),
369            DataType::Float32 => Err(SinkError::BigQuery(anyhow::anyhow!(
370                "REAL is not supported for BigQuery sink. Please convert to FLOAT64 or other supported types."
371            ))),
372            DataType::Float64 => Ok("FLOAT64".to_owned()),
373            DataType::Decimal => Ok("NUMERIC".to_owned()),
374            DataType::Date => Ok("DATE".to_owned()),
375            DataType::Varchar => Ok("STRING".to_owned()),
376            DataType::Time => Ok("TIME".to_owned()),
377            DataType::Timestamp => Ok("DATETIME".to_owned()),
378            DataType::Timestamptz => Ok("TIMESTAMP".to_owned()),
379            DataType::Interval => Ok("INTERVAL".to_owned()),
380            DataType::Struct(structs) => {
381                let mut elements_vec = vec![];
382                for (name, datatype) in structs.iter() {
383                    let element_string =
384                        Self::get_string_and_check_support_from_datatype(datatype)?;
385                    elements_vec.push(format!("{} {}", name, element_string));
386                }
387                Ok(format!("STRUCT<{}>", elements_vec.join(", ")))
388            }
389            DataType::List(l) => {
390                let element_string = Self::get_string_and_check_support_from_datatype(l.as_ref())?;
391                Ok(format!("ARRAY<{}>", element_string))
392            }
393            DataType::Bytea => Ok("BYTES".to_owned()),
394            DataType::Jsonb => Ok("JSON".to_owned()),
395            DataType::Serial => Ok("INT64".to_owned()),
396            DataType::Int256 => Err(SinkError::BigQuery(anyhow::anyhow!(
397                "INT256 is not supported for BigQuery sink."
398            ))),
399            DataType::Map(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
400                "MAP is not supported for BigQuery sink."
401            ))),
402            DataType::Vector(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
403                "VECTOR is not supported for BigQuery sink."
404            ))),
405        }
406    }
407
408    fn map_field(rw_field: &Field) -> Result<TableFieldSchema> {
409        let tfs = match &rw_field.data_type {
410            DataType::Boolean => TableFieldSchema::bool(&rw_field.name),
411            DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Serial => {
412                TableFieldSchema::integer(&rw_field.name)
413            }
414            DataType::Float32 => {
415                return Err(SinkError::BigQuery(anyhow::anyhow!(
416                    "REAL is not supported for BigQuery sink. Please convert to FLOAT64 or other supported types."
417                )));
418            }
419            DataType::Float64 => TableFieldSchema::float(&rw_field.name),
420            DataType::Decimal => TableFieldSchema::numeric(&rw_field.name),
421            DataType::Date => TableFieldSchema::date(&rw_field.name),
422            DataType::Varchar => TableFieldSchema::string(&rw_field.name),
423            DataType::Time => TableFieldSchema::time(&rw_field.name),
424            DataType::Timestamp => TableFieldSchema::date_time(&rw_field.name),
425            DataType::Timestamptz => TableFieldSchema::timestamp(&rw_field.name),
426            DataType::Interval => {
427                return Err(SinkError::BigQuery(anyhow::anyhow!(
428                    "INTERVAL is not supported for BigQuery sink. Please convert to VARCHAR or other supported types."
429                )));
430            }
431            DataType::Struct(st) => {
432                let mut sub_fields = Vec::with_capacity(st.len());
433                for (name, dt) in st.iter() {
434                    let rw_field = Field::with_name(dt.clone(), name);
435                    let field = Self::map_field(&rw_field)?;
436                    sub_fields.push(field);
437                }
438                TableFieldSchema::record(&rw_field.name, sub_fields)
439            }
440            DataType::List(dt) => {
441                let inner_field = Self::map_field(&Field::with_name(*dt.clone(), &rw_field.name))?;
442                TableFieldSchema {
443                    mode: Some("REPEATED".to_owned()),
444                    ..inner_field
445                }
446            }
447
448            DataType::Bytea => TableFieldSchema::bytes(&rw_field.name),
449            DataType::Jsonb => TableFieldSchema::json(&rw_field.name),
450            DataType::Int256 => {
451                return Err(SinkError::BigQuery(anyhow::anyhow!(
452                    "INT256 is not supported for BigQuery sink."
453                )));
454            }
455            DataType::Map(_) => {
456                return Err(SinkError::BigQuery(anyhow::anyhow!(
457                    "MAP is not supported for BigQuery sink."
458                )));
459            }
460            DataType::Vector(_) => {
461                return Err(SinkError::BigQuery(anyhow::anyhow!(
462                    "VECTOR is not supported for BigQuery sink."
463                )));
464            }
465        };
466        Ok(tfs)
467    }
468
469    async fn create_table(
470        &self,
471        client: &Client,
472        project_id: &str,
473        dataset_id: &str,
474        table_id: &str,
475        fields: &Vec<Field>,
476    ) -> Result<Table> {
477        let dataset = client
478            .dataset()
479            .get(project_id, dataset_id)
480            .await
481            .map_err(|e| SinkError::BigQuery(e.into()))?;
482        let fields: Vec<_> = fields.iter().map(Self::map_field).collect::<Result<_>>()?;
483        let table = Table::from_dataset(&dataset, table_id, TableSchema::new(fields));
484
485        client
486            .table()
487            .create(table)
488            .await
489            .map_err(|e| SinkError::BigQuery(e.into()))
490    }
491}
492
493impl Sink for BigQuerySink {
494    type LogSinker = BigQueryLogSinker;
495
496    const SINK_NAME: &'static str = BIGQUERY_SINK;
497
498    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
499        let (writer, resp_stream) = BigQuerySinkWriter::new(
500            self.config.clone(),
501            self.schema.clone(),
502            self.pk_indices.clone(),
503            self.is_append_only,
504        )
505        .await?;
506        Ok(BigQueryLogSinker::new(
507            writer,
508            resp_stream,
509            BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE,
510        ))
511    }
512
513    async fn validate(&self) -> Result<()> {
514        risingwave_common::license::Feature::BigQuerySink
515            .check_available()
516            .map_err(|e| anyhow::anyhow!(e))?;
517        if !self.is_append_only && self.pk_indices.is_empty() {
518            return Err(SinkError::Config(anyhow!(
519                "Primary key not defined for upsert bigquery sink (please define in `primary_key` field)"
520            )));
521        }
522        let client = self
523            .config
524            .common
525            .build_client(&self.config.aws_auth_props)
526            .await?;
527        let BigQueryCommon {
528            project: project_id,
529            dataset: dataset_id,
530            table: table_id,
531            ..
532        } = &self.config.common;
533
534        if self.config.common.auto_create {
535            match client
536                .table()
537                .get(project_id, dataset_id, table_id, None)
538                .await
539            {
540                Err(BQError::RequestError(_)) => {
541                    // early return: no need to query schema to check column and type
542                    return self
543                        .create_table(
544                            &client,
545                            project_id,
546                            dataset_id,
547                            table_id,
548                            &self.schema.fields,
549                        )
550                        .await
551                        .map(|_| ());
552                }
553                Err(e) => return Err(SinkError::BigQuery(e.into())),
554                _ => {}
555            }
556        }
557
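        // The table already exists (or auto-create is disabled): fetch its column types from
        // INFORMATION_SCHEMA and compare them against the RisingWave schema below.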
558        let mut rs = client
559            .job()
560            .query(
561                &self.config.common.project,
562                QueryRequest::new(format!(
563                    "SELECT column_name, data_type FROM `{}.{}.INFORMATION_SCHEMA.COLUMNS` WHERE table_name = '{}'",
564                    project_id, dataset_id, table_id,
565                )),
566            ).await.map_err(|e| SinkError::BigQuery(e.into()))?;
567
568        let mut big_query_schema = HashMap::default();
569        while rs.next_row() {
570            big_query_schema.insert(
571                rs.get_string_by_name("column_name")
572                    .map_err(|e| SinkError::BigQuery(e.into()))?
573                    .ok_or_else(|| {
574                        SinkError::BigQuery(anyhow::anyhow!("Cannot find column_name"))
575                    })?,
576                rs.get_string_by_name("data_type")
577                    .map_err(|e| SinkError::BigQuery(e.into()))?
578                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find data_type"))
580                    })?,
581            );
582        }
583
584        self.check_column_name_and_type(big_query_schema)?;
585        Ok(())
586    }
587}
588
589pub struct BigQuerySinkWriter {
590    pub config: BigQueryConfig,
591    #[expect(dead_code)]
592    schema: Schema,
593    #[expect(dead_code)]
594    pk_indices: Vec<usize>,
595    client: StorageWriterClient,
596    is_append_only: bool,
597    row_encoder: ProtoEncoder,
598    writer_pb_schema: ProtoSchema,
599    #[expect(dead_code)]
600    message_descriptor: MessageDescriptor,
601    write_stream: String,
602    proto_field: Option<FieldDescriptor>,
603}
604
605impl TryFrom<SinkParam> for BigQuerySink {
606    type Error = SinkError;
607
608    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
609        let schema = param.schema();
610        let config = BigQueryConfig::from_btreemap(param.properties)?;
611        BigQuerySink::new(
612            config,
613            schema,
614            param.downstream_pk,
615            param.sink_type.is_append_only(),
616        )
617    }
618}
619
620impl BigQuerySinkWriter {
621    pub async fn new(
622        config: BigQueryConfig,
623        schema: Schema,
624        pk_indices: Vec<usize>,
625        is_append_only: bool,
626    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
627        let (client, resp_stream) = config
628            .common
629            .build_writer_client(&config.aws_auth_props)
630            .await?;
631        let mut descriptor_proto = build_protobuf_schema(
632            schema
633                .fields()
634                .iter()
635                .map(|f| (f.name.as_str(), &f.data_type)),
636            config.common.table.clone(),
637        )?;
638
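        // For upsert sinks, append an extra `_CHANGE_TYPE` string field to the generated
        // descriptor; BigQuery's change-data-capture interprets it as UPSERT or DELETE per row.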
639        if !is_append_only {
640            let field = FieldDescriptorProto {
641                name: Some(CHANGE_TYPE.to_owned()),
642                number: Some((schema.len() + 1) as i32),
643                r#type: Some(field_descriptor_proto::Type::String.into()),
644                ..Default::default()
645            };
646            descriptor_proto.field.push(field);
647        }
648
649        let descriptor_pool = build_protobuf_descriptor_pool(&descriptor_proto)?;
650        let message_descriptor = descriptor_pool
651            .get_message_by_name(&config.common.table)
652            .ok_or_else(|| {
653                SinkError::BigQuery(anyhow::anyhow!(
654                    "Can't find message proto {}",
655                    &config.common.table
656                ))
657            })?;
658        let proto_field = if !is_append_only {
659            let proto_field = message_descriptor
660                .get_field_by_name(CHANGE_TYPE)
661                .ok_or_else(|| {
662                    SinkError::BigQuery(anyhow::anyhow!("Can't find {}", CHANGE_TYPE))
663                })?;
664            Some(proto_field)
665        } else {
666            None
667        };
668        let row_encoder = ProtoEncoder::new(
669            schema.clone(),
670            None,
671            message_descriptor.clone(),
672            ProtoHeader::None,
673        )?;
674        Ok((
675            Self {
676                write_stream: format!(
677                    "projects/{}/datasets/{}/tables/{}/streams/_default",
678                    config.common.project, config.common.dataset, config.common.table
679                ),
680                config,
681                schema,
682                pk_indices,
683                client,
684                is_append_only,
685                row_encoder,
686                message_descriptor,
687                proto_field,
688                writer_pb_schema: ProtoSchema {
689                    proto_descriptor: Some(descriptor_proto),
690                },
691            },
692            resp_stream,
693        ))
694    }
695
696    fn append_only(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
697        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
698        for (op, row) in chunk.rows() {
699            if op != Op::Insert {
700                continue;
701            }
702            serialized_rows.push(self.row_encoder.encode(row)?.ser_to()?)
703        }
704        Ok(serialized_rows)
705    }
706
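    // Converts each op into a CDC row: `Insert`/`UpdateInsert` become `UPSERT`, `Delete`
    // becomes `DELETE`, and `UpdateDelete` is skipped since its paired `UpdateInsert`
    // already carries the new value.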
707    fn upsert(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
708        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
709        for (op, row) in chunk.rows() {
710            if op == Op::UpdateDelete {
711                continue;
712            }
713            let mut pb_row = self.row_encoder.encode(row)?;
714            match op {
715                Op::Insert => pb_row
716                    .message
717                    .try_set_field(
718                        self.proto_field.as_ref().unwrap(),
719                        prost_reflect::Value::String("UPSERT".to_owned()),
720                    )
721                    .map_err(|e| SinkError::BigQuery(e.into()))?,
722                Op::Delete => pb_row
723                    .message
724                    .try_set_field(
725                        self.proto_field.as_ref().unwrap(),
726                        prost_reflect::Value::String("DELETE".to_owned()),
727                    )
728                    .map_err(|e| SinkError::BigQuery(e.into()))?,
729                Op::UpdateDelete => continue,
730                Op::UpdateInsert => pb_row
731                    .message
732                    .try_set_field(
733                        self.proto_field.as_ref().unwrap(),
734                        prost_reflect::Value::String("UPSERT".to_owned()),
735                    )
736                    .map_err(|e| SinkError::BigQuery(e.into()))?,
737            };
738
739            serialized_rows.push(pb_row.ser_to()?)
740        }
741        Ok(serialized_rows)
742    }
743
744    fn write_chunk(&mut self, chunk: StreamChunk) -> Result<usize> {
745        let serialized_rows = if self.is_append_only {
746            self.append_only(chunk)?
747        } else {
748            self.upsert(chunk)?
749        };
750        if serialized_rows.is_empty() {
751            return Ok(0);
752        }
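        // Split the serialized rows into batches whose accumulated size stays under
        // `MAX_ROW_SIZE`, send one AppendRows request per batch, and return the number of
        // requests so the future manager knows how many responses to await.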
753        let mut result = Vec::new();
754        let mut result_inner = Vec::new();
755        let mut size_count = 0;
756        for i in serialized_rows {
757            size_count += i.len();
758            if size_count > MAX_ROW_SIZE {
759                result.push(result_inner);
760                result_inner = Vec::new();
761                size_count = i.len();
762            }
763            result_inner.push(i);
764        }
765        if !result_inner.is_empty() {
766            result.push(result_inner);
767        }
768        let len = result.len();
769        for serialized_rows in result {
770            let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
771                writer_schema: Some(self.writer_pb_schema.clone()),
772                rows: Some(ProtoRows { serialized_rows }),
773            });
774            self.client.append_rows(rows, self.write_stream.clone())?;
775        }
776        Ok(len)
777    }
778}
779
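/// Adapts the gRPC `AppendRows` response stream into a `Stream<Item = Result<()>>`:
/// every successful `AppendRowsResponse` yields one `()`, while any row error, response
/// error, or premature end of the stream surfaces as a `SinkError::BigQuery`.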
780#[try_stream(ok = (), error = SinkError)]
781pub async fn resp_to_stream(
782    resp_stream: impl Future<
783        Output = std::result::Result<
784            Response<google_cloud_gax::grpc::Streaming<AppendRowsResponse>>,
785            Status,
786        >,
787    >
788    + 'static
789    + Send,
790) {
791    let mut resp_stream = resp_stream
792        .await
793        .map_err(|e| SinkError::BigQuery(e.into()))?
794        .into_inner();
795    loop {
796        match resp_stream
797            .message()
798            .await
799            .map_err(|e| SinkError::BigQuery(e.into()))?
800        {
801            Some(append_rows_response) => {
802                if !append_rows_response.row_errors.is_empty() {
803                    return Err(SinkError::BigQuery(anyhow::anyhow!(
804                        "bigquery insert error {:?}",
805                        append_rows_response.row_errors
806                    )));
807                }
                if let Some(google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(status)) =
                    append_rows_response.response
                {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        status
                    )));
                }
814                yield ();
815            }
816            None => {
817                return Err(SinkError::BigQuery(anyhow::anyhow!(
818                    "bigquery insert error: end of resp stream",
819                )));
820            }
821        }
822    }
823}
824
825struct StorageWriterClient {
826    #[expect(dead_code)]
827    environment: Environment,
828    request_sender: mpsc::UnboundedSender<AppendRowsRequest>,
829}
830impl StorageWriterClient {
831    pub async fn new(
832        credentials: CredentialsFile,
833    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
834        let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
835            Self::bigquery_grpc_auth_config(),
836            Box::new(credentials),
837        )
838        .await
839        .map_err(|e| SinkError::BigQuery(e.into()))?;
840        let conn_options = ConnectionOptions {
841            connect_timeout: CONNECT_TIMEOUT,
842            timeout: CONNECTION_TIMEOUT,
843        };
844        let environment = Environment::GoogleCloud(Box::new(ts_grpc));
845        let conn = ConnectionManager::new(DEFAULT_GRPC_CHANNEL_NUMS, &environment, &conn_options)
846            .await
847            .map_err(|e| SinkError::BigQuery(e.into()))?;
848        let mut client = conn.writer();
849
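        // Requests are pushed into an unbounded channel; its receiver is wrapped into a
        // stream and passed to the bidirectional `append_rows` call, whose responses are
        // surfaced to the sink through `resp_to_stream`.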
850        let (tx, rx) = mpsc::unbounded_channel();
851        let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
852
853        let resp = async move { client.append_rows(Request::new(stream)).await };
854        let resp_stream = resp_to_stream(resp);
855
856        Ok((
857            StorageWriterClient {
858                environment,
859                request_sender: tx,
860            },
861            resp_stream,
862        ))
863    }
864
865    pub fn append_rows(&mut self, row: AppendRowsRequestRows, write_stream: String) -> Result<()> {
866        let append_req = AppendRowsRequest {
867            write_stream: write_stream.clone(),
868            offset: None,
869            trace_id: Uuid::new_v4().hyphenated().to_string(),
870            missing_value_interpretations: HashMap::default(),
871            rows: Some(row),
872            default_missing_value_interpretation: MissingValueInterpretation::DefaultValue as i32,
873        };
874        self.request_sender
875            .send(append_req)
876            .map_err(|e| SinkError::BigQuery(e.into()))?;
877        Ok(())
878    }
879
880    fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> {
881        let mut auth_config = google_cloud_auth::project::Config::default();
882        auth_config =
883            auth_config.with_audience(google_cloud_bigquery::grpc::apiv1::conn_pool::AUDIENCE);
884        auth_config =
885            auth_config.with_scopes(&google_cloud_bigquery::grpc::apiv1::conn_pool::SCOPES);
886        auth_config
887    }
888}
889
890fn build_protobuf_descriptor_pool(desc: &DescriptorProto) -> Result<prost_reflect::DescriptorPool> {
891    let file_descriptor = FileDescriptorProto {
892        message_type: vec![desc.clone()],
893        name: Some("bigquery".to_owned()),
894        ..Default::default()
895    };
896
897    prost_reflect::DescriptorPool::from_file_descriptor_set(FileDescriptorSet {
898        file: vec![file_descriptor],
899    })
900    .context("failed to build descriptor pool")
901    .map_err(SinkError::BigQuery)
902}
903
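/// Builds a `DescriptorProto` for the sink's row message: each field is numbered from 1,
/// struct fields become nested `Struct<name>` message types, and list fields are marked
/// `REPEATED` (see `build_protobuf_field`).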
904fn build_protobuf_schema<'a>(
905    fields: impl Iterator<Item = (&'a str, &'a DataType)>,
906    name: String,
907) -> Result<DescriptorProto> {
908    let mut proto = DescriptorProto {
909        name: Some(name),
910        ..Default::default()
911    };
912    let mut struct_vec = vec![];
913    let field_vec = fields
914        .enumerate()
915        .map(|(index, (name, data_type))| {
916            let (field, des_proto) =
917                build_protobuf_field(data_type, (index + 1) as i32, name.to_owned())?;
918            if let Some(sv) = des_proto {
919                struct_vec.push(sv);
920            }
921            Ok(field)
922        })
923        .collect::<Result<Vec<_>>>()?;
924    proto.field = field_vec;
925    proto.nested_type = struct_vec;
926    Ok(proto)
927}
928
929fn build_protobuf_field(
930    data_type: &DataType,
931    index: i32,
932    name: String,
933) -> Result<(FieldDescriptorProto, Option<DescriptorProto>)> {
934    let mut field = FieldDescriptorProto {
935        name: Some(name.clone()),
936        number: Some(index),
937        ..Default::default()
938    };
939    match data_type {
940        DataType::Boolean => field.r#type = Some(field_descriptor_proto::Type::Bool.into()),
941        DataType::Int32 => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
942        DataType::Int16 | DataType::Int64 => {
943            field.r#type = Some(field_descriptor_proto::Type::Int64.into())
944        }
945        DataType::Float64 => field.r#type = Some(field_descriptor_proto::Type::Double.into()),
946        DataType::Decimal => field.r#type = Some(field_descriptor_proto::Type::String.into()),
947        DataType::Date => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
948        DataType::Varchar => field.r#type = Some(field_descriptor_proto::Type::String.into()),
949        DataType::Time => field.r#type = Some(field_descriptor_proto::Type::String.into()),
950        DataType::Timestamp => field.r#type = Some(field_descriptor_proto::Type::String.into()),
951        DataType::Timestamptz => field.r#type = Some(field_descriptor_proto::Type::String.into()),
952        DataType::Interval => field.r#type = Some(field_descriptor_proto::Type::String.into()),
953        DataType::Struct(s) => {
954            field.r#type = Some(field_descriptor_proto::Type::Message.into());
955            let name = format!("Struct{}", name);
956            let sub_proto = build_protobuf_schema(s.iter(), name.clone())?;
957            field.type_name = Some(name);
958            return Ok((field, Some(sub_proto)));
959        }
960        DataType::List(l) => {
961            let (mut field, proto) = build_protobuf_field(l.as_ref(), index, name.clone())?;
962            field.label = Some(field_descriptor_proto::Label::Repeated.into());
963            return Ok((field, proto));
964        }
965        DataType::Bytea => field.r#type = Some(field_descriptor_proto::Type::Bytes.into()),
966        DataType::Jsonb => field.r#type = Some(field_descriptor_proto::Type::String.into()),
967        DataType::Serial => field.r#type = Some(field_descriptor_proto::Type::Int64.into()),
        DataType::Float32 | DataType::Int256 => {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Float32 and Int256 are not supported for the BigQuery sink"
            )));
        }
        DataType::Map(_) => {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Map is not supported for the BigQuery sink"
            )));
        }
        DataType::Vector(_) => {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Vector is not supported for the BigQuery sink"
            )));
        }
977    }
978    Ok((field, None))
979}
980
981#[cfg(test)]
982mod test {
983
984    use std::assert_matches::assert_matches;
985
986    use risingwave_common::catalog::{Field, Schema};
987    use risingwave_common::types::{DataType, StructType};
988
989    use crate::sink::big_query::{
990        BigQuerySink, build_protobuf_descriptor_pool, build_protobuf_schema,
991    };
992
993    #[tokio::test]
994    async fn test_type_check() {
995        let big_query_type_string = "ARRAY<STRUCT<v1 ARRAY<INT64>, v2 STRUCT<v1 INT64, v2 INT64>>>";
996        let rw_datatype = DataType::List(Box::new(DataType::Struct(StructType::new(vec![
997            ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
998            (
999                "v2".to_owned(),
1000                DataType::Struct(StructType::new(vec![
1001                    ("v1".to_owned(), DataType::Int64),
1002                    ("v2".to_owned(), DataType::Int64),
1003                ])),
1004            ),
1005        ]))));
1006        assert_eq!(
1007            BigQuerySink::get_string_and_check_support_from_datatype(&rw_datatype).unwrap(),
1008            big_query_type_string
1009        );
1010    }
1011
1012    #[tokio::test]
1013    async fn test_schema_check() {
1014        let schema = Schema {
1015            fields: vec![
1016                Field::with_name(DataType::Int64, "v1"),
1017                Field::with_name(DataType::Float64, "v2"),
1018                Field::with_name(
1019                    DataType::List(Box::new(DataType::Struct(StructType::new(vec![
1020                        ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
1021                        (
1022                            "v3".to_owned(),
1023                            DataType::Struct(StructType::new(vec![
1024                                ("v1".to_owned(), DataType::Int64),
1025                                ("v2".to_owned(), DataType::Int64),
1026                            ])),
1027                        ),
1028                    ])))),
1029                    "v3",
1030                ),
1031            ],
1032        };
1033        let fields = schema
1034            .fields()
1035            .iter()
1036            .map(|f| (f.name.as_str(), &f.data_type));
1037        let desc = build_protobuf_schema(fields, "t1".to_owned()).unwrap();
1038        let pool = build_protobuf_descriptor_pool(&desc).unwrap();
1039        let t1_message = pool.get_message_by_name("t1").unwrap();
1040        assert_matches!(
1041            t1_message.get_field_by_name("v1").unwrap().kind(),
1042            prost_reflect::Kind::Int64
1043        );
1044        assert_matches!(
1045            t1_message.get_field_by_name("v2").unwrap().kind(),
1046            prost_reflect::Kind::Double
1047        );
1048        assert_matches!(
1049            t1_message.get_field_by_name("v3").unwrap().kind(),
1050            prost_reflect::Kind::Message(_)
1051        );
1052
1053        let v3_message = pool.get_message_by_name("t1.Structv3").unwrap();
1054        assert_matches!(
1055            v3_message.get_field_by_name("v1").unwrap().kind(),
1056            prost_reflect::Kind::Int64
1057        );
1058        assert!(v3_message.get_field_by_name("v1").unwrap().is_list());
1059
1060        let v3_v3_message = pool.get_message_by_name("t1.Structv3.Structv3").unwrap();
1061        assert_matches!(
1062            v3_v3_message.get_field_by_name("v1").unwrap().kind(),
1063            prost_reflect::Kind::Int64
1064        );
1065        assert_matches!(
1066            v3_v3_message.get_field_by_name("v2").unwrap().kind(),
1067            prost_reflect::Kind::Int64
1068        );
1069    }
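
    // A minimal sketch test (not from the original file): it exercises
    // `BigQueryConfig::from_btreemap` with the WITH-style property names declared on
    // `BigQueryCommon`, assuming the AWS auth properties may all be omitted. The values
    // are placeholders.
    #[test]
    fn test_config_from_btreemap_sketch() {
        use std::collections::BTreeMap;

        use crate::sink::big_query::BigQueryConfig;

        let props = BTreeMap::from([
            ("type".to_owned(), "append-only".to_owned()),
            ("bigquery.project".to_owned(), "my_project".to_owned()),
            ("bigquery.dataset".to_owned(), "my_dataset".to_owned()),
            ("bigquery.table".to_owned(), "my_table".to_owned()),
        ]);
        let config = BigQueryConfig::from_btreemap(props).unwrap();
        assert_eq!(config.common.project, "my_project");
        assert_eq!(config.common.dataset, "my_dataset");
        assert_eq!(config.common.table, "my_table");
        assert!(!config.common.auto_create);
    }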
1070}