risingwave_connector/sink/big_query.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::pin::Pin;
use core::time::Duration;
use std::collections::{BTreeMap, HashMap, VecDeque};

use anyhow::{Context, anyhow};
use futures::future::pending;
use futures::prelude::Future;
use futures::{Stream, StreamExt};
use futures_async_stream::try_stream;
use gcp_bigquery_client::Client;
use gcp_bigquery_client::error::BQError;
use gcp_bigquery_client::model::query_request::QueryRequest;
use gcp_bigquery_client::model::table::Table;
use gcp_bigquery_client::model::table_field_schema::TableFieldSchema;
use gcp_bigquery_client::model::table_schema::TableSchema;
use google_cloud_bigquery::grpc::apiv1::conn_pool::{DOMAIN, WriteConnectionManager};
use google_cloud_gax::conn::{ConnectionOptions, Environment};
use google_cloud_gax::grpc::Request;
use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request::{
    MissingValueInterpretation, ProtoData, Rows as AppendRowsRequestRows,
};
use google_cloud_googleapis::cloud::bigquery::storage::v1::{
    AppendRowsRequest, AppendRowsResponse, ProtoRows, ProtoSchema,
};
use google_cloud_pubsub::client::google_cloud_auth;
use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
use prost_reflect::{FieldDescriptor, MessageDescriptor};
use prost_types::{
    DescriptorProto, FieldDescriptorProto, FileDescriptorProto, FileDescriptorSet,
    field_descriptor_proto,
};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use serde_derive::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use simd_json::prelude::ArrayTrait;
use tokio::sync::mpsc;
use tonic::{Response, Status, async_trait};
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;
use yup_oauth2::ServiceAccountKey;

use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo};
use super::log_store::{LogStoreReadItem, TruncateOffset};
use super::{
    LogSinker, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkLogReader,
};
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::connector_common::AwsAuthProps;
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam};

pub const BIGQUERY_SINK: &str = "bigquery";
pub const CHANGE_TYPE: &str = "_CHANGE_TYPE";
const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4;
const CONNECT_TIMEOUT: Option<Duration> = Some(Duration::from_secs(30));
const CONNECTION_TIMEOUT: Option<Duration> = None;
const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
// A single `AppendRows` request must stay under the 10 MB API limit, so we cap batches at 8 MB.
const MAX_ROW_SIZE: usize = 8 * 1024 * 1024;

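/// Connection and table options for the BigQuery sink, normally supplied through the sink's
/// `WITH (...)` clause. Credentials may be passed inline (`bigquery.credentials`), read from a
/// local file (`bigquery.local.path`), or fetched from S3 (`bigquery.s3.path`); at least one of
/// the three must be provided.
///
/// An illustrative set of options (values are placeholders, not a complete reference):
///
/// ```text
/// connector = 'bigquery'
/// type = 'upsert'
/// bigquery.project = 'my-project'
/// bigquery.dataset = 'my_dataset'
/// bigquery.table = 'my_table'
/// bigquery.credentials = '<service account JSON>'
/// ```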
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct BigQueryCommon {
    #[serde(rename = "bigquery.local.path")]
    pub local_path: Option<String>,
    #[serde(rename = "bigquery.s3.path")]
    pub s3_path: Option<String>,
    #[serde(rename = "bigquery.project")]
    pub project: String,
    #[serde(rename = "bigquery.dataset")]
    pub dataset: String,
    #[serde(rename = "bigquery.table")]
    pub table: String,
    #[serde(default)] // default false
    #[serde_as(as = "DisplayFromStr")]
    pub auto_create: bool,
    #[serde(rename = "bigquery.credentials")]
    pub credentials: Option<String>,
}

struct BigQueryFutureManager {
    // `offset_queue` records, for each truncate offset, how many responses we still expect
    // from `resp_stream` before that offset may be truncated.
    // When the offset is a barrier, the count is 0 and we don't wait on `resp_stream`.
    // When the offset is a chunk:
    // 1. If the chunk produced no rows to send, the count is 0 and we don't wait on `resp_stream`.
    // 2. If the serialized chunk fits within `MAX_ROW_SIZE`, we sent a single request, so the count
    //    is 1 and we wait for one response from `resp_stream`.
    // 3. If the serialized chunk exceeds `MAX_ROW_SIZE`, it was split into n requests, so the count
    //    is n and we wait for n responses from `resp_stream`.
    offset_queue: VecDeque<(TruncateOffset, usize)>,
    resp_stream: Pin<Box<dyn Stream<Item = Result<()>> + Send>>,
}
impl BigQueryFutureManager {
    pub fn new(
        max_future_num: usize,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
    ) -> Self {
        let offset_queue = VecDeque::with_capacity(max_future_num);
        Self {
            offset_queue,
            resp_stream: Box::pin(resp_stream),
        }
    }

    pub fn add_offset(&mut self, offset: TruncateOffset, resp_num: usize) {
        self.offset_queue.push_back((offset, resp_num));
    }

    pub async fn next_offset(&mut self) -> Result<TruncateOffset> {
        if let Some((_offset, remaining_resp_num)) = self.offset_queue.front_mut() {
            if *remaining_resp_num == 0 {
                return Ok(self.offset_queue.pop_front().unwrap().0);
            }
            while *remaining_resp_num > 0 {
                self.resp_stream
                    .next()
                    .await
                    .ok_or_else(|| SinkError::BigQuery(anyhow::anyhow!("end of stream")))??;
                *remaining_resp_num -= 1;
            }
            Ok(self.offset_queue.pop_front().unwrap().0)
        } else {
            pending().await
        }
    }
}
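/// Log sinker that decouples writing from acknowledgement: `writer` encodes chunks and fires
/// `AppendRows` requests, while `bigquery_future_manager` tracks how many responses are still
/// pending per offset so the log store is only truncated once BigQuery has acknowledged the data.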
pub struct BigQueryLogSinker {
    writer: BigQuerySinkWriter,
    bigquery_future_manager: BigQueryFutureManager,
    future_num: usize,
}
impl BigQueryLogSinker {
    pub fn new(
        writer: BigQuerySinkWriter,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
        future_num: usize,
    ) -> Self {
        Self {
            writer,
            bigquery_future_manager: BigQueryFutureManager::new(future_num, resp_stream),
            future_num,
        }
    }
}

#[async_trait]
impl LogSinker for BigQueryLogSinker {
    async fn consume_log_and_sink(mut self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        loop {
            tokio::select!(
                offset = self.bigquery_future_manager.next_offset() => {
                    log_reader.truncate(offset?)?;
                }
                item_result = log_reader.next_item(), if self.bigquery_future_manager.offset_queue.len() <= self.future_num => {
                    let (epoch, item) = item_result?;
                    match item {
                        LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
                            let resp_num = self.writer.write_chunk(chunk)?;
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Chunk { epoch, chunk_id }, resp_num);
                        }
                        LogStoreReadItem::Barrier { .. } => {
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Barrier { epoch }, 0);
                        }
                    }
                }
            )
        }
    }
}

impl BigQueryCommon {
    async fn build_client(&self, aws_auth_props: &AwsAuthProps) -> Result<Client> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let service_account = serde_json::from_str::<ServiceAccountKey>(&auth_json)
            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
        let client: Client = Client::from_service_account_key(service_account, false)
            .await
            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
        Ok(client)
    }

    async fn build_writer_client(
        &self,
        aws_auth_props: &AwsAuthProps,
    ) -> Result<(StorageWriterClient, impl Stream<Item = Result<()>> + use<>)> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let credentials_file = CredentialsFile::new_from_str(&auth_json)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        StorageWriterClient::new(credentials_file).await
    }

    async fn get_auth_json_from_path(&self, aws_auth_props: &AwsAuthProps) -> Result<String> {
        if let Some(credentials) = &self.credentials {
            Ok(credentials.clone())
        } else if let Some(local_path) = &self.local_path {
            std::fs::read_to_string(local_path)
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))
        } else if let Some(s3_path) = &self.s3_path {
            let url =
                Url::parse(s3_path).map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            let auth_vec = load_file_descriptor_from_s3(&url, aws_auth_props)
                .await
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            Ok(String::from_utf8(auth_vec).map_err(|e| SinkError::BigQuery(e.into()))?)
        } else {
            Err(SinkError::BigQuery(anyhow::anyhow!(
                "At least one of `bigquery.credentials`, `bigquery.local.path` or `bigquery.s3.path` must be set."
            )))
        }
    }
}

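/// Full sink configuration: the BigQuery connection options, the AWS auth options used when
/// credentials are loaded from S3, and the sink `type` (`append-only` or `upsert`).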
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct BigQueryConfig {
    #[serde(flatten)]
    pub common: BigQueryCommon,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,
    pub r#type: String, // accept "append-only" or "upsert"
}
impl BigQueryConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<BigQueryConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
}

#[derive(Debug)]
pub struct BigQuerySink {
    pub config: BigQueryConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
}

impl BigQuerySink {
    pub fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        Ok(Self {
            config,
            schema,
            pk_indices,
            is_append_only,
        })
    }
}

impl BigQuerySink {
    fn check_column_name_and_type(
        &self,
        big_query_columns_desc: HashMap<String, String>,
    ) -> Result<()> {
        let rw_fields_name = self.schema.fields();
        if big_query_columns_desc.is_empty() {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Cannot find the table in BigQuery"
            )));
        }
        if rw_fields_name.len().ne(&big_query_columns_desc.len()) {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "The number of columns in RisingWave ({}) must equal the number of columns in BigQuery ({})",
                rw_fields_name.len(),
                big_query_columns_desc.len()
            )));
        }

        for i in rw_fields_name {
            let value = big_query_columns_desc.get(&i.name).ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Column `{:?}` on RisingWave side is not found on BigQuery side.",
                    i.name
                ))
            })?;
            let data_type_string = Self::get_string_and_check_support_from_datatype(&i.data_type)?;
            if data_type_string.ne(value) {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Data type mismatch for column `{:?}`. BigQuery side: `{:?}`, RisingWave side: `{:?}`.",
                    i.name,
                    value,
                    data_type_string
                )));
            };
        }
        Ok(())
    }

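    /// Maps a RisingWave [`DataType`] to the BigQuery type name reported by
    /// `INFORMATION_SCHEMA.COLUMNS`, or returns an error for types the sink does not support
    /// (`real`/`FLOAT32`, `INT256` and `MAP`).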
    fn get_string_and_check_support_from_datatype(rw_data_type: &DataType) -> Result<String> {
        match rw_data_type {
            DataType::Boolean => Ok("BOOL".to_owned()),
            DataType::Int16 => Ok("INT64".to_owned()),
            DataType::Int32 => Ok("INT64".to_owned()),
            DataType::Int64 => Ok("INT64".to_owned()),
            DataType::Float32 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "BigQuery sink does not support real (FLOAT32)"
            ))),
            DataType::Float64 => Ok("FLOAT64".to_owned()),
            DataType::Decimal => Ok("NUMERIC".to_owned()),
            DataType::Date => Ok("DATE".to_owned()),
            DataType::Varchar => Ok("STRING".to_owned()),
            DataType::Time => Ok("TIME".to_owned()),
            DataType::Timestamp => Ok("DATETIME".to_owned()),
            DataType::Timestamptz => Ok("TIMESTAMP".to_owned()),
            DataType::Interval => Ok("INTERVAL".to_owned()),
            DataType::Struct(structs) => {
                let mut elements_vec = vec![];
                for (name, datatype) in structs.iter() {
                    let element_string =
                        Self::get_string_and_check_support_from_datatype(datatype)?;
                    elements_vec.push(format!("{} {}", name, element_string));
                }
                Ok(format!("STRUCT<{}>", elements_vec.join(", ")))
            }
            DataType::List(l) => {
                let element_string = Self::get_string_and_check_support_from_datatype(l.as_ref())?;
                Ok(format!("ARRAY<{}>", element_string))
            }
            DataType::Bytea => Ok("BYTES".to_owned()),
            DataType::Jsonb => Ok("JSON".to_owned()),
            DataType::Serial => Ok("INT64".to_owned()),
            DataType::Int256 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "BigQuery sink does not support INT256"
            ))),
            DataType::Map(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
                "BigQuery sink does not support MAP"
            ))),
        }
    }

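    /// Builds the [`TableFieldSchema`] for a RisingWave field when auto-creating the BigQuery
    /// table. Structs become RECORD fields, lists become REPEATED fields, and unsupported types
    /// (`real`, `interval`, `INT256`, `MAP`) are rejected.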
    fn map_field(rw_field: &Field) -> Result<TableFieldSchema> {
        let tfs = match &rw_field.data_type {
            DataType::Boolean => TableFieldSchema::bool(&rw_field.name),
            DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Serial => {
                TableFieldSchema::integer(&rw_field.name)
            }
            DataType::Float32 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "BigQuery sink does not support real (FLOAT32)"
                )));
            }
            DataType::Float64 => TableFieldSchema::float(&rw_field.name),
            DataType::Decimal => TableFieldSchema::numeric(&rw_field.name),
            DataType::Date => TableFieldSchema::date(&rw_field.name),
            DataType::Varchar => TableFieldSchema::string(&rw_field.name),
            DataType::Time => TableFieldSchema::time(&rw_field.name),
            DataType::Timestamp => TableFieldSchema::date_time(&rw_field.name),
            DataType::Timestamptz => TableFieldSchema::timestamp(&rw_field.name),
            DataType::Interval => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "BigQuery sink does not support INTERVAL"
                )));
            }
            DataType::Struct(st) => {
                let mut sub_fields = Vec::with_capacity(st.len());
                for (name, dt) in st.iter() {
                    let rw_field = Field::with_name(dt.clone(), name);
                    let field = Self::map_field(&rw_field)?;
                    sub_fields.push(field);
                }
                TableFieldSchema::record(&rw_field.name, sub_fields)
            }
            DataType::List(dt) => {
                let inner_field = Self::map_field(&Field::with_name(*dt.clone(), &rw_field.name))?;
                TableFieldSchema {
                    mode: Some("REPEATED".to_owned()),
                    ..inner_field
                }
            }
            DataType::Bytea => TableFieldSchema::bytes(&rw_field.name),
            DataType::Jsonb => TableFieldSchema::json(&rw_field.name),
            DataType::Int256 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "BigQuery sink does not support INT256"
                )));
            }
            DataType::Map(_) => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "BigQuery sink does not support MAP"
                )));
            }
        };
        Ok(tfs)
    }

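    /// Creates the target table in the given dataset from the RisingWave schema. Only invoked
    /// during `validate()` when `auto_create` is enabled and the table does not exist yet.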
    async fn create_table(
        &self,
        client: &Client,
        project_id: &str,
        dataset_id: &str,
        table_id: &str,
        fields: &Vec<Field>,
    ) -> Result<Table> {
        let dataset = client
            .dataset()
            .get(project_id, dataset_id)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let fields: Vec<_> = fields.iter().map(Self::map_field).collect::<Result<_>>()?;
        let table = Table::from_dataset(&dataset, table_id, TableSchema::new(fields));

        client
            .table()
            .create(table)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))
    }
}

impl Sink for BigQuerySink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = BigQueryLogSinker;

    const SINK_NAME: &'static str = BIGQUERY_SINK;

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let (writer, resp_stream) = BigQuerySinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        Ok(BigQueryLogSinker::new(
            writer,
            resp_stream,
            BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE,
        ))
    }

    async fn validate(&self) -> Result<()> {
        risingwave_common::license::Feature::BigQuerySink
            .check_available()
            .map_err(|e| anyhow::anyhow!(e))?;
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert BigQuery sink (please define it in the `primary_key` field)"
            )));
        }
        let client = self
            .config
            .common
            .build_client(&self.config.aws_auth_props)
            .await?;
        let BigQueryCommon {
            project: project_id,
            dataset: dataset_id,
            table: table_id,
            ..
        } = &self.config.common;

        if self.config.common.auto_create {
            match client
                .table()
                .get(project_id, dataset_id, table_id, None)
                .await
            {
                Err(BQError::RequestError(_)) => {
                    // early return: no need to query the schema to check column names and types
                    return self
                        .create_table(
                            &client,
                            project_id,
                            dataset_id,
                            table_id,
                            &self.schema.fields,
                        )
                        .await
                        .map(|_| ());
                }
                Err(e) => return Err(SinkError::BigQuery(e.into())),
                _ => {}
            }
        }

        let mut rs = client
            .job()
            .query(
                &self.config.common.project,
                QueryRequest::new(format!(
                    "SELECT column_name, data_type FROM `{}.{}.INFORMATION_SCHEMA.COLUMNS` WHERE table_name = '{}'",
                    project_id, dataset_id, table_id,
                )),
            ).await.map_err(|e| SinkError::BigQuery(e.into()))?;

        let mut big_query_schema = HashMap::default();
        while rs.next_row() {
            big_query_schema.insert(
                rs.get_string_by_name("column_name")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find column_name"))
                    })?,
                rs.get_string_by_name("data_type")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find data_type"))
                    })?,
            );
        }

        self.check_column_name_and_type(big_query_schema)?;
        Ok(())
    }
}

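/// Writer half of the sink: encodes stream chunks into protobuf rows (using a schema derived from
/// the RisingWave schema) and pushes `AppendRows` requests to the `_default` write stream of the
/// target table via the Storage Write API. In upsert mode an extra `_CHANGE_TYPE` column carries
/// `UPSERT`/`DELETE` markers.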
pub struct BigQuerySinkWriter {
    pub config: BigQueryConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: StorageWriterClient,
    is_append_only: bool,
    row_encoder: ProtoEncoder,
    writer_pb_schema: ProtoSchema,
    #[expect(dead_code)]
    message_descriptor: MessageDescriptor,
    write_stream: String,
    proto_field: Option<FieldDescriptor>,
}

impl TryFrom<SinkParam> for BigQuerySink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = BigQueryConfig::from_btreemap(param.properties)?;
        BigQuerySink::new(
            config,
            schema,
            param.downstream_pk,
            param.sink_type.is_append_only(),
        )
    }
}

impl BigQuerySinkWriter {
    pub async fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let (client, resp_stream) = config
            .common
            .build_writer_client(&config.aws_auth_props)
            .await?;
        let mut descriptor_proto = build_protobuf_schema(
            schema
                .fields()
                .iter()
                .map(|f| (f.name.as_str(), &f.data_type)),
            config.common.table.clone(),
        )?;

        if !is_append_only {
            let field = FieldDescriptorProto {
                name: Some(CHANGE_TYPE.to_owned()),
                number: Some((schema.len() + 1) as i32),
                r#type: Some(field_descriptor_proto::Type::String.into()),
                ..Default::default()
            };
            descriptor_proto.field.push(field);
        }

        let descriptor_pool = build_protobuf_descriptor_pool(&descriptor_proto)?;
        let message_descriptor = descriptor_pool
            .get_message_by_name(&config.common.table)
            .ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Can't find message proto {}",
                    &config.common.table
                ))
            })?;
        let proto_field = if !is_append_only {
            let proto_field = message_descriptor
                .get_field_by_name(CHANGE_TYPE)
                .ok_or_else(|| {
                    SinkError::BigQuery(anyhow::anyhow!("Can't find {}", CHANGE_TYPE))
                })?;
            Some(proto_field)
        } else {
            None
        };
        let row_encoder = ProtoEncoder::new(
            schema.clone(),
            None,
            message_descriptor.clone(),
            ProtoHeader::None,
        )?;
        Ok((
            Self {
                write_stream: format!(
                    "projects/{}/datasets/{}/tables/{}/streams/_default",
                    config.common.project, config.common.dataset, config.common.table
                ),
                config,
                schema,
                pk_indices,
                client,
                is_append_only,
                row_encoder,
                message_descriptor,
                proto_field,
                writer_pb_schema: ProtoSchema {
                    proto_descriptor: Some(descriptor_proto),
                },
            },
            resp_stream,
        ))
    }

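    /// Encodes only the `Insert` rows of the chunk for append-only mode; all other ops are
    /// skipped. Returns one serialized protobuf message per emitted row.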
    fn append_only(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op != Op::Insert {
                continue;
            }
            serialized_rows.push(self.row_encoder.encode(row)?.ser_to()?)
        }
        Ok(serialized_rows)
    }

    fn upsert(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op == Op::UpdateDelete {
                continue;
            }
            let mut pb_row = self.row_encoder.encode(row)?;
            match op {
                Op::Insert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::Delete => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("DELETE".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::UpdateDelete => continue,
                Op::UpdateInsert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
            };

            serialized_rows.push(pb_row.ser_to()?)
        }
        Ok(serialized_rows)
    }

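    /// Encodes the chunk (append-only or upsert), splits the serialized rows into batches that
    /// each stay under `MAX_ROW_SIZE`, and sends one `AppendRows` request per batch. Returns the
    /// number of requests sent, i.e. how many responses must be awaited before the chunk's
    /// offset can be truncated.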
    fn write_chunk(&mut self, chunk: StreamChunk) -> Result<usize> {
        let serialized_rows = if self.is_append_only {
            self.append_only(chunk)?
        } else {
            self.upsert(chunk)?
        };
        if serialized_rows.is_empty() {
            return Ok(0);
        }
        let mut result = Vec::new();
        let mut result_inner = Vec::new();
        let mut size_count = 0;
        for i in serialized_rows {
            size_count += i.len();
            if size_count > MAX_ROW_SIZE {
                result.push(result_inner);
                result_inner = Vec::new();
                size_count = i.len();
            }
            result_inner.push(i);
        }
        if !result_inner.is_empty() {
            result.push(result_inner);
        }
        let len = result.len();
        for serialized_rows in result {
            let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
                writer_schema: Some(self.writer_pb_schema.clone()),
                rows: Some(ProtoRows { serialized_rows }),
            });
            self.client.append_rows(rows, self.write_stream.clone())?;
        }
        Ok(len)
    }
}

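/// Adapts the bidirectional `AppendRows` gRPC call into a stream that yields one `Ok(())` per
/// successfully acknowledged request, and turns row errors, response errors and an unexpected
/// end of stream into `SinkError`s.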
#[try_stream(ok = (), error = SinkError)]
pub async fn resp_to_stream(
    resp_stream: impl Future<
        Output = std::result::Result<
            Response<google_cloud_gax::grpc::Streaming<AppendRowsResponse>>,
            Status,
        >,
    >
    + 'static
    + Send,
) {
    let mut resp_stream = resp_stream
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?
        .into_inner();
    loop {
        match resp_stream
            .message()
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?
        {
            Some(append_rows_response) => {
                if !append_rows_response.row_errors.is_empty() {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        append_rows_response.row_errors
                    )));
                }
                if let Some(google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(status)) = append_rows_response.response {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        status
                    )));
                }
                yield ();
            }
            None => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "bigquery insert error: end of resp stream",
                )));
            }
        }
    }
}

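/// Thin wrapper around the Storage Write API connection: requests are pushed into an unbounded
/// channel that feeds the gRPC `append_rows` stream, while responses are surfaced through the
/// stream returned by [`resp_to_stream`].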
struct StorageWriterClient {
    #[expect(dead_code)]
    environment: Environment,
    request_sender: mpsc::UnboundedSender<AppendRowsRequest>,
}
impl StorageWriterClient {
    pub async fn new(
        credentials: CredentialsFile,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
            Self::bigquery_grpc_auth_config(),
            Box::new(credentials),
        )
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?;
        let conn_options = ConnectionOptions {
            connect_timeout: CONNECT_TIMEOUT,
            timeout: CONNECTION_TIMEOUT,
        };
        let environment = Environment::GoogleCloud(Box::new(ts_grpc));
        let conn = WriteConnectionManager::new(
            DEFAULT_GRPC_CHANNEL_NUMS,
            &environment,
            DOMAIN,
            &conn_options,
        )
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?;
        let mut client = conn.conn();

        let (tx, rx) = mpsc::unbounded_channel();
        let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);

        let resp = async move { client.append_rows(Request::new(stream)).await };
        let resp_stream = resp_to_stream(resp);

        Ok((
            StorageWriterClient {
                environment,
                request_sender: tx,
            },
            resp_stream,
        ))
    }

    pub fn append_rows(&mut self, row: AppendRowsRequestRows, write_stream: String) -> Result<()> {
        let append_req = AppendRowsRequest {
            write_stream: write_stream.clone(),
            offset: None,
            trace_id: Uuid::new_v4().hyphenated().to_string(),
            missing_value_interpretations: HashMap::default(),
            rows: Some(row),
            default_missing_value_interpretation: MissingValueInterpretation::DefaultValue as i32,
        };
        self.request_sender
            .send(append_req)
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        Ok(())
    }

    fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> {
        let mut auth_config = google_cloud_auth::project::Config::default();
        auth_config =
            auth_config.with_audience(google_cloud_bigquery::grpc::apiv1::conn_pool::AUDIENCE);
        auth_config =
            auth_config.with_scopes(&google_cloud_bigquery::grpc::apiv1::conn_pool::SCOPES);
        auth_config
    }
}

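/// Wraps the generated [`DescriptorProto`] in a single-file descriptor set and builds a
/// `prost_reflect` descriptor pool from it, so rows can be encoded dynamically.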
fn build_protobuf_descriptor_pool(desc: &DescriptorProto) -> Result<prost_reflect::DescriptorPool> {
    let file_descriptor = FileDescriptorProto {
        message_type: vec![desc.clone()],
        name: Some("bigquery".to_owned()),
        ..Default::default()
    };

    prost_reflect::DescriptorPool::from_file_descriptor_set(FileDescriptorSet {
        file: vec![file_descriptor],
    })
    .context("failed to build descriptor pool")
    .map_err(SinkError::BigQuery)
}

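/// Builds a protobuf message descriptor named `name` with one field per `(column name, data
/// type)` pair, numbering fields from 1. Nested structs are emitted as nested message types (see
/// [`build_protobuf_field`]).
///
/// A minimal sketch of how the writer drives it (the table name here is illustrative):
///
/// ```ignore
/// let fields = schema.fields().iter().map(|f| (f.name.as_str(), &f.data_type));
/// let descriptor_proto = build_protobuf_schema(fields, "my_table".to_owned())?;
/// let descriptor_pool = build_protobuf_descriptor_pool(&descriptor_proto)?;
/// ```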
fn build_protobuf_schema<'a>(
    fields: impl Iterator<Item = (&'a str, &'a DataType)>,
    name: String,
) -> Result<DescriptorProto> {
    let mut proto = DescriptorProto {
        name: Some(name),
        ..Default::default()
    };
    let mut struct_vec = vec![];
    let field_vec = fields
        .enumerate()
        .map(|(index, (name, data_type))| {
            let (field, des_proto) =
                build_protobuf_field(data_type, (index + 1) as i32, name.to_owned())?;
            if let Some(sv) = des_proto {
                struct_vec.push(sv);
            }
            Ok(field)
        })
        .collect::<Result<Vec<_>>>()?;
    proto.field = field_vec;
    proto.nested_type = struct_vec;
    Ok(proto)
}

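/// Maps a single column to a [`FieldDescriptorProto`]. Dates map to int32; time, timestamp,
/// timestamptz, interval, decimal and JSON map to string. Structs produce an auxiliary nested
/// `Struct<name>` message, and lists reuse the element type with the `REPEATED` label.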
fn build_protobuf_field(
    data_type: &DataType,
    index: i32,
    name: String,
) -> Result<(FieldDescriptorProto, Option<DescriptorProto>)> {
    let mut field = FieldDescriptorProto {
        name: Some(name.clone()),
        number: Some(index),
        ..Default::default()
    };
    match data_type {
        DataType::Boolean => field.r#type = Some(field_descriptor_proto::Type::Bool.into()),
        DataType::Int32 => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Int16 | DataType::Int64 => {
            field.r#type = Some(field_descriptor_proto::Type::Int64.into())
        }
        DataType::Float64 => field.r#type = Some(field_descriptor_proto::Type::Double.into()),
        DataType::Decimal => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Date => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Varchar => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Time => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamp => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamptz => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Interval => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Struct(s) => {
            field.r#type = Some(field_descriptor_proto::Type::Message.into());
            let name = format!("Struct{}", name);
            let sub_proto = build_protobuf_schema(s.iter(), name.clone())?;
            field.type_name = Some(name);
            return Ok((field, Some(sub_proto)));
        }
        DataType::List(l) => {
            let (mut field, proto) = build_protobuf_field(l.as_ref(), index, name.clone())?;
            field.label = Some(field_descriptor_proto::Label::Repeated.into());
            return Ok((field, proto));
        }
        DataType::Bytea => field.r#type = Some(field_descriptor_proto::Type::Bytes.into()),
        DataType::Jsonb => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Serial => field.r#type = Some(field_descriptor_proto::Type::Int64.into()),
        DataType::Float32 | DataType::Int256 => {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "BigQuery sink does not support Float32 or Int256"
            )));
        }
        DataType::Map(_) => {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "BigQuery sink does not support Map"
            )));
        }
    }
    Ok((field, None))
}

#[cfg(test)]
mod test {

    use std::assert_matches::assert_matches;

    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::{DataType, StructType};

    use crate::sink::big_query::{
        BigQuerySink, build_protobuf_descriptor_pool, build_protobuf_schema,
    };

    #[tokio::test]
    async fn test_type_check() {
        let big_query_type_string = "ARRAY<STRUCT<v1 ARRAY<INT64>, v2 STRUCT<v1 INT64, v2 INT64>>>";
        let rw_datatype = DataType::List(Box::new(DataType::Struct(StructType::new(vec![
            ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
            (
                "v2".to_owned(),
                DataType::Struct(StructType::new(vec![
                    ("v1".to_owned(), DataType::Int64),
                    ("v2".to_owned(), DataType::Int64),
                ])),
            ),
        ]))));
        assert_eq!(
            BigQuerySink::get_string_and_check_support_from_datatype(&rw_datatype).unwrap(),
            big_query_type_string
        );
    }

    #[tokio::test]
    async fn test_schema_check() {
        let schema = Schema {
            fields: vec![
                Field::with_name(DataType::Int64, "v1"),
                Field::with_name(DataType::Float64, "v2"),
                Field::with_name(
                    DataType::List(Box::new(DataType::Struct(StructType::new(vec![
                        ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
                        (
                            "v3".to_owned(),
                            DataType::Struct(StructType::new(vec![
                                ("v1".to_owned(), DataType::Int64),
                                ("v2".to_owned(), DataType::Int64),
                            ])),
                        ),
                    ])))),
                    "v3",
                ),
            ],
        };
        let fields = schema
            .fields()
            .iter()
            .map(|f| (f.name.as_str(), &f.data_type));
        let desc = build_protobuf_schema(fields, "t1".to_owned()).unwrap();
        let pool = build_protobuf_descriptor_pool(&desc).unwrap();
        let t1_message = pool.get_message_by_name("t1").unwrap();
        assert_matches!(
            t1_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            t1_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Double
        );
        assert_matches!(
            t1_message.get_field_by_name("v3").unwrap().kind(),
            prost_reflect::Kind::Message(_)
        );

        let v3_message = pool.get_message_by_name("t1.Structv3").unwrap();
        assert_matches!(
            v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert!(v3_message.get_field_by_name("v1").unwrap().is_list());

        let v3_v3_message = pool.get_message_by_name("t1.Structv3.Structv3").unwrap();
        assert_matches!(
            v3_v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            v3_v3_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
    }
}