Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-22 22:20:02 +00:00
Compare commits: 4650935fc0 ... avoid-quer (3 commits)
Commits:
- 1bfba48755
- 457998f0fe
- b02c256157
Cargo.lock (generated): 234 changed lines
@@ -83,7 +83,7 @@ dependencies = [
"axum",
"bytes",
"cfg-if",
"http 0.2.12",
"http",
"indexmap 1.9.3",
"schemars",
"serde",
@@ -764,9 +764,9 @@ dependencies = [
"bytes",
"futures-util",
"headers",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.28",
"http",
"http-body",
"hyper",
"itoa",
"matchit",
"memchr",
@@ -794,8 +794,8 @@ dependencies = [
"async-trait",
"bytes",
"futures-util",
"http 0.2.12",
"http-body 0.4.6",
"http",
"http-body",
"mime",
"rustversion",
"tower-layer",
@@ -1854,7 +1854,7 @@ dependencies = [
"common-telemetry",
"common-test-util",
"common-version",
"hyper 0.14.28",
"hyper",
"reqwest",
"serde",
"tempfile",
@@ -1963,7 +1963,7 @@ dependencies = [
"futures-util",
"hex",
"humantime-serde",
"hyper 0.14.28",
"hyper",
"itertools 0.10.5",
"lazy_static",
"moka",
@@ -3525,6 +3525,15 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"

[[package]]
name = "encoding_rs"
version = "0.8.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59"
dependencies = [
"cfg-if",
]

[[package]]
name = "endian-type"
version = "0.1.2"
@@ -3595,7 +3604,7 @@ name = "etcd-client"
version = "0.12.4"
source = "git+https://github.com/MichaelScofield/etcd-client.git?rev=4c371e9b3ea8e0a8ee2f9cbd7ded26e54a45df3b#4c371e9b3ea8e0a8ee2f9cbd7ded26e54a45df3b"
dependencies = [
"http 0.2.12",
"http",
"prost 0.12.4",
"tokio",
"tokio-stream",
@@ -4210,7 +4219,7 @@ dependencies = [
"futures-core",
"futures-sink",
"futures-util",
"http 0.2.12",
"http",
"indexmap 2.2.6",
"slab",
"tokio",
@@ -4294,7 +4303,7 @@ dependencies = [
"base64 0.21.7",
"bytes",
"headers-core",
"http 0.2.12",
"http",
"httpdate",
"mime",
"sha1",
@@ -4306,7 +4315,7 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
dependencies = [
"http 0.2.12",
"http",
]

[[package]]
@@ -4409,17 +4418,6 @@ dependencies = [
"itoa",
]

[[package]]
name = "http"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
dependencies = [
"bytes",
"fnv",
"itoa",
]

[[package]]
name = "http-body"
version = "0.4.6"
@@ -4427,30 +4425,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2"
dependencies = [
"bytes",
"http 0.2.12",
"pin-project-lite",
]

[[package]]
name = "http-body"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643"
dependencies = [
"bytes",
"http 1.1.0",
]

[[package]]
name = "http-body-util"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d"
dependencies = [
"bytes",
"futures-core",
"http 1.1.0",
"http-body 1.0.0",
"http",
"pin-project-lite",
]

@@ -4607,8 +4582,8 @@ dependencies = [
"futures-core",
"futures-util",
"h2",
"http 0.2.12",
"http-body 0.4.6",
"http",
"http-body",
"httparse",
"httpdate",
"itoa",
@@ -4620,40 +4595,18 @@ dependencies = [
"want",
]

[[package]]
name = "hyper"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"httparse",
"itoa",
"pin-project-lite",
"smallvec",
"tokio",
"want",
]

[[package]]
name = "hyper-rustls"
version = "0.26.0"
version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"
checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590"
dependencies = [
"futures-util",
"http 1.1.0",
"hyper 1.3.1",
"hyper-util",
"rustls 0.22.4",
"rustls-pki-types",
"http",
"hyper",
"rustls 0.21.12",
"tokio",
"tokio-rustls 0.25.0",
"tower-service",
"tokio-rustls 0.24.1",
]

[[package]]
@@ -4662,32 +4615,12 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
dependencies = [
"hyper 0.14.28",
"hyper",
"pin-project-lite",
"tokio",
"tokio-io-timeout",
]

[[package]]
name = "hyper-util"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"hyper 1.3.1",
"pin-project-lite",
"socket2 0.5.7",
"tokio",
"tower",
"tower-service",
"tracing",
]

[[package]]
name = "iana-time-zone"
version = "0.1.60"
@@ -5671,7 +5604,7 @@ dependencies = [
"etcd-client",
"futures",
"h2",
"http-body 0.4.6",
"http-body",
"humantime",
"humantime-serde",
"itertools 0.10.5",
@@ -6433,6 +6366,7 @@ name = "object-store"
version = "0.8.1"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"common-telemetry",
"common-test-util",
@@ -6481,21 +6415,20 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"

[[package]]
name = "opendal"
version = "0.46.0"
version = "0.45.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "328c4992328e8965e6a6ef102d38438b5fdc7d9b9107eda2377ba05379d9d544"
checksum = "52c17c077f23fa2d2c25d9d22af98baa43b8bbe2ef0de80cf66339aa70401467"
dependencies = [
"anyhow",
"async-trait",
"backon",
"base64 0.22.1",
"base64 0.21.7",
"bytes",
"chrono",
"crc32c",
"flagset",
"futures",
"getrandom",
"http 1.1.0",
"http",
"log",
"md-5",
"once_cell",
@@ -6583,7 +6516,7 @@ checksum = "f24cda83b20ed2433c68241f918d0f6fdec8b1d43b7a9590ab4420c5095ca930"
dependencies = [
"async-trait",
"futures-core",
"http 0.2.12",
"http",
"opentelemetry 0.21.0",
"opentelemetry-proto 0.4.0",
"opentelemetry-semantic-conventions",
@@ -6720,7 +6653,6 @@ dependencies = [
"substrait 0.8.1",
"table",
"tokio",
"tokio-util",
"tonic 0.11.0",
]

@@ -8264,20 +8196,20 @@ dependencies = [

[[package]]
name = "reqsign"
version = "0.15.0"
version = "0.14.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01edce6b6c31a16ebc7525ac58c747a6d78bbce33e76bbebd350d6bc25b23e06"
checksum = "43e319d9de9ff4d941abf4ac718897118b0fe04577ea3f8e0f5788971784eef5"
dependencies = [
"anyhow",
"async-trait",
"base64 0.22.1",
"base64 0.21.7",
"chrono",
"form_urlencoded",
"getrandom",
"hex",
"hmac",
"home",
"http 1.1.0",
"http",
"jsonwebtoken",
"log",
"once_cell",
@@ -8286,7 +8218,7 @@ dependencies = [
"rand",
"reqwest",
"rsa 0.9.6",
"rust-ini 0.21.0",
"rust-ini 0.20.0",
"serde",
"serde_json",
"sha1",
@@ -8295,20 +8227,20 @@ dependencies = [

[[package]]
name = "reqwest"
version = "0.12.4"
version = "0.11.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10"
checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62"
dependencies = [
"base64 0.22.1",
"base64 0.21.7",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.3.1",
"h2",
"http",
"http-body",
"hyper",
"hyper-rustls",
"hyper-util",
"ipnet",
"js-sys",
"log",
@@ -8317,16 +8249,16 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls 0.22.4",
"rustls 0.21.12",
"rustls-native-certs",
"rustls-pemfile 2.1.2",
"rustls-pki-types",
"rustls-pemfile 1.0.4",
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper",
"system-configuration",
"tokio",
"tokio-rustls 0.25.0",
"tokio-rustls 0.24.1",
"tokio-util",
"tower-service",
"url",
@@ -8334,8 +8266,7 @@ dependencies = [
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots 0.26.1",
"winreg 0.52.0",
"winreg 0.50.0",
]

[[package]]
@@ -8601,13 +8532,12 @@ dependencies = [

[[package]]
name = "rust-ini"
version = "0.21.0"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d625ed57d8f49af6cfa514c42e1a71fadcff60eb0b1c517ff82fe41aa025b41"
checksum = "3e0698206bcb8882bf2a9ecb4c1e7785db57ff052297085a6efd4fe42302068a"
dependencies = [
"cfg-if",
"ordered-multimap 0.7.3",
"trim-in-place",
]

[[package]]
@@ -8749,13 +8679,12 @@ dependencies = [

[[package]]
name = "rustls-native-certs"
version = "0.7.0"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792"
checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00"
dependencies = [
"openssl-probe",
"rustls-pemfile 2.1.2",
"rustls-pki-types",
"rustls-pemfile 1.0.4",
"schannel",
"security-framework",
]
@@ -9588,10 +9517,10 @@ dependencies = [
"hashbrown 0.14.5",
"headers",
"hostname",
"http 0.2.12",
"http-body 0.4.6",
"http",
"http-body",
"humantime-serde",
"hyper 0.14.28",
"hyper",
"influxdb_line_protocol",
"itertools 0.10.5",
"lazy_static",
@@ -11215,9 +11144,9 @@ dependencies = [
"futures-core",
"futures-util",
"h2",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.28",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
@@ -11243,9 +11172,9 @@ dependencies = [
"bytes",
"flate2",
"h2",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.28",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
@@ -11347,8 +11276,8 @@ dependencies = [
"bytes",
"futures-core",
"futures-util",
"http 0.2.12",
"http-body 0.4.6",
"http",
"http-body",
"http-range-header",
"httpdate",
"iri-string",
@@ -11591,12 +11520,6 @@ dependencies = [
"tree-sitter",
]

[[package]]
name = "trim-in-place"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc"

[[package]]
name = "triomphe"
version = "0.1.11"
@@ -12262,15 +12185,6 @@ version = "0.25.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1"

[[package]]
name = "webpki-roots"
version = "0.26.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009"
dependencies = [
"rustls-pki-types",
]

[[package]]
name = "which"
version = "4.4.2"
@@ -12637,9 +12551,9 @@ dependencies = [

[[package]]
name = "winreg"
version = "0.52.0"
version = "0.50.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5"
checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1"
dependencies = [
"cfg-if",
"windows-sys 0.48.0",
@@ -146,7 +146,7 @@ raft-engine = { version = "0.4.1", default-features = false }
rand = "0.8"
regex = "1.8"
regex-automata = { version = "0.4" }
reqwest = { version = "0.12", default-features = false, features = [
reqwest = { version = "0.11", default-features = false, features = [
"json",
"rustls-tls-native-roots",
"stream",
@@ -92,44 +92,34 @@ impl CompressionType {
macro_rules! impl_compression_type {
($(($enum_item:ident, $prefix:ident)),*) => {
paste::item! {
use bytes::{Buf, BufMut, BytesMut};

impl CompressionType {
pub async fn encode<B: Buf>(&self, mut content: B) -> io::Result<Vec<u8>> {
pub async fn encode(&self, content: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
match self {
$(
CompressionType::$enum_item => {
let mut buffer = Vec::with_capacity(content.remaining());
let mut buffer = Vec::with_capacity(content.as_ref().len());
let mut encoder = write::[<$prefix Encoder>]::new(&mut buffer);
encoder.write_all_buf(&mut content).await?;
encoder.write_all(content.as_ref()).await?;
encoder.shutdown().await?;
Ok(buffer)
}
)*
CompressionType::Uncompressed => {
let mut bs = BytesMut::with_capacity(content.remaining());
bs.put(content);
Ok(bs.to_vec())
},
CompressionType::Uncompressed => Ok(content.as_ref().to_vec()),
}
}

pub async fn decode<B: Buf>(&self, mut content: B) -> io::Result<Vec<u8>> {
pub async fn decode(&self, content: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
match self {
$(
CompressionType::$enum_item => {
let mut buffer = Vec::with_capacity(content.remaining() * 2);
let mut buffer = Vec::with_capacity(content.as_ref().len() * 2);
let mut encoder = write::[<$prefix Decoder>]::new(&mut buffer);
encoder.write_all_buf(&mut content).await?;
encoder.write_all(content.as_ref()).await?;
encoder.shutdown().await?;
Ok(buffer)
}
)*
CompressionType::Uncompressed => {
let mut bs = BytesMut::with_capacity(content.remaining());
bs.put(content);
Ok(bs.to_vec())
},
CompressionType::Uncompressed => Ok(content.as_ref().to_vec()),
}
}

@@ -161,13 +151,13 @@ macro_rules! impl_compression_type {
$(
#[tokio::test]
async fn [<test_ $enum_item:lower _compression>]() {
let string = "foo_bar".as_bytes();
let string = "foo_bar".as_bytes().to_vec();
let compress = CompressionType::$enum_item
.encode(string)
.encode(&string)
.await
.unwrap();
let decompress = CompressionType::$enum_item
.decode(compress.as_slice())
.decode(&compress)
.await
.unwrap();
assert_eq!(decompress, string);
@@ -175,13 +165,13 @@ macro_rules! impl_compression_type {

#[tokio::test]
async fn test_uncompression() {
let string = "foo_bar".as_bytes();
let string = "foo_bar".as_bytes().to_vec();
let compress = CompressionType::Uncompressed
.encode(string)
.encode(&string)
.await
.unwrap();
let decompress = CompressionType::Uncompressed
.decode(compress.as_slice())
.decode(&compress)
.await
.unwrap();
assert_eq!(decompress, string);
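The hunk above moves `encode`/`decode` from a `bytes::Buf` bound to plain byte slices. A minimal usage sketch of the slice-based API (a hypothetical call site; it assumes the `Gzip` variant that the repo's `impl_compression_type!` invocation is expected to generate):

    // Anything `impl AsRef<[u8]>` works: Vec<u8>, &[u8], String, ...
    let data = b"foo_bar".to_vec();
    let compressed = CompressionType::Gzip.encode(&data).await?;
    let restored = CompressionType::Gzip.decode(&compressed).await?;
    assert_eq!(restored, data);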
@@ -36,7 +36,6 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;

use self::csv::CsvFormat;
use self::json::JsonFormat;
@@ -147,8 +146,7 @@ pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
let reader = object_store
.reader(&path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?
.into_bytes_stream(..);
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let mut upstream = compression_type.convert_stream(reader).fuse();

@@ -205,7 +203,6 @@ pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
.writer_with(&path)
.concurrent(concurrency)
.await
.map(|v| v.into_futures_async_write().compat_write())
.context(error::WriteObjectSnafu { path })
});
@@ -29,7 +29,6 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use derive_builder::Builder;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::SyncIoBridge;

use super::stream_to_file;
@@ -165,16 +164,10 @@ impl FileOpener for CsvOpener {
#[async_trait]
impl FileFormat for CsvFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.compat();
.context(error::ReadObjectSnafu { path })?;

let decoded = self.compression_type.convert_async_read(reader);
@@ -31,7 +31,6 @@ use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::SendableRecordBatchStream;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::SyncIoBridge;

use super::stream_to_file;
@@ -83,16 +82,10 @@ impl Default for JsonFormat {
#[async_trait]
impl FileFormat for JsonFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.compat();
.context(error::ReadObjectSnafu { path })?;

let decoded = self.compression_type.convert_async_read(reader);
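The `infer_schema` hunks above (and their siblings throughout this diff) all revert the same pattern. Side by side, the two reader shapes being swapped, both taken from lines in this diff (illustrative only):

    // opendal 0.46: readers are range based and need a tokio adapter
    let meta = store.stat(path).await?;
    let reader = store
        .reader(path)
        .await?
        .into_futures_async_read(0..meta.content_length())
        .compat(); // via tokio_util::compat::FuturesAsyncReadCompatExt

    // opendal 0.45: the returned Reader already implements AsyncRead
    let reader = store.reader(path).await?;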
@@ -16,17 +16,15 @@ use std::sync::Arc;

use arrow_schema::{ArrowError, Schema, SchemaRef};
use async_trait::async_trait;
use bytes::Bytes;
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
use datafusion::error::{DataFusionError, Result as DfResult};
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures::{StreamExt, TryStreamExt};
use object_store::ObjectStore;
use orc_rust::arrow_reader::ArrowReaderBuilder;
use orc_rust::async_arrow_reader::ArrowStreamReader;
use orc_rust::reader::AsyncChunkReader;
use snafu::ResultExt;
use tokio::io::{AsyncRead, AsyncSeek};

use crate::error::{self, Result};
use crate::file_format::FileFormat;
@@ -34,49 +32,18 @@ use crate::file_format::FileFormat;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct OrcFormat;

#[derive(Clone)]
pub struct ReaderAdapter {
reader: object_store::Reader,
len: u64,
}

impl ReaderAdapter {
pub fn new(reader: object_store::Reader, len: u64) -> Self {
Self { reader, len }
}
}

impl AsyncChunkReader for ReaderAdapter {
fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
async move { Ok(self.len) }.boxed()
}

fn get_bytes(
&mut self,
offset_from_start: u64,
length: u64,
) -> BoxFuture<'_, std::io::Result<Bytes>> {
async move {
let bytes = self
.reader
.read(offset_from_start..offset_from_start + length)
.await?;
Ok(bytes.to_bytes())
}
.boxed()
}
}

pub async fn new_orc_stream_reader(
reader: ReaderAdapter,
) -> Result<ArrowStreamReader<ReaderAdapter>> {
pub async fn new_orc_stream_reader<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<ArrowStreamReader<R>> {
let reader_build = ArrowReaderBuilder::try_new_async(reader)
.await
.context(error::OrcReaderSnafu)?;
Ok(reader_build.build_async())
}

pub async fn infer_orc_schema(reader: ReaderAdapter) -> Result<Schema> {
pub async fn infer_orc_schema<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<Schema> {
let reader = new_orc_stream_reader(reader).await?;
Ok(reader.schema().as_ref().clone())
}
@@ -84,15 +51,13 @@ pub async fn infer_orc_schema(reader: ReaderAdapter) -> Result<Schema> {
#[async_trait]
impl FileFormat for OrcFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length())).await?;

let schema = infer_orc_schema(reader).await?;

Ok(schema)
}
}
@@ -132,22 +97,14 @@ impl FileOpener for OrcOpener {
};
let projection = self.projection.clone();
Ok(Box::pin(async move {
let path = meta.location().to_string();

let meta = object_store
.stat(&path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let reader = object_store
.reader(&path)
.reader(meta.location().to_string().as_str())
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let stream_reader =
new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let stream_reader = new_orc_stream_reader(reader)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let stream =
RecordBatchStreamTypeAdapter::new(projected_schema, stream_reader, projection);
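With the generic `AsyncRead + AsyncSeek` bound restored, any seekable async source can feed the ORC reader again, not only the removed `ReaderAdapter`. A sketch under that assumption (the path is a placeholder; `tokio::fs::File` satisfies the bound):

    let file = tokio::fs::File::open("/tmp/example.orc").await?;
    let reader = new_orc_stream_reader(file).await?;
    // ArrowStreamReader exposes the inferred Arrow schema, as used above.
    let schema = reader.schema().as_ref().clone();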
@@ -29,11 +29,10 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::future::BoxFuture;
use futures::StreamExt;
use object_store::{FuturesAsyncReader, ObjectStore};
use object_store::{ObjectStore, Reader, Writer};
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;
use snafu::ResultExt;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};

use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder, LazyBufferedWriter};
use crate::error::{self, Result};
@@ -46,16 +45,10 @@ pub struct ParquetFormat {}
#[async_trait]
impl FileFormat for ParquetFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let mut reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.compat();
.context(error::ReadObjectSnafu { path })?;

let metadata = reader
.get_metadata()
@@ -105,7 +98,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {

pub struct LazyParquetFileReader {
object_store: ObjectStore,
reader: Option<Compat<FuturesAsyncReader>>,
reader: Option<Reader>,
path: String,
}

@@ -121,13 +114,7 @@ impl LazyParquetFileReader {
/// Must initialize the reader, or throw an error from the future.
async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> {
if self.reader.is_none() {
let meta = self.object_store.stat(&self.path).await?;
let reader = self
.object_store
.reader(&self.path)
.await?
.into_futures_async_read(0..meta.content_length())
.compat();
let reader = self.object_store.reader(&self.path).await?;
self.reader = Some(reader);
}

@@ -180,17 +167,16 @@ pub struct BufferedWriter {
}

type InnerBufferedWriter = LazyBufferedWriter<
Compat<object_store::FuturesAsyncWriter>,
object_store::Writer,
ArrowWriter<SharedBuffer>,
impl Fn(String) -> BoxFuture<'static, Result<Compat<object_store::FuturesAsyncWriter>>>,
impl Fn(String) -> BoxFuture<'static, Result<Writer>>,
>;

impl BufferedWriter {
fn make_write_factory(
store: ObjectStore,
concurrency: usize,
) -> impl Fn(String) -> BoxFuture<'static, Result<Compat<object_store::FuturesAsyncWriter>>>
{
) -> impl Fn(String) -> BoxFuture<'static, Result<Writer>> {
move |path| {
let store = store.clone();
Box::pin(async move {
@@ -198,7 +184,6 @@ impl BufferedWriter {
.writer_with(&path)
.concurrent(concurrency)
.await
.map(|v| v.into_futures_async_write().compat_write())
.context(error::WriteObjectSnafu { path })
})
}
@@ -120,7 +120,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi

let written = tmp_store.read(&output_path).await.unwrap();
let origin = store.read(origin_path).await.unwrap();
assert_eq_lines(written.to_vec(), origin.to_vec());
assert_eq_lines(written, origin);
}

pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usize) -> usize) {
@@ -158,7 +158,7 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usiz

let written = tmp_store.read(&output_path).await.unwrap();
let origin = store.read(origin_path).await.unwrap();
assert_eq_lines(written.to_vec(), origin.to_vec());
assert_eq_lines(written, origin);
}

// Ignore the CRLF difference across operating systems.
@@ -179,7 +179,7 @@ impl StateStore for ObjectStateStore {
))
})
.context(ListStateSnafu { path: key })?;
yield (key.into(), value.to_vec());
yield (key.into(), value);
}
}
});
@@ -20,6 +20,7 @@ mod gcs;
mod oss;
mod s3;

use std::sync::Arc;
use std::time::Duration;
use std::{env, path};

@@ -28,7 +29,7 @@ use common_telemetry::info;
use object_store::layers::{LruCacheLayer, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{HttpClient, ObjectStore};
use object_store::{HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;

use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
@@ -106,13 +107,13 @@ async fn create_object_store_with_cache(
if let Some(path) = cache_path {
let atomic_temp_dir = join_dir(path, ".tmp/");
clean_temp_dir(&atomic_temp_dir)?;
let mut builder = Fs::default();
builder.root(path).atomic_write_dir(&atomic_temp_dir);
let cache_store = ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish();
let cache_store = Fs::default()
.root(path)
.atomic_write_dir(&atomic_temp_dir)
.build()
.context(error::InitBackendSnafu)?;

let cache_layer = LruCacheLayer::new(cache_store, cache_capacity.0 as usize)
let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
.await
.context(error::InitBackendSnafu)?;
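For orientation, the new cache wiring above reduced to a sketch (paths and capacity are placeholders; the builder chain follows the opendal 0.45 `ObjectStoreBuilder` convention used in this hunk):

    let cache_store = Fs::default()
        .root("/var/greptimedb/cache")
        .atomic_write_dir("/var/greptimedb/cache/.tmp/")
        .build()?; // yields a raw backend accessor, not an Operator
    let cache_layer = LruCacheLayer::new(Arc::new(cache_store), 256 << 20).await?;
    let object_store = object_store.layer(cache_layer);

Note the design shift: in 0.45 the layer owns a raw accessor (hence the `Arc::new`), whereas the 0.46 code handed it a finished `Operator`.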
@@ -71,8 +71,7 @@ impl FileRegionManifest {
let bs = object_store
.read(path)
.await
.context(LoadRegionManifestSnafu { region_id })?
.to_vec();
.context(LoadRegionManifestSnafu { region_id })?;
Self::decode(bs.as_slice())
}
@@ -171,9 +171,10 @@ impl MetricEngineInner {

// check if the logical region already exist
if self
.metadata_region
.is_logical_region_exists(metadata_region_id, logical_region_id)
.await?
.state
.read()
.unwrap()
.is_logical_region_exists(logical_region_id)
{
info!("Create a existing logical region {logical_region_id}. Skipped");
return Ok(data_region_id);
@@ -104,7 +104,7 @@ impl MetricEngineInner {
// check if the region exists
let data_region_id = to_data_region_id(physical_region_id);
let state = self.state.read().unwrap();
if !state.is_logical_region_exist(logical_region_id) {
if !state.is_logical_region_exists(logical_region_id) {
error!("Trying to write to an nonexistent region {logical_region_id}");
return LogicalRegionNotFoundSnafu {
region_id: logical_region_id,
@@ -149,7 +149,7 @@ impl MetricEngineState {
Ok(exist)
}

pub fn is_logical_region_exist(&self, logical_region_id: RegionId) -> bool {
pub fn is_logical_region_exists(&self, logical_region_id: RegionId) -> bool {
self.logical_regions().contains_key(&logical_region_id)
}
}
@@ -139,17 +139,6 @@ impl MetadataRegion {
Ok(())
}

/// Check if the given logical region exists.
pub async fn is_logical_region_exists(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) -> Result<bool> {
let region_id = utils::to_metadata_region_id(physical_region_id);
let region_key = Self::concat_region_key(logical_region_id);
self.exists(region_id, &region_key).await
}

/// Check if the given column exists. Return the semantic type if exists.
pub async fn column_semantic_type(
&self,
@@ -669,10 +658,6 @@ mod test {
.add_logical_region(physical_region_id, logical_region_id)
.await
.unwrap();
assert!(metadata_region
.is_logical_region_exists(physical_region_id, logical_region_id)
.await
.unwrap());

// add it again
assert!(metadata_region
src/mito2/src/cache/file_cache.rs (vendored): 19 changed lines
@@ -112,10 +112,6 @@ impl FileCache {
self.memory_index.insert(key, value).await;
}

pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
self.memory_index.get(&key).await
}

/// Reads a file from the cache.
pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
// We must use `get()` to update the estimator of the cache.
@@ -376,6 +372,7 @@ fn parse_index_key(name: &str) -> Option<IndexKey> {
#[cfg(test)]
mod tests {
use common_test_util::temp_dir::create_temp_dir;
use futures::AsyncReadExt;
use object_store::services::Fs;

use super::*;
@@ -454,9 +451,10 @@ mod tests {
.await;

// Read file content.
let reader = cache.reader(key).await.unwrap();
let buf = reader.read(..).await.unwrap().to_vec();
assert_eq!("hello", String::from_utf8(buf).unwrap());
let mut reader = cache.reader(key).await.unwrap();
let mut buf = String::new();
reader.read_to_string(&mut buf).await.unwrap();
assert_eq!("hello", buf);

// Get weighted size.
cache.memory_index.run_pending_tasks().await;
@@ -551,9 +549,10 @@ mod tests {

for (i, file_id) in file_ids.iter().enumerate() {
let key = IndexKey::new(region_id, *file_id, file_type);
let reader = cache.reader(key).await.unwrap();
let buf = reader.read(..).await.unwrap().to_vec();
assert_eq!(i.to_string(), String::from_utf8(buf).unwrap());
let mut reader = cache.reader(key).await.unwrap();
let mut buf = String::new();
reader.read_to_string(&mut buf).await.unwrap();
assert_eq!(i.to_string(), buf);
}
}
src/mito2/src/cache/write_cache.rs (vendored): 25 changed lines
@@ -19,7 +19,6 @@ use std::time::Duration;

use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use futures::AsyncWriteExt;
use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore;
use snafu::ResultExt;
@@ -176,27 +175,19 @@ impl WriteCache {
}])
.start_timer();

let cached_value = self
.file_cache
.local_store()
.stat(&cache_path)
.await
.context(error::OpenDalSnafu)?;
let reader = self
.file_cache
.local_store()
.reader(&cache_path)
.await
.context(error::OpenDalSnafu)?
.into_futures_async_read(0..cached_value.content_length());
.context(error::OpenDalSnafu)?;

let mut writer = remote_store
.writer_with(upload_path)
.chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
.buffer(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
.concurrent(DEFAULT_WRITE_CONCURRENCY)
.await
.context(error::OpenDalSnafu)?
.into_futures_async_write();
.context(error::OpenDalSnafu)?;

let bytes_written =
futures::io::copy(reader, &mut writer)
@@ -208,11 +199,7 @@ impl WriteCache {
})?;

// Must close to upload all data.
writer.close().await.context(error::UploadSnafu {
region_id,
file_id,
file_type,
})?;
writer.close().await.context(error::OpenDalSnafu)?;

UPLOAD_BYTES_TOTAL.inc_by(bytes_written);

@@ -328,7 +315,7 @@ mod tests {
.read(&write_cache.file_cache.cache_file_path(key))
.await
.unwrap();
assert_eq!(remote_data.to_vec(), cache_data.to_vec());
assert_eq!(remote_data, cache_data);

// Check write cache contains the index key
let index_key = IndexKey::new(region_id, file_id, FileType::Puffin);
@@ -339,7 +326,7 @@ mod tests {
.read(&write_cache.file_cache.cache_file_path(index_key))
.await
.unwrap();
assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());
assert_eq!(remote_index_data, cache_index_data);
}

#[tokio::test]
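The upload path in this hunk now streams the locally cached file straight into a buffered, concurrent remote writer. Its skeleton, with literals standing in for `DEFAULT_WRITE_BUFFER_SIZE` and `DEFAULT_WRITE_CONCURRENCY` (a sketch, assuming the 0.45 writer builder shown above):

    let reader = local_store.reader(&cache_path).await?;
    let mut writer = remote_store
        .writer_with(&upload_path)
        .buffer(8 * 1024 * 1024) // multipart buffer size
        .concurrent(8)           // concurrent part uploads
        .await?;
    let bytes_written = futures::io::copy(reader, &mut writer).await?;
    writer.close().await?; // close() is what commits the upload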
@@ -497,7 +497,7 @@ impl ManifestObjectStore {
}
};

let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data.to_vec())?;
let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?;

debug!(
"Load checkpoint in path: {}, metadata: {:?}",
@@ -509,11 +509,7 @@ impl ManifestObjectStore {

#[cfg(test)]
pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
self.object_store
.read(path)
.await
.context(OpenDalSnafu)
.map(|v| v.to_vec())
self.object_store.read(path).await.context(OpenDalSnafu)
}

#[cfg(test)]
@@ -121,17 +121,9 @@ impl SstIndexApplier {
return Ok(None);
};

let Some(indexed_value) = file_cache
.get(IndexKey::new(self.region_id, file_id, FileType::Puffin))
.await
else {
return Ok(None);
};

Ok(file_cache
.reader(IndexKey::new(self.region_id, file_id, FileType::Puffin))
.await
.map(|v| v.into_futures_async_read(0..indexed_value.file_size as u64))
.map(PuffinFileReader::new))
}

@@ -198,13 +190,7 @@ mod tests {
let region_dir = "region_dir".to_string();
let path = location::index_file_path(&region_dir, file_id);

let mut puffin_writer = PuffinFileWriter::new(
object_store
.writer(&path)
.await
.unwrap()
.into_futures_async_write(),
);
let mut puffin_writer = PuffinFileWriter::new(object_store.writer(&path).await.unwrap());
puffin_writer
.add_blob(Blob {
blob_type: INDEX_BLOB_TYPE.to_string(),
@@ -250,13 +236,7 @@ mod tests {
let region_dir = "region_dir".to_string();
let path = location::index_file_path(&region_dir, file_id);

let mut puffin_writer = PuffinFileWriter::new(
object_store
.writer(&path)
.await
.unwrap()
.into_futures_async_write(),
);
let mut puffin_writer = PuffinFileWriter::new(object_store.writer(&path).await.unwrap());
puffin_writer
.add_blob(Blob {
blob_type: "invalid_blob_type".to_string(),
@@ -26,8 +26,6 @@ use crate::error::{OpenDalSnafu, Result};

/// A wrapper around [`ObjectStore`] that adds instrumentation for monitoring
/// metrics such as bytes read, bytes written, and the number of seek operations.
///
/// TODO: Consider refactor InstrumentedStore to use async in trait instead of AsyncRead.
#[derive(Clone)]
pub(crate) struct InstrumentedStore {
/// The underlying object store.
@@ -60,14 +58,8 @@ impl InstrumentedStore {
read_byte_count: &'a IntCounter,
read_count: &'a IntCounter,
seek_count: &'a IntCounter,
) -> Result<InstrumentedAsyncRead<'a, object_store::FuturesAsyncReader>> {
let meta = self.object_store.stat(path).await.context(OpenDalSnafu)?;
let reader = self
.object_store
.reader(path)
.await
.context(OpenDalSnafu)?
.into_futures_async_read(0..meta.content_length());
) -> Result<InstrumentedAsyncRead<'a, object_store::Reader>> {
let reader = self.object_store.reader(path).await.context(OpenDalSnafu)?;
Ok(InstrumentedAsyncRead::new(
reader,
read_byte_count,
@@ -85,21 +77,15 @@ impl InstrumentedStore {
write_byte_count: &'a IntCounter,
write_count: &'a IntCounter,
flush_count: &'a IntCounter,
) -> Result<InstrumentedAsyncWrite<'a, object_store::FuturesAsyncWriter>> {
) -> Result<InstrumentedAsyncWrite<'a, object_store::Writer>> {
let writer = match self.write_buffer_size {
Some(size) => self
.object_store
.writer_with(path)
.chunk(size)
.buffer(size)
.await
.context(OpenDalSnafu)?
.into_futures_async_write(),
None => self
.object_store
.writer(path)
.await
.context(OpenDalSnafu)?
.into_futures_async_write(),
.context(OpenDalSnafu)?,
None => self.object_store.writer(path).await.context(OpenDalSnafu)?,
};
Ok(InstrumentedAsyncWrite::new(
writer,
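The `InstrumentedAsyncRead`/`InstrumentedAsyncWrite` wrappers returned here follow the usual poll-delegation pattern. A self-contained sketch of the read half (simplified; the real type also counts seeks and reports to prometheus counters):

    use std::pin::Pin;
    use std::task::{Context, Poll};

    use futures::io::AsyncRead;

    struct CountingRead<R> {
        inner: R,
        bytes_read: u64,
    }

    impl<R: AsyncRead + Unpin> AsyncRead for CountingRead<R> {
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<std::io::Result<usize>> {
            let this = self.get_mut();
            // Delegate to the wrapped reader, then count what it produced.
            let poll = Pin::new(&mut this.inner).poll_read(cx, buf);
            if let Poll::Ready(Ok(n)) = &poll {
                this.bytes_read += *n as u64;
            }
            poll
        }
    }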
@@ -121,7 +121,7 @@ async fn fetch_ranges_seq(
.read_with(&file_path)
.range(range.start..range.end)
.call()?;
Ok::<_, object_store::Error>(data.to_bytes())
Ok::<_, object_store::Error>(Bytes::from(data))
})
.collect::<object_store::Result<Vec<_>>>()
};
@@ -141,7 +141,7 @@ async fn fetch_ranges_concurrent(
let future_read = object_store.read_with(file_path);
handles.push(async move {
let data = future_read.range(range.start..range.end).await?;
Ok::<_, object_store::Error>(data.to_bytes())
Ok::<_, object_store::Error>(Bytes::from(data))
});
}
let results = futures::future::try_join_all(handles).await?;
@@ -164,7 +164,7 @@ where
}
}

// https://github.com/apache/opendal/blob/v0.46.0/core/src/raw/tokio_util.rs#L21-L24
// https://github.com/apache/incubator-opendal/blob/7144ab1ca2409dff0c324bfed062ce985997f8ce/core/src/raw/tokio_util.rs#L21-L23
/// Parse tokio error into opendal::Error.
fn new_task_join_error(e: tokio::task::JoinError) -> object_store::Error {
object_store::Error::new(ErrorKind::Unexpected, "tokio task join failed").set_source(e)
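Both fetch paths hand back `Bytes` per range; the concurrent one fans out one read future per range and joins them. The shape in isolation (illustrative):

    let mut handles = Vec::with_capacity(ranges.len());
    for range in ranges {
        let read = object_store.read_with(file_path);
        handles.push(async move { read.range(range.start..range.end).await });
    }
    // try_join_all keeps results aligned with the input order and
    // short-circuits on the first error.
    let parts = futures::future::try_join_all(handles).await?;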
@@ -85,8 +85,7 @@ impl<'a> MetadataLoader<'a> {
.read_with(path)
.range(buffer_start..file_size)
.await
.context(error::OpenDalSnafu)?
.to_vec();
.context(error::OpenDalSnafu)?;
let buffer_len = buffer.len();

let mut footer = [0; 8];
@@ -130,8 +129,7 @@ impl<'a> MetadataLoader<'a> {
.read_with(path)
.range(metadata_start..(file_size - FOOTER_SIZE as u64))
.await
.context(error::OpenDalSnafu)?
.to_vec();
.context(error::OpenDalSnafu)?;

let metadata = decode_metadata(&data).map_err(|e| {
error::InvalidParquetSnafu {
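Context for these two reads: a Parquet file ends with an 8-byte footer, a little-endian u32 metadata length followed by the magic bytes "PAR1", so the loader fetches a tail buffer first and then derives where the metadata block starts. The arithmetic as a sketch:

    const FOOTER_SIZE: u64 = 8;

    // `footer` holds the last 8 bytes of the file.
    fn metadata_range(file_size: u64, footer: [u8; 8]) -> (u64, u64) {
        assert_eq!(&footer[4..], b"PAR1"); // Parquet magic
        let metadata_len = u32::from_le_bytes(footer[..4].try_into().unwrap()) as u64;
        let metadata_start = file_size - FOOTER_SIZE - metadata_len;
        (metadata_start, file_size - FOOTER_SIZE)
    }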
@@ -16,7 +16,6 @@

use std::time::Duration;

use bytes::Bytes;
use common_telemetry::{error, info, warn};
use futures::TryStreamExt;
use object_store::util::join_path;
@@ -51,7 +50,7 @@ impl<S> RegionWorkerLoop<S> {
region
.access_layer
.object_store()
.write(&marker_path, Bytes::new())
.write(&marker_path, vec![])
.await
.context(OpenDalSnafu)
.inspect_err(|e| {
@@ -11,21 +11,23 @@ workspace = true
services-memory = ["opendal/services-memory"]

[dependencies]
async-trait = "0.1"
bytes.workspace = true
common-telemetry.workspace = true
futures.workspace = true
lazy_static.workspace = true
md5 = "0.7"
moka = { workspace = true, features = ["future"] }
opendal = { version = "0.46", features = [
opendal = { version = "0.45", features = [
"layers-tracing",
"rustls",
"services-azblob",
"services-fs",
"services-gcs",
"services-http",
"services-oss",
"services-s3",
] }
], default-features = false }
prometheus.workspace = true
uuid.workspace = true

@@ -33,4 +35,5 @@ uuid.workspace = true
anyhow = "1.0"
common-telemetry.workspace = true
common-test-util.workspace = true
opendal = { version = "0.45", features = ["services-memory"] }
tokio.workspace = true
@@ -14,26 +14,27 @@

use std::sync::Arc;

use opendal::raw::oio::ReadDyn;
use async_trait::async_trait;
use opendal::raw::oio::Read;
use opendal::raw::{
Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
Accessor, Layer, LayeredAccessor, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
RpWrite,
};
use opendal::{Operator, Result};
use opendal::Result;
mod read_cache;
use common_telemetry::info;
use read_cache::ReadCache;

/// An opendal layer with local LRU file cache supporting.
#[derive(Clone)]
pub struct LruCacheLayer {
pub struct LruCacheLayer<C: Clone> {
// The read cache
read_cache: ReadCache,
read_cache: ReadCache<C>,
}

impl LruCacheLayer {
impl<C: Accessor + Clone> LruCacheLayer<C> {
/// Create a `[LruCacheLayer]` with local file cache and capacity in bytes.
pub async fn new(file_cache: Operator, capacity: usize) -> Result<Self> {
pub async fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
let read_cache = ReadCache::new(file_cache, capacity);
let (entries, bytes) = read_cache.recover_cache().await?;

@@ -56,11 +57,11 @@ impl LruCacheLayer {
}
}

impl<I: Access> Layer<I> for LruCacheLayer {
type LayeredAccess = LruCacheAccess<I>;
impl<I: Accessor, C: Accessor + Clone> Layer<I> for LruCacheLayer<C> {
type LayeredAccessor = LruCacheAccessor<I, C>;

fn layer(&self, inner: I) -> Self::LayeredAccess {
LruCacheAccess {
fn layer(&self, inner: I) -> Self::LayeredAccessor {
LruCacheAccessor {
inner,
read_cache: self.read_cache.clone(),
}
@@ -68,14 +69,15 @@ impl<I: Access> Layer<I> for LruCacheLayer {
}

#[derive(Debug)]
pub struct LruCacheAccess<I> {
pub struct LruCacheAccessor<I, C: Clone> {
inner: I,
read_cache: ReadCache,
read_cache: ReadCache<C>,
}

impl<I: Access> LayeredAccess for LruCacheAccess<I> {
#[async_trait]
impl<I: Accessor, C: Accessor + Clone> LayeredAccessor for LruCacheAccessor<I, C> {
type Inner = I;
type Reader = Arc<dyn ReadDyn>;
type Reader = Box<dyn Read>;
type BlockingReader = I::BlockingReader;
type Writer = I::Writer;
type BlockingWriter = I::BlockingWriter;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::debug;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use futures::FutureExt;
|
||||
use moka::future::Cache;
|
||||
use moka::notification::ListenerFuture;
|
||||
use opendal::raw::oio::{Read, ReadDyn, Reader};
|
||||
use opendal::raw::{Access, BytesRange, OpRead, OpStat, RpRead};
|
||||
use opendal::{Buffer, Error as OpendalError, ErrorKind, Operator, Result};
|
||||
use opendal::raw::oio::{ListExt, Read, ReadExt, Reader, WriteExt};
|
||||
use opendal::raw::{Accessor, OpDelete, OpList, OpRead, OpStat, OpWrite, RpRead};
|
||||
use opendal::{Error as OpendalError, ErrorKind, Result};
|
||||
|
||||
use crate::metrics::{
|
||||
OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT,
|
||||
@@ -52,22 +52,26 @@ fn can_cache(path: &str) -> bool {
|
||||
}
|
||||
|
||||
/// Generate an unique cache key for the read path and range.
|
||||
fn read_cache_key(path: &str, range: BytesRange) -> String {
|
||||
format!("{:x}.cache-{}", md5::compute(path), range.to_header())
|
||||
fn read_cache_key(path: &str, args: &OpRead) -> String {
|
||||
format!(
|
||||
"{:x}.cache-{}",
|
||||
md5::compute(path),
|
||||
args.range().to_header()
|
||||
)
|
||||
}
|
||||
|
||||
/// Local read cache for files in object storage
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct ReadCache {
|
||||
pub(crate) struct ReadCache<C: Clone> {
|
||||
/// Local file cache backend
|
||||
file_cache: Operator,
|
||||
file_cache: Arc<C>,
|
||||
/// Local memory cache to track local cache files
|
||||
mem_cache: Cache<String, ReadResult>,
|
||||
}
|
||||
|
||||
impl ReadCache {
|
||||
impl<C: Accessor + Clone> ReadCache<C> {
|
||||
/// Create a [`ReadCache`] with capacity in bytes.
|
||||
pub(crate) fn new(file_cache: Operator, capacity: usize) -> Self {
|
||||
pub(crate) fn new(file_cache: Arc<C>, capacity: usize) -> Self {
|
||||
let file_cache_cloned = file_cache.clone();
|
||||
let eviction_listener =
|
||||
move |read_key: Arc<String>, read_result: ReadResult, cause| -> ListenerFuture {
|
||||
@@ -79,7 +83,7 @@ impl ReadCache {
|
||||
if let ReadResult::Success(size) = read_result {
|
||||
OBJECT_STORE_LRU_CACHE_BYTES.sub(size as i64);
|
||||
|
||||
let result = file_cache_cloned.delete(&read_key).await;
|
||||
let result = file_cache_cloned.delete(&read_key, OpDelete::new()).await;
|
||||
debug!(
|
||||
"Deleted local cache file `{}`, result: {:?}, cause: {:?}.",
|
||||
read_key, result, cause
|
||||
@@ -129,17 +133,17 @@ impl ReadCache {
|
||||
/// Recover existing cache items from `file_cache` to `mem_cache`.
|
||||
/// Return entry count and total approximate entry size in bytes.
|
||||
pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
|
||||
let mut pager = self.file_cache.lister("/").await?;
|
||||
let (_, mut pager) = self.file_cache.list("/", OpList::default()).await?;
|
||||
|
||||
while let Some(entry) = pager.next().await.transpose()? {
|
||||
while let Some(entry) = pager.next().await? {
|
||||
let read_key = entry.path();
|
||||
|
||||
// We can't retrieve the metadata from `[opendal::raw::oio::Entry]` directly,
|
||||
// because it's private field.
|
||||
let size = {
|
||||
let stat = self.file_cache.stat(read_key).await?;
|
||||
let stat = self.file_cache.stat(read_key, OpStat::default()).await?;
|
||||
|
||||
stat.content_length()
|
||||
stat.into_metadata().content_length()
|
||||
};
|
||||
|
||||
OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
|
||||
@@ -155,7 +159,8 @@ impl ReadCache {
|
||||
/// Returns true when the read cache contains the specific file.
|
||||
pub(crate) async fn contains_file(&self, path: &str) -> bool {
|
||||
self.mem_cache.run_pending_tasks().await;
|
||||
self.mem_cache.contains_key(path) && self.file_cache.stat(path).await.is_ok()
|
||||
self.mem_cache.contains_key(path)
|
||||
&& self.file_cache.stat(path, OpStat::default()).await.is_ok()
|
||||
}
|
||||
|
||||
/// Read from a specific path using the OpRead operation.
|
||||
@@ -168,54 +173,86 @@ impl ReadCache {
|
||||
inner: &I,
|
||||
path: &str,
|
||||
args: OpRead,
|
||||
) -> Result<(RpRead, Arc<dyn ReadDyn>)>
|
||||
) -> Result<(RpRead, Box<dyn Read>)>
|
||||
where
|
||||
I: Access,
|
||||
I: Accessor,
|
||||
{
|
||||
if !can_cache(path) {
|
||||
return inner.read(path, args).await.map(to_output_reader);
|
||||
}
|
||||
|
||||
// FIXME: remove this block after opendal v0.47 released.
|
||||
let meta = inner.stat(path, OpStat::new()).await?;
|
||||
let (rp, reader) = inner.read(path, args).await?;
|
||||
let reader: ReadCacheReader<I> = ReadCacheReader {
|
||||
path: Arc::new(path.to_string()),
|
||||
inner_reader: reader,
|
||||
size: meta.into_metadata().content_length(),
|
||||
file_cache: self.file_cache.clone(),
|
||||
mem_cache: self.mem_cache.clone(),
|
||||
};
|
||||
Ok((rp, Arc::new(reader)))
|
||||
let read_key = read_cache_key(path, &args);
|
||||
|
||||
let read_result = self
|
||||
.mem_cache
|
||||
.try_get_with(
|
||||
read_key.clone(),
|
||||
self.read_remote(inner, &read_key, path, args.clone()),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| OpendalError::new(e.kind(), &e.to_string()))?;
|
||||
|
||||
match read_result {
|
||||
ReadResult::Success(_) => {
|
||||
// There is a concurrent issue here, the local cache may be purged
|
||||
// while reading, we have to fallback to remote read
|
||||
match self.file_cache.read(&read_key, OpRead::default()).await {
|
||||
Ok(ret) => {
|
||||
OBJECT_STORE_LRU_CACHE_HIT
|
||||
.with_label_values(&["success"])
|
||||
.inc();
|
||||
Ok(to_output_reader(ret))
|
||||
}
|
||||
Err(_) => {
|
||||
OBJECT_STORE_LRU_CACHE_MISS.inc();
|
||||
inner.read(path, args).await.map(to_output_reader)
|
||||
}
|
||||
}
|
||||
}
|
||||
ReadResult::NotFound => {
|
||||
OBJECT_STORE_LRU_CACHE_HIT
|
||||
.with_label_values(&["not_found"])
|
||||
.inc();
|
||||
|
||||
Err(OpendalError::new(
|
||||
ErrorKind::NotFound,
|
||||
&format!("File not found: {path}"),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ReadCacheReader<I: Access> {
|
||||
/// Path of the file
|
||||
path: Arc<String>,
|
||||
/// Remote file reader.
|
||||
inner_reader: I::Reader,
|
||||
/// FIXME: remove this field after opendal v0.47 released.
|
||||
///
|
||||
/// OpenDAL's read_at takes `offset, limit` which means the underlying storage
|
||||
/// services could return less data than limit. We store size here as a workaround.
|
||||
///
|
||||
/// This API has been refactor into `offset, size` instead. After opendal v0.47 released,
|
||||
/// we don't need this anymore.
|
||||
size: u64,
|
||||
/// Local file cache backend
|
||||
file_cache: Operator,
|
||||
/// Local memory cache to track local cache files
|
||||
mem_cache: Cache<String, ReadResult>,
|
||||
}
|
||||
async fn try_write_cache<I>(&self, mut reader: I::Reader, read_key: &str) -> Result<usize>
|
||||
where
|
||||
I: Accessor,
|
||||
{
|
||||
let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?;
|
||||
let mut total = 0;
|
||||
while let Some(bytes) = reader.next().await {
|
||||
let bytes = &bytes?;
|
||||
total += bytes.len();
|
||||
writer.write(bytes).await?;
|
||||
}
|
||||
// Call `close` to ensure data is written.
|
||||
writer.close().await?;
|
||||
Ok(total)
|
||||
}
|
||||
|
||||
impl<I: Access> ReadCacheReader<I> {
    /// TODO: we can return the Buffer directly to avoid another read from cache.
    async fn read_remote(&self, offset: u64, limit: usize) -> Result<ReadResult> {
    /// Read the file from remote storage. If it succeeds, write the content into the local cache.
    async fn read_remote<I>(
        &self,
        inner: &I,
        read_key: &str,
        path: &str,
        args: OpRead,
    ) -> Result<ReadResult>
    where
        I: Accessor,
    {
        OBJECT_STORE_LRU_CACHE_MISS.inc();

        let buf = self.inner_reader.read_at(offset, limit).await?;
        let result = self.try_write_cache(buf, offset).await;
        let (_, reader) = inner.read(path, args).await?;
        let result = self.try_write_cache::<I>(reader, read_key).await;

        match result {
            Ok(read_bytes) => {
@@ -242,59 +279,10 @@ impl<I: Access> ReadCacheReader<I> {
            }
        }
    }
    async fn try_write_cache(&self, buf: Buffer, offset: u64) -> Result<usize> {
        let size = buf.len();
        let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _)));
        self.file_cache.write(&read_key, buf).await?;
        Ok(size)
    }
}
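Both `try_write_cache` variants key the cache file off `read_cache_key`, which is defined elsewhere in this layer. A purely hypothetical shape for it, assuming it just encodes the path plus the byte range into a file-safe name (the real implementation may well differ):

use opendal::raw::BytesRange;

// Hypothetical: derive a cache-file name from path + requested range.
fn read_cache_key(path: &str, range: BytesRange) -> String {
    format!("{}.cache-{}", path.replace('/', "_"), range)
}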
impl<I: Access> Read for ReadCacheReader<I> {
    async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
        let size = self.size.min(offset + limit as u64) - offset;
        let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _)));

        let read_result = self
            .mem_cache
            .try_get_with(read_key.clone(), self.read_remote(offset, limit))
            .await
            .map_err(|e| OpendalError::new(e.kind(), &e.to_string()))?;

        match read_result {
            ReadResult::Success(_) => {
                // There is a concurrency issue here: the local cache may be purged
                // while reading, so we have to fall back to a remote read.
                match self.file_cache.read(&read_key).await {
                    Ok(ret) => {
                        OBJECT_STORE_LRU_CACHE_HIT
                            .with_label_values(&["success"])
                            .inc();
                        Ok(ret)
                    }
                    Err(_) => {
                        OBJECT_STORE_LRU_CACHE_MISS.inc();
                        self.inner_reader.read_at(offset, limit).await
                    }
                }
            }
            ReadResult::NotFound => {
                OBJECT_STORE_LRU_CACHE_HIT
                    .with_label_values(&["not_found"])
                    .inc();

                Err(OpendalError::new(
                    ErrorKind::NotFound,
                    &format!("File not found: {}", self.path),
                ))
            }
        }
    }
}
fn to_output_reader<R: Read + 'static>(input: (RpRead, R)) -> (RpRead, Reader) {
    (input.0, Arc::new(input.1))
    (input.0, Box::new(input.1))
}

#[cfg(test)]
@@ -15,11 +15,16 @@
//! code originally from <https://github.com/apache/incubator-opendal/blob/main/core/src/layers/prometheus.rs>, with a tiny change to avoid crashes in a multi-threaded environment

use std::fmt::{Debug, Formatter};
use std::io;
use std::task::{Context, Poll};

use async_trait::async_trait;
use bytes::Bytes;
use common_telemetry::debug;
use futures::FutureExt;
use lazy_static::lazy_static;
use opendal::raw::*;
use opendal::{Buffer, ErrorKind};
use opendal::ErrorKind;
use prometheus::{
    exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter_vec,
    Histogram, HistogramTimer, HistogramVec, IntCounterVec,
@@ -84,14 +89,14 @@ fn increment_errors_total(op: Operation, kind: ErrorKind) {
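The imports above (`histogram_opts`, `exponential_buckets`, the `register_*` macros) feed this layer's global metric families. A sketch of how such metrics are typically declared with `lazy_static` (the metric names here are illustrative, not necessarily the ones this file uses):

use lazy_static::lazy_static;
use prometheus::{
    exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter_vec,
    HistogramVec, IntCounterVec,
};

lazy_static! {
    // Latency histogram with exponential buckets from 10ms upward.
    static ref REQUESTS_DURATION_SECONDS: HistogramVec = register_histogram_vec!(
        histogram_opts!(
            "object_store_requests_duration_seconds",
            "object store request duration in seconds",
            exponential_buckets(0.01, 2.0, 16).unwrap()
        ),
        &["scheme", "operation"]
    )
    .unwrap();

    // Error counter labeled by scheme, operation, and error kind.
    static ref ERRORS_TOTAL: IntCounterVec = register_int_counter_vec!(
        "object_store_errors_total",
        "object store errors total",
        &["scheme", "operation", "kind"]
    )
    .unwrap();
}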
#[derive(Default, Debug, Clone)]
pub struct PrometheusMetricsLayer;

impl<A: Access> Layer<A> for PrometheusMetricsLayer {
    type LayeredAccess = PrometheusAccess<A>;
impl<A: Accessor> Layer<A> for PrometheusMetricsLayer {
    type LayeredAccessor = PrometheusAccessor<A>;

    fn layer(&self, inner: A) -> Self::LayeredAccess {
    fn layer(&self, inner: A) -> Self::LayeredAccessor {
        let meta = inner.info();
        let scheme = meta.scheme();

        PrometheusAccess {
        PrometheusAccessor {
            inner,
            scheme: scheme.to_string(),
        }
@@ -99,12 +104,12 @@ impl<A: Access> Layer<A> for PrometheusMetricsLayer {
}

#[derive(Clone)]
pub struct PrometheusAccess<A: Access> {
pub struct PrometheusAccessor<A: Accessor> {
    inner: A,
    scheme: String,
}

impl<A: Access> Debug for PrometheusAccess<A> {
impl<A: Accessor> Debug for PrometheusAccessor<A> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PrometheusAccessor")
            .field("inner", &self.inner)
@@ -112,7 +117,8 @@ impl<A: Access> Debug for PrometheusAccess<A> {
    }
}
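Whichever side of the rename applies (`PrometheusAccess` on the v0.46 side, `PrometheusAccessor` on the v0.45 side), the layer is attached the same way as any opendal layer. A minimal usage sketch, assuming an `Fs` backend and that the layer type above is in scope (the root path is a placeholder):

use opendal::{services::Fs, Operator, Result};

fn build_store() -> Result<Operator> {
    let mut builder = Fs::default();
    builder.root("/tmp/greptimedb");

    // `layer` wraps the inner accessor; every operation now records metrics.
    Ok(Operator::new(builder)?
        .layer(PrometheusMetricsLayer)
        .finish())
}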
impl<A: Access> LayeredAccess for PrometheusAccess<A> {
#[async_trait]
impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> {
    type Inner = A;
    type Reader = PrometheusMetricWrapper<A::Reader>;
    type BlockingReader = PrometheusMetricWrapper<A::BlockingReader>;
@@ -151,20 +157,27 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
            .with_label_values(&[&self.scheme, Operation::Read.into_static()])
            .start_timer();

        let (rp, r) = self.inner.read(path, args).await.map_err(|e| {
            increment_errors_total(Operation::Read, e.kind());
            e
        })?;

        Ok((
            rp,
            PrometheusMetricWrapper::new(
                r,
                Operation::Read,
                BYTES_TOTAL.with_label_values(&[&self.scheme, Operation::Read.into_static()]),
                timer,
            ),
        ))
        self.inner
            .read(path, args)
            .map(|v| {
                v.map(|(rp, r)| {
                    (
                        rp,
                        PrometheusMetricWrapper::new(
                            r,
                            Operation::Read,
                            BYTES_TOTAL
                                .with_label_values(&[&self.scheme, Operation::Read.into_static()]),
                            timer,
                        ),
                    )
                })
            })
            .await
            .map_err(|e| {
                increment_errors_total(Operation::Read, e.kind());
                e
            })
    }
    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
@@ -176,20 +189,27 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
            .with_label_values(&[&self.scheme, Operation::Write.into_static()])
            .start_timer();

        let (rp, r) = self.inner.write(path, args).await.map_err(|e| {
            increment_errors_total(Operation::Write, e.kind());
            e
        })?;

        Ok((
            rp,
            PrometheusMetricWrapper::new(
                r,
                Operation::Write,
                BYTES_TOTAL.with_label_values(&[&self.scheme, Operation::Write.into_static()]),
                timer,
            ),
        ))
        self.inner
            .write(path, args)
            .map(|v| {
                v.map(|(rp, r)| {
                    (
                        rp,
                        PrometheusMetricWrapper::new(
                            r,
                            Operation::Write,
                            BYTES_TOTAL
                                .with_label_values(&[&self.scheme, Operation::Write.into_static()]),
                            timer,
                        ),
                    )
                })
            })
            .await
            .map_err(|e| {
                increment_errors_total(Operation::Write, e.kind());
                e
            })
    }
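In both versions of `read` and `write` above, the `HistogramTimer` started before the call is moved into the `PrometheusMetricWrapper`, so the histogram observes the full transfer rather than just the initial handshake. The underlying prometheus mechanics in isolation (a self-contained sketch with a throwaway metric name):

use prometheus::{histogram_opts, register_histogram_vec, HistogramVec};

fn main() {
    let durations: HistogramVec = register_histogram_vec!(
        histogram_opts!("demo_duration_seconds", "demo request durations"),
        &["operation"]
    )
    .unwrap();

    let timer = durations.with_label_values(&["read"]).start_timer();
    // ... perform the I/O while the timer is held ...
    timer.observe_duration(); // or drop(timer): dropping also records
}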
    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
@@ -441,46 +461,103 @@ impl<R> PrometheusMetricWrapper<R> {
}

impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
    async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
        self.inner.read_at(offset, limit).await.map_err(|err| {
            increment_errors_total(self.op, err.kind());
            err
    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
        self.inner.poll_read(cx, buf).map(|res| match res {
            Ok(bytes) => {
                self.bytes += bytes as u64;
                Ok(bytes)
            }
            Err(e) => {
                increment_errors_total(self.op, e.kind());
                Err(e)
            }
        })
    }

    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll<Result<u64>> {
        self.inner.poll_seek(cx, pos).map(|res| match res {
            Ok(n) => Ok(n),
            Err(e) => {
                increment_errors_total(self.op, e.kind());
                Err(e)
            }
        })
    }

    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
        self.inner.poll_next(cx).map(|res| match res {
            Some(Ok(bytes)) => {
                self.bytes += bytes.len() as u64;
                Some(Ok(bytes))
            }
            Some(Err(e)) => {
                increment_errors_total(self.op, e.kind());
                Some(Err(e))
            }
            None => None,
        })
    }
}
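The two `oio::Read` bodies above are the crux of this diff: one opendal version exposes a poll-based, cursor-stateful reader, while the other switches to a stateless positioned read returning `Buffer`. Side by side, paraphrased from the code in this diff (not copied from opendal's docs):

// Poll-based oio::Read — stateful cursor, manual wakeups:
//   fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>>;
//   fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll<Result<u64>>;
//   fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>>;
//
// Async positioned oio::Read — no cursor, one call per range:
//   async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer>;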
impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
    fn read_at(&self, offset: u64, limit: usize) -> opendal::Result<Buffer> {
        self.inner.read_at(offset, limit).map_err(|err| {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        self.inner
            .read(buf)
            .map(|n| {
                self.bytes += n as u64;
                n
            })
            .map_err(|e| {
                increment_errors_total(self.op, e.kind());
                e
            })
    }

    fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
        self.inner.seek(pos).map_err(|err| {
            increment_errors_total(self.op, err.kind());
            err
        })
    }

    fn next(&mut self) -> Option<Result<Bytes>> {
        self.inner.next().map(|res| match res {
            Ok(bytes) => {
                self.bytes += bytes.len() as u64;
                Ok(bytes)
            }
            Err(e) => {
                increment_errors_total(self.op, e.kind());
                Err(e)
            }
        })
    }
}
|
||||
impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
|
||||
async fn write(&mut self, bs: Buffer) -> Result<usize> {
|
||||
match self.inner.write(bs).await {
|
||||
Ok(n) => {
|
||||
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
|
||||
self.inner
|
||||
.poll_write(cx, bs)
|
||||
.map_ok(|n| {
|
||||
self.bytes += n as u64;
|
||||
Ok(n)
|
||||
}
|
||||
Err(err) => {
|
||||
n
|
||||
})
|
||||
.map_err(|err| {
|
||||
increment_errors_total(self.op, err.kind());
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
err
|
||||
})
|
||||
}
|
||||
|
||||
async fn close(&mut self) -> Result<()> {
|
||||
self.inner.close().await.map_err(|err| {
|
||||
fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
||||
self.inner.poll_abort(cx).map_err(|err| {
|
||||
increment_errors_total(self.op, err.kind());
|
||||
err
|
||||
})
|
||||
}
|
||||
|
||||
async fn abort(&mut self) -> Result<()> {
|
||||
self.inner.close().await.map_err(|err| {
|
||||
fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
||||
self.inner.poll_close(cx).map_err(|err| {
|
||||
increment_errors_total(self.op, err.kind());
|
||||
err
|
||||
})
|
||||
@@ -488,7 +565,7 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
|
||||
}
|
||||
|
||||
impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
    fn write(&mut self, bs: Buffer) -> Result<usize> {
    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
        self.inner
            .write(bs)
            .map(|n| {

@@ -14,9 +14,8 @@
pub use opendal::raw::{normalize_path as raw_normalize_path, HttpClient};
pub use opendal::{
    services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind,
    FuturesAsyncReader, FuturesAsyncWriter, Lister, Metakey, Operator as ObjectStore, Reader,
    Result, Writer,
    services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Lister, Metakey,
    Operator as ObjectStore, Reader, Result, Writer,
};

pub mod layers;
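A small usage sketch of these re-exports, building a filesystem-backed `ObjectStore` (the crate's alias for `opendal::Operator`); the root path is a placeholder:

use object_store::{services, ObjectStore, Result};

fn filesystem_store() -> Result<ObjectStore> {
    let mut builder = services::Fs::default();
    builder.root("/tmp/object-store-demo");
    // ObjectStore is just opendal::Operator under the alias above.
    Ok(ObjectStore::new(builder)?.finish())
}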
@@ -22,6 +22,7 @@ use object_store::layers::LruCacheLayer;
use object_store::services::{Fs, S3};
use object_store::test_util::TempFolder;
use object_store::{ObjectStore, ObjectStoreBuilder};
use opendal::raw::Accessor;
use opendal::services::{Azblob, Gcs, Oss};
use opendal::{EntryMode, Operator, OperatorBuilder};
@@ -35,11 +36,11 @@ async fn test_object_crud(store: &ObjectStore) -> Result<()> {

    // Read data from object;
    let bs = store.read(file_name).await?;
    assert_eq!("Hello, World!", String::from_utf8(bs.to_vec())?);
    assert_eq!("Hello, World!", String::from_utf8(bs)?);

    // Read range from object;
    let bs = store.read_with(file_name).range(1..=11).await?;
    assert_eq!("ello, World", String::from_utf8(bs.to_vec())?);
    assert_eq!("ello, World", String::from_utf8(bs)?);

    // Get object's Metadata
    let meta = store.stat(file_name).await?;
@@ -76,7 +77,7 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> {
    assert_eq!(p2, entries.first().unwrap().path());

    let content = store.read(p2).await?;
    assert_eq!("Hello, object2!", String::from_utf8(content.to_vec())?);
    assert_eq!("Hello, object2!", String::from_utf8(content)?);

    store.delete(p2).await?;
    let entries = store.list("/").await?;
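The paired assertions above are the visible effect of the version bump: the two opendal releases hand back different types from `read`. Roughly:

// Older opendal: read returns Vec<u8>, so the bytes are consumed directly.
//   let bs = store.read(file_name).await?;        // Vec<u8>
//   String::from_utf8(bs)?;
//
// Newer opendal: read returns an opendal Buffer, converted explicitly.
//   let bs = store.read(file_name).await?;        // Buffer
//   String::from_utf8(bs.to_vec())?;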
@@ -235,9 +236,11 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
    let _ = builder
        .root(&cache_dir.path().to_string_lossy())
        .atomic_write_dir(&cache_dir.path().to_string_lossy());
    let file_cache = Operator::new(builder).unwrap().finish();
    let file_cache = Arc::new(builder.build().unwrap());

    LruCacheLayer::new(file_cache, 32).await.unwrap()
    LruCacheLayer::new(Arc::new(file_cache.clone()), 32)
        .await
        .unwrap()
};

let store = store.layer(cache_layer.clone());
@@ -250,7 +253,10 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
    Ok(())
}
async fn assert_lru_cache(cache_layer: &LruCacheLayer, file_names: &[&str]) {
async fn assert_lru_cache<C: Accessor + Clone>(
    cache_layer: &LruCacheLayer<C>,
    file_names: &[&str],
) {
    for file_name in file_names {
        assert!(cache_layer.contains_file(file_name).await);
    }
@@ -272,7 +278,7 @@ async fn assert_cache_files(
    let bs = store.read(o.path()).await.unwrap();
    assert_eq!(
        file_contents[position],
        String::from_utf8(bs.to_vec())?,
        String::from_utf8(bs.clone())?,
        "file content not match: {}",
        o.name()
    );
@@ -306,7 +312,9 @@ async fn test_object_store_cache_policy() -> Result<()> {
    let cache_store = OperatorBuilder::new(file_cache.clone()).finish();

    // create operator for cache dir to verify cache file
    let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).await.unwrap();
    let cache_layer = LruCacheLayer::new(Arc::new(file_cache.clone()), 38)
        .await
        .unwrap();
    let store = store.layer(cache_layer.clone());

    // create several object handlers.
@@ -378,7 +386,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
    // instead of returning `NotFound` during the reader creation.
    // The entry count is 4, because we have the p2 `NotFound` cache.
    assert!(store.read_with(p2).range(0..4).await.is_err());
    assert_eq!(cache_layer.read_cache_stat().await, (3, 35));
    assert_eq!(cache_layer.read_cache_stat().await, (4, 35));

    assert_cache_files(
        &cache_store,
@@ -406,7 +414,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
    assert!(store.read(p2).await.is_err());
    // Read p1 with range `1..`; the existing p1 with range `0..` must be evicted.
    let _ = store.read_with(p1).range(1..15).await.unwrap();
    assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
    assert_eq!(cache_layer.read_cache_stat().await, (4, 34));
    assert_cache_files(
        &cache_store,
        &[
@@ -434,7 +442,7 @@ async fn test_object_store_cache_policy() -> Result<()> {

    drop(cache_layer);
    // Test recovery
    let cache_layer = LruCacheLayer::new(cache_store, 38).await.unwrap();
    let cache_layer = LruCacheLayer::new(Arc::new(file_cache), 38).await.unwrap();

    // The p2 `NotFound` cache will not be recovered
    assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
@@ -56,7 +56,6 @@ store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tonic.workspace = true

[dev-dependencies]
@@ -20,7 +20,7 @@ use client::{Output, OutputData, OutputMeta};
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::{CsvConfigBuilder, CsvFormat, CsvOpener};
use common_datasource::file_format::json::{JsonFormat, JsonOpener};
use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader, ReaderAdapter};
use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader};
use common_datasource::file_format::{FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{build_backend, parse_url};
@@ -46,7 +46,6 @@ use session::context::QueryContextRef;
use snafu::ResultExt;
use table::requests::{CopyTableRequest, InsertRequest};
use table::table_reference::TableReference;
use tokio_util::compat::FuturesAsyncReadCompatExt;

use crate::error::{self, IntoVectorsSnafu, Result};
use crate::statement::StatementExecutor;
@@ -147,16 +146,10 @@ impl StatementExecutor {
            path,
        }),
        Format::Parquet(_) => {
            let meta = object_store
                .stat(&path)
                .await
                .context(error::ReadObjectSnafu { path: &path })?;
            let mut reader = object_store
                .reader(&path)
                .await
                .context(error::ReadObjectSnafu { path: &path })?
                .into_futures_async_read(0..meta.content_length())
                .compat();
                .context(error::ReadObjectSnafu { path: &path })?;
            let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
                .await
                .context(error::ReadParquetMetadataSnafu)?;
@@ -168,17 +161,12 @@ impl StatementExecutor {
            })
        }
        Format::Orc(_) => {
            let meta = object_store
                .stat(&path)
                .await
                .context(error::ReadObjectSnafu { path: &path })?;

            let reader = object_store
                .reader(&path)
                .await
                .context(error::ReadObjectSnafu { path: &path })?;

            let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length()))
            let schema = infer_orc_schema(reader)
                .await
                .context(error::ReadOrcSnafu)?;
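The Parquet arm above stats the object first because `into_futures_async_read` needs the byte range up front, then bridges the futures-flavored reader into tokio land for the parquet crate. A condensed sketch of that flow, assuming the same crates as the imports above and `anyhow` for error plumbing:

use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ArrowReaderMetadata;
use tokio_util::compat::FuturesAsyncReadCompatExt;

async fn load_parquet_metadata(
    object_store: &ObjectStore,
    path: &str,
) -> anyhow::Result<ArrowReaderMetadata> {
    // The content length bounds the async reader's range.
    let meta = object_store.stat(path).await?;
    let mut reader = object_store
        .reader(path)
        .await?
        .into_futures_async_read(0..meta.content_length())
        .compat(); // futures::AsyncRead -> tokio::AsyncRead

    Ok(ArrowReaderMetadata::load_async(&mut reader, Default::default()).await?)
}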
@@ -291,17 +279,11 @@ impl StatementExecutor {
            )))
        }
        FileMetadata::Parquet { metadata, path, .. } => {
            let meta = object_store
                .stat(path)
                .await
                .context(error::ReadObjectSnafu { path })?;
            let reader = object_store
                .reader_with(path)
                .chunk(DEFAULT_READ_BUFFER)
                .buffer(DEFAULT_READ_BUFFER)
                .await
                .context(error::ReadObjectSnafu { path })?
                .into_futures_async_read(0..meta.content_length())
                .compat();
                .context(error::ReadObjectSnafu { path })?;
            let builder =
                ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
            let stream = builder
@@ -320,20 +302,14 @@ impl StatementExecutor {
            )))
        }
        FileMetadata::Orc { path, .. } => {
            let meta = object_store
                .stat(path)
                .await
                .context(error::ReadObjectSnafu { path })?;

            let reader = object_store
                .reader_with(path)
                .chunk(DEFAULT_READ_BUFFER)
                .buffer(DEFAULT_READ_BUFFER)
                .await
                .context(error::ReadObjectSnafu { path })?;
            let stream =
                new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
                    .await
                    .context(error::ReadOrcSnafu)?;
            let stream = new_orc_stream_reader(reader)
                .await
                .context(error::ReadOrcSnafu)?;

            let projected_schema = Arc::new(
                compat_schema
@@ -16,12 +16,14 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use axum::http::HeaderValue;
use common_base::Plugins;
use common_telemetry::metric::{convert_metric_to_write_request, MetricFilter};
use common_telemetry::{error, info};
use common_time::Timestamp;
use hyper::HeaderMap;
use prost::Message;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use reqwest::header::HeaderName;
use serde::{Deserialize, Serialize};
use session::context::QueryContextBuilder;
use snafu::{ensure, ResultExt};
@@ -113,7 +115,7 @@ impl ExportMetricsTask {
        }
    );
}
let mut headers = HeaderMap::new();
let mut headers = reqwest::header::HeaderMap::new();
if let Some(remote_write) = &config.remote_write {
    ensure!(
        !remote_write.url.is_empty(),
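With `hyper::HeaderMap` swapped for `reqwest::header::HeaderMap`, assembling the remote-write headers looks roughly like this (the header names and values are illustrative, not taken from the actual config):

use reqwest::header::{HeaderMap, HeaderName, HeaderValue};

fn remote_write_headers() -> HeaderMap {
    let mut headers = HeaderMap::new();
    headers.insert(
        HeaderName::from_static("x-prometheus-remote-write-version"),
        HeaderValue::from_static("0.1.0"),
    );
    headers.insert(
        HeaderName::from_static("content-encoding"),
        HeaderValue::from_static("snappy"),
    );
    headers
}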
@@ -14,10 +14,10 @@

use std::collections::HashMap;

use axum::headers::HeaderValue;
use axum::response::{IntoResponse, Response};
use axum::Json;
use common_query::Output;
use reqwest::header::HeaderValue;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -32,7 +32,6 @@

use std::convert::TryFrom;
use std::net::{SocketAddr, TcpListener};
use std::str::FromStr;

use axum::body::HttpBody;
use axum::BoxError;
@@ -170,15 +169,7 @@ impl RequestBuilder {
        HeaderValue: TryFrom<V>,
        <HeaderValue as TryFrom<V>>::Error: Into<http::Error>,
    {
        // TODO(tisonkun): revert once http bump to 1.x
        let key: HeaderName = key.try_into().map_err(Into::into).unwrap();
        let key = reqwest::header::HeaderName::from_bytes(key.as_ref()).unwrap();

        let value: HeaderValue = value.try_into().map_err(Into::into).unwrap();
        let value = reqwest::header::HeaderValue::from_bytes(value.as_bytes()).unwrap();

        self.builder = self.builder.header(key, value);

        self
    }
|
||||
|
||||
/// Get the response status.
|
||||
pub fn status(&self) -> StatusCode {
|
||||
StatusCode::from_u16(self.response.status().as_u16()).unwrap()
|
||||
self.response.status()
|
||||
}
|
||||
|
||||
/// Get the response headers.
|
||||
pub fn headers(&self) -> http::HeaderMap {
|
||||
// TODO(tisonkun): revert once http bump to 1.x
|
||||
let mut headers = http::HeaderMap::new();
|
||||
for (key, value) in self.response.headers() {
|
||||
let key = HeaderName::from_str(key.as_str()).unwrap();
|
||||
let value = HeaderValue::from_bytes(value.as_bytes()).unwrap();
|
||||
headers.insert(key, value);
|
||||
}
|
||||
headers
|
||||
pub fn headers(&self) -> &http::HeaderMap {
|
||||
self.response.headers()
|
||||
}
|
||||
|
||||
/// Get the response in chunks.
|
||||
|
||||