diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index a409e3a0cb..23e81f5667 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -194,38 +194,37 @@ jobs: runs-on: windows-latest-8-cores timeout-minutes: 60 steps: - - run: 'echo "temporary disabled"' - # - run: git config --global core.autocrlf false - # - uses: actions/checkout@v3 - # - uses: arduino/setup-protoc@v1 - # with: - # repo-token: ${{ secrets.GITHUB_TOKEN }} - # - name: Install Rust toolchain - # uses: dtolnay/rust-toolchain@master - # with: - # toolchain: ${{ env.RUST_TOOLCHAIN }} - # components: llvm-tools-preview - # - name: Rust Cache - # uses: Swatinem/rust-cache@v2 - # - name: Install Cargo Nextest - # uses: taiki-e/install-action@nextest - # - name: Install Python - # uses: actions/setup-python@v4 - # with: - # python-version: '3.10' - # - name: Install PyArrow Package - # run: pip install pyarrow - # - name: Install WSL distribution - # uses: Vampire/setup-wsl@v2 - # with: - # distribution: Ubuntu-22.04 - # - name: Running tests - # run: cargo nextest run -F pyo3_backend,dashboard - # env: - # RUST_BACKTRACE: 1 - # CARGO_INCREMENTAL: 0 - # GT_S3_BUCKET: ${{ secrets.S3_BUCKET }} - # GT_S3_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY_ID }} - # GT_S3_ACCESS_KEY: ${{ secrets.S3_ACCESS_KEY }} - # GT_S3_REGION: ${{ secrets.S3_REGION }} - # UNITTEST_LOG_DIR: "__unittest_logs" + - run: git config --global core.autocrlf false + - uses: actions/checkout@v3 + - uses: arduino/setup-protoc@v1 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@master + with: + toolchain: ${{ env.RUST_TOOLCHAIN }} + components: llvm-tools-preview + - name: Rust Cache + uses: Swatinem/rust-cache@v2 + - name: Install Cargo Nextest + uses: taiki-e/install-action@nextest + - name: Install Python + uses: actions/setup-python@v4 + with: + python-version: '3.10' + - name: Install PyArrow Package + run: pip install pyarrow + - name: Install WSL distribution + uses: Vampire/setup-wsl@v2 + with: + distribution: Ubuntu-22.04 + - name: Running tests + run: cargo nextest run -F pyo3_backend,dashboard + env: + RUST_BACKTRACE: 1 + CARGO_INCREMENTAL: 0 + GT_S3_BUCKET: ${{ secrets.S3_BUCKET }} + GT_S3_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY_ID }} + GT_S3_ACCESS_KEY: ${{ secrets.S3_ACCESS_KEY }} + GT_S3_REGION: ${{ secrets.S3_REGION }} + UNITTEST_LOG_DIR: "__unittest_logs" diff --git a/Cargo.lock b/Cargo.lock index 8af66ef41a..1f800949a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1712,6 +1712,7 @@ dependencies = [ "datafusion", "derive_builder 0.12.0", "futures", + "lazy_static", "object-store", "orc-rust", "paste", diff --git a/src/common/datasource/Cargo.toml b/src/common/datasource/Cargo.toml index 5b21ebb070..edb327519a 100644 --- a/src/common/datasource/Cargo.toml +++ b/src/common/datasource/Cargo.toml @@ -22,6 +22,7 @@ common-runtime = { workspace = true } datafusion.workspace = true derive_builder.workspace = true futures.workspace = true +lazy_static.workspace = true object-store = { workspace = true } orc-rust = "0.2" paste = "1.0" diff --git a/src/common/datasource/src/error.rs b/src/common/datasource/src/error.rs index cfc7a45c9e..ab1e918100 100644 --- a/src/common/datasource/src/error.rs +++ b/src/common/datasource/src/error.rs @@ -30,10 +30,11 @@ pub enum Error { location: Location, }, - #[snafu(display("Unsupported backend protocol: {}", protocol))] + #[snafu(display("Unsupported backend protocol: {}, url: {}", protocol, url))] UnsupportedBackendProtocol { protocol: String, location: Location, + url: String, }, #[snafu(display("Unsupported format protocol: {}", format))] diff --git a/src/common/datasource/src/lister.rs b/src/common/datasource/src/lister.rs index f0e246ff1f..6134469396 100644 --- a/src/common/datasource/src/lister.rs +++ b/src/common/datasource/src/lister.rs @@ -27,7 +27,7 @@ pub enum Source { pub struct Lister { object_store: ObjectStore, source: Source, - path: String, + root: String, regex: Option, } @@ -35,13 +35,13 @@ impl Lister { pub fn new( object_store: ObjectStore, source: Source, - path: String, + root: String, regex: Option, ) -> Self { Lister { object_store, source, - path, + root, regex, } } @@ -51,9 +51,9 @@ impl Lister { Source::Dir => { let streamer = self .object_store - .lister_with(&self.path) + .lister_with("/") .await - .context(error::ListObjectsSnafu { path: &self.path })?; + .context(error::ListObjectsSnafu { path: &self.root })?; streamer .try_filter(|f| { @@ -66,22 +66,21 @@ impl Lister { }) .try_collect::>() .await - .context(error::ListObjectsSnafu { path: &self.path }) + .context(error::ListObjectsSnafu { path: &self.root }) } Source::Filename(filename) => { // make sure this file exists - let file_full_path = format!("{}{}", self.path, filename); - let _ = self.object_store.stat(&file_full_path).await.context( + let _ = self.object_store.stat(filename).await.with_context(|_| { error::ListObjectsSnafu { - path: &file_full_path, - }, - )?; + path: format!("{}{}", &self.root, filename), + } + })?; Ok(self .object_store - .list_with(&self.path) + .list_with("/") .await - .context(error::ListObjectsSnafu { path: &self.path })? + .context(error::ListObjectsSnafu { path: &self.root })? .into_iter() .find(|f| f.name() == filename) .map(|f| vec![f]) diff --git a/src/common/datasource/src/object_store.rs b/src/common/datasource/src/object_store.rs index a07636554f..c9e36018c2 100644 --- a/src/common/datasource/src/object_store.rs +++ b/src/common/datasource/src/object_store.rs @@ -16,19 +16,29 @@ pub mod fs; pub mod s3; use std::collections::HashMap; +use lazy_static::lazy_static; use object_store::ObjectStore; +use regex::Regex; use snafu::{OptionExt, ResultExt}; use url::{ParseError, Url}; use self::fs::build_fs_backend; use self::s3::build_s3_backend; use crate::error::{self, Result}; +use crate::util::find_dir_and_filename; pub const FS_SCHEMA: &str = "FS"; pub const S3_SCHEMA: &str = "S3"; /// Returns `(schema, Option, path)` pub fn parse_url(url: &str) -> Result<(String, Option, String)> { + #[cfg(windows)] + { + // On Windows, the url may start with `C:/`. + if let Some(_) = handle_windows_path(url) { + return Ok((FS_SCHEMA.to_string(), None, url.to_string())); + } + } let parsed_url = Url::parse(url); match parsed_url { Ok(url) => Ok(( @@ -44,17 +54,47 @@ pub fn parse_url(url: &str) -> Result<(String, Option, String)> { } pub fn build_backend(url: &str, connection: &HashMap) -> Result { - let (schema, host, _path) = parse_url(url)?; + let (schema, host, path) = parse_url(url)?; + let (root, _) = find_dir_and_filename(&path); match schema.to_uppercase().as_str() { S3_SCHEMA => { let host = host.context(error::EmptyHostPathSnafu { url: url.to_string(), })?; - Ok(build_s3_backend(&host, "/", connection)?) + Ok(build_s3_backend(&host, &root, connection)?) } - FS_SCHEMA => Ok(build_fs_backend("/")?), + FS_SCHEMA => Ok(build_fs_backend(&root)?), - _ => error::UnsupportedBackendProtocolSnafu { protocol: schema }.fail(), + _ => error::UnsupportedBackendProtocolSnafu { + protocol: schema, + url, + } + .fail(), + } +} + +lazy_static! { + static ref DISK_SYMBOL_PATTERN: Regex = Regex::new("^([A-Za-z]:/)").unwrap(); +} + +pub fn handle_windows_path(url: &str) -> Option { + DISK_SYMBOL_PATTERN + .captures(url) + .map(|captures| captures[0].to_string()) +} + +#[cfg(test)] +mod tests { + use super::handle_windows_path; + + #[test] + fn test_handle_windows_path() { + assert_eq!( + handle_windows_path("C:/to/path/file"), + Some("C:/".to_string()) + ); + assert_eq!(handle_windows_path("https://google.com"), None); + assert_eq!(handle_windows_path("s3://bucket/path/to"), None); } } diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs index 8dd67a7daa..b335e5356a 100644 --- a/src/operator/src/statement/copy_table_from.rs +++ b/src/operator/src/statement/copy_table_from.rs @@ -29,6 +29,7 @@ use common_datasource::object_store::{build_backend, parse_url}; use common_datasource::util::find_dir_and_filename; use common_recordbatch::adapter::ParquetRecordBatchStreamAdapter; use common_recordbatch::DfSendableRecordBatchStream; +use common_telemetry::debug; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream}; @@ -75,10 +76,10 @@ impl StatementExecutor { Source::Dir }; - let lister = Lister::new(object_store.clone(), source, dir, regex); + let lister = Lister::new(object_store.clone(), source.clone(), dir.to_string(), regex); let entries = lister.list().await.context(error::ListObjectsSnafu)?; - + debug!("Copy from dir: {dir:?}, {source:?}, entries: {entries:?}"); Ok((object_store, entries)) } diff --git a/src/operator/src/statement/copy_table_to.rs b/src/operator/src/statement/copy_table_to.rs index c23d5473f3..ca6830d5ec 100644 --- a/src/operator/src/statement/copy_table_to.rs +++ b/src/operator/src/statement/copy_table_to.rs @@ -19,16 +19,18 @@ use common_datasource::file_format::csv::stream_to_csv; use common_datasource::file_format::json::stream_to_json; use common_datasource::file_format::Format; use common_datasource::object_store::{build_backend, parse_url}; +use common_datasource::util::find_dir_and_filename; use common_query::Output; use common_recordbatch::adapter::DfRecordBatchStreamAdapter; use common_recordbatch::SendableRecordBatchStream; +use common_telemetry::debug; use datafusion::datasource::DefaultTableSource; use datafusion_common::TableReference as DfTableReference; use datafusion_expr::LogicalPlanBuilder; use object_store::ObjectStore; use query::plan::LogicalPlan; use session::context::QueryContextRef; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use storage::sst::SstInfo; use storage::{ParquetWriter, Source}; use table::engine::TableReference; @@ -89,7 +91,7 @@ impl StatementExecutor { ) -> Result { let table_ref = TableReference::full(&req.catalog_name, &req.schema_name, &req.table_name); let table = self.get_table(&table_ref).await?; - + let table_id = table.table_info().table_id(); let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?; let df_table_ref = DfTableReference::from(table_ref); @@ -132,11 +134,15 @@ impl StatementExecutor { }; let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?; + let (_, filename) = find_dir_and_filename(&path); + let filename = filename.context(error::UnexpectedSnafu { + violated: format!("Expected filename, path: {path}"), + })?; let object_store = build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?; - + debug!("Copy table: {table_id} to path: {path}"); let rows_copied = self - .stream_to_file(stream, &format, object_store, &path) + .stream_to_file(stream, &format, object_store, &filename) .await?; Ok(rows_copied) diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index afceddfe82..3e4073bb73 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -32,7 +32,7 @@ use session::context::{QueryContext, QueryContextRef}; use crate::test_util::check_output_stream; use crate::tests::test_util::{ both_instances_cases, check_unordered_output_stream, distributed, find_testing_resource, - standalone, standalone_instance_case, MockInstance, + prepare_path, standalone, standalone_instance_case, MockInstance, }; #[apply(both_instances_cases)] @@ -538,7 +538,7 @@ async fn test_execute_external_create(instance: Arc) { let instance = instance.frontend(); let tmp_dir = temp_dir::create_temp_dir("test_execute_external_create"); - let location = tmp_dir.path().to_str().unwrap(); + let location = prepare_path(tmp_dir.path().to_str().unwrap()); let output = execute_sql( &instance, @@ -575,12 +575,7 @@ async fn test_execute_external_create_infer_format(instance: Arc let instance = instance.frontend(); let tmp_dir = temp_dir::create_temp_dir("test_execute_external_create_without_ts"); - let location = tmp_dir.path().to_str().unwrap(); + let location = prepare_path(tmp_dir.path().to_str().unwrap()); let result = try_execute_sql( &instance, @@ -616,7 +611,7 @@ async fn test_execute_external_create_with_invalid_ts(instance: Arc String { - let p = find_workspace_path(path).display().to_string(); - +pub fn prepare_path(p: &str) -> String { #[cfg(windows)] let p = { // We need unix style path even in the Windows, because the path is used in object-store, must // be delimited with '/'. Inside the object-store, it will be converted to file system needed // path in the end. - let p = p.replace(':', "").replace('\\', "/"); - - // Prepend a '/' to indicate it's a file system path when parsed as object-store url in Windows. - format!("/{p}") + p.replace('\\', "/") }; - p + p.to_string() +} + +/// Find the testing file resource under workspace root to be used in object store. +pub fn find_testing_resource(path: &str) -> String { + let p = find_workspace_path(path).display().to_string(); + + prepare_path(&p) }