risingwave_connector/sink/big_query.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::pin::Pin;
use core::time::Duration;
use std::collections::{BTreeMap, HashMap, VecDeque};

use anyhow::{Context, anyhow};
use futures::future::pending;
use futures::prelude::Future;
use futures::{Stream, StreamExt};
use futures_async_stream::try_stream;
use gcp_bigquery_client::Client;
use gcp_bigquery_client::error::BQError;
use gcp_bigquery_client::model::query_request::QueryRequest;
use gcp_bigquery_client::model::table::Table;
use gcp_bigquery_client::model::table_field_schema::TableFieldSchema;
use gcp_bigquery_client::model::table_schema::TableSchema;
use google_cloud_bigquery::grpc::apiv1::conn_pool::ConnectionManager;
use google_cloud_gax::conn::{ConnectionOptions, Environment};
use google_cloud_gax::grpc::Request;
use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request::{
    MissingValueInterpretation, ProtoData, Rows as AppendRowsRequestRows,
};
use google_cloud_googleapis::cloud::bigquery::storage::v1::{
    AppendRowsRequest, AppendRowsResponse, ProtoRows, ProtoSchema,
};
use google_cloud_pubsub::client::google_cloud_auth;
use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
use phf::{Set, phf_set};
use prost_reflect::{FieldDescriptor, MessageDescriptor};
use prost_types::{
    DescriptorProto, FieldDescriptorProto, FileDescriptorProto, FileDescriptorSet,
    field_descriptor_proto,
};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use serde_derive::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use simd_json::prelude::ArrayTrait;
use tokio::sync::mpsc;
use tonic::{Response, Status, async_trait};
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;
use yup_oauth2::ServiceAccountKey;

use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo};
use super::log_store::{LogStoreReadItem, TruncateOffset};
use super::{
    LogSinker, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkLogReader,
};
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::connector_common::AwsAuthProps;
use crate::enforce_secret::EnforceSecret;
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam};

pub const BIGQUERY_SINK: &str = "bigquery";
pub const CHANGE_TYPE: &str = "_CHANGE_TYPE";
const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4;
const CONNECT_TIMEOUT: Option<Duration> = Some(Duration::from_secs(30));
const CONNECTION_TIMEOUT: Option<Duration> = None;
const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
// A single AppendRows request must stay below 10 MB, so we cap each batch at 8 MB.
const MAX_ROW_SIZE: usize = 8 * 1024 * 1024;

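/// Connection and table options of the BigQuery sink, deserialized from the sink properties
/// below (`bigquery.project`, `bigquery.dataset`, `bigquery.table`, ...). Credentials are
/// resolved from `bigquery.credentials`, `bigquery.local.path`, or `bigquery.s3.path`,
/// in that order (see `get_auth_json_from_path`).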
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct BigQueryCommon {
    #[serde(rename = "bigquery.local.path")]
    pub local_path: Option<String>,
    #[serde(rename = "bigquery.s3.path")]
    pub s3_path: Option<String>,
    #[serde(rename = "bigquery.project")]
    pub project: String,
    #[serde(rename = "bigquery.dataset")]
    pub dataset: String,
    #[serde(rename = "bigquery.table")]
    pub table: String,
    #[serde(default)] // default false
    #[serde_as(as = "DisplayFromStr")]
    pub auto_create: bool,
    #[serde(rename = "bigquery.credentials")]
    pub credentials: Option<String>,
}

impl EnforceSecret for BigQueryCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "bigquery.credentials",
    };
}

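/// Pairs every in-flight write with the offset it belongs to, so that the log reader is only
/// truncated after BigQuery has acknowledged all append requests for that offset.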
struct BigQueryFutureManager {
    // Each entry in `offset_queue` pairs a `TruncateOffset` with the number of responses
    // we still expect from `resp_stream` before that offset can be truncated.
    // When the TruncateOffset is a barrier, the count is 0 and we don't wait on `resp_stream`.
    // When the TruncateOffset is a chunk:
    // 1. The chunk has no rows: nothing was sent, the count is 0 and we don't wait on `resp_stream`.
    // 2. The chunk fits within `MAX_ROW_SIZE`: we sent one request, the count is 1 and we wait once on `resp_stream`.
    // 3. The chunk exceeds `MAX_ROW_SIZE`: we split it into n requests, the count is n and we wait n times on `resp_stream`.
    offset_queue: VecDeque<(TruncateOffset, usize)>,
    resp_stream: Pin<Box<dyn Stream<Item = Result<()>> + Send>>,
}
impl BigQueryFutureManager {
    pub fn new(
        max_future_num: usize,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
    ) -> Self {
        let offset_queue = VecDeque::with_capacity(max_future_num);
        Self {
            offset_queue,
            resp_stream: Box::pin(resp_stream),
        }
    }

    pub fn add_offset(&mut self, offset: TruncateOffset, resp_num: usize) {
        self.offset_queue.push_back((offset, resp_num));
    }

    pub async fn next_offset(&mut self) -> Result<TruncateOffset> {
        if let Some((_offset, remaining_resp_num)) = self.offset_queue.front_mut() {
            if *remaining_resp_num == 0 {
                return Ok(self.offset_queue.pop_front().unwrap().0);
            }
            while *remaining_resp_num > 0 {
                self.resp_stream
                    .next()
                    .await
                    .ok_or_else(|| SinkError::BigQuery(anyhow::anyhow!("end of stream")))??;
                *remaining_resp_num -= 1;
            }
            Ok(self.offset_queue.pop_front().unwrap().0)
        } else {
            pending().await
        }
    }
}
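
/// `LogSinker` implementation for BigQuery: writes chunks through `BigQuerySinkWriter` and
/// truncates the log once `BigQueryFutureManager` reports that all append requests for the
/// corresponding offset have been acknowledged.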
pub struct BigQueryLogSinker {
    writer: BigQuerySinkWriter,
    bigquery_future_manager: BigQueryFutureManager,
    future_num: usize,
}
impl BigQueryLogSinker {
    pub fn new(
        writer: BigQuerySinkWriter,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
        future_num: usize,
    ) -> Self {
        Self {
            writer,
            bigquery_future_manager: BigQueryFutureManager::new(future_num, resp_stream),
            future_num,
        }
    }
}

#[async_trait]
impl LogSinker for BigQueryLogSinker {
    async fn consume_log_and_sink(mut self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        loop {
            tokio::select!(
                offset = self.bigquery_future_manager.next_offset() => {
                    log_reader.truncate(offset?)?;
                }
                item_result = log_reader.next_item(), if self.bigquery_future_manager.offset_queue.len() <= self.future_num => {
                    let (epoch, item) = item_result?;
                    match item {
                        LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
                            let resp_num = self.writer.write_chunk(chunk)?;
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Chunk { epoch, chunk_id }, resp_num);
                        }
                        LogStoreReadItem::Barrier { .. } => {
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Barrier { epoch }, 0);
                        }
                    }
                }
            )
        }
    }
}

impl BigQueryCommon {
    async fn build_client(&self, aws_auth_props: &AwsAuthProps) -> Result<Client> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let service_account = serde_json::from_str::<ServiceAccountKey>(&auth_json)
            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
        let client: Client = Client::from_service_account_key(service_account, false)
            .await
            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
        Ok(client)
    }

    async fn build_writer_client(
        &self,
        aws_auth_props: &AwsAuthProps,
    ) -> Result<(StorageWriterClient, impl Stream<Item = Result<()>> + use<>)> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let credentials_file = CredentialsFile::new_from_str(&auth_json)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        StorageWriterClient::new(credentials_file).await
    }

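    /// Load the service-account JSON, preferring the inline `bigquery.credentials` value,
    /// then a local file at `bigquery.local.path`, then an object at `bigquery.s3.path`.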
    async fn get_auth_json_from_path(&self, aws_auth_props: &AwsAuthProps) -> Result<String> {
        if let Some(credentials) = &self.credentials {
            Ok(credentials.clone())
        } else if let Some(local_path) = &self.local_path {
            std::fs::read_to_string(local_path)
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))
        } else if let Some(s3_path) = &self.s3_path {
            let url =
                Url::parse(s3_path).map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            let auth_vec = load_file_descriptor_from_s3(&url, aws_auth_props)
                .await
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            Ok(String::from_utf8(auth_vec).map_err(|e| SinkError::BigQuery(e.into()))?)
        } else {
            Err(SinkError::BigQuery(anyhow::anyhow!(
                "At least one of `bigquery.local.path` and `bigquery.s3.path` must be set."
            )))
        }
    }
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct BigQueryConfig {
    #[serde(flatten)]
    pub common: BigQueryCommon,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,
    pub r#type: String, // accept "append-only" or "upsert"
}

impl EnforceSecret for BigQueryConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        BigQueryCommon::enforce_one(prop)?;
        AwsAuthProps::enforce_one(prop)?;
        Ok(())
    }
}

impl BigQueryConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<BigQueryConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {}, or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
}

#[derive(Debug)]
pub struct BigQuerySink {
    pub config: BigQueryConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
}

impl EnforceSecret for BigQuerySink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            BigQueryConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl BigQuerySink {
    pub fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        Ok(Self {
            config,
            schema,
            pk_indices,
            is_append_only,
        })
    }
}

impl BigQuerySink {
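    /// Check that the RisingWave schema matches the existing BigQuery table: same number of
    /// columns, every column present by name, and a compatible data type for each column.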
    fn check_column_name_and_type(
        &self,
        big_query_columns_desc: HashMap<String, String>,
    ) -> Result<()> {
        let rw_fields_name = self.schema.fields();
        if big_query_columns_desc.is_empty() {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Cannot find table in bigquery"
            )));
        }
        if rw_fields_name.len().ne(&big_query_columns_desc.len()) {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "The number of RisingWave columns {} must equal the number of BigQuery columns {}",
                rw_fields_name.len(),
                big_query_columns_desc.len()
            )));
        }

        for i in rw_fields_name {
            let value = big_query_columns_desc.get(&i.name).ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Column `{:?}` on RisingWave side is not found on BigQuery side.",
                    i.name
                ))
            })?;
            let data_type_string = Self::get_string_and_check_support_from_datatype(&i.data_type)?;
            if data_type_string.ne(value) {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Data type mismatch for column `{:?}`. BigQuery side: `{:?}`, RisingWave side: `{:?}`. ",
                    i.name,
                    value,
                    data_type_string
                )));
            };
        }
        Ok(())
    }

    fn get_string_and_check_support_from_datatype(rw_data_type: &DataType) -> Result<String> {
        match rw_data_type {
            DataType::Boolean => Ok("BOOL".to_owned()),
            DataType::Int16 => Ok("INT64".to_owned()),
            DataType::Int32 => Ok("INT64".to_owned()),
            DataType::Int64 => Ok("INT64".to_owned()),
            DataType::Float32 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "Bigquery cannot support real"
            ))),
            DataType::Float64 => Ok("FLOAT64".to_owned()),
            DataType::Decimal => Ok("NUMERIC".to_owned()),
            DataType::Date => Ok("DATE".to_owned()),
            DataType::Varchar => Ok("STRING".to_owned()),
            DataType::Time => Ok("TIME".to_owned()),
            DataType::Timestamp => Ok("DATETIME".to_owned()),
            DataType::Timestamptz => Ok("TIMESTAMP".to_owned()),
            DataType::Interval => Ok("INTERVAL".to_owned()),
            DataType::Struct(structs) => {
                let mut elements_vec = vec![];
                for (name, datatype) in structs.iter() {
                    let element_string =
                        Self::get_string_and_check_support_from_datatype(datatype)?;
                    elements_vec.push(format!("{} {}", name, element_string));
                }
                Ok(format!("STRUCT<{}>", elements_vec.join(", ")))
            }
            DataType::List(l) => {
                let element_string = Self::get_string_and_check_support_from_datatype(l.as_ref())?;
                Ok(format!("ARRAY<{}>", element_string))
            }
            DataType::Bytea => Ok("BYTES".to_owned()),
            DataType::Jsonb => Ok("JSON".to_owned()),
            DataType::Serial => Ok("INT64".to_owned()),
            DataType::Int256 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "Bigquery cannot support Int256"
            ))),
            DataType::Map(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
                "Bigquery cannot support Map"
            ))),
        }
    }

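    /// Map a RisingWave field to a BigQuery `TableFieldSchema`, used when auto-creating the
    /// target table; list types are mapped to their element type with `REPEATED` mode.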
    fn map_field(rw_field: &Field) -> Result<TableFieldSchema> {
        let tfs = match &rw_field.data_type {
            DataType::Boolean => TableFieldSchema::bool(&rw_field.name),
            DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Serial => {
                TableFieldSchema::integer(&rw_field.name)
            }
            DataType::Float32 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Bigquery cannot support real"
                )));
            }
            DataType::Float64 => TableFieldSchema::float(&rw_field.name),
            DataType::Decimal => TableFieldSchema::numeric(&rw_field.name),
            DataType::Date => TableFieldSchema::date(&rw_field.name),
            DataType::Varchar => TableFieldSchema::string(&rw_field.name),
            DataType::Time => TableFieldSchema::time(&rw_field.name),
            DataType::Timestamp => TableFieldSchema::date_time(&rw_field.name),
            DataType::Timestamptz => TableFieldSchema::timestamp(&rw_field.name),
            DataType::Interval => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Bigquery cannot support Interval"
                )));
            }
            DataType::Struct(st) => {
                let mut sub_fields = Vec::with_capacity(st.len());
                for (name, dt) in st.iter() {
                    let rw_field = Field::with_name(dt.clone(), name);
                    let field = Self::map_field(&rw_field)?;
                    sub_fields.push(field);
                }
                TableFieldSchema::record(&rw_field.name, sub_fields)
            }
            DataType::List(dt) => {
                let inner_field = Self::map_field(&Field::with_name(*dt.clone(), &rw_field.name))?;
                TableFieldSchema {
                    mode: Some("REPEATED".to_owned()),
                    ..inner_field
                }
            }

            DataType::Bytea => TableFieldSchema::bytes(&rw_field.name),
            DataType::Jsonb => TableFieldSchema::json(&rw_field.name),
            DataType::Int256 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Bigquery cannot support Int256"
                )));
            }
            DataType::Map(_) => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Bigquery cannot support Map"
                )));
            }
        };
        Ok(tfs)
    }

    async fn create_table(
        &self,
        client: &Client,
        project_id: &str,
        dataset_id: &str,
        table_id: &str,
        fields: &Vec<Field>,
    ) -> Result<Table> {
        let dataset = client
            .dataset()
            .get(project_id, dataset_id)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let fields: Vec<_> = fields.iter().map(Self::map_field).collect::<Result<_>>()?;
        let table = Table::from_dataset(&dataset, table_id, TableSchema::new(fields));

        client
            .table()
            .create(table)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))
    }
}

impl Sink for BigQuerySink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = BigQueryLogSinker;

    const SINK_NAME: &'static str = BIGQUERY_SINK;

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let (writer, resp_stream) = BigQuerySinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        Ok(BigQueryLogSinker::new(
            writer,
            resp_stream,
            BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE,
        ))
    }

    async fn validate(&self) -> Result<()> {
        risingwave_common::license::Feature::BigQuerySink
            .check_available()
            .map_err(|e| anyhow::anyhow!(e))?;
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert bigquery sink (please define in `primary_key` field)"
            )));
        }
        let client = self
            .config
            .common
            .build_client(&self.config.aws_auth_props)
            .await?;
        let BigQueryCommon {
            project: project_id,
            dataset: dataset_id,
            table: table_id,
            ..
        } = &self.config.common;

        if self.config.common.auto_create {
            match client
                .table()
                .get(project_id, dataset_id, table_id, None)
                .await
            {
                Err(BQError::RequestError(_)) => {
                    // early return: no need to query schema to check column and type
                    return self
                        .create_table(
                            &client,
                            project_id,
                            dataset_id,
                            table_id,
                            &self.schema.fields,
                        )
                        .await
                        .map(|_| ());
                }
                Err(e) => return Err(SinkError::BigQuery(e.into())),
                _ => {}
            }
        }

        let mut rs = client
            .job()
            .query(
                &self.config.common.project,
                QueryRequest::new(format!(
                    "SELECT column_name, data_type FROM `{}.{}.INFORMATION_SCHEMA.COLUMNS` WHERE table_name = '{}'",
                    project_id, dataset_id, table_id,
                )),
            ).await.map_err(|e| SinkError::BigQuery(e.into()))?;

        let mut big_query_schema = HashMap::default();
        while rs.next_row() {
            big_query_schema.insert(
                rs.get_string_by_name("column_name")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find column_name"))
                    })?,
                rs.get_string_by_name("data_type")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find data_type"))
                    })?,
            );
        }

        self.check_column_name_and_type(big_query_schema)?;
        Ok(())
    }
}

pub struct BigQuerySinkWriter {
    pub config: BigQueryConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: StorageWriterClient,
    is_append_only: bool,
    row_encoder: ProtoEncoder,
    writer_pb_schema: ProtoSchema,
    #[expect(dead_code)]
    message_descriptor: MessageDescriptor,
    write_stream: String,
    proto_field: Option<FieldDescriptor>,
}

impl TryFrom<SinkParam> for BigQuerySink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = BigQueryConfig::from_btreemap(param.properties)?;
        BigQuerySink::new(
            config,
            schema,
            param.downstream_pk,
            param.sink_type.is_append_only(),
        )
    }
}

impl BigQuerySinkWriter {
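    /// Build the writer: connect to the Storage Write API, derive a protobuf descriptor from
    /// the sink schema (adding a `_CHANGE_TYPE` string field for upsert sinks), and prepare
    /// the `ProtoEncoder` used to serialize rows. Also returns the response stream that
    /// `BigQueryFutureManager` polls to acknowledge writes.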
    pub async fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let (client, resp_stream) = config
            .common
            .build_writer_client(&config.aws_auth_props)
            .await?;
        let mut descriptor_proto = build_protobuf_schema(
            schema
                .fields()
                .iter()
                .map(|f| (f.name.as_str(), &f.data_type)),
            config.common.table.clone(),
        )?;

        if !is_append_only {
            let field = FieldDescriptorProto {
                name: Some(CHANGE_TYPE.to_owned()),
                number: Some((schema.len() + 1) as i32),
                r#type: Some(field_descriptor_proto::Type::String.into()),
                ..Default::default()
            };
            descriptor_proto.field.push(field);
        }

        let descriptor_pool = build_protobuf_descriptor_pool(&descriptor_proto)?;
        let message_descriptor = descriptor_pool
            .get_message_by_name(&config.common.table)
            .ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Can't find message proto {}",
                    &config.common.table
                ))
            })?;
        let proto_field = if !is_append_only {
            let proto_field = message_descriptor
                .get_field_by_name(CHANGE_TYPE)
                .ok_or_else(|| {
                    SinkError::BigQuery(anyhow::anyhow!("Can't find {}", CHANGE_TYPE))
                })?;
            Some(proto_field)
        } else {
            None
        };
        let row_encoder = ProtoEncoder::new(
            schema.clone(),
            None,
            message_descriptor.clone(),
            ProtoHeader::None,
        )?;
        Ok((
            Self {
                write_stream: format!(
                    "projects/{}/datasets/{}/tables/{}/streams/_default",
                    config.common.project, config.common.dataset, config.common.table
                ),
                config,
                schema,
                pk_indices,
                client,
                is_append_only,
                row_encoder,
                message_descriptor,
                proto_field,
                writer_pb_schema: ProtoSchema {
                    proto_descriptor: Some(descriptor_proto),
                },
            },
            resp_stream,
        ))
    }

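    /// Append-only mode: encode only `Insert` rows; all other ops are ignored.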
    fn append_only(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op != Op::Insert {
                continue;
            }
            serialized_rows.push(self.row_encoder.encode(row)?.ser_to()?)
        }
        Ok(serialized_rows)
    }

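    /// Upsert mode: tag each row via the `_CHANGE_TYPE` column ("UPSERT" for inserts and
    /// update-inserts, "DELETE" for deletes). `UpdateDelete` rows are skipped because the
    /// paired `UpdateInsert` carries the new value for the same key.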
    fn upsert(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op == Op::UpdateDelete {
                continue;
            }
            let mut pb_row = self.row_encoder.encode(row)?;
            match op {
                Op::Insert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::Delete => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("DELETE".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::UpdateDelete => continue,
                Op::UpdateInsert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
            };

            serialized_rows.push(pb_row.ser_to()?)
        }
        Ok(serialized_rows)
    }

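    /// Encode a chunk and send it to BigQuery, splitting the serialized rows into batches of
    /// at most `MAX_ROW_SIZE` bytes, one `AppendRows` request per batch. Returns the number of
    /// requests sent, i.e. the number of responses to await before truncating this chunk.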
    fn write_chunk(&mut self, chunk: StreamChunk) -> Result<usize> {
        let serialized_rows = if self.is_append_only {
            self.append_only(chunk)?
        } else {
            self.upsert(chunk)?
        };
        if serialized_rows.is_empty() {
            return Ok(0);
        }
        let mut result = Vec::new();
        let mut result_inner = Vec::new();
        let mut size_count = 0;
        for i in serialized_rows {
            size_count += i.len();
            if size_count > MAX_ROW_SIZE {
                result.push(result_inner);
                result_inner = Vec::new();
                size_count = i.len();
            }
            result_inner.push(i);
        }
        if !result_inner.is_empty() {
            result.push(result_inner);
        }
        let len = result.len();
        for serialized_rows in result {
            let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
                writer_schema: Some(self.writer_pb_schema.clone()),
                rows: Some(ProtoRows { serialized_rows }),
            });
            self.client.append_rows(rows, self.write_stream.clone())?;
        }
        Ok(len)
    }
}

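/// Convert the gRPC `AppendRows` response stream into a `Stream` of `Result<()>`, yielding one
/// `()` per successful response and surfacing row-level or request-level errors as `SinkError`.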
#[try_stream(ok = (), error = SinkError)]
pub async fn resp_to_stream(
    resp_stream: impl Future<
        Output = std::result::Result<
            Response<google_cloud_gax::grpc::Streaming<AppendRowsResponse>>,
            Status,
        >,
    >
    + 'static
    + Send,
) {
    let mut resp_stream = resp_stream
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?
        .into_inner();
    loop {
        match resp_stream
            .message()
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?
        {
            Some(append_rows_response) => {
                if !append_rows_response.row_errors.is_empty() {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        append_rows_response.row_errors
                    )));
                }
                if let Some(
                    google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(
                        status,
                    ),
                ) = append_rows_response.response
                {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        status
                    )));
                }
                yield ();
            }
            None => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "bigquery insert error: end of resp stream",
                )));
            }
        }
    }
}

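/// Thin client over the BigQuery Storage Write API: append requests are pushed into an
/// unbounded channel that feeds the long-lived `append_rows` streaming RPC.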
struct StorageWriterClient {
    #[expect(dead_code)]
    environment: Environment,
    request_sender: mpsc::UnboundedSender<AppendRowsRequest>,
}
impl StorageWriterClient {
    pub async fn new(
        credentials: CredentialsFile,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
            Self::bigquery_grpc_auth_config(),
            Box::new(credentials),
        )
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?;
        let conn_options = ConnectionOptions {
            connect_timeout: CONNECT_TIMEOUT,
            timeout: CONNECTION_TIMEOUT,
        };
        let environment = Environment::GoogleCloud(Box::new(ts_grpc));
        let conn = ConnectionManager::new(DEFAULT_GRPC_CHANNEL_NUMS, &environment, &conn_options)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let mut client = conn.writer();

        let (tx, rx) = mpsc::unbounded_channel();
        let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);

        let resp = async move { client.append_rows(Request::new(stream)).await };
        let resp_stream = resp_to_stream(resp);

        Ok((
            StorageWriterClient {
                environment,
                request_sender: tx,
            },
            resp_stream,
        ))
    }

    pub fn append_rows(&mut self, row: AppendRowsRequestRows, write_stream: String) -> Result<()> {
        let append_req = AppendRowsRequest {
            write_stream: write_stream.clone(),
            offset: None,
            trace_id: Uuid::new_v4().hyphenated().to_string(),
            missing_value_interpretations: HashMap::default(),
            rows: Some(row),
            default_missing_value_interpretation: MissingValueInterpretation::DefaultValue as i32,
        };
        self.request_sender
            .send(append_req)
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        Ok(())
    }

    fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> {
        let mut auth_config = google_cloud_auth::project::Config::default();
        auth_config =
            auth_config.with_audience(google_cloud_bigquery::grpc::apiv1::conn_pool::AUDIENCE);
        auth_config =
            auth_config.with_scopes(&google_cloud_bigquery::grpc::apiv1::conn_pool::SCOPES);
        auth_config
    }
}

fn build_protobuf_descriptor_pool(desc: &DescriptorProto) -> Result<prost_reflect::DescriptorPool> {
    let file_descriptor = FileDescriptorProto {
        message_type: vec![desc.clone()],
        name: Some("bigquery".to_owned()),
        ..Default::default()
    };

    prost_reflect::DescriptorPool::from_file_descriptor_set(FileDescriptorSet {
        file: vec![file_descriptor],
    })
    .context("failed to build descriptor pool")
    .map_err(SinkError::BigQuery)
}

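/// Build the `DescriptorProto` describing one row message for the table, with one protobuf
/// field per RisingWave column; nested struct columns become nested message types.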
fn build_protobuf_schema<'a>(
    fields: impl Iterator<Item = (&'a str, &'a DataType)>,
    name: String,
) -> Result<DescriptorProto> {
    let mut proto = DescriptorProto {
        name: Some(name),
        ..Default::default()
    };
    let mut struct_vec = vec![];
    let field_vec = fields
        .enumerate()
        .map(|(index, (name, data_type))| {
            let (field, des_proto) =
                build_protobuf_field(data_type, (index + 1) as i32, name.to_owned())?;
            if let Some(sv) = des_proto {
                struct_vec.push(sv);
            }
            Ok(field)
        })
        .collect::<Result<Vec<_>>>()?;
    proto.field = field_vec;
    proto.nested_type = struct_vec;
    Ok(proto)
}

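/// Map a RisingWave `DataType` to a protobuf field descriptor. Lists become repeated fields of
/// their element type; structs reference a generated nested message named `Struct<field name>`.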
fn build_protobuf_field(
    data_type: &DataType,
    index: i32,
    name: String,
) -> Result<(FieldDescriptorProto, Option<DescriptorProto>)> {
    let mut field = FieldDescriptorProto {
        name: Some(name.clone()),
        number: Some(index),
        ..Default::default()
    };
    match data_type {
        DataType::Boolean => field.r#type = Some(field_descriptor_proto::Type::Bool.into()),
        DataType::Int32 => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Int16 | DataType::Int64 => {
            field.r#type = Some(field_descriptor_proto::Type::Int64.into())
        }
        DataType::Float64 => field.r#type = Some(field_descriptor_proto::Type::Double.into()),
        DataType::Decimal => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Date => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Varchar => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Time => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamp => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamptz => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Interval => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Struct(s) => {
            field.r#type = Some(field_descriptor_proto::Type::Message.into());
            let name = format!("Struct{}", name);
            let sub_proto = build_protobuf_schema(s.iter(), name.clone())?;
            field.type_name = Some(name);
            return Ok((field, Some(sub_proto)));
        }
        DataType::List(l) => {
            let (mut field, proto) = build_protobuf_field(l.as_ref(), index, name.clone())?;
            field.label = Some(field_descriptor_proto::Label::Repeated.into());
            return Ok((field, proto));
        }
        DataType::Bytea => field.r#type = Some(field_descriptor_proto::Type::Bytes.into()),
        DataType::Jsonb => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Serial => field.r#type = Some(field_descriptor_proto::Type::Int64.into()),
        DataType::Float32 | DataType::Int256 => {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Float32 and Int256 are not supported"
            )));
        }
        DataType::Map(_) => todo!(),
    }
    Ok((field, None))
}

#[cfg(test)]
mod test {

    use std::assert_matches::assert_matches;

    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::{DataType, StructType};

    use crate::sink::big_query::{
        BigQuerySink, build_protobuf_descriptor_pool, build_protobuf_schema,
    };

    #[tokio::test]
    async fn test_type_check() {
        let big_query_type_string = "ARRAY<STRUCT<v1 ARRAY<INT64>, v2 STRUCT<v1 INT64, v2 INT64>>>";
        let rw_datatype = DataType::List(Box::new(DataType::Struct(StructType::new(vec![
            ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
            (
                "v2".to_owned(),
                DataType::Struct(StructType::new(vec![
                    ("v1".to_owned(), DataType::Int64),
                    ("v2".to_owned(), DataType::Int64),
                ])),
            ),
        ]))));
        assert_eq!(
            BigQuerySink::get_string_and_check_support_from_datatype(&rw_datatype).unwrap(),
            big_query_type_string
        );
    }

    #[tokio::test]
    async fn test_schema_check() {
        let schema = Schema {
            fields: vec![
                Field::with_name(DataType::Int64, "v1"),
                Field::with_name(DataType::Float64, "v2"),
                Field::with_name(
                    DataType::List(Box::new(DataType::Struct(StructType::new(vec![
                        ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
                        (
                            "v3".to_owned(),
                            DataType::Struct(StructType::new(vec![
                                ("v1".to_owned(), DataType::Int64),
                                ("v2".to_owned(), DataType::Int64),
                            ])),
                        ),
                    ])))),
                    "v3",
                ),
            ],
        };
        let fields = schema
            .fields()
            .iter()
            .map(|f| (f.name.as_str(), &f.data_type));
        let desc = build_protobuf_schema(fields, "t1".to_owned()).unwrap();
        let pool = build_protobuf_descriptor_pool(&desc).unwrap();
        let t1_message = pool.get_message_by_name("t1").unwrap();
        assert_matches!(
            t1_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            t1_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Double
        );
        assert_matches!(
            t1_message.get_field_by_name("v3").unwrap().kind(),
            prost_reflect::Kind::Message(_)
        );

        let v3_message = pool.get_message_by_name("t1.Structv3").unwrap();
        assert_matches!(
            v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert!(v3_message.get_field_by_name("v1").unwrap().is_list());

        let v3_v3_message = pool.get_message_by_name("t1.Structv3.Structv3").unwrap();
        assert_matches!(
            v3_v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            v3_v3_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
    }
1047}