tests(WIP): get demo working

discord9
2024-05-07 21:24:41 +08:00
parent d7942a1a00
commit e1a8215394
9 changed files with 101 additions and 39 deletions


@@ -413,6 +413,7 @@ impl StartCommand {
Default::default(),
fe_plugins.clone(),
table_metadata_manager.clone(),
catalog_manager.clone(),
)
.with_kv_backend(kv_backend.clone());
let flownode = Arc::new(flow_builder.build().await);
@@ -473,6 +474,7 @@ impl StartCommand {
flownode
.set_frontend_invoker(Box::new(frontend.clone()))
.await;
let _handle = flownode.clone().run_background();
let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins)
.build()

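The standalone start path now injects the shared KvBackendCatalogManager into the flow builder instead of letting the flownode construct a private in-memory catalog. Below is a minimal, self-contained sketch of that injection pattern; every type here is a stand-in for illustration, not the real GreptimeDB API:

    use std::sync::Arc;

    // Stand-in for KvBackendCatalogManager.
    struct CatalogManager;

    struct FlownodeBuilder {
        catalog_manager: Arc<CatalogManager>,
    }

    impl FlownodeBuilder {
        // The caller injects the shared manager; the builder no longer
        // creates its own in-memory catalog.
        fn new(catalog_manager: Arc<CatalogManager>) -> Self {
            Self { catalog_manager }
        }

        // build() hands the same manager to the flownode's query engine,
        // so flownode and frontend resolve tables against one catalog.
        fn build(self) -> Arc<CatalogManager> {
            self.catalog_manager
        }
    }

    fn main() {
        let catalog_manager = Arc::new(CatalogManager);
        let flownode_catalog = FlownodeBuilder::new(catalog_manager.clone()).build();
        assert!(Arc::ptr_eq(&catalog_manager, &flownode_catalog));
    }

Sharing one Arc is what the MemoryCatalogManager swap in the next file fixes: the flownode sees the same tables the frontend created.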

@@ -19,6 +19,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::sync::Arc;
use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
@@ -27,6 +28,7 @@ use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameManager};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_runtime::JoinHandle;
use common_telemetry::info;
use greptime_proto::v1;
use hydroflow::scheduled::graph::Hydroflow;
@@ -87,6 +89,7 @@ pub struct FlownodeBuilder {
plugins: Plugins,
kv_backend: Option<KvBackendRef>,
table_meta: TableMetadataManagerRef,
catalog_manager: Arc<KvBackendCatalogManager>,
}
impl FlownodeBuilder {
@@ -95,12 +98,14 @@ impl FlownodeBuilder {
opts: FlownodeOptions,
plugins: Plugins,
table_meta: TableMetadataManagerRef,
catalog_manager: Arc<KvBackendCatalogManager>,
) -> Self {
Self {
opts,
plugins,
kv_backend: None,
table_meta,
catalog_manager,
}
}
@@ -116,7 +121,7 @@ impl FlownodeBuilder {
pub async fn build(self) -> FlownodeManager {
let query_engine_factory = QueryEngineFactory::new_with_plugins(
// the query engine in flownode only translates plans with resolved table sources.
-MemoryCatalogManager::with_default_setup(),
+self.catalog_manager.clone(),
None,
None,
None,
@@ -182,11 +187,19 @@ pub struct FlownodeManager {
}
impl FlownodeManager {
/// Run in the common_runtime background runtime
pub fn run_background(self: Arc<Self>) -> JoinHandle<()> {
info!("Starting flownode manager task");
common_runtime::spawn_bg(async move {
self.run().await;
})
}
/// Trigger the dataflow to run, and then send writeback requests to the source sender
///
/// note that this method doesn't handle input mirror requests, as those should be handled by the grpc server
pub async fn run(&self) {
loop {
info!("Trigger one run of flownode");
self.run_available().await;
// TODO(discord9): error handling
let _ = self.send_writeback_requests().await;
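
run_background packages the polling loop into a spawned task and returns the JoinHandle, so callers can keep or deliberately drop it. A rough analogy of the pattern using plain tokio (common_runtime::spawn_bg is GreptimeDB's own wrapper; tokio::spawn stands in for it here):

    use std::sync::Arc;
    use tokio::task::JoinHandle;
    use tokio::time::{sleep, Duration};

    struct Manager;

    impl Manager {
        // Take self by Arc so the spawned task can own a handle to the manager.
        fn run_background(self: Arc<Self>) -> JoinHandle<()> {
            println!("Starting manager task");
            tokio::spawn(async move {
                self.run().await;
            })
        }

        async fn run(&self) {
            // Bounded here so the example terminates; the real loop runs forever.
            for _ in 0..3 {
                println!("Trigger one run");
                // run_available() and send_writeback_requests() would go here;
                // errors are currently logged and ignored (see the TODO above).
                sleep(Duration::from_millis(100)).await;
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let _handle = Arc::new(Manager).run_background();
        sleep(Duration::from_millis(500)).await;
    }
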
@@ -195,7 +208,10 @@ impl FlownodeManager {
}
/// set frontend invoker
-pub async fn set_frontend_invoker(&self, frontend: Box<dyn FrontendInvoker + Send + Sync>) {
+pub async fn set_frontend_invoker(
+self: &Arc<Self>,
+frontend: Box<dyn FrontendInvoker + Send + Sync>,
+) {
*self.frontend_invoker.write().await = Some(frontend);
}
@@ -318,13 +334,24 @@ impl TableInfoSource {
.map(|v| v.into_inner()))
}
-pub async fn get_table_schema(&self, table_id: &TableId) -> Result<RelationType, Error> {
+pub async fn get_table_name_schema(
+&self,
+table_id: &TableId,
+) -> Result<(TableName, RelationType), Error> {
let table_info_value = self
.get_table_info_value(table_id)
.await?
.with_context(|| TableNotFoundSnafu {
name: format!("TableId = {:?}, Can't found table info", table_id),
})?;
let table_name = table_info_value.table_name();
let table_name = vec![
table_name.catalog_name,
table_name.schema_name,
table_name.table_name,
];
let raw_schema = table_info_value.table_info.meta.schema;
let column_types = raw_schema
.column_schemas
@@ -339,11 +366,14 @@ impl TableInfoSource {
let keys = vec![repr::Key::from(key)];
let time_index = raw_schema.timestamp_index;
-Ok(RelationType {
-column_types,
-keys,
-time_index,
-})
+Ok((
+table_name,
+RelationType {
+column_types,
+keys,
+time_index,
+},
+))
}
}
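
Renaming get_table_schema to get_table_name_schema lets one metadata fetch serve both writeback logging (which wants the dotted name) and plan building (which wants the RelationType). A simplified sketch of the new return shape, with stand-in types rather than the real ones:

    // Stand-ins for the real TableName / RelationType / Error.
    type TableName = Vec<String>;

    #[derive(Debug)]
    struct RelationType {
        column_count: usize,
    }

    #[derive(Debug)]
    struct Error;

    fn get_table_name_schema(table_id: u32) -> Result<(TableName, RelationType), Error> {
        // One lookup now yields both pieces of metadata.
        let table_name = vec![
            "greptime".to_string(), // catalog
            "public".to_string(),   // schema
            format!("table_{table_id}"),
        ];
        Ok((table_name, RelationType { column_count: 2 }))
    }

    fn main() -> Result<(), Error> {
        let (name, schema) = get_table_name_schema(42)?;
        println!("table {} has schema {:?}", name.join("."), schema);
        Ok(())
    }
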
@@ -432,7 +462,11 @@ impl FlownodeManager {
let schema = table_info.table_info.meta.schema.column_schemas;
let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
info!(
"Sending {} writeback requests for table {}",
reqs.len(),
table_name.join(".")
);
for req in reqs {
match req {
DiffRequest::Insert(insert) => {
@@ -552,13 +586,16 @@ impl FlownodeManager {
for source in source_table_ids {
node_ctx
.assign_global_id_to_table(&self.table_info_source, None, Some(*source))
-.await;
+.await?;
}
node_ctx
.assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
-.await;
+.await?;
node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_name.clone());
// TODO(discord9): pass the actual `QueryContext` in here
node_ctx.query_context = Some(QueryContext::with("greptime", "public"));
// construct a active dataflow state with it
let flow_plan = sql_to_flow_plan(node_ctx.borrow_mut(), &self.query_engine, &sql).await?;
@@ -592,7 +629,7 @@ impl FlownodeManager {
create_if_not_exist,
)
.await?;
info!("Successfully create flow with id={}", flow_id);
Ok(Some(flow_id))
}
}
@@ -625,6 +662,7 @@ pub struct FlowNodeContext {
pub schema: HashMap<GlobalId, RelationType>,
/// All the tables that have been registered in the worker
pub table_repr: TriMap,
pub query_context: Option<Arc<QueryContext>>,
}
impl FlowNodeContext {
@@ -764,6 +802,8 @@ impl FlowNodeContext {
/// Assign a global id to a table; if one is already assigned, return the existing global id
///
/// requires at least one of `table_name` or `table_id` to be `Some`
///
/// and will try to fetch the schema from the table info manager (if the table exists now)
///
/// NOTE: this will not actually render the table into the collection referred to as GlobalId
@@ -771,23 +811,32 @@ impl FlowNodeContext {
pub async fn assign_global_id_to_table(
&mut self,
srv_map: &TableInfoSource,
-table_name: Option<TableName>,
+mut table_name: Option<TableName>,
table_id: Option<TableId>,
-) -> GlobalId {
-if let Some((_name, gid)) = table_name
+) -> Result<GlobalId, Error> {
+// if we can already find it by table name or id, don't assign a new one
+if let Some(gid) = table_name
.as_ref()
.and_then(|table_name| self.table_repr.get_by_name(table_name))
.map(|(_, gid)| gid)
.or_else(|| {
table_id
.and_then(|id| self.table_repr.get_by_table_id(&id))
.map(|(_, gid)| gid)
})
{
-gid
+Ok(gid)
} else {
let global_id = self.new_global_id();
if let Some(table_id) = table_id {
-let schema = srv_map.get_table_schema(&table_id).await;
-let _ = schema.map(|schema| self.schema.insert(global_id, schema));
-}
+let (known_table_name, schema) = srv_map.get_table_name_schema(&table_id).await?;
+table_name = table_name.or(Some(known_table_name));
+self.schema.insert(global_id, schema);
+} // if we don't have a table id, the database hasn't assigned one yet or we don't need it
self.table_repr.insert(table_name, table_id, global_id);
-global_id
+Ok(global_id)
}
}
@@ -798,7 +847,7 @@ impl FlowNodeContext {
}
/// A tri-directional map that maps table name, table id, and global id
-#[derive(Default)]
+#[derive(Default, Debug)]
pub struct TriMap {
name_to_global_id: HashMap<TableName, GlobalId>,
id_to_global_id: HashMap<TableId, GlobalId>,

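For reference, a minimal self-contained sketch of a tri-directional map like TriMap, including the name-then-id fallback lookup that assign_global_id_to_table performs above. Only the two fields visible in the diff are modeled, and the getters return bare ids rather than the (name, gid) pairs the real type yields:

    use std::collections::HashMap;

    type TableName = Vec<String>;
    type TableId = u32;
    type GlobalId = u64;

    #[derive(Default, Debug)]
    struct TriMap {
        name_to_global_id: HashMap<TableName, GlobalId>,
        id_to_global_id: HashMap<TableId, GlobalId>,
    }

    impl TriMap {
        // Either key may be absent; whatever is present gets indexed.
        fn insert(&mut self, name: Option<TableName>, id: Option<TableId>, gid: GlobalId) {
            if let Some(name) = name {
                self.name_to_global_id.insert(name, gid);
            }
            if let Some(id) = id {
                self.id_to_global_id.insert(id, gid);
            }
        }

        fn get_by_name(&self, name: &TableName) -> Option<GlobalId> {
            self.name_to_global_id.get(name).copied()
        }

        fn get_by_table_id(&self, id: &TableId) -> Option<GlobalId> {
            self.id_to_global_id.get(id).copied()
        }
    }

    fn main() {
        let mut map = TriMap::default();
        let name: TableName = vec!["greptime".into(), "public".into(), "numbers".into()];
        map.insert(Some(name.clone()), Some(7), 0);
        // Look up by name first, then fall back to the table id,
        // mirroring the get-or-create check above.
        let gid = map.get_by_name(&name).or_else(|| map.get_by_table_id(&7));
        assert_eq!(gid, Some(0));
    }
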

@@ -182,9 +182,7 @@ impl servers::server::Server for FlowNodeServer {
// TODO(discord9): better place for dataflow to run per second
let manager_ref = self.flow_service.manager.clone();
-let _handle_trigger_run = common_runtime::spawn_bg(async move {
-manager_ref.run().await;
-});
+let _handle = manager_ref.clone().run_background();
Ok(addr)
}


@@ -18,6 +18,7 @@ use std::borrow::BorrowMut;
use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;
use common_telemetry::info;
use hydroflow::scheduled::graph::Hydroflow;
use snafu::ResultExt;
use tokio::sync::{broadcast, mpsc, Mutex};


@@ -206,12 +206,16 @@ impl AggregateFunc {
.fail();
}
};
-let input_type = arg_type.unwrap_or_else(ConcreteDataType::null_datatype);
+let input_type = if matches!(generic_fn, GenericFn::Count) {
+ConcreteDataType::null_datatype()
+} else {
+arg_type.unwrap_or_else(ConcreteDataType::null_datatype)
+};
rule.get(&(generic_fn, input_type.clone()))
.cloned()
.with_context(|| InvalidQuerySnafu {
reason: format!(
"No specialization found for binary function {:?} with input type {:?}",
"No specialization found for aggregate function {:?} with input type {:?}",
generic_fn, input_type
),
})

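count() accepts any argument type, so its specialization is registered under the null datatype and the concrete argument type must be ignored at lookup time; other aggregates still key on their argument type. A compact, self-contained illustration of that rule-map fix, with invented enum variants:

    use std::collections::HashMap;

    #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
    enum GenericFn {
        Count,
        Sum,
    }

    #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
    enum DataType {
        Null,
        Int64,
    }

    #[derive(Clone, Copy, Debug)]
    enum AggregateFunc {
        Count,
        SumInt64,
    }

    fn specialize(
        rule: &HashMap<(GenericFn, DataType), AggregateFunc>,
        generic_fn: GenericFn,
        arg_type: Option<DataType>,
    ) -> Option<AggregateFunc> {
        // count(x) works for every x, so its rule lives under Null and the
        // actual argument type is deliberately discarded.
        let input_type = if matches!(generic_fn, GenericFn::Count) {
            DataType::Null
        } else {
            arg_type.unwrap_or(DataType::Null)
        };
        rule.get(&(generic_fn, input_type)).copied()
    }

    fn main() {
        let mut rule = HashMap::new();
        rule.insert((GenericFn::Count, DataType::Null), AggregateFunc::Count);
        rule.insert((GenericFn::Sum, DataType::Int64), AggregateFunc::SumInt64);
        // Before the fix, count(some_int64) looked up (Count, Int64) and failed.
        assert!(specialize(&rule, GenericFn::Count, Some(DataType::Int64)).is_some());
        assert!(specialize(&rule, GenericFn::Sum, Some(DataType::Int64)).is_some());
    }
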

@@ -101,11 +101,11 @@ pub async fn sql_to_flow_plan(
engine: &Arc<dyn QueryEngine>,
sql: &str,
) -> Result<TypedPlan, Error> {
-let stmt =
-QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).context(InvalidQueryPlanSnafu)?;
+let query_ctx = ctx.query_context.clone().unwrap();
+let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx).context(InvalidQueryPlanSnafu)?;
let plan = engine
.planner()
-.plan(stmt, QueryContext::arc())
+.plan(stmt, query_ctx)
.await
.context(InvalidQueryPlanSnafu)?;
let LogicalPlan::DfPlan(plan) = plan;

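With the context stored on FlowNodeContext, parsing and planning share one QueryContext instead of each building a default one. A stubbed sketch of that threading; the unwrap mirrors the commit, though production code would return an error instead:

    use std::sync::Arc;

    // Stand-ins for QueryContext and the parse/plan stages.
    struct QueryContext {
        catalog: String,
        schema: String,
    }

    fn parse_sql(sql: &str, ctx: &Arc<QueryContext>) -> String {
        format!("parsed({sql}) in {}.{}", ctx.catalog, ctx.schema)
    }

    fn plan(stmt: String, ctx: Arc<QueryContext>) -> String {
        format!("plan[{stmt}] for catalog {}", ctx.catalog)
    }

    fn sql_to_plan(query_context: Option<Arc<QueryContext>>, sql: &str) -> String {
        // One context feeds both stages, so name resolution cannot diverge.
        let ctx = query_context.expect("query context must be set before planning");
        let stmt = parse_sql(sql, &ctx);
        plan(stmt, ctx)
    }

    fn main() {
        let ctx = Arc::new(QueryContext {
            catalog: "greptime".into(),
            schema: "public".into(),
        });
        println!("{}", sql_to_plan(Some(ctx), "SELECT 1"));
    }
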

@@ -14,6 +14,7 @@
#![warn(unused_imports)]
use common_telemetry::info;
use datatypes::data_type::ConcreteDataType as CDT;
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
@@ -99,7 +100,7 @@ impl TypedExpr {
},
)
.unzip();
info!("Function: {:?}", f);
match arg_len {
// because variadic function can also have 1 arguments, we need to check if it's a variadic function first
1 if VariadicFunc::from_str_and_types(fn_name, &arg_types).is_err() => {


@@ -114,7 +114,12 @@ impl TypedPlan {
}
Some(RelType::Read(read)) => {
if let Some(ReadType::NamedTable(nt)) = &read.as_ref().read_type {
-let table_reference = nt.names.clone();
+let query_ctx = ctx.query_context.clone().unwrap();
+let mut table_reference = vec![
+query_ctx.current_catalog().to_string(),
+query_ctx.current_schema().to_string(),
+];
+table_reference.extend(nt.names.clone());
let table = ctx.table(&table_reference)?;
let get_table = Plan::Get {
id: crate::expr::Id::Global(table.0),

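Substrait NamedTable reads may carry a bare table name, so the commit prefixes the session's current catalog and schema before the lookup. A self-contained sketch of that normalization:

    // Expand a possibly-bare table name into [catalog, schema, table] using the
    // session's current catalog/schema (simplified stand-in for the real logic).
    fn qualify(current_catalog: &str, current_schema: &str, names: Vec<String>) -> Vec<String> {
        let mut table_reference = vec![current_catalog.to_string(), current_schema.to_string()];
        table_reference.extend(names);
        table_reference
    }

    fn main() {
        let qualified = qualify("greptime", "public", vec!["numbers".to_string()]);
        assert_eq!(qualified.join("."), "greptime.public.numbers");
        // Caveat: the prefix is added unconditionally, so a name that is already
        // fully qualified would end up with five parts; bare names are assumed.
    }
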

@@ -131,14 +131,6 @@ impl GreptimeDbStandaloneBuilder {
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
table_metadata_manager.init().await.unwrap();
-let flow_builder = FlownodeBuilder::new(
-Default::default(),
-plugins.clone(),
-table_metadata_manager.clone(),
-)
-.with_kv_backend(kv_backend.clone());
-let flownode = Arc::new(flow_builder.build().await);
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default());
let catalog_manager = KvBackendCatalogManager::new(
@@ -149,6 +141,15 @@ impl GreptimeDbStandaloneBuilder {
)
.await;
+let flow_builder = FlownodeBuilder::new(
+Default::default(),
+plugins.clone(),
+table_metadata_manager.clone(),
+catalog_manager.clone(),
+)
+.with_kv_backend(kv_backend.clone());
+let flownode = Arc::new(flow_builder.build().await);
let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(),
flow_server: flownode.clone(),
@@ -209,6 +210,7 @@ impl GreptimeDbStandaloneBuilder {
flownode
.set_frontend_invoker(Box::new(instance.clone()))
.await;
let _ = flownode.run_background();
procedure_manager.start().await.unwrap();
wal_options_allocator.start().await.unwrap();