1use std::collections::HashMap;
16use std::future::Future;
17use std::path::Path;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use client::{Output, OutputData, OutputMeta};
23use common_base::readable_size::ReadableSize;
24use common_datasource::file_format::csv::{
25 CsvFormat, is_skippable_arrow_error, tolerant_csv_stream,
26};
27use common_datasource::file_format::json::JsonFormat;
28use common_datasource::file_format::orc::{ReaderAdapter, infer_orc_schema, new_orc_stream_reader};
29use common_datasource::file_format::{FileFormat, Format, file_to_stream};
30use common_datasource::lister::{Lister, Source};
31use common_datasource::object_store::{FS_SCHEMA, build_backend, parse_url};
32use common_datasource::util::find_dir_and_filename;
33use common_query::{OutputCost, OutputRows};
34use common_recordbatch::DfSendableRecordBatchStream;
35use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
36use common_telemetry::{debug, tracing};
37use datafusion::datasource::physical_plan::{CsvSource, FileSource, JsonSource};
38use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
39use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
40use datafusion_common::DataFusionError;
41use datafusion_common::arrow::error::ArrowError;
42use datafusion_common::config::CsvOptions;
43use datafusion_expr::Expr;
44use datatypes::arrow::compute::can_cast_types;
45use datatypes::arrow::datatypes::{DataType as ArrowDataType, Schema, SchemaRef};
46use datatypes::arrow::record_batch::RecordBatch;
47use datatypes::vectors::Helper;
48use futures_util::StreamExt;
49use object_store::{Entry, EntryMode, ObjectStore};
50use regex::Regex;
51use session::context::QueryContextRef;
52use snafu::{ResultExt, ensure};
53use table::requests::{CopyTableRequest, InsertRequest};
54use table::table_reference::TableReference;
55use tokio_util::compat::FuturesAsyncReadCompatExt;
56
57use crate::error::{self, IntoVectorsSnafu, PathNotFoundSnafu, Result};
58use crate::statement::StatementExecutor;
59
/// Batch size handed to the CSV and JSON file sources when a read stream is built.
const DEFAULT_BATCH_SIZE: usize = 8192;
/// Chunk size (`256 * 1024`) requested from the object store reader for Parquet and ORC files.
const DEFAULT_READ_BUFFER: usize = 256 * 1024;
62
63enum FileMetadata {
64 Parquet {
65 schema: SchemaRef,
66 metadata: ArrowReaderMetadata,
67 path: String,
68 },
69 Orc {
70 schema: SchemaRef,
71 path: String,
72 },
73 Json {
74 schema: SchemaRef,
75 format: JsonFormat,
76 path: String,
77 },
78 Csv {
79 schema: SchemaRef,
80 format: CsvFormat,
81 path: String,
82 },
83}
84
85impl FileMetadata {
86 pub fn schema(&self) -> &SchemaRef {
88 match self {
89 FileMetadata::Parquet { schema, .. } => schema,
90 FileMetadata::Orc { schema, .. } => schema,
91 FileMetadata::Json { schema, .. } => schema,
92 FileMetadata::Csv { schema, .. } => schema,
93 }
94 }
95}
96
97impl StatementExecutor {
98 async fn list_copy_from_entries(
99 &self,
100 req: &CopyTableRequest,
101 ) -> Result<(ObjectStore, Vec<Entry>)> {
102 let (schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;
103
104 if schema.to_uppercase() == FS_SCHEMA {
105 ensure!(Path::new(&path).exists(), PathNotFoundSnafu { path });
106 }
107
108 let object_store =
109 build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?;
110
111 let (dir, filename) = find_dir_and_filename(&path);
112 let regex = req
113 .pattern
114 .as_ref()
115 .map(|x| Regex::new(x))
116 .transpose()
117 .context(error::BuildRegexSnafu)?;
118
119 let source = if let Some(filename) = filename {
120 Source::Filename(filename)
121 } else {
122 Source::Dir
123 };
124
125 let lister = Lister::new(object_store.clone(), source.clone(), dir.clone(), regex);
126
127 let entries = lister.list().await.context(error::ListObjectsSnafu)?;
128 debug!("Copy from dir: {dir:?}, {source:?}, entries: {entries:?}");
129 Ok((object_store, entries))
130 }
131
132 async fn collect_metadata(
133 &self,
134 object_store: &ObjectStore,
135 format: Format,
136 path: String,
137 ) -> Result<FileMetadata> {
138 match format {
139 Format::Csv(format) => Ok(FileMetadata::Csv {
140 schema: Arc::new(
141 format
142 .infer_schema(object_store, &path)
143 .await
144 .context(error::InferSchemaSnafu { path: &path })?,
145 ),
146 format,
147 path,
148 }),
149 Format::Json(format) => Ok(FileMetadata::Json {
150 schema: Arc::new(
151 format
152 .infer_schema(object_store, &path)
153 .await
154 .context(error::InferSchemaSnafu { path: &path })?,
155 ),
156 format,
157 path,
158 }),
159 Format::Parquet(_) => {
160 let meta = object_store
161 .stat(&path)
162 .await
163 .context(error::ReadObjectSnafu { path: &path })?;
164 let mut reader = object_store
165 .reader(&path)
166 .await
167 .context(error::ReadObjectSnafu { path: &path })?
168 .into_futures_async_read(0..meta.content_length())
169 .await
170 .context(error::ReadObjectSnafu { path: &path })?
171 .compat();
172 let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
173 .await
174 .context(error::ReadParquetMetadataSnafu)?;
175
176 Ok(FileMetadata::Parquet {
177 schema: metadata.schema().clone(),
178 metadata,
179 path,
180 })
181 }
182 Format::Orc(_) => {
183 let meta = object_store
184 .stat(&path)
185 .await
186 .context(error::ReadObjectSnafu { path: &path })?;
187
188 let reader = object_store
189 .reader(&path)
190 .await
191 .context(error::ReadObjectSnafu { path: &path })?;
192
193 let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length()))
194 .await
195 .context(error::ReadOrcSnafu)?;
196
197 Ok(FileMetadata::Orc {
198 schema: Arc::new(schema),
199 path,
200 })
201 }
202 }
203 }
204
205 async fn build_read_stream(
206 &self,
207 compat_schema: SchemaRef,
208 object_store: &ObjectStore,
209 file_metadata: &FileMetadata,
210 projection: Vec<usize>,
211 filters: Vec<Expr>,
212 ) -> Result<DfSendableRecordBatchStream> {
213 match file_metadata {
214 FileMetadata::Csv {
215 format,
216 path,
217 schema,
218 } => {
219 let output_schema = Arc::new(
220 compat_schema
221 .project(&projection)
222 .context(error::ProjectSchemaSnafu)?,
223 );
224
225 let options = CsvOptions::default()
226 .with_has_header(format.has_header)
227 .with_delimiter(format.delimiter);
228 let csv_source = CsvSource::new(schema.clone())
229 .with_csv_options(options)
230 .with_batch_size(DEFAULT_BATCH_SIZE);
231 let stream = if format.skip_bad_records {
232 let reader_schema =
233 csv_reader_schema_for_skip_bad_records(schema, &compat_schema);
234 tolerant_csv_stream(
235 object_store,
236 path,
237 Arc::new(reader_schema),
238 projection.clone(),
239 format,
240 )
241 .await
242 .context(error::BuildFileStreamSnafu)?
243 } else {
244 file_to_stream(
245 object_store,
246 path,
247 csv_source,
248 Some(projection),
249 format.compression_type,
250 )
251 .await
252 .context(error::BuildFileStreamSnafu)?
253 };
254
255 let stream = Box::pin(
256 RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
259 .with_filter(filters)
260 .context(error::PhysicalExprSnafu)?,
261 );
262 if format.skip_bad_records {
263 Ok(Box::pin(SkipBadRecordsStream::new(stream, path)))
264 } else {
265 Ok(stream)
266 }
267 }
268 FileMetadata::Json {
269 path,
270 format,
271 schema,
272 } => {
273 let output_schema = Arc::new(
274 compat_schema
275 .project(&projection)
276 .context(error::ProjectSchemaSnafu)?,
277 );
278
279 let json_source =
280 JsonSource::new(schema.clone()).with_batch_size(DEFAULT_BATCH_SIZE);
281 let stream = file_to_stream(
282 object_store,
283 path,
284 json_source,
285 Some(projection),
286 format.compression_type,
287 )
288 .await
289 .context(error::BuildFileStreamSnafu)?;
290
291 Ok(Box::pin(
292 RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
295 .with_filter(filters)
296 .context(error::PhysicalExprSnafu)?,
297 ))
298 }
299 FileMetadata::Parquet { metadata, path, .. } => {
300 let meta = object_store
301 .stat(path)
302 .await
303 .context(error::ReadObjectSnafu { path })?;
304 let reader = object_store
305 .reader_with(path)
306 .chunk(DEFAULT_READ_BUFFER)
307 .await
308 .context(error::ReadObjectSnafu { path })?
309 .into_futures_async_read(0..meta.content_length())
310 .await
311 .context(error::ReadObjectSnafu { path })?
312 .compat();
313 let builder =
314 ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
315 let stream = builder
316 .build()
317 .context(error::BuildParquetRecordBatchStreamSnafu)?;
318
319 let output_schema = Arc::new(
320 compat_schema
321 .project(&projection)
322 .context(error::ProjectSchemaSnafu)?,
323 );
324 Ok(Box::pin(
325 RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
326 .with_filter(filters)
327 .context(error::PhysicalExprSnafu)?,
328 ))
329 }
330 FileMetadata::Orc { path, .. } => {
331 let meta = object_store
332 .stat(path)
333 .await
334 .context(error::ReadObjectSnafu { path })?;
335
336 let reader = object_store
337 .reader_with(path)
338 .chunk(DEFAULT_READ_BUFFER)
339 .await
340 .context(error::ReadObjectSnafu { path })?;
341 let stream =
342 new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
343 .await
344 .context(error::ReadOrcSnafu)?;
345
346 let output_schema = Arc::new(
347 compat_schema
348 .project(&projection)
349 .context(error::ProjectSchemaSnafu)?,
350 );
351
352 Ok(Box::pin(
353 RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
354 .with_filter(filters)
355 .context(error::PhysicalExprSnafu)?,
356 ))
357 }
358 }
359 }
360
361 #[tracing::instrument(skip_all)]
362 pub async fn copy_table_from(
363 &self,
364 req: CopyTableRequest,
365 query_ctx: QueryContextRef,
366 ) -> Result<Output> {
367 let table_ref = TableReference {
368 catalog: &req.catalog_name,
369 schema: &req.schema_name,
370 table: &req.table_name,
371 };
372 let table = self.get_table(&table_ref).await?;
373 let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
374 let (object_store, entries) = self.list_copy_from_entries(&req).await?;
375 let mut files = Vec::with_capacity(entries.len());
376 let table_schema = table.schema().arrow_schema().clone();
377 let filters = table
378 .schema()
379 .timestamp_column()
380 .and_then(|c| {
381 common_query::logical_plan::build_same_type_ts_filter(c, req.timestamp_range)
382 })
383 .into_iter()
384 .collect::<Vec<_>>();
385
386 for entry in entries.iter() {
387 if entry.metadata().mode() != EntryMode::FILE {
388 continue;
389 }
390 let path = entry.path();
391 let file_metadata = self
392 .collect_metadata(&object_store, format.clone(), path.to_string())
393 .await?;
394
395 let file_schema = file_metadata.schema();
396 let (file_schema_projection, table_schema_projection, compat_schema) =
397 generated_schema_projection_and_compatible_file_schema(file_schema, &table_schema);
398 let projected_file_schema = Arc::new(
399 file_schema
400 .project(&file_schema_projection)
401 .context(error::ProjectSchemaSnafu)?,
402 );
403 let projected_table_schema = Arc::new(
404 table_schema
405 .project(&table_schema_projection)
406 .context(error::ProjectSchemaSnafu)?,
407 );
408 ensure_schema_compatible(&projected_file_schema, &projected_table_schema)?;
409
410 files.push((
411 Arc::new(compat_schema),
412 file_schema_projection,
413 projected_table_schema,
414 file_metadata,
415 ))
416 }
417
418 let mut rows_inserted = 0;
419 let mut insert_cost = 0;
420 let max_insert_rows = req.limit.map(|n| n as usize);
421 for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
422 {
423 let mut stream = self
424 .build_read_stream(
425 compat_schema,
426 &object_store,
427 &file_metadata,
428 file_schema_projection,
429 filters.clone(),
430 )
431 .await?;
432
433 let fields = projected_table_schema
434 .fields()
435 .iter()
436 .map(|f| f.name().clone())
437 .collect::<Vec<_>>();
438
439 let pending_mem_threshold = ReadableSize::mb(32).as_bytes();
441 let mut pending_mem_size = 0;
442 let mut pending = vec![];
443
444 while let Some(r) = stream.next().await {
445 let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
446 let vectors =
447 Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;
448
449 pending_mem_size += vectors.iter().map(|v| v.memory_size()).sum::<usize>();
450
451 let columns_values = fields
452 .iter()
453 .cloned()
454 .zip(vectors)
455 .collect::<HashMap<_, _>>();
456
457 pending.push(self.inserter.handle_table_insert(
458 InsertRequest {
459 catalog_name: req.catalog_name.clone(),
460 schema_name: req.schema_name.clone(),
461 table_name: req.table_name.clone(),
462 columns_values,
463 },
464 query_ctx.clone(),
465 ));
466
467 if pending_mem_size as u64 >= pending_mem_threshold {
468 let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
469 rows_inserted += rows;
470 insert_cost += cost;
471 }
472
473 if let Some(max_insert_rows) = max_insert_rows
474 && rows_inserted >= max_insert_rows
475 {
476 return Ok(gen_insert_output(rows_inserted, insert_cost));
477 }
478 }
479
480 if !pending.is_empty() {
481 let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
482 rows_inserted += rows;
483 insert_cost += cost;
484 }
485 }
486
487 Ok(gen_insert_output(rows_inserted, insert_cost))
488 }
489}
490
491fn gen_insert_output(rows_inserted: usize, insert_cost: usize) -> Output {
492 Output::new(
493 OutputData::AffectedRows(rows_inserted),
494 OutputMeta::new_with_cost(insert_cost),
495 )
496}
497
498struct SkipBadRecordsStream {
499 inner: DfSendableRecordBatchStream,
500 path: String,
501}
502
503impl SkipBadRecordsStream {
504 fn new(inner: DfSendableRecordBatchStream, path: impl Into<String>) -> Self {
505 Self {
506 inner,
507 path: path.into(),
508 }
509 }
510}
511
512impl datafusion::physical_plan::RecordBatchStream for SkipBadRecordsStream {
513 fn schema(&self) -> SchemaRef {
514 self.inner.schema()
515 }
516}
517
518impl futures::Stream for SkipBadRecordsStream {
519 type Item = datafusion_common::Result<RecordBatch>;
520
521 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
522 let this = self.get_mut();
523 loop {
524 match this.inner.as_mut().poll_next(cx) {
525 Poll::Ready(Some(Err(error))) if is_skippable_record_error(&error) => {
526 common_telemetry::warn!(
527 "Skipping bad record while copying from {}: {}",
528 this.path,
529 error
530 );
531 continue;
532 }
533 other => return other,
534 }
535 }
536 }
537}
538
539fn is_skippable_record_error(error: &DataFusionError) -> bool {
540 match error {
541 DataFusionError::ArrowError(error, _) => is_skippable_arrow_error(error),
542 DataFusionError::External(error) => error
543 .downcast_ref::<ArrowError>()
544 .is_some_and(is_skippable_arrow_error),
545 DataFusionError::Context(_, error) => is_skippable_record_error(error),
546 _ => false,
547 }
548}
549
550async fn batch_insert(
552 pending: &mut Vec<impl Future<Output = Result<Output>>>,
553 pending_bytes: &mut usize,
554) -> Result<(OutputRows, OutputCost)> {
555 let batch = pending.drain(..);
556 let result = futures::future::try_join_all(batch)
557 .await?
558 .iter()
559 .map(|o| o.extract_rows_and_cost())
560 .reduce(|(a, b), (c, d)| (a + c, b + d))
561 .unwrap_or((0, 0));
562 *pending_bytes = 0;
563 Ok(result)
564}
565
566fn can_cast_types_for_greptime(from: &ArrowDataType, to: &ArrowDataType) -> bool {
568 if let ArrowDataType::Map(_, _) = from
570 && let ArrowDataType::Binary = to
571 {
572 return true;
573 }
574
575 can_cast_types(from, to)
577}
578
579fn csv_reader_schema_for_skip_bad_records(file: &SchemaRef, compat: &SchemaRef) -> Schema {
580 let fields = file
581 .fields()
582 .iter()
583 .enumerate()
584 .map(|(idx, file_field)| {
585 let compat_field = compat
586 .fields()
587 .find(file_field.name())
588 .map(|(_, field)| field);
589
590 match compat_field {
591 Some(compat_field) if can_csv_reader_parse_type(compat_field.data_type()) => {
592 compat_field.clone()
593 }
594 _ => file.fields()[idx].clone(),
595 }
596 })
597 .collect::<Vec<_>>();
598
599 Schema::new_with_metadata(fields, file.metadata().clone())
600}
601
602fn can_csv_reader_parse_type(data_type: &ArrowDataType) -> bool {
603 match data_type {
604 ArrowDataType::Boolean
605 | ArrowDataType::Decimal32(_, _)
606 | ArrowDataType::Decimal64(_, _)
607 | ArrowDataType::Decimal128(_, _)
608 | ArrowDataType::Decimal256(_, _)
609 | ArrowDataType::Int8
610 | ArrowDataType::Int16
611 | ArrowDataType::Int32
612 | ArrowDataType::Int64
613 | ArrowDataType::UInt8
614 | ArrowDataType::UInt16
615 | ArrowDataType::UInt32
616 | ArrowDataType::UInt64
617 | ArrowDataType::Float32
618 | ArrowDataType::Float64
619 | ArrowDataType::Date32
620 | ArrowDataType::Date64
621 | ArrowDataType::Time32(_)
622 | ArrowDataType::Time64(_)
623 | ArrowDataType::Timestamp(_, _)
624 | ArrowDataType::Null
625 | ArrowDataType::Utf8
626 | ArrowDataType::Utf8View => true,
627 ArrowDataType::Dictionary(_, value_type) => value_type.as_ref() == &ArrowDataType::Utf8,
628 _ => false,
629 }
630}
631
632fn ensure_schema_compatible(from: &SchemaRef, to: &SchemaRef) -> Result<()> {
633 let not_match = from
634 .fields
635 .iter()
636 .zip(to.fields.iter())
637 .map(|(l, r)| (l.data_type(), r.data_type()))
638 .enumerate()
639 .find(|(_, (l, r))| !can_cast_types_for_greptime(l, r));
640
641 if let Some((index, _)) = not_match {
642 error::InvalidSchemaSnafu {
643 index,
644 table_schema: to.to_string(),
645 file_schema: from.to_string(),
646 }
647 .fail()
648 } else {
649 Ok(())
650 }
651}
652
653fn generated_schema_projection_and_compatible_file_schema(
658 file: &SchemaRef,
659 table: &SchemaRef,
660) -> (Vec<usize>, Vec<usize>, Schema) {
661 let mut file_projection = Vec::with_capacity(file.fields.len());
662 let mut table_projection = Vec::with_capacity(file.fields.len());
663 let mut compatible_fields = file.fields.iter().cloned().collect::<Vec<_>>();
664 for (file_idx, file_field) in file.fields.iter().enumerate() {
665 if let Some((table_idx, table_field)) = table.fields.find(file_field.name()) {
666 file_projection.push(file_idx);
667 table_projection.push(table_idx);
668
669 compatible_fields[file_idx] = table_field.clone();
671 }
672 }
673
674 (
675 file_projection,
676 table_projection,
677 Schema::new(compatible_fields),
678 )
679}
680
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit};

    use super::*;

    /// Builds a timestamp type with the given unit and optional timezone name.
    fn timestamp(unit: TimeUnit, tz: Option<&str>) -> DataType {
        DataType::Timestamp(unit, tz.map(Into::into))
    }

    /// Wraps one column named `col` into a schema.
    fn single_column_schema(data_type: DataType, nullable: bool) -> Arc<Schema> {
        Arc::new(Schema::new(vec![Field::new("col", data_type, nullable)]))
    }

    /// Asserts that loading a `from` column into a `to` column is (in)compatible as stated.
    fn test_schema_matches(from: (DataType, bool), to: (DataType, bool), matches: bool) {
        let file_schema = single_column_schema(from.0.clone(), from.1);
        let table_schema = single_column_schema(to.0.clone(), to.1);
        let compatible = ensure_schema_compatible(&file_schema, &table_schema).is_ok();
        assert_eq!(
            matches, compatible,
            "from data type: {}, to data type: {}, expected: {}, but got: {}",
            from.0, to.0, matches, compatible
        )
    }

    /// Runs [`test_schema_matches`] over nullable column pairs that must all be compatible.
    fn assert_all_compatible(cases: Vec<(DataType, DataType)>) {
        for (from, to) in cases {
            test_schema_matches((from, true), (to, true), true);
        }
    }

    #[test]
    fn test_ensure_datatype_matches_ignore_timezone() {
        assert_all_compatible(vec![
            (
                timestamp(TimeUnit::Second, None),
                timestamp(TimeUnit::Second, None),
            ),
            (
                timestamp(TimeUnit::Second, Some("UTC")),
                timestamp(TimeUnit::Second, None),
            ),
            (
                timestamp(TimeUnit::Second, Some("UTC")),
                timestamp(TimeUnit::Second, Some("PDT")),
            ),
            (
                timestamp(TimeUnit::Second, Some("UTC")),
                timestamp(TimeUnit::Millisecond, Some("UTC")),
            ),
            (DataType::Int8, DataType::Int8),
            (DataType::Int8, DataType::Int16),
        ]);
    }

    #[test]
    fn test_data_type_equals_ignore_timezone_with_options() {
        assert_all_compatible(vec![
            (
                timestamp(TimeUnit::Microsecond, Some("UTC")),
                timestamp(TimeUnit::Millisecond, Some("PDT")),
            ),
            (
                DataType::Utf8,
                timestamp(TimeUnit::Millisecond, Some("PDT")),
            ),
            (
                timestamp(TimeUnit::Millisecond, Some("PDT")),
                DataType::Utf8,
            ),
        ]);
    }

    #[test]
    fn test_map_to_binary_json_compatibility() {
        let entry_struct = DataType::Struct(
            vec![
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Utf8, false),
            ]
            .into(),
        );
        let map_type = DataType::Map(Arc::new(Field::new("key_value", entry_struct, false)), false);

        // A non-nullable map column going into a nullable binary column.
        test_schema_matches((map_type, false), (DataType::Binary, true), true);

        assert_all_compatible(vec![
            (DataType::Int8, DataType::Int16),
            (DataType::Utf8, DataType::Binary),
        ]);
    }

    fn make_test_schema(v: &[Field]) -> Arc<Schema> {
        Arc::new(Schema::new(v.to_vec()))
    }

    #[test]
    fn test_compatible_file_schema() {
        let file_schema = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);
        let table_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
            Field::new("c3", DataType::Int16, true),
        ]);
        let expected = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
        ]);

        let (_, table_projection, _) =
            generated_schema_projection_and_compatible_file_schema(&file_schema, &table_schema);

        assert_eq!(table_schema.project(&table_projection).unwrap(), *expected);
    }

    #[test]
    fn test_schema_projection() {
        let uint8_schema = |names: &[&str]| {
            let fields = names
                .iter()
                .map(|name| Field::new(*name, DataType::UInt8, true))
                .collect::<Vec<_>>();
            make_test_schema(&fields)
        };

        let table_schema = uint8_schema(&["c3", "c4", "c5"]);
        let file_schemas = [
            uint8_schema(&["c1", "c2", "c3"]),
            uint8_schema(&["c3", "c4"]),
            uint8_schema(&["c3", "c4", "c5"]),
            // Shares no column with the table.
            uint8_schema(&["c1", "c2"]),
        ];

        for file_schema in &file_schemas {
            let (file_projection, table_projection, _) =
                generated_schema_projection_and_compatible_file_schema(file_schema, &table_schema);
            assert_eq!(
                file_schema.project(&file_projection).unwrap(),
                table_schema.project(&table_projection).unwrap()
            );
        }
    }

    #[test]
    fn test_csv_reader_schema_for_skip_bad_records() {
        let file_schema = make_test_schema(&[
            Field::new("id", DataType::Utf8, true),
            Field::new("jsons", DataType::Utf8, true),
            Field::new("ts", DataType::Utf8, true),
        ]);
        let compat_schema = make_test_schema(&[
            Field::new("id", DataType::UInt32, true),
            Field::new("jsons", DataType::Binary, true),
            Field::new("ts", timestamp(TimeUnit::Millisecond, None), true),
        ]);

        let reader_schema = csv_reader_schema_for_skip_bad_records(&file_schema, &compat_schema);

        // Parseable target types are adopted; binary is not, so the file's type is kept.
        assert_eq!(reader_schema.field(0).data_type(), &DataType::UInt32);
        assert_eq!(reader_schema.field(1).data_type(), &DataType::Utf8);
        assert_eq!(
            reader_schema.field(2).data_type(),
            compat_schema.field(2).data_type()
        );
    }
}