Compare commits

3 Commits

Ruihang Xia  1bfba48755  2024-06-03 20:28:59 +08:00
    Revert "build(deps): upgrade opendal to 0.46 (#4037)"
    This reverts commit f9db5ff0d6.

Ruihang Xia  457998f0fe  2024-05-31 18:16:36 +08:00
    Merge branch 'main' into avoid-query-meta
    Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Ruihang Xia  b02c256157  2024-05-31 18:16:08 +08:00
    perf: use memory state to check if a logical region exists
    Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

35 changed files with 447 additions and 649 deletions

Cargo.lock (generated)
View File

@@ -83,7 +83,7 @@ dependencies = [
"axum",
"bytes",
"cfg-if",
"http 0.2.12",
"http",
"indexmap 1.9.3",
"schemars",
"serde",
@@ -764,9 +764,9 @@ dependencies = [
"bytes",
"futures-util",
"headers",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.28",
"http",
"http-body",
"hyper",
"itoa",
"matchit",
"memchr",
@@ -794,8 +794,8 @@ dependencies = [
"async-trait",
"bytes",
"futures-util",
"http 0.2.12",
"http-body 0.4.6",
"http",
"http-body",
"mime",
"rustversion",
"tower-layer",
@@ -1854,7 +1854,7 @@ dependencies = [
"common-telemetry",
"common-test-util",
"common-version",
"hyper 0.14.28",
"hyper",
"reqwest",
"serde",
"tempfile",
@@ -1963,7 +1963,7 @@ dependencies = [
"futures-util",
"hex",
"humantime-serde",
"hyper 0.14.28",
"hyper",
"itertools 0.10.5",
"lazy_static",
"moka",
@@ -3525,6 +3525,15 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
[[package]]
name = "encoding_rs"
version = "0.8.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59"
dependencies = [
"cfg-if",
]
[[package]]
name = "endian-type"
version = "0.1.2"
@@ -3595,7 +3604,7 @@ name = "etcd-client"
version = "0.12.4"
source = "git+https://github.com/MichaelScofield/etcd-client.git?rev=4c371e9b3ea8e0a8ee2f9cbd7ded26e54a45df3b#4c371e9b3ea8e0a8ee2f9cbd7ded26e54a45df3b"
dependencies = [
"http 0.2.12",
"http",
"prost 0.12.4",
"tokio",
"tokio-stream",
@@ -4210,7 +4219,7 @@ dependencies = [
"futures-core",
"futures-sink",
"futures-util",
"http 0.2.12",
"http",
"indexmap 2.2.6",
"slab",
"tokio",
@@ -4294,7 +4303,7 @@ dependencies = [
"base64 0.21.7",
"bytes",
"headers-core",
"http 0.2.12",
"http",
"httpdate",
"mime",
"sha1",
@@ -4306,7 +4315,7 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
dependencies = [
"http 0.2.12",
"http",
]
[[package]]
@@ -4409,17 +4418,6 @@ dependencies = [
"itoa",
]
[[package]]
name = "http"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.6"
@@ -4427,30 +4425,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2"
dependencies = [
"bytes",
"http 0.2.12",
"pin-project-lite",
]
[[package]]
name = "http-body"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643"
dependencies = [
"bytes",
"http 1.1.0",
]
[[package]]
name = "http-body-util"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d"
dependencies = [
"bytes",
"futures-core",
"http 1.1.0",
"http-body 1.0.0",
"http",
"pin-project-lite",
]
@@ -4607,8 +4582,8 @@ dependencies = [
"futures-core",
"futures-util",
"h2",
"http 0.2.12",
"http-body 0.4.6",
"http",
"http-body",
"httparse",
"httpdate",
"itoa",
@@ -4620,40 +4595,18 @@ dependencies = [
"want",
]
[[package]]
name = "hyper"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"httparse",
"itoa",
"pin-project-lite",
"smallvec",
"tokio",
"want",
]
[[package]]
name = "hyper-rustls"
version = "0.26.0"
version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"
checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590"
dependencies = [
"futures-util",
"http 1.1.0",
"hyper 1.3.1",
"hyper-util",
"rustls 0.22.4",
"rustls-pki-types",
"http",
"hyper",
"rustls 0.21.12",
"tokio",
"tokio-rustls 0.25.0",
"tower-service",
"tokio-rustls 0.24.1",
]
[[package]]
@@ -4662,32 +4615,12 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
dependencies = [
"hyper 0.14.28",
"hyper",
"pin-project-lite",
"tokio",
"tokio-io-timeout",
]
[[package]]
name = "hyper-util"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"hyper 1.3.1",
"pin-project-lite",
"socket2 0.5.7",
"tokio",
"tower",
"tower-service",
"tracing",
]
[[package]]
name = "iana-time-zone"
version = "0.1.60"
@@ -5671,7 +5604,7 @@ dependencies = [
"etcd-client",
"futures",
"h2",
"http-body 0.4.6",
"http-body",
"humantime",
"humantime-serde",
"itertools 0.10.5",
@@ -6433,6 +6366,7 @@ name = "object-store"
version = "0.8.1"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"common-telemetry",
"common-test-util",
@@ -6481,21 +6415,20 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "opendal"
version = "0.46.0"
version = "0.45.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "328c4992328e8965e6a6ef102d38438b5fdc7d9b9107eda2377ba05379d9d544"
checksum = "52c17c077f23fa2d2c25d9d22af98baa43b8bbe2ef0de80cf66339aa70401467"
dependencies = [
"anyhow",
"async-trait",
"backon",
"base64 0.22.1",
"base64 0.21.7",
"bytes",
"chrono",
"crc32c",
"flagset",
"futures",
"getrandom",
"http 1.1.0",
"http",
"log",
"md-5",
"once_cell",
@@ -6583,7 +6516,7 @@ checksum = "f24cda83b20ed2433c68241f918d0f6fdec8b1d43b7a9590ab4420c5095ca930"
dependencies = [
"async-trait",
"futures-core",
"http 0.2.12",
"http",
"opentelemetry 0.21.0",
"opentelemetry-proto 0.4.0",
"opentelemetry-semantic-conventions",
@@ -6720,7 +6653,6 @@ dependencies = [
"substrait 0.8.1",
"table",
"tokio",
"tokio-util",
"tonic 0.11.0",
]
@@ -8264,20 +8196,20 @@ dependencies = [
[[package]]
name = "reqsign"
version = "0.15.0"
version = "0.14.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01edce6b6c31a16ebc7525ac58c747a6d78bbce33e76bbebd350d6bc25b23e06"
checksum = "43e319d9de9ff4d941abf4ac718897118b0fe04577ea3f8e0f5788971784eef5"
dependencies = [
"anyhow",
"async-trait",
"base64 0.22.1",
"base64 0.21.7",
"chrono",
"form_urlencoded",
"getrandom",
"hex",
"hmac",
"home",
"http 1.1.0",
"http",
"jsonwebtoken",
"log",
"once_cell",
@@ -8286,7 +8218,7 @@ dependencies = [
"rand",
"reqwest",
"rsa 0.9.6",
"rust-ini 0.21.0",
"rust-ini 0.20.0",
"serde",
"serde_json",
"sha1",
@@ -8295,20 +8227,20 @@ dependencies = [
[[package]]
name = "reqwest"
version = "0.12.4"
version = "0.11.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10"
checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62"
dependencies = [
"base64 0.22.1",
"base64 0.21.7",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.3.1",
"h2",
"http",
"http-body",
"hyper",
"hyper-rustls",
"hyper-util",
"ipnet",
"js-sys",
"log",
@@ -8317,16 +8249,16 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls 0.22.4",
"rustls 0.21.12",
"rustls-native-certs",
"rustls-pemfile 2.1.2",
"rustls-pki-types",
"rustls-pemfile 1.0.4",
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper",
"system-configuration",
"tokio",
"tokio-rustls 0.25.0",
"tokio-rustls 0.24.1",
"tokio-util",
"tower-service",
"url",
@@ -8334,8 +8266,7 @@ dependencies = [
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots 0.26.1",
"winreg 0.52.0",
"winreg 0.50.0",
]
[[package]]
@@ -8601,13 +8532,12 @@ dependencies = [
[[package]]
name = "rust-ini"
version = "0.21.0"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d625ed57d8f49af6cfa514c42e1a71fadcff60eb0b1c517ff82fe41aa025b41"
checksum = "3e0698206bcb8882bf2a9ecb4c1e7785db57ff052297085a6efd4fe42302068a"
dependencies = [
"cfg-if",
"ordered-multimap 0.7.3",
"trim-in-place",
]
[[package]]
@@ -8749,13 +8679,12 @@ dependencies = [
[[package]]
name = "rustls-native-certs"
version = "0.7.0"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792"
checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00"
dependencies = [
"openssl-probe",
"rustls-pemfile 2.1.2",
"rustls-pki-types",
"rustls-pemfile 1.0.4",
"schannel",
"security-framework",
]
@@ -9588,10 +9517,10 @@ dependencies = [
"hashbrown 0.14.5",
"headers",
"hostname",
"http 0.2.12",
"http-body 0.4.6",
"http",
"http-body",
"humantime-serde",
"hyper 0.14.28",
"hyper",
"influxdb_line_protocol",
"itertools 0.10.5",
"lazy_static",
@@ -11215,9 +11144,9 @@ dependencies = [
"futures-core",
"futures-util",
"h2",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.28",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
@@ -11243,9 +11172,9 @@ dependencies = [
"bytes",
"flate2",
"h2",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.28",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
@@ -11347,8 +11276,8 @@ dependencies = [
"bytes",
"futures-core",
"futures-util",
"http 0.2.12",
"http-body 0.4.6",
"http",
"http-body",
"http-range-header",
"httpdate",
"iri-string",
@@ -11591,12 +11520,6 @@ dependencies = [
"tree-sitter",
]
[[package]]
name = "trim-in-place"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc"
[[package]]
name = "triomphe"
version = "0.1.11"
@@ -12262,15 +12185,6 @@ version = "0.25.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1"
[[package]]
name = "webpki-roots"
version = "0.26.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009"
dependencies = [
"rustls-pki-types",
]
[[package]]
name = "which"
version = "4.4.2"
@@ -12637,9 +12551,9 @@ dependencies = [
[[package]]
name = "winreg"
version = "0.52.0"
version = "0.50.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5"
checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1"
dependencies = [
"cfg-if",
"windows-sys 0.48.0",

View File

@@ -146,7 +146,7 @@ raft-engine = { version = "0.4.1", default-features = false }
rand = "0.8"
regex = "1.8"
regex-automata = { version = "0.4" }
reqwest = { version = "0.12", default-features = false, features = [
reqwest = { version = "0.11", default-features = false, features = [
"json",
"rustls-tls-native-roots",
"stream",

View File

@@ -92,44 +92,34 @@ impl CompressionType {
macro_rules! impl_compression_type {
($(($enum_item:ident, $prefix:ident)),*) => {
paste::item! {
use bytes::{Buf, BufMut, BytesMut};
impl CompressionType {
pub async fn encode<B: Buf>(&self, mut content: B) -> io::Result<Vec<u8>> {
pub async fn encode(&self, content: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
match self {
$(
CompressionType::$enum_item => {
let mut buffer = Vec::with_capacity(content.remaining());
let mut buffer = Vec::with_capacity(content.as_ref().len());
let mut encoder = write::[<$prefix Encoder>]::new(&mut buffer);
encoder.write_all_buf(&mut content).await?;
encoder.write_all(content.as_ref()).await?;
encoder.shutdown().await?;
Ok(buffer)
}
)*
CompressionType::Uncompressed => {
let mut bs = BytesMut::with_capacity(content.remaining());
bs.put(content);
Ok(bs.to_vec())
},
CompressionType::Uncompressed => Ok(content.as_ref().to_vec()),
}
}
pub async fn decode<B: Buf>(&self, mut content: B) -> io::Result<Vec<u8>> {
pub async fn decode(&self, content: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
match self {
$(
CompressionType::$enum_item => {
let mut buffer = Vec::with_capacity(content.remaining() * 2);
let mut buffer = Vec::with_capacity(content.as_ref().len() * 2);
let mut encoder = write::[<$prefix Decoder>]::new(&mut buffer);
encoder.write_all_buf(&mut content).await?;
encoder.write_all(content.as_ref()).await?;
encoder.shutdown().await?;
Ok(buffer)
}
)*
CompressionType::Uncompressed => {
let mut bs = BytesMut::with_capacity(content.remaining());
bs.put(content);
Ok(bs.to_vec())
},
CompressionType::Uncompressed => Ok(content.as_ref().to_vec()),
}
}
@@ -161,13 +151,13 @@ macro_rules! impl_compression_type {
$(
#[tokio::test]
async fn [<test_ $enum_item:lower _compression>]() {
let string = "foo_bar".as_bytes();
let string = "foo_bar".as_bytes().to_vec();
let compress = CompressionType::$enum_item
.encode(string)
.encode(&string)
.await
.unwrap();
let decompress = CompressionType::$enum_item
.decode(compress.as_slice())
.decode(&compress)
.await
.unwrap();
assert_eq!(decompress, string);
@@ -175,13 +165,13 @@ macro_rules! impl_compression_type {
#[tokio::test]
async fn test_uncompression() {
let string = "foo_bar".as_bytes();
let string = "foo_bar".as_bytes().to_vec();
let compress = CompressionType::Uncompressed
.encode(string)
.encode(&string)
.await
.unwrap();
let decompress = CompressionType::Uncompressed
.decode(compress.as_slice())
.decode(&compress)
.await
.unwrap();
assert_eq!(decompress, string);
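
The hunk above narrows `encode`/`decode` from taking `bytes::Buf` to `impl AsRef<[u8]>`, which plain slices and `Vec<u8>` already satisfy. A minimal usage sketch mirroring the updated tests; the import path and the `Gzip` variant below are assumptions based on the macro invocation, not confirmed by this diff:

    // Sketch only: crate path and variant name are assumed.
    use common_datasource::compression::CompressionType;

    async fn roundtrip() -> std::io::Result<()> {
        let payload = b"foo_bar".to_vec();
        // &Vec<u8> satisfies `impl AsRef<[u8]>`, so no Buf wrapper is needed.
        let compressed = CompressionType::Gzip.encode(&payload).await?;
        let decompressed = CompressionType::Gzip.decode(&compressed).await?;
        assert_eq!(decompressed, payload);
        Ok(())
    }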

View File

@@ -36,7 +36,6 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use self::csv::CsvFormat;
use self::json::JsonFormat;
@@ -147,8 +146,7 @@ pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
let reader = object_store
.reader(&path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?
.into_bytes_stream(..);
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let mut upstream = compression_type.convert_stream(reader).fuse();
@@ -205,7 +203,6 @@ pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
.writer_with(&path)
.concurrent(concurrency)
.await
.map(|v| v.into_futures_async_write().compat_write())
.context(error::WriteObjectSnafu { path })
});
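
These two hunks show the crux of the revert for readers and writers: with opendal 0.46 a reader had to be adapted via `stat()` plus `into_futures_async_read(0..len)` (and `compat()` for tokio), and a writer via `into_futures_async_write().compat_write()`, whereas opendal 0.45 hands back `Reader`/`Writer` values that are used directly. A minimal sketch of the post-revert calls, assuming an `ObjectStore` handle (the `object_store` crate re-exports `opendal::Operator`, `Reader`, and `Writer`, as its lib.rs hunk further below shows):

    use object_store::{ObjectStore, Reader, Writer};

    // Post-revert (opendal 0.45): no stat() + adapter chain before reading.
    async fn open_for_read(store: &ObjectStore, path: &str) -> object_store::Result<Reader> {
        store.reader(path).await
    }

    // Post-revert (opendal 0.45): writer_with() yields a usable Writer directly.
    async fn open_for_write(store: &ObjectStore, path: &str, concurrency: usize) -> object_store::Result<Writer> {
        store.writer_with(path).concurrent(concurrency).await
    }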

View File

@@ -29,7 +29,6 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use derive_builder::Builder;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::SyncIoBridge;
use super::stream_to_file;
@@ -165,16 +164,10 @@ impl FileOpener for CsvOpener {
#[async_trait]
impl FileFormat for CsvFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.compat();
.context(error::ReadObjectSnafu { path })?;
let decoded = self.compression_type.convert_async_read(reader);

View File

@@ -31,7 +31,6 @@ use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::SendableRecordBatchStream;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::SyncIoBridge;
use super::stream_to_file;
@@ -83,16 +82,10 @@ impl Default for JsonFormat {
#[async_trait]
impl FileFormat for JsonFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.compat();
.context(error::ReadObjectSnafu { path })?;
let decoded = self.compression_type.convert_async_read(reader);

View File

@@ -16,17 +16,15 @@ use std::sync::Arc;
use arrow_schema::{ArrowError, Schema, SchemaRef};
use async_trait::async_trait;
use bytes::Bytes;
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
use datafusion::error::{DataFusionError, Result as DfResult};
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures::{StreamExt, TryStreamExt};
use object_store::ObjectStore;
use orc_rust::arrow_reader::ArrowReaderBuilder;
use orc_rust::async_arrow_reader::ArrowStreamReader;
use orc_rust::reader::AsyncChunkReader;
use snafu::ResultExt;
use tokio::io::{AsyncRead, AsyncSeek};
use crate::error::{self, Result};
use crate::file_format::FileFormat;
@@ -34,49 +32,18 @@ use crate::file_format::FileFormat;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct OrcFormat;
#[derive(Clone)]
pub struct ReaderAdapter {
reader: object_store::Reader,
len: u64,
}
impl ReaderAdapter {
pub fn new(reader: object_store::Reader, len: u64) -> Self {
Self { reader, len }
}
}
impl AsyncChunkReader for ReaderAdapter {
fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
async move { Ok(self.len) }.boxed()
}
fn get_bytes(
&mut self,
offset_from_start: u64,
length: u64,
) -> BoxFuture<'_, std::io::Result<Bytes>> {
async move {
let bytes = self
.reader
.read(offset_from_start..offset_from_start + length)
.await?;
Ok(bytes.to_bytes())
}
.boxed()
}
}
pub async fn new_orc_stream_reader(
reader: ReaderAdapter,
) -> Result<ArrowStreamReader<ReaderAdapter>> {
pub async fn new_orc_stream_reader<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<ArrowStreamReader<R>> {
let reader_build = ArrowReaderBuilder::try_new_async(reader)
.await
.context(error::OrcReaderSnafu)?;
Ok(reader_build.build_async())
}
pub async fn infer_orc_schema(reader: ReaderAdapter) -> Result<Schema> {
pub async fn infer_orc_schema<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<Schema> {
let reader = new_orc_stream_reader(reader).await?;
Ok(reader.schema().as_ref().clone())
}
@@ -84,15 +51,13 @@ pub async fn infer_orc_schema(reader: ReaderAdapter) -> Result<Schema> {
#[async_trait]
impl FileFormat for OrcFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length())).await?;
let schema = infer_orc_schema(reader).await?;
Ok(schema)
}
}
@@ -132,20 +97,12 @@ impl FileOpener for OrcOpener {
};
let projection = self.projection.clone();
Ok(Box::pin(async move {
let path = meta.location().to_string();
let meta = object_store
.stat(&path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let reader = object_store
.reader(&path)
.reader(meta.location().to_string().as_str())
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let stream_reader =
new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
let stream_reader = new_orc_stream_reader(reader)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

View File

@@ -29,11 +29,10 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::future::BoxFuture;
use futures::StreamExt;
use object_store::{FuturesAsyncReader, ObjectStore};
use object_store::{ObjectStore, Reader, Writer};
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;
use snafu::ResultExt;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder, LazyBufferedWriter};
use crate::error::{self, Result};
@@ -46,16 +45,10 @@ pub struct ParquetFormat {}
#[async_trait]
impl FileFormat for ParquetFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let mut reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.compat();
.context(error::ReadObjectSnafu { path })?;
let metadata = reader
.get_metadata()
@@ -105,7 +98,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
pub struct LazyParquetFileReader {
object_store: ObjectStore,
reader: Option<Compat<FuturesAsyncReader>>,
reader: Option<Reader>,
path: String,
}
@@ -121,13 +114,7 @@ impl LazyParquetFileReader {
/// Must initialize the reader, or throw an error from the future.
async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> {
if self.reader.is_none() {
let meta = self.object_store.stat(&self.path).await?;
let reader = self
.object_store
.reader(&self.path)
.await?
.into_futures_async_read(0..meta.content_length())
.compat();
let reader = self.object_store.reader(&self.path).await?;
self.reader = Some(reader);
}
@@ -180,17 +167,16 @@ pub struct BufferedWriter {
}
type InnerBufferedWriter = LazyBufferedWriter<
Compat<object_store::FuturesAsyncWriter>,
object_store::Writer,
ArrowWriter<SharedBuffer>,
impl Fn(String) -> BoxFuture<'static, Result<Compat<object_store::FuturesAsyncWriter>>>,
impl Fn(String) -> BoxFuture<'static, Result<Writer>>,
>;
impl BufferedWriter {
fn make_write_factory(
store: ObjectStore,
concurrency: usize,
) -> impl Fn(String) -> BoxFuture<'static, Result<Compat<object_store::FuturesAsyncWriter>>>
{
) -> impl Fn(String) -> BoxFuture<'static, Result<Writer>> {
move |path| {
let store = store.clone();
Box::pin(async move {
@@ -198,7 +184,6 @@ impl BufferedWriter {
.writer_with(&path)
.concurrent(concurrency)
.await
.map(|v| v.into_futures_async_write().compat_write())
.context(error::WriteObjectSnafu { path })
})
}

View File

@@ -120,7 +120,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
let written = tmp_store.read(&output_path).await.unwrap();
let origin = store.read(origin_path).await.unwrap();
assert_eq_lines(written.to_vec(), origin.to_vec());
assert_eq_lines(written, origin);
}
pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usize) -> usize) {
@@ -158,7 +158,7 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usiz
let written = tmp_store.read(&output_path).await.unwrap();
let origin = store.read(origin_path).await.unwrap();
assert_eq_lines(written.to_vec(), origin.to_vec());
assert_eq_lines(written, origin);
}
// Ignore the CRLF difference across operating systems.
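
This hunk and many later ones simply drop `.to_vec()`/`.to_bytes()` conversions and use the read result directly, because opendal 0.45's `read()` returns `Vec<u8>` while 0.46 returned an `opendal::Buffer`. A minimal sketch of the post-revert call:

    use object_store::ObjectStore;

    // opendal 0.45: read() already yields Vec<u8>, ready for comparison or decoding.
    async fn read_all(store: &ObjectStore, path: &str) -> object_store::Result<Vec<u8>> {
        store.read(path).await
    }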

View File

@@ -179,7 +179,7 @@ impl StateStore for ObjectStateStore {
))
})
.context(ListStateSnafu { path: key })?;
yield (key.into(), value.to_vec());
yield (key.into(), value);
}
}
});

View File

@@ -20,6 +20,7 @@ mod gcs;
mod oss;
mod s3;
use std::sync::Arc;
use std::time::Duration;
use std::{env, path};
@@ -28,7 +29,7 @@ use common_telemetry::info;
use object_store::layers::{LruCacheLayer, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{HttpClient, ObjectStore};
use object_store::{HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;
use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
@@ -106,13 +107,13 @@ async fn create_object_store_with_cache(
if let Some(path) = cache_path {
let atomic_temp_dir = join_dir(path, ".tmp/");
clean_temp_dir(&atomic_temp_dir)?;
let mut builder = Fs::default();
builder.root(path).atomic_write_dir(&atomic_temp_dir);
let cache_store = ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish();
let cache_store = Fs::default()
.root(path)
.atomic_write_dir(&atomic_temp_dir)
.build()
.context(error::InitBackendSnafu)?;
let cache_layer = LruCacheLayer::new(cache_store, cache_capacity.0 as usize)
let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
.await
.context(error::InitBackendSnafu)?;

View File

@@ -71,8 +71,7 @@ impl FileRegionManifest {
let bs = object_store
.read(path)
.await
.context(LoadRegionManifestSnafu { region_id })?
.to_vec();
.context(LoadRegionManifestSnafu { region_id })?;
Self::decode(bs.as_slice())
}

View File

@@ -171,9 +171,10 @@ impl MetricEngineInner {
// check if the logical region already exist
if self
.metadata_region
.is_logical_region_exists(metadata_region_id, logical_region_id)
.await?
.state
.read()
.unwrap()
.is_logical_region_exists(logical_region_id)
{
info!("Create a existing logical region {logical_region_id}. Skipped");
return Ok(data_region_id);

View File

@@ -104,7 +104,7 @@ impl MetricEngineInner {
// check if the region exists
let data_region_id = to_data_region_id(physical_region_id);
let state = self.state.read().unwrap();
if !state.is_logical_region_exist(logical_region_id) {
if !state.is_logical_region_exists(logical_region_id) {
error!("Trying to write to an nonexistent region {logical_region_id}");
return LogicalRegionNotFoundSnafu {
region_id: logical_region_id,

View File

@@ -149,7 +149,7 @@ impl MetricEngineState {
Ok(exist)
}
pub fn is_logical_region_exist(&self, logical_region_id: RegionId) -> bool {
pub fn is_logical_region_exists(&self, logical_region_id: RegionId) -> bool {
self.logical_regions().contains_key(&logical_region_id)
}
}
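
Together with the two engine hunks above, this is the `perf:` commit in the compare: whether a logical region exists is now answered from the engine's in-memory state behind an `RwLock`, instead of the async metadata-region lookup removed in the next hunk. A stripped-down sketch of that fast path; the types below are stand-ins for illustration, not the engine's real definitions:

    use std::collections::HashMap;
    use std::sync::RwLock;

    // Stand-ins for the metric engine's state and region id types.
    type RegionId = u64;
    struct MetricEngineState {
        logical_regions: HashMap<RegionId, ()>,
    }

    impl MetricEngineState {
        fn is_logical_region_exists(&self, logical_region_id: RegionId) -> bool {
            self.logical_regions.contains_key(&logical_region_id)
        }
    }

    // The create/put paths now take a short read lock instead of awaiting a
    // metadata-region read, which removes a storage round trip per request.
    fn already_exists(state: &RwLock<MetricEngineState>, id: RegionId) -> bool {
        state.read().unwrap().is_logical_region_exists(id)
    }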

View File

@@ -139,17 +139,6 @@ impl MetadataRegion {
Ok(())
}
/// Check if the given logical region exists.
pub async fn is_logical_region_exists(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) -> Result<bool> {
let region_id = utils::to_metadata_region_id(physical_region_id);
let region_key = Self::concat_region_key(logical_region_id);
self.exists(region_id, &region_key).await
}
/// Check if the given column exists. Return the semantic type if exists.
pub async fn column_semantic_type(
&self,
@@ -669,10 +658,6 @@ mod test {
.add_logical_region(physical_region_id, logical_region_id)
.await
.unwrap();
assert!(metadata_region
.is_logical_region_exists(physical_region_id, logical_region_id)
.await
.unwrap());
// add it again
assert!(metadata_region

View File

@@ -112,10 +112,6 @@ impl FileCache {
self.memory_index.insert(key, value).await;
}
pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
self.memory_index.get(&key).await
}
/// Reads a file from the cache.
pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
// We must use `get()` to update the estimator of the cache.
@@ -376,6 +372,7 @@ fn parse_index_key(name: &str) -> Option<IndexKey> {
#[cfg(test)]
mod tests {
use common_test_util::temp_dir::create_temp_dir;
use futures::AsyncReadExt;
use object_store::services::Fs;
use super::*;
@@ -454,9 +451,10 @@ mod tests {
.await;
// Read file content.
let reader = cache.reader(key).await.unwrap();
let buf = reader.read(..).await.unwrap().to_vec();
assert_eq!("hello", String::from_utf8(buf).unwrap());
let mut reader = cache.reader(key).await.unwrap();
let mut buf = String::new();
reader.read_to_string(&mut buf).await.unwrap();
assert_eq!("hello", buf);
// Get weighted size.
cache.memory_index.run_pending_tasks().await;
@@ -551,9 +549,10 @@ mod tests {
for (i, file_id) in file_ids.iter().enumerate() {
let key = IndexKey::new(region_id, *file_id, file_type);
let reader = cache.reader(key).await.unwrap();
let buf = reader.read(..).await.unwrap().to_vec();
assert_eq!(i.to_string(), String::from_utf8(buf).unwrap());
let mut reader = cache.reader(key).await.unwrap();
let mut buf = String::new();
reader.read_to_string(&mut buf).await.unwrap();
assert_eq!(i.to_string(), buf);
}
}

View File

@@ -19,7 +19,6 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use futures::AsyncWriteExt;
use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore;
use snafu::ResultExt;
@@ -176,27 +175,19 @@ impl WriteCache {
}])
.start_timer();
let cached_value = self
.file_cache
.local_store()
.stat(&cache_path)
.await
.context(error::OpenDalSnafu)?;
let reader = self
.file_cache
.local_store()
.reader(&cache_path)
.await
.context(error::OpenDalSnafu)?
.into_futures_async_read(0..cached_value.content_length());
.context(error::OpenDalSnafu)?;
let mut writer = remote_store
.writer_with(upload_path)
.chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
.buffer(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
.concurrent(DEFAULT_WRITE_CONCURRENCY)
.await
.context(error::OpenDalSnafu)?
.into_futures_async_write();
.context(error::OpenDalSnafu)?;
let bytes_written =
futures::io::copy(reader, &mut writer)
@@ -208,11 +199,7 @@ impl WriteCache {
})?;
// Must close to upload all data.
writer.close().await.context(error::UploadSnafu {
region_id,
file_id,
file_type,
})?;
writer.close().await.context(error::OpenDalSnafu)?;
UPLOAD_BYTES_TOTAL.inc_by(bytes_written);
@@ -328,7 +315,7 @@ mod tests {
.read(&write_cache.file_cache.cache_file_path(key))
.await
.unwrap();
assert_eq!(remote_data.to_vec(), cache_data.to_vec());
assert_eq!(remote_data, cache_data);
// Check write cache contains the index key
let index_key = IndexKey::new(region_id, file_id, FileType::Puffin);
@@ -339,7 +326,7 @@ mod tests {
.read(&write_cache.file_cache.cache_file_path(index_key))
.await
.unwrap();
assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());
assert_eq!(remote_index_data, cache_index_data);
}
#[tokio::test]
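
The WriteCache hunk above settles on the 0.45-style upload path: a plain `reader()` over the local cache file, a `writer_with()` configured with `.buffer()` (the option opendal 0.46 renamed to `.chunk()`) and `.concurrent()`, then `futures::io::copy` and an explicit `close()` to commit the upload. A condensed sketch with hypothetical sizes and error mapping:

    use object_store::{Error, ErrorKind, ObjectStore};

    async fn upload(local: &ObjectStore, remote: &ObjectStore, from: &str, to: &str) -> object_store::Result<u64> {
        let reader = local.reader(from).await?;
        let mut writer = remote
            .writer_with(to)
            .buffer(8 * 1024 * 1024) // hypothetical 8 MiB write buffer
            .concurrent(8)           // hypothetical upload concurrency
            .await?;
        let copied = futures::io::copy(reader, &mut writer)
            .await
            .map_err(|e| Error::new(ErrorKind::Unexpected, "failed to copy cache file").set_source(e))?;
        // Must close the writer so the (possibly multipart) upload is committed.
        writer.close().await?;
        Ok(copied)
    }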

View File

@@ -497,7 +497,7 @@ impl ManifestObjectStore {
}
};
let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data.to_vec())?;
let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?;
debug!(
"Load checkpoint in path: {}, metadata: {:?}",
@@ -509,11 +509,7 @@ impl ManifestObjectStore {
#[cfg(test)]
pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
self.object_store
.read(path)
.await
.context(OpenDalSnafu)
.map(|v| v.to_vec())
self.object_store.read(path).await.context(OpenDalSnafu)
}
#[cfg(test)]

View File

@@ -121,17 +121,9 @@ impl SstIndexApplier {
return Ok(None);
};
let Some(indexed_value) = file_cache
.get(IndexKey::new(self.region_id, file_id, FileType::Puffin))
.await
else {
return Ok(None);
};
Ok(file_cache
.reader(IndexKey::new(self.region_id, file_id, FileType::Puffin))
.await
.map(|v| v.into_futures_async_read(0..indexed_value.file_size as u64))
.map(PuffinFileReader::new))
}
@@ -198,13 +190,7 @@ mod tests {
let region_dir = "region_dir".to_string();
let path = location::index_file_path(&region_dir, file_id);
let mut puffin_writer = PuffinFileWriter::new(
object_store
.writer(&path)
.await
.unwrap()
.into_futures_async_write(),
);
let mut puffin_writer = PuffinFileWriter::new(object_store.writer(&path).await.unwrap());
puffin_writer
.add_blob(Blob {
blob_type: INDEX_BLOB_TYPE.to_string(),
@@ -250,13 +236,7 @@ mod tests {
let region_dir = "region_dir".to_string();
let path = location::index_file_path(&region_dir, file_id);
let mut puffin_writer = PuffinFileWriter::new(
object_store
.writer(&path)
.await
.unwrap()
.into_futures_async_write(),
);
let mut puffin_writer = PuffinFileWriter::new(object_store.writer(&path).await.unwrap());
puffin_writer
.add_blob(Blob {
blob_type: "invalid_blob_type".to_string(),

View File

@@ -26,8 +26,6 @@ use crate::error::{OpenDalSnafu, Result};
/// A wrapper around [`ObjectStore`] that adds instrumentation for monitoring
/// metrics such as bytes read, bytes written, and the number of seek operations.
///
/// TODO: Consider refactor InstrumentedStore to use async in trait instead of AsyncRead.
#[derive(Clone)]
pub(crate) struct InstrumentedStore {
/// The underlying object store.
@@ -60,14 +58,8 @@ impl InstrumentedStore {
read_byte_count: &'a IntCounter,
read_count: &'a IntCounter,
seek_count: &'a IntCounter,
) -> Result<InstrumentedAsyncRead<'a, object_store::FuturesAsyncReader>> {
let meta = self.object_store.stat(path).await.context(OpenDalSnafu)?;
let reader = self
.object_store
.reader(path)
.await
.context(OpenDalSnafu)?
.into_futures_async_read(0..meta.content_length());
) -> Result<InstrumentedAsyncRead<'a, object_store::Reader>> {
let reader = self.object_store.reader(path).await.context(OpenDalSnafu)?;
Ok(InstrumentedAsyncRead::new(
reader,
read_byte_count,
@@ -85,21 +77,15 @@ impl InstrumentedStore {
write_byte_count: &'a IntCounter,
write_count: &'a IntCounter,
flush_count: &'a IntCounter,
) -> Result<InstrumentedAsyncWrite<'a, object_store::FuturesAsyncWriter>> {
) -> Result<InstrumentedAsyncWrite<'a, object_store::Writer>> {
let writer = match self.write_buffer_size {
Some(size) => self
.object_store
.writer_with(path)
.chunk(size)
.buffer(size)
.await
.context(OpenDalSnafu)?
.into_futures_async_write(),
None => self
.object_store
.writer(path)
.await
.context(OpenDalSnafu)?
.into_futures_async_write(),
.context(OpenDalSnafu)?,
None => self.object_store.writer(path).await.context(OpenDalSnafu)?,
};
Ok(InstrumentedAsyncWrite::new(
writer,

View File

@@ -121,7 +121,7 @@ async fn fetch_ranges_seq(
.read_with(&file_path)
.range(range.start..range.end)
.call()?;
Ok::<_, object_store::Error>(data.to_bytes())
Ok::<_, object_store::Error>(Bytes::from(data))
})
.collect::<object_store::Result<Vec<_>>>()
};
@@ -141,7 +141,7 @@ async fn fetch_ranges_concurrent(
let future_read = object_store.read_with(file_path);
handles.push(async move {
let data = future_read.range(range.start..range.end).await?;
Ok::<_, object_store::Error>(data.to_bytes())
Ok::<_, object_store::Error>(Bytes::from(data))
});
}
let results = futures::future::try_join_all(handles).await?;
@@ -164,7 +164,7 @@ where
}
}
// https://github.com/apache/opendal/blob/v0.46.0/core/src/raw/tokio_util.rs#L21-L24
// https://github.com/apache/incubator-opendal/blob/7144ab1ca2409dff0c324bfed062ce985997f8ce/core/src/raw/tokio_util.rs#L21-L23
/// Parse tokio error into opendal::Error.
fn new_task_join_error(e: tokio::task::JoinError) -> object_store::Error {
object_store::Error::new(ErrorKind::Unexpected, "tokio task join failed").set_source(e)

View File

@@ -85,8 +85,7 @@ impl<'a> MetadataLoader<'a> {
.read_with(path)
.range(buffer_start..file_size)
.await
.context(error::OpenDalSnafu)?
.to_vec();
.context(error::OpenDalSnafu)?;
let buffer_len = buffer.len();
let mut footer = [0; 8];
@@ -130,8 +129,7 @@ impl<'a> MetadataLoader<'a> {
.read_with(path)
.range(metadata_start..(file_size - FOOTER_SIZE as u64))
.await
.context(error::OpenDalSnafu)?
.to_vec();
.context(error::OpenDalSnafu)?;
let metadata = decode_metadata(&data).map_err(|e| {
error::InvalidParquetSnafu {

View File

@@ -16,7 +16,6 @@
use std::time::Duration;
use bytes::Bytes;
use common_telemetry::{error, info, warn};
use futures::TryStreamExt;
use object_store::util::join_path;
@@ -51,7 +50,7 @@ impl<S> RegionWorkerLoop<S> {
region
.access_layer
.object_store()
.write(&marker_path, Bytes::new())
.write(&marker_path, vec![])
.await
.context(OpenDalSnafu)
.inspect_err(|e| {

View File

@@ -11,21 +11,23 @@ workspace = true
services-memory = ["opendal/services-memory"]
[dependencies]
async-trait = "0.1"
bytes.workspace = true
common-telemetry.workspace = true
futures.workspace = true
lazy_static.workspace = true
md5 = "0.7"
moka = { workspace = true, features = ["future"] }
opendal = { version = "0.46", features = [
opendal = { version = "0.45", features = [
"layers-tracing",
"rustls",
"services-azblob",
"services-fs",
"services-gcs",
"services-http",
"services-oss",
"services-s3",
] }
], default-features = false }
prometheus.workspace = true
uuid.workspace = true
@@ -33,4 +35,5 @@ uuid.workspace = true
anyhow = "1.0"
common-telemetry.workspace = true
common-test-util.workspace = true
opendal = { version = "0.45", features = ["services-memory"] }
tokio.workspace = true

View File

@@ -14,26 +14,27 @@
use std::sync::Arc;
use opendal::raw::oio::ReadDyn;
use async_trait::async_trait;
use opendal::raw::oio::Read;
use opendal::raw::{
Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
Accessor, Layer, LayeredAccessor, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
RpWrite,
};
use opendal::{Operator, Result};
use opendal::Result;
mod read_cache;
use common_telemetry::info;
use read_cache::ReadCache;
/// An opendal layer with local LRU file cache supporting.
#[derive(Clone)]
pub struct LruCacheLayer {
pub struct LruCacheLayer<C: Clone> {
// The read cache
read_cache: ReadCache,
read_cache: ReadCache<C>,
}
impl LruCacheLayer {
impl<C: Accessor + Clone> LruCacheLayer<C> {
/// Create a `[LruCacheLayer]` with local file cache and capacity in bytes.
pub async fn new(file_cache: Operator, capacity: usize) -> Result<Self> {
pub async fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
let read_cache = ReadCache::new(file_cache, capacity);
let (entries, bytes) = read_cache.recover_cache().await?;
@@ -56,11 +57,11 @@ impl LruCacheLayer {
}
}
impl<I: Access> Layer<I> for LruCacheLayer {
type LayeredAccess = LruCacheAccess<I>;
impl<I: Accessor, C: Accessor + Clone> Layer<I> for LruCacheLayer<C> {
type LayeredAccessor = LruCacheAccessor<I, C>;
fn layer(&self, inner: I) -> Self::LayeredAccess {
LruCacheAccess {
fn layer(&self, inner: I) -> Self::LayeredAccessor {
LruCacheAccessor {
inner,
read_cache: self.read_cache.clone(),
}
@@ -68,14 +69,15 @@ impl<I: Access> Layer<I> for LruCacheLayer {
}
#[derive(Debug)]
pub struct LruCacheAccess<I> {
pub struct LruCacheAccessor<I, C: Clone> {
inner: I,
read_cache: ReadCache,
read_cache: ReadCache<C>,
}
impl<I: Access> LayeredAccess for LruCacheAccess<I> {
#[async_trait]
impl<I: Accessor, C: Accessor + Clone> LayeredAccessor for LruCacheAccessor<I, C> {
type Inner = I;
type Reader = Arc<dyn ReadDyn>;
type Reader = Box<dyn Read>;
type BlockingReader = I::BlockingReader;
type Writer = I::Writer;
type BlockingWriter = I::BlockingWriter;

View File

@@ -15,12 +15,12 @@
use std::sync::Arc;
use common_telemetry::debug;
use futures::{FutureExt, StreamExt};
use futures::FutureExt;
use moka::future::Cache;
use moka::notification::ListenerFuture;
use opendal::raw::oio::{Read, ReadDyn, Reader};
use opendal::raw::{Access, BytesRange, OpRead, OpStat, RpRead};
use opendal::{Buffer, Error as OpendalError, ErrorKind, Operator, Result};
use opendal::raw::oio::{ListExt, Read, ReadExt, Reader, WriteExt};
use opendal::raw::{Accessor, OpDelete, OpList, OpRead, OpStat, OpWrite, RpRead};
use opendal::{Error as OpendalError, ErrorKind, Result};
use crate::metrics::{
OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT,
@@ -52,22 +52,26 @@ fn can_cache(path: &str) -> bool {
}
/// Generate an unique cache key for the read path and range.
fn read_cache_key(path: &str, range: BytesRange) -> String {
format!("{:x}.cache-{}", md5::compute(path), range.to_header())
fn read_cache_key(path: &str, args: &OpRead) -> String {
format!(
"{:x}.cache-{}",
md5::compute(path),
args.range().to_header()
)
}
/// Local read cache for files in object storage
#[derive(Clone, Debug)]
pub(crate) struct ReadCache {
pub(crate) struct ReadCache<C: Clone> {
/// Local file cache backend
file_cache: Operator,
file_cache: Arc<C>,
/// Local memory cache to track local cache files
mem_cache: Cache<String, ReadResult>,
}
impl ReadCache {
impl<C: Accessor + Clone> ReadCache<C> {
/// Create a [`ReadCache`] with capacity in bytes.
pub(crate) fn new(file_cache: Operator, capacity: usize) -> Self {
pub(crate) fn new(file_cache: Arc<C>, capacity: usize) -> Self {
let file_cache_cloned = file_cache.clone();
let eviction_listener =
move |read_key: Arc<String>, read_result: ReadResult, cause| -> ListenerFuture {
@@ -79,7 +83,7 @@ impl ReadCache {
if let ReadResult::Success(size) = read_result {
OBJECT_STORE_LRU_CACHE_BYTES.sub(size as i64);
let result = file_cache_cloned.delete(&read_key).await;
let result = file_cache_cloned.delete(&read_key, OpDelete::new()).await;
debug!(
"Deleted local cache file `{}`, result: {:?}, cause: {:?}.",
read_key, result, cause
@@ -129,17 +133,17 @@ impl ReadCache {
/// Recover existing cache items from `file_cache` to `mem_cache`.
/// Return entry count and total approximate entry size in bytes.
pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
let mut pager = self.file_cache.lister("/").await?;
let (_, mut pager) = self.file_cache.list("/", OpList::default()).await?;
while let Some(entry) = pager.next().await.transpose()? {
while let Some(entry) = pager.next().await? {
let read_key = entry.path();
// We can't retrieve the metadata from `[opendal::raw::oio::Entry]` directly,
// because it's private field.
let size = {
let stat = self.file_cache.stat(read_key).await?;
let stat = self.file_cache.stat(read_key, OpStat::default()).await?;
stat.content_length()
stat.into_metadata().content_length()
};
OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
@@ -155,7 +159,8 @@ impl ReadCache {
/// Returns true when the read cache contains the specific file.
pub(crate) async fn contains_file(&self, path: &str) -> bool {
self.mem_cache.run_pending_tasks().await;
self.mem_cache.contains_key(path) && self.file_cache.stat(path).await.is_ok()
self.mem_cache.contains_key(path)
&& self.file_cache.stat(path, OpStat::default()).await.is_ok()
}
/// Read from a specific path using the OpRead operation.
@@ -168,54 +173,86 @@ impl ReadCache {
inner: &I,
path: &str,
args: OpRead,
) -> Result<(RpRead, Arc<dyn ReadDyn>)>
) -> Result<(RpRead, Box<dyn Read>)>
where
I: Access,
I: Accessor,
{
if !can_cache(path) {
return inner.read(path, args).await.map(to_output_reader);
}
// FIXME: remove this block after opendal v0.47 released.
let meta = inner.stat(path, OpStat::new()).await?;
let (rp, reader) = inner.read(path, args).await?;
let reader: ReadCacheReader<I> = ReadCacheReader {
path: Arc::new(path.to_string()),
inner_reader: reader,
size: meta.into_metadata().content_length(),
file_cache: self.file_cache.clone(),
mem_cache: self.mem_cache.clone(),
};
Ok((rp, Arc::new(reader)))
let read_key = read_cache_key(path, &args);
let read_result = self
.mem_cache
.try_get_with(
read_key.clone(),
self.read_remote(inner, &read_key, path, args.clone()),
)
.await
.map_err(|e| OpendalError::new(e.kind(), &e.to_string()))?;
match read_result {
ReadResult::Success(_) => {
// There is a concurrent issue here, the local cache may be purged
// while reading, we have to fallback to remote read
match self.file_cache.read(&read_key, OpRead::default()).await {
Ok(ret) => {
OBJECT_STORE_LRU_CACHE_HIT
.with_label_values(&["success"])
.inc();
Ok(to_output_reader(ret))
}
Err(_) => {
OBJECT_STORE_LRU_CACHE_MISS.inc();
inner.read(path, args).await.map(to_output_reader)
}
}
}
ReadResult::NotFound => {
OBJECT_STORE_LRU_CACHE_HIT
.with_label_values(&["not_found"])
.inc();
Err(OpendalError::new(
ErrorKind::NotFound,
&format!("File not found: {path}"),
))
}
}
}
pub struct ReadCacheReader<I: Access> {
/// Path of the file
path: Arc<String>,
/// Remote file reader.
inner_reader: I::Reader,
/// FIXME: remove this field after opendal v0.47 released.
///
/// OpenDAL's read_at takes `offset, limit` which means the underlying storage
/// services could return less data than limit. We store size here as a workaround.
///
/// This API has been refactor into `offset, size` instead. After opendal v0.47 released,
/// we don't need this anymore.
size: u64,
/// Local file cache backend
file_cache: Operator,
/// Local memory cache to track local cache files
mem_cache: Cache<String, ReadResult>,
async fn try_write_cache<I>(&self, mut reader: I::Reader, read_key: &str) -> Result<usize>
where
I: Accessor,
{
let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?;
let mut total = 0;
while let Some(bytes) = reader.next().await {
let bytes = &bytes?;
total += bytes.len();
writer.write(bytes).await?;
}
// Call `close` to ensure data is written.
writer.close().await?;
Ok(total)
}
impl<I: Access> ReadCacheReader<I> {
/// TODO: we can return the Buffer directly to avoid another read from cache.
async fn read_remote(&self, offset: u64, limit: usize) -> Result<ReadResult> {
/// Read the file from remote storage. If success, write the content into local cache.
async fn read_remote<I>(
&self,
inner: &I,
read_key: &str,
path: &str,
args: OpRead,
) -> Result<ReadResult>
where
I: Accessor,
{
OBJECT_STORE_LRU_CACHE_MISS.inc();
let buf = self.inner_reader.read_at(offset, limit).await?;
let result = self.try_write_cache(buf, offset).await;
let (_, reader) = inner.read(path, args).await?;
let result = self.try_write_cache::<I>(reader, read_key).await;
match result {
Ok(read_bytes) => {
@@ -242,59 +279,10 @@ impl<I: Access> ReadCacheReader<I> {
}
}
}
async fn try_write_cache(&self, buf: Buffer, offset: u64) -> Result<usize> {
let size = buf.len();
let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _)));
self.file_cache.write(&read_key, buf).await?;
Ok(size)
}
}
impl<I: Access> Read for ReadCacheReader<I> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
let size = self.size.min(offset + limit as u64) - offset;
let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _)));
let read_result = self
.mem_cache
.try_get_with(read_key.clone(), self.read_remote(offset, limit))
.await
.map_err(|e| OpendalError::new(e.kind(), &e.to_string()))?;
match read_result {
ReadResult::Success(_) => {
// There is a concurrent issue here, the local cache may be purged
// while reading, we have to fallback to remote read
match self.file_cache.read(&read_key).await {
Ok(ret) => {
OBJECT_STORE_LRU_CACHE_HIT
.with_label_values(&["success"])
.inc();
Ok(ret)
}
Err(_) => {
OBJECT_STORE_LRU_CACHE_MISS.inc();
self.inner_reader.read_at(offset, limit).await
}
}
}
ReadResult::NotFound => {
OBJECT_STORE_LRU_CACHE_HIT
.with_label_values(&["not_found"])
.inc();
Err(OpendalError::new(
ErrorKind::NotFound,
&format!("File not found: {}", self.path),
))
}
}
}
}
fn to_output_reader<R: Read + 'static>(input: (RpRead, R)) -> (RpRead, Reader) {
(input.0, Arc::new(input.1))
(input.0, Box::new(input.1))
}
#[cfg(test)]
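
The ReadCache rewrite above goes back to the 0.45 accessor-level implementation, but the caching strategy is the same on both sides of the revert: a moka future cache keyed by `{md5(path)}.cache-{range}` deduplicates concurrent fetches with `try_get_with`, and the cached value only records whether a local cache file was produced (`Success(size)`) or the remote object was missing (`NotFound`). A stripped-down sketch of that read-through pattern; the function names and the fake fetch are illustrative, not the crate's API:

    use std::sync::Arc;

    use moka::future::Cache;

    #[derive(Clone)]
    enum ReadResult {
        Success(usize), // a local cache file was written, with its size in bytes
        NotFound,       // remote object missing; cached to avoid repeated lookups
    }

    // Placeholder for ReadCache::read_remote: fetch from the remote backend,
    // spill the bytes into the local file cache, and report the outcome.
    async fn fetch_and_cache_locally(_key: String) -> Result<ReadResult, std::io::Error> {
        Ok(ReadResult::Success(0))
    }

    async fn read_through(
        mem_cache: &Cache<String, ReadResult>,
        key: String,
    ) -> Result<ReadResult, Arc<std::io::Error>> {
        // Concurrent readers of the same key await a single fetch; the result
        // stays memoized until the LRU evicts it (which also deletes the file).
        let fetch = fetch_and_cache_locally(key.clone());
        mem_cache.try_get_with(key, fetch).await
    }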

View File

@@ -15,11 +15,16 @@
//! code originally from <https://github.com/apache/incubator-opendal/blob/main/core/src/layers/prometheus.rs>, make a tiny change to avoid crash in multi thread env
use std::fmt::{Debug, Formatter};
use std::io;
use std::task::{Context, Poll};
use async_trait::async_trait;
use bytes::Bytes;
use common_telemetry::debug;
use futures::FutureExt;
use lazy_static::lazy_static;
use opendal::raw::*;
use opendal::{Buffer, ErrorKind};
use opendal::ErrorKind;
use prometheus::{
exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter_vec,
Histogram, HistogramTimer, HistogramVec, IntCounterVec,
@@ -84,14 +89,14 @@ fn increment_errors_total(op: Operation, kind: ErrorKind) {
#[derive(Default, Debug, Clone)]
pub struct PrometheusMetricsLayer;
impl<A: Access> Layer<A> for PrometheusMetricsLayer {
type LayeredAccess = PrometheusAccess<A>;
impl<A: Accessor> Layer<A> for PrometheusMetricsLayer {
type LayeredAccessor = PrometheusAccessor<A>;
fn layer(&self, inner: A) -> Self::LayeredAccess {
fn layer(&self, inner: A) -> Self::LayeredAccessor {
let meta = inner.info();
let scheme = meta.scheme();
PrometheusAccess {
PrometheusAccessor {
inner,
scheme: scheme.to_string(),
}
@@ -99,12 +104,12 @@ impl<A: Access> Layer<A> for PrometheusMetricsLayer {
}
#[derive(Clone)]
pub struct PrometheusAccess<A: Access> {
pub struct PrometheusAccessor<A: Accessor> {
inner: A,
scheme: String,
}
impl<A: Access> Debug for PrometheusAccess<A> {
impl<A: Accessor> Debug for PrometheusAccessor<A> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PrometheusAccessor")
.field("inner", &self.inner)
@@ -112,7 +117,8 @@ impl<A: Access> Debug for PrometheusAccess<A> {
}
}
impl<A: Access> LayeredAccess for PrometheusAccess<A> {
#[async_trait]
impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> {
type Inner = A;
type Reader = PrometheusMetricWrapper<A::Reader>;
type BlockingReader = PrometheusMetricWrapper<A::BlockingReader>;
@@ -151,20 +157,27 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
.with_label_values(&[&self.scheme, Operation::Read.into_static()])
.start_timer();
let (rp, r) = self.inner.read(path, args).await.map_err(|e| {
increment_errors_total(Operation::Read, e.kind());
e
})?;
Ok((
self.inner
.read(path, args)
.map(|v| {
v.map(|(rp, r)| {
(
rp,
PrometheusMetricWrapper::new(
r,
Operation::Read,
BYTES_TOTAL.with_label_values(&[&self.scheme, Operation::Read.into_static()]),
BYTES_TOTAL
.with_label_values(&[&self.scheme, Operation::Read.into_static()]),
timer,
),
))
)
})
})
.await
.map_err(|e| {
increment_errors_total(Operation::Read, e.kind());
e
})
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
@@ -176,20 +189,27 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
.with_label_values(&[&self.scheme, Operation::Write.into_static()])
.start_timer();
let (rp, r) = self.inner.write(path, args).await.map_err(|e| {
increment_errors_total(Operation::Write, e.kind());
e
})?;
Ok((
self.inner
.write(path, args)
.map(|v| {
v.map(|(rp, r)| {
(
rp,
PrometheusMetricWrapper::new(
r,
Operation::Write,
BYTES_TOTAL.with_label_values(&[&self.scheme, Operation::Write.into_static()]),
BYTES_TOTAL
.with_label_values(&[&self.scheme, Operation::Write.into_static()]),
timer,
),
))
)
})
})
.await
.map_err(|e| {
increment_errors_total(Operation::Write, e.kind());
e
})
}
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
@@ -441,46 +461,103 @@ impl<R> PrometheusMetricWrapper<R> {
}
impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
self.inner.read_at(offset, limit).await.map_err(|err| {
increment_errors_total(self.op, err.kind());
err
fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
self.inner.poll_read(cx, buf).map(|res| match res {
Ok(bytes) => {
self.bytes += bytes as u64;
Ok(bytes)
}
Err(e) => {
increment_errors_total(self.op, e.kind());
Err(e)
}
})
}
fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll<Result<u64>> {
self.inner.poll_seek(cx, pos).map(|res| match res {
Ok(n) => Ok(n),
Err(e) => {
increment_errors_total(self.op, e.kind());
Err(e)
}
})
}
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
self.inner.poll_next(cx).map(|res| match res {
Some(Ok(bytes)) => {
self.bytes += bytes.len() as u64;
Some(Ok(bytes))
}
Some(Err(e)) => {
increment_errors_total(self.op, e.kind());
Some(Err(e))
}
None => None,
})
}
}
impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
fn read_at(&self, offset: u64, limit: usize) -> opendal::Result<Buffer> {
self.inner.read_at(offset, limit).map_err(|err| {
increment_errors_total(self.op, err.kind());
err
})
}
}
impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
match self.inner.write(bs).await {
Ok(n) => {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
self.inner
.read(buf)
.map(|n| {
self.bytes += n as u64;
Ok(n)
}
Err(err) => {
increment_errors_total(self.op, err.kind());
Err(err)
}
}
n
})
.map_err(|e| {
increment_errors_total(self.op, e.kind());
e
})
}
async fn close(&mut self) -> Result<()> {
self.inner.close().await.map_err(|err| {
fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
self.inner.seek(pos).map_err(|err| {
increment_errors_total(self.op, err.kind());
err
})
}
async fn abort(&mut self) -> Result<()> {
self.inner.close().await.map_err(|err| {
fn next(&mut self) -> Option<Result<Bytes>> {
self.inner.next().map(|res| match res {
Ok(bytes) => {
self.bytes += bytes.len() as u64;
Ok(bytes)
}
Err(e) => {
increment_errors_total(self.op, e.kind());
Err(e)
}
})
}
}
#[async_trait]
impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
self.inner
.poll_write(cx, bs)
.map_ok(|n| {
self.bytes += n as u64;
n
})
.map_err(|err| {
increment_errors_total(self.op, err.kind());
err
})
}
fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.inner.poll_abort(cx).map_err(|err| {
increment_errors_total(self.op, err.kind());
err
})
}
fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.inner.poll_close(cx).map_err(|err| {
increment_errors_total(self.op, err.kind());
err
})
@@ -488,7 +565,7 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
self.inner
.write(bs)
.map(|n| {

View File

@@ -14,9 +14,8 @@
pub use opendal::raw::{normalize_path as raw_normalize_path, HttpClient};
pub use opendal::{
services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind,
FuturesAsyncReader, FuturesAsyncWriter, Lister, Metakey, Operator as ObjectStore, Reader,
Result, Writer,
services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Lister, Metakey,
Operator as ObjectStore, Reader, Result, Writer,
};
pub mod layers;

View File

@@ -22,6 +22,7 @@ use object_store::layers::LruCacheLayer;
use object_store::services::{Fs, S3};
use object_store::test_util::TempFolder;
use object_store::{ObjectStore, ObjectStoreBuilder};
use opendal::raw::Accessor;
use opendal::services::{Azblob, Gcs, Oss};
use opendal::{EntryMode, Operator, OperatorBuilder};
@@ -35,11 +36,11 @@ async fn test_object_crud(store: &ObjectStore) -> Result<()> {
// Read data from object;
let bs = store.read(file_name).await?;
assert_eq!("Hello, World!", String::from_utf8(bs.to_vec())?);
assert_eq!("Hello, World!", String::from_utf8(bs)?);
// Read range from object;
let bs = store.read_with(file_name).range(1..=11).await?;
assert_eq!("ello, World", String::from_utf8(bs.to_vec())?);
assert_eq!("ello, World", String::from_utf8(bs)?);
// Get object's Metadata
let meta = store.stat(file_name).await?;
@@ -76,7 +77,7 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> {
assert_eq!(p2, entries.first().unwrap().path());
let content = store.read(p2).await?;
assert_eq!("Hello, object2!", String::from_utf8(content.to_vec())?);
assert_eq!("Hello, object2!", String::from_utf8(content)?);
store.delete(p2).await?;
let entries = store.list("/").await?;
@@ -235,9 +236,11 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
let _ = builder
.root(&cache_dir.path().to_string_lossy())
.atomic_write_dir(&cache_dir.path().to_string_lossy());
let file_cache = Operator::new(builder).unwrap().finish();
let file_cache = Arc::new(builder.build().unwrap());
LruCacheLayer::new(file_cache, 32).await.unwrap()
LruCacheLayer::new(Arc::new(file_cache.clone()), 32)
.await
.unwrap()
};
let store = store.layer(cache_layer.clone());
@@ -250,7 +253,10 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
Ok(())
}
async fn assert_lru_cache(cache_layer: &LruCacheLayer, file_names: &[&str]) {
async fn assert_lru_cache<C: Accessor + Clone>(
cache_layer: &LruCacheLayer<C>,
file_names: &[&str],
) {
for file_name in file_names {
assert!(cache_layer.contains_file(file_name).await);
}
@@ -272,7 +278,7 @@ async fn assert_cache_files(
let bs = store.read(o.path()).await.unwrap();
assert_eq!(
file_contents[position],
String::from_utf8(bs.to_vec())?,
String::from_utf8(bs.clone())?,
"file content not match: {}",
o.name()
);
@@ -306,7 +312,9 @@ async fn test_object_store_cache_policy() -> Result<()> {
let cache_store = OperatorBuilder::new(file_cache.clone()).finish();
// create operator for cache dir to verify cache file
let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).await.unwrap();
let cache_layer = LruCacheLayer::new(Arc::new(file_cache.clone()), 38)
.await
.unwrap();
let store = store.layer(cache_layer.clone());
// create several object handler.
@@ -378,7 +386,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
// instead of returning `NotFound` during the reader creation.
// The entry count is 4, because we have the p2 `NotFound` cache.
assert!(store.read_with(p2).range(0..4).await.is_err());
assert_eq!(cache_layer.read_cache_stat().await, (3, 35));
assert_eq!(cache_layer.read_cache_stat().await, (4, 35));
assert_cache_files(
&cache_store,
@@ -406,7 +414,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
assert!(store.read(p2).await.is_err());
// Read p1 with range `1..` , the existing p1 with range `0..` must be evicted.
let _ = store.read_with(p1).range(1..15).await.unwrap();
assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
assert_eq!(cache_layer.read_cache_stat().await, (4, 34));
assert_cache_files(
&cache_store,
&[
@@ -434,7 +442,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
drop(cache_layer);
// Test recover
let cache_layer = LruCacheLayer::new(cache_store, 38).await.unwrap();
let cache_layer = LruCacheLayer::new(Arc::new(file_cache), 38).await.unwrap();
// The p2 `NotFound` cache will not be recovered
assert_eq!(cache_layer.read_cache_stat().await, (3, 34));

View File

@@ -56,7 +56,6 @@ store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tonic.workspace = true
[dev-dependencies]

View File

@@ -20,7 +20,7 @@ use client::{Output, OutputData, OutputMeta};
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::{CsvConfigBuilder, CsvFormat, CsvOpener};
use common_datasource::file_format::json::{JsonFormat, JsonOpener};
use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader, ReaderAdapter};
use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader};
use common_datasource::file_format::{FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{build_backend, parse_url};
@@ -46,7 +46,6 @@ use session::context::QueryContextRef;
use snafu::ResultExt;
use table::requests::{CopyTableRequest, InsertRequest};
use table::table_reference::TableReference;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use crate::error::{self, IntoVectorsSnafu, Result};
use crate::statement::StatementExecutor;
@@ -147,16 +146,10 @@ impl StatementExecutor {
path,
}),
Format::Parquet(_) => {
let meta = object_store
.stat(&path)
.await
.context(error::ReadObjectSnafu { path: &path })?;
let mut reader = object_store
.reader(&path)
.await
.context(error::ReadObjectSnafu { path: &path })?
.into_futures_async_read(0..meta.content_length())
.compat();
.context(error::ReadObjectSnafu { path: &path })?;
let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
.await
.context(error::ReadParquetMetadataSnafu)?;
@@ -168,17 +161,12 @@ impl StatementExecutor {
})
}
Format::Orc(_) => {
let meta = object_store
.stat(&path)
.await
.context(error::ReadObjectSnafu { path: &path })?;
let reader = object_store
.reader(&path)
.await
.context(error::ReadObjectSnafu { path: &path })?;
let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length()))
let schema = infer_orc_schema(reader)
.await
.context(error::ReadOrcSnafu)?;
@@ -291,17 +279,11 @@ impl StatementExecutor {
)))
}
FileMetadata::Parquet { metadata, path, .. } => {
let meta = object_store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = object_store
.reader_with(path)
.chunk(DEFAULT_READ_BUFFER)
.buffer(DEFAULT_READ_BUFFER)
.await
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.compat();
.context(error::ReadObjectSnafu { path })?;
let builder =
ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
let stream = builder
@@ -320,18 +302,12 @@ impl StatementExecutor {
)))
}
FileMetadata::Orc { path, .. } => {
let meta = object_store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = object_store
.reader_with(path)
.chunk(DEFAULT_READ_BUFFER)
.buffer(DEFAULT_READ_BUFFER)
.await
.context(error::ReadObjectSnafu { path })?;
let stream =
new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
let stream = new_orc_stream_reader(reader)
.await
.context(error::ReadOrcSnafu)?;

View File

@@ -16,12 +16,14 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use axum::http::HeaderValue;
use common_base::Plugins;
use common_telemetry::metric::{convert_metric_to_write_request, MetricFilter};
use common_telemetry::{error, info};
use common_time::Timestamp;
use hyper::HeaderMap;
use prost::Message;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use reqwest::header::HeaderName;
use serde::{Deserialize, Serialize};
use session::context::QueryContextBuilder;
use snafu::{ensure, ResultExt};
@@ -113,7 +115,7 @@ impl ExportMetricsTask {
}
);
}
let mut headers = HeaderMap::new();
let mut headers = reqwest::header::HeaderMap::new();
if let Some(remote_write) = &config.remote_write {
ensure!(
!remote_write.url.is_empty(),

View File

@@ -14,10 +14,10 @@
use std::collections::HashMap;
use axum::headers::HeaderValue;
use axum::response::{IntoResponse, Response};
use axum::Json;
use common_query::Output;
use reqwest::header::HeaderValue;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;

View File

@@ -32,7 +32,6 @@
use std::convert::TryFrom;
use std::net::{SocketAddr, TcpListener};
use std::str::FromStr;
use axum::body::HttpBody;
use axum::BoxError;
@@ -170,15 +169,7 @@ impl RequestBuilder {
HeaderValue: TryFrom<V>,
<HeaderValue as TryFrom<V>>::Error: Into<http::Error>,
{
// TODO(tisonkun): revert once http bump to 1.x
let key: HeaderName = key.try_into().map_err(Into::into).unwrap();
let key = reqwest::header::HeaderName::from_bytes(key.as_ref()).unwrap();
let value: HeaderValue = value.try_into().map_err(Into::into).unwrap();
let value = reqwest::header::HeaderValue::from_bytes(value.as_bytes()).unwrap();
self.builder = self.builder.header(key, value);
self
}
@@ -219,19 +210,12 @@ impl TestResponse {
/// Get the response status.
pub fn status(&self) -> StatusCode {
StatusCode::from_u16(self.response.status().as_u16()).unwrap()
self.response.status()
}
/// Get the response headers.
pub fn headers(&self) -> http::HeaderMap {
// TODO(tisonkun): revert once http bump to 1.x
let mut headers = http::HeaderMap::new();
for (key, value) in self.response.headers() {
let key = HeaderName::from_str(key.as_str()).unwrap();
let value = HeaderValue::from_bytes(value.as_bytes()).unwrap();
headers.insert(key, value);
}
headers
pub fn headers(&self) -> &http::HeaderMap {
self.response.headers()
}
/// Get the response in chunks.