Compare commits

..

3 Commits

Author SHA1 Message Date
discord9
af90fad499 feat: truly limit time range by split window 2025-06-11 15:26:47 +08:00
discord9
22c61432f6 feat: wait for longer anyway&wait metrics 2025-06-11 15:20:47 +08:00
discord9
91f373e66e metrics: more generic metrics batching mode 2025-06-11 15:20:47 +08:00
207 changed files with 1510 additions and 4789 deletions

View File

@@ -10,13 +10,13 @@ inputs:
meta-replicas:
default: 2
description: "Number of Metasrv replicas"
image-registry:
image-registry:
default: "docker.io"
description: "Image registry"
image-repository:
image-repository:
default: "greptime/greptimedb"
description: "Image repository"
image-tag:
image-tag:
default: "latest"
description: 'Image tag'
etcd-endpoints:
@@ -32,12 +32,12 @@ runs:
steps:
- name: Install GreptimeDB operator
uses: nick-fields/retry@v3
with:
with:
timeout_minutes: 3
max_attempts: 3
shell: bash
command: |
helm repo add greptime https://greptimeteam.github.io/helm-charts/
helm repo add greptime https://greptimeteam.github.io/helm-charts/
helm repo update
helm upgrade \
--install \
@@ -48,10 +48,10 @@ runs:
--wait-for-jobs
- name: Install GreptimeDB cluster
shell: bash
run: |
run: |
helm upgrade \
--install my-greptimedb \
--set meta.backendStorage.etcd.endpoints=${{ inputs.etcd-endpoints }} \
--set meta.etcdEndpoints=${{ inputs.etcd-endpoints }} \
--set meta.enableRegionFailover=${{ inputs.enable-region-failover }} \
--set image.registry=${{ inputs.image-registry }} \
--set image.repository=${{ inputs.image-repository }} \
@@ -72,7 +72,7 @@ runs:
- name: Wait for GreptimeDB
shell: bash
run: |
while true; do
while true; do
PHASE=$(kubectl -n my-greptimedb get gtc my-greptimedb -o jsonpath='{.status.clusterPhase}')
if [ "$PHASE" == "Running" ]; then
echo "Cluster is ready"
@@ -86,10 +86,10 @@ runs:
- name: Print GreptimeDB info
if: always()
shell: bash
run: |
run: |
kubectl get all --show-labels -n my-greptimedb
- name: Describe Nodes
if: always()
shell: bash
run: |
run: |
kubectl describe nodes

View File

@@ -68,7 +68,7 @@ function deploy_greptimedb_cluster() {
helm install "$cluster_name" greptime/greptimedb-cluster \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.etcdEndpoints="etcd.$install_namespace:2379" \
-n "$install_namespace"
# Wait for greptimedb cluster to be ready.
@@ -103,7 +103,7 @@ function deploy_greptimedb_cluster_with_s3_storage() {
helm install "$cluster_name" greptime/greptimedb-cluster -n "$install_namespace" \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.etcdEndpoints="etcd.$install_namespace:2379" \
--set storage.s3.bucket="$AWS_CI_TEST_BUCKET" \
--set storage.s3.region="$AWS_REGION" \
--set storage.s3.root="$DATA_ROOT" \

44
Cargo.lock generated
View File

@@ -1621,10 +1621,8 @@ dependencies = [
"cache",
"catalog",
"chrono",
"common-base",
"common-catalog",
"common-error",
"common-frontend",
"common-macro",
"common-meta",
"common-procedure",
@@ -2295,14 +2293,8 @@ version = "0.15.0"
dependencies = [
"async-trait",
"common-error",
"common-grpc",
"common-macro",
"common-meta",
"greptime-proto",
"meta-client",
"snafu 0.8.5",
"tokio",
"tonic 0.12.3",
]
[[package]]
@@ -4709,7 +4701,6 @@ dependencies = [
"common-config",
"common-datasource",
"common-error",
"common-frontend",
"common-function",
"common-grpc",
"common-macro",
@@ -4756,7 +4747,6 @@ dependencies = [
"substrait 0.15.0",
"table",
"tokio",
"tokio-util",
"toml 0.8.19",
"tonic 0.12.3",
"tower 0.5.2",
@@ -5143,7 +5133,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=17971523673f4fbc982510d3c9d6647ff642e16f#17971523673f4fbc982510d3c9d6647ff642e16f"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=454c52634c3bac27de10bf0d85d5533eed1cf03f#454c52634c3bac27de10bf0d85d5533eed1cf03f"
dependencies = [
"prost 0.13.5",
"serde",
@@ -7250,7 +7240,6 @@ dependencies = [
"humantime-serde",
"itertools 0.14.0",
"lazy_static",
"mito-codec",
"mito2",
"mur3",
"object-store",
@@ -7316,29 +7305,6 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "mito-codec"
version = "0.15.0"
dependencies = [
"api",
"bytes",
"common-base",
"common-decimal",
"common-error",
"common-macro",
"common-recordbatch",
"common-telemetry",
"common-time",
"datafusion-common",
"datafusion-expr",
"datatypes",
"memcomparable",
"paste",
"serde",
"snafu 0.8.5",
"store-api",
]
[[package]]
name = "mito2"
version = "0.15.0"
@@ -7381,7 +7347,6 @@ dependencies = [
"lazy_static",
"log-store",
"memcomparable",
"mito-codec",
"moka",
"object-store",
"parquet",
@@ -8421,7 +8386,6 @@ dependencies = [
"common-catalog",
"common-datasource",
"common-error",
"common-frontend",
"common-function",
"common-grpc",
"common-grpc-expr",
@@ -8921,8 +8885,9 @@ dependencies = [
[[package]]
name = "pgwire"
version = "0.30.2"
source = "git+https://github.com/sunng87/pgwire?rev=127573d997228cfb70c7699881c568eae8131270#127573d997228cfb70c7699881c568eae8131270"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec79ee18e6cafde8698885646780b967ecc905120798b8359dd0da64f9688e89"
dependencies = [
"async-trait",
"bytes",
@@ -11170,7 +11135,6 @@ dependencies = [
"common-catalog",
"common-config",
"common-error",
"common-frontend",
"common-grpc",
"common-macro",
"common-mem-prof",

View File

@@ -49,7 +49,6 @@ members = [
"src/meta-client",
"src/meta-srv",
"src/metric-engine",
"src/mito-codec",
"src/mito2",
"src/object-store",
"src/operator",
@@ -134,7 +133,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "17971523673f4fbc982510d3c9d6647ff642e16f" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "454c52634c3bac27de10bf0d85d5533eed1cf03f" }
hex = "0.4"
http = "1"
humantime = "2.1"
@@ -275,7 +274,6 @@ log-store = { path = "src/log-store" }
meta-client = { path = "src/meta-client" }
meta-srv = { path = "src/meta-srv" }
metric-engine = { path = "src/metric-engine" }
mito-codec = { path = "src/mito-codec" }
mito2 = { path = "src/mito2" }
object-store = { path = "src/object-store" }
operator = { path = "src/operator" }

View File

@@ -232,7 +232,7 @@
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression.<br/>Default to `none` |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
@@ -405,7 +405,7 @@
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for datanode side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression.<br/>Default to `none` |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for datanode side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |

View File

@@ -9,7 +9,7 @@ We highly recommend using the self-monitoring feature provided by [GreptimeDB Op
- **Metrics Dashboards**
- `dashboards/metrics/cluster/dashboard.json`: The Grafana dashboard for the GreptimeDB cluster. Read the [dashboard.md](./dashboards/metrics/cluster/dashboard.md) for more details.
- `dashboards/metrics/standalone/dashboard.json`: The Grafana dashboard for the standalone GreptimeDB instance. **It's generated from the `cluster/dashboard.json` by removing the instance filter through the `make dashboards` command**. Read the [dashboard.md](./dashboards/metrics/standalone/dashboard.md) for more details.
- **Logs Dashboard**
@@ -83,7 +83,7 @@ If you use the [Helm Chart](https://github.com/GreptimeTeam/helm-charts) to depl
- `monitoring.enabled=true`: Deploys a standalone GreptimeDB instance dedicated to monitoring the cluster;
- `grafana.enabled=true`: Deploys Grafana and automatically imports the monitoring dashboard;
The standalone GreptimeDB instance will collect metrics from your cluster, and the dashboard will be available in the Grafana UI. For detailed deployment instructions, please refer to our [Kubernetes deployment guide](https://docs.greptime.com/user-guide/deployments-administration/deploy-on-kubernetes/getting-started).
The standalone GreptimeDB instance will collect metrics from your cluster, and the dashboard will be available in the Grafana UI. For detailed deployment instructions, please refer to our [Kubernetes deployment guide](https://docs.greptime.com/nightly/user-guide/deployments/deploy-on-kubernetes/getting-started).
### Self-host Prometheus and import dashboards manually

View File

@@ -17,10 +17,8 @@ arrow-schema.workspace = true
async-stream.workspace = true
async-trait.workspace = true
bytes.workspace = true
common-base.workspace = true
common-catalog.workspace = true
common-error.workspace = true
common-frontend.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-procedure.workspace = true

View File

@@ -277,26 +277,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to invoke frontend services"))]
InvokeFrontend {
source: common_frontend::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Meta client is not provided"))]
MetaClientMissing {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to find frontend node: {}", addr))]
FrontendNotFound {
addr: String,
#[snafu(implicit)]
location: Location,
},
}
impl Error {
@@ -365,10 +345,6 @@ impl ErrorExt for Error {
Error::GetViewCache { source, .. } | Error::GetTableCache { source, .. } => {
source.status_code()
}
Error::InvokeFrontend { source, .. } => source.status_code(),
Error::FrontendNotFound { .. } | Error::MetaClientMissing { .. } => {
StatusCode::Unexpected
}
}
}

View File

@@ -51,7 +51,6 @@ use crate::error::{
};
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::pg_catalog::PGCatalogProvider;
use crate::system_schema::SystemSchemaProvider;
use crate::CatalogManager;
@@ -85,7 +84,6 @@ impl KvBackendCatalogManager {
backend: KvBackendRef,
cache_registry: LayeredCacheRegistryRef,
procedure_manager: Option<ProcedureManagerRef>,
process_manager: Option<ProcessManagerRef>,
) -> Arc<Self> {
Arc::new_cyclic(|me| Self {
information_extension,
@@ -104,14 +102,12 @@ impl KvBackendCatalogManager {
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
Arc::new(FlowMetadataManager::new(backend.clone())),
process_manager.clone(),
)),
pg_catalog_provider: Arc::new(PGCatalogProvider::new(
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
)),
backend,
process_manager,
},
cache_registry,
procedure_manager,
@@ -423,7 +419,6 @@ struct SystemCatalog {
information_schema_provider: Arc<InformationSchemaProvider>,
pg_catalog_provider: Arc<PGCatalogProvider>,
backend: KvBackendRef,
process_manager: Option<ProcessManagerRef>,
}
impl SystemCatalog {
@@ -491,7 +486,6 @@ impl SystemCatalog {
catalog.to_string(),
self.catalog_manager.clone(),
Arc::new(FlowMetadataManager::new(self.backend.clone())),
self.process_manager.clone(),
))
});
information_schema_provider.table(table_name)

View File

@@ -40,7 +40,6 @@ pub mod information_schema {
pub use crate::system_schema::information_schema::*;
}
pub mod process_manager;
pub mod table_source;
#[async_trait::async_trait]

View File

@@ -356,7 +356,6 @@ impl MemoryCatalogManager {
catalog,
Arc::downgrade(self) as Weak<dyn CatalogManager>,
Arc::new(FlowMetadataManager::new(Arc::new(MemoryKvBackend::new()))),
None, // we don't need ProcessManager on regions server.
);
let information_schema = information_schema_provider.tables().clone();

View File

@@ -34,20 +34,4 @@ lazy_static! {
register_histogram!("greptime_catalog_kv_get", "catalog kv get").unwrap();
pub static ref METRIC_CATALOG_KV_BATCH_GET: Histogram =
register_histogram!("greptime_catalog_kv_batch_get", "catalog kv batch get").unwrap();
/// Count of running process in each catalog.
pub static ref PROCESS_LIST_COUNT: IntGaugeVec = register_int_gauge_vec!(
"greptime_process_list_count",
"Running process count per catalog",
&["catalog"]
)
.unwrap();
/// Count of killed process in each catalog.
pub static ref PROCESS_KILL_COUNT: IntCounterVec = register_int_counter_vec!(
"greptime_process_kill_count",
"Completed kill process requests count",
&["catalog"]
)
.unwrap();
}

View File

@@ -1,488 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use api::v1::frontend::{KillProcessRequest, ListProcessRequest, ProcessInfo};
use common_base::cancellation::CancellationHandle;
use common_frontend::selector::{FrontendSelector, MetaClientSelector};
use common_telemetry::{debug, info};
use common_time::util::current_time_millis;
use meta_client::MetaClientRef;
use snafu::{ensure, OptionExt, ResultExt};
use crate::error;
use crate::metrics::{PROCESS_KILL_COUNT, PROCESS_LIST_COUNT};
pub type ProcessManagerRef = Arc<ProcessManager>;
/// Query process manager.
pub struct ProcessManager {
/// Local frontend server address,
server_addr: String,
/// Next process id for local queries.
next_id: AtomicU64,
/// Running process per catalog.
catalogs: RwLock<HashMap<String, HashMap<u64, CancellableProcess>>>,
/// Frontend selector to locate frontend nodes.
frontend_selector: Option<MetaClientSelector>,
}
impl ProcessManager {
/// Create a [ProcessManager] instance with server address and kv client.
pub fn new(server_addr: String, meta_client: Option<MetaClientRef>) -> Self {
let frontend_selector = meta_client.map(MetaClientSelector::new);
Self {
server_addr,
next_id: Default::default(),
catalogs: Default::default(),
frontend_selector,
}
}
}
impl ProcessManager {
/// Registers a submitted query. Use the provided id if present.
#[must_use]
pub fn register_query(
self: &Arc<Self>,
catalog: String,
schemas: Vec<String>,
query: String,
client: String,
id: Option<u64>,
) -> Ticket {
let id = id.unwrap_or_else(|| self.next_id.fetch_add(1, Ordering::Relaxed));
let process = ProcessInfo {
id,
catalog: catalog.clone(),
schemas,
query,
start_timestamp: current_time_millis(),
client,
frontend: self.server_addr.clone(),
};
let cancellation_handle = Arc::new(CancellationHandle::default());
let cancellable_process = CancellableProcess::new(cancellation_handle.clone(), process);
self.catalogs
.write()
.unwrap()
.entry(catalog.clone())
.or_default()
.insert(id, cancellable_process);
Ticket {
catalog,
manager: self.clone(),
id,
cancellation_handle,
}
}
/// Generates the next process id.
pub fn next_id(&self) -> u64 {
self.next_id.fetch_add(1, Ordering::Relaxed)
}
/// De-register a query from process list.
pub fn deregister_query(&self, catalog: String, id: u64) {
if let Entry::Occupied(mut o) = self.catalogs.write().unwrap().entry(catalog) {
let process = o.get_mut().remove(&id);
debug!("Deregister process: {:?}", process);
if o.get().is_empty() {
o.remove();
}
}
}
/// List local running processes in given catalog.
pub fn local_processes(&self, catalog: Option<&str>) -> error::Result<Vec<ProcessInfo>> {
let catalogs = self.catalogs.read().unwrap();
let result = if let Some(catalog) = catalog {
if let Some(catalogs) = catalogs.get(catalog) {
catalogs.values().map(|p| p.process.clone()).collect()
} else {
vec![]
}
} else {
catalogs
.values()
.flat_map(|v| v.values().map(|p| p.process.clone()))
.collect()
};
Ok(result)
}
pub async fn list_all_processes(
&self,
catalog: Option<&str>,
) -> error::Result<Vec<ProcessInfo>> {
let mut processes = vec![];
if let Some(remote_frontend_selector) = self.frontend_selector.as_ref() {
let frontends = remote_frontend_selector
.select(|node| node.peer.addr != self.server_addr)
.await
.context(error::InvokeFrontendSnafu)?;
for mut f in frontends {
processes.extend(
f.list_process(ListProcessRequest {
catalog: catalog.unwrap_or_default().to_string(),
})
.await
.context(error::InvokeFrontendSnafu)?
.processes,
);
}
}
processes.extend(self.local_processes(catalog)?);
Ok(processes)
}
/// Kills query with provided catalog and id.
pub async fn kill_process(
&self,
server_addr: String,
catalog: String,
id: u64,
) -> error::Result<bool> {
if server_addr == self.server_addr {
if let Some(catalogs) = self.catalogs.write().unwrap().get_mut(&catalog) {
if let Some(process) = catalogs.remove(&id) {
process.handle.cancel();
info!(
"Killed process, catalog: {}, id: {:?}",
process.process.catalog, process.process.id
);
PROCESS_KILL_COUNT.with_label_values(&[&catalog]).inc();
Ok(true)
} else {
debug!("Failed to kill process, id not found: {}", id);
Ok(false)
}
} else {
debug!("Failed to kill process, catalog not found: {}", catalog);
Ok(false)
}
} else {
let mut nodes = self
.frontend_selector
.as_ref()
.context(error::MetaClientMissingSnafu)?
.select(|node| node.peer.addr == server_addr)
.await
.context(error::InvokeFrontendSnafu)?;
ensure!(
!nodes.is_empty(),
error::FrontendNotFoundSnafu { addr: server_addr }
);
let request = KillProcessRequest {
server_addr,
catalog,
process_id: id,
};
nodes[0]
.kill_process(request)
.await
.context(error::InvokeFrontendSnafu)?;
Ok(true)
}
}
}
pub struct Ticket {
pub(crate) catalog: String,
pub(crate) manager: ProcessManagerRef,
pub(crate) id: u64,
pub cancellation_handle: Arc<CancellationHandle>,
}
impl Drop for Ticket {
fn drop(&mut self) {
self.manager
.deregister_query(std::mem::take(&mut self.catalog), self.id);
}
}
struct CancellableProcess {
handle: Arc<CancellationHandle>,
process: ProcessInfo,
}
impl Drop for CancellableProcess {
fn drop(&mut self) {
PROCESS_LIST_COUNT
.with_label_values(&[&self.process.catalog])
.dec();
}
}
impl CancellableProcess {
fn new(handle: Arc<CancellationHandle>, process: ProcessInfo) -> Self {
PROCESS_LIST_COUNT
.with_label_values(&[&process.catalog])
.inc();
Self { handle, process }
}
}
impl Debug for CancellableProcess {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CancellableProcess")
.field("cancelled", &self.handle.is_cancelled())
.field("process", &self.process)
.finish()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::process_manager::ProcessManager;
#[tokio::test]
async fn test_register_query() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let ticket = process_manager.clone().register_query(
"public".to_string(),
vec!["test".to_string()],
"SELECT * FROM table".to_string(),
"".to_string(),
None,
);
let running_processes = process_manager.local_processes(None).unwrap();
assert_eq!(running_processes.len(), 1);
assert_eq!(&running_processes[0].frontend, "127.0.0.1:8000");
assert_eq!(running_processes[0].id, ticket.id);
assert_eq!(&running_processes[0].query, "SELECT * FROM table");
drop(ticket);
assert_eq!(process_manager.local_processes(None).unwrap().len(), 0);
}
#[tokio::test]
async fn test_register_query_with_custom_id() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let custom_id = 12345;
let ticket = process_manager.clone().register_query(
"public".to_string(),
vec!["test".to_string()],
"SELECT * FROM table".to_string(),
"client1".to_string(),
Some(custom_id),
);
assert_eq!(ticket.id, custom_id);
let running_processes = process_manager.local_processes(None).unwrap();
assert_eq!(running_processes.len(), 1);
assert_eq!(running_processes[0].id, custom_id);
assert_eq!(&running_processes[0].client, "client1");
}
#[tokio::test]
async fn test_multiple_queries_same_catalog() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let ticket1 = process_manager.clone().register_query(
"public".to_string(),
vec!["schema1".to_string()],
"SELECT * FROM table1".to_string(),
"client1".to_string(),
None,
);
let ticket2 = process_manager.clone().register_query(
"public".to_string(),
vec!["schema2".to_string()],
"SELECT * FROM table2".to_string(),
"client2".to_string(),
None,
);
let running_processes = process_manager.local_processes(Some("public")).unwrap();
assert_eq!(running_processes.len(), 2);
// Verify both processes are present
let ids: Vec<u64> = running_processes.iter().map(|p| p.id).collect();
assert!(ids.contains(&ticket1.id));
assert!(ids.contains(&ticket2.id));
}
#[tokio::test]
async fn test_multiple_catalogs() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let _ticket1 = process_manager.clone().register_query(
"catalog1".to_string(),
vec!["schema1".to_string()],
"SELECT * FROM table1".to_string(),
"client1".to_string(),
None,
);
let _ticket2 = process_manager.clone().register_query(
"catalog2".to_string(),
vec!["schema2".to_string()],
"SELECT * FROM table2".to_string(),
"client2".to_string(),
None,
);
// Test listing processes for specific catalog
let catalog1_processes = process_manager.local_processes(Some("catalog1")).unwrap();
assert_eq!(catalog1_processes.len(), 1);
assert_eq!(&catalog1_processes[0].catalog, "catalog1");
let catalog2_processes = process_manager.local_processes(Some("catalog2")).unwrap();
assert_eq!(catalog2_processes.len(), 1);
assert_eq!(&catalog2_processes[0].catalog, "catalog2");
// Test listing all processes
let all_processes = process_manager.local_processes(None).unwrap();
assert_eq!(all_processes.len(), 2);
}
#[tokio::test]
async fn test_deregister_query() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let ticket = process_manager.clone().register_query(
"public".to_string(),
vec!["test".to_string()],
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
);
assert_eq!(process_manager.local_processes(None).unwrap().len(), 1);
process_manager.deregister_query("public".to_string(), ticket.id);
assert_eq!(process_manager.local_processes(None).unwrap().len(), 0);
}
#[tokio::test]
async fn test_cancellation_handle() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let ticket = process_manager.clone().register_query(
"public".to_string(),
vec!["test".to_string()],
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
);
assert!(!ticket.cancellation_handle.is_cancelled());
ticket.cancellation_handle.cancel();
assert!(ticket.cancellation_handle.is_cancelled());
}
#[tokio::test]
async fn test_kill_local_process() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let ticket = process_manager.clone().register_query(
"public".to_string(),
vec!["test".to_string()],
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
);
assert!(!ticket.cancellation_handle.is_cancelled());
let killed = process_manager
.kill_process(
"127.0.0.1:8000".to_string(),
"public".to_string(),
ticket.id,
)
.await
.unwrap();
assert!(killed);
assert_eq!(process_manager.local_processes(None).unwrap().len(), 0);
}
#[tokio::test]
async fn test_kill_nonexistent_process() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let killed = process_manager
.kill_process("127.0.0.1:8000".to_string(), "public".to_string(), 999)
.await
.unwrap();
assert!(!killed);
}
#[tokio::test]
async fn test_kill_process_nonexistent_catalog() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let killed = process_manager
.kill_process("127.0.0.1:8000".to_string(), "nonexistent".to_string(), 1)
.await
.unwrap();
assert!(!killed);
}
#[tokio::test]
async fn test_process_info_fields() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let _ticket = process_manager.clone().register_query(
"test_catalog".to_string(),
vec!["schema1".to_string(), "schema2".to_string()],
"SELECT COUNT(*) FROM users WHERE age > 18".to_string(),
"test_client".to_string(),
Some(42),
);
let processes = process_manager.local_processes(None).unwrap();
assert_eq!(processes.len(), 1);
let process = &processes[0];
assert_eq!(process.id, 42);
assert_eq!(&process.catalog, "test_catalog");
assert_eq!(process.schemas, vec!["schema1", "schema2"]);
assert_eq!(&process.query, "SELECT COUNT(*) FROM users WHERE age > 18");
assert_eq!(&process.client, "test_client");
assert_eq!(&process.frontend, "127.0.0.1:8000");
assert!(process.start_timestamp > 0);
}
#[tokio::test]
async fn test_ticket_drop_deregisters_process() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
{
let _ticket = process_manager.clone().register_query(
"public".to_string(),
vec!["test".to_string()],
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
);
// Process should be registered
assert_eq!(process_manager.local_processes(None).unwrap().len(), 1);
} // ticket goes out of scope here
// Process should be automatically deregistered
assert_eq!(process_manager.local_processes(None).unwrap().len(), 0);
}
}

View File

@@ -19,7 +19,6 @@ mod information_memory_table;
pub mod key_column_usage;
mod partitions;
mod procedure_info;
mod process_list;
pub mod region_peers;
mod region_statistics;
mod runtime_metrics;
@@ -43,7 +42,6 @@ use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use lazy_static::lazy_static;
use paste::paste;
use process_list::InformationSchemaProcessList;
use store_api::storage::{ScanRequest, TableId};
use table::metadata::TableType;
use table::TableRef;
@@ -52,7 +50,6 @@ use views::InformationSchemaViews;
use self::columns::InformationSchemaColumns;
use crate::error::{Error, Result};
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
use crate::system_schema::information_schema::flows::InformationSchemaFlows;
use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
@@ -116,7 +113,6 @@ macro_rules! setup_memory_table {
pub struct InformationSchemaProvider {
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
process_manager: Option<ProcessManagerRef>,
flow_metadata_manager: Arc<FlowMetadataManager>,
tables: HashMap<String, TableRef>,
}
@@ -211,10 +207,6 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
self.catalog_manager.clone(),
),
) as _),
PROCESS_LIST => self
.process_manager
.as_ref()
.map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
_ => None,
}
}
@@ -225,13 +217,11 @@ impl InformationSchemaProvider {
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: Arc<FlowMetadataManager>,
process_manager: Option<ProcessManagerRef>,
) -> Self {
let mut provider = Self {
catalog_name,
catalog_manager,
flow_metadata_manager,
process_manager,
tables: HashMap::new(),
};
@@ -287,9 +277,6 @@ impl InformationSchemaProvider {
self.build_table(TABLE_CONSTRAINTS).unwrap(),
);
tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
if let Some(process_list) = self.build_table(PROCESS_LIST) {
tables.insert(PROCESS_LIST.to_string(), process_list);
}
// Add memory tables
for name in MEMORY_TABLES.iter() {
tables.insert((*name).to_string(), self.build_table(name).expect(name));

View File

@@ -1,189 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_catalog::consts::INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID;
use common_error::ext::BoxedError;
use common_frontend::DisplayProcessId;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::util::current_time_millis;
use common_time::{Duration, Timestamp};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datatypes::prelude::ConcreteDataType as CDT;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{
DurationMillisecondVectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
VectorRef,
};
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};
use crate::error::{self, InternalSnafu};
use crate::information_schema::Predicates;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::information_schema::InformationTable;
/// Column names of `information_schema.process_list`
const ID: &str = "id";
const CATALOG: &str = "catalog";
const SCHEMAS: &str = "schemas";
const QUERY: &str = "query";
const CLIENT: &str = "client";
const FRONTEND: &str = "frontend";
const START_TIMESTAMP: &str = "start_timestamp";
const ELAPSED_TIME: &str = "elapsed_time";
/// `information_schema.process_list` table implementation that tracks running
/// queries in current cluster.
pub struct InformationSchemaProcessList {
schema: SchemaRef,
process_manager: ProcessManagerRef,
}
impl InformationSchemaProcessList {
pub fn new(process_manager: ProcessManagerRef) -> Self {
Self {
schema: Self::schema(),
process_manager,
}
}
fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
ColumnSchema::new(ID, CDT::string_datatype(), false),
ColumnSchema::new(CATALOG, CDT::string_datatype(), false),
ColumnSchema::new(SCHEMAS, CDT::string_datatype(), false),
ColumnSchema::new(QUERY, CDT::string_datatype(), false),
ColumnSchema::new(CLIENT, CDT::string_datatype(), false),
ColumnSchema::new(FRONTEND, CDT::string_datatype(), false),
ColumnSchema::new(
START_TIMESTAMP,
CDT::timestamp_millisecond_datatype(),
false,
),
ColumnSchema::new(ELAPSED_TIME, CDT::duration_millisecond_datatype(), false),
]))
}
}
impl InformationTable for InformationSchemaProcessList {
fn table_id(&self) -> TableId {
INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID
}
fn table_name(&self) -> &'static str {
"process_list"
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn to_stream(&self, request: ScanRequest) -> error::Result<SendableRecordBatchStream> {
let process_manager = self.process_manager.clone();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
self.schema.arrow_schema().clone(),
futures::stream::once(async move {
make_process_list(process_manager, request)
.await
.map(RecordBatch::into_df_record_batch)
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))
}),
));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}
/// Build running process list.
async fn make_process_list(
process_manager: ProcessManagerRef,
request: ScanRequest,
) -> error::Result<RecordBatch> {
let predicates = Predicates::from_scan_request(&Some(request));
let current_time = current_time_millis();
// todo(hl): find a way to extract user catalog to filter queries from other users.
let queries = process_manager.list_all_processes(None).await?;
let mut id_builder = StringVectorBuilder::with_capacity(queries.len());
let mut catalog_builder = StringVectorBuilder::with_capacity(queries.len());
let mut schemas_builder = StringVectorBuilder::with_capacity(queries.len());
let mut query_builder = StringVectorBuilder::with_capacity(queries.len());
let mut client_builder = StringVectorBuilder::with_capacity(queries.len());
let mut frontend_builder = StringVectorBuilder::with_capacity(queries.len());
let mut start_time_builder = TimestampMillisecondVectorBuilder::with_capacity(queries.len());
let mut elapsed_time_builder = DurationMillisecondVectorBuilder::with_capacity(queries.len());
for process in queries {
let display_id = DisplayProcessId {
server_addr: process.frontend.to_string(),
id: process.id,
}
.to_string();
let schemas = process.schemas.join(",");
let id = Value::from(display_id);
let catalog = Value::from(process.catalog);
let schemas = Value::from(schemas);
let query = Value::from(process.query);
let client = Value::from(process.client);
let frontend = Value::from(process.frontend);
let start_timestamp = Value::from(Timestamp::new_millisecond(process.start_timestamp));
let elapsed_time = Value::from(Duration::new_millisecond(
current_time - process.start_timestamp,
));
let row = [
(ID, &id),
(CATALOG, &catalog),
(SCHEMAS, &schemas),
(QUERY, &query),
(CLIENT, &client),
(FRONTEND, &frontend),
(START_TIMESTAMP, &start_timestamp),
(ELAPSED_TIME, &elapsed_time),
];
if predicates.eval(&row) {
id_builder.push(id.as_string().as_deref());
catalog_builder.push(catalog.as_string().as_deref());
schemas_builder.push(schemas.as_string().as_deref());
query_builder.push(query.as_string().as_deref());
client_builder.push(client.as_string().as_deref());
frontend_builder.push(frontend.as_string().as_deref());
start_time_builder.push(start_timestamp.as_timestamp().map(|t| t.value().into()));
elapsed_time_builder.push(elapsed_time.as_duration().map(|d| d.value().into()));
}
}
RecordBatch::new(
InformationSchemaProcessList::schema(),
vec![
Arc::new(id_builder.finish()) as VectorRef,
Arc::new(catalog_builder.finish()) as VectorRef,
Arc::new(schemas_builder.finish()) as VectorRef,
Arc::new(query_builder.finish()) as VectorRef,
Arc::new(client_builder.finish()) as VectorRef,
Arc::new(frontend_builder.finish()) as VectorRef,
Arc::new(start_time_builder.finish()) as VectorRef,
Arc::new(elapsed_time_builder.finish()) as VectorRef,
],
)
.context(error::CreateRecordBatchSnafu)
}

View File

@@ -47,4 +47,3 @@ pub const VIEWS: &str = "views";
pub const FLOWS: &str = "flows";
pub const PROCEDURE_INFO: &str = "procedure_info";
pub const REGION_STATISTICS: &str = "region_statistics";
pub const PROCESS_LIST: &str = "process_list";

View File

@@ -328,7 +328,6 @@ mod tests {
backend.clone(),
layered_cache_registry,
None,
None,
);
let table_metadata_manager = TableMetadataManager::new(backend);
let mut view_info = common_meta::key::test_utils::new_test_table_info(1024, vec![]);

View File

@@ -1,39 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod export;
mod import;
use clap::Subcommand;
use common_error::ext::BoxedError;
use crate::data::export::ExportCommand;
use crate::data::import::ImportCommand;
use crate::Tool;
/// Command for data operations including exporting data from and importing data into GreptimeDB.
#[derive(Subcommand)]
pub enum DataCommand {
Export(ExportCommand),
Import(ImportCommand),
}
impl DataCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
match self {
DataCommand::Export(cmd) => cmd.build().await,
DataCommand::Import(cmd) => cmd.build().await,
}
}
}

View File

@@ -30,7 +30,6 @@ pub enum Error {
location: Location,
msg: String,
},
#[snafu(display("Failed to create default catalog and schema"))]
InitMetadata {
#[snafu(implicit)]
@@ -229,25 +228,22 @@ pub enum Error {
#[snafu(source)]
error: ObjectStoreError,
},
#[snafu(display("S3 config need be set"))]
S3ConfigNotSet {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Output directory not set"))]
OutputDirNotSet {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Empty store addresses"))]
EmptyStoreAddrs {
#[snafu(display("KV backend not set: {}", backend))]
KvBackendNotSet {
backend: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported memory backend"))]
UnsupportedMemoryBackend {
#[snafu(implicit)]
@@ -260,13 +256,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid arguments: {}", msg))]
InvalidArguments {
msg: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -287,7 +276,6 @@ impl ErrorExt for Error {
| Error::EmptyResult { .. }
| Error::InvalidFilePath { .. }
| Error::UnsupportedMemoryBackend { .. }
| Error::InvalidArguments { .. }
| Error::ParseProxyOpts { .. } => StatusCode::InvalidArguments,
Error::StartProcedureManager { source, .. }
@@ -308,7 +296,7 @@ impl ErrorExt for Error {
Error::OpenDal { .. } => StatusCode::Internal,
Error::S3ConfigNotSet { .. }
| Error::OutputDirNotSet { .. }
| Error::EmptyStoreAddrs { .. } => StatusCode::InvalidArguments,
| Error::KvBackendNotSet { .. } => StatusCode::InvalidArguments,
Error::BuildRuntime { source, .. } => source.status_code(),

View File

@@ -13,20 +13,22 @@
// limitations under the License.
mod bench;
mod data;
mod database;
pub mod error;
mod metadata;
mod export;
mod import;
mod meta_snapshot;
use async_trait::async_trait;
use clap::Parser;
use clap::{Parser, Subcommand};
use common_error::ext::BoxedError;
pub use database::DatabaseClient;
use error::Result;
pub use crate::bench::BenchTableMetadataCommand;
pub use crate::data::DataCommand;
pub use crate::metadata::MetadataCommand;
pub use crate::export::ExportCommand;
pub use crate::import::ImportCommand;
pub use crate::meta_snapshot::{MetaCommand, MetaInfoCommand, MetaRestoreCommand, MetaSaveCommand};
#[async_trait]
pub trait Tool: Send + Sync {
@@ -49,3 +51,19 @@ impl AttachCommand {
unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373")
}
}
/// Subcommand for data operations like export and import.
#[derive(Subcommand)]
pub enum DataCommand {
Export(ExportCommand),
Import(ImportCommand),
}
impl DataCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
match self {
DataCommand::Export(cmd) => cmd.build().await,
DataCommand::Import(cmd) => cmd.build().await,
}
}
}

View File

@@ -13,37 +13,139 @@
// limitations under the License.
use std::path::Path;
use std::sync::Arc;
use async_trait::async_trait;
use clap::{Parser, Subcommand};
use common_base::secrets::{ExposeSecret, SecretString};
use common_error::ext::BoxedError;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use common_meta::snapshot::MetadataSnapshotManager;
use meta_srv::bootstrap::create_etcd_client;
use meta_srv::metasrv::BackendImpl;
use object_store::services::{Fs, S3};
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use crate::error::{InvalidFilePathSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
use crate::metadata::common::StoreConfig;
use crate::error::{
InvalidFilePathSnafu, KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu,
UnsupportedMemoryBackendSnafu,
};
use crate::Tool;
/// Subcommand for metadata snapshot operations, including saving snapshots, restoring from snapshots, and viewing snapshot information.
/// Subcommand for metadata snapshot management.
#[derive(Subcommand)]
pub enum SnapshotCommand {
/// Save a snapshot of the current metadata state to a specified location.
Save(SaveCommand),
/// Restore metadata from a snapshot.
Restore(RestoreCommand),
/// Explore metadata from a snapshot.
Info(InfoCommand),
pub enum MetaCommand {
#[clap(subcommand)]
Snapshot(MetaSnapshotCommand),
}
impl SnapshotCommand {
impl MetaCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
SnapshotCommand::Save(cmd) => cmd.build().await,
SnapshotCommand::Restore(cmd) => cmd.build().await,
SnapshotCommand::Info(cmd) => cmd.build().await,
MetaCommand::Snapshot(cmd) => cmd.build().await,
}
}
}
/// Subcommand for metadata snapshot operations. such as save, restore and info.
#[derive(Subcommand)]
pub enum MetaSnapshotCommand {
/// Export metadata snapshot tool.
Save(MetaSaveCommand),
/// Restore metadata snapshot tool.
Restore(MetaRestoreCommand),
/// Explore metadata from metadata snapshot.
Info(MetaInfoCommand),
}
impl MetaSnapshotCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
MetaSnapshotCommand::Save(cmd) => cmd.build().await,
MetaSnapshotCommand::Restore(cmd) => cmd.build().await,
MetaSnapshotCommand::Info(cmd) => cmd.build().await,
}
}
}
#[derive(Debug, Default, Parser)]
struct MetaConnection {
/// The endpoint of store. one of etcd, pg or mysql.
#[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
store_addrs: Vec<String>,
/// The database backend.
#[clap(long, value_enum)]
backend: Option<BackendImpl>,
#[clap(long, default_value = "")]
store_key_prefix: String,
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
#[clap(long,default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
meta_table_name: String,
#[clap(long, default_value = "128")]
max_txn_ops: usize,
}
impl MetaConnection {
pub async fn build(&self) -> Result<KvBackendRef, BoxedError> {
let max_txn_ops = self.max_txn_ops;
let store_addrs = &self.store_addrs;
if store_addrs.is_empty() {
KvBackendNotSetSnafu { backend: "all" }
.fail()
.map_err(BoxedError::new)
} else {
let kvbackend = match self.backend {
Some(BackendImpl::EtcdStore) => {
let etcd_client = create_etcd_client(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
}
#[cfg(feature = "pg_kvbackend")]
Some(BackendImpl::PostgresStore) => {
let table_name = &self.meta_table_name;
let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool(
pool,
table_name,
max_txn_ops,
)
.await
.map_err(BoxedError::new)?)
}
#[cfg(feature = "mysql_kvbackend")]
Some(BackendImpl::MysqlStore) => {
let table_name = &self.meta_table_name;
let pool = meta_srv::bootstrap::create_mysql_pool(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool(
pool,
table_name,
max_txn_ops,
)
.await
.map_err(BoxedError::new)?)
}
Some(BackendImpl::MemoryStore) => UnsupportedMemoryBackendSnafu
.fail()
.map_err(BoxedError::new),
_ => KvBackendNotSetSnafu { backend: "all" }
.fail()
.map_err(BoxedError::new),
};
if self.store_key_prefix.is_empty() {
kvbackend
} else {
let chroot_kvbackend =
ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?);
Ok(Arc::new(chroot_kvbackend))
}
}
}
}
@@ -112,10 +214,10 @@ impl S3Config {
/// It will dump the metadata snapshot to local file or s3 bucket.
/// The snapshot file will be in binary format.
#[derive(Debug, Default, Parser)]
pub struct SaveCommand {
/// The store configuration.
pub struct MetaSaveCommand {
/// The connection to the metadata store.
#[clap(flatten)]
store: StoreConfig,
connection: MetaConnection,
/// The s3 config.
#[clap(flatten)]
s3_config: S3Config,
@@ -138,9 +240,9 @@ fn create_local_file_object_store(root: &str) -> Result<ObjectStore, BoxedError>
Ok(object_store)
}
impl SaveCommand {
impl MetaSaveCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let kvbackend = self.store.build().await?;
let kvbackend = self.connection.build().await?;
let output_dir = &self.output_dir;
let object_store = self.s3_config.build(output_dir).map_err(BoxedError::new)?;
if let Some(store) = object_store {
@@ -160,7 +262,7 @@ impl SaveCommand {
}
}
struct MetaSnapshotTool {
pub struct MetaSnapshotTool {
inner: MetadataSnapshotManager,
target_file: String,
}
@@ -176,16 +278,14 @@ impl Tool for MetaSnapshotTool {
}
}
/// Restore metadata from a snapshot file.
///
/// This command restores the metadata state from a previously saved snapshot.
/// The snapshot can be loaded from either a local file system or an S3 bucket,
/// depending on the provided configuration.
/// Restore metadata snapshot tool.
/// This tool is used to restore metadata snapshot from etcd, pg or mysql.
/// It will restore the metadata snapshot from local file or s3 bucket.
#[derive(Debug, Default, Parser)]
pub struct RestoreCommand {
/// The store configuration.
pub struct MetaRestoreCommand {
/// The connection to the metadata store.
#[clap(flatten)]
store: StoreConfig,
connection: MetaConnection,
/// The s3 config.
#[clap(flatten)]
s3_config: S3Config,
@@ -199,9 +299,9 @@ pub struct RestoreCommand {
force: bool,
}
impl RestoreCommand {
impl MetaRestoreCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let kvbackend = self.store.build().await?;
let kvbackend = self.connection.build().await?;
let input_dir = &self.input_dir;
let object_store = self.s3_config.build(input_dir).map_err(BoxedError::new)?;
if let Some(store) = object_store {
@@ -223,7 +323,7 @@ impl RestoreCommand {
}
}
struct MetaRestoreTool {
pub struct MetaRestoreTool {
inner: MetadataSnapshotManager,
source_file: String,
force: bool,
@@ -272,12 +372,9 @@ impl Tool for MetaRestoreTool {
}
}
/// Explore metadata from a snapshot file.
///
/// This command allows filtering the metadata by a specific key and limiting the number of results.
/// It prints the filtered metadata to the console.
/// Explore metadata from metadata snapshot.
#[derive(Debug, Default, Parser)]
pub struct InfoCommand {
pub struct MetaInfoCommand {
/// The s3 config.
#[clap(flatten)]
s3_config: S3Config,
@@ -292,7 +389,7 @@ pub struct InfoCommand {
limit: Option<usize>,
}
struct MetaInfoTool {
pub struct MetaInfoTool {
inner: ObjectStore,
source_file: String,
inspect_key: String,
@@ -301,7 +398,6 @@ struct MetaInfoTool {
#[async_trait]
impl Tool for MetaInfoTool {
#[allow(clippy::print_stdout)]
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
let result = MetadataSnapshotManager::info(
&self.inner,
@@ -318,7 +414,7 @@ impl Tool for MetaInfoTool {
}
}
impl InfoCommand {
impl MetaInfoCommand {
fn decide_object_store_root_for_local_store(
file_path: &str,
) -> Result<(&str, &str), BoxedError> {

View File

@@ -1,42 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod common;
mod control;
mod snapshot;
use clap::Subcommand;
use common_error::ext::BoxedError;
use crate::metadata::control::ControlCommand;
use crate::metadata::snapshot::SnapshotCommand;
use crate::Tool;
/// Command for managing metadata operations, including saving metadata snapshots and restoring metadata from snapshots.
#[derive(Subcommand)]
pub enum MetadataCommand {
#[clap(subcommand)]
Snapshot(SnapshotCommand),
#[clap(subcommand)]
Control(ControlCommand),
}
impl MetadataCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
MetadataCommand::Snapshot(cmd) => cmd.build().await,
MetadataCommand::Control(cmd) => cmd.build().await,
}
}
}

View File

@@ -1,116 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use clap::Parser;
use common_error::ext::BoxedError;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use meta_srv::bootstrap::create_etcd_client;
use meta_srv::metasrv::BackendImpl;
use crate::error::{EmptyStoreAddrsSnafu, UnsupportedMemoryBackendSnafu};
#[derive(Debug, Default, Parser)]
pub(crate) struct StoreConfig {
/// The endpoint of store. one of etcd, postgres or mysql.
///
/// For postgres store, the format is:
/// "password=password dbname=postgres user=postgres host=localhost port=5432"
///
/// For etcd store, the format is:
/// "127.0.0.1:2379"
///
/// For mysql store, the format is:
/// "mysql://user:password@ip:port/dbname"
#[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
store_addrs: Vec<String>,
/// The maximum number of operations in a transaction. Only used when using [etcd-store].
#[clap(long, default_value = "128")]
max_txn_ops: usize,
/// The metadata store backend.
#[clap(long, value_enum, default_value = "etcd-store")]
backend: BackendImpl,
/// The key prefix of the metadata store.
#[clap(long, default_value = "")]
store_key_prefix: String,
/// The table name in RDS to store metadata. Only used when using [postgres-store] or [mysql-store].
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
#[clap(long, default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
meta_table_name: String,
}
impl StoreConfig {
/// Builds a [`KvBackendRef`] from the store configuration.
pub async fn build(&self) -> Result<KvBackendRef, BoxedError> {
let max_txn_ops = self.max_txn_ops;
let store_addrs = &self.store_addrs;
if store_addrs.is_empty() {
EmptyStoreAddrsSnafu.fail().map_err(BoxedError::new)
} else {
let kvbackend = match self.backend {
BackendImpl::EtcdStore => {
let etcd_client = create_etcd_client(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
}
#[cfg(feature = "pg_kvbackend")]
BackendImpl::PostgresStore => {
let table_name = &self.meta_table_name;
let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool(
pool,
table_name,
max_txn_ops,
)
.await
.map_err(BoxedError::new)?)
}
#[cfg(feature = "mysql_kvbackend")]
BackendImpl::MysqlStore => {
let table_name = &self.meta_table_name;
let pool = meta_srv::bootstrap::create_mysql_pool(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool(
pool,
table_name,
max_txn_ops,
)
.await
.map_err(BoxedError::new)?)
}
BackendImpl::MemoryStore => UnsupportedMemoryBackendSnafu
.fail()
.map_err(BoxedError::new),
};
if self.store_key_prefix.is_empty() {
kvbackend
} else {
let chroot_kvbackend =
ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?);
Ok(Arc::new(chroot_kvbackend))
}
}
}
}

View File

@@ -1,38 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod get;
mod utils;
use clap::Subcommand;
use common_error::ext::BoxedError;
use get::GetCommand;
use crate::Tool;
/// Subcommand for metadata control.
#[derive(Subcommand)]
pub enum ControlCommand {
/// Get the metadata from the metasrv.
#[clap(subcommand)]
Get(GetCommand),
}
impl ControlCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
ControlCommand::Get(cmd) => cmd.build().await,
}
}
}

View File

@@ -1,242 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::min;
use async_trait::async_trait;
use clap::{Parser, Subcommand};
use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::key::table_info::TableInfoKey;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_route::TableRouteKey;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use common_meta::rpc::store::RangeRequest;
use futures::TryStreamExt;
use crate::error::InvalidArgumentsSnafu;
use crate::metadata::common::StoreConfig;
use crate::metadata::control::utils::{decode_key_value, json_fromatter};
use crate::Tool;
/// Subcommand for get command.
#[derive(Subcommand)]
pub enum GetCommand {
Key(GetKeyCommand),
Table(GetTableCommand),
}
impl GetCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
GetCommand::Key(cmd) => cmd.build().await,
GetCommand::Table(cmd) => cmd.build().await,
}
}
}
/// Get key-value pairs from the metadata store.
#[derive(Debug, Default, Parser)]
pub struct GetKeyCommand {
/// The key to get from the metadata store. If empty, returns all key-value pairs.
#[clap(default_value = "")]
key: String,
/// Whether to perform a prefix query. If true, returns all key-value pairs where the key starts with the given prefix.
#[clap(long, default_value = "false")]
prefix: bool,
/// The maximum number of key-value pairs to return. If 0, returns all key-value pairs.
#[clap(long, default_value = "0")]
limit: u64,
#[clap(flatten)]
store: StoreConfig,
}
impl GetKeyCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let kvbackend = self.store.build().await?;
Ok(Box::new(GetKeyTool {
kvbackend,
key: self.key.clone(),
prefix: self.prefix,
limit: self.limit,
}))
}
}
struct GetKeyTool {
kvbackend: KvBackendRef,
key: String,
prefix: bool,
limit: u64,
}
#[async_trait]
impl Tool for GetKeyTool {
async fn do_work(&self) -> Result<(), BoxedError> {
let mut req = RangeRequest::default();
if self.prefix {
req = req.with_prefix(self.key.as_bytes());
} else {
req = req.with_key(self.key.as_bytes());
}
let page_size = if self.limit > 0 {
min(self.limit as usize, DEFAULT_PAGE_SIZE)
} else {
DEFAULT_PAGE_SIZE
};
let pagination_stream =
PaginationStream::new(self.kvbackend.clone(), req, page_size, decode_key_value);
let mut stream = Box::pin(pagination_stream.into_stream());
let mut counter = 0;
while let Some((key, value)) = stream.try_next().await.map_err(BoxedError::new)? {
print!("{}\n{}\n", key, value);
counter += 1;
if self.limit > 0 && counter >= self.limit {
break;
}
}
Ok(())
}
}
/// Get table metadata from the metadata store via table id.
#[derive(Debug, Default, Parser)]
pub struct GetTableCommand {
/// Get table metadata by table id.
#[clap(long)]
table_id: Option<u32>,
/// Get table metadata by table name.
#[clap(long)]
table_name: Option<String>,
/// The schema name of the table.
#[clap(long)]
schema_name: Option<String>,
/// Pretty print the output.
#[clap(long, default_value = "false")]
pretty: bool,
#[clap(flatten)]
store: StoreConfig,
}
impl GetTableCommand {
pub fn validate(&self) -> Result<(), BoxedError> {
if self.table_id.is_none() && self.table_name.is_none() {
return Err(BoxedError::new(
InvalidArgumentsSnafu {
msg: "You must specify either --table-id or --table-name.",
}
.build(),
));
}
Ok(())
}
}
struct GetTableTool {
kvbackend: KvBackendRef,
table_id: Option<u32>,
table_name: Option<String>,
schema_name: Option<String>,
pretty: bool,
}
#[async_trait]
impl Tool for GetTableTool {
async fn do_work(&self) -> Result<(), BoxedError> {
let table_metadata_manager = TableMetadataManager::new(self.kvbackend.clone());
let table_name_manager = table_metadata_manager.table_name_manager();
let table_info_manager = table_metadata_manager.table_info_manager();
let table_route_manager = table_metadata_manager.table_route_manager();
let table_id = if let Some(table_name) = &self.table_name {
let catalog = DEFAULT_CATALOG_NAME.to_string();
let schema_name = self
.schema_name
.clone()
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let key = TableNameKey::new(&catalog, &schema_name, table_name);
let Some(table_name) = table_name_manager.get(key).await.map_err(BoxedError::new)?
else {
println!(
"Table({}) not found",
format_full_table_name(&catalog, &schema_name, table_name)
);
return Ok(());
};
table_name.table_id()
} else {
// Safety: we have validated that table_id or table_name is not None
self.table_id.unwrap()
};
let table_info = table_info_manager
.get(table_id)
.await
.map_err(BoxedError::new)?;
if let Some(table_info) = table_info {
println!(
"{}\n{}",
TableInfoKey::new(table_id),
json_fromatter(self.pretty, &*table_info)
);
} else {
println!("Table info not found");
}
let table_route = table_route_manager
.table_route_storage()
.get(table_id)
.await
.map_err(BoxedError::new)?;
if let Some(table_route) = table_route {
println!(
"{}\n{}",
TableRouteKey::new(table_id),
json_fromatter(self.pretty, &table_route)
);
} else {
println!("Table route not found");
}
Ok(())
}
}
impl GetTableCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
self.validate()?;
let kvbackend = self.store.build().await?;
Ok(Box::new(GetTableTool {
kvbackend,
table_id: self.table_id,
table_name: self.table_name.clone(),
schema_name: self.schema_name.clone(),
pretty: self.pretty,
}))
}
}

View File

@@ -1,36 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::error::Result as CommonMetaResult;
use common_meta::rpc::KeyValue;
use serde::Serialize;
/// Decodes a key-value pair into a string.
pub fn decode_key_value(kv: KeyValue) -> CommonMetaResult<(String, String)> {
let key = String::from_utf8_lossy(&kv.key).to_string();
let value = String::from_utf8_lossy(&kv.value).to_string();
Ok((key, value))
}
/// Formats a value as a JSON string.
pub fn json_fromatter<T>(pretty: bool, value: &T) -> String
where
T: Serialize,
{
if pretty {
serde_json::to_string_pretty(value).unwrap()
} else {
serde_json::to_string(value).unwrap()
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse};
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::InsertRequests;
use common_error::ext::BoxedError;
use common_meta::node_manager::Flownode;
@@ -44,16 +44,6 @@ impl Flownode for FlowRequester {
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
async fn handle_mark_window_dirty(
&self,
req: DirtyWindowRequest,
) -> common_meta::error::Result<FlowResponse> {
self.handle_mark_window_dirty(req)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
}
impl FlowRequester {
@@ -101,20 +91,4 @@ impl FlowRequester {
.into_inner();
Ok(response)
}
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
let (addr, mut client) = self.client.raw_flow_client()?;
let response = client
.handle_mark_dirty_time_window(DirtyWindowRequests {
requests: vec![req],
})
.await
.or_else(|e| {
let code = e.code();
let err: crate::error::Error = e.into();
Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
})?
.into_inner();
Ok(response)
}
}

View File

@@ -323,7 +323,6 @@ impl StartCommand {
cached_meta_backend.clone(),
layered_cache_registry.clone(),
None,
None,
);
let table_metadata_manager =

View File

@@ -20,7 +20,6 @@ use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use catalog::process_manager::ProcessManager;
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
@@ -39,7 +38,6 @@ use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use meta_client::{MetaClientOptions, MetaClientType};
use servers::addrs;
use servers::export_metrics::ExportMetricsTask;
use servers::tls::{TlsMode, TlsOption};
use snafu::{OptionExt, ResultExt};
@@ -344,17 +342,11 @@ impl StartCommand {
let information_extension =
Arc::new(DistributedInformationExtension::new(meta_client.clone()));
let process_manager = Arc::new(ProcessManager::new(
addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
Some(meta_client.clone()),
));
let catalog_manager = KvBackendCatalogManager::new(
information_extension,
cached_meta_backend.clone(),
layered_cache_registry.clone(),
None,
Some(process_manager.clone()),
);
let executor = HandlerGroupExecutor::new(vec![
@@ -391,7 +383,6 @@ impl StartCommand {
catalog_manager,
Arc::new(client),
meta_client,
process_manager,
)
.with_plugin(plugins.clone())
.with_local_cache_invalidator(layered_cache_registry)

View File

@@ -21,7 +21,6 @@ use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_schema::InformationExtension;
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::process_manager::ProcessManager;
use clap::Parser;
use client::api::v1::meta::RegionRole;
use common_base::readable_size::ReadableSize;
@@ -527,14 +526,11 @@ impl StartCommand {
datanode.region_server(),
procedure_manager.clone(),
));
let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));
let catalog_manager = KvBackendCatalogManager::new(
information_extension.clone(),
kv_backend.clone(),
layered_cache_registry.clone(),
Some(procedure_manager.clone()),
Some(process_manager.clone()),
);
let table_metadata_manager =
@@ -624,7 +620,6 @@ impl StartCommand {
catalog_manager.clone(),
node_manager.clone(),
ddl_task_executor.clone(),
process_manager,
)
.with_plugin(plugins.clone())
.try_build()
@@ -652,7 +647,7 @@ impl StartCommand {
node_manager,
)
.await
.context(StartFlownodeSnafu)?;
.context(error::StartFlownodeSnafu)?;
flow_streaming_engine.set_frontend_invoker(invoker).await;
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::Path;
use std::time::Duration;
use cmd::options::GreptimeOptions;
@@ -57,7 +58,12 @@ fn test_load_datanode_example_config() {
metadata_cache_tti: Duration::from_secs(300),
}),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some(format!("{}/{}", DEFAULT_DATA_HOME, WAL_DIR)),
dir: Some(
Path::new(DEFAULT_DATA_HOME)
.join(WAL_DIR)
.to_string_lossy()
.to_string(),
),
sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2,
..Default::default()
@@ -80,7 +86,10 @@ fn test_load_datanode_example_config() {
],
logging: LoggingOptions {
level: Some("info".to_string()),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
@@ -123,7 +132,10 @@ fn test_load_frontend_example_config() {
}),
logging: LoggingOptions {
level: Some("info".to_string()),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
@@ -170,7 +182,10 @@ fn test_load_metasrv_example_config() {
..Default::default()
},
logging: LoggingOptions {
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
level: Some("info".to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
@@ -205,7 +220,12 @@ fn test_load_standalone_example_config() {
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some(format!("{}/{}", DEFAULT_DATA_HOME, WAL_DIR)),
dir: Some(
Path::new(DEFAULT_DATA_HOME)
.join(WAL_DIR)
.to_string_lossy()
.to_string(),
),
sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2,
..Default::default()
@@ -228,7 +248,10 @@ fn test_load_standalone_example_config() {
},
logging: LoggingOptions {
level: Some("info".to_string()),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()

View File

@@ -1,240 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! [CancellationHandle] is used to compose with manual implementation of [futures::future::Future]
//! or [futures::stream::Stream] to facilitate cancellation.
//! See example in [frontend::stream_wrapper::CancellableStreamWrapper] and [CancellableFuture].
use std::fmt::{Debug, Display, Formatter};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use futures::task::AtomicWaker;
use pin_project::pin_project;
#[derive(Default)]
pub struct CancellationHandle {
waker: AtomicWaker,
cancelled: AtomicBool,
}
impl Debug for CancellationHandle {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CancellationHandle")
.field("cancelled", &self.is_cancelled())
.finish()
}
}
impl CancellationHandle {
pub fn waker(&self) -> &AtomicWaker {
&self.waker
}
/// Cancels a future or stream.
pub fn cancel(&self) {
if self
.cancelled
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
self.waker.wake();
}
}
/// Is this handle cancelled.
pub fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::Relaxed)
}
}
#[pin_project]
#[derive(Debug, Clone)]
pub struct CancellableFuture<T> {
#[pin]
fut: T,
handle: Arc<CancellationHandle>,
}
impl<T> CancellableFuture<T> {
pub fn new(fut: T, handle: Arc<CancellationHandle>) -> Self {
Self { fut, handle }
}
}
impl<T> Future for CancellableFuture<T>
where
T: Future,
{
type Output = Result<T::Output, Cancelled>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();
// Check if the task has been aborted
if this.handle.is_cancelled() {
return Poll::Ready(Err(Cancelled));
}
if let Poll::Ready(x) = this.fut.poll(cx) {
return Poll::Ready(Ok(x));
}
this.handle.waker().register(cx.waker());
if this.handle.is_cancelled() {
return Poll::Ready(Err(Cancelled));
}
Poll::Pending
}
}
#[derive(Copy, Clone, Debug)]
pub struct Cancelled;
impl Display for Cancelled {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Future has been cancelled")
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use tokio::time::{sleep, timeout};
use crate::cancellation::{CancellableFuture, CancellationHandle, Cancelled};
#[tokio::test]
async fn test_cancellable_future_completes_normally() {
let handle = Arc::new(CancellationHandle::default());
let future = async { 42 };
let cancellable = CancellableFuture::new(future, handle);
let result = cancellable.await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), 42);
}
#[tokio::test]
async fn test_cancellable_future_cancelled_before_start() {
let handle = Arc::new(CancellationHandle::default());
handle.cancel();
let future = async { 42 };
let cancellable = CancellableFuture::new(future, handle);
let result = cancellable.await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), Cancelled));
}
#[tokio::test]
async fn test_cancellable_future_cancelled_during_execution() {
let handle = Arc::new(CancellationHandle::default());
let handle_clone = handle.clone();
// Create a future that sleeps for a long time
let future = async {
sleep(Duration::from_secs(10)).await;
42
};
let cancellable = CancellableFuture::new(future, handle);
// Cancel the future after a short delay
tokio::spawn(async move {
sleep(Duration::from_millis(50)).await;
handle_clone.cancel();
});
let result = cancellable.await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), Cancelled));
}
#[tokio::test]
async fn test_cancellable_future_completes_before_cancellation() {
let handle = Arc::new(CancellationHandle::default());
let handle_clone = handle.clone();
// Create a future that completes quickly
let future = async {
sleep(Duration::from_millis(10)).await;
42
};
let cancellable = CancellableFuture::new(future, handle);
// Try to cancel after the future should have completed
tokio::spawn(async move {
sleep(Duration::from_millis(100)).await;
handle_clone.cancel();
});
let result = cancellable.await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), 42);
}
#[tokio::test]
async fn test_cancellation_handle_is_cancelled() {
let handle = CancellationHandle::default();
assert!(!handle.is_cancelled());
handle.cancel();
assert!(handle.is_cancelled());
}
#[tokio::test]
async fn test_multiple_cancellable_futures_with_same_handle() {
let handle = Arc::new(CancellationHandle::default());
let future1 = CancellableFuture::new(async { 1 }, handle.clone());
let future2 = CancellableFuture::new(async { 2 }, handle.clone());
// Cancel before starting
handle.cancel();
let (result1, result2) = tokio::join!(future1, future2);
assert!(result1.is_err());
assert!(result2.is_err());
assert!(matches!(result1.unwrap_err(), Cancelled));
assert!(matches!(result2.unwrap_err(), Cancelled));
}
#[tokio::test]
async fn test_cancellable_future_with_timeout() {
let handle = Arc::new(CancellationHandle::default());
let future = async {
sleep(Duration::from_secs(1)).await;
42
};
let cancellable = CancellableFuture::new(future, handle.clone());
// Use timeout to ensure the test doesn't hang
let result = timeout(Duration::from_millis(100), cancellable).await;
// Should timeout because the future takes 1 second but we timeout after 100ms
assert!(result.is_err());
}
#[tokio::test]
async fn test_cancelled_display() {
let cancelled = Cancelled;
assert_eq!(format!("{}", cancelled), "Future has been cancelled");
}
}

View File

@@ -14,7 +14,6 @@
pub mod bit_vec;
pub mod bytes;
pub mod cancellation;
pub mod plugins;
pub mod range_read;
#[allow(clippy::all)]

View File

@@ -102,8 +102,6 @@ pub const INFORMATION_SCHEMA_FLOW_TABLE_ID: u32 = 33;
pub const INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID: u32 = 34;
/// id for information_schema.region_statistics
pub const INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID: u32 = 35;
/// id for information_schema.process_list
pub const INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID: u32 = 36;
// ----- End of information_schema tables -----

View File

@@ -7,13 +7,5 @@ license.workspace = true
[dependencies]
async-trait.workspace = true
common-error.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-meta.workspace = true
greptime-proto.workspace = true
meta-client.workspace = true
snafu.workspace = true
tonic.workspace = true
[dev-dependencies]
tokio.workspace = true

View File

@@ -27,35 +27,6 @@ pub enum Error {
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to list nodes from metasrv"))]
Meta {
source: Box<meta_client::error::Error>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse process id: {}", s))]
ParseProcessId {
s: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to invoke frontend service"))]
InvokeFrontend {
#[snafu(source)]
error: tonic::Status,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to invoke list process service"))]
CreateChannel {
source: common_grpc::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -65,10 +36,6 @@ impl ErrorExt for Error {
use Error::*;
match self {
External { source, .. } => source.status_code(),
Meta { source, .. } => source.status_code(),
ParseProcessId { .. } => StatusCode::InvalidArguments,
InvokeFrontend { .. } => StatusCode::Unexpected,
CreateChannel { source, .. } => source.status_code(),
}
}

View File

@@ -12,41 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use snafu::OptionExt;
pub mod error;
pub mod selector;
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DisplayProcessId {
pub server_addr: String,
pub id: u64,
}
impl Display for DisplayProcessId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.server_addr, self.id)
}
}
impl TryFrom<&str> for DisplayProcessId {
type Error = error::Error;
fn try_from(value: &str) -> Result<Self, Self::Error> {
let mut split = value.split('/');
let server_addr = split
.next()
.context(error::ParseProcessIdSnafu { s: value })?
.to_string();
let id = split
.next()
.context(error::ParseProcessIdSnafu { s: value })?;
let id = u64::from_str(id)
.ok()
.context(error::ParseProcessIdSnafu { s: value })?;
Ok(DisplayProcessId { server_addr, id })
}
}

View File

@@ -1,112 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::cluster::{ClusterInfo, NodeInfo, Role};
use greptime_proto::v1::frontend::{
frontend_client, KillProcessRequest, KillProcessResponse, ListProcessRequest,
ListProcessResponse,
};
use meta_client::MetaClientRef;
use snafu::ResultExt;
use tonic::Response;
use crate::error;
use crate::error::{MetaSnafu, Result};
pub type FrontendClientPtr = Box<dyn FrontendClient>;
#[async_trait::async_trait]
pub trait FrontendClient: Send {
async fn list_process(&mut self, req: ListProcessRequest) -> Result<ListProcessResponse>;
async fn kill_process(&mut self, req: KillProcessRequest) -> Result<KillProcessResponse>;
}
#[async_trait::async_trait]
impl FrontendClient for frontend_client::FrontendClient<tonic::transport::channel::Channel> {
async fn list_process(&mut self, req: ListProcessRequest) -> Result<ListProcessResponse> {
frontend_client::FrontendClient::<tonic::transport::channel::Channel>::list_process(
self, req,
)
.await
.context(error::InvokeFrontendSnafu)
.map(Response::into_inner)
}
async fn kill_process(&mut self, req: KillProcessRequest) -> Result<KillProcessResponse> {
frontend_client::FrontendClient::<tonic::transport::channel::Channel>::kill_process(
self, req,
)
.await
.context(error::InvokeFrontendSnafu)
.map(Response::into_inner)
}
}
#[async_trait::async_trait]
pub trait FrontendSelector {
async fn select<F>(&self, predicate: F) -> Result<Vec<FrontendClientPtr>>
where
F: Fn(&NodeInfo) -> bool + Send;
}
#[derive(Debug, Clone)]
pub struct MetaClientSelector {
meta_client: MetaClientRef,
channel_manager: ChannelManager,
}
#[async_trait::async_trait]
impl FrontendSelector for MetaClientSelector {
async fn select<F>(&self, predicate: F) -> Result<Vec<FrontendClientPtr>>
where
F: Fn(&NodeInfo) -> bool + Send,
{
let nodes = self
.meta_client
.list_nodes(Some(Role::Frontend))
.await
.map_err(Box::new)
.context(MetaSnafu)?;
nodes
.into_iter()
.filter(predicate)
.map(|node| {
let channel = self
.channel_manager
.get(node.peer.addr)
.context(error::CreateChannelSnafu)?;
let client = frontend_client::FrontendClient::new(channel);
Ok(Box::new(client) as FrontendClientPtr)
})
.collect::<Result<Vec<_>>>()
}
}
impl MetaClientSelector {
pub fn new(meta_client: MetaClientRef) -> Self {
let cfg = ChannelConfig::new()
.connect_timeout(Duration::from_secs(30))
.timeout(Duration::from_secs(30));
let channel_manager = ChannelManager::with_config(cfg);
Self {
meta_client,
channel_manager,
}
}
}

View File

@@ -23,8 +23,7 @@ use std::sync::Arc;
use build::BuildFunction;
use database::{
ConnectionIdFunction, CurrentSchemaFunction, DatabaseFunction, PgBackendPidFunction,
ReadPreferenceFunction, SessionUserFunction,
CurrentSchemaFunction, DatabaseFunction, ReadPreferenceFunction, SessionUserFunction,
};
use pg_catalog::PGCatalogFunction;
use procedure_state::ProcedureStateFunction;
@@ -43,8 +42,6 @@ impl SystemFunction {
registry.register_scalar(DatabaseFunction);
registry.register_scalar(SessionUserFunction);
registry.register_scalar(ReadPreferenceFunction);
registry.register_scalar(PgBackendPidFunction);
registry.register_scalar(ConnectionIdFunction);
registry.register_scalar(TimezoneFunction);
registry.register_async(Arc::new(ProcedureStateFunction));
PGCatalogFunction::register(registry);

View File

@@ -18,8 +18,7 @@ use std::sync::Arc;
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
use datatypes::prelude::{ConcreteDataType, ScalarVector};
use datatypes::vectors::{StringVector, UInt64Vector, VectorRef};
use derive_more::Display;
use datatypes::vectors::{StringVector, VectorRef};
use crate::function::{Function, FunctionContext};
@@ -33,20 +32,10 @@ pub struct SessionUserFunction;
pub struct ReadPreferenceFunction;
#[derive(Display)]
#[display("{}", self.name())]
pub struct PgBackendPidFunction;
#[derive(Display)]
#[display("{}", self.name())]
pub struct ConnectionIdFunction;
const DATABASE_FUNCTION_NAME: &str = "database";
const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
const SESSION_USER_FUNCTION_NAME: &str = "session_user";
const READ_PREFERENCE_FUNCTION_NAME: &str = "read_preference";
const PG_BACKEND_PID: &str = "pg_backend_pid";
const CONNECTION_ID: &str = "connection_id";
impl Function for DatabaseFunction {
fn name(&self) -> &str {
@@ -128,46 +117,6 @@ impl Function for ReadPreferenceFunction {
}
}
impl Function for PgBackendPidFunction {
fn name(&self) -> &str {
PG_BACKEND_PID
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::uint64_datatype())
}
fn signature(&self) -> Signature {
Signature::nullary(Volatility::Immutable)
}
fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
let pid = func_ctx.query_ctx.process_id();
Ok(Arc::new(UInt64Vector::from_slice([pid])) as _)
}
}
impl Function for ConnectionIdFunction {
fn name(&self) -> &str {
CONNECTION_ID
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::uint64_datatype())
}
fn signature(&self) -> Signature {
Signature::nullary(Volatility::Immutable)
}
fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
let pid = func_ctx.query_ctx.process_id();
Ok(Arc::new(UInt64Vector::from_slice([pid])) as _)
}
}
impl fmt::Display for DatabaseFunction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "DATABASE")

View File

@@ -1001,7 +1001,7 @@ impl ErrorExt for Error {
}
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
RdsTransactionRetryFailed { .. } => StatusCode::Internal,
DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
}
}

View File

@@ -70,12 +70,11 @@ impl MetadataKey<'_, ViewInfoKey> for ViewInfoKey {
}
.build()
})?;
let captures =
VIEW_INFO_KEY_PATTERN
.captures(key)
.with_context(|| InvalidViewInfoSnafu {
err_msg: format!("Invalid ViewInfoKey '{key}'"),
})?;
let captures = VIEW_INFO_KEY_PATTERN
.captures(key)
.context(InvalidViewInfoSnafu {
err_msg: format!("Invalid ViewInfoKey '{key}'"),
})?;
// Safety: pass the regex check above
let view_id = captures[1].parse::<TableId>().unwrap();
Ok(ViewInfoKey { view_id })

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use api::region::RegionResponse;
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows;
use common_query::request::QueryRequest;
@@ -42,9 +42,6 @@ pub trait Flownode: Send + Sync {
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>;
/// Handles requests to mark time window as dirty.
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse>;
}
pub type FlownodeRef = Arc<dyn Flownode>;

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use api::region::RegionResponse;
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows;
use common_query::request::QueryRequest;
@@ -67,14 +67,6 @@ pub trait MockFlownodeHandler: Sync + Send + Clone {
) -> Result<FlowResponse> {
unimplemented!()
}
async fn handle_mark_window_dirty(
&self,
_peer: &Peer,
_req: DirtyWindowRequest,
) -> Result<FlowResponse> {
unimplemented!()
}
}
/// A mock struct implements [NodeManager] only implement the `datanode` method.
@@ -142,10 +134,6 @@ impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
self.handler.handle_inserts(&self.peer, requests).await
}
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
self.handler.handle_mark_window_dirty(&self.peer, req).await
}
}
#[async_trait::async_trait]

View File

@@ -173,7 +173,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Stream timeout"))]
StreamTimeout {
#[snafu(implicit)]
@@ -181,7 +180,6 @@ pub enum Error {
#[snafu(source)]
error: tokio::time::error::Elapsed,
},
#[snafu(display("RecordBatch slice index overflow: {visit_index} > {size}"))]
RecordBatchSliceIndexOverflow {
#[snafu(implicit)]
@@ -189,12 +187,6 @@ pub enum Error {
size: usize,
visit_index: usize,
},
#[snafu(display("Stream has been cancelled"))]
StreamCancelled {
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -229,8 +221,6 @@ impl ErrorExt for Error {
}
Error::StreamTimeout { .. } => StatusCode::Cancelled,
Error::StreamCancelled { .. } => StatusCode::Cancelled,
}
}

View File

@@ -326,14 +326,6 @@ impl Value {
}
}
/// Cast Value to [Duration]. Return None if value is not a valid duration data type.
pub fn as_duration(&self) -> Option<Duration> {
match self {
Value::Duration(d) => Some(*d),
_ => None,
}
}
/// Returns the logical type of the value.
pub fn logical_type_id(&self) -> LogicalTypeId {
match self {

View File

@@ -31,7 +31,6 @@ use common_runtime::JoinHandle;
use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value;
use futures::TryStreamExt;
use greptime_proto::v1::flow::DirtyWindowRequest;
use itertools::Itertools;
use session::context::QueryContextBuilder;
use snafu::{ensure, IntoError, OptionExt, ResultExt};
@@ -833,10 +832,6 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
.map(|_| Default::default())
.map_err(to_meta_err(snafu::location!()))
}
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
unreachable!()
}
}
/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
@@ -944,10 +939,6 @@ impl common_meta::node_manager::Flownode for StreamingEngine {
.map(|_| Default::default())
.map_err(to_meta_err(snafu::location!()))
}
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
unreachable!()
}
}
impl FlowEngine for StreamingEngine {

View File

@@ -17,7 +17,6 @@
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use api::v1::flow::{DirtyWindowRequests, FlowResponse};
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
@@ -30,7 +29,8 @@ use common_telemetry::{debug, info};
use common_time::TimeToLive;
use query::QueryEngineRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::{oneshot, RwLock};
use crate::batching_mode::frontend_client::FrontendClient;
@@ -42,7 +42,7 @@ use crate::error::{
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
UnexpectedSnafu, UnsupportedSnafu,
};
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
use crate::metrics::METRIC_FLOW_ROWS;
use crate::{CreateFlowArgs, Error, FlowId, TableName};
/// Batching mode Engine, responsible for driving all the batching mode tasks
@@ -78,116 +78,6 @@ impl BatchingEngine {
}
}
pub async fn handle_mark_dirty_time_window(
&self,
reqs: DirtyWindowRequests,
) -> Result<FlowResponse, Error> {
let table_info_mgr = self.table_meta.table_info_manager();
let mut group_by_table_id: HashMap<u32, Vec<_>> = HashMap::new();
for r in reqs.requests {
let tid = TableId::from(r.table_id);
let entry = group_by_table_id.entry(tid).or_default();
entry.extend(r.timestamps);
}
let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
let table_infos =
table_info_mgr
.batch_get(&tids)
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: format!("Failed to get table info for table ids: {:?}", tids),
})?;
let group_by_table_name = group_by_table_id
.into_iter()
.filter_map(|(id, timestamps)| {
let table_name = table_infos.get(&id).map(|info| info.table_name());
let Some(table_name) = table_name else {
warn!("Failed to get table infos for table id: {:?}", id);
return None;
};
let table_name = [
table_name.catalog_name,
table_name.schema_name,
table_name.table_name,
];
let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
.data_type
.as_timestamp()
.unwrap()
.unit();
Some((table_name, (timestamps, time_index_unit)))
})
.collect::<HashMap<_, _>>();
let group_by_table_name = Arc::new(group_by_table_name);
let mut handles = Vec::new();
let tasks = self.tasks.read().await;
for (_flow_id, task) in tasks.iter() {
let src_table_names = &task.config.source_table_names;
if src_table_names
.iter()
.all(|name| !group_by_table_name.contains_key(name))
{
continue;
}
let group_by_table_name = group_by_table_name.clone();
let task = task.clone();
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
let src_table_names = &task.config.source_table_names;
let mut all_dirty_windows = vec![];
for src_table_name in src_table_names {
if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
let Some(expr) = &task.config.time_window_expr else {
continue;
};
for timestamp in timestamps {
let align_start = expr
.eval(common_time::Timestamp::new(*timestamp, *unit))?
.0
.context(UnexpectedSnafu {
reason: "Failed to eval start value",
})?;
all_dirty_windows.push(align_start);
}
}
}
let mut state = task.state.write().unwrap();
let flow_id_label = task.config.flow_id.to_string();
for timestamp in all_dirty_windows {
state.dirty_time_windows.add_window(timestamp, None);
}
METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW
.with_label_values(&[&flow_id_label])
.set(state.dirty_time_windows.len() as f64);
Ok(())
});
handles.push(handle);
}
drop(tasks);
for handle in handles {
match handle.await {
Err(e) => {
warn!("Failed to handle inserts: {e}");
}
Ok(Ok(())) => (),
Ok(Err(e)) => {
warn!("Failed to handle inserts: {e}");
}
}
}
Ok(Default::default())
}
pub async fn handle_inserts_inner(
&self,
request: api::v1::region::InsertRequests,
@@ -266,6 +156,10 @@ impl BatchingEngine {
let Some(expr) = &task.config.time_window_expr else {
continue;
};
let row_cnt: usize = entry.iter().map(|rows| rows.rows.len()).sum();
METRIC_FLOW_ROWS
.with_label_values(&[&format!("{}-batching-in", task.config.flow_id)])
.inc_by(row_cnt as u64);
let involved_time_windows = expr.handle_rows(entry.clone()).await?;
let mut state = task.state.write().unwrap();
state

View File

@@ -32,6 +32,7 @@ use crate::batching_mode::MIN_REFRESH_DURATION;
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME,
};
use crate::{Error, FlowId};
@@ -102,8 +103,19 @@ impl TaskState {
})
.unwrap_or(last_duration);
METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME
.with_label_values(&[
flow_id.to_string().as_str(),
time_window_size
.unwrap_or_default()
.as_secs_f64()
.to_string()
.as_str(),
])
.observe(next_duration.as_secs_f64());
// if have dirty time window, execute immediately to clean dirty time window
if self.dirty_time_windows.windows.is_empty() {
/*if self.dirty_time_windows.windows.is_empty() {
self.last_update_time + next_duration
} else {
debug!(
@@ -113,7 +125,9 @@ impl TaskState {
self.dirty_time_windows.windows
);
Instant::now()
}
}*/
self.last_update_time + next_duration
}
}
@@ -156,11 +170,6 @@ impl DirtyTimeWindows {
self.windows.clear();
}
/// Number of dirty windows.
pub fn len(&self) -> usize {
self.windows.len()
}
/// Generate all filter expressions consuming all time windows
///
/// there is two limits:
@@ -211,47 +220,59 @@ impl DirtyTimeWindows {
// get the first `window_cnt` time windows
let max_time_range = window_size * window_cnt as i32;
let nth = {
let mut cur_time_range = chrono::Duration::zero();
let mut nth_key = None;
for (idx, (start, end)) in self.windows.iter().enumerate() {
// if time range is too long, stop
if cur_time_range > max_time_range {
nth_key = Some(*start);
break;
}
// if we have enough time windows, stop
if idx >= window_cnt {
nth_key = Some(*start);
break;
}
let mut to_be_query = BTreeMap::new();
let mut new_windows = self.windows.clone();
let mut cur_time_range = chrono::Duration::zero();
for (idx, (start, end)) in self.windows.iter().enumerate() {
let first_end = start
.add_duration(window_size.to_std().unwrap())
.context(TimeSnafu)?;
let end = end.unwrap_or(first_end);
if let Some(end) = end {
if let Some(x) = end.sub(start) {
cur_time_range += x;
}
}
// if time range is too long, stop
if cur_time_range >= max_time_range {
break;
}
nth_key
};
let first_nth = {
if let Some(nth) = nth {
let mut after = self.windows.split_off(&nth);
std::mem::swap(&mut self.windows, &mut after);
after
} else {
std::mem::take(&mut self.windows)
// if we have enough time windows, stop
if idx >= window_cnt {
break;
}
};
if let Some(x) = end.sub(start) {
if cur_time_range + x <= max_time_range {
to_be_query.insert(*start, Some(end));
new_windows.remove(start);
cur_time_range += x;
} else {
// too large a window, split it
// split at window_size * times
let surplus = max_time_range - cur_time_range;
let times = surplus.num_seconds() / window_size.num_seconds();
let split_offset = window_size * times as i32;
let split_at = start
.add_duration(split_offset.to_std().unwrap())
.context(TimeSnafu)?;
to_be_query.insert(*start, Some(split_at));
// remove the original window
new_windows.remove(start);
new_windows.insert(split_at, Some(end));
cur_time_range += split_offset;
break;
}
}
}
self.windows = new_windows;
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
.with_label_values(&[flow_id.to_string().as_str()])
.observe(first_nth.len() as f64);
.observe(to_be_query.len() as f64);
let full_time_range = first_nth
let full_time_range = to_be_query
.iter()
.fold(chrono::Duration::zero(), |acc, (start, end)| {
if let Some(end) = end {
@@ -266,7 +287,7 @@ impl DirtyTimeWindows {
.observe(full_time_range);
let mut expr_lst = vec![];
for (start, end) in first_nth.into_iter() {
for (start, end) in to_be_query.into_iter() {
// align using time window exprs
let (start, end) = if let Some(ctx) = task_ctx {
let Some(time_window_expr) = &ctx.config.time_window_expr else {
@@ -500,6 +521,64 @@ mod test {
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
)
),
// split range
(
Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once(
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
60
)),
),
(
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
Some(Timestamp::new_second(
60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3
)),
)]),
Some(
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
)
),
// split 2 min into 1 min
(
Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
40 * 3
)),
)]),
Some(
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
)
),
// split 3s + 1min into 3s + 57s
(
Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
3
)),
),(
Timestamp::new_second(20),
Some(Timestamp::new_second(
140
)),
)]),
Some(
"(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
)
),
// expired
(
vec![
@@ -516,6 +595,8 @@ mod test {
None
),
];
// let len = testcases.len();
// let testcases = testcases[(len - 2)..(len - 1)].to_vec();
for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
testcases
{

View File

@@ -35,6 +35,13 @@ lazy_static! {
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_wait_time_secs",
"flow batching engine wait time between query(seconds)",
&["flow_id", "time_window_size"],
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_slow_query_secs",
"flow batching engine slow query(seconds)",
@@ -58,13 +65,6 @@ lazy_static! {
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW: GaugeVec =
register_gauge_vec!(
"greptime_flow_batching_engine_bulk_mark_time_window",
"flow batching engine query time window count marked by bulk inserts",
&["flow_id"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec =
register_int_counter_vec!(
"greptime_flow_batching_start_query_count",
@@ -83,7 +83,7 @@ lazy_static! {
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
"greptime_flow_processed_rows",
"Count of rows flowing through the system.",
"Count of rows flowing through the system",
&["direction"]
)
.unwrap();

View File

@@ -17,7 +17,6 @@
use std::net::SocketAddr;
use std::sync::Arc;
use api::v1::flow::DirtyWindowRequests;
use api::v1::{RowDeleteRequests, RowInsertRequests};
use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
use catalog::CatalogManagerRef;
@@ -137,18 +136,6 @@ impl flow_server::Flow for FlowService {
.map(Response::new)
.map_err(to_status_with_last_err)
}
async fn handle_mark_dirty_time_window(
&self,
reqs: Request<DirtyWindowRequests>,
) -> Result<Response<FlowResponse>, Status> {
self.dual_engine
.batching_engine()
.handle_mark_dirty_time_window(reqs.into_inner())
.await
.map(Response::new)
.map_err(to_status_with_last_err)
}
}
#[derive(Clone)]
@@ -591,7 +578,6 @@ impl FrontendInvoker {
layered_cache_registry.clone(),
inserter.clone(),
table_route_cache,
None,
));
let invoker = FrontendInvoker::new(inserter, deleter, statement_executor);

View File

@@ -25,7 +25,6 @@ common-catalog.workspace = true
common-config.workspace = true
common-datasource.workspace = true
common-error.workspace = true
common-frontend.workspace = true
common-function.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
@@ -70,7 +69,6 @@ store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
tokio-util.workspace = true
toml.workspace = true
tonic.workspace = true

View File

@@ -357,12 +357,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Query has been cancelled"))]
Cancelled {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -441,8 +435,6 @@ impl ErrorExt for Error {
Error::InFlightWriteBytesExceeded { .. } => StatusCode::RateLimited,
Error::DataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
Error::Cancelled { .. } => StatusCode::Cancelled,
}
}

View File

@@ -30,10 +30,8 @@ use std::time::SystemTime;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::process_manager::ProcessManagerRef;
use catalog::CatalogManagerRef;
use client::OutputData;
use common_base::cancellation::CancellableFuture;
use common_base::Plugins;
use common_config::KvBackendConfig;
use common_error::ext::{BoxedError, ErrorExt};
@@ -82,7 +80,6 @@ use crate::error::{
};
use crate::limiter::LimiterRef;
use crate::slow_query_recorder::SlowQueryRecorder;
use crate::stream_wrapper::CancellableStreamWrapper;
/// The frontend instance contains necessary components, and implements many
/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
@@ -99,7 +96,6 @@ pub struct Instance {
table_metadata_manager: TableMetadataManagerRef,
slow_query_recorder: Option<SlowQueryRecorder>,
limiter: Option<LimiterRef>,
process_manager: ProcessManagerRef,
}
impl Instance {
@@ -157,10 +153,6 @@ impl Instance {
pub fn inserter(&self) -> &InserterRef {
&self.inserter
}
pub fn process_manager(&self) -> &ProcessManagerRef {
&self.process_manager
}
}
fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
@@ -180,79 +172,50 @@ impl Instance {
None
};
let ticket = self.process_manager.register_query(
query_ctx.current_catalog().to_string(),
vec![query_ctx.current_schema()],
stmt.to_string(),
query_ctx.conn_info().to_string(),
None,
);
let query_fut = async {
match stmt {
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
// TODO: remove this when format is supported in datafusion
if let Statement::Explain(explain) = &stmt {
if let Some(format) = explain.format() {
query_ctx.set_explain_format(format.to_string());
}
let output = match stmt {
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
// TODO: remove this when format is supported in datafusion
if let Statement::Explain(explain) = &stmt {
if let Some(format) = explain.format() {
query_ctx.set_explain_format(format.to_string());
}
let stmt = QueryStatement::Sql(stmt);
let plan = self
.statement_executor
.plan(&stmt, query_ctx.clone())
.await?;
let QueryStatement::Sql(stmt) = stmt else {
unreachable!()
};
query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
self.statement_executor
.exec_plan(plan, query_ctx)
.await
.context(TableOperationSnafu)
}
Statement::Tql(tql) => {
let plan = self
.statement_executor
.plan_tql(tql.clone(), &query_ctx)
.await?;
query_interceptor.pre_execute(
&Statement::Tql(tql),
Some(&plan),
query_ctx.clone(),
)?;
self.statement_executor
.exec_plan(plan, query_ctx)
.await
.context(TableOperationSnafu)
}
_ => {
query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
self.statement_executor
.execute_sql(stmt, query_ctx)
.await
.context(TableOperationSnafu)
}
let stmt = QueryStatement::Sql(stmt);
let plan = self
.statement_executor
.plan(&stmt, query_ctx.clone())
.await?;
let QueryStatement::Sql(stmt) = stmt else {
unreachable!()
};
query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
self.statement_executor.exec_plan(plan, query_ctx).await
}
Statement::Tql(tql) => {
let plan = self
.statement_executor
.plan_tql(tql.clone(), &query_ctx)
.await?;
query_interceptor.pre_execute(
&Statement::Tql(tql),
Some(&plan),
query_ctx.clone(),
)?;
self.statement_executor.exec_plan(plan, query_ctx).await
}
_ => {
query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
self.statement_executor.execute_sql(stmt, query_ctx).await
}
};
CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
.await
.map_err(|_| error::CancelledSnafu.build())?
.map(|output| {
let Output { meta, data } = output;
let data = match data {
OutputData::Stream(stream) => {
OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
}
other => other,
};
Output { data, meta }
})
output.context(TableOperationSnafu)
}
}
@@ -614,8 +577,6 @@ pub fn check_permission(
}
// cursor operations are always allowed once it's created
Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
// User can only kill process in their own catalog.
Statement::Kill(_) => {}
}
Ok(())
}

View File

@@ -15,7 +15,6 @@
use std::sync::Arc;
use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
use catalog::process_manager::ProcessManagerRef;
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
@@ -55,11 +54,9 @@ pub struct FrontendBuilder {
node_manager: NodeManagerRef,
plugins: Option<Plugins>,
procedure_executor: ProcedureExecutorRef,
process_manager: ProcessManagerRef,
}
impl FrontendBuilder {
#[allow(clippy::too_many_arguments)]
pub fn new(
options: FrontendOptions,
kv_backend: KvBackendRef,
@@ -67,7 +64,6 @@ impl FrontendBuilder {
catalog_manager: CatalogManagerRef,
node_manager: NodeManagerRef,
procedure_executor: ProcedureExecutorRef,
process_manager: ProcessManagerRef,
) -> Self {
Self {
options,
@@ -78,7 +74,6 @@ impl FrontendBuilder {
node_manager,
plugins: None,
procedure_executor,
process_manager,
}
}
@@ -100,7 +95,7 @@ impl FrontendBuilder {
let kv_backend = self.kv_backend;
let node_manager = self.node_manager;
let plugins = self.plugins.unwrap_or_default();
let process_manager = self.process_manager;
let table_route_cache: TableRouteCacheRef =
self.layered_cache_registry
.get()
@@ -180,7 +175,6 @@ impl FrontendBuilder {
local_cache_invalidator,
inserter.clone(),
table_route_cache,
Some(process_manager.clone()),
));
let pipeline_operator = Arc::new(PipelineOperator::new(
@@ -222,7 +216,6 @@ impl FrontendBuilder {
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
slow_query_recorder,
limiter,
process_manager,
})
}
}

View File

@@ -35,8 +35,8 @@ use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableId;
use table::table_name::TableName;
use table::TableRef;
use crate::error::{
CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
@@ -235,33 +235,34 @@ impl GrpcQueryHandler for Instance {
async fn put_record_batch(
&self,
table_name: &TableName,
table_ref: &mut Option<TableRef>,
table: &TableName,
table_id: &mut Option<TableId>,
decoder: &mut FlightDecoder,
data: FlightData,
) -> Result<AffectedRows> {
let table = if let Some(table) = table_ref {
table.clone()
let table_id = if let Some(table_id) = table_id {
*table_id
} else {
let table = self
.catalog_manager()
.table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
&table.catalog_name,
&table.schema_name,
&table.table_name,
None,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_name.to_string(),
table_name: table.to_string(),
})?;
*table_ref = Some(table.clone());
table
let id = table.table_info().table_id();
*table_id = Some(id);
id
};
self.inserter
.handle_bulk_insert(table, decoder, data)
.handle_bulk_insert(table_id, decoder, data)
.await
.context(TableOperationSnafu)
}

View File

@@ -23,4 +23,3 @@ pub(crate) mod metrics;
pub mod server;
pub mod service_config;
pub(crate) mod slow_query_recorder;
mod stream_wrapper;

View File

@@ -20,7 +20,6 @@ use common_base::Plugins;
use common_config::Configurable;
use servers::error::Error as ServerError;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig};
use servers::http::event::LogValidatorRef;
@@ -158,14 +157,11 @@ where
opts.grpc.flight_compression,
);
let frontend_grpc_handler =
FrontendGrpcHandler::new(self.instance.process_manager().clone());
let grpc_server = builder
.database_handler(greptime_request_handler.clone())
.prometheus_handler(self.instance.clone(), user_provider.clone())
.otel_arrow_handler(OtelArrowServiceHandler(self.instance.clone()))
.flight_handler(Arc::new(greptime_request_handler))
.frontend_grpc_handler(frontend_grpc_handler)
.build();
Ok(grpc_server)
}

View File

@@ -1,367 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::pin::Pin;
use std::task::{Context, Poll};
use catalog::process_manager::Ticket;
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use datatypes::schema::SchemaRef;
use futures::Stream;
pub struct CancellableStreamWrapper {
inner: SendableRecordBatchStream,
ticket: Ticket,
}
impl Unpin for CancellableStreamWrapper {}
impl CancellableStreamWrapper {
pub fn new(stream: SendableRecordBatchStream, ticket: Ticket) -> Self {
Self {
inner: stream,
ticket,
}
}
}
impl Stream for CancellableStreamWrapper {
type Item = common_recordbatch::error::Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self;
if this.ticket.cancellation_handle.is_cancelled() {
return Poll::Ready(Some(common_recordbatch::error::StreamCancelledSnafu.fail()));
}
if let Poll::Ready(res) = Pin::new(&mut this.inner).poll_next(cx) {
return Poll::Ready(res);
}
// on pending, register cancellation waker.
this.ticket.cancellation_handle.waker().register(cx.waker());
// check if canceled again.
if this.ticket.cancellation_handle.is_cancelled() {
return Poll::Ready(Some(common_recordbatch::error::StreamCancelledSnafu.fail()));
}
Poll::Pending
}
}
impl RecordBatchStream for CancellableStreamWrapper {
fn schema(&self) -> SchemaRef {
self.inner.schema()
}
fn output_ordering(&self) -> Option<&[OrderOption]> {
self.inner.output_ordering()
}
fn metrics(&self) -> Option<RecordBatchMetrics> {
self.inner.metrics()
}
}
#[cfg(test)]
mod tests {
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use catalog::process_manager::ProcessManager;
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream};
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::Int32Vector;
use futures::{Stream, StreamExt};
use tokio::time::{sleep, timeout};
use super::CancellableStreamWrapper;
// Mock stream for testing
struct MockRecordBatchStream {
schema: SchemaRef,
batches: Vec<common_recordbatch::error::Result<RecordBatch>>,
current: usize,
delay: Option<Duration>,
}
impl MockRecordBatchStream {
fn new(batches: Vec<common_recordbatch::error::Result<RecordBatch>>) -> Self {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"test_col",
ConcreteDataType::int32_datatype(),
false,
)]));
Self {
schema,
batches,
current: 0,
delay: None,
}
}
fn with_delay(mut self, delay: Duration) -> Self {
self.delay = Some(delay);
self
}
}
impl Stream for MockRecordBatchStream {
type Item = common_recordbatch::error::Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(delay) = self.delay {
// Simulate async delay
let waker = cx.waker().clone();
let delay_clone = delay;
tokio::spawn(async move {
sleep(delay_clone).await;
waker.wake();
});
self.delay = None; // Only delay once
return Poll::Pending;
}
if self.current >= self.batches.len() {
return Poll::Ready(None);
}
let batch = self.batches[self.current].as_ref().unwrap().clone();
self.current += 1;
Poll::Ready(Some(Ok(batch)))
}
}
impl RecordBatchStream for MockRecordBatchStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn output_ordering(&self) -> Option<&[OrderOption]> {
None
}
fn metrics(&self) -> Option<RecordBatchMetrics> {
None
}
}
fn create_test_batch() -> RecordBatch {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"test_col",
ConcreteDataType::int32_datatype(),
false,
)]));
RecordBatch::new(
schema,
vec![Arc::new(Int32Vector::from_values(0..3)) as VectorRef],
)
.unwrap()
}
#[tokio::test]
async fn test_stream_completes_normally() {
let batch = create_test_batch();
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch.clone())]);
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
let ticket = process_manager.register_query(
"catalog".to_string(),
vec![],
"query".to_string(),
"client".to_string(),
None,
);
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
let result = cancellable_stream.next().await;
assert!(result.is_some());
assert!(result.unwrap().is_ok());
let end_result = cancellable_stream.next().await;
assert!(end_result.is_none());
}
#[tokio::test]
async fn test_stream_cancelled_before_start() {
let batch = create_test_batch();
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch)]);
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
let ticket = process_manager.register_query(
"catalog".to_string(),
vec![],
"query".to_string(),
"client".to_string(),
None,
);
// Cancel before creating the wrapper
ticket.cancellation_handle.cancel();
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
let result = cancellable_stream.next().await;
assert!(result.is_some());
assert!(result.unwrap().is_err());
}
#[tokio::test]
async fn test_stream_cancelled_during_execution() {
let batch = create_test_batch();
let mock_stream =
MockRecordBatchStream::new(vec![Ok(batch)]).with_delay(Duration::from_millis(100));
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
let ticket = process_manager.register_query(
"catalog".to_string(),
vec![],
"query".to_string(),
"client".to_string(),
None,
);
let cancellation_handle = ticket.cancellation_handle.clone();
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
// Cancel after a short delay
tokio::spawn(async move {
sleep(Duration::from_millis(50)).await;
cancellation_handle.cancel();
});
let result = cancellable_stream.next().await;
assert!(result.is_some());
assert!(result.unwrap().is_err());
}
#[tokio::test]
async fn test_stream_completes_before_cancellation() {
let batch = create_test_batch();
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch.clone())]);
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
let ticket = process_manager.register_query(
"catalog".to_string(),
vec![],
"query".to_string(),
"client".to_string(),
None,
);
let cancellation_handle = ticket.cancellation_handle.clone();
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
// Try to cancel after the stream should have completed
tokio::spawn(async move {
sleep(Duration::from_millis(100)).await;
cancellation_handle.cancel();
});
let result = cancellable_stream.next().await;
assert!(result.is_some());
assert!(result.unwrap().is_ok());
}
#[tokio::test]
async fn test_multiple_batches() {
let batch1 = create_test_batch();
let batch2 = create_test_batch();
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch1), Ok(batch2)]);
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
let ticket = process_manager.register_query(
"catalog".to_string(),
vec![],
"query".to_string(),
"client".to_string(),
None,
);
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
// First batch
let result1 = cancellable_stream.next().await;
assert!(result1.is_some());
assert!(result1.unwrap().is_ok());
// Second batch
let result2 = cancellable_stream.next().await;
assert!(result2.is_some());
assert!(result2.unwrap().is_ok());
// End of stream
let end_result = cancellable_stream.next().await;
assert!(end_result.is_none());
}
#[tokio::test]
async fn test_record_batch_stream_methods() {
let batch = create_test_batch();
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch)]);
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
let ticket = process_manager.register_query(
"catalog".to_string(),
vec![],
"query".to_string(),
"client".to_string(),
None,
);
let cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
// Test schema method
let schema = cancellable_stream.schema();
assert_eq!(schema.column_schemas().len(), 1);
assert_eq!(schema.column_schemas()[0].name, "test_col");
// Test output_ordering method
assert!(cancellable_stream.output_ordering().is_none());
// Test metrics method
assert!(cancellable_stream.metrics().is_none());
}
#[tokio::test]
async fn test_cancellation_during_pending_poll() {
let batch = create_test_batch();
let mock_stream =
MockRecordBatchStream::new(vec![Ok(batch)]).with_delay(Duration::from_millis(200));
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
let ticket = process_manager.register_query(
"catalog".to_string(),
vec![],
"query".to_string(),
"client".to_string(),
None,
);
let cancellation_handle = ticket.cancellation_handle.clone();
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
// Cancel while the stream is pending
tokio::spawn(async move {
sleep(Duration::from_millis(50)).await;
cancellation_handle.cancel();
});
let result = timeout(Duration::from_millis(300), cancellable_stream.next()).await;
assert!(result.is_ok());
let stream_result = result.unwrap();
assert!(stream_result.is_some());
assert!(stream_result.unwrap().is_err());
}
}

View File

@@ -27,7 +27,6 @@ futures-util.workspace = true
humantime-serde.workspace = true
itertools.workspace = true
lazy_static = "1.4"
mito-codec.workspace = true
mito2.workspace = true
mur3 = "0.1"
object-store.workspace = true

View File

@@ -115,7 +115,7 @@ pub enum Error {
#[snafu(display("Failed to encode primary key"))]
EncodePrimaryKey {
source: mito_codec::error::Error,
source: mito2::error::Error,
#[snafu(implicit)]
location: Location,
},

View File

@@ -18,7 +18,7 @@ use std::hash::Hash;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
use datatypes::value::ValueRef;
use mito_codec::row_converter::SparsePrimaryKeyCodec;
use mito2::row_converter::SparsePrimaryKeyCodec;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;

View File

@@ -1,30 +0,0 @@
[package]
name = "mito-codec"
version.workspace = true
edition.workspace = true
license.workspace = true
[features]
default = []
testing = []
[dependencies]
api.workspace = true
bytes.workspace = true
common-base.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-recordbatch.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datatypes.workspace = true
memcomparable = "0.2"
paste.workspace = true
serde.workspace = true
snafu.workspace = true
store-api.workspace = true
[dev-dependencies]
datafusion-common.workspace = true
datafusion-expr.workspace = true

View File

@@ -1,95 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datatypes::prelude::ConcreteDataType;
use snafu::{Location, Snafu};
/// Error definitions for mito encoding.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Row value mismatches field data type"))]
FieldTypeMismatch {
// Box the source to reduce the size of the error.
#[snafu(source(from(datatypes::error::Error, Box::new)))]
source: Box<datatypes::error::Error>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serialize field"))]
SerializeField {
#[snafu(source)]
error: memcomparable::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Data type: {} does not support serialization/deserialization",
data_type,
))]
NotSupportedField {
data_type: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to deserialize field"))]
DeserializeField {
#[snafu(source)]
error: memcomparable::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Operation not supported: {}", err_msg))]
UnsupportedOperation {
err_msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Encode null value"))]
IndexEncodeNull {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
FieldTypeMismatch { source, .. } => source.status_code(),
SerializeField { .. } | DeserializeField { .. } | IndexEncodeNull { .. } => {
StatusCode::InvalidArguments
}
NotSupportedField { .. } | UnsupportedOperation { .. } => StatusCode::Unsupported,
}
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -1,24 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Codec utilities for the Mito protocol.
pub mod error;
pub mod index;
pub mod key_values;
pub mod primary_key_filter;
pub mod row_converter;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;

View File

@@ -1,164 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod dense;
pub mod sparse;
use std::fmt::Debug;
use std::sync::Arc;
use common_recordbatch::filter::SimpleFilterEvaluator;
use datatypes::value::{Value, ValueRef};
pub use dense::{DensePrimaryKeyCodec, SortField};
pub use sparse::{SparsePrimaryKeyCodec, SparseValues, COLUMN_ID_ENCODE_SIZE};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::Result;
use crate::key_values::KeyValue;
/// Row value encoder/decoder.
pub trait PrimaryKeyCodecExt {
/// Encodes rows to bytes.
/// # Note
/// Ensure the length of row iterator matches the length of fields.
fn encode<'a, I>(&self, row: I) -> Result<Vec<u8>>
where
I: Iterator<Item = ValueRef<'a>>,
{
let mut buffer = Vec::new();
self.encode_to_vec(row, &mut buffer)?;
Ok(buffer)
}
/// Encodes rows to specific vec.
/// # Note
/// Ensure the length of row iterator matches the length of fields.
fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
where
I: Iterator<Item = ValueRef<'a>>;
}
pub trait PrimaryKeyFilter: Send + Sync {
/// Returns true if the primary key matches the filter.
fn matches(&mut self, pk: &[u8]) -> bool;
}
/// Composite values decoded from primary key bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CompositeValues {
Dense(Vec<(ColumnId, Value)>),
Sparse(SparseValues),
}
impl CompositeValues {
/// Extends the composite values with the given values.
pub fn extend(&mut self, values: &[(ColumnId, Value)]) {
match self {
CompositeValues::Dense(dense_values) => {
for (column_id, value) in values {
dense_values.push((*column_id, value.clone()));
}
}
CompositeValues::Sparse(sprase_value) => {
for (column_id, value) in values {
sprase_value.insert(*column_id, value.clone());
}
}
}
}
}
#[cfg(any(test, feature = "testing"))]
impl CompositeValues {
pub fn into_sparse(self) -> SparseValues {
match self {
CompositeValues::Sparse(v) => v,
_ => panic!("CompositeValues is not sparse"),
}
}
pub fn into_dense(self) -> Vec<Value> {
match self {
CompositeValues::Dense(v) => v.into_iter().map(|(_, v)| v).collect(),
_ => panic!("CompositeValues is not dense"),
}
}
}
pub trait PrimaryKeyCodec: Send + Sync + Debug {
/// Encodes a key value to bytes.
fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()>;
/// Encodes values to bytes.
fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()>;
/// Encodes values to bytes.
fn encode_value_refs(
&self,
values: &[(ColumnId, ValueRef)],
buffer: &mut Vec<u8>,
) -> Result<()>;
/// Returns the number of fields in the primary key.
fn num_fields(&self) -> Option<usize>;
/// Returns a primary key filter factory.
fn primary_key_filter(
&self,
metadata: &RegionMetadataRef,
filters: Arc<Vec<SimpleFilterEvaluator>>,
) -> Box<dyn PrimaryKeyFilter>;
/// Returns the estimated size of the primary key.
fn estimated_size(&self) -> Option<usize> {
None
}
/// Returns the encoding type of the primary key.
fn encoding(&self) -> PrimaryKeyEncoding;
/// Decodes the primary key from the given bytes.
///
/// Returns a [`CompositeValues`] that follows the primary key ordering.
fn decode(&self, bytes: &[u8]) -> Result<CompositeValues>;
/// Decode the leftmost value from bytes.
fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>>;
}
/// Builds a primary key codec from region metadata.
pub fn build_primary_key_codec(region_metadata: &RegionMetadata) -> Arc<dyn PrimaryKeyCodec> {
let fields = region_metadata.primary_key_columns().map(|col| {
(
col.column_id,
SortField::new(col.column_schema.data_type.clone()),
)
});
build_primary_key_codec_with_fields(region_metadata.primary_key_encoding, fields)
}
/// Builds a primary key codec from region metadata.
pub fn build_primary_key_codec_with_fields(
encoding: PrimaryKeyEncoding,
fields: impl Iterator<Item = (ColumnId, SortField)>,
) -> Arc<dyn PrimaryKeyCodec> {
match encoding {
PrimaryKeyEncoding::Dense => Arc::new(DensePrimaryKeyCodec::with_fields(fields.collect())),
PrimaryKeyEncoding::Sparse => {
Arc::new(SparsePrimaryKeyCodec::with_fields(fields.collect()))
}
}
}

View File

@@ -48,7 +48,6 @@ itertools.workspace = true
lazy_static = "1.4"
log-store = { workspace = true }
memcomparable = "0.2"
mito-codec.workspace = true
moka = { workspace = true, features = ["sync", "future"] }
object-store.workspace = true
parquet = { workspace = true, features = ["async"] }
@@ -83,7 +82,6 @@ common-test-util.workspace = true
criterion = "0.4"
dotenv.workspace = true
log-store.workspace = true
mito-codec = { workspace = true, features = ["testing"] }
object-store = { workspace = true, features = ["services-memory"] }
rskafka.workspace = true
rstest.workspace = true

View File

@@ -25,8 +25,8 @@ use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable
use mito2::memtable::time_series::TimeSeriesMemtable;
use mito2::memtable::{KeyValues, Memtable};
use mito2::region::options::MergeMode;
use mito2::row_converter::DensePrimaryKeyCodec;
use mito2::test_util::memtable_util::{self, region_metadata_to_row_schema};
use mito_codec::row_converter::DensePrimaryKeyCodec;
use rand::rngs::ThreadRng;
use rand::seq::IndexedRandom;
use rand::Rng;

View File

@@ -18,9 +18,8 @@
use common_base::readable_size::ReadableSize;
use common_base::BitVec;
use common_time::Timestamp;
use smallvec::{smallvec, SmallVec};
use crate::sst::file::{FileHandle, FileId};
use crate::sst::file::FileHandle;
/// Default max compaction output file size when not specified.
const DEFAULT_MAX_OUTPUT_SIZE: u64 = ReadableSize::gb(2).as_bytes();
@@ -126,68 +125,17 @@ pub trait Item: Ranged + Clone {
fn size(&self) -> usize;
}
/// A group of files that are created by the same compaction task.
#[derive(Debug, Clone)]
pub struct FileGroup {
files: SmallVec<[FileHandle; 2]>,
size: usize,
num_rows: usize,
min_timestamp: Timestamp,
max_timestamp: Timestamp,
}
impl FileGroup {
pub(crate) fn new_with_file(file: FileHandle) -> Self {
let size = file.size() as usize;
let (min_timestamp, max_timestamp) = file.time_range();
let num_rows = file.num_rows();
Self {
files: smallvec![file],
size,
num_rows,
min_timestamp,
max_timestamp,
}
}
pub(crate) fn num_rows(&self) -> usize {
self.num_rows
}
pub(crate) fn add_file(&mut self, file: FileHandle) {
self.size += file.size() as usize;
self.num_rows += file.num_rows();
let (min_timestamp, max_timestamp) = file.time_range();
self.min_timestamp = self.min_timestamp.min(min_timestamp);
self.max_timestamp = self.max_timestamp.max(max_timestamp);
self.files.push(file);
}
#[cfg(test)]
pub(crate) fn files(&self) -> &[FileHandle] {
&self.files[..]
}
pub(crate) fn file_ids(&self) -> SmallVec<[FileId; 2]> {
SmallVec::from_iter(self.files.iter().map(|f| f.file_id()))
}
pub(crate) fn into_files(self) -> impl Iterator<Item = FileHandle> {
self.files.into_iter()
}
}
impl Ranged for FileGroup {
impl Ranged for FileHandle {
type BoundType = Timestamp;
fn range(&self) -> (Self::BoundType, Self::BoundType) {
(self.min_timestamp, self.max_timestamp)
self.time_range()
}
}
impl Item for FileGroup {
impl Item for FileHandle {
fn size(&self) -> usize {
self.size
self.size() as usize
}
}

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::num::NonZeroU64;
use common_time::Timestamp;
use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
@@ -25,23 +23,6 @@ pub fn new_file_handle(
start_ts_millis: i64,
end_ts_millis: i64,
level: Level,
) -> FileHandle {
new_file_handle_with_sequence(
file_id,
start_ts_millis,
end_ts_millis,
level,
start_ts_millis as u64,
)
}
/// Test util to create file handles.
pub fn new_file_handle_with_sequence(
file_id: FileId,
start_ts_millis: i64,
end_ts_millis: i64,
level: Level,
sequence: u64,
) -> FileHandle {
let file_purger = new_noop_file_purger();
FileHandle::new(
@@ -58,7 +39,7 @@ pub fn new_file_handle_with_sequence(
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: NonZeroU64::new(sequence),
sequence: None,
},
file_purger,
)

View File

@@ -15,7 +15,6 @@
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::num::NonZeroU64;
use common_base::readable_size::ReadableSize;
use common_telemetry::info;
@@ -27,9 +26,7 @@ use store_api::storage::RegionId;
use crate::compaction::buckets::infer_time_bucket;
use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::{Picker, PickerOutput};
use crate::compaction::run::{
find_sorted_runs, merge_seq_files, reduce_runs, FileGroup, Item, Ranged,
};
use crate::compaction::run::{find_sorted_runs, merge_seq_files, reduce_runs};
use crate::compaction::{get_expired_ssts, CompactionOutput};
use crate::sst::file::{overlaps, FileHandle, Level};
use crate::sst::version::LevelMeta;
@@ -63,8 +60,7 @@ impl TwcsPicker {
if files.files.is_empty() {
continue;
}
let mut files_to_merge: Vec<_> = files.files().cloned().collect();
let sorted_runs = find_sorted_runs(&mut files_to_merge);
let sorted_runs = find_sorted_runs(&mut files.files);
let found_runs = sorted_runs.len();
// We only remove deletion markers if we found less than 2 runs and not in append mode.
// because after compaction there will be no overlapping files.
@@ -94,7 +90,7 @@ impl TwcsPicker {
);
output.push(CompactionOutput {
output_level: LEVEL_COMPACTED, // always compact to l1
inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
inputs,
filter_deleted,
output_time_range: None, // we do not enforce output time range in twcs compactions.
});
@@ -113,21 +109,21 @@ fn log_pick_result(
file_num: usize,
max_output_file_size: Option<u64>,
filter_deleted: bool,
inputs: &[FileGroup],
inputs: &[FileHandle],
) {
let input_file_str: Vec<String> = inputs
.iter()
.map(|f| {
let range = f.range();
let range = f.time_range();
let start = range.0.to_iso8601_string();
let end = range.1.to_iso8601_string();
let num_rows = f.num_rows();
format!(
"FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
f.file_ids(),
"SST{{id: {}, range: ({}, {}), size: {}, num rows: {} }}",
f.file_id(),
start,
end,
ReadableSize(f.size() as u64),
ReadableSize(f.size()),
num_rows
)
})
@@ -202,9 +198,7 @@ impl Picker for TwcsPicker {
struct Window {
start: Timestamp,
end: Timestamp,
// Mapping from file sequence to file groups. Files with the same sequence is considered
// created from the same compaction task.
files: HashMap<Option<NonZeroU64>, FileGroup>,
files: Vec<FileHandle>,
time_window: i64,
overlapping: bool,
}
@@ -213,11 +207,10 @@ impl Window {
/// Creates a new [Window] with given file.
fn new_with_file(file: FileHandle) -> Self {
let (start, end) = file.time_range();
let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
Self {
start,
end,
files,
files: vec![file],
time_window: 0,
overlapping: false,
}
@@ -233,19 +226,7 @@ impl Window {
let (start, end) = file.time_range();
self.start = self.start.min(start);
self.end = self.end.max(end);
match self.files.entry(file.meta_ref().sequence) {
Entry::Occupied(mut o) => {
o.get_mut().add_file(file);
}
Entry::Vacant(v) => {
v.insert(FileGroup::new_with_file(file));
}
}
}
fn files(&self) -> impl Iterator<Item = &FileGroup> {
self.files.values()
self.files.push(file);
}
}
@@ -330,7 +311,7 @@ mod tests {
use std::collections::HashSet;
use super::*;
use crate::compaction::test_util::{new_file_handle, new_file_handle_with_sequence};
use crate::compaction::test_util::new_file_handle;
use crate::sst::file::{FileId, Level};
#[test]
@@ -390,9 +371,7 @@ mod tests {
.iter(),
3,
);
let fgs = &windows.get(&0).unwrap().files;
assert_eq!(1, fgs.len());
assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);
assert_eq!(5, windows.get(&0).unwrap().files.len());
let files = [FileId::random(); 3];
let windows = assign_to_windows(
@@ -406,56 +385,15 @@ mod tests {
);
assert_eq!(
files[0],
windows.get(&0).unwrap().files().next().unwrap().files()[0].file_id()
windows.get(&0).unwrap().files.first().unwrap().file_id()
);
assert_eq!(
files[1],
windows.get(&3).unwrap().files().next().unwrap().files()[0].file_id()
windows.get(&3).unwrap().files.first().unwrap().file_id()
);
assert_eq!(
files[2],
windows.get(&12).unwrap().files().next().unwrap().files()[0].file_id()
);
}
#[test]
fn test_assign_file_groups_to_windows() {
let files = [
FileId::random(),
FileId::random(),
FileId::random(),
FileId::random(),
];
let windows = assign_to_windows(
[
new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
]
.iter(),
3,
);
assert_eq!(windows.len(), 1);
let fgs = &windows.get(&0).unwrap().files;
assert_eq!(2, fgs.len());
assert_eq!(
fgs.get(&NonZeroU64::new(1))
.unwrap()
.files()
.iter()
.map(|f| f.file_id())
.collect::<HashSet<_>>(),
[files[0], files[1]].into_iter().collect()
);
assert_eq!(
fgs.get(&NonZeroU64::new(2))
.unwrap()
.files()
.iter()
.map(|f| f.file_id())
.collect::<HashSet<_>>(),
[files[2], files[3]].into_iter().collect()
windows.get(&12).unwrap().files.first().unwrap().file_id()
);
}
@@ -470,22 +408,8 @@ mod tests {
];
files[0].set_compacting(true);
files[2].set_compacting(true);
let mut windows = assign_to_windows(files.iter(), 3);
let window0 = windows.remove(&0).unwrap();
assert_eq!(1, window0.files.len());
let candidates = window0
.files
.into_values()
.flat_map(|fg| fg.into_files())
.map(|f| f.file_id())
.collect::<HashSet<_>>();
assert_eq!(candidates.len(), 3);
assert_eq!(
candidates,
[files[1].file_id(), files[3].file_id(), files[4].file_id()]
.into_iter()
.collect::<HashSet<_>>()
);
let windows = assign_to_windows(files.iter(), 3);
assert_eq!(3, windows.get(&0).unwrap().files.len());
}
/// (Window value, overlapping, files' time ranges in window)
@@ -514,11 +438,9 @@ mod tests {
let mut file_ranges = actual_window
.files
.iter()
.flat_map(|(_, f)| {
f.files().iter().map(|f| {
let (s, e) = f.time_range();
(s.value(), e.value())
})
.map(|f| {
let (s, e) = f.time_range();
(s.value(), e.value())
})
.collect::<Vec<_>>();
file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
@@ -685,10 +607,10 @@ mod tests {
CompactionPickerTestCase {
window_size: 3,
input_files: [
new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3), //active windows
new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4), //active windows
new_file_handle(file_ids[0], -2000, -3, 0),
new_file_handle(file_ids[1], -3000, -100, 0),
new_file_handle(file_ids[2], 0, 2999, 0), //active windows
new_file_handle(file_ids[3], 50, 2998, 0), //active windows
]
.to_vec(),
expected_outputs: vec![
@@ -714,11 +636,11 @@ mod tests {
CompactionPickerTestCase {
window_size: 3,
input_files: [
new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
new_file_handle(file_ids[0], -2000, -3, 0),
new_file_handle(file_ids[1], -3000, -100, 0),
new_file_handle(file_ids[2], 0, 2999, 0),
new_file_handle(file_ids[3], 50, 2998, 0),
new_file_handle(file_ids[4], 11, 2990, 0),
]
.to_vec(),
expected_outputs: vec![
@@ -733,27 +655,6 @@ mod tests {
],
}
.check();
// Case 3:
// A compaction may split output into several files that have overlapping time ranges and same sequence,
// we should treat these files as one FileGroup.
let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
CompactionPickerTestCase {
window_size: 3,
input_files: [
new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
]
.to_vec(),
expected_outputs: vec![ExpectedOutput {
input_files: vec![0, 1, 4],
output_level: 1,
}],
}
.check();
}
// TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.

View File

@@ -42,13 +42,6 @@ use crate::worker::WorkerId;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Unexpected data type"))]
DataTypeMismatch {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("External error, context: {}", context))]
External {
source: BoxedError,
@@ -298,6 +291,35 @@ pub enum Error {
#[snafu(display("Failed to write region"))]
WriteGroup { source: Arc<Error> },
#[snafu(display("Row value mismatches field data type"))]
FieldTypeMismatch { source: datatypes::error::Error },
#[snafu(display("Failed to serialize field"))]
SerializeField {
#[snafu(source)]
error: memcomparable::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Data type: {} does not support serialization/deserialization",
data_type,
))]
NotSupportedField {
data_type: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to deserialize field"))]
DeserializeField {
#[snafu(source)]
error: memcomparable::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid parquet SST file {}, reason: {}", file, reason))]
InvalidParquet {
file: String,
@@ -1006,20 +1028,6 @@ pub enum Error {
location: Location,
source: common_grpc::Error,
},
#[snafu(display("Failed to encode"))]
Encode {
#[snafu(implicit)]
location: Location,
source: mito_codec::error::Error,
},
#[snafu(display("Failed to decode"))]
Decode {
#[snafu(implicit)]
location: Location,
source: mito_codec::error::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1044,7 +1052,6 @@ impl ErrorExt for Error {
use Error::*;
match self {
DataTypeMismatch { source, .. } => source.status_code(),
OpenDal { .. } | ReadParquet { .. } => StatusCode::StorageUnavailable,
WriteWal { source, .. } | ReadWal { source, .. } | DeleteWal { source, .. } => {
source.status_code()
@@ -1088,6 +1095,7 @@ impl ErrorExt for Error {
| BiErrors { .. }
| StopScheduler { .. }
| ComputeVector { .. }
| SerializeField { .. }
| EncodeMemtable { .. }
| CreateDir { .. }
| ReadDataPart { .. }
@@ -1099,7 +1107,9 @@ impl ErrorExt for Error {
WriteParquet { .. } => StatusCode::StorageUnavailable,
WriteGroup { source, .. } => source.status_code(),
EncodeSparsePrimaryKey { .. } => StatusCode::Unexpected,
FieldTypeMismatch { source, .. } => source.status_code(),
NotSupportedField { .. } => StatusCode::Unsupported,
DeserializeField { .. } | EncodeSparsePrimaryKey { .. } => StatusCode::Unexpected,
InvalidBatch { .. } => StatusCode::InvalidArguments,
InvalidRecordBatch { .. } => StatusCode::InvalidArguments,
ConvertVector { source, .. } => source.status_code(),
@@ -1171,9 +1181,7 @@ impl ErrorExt for Error {
ScanSeries { source, .. } => source.status_code(),
ScanMultiTimes { .. } => StatusCode::InvalidArguments,
ConvertBulkWalEntry { source, .. } => source.status_code(),
Encode { source, .. } | Decode { source, .. } => source.status_code(),
Error::ConvertBulkWalEntry { source, .. } => source.status_code(),
}
}

View File

@@ -41,6 +41,7 @@ pub mod read;
pub mod region;
mod region_write_ctx;
pub mod request;
pub mod row_converter;
pub mod schedule;
pub mod sst;
mod time_provider;

View File

@@ -21,8 +21,6 @@ use std::sync::Arc;
pub use bulk::part::EncodedBulkPart;
use common_time::Timestamp;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
@@ -31,6 +29,8 @@ use table::predicate::Predicate;
use crate::config::MitoConfig;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
pub use crate::memtable::key_values::KeyValues;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
@@ -42,6 +42,7 @@ use crate::sst::file::FileTimeRange;
mod builder;
pub mod bulk;
pub mod key_values;
pub mod partition_tree;
mod simple_bulk_memtable;
mod stats;

View File

@@ -16,13 +16,13 @@
use std::sync::{Arc, RwLock};
use mito_codec::key_values::KeyValue;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
use crate::error::Result;
use crate::memtable::bulk::part::{BulkPart, EncodedBulkPart};
use crate::memtable::key_values::KeyValue;
use crate::memtable::{
BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef,
MemtableStats, PredicateGroup,

View File

@@ -17,12 +17,12 @@
use std::collections::VecDeque;
use std::sync::Arc;
use mito_codec::row_converter::{build_primary_key_codec, DensePrimaryKeyCodec};
use parquet::file::metadata::ParquetMetaData;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use crate::row_converter::{build_primary_key_codec, DensePrimaryKeyCodec};
use crate::sst::parquet::file_range::RangeBase;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::SimpleFilterContext;

View File

@@ -38,8 +38,6 @@ use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::Value;
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValuesRef};
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
use parquet::arrow::ArrowWriter;
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
@@ -49,12 +47,13 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;
use table::predicate::Predicate;
use crate::error::{
self, ComputeArrowSnafu, EncodeMemtableSnafu, EncodeSnafu, NewRecordBatchSnafu, Result,
};
use crate::error;
use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, Result};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::BulkPartIter;
use crate::memtable::key_values::{KeyValue, KeyValuesRef};
use crate::memtable::BoxedBatchIterator;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;
@@ -355,9 +354,7 @@ fn mutations_to_record_batch(
for row in key_values.iter() {
pk_buffer.clear();
pk_encoder
.encode_to_vec(row.primary_keys(), &mut pk_buffer)
.context(EncodeSnafu)?;
pk_encoder.encode_to_vec(row.primary_keys(), &mut pk_buffer)?;
pk_builder.append_value(pk_buffer.as_bytes());
ts_vector.push_value_ref(row.timestamp());
sequence_builder.append_value(row.sequence());

View File

@@ -31,7 +31,7 @@ pub struct KeyValues {
///
/// This mutation must be a valid mutation and rows in the mutation
/// must not be `None`.
pub mutation: Mutation,
pub(crate) mutation: Mutation,
/// Key value read helper.
helper: SparseReadRowHelper,
/// Primary key encoding hint.
@@ -333,7 +333,8 @@ mod tests {
use api::v1::{self, ColumnDataType, SemanticType};
use super::*;
use crate::test_util::{i64_value, TestRegionMetadataBuilder};
use crate::test_util::i64_value;
use crate::test_util::meta_util::TestRegionMetadataBuilder;
const TS_NAME: &str = "ts";
const START_SEQ: SequenceNumber = 100;

View File

@@ -19,6 +19,7 @@ mod dedup;
mod dict;
mod merger;
mod partition;
mod primary_key_filter;
mod shard;
mod shard_builder;
mod tree;
@@ -28,8 +29,7 @@ use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
pub(crate) use primary_key_filter::{DensePrimaryKeyFilter, SparsePrimaryKeyFilter};
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
@@ -38,6 +38,7 @@ use table::predicate::Predicate;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::tree::PartitionTree;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
@@ -46,6 +47,7 @@ use crate::memtable::{
PredicateGroup,
};
use crate::region::options::MergeMode;
use crate::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
@@ -366,11 +368,11 @@ mod tests {
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use datatypes::vectors::Int64Vector;
use mito_codec::row_converter::DensePrimaryKeyCodec;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::*;
use crate::row_converter::DensePrimaryKeyCodec;
use crate::test_util::memtable_util::{
self, collect_iter_timestamps, region_metadata_to_row_schema,
};

View File

@@ -33,7 +33,6 @@ use datatypes::vectors::{
TimestampSecondVector, UInt16Vector, UInt16VectorBuilder, UInt64Vector, UInt64VectorBuilder,
UInt8Vector, UInt8VectorBuilder,
};
use mito_codec::key_values::KeyValue;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
@@ -45,6 +44,7 @@ use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
use crate::error;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::merger::{DataBatchKey, DataNode, DataSource, Merger};
use crate::memtable::partition_tree::PkIndex;
use crate::metrics::{

View File

@@ -22,16 +22,13 @@ use std::time::{Duration, Instant};
use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use mito_codec::key_values::KeyValue;
use mito_codec::primary_key_filter::is_partition_column;
use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::storage::ColumnId;
use crate::error::{EncodeSnafu, Result};
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::data::{DataBatch, DataParts, DATA_INIT_CAP};
use crate::memtable::partition_tree::dedup::DedupReader;
use crate::memtable::partition_tree::shard::{
@@ -42,6 +39,7 @@ use crate::memtable::partition_tree::{PartitionTreeConfig, PkId};
use crate::memtable::stats::WriteMetrics;
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::read::{Batch, BatchBuilder};
use crate::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
/// Key of a partition.
pub type PartitionKey = u32;
@@ -93,9 +91,7 @@ impl Partition {
// `primary_key` is sparse, re-encode the full primary key.
let sparse_key = primary_key.clone();
primary_key.clear();
row_codec
.encode_key_value(&key_value, primary_key)
.context(EncodeSnafu)?;
row_codec.encode_key_value(&key_value, primary_key)?;
let pk_id = inner.shard_builder.write_with_key(
primary_key,
Some(&sparse_key),
@@ -308,6 +304,11 @@ impl Partition {
.map(|meta| meta.column_schema.name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
.unwrap_or(false)
}
/// Returns true if this is a partition column.
pub(crate) fn is_partition_column(name: &str) -> bool {
name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
}
}
pub(crate) struct PartitionStats {
@@ -445,7 +446,7 @@ impl ReadPartitionContext {
fn need_prune_key(metadata: &RegionMetadataRef, filters: &[SimpleFilterEvaluator]) -> bool {
for filter in filters {
// We already pruned partitions before so we skip the partition column.
if is_partition_column(filter.column_name()) {
if Partition::is_partition_column(filter.column_name()) {
continue;
}
let Some(column) = metadata.column_by_name(filter.column_name()) else {

View File

@@ -19,17 +19,12 @@ use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use datatypes::value::Value;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::storage::ColumnId;
use crate::error::Result;
use crate::memtable::partition_tree::partition::Partition;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyFilter, SparsePrimaryKeyCodec};
/// Returns true if this is a partition column for metrics in the memtable.
pub fn is_partition_column(name: &str) -> bool {
name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
}
#[derive(Clone)]
struct PrimaryKeyFilterInner {
metadata: RegionMetadataRef,
@@ -47,7 +42,7 @@ impl PrimaryKeyFilterInner {
let mut result = true;
for filter in self.filters.iter() {
if is_partition_column(filter.column_name()) {
if Partition::is_partition_column(filter.column_name()) {
continue;
}
let Some(column) = self.metadata.column_by_name(filter.column_name()) else {
@@ -154,8 +149,9 @@ mod tests {
use std::sync::Arc;
use api::v1::SemanticType;
use datafusion::logical_expr::BinaryExpr;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datafusion_expr::{Expr, Operator};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::ValueRef;

View File

@@ -17,11 +17,10 @@
use std::cmp::Ordering;
use std::time::{Duration, Instant};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::PrimaryKeyFilter;
use store_api::metadata::RegionMetadataRef;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::data::{
DataBatch, DataParts, DataPartsReader, DataPartsReaderBuilder, DATA_INIT_CAP,
};
@@ -30,6 +29,7 @@ use crate::memtable::partition_tree::merger::{Merger, Node};
use crate::memtable::partition_tree::shard_builder::ShardBuilderReader;
use crate::memtable::partition_tree::{PkId, PkIndex, ShardId};
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::row_converter::PrimaryKeyFilter;
/// Shard stores data related to the same key dictionary.
pub struct Shard {

View File

@@ -18,11 +18,10 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::PrimaryKeyFilter;
use store_api::metadata::RegionMetadataRef;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::data::{
DataBatch, DataBuffer, DataBufferReader, DataBufferReaderBuilder, DataParts, DATA_INIT_CAP,
};
@@ -31,6 +30,7 @@ use crate::memtable::partition_tree::shard::Shard;
use crate::memtable::partition_tree::{PartitionTreeConfig, PkId, PkIndex, ShardId};
use crate::memtable::stats::WriteMetrics;
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::row_converter::PrimaryKeyFilter;
/// Builder to write keys and data to a shard that the key dictionary
/// is still active.

View File

@@ -23,10 +23,8 @@ use common_recordbatch::filter::SimpleFilterEvaluator;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::prelude::ValueRef;
use mito_codec::key_values::KeyValue;
use mito_codec::primary_key_filter::is_partition_column;
use mito_codec::row_converter::sparse::{FieldWithId, SparseEncoder};
use mito_codec::row_converter::{PrimaryKeyCodec, SortField};
use memcomparable::Serializer;
use serde::Serialize;
use snafu::{ensure, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
@@ -34,9 +32,10 @@ use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
use crate::error::{
EncodeSnafu, EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result,
EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::partition::{
Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext,
};
@@ -47,6 +46,7 @@ use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_ST
use crate::read::dedup::LastNonNullIter;
use crate::read::Batch;
use crate::region::options::MergeMode;
use crate::row_converter::{PrimaryKeyCodec, SortField};
/// The partition tree.
pub struct PartitionTree {
@@ -73,15 +73,15 @@ impl PartitionTree {
config: &PartitionTreeConfig,
write_buffer_manager: Option<WriteBufferManagerRef>,
) -> Self {
let sparse_encoder = SparseEncoder::new(
metadata
let sparse_encoder = SparseEncoder {
fields: metadata
.primary_key_columns()
.map(|c| FieldWithId {
field: SortField::new(c.column_schema.data_type.clone()),
column_id: c.column_id,
})
.collect(),
);
};
let is_partitioned = Partition::has_multi_partitions(&metadata);
let mut config = config.clone();
if config.merge_mode == MergeMode::LastNonNull {
@@ -129,8 +129,7 @@ impl PartitionTree {
} else {
// For compatibility, use the sparse encoder for dense primary key.
self.sparse_encoder
.encode_to_vec(kv.primary_keys(), buffer)
.context(EncodeSnafu)?;
.encode_to_vec(kv.primary_keys(), buffer)?;
}
Ok(())
}
@@ -167,9 +166,7 @@ impl PartitionTree {
if self.is_partitioned {
self.encode_sparse_primary_key(&kv, pk_buffer)?;
} else {
self.row_codec
.encode_key_value(&kv, pk_buffer)
.context(EncodeSnafu)?;
self.row_codec.encode_key_value(&kv, pk_buffer)?;
}
// Write rows with
@@ -211,9 +208,7 @@ impl PartitionTree {
if self.is_partitioned {
self.encode_sparse_primary_key(&kv, pk_buffer)?;
} else {
self.row_codec
.encode_key_value(&kv, pk_buffer)
.context(EncodeSnafu)?;
self.row_codec.encode_key_value(&kv, pk_buffer)?;
}
// Write rows with
@@ -420,7 +415,7 @@ impl PartitionTree {
for (key, partition) in partitions.iter() {
let mut is_needed = true;
for filter in filters {
if !is_partition_column(filter.column_name()) {
if !Partition::is_partition_column(filter.column_name()) {
continue;
}
@@ -441,6 +436,34 @@ impl PartitionTree {
}
}
struct FieldWithId {
field: SortField,
column_id: ColumnId,
}
struct SparseEncoder {
fields: Vec<FieldWithId>,
}
impl SparseEncoder {
fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
where
I: Iterator<Item = ValueRef<'a>>,
{
let mut serializer = Serializer::new(buffer);
for (value, field) in row.zip(self.fields.iter()) {
if !value.is_null() {
field
.column_id
.serialize(&mut serializer)
.context(SerializeFieldSnafu)?;
field.field.serialize(&mut serializer, &value)?;
}
}
Ok(())
}
}
#[derive(Default)]
struct TreeIterMetrics {
iter_elapsed: Duration,

View File

@@ -19,7 +19,6 @@ use std::sync::{Arc, RwLock};
use api::v1::OpType;
use datatypes::vectors::Helper;
use mito_codec::key_values::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
@@ -27,6 +26,7 @@ use table::predicate::Predicate;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::time_series::{Series, Values};
use crate::memtable::{

View File

@@ -29,7 +29,6 @@ use datatypes::arrow::array::{
};
use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer};
use datatypes::arrow::datatypes::{DataType, Int64Type};
use mito_codec::key_values::KeyValue;
use smallvec::{smallvec, SmallVec};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
@@ -37,6 +36,7 @@ use store_api::metadata::RegionMetadataRef;
use crate::error;
use crate::error::{InvalidRequestSnafu, Result};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::version::SmallMemtableVec;
use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};

View File

@@ -35,19 +35,17 @@ use datatypes::vectors::{
Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
TimestampSecondVector, UInt64Vector, UInt8Vector,
};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
use crate::error::{
self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::error;
use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::builder::{FieldBuilder, StringBuilder};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
@@ -59,6 +57,7 @@ use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::dedup::LastNonNullIter;
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::region::options::MergeMode;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
/// Initial vector builder capacity.
const INITIAL_BUILDER_CAPACITY: usize = 4;
@@ -177,10 +176,7 @@ impl TimeSeriesMemtable {
}
);
let primary_key_encoded = self
.row_codec
.encode(kv.primary_keys())
.context(EncodeSnafu)?;
let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
let (key_allocated, value_allocated) =
self.series_set.push_to_series(primary_key_encoded, &kv);
@@ -1111,11 +1107,11 @@ mod tests {
use datatypes::schema::ColumnSchema;
use datatypes::value::{OrderedFloat, Value};
use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
use mito_codec::row_converter::SortField;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::*;
use crate::row_converter::SortField;
use crate::test_util::column_metadata_to_column_schema;
fn schema_for_test() -> RegionMetadataRef {

View File

@@ -50,17 +50,16 @@ use datatypes::vectors::{
};
use futures::stream::BoxStream;
use futures::TryStreamExt;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, SequenceNumber};
use crate::error::{
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
Result,
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
};
use crate::memtable::BoxedBatchIterator;
use crate::read::prune::PruneReader;
use crate::row_converter::{CompositeValues, PrimaryKeyCodec};
/// Storage internal representation of a batch of rows for a primary key (time series).
///
@@ -613,7 +612,7 @@ impl Batch {
column_id: ColumnId,
) -> Result<Option<&Value>> {
if self.pk_values.is_none() {
self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
self.pk_values = Some(codec.decode(&self.primary_key)?);
}
let pk_values = self.pk_values.as_ref().unwrap();
@@ -1027,12 +1026,12 @@ pub(crate) struct ScannerMetrics {
#[cfg(test)]
mod tests {
use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
use store_api::codec::PrimaryKeyEncoding;
use store_api::storage::consts::ReservedColumnId;
use super::*;
use crate::error::Error;
use crate::row_converter::{self, build_primary_key_codec_with_fields};
use crate::test_util::new_batch_builder;
fn new_batch(

View File

@@ -20,17 +20,17 @@ use std::sync::Arc;
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use mito_codec::row_converter::{
build_primary_key_codec, build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec,
SortField,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu, Result};
use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, Result};
use crate::read::projection::ProjectionMapper;
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::row_converter::{
build_primary_key_codec, build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec,
SortField,
};
/// Reader to adapt schema of underlying reader to expected schema.
pub struct CompatReader<R> {
@@ -155,9 +155,7 @@ impl CompatPrimaryKey {
batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
);
buffer.extend_from_slice(batch.primary_key());
self.converter
.encode_values(&self.values, &mut buffer)
.context(EncodeSnafu)?;
self.converter.encode_values(&self.values, &mut buffer)?;
batch.set_primary_key(buffer);
@@ -407,10 +405,7 @@ impl RewritePrimaryKey {
let values = if let Some(pk_values) = batch.pk_values() {
pk_values
} else {
let new_pk_values = self
.original
.decode(batch.primary_key())
.context(DecodeSnafu)?;
let new_pk_values = self.original.decode(batch.primary_key())?;
batch.set_pk_values(new_pk_values);
// Safety: We ensure pk_values is not None.
batch.pk_values().as_ref().unwrap()
@@ -421,9 +416,7 @@ impl RewritePrimaryKey {
);
match values {
CompositeValues::Dense(values) => {
self.new
.encode_values(values.as_slice(), &mut buffer)
.context(EncodeSnafu)?;
self.new.encode_values(values.as_slice(), &mut buffer)?;
}
CompositeValues::Sparse(values) => {
let values = self
@@ -434,9 +427,7 @@ impl RewritePrimaryKey {
(*id, value.as_value_ref())
})
.collect::<Vec<_>>();
self.new
.encode_value_refs(&values, &mut buffer)
.context(EncodeSnafu)?;
self.new.encode_value_refs(&values, &mut buffer)?;
}
}
batch.set_primary_key(buffer);
@@ -454,14 +445,12 @@ mod tests {
use datatypes::schema::ColumnSchema;
use datatypes::value::ValueRef;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
use mito_codec::row_converter::{
DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::*;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec};
use crate::test_util::{check_reader_result, VecBatchReader};
/// Creates a new [RegionMetadata].

View File

@@ -26,7 +26,6 @@ use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use mito_codec::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
@@ -34,6 +33,7 @@ use store_api::storage::ColumnId;
use crate::cache::CacheStrategy;
use crate::error::{InvalidRequestSnafu, Result};
use crate::read::Batch;
use crate::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec};
/// Only cache vector when its length `<=` this value.
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
@@ -320,12 +320,12 @@ mod tests {
use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::arrow::util::pretty;
use datatypes::value::ValueRef;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use mito_codec::test_util::TestRegionMetadataBuilder;
use super::*;
use crate::cache::CacheManager;
use crate::read::BatchBuilder;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use crate::test_util::meta_util::TestRegionMetadataBuilder;
fn new_batch(
ts_start: i64,

View File

@@ -962,22 +962,19 @@ impl StreamContext {
}
}
}
if verbose {
write!(f, "{{")?;
}
write!(
f,
"\"partition_count\":{{\"count\":{}, \"mem_ranges\":{}, \"files\":{}, \"file_ranges\":{}}}",
"partition_count={} ({} memtable ranges, {} file {} ranges)",
self.ranges.len(),
num_mem_ranges,
self.input.num_files(),
num_file_ranges,
)?;
if let Some(selector) = &self.input.series_row_selector {
write!(f, ", \"selector\":\"{}\"", selector)?;
write!(f, ", selector={}", selector)?;
}
if let Some(distribution) = &self.input.distribution {
write!(f, ", \"distribution\":\"{}\"", distribution)?;
write!(f, ", distribution={}", distribution)?;
}
if verbose {
@@ -994,15 +991,14 @@ impl StreamContext {
impl fmt::Debug for FileWrapper<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let (start, end) = self.file.time_range();
write!(
f,
r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
"[file={}, time_range=({}::{}, {}::{}), rows={}, size={}, index_size={}]",
self.file.file_id(),
start.value(),
start.unit(),
end.value(),
end.unit(),
self.file.time_range().0.value(),
self.file.time_range().0.unit(),
self.file.time_range().1.value(),
self.file.time_range().1.unit(),
self.file.num_rows(),
self.file.size(),
self.file.index_size()
@@ -1018,22 +1014,25 @@ impl StreamContext {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let output_schema = self.input.mapper.output_schema();
if !output_schema.is_empty() {
let names: Vec<_> = output_schema
.column_schemas()
.iter()
.map(|col| &col.name)
.collect();
write!(f, ", \"projection\": {:?}", names)?;
write!(f, ", projection=")?;
f.debug_list()
.entries(output_schema.column_schemas().iter().map(|col| &col.name))
.finish()?;
}
if let Some(predicate) = &self.input.predicate.predicate() {
if !predicate.exprs().is_empty() {
let exprs: Vec<_> =
predicate.exprs().iter().map(|e| e.to_string()).collect();
write!(f, ", \"filters\": {:?}", exprs)?;
write!(f, ", filters=[")?;
for (i, expr) in predicate.exprs().iter().enumerate() {
if i == predicate.exprs().len() - 1 {
write!(f, "{}]", expr)?;
} else {
write!(f, "{}, ", expr)?;
}
}
}
}
if !self.input.files.is_empty() {
write!(f, ", \"files\": ")?;
write!(f, ", files=")?;
f.debug_list()
.entries(self.input.files.iter().map(|file| FileWrapper { file }))
.finish()?;

View File

@@ -142,36 +142,36 @@ impl fmt::Debug for ScanMetricsSet {
write!(
f,
"{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
\"build_reader_cost\":\"{build_reader_cost:?}\", \
\"scan_cost\":\"{scan_cost:?}\", \
\"convert_cost\":\"{convert_cost:?}\", \
\"yield_cost\":\"{yield_cost:?}\", \
\"total_cost\":\"{total_cost:?}\", \
\"num_rows\":{num_rows}, \
\"num_batches\":{num_batches}, \
\"num_mem_ranges\":{num_mem_ranges}, \
\"num_file_ranges\":{num_file_ranges}, \
\"build_parts_cost\":\"{build_parts_cost:?}\", \
\"rg_total\":{rg_total}, \
\"rg_fulltext_filtered\":{rg_fulltext_filtered}, \
\"rg_inverted_filtered\":{rg_inverted_filtered}, \
\"rg_minmax_filtered\":{rg_minmax_filtered}, \
\"rg_bloom_filtered\":{rg_bloom_filtered}, \
\"rows_before_filter\":{rows_before_filter}, \
\"rows_fulltext_filtered\":{rows_fulltext_filtered}, \
\"rows_inverted_filtered\":{rows_inverted_filtered}, \
\"rows_bloom_filtered\":{rows_bloom_filtered}, \
\"rows_precise_filtered\":{rows_precise_filtered}, \
\"num_sst_record_batches\":{num_sst_record_batches}, \
\"num_sst_batches\":{num_sst_batches}, \
\"num_sst_rows\":{num_sst_rows}, \
\"first_poll\":\"{first_poll:?}\", \
\"num_series_send_timeout\":{num_series_send_timeout}, \
\"num_distributor_rows\":{num_distributor_rows}, \
\"num_distributor_batches\":{num_distributor_batches}, \
\"distributor_scan_cost\":\"{distributor_scan_cost:?}\", \
\"distributor_yield_cost\":\"{distributor_yield_cost:?}\"}}"
"{{prepare_scan_cost={prepare_scan_cost:?}, \
build_reader_cost={build_reader_cost:?}, \
scan_cost={scan_cost:?}, \
convert_cost={convert_cost:?}, \
yield_cost={yield_cost:?}, \
total_cost={total_cost:?}, \
num_rows={num_rows}, \
num_batches={num_batches}, \
num_mem_ranges={num_mem_ranges}, \
num_file_ranges={num_file_ranges}, \
build_parts_cost={build_parts_cost:?}, \
rg_total={rg_total}, \
rg_fulltext_filtered={rg_fulltext_filtered}, \
rg_inverted_filtered={rg_inverted_filtered}, \
rg_minmax_filtered={rg_minmax_filtered}, \
rg_bloom_filtered={rg_bloom_filtered}, \
rows_before_filter={rows_before_filter}, \
rows_fulltext_filtered={rows_fulltext_filtered}, \
rows_inverted_filtered={rows_inverted_filtered}, \
rows_bloom_filtered={rows_bloom_filtered}, \
rows_precise_filtered={rows_precise_filtered}, \
num_sst_record_batches={num_sst_record_batches}, \
num_sst_batches={num_sst_batches}, \
num_sst_rows={num_sst_rows}, \
first_poll={first_poll:?}, \
num_series_send_timeout={num_series_send_timeout}, \
num_distributor_rows={num_distributor_rows}, \
num_distributor_batches={num_distributor_batches}, \
distributor_scan_cost={distributor_scan_cost:?}, \
distributor_yield_cost={distributor_yield_cost:?}}},"
)
}
}
@@ -390,11 +390,10 @@ impl PartitionMetricsList {
/// Format verbose metrics for each partition for explain.
pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
let list = self.0.lock().unwrap();
write!(f, ", \"metrics_per_partition\": ")?;
write!(f, ", metrics_per_partition: ")?;
f.debug_list()
.entries(list.iter().filter_map(|p| p.as_ref()))
.finish()?;
write!(f, "}}")
.finish()
}
}
@@ -489,11 +488,7 @@ impl PartitionMetrics {
impl fmt::Debug for PartitionMetrics {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let metrics = self.0.metrics.lock().unwrap();
write!(
f,
r#"{{"partition":{}, "metrics":{:?}}}"#,
self.0.partition, metrics
)
write!(f, "[partition={}, {:?}]", self.0.partition, metrics)
}
}

View File

@@ -922,12 +922,11 @@ mod tests {
use api::v1::{Row, SemanticType};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;
use mito_codec::test_util::i64_value;
use store_api::metadata::RegionMetadataBuilder;
use super::*;
use crate::error::Error;
use crate::test_util::ts_ms_value;
use crate::test_util::{i64_value, ts_ms_value};
fn new_column_schema(
name: &str,

View File

@@ -20,13 +20,13 @@ use std::sync::Arc;
use common_recordbatch::filter::SimpleFilterEvaluator;
use datatypes::value::{Value, ValueRef};
pub use dense::{DensePrimaryKeyCodec, SortField};
use mito_codec::key_values::KeyValue;
pub use sparse::{SparsePrimaryKeyCodec, SparseValues, COLUMN_ID_ENCODE_SIZE};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
/// Row value encoder/decoder.
pub trait PrimaryKeyCodecExt {

View File

@@ -35,16 +35,15 @@ use store_api::storage::ColumnId;
use crate::error::{
self, FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu,
};
use crate::key_values::KeyValue;
use crate::primary_key_filter::DensePrimaryKeyFilter;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::DensePrimaryKeyFilter;
use crate::row_converter::{
CompositeValues, PrimaryKeyCodec, PrimaryKeyCodecExt, PrimaryKeyFilter,
};
/// Field to serialize and deserialize value in memcomparable format.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
data_type: ConcreteDataType,
pub(crate) data_type: ConcreteDataType,
}
impl SortField {
@@ -52,11 +51,6 @@ impl SortField {
Self { data_type }
}
/// Returns the data type of the field.
pub fn data_type(&self) -> &ConcreteDataType {
&self.data_type
}
pub fn estimated_size(&self) -> usize {
match &self.data_type {
ConcreteDataType::Boolean(_) => 2,
@@ -81,9 +75,10 @@ impl SortField {
| ConcreteDataType::Dictionary(_) => 0,
}
}
}
/// Serialize a value to the serializer.
pub fn serialize(
impl SortField {
pub(crate) fn serialize(
&self,
serializer: &mut Serializer<&mut Vec<u8>>,
value: &ValueRef,
@@ -168,8 +163,7 @@ impl SortField {
Ok(())
}
/// Deserialize a value from the deserializer.
pub fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
pub(crate) fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
macro_rules! deserialize_and_build_value {
(
$self: ident;
@@ -531,7 +525,7 @@ mod tests {
let value = encoder.decode_value_at(&result, i, &mut offsets).unwrap();
decoded.push(value);
}
assert_eq!(data_types.len(), offsets.len(), "offsets: {offsets:?}");
assert_eq!(data_types.len(), offsets.len(), "offsets: {:?}", offsets);
assert_eq!(decoded, row);
}
}

View File

@@ -27,8 +27,8 @@ use store_api::storage::consts::ReservedColumnId;
use store_api::storage::ColumnId;
use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu, UnsupportedOperationSnafu};
use crate::key_values::KeyValue;
use crate::primary_key_filter::SparsePrimaryKeyFilter;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::SparsePrimaryKeyFilter;
use crate::row_converter::dense::SortField;
use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
@@ -205,7 +205,7 @@ impl SparsePrimaryKeyCodec {
}
/// Returns the offset of the given column id in the given primary key.
pub fn has_column(
pub(crate) fn has_column(
&self,
pk: &[u8],
offsets_map: &mut HashMap<u32, usize>,
@@ -233,7 +233,12 @@ impl SparsePrimaryKeyCodec {
}
/// Decode value at `offset` in `pk`.
pub fn decode_value_at(&self, pk: &[u8], offset: usize, column_id: ColumnId) -> Result<Value> {
pub(crate) fn decode_value_at(
&self,
pk: &[u8],
offset: usize,
column_id: ColumnId,
) -> Result<Value> {
let mut deserializer = Deserializer::new(pk);
deserializer.advance(offset);
// Safety: checked by `has_column`
@@ -295,40 +300,6 @@ impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
}
}
/// Field with column id.
pub struct FieldWithId {
pub field: SortField,
pub column_id: ColumnId,
}
/// A special encoder for memtable.
pub struct SparseEncoder {
fields: Vec<FieldWithId>,
}
impl SparseEncoder {
pub fn new(fields: Vec<FieldWithId>) -> Self {
Self { fields }
}
pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
where
I: Iterator<Item = ValueRef<'a>>,
{
let mut serializer = Serializer::new(buffer);
for (value, field) in row.zip(self.fields.iter()) {
if !value.is_null() {
field
.column_id
.serialize(&mut serializer)
.context(SerializeFieldSnafu)?;
field.field.serialize(&mut serializer, &value)?;
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@@ -13,6 +13,7 @@
// limitations under the License.
pub(crate) mod bloom_filter;
mod codec;
pub(crate) mod fulltext_index;
mod indexer;
pub mod intermediate;

Some files were not shown because too many files have changed in this diff Show More