risingwave_connector/sink/big_query.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::pin::Pin;
16use core::time::Duration;
17use std::collections::{BTreeMap, HashMap, VecDeque};
18
19use anyhow::{Context, anyhow};
20use futures::future::pending;
21use futures::prelude::Future;
22use futures::{Stream, StreamExt};
23use futures_async_stream::try_stream;
24use gcp_bigquery_client::Client;
25use gcp_bigquery_client::error::BQError;
26use gcp_bigquery_client::model::query_request::QueryRequest;
27use gcp_bigquery_client::model::table::Table;
28use gcp_bigquery_client::model::table_field_schema::TableFieldSchema;
29use gcp_bigquery_client::model::table_schema::TableSchema;
30use google_cloud_bigquery::grpc::apiv1::conn_pool::ConnectionManager;
31use google_cloud_gax::conn::{ConnectionOptions, Environment};
32use google_cloud_gax::grpc::Request;
33use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request::{
34    MissingValueInterpretation, ProtoData, Rows as AppendRowsRequestRows,
35};
36use google_cloud_googleapis::cloud::bigquery::storage::v1::{
37    AppendRowsRequest, AppendRowsResponse, ProtoRows, ProtoSchema,
38};
39use google_cloud_pubsub::client::google_cloud_auth;
40use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
41use phf::{Set, phf_set};
42use prost_reflect::{FieldDescriptor, MessageDescriptor};
43use prost_types::{
44    DescriptorProto, FieldDescriptorProto, FileDescriptorProto, FileDescriptorSet,
45    field_descriptor_proto,
46};
47use risingwave_common::array::{Op, StreamChunk};
48use risingwave_common::catalog::{Field, Schema};
49use risingwave_common::types::DataType;
50use serde_derive::Deserialize;
51use serde_with::{DisplayFromStr, serde_as};
52use simd_json::prelude::ArrayTrait;
53use tokio::sync::mpsc;
54use tonic::{Response, Status, async_trait};
55use url::Url;
56use uuid::Uuid;
57use with_options::WithOptions;
58use yup_oauth2::ServiceAccountKey;
59
60use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo};
61use super::log_store::{LogStoreReadItem, TruncateOffset};
62use super::{
63    LogSinker, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkLogReader,
64};
65use crate::aws_utils::load_file_descriptor_from_s3;
66use crate::connector_common::AwsAuthProps;
67use crate::enforce_secret::EnforceSecret;
68use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam};
69
70pub const BIGQUERY_SINK: &str = "bigquery";
71pub const CHANGE_TYPE: &str = "_CHANGE_TYPE";
72const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4;
73const CONNECT_TIMEOUT: Option<Duration> = Some(Duration::from_secs(30));
74const CONNECTION_TIMEOUT: Option<Duration> = None;
75const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
76// Each AppendRows request must stay below 10 MB, so we cap a batch of serialized rows at 8 MB.
77const MAX_ROW_SIZE: usize = 8 * 1024 * 1024;
78
79#[serde_as]
80#[derive(Deserialize, Debug, Clone, WithOptions)]
81pub struct BigQueryCommon {
82    #[serde(rename = "bigquery.local.path")]
83    pub local_path: Option<String>,
84    #[serde(rename = "bigquery.s3.path")]
85    pub s3_path: Option<String>,
86    #[serde(rename = "bigquery.project")]
87    pub project: String,
88    #[serde(rename = "bigquery.dataset")]
89    pub dataset: String,
90    #[serde(rename = "bigquery.table")]
91    pub table: String,
92    #[serde(default)] // default false
93    #[serde_as(as = "DisplayFromStr")]
94    pub auto_create: bool,
95    #[serde(rename = "bigquery.credentials")]
96    pub credentials: Option<String>,
97}
98
99impl EnforceSecret for BigQueryCommon {
100    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
101        "bigquery.credentials",
102    };
103}
104
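// Sketch of how the manager below is meant to be driven (illustrative only; the epoch and
// chunk_id values are placeholders, and the real call sites live in `BigQueryLogSinker`):
//
//     let mut manager = BigQueryFutureManager::new(future_num, resp_stream);
//     // a chunk that was split into 2 append requests
//     manager.add_offset(TruncateOffset::Chunk { epoch, chunk_id }, 2);
//     // a barrier never waits on `resp_stream`
//     manager.add_offset(TruncateOffset::Barrier { epoch }, 0);
//     // waits for the 2 responses, then yields the chunk offset so it can be truncated
//     let offset = manager.next_offset().await?;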
105struct BigQueryFutureManager {
106    // For each pending `TruncateOffset`, `offset_queue` records how many `resp_stream` responses are still expected.
107    // When the TruncateOffset is a barrier, the count is 0 and there is nothing to wait for on `resp_stream`.
108    // When the TruncateOffset is a chunk:
109    // 1. If the chunk has no rows, nothing was sent, so the count is 0 and there is nothing to wait for.
110    // 2. If the chunk fits within `MAX_ROW_SIZE`, it was sent as one request, so the count is 1 and we wait for one response.
111    // 3. If the chunk exceeds `MAX_ROW_SIZE`, it was split into n requests, so the count is n and we wait for n responses.
112    offset_queue: VecDeque<(TruncateOffset, usize)>,
113    resp_stream: Pin<Box<dyn Stream<Item = Result<()>> + Send>>,
114}
115impl BigQueryFutureManager {
116    pub fn new(
117        max_future_num: usize,
118        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
119    ) -> Self {
120        let offset_queue = VecDeque::with_capacity(max_future_num);
121        Self {
122            offset_queue,
123            resp_stream: Box::pin(resp_stream),
124        }
125    }
126
127    pub fn add_offset(&mut self, offset: TruncateOffset, resp_num: usize) {
128        self.offset_queue.push_back((offset, resp_num));
129    }
130
131    pub async fn next_offset(&mut self) -> Result<TruncateOffset> {
132        if let Some((_offset, remaining_resp_num)) = self.offset_queue.front_mut() {
133            if *remaining_resp_num == 0 {
134                return Ok(self.offset_queue.pop_front().unwrap().0);
135            }
136            while *remaining_resp_num > 0 {
137                self.resp_stream
138                    .next()
139                    .await
140                    .ok_or_else(|| SinkError::BigQuery(anyhow::anyhow!("end of stream")))??;
141                *remaining_resp_num -= 1;
142            }
143            Ok(self.offset_queue.pop_front().unwrap().0)
144        } else {
145            pending().await
146        }
147    }
148}
149pub struct BigQueryLogSinker {
150    writer: BigQuerySinkWriter,
151    bigquery_future_manager: BigQueryFutureManager,
152    future_num: usize,
153}
154impl BigQueryLogSinker {
155    pub fn new(
156        writer: BigQuerySinkWriter,
157        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
158        future_num: usize,
159    ) -> Self {
160        Self {
161            writer,
162            bigquery_future_manager: BigQueryFutureManager::new(future_num, resp_stream),
163            future_num,
164        }
165    }
166}
167
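// The sinker below runs a `tokio::select!` loop: it truncates the log whenever the future
// manager reports a fully acknowledged offset, and only polls the next log item while the
// number of pending offsets stays within `future_num`, which bounds the in-flight requests.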
168#[async_trait]
169impl LogSinker for BigQueryLogSinker {
170    async fn consume_log_and_sink(mut self, mut log_reader: impl SinkLogReader) -> Result<!> {
171        log_reader.start_from(None).await?;
172        loop {
173            tokio::select!(
174                offset = self.bigquery_future_manager.next_offset() => {
175                        log_reader.truncate(offset?)?;
176                }
177                item_result = log_reader.next_item(), if self.bigquery_future_manager.offset_queue.len() <= self.future_num => {
178                    let (epoch, item) = item_result?;
179                    match item {
180                        LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
181                            let resp_num = self.writer.write_chunk(chunk)?;
182                            self.bigquery_future_manager
183                                .add_offset(TruncateOffset::Chunk { epoch, chunk_id },resp_num);
184                        }
185                        LogStoreReadItem::Barrier { .. } => {
186                            self.bigquery_future_manager
187                                .add_offset(TruncateOffset::Barrier { epoch },0);
188                        }
189                    }
190                }
191            )
192        }
193    }
194}
195
196impl BigQueryCommon {
197    async fn build_client(&self, aws_auth_props: &AwsAuthProps) -> Result<Client> {
198        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;
199
200        let service_account = serde_json::from_str::<ServiceAccountKey>(&auth_json)
201            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
202        let client: Client = Client::from_service_account_key(service_account, false)
203            .await
204            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
205        Ok(client)
206    }
207
208    async fn build_writer_client(
209        &self,
210        aws_auth_props: &AwsAuthProps,
211    ) -> Result<(StorageWriterClient, impl Stream<Item = Result<()>> + use<>)> {
212        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;
213
214        let credentials_file = CredentialsFile::new_from_str(&auth_json)
215            .await
216            .map_err(|e| SinkError::BigQuery(e.into()))?;
217        StorageWriterClient::new(credentials_file).await
218    }
219
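    // Credential resolution order: the inline `bigquery.credentials` value wins, then a local
    // JSON file given by `bigquery.local.path`, then an object downloaded from S3 via
    // `bigquery.s3.path`.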
220    async fn get_auth_json_from_path(&self, aws_auth_props: &AwsAuthProps) -> Result<String> {
221        if let Some(credentials) = &self.credentials {
222            Ok(credentials.clone())
223        } else if let Some(local_path) = &self.local_path {
224            std::fs::read_to_string(local_path)
225                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))
226        } else if let Some(s3_path) = &self.s3_path {
227            let url =
228                Url::parse(s3_path).map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
229            let auth_vec = load_file_descriptor_from_s3(&url, aws_auth_props)
230                .await
231                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
232            Ok(String::from_utf8(auth_vec).map_err(|e| SinkError::BigQuery(e.into()))?)
233        } else {
234            Err(SinkError::BigQuery(anyhow::anyhow!(
235                "At least one of `bigquery.credentials`, `bigquery.local.path` or `bigquery.s3.path` must be set."
236            )))
237        }
238    }
239}
240
241#[serde_as]
242#[derive(Clone, Debug, Deserialize, WithOptions)]
243pub struct BigQueryConfig {
244    #[serde(flatten)]
245    pub common: BigQueryCommon,
246    #[serde(flatten)]
247    pub aws_auth_props: AwsAuthProps,
248    pub r#type: String, // accept "append-only" or "upsert"
249}
250
251impl EnforceSecret for BigQueryConfig {
252    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
253        BigQueryCommon::enforce_one(prop)?;
254        AwsAuthProps::enforce_one(prop)?;
255        Ok(())
256    }
257}
258
259impl BigQueryConfig {
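    // A minimal sketch of the expected property map (the literal values here are hypothetical;
    // property names follow the `serde(rename)` attributes on `BigQueryCommon`):
    //
    //     let props = BTreeMap::from([
    //         ("type".to_owned(), "append-only".to_owned()),
    //         ("bigquery.project".to_owned(), "my-project".to_owned()),
    //         ("bigquery.dataset".to_owned(), "my_dataset".to_owned()),
    //         ("bigquery.table".to_owned(), "my_table".to_owned()),
    //         ("bigquery.local.path".to_owned(), "/path/to/credentials.json".to_owned()),
    //     ]);
    //     let config = BigQueryConfig::from_btreemap(props)?;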
260    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
261        let config =
262            serde_json::from_value::<BigQueryConfig>(serde_json::to_value(properties).unwrap())
263                .map_err(|e| SinkError::Config(anyhow!(e)))?;
264        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
265            return Err(SinkError::Config(anyhow!(
266                "`{}` must be {} or {}",
267                SINK_TYPE_OPTION,
268                SINK_TYPE_APPEND_ONLY,
269                SINK_TYPE_UPSERT
270            )));
271        }
272        Ok(config)
273    }
274}
275
276#[derive(Debug)]
277pub struct BigQuerySink {
278    pub config: BigQueryConfig,
279    schema: Schema,
280    pk_indices: Vec<usize>,
281    is_append_only: bool,
282}
283
284impl EnforceSecret for BigQuerySink {
285    fn enforce_secret<'a>(
286        prop_iter: impl Iterator<Item = &'a str>,
287    ) -> crate::error::ConnectorResult<()> {
288        for prop in prop_iter {
289            BigQueryConfig::enforce_one(prop)?;
290        }
291        Ok(())
292    }
293}
294
295impl BigQuerySink {
296    pub fn new(
297        config: BigQueryConfig,
298        schema: Schema,
299        pk_indices: Vec<usize>,
300        is_append_only: bool,
301    ) -> Result<Self> {
302        Ok(Self {
303            config,
304            schema,
305            pk_indices,
306            is_append_only,
307        })
308    }
309}
310
311impl BigQuerySink {
312    fn check_column_name_and_type(
313        &self,
314        big_query_columns_desc: HashMap<String, String>,
315    ) -> Result<()> {
316        let rw_fields_name = self.schema.fields();
317        if big_query_columns_desc.is_empty() {
318            return Err(SinkError::BigQuery(anyhow::anyhow!(
319                "Cannot find the table in BigQuery"
320            )));
321        }
322        if rw_fields_name.len().ne(&big_query_columns_desc.len()) {
323            return Err(SinkError::BigQuery(anyhow::anyhow!(
324                "The number of columns in RisingWave ({}) must match the number of columns in BigQuery ({})",
325                rw_fields_name.len(),
326                big_query_columns_desc.len()
327            )));
328        }
329
330        for i in rw_fields_name {
331            let value = big_query_columns_desc.get(&i.name).ok_or_else(|| {
332                SinkError::BigQuery(anyhow::anyhow!(
333                    "Column `{:?}` on RisingWave side is not found on BigQuery side.",
334                    i.name
335                ))
336            })?;
337            let data_type_string = Self::get_string_and_check_support_from_datatype(&i.data_type)?;
338            if data_type_string.ne(value) {
339                return Err(SinkError::BigQuery(anyhow::anyhow!(
340                    "Data type mismatch for column `{:?}`. BigQuery side: `{:?}`, RisingWave side: `{:?}`. ",
341                    i.name,
342                    value,
343                    data_type_string
344                )));
345            };
346        }
347        Ok(())
348    }
349
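    // Example of the mapping below (mirroring the unit test at the end of this file):
    // `DataType::List(Struct { v1: List(Int64), v2: Struct { v1: Int64, v2: Int64 } })`
    // maps to `ARRAY<STRUCT<v1 ARRAY<INT64>, v2 STRUCT<v1 INT64, v2 INT64>>>`.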
350    fn get_string_and_check_support_from_datatype(rw_data_type: &DataType) -> Result<String> {
351        match rw_data_type {
352            DataType::Boolean => Ok("BOOL".to_owned()),
353            DataType::Int16 => Ok("INT64".to_owned()),
354            DataType::Int32 => Ok("INT64".to_owned()),
355            DataType::Int64 => Ok("INT64".to_owned()),
356            DataType::Float32 => Err(SinkError::BigQuery(anyhow::anyhow!(
357                "BigQuery does not support real"
358            ))),
359            DataType::Float64 => Ok("FLOAT64".to_owned()),
360            DataType::Decimal => Ok("NUMERIC".to_owned()),
361            DataType::Date => Ok("DATE".to_owned()),
362            DataType::Varchar => Ok("STRING".to_owned()),
363            DataType::Time => Ok("TIME".to_owned()),
364            DataType::Timestamp => Ok("DATETIME".to_owned()),
365            DataType::Timestamptz => Ok("TIMESTAMP".to_owned()),
366            DataType::Interval => Ok("INTERVAL".to_owned()),
367            DataType::Struct(structs) => {
368                let mut elements_vec = vec![];
369                for (name, datatype) in structs.iter() {
370                    let element_string =
371                        Self::get_string_and_check_support_from_datatype(datatype)?;
372                    elements_vec.push(format!("{} {}", name, element_string));
373                }
374                Ok(format!("STRUCT<{}>", elements_vec.join(", ")))
375            }
376            DataType::List(l) => {
377                let element_string = Self::get_string_and_check_support_from_datatype(l.as_ref())?;
378                Ok(format!("ARRAY<{}>", element_string))
379            }
380            DataType::Bytea => Ok("BYTES".to_owned()),
381            DataType::Jsonb => Ok("JSON".to_owned()),
382            DataType::Serial => Ok("INT64".to_owned()),
383            DataType::Int256 => Err(SinkError::BigQuery(anyhow::anyhow!(
384                "BigQuery does not support Int256"
385            ))),
386            DataType::Map(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
387                "BigQuery does not support Map"
388            ))),
389            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
390        }
391    }
392
393    fn map_field(rw_field: &Field) -> Result<TableFieldSchema> {
394        let tfs = match &rw_field.data_type {
395            DataType::Boolean => TableFieldSchema::bool(&rw_field.name),
396            DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Serial => {
397                TableFieldSchema::integer(&rw_field.name)
398            }
399            DataType::Float32 => {
400                return Err(SinkError::BigQuery(anyhow::anyhow!(
401                    "BigQuery does not support real"
402                )));
403            }
404            DataType::Float64 => TableFieldSchema::float(&rw_field.name),
405            DataType::Decimal => TableFieldSchema::numeric(&rw_field.name),
406            DataType::Date => TableFieldSchema::date(&rw_field.name),
407            DataType::Varchar => TableFieldSchema::string(&rw_field.name),
408            DataType::Time => TableFieldSchema::time(&rw_field.name),
409            DataType::Timestamp => TableFieldSchema::date_time(&rw_field.name),
410            DataType::Timestamptz => TableFieldSchema::timestamp(&rw_field.name),
411            DataType::Interval => {
412                return Err(SinkError::BigQuery(anyhow::anyhow!(
413                    "BigQuery does not support Interval"
414                )));
415            }
416            DataType::Struct(st) => {
417                let mut sub_fields = Vec::with_capacity(st.len());
418                for (name, dt) in st.iter() {
419                    let rw_field = Field::with_name(dt.clone(), name);
420                    let field = Self::map_field(&rw_field)?;
421                    sub_fields.push(field);
422                }
423                TableFieldSchema::record(&rw_field.name, sub_fields)
424            }
425            DataType::List(dt) => {
426                let inner_field = Self::map_field(&Field::with_name(*dt.clone(), &rw_field.name))?;
427                TableFieldSchema {
428                    mode: Some("REPEATED".to_owned()),
429                    ..inner_field
430                }
431            }
432
433            DataType::Bytea => TableFieldSchema::bytes(&rw_field.name),
434            DataType::Jsonb => TableFieldSchema::json(&rw_field.name),
435            DataType::Int256 => {
436                return Err(SinkError::BigQuery(anyhow::anyhow!(
437                    "BigQuery does not support Int256"
438                )));
439            }
440            DataType::Map(_) => {
441                return Err(SinkError::BigQuery(anyhow::anyhow!(
442                    "BigQuery does not support Map"
443                )));
444            }
445            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
446        };
447        Ok(tfs)
448    }
449
450    async fn create_table(
451        &self,
452        client: &Client,
453        project_id: &str,
454        dataset_id: &str,
455        table_id: &str,
456        fields: &Vec<Field>,
457    ) -> Result<Table> {
458        let dataset = client
459            .dataset()
460            .get(project_id, dataset_id)
461            .await
462            .map_err(|e| SinkError::BigQuery(e.into()))?;
463        let fields: Vec<_> = fields.iter().map(Self::map_field).collect::<Result<_>>()?;
464        let table = Table::from_dataset(&dataset, table_id, TableSchema::new(fields));
465
466        client
467            .table()
468            .create(table)
469            .await
470            .map_err(|e| SinkError::BigQuery(e.into()))
471    }
472}
473
474impl Sink for BigQuerySink {
475    type Coordinator = DummySinkCommitCoordinator;
476    type LogSinker = BigQueryLogSinker;
477
478    const SINK_NAME: &'static str = BIGQUERY_SINK;
479
480    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
481        let (writer, resp_stream) = BigQuerySinkWriter::new(
482            self.config.clone(),
483            self.schema.clone(),
484            self.pk_indices.clone(),
485            self.is_append_only,
486        )
487        .await?;
488        Ok(BigQueryLogSinker::new(
489            writer,
490            resp_stream,
491            BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE,
492        ))
493    }
494
495    async fn validate(&self) -> Result<()> {
496        risingwave_common::license::Feature::BigQuerySink
497            .check_available()
498            .map_err(|e| anyhow::anyhow!(e))?;
499        if !self.is_append_only && self.pk_indices.is_empty() {
500            return Err(SinkError::Config(anyhow!(
501                "Primary key not defined for upsert bigquery sink (please define in `primary_key` field)"
502            )));
503        }
504        let client = self
505            .config
506            .common
507            .build_client(&self.config.aws_auth_props)
508            .await?;
509        let BigQueryCommon {
510            project: project_id,
511            dataset: dataset_id,
512            table: table_id,
513            ..
514        } = &self.config.common;
515
516        if self.config.common.auto_create {
517            match client
518                .table()
519                .get(project_id, dataset_id, table_id, None)
520                .await
521            {
522                Err(BQError::RequestError(_)) => {
523                    // early return: no need to query schema to check column and type
524                    return self
525                        .create_table(
526                            &client,
527                            project_id,
528                            dataset_id,
529                            table_id,
530                            &self.schema.fields,
531                        )
532                        .await
533                        .map(|_| ());
534                }
535                Err(e) => return Err(SinkError::BigQuery(e.into())),
536                _ => {}
537            }
538        }
539
540        let mut rs = client
541            .job()
542            .query(
543                &self.config.common.project,
544                QueryRequest::new(format!(
545                    "SELECT column_name, data_type FROM `{}.{}.INFORMATION_SCHEMA.COLUMNS` WHERE table_name = '{}'",
546                    project_id, dataset_id, table_id,
547                )),
548            ).await.map_err(|e| SinkError::BigQuery(e.into()))?;
549
550        let mut big_query_schema = HashMap::default();
551        while rs.next_row() {
552            big_query_schema.insert(
553                rs.get_string_by_name("column_name")
554                    .map_err(|e| SinkError::BigQuery(e.into()))?
555                    .ok_or_else(|| {
556                        SinkError::BigQuery(anyhow::anyhow!("Cannot find column_name"))
557                    })?,
558                rs.get_string_by_name("data_type")
559                    .map_err(|e| SinkError::BigQuery(e.into()))?
560                    .ok_or_else(|| {
561                        SinkError::BigQuery(anyhow::anyhow!("Cannot find data_type"))
562                    })?,
563            );
564        }
565
566        self.check_column_name_and_type(big_query_schema)?;
567        Ok(())
568    }
569}
570
571pub struct BigQuerySinkWriter {
572    pub config: BigQueryConfig,
573    #[expect(dead_code)]
574    schema: Schema,
575    #[expect(dead_code)]
576    pk_indices: Vec<usize>,
577    client: StorageWriterClient,
578    is_append_only: bool,
579    row_encoder: ProtoEncoder,
580    writer_pb_schema: ProtoSchema,
581    #[expect(dead_code)]
582    message_descriptor: MessageDescriptor,
583    write_stream: String,
584    proto_field: Option<FieldDescriptor>,
585}
586
587impl TryFrom<SinkParam> for BigQuerySink {
588    type Error = SinkError;
589
590    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
591        let schema = param.schema();
592        let config = BigQueryConfig::from_btreemap(param.properties)?;
593        BigQuerySink::new(
594            config,
595            schema,
596            param.downstream_pk,
597            param.sink_type.is_append_only(),
598        )
599    }
600}
601
602impl BigQuerySinkWriter {
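    // `new` builds a protobuf `DescriptorProto` from the RisingWave schema, appends a
    // `_CHANGE_TYPE` string field when the sink is not append-only, resolves the matching
    // `MessageDescriptor` for row encoding, and targets the table's `_default` write stream.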
603    pub async fn new(
604        config: BigQueryConfig,
605        schema: Schema,
606        pk_indices: Vec<usize>,
607        is_append_only: bool,
608    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
609        let (client, resp_stream) = config
610            .common
611            .build_writer_client(&config.aws_auth_props)
612            .await?;
613        let mut descriptor_proto = build_protobuf_schema(
614            schema
615                .fields()
616                .iter()
617                .map(|f| (f.name.as_str(), &f.data_type)),
618            config.common.table.clone(),
619        )?;
620
621        if !is_append_only {
622            let field = FieldDescriptorProto {
623                name: Some(CHANGE_TYPE.to_owned()),
624                number: Some((schema.len() + 1) as i32),
625                r#type: Some(field_descriptor_proto::Type::String.into()),
626                ..Default::default()
627            };
628            descriptor_proto.field.push(field);
629        }
630
631        let descriptor_pool = build_protobuf_descriptor_pool(&descriptor_proto)?;
632        let message_descriptor = descriptor_pool
633            .get_message_by_name(&config.common.table)
634            .ok_or_else(|| {
635                SinkError::BigQuery(anyhow::anyhow!(
636                    "Can't find message proto {}",
637                    &config.common.table
638                ))
639            })?;
640        let proto_field = if !is_append_only {
641            let proto_field = message_descriptor
642                .get_field_by_name(CHANGE_TYPE)
643                .ok_or_else(|| {
644                    SinkError::BigQuery(anyhow::anyhow!("Can't find {}", CHANGE_TYPE))
645                })?;
646            Some(proto_field)
647        } else {
648            None
649        };
650        let row_encoder = ProtoEncoder::new(
651            schema.clone(),
652            None,
653            message_descriptor.clone(),
654            ProtoHeader::None,
655        )?;
656        Ok((
657            Self {
658                write_stream: format!(
659                    "projects/{}/datasets/{}/tables/{}/streams/_default",
660                    config.common.project, config.common.dataset, config.common.table
661                ),
662                config,
663                schema,
664                pk_indices,
665                client,
666                is_append_only,
667                row_encoder,
668                message_descriptor,
669                proto_field,
670                writer_pb_schema: ProtoSchema {
671                    proto_descriptor: Some(descriptor_proto),
672                },
673            },
674            resp_stream,
675        ))
676    }
677
678    fn append_only(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
679        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
680        for (op, row) in chunk.rows() {
681            if op != Op::Insert {
682                continue;
683            }
684            serialized_rows.push(self.row_encoder.encode(row)?.ser_to()?)
685        }
686        Ok(serialized_rows)
687    }
688
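    // `_CHANGE_TYPE` values written below for the upsert sink:
    //   Op::Insert | Op::UpdateInsert => "UPSERT"
    //   Op::Delete                    => "DELETE"
    //   Op::UpdateDelete              => skipped (the paired UpdateInsert carries the new row)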
689    fn upsert(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
690        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
691        for (op, row) in chunk.rows() {
692            if op == Op::UpdateDelete {
693                continue;
694            }
695            let mut pb_row = self.row_encoder.encode(row)?;
696            match op {
697                Op::Insert => pb_row
698                    .message
699                    .try_set_field(
700                        self.proto_field.as_ref().unwrap(),
701                        prost_reflect::Value::String("UPSERT".to_owned()),
702                    )
703                    .map_err(|e| SinkError::BigQuery(e.into()))?,
704                Op::Delete => pb_row
705                    .message
706                    .try_set_field(
707                        self.proto_field.as_ref().unwrap(),
708                        prost_reflect::Value::String("DELETE".to_owned()),
709                    )
710                    .map_err(|e| SinkError::BigQuery(e.into()))?,
711                Op::UpdateDelete => continue,
712                Op::UpdateInsert => pb_row
713                    .message
714                    .try_set_field(
715                        self.proto_field.as_ref().unwrap(),
716                        prost_reflect::Value::String("UPSERT".to_owned()),
717                    )
718                    .map_err(|e| SinkError::BigQuery(e.into()))?,
719            };
720
721            serialized_rows.push(pb_row.ser_to()?)
722        }
723        Ok(serialized_rows)
724    }
725
726    fn write_chunk(&mut self, chunk: StreamChunk) -> Result<usize> {
727        let serialized_rows = if self.is_append_only {
728            self.append_only(chunk)?
729        } else {
730            self.upsert(chunk)?
731        };
732        if serialized_rows.is_empty() {
733            return Ok(0);
734        }
735        let mut result = Vec::new();
736        let mut result_inner = Vec::new();
737        let mut size_count = 0;
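        // Greedily pack rows into batches: once adding a row would push the running size over
        // `MAX_ROW_SIZE`, flush the current batch and start a new one with that row.
        // E.g. (sizes illustrative) rows of 5 MB, 4 MB and 2 MB become two requests:
        // [5 MB] and [4 MB, 2 MB].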
738        for i in serialized_rows {
739            size_count += i.len();
740            if size_count > MAX_ROW_SIZE {
741                result.push(result_inner);
742                result_inner = Vec::new();
743                size_count = i.len();
744            }
745            result_inner.push(i);
746        }
747        if !result_inner.is_empty() {
748            result.push(result_inner);
749        }
750        let len = result.len();
751        for serialized_rows in result {
752            let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
753                writer_schema: Some(self.writer_pb_schema.clone()),
754                rows: Some(ProtoRows { serialized_rows }),
755            });
756            self.client.append_rows(rows, self.write_stream.clone())?;
757        }
758        Ok(len)
759    }
760}
761
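// Adapts the gRPC `AppendRows` response stream into a `Stream<Item = Result<()>>`: every
// successful `AppendRowsResponse` yields one `()`, so `BigQueryFutureManager` can count one
// response per in-flight append request; row errors, error statuses, and an unexpected end of
// stream are surfaced as `SinkError::BigQuery`.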
762#[try_stream(ok = (), error = SinkError)]
763pub async fn resp_to_stream(
764    resp_stream: impl Future<
765        Output = std::result::Result<
766            Response<google_cloud_gax::grpc::Streaming<AppendRowsResponse>>,
767            Status,
768        >,
769    >
770    + 'static
771    + Send,
772) {
773    let mut resp_stream = resp_stream
774        .await
775        .map_err(|e| SinkError::BigQuery(e.into()))?
776        .into_inner();
777    loop {
778        match resp_stream
779            .message()
780            .await
781            .map_err(|e| SinkError::BigQuery(e.into()))?
782        {
783            Some(append_rows_response) => {
784                if !append_rows_response.row_errors.is_empty() {
785                    return Err(SinkError::BigQuery(anyhow::anyhow!(
786                        "BigQuery insert error {:?}",
787                        append_rows_response.row_errors
788                    )));
789                }
790                if let Some(google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(status)) = append_rows_response.response {
791                    return Err(SinkError::BigQuery(anyhow::anyhow!(
792                        "BigQuery insert error {:?}",
793                        status
794                    )));
795                }
796                yield ();
797            }
798            None => {
799                return Err(SinkError::BigQuery(anyhow::anyhow!(
800                    "BigQuery insert error: the response stream ended unexpectedly",
801                )));
802            }
803        }
804    }
805}
806
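// Thin wrapper over the Storage Write API connection: `new` opens a single bidirectional
// `AppendRows` call whose request stream is fed by an unbounded mpsc channel, and
// `append_rows` simply enqueues a request onto that channel. Responses flow back through the
// stream returned from `new` (built by `resp_to_stream` above).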
807struct StorageWriterClient {
808    #[expect(dead_code)]
809    environment: Environment,
810    request_sender: mpsc::UnboundedSender<AppendRowsRequest>,
811}
812impl StorageWriterClient {
813    pub async fn new(
814        credentials: CredentialsFile,
815    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
816        let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
817            Self::bigquery_grpc_auth_config(),
818            Box::new(credentials),
819        )
820        .await
821        .map_err(|e| SinkError::BigQuery(e.into()))?;
822        let conn_options = ConnectionOptions {
823            connect_timeout: CONNECT_TIMEOUT,
824            timeout: CONNECTION_TIMEOUT,
825        };
826        let environment = Environment::GoogleCloud(Box::new(ts_grpc));
827        let conn = ConnectionManager::new(DEFAULT_GRPC_CHANNEL_NUMS, &environment, &conn_options)
828            .await
829            .map_err(|e| SinkError::BigQuery(e.into()))?;
830        let mut client = conn.writer();
831
832        let (tx, rx) = mpsc::unbounded_channel();
833        let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
834
835        let resp = async move { client.append_rows(Request::new(stream)).await };
836        let resp_stream = resp_to_stream(resp);
837
838        Ok((
839            StorageWriterClient {
840                environment,
841                request_sender: tx,
842            },
843            resp_stream,
844        ))
845    }
846
847    pub fn append_rows(&mut self, row: AppendRowsRequestRows, write_stream: String) -> Result<()> {
848        let append_req = AppendRowsRequest {
849            write_stream: write_stream.clone(),
850            offset: None,
851            trace_id: Uuid::new_v4().hyphenated().to_string(),
852            missing_value_interpretations: HashMap::default(),
853            rows: Some(row),
854            default_missing_value_interpretation: MissingValueInterpretation::DefaultValue as i32,
855        };
856        self.request_sender
857            .send(append_req)
858            .map_err(|e| SinkError::BigQuery(e.into()))?;
859        Ok(())
860    }
861
862    fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> {
863        let mut auth_config = google_cloud_auth::project::Config::default();
864        auth_config =
865            auth_config.with_audience(google_cloud_bigquery::grpc::apiv1::conn_pool::AUDIENCE);
866        auth_config =
867            auth_config.with_scopes(&google_cloud_bigquery::grpc::apiv1::conn_pool::SCOPES);
868        auth_config
869    }
870}
871
872fn build_protobuf_descriptor_pool(desc: &DescriptorProto) -> Result<prost_reflect::DescriptorPool> {
873    let file_descriptor = FileDescriptorProto {
874        message_type: vec![desc.clone()],
875        name: Some("bigquery".to_owned()),
876        ..Default::default()
877    };
878
879    prost_reflect::DescriptorPool::from_file_descriptor_set(FileDescriptorSet {
880        file: vec![file_descriptor],
881    })
882    .context("failed to build descriptor pool")
883    .map_err(SinkError::BigQuery)
884}
885
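// Builds the `DescriptorProto` describing the rows sent to the Storage Write API. Nested
// structs become nested message types named `Struct{field_name}`; for example (as in the unit
// test below), a schema `t1 { v1: Int64, v3: List(Struct { .. }) }` yields a message `t1`
// whose `v3` field is a repeated message of nested type `t1.Structv3`.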
886fn build_protobuf_schema<'a>(
887    fields: impl Iterator<Item = (&'a str, &'a DataType)>,
888    name: String,
889) -> Result<DescriptorProto> {
890    let mut proto = DescriptorProto {
891        name: Some(name),
892        ..Default::default()
893    };
894    let mut struct_vec = vec![];
895    let field_vec = fields
896        .enumerate()
897        .map(|(index, (name, data_type))| {
898            let (field, des_proto) =
899                build_protobuf_field(data_type, (index + 1) as i32, name.to_owned())?;
900            if let Some(sv) = des_proto {
901                struct_vec.push(sv);
902            }
903            Ok(field)
904        })
905        .collect::<Result<Vec<_>>>()?;
906    proto.field = field_vec;
907    proto.nested_type = struct_vec;
908    Ok(proto)
909}
910
911fn build_protobuf_field(
912    data_type: &DataType,
913    index: i32,
914    name: String,
915) -> Result<(FieldDescriptorProto, Option<DescriptorProto>)> {
916    let mut field = FieldDescriptorProto {
917        name: Some(name.clone()),
918        number: Some(index),
919        ..Default::default()
920    };
921    match data_type {
922        DataType::Boolean => field.r#type = Some(field_descriptor_proto::Type::Bool.into()),
923        DataType::Int32 => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
924        DataType::Int16 | DataType::Int64 => {
925            field.r#type = Some(field_descriptor_proto::Type::Int64.into())
926        }
927        DataType::Float64 => field.r#type = Some(field_descriptor_proto::Type::Double.into()),
928        DataType::Decimal => field.r#type = Some(field_descriptor_proto::Type::String.into()),
929        DataType::Date => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
930        DataType::Varchar => field.r#type = Some(field_descriptor_proto::Type::String.into()),
931        DataType::Time => field.r#type = Some(field_descriptor_proto::Type::String.into()),
932        DataType::Timestamp => field.r#type = Some(field_descriptor_proto::Type::String.into()),
933        DataType::Timestamptz => field.r#type = Some(field_descriptor_proto::Type::String.into()),
934        DataType::Interval => field.r#type = Some(field_descriptor_proto::Type::String.into()),
935        DataType::Struct(s) => {
936            field.r#type = Some(field_descriptor_proto::Type::Message.into());
937            let name = format!("Struct{}", name);
938            let sub_proto = build_protobuf_schema(s.iter(), name.clone())?;
939            field.type_name = Some(name);
940            return Ok((field, Some(sub_proto)));
941        }
942        DataType::List(l) => {
943            let (mut field, proto) = build_protobuf_field(l.as_ref(), index, name.clone())?;
944            field.label = Some(field_descriptor_proto::Label::Repeated.into());
945            return Ok((field, proto));
946        }
947        DataType::Bytea => field.r#type = Some(field_descriptor_proto::Type::Bytes.into()),
948        DataType::Jsonb => field.r#type = Some(field_descriptor_proto::Type::String.into()),
949        DataType::Serial => field.r#type = Some(field_descriptor_proto::Type::Int64.into()),
950        DataType::Float32 | DataType::Int256 => {
951            return Err(SinkError::BigQuery(anyhow::anyhow!(
952                "BigQuery does not support Float32 or Int256"
953            )));
954        }
955        DataType::Map(_) => todo!(),
956        DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
957    }
958    Ok((field, None))
959}
960
961#[cfg(test)]
962mod test {
963
964    use std::assert_matches::assert_matches;
965
966    use risingwave_common::catalog::{Field, Schema};
967    use risingwave_common::types::{DataType, StructType};
968
969    use crate::sink::big_query::{
970        BigQuerySink, build_protobuf_descriptor_pool, build_protobuf_schema,
971    };
972
973    #[tokio::test]
974    async fn test_type_check() {
975        let big_query_type_string = "ARRAY<STRUCT<v1 ARRAY<INT64>, v2 STRUCT<v1 INT64, v2 INT64>>>";
976        let rw_datatype = DataType::List(Box::new(DataType::Struct(StructType::new(vec![
977            ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
978            (
979                "v2".to_owned(),
980                DataType::Struct(StructType::new(vec![
981                    ("v1".to_owned(), DataType::Int64),
982                    ("v2".to_owned(), DataType::Int64),
983                ])),
984            ),
985        ]))));
986        assert_eq!(
987            BigQuerySink::get_string_and_check_support_from_datatype(&rw_datatype).unwrap(),
988            big_query_type_string
989        );
990    }
991
992    #[tokio::test]
993    async fn test_schema_check() {
994        let schema = Schema {
995            fields: vec![
996                Field::with_name(DataType::Int64, "v1"),
997                Field::with_name(DataType::Float64, "v2"),
998                Field::with_name(
999                    DataType::List(Box::new(DataType::Struct(StructType::new(vec![
1000                        ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
1001                        (
1002                            "v3".to_owned(),
1003                            DataType::Struct(StructType::new(vec![
1004                                ("v1".to_owned(), DataType::Int64),
1005                                ("v2".to_owned(), DataType::Int64),
1006                            ])),
1007                        ),
1008                    ])))),
1009                    "v3",
1010                ),
1011            ],
1012        };
1013        let fields = schema
1014            .fields()
1015            .iter()
1016            .map(|f| (f.name.as_str(), &f.data_type));
1017        let desc = build_protobuf_schema(fields, "t1".to_owned()).unwrap();
1018        let pool = build_protobuf_descriptor_pool(&desc).unwrap();
1019        let t1_message = pool.get_message_by_name("t1").unwrap();
1020        assert_matches!(
1021            t1_message.get_field_by_name("v1").unwrap().kind(),
1022            prost_reflect::Kind::Int64
1023        );
1024        assert_matches!(
1025            t1_message.get_field_by_name("v2").unwrap().kind(),
1026            prost_reflect::Kind::Double
1027        );
1028        assert_matches!(
1029            t1_message.get_field_by_name("v3").unwrap().kind(),
1030            prost_reflect::Kind::Message(_)
1031        );
1032
1033        let v3_message = pool.get_message_by_name("t1.Structv3").unwrap();
1034        assert_matches!(
1035            v3_message.get_field_by_name("v1").unwrap().kind(),
1036            prost_reflect::Kind::Int64
1037        );
1038        assert!(v3_message.get_field_by_name("v1").unwrap().is_list());
1039
1040        let v3_v3_message = pool.get_message_by_name("t1.Structv3.Structv3").unwrap();
1041        assert_matches!(
1042            v3_v3_message.get_field_by_name("v1").unwrap().kind(),
1043            prost_reflect::Kind::Int64
1044        );
1045        assert_matches!(
1046            v3_v3_message.get_field_by_name("v2").unwrap().kind(),
1047            prost_reflect::Kind::Int64
1048        );
1049    }
1050}