mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-25 17:30:41 +00:00
feat/bulk-support-flow-batch:
### Add Dirty Window Handling in Flow Module - **Updated `greptime-proto` Dependency**: Updated the `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`. - **Flow Module Enhancements**: - Added `DirtyWindowRequest` handling in `flow.rs`, `node_manager.rs`, `test_util.rs`, `flownode_impl.rs`, and `server.rs`. - Implemented `handle_mark_window_dirty` function to manage dirty time windows. - **Bulk Insert Enhancements**: - Modified `bulk_insert.rs` to notify flownodes about dirty time windows using `update_flow_dirty_window`. - **Removed Unused Imports**: Cleaned up unused imports in `greptime_handler.rs`, `grpc.rs`, and `mod.rs`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::flow::{FlowRequest, FlowResponse};
|
||||
use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse};
|
||||
use api::v1::region::InsertRequests;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::node_manager::Flownode;
|
||||
@@ -44,6 +44,16 @@ impl Flownode for FlowRequester {
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_meta::error::ExternalSnafu)
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(
|
||||
&self,
|
||||
req: DirtyWindowRequest,
|
||||
) -> common_meta::error::Result<FlowResponse> {
|
||||
self.handle_mark_window_dirty(req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_meta::error::ExternalSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowRequester {
|
||||
@@ -91,4 +101,20 @@ impl FlowRequester {
|
||||
.into_inner();
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
|
||||
let (addr, mut client) = self.client.raw_flow_client()?;
|
||||
let response = client
|
||||
.handle_mark_dirty_time_window(DirtyWindowRequests {
|
||||
requests: vec![req],
|
||||
})
|
||||
.await
|
||||
.or_else(|e| {
|
||||
let code = e.code();
|
||||
let err: crate::error::Error = e.into();
|
||||
Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
|
||||
})?
|
||||
.into_inner();
|
||||
Ok(response)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user