fix(flow): flow's table schema cache (#5251)

* fix: flow schema cache

* refactor: location for `to_meta_err`

* chore: endfile emptyline

* chore: review(partially)

* chore: per review

* refactor: per review

* refactor: per review
discord9 authored 2025-01-02 18:33:23 +08:00, committed by Yingwen
parent ab3f9c42f1
commit cf605ecccc
8 changed files with 407 additions and 108 deletions

@@ -45,10 +45,8 @@ use tokio::sync::broadcast::error::TryRecvError;
 use tokio::sync::{broadcast, watch, Mutex, RwLock};
 
 pub(crate) use crate::adapter::node_context::FlownodeContext;
-use crate::adapter::table_source::TableSource;
-use crate::adapter::util::{
-    relation_desc_to_column_schemas_with_fallback, table_info_value_to_relation_desc,
-};
+use crate::adapter::table_source::ManagedTableSource;
+use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
 use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
 use crate::compute::ErrCollector;
 use crate::df_optimizer::sql_to_flow_plan;
@@ -69,7 +67,7 @@ mod util;
 mod worker;
 
 pub(crate) mod node_context;
-mod table_source;
+pub(crate) mod table_source;
 
 use crate::error::Error;
 use crate::utils::StateReportHandler;
@@ -129,7 +127,7 @@ pub struct FlowWorkerManager {
     /// The query engine that will be used to parse the query and convert it to a dataflow plan
     pub query_engine: Arc<dyn QueryEngine>,
     /// Getting table name and table schema from table info manager
-    table_info_source: TableSource,
+    table_info_source: ManagedTableSource,
     frontend_invoker: RwLock<Option<FrontendInvoker>>,
     /// contains mapping from table name to global id, and table schema
     node_context: RwLock<FlownodeContext>,
@@ -158,11 +156,11 @@ impl FlowWorkerManager {
         query_engine: Arc<dyn QueryEngine>,
         table_meta: TableMetadataManagerRef,
     ) -> Self {
-        let srv_map = TableSource::new(
+        let srv_map = ManagedTableSource::new(
             table_meta.table_info_manager().clone(),
             table_meta.table_name_manager().clone(),
         );
-        let node_context = FlownodeContext::default();
+        let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _);
         let tick_manager = FlowTickManager::new();
         let worker_handles = Vec::new();
         FlowWorkerManager {
@@ -409,7 +407,7 @@ impl FlowWorkerManager {
     ) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
         if let Some(table_id) = self
             .table_info_source
-            .get_table_id_from_name(table_name)
+            .get_opt_table_id_from_name(table_name)
             .await?
         {
             let table_info = self
@@ -828,27 +826,9 @@ impl FlowWorkerManager {
                     .fail()?,
                 }
             }
-            let table_id = self
-                .table_info_source
-                .get_table_id_from_name(&sink_table_name)
-                .await?
-                .context(UnexpectedSnafu {
-                    reason: format!("Can't get table id for table name {:?}", sink_table_name),
-                })?;
-            let table_info_value = self
-                .table_info_source
-                .get_table_info_value(&table_id)
-                .await?
-                .context(UnexpectedSnafu {
-                    reason: format!("Can't get table info value for table id {:?}", table_id),
-                })?;
-            let real_schema = table_info_value_to_relation_desc(table_info_value)?;
-            node_ctx.assign_table_schema(&sink_table_name, real_schema.clone())?;
         } else {
-            // assign inferred schema to sink table
             // create sink table
-            node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?;
             let did_create = self
                 .create_table_from_relation(
                     &format!("flow-id={flow_id}"),

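The hunks above carry the heart of the fix: `FlownodeContext` is no longer built via `Default` with a schema snapshot, but with a boxed table source that is queried on demand, so an `ALTER TABLE` is visible on the next lookup. A minimal, self-contained sketch of the pattern (all names here are illustrative stand-ins, not the actual flow types):

```rust
use std::collections::HashMap;

// Illustrative stand-ins: the real code uses TableId/RelationDesc and an
// async FlowTableSource backed by the table metadata manager.
type TableId = u32;
type Schema = Vec<String>;

trait SchemaSource {
    fn schema_of(&self, id: TableId) -> Option<Schema>;
}

// Stand-in for the metadata service; an ALTER TABLE updates this map.
struct MetaSrv(HashMap<TableId, Schema>);

impl SchemaSource for MetaSrv {
    fn schema_of(&self, id: TableId) -> Option<Schema> {
        self.0.get(&id).cloned()
    }
}

// The context keeps the source itself rather than a snapshot of schemas
// taken at flow-creation time, so every lookup sees current metadata.
struct Ctx {
    source: Box<dyn SchemaSource>,
}

impl Ctx {
    fn table_schema(&self, id: TableId) -> Option<Schema> {
        self.source.schema_of(id)
    }
}

fn main() {
    let mut tables = HashMap::new();
    tables.insert(1024, vec!["byte".to_string(), "ts".to_string()]);
    let ctx = Ctx { source: Box::new(MetaSrv(tables)) };
    assert_eq!(ctx.table_schema(1024).unwrap().len(), 2);
}
```
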
@@ -34,11 +34,16 @@ use crate::error::InternalSnafu;
 use crate::metrics::METRIC_FLOW_TASK_COUNT;
 use crate::repr::{self, DiffRow};
 
-fn to_meta_err(err: crate::error::Error) -> common_meta::error::Error {
-    // TODO(discord9): refactor this
-    Err::<(), _>(BoxedError::new(err))
-        .with_context(|_| ExternalSnafu)
-        .unwrap_err()
+/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
+fn to_meta_err(
+    location: snafu::Location,
+) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
+    move |err: crate::error::Error| -> common_meta::error::Error {
+        common_meta::error::Error::External {
+            location,
+            source: BoxedError::new(err),
+        }
+    }
 }
 
 #[async_trait::async_trait]
@@ -79,7 +84,10 @@ impl Flownode for FlowWorkerManager {
             flow_options,
             query_ctx,
         };
-        let ret = self.create_flow(args).await.map_err(to_meta_err)?;
+        let ret = self
+            .create_flow(args)
+            .await
+            .map_err(to_meta_err(snafu::location!()))?;
         METRIC_FLOW_TASK_COUNT.inc();
         Ok(FlowResponse {
             affected_flows: ret
@@ -94,7 +102,7 @@
             })) => {
                 self.remove_flow(flow_id.id as u64)
                     .await
-                    .map_err(to_meta_err)?;
+                    .map_err(to_meta_err(snafu::location!()))?;
                 METRIC_FLOW_TASK_COUNT.dec();
                 Ok(Default::default())
             }
@@ -112,9 +120,15 @@
                     .await
                     .flush_all_sender()
                     .await
-                    .map_err(to_meta_err)?;
-                let rows_send = self.run_available(true).await.map_err(to_meta_err)?;
-                let row = self.send_writeback_requests().await.map_err(to_meta_err)?;
+                    .map_err(to_meta_err(snafu::location!()))?;
+                let rows_send = self
+                    .run_available(true)
+                    .await
+                    .map_err(to_meta_err(snafu::location!()))?;
+                let row = self
+                    .send_writeback_requests()
+                    .await
+                    .map_err(to_meta_err(snafu::location!()))?;
                 debug!(
                     "Done to flush flow_id={:?} with {} input rows flushed, {} rows sended and {} output rows flushed",
@@ -156,15 +170,14 @@
                 let fetch_order = {
                     let ctx = self.node_context.read().await;
-                    let table_col_names = ctx
-                        .table_repr
-                        .get_by_table_id(&table_id)
-                        .map(|r| r.1)
-                        .and_then(|id| ctx.schema.get(&id))
-                        .map(|desc| &desc.names)
-                        .context(UnexpectedSnafu {
-                            err_msg: format!("Table not found: {}", table_id),
-                        })?;
+                    // TODO(discord9): also check schema version so that altered table can be reported
+                    let table_schema = ctx
+                        .table_source
+                        .table_from_id(&table_id)
+                        .await
+                        .map_err(to_meta_err(snafu::location!()))?;
+                    let table_col_names = table_schema.names;
                     let table_col_names = table_col_names
                         .iter().enumerate()
                         .map(|(idx,name)| match name {
@@ -211,12 +224,12 @@
                     .iter()
                     .map(from_proto_to_data_type)
                     .collect::<std::result::Result<Vec<_>, _>>()
-                    .map_err(to_meta_err)?;
+                    .map_err(to_meta_err(snafu::location!()))?;
                 self.handle_write_request(region_id.into(), rows, &batch_datatypes)
                     .await
                     .map_err(|err| {
                         common_telemetry::error!(err;"Failed to handle write request");
-                        to_meta_err(err)
+                        to_meta_err(snafu::location!())(err)
                     })?;
             }
             Ok(Default::default())

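The `to_meta_err` refactor above is a small but useful closure trick: instead of a plain conversion function (where every converted error would point at the helper itself), the function now takes a `snafu::Location` and returns an `impl FnOnce` converter, so each `map_err(to_meta_err(snafu::location!()))` records the call site where the error crossed into `common_meta`. A standalone sketch of the idea with stand-in error types (the real code uses `snafu::Location` and `common_meta::error::Error::External`):

```rust
// Stand-in error types; the real converter builds
// common_meta::error::Error::External from a snafu::Location.
#[derive(Debug)]
struct FlowError(String);

#[derive(Debug)]
struct MetaError {
    location: &'static str, // the real code uses snafu::Location
    source: FlowError,
}

// Returning a closure lets each call site bake its own location into the
// converter, so the resulting error points at the map_err call, not at a
// single shared line inside this helper.
fn to_meta_err(location: &'static str) -> impl FnOnce(FlowError) -> MetaError {
    move |source| MetaError { location, source }
}

fn main() {
    let res: Result<(), FlowError> = Err(FlowError("flush failed".into()));
    let err = res
        .map_err(to_meta_err(concat!(file!(), ":", line!())))
        .unwrap_err();
    // The location names this call site, not the helper's body.
    println!("{} (caused by {:?})", err.location, err.source);
}
```
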
@@ -25,7 +25,8 @@ use snafu::{OptionExt, ResultExt};
 use table::metadata::TableId;
 use tokio::sync::{broadcast, mpsc, RwLock};
 
-use crate::adapter::{FlowId, TableName, TableSource};
+use crate::adapter::table_source::FlowTableSource;
+use crate::adapter::{FlowId, ManagedTableSource, TableName};
 use crate::error::{Error, EvalSnafu, TableNotFoundSnafu};
 use crate::expr::error::InternalSnafu;
 use crate::expr::{Batch, GlobalId};
@@ -33,7 +34,7 @@ use crate::metrics::METRIC_FLOW_INPUT_BUF_SIZE;
 use crate::repr::{DiffRow, RelationDesc, BATCH_SIZE, BROADCAST_CAP, SEND_BUF_CAP};
 
 /// A context that holds the information of the dataflow
-#[derive(Default, Debug)]
+#[derive(Debug)]
 pub struct FlownodeContext {
     /// mapping from source table to tasks, useful for schedule which task to run when a source table is updated
     pub source_to_tasks: BTreeMap<TableId, BTreeSet<FlowId>>,
@@ -50,13 +51,28 @@ pub struct FlownodeContext {
     /// note that the sink receiver should only have one, and we are using broadcast as mpsc channel here
     pub sink_receiver:
         BTreeMap<TableName, (mpsc::UnboundedSender<Batch>, mpsc::UnboundedReceiver<Batch>)>,
-    /// the schema of the table, query from metasrv or inferred from TypedPlan
-    pub schema: HashMap<GlobalId, RelationDesc>,
+    /// can query the schema of the table source, from metasrv with local cache
+    pub table_source: Box<dyn FlowTableSource>,
     /// All the tables that have been registered in the worker
     pub table_repr: IdToNameMap,
     pub query_context: Option<Arc<QueryContext>>,
 }
 
+impl FlownodeContext {
+    pub fn new(table_source: Box<dyn FlowTableSource>) -> Self {
+        Self {
+            source_to_tasks: Default::default(),
+            flow_to_sink: Default::default(),
+            sink_to_flow: Default::default(),
+            source_sender: Default::default(),
+            sink_receiver: Default::default(),
+            table_source,
+            table_repr: Default::default(),
+            query_context: Default::default(),
+        }
+    }
+}
+
 /// a simple broadcast sender with backpressure, bounded capacity and blocking on send when send buf is full
 /// note that it wouldn't evict old data, so it's possible to block forever if the receiver is slow
 ///
@@ -284,7 +300,7 @@ impl FlownodeContext {
     /// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function.
     ///
     /// Returns an error if no table has been registered with the provided names
-    pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationDesc), Error> {
+    pub async fn table(&self, name: &TableName) -> Result<(GlobalId, RelationDesc), Error> {
         let id = self
             .table_repr
             .get_by_name(name)
@@ -292,13 +308,7 @@
             .with_context(|| TableNotFoundSnafu {
                 name: name.join("."),
             })?;
-        let schema = self
-            .schema
-            .get(&id)
-            .cloned()
-            .with_context(|| TableNotFoundSnafu {
-                name: name.join("."),
-            })?;
+        let schema = self.table_source.table(name).await?;
         Ok((id, schema))
     }
@@ -312,7 +322,7 @@
     /// merely creating a mapping from table id to global id
     pub async fn assign_global_id_to_table(
         &mut self,
-        srv_map: &TableSource,
+        srv_map: &ManagedTableSource,
         mut table_name: Option<TableName>,
         table_id: Option<TableId>,
     ) -> Result<GlobalId, Error> {
@@ -333,9 +343,8 @@
         // table id is Some meaning db must have created the table
         if let Some(table_id) = table_id {
-            let (known_table_name, schema) = srv_map.get_table_name_schema(&table_id).await?;
+            let known_table_name = srv_map.get_table_name(&table_id).await?;
             table_name = table_name.or(Some(known_table_name));
-            self.schema.insert(global_id, schema);
         } // if we don't have table id, it means database haven't assign one yet or we don't need it
 
         // still update the mapping with new global id
@@ -344,26 +353,6 @@
         }
     }
 
-    /// Assign a schema to a table
-    ///
-    pub fn assign_table_schema(
-        &mut self,
-        table_name: &TableName,
-        schema: RelationDesc,
-    ) -> Result<(), Error> {
-        let gid = self
-            .table_repr
-            .get_by_name(table_name)
-            .map(|(_, gid)| gid)
-            .context(TableNotFoundSnafu {
-                name: format!("Table not found: {:?} in flownode cache", table_name),
-            })?;
-        self.schema.insert(gid, schema);
-        Ok(())
-    }
-
     /// Get a new global id
     pub fn new_global_id(&self) -> GlobalId {
         GlobalId::User(self.table_repr.global_id_to_name_id.len() as u64)

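Dropping `#[derive(Default)]` above is forced, not stylistic: `Box<dyn FlowTableSource>` has no `Default` impl, which is why the explicit `new` constructor threading the trait object through appears. In miniature (illustrative names only):

```rust
trait Source {}

struct Dummy;
impl Source for Dummy {}

// #[derive(Default)] would not compile here: Box<dyn Source> has no
// Default impl, so the trait-object field must be supplied explicitly.
struct Ctx {
    source: Box<dyn Source>,
    names: Vec<String>,
}

impl Ctx {
    fn new(source: Box<dyn Source>) -> Self {
        Self {
            source,
            // every other field can still fall back to its Default
            names: Vec::new(),
        }
    }
}

fn main() {
    let ctx = Ctx::new(Box::new(Dummy));
    let _ = &ctx.source;
    assert!(ctx.names.is_empty());
}
```
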
@@ -27,16 +27,56 @@ use crate::error::{
 };
 use crate::repr::RelationDesc;
 
-/// mapping of table name <-> table id should be query from tableinfo manager
-pub struct TableSource {
+/// Table source but for flow, provide table schema by table name/id
+#[async_trait::async_trait]
+pub trait FlowTableSource: Send + Sync + std::fmt::Debug {
+    async fn table_name_from_id(&self, table_id: &TableId) -> Result<TableName, Error>;
+    async fn table_id_from_name(&self, name: &TableName) -> Result<TableId, Error>;
+
+    /// Get the table schema by table name
+    async fn table(&self, name: &TableName) -> Result<RelationDesc, Error> {
+        let id = self.table_id_from_name(name).await?;
+        self.table_from_id(&id).await
+    }
+    async fn table_from_id(&self, table_id: &TableId) -> Result<RelationDesc, Error>;
+}
+
+/// managed table source information, query from table info manager and table name manager
+#[derive(Clone)]
+pub struct ManagedTableSource {
+    /// for query `TableId -> TableName` mapping
     table_info_manager: TableInfoManager,
     table_name_manager: TableNameManager,
 }
 
-impl TableSource {
+#[async_trait::async_trait]
+impl FlowTableSource for ManagedTableSource {
+    async fn table_from_id(&self, table_id: &TableId) -> Result<RelationDesc, Error> {
+        let table_info_value = self
+            .get_table_info_value(table_id)
+            .await?
+            .with_context(|| TableNotFoundSnafu {
+                name: format!("TableId = {:?}, Can't found table info", table_id),
+            })?;
+        let desc = table_info_value_to_relation_desc(table_info_value)?;
+        Ok(desc)
+    }
+    async fn table_name_from_id(&self, table_id: &TableId) -> Result<TableName, Error> {
+        self.get_table_name(table_id).await
+    }
+    async fn table_id_from_name(&self, name: &TableName) -> Result<TableId, Error> {
+        self.get_opt_table_id_from_name(name)
+            .await?
+            .with_context(|| TableNotFoundSnafu {
+                name: name.join("."),
+            })
+    }
+}
+
+impl ManagedTableSource {
     pub fn new(table_info_manager: TableInfoManager, table_name_manager: TableNameManager) -> Self {
-        TableSource {
+        ManagedTableSource {
             table_info_manager,
             table_name_manager,
         }
@@ -63,7 +103,10 @@
     }
 
     /// If the table haven't been created in database, the tableId returned would be null
-    pub async fn get_table_id_from_name(&self, name: &TableName) -> Result<Option<TableId>, Error> {
+    pub async fn get_opt_table_id_from_name(
+        &self,
+        name: &TableName,
+    ) -> Result<Option<TableId>, Error> {
         let ret = self
             .table_name_manager
             .get(TableNameKey::new(&name[0], &name[1], &name[2]))
@@ -126,3 +169,117 @@
         Ok((table_name, desc))
     }
 }
+
+impl std::fmt::Debug for ManagedTableSource {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("KvBackendTableSource").finish()
+    }
+}
+
+#[cfg(test)]
+pub(crate) mod test {
+    use std::collections::HashMap;
+
+    use datatypes::data_type::ConcreteDataType as CDT;
+
+    use super::*;
+    use crate::repr::{ColumnType, RelationType};
+
+    pub struct FlowDummyTableSource {
+        pub id_names_to_desc: Vec<(TableId, TableName, RelationDesc)>,
+        id_to_idx: HashMap<TableId, usize>,
+        name_to_idx: HashMap<TableName, usize>,
+    }
+
+    impl Default for FlowDummyTableSource {
+        fn default() -> Self {
+            let id_names_to_desc = vec![
+                (
+                    1024,
+                    [
+                        "greptime".to_string(),
+                        "public".to_string(),
+                        "numbers".to_string(),
+                    ],
+                    RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
+                        .into_named(vec![Some("number".to_string())]),
+                ),
+                (
+                    1025,
+                    [
+                        "greptime".to_string(),
+                        "public".to_string(),
+                        "numbers_with_ts".to_string(),
+                    ],
+                    RelationType::new(vec![
+                        ColumnType::new(CDT::uint32_datatype(), false),
+                        ColumnType::new(CDT::timestamp_millisecond_datatype(), false),
+                    ])
+                    .into_named(vec![Some("number".to_string()), Some("ts".to_string())]),
+                ),
+            ];
+            let id_to_idx = id_names_to_desc
+                .iter()
+                .enumerate()
+                .map(|(idx, (id, _name, _desc))| (*id, idx))
+                .collect();
+            let name_to_idx = id_names_to_desc
+                .iter()
+                .enumerate()
+                .map(|(idx, (_id, name, _desc))| (name.clone(), idx))
+                .collect();
+            Self {
+                id_names_to_desc,
+                id_to_idx,
+                name_to_idx,
+            }
+        }
+    }
+
+    #[async_trait::async_trait]
+    impl FlowTableSource for FlowDummyTableSource {
+        async fn table_from_id(&self, table_id: &TableId) -> Result<RelationDesc, Error> {
+            let idx = self.id_to_idx.get(table_id).context(TableNotFoundSnafu {
+                name: format!("Table id = {:?}, couldn't found table desc", table_id),
+            })?;
+            let desc = self
+                .id_names_to_desc
+                .get(*idx)
+                .map(|x| x.2.clone())
+                .context(TableNotFoundSnafu {
+                    name: format!("Table id = {:?}, couldn't found table desc", table_id),
+                })?;
+            Ok(desc)
+        }
+
+        async fn table_name_from_id(&self, table_id: &TableId) -> Result<TableName, Error> {
+            let idx = self.id_to_idx.get(table_id).context(TableNotFoundSnafu {
+                name: format!("Table id = {:?}, couldn't found table desc", table_id),
+            })?;
+            self.id_names_to_desc
+                .get(*idx)
+                .map(|x| x.1.clone())
+                .context(TableNotFoundSnafu {
+                    name: format!("Table id = {:?}, couldn't found table desc", table_id),
+                })
+        }
+
+        async fn table_id_from_name(&self, name: &TableName) -> Result<TableId, Error> {
+            for (id, table_name, _desc) in &self.id_names_to_desc {
+                if name == table_name {
+                    return Ok(*id);
+                }
+            }
+            TableNotFoundSnafu {
+                name: format!("Table name = {:?}, couldn't found table id", name),
+            }
+            .fail()?
+        }
+    }
+
+    impl std::fmt::Debug for FlowDummyTableSource {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            f.debug_struct("DummyTableSource").finish()
+        }
+    }
+}

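The `FlowTableSource` trait above also shows why each source only needs to implement two lookups: `table` is a provided method composing `table_id_from_name` with `table_from_id`, which both `ManagedTableSource` and the test-only `FlowDummyTableSource` get for free. A sync miniature of that shape (the real trait is async and uses `#[async_trait::async_trait]` so it stays object-safe behind `Box<dyn FlowTableSource>`):

```rust
use std::collections::HashMap;

type TableId = u32;
type TableName = String;
type Schema = Vec<String>;

// Implementors provide the two lookups; `table` (name -> id -> schema)
// comes as a provided default method.
trait TableSourceLike {
    fn table_id_from_name(&self, name: &TableName) -> Option<TableId>;
    fn table_from_id(&self, id: &TableId) -> Option<Schema>;

    fn table(&self, name: &TableName) -> Option<Schema> {
        let id = self.table_id_from_name(name)?;
        self.table_from_id(&id)
    }
}

// In-memory source, analogous to the test-only FlowDummyTableSource.
struct Dummy {
    ids: HashMap<TableName, TableId>,
    schemas: HashMap<TableId, Schema>,
}

impl TableSourceLike for Dummy {
    fn table_id_from_name(&self, name: &TableName) -> Option<TableId> {
        self.ids.get(name).copied()
    }
    fn table_from_id(&self, id: &TableId) -> Option<Schema> {
        self.schemas.get(id).cloned()
    }
}

fn main() {
    let dummy = Dummy {
        ids: HashMap::from([("numbers".to_string(), 1024)]),
        schemas: HashMap::from([(1024, vec!["number".to_string()])]),
    };
    // Only the two required lookups were written; `table` came for free.
    assert_eq!(
        dummy.table(&"numbers".to_string()),
        Some(vec!["number".to_string()])
    );
}
```
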
@@ -173,12 +173,11 @@ mod test {
     use super::*;
     use crate::adapter::node_context::IdToNameMap;
+    use crate::adapter::table_source::test::FlowDummyTableSource;
     use crate::df_optimizer::apply_df_optimizer;
     use crate::expr::GlobalId;
     use crate::repr::{ColumnType, RelationType};
 
     pub fn create_test_ctx() -> FlownodeContext {
-        let mut schemas = HashMap::new();
         let mut tri_map = IdToNameMap::new();
         {
             let gid = GlobalId::User(0);
@@ -187,10 +186,7 @@
                 "public".to_string(),
                 "numbers".to_string(),
             ];
-            let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
             tri_map.insert(Some(name.clone()), Some(1024), gid);
-            schemas.insert(gid, schema.into_named(vec![Some("number".to_string())]));
         }
         {
@@ -200,23 +196,16 @@
                 "public".to_string(),
                 "numbers_with_ts".to_string(),
             ];
-            let schema = RelationType::new(vec![
-                ColumnType::new(CDT::uint32_datatype(), false),
-                ColumnType::new(CDT::timestamp_millisecond_datatype(), false),
-            ]);
-            schemas.insert(
-                gid,
-                schema.into_named(vec![Some("number".to_string()), Some("ts".to_string())]),
-            );
             tri_map.insert(Some(name.clone()), Some(1025), gid);
         }
-        FlownodeContext {
-            schema: schemas,
-            table_repr: tri_map,
-            query_context: Some(Arc::new(QueryContext::with("greptime", "public"))),
-            ..Default::default()
-        }
+        let dummy_source = FlowDummyTableSource::default();
+        let mut ctx = FlownodeContext::new(Box::new(dummy_source));
+        ctx.table_repr = tri_map;
+        ctx.query_context = Some(Arc::new(QueryContext::with("greptime", "public")));
+        ctx
     }
 
     pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {

@@ -176,7 +176,7 @@ impl TypedPlan {
             }
             .fail()?,
         };
-        let table = ctx.table(&table_reference)?;
+        let table = ctx.table(&table_reference).await?;
         let get_table = Plan::Get {
             id: crate::expr::Id::Global(table.0),
         };

@@ -0,0 +1,107 @@
+-- test if flow can get table schema correctly after table have been altered
+CREATE TABLE bytes_log (
+    byte INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    -- event time
+    TIME INDEX(ts)
+);
+
+Affected Rows: 0
+
+CREATE TABLE approx_rate (
+    rate DOUBLE,
+    time_window TIMESTAMP,
+    update_at TIMESTAMP,
+    TIME INDEX(time_window)
+);
+
+Affected Rows: 0
+
+-- make both src&sink table in cache of flownode by using them
+CREATE FLOW find_approx_rate SINK TO approx_rate AS
+SELECT
+    (max(byte) - min(byte)) / 30.0 as rate,
+    date_bin(INTERVAL '30 second', ts) as time_window
+from
+    bytes_log
+GROUP BY
+    time_window;
+
+Affected Rows: 0
+
+SHOW CREATE FLOW find_approx_rate;
+
++------------------+-----------------------------------------------------------------------------------------------------------------------------------------+
+| Flow             | Create Flow                                                                                                                             |
++------------------+-----------------------------------------------------------------------------------------------------------------------------------------+
+| find_approx_rate | CREATE FLOW IF NOT EXISTS find_approx_rate                                                                                              |
+|                  | SINK TO approx_rate                                                                                                                     |
+|                  | AS SELECT (max(byte) - min(byte)) / 30.0 AS rate, date_bin(INTERVAL '30 second', ts) AS time_window FROM bytes_log GROUP BY time_window |
++------------------+-----------------------------------------------------------------------------------------------------------------------------------------+
+
+DROP FLOW find_approx_rate;
+
+Affected Rows: 0
+
+ALTER TABLE bytes_log ADD COLUMN stat INT DEFAULT 200 AFTER byte;
+
+Affected Rows: 0
+
+ALTER TABLE approx_rate ADD COLUMN sample_cnt INT64 DEFAULT 0 AFTER rate;
+
+Affected Rows: 0
+
+CREATE FLOW find_approx_rate SINK TO approx_rate AS
+SELECT
+    (max(byte) - min(byte)) / 30.0 as rate,
+    count(byte) as sample_cnt,
+    date_bin(INTERVAL '30 second', ts) as time_window
+from
+    bytes_log
+GROUP BY
+    time_window;
+
+Affected Rows: 0
+
+INSERT INTO
+    bytes_log
+VALUES
+    (0, 200, '2023-01-01 00:00:01'),
+    (300,200, '2023-01-01 00:00:29');
+
+Affected Rows: 2
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('find_approx_rate');
+
++--------------------------------------+
+| ADMIN FLUSH_FLOW('find_approx_rate') |
++--------------------------------------+
+| FLOW_FLUSHED |
++--------------------------------------+
+
+SELECT
+    rate,
+    sample_cnt,
+    time_window
+FROM
+    approx_rate;
+
++------+------------+---------------------+
+| rate | sample_cnt | time_window         |
++------+------------+---------------------+
+| 10.0 | 2          | 2023-01-01T00:00:00 |
++------+------------+---------------------+
+
+DROP TABLE bytes_log;
+
+Affected Rows: 0
+
+DROP FLOW find_approx_rate;
+
+Affected Rows: 0
+
+DROP TABLE approx_rate;
+
+Affected Rows: 0

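As a sanity check on the expected output above: both inserted rows (byte = 0 and byte = 300) land in the same 30-second window, so rate = (300 - 0) / 30.0 = 10.0 and sample_cnt = 2, which is exactly what the final SELECT returns. The SQLNESS REPLACE directive rewrites the numeric result of ADMIN FLUSH_FLOW to the fixed token FLOW_FLUSHED, since the number of rows flushed can vary between runs.
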
@@ -0,0 +1,64 @@
+-- test if flow can get table schema correctly after table have been altered
+CREATE TABLE bytes_log (
+    byte INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    -- event time
+    TIME INDEX(ts)
+);
+
+CREATE TABLE approx_rate (
+    rate DOUBLE,
+    time_window TIMESTAMP,
+    update_at TIMESTAMP,
+    TIME INDEX(time_window)
+);
+
+-- make both src&sink table in cache of flownode by using them
+CREATE FLOW find_approx_rate SINK TO approx_rate AS
+SELECT
+    (max(byte) - min(byte)) / 30.0 as rate,
+    date_bin(INTERVAL '30 second', ts) as time_window
+from
+    bytes_log
+GROUP BY
+    time_window;
+
+SHOW CREATE FLOW find_approx_rate;
+
+DROP FLOW find_approx_rate;
+
+ALTER TABLE bytes_log ADD COLUMN stat INT DEFAULT 200 AFTER byte;
+
+ALTER TABLE approx_rate ADD COLUMN sample_cnt INT64 DEFAULT 0 AFTER rate;
+
+CREATE FLOW find_approx_rate SINK TO approx_rate AS
+SELECT
+    (max(byte) - min(byte)) / 30.0 as rate,
+    count(byte) as sample_cnt,
+    date_bin(INTERVAL '30 second', ts) as time_window
+from
+    bytes_log
+GROUP BY
+    time_window;
+
+INSERT INTO
+    bytes_log
+VALUES
+    (0, 200, '2023-01-01 00:00:01'),
+    (300,200, '2023-01-01 00:00:29');
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('find_approx_rate');
+
+SELECT
+    rate,
+    sample_cnt,
+    time_window
+FROM
+    approx_rate;
+
+DROP TABLE bytes_log;
+
+DROP FLOW find_approx_rate;
+
+DROP TABLE approx_rate;