From 7b8e65ce93bd7988a615eab8212c35aabfc91549 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 14 Mar 2023 16:42:40 +0800 Subject: [PATCH] chore: merge public repo (#12) * feat: implement table flush (#1121) * feat: add flush method for trait * feat: implement flush via grpc * chore: move table_dir/region_name/region_id to table crate * chore: Update src/mito/src/table.rs --------- Co-authored-by: Yingwen * fix: use correct env var (#1166) * fix: use correct env var * fix: move COPY up so rustup know it's nightly * fix: add `pyo3_backend` in GHA yml * chore: name for `TODO` * temp: not set `pyo3_backend` before find DSO * fix: release linux with pyo3_backend * fix: failed to run subquery wrapped in two parentheses (#1157) * refactor: add the separate GitHub Action job to push the image to the UCloud registry (#1170) * refactor: make the cmd hold the application instance (#1159) * fix: export 'PYO3_CROSS_LIB_DIR' when cargo build for aarch64-linux and refactor matrix opts (#1171) --------- Co-authored-by: Weny Xu Co-authored-by: Yingwen Co-authored-by: discord9 <55937128+discord9@users.noreply.github.com> Co-authored-by: LFC Co-authored-by: zyy17 --- .github/workflows/release.yml | 71 +++++++++++++++++++++++++++++------ src/cmd/src/bin/greptime.rs | 68 ++++++++++++++++++++++++++++----- src/cmd/src/cli.rs | 31 +++++++++++---- src/cmd/src/cli/repl.rs | 2 +- src/cmd/src/datanode.rs | 34 +++++++++++------ src/cmd/src/error.rs | 7 ++++ src/cmd/src/frontend.rs | 34 +++++++++++++---- src/cmd/src/metasrv.rs | 38 ++++++++++++++----- src/cmd/src/standalone.rs | 47 +++++++++++++++-------- src/meta-srv/src/bootstrap.rs | 55 +++++++++++++++++++++------ 10 files changed, 303 insertions(+), 84 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 7032179350..77b6c15874 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -29,10 +29,12 @@ jobs: os: ubuntu-2004-16-cores file: greptime-linux-amd64 continue-on-error: false + opts: "-F pyo3_backend" - arch: aarch64-unknown-linux-gnu os: ubuntu-2004-16-cores file: greptime-linux-arm64 continue-on-error: true + opts: "-F pyo3_backend" - arch: aarch64-apple-darwin os: macos-latest file: greptime-darwin-arm64 @@ -103,8 +105,6 @@ jobs: run: | sudo chmod +x ./docker/aarch64/compile-python.sh sudo ./docker/aarch64/compile-python.sh - export PYO3_CROSS_LIB_DIR=$(pwd)/python_arm64_build/lib - echo $PYO3_CROSS_LIB_DIR - name: Install rust toolchain uses: dtolnay/rust-toolchain@master @@ -118,13 +118,18 @@ jobs: - name: Run tests run: make unit-test integration-test sqlness-test - - name: Run cargo build with pyo3-backend - if: contains(matrix.arch, 'linux') && endsWith(matrix.arch, '-gnu') - run: cargo build ${{ matrix.opts }} --profile ${{ env.CARGO_PROFILE }} --locked --target ${{ matrix.arch }} -F pyo3_backend + - name: Run cargo build for aarch64-linux + if: contains(matrix.arch, 'aarch64-unknown-linux-gnu') + run: | + # TODO(zyy17): We should make PYO3_CROSS_LIB_DIR configurable. + export PYO3_CROSS_LIB_DIR=$(pwd)/python_arm64_build/lib + echo "PYO3_CROSS_LIB_DIR: $PYO3_CROSS_LIB_DIR" + alias python=python3 + cargo build --profile ${{ env.CARGO_PROFILE }} --locked --target ${{ matrix.arch }} ${{ matrix.opts }} - - name: Run cargo build with macos - if: contains(matrix.arch, 'darwin') - run: cargo build ${{ matrix.opts }} --profile ${{ env.CARGO_PROFILE }} --locked --target ${{ matrix.arch }} + - name: Run cargo build + if: contains(matrix.arch, 'aarch64-unknown-linux-gnu') == false + run: cargo build --profile ${{ env.CARGO_PROFILE }} --locked --target ${{ matrix.arch }} ${{ matrix.opts }} - name: Calculate checksum and rename binary shell: bash @@ -269,8 +274,6 @@ jobs: tags: | greptime/greptimedb:latest greptime/greptimedb:${{ env.IMAGE_TAG }} - uhub.service.ucloud.cn/greptime/greptimedb:latest - uhub.service.ucloud.cn/greptime/greptimedb:${{ env.IMAGE_TAG }} - name: Build and push amd64 only uses: docker/build-push-action@v3 @@ -283,5 +286,49 @@ jobs: tags: | greptime/greptimedb:latest greptime/greptimedb:${{ env.IMAGE_TAG }} - uhub.service.ucloud.cn/greptime/greptimedb:latest - uhub.service.ucloud.cn/greptime/greptimedb:${{ env.IMAGE_TAG }} + + docker-push-uhub: + name: Push docker image to UCloud Container Registry + needs: [docker] + runs-on: ubuntu-latest + if: github.repository == 'GreptimeTeam/greptimedb' + # Push to uhub may fail(500 error), but we don't want to block the release process. The failed job will be retried manually. + continue-on-error: true + steps: + - name: Checkout sources + uses: actions/checkout@v3 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v2 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + + - name: Login to UCloud Container Registry + uses: docker/login-action@v2 + with: + registry: uhub.service.ucloud.cn + username: ${{ secrets.UCLOUD_USERNAME }} + password: ${{ secrets.UCLOUD_PASSWORD }} + + - name: Configure scheduled build image tag # the tag would be ${SCHEDULED_BUILD_VERSION_PREFIX}-YYYYMMDD-${SCHEDULED_PERIOD} + shell: bash + if: github.event_name == 'schedule' + run: | + buildTime=`date "+%Y%m%d"` + SCHEDULED_BUILD_VERSION=${{ env.SCHEDULED_BUILD_VERSION_PREFIX }}-$buildTime-${{ env.SCHEDULED_PERIOD }} + echo "IMAGE_TAG=${SCHEDULED_BUILD_VERSION:1}" >> $GITHUB_ENV + + - name: Configure tag # If the release tag is v0.1.0, then the image version tag will be 0.1.0. + shell: bash + if: github.event_name != 'schedule' + run: | + VERSION=${{ github.ref_name }} + echo "IMAGE_TAG=${VERSION:1}" >> $GITHUB_ENV + + - name: Push image to uhub # Use 'docker buildx imagetools create' to create a new image base on source image. + run: | + docker buildx imagetools create \ + --tag uhub.service.ucloud.cn/greptime/greptimedb:latest \ + --tag uhub.service.ucloud.cn/greptime/greptimedb:${{ env.IMAGE_TAG }} \ + greptime/greptimedb:${{ env.IMAGE_TAG }} diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs index b532c4d088..e16dc6a559 100644 --- a/src/cmd/src/bin/greptime.rs +++ b/src/cmd/src/bin/greptime.rs @@ -30,9 +30,39 @@ struct Command { subcmd: SubCommand, } +pub enum Application { + Datanode(datanode::Instance), + Frontend(frontend::Instance), + Metasrv(metasrv::Instance), + Standalone(standalone::Instance), + Cli(cli::Instance), +} + +impl Application { + async fn run(&mut self) -> Result<()> { + match self { + Application::Datanode(instance) => instance.run().await, + Application::Frontend(instance) => instance.run().await, + Application::Metasrv(instance) => instance.run().await, + Application::Standalone(instance) => instance.run().await, + Application::Cli(instance) => instance.run().await, + } + } + + async fn stop(&self) -> Result<()> { + match self { + Application::Datanode(instance) => instance.stop().await, + Application::Frontend(instance) => instance.stop().await, + Application::Metasrv(instance) => instance.stop().await, + Application::Standalone(instance) => instance.stop().await, + Application::Cli(instance) => instance.stop().await, + } + } +} + impl Command { - async fn run(self) -> Result<()> { - self.subcmd.run().await + async fn build(self) -> Result { + self.subcmd.build().await } } @@ -51,13 +81,28 @@ enum SubCommand { } impl SubCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { match self { - SubCommand::Datanode(cmd) => cmd.run().await, - SubCommand::Frontend(cmd) => cmd.run().await, - SubCommand::Metasrv(cmd) => cmd.run().await, - SubCommand::Standalone(cmd) => cmd.run().await, - SubCommand::Cli(cmd) => cmd.run().await, + SubCommand::Datanode(cmd) => { + let app = cmd.build().await?; + Ok(Application::Datanode(app)) + } + SubCommand::Frontend(cmd) => { + let app = cmd.build().await?; + Ok(Application::Frontend(app)) + } + SubCommand::Metasrv(cmd) => { + let app = cmd.build().await?; + Ok(Application::Metasrv(app)) + } + SubCommand::Standalone(cmd) => { + let app = cmd.build().await?; + Ok(Application::Standalone(app)) + } + SubCommand::Cli(cmd) => { + let app = cmd.build().await?; + Ok(Application::Cli(app)) + } } } } @@ -104,13 +149,18 @@ async fn main() -> Result<()> { common_telemetry::init_default_metrics_recorder(); let _guard = common_telemetry::init_global_logging(app_name, log_dir, log_level, false); + let mut app = cmd.build().await?; + tokio::select! { - result = cmd.run() => { + result = app.run() => { if let Err(err) = result { error!(err; "Fatal error occurs!"); } } _ = tokio::signal::ctrl_c() => { + if let Err(err) = app.stop().await { + error!(err; "Fatal error occurs!"); + } info!("Goodbye!"); } } diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs index dbabaf8e60..0563dcc120 100644 --- a/src/cmd/src/cli.rs +++ b/src/cmd/src/cli.rs @@ -17,10 +17,25 @@ mod helper; mod repl; use clap::Parser; -use repl::Repl; +pub use repl::Repl; use crate::error::Result; +pub struct Instance { + repl: Repl, +} + +impl Instance { + pub async fn run(&mut self) -> Result<()> { + self.repl.run().await + } + + pub async fn stop(&self) -> Result<()> { + // TODO: handle cli shutdown + Ok(()) + } +} + #[derive(Parser)] pub struct Command { #[clap(subcommand)] @@ -28,8 +43,8 @@ pub struct Command { } impl Command { - pub async fn run(self) -> Result<()> { - self.cmd.run().await + pub async fn build(self) -> Result { + self.cmd.build().await } } @@ -39,9 +54,9 @@ enum SubCommand { } impl SubCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { match self { - SubCommand::Attach(cmd) => cmd.run().await, + SubCommand::Attach(cmd) => cmd.build().await, } } } @@ -57,8 +72,8 @@ pub(crate) struct AttachCommand { } impl AttachCommand { - async fn run(self) -> Result<()> { - let mut repl = Repl::try_new(&self).await?; - repl.run().await + async fn build(self) -> Result { + let repl = Repl::try_new(&self).await?; + Ok(Instance { repl }) } } diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index 144f3ac410..aa77df8e22 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -50,7 +50,7 @@ use crate::error::{ }; /// Captures the state of the repl, gathers commands and executes them one by one -pub(crate) struct Repl { +pub struct Repl { /// Rustyline editor for interacting with user on command line rl: Editor, diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 405dc36f5d..1aa095e21b 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -24,6 +24,21 @@ use snafu::ResultExt; use crate::error::{Error, MissingConfigSnafu, Result, StartDatanodeSnafu}; use crate::toml_loader; +pub struct Instance { + datanode: Datanode, +} + +impl Instance { + pub async fn run(&mut self) -> Result<()> { + self.datanode.start().await.context(StartDatanodeSnafu) + } + + pub async fn stop(&self) -> Result<()> { + // TODO: handle datanode shutdown + Ok(()) + } +} + #[derive(Parser)] pub struct Command { #[clap(subcommand)] @@ -31,8 +46,8 @@ pub struct Command { } impl Command { - pub async fn run(self) -> Result<()> { - self.subcmd.run().await + pub async fn build(self) -> Result { + self.subcmd.build().await } } @@ -42,9 +57,9 @@ enum SubCommand { } impl SubCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { match self { - SubCommand::Start(cmd) => cmd.run().await, + SubCommand::Start(cmd) => cmd.build().await, } } } @@ -72,19 +87,16 @@ struct StartCommand { } impl StartCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { logging::info!("Datanode start command: {:#?}", self); let opts: DatanodeOptions = self.try_into()?; logging::info!("Datanode options: {:#?}", opts); - Datanode::new(opts) - .await - .context(StartDatanodeSnafu)? - .start() - .await - .context(StartDatanodeSnafu) + let datanode = Datanode::new(opts).await.context(StartDatanodeSnafu)?; + + Ok(Instance { datanode }) } } diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 9a7c083c75..a4b42c7fda 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -32,6 +32,12 @@ pub enum Error { source: frontend::error::Error, }, + #[snafu(display("Failed to build meta server, source: {}", source))] + BuildMetaServer { + #[snafu(backtrace)] + source: meta_srv::error::Error, + }, + #[snafu(display("Failed to start meta server, source: {}", source))] StartMetaServer { #[snafu(backtrace)] @@ -138,6 +144,7 @@ impl ErrorExt for Error { Error::StartDatanode { source } => source.status_code(), Error::StartFrontend { source } => source.status_code(), Error::StartMetaServer { source } => source.status_code(), + Error::BuildMetaServer { source } => source.status_code(), Error::UnsupportedSelectorType { source, .. } => source.status_code(), Error::ReadConfig { .. } | Error::ParseConfig { .. } | Error::MissingConfig { .. } => { StatusCode::InvalidArguments diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index c943cba994..b98fdda97d 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -19,7 +19,7 @@ use common_base::Plugins; use frontend::frontend::FrontendOptions; use frontend::grpc::GrpcOptions; use frontend::influxdb::InfluxdbOptions; -use frontend::instance::{FrontendInstance, Instance}; +use frontend::instance::{FrontendInstance, Instance as FeInstance}; use frontend::mysql::MysqlOptions; use frontend::opentsdb::OpentsdbOptions; use frontend::postgres::PostgresOptions; @@ -34,6 +34,24 @@ use snafu::ResultExt; use crate::error::{self, IllegalAuthConfigSnafu, Result}; use crate::toml_loader; +pub struct Instance { + frontend: FeInstance, +} + +impl Instance { + pub async fn run(&mut self) -> Result<()> { + self.frontend + .start() + .await + .context(error::StartFrontendSnafu) + } + + pub async fn stop(&self) -> Result<()> { + // TODO: handle frontend shutdown + Ok(()) + } +} + #[derive(Parser)] pub struct Command { #[clap(subcommand)] @@ -41,8 +59,8 @@ pub struct Command { } impl Command { - pub async fn run(self) -> Result<()> { - self.subcmd.run().await + pub async fn build(self) -> Result { + self.subcmd.build().await } } @@ -52,9 +70,9 @@ enum SubCommand { } impl SubCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { match self { - SubCommand::Start(cmd) => cmd.run().await, + SubCommand::Start(cmd) => cmd.build().await, } } } @@ -90,11 +108,11 @@ pub struct StartCommand { } impl StartCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?); let opts: FrontendOptions = self.try_into()?; - let mut instance = Instance::try_new_distributed(&opts, plugins.clone()) + let mut instance = FeInstance::try_new_distributed(&opts, plugins.clone()) .await .context(error::StartFrontendSnafu)?; @@ -103,7 +121,7 @@ impl StartCommand { .await .context(error::StartFrontendSnafu)?; - instance.start().await.context(error::StartFrontendSnafu) + Ok(Instance { frontend: instance }) } } diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 8151d3b3f4..ab46f6f09b 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -14,13 +14,32 @@ use clap::Parser; use common_telemetry::{info, logging, warn}; -use meta_srv::bootstrap; +use meta_srv::bootstrap::MetaSrvInstance; use meta_srv::metasrv::MetaSrvOptions; use snafu::ResultExt; use crate::error::{Error, Result}; use crate::{error, toml_loader}; +pub struct Instance { + instance: MetaSrvInstance, +} + +impl Instance { + pub async fn run(&mut self) -> Result<()> { + self.instance + .start() + .await + .context(error::StartMetaServerSnafu)?; + Ok(()) + } + + pub async fn stop(&self) -> Result<()> { + // TODO: handle metasrv shutdown + Ok(()) + } +} + #[derive(Parser)] pub struct Command { #[clap(subcommand)] @@ -28,8 +47,8 @@ pub struct Command { } impl Command { - pub async fn run(self) -> Result<()> { - self.subcmd.run().await + pub async fn build(self) -> Result { + self.subcmd.build().await } } @@ -39,9 +58,9 @@ enum SubCommand { } impl SubCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { match self { - SubCommand::Start(cmd) => cmd.run().await, + SubCommand::Start(cmd) => cmd.build().await, } } } @@ -63,16 +82,17 @@ struct StartCommand { } impl StartCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { logging::info!("MetaSrv start command: {:#?}", self); let opts: MetaSrvOptions = self.try_into()?; logging::info!("MetaSrv options: {:#?}", opts); - - bootstrap::bootstrap_meta_srv(opts) + let instance = MetaSrvInstance::new(opts) .await - .context(error::StartMetaServerSnafu) + .context(error::BuildMetaServerSnafu)?; + + Ok(Instance { instance }) } } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index d046028709..1d53ca7b0f 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -47,8 +47,8 @@ pub struct Command { } impl Command { - pub async fn run(self) -> Result<()> { - self.subcmd.run().await + pub async fn build(self) -> Result { + self.subcmd.build().await } } @@ -58,9 +58,9 @@ enum SubCommand { } impl SubCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { match self { - SubCommand::Start(cmd) => cmd.run().await, + SubCommand::Start(cmd) => cmd.build().await, } } } @@ -133,6 +133,30 @@ impl StandaloneOptions { } } +pub struct Instance { + datanode: Datanode, + frontend: FeInstance, +} + +impl Instance { + pub async fn run(&mut self) -> Result<()> { + // Start datanode instance before starting services, to avoid requests come in before internal components are started. + self.datanode + .start_instance() + .await + .context(StartDatanodeSnafu)?; + info!("Datanode instance started"); + + self.frontend.start().await.context(StartFrontendSnafu)?; + Ok(()) + } + + pub async fn stop(&self) -> Result<()> { + // TODO: handle standalone shutdown + Ok(()) + } +} + #[derive(Debug, Parser)] struct StartCommand { #[clap(long)] @@ -164,7 +188,7 @@ struct StartCommand { } impl StartCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { let enable_memory_catalog = self.enable_memory_catalog; let config_file = self.config_file.clone(); let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?); @@ -184,25 +208,18 @@ impl StartCommand { fe_opts, dn_opts ); - let mut datanode = Datanode::new(dn_opts.clone()) + let datanode = Datanode::new(dn_opts.clone()) .await .context(StartDatanodeSnafu)?; - let mut frontend = build_frontend(plugins.clone(), datanode.get_instance()).await?; - // Start datanode instance before starting services, to avoid requests come in before internal components are started. - datanode - .start_instance() - .await - .context(StartDatanodeSnafu)?; - info!("Datanode instance started"); + let mut frontend = build_frontend(plugins.clone(), datanode.get_instance()).await?; frontend .build_servers(&fe_opts, plugins) .await .context(StartFrontendSnafu)?; - frontend.start().await.context(StartFrontendSnafu)?; - Ok(()) + Ok(Instance { datanode, frontend }) } } diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 7749470019..c075b12ec9 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -39,18 +39,45 @@ use crate::service::store::kv::ResettableKvStoreRef; use crate::service::store::memory::MemStore; use crate::{error, Result}; -// Bootstrap the rpc server to serve incoming request -pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> Result<()> { - let meta_srv = make_meta_srv(opts.clone()).await?; - bootstrap_meta_srv_with_router(opts, router(meta_srv)).await +#[derive(Clone)] +pub struct MetaSrvInstance { + meta_srv: MetaSrv, + + opts: MetaSrvOptions, } -pub async fn bootstrap_meta_srv_with_router(opts: MetaSrvOptions, router: Router) -> Result<()> { - let listener = TcpListener::bind(&opts.bind_addr) +impl MetaSrvInstance { + pub async fn new(opts: MetaSrvOptions) -> Result { + let meta_srv = build_meta_srv(&opts).await?; + + Ok(MetaSrvInstance { meta_srv, opts }) + } + + pub async fn start(&self) -> Result<()> { + self.meta_srv.start().await; + bootstrap_meta_srv_with_router(&self.opts.bind_addr, router(self.meta_srv.clone())).await?; + + Ok(()) + } + + pub async fn close(&self) -> Result<()> { + // TODO: shutdown the router + self.meta_srv.shutdown(); + + Ok(()) + } +} + +// Bootstrap the rpc server to serve incoming request +pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> Result<()> { + let meta_srv = make_meta_srv(&opts).await?; + bootstrap_meta_srv_with_router(&opts.bind_addr, router(meta_srv)).await +} + +pub async fn bootstrap_meta_srv_with_router(bind_addr: &str, router: Router) -> Result<()> { + let listener = TcpListener::bind(bind_addr) .await - .context(error::TcpBindSnafu { - addr: &opts.bind_addr, - })?; + .context(error::TcpBindSnafu { addr: bind_addr })?; let listener = TcpListenerStream::new(listener); router @@ -72,7 +99,7 @@ pub fn router(meta_srv: MetaSrv) -> Router { .add_service(admin::make_admin_service(meta_srv)) } -pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result { +pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result { let (kv_store, election, lock) = if opts.use_memory_store { (Arc::new(MemStore::new()) as _, None, None) } else { @@ -107,7 +134,7 @@ pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result { }; let meta_srv = MetaSrvBuilder::new() - .options(opts) + .options(opts.clone()) .kv_store(kv_store) .in_memory(in_memory) .selector(selector) @@ -117,6 +144,12 @@ pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result { .build() .await; + Ok(meta_srv) +} + +pub async fn make_meta_srv(opts: &MetaSrvOptions) -> Result { + let meta_srv = build_meta_srv(opts).await?; + meta_srv.start().await; Ok(meta_srv)