From 1bfba48755edced10ffbddf93e7c36f2321d0559 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 3 Jun 2024 20:28:59 +0800 Subject: [PATCH] Revert "build(deps): upgrade opendal to 0.46 (#4037)" This reverts commit f9db5ff0d6d007e577287c90da3d74172596f735. --- Cargo.lock | 234 ++++++------------ Cargo.toml | 2 +- src/common/datasource/src/compression.rs | 38 ++- src/common/datasource/src/file_format.rs | 5 +- src/common/datasource/src/file_format/csv.rs | 9 +- src/common/datasource/src/file_format/json.rs | 9 +- src/common/datasource/src/file_format/orc.rs | 73 ++---- .../datasource/src/file_format/parquet.rs | 29 +-- src/common/datasource/src/test_util.rs | 4 +- src/common/procedure/src/store/state_store.rs | 2 +- src/datanode/src/store.rs | 15 +- src/file-engine/src/manifest.rs | 3 +- src/mito2/src/cache/file_cache.rs | 19 +- src/mito2/src/cache/write_cache.rs | 25 +- src/mito2/src/manifest/storage.rs | 8 +- src/mito2/src/sst/index/applier.rs | 24 +- src/mito2/src/sst/index/store.rs | 26 +- src/mito2/src/sst/parquet/helper.rs | 6 +- src/mito2/src/sst/parquet/metadata.rs | 6 +- src/mito2/src/worker/handle_drop.rs | 3 +- src/object-store/Cargo.toml | 7 +- src/object-store/src/layers/lru_cache.rs | 32 +-- .../src/layers/lru_cache/read_cache.rs | 194 +++++++-------- src/object-store/src/layers/prometheus.rs | 189 +++++++++----- src/object-store/src/lib.rs | 5 +- src/object-store/tests/object_store_test.rs | 30 ++- src/operator/Cargo.toml | 1 - src/operator/src/statement/copy_table_from.rs | 42 +--- src/servers/src/export_metrics.rs | 6 +- src/servers/src/http/greptime_result_v1.rs | 2 +- src/servers/src/http/test_helpers.rs | 22 +- 31 files changed, 441 insertions(+), 629 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 007f777441..5e7f3aa121 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -83,7 +83,7 @@ dependencies = [ "axum", "bytes", "cfg-if", - "http 0.2.12", + "http", "indexmap 1.9.3", "schemars", "serde", @@ -764,9 +764,9 @@ dependencies = [ "bytes", "futures-util", "headers", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.28", + "http", + "http-body", + "hyper", "itoa", "matchit", "memchr", @@ -794,8 +794,8 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http 0.2.12", - "http-body 0.4.6", + "http", + "http-body", "mime", "rustversion", "tower-layer", @@ -1854,7 +1854,7 @@ dependencies = [ "common-telemetry", "common-test-util", "common-version", - "hyper 0.14.28", + "hyper", "reqwest", "serde", "tempfile", @@ -1963,7 +1963,7 @@ dependencies = [ "futures-util", "hex", "humantime-serde", - "hyper 0.14.28", + "hyper", "itertools 0.10.5", "lazy_static", "moka", @@ -3525,6 +3525,15 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "encoding_rs" +version = "0.8.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" +dependencies = [ + "cfg-if", +] + [[package]] name = "endian-type" version = "0.1.2" @@ -3595,7 +3604,7 @@ name = "etcd-client" version = "0.12.4" source = "git+https://github.com/MichaelScofield/etcd-client.git?rev=4c371e9b3ea8e0a8ee2f9cbd7ded26e54a45df3b#4c371e9b3ea8e0a8ee2f9cbd7ded26e54a45df3b" dependencies = [ - "http 0.2.12", + "http", "prost 0.12.4", "tokio", "tokio-stream", @@ -4210,7 +4219,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http 0.2.12", + "http", "indexmap 2.2.6", "slab", "tokio", 
@@ -4294,7 +4303,7 @@ dependencies = [ "base64 0.21.7", "bytes", "headers-core", - "http 0.2.12", + "http", "httpdate", "mime", "sha1", @@ -4306,7 +4315,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" dependencies = [ - "http 0.2.12", + "http", ] [[package]] @@ -4409,17 +4418,6 @@ dependencies = [ "itoa", ] -[[package]] -name = "http" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" -dependencies = [ - "bytes", - "fnv", - "itoa", -] - [[package]] name = "http-body" version = "0.4.6" @@ -4427,30 +4425,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http 0.2.12", - "pin-project-lite", -] - -[[package]] -name = "http-body" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" -dependencies = [ - "bytes", - "http 1.1.0", -] - -[[package]] -name = "http-body-util" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" -dependencies = [ - "bytes", - "futures-core", - "http 1.1.0", - "http-body 1.0.0", + "http", "pin-project-lite", ] @@ -4607,8 +4582,8 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http 0.2.12", - "http-body 0.4.6", + "http", + "http-body", "httparse", "httpdate", "itoa", @@ -4620,40 +4595,18 @@ dependencies = [ "want", ] -[[package]] -name = "hyper" -version = "1.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" -dependencies = [ - "bytes", - "futures-channel", - "futures-util", - "http 1.1.0", - "http-body 1.0.0", - "httparse", - "itoa", - "pin-project-lite", - "smallvec", - "tokio", - "want", -] - [[package]] name = "hyper-rustls" -version = "0.26.0" +version = "0.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", - "http 1.1.0", - "hyper 1.3.1", - "hyper-util", - "rustls 0.22.4", - "rustls-pki-types", + "http", + "hyper", + "rustls 0.21.12", "tokio", - "tokio-rustls 0.25.0", - "tower-service", + "tokio-rustls 0.24.1", ] [[package]] @@ -4662,32 +4615,12 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper 0.14.28", + "hyper", "pin-project-lite", "tokio", "tokio-io-timeout", ] -[[package]] -name = "hyper-util" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" -dependencies = [ - "bytes", - "futures-channel", - "futures-util", - "http 1.1.0", - "http-body 1.0.0", - "hyper 1.3.1", - "pin-project-lite", - "socket2 0.5.7", - "tokio", - "tower", - "tower-service", - "tracing", -] - [[package]] name = "iana-time-zone" version = "0.1.60" @@ -5671,7 +5604,7 @@ dependencies = [ "etcd-client", "futures", "h2", - "http-body 0.4.6", + "http-body", 
"humantime", "humantime-serde", "itertools 0.10.5", @@ -6433,6 +6366,7 @@ name = "object-store" version = "0.8.1" dependencies = [ "anyhow", + "async-trait", "bytes", "common-telemetry", "common-test-util", @@ -6481,21 +6415,20 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "opendal" -version = "0.46.0" +version = "0.45.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "328c4992328e8965e6a6ef102d38438b5fdc7d9b9107eda2377ba05379d9d544" +checksum = "52c17c077f23fa2d2c25d9d22af98baa43b8bbe2ef0de80cf66339aa70401467" dependencies = [ "anyhow", "async-trait", "backon", - "base64 0.22.1", + "base64 0.21.7", "bytes", "chrono", - "crc32c", "flagset", "futures", "getrandom", - "http 1.1.0", + "http", "log", "md-5", "once_cell", @@ -6583,7 +6516,7 @@ checksum = "f24cda83b20ed2433c68241f918d0f6fdec8b1d43b7a9590ab4420c5095ca930" dependencies = [ "async-trait", "futures-core", - "http 0.2.12", + "http", "opentelemetry 0.21.0", "opentelemetry-proto 0.4.0", "opentelemetry-semantic-conventions", @@ -6720,7 +6653,6 @@ dependencies = [ "substrait 0.8.1", "table", "tokio", - "tokio-util", "tonic 0.11.0", ] @@ -8264,20 +8196,20 @@ dependencies = [ [[package]] name = "reqsign" -version = "0.15.0" +version = "0.14.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01edce6b6c31a16ebc7525ac58c747a6d78bbce33e76bbebd350d6bc25b23e06" +checksum = "43e319d9de9ff4d941abf4ac718897118b0fe04577ea3f8e0f5788971784eef5" dependencies = [ "anyhow", "async-trait", - "base64 0.22.1", + "base64 0.21.7", "chrono", "form_urlencoded", "getrandom", "hex", "hmac", "home", - "http 1.1.0", + "http", "jsonwebtoken", "log", "once_cell", @@ -8286,7 +8218,7 @@ dependencies = [ "rand", "reqwest", "rsa 0.9.6", - "rust-ini 0.21.0", + "rust-ini 0.20.0", "serde", "serde_json", "sha1", @@ -8295,20 +8227,20 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.12.4" +version = "0.11.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10" +checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" dependencies = [ - "base64 0.22.1", + "base64 0.21.7", "bytes", + "encoding_rs", "futures-core", "futures-util", - "http 1.1.0", - "http-body 1.0.0", - "http-body-util", - "hyper 1.3.1", + "h2", + "http", + "http-body", + "hyper", "hyper-rustls", - "hyper-util", "ipnet", "js-sys", "log", @@ -8317,16 +8249,16 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.22.4", + "rustls 0.21.12", "rustls-native-certs", - "rustls-pemfile 2.1.2", - "rustls-pki-types", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", + "system-configuration", "tokio", - "tokio-rustls 0.25.0", + "tokio-rustls 0.24.1", "tokio-util", "tower-service", "url", @@ -8334,8 +8266,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "webpki-roots 0.26.1", - "winreg 0.52.0", + "winreg 0.50.0", ] [[package]] @@ -8601,13 +8532,12 @@ dependencies = [ [[package]] name = "rust-ini" -version = "0.21.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d625ed57d8f49af6cfa514c42e1a71fadcff60eb0b1c517ff82fe41aa025b41" +checksum = "3e0698206bcb8882bf2a9ecb4c1e7785db57ff052297085a6efd4fe42302068a" dependencies = [ "cfg-if", "ordered-multimap 0.7.3", - "trim-in-place", ] [[package]] @@ -8749,13 +8679,12 @@ dependencies = [ [[package]] name = 
"rustls-native-certs" -version = "0.7.0" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" dependencies = [ "openssl-probe", - "rustls-pemfile 2.1.2", - "rustls-pki-types", + "rustls-pemfile 1.0.4", "schannel", "security-framework", ] @@ -9588,10 +9517,10 @@ dependencies = [ "hashbrown 0.14.5", "headers", "hostname", - "http 0.2.12", - "http-body 0.4.6", + "http", + "http-body", "humantime-serde", - "hyper 0.14.28", + "hyper", "influxdb_line_protocol", "itertools 0.10.5", "lazy_static", @@ -11215,9 +11144,9 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.28", + "http", + "http-body", + "hyper", "hyper-timeout", "percent-encoding", "pin-project", @@ -11243,9 +11172,9 @@ dependencies = [ "bytes", "flate2", "h2", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.28", + "http", + "http-body", + "hyper", "hyper-timeout", "percent-encoding", "pin-project", @@ -11347,8 +11276,8 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "http 0.2.12", - "http-body 0.4.6", + "http", + "http-body", "http-range-header", "httpdate", "iri-string", @@ -11591,12 +11520,6 @@ dependencies = [ "tree-sitter", ] -[[package]] -name = "trim-in-place" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc" - [[package]] name = "triomphe" version = "0.1.11" @@ -12262,15 +12185,6 @@ version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" -[[package]] -name = "webpki-roots" -version = "0.26.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009" -dependencies = [ - "rustls-pki-types", -] - [[package]] name = "which" version = "4.4.2" @@ -12637,9 +12551,9 @@ dependencies = [ [[package]] name = "winreg" -version = "0.52.0" +version = "0.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" dependencies = [ "cfg-if", "windows-sys 0.48.0", diff --git a/Cargo.toml b/Cargo.toml index 734d2438b0..28f421798b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -146,7 +146,7 @@ raft-engine = { version = "0.4.1", default-features = false } rand = "0.8" regex = "1.8" regex-automata = { version = "0.4" } -reqwest = { version = "0.12", default-features = false, features = [ +reqwest = { version = "0.11", default-features = false, features = [ "json", "rustls-tls-native-roots", "stream", diff --git a/src/common/datasource/src/compression.rs b/src/common/datasource/src/compression.rs index 0d74849177..77b40f8e9c 100644 --- a/src/common/datasource/src/compression.rs +++ b/src/common/datasource/src/compression.rs @@ -92,44 +92,34 @@ impl CompressionType { macro_rules! impl_compression_type { ($(($enum_item:ident, $prefix:ident)),*) => { paste::item! 
{ - use bytes::{Buf, BufMut, BytesMut}; - impl CompressionType { - pub async fn encode(&self, mut content: B) -> io::Result> { + pub async fn encode(&self, content: impl AsRef<[u8]>) -> io::Result> { match self { $( CompressionType::$enum_item => { - let mut buffer = Vec::with_capacity(content.remaining()); + let mut buffer = Vec::with_capacity(content.as_ref().len()); let mut encoder = write::[<$prefix Encoder>]::new(&mut buffer); - encoder.write_all_buf(&mut content).await?; + encoder.write_all(content.as_ref()).await?; encoder.shutdown().await?; Ok(buffer) } )* - CompressionType::Uncompressed => { - let mut bs = BytesMut::with_capacity(content.remaining()); - bs.put(content); - Ok(bs.to_vec()) - }, + CompressionType::Uncompressed => Ok(content.as_ref().to_vec()), } } - pub async fn decode(&self, mut content: B) -> io::Result> { + pub async fn decode(&self, content: impl AsRef<[u8]>) -> io::Result> { match self { $( CompressionType::$enum_item => { - let mut buffer = Vec::with_capacity(content.remaining() * 2); + let mut buffer = Vec::with_capacity(content.as_ref().len() * 2); let mut encoder = write::[<$prefix Decoder>]::new(&mut buffer); - encoder.write_all_buf(&mut content).await?; + encoder.write_all(content.as_ref()).await?; encoder.shutdown().await?; Ok(buffer) } )* - CompressionType::Uncompressed => { - let mut bs = BytesMut::with_capacity(content.remaining()); - bs.put(content); - Ok(bs.to_vec()) - }, + CompressionType::Uncompressed => Ok(content.as_ref().to_vec()), } } @@ -161,13 +151,13 @@ macro_rules! impl_compression_type { $( #[tokio::test] async fn []() { - let string = "foo_bar".as_bytes(); + let string = "foo_bar".as_bytes().to_vec(); let compress = CompressionType::$enum_item - .encode(string) + .encode(&string) .await .unwrap(); let decompress = CompressionType::$enum_item - .decode(compress.as_slice()) + .decode(&compress) .await .unwrap(); assert_eq!(decompress, string); @@ -175,13 +165,13 @@ macro_rules! impl_compression_type { #[tokio::test] async fn test_uncompression() { - let string = "foo_bar".as_bytes(); + let string = "foo_bar".as_bytes().to_vec(); let compress = CompressionType::Uncompressed - .encode(string) + .encode(&string) .await .unwrap(); let decompress = CompressionType::Uncompressed - .decode(compress.as_slice()) + .decode(&compress) .await .unwrap(); assert_eq!(decompress, string); diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs index 5bb9258ad3..6f80590d26 100644 --- a/src/common/datasource/src/file_format.rs +++ b/src/common/datasource/src/file_format.rs @@ -36,7 +36,6 @@ use datafusion::physical_plan::SendableRecordBatchStream; use futures::StreamExt; use object_store::ObjectStore; use snafu::ResultExt; -use tokio_util::compat::FuturesAsyncWriteCompatExt; use self::csv::CsvFormat; use self::json::JsonFormat; @@ -147,8 +146,7 @@ pub fn open_with_decoder DataFusionResult>( let reader = object_store .reader(&path) .await - .map_err(|e| DataFusionError::External(Box::new(e)))? 
- .into_bytes_stream(..); + .map_err(|e| DataFusionError::External(Box::new(e)))?; let mut upstream = compression_type.convert_stream(reader).fuse(); @@ -205,7 +203,6 @@ pub async fn stream_to_file T>( .writer_with(&path) .concurrent(concurrency) .await - .map(|v| v.into_futures_async_write().compat_write()) .context(error::WriteObjectSnafu { path }) }); diff --git a/src/common/datasource/src/file_format/csv.rs b/src/common/datasource/src/file_format/csv.rs index ade4e5409e..4cf2b9e133 100644 --- a/src/common/datasource/src/file_format/csv.rs +++ b/src/common/datasource/src/file_format/csv.rs @@ -29,7 +29,6 @@ use datafusion::physical_plan::SendableRecordBatchStream; use derive_builder::Builder; use object_store::ObjectStore; use snafu::ResultExt; -use tokio_util::compat::FuturesAsyncReadCompatExt; use tokio_util::io::SyncIoBridge; use super::stream_to_file; @@ -165,16 +164,10 @@ impl FileOpener for CsvOpener { #[async_trait] impl FileFormat for CsvFormat { async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result { - let meta = store - .stat(path) - .await - .context(error::ReadObjectSnafu { path })?; let reader = store .reader(path) .await - .context(error::ReadObjectSnafu { path })? - .into_futures_async_read(0..meta.content_length()) - .compat(); + .context(error::ReadObjectSnafu { path })?; let decoded = self.compression_type.convert_async_read(reader); diff --git a/src/common/datasource/src/file_format/json.rs b/src/common/datasource/src/file_format/json.rs index 97057f8362..77fde3fddb 100644 --- a/src/common/datasource/src/file_format/json.rs +++ b/src/common/datasource/src/file_format/json.rs @@ -31,7 +31,6 @@ use datafusion::error::{DataFusionError, Result as DataFusionResult}; use datafusion::physical_plan::SendableRecordBatchStream; use object_store::ObjectStore; use snafu::ResultExt; -use tokio_util::compat::FuturesAsyncReadCompatExt; use tokio_util::io::SyncIoBridge; use super::stream_to_file; @@ -83,16 +82,10 @@ impl Default for JsonFormat { #[async_trait] impl FileFormat for JsonFormat { async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result { - let meta = store - .stat(path) - .await - .context(error::ReadObjectSnafu { path })?; let reader = store .reader(path) .await - .context(error::ReadObjectSnafu { path })? 
- .into_futures_async_read(0..meta.content_length()) - .compat(); + .context(error::ReadObjectSnafu { path })?; let decoded = self.compression_type.convert_async_read(reader); diff --git a/src/common/datasource/src/file_format/orc.rs b/src/common/datasource/src/file_format/orc.rs index 4a6001f3e7..23e0589c99 100644 --- a/src/common/datasource/src/file_format/orc.rs +++ b/src/common/datasource/src/file_format/orc.rs @@ -16,17 +16,15 @@ use std::sync::Arc; use arrow_schema::{ArrowError, Schema, SchemaRef}; use async_trait::async_trait; -use bytes::Bytes; use common_recordbatch::adapter::RecordBatchStreamTypeAdapter; use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener}; use datafusion::error::{DataFusionError, Result as DfResult}; -use futures::future::BoxFuture; -use futures::{FutureExt, StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use object_store::ObjectStore; use orc_rust::arrow_reader::ArrowReaderBuilder; use orc_rust::async_arrow_reader::ArrowStreamReader; -use orc_rust::reader::AsyncChunkReader; use snafu::ResultExt; +use tokio::io::{AsyncRead, AsyncSeek}; use crate::error::{self, Result}; use crate::file_format::FileFormat; @@ -34,49 +32,18 @@ use crate::file_format::FileFormat; #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] pub struct OrcFormat; -#[derive(Clone)] -pub struct ReaderAdapter { - reader: object_store::Reader, - len: u64, -} - -impl ReaderAdapter { - pub fn new(reader: object_store::Reader, len: u64) -> Self { - Self { reader, len } - } -} - -impl AsyncChunkReader for ReaderAdapter { - fn len(&mut self) -> BoxFuture<'_, std::io::Result> { - async move { Ok(self.len) }.boxed() - } - - fn get_bytes( - &mut self, - offset_from_start: u64, - length: u64, - ) -> BoxFuture<'_, std::io::Result> { - async move { - let bytes = self - .reader - .read(offset_from_start..offset_from_start + length) - .await?; - Ok(bytes.to_bytes()) - } - .boxed() - } -} - -pub async fn new_orc_stream_reader( - reader: ReaderAdapter, -) -> Result> { +pub async fn new_orc_stream_reader( + reader: R, +) -> Result> { let reader_build = ArrowReaderBuilder::try_new_async(reader) .await .context(error::OrcReaderSnafu)?; Ok(reader_build.build_async()) } -pub async fn infer_orc_schema(reader: ReaderAdapter) -> Result { +pub async fn infer_orc_schema( + reader: R, +) -> Result { let reader = new_orc_stream_reader(reader).await?; Ok(reader.schema().as_ref().clone()) } @@ -84,15 +51,13 @@ pub async fn infer_orc_schema(reader: ReaderAdapter) -> Result { #[async_trait] impl FileFormat for OrcFormat { async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result { - let meta = store - .stat(path) - .await - .context(error::ReadObjectSnafu { path })?; let reader = store .reader(path) .await .context(error::ReadObjectSnafu { path })?; - let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length())).await?; + + let schema = infer_orc_schema(reader).await?; + Ok(schema) } } @@ -132,22 +97,14 @@ impl FileOpener for OrcOpener { }; let projection = self.projection.clone(); Ok(Box::pin(async move { - let path = meta.location().to_string(); - - let meta = object_store - .stat(&path) - .await - .map_err(|e| DataFusionError::External(Box::new(e)))?; - let reader = object_store - .reader(&path) + .reader(meta.location().to_string().as_str()) .await .map_err(|e| DataFusionError::External(Box::new(e)))?; - let stream_reader = - new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length())) - .await - .map_err(|e| 
DataFusionError::External(Box::new(e)))?; + let stream_reader = new_orc_stream_reader(reader) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))?; let stream = RecordBatchStreamTypeAdapter::new(projected_schema, stream_reader, projection); diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs index 2e887ac2f7..651d5904c8 100644 --- a/src/common/datasource/src/file_format/parquet.rs +++ b/src/common/datasource/src/file_format/parquet.rs @@ -29,11 +29,10 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion::physical_plan::SendableRecordBatchStream; use futures::future::BoxFuture; use futures::StreamExt; -use object_store::{FuturesAsyncReader, ObjectStore}; +use object_store::{ObjectStore, Reader, Writer}; use parquet::basic::{Compression, ZstdLevel}; use parquet::file::properties::WriterProperties; use snafu::ResultExt; -use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}; use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder, LazyBufferedWriter}; use crate::error::{self, Result}; @@ -46,16 +45,10 @@ pub struct ParquetFormat {} #[async_trait] impl FileFormat for ParquetFormat { async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result { - let meta = store - .stat(path) - .await - .context(error::ReadObjectSnafu { path })?; let mut reader = store .reader(path) .await - .context(error::ReadObjectSnafu { path })? - .into_futures_async_read(0..meta.content_length()) - .compat(); + .context(error::ReadObjectSnafu { path })?; let metadata = reader .get_metadata() @@ -105,7 +98,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { pub struct LazyParquetFileReader { object_store: ObjectStore, - reader: Option>, + reader: Option, path: String, } @@ -121,13 +114,7 @@ impl LazyParquetFileReader { /// Must initialize the reader, or throw an error from the future. async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> { if self.reader.is_none() { - let meta = self.object_store.stat(&self.path).await?; - let reader = self - .object_store - .reader(&self.path) - .await? 
- .into_futures_async_read(0..meta.content_length()) - .compat(); + let reader = self.object_store.reader(&self.path).await?; self.reader = Some(reader); } @@ -180,17 +167,16 @@ pub struct BufferedWriter { } type InnerBufferedWriter = LazyBufferedWriter< - Compat, + object_store::Writer, ArrowWriter, - impl Fn(String) -> BoxFuture<'static, Result>>, + impl Fn(String) -> BoxFuture<'static, Result>, >; impl BufferedWriter { fn make_write_factory( store: ObjectStore, concurrency: usize, - ) -> impl Fn(String) -> BoxFuture<'static, Result>> - { + ) -> impl Fn(String) -> BoxFuture<'static, Result> { move |path| { let store = store.clone(); Box::pin(async move { @@ -198,7 +184,6 @@ impl BufferedWriter { .writer_with(&path) .concurrent(concurrency) .await - .map(|v| v.into_futures_async_write().compat_write()) .context(error::WriteObjectSnafu { path }) }) } diff --git a/src/common/datasource/src/test_util.rs b/src/common/datasource/src/test_util.rs index d3a24a23d2..8f1af59c90 100644 --- a/src/common/datasource/src/test_util.rs +++ b/src/common/datasource/src/test_util.rs @@ -120,7 +120,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi let written = tmp_store.read(&output_path).await.unwrap(); let origin = store.read(origin_path).await.unwrap(); - assert_eq_lines(written.to_vec(), origin.to_vec()); + assert_eq_lines(written, origin); } pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usize) -> usize) { @@ -158,7 +158,7 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usiz let written = tmp_store.read(&output_path).await.unwrap(); let origin = store.read(origin_path).await.unwrap(); - assert_eq_lines(written.to_vec(), origin.to_vec()); + assert_eq_lines(written, origin); } // Ignore the CRLF difference across operating systems. diff --git a/src/common/procedure/src/store/state_store.rs b/src/common/procedure/src/store/state_store.rs index 096ef84b12..f02ccc7937 100644 --- a/src/common/procedure/src/store/state_store.rs +++ b/src/common/procedure/src/store/state_store.rs @@ -179,7 +179,7 @@ impl StateStore for ObjectStateStore { )) }) .context(ListStateSnafu { path: key })?; - yield (key.into(), value.to_vec()); + yield (key.into(), value); } } }); diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index 8b64511598..8b778306c4 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -20,6 +20,7 @@ mod gcs; mod oss; mod s3; +use std::sync::Arc; use std::time::Duration; use std::{env, path}; @@ -28,7 +29,7 @@ use common_telemetry::info; use object_store::layers::{LruCacheLayer, RetryLayer}; use object_store::services::Fs; use object_store::util::{join_dir, normalize_dir, with_instrument_layers}; -use object_store::{HttpClient, ObjectStore}; +use object_store::{HttpClient, ObjectStore, ObjectStoreBuilder}; use snafu::prelude::*; use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; @@ -106,13 +107,13 @@ async fn create_object_store_with_cache( if let Some(path) = cache_path { let atomic_temp_dir = join_dir(path, ".tmp/"); clean_temp_dir(&atomic_temp_dir)?; - let mut builder = Fs::default(); - builder.root(path).atomic_write_dir(&atomic_temp_dir); - let cache_store = ObjectStore::new(builder) - .context(error::InitBackendSnafu)? 
- .finish(); + let cache_store = Fs::default() + .root(path) + .atomic_write_dir(&atomic_temp_dir) + .build() + .context(error::InitBackendSnafu)?; - let cache_layer = LruCacheLayer::new(cache_store, cache_capacity.0 as usize) + let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize) .await .context(error::InitBackendSnafu)?; diff --git a/src/file-engine/src/manifest.rs b/src/file-engine/src/manifest.rs index 6310c3ccb9..d2e8255301 100644 --- a/src/file-engine/src/manifest.rs +++ b/src/file-engine/src/manifest.rs @@ -71,8 +71,7 @@ impl FileRegionManifest { let bs = object_store .read(path) .await - .context(LoadRegionManifestSnafu { region_id })? - .to_vec(); + .context(LoadRegionManifestSnafu { region_id })?; Self::decode(bs.as_slice()) } diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 931e506269..0afbf5b669 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -112,10 +112,6 @@ impl FileCache { self.memory_index.insert(key, value).await; } - pub(crate) async fn get(&self, key: IndexKey) -> Option { - self.memory_index.get(&key).await - } - /// Reads a file from the cache. pub(crate) async fn reader(&self, key: IndexKey) -> Option { // We must use `get()` to update the estimator of the cache. @@ -376,6 +372,7 @@ fn parse_index_key(name: &str) -> Option { #[cfg(test)] mod tests { use common_test_util::temp_dir::create_temp_dir; + use futures::AsyncReadExt; use object_store::services::Fs; use super::*; @@ -454,9 +451,10 @@ mod tests { .await; // Read file content. - let reader = cache.reader(key).await.unwrap(); - let buf = reader.read(..).await.unwrap().to_vec(); - assert_eq!("hello", String::from_utf8(buf).unwrap()); + let mut reader = cache.reader(key).await.unwrap(); + let mut buf = String::new(); + reader.read_to_string(&mut buf).await.unwrap(); + assert_eq!("hello", buf); // Get weighted size. cache.memory_index.run_pending_tasks().await; @@ -551,9 +549,10 @@ mod tests { for (i, file_id) in file_ids.iter().enumerate() { let key = IndexKey::new(region_id, *file_id, file_type); - let reader = cache.reader(key).await.unwrap(); - let buf = reader.read(..).await.unwrap().to_vec(); - assert_eq!(i.to_string(), String::from_utf8(buf).unwrap()); + let mut reader = cache.reader(key).await.unwrap(); + let mut buf = String::new(); + reader.read_to_string(&mut buf).await.unwrap(); + assert_eq!(i.to_string(), buf); } } diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index f7a5af339b..23a8419469 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -19,7 +19,6 @@ use std::time::Duration; use common_base::readable_size::ReadableSize; use common_telemetry::{debug, info}; -use futures::AsyncWriteExt; use object_store::manager::ObjectStoreManagerRef; use object_store::ObjectStore; use snafu::ResultExt; @@ -176,27 +175,19 @@ impl WriteCache { }]) .start_timer(); - let cached_value = self - .file_cache - .local_store() - .stat(&cache_path) - .await - .context(error::OpenDalSnafu)?; let reader = self .file_cache .local_store() .reader(&cache_path) .await - .context(error::OpenDalSnafu)? - .into_futures_async_read(0..cached_value.content_length()); + .context(error::OpenDalSnafu)?; let mut writer = remote_store .writer_with(upload_path) - .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) + .buffer(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) .concurrent(DEFAULT_WRITE_CONCURRENCY) .await - .context(error::OpenDalSnafu)? 
- .into_futures_async_write(); + .context(error::OpenDalSnafu)?; let bytes_written = futures::io::copy(reader, &mut writer) @@ -208,11 +199,7 @@ impl WriteCache { })?; // Must close to upload all data. - writer.close().await.context(error::UploadSnafu { - region_id, - file_id, - file_type, - })?; + writer.close().await.context(error::OpenDalSnafu)?; UPLOAD_BYTES_TOTAL.inc_by(bytes_written); @@ -328,7 +315,7 @@ mod tests { .read(&write_cache.file_cache.cache_file_path(key)) .await .unwrap(); - assert_eq!(remote_data.to_vec(), cache_data.to_vec()); + assert_eq!(remote_data, cache_data); // Check write cache contains the index key let index_key = IndexKey::new(region_id, file_id, FileType::Puffin); @@ -339,7 +326,7 @@ mod tests { .read(&write_cache.file_cache.cache_file_path(index_key)) .await .unwrap(); - assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec()); + assert_eq!(remote_index_data, cache_index_data); } #[tokio::test] diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 88450a21bd..420386e124 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -497,7 +497,7 @@ impl ManifestObjectStore { } }; - let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data.to_vec())?; + let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?; debug!( "Load checkpoint in path: {}, metadata: {:?}", @@ -509,11 +509,7 @@ impl ManifestObjectStore { #[cfg(test)] pub async fn read_file(&self, path: &str) -> Result> { - self.object_store - .read(path) - .await - .context(OpenDalSnafu) - .map(|v| v.to_vec()) + self.object_store.read(path).await.context(OpenDalSnafu) } #[cfg(test)] diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs index eb4e42cd47..f14251afb9 100644 --- a/src/mito2/src/sst/index/applier.rs +++ b/src/mito2/src/sst/index/applier.rs @@ -121,17 +121,9 @@ impl SstIndexApplier { return Ok(None); }; - let Some(indexed_value) = file_cache - .get(IndexKey::new(self.region_id, file_id, FileType::Puffin)) - .await - else { - return Ok(None); - }; - Ok(file_cache .reader(IndexKey::new(self.region_id, file_id, FileType::Puffin)) .await - .map(|v| v.into_futures_async_read(0..indexed_value.file_size as u64)) .map(PuffinFileReader::new)) } @@ -198,13 +190,7 @@ mod tests { let region_dir = "region_dir".to_string(); let path = location::index_file_path(®ion_dir, file_id); - let mut puffin_writer = PuffinFileWriter::new( - object_store - .writer(&path) - .await - .unwrap() - .into_futures_async_write(), - ); + let mut puffin_writer = PuffinFileWriter::new(object_store.writer(&path).await.unwrap()); puffin_writer .add_blob(Blob { blob_type: INDEX_BLOB_TYPE.to_string(), @@ -250,13 +236,7 @@ mod tests { let region_dir = "region_dir".to_string(); let path = location::index_file_path(®ion_dir, file_id); - let mut puffin_writer = PuffinFileWriter::new( - object_store - .writer(&path) - .await - .unwrap() - .into_futures_async_write(), - ); + let mut puffin_writer = PuffinFileWriter::new(object_store.writer(&path).await.unwrap()); puffin_writer .add_blob(Blob { blob_type: "invalid_blob_type".to_string(), diff --git a/src/mito2/src/sst/index/store.rs b/src/mito2/src/sst/index/store.rs index 9d26118366..23893cc861 100644 --- a/src/mito2/src/sst/index/store.rs +++ b/src/mito2/src/sst/index/store.rs @@ -26,8 +26,6 @@ use crate::error::{OpenDalSnafu, Result}; /// A wrapper around [`ObjectStore`] that adds instrumentation for monitoring /// metrics such as bytes read, bytes 
written, and the number of seek operations. -/// -/// TODO: Consider refactor InstrumentedStore to use async in trait instead of AsyncRead. #[derive(Clone)] pub(crate) struct InstrumentedStore { /// The underlying object store. @@ -60,14 +58,8 @@ impl InstrumentedStore { read_byte_count: &'a IntCounter, read_count: &'a IntCounter, seek_count: &'a IntCounter, - ) -> Result> { - let meta = self.object_store.stat(path).await.context(OpenDalSnafu)?; - let reader = self - .object_store - .reader(path) - .await - .context(OpenDalSnafu)? - .into_futures_async_read(0..meta.content_length()); + ) -> Result> { + let reader = self.object_store.reader(path).await.context(OpenDalSnafu)?; Ok(InstrumentedAsyncRead::new( reader, read_byte_count, @@ -85,21 +77,15 @@ impl InstrumentedStore { write_byte_count: &'a IntCounter, write_count: &'a IntCounter, flush_count: &'a IntCounter, - ) -> Result> { + ) -> Result> { let writer = match self.write_buffer_size { Some(size) => self .object_store .writer_with(path) - .chunk(size) + .buffer(size) .await - .context(OpenDalSnafu)? - .into_futures_async_write(), - None => self - .object_store - .writer(path) - .await - .context(OpenDalSnafu)? - .into_futures_async_write(), + .context(OpenDalSnafu)?, + None => self.object_store.writer(path).await.context(OpenDalSnafu)?, }; Ok(InstrumentedAsyncWrite::new( writer, diff --git a/src/mito2/src/sst/parquet/helper.rs b/src/mito2/src/sst/parquet/helper.rs index b3cc8f8279..34196df7c0 100644 --- a/src/mito2/src/sst/parquet/helper.rs +++ b/src/mito2/src/sst/parquet/helper.rs @@ -121,7 +121,7 @@ async fn fetch_ranges_seq( .read_with(&file_path) .range(range.start..range.end) .call()?; - Ok::<_, object_store::Error>(data.to_bytes()) + Ok::<_, object_store::Error>(Bytes::from(data)) }) .collect::>>() }; @@ -141,7 +141,7 @@ async fn fetch_ranges_concurrent( let future_read = object_store.read_with(file_path); handles.push(async move { let data = future_read.range(range.start..range.end).await?; - Ok::<_, object_store::Error>(data.to_bytes()) + Ok::<_, object_store::Error>(Bytes::from(data)) }); } let results = futures::future::try_join_all(handles).await?; @@ -164,7 +164,7 @@ where } } -// https://github.com/apache/opendal/blob/v0.46.0/core/src/raw/tokio_util.rs#L21-L24 +// https://github.com/apache/incubator-opendal/blob/7144ab1ca2409dff0c324bfed062ce985997f8ce/core/src/raw/tokio_util.rs#L21-L23 /// Parse tokio error into opendal::Error. fn new_task_join_error(e: tokio::task::JoinError) -> object_store::Error { object_store::Error::new(ErrorKind::Unexpected, "tokio task join failed").set_source(e) diff --git a/src/mito2/src/sst/parquet/metadata.rs b/src/mito2/src/sst/parquet/metadata.rs index 3cf5a85cf8..e0db7b40b8 100644 --- a/src/mito2/src/sst/parquet/metadata.rs +++ b/src/mito2/src/sst/parquet/metadata.rs @@ -85,8 +85,7 @@ impl<'a> MetadataLoader<'a> { .read_with(path) .range(buffer_start..file_size) .await - .context(error::OpenDalSnafu)? - .to_vec(); + .context(error::OpenDalSnafu)?; let buffer_len = buffer.len(); let mut footer = [0; 8]; @@ -130,8 +129,7 @@ impl<'a> MetadataLoader<'a> { .read_with(path) .range(metadata_start..(file_size - FOOTER_SIZE as u64)) .await - .context(error::OpenDalSnafu)? 
- .to_vec(); + .context(error::OpenDalSnafu)?; let metadata = decode_metadata(&data).map_err(|e| { error::InvalidParquetSnafu { diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index c307e437c5..814d48bd1b 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -16,7 +16,6 @@ use std::time::Duration; -use bytes::Bytes; use common_telemetry::{error, info, warn}; use futures::TryStreamExt; use object_store::util::join_path; @@ -51,7 +50,7 @@ impl RegionWorkerLoop { region .access_layer .object_store() - .write(&marker_path, Bytes::new()) + .write(&marker_path, vec![]) .await .context(OpenDalSnafu) .inspect_err(|e| { diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index 5f4c4c98ed..da1291cad2 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -11,21 +11,23 @@ workspace = true services-memory = ["opendal/services-memory"] [dependencies] +async-trait = "0.1" bytes.workspace = true common-telemetry.workspace = true futures.workspace = true lazy_static.workspace = true md5 = "0.7" moka = { workspace = true, features = ["future"] } -opendal = { version = "0.46", features = [ +opendal = { version = "0.45", features = [ "layers-tracing", + "rustls", "services-azblob", "services-fs", "services-gcs", "services-http", "services-oss", "services-s3", -] } +], default-features = false } prometheus.workspace = true uuid.workspace = true @@ -33,4 +35,5 @@ uuid.workspace = true anyhow = "1.0" common-telemetry.workspace = true common-test-util.workspace = true +opendal = { version = "0.45", features = ["services-memory"] } tokio.workspace = true diff --git a/src/object-store/src/layers/lru_cache.rs b/src/object-store/src/layers/lru_cache.rs index bcea36603c..70d20710cb 100644 --- a/src/object-store/src/layers/lru_cache.rs +++ b/src/object-store/src/layers/lru_cache.rs @@ -14,26 +14,27 @@ use std::sync::Arc; -use opendal::raw::oio::ReadDyn; +use async_trait::async_trait; +use opendal::raw::oio::Read; use opendal::raw::{ - Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, + Accessor, Layer, LayeredAccessor, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite, }; -use opendal::{Operator, Result}; +use opendal::Result; mod read_cache; use common_telemetry::info; use read_cache::ReadCache; /// An opendal layer with local LRU file cache supporting. #[derive(Clone)] -pub struct LruCacheLayer { +pub struct LruCacheLayer { // The read cache - read_cache: ReadCache, + read_cache: ReadCache, } -impl LruCacheLayer { +impl LruCacheLayer { /// Create a `[LruCacheLayer]` with local file cache and capacity in bytes. 
- pub async fn new(file_cache: Operator, capacity: usize) -> Result { + pub async fn new(file_cache: Arc, capacity: usize) -> Result { let read_cache = ReadCache::new(file_cache, capacity); let (entries, bytes) = read_cache.recover_cache().await?; @@ -56,11 +57,11 @@ impl LruCacheLayer { } } -impl Layer for LruCacheLayer { - type LayeredAccess = LruCacheAccess; +impl Layer for LruCacheLayer { + type LayeredAccessor = LruCacheAccessor; - fn layer(&self, inner: I) -> Self::LayeredAccess { - LruCacheAccess { + fn layer(&self, inner: I) -> Self::LayeredAccessor { + LruCacheAccessor { inner, read_cache: self.read_cache.clone(), } @@ -68,14 +69,15 @@ impl Layer for LruCacheLayer { } #[derive(Debug)] -pub struct LruCacheAccess { +pub struct LruCacheAccessor { inner: I, - read_cache: ReadCache, + read_cache: ReadCache, } -impl LayeredAccess for LruCacheAccess { +#[async_trait] +impl LayeredAccessor for LruCacheAccessor { type Inner = I; - type Reader = Arc; + type Reader = Box; type BlockingReader = I::BlockingReader; type Writer = I::Writer; type BlockingWriter = I::BlockingWriter; diff --git a/src/object-store/src/layers/lru_cache/read_cache.rs b/src/object-store/src/layers/lru_cache/read_cache.rs index 81415b8039..f37e4772c3 100644 --- a/src/object-store/src/layers/lru_cache/read_cache.rs +++ b/src/object-store/src/layers/lru_cache/read_cache.rs @@ -15,12 +15,12 @@ use std::sync::Arc; use common_telemetry::debug; -use futures::{FutureExt, StreamExt}; +use futures::FutureExt; use moka::future::Cache; use moka::notification::ListenerFuture; -use opendal::raw::oio::{Read, ReadDyn, Reader}; -use opendal::raw::{Access, BytesRange, OpRead, OpStat, RpRead}; -use opendal::{Buffer, Error as OpendalError, ErrorKind, Operator, Result}; +use opendal::raw::oio::{ListExt, Read, ReadExt, Reader, WriteExt}; +use opendal::raw::{Accessor, OpDelete, OpList, OpRead, OpStat, OpWrite, RpRead}; +use opendal::{Error as OpendalError, ErrorKind, Result}; use crate::metrics::{ OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT, @@ -52,22 +52,26 @@ fn can_cache(path: &str) -> bool { } /// Generate an unique cache key for the read path and range. -fn read_cache_key(path: &str, range: BytesRange) -> String { - format!("{:x}.cache-{}", md5::compute(path), range.to_header()) +fn read_cache_key(path: &str, args: &OpRead) -> String { + format!( + "{:x}.cache-{}", + md5::compute(path), + args.range().to_header() + ) } /// Local read cache for files in object storage #[derive(Clone, Debug)] -pub(crate) struct ReadCache { +pub(crate) struct ReadCache { /// Local file cache backend - file_cache: Operator, + file_cache: Arc, /// Local memory cache to track local cache files mem_cache: Cache, } -impl ReadCache { +impl ReadCache { /// Create a [`ReadCache`] with capacity in bytes. 
- pub(crate) fn new(file_cache: Operator, capacity: usize) -> Self { + pub(crate) fn new(file_cache: Arc, capacity: usize) -> Self { let file_cache_cloned = file_cache.clone(); let eviction_listener = move |read_key: Arc, read_result: ReadResult, cause| -> ListenerFuture { @@ -79,7 +83,7 @@ impl ReadCache { if let ReadResult::Success(size) = read_result { OBJECT_STORE_LRU_CACHE_BYTES.sub(size as i64); - let result = file_cache_cloned.delete(&read_key).await; + let result = file_cache_cloned.delete(&read_key, OpDelete::new()).await; debug!( "Deleted local cache file `{}`, result: {:?}, cause: {:?}.", read_key, result, cause @@ -129,17 +133,17 @@ impl ReadCache { /// Recover existing cache items from `file_cache` to `mem_cache`. /// Return entry count and total approximate entry size in bytes. pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> { - let mut pager = self.file_cache.lister("/").await?; + let (_, mut pager) = self.file_cache.list("/", OpList::default()).await?; - while let Some(entry) = pager.next().await.transpose()? { + while let Some(entry) = pager.next().await? { let read_key = entry.path(); // We can't retrieve the metadata from `[opendal::raw::oio::Entry]` directly, // because it's private field. let size = { - let stat = self.file_cache.stat(read_key).await?; + let stat = self.file_cache.stat(read_key, OpStat::default()).await?; - stat.content_length() + stat.into_metadata().content_length() }; OBJECT_STORE_LRU_CACHE_ENTRIES.inc(); @@ -155,7 +159,8 @@ impl ReadCache { /// Returns true when the read cache contains the specific file. pub(crate) async fn contains_file(&self, path: &str) -> bool { self.mem_cache.run_pending_tasks().await; - self.mem_cache.contains_key(path) && self.file_cache.stat(path).await.is_ok() + self.mem_cache.contains_key(path) + && self.file_cache.stat(path, OpStat::default()).await.is_ok() } /// Read from a specific path using the OpRead operation. @@ -168,54 +173,86 @@ impl ReadCache { inner: &I, path: &str, args: OpRead, - ) -> Result<(RpRead, Arc)> + ) -> Result<(RpRead, Box)> where - I: Access, + I: Accessor, { if !can_cache(path) { return inner.read(path, args).await.map(to_output_reader); } - // FIXME: remove this block after opendal v0.47 released. 
- let meta = inner.stat(path, OpStat::new()).await?; - let (rp, reader) = inner.read(path, args).await?; - let reader: ReadCacheReader = ReadCacheReader { - path: Arc::new(path.to_string()), - inner_reader: reader, - size: meta.into_metadata().content_length(), - file_cache: self.file_cache.clone(), - mem_cache: self.mem_cache.clone(), - }; - Ok((rp, Arc::new(reader))) + let read_key = read_cache_key(path, &args); + + let read_result = self + .mem_cache + .try_get_with( + read_key.clone(), + self.read_remote(inner, &read_key, path, args.clone()), + ) + .await + .map_err(|e| OpendalError::new(e.kind(), &e.to_string()))?; + + match read_result { + ReadResult::Success(_) => { + // There is a concurrent issue here, the local cache may be purged + // while reading, we have to fallback to remote read + match self.file_cache.read(&read_key, OpRead::default()).await { + Ok(ret) => { + OBJECT_STORE_LRU_CACHE_HIT + .with_label_values(&["success"]) + .inc(); + Ok(to_output_reader(ret)) + } + Err(_) => { + OBJECT_STORE_LRU_CACHE_MISS.inc(); + inner.read(path, args).await.map(to_output_reader) + } + } + } + ReadResult::NotFound => { + OBJECT_STORE_LRU_CACHE_HIT + .with_label_values(&["not_found"]) + .inc(); + + Err(OpendalError::new( + ErrorKind::NotFound, + &format!("File not found: {path}"), + )) + } + } } -} -pub struct ReadCacheReader { - /// Path of the file - path: Arc, - /// Remote file reader. - inner_reader: I::Reader, - /// FIXME: remove this field after opendal v0.47 released. - /// - /// OpenDAL's read_at takes `offset, limit` which means the underlying storage - /// services could return less data than limit. We store size here as a workaround. - /// - /// This API has been refactor into `offset, size` instead. After opendal v0.47 released, - /// we don't need this anymore. - size: u64, - /// Local file cache backend - file_cache: Operator, - /// Local memory cache to track local cache files - mem_cache: Cache, -} + async fn try_write_cache(&self, mut reader: I::Reader, read_key: &str) -> Result + where + I: Accessor, + { + let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?; + let mut total = 0; + while let Some(bytes) = reader.next().await { + let bytes = &bytes?; + total += bytes.len(); + writer.write(bytes).await?; + } + // Call `close` to ensure data is written. + writer.close().await?; + Ok(total) + } -impl ReadCacheReader { - /// TODO: we can return the Buffer directly to avoid another read from cache. - async fn read_remote(&self, offset: u64, limit: usize) -> Result { + /// Read the file from remote storage. If success, write the content into local cache. 
+ async fn read_remote( + &self, + inner: &I, + read_key: &str, + path: &str, + args: OpRead, + ) -> Result + where + I: Accessor, + { OBJECT_STORE_LRU_CACHE_MISS.inc(); - let buf = self.inner_reader.read_at(offset, limit).await?; - let result = self.try_write_cache(buf, offset).await; + let (_, reader) = inner.read(path, args).await?; + let result = self.try_write_cache::(reader, read_key).await; match result { Ok(read_bytes) => { @@ -242,59 +279,10 @@ impl ReadCacheReader { } } } - - async fn try_write_cache(&self, buf: Buffer, offset: u64) -> Result { - let size = buf.len(); - let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _))); - self.file_cache.write(&read_key, buf).await?; - Ok(size) - } -} - -impl Read for ReadCacheReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let size = self.size.min(offset + limit as u64) - offset; - let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _))); - - let read_result = self - .mem_cache - .try_get_with(read_key.clone(), self.read_remote(offset, limit)) - .await - .map_err(|e| OpendalError::new(e.kind(), &e.to_string()))?; - - match read_result { - ReadResult::Success(_) => { - // There is a concurrent issue here, the local cache may be purged - // while reading, we have to fallback to remote read - match self.file_cache.read(&read_key).await { - Ok(ret) => { - OBJECT_STORE_LRU_CACHE_HIT - .with_label_values(&["success"]) - .inc(); - Ok(ret) - } - Err(_) => { - OBJECT_STORE_LRU_CACHE_MISS.inc(); - self.inner_reader.read_at(offset, limit).await - } - } - } - ReadResult::NotFound => { - OBJECT_STORE_LRU_CACHE_HIT - .with_label_values(&["not_found"]) - .inc(); - - Err(OpendalError::new( - ErrorKind::NotFound, - &format!("File not found: {}", self.path), - )) - } - } - } } fn to_output_reader(input: (RpRead, R)) -> (RpRead, Reader) { - (input.0, Arc::new(input.1)) + (input.0, Box::new(input.1)) } #[cfg(test)] diff --git a/src/object-store/src/layers/prometheus.rs b/src/object-store/src/layers/prometheus.rs index a609ce7203..51f8689984 100644 --- a/src/object-store/src/layers/prometheus.rs +++ b/src/object-store/src/layers/prometheus.rs @@ -15,11 +15,16 @@ //! 
code originally from , make a tiny change to avoid crash in multi thread env use std::fmt::{Debug, Formatter}; +use std::io; +use std::task::{Context, Poll}; +use async_trait::async_trait; +use bytes::Bytes; use common_telemetry::debug; +use futures::FutureExt; use lazy_static::lazy_static; use opendal::raw::*; -use opendal::{Buffer, ErrorKind}; +use opendal::ErrorKind; use prometheus::{ exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter_vec, Histogram, HistogramTimer, HistogramVec, IntCounterVec, @@ -84,14 +89,14 @@ fn increment_errors_total(op: Operation, kind: ErrorKind) { #[derive(Default, Debug, Clone)] pub struct PrometheusMetricsLayer; -impl Layer for PrometheusMetricsLayer { - type LayeredAccess = PrometheusAccess; +impl Layer for PrometheusMetricsLayer { + type LayeredAccessor = PrometheusAccessor; - fn layer(&self, inner: A) -> Self::LayeredAccess { + fn layer(&self, inner: A) -> Self::LayeredAccessor { let meta = inner.info(); let scheme = meta.scheme(); - PrometheusAccess { + PrometheusAccessor { inner, scheme: scheme.to_string(), } @@ -99,12 +104,12 @@ impl Layer for PrometheusMetricsLayer { } #[derive(Clone)] -pub struct PrometheusAccess { +pub struct PrometheusAccessor { inner: A, scheme: String, } -impl Debug for PrometheusAccess { +impl Debug for PrometheusAccessor { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("PrometheusAccessor") .field("inner", &self.inner) @@ -112,7 +117,8 @@ impl Debug for PrometheusAccess { } } -impl LayeredAccess for PrometheusAccess { +#[async_trait] +impl LayeredAccessor for PrometheusAccessor { type Inner = A; type Reader = PrometheusMetricWrapper; type BlockingReader = PrometheusMetricWrapper; @@ -151,20 +157,27 @@ impl LayeredAccess for PrometheusAccess { .with_label_values(&[&self.scheme, Operation::Read.into_static()]) .start_timer(); - let (rp, r) = self.inner.read(path, args).await.map_err(|e| { - increment_errors_total(Operation::Read, e.kind()); - e - })?; - - Ok(( - rp, - PrometheusMetricWrapper::new( - r, - Operation::Read, - BYTES_TOTAL.with_label_values(&[&self.scheme, Operation::Read.into_static()]), - timer, - ), - )) + self.inner + .read(path, args) + .map(|v| { + v.map(|(rp, r)| { + ( + rp, + PrometheusMetricWrapper::new( + r, + Operation::Read, + BYTES_TOTAL + .with_label_values(&[&self.scheme, Operation::Read.into_static()]), + timer, + ), + ) + }) + }) + .await + .map_err(|e| { + increment_errors_total(Operation::Read, e.kind()); + e + }) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { @@ -176,20 +189,27 @@ impl LayeredAccess for PrometheusAccess { .with_label_values(&[&self.scheme, Operation::Write.into_static()]) .start_timer(); - let (rp, r) = self.inner.write(path, args).await.map_err(|e| { - increment_errors_total(Operation::Write, e.kind()); - e - })?; - - Ok(( - rp, - PrometheusMetricWrapper::new( - r, - Operation::Write, - BYTES_TOTAL.with_label_values(&[&self.scheme, Operation::Write.into_static()]), - timer, - ), - )) + self.inner + .write(path, args) + .map(|v| { + v.map(|(rp, r)| { + ( + rp, + PrometheusMetricWrapper::new( + r, + Operation::Write, + BYTES_TOTAL + .with_label_values(&[&self.scheme, Operation::Write.into_static()]), + timer, + ), + ) + }) + }) + .await + .map_err(|e| { + increment_errors_total(Operation::Write, e.kind()); + e + }) } async fn stat(&self, path: &str, args: OpStat) -> Result { @@ -441,46 +461,103 @@ impl PrometheusMetricWrapper { } impl oio::Read for PrometheusMetricWrapper { - async fn 
read_at(&self, offset: u64, limit: usize) -> Result { - self.inner.read_at(offset, limit).await.map_err(|err| { - increment_errors_total(self.op, err.kind()); - err + fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + self.inner.poll_read(cx, buf).map(|res| match res { + Ok(bytes) => { + self.bytes += bytes as u64; + Ok(bytes) + } + Err(e) => { + increment_errors_total(self.op, e.kind()); + Err(e) + } + }) + } + + fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll> { + self.inner.poll_seek(cx, pos).map(|res| match res { + Ok(n) => Ok(n), + Err(e) => { + increment_errors_total(self.op, e.kind()); + Err(e) + } + }) + } + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + self.inner.poll_next(cx).map(|res| match res { + Some(Ok(bytes)) => { + self.bytes += bytes.len() as u64; + Some(Ok(bytes)) + } + Some(Err(e)) => { + increment_errors_total(self.op, e.kind()); + Some(Err(e)) + } + None => None, }) } } impl oio::BlockingRead for PrometheusMetricWrapper { - fn read_at(&self, offset: u64, limit: usize) -> opendal::Result { - self.inner.read_at(offset, limit).map_err(|err| { + fn read(&mut self, buf: &mut [u8]) -> Result { + self.inner + .read(buf) + .map(|n| { + self.bytes += n as u64; + n + }) + .map_err(|e| { + increment_errors_total(self.op, e.kind()); + e + }) + } + + fn seek(&mut self, pos: io::SeekFrom) -> Result { + self.inner.seek(pos).map_err(|err| { increment_errors_total(self.op, err.kind()); err }) } + + fn next(&mut self) -> Option> { + self.inner.next().map(|res| match res { + Ok(bytes) => { + self.bytes += bytes.len() as u64; + Ok(bytes) + } + Err(e) => { + increment_errors_total(self.op, e.kind()); + Err(e) + } + }) + } } +#[async_trait] impl oio::Write for PrometheusMetricWrapper { - async fn write(&mut self, bs: Buffer) -> Result { - match self.inner.write(bs).await { - Ok(n) => { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll> { + self.inner + .poll_write(cx, bs) + .map_ok(|n| { self.bytes += n as u64; - Ok(n) - } - Err(err) => { + n + }) + .map_err(|err| { increment_errors_total(self.op, err.kind()); - Err(err) - } - } + err + }) } - async fn close(&mut self) -> Result<()> { - self.inner.close().await.map_err(|err| { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_abort(cx).map_err(|err| { increment_errors_total(self.op, err.kind()); err }) } - async fn abort(&mut self) -> Result<()> { - self.inner.close().await.map_err(|err| { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_close(cx).map_err(|err| { increment_errors_total(self.op, err.kind()); err }) @@ -488,7 +565,7 @@ impl oio::Write for PrometheusMetricWrapper { } impl oio::BlockingWrite for PrometheusMetricWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result { self.inner .write(bs) .map(|n| { diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index ae561d4bbc..a26a9bda64 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -14,9 +14,8 @@ pub use opendal::raw::{normalize_path as raw_normalize_path, HttpClient}; pub use opendal::{ - services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, - FuturesAsyncReader, FuturesAsyncWriter, Lister, Metakey, Operator as ObjectStore, Reader, - Result, Writer, + services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Lister, Metakey, + Operator as ObjectStore, Reader, Result, Writer, }; pub mod 
diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs
index a3d3800054..cfe0afe027 100644
--- a/src/object-store/tests/object_store_test.rs
+++ b/src/object-store/tests/object_store_test.rs
@@ -22,6 +22,7 @@ use object_store::layers::LruCacheLayer;
 use object_store::services::{Fs, S3};
 use object_store::test_util::TempFolder;
 use object_store::{ObjectStore, ObjectStoreBuilder};
+use opendal::raw::Accessor;
 use opendal::services::{Azblob, Gcs, Oss};
 use opendal::{EntryMode, Operator, OperatorBuilder};
 
@@ -35,11 +36,11 @@ async fn test_object_crud(store: &ObjectStore) -> Result<()> {
 
     // Read data from object;
     let bs = store.read(file_name).await?;
-    assert_eq!("Hello, World!", String::from_utf8(bs.to_vec())?);
+    assert_eq!("Hello, World!", String::from_utf8(bs)?);
 
     // Read range from object;
     let bs = store.read_with(file_name).range(1..=11).await?;
-    assert_eq!("ello, World", String::from_utf8(bs.to_vec())?);
+    assert_eq!("ello, World", String::from_utf8(bs)?);
 
     // Get object's Metadata
     let meta = store.stat(file_name).await?;
@@ -76,7 +77,7 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> {
     assert_eq!(p2, entries.first().unwrap().path());
 
     let content = store.read(p2).await?;
-    assert_eq!("Hello, object2!", String::from_utf8(content.to_vec())?);
+    assert_eq!("Hello, object2!", String::from_utf8(content)?);
 
     store.delete(p2).await?;
     let entries = store.list("/").await?;
@@ -235,9 +236,11 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
         let _ = builder
             .root(&cache_dir.path().to_string_lossy())
             .atomic_write_dir(&cache_dir.path().to_string_lossy());
-        let file_cache = Operator::new(builder).unwrap().finish();
+        let file_cache = Arc::new(builder.build().unwrap());
 
-        LruCacheLayer::new(file_cache, 32).await.unwrap()
+        LruCacheLayer::new(Arc::new(file_cache.clone()), 32)
+            .await
+            .unwrap()
     };
 
     let store = store.layer(cache_layer.clone());
@@ -250,7 +253,10 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
     Ok(())
 }
 
-async fn assert_lru_cache(cache_layer: &LruCacheLayer, file_names: &[&str]) {
+async fn assert_lru_cache<C: Accessor + Clone>(
+    cache_layer: &LruCacheLayer<C>,
+    file_names: &[&str],
+) {
     for file_name in file_names {
         assert!(cache_layer.contains_file(file_name).await);
     }
 }
@@ -272,7 +278,7 @@ async fn assert_cache_files(
         let bs = store.read(o.path()).await.unwrap();
         assert_eq!(
             file_contents[position],
-            String::from_utf8(bs.to_vec())?,
+            String::from_utf8(bs.clone())?,
             "file content not match: {}",
             o.name()
         );
@@ -306,7 +312,9 @@ async fn test_object_store_cache_policy() -> Result<()> {
     let cache_store = OperatorBuilder::new(file_cache.clone()).finish();
 
     // create operator for cache dir to verify cache file
-    let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).await.unwrap();
+    let cache_layer = LruCacheLayer::new(Arc::new(file_cache.clone()), 38)
+        .await
+        .unwrap();
     let store = store.layer(cache_layer.clone());
 
     // create several object handler.
@@ -378,7 +386,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
     // instead of returning `NotFound` during the reader creation.
     // The entry count is 4, because we have the p2 `NotFound` cache.
     assert!(store.read_with(p2).range(0..4).await.is_err());
-    assert_eq!(cache_layer.read_cache_stat().await, (3, 35));
+    assert_eq!(cache_layer.read_cache_stat().await, (4, 35));
 
     assert_cache_files(
         &cache_store,
@@ -406,7 +414,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
     assert!(store.read(p2).await.is_err());
     // Read p1 with range `1..` , the existing p1 with range `0..` must be evicted.
     let _ = store.read_with(p1).range(1..15).await.unwrap();
-    assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
+    assert_eq!(cache_layer.read_cache_stat().await, (4, 34));
     assert_cache_files(
         &cache_store,
         &[
@@ -434,7 +442,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
     drop(cache_layer);
 
     // Test recover
-    let cache_layer = LruCacheLayer::new(cache_store, 38).await.unwrap();
+    let cache_layer = LruCacheLayer::new(Arc::new(file_cache), 38).await.unwrap();
 
     // The p2 `NotFound` cache will not be recovered
     assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml
index 8681dca297..f99ea1c63b 100644
--- a/src/operator/Cargo.toml
+++ b/src/operator/Cargo.toml
@@ -56,7 +56,6 @@ store-api.workspace = true
 substrait.workspace = true
 table.workspace = true
 tokio.workspace = true
-tokio-util.workspace = true
 tonic.workspace = true
 
 [dev-dependencies]
diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs
index 94892f10a8..52880d700e 100644
--- a/src/operator/src/statement/copy_table_from.rs
+++ b/src/operator/src/statement/copy_table_from.rs
@@ -20,7 +20,7 @@ use client::{Output, OutputData, OutputMeta};
 use common_base::readable_size::ReadableSize;
 use common_datasource::file_format::csv::{CsvConfigBuilder, CsvFormat, CsvOpener};
 use common_datasource::file_format::json::{JsonFormat, JsonOpener};
-use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader, ReaderAdapter};
+use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader};
 use common_datasource::file_format::{FileFormat, Format};
 use common_datasource::lister::{Lister, Source};
 use common_datasource::object_store::{build_backend, parse_url};
@@ -46,7 +46,6 @@ use session::context::QueryContextRef;
 use snafu::ResultExt;
 use table::requests::{CopyTableRequest, InsertRequest};
 use table::table_reference::TableReference;
-use tokio_util::compat::FuturesAsyncReadCompatExt;
 
 use crate::error::{self, IntoVectorsSnafu, Result};
 use crate::statement::StatementExecutor;
@@ -147,16 +146,10 @@ impl StatementExecutor {
                 path,
             }),
             Format::Parquet(_) => {
-                let meta = object_store
-                    .stat(&path)
-                    .await
-                    .context(error::ReadObjectSnafu { path: &path })?;
                 let mut reader = object_store
                     .reader(&path)
                     .await
-                    .context(error::ReadObjectSnafu { path: &path })?
-                    .into_futures_async_read(0..meta.content_length())
-                    .compat();
+                    .context(error::ReadObjectSnafu { path: &path })?;
                 let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
                     .await
                     .context(error::ReadParquetMetadataSnafu)?;
@@ -168,17 +161,12 @@ impl StatementExecutor {
                 })
             }
             Format::Orc(_) => {
-                let meta = object_store
-                    .stat(&path)
-                    .await
-                    .context(error::ReadObjectSnafu { path: &path })?;
-
                 let reader = object_store
                     .reader(&path)
                     .await
                     .context(error::ReadObjectSnafu { path: &path })?;
 
-                let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length()))
+                let schema = infer_orc_schema(reader)
                     .await
                     .context(error::ReadOrcSnafu)?;
 
@@ -291,17 +279,11 @@ impl StatementExecutor {
                 )))
             }
             FileMetadata::Parquet { metadata, path, .. } => {
-                let meta = object_store
-                    .stat(path)
-                    .await
-                    .context(error::ReadObjectSnafu { path })?;
                 let reader = object_store
                     .reader_with(path)
-                    .chunk(DEFAULT_READ_BUFFER)
+                    .buffer(DEFAULT_READ_BUFFER)
                     .await
-                    .context(error::ReadObjectSnafu { path })?
-                    .into_futures_async_read(0..meta.content_length())
-                    .compat();
+                    .context(error::ReadObjectSnafu { path })?;
                 let builder =
                     ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
                 let stream = builder
@@ -320,20 +302,14 @@ impl StatementExecutor {
                 )))
             }
             FileMetadata::Orc { path, .. } => {
-                let meta = object_store
-                    .stat(path)
-                    .await
-                    .context(error::ReadObjectSnafu { path })?;
-
                 let reader = object_store
                     .reader_with(path)
-                    .chunk(DEFAULT_READ_BUFFER)
+                    .buffer(DEFAULT_READ_BUFFER)
                     .await
                     .context(error::ReadObjectSnafu { path })?;
 
-                let stream =
-                    new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
-                        .await
-                        .context(error::ReadOrcSnafu)?;
+                let stream = new_orc_stream_reader(reader)
+                    .await
+                    .context(error::ReadOrcSnafu)?;
 
                 let projected_schema = Arc::new(
                     compat_schema
diff --git a/src/servers/src/export_metrics.rs b/src/servers/src/export_metrics.rs
index d71dbcaa88..d97efc5757 100644
--- a/src/servers/src/export_metrics.rs
+++ b/src/servers/src/export_metrics.rs
@@ -16,12 +16,14 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
 
+use axum::http::HeaderValue;
 use common_base::Plugins;
 use common_telemetry::metric::{convert_metric_to_write_request, MetricFilter};
 use common_telemetry::{error, info};
 use common_time::Timestamp;
+use hyper::HeaderMap;
 use prost::Message;
-use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
+use reqwest::header::HeaderName;
 use serde::{Deserialize, Serialize};
 use session::context::QueryContextBuilder;
 use snafu::{ensure, ResultExt};
@@ -113,7 +115,7 @@ impl ExportMetricsTask {
                 }
             );
         }
-        let mut headers = HeaderMap::new();
+        let mut headers = reqwest::header::HeaderMap::new();
         if let Some(remote_write) = &config.remote_write {
             ensure!(
                 !remote_write.url.is_empty(),
diff --git a/src/servers/src/http/greptime_result_v1.rs b/src/servers/src/http/greptime_result_v1.rs
index de8284c40a..9cb2924ba6 100644
--- a/src/servers/src/http/greptime_result_v1.rs
+++ b/src/servers/src/http/greptime_result_v1.rs
@@ -14,10 +14,10 @@
 
 use std::collections::HashMap;
 
-use axum::headers::HeaderValue;
 use axum::response::{IntoResponse, Response};
 use axum::Json;
 use common_query::Output;
+use reqwest::header::HeaderValue;
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
diff --git a/src/servers/src/http/test_helpers.rs b/src/servers/src/http/test_helpers.rs
index 0142fc5180..11d416f97f 100644
--- a/src/servers/src/http/test_helpers.rs
+++ b/src/servers/src/http/test_helpers.rs
@@ -32,7 +32,6 @@
 
 use std::convert::TryFrom;
 use std::net::{SocketAddr, TcpListener};
-use std::str::FromStr;
 
 use axum::body::HttpBody;
 use axum::BoxError;
@@ -170,15 +169,7 @@ impl RequestBuilder {
         HeaderValue: TryFrom<V>,
         <HeaderValue as TryFrom<V>>::Error: Into<http::Error>,
     {
-        // TODO(tisonkun): revert once http bump to 1.x
-        let key: HeaderName = key.try_into().map_err(Into::into).unwrap();
-        let key = reqwest::header::HeaderName::from_bytes(key.as_ref()).unwrap();
-
-        let value: HeaderValue = value.try_into().map_err(Into::into).unwrap();
-        let value = reqwest::header::HeaderValue::from_bytes(value.as_bytes()).unwrap();
-
         self.builder = self.builder.header(key, value);
-
         self
     }
 
@@ -219,19 +210,12 @@ impl TestResponse {
 
     /// Get the response status.
     pub fn status(&self) -> StatusCode {
-        StatusCode::from_u16(self.response.status().as_u16()).unwrap()
+        self.response.status()
     }
 
     /// Get the response headers.
-    pub fn headers(&self) -> http::HeaderMap {
-        // TODO(tisonkun): revert once http bump to 1.x
-        let mut headers = http::HeaderMap::new();
-        for (key, value) in self.response.headers() {
-            let key = HeaderName::from_str(key.as_str()).unwrap();
-            let value = HeaderValue::from_bytes(value.as_bytes()).unwrap();
-            headers.insert(key, value);
-        }
-        headers
+    pub fn headers(&self) -> &http::HeaderMap {
+        self.response.headers()
     }
 
     /// Get the response in chunks.