use core::pin::Pin;
use core::time::Duration;
use std::collections::{BTreeMap, HashMap, VecDeque};

use anyhow::{Context, anyhow};
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use futures::future::pending;
use futures::prelude::Future;
use futures::{Stream, StreamExt};
use futures_async_stream::try_stream;
use gcp_bigquery_client::Client;
use gcp_bigquery_client::error::BQError;
use gcp_bigquery_client::model::query_request::QueryRequest;
use gcp_bigquery_client::model::table::Table;
use gcp_bigquery_client::model::table_field_schema::TableFieldSchema;
use gcp_bigquery_client::model::table_schema::TableSchema;
use google_cloud_bigquery::grpc::apiv1::conn_pool::ConnectionManager;
use google_cloud_gax::conn::{ConnectionOptions, Environment};
use google_cloud_gax::grpc::Request;
use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request::{
    MissingValueInterpretation, ProtoData, Rows as AppendRowsRequestRows,
};
use google_cloud_googleapis::cloud::bigquery::storage::v1::{
    AppendRowsRequest, AppendRowsResponse, ProtoRows, ProtoSchema,
};
use google_cloud_pubsub::client::google_cloud_auth;
use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
use phf::{Set, phf_set};
use prost_reflect::{FieldDescriptor, MessageDescriptor};
use prost_types::{
    DescriptorProto, FieldDescriptorProto, FileDescriptorProto, FileDescriptorSet,
    field_descriptor_proto,
};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use serde::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use simd_json::prelude::ArrayTrait;
use tokio::sync::mpsc;
use tonic::{Response, Status, async_trait};
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;
use yup_oauth2::ServiceAccountKey;

use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo};
use super::log_store::{LogStoreReadItem, TruncateOffset};
use super::{
    LogSinker, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkLogReader,
};
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::connector_common::AwsAuthProps;
use crate::enforce_secret::EnforceSecret;
use crate::sink::{Result, Sink, SinkParam, SinkWriterParam};

pub const BIGQUERY_SINK: &str = "bigquery";
pub const CHANGE_TYPE: &str = "_CHANGE_TYPE";
const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4;
const CONNECT_TIMEOUT: Option<Duration> = Some(Duration::from_secs(30));
const CONNECTION_TIMEOUT: Option<Duration> = None;
const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
const MAX_ROW_SIZE: usize = 8 * 1024 * 1024;

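/// Connection and table options for the BigQuery sink, passed as `bigquery.*` properties.
/// Credentials can be supplied inline (`bigquery.credentials`), from a local file, or from S3,
/// either as raw JSON or base64-encoded JSON.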
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct BigQueryCommon {
    #[serde(rename = "bigquery.local.path")]
    pub local_path: Option<String>,
    #[serde(rename = "bigquery.s3.path")]
    pub s3_path: Option<String>,
    #[serde(rename = "bigquery.project")]
    pub project: String,
    #[serde(rename = "bigquery.dataset")]
    pub dataset: String,
    #[serde(rename = "bigquery.table")]
    pub table: String,
    #[serde(default)]
    #[serde_as(as = "DisplayFromStr")]
    pub auto_create: bool,
    #[serde(rename = "bigquery.credentials")]
    pub credentials: Option<String>,
}

impl EnforceSecret for BigQueryCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "bigquery.credentials",
    };
}

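/// Tracks how many AppendRows responses are still outstanding for each log-store offset.
/// `next_offset` waits for those responses before releasing the offset for truncation.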
struct BigQueryFutureManager {
    offset_queue: VecDeque<(TruncateOffset, usize)>,
    resp_stream: Pin<Box<dyn Stream<Item = Result<()>> + Send>>,
}
impl BigQueryFutureManager {
    pub fn new(
        max_future_num: usize,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
    ) -> Self {
        let offset_queue = VecDeque::with_capacity(max_future_num);
        Self {
            offset_queue,
            resp_stream: Box::pin(resp_stream),
        }
    }

    pub fn add_offset(&mut self, offset: TruncateOffset, resp_num: usize) {
        self.offset_queue.push_back((offset, resp_num));
    }

    pub async fn next_offset(&mut self) -> Result<TruncateOffset> {
        if let Some((_offset, remaining_resp_num)) = self.offset_queue.front_mut() {
            if *remaining_resp_num == 0 {
                return Ok(self.offset_queue.pop_front().unwrap().0);
            }
            while *remaining_resp_num > 0 {
                self.resp_stream
                    .next()
                    .await
                    .ok_or_else(|| SinkError::BigQuery(anyhow::anyhow!("end of stream")))??;
                *remaining_resp_num -= 1;
            }
            Ok(self.offset_queue.pop_front().unwrap().0)
        } else {
            pending().await
        }
    }
}
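/// Log sinker that writes chunks through `BigQuerySinkWriter` and truncates the log store
/// once every AppendRows response for an offset has been received.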
pub struct BigQueryLogSinker {
    writer: BigQuerySinkWriter,
    bigquery_future_manager: BigQueryFutureManager,
    future_num: usize,
}
impl BigQueryLogSinker {
    pub fn new(
        writer: BigQuerySinkWriter,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
        future_num: usize,
    ) -> Self {
        Self {
            writer,
            bigquery_future_manager: BigQueryFutureManager::new(future_num, resp_stream),
            future_num,
        }
    }
}

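// Drive writing and truncation concurrently: new items are only pulled from the log reader
// while the number of pending offsets does not exceed `future_num`.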
#[async_trait]
impl LogSinker for BigQueryLogSinker {
    async fn consume_log_and_sink(mut self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        loop {
            tokio::select!(
                offset = self.bigquery_future_manager.next_offset() => {
                    log_reader.truncate(offset?)?;
                }
                item_result = log_reader.next_item(), if self.bigquery_future_manager.offset_queue.len() <= self.future_num => {
                    let (epoch, item) = item_result?;
                    match item {
                        LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
                            let resp_num = self.writer.write_chunk(chunk)?;
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Chunk { epoch, chunk_id }, resp_num);
                        }
                        LogStoreReadItem::Barrier { .. } => {
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Barrier { epoch }, 0);
                        }
                    }
                }
            )
        }
    }
}

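// Credentials are resolved in order: inline `bigquery.credentials`, then `bigquery.local.path`,
// then `bigquery.s3.path`; the JSON may be provided raw or base64-encoded.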
impl BigQueryCommon {
    async fn build_client(&self, aws_auth_props: &AwsAuthProps) -> Result<Client> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let service_account =
            if let Ok(auth_json_from_base64) = BASE64_STANDARD.decode(auth_json.clone()) {
                serde_json::from_slice::<ServiceAccountKey>(&auth_json_from_base64)
            } else {
                serde_json::from_str::<ServiceAccountKey>(&auth_json)
            }
            .map_err(|e| SinkError::BigQuery(e.into()))?;

        let client: Client = Client::from_service_account_key(service_account, false)
            .await
            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
        Ok(client)
    }

    async fn build_writer_client(
        &self,
        aws_auth_props: &AwsAuthProps,
    ) -> Result<(StorageWriterClient, impl Stream<Item = Result<()>> + use<>)> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let credentials_file =
            if let Ok(auth_json_from_base64) = BASE64_STANDARD.decode(auth_json.clone()) {
                serde_json::from_slice::<CredentialsFile>(&auth_json_from_base64)
            } else {
                serde_json::from_str::<CredentialsFile>(&auth_json)
            }
            .map_err(|e| SinkError::BigQuery(e.into()))?;

        StorageWriterClient::new(credentials_file).await
    }

    async fn get_auth_json_from_path(&self, aws_auth_props: &AwsAuthProps) -> Result<String> {
        if let Some(credentials) = &self.credentials {
            Ok(credentials.clone())
        } else if let Some(local_path) = &self.local_path {
            std::fs::read_to_string(local_path)
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))
        } else if let Some(s3_path) = &self.s3_path {
            let url =
                Url::parse(s3_path).map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            let auth_vec = load_file_descriptor_from_s3(&url, aws_auth_props)
                .await
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            Ok(String::from_utf8(auth_vec).map_err(|e| SinkError::BigQuery(e.into()))?)
        } else {
            Err(SinkError::BigQuery(anyhow::anyhow!(
                "At least one of `bigquery.credentials`, `bigquery.local.path`, or `bigquery.s3.path` must be set."
            )))
        }
    }
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct BigQueryConfig {
    #[serde(flatten)]
    pub common: BigQueryCommon,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,
    pub r#type: String,
}

impl EnforceSecret for BigQueryConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        BigQueryCommon::enforce_one(prop)?;
        AwsAuthProps::enforce_one(prop)?;
        Ok(())
    }
}

impl BigQueryConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<BigQueryConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
}

#[derive(Debug)]
pub struct BigQuerySink {
    pub config: BigQueryConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
}

impl EnforceSecret for BigQuerySink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            BigQueryConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl BigQuerySink {
    pub fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        Ok(Self {
            config,
            schema,
            pk_indices,
            is_append_only,
        })
    }
}

impl BigQuerySink {
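    /// Checks that every RisingWave column exists in the BigQuery table with a matching type,
    /// using the column metadata fetched from `INFORMATION_SCHEMA.COLUMNS` during `validate`.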
    fn check_column_name_and_type(
        &self,
        big_query_columns_desc: HashMap<String, String>,
    ) -> Result<()> {
        let rw_fields_name = self.schema.fields();
        if big_query_columns_desc.is_empty() {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Cannot find table in bigquery"
            )));
        }
        if rw_fields_name.len().ne(&big_query_columns_desc.len()) {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "The number of RisingWave columns ({}) must equal the number of BigQuery columns ({})",
                rw_fields_name.len(),
                big_query_columns_desc.len()
            )));
        }

        for i in rw_fields_name {
            let value = big_query_columns_desc.get(&i.name).ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Column `{:?}` on RisingWave side is not found on BigQuery side.",
                    i.name
                ))
            })?;
            let data_type_string = Self::get_string_and_check_support_from_datatype(&i.data_type)?;
            if data_type_string.ne(value) {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Data type mismatch for column `{:?}`. BigQuery side: `{:?}`, RisingWave side: `{:?}`.",
                    i.name,
                    value,
                    data_type_string
                )));
            };
        }
        Ok(())
    }

    fn get_string_and_check_support_from_datatype(rw_data_type: &DataType) -> Result<String> {
        match rw_data_type {
            DataType::Boolean => Ok("BOOL".to_owned()),
            DataType::Int16 => Ok("INT64".to_owned()),
            DataType::Int32 => Ok("INT64".to_owned()),
            DataType::Int64 => Ok("INT64".to_owned()),
            DataType::Float32 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "REAL is not supported for BigQuery sink. Please convert to FLOAT64 or other supported types."
            ))),
            DataType::Float64 => Ok("FLOAT64".to_owned()),
            DataType::Decimal => Ok("NUMERIC".to_owned()),
            DataType::Date => Ok("DATE".to_owned()),
            DataType::Varchar => Ok("STRING".to_owned()),
            DataType::Time => Ok("TIME".to_owned()),
            DataType::Timestamp => Ok("DATETIME".to_owned()),
            DataType::Timestamptz => Ok("TIMESTAMP".to_owned()),
            DataType::Interval => Ok("INTERVAL".to_owned()),
            DataType::Struct(structs) => {
                let mut elements_vec = vec![];
                for (name, datatype) in structs.iter() {
                    let element_string =
                        Self::get_string_and_check_support_from_datatype(datatype)?;
                    elements_vec.push(format!("{} {}", name, element_string));
                }
                Ok(format!("STRUCT<{}>", elements_vec.join(", ")))
            }
            DataType::List(l) => {
                let element_string = Self::get_string_and_check_support_from_datatype(l.elem())?;
                Ok(format!("ARRAY<{}>", element_string))
            }
            DataType::Bytea => Ok("BYTES".to_owned()),
            DataType::Jsonb => Ok("JSON".to_owned()),
            DataType::Serial => Ok("INT64".to_owned()),
            DataType::Int256 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "INT256 is not supported for BigQuery sink."
            ))),
            DataType::Map(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
                "MAP is not supported for BigQuery sink."
            ))),
            DataType::Vector(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
                "VECTOR is not supported for BigQuery sink."
            ))),
        }
    }

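    /// Maps a RisingWave field to the `TableFieldSchema` used when auto-creating the table;
    /// struct fields become RECORD columns and list fields become REPEATED columns.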
    fn map_field(rw_field: &Field) -> Result<TableFieldSchema> {
        let tfs = match &rw_field.data_type {
            DataType::Boolean => TableFieldSchema::bool(&rw_field.name),
            DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Serial => {
                TableFieldSchema::integer(&rw_field.name)
            }
            DataType::Float32 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "REAL is not supported for BigQuery sink. Please convert to FLOAT64 or other supported types."
                )));
            }
            DataType::Float64 => TableFieldSchema::float(&rw_field.name),
            DataType::Decimal => TableFieldSchema::numeric(&rw_field.name),
            DataType::Date => TableFieldSchema::date(&rw_field.name),
            DataType::Varchar => TableFieldSchema::string(&rw_field.name),
            DataType::Time => TableFieldSchema::time(&rw_field.name),
            DataType::Timestamp => TableFieldSchema::date_time(&rw_field.name),
            DataType::Timestamptz => TableFieldSchema::timestamp(&rw_field.name),
            DataType::Interval => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "INTERVAL is not supported for BigQuery sink. Please convert to VARCHAR or other supported types."
                )));
            }
            DataType::Struct(st) => {
                let mut sub_fields = Vec::with_capacity(st.len());
                for (name, dt) in st.iter() {
                    let rw_field = Field::with_name(dt.clone(), name);
                    let field = Self::map_field(&rw_field)?;
                    sub_fields.push(field);
                }
                TableFieldSchema::record(&rw_field.name, sub_fields)
            }
            DataType::List(lt) => {
                let inner_field =
                    Self::map_field(&Field::with_name(lt.elem().clone(), &rw_field.name))?;
                TableFieldSchema {
                    mode: Some("REPEATED".to_owned()),
                    ..inner_field
                }
            }
            DataType::Bytea => TableFieldSchema::bytes(&rw_field.name),
            DataType::Jsonb => TableFieldSchema::json(&rw_field.name),
            DataType::Int256 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "INT256 is not supported for BigQuery sink."
                )));
            }
            DataType::Map(_) => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "MAP is not supported for BigQuery sink."
                )));
            }
            DataType::Vector(_) => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "VECTOR is not supported for BigQuery sink."
                )));
            }
        };
        Ok(tfs)
    }

    async fn create_table(
        &self,
        client: &Client,
        project_id: &str,
        dataset_id: &str,
        table_id: &str,
        fields: &Vec<Field>,
    ) -> Result<Table> {
        let dataset = client
            .dataset()
            .get(project_id, dataset_id)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let fields: Vec<_> = fields.iter().map(Self::map_field).collect::<Result<_>>()?;
        let table = Table::from_dataset(&dataset, table_id, TableSchema::new(fields));

        client
            .table()
            .create(table)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))
    }
}

impl Sink for BigQuerySink {
    type LogSinker = BigQueryLogSinker;

    const SINK_NAME: &'static str = BIGQUERY_SINK;

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let (writer, resp_stream) = BigQuerySinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        Ok(BigQueryLogSinker::new(
            writer,
            resp_stream,
            BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE,
        ))
    }

    async fn validate(&self) -> Result<()> {
        risingwave_common::license::Feature::BigQuerySink
            .check_available()
            .map_err(|e| anyhow::anyhow!(e))?;
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert bigquery sink (please define in `primary_key` field)"
            )));
        }
        let client = self
            .config
            .common
            .build_client(&self.config.aws_auth_props)
            .await?;
        let BigQueryCommon {
            project: project_id,
            dataset: dataset_id,
            table: table_id,
            ..
        } = &self.config.common;

        if self.config.common.auto_create {
            match client
                .table()
                .get(project_id, dataset_id, table_id, None)
                .await
            {
                Err(BQError::RequestError(_)) => {
                    // Table does not exist yet: create it from the sink schema and skip the column check.
                    return self
                        .create_table(
                            &client,
                            project_id,
                            dataset_id,
                            table_id,
                            &self.schema.fields,
                        )
                        .await
                        .map(|_| ());
                }
                Err(e) => return Err(SinkError::BigQuery(e.into())),
                _ => {}
            }
        }

        let mut rs = client
            .job()
            .query(
                &self.config.common.project,
                QueryRequest::new(format!(
                    "SELECT column_name, data_type FROM `{}.{}.INFORMATION_SCHEMA.COLUMNS` WHERE table_name = '{}'",
                    project_id, dataset_id, table_id,
                )),
            )
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;

        let mut big_query_schema = HashMap::default();
        while rs.next_row() {
            big_query_schema.insert(
                rs.get_string_by_name("column_name")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find column_name"))
                    })?,
                rs.get_string_by_name("data_type")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find data_type"))
                    })?,
            );
        }

        self.check_column_name_and_type(big_query_schema)?;
        Ok(())
    }
}

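/// Writer that encodes stream chunks as protobuf rows and appends them to the table's
/// `_default` stream through the BigQuery Storage Write API.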
pub struct BigQuerySinkWriter {
    pub config: BigQueryConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: StorageWriterClient,
    is_append_only: bool,
    row_encoder: ProtoEncoder,
    writer_pb_schema: ProtoSchema,
    #[expect(dead_code)]
    message_descriptor: MessageDescriptor,
    write_stream: String,
    proto_field: Option<FieldDescriptor>,
}

impl TryFrom<SinkParam> for BigQuerySink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let pk_indices = param.downstream_pk_or_empty();
        let config = BigQueryConfig::from_btreemap(param.properties)?;
        BigQuerySink::new(config, schema, pk_indices, param.sink_type.is_append_only())
    }
}

impl BigQuerySinkWriter {
    pub async fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let (client, resp_stream) = config
            .common
            .build_writer_client(&config.aws_auth_props)
            .await?;
        let mut descriptor_proto = build_protobuf_schema(
            schema
                .fields()
                .iter()
                .map(|f| (f.name.as_str(), &f.data_type)),
            config.common.table.clone(),
        )?;

        if !is_append_only {
            let field = FieldDescriptorProto {
                name: Some(CHANGE_TYPE.to_owned()),
                number: Some((schema.len() + 1) as i32),
                r#type: Some(field_descriptor_proto::Type::String.into()),
                ..Default::default()
            };
            descriptor_proto.field.push(field);
        }

        let descriptor_pool = build_protobuf_descriptor_pool(&descriptor_proto)?;
        let message_descriptor = descriptor_pool
            .get_message_by_name(&config.common.table)
            .ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Can't find message proto {}",
                    &config.common.table
                ))
            })?;
        let proto_field = if !is_append_only {
            let proto_field = message_descriptor
                .get_field_by_name(CHANGE_TYPE)
                .ok_or_else(|| {
                    SinkError::BigQuery(anyhow::anyhow!("Can't find {}", CHANGE_TYPE))
                })?;
            Some(proto_field)
        } else {
            None
        };
        let row_encoder = ProtoEncoder::new(
            schema.clone(),
            None,
            message_descriptor.clone(),
            ProtoHeader::None,
        )?;
        Ok((
            Self {
                write_stream: format!(
                    "projects/{}/datasets/{}/tables/{}/streams/_default",
                    config.common.project, config.common.dataset, config.common.table
                ),
                config,
                schema,
                pk_indices,
                client,
                is_append_only,
                row_encoder,
                message_descriptor,
                proto_field,
                writer_pb_schema: ProtoSchema {
                    proto_descriptor: Some(descriptor_proto),
                },
            },
            resp_stream,
        ))
    }

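    /// Append-only mode: encode `Insert` rows only; all other ops are ignored.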
    fn append_only(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op != Op::Insert {
                continue;
            }
            serialized_rows.push(self.row_encoder.encode(row)?.ser_to()?)
        }
        Ok(serialized_rows)
    }

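    /// Upsert mode: encode rows with the `_CHANGE_TYPE` column set to `UPSERT` or `DELETE`.
    /// `UpdateDelete` rows are skipped because the following `UpdateInsert` carries the new value.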
    fn upsert(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op == Op::UpdateDelete {
                continue;
            }
            let mut pb_row = self.row_encoder.encode(row)?;
            match op {
                Op::Insert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::Delete => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("DELETE".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::UpdateDelete => continue,
                Op::UpdateInsert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
            };

            serialized_rows.push(pb_row.ser_to()?)
        }
        Ok(serialized_rows)
    }

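    /// Serializes the chunk and sends it as one or more AppendRows requests, splitting the rows
    /// so each request stays under `MAX_ROW_SIZE`. Returns the number of requests sent, i.e. the
    /// number of responses the future manager must await for this chunk.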
    fn write_chunk(&mut self, chunk: StreamChunk) -> Result<usize> {
        let serialized_rows = if self.is_append_only {
            self.append_only(chunk)?
        } else {
            self.upsert(chunk)?
        };
        if serialized_rows.is_empty() {
            return Ok(0);
        }
        let mut result = Vec::new();
        let mut result_inner = Vec::new();
        let mut size_count = 0;
        for i in serialized_rows {
            size_count += i.len();
            if size_count > MAX_ROW_SIZE {
                result.push(result_inner);
                result_inner = Vec::new();
                size_count = i.len();
            }
            result_inner.push(i);
        }
        if !result_inner.is_empty() {
            result.push(result_inner);
        }
        let len = result.len();
        for serialized_rows in result {
            let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
                writer_schema: Some(self.writer_pb_schema.clone()),
                rows: Some(ProtoRows { serialized_rows }),
            });
            self.client.append_rows(rows, self.write_stream.clone())?;
        }
        Ok(len)
    }
}

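/// Adapts the gRPC AppendRows response stream into a `Stream<Item = Result<()>>`: each successful
/// response yields one unit, while row-level or stream-level errors terminate the stream with an error.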
#[try_stream(ok = (), error = SinkError)]
pub async fn resp_to_stream(
    resp_stream: impl Future<
        Output = std::result::Result<
            Response<google_cloud_gax::grpc::Streaming<AppendRowsResponse>>,
            Status,
        >,
    >
    + 'static
    + Send,
) {
    let mut resp_stream = resp_stream
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?
        .into_inner();
    loop {
        match resp_stream
            .message()
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?
        {
            Some(append_rows_response) => {
                if !append_rows_response.row_errors.is_empty() {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        append_rows_response.row_errors
                    )));
                }
                if let Some(google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(status)) = append_rows_response.response {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        status
                    )));
                }
                yield ();
            }
            None => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "bigquery insert error: end of resp stream",
                )));
            }
        }
    }
}

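/// Thin wrapper around the Storage Write API connection: `append_rows` pushes requests onto an
/// unbounded channel that feeds the long-lived AppendRows gRPC request stream opened in `new`.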
struct StorageWriterClient {
    #[expect(dead_code)]
    environment: Environment,
    request_sender: mpsc::UnboundedSender<AppendRowsRequest>,
}
impl StorageWriterClient {
    pub async fn new(
        credentials: CredentialsFile,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
            Self::bigquery_grpc_auth_config(),
            Box::new(credentials),
        )
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?;
        let conn_options = ConnectionOptions {
            connect_timeout: CONNECT_TIMEOUT,
            timeout: CONNECTION_TIMEOUT,
        };
        let environment = Environment::GoogleCloud(Box::new(ts_grpc));
        let conn = ConnectionManager::new(DEFAULT_GRPC_CHANNEL_NUMS, &environment, &conn_options)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let mut client = conn.writer();

        let (tx, rx) = mpsc::unbounded_channel();
        let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);

        let resp = async move { client.append_rows(Request::new(stream)).await };
        let resp_stream = resp_to_stream(resp);

        Ok((
            StorageWriterClient {
                environment,
                request_sender: tx,
            },
            resp_stream,
        ))
    }

    pub fn append_rows(&mut self, row: AppendRowsRequestRows, write_stream: String) -> Result<()> {
        let append_req = AppendRowsRequest {
            write_stream,
            offset: None,
            trace_id: Uuid::new_v4().hyphenated().to_string(),
            missing_value_interpretations: HashMap::default(),
            rows: Some(row),
            default_missing_value_interpretation: MissingValueInterpretation::DefaultValue as i32,
        };
        self.request_sender
            .send(append_req)
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        Ok(())
    }

    fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> {
        let mut auth_config = google_cloud_auth::project::Config::default();
        auth_config =
            auth_config.with_audience(google_cloud_bigquery::grpc::apiv1::conn_pool::AUDIENCE);
        auth_config =
            auth_config.with_scopes(&google_cloud_bigquery::grpc::apiv1::conn_pool::SCOPES);
        auth_config
    }
}

fn build_protobuf_descriptor_pool(desc: &DescriptorProto) -> Result<prost_reflect::DescriptorPool> {
    let file_descriptor = FileDescriptorProto {
        message_type: vec![desc.clone()],
        name: Some("bigquery".to_owned()),
        ..Default::default()
    };

    prost_reflect::DescriptorPool::from_file_descriptor_set(FileDescriptorSet {
        file: vec![file_descriptor],
    })
    .context("failed to build descriptor pool")
    .map_err(SinkError::BigQuery)
}

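/// Builds the `DescriptorProto` that describes one encoded row; struct-typed fields generate
/// nested message types named `Struct<field>` via `build_protobuf_field`.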
fn build_protobuf_schema<'a>(
    fields: impl Iterator<Item = (&'a str, &'a DataType)>,
    name: String,
) -> Result<DescriptorProto> {
    let mut proto = DescriptorProto {
        name: Some(name),
        ..Default::default()
    };
    let mut struct_vec = vec![];
    let field_vec = fields
        .enumerate()
        .map(|(index, (name, data_type))| {
            let (field, des_proto) =
                build_protobuf_field(data_type, (index + 1) as i32, name.to_owned())?;
            if let Some(sv) = des_proto {
                struct_vec.push(sv);
            }
            Ok(field)
        })
        .collect::<Result<Vec<_>>>()?;
    proto.field = field_vec;
    proto.nested_type = struct_vec;
    Ok(proto)
}

fn build_protobuf_field(
    data_type: &DataType,
    index: i32,
    name: String,
) -> Result<(FieldDescriptorProto, Option<DescriptorProto>)> {
    let mut field = FieldDescriptorProto {
        name: Some(name.clone()),
        number: Some(index),
        ..Default::default()
    };
    match data_type {
        DataType::Boolean => field.r#type = Some(field_descriptor_proto::Type::Bool.into()),
        DataType::Int32 => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Int16 | DataType::Int64 => {
            field.r#type = Some(field_descriptor_proto::Type::Int64.into())
        }
        DataType::Float64 => field.r#type = Some(field_descriptor_proto::Type::Double.into()),
        DataType::Decimal => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Date => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Varchar => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Time => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamp => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamptz => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Interval => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Struct(s) => {
            field.r#type = Some(field_descriptor_proto::Type::Message.into());
            let name = format!("Struct{}", name);
            let sub_proto = build_protobuf_schema(s.iter(), name.clone())?;
            field.type_name = Some(name);
            return Ok((field, Some(sub_proto)));
        }
        DataType::List(l) => {
            let (mut field, proto) = build_protobuf_field(l.elem(), index, name)?;
            field.label = Some(field_descriptor_proto::Label::Repeated.into());
            return Ok((field, proto));
        }
        DataType::Bytea => field.r#type = Some(field_descriptor_proto::Type::Bytes.into()),
        DataType::Jsonb => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Serial => field.r#type = Some(field_descriptor_proto::Type::Int64.into()),
        DataType::Float32 | DataType::Int256 => {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Float32 and Int256 are not supported"
            )));
        }
        DataType::Map(_) => {
            return Err(SinkError::BigQuery(anyhow::anyhow!("Map is not supported")));
        }
        DataType::Vector(_) => {
            return Err(SinkError::BigQuery(anyhow::anyhow!("Vector is not supported")));
        }
    }
    Ok((field, None))
}

#[cfg(test)]
mod test {

    use std::assert_matches::assert_matches;

    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::{DataType, StructType};

    use crate::sink::big_query::{
        BigQuerySink, build_protobuf_descriptor_pool, build_protobuf_schema,
    };

    #[tokio::test]
    async fn test_type_check() {
        let big_query_type_string = "ARRAY<STRUCT<v1 ARRAY<INT64>, v2 STRUCT<v1 INT64, v2 INT64>>>";
        let rw_datatype = DataType::list(DataType::Struct(StructType::new(vec![
            ("v1".to_owned(), DataType::Int64.list()),
            (
                "v2".to_owned(),
                DataType::Struct(StructType::new(vec![
                    ("v1".to_owned(), DataType::Int64),
                    ("v2".to_owned(), DataType::Int64),
                ])),
            ),
        ])));
        assert_eq!(
            BigQuerySink::get_string_and_check_support_from_datatype(&rw_datatype).unwrap(),
            big_query_type_string
        );
    }

    #[tokio::test]
    async fn test_schema_check() {
        let schema = Schema {
            fields: vec![
                Field::with_name(DataType::Int64, "v1"),
                Field::with_name(DataType::Float64, "v2"),
                Field::with_name(
                    DataType::list(DataType::Struct(StructType::new(vec![
                        ("v1".to_owned(), DataType::Int64.list()),
                        (
                            "v3".to_owned(),
                            DataType::Struct(StructType::new(vec![
                                ("v1".to_owned(), DataType::Int64),
                                ("v2".to_owned(), DataType::Int64),
                            ])),
                        ),
                    ]))),
                    "v3",
                ),
            ],
        };
        let fields = schema
            .fields()
            .iter()
            .map(|f| (f.name.as_str(), &f.data_type));
        let desc = build_protobuf_schema(fields, "t1".to_owned()).unwrap();
        let pool = build_protobuf_descriptor_pool(&desc).unwrap();
        let t1_message = pool.get_message_by_name("t1").unwrap();
        assert_matches!(
            t1_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            t1_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Double
        );
        assert_matches!(
            t1_message.get_field_by_name("v3").unwrap().kind(),
            prost_reflect::Kind::Message(_)
        );

        let v3_message = pool.get_message_by_name("t1.Structv3").unwrap();
        assert_matches!(
            v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert!(v3_message.get_field_by_name("v1").unwrap().is_list());

        let v3_v3_message = pool.get_message_by_name("t1.Structv3.Structv3").unwrap();
        assert_matches!(
            v3_v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            v3_v3_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
    }
}