mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-28 00:42:56 +00:00
chore: merge public repo (#12)
* feat: implement table flush (#1121) * feat: add flush method for trait * feat: implement flush via grpc * chore: move table_dir/region_name/region_id to table crate * chore: Update src/mito/src/table.rs --------- Co-authored-by: Yingwen <realevenyag@gmail.com> * fix: use correct env var (#1166) * fix: use correct env var * fix: move COPY up so rustup know it's nightly * fix: add `pyo3_backend` in GHA yml * chore: name for `TODO` * temp: not set `pyo3_backend` before find DSO * fix: release linux with pyo3_backend * fix: failed to run subquery wrapped in two parentheses (#1157) * refactor: add the separate GitHub Action job to push the image to the UCloud registry (#1170) * refactor: make the cmd hold the application instance (#1159) * fix: export 'PYO3_CROSS_LIB_DIR' when cargo build for aarch64-linux and refactor matrix opts (#1171) --------- Co-authored-by: Weny Xu <wenymedia@gmail.com> Co-authored-by: Yingwen <realevenyag@gmail.com> Co-authored-by: discord9 <55937128+discord9@users.noreply.github.com> Co-authored-by: LFC <luofucong@greptime.com> Co-authored-by: zyy17 <zyylsxm@gmail.com>
This commit is contained in:
71
.github/workflows/release.yml
vendored
71
.github/workflows/release.yml
vendored
@@ -29,10 +29,12 @@ jobs:
|
||||
os: ubuntu-2004-16-cores
|
||||
file: greptime-linux-amd64
|
||||
continue-on-error: false
|
||||
opts: "-F pyo3_backend"
|
||||
- arch: aarch64-unknown-linux-gnu
|
||||
os: ubuntu-2004-16-cores
|
||||
file: greptime-linux-arm64
|
||||
continue-on-error: true
|
||||
opts: "-F pyo3_backend"
|
||||
- arch: aarch64-apple-darwin
|
||||
os: macos-latest
|
||||
file: greptime-darwin-arm64
|
||||
@@ -103,8 +105,6 @@ jobs:
|
||||
run: |
|
||||
sudo chmod +x ./docker/aarch64/compile-python.sh
|
||||
sudo ./docker/aarch64/compile-python.sh
|
||||
export PYO3_CROSS_LIB_DIR=$(pwd)/python_arm64_build/lib
|
||||
echo $PYO3_CROSS_LIB_DIR
|
||||
|
||||
- name: Install rust toolchain
|
||||
uses: dtolnay/rust-toolchain@master
|
||||
@@ -118,13 +118,18 @@ jobs:
|
||||
- name: Run tests
|
||||
run: make unit-test integration-test sqlness-test
|
||||
|
||||
- name: Run cargo build with pyo3-backend
|
||||
if: contains(matrix.arch, 'linux') && endsWith(matrix.arch, '-gnu')
|
||||
run: cargo build ${{ matrix.opts }} --profile ${{ env.CARGO_PROFILE }} --locked --target ${{ matrix.arch }} -F pyo3_backend
|
||||
- name: Run cargo build for aarch64-linux
|
||||
if: contains(matrix.arch, 'aarch64-unknown-linux-gnu')
|
||||
run: |
|
||||
# TODO(zyy17): We should make PYO3_CROSS_LIB_DIR configurable.
|
||||
export PYO3_CROSS_LIB_DIR=$(pwd)/python_arm64_build/lib
|
||||
echo "PYO3_CROSS_LIB_DIR: $PYO3_CROSS_LIB_DIR"
|
||||
alias python=python3
|
||||
cargo build --profile ${{ env.CARGO_PROFILE }} --locked --target ${{ matrix.arch }} ${{ matrix.opts }}
|
||||
|
||||
- name: Run cargo build with macos
|
||||
if: contains(matrix.arch, 'darwin')
|
||||
run: cargo build ${{ matrix.opts }} --profile ${{ env.CARGO_PROFILE }} --locked --target ${{ matrix.arch }}
|
||||
- name: Run cargo build
|
||||
if: contains(matrix.arch, 'aarch64-unknown-linux-gnu') == false
|
||||
run: cargo build --profile ${{ env.CARGO_PROFILE }} --locked --target ${{ matrix.arch }} ${{ matrix.opts }}
|
||||
|
||||
- name: Calculate checksum and rename binary
|
||||
shell: bash
|
||||
@@ -269,8 +274,6 @@ jobs:
|
||||
tags: |
|
||||
greptime/greptimedb:latest
|
||||
greptime/greptimedb:${{ env.IMAGE_TAG }}
|
||||
uhub.service.ucloud.cn/greptime/greptimedb:latest
|
||||
uhub.service.ucloud.cn/greptime/greptimedb:${{ env.IMAGE_TAG }}
|
||||
|
||||
- name: Build and push amd64 only
|
||||
uses: docker/build-push-action@v3
|
||||
@@ -283,5 +286,49 @@ jobs:
|
||||
tags: |
|
||||
greptime/greptimedb:latest
|
||||
greptime/greptimedb:${{ env.IMAGE_TAG }}
|
||||
uhub.service.ucloud.cn/greptime/greptimedb:latest
|
||||
uhub.service.ucloud.cn/greptime/greptimedb:${{ env.IMAGE_TAG }}
|
||||
|
||||
docker-push-uhub:
|
||||
name: Push docker image to UCloud Container Registry
|
||||
needs: [docker]
|
||||
runs-on: ubuntu-latest
|
||||
if: github.repository == 'GreptimeTeam/greptimedb'
|
||||
# Push to uhub may fail(500 error), but we don't want to block the release process. The failed job will be retried manually.
|
||||
continue-on-error: true
|
||||
steps:
|
||||
- name: Checkout sources
|
||||
uses: actions/checkout@v3
|
||||
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v2
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v2
|
||||
|
||||
- name: Login to UCloud Container Registry
|
||||
uses: docker/login-action@v2
|
||||
with:
|
||||
registry: uhub.service.ucloud.cn
|
||||
username: ${{ secrets.UCLOUD_USERNAME }}
|
||||
password: ${{ secrets.UCLOUD_PASSWORD }}
|
||||
|
||||
- name: Configure scheduled build image tag # the tag would be ${SCHEDULED_BUILD_VERSION_PREFIX}-YYYYMMDD-${SCHEDULED_PERIOD}
|
||||
shell: bash
|
||||
if: github.event_name == 'schedule'
|
||||
run: |
|
||||
buildTime=`date "+%Y%m%d"`
|
||||
SCHEDULED_BUILD_VERSION=${{ env.SCHEDULED_BUILD_VERSION_PREFIX }}-$buildTime-${{ env.SCHEDULED_PERIOD }}
|
||||
echo "IMAGE_TAG=${SCHEDULED_BUILD_VERSION:1}" >> $GITHUB_ENV
|
||||
|
||||
- name: Configure tag # If the release tag is v0.1.0, then the image version tag will be 0.1.0.
|
||||
shell: bash
|
||||
if: github.event_name != 'schedule'
|
||||
run: |
|
||||
VERSION=${{ github.ref_name }}
|
||||
echo "IMAGE_TAG=${VERSION:1}" >> $GITHUB_ENV
|
||||
|
||||
- name: Push image to uhub # Use 'docker buildx imagetools create' to create a new image base on source image.
|
||||
run: |
|
||||
docker buildx imagetools create \
|
||||
--tag uhub.service.ucloud.cn/greptime/greptimedb:latest \
|
||||
--tag uhub.service.ucloud.cn/greptime/greptimedb:${{ env.IMAGE_TAG }} \
|
||||
greptime/greptimedb:${{ env.IMAGE_TAG }}
|
||||
|
||||
@@ -30,9 +30,39 @@ struct Command {
|
||||
subcmd: SubCommand,
|
||||
}
|
||||
|
||||
pub enum Application {
|
||||
Datanode(datanode::Instance),
|
||||
Frontend(frontend::Instance),
|
||||
Metasrv(metasrv::Instance),
|
||||
Standalone(standalone::Instance),
|
||||
Cli(cli::Instance),
|
||||
}
|
||||
|
||||
impl Application {
|
||||
async fn run(&mut self) -> Result<()> {
|
||||
match self {
|
||||
Application::Datanode(instance) => instance.run().await,
|
||||
Application::Frontend(instance) => instance.run().await,
|
||||
Application::Metasrv(instance) => instance.run().await,
|
||||
Application::Standalone(instance) => instance.run().await,
|
||||
Application::Cli(instance) => instance.run().await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn stop(&self) -> Result<()> {
|
||||
match self {
|
||||
Application::Datanode(instance) => instance.stop().await,
|
||||
Application::Frontend(instance) => instance.stop().await,
|
||||
Application::Metasrv(instance) => instance.stop().await,
|
||||
Application::Standalone(instance) => instance.stop().await,
|
||||
Application::Cli(instance) => instance.stop().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Command {
|
||||
async fn run(self) -> Result<()> {
|
||||
self.subcmd.run().await
|
||||
async fn build(self) -> Result<Application> {
|
||||
self.subcmd.build().await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,13 +81,28 @@ enum SubCommand {
|
||||
}
|
||||
|
||||
impl SubCommand {
|
||||
async fn run(self) -> Result<()> {
|
||||
async fn build(self) -> Result<Application> {
|
||||
match self {
|
||||
SubCommand::Datanode(cmd) => cmd.run().await,
|
||||
SubCommand::Frontend(cmd) => cmd.run().await,
|
||||
SubCommand::Metasrv(cmd) => cmd.run().await,
|
||||
SubCommand::Standalone(cmd) => cmd.run().await,
|
||||
SubCommand::Cli(cmd) => cmd.run().await,
|
||||
SubCommand::Datanode(cmd) => {
|
||||
let app = cmd.build().await?;
|
||||
Ok(Application::Datanode(app))
|
||||
}
|
||||
SubCommand::Frontend(cmd) => {
|
||||
let app = cmd.build().await?;
|
||||
Ok(Application::Frontend(app))
|
||||
}
|
||||
SubCommand::Metasrv(cmd) => {
|
||||
let app = cmd.build().await?;
|
||||
Ok(Application::Metasrv(app))
|
||||
}
|
||||
SubCommand::Standalone(cmd) => {
|
||||
let app = cmd.build().await?;
|
||||
Ok(Application::Standalone(app))
|
||||
}
|
||||
SubCommand::Cli(cmd) => {
|
||||
let app = cmd.build().await?;
|
||||
Ok(Application::Cli(app))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -104,13 +149,18 @@ async fn main() -> Result<()> {
|
||||
common_telemetry::init_default_metrics_recorder();
|
||||
let _guard = common_telemetry::init_global_logging(app_name, log_dir, log_level, false);
|
||||
|
||||
let mut app = cmd.build().await?;
|
||||
|
||||
tokio::select! {
|
||||
result = cmd.run() => {
|
||||
result = app.run() => {
|
||||
if let Err(err) = result {
|
||||
error!(err; "Fatal error occurs!");
|
||||
}
|
||||
}
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
if let Err(err) = app.stop().await {
|
||||
error!(err; "Fatal error occurs!");
|
||||
}
|
||||
info!("Goodbye!");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,10 +17,25 @@ mod helper;
|
||||
mod repl;
|
||||
|
||||
use clap::Parser;
|
||||
use repl::Repl;
|
||||
pub use repl::Repl;
|
||||
|
||||
use crate::error::Result;
|
||||
|
||||
pub struct Instance {
|
||||
repl: Repl,
|
||||
}
|
||||
|
||||
impl Instance {
|
||||
pub async fn run(&mut self) -> Result<()> {
|
||||
self.repl.run().await
|
||||
}
|
||||
|
||||
pub async fn stop(&self) -> Result<()> {
|
||||
// TODO: handle cli shutdown
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
pub struct Command {
|
||||
#[clap(subcommand)]
|
||||
@@ -28,8 +43,8 @@ pub struct Command {
|
||||
}
|
||||
|
||||
impl Command {
|
||||
pub async fn run(self) -> Result<()> {
|
||||
self.cmd.run().await
|
||||
pub async fn build(self) -> Result<Instance> {
|
||||
self.cmd.build().await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,9 +54,9 @@ enum SubCommand {
|
||||
}
|
||||
|
||||
impl SubCommand {
|
||||
async fn run(self) -> Result<()> {
|
||||
async fn build(self) -> Result<Instance> {
|
||||
match self {
|
||||
SubCommand::Attach(cmd) => cmd.run().await,
|
||||
SubCommand::Attach(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -57,8 +72,8 @@ pub(crate) struct AttachCommand {
|
||||
}
|
||||
|
||||
impl AttachCommand {
|
||||
async fn run(self) -> Result<()> {
|
||||
let mut repl = Repl::try_new(&self).await?;
|
||||
repl.run().await
|
||||
async fn build(self) -> Result<Instance> {
|
||||
let repl = Repl::try_new(&self).await?;
|
||||
Ok(Instance { repl })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,7 +50,7 @@ use crate::error::{
|
||||
};
|
||||
|
||||
/// Captures the state of the repl, gathers commands and executes them one by one
|
||||
pub(crate) struct Repl {
|
||||
pub struct Repl {
|
||||
/// Rustyline editor for interacting with user on command line
|
||||
rl: Editor<RustylineHelper>,
|
||||
|
||||
|
||||
@@ -24,6 +24,21 @@ use snafu::ResultExt;
|
||||
use crate::error::{Error, MissingConfigSnafu, Result, StartDatanodeSnafu};
|
||||
use crate::toml_loader;
|
||||
|
||||
pub struct Instance {
|
||||
datanode: Datanode,
|
||||
}
|
||||
|
||||
impl Instance {
|
||||
pub async fn run(&mut self) -> Result<()> {
|
||||
self.datanode.start().await.context(StartDatanodeSnafu)
|
||||
}
|
||||
|
||||
pub async fn stop(&self) -> Result<()> {
|
||||
// TODO: handle datanode shutdown
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
pub struct Command {
|
||||
#[clap(subcommand)]
|
||||
@@ -31,8 +46,8 @@ pub struct Command {
|
||||
}
|
||||
|
||||
impl Command {
|
||||
pub async fn run(self) -> Result<()> {
|
||||
self.subcmd.run().await
|
||||
pub async fn build(self) -> Result<Instance> {
|
||||
self.subcmd.build().await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,9 +57,9 @@ enum SubCommand {
|
||||
}
|
||||
|
||||
impl SubCommand {
|
||||
async fn run(self) -> Result<()> {
|
||||
async fn build(self) -> Result<Instance> {
|
||||
match self {
|
||||
SubCommand::Start(cmd) => cmd.run().await,
|
||||
SubCommand::Start(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -72,19 +87,16 @@ struct StartCommand {
|
||||
}
|
||||
|
||||
impl StartCommand {
|
||||
async fn run(self) -> Result<()> {
|
||||
async fn build(self) -> Result<Instance> {
|
||||
logging::info!("Datanode start command: {:#?}", self);
|
||||
|
||||
let opts: DatanodeOptions = self.try_into()?;
|
||||
|
||||
logging::info!("Datanode options: {:#?}", opts);
|
||||
|
||||
Datanode::new(opts)
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?
|
||||
.start()
|
||||
.await
|
||||
.context(StartDatanodeSnafu)
|
||||
let datanode = Datanode::new(opts).await.context(StartDatanodeSnafu)?;
|
||||
|
||||
Ok(Instance { datanode })
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -32,6 +32,12 @@ pub enum Error {
|
||||
source: frontend::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to build meta server, source: {}", source))]
|
||||
BuildMetaServer {
|
||||
#[snafu(backtrace)]
|
||||
source: meta_srv::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to start meta server, source: {}", source))]
|
||||
StartMetaServer {
|
||||
#[snafu(backtrace)]
|
||||
@@ -138,6 +144,7 @@ impl ErrorExt for Error {
|
||||
Error::StartDatanode { source } => source.status_code(),
|
||||
Error::StartFrontend { source } => source.status_code(),
|
||||
Error::StartMetaServer { source } => source.status_code(),
|
||||
Error::BuildMetaServer { source } => source.status_code(),
|
||||
Error::UnsupportedSelectorType { source, .. } => source.status_code(),
|
||||
Error::ReadConfig { .. } | Error::ParseConfig { .. } | Error::MissingConfig { .. } => {
|
||||
StatusCode::InvalidArguments
|
||||
|
||||
@@ -19,7 +19,7 @@ use common_base::Plugins;
|
||||
use frontend::frontend::FrontendOptions;
|
||||
use frontend::grpc::GrpcOptions;
|
||||
use frontend::influxdb::InfluxdbOptions;
|
||||
use frontend::instance::{FrontendInstance, Instance};
|
||||
use frontend::instance::{FrontendInstance, Instance as FeInstance};
|
||||
use frontend::mysql::MysqlOptions;
|
||||
use frontend::opentsdb::OpentsdbOptions;
|
||||
use frontend::postgres::PostgresOptions;
|
||||
@@ -34,6 +34,24 @@ use snafu::ResultExt;
|
||||
use crate::error::{self, IllegalAuthConfigSnafu, Result};
|
||||
use crate::toml_loader;
|
||||
|
||||
pub struct Instance {
|
||||
frontend: FeInstance,
|
||||
}
|
||||
|
||||
impl Instance {
|
||||
pub async fn run(&mut self) -> Result<()> {
|
||||
self.frontend
|
||||
.start()
|
||||
.await
|
||||
.context(error::StartFrontendSnafu)
|
||||
}
|
||||
|
||||
pub async fn stop(&self) -> Result<()> {
|
||||
// TODO: handle frontend shutdown
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
pub struct Command {
|
||||
#[clap(subcommand)]
|
||||
@@ -41,8 +59,8 @@ pub struct Command {
|
||||
}
|
||||
|
||||
impl Command {
|
||||
pub async fn run(self) -> Result<()> {
|
||||
self.subcmd.run().await
|
||||
pub async fn build(self) -> Result<Instance> {
|
||||
self.subcmd.build().await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,9 +70,9 @@ enum SubCommand {
|
||||
}
|
||||
|
||||
impl SubCommand {
|
||||
async fn run(self) -> Result<()> {
|
||||
async fn build(self) -> Result<Instance> {
|
||||
match self {
|
||||
SubCommand::Start(cmd) => cmd.run().await,
|
||||
SubCommand::Start(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -90,11 +108,11 @@ pub struct StartCommand {
|
||||
}
|
||||
|
||||
impl StartCommand {
|
||||
async fn run(self) -> Result<()> {
|
||||
async fn build(self) -> Result<Instance> {
|
||||
let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?);
|
||||
let opts: FrontendOptions = self.try_into()?;
|
||||
|
||||
let mut instance = Instance::try_new_distributed(&opts, plugins.clone())
|
||||
let mut instance = FeInstance::try_new_distributed(&opts, plugins.clone())
|
||||
.await
|
||||
.context(error::StartFrontendSnafu)?;
|
||||
|
||||
@@ -103,7 +121,7 @@ impl StartCommand {
|
||||
.await
|
||||
.context(error::StartFrontendSnafu)?;
|
||||
|
||||
instance.start().await.context(error::StartFrontendSnafu)
|
||||
Ok(Instance { frontend: instance })
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -14,13 +14,32 @@
|
||||
|
||||
use clap::Parser;
|
||||
use common_telemetry::{info, logging, warn};
|
||||
use meta_srv::bootstrap;
|
||||
use meta_srv::bootstrap::MetaSrvInstance;
|
||||
use meta_srv::metasrv::MetaSrvOptions;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{Error, Result};
|
||||
use crate::{error, toml_loader};
|
||||
|
||||
pub struct Instance {
|
||||
instance: MetaSrvInstance,
|
||||
}
|
||||
|
||||
impl Instance {
|
||||
pub async fn run(&mut self) -> Result<()> {
|
||||
self.instance
|
||||
.start()
|
||||
.await
|
||||
.context(error::StartMetaServerSnafu)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn stop(&self) -> Result<()> {
|
||||
// TODO: handle metasrv shutdown
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
pub struct Command {
|
||||
#[clap(subcommand)]
|
||||
@@ -28,8 +47,8 @@ pub struct Command {
|
||||
}
|
||||
|
||||
impl Command {
|
||||
pub async fn run(self) -> Result<()> {
|
||||
self.subcmd.run().await
|
||||
pub async fn build(self) -> Result<Instance> {
|
||||
self.subcmd.build().await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,9 +58,9 @@ enum SubCommand {
|
||||
}
|
||||
|
||||
impl SubCommand {
|
||||
async fn run(self) -> Result<()> {
|
||||
async fn build(self) -> Result<Instance> {
|
||||
match self {
|
||||
SubCommand::Start(cmd) => cmd.run().await,
|
||||
SubCommand::Start(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -63,16 +82,17 @@ struct StartCommand {
|
||||
}
|
||||
|
||||
impl StartCommand {
|
||||
async fn run(self) -> Result<()> {
|
||||
async fn build(self) -> Result<Instance> {
|
||||
logging::info!("MetaSrv start command: {:#?}", self);
|
||||
|
||||
let opts: MetaSrvOptions = self.try_into()?;
|
||||
|
||||
logging::info!("MetaSrv options: {:#?}", opts);
|
||||
|
||||
bootstrap::bootstrap_meta_srv(opts)
|
||||
let instance = MetaSrvInstance::new(opts)
|
||||
.await
|
||||
.context(error::StartMetaServerSnafu)
|
||||
.context(error::BuildMetaServerSnafu)?;
|
||||
|
||||
Ok(Instance { instance })
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -47,8 +47,8 @@ pub struct Command {
|
||||
}
|
||||
|
||||
impl Command {
|
||||
pub async fn run(self) -> Result<()> {
|
||||
self.subcmd.run().await
|
||||
pub async fn build(self) -> Result<Instance> {
|
||||
self.subcmd.build().await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,9 +58,9 @@ enum SubCommand {
|
||||
}
|
||||
|
||||
impl SubCommand {
|
||||
async fn run(self) -> Result<()> {
|
||||
async fn build(self) -> Result<Instance> {
|
||||
match self {
|
||||
SubCommand::Start(cmd) => cmd.run().await,
|
||||
SubCommand::Start(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -133,6 +133,30 @@ impl StandaloneOptions {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Instance {
|
||||
datanode: Datanode,
|
||||
frontend: FeInstance,
|
||||
}
|
||||
|
||||
impl Instance {
|
||||
pub async fn run(&mut self) -> Result<()> {
|
||||
// Start datanode instance before starting services, to avoid requests come in before internal components are started.
|
||||
self.datanode
|
||||
.start_instance()
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
info!("Datanode instance started");
|
||||
|
||||
self.frontend.start().await.context(StartFrontendSnafu)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn stop(&self) -> Result<()> {
|
||||
// TODO: handle standalone shutdown
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
struct StartCommand {
|
||||
#[clap(long)]
|
||||
@@ -164,7 +188,7 @@ struct StartCommand {
|
||||
}
|
||||
|
||||
impl StartCommand {
|
||||
async fn run(self) -> Result<()> {
|
||||
async fn build(self) -> Result<Instance> {
|
||||
let enable_memory_catalog = self.enable_memory_catalog;
|
||||
let config_file = self.config_file.clone();
|
||||
let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?);
|
||||
@@ -184,25 +208,18 @@ impl StartCommand {
|
||||
fe_opts, dn_opts
|
||||
);
|
||||
|
||||
let mut datanode = Datanode::new(dn_opts.clone())
|
||||
let datanode = Datanode::new(dn_opts.clone())
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
let mut frontend = build_frontend(plugins.clone(), datanode.get_instance()).await?;
|
||||
|
||||
// Start datanode instance before starting services, to avoid requests come in before internal components are started.
|
||||
datanode
|
||||
.start_instance()
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
info!("Datanode instance started");
|
||||
let mut frontend = build_frontend(plugins.clone(), datanode.get_instance()).await?;
|
||||
|
||||
frontend
|
||||
.build_servers(&fe_opts, plugins)
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
|
||||
frontend.start().await.context(StartFrontendSnafu)?;
|
||||
Ok(())
|
||||
Ok(Instance { datanode, frontend })
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -39,18 +39,45 @@ use crate::service::store::kv::ResettableKvStoreRef;
|
||||
use crate::service::store::memory::MemStore;
|
||||
use crate::{error, Result};
|
||||
|
||||
// Bootstrap the rpc server to serve incoming request
|
||||
pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> Result<()> {
|
||||
let meta_srv = make_meta_srv(opts.clone()).await?;
|
||||
bootstrap_meta_srv_with_router(opts, router(meta_srv)).await
|
||||
#[derive(Clone)]
|
||||
pub struct MetaSrvInstance {
|
||||
meta_srv: MetaSrv,
|
||||
|
||||
opts: MetaSrvOptions,
|
||||
}
|
||||
|
||||
pub async fn bootstrap_meta_srv_with_router(opts: MetaSrvOptions, router: Router) -> Result<()> {
|
||||
let listener = TcpListener::bind(&opts.bind_addr)
|
||||
impl MetaSrvInstance {
|
||||
pub async fn new(opts: MetaSrvOptions) -> Result<MetaSrvInstance> {
|
||||
let meta_srv = build_meta_srv(&opts).await?;
|
||||
|
||||
Ok(MetaSrvInstance { meta_srv, opts })
|
||||
}
|
||||
|
||||
pub async fn start(&self) -> Result<()> {
|
||||
self.meta_srv.start().await;
|
||||
bootstrap_meta_srv_with_router(&self.opts.bind_addr, router(self.meta_srv.clone())).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn close(&self) -> Result<()> {
|
||||
// TODO: shutdown the router
|
||||
self.meta_srv.shutdown();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// Bootstrap the rpc server to serve incoming request
|
||||
pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> Result<()> {
|
||||
let meta_srv = make_meta_srv(&opts).await?;
|
||||
bootstrap_meta_srv_with_router(&opts.bind_addr, router(meta_srv)).await
|
||||
}
|
||||
|
||||
pub async fn bootstrap_meta_srv_with_router(bind_addr: &str, router: Router) -> Result<()> {
|
||||
let listener = TcpListener::bind(bind_addr)
|
||||
.await
|
||||
.context(error::TcpBindSnafu {
|
||||
addr: &opts.bind_addr,
|
||||
})?;
|
||||
.context(error::TcpBindSnafu { addr: bind_addr })?;
|
||||
let listener = TcpListenerStream::new(listener);
|
||||
|
||||
router
|
||||
@@ -72,7 +99,7 @@ pub fn router(meta_srv: MetaSrv) -> Router {
|
||||
.add_service(admin::make_admin_service(meta_srv))
|
||||
}
|
||||
|
||||
pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result<MetaSrv> {
|
||||
pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result<MetaSrv> {
|
||||
let (kv_store, election, lock) = if opts.use_memory_store {
|
||||
(Arc::new(MemStore::new()) as _, None, None)
|
||||
} else {
|
||||
@@ -107,7 +134,7 @@ pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result<MetaSrv> {
|
||||
};
|
||||
|
||||
let meta_srv = MetaSrvBuilder::new()
|
||||
.options(opts)
|
||||
.options(opts.clone())
|
||||
.kv_store(kv_store)
|
||||
.in_memory(in_memory)
|
||||
.selector(selector)
|
||||
@@ -117,6 +144,12 @@ pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result<MetaSrv> {
|
||||
.build()
|
||||
.await;
|
||||
|
||||
Ok(meta_srv)
|
||||
}
|
||||
|
||||
pub async fn make_meta_srv(opts: &MetaSrvOptions) -> Result<MetaSrv> {
|
||||
let meta_srv = build_meta_srv(opts).await?;
|
||||
|
||||
meta_srv.start().await;
|
||||
|
||||
Ok(meta_srv)
|
||||
|
||||
Reference in New Issue
Block a user