use core::pin::Pin;
use core::time::Duration;
use std::collections::{BTreeMap, HashMap, VecDeque};

use anyhow::{Context, anyhow};
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use futures::future::pending;
use futures::prelude::Future;
use futures::{Stream, StreamExt};
use futures_async_stream::try_stream;
use gcp_bigquery_client::Client;
use gcp_bigquery_client::error::BQError;
use gcp_bigquery_client::model::query_request::QueryRequest;
use gcp_bigquery_client::model::table::Table;
use gcp_bigquery_client::model::table_field_schema::TableFieldSchema;
use gcp_bigquery_client::model::table_schema::TableSchema;
use google_cloud_bigquery::grpc::apiv1::conn_pool::ConnectionManager;
use google_cloud_gax::conn::{ConnectionOptions, Environment};
use google_cloud_gax::grpc::Request;
use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request::{
    MissingValueInterpretation, ProtoData, Rows as AppendRowsRequestRows,
};
use google_cloud_googleapis::cloud::bigquery::storage::v1::{
    AppendRowsRequest, AppendRowsResponse, ProtoRows, ProtoSchema,
};
use google_cloud_pubsub::client::google_cloud_auth;
use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
use phf::{Set, phf_set};
use prost_reflect::{FieldDescriptor, MessageDescriptor};
use prost_types::{
    DescriptorProto, FieldDescriptorProto, FileDescriptorProto, FileDescriptorSet,
    field_descriptor_proto,
};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use serde::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use simd_json::prelude::ArrayTrait;
use tokio::sync::mpsc;
use tonic::{Response, Status, async_trait};
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;
use yup_oauth2::ServiceAccountKey;

use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo};
use super::log_store::{LogStoreReadItem, TruncateOffset};
use super::{
    LogSinker, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkLogReader,
};
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::connector_common::AwsAuthProps;
use crate::enforce_secret::EnforceSecret;
use crate::sink::{Result, Sink, SinkParam, SinkWriterParam};

pub const BIGQUERY_SINK: &str = "bigquery";
pub const CHANGE_TYPE: &str = "_CHANGE_TYPE";
const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4;
const CONNECT_TIMEOUT: Option<Duration> = Some(Duration::from_secs(30));
const CONNECTION_TIMEOUT: Option<Duration> = None;
// Upper bound on the number of in-flight offsets tracked by the log sinker.
const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
// Serialized rows are split into batches so that each `AppendRows` request stays below this size.
const MAX_ROW_SIZE: usize = 8 * 1024 * 1024;

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct BigQueryCommon {
    #[serde(rename = "bigquery.local.path")]
    pub local_path: Option<String>,
    #[serde(rename = "bigquery.s3.path")]
    pub s3_path: Option<String>,
    #[serde(rename = "bigquery.project")]
    pub project: String,
    #[serde(rename = "bigquery.dataset")]
    pub dataset: String,
    #[serde(rename = "bigquery.table")]
    pub table: String,
    #[serde(default)]
    #[serde_as(as = "DisplayFromStr")]
    pub auto_create: bool,
    #[serde(rename = "bigquery.credentials")]
    pub credentials: Option<String>,
}

impl EnforceSecret for BigQueryCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "bigquery.credentials",
    };
}

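/// Tracks the append-rows responses that are still outstanding for each
/// truncate offset. `add_offset` records how many responses an offset is
/// waiting for; `next_offset` awaits that many items from `resp_stream`
/// before handing the offset back so the log store can be truncated.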
struct BigQueryFutureManager {
    offset_queue: VecDeque<(TruncateOffset, usize)>,
    resp_stream: Pin<Box<dyn Stream<Item = Result<()>> + Send>>,
}
impl BigQueryFutureManager {
    pub fn new(
        max_future_num: usize,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
    ) -> Self {
        let offset_queue = VecDeque::with_capacity(max_future_num);
        Self {
            offset_queue,
            resp_stream: Box::pin(resp_stream),
        }
    }

    pub fn add_offset(&mut self, offset: TruncateOffset, resp_num: usize) {
        self.offset_queue.push_back((offset, resp_num));
    }

    pub async fn next_offset(&mut self) -> Result<TruncateOffset> {
        if let Some((_offset, remaining_resp_num)) = self.offset_queue.front_mut() {
            if *remaining_resp_num == 0 {
                return Ok(self.offset_queue.pop_front().unwrap().0);
            }
            while *remaining_resp_num > 0 {
                self.resp_stream
                    .next()
                    .await
                    .ok_or_else(|| SinkError::BigQuery(anyhow::anyhow!("end of stream")))??;
                *remaining_resp_num -= 1;
            }
            Ok(self.offset_queue.pop_front().unwrap().0)
        } else {
            pending().await
        }
    }
}
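/// Sinker that decouples writing from acknowledgement: `writer` pushes
/// encoded rows to the Storage Write API while `bigquery_future_manager`
/// matches responses to log offsets. `future_num` caps how many offsets may
/// be pending before the sinker stops reading new items from the log store.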
pub struct BigQueryLogSinker {
    writer: BigQuerySinkWriter,
    bigquery_future_manager: BigQueryFutureManager,
    future_num: usize,
}
impl BigQueryLogSinker {
    pub fn new(
        writer: BigQuerySinkWriter,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
        future_num: usize,
    ) -> Self {
        Self {
            writer,
            bigquery_future_manager: BigQueryFutureManager::new(future_num, resp_stream),
            future_num,
        }
    }
}

#[async_trait]
impl LogSinker for BigQueryLogSinker {
    async fn consume_log_and_sink(mut self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        loop {
            tokio::select!(
                offset = self.bigquery_future_manager.next_offset() => {
                    log_reader.truncate(offset?)?;
                }
                item_result = log_reader.next_item(), if self.bigquery_future_manager.offset_queue.len() <= self.future_num => {
                    let (epoch, item) = item_result?;
                    match item {
                        LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
                            let resp_num = self.writer.write_chunk(chunk)?;
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Chunk { epoch, chunk_id }, resp_num);
                        }
                        LogStoreReadItem::Barrier { .. } => {
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Barrier { epoch }, 0);
                        }
                    }
                }
            )
        }
    }
}

impl BigQueryCommon {
    async fn build_client(&self, aws_auth_props: &AwsAuthProps) -> Result<Client> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let service_account =
            if let Ok(auth_json_from_base64) = BASE64_STANDARD.decode(auth_json.clone()) {
                serde_json::from_slice::<ServiceAccountKey>(&auth_json_from_base64)
            } else {
                serde_json::from_str::<ServiceAccountKey>(&auth_json)
            }
            .map_err(|e| SinkError::BigQuery(e.into()))?;

        let client: Client = Client::from_service_account_key(service_account, false)
            .await
            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
        Ok(client)
    }

    async fn build_writer_client(
        &self,
        aws_auth_props: &AwsAuthProps,
    ) -> Result<(StorageWriterClient, impl Stream<Item = Result<()>> + use<>)> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let credentials_file =
            if let Ok(auth_json_from_base64) = BASE64_STANDARD.decode(auth_json.clone()) {
                serde_json::from_slice::<CredentialsFile>(&auth_json_from_base64)
            } else {
                serde_json::from_str::<CredentialsFile>(&auth_json)
            }
            .map_err(|e| SinkError::BigQuery(e.into()))?;

        StorageWriterClient::new(credentials_file).await
    }

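    /// Resolves the service-account JSON, in order of precedence: the inline
    /// `bigquery.credentials` value, then a file at `bigquery.local.path`,
    /// then an object referenced by `bigquery.s3.path`. Errors if none is set.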
    async fn get_auth_json_from_path(&self, aws_auth_props: &AwsAuthProps) -> Result<String> {
        if let Some(credentials) = &self.credentials {
            Ok(credentials.clone())
        } else if let Some(local_path) = &self.local_path {
            std::fs::read_to_string(local_path)
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))
        } else if let Some(s3_path) = &self.s3_path {
            let url =
                Url::parse(s3_path).map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            let auth_vec = load_file_descriptor_from_s3(&url, aws_auth_props)
                .await
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            Ok(String::from_utf8(auth_vec).map_err(|e| SinkError::BigQuery(e.into()))?)
        } else {
            Err(SinkError::BigQuery(anyhow::anyhow!(
                "At least one of `bigquery.credentials`, `bigquery.local.path` and `bigquery.s3.path` must be set."
            )))
        }
    }
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct BigQueryConfig {
    #[serde(flatten)]
    pub common: BigQueryCommon,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,
    pub r#type: String,
}

impl EnforceSecret for BigQueryConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        BigQueryCommon::enforce_one(prop)?;
        AwsAuthProps::enforce_one(prop)?;
        Ok(())
    }
}

impl BigQueryConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<BigQueryConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
}

#[derive(Debug)]
pub struct BigQuerySink {
    pub config: BigQueryConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
}

impl EnforceSecret for BigQuerySink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            BigQueryConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl BigQuerySink {
    pub fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        Ok(Self {
            config,
            schema,
            pk_indices,
            is_append_only,
        })
    }
}

impl BigQuerySink {
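    /// Verifies that the RisingWave schema matches the existing BigQuery
    /// table: same number of columns, every column present by name, and each
    /// data type mapped to the expected BigQuery type string.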
    fn check_column_name_and_type(
        &self,
        big_query_columns_desc: HashMap<String, String>,
    ) -> Result<()> {
        let rw_fields_name = self.schema.fields();
        if big_query_columns_desc.is_empty() {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Cannot find the table in BigQuery"
            )));
        }
        if rw_fields_name.len().ne(&big_query_columns_desc.len()) {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "The number of columns in RisingWave ({}) must be equal to the number of columns in BigQuery ({})",
                rw_fields_name.len(),
                big_query_columns_desc.len()
            )));
        }

        for i in rw_fields_name {
            let value = big_query_columns_desc.get(&i.name).ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Column `{:?}` on RisingWave side is not found on BigQuery side.",
                    i.name
                ))
            })?;
            let data_type_string = Self::get_string_and_check_support_from_datatype(&i.data_type)?;
            if data_type_string.ne(value) {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Data type mismatch for column `{:?}`. BigQuery side: `{:?}`, RisingWave side: `{:?}`.",
                    i.name,
                    value,
                    data_type_string
                )));
            };
        }
        Ok(())
    }

    fn get_string_and_check_support_from_datatype(rw_data_type: &DataType) -> Result<String> {
        match rw_data_type {
            DataType::Boolean => Ok("BOOL".to_owned()),
            DataType::Int16 => Ok("INT64".to_owned()),
            DataType::Int32 => Ok("INT64".to_owned()),
            DataType::Int64 => Ok("INT64".to_owned()),
            DataType::Float32 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "REAL is not supported for BigQuery sink. Please convert to FLOAT64 or other supported types."
            ))),
            DataType::Float64 => Ok("FLOAT64".to_owned()),
            DataType::Decimal => Ok("NUMERIC".to_owned()),
            DataType::Date => Ok("DATE".to_owned()),
            DataType::Varchar => Ok("STRING".to_owned()),
            DataType::Time => Ok("TIME".to_owned()),
            DataType::Timestamp => Ok("DATETIME".to_owned()),
            DataType::Timestamptz => Ok("TIMESTAMP".to_owned()),
            DataType::Interval => Ok("INTERVAL".to_owned()),
            DataType::Struct(structs) => {
                let mut elements_vec = vec![];
                for (name, datatype) in structs.iter() {
                    let element_string =
                        Self::get_string_and_check_support_from_datatype(datatype)?;
                    elements_vec.push(format!("{} {}", name, element_string));
                }
                Ok(format!("STRUCT<{}>", elements_vec.join(", ")))
            }
            DataType::List(l) => {
                let element_string = Self::get_string_and_check_support_from_datatype(l.as_ref())?;
                Ok(format!("ARRAY<{}>", element_string))
            }
            DataType::Bytea => Ok("BYTES".to_owned()),
            DataType::Jsonb => Ok("JSON".to_owned()),
            DataType::Serial => Ok("INT64".to_owned()),
            DataType::Int256 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "INT256 is not supported for BigQuery sink."
            ))),
            DataType::Map(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
                "MAP is not supported for BigQuery sink."
            ))),
            DataType::Vector(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
                "VECTOR is not supported for BigQuery sink."
            ))),
        }
    }

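    /// Maps a RisingWave field to a BigQuery `TableFieldSchema` when
    /// auto-creating the table. Lists are expressed via `mode: REPEATED` on
    /// the inner field; unsupported types return an error.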
    fn map_field(rw_field: &Field) -> Result<TableFieldSchema> {
        let tfs = match &rw_field.data_type {
            DataType::Boolean => TableFieldSchema::bool(&rw_field.name),
            DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Serial => {
                TableFieldSchema::integer(&rw_field.name)
            }
            DataType::Float32 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "REAL is not supported for BigQuery sink. Please convert to FLOAT64 or other supported types."
                )));
            }
            DataType::Float64 => TableFieldSchema::float(&rw_field.name),
            DataType::Decimal => TableFieldSchema::numeric(&rw_field.name),
            DataType::Date => TableFieldSchema::date(&rw_field.name),
            DataType::Varchar => TableFieldSchema::string(&rw_field.name),
            DataType::Time => TableFieldSchema::time(&rw_field.name),
            DataType::Timestamp => TableFieldSchema::date_time(&rw_field.name),
            DataType::Timestamptz => TableFieldSchema::timestamp(&rw_field.name),
            DataType::Interval => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "INTERVAL is not supported for BigQuery sink. Please convert to VARCHAR or other supported types."
                )));
            }
            DataType::Struct(st) => {
                let mut sub_fields = Vec::with_capacity(st.len());
                for (name, dt) in st.iter() {
                    let rw_field = Field::with_name(dt.clone(), name);
                    let field = Self::map_field(&rw_field)?;
                    sub_fields.push(field);
                }
                TableFieldSchema::record(&rw_field.name, sub_fields)
            }
            DataType::List(dt) => {
                let inner_field = Self::map_field(&Field::with_name(*dt.clone(), &rw_field.name))?;
                TableFieldSchema {
                    mode: Some("REPEATED".to_owned()),
                    ..inner_field
                }
            }

            DataType::Bytea => TableFieldSchema::bytes(&rw_field.name),
            DataType::Jsonb => TableFieldSchema::json(&rw_field.name),
            DataType::Int256 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "INT256 is not supported for BigQuery sink."
                )));
            }
            DataType::Map(_) => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "MAP is not supported for BigQuery sink."
                )));
            }
            DataType::Vector(_) => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "VECTOR is not supported for BigQuery sink."
                )));
            }
        };
        Ok(tfs)
    }

    async fn create_table(
        &self,
        client: &Client,
        project_id: &str,
        dataset_id: &str,
        table_id: &str,
        fields: &Vec<Field>,
    ) -> Result<Table> {
        let dataset = client
            .dataset()
            .get(project_id, dataset_id)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let fields: Vec<_> = fields.iter().map(Self::map_field).collect::<Result<_>>()?;
        let table = Table::from_dataset(&dataset, table_id, TableSchema::new(fields));

        client
            .table()
            .create(table)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))
    }
}

impl Sink for BigQuerySink {
    type LogSinker = BigQueryLogSinker;

    const SINK_NAME: &'static str = BIGQUERY_SINK;

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let (writer, resp_stream) = BigQuerySinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        Ok(BigQueryLogSinker::new(
            writer,
            resp_stream,
            BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE,
        ))
    }

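    /// Validation before the sink starts: checks the license feature, requires
    /// a primary key for upsert sinks, auto-creates the table when
    /// `auto_create` is set and the table is missing, and otherwise compares
    /// the RisingWave schema against `INFORMATION_SCHEMA.COLUMNS`.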
    async fn validate(&self) -> Result<()> {
        risingwave_common::license::Feature::BigQuerySink
            .check_available()
            .map_err(|e| anyhow::anyhow!(e))?;
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert bigquery sink (please define in `primary_key` field)"
            )));
        }
        let client = self
            .config
            .common
            .build_client(&self.config.aws_auth_props)
            .await?;
        let BigQueryCommon {
            project: project_id,
            dataset: dataset_id,
            table: table_id,
            ..
        } = &self.config.common;

        if self.config.common.auto_create {
            match client
                .table()
                .get(project_id, dataset_id, table_id, None)
                .await
            {
                Err(BQError::RequestError(_)) => {
                    return self
                        .create_table(
                            &client,
                            project_id,
                            dataset_id,
                            table_id,
                            &self.schema.fields,
                        )
                        .await
                        .map(|_| ());
                }
                Err(e) => return Err(SinkError::BigQuery(e.into())),
                _ => {}
            }
        }

        let mut rs = client
            .job()
            .query(
                &self.config.common.project,
                QueryRequest::new(format!(
                    "SELECT column_name, data_type FROM `{}.{}.INFORMATION_SCHEMA.COLUMNS` WHERE table_name = '{}'",
                    project_id, dataset_id, table_id,
                )),
            ).await.map_err(|e| SinkError::BigQuery(e.into()))?;

        let mut big_query_schema = HashMap::default();
        while rs.next_row() {
            big_query_schema.insert(
                rs.get_string_by_name("column_name")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find column_name"))
                    })?,
                rs.get_string_by_name("data_type")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find data_type"))
                    })?,
            );
        }

        self.check_column_name_and_type(big_query_schema)?;
        Ok(())
    }
}

pub struct BigQuerySinkWriter {
    pub config: BigQueryConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: StorageWriterClient,
    is_append_only: bool,
    row_encoder: ProtoEncoder,
    writer_pb_schema: ProtoSchema,
    #[expect(dead_code)]
    message_descriptor: MessageDescriptor,
    write_stream: String,
    proto_field: Option<FieldDescriptor>,
}

impl TryFrom<SinkParam> for BigQuerySink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = BigQueryConfig::from_btreemap(param.properties)?;
        BigQuerySink::new(
            config,
            schema,
            param.downstream_pk,
            param.sink_type.is_append_only(),
        )
    }
}

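/// Construction derives a protobuf `DescriptorProto` from the sink schema
/// (plus a `_CHANGE_TYPE` string field for upsert sinks), reflects it into a
/// `MessageDescriptor` for the `ProtoEncoder`, and targets the `_default`
/// write stream of the configured table.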
impl BigQuerySinkWriter {
    pub async fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let (client, resp_stream) = config
            .common
            .build_writer_client(&config.aws_auth_props)
            .await?;
        let mut descriptor_proto = build_protobuf_schema(
            schema
                .fields()
                .iter()
                .map(|f| (f.name.as_str(), &f.data_type)),
            config.common.table.clone(),
        )?;

        if !is_append_only {
            let field = FieldDescriptorProto {
                name: Some(CHANGE_TYPE.to_owned()),
                number: Some((schema.len() + 1) as i32),
                r#type: Some(field_descriptor_proto::Type::String.into()),
                ..Default::default()
            };
            descriptor_proto.field.push(field);
        }

        let descriptor_pool = build_protobuf_descriptor_pool(&descriptor_proto)?;
        let message_descriptor = descriptor_pool
            .get_message_by_name(&config.common.table)
            .ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Can't find message proto {}",
                    &config.common.table
                ))
            })?;
        let proto_field = if !is_append_only {
            let proto_field = message_descriptor
                .get_field_by_name(CHANGE_TYPE)
                .ok_or_else(|| {
                    SinkError::BigQuery(anyhow::anyhow!("Can't find {}", CHANGE_TYPE))
                })?;
            Some(proto_field)
        } else {
            None
        };
        let row_encoder = ProtoEncoder::new(
            schema.clone(),
            None,
            message_descriptor.clone(),
            ProtoHeader::None,
        )?;
        Ok((
            Self {
                write_stream: format!(
                    "projects/{}/datasets/{}/tables/{}/streams/_default",
                    config.common.project, config.common.dataset, config.common.table
                ),
                config,
                schema,
                pk_indices,
                client,
                is_append_only,
                row_encoder,
                message_descriptor,
                proto_field,
                writer_pb_schema: ProtoSchema {
                    proto_descriptor: Some(descriptor_proto),
                },
            },
            resp_stream,
        ))
    }

    fn append_only(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op != Op::Insert {
                continue;
            }
            serialized_rows.push(self.row_encoder.encode(row)?.ser_to()?)
        }
        Ok(serialized_rows)
    }

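    /// Encodes rows for an upsert sink: each row carries a `_CHANGE_TYPE` of
    /// `UPSERT` (for inserts and update-inserts) or `DELETE`. `UpdateDelete`
    /// rows are skipped, since the paired `UpdateInsert` is written as an
    /// `UPSERT`.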
    fn upsert(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op == Op::UpdateDelete {
                continue;
            }
            let mut pb_row = self.row_encoder.encode(row)?;
            match op {
                Op::Insert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::Delete => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("DELETE".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::UpdateDelete => continue,
                Op::UpdateInsert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
            };

            serialized_rows.push(pb_row.ser_to()?)
        }
        Ok(serialized_rows)
    }

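    /// Encodes the chunk and sends it to the write stream, splitting the
    /// serialized rows into batches so that each `AppendRows` request stays
    /// under `MAX_ROW_SIZE`. Returns the number of requests sent, i.e. how
    /// many responses the future manager should wait for before truncating.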
    fn write_chunk(&mut self, chunk: StreamChunk) -> Result<usize> {
        let serialized_rows = if self.is_append_only {
            self.append_only(chunk)?
        } else {
            self.upsert(chunk)?
        };
        if serialized_rows.is_empty() {
            return Ok(0);
        }
        let mut result = Vec::new();
        let mut result_inner = Vec::new();
        let mut size_count = 0;
        for i in serialized_rows {
            size_count += i.len();
            if size_count > MAX_ROW_SIZE {
                result.push(result_inner);
                result_inner = Vec::new();
                size_count = i.len();
            }
            result_inner.push(i);
        }
        if !result_inner.is_empty() {
            result.push(result_inner);
        }
        let len = result.len();
        for serialized_rows in result {
            let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
                writer_schema: Some(self.writer_pb_schema.clone()),
                rows: Some(ProtoRows { serialized_rows }),
            });
            self.client.append_rows(rows, self.write_stream.clone())?;
        }
        Ok(len)
    }
}

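/// Adapts the gRPC `AppendRows` response stream into a `Stream<Item = Result<()>>`:
/// one `Ok(())` is yielded per successful response, while row errors, error
/// statuses, or an unexpected end of stream are surfaced as `SinkError`s.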
#[try_stream(ok = (), error = SinkError)]
pub async fn resp_to_stream(
    resp_stream: impl Future<
        Output = std::result::Result<
            Response<google_cloud_gax::grpc::Streaming<AppendRowsResponse>>,
            Status,
        >,
    >
    + 'static
    + Send,
) {
    let mut resp_stream = resp_stream
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?
        .into_inner();
    loop {
        match resp_stream
            .message()
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?
        {
            Some(append_rows_response) => {
                if !append_rows_response.row_errors.is_empty() {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        append_rows_response.row_errors
                    )));
                }
                if let Some(google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(status)) = append_rows_response.response {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        status
                    )));
                }
                yield ();
            }
            None => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "bigquery insert error: end of resp stream",
                )));
            }
        }
    }
}

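/// Thin wrapper around the BigQuery Storage Write API connection: requests
/// are pushed into an unbounded channel that feeds the long-lived
/// `append_rows` gRPC stream, and responses are observed through the stream
/// returned from `new`.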
struct StorageWriterClient {
    #[expect(dead_code)]
    environment: Environment,
    request_sender: mpsc::UnboundedSender<AppendRowsRequest>,
}
impl StorageWriterClient {
    pub async fn new(
        credentials: CredentialsFile,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
            Self::bigquery_grpc_auth_config(),
            Box::new(credentials),
        )
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?;
        let conn_options = ConnectionOptions {
            connect_timeout: CONNECT_TIMEOUT,
            timeout: CONNECTION_TIMEOUT,
        };
        let environment = Environment::GoogleCloud(Box::new(ts_grpc));
        let conn = ConnectionManager::new(DEFAULT_GRPC_CHANNEL_NUMS, &environment, &conn_options)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let mut client = conn.writer();

        let (tx, rx) = mpsc::unbounded_channel();
        let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);

        let resp = async move { client.append_rows(Request::new(stream)).await };
        let resp_stream = resp_to_stream(resp);

        Ok((
            StorageWriterClient {
                environment,
                request_sender: tx,
            },
            resp_stream,
        ))
    }

    pub fn append_rows(&mut self, row: AppendRowsRequestRows, write_stream: String) -> Result<()> {
        let append_req = AppendRowsRequest {
            write_stream: write_stream.clone(),
            offset: None,
            trace_id: Uuid::new_v4().hyphenated().to_string(),
            missing_value_interpretations: HashMap::default(),
            rows: Some(row),
            default_missing_value_interpretation: MissingValueInterpretation::DefaultValue as i32,
        };
        self.request_sender
            .send(append_req)
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        Ok(())
    }

    fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> {
        let mut auth_config = google_cloud_auth::project::Config::default();
        auth_config =
            auth_config.with_audience(google_cloud_bigquery::grpc::apiv1::conn_pool::AUDIENCE);
        auth_config =
            auth_config.with_scopes(&google_cloud_bigquery::grpc::apiv1::conn_pool::SCOPES);
        auth_config
    }
}

fn build_protobuf_descriptor_pool(desc: &DescriptorProto) -> Result<prost_reflect::DescriptorPool> {
    let file_descriptor = FileDescriptorProto {
        message_type: vec![desc.clone()],
        name: Some("bigquery".to_owned()),
        ..Default::default()
    };

    prost_reflect::DescriptorPool::from_file_descriptor_set(FileDescriptorSet {
        file: vec![file_descriptor],
    })
    .context("failed to build descriptor pool")
    .map_err(SinkError::BigQuery)
}

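/// Builds the `DescriptorProto` for the table's row message: each field in
/// the sink schema becomes a proto field numbered from 1, and struct-typed
/// fields contribute nested message types.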
fn build_protobuf_schema<'a>(
    fields: impl Iterator<Item = (&'a str, &'a DataType)>,
    name: String,
) -> Result<DescriptorProto> {
    let mut proto = DescriptorProto {
        name: Some(name),
        ..Default::default()
    };
    let mut struct_vec = vec![];
    let field_vec = fields
        .enumerate()
        .map(|(index, (name, data_type))| {
            let (field, des_proto) =
                build_protobuf_field(data_type, (index + 1) as i32, name.to_owned())?;
            if let Some(sv) = des_proto {
                struct_vec.push(sv);
            }
            Ok(field)
        })
        .collect::<Result<Vec<_>>>()?;
    proto.field = field_vec;
    proto.nested_type = struct_vec;
    Ok(proto)
}

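/// Maps a single RisingWave type to a protobuf field descriptor. Decimal,
/// temporal, and JSON types are encoded as strings, dates as int32, lists
/// become `repeated` fields, and structs return an additional nested
/// `Struct{name}` message descriptor.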
fn build_protobuf_field(
    data_type: &DataType,
    index: i32,
    name: String,
) -> Result<(FieldDescriptorProto, Option<DescriptorProto>)> {
    let mut field = FieldDescriptorProto {
        name: Some(name.clone()),
        number: Some(index),
        ..Default::default()
    };
    match data_type {
        DataType::Boolean => field.r#type = Some(field_descriptor_proto::Type::Bool.into()),
        DataType::Int32 => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Int16 | DataType::Int64 => {
            field.r#type = Some(field_descriptor_proto::Type::Int64.into())
        }
        DataType::Float64 => field.r#type = Some(field_descriptor_proto::Type::Double.into()),
        DataType::Decimal => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Date => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Varchar => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Time => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamp => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamptz => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Interval => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Struct(s) => {
            field.r#type = Some(field_descriptor_proto::Type::Message.into());
            let name = format!("Struct{}", name);
            let sub_proto = build_protobuf_schema(s.iter(), name.clone())?;
            field.type_name = Some(name);
            return Ok((field, Some(sub_proto)));
        }
        DataType::List(l) => {
            let (mut field, proto) = build_protobuf_field(l.as_ref(), index, name.clone())?;
            field.label = Some(field_descriptor_proto::Label::Repeated.into());
            return Ok((field, proto));
        }
        DataType::Bytea => field.r#type = Some(field_descriptor_proto::Type::Bytes.into()),
        DataType::Jsonb => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Serial => field.r#type = Some(field_descriptor_proto::Type::Int64.into()),
        DataType::Float32 | DataType::Int256 => {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Float32 and Int256 are not supported for BigQuery sink"
            )));
        }
        DataType::Map(_) => {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Map is not supported for BigQuery sink"
            )));
        }
        DataType::Vector(_) => {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Vector is not supported for BigQuery sink"
            )));
        }
    }
    Ok((field, None))
}

#[cfg(test)]
mod test {

    use std::assert_matches::assert_matches;

    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::{DataType, StructType};

    use crate::sink::big_query::{
        BigQuerySink, build_protobuf_descriptor_pool, build_protobuf_schema,
    };

    #[tokio::test]
    async fn test_type_check() {
        let big_query_type_string = "ARRAY<STRUCT<v1 ARRAY<INT64>, v2 STRUCT<v1 INT64, v2 INT64>>>";
        let rw_datatype = DataType::List(Box::new(DataType::Struct(StructType::new(vec![
            ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
            (
                "v2".to_owned(),
                DataType::Struct(StructType::new(vec![
                    ("v1".to_owned(), DataType::Int64),
                    ("v2".to_owned(), DataType::Int64),
                ])),
            ),
        ]))));
        assert_eq!(
            BigQuerySink::get_string_and_check_support_from_datatype(&rw_datatype).unwrap(),
            big_query_type_string
        );
    }

    #[tokio::test]
    async fn test_schema_check() {
        let schema = Schema {
            fields: vec![
                Field::with_name(DataType::Int64, "v1"),
                Field::with_name(DataType::Float64, "v2"),
                Field::with_name(
                    DataType::List(Box::new(DataType::Struct(StructType::new(vec![
                        ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
                        (
                            "v3".to_owned(),
                            DataType::Struct(StructType::new(vec![
                                ("v1".to_owned(), DataType::Int64),
                                ("v2".to_owned(), DataType::Int64),
                            ])),
                        ),
                    ])))),
                    "v3",
                ),
            ],
        };
        let fields = schema
            .fields()
            .iter()
            .map(|f| (f.name.as_str(), &f.data_type));
        let desc = build_protobuf_schema(fields, "t1".to_owned()).unwrap();
        let pool = build_protobuf_descriptor_pool(&desc).unwrap();
        let t1_message = pool.get_message_by_name("t1").unwrap();
        assert_matches!(
            t1_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            t1_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Double
        );
        assert_matches!(
            t1_message.get_field_by_name("v3").unwrap().kind(),
            prost_reflect::Kind::Message(_)
        );

        let v3_message = pool.get_message_by_name("t1.Structv3").unwrap();
        assert_matches!(
            v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert!(v3_message.get_field_by_name("v1").unwrap().is_list());

        let v3_v3_message = pool.get_message_by_name("t1.Structv3.Structv3").unwrap();
        assert_matches!(
            v3_v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            v3_v3_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
    }
}