use core::pin::Pin;
use core::time::Duration;
use std::collections::{BTreeMap, HashMap, VecDeque};

use anyhow::{Context, anyhow};
use futures::future::pending;
use futures::prelude::Future;
use futures::{Stream, StreamExt};
use futures_async_stream::try_stream;
use gcp_bigquery_client::Client;
use gcp_bigquery_client::error::BQError;
use gcp_bigquery_client::model::query_request::QueryRequest;
use gcp_bigquery_client::model::table::Table;
use gcp_bigquery_client::model::table_field_schema::TableFieldSchema;
use gcp_bigquery_client::model::table_schema::TableSchema;
use google_cloud_bigquery::grpc::apiv1::conn_pool::ConnectionManager;
use google_cloud_gax::conn::{ConnectionOptions, Environment};
use google_cloud_gax::grpc::Request;
use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request::{
    MissingValueInterpretation, ProtoData, Rows as AppendRowsRequestRows,
};
use google_cloud_googleapis::cloud::bigquery::storage::v1::{
    AppendRowsRequest, AppendRowsResponse, ProtoRows, ProtoSchema,
};
use google_cloud_pubsub::client::google_cloud_auth;
use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
use phf::{Set, phf_set};
use prost_reflect::{FieldDescriptor, MessageDescriptor};
use prost_types::{
    DescriptorProto, FieldDescriptorProto, FileDescriptorProto, FileDescriptorSet,
    field_descriptor_proto,
};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use serde_derive::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use simd_json::prelude::ArrayTrait;
use tokio::sync::mpsc;
use tonic::{Response, Status, async_trait};
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;
use yup_oauth2::ServiceAccountKey;

use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo};
use super::log_store::{LogStoreReadItem, TruncateOffset};
use super::{
    LogSinker, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkLogReader,
};
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::connector_common::AwsAuthProps;
use crate::enforce_secret::EnforceSecret;
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam};

pub const BIGQUERY_SINK: &str = "bigquery";
pub const CHANGE_TYPE: &str = "_CHANGE_TYPE";
const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4;
const CONNECT_TIMEOUT: Option<Duration> = Some(Duration::from_secs(30));
const CONNECTION_TIMEOUT: Option<Duration> = None;
/// Bounds the number of in-flight offsets the log sinker tracks before it pauses reading new items.
const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
/// Size threshold (in bytes) used to split serialized rows across multiple append requests.
const MAX_ROW_SIZE: usize = 8 * 1024 * 1024;
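/// Connection and table options for the BigQuery sink, populated from the user's `WITH`
/// properties (e.g. `bigquery.project`, `bigquery.dataset`, `bigquery.table`). Credentials
/// may be supplied inline, via a local file, or via an S3 object.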
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct BigQueryCommon {
    #[serde(rename = "bigquery.local.path")]
    pub local_path: Option<String>,
    #[serde(rename = "bigquery.s3.path")]
    pub s3_path: Option<String>,
    #[serde(rename = "bigquery.project")]
    pub project: String,
    #[serde(rename = "bigquery.dataset")]
    pub dataset: String,
    #[serde(rename = "bigquery.table")]
    pub table: String,
    #[serde(default)]
    #[serde_as(as = "DisplayFromStr")]
    pub auto_create: bool,
    #[serde(rename = "bigquery.credentials")]
    pub credentials: Option<String>,
}

impl EnforceSecret for BigQueryCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "bigquery.credentials",
    };
}
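/// Tracks offsets that have been sent to BigQuery but not yet acknowledged.
///
/// Each queue entry pairs a [`TruncateOffset`] with the number of append-rows responses that
/// must still arrive on `resp_stream` before that offset can be truncated from the log store.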
struct BigQueryFutureManager {
    /// `(offset, number of responses still expected for this offset)`
    offset_queue: VecDeque<(TruncateOffset, usize)>,
    resp_stream: Pin<Box<dyn Stream<Item = Result<()>> + Send>>,
}
impl BigQueryFutureManager {
    pub fn new(
        max_future_num: usize,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
    ) -> Self {
        let offset_queue = VecDeque::with_capacity(max_future_num);
        Self {
            offset_queue,
            resp_stream: Box::pin(resp_stream),
        }
    }

    pub fn add_offset(&mut self, offset: TruncateOffset, resp_num: usize) {
        self.offset_queue.push_back((offset, resp_num));
    }

    pub async fn next_offset(&mut self) -> Result<TruncateOffset> {
        if let Some((_offset, remaining_resp_num)) = self.offset_queue.front_mut() {
            if *remaining_resp_num == 0 {
                return Ok(self.offset_queue.pop_front().unwrap().0);
            }
            while *remaining_resp_num > 0 {
                self.resp_stream
                    .next()
                    .await
                    .ok_or_else(|| SinkError::BigQuery(anyhow::anyhow!("end of stream")))??;
                *remaining_resp_num -= 1;
            }
            Ok(self.offset_queue.pop_front().unwrap().0)
        } else {
            pending().await
        }
    }
}
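/// [`LogSinker`] implementation that writes chunks to BigQuery through the Storage Write API
/// and truncates the log store once the corresponding append responses have been received.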
pub struct BigQueryLogSinker {
    writer: BigQuerySinkWriter,
    bigquery_future_manager: BigQueryFutureManager,
    future_num: usize,
}
impl BigQueryLogSinker {
    pub fn new(
        writer: BigQuerySinkWriter,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
        future_num: usize,
    ) -> Self {
        Self {
            writer,
            bigquery_future_manager: BigQueryFutureManager::new(future_num, resp_stream),
            future_num,
        }
    }
}

#[async_trait]
impl LogSinker for BigQueryLogSinker {
    async fn consume_log_and_sink(mut self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        loop {
            tokio::select!(
                // Truncate the log store as soon as the oldest in-flight offset is fully acknowledged.
                offset = self.bigquery_future_manager.next_offset() => {
                    log_reader.truncate(offset?)?;
                }
                // Read the next item only while the number of tracked offsets does not exceed `future_num`.
                item_result = log_reader.next_item(), if self.bigquery_future_manager.offset_queue.len() <= self.future_num => {
                    let (epoch, item) = item_result?;
                    match item {
                        LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
                            let resp_num = self.writer.write_chunk(chunk)?;
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Chunk { epoch, chunk_id }, resp_num);
                        }
                        LogStoreReadItem::Barrier { .. } => {
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Barrier { epoch }, 0);
                        }
                    }
                }
            )
        }
    }
}

impl BigQueryCommon {
    /// Builds the REST client used for metadata operations (table lookup, creation, schema queries).
    async fn build_client(&self, aws_auth_props: &AwsAuthProps) -> Result<Client> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let service_account = serde_json::from_str::<ServiceAccountKey>(&auth_json)
            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
        let client: Client = Client::from_service_account_key(service_account, false)
            .await
            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
        Ok(client)
    }

    /// Builds the Storage Write API client together with its response stream.
    async fn build_writer_client(
        &self,
        aws_auth_props: &AwsAuthProps,
    ) -> Result<(StorageWriterClient, impl Stream<Item = Result<()>> + use<>)> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let credentials_file = CredentialsFile::new_from_str(&auth_json)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        StorageWriterClient::new(credentials_file).await
    }

    /// Resolves the service-account JSON from inline credentials, a local file, or an S3 object,
    /// in that order.
    async fn get_auth_json_from_path(&self, aws_auth_props: &AwsAuthProps) -> Result<String> {
        if let Some(credentials) = &self.credentials {
            Ok(credentials.clone())
        } else if let Some(local_path) = &self.local_path {
            std::fs::read_to_string(local_path)
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))
        } else if let Some(s3_path) = &self.s3_path {
            let url =
                Url::parse(s3_path).map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            let auth_vec = load_file_descriptor_from_s3(&url, aws_auth_props)
                .await
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            Ok(String::from_utf8(auth_vec).map_err(|e| SinkError::BigQuery(e.into()))?)
        } else {
            Err(SinkError::BigQuery(anyhow::anyhow!(
                "At least one of `bigquery.credentials`, `bigquery.local.path` and `bigquery.s3.path` must be set."
            )))
        }
    }
}
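/// Full sink configuration: BigQuery connection options, AWS auth options (used when the
/// credentials file is fetched from S3), and the sink `type` (`append-only` or `upsert`).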
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct BigQueryConfig {
    #[serde(flatten)]
    pub common: BigQueryCommon,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,
    pub r#type: String,
}

impl EnforceSecret for BigQueryConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        BigQueryCommon::enforce_one(prop)?;
        AwsAuthProps::enforce_one(prop)?;
        Ok(())
    }
}

impl BigQueryConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<BigQueryConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
}
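/// The BigQuery sink: validates the configuration against the target table and builds the
/// log sinker that performs the actual writes.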
#[derive(Debug)]
pub struct BigQuerySink {
    pub config: BigQueryConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
}

impl EnforceSecret for BigQuerySink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            BigQueryConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl BigQuerySink {
    pub fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        Ok(Self {
            config,
            schema,
            pk_indices,
            is_append_only,
        })
    }
}

impl BigQuerySink {
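    /// Checks that every RisingWave column exists in the BigQuery table with a matching type.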
    fn check_column_name_and_type(
        &self,
        big_query_columns_desc: HashMap<String, String>,
    ) -> Result<()> {
        let rw_fields_name = self.schema.fields();
        if big_query_columns_desc.is_empty() {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Cannot find the table in BigQuery"
            )));
        }
        if rw_fields_name.len().ne(&big_query_columns_desc.len()) {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "The number of columns in RisingWave ({}) must equal the number of columns in BigQuery ({})",
                rw_fields_name.len(),
                big_query_columns_desc.len()
            )));
        }

        for i in rw_fields_name {
            let value = big_query_columns_desc.get(&i.name).ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Column `{:?}` on RisingWave side is not found on BigQuery side.",
                    i.name
                ))
            })?;
            let data_type_string = Self::get_string_and_check_support_from_datatype(&i.data_type)?;
            if data_type_string.ne(value) {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Data type mismatch for column `{:?}`. BigQuery side: `{:?}`, RisingWave side: `{:?}`.",
                    i.name,
                    value,
                    data_type_string
                )));
            };
        }
        Ok(())
    }
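    /// Maps a RisingWave [`DataType`] to the BigQuery type name used for schema comparison,
    /// rejecting types the sink does not support.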
    fn get_string_and_check_support_from_datatype(rw_data_type: &DataType) -> Result<String> {
        match rw_data_type {
            DataType::Boolean => Ok("BOOL".to_owned()),
            DataType::Int16 => Ok("INT64".to_owned()),
            DataType::Int32 => Ok("INT64".to_owned()),
            DataType::Int64 => Ok("INT64".to_owned()),
            DataType::Float32 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "BigQuery sink does not support real"
            ))),
            DataType::Float64 => Ok("FLOAT64".to_owned()),
            DataType::Decimal => Ok("NUMERIC".to_owned()),
            DataType::Date => Ok("DATE".to_owned()),
            DataType::Varchar => Ok("STRING".to_owned()),
            DataType::Time => Ok("TIME".to_owned()),
            DataType::Timestamp => Ok("DATETIME".to_owned()),
            DataType::Timestamptz => Ok("TIMESTAMP".to_owned()),
            DataType::Interval => Ok("INTERVAL".to_owned()),
            DataType::Struct(structs) => {
                let mut elements_vec = vec![];
                for (name, datatype) in structs.iter() {
                    let element_string =
                        Self::get_string_and_check_support_from_datatype(datatype)?;
                    elements_vec.push(format!("{} {}", name, element_string));
                }
                Ok(format!("STRUCT<{}>", elements_vec.join(", ")))
            }
            DataType::List(l) => {
                let element_string = Self::get_string_and_check_support_from_datatype(l.as_ref())?;
                Ok(format!("ARRAY<{}>", element_string))
            }
            DataType::Bytea => Ok("BYTES".to_owned()),
            DataType::Jsonb => Ok("JSON".to_owned()),
            DataType::Serial => Ok("INT64".to_owned()),
            DataType::Int256 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "BigQuery sink does not support Int256"
            ))),
            DataType::Map(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
                "BigQuery sink does not support Map"
            ))),
        }
    }
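    /// Maps a RisingWave field to a BigQuery [`TableFieldSchema`] for automatic table creation.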
    fn map_field(rw_field: &Field) -> Result<TableFieldSchema> {
        let tfs = match &rw_field.data_type {
            DataType::Boolean => TableFieldSchema::bool(&rw_field.name),
            DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Serial => {
                TableFieldSchema::integer(&rw_field.name)
            }
            DataType::Float32 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "BigQuery sink does not support real"
                )));
            }
            DataType::Float64 => TableFieldSchema::float(&rw_field.name),
            DataType::Decimal => TableFieldSchema::numeric(&rw_field.name),
            DataType::Date => TableFieldSchema::date(&rw_field.name),
            DataType::Varchar => TableFieldSchema::string(&rw_field.name),
            DataType::Time => TableFieldSchema::time(&rw_field.name),
            DataType::Timestamp => TableFieldSchema::date_time(&rw_field.name),
            DataType::Timestamptz => TableFieldSchema::timestamp(&rw_field.name),
            DataType::Interval => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "BigQuery sink does not support Interval"
                )));
            }
            DataType::Struct(st) => {
                let mut sub_fields = Vec::with_capacity(st.len());
                for (name, dt) in st.iter() {
                    let rw_field = Field::with_name(dt.clone(), name);
                    let field = Self::map_field(&rw_field)?;
                    sub_fields.push(field);
                }
                TableFieldSchema::record(&rw_field.name, sub_fields)
            }
            DataType::List(dt) => {
                let inner_field = Self::map_field(&Field::with_name(*dt.clone(), &rw_field.name))?;
                TableFieldSchema {
                    mode: Some("REPEATED".to_owned()),
                    ..inner_field
                }
            }

            DataType::Bytea => TableFieldSchema::bytes(&rw_field.name),
            DataType::Jsonb => TableFieldSchema::json(&rw_field.name),
            DataType::Int256 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "BigQuery sink does not support Int256"
                )));
            }
            DataType::Map(_) => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "BigQuery sink does not support Map"
                )));
            }
        };
        Ok(tfs)
    }
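    /// Creates the target table in the given dataset from the RisingWave schema
    /// (used when `auto_create` is enabled and the table does not exist yet).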
    async fn create_table(
        &self,
        client: &Client,
        project_id: &str,
        dataset_id: &str,
        table_id: &str,
        fields: &Vec<Field>,
    ) -> Result<Table> {
        let dataset = client
            .dataset()
            .get(project_id, dataset_id)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let fields: Vec<_> = fields.iter().map(Self::map_field).collect::<Result<_>>()?;
        let table = Table::from_dataset(&dataset, table_id, TableSchema::new(fields));

        client
            .table()
            .create(table)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))
    }
}

impl Sink for BigQuerySink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = BigQueryLogSinker;

    const SINK_NAME: &'static str = BIGQUERY_SINK;

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let (writer, resp_stream) = BigQuerySinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        Ok(BigQueryLogSinker::new(
            writer,
            resp_stream,
            BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE,
        ))
    }
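    /// Validates the sink: checks the license, requires a primary key for upsert sinks,
    /// optionally auto-creates the table, and compares the RisingWave schema with the
    /// BigQuery table schema via `INFORMATION_SCHEMA.COLUMNS`.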
    async fn validate(&self) -> Result<()> {
        risingwave_common::license::Feature::BigQuerySink
            .check_available()
            .map_err(|e| anyhow::anyhow!(e))?;
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert BigQuery sink (please define in `primary_key` field)"
            )));
        }
        let client = self
            .config
            .common
            .build_client(&self.config.aws_auth_props)
            .await?;
        let BigQueryCommon {
            project: project_id,
            dataset: dataset_id,
            table: table_id,
            ..
        } = &self.config.common;

        if self.config.common.auto_create {
            match client
                .table()
                .get(project_id, dataset_id, table_id, None)
                .await
            {
                Err(BQError::RequestError(_)) => {
                    // Treat a request error as "table not found": create the table from the
                    // RisingWave schema and skip the column comparison below.
                    return self
                        .create_table(
                            &client,
                            project_id,
                            dataset_id,
                            table_id,
                            &self.schema.fields,
                        )
                        .await
                        .map(|_| ());
                }
                Err(e) => return Err(SinkError::BigQuery(e.into())),
                _ => {}
            }
        }

        let mut rs = client
            .job()
            .query(
                &self.config.common.project,
                QueryRequest::new(format!(
                    "SELECT column_name, data_type FROM `{}.{}.INFORMATION_SCHEMA.COLUMNS` WHERE table_name = '{}'",
                    project_id, dataset_id, table_id,
                )),
            ).await.map_err(|e| SinkError::BigQuery(e.into()))?;

        let mut big_query_schema = HashMap::default();
        while rs.next_row() {
            big_query_schema.insert(
                rs.get_string_by_name("column_name")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find column_name"))
                    })?,
                rs.get_string_by_name("data_type")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find data_type"))
                    })?,
            );
        }

        self.check_column_name_and_type(big_query_schema)?;
        Ok(())
    }
}
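/// Encodes stream chunks as protobuf rows and submits them to the BigQuery Storage Write API
/// through the table's `_default` write stream.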
pub struct BigQuerySinkWriter {
    pub config: BigQueryConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: StorageWriterClient,
    is_append_only: bool,
    row_encoder: ProtoEncoder,
    writer_pb_schema: ProtoSchema,
    #[expect(dead_code)]
    message_descriptor: MessageDescriptor,
    write_stream: String,
    proto_field: Option<FieldDescriptor>,
}

impl TryFrom<SinkParam> for BigQuerySink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = BigQueryConfig::from_btreemap(param.properties)?;
        BigQuerySink::new(
            config,
            schema,
            param.downstream_pk,
            param.sink_type.is_append_only(),
        )
    }
}
impl BigQuerySinkWriter {
    pub async fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let (client, resp_stream) = config
            .common
            .build_writer_client(&config.aws_auth_props)
            .await?;
        let mut descriptor_proto = build_protobuf_schema(
            schema
                .fields()
                .iter()
                .map(|f| (f.name.as_str(), &f.data_type)),
            config.common.table.clone(),
        )?;

        if !is_append_only {
            // For upsert sinks, append the `_CHANGE_TYPE` pseudo-column so that each row can be
            // marked as an UPSERT or DELETE for BigQuery change data capture.
            let field = FieldDescriptorProto {
                name: Some(CHANGE_TYPE.to_owned()),
                number: Some((schema.len() + 1) as i32),
                r#type: Some(field_descriptor_proto::Type::String.into()),
                ..Default::default()
            };
            descriptor_proto.field.push(field);
        }

        let descriptor_pool = build_protobuf_descriptor_pool(&descriptor_proto)?;
        let message_descriptor = descriptor_pool
            .get_message_by_name(&config.common.table)
            .ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Can't find message proto {}",
                    &config.common.table
                ))
            })?;
        let proto_field = if !is_append_only {
            let proto_field = message_descriptor
                .get_field_by_name(CHANGE_TYPE)
                .ok_or_else(|| {
                    SinkError::BigQuery(anyhow::anyhow!("Can't find {}", CHANGE_TYPE))
                })?;
            Some(proto_field)
        } else {
            None
        };
        let row_encoder = ProtoEncoder::new(
            schema.clone(),
            None,
            message_descriptor.clone(),
            ProtoHeader::None,
        )?;
        Ok((
            Self {
                write_stream: format!(
                    "projects/{}/datasets/{}/tables/{}/streams/_default",
                    config.common.project, config.common.dataset, config.common.table
                ),
                config,
                schema,
                pk_indices,
                client,
                is_append_only,
                row_encoder,
                message_descriptor,
                proto_field,
                writer_pb_schema: ProtoSchema {
                    proto_descriptor: Some(descriptor_proto),
                },
            },
            resp_stream,
        ))
    }
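    /// Serializes the `Insert` rows of a chunk into protobuf payloads; other ops are ignored.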
    fn append_only(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op != Op::Insert {
                continue;
            }
            serialized_rows.push(self.row_encoder.encode(row)?.ser_to()?)
        }
        Ok(serialized_rows)
    }
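    /// Serializes rows with a `_CHANGE_TYPE` marker: `Insert`/`UpdateInsert` become `UPSERT`,
    /// `Delete` becomes `DELETE`, and `UpdateDelete` is skipped.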
    fn upsert(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op == Op::UpdateDelete {
                continue;
            }
            let mut pb_row = self.row_encoder.encode(row)?;
            match op {
                Op::Insert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::Delete => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("DELETE".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::UpdateDelete => continue,
                Op::UpdateInsert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
            };

            serialized_rows.push(pb_row.ser_to()?)
        }
        Ok(serialized_rows)
    }
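    /// Encodes a chunk, splits the rows into batches bounded by [`MAX_ROW_SIZE`], sends one
    /// append request per batch, and returns the number of requests (i.e. responses to await).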
    fn write_chunk(&mut self, chunk: StreamChunk) -> Result<usize> {
        let serialized_rows = if self.is_append_only {
            self.append_only(chunk)?
        } else {
            self.upsert(chunk)?
        };
        if serialized_rows.is_empty() {
            return Ok(0);
        }
        let mut result = Vec::new();
        let mut result_inner = Vec::new();
        let mut size_count = 0;
        for i in serialized_rows {
            size_count += i.len();
            if size_count > MAX_ROW_SIZE {
                result.push(result_inner);
                result_inner = Vec::new();
                size_count = i.len();
            }
            result_inner.push(i);
        }
        if !result_inner.is_empty() {
            result.push(result_inner);
        }
        let len = result.len();
        for serialized_rows in result {
            let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
                writer_schema: Some(self.writer_pb_schema.clone()),
                rows: Some(ProtoRows { serialized_rows }),
            });
            self.client.append_rows(rows, self.write_stream.clone())?;
        }
        Ok(len)
    }
}
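/// Converts the gRPC `AppendRows` response stream into a stream of `Result<()>`, yielding one
/// item per successful response and surfacing row-level or stream-level errors as `SinkError`s.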
#[try_stream(ok = (), error = SinkError)]
pub async fn resp_to_stream(
    resp_stream: impl Future<
        Output = std::result::Result<
            Response<google_cloud_gax::grpc::Streaming<AppendRowsResponse>>,
            Status,
        >,
    >
    + 'static
    + Send,
) {
    let mut resp_stream = resp_stream
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?
        .into_inner();
    loop {
        match resp_stream
            .message()
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?
        {
            Some(append_rows_response) => {
                if !append_rows_response.row_errors.is_empty() {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "BigQuery insert error {:?}",
                        append_rows_response.row_errors
                    )));
                }
                if let Some(google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(status)) = append_rows_response.response {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "BigQuery insert error {:?}",
                        status
                    )));
                }
                yield ();
            }
            None => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "BigQuery insert error: end of response stream",
                )));
            }
        }
    }
}
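/// Thin wrapper around the BigQuery Storage Write API connection: requests pushed through
/// `request_sender` are streamed to a single long-lived `append_rows` RPC.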
struct StorageWriterClient {
    #[expect(dead_code)]
    environment: Environment,
    request_sender: mpsc::UnboundedSender<AppendRowsRequest>,
}
impl StorageWriterClient {
    pub async fn new(
        credentials: CredentialsFile,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
            Self::bigquery_grpc_auth_config(),
            Box::new(credentials),
        )
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?;
        let conn_options = ConnectionOptions {
            connect_timeout: CONNECT_TIMEOUT,
            timeout: CONNECTION_TIMEOUT,
        };
        let environment = Environment::GoogleCloud(Box::new(ts_grpc));
        let conn = ConnectionManager::new(DEFAULT_GRPC_CHANNEL_NUMS, &environment, &conn_options)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let mut client = conn.writer();

        // The unbounded channel feeds the request stream of the long-lived `append_rows` RPC.
        let (tx, rx) = mpsc::unbounded_channel();
        let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);

        let resp = async move { client.append_rows(Request::new(stream)).await };
        let resp_stream = resp_to_stream(resp);

        Ok((
            StorageWriterClient {
                environment,
                request_sender: tx,
            },
            resp_stream,
        ))
    }

    pub fn append_rows(&mut self, row: AppendRowsRequestRows, write_stream: String) -> Result<()> {
        let append_req = AppendRowsRequest {
            write_stream: write_stream.clone(),
            offset: None,
            trace_id: Uuid::new_v4().hyphenated().to_string(),
            missing_value_interpretations: HashMap::default(),
            rows: Some(row),
            default_missing_value_interpretation: MissingValueInterpretation::DefaultValue as i32,
        };
        self.request_sender
            .send(append_req)
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        Ok(())
    }

    fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> {
        let mut auth_config = google_cloud_auth::project::Config::default();
        auth_config =
            auth_config.with_audience(google_cloud_bigquery::grpc::apiv1::conn_pool::AUDIENCE);
        auth_config =
            auth_config.with_scopes(&google_cloud_bigquery::grpc::apiv1::conn_pool::SCOPES);
        auth_config
    }
}
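/// Wraps the generated [`DescriptorProto`] in a single-file descriptor set and builds a
/// [`prost_reflect::DescriptorPool`] from it.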
fn build_protobuf_descriptor_pool(desc: &DescriptorProto) -> Result<prost_reflect::DescriptorPool> {
    let file_descriptor = FileDescriptorProto {
        message_type: vec![desc.clone()],
        name: Some("bigquery".to_owned()),
        ..Default::default()
    };

    prost_reflect::DescriptorPool::from_file_descriptor_set(FileDescriptorSet {
        file: vec![file_descriptor],
    })
    .context("failed to build descriptor pool")
    .map_err(SinkError::BigQuery)
}
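/// Builds a protobuf message descriptor named `name` whose fields mirror the given RisingWave
/// columns; nested structs become nested message types.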
fn build_protobuf_schema<'a>(
    fields: impl Iterator<Item = (&'a str, &'a DataType)>,
    name: String,
) -> Result<DescriptorProto> {
    let mut proto = DescriptorProto {
        name: Some(name),
        ..Default::default()
    };
    let mut struct_vec = vec![];
    let field_vec = fields
        .enumerate()
        .map(|(index, (name, data_type))| {
            let (field, des_proto) =
                build_protobuf_field(data_type, (index + 1) as i32, name.to_owned())?;
            if let Some(sv) = des_proto {
                struct_vec.push(sv);
            }
            Ok(field)
        })
        .collect::<Result<Vec<_>>>()?;
    proto.field = field_vec;
    proto.nested_type = struct_vec;
    Ok(proto)
}
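/// Maps one RisingWave column to a protobuf field descriptor; struct columns additionally
/// return the nested message descriptor, and lists are encoded as repeated fields.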
fn build_protobuf_field(
    data_type: &DataType,
    index: i32,
    name: String,
) -> Result<(FieldDescriptorProto, Option<DescriptorProto>)> {
    let mut field = FieldDescriptorProto {
        name: Some(name.clone()),
        number: Some(index),
        ..Default::default()
    };
    match data_type {
        DataType::Boolean => field.r#type = Some(field_descriptor_proto::Type::Bool.into()),
        DataType::Int32 => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Int16 | DataType::Int64 => {
            field.r#type = Some(field_descriptor_proto::Type::Int64.into())
        }
        DataType::Float64 => field.r#type = Some(field_descriptor_proto::Type::Double.into()),
        DataType::Decimal => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Date => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Varchar => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Time => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamp => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamptz => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Interval => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Struct(s) => {
            field.r#type = Some(field_descriptor_proto::Type::Message.into());
            let name = format!("Struct{}", name);
            let sub_proto = build_protobuf_schema(s.iter(), name.clone())?;
            field.type_name = Some(name);
            return Ok((field, Some(sub_proto)));
        }
        DataType::List(l) => {
            let (mut field, proto) = build_protobuf_field(l.as_ref(), index, name.clone())?;
            field.label = Some(field_descriptor_proto::Label::Repeated.into());
            return Ok((field, proto));
        }
        DataType::Bytea => field.r#type = Some(field_descriptor_proto::Type::Bytes.into()),
        DataType::Jsonb => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Serial => field.r#type = Some(field_descriptor_proto::Type::Int64.into()),
        DataType::Float32 | DataType::Int256 => {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "BigQuery sink does not support Float32 or Int256"
            )));
        }
        DataType::Map(_) => todo!(),
    }
    Ok((field, None))
}

#[cfg(test)]
mod test {

    use std::assert_matches::assert_matches;

    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::{DataType, StructType};

    use crate::sink::big_query::{
        BigQuerySink, build_protobuf_descriptor_pool, build_protobuf_schema,
    };

    #[tokio::test]
    async fn test_type_check() {
        let big_query_type_string = "ARRAY<STRUCT<v1 ARRAY<INT64>, v2 STRUCT<v1 INT64, v2 INT64>>>";
        let rw_datatype = DataType::List(Box::new(DataType::Struct(StructType::new(vec![
            ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
            (
                "v2".to_owned(),
                DataType::Struct(StructType::new(vec![
                    ("v1".to_owned(), DataType::Int64),
                    ("v2".to_owned(), DataType::Int64),
                ])),
            ),
        ]))));
        assert_eq!(
            BigQuerySink::get_string_and_check_support_from_datatype(&rw_datatype).unwrap(),
            big_query_type_string
        );
    }

    #[tokio::test]
    async fn test_schema_check() {
        let schema = Schema {
            fields: vec![
                Field::with_name(DataType::Int64, "v1"),
                Field::with_name(DataType::Float64, "v2"),
                Field::with_name(
                    DataType::List(Box::new(DataType::Struct(StructType::new(vec![
                        ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
                        (
                            "v3".to_owned(),
                            DataType::Struct(StructType::new(vec![
                                ("v1".to_owned(), DataType::Int64),
                                ("v2".to_owned(), DataType::Int64),
                            ])),
                        ),
                    ])))),
                    "v3",
                ),
            ],
        };
        let fields = schema
            .fields()
            .iter()
            .map(|f| (f.name.as_str(), &f.data_type));
        let desc = build_protobuf_schema(fields, "t1".to_owned()).unwrap();
        let pool = build_protobuf_descriptor_pool(&desc).unwrap();
        let t1_message = pool.get_message_by_name("t1").unwrap();
        assert_matches!(
            t1_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            t1_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Double
        );
        assert_matches!(
            t1_message.get_field_by_name("v3").unwrap().kind(),
            prost_reflect::Kind::Message(_)
        );

        let v3_message = pool.get_message_by_name("t1.Structv3").unwrap();
        assert_matches!(
            v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert!(v3_message.get_field_by_name("v1").unwrap().is_list());

        let v3_v3_message = pool.get_message_by_name("t1.Structv3.Structv3").unwrap();
        assert_matches!(
            v3_v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            v3_v3_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
    }
}