Compare commits

...

3 Commits

Author: Ruihang Xia | SHA1: 1bfba48755 | Date: 2024-06-03 20:28:59 +08:00
    Revert "build(deps): upgrade opendal to 0.46 (#4037)"
    This reverts commit f9db5ff0d6.

Author: Ruihang Xia | SHA1: 457998f0fe | Date: 2024-05-31 18:16:36 +08:00
    Merge branch 'main' into avoid-query-meta
    Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Author: Ruihang Xia | SHA1: b02c256157 | Date: 2024-05-31 18:16:08 +08:00
    perf: use memory state to check if a logical region exists
    Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
35 changed files with 447 additions and 649 deletions

Cargo.lock generated
View File

@@ -83,7 +83,7 @@ dependencies = [
"axum", "axum",
"bytes", "bytes",
"cfg-if", "cfg-if",
"http 0.2.12", "http",
"indexmap 1.9.3", "indexmap 1.9.3",
"schemars", "schemars",
"serde", "serde",
@@ -764,9 +764,9 @@ dependencies = [
"bytes", "bytes",
"futures-util", "futures-util",
"headers", "headers",
"http 0.2.12", "http",
"http-body 0.4.6", "http-body",
"hyper 0.14.28", "hyper",
"itoa", "itoa",
"matchit", "matchit",
"memchr", "memchr",
@@ -794,8 +794,8 @@ dependencies = [
"async-trait", "async-trait",
"bytes", "bytes",
"futures-util", "futures-util",
"http 0.2.12", "http",
"http-body 0.4.6", "http-body",
"mime", "mime",
"rustversion", "rustversion",
"tower-layer", "tower-layer",
@@ -1854,7 +1854,7 @@ dependencies = [
"common-telemetry", "common-telemetry",
"common-test-util", "common-test-util",
"common-version", "common-version",
"hyper 0.14.28", "hyper",
"reqwest", "reqwest",
"serde", "serde",
"tempfile", "tempfile",
@@ -1963,7 +1963,7 @@ dependencies = [
"futures-util", "futures-util",
"hex", "hex",
"humantime-serde", "humantime-serde",
"hyper 0.14.28", "hyper",
"itertools 0.10.5", "itertools 0.10.5",
"lazy_static", "lazy_static",
"moka", "moka",
@@ -3525,6 +3525,15 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
[[package]]
name = "encoding_rs"
version = "0.8.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59"
dependencies = [
"cfg-if",
]
[[package]] [[package]]
name = "endian-type" name = "endian-type"
version = "0.1.2" version = "0.1.2"
@@ -3595,7 +3604,7 @@ name = "etcd-client"
version = "0.12.4" version = "0.12.4"
source = "git+https://github.com/MichaelScofield/etcd-client.git?rev=4c371e9b3ea8e0a8ee2f9cbd7ded26e54a45df3b#4c371e9b3ea8e0a8ee2f9cbd7ded26e54a45df3b" source = "git+https://github.com/MichaelScofield/etcd-client.git?rev=4c371e9b3ea8e0a8ee2f9cbd7ded26e54a45df3b#4c371e9b3ea8e0a8ee2f9cbd7ded26e54a45df3b"
dependencies = [ dependencies = [
"http 0.2.12", "http",
"prost 0.12.4", "prost 0.12.4",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
@@ -4210,7 +4219,7 @@ dependencies = [
"futures-core", "futures-core",
"futures-sink", "futures-sink",
"futures-util", "futures-util",
"http 0.2.12", "http",
"indexmap 2.2.6", "indexmap 2.2.6",
"slab", "slab",
"tokio", "tokio",
@@ -4294,7 +4303,7 @@ dependencies = [
"base64 0.21.7", "base64 0.21.7",
"bytes", "bytes",
"headers-core", "headers-core",
"http 0.2.12", "http",
"httpdate", "httpdate",
"mime", "mime",
"sha1", "sha1",
@@ -4306,7 +4315,7 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
dependencies = [ dependencies = [
"http 0.2.12", "http",
] ]
[[package]] [[package]]
@@ -4409,17 +4418,6 @@ dependencies = [
"itoa", "itoa",
] ]
[[package]]
name = "http"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]] [[package]]
name = "http-body" name = "http-body"
version = "0.4.6" version = "0.4.6"
@@ -4427,30 +4425,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2"
dependencies = [ dependencies = [
"bytes", "bytes",
"http 0.2.12", "http",
"pin-project-lite",
]
[[package]]
name = "http-body"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643"
dependencies = [
"bytes",
"http 1.1.0",
]
[[package]]
name = "http-body-util"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d"
dependencies = [
"bytes",
"futures-core",
"http 1.1.0",
"http-body 1.0.0",
"pin-project-lite", "pin-project-lite",
] ]
@@ -4607,8 +4582,8 @@ dependencies = [
"futures-core", "futures-core",
"futures-util", "futures-util",
"h2", "h2",
"http 0.2.12", "http",
"http-body 0.4.6", "http-body",
"httparse", "httparse",
"httpdate", "httpdate",
"itoa", "itoa",
@@ -4620,40 +4595,18 @@ dependencies = [
"want", "want",
] ]
[[package]]
name = "hyper"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"httparse",
"itoa",
"pin-project-lite",
"smallvec",
"tokio",
"want",
]
[[package]] [[package]]
name = "hyper-rustls" name = "hyper-rustls"
version = "0.26.0" version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590"
dependencies = [ dependencies = [
"futures-util", "futures-util",
"http 1.1.0", "http",
"hyper 1.3.1", "hyper",
"hyper-util", "rustls 0.21.12",
"rustls 0.22.4",
"rustls-pki-types",
"tokio", "tokio",
"tokio-rustls 0.25.0", "tokio-rustls 0.24.1",
"tower-service",
] ]
[[package]] [[package]]
@@ -4662,32 +4615,12 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
dependencies = [ dependencies = [
"hyper 0.14.28", "hyper",
"pin-project-lite", "pin-project-lite",
"tokio", "tokio",
"tokio-io-timeout", "tokio-io-timeout",
] ]
[[package]]
name = "hyper-util"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"hyper 1.3.1",
"pin-project-lite",
"socket2 0.5.7",
"tokio",
"tower",
"tower-service",
"tracing",
]
[[package]] [[package]]
name = "iana-time-zone" name = "iana-time-zone"
version = "0.1.60" version = "0.1.60"
@@ -5671,7 +5604,7 @@ dependencies = [
"etcd-client", "etcd-client",
"futures", "futures",
"h2", "h2",
"http-body 0.4.6", "http-body",
"humantime", "humantime",
"humantime-serde", "humantime-serde",
"itertools 0.10.5", "itertools 0.10.5",
@@ -6433,6 +6366,7 @@ name = "object-store"
version = "0.8.1" version = "0.8.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait",
"bytes", "bytes",
"common-telemetry", "common-telemetry",
"common-test-util", "common-test-util",
@@ -6481,21 +6415,20 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]] [[package]]
name = "opendal" name = "opendal"
version = "0.46.0" version = "0.45.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "328c4992328e8965e6a6ef102d38438b5fdc7d9b9107eda2377ba05379d9d544" checksum = "52c17c077f23fa2d2c25d9d22af98baa43b8bbe2ef0de80cf66339aa70401467"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
"backon", "backon",
"base64 0.22.1", "base64 0.21.7",
"bytes", "bytes",
"chrono", "chrono",
"crc32c",
"flagset", "flagset",
"futures", "futures",
"getrandom", "getrandom",
"http 1.1.0", "http",
"log", "log",
"md-5", "md-5",
"once_cell", "once_cell",
@@ -6583,7 +6516,7 @@ checksum = "f24cda83b20ed2433c68241f918d0f6fdec8b1d43b7a9590ab4420c5095ca930"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"futures-core", "futures-core",
"http 0.2.12", "http",
"opentelemetry 0.21.0", "opentelemetry 0.21.0",
"opentelemetry-proto 0.4.0", "opentelemetry-proto 0.4.0",
"opentelemetry-semantic-conventions", "opentelemetry-semantic-conventions",
@@ -6720,7 +6653,6 @@ dependencies = [
"substrait 0.8.1", "substrait 0.8.1",
"table", "table",
"tokio", "tokio",
"tokio-util",
"tonic 0.11.0", "tonic 0.11.0",
] ]
@@ -8264,20 +8196,20 @@ dependencies = [
[[package]] [[package]]
name = "reqsign" name = "reqsign"
version = "0.15.0" version = "0.14.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01edce6b6c31a16ebc7525ac58c747a6d78bbce33e76bbebd350d6bc25b23e06" checksum = "43e319d9de9ff4d941abf4ac718897118b0fe04577ea3f8e0f5788971784eef5"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
"base64 0.22.1", "base64 0.21.7",
"chrono", "chrono",
"form_urlencoded", "form_urlencoded",
"getrandom", "getrandom",
"hex", "hex",
"hmac", "hmac",
"home", "home",
"http 1.1.0", "http",
"jsonwebtoken", "jsonwebtoken",
"log", "log",
"once_cell", "once_cell",
@@ -8286,7 +8218,7 @@ dependencies = [
"rand", "rand",
"reqwest", "reqwest",
"rsa 0.9.6", "rsa 0.9.6",
"rust-ini 0.21.0", "rust-ini 0.20.0",
"serde", "serde",
"serde_json", "serde_json",
"sha1", "sha1",
@@ -8295,20 +8227,20 @@ dependencies = [
[[package]] [[package]]
name = "reqwest" name = "reqwest"
version = "0.12.4" version = "0.11.27"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10" checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62"
dependencies = [ dependencies = [
"base64 0.22.1", "base64 0.21.7",
"bytes", "bytes",
"encoding_rs",
"futures-core", "futures-core",
"futures-util", "futures-util",
"http 1.1.0", "h2",
"http-body 1.0.0", "http",
"http-body-util", "http-body",
"hyper 1.3.1", "hyper",
"hyper-rustls", "hyper-rustls",
"hyper-util",
"ipnet", "ipnet",
"js-sys", "js-sys",
"log", "log",
@@ -8317,16 +8249,16 @@ dependencies = [
"once_cell", "once_cell",
"percent-encoding", "percent-encoding",
"pin-project-lite", "pin-project-lite",
"rustls 0.22.4", "rustls 0.21.12",
"rustls-native-certs", "rustls-native-certs",
"rustls-pemfile 2.1.2", "rustls-pemfile 1.0.4",
"rustls-pki-types",
"serde", "serde",
"serde_json", "serde_json",
"serde_urlencoded", "serde_urlencoded",
"sync_wrapper", "sync_wrapper",
"system-configuration",
"tokio", "tokio",
"tokio-rustls 0.25.0", "tokio-rustls 0.24.1",
"tokio-util", "tokio-util",
"tower-service", "tower-service",
"url", "url",
@@ -8334,8 +8266,7 @@ dependencies = [
"wasm-bindgen-futures", "wasm-bindgen-futures",
"wasm-streams", "wasm-streams",
"web-sys", "web-sys",
"webpki-roots 0.26.1", "winreg 0.50.0",
"winreg 0.52.0",
] ]
[[package]] [[package]]
@@ -8601,13 +8532,12 @@ dependencies = [
[[package]] [[package]]
name = "rust-ini" name = "rust-ini"
version = "0.21.0" version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d625ed57d8f49af6cfa514c42e1a71fadcff60eb0b1c517ff82fe41aa025b41" checksum = "3e0698206bcb8882bf2a9ecb4c1e7785db57ff052297085a6efd4fe42302068a"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"ordered-multimap 0.7.3", "ordered-multimap 0.7.3",
"trim-in-place",
] ]
[[package]] [[package]]
@@ -8749,13 +8679,12 @@ dependencies = [
[[package]] [[package]]
name = "rustls-native-certs" name = "rustls-native-certs"
version = "0.7.0" version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00"
dependencies = [ dependencies = [
"openssl-probe", "openssl-probe",
"rustls-pemfile 2.1.2", "rustls-pemfile 1.0.4",
"rustls-pki-types",
"schannel", "schannel",
"security-framework", "security-framework",
] ]
@@ -9588,10 +9517,10 @@ dependencies = [
"hashbrown 0.14.5", "hashbrown 0.14.5",
"headers", "headers",
"hostname", "hostname",
"http 0.2.12", "http",
"http-body 0.4.6", "http-body",
"humantime-serde", "humantime-serde",
"hyper 0.14.28", "hyper",
"influxdb_line_protocol", "influxdb_line_protocol",
"itertools 0.10.5", "itertools 0.10.5",
"lazy_static", "lazy_static",
@@ -11215,9 +11144,9 @@ dependencies = [
"futures-core", "futures-core",
"futures-util", "futures-util",
"h2", "h2",
"http 0.2.12", "http",
"http-body 0.4.6", "http-body",
"hyper 0.14.28", "hyper",
"hyper-timeout", "hyper-timeout",
"percent-encoding", "percent-encoding",
"pin-project", "pin-project",
@@ -11243,9 +11172,9 @@ dependencies = [
"bytes", "bytes",
"flate2", "flate2",
"h2", "h2",
"http 0.2.12", "http",
"http-body 0.4.6", "http-body",
"hyper 0.14.28", "hyper",
"hyper-timeout", "hyper-timeout",
"percent-encoding", "percent-encoding",
"pin-project", "pin-project",
@@ -11347,8 +11276,8 @@ dependencies = [
"bytes", "bytes",
"futures-core", "futures-core",
"futures-util", "futures-util",
"http 0.2.12", "http",
"http-body 0.4.6", "http-body",
"http-range-header", "http-range-header",
"httpdate", "httpdate",
"iri-string", "iri-string",
@@ -11591,12 +11520,6 @@ dependencies = [
"tree-sitter", "tree-sitter",
] ]
[[package]]
name = "trim-in-place"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc"
[[package]] [[package]]
name = "triomphe" name = "triomphe"
version = "0.1.11" version = "0.1.11"
@@ -12262,15 +12185,6 @@ version = "0.25.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1"
[[package]]
name = "webpki-roots"
version = "0.26.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009"
dependencies = [
"rustls-pki-types",
]
[[package]] [[package]]
name = "which" name = "which"
version = "4.4.2" version = "4.4.2"
@@ -12637,9 +12551,9 @@ dependencies = [
[[package]] [[package]]
name = "winreg" name = "winreg"
version = "0.52.0" version = "0.50.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5" checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"windows-sys 0.48.0", "windows-sys 0.48.0",

View File

@@ -146,7 +146,7 @@ raft-engine = { version = "0.4.1", default-features = false }
rand = "0.8" rand = "0.8"
regex = "1.8" regex = "1.8"
regex-automata = { version = "0.4" } regex-automata = { version = "0.4" }
reqwest = { version = "0.12", default-features = false, features = [ reqwest = { version = "0.11", default-features = false, features = [
"json", "json",
"rustls-tls-native-roots", "rustls-tls-native-roots",
"stream", "stream",

View File

@@ -92,44 +92,34 @@ impl CompressionType {
macro_rules! impl_compression_type { macro_rules! impl_compression_type {
($(($enum_item:ident, $prefix:ident)),*) => { ($(($enum_item:ident, $prefix:ident)),*) => {
paste::item! { paste::item! {
use bytes::{Buf, BufMut, BytesMut};
impl CompressionType { impl CompressionType {
pub async fn encode<B: Buf>(&self, mut content: B) -> io::Result<Vec<u8>> { pub async fn encode(&self, content: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
match self { match self {
$( $(
CompressionType::$enum_item => { CompressionType::$enum_item => {
let mut buffer = Vec::with_capacity(content.remaining()); let mut buffer = Vec::with_capacity(content.as_ref().len());
let mut encoder = write::[<$prefix Encoder>]::new(&mut buffer); let mut encoder = write::[<$prefix Encoder>]::new(&mut buffer);
encoder.write_all_buf(&mut content).await?; encoder.write_all(content.as_ref()).await?;
encoder.shutdown().await?; encoder.shutdown().await?;
Ok(buffer) Ok(buffer)
} }
)* )*
CompressionType::Uncompressed => { CompressionType::Uncompressed => Ok(content.as_ref().to_vec()),
let mut bs = BytesMut::with_capacity(content.remaining());
bs.put(content);
Ok(bs.to_vec())
},
} }
} }
pub async fn decode<B: Buf>(&self, mut content: B) -> io::Result<Vec<u8>> { pub async fn decode(&self, content: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
match self { match self {
$( $(
CompressionType::$enum_item => { CompressionType::$enum_item => {
let mut buffer = Vec::with_capacity(content.remaining() * 2); let mut buffer = Vec::with_capacity(content.as_ref().len() * 2);
let mut encoder = write::[<$prefix Decoder>]::new(&mut buffer); let mut encoder = write::[<$prefix Decoder>]::new(&mut buffer);
encoder.write_all_buf(&mut content).await?; encoder.write_all(content.as_ref()).await?;
encoder.shutdown().await?; encoder.shutdown().await?;
Ok(buffer) Ok(buffer)
} }
)* )*
CompressionType::Uncompressed => { CompressionType::Uncompressed => Ok(content.as_ref().to_vec()),
let mut bs = BytesMut::with_capacity(content.remaining());
bs.put(content);
Ok(bs.to_vec())
},
} }
} }
@@ -161,13 +151,13 @@ macro_rules! impl_compression_type {
$( $(
#[tokio::test] #[tokio::test]
async fn [<test_ $enum_item:lower _compression>]() { async fn [<test_ $enum_item:lower _compression>]() {
let string = "foo_bar".as_bytes(); let string = "foo_bar".as_bytes().to_vec();
let compress = CompressionType::$enum_item let compress = CompressionType::$enum_item
.encode(string) .encode(&string)
.await .await
.unwrap(); .unwrap();
let decompress = CompressionType::$enum_item let decompress = CompressionType::$enum_item
.decode(compress.as_slice()) .decode(&compress)
.await .await
.unwrap(); .unwrap();
assert_eq!(decompress, string); assert_eq!(decompress, string);
@@ -175,13 +165,13 @@ macro_rules! impl_compression_type {
#[tokio::test] #[tokio::test]
async fn test_uncompression() { async fn test_uncompression() {
let string = "foo_bar".as_bytes(); let string = "foo_bar".as_bytes().to_vec();
let compress = CompressionType::Uncompressed let compress = CompressionType::Uncompressed
.encode(string) .encode(&string)
.await .await
.unwrap(); .unwrap();
let decompress = CompressionType::Uncompressed let decompress = CompressionType::Uncompressed
.decode(compress.as_slice()) .decode(&compress)
.await .await
.unwrap(); .unwrap();
assert_eq!(decompress, string); assert_eq!(decompress, string);
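
For reference, a minimal standalone sketch of the reverted `impl AsRef<[u8]>` encode path (not the project's macro). It assumes the async-compression crate's tokio writers, which the `write::[<$prefix Encoder>]` expansion above points at, and picks gzip arbitrarily as the concrete codec:

    use async_compression::tokio::write::GzipEncoder;
    use tokio::io::AsyncWriteExt;

    // Compress a byte slice into a Vec<u8>, mirroring the reverted encode() body.
    async fn gzip_encode(content: impl AsRef<[u8]>) -> std::io::Result<Vec<u8>> {
        let mut buffer = Vec::with_capacity(content.as_ref().len());
        let mut encoder = GzipEncoder::new(&mut buffer);
        encoder.write_all(content.as_ref()).await?; // whole slice at once, no Buf cursor needed
        encoder.shutdown().await?;                  // flush the compressed trailer
        Ok(buffer)
    }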

View File

@@ -36,7 +36,6 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt; use futures::StreamExt;
use object_store::ObjectStore; use object_store::ObjectStore;
use snafu::ResultExt; use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use self::csv::CsvFormat; use self::csv::CsvFormat;
use self::json::JsonFormat; use self::json::JsonFormat;
@@ -147,8 +146,7 @@ pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
let reader = object_store let reader = object_store
.reader(&path) .reader(&path)
.await .await
.map_err(|e| DataFusionError::External(Box::new(e)))? .map_err(|e| DataFusionError::External(Box::new(e)))?;
.into_bytes_stream(..);
let mut upstream = compression_type.convert_stream(reader).fuse(); let mut upstream = compression_type.convert_stream(reader).fuse();
@@ -205,7 +203,6 @@ pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
.writer_with(&path) .writer_with(&path)
.concurrent(concurrency) .concurrent(concurrency)
.await .await
.map(|v| v.into_futures_async_write().compat_write())
.context(error::WriteObjectSnafu { path }) .context(error::WriteObjectSnafu { path })
}); });

View File

@@ -29,7 +29,6 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use derive_builder::Builder; use derive_builder::Builder;
use object_store::ObjectStore; use object_store::ObjectStore;
use snafu::ResultExt; use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::SyncIoBridge; use tokio_util::io::SyncIoBridge;
use super::stream_to_file; use super::stream_to_file;
@@ -165,16 +164,10 @@ impl FileOpener for CsvOpener {
#[async_trait] #[async_trait]
impl FileFormat for CsvFormat { impl FileFormat for CsvFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> { async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = store let reader = store
.reader(path) .reader(path)
.await .await
.context(error::ReadObjectSnafu { path })? .context(error::ReadObjectSnafu { path })?;
.into_futures_async_read(0..meta.content_length())
.compat();
let decoded = self.compression_type.convert_async_read(reader); let decoded = self.compression_type.convert_async_read(reader);
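
As a reading aid, a minimal sketch (not taken from the repository) of the reader shape this revert returns to: with opendal 0.45 the value returned by `reader()` already implements the async read traits, so the `stat` plus `into_futures_async_read(..).compat()` adaptation removed above is unnecessary. `ObjectStore` is assumed to be the crate's re-export of `opendal::Operator`, and the `futures::AsyncReadExt` usage mirrors the file-cache test further down:

    use futures::AsyncReadExt; // the 0.45 Reader implements futures::AsyncRead
    use object_store::ObjectStore;

    // Read a whole object into a String through the 0.45-style reader.
    async fn read_to_string(store: &ObjectStore, path: &str) -> anyhow::Result<String> {
        let mut reader = store.reader(path).await?;
        let mut buf = String::new();
        reader.read_to_string(&mut buf).await?;
        Ok(buf)
    }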

View File

@@ -31,7 +31,6 @@ use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::physical_plan::SendableRecordBatchStream;
use object_store::ObjectStore; use object_store::ObjectStore;
use snafu::ResultExt; use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::SyncIoBridge; use tokio_util::io::SyncIoBridge;
use super::stream_to_file; use super::stream_to_file;
@@ -83,16 +82,10 @@ impl Default for JsonFormat {
#[async_trait] #[async_trait]
impl FileFormat for JsonFormat { impl FileFormat for JsonFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> { async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = store let reader = store
.reader(path) .reader(path)
.await .await
.context(error::ReadObjectSnafu { path })? .context(error::ReadObjectSnafu { path })?;
.into_futures_async_read(0..meta.content_length())
.compat();
let decoded = self.compression_type.convert_async_read(reader); let decoded = self.compression_type.convert_async_read(reader);

View File

@@ -16,17 +16,15 @@ use std::sync::Arc;
use arrow_schema::{ArrowError, Schema, SchemaRef}; use arrow_schema::{ArrowError, Schema, SchemaRef};
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes;
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter; use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener}; use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
use datafusion::error::{DataFusionError, Result as DfResult}; use datafusion::error::{DataFusionError, Result as DfResult};
use futures::future::BoxFuture; use futures::{StreamExt, TryStreamExt};
use futures::{FutureExt, StreamExt, TryStreamExt};
use object_store::ObjectStore; use object_store::ObjectStore;
use orc_rust::arrow_reader::ArrowReaderBuilder; use orc_rust::arrow_reader::ArrowReaderBuilder;
use orc_rust::async_arrow_reader::ArrowStreamReader; use orc_rust::async_arrow_reader::ArrowStreamReader;
use orc_rust::reader::AsyncChunkReader;
use snafu::ResultExt; use snafu::ResultExt;
use tokio::io::{AsyncRead, AsyncSeek};
use crate::error::{self, Result}; use crate::error::{self, Result};
use crate::file_format::FileFormat; use crate::file_format::FileFormat;
@@ -34,49 +32,18 @@ use crate::file_format::FileFormat;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct OrcFormat; pub struct OrcFormat;
#[derive(Clone)] pub async fn new_orc_stream_reader<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
pub struct ReaderAdapter { reader: R,
reader: object_store::Reader, ) -> Result<ArrowStreamReader<R>> {
len: u64,
}
impl ReaderAdapter {
pub fn new(reader: object_store::Reader, len: u64) -> Self {
Self { reader, len }
}
}
impl AsyncChunkReader for ReaderAdapter {
fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
async move { Ok(self.len) }.boxed()
}
fn get_bytes(
&mut self,
offset_from_start: u64,
length: u64,
) -> BoxFuture<'_, std::io::Result<Bytes>> {
async move {
let bytes = self
.reader
.read(offset_from_start..offset_from_start + length)
.await?;
Ok(bytes.to_bytes())
}
.boxed()
}
}
pub async fn new_orc_stream_reader(
reader: ReaderAdapter,
) -> Result<ArrowStreamReader<ReaderAdapter>> {
let reader_build = ArrowReaderBuilder::try_new_async(reader) let reader_build = ArrowReaderBuilder::try_new_async(reader)
.await .await
.context(error::OrcReaderSnafu)?; .context(error::OrcReaderSnafu)?;
Ok(reader_build.build_async()) Ok(reader_build.build_async())
} }
pub async fn infer_orc_schema(reader: ReaderAdapter) -> Result<Schema> { pub async fn infer_orc_schema<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<Schema> {
let reader = new_orc_stream_reader(reader).await?; let reader = new_orc_stream_reader(reader).await?;
Ok(reader.schema().as_ref().clone()) Ok(reader.schema().as_ref().clone())
} }
@@ -84,15 +51,13 @@ pub async fn infer_orc_schema(reader: ReaderAdapter) -> Result<Schema> {
#[async_trait] #[async_trait]
impl FileFormat for OrcFormat { impl FileFormat for OrcFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> { async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = store let reader = store
.reader(path) .reader(path)
.await .await
.context(error::ReadObjectSnafu { path })?; .context(error::ReadObjectSnafu { path })?;
let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length())).await?;
let schema = infer_orc_schema(reader).await?;
Ok(schema) Ok(schema)
} }
} }
@@ -132,20 +97,12 @@ impl FileOpener for OrcOpener {
}; };
let projection = self.projection.clone(); let projection = self.projection.clone();
Ok(Box::pin(async move { Ok(Box::pin(async move {
let path = meta.location().to_string();
let meta = object_store
.stat(&path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let reader = object_store let reader = object_store
.reader(&path) .reader(meta.location().to_string().as_str())
.await .await
.map_err(|e| DataFusionError::External(Box::new(e)))?; .map_err(|e| DataFusionError::External(Box::new(e)))?;
let stream_reader = let stream_reader = new_orc_stream_reader(reader)
new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
.await .await
.map_err(|e| DataFusionError::External(Box::new(e)))?; .map_err(|e| DataFusionError::External(Box::new(e)))?;

View File

@@ -29,11 +29,10 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::physical_plan::SendableRecordBatchStream;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::StreamExt; use futures::StreamExt;
use object_store::{FuturesAsyncReader, ObjectStore}; use object_store::{ObjectStore, Reader, Writer};
use parquet::basic::{Compression, ZstdLevel}; use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties; use parquet::file::properties::WriterProperties;
use snafu::ResultExt; use snafu::ResultExt;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder, LazyBufferedWriter}; use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder, LazyBufferedWriter};
use crate::error::{self, Result}; use crate::error::{self, Result};
@@ -46,16 +45,10 @@ pub struct ParquetFormat {}
#[async_trait] #[async_trait]
impl FileFormat for ParquetFormat { impl FileFormat for ParquetFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> { async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let mut reader = store let mut reader = store
.reader(path) .reader(path)
.await .await
.context(error::ReadObjectSnafu { path })? .context(error::ReadObjectSnafu { path })?;
.into_futures_async_read(0..meta.content_length())
.compat();
let metadata = reader let metadata = reader
.get_metadata() .get_metadata()
@@ -105,7 +98,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
pub struct LazyParquetFileReader { pub struct LazyParquetFileReader {
object_store: ObjectStore, object_store: ObjectStore,
reader: Option<Compat<FuturesAsyncReader>>, reader: Option<Reader>,
path: String, path: String,
} }
@@ -121,13 +114,7 @@ impl LazyParquetFileReader {
/// Must initialize the reader, or throw an error from the future. /// Must initialize the reader, or throw an error from the future.
async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> { async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> {
if self.reader.is_none() { if self.reader.is_none() {
let meta = self.object_store.stat(&self.path).await?; let reader = self.object_store.reader(&self.path).await?;
let reader = self
.object_store
.reader(&self.path)
.await?
.into_futures_async_read(0..meta.content_length())
.compat();
self.reader = Some(reader); self.reader = Some(reader);
} }
@@ -180,17 +167,16 @@ pub struct BufferedWriter {
} }
type InnerBufferedWriter = LazyBufferedWriter< type InnerBufferedWriter = LazyBufferedWriter<
Compat<object_store::FuturesAsyncWriter>, object_store::Writer,
ArrowWriter<SharedBuffer>, ArrowWriter<SharedBuffer>,
impl Fn(String) -> BoxFuture<'static, Result<Compat<object_store::FuturesAsyncWriter>>>, impl Fn(String) -> BoxFuture<'static, Result<Writer>>,
>; >;
impl BufferedWriter { impl BufferedWriter {
fn make_write_factory( fn make_write_factory(
store: ObjectStore, store: ObjectStore,
concurrency: usize, concurrency: usize,
) -> impl Fn(String) -> BoxFuture<'static, Result<Compat<object_store::FuturesAsyncWriter>>> ) -> impl Fn(String) -> BoxFuture<'static, Result<Writer>> {
{
move |path| { move |path| {
let store = store.clone(); let store = store.clone();
Box::pin(async move { Box::pin(async move {
@@ -198,7 +184,6 @@ impl BufferedWriter {
.writer_with(&path) .writer_with(&path)
.concurrent(concurrency) .concurrent(concurrency)
.await .await
.map(|v| v.into_futures_async_write().compat_write())
.context(error::WriteObjectSnafu { path }) .context(error::WriteObjectSnafu { path })
}) })
} }

View File

@@ -120,7 +120,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
let written = tmp_store.read(&output_path).await.unwrap(); let written = tmp_store.read(&output_path).await.unwrap();
let origin = store.read(origin_path).await.unwrap(); let origin = store.read(origin_path).await.unwrap();
assert_eq_lines(written.to_vec(), origin.to_vec()); assert_eq_lines(written, origin);
} }
pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usize) -> usize) { pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usize) -> usize) {
@@ -158,7 +158,7 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usiz
let written = tmp_store.read(&output_path).await.unwrap(); let written = tmp_store.read(&output_path).await.unwrap();
let origin = store.read(origin_path).await.unwrap(); let origin = store.read(origin_path).await.unwrap();
assert_eq_lines(written.to_vec(), origin.to_vec()); assert_eq_lines(written, origin);
} }
// Ignore the CRLF difference across operating systems. // Ignore the CRLF difference across operating systems.

View File

@@ -179,7 +179,7 @@ impl StateStore for ObjectStateStore {
)) ))
}) })
.context(ListStateSnafu { path: key })?; .context(ListStateSnafu { path: key })?;
yield (key.into(), value.to_vec()); yield (key.into(), value);
} }
} }
}); });

View File

@@ -20,6 +20,7 @@ mod gcs;
mod oss; mod oss;
mod s3; mod s3;
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use std::{env, path}; use std::{env, path};
@@ -28,7 +29,7 @@ use common_telemetry::info;
use object_store::layers::{LruCacheLayer, RetryLayer}; use object_store::layers::{LruCacheLayer, RetryLayer};
use object_store::services::Fs; use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers}; use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{HttpClient, ObjectStore}; use object_store::{HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*; use snafu::prelude::*;
use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
@@ -106,13 +107,13 @@ async fn create_object_store_with_cache(
if let Some(path) = cache_path { if let Some(path) = cache_path {
let atomic_temp_dir = join_dir(path, ".tmp/"); let atomic_temp_dir = join_dir(path, ".tmp/");
clean_temp_dir(&atomic_temp_dir)?; clean_temp_dir(&atomic_temp_dir)?;
let mut builder = Fs::default(); let cache_store = Fs::default()
builder.root(path).atomic_write_dir(&atomic_temp_dir); .root(path)
let cache_store = ObjectStore::new(builder) .atomic_write_dir(&atomic_temp_dir)
.context(error::InitBackendSnafu)? .build()
.finish(); .context(error::InitBackendSnafu)?;
let cache_layer = LruCacheLayer::new(cache_store, cache_capacity.0 as usize) let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
.await .await
.context(error::InitBackendSnafu)?; .context(error::InitBackendSnafu)?;
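
A rough sketch of the construction pattern restored above, with the exact shapes of `ObjectStoreBuilder::build` and `LruCacheLayer::new` taken on faith from this diff; the directory arguments and 512 MiB capacity are illustrative only:

    use std::sync::Arc;
    use object_store::layers::LruCacheLayer;
    use object_store::services::Fs;
    use object_store::ObjectStoreBuilder;

    async fn build_cache_layer(cache_dir: &str, tmp_dir: &str) -> anyhow::Result<()> {
        let cache_store = Fs::default()
            .root(cache_dir)
            .atomic_write_dir(tmp_dir)
            .build()?; // 0.45 style: the service builder finishes with build()
        // The reverted LruCacheLayer::new takes the accessor behind an Arc
        // plus a capacity in bytes.
        let _cache_layer = LruCacheLayer::new(Arc::new(cache_store), 512 * 1024 * 1024).await?;
        Ok(())
    }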

View File

@@ -71,8 +71,7 @@ impl FileRegionManifest {
let bs = object_store let bs = object_store
.read(path) .read(path)
.await .await
.context(LoadRegionManifestSnafu { region_id })? .context(LoadRegionManifestSnafu { region_id })?;
.to_vec();
Self::decode(bs.as_slice()) Self::decode(bs.as_slice())
} }
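
A small sketch of the read-path difference the revert undoes (simplified error handling; `ObjectStore` assumed to be the re-exported `opendal::Operator`): opendal 0.45's `read` and `read_with` return `Vec<u8>` directly, which is why the `.to_vec()` and `.to_bytes()` conversions from the 0.46 `Buffer` type disappear throughout this diff:

    use object_store::ObjectStore;

    // Whole-object read: 0.45 yields Vec<u8> directly.
    async fn load(store: &ObjectStore, path: &str) -> anyhow::Result<Vec<u8>> {
        Ok(store.read(path).await?)
    }

    // Ranged read, mirroring the parquet metadata loader further down.
    async fn load_tail(store: &ObjectStore, path: &str, file_size: u64) -> anyhow::Result<Vec<u8>> {
        let start = file_size.saturating_sub(8);
        Ok(store.read_with(path).range(start..file_size).await?)
    }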

View File

@@ -171,9 +171,10 @@ impl MetricEngineInner {
// check if the logical region already exist // check if the logical region already exist
if self if self
.metadata_region .state
.is_logical_region_exists(metadata_region_id, logical_region_id) .read()
.await? .unwrap()
.is_logical_region_exists(logical_region_id)
{ {
info!("Create a existing logical region {logical_region_id}. Skipped"); info!("Create a existing logical region {logical_region_id}. Skipped");
return Ok(data_region_id); return Ok(data_region_id);

View File

@@ -104,7 +104,7 @@ impl MetricEngineInner {
// check if the region exists // check if the region exists
let data_region_id = to_data_region_id(physical_region_id); let data_region_id = to_data_region_id(physical_region_id);
let state = self.state.read().unwrap(); let state = self.state.read().unwrap();
if !state.is_logical_region_exist(logical_region_id) { if !state.is_logical_region_exists(logical_region_id) {
error!("Trying to write to an nonexistent region {logical_region_id}"); error!("Trying to write to an nonexistent region {logical_region_id}");
return LogicalRegionNotFoundSnafu { return LogicalRegionNotFoundSnafu {
region_id: logical_region_id, region_id: logical_region_id,

View File

@@ -149,7 +149,7 @@ impl MetricEngineState {
Ok(exist) Ok(exist)
} }
pub fn is_logical_region_exist(&self, logical_region_id: RegionId) -> bool { pub fn is_logical_region_exists(&self, logical_region_id: RegionId) -> bool {
self.logical_regions().contains_key(&logical_region_id) self.logical_regions().contains_key(&logical_region_id)
} }
} }
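
The `perf:` commit in this compare is the switch above from an async metadata-region lookup to a check against in-memory engine state. A condensed, self-contained sketch of that shape, with the real `MetricEngineState` and `RegionId` types stood in by simplified ones:

    use std::collections::HashMap;
    use std::sync::RwLock;

    type RegionId = u64; // stand-in for the real RegionId type

    #[derive(Default)]
    struct MetricEngineState {
        // stand-in for the real map; the value type is elided here
        logical_regions: HashMap<RegionId, ()>,
    }

    impl MetricEngineState {
        fn is_logical_region_exists(&self, logical_region_id: RegionId) -> bool {
            self.logical_regions.contains_key(&logical_region_id)
        }
    }

    // Same locking shape as `self.state.read().unwrap().is_logical_region_exists(..)`
    // in the create/put paths above: a cheap synchronous read-lock instead of an
    // await on the metadata region.
    fn logical_region_exists(state: &RwLock<MetricEngineState>, id: RegionId) -> bool {
        state.read().unwrap().is_logical_region_exists(id)
    }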

View File

@@ -139,17 +139,6 @@ impl MetadataRegion {
Ok(()) Ok(())
} }
/// Check if the given logical region exists.
pub async fn is_logical_region_exists(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) -> Result<bool> {
let region_id = utils::to_metadata_region_id(physical_region_id);
let region_key = Self::concat_region_key(logical_region_id);
self.exists(region_id, &region_key).await
}
/// Check if the given column exists. Return the semantic type if exists. /// Check if the given column exists. Return the semantic type if exists.
pub async fn column_semantic_type( pub async fn column_semantic_type(
&self, &self,
@@ -669,10 +658,6 @@ mod test {
.add_logical_region(physical_region_id, logical_region_id) .add_logical_region(physical_region_id, logical_region_id)
.await .await
.unwrap(); .unwrap();
assert!(metadata_region
.is_logical_region_exists(physical_region_id, logical_region_id)
.await
.unwrap());
// add it again // add it again
assert!(metadata_region assert!(metadata_region

View File

@@ -112,10 +112,6 @@ impl FileCache {
self.memory_index.insert(key, value).await; self.memory_index.insert(key, value).await;
} }
pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
self.memory_index.get(&key).await
}
/// Reads a file from the cache. /// Reads a file from the cache.
pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> { pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
// We must use `get()` to update the estimator of the cache. // We must use `get()` to update the estimator of the cache.
@@ -376,6 +372,7 @@ fn parse_index_key(name: &str) -> Option<IndexKey> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use common_test_util::temp_dir::create_temp_dir; use common_test_util::temp_dir::create_temp_dir;
use futures::AsyncReadExt;
use object_store::services::Fs; use object_store::services::Fs;
use super::*; use super::*;
@@ -454,9 +451,10 @@ mod tests {
.await; .await;
// Read file content. // Read file content.
let reader = cache.reader(key).await.unwrap(); let mut reader = cache.reader(key).await.unwrap();
let buf = reader.read(..).await.unwrap().to_vec(); let mut buf = String::new();
assert_eq!("hello", String::from_utf8(buf).unwrap()); reader.read_to_string(&mut buf).await.unwrap();
assert_eq!("hello", buf);
// Get weighted size. // Get weighted size.
cache.memory_index.run_pending_tasks().await; cache.memory_index.run_pending_tasks().await;
@@ -551,9 +549,10 @@ mod tests {
for (i, file_id) in file_ids.iter().enumerate() { for (i, file_id) in file_ids.iter().enumerate() {
let key = IndexKey::new(region_id, *file_id, file_type); let key = IndexKey::new(region_id, *file_id, file_type);
let reader = cache.reader(key).await.unwrap(); let mut reader = cache.reader(key).await.unwrap();
let buf = reader.read(..).await.unwrap().to_vec(); let mut buf = String::new();
assert_eq!(i.to_string(), String::from_utf8(buf).unwrap()); reader.read_to_string(&mut buf).await.unwrap();
assert_eq!(i.to_string(), buf);
} }
} }

View File

@@ -19,7 +19,6 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize; use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info}; use common_telemetry::{debug, info};
use futures::AsyncWriteExt;
use object_store::manager::ObjectStoreManagerRef; use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore; use object_store::ObjectStore;
use snafu::ResultExt; use snafu::ResultExt;
@@ -176,27 +175,19 @@ impl WriteCache {
}]) }])
.start_timer(); .start_timer();
let cached_value = self
.file_cache
.local_store()
.stat(&cache_path)
.await
.context(error::OpenDalSnafu)?;
let reader = self let reader = self
.file_cache .file_cache
.local_store() .local_store()
.reader(&cache_path) .reader(&cache_path)
.await .await
.context(error::OpenDalSnafu)? .context(error::OpenDalSnafu)?;
.into_futures_async_read(0..cached_value.content_length());
let mut writer = remote_store let mut writer = remote_store
.writer_with(upload_path) .writer_with(upload_path)
.chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) .buffer(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
.concurrent(DEFAULT_WRITE_CONCURRENCY) .concurrent(DEFAULT_WRITE_CONCURRENCY)
.await .await
.context(error::OpenDalSnafu)? .context(error::OpenDalSnafu)?;
.into_futures_async_write();
let bytes_written = let bytes_written =
futures::io::copy(reader, &mut writer) futures::io::copy(reader, &mut writer)
@@ -208,11 +199,7 @@ impl WriteCache {
})?; })?;
// Must close to upload all data. // Must close to upload all data.
writer.close().await.context(error::UploadSnafu { writer.close().await.context(error::OpenDalSnafu)?;
region_id,
file_id,
file_type,
})?;
UPLOAD_BYTES_TOTAL.inc_by(bytes_written); UPLOAD_BYTES_TOTAL.inc_by(bytes_written);
@@ -328,7 +315,7 @@ mod tests {
.read(&write_cache.file_cache.cache_file_path(key)) .read(&write_cache.file_cache.cache_file_path(key))
.await .await
.unwrap(); .unwrap();
assert_eq!(remote_data.to_vec(), cache_data.to_vec()); assert_eq!(remote_data, cache_data);
// Check write cache contains the index key // Check write cache contains the index key
let index_key = IndexKey::new(region_id, file_id, FileType::Puffin); let index_key = IndexKey::new(region_id, file_id, FileType::Puffin);
@@ -339,7 +326,7 @@ mod tests {
.read(&write_cache.file_cache.cache_file_path(index_key)) .read(&write_cache.file_cache.cache_file_path(index_key))
.await .await
.unwrap(); .unwrap();
assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec()); assert_eq!(remote_index_data, cache_index_data);
} }
#[tokio::test] #[tokio::test]
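
For orientation, a sketch of the 0.45-era upload path that the hunks above restore (buffer size and concurrency are illustrative; `ObjectStore` assumed to be the re-exported `opendal::Operator`): the `Reader`/`Writer` pair feeds straight into `futures::io::copy`, `writer_with` takes `buffer` where 0.46 used `chunk`, and `close()` must be called to finish the upload:

    use object_store::ObjectStore;

    async fn upload(local: &ObjectStore, remote: &ObjectStore, src: &str, dst: &str) -> anyhow::Result<u64> {
        let reader = local.reader(src).await?;
        let mut writer = remote
            .writer_with(dst)
            .buffer(8 * 1024 * 1024) // 0.45 spelling of the write buffer size
            .concurrent(8)           // illustrative concurrency
            .await?;
        let bytes_written = futures::io::copy(reader, &mut writer).await?;
        writer.close().await?; // must close to upload all data
        Ok(bytes_written)
    }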

View File

@@ -497,7 +497,7 @@ impl ManifestObjectStore {
} }
}; };
let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data.to_vec())?; let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?;
debug!( debug!(
"Load checkpoint in path: {}, metadata: {:?}", "Load checkpoint in path: {}, metadata: {:?}",
@@ -509,11 +509,7 @@ impl ManifestObjectStore {
#[cfg(test)] #[cfg(test)]
pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> { pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
self.object_store self.object_store.read(path).await.context(OpenDalSnafu)
.read(path)
.await
.context(OpenDalSnafu)
.map(|v| v.to_vec())
} }
#[cfg(test)] #[cfg(test)]

View File

@@ -121,17 +121,9 @@ impl SstIndexApplier {
return Ok(None); return Ok(None);
}; };
let Some(indexed_value) = file_cache
.get(IndexKey::new(self.region_id, file_id, FileType::Puffin))
.await
else {
return Ok(None);
};
Ok(file_cache Ok(file_cache
.reader(IndexKey::new(self.region_id, file_id, FileType::Puffin)) .reader(IndexKey::new(self.region_id, file_id, FileType::Puffin))
.await .await
.map(|v| v.into_futures_async_read(0..indexed_value.file_size as u64))
.map(PuffinFileReader::new)) .map(PuffinFileReader::new))
} }
@@ -198,13 +190,7 @@ mod tests {
let region_dir = "region_dir".to_string(); let region_dir = "region_dir".to_string();
let path = location::index_file_path(&region_dir, file_id); let path = location::index_file_path(&region_dir, file_id);
let mut puffin_writer = PuffinFileWriter::new( let mut puffin_writer = PuffinFileWriter::new(object_store.writer(&path).await.unwrap());
object_store
.writer(&path)
.await
.unwrap()
.into_futures_async_write(),
);
puffin_writer puffin_writer
.add_blob(Blob { .add_blob(Blob {
blob_type: INDEX_BLOB_TYPE.to_string(), blob_type: INDEX_BLOB_TYPE.to_string(),
@@ -250,13 +236,7 @@ mod tests {
let region_dir = "region_dir".to_string(); let region_dir = "region_dir".to_string();
let path = location::index_file_path(&region_dir, file_id); let path = location::index_file_path(&region_dir, file_id);
let mut puffin_writer = PuffinFileWriter::new( let mut puffin_writer = PuffinFileWriter::new(object_store.writer(&path).await.unwrap());
object_store
.writer(&path)
.await
.unwrap()
.into_futures_async_write(),
);
puffin_writer puffin_writer
.add_blob(Blob { .add_blob(Blob {
blob_type: "invalid_blob_type".to_string(), blob_type: "invalid_blob_type".to_string(),

View File

@@ -26,8 +26,6 @@ use crate::error::{OpenDalSnafu, Result};
/// A wrapper around [`ObjectStore`] that adds instrumentation for monitoring /// A wrapper around [`ObjectStore`] that adds instrumentation for monitoring
/// metrics such as bytes read, bytes written, and the number of seek operations. /// metrics such as bytes read, bytes written, and the number of seek operations.
///
/// TODO: Consider refactor InstrumentedStore to use async in trait instead of AsyncRead.
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct InstrumentedStore { pub(crate) struct InstrumentedStore {
/// The underlying object store. /// The underlying object store.
@@ -60,14 +58,8 @@ impl InstrumentedStore {
read_byte_count: &'a IntCounter, read_byte_count: &'a IntCounter,
read_count: &'a IntCounter, read_count: &'a IntCounter,
seek_count: &'a IntCounter, seek_count: &'a IntCounter,
) -> Result<InstrumentedAsyncRead<'a, object_store::FuturesAsyncReader>> { ) -> Result<InstrumentedAsyncRead<'a, object_store::Reader>> {
let meta = self.object_store.stat(path).await.context(OpenDalSnafu)?; let reader = self.object_store.reader(path).await.context(OpenDalSnafu)?;
let reader = self
.object_store
.reader(path)
.await
.context(OpenDalSnafu)?
.into_futures_async_read(0..meta.content_length());
Ok(InstrumentedAsyncRead::new( Ok(InstrumentedAsyncRead::new(
reader, reader,
read_byte_count, read_byte_count,
@@ -85,21 +77,15 @@ impl InstrumentedStore {
write_byte_count: &'a IntCounter, write_byte_count: &'a IntCounter,
write_count: &'a IntCounter, write_count: &'a IntCounter,
flush_count: &'a IntCounter, flush_count: &'a IntCounter,
) -> Result<InstrumentedAsyncWrite<'a, object_store::FuturesAsyncWriter>> { ) -> Result<InstrumentedAsyncWrite<'a, object_store::Writer>> {
let writer = match self.write_buffer_size { let writer = match self.write_buffer_size {
Some(size) => self Some(size) => self
.object_store .object_store
.writer_with(path) .writer_with(path)
.chunk(size) .buffer(size)
.await .await
.context(OpenDalSnafu)? .context(OpenDalSnafu)?,
.into_futures_async_write(), None => self.object_store.writer(path).await.context(OpenDalSnafu)?,
None => self
.object_store
.writer(path)
.await
.context(OpenDalSnafu)?
.into_futures_async_write(),
}; };
Ok(InstrumentedAsyncWrite::new( Ok(InstrumentedAsyncWrite::new(
writer, writer,

View File

@@ -121,7 +121,7 @@ async fn fetch_ranges_seq(
.read_with(&file_path) .read_with(&file_path)
.range(range.start..range.end) .range(range.start..range.end)
.call()?; .call()?;
Ok::<_, object_store::Error>(data.to_bytes()) Ok::<_, object_store::Error>(Bytes::from(data))
}) })
.collect::<object_store::Result<Vec<_>>>() .collect::<object_store::Result<Vec<_>>>()
}; };
@@ -141,7 +141,7 @@ async fn fetch_ranges_concurrent(
let future_read = object_store.read_with(file_path); let future_read = object_store.read_with(file_path);
handles.push(async move { handles.push(async move {
let data = future_read.range(range.start..range.end).await?; let data = future_read.range(range.start..range.end).await?;
Ok::<_, object_store::Error>(data.to_bytes()) Ok::<_, object_store::Error>(Bytes::from(data))
}); });
} }
let results = futures::future::try_join_all(handles).await?; let results = futures::future::try_join_all(handles).await?;
@@ -164,7 +164,7 @@ where
} }
} }
// https://github.com/apache/opendal/blob/v0.46.0/core/src/raw/tokio_util.rs#L21-L24 // https://github.com/apache/incubator-opendal/blob/7144ab1ca2409dff0c324bfed062ce985997f8ce/core/src/raw/tokio_util.rs#L21-L23
/// Parse tokio error into opendal::Error. /// Parse tokio error into opendal::Error.
fn new_task_join_error(e: tokio::task::JoinError) -> object_store::Error { fn new_task_join_error(e: tokio::task::JoinError) -> object_store::Error {
object_store::Error::new(ErrorKind::Unexpected, "tokio task join failed").set_source(e) object_store::Error::new(ErrorKind::Unexpected, "tokio task join failed").set_source(e)

View File

@@ -85,8 +85,7 @@ impl<'a> MetadataLoader<'a> {
.read_with(path) .read_with(path)
.range(buffer_start..file_size) .range(buffer_start..file_size)
.await .await
.context(error::OpenDalSnafu)? .context(error::OpenDalSnafu)?;
.to_vec();
let buffer_len = buffer.len(); let buffer_len = buffer.len();
let mut footer = [0; 8]; let mut footer = [0; 8];
@@ -130,8 +129,7 @@ impl<'a> MetadataLoader<'a> {
.read_with(path) .read_with(path)
.range(metadata_start..(file_size - FOOTER_SIZE as u64)) .range(metadata_start..(file_size - FOOTER_SIZE as u64))
.await .await
.context(error::OpenDalSnafu)? .context(error::OpenDalSnafu)?;
.to_vec();
let metadata = decode_metadata(&data).map_err(|e| { let metadata = decode_metadata(&data).map_err(|e| {
error::InvalidParquetSnafu { error::InvalidParquetSnafu {

View File

@@ -16,7 +16,6 @@
use std::time::Duration; use std::time::Duration;
use bytes::Bytes;
use common_telemetry::{error, info, warn}; use common_telemetry::{error, info, warn};
use futures::TryStreamExt; use futures::TryStreamExt;
use object_store::util::join_path; use object_store::util::join_path;
@@ -51,7 +50,7 @@ impl<S> RegionWorkerLoop<S> {
region region
.access_layer .access_layer
.object_store() .object_store()
.write(&marker_path, Bytes::new()) .write(&marker_path, vec![])
.await .await
.context(OpenDalSnafu) .context(OpenDalSnafu)
.inspect_err(|e| { .inspect_err(|e| {

View File

@@ -11,21 +11,23 @@ workspace = true
services-memory = ["opendal/services-memory"] services-memory = ["opendal/services-memory"]
[dependencies] [dependencies]
async-trait = "0.1"
bytes.workspace = true bytes.workspace = true
common-telemetry.workspace = true common-telemetry.workspace = true
futures.workspace = true futures.workspace = true
lazy_static.workspace = true lazy_static.workspace = true
md5 = "0.7" md5 = "0.7"
moka = { workspace = true, features = ["future"] } moka = { workspace = true, features = ["future"] }
opendal = { version = "0.46", features = [ opendal = { version = "0.45", features = [
"layers-tracing", "layers-tracing",
"rustls",
"services-azblob", "services-azblob",
"services-fs", "services-fs",
"services-gcs", "services-gcs",
"services-http", "services-http",
"services-oss", "services-oss",
"services-s3", "services-s3",
] } ], default-features = false }
prometheus.workspace = true prometheus.workspace = true
uuid.workspace = true uuid.workspace = true
@@ -33,4 +35,5 @@ uuid.workspace = true
anyhow = "1.0" anyhow = "1.0"
common-telemetry.workspace = true common-telemetry.workspace = true
common-test-util.workspace = true common-test-util.workspace = true
opendal = { version = "0.45", features = ["services-memory"] }
tokio.workspace = true tokio.workspace = true

View File

@@ -14,26 +14,27 @@
use std::sync::Arc; use std::sync::Arc;
use opendal::raw::oio::ReadDyn; use async_trait::async_trait;
use opendal::raw::oio::Read;
use opendal::raw::{ use opendal::raw::{
Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, Accessor, Layer, LayeredAccessor, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
RpWrite, RpWrite,
}; };
use opendal::{Operator, Result}; use opendal::Result;
mod read_cache; mod read_cache;
use common_telemetry::info; use common_telemetry::info;
use read_cache::ReadCache; use read_cache::ReadCache;
/// An opendal layer with local LRU file cache supporting. /// An opendal layer with local LRU file cache supporting.
#[derive(Clone)] #[derive(Clone)]
pub struct LruCacheLayer { pub struct LruCacheLayer<C: Clone> {
// The read cache // The read cache
read_cache: ReadCache, read_cache: ReadCache<C>,
} }
impl LruCacheLayer { impl<C: Accessor + Clone> LruCacheLayer<C> {
/// Create a `[LruCacheLayer]` with local file cache and capacity in bytes. /// Create a `[LruCacheLayer]` with local file cache and capacity in bytes.
pub async fn new(file_cache: Operator, capacity: usize) -> Result<Self> { pub async fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
let read_cache = ReadCache::new(file_cache, capacity); let read_cache = ReadCache::new(file_cache, capacity);
let (entries, bytes) = read_cache.recover_cache().await?; let (entries, bytes) = read_cache.recover_cache().await?;
@@ -56,11 +57,11 @@ impl LruCacheLayer {
} }
} }
impl<I: Access> Layer<I> for LruCacheLayer { impl<I: Accessor, C: Accessor + Clone> Layer<I> for LruCacheLayer<C> {
type LayeredAccess = LruCacheAccess<I>; type LayeredAccessor = LruCacheAccessor<I, C>;
fn layer(&self, inner: I) -> Self::LayeredAccess { fn layer(&self, inner: I) -> Self::LayeredAccessor {
LruCacheAccess { LruCacheAccessor {
inner, inner,
read_cache: self.read_cache.clone(), read_cache: self.read_cache.clone(),
} }
@@ -68,14 +69,15 @@ impl<I: Access> Layer<I> for LruCacheLayer {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct LruCacheAccess<I> { pub struct LruCacheAccessor<I, C: Clone> {
inner: I, inner: I,
read_cache: ReadCache, read_cache: ReadCache<C>,
} }
impl<I: Access> LayeredAccess for LruCacheAccess<I> { #[async_trait]
impl<I: Accessor, C: Accessor + Clone> LayeredAccessor for LruCacheAccessor<I, C> {
type Inner = I; type Inner = I;
type Reader = Arc<dyn ReadDyn>; type Reader = Box<dyn Read>;
type BlockingReader = I::BlockingReader; type BlockingReader = I::BlockingReader;
type Writer = I::Writer; type Writer = I::Writer;
type BlockingWriter = I::BlockingWriter; type BlockingWriter = I::BlockingWriter;
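
One detail worth pulling out of the `read_cache.rs` diff that follows: cache keys are the md5 of the object path plus the byte-range header, so different ranges of the same object are cached as separate local files. A standalone sketch, with the range string written literally rather than produced by opendal's `BytesRange`:

    // Uses the md5 crate (the object-store Cargo.toml above keeps `md5 = "0.7"`).
    fn read_cache_key(path: &str, range_header: &str) -> String {
        format!("{:x}.cache-{}", md5::compute(path), range_header)
    }

    fn main() {
        // e.g. "<32 hex chars>.cache-bytes=0-1023"
        let key = read_cache_key("data/part-0.parquet", "bytes=0-1023");
        println!("{key}");
    }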

View File

@@ -15,12 +15,12 @@
use std::sync::Arc; use std::sync::Arc;
use common_telemetry::debug; use common_telemetry::debug;
use futures::{FutureExt, StreamExt}; use futures::FutureExt;
use moka::future::Cache; use moka::future::Cache;
use moka::notification::ListenerFuture; use moka::notification::ListenerFuture;
use opendal::raw::oio::{Read, ReadDyn, Reader}; use opendal::raw::oio::{ListExt, Read, ReadExt, Reader, WriteExt};
use opendal::raw::{Access, BytesRange, OpRead, OpStat, RpRead}; use opendal::raw::{Accessor, OpDelete, OpList, OpRead, OpStat, OpWrite, RpRead};
use opendal::{Buffer, Error as OpendalError, ErrorKind, Operator, Result}; use opendal::{Error as OpendalError, ErrorKind, Result};
use crate::metrics::{ use crate::metrics::{
OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT, OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT,
@@ -52,22 +52,26 @@ fn can_cache(path: &str) -> bool {
} }
/// Generate an unique cache key for the read path and range. /// Generate an unique cache key for the read path and range.
fn read_cache_key(path: &str, range: BytesRange) -> String { fn read_cache_key(path: &str, args: &OpRead) -> String {
format!("{:x}.cache-{}", md5::compute(path), range.to_header()) format!(
"{:x}.cache-{}",
md5::compute(path),
args.range().to_header()
)
} }
/// Local read cache for files in object storage /// Local read cache for files in object storage
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub(crate) struct ReadCache { pub(crate) struct ReadCache<C: Clone> {
/// Local file cache backend /// Local file cache backend
file_cache: Operator, file_cache: Arc<C>,
/// Local memory cache to track local cache files /// Local memory cache to track local cache files
mem_cache: Cache<String, ReadResult>, mem_cache: Cache<String, ReadResult>,
} }
impl ReadCache { impl<C: Accessor + Clone> ReadCache<C> {
/// Create a [`ReadCache`] with capacity in bytes. /// Create a [`ReadCache`] with capacity in bytes.
pub(crate) fn new(file_cache: Operator, capacity: usize) -> Self { pub(crate) fn new(file_cache: Arc<C>, capacity: usize) -> Self {
let file_cache_cloned = file_cache.clone(); let file_cache_cloned = file_cache.clone();
let eviction_listener = let eviction_listener =
move |read_key: Arc<String>, read_result: ReadResult, cause| -> ListenerFuture { move |read_key: Arc<String>, read_result: ReadResult, cause| -> ListenerFuture {
@@ -79,7 +83,7 @@ impl ReadCache {
if let ReadResult::Success(size) = read_result { if let ReadResult::Success(size) = read_result {
OBJECT_STORE_LRU_CACHE_BYTES.sub(size as i64); OBJECT_STORE_LRU_CACHE_BYTES.sub(size as i64);
let result = file_cache_cloned.delete(&read_key).await; let result = file_cache_cloned.delete(&read_key, OpDelete::new()).await;
debug!( debug!(
"Deleted local cache file `{}`, result: {:?}, cause: {:?}.", "Deleted local cache file `{}`, result: {:?}, cause: {:?}.",
read_key, result, cause read_key, result, cause
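The eviction listener set up here is what keeps the on-disk cache consistent with the in-memory index: when moka drops an entry, the matching local file is deleted and the byte gauge is decremented. A minimal sketch of wiring such an async listener with moka, assuming the builder uses `async_eviction_listener` and a plain byte-count value (the real builder call and weigher live outside this hunk):

    use std::sync::Arc;

    use futures::FutureExt;
    use moka::future::Cache;
    use moka::notification::{ListenerFuture, RemovalCause};

    fn build_mem_cache(capacity: u64) -> Cache<String, u64> {
        Cache::builder()
            .max_capacity(capacity)
            // Invoked asynchronously after an entry is evicted or invalidated.
            .async_eviction_listener(
                |key: Arc<String>, size: u64, cause: RemovalCause| -> ListenerFuture {
                    async move {
                        // The layer above deletes the local cache file here and
                        // subtracts `size` from OBJECT_STORE_LRU_CACHE_BYTES.
                        println!("evicted {key} ({size} bytes), cause: {cause:?}");
                    }
                    .boxed()
                },
            )
            .build()
    }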
@@ -129,17 +133,17 @@ impl ReadCache {
/// Recover existing cache items from `file_cache` to `mem_cache`. /// Recover existing cache items from `file_cache` to `mem_cache`.
/// Return entry count and total approximate entry size in bytes. /// Return entry count and total approximate entry size in bytes.
pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> { pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
let mut pager = self.file_cache.lister("/").await?; let (_, mut pager) = self.file_cache.list("/", OpList::default()).await?;
while let Some(entry) = pager.next().await.transpose()? { while let Some(entry) = pager.next().await? {
let read_key = entry.path(); let read_key = entry.path();
// We can't retrieve the metadata from `[opendal::raw::oio::Entry]` directly, // We can't retrieve the metadata from `[opendal::raw::oio::Entry]` directly,
// because it's a private field. // because it's a private field.
let size = { let size = {
let stat = self.file_cache.stat(read_key).await?; let stat = self.file_cache.stat(read_key, OpStat::default()).await?;
stat.content_length() stat.into_metadata().content_length()
}; };
OBJECT_STORE_LRU_CACHE_ENTRIES.inc(); OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
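Recovery simply walks the local cache directory at startup and rebuilds the in-memory index, issuing a `stat` per entry to learn its size (the list entry itself does not expose metadata at this level). The removed side of this hunk keeps the higher-level `Operator` API; a rough standalone equivalent of that loop, with error handling and the metric updates omitted:

    use opendal::{Operator, Result};

    // Sketch: count entries and their approximate total size under the cache root.
    async fn scan_cache(file_cache: &Operator) -> Result<(u64, u64)> {
        let mut lister = file_cache.lister("/").await?;
        let (mut entries, mut bytes) = (0u64, 0u64);
        while let Some(entry) = futures::StreamExt::next(&mut lister).await.transpose()? {
            let meta = file_cache.stat(entry.path()).await?;
            entries += 1;
            bytes += meta.content_length();
        }
        Ok((entries, bytes))
    }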
@@ -155,7 +159,8 @@ impl ReadCache {
/// Returns true when the read cache contains the specific file. /// Returns true when the read cache contains the specific file.
pub(crate) async fn contains_file(&self, path: &str) -> bool { pub(crate) async fn contains_file(&self, path: &str) -> bool {
self.mem_cache.run_pending_tasks().await; self.mem_cache.run_pending_tasks().await;
self.mem_cache.contains_key(path) && self.file_cache.stat(path).await.is_ok() self.mem_cache.contains_key(path)
&& self.file_cache.stat(path, OpStat::default()).await.is_ok()
} }
/// Read from a specific path using the OpRead operation. /// Read from a specific path using the OpRead operation.
@@ -168,54 +173,86 @@ impl ReadCache {
inner: &I, inner: &I,
path: &str, path: &str,
args: OpRead, args: OpRead,
) -> Result<(RpRead, Arc<dyn ReadDyn>)> ) -> Result<(RpRead, Box<dyn Read>)>
where where
I: Access, I: Accessor,
{ {
if !can_cache(path) { if !can_cache(path) {
return inner.read(path, args).await.map(to_output_reader); return inner.read(path, args).await.map(to_output_reader);
} }
// FIXME: remove this block after opendal v0.47 released. let read_key = read_cache_key(path, &args);
let meta = inner.stat(path, OpStat::new()).await?;
let (rp, reader) = inner.read(path, args).await?; let read_result = self
let reader: ReadCacheReader<I> = ReadCacheReader { .mem_cache
path: Arc::new(path.to_string()), .try_get_with(
inner_reader: reader, read_key.clone(),
size: meta.into_metadata().content_length(), self.read_remote(inner, &read_key, path, args.clone()),
file_cache: self.file_cache.clone(), )
mem_cache: self.mem_cache.clone(), .await
}; .map_err(|e| OpendalError::new(e.kind(), &e.to_string()))?;
Ok((rp, Arc::new(reader)))
match read_result {
ReadResult::Success(_) => {
// There is a concurrency issue here: the local cache may be purged
// while reading, so we have to fall back to a remote read
match self.file_cache.read(&read_key, OpRead::default()).await {
Ok(ret) => {
OBJECT_STORE_LRU_CACHE_HIT
.with_label_values(&["success"])
.inc();
Ok(to_output_reader(ret))
} }
} Err(_) => {
OBJECT_STORE_LRU_CACHE_MISS.inc();
inner.read(path, args).await.map(to_output_reader)
}
}
}
ReadResult::NotFound => {
OBJECT_STORE_LRU_CACHE_HIT
.with_label_values(&["not_found"])
.inc();
pub struct ReadCacheReader<I: Access> { Err(OpendalError::new(
/// Path of the file ErrorKind::NotFound,
path: Arc<String>, &format!("File not found: {path}"),
/// Remote file reader. ))
inner_reader: I::Reader, }
/// FIXME: remove this field after opendal v0.47 released. }
/// }
/// OpenDAL's read_at takes `offset, limit` which means the underlying storage
/// services could return less data than limit. We store size here as a workaround.
///
/// This API has been refactored into `offset, size` instead. After opendal v0.47 is released,
/// we don't need this anymore.
size: u64,
/// Local file cache backend
file_cache: Operator,
/// Local memory cache to track local cache files
mem_cache: Cache<String, ReadResult>,
}
impl<I: Access> ReadCacheReader<I> { async fn try_write_cache<I>(&self, mut reader: I::Reader, read_key: &str) -> Result<usize>
/// TODO: we can return the Buffer directly to avoid another read from cache. where
async fn read_remote(&self, offset: u64, limit: usize) -> Result<ReadResult> { I: Accessor,
{
let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?;
let mut total = 0;
while let Some(bytes) = reader.next().await {
let bytes = &bytes?;
total += bytes.len();
writer.write(bytes).await?;
}
// Call `close` to ensure data is written.
writer.close().await?;
Ok(total)
}
/// Read the file from remote storage. If successful, write the content into the local cache.
async fn read_remote<I>(
&self,
inner: &I,
read_key: &str,
path: &str,
args: OpRead,
) -> Result<ReadResult>
where
I: Accessor,
{
OBJECT_STORE_LRU_CACHE_MISS.inc(); OBJECT_STORE_LRU_CACHE_MISS.inc();
let buf = self.inner_reader.read_at(offset, limit).await?; let (_, reader) = inner.read(path, args).await?;
let result = self.try_write_cache(buf, offset).await; let result = self.try_write_cache::<I>(reader, read_key).await;
match result { match result {
Ok(read_bytes) => { Ok(read_bytes) => {
@@ -242,59 +279,10 @@ impl<I: Access> ReadCacheReader<I> {
} }
} }
} }
async fn try_write_cache(&self, buf: Buffer, offset: u64) -> Result<usize> {
let size = buf.len();
let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _)));
self.file_cache.write(&read_key, buf).await?;
Ok(size)
}
}
impl<I: Access> Read for ReadCacheReader<I> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
let size = self.size.min(offset + limit as u64) - offset;
let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _)));
let read_result = self
.mem_cache
.try_get_with(read_key.clone(), self.read_remote(offset, limit))
.await
.map_err(|e| OpendalError::new(e.kind(), &e.to_string()))?;
match read_result {
ReadResult::Success(_) => {
// There is a concurrency issue here: the local cache may be purged
// while reading, so we have to fall back to a remote read
match self.file_cache.read(&read_key).await {
Ok(ret) => {
OBJECT_STORE_LRU_CACHE_HIT
.with_label_values(&["success"])
.inc();
Ok(ret)
}
Err(_) => {
OBJECT_STORE_LRU_CACHE_MISS.inc();
self.inner_reader.read_at(offset, limit).await
}
}
}
ReadResult::NotFound => {
OBJECT_STORE_LRU_CACHE_HIT
.with_label_values(&["not_found"])
.inc();
Err(OpendalError::new(
ErrorKind::NotFound,
&format!("File not found: {}", self.path),
))
}
}
}
} }
fn to_output_reader<R: Read + 'static>(input: (RpRead, R)) -> (RpRead, Reader) { fn to_output_reader<R: Read + 'static>(input: (RpRead, R)) -> (RpRead, Reader) {
(input.0, Arc::new(input.1)) (input.0, Box::new(input.1))
} }
#[cfg(test)] #[cfg(test)]
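At the core of both versions is `mem_cache.try_get_with(key, fut)`: concurrent readers of the same key share a single `read_remote` future, so a cold range is fetched from the remote store at most once, and a `NotFound` outcome is cached as well so repeated misses stay local. A minimal sketch of that deduplication pattern with moka; the key/value types and the stand-in loader are illustrative only:

    use std::sync::Arc;

    use moka::future::Cache;

    #[derive(Clone)]
    enum ReadResult {
        Success(usize),
        NotFound,
    }

    // Many callers may request the same key concurrently; the loader future
    // runs at most once per key and every caller receives the cached result.
    async fn load(
        cache: &Cache<String, ReadResult>,
        key: &str,
    ) -> Result<ReadResult, Arc<std::io::Error>> {
        cache
            .try_get_with(key.to_string(), async {
                // Stand-in for `read_remote`: fetch from object storage and
                // spill the bytes into the local file cache.
                Ok::<_, std::io::Error>(ReadResult::Success(4096))
            })
            .await
    }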


@@ -15,11 +15,16 @@
//! code originally from <https://github.com/apache/incubator-opendal/blob/main/core/src/layers/prometheus.rs>, with a tiny change to avoid a crash in a multi-threaded env //! code originally from <https://github.com/apache/incubator-opendal/blob/main/core/src/layers/prometheus.rs>, with a tiny change to avoid a crash in a multi-threaded env
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use std::io;
use std::task::{Context, Poll};
use async_trait::async_trait;
use bytes::Bytes;
use common_telemetry::debug; use common_telemetry::debug;
use futures::FutureExt;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use opendal::raw::*; use opendal::raw::*;
use opendal::{Buffer, ErrorKind}; use opendal::ErrorKind;
use prometheus::{ use prometheus::{
exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter_vec, exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter_vec,
Histogram, HistogramTimer, HistogramVec, IntCounterVec, Histogram, HistogramTimer, HistogramVec, IntCounterVec,
@@ -84,14 +89,14 @@ fn increment_errors_total(op: Operation, kind: ErrorKind) {
#[derive(Default, Debug, Clone)] #[derive(Default, Debug, Clone)]
pub struct PrometheusMetricsLayer; pub struct PrometheusMetricsLayer;
impl<A: Access> Layer<A> for PrometheusMetricsLayer { impl<A: Accessor> Layer<A> for PrometheusMetricsLayer {
type LayeredAccess = PrometheusAccess<A>; type LayeredAccessor = PrometheusAccessor<A>;
fn layer(&self, inner: A) -> Self::LayeredAccess { fn layer(&self, inner: A) -> Self::LayeredAccessor {
let meta = inner.info(); let meta = inner.info();
let scheme = meta.scheme(); let scheme = meta.scheme();
PrometheusAccess { PrometheusAccessor {
inner, inner,
scheme: scheme.to_string(), scheme: scheme.to_string(),
} }
@@ -99,12 +104,12 @@ impl<A: Access> Layer<A> for PrometheusMetricsLayer {
} }
#[derive(Clone)] #[derive(Clone)]
pub struct PrometheusAccess<A: Access> { pub struct PrometheusAccessor<A: Accessor> {
inner: A, inner: A,
scheme: String, scheme: String,
} }
impl<A: Access> Debug for PrometheusAccess<A> { impl<A: Accessor> Debug for PrometheusAccessor<A> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PrometheusAccessor") f.debug_struct("PrometheusAccessor")
.field("inner", &self.inner) .field("inner", &self.inner)
@@ -112,7 +117,8 @@ impl<A: Access> Debug for PrometheusAccess<A> {
} }
} }
impl<A: Access> LayeredAccess for PrometheusAccess<A> { #[async_trait]
impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> {
type Inner = A; type Inner = A;
type Reader = PrometheusMetricWrapper<A::Reader>; type Reader = PrometheusMetricWrapper<A::Reader>;
type BlockingReader = PrometheusMetricWrapper<A::BlockingReader>; type BlockingReader = PrometheusMetricWrapper<A::BlockingReader>;
@@ -151,20 +157,27 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
.with_label_values(&[&self.scheme, Operation::Read.into_static()]) .with_label_values(&[&self.scheme, Operation::Read.into_static()])
.start_timer(); .start_timer();
let (rp, r) = self.inner.read(path, args).await.map_err(|e| { self.inner
increment_errors_total(Operation::Read, e.kind()); .read(path, args)
e .map(|v| {
})?; v.map(|(rp, r)| {
(
Ok((
rp, rp,
PrometheusMetricWrapper::new( PrometheusMetricWrapper::new(
r, r,
Operation::Read, Operation::Read,
BYTES_TOTAL.with_label_values(&[&self.scheme, Operation::Read.into_static()]), BYTES_TOTAL
.with_label_values(&[&self.scheme, Operation::Read.into_static()]),
timer, timer,
), ),
)) )
})
})
.await
.map_err(|e| {
increment_errors_total(Operation::Read, e.kind());
e
})
} }
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
@@ -176,20 +189,27 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
.with_label_values(&[&self.scheme, Operation::Write.into_static()]) .with_label_values(&[&self.scheme, Operation::Write.into_static()])
.start_timer(); .start_timer();
let (rp, r) = self.inner.write(path, args).await.map_err(|e| { self.inner
increment_errors_total(Operation::Write, e.kind()); .write(path, args)
e .map(|v| {
})?; v.map(|(rp, r)| {
(
Ok((
rp, rp,
PrometheusMetricWrapper::new( PrometheusMetricWrapper::new(
r, r,
Operation::Write, Operation::Write,
BYTES_TOTAL.with_label_values(&[&self.scheme, Operation::Write.into_static()]), BYTES_TOTAL
.with_label_values(&[&self.scheme, Operation::Write.into_static()]),
timer, timer,
), ),
)) )
})
})
.await
.map_err(|e| {
increment_errors_total(Operation::Write, e.kind());
e
})
} }
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> { async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
@@ -441,46 +461,103 @@ impl<R> PrometheusMetricWrapper<R> {
} }
impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> { impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> { fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
self.inner.read_at(offset, limit).await.map_err(|err| { self.inner.poll_read(cx, buf).map(|res| match res {
increment_errors_total(self.op, err.kind()); Ok(bytes) => {
err self.bytes += bytes as u64;
Ok(bytes)
}
Err(e) => {
increment_errors_total(self.op, e.kind());
Err(e)
}
})
}
fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll<Result<u64>> {
self.inner.poll_seek(cx, pos).map(|res| match res {
Ok(n) => Ok(n),
Err(e) => {
increment_errors_total(self.op, e.kind());
Err(e)
}
})
}
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
self.inner.poll_next(cx).map(|res| match res {
Some(Ok(bytes)) => {
self.bytes += bytes.len() as u64;
Some(Ok(bytes))
}
Some(Err(e)) => {
increment_errors_total(self.op, e.kind());
Some(Err(e))
}
None => None,
}) })
} }
} }
impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> { impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
fn read_at(&self, offset: u64, limit: usize) -> opendal::Result<Buffer> { fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
self.inner.read_at(offset, limit).map_err(|err| { self.inner
.read(buf)
.map(|n| {
self.bytes += n as u64;
n
})
.map_err(|e| {
increment_errors_total(self.op, e.kind());
e
})
}
fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
self.inner.seek(pos).map_err(|err| {
increment_errors_total(self.op, err.kind()); increment_errors_total(self.op, err.kind());
err err
}) })
} }
fn next(&mut self) -> Option<Result<Bytes>> {
self.inner.next().map(|res| match res {
Ok(bytes) => {
self.bytes += bytes.len() as u64;
Ok(bytes)
}
Err(e) => {
increment_errors_total(self.op, e.kind());
Err(e)
}
})
}
} }
#[async_trait]
impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
async fn write(&mut self, bs: Buffer) -> Result<usize> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
match self.inner.write(bs).await { self.inner
Ok(n) => { .poll_write(cx, bs)
.map_ok(|n| {
self.bytes += n as u64; self.bytes += n as u64;
Ok(n) n
} })
Err(err) => { .map_err(|err| {
increment_errors_total(self.op, err.kind());
Err(err)
}
}
}
async fn close(&mut self) -> Result<()> {
self.inner.close().await.map_err(|err| {
increment_errors_total(self.op, err.kind()); increment_errors_total(self.op, err.kind());
err err
}) })
} }
async fn abort(&mut self) -> Result<()> { fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.inner.close().await.map_err(|err| { self.inner.poll_abort(cx).map_err(|err| {
increment_errors_total(self.op, err.kind());
err
})
}
fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.inner.poll_close(cx).map_err(|err| {
increment_errors_total(self.op, err.kind()); increment_errors_total(self.op, err.kind());
err err
}) })
@@ -488,7 +565,7 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
} }
impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> { impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> { fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
self.inner self.inner
.write(bs) .write(bs)
.map(|n| { .map(|n| {
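Every operation in this layer follows the same pattern: start a histogram timer labelled by scheme and operation, and on failure bump an error counter keyed by the error kind, while the reader/writer wrappers also accumulate a byte total. A small self-contained sketch of that instrumentation pattern with the prometheus crate; the metric names below are placeholders, not the ones registered in this file:

    use lazy_static::lazy_static;
    use prometheus::{register_histogram_vec, register_int_counter_vec, HistogramVec, IntCounterVec};

    lazy_static! {
        static ref DURATION: HistogramVec = register_histogram_vec!(
            "sketch_op_duration_seconds", "operation latency", &["scheme", "op"]
        )
        .unwrap();
        static ref ERRORS: IntCounterVec = register_int_counter_vec!(
            "sketch_op_errors_total", "operation errors", &["op", "kind"]
        )
        .unwrap();
    }

    async fn timed_read(scheme: &str) -> std::io::Result<Vec<u8>> {
        // The timer records the elapsed time when it is dropped.
        let _timer = DURATION.with_label_values(&[scheme, "read"]).start_timer();
        let result = fake_read().await;
        if let Err(e) = &result {
            let kind = format!("{:?}", e.kind());
            ERRORS.with_label_values(&["read", &kind]).inc();
        }
        result
    }

    async fn fake_read() -> std::io::Result<Vec<u8>> {
        Ok(vec![0u8; 16])
    }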


@@ -14,9 +14,8 @@
pub use opendal::raw::{normalize_path as raw_normalize_path, HttpClient}; pub use opendal::raw::{normalize_path as raw_normalize_path, HttpClient};
pub use opendal::{ pub use opendal::{
services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Lister, Metakey,
FuturesAsyncReader, FuturesAsyncWriter, Lister, Metakey, Operator as ObjectStore, Reader, Operator as ObjectStore, Reader, Result, Writer,
Result, Writer,
}; };
pub mod layers; pub mod layers;


@@ -22,6 +22,7 @@ use object_store::layers::LruCacheLayer;
use object_store::services::{Fs, S3}; use object_store::services::{Fs, S3};
use object_store::test_util::TempFolder; use object_store::test_util::TempFolder;
use object_store::{ObjectStore, ObjectStoreBuilder}; use object_store::{ObjectStore, ObjectStoreBuilder};
use opendal::raw::Accessor;
use opendal::services::{Azblob, Gcs, Oss}; use opendal::services::{Azblob, Gcs, Oss};
use opendal::{EntryMode, Operator, OperatorBuilder}; use opendal::{EntryMode, Operator, OperatorBuilder};
@@ -35,11 +36,11 @@ async fn test_object_crud(store: &ObjectStore) -> Result<()> {
// Read data from object; // Read data from object;
let bs = store.read(file_name).await?; let bs = store.read(file_name).await?;
assert_eq!("Hello, World!", String::from_utf8(bs.to_vec())?); assert_eq!("Hello, World!", String::from_utf8(bs)?);
// Read range from object; // Read range from object;
let bs = store.read_with(file_name).range(1..=11).await?; let bs = store.read_with(file_name).range(1..=11).await?;
assert_eq!("ello, World", String::from_utf8(bs.to_vec())?); assert_eq!("ello, World", String::from_utf8(bs)?);
// Get object's Metadata // Get object's Metadata
let meta = store.stat(file_name).await?; let meta = store.stat(file_name).await?;
@@ -76,7 +77,7 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> {
assert_eq!(p2, entries.first().unwrap().path()); assert_eq!(p2, entries.first().unwrap().path());
let content = store.read(p2).await?; let content = store.read(p2).await?;
assert_eq!("Hello, object2!", String::from_utf8(content.to_vec())?); assert_eq!("Hello, object2!", String::from_utf8(content)?);
store.delete(p2).await?; store.delete(p2).await?;
let entries = store.list("/").await?; let entries = store.list("/").await?;
@@ -235,9 +236,11 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
let _ = builder let _ = builder
.root(&cache_dir.path().to_string_lossy()) .root(&cache_dir.path().to_string_lossy())
.atomic_write_dir(&cache_dir.path().to_string_lossy()); .atomic_write_dir(&cache_dir.path().to_string_lossy());
let file_cache = Operator::new(builder).unwrap().finish(); let file_cache = Arc::new(builder.build().unwrap());
LruCacheLayer::new(file_cache, 32).await.unwrap() LruCacheLayer::new(Arc::new(file_cache.clone()), 32)
.await
.unwrap()
}; };
let store = store.layer(cache_layer.clone()); let store = store.layer(cache_layer.clone());
@@ -250,7 +253,10 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
Ok(()) Ok(())
} }
async fn assert_lru_cache(cache_layer: &LruCacheLayer, file_names: &[&str]) { async fn assert_lru_cache<C: Accessor + Clone>(
cache_layer: &LruCacheLayer<C>,
file_names: &[&str],
) {
for file_name in file_names { for file_name in file_names {
assert!(cache_layer.contains_file(file_name).await); assert!(cache_layer.contains_file(file_name).await);
} }
@@ -272,7 +278,7 @@ async fn assert_cache_files(
let bs = store.read(o.path()).await.unwrap(); let bs = store.read(o.path()).await.unwrap();
assert_eq!( assert_eq!(
file_contents[position], file_contents[position],
String::from_utf8(bs.to_vec())?, String::from_utf8(bs.clone())?,
"file content not match: {}", "file content not match: {}",
o.name() o.name()
); );
@@ -306,7 +312,9 @@ async fn test_object_store_cache_policy() -> Result<()> {
let cache_store = OperatorBuilder::new(file_cache.clone()).finish(); let cache_store = OperatorBuilder::new(file_cache.clone()).finish();
// create operator for cache dir to verify cache file // create operator for cache dir to verify cache file
let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).await.unwrap(); let cache_layer = LruCacheLayer::new(Arc::new(file_cache.clone()), 38)
.await
.unwrap();
let store = store.layer(cache_layer.clone()); let store = store.layer(cache_layer.clone());
// create several object handlers. // create several object handlers.
@@ -378,7 +386,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
// instead of returning `NotFound` during the reader creation. // instead of returning `NotFound` during the reader creation.
// The entry count is 4, because we have the p2 `NotFound` cache. // The entry count is 4, because we have the p2 `NotFound` cache.
assert!(store.read_with(p2).range(0..4).await.is_err()); assert!(store.read_with(p2).range(0..4).await.is_err());
assert_eq!(cache_layer.read_cache_stat().await, (3, 35)); assert_eq!(cache_layer.read_cache_stat().await, (4, 35));
assert_cache_files( assert_cache_files(
&cache_store, &cache_store,
@@ -406,7 +414,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
assert!(store.read(p2).await.is_err()); assert!(store.read(p2).await.is_err());
// Read p1 with range `1..`, the existing p1 with range `0..` must be evicted. // Read p1 with range `1..`, the existing p1 with range `0..` must be evicted.
let _ = store.read_with(p1).range(1..15).await.unwrap(); let _ = store.read_with(p1).range(1..15).await.unwrap();
assert_eq!(cache_layer.read_cache_stat().await, (3, 34)); assert_eq!(cache_layer.read_cache_stat().await, (4, 34));
assert_cache_files( assert_cache_files(
&cache_store, &cache_store,
&[ &[
@@ -434,7 +442,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
drop(cache_layer); drop(cache_layer);
// Test recover // Test recover
let cache_layer = LruCacheLayer::new(cache_store, 38).await.unwrap(); let cache_layer = LruCacheLayer::new(Arc::new(file_cache), 38).await.unwrap();
// The p2 `NotFound` cache will not be recovered // The p2 `NotFound` cache will not be recovered
assert_eq!(cache_layer.read_cache_stat().await, (3, 34)); assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
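For orientation in these tests: a local `Fs` accessor is built as the cache backend, wrapped by `LruCacheLayer::new(_, capacity)` and layered onto the store, after which `read_cache_stat()` reports `(entry_count, total_bytes)` as asserted above. A rough sketch of that wiring, following the shape on the right-hand side of this hunk; the path and the tiny 32-byte capacity are placeholders:

    use std::sync::Arc;

    use object_store::layers::LruCacheLayer;
    use object_store::services::Fs;
    use object_store::ObjectStore;

    // Attach a small LRU read cache, backed by a local Fs accessor, to a store.
    async fn with_read_cache(store: ObjectStore) -> ObjectStore {
        let mut builder = Fs::default();
        let _ = builder
            .root("/tmp/read-cache")
            .atomic_write_dir("/tmp/read-cache");
        let file_cache = builder.build().unwrap();
        let cache_layer = LruCacheLayer::new(Arc::new(file_cache), 32).await.unwrap();
        store.layer(cache_layer)
    }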


@@ -56,7 +56,6 @@ store-api.workspace = true
substrait.workspace = true substrait.workspace = true
table.workspace = true table.workspace = true
tokio.workspace = true tokio.workspace = true
tokio-util.workspace = true
tonic.workspace = true tonic.workspace = true
[dev-dependencies] [dev-dependencies]


@@ -20,7 +20,7 @@ use client::{Output, OutputData, OutputMeta};
use common_base::readable_size::ReadableSize; use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::{CsvConfigBuilder, CsvFormat, CsvOpener}; use common_datasource::file_format::csv::{CsvConfigBuilder, CsvFormat, CsvOpener};
use common_datasource::file_format::json::{JsonFormat, JsonOpener}; use common_datasource::file_format::json::{JsonFormat, JsonOpener};
use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader, ReaderAdapter}; use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader};
use common_datasource::file_format::{FileFormat, Format}; use common_datasource::file_format::{FileFormat, Format};
use common_datasource::lister::{Lister, Source}; use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{build_backend, parse_url}; use common_datasource::object_store::{build_backend, parse_url};
@@ -46,7 +46,6 @@ use session::context::QueryContextRef;
use snafu::ResultExt; use snafu::ResultExt;
use table::requests::{CopyTableRequest, InsertRequest}; use table::requests::{CopyTableRequest, InsertRequest};
use table::table_reference::TableReference; use table::table_reference::TableReference;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use crate::error::{self, IntoVectorsSnafu, Result}; use crate::error::{self, IntoVectorsSnafu, Result};
use crate::statement::StatementExecutor; use crate::statement::StatementExecutor;
@@ -147,16 +146,10 @@ impl StatementExecutor {
path, path,
}), }),
Format::Parquet(_) => { Format::Parquet(_) => {
let meta = object_store
.stat(&path)
.await
.context(error::ReadObjectSnafu { path: &path })?;
let mut reader = object_store let mut reader = object_store
.reader(&path) .reader(&path)
.await .await
.context(error::ReadObjectSnafu { path: &path })? .context(error::ReadObjectSnafu { path: &path })?;
.into_futures_async_read(0..meta.content_length())
.compat();
let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default()) let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
.await .await
.context(error::ReadParquetMetadataSnafu)?; .context(error::ReadParquetMetadataSnafu)?;
@@ -168,17 +161,12 @@ impl StatementExecutor {
}) })
} }
Format::Orc(_) => { Format::Orc(_) => {
let meta = object_store
.stat(&path)
.await
.context(error::ReadObjectSnafu { path: &path })?;
let reader = object_store let reader = object_store
.reader(&path) .reader(&path)
.await .await
.context(error::ReadObjectSnafu { path: &path })?; .context(error::ReadObjectSnafu { path: &path })?;
let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length())) let schema = infer_orc_schema(reader)
.await .await
.context(error::ReadOrcSnafu)?; .context(error::ReadOrcSnafu)?;
@@ -291,17 +279,11 @@ impl StatementExecutor {
))) )))
} }
FileMetadata::Parquet { metadata, path, .. } => { FileMetadata::Parquet { metadata, path, .. } => {
let meta = object_store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = object_store let reader = object_store
.reader_with(path) .reader_with(path)
.chunk(DEFAULT_READ_BUFFER) .buffer(DEFAULT_READ_BUFFER)
.await .await
.context(error::ReadObjectSnafu { path })? .context(error::ReadObjectSnafu { path })?;
.into_futures_async_read(0..meta.content_length())
.compat();
let builder = let builder =
ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone()); ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
let stream = builder let stream = builder
@@ -320,18 +302,12 @@ impl StatementExecutor {
))) )))
} }
FileMetadata::Orc { path, .. } => { FileMetadata::Orc { path, .. } => {
let meta = object_store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = object_store let reader = object_store
.reader_with(path) .reader_with(path)
.chunk(DEFAULT_READ_BUFFER) .buffer(DEFAULT_READ_BUFFER)
.await .await
.context(error::ReadObjectSnafu { path })?; .context(error::ReadObjectSnafu { path })?;
let stream = let stream = new_orc_stream_reader(reader)
new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
.await .await
.context(error::ReadOrcSnafu)?; .context(error::ReadOrcSnafu)?;
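Both sides of this hunk ultimately hand an async reader to the Parquet/ORC stream builders; the difference is whether the opendal reader first needs a `stat` plus the `into_futures_async_read(..).compat()` adaptation. A minimal, self-contained sketch of the Parquet side, standing in a plain tokio file for the object-store reader (this assumes the parquet crate's `async` feature and is not the executor's actual code):

    use futures::TryStreamExt;
    use parquet::arrow::ParquetRecordBatchStreamBuilder;

    // Stream record batches out of a Parquet file; any reader implementing
    // parquet's AsyncFileReader works (tokio::fs::File qualifies via the blanket impl).
    async fn count_rows(path: &str) -> Result<usize, Box<dyn std::error::Error>> {
        let file = tokio::fs::File::open(path).await?;
        let stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
        let batches: Vec<_> = stream.try_collect().await?;
        Ok(batches.iter().map(|b| b.num_rows()).sum())
    }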


@@ -16,12 +16,14 @@ use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use axum::http::HeaderValue;
use common_base::Plugins; use common_base::Plugins;
use common_telemetry::metric::{convert_metric_to_write_request, MetricFilter}; use common_telemetry::metric::{convert_metric_to_write_request, MetricFilter};
use common_telemetry::{error, info}; use common_telemetry::{error, info};
use common_time::Timestamp; use common_time::Timestamp;
use hyper::HeaderMap;
use prost::Message; use prost::Message;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use reqwest::header::HeaderName;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use session::context::QueryContextBuilder; use session::context::QueryContextBuilder;
use snafu::{ensure, ResultExt}; use snafu::{ensure, ResultExt};
@@ -113,7 +115,7 @@ impl ExportMetricsTask {
} }
); );
} }
let mut headers = HeaderMap::new(); let mut headers = reqwest::header::HeaderMap::new();
if let Some(remote_write) = &config.remote_write { if let Some(remote_write) = &config.remote_write {
ensure!( ensure!(
!remote_write.url.is_empty(), !remote_write.url.is_empty(),
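Both sides of this hunk end up constructing a `reqwest::header::HeaderMap` for the remote-write client; they differ only in which crates the auxiliary header types are imported from. A generic sketch of populating such a map; the keys and values are placeholders, not the headers this task actually sets:

    use reqwest::header::{HeaderMap, HeaderName, HeaderValue};

    // Build a header map for the remote-write client from string pairs.
    fn build_headers(pairs: &[(&str, &str)]) -> Result<HeaderMap, Box<dyn std::error::Error>> {
        let mut headers = HeaderMap::new();
        for (key, value) in pairs {
            headers.insert(
                HeaderName::from_bytes(key.as_bytes())?,
                HeaderValue::from_str(value)?,
            );
        }
        Ok(headers)
    }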


@@ -14,10 +14,10 @@
use std::collections::HashMap; use std::collections::HashMap;
use axum::headers::HeaderValue;
use axum::response::{IntoResponse, Response}; use axum::response::{IntoResponse, Response};
use axum::Json; use axum::Json;
use common_query::Output; use common_query::Output;
use reqwest::header::HeaderValue;
use schemars::JsonSchema; use schemars::JsonSchema;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;


@@ -32,7 +32,6 @@
use std::convert::TryFrom; use std::convert::TryFrom;
use std::net::{SocketAddr, TcpListener}; use std::net::{SocketAddr, TcpListener};
use std::str::FromStr;
use axum::body::HttpBody; use axum::body::HttpBody;
use axum::BoxError; use axum::BoxError;
@@ -170,15 +169,7 @@ impl RequestBuilder {
HeaderValue: TryFrom<V>, HeaderValue: TryFrom<V>,
<HeaderValue as TryFrom<V>>::Error: Into<http::Error>, <HeaderValue as TryFrom<V>>::Error: Into<http::Error>,
{ {
// TODO(tisonkun): revert once http bump to 1.x
let key: HeaderName = key.try_into().map_err(Into::into).unwrap();
let key = reqwest::header::HeaderName::from_bytes(key.as_ref()).unwrap();
let value: HeaderValue = value.try_into().map_err(Into::into).unwrap();
let value = reqwest::header::HeaderValue::from_bytes(value.as_bytes()).unwrap();
self.builder = self.builder.header(key, value); self.builder = self.builder.header(key, value);
self self
} }
@@ -219,19 +210,12 @@ impl TestResponse {
/// Get the response status. /// Get the response status.
pub fn status(&self) -> StatusCode { pub fn status(&self) -> StatusCode {
StatusCode::from_u16(self.response.status().as_u16()).unwrap() self.response.status()
} }
/// Get the response headers. /// Get the response headers.
pub fn headers(&self) -> http::HeaderMap { pub fn headers(&self) -> &http::HeaderMap {
// TODO(tisonkun): revert once http bump to 1.x self.response.headers()
let mut headers = http::HeaderMap::new();
for (key, value) in self.response.headers() {
let key = HeaderName::from_str(key.as_str()).unwrap();
let value = HeaderValue::from_bytes(value.as_bytes()).unwrap();
headers.insert(key, value);
}
headers
} }
/// Get the response in chunks. /// Get the response in chunks.