refactor: Removed openssl from build requirement (#308)

* refactor: replace another axum-test-helper branch

* refactor: upgrade opendal version

* refactor: use cursor for file buffer

* refactor: remove native-tls in mysql_async

* refactor: use async block and pipeline for newer opendal api

* chore: update Cargo.lock

* chore: update dependencies

* docs: removed openssl from build requirement

* fix: call close on pipe writer to flush reader for parquet streamer

* refactor: remove redundant return

* chore: use pinned revision for our forked mysql_async

* style: avoid wild-card import in test code

* Apply suggestions from code review

Co-authored-by: Yingwen <realevenyag@gmail.com>

* style: use chained call for builder

Co-authored-by: liangxingjian <965662709@qq.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Ning Sun
2022-10-17 19:29:17 +08:00
committed by GitHub
parent 69ba4581b7
commit f243649971
17 changed files with 316 additions and 511 deletions

View File

@@ -54,7 +54,7 @@ version = "0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
[dev-dependencies]
axum-test-helper = "0.1"
axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" }
client = { path = "../client" }
common-query = { path = "../common/query" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }

View File

@@ -12,7 +12,7 @@ use common_query::Output;
use common_telemetry::logging::{debug, error, info};
use common_telemetry::timer;
use log_store::fs::{config::LogConfig, log::LocalFileLogStore};
use object_store::{backend::fs::Backend, util, ObjectStore};
use object_store::{services::fs::Builder, util, ObjectStore};
use query::query_engine::{QueryEngineFactory, QueryEngineRef};
use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler, SqlQueryHandler};
use snafu::prelude::*;
@@ -340,10 +340,9 @@ async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStor
info!("The storage directory is: {}", &data_dir);
let accessor = Backend::build()
let accessor = Builder::default()
.root(&data_dir)
.finish()
.await
.build()
.context(error::InitBackendSnafu { dir: &data_dir })?;
Ok(ObjectStore::new(accessor))

View File

@@ -98,7 +98,7 @@ mod tests {
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use datatypes::value::Value;
use log_store::fs::noop::NoopLogStore;
use object_store::{backend::fs::Backend, ObjectStore};
use object_store::{services::fs::Builder, ObjectStore};
use query::QueryEngineFactory;
use sql::statements::statement::Statement;
use storage::config::EngineConfig as StorageEngineConfig;
@@ -180,7 +180,7 @@ mod tests {
async fn test_statement_to_request() {
let dir = TempDir::new("setup_test_engine_and_table").unwrap();
let store_dir = dir.path().to_string_lossy();
let accessor = Backend::build().root(&store_dir).finish().await.unwrap();
let accessor = Builder::default().root(&store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor);
let sql = r#"insert into demo(host, cpu, memory, ts) values

View File

@@ -6,7 +6,7 @@ edition = "2021"
[dependencies]
futures = { version = "0.3" }
opendal = "0.9"
opendal = "0.17"
tokio = { version = "1.0", features = ["full"] }
[dev-dependencies]

View File

@@ -1,5 +1,5 @@
pub use opendal::{
io_util::SeekableReader, Accessor, DirEntry, DirStreamer, Layer, Metadata, Object,
io_util::SeekableReader, services, Accessor, DirEntry, DirStreamer, Layer, Object,
ObjectMetadata, ObjectMode, Operator as ObjectStore,
};
pub mod backend;

View File

@@ -78,10 +78,9 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> {
async fn test_fs_backend() -> Result<()> {
let tmp_dir = TempDir::new("test_fs_backend")?;
let store = ObjectStore::new(
fs::Backend::build()
fs::Builder::default()
.root(&tmp_dir.path().to_string_lossy())
.finish()
.await?,
.build()?,
);
test_object_crud(&store).await?;
@@ -95,14 +94,14 @@ async fn test_s3_backend() -> Result<()> {
logging::init_default_ut_logging();
if env::var("GT_S3_BUCKET").is_ok() {
logging::info!("Running s3 test.");
let store = ObjectStore::new(
s3::Backend::build()
.access_key_id(&env::var("GT_S3_ACCESS_KEY_ID")?)
.secret_access_key(&env::var("GT_S3_ACCESS_KEY")?)
.bucket(&env::var("GT_S3_BUCKET")?)
.finish()
.await?,
);
let accessor = s3::Builder::default()
.access_key_id(&env::var("GT_S3_ACCESS_KEY_ID")?)
.secret_access_key(&env::var("GT_S3_ACCESS_KEY")?)
.bucket(&env::var("GT_S3_BUCKET")?)
.build()?;
let store = ObjectStore::new(accessor);
test_object_crud(&store).await?;
test_object_list(&store).await?;
}

View File

@@ -39,10 +39,10 @@ tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.3", features = ["full"] }
[dev-dependencies]
axum-test-helper = "0.1"
axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" }
catalog = { path = "../catalog" }
common-base = { path = "../common/base" }
mysql_async = "0.30"
mysql_async = { git = "https://github.com/Morranto/mysql_async.git", rev = "127b538" }
rand = "0.8"
script = { path = "../script", features = ["python"] }
test-util = { path = "../../test-util" }

View File

@@ -28,6 +28,7 @@ prost = "0.11"
regex = "1.5"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sluice = "0.5"
snafu = { version = "0.7", features = ["backtraces"] }
store-api = { path = "../store-api" }
table = { path = "../table" }

View File

@@ -314,7 +314,7 @@ impl<S: LogStore> EngineInner<S> {
mod tests {
use datatypes::type_id::LogicalTypeId;
use log_store::test_util::log_store_util;
use object_store::backend::fs::Backend;
use object_store::backend::fs::Builder;
use store_api::storage::Region;
use tempdir::TempDir;
@@ -327,7 +327,8 @@ mod tests {
log_store_util::create_tmp_local_file_log_store("test_engine_wal").await;
let dir = TempDir::new("test_create_new_region").unwrap();
let store_dir = dir.path().to_string_lossy();
let accessor = Backend::build().root(&store_dir).finish().await.unwrap();
let accessor = Builder::default().root(&store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor);
let config = EngineConfig::default();

View File

@@ -22,10 +22,9 @@ mod tests {
common_telemetry::init_default_ut_logging();
let tmp_dir = TempDir::new("test_region_manifest").unwrap();
let object_store = ObjectStore::new(
fs::Backend::build()
fs::Builder::default()
.root(&tmp_dir.path().to_string_lossy())
.finish()
.await
.build()
.unwrap(),
);

View File

@@ -214,9 +214,12 @@ impl ManifestLogStorage for ManifestObjectStore {
);
let bs = checkpoint_metadata.encode()?;
last_checkpoint.write(bs).await.context(WriteObjectSnafu {
path: last_checkpoint.path(),
})?;
last_checkpoint
.write(bs.as_ref())
.await
.context(WriteObjectSnafu {
path: last_checkpoint.path(),
})?;
Ok(())
}
@@ -271,10 +274,9 @@ mod tests {
common_telemetry::init_default_ut_logging();
let tmp_dir = TempDir::new("test_manifest_log_store").unwrap();
let object_store = ObjectStore::new(
fs::Backend::build()
fs::Builder::default()
.root(&tmp_dir.path().to_string_lossy())
.finish()
.await
.build()
.unwrap(),
);

View File

@@ -183,10 +183,9 @@ async fn test_recover_region_manifets() {
let tmp_dir = TempDir::new("test_new_region").unwrap();
let object_store = ObjectStore::new(
fs::Backend::build()
fs::Builder::default()
.root(&tmp_dir.path().to_string_lossy())
.finish()
.await
.build()
.unwrap(),
);

View File

@@ -18,8 +18,9 @@ use datatypes::arrow::io::parquet::write::{
};
use futures::AsyncWriteExt;
use futures_util::sink::SinkExt;
use futures_util::{Stream, TryStreamExt};
use futures_util::{try_join, Stream, TryStreamExt};
use object_store::{ObjectStore, SeekableReader};
use sluice::pipe;
use snafu::ResultExt;
use table::predicate::Predicate;
@@ -62,45 +63,55 @@ impl<'a> ParquetWriter<'a> {
let schema = store_schema.arrow_schema();
let object = self.object_store.object(self.file_path);
// FIXME(hl): writer size is not used in fs backend so just leave it to 0,
// but in s3/azblob backend the Content-Length field of HTTP request is set
// to this value.
let mut writer = object.writer(0).await.context(error::FlushIoSnafu)?;
let (reader, mut writer) = pipe::pipe();
// now all physical types use plain encoding, maybe let caller to choose encoding for each type.
let encodings = get_encoding_for_schema(schema, |_| Encoding::Plain);
let mut sink = FileSink::try_new(
&mut writer,
// The file sink needs the `Schema` instead of a reference.
(**schema).clone(),
encodings,
WriteOptions {
write_statistics: true,
compression: Compression::Gzip,
version: Version::V2,
try_join!(
async {
// FIXME(hl): writer size is not used in fs backend so just leave it to 0,
// but in s3/azblob backend the Content-Length field of HTTP request is set
// to this value.
object
.write_from(0, reader)
.await
.context(error::FlushIoSnafu)
},
)
.context(error::WriteParquetSnafu)?;
for batch in self.iter {
let batch = batch?;
sink.send(store_schema.batch_to_arrow_chunk(&batch))
.await
async {
let mut sink = FileSink::try_new(
&mut writer,
// The file sink needs the `Schema` instead of a reference.
(**schema).clone(),
encodings,
WriteOptions {
write_statistics: true,
compression: Compression::Gzip,
version: Version::V2,
},
)
.context(error::WriteParquetSnafu)?;
}
if let Some(meta) = extra_meta {
for (k, v) in meta {
sink.metadata.insert(k, Some(v));
for batch in self.iter {
let batch = batch?;
sink.send(store_schema.batch_to_arrow_chunk(&batch))
.await
.context(error::WriteParquetSnafu)?;
}
if let Some(meta) = extra_meta {
for (k, v) in meta {
sink.metadata.insert(k, Some(v));
}
}
sink.close().await.context(error::WriteParquetSnafu)?;
drop(sink);
writer.close().await.context(error::WriteObjectSnafu {
path: self.file_path,
})
}
}
sink.close().await.context(error::WriteParquetSnafu)?;
drop(sink);
writer.flush().await.context(error::WriteObjectSnafu {
path: self.file_path,
})
)
.map(|_| ())
}
}
@@ -287,7 +298,7 @@ mod tests {
use datatypes::arrow::io::parquet::read::FileReader;
use datatypes::prelude::{ScalarVector, Vector};
use datatypes::vectors::TimestampVector;
use object_store::backend::fs::Backend;
use object_store::backend::fs::Builder;
use store_api::storage::OpType;
use tempdir::TempDir;
@@ -324,7 +335,7 @@ mod tests {
let dir = TempDir::new("write_parquet").unwrap();
let path = dir.path().to_str().unwrap();
let backend = Backend::build().root(path).finish().await.unwrap();
let backend = Builder::default().root(path).build().unwrap();
let object_store = ObjectStore::new(backend);
let sst_file_name = "test-flush.parquet";
let iter = memtable.iter(&IterContext::default()).unwrap();

View File

@@ -1,7 +1,7 @@
use std::sync::Arc;
use log_store::fs::{config::LogConfig, log::LocalFileLogStore};
use object_store::{backend::fs::Backend, ObjectStore};
use object_store::{backend::fs::Builder, ObjectStore};
use crate::background::JobPoolImpl;
use crate::engine;
@@ -24,7 +24,7 @@ pub async fn new_store_config(
let sst_dir = engine::region_sst_dir(parent_dir, region_name);
let manifest_dir = engine::region_manifest_dir(parent_dir, region_name);
let accessor = Backend::build().root(store_dir).finish().await.unwrap();
let accessor = Builder::default().root(store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor);
let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone()));
let manifest = RegionManifest::new(&manifest_dir, object_store);

View File

@@ -6,7 +6,7 @@ use std::sync::Arc;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
use log_store::fs::noop::NoopLogStore;
use object_store::{backend::fs::Backend, ObjectStore};
use object_store::{services::fs::Builder, ObjectStore};
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::engine::EngineContext;
@@ -65,8 +65,7 @@ pub fn build_test_table_info() -> TableInfo {
pub async fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) {
let dir = TempDir::new(prefix).unwrap();
let store_dir = dir.path().to_string_lossy();
let accessor = Backend::build().root(&store_dir).finish().await.unwrap();
let accessor = Builder::default().root(&store_dir).build().unwrap();
(dir, ObjectStore::new(accessor))
}