From a3417f50cf8cdd961eb513c6b2e412e7fb2444bb Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 6 May 2024 14:16:30 +0800 Subject: [PATCH] refactor: rename some `task` to `flow` --- src/flow/Cargo.toml | 2 +- src/flow/src/adapter.rs | 4 ++-- src/flow/src/adapter/server.rs | 14 +++++++------- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index e2c4d1cc16..5d124ece88 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -5,7 +5,7 @@ edition.workspace = true license.workspace = true [[bin]] -name = "mem-footprint" +name = "footprint" path = "bin/mem_footprint.rs" [lints] diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index cc5e4ec93e..4bfb0d8b17 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -438,7 +438,7 @@ impl FlowNodeManager { expire_when: Option, comment: Option, sql: String, - task_options: HashMap, + flow_options: HashMap, ) -> Result, Error> { if create_if_not_exist { // check if the task already exists @@ -467,7 +467,7 @@ impl FlowNodeManager { // TODO(discord9): parse `expire_when` let _ = expire_when; let _ = comment; - let _ = task_options; + let _ = flow_options; // TODO(discord9): add more than one handles let sink_id = node_ctx diff --git a/src/flow/src/adapter/server.rs b/src/flow/src/adapter/server.rs index 6594572974..61c2e285c3 100644 --- a/src/flow/src/adapter/server.rs +++ b/src/flow/src/adapter/server.rs @@ -17,7 +17,7 @@ use std::net::SocketAddr; use std::sync::Arc; -use api::v1::flow::{CreateRequest, RemoveRequest}; +use api::v1::flow::{CreateRequest, DropRequest}; use common_telemetry::tracing::info; use futures::FutureExt; use greptime_proto::v1::flow::{ @@ -50,14 +50,14 @@ impl flow_server::Flow for FlowService { ) -> Result, Status> { match request.into_inner().body { Some(flow_request::Body::Create(CreateRequest { - task_id: Some(task_id), + flow_id: Some(task_id), source_table_ids, sink_table_name: Some(sink_table_name), create_if_not_exists, expire_when, comment, sql, - task_options, + flow_options, })) => { let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec(); let sink_table_id = self @@ -75,7 +75,7 @@ impl flow_server::Flow for FlowService { Some(expire_when), Some(comment), sql, - task_options, + flow_options, ) .await .map_err(|e| tonic::Status::internal(e.to_string()))?; @@ -87,11 +87,11 @@ impl flow_server::Flow for FlowService { ..Default::default() })) } - Some(flow_request::Body::Remove(RemoveRequest { - task_id: Some(task_id), + Some(flow_request::Body::Drop(DropRequest { + flow_id: Some(flow_id), })) => { self.manager - .remove_flow(task_id.id as u64) + .remove_flow(flow_id.id as u64) .await .map_err(|e| tonic::Status::internal(e.to_string()))?; Ok(Response::new(Default::default()))