From f52fc9b7d4577b2f16885cc0939f3988bfb74e26 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 28 Feb 2023 10:54:52 +0800 Subject: [PATCH] fix: fix panic when the root is not specified (#1089) --- src/datanode/src/error.rs | 14 +++++++++--- src/datanode/src/sql/copy_table.rs | 5 ++++- src/datanode/src/tests/instance_test.rs | 30 +++++++++++++++++++++++++ src/datanode/src/tests/test_util.rs | 4 ++++ 4 files changed, 49 insertions(+), 4 deletions(-) diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 961a6adf73..b7a7bc1848 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -190,6 +190,12 @@ pub enum Error { backtrace: Backtrace, }, + #[snafu(display("Failed to build backend, source: {}", source))] + BuildBackend { + source: object_store::Error, + backtrace: Backtrace, + }, + #[snafu(display("Runtime resource error, source: {}", source))] RuntimeResource { #[snafu(backtrace)] @@ -482,9 +488,11 @@ impl ErrorExt for Error { | MissingRequiredField { .. } | IncorrectInternalState { .. } => StatusCode::Internal, - InitBackend { .. } | WriteParquet { .. } | PollStream { .. } | WriteObject { .. } => { - StatusCode::StorageUnavailable - } + BuildBackend { .. } + | InitBackend { .. } + | WriteParquet { .. } + | PollStream { .. } + | WriteObject { .. } => StatusCode::StorageUnavailable, OpenLogStore { source } => source.status_code(), StartScriptManager { source } => source.status_code(), OpenStorageEngine { source } => source.status_code(), diff --git a/src/datanode/src/sql/copy_table.rs b/src/datanode/src/sql/copy_table.rs index 8131604111..5b401649d4 100644 --- a/src/datanode/src/sql/copy_table.rs +++ b/src/datanode/src/sql/copy_table.rs @@ -52,7 +52,10 @@ impl SqlHandler { .context(error::TableScanExecSnafu)?; let stream = Box::pin(DfRecordBatchStreamAdapter::new(stream)); - let accessor = Builder::default().build().unwrap(); + let accessor = Builder::default() + .root("/") + .build() + .context(error::BuildBackendSnafu)?; let object_store = ObjectStore::new(accessor).finish(); let mut parquet_writer = ParquetWriter::new(req.file_name, stream, object_store); diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index 556c296266..7c6c2f3ac9 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -765,6 +765,36 @@ async fn test_delete() { check_output_stream(output, expect).await; } +#[tokio::test(flavor = "multi_thread")] +async fn test_execute_copy_to() { + let instance = setup_test_instance("test_execute_copy_to").await; + + // setups + execute_sql( + &instance, + "create table demo(host string, cpu double, memory double, ts timestamp time index);", + ) + .await; + + let output = execute_sql( + &instance, + r#"insert into demo(host, cpu, memory, ts) values + ('host1', 66.6, 1024, 1655276557000), + ('host2', 88.8, 333.3, 1655276558000) + "#, + ) + .await; + assert!(matches!(output, Output::AffectedRows(2))); + + // exports + let data_dir = instance.data_tmp_dir().path(); + + let copy_to_stmt = format!("Copy demo TO '{}/export/demo.parquet'", data_dir.display()); + + let output = execute_sql(&instance, ©_to_stmt).await; + assert!(matches!(output, Output::AffectedRows(2))); +} + async fn execute_sql(instance: &MockInstance, sql: &str) -> Output { execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await } diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 71d085841b..5b1c700040 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -51,6 +51,10 @@ impl MockInstance { pub(crate) fn inner(&self) -> &Instance { &self.instance } + + pub(crate) fn data_tmp_dir(&self) -> &TempDir { + &self._guard._data_tmp_dir + } } struct TestGuard {