mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-10 15:22:56 +00:00
feat(flow): dual engine (#5881)
* feat: partial use batch mode(WIP) * feat: add flow engine trait * refactor: more trait method * dual engine * feat: dual engine * refactor: flow map cache * chore: per review * chore: per review
This commit is contained in:
@@ -307,3 +307,8 @@ strip = true
|
||||
[profile.dev.package.tests-fuzz]
|
||||
debug = false
|
||||
strip = true
|
||||
|
||||
[profile.dev]
|
||||
opt-level = 1
|
||||
[profile.dev.package."*"]
|
||||
opt-level = 3
|
||||
|
||||
@@ -32,7 +32,9 @@ use common_meta::key::TableMetadataManager;
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::logging::TracingOptions;
|
||||
use common_version::{short_version, version};
|
||||
use flow::{FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder, FrontendInvoker};
|
||||
use flow::{
|
||||
FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder, FrontendClient, FrontendInvoker,
|
||||
};
|
||||
use meta_client::{MetaClientOptions, MetaClientType};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
@@ -313,12 +315,14 @@ impl StartCommand {
|
||||
);
|
||||
|
||||
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
|
||||
let frontend_client = FrontendClient::from_meta_client(meta_client.clone());
|
||||
let flownode_builder = FlownodeBuilder::new(
|
||||
opts.clone(),
|
||||
Plugins::new(),
|
||||
table_metadata_manager,
|
||||
catalog_manager.clone(),
|
||||
flow_metadata_manager,
|
||||
Arc::new(frontend_client),
|
||||
)
|
||||
.with_heartbeat_task(heartbeat_task);
|
||||
|
||||
|
||||
@@ -57,7 +57,7 @@ use datanode::region_server::RegionServer;
|
||||
use file_engine::config::EngineConfig as FileEngineConfig;
|
||||
use flow::{
|
||||
FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeInstance, FlownodeOptions,
|
||||
FrontendInvoker,
|
||||
FrontendClient, FrontendInvoker,
|
||||
};
|
||||
use frontend::frontend::{Frontend, FrontendOptions};
|
||||
use frontend::instance::builder::FrontendBuilder;
|
||||
@@ -523,12 +523,18 @@ impl StartCommand {
|
||||
flow: opts.flow.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
// TODO(discord9): for standalone not use grpc, but just somehow get a handler to frontend grpc client without
|
||||
// actually make a connection
|
||||
let fe_server_addr = fe_opts.grpc.bind_addr.clone();
|
||||
let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr);
|
||||
let flow_builder = FlownodeBuilder::new(
|
||||
flownode_options,
|
||||
plugins.clone(),
|
||||
table_metadata_manager.clone(),
|
||||
catalog_manager.clone(),
|
||||
flow_metadata_manager.clone(),
|
||||
Arc::new(frontend_client),
|
||||
);
|
||||
let flownode = flow_builder
|
||||
.build()
|
||||
|
||||
@@ -324,7 +324,7 @@ pub enum CreateFlowState {
|
||||
}
|
||||
|
||||
/// The type of flow.
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub enum FlowType {
|
||||
/// The flow is a batching task.
|
||||
Batching,
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
//! and communicating with other parts of the database
|
||||
#![warn(unused_imports)]
|
||||
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
@@ -56,6 +56,7 @@ use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, U
|
||||
use crate::expr::Batch;
|
||||
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
|
||||
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
|
||||
use crate::{CreateFlowArgs, FlowId, TableName};
|
||||
|
||||
mod flownode_impl;
|
||||
mod parse_expr;
|
||||
@@ -78,11 +79,6 @@ pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";
|
||||
|
||||
pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at";
|
||||
|
||||
// TODO(discord9): refactor common types for flow to a separate module
|
||||
/// FlowId is a unique identifier for a flow task
|
||||
pub type FlowId = u64;
|
||||
pub type TableName = [String; 3];
|
||||
|
||||
/// Flow config that exists both in standalone&distributed mode
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(default)]
|
||||
@@ -731,25 +727,10 @@ impl FlowWorkerManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// The arguments to create a flow in [`FlowWorkerManager`].
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CreateFlowArgs {
|
||||
pub flow_id: FlowId,
|
||||
pub sink_table_name: TableName,
|
||||
pub source_table_ids: Vec<TableId>,
|
||||
pub create_if_not_exists: bool,
|
||||
pub or_replace: bool,
|
||||
pub expire_after: Option<i64>,
|
||||
pub comment: Option<String>,
|
||||
pub sql: String,
|
||||
pub flow_options: HashMap<String, String>,
|
||||
pub query_ctx: Option<QueryContext>,
|
||||
}
|
||||
|
||||
/// Create&Remove flow
|
||||
impl FlowWorkerManager {
|
||||
/// remove a flow by it's id
|
||||
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
|
||||
pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
|
||||
for handle in self.worker_handles.iter() {
|
||||
if handle.contains_flow(flow_id).await? {
|
||||
handle.remove_flow(flow_id).await?;
|
||||
@@ -766,7 +747,7 @@ impl FlowWorkerManager {
|
||||
/// 1. parse query into typed plan(and optional parse expire_after expr)
|
||||
/// 2. render source/sink with output table id and used input table id
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
||||
pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
||||
let CreateFlowArgs {
|
||||
flow_id,
|
||||
sink_table_name,
|
||||
@@ -905,6 +886,32 @@ impl FlowWorkerManager {
|
||||
info!("Successfully create flow with id={}", flow_id);
|
||||
Ok(Some(flow_id))
|
||||
}
|
||||
|
||||
pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
|
||||
debug!("Starting to flush flow_id={:?}", flow_id);
|
||||
// lock to make sure writes before flush are written to flow
|
||||
// and immediately drop to prevent following writes to be blocked
|
||||
drop(self.flush_lock.write().await);
|
||||
let flushed_input_rows = self.node_context.read().await.flush_all_sender().await?;
|
||||
let rows_send = self.run_available(true).await?;
|
||||
let row = self.send_writeback_requests().await?;
|
||||
debug!(
|
||||
"Done to flush flow_id={:?} with {} input rows flushed, {} rows sended and {} output rows flushed",
|
||||
flow_id, flushed_input_rows, rows_send, row
|
||||
);
|
||||
Ok(row)
|
||||
}
|
||||
|
||||
pub async fn flow_exist_inner(&self, flow_id: FlowId) -> Result<bool, Error> {
|
||||
let mut exist = false;
|
||||
for handle in self.worker_handles.iter() {
|
||||
if handle.contains_flow(flow_id).await? {
|
||||
exist = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(exist)
|
||||
}
|
||||
}
|
||||
|
||||
/// FlowTickManager is a manager for flow tick, which trakc flow execution progress
|
||||
|
||||
@@ -13,40 +13,228 @@
|
||||
// limitations under the License.
|
||||
|
||||
//! impl `FlowNode` trait for FlowNodeManager so standalone can call them
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::flow::{
|
||||
flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
|
||||
};
|
||||
use api::v1::region::InsertRequests;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
|
||||
use common_telemetry::{debug, trace};
|
||||
use common_meta::ddl::create_flow::FlowType;
|
||||
use common_meta::error::{Result as MetaResult, UnexpectedSnafu};
|
||||
use common_runtime::JoinHandle;
|
||||
use common_telemetry::{trace, warn};
|
||||
use datatypes::value::Value;
|
||||
use itertools::Itertools;
|
||||
use snafu::{IntoError, OptionExt, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
use store_api::storage::{RegionId, TableId};
|
||||
|
||||
use crate::adapter::{CreateFlowArgs, FlowWorkerManager};
|
||||
use crate::error::{CreateFlowSnafu, InsertIntoFlowSnafu, InternalSnafu};
|
||||
use crate::batching_mode::engine::BatchingEngine;
|
||||
use crate::engine::FlowEngine;
|
||||
use crate::error::{CreateFlowSnafu, FlowNotFoundSnafu, InsertIntoFlowSnafu, InternalSnafu};
|
||||
use crate::metrics::METRIC_FLOW_TASK_COUNT;
|
||||
use crate::repr::{self, DiffRow};
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
|
||||
fn to_meta_err(
|
||||
location: snafu::Location,
|
||||
) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
|
||||
move |err: crate::error::Error| -> common_meta::error::Error {
|
||||
common_meta::error::Error::External {
|
||||
location,
|
||||
source: BoxedError::new(err),
|
||||
/// Manage both streaming and batching mode engine
|
||||
///
|
||||
/// including create/drop/flush flow
|
||||
/// and redirect insert requests to the appropriate engine
|
||||
pub struct FlowDualEngine {
|
||||
streaming_engine: Arc<FlowWorkerManager>,
|
||||
batching_engine: Arc<BatchingEngine>,
|
||||
/// helper struct for faster query flow by table id or vice versa
|
||||
src_table2flow: std::sync::RwLock<SrcTableToFlow>,
|
||||
}
|
||||
|
||||
struct SrcTableToFlow {
|
||||
/// mapping of table ids to flow ids for streaming mode
|
||||
stream: HashMap<TableId, HashSet<FlowId>>,
|
||||
/// mapping of table ids to flow ids for batching mode
|
||||
batch: HashMap<TableId, HashSet<FlowId>>,
|
||||
/// mapping of flow ids to (flow type, source table ids)
|
||||
flow_infos: HashMap<FlowId, (FlowType, Vec<TableId>)>,
|
||||
}
|
||||
|
||||
impl SrcTableToFlow {
|
||||
fn in_stream(&self, table_id: TableId) -> bool {
|
||||
self.stream.contains_key(&table_id)
|
||||
}
|
||||
fn in_batch(&self, table_id: TableId) -> bool {
|
||||
self.batch.contains_key(&table_id)
|
||||
}
|
||||
fn add_flow(&mut self, flow_id: FlowId, flow_type: FlowType, src_table_ids: Vec<TableId>) {
|
||||
let mapping = match flow_type {
|
||||
FlowType::Streaming => &mut self.stream,
|
||||
FlowType::Batching => &mut self.batch,
|
||||
};
|
||||
|
||||
for src_table in src_table_ids.clone() {
|
||||
mapping
|
||||
.entry(src_table)
|
||||
.and_modify(|flows| {
|
||||
flows.insert(flow_id);
|
||||
})
|
||||
.or_insert_with(|| {
|
||||
let mut set = HashSet::new();
|
||||
set.insert(flow_id);
|
||||
set
|
||||
});
|
||||
}
|
||||
self.flow_infos.insert(flow_id, (flow_type, src_table_ids));
|
||||
}
|
||||
|
||||
fn remove_flow(&mut self, flow_id: FlowId) {
|
||||
let mapping = match self.get_flow_type(flow_id) {
|
||||
Some(FlowType::Streaming) => &mut self.stream,
|
||||
Some(FlowType::Batching) => &mut self.batch,
|
||||
None => return,
|
||||
};
|
||||
if let Some((_, src_table_ids)) = self.flow_infos.remove(&flow_id) {
|
||||
for src_table in src_table_ids {
|
||||
if let Some(flows) = mapping.get_mut(&src_table) {
|
||||
flows.remove(&flow_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_flow_type(&self, flow_id: FlowId) -> Option<FlowType> {
|
||||
self.flow_infos
|
||||
.get(&flow_id)
|
||||
.map(|(flow_type, _)| flow_type)
|
||||
.cloned()
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowEngine for FlowDualEngine {
|
||||
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
||||
let flow_type = args
|
||||
.flow_options
|
||||
.get(FlowType::FLOW_TYPE_KEY)
|
||||
.map(|s| s.as_str());
|
||||
|
||||
let flow_type = match flow_type {
|
||||
Some(FlowType::BATCHING) => FlowType::Batching,
|
||||
Some(FlowType::STREAMING) => FlowType::Streaming,
|
||||
None => FlowType::Batching,
|
||||
Some(flow_type) => {
|
||||
return InternalSnafu {
|
||||
reason: format!("Invalid flow type: {}", flow_type),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
};
|
||||
|
||||
let flow_id = args.flow_id;
|
||||
let src_table_ids = args.source_table_ids.clone();
|
||||
|
||||
let res = match flow_type {
|
||||
FlowType::Batching => self.batching_engine.create_flow(args).await,
|
||||
FlowType::Streaming => self.streaming_engine.create_flow(args).await,
|
||||
}?;
|
||||
|
||||
self.src_table2flow
|
||||
.write()
|
||||
.unwrap()
|
||||
.add_flow(flow_id, flow_type, src_table_ids);
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
|
||||
let flow_type = self.src_table2flow.read().unwrap().get_flow_type(flow_id);
|
||||
match flow_type {
|
||||
Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await,
|
||||
Some(FlowType::Streaming) => self.streaming_engine.remove_flow(flow_id).await,
|
||||
None => FlowNotFoundSnafu { id: flow_id }.fail(),
|
||||
}?;
|
||||
// remove mapping
|
||||
self.src_table2flow.write().unwrap().remove_flow(flow_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
|
||||
let flow_type = self.src_table2flow.read().unwrap().get_flow_type(flow_id);
|
||||
match flow_type {
|
||||
Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
|
||||
Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
|
||||
None => FlowNotFoundSnafu { id: flow_id }.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
|
||||
let flow_type = self.src_table2flow.read().unwrap().get_flow_type(flow_id);
|
||||
// not using `flow_type.is_some()` to make sure the flow is actually exist in the underlying engine
|
||||
match flow_type {
|
||||
Some(FlowType::Batching) => self.batching_engine.flow_exist(flow_id).await,
|
||||
Some(FlowType::Streaming) => self.streaming_engine.flow_exist(flow_id).await,
|
||||
None => Ok(false),
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_flow_inserts(
|
||||
&self,
|
||||
request: api::v1::region::InsertRequests,
|
||||
) -> Result<(), Error> {
|
||||
// TODO(discord9): make as little clone as possible
|
||||
let mut to_stream_engine = Vec::with_capacity(request.requests.len());
|
||||
let mut to_batch_engine = request.requests;
|
||||
|
||||
{
|
||||
let src_table2flow = self.src_table2flow.read().unwrap();
|
||||
to_batch_engine.retain(|req| {
|
||||
let region_id = RegionId::from(req.region_id);
|
||||
let table_id = region_id.table_id();
|
||||
let is_in_stream = src_table2flow.in_stream(table_id);
|
||||
let is_in_batch = src_table2flow.in_batch(table_id);
|
||||
if is_in_stream {
|
||||
to_stream_engine.push(req.clone());
|
||||
}
|
||||
if is_in_batch {
|
||||
return true;
|
||||
}
|
||||
if !is_in_batch && !is_in_stream {
|
||||
// TODO(discord9): also put to centralized logging for flow once it implemented
|
||||
warn!("Table {} is not any flow's source table", table_id)
|
||||
}
|
||||
false
|
||||
});
|
||||
// drop(src_table2flow);
|
||||
// can't use drop due to https://github.com/rust-lang/rust/pull/128846
|
||||
}
|
||||
|
||||
let streaming_engine = self.streaming_engine.clone();
|
||||
let stream_handler: JoinHandle<Result<(), Error>> =
|
||||
common_runtime::spawn_global(async move {
|
||||
streaming_engine
|
||||
.handle_flow_inserts(api::v1::region::InsertRequests {
|
||||
requests: to_stream_engine,
|
||||
})
|
||||
.await?;
|
||||
Ok(())
|
||||
});
|
||||
self.batching_engine
|
||||
.handle_flow_inserts(api::v1::region::InsertRequests {
|
||||
requests: to_batch_engine,
|
||||
})
|
||||
.await?;
|
||||
stream_handler.await.map_err(|e| {
|
||||
crate::error::UnexpectedSnafu {
|
||||
reason: format!("JoinError when handle inserts for flow stream engine: {e:?}"),
|
||||
}
|
||||
.build()
|
||||
})??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl common_meta::node_manager::Flownode for FlowWorkerManager {
|
||||
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse> {
|
||||
impl common_meta::node_manager::Flownode for FlowDualEngine {
|
||||
async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
|
||||
let query_ctx = request
|
||||
.header
|
||||
.and_then(|h| h.query_context)
|
||||
@@ -109,31 +297,10 @@ impl common_meta::node_manager::Flownode for FlowWorkerManager {
|
||||
Some(flow_request::Body::Flush(FlushFlow {
|
||||
flow_id: Some(flow_id),
|
||||
})) => {
|
||||
// TODO(discord9): impl individual flush
|
||||
debug!("Starting to flush flow_id={:?}", flow_id);
|
||||
// lock to make sure writes before flush are written to flow
|
||||
// and immediately drop to prevent following writes to be blocked
|
||||
drop(self.flush_lock.write().await);
|
||||
let flushed_input_rows = self
|
||||
.node_context
|
||||
.read()
|
||||
.await
|
||||
.flush_all_sender()
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
let rows_send = self
|
||||
.run_available(true)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
let row = self
|
||||
.send_writeback_requests()
|
||||
.flush_flow(flow_id.id as u64)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
|
||||
debug!(
|
||||
"Done to flush flow_id={:?} with {} input rows flushed, {} rows sended and {} output rows flushed",
|
||||
flow_id, flushed_input_rows, rows_send, row
|
||||
);
|
||||
Ok(FlowResponse {
|
||||
affected_flows: vec![flow_id],
|
||||
affected_rows: row as u64,
|
||||
@@ -151,7 +318,167 @@ impl common_meta::node_manager::Flownode for FlowWorkerManager {
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
|
||||
async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
|
||||
FlowEngine::handle_flow_inserts(self, request)
|
||||
.await
|
||||
.map(|_| Default::default())
|
||||
.map_err(to_meta_err(snafu::location!()))
|
||||
}
|
||||
}
|
||||
|
||||
/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
|
||||
fn to_meta_err(
|
||||
location: snafu::Location,
|
||||
) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
|
||||
move |err: crate::error::Error| -> common_meta::error::Error {
|
||||
common_meta::error::Error::External {
|
||||
location,
|
||||
source: BoxedError::new(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl common_meta::node_manager::Flownode for FlowWorkerManager {
|
||||
async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
|
||||
let query_ctx = request
|
||||
.header
|
||||
.and_then(|h| h.query_context)
|
||||
.map(|ctx| ctx.into());
|
||||
match request.body {
|
||||
Some(flow_request::Body::Create(CreateRequest {
|
||||
flow_id: Some(task_id),
|
||||
source_table_ids,
|
||||
sink_table_name: Some(sink_table_name),
|
||||
create_if_not_exists,
|
||||
expire_after,
|
||||
comment,
|
||||
sql,
|
||||
flow_options,
|
||||
or_replace,
|
||||
})) => {
|
||||
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
|
||||
let sink_table_name = [
|
||||
sink_table_name.catalog_name,
|
||||
sink_table_name.schema_name,
|
||||
sink_table_name.table_name,
|
||||
];
|
||||
let expire_after = expire_after.map(|e| e.value);
|
||||
let args = CreateFlowArgs {
|
||||
flow_id: task_id.id as u64,
|
||||
sink_table_name,
|
||||
source_table_ids,
|
||||
create_if_not_exists,
|
||||
or_replace,
|
||||
expire_after,
|
||||
comment: Some(comment),
|
||||
sql: sql.clone(),
|
||||
flow_options,
|
||||
query_ctx,
|
||||
};
|
||||
let ret = self
|
||||
.create_flow(args)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.with_context(|_| CreateFlowSnafu { sql: sql.clone() })
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
METRIC_FLOW_TASK_COUNT.inc();
|
||||
Ok(FlowResponse {
|
||||
affected_flows: ret
|
||||
.map(|id| greptime_proto::v1::FlowId { id: id as u32 })
|
||||
.into_iter()
|
||||
.collect_vec(),
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
Some(flow_request::Body::Drop(DropRequest {
|
||||
flow_id: Some(flow_id),
|
||||
})) => {
|
||||
self.remove_flow(flow_id.id as u64)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
METRIC_FLOW_TASK_COUNT.dec();
|
||||
Ok(Default::default())
|
||||
}
|
||||
Some(flow_request::Body::Flush(FlushFlow {
|
||||
flow_id: Some(flow_id),
|
||||
})) => {
|
||||
let row = self
|
||||
.flush_flow_inner(flow_id.id as u64)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
Ok(FlowResponse {
|
||||
affected_flows: vec![flow_id],
|
||||
affected_rows: row as u64,
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
None => UnexpectedSnafu {
|
||||
err_msg: "Missing request body",
|
||||
}
|
||||
.fail(),
|
||||
_ => UnexpectedSnafu {
|
||||
err_msg: "Invalid request body.",
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
|
||||
self.handle_inserts_inner(request)
|
||||
.await
|
||||
.map(|_| Default::default())
|
||||
.map_err(to_meta_err(snafu::location!()))
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowEngine for FlowWorkerManager {
|
||||
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
||||
self.create_flow_inner(args).await
|
||||
}
|
||||
|
||||
async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
|
||||
self.remove_flow_inner(flow_id).await
|
||||
}
|
||||
|
||||
async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
|
||||
self.flush_flow_inner(flow_id).await
|
||||
}
|
||||
|
||||
async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
|
||||
self.flow_exist_inner(flow_id).await
|
||||
}
|
||||
|
||||
async fn handle_flow_inserts(
|
||||
&self,
|
||||
request: api::v1::region::InsertRequests,
|
||||
) -> Result<(), Error> {
|
||||
self.handle_inserts_inner(request).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Simple helper enum for fetching value from row with default value
|
||||
#[derive(Debug, Clone)]
|
||||
enum FetchFromRow {
|
||||
Idx(usize),
|
||||
Default(Value),
|
||||
}
|
||||
|
||||
impl FetchFromRow {
|
||||
/// Panic if idx is out of bound
|
||||
fn fetch(&self, row: &repr::Row) -> Value {
|
||||
match self {
|
||||
FetchFromRow::Idx(idx) => row.get(*idx).unwrap().clone(),
|
||||
FetchFromRow::Default(v) => v.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowWorkerManager {
|
||||
async fn handle_inserts_inner(
|
||||
&self,
|
||||
request: InsertRequests,
|
||||
) -> std::result::Result<(), Error> {
|
||||
// using try_read to ensure two things:
|
||||
// 1. flush wouldn't happen until inserts before it is inserted
|
||||
// 2. inserts happening concurrently with flush wouldn't be block by flush
|
||||
@@ -172,11 +499,7 @@ impl common_meta::node_manager::Flownode for FlowWorkerManager {
|
||||
let ctx = self.node_context.read().await;
|
||||
|
||||
// TODO(discord9): also check schema version so that altered table can be reported
|
||||
let table_schema = ctx
|
||||
.table_source
|
||||
.table_from_id(&table_id)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
let table_schema = ctx.table_source.table_from_id(&table_id).await?;
|
||||
let default_vals = table_schema
|
||||
.default_values
|
||||
.iter()
|
||||
@@ -210,9 +533,9 @@ impl common_meta::node_manager::Flownode for FlowWorkerManager {
|
||||
None => InternalSnafu {
|
||||
reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
|
||||
}
|
||||
.fail().map_err(BoxedError::new).context(ExternalSnafu),
|
||||
.fail(),
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
let name_to_col = HashMap::<_, _>::from_iter(
|
||||
insert_schema
|
||||
.iter()
|
||||
@@ -229,8 +552,8 @@ impl common_meta::node_manager::Flownode for FlowWorkerManager {
|
||||
.copied()
|
||||
.map(FetchFromRow::Idx)
|
||||
.or_else(|| col_default_val.clone().map(FetchFromRow::Default))
|
||||
.with_context(|| UnexpectedSnafu {
|
||||
err_msg: format!(
|
||||
.with_context(|| crate::error::UnexpectedSnafu {
|
||||
reason: format!(
|
||||
"Column not found: {}, default_value: {:?}",
|
||||
col_name, col_default_val
|
||||
),
|
||||
@@ -272,27 +595,9 @@ impl common_meta::node_manager::Flownode for FlowWorkerManager {
|
||||
}
|
||||
.into_error(err);
|
||||
common_telemetry::error!(err; "Failed to handle write request");
|
||||
let err = to_meta_err(snafu::location!())(err);
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
Ok(Default::default())
|
||||
}
|
||||
}
|
||||
|
||||
/// Simple helper enum for fetching value from row with default value
|
||||
#[derive(Debug, Clone)]
|
||||
enum FetchFromRow {
|
||||
Idx(usize),
|
||||
Default(Value),
|
||||
}
|
||||
|
||||
impl FetchFromRow {
|
||||
/// Panic if idx is out of bound
|
||||
fn fetch(&self, row: &repr::Row) -> Value {
|
||||
match self {
|
||||
FetchFromRow::Idx(idx) => row.get(*idx).unwrap().clone(),
|
||||
FetchFromRow::Default(v) => v.clone(),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,8 +16,8 @@
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
mod engine;
|
||||
mod frontend_client;
|
||||
pub(crate) mod engine;
|
||||
pub(crate) mod frontend_client;
|
||||
mod state;
|
||||
mod task;
|
||||
mod time_window;
|
||||
|
||||
@@ -12,10 +12,11 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Batching mode engine
|
||||
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::flow::FlowResponse;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::ddl::create_flow::FlowType;
|
||||
use common_meta::key::flow::FlowMetadataManagerRef;
|
||||
@@ -30,13 +31,13 @@ use store_api::storage::RegionId;
|
||||
use table::metadata::TableId;
|
||||
use tokio::sync::{oneshot, RwLock};
|
||||
|
||||
use crate::adapter::{CreateFlowArgs, FlowId, TableName};
|
||||
use crate::batching_mode::frontend_client::FrontendClient;
|
||||
use crate::batching_mode::task::BatchingTask;
|
||||
use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr};
|
||||
use crate::batching_mode::utils::sql_to_df_plan;
|
||||
use crate::engine::FlowEngine;
|
||||
use crate::error::{ExternalSnafu, FlowAlreadyExistSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu};
|
||||
use crate::Error;
|
||||
use crate::{CreateFlowArgs, Error, FlowId, TableName};
|
||||
|
||||
/// Batching mode Engine, responsible for driving all the batching mode tasks
|
||||
///
|
||||
@@ -67,10 +68,10 @@ impl BatchingEngine {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle_inserts(
|
||||
pub async fn handle_inserts_inner(
|
||||
&self,
|
||||
request: api::v1::region::InsertRequests,
|
||||
) -> Result<FlowResponse, Error> {
|
||||
) -> Result<(), Error> {
|
||||
let table_info_mgr = self.table_meta.table_info_manager();
|
||||
let mut group_by_table_id: HashMap<TableId, Vec<api::v1::Rows>> = HashMap::new();
|
||||
|
||||
@@ -170,7 +171,7 @@ impl BatchingEngine {
|
||||
}
|
||||
drop(tasks);
|
||||
|
||||
Ok(Default::default())
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -191,7 +192,7 @@ async fn get_table_name(
|
||||
}
|
||||
|
||||
impl BatchingEngine {
|
||||
pub async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
||||
pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
||||
let CreateFlowArgs {
|
||||
flow_id,
|
||||
sink_table_name,
|
||||
@@ -308,7 +309,7 @@ impl BatchingEngine {
|
||||
Ok(Some(flow_id))
|
||||
}
|
||||
|
||||
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
|
||||
pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
|
||||
if self.tasks.write().await.remove(&flow_id).is_none() {
|
||||
warn!("Flow {flow_id} not found in tasks")
|
||||
}
|
||||
@@ -324,19 +325,42 @@ impl BatchingEngine {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn flush_flow(&self, flow_id: FlowId) -> Result<(), Error> {
|
||||
pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
|
||||
let task = self.tasks.read().await.get(&flow_id).cloned();
|
||||
let task = task.with_context(|| UnexpectedSnafu {
|
||||
reason: format!("Can't found task for flow {flow_id}"),
|
||||
})?;
|
||||
|
||||
task.gen_exec_once(&self.query_engine, &self.frontend_client)
|
||||
let res = task
|
||||
.gen_exec_once(&self.query_engine, &self.frontend_client)
|
||||
.await?;
|
||||
Ok(())
|
||||
let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
|
||||
Ok(affected_rows)
|
||||
}
|
||||
|
||||
/// Determine if the batching mode flow task exists with given flow id
|
||||
pub async fn flow_exist(&self, flow_id: FlowId) -> bool {
|
||||
pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
|
||||
self.tasks.read().await.contains_key(&flow_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowEngine for BatchingEngine {
|
||||
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
||||
self.create_flow_inner(args).await
|
||||
}
|
||||
async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
|
||||
self.remove_flow_inner(flow_id).await
|
||||
}
|
||||
async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
|
||||
self.flush_flow_inner(flow_id).await
|
||||
}
|
||||
async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
|
||||
Ok(self.flow_exist_inner(flow_id).await)
|
||||
}
|
||||
async fn handle_flow_inserts(
|
||||
&self,
|
||||
request: api::v1::region::InsertRequests,
|
||||
) -> Result<(), Error> {
|
||||
self.handle_inserts_inner(request).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,11 +26,10 @@ use snafu::ResultExt;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::adapter::FlowId;
|
||||
use crate::batching_mode::task::BatchingTask;
|
||||
use crate::batching_mode::MIN_REFRESH_DURATION;
|
||||
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu};
|
||||
use crate::Error;
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
/// The state of the [`BatchingTask`].
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -43,7 +43,7 @@ use tokio::sync::oneshot;
|
||||
use tokio::sync::oneshot::error::TryRecvError;
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::adapter::{FlowId, AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
|
||||
use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
|
||||
use crate::batching_mode::frontend_client::FrontendClient;
|
||||
use crate::batching_mode::state::TaskState;
|
||||
use crate::batching_mode::time_window::TimeWindowExpr;
|
||||
@@ -60,7 +60,7 @@ use crate::error::{
|
||||
use crate::metrics::{
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
|
||||
};
|
||||
use crate::Error;
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
/// The task's config, immutable once created
|
||||
#[derive(Clone)]
|
||||
|
||||
57
src/flow/src/engine.rs
Normal file
57
src/flow/src/engine.rs
Normal file
@@ -0,0 +1,57 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Define a trait for flow engine, which is used by both streaming engine and batch engine
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use session::context::QueryContext;
|
||||
use table::metadata::TableId;
|
||||
|
||||
use crate::Error;
|
||||
// TODO(discord9): refactor common types for flow to a separate module
|
||||
/// FlowId is a unique identifier for a flow task
|
||||
pub type FlowId = u64;
|
||||
pub type TableName = [String; 3];
|
||||
|
||||
/// The arguments to create a flow
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CreateFlowArgs {
|
||||
pub flow_id: FlowId,
|
||||
pub sink_table_name: TableName,
|
||||
pub source_table_ids: Vec<TableId>,
|
||||
pub create_if_not_exists: bool,
|
||||
pub or_replace: bool,
|
||||
pub expire_after: Option<i64>,
|
||||
pub comment: Option<String>,
|
||||
pub sql: String,
|
||||
pub flow_options: HashMap<String, String>,
|
||||
pub query_ctx: Option<QueryContext>,
|
||||
}
|
||||
|
||||
pub trait FlowEngine {
|
||||
/// Create a flow using the provided arguments, return previous flow id if exists and is replaced
|
||||
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error>;
|
||||
/// Remove a flow by its ID
|
||||
async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error>;
|
||||
/// Flush the flow, return the number of rows flushed
|
||||
async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error>;
|
||||
/// Check if the flow exists
|
||||
async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error>;
|
||||
/// Handle the insert requests for the flow
|
||||
async fn handle_flow_inserts(
|
||||
&self,
|
||||
request: api::v1::region::InsertRequests,
|
||||
) -> Result<(), Error>;
|
||||
}
|
||||
@@ -25,8 +25,8 @@ use common_telemetry::common_error::status_code::StatusCode;
|
||||
use snafu::{Location, ResultExt, Snafu};
|
||||
use tonic::metadata::MetadataMap;
|
||||
|
||||
use crate::adapter::FlowId;
|
||||
use crate::expr::EvalError;
|
||||
use crate::FlowId;
|
||||
|
||||
/// This error is used to represent all possible errors that can occur in the flow module.
|
||||
#[derive(Snafu)]
|
||||
|
||||
@@ -26,9 +26,10 @@
|
||||
|
||||
// allow unused for now because it should be use later
|
||||
mod adapter;
|
||||
mod batching_mode;
|
||||
pub(crate) mod batching_mode;
|
||||
mod compute;
|
||||
mod df_optimizer;
|
||||
pub(crate) mod engine;
|
||||
pub mod error;
|
||||
mod expr;
|
||||
pub mod heartbeat;
|
||||
@@ -43,6 +44,8 @@ mod utils;
|
||||
mod test_utils;
|
||||
|
||||
pub use adapter::{FlowConfig, FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions};
|
||||
pub use batching_mode::frontend_client::FrontendClient;
|
||||
pub(crate) use engine::{CreateFlowArgs, FlowId, TableName};
|
||||
pub use error::{Error, Result};
|
||||
pub use server::{
|
||||
FlownodeBuilder, FlownodeInstance, FlownodeServer, FlownodeServiceBuilder, FrontendInvoker,
|
||||
|
||||
@@ -50,7 +50,7 @@ use tonic::codec::CompressionEncoding;
|
||||
use tonic::transport::server::TcpIncoming;
|
||||
use tonic::{Request, Response, Status};
|
||||
|
||||
use crate::adapter::{create_worker, CreateFlowArgs, FlowWorkerManagerRef};
|
||||
use crate::adapter::{create_worker, FlowWorkerManagerRef};
|
||||
use crate::error::{
|
||||
to_status_with_last_err, CacheRequiredSnafu, CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu,
|
||||
ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
|
||||
@@ -59,12 +59,13 @@ use crate::heartbeat::HeartbeatTask;
|
||||
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
|
||||
use crate::transform::register_function_to_query_engine;
|
||||
use crate::utils::{SizeReportSender, StateReportHandler};
|
||||
use crate::{Error, FlowWorkerManager, FlownodeOptions};
|
||||
use crate::{CreateFlowArgs, Error, FlowWorkerManager, FlownodeOptions, FrontendClient};
|
||||
|
||||
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
|
||||
/// wrapping flow node manager to avoid orphan rule with Arc<...>
|
||||
#[derive(Clone)]
|
||||
pub struct FlowService {
|
||||
/// TODO(discord9): replace with dual engine
|
||||
pub manager: FlowWorkerManagerRef,
|
||||
}
|
||||
|
||||
@@ -290,6 +291,7 @@ pub struct FlownodeBuilder {
|
||||
heartbeat_task: Option<HeartbeatTask>,
|
||||
/// receive a oneshot sender to send state size report
|
||||
state_report_handler: Option<StateReportHandler>,
|
||||
frontend_client: Arc<FrontendClient>,
|
||||
}
|
||||
|
||||
impl FlownodeBuilder {
|
||||
@@ -300,6 +302,7 @@ impl FlownodeBuilder {
|
||||
table_meta: TableMetadataManagerRef,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
flow_metadata_manager: FlowMetadataManagerRef,
|
||||
frontend_client: Arc<FrontendClient>,
|
||||
) -> Self {
|
||||
Self {
|
||||
opts,
|
||||
@@ -309,6 +312,7 @@ impl FlownodeBuilder {
|
||||
flow_metadata_manager,
|
||||
heartbeat_task: None,
|
||||
state_report_handler: None,
|
||||
frontend_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -432,7 +436,7 @@ impl FlownodeBuilder {
|
||||
),
|
||||
};
|
||||
manager
|
||||
.create_flow(args)
|
||||
.create_flow_inner(args)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.with_context(|_| CreateFlowSnafu {
|
||||
|
||||
@@ -117,6 +117,7 @@ impl MetaClientBuilder {
|
||||
.enable_store()
|
||||
.enable_heartbeat()
|
||||
.enable_procedure()
|
||||
.enable_access_cluster_info()
|
||||
}
|
||||
|
||||
pub fn enable_heartbeat(self) -> Self {
|
||||
|
||||
@@ -41,7 +41,7 @@ use common_procedure::options::ProcedureConfig;
|
||||
use common_procedure::ProcedureManagerRef;
|
||||
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
|
||||
use datanode::datanode::DatanodeBuilder;
|
||||
use flow::FlownodeBuilder;
|
||||
use flow::{FlownodeBuilder, FrontendClient};
|
||||
use frontend::frontend::Frontend;
|
||||
use frontend::instance::builder::FrontendBuilder;
|
||||
use frontend::instance::{Instance, StandaloneDatanodeManager};
|
||||
@@ -174,12 +174,15 @@ impl GreptimeDbStandaloneBuilder {
|
||||
Some(procedure_manager.clone()),
|
||||
);
|
||||
|
||||
let fe_server_addr = opts.frontend_options().grpc.bind_addr.clone();
|
||||
let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr);
|
||||
let flow_builder = FlownodeBuilder::new(
|
||||
Default::default(),
|
||||
plugins.clone(),
|
||||
table_metadata_manager.clone(),
|
||||
catalog_manager.clone(),
|
||||
flow_metadata_manager.clone(),
|
||||
Arc::new(frontend_client),
|
||||
);
|
||||
let flownode = Arc::new(flow_builder.build().await.unwrap());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user