mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
Compare commits
1 Commits
v0.11.0-ni
...
bench_hydr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4b2f274971 |
@@ -23,7 +23,7 @@ use paste::paste;
|
||||
use crate::{Builder, JoinHandle, Runtime};
|
||||
|
||||
const READ_WORKERS: usize = 8;
|
||||
const WRITE_WORKERS: usize = 8;
|
||||
const WRITE_WORKERS: usize = 16;
|
||||
const BG_WORKERS: usize = 8;
|
||||
|
||||
pub fn create_runtime(runtime_name: &str, thread_name: &str, worker_threads: usize) -> Runtime {
|
||||
|
||||
@@ -24,6 +24,7 @@ pub mod standalone;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::RowInsertRequests;
|
||||
use api::v1::meta::Role;
|
||||
use async_trait::async_trait;
|
||||
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
|
||||
@@ -75,6 +76,7 @@ use sql::statements::copy::CopyTable;
|
||||
use sql::statements::statement::Statement;
|
||||
use sqlparser::ast::ObjectName;
|
||||
pub use standalone::StandaloneDatanodeManager;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
|
||||
use crate::error::{
|
||||
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, ParseSqlSnafu,
|
||||
@@ -119,6 +121,7 @@ pub struct Instance {
|
||||
inserter: InserterRef,
|
||||
deleter: DeleterRef,
|
||||
export_metrics_task: Option<ExportMetricsTask>,
|
||||
proxy_sender: Option<UnboundedSender<RowInsertRequests>>
|
||||
}
|
||||
|
||||
impl Instance {
|
||||
|
||||
@@ -15,6 +15,8 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::greptime_database_client::GreptimeDatabaseClient;
|
||||
use api::v1::GreptimeRequest;
|
||||
use catalog::kvbackend::KvBackendCatalogManager;
|
||||
use common_base::Plugins;
|
||||
use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
|
||||
@@ -27,6 +29,7 @@ use operator::statement::StatementExecutor;
|
||||
use operator::table::TableMutationOperator;
|
||||
use partition::manager::PartitionRuleManager;
|
||||
use query::QueryEngineFactory;
|
||||
use tokio::sync::mpsc::unbounded_channel;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::heartbeat::HeartbeatTask;
|
||||
@@ -134,6 +137,20 @@ impl FrontendBuilder {
|
||||
|
||||
plugins.insert::<StatementExecutorRef>(statement_executor.clone());
|
||||
|
||||
let addr = std::env::var("FLOW_ADDR").unwrap_or("http://[::1]:14514".to_string());
|
||||
let (send, mut recv) = unbounded_channel();
|
||||
tokio::spawn(async move {
|
||||
let conn = tonic::transport::Endpoint::new(addr)
|
||||
.unwrap()
|
||||
.connect_lazy();
|
||||
let mut client = GreptimeDatabaseClient::new(conn);
|
||||
let mut outer_request = GreptimeRequest::default();
|
||||
while let Some(request) = recv.recv().await {
|
||||
outer_request.request =
|
||||
Some(api::v1::greptime_request::Request::RowInserts(request));
|
||||
let _response = client.handle(outer_request.clone()).await.unwrap();
|
||||
}
|
||||
});
|
||||
Ok(Instance {
|
||||
catalog_manager,
|
||||
script_executor,
|
||||
@@ -145,6 +162,7 @@ impl FrontendBuilder {
|
||||
inserter,
|
||||
deleter,
|
||||
export_metrics_task: None,
|
||||
proxy_sender: Some(send),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::Instant;
|
||||
|
||||
use api::v1::ddl_request::{Expr as DdlExpr, Expr};
|
||||
use api::v1::greptime_request::Request;
|
||||
use api::v1::query_request::Query;
|
||||
@@ -20,6 +22,7 @@ use async_trait::async_trait;
|
||||
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
|
||||
use common_meta::table_name::TableName;
|
||||
use common_query::Output;
|
||||
use common_telemetry::info;
|
||||
use query::parser::PromQuery;
|
||||
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
@@ -189,6 +192,8 @@ impl Instance {
|
||||
requests: RowInsertRequests,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<Output> {
|
||||
self.proxy_sender.as_ref().map(|s|s.send(requests.clone()));
|
||||
|
||||
self.inserter
|
||||
.handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
|
||||
.await
|
||||
|
||||
Reference in New Issue
Block a user