feat: introduce reconciliation interface (#6614)

* feat: introduce reconcile interface

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: upgrade proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-08-04 17:12:48 +08:00
committed by GitHub
parent 280024d7f8
commit 414101fafa
27 changed files with 1138 additions and 107 deletions

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::ReconcileRequest;
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
@@ -57,6 +58,17 @@ impl ProcedureServiceHandler for ProcedureServiceOperator {
.map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
}
async fn reconcile(&self, request: ReconcileRequest) -> QueryResult<Option<String>> {
Ok(self
.procedure_executor
.reconcile(&ExecutorContext::default(), request)
.await
.map_err(BoxedError::new)
.context(query_error::ProcedureServiceSnafu)?
.pid
.map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
}
async fn query_procedure_state(&self, pid: &str) -> QueryResult<ProcedureStateResponse> {
self.procedure_executor
.query_procedure_state(&ExecutorContext::default(), pid)