mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-25 15:40:02 +00:00
Compare commits
4 Commits
v1.0.0-bet
...
bench_flow
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5ce678fbf4 | ||
|
|
2e101bad53 | ||
|
|
7d4009e51f | ||
|
|
1a1c093ad7 |
@@ -44,6 +44,7 @@ pub struct FrontendOptions {
|
|||||||
pub logging: LoggingOptions,
|
pub logging: LoggingOptions,
|
||||||
pub datanode: DatanodeOptions,
|
pub datanode: DatanodeOptions,
|
||||||
pub user_provider: Option<String>,
|
pub user_provider: Option<String>,
|
||||||
|
pub flow_grpc_addr: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for FrontendOptions {
|
impl Default for FrontendOptions {
|
||||||
@@ -64,6 +65,7 @@ impl Default for FrontendOptions {
|
|||||||
logging: LoggingOptions::default(),
|
logging: LoggingOptions::default(),
|
||||||
datanode: DatanodeOptions::default(),
|
datanode: DatanodeOptions::default(),
|
||||||
user_provider: None,
|
user_provider: None,
|
||||||
|
flow_grpc_addr: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ pub mod standalone;
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use api::v1::greptime_database_client::GreptimeDatabaseClient;
|
||||||
use api::v1::meta::Role;
|
use api::v1::meta::Role;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
|
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
|
||||||
@@ -82,6 +83,8 @@ use sql::statements::copy::CopyTable;
|
|||||||
use sql::statements::statement::Statement;
|
use sql::statements::statement::Statement;
|
||||||
use sqlparser::ast::ObjectName;
|
use sqlparser::ast::ObjectName;
|
||||||
pub use standalone::StandaloneDatanodeManager;
|
pub use standalone::StandaloneDatanodeManager;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
use tonic::transport::Channel;
|
||||||
|
|
||||||
use self::region_query::FrontendRegionQueryHandler;
|
use self::region_query::FrontendRegionQueryHandler;
|
||||||
use self::standalone::StandaloneTableMetadataCreator;
|
use self::standalone::StandaloneTableMetadataCreator;
|
||||||
@@ -117,6 +120,12 @@ pub trait FrontendInstance:
|
|||||||
pub type FrontendInstanceRef = Arc<dyn FrontendInstance>;
|
pub type FrontendInstanceRef = Arc<dyn FrontendInstance>;
|
||||||
pub type StatementExecutorRef = Arc<StatementExecutor>;
|
pub type StatementExecutorRef = Arc<StatementExecutor>;
|
||||||
|
|
||||||
|
/// Send certain query to flow worker through grpc
|
||||||
|
pub struct FlowProxy {
|
||||||
|
flow_client: Mutex<GreptimeDatabaseClient<Channel>>,
|
||||||
|
}
|
||||||
|
pub type FlowProxyRef = Arc<FlowProxy>;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Instance {
|
pub struct Instance {
|
||||||
catalog_manager: CatalogManagerRef,
|
catalog_manager: CatalogManagerRef,
|
||||||
@@ -128,6 +137,7 @@ pub struct Instance {
|
|||||||
heartbeat_task: Option<HeartbeatTask>,
|
heartbeat_task: Option<HeartbeatTask>,
|
||||||
inserter: InserterRef,
|
inserter: InserterRef,
|
||||||
deleter: DeleterRef,
|
deleter: DeleterRef,
|
||||||
|
flow: Option<FlowProxyRef>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Instance {
|
impl Instance {
|
||||||
@@ -219,6 +229,7 @@ impl Instance {
|
|||||||
heartbeat_task,
|
heartbeat_task,
|
||||||
inserter,
|
inserter,
|
||||||
deleter,
|
deleter,
|
||||||
|
flow: None,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -345,6 +356,11 @@ impl Instance {
|
|||||||
inserter.clone(),
|
inserter.clone(),
|
||||||
));
|
));
|
||||||
|
|
||||||
|
let addr = std::env::var("FLOW_ADDR").unwrap_or("http://[::1]:14514".to_string());
|
||||||
|
let conn = tonic::transport::Endpoint::new(addr)
|
||||||
|
.unwrap()
|
||||||
|
.connect_lazy();
|
||||||
|
let client = GreptimeDatabaseClient::new(conn);
|
||||||
Ok(Instance {
|
Ok(Instance {
|
||||||
catalog_manager: catalog_manager.clone(),
|
catalog_manager: catalog_manager.clone(),
|
||||||
script_executor,
|
script_executor,
|
||||||
@@ -355,6 +371,12 @@ impl Instance {
|
|||||||
heartbeat_task: None,
|
heartbeat_task: None,
|
||||||
inserter,
|
inserter,
|
||||||
deleter,
|
deleter,
|
||||||
|
flow: Some(
|
||||||
|
FlowProxy {
|
||||||
|
flow_client: Mutex::new(client),
|
||||||
|
}
|
||||||
|
.into(),
|
||||||
|
),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -15,11 +15,14 @@
|
|||||||
use api::v1::ddl_request::{Expr as DdlExpr, Expr};
|
use api::v1::ddl_request::{Expr as DdlExpr, Expr};
|
||||||
use api::v1::greptime_request::Request;
|
use api::v1::greptime_request::Request;
|
||||||
use api::v1::query_request::Query;
|
use api::v1::query_request::Query;
|
||||||
use api::v1::{DeleteRequests, InsertRequests, RowDeleteRequests, RowInsertRequests};
|
use api::v1::{
|
||||||
|
DeleteRequests, GreptimeRequest, InsertRequests, RowDeleteRequests, RowInsertRequests,
|
||||||
|
};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
|
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
|
||||||
use common_meta::table_name::TableName;
|
use common_meta::table_name::TableName;
|
||||||
use common_query::Output;
|
use common_query::Output;
|
||||||
|
use common_telemetry::info;
|
||||||
use query::parser::PromQuery;
|
use query::parser::PromQuery;
|
||||||
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
|
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
|
||||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||||
@@ -47,6 +50,27 @@ impl GrpcQueryHandler for Instance {
|
|||||||
.as_ref()
|
.as_ref()
|
||||||
.check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
|
.check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
|
||||||
.context(PermissionSnafu)?;
|
.context(PermissionSnafu)?;
|
||||||
|
/*
|
||||||
|
// copy row inserts to flow worker
|
||||||
|
if let Request::RowInserts(_) = &request {
|
||||||
|
let full_req = GreptimeRequest {
|
||||||
|
header: None,
|
||||||
|
request: Some(request.clone()),
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(flow_proxy) = &self.flow {
|
||||||
|
flow_proxy
|
||||||
|
.flow_client
|
||||||
|
.lock()
|
||||||
|
.await
|
||||||
|
.handle(full_req)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
} else {
|
||||||
|
info!("flow proxy is not initialized");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let output = Output::AffectedRows(0);*/
|
||||||
|
|
||||||
let output = match request {
|
let output = match request {
|
||||||
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
|
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
|
||||||
@@ -187,10 +211,24 @@ impl Instance {
|
|||||||
requests: RowInsertRequests,
|
requests: RowInsertRequests,
|
||||||
ctx: QueryContextRef,
|
ctx: QueryContextRef,
|
||||||
) -> Result<Output> {
|
) -> Result<Output> {
|
||||||
|
let full_req = GreptimeRequest {
|
||||||
|
header: None,
|
||||||
|
request: Some(Request::RowInserts(requests.clone())),
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(flow_proxy) = &self.flow {
|
||||||
|
let mut client = flow_proxy.flow_client.lock().await.clone();
|
||||||
|
client.handle(full_req).await.unwrap();
|
||||||
|
} else {
|
||||||
|
info!("flow proxy is not initialized");
|
||||||
|
}
|
||||||
|
Ok(Output::AffectedRows(0))
|
||||||
|
/*
|
||||||
self.inserter
|
self.inserter
|
||||||
.handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
|
.handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
|
||||||
.await
|
.await
|
||||||
.context(TableOperationSnafu)
|
.context(TableOperationSnafu)
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_deletes(
|
pub async fn handle_deletes(
|
||||||
|
|||||||
Reference in New Issue
Block a user