mirror of https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-24 15:09:59 +00:00

Compare commits
9 Commits
feat/query ... feat/bulk-
| Author | SHA1 | Date |
|---|---|---|
| | ffe84af343 | |
| | 41c40688c4 | |
| | 59a82a3f29 | |
| | 0d7012a1ea | |
| | 88da98c829 | |
| | 73ce5914f2 | |
| | f5eac3528c | |
| | 9cd61d221d | |
| | 7527ff976e | |
@@ -10,13 +10,13 @@ inputs:
meta-replicas:
default: 2
description: "Number of Metasrv replicas"
image-registry:
image-registry:
default: "docker.io"
description: "Image registry"
image-repository:
image-repository:
default: "greptime/greptimedb"
description: "Image repository"
image-tag:
image-tag:
default: "latest"
description: 'Image tag'
etcd-endpoints:
@@ -32,12 +32,12 @@ runs:
steps:
- name: Install GreptimeDB operator
uses: nick-fields/retry@v3
with:
with:
timeout_minutes: 3
max_attempts: 3
shell: bash
command: |
helm repo add greptime https://greptimeteam.github.io/helm-charts/
helm repo add greptime https://greptimeteam.github.io/helm-charts/
helm repo update
helm upgrade \
--install \
@@ -48,10 +48,10 @@ runs:
--wait-for-jobs
- name: Install GreptimeDB cluster
shell: bash
run: |
run: |
helm upgrade \
--install my-greptimedb \
--set meta.backendStorage.etcd.endpoints=${{ inputs.etcd-endpoints }} \
--set meta.etcdEndpoints=${{ inputs.etcd-endpoints }} \
--set meta.enableRegionFailover=${{ inputs.enable-region-failover }} \
--set image.registry=${{ inputs.image-registry }} \
--set image.repository=${{ inputs.image-repository }} \
@@ -72,7 +72,7 @@ runs:
- name: Wait for GreptimeDB
shell: bash
run: |
while true; do
while true; do
PHASE=$(kubectl -n my-greptimedb get gtc my-greptimedb -o jsonpath='{.status.clusterPhase}')
if [ "$PHASE" == "Running" ]; then
echo "Cluster is ready"
@@ -86,10 +86,10 @@ runs:
- name: Print GreptimeDB info
if: always()
shell: bash
run: |
run: |
kubectl get all --show-labels -n my-greptimedb
- name: Describe Nodes
if: always()
shell: bash
run: |
run: |
kubectl describe nodes
4 .github/scripts/deploy-greptimedb.sh vendored
@@ -68,7 +68,7 @@ function deploy_greptimedb_cluster() {

helm install "$cluster_name" greptime/greptimedb-cluster \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.etcdEndpoints="etcd.$install_namespace:2379" \
-n "$install_namespace"

# Wait for greptimedb cluster to be ready.
@@ -103,7 +103,7 @@ function deploy_greptimedb_cluster_with_s3_storage() {

helm install "$cluster_name" greptime/greptimedb-cluster -n "$install_namespace" \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.etcdEndpoints="etcd.$install_namespace:2379" \
--set storage.s3.bucket="$AWS_CI_TEST_BUCKET" \
--set storage.s3.region="$AWS_REGION" \
--set storage.s3.root="$DATA_ROOT" \
15 Cargo.lock generated
@@ -1623,7 +1623,6 @@ dependencies = [
"chrono",
"common-catalog",
"common-error",
"common-frontend",
"common-macro",
"common-meta",
"common-procedure",
@@ -2294,14 +2293,8 @@ version = "0.15.0"
dependencies = [
"async-trait",
"common-error",
"common-grpc",
"common-macro",
"common-meta",
"greptime-proto",
"meta-client",
"snafu 0.8.5",
"tokio",
"tonic 0.12.3",
]

[[package]]
@@ -4708,7 +4701,6 @@ dependencies = [
"common-config",
"common-datasource",
"common-error",
"common-frontend",
"common-function",
"common-grpc",
"common-macro",
@@ -5141,7 +5133,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5f6119ac7952878d39dcde0343c4bf828d18ffc8#5f6119ac7952878d39dcde0343c4bf828d18ffc8"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=52083925a15d741c259800a9a54eba3467939180#52083925a15d741c259800a9a54eba3467939180"
dependencies = [
"prost 0.13.5",
"serde",
@@ -8893,9 +8885,9 @@ dependencies = [

[[package]]
name = "pgwire"
version = "0.30.2"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ca6c26b25be998208a13ff2f0c55b567363f34675410e6d6f1c513a150583fd"
checksum = "ec79ee18e6cafde8698885646780b967ecc905120798b8359dd0da64f9688e89"
dependencies = [
"async-trait",
"bytes",
@@ -11143,7 +11135,6 @@ dependencies = [
"common-catalog",
"common-config",
"common-error",
"common-frontend",
"common-grpc",
"common-macro",
"common-mem-prof",

@@ -133,7 +133,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5f6119ac7952878d39dcde0343c4bf828d18ffc8" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "52083925a15d741c259800a9a54eba3467939180" }
hex = "0.4"
http = "1"
humantime = "2.1"
@@ -232,7 +232,7 @@
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression.<br/>Default to `none` |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
@@ -405,7 +405,7 @@
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for datanode side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression.<br/>Default to `none` |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for datanode side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
@@ -9,7 +9,7 @@ We highly recommend using the self-monitoring feature provided by [GreptimeDB Op
- **Metrics Dashboards**

- `dashboards/metrics/cluster/dashboard.json`: The Grafana dashboard for the GreptimeDB cluster. Read the [dashboard.md](./dashboards/metrics/cluster/dashboard.md) for more details.

- `dashboards/metrics/standalone/dashboard.json`: The Grafana dashboard for the standalone GreptimeDB instance. **It's generated from the `cluster/dashboard.json` by removing the instance filter through the `make dashboards` command**. Read the [dashboard.md](./dashboards/metrics/standalone/dashboard.md) for more details.

- **Logs Dashboard**
@@ -83,7 +83,7 @@ If you use the [Helm Chart](https://github.com/GreptimeTeam/helm-charts) to depl
- `monitoring.enabled=true`: Deploys a standalone GreptimeDB instance dedicated to monitoring the cluster;
- `grafana.enabled=true`: Deploys Grafana and automatically imports the monitoring dashboard;

The standalone GreptimeDB instance will collect metrics from your cluster, and the dashboard will be available in the Grafana UI. For detailed deployment instructions, please refer to our [Kubernetes deployment guide](https://docs.greptime.com/user-guide/deployments-administration/deploy-on-kubernetes/getting-started).
The standalone GreptimeDB instance will collect metrics from your cluster, and the dashboard will be available in the Grafana UI. For detailed deployment instructions, please refer to our [Kubernetes deployment guide](https://docs.greptime.com/nightly/user-guide/deployments/deploy-on-kubernetes/getting-started).

### Self-host Prometheus and import dashboards manually
@@ -19,7 +19,6 @@ async-trait.workspace = true
bytes.workspace = true
common-catalog.workspace = true
common-error.workspace = true
common-frontend.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-procedure.workspace = true
@@ -277,13 +277,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to list frontend nodes"))]
ListProcess {
source: common_frontend::error::Error,
#[snafu(implicit)]
location: Location,
},
}

impl Error {
@@ -352,7 +345,6 @@ impl ErrorExt for Error {
Error::GetViewCache { source, .. } | Error::GetTableCache { source, .. } => {
source.status_code()
}
Error::ListProcess { source, .. } => source.status_code(),
}
}
@@ -51,7 +51,6 @@ use crate::error::{
};
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::pg_catalog::PGCatalogProvider;
use crate::system_schema::SystemSchemaProvider;
use crate::CatalogManager;
@@ -85,7 +84,6 @@ impl KvBackendCatalogManager {
backend: KvBackendRef,
cache_registry: LayeredCacheRegistryRef,
procedure_manager: Option<ProcedureManagerRef>,
process_manager: Option<ProcessManagerRef>,
) -> Arc<Self> {
Arc::new_cyclic(|me| Self {
information_extension,
@@ -104,14 +102,12 @@ impl KvBackendCatalogManager {
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
Arc::new(FlowMetadataManager::new(backend.clone())),
process_manager.clone(),
)),
pg_catalog_provider: Arc::new(PGCatalogProvider::new(
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
)),
backend,
process_manager,
},
cache_registry,
procedure_manager,
@@ -423,7 +419,6 @@ struct SystemCatalog {
information_schema_provider: Arc<InformationSchemaProvider>,
pg_catalog_provider: Arc<PGCatalogProvider>,
backend: KvBackendRef,
process_manager: Option<ProcessManagerRef>,
}

impl SystemCatalog {
@@ -491,7 +486,6 @@ impl SystemCatalog {
catalog.to_string(),
self.catalog_manager.clone(),
Arc::new(FlowMetadataManager::new(self.backend.clone())),
self.process_manager.clone(),
))
});
information_schema_provider.table(table_name)
@@ -40,7 +40,6 @@ pub mod information_schema {
pub use crate::system_schema::information_schema::*;
}

pub mod process_manager;
pub mod table_source;

#[async_trait::async_trait]
@@ -356,7 +356,6 @@ impl MemoryCatalogManager {
catalog,
Arc::downgrade(self) as Weak<dyn CatalogManager>,
Arc::new(FlowMetadataManager::new(Arc::new(MemoryKvBackend::new()))),
None, // we don't need ProcessManager on regions server.
);
let information_schema = information_schema_provider.tables().clone();
@@ -1,188 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

use api::v1::frontend::{ListProcessRequest, ProcessInfo};
use common_frontend::selector::{FrontendSelector, MetaClientSelector};
use common_telemetry::{debug, info};
use common_time::util::current_time_millis;
use meta_client::MetaClientRef;
use snafu::ResultExt;

use crate::error;

pub type ProcessManagerRef = Arc<ProcessManager>;

pub struct ProcessManager {
server_addr: String,
next_id: AtomicU64,
catalogs: RwLock<HashMap<String, HashMap<u64, ProcessInfo>>>,
frontend_selector: Option<MetaClientSelector>,
}

impl ProcessManager {
/// Create a [ProcessManager] instance with server address and kv client.
pub fn new(server_addr: String, meta_client: Option<MetaClientRef>) -> Self {
let frontend_selector = meta_client.map(MetaClientSelector::new);
Self {
server_addr,
next_id: Default::default(),
catalogs: Default::default(),
frontend_selector,
}
}
}

impl ProcessManager {
/// Registers a submitted query. Use the provided id if present.
pub fn register_query(
self: &Arc<Self>,
catalog: String,
schemas: Vec<String>,
query: String,
client: String,
id: Option<u64>,
) -> Ticket {
let id = id.unwrap_or_else(|| self.next_id.fetch_add(1, Ordering::Relaxed));
let process = ProcessInfo {
id,
catalog: catalog.clone(),
schemas,
query,
start_timestamp: current_time_millis(),
client,
frontend: self.server_addr.clone(),
};
self.catalogs
.write()
.unwrap()
.entry(catalog.clone())
.or_default()
.insert(id, process);
Ticket {
catalog,
manager: self.clone(),
id,
}
}

/// Generates the next process id.
pub fn next_id(&self) -> u64 {
self.next_id.fetch_add(1, Ordering::Relaxed)
}

/// De-register a query from process list.
pub fn deregister_query(&self, catalog: String, id: u64) {
if let Entry::Occupied(mut o) = self.catalogs.write().unwrap().entry(catalog) {
let process = o.get_mut().remove(&id);
debug!("Deregister process: {:?}", process);
if o.get_mut().is_empty() {
o.remove();
}
}
}

pub fn deregister_all_queries(&self) {
self.catalogs.write().unwrap().clear();
info!("All queries on {} has been deregistered", self.server_addr);
}

/// List local running processes in given catalog.
pub fn local_processes(&self, catalog: Option<&str>) -> error::Result<Vec<ProcessInfo>> {
let catalogs = self.catalogs.read().unwrap();
let result = if let Some(catalog) = catalog {
if let Some(catalogs) = catalogs.get(catalog) {
catalogs.values().cloned().collect()
} else {
vec![]
}
} else {
catalogs
.values()
.flat_map(|v| v.values().cloned())
.collect()
};
Ok(result)
}

pub async fn list_all_processes(
&self,
catalog: Option<&str>,
) -> error::Result<Vec<ProcessInfo>> {
let mut processes = vec![];
if let Some(remote_frontend_selector) = self.frontend_selector.as_ref() {
let frontends = remote_frontend_selector
.select(|node| node.peer.addr != self.server_addr)
.await
.context(error::ListProcessSnafu)?;
for mut f in frontends {
processes.extend(
f.list_process(ListProcessRequest {
catalog: catalog.unwrap_or_default().to_string(),
})
.await
.context(error::ListProcessSnafu)?
.processes,
);
}
}
processes.extend(self.local_processes(catalog)?);
Ok(processes)
}
}

pub struct Ticket {
pub(crate) catalog: String,
pub(crate) manager: ProcessManagerRef,
pub(crate) id: u64,
}

impl Drop for Ticket {
fn drop(&mut self) {
self.manager
.deregister_query(std::mem::take(&mut self.catalog), self.id);
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use crate::process_manager::ProcessManager;

#[tokio::test]
async fn test_register_query() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let ticket = process_manager.clone().register_query(
"public".to_string(),
vec!["test".to_string()],
"SELECT * FROM table".to_string(),
"".to_string(),
None,
);

let running_processes = process_manager.local_processes(None).unwrap();
assert_eq!(running_processes.len(), 1);
assert_eq!(&running_processes[0].frontend, "127.0.0.1:8000");
assert_eq!(running_processes[0].id, ticket.id);
assert_eq!(&running_processes[0].query, "SELECT * FROM table");

drop(ticket);
assert_eq!(process_manager.local_processes(None).unwrap().len(), 0);
}
}
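The `Ticket` returned by `register_query` acts as an RAII guard: dropping it deregisters the query, as the test above exercises. A minimal hedged sketch of a call-site pattern is shown below; `parse_and_execute` is a hypothetical placeholder for real statement execution and is not part of this diff.

```rust
// Hypothetical stand-in for real statement execution; only here to make the sketch self-contained.
async fn parse_and_execute(_sql: &str) -> error::Result<()> {
    Ok(())
}

// A hedged sketch of the guard pattern, assuming only the ProcessManager API above.
async fn run_tracked(pm: &ProcessManagerRef, catalog: &str, sql: &str) -> error::Result<()> {
    let _ticket = pm.register_query(
        catalog.to_string(),
        vec!["public".to_string()],
        sql.to_string(),
        String::new(), // client address unknown in this sketch
        None,          // let the manager assign the next id
    );
    // Whether this returns Ok or Err, dropping `_ticket` removes the entry
    // from the per-catalog map (and thus from information_schema.process_list).
    parse_and_execute(sql).await
}
```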
@@ -19,7 +19,6 @@ mod information_memory_table;
pub mod key_column_usage;
mod partitions;
mod procedure_info;
mod process_list;
pub mod region_peers;
mod region_statistics;
mod runtime_metrics;
@@ -43,7 +42,6 @@ use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use lazy_static::lazy_static;
use paste::paste;
use process_list::InformationSchemaProcessList;
use store_api::storage::{ScanRequest, TableId};
use table::metadata::TableType;
use table::TableRef;
@@ -52,7 +50,6 @@ use views::InformationSchemaViews;

use self::columns::InformationSchemaColumns;
use crate::error::{Error, Result};
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
use crate::system_schema::information_schema::flows::InformationSchemaFlows;
use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
@@ -116,7 +113,6 @@ macro_rules! setup_memory_table {
pub struct InformationSchemaProvider {
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
process_manager: Option<ProcessManagerRef>,
flow_metadata_manager: Arc<FlowMetadataManager>,
tables: HashMap<String, TableRef>,
}
@@ -211,10 +207,6 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
self.catalog_manager.clone(),
),
) as _),
PROCESS_LIST => self
.process_manager
.as_ref()
.map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
_ => None,
}
}
@@ -225,13 +217,11 @@ impl InformationSchemaProvider {
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: Arc<FlowMetadataManager>,
process_manager: Option<ProcessManagerRef>,
) -> Self {
let mut provider = Self {
catalog_name,
catalog_manager,
flow_metadata_manager,
process_manager,
tables: HashMap::new(),
};

@@ -287,9 +277,6 @@ impl InformationSchemaProvider {
self.build_table(TABLE_CONSTRAINTS).unwrap(),
);
tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
if let Some(process_list) = self.build_table(PROCESS_LIST) {
tables.insert(PROCESS_LIST.to_string(), process_list);
}
// Add memory tables
for name in MEMORY_TABLES.iter() {
tables.insert((*name).to_string(), self.build_table(name).expect(name));
@@ -1,189 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_catalog::consts::INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID;
use common_error::ext::BoxedError;
use common_frontend::DisplayProcessId;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::util::current_time_millis;
use common_time::{Duration, Timestamp};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datatypes::prelude::ConcreteDataType as CDT;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{
DurationMillisecondVectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
VectorRef,
};
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};

use crate::error::{self, InternalSnafu};
use crate::information_schema::Predicates;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::information_schema::InformationTable;

/// Column names of `information_schema.process_list`
const ID: &str = "id";
const CATALOG: &str = "catalog";
const SCHEMAS: &str = "schemas";
const QUERY: &str = "query";
const CLIENT: &str = "client";
const FRONTEND: &str = "frontend";
const START_TIMESTAMP: &str = "start_timestamp";
const ELAPSED_TIME: &str = "elapsed_time";

/// `information_schema.process_list` table implementation that tracks running
/// queries in current cluster.
pub struct InformationSchemaProcessList {
schema: SchemaRef,
process_manager: ProcessManagerRef,
}

impl InformationSchemaProcessList {
pub fn new(process_manager: ProcessManagerRef) -> Self {
Self {
schema: Self::schema(),
process_manager,
}
}

fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
ColumnSchema::new(ID, CDT::string_datatype(), false),
ColumnSchema::new(CATALOG, CDT::string_datatype(), false),
ColumnSchema::new(SCHEMAS, CDT::string_datatype(), false),
ColumnSchema::new(QUERY, CDT::string_datatype(), false),
ColumnSchema::new(CLIENT, CDT::string_datatype(), false),
ColumnSchema::new(FRONTEND, CDT::string_datatype(), false),
ColumnSchema::new(
START_TIMESTAMP,
CDT::timestamp_millisecond_datatype(),
false,
),
ColumnSchema::new(ELAPSED_TIME, CDT::duration_millisecond_datatype(), false),
]))
}
}

impl InformationTable for InformationSchemaProcessList {
fn table_id(&self) -> TableId {
INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID
}

fn table_name(&self) -> &'static str {
"process_list"
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn to_stream(&self, request: ScanRequest) -> error::Result<SendableRecordBatchStream> {
let process_manager = self.process_manager.clone();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
self.schema.arrow_schema().clone(),
futures::stream::once(async move {
make_process_list(process_manager, request)
.await
.map(RecordBatch::into_df_record_batch)
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))
}),
));

Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}

/// Build running process list.
async fn make_process_list(
process_manager: ProcessManagerRef,
request: ScanRequest,
) -> error::Result<RecordBatch> {
let predicates = Predicates::from_scan_request(&Some(request));
let current_time = current_time_millis();
// todo(hl): find a way to extract user catalog to filter queries from other users.
let queries = process_manager.list_all_processes(None).await?;

let mut id_builder = StringVectorBuilder::with_capacity(queries.len());
let mut catalog_builder = StringVectorBuilder::with_capacity(queries.len());
let mut schemas_builder = StringVectorBuilder::with_capacity(queries.len());
let mut query_builder = StringVectorBuilder::with_capacity(queries.len());
let mut client_builder = StringVectorBuilder::with_capacity(queries.len());
let mut frontend_builder = StringVectorBuilder::with_capacity(queries.len());
let mut start_time_builder = TimestampMillisecondVectorBuilder::with_capacity(queries.len());
let mut elapsed_time_builder = DurationMillisecondVectorBuilder::with_capacity(queries.len());

for process in queries {
let display_id = DisplayProcessId {
server_addr: process.frontend.to_string(),
id: process.id,
}
.to_string();
let schemas = process.schemas.join(",");
let id = Value::from(display_id);
let catalog = Value::from(process.catalog);
let schemas = Value::from(schemas);
let query = Value::from(process.query);
let client = Value::from(process.client);
let frontend = Value::from(process.frontend);
let start_timestamp = Value::from(Timestamp::new_millisecond(process.start_timestamp));
let elapsed_time = Value::from(Duration::new_millisecond(
current_time - process.start_timestamp,
));
let row = [
(ID, &id),
(CATALOG, &catalog),
(SCHEMAS, &schemas),
(QUERY, &query),
(CLIENT, &client),
(FRONTEND, &frontend),
(START_TIMESTAMP, &start_timestamp),
(ELAPSED_TIME, &elapsed_time),
];
if predicates.eval(&row) {
id_builder.push(id.as_string().as_deref());
catalog_builder.push(catalog.as_string().as_deref());
schemas_builder.push(schemas.as_string().as_deref());
query_builder.push(query.as_string().as_deref());
client_builder.push(client.as_string().as_deref());
frontend_builder.push(frontend.as_string().as_deref());
start_time_builder.push(start_timestamp.as_timestamp().map(|t| t.value().into()));
elapsed_time_builder.push(elapsed_time.as_duration().map(|d| d.value().into()));
}
}

RecordBatch::new(
InformationSchemaProcessList::schema(),
vec![
Arc::new(id_builder.finish()) as VectorRef,
Arc::new(catalog_builder.finish()) as VectorRef,
Arc::new(schemas_builder.finish()) as VectorRef,
Arc::new(query_builder.finish()) as VectorRef,
Arc::new(client_builder.finish()) as VectorRef,
Arc::new(frontend_builder.finish()) as VectorRef,
Arc::new(start_time_builder.finish()) as VectorRef,
Arc::new(elapsed_time_builder.finish()) as VectorRef,
],
)
.context(error::CreateRecordBatchSnafu)
}
@@ -47,4 +47,3 @@ pub const VIEWS: &str = "views";
pub const FLOWS: &str = "flows";
pub const PROCEDURE_INFO: &str = "procedure_info";
pub const REGION_STATISTICS: &str = "region_statistics";
pub const PROCESS_LIST: &str = "process_list";
@@ -328,7 +328,6 @@ mod tests {
backend.clone(),
layered_cache_registry,
None,
None,
);
let table_metadata_manager = TableMetadataManager::new(backend);
let mut view_info = common_meta::key::test_utils::new_test_table_info(1024, vec![]);
@@ -1,39 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod export;
mod import;

use clap::Subcommand;
use common_error::ext::BoxedError;

use crate::data::export::ExportCommand;
use crate::data::import::ImportCommand;
use crate::Tool;

/// Command for data operations including exporting data from and importing data into GreptimeDB.
#[derive(Subcommand)]
pub enum DataCommand {
Export(ExportCommand),
Import(ImportCommand),
}

impl DataCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
match self {
DataCommand::Export(cmd) => cmd.build().await,
DataCommand::Import(cmd) => cmd.build().await,
}
}
}
@@ -30,7 +30,6 @@ pub enum Error {
location: Location,
msg: String,
},

#[snafu(display("Failed to create default catalog and schema"))]
InitMetadata {
#[snafu(implicit)]
@@ -229,25 +228,22 @@ pub enum Error {
#[snafu(source)]
error: ObjectStoreError,
},

#[snafu(display("S3 config need be set"))]
S3ConfigNotSet {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Output directory not set"))]
OutputDirNotSet {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Empty store addresses"))]
EmptyStoreAddrs {
#[snafu(display("KV backend not set: {}", backend))]
KvBackendNotSet {
backend: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Unsupported memory backend"))]
UnsupportedMemoryBackend {
#[snafu(implicit)]
@@ -260,13 +256,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Invalid arguments: {}", msg))]
InvalidArguments {
msg: String,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -287,7 +276,6 @@ impl ErrorExt for Error {
| Error::EmptyResult { .. }
| Error::InvalidFilePath { .. }
| Error::UnsupportedMemoryBackend { .. }
| Error::InvalidArguments { .. }
| Error::ParseProxyOpts { .. } => StatusCode::InvalidArguments,

Error::StartProcedureManager { source, .. }
@@ -308,7 +296,7 @@ impl ErrorExt for Error {
Error::OpenDal { .. } => StatusCode::Internal,
Error::S3ConfigNotSet { .. }
| Error::OutputDirNotSet { .. }
| Error::EmptyStoreAddrs { .. } => StatusCode::InvalidArguments,
| Error::KvBackendNotSet { .. } => StatusCode::InvalidArguments,

Error::BuildRuntime { source, .. } => source.status_code(),
@@ -13,20 +13,22 @@
// limitations under the License.

mod bench;
mod data;
mod database;
pub mod error;
mod metadata;
mod export;
mod import;
mod meta_snapshot;

use async_trait::async_trait;
use clap::Parser;
use clap::{Parser, Subcommand};
use common_error::ext::BoxedError;
pub use database::DatabaseClient;
use error::Result;

pub use crate::bench::BenchTableMetadataCommand;
pub use crate::data::DataCommand;
pub use crate::metadata::MetadataCommand;
pub use crate::export::ExportCommand;
pub use crate::import::ImportCommand;
pub use crate::meta_snapshot::{MetaCommand, MetaInfoCommand, MetaRestoreCommand, MetaSaveCommand};

#[async_trait]
pub trait Tool: Send + Sync {
@@ -49,3 +51,19 @@ impl AttachCommand {
unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373")
}
}

/// Subcommand for data operations like export and import.
#[derive(Subcommand)]
pub enum DataCommand {
Export(ExportCommand),
Import(ImportCommand),
}

impl DataCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
match self {
DataCommand::Export(cmd) => cmd.build().await,
DataCommand::Import(cmd) => cmd.build().await,
}
}
}
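For orientation, the sketch below shows one way these subcommands could be wired into a top-level clap parser. The `Cli`/`Cmd` wrapper and the `run` function are illustrative assumptions, not the actual entry point in this diff; only `DataCommand`, `MetaCommand`, `Tool`, and `BoxedError` come from the code above.

```rust
// Illustrative wiring only: `Cli` and `Cmd` are hypothetical names.
use clap::{Parser, Subcommand};

#[derive(Parser)]
struct Cli {
    #[clap(subcommand)]
    cmd: Cmd,
}

#[derive(Subcommand)]
enum Cmd {
    /// Data export/import tools.
    #[clap(subcommand)]
    Data(DataCommand),
    /// Metadata snapshot tools.
    #[clap(subcommand)]
    Meta(MetaCommand),
}

async fn run() -> Result<(), BoxedError> {
    let cli = Cli::parse();
    // Each subcommand builds a boxed Tool, then the tool does the actual work.
    let tool = match cli.cmd {
        Cmd::Data(cmd) => cmd.build().await?,
        Cmd::Meta(cmd) => cmd.build().await?,
    };
    tool.do_work().await
}
```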
@@ -13,37 +13,139 @@
// limitations under the License.

use std::path::Path;
use std::sync::Arc;

use async_trait::async_trait;
use clap::{Parser, Subcommand};
use common_base::secrets::{ExposeSecret, SecretString};
use common_error::ext::BoxedError;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use common_meta::snapshot::MetadataSnapshotManager;
use meta_srv::bootstrap::create_etcd_client;
use meta_srv::metasrv::BackendImpl;
use object_store::services::{Fs, S3};
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};

use crate::error::{InvalidFilePathSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
use crate::metadata::common::StoreConfig;
use crate::error::{
InvalidFilePathSnafu, KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu,
UnsupportedMemoryBackendSnafu,
};
use crate::Tool;

/// Subcommand for metadata snapshot operations, including saving snapshots, restoring from snapshots, and viewing snapshot information.
/// Subcommand for metadata snapshot management.
#[derive(Subcommand)]
pub enum SnapshotCommand {
/// Save a snapshot of the current metadata state to a specified location.
Save(SaveCommand),
/// Restore metadata from a snapshot.
Restore(RestoreCommand),
/// Explore metadata from a snapshot.
Info(InfoCommand),
pub enum MetaCommand {
#[clap(subcommand)]
Snapshot(MetaSnapshotCommand),
}

impl SnapshotCommand {
impl MetaCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
SnapshotCommand::Save(cmd) => cmd.build().await,
SnapshotCommand::Restore(cmd) => cmd.build().await,
SnapshotCommand::Info(cmd) => cmd.build().await,
MetaCommand::Snapshot(cmd) => cmd.build().await,
}
}
}

/// Subcommand for metadata snapshot operations. such as save, restore and info.
#[derive(Subcommand)]
pub enum MetaSnapshotCommand {
/// Export metadata snapshot tool.
Save(MetaSaveCommand),
/// Restore metadata snapshot tool.
Restore(MetaRestoreCommand),
/// Explore metadata from metadata snapshot.
Info(MetaInfoCommand),
}

impl MetaSnapshotCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
MetaSnapshotCommand::Save(cmd) => cmd.build().await,
MetaSnapshotCommand::Restore(cmd) => cmd.build().await,
MetaSnapshotCommand::Info(cmd) => cmd.build().await,
}
}
}

#[derive(Debug, Default, Parser)]
struct MetaConnection {
/// The endpoint of store. one of etcd, pg or mysql.
#[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
store_addrs: Vec<String>,
/// The database backend.
#[clap(long, value_enum)]
backend: Option<BackendImpl>,
#[clap(long, default_value = "")]
store_key_prefix: String,
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
#[clap(long,default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
meta_table_name: String,
#[clap(long, default_value = "128")]
max_txn_ops: usize,
}

impl MetaConnection {
pub async fn build(&self) -> Result<KvBackendRef, BoxedError> {
let max_txn_ops = self.max_txn_ops;
let store_addrs = &self.store_addrs;
if store_addrs.is_empty() {
KvBackendNotSetSnafu { backend: "all" }
.fail()
.map_err(BoxedError::new)
} else {
let kvbackend = match self.backend {
Some(BackendImpl::EtcdStore) => {
let etcd_client = create_etcd_client(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
}
#[cfg(feature = "pg_kvbackend")]
Some(BackendImpl::PostgresStore) => {
let table_name = &self.meta_table_name;
let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool(
pool,
table_name,
max_txn_ops,
)
.await
.map_err(BoxedError::new)?)
}
#[cfg(feature = "mysql_kvbackend")]
Some(BackendImpl::MysqlStore) => {
let table_name = &self.meta_table_name;
let pool = meta_srv::bootstrap::create_mysql_pool(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool(
pool,
table_name,
max_txn_ops,
)
.await
.map_err(BoxedError::new)?)
}
Some(BackendImpl::MemoryStore) => UnsupportedMemoryBackendSnafu
.fail()
.map_err(BoxedError::new),
_ => KvBackendNotSetSnafu { backend: "all" }
.fail()
.map_err(BoxedError::new),
};
if self.store_key_prefix.is_empty() {
kvbackend
} else {
let chroot_kvbackend =
ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?);
Ok(Arc::new(chroot_kvbackend))
}
}
}
}
@@ -112,10 +214,10 @@ impl S3Config {
/// It will dump the metadata snapshot to local file or s3 bucket.
/// The snapshot file will be in binary format.
#[derive(Debug, Default, Parser)]
pub struct SaveCommand {
/// The store configuration.
pub struct MetaSaveCommand {
/// The connection to the metadata store.
#[clap(flatten)]
store: StoreConfig,
connection: MetaConnection,
/// The s3 config.
#[clap(flatten)]
s3_config: S3Config,
@@ -138,9 +240,9 @@ fn create_local_file_object_store(root: &str) -> Result<ObjectStore, BoxedError>
Ok(object_store)
}

impl SaveCommand {
impl MetaSaveCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let kvbackend = self.store.build().await?;
let kvbackend = self.connection.build().await?;
let output_dir = &self.output_dir;
let object_store = self.s3_config.build(output_dir).map_err(BoxedError::new)?;
if let Some(store) = object_store {
@@ -160,7 +262,7 @@ impl SaveCommand {
}
}

struct MetaSnapshotTool {
pub struct MetaSnapshotTool {
inner: MetadataSnapshotManager,
target_file: String,
}
@@ -176,16 +278,14 @@ impl Tool for MetaSnapshotTool {
}
}

/// Restore metadata from a snapshot file.
///
/// This command restores the metadata state from a previously saved snapshot.
/// The snapshot can be loaded from either a local file system or an S3 bucket,
/// depending on the provided configuration.
/// Restore metadata snapshot tool.
/// This tool is used to restore metadata snapshot from etcd, pg or mysql.
/// It will restore the metadata snapshot from local file or s3 bucket.
#[derive(Debug, Default, Parser)]
pub struct RestoreCommand {
/// The store configuration.
pub struct MetaRestoreCommand {
/// The connection to the metadata store.
#[clap(flatten)]
store: StoreConfig,
connection: MetaConnection,
/// The s3 config.
#[clap(flatten)]
s3_config: S3Config,
@@ -199,9 +299,9 @@ pub struct RestoreCommand {
force: bool,
}

impl RestoreCommand {
impl MetaRestoreCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let kvbackend = self.store.build().await?;
let kvbackend = self.connection.build().await?;
let input_dir = &self.input_dir;
let object_store = self.s3_config.build(input_dir).map_err(BoxedError::new)?;
if let Some(store) = object_store {
@@ -223,7 +323,7 @@ impl RestoreCommand {
}
}

struct MetaRestoreTool {
pub struct MetaRestoreTool {
inner: MetadataSnapshotManager,
source_file: String,
force: bool,
@@ -272,12 +372,9 @@ impl Tool for MetaRestoreTool {
}
}

/// Explore metadata from a snapshot file.
///
/// This command allows filtering the metadata by a specific key and limiting the number of results.
/// It prints the filtered metadata to the console.
/// Explore metadata from metadata snapshot.
#[derive(Debug, Default, Parser)]
pub struct InfoCommand {
pub struct MetaInfoCommand {
/// The s3 config.
#[clap(flatten)]
s3_config: S3Config,
@@ -292,7 +389,7 @@ pub struct InfoCommand {
limit: Option<usize>,
}

struct MetaInfoTool {
pub struct MetaInfoTool {
inner: ObjectStore,
source_file: String,
inspect_key: String,
@@ -301,7 +398,6 @@ struct MetaInfoTool {

#[async_trait]
impl Tool for MetaInfoTool {
#[allow(clippy::print_stdout)]
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
let result = MetadataSnapshotManager::info(
&self.inner,
@@ -318,7 +414,7 @@ impl Tool for MetaInfoTool {
}
}

impl InfoCommand {
impl MetaInfoCommand {
fn decide_object_store_root_for_local_store(
file_path: &str,
) -> Result<(&str, &str), BoxedError> {
@@ -1,42 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod common;
mod control;
mod snapshot;

use clap::Subcommand;
use common_error::ext::BoxedError;

use crate::metadata::control::ControlCommand;
use crate::metadata::snapshot::SnapshotCommand;
use crate::Tool;

/// Command for managing metadata operations, including saving metadata snapshots and restoring metadata from snapshots.
#[derive(Subcommand)]
pub enum MetadataCommand {
#[clap(subcommand)]
Snapshot(SnapshotCommand),
#[clap(subcommand)]
Control(ControlCommand),
}

impl MetadataCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
MetadataCommand::Snapshot(cmd) => cmd.build().await,
MetadataCommand::Control(cmd) => cmd.build().await,
}
}
}
@@ -1,116 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use clap::Parser;
use common_error::ext::BoxedError;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use meta_srv::bootstrap::create_etcd_client;
use meta_srv::metasrv::BackendImpl;

use crate::error::{EmptyStoreAddrsSnafu, UnsupportedMemoryBackendSnafu};

#[derive(Debug, Default, Parser)]
pub(crate) struct StoreConfig {
/// The endpoint of store. one of etcd, postgres or mysql.
///
/// For postgres store, the format is:
/// "password=password dbname=postgres user=postgres host=localhost port=5432"
///
/// For etcd store, the format is:
/// "127.0.0.1:2379"
///
/// For mysql store, the format is:
/// "mysql://user:password@ip:port/dbname"
#[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
store_addrs: Vec<String>,

/// The maximum number of operations in a transaction. Only used when using [etcd-store].
#[clap(long, default_value = "128")]
max_txn_ops: usize,

/// The metadata store backend.
#[clap(long, value_enum, default_value = "etcd-store")]
backend: BackendImpl,

/// The key prefix of the metadata store.
#[clap(long, default_value = "")]
store_key_prefix: String,

/// The table name in RDS to store metadata. Only used when using [postgres-store] or [mysql-store].
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
#[clap(long, default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
meta_table_name: String,
}

impl StoreConfig {
/// Builds a [`KvBackendRef`] from the store configuration.
pub async fn build(&self) -> Result<KvBackendRef, BoxedError> {
let max_txn_ops = self.max_txn_ops;
let store_addrs = &self.store_addrs;
if store_addrs.is_empty() {
EmptyStoreAddrsSnafu.fail().map_err(BoxedError::new)
} else {
let kvbackend = match self.backend {
BackendImpl::EtcdStore => {
let etcd_client = create_etcd_client(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
}
#[cfg(feature = "pg_kvbackend")]
BackendImpl::PostgresStore => {
let table_name = &self.meta_table_name;
let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool(
pool,
table_name,
max_txn_ops,
)
.await
.map_err(BoxedError::new)?)
}
#[cfg(feature = "mysql_kvbackend")]
BackendImpl::MysqlStore => {
let table_name = &self.meta_table_name;
let pool = meta_srv::bootstrap::create_mysql_pool(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool(
pool,
table_name,
max_txn_ops,
)
.await
.map_err(BoxedError::new)?)
}
BackendImpl::MemoryStore => UnsupportedMemoryBackendSnafu
.fail()
.map_err(BoxedError::new),
};
if self.store_key_prefix.is_empty() {
kvbackend
} else {
let chroot_kvbackend =
ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?);
Ok(Arc::new(chroot_kvbackend))
}
}
}
}
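The doc comments on `store_addrs` spell out the accepted address formats per backend. Below is a hedged sketch of a harness that parses these flags with clap; the `Cli` wrapper and the example invocations in the comments are assumptions for illustration, while the flag names and defaults come from the `#[clap(...)]` attributes above.

```rust
// Hypothetical harness around StoreConfig; only the flag names/defaults come from the code above.
use clap::Parser;

#[derive(Parser)]
struct Cli {
    #[clap(flatten)]
    store: StoreConfig,
}

fn main() {
    // e.g. an etcd backend:
    //   --backend etcd-store --store-addrs 127.0.0.1:2379 --max-txn-ops 128
    // or a postgres backend (requires the pg_kvbackend feature):
    //   --backend postgres-store --store-addrs "password=... dbname=postgres user=postgres host=localhost port=5432"
    let cli = Cli::parse();
    // In an async context, `cli.store.build().await` yields a KvBackendRef, optionally
    // wrapped in ChrootKvBackend when --store-key-prefix is non-empty.
    let _ = cli;
}
```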
@@ -1,38 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod get;
mod utils;

use clap::Subcommand;
use common_error::ext::BoxedError;
use get::GetCommand;

use crate::Tool;

/// Subcommand for metadata control.
#[derive(Subcommand)]
pub enum ControlCommand {
/// Get the metadata from the metasrv.
#[clap(subcommand)]
Get(GetCommand),
}

impl ControlCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
ControlCommand::Get(cmd) => cmd.build().await,
}
}
}
@@ -1,242 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::min;

use async_trait::async_trait;
use clap::{Parser, Subcommand};
use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::key::table_info::TableInfoKey;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_route::TableRouteKey;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use common_meta::rpc::store::RangeRequest;
use futures::TryStreamExt;

use crate::error::InvalidArgumentsSnafu;
use crate::metadata::common::StoreConfig;
use crate::metadata::control::utils::{decode_key_value, json_fromatter};
use crate::Tool;

/// Subcommand for get command.
#[derive(Subcommand)]
pub enum GetCommand {
Key(GetKeyCommand),
Table(GetTableCommand),
}

impl GetCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
GetCommand::Key(cmd) => cmd.build().await,
GetCommand::Table(cmd) => cmd.build().await,
}
}
}

/// Get key-value pairs from the metadata store.
#[derive(Debug, Default, Parser)]
pub struct GetKeyCommand {
/// The key to get from the metadata store. If empty, returns all key-value pairs.
#[clap(default_value = "")]
key: String,

/// Whether to perform a prefix query. If true, returns all key-value pairs where the key starts with the given prefix.
#[clap(long, default_value = "false")]
prefix: bool,

/// The maximum number of key-value pairs to return. If 0, returns all key-value pairs.
#[clap(long, default_value = "0")]
limit: u64,

#[clap(flatten)]
store: StoreConfig,
}

impl GetKeyCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let kvbackend = self.store.build().await?;
Ok(Box::new(GetKeyTool {
kvbackend,
key: self.key.clone(),
prefix: self.prefix,
limit: self.limit,
}))
}
}

struct GetKeyTool {
kvbackend: KvBackendRef,
key: String,
prefix: bool,
limit: u64,
}

#[async_trait]
impl Tool for GetKeyTool {
async fn do_work(&self) -> Result<(), BoxedError> {
let mut req = RangeRequest::default();
if self.prefix {
req = req.with_prefix(self.key.as_bytes());
} else {
req = req.with_key(self.key.as_bytes());
}
let page_size = if self.limit > 0 {
min(self.limit as usize, DEFAULT_PAGE_SIZE)
} else {
DEFAULT_PAGE_SIZE
};
let pagination_stream =
PaginationStream::new(self.kvbackend.clone(), req, page_size, decode_key_value);
let mut stream = Box::pin(pagination_stream.into_stream());
let mut counter = 0;

while let Some((key, value)) = stream.try_next().await.map_err(BoxedError::new)? {
print!("{}\n{}\n", key, value);
counter += 1;
if self.limit > 0 && counter >= self.limit {
break;
}
}

Ok(())
}
}

/// Get table metadata from the metadata store via table id.
#[derive(Debug, Default, Parser)]
pub struct GetTableCommand {
/// Get table metadata by table id.
#[clap(long)]
table_id: Option<u32>,

/// Get table metadata by table name.
#[clap(long)]
table_name: Option<String>,

/// The schema name of the table.
#[clap(long)]
schema_name: Option<String>,

/// Pretty print the output.
#[clap(long, default_value = "false")]
pretty: bool,

#[clap(flatten)]
store: StoreConfig,
}

impl GetTableCommand {
pub fn validate(&self) -> Result<(), BoxedError> {
if self.table_id.is_none() && self.table_name.is_none() {
return Err(BoxedError::new(
InvalidArgumentsSnafu {
msg: "You must specify either --table-id or --table-name.",
}
.build(),
));
}
Ok(())
}
}

struct GetTableTool {
kvbackend: KvBackendRef,
table_id: Option<u32>,
table_name: Option<String>,
schema_name: Option<String>,
pretty: bool,
}

#[async_trait]
impl Tool for GetTableTool {
async fn do_work(&self) -> Result<(), BoxedError> {
let table_metadata_manager = TableMetadataManager::new(self.kvbackend.clone());
let table_name_manager = table_metadata_manager.table_name_manager();
let table_info_manager = table_metadata_manager.table_info_manager();
let table_route_manager = table_metadata_manager.table_route_manager();

let table_id = if let Some(table_name) = &self.table_name {
let catalog = DEFAULT_CATALOG_NAME.to_string();
let schema_name = self
.schema_name
.clone()
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let key = TableNameKey::new(&catalog, &schema_name, table_name);

let Some(table_name) = table_name_manager.get(key).await.map_err(BoxedError::new)?
else {
println!(
"Table({}) not found",
format_full_table_name(&catalog, &schema_name, table_name)
);
return Ok(());
};

table_name.table_id()
} else {
// Safety: we have validated that table_id or table_name is not None
self.table_id.unwrap()
};

let table_info = table_info_manager
.get(table_id)
.await
.map_err(BoxedError::new)?;
if let Some(table_info) = table_info {
println!(
"{}\n{}",
TableInfoKey::new(table_id),
json_fromatter(self.pretty, &*table_info)
);
} else {
println!("Table info not found");
}

let table_route = table_route_manager
.table_route_storage()
.get(table_id)
.await
.map_err(BoxedError::new)?;
if let Some(table_route) = table_route {
println!(
"{}\n{}",
||||
TableRouteKey::new(table_id),
|
||||
json_fromatter(self.pretty, &table_route)
|
||||
);
|
||||
} else {
|
||||
println!("Table route not found");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl GetTableCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
self.validate()?;
|
||||
let kvbackend = self.store.build().await?;
|
||||
Ok(Box::new(GetTableTool {
|
||||
kvbackend,
|
||||
table_id: self.table_id,
|
||||
table_name: self.table_name.clone(),
|
||||
schema_name: self.schema_name.clone(),
|
||||
pretty: self.pretty,
|
||||
}))
|
||||
}
|
||||
}
|
||||
@@ -1,36 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_meta::error::Result as CommonMetaResult;
|
||||
use common_meta::rpc::KeyValue;
|
||||
use serde::Serialize;
|
||||
|
||||
/// Decodes a key-value pair into a string.
|
||||
pub fn decode_key_value(kv: KeyValue) -> CommonMetaResult<(String, String)> {
|
||||
let key = String::from_utf8_lossy(&kv.key).to_string();
|
||||
let value = String::from_utf8_lossy(&kv.value).to_string();
|
||||
Ok((key, value))
|
||||
}
|
||||
|
||||
/// Formats a value as a JSON string.
|
||||
pub fn json_fromatter<T>(pretty: bool, value: &T) -> String
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
if pretty {
|
||||
serde_json::to_string_pretty(value).unwrap()
|
||||
} else {
|
||||
serde_json::to_string(value).unwrap()
|
||||
}
|
||||
}
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::flow::{FlowRequest, FlowResponse};
|
||||
use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse};
|
||||
use api::v1::region::InsertRequests;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::node_manager::Flownode;
|
||||
@@ -44,6 +44,16 @@ impl Flownode for FlowRequester {
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_meta::error::ExternalSnafu)
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(
|
||||
&self,
|
||||
req: DirtyWindowRequest,
|
||||
) -> common_meta::error::Result<FlowResponse> {
|
||||
self.handle_mark_window_dirty(req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_meta::error::ExternalSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowRequester {
|
||||
@@ -91,4 +101,20 @@ impl FlowRequester {
|
||||
.into_inner();
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
|
||||
let (addr, mut client) = self.client.raw_flow_client()?;
|
||||
let response = client
|
||||
.handle_mark_dirty_time_window(DirtyWindowRequests {
|
||||
requests: vec![req],
|
||||
})
|
||||
.await
|
||||
.or_else(|e| {
|
||||
let code = e.code();
|
||||
let err: crate::error::Error = e.into();
|
||||
Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
|
||||
})?
|
||||
.into_inner();
|
||||
Ok(response)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -323,7 +323,6 @@ impl StartCommand {
|
||||
cached_meta_backend.clone(),
|
||||
layered_cache_registry.clone(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
let table_metadata_manager =
|
||||
|
||||
@@ -20,7 +20,6 @@ use async_trait::async_trait;
|
||||
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
|
||||
use catalog::information_extension::DistributedInformationExtension;
|
||||
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
|
||||
use catalog::process_manager::ProcessManager;
|
||||
use clap::Parser;
|
||||
use client::client_manager::NodeClients;
|
||||
use common_base::Plugins;
|
||||
@@ -39,7 +38,6 @@ use frontend::heartbeat::HeartbeatTask;
|
||||
use frontend::instance::builder::FrontendBuilder;
|
||||
use frontend::server::Services;
|
||||
use meta_client::{MetaClientOptions, MetaClientType};
|
||||
use servers::addrs;
|
||||
use servers::export_metrics::ExportMetricsTask;
|
||||
use servers::tls::{TlsMode, TlsOption};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -344,17 +342,11 @@ impl StartCommand {
|
||||
|
||||
let information_extension =
|
||||
Arc::new(DistributedInformationExtension::new(meta_client.clone()));
|
||||
|
||||
let process_manager = Arc::new(ProcessManager::new(
|
||||
addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
|
||||
Some(meta_client.clone()),
|
||||
));
|
||||
let catalog_manager = KvBackendCatalogManager::new(
|
||||
information_extension,
|
||||
cached_meta_backend.clone(),
|
||||
layered_cache_registry.clone(),
|
||||
None,
|
||||
Some(process_manager.clone()),
|
||||
);
|
||||
|
||||
let executor = HandlerGroupExecutor::new(vec![
|
||||
@@ -391,7 +383,6 @@ impl StartCommand {
|
||||
catalog_manager,
|
||||
Arc::new(client),
|
||||
meta_client,
|
||||
process_manager,
|
||||
)
|
||||
.with_plugin(plugins.clone())
|
||||
.with_local_cache_invalidator(layered_cache_registry)
|
||||
|
||||
@@ -21,7 +21,6 @@ use async_trait::async_trait;
|
||||
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
|
||||
use catalog::information_schema::InformationExtension;
|
||||
use catalog::kvbackend::KvBackendCatalogManager;
|
||||
use catalog::process_manager::ProcessManager;
|
||||
use clap::Parser;
|
||||
use client::api::v1::meta::RegionRole;
|
||||
use common_base::readable_size::ReadableSize;
|
||||
@@ -527,14 +526,11 @@ impl StartCommand {
|
||||
datanode.region_server(),
|
||||
procedure_manager.clone(),
|
||||
));
|
||||
|
||||
let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));
|
||||
let catalog_manager = KvBackendCatalogManager::new(
|
||||
information_extension.clone(),
|
||||
kv_backend.clone(),
|
||||
layered_cache_registry.clone(),
|
||||
Some(procedure_manager.clone()),
|
||||
Some(process_manager.clone()),
|
||||
);
|
||||
|
||||
let table_metadata_manager =
|
||||
@@ -624,7 +620,6 @@ impl StartCommand {
|
||||
catalog_manager.clone(),
|
||||
node_manager.clone(),
|
||||
ddl_task_executor.clone(),
|
||||
process_manager,
|
||||
)
|
||||
.with_plugin(plugins.clone())
|
||||
.try_build()
|
||||
@@ -652,7 +647,7 @@ impl StartCommand {
|
||||
node_manager,
|
||||
)
|
||||
.await
|
||||
.context(StartFlownodeSnafu)?;
|
||||
.context(error::StartFlownodeSnafu)?;
|
||||
flow_streaming_engine.set_frontend_invoker(invoker).await;
|
||||
|
||||
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
|
||||
|
||||
@@ -102,8 +102,6 @@ pub const INFORMATION_SCHEMA_FLOW_TABLE_ID: u32 = 33;
|
||||
pub const INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID: u32 = 34;
|
||||
/// id for information_schema.region_statistics
|
||||
pub const INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID: u32 = 35;
|
||||
/// id for information_schema.process_list
|
||||
pub const INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID: u32 = 36;
|
||||
|
||||
// ----- End of information_schema tables -----
|
||||
|
||||
|
||||
@@ -7,13 +7,5 @@ license.workspace = true
|
||||
[dependencies]
|
||||
async-trait.workspace = true
|
||||
common-error.workspace = true
|
||||
common-grpc.workspace = true
|
||||
common-macro.workspace = true
|
||||
common-meta.workspace = true
|
||||
greptime-proto.workspace = true
|
||||
meta-client.workspace = true
|
||||
snafu.workspace = true
|
||||
tonic.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tokio.workspace = true
|
||||
|
||||
@@ -27,35 +27,6 @@ pub enum Error {
|
||||
location: Location,
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to list nodes from metasrv"))]
|
||||
Meta {
|
||||
source: Box<meta_client::error::Error>,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to parse process id: {}", s))]
|
||||
ParseProcessId {
|
||||
s: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to invoke list process service"))]
|
||||
ListProcess {
|
||||
#[snafu(source)]
|
||||
error: tonic::Status,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to invoke list process service"))]
|
||||
CreateChannel {
|
||||
source: common_grpc::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -65,10 +36,6 @@ impl ErrorExt for Error {
|
||||
use Error::*;
|
||||
match self {
|
||||
External { source, .. } => source.status_code(),
|
||||
Meta { source, .. } => source.status_code(),
|
||||
ParseProcessId { .. } => StatusCode::InvalidArguments,
|
||||
ListProcess { .. } => StatusCode::External,
|
||||
CreateChannel { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,41 +12,4 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::str::FromStr;
|
||||
|
||||
use snafu::OptionExt;
|
||||
|
||||
pub mod error;
|
||||
pub mod selector;
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub struct DisplayProcessId {
|
||||
pub server_addr: String,
|
||||
pub id: u64,
|
||||
}
|
||||
|
||||
impl Display for DisplayProcessId {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}/{}", self.server_addr, self.id)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for DisplayProcessId {
|
||||
type Error = error::Error;
|
||||
|
||||
fn try_from(value: &str) -> Result<Self, Self::Error> {
|
||||
let mut split = value.split('/');
|
||||
let server_addr = split
|
||||
.next()
|
||||
.context(error::ParseProcessIdSnafu { s: value })?
|
||||
.to_string();
|
||||
let id = split
|
||||
.next()
|
||||
.context(error::ParseProcessIdSnafu { s: value })?;
|
||||
let id = u64::from_str(id)
|
||||
.ok()
|
||||
.context(error::ParseProcessIdSnafu { s: value })?;
|
||||
Ok(DisplayProcessId { server_addr, id })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,98 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
|
||||
use common_meta::cluster::{ClusterInfo, NodeInfo, Role};
|
||||
use greptime_proto::v1::frontend::{frontend_client, ListProcessRequest, ListProcessResponse};
|
||||
use meta_client::MetaClientRef;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error;
|
||||
use crate::error::{MetaSnafu, Result};
|
||||
|
||||
pub type FrontendClientPtr = Box<dyn FrontendClient>;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait FrontendClient: Send {
|
||||
async fn list_process(&mut self, req: ListProcessRequest) -> Result<ListProcessResponse>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl FrontendClient for frontend_client::FrontendClient<tonic::transport::channel::Channel> {
|
||||
async fn list_process(&mut self, req: ListProcessRequest) -> Result<ListProcessResponse> {
|
||||
let response: ListProcessResponse = frontend_client::FrontendClient::<
|
||||
tonic::transport::channel::Channel,
|
||||
>::list_process(self, req)
|
||||
.await
|
||||
.context(error::ListProcessSnafu)?
|
||||
.into_inner();
|
||||
Ok(response)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait FrontendSelector {
|
||||
async fn select<F>(&self, predicate: F) -> Result<Vec<FrontendClientPtr>>
|
||||
where
|
||||
F: Fn(&NodeInfo) -> bool + Send;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MetaClientSelector {
|
||||
meta_client: MetaClientRef,
|
||||
channel_manager: ChannelManager,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl FrontendSelector for MetaClientSelector {
|
||||
async fn select<F>(&self, predicate: F) -> Result<Vec<FrontendClientPtr>>
|
||||
where
|
||||
F: Fn(&NodeInfo) -> bool + Send,
|
||||
{
|
||||
let nodes = self
|
||||
.meta_client
|
||||
.list_nodes(Some(Role::Frontend))
|
||||
.await
|
||||
.map_err(Box::new)
|
||||
.context(MetaSnafu)?;
|
||||
|
||||
nodes
|
||||
.into_iter()
|
||||
.filter(predicate)
|
||||
.map(|node| {
|
||||
let channel = self
|
||||
.channel_manager
|
||||
.get(node.peer.addr)
|
||||
.context(error::CreateChannelSnafu)?;
|
||||
let client = frontend_client::FrontendClient::new(channel);
|
||||
Ok(Box::new(client) as FrontendClientPtr)
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()
|
||||
}
|
||||
}
|
||||
|
||||
impl MetaClientSelector {
|
||||
pub fn new(meta_client: MetaClientRef) -> Self {
|
||||
let cfg = ChannelConfig::new()
|
||||
.connect_timeout(Duration::from_secs(30))
|
||||
.timeout(Duration::from_secs(30));
|
||||
let channel_manager = ChannelManager::with_config(cfg);
|
||||
Self {
|
||||
meta_client,
|
||||
channel_manager,
|
||||
}
|
||||
}
|
||||
}
|
||||
90
src/common/function/src/adjust_flow.rs
Normal file
90
src/common/function/src/adjust_flow.rs
Normal file
@@ -0,0 +1,90 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_macro::admin_fn;
|
||||
use common_query::error::{
|
||||
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
|
||||
};
|
||||
use common_query::prelude::Signature;
|
||||
use datafusion::logical_expr::Volatility;
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ensure;
|
||||
use store_api::storage::ConcreteDataType;
|
||||
|
||||
use crate::handlers::FlowServiceHandlerRef;
|
||||
use crate::helper::parse_catalog_flow;
|
||||
|
||||
fn adjust_signature() -> Signature {
|
||||
Signature::exact(
|
||||
vec![
|
||||
ConcreteDataType::string_datatype(), // flow name
|
||||
ConcreteDataType::uint64_datatype(), // min_run_interval in seconds
|
||||
ConcreteDataType::uint64_datatype(), // max filter number per query
|
||||
],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
}
|
||||
|
||||
#[admin_fn(
|
||||
name = AdjustFlowFunction,
|
||||
display_name = adjust_flow,
|
||||
sig_fn = adjust_signature,
|
||||
ret = uint64
|
||||
)]
|
||||
pub(crate) async fn adjust_flow(
|
||||
flow_service_handler: &FlowServiceHandlerRef,
|
||||
query_ctx: &QueryContextRef,
|
||||
params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
ensure!(
|
||||
params.len() == 3,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect 3, have: {}",
|
||||
params.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let (flow_name, min_run_interval, max_filter_num) = match (params[0], params[1], params[2]) {
|
||||
(
|
||||
ValueRef::String(flow_name),
|
||||
ValueRef::UInt64(min_run_interval),
|
||||
ValueRef::UInt64(max_filter_num),
|
||||
) => (flow_name, min_run_interval, max_filter_num),
|
||||
_ => {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
function: "adjust_flow",
|
||||
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
};
|
||||
|
||||
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
|
||||
|
||||
let res = flow_service_handler
|
||||
.adjust(
|
||||
&catalog_name,
|
||||
&flow_name,
|
||||
min_run_interval,
|
||||
max_filter_num as usize,
|
||||
query_ctx.clone(),
|
||||
)
|
||||
.await?;
|
||||
let affected_rows = res.affected_rows;
|
||||
|
||||
Ok(Value::from(affected_rows))
|
||||
}
|
||||
@@ -26,6 +26,7 @@ use flush_compact_table::{CompactTableFunction, FlushTableFunction};
|
||||
use migrate_region::MigrateRegionFunction;
|
||||
use remove_region_follower::RemoveRegionFollowerFunction;
|
||||
|
||||
use crate::adjust_flow::AdjustFlowFunction;
|
||||
use crate::flush_flow::FlushFlowFunction;
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
@@ -43,5 +44,6 @@ impl AdminFunction {
|
||||
registry.register_async(Arc::new(FlushTableFunction));
|
||||
registry.register_async(Arc::new(CompactTableFunction));
|
||||
registry.register_async(Arc::new(FlushFlowFunction));
|
||||
registry.register_async(Arc::new(AdjustFlowFunction));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,21 +12,19 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_macro::admin_fn;
|
||||
use common_query::error::{
|
||||
ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result,
|
||||
UnsupportedInputDataTypeSnafu,
|
||||
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
|
||||
};
|
||||
use common_query::prelude::Signature;
|
||||
use datafusion::logical_expr::Volatility;
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use sql::parser::ParserContext;
|
||||
use snafu::ensure;
|
||||
use store_api::storage::ConcreteDataType;
|
||||
|
||||
use crate::handlers::FlowServiceHandlerRef;
|
||||
use crate::helper::parse_catalog_flow;
|
||||
|
||||
fn flush_signature() -> Signature {
|
||||
Signature::uniform(
|
||||
@@ -47,20 +45,6 @@ pub(crate) async fn flush_flow(
|
||||
query_ctx: &QueryContextRef,
|
||||
params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;
|
||||
|
||||
let res = flow_service_handler
|
||||
.flush(&catalog_name, &flow_name, query_ctx.clone())
|
||||
.await?;
|
||||
let affected_rows = res.affected_rows;
|
||||
|
||||
Ok(Value::from(affected_rows))
|
||||
}
|
||||
|
||||
fn parse_flush_flow(
|
||||
params: &[ValueRef<'_>],
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<(String, String)> {
|
||||
ensure!(
|
||||
params.len() == 1,
|
||||
InvalidFuncArgsSnafu {
|
||||
@@ -70,7 +54,6 @@ fn parse_flush_flow(
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let ValueRef::String(flow_name) = params[0] else {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
function: "flush_flow",
|
||||
@@ -78,27 +61,14 @@ fn parse_flush_flow(
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExecuteSnafu)?;
|
||||
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
|
||||
|
||||
let (catalog_name, flow_name) = match &obj_name.0[..] {
|
||||
[flow_name] => (
|
||||
query_ctx.current_catalog().to_string(),
|
||||
flow_name.value.clone(),
|
||||
),
|
||||
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
|
||||
_ => {
|
||||
return InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
|
||||
obj_name
|
||||
),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
};
|
||||
Ok((catalog_name, flow_name))
|
||||
let res = flow_service_handler
|
||||
.flush(&catalog_name, &flow_name, query_ctx.clone())
|
||||
.await?;
|
||||
let affected_rows = res.affected_rows;
|
||||
|
||||
Ok(Value::from(affected_rows))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -154,10 +124,7 @@ mod test {
|
||||
("catalog.flow_name", ("catalog", "flow_name")),
|
||||
];
|
||||
for (input, expected) in testcases.iter() {
|
||||
let args = vec![*input];
|
||||
let args = args.into_iter().map(ValueRef::String).collect::<Vec<_>>();
|
||||
|
||||
let result = parse_flush_flow(&args, &QueryContext::arc()).unwrap();
|
||||
let result = parse_catalog_flow(input, &QueryContext::arc()).unwrap();
|
||||
assert_eq!(*expected, (result.0.as_str(), result.1.as_str()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,6 +87,15 @@ pub trait FlowServiceHandler: Send + Sync {
|
||||
flow: &str,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse>;
|
||||
|
||||
async fn adjust(
|
||||
&self,
|
||||
catalog: &str,
|
||||
flow: &str,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse>;
|
||||
}
|
||||
|
||||
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
|
||||
|
||||
@@ -12,12 +12,15 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_query::error::{InvalidInputTypeSnafu, Result};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_query::error::{ExecuteSnafu, InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result};
|
||||
use common_query::prelude::{Signature, TypeSignature, Volatility};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::types::cast::cast;
|
||||
use datatypes::value::ValueRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ResultExt;
|
||||
use sql::parser::ParserContext;
|
||||
|
||||
/// Create a function signature with oneof signatures of interleaving two arguments.
|
||||
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
|
||||
@@ -43,3 +46,30 @@ pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
|
||||
})
|
||||
.map(|v| v.as_u64())
|
||||
}
|
||||
|
||||
pub fn parse_catalog_flow(
|
||||
flow_name: &str,
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<(String, String)> {
|
||||
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExecuteSnafu)?;
|
||||
|
||||
let (catalog_name, flow_name) = match &obj_name.0[..] {
|
||||
[flow_name] => (
|
||||
query_ctx.current_catalog().to_string(),
|
||||
flow_name.value.clone(),
|
||||
),
|
||||
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
|
||||
_ => {
|
||||
return InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
|
||||
obj_name
|
||||
),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
};
|
||||
Ok((catalog_name, flow_name))
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#![feature(let_chains)]
|
||||
#![feature(try_blocks)]
|
||||
|
||||
mod adjust_flow;
|
||||
mod admin;
|
||||
mod flush_flow;
|
||||
mod macros;
|
||||
|
||||
@@ -148,6 +148,17 @@ impl FunctionState {
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn adjust(
|
||||
&self,
|
||||
_catalog: &str,
|
||||
_flow: &str,
|
||||
_min_run_interval_secs: u64,
|
||||
_max_filter_num_per_query: usize,
|
||||
_ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
Self {
|
||||
|
||||
@@ -1001,7 +1001,7 @@ impl ErrorExt for Error {
|
||||
}
|
||||
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
|
||||
RdsTransactionRetryFailed { .. } => StatusCode::Internal,
|
||||
DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
|
||||
Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -70,12 +70,11 @@ impl MetadataKey<'_, ViewInfoKey> for ViewInfoKey {
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
let captures =
|
||||
VIEW_INFO_KEY_PATTERN
|
||||
.captures(key)
|
||||
.with_context(|| InvalidViewInfoSnafu {
|
||||
err_msg: format!("Invalid ViewInfoKey '{key}'"),
|
||||
})?;
|
||||
let captures = VIEW_INFO_KEY_PATTERN
|
||||
.captures(key)
|
||||
.context(InvalidViewInfoSnafu {
|
||||
err_msg: format!("Invalid ViewInfoKey '{key}'"),
|
||||
})?;
|
||||
// Safety: pass the regex check above
|
||||
let view_id = captures[1].parse::<TableId>().unwrap();
|
||||
Ok(ViewInfoKey { view_id })
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::region::RegionResponse;
|
||||
use api::v1::flow::{FlowRequest, FlowResponse};
|
||||
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
|
||||
use api::v1::region::{InsertRequests, RegionRequest};
|
||||
pub use common_base::AffectedRows;
|
||||
use common_query::request::QueryRequest;
|
||||
@@ -42,6 +42,9 @@ pub trait Flownode: Send + Sync {
|
||||
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;
|
||||
|
||||
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>;
|
||||
|
||||
/// Handles requests to mark time window as dirty.
|
||||
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse>;
|
||||
}
|
||||
|
||||
pub type FlownodeRef = Arc<dyn Flownode>;
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::region::RegionResponse;
|
||||
use api::v1::flow::{FlowRequest, FlowResponse};
|
||||
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
|
||||
use api::v1::region::{InsertRequests, RegionRequest};
|
||||
pub use common_base::AffectedRows;
|
||||
use common_query::request::QueryRequest;
|
||||
@@ -67,6 +67,14 @@ pub trait MockFlownodeHandler: Sync + Send + Clone {
|
||||
) -> Result<FlowResponse> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(
|
||||
&self,
|
||||
_peer: &Peer,
|
||||
_req: DirtyWindowRequest,
|
||||
) -> Result<FlowResponse> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
/// A mock struct implements [NodeManager] only implement the `datanode` method.
|
||||
@@ -134,6 +142,10 @@ impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
|
||||
async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
|
||||
self.handler.handle_inserts(&self.peer, requests).await
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
|
||||
self.handler.handle_mark_window_dirty(&self.peer, req).await
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
|
||||
@@ -326,14 +326,6 @@ impl Value {
|
||||
}
|
||||
}
|
||||
|
||||
/// Cast Value to [Duration]. Return None if value is not a valid duration data type.
|
||||
pub fn as_duration(&self) -> Option<Duration> {
|
||||
match self {
|
||||
Value::Duration(d) => Some(*d),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the logical type of the value.
|
||||
pub fn logical_type_id(&self) -> LogicalTypeId {
|
||||
match self {
|
||||
|
||||
@@ -61,6 +61,7 @@ prost.workspace = true
|
||||
query.workspace = true
|
||||
rand.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
servers.workspace = true
|
||||
session.workspace = true
|
||||
smallvec.workspace = true
|
||||
|
||||
@@ -316,7 +316,7 @@ impl StreamingEngine {
|
||||
);
|
||||
|
||||
METRIC_FLOW_ROWS
|
||||
.with_label_values(&["out"])
|
||||
.with_label_values(&["out-streaming"])
|
||||
.inc_by(total_rows as u64);
|
||||
|
||||
let now = self.tick_manager.tick();
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::sync::atomic::AtomicBool;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::flow::{
|
||||
flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
|
||||
flow_request, AdjustFlow, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
|
||||
};
|
||||
use api::v1::region::InsertRequests;
|
||||
use catalog::CatalogManager;
|
||||
@@ -31,7 +31,9 @@ use common_runtime::JoinHandle;
|
||||
use common_telemetry::{error, info, trace, warn};
|
||||
use datatypes::value::Value;
|
||||
use futures::TryStreamExt;
|
||||
use greptime_proto::v1::flow::DirtyWindowRequest;
|
||||
use itertools::Itertools;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use session::context::QueryContextBuilder;
|
||||
use snafu::{ensure, IntoError, OptionExt, ResultExt};
|
||||
use store_api::storage::{RegionId, TableId};
|
||||
@@ -46,7 +48,7 @@ use crate::error::{
|
||||
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
|
||||
NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metrics::METRIC_FLOW_TASK_COUNT;
|
||||
use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
|
||||
use crate::repr::{self, DiffRow};
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
@@ -689,6 +691,9 @@ impl FlowEngine for FlowDualEngine {
|
||||
let mut to_stream_engine = Vec::with_capacity(request.requests.len());
|
||||
let mut to_batch_engine = request.requests;
|
||||
|
||||
let mut batching_row_cnt = 0;
|
||||
let mut streaming_row_cnt = 0;
|
||||
|
||||
{
|
||||
// not locking this, or recover flows will be starved when also handling flow inserts
|
||||
let src_table2flow = self.src_table2flow.read().await;
|
||||
@@ -698,9 +703,11 @@ impl FlowEngine for FlowDualEngine {
|
||||
let is_in_stream = src_table2flow.in_stream(table_id);
|
||||
let is_in_batch = src_table2flow.in_batch(table_id);
|
||||
if is_in_stream {
|
||||
streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
|
||||
to_stream_engine.push(req.clone());
|
||||
}
|
||||
if is_in_batch {
|
||||
batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
|
||||
return true;
|
||||
}
|
||||
if !is_in_batch && !is_in_stream {
|
||||
@@ -713,6 +720,14 @@ impl FlowEngine for FlowDualEngine {
|
||||
// can't use drop due to https://github.com/rust-lang/rust/pull/128846
|
||||
}
|
||||
|
||||
METRIC_FLOW_ROWS
|
||||
.with_label_values(&["in-streaming"])
|
||||
.inc_by(streaming_row_cnt as u64);
|
||||
|
||||
METRIC_FLOW_ROWS
|
||||
.with_label_values(&["in-batching"])
|
||||
.inc_by(batching_row_cnt as u64);
|
||||
|
||||
let streaming_engine = self.streaming_engine.clone();
|
||||
let stream_handler: JoinHandle<Result<(), Error>> =
|
||||
common_runtime::spawn_global(async move {
|
||||
@@ -809,6 +824,25 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
Some(flow_request::Body::Adjust(AdjustFlow { flow_id, options })) => {
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct Options {
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
}
|
||||
let options: Options = serde_json::from_str(&options).with_context(|_| {
|
||||
common_meta::error::DeserializeFromJsonSnafu { input: options }
|
||||
})?;
|
||||
self.batching_engine
|
||||
.adjust_flow(
|
||||
flow_id.unwrap().id as u64,
|
||||
options.min_run_interval_secs,
|
||||
options.max_filter_num_per_query,
|
||||
)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
Ok(Default::default())
|
||||
}
|
||||
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
|
||||
}
|
||||
}
|
||||
@@ -819,6 +853,11 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
|
||||
.map(|_| Default::default())
|
||||
.map_err(to_meta_err(snafu::location!()))
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
|
||||
// todo: implement
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
|
||||
@@ -926,6 +965,11 @@ impl common_meta::node_manager::Flownode for StreamingEngine {
|
||||
.map(|_| Default::default())
|
||||
.map_err(to_meta_err(snafu::location!()))
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
|
||||
// todo: implement
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowEngine for StreamingEngine {
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::flow::{DirtyWindowRequests, FlowResponse};
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::ddl::create_flow::FlowType;
|
||||
@@ -29,8 +30,7 @@ use common_telemetry::{debug, info};
|
||||
use common_time::TimeToLive;
|
||||
use query::QueryEngineRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
use table::metadata::TableId;
|
||||
use store_api::storage::{RegionId, TableId};
|
||||
use tokio::sync::{oneshot, RwLock};
|
||||
|
||||
use crate::batching_mode::frontend_client::FrontendClient;
|
||||
@@ -42,6 +42,7 @@ use crate::error::{
|
||||
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
|
||||
UnexpectedSnafu, UnsupportedSnafu,
|
||||
};
|
||||
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE;
|
||||
use crate::{CreateFlowArgs, Error, FlowId, TableName};
|
||||
|
||||
/// Batching mode Engine, responsible for driving all the batching mode tasks
|
||||
@@ -77,6 +78,122 @@ impl BatchingEngine {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle_mark_dirty_time_window(
|
||||
&self,
|
||||
reqs: DirtyWindowRequests,
|
||||
) -> Result<FlowResponse, Error> {
|
||||
let table_info_mgr = self.table_meta.table_info_manager();
|
||||
|
||||
let mut group_by_table_id: HashMap<u32, Vec<_>> = HashMap::new();
|
||||
for r in reqs.requests {
|
||||
let tid = TableId::from(r.table_id);
|
||||
let entry = group_by_table_id.entry(tid).or_default();
|
||||
entry.extend(r.dirty_time_ranges);
|
||||
}
|
||||
let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
|
||||
let table_infos =
|
||||
table_info_mgr
|
||||
.batch_get(&tids)
|
||||
.await
|
||||
.with_context(|_| TableNotFoundMetaSnafu {
|
||||
msg: format!("Failed to get table info for table ids: {:?}", tids),
|
||||
})?;
|
||||
|
||||
let group_by_table_name = group_by_table_id
|
||||
.into_iter()
|
||||
.filter_map(|(id, rows)| {
|
||||
let table_name = table_infos.get(&id).map(|info| info.table_name());
|
||||
let Some(table_name) = table_name else {
|
||||
warn!("Failed to get table infos for table id: {:?}", id);
|
||||
return None;
|
||||
};
|
||||
let table_name = [
|
||||
table_name.catalog_name,
|
||||
table_name.schema_name,
|
||||
table_name.table_name,
|
||||
];
|
||||
let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
|
||||
let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
|
||||
.data_type
|
||||
.as_timestamp()
|
||||
.unwrap()
|
||||
.unit();
|
||||
Some((table_name, (rows, time_index_unit)))
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
let group_by_table_name = Arc::new(group_by_table_name);
|
||||
|
||||
let mut handles = Vec::new();
|
||||
let tasks = self.tasks.read().await;
|
||||
|
||||
for (_flow_id, task) in tasks.iter() {
|
||||
let src_table_names = &task.config.source_table_names;
|
||||
|
||||
if src_table_names
|
||||
.iter()
|
||||
.all(|name| !group_by_table_name.contains_key(name))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
let group_by_table_name = group_by_table_name.clone();
|
||||
let task = task.clone();
|
||||
|
||||
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
|
||||
let src_table_names = &task.config.source_table_names;
|
||||
let mut all_dirty_windows = vec![];
|
||||
for src_table_name in src_table_names {
|
||||
if let Some((window_ranges, unit)) = group_by_table_name.get(src_table_name) {
|
||||
let Some(expr) = &task.config.time_window_expr else {
|
||||
continue;
|
||||
};
|
||||
for window in window_ranges {
|
||||
let align_start = expr
|
||||
.eval(common_time::Timestamp::new(window.start_value, *unit))?
|
||||
.0
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "Failed to eval start value",
|
||||
})?;
|
||||
|
||||
let align_end = expr
|
||||
.eval(common_time::Timestamp::new(window.end_value, *unit))?
|
||||
.1
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "Failed to eval end value",
|
||||
})?;
|
||||
all_dirty_windows.push((align_start, align_end));
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut state = task.state.write().unwrap();
|
||||
let flow_id_label = task.config.flow_id.to_string();
|
||||
for (s, e) in all_dirty_windows {
|
||||
METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE
|
||||
.with_label_values(&[&flow_id_label])
|
||||
.observe(e.sub(&s).unwrap_or_default().num_seconds() as f64);
|
||||
state.dirty_time_windows.add_window(s, Some(e));
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
handles.push(handle);
|
||||
}
|
||||
drop(tasks);
|
||||
for handle in handles {
|
||||
match handle.await {
|
||||
Err(e) => {
|
||||
warn!("Failed to handle inserts: {e}");
|
||||
}
|
||||
Ok(Ok(())) => (),
|
||||
Ok(Err(e)) => {
|
||||
warn!("Failed to handle inserts: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Default::default())
|
||||
}
|
||||
|
||||
pub async fn handle_inserts_inner(
|
||||
&self,
|
||||
request: api::v1::region::InsertRequests,
|
||||
@@ -388,6 +505,20 @@ impl BatchingEngine {
|
||||
pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
|
||||
self.tasks.read().await.contains_key(&flow_id)
|
||||
}
|
||||
|
||||
pub async fn adjust_flow(
|
||||
&self,
|
||||
flow_id: FlowId,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
) -> Result<(), Error> {
|
||||
let task = self.tasks.read().await.get(&flow_id).cloned();
|
||||
let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
|
||||
debug!("Adjusting flow {flow_id} with min_run_interval_secs={} and max_filter_num_per_query={}", min_run_interval_secs, max_filter_num_per_query);
|
||||
task.adjust(min_run_interval_secs, max_filter_num_per_query);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowEngine for BatchingEngine {
|
||||
|
||||
@@ -14,8 +14,9 @@
|
||||
|
||||
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
|
||||
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::SystemTime;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use api::v1::greptime_request::Request;
|
||||
use api::v1::CreateTableExpr;
|
||||
@@ -26,20 +27,21 @@ use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::rpc::store::RangeRequest;
|
||||
use common_query::Output;
|
||||
use common_telemetry::warn;
|
||||
use common_telemetry::{debug, warn};
|
||||
use itertools::Itertools;
|
||||
use meta_client::client::MetaClient;
|
||||
use rand::rng;
|
||||
use rand::seq::SliceRandom;
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::batching_mode::task::BatchingTask;
|
||||
use crate::batching_mode::{
|
||||
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
|
||||
GRPC_MAX_RETRIES,
|
||||
};
|
||||
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
|
||||
use crate::{Error, FlowAuthHeader};
|
||||
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD;
|
||||
use crate::{Error, FlowAuthHeader, FlowId};
|
||||
|
||||
/// Just like [`GrpcQueryHandler`] but use BoxedError
|
||||
///
|
||||
@@ -74,6 +76,105 @@ impl<
|
||||
|
||||
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
|
||||
|
||||
/// Statistics about running query on this frontend from flownode
|
||||
#[derive(Debug, Default, Clone)]
|
||||
struct FrontendStat {
|
||||
/// The query for flow id has been running since this timestamp
|
||||
since: HashMap<FlowId, Instant>,
|
||||
/// The average query time for each flow id
|
||||
/// This is used to calculate the average query time for each flow id
|
||||
past_query_avg: HashMap<FlowId, (usize, Duration)>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct FrontendStats {
|
||||
/// The statistics for each flow id
|
||||
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
|
||||
}
|
||||
|
||||
impl FrontendStats {
|
||||
pub fn observe(&self, frontend_addr: &str, flow_id: FlowId) -> FrontendStatsGuard {
|
||||
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
|
||||
let stat = stats.entry(frontend_addr.to_string()).or_default();
|
||||
stat.since.insert(flow_id, Instant::now());
|
||||
|
||||
FrontendStatsGuard {
|
||||
stats: self.stats.clone(),
|
||||
frontend_addr: frontend_addr.to_string(),
|
||||
cur: flow_id,
|
||||
}
|
||||
}
|
||||
|
||||
/// return frontend addrs sorted by load, from lightest to heaviest
|
||||
/// The load is calculated as the total average query time for each flow id plus running query's total running time elapsed
|
||||
pub fn sort_by_load(&self) -> Vec<String> {
|
||||
let stats = self.stats.lock().expect("Failed to lock frontend stats");
|
||||
let fe_load_factor = stats
|
||||
.iter()
|
||||
.map(|(node_addr, stat)| {
|
||||
// total expected avg running time for all currently running queries
|
||||
let total_expect_avg_run_time = stat
|
||||
.since
|
||||
.keys()
|
||||
.map(|f| {
|
||||
let (count, total_duration) =
|
||||
stat.past_query_avg.get(f).unwrap_or(&(0, Duration::ZERO));
|
||||
if *count == 0 {
|
||||
0.0
|
||||
} else {
|
||||
total_duration.as_secs_f64() / *count as f64
|
||||
}
|
||||
})
|
||||
.sum::<f64>();
|
||||
let total_cur_running_time = stat
|
||||
.since
|
||||
.values()
|
||||
.map(|since| since.elapsed().as_secs_f64())
|
||||
.sum::<f64>();
|
||||
(
|
||||
node_addr.to_string(),
|
||||
total_expect_avg_run_time + total_cur_running_time,
|
||||
)
|
||||
})
|
||||
.sorted_by(|(_, load_a), (_, load_b)| {
|
||||
load_a
|
||||
.partial_cmp(load_b)
|
||||
.unwrap_or(std::cmp::Ordering::Equal)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
debug!("Frontend load factor: {:?}", fe_load_factor);
|
||||
for (node_addr, load) in &fe_load_factor {
|
||||
METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD
|
||||
.with_label_values(&[&node_addr.to_string()])
|
||||
.observe(*load);
|
||||
}
|
||||
fe_load_factor
|
||||
.into_iter()
|
||||
.map(|(addr, _)| addr)
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FrontendStatsGuard {
|
||||
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
|
||||
frontend_addr: String,
|
||||
cur: FlowId,
|
||||
}
|
||||
|
||||
impl Drop for FrontendStatsGuard {
|
||||
fn drop(&mut self) {
|
||||
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
|
||||
if let Some(stat) = stats.get_mut(&self.frontend_addr) {
|
||||
if let Some(since) = stat.since.remove(&self.cur) {
|
||||
let elapsed = since.elapsed();
|
||||
let (count, total_duration) = stat.past_query_avg.entry(self.cur).or_default();
|
||||
*count += 1;
|
||||
*total_duration += elapsed;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A simple frontend client able to execute sql using grpc protocol
|
||||
///
|
||||
/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
|
||||
@@ -83,6 +184,7 @@ pub enum FrontendClient {
|
||||
meta_client: Arc<MetaClient>,
|
||||
chnl_mgr: ChannelManager,
|
||||
auth: Option<FlowAuthHeader>,
|
||||
fe_stats: FrontendStats,
|
||||
},
|
||||
Standalone {
|
||||
/// for the sake of simplicity still use grpc even in standalone mode
|
||||
@@ -114,6 +216,7 @@ impl FrontendClient {
|
||||
ChannelManager::with_config(cfg)
|
||||
},
|
||||
auth,
|
||||
fe_stats: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -192,6 +295,7 @@ impl FrontendClient {
|
||||
meta_client: _,
|
||||
chnl_mgr,
|
||||
auth,
|
||||
fe_stats,
|
||||
} = self
|
||||
else {
|
||||
return UnexpectedSnafu {
|
||||
@@ -208,8 +312,21 @@ impl FrontendClient {
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis() as i64;
|
||||
// shuffle the frontends to avoid always pick the same one
|
||||
frontends.shuffle(&mut rng());
|
||||
let node_addrs_by_load = fe_stats.sort_by_load();
|
||||
// index+1 to load order asc, so that the lightest node has load 1 and non-existent node has load 0
|
||||
let addr2load = node_addrs_by_load
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, id)| (id.clone(), i + 1))
|
||||
.collect::<HashMap<_, _>>();
|
||||
// sort frontends by load, from lightest to heaviest
|
||||
frontends.sort_by(|(_, a), (_, b)| {
|
||||
// if not even in stats, treat as 0 load since never been queried
|
||||
let load_a = addr2load.get(&a.peer.addr).unwrap_or(&0);
|
||||
let load_b = addr2load.get(&b.peer.addr).unwrap_or(&0);
|
||||
load_a.cmp(load_b)
|
||||
});
|
||||
debug!("Frontend nodes sorted by load: {:?}", frontends);
|
||||
|
||||
// found node with maximum last_activity_ts
|
||||
for (_, node_info) in frontends
|
||||
@@ -257,6 +374,7 @@ impl FrontendClient {
|
||||
create: CreateTableExpr,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
task: Option<&BatchingTask>,
|
||||
) -> Result<u32, Error> {
|
||||
self.handle(
|
||||
Request::Ddl(api::v1::DdlRequest {
|
||||
@@ -265,6 +383,7 @@ impl FrontendClient {
|
||||
catalog,
|
||||
schema,
|
||||
&mut None,
|
||||
task,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -276,15 +395,19 @@ impl FrontendClient {
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
peer_desc: &mut Option<PeerDesc>,
|
||||
task: Option<&BatchingTask>,
|
||||
) -> Result<u32, Error> {
|
||||
match self {
|
||||
FrontendClient::Distributed { .. } => {
|
||||
FrontendClient::Distributed { fe_stats, .. } => {
|
||||
let db = self.get_random_active_frontend(catalog, schema).await?;
|
||||
|
||||
*peer_desc = Some(PeerDesc::Dist {
|
||||
peer: db.peer.clone(),
|
||||
});
|
||||
|
||||
let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default();
|
||||
let _guard = fe_stats.observe(&db.peer.addr, flow_id);
|
||||
|
||||
db.database
|
||||
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
|
||||
.await
|
||||
|
||||
@@ -31,7 +31,9 @@ use crate::batching_mode::time_window::TimeWindowExpr;
|
||||
use crate::batching_mode::MIN_REFRESH_DURATION;
|
||||
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
|
||||
use crate::metrics::{
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE,
|
||||
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT,
|
||||
METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE,
|
||||
};
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
@@ -52,6 +54,11 @@ pub struct TaskState {
|
||||
pub(crate) shutdown_rx: oneshot::Receiver<()>,
|
||||
/// Task handle
|
||||
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
|
||||
|
||||
/// min run interval in seconds
|
||||
pub(crate) min_run_interval: Option<u64>,
|
||||
/// max filter number per query
|
||||
pub(crate) max_filter_num: Option<usize>,
|
||||
}
|
||||
impl TaskState {
|
||||
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
|
||||
@@ -63,6 +70,8 @@ impl TaskState {
|
||||
exec_state: ExecState::Idle,
|
||||
shutdown_rx,
|
||||
task_handle: None,
|
||||
min_run_interval: None,
|
||||
max_filter_num: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,24 +95,21 @@ impl TaskState {
|
||||
/// TODO: Make this behavior configurable.
|
||||
pub fn get_next_start_query_time(
|
||||
&self,
|
||||
flow_id: FlowId,
|
||||
time_window_size: &Option<Duration>,
|
||||
_flow_id: FlowId,
|
||||
_time_window_size: &Option<Duration>,
|
||||
max_timeout: Option<Duration>,
|
||||
) -> Instant {
|
||||
let last_duration = max_timeout
|
||||
let next_duration = max_timeout
|
||||
.unwrap_or(self.last_query_duration)
|
||||
.min(self.last_query_duration)
|
||||
.max(MIN_REFRESH_DURATION);
|
||||
|
||||
let next_duration = time_window_size
|
||||
.map(|t| {
|
||||
let half = t / 2;
|
||||
half.max(last_duration)
|
||||
})
|
||||
.unwrap_or(last_duration);
|
||||
.max(
|
||||
self.min_run_interval
|
||||
.map(Duration::from_secs)
|
||||
.unwrap_or(MIN_REFRESH_DURATION),
|
||||
);
|
||||
|
||||
// if have dirty time window, execute immediately to clean dirty time window
|
||||
if self.dirty_time_windows.windows.is_empty() {
|
||||
/*if self.dirty_time_windows.windows.is_empty() {
|
||||
self.last_update_time + next_duration
|
||||
} else {
|
||||
debug!(
|
||||
@@ -113,7 +119,10 @@ impl TaskState {
|
||||
self.dirty_time_windows.windows
|
||||
);
|
||||
Instant::now()
|
||||
}
|
||||
}*/
|
||||
|
||||
// wait for next duration anyway
|
||||
self.last_update_time + next_duration
|
||||
}
|
||||
}
|
||||
|
||||
@@ -206,47 +215,63 @@ impl DirtyTimeWindows {
|
||||
|
||||
// get the first `window_cnt` time windows
|
||||
let max_time_range = window_size * window_cnt as i32;
|
||||
let nth = {
|
||||
let mut cur_time_range = chrono::Duration::zero();
|
||||
let mut nth_key = None;
|
||||
for (idx, (start, end)) in self.windows.iter().enumerate() {
|
||||
// if time range is too long, stop
|
||||
if cur_time_range > max_time_range {
|
||||
nth_key = Some(*start);
|
||||
break;
|
||||
}
|
||||
|
||||
// if we have enough time windows, stop
|
||||
if idx >= window_cnt {
|
||||
nth_key = Some(*start);
|
||||
break;
|
||||
}
|
||||
let mut to_be_query = BTreeMap::new();
|
||||
let mut new_windows = self.windows.clone();
|
||||
let mut cur_time_range = chrono::Duration::zero();
|
||||
for (idx, (start, end)) in self.windows.iter().enumerate() {
|
||||
let first_end = start
|
||||
.add_duration(window_size.to_std().unwrap())
|
||||
.context(TimeSnafu)?;
|
||||
let end = end.unwrap_or(first_end);
|
||||
|
||||
if let Some(end) = end {
|
||||
if let Some(x) = end.sub(start) {
|
||||
cur_time_range += x;
|
||||
}
|
||||
}
|
||||
// if time range is too long, stop
|
||||
if cur_time_range >= max_time_range {
|
||||
break;
|
||||
}
|
||||
|
||||
nth_key
|
||||
};
|
||||
let first_nth = {
|
||||
if let Some(nth) = nth {
|
||||
let mut after = self.windows.split_off(&nth);
|
||||
std::mem::swap(&mut self.windows, &mut after);
|
||||
|
||||
after
|
||||
} else {
|
||||
std::mem::take(&mut self.windows)
|
||||
// if we have enough time windows, stop
|
||||
if idx >= window_cnt {
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(x) = end.sub(start) {
|
||||
if cur_time_range + x <= max_time_range {
|
||||
to_be_query.insert(*start, Some(end));
|
||||
new_windows.remove(start);
|
||||
cur_time_range += x;
|
||||
} else {
|
||||
// too large a window, split it
|
||||
// split at window_size * times
|
||||
let surplus = max_time_range - cur_time_range;
|
||||
let times = surplus.num_seconds() / window_size.num_seconds();
|
||||
|
||||
let split_offset = window_size * times as i32;
|
||||
let split_at = start
|
||||
.add_duration(split_offset.to_std().unwrap())
|
||||
.context(TimeSnafu)?;
|
||||
to_be_query.insert(*start, Some(split_at));
|
||||
|
||||
// remove the original window
|
||||
new_windows.remove(start);
|
||||
new_windows.insert(split_at, Some(end));
|
||||
cur_time_range += split_offset;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.windows = new_windows;
|
||||
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.observe(first_nth.len() as f64);
|
||||
.observe(to_be_query.len() as f64);
|
||||
|
||||
let full_time_range = first_nth
|
||||
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.observe(self.windows.len() as f64);
|
||||
|
||||
let full_time_range = to_be_query
|
||||
.iter()
|
||||
.fold(chrono::Duration::zero(), |acc, (start, end)| {
|
||||
if let Some(end) = end {
|
||||
@@ -256,12 +281,27 @@ impl DirtyTimeWindows {
|
||||
}
|
||||
})
|
||||
.num_seconds() as f64;
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.observe(full_time_range);
|
||||
|
||||
let stalled_time_range =
|
||||
self.windows
|
||||
.iter()
|
||||
.fold(chrono::Duration::zero(), |acc, (start, end)| {
|
||||
if let Some(end) = end {
|
||||
acc + end.sub(start).unwrap_or(chrono::Duration::zero())
|
||||
} else {
|
||||
acc
|
||||
}
|
||||
});
|
||||
|
||||
METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.observe(stalled_time_range.num_seconds() as f64);
|
||||
|
||||
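The loop below turns each queried window into a timestamp range predicate on the time index column and ORs them together, which is what the test expectations further down assert. A rough sketch of that combination step, with the column name `ts` and the exact string formatting taken as assumptions:

```rust
// Combine per-window time ranges into one SQL predicate.
fn ranges_to_filter(col: &str, ranges: &[(&str, &str)]) -> String {
    let parts: Vec<String> = ranges
        .iter()
        .map(|(start, end)| {
            format!(
                "(({col} >= CAST('{start}' AS TIMESTAMP)) AND ({col} < CAST('{end}' AS TIMESTAMP)))"
            )
        })
        .collect();
    if parts.len() == 1 {
        parts.into_iter().next().unwrap()
    } else {
        format!("({})", parts.join(" OR "))
    }
}

fn main() {
    let filter = ranges_to_filter(
        "ts",
        &[
            ("1970-01-01 00:00:00", "1970-01-01 00:00:03"),
            ("1970-01-01 00:00:20", "1970-01-01 00:01:17"),
        ],
    );
    println!("{filter}");
}
```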
let mut expr_lst = vec![];
|
||||
for (start, end) in first_nth.into_iter() {
|
||||
for (start, end) in to_be_query.into_iter() {
|
||||
// align using time window exprs
|
||||
let (start, end) = if let Some(ctx) = task_ctx {
|
||||
let Some(time_window_expr) = &ctx.config.time_window_expr else {
|
||||
@@ -495,6 +535,64 @@ mod test {
|
||||
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
|
||||
)
|
||||
),
|
||||
// split range
|
||||
(
|
||||
Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once(
|
||||
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
|
||||
))),
|
||||
(chrono::Duration::seconds(3), None),
|
||||
BTreeMap::from([
|
||||
(
|
||||
Timestamp::new_second(0),
|
||||
Some(Timestamp::new_second(
|
||||
60
|
||||
)),
|
||||
),
|
||||
(
|
||||
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
|
||||
Some(Timestamp::new_second(
|
||||
60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3
|
||||
)),
|
||||
)]),
|
||||
Some(
|
||||
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
|
||||
)
|
||||
),
|
||||
// split 2 min into 1 min
|
||||
(
|
||||
Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))),
|
||||
(chrono::Duration::seconds(3), None),
|
||||
BTreeMap::from([
|
||||
(
|
||||
Timestamp::new_second(0),
|
||||
Some(Timestamp::new_second(
|
||||
40 * 3
|
||||
)),
|
||||
)]),
|
||||
Some(
|
||||
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
|
||||
)
|
||||
),
|
||||
// split 3s + 1min into 3s + 57s
|
||||
(
|
||||
Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))),
|
||||
(chrono::Duration::seconds(3), None),
|
||||
BTreeMap::from([
|
||||
(
|
||||
Timestamp::new_second(0),
|
||||
Some(Timestamp::new_second(
|
||||
3
|
||||
)),
|
||||
),(
|
||||
Timestamp::new_second(20),
|
||||
Some(Timestamp::new_second(
|
||||
140
|
||||
)),
|
||||
)]),
|
||||
Some(
|
||||
"(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
|
||||
)
|
||||
),
|
||||
// expired
|
||||
(
|
||||
vec![
|
||||
@@ -511,6 +609,8 @@ mod test {
|
||||
None
|
||||
),
|
||||
];
|
||||
// let len = testcases.len();
|
||||
// let testcases = testcases[(len - 2)..(len - 1)].to_vec();
|
||||
for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
|
||||
testcases
|
||||
{
|
||||
|
||||
@@ -61,7 +61,9 @@ use crate::error::{
|
||||
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metrics::{
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
|
||||
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
|
||||
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
|
||||
METRIC_FLOW_ROWS,
|
||||
};
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
@@ -144,6 +146,12 @@ impl BatchingTask {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn adjust(&self, min_run_interval_secs: u64, max_filter_num_per_query: usize) {
    let mut state = self.state.write().unwrap();
    state.min_run_interval = Some(min_run_interval_secs);
    state.max_filter_num = Some(max_filter_num_per_query);
}

/// Mark the time window range (now - expire_after, now) as dirty (or (0, now) if expire_after is not set).
///
/// Useful for flush_flow to flush the dirty time window ranges.
@@ -280,7 +288,7 @@ impl BatchingTask {
|
||||
let catalog = &self.config.sink_table_name[0];
|
||||
let schema = &self.config.sink_table_name[1];
|
||||
frontend_client
|
||||
.create(expr.clone(), catalog, schema)
|
||||
.create(expr.clone(), catalog, schema, Some(self))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -361,7 +369,7 @@ impl BatchingTask {
|
||||
};
|
||||
|
||||
frontend_client
|
||||
.handle(req, catalog, schema, &mut peer_desc)
|
||||
.handle(req, catalog, schema, &mut peer_desc, Some(self))
|
||||
.await
|
||||
};
|
||||
|
||||
@@ -371,6 +379,9 @@ impl BatchingTask {
|
||||
"Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
|
||||
elapsed
|
||||
);
|
||||
METRIC_FLOW_ROWS
|
||||
.with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
|
||||
.inc_by(*affected_rows as _);
|
||||
} else if let Err(err) = &res {
|
||||
warn!(
|
||||
"Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
|
||||
@@ -410,6 +421,7 @@ impl BatchingTask {
|
||||
engine: QueryEngineRef,
|
||||
frontend_client: Arc<FrontendClient>,
|
||||
) {
|
||||
let flow_id_str = self.config.flow_id.to_string();
|
||||
loop {
|
||||
// first check if shutdown signal is received
|
||||
// if so, break the loop
|
||||
@@ -427,6 +439,9 @@ impl BatchingTask {
|
||||
Err(TryRecvError::Empty) => (),
|
||||
}
|
||||
}
|
||||
METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
|
||||
.with_label_values(&[&flow_id_str])
|
||||
.inc();
|
||||
|
||||
let new_query = match self.gen_insert_plan(&engine).await {
|
||||
Ok(new_query) => new_query,
|
||||
@@ -473,6 +488,9 @@ impl BatchingTask {
|
||||
}
|
||||
// TODO(discord9): this error should have a better place to go, but for now just print the error; more context is also needed
|
||||
Err(err) => {
|
||||
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
|
||||
.with_label_values(&[&flow_id_str])
|
||||
.inc();
|
||||
match new_query {
|
||||
Some(query) => {
|
||||
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
|
||||
@@ -580,19 +598,20 @@ impl BatchingTask {
|
||||
),
|
||||
})?;
|
||||
|
||||
let expr = self
|
||||
.state
|
||||
.write()
|
||||
.unwrap()
|
||||
.dirty_time_windows
|
||||
.gen_filter_exprs(
|
||||
let expr = {
|
||||
let mut state = self.state.write().unwrap();
|
||||
let max_window_cnt = state
|
||||
.max_filter_num
|
||||
.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM);
|
||||
state.dirty_time_windows.gen_filter_exprs(
|
||||
&col_name,
|
||||
Some(l),
|
||||
window_size,
|
||||
DirtyTimeWindows::MAX_FILTER_NUM,
|
||||
max_window_cnt,
|
||||
self.config.flow_id,
|
||||
Some(self),
|
||||
)?;
|
||||
)?
|
||||
};
|
||||
|
||||
debug!(
|
||||
"Flow id={:?}, Generated filter expr: {:?}",
|
||||
|
||||
@@ -42,6 +42,14 @@ lazy_static! {
|
||||
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_stalled_query_window_cnt",
|
||||
"flow batching engine stalled query time window count",
|
||||
&["flow_id"],
|
||||
vec![0.0, 5., 10., 20., 40.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_query_window_cnt",
|
||||
@@ -50,19 +58,57 @@ lazy_static! {
|
||||
vec![0.0, 5., 10., 20., 40.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE: HistogramVec =
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_query_time_range_secs",
|
||||
"flow batching engine query time range(seconds)",
|
||||
"greptime_flow_batching_engine_query_window_size_secs",
|
||||
"flow batching engine query window size(seconds)",
|
||||
&["flow_id"],
|
||||
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_stalled_window_size_secs",
|
||||
"flow batching engine stalled window size(seconds)",
|
||||
&["flow_id"],
|
||||
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_bulk_mark_time_window_range_secs",
|
||||
"flow batching engine query time window range marked by bulk memtable in seconds",
|
||||
&["flow_id"],
|
||||
vec![0.0, 60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec =
|
||||
register_int_counter_vec!(
|
||||
"greptime_flow_batching_start_query_count",
|
||||
"flow batching engine started query count",
|
||||
&["flow_id"],
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT: IntCounterVec =
|
||||
register_int_counter_vec!(
|
||||
"greptime_flow_batching_error_count",
|
||||
"flow batching engine error count per flow id",
|
||||
&["flow_id"]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_guess_fe_load",
|
||||
"flow batching engine guessed frontend load",
|
||||
&["fe_addr"],
|
||||
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
|
||||
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
|
||||
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
|
||||
"greptime_flow_processed_rows",
|
||||
"Count of rows flowing through the system",
|
||||
"Count of rows flowing through the system.",
|
||||
&["direction"]
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::flow::DirtyWindowRequests;
|
||||
use api::v1::{RowDeleteRequests, RowInsertRequests};
|
||||
use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
|
||||
use catalog::CatalogManagerRef;
|
||||
@@ -136,6 +137,18 @@ impl flow_server::Flow for FlowService {
|
||||
.map(Response::new)
|
||||
.map_err(to_status_with_last_err)
|
||||
}
|
||||
|
||||
async fn handle_mark_dirty_time_window(
|
||||
&self,
|
||||
reqs: Request<DirtyWindowRequests>,
|
||||
) -> Result<Response<FlowResponse>, Status> {
|
||||
self.dual_engine
|
||||
.batching_engine()
|
||||
.handle_mark_dirty_time_window(reqs.into_inner())
|
||||
.await
|
||||
.map(Response::new)
|
||||
.map_err(to_status_with_last_err)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
||||
@@ -25,7 +25,6 @@ common-catalog.workspace = true
|
||||
common-config.workspace = true
|
||||
common-datasource.workspace = true
|
||||
common-error.workspace = true
|
||||
common-frontend.workspace = true
|
||||
common-function.workspace = true
|
||||
common-grpc.workspace = true
|
||||
common-macro.workspace = true
|
||||
|
||||
@@ -30,7 +30,6 @@ use std::time::SystemTime;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
|
||||
use catalog::process_manager::ProcessManagerRef;
|
||||
use catalog::CatalogManagerRef;
|
||||
use client::OutputData;
|
||||
use common_base::Plugins;
|
||||
@@ -81,7 +80,6 @@ use crate::error::{
|
||||
};
|
||||
use crate::limiter::LimiterRef;
|
||||
use crate::slow_query_recorder::SlowQueryRecorder;
|
||||
use crate::stream_wrapper::StreamWrapper;
|
||||
|
||||
/// The frontend instance contains necessary components, and implements many
|
||||
/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
|
||||
@@ -98,7 +96,6 @@ pub struct Instance {
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
slow_query_recorder: Option<SlowQueryRecorder>,
|
||||
limiter: Option<LimiterRef>,
|
||||
process_manager: ProcessManagerRef,
|
||||
}
|
||||
|
||||
impl Instance {
|
||||
@@ -156,10 +153,6 @@ impl Instance {
|
||||
pub fn inserter(&self) -> &InserterRef {
|
||||
&self.inserter
|
||||
}
|
||||
|
||||
pub fn process_manager(&self) -> &ProcessManagerRef {
|
||||
&self.process_manager
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
|
||||
@@ -179,14 +172,6 @@ impl Instance {
|
||||
None
|
||||
};
|
||||
|
||||
let ticket = self.process_manager.register_query(
|
||||
query_ctx.current_catalog().to_string(),
|
||||
vec![query_ctx.current_schema()],
|
||||
stmt.to_string(),
|
||||
"unknown".to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
let output = match stmt {
|
||||
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
|
||||
// TODO: remove this when format is supported in datafusion
|
||||
@@ -230,20 +215,7 @@ impl Instance {
|
||||
}
|
||||
};
|
||||
|
||||
match output {
|
||||
Ok(output) => {
|
||||
let Output { meta, data } = output;
|
||||
|
||||
let data = match data {
|
||||
OutputData::Stream(stream) => {
|
||||
OutputData::Stream(Box::pin(StreamWrapper::new(stream, ticket)))
|
||||
}
|
||||
other => other,
|
||||
};
|
||||
Ok(Output { data, meta })
|
||||
}
|
||||
Err(e) => Err(e).context(TableOperationSnafu),
|
||||
}
|
||||
output.context(TableOperationSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
|
||||
use catalog::process_manager::ProcessManagerRef;
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_base::Plugins;
|
||||
use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
|
||||
@@ -55,11 +54,9 @@ pub struct FrontendBuilder {
|
||||
node_manager: NodeManagerRef,
|
||||
plugins: Option<Plugins>,
|
||||
procedure_executor: ProcedureExecutorRef,
|
||||
process_manager: ProcessManagerRef,
|
||||
}
|
||||
|
||||
impl FrontendBuilder {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
options: FrontendOptions,
|
||||
kv_backend: KvBackendRef,
|
||||
@@ -67,7 +64,6 @@ impl FrontendBuilder {
|
||||
catalog_manager: CatalogManagerRef,
|
||||
node_manager: NodeManagerRef,
|
||||
procedure_executor: ProcedureExecutorRef,
|
||||
process_manager: ProcessManagerRef,
|
||||
) -> Self {
|
||||
Self {
|
||||
options,
|
||||
@@ -78,7 +74,6 @@ impl FrontendBuilder {
|
||||
node_manager,
|
||||
plugins: None,
|
||||
procedure_executor,
|
||||
process_manager,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -100,7 +95,7 @@ impl FrontendBuilder {
|
||||
let kv_backend = self.kv_backend;
|
||||
let node_manager = self.node_manager;
|
||||
let plugins = self.plugins.unwrap_or_default();
|
||||
let process_manager = self.process_manager;
|
||||
|
||||
let table_route_cache: TableRouteCacheRef =
|
||||
self.layered_cache_registry
|
||||
.get()
|
||||
@@ -221,7 +216,6 @@ impl FrontendBuilder {
|
||||
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
|
||||
slow_query_recorder,
|
||||
limiter,
|
||||
process_manager,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,8 +35,8 @@ use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
use servers::query_handler::sql::SqlQueryHandler;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use table::metadata::TableId;
|
||||
use table::table_name::TableName;
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::{
|
||||
CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
|
||||
@@ -235,34 +235,33 @@ impl GrpcQueryHandler for Instance {
|
||||
|
||||
async fn put_record_batch(
|
||||
&self,
|
||||
table: &TableName,
|
||||
table_id: &mut Option<TableId>,
|
||||
table_name: &TableName,
|
||||
table_ref: &mut Option<TableRef>,
|
||||
decoder: &mut FlightDecoder,
|
||||
data: FlightData,
|
||||
) -> Result<AffectedRows> {
|
||||
let table_id = if let Some(table_id) = table_id {
|
||||
*table_id
|
||||
let table = if let Some(table) = table_ref {
|
||||
table.clone()
|
||||
} else {
|
||||
let table = self
|
||||
.catalog_manager()
|
||||
.table(
|
||||
&table.catalog_name,
|
||||
&table.schema_name,
|
||||
&table.table_name,
|
||||
&table_name.catalog_name,
|
||||
&table_name.schema_name,
|
||||
&table_name.table_name,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: table.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
})?;
|
||||
let id = table.table_info().table_id();
|
||||
*table_id = Some(id);
|
||||
id
|
||||
*table_ref = Some(table.clone());
|
||||
table
|
||||
};
|
||||
|
||||
self.inserter
|
||||
.handle_bulk_insert(table_id, decoder, data)
|
||||
.handle_bulk_insert(table, decoder, data)
|
||||
.await
|
||||
.context(TableOperationSnafu)
|
||||
}
|
||||
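put_record_batch now resolves the table once per stream and caches the whole `TableRef` instead of just the id, since the bulk path downstream needs the table metadata as well. The caching pattern itself is simple; a small standalone sketch (the types here are stand-ins, not the real catalog API):

```rust
use std::sync::Arc;

struct Table {
    id: u32,
}

// Stand-in for the async catalog lookup done on the first chunk only.
fn resolve_table(name: &str) -> Table {
    Table { id: name.len() as u32 }
}

fn get_or_resolve(cache: &mut Option<Arc<Table>>, name: &str) -> Arc<Table> {
    if let Some(table) = cache {
        return table.clone();
    }
    let table = Arc::new(resolve_table(name));
    *cache = Some(table.clone());
    table
}

fn main() {
    let mut cache: Option<Arc<Table>> = None;
    // Every FlightData chunk of the same stream reuses the cached handle.
    let first = get_or_resolve(&mut cache, "metrics");
    let second = get_or_resolve(&mut cache, "metrics");
    assert_eq!(first.id, second.id);
}
```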
|
||||
@@ -23,4 +23,3 @@ pub(crate) mod metrics;
|
||||
pub mod server;
|
||||
pub mod service_config;
|
||||
pub(crate) mod slow_query_recorder;
|
||||
mod stream_wrapper;
|
||||
|
||||
@@ -20,7 +20,6 @@ use common_base::Plugins;
|
||||
use common_config::Configurable;
|
||||
use servers::error::Error as ServerError;
|
||||
use servers::grpc::builder::GrpcServerBuilder;
|
||||
use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
|
||||
use servers::grpc::greptime_handler::GreptimeRequestHandler;
|
||||
use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig};
|
||||
use servers::http::event::LogValidatorRef;
|
||||
@@ -158,14 +157,11 @@ where
|
||||
opts.grpc.flight_compression,
|
||||
);
|
||||
|
||||
let frontend_grpc_handler =
|
||||
FrontendGrpcHandler::new(self.instance.process_manager().clone());
|
||||
let grpc_server = builder
|
||||
.database_handler(greptime_request_handler.clone())
|
||||
.prometheus_handler(self.instance.clone(), user_provider.clone())
|
||||
.otel_arrow_handler(OtelArrowServiceHandler(self.instance.clone()))
|
||||
.flight_handler(Arc::new(greptime_request_handler))
|
||||
.frontend_grpc_handler(frontend_grpc_handler)
|
||||
.build();
|
||||
Ok(grpc_server)
|
||||
}
|
||||
|
||||
@@ -1,60 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use common_recordbatch::adapter::RecordBatchMetrics;
|
||||
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
|
||||
use datatypes::schema::SchemaRef;
|
||||
use futures::Stream;
|
||||
|
||||
pub struct StreamWrapper<T> {
|
||||
inner: SendableRecordBatchStream,
|
||||
_attachment: T,
|
||||
}
|
||||
|
||||
impl<T> Unpin for StreamWrapper<T> {}
|
||||
|
||||
impl<T> StreamWrapper<T> {
|
||||
pub fn new(stream: SendableRecordBatchStream, attachment: T) -> Self {
|
||||
Self {
|
||||
inner: stream,
|
||||
_attachment: attachment,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Stream for StreamWrapper<T> {
|
||||
type Item = common_recordbatch::error::Result<RecordBatch>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = &mut *self;
|
||||
Pin::new(&mut this.inner).poll_next(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> RecordBatchStream for StreamWrapper<T> {
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.inner.schema()
|
||||
}
|
||||
|
||||
fn output_ordering(&self) -> Option<&[OrderOption]> {
|
||||
self.inner.output_ordering()
|
||||
}
|
||||
|
||||
fn metrics(&self) -> Option<RecordBatchMetrics> {
|
||||
self.inner.metrics()
|
||||
}
|
||||
}
|
||||
@@ -12,18 +12,28 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
|
||||
use ahash::{HashMap, HashMapExt};
|
||||
use api::v1::flow::{DirtyWindowRequest, WindowRange};
|
||||
use api::v1::region::{
|
||||
bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
|
||||
};
|
||||
use api::v1::ArrowIpc;
|
||||
use arrow::array::{
|
||||
Array, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
|
||||
TimestampSecondArray,
|
||||
};
|
||||
use arrow::datatypes::{DataType, Int64Type, TimeUnit};
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use common_base::AffectedRows;
|
||||
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
|
||||
use common_grpc::FlightData;
|
||||
use common_telemetry::error;
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::RegionId;
|
||||
use table::metadata::TableId;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::{RegionId, TableId};
|
||||
use table::TableRef;
|
||||
|
||||
use crate::insert::Inserter;
|
||||
use crate::{error, metrics};
|
||||
@@ -32,10 +42,11 @@ impl Inserter {
|
||||
/// Handle bulk insert request.
|
||||
pub async fn handle_bulk_insert(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
table: TableRef,
|
||||
decoder: &mut FlightDecoder,
|
||||
data: FlightData,
|
||||
) -> error::Result<AffectedRows> {
|
||||
let table_id = table.table_info().table_id();
|
||||
let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
|
||||
.with_label_values(&["decode_request"])
|
||||
.start_timer();
|
||||
@@ -48,6 +59,20 @@ impl Inserter {
|
||||
return Ok(0);
|
||||
};
|
||||
decode_timer.observe_duration();
|
||||
if let Some((min, max)) = compute_timestamp_range(
|
||||
&record_batch,
|
||||
&table
|
||||
.table_info()
|
||||
.meta
|
||||
.schema
|
||||
.timestamp_column()
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.name,
|
||||
)? {
|
||||
// notify flownode to update dirty time windows.
|
||||
self.update_flow_dirty_window(table_id, min, max);
|
||||
}
|
||||
metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
|
||||
metrics::BULK_REQUEST_ROWS
|
||||
.with_label_values(&["raw"])
|
||||
@@ -216,4 +241,88 @@ impl Inserter {
|
||||
crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64);
|
||||
Ok(rows_inserted)
|
||||
}
|
||||
|
||||
    fn update_flow_dirty_window(&self, table_id: TableId, min: i64, max: i64) {
        let table_flownode_set_cache = self.table_flownode_set_cache.clone();
        let node_manager = self.node_manager.clone();
        common_runtime::spawn_global(async move {
            let result = table_flownode_set_cache
                .get(table_id)
                .await
                .context(error::RequestInsertsSnafu);
            let flownodes = match result {
                Ok(flownodes) => flownodes.unwrap_or_default(),
                Err(e) => {
                    error!(e; "Failed to get flownodes for table id: {}", table_id);
                    return;
                }
            };

            let peers: HashSet<_> = flownodes.values().cloned().collect();
            for peer in peers {
                let node_manager = node_manager.clone();
                common_runtime::spawn_global(async move {
                    if let Err(e) = node_manager
                        .flownode(&peer)
                        .await
                        .handle_mark_window_dirty(DirtyWindowRequest {
                            table_id,
                            dirty_time_ranges: vec![WindowRange {
                                start_value: min,
                                end_value: max,
                            }],
                        })
                        .await
                        .context(error::RequestInsertsSnafu)
                    {
                        error!(e; "Failed to mark time window as dirty, table: {}, min: {}, max: {}", table_id, min, max);
                    }
                });
            }
        });
    }
}

/// Calculate the timestamp range of the record batch. Return `None` if the record batch is empty.
fn compute_timestamp_range(
    rb: &RecordBatch,
    timestamp_index_name: &str,
) -> error::Result<Option<(i64, i64)>> {
    let ts_col = rb
        .column_by_name(timestamp_index_name)
        .context(error::ColumnNotFoundSnafu {
            msg: timestamp_index_name,
        })?;
    if rb.num_rows() == 0 {
        return Ok(None);
    }
    let primitive = match ts_col.data_type() {
        DataType::Timestamp(unit, _) => match unit {
            TimeUnit::Second => ts_col
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .unwrap()
                .reinterpret_cast::<Int64Type>(),
            TimeUnit::Millisecond => ts_col
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap()
                .reinterpret_cast::<Int64Type>(),
            TimeUnit::Microsecond => ts_col
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .unwrap()
                .reinterpret_cast::<Int64Type>(),
            TimeUnit::Nanosecond => ts_col
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .unwrap()
                .reinterpret_cast::<Int64Type>(),
        },
        t => {
            return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail();
        }
    };

    Ok(arrow::compute::min(&primitive).zip(arrow::compute::max(&primitive)))
}
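compute_timestamp_range leans on arrow's `reinterpret_cast` so min/max run over plain i64 values regardless of the timestamp unit. A small runnable check of that trick, assuming only the `arrow` crate; the values are examples:

```rust
use arrow::array::{Int64Array, TimestampMillisecondArray};
use arrow::compute;
use arrow::datatypes::Int64Type;

fn main() {
    // Millisecond timestamps; reinterpreting as i64 keeps min/max cheap and unit-agnostic.
    let ts = TimestampMillisecondArray::from(vec![1_000_i64, 5_000, 3_000]);
    let as_i64: Int64Array = ts.reinterpret_cast::<Int64Type>();
    let range = compute::min(&as_i64).zip(compute::max(&as_i64));
    assert_eq!(range, Some((1_000, 5_000)));
}
```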
@@ -837,6 +837,13 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid time index type: {}", ty))]
|
||||
InvalidTimeIndexType {
|
||||
ty: arrow::datatypes::DataType,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -964,6 +971,7 @@ impl ErrorExt for Error {
|
||||
Error::ColumnOptions { source, .. } => source.status_code(),
|
||||
Error::DecodeFlightData { source, .. } => source.status_code(),
|
||||
Error::ComputeArrow { .. } => StatusCode::Internal,
|
||||
Error::InvalidTimeIndexType { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::flow::FlowRequestHeader;
|
||||
use api::v1::flow::{AdjustFlow, FlowRequestHeader};
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_function::handlers::FlowServiceHandler;
|
||||
@@ -22,6 +22,7 @@ use common_query::error::Result;
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use serde_json::json;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
@@ -57,9 +58,96 @@ impl FlowServiceHandler for FlowServiceOperator {
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
self.flush_inner(catalog, flow, ctx).await
|
||||
}
|
||||
|
||||
async fn adjust(
|
||||
&self,
|
||||
catalog: &str,
|
||||
flow: &str,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
self.adjust_inner(
|
||||
catalog,
|
||||
flow,
|
||||
min_run_interval_secs,
|
||||
max_filter_num_per_query,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowServiceOperator {
|
||||
async fn adjust_inner(
|
||||
&self,
|
||||
catalog: &str,
|
||||
flow: &str,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
let id = self
|
||||
.flow_metadata_manager
|
||||
.flow_name_manager()
|
||||
.get(catalog, flow)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?
|
||||
.context(common_meta::error::FlowNotFoundSnafu {
|
||||
flow_name: format!("{}.{}", catalog, flow),
|
||||
})
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?
|
||||
.flow_id();
|
||||
|
||||
let all_flownode_peers = self
|
||||
.flow_metadata_manager
|
||||
.flow_route_manager()
|
||||
.routes(id)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?;
|
||||
|
||||
// order of flownodes doesn't matter here
|
||||
let all_flow_nodes = FuturesUnordered::from_iter(
|
||||
all_flownode_peers
|
||||
.iter()
|
||||
.map(|(_key, peer)| self.node_manager.flownode(peer.peer())),
|
||||
)
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
// TODO(discord9): use proper type for flow options
|
||||
let options = json!({
|
||||
"min_run_interval_secs": min_run_interval_secs,
|
||||
"max_filter_num_per_query": max_filter_num_per_query,
|
||||
});
|
||||
|
||||
for node in all_flow_nodes {
|
||||
let _res = {
|
||||
use api::v1::flow::{flow_request, FlowRequest};
|
||||
let flush_req = FlowRequest {
|
||||
header: Some(FlowRequestHeader {
|
||||
tracing_context: TracingContext::from_current_span().to_w3c(),
|
||||
query_context: Some(
|
||||
common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(),
|
||||
),
|
||||
}),
|
||||
body: Some(flow_request::Body::Adjust(AdjustFlow {
|
||||
flow_id: Some(api::v1::FlowId { id }),
|
||||
options: options.to_string(),
|
||||
})),
|
||||
};
|
||||
node.handle(flush_req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?
|
||||
};
|
||||
}
|
||||
Ok(Default::default())
|
||||
}
|
||||
|
||||
/// Flush the flownodes according to the flow id.
|
||||
async fn flush_inner(
|
||||
&self,
|
||||
|
||||
@@ -78,7 +78,7 @@ pub struct Inserter {
|
||||
catalog_manager: CatalogManagerRef,
|
||||
pub(crate) partition_manager: PartitionRuleManagerRef,
|
||||
pub(crate) node_manager: NodeManagerRef,
|
||||
table_flownode_set_cache: TableFlownodeSetCacheRef,
|
||||
pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
|
||||
}
|
||||
|
||||
pub type InserterRef = Arc<Inserter>;
|
||||
|
||||
@@ -21,7 +21,7 @@ use crate::error::{
|
||||
ValueRequiredForDispatcherRuleSnafu,
|
||||
};
|
||||
use crate::etl::ctx_req::TABLE_SUFFIX_KEY;
|
||||
use crate::Value;
|
||||
use crate::{PipelineMap, Value};
|
||||
|
||||
const FIELD: &str = "field";
|
||||
const PIPELINE: &str = "pipeline";
|
||||
@@ -109,7 +109,7 @@ impl TryFrom<&Yaml> for Dispatcher {
|
||||
|
||||
impl Dispatcher {
|
||||
/// Executes the dispatcher and returns the matched rule, if any
|
||||
pub(crate) fn exec(&self, data: &Value) -> Option<&Rule> {
|
||||
pub(crate) fn exec(&self, data: &PipelineMap) -> Option<&Rule> {
|
||||
if let Some(value) = data.get(&self.field) {
|
||||
for rule in &self.rules {
|
||||
if rule.value == *value {
|
||||
@@ -119,7 +119,7 @@ impl Dispatcher {
|
||||
|
||||
None
|
||||
} else {
|
||||
debug!("field {} not found in keys {:?}", &self.field, data);
|
||||
debug!("field {} not found in keys {:?}", &self.field, data.keys());
|
||||
None
|
||||
}
|
||||
}
|
||||
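The dispatcher now matches directly against the top-level pipeline map rather than unwrapping a `Value::Map` first. A reduced sketch of the rule matching with stand-in types (the real `Rule` carries more fields):

```rust
use std::collections::BTreeMap;

struct Rule {
    value: String,
    table_suffix: String,
}

// Look up the configured field in the pipeline map and return the first rule
// whose value matches, as the dispatcher above does.
fn dispatch<'a>(rules: &'a [Rule], field: &str, data: &BTreeMap<String, String>) -> Option<&'a Rule> {
    let value = data.get(field)?;
    rules.iter().find(|rule| &rule.value == value)
}

fn main() {
    let rules = vec![Rule { value: "nginx".into(), table_suffix: "_nginx".into() }];
    let mut data = BTreeMap::new();
    data.insert("source".to_string(), "nginx".to_string());
    let matched = dispatch(&rules, "source", &data);
    assert_eq!(matched.map(|r| r.table_suffix.as_str()), Some("_nginx"));
}
```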
|
||||
@@ -734,12 +734,6 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Top level value must be map"))]
|
||||
ValueMustBeMap {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to build DataFusion logical plan"))]
|
||||
BuildDfLogicalPlan {
|
||||
#[snafu(source)]
|
||||
@@ -815,7 +809,7 @@ impl ErrorExt for Error {
|
||||
PipelineNotFound { .. }
|
||||
| InvalidPipelineVersion { .. }
|
||||
| InvalidCustomTimeIndex { .. } => StatusCode::InvalidArguments,
|
||||
MultiPipelineWithDiffSchema { .. } | ValueMustBeMap { .. } => StatusCode::IllegalState,
|
||||
MultiPipelineWithDiffSchema { .. } => StatusCode::IllegalState,
|
||||
BuildDfLogicalPlan { .. } | RecordBatchLenNotMatch { .. } => StatusCode::Internal,
|
||||
ExecuteInternalStatement { source, .. } => source.status_code(),
|
||||
DataFrame { source, .. } => source.status_code(),
|
||||
|
||||
@@ -19,8 +19,6 @@ pub mod processor;
|
||||
pub mod transform;
|
||||
pub mod value;
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use ahash::{HashMap, HashMapExt};
|
||||
use api::v1::Row;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
@@ -33,7 +31,7 @@ use yaml_rust::YamlLoader;
|
||||
use crate::dispatcher::{Dispatcher, Rule};
|
||||
use crate::error::{
|
||||
AutoTransformOneTimestampSnafu, InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu, Result,
|
||||
ValueMustBeMapSnafu, YamlLoadSnafu, YamlParseSnafu,
|
||||
YamlLoadSnafu, YamlParseSnafu,
|
||||
};
|
||||
use crate::etl::ctx_req::TABLE_SUFFIX_KEY;
|
||||
use crate::etl::processor::ProcessorKind;
|
||||
@@ -47,6 +45,8 @@ const TRANSFORMS: &str = "transforms";
|
||||
const DISPATCHER: &str = "dispatcher";
|
||||
const TABLESUFFIX: &str = "table_suffix";
|
||||
|
||||
pub type PipelineMap = std::collections::BTreeMap<String, Value>;
|
||||
|
||||
pub enum Content<'a> {
|
||||
Json(&'a str),
|
||||
Yaml(&'a str),
|
||||
@@ -155,7 +155,7 @@ impl DispatchedTo {
|
||||
pub enum PipelineExecOutput {
|
||||
Transformed(TransformedOutput),
|
||||
AutoTransform(AutoTransformOutput),
|
||||
DispatchedTo(DispatchedTo, Value),
|
||||
DispatchedTo(DispatchedTo, PipelineMap),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -163,7 +163,7 @@ pub struct TransformedOutput {
|
||||
pub opt: ContextOpt,
|
||||
pub row: Row,
|
||||
pub table_suffix: Option<String>,
|
||||
pub pipeline_map: Value,
|
||||
pub pipeline_map: PipelineMap,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -171,7 +171,7 @@ pub struct AutoTransformOutput {
|
||||
pub table_suffix: Option<String>,
|
||||
// ts_column_name -> unit
|
||||
pub ts_unit_map: HashMap<String, TimeUnit>,
|
||||
pub pipeline_map: Value,
|
||||
pub pipeline_map: PipelineMap,
|
||||
}
|
||||
|
||||
impl PipelineExecOutput {
|
||||
@@ -197,42 +197,42 @@ impl PipelineExecOutput {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn json_to_map(val: serde_json::Value) -> Result<Value> {
|
||||
pub fn json_to_map(val: serde_json::Value) -> Result<PipelineMap> {
|
||||
match val {
|
||||
serde_json::Value::Object(map) => {
|
||||
let mut intermediate_state = BTreeMap::new();
|
||||
let mut intermediate_state = PipelineMap::new();
|
||||
for (k, v) in map {
|
||||
intermediate_state.insert(k, Value::try_from(v)?);
|
||||
}
|
||||
Ok(Value::Map(intermediate_state.into()))
|
||||
Ok(intermediate_state)
|
||||
}
|
||||
_ => InputValueMustBeObjectSnafu.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn json_array_to_map(val: Vec<serde_json::Value>) -> Result<Vec<Value>> {
|
||||
pub fn json_array_to_map(val: Vec<serde_json::Value>) -> Result<Vec<PipelineMap>> {
|
||||
val.into_iter().map(json_to_map).collect()
|
||||
}
|
||||
|
||||
pub fn simd_json_to_map(val: simd_json::OwnedValue) -> Result<Value> {
|
||||
pub fn simd_json_to_map(val: simd_json::OwnedValue) -> Result<PipelineMap> {
|
||||
match val {
|
||||
simd_json::OwnedValue::Object(map) => {
|
||||
let mut intermediate_state = BTreeMap::new();
|
||||
let mut intermediate_state = PipelineMap::new();
|
||||
for (k, v) in map.into_iter() {
|
||||
intermediate_state.insert(k, Value::try_from(v)?);
|
||||
}
|
||||
Ok(Value::Map(intermediate_state.into()))
|
||||
Ok(intermediate_state)
|
||||
}
|
||||
_ => InputValueMustBeObjectSnafu.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn simd_json_array_to_map(val: Vec<simd_json::OwnedValue>) -> Result<Vec<Value>> {
|
||||
pub fn simd_json_array_to_map(val: Vec<simd_json::OwnedValue>) -> Result<Vec<PipelineMap>> {
|
||||
val.into_iter().map(simd_json_to_map).collect()
|
||||
}
|
||||
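json_to_map and its simd twin now hand processors a bare map instead of a `Value::Map` wrapper, so every downstream step can get, insert, and extend without re-matching on the enum. A standalone sketch of the same conversion using `serde_json` values as a stand-in for the pipeline's own `Value` type:

```rust
use std::collections::BTreeMap;

use serde_json::Value as JsonValue;

// Stand-in for the crate's PipelineMap; the real map holds the pipeline's own Value.
type PipelineMap = BTreeMap<String, JsonValue>;

fn json_to_map(val: JsonValue) -> Result<PipelineMap, String> {
    match val {
        JsonValue::Object(map) => Ok(map.into_iter().collect()),
        _ => Err("input value must be an object".to_string()),
    }
}

fn main() {
    let parsed: JsonValue = serde_json::from_str(r#"{"message": "hello"}"#).unwrap();
    let map = json_to_map(parsed).unwrap();
    // Processors can now work on the map directly, with no Value::Map unwrap on every hop.
    assert_eq!(map.get("message"), Some(&JsonValue::String("hello".into())));
}
```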
|
||||
impl Pipeline {
|
||||
pub fn exec_mut(&self, mut val: Value) -> Result<PipelineExecOutput> {
|
||||
pub fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineExecOutput> {
|
||||
// process
|
||||
for processor in self.processors.iter() {
|
||||
val = processor.exec_mut(val)?;
|
||||
@@ -263,7 +263,7 @@ impl Pipeline {
|
||||
|
||||
let mut ts_unit_map = HashMap::with_capacity(4);
|
||||
// get all ts values
|
||||
for (k, v) in val.as_map_mut().context(ValueMustBeMapSnafu)? {
|
||||
for (k, v) in val.iter() {
|
||||
if let Value::Timestamp(ts) = v {
|
||||
if !ts_unit_map.contains_key(k) {
|
||||
ts_unit_map.insert(k.clone(), ts.get_unit());
|
||||
@@ -378,9 +378,8 @@ transform:
|
||||
type: timestamp, ns
|
||||
index: time"#;
|
||||
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap();
|
||||
let mut payload = BTreeMap::new();
|
||||
let mut payload = PipelineMap::new();
|
||||
payload.insert("message".to_string(), Value::String(message));
|
||||
let payload = Value::Map(payload.into());
|
||||
let result = pipeline
|
||||
.exec_mut(payload)
|
||||
.unwrap()
|
||||
|
||||
@@ -18,11 +18,9 @@ use std::sync::Arc;
|
||||
use ahash::{HashMap, HashMapExt};
|
||||
use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
|
||||
use session::context::{QueryContext, QueryContextRef};
|
||||
use snafu::OptionExt;
|
||||
|
||||
use crate::error::{Result, ValueMustBeMapSnafu};
|
||||
use crate::tablesuffix::TableSuffixTemplate;
|
||||
use crate::Value;
|
||||
use crate::PipelineMap;
|
||||
|
||||
const GREPTIME_AUTO_CREATE_TABLE: &str = "greptime_auto_create_table";
|
||||
const GREPTIME_TTL: &str = "greptime_ttl";
|
||||
@@ -73,8 +71,7 @@ pub struct ContextOpt {
|
||||
}
|
||||
|
||||
impl ContextOpt {
|
||||
pub fn from_pipeline_map_to_opt(pipeline_map: &mut Value) -> Result<Self> {
|
||||
let pipeline_map = pipeline_map.as_map_mut().context(ValueMustBeMapSnafu)?;
|
||||
pub fn from_pipeline_map_to_opt(pipeline_map: &mut PipelineMap) -> Self {
|
||||
let mut opt = Self::default();
|
||||
for k in PIPELINE_HINT_KEYS {
|
||||
if let Some(v) = pipeline_map.remove(k) {
|
||||
@@ -104,13 +101,13 @@ impl ContextOpt {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(opt)
|
||||
opt
|
||||
}
|
||||
|
||||
pub(crate) fn resolve_table_suffix(
|
||||
&mut self,
|
||||
table_suffix: Option<&TableSuffixTemplate>,
|
||||
pipeline_map: &Value,
|
||||
pipeline_map: &PipelineMap,
|
||||
) -> Option<String> {
|
||||
self.table_suffix
|
||||
.take()
|
||||
|
||||
@@ -60,7 +60,7 @@ use crate::etl::processor::json_parse::JsonParseProcessor;
|
||||
use crate::etl::processor::select::SelectProcessor;
|
||||
use crate::etl::processor::simple_extract::SimpleExtractProcessor;
|
||||
use crate::etl::processor::vrl::VrlProcessor;
|
||||
use crate::Value;
|
||||
use crate::etl::PipelineMap;
|
||||
|
||||
const FIELD_NAME: &str = "field";
|
||||
const FIELDS_NAME: &str = "fields";
|
||||
@@ -125,7 +125,7 @@ pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
|
||||
fn ignore_missing(&self) -> bool;
|
||||
|
||||
/// Execute the processor on a pipeline map which has been preprocessed by the pipeline
|
||||
fn exec_mut(&self, val: Value) -> Result<Value>;
|
||||
fn exec_mut(&self, val: PipelineMap) -> Result<PipelineMap>;
|
||||
}
|
||||
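Every processor implements the same exec_mut shape after this change: take the `PipelineMap` by value, mutate it, and return it, or fail when a required field is missing and `ignore_missing` is false. A cut-down illustration with stand-in types, not the crate's real trait:

```rust
use std::collections::BTreeMap;

// Stand-ins for the pipeline crate's own types, for illustration only.
type PipelineMap = BTreeMap<String, String>;
type Result<T> = std::result::Result<T, String>;

trait Processor {
    fn ignore_missing(&self) -> bool;
    fn exec_mut(&self, val: PipelineMap) -> Result<PipelineMap>;
}

struct UppercaseProcessor {
    field: String,
    ignore_missing: bool,
}

impl Processor for UppercaseProcessor {
    fn ignore_missing(&self) -> bool {
        self.ignore_missing
    }

    fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
        match val.get(&self.field).cloned() {
            Some(s) => {
                // Write the transformed value back into the same map.
                val.insert(self.field.clone(), s.to_uppercase());
            }
            None if !self.ignore_missing => {
                return Err(format!("field {} not found", self.field));
            }
            None => {}
        }
        Ok(val)
    }
}

fn main() {
    let p = UppercaseProcessor { field: "message".into(), ignore_missing: false };
    let mut input = PipelineMap::new();
    input.insert("message".into(), "hello".into());
    let out = p.exec_mut(input).unwrap();
    assert_eq!(out.get("message").map(String::as_str), Some("HELLO"));
}
```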
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -16,8 +16,6 @@
|
||||
//!
|
||||
//! Refer to [`CmcdProcessor`] for more information.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use urlencoding::decode;
|
||||
|
||||
@@ -32,6 +30,7 @@ use crate::etl::processor::{
|
||||
IGNORE_MISSING_NAME,
|
||||
};
|
||||
use crate::etl::value::Value;
|
||||
use crate::etl::PipelineMap;
|
||||
|
||||
pub(crate) const PROCESSOR_CMCD: &str = "cmcd";
|
||||
|
||||
@@ -160,8 +159,8 @@ impl CmcdProcessor {
|
||||
format!("{}_{}", prefix, key)
|
||||
}
|
||||
|
||||
fn parse(&self, name: &str, value: &str) -> Result<BTreeMap<String, Value>> {
|
||||
let mut working_set = BTreeMap::new();
|
||||
fn parse(&self, name: &str, value: &str) -> Result<PipelineMap> {
|
||||
let mut working_set = PipelineMap::new();
|
||||
|
||||
let parts = value.split(',');
|
||||
|
||||
@@ -250,14 +249,14 @@ impl Processor for CmcdProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, mut val: Value) -> Result<Value> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let name = field.input_field();
|
||||
|
||||
match val.get(name) {
|
||||
Some(Value::String(s)) => {
|
||||
let results = self.parse(field.target_or_input_field(), s)?;
|
||||
val.extend(results.into())?;
|
||||
val.extend(results);
|
||||
}
|
||||
Some(Value::Null) | None => {
|
||||
if !self.ignore_missing {
|
||||
@@ -433,7 +432,7 @@ mod tests {
|
||||
let expected = vec
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k.to_string(), v))
|
||||
.collect::<BTreeMap<String, Value>>();
|
||||
.collect::<PipelineMap>();
|
||||
|
||||
let actual = processor.parse("prefix", &decoded).unwrap();
|
||||
assert_eq!(actual, expected);
|
||||
|
||||
@@ -14,8 +14,6 @@
|
||||
|
||||
// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/csv-processor.html
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use csv::{ReaderBuilder, Trim};
|
||||
use itertools::EitherOrBoth::{Both, Left, Right};
|
||||
use itertools::Itertools;
|
||||
@@ -31,6 +29,7 @@ use crate::etl::processor::{
|
||||
IGNORE_MISSING_NAME,
|
||||
};
|
||||
use crate::etl::value::Value;
|
||||
use crate::etl::PipelineMap;
|
||||
|
||||
pub(crate) const PROCESSOR_CSV: &str = "csv";
|
||||
|
||||
@@ -60,7 +59,7 @@ pub struct CsvProcessor {
|
||||
|
||||
impl CsvProcessor {
|
||||
// Process the CSV format string into a map with target_fields as keys
|
||||
fn process(&self, val: &str) -> Result<BTreeMap<String, Value>> {
|
||||
fn process(&self, val: &str) -> Result<PipelineMap> {
|
||||
let mut reader = self.reader.from_reader(val.as_bytes());
|
||||
|
||||
if let Some(result) = reader.records().next() {
|
||||
@@ -190,14 +189,14 @@ impl Processor for CsvProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, mut val: Value) -> Result<Value> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let name = field.input_field();
|
||||
|
||||
match val.get(name) {
|
||||
Some(Value::String(v)) => {
|
||||
let results = self.process(v)?;
|
||||
val.extend(results.into())?;
|
||||
val.extend(results);
|
||||
}
|
||||
Some(Value::Null) | None => {
|
||||
if !self.ignore_missing {
|
||||
@@ -240,7 +239,7 @@ mod tests {
|
||||
|
||||
let result = processor.process("1,2").unwrap();
|
||||
|
||||
let values: BTreeMap<String, Value> = [
|
||||
let values: PipelineMap = [
|
||||
("a".into(), Value::String("1".into())),
|
||||
("b".into(), Value::String("2".into())),
|
||||
]
|
||||
@@ -266,7 +265,7 @@ mod tests {
|
||||
|
||||
let result = processor.process("1,2").unwrap();
|
||||
|
||||
let values: BTreeMap<String, Value> = [
|
||||
let values: PipelineMap = [
|
||||
("a".into(), Value::String("1".into())),
|
||||
("b".into(), Value::String("2".into())),
|
||||
("c".into(), Value::Null),
|
||||
@@ -291,7 +290,7 @@ mod tests {
|
||||
|
||||
let result = processor.process("1,2").unwrap();
|
||||
|
||||
let values: BTreeMap<String, Value> = [
|
||||
let values: PipelineMap = [
|
||||
("a".into(), Value::String("1".into())),
|
||||
("b".into(), Value::String("2".into())),
|
||||
("c".into(), Value::String("default".into())),
|
||||
@@ -317,7 +316,7 @@ mod tests {
|
||||
|
||||
let result = processor.process("1,2").unwrap();
|
||||
|
||||
let values: BTreeMap<String, Value> = [
|
||||
let values: PipelineMap = [
|
||||
("a".into(), Value::String("1".into())),
|
||||
("b".into(), Value::String("2".into())),
|
||||
]
|
||||
|
||||
@@ -30,6 +30,7 @@ use crate::etl::processor::{
|
||||
FIELD_NAME, IGNORE_MISSING_NAME,
|
||||
};
|
||||
use crate::etl::value::{Timestamp, Value};
|
||||
use crate::etl::PipelineMap;
|
||||
|
||||
pub(crate) const PROCESSOR_DATE: &str = "date";
|
||||
|
||||
@@ -197,14 +198,14 @@ impl Processor for DateProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, mut val: Value) -> Result<Value> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
Some(Value::String(s)) => {
|
||||
let timestamp = self.parse(s)?;
|
||||
let output_key = field.target_or_input_field();
|
||||
val.insert(output_key.to_string(), Value::Timestamp(timestamp))?;
|
||||
val.insert(output_key.to_string(), Value::Timestamp(timestamp));
|
||||
}
|
||||
Some(Value::Null) | None => {
|
||||
if !self.ignore_missing {
|
||||
|
||||
@@ -30,6 +30,7 @@ use crate::etl::processor::{
|
||||
yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
|
||||
};
|
||||
use crate::etl::value::Value;
|
||||
use crate::etl::PipelineMap;
|
||||
|
||||
pub(crate) const PROCESSOR_DECOLORIZE: &str = "decolorize";
|
||||
|
||||
@@ -101,7 +102,7 @@ impl crate::etl::processor::Processor for DecolorizeProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, mut val: Value) -> Result<Value> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -117,7 +118,7 @@ impl crate::etl::processor::Processor for DecolorizeProcessor {
|
||||
Some(v) => {
|
||||
let result = self.process(v)?;
|
||||
let output_index = field.target_or_input_field();
|
||||
val.insert(output_index.to_string(), result)?;
|
||||
val.insert(output_index.to_string(), result);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ use crate::etl::processor::{
|
||||
yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
|
||||
};
|
||||
use crate::etl::value::Value;
|
||||
use crate::etl::PipelineMap;
|
||||
|
||||
pub(crate) const PROCESSOR_DIGEST: &str = "digest";
|
||||
|
||||
@@ -200,7 +201,7 @@ impl crate::etl::processor::Processor for DigestProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, mut val: Value) -> Result<Value> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -216,7 +217,7 @@ impl crate::etl::processor::Processor for DigestProcessor {
|
||||
Some(v) => {
|
||||
let result = self.process(v)?;
|
||||
let output_index = field.target_or_input_field();
|
||||
val.insert(output_index.to_string(), result)?;
|
||||
val.insert(output_index.to_string(), result);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ use crate::etl::processor::{
|
||||
Processor, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, PATTERNS_NAME, PATTERN_NAME,
|
||||
};
|
||||
use crate::etl::value::Value;
|
||||
use crate::etl::PipelineMap;
|
||||
|
||||
pub(crate) const PROCESSOR_DISSECT: &str = "dissect";
|
||||
|
||||
@@ -600,14 +601,14 @@ impl Processor for DissectProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, mut val: Value) -> Result<Value> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
Some(Value::String(val_str)) => {
|
||||
let r = self.process(val_str)?;
|
||||
for (k, v) in r {
|
||||
val.insert(k, v)?;
|
||||
val.insert(k, v);
|
||||
}
|
||||
}
|
||||
Some(Value::Null) | None => {
|
||||
|
||||
@@ -29,6 +29,7 @@ use crate::etl::value::time::{
|
||||
SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
|
||||
};
|
||||
use crate::etl::value::{Timestamp, Value};
|
||||
use crate::etl::PipelineMap;
|
||||
|
||||
pub(crate) const PROCESSOR_EPOCH: &str = "epoch";
|
||||
const RESOLUTION_NAME: &str = "resolution";
|
||||
@@ -166,7 +167,7 @@ impl Processor for EpochProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, mut val: Value) -> Result<Value> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -182,7 +183,7 @@ impl Processor for EpochProcessor {
|
||||
Some(v) => {
|
||||
let timestamp = self.parse(v)?;
|
||||
let output_index = field.target_or_input_field();
|
||||
val.insert(output_index.to_string(), Value::Timestamp(timestamp))?;
|
||||
val.insert(output_index.to_string(), Value::Timestamp(timestamp));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ use crate::etl::processor::{
|
||||
IGNORE_MISSING_NAME, PATTERN_NAME,
|
||||
};
|
||||
use crate::etl::value::Value;
|
||||
use crate::etl::PipelineMap;
|
||||
|
||||
pub(crate) const PROCESSOR_GSUB: &str = "gsub";
|
||||
|
||||
@@ -117,7 +118,7 @@ impl crate::etl::processor::Processor for GsubProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, mut val: Value) -> Result<Value> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -133,7 +134,7 @@ impl crate::etl::processor::Processor for GsubProcessor {
|
||||
Some(v) => {
|
||||
let result = self.process(v)?;
|
||||
let output_index = field.target_or_input_field();
|
||||
val.insert(output_index.to_string(), result)?;
|
||||
val.insert(output_index.to_string(), result);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ use crate::etl::processor::{
|
||||
IGNORE_MISSING_NAME, SEPARATOR_NAME,
|
||||
};
|
||||
use crate::etl::value::{Array, Value};
|
||||
use crate::etl::PipelineMap;
|
||||
|
||||
pub(crate) const PROCESSOR_JOIN: &str = "join";
|
||||
|
||||
@@ -94,14 +95,14 @@ impl Processor for JoinProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, mut val: Value) -> Result<Value> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
Some(Value::Array(arr)) => {
|
||||
let result = self.process(arr)?;
|
||||
let output_index = field.target_or_input_field();
|
||||
val.insert(output_index.to_string(), result)?;
|
||||
val.insert(output_index.to_string(), result);
|
||||
}
|
||||
Some(Value::Null) | None => {
|
||||
if !self.ignore_missing {
|
||||
|
||||
@@ -22,7 +22,7 @@ use crate::etl::field::Fields;
|
||||
use crate::etl::processor::{
|
||||
yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
|
||||
};
|
||||
use crate::{json_to_map, Processor, Value};
|
||||
use crate::{json_to_map, PipelineMap, Processor, Value};
|
||||
|
||||
pub(crate) const PROCESSOR_JSON_PARSE: &str = "json_parse";
|
||||
|
||||
@@ -77,7 +77,7 @@ impl JsonParseProcessor {
|
||||
};
|
||||
let parsed: serde_json::Value = serde_json::from_str(json_str).context(JsonParseSnafu)?;
|
||||
match parsed {
|
||||
serde_json::Value::Object(_) => Ok(json_to_map(parsed)?),
|
||||
serde_json::Value::Object(_) => Ok(Value::Map(json_to_map(parsed)?.into())),
|
||||
serde_json::Value::Array(arr) => Ok(Value::Array(arr.try_into()?)),
|
||||
_ => ProcessorUnsupportedValueSnafu {
|
||||
processor: self.kind(),
|
||||
@@ -97,14 +97,14 @@ impl Processor for JsonParseProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, mut val: Value) -> Result<Value> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
Some(v) => {
|
||||
let processed = self.process_field(v)?;
|
||||
let output_index = field.target_or_input_field();
|
||||
val.insert(output_index.to_string(), processed)?;
|
||||
val.insert(output_index.to_string(), processed);
|
||||
}
|
||||
None => {
|
||||
if !self.ignore_missing {
|
||||
|
||||
@@ -21,8 +21,8 @@ use crate::error::{
|
||||
};
|
||||
use crate::etl::field::Fields;
|
||||
use crate::etl::processor::{
|
||||
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
|
||||
IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME,
|
||||
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, PipelineMap, Processor, FIELDS_NAME,
|
||||
FIELD_NAME, IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME,
|
||||
};
|
||||
use crate::Value;
|
||||
|
||||
@@ -125,14 +125,14 @@ impl Processor for JsonPathProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, mut val: Value) -> Result<Value> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
Some(v) => {
|
||||
let processed = self.process_field(v)?;
|
||||
let output_index = field.target_or_input_field();
|
||||
val.insert(output_index.to_string(), processed)?;
|
||||
val.insert(output_index.to_string(), processed);
|
||||
}
|
||||
None => {
|
||||
if !self.ignore_missing {
|
||||
|
||||
@@ -24,6 +24,7 @@ use crate::etl::processor::{
IGNORE_MISSING_NAME, METHOD_NAME,
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;

pub(crate) const PROCESSOR_LETTER: &str = "letter";

@@ -125,14 +126,14 @@ impl Processor for LetterProcessor {
self.ignore_missing
}

fn exec_mut(&self, mut val: Value) -> Result<Value> {
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
Some(Value::String(s)) => {
let result = self.process_field(s)?;
let output_key = field.target_or_input_field();
val.insert(output_key.to_string(), result)?;
val.insert(output_key.to_string(), result);
}
Some(Value::Null) | None => {
if !self.ignore_missing {

@@ -18,8 +18,6 @@ const PATTERNS_NAME: &str = "patterns";

pub(crate) const PROCESSOR_REGEX: &str = "regex";

use std::collections::BTreeMap;

use lazy_static::lazy_static;
use regex::Regex;
use snafu::{OptionExt, ResultExt};
@@ -35,6 +33,7 @@ use crate::etl::processor::{
FIELD_NAME, IGNORE_MISSING_NAME, PATTERN_NAME,
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;

lazy_static! {
static ref GROUPS_NAME_REGEX: Regex = Regex::new(r"\(\?P?<([[:word:]]+)>.+?\)").unwrap();
@@ -168,8 +167,8 @@ impl RegexProcessor {
Ok(())
}

fn process(&self, prefix: &str, val: &str) -> Result<BTreeMap<String, Value>> {
let mut result = BTreeMap::new();
fn process(&self, prefix: &str, val: &str) -> Result<PipelineMap> {
let mut result = PipelineMap::new();
for gr in self.patterns.iter() {
if let Some(captures) = gr.regex.captures(val) {
for group in gr.groups.iter() {
@@ -193,14 +192,14 @@ impl Processor for RegexProcessor {
self.ignore_missing
}

fn exec_mut(&self, mut val: Value) -> Result<Value> {
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let index = field.input_field();
let prefix = field.target_or_input_field();
match val.get(index) {
Some(Value::String(s)) => {
let result = self.process(prefix, s)?;
val.extend(result.into())?;
val.extend(result);
}
Some(Value::Null) | None => {
if !self.ignore_missing {
@@ -270,7 +269,7 @@ ignore_missing: false"#;
let cw = "[c=w,n=US_CA_SANJOSE,o=55155]";
let breadcrumbs_str = [cc, cg, co, cp, cw].iter().join(",");

let temporary_map: BTreeMap<String, Value> = [
let temporary_map: PipelineMap = [
("breadcrumbs_parent", Value::String(cc.to_string())),
("breadcrumbs_edge", Value::String(cg.to_string())),
("breadcrumbs_origin", Value::String(co.to_string())),
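For reference, the regex processor's process() builds a small map of named-capture values which the caller then merges into the event with extend(). A rough, self-contained sketch of that idea using the regex crate; the group and key names here are made up for illustration and the exact prefixing scheme is an assumption:

    use std::collections::BTreeMap;

    use regex::Regex;

    fn main() {
        // One pattern with named capture groups, as a regex-processor pattern would be.
        let pattern = Regex::new(r"(?P<proto>\w+)://(?P<host>[^/]+)").unwrap();
        let input = "https://example.com/index.html";

        // Collect "<prefix>_<group>" keys, mirroring how captured groups get a
        // field prefix before being merged back into the event map.
        let mut captured = BTreeMap::new();
        if let Some(caps) = pattern.captures(input) {
            for name in pattern.capture_names().flatten() {
                if let Some(m) = caps.name(name) {
                    captured.insert(format!("url_{name}"), m.as_str().to_string());
                }
            }
        }

        // The event map is extended in place with the captured values.
        let mut event: BTreeMap<String, String> = BTreeMap::new();
        event.insert("message".to_string(), input.to_string());
        event.extend(captured);

        assert_eq!(event.get("url_host").map(String::as_str), Some("example.com"));
    }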
@@ -15,14 +15,12 @@
use ahash::{HashSet, HashSetExt};
use snafu::OptionExt;

use crate::error::{
Error, KeyMustBeStringSnafu, ProcessorUnsupportedValueSnafu, Result, ValueMustBeMapSnafu,
};
use crate::error::{Error, KeyMustBeStringSnafu, ProcessorUnsupportedValueSnafu, Result};
use crate::etl::field::Fields;
use crate::etl::processor::{
yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME, TYPE_NAME,
};
use crate::{Processor, Value};
use crate::{PipelineMap, Processor};

pub(crate) const PROCESSOR_SELECT: &str = "select";
const INCLUDE_KEY: &str = "include";
@@ -98,29 +96,27 @@ impl Processor for SelectProcessor {
true
}

fn exec_mut(&self, mut val: Value) -> Result<Value> {
let v_map = val.as_map_mut().context(ValueMustBeMapSnafu)?;

fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
match self.select_type {
SelectType::Include => {
let mut include_key_set = HashSet::with_capacity(v_map.len());
let mut include_key_set = HashSet::with_capacity(val.len());
for field in self.fields.iter() {
// If the field has a target, move the value to the target
let field_name = field.input_field();
if let Some(target_name) = field.target_field() {
if let Some(v) = v_map.remove(field_name) {
v_map.insert(target_name.to_string(), v);
if let Some(v) = val.remove(field_name) {
val.insert(target_name.to_string(), v);
}
include_key_set.insert(target_name);
} else {
include_key_set.insert(field_name);
}
}
v_map.retain(|k, _| include_key_set.contains(k.as_str()));
val.retain(|k, _| include_key_set.contains(k.as_str()));
}
SelectType::Exclude => {
for field in self.fields.iter() {
v_map.remove(field.input_field());
val.remove(field.input_field());
}
}
}
@@ -131,11 +127,9 @@ impl Processor for SelectProcessor {

#[cfg(test)]
mod test {
use std::collections::BTreeMap;

use crate::etl::field::{Field, Fields};
use crate::etl::processor::select::{SelectProcessor, SelectType};
use crate::{Map, Processor, Value};
use crate::{PipelineMap, Processor, Value};

#[test]
fn test_select() {
@@ -144,14 +138,13 @@ mod test {
select_type: SelectType::Include,
};

let mut p = BTreeMap::new();
let mut p = PipelineMap::new();
p.insert("hello".to_string(), Value::String("world".to_string()));
p.insert("hello2".to_string(), Value::String("world2".to_string()));

let result = processor.exec_mut(Value::Map(Map { values: p }));
let result = processor.exec_mut(p);
assert!(result.is_ok());
let mut result = result.unwrap();
let p = result.as_map_mut().unwrap();
let p = result.unwrap();
assert_eq!(p.len(), 1);
assert_eq!(p.get("hello"), Some(&Value::String("world".to_string())));
}
@@ -163,14 +156,13 @@ mod test {
select_type: SelectType::Include,
};

let mut p = BTreeMap::new();
let mut p = PipelineMap::new();
p.insert("hello".to_string(), Value::String("world".to_string()));
p.insert("hello2".to_string(), Value::String("world2".to_string()));

let result = processor.exec_mut(Value::Map(Map { values: p }));
let result = processor.exec_mut(p);
assert!(result.is_ok());
let mut result = result.unwrap();
let p = result.as_map_mut().unwrap();
let p = result.unwrap();
assert_eq!(p.len(), 1);
assert_eq!(p.get("hello3"), Some(&Value::String("world".to_string())));
}
@@ -182,14 +174,13 @@ mod test {
select_type: SelectType::Exclude,
};

let mut p = BTreeMap::new();
let mut p = PipelineMap::new();
p.insert("hello".to_string(), Value::String("world".to_string()));
p.insert("hello2".to_string(), Value::String("world2".to_string()));

let result = processor.exec_mut(Value::Map(Map { values: p }));
let result = processor.exec_mut(p);
assert!(result.is_ok());
let mut result = result.unwrap();
let p = result.as_map_mut().unwrap();
let p = result.unwrap();
assert_eq!(p.len(), 1);
assert_eq!(p.get("hello"), None);
assert_eq!(p.get("hello2"), Some(&Value::String("world2".to_string())));
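With the event already a plain map, the select processor's two modes above reduce to ordinary map operations: include keeps only the listed keys (honoring renames), exclude simply removes them. A compact, self-contained sketch of the same idea with illustrative field names:

    use std::collections::{BTreeMap, BTreeSet};

    fn main() {
        let mut event = BTreeMap::from([
            ("hello".to_string(), "world".to_string()),
            ("hello2".to_string(), "world2".to_string()),
        ]);

        // Include mode: keep only the selected keys.
        let include: BTreeSet<&str> = ["hello"].into_iter().collect();
        event.retain(|k, _| include.contains(k.as_str()));
        assert_eq!(event.len(), 1);

        // Exclude mode would instead call event.remove("hello") for each listed key.
    }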
@@ -20,7 +20,7 @@ use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME, KEY_NAME,
};
use crate::{Processor, Value};
use crate::{PipelineMap, Processor, Value};

pub(crate) const PROCESSOR_SIMPLE_EXTRACT: &str = "simple_extract";

@@ -98,14 +98,14 @@ impl Processor for SimpleExtractProcessor {
self.ignore_missing
}

fn exec_mut(&self, mut val: Value) -> Result<Value> {
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
Some(v) => {
let processed = self.process_field(v)?;
let output_index = field.target_or_input_field();
val.insert(output_index.to_string(), processed)?;
val.insert(output_index.to_string(), processed);
}
None => {
if !self.ignore_missing {

@@ -36,6 +36,7 @@ use crate::etl::value::time::{
SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
};
use crate::etl::value::{Timestamp, Value};
use crate::etl::PipelineMap;

pub(crate) const PROCESSOR_TIMESTAMP: &str = "timestamp";
const RESOLUTION_NAME: &str = "resolution";
@@ -301,7 +302,7 @@ impl Processor for TimestampProcessor {
self.ignore_missing
}

fn exec_mut(&self, mut val: Value) -> Result<Value> {
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
@@ -317,7 +318,7 @@ impl Processor for TimestampProcessor {
Some(v) => {
let result = self.parse(v)?;
let output_key = field.target_or_input_field();
val.insert(output_key.to_string(), Value::Timestamp(result))?;
val.insert(output_key.to_string(), Value::Timestamp(result));
}
}
}

@@ -25,6 +25,7 @@ use crate::etl::processor::{
IGNORE_MISSING_NAME, METHOD_NAME,
};
use crate::etl::value::Value;
use crate::PipelineMap;

pub(crate) const PROCESSOR_URL_ENCODING: &str = "urlencoding";

@@ -125,14 +126,14 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor {
self.ignore_missing
}

fn exec_mut(&self, mut val: Value) -> Result<Value> {
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
Some(Value::String(s)) => {
let result = self.process_field(s)?;
let output_index = field.target_or_input_field();
val.insert(output_index.to_string(), result)?;
val.insert(output_index.to_string(), result);
}
Some(Value::Null) | None => {
if !self.ignore_missing {

@@ -27,7 +27,7 @@ use crate::error::{
InvalidTimestampSnafu, KeyMustBeStringSnafu, Result, VrlRegexValueSnafu, VrlReturnValueSnafu,
};
use crate::etl::processor::yaml_string;
use crate::Value as PipelineValue;
use crate::{PipelineMap, Value as PipelineValue};

pub(crate) const PROCESSOR_VRL: &str = "vrl";
const SOURCE: &str = "source";
@@ -62,11 +62,14 @@ impl VrlProcessor {
Ok(Self { source, program })
}

pub fn resolve(&self, m: PipelineValue) -> Result<PipelineValue> {
let pipeline_vrl = pipeline_value_to_vrl_value(m)?;
pub fn resolve(&self, m: PipelineMap) -> Result<PipelineValue> {
let pipeline_vrl = m
.into_iter()
.map(|(k, v)| pipeline_value_to_vrl_value(v).map(|v| (KeyString::from(k), v)))
.collect::<Result<BTreeMap<_, _>>>()?;

let mut target = TargetValue {
value: pipeline_vrl,
value: VrlValue::Object(pipeline_vrl),
metadata: VrlValue::Object(BTreeMap::new()),
secrets: Secrets::default(),
};
@@ -113,11 +116,11 @@ impl crate::etl::processor::Processor for VrlProcessor {
true
}

fn exec_mut(&self, val: PipelineValue) -> Result<PipelineValue> {
fn exec_mut(&self, val: PipelineMap) -> Result<PipelineMap> {
let val = self.resolve(val)?;

if let PipelineValue::Map(m) = val {
Ok(PipelineValue::Map(m.values.into()))
Ok(m.values)
} else {
VrlRegexValueSnafu.fail()
}
@@ -241,19 +244,19 @@ del(.user_info)
assert!(v.is_ok());
let v = v.unwrap();

let mut n = BTreeMap::new();
let mut n = PipelineMap::new();
n.insert(
"name".to_string(),
PipelineValue::String("certain_name".to_string()),
);

let mut m = BTreeMap::new();
let mut m = PipelineMap::new();
m.insert(
"user_info".to_string(),
PipelineValue::Map(Map { values: n }),
);

let re = v.resolve(PipelineValue::Map(Map { values: m }));
let re = v.resolve(m);
assert!(re.is_ok());
let re = re.unwrap();

@@ -14,7 +14,7 @@

pub mod coerce;

use std::collections::{BTreeMap, HashSet};
use std::collections::HashSet;
use std::sync::Arc;

use ahash::{HashMap, HashMapExt};
@@ -29,19 +29,19 @@ use itertools::Itertools;
use once_cell::sync::OnceCell;
use serde_json::Number;
use session::context::Channel;
use snafu::OptionExt;

use crate::error::{
IdentifyPipelineColumnTypeMismatchSnafu, ReachedMaxNestedLevelsSnafu, Result,
TransformColumnNameMustBeUniqueSnafu, TransformMultipleTimestampIndexSnafu,
TransformTimestampIndexCountSnafu, UnsupportedNumberTypeSnafu, ValueMustBeMapSnafu,
TransformTimestampIndexCountSnafu, UnsupportedNumberTypeSnafu,
};
use crate::etl::ctx_req::ContextOpt;
use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::etl::value::{Timestamp, Value};
use crate::{Map, PipelineContext};
use crate::etl::PipelineMap;
use crate::PipelineContext;

const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;
@@ -186,8 +186,8 @@ impl GreptimeTransformer {
}
}

pub fn transform_mut(&self, pipeline_map: &mut Value) -> Result<(ContextOpt, Row)> {
let opt = ContextOpt::from_pipeline_map_to_opt(pipeline_map)?;
pub fn transform_mut(&self, pipeline_map: &mut PipelineMap) -> Result<(ContextOpt, Row)> {
let opt = ContextOpt::from_pipeline_map_to_opt(pipeline_map);

let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
let mut output_index = 0;
@@ -337,7 +337,7 @@ fn resolve_number_schema(
)
}

fn calc_ts(p_ctx: &PipelineContext, values: &Value) -> Result<Option<ValueData>> {
fn calc_ts(p_ctx: &PipelineContext, values: &PipelineMap) -> Result<Option<ValueData>> {
match p_ctx.channel {
Channel::Prometheus => Ok(Some(ValueData::TimestampMillisecondValue(
values
@@ -362,7 +362,7 @@ fn calc_ts(p_ctx: &PipelineContext, values: &Value) -> Result<Option<ValueData>>

fn values_to_row(
schema_info: &mut SchemaInfo,
values: Value,
values: PipelineMap,
pipeline_ctx: &PipelineContext<'_>,
) -> Result<Row> {
let mut row: Vec<GreptimeValue> = Vec::with_capacity(schema_info.schema.len());
@@ -382,8 +382,6 @@ fn values_to_row(
.as_ref()
.map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name());

let values = values.into_map().context(ValueMustBeMapSnafu)?;

for (column_name, value) in values {
if column_name == ts_column_name {
continue;
@@ -520,7 +518,7 @@ fn resolve_value(
}

fn identity_pipeline_inner(
pipeline_maps: Vec<Value>,
pipeline_maps: Vec<PipelineMap>,
pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
let mut schema_info = SchemaInfo::default();
@@ -547,7 +545,7 @@ fn identity_pipeline_inner(
let len = pipeline_maps.len();

for mut pipeline_map in pipeline_maps {
let opt = ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map)?;
let opt = ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map);
let row = values_to_row(&mut schema_info, pipeline_map, pipeline_ctx)?;

opt_map
@@ -578,7 +576,7 @@ fn identity_pipeline_inner(
/// 4. The pipeline will return an error if the same column datatype is mismatched
/// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema.
pub fn identity_pipeline(
array: Vec<Value>,
array: Vec<PipelineMap>,
table: Option<Arc<table::Table>>,
pipeline_ctx: &PipelineContext<'_>,
) -> Result<HashMap<ContextOpt, Rows>> {
@@ -586,7 +584,7 @@ pub fn identity_pipeline(
array
.into_iter()
.map(|item| flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING))
.collect::<Result<Vec<Value>>>()?
.collect::<Result<Vec<PipelineMap>>>()?
} else {
array
};
@@ -620,22 +618,21 @@ pub fn identity_pipeline(
///
/// The `max_nested_levels` parameter is used to limit the nested levels of the JSON object.
/// The error will be returned if the nested levels is greater than the `max_nested_levels`.
pub fn flatten_object(object: Value, max_nested_levels: usize) -> Result<Value> {
let mut flattened = BTreeMap::new();
let object = object.into_map().context(ValueMustBeMapSnafu)?;
pub fn flatten_object(object: PipelineMap, max_nested_levels: usize) -> Result<PipelineMap> {
let mut flattened = PipelineMap::new();

if !object.is_empty() {
// it will use recursion to flatten the object.
do_flatten_object(&mut flattened, None, object, 1, max_nested_levels)?;
}

Ok(Value::Map(Map { values: flattened }))
Ok(flattened)
}

fn do_flatten_object(
dest: &mut BTreeMap<String, Value>,
dest: &mut PipelineMap,
base: Option<&str>,
object: BTreeMap<String, Value>,
object: PipelineMap,
current_level: usize,
max_nested_levels: usize,
) -> Result<()> {
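flatten_object above now maps one PipelineMap straight to another instead of unwrapping a Value::Map first. A self-contained sketch of that kind of recursive flattening with a simplified Value enum; the "parent.child" key joining and the error type are assumptions for illustration, not necessarily the crate's exact behavior:

    use std::collections::BTreeMap;

    // Simplified stand-ins; the real Value enum has many more variants.
    #[derive(Debug, Clone, PartialEq)]
    enum Value {
        String(String),
        Map(BTreeMap<String, Value>),
    }
    type PipelineMap = BTreeMap<String, Value>;

    // Recursively flattens nested maps into joined keys, refusing to go deeper
    // than max_nested_levels.
    fn flatten_object(object: PipelineMap, max_nested_levels: usize) -> Result<PipelineMap, String> {
        let mut flattened = PipelineMap::new();
        if !object.is_empty() {
            do_flatten_object(&mut flattened, None, object, 1, max_nested_levels)?;
        }
        Ok(flattened)
    }

    fn do_flatten_object(
        dest: &mut PipelineMap,
        base: Option<&str>,
        object: PipelineMap,
        current_level: usize,
        max_nested_levels: usize,
    ) -> Result<(), String> {
        if current_level > max_nested_levels {
            return Err("reached max nested levels".to_string());
        }
        for (key, value) in object {
            // Join nested keys under their parent key ("." is an assumed separator).
            let new_key = match base {
                Some(b) => format!("{b}.{key}"),
                None => key,
            };
            match value {
                Value::Map(inner) => {
                    do_flatten_object(dest, Some(new_key.as_str()), inner, current_level + 1, max_nested_levels)?;
                }
                other => {
                    dest.insert(new_key, other);
                }
            }
        }
        Ok(())
    }

    fn main() {
        let mut inner = PipelineMap::new();
        inner.insert("b".to_string(), Value::String("c".to_string()));
        let mut outer = PipelineMap::new();
        outer.insert("a".to_string(), Value::Map(inner));

        let flat = flatten_object(outer, 10).unwrap();
        assert_eq!(flat.get("a.b"), Some(&Value::String("c".to_string())));
    }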
@@ -16,7 +16,6 @@ pub mod array;
pub mod map;
pub mod time;

use std::collections::BTreeMap;
use std::result::Result as StdResult;

pub use array::Array;
@@ -31,16 +30,15 @@ pub use time::Timestamp;

use crate::error::{
Error, Result, UnsupportedNumberTypeSnafu, ValueDefaultValueUnsupportedSnafu,
ValueInvalidResolutionSnafu, ValueMustBeMapSnafu, ValueParseBooleanSnafu, ValueParseFloatSnafu,
ValueParseIntSnafu, ValueParseTypeSnafu, ValueUnsupportedYamlTypeSnafu,
ValueYamlKeyMustBeStringSnafu,
ValueInvalidResolutionSnafu, ValueParseBooleanSnafu, ValueParseFloatSnafu, ValueParseIntSnafu,
ValueParseTypeSnafu, ValueUnsupportedYamlTypeSnafu, ValueYamlKeyMustBeStringSnafu,
};

pub type PipelineMap = Value;
use crate::etl::PipelineMap;

/// Value can be used as type
/// acts as value: the enclosed value is the actual value
/// acts as type: the enclosed value is the default value

#[derive(Debug, Clone, PartialEq, Default)]
pub enum Value {
// as value: null
@@ -72,47 +70,6 @@ pub enum Value {
}

impl Value {
pub fn get(&self, key: &str) -> Option<&Self> {
match self {
Value::Map(map) => map.get(key),
_ => None,
}
}

pub fn get_mut(&mut self, key: &str) -> Option<&mut Self> {
match self {
Value::Map(map) => map.get_mut(key),
_ => None,
}
}

pub fn remove(&mut self, key: &str) -> Option<Value> {
match self {
Value::Map(map) => map.remove(key),
_ => None,
}
}

pub fn extend(&mut self, other: Map) -> Result<()> {
match self {
Value::Map(map) => {
map.extend(other);
Ok(())
}
_ => ValueMustBeMapSnafu.fail(),
}
}

pub fn insert(&mut self, key: String, value: Value) -> Result<()> {
match self {
Value::Map(map) => {
map.insert(key, value);
Ok(())
}
_ => ValueMustBeMapSnafu.fail(),
}
}

pub fn is_null(&self) -> bool {
matches!(self, Value::Null)
}
@@ -279,6 +236,13 @@ impl Value {
}
}

pub fn get(&self, key: &str) -> Option<&Self> {
match self {
Value::Map(map) => map.get(key),
_ => None,
}
}

pub fn as_str(&self) -> Option<&str> {
match self {
Value::String(v) => Some(v),
@@ -325,20 +289,6 @@ impl Value {
}
}

pub fn as_map_mut(&mut self) -> Option<&mut BTreeMap<String, Self>> {
match self {
Value::Map(map) => Some(map),
_ => None,
}
}

pub fn into_map(self) -> Option<BTreeMap<String, Self>> {
match self {
Value::Map(map) => Some(map.values),
_ => None,
}
}

// ref https://github.com/serde-rs/json/blob/master/src/value/mod.rs#L779
pub fn pointer(&self, pointer: &str) -> Option<&Value> {
if pointer.is_empty() {
@@ -438,7 +388,7 @@ impl TryFrom<simd_json::value::OwnedValue> for Value {
Ok(Value::Array(Array { values: re }))
}
simd_json::OwnedValue::Object(map) => {
let mut values = BTreeMap::new();
let mut values = PipelineMap::new();
for (k, v) in map.into_iter() {
values.insert(k, Value::try_from(v)?);
}
@@ -475,7 +425,7 @@ impl TryFrom<serde_json::Value> for Value {
Ok(Value::Array(Array { values }))
}
serde_json::Value::Object(v) => {
let mut values = BTreeMap::new();
let mut values = PipelineMap::new();
for (k, v) in v {
values.insert(k, Value::try_from(v)?);
}
@@ -506,7 +456,7 @@ impl TryFrom<&yaml_rust::Yaml> for Value {
Ok(Value::Array(Array { values }))
}
yaml_rust::Yaml::Hash(v) => {
let mut values = BTreeMap::new();
let mut values = PipelineMap::new();
for (k, v) in v {
let key = k
.as_str()

@@ -12,13 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use crate::etl::value::Value;
use crate::PipelineMap;

#[derive(Debug, Clone, PartialEq, Default)]
pub struct Map {
pub values: BTreeMap<String, Value>,
pub values: PipelineMap,
}

impl Map {
@@ -37,14 +36,14 @@ impl Map {
}
}

impl From<BTreeMap<String, Value>> for Map {
fn from(values: BTreeMap<String, Value>) -> Self {
impl From<PipelineMap> for Map {
fn from(values: PipelineMap) -> Self {
Self { values }
}
}

impl std::ops::Deref for Map {
type Target = BTreeMap<String, Value>;
type Target = PipelineMap;

fn deref(&self) -> &Self::Target {
&self.values

@@ -27,7 +27,8 @@ pub use etl::transform::GreptimeTransformer;
pub use etl::value::{Array, Map, Value};
pub use etl::{
json_array_to_map, json_to_map, parse, simd_json_array_to_map, simd_json_to_map,
AutoTransformOutput, Content, DispatchedTo, Pipeline, PipelineExecOutput, TransformedOutput,
AutoTransformOutput, Content, DispatchedTo, Pipeline, PipelineExecOutput, PipelineMap,
TransformedOutput,
};
pub use manager::{
pipeline_operator, table, util, IdentityTimeIndex, PipelineContext, PipelineDefinition,

@@ -34,9 +34,6 @@ const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);
pub(crate) struct PipelineCache {
pipelines: Cache<String, Arc<Pipeline>>,
original_pipelines: Cache<String, (String, TimestampNanosecond)>,
/// If the pipeline table is invalid, we can use this cache to prevent failures when writing logs through the pipeline
/// The failover cache never expires, but it will be updated when the pipelines cache is updated.
failover_cache: Cache<String, (String, TimestampNanosecond)>,
}

impl PipelineCache {
@@ -50,7 +47,6 @@ impl PipelineCache {
.max_capacity(PIPELINES_CACHE_SIZE)
.time_to_live(PIPELINES_CACHE_TTL)
.build(),
failover_cache: Cache::builder().max_capacity(PIPELINES_CACHE_SIZE).build(),
}
}

@@ -67,7 +63,7 @@ impl PipelineCache {
schema,
name,
version,
pipeline.clone(),
pipeline,
with_latest,
);
}
@@ -85,14 +81,6 @@ impl PipelineCache {
schema,
name,
version,
pipeline.clone(),
with_latest,
);
insert_cache_generic(
&self.failover_cache,
schema,
name,
version,
pipeline,
with_latest,
);
@@ -107,15 +95,6 @@ impl PipelineCache {
get_cache_generic(&self.pipelines, schema, name, version)
}

pub(crate) fn get_failover_cache(
&self,
schema: &str,
name: &str,
version: PipelineVersion,
) -> Result<Option<(String, TimestampNanosecond)>> {
get_cache_generic(&self.failover_cache, schema, name, version)
}

pub(crate) fn get_pipeline_str_cache(
&self,
schema: &str,
@@ -146,7 +125,6 @@ impl PipelineCache {
let k = k.as_str();
self.pipelines.remove(k);
self.original_pipelines.remove(k);
self.failover_cache.remove(k);
}
}
}

@@ -39,14 +39,13 @@ use table::metadata::TableInfo;
use table::TableRef;

use crate::error::{
BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu, Error,
BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu,
ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
MultiPipelineWithDiffSchemaSnafu, PipelineNotFoundSnafu, RecordBatchLenNotMatchSnafu, Result,
};
use crate::etl::{parse, Content, Pipeline};
use crate::manager::pipeline_cache::PipelineCache;
use crate::manager::{PipelineInfo, PipelineVersion};
use crate::metrics::METRIC_PIPELINE_TABLE_FIND_COUNT;
use crate::util::prepare_dataframe_conditions;

pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines";
@@ -284,34 +283,7 @@ impl PipelineTable {
return Ok(pipeline);
}

let mut pipeline_vec;
match self.find_pipeline(name, version).await {
Ok(p) => {
METRIC_PIPELINE_TABLE_FIND_COUNT
.with_label_values(&["true"])
.inc();
pipeline_vec = p;
}
Err(e) => {
match e {
Error::CollectRecords { .. } => {
// if collect records failed, it means the pipeline table is temporary invalid
// we should use failover cache
METRIC_PIPELINE_TABLE_FIND_COUNT
.with_label_values(&["false"])
.inc();
return self
.cache
.get_failover_cache(schema, name, version)?
.ok_or(PipelineNotFoundSnafu { name, version }.build());
}
_ => {
// if other error, we should return it
return Err(e);
}
}
}
};
let mut pipeline_vec = self.find_pipeline(name, version).await?;
ensure!(
!pipeline_vec.is_empty(),
PipelineNotFoundSnafu { name, version }

@@ -13,7 +13,7 @@
// limitations under the License.

use lazy_static::lazy_static;
use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec};
use prometheus::{register_histogram_vec, HistogramVec};

lazy_static! {
pub static ref METRIC_PIPELINE_CREATE_HISTOGRAM: HistogramVec = register_histogram_vec!(
@@ -34,10 +34,4 @@ lazy_static! {
&["success"]
)
.unwrap();
pub static ref METRIC_PIPELINE_TABLE_FIND_COUNT: CounterVec = register_counter_vec!(
"greptime_pipeline_table_find_count",
"Count of pipeline table find operations",
&["success"]
)
.unwrap();
}

@@ -20,7 +20,7 @@ use yaml_rust::Yaml;
use crate::error::{
Error, InvalidTableSuffixTemplateSnafu, RequiredTableSuffixTemplateSnafu, Result,
};
use crate::Value;
use crate::{PipelineMap, Value};

const REPLACE_KEY: &str = "{}";

@@ -47,7 +47,7 @@ pub(crate) struct TableSuffixTemplate {
}

impl TableSuffixTemplate {
pub fn apply(&self, val: &Value) -> Option<String> {
pub fn apply(&self, val: &PipelineMap) -> Option<String> {
let values = self
.keys
.iter()

@@ -13,21 +13,23 @@
// limitations under the License.

use clap::Parser;
use cli::{BenchTableMetadataCommand, DataCommand, MetadataCommand, Tool};
use cli::{BenchTableMetadataCommand, DataCommand, MetaCommand, Tool};
use common_error::ext::BoxedError;

#[derive(Parser)]
pub enum SubCommand {
// Attach(AttachCommand),
Bench(BenchTableMetadataCommand),
#[clap(subcommand)]
Data(DataCommand),
#[clap(subcommand)]
Meta(MetadataCommand),
Meta(MetaCommand),
}

impl SubCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
match self {
// SubCommand::Attach(cmd) => cmd.build().await,
SubCommand::Bench(cmd) => cmd.build().await,
SubCommand::Data(cmd) => cmd.build().await,
SubCommand::Meta(cmd) => cmd.build().await,

Some files were not shown because too many files have changed in this diff.