Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2025-12-26 08:00:01 +00:00)

Compare commits: feat/query ... feat/bulk- (22 commits)

97e9b97a57, 7fc74e2928, 77f20ede7a, ced018fce0, 41dacff283, 94a14b6da7, 6ad3a32cb2, ac00314578, 2f08bee08f, 8ebb31cdcd, f4f8d65a39, b31990e881, 6da633e70d, 9633e794c7, eaf1e1198f, 505bf25505, f1b29ece3c, 74df12e8c0, be6a5d2da8, 7468a8ab2a, 5bb0466ff2, f6db419afd

Cargo.lock (generated, 33 changed lines)
@@ -1621,6 +1621,7 @@ dependencies = [
"cache",
"catalog",
"chrono",
"common-base",
"common-catalog",
"common-error",
"common-frontend",
@@ -4755,6 +4756,7 @@ dependencies = [
"substrait 0.15.0",
"table",
"tokio",
"tokio-util",
"toml 0.8.19",
"tonic 0.12.3",
"tower 0.5.2",
@@ -5141,7 +5143,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5f6119ac7952878d39dcde0343c4bf828d18ffc8#5f6119ac7952878d39dcde0343c4bf828d18ffc8"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=17971523673f4fbc982510d3c9d6647ff642e16f#17971523673f4fbc982510d3c9d6647ff642e16f"
dependencies = [
"prost 0.13.5",
"serde",
@@ -7248,6 +7250,7 @@ dependencies = [
"humantime-serde",
"itertools 0.14.0",
"lazy_static",
"mito-codec",
"mito2",
"mur3",
"object-store",
@@ -7313,6 +7316,29 @@ dependencies = [
"windows-sys 0.52.0",
]

[[package]]
name = "mito-codec"
version = "0.15.0"
dependencies = [
"api",
"bytes",
"common-base",
"common-decimal",
"common-error",
"common-macro",
"common-recordbatch",
"common-telemetry",
"common-time",
"datafusion-common",
"datafusion-expr",
"datatypes",
"memcomparable",
"paste",
"serde",
"snafu 0.8.5",
"store-api",
]

[[package]]
name = "mito2"
version = "0.15.0"
@@ -7355,6 +7381,7 @@ dependencies = [
"lazy_static",
"log-store",
"memcomparable",
"mito-codec",
"moka",
"object-store",
"parquet",
@@ -8394,6 +8421,7 @@ dependencies = [
"common-catalog",
"common-datasource",
"common-error",
"common-frontend",
"common-function",
"common-grpc",
"common-grpc-expr",
@@ -8894,8 +8922,7 @@ dependencies = [
[[package]]
name = "pgwire"
version = "0.30.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ca6c26b25be998208a13ff2f0c55b567363f34675410e6d6f1c513a150583fd"
source = "git+https://github.com/sunng87/pgwire?rev=127573d997228cfb70c7699881c568eae8131270#127573d997228cfb70c7699881c568eae8131270"
dependencies = [
"async-trait",
"bytes",

@@ -49,6 +49,7 @@ members = [
"src/meta-client",
"src/meta-srv",
"src/metric-engine",
"src/mito-codec",
"src/mito2",
"src/object-store",
"src/operator",
@@ -133,7 +134,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5f6119ac7952878d39dcde0343c4bf828d18ffc8" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "17971523673f4fbc982510d3c9d6647ff642e16f" }
hex = "0.4"
http = "1"
humantime = "2.1"
@@ -274,6 +275,7 @@ log-store = { path = "src/log-store" }
meta-client = { path = "src/meta-client" }
meta-srv = { path = "src/meta-srv" }
metric-engine = { path = "src/metric-engine" }
mito-codec = { path = "src/mito-codec" }
mito2 = { path = "src/mito2" }
object-store = { path = "src/object-store" }
operator = { path = "src/operator" }

@@ -17,6 +17,7 @@ arrow-schema.workspace = true
async-stream.workspace = true
async-trait.workspace = true
bytes.workspace = true
common-base.workspace = true
common-catalog.workspace = true
common-error.workspace = true
common-frontend.workspace = true

@@ -278,12 +278,25 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to list frontend nodes"))]
ListProcess {
#[snafu(display("Failed to invoke frontend services"))]
InvokeFrontend {
source: common_frontend::error::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Meta client is not provided"))]
MetaClientMissing {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to find frontend node: {}", addr))]
FrontendNotFound {
addr: String,
#[snafu(implicit)]
location: Location,
},
}

impl Error {
@@ -352,7 +365,10 @@ impl ErrorExt for Error {
Error::GetViewCache { source, .. } | Error::GetTableCache { source, .. } => {
source.status_code()
}
Error::ListProcess { source, .. } => source.status_code(),
Error::InvokeFrontend { source, .. } => source.status_code(),
Error::FrontendNotFound { .. } | Error::MetaClientMissing { .. } => {
StatusCode::Unexpected
}
}
}


@@ -34,4 +34,20 @@ lazy_static! {
register_histogram!("greptime_catalog_kv_get", "catalog kv get").unwrap();
pub static ref METRIC_CATALOG_KV_BATCH_GET: Histogram =
register_histogram!("greptime_catalog_kv_batch_get", "catalog kv batch get").unwrap();

/// Count of running process in each catalog.
pub static ref PROCESS_LIST_COUNT: IntGaugeVec = register_int_gauge_vec!(
"greptime_process_list_count",
"Running process count per catalog",
&["catalog"]
)
.unwrap();

/// Count of killed process in each catalog.
pub static ref PROCESS_KILL_COUNT: IntCounterVec = register_int_counter_vec!(
"greptime_process_kill_count",
"Completed kill process requests count",
&["catalog"]
)
.unwrap();
}

@@ -14,24 +14,32 @@

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

use api::v1::frontend::{ListProcessRequest, ProcessInfo};
use api::v1::frontend::{KillProcessRequest, ListProcessRequest, ProcessInfo};
use common_base::cancellation::CancellationHandle;
use common_frontend::selector::{FrontendSelector, MetaClientSelector};
use common_telemetry::{debug, info};
use common_time::util::current_time_millis;
use meta_client::MetaClientRef;
use snafu::ResultExt;
use snafu::{ensure, OptionExt, ResultExt};

use crate::error;
use crate::metrics::{PROCESS_KILL_COUNT, PROCESS_LIST_COUNT};

pub type ProcessManagerRef = Arc<ProcessManager>;

/// Query process manager.
pub struct ProcessManager {
/// Local frontend server address,
server_addr: String,
/// Next process id for local queries.
next_id: AtomicU64,
catalogs: RwLock<HashMap<String, HashMap<u64, ProcessInfo>>>,
/// Running process per catalog.
catalogs: RwLock<HashMap<String, HashMap<u64, CancellableProcess>>>,
/// Frontend selector to locate frontend nodes.
frontend_selector: Option<MetaClientSelector>,
}

@@ -50,6 +58,7 @@ impl ProcessManager {

impl ProcessManager {
/// Registers a submitted query. Use the provided id if present.
#[must_use]
pub fn register_query(
self: &Arc<Self>,
catalog: String,
@@ -68,16 +77,21 @@ impl ProcessManager {
client,
frontend: self.server_addr.clone(),
};
let cancellation_handle = Arc::new(CancellationHandle::default());
let cancellable_process = CancellableProcess::new(cancellation_handle.clone(), process);

self.catalogs
.write()
.unwrap()
.entry(catalog.clone())
.or_default()
.insert(id, process);
.insert(id, cancellable_process);

Ticket {
catalog,
manager: self.clone(),
id,
cancellation_handle,
}
}

@@ -91,30 +105,25 @@ impl ProcessManager {
if let Entry::Occupied(mut o) = self.catalogs.write().unwrap().entry(catalog) {
let process = o.get_mut().remove(&id);
debug!("Deregister process: {:?}", process);
if o.get_mut().is_empty() {
if o.get().is_empty() {
o.remove();
}
}
}

pub fn deregister_all_queries(&self) {
self.catalogs.write().unwrap().clear();
info!("All queries on {} has been deregistered", self.server_addr);
}

/// List local running processes in given catalog.
pub fn local_processes(&self, catalog: Option<&str>) -> error::Result<Vec<ProcessInfo>> {
let catalogs = self.catalogs.read().unwrap();
let result = if let Some(catalog) = catalog {
if let Some(catalogs) = catalogs.get(catalog) {
catalogs.values().cloned().collect()
catalogs.values().map(|p| p.process.clone()).collect()
} else {
vec![]
}
} else {
catalogs
.values()
.flat_map(|v| v.values().cloned())
.flat_map(|v| v.values().map(|p| p.process.clone()))
.collect()
};
Ok(result)
@@ -129,14 +138,14 @@ impl ProcessManager {
let frontends = remote_frontend_selector
.select(|node| node.peer.addr != self.server_addr)
.await
.context(error::ListProcessSnafu)?;
.context(error::InvokeFrontendSnafu)?;
for mut f in frontends {
processes.extend(
f.list_process(ListProcessRequest {
catalog: catalog.unwrap_or_default().to_string(),
})
.await
.context(error::ListProcessSnafu)?
.context(error::InvokeFrontendSnafu)?
.processes,
);
}
@@ -144,12 +153,64 @@ impl ProcessManager {
processes.extend(self.local_processes(catalog)?);
Ok(processes)
}

/// Kills query with provided catalog and id.
pub async fn kill_process(
&self,
server_addr: String,
catalog: String,
id: u64,
) -> error::Result<bool> {
if server_addr == self.server_addr {
if let Some(catalogs) = self.catalogs.write().unwrap().get_mut(&catalog) {
if let Some(process) = catalogs.remove(&id) {
process.handle.cancel();
info!(
"Killed process, catalog: {}, id: {:?}",
process.process.catalog, process.process.id
);
PROCESS_KILL_COUNT.with_label_values(&[&catalog]).inc();
Ok(true)
} else {
debug!("Failed to kill process, id not found: {}", id);
Ok(false)
}
} else {
debug!("Failed to kill process, catalog not found: {}", catalog);
Ok(false)
}
} else {
let mut nodes = self
.frontend_selector
.as_ref()
.context(error::MetaClientMissingSnafu)?
.select(|node| node.peer.addr == server_addr)
.await
.context(error::InvokeFrontendSnafu)?;
ensure!(
!nodes.is_empty(),
error::FrontendNotFoundSnafu { addr: server_addr }
);

let request = KillProcessRequest {
server_addr,
catalog,
process_id: id,
};
nodes[0]
.kill_process(request)
.await
.context(error::InvokeFrontendSnafu)?;
Ok(true)
}
}
}

pub struct Ticket {
pub(crate) catalog: String,
pub(crate) manager: ProcessManagerRef,
pub(crate) id: u64,
pub cancellation_handle: Arc<CancellationHandle>,
}

impl Drop for Ticket {
@@ -159,6 +220,37 @@ impl Drop for Ticket {
}
}

struct CancellableProcess {
handle: Arc<CancellationHandle>,
process: ProcessInfo,
}

impl Drop for CancellableProcess {
fn drop(&mut self) {
PROCESS_LIST_COUNT
.with_label_values(&[&self.process.catalog])
.dec();
}
}

impl CancellableProcess {
fn new(handle: Arc<CancellationHandle>, process: ProcessInfo) -> Self {
PROCESS_LIST_COUNT
.with_label_values(&[&process.catalog])
.inc();
Self { handle, process }
}
}

impl Debug for CancellableProcess {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CancellableProcess")
.field("cancelled", &self.handle.is_cancelled())
.field("process", &self.process)
.finish()
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -185,4 +277,212 @@ mod tests {
drop(ticket);
assert_eq!(process_manager.local_processes(None).unwrap().len(), 0);
}

#[tokio::test]
async fn test_register_query_with_custom_id() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let custom_id = 12345;

let ticket = process_manager.clone().register_query(
"public".to_string(),
vec!["test".to_string()],
"SELECT * FROM table".to_string(),
"client1".to_string(),
Some(custom_id),
);

assert_eq!(ticket.id, custom_id);

let running_processes = process_manager.local_processes(None).unwrap();
assert_eq!(running_processes.len(), 1);
assert_eq!(running_processes[0].id, custom_id);
assert_eq!(&running_processes[0].client, "client1");
}

#[tokio::test]
async fn test_multiple_queries_same_catalog() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));

let ticket1 = process_manager.clone().register_query(
"public".to_string(),
vec!["schema1".to_string()],
"SELECT * FROM table1".to_string(),
"client1".to_string(),
None,
);

let ticket2 = process_manager.clone().register_query(
"public".to_string(),
vec!["schema2".to_string()],
"SELECT * FROM table2".to_string(),
"client2".to_string(),
None,
);

let running_processes = process_manager.local_processes(Some("public")).unwrap();
assert_eq!(running_processes.len(), 2);

// Verify both processes are present
let ids: Vec<u64> = running_processes.iter().map(|p| p.id).collect();
assert!(ids.contains(&ticket1.id));
assert!(ids.contains(&ticket2.id));
}

#[tokio::test]
async fn test_multiple_catalogs() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));

let _ticket1 = process_manager.clone().register_query(
"catalog1".to_string(),
vec!["schema1".to_string()],
"SELECT * FROM table1".to_string(),
"client1".to_string(),
None,
);

let _ticket2 = process_manager.clone().register_query(
"catalog2".to_string(),
vec!["schema2".to_string()],
"SELECT * FROM table2".to_string(),
"client2".to_string(),
None,
);

// Test listing processes for specific catalog
let catalog1_processes = process_manager.local_processes(Some("catalog1")).unwrap();
assert_eq!(catalog1_processes.len(), 1);
assert_eq!(&catalog1_processes[0].catalog, "catalog1");

let catalog2_processes = process_manager.local_processes(Some("catalog2")).unwrap();
assert_eq!(catalog2_processes.len(), 1);
assert_eq!(&catalog2_processes[0].catalog, "catalog2");

// Test listing all processes
let all_processes = process_manager.local_processes(None).unwrap();
assert_eq!(all_processes.len(), 2);
}

#[tokio::test]
async fn test_deregister_query() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));

let ticket = process_manager.clone().register_query(
"public".to_string(),
vec!["test".to_string()],
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
);
assert_eq!(process_manager.local_processes(None).unwrap().len(), 1);
process_manager.deregister_query("public".to_string(), ticket.id);
assert_eq!(process_manager.local_processes(None).unwrap().len(), 0);
}

#[tokio::test]
async fn test_cancellation_handle() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));

let ticket = process_manager.clone().register_query(
"public".to_string(),
vec!["test".to_string()],
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
);

assert!(!ticket.cancellation_handle.is_cancelled());
ticket.cancellation_handle.cancel();
assert!(ticket.cancellation_handle.is_cancelled());
}

#[tokio::test]
async fn test_kill_local_process() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));

let ticket = process_manager.clone().register_query(
"public".to_string(),
vec!["test".to_string()],
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
);
assert!(!ticket.cancellation_handle.is_cancelled());
let killed = process_manager
.kill_process(
"127.0.0.1:8000".to_string(),
"public".to_string(),
ticket.id,
)
.await
.unwrap();

assert!(killed);
assert_eq!(process_manager.local_processes(None).unwrap().len(), 0);
}

#[tokio::test]
async fn test_kill_nonexistent_process() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let killed = process_manager
.kill_process("127.0.0.1:8000".to_string(), "public".to_string(), 999)
.await
.unwrap();
assert!(!killed);
}

#[tokio::test]
async fn test_kill_process_nonexistent_catalog() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let killed = process_manager
.kill_process("127.0.0.1:8000".to_string(), "nonexistent".to_string(), 1)
.await
.unwrap();
assert!(!killed);
}

#[tokio::test]
async fn test_process_info_fields() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));

let _ticket = process_manager.clone().register_query(
"test_catalog".to_string(),
vec!["schema1".to_string(), "schema2".to_string()],
"SELECT COUNT(*) FROM users WHERE age > 18".to_string(),
"test_client".to_string(),
Some(42),
);

let processes = process_manager.local_processes(None).unwrap();
assert_eq!(processes.len(), 1);

let process = &processes[0];
assert_eq!(process.id, 42);
assert_eq!(&process.catalog, "test_catalog");
assert_eq!(process.schemas, vec!["schema1", "schema2"]);
assert_eq!(&process.query, "SELECT COUNT(*) FROM users WHERE age > 18");
assert_eq!(&process.client, "test_client");
assert_eq!(&process.frontend, "127.0.0.1:8000");
assert!(process.start_timestamp > 0);
}

#[tokio::test]
async fn test_ticket_drop_deregisters_process() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));

{
let _ticket = process_manager.clone().register_query(
"public".to_string(),
vec!["test".to_string()],
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
);

// Process should be registered
assert_eq!(process_manager.local_processes(None).unwrap().len(), 1);
} // ticket goes out of scope here

// Process should be automatically deregistered
assert_eq!(process_manager.local_processes(None).unwrap().len(), 0);
}
}

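For orientation, the pieces above compose as follows: a frontend registers each query with the ProcessManager, drives the query through a CancellableFuture tied to the Ticket's handle, and a KILL issued on any frontend lands in kill_process, which cancels the handle locally or forwards the request to the owning frontend. The sketch below is not part of this diff; it only illustrates the wiring, using the APIs shown above plus two hypothetical helpers (execute_with_cancellation and run_query).

use catalog::process_manager::ProcessManagerRef;
use common_base::cancellation::CancellableFuture;

// Hypothetical driver, for illustration only.
async fn execute_with_cancellation(
    manager: ProcessManagerRef,
    catalog: String,
    sql: String,
) -> Result<(), String> {
    // Register the query; the returned Ticket deregisters it on drop.
    let ticket = manager.register_query(
        catalog,
        vec!["public".to_string()],
        sql.clone(),
        "client-sketch".to_string(),
        None,
    );

    // Wrap the (hypothetical) query future so a KILL can interrupt it.
    let fut = CancellableFuture::new(run_query(sql), ticket.cancellation_handle.clone());
    match fut.await {
        Ok(output) => Ok(output),
        // kill_process() cancelled the handle while the query was running.
        Err(_cancelled) => Err("query cancelled".to_string()),
    }
}

// Stand-in for statement planning and execution.
async fn run_query(_sql: String) {}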
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse};
use api::v1::region::InsertRequests;
use common_error::ext::BoxedError;
use common_meta::node_manager::Flownode;
@@ -44,6 +44,16 @@ impl Flownode for FlowRequester {
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}

async fn handle_mark_window_dirty(
&self,
req: DirtyWindowRequest,
) -> common_meta::error::Result<FlowResponse> {
self.handle_mark_window_dirty(req)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
}

impl FlowRequester {
@@ -91,4 +101,20 @@ impl FlowRequester {
.into_inner();
Ok(response)
}

async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
let (addr, mut client) = self.client.raw_flow_client()?;
let response = client
.handle_mark_dirty_time_window(DirtyWindowRequests {
requests: vec![req],
})
.await
.or_else(|e| {
let code = e.code();
let err: crate::error::Error = e.into();
Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
})?
.into_inner();
Ok(response)
}
}

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::path::Path;
use std::time::Duration;

use cmd::options::GreptimeOptions;
@@ -58,12 +57,7 @@ fn test_load_datanode_example_config() {
metadata_cache_tti: Duration::from_secs(300),
}),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some(
Path::new(DEFAULT_DATA_HOME)
.join(WAL_DIR)
.to_string_lossy()
.to_string(),
),
dir: Some(format!("{}/{}", DEFAULT_DATA_HOME, WAL_DIR)),
sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2,
..Default::default()
@@ -86,10 +80,7 @@ fn test_load_datanode_example_config() {
],
logging: LoggingOptions {
level: Some("info".to_string()),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
@@ -132,10 +123,7 @@ fn test_load_frontend_example_config() {
}),
logging: LoggingOptions {
level: Some("info".to_string()),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
@@ -182,10 +170,7 @@ fn test_load_metasrv_example_config() {
..Default::default()
},
logging: LoggingOptions {
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
level: Some("info".to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
@@ -220,12 +205,7 @@ fn test_load_standalone_example_config() {
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some(
Path::new(DEFAULT_DATA_HOME)
.join(WAL_DIR)
.to_string_lossy()
.to_string(),
),
dir: Some(format!("{}/{}", DEFAULT_DATA_HOME, WAL_DIR)),
sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2,
..Default::default()
@@ -248,10 +228,7 @@ fn test_load_standalone_example_config() {
},
logging: LoggingOptions {
level: Some("info".to_string()),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()

src/common/base/src/cancellation.rs (new file, 240 lines)
@@ -0,0 +1,240 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! [CancellationHandle] is used to compose with manual implementation of [futures::future::Future]
//! or [futures::stream::Stream] to facilitate cancellation.
//! See example in [frontend::stream_wrapper::CancellableStreamWrapper] and [CancellableFuture].

use std::fmt::{Debug, Display, Formatter};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::task::AtomicWaker;
use pin_project::pin_project;

#[derive(Default)]
pub struct CancellationHandle {
waker: AtomicWaker,
cancelled: AtomicBool,
}

impl Debug for CancellationHandle {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CancellationHandle")
.field("cancelled", &self.is_cancelled())
.finish()
}
}

impl CancellationHandle {
pub fn waker(&self) -> &AtomicWaker {
&self.waker
}

/// Cancels a future or stream.
pub fn cancel(&self) {
if self
.cancelled
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
self.waker.wake();
}
}

/// Is this handle cancelled.
pub fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::Relaxed)
}
}

#[pin_project]
#[derive(Debug, Clone)]
pub struct CancellableFuture<T> {
#[pin]
fut: T,
handle: Arc<CancellationHandle>,
}

impl<T> CancellableFuture<T> {
pub fn new(fut: T, handle: Arc<CancellationHandle>) -> Self {
Self { fut, handle }
}
}

impl<T> Future for CancellableFuture<T>
where
T: Future,
{
type Output = Result<T::Output, Cancelled>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();
// Check if the task has been aborted
if this.handle.is_cancelled() {
return Poll::Ready(Err(Cancelled));
}

if let Poll::Ready(x) = this.fut.poll(cx) {
return Poll::Ready(Ok(x));
}

this.handle.waker().register(cx.waker());
if this.handle.is_cancelled() {
return Poll::Ready(Err(Cancelled));
}
Poll::Pending
}
}

#[derive(Copy, Clone, Debug)]
pub struct Cancelled;

impl Display for Cancelled {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Future has been cancelled")
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;

use tokio::time::{sleep, timeout};

use crate::cancellation::{CancellableFuture, CancellationHandle, Cancelled};

#[tokio::test]
async fn test_cancellable_future_completes_normally() {
let handle = Arc::new(CancellationHandle::default());
let future = async { 42 };
let cancellable = CancellableFuture::new(future, handle);

let result = cancellable.await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), 42);
}

#[tokio::test]
async fn test_cancellable_future_cancelled_before_start() {
let handle = Arc::new(CancellationHandle::default());
handle.cancel();

let future = async { 42 };
let cancellable = CancellableFuture::new(future, handle);

let result = cancellable.await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), Cancelled));
}

#[tokio::test]
async fn test_cancellable_future_cancelled_during_execution() {
let handle = Arc::new(CancellationHandle::default());
let handle_clone = handle.clone();

// Create a future that sleeps for a long time
let future = async {
sleep(Duration::from_secs(10)).await;
42
};
let cancellable = CancellableFuture::new(future, handle);

// Cancel the future after a short delay
tokio::spawn(async move {
sleep(Duration::from_millis(50)).await;
handle_clone.cancel();
});

let result = cancellable.await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), Cancelled));
}

#[tokio::test]
async fn test_cancellable_future_completes_before_cancellation() {
let handle = Arc::new(CancellationHandle::default());
let handle_clone = handle.clone();

// Create a future that completes quickly
let future = async {
sleep(Duration::from_millis(10)).await;
42
};
let cancellable = CancellableFuture::new(future, handle);

// Try to cancel after the future should have completed
tokio::spawn(async move {
sleep(Duration::from_millis(100)).await;
handle_clone.cancel();
});

let result = cancellable.await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), 42);
}

#[tokio::test]
async fn test_cancellation_handle_is_cancelled() {
let handle = CancellationHandle::default();
assert!(!handle.is_cancelled());

handle.cancel();
assert!(handle.is_cancelled());
}

#[tokio::test]
async fn test_multiple_cancellable_futures_with_same_handle() {
let handle = Arc::new(CancellationHandle::default());

let future1 = CancellableFuture::new(async { 1 }, handle.clone());
let future2 = CancellableFuture::new(async { 2 }, handle.clone());

// Cancel before starting
handle.cancel();

let (result1, result2) = tokio::join!(future1, future2);

assert!(result1.is_err());
assert!(result2.is_err());
assert!(matches!(result1.unwrap_err(), Cancelled));
assert!(matches!(result2.unwrap_err(), Cancelled));
}

#[tokio::test]
async fn test_cancellable_future_with_timeout() {
let handle = Arc::new(CancellationHandle::default());
let future = async {
sleep(Duration::from_secs(1)).await;
42
};
let cancellable = CancellableFuture::new(future, handle.clone());

// Use timeout to ensure the test doesn't hang
let result = timeout(Duration::from_millis(100), cancellable).await;

// Should timeout because the future takes 1 second but we timeout after 100ms
assert!(result.is_err());
}

#[tokio::test]
async fn test_cancelled_display() {
let cancelled = Cancelled;
assert_eq!(format!("{}", cancelled), "Future has been cancelled");
}
}
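The module doc above points at frontend::stream_wrapper::CancellableStreamWrapper as the Stream counterpart of CancellableFuture, but that wrapper's code is not included in this excerpt. The following is a minimal sketch, under the assumption that such a wrapper mirrors the poll logic of CancellableFuture; the type name CancellableStream and the crate-local path are placeholders, not the actual implementation from this PR.

use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::Stream;
use pin_project::pin_project;

use crate::cancellation::{CancellationHandle, Cancelled};

// Illustrative only: the real wrapper lives in the frontend crate and may differ.
#[pin_project]
pub struct CancellableStream<S> {
    #[pin]
    inner: S,
    handle: Arc<CancellationHandle>,
}

impl<S: Stream> Stream for CancellableStream<S> {
    type Item = Result<S::Item, Cancelled>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        // Bail out if a kill arrived before this poll.
        if this.handle.is_cancelled() {
            return Poll::Ready(Some(Err(Cancelled)));
        }
        match this.inner.poll_next(cx) {
            Poll::Ready(item) => Poll::Ready(item.map(Ok)),
            Poll::Pending => {
                // Register interest so cancel() wakes this stream, then re-check
                // to close the race between the check above and registration.
                this.handle.waker().register(cx.waker());
                if this.handle.is_cancelled() {
                    Poll::Ready(Some(Err(Cancelled)))
                } else {
                    Poll::Pending
                }
            }
        }
    }
}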
@@ -14,6 +14,7 @@

pub mod bit_vec;
pub mod bytes;
pub mod cancellation;
pub mod plugins;
pub mod range_read;
#[allow(clippy::all)]

@@ -42,8 +42,8 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to invoke list process service"))]
ListProcess {
#[snafu(display("Failed to invoke frontend service"))]
InvokeFrontend {
#[snafu(source)]
error: tonic::Status,
#[snafu(implicit)]
@@ -67,7 +67,7 @@ impl ErrorExt for Error {
External { source, .. } => source.status_code(),
Meta { source, .. } => source.status_code(),
ParseProcessId { .. } => StatusCode::InvalidArguments,
ListProcess { .. } => StatusCode::External,
InvokeFrontend { .. } => StatusCode::Unexpected,
CreateChannel { source, .. } => source.status_code(),
}
}

@@ -16,9 +16,13 @@ use std::time::Duration;

use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::cluster::{ClusterInfo, NodeInfo, Role};
use greptime_proto::v1::frontend::{frontend_client, ListProcessRequest, ListProcessResponse};
use greptime_proto::v1::frontend::{
frontend_client, KillProcessRequest, KillProcessResponse, ListProcessRequest,
ListProcessResponse,
};
use meta_client::MetaClientRef;
use snafu::ResultExt;
use tonic::Response;

use crate::error;
use crate::error::{MetaSnafu, Result};
@@ -28,18 +32,28 @@ pub type FrontendClientPtr = Box<dyn FrontendClient>;
#[async_trait::async_trait]
pub trait FrontendClient: Send {
async fn list_process(&mut self, req: ListProcessRequest) -> Result<ListProcessResponse>;

async fn kill_process(&mut self, req: KillProcessRequest) -> Result<KillProcessResponse>;
}

#[async_trait::async_trait]
impl FrontendClient for frontend_client::FrontendClient<tonic::transport::channel::Channel> {
async fn list_process(&mut self, req: ListProcessRequest) -> Result<ListProcessResponse> {
let response: ListProcessResponse = frontend_client::FrontendClient::<
tonic::transport::channel::Channel,
>::list_process(self, req)
frontend_client::FrontendClient::<tonic::transport::channel::Channel>::list_process(
self, req,
)
.await
.context(error::ListProcessSnafu)?
.into_inner();
Ok(response)
.context(error::InvokeFrontendSnafu)
.map(Response::into_inner)
}

async fn kill_process(&mut self, req: KillProcessRequest) -> Result<KillProcessResponse> {
frontend_client::FrontendClient::<tonic::transport::channel::Channel>::kill_process(
self, req,
)
.await
.context(error::InvokeFrontendSnafu)
.map(Response::into_inner)
}
}


@@ -23,7 +23,8 @@ use std::sync::Arc;

use build::BuildFunction;
use database::{
CurrentSchemaFunction, DatabaseFunction, ReadPreferenceFunction, SessionUserFunction,
ConnectionIdFunction, CurrentSchemaFunction, DatabaseFunction, PgBackendPidFunction,
ReadPreferenceFunction, SessionUserFunction,
};
use pg_catalog::PGCatalogFunction;
use procedure_state::ProcedureStateFunction;
@@ -42,6 +43,8 @@ impl SystemFunction {
registry.register_scalar(DatabaseFunction);
registry.register_scalar(SessionUserFunction);
registry.register_scalar(ReadPreferenceFunction);
registry.register_scalar(PgBackendPidFunction);
registry.register_scalar(ConnectionIdFunction);
registry.register_scalar(TimezoneFunction);
registry.register_async(Arc::new(ProcedureStateFunction));
PGCatalogFunction::register(registry);

@@ -18,7 +18,8 @@ use std::sync::Arc;
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
use datatypes::prelude::{ConcreteDataType, ScalarVector};
use datatypes::vectors::{StringVector, VectorRef};
use datatypes::vectors::{StringVector, UInt64Vector, VectorRef};
use derive_more::Display;

use crate::function::{Function, FunctionContext};

@@ -32,10 +33,20 @@ pub struct SessionUserFunction;

pub struct ReadPreferenceFunction;

#[derive(Display)]
#[display("{}", self.name())]
pub struct PgBackendPidFunction;

#[derive(Display)]
#[display("{}", self.name())]
pub struct ConnectionIdFunction;

const DATABASE_FUNCTION_NAME: &str = "database";
const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
const SESSION_USER_FUNCTION_NAME: &str = "session_user";
const READ_PREFERENCE_FUNCTION_NAME: &str = "read_preference";
const PG_BACKEND_PID: &str = "pg_backend_pid";
const CONNECTION_ID: &str = "connection_id";

impl Function for DatabaseFunction {
fn name(&self) -> &str {
@@ -117,6 +128,46 @@ impl Function for ReadPreferenceFunction {
}
}

impl Function for PgBackendPidFunction {
fn name(&self) -> &str {
PG_BACKEND_PID
}

fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::uint64_datatype())
}

fn signature(&self) -> Signature {
Signature::nullary(Volatility::Immutable)
}

fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
let pid = func_ctx.query_ctx.process_id();

Ok(Arc::new(UInt64Vector::from_slice([pid])) as _)
}
}

impl Function for ConnectionIdFunction {
fn name(&self) -> &str {
CONNECTION_ID
}

fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::uint64_datatype())
}

fn signature(&self) -> Signature {
Signature::nullary(Volatility::Immutable)
}

fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
let pid = func_ctx.query_ctx.process_id();

Ok(Arc::new(UInt64Vector::from_slice([pid])) as _)
}
}

impl fmt::Display for DatabaseFunction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "DATABASE")

@@ -15,7 +15,7 @@
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows;
use common_query::request::QueryRequest;
@@ -42,6 +42,9 @@ pub trait Flownode: Send + Sync {
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;

async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>;

/// Handles requests to mark time window as dirty.
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse>;
}

pub type FlownodeRef = Arc<dyn Flownode>;

@@ -15,7 +15,7 @@
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows;
use common_query::request::QueryRequest;
@@ -67,6 +67,14 @@ pub trait MockFlownodeHandler: Sync + Send + Clone {
) -> Result<FlowResponse> {
unimplemented!()
}

async fn handle_mark_window_dirty(
&self,
_peer: &Peer,
_req: DirtyWindowRequest,
) -> Result<FlowResponse> {
unimplemented!()
}
}

/// A mock struct implements [NodeManager] only implement the `datanode` method.
@@ -134,6 +142,10 @@ impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
self.handler.handle_inserts(&self.peer, requests).await
}

async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
self.handler.handle_mark_window_dirty(&self.peer, req).await
}
}

#[async_trait::async_trait]

@@ -173,6 +173,7 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Stream timeout"))]
StreamTimeout {
#[snafu(implicit)]
@@ -180,6 +181,7 @@ pub enum Error {
#[snafu(source)]
error: tokio::time::error::Elapsed,
},

#[snafu(display("RecordBatch slice index overflow: {visit_index} > {size}"))]
RecordBatchSliceIndexOverflow {
#[snafu(implicit)]
@@ -187,6 +189,12 @@ pub enum Error {
size: usize,
visit_index: usize,
},

#[snafu(display("Stream has been cancelled"))]
StreamCancelled {
#[snafu(implicit)]
location: Location,
},
}

impl ErrorExt for Error {
@@ -221,6 +229,8 @@ impl ErrorExt for Error {
}

Error::StreamTimeout { .. } => StatusCode::Cancelled,

Error::StreamCancelled { .. } => StatusCode::Cancelled,
}
}


@@ -316,7 +316,7 @@ impl StreamingEngine {
);

METRIC_FLOW_ROWS
.with_label_values(&["out"])
.with_label_values(&["out-streaming"])
.inc_by(total_rows as u64);

let now = self.tick_manager.tick();

@@ -31,6 +31,7 @@ use common_runtime::JoinHandle;
use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value;
use futures::TryStreamExt;
use greptime_proto::v1::flow::DirtyWindowRequest;
use itertools::Itertools;
use session::context::QueryContextBuilder;
use snafu::{ensure, IntoError, OptionExt, ResultExt};
@@ -46,7 +47,7 @@ use crate::error::{
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
};
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
use crate::repr::{self, DiffRow};
use crate::{Error, FlowId};

@@ -689,6 +690,9 @@ impl FlowEngine for FlowDualEngine {
let mut to_stream_engine = Vec::with_capacity(request.requests.len());
let mut to_batch_engine = request.requests;

let mut batching_row_cnt = 0;
let mut streaming_row_cnt = 0;

{
// not locking this, or recover flows will be starved when also handling flow inserts
let src_table2flow = self.src_table2flow.read().await;
@@ -698,9 +702,11 @@ impl FlowEngine for FlowDualEngine {
let is_in_stream = src_table2flow.in_stream(table_id);
let is_in_batch = src_table2flow.in_batch(table_id);
if is_in_stream {
streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
to_stream_engine.push(req.clone());
}
if is_in_batch {
batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
return true;
}
if !is_in_batch && !is_in_stream {
@@ -713,6 +719,14 @@ impl FlowEngine for FlowDualEngine {
// can't use drop due to https://github.com/rust-lang/rust/pull/128846
}

METRIC_FLOW_ROWS
.with_label_values(&["in-streaming"])
.inc_by(streaming_row_cnt as u64);

METRIC_FLOW_ROWS
.with_label_values(&["in-batching"])
.inc_by(batching_row_cnt as u64);

let streaming_engine = self.streaming_engine.clone();
let stream_handler: JoinHandle<Result<(), Error>> =
common_runtime::spawn_global(async move {
@@ -819,6 +833,10 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
.map(|_| Default::default())
.map_err(to_meta_err(snafu::location!()))
}

async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
unreachable!()
}
}

/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
@@ -926,6 +944,10 @@ impl common_meta::node_manager::Flownode for StreamingEngine {
.map(|_| Default::default())
.map_err(to_meta_err(snafu::location!()))
}

async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
unreachable!()
}
}

impl FlowEngine for StreamingEngine {

@@ -17,6 +17,7 @@
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use api::v1::flow::{DirtyWindowRequests, FlowResponse};
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
@@ -29,8 +30,7 @@ use common_telemetry::{debug, info};
use common_time::TimeToLive;
use query::QueryEngineRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;
use store_api::storage::{RegionId, TableId};
use tokio::sync::{oneshot, RwLock};

use crate::batching_mode::frontend_client::FrontendClient;
@@ -42,6 +42,7 @@ use crate::error::{
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
UnexpectedSnafu, UnsupportedSnafu,
};
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
use crate::{CreateFlowArgs, Error, FlowId, TableName};

/// Batching mode Engine, responsible for driving all the batching mode tasks
@@ -77,6 +78,116 @@ impl BatchingEngine {
}
}

pub async fn handle_mark_dirty_time_window(
&self,
reqs: DirtyWindowRequests,
) -> Result<FlowResponse, Error> {
let table_info_mgr = self.table_meta.table_info_manager();

let mut group_by_table_id: HashMap<u32, Vec<_>> = HashMap::new();
for r in reqs.requests {
let tid = TableId::from(r.table_id);
let entry = group_by_table_id.entry(tid).or_default();
entry.extend(r.timestamps);
}
let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
let table_infos =
table_info_mgr
.batch_get(&tids)
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: format!("Failed to get table info for table ids: {:?}", tids),
})?;

let group_by_table_name = group_by_table_id
.into_iter()
.filter_map(|(id, timestamps)| {
let table_name = table_infos.get(&id).map(|info| info.table_name());
let Some(table_name) = table_name else {
warn!("Failed to get table infos for table id: {:?}", id);
return None;
};
let table_name = [
table_name.catalog_name,
table_name.schema_name,
table_name.table_name,
];
let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
.data_type
.as_timestamp()
.unwrap()
.unit();
Some((table_name, (timestamps, time_index_unit)))
})
.collect::<HashMap<_, _>>();

let group_by_table_name = Arc::new(group_by_table_name);

let mut handles = Vec::new();
let tasks = self.tasks.read().await;

for (_flow_id, task) in tasks.iter() {
let src_table_names = &task.config.source_table_names;

if src_table_names
.iter()
.all(|name| !group_by_table_name.contains_key(name))
{
continue;
}

let group_by_table_name = group_by_table_name.clone();
let task = task.clone();

let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
let src_table_names = &task.config.source_table_names;
let mut all_dirty_windows = vec![];
for src_table_name in src_table_names {
if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
let Some(expr) = &task.config.time_window_expr else {
continue;
};
for timestamp in timestamps {
let align_start = expr
.eval(common_time::Timestamp::new(*timestamp, *unit))?
.0
.context(UnexpectedSnafu {
reason: "Failed to eval start value",
})?;
all_dirty_windows.push(align_start);
}
}
}
let mut state = task.state.write().unwrap();
let flow_id_label = task.config.flow_id.to_string();
for timestamp in all_dirty_windows {
state.dirty_time_windows.add_window(timestamp, None);
}

METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW
.with_label_values(&[&flow_id_label])
.set(state.dirty_time_windows.len() as f64);
Ok(())
});
handles.push(handle);
}
drop(tasks);
for handle in handles {
match handle.await {
Err(e) => {
warn!("Failed to handle inserts: {e}");
}
Ok(Ok(())) => (),
Ok(Err(e)) => {
warn!("Failed to handle inserts: {e}");
}
}
}

Ok(Default::default())
}

pub async fn handle_inserts_inner(
&self,
request: api::v1::region::InsertRequests,

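In the hunk above, bulk-insert timestamps are grouped per source table and each one is passed through the task's time_window_expr, whose eval returns the aligned start of the window the timestamp falls into; that start is what gets recorded in dirty_time_windows. The expression implementation itself is not part of this diff. As a rough stand-in only, assuming fixed-size tumbling windows measured in milliseconds, the alignment step amounts to truncation; align_to_window_start below is a hypothetical helper, not a GreptimeDB API.

/// Simplified stand-in for the window-alignment performed by the flow's
/// time window expression: round a raw event timestamp (milliseconds)
/// down to the start of its fixed-size tumbling window.
fn align_to_window_start(timestamp_ms: i64, window_size_ms: i64) -> i64 {
    // Euclidean remainder so pre-epoch (negative) timestamps also round down.
    timestamp_ms - timestamp_ms.rem_euclid(window_size_ms)
}

#[cfg(test)]
mod align_tests {
    use super::align_to_window_start;

    #[test]
    fn aligns_into_five_minute_windows() {
        let five_min = 5 * 60 * 1000;
        // 00:07:30 falls into the window starting at 00:05:00.
        assert_eq!(align_to_window_start(450_000, five_min), 300_000);
        // A timestamp exactly on a boundary is its own window start.
        assert_eq!(align_to_window_start(300_000, five_min), 300_000);
    }
}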
@@ -156,6 +156,11 @@ impl DirtyTimeWindows {
self.windows.clear();
}

/// Number of dirty windows.
pub fn len(&self) -> usize {
self.windows.len()
}

/// Generate all filter expressions consuming all time windows
///
/// there is two limits:

@@ -61,7 +61,9 @@ use crate::error::{
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
METRIC_FLOW_ROWS,
};
use crate::{Error, FlowId};

@@ -371,6 +373,9 @@ impl BatchingTask {
"Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
elapsed
);
METRIC_FLOW_ROWS
.with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
.inc_by(*affected_rows as _);
} else if let Err(err) = &res {
warn!(
"Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
@@ -410,6 +415,7 @@ impl BatchingTask {
engine: QueryEngineRef,
frontend_client: Arc<FrontendClient>,
) {
let flow_id_str = self.config.flow_id.to_string();
loop {
// first check if shutdown signal is received
// if so, break the loop
@@ -427,6 +433,9 @@ impl BatchingTask {
Err(TryRecvError::Empty) => (),
}
}
METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
.with_label_values(&[&flow_id_str])
.inc();

let new_query = match self.gen_insert_plan(&engine).await {
Ok(new_query) => new_query,
@@ -473,6 +482,9 @@ impl BatchingTask {
}
// TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
Err(err) => {
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
.with_label_values(&[&flow_id_str])
.inc();
match new_query {
Some(query) => {
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)

@@ -58,11 +58,32 @@ lazy_static! {
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW: GaugeVec =
register_gauge_vec!(
"greptime_flow_batching_engine_bulk_mark_time_window",
"flow batching engine query time window count marked by bulk inserts",
&["flow_id"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec =
register_int_counter_vec!(
"greptime_flow_batching_start_query_count",
"flow batching engine started query count",
&["flow_id"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT: IntCounterVec =
register_int_counter_vec!(
"greptime_flow_batching_error_count",
"flow batching engine error count per flow id",
&["flow_id"],
)
.unwrap();
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
"greptime_flow_processed_rows",
"Count of rows flowing through the system",
"Count of rows flowing through the system.",
&["direction"]
)
.unwrap();

@@ -17,6 +17,7 @@
use std::net::SocketAddr;
use std::sync::Arc;

use api::v1::flow::DirtyWindowRequests;
use api::v1::{RowDeleteRequests, RowInsertRequests};
use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
use catalog::CatalogManagerRef;
@@ -136,6 +137,18 @@ impl flow_server::Flow for FlowService {
.map(Response::new)
.map_err(to_status_with_last_err)
}

async fn handle_mark_dirty_time_window(
&self,
reqs: Request<DirtyWindowRequests>,
) -> Result<Response<FlowResponse>, Status> {
self.dual_engine
.batching_engine()
.handle_mark_dirty_time_window(reqs.into_inner())
.await
.map(Response::new)
.map_err(to_status_with_last_err)
}
}

#[derive(Clone)]
@@ -578,6 +591,7 @@ impl FrontendInvoker {
layered_cache_registry.clone(),
inserter.clone(),
table_route_cache,
None,
));

let invoker = FrontendInvoker::new(inserter, deleter, statement_executor);

@@ -70,6 +70,7 @@ store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
tokio-util.workspace = true
toml.workspace = true
tonic.workspace = true


@@ -357,6 +357,12 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Query has been cancelled"))]
|
||||
Cancelled {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -435,6 +441,8 @@ impl ErrorExt for Error {
|
||||
Error::InFlightWriteBytesExceeded { .. } => StatusCode::RateLimited,
|
||||
|
||||
Error::DataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
|
||||
|
||||
Error::Cancelled { .. } => StatusCode::Cancelled,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -33,6 +33,7 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
|
||||
use catalog::process_manager::ProcessManagerRef;
|
||||
use catalog::CatalogManagerRef;
|
||||
use client::OutputData;
|
||||
use common_base::cancellation::CancellableFuture;
|
||||
use common_base::Plugins;
|
||||
use common_config::KvBackendConfig;
|
||||
use common_error::ext::{BoxedError, ErrorExt};
|
||||
@@ -81,7 +82,7 @@ use crate::error::{
|
||||
};
|
||||
use crate::limiter::LimiterRef;
|
||||
use crate::slow_query_recorder::SlowQueryRecorder;
|
||||
use crate::stream_wrapper::StreamWrapper;
|
||||
use crate::stream_wrapper::CancellableStreamWrapper;
|
||||
|
||||
/// The frontend instance contains necessary components, and implements many
|
||||
/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
|
||||
@@ -183,67 +184,75 @@ impl Instance {
|
||||
query_ctx.current_catalog().to_string(),
|
||||
vec![query_ctx.current_schema()],
|
||||
stmt.to_string(),
|
||||
"unknown".to_string(),
|
||||
query_ctx.conn_info().to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
let output = match stmt {
|
||||
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
|
||||
// TODO: remove this when format is supported in datafusion
|
||||
if let Statement::Explain(explain) = &stmt {
|
||||
if let Some(format) = explain.format() {
|
||||
query_ctx.set_explain_format(format.to_string());
|
||||
let query_fut = async {
|
||||
match stmt {
|
||||
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
|
||||
// TODO: remove this when format is supported in datafusion
|
||||
if let Statement::Explain(explain) = &stmt {
|
||||
if let Some(format) = explain.format() {
|
||||
query_ctx.set_explain_format(format.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
let stmt = QueryStatement::Sql(stmt);
|
||||
let plan = self
|
||||
.statement_executor
|
||||
.plan(&stmt, query_ctx.clone())
|
||||
.await?;
|
||||
|
||||
let QueryStatement::Sql(stmt) = stmt else {
|
||||
unreachable!()
|
||||
};
|
||||
query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
|
||||
self.statement_executor
|
||||
.exec_plan(plan, query_ctx)
|
||||
.await
|
||||
.context(TableOperationSnafu)
|
||||
}
|
||||
Statement::Tql(tql) => {
|
||||
let plan = self
|
||||
.statement_executor
|
||||
.plan_tql(tql.clone(), &query_ctx)
|
||||
.await?;
|
||||
|
||||
let stmt = QueryStatement::Sql(stmt);
|
||||
let plan = self
|
||||
.statement_executor
|
||||
.plan(&stmt, query_ctx.clone())
|
||||
.await?;
|
||||
|
||||
let QueryStatement::Sql(stmt) = stmt else {
|
||||
unreachable!()
|
||||
};
|
||||
query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
|
||||
|
||||
self.statement_executor.exec_plan(plan, query_ctx).await
|
||||
}
|
||||
Statement::Tql(tql) => {
|
||||
let plan = self
|
||||
.statement_executor
|
||||
.plan_tql(tql.clone(), &query_ctx)
|
||||
.await?;
|
||||
|
||||
query_interceptor.pre_execute(
|
||||
&Statement::Tql(tql),
|
||||
Some(&plan),
|
||||
query_ctx.clone(),
|
||||
)?;
|
||||
|
||||
self.statement_executor.exec_plan(plan, query_ctx).await
|
||||
}
|
||||
_ => {
|
||||
query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
|
||||
|
||||
self.statement_executor.execute_sql(stmt, query_ctx).await
|
||||
query_interceptor.pre_execute(
|
||||
&Statement::Tql(tql),
|
||||
Some(&plan),
|
||||
query_ctx.clone(),
|
||||
)?;
|
||||
self.statement_executor
|
||||
.exec_plan(plan, query_ctx)
|
||||
.await
|
||||
.context(TableOperationSnafu)
|
||||
}
|
||||
_ => {
|
||||
query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
|
||||
self.statement_executor
|
||||
.execute_sql(stmt, query_ctx)
|
||||
.await
|
||||
.context(TableOperationSnafu)
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
match output {
|
||||
Ok(output) => {
|
||||
CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
|
||||
.await
|
||||
.map_err(|_| error::CancelledSnafu.build())?
|
||||
.map(|output| {
|
||||
let Output { meta, data } = output;
|
||||
|
||||
let data = match data {
|
||||
OutputData::Stream(stream) => {
|
||||
OutputData::Stream(Box::pin(StreamWrapper::new(stream, ticket)))
|
||||
OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
|
||||
}
|
||||
other => other,
|
||||
};
|
||||
Ok(Output { data, meta })
|
||||
}
|
||||
Err(e) => Err(e).context(TableOperationSnafu),
|
||||
}
|
||||
Output { data, meta }
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -605,6 +614,8 @@ pub fn check_permission(
|
||||
}
|
||||
// cursor operations are always allowed once it's created
|
||||
Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
|
||||
// User can only kill process in their own catalog.
|
||||
Statement::Kill(_) => {}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -180,6 +180,7 @@ impl FrontendBuilder {
|
||||
local_cache_invalidator,
|
||||
inserter.clone(),
|
||||
table_route_cache,
|
||||
Some(process_manager.clone()),
|
||||
));
|
||||
|
||||
let pipeline_operator = Arc::new(PipelineOperator::new(
|
||||
|
||||
@@ -35,8 +35,8 @@ use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
use servers::query_handler::sql::SqlQueryHandler;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use table::metadata::TableId;
|
||||
use table::table_name::TableName;
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::{
|
||||
CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
|
||||
@@ -235,34 +235,33 @@ impl GrpcQueryHandler for Instance {
|
||||
|
||||
async fn put_record_batch(
|
||||
&self,
|
||||
table: &TableName,
|
||||
table_id: &mut Option<TableId>,
|
||||
table_name: &TableName,
|
||||
table_ref: &mut Option<TableRef>,
|
||||
decoder: &mut FlightDecoder,
|
||||
data: FlightData,
|
||||
) -> Result<AffectedRows> {
|
||||
let table_id = if let Some(table_id) = table_id {
|
||||
*table_id
|
||||
let table = if let Some(table) = table_ref {
|
||||
table.clone()
|
||||
} else {
|
||||
let table = self
|
||||
.catalog_manager()
|
||||
.table(
|
||||
&table.catalog_name,
|
||||
&table.schema_name,
|
||||
&table.table_name,
|
||||
&table_name.catalog_name,
|
||||
&table_name.schema_name,
|
||||
&table_name.table_name,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: table.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
})?;
|
||||
let id = table.table_info().table_id();
|
||||
*table_id = Some(id);
|
||||
id
|
||||
*table_ref = Some(table.clone());
|
||||
table
|
||||
};
|
||||
|
||||
self.inserter
|
||||
.handle_bulk_insert(table_id, decoder, data)
|
||||
.handle_bulk_insert(table, decoder, data)
|
||||
.await
|
||||
.context(TableOperationSnafu)
|
||||
}
|
||||
|
||||
@@ -15,37 +15,52 @@
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use catalog::process_manager::Ticket;
|
||||
use common_recordbatch::adapter::RecordBatchMetrics;
|
||||
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
|
||||
use datatypes::schema::SchemaRef;
|
||||
use futures::Stream;
|
||||
|
||||
pub struct StreamWrapper<T> {
|
||||
pub struct CancellableStreamWrapper {
|
||||
inner: SendableRecordBatchStream,
|
||||
_attachment: T,
|
||||
ticket: Ticket,
|
||||
}
|
||||
|
||||
impl<T> Unpin for StreamWrapper<T> {}
|
||||
impl Unpin for CancellableStreamWrapper {}
|
||||
|
||||
impl<T> StreamWrapper<T> {
|
||||
pub fn new(stream: SendableRecordBatchStream, attachment: T) -> Self {
|
||||
impl CancellableStreamWrapper {
|
||||
pub fn new(stream: SendableRecordBatchStream, ticket: Ticket) -> Self {
|
||||
Self {
|
||||
inner: stream,
|
||||
_attachment: attachment,
|
||||
ticket,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Stream for StreamWrapper<T> {
|
||||
impl Stream for CancellableStreamWrapper {
|
||||
type Item = common_recordbatch::error::Result<RecordBatch>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = &mut *self;
|
||||
Pin::new(&mut this.inner).poll_next(cx)
|
||||
if this.ticket.cancellation_handle.is_cancelled() {
|
||||
return Poll::Ready(Some(common_recordbatch::error::StreamCancelledSnafu.fail()));
|
||||
}
|
||||
|
||||
if let Poll::Ready(res) = Pin::new(&mut this.inner).poll_next(cx) {
|
||||
return Poll::Ready(res);
|
||||
}
|
||||
|
||||
// on pending, register cancellation waker.
|
||||
this.ticket.cancellation_handle.waker().register(cx.waker());
|
||||
// check if canceled again.
|
||||
if this.ticket.cancellation_handle.is_cancelled() {
|
||||
return Poll::Ready(Some(common_recordbatch::error::StreamCancelledSnafu.fail()));
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> RecordBatchStream for StreamWrapper<T> {
|
||||
impl RecordBatchStream for CancellableStreamWrapper {
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.inner.schema()
|
||||
}
|
||||
@@ -58,3 +73,295 @@ impl<T> RecordBatchStream for StreamWrapper<T> {
|
||||
self.inner.metrics()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
|
||||
use catalog::process_manager::ProcessManager;
|
||||
use common_recordbatch::adapter::RecordBatchMetrics;
|
||||
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream};
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::prelude::VectorRef;
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
use datatypes::vectors::Int32Vector;
|
||||
use futures::{Stream, StreamExt};
|
||||
use tokio::time::{sleep, timeout};
|
||||
|
||||
use super::CancellableStreamWrapper;
|
||||
|
||||
// Mock stream for testing
|
||||
struct MockRecordBatchStream {
|
||||
schema: SchemaRef,
|
||||
batches: Vec<common_recordbatch::error::Result<RecordBatch>>,
|
||||
current: usize,
|
||||
delay: Option<Duration>,
|
||||
}
|
||||
|
||||
impl MockRecordBatchStream {
|
||||
fn new(batches: Vec<common_recordbatch::error::Result<RecordBatch>>) -> Self {
|
||||
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
|
||||
"test_col",
|
||||
ConcreteDataType::int32_datatype(),
|
||||
false,
|
||||
)]));
|
||||
|
||||
Self {
|
||||
schema,
|
||||
batches,
|
||||
current: 0,
|
||||
delay: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn with_delay(mut self, delay: Duration) -> Self {
|
||||
self.delay = Some(delay);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for MockRecordBatchStream {
|
||||
type Item = common_recordbatch::error::Result<RecordBatch>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
if let Some(delay) = self.delay {
|
||||
// Simulate async delay
|
||||
let waker = cx.waker().clone();
|
||||
let delay_clone = delay;
|
||||
tokio::spawn(async move {
|
||||
sleep(delay_clone).await;
|
||||
waker.wake();
|
||||
});
|
||||
self.delay = None; // Only delay once
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
if self.current >= self.batches.len() {
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
|
||||
let batch = self.batches[self.current].as_ref().unwrap().clone();
|
||||
self.current += 1;
|
||||
Poll::Ready(Some(Ok(batch)))
|
||||
}
|
||||
}
|
||||
|
||||
impl RecordBatchStream for MockRecordBatchStream {
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
fn output_ordering(&self) -> Option<&[OrderOption]> {
|
||||
None
|
||||
}
|
||||
|
||||
fn metrics(&self) -> Option<RecordBatchMetrics> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn create_test_batch() -> RecordBatch {
|
||||
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
|
||||
"test_col",
|
||||
ConcreteDataType::int32_datatype(),
|
||||
false,
|
||||
)]));
|
||||
RecordBatch::new(
|
||||
schema,
|
||||
vec![Arc::new(Int32Vector::from_values(0..3)) as VectorRef],
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_stream_completes_normally() {
|
||||
let batch = create_test_batch();
|
||||
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch.clone())]);
|
||||
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
|
||||
let ticket = process_manager.register_query(
|
||||
"catalog".to_string(),
|
||||
vec![],
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
|
||||
|
||||
let result = cancellable_stream.next().await;
|
||||
assert!(result.is_some());
|
||||
assert!(result.unwrap().is_ok());
|
||||
|
||||
let end_result = cancellable_stream.next().await;
|
||||
assert!(end_result.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_stream_cancelled_before_start() {
|
||||
let batch = create_test_batch();
|
||||
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch)]);
|
||||
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
|
||||
let ticket = process_manager.register_query(
|
||||
"catalog".to_string(),
|
||||
vec![],
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
// Cancel before creating the wrapper
|
||||
ticket.cancellation_handle.cancel();
|
||||
|
||||
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
|
||||
|
||||
let result = cancellable_stream.next().await;
|
||||
assert!(result.is_some());
|
||||
assert!(result.unwrap().is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_stream_cancelled_during_execution() {
|
||||
let batch = create_test_batch();
|
||||
let mock_stream =
|
||||
MockRecordBatchStream::new(vec![Ok(batch)]).with_delay(Duration::from_millis(100));
|
||||
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
|
||||
let ticket = process_manager.register_query(
|
||||
"catalog".to_string(),
|
||||
vec![],
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
);
|
||||
let cancellation_handle = ticket.cancellation_handle.clone();
|
||||
|
||||
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
|
||||
|
||||
// Cancel after a short delay
|
||||
tokio::spawn(async move {
|
||||
sleep(Duration::from_millis(50)).await;
|
||||
cancellation_handle.cancel();
|
||||
});
|
||||
|
||||
let result = cancellable_stream.next().await;
|
||||
assert!(result.is_some());
|
||||
assert!(result.unwrap().is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_stream_completes_before_cancellation() {
|
||||
let batch = create_test_batch();
|
||||
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch.clone())]);
|
||||
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
|
||||
let ticket = process_manager.register_query(
|
||||
"catalog".to_string(),
|
||||
vec![],
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
);
|
||||
let cancellation_handle = ticket.cancellation_handle.clone();
|
||||
|
||||
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
|
||||
|
||||
// Try to cancel after the stream should have completed
|
||||
tokio::spawn(async move {
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
cancellation_handle.cancel();
|
||||
});
|
||||
|
||||
let result = cancellable_stream.next().await;
|
||||
assert!(result.is_some());
|
||||
assert!(result.unwrap().is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multiple_batches() {
|
||||
let batch1 = create_test_batch();
|
||||
let batch2 = create_test_batch();
|
||||
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch1), Ok(batch2)]);
|
||||
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
|
||||
let ticket = process_manager.register_query(
|
||||
"catalog".to_string(),
|
||||
vec![],
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
|
||||
|
||||
// First batch
|
||||
let result1 = cancellable_stream.next().await;
|
||||
assert!(result1.is_some());
|
||||
assert!(result1.unwrap().is_ok());
|
||||
|
||||
// Second batch
|
||||
let result2 = cancellable_stream.next().await;
|
||||
assert!(result2.is_some());
|
||||
assert!(result2.unwrap().is_ok());
|
||||
|
||||
// End of stream
|
||||
let end_result = cancellable_stream.next().await;
|
||||
assert!(end_result.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_record_batch_stream_methods() {
|
||||
let batch = create_test_batch();
|
||||
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch)]);
|
||||
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
|
||||
let ticket = process_manager.register_query(
|
||||
"catalog".to_string(),
|
||||
vec![],
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
let cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
|
||||
|
||||
// Test schema method
|
||||
let schema = cancellable_stream.schema();
|
||||
assert_eq!(schema.column_schemas().len(), 1);
|
||||
assert_eq!(schema.column_schemas()[0].name, "test_col");
|
||||
|
||||
// Test output_ordering method
|
||||
assert!(cancellable_stream.output_ordering().is_none());
|
||||
|
||||
// Test metrics method
|
||||
assert!(cancellable_stream.metrics().is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cancellation_during_pending_poll() {
|
||||
let batch = create_test_batch();
|
||||
let mock_stream =
|
||||
MockRecordBatchStream::new(vec![Ok(batch)]).with_delay(Duration::from_millis(200));
|
||||
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
|
||||
let ticket = process_manager.register_query(
|
||||
"catalog".to_string(),
|
||||
vec![],
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
);
|
||||
let cancellation_handle = ticket.cancellation_handle.clone();
|
||||
|
||||
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
|
||||
|
||||
// Cancel while the stream is pending
|
||||
tokio::spawn(async move {
|
||||
sleep(Duration::from_millis(50)).await;
|
||||
cancellation_handle.cancel();
|
||||
});
|
||||
|
||||
let result = timeout(Duration::from_millis(300), cancellable_stream.next()).await;
|
||||
assert!(result.is_ok());
|
||||
let stream_result = result.unwrap();
|
||||
assert!(stream_result.is_some());
|
||||
assert!(stream_result.unwrap().is_err());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ futures-util.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
itertools.workspace = true
|
||||
lazy_static = "1.4"
|
||||
mito-codec.workspace = true
|
||||
mito2.workspace = true
|
||||
mur3 = "0.1"
|
||||
object-store.workspace = true
|
||||
|
||||
@@ -115,7 +115,7 @@ pub enum Error {
|
||||
|
||||
#[snafu(display("Failed to encode primary key"))]
|
||||
EncodePrimaryKey {
|
||||
source: mito2::error::Error,
|
||||
source: mito_codec::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::hash::Hash;
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
|
||||
use datatypes::value::ValueRef;
|
||||
use mito2::row_converter::SparsePrimaryKeyCodec;
|
||||
use mito_codec::row_converter::SparsePrimaryKeyCodec;
|
||||
use smallvec::SmallVec;
|
||||
use snafu::ResultExt;
|
||||
use store_api::codec::PrimaryKeyEncoding;
|
||||
|
||||
30
src/mito-codec/Cargo.toml
Normal file
30
src/mito-codec/Cargo.toml
Normal file
@@ -0,0 +1,30 @@
|
||||
[package]
|
||||
name = "mito-codec"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
testing = []
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
bytes.workspace = true
|
||||
common-base.workspace = true
|
||||
common-decimal.workspace = true
|
||||
common-error.workspace = true
|
||||
common-macro.workspace = true
|
||||
common-recordbatch.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
common-time.workspace = true
|
||||
datatypes.workspace = true
|
||||
memcomparable = "0.2"
|
||||
paste.workspace = true
|
||||
serde.workspace = true
|
||||
snafu.workspace = true
|
||||
store-api.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
datafusion-common.workspace = true
|
||||
datafusion-expr.workspace = true
|
||||
95
src/mito-codec/src/error.rs
Normal file
95
src/mito-codec/src/error.rs
Normal file
@@ -0,0 +1,95 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_macro::stack_trace_debug;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use snafu::{Location, Snafu};
|
||||
|
||||
/// Error definitions for mito encoding.
|
||||
#[derive(Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
#[stack_trace_debug]
|
||||
pub enum Error {
|
||||
#[snafu(display("Row value mismatches field data type"))]
|
||||
FieldTypeMismatch {
|
||||
// Box the source to reduce the size of the error.
|
||||
#[snafu(source(from(datatypes::error::Error, Box::new)))]
|
||||
source: Box<datatypes::error::Error>,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to serialize field"))]
|
||||
SerializeField {
|
||||
#[snafu(source)]
|
||||
error: memcomparable::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Data type: {} does not support serialization/deserialization",
|
||||
data_type,
|
||||
))]
|
||||
NotSupportedField {
|
||||
data_type: ConcreteDataType,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to deserialize field"))]
|
||||
DeserializeField {
|
||||
#[snafu(source)]
|
||||
error: memcomparable::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Operation not supported: {}", err_msg))]
|
||||
UnsupportedOperation {
|
||||
err_msg: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Encode null value"))]
|
||||
IndexEncodeNull {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
impl ErrorExt for Error {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
use Error::*;
|
||||
|
||||
match self {
|
||||
FieldTypeMismatch { source, .. } => source.status_code(),
|
||||
SerializeField { .. } | DeserializeField { .. } | IndexEncodeNull { .. } => {
|
||||
StatusCode::InvalidArguments
|
||||
}
|
||||
NotSupportedField { .. } | UnsupportedOperation { .. } => StatusCode::Unsupported,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Index codec utilities.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -47,7 +49,7 @@ impl IndexValueCodec {
|
||||
) -> Result<()> {
|
||||
ensure!(!value.is_null(), IndexEncodeNullSnafu);
|
||||
|
||||
if matches!(field.data_type, ConcreteDataType::String(_)) {
|
||||
if matches!(field.data_type(), ConcreteDataType::String(_)) {
|
||||
let value = value
|
||||
.as_string()
|
||||
.context(FieldTypeMismatchSnafu)?
|
||||
@@ -31,7 +31,7 @@ pub struct KeyValues {
|
||||
///
|
||||
/// This mutation must be a valid mutation and rows in the mutation
|
||||
/// must not be `None`.
|
||||
pub(crate) mutation: Mutation,
|
||||
pub mutation: Mutation,
|
||||
/// Key value read helper.
|
||||
helper: SparseReadRowHelper,
|
||||
/// Primary key encoding hint.
|
||||
@@ -333,8 +333,7 @@ mod tests {
|
||||
use api::v1::{self, ColumnDataType, SemanticType};
|
||||
|
||||
use super::*;
|
||||
use crate::test_util::i64_value;
|
||||
use crate::test_util::meta_util::TestRegionMetadataBuilder;
|
||||
use crate::test_util::{i64_value, TestRegionMetadataBuilder};
|
||||
|
||||
const TS_NAME: &str = "ts";
|
||||
const START_SEQ: SequenceNumber = 100;
|
||||
24
src/mito-codec/src/lib.rs
Normal file
24
src/mito-codec/src/lib.rs
Normal file
@@ -0,0 +1,24 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Codec utilities for the Mito protocol.
|
||||
|
||||
pub mod error;
|
||||
pub mod index;
|
||||
pub mod key_values;
|
||||
pub mod primary_key_filter;
|
||||
pub mod row_converter;
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub mod test_util;
|
||||
@@ -19,12 +19,17 @@ use api::v1::SemanticType;
|
||||
use common_recordbatch::filter::SimpleFilterEvaluator;
|
||||
use datatypes::value::Value;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::memtable::partition_tree::partition::Partition;
|
||||
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyFilter, SparsePrimaryKeyCodec};
|
||||
|
||||
/// Returns true if this is a partition column for metrics in the memtable.
|
||||
pub fn is_partition_column(name: &str) -> bool {
|
||||
name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct PrimaryKeyFilterInner {
|
||||
metadata: RegionMetadataRef,
|
||||
@@ -42,7 +47,7 @@ impl PrimaryKeyFilterInner {
|
||||
|
||||
let mut result = true;
|
||||
for filter in self.filters.iter() {
|
||||
if Partition::is_partition_column(filter.column_name()) {
|
||||
if is_partition_column(filter.column_name()) {
|
||||
continue;
|
||||
}
|
||||
let Some(column) = self.metadata.column_by_name(filter.column_name()) else {
|
||||
@@ -149,9 +154,8 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::SemanticType;
|
||||
use datafusion::logical_expr::BinaryExpr;
|
||||
use datafusion_common::{Column, ScalarValue};
|
||||
use datafusion_expr::{Expr, Operator};
|
||||
use datafusion_expr::{BinaryExpr, Expr, Operator};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::value::ValueRef;
|
||||
164
src/mito-codec/src/row_converter.rs
Normal file
164
src/mito-codec/src/row_converter.rs
Normal file
@@ -0,0 +1,164 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod dense;
|
||||
pub mod sparse;
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_recordbatch::filter::SimpleFilterEvaluator;
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
pub use dense::{DensePrimaryKeyCodec, SortField};
|
||||
pub use sparse::{SparsePrimaryKeyCodec, SparseValues, COLUMN_ID_ENCODE_SIZE};
|
||||
use store_api::codec::PrimaryKeyEncoding;
|
||||
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::key_values::KeyValue;
|
||||
|
||||
/// Row value encoder/decoder.
|
||||
pub trait PrimaryKeyCodecExt {
|
||||
/// Encodes rows to bytes.
|
||||
/// # Note
|
||||
/// Ensure the length of row iterator matches the length of fields.
|
||||
fn encode<'a, I>(&self, row: I) -> Result<Vec<u8>>
|
||||
where
|
||||
I: Iterator<Item = ValueRef<'a>>,
|
||||
{
|
||||
let mut buffer = Vec::new();
|
||||
self.encode_to_vec(row, &mut buffer)?;
|
||||
Ok(buffer)
|
||||
}
|
||||
|
||||
/// Encodes rows to specific vec.
|
||||
/// # Note
|
||||
/// Ensure the length of row iterator matches the length of fields.
|
||||
fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
|
||||
where
|
||||
I: Iterator<Item = ValueRef<'a>>;
|
||||
}
|
||||
|
||||
pub trait PrimaryKeyFilter: Send + Sync {
|
||||
/// Returns true if the primary key matches the filter.
|
||||
fn matches(&mut self, pk: &[u8]) -> bool;
|
||||
}
|
||||
|
||||
/// Composite values decoded from primary key bytes.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum CompositeValues {
|
||||
Dense(Vec<(ColumnId, Value)>),
|
||||
Sparse(SparseValues),
|
||||
}
|
||||
|
||||
impl CompositeValues {
|
||||
/// Extends the composite values with the given values.
|
||||
pub fn extend(&mut self, values: &[(ColumnId, Value)]) {
|
||||
match self {
|
||||
CompositeValues::Dense(dense_values) => {
|
||||
for (column_id, value) in values {
|
||||
dense_values.push((*column_id, value.clone()));
|
||||
}
|
||||
}
|
||||
CompositeValues::Sparse(sprase_value) => {
|
||||
for (column_id, value) in values {
|
||||
sprase_value.insert(*column_id, value.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
impl CompositeValues {
|
||||
pub fn into_sparse(self) -> SparseValues {
|
||||
match self {
|
||||
CompositeValues::Sparse(v) => v,
|
||||
_ => panic!("CompositeValues is not sparse"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_dense(self) -> Vec<Value> {
|
||||
match self {
|
||||
CompositeValues::Dense(v) => v.into_iter().map(|(_, v)| v).collect(),
|
||||
_ => panic!("CompositeValues is not dense"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait PrimaryKeyCodec: Send + Sync + Debug {
|
||||
/// Encodes a key value to bytes.
|
||||
fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()>;
|
||||
|
||||
/// Encodes values to bytes.
|
||||
fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()>;
|
||||
|
||||
/// Encodes values to bytes.
|
||||
fn encode_value_refs(
|
||||
&self,
|
||||
values: &[(ColumnId, ValueRef)],
|
||||
buffer: &mut Vec<u8>,
|
||||
) -> Result<()>;
|
||||
|
||||
/// Returns the number of fields in the primary key.
|
||||
fn num_fields(&self) -> Option<usize>;
|
||||
|
||||
/// Returns a primary key filter factory.
|
||||
fn primary_key_filter(
|
||||
&self,
|
||||
metadata: &RegionMetadataRef,
|
||||
filters: Arc<Vec<SimpleFilterEvaluator>>,
|
||||
) -> Box<dyn PrimaryKeyFilter>;
|
||||
|
||||
/// Returns the estimated size of the primary key.
|
||||
fn estimated_size(&self) -> Option<usize> {
|
||||
None
|
||||
}
|
||||
|
||||
/// Returns the encoding type of the primary key.
|
||||
fn encoding(&self) -> PrimaryKeyEncoding;
|
||||
|
||||
/// Decodes the primary key from the given bytes.
|
||||
///
|
||||
/// Returns a [`CompositeValues`] that follows the primary key ordering.
|
||||
fn decode(&self, bytes: &[u8]) -> Result<CompositeValues>;
|
||||
|
||||
/// Decode the leftmost value from bytes.
|
||||
fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>>;
|
||||
}
|
||||
|
||||
/// Builds a primary key codec from region metadata.
|
||||
pub fn build_primary_key_codec(region_metadata: &RegionMetadata) -> Arc<dyn PrimaryKeyCodec> {
|
||||
let fields = region_metadata.primary_key_columns().map(|col| {
|
||||
(
|
||||
col.column_id,
|
||||
SortField::new(col.column_schema.data_type.clone()),
|
||||
)
|
||||
});
|
||||
build_primary_key_codec_with_fields(region_metadata.primary_key_encoding, fields)
|
||||
}
|
||||
|
||||
/// Builds a primary key codec from region metadata.
|
||||
pub fn build_primary_key_codec_with_fields(
|
||||
encoding: PrimaryKeyEncoding,
|
||||
fields: impl Iterator<Item = (ColumnId, SortField)>,
|
||||
) -> Arc<dyn PrimaryKeyCodec> {
|
||||
match encoding {
|
||||
PrimaryKeyEncoding::Dense => Arc::new(DensePrimaryKeyCodec::with_fields(fields.collect())),
|
||||
PrimaryKeyEncoding::Sparse => {
|
||||
Arc::new(SparsePrimaryKeyCodec::with_fields(fields.collect()))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -35,15 +35,16 @@ use store_api::storage::ColumnId;
|
||||
use crate::error::{
|
||||
self, FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu,
|
||||
};
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::partition_tree::DensePrimaryKeyFilter;
|
||||
use crate::key_values::KeyValue;
|
||||
use crate::primary_key_filter::DensePrimaryKeyFilter;
|
||||
use crate::row_converter::{
|
||||
CompositeValues, PrimaryKeyCodec, PrimaryKeyCodecExt, PrimaryKeyFilter,
|
||||
};
|
||||
|
||||
/// Field to serialize and deserialize value in memcomparable format.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct SortField {
|
||||
pub(crate) data_type: ConcreteDataType,
|
||||
data_type: ConcreteDataType,
|
||||
}
|
||||
|
||||
impl SortField {
|
||||
@@ -51,6 +52,11 @@ impl SortField {
|
||||
Self { data_type }
|
||||
}
|
||||
|
||||
/// Returns the data type of the field.
|
||||
pub fn data_type(&self) -> &ConcreteDataType {
|
||||
&self.data_type
|
||||
}
|
||||
|
||||
pub fn estimated_size(&self) -> usize {
|
||||
match &self.data_type {
|
||||
ConcreteDataType::Boolean(_) => 2,
|
||||
@@ -75,10 +81,9 @@ impl SortField {
|
||||
| ConcreteDataType::Dictionary(_) => 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SortField {
|
||||
pub(crate) fn serialize(
|
||||
/// Serialize a value to the serializer.
|
||||
pub fn serialize(
|
||||
&self,
|
||||
serializer: &mut Serializer<&mut Vec<u8>>,
|
||||
value: &ValueRef,
|
||||
@@ -163,7 +168,8 @@ impl SortField {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
|
||||
/// Deserialize a value from the deserializer.
|
||||
pub fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
|
||||
macro_rules! deserialize_and_build_value {
|
||||
(
|
||||
$self: ident;
|
||||
@@ -525,7 +531,7 @@ mod tests {
|
||||
let value = encoder.decode_value_at(&result, i, &mut offsets).unwrap();
|
||||
decoded.push(value);
|
||||
}
|
||||
assert_eq!(data_types.len(), offsets.len(), "offsets: {:?}", offsets);
|
||||
assert_eq!(data_types.len(), offsets.len(), "offsets: {offsets:?}");
|
||||
assert_eq!(decoded, row);
|
||||
}
|
||||
}
|
||||
@@ -27,8 +27,8 @@ use store_api::storage::consts::ReservedColumnId;
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu, UnsupportedOperationSnafu};
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::partition_tree::SparsePrimaryKeyFilter;
|
||||
use crate::key_values::KeyValue;
|
||||
use crate::primary_key_filter::SparsePrimaryKeyFilter;
|
||||
use crate::row_converter::dense::SortField;
|
||||
use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
|
||||
|
||||
@@ -205,7 +205,7 @@ impl SparsePrimaryKeyCodec {
|
||||
}
|
||||
|
||||
/// Returns the offset of the given column id in the given primary key.
|
||||
pub(crate) fn has_column(
|
||||
pub fn has_column(
|
||||
&self,
|
||||
pk: &[u8],
|
||||
offsets_map: &mut HashMap<u32, usize>,
|
||||
@@ -233,12 +233,7 @@ impl SparsePrimaryKeyCodec {
|
||||
}
|
||||
|
||||
/// Decode value at `offset` in `pk`.
|
||||
pub(crate) fn decode_value_at(
|
||||
&self,
|
||||
pk: &[u8],
|
||||
offset: usize,
|
||||
column_id: ColumnId,
|
||||
) -> Result<Value> {
|
||||
pub fn decode_value_at(&self, pk: &[u8], offset: usize, column_id: ColumnId) -> Result<Value> {
|
||||
let mut deserializer = Deserializer::new(pk);
|
||||
deserializer.advance(offset);
|
||||
// Safety: checked by `has_column`
|
||||
@@ -300,6 +295,40 @@ impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
|
||||
}
|
||||
}
|
||||
|
||||
/// Field with column id.
|
||||
pub struct FieldWithId {
|
||||
pub field: SortField,
|
||||
pub column_id: ColumnId,
|
||||
}
|
||||
|
||||
/// A special encoder for memtable.
|
||||
pub struct SparseEncoder {
|
||||
fields: Vec<FieldWithId>,
|
||||
}
|
||||
|
||||
impl SparseEncoder {
|
||||
pub fn new(fields: Vec<FieldWithId>) -> Self {
|
||||
Self { fields }
|
||||
}
|
||||
|
||||
pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
|
||||
where
|
||||
I: Iterator<Item = ValueRef<'a>>,
|
||||
{
|
||||
let mut serializer = Serializer::new(buffer);
|
||||
for (value, field) in row.zip(self.fields.iter()) {
|
||||
if !value.is_null() {
|
||||
field
|
||||
.column_id
|
||||
.serialize(&mut serializer)
|
||||
.context(SerializeFieldSnafu)?;
|
||||
field.field.serialize(&mut serializer, &value)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
@@ -12,8 +12,10 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Utilities to create a [RegionMetadata](store_api::metadata::RegionMetadata).
|
||||
//! Test utilities for mito codec.
|
||||
|
||||
use api::greptime_proto::v1;
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::SemanticType;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
@@ -105,3 +107,10 @@ impl TestRegionMetadataBuilder {
|
||||
builder.build().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates value for i64.
|
||||
pub fn i64_value(data: i64) -> v1::Value {
|
||||
v1::Value {
|
||||
value_data: Some(ValueData::I64Value(data)),
|
||||
}
|
||||
}
|
||||
@@ -48,6 +48,7 @@ itertools.workspace = true
|
||||
lazy_static = "1.4"
|
||||
log-store = { workspace = true }
|
||||
memcomparable = "0.2"
|
||||
mito-codec.workspace = true
|
||||
moka = { workspace = true, features = ["sync", "future"] }
|
||||
object-store.workspace = true
|
||||
parquet = { workspace = true, features = ["async"] }
|
||||
@@ -82,6 +83,7 @@ common-test-util.workspace = true
|
||||
criterion = "0.4"
|
||||
dotenv.workspace = true
|
||||
log-store.workspace = true
|
||||
mito-codec = { workspace = true, features = ["testing"] }
|
||||
object-store = { workspace = true, features = ["services-memory"] }
|
||||
rskafka.workspace = true
|
||||
rstest.workspace = true
|
||||
|
||||
@@ -25,8 +25,8 @@ use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable
|
||||
use mito2::memtable::time_series::TimeSeriesMemtable;
|
||||
use mito2::memtable::{KeyValues, Memtable};
|
||||
use mito2::region::options::MergeMode;
|
||||
use mito2::row_converter::DensePrimaryKeyCodec;
|
||||
use mito2::test_util::memtable_util::{self, region_metadata_to_row_schema};
|
||||
use mito_codec::row_converter::DensePrimaryKeyCodec;
|
||||
use rand::rngs::ThreadRng;
|
||||
use rand::seq::IndexedRandom;
|
||||
use rand::Rng;
|
||||
|
||||
@@ -18,8 +18,9 @@
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_base::BitVec;
|
||||
use common_time::Timestamp;
|
||||
use smallvec::{smallvec, SmallVec};
|
||||
|
||||
use crate::sst::file::FileHandle;
|
||||
use crate::sst::file::{FileHandle, FileId};
|
||||
|
||||
/// Default max compaction output file size when not specified.
|
||||
const DEFAULT_MAX_OUTPUT_SIZE: u64 = ReadableSize::gb(2).as_bytes();
|
||||
@@ -125,17 +126,68 @@ pub trait Item: Ranged + Clone {
|
||||
fn size(&self) -> usize;
|
||||
}
|
||||
|
||||
impl Ranged for FileHandle {
|
||||
type BoundType = Timestamp;
|
||||
/// A group of files that are created by the same compaction task.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FileGroup {
|
||||
files: SmallVec<[FileHandle; 2]>,
|
||||
size: usize,
|
||||
num_rows: usize,
|
||||
min_timestamp: Timestamp,
|
||||
max_timestamp: Timestamp,
|
||||
}
|
||||
|
||||
fn range(&self) -> (Self::BoundType, Self::BoundType) {
|
||||
self.time_range()
|
||||
impl FileGroup {
|
||||
pub(crate) fn new_with_file(file: FileHandle) -> Self {
|
||||
let size = file.size() as usize;
|
||||
let (min_timestamp, max_timestamp) = file.time_range();
|
||||
let num_rows = file.num_rows();
|
||||
Self {
|
||||
files: smallvec![file],
|
||||
size,
|
||||
num_rows,
|
||||
min_timestamp,
|
||||
max_timestamp,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn num_rows(&self) -> usize {
|
||||
self.num_rows
|
||||
}
|
||||
|
||||
pub(crate) fn add_file(&mut self, file: FileHandle) {
|
||||
self.size += file.size() as usize;
|
||||
self.num_rows += file.num_rows();
|
||||
let (min_timestamp, max_timestamp) = file.time_range();
|
||||
self.min_timestamp = self.min_timestamp.min(min_timestamp);
|
||||
self.max_timestamp = self.max_timestamp.max(max_timestamp);
|
||||
self.files.push(file);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn files(&self) -> &[FileHandle] {
|
||||
&self.files[..]
|
||||
}
|
||||
|
||||
pub(crate) fn file_ids(&self) -> SmallVec<[FileId; 2]> {
|
||||
SmallVec::from_iter(self.files.iter().map(|f| f.file_id()))
|
||||
}
|
||||
|
||||
pub(crate) fn into_files(self) -> impl Iterator<Item = FileHandle> {
|
||||
self.files.into_iter()
|
||||
}
|
||||
}
|
||||
|
||||
impl Item for FileHandle {
|
||||
impl Ranged for FileGroup {
|
||||
type BoundType = Timestamp;
|
||||
|
||||
fn range(&self) -> (Self::BoundType, Self::BoundType) {
|
||||
(self.min_timestamp, self.max_timestamp)
|
||||
}
|
||||
}
|
||||
|
||||
impl Item for FileGroup {
|
||||
fn size(&self) -> usize {
|
||||
self.size() as usize
|
||||
self.size
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::num::NonZeroU64;
|
||||
|
||||
use common_time::Timestamp;
|
||||
|
||||
use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
|
||||
@@ -23,6 +25,23 @@ pub fn new_file_handle(
|
||||
start_ts_millis: i64,
|
||||
end_ts_millis: i64,
|
||||
level: Level,
|
||||
) -> FileHandle {
|
||||
new_file_handle_with_sequence(
|
||||
file_id,
|
||||
start_ts_millis,
|
||||
end_ts_millis,
|
||||
level,
|
||||
start_ts_millis as u64,
|
||||
)
|
||||
}
|
||||
|
||||
/// Test util to create file handles.
|
||||
pub fn new_file_handle_with_sequence(
|
||||
file_id: FileId,
|
||||
start_ts_millis: i64,
|
||||
end_ts_millis: i64,
|
||||
level: Level,
|
||||
sequence: u64,
|
||||
) -> FileHandle {
|
||||
let file_purger = new_noop_file_purger();
|
||||
FileHandle::new(
|
||||
@@ -39,7 +58,7 @@ pub fn new_file_handle(
|
||||
index_file_size: 0,
|
||||
num_rows: 0,
|
||||
num_row_groups: 0,
|
||||
sequence: None,
|
||||
sequence: NonZeroU64::new(sequence),
|
||||
},
|
||||
file_purger,
|
||||
)
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::fmt::Debug;
|
||||
use std::num::NonZeroU64;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_telemetry::info;
|
||||
@@ -26,7 +27,9 @@ use store_api::storage::RegionId;
|
||||
use crate::compaction::buckets::infer_time_bucket;
|
||||
use crate::compaction::compactor::CompactionRegion;
|
||||
use crate::compaction::picker::{Picker, PickerOutput};
|
||||
use crate::compaction::run::{find_sorted_runs, merge_seq_files, reduce_runs};
|
||||
use crate::compaction::run::{
|
||||
find_sorted_runs, merge_seq_files, reduce_runs, FileGroup, Item, Ranged,
|
||||
};
|
||||
use crate::compaction::{get_expired_ssts, CompactionOutput};
|
||||
use crate::sst::file::{overlaps, FileHandle, Level};
|
||||
use crate::sst::version::LevelMeta;
|
||||
@@ -60,7 +63,8 @@ impl TwcsPicker {
|
||||
if files.files.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let sorted_runs = find_sorted_runs(&mut files.files);
|
||||
let mut files_to_merge: Vec<_> = files.files().cloned().collect();
|
||||
let sorted_runs = find_sorted_runs(&mut files_to_merge);
|
||||
let found_runs = sorted_runs.len();
|
||||
// We only remove deletion markers if we found less than 2 runs and not in append mode.
|
||||
// because after compaction there will be no overlapping files.
|
||||
@@ -90,7 +94,7 @@ impl TwcsPicker {
|
||||
);
|
||||
output.push(CompactionOutput {
|
||||
output_level: LEVEL_COMPACTED, // always compact to l1
|
||||
inputs,
|
||||
inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
|
||||
filter_deleted,
|
||||
output_time_range: None, // we do not enforce output time range in twcs compactions.
|
||||
});
|
||||
@@ -109,21 +113,21 @@ fn log_pick_result(
|
||||
file_num: usize,
|
||||
max_output_file_size: Option<u64>,
|
||||
filter_deleted: bool,
|
||||
inputs: &[FileHandle],
|
||||
inputs: &[FileGroup],
|
||||
) {
|
||||
let input_file_str: Vec<String> = inputs
|
||||
.iter()
|
||||
.map(|f| {
|
||||
let range = f.time_range();
|
||||
let range = f.range();
|
||||
let start = range.0.to_iso8601_string();
|
||||
let end = range.1.to_iso8601_string();
|
||||
let num_rows = f.num_rows();
|
||||
format!(
|
||||
"SST{{id: {}, range: ({}, {}), size: {}, num rows: {} }}",
|
||||
f.file_id(),
|
||||
"FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
|
||||
f.file_ids(),
|
||||
start,
|
||||
end,
|
||||
ReadableSize(f.size()),
|
||||
ReadableSize(f.size() as u64),
|
||||
num_rows
|
||||
)
|
||||
})
|
||||
@@ -198,7 +202,9 @@ impl Picker for TwcsPicker {
|
||||
struct Window {
|
||||
start: Timestamp,
|
||||
end: Timestamp,
|
||||
files: Vec<FileHandle>,
|
||||
// Mapping from file sequence to file groups. Files with the same sequence is considered
|
||||
// created from the same compaction task.
|
||||
files: HashMap<Option<NonZeroU64>, FileGroup>,
|
||||
time_window: i64,
|
||||
overlapping: bool,
|
||||
}
|
||||
@@ -207,10 +213,11 @@ impl Window {
|
||||
/// Creates a new [Window] with given file.
|
||||
fn new_with_file(file: FileHandle) -> Self {
|
||||
let (start, end) = file.time_range();
|
||||
let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
|
||||
Self {
|
||||
start,
|
||||
end,
|
||||
files: vec![file],
|
||||
files,
|
||||
time_window: 0,
|
||||
overlapping: false,
|
||||
}
|
||||
@@ -226,7 +233,19 @@ impl Window {
|
||||
let (start, end) = file.time_range();
|
||||
self.start = self.start.min(start);
|
||||
self.end = self.end.max(end);
|
||||
self.files.push(file);
|
||||
|
||||
match self.files.entry(file.meta_ref().sequence) {
|
||||
Entry::Occupied(mut o) => {
|
||||
o.get_mut().add_file(file);
|
||||
}
|
||||
Entry::Vacant(v) => {
|
||||
v.insert(FileGroup::new_with_file(file));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn files(&self) -> impl Iterator<Item = &FileGroup> {
|
||||
self.files.values()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -311,7 +330,7 @@ mod tests {
|
||||
use std::collections::HashSet;
|
||||
|
||||
use super::*;
|
||||
use crate::compaction::test_util::new_file_handle;
|
||||
use crate::compaction::test_util::{new_file_handle, new_file_handle_with_sequence};
|
||||
use crate::sst::file::{FileId, Level};
|
||||
|
||||
#[test]
|
||||
@@ -371,7 +390,9 @@ mod tests {
|
||||
.iter(),
|
||||
3,
|
||||
);
|
||||
assert_eq!(5, windows.get(&0).unwrap().files.len());
|
||||
let fgs = &windows.get(&0).unwrap().files;
|
||||
assert_eq!(1, fgs.len());
|
||||
assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);
|
||||
|
||||
let files = [FileId::random(); 3];
|
||||
let windows = assign_to_windows(
|
||||
@@ -385,15 +406,56 @@ mod tests {
|
||||
);
|
||||
assert_eq!(
|
||||
files[0],
|
||||
windows.get(&0).unwrap().files.first().unwrap().file_id()
|
||||
windows.get(&0).unwrap().files().next().unwrap().files()[0].file_id()
|
||||
);
|
||||
assert_eq!(
|
||||
files[1],
|
||||
windows.get(&3).unwrap().files.first().unwrap().file_id()
|
||||
windows.get(&3).unwrap().files().next().unwrap().files()[0].file_id()
|
||||
);
|
||||
assert_eq!(
|
||||
files[2],
|
||||
windows.get(&12).unwrap().files.first().unwrap().file_id()
|
||||
windows.get(&12).unwrap().files().next().unwrap().files()[0].file_id()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_assign_file_groups_to_windows() {
|
||||
let files = [
|
||||
FileId::random(),
|
||||
FileId::random(),
|
||||
FileId::random(),
|
||||
FileId::random(),
|
||||
];
|
||||
let windows = assign_to_windows(
|
||||
[
|
||||
new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
|
||||
new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
|
||||
new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
|
||||
new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
|
||||
]
|
||||
.iter(),
|
||||
3,
|
||||
);
|
||||
assert_eq!(windows.len(), 1);
|
||||
let fgs = &windows.get(&0).unwrap().files;
|
||||
assert_eq!(2, fgs.len());
|
||||
assert_eq!(
|
||||
fgs.get(&NonZeroU64::new(1))
|
||||
.unwrap()
|
||||
.files()
|
||||
.iter()
|
||||
.map(|f| f.file_id())
|
||||
.collect::<HashSet<_>>(),
|
||||
[files[0], files[1]].into_iter().collect()
|
||||
);
|
||||
assert_eq!(
|
||||
fgs.get(&NonZeroU64::new(2))
|
||||
.unwrap()
|
||||
.files()
|
||||
.iter()
|
||||
.map(|f| f.file_id())
|
||||
.collect::<HashSet<_>>(),
|
||||
[files[2], files[3]].into_iter().collect()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -408,8 +470,22 @@ mod tests {
|
||||
];
|
||||
files[0].set_compacting(true);
|
||||
files[2].set_compacting(true);
|
||||
let windows = assign_to_windows(files.iter(), 3);
|
||||
assert_eq!(3, windows.get(&0).unwrap().files.len());
|
||||
let mut windows = assign_to_windows(files.iter(), 3);
|
||||
let window0 = windows.remove(&0).unwrap();
|
||||
assert_eq!(1, window0.files.len());
|
||||
let candidates = window0
|
||||
.files
|
||||
.into_values()
|
||||
.flat_map(|fg| fg.into_files())
|
||||
.map(|f| f.file_id())
|
||||
.collect::<HashSet<_>>();
|
||||
assert_eq!(candidates.len(), 3);
|
||||
assert_eq!(
|
||||
candidates,
|
||||
[files[1].file_id(), files[3].file_id(), files[4].file_id()]
|
||||
.into_iter()
|
||||
.collect::<HashSet<_>>()
|
||||
);
|
||||
}
|
||||
|
||||
/// (Window value, overlapping, files' time ranges in window)
|
||||
@@ -438,9 +514,11 @@ mod tests {
|
||||
let mut file_ranges = actual_window
|
||||
.files
|
||||
.iter()
|
||||
.map(|f| {
|
||||
let (s, e) = f.time_range();
|
||||
(s.value(), e.value())
|
||||
.flat_map(|(_, f)| {
|
||||
f.files().iter().map(|f| {
|
||||
let (s, e) = f.time_range();
|
||||
(s.value(), e.value())
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
|
||||
@@ -607,10 +685,10 @@ mod tests {
|
||||
CompactionPickerTestCase {
|
||||
window_size: 3,
|
||||
input_files: [
|
||||
new_file_handle(file_ids[0], -2000, -3, 0),
|
||||
new_file_handle(file_ids[1], -3000, -100, 0),
|
||||
new_file_handle(file_ids[2], 0, 2999, 0), //active windows
|
||||
new_file_handle(file_ids[3], 50, 2998, 0), //active windows
|
||||
new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
|
||||
new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
|
||||
new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3), //active windows
|
||||
new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4), //active windows
|
||||
]
|
||||
.to_vec(),
|
||||
expected_outputs: vec![
|
||||
@@ -636,11 +714,11 @@ mod tests {
|
||||
CompactionPickerTestCase {
|
||||
window_size: 3,
|
||||
input_files: [
|
||||
new_file_handle(file_ids[0], -2000, -3, 0),
|
||||
new_file_handle(file_ids[1], -3000, -100, 0),
|
||||
new_file_handle(file_ids[2], 0, 2999, 0),
|
||||
new_file_handle(file_ids[3], 50, 2998, 0),
|
||||
new_file_handle(file_ids[4], 11, 2990, 0),
|
||||
new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
|
||||
new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
|
||||
new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
|
||||
new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
|
||||
new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
|
||||
]
|
||||
.to_vec(),
|
||||
expected_outputs: vec![
|
||||
@@ -655,6 +733,27 @@ mod tests {
|
||||
],
|
||||
}
|
||||
.check();
|
||||
|
||||
// Case 3:
|
||||
// A compaction may split output into several files that have overlapping time ranges and same sequence,
|
||||
// we should treat these files as one FileGroup.
|
||||
let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
|
||||
CompactionPickerTestCase {
|
||||
window_size: 3,
|
||||
input_files: [
|
||||
new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
|
||||
new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
|
||||
new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
|
||||
new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
|
||||
new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
|
||||
]
|
||||
.to_vec(),
|
||||
expected_outputs: vec![ExpectedOutput {
|
||||
input_files: vec![0, 1, 4],
|
||||
output_level: 1,
|
||||
}],
|
||||
}
|
||||
.check();
|
||||
}
|
||||
|
||||
// TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
|
||||
|
||||
@@ -42,6 +42,13 @@ use crate::worker::WorkerId;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Unexpected data type"))]
DataTypeMismatch {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("External error, context: {}", context))]
External {
source: BoxedError,
@@ -291,35 +298,6 @@ pub enum Error {
#[snafu(display("Failed to write region"))]
WriteGroup { source: Arc<Error> },

#[snafu(display("Row value mismatches field data type"))]
FieldTypeMismatch { source: datatypes::error::Error },

#[snafu(display("Failed to serialize field"))]
SerializeField {
#[snafu(source)]
error: memcomparable::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display(
"Data type: {} does not support serialization/deserialization",
data_type,
))]
NotSupportedField {
data_type: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to deserialize field"))]
DeserializeField {
#[snafu(source)]
error: memcomparable::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Invalid parquet SST file {}, reason: {}", file, reason))]
InvalidParquet {
file: String,
@@ -1028,6 +1006,20 @@ pub enum Error {
location: Location,
source: common_grpc::Error,
},

#[snafu(display("Failed to encode"))]
Encode {
#[snafu(implicit)]
location: Location,
source: mito_codec::error::Error,
},

#[snafu(display("Failed to decode"))]
Decode {
#[snafu(implicit)]
location: Location,
source: mito_codec::error::Error,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
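
The two new variants above wrap failures coming from the extracted mito-codec crate. A minimal sketch of how a call site attaches that context, assuming snafu's ResultExt is in scope and mirroring the call sites shown later in this diff (the function name and exact codec signature are illustrative, not part of the change):

use snafu::ResultExt;

// Sketch: turn a mito-codec error into mito2's Error::Encode at the call site.
fn encode_pk(codec: &dyn PrimaryKeyCodec, kv: &KeyValue<'_>, buf: &mut Vec<u8>) -> Result<()> {
    codec.encode_key_value(kv, buf).context(EncodeSnafu)?;
    Ok(())
}
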
@@ -1052,6 +1044,7 @@ impl ErrorExt for Error {
use Error::*;

match self {
DataTypeMismatch { source, .. } => source.status_code(),
OpenDal { .. } | ReadParquet { .. } => StatusCode::StorageUnavailable,
WriteWal { source, .. } | ReadWal { source, .. } | DeleteWal { source, .. } => {
source.status_code()
@@ -1095,7 +1088,6 @@ impl ErrorExt for Error {
| BiErrors { .. }
| StopScheduler { .. }
| ComputeVector { .. }
| SerializeField { .. }
| EncodeMemtable { .. }
| CreateDir { .. }
| ReadDataPart { .. }
@@ -1107,9 +1099,7 @@ impl ErrorExt for Error {

WriteParquet { .. } => StatusCode::StorageUnavailable,
WriteGroup { source, .. } => source.status_code(),
FieldTypeMismatch { source, .. } => source.status_code(),
NotSupportedField { .. } => StatusCode::Unsupported,
DeserializeField { .. } | EncodeSparsePrimaryKey { .. } => StatusCode::Unexpected,
EncodeSparsePrimaryKey { .. } => StatusCode::Unexpected,
InvalidBatch { .. } => StatusCode::InvalidArguments,
InvalidRecordBatch { .. } => StatusCode::InvalidArguments,
ConvertVector { source, .. } => source.status_code(),
@@ -1181,7 +1171,9 @@ impl ErrorExt for Error {
ScanSeries { source, .. } => source.status_code(),

ScanMultiTimes { .. } => StatusCode::InvalidArguments,
Error::ConvertBulkWalEntry { source, .. } => source.status_code(),
ConvertBulkWalEntry { source, .. } => source.status_code(),

Encode { source, .. } | Decode { source, .. } => source.status_code(),
}
}

@@ -41,7 +41,6 @@ pub mod read;
|
||||
pub mod region;
|
||||
mod region_write_ctx;
|
||||
pub mod request;
|
||||
pub mod row_converter;
|
||||
pub mod schedule;
|
||||
pub mod sst;
|
||||
mod time_provider;
|
||||
|
||||
@@ -21,6 +21,8 @@ use std::sync::Arc;
|
||||
|
||||
pub use bulk::part::EncodedBulkPart;
|
||||
use common_time::Timestamp;
|
||||
use mito_codec::key_values::KeyValue;
|
||||
pub use mito_codec::key_values::KeyValues;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::{ColumnId, SequenceNumber};
|
||||
@@ -29,8 +31,6 @@ use table::predicate::Predicate;
|
||||
use crate::config::MitoConfig;
|
||||
use crate::error::Result;
|
||||
use crate::flush::WriteBufferManagerRef;
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
pub use crate::memtable::key_values::KeyValues;
|
||||
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
|
||||
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
|
||||
use crate::metrics::WRITE_BUFFER_BYTES;
|
||||
@@ -42,7 +42,6 @@ use crate::sst::file::FileTimeRange;
|
||||
|
||||
mod builder;
|
||||
pub mod bulk;
|
||||
pub mod key_values;
|
||||
pub mod partition_tree;
|
||||
mod simple_bulk_memtable;
|
||||
mod stats;
|
||||
|
||||
@@ -16,13 +16,13 @@
|
||||
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use mito_codec::key_values::KeyValue;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::{ColumnId, SequenceNumber};
|
||||
use table::predicate::Predicate;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::memtable::bulk::part::{BulkPart, EncodedBulkPart};
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::{
|
||||
BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef,
|
||||
MemtableStats, PredicateGroup,
|
||||
|
||||
@@ -17,12 +17,12 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::Arc;
|
||||
|
||||
use mito_codec::row_converter::{build_primary_key_codec, DensePrimaryKeyCodec};
|
||||
use parquet::file::metadata::ParquetMetaData;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::ColumnId;
|
||||
use table::predicate::Predicate;
|
||||
|
||||
use crate::row_converter::{build_primary_key_codec, DensePrimaryKeyCodec};
|
||||
use crate::sst::parquet::file_range::RangeBase;
|
||||
use crate::sst::parquet::format::ReadFormat;
|
||||
use crate::sst::parquet::reader::SimpleFilterContext;
|
||||
|
||||
@@ -38,6 +38,8 @@ use datatypes::data_type::DataType;
|
||||
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::Helper;
|
||||
use mito_codec::key_values::{KeyValue, KeyValuesRef};
|
||||
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
|
||||
use parquet::arrow::ArrowWriter;
|
||||
use parquet::data_type::AsBytes;
|
||||
use parquet::file::metadata::ParquetMetaData;
|
||||
@@ -47,13 +49,12 @@ use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::SequenceNumber;
|
||||
use table::predicate::Predicate;
|
||||
|
||||
use crate::error;
|
||||
use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, Result};
|
||||
use crate::error::{
|
||||
self, ComputeArrowSnafu, EncodeMemtableSnafu, EncodeSnafu, NewRecordBatchSnafu, Result,
|
||||
};
|
||||
use crate::memtable::bulk::context::BulkIterContextRef;
|
||||
use crate::memtable::bulk::part_reader::BulkPartIter;
|
||||
use crate::memtable::key_values::{KeyValue, KeyValuesRef};
|
||||
use crate::memtable::BoxedBatchIterator;
|
||||
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
|
||||
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
|
||||
use crate::sst::parquet::helper::parse_parquet_metadata;
|
||||
use crate::sst::to_sst_arrow_schema;
|
||||
@@ -354,7 +355,9 @@ fn mutations_to_record_batch(

for row in key_values.iter() {
pk_buffer.clear();
pk_encoder.encode_to_vec(row.primary_keys(), &mut pk_buffer)?;
pk_encoder
.encode_to_vec(row.primary_keys(), &mut pk_buffer)
.context(EncodeSnafu)?;
pk_builder.append_value(pk_buffer.as_bytes());
ts_vector.push_value_ref(row.timestamp());
sequence_builder.append_value(row.sequence());

@@ -19,7 +19,6 @@ mod dedup;
|
||||
mod dict;
|
||||
mod merger;
|
||||
mod partition;
|
||||
mod primary_key_filter;
|
||||
mod shard;
|
||||
mod shard_builder;
|
||||
mod tree;
|
||||
@@ -29,7 +28,8 @@ use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
pub(crate) use primary_key_filter::{DensePrimaryKeyFilter, SparsePrimaryKeyFilter};
|
||||
use mito_codec::key_values::KeyValue;
|
||||
use mito_codec::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::{ColumnId, SequenceNumber};
|
||||
@@ -38,7 +38,6 @@ use table::predicate::Predicate;
|
||||
use crate::error::{Result, UnsupportedOperationSnafu};
|
||||
use crate::flush::WriteBufferManagerRef;
|
||||
use crate::memtable::bulk::part::BulkPart;
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::partition_tree::tree::PartitionTree;
|
||||
use crate::memtable::stats::WriteMetrics;
|
||||
use crate::memtable::{
|
||||
@@ -47,7 +46,6 @@ use crate::memtable::{
|
||||
PredicateGroup,
|
||||
};
|
||||
use crate::region::options::MergeMode;
|
||||
use crate::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
|
||||
|
||||
/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
|
||||
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
|
||||
@@ -368,11 +366,11 @@ mod tests {
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::Int64Vector;
|
||||
use mito_codec::row_converter::DensePrimaryKeyCodec;
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::*;
|
||||
use crate::row_converter::DensePrimaryKeyCodec;
|
||||
use crate::test_util::memtable_util::{
|
||||
self, collect_iter_timestamps, region_metadata_to_row_schema,
|
||||
};
|
||||
|
||||
@@ -33,6 +33,7 @@ use datatypes::vectors::{
|
||||
TimestampSecondVector, UInt16Vector, UInt16VectorBuilder, UInt64Vector, UInt64VectorBuilder,
|
||||
UInt8Vector, UInt8VectorBuilder,
|
||||
};
|
||||
use mito_codec::key_values::KeyValue;
|
||||
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
|
||||
use parquet::arrow::ArrowWriter;
|
||||
use parquet::basic::{Compression, Encoding, ZstdLevel};
|
||||
@@ -44,7 +45,6 @@ use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
|
||||
|
||||
use crate::error;
|
||||
use crate::error::Result;
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::partition_tree::merger::{DataBatchKey, DataNode, DataSource, Merger};
|
||||
use crate::memtable::partition_tree::PkIndex;
|
||||
use crate::metrics::{
|
||||
|
||||
@@ -22,13 +22,16 @@ use std::time::{Duration, Instant};
|
||||
|
||||
use api::v1::SemanticType;
|
||||
use common_recordbatch::filter::SimpleFilterEvaluator;
|
||||
use mito_codec::key_values::KeyValue;
|
||||
use mito_codec::primary_key_filter::is_partition_column;
|
||||
use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
|
||||
use snafu::ResultExt;
|
||||
use store_api::codec::PrimaryKeyEncoding;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::error::{EncodeSnafu, Result};
|
||||
use crate::memtable::partition_tree::data::{DataBatch, DataParts, DATA_INIT_CAP};
|
||||
use crate::memtable::partition_tree::dedup::DedupReader;
|
||||
use crate::memtable::partition_tree::shard::{
|
||||
@@ -39,7 +42,6 @@ use crate::memtable::partition_tree::{PartitionTreeConfig, PkId};
|
||||
use crate::memtable::stats::WriteMetrics;
|
||||
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
|
||||
use crate::read::{Batch, BatchBuilder};
|
||||
use crate::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
|
||||
|
||||
/// Key of a partition.
|
||||
pub type PartitionKey = u32;
|
||||
@@ -91,7 +93,9 @@ impl Partition {
// `primary_key` is sparse, re-encode the full primary key.
let sparse_key = primary_key.clone();
primary_key.clear();
row_codec.encode_key_value(&key_value, primary_key)?;
row_codec
.encode_key_value(&key_value, primary_key)
.context(EncodeSnafu)?;
let pk_id = inner.shard_builder.write_with_key(
primary_key,
Some(&sparse_key),
@@ -304,11 +308,6 @@ impl Partition {
.map(|meta| meta.column_schema.name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
.unwrap_or(false)
}

/// Returns true if this is a partition column.
pub(crate) fn is_partition_column(name: &str) -> bool {
name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
}
}

pub(crate) struct PartitionStats {
@@ -446,7 +445,7 @@ impl ReadPartitionContext {
fn need_prune_key(metadata: &RegionMetadataRef, filters: &[SimpleFilterEvaluator]) -> bool {
for filter in filters {
// We already pruned partitions before so we skip the partition column.
if Partition::is_partition_column(filter.column_name()) {
if is_partition_column(filter.column_name()) {
continue;
}
let Some(column) = metadata.column_by_name(filter.column_name()) else {

@@ -17,10 +17,11 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use mito_codec::key_values::KeyValue;
|
||||
use mito_codec::row_converter::PrimaryKeyFilter;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::partition_tree::data::{
|
||||
DataBatch, DataParts, DataPartsReader, DataPartsReaderBuilder, DATA_INIT_CAP,
|
||||
};
|
||||
@@ -29,7 +30,6 @@ use crate::memtable::partition_tree::merger::{Merger, Node};
|
||||
use crate::memtable::partition_tree::shard_builder::ShardBuilderReader;
|
||||
use crate::memtable::partition_tree::{PkId, PkIndex, ShardId};
|
||||
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
|
||||
use crate::row_converter::PrimaryKeyFilter;
|
||||
|
||||
/// Shard stores data related to the same key dictionary.
|
||||
pub struct Shard {
|
||||
|
||||
@@ -18,10 +18,11 @@ use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use mito_codec::key_values::KeyValue;
|
||||
use mito_codec::row_converter::PrimaryKeyFilter;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::partition_tree::data::{
|
||||
DataBatch, DataBuffer, DataBufferReader, DataBufferReaderBuilder, DataParts, DATA_INIT_CAP,
|
||||
};
|
||||
@@ -30,7 +31,6 @@ use crate::memtable::partition_tree::shard::Shard;
|
||||
use crate::memtable::partition_tree::{PartitionTreeConfig, PkId, PkIndex, ShardId};
|
||||
use crate::memtable::stats::WriteMetrics;
|
||||
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
|
||||
use crate::row_converter::PrimaryKeyFilter;
|
||||
|
||||
/// Builder to write keys and data to a shard that the key dictionary
|
||||
/// is still active.
|
||||
|
||||
@@ -23,8 +23,10 @@ use common_recordbatch::filter::SimpleFilterEvaluator;
|
||||
use common_time::Timestamp;
|
||||
use datafusion_common::ScalarValue;
|
||||
use datatypes::prelude::ValueRef;
|
||||
use memcomparable::Serializer;
|
||||
use serde::Serialize;
|
||||
use mito_codec::key_values::KeyValue;
|
||||
use mito_codec::primary_key_filter::is_partition_column;
|
||||
use mito_codec::row_converter::sparse::{FieldWithId, SparseEncoder};
|
||||
use mito_codec::row_converter::{PrimaryKeyCodec, SortField};
|
||||
use snafu::{ensure, ResultExt};
|
||||
use store_api::codec::PrimaryKeyEncoding;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
@@ -32,10 +34,9 @@ use store_api::storage::{ColumnId, SequenceNumber};
|
||||
use table::predicate::Predicate;
|
||||
|
||||
use crate::error::{
|
||||
EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu,
|
||||
EncodeSnafu, EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result,
|
||||
};
|
||||
use crate::flush::WriteBufferManagerRef;
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::partition_tree::partition::{
|
||||
Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext,
|
||||
};
|
||||
@@ -46,7 +47,6 @@ use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_ST
|
||||
use crate::read::dedup::LastNonNullIter;
|
||||
use crate::read::Batch;
|
||||
use crate::region::options::MergeMode;
|
||||
use crate::row_converter::{PrimaryKeyCodec, SortField};
|
||||
|
||||
/// The partition tree.
|
||||
pub struct PartitionTree {
|
||||
@@ -73,15 +73,15 @@ impl PartitionTree {
config: &PartitionTreeConfig,
write_buffer_manager: Option<WriteBufferManagerRef>,
) -> Self {
let sparse_encoder = SparseEncoder {
fields: metadata
let sparse_encoder = SparseEncoder::new(
metadata
.primary_key_columns()
.map(|c| FieldWithId {
field: SortField::new(c.column_schema.data_type.clone()),
column_id: c.column_id,
})
.collect(),
};
);
let is_partitioned = Partition::has_multi_partitions(&metadata);
let mut config = config.clone();
if config.merge_mode == MergeMode::LastNonNull {
@@ -129,7 +129,8 @@ impl PartitionTree {
} else {
// For compatibility, use the sparse encoder for dense primary key.
self.sparse_encoder
.encode_to_vec(kv.primary_keys(), buffer)?;
.encode_to_vec(kv.primary_keys(), buffer)
.context(EncodeSnafu)?;
}
Ok(())
}
@@ -166,7 +167,9 @@ impl PartitionTree {
if self.is_partitioned {
self.encode_sparse_primary_key(&kv, pk_buffer)?;
} else {
self.row_codec.encode_key_value(&kv, pk_buffer)?;
self.row_codec
.encode_key_value(&kv, pk_buffer)
.context(EncodeSnafu)?;
}

// Write rows with
@@ -208,7 +211,9 @@ impl PartitionTree {
if self.is_partitioned {
self.encode_sparse_primary_key(&kv, pk_buffer)?;
} else {
self.row_codec.encode_key_value(&kv, pk_buffer)?;
self.row_codec
.encode_key_value(&kv, pk_buffer)
.context(EncodeSnafu)?;
}

// Write rows with
@@ -415,7 +420,7 @@ impl PartitionTree {
for (key, partition) in partitions.iter() {
let mut is_needed = true;
for filter in filters {
if !Partition::is_partition_column(filter.column_name()) {
if !is_partition_column(filter.column_name()) {
continue;
}

@@ -436,34 +441,6 @@ impl PartitionTree {
|
||||
}
|
||||
}
|
||||
|
||||
struct FieldWithId {
|
||||
field: SortField,
|
||||
column_id: ColumnId,
|
||||
}
|
||||
|
||||
struct SparseEncoder {
|
||||
fields: Vec<FieldWithId>,
|
||||
}
|
||||
|
||||
impl SparseEncoder {
|
||||
fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
|
||||
where
|
||||
I: Iterator<Item = ValueRef<'a>>,
|
||||
{
|
||||
let mut serializer = Serializer::new(buffer);
|
||||
for (value, field) in row.zip(self.fields.iter()) {
|
||||
if !value.is_null() {
|
||||
field
|
||||
.column_id
|
||||
.serialize(&mut serializer)
|
||||
.context(SerializeFieldSnafu)?;
|
||||
field.field.serialize(&mut serializer, &value)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct TreeIterMetrics {
|
||||
iter_elapsed: Duration,
|
||||
|
||||
@@ -19,6 +19,7 @@ use std::sync::{Arc, RwLock};
|
||||
|
||||
use api::v1::OpType;
|
||||
use datatypes::vectors::Helper;
|
||||
use mito_codec::key_values::KeyValue;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::{ColumnId, SequenceNumber};
|
||||
@@ -26,7 +27,6 @@ use table::predicate::Predicate;
|
||||
|
||||
use crate::flush::WriteBufferManagerRef;
|
||||
use crate::memtable::bulk::part::BulkPart;
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::stats::WriteMetrics;
|
||||
use crate::memtable::time_series::{Series, Values};
|
||||
use crate::memtable::{
|
||||
|
||||
@@ -29,6 +29,7 @@ use datatypes::arrow::array::{
|
||||
};
|
||||
use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer};
|
||||
use datatypes::arrow::datatypes::{DataType, Int64Type};
|
||||
use mito_codec::key_values::KeyValue;
|
||||
use smallvec::{smallvec, SmallVec};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
@@ -36,7 +37,6 @@ use store_api::metadata::RegionMetadataRef;
|
||||
use crate::error;
|
||||
use crate::error::{InvalidRequestSnafu, Result};
|
||||
use crate::memtable::bulk::part::BulkPart;
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::version::SmallMemtableVec;
|
||||
use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
|
||||
|
||||
|
||||
@@ -35,17 +35,19 @@ use datatypes::vectors::{
|
||||
Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
|
||||
TimestampSecondVector, UInt64Vector, UInt8Vector,
|
||||
};
|
||||
use mito_codec::key_values::KeyValue;
|
||||
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::{ColumnId, SequenceNumber};
|
||||
use table::predicate::Predicate;
|
||||
|
||||
use crate::error;
|
||||
use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
|
||||
use crate::error::{
|
||||
self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
|
||||
};
|
||||
use crate::flush::WriteBufferManagerRef;
|
||||
use crate::memtable::builder::{FieldBuilder, StringBuilder};
|
||||
use crate::memtable::bulk::part::BulkPart;
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
|
||||
use crate::memtable::stats::WriteMetrics;
|
||||
use crate::memtable::{
|
||||
@@ -57,7 +59,6 @@ use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
|
||||
use crate::read::dedup::LastNonNullIter;
|
||||
use crate::read::{Batch, BatchBuilder, BatchColumn};
|
||||
use crate::region::options::MergeMode;
|
||||
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
|
||||
|
||||
/// Initial vector builder capacity.
|
||||
const INITIAL_BUILDER_CAPACITY: usize = 4;
|
||||
@@ -176,7 +177,10 @@ impl TimeSeriesMemtable {
|
||||
}
|
||||
);
|
||||
|
||||
let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
|
||||
let primary_key_encoded = self
|
||||
.row_codec
|
||||
.encode(kv.primary_keys())
|
||||
.context(EncodeSnafu)?;
|
||||
|
||||
let (key_allocated, value_allocated) =
|
||||
self.series_set.push_to_series(primary_key_encoded, &kv);
|
||||
@@ -1107,11 +1111,11 @@ mod tests {
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::value::{OrderedFloat, Value};
|
||||
use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
|
||||
use mito_codec::row_converter::SortField;
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::*;
|
||||
use crate::row_converter::SortField;
|
||||
use crate::test_util::column_metadata_to_column_schema;
|
||||
|
||||
fn schema_for_test() -> RegionMetadataRef {
|
||||
|
||||
@@ -50,16 +50,17 @@ use datatypes::vectors::{
|
||||
};
|
||||
use futures::stream::BoxStream;
|
||||
use futures::TryStreamExt;
|
||||
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::metadata::RegionMetadata;
|
||||
use store_api::storage::{ColumnId, SequenceNumber};
|
||||
|
||||
use crate::error::{
|
||||
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
|
||||
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
|
||||
Result,
|
||||
};
|
||||
use crate::memtable::BoxedBatchIterator;
|
||||
use crate::read::prune::PruneReader;
|
||||
use crate::row_converter::{CompositeValues, PrimaryKeyCodec};
|
||||
|
||||
/// Storage internal representation of a batch of rows for a primary key (time series).
|
||||
///
|
||||
@@ -612,7 +613,7 @@ impl Batch {
|
||||
column_id: ColumnId,
|
||||
) -> Result<Option<&Value>> {
|
||||
if self.pk_values.is_none() {
|
||||
self.pk_values = Some(codec.decode(&self.primary_key)?);
|
||||
self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
|
||||
}
|
||||
|
||||
let pk_values = self.pk_values.as_ref().unwrap();
|
||||
@@ -1026,12 +1027,12 @@ pub(crate) struct ScannerMetrics {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
|
||||
use store_api::codec::PrimaryKeyEncoding;
|
||||
use store_api::storage::consts::ReservedColumnId;
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::row_converter::{self, build_primary_key_codec_with_fields};
|
||||
use crate::test_util::new_batch_builder;
|
||||
|
||||
fn new_batch(
|
||||
|
||||
@@ -20,17 +20,17 @@ use std::sync::Arc;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::VectorRef;
|
||||
use mito_codec::row_converter::{
|
||||
build_primary_key_codec, build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec,
|
||||
SortField,
|
||||
};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, Result};
|
||||
use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu, Result};
|
||||
use crate::read::projection::ProjectionMapper;
|
||||
use crate::read::{Batch, BatchColumn, BatchReader};
|
||||
use crate::row_converter::{
|
||||
build_primary_key_codec, build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec,
|
||||
SortField,
|
||||
};
|
||||
|
||||
/// Reader to adapt schema of underlying reader to expected schema.
|
||||
pub struct CompatReader<R> {
|
||||
@@ -155,7 +155,9 @@ impl CompatPrimaryKey {
|
||||
batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
|
||||
);
|
||||
buffer.extend_from_slice(batch.primary_key());
|
||||
self.converter.encode_values(&self.values, &mut buffer)?;
|
||||
self.converter
|
||||
.encode_values(&self.values, &mut buffer)
|
||||
.context(EncodeSnafu)?;
|
||||
|
||||
batch.set_primary_key(buffer);
|
||||
|
||||
@@ -405,7 +407,10 @@ impl RewritePrimaryKey {
|
||||
let values = if let Some(pk_values) = batch.pk_values() {
|
||||
pk_values
|
||||
} else {
|
||||
let new_pk_values = self.original.decode(batch.primary_key())?;
|
||||
let new_pk_values = self
|
||||
.original
|
||||
.decode(batch.primary_key())
|
||||
.context(DecodeSnafu)?;
|
||||
batch.set_pk_values(new_pk_values);
|
||||
// Safety: We ensure pk_values is not None.
|
||||
batch.pk_values().as_ref().unwrap()
|
||||
@@ -416,7 +421,9 @@ impl RewritePrimaryKey {
|
||||
);
|
||||
match values {
|
||||
CompositeValues::Dense(values) => {
|
||||
self.new.encode_values(values.as_slice(), &mut buffer)?;
|
||||
self.new
|
||||
.encode_values(values.as_slice(), &mut buffer)
|
||||
.context(EncodeSnafu)?;
|
||||
}
|
||||
CompositeValues::Sparse(values) => {
|
||||
let values = self
|
||||
@@ -427,7 +434,9 @@ impl RewritePrimaryKey {
|
||||
(*id, value.as_value_ref())
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
self.new.encode_value_refs(&values, &mut buffer)?;
|
||||
self.new
|
||||
.encode_value_refs(&values, &mut buffer)
|
||||
.context(EncodeSnafu)?;
|
||||
}
|
||||
}
|
||||
batch.set_primary_key(buffer);
|
||||
@@ -445,12 +454,14 @@ mod tests {
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::value::ValueRef;
|
||||
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
|
||||
use mito_codec::row_converter::{
|
||||
DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
|
||||
};
|
||||
use store_api::codec::PrimaryKeyEncoding;
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::*;
|
||||
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec};
|
||||
use crate::test_util::{check_reader_result, VecBatchReader};
|
||||
|
||||
/// Creates a new [RegionMetadata].
|
||||
|
||||
@@ -26,6 +26,7 @@ use datatypes::prelude::{ConcreteDataType, DataType};
|
||||
use datatypes::schema::{Schema, SchemaRef};
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::VectorRef;
|
||||
use mito_codec::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::ColumnId;
|
||||
@@ -33,7 +34,6 @@ use store_api::storage::ColumnId;
|
||||
use crate::cache::CacheStrategy;
|
||||
use crate::error::{InvalidRequestSnafu, Result};
|
||||
use crate::read::Batch;
|
||||
use crate::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec};
|
||||
|
||||
/// Only cache vector when its length `<=` this value.
|
||||
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
|
||||
@@ -320,12 +320,12 @@ mod tests {
|
||||
use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
|
||||
use datatypes::arrow::util::pretty;
|
||||
use datatypes::value::ValueRef;
|
||||
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
|
||||
use mito_codec::test_util::TestRegionMetadataBuilder;
|
||||
|
||||
use super::*;
|
||||
use crate::cache::CacheManager;
|
||||
use crate::read::BatchBuilder;
|
||||
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
|
||||
use crate::test_util::meta_util::TestRegionMetadataBuilder;
|
||||
|
||||
fn new_batch(
|
||||
ts_start: i64,
|
||||
|
||||
@@ -962,19 +962,22 @@ impl StreamContext {
}
}
}
if verbose {
write!(f, "{{")?;
}
write!(
f,
"partition_count={} ({} memtable ranges, {} file {} ranges)",
"\"partition_count\":{{\"count\":{}, \"mem_ranges\":{}, \"files\":{}, \"file_ranges\":{}}}",
self.ranges.len(),
num_mem_ranges,
self.input.num_files(),
num_file_ranges,
)?;
if let Some(selector) = &self.input.series_row_selector {
write!(f, ", selector={}", selector)?;
write!(f, ", \"selector\":\"{}\"", selector)?;
}
if let Some(distribution) = &self.input.distribution {
write!(f, ", distribution={}", distribution)?;
write!(f, ", \"distribution\":\"{}\"", distribution)?;
}

if verbose {
@@ -991,14 +994,15 @@ impl StreamContext {

impl fmt::Debug for FileWrapper<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let (start, end) = self.file.time_range();
write!(
f,
"[file={}, time_range=({}::{}, {}::{}), rows={}, size={}, index_size={}]",
r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
self.file.file_id(),
self.file.time_range().0.value(),
self.file.time_range().0.unit(),
self.file.time_range().1.value(),
self.file.time_range().1.unit(),
start.value(),
start.unit(),
end.value(),
end.unit(),
self.file.num_rows(),
self.file.size(),
self.file.index_size()
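
With the format strings above, the scan summary in explain output now renders as JSON-style fragments instead of the previous free-form text, for example (values are illustrative): "partition_count":{"count":4, "mem_ranges":1, "files":2, "file_ranges":8}, "selector":"last_row", and each file entry follows the {"file_id":"...", "time_range_start":"...", ...} shape of the FileWrapper Debug impl.
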
@@ -1014,25 +1018,22 @@ impl StreamContext {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let output_schema = self.input.mapper.output_schema();
if !output_schema.is_empty() {
write!(f, ", projection=")?;
f.debug_list()
.entries(output_schema.column_schemas().iter().map(|col| &col.name))
.finish()?;
let names: Vec<_> = output_schema
.column_schemas()
.iter()
.map(|col| &col.name)
.collect();
write!(f, ", \"projection\": {:?}", names)?;
}
if let Some(predicate) = &self.input.predicate.predicate() {
if !predicate.exprs().is_empty() {
write!(f, ", filters=[")?;
for (i, expr) in predicate.exprs().iter().enumerate() {
if i == predicate.exprs().len() - 1 {
write!(f, "{}]", expr)?;
} else {
write!(f, "{}, ", expr)?;
}
}
let exprs: Vec<_> =
predicate.exprs().iter().map(|e| e.to_string()).collect();
write!(f, ", \"filters\": {:?}", exprs)?;
}
}
if !self.input.files.is_empty() {
write!(f, ", files=")?;
write!(f, ", \"files\": ")?;
f.debug_list()
.entries(self.input.files.iter().map(|file| FileWrapper { file }))
.finish()?;

@@ -142,36 +142,36 @@ impl fmt::Debug for ScanMetricsSet {
|
||||
|
||||
write!(
|
||||
f,
|
||||
"{{prepare_scan_cost={prepare_scan_cost:?}, \
|
||||
build_reader_cost={build_reader_cost:?}, \
|
||||
scan_cost={scan_cost:?}, \
|
||||
convert_cost={convert_cost:?}, \
|
||||
yield_cost={yield_cost:?}, \
|
||||
total_cost={total_cost:?}, \
|
||||
num_rows={num_rows}, \
|
||||
num_batches={num_batches}, \
|
||||
num_mem_ranges={num_mem_ranges}, \
|
||||
num_file_ranges={num_file_ranges}, \
|
||||
build_parts_cost={build_parts_cost:?}, \
|
||||
rg_total={rg_total}, \
|
||||
rg_fulltext_filtered={rg_fulltext_filtered}, \
|
||||
rg_inverted_filtered={rg_inverted_filtered}, \
|
||||
rg_minmax_filtered={rg_minmax_filtered}, \
|
||||
rg_bloom_filtered={rg_bloom_filtered}, \
|
||||
rows_before_filter={rows_before_filter}, \
|
||||
rows_fulltext_filtered={rows_fulltext_filtered}, \
|
||||
rows_inverted_filtered={rows_inverted_filtered}, \
|
||||
rows_bloom_filtered={rows_bloom_filtered}, \
|
||||
rows_precise_filtered={rows_precise_filtered}, \
|
||||
num_sst_record_batches={num_sst_record_batches}, \
|
||||
num_sst_batches={num_sst_batches}, \
|
||||
num_sst_rows={num_sst_rows}, \
|
||||
first_poll={first_poll:?}, \
|
||||
num_series_send_timeout={num_series_send_timeout}, \
|
||||
num_distributor_rows={num_distributor_rows}, \
|
||||
num_distributor_batches={num_distributor_batches}, \
|
||||
distributor_scan_cost={distributor_scan_cost:?}, \
|
||||
distributor_yield_cost={distributor_yield_cost:?}}},"
|
||||
"{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
|
||||
\"build_reader_cost\":\"{build_reader_cost:?}\", \
|
||||
\"scan_cost\":\"{scan_cost:?}\", \
|
||||
\"convert_cost\":\"{convert_cost:?}\", \
|
||||
\"yield_cost\":\"{yield_cost:?}\", \
|
||||
\"total_cost\":\"{total_cost:?}\", \
|
||||
\"num_rows\":{num_rows}, \
|
||||
\"num_batches\":{num_batches}, \
|
||||
\"num_mem_ranges\":{num_mem_ranges}, \
|
||||
\"num_file_ranges\":{num_file_ranges}, \
|
||||
\"build_parts_cost\":\"{build_parts_cost:?}\", \
|
||||
\"rg_total\":{rg_total}, \
|
||||
\"rg_fulltext_filtered\":{rg_fulltext_filtered}, \
|
||||
\"rg_inverted_filtered\":{rg_inverted_filtered}, \
|
||||
\"rg_minmax_filtered\":{rg_minmax_filtered}, \
|
||||
\"rg_bloom_filtered\":{rg_bloom_filtered}, \
|
||||
\"rows_before_filter\":{rows_before_filter}, \
|
||||
\"rows_fulltext_filtered\":{rows_fulltext_filtered}, \
|
||||
\"rows_inverted_filtered\":{rows_inverted_filtered}, \
|
||||
\"rows_bloom_filtered\":{rows_bloom_filtered}, \
|
||||
\"rows_precise_filtered\":{rows_precise_filtered}, \
|
||||
\"num_sst_record_batches\":{num_sst_record_batches}, \
|
||||
\"num_sst_batches\":{num_sst_batches}, \
|
||||
\"num_sst_rows\":{num_sst_rows}, \
|
||||
\"first_poll\":\"{first_poll:?}\", \
|
||||
\"num_series_send_timeout\":{num_series_send_timeout}, \
|
||||
\"num_distributor_rows\":{num_distributor_rows}, \
|
||||
\"num_distributor_batches\":{num_distributor_batches}, \
|
||||
\"distributor_scan_cost\":\"{distributor_scan_cost:?}\", \
|
||||
\"distributor_yield_cost\":\"{distributor_yield_cost:?}\"}}"
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -390,10 +390,11 @@ impl PartitionMetricsList {
/// Format verbose metrics for each partition for explain.
pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
let list = self.0.lock().unwrap();
write!(f, ", metrics_per_partition: ")?;
write!(f, ", \"metrics_per_partition\": ")?;
f.debug_list()
.entries(list.iter().filter_map(|p| p.as_ref()))
.finish()
.finish()?;
write!(f, "}}")
}
}

@@ -488,7 +489,11 @@ impl PartitionMetrics {
impl fmt::Debug for PartitionMetrics {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let metrics = self.0.metrics.lock().unwrap();
write!(f, "[partition={}, {:?}]", self.0.partition, metrics)
write!(
f,
r#"{{"partition":{}, "metrics":{:?}}}"#,
self.0.partition, metrics
)
}
}
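
Since per-partition metrics are now printed as a JSON object, verbose explain output can be post-processed mechanically. A small sketch, assuming the emitted string is valid JSON and using the key names from the format string above (serde_json is an assumed dependency of the consumer, not part of this change):

// Sketch: pull the partition id out of the Debug output of PartitionMetrics.
fn partition_of(debug_output: &str) -> Option<u64> {
    let v: serde_json::Value = serde_json::from_str(debug_output).ok()?;
    v.get("partition")?.as_u64()
}
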
@@ -922,11 +922,12 @@ mod tests {
|
||||
use api::v1::{Row, SemanticType};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnDefaultConstraint;
|
||||
use mito_codec::test_util::i64_value;
|
||||
use store_api::metadata::RegionMetadataBuilder;
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::test_util::{i64_value, ts_ms_value};
|
||||
use crate::test_util::ts_ms_value;
|
||||
|
||||
fn new_column_schema(
|
||||
name: &str,
|
||||
|
||||
@@ -20,13 +20,13 @@ use std::sync::Arc;
|
||||
use common_recordbatch::filter::SimpleFilterEvaluator;
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
pub use dense::{DensePrimaryKeyCodec, SortField};
|
||||
use mito_codec::key_values::KeyValue;
|
||||
pub use sparse::{SparsePrimaryKeyCodec, SparseValues, COLUMN_ID_ENCODE_SIZE};
|
||||
use store_api::codec::PrimaryKeyEncoding;
|
||||
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
|
||||
/// Row value encoder/decoder.
|
||||
pub trait PrimaryKeyCodecExt {
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
// limitations under the License.
|
||||
|
||||
pub(crate) mod bloom_filter;
|
||||
mod codec;
|
||||
pub(crate) mod fulltext_index;
|
||||
mod indexer;
|
||||
pub mod intermediate;
|
||||
|
||||
@@ -22,6 +22,8 @@ use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::value::Value;
|
||||
use index::bloom_filter::applier::InListPredicate;
|
||||
use index::Bytes;
|
||||
use mito_codec::index::IndexValueCodec;
|
||||
use mito_codec::row_converter::SortField;
|
||||
use object_store::ObjectStore;
|
||||
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -30,10 +32,8 @@ use store_api::storage::ColumnId;
|
||||
|
||||
use crate::cache::file_cache::FileCacheRef;
|
||||
use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
|
||||
use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, Result};
|
||||
use crate::row_converter::SortField;
|
||||
use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result};
|
||||
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplier;
|
||||
use crate::sst::index::codec::IndexValueCodec;
|
||||
use crate::sst::index::puffin_manager::PuffinManagerFactory;
|
||||
|
||||
pub struct BloomFilterIndexApplierBuilder<'a> {
|
||||
@@ -322,7 +322,8 @@ fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Bytes> {
|
||||
let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
|
||||
let mut bytes = vec![];
|
||||
let field = SortField::new(data_type);
|
||||
IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?;
|
||||
IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
|
||||
.context(EncodeSnafu)?;
|
||||
Ok(bytes)
|
||||
}
|
||||
|
||||
|
||||
@@ -19,6 +19,8 @@ use std::sync::Arc;
|
||||
use common_telemetry::{debug, warn};
|
||||
use datatypes::schema::SkippingIndexType;
|
||||
use index::bloom_filter::creator::BloomFilterCreator;
|
||||
use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
|
||||
use mito_codec::row_converter::SortField;
|
||||
use puffin::puffin_manager::{PuffinWriter, PutOptions};
|
||||
use snafu::{ensure, ResultExt};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
@@ -26,14 +28,12 @@ use store_api::storage::ColumnId;
|
||||
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
|
||||
|
||||
use crate::error::{
|
||||
BiErrorsSnafu, BloomFilterFinishSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu,
|
||||
PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result,
|
||||
BiErrorsSnafu, BloomFilterFinishSnafu, EncodeSnafu, IndexOptionsSnafu,
|
||||
OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result,
|
||||
};
|
||||
use crate::read::Batch;
|
||||
use crate::row_converter::SortField;
|
||||
use crate::sst::file::FileId;
|
||||
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
|
||||
use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
|
||||
use crate::sst::index::intermediate::{
|
||||
IntermediateLocation, IntermediateManager, TempFileProvider,
|
||||
};
|
||||
@@ -210,7 +210,8 @@ impl BloomFilterIndexer {
|
||||
v.as_value_ref(),
|
||||
field,
|
||||
&mut buf,
|
||||
)?;
|
||||
)
|
||||
.context(EncodeSnafu)?;
|
||||
Ok(buf)
|
||||
})
|
||||
.transpose()?;
|
||||
@@ -234,11 +235,8 @@ impl BloomFilterIndexer {
|
||||
let elems = (!value.is_null())
|
||||
.then(|| {
|
||||
let mut buf = vec![];
|
||||
IndexValueCodec::encode_nonnull_value(
|
||||
value,
|
||||
&sort_field,
|
||||
&mut buf,
|
||||
)?;
|
||||
IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
|
||||
.context(EncodeSnafu)?;
|
||||
Ok(buf)
|
||||
})
|
||||
.transpose()?;
|
||||
@@ -353,6 +351,7 @@ pub(crate) mod tests {
|
||||
use datatypes::value::ValueRef;
|
||||
use datatypes::vectors::{UInt64Vector, UInt8Vector};
|
||||
use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
|
||||
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
|
||||
use object_store::services::Memory;
|
||||
use object_store::ObjectStore;
|
||||
use puffin::puffin_manager::{PuffinManager, PuffinReader};
|
||||
@@ -362,7 +361,6 @@ pub(crate) mod tests {
|
||||
use super::*;
|
||||
use crate::access_layer::FilePathProvider;
|
||||
use crate::read::BatchColumn;
|
||||
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
|
||||
use crate::sst::index::puffin_manager::PuffinManagerFactory;
|
||||
|
||||
pub fn mock_object_store() -> ObjectStore {
|
||||
|
||||
@@ -29,7 +29,7 @@ use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::{ColumnId, ConcreteDataType, RegionId};
|
||||
|
||||
use crate::error::{
|
||||
CastVectorSnafu, CreateFulltextCreatorSnafu, FieldTypeMismatchSnafu, FulltextFinishSnafu,
|
||||
CastVectorSnafu, CreateFulltextCreatorSnafu, DataTypeMismatchSnafu, FulltextFinishSnafu,
|
||||
FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, Result,
|
||||
};
|
||||
use crate::read::Batch;
|
||||
@@ -259,7 +259,7 @@ impl SingleCreator {
|
||||
let data = data.get_ref(i);
|
||||
let text = data
|
||||
.as_string()
|
||||
.context(FieldTypeMismatchSnafu)?
|
||||
.context(DataTypeMismatchSnafu)?
|
||||
.unwrap_or_default();
|
||||
self.inner.push_text(text).await?;
|
||||
}
|
||||
|
||||
@@ -27,6 +27,8 @@ use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::value::Value;
|
||||
use index::inverted_index::search::index_apply::PredicatesIndexApplier;
|
||||
use index::inverted_index::search::predicate::Predicate;
|
||||
use mito_codec::index::IndexValueCodec;
|
||||
use mito_codec::row_converter::SortField;
|
||||
use object_store::ObjectStore;
|
||||
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -35,9 +37,9 @@ use store_api::storage::ColumnId;
|
||||
|
||||
use crate::cache::file_cache::FileCacheRef;
|
||||
use crate::cache::index::inverted_index::InvertedIndexCacheRef;
|
||||
use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result};
|
||||
use crate::row_converter::SortField;
|
||||
use crate::sst::index::codec::IndexValueCodec;
|
||||
use crate::error::{
|
||||
BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result,
|
||||
};
|
||||
use crate::sst::index::inverted_index::applier::InvertedIndexApplier;
|
||||
use crate::sst::index::puffin_manager::PuffinManagerFactory;
|
||||
|
||||
@@ -230,7 +232,8 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
|
||||
let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
|
||||
let mut bytes = vec![];
|
||||
let field = SortField::new(data_type);
|
||||
IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?;
|
||||
IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
|
||||
.context(EncodeSnafu)?;
|
||||
Ok(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -194,7 +194,7 @@ mod tests {
|
||||
};
|
||||
|
||||
let res = builder.collect_between(&between);
|
||||
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
|
||||
assert!(matches!(res, Err(Error::Encode { .. })));
|
||||
assert!(builder.output.is_empty());
|
||||
}
|
||||
|
||||
|
||||
@@ -264,7 +264,7 @@ mod tests {
|
||||
);
|
||||
|
||||
let res = builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10));
|
||||
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
|
||||
assert!(matches!(res, Err(Error::Encode { .. })));
|
||||
assert!(builder.output.is_empty());
|
||||
}
|
||||
|
||||
|
||||
@@ -226,7 +226,7 @@ mod tests {
|
||||
);
|
||||
|
||||
let res = builder.collect_eq(&tag_column(), &int64_lit(1));
|
||||
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
|
||||
assert!(matches!(res, Err(Error::Encode { .. })));
|
||||
assert!(builder.output.is_empty());
|
||||
}
|
||||
|
||||
|
||||
@@ -167,7 +167,7 @@ mod tests {
|
||||
};
|
||||
|
||||
let res = builder.collect_inlist(&in_list);
|
||||
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
|
||||
assert!(matches!(res, Err(Error::Encode { .. })));
|
||||
assert!(builder.output.is_empty());
|
||||
}
|
||||
|
||||
|
||||
@@ -22,6 +22,8 @@ use index::inverted_index::create::sort::external_sort::ExternalSorter;
|
||||
use index::inverted_index::create::sort_create::SortIndexCreator;
|
||||
use index::inverted_index::create::InvertedIndexCreator;
|
||||
use index::inverted_index::format::writer::InvertedIndexBlobWriter;
|
||||
use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
|
||||
use mito_codec::row_converter::SortField;
|
||||
use puffin::puffin_manager::{PuffinWriter, PutOptions};
|
||||
use snafu::{ensure, ResultExt};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
@@ -30,13 +32,11 @@ use tokio::io::duplex;
|
||||
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
|
||||
|
||||
use crate::error::{
|
||||
BiErrorsSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
|
||||
BiErrorsSnafu, EncodeSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
|
||||
PushIndexValueSnafu, Result,
|
||||
};
|
||||
use crate::read::Batch;
|
||||
use crate::row_converter::SortField;
|
||||
use crate::sst::file::FileId;
|
||||
use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
|
||||
use crate::sst::index::intermediate::{
|
||||
IntermediateLocation, IntermediateManager, TempFileProvider,
|
||||
};
|
||||
@@ -205,7 +205,8 @@ impl InvertedIndexer {
|
||||
v.as_value_ref(),
|
||||
field,
|
||||
&mut self.value_buf,
|
||||
)?;
|
||||
)
|
||||
.context(EncodeSnafu)?;
|
||||
Ok(self.value_buf.as_slice())
|
||||
})
|
||||
.transpose()?;
|
||||
@@ -238,7 +239,8 @@ impl InvertedIndexer {
|
||||
value,
|
||||
&sort_field,
|
||||
&mut self.value_buf,
|
||||
)?;
|
||||
)
|
||||
.context(EncodeSnafu)?;
|
||||
self.index_creator
|
||||
.push_with_name(col_id_str, Some(&self.value_buf))
|
||||
.await
|
||||
@@ -334,6 +336,7 @@ mod tests {
|
||||
use datatypes::value::ValueRef;
|
||||
use datatypes::vectors::{UInt64Vector, UInt8Vector};
|
||||
use futures::future::BoxFuture;
|
||||
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
|
||||
use object_store::services::Memory;
|
||||
use object_store::ObjectStore;
|
||||
use puffin::puffin_manager::cache::PuffinMetadataCache;
|
||||
@@ -346,7 +349,6 @@ mod tests {
|
||||
use crate::cache::index::inverted_index::InvertedIndexCache;
|
||||
use crate::metrics::CACHE_BYTES;
|
||||
use crate::read::BatchColumn;
|
||||
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
|
||||
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
|
||||
use crate::sst::index::puffin_manager::PuffinManagerFactory;
|
||||
|
||||
|
||||
@@ -22,18 +22,19 @@ use api::v1::{OpType, SemanticType};
|
||||
use common_telemetry::error;
|
||||
use datatypes::arrow::array::BooleanArray;
|
||||
use datatypes::arrow::buffer::BooleanBuffer;
|
||||
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
|
||||
use parquet::arrow::arrow_reader::RowSelection;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::TimeSeriesRowSelector;
|
||||
|
||||
use crate::error::{
|
||||
DecodeStatsSnafu, FieldTypeMismatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu,
|
||||
DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu, Result,
|
||||
StatsNotPresentSnafu,
|
||||
};
|
||||
use crate::read::compat::CompatBatch;
|
||||
use crate::read::last_row::RowGroupLastRowCachedReader;
|
||||
use crate::read::prune::PruneReader;
|
||||
use crate::read::Batch;
|
||||
use crate::row_converter::{CompositeValues, PrimaryKeyCodec};
|
||||
use crate::sst::file::FileHandle;
|
||||
use crate::sst::parquet::format::ReadFormat;
|
||||
use crate::sst::parquet::reader::{
|
||||
@@ -270,7 +271,11 @@ impl RangeBase {
|
||||
let pk_values = if let Some(pk_values) = input.pk_values() {
|
||||
pk_values
|
||||
} else {
|
||||
input.set_pk_values(self.codec.decode(input.primary_key())?);
|
||||
input.set_pk_values(
|
||||
self.codec
|
||||
.decode(input.primary_key())
|
||||
.context(DecodeSnafu)?,
|
||||
);
|
||||
input.pk_values().unwrap()
|
||||
};
|
||||
let pk_value = match pk_values {
|
||||
@@ -284,12 +289,12 @@ impl RangeBase {
|
||||
v[pk_index]
|
||||
.1
|
||||
.try_to_scalar_value(filter_ctx.data_type())
|
||||
.context(FieldTypeMismatchSnafu)?
|
||||
.context(DataTypeMismatchSnafu)?
|
||||
}
|
||||
CompositeValues::Sparse(v) => {
|
||||
let v = v.get_or_null(filter_ctx.column_id());
|
||||
v.try_to_scalar_value(filter_ctx.data_type())
|
||||
.context(FieldTypeMismatchSnafu)?
|
||||
.context(DataTypeMismatchSnafu)?
|
||||
}
|
||||
};
|
||||
if filter
|
||||
|
||||
@@ -38,6 +38,7 @@ use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::prelude::DataType;
|
||||
use datatypes::vectors::{Helper, Vector};
|
||||
use mito_codec::row_converter::{build_primary_key_codec_with_fields, SortField};
|
||||
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
|
||||
use parquet::file::statistics::Statistics;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
@@ -48,7 +49,6 @@ use crate::error::{
|
||||
ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
|
||||
};
|
||||
use crate::read::{Batch, BatchBuilder, BatchColumn};
|
||||
use crate::row_converter::{build_primary_key_codec_with_fields, SortField};
|
||||
use crate::sst::file::{FileMeta, FileTimeRange};
|
||||
use crate::sst::to_sst_arrow_schema;
|
||||
|
||||
|
||||
@@ -26,6 +26,7 @@ use datafusion_expr::Expr;
|
||||
use datatypes::arrow::error::ArrowError;
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use mito_codec::row_converter::build_primary_key_codec;
|
||||
use object_store::ObjectStore;
|
||||
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
|
||||
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
|
||||
@@ -48,7 +49,6 @@ use crate::metrics::{
|
||||
};
|
||||
use crate::read::prune::{PruneReader, Source};
|
||||
use crate::read::{Batch, BatchReader};
|
||||
use crate::row_converter::build_primary_key_codec;
|
||||
use crate::sst::file::{FileHandle, FileId};
|
||||
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
|
||||
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
|
||||
pub mod batch_util;
|
||||
pub mod memtable_util;
|
||||
pub mod meta_util;
|
||||
pub mod scheduler_util;
|
||||
pub mod sst_util;
|
||||
pub mod version_util;
|
||||
@@ -841,13 +840,6 @@ impl CreateRequestBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates value for i64.
|
||||
pub(crate) fn i64_value(data: i64) -> v1::Value {
|
||||
v1::Value {
|
||||
value_data: Some(ValueData::I64Value(data)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates value for timestamp millis.
|
||||
pub(crate) fn ts_ms_value(data: i64) -> v1::Value {
|
||||
v1::Value {
|
||||
|
||||
@@ -25,20 +25,20 @@ use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::scalars::ScalarVector;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::vectors::TimestampMillisecondVector;
|
||||
use mito_codec::key_values::KeyValue;
|
||||
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
|
||||
use store_api::storage::{ColumnId, RegionId, SequenceNumber};
|
||||
use table::predicate::Predicate;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::memtable::bulk::part::BulkPart;
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer};
|
||||
use crate::memtable::{
|
||||
BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRanges,
|
||||
MemtableRef, MemtableStats,
|
||||
};
|
||||
use crate::read::scan_region::PredicateGroup;
|
||||
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
|
||||
|
||||
/// Empty memtable for test.
|
||||
#[derive(Debug, Default)]
|
||||
|
||||
@@ -22,6 +22,7 @@ use datatypes::arrow::array::{BinaryArray, TimestampMillisecondArray, UInt64Arra
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::value::ValueRef;
|
||||
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
|
||||
use parquet::file::metadata::ParquetMetaData;
|
||||
use store_api::metadata::{
|
||||
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
|
||||
@@ -29,7 +30,6 @@ use store_api::metadata::{
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::read::{Batch, BatchBuilder, Source};
|
||||
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
|
||||
use crate::sst::file::{FileHandle, FileId, FileMeta};
|
||||
use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader};
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@
//! Utilities to mock version.

use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::Arc;

use api::v1::value::ValueData;
@@ -103,7 +104,7 @@ impl VersionControlBuilder {
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
sequence: NonZeroU64::new(start_ms as u64),
},
);
self
@@ -196,7 +197,7 @@ pub(crate) fn apply_edit(
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
sequence: NonZeroU64::new(*start_ms as u64),
}
})
.collect();
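
Both hunks above replace a hard-coded `sequence: None` with `NonZeroU64::new(start_ms as u64)`, so the field is an Option<NonZeroU64>. Note that NonZeroU64::new maps zero back to None, meaning a start_ms of 0 still leaves the sequence unset; a quick illustration against the standard library:

use std::num::NonZeroU64;

// NonZeroU64::new(0) is None; any non-zero value wraps into Some.
assert_eq!(NonZeroU64::new(0), None);
assert_eq!(NonZeroU64::new(42).map(NonZeroU64::get), Some(42));
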
@@ -26,6 +26,7 @@ common-base.workspace = true
|
||||
common-catalog.workspace = true
|
||||
common-datasource.workspace = true
|
||||
common-error.workspace = true
|
||||
common-frontend.workspace = true
|
||||
common-function.workspace = true
|
||||
common-grpc.workspace = true
|
||||
common-grpc-expr.workspace = true
|
||||
|
||||
@@ -12,18 +12,29 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use ahash::{HashMap, HashMapExt};
use api::v1::flow::DirtyWindowRequest;
use api::v1::region::{
    bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
};
use api::v1::ArrowIpc;
use arrow::array::{
    Array, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
    TimestampSecondArray,
};
use arrow::datatypes::{DataType, Int64Type, TimeUnit};
use arrow::record_batch::RecordBatch;
use common_base::AffectedRows;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_grpc::FlightData;
use common_telemetry::error;
use common_telemetry::tracing_context::TracingContext;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;
use table::metadata::TableInfoRef;
use table::TableRef;

use crate::insert::Inserter;
use crate::{error, metrics};
@@ -32,10 +43,12 @@ impl Inserter {
    /// Handle bulk insert request.
    pub async fn handle_bulk_insert(
        &self,
        table_id: TableId,
        table: TableRef,
        decoder: &mut FlightDecoder,
        data: FlightData,
    ) -> error::Result<AffectedRows> {
        let table_info = table.table_info();
        let table_id = table_info.table_id();
        let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
            .with_label_values(&["decode_request"])
            .start_timer();
@@ -48,6 +61,10 @@ impl Inserter {
            return Ok(0);
        };
        decode_timer.observe_duration();

        // notify flownode to update dirty timestamps if flow is configured.
        self.maybe_update_flow_dirty_window(table_info, record_batch.clone());

        metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
        metrics::BULK_REQUEST_ROWS
            .with_label_values(&["raw"])
@@ -216,4 +233,103 @@ impl Inserter {
        crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64);
        Ok(rows_inserted)
    }

    fn maybe_update_flow_dirty_window(&self, table_info: TableInfoRef, record_batch: RecordBatch) {
        let table_id = table_info.table_id();
        let table_flownode_set_cache = self.table_flownode_set_cache.clone();
        let node_manager = self.node_manager.clone();
        common_runtime::spawn_global(async move {
            let result = table_flownode_set_cache
                .get(table_id)
                .await
                .context(error::RequestInsertsSnafu);
            let flownodes = match result {
                Ok(flownodes) => flownodes.unwrap_or_default(),
                Err(e) => {
                    error!(e; "Failed to get flownodes for table id: {}", table_id);
                    return;
                }
            };

            let peers: HashSet<_> = flownodes.values().cloned().collect();
            if peers.is_empty() {
                return;
            }

            let Ok(timestamps) = extract_timestamps(
                &record_batch,
                &table_info
                    .meta
                    .schema
                    .timestamp_column()
                    .as_ref()
                    .unwrap()
                    .name,
            )
            .inspect_err(|e| {
                error!(e; "Failed to extract timestamps from record batch");
            }) else {
                return;
            };

            for peer in peers {
                let node_manager = node_manager.clone();
                let timestamps = timestamps.clone();
                common_runtime::spawn_global(async move {
                    if let Err(e) = node_manager
                        .flownode(&peer)
                        .await
                        .handle_mark_window_dirty(DirtyWindowRequest {
                            table_id,
                            timestamps,
                        })
                        .await
                        .context(error::RequestInsertsSnafu)
                    {
                        error!(e; "Failed to mark timestamps as dirty, table: {}", table_id);
                    }
                });
            }
        });
    }
}
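The hunk above fans notifications out to flownodes without blocking the insert path: peers are deduplicated into a HashSet and each notification runs in its own detached task, with failures only logged because dirty-window marking is best-effort. A minimal, self-contained sketch of that pattern; the names notify and fan_out are made up for illustration, and plain tokio::spawn stands in for the project's common_runtime::spawn_global and the flownode RPC client (it assumes a Tokio runtime is already running).

use std::collections::HashSet;

// Placeholder for node_manager.flownode(peer).handle_mark_window_dirty(..).
async fn notify(peer: &str, timestamps: Vec<i64>) -> Result<(), String> {
    let _ = (peer, timestamps);
    Ok(())
}

fn fan_out(peers: Vec<String>, timestamps: Vec<i64>) {
    // Deduplicate peers so each flownode is notified at most once.
    let unique: HashSet<String> = peers.into_iter().collect();
    for peer in unique {
        let ts = timestamps.clone();
        // Fire-and-forget: the caller never awaits these tasks.
        tokio::spawn(async move {
            if let Err(e) = notify(&peer, ts).await {
                eprintln!("failed to notify {peer}: {e}");
            }
        });
    }
}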

/// Extracts all timestamp values from the time index column of the record batch.
/// Returns an empty vector if the record batch is empty.
fn extract_timestamps(rb: &RecordBatch, timestamp_index_name: &str) -> error::Result<Vec<i64>> {
    let ts_col = rb
        .column_by_name(timestamp_index_name)
        .context(error::ColumnNotFoundSnafu {
            msg: timestamp_index_name,
        })?;
    if rb.num_rows() == 0 {
        return Ok(vec![]);
    }
    let primitive = match ts_col.data_type() {
        DataType::Timestamp(unit, _) => match unit {
            TimeUnit::Second => ts_col
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .unwrap()
                .reinterpret_cast::<Int64Type>(),
            TimeUnit::Millisecond => ts_col
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap()
                .reinterpret_cast::<Int64Type>(),
            TimeUnit::Microsecond => ts_col
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .unwrap()
                .reinterpret_cast::<Int64Type>(),
            TimeUnit::Nanosecond => ts_col
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .unwrap()
                .reinterpret_cast::<Int64Type>(),
        },
        t => {
            return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail();
        }
    };
    Ok(primitive.iter().flatten().collect())
}
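extract_timestamps relies on arrow's reinterpret_cast, which reinterprets the timestamp column's underlying i64 buffer rather than converting value by value, then drops nulls while collecting. A small sketch of the millisecond branch in isolation; the helper name millis_to_i64 is hypothetical and only illustrates the same arrow calls used above.

use arrow::array::TimestampMillisecondArray;
use arrow::datatypes::Int64Type;

/// Reinterpret a millisecond timestamp column as plain i64 values, dropping nulls.
fn millis_to_i64(col: &TimestampMillisecondArray) -> Vec<i64> {
    col.reinterpret_cast::<Int64Type>().iter().flatten().collect()
}

fn main() {
    let col = TimestampMillisecondArray::from(vec![Some(1_000), None, Some(2_000)]);
    assert_eq!(millis_to_i64(&col), vec![1_000, 2_000]);
}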

@@ -837,6 +837,22 @@ pub enum Error {
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid time index type: {}", ty))]
    InvalidTimeIndexType {
        ty: arrow::datatypes::DataType,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid process id: {}", id))]
    InvalidProcessId { id: String },

    #[snafu(display("ProcessManager is not present, this can be caused by misconfiguration."))]
    ProcessManagerMissing {
        #[snafu(implicit)]
        location: Location,
    },
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -964,6 +980,9 @@ impl ErrorExt for Error {
            Error::ColumnOptions { source, .. } => source.status_code(),
            Error::DecodeFlightData { source, .. } => source.status_code(),
            Error::ComputeArrow { .. } => StatusCode::Internal,
            Error::InvalidTimeIndexType { .. } => StatusCode::InvalidArguments,
            Error::InvalidProcessId { .. } => StatusCode::InvalidArguments,
            Error::ProcessManagerMissing { .. } => StatusCode::Unexpected,
        }
    }

@@ -78,7 +78,7 @@ pub struct Inserter {
    catalog_manager: CatalogManagerRef,
    pub(crate) partition_manager: PartitionRuleManagerRef,
    pub(crate) node_manager: NodeManagerRef,
    table_flownode_set_cache: TableFlownodeSetCacheRef,
    pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
}

pub type InserterRef = Arc<Inserter>;

@@ -21,6 +21,7 @@ mod cursor;
mod ddl;
mod describe;
mod dml;
mod kill;
mod set;
mod show;
mod tql;
@@ -32,6 +33,7 @@ use std::time::Duration;

use async_stream::stream;
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::process_manager::ProcessManagerRef;
use catalog::CatalogManagerRef;
use client::{OutputData, RecordBatches};
use common_error::ext::BoxedError;
@@ -94,11 +96,13 @@ pub struct StatementExecutor {
    partition_manager: PartitionRuleManagerRef,
    cache_invalidator: CacheInvalidatorRef,
    inserter: InserterRef,
    process_manager: Option<ProcessManagerRef>,
}

pub type StatementExecutorRef = Arc<StatementExecutor>;

impl StatementExecutor {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        catalog_manager: CatalogManagerRef,
        query_engine: QueryEngineRef,
@@ -107,6 +111,7 @@ impl StatementExecutor {
        cache_invalidator: CacheInvalidatorRef,
        inserter: InserterRef,
        table_route_cache: TableRouteCacheRef,
        process_manager: Option<ProcessManagerRef>,
    ) -> Self {
        Self {
            catalog_manager,
@@ -118,6 +123,7 @@ impl StatementExecutor {
            partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)),
            cache_invalidator,
            inserter,
            process_manager,
        }
    }

@@ -363,6 +369,7 @@ impl StatementExecutor {
            Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
            Statement::Use(db) => self.use_database(db, query_ctx).await,
            Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
            Statement::Kill(id) => self.execute_kill(query_ctx, id).await,
        }
    }

src/operator/src/statement/kill.rs (new file, 46 lines)
@@ -0,0 +1,46 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_frontend::DisplayProcessId;
use common_query::Output;
use common_telemetry::error;
use session::context::QueryContextRef;
use snafu::ResultExt;

use crate::error;
use crate::statement::StatementExecutor;

impl StatementExecutor {
    pub async fn execute_kill(
        &self,
        query_ctx: QueryContextRef,
        process_id: String,
    ) -> crate::error::Result<Output> {
        let Some(process_manager) = self.process_manager.as_ref() else {
            error!("Process manager is not initialized");
            return error::ProcessManagerMissingSnafu.fail();
        };

        let display_id = DisplayProcessId::try_from(process_id.as_str())
            .map_err(|_| error::InvalidProcessIdSnafu { id: process_id }.build())?;

        let current_user_catalog = query_ctx.current_catalog().to_string();
        process_manager
            .kill_process(display_id.server_addr, current_user_catalog, display_id.id)
            .await
            .context(error::CatalogSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }
}

@@ -478,6 +478,21 @@ impl QueryEngine for DatafusionQueryEngine {
    fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext {
        let mut state = self.state.session_state();
        state.config_mut().set_extension(query_ctx.clone());
        // Note that hints in "x-greptime-hints" are automatically parsed
        // and stored as query context extensions, so we can read them from the query context.
        if let Some(parallelism) = query_ctx.extension("query_parallelism") {
            if let Ok(n) = parallelism.parse::<u64>() {
                if n > 0 {
                    let new_cfg = state.config().clone().with_target_partitions(n as usize);
                    *state.config_mut() = new_cfg;
                }
            } else {
                common_telemetry::warn!(
                    "Failed to parse query_parallelism: {}, using default value",
                    parallelism
                );
            }
        }
        QueryEngineContext::new(state, query_ctx)
    }
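The hint handling above only overrides DataFusion's target partitions when the query_parallelism extension parses as a positive integer; anything else keeps the default (with a warning in the real code). A stripped-down sketch of that decision, where the function name effective_parallelism and the plain usize signature are illustrative and not the engine's API:

/// Decide the target partition count from an optional "query_parallelism" hint.
fn effective_parallelism(hint: Option<&str>, default: usize) -> usize {
    match hint.map(str::parse::<u64>) {
        Some(Ok(n)) if n > 0 => n as usize,
        // Unparsable or non-positive hints keep the default.
        _ => default,
    }
}

fn main() {
    assert_eq!(effective_parallelism(Some("8"), 4), 8);
    assert_eq!(effective_parallelism(Some("0"), 4), 4);
    assert_eq!(effective_parallelism(Some("abc"), 4), 4);
    assert_eq!(effective_parallelism(None, 4), 4);
}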

@@ -41,8 +41,9 @@ use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_common::DFSchema;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, lit, SortExpr};
use datafusion_expr::{col, lit, ExprSchemable, SortExpr};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
@@ -336,7 +337,7 @@ impl PromPlanner {

        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;

        let val = Self::get_param_value_as_f64(*op, param)?;
        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;

        // convert op and value columns to window exprs.
        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;
@@ -354,7 +355,7 @@
        let predicate = DfExpr::BinaryExpr(BinaryExpr {
            left: Box::new(col(rank)),
            op: Operator::LtEq,
            right: Box::new(lit(val)),
            right: Box::new(val.clone()),
        });

        match expr {
@@ -1946,8 +1947,9 @@ impl PromPlanner {
        let aggr = match op.id() {
            token::T_SUM => sum_udaf(),
            token::T_QUANTILE => {
                let q = Self::get_param_value_as_f64(op, param)?;
                non_col_args.push(lit(q));
                let q =
                    Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
                non_col_args.push(q);
                quantile_udaf()
            }
            token::T_AVG => avg_udaf(),
@@ -2027,20 +2029,50 @@
        Ok(val)
    }

    fn get_param_value_as_f64(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<f64> {
        let param = param
            .as_deref()
            .with_context(|| FunctionInvalidArgumentSnafu {
                fn_name: op.to_string(),
            })?;
        let PromExpr::NumberLiteral(NumberLiteral { val }) = param else {
            return FunctionInvalidArgumentSnafu {
                fn_name: op.to_string(),
    fn get_param_as_literal_expr(
        param: &Option<Box<PromExpr>>,
        op: Option<TokenType>,
        expected_type: Option<ArrowDataType>,
    ) -> Result<DfExpr> {
        let prom_param = param.as_deref().with_context(|| {
            if let Some(op) = op {
                FunctionInvalidArgumentSnafu {
                    fn_name: op.to_string(),
                }
            } else {
                FunctionInvalidArgumentSnafu {
                    fn_name: "unknown".to_string(),
                }
            }
            .fail();
        };
        })?;

        Ok(*val)
        let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
            if let Some(op) = op {
                FunctionInvalidArgumentSnafu {
                    fn_name: op.to_string(),
                }
            } else {
                FunctionInvalidArgumentSnafu {
                    fn_name: "unknown".to_string(),
                }
            }
        })?;

        // check if the type is expected
        if let Some(expected_type) = expected_type {
            // literal should not have reference to column
            let expr_type = expr
                .get_type(&DFSchema::empty())
                .context(DataFusionPlanningSnafu)?;
            if expected_type != expr_type {
                return FunctionInvalidArgumentSnafu {
                    fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
                }
                .fail();
            }
        }

        Ok(expr)
    }
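get_param_as_literal_expr validates the literal's type by asking DataFusion for the expression type against an empty schema, which works because a literal never references a column. A minimal sketch of that check, reusing the same ExprSchemable::get_type call the hunk relies on; the helper name is_f64_literal is hypothetical.

use datafusion_common::DFSchema;
use datafusion_expr::{lit, Expr, ExprSchemable};
use datatypes::arrow::datatypes::DataType as ArrowDataType;

/// Returns true when the expression types as a Float64 literal against an empty schema.
fn is_f64_literal(expr: &Expr) -> bool {
    expr.get_type(&DFSchema::empty())
        .map(|ty| ty == ArrowDataType::Float64)
        .unwrap_or(false)
}

fn main() {
    assert!(is_f64_literal(&lit(0.9_f64)));
    assert!(!is_f64_literal(&lit("not a number")));
}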

    /// Create [DfExpr::WindowFunction] expr for each value column with given window function.
@@ -2096,6 +2128,28 @@
        Ok(normalized_exprs)
    }

    /// Try to build a [f64] from [PromExpr].
    #[deprecated(
        note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
    )]
    fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
        match expr {
            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
            PromExpr::Unary(UnaryExpr { expr, .. }) => {
                Self::try_build_float_literal(expr).map(|f| -f)
            }
            PromExpr::StringLiteral(_)
            | PromExpr::Binary(_)
            | PromExpr::VectorSelector(_)
            | PromExpr::MatrixSelector(_)
            | PromExpr::Call(_)
            | PromExpr::Extension(_)
            | PromExpr::Aggregate(_)
            | PromExpr::Subquery(_) => None,
        }
    }

    /// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan.
    async fn create_histogram_plan(
        &mut self,
@@ -2108,11 +2162,13 @@
            }
            .fail();
        }
        #[allow(deprecated)]
        let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
            FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            }
        })?;

        let input = args.args[1].as_ref().clone();
        let input_plan = self.prom_expr_to_plan(&input, session_state).await?;

@@ -2163,11 +2219,7 @@
            }
            .fail();
        }
        let lit = Self::try_build_float_literal(&args.args[0]).with_context(|| {
            FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
            }
        })?;
        let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;

        // reuse `SPECIAL_TIME_FUNCTION` as name of time index column
        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
@@ -2182,7 +2234,7 @@
                self.ctx.interval,
                SPECIAL_TIME_FUNCTION.to_string(),
                GREPTIME_VALUE.to_string(),
                Some(DfExpr::Literal(ScalarValue::Float64(Some(lit)))),
                Some(lit),
            )
            .context(DataFusionPlanningSnafu)?,
        ),
@@ -2301,25 +2353,6 @@
        }
    }

    /// Try to build a [f64] from [PromExpr].
    fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
        match expr {
            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
            PromExpr::Unary(UnaryExpr { expr, .. }) => {
                Self::try_build_float_literal(expr).map(|f| -f)
            }
            PromExpr::StringLiteral(_)
            | PromExpr::Binary(_)
            | PromExpr::VectorSelector(_)
            | PromExpr::MatrixSelector(_)
            | PromExpr::Call(_)
            | PromExpr::Extension(_)
            | PromExpr::Aggregate(_)
            | PromExpr::Subquery(_) => None,
        }
    }

    /// Return a lambda to build binary expression from token.
    /// Because some binary operators are functions in DataFusion, like `atan2` or `^`.
    #[allow(clippy::type_complexity)]

@@ -88,7 +88,10 @@ opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "a1fb4d
opentelemetry-proto.workspace = true
otel-arrow-rust.workspace = true
parking_lot.workspace = true
pgwire = { version = "0.30", default-features = false, features = ["server-api-ring"] }
#pgwire = { version = "0.30", default-features = false, features = ["server-api-ring"] }
pgwire = { git = "https://github.com/sunng87/pgwire", rev = "127573d997228cfb70c7699881c568eae8131270", default-features = false, features = [
    "server-api-ring",
] }
pin-project = "1.0"
pipeline.workspace = true
postgres-types = { version = "0.2", features = ["with-chrono-0_4", "with-serde_json-1"] }

@@ -1 +1 @@
v0.9.2
v0.9.3

@@ -13,7 +13,9 @@
// limitations under the License.

use api::v1::frontend::frontend_server::Frontend;
use api::v1::frontend::{ListProcessRequest, ListProcessResponse};
use api::v1::frontend::{
    KillProcessRequest, KillProcessResponse, ListProcessRequest, ListProcessResponse,
};
use catalog::process_manager::ProcessManagerRef;
use common_telemetry::error;
use tonic::{Code, Request, Response, Status};
@@ -41,12 +43,27 @@ impl Frontend for FrontendGrpcHandler {
        } else {
            Some(list_process_request.catalog.as_str())
        };
        match self.process_manager.local_processes(catalog) {
            Ok(processes) => Ok(Response::new(ListProcessResponse { processes })),
            Err(e) => {
                error!(e; "Failed to handle list process request");
                Err(Status::new(Code::Internal, e.to_string()))
            }
        }
        let processes = self.process_manager.local_processes(catalog).map_err(|e| {
            error!(e; "Failed to handle list process request");
            Status::new(Code::Internal, e.to_string())
        })?;
        Ok(Response::new(ListProcessResponse { processes }))
    }

    async fn kill_process(
        &self,
        request: Request<KillProcessRequest>,
    ) -> Result<Response<KillProcessResponse>, Status> {
        let req = request.into_inner();
        let success = self
            .process_manager
            .kill_process(req.server_addr, req.catalog, req.process_id)
            .await
            .map_err(|e| {
                error!(e; "Failed to handle kill process request");
                Status::new(Code::Internal, e.to_string())
            })?;

        Ok(Response::new(KillProcessResponse { success }))
    }
}
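Both handlers above use the same refactor: a match over a Result that logged and rewrapped the error collapses into map_err plus the ? operator, so only the success path remains inline. A generic, std-only illustration of that shape (parse_port and its error strings are invented for the example, not part of the handler):

/// Parse a port, logging and wrapping the failure the same way the handlers do.
fn parse_port(raw: &str) -> Result<u16, String> {
    let port = raw.parse::<u16>().map_err(|e| {
        eprintln!("failed to parse port: {e}");
        format!("invalid port: {raw}")
    })?;
    Ok(port)
}

fn main() {
    assert_eq!(parse_port("8080"), Ok(8080));
    assert!(parse_port("not-a-port").is_err());
}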

@@ -40,7 +40,7 @@ use futures_util::StreamExt;
use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
use session::hints::READ_PREFERENCE_HINT;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use table::TableRef;
use tokio::sync::mpsc;

use crate::error::Error::UnsupportedAuthScheme;
@@ -149,8 +149,8 @@ impl GreptimeRequestHandler {
            .clone()
            .unwrap_or_else(common_runtime::global_runtime);
        runtime.spawn(async move {
            // Cached table id
            let mut table_id: Option<TableId> = None;
            // Cached table ref
            let mut table_ref: Option<TableRef> = None;

            let mut decoder = FlightDecoder::default();
            while let Some(request) = stream.next().await {
@@ -169,7 +169,7 @@

                let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer();
                let result = handler
                    .put_record_batch(&table_name, &mut table_id, &mut decoder, data)
                    .put_record_batch(&table_name, &mut table_ref, &mut decoder, data)
                    .await
                    .inspect_err(|e| error!(e; "Failed to handle flight record batches"));
                timer.observe_duration();

@@ -33,6 +33,7 @@ use common_telemetry::{error, warn};
use datatypes::value::column_data_to_json;
use headers::ContentType;
use lazy_static::lazy_static;
use mime_guess::mime;
use pipeline::util::to_pipeline_version;
use pipeline::{
    ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, Value as PipelineValue,
@@ -47,7 +48,9 @@ use crate::error::{
    status_code_to_http_status, Error, InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu, Result,
};
use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER;
use crate::http::header::{CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_PROTOBUF_STR};
use crate::http::header::{
    CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_NDJSON_SUBTYPE_STR, CONTENT_TYPE_PROTOBUF_STR,
};
use crate::http::result::greptime_manage_resp::GreptimedbManageResponse;
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::http::HttpResponse;
@@ -665,12 +668,13 @@ impl TryFrom<&ContentType> for EventPayloadResolverInner {
    type Error = Error;

    fn try_from(content_type: &ContentType) -> Result<Self> {
        match content_type {
            x if *x == *JSON_CONTENT_TYPE => Ok(EventPayloadResolverInner::Json),
            x if *x == *NDJSON_CONTENT_TYPE => Ok(EventPayloadResolverInner::Ndjson),
            x if *x == *TEXT_CONTENT_TYPE || *x == *TEXT_UTF8_CONTENT_TYPE => {
                Ok(EventPayloadResolverInner::Text)
        let mime: mime_guess::Mime = content_type.clone().into();
        match (mime.type_(), mime.subtype()) {
            (mime::APPLICATION, mime::JSON) => Ok(EventPayloadResolverInner::Json),
            (mime::APPLICATION, subtype) if subtype == CONTENT_TYPE_NDJSON_SUBTYPE_STR => {
                Ok(EventPayloadResolverInner::Ndjson)
            }
            (mime::TEXT, mime::PLAIN) => Ok(EventPayloadResolverInner::Text),
            _ => InvalidParameterSnafu {
                reason: format!(
                    "invalid content type: {}, expected: one of {}",

Some files were not shown because too many files have changed in this diff.