mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
Compare commits
4 Commits
feat/bulk-
...
bench_flow
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5ce678fbf4 | ||
|
|
2e101bad53 | ||
|
|
7d4009e51f | ||
|
|
1a1c093ad7 |
@@ -44,6 +44,7 @@ pub struct FrontendOptions {
|
||||
pub logging: LoggingOptions,
|
||||
pub datanode: DatanodeOptions,
|
||||
pub user_provider: Option<String>,
|
||||
pub flow_grpc_addr: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for FrontendOptions {
|
||||
@@ -64,6 +65,7 @@ impl Default for FrontendOptions {
|
||||
logging: LoggingOptions::default(),
|
||||
datanode: DatanodeOptions::default(),
|
||||
user_provider: None,
|
||||
flow_grpc_addr: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ pub mod standalone;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::greptime_database_client::GreptimeDatabaseClient;
|
||||
use api::v1::meta::Role;
|
||||
use async_trait::async_trait;
|
||||
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
|
||||
@@ -82,6 +83,8 @@ use sql::statements::copy::CopyTable;
|
||||
use sql::statements::statement::Statement;
|
||||
use sqlparser::ast::ObjectName;
|
||||
pub use standalone::StandaloneDatanodeManager;
|
||||
use tokio::sync::Mutex;
|
||||
use tonic::transport::Channel;
|
||||
|
||||
use self::region_query::FrontendRegionQueryHandler;
|
||||
use self::standalone::StandaloneTableMetadataCreator;
|
||||
@@ -117,6 +120,12 @@ pub trait FrontendInstance:
|
||||
pub type FrontendInstanceRef = Arc<dyn FrontendInstance>;
|
||||
pub type StatementExecutorRef = Arc<StatementExecutor>;
|
||||
|
||||
/// Send certain query to flow worker through grpc
|
||||
pub struct FlowProxy {
|
||||
flow_client: Mutex<GreptimeDatabaseClient<Channel>>,
|
||||
}
|
||||
pub type FlowProxyRef = Arc<FlowProxy>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Instance {
|
||||
catalog_manager: CatalogManagerRef,
|
||||
@@ -128,6 +137,7 @@ pub struct Instance {
|
||||
heartbeat_task: Option<HeartbeatTask>,
|
||||
inserter: InserterRef,
|
||||
deleter: DeleterRef,
|
||||
flow: Option<FlowProxyRef>,
|
||||
}
|
||||
|
||||
impl Instance {
|
||||
@@ -219,6 +229,7 @@ impl Instance {
|
||||
heartbeat_task,
|
||||
inserter,
|
||||
deleter,
|
||||
flow: None,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -345,6 +356,11 @@ impl Instance {
|
||||
inserter.clone(),
|
||||
));
|
||||
|
||||
let addr = std::env::var("FLOW_ADDR").unwrap_or("http://[::1]:14514".to_string());
|
||||
let conn = tonic::transport::Endpoint::new(addr)
|
||||
.unwrap()
|
||||
.connect_lazy();
|
||||
let client = GreptimeDatabaseClient::new(conn);
|
||||
Ok(Instance {
|
||||
catalog_manager: catalog_manager.clone(),
|
||||
script_executor,
|
||||
@@ -355,6 +371,12 @@ impl Instance {
|
||||
heartbeat_task: None,
|
||||
inserter,
|
||||
deleter,
|
||||
flow: Some(
|
||||
FlowProxy {
|
||||
flow_client: Mutex::new(client),
|
||||
}
|
||||
.into(),
|
||||
),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -15,11 +15,14 @@
|
||||
use api::v1::ddl_request::{Expr as DdlExpr, Expr};
|
||||
use api::v1::greptime_request::Request;
|
||||
use api::v1::query_request::Query;
|
||||
use api::v1::{DeleteRequests, InsertRequests, RowDeleteRequests, RowInsertRequests};
|
||||
use api::v1::{
|
||||
DeleteRequests, GreptimeRequest, InsertRequests, RowDeleteRequests, RowInsertRequests,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
|
||||
use common_meta::table_name::TableName;
|
||||
use common_query::Output;
|
||||
use common_telemetry::info;
|
||||
use query::parser::PromQuery;
|
||||
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
@@ -47,6 +50,27 @@ impl GrpcQueryHandler for Instance {
|
||||
.as_ref()
|
||||
.check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
|
||||
.context(PermissionSnafu)?;
|
||||
/*
|
||||
// copy row inserts to flow worker
|
||||
if let Request::RowInserts(_) = &request {
|
||||
let full_req = GreptimeRequest {
|
||||
header: None,
|
||||
request: Some(request.clone()),
|
||||
};
|
||||
|
||||
if let Some(flow_proxy) = &self.flow {
|
||||
flow_proxy
|
||||
.flow_client
|
||||
.lock()
|
||||
.await
|
||||
.handle(full_req)
|
||||
.await
|
||||
.unwrap();
|
||||
} else {
|
||||
info!("flow proxy is not initialized");
|
||||
}
|
||||
};
|
||||
let output = Output::AffectedRows(0);*/
|
||||
|
||||
let output = match request {
|
||||
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
|
||||
@@ -187,10 +211,24 @@ impl Instance {
|
||||
requests: RowInsertRequests,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<Output> {
|
||||
let full_req = GreptimeRequest {
|
||||
header: None,
|
||||
request: Some(Request::RowInserts(requests.clone())),
|
||||
};
|
||||
|
||||
if let Some(flow_proxy) = &self.flow {
|
||||
let mut client = flow_proxy.flow_client.lock().await.clone();
|
||||
client.handle(full_req).await.unwrap();
|
||||
} else {
|
||||
info!("flow proxy is not initialized");
|
||||
}
|
||||
Ok(Output::AffectedRows(0))
|
||||
/*
|
||||
self.inserter
|
||||
.handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
|
||||
.await
|
||||
.context(TableOperationSnafu)
|
||||
*/
|
||||
}
|
||||
|
||||
pub async fn handle_deletes(
|
||||
|
||||
Reference in New Issue
Block a user