diff --git a/Cargo.lock b/Cargo.lock index f89045bb6b..98d888cfbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4723,7 +4723,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=683e9d10ae7f3dfb8aaabd89082fc600c17e3795#683e9d10ae7f3dfb8aaabd89082fc600c17e3795" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1915576b113a494f5352fd61f211d899b7f87aab#1915576b113a494f5352fd61f211d899b7f87aab" dependencies = [ "prost 0.13.3", "serde", diff --git a/Cargo.toml b/Cargo.toml index c094a2d651..ca58730599 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -127,7 +127,8 @@ etcd-client = "0.14" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "683e9d10ae7f3dfb8aaabd89082fc600c17e3795" } +# branch: poc-write-path +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1915576b113a494f5352fd61f211d899b7f87aab" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/README.md b/README.md index ba94fdbbb2..51434ff837 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ GreptimeCloud | User Guide | API Docs | - Roadmap 2024 + Roadmap 2025 diff --git a/config/config.md b/config/config.md index 0639e06a21..42cbe6c116 100644 --- a/config/config.md +++ b/config/config.md @@ -26,6 +26,8 @@ | `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. | | `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. | | `http.body_limit` | String | `64MB` | HTTP request body limit.
The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
Set to 0 to disable limit. | +| `http.enable_cors` | Bool | `true` | HTTP CORS support, enabled by default.
This allows browsers to access HTTP APIs without CORS restrictions | +| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. | | `grpc` | -- | -- | The gRPC server options. | | `grpc.addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. | | `grpc.runtime_size` | Integer | `8` | The number of server worker threads. | @@ -216,6 +218,8 @@ | `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. | | `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. | | `http.body_limit` | String | `64MB` | HTTP request body limit.
The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
Set to 0 to disable limit. | +| `http.enable_cors` | Bool | `true` | HTTP CORS support, enabled by default.
This allows browsers to access HTTP APIs without CORS restrictions | +| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. | | `grpc` | -- | -- | The gRPC server options. | | `grpc.addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. | | `grpc.hostname` | String | `127.0.0.1:4001` | The hostname advertised to the metasrv,
and used for connections from outside the host | diff --git a/config/frontend.example.toml b/config/frontend.example.toml index ade1b9169a..2ecd7a19e8 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -31,6 +31,12 @@ timeout = "30s" ## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`. ## Set to 0 to disable limit. body_limit = "64MB" +## HTTP CORS support, it's turned on by default +## This allows browser to access http APIs without CORS restrictions +enable_cors = true +## Customize allowed origins for HTTP CORS. +## @toml2docs:none-default +cors_allowed_origins = ["https://example.com"] ## The gRPC server options. [grpc] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index c86e48ae51..da1afc29fc 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -39,6 +39,12 @@ timeout = "30s" ## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`. ## Set to 0 to disable limit. body_limit = "64MB" +## HTTP CORS support, it's turned on by default +## This allows browser to access http APIs without CORS restrictions +enable_cors = true +## Customize allowed origins for HTTP CORS. +## @toml2docs:none-default +cors_allowed_origins = ["https://example.com"] ## The gRPC server options. [grpc] diff --git a/docker/docker-compose/cluster-with-etcd.yaml b/docker/docker-compose/cluster-with-etcd.yaml index e7794662a8..8e1773c7d7 100644 --- a/docker/docker-compose/cluster-with-etcd.yaml +++ b/docker/docker-compose/cluster-with-etcd.yaml @@ -39,14 +39,16 @@ services: container_name: metasrv ports: - 3002:3002 + - 3000:3000 command: - metasrv - start - --bind-addr=0.0.0.0:3002 - --server-addr=metasrv:3002 - --store-addrs=etcd0:2379 + - --http-addr=0.0.0.0:3000 healthcheck: - test: [ "CMD", "curl", "-f", "http://metasrv:3002/health" ] + test: [ "CMD", "curl", "-f", "http://metasrv:3000/health" ] interval: 5s timeout: 3s retries: 5 @@ -73,10 +75,10 @@ services: volumes: - /tmp/greptimedb-cluster-docker-compose/datanode0:/tmp/greptimedb healthcheck: - test: [ "CMD", "curl", "-f", "http://datanode0:5000/health" ] + test: [ "CMD", "curl", "-fv", "http://datanode0:5000/health" ] interval: 5s timeout: 3s - retries: 5 + retries: 10 depends_on: metasrv: condition: service_healthy @@ -115,6 +117,7 @@ services: container_name: flownode0 ports: - 4004:4004 + - 4005:4005 command: - flownode - start @@ -122,9 +125,15 @@ services: - --metasrv-addrs=metasrv:3002 - --rpc-addr=0.0.0.0:4004 - --rpc-hostname=flownode0:4004 + - --http-addr=0.0.0.0:4005 depends_on: frontend0: condition: service_healthy + healthcheck: + test: [ "CMD", "curl", "-f", "http://flownode0:4005/health" ] + interval: 5s + timeout: 3s + retries: 5 networks: - greptimedb diff --git a/flake.nix b/flake.nix index 4daa373959..a6d9fbc0df 100644 --- a/flake.nix +++ b/flake.nix @@ -18,7 +18,11 @@ libgit2 libz ]; - + lib = nixpkgs.lib; + rustToolchain = fenix.packages.${system}.fromToolchainName { + name = (lib.importTOML ./rust-toolchain.toml).toolchain.channel; + sha256 = "sha256-f/CVA1EC61EWbh0SjaRNhLL0Ypx2ObupbzigZp8NmL4="; + }; in { devShells.default = pkgs.mkShell { @@ -30,14 +34,20 @@ protobuf gnumake mold - (fenix.packages.${system}.fromToolchainFile { - dir = ./.; - sha256 = "sha256-f/CVA1EC61EWbh0SjaRNhLL0Ypx2ObupbzigZp8NmL4="; - }) + (rustToolchain.withComponents [ + "cargo" + "clippy" + "rust-src" + "rustc" + "rustfmt" + "rust-analyzer" + "llvm-tools" + ]) 
cargo-nextest cargo-llvm-cov taplo curl + gnuplot ## for cargo bench ]; LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath buildInputs; diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 388d16249a..eb2546003b 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,2 @@ [toolchain] channel = "nightly-2024-12-25" -components = ["rust-analyzer", "llvm-tools"] diff --git a/src/cli/src/database.rs b/src/cli/src/database.rs index 7152aac592..24c4514fbc 100644 --- a/src/cli/src/database.rs +++ b/src/cli/src/database.rs @@ -17,6 +17,7 @@ use std::time::Duration; use base64::engine::general_purpose; use base64::Engine; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_error::ext::BoxedError; use humantime::format_duration; use serde_json::Value; use servers::http::header::constants::GREPTIME_DB_HEADER_TIMEOUT; @@ -24,7 +25,9 @@ use servers::http::result::greptime_result_v1::GreptimedbV1Response; use servers::http::GreptimeQueryOutput; use snafu::ResultExt; -use crate::error::{HttpQuerySqlSnafu, Result, SerdeJsonSnafu}; +use crate::error::{ + BuildClientSnafu, HttpQuerySqlSnafu, ParseProxyOptsSnafu, Result, SerdeJsonSnafu, +}; #[derive(Debug, Clone)] pub struct DatabaseClient { @@ -32,6 +35,23 @@ pub struct DatabaseClient { catalog: String, auth_header: Option, timeout: Duration, + proxy: Option, +} + +pub fn parse_proxy_opts( + proxy: Option, + no_proxy: bool, +) -> std::result::Result, BoxedError> { + if no_proxy { + return Ok(None); + } + proxy + .map(|proxy| { + reqwest::Proxy::all(proxy) + .context(ParseProxyOptsSnafu) + .map_err(BoxedError::new) + }) + .transpose() } impl DatabaseClient { @@ -40,6 +60,7 @@ impl DatabaseClient { catalog: String, auth_basic: Option, timeout: Duration, + proxy: Option, ) -> Self { let auth_header = if let Some(basic) = auth_basic { let encoded = general_purpose::STANDARD.encode(basic); @@ -48,11 +69,18 @@ impl DatabaseClient { None }; + if let Some(ref proxy) = proxy { + common_telemetry::info!("Using proxy: {:?}", proxy); + } else { + common_telemetry::info!("Using system proxy(if any)"); + } + Self { addr, catalog, auth_header, timeout, + proxy, } } @@ -67,7 +95,13 @@ impl DatabaseClient { ("db", format!("{}-{}", self.catalog, schema)), ("sql", sql.to_string()), ]; - let mut request = reqwest::Client::new() + let client = self + .proxy + .clone() + .map(|proxy| reqwest::Client::builder().proxy(proxy).build()) + .unwrap_or_else(|| Ok(reqwest::Client::new())) + .context(BuildClientSnafu)?; + let mut request = client .post(&url) .form(¶ms) .header("Content-Type", "application/x-www-form-urlencoded"); diff --git a/src/cli/src/error.rs b/src/cli/src/error.rs index bf0b6342c1..1b79ee759b 100644 --- a/src/cli/src/error.rs +++ b/src/cli/src/error.rs @@ -86,6 +86,22 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to parse proxy options: {}", error))] + ParseProxyOpts { + #[snafu(source)] + error: reqwest::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to build reqwest client: {}", error))] + BuildClient { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: reqwest::Error, + }, + #[snafu(display("Invalid REPL command: {reason}"))] InvalidReplCommand { reason: String }, @@ -278,7 +294,8 @@ impl ErrorExt for Error { | Error::InitTimezone { .. } | Error::ConnectEtcd { .. } | Error::CreateDir { .. } - | Error::EmptyResult { .. } => StatusCode::InvalidArguments, + | Error::EmptyResult { .. } + | Error::ParseProxyOpts { .. 
} => StatusCode::InvalidArguments, Error::StartProcedureManager { source, .. } | Error::StopProcedureManager { source, .. } => source.status_code(), @@ -298,7 +315,8 @@ impl ErrorExt for Error { Error::SerdeJson { .. } | Error::FileIo { .. } | Error::SpawnThread { .. } - | Error::InitTlsProvider { .. } => StatusCode::Unexpected, + | Error::InitTlsProvider { .. } + | Error::BuildClient { .. } => StatusCode::Unexpected, Error::Other { source, .. } => source.status_code(), diff --git a/src/cli/src/export.rs b/src/cli/src/export.rs index 91e4be22bb..846e2a49ad 100644 --- a/src/cli/src/export.rs +++ b/src/cli/src/export.rs @@ -28,7 +28,7 @@ use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::sync::Semaphore; use tokio::time::Instant; -use crate::database::DatabaseClient; +use crate::database::{parse_proxy_opts, DatabaseClient}; use crate::error::{EmptyResultSnafu, Error, FileIoSnafu, Result, SchemaNotFoundSnafu}; use crate::{database, Tool}; @@ -91,19 +91,30 @@ pub struct ExportCommand { /// The default behavior will disable server-side default timeout(i.e. `0s`). #[clap(long, value_parser = humantime::parse_duration)] timeout: Option, + + /// The proxy server address to connect, if set, will override the system proxy. + /// + /// The default behavior will use the system proxy if neither `proxy` nor `no_proxy` is set. + #[clap(long)] + proxy: Option, + + /// Disable proxy server, if set, will not use any proxy. + #[clap(long)] + no_proxy: bool, } impl ExportCommand { pub async fn build(&self) -> std::result::Result, BoxedError> { let (catalog, schema) = database::split_database(&self.database).map_err(BoxedError::new)?; - + let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?; let database_client = DatabaseClient::new( self.addr.clone(), catalog.clone(), self.auth_basic.clone(), // Treats `None` as `0s` to disable server-side default timeout. self.timeout.unwrap_or_default(), + proxy, ); Ok(Box::new(Export { diff --git a/src/cli/src/import.rs b/src/cli/src/import.rs index f76560fbcd..7cff2fd37f 100644 --- a/src/cli/src/import.rs +++ b/src/cli/src/import.rs @@ -25,7 +25,7 @@ use snafu::{OptionExt, ResultExt}; use tokio::sync::Semaphore; use tokio::time::Instant; -use crate::database::DatabaseClient; +use crate::database::{parse_proxy_opts, DatabaseClient}; use crate::error::{Error, FileIoSnafu, Result, SchemaNotFoundSnafu}; use crate::{database, Tool}; @@ -76,18 +76,30 @@ pub struct ImportCommand { /// The default behavior will disable server-side default timeout(i.e. `0s`). #[clap(long, value_parser = humantime::parse_duration)] timeout: Option, + + /// The proxy server address to connect, if set, will override the system proxy. + /// + /// The default behavior will use the system proxy if neither `proxy` nor `no_proxy` is set. + #[clap(long)] + proxy: Option, + + /// Disable proxy server, if set, will not use any proxy. + #[clap(long, default_value = "false")] + no_proxy: bool, } impl ImportCommand { pub async fn build(&self) -> std::result::Result, BoxedError> { let (catalog, schema) = database::split_database(&self.database).map_err(BoxedError::new)?; + let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?; let database_client = DatabaseClient::new( self.addr.clone(), catalog.clone(), self.auth_basic.clone(), // Treats `None` as `0s` to disable server-side default timeout. 
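(Aside on the `--proxy`/`--no-proxy` options introduced above: the following is a minimal, self-contained restatement of `parse_proxy_opts` and the client construction it feeds, using only `reqwest` calls that already appear in this diff; the proxy URL is a placeholder.)

```rust
use reqwest::{Client, Proxy};

/// Mirrors `parse_proxy_opts`: `no_proxy` wins, an explicit proxy URL is
/// validated eagerly, and `None` leaves proxy selection to reqwest
/// (i.e. the system proxy, if any).
fn parse_proxy(proxy: Option<String>, no_proxy: bool) -> Result<Option<Proxy>, reqwest::Error> {
    if no_proxy {
        return Ok(None);
    }
    proxy.map(Proxy::all).transpose()
}

fn main() -> Result<(), reqwest::Error> {
    let proxy = parse_proxy(Some("http://127.0.0.1:8080".to_string()), false)?;
    // Same construction as in the database client above: only build a custom
    // client when an explicit proxy was configured.
    let _client = match proxy {
        Some(p) => Client::builder().proxy(p).build()?,
        None => Client::new(),
    };
    Ok(())
}
```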
self.timeout.unwrap_or_default(), + proxy, ); Ok(Box::new(Import { diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index 78ef6848f4..f913964694 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -34,6 +34,7 @@ use metric_engine::config::EngineConfig as MetricEngineConfig; use mito2::config::MitoConfig; use servers::export_metrics::ExportMetricsOption; use servers::grpc::GrpcOptions; +use servers::http::HttpOptions; #[allow(deprecated)] #[test] @@ -144,6 +145,10 @@ fn test_load_frontend_example_config() { ..Default::default() }, grpc: GrpcOptions::default().with_hostname("127.0.0.1:4001"), + http: HttpOptions { + cors_allowed_origins: vec!["https://example.com".to_string()], + ..Default::default() + }, ..Default::default() }, ..Default::default() @@ -234,6 +239,10 @@ fn test_load_standalone_example_config() { remote_write: Some(Default::default()), ..Default::default() }, + http: HttpOptions { + cors_allowed_origins: vec!["https://example.com".to_string()], + ..Default::default() + }, ..Default::default() }, ..Default::default() diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs index 6212276dd0..7b059e0d07 100644 --- a/src/datatypes/src/vectors/primitive.rs +++ b/src/datatypes/src/vectors/primitive.rs @@ -80,10 +80,12 @@ impl PrimitiveVector { } } - pub fn from_vec(array: Vec) -> Self { - Self { - array: PrimitiveArray::from_iter_values(array), - } + pub fn from_vec(vector: Vec) -> Self { + let mutable_buffer = arrow::buffer::MutableBuffer::from(vector); + let mut primitive_builder = + PrimitiveBuilder::::new_from_buffer(mutable_buffer, None); + let array = primitive_builder.finish(); + Self { array } } pub fn from_iter_values>(iter: I) -> Self { diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs index aca8ad4652..a7d552e5a5 100644 --- a/src/mito2/benches/memtable_bench.rs +++ b/src/mito2/benches/memtable_bench.rs @@ -276,6 +276,7 @@ impl CpuDataGenerator { rows, }), write_hint: None, + bulk: Vec::new(), }; KeyValues::new(&self.metadata, mutation).unwrap() diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 16d1480a61..51dd7a962a 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -17,10 +17,12 @@ use std::sync::Arc; use object_store::services::Fs; use object_store::util::{join_dir, with_instrument_layers}; use object_store::ObjectStore; +use smallvec::SmallVec; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; -use store_api::storage::SequenceNumber; +use store_api::storage::{RegionId, SequenceNumber}; +use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::cache::write_cache::SstUploadRequest; use crate::cache::CacheManagerRef; use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig}; @@ -30,13 +32,15 @@ use crate::region::options::IndexOptions; use crate::sst::file::{FileHandle, FileId, FileMeta}; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; -use crate::sst::index::IndexerBuilder; +use crate::sst::index::IndexerBuilderImpl; use crate::sst::location; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::sst::parquet::writer::ParquetWriter; use crate::sst::parquet::{SstInfo, WriteOptions}; pub type AccessLayerRef = Arc; +/// SST write results. 
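(Aside on the `PrimitiveVector::from_vec` change above: the new implementation hands the whole `Vec` to Arrow as a mutable buffer instead of pushing values one by one through `from_iter_values`. A standalone sketch of the same pattern against the `arrow` crate, with `Int64Type` chosen arbitrarily for illustration:)

```rust
use arrow::array::{Int64Array, PrimitiveBuilder};
use arrow::buffer::MutableBuffer;
use arrow::datatypes::Int64Type;

fn main() {
    let values = vec![1i64, 2, 3];

    // Hand the Vec to Arrow as a buffer rather than iterating over the values.
    let buffer = MutableBuffer::from(values.clone());
    let mut builder = PrimitiveBuilder::<Int64Type>::new_from_buffer(buffer, None);
    let array = builder.finish();

    assert_eq!(array, Int64Array::from(values));
}
```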
+pub type SstInfoArray = SmallVec<[SstInfo; 2]>; /// A layer to access SST files under the same directory. pub struct AccessLayer { @@ -121,11 +125,8 @@ impl AccessLayer { &self, request: SstWriteRequest, write_opts: &WriteOptions, - ) -> Result> { - let file_path = location::sst_file_path(&self.region_dir, request.file_id); - let index_file_path = location::index_file_path(&self.region_dir, request.file_id); + ) -> Result { let region_id = request.metadata.region_id; - let file_id = request.file_id; let cache_manager = request.cache_manager.clone(); let sst_info = if let Some(write_cache) = cache_manager.write_cache() { @@ -134,8 +135,9 @@ impl AccessLayer { .write_and_upload_sst( request, SstUploadRequest { - upload_path: file_path, - index_upload_path: index_file_path, + dest_path_provider: RegionFilePathFactory { + region_dir: self.region_dir.clone(), + }, remote_store: self.object_store.clone(), }, write_opts, @@ -144,11 +146,9 @@ impl AccessLayer { } else { // Write cache is disabled. let store = self.object_store.clone(); - let indexer = IndexerBuilder { + let indexer_builder = IndexerBuilderImpl { op_type: request.op_type, - file_id, - file_path: index_file_path, - metadata: &request.metadata, + metadata: request.metadata.clone(), row_group_size: write_opts.row_group_size, puffin_manager: self.puffin_manager_factory.build(store), intermediate_manager: self.intermediate_manager.clone(), @@ -156,24 +156,31 @@ impl AccessLayer { inverted_index_config: request.inverted_index_config, fulltext_index_config: request.fulltext_index_config, bloom_filter_index_config: request.bloom_filter_index_config, - } - .build() - .await; + }; let mut writer = ParquetWriter::new_with_object_store( self.object_store.clone(), - file_path, request.metadata, - indexer, - ); + indexer_builder, + RegionFilePathFactory { + region_dir: self.region_dir.clone(), + }, + ) + .await; writer .write_all(request.source, request.max_sequence, write_opts) .await? }; // Put parquet metadata to cache manager. - if let Some(sst_info) = &sst_info { - if let Some(parquet_metadata) = &sst_info.file_metadata { - cache_manager.put_parquet_meta_data(region_id, file_id, parquet_metadata.clone()) + if !sst_info.is_empty() { + for sst in &sst_info { + if let Some(parquet_metadata) = &sst.file_metadata { + cache_manager.put_parquet_meta_data( + region_id, + sst.file_id, + parquet_metadata.clone(), + ) + } } } @@ -191,7 +198,6 @@ pub(crate) enum OperationType { /// Contents to build a SST. pub(crate) struct SstWriteRequest { pub(crate) op_type: OperationType, - pub(crate) file_id: FileId, pub(crate) metadata: RegionMetadataRef, pub(crate) source: Source, pub(crate) cache_manager: CacheManagerRef, @@ -229,3 +235,47 @@ async fn clean_dir(dir: &str) -> Result<()> { Ok(()) } + +/// Path provider for SST file and index file. +pub trait FilePathProvider: Send + Sync { + /// Creates index file path of given file id. + fn build_index_file_path(&self, file_id: FileId) -> String; + + /// Creates SST file path of given file id. + fn build_sst_file_path(&self, file_id: FileId) -> String; +} + +/// Path provider that builds paths in local write cache. 
+#[derive(Clone)] +pub(crate) struct WriteCachePathProvider { + pub(crate) region_id: RegionId, + pub(crate) file_cache: FileCacheRef, +} + +impl FilePathProvider for WriteCachePathProvider { + fn build_index_file_path(&self, file_id: FileId) -> String { + let puffin_key = IndexKey::new(self.region_id, file_id, FileType::Puffin); + self.file_cache.cache_file_path(puffin_key) + } + + fn build_sst_file_path(&self, file_id: FileId) -> String { + let parquet_file_key = IndexKey::new(self.region_id, file_id, FileType::Parquet); + self.file_cache.cache_file_path(parquet_file_key) + } +} + +/// Path provider that builds paths in region storage path. +#[derive(Clone, Debug)] +pub(crate) struct RegionFilePathFactory { + pub(crate) region_dir: String, +} + +impl FilePathProvider for RegionFilePathFactory { + fn build_index_file_path(&self, file_id: FileId) -> String { + location::index_file_path(&self.region_dir, file_id) + } + + fn build_sst_file_path(&self, file_id: FileId) -> String { + location::sst_file_path(&self.region_dir, file_id) + } +} diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 1e9dfb5400..0ae00b3c6c 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -23,7 +23,10 @@ use futures::AsyncWriteExt; use object_store::ObjectStore; use snafu::ResultExt; -use crate::access_layer::{new_fs_cache_store, SstWriteRequest}; +use crate::access_layer::{ + new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest, + WriteCachePathProvider, +}; use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue}; use crate::error::{self, Result}; use crate::metrics::{ @@ -32,9 +35,9 @@ use crate::metrics::{ }; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; -use crate::sst::index::IndexerBuilder; +use crate::sst::index::IndexerBuilderImpl; use crate::sst::parquet::writer::ParquetWriter; -use crate::sst::parquet::{SstInfo, WriteOptions}; +use crate::sst::parquet::WriteOptions; use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY}; /// A cache for uploading files to remote object stores. @@ -103,22 +106,21 @@ impl WriteCache { write_request: SstWriteRequest, upload_request: SstUploadRequest, write_opts: &WriteOptions, - ) -> Result> { + ) -> Result { let timer = FLUSH_ELAPSED .with_label_values(&["write_sst"]) .start_timer(); let region_id = write_request.metadata.region_id; - let file_id = write_request.file_id; - let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet); - let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin); let store = self.file_cache.local_store(); - let indexer = IndexerBuilder { + let path_provider = WriteCachePathProvider { + file_cache: self.file_cache.clone(), + region_id, + }; + let indexer = IndexerBuilderImpl { op_type: write_request.op_type, - file_id, - file_path: self.file_cache.cache_file_path(puffin_key), - metadata: &write_request.metadata, + metadata: write_request.metadata.clone(), row_group_size: write_opts.row_group_size, puffin_manager: self.puffin_manager_factory.build(store), intermediate_manager: self.intermediate_manager.clone(), @@ -126,17 +128,16 @@ impl WriteCache { inverted_index_config: write_request.inverted_index_config, fulltext_index_config: write_request.fulltext_index_config, bloom_filter_index_config: write_request.bloom_filter_index_config, - } - .build() - .await; + }; // Write to FileCache. 
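(The `FilePathProvider` trait added in `access_layer.rs` above lets the same writing code resolve SST and index paths either under the region directory or inside the local write cache. A self-contained sketch of the idea follows; the trait and provider are re-declared here, `FileId` is simplified to a string, and the path layout is illustrative.)

```rust
/// Sketch of the path-provider abstraction; not the real mito2 types.
trait FilePathProvider: Send + Sync {
    fn build_sst_file_path(&self, file_id: &str) -> String;
    fn build_index_file_path(&self, file_id: &str) -> String;
}

/// Analogue of `RegionFilePathFactory`: paths under the region storage directory.
struct RegionPaths {
    region_dir: String,
}

impl FilePathProvider for RegionPaths {
    fn build_sst_file_path(&self, file_id: &str) -> String {
        format!("{}/{}.parquet", self.region_dir, file_id)
    }

    fn build_index_file_path(&self, file_id: &str) -> String {
        format!("{}/index/{}.puffin", self.region_dir, file_id)
    }
}

fn main() {
    let provider = RegionPaths {
        region_dir: "data/region-42".to_string(),
    };
    // The writer asks the provider for paths whenever it starts a new file id,
    // so the same code serves both the write cache and the remote store.
    println!("{}", provider.build_sst_file_path("0000-0001"));
    println!("{}", provider.build_index_file_path("0000-0001"));
}
```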
let mut writer = ParquetWriter::new_with_object_store( self.file_cache.local_store(), - self.file_cache.cache_file_path(parquet_key), write_request.metadata, indexer, - ); + path_provider, + ) + .await; let sst_info = writer .write_all(write_request.source, write_request.max_sequence, write_opts) @@ -145,22 +146,29 @@ impl WriteCache { timer.stop_and_record(); // Upload sst file to remote object store. - let Some(sst_info) = sst_info else { - // No data need to upload. - return Ok(None); - }; - - let parquet_path = &upload_request.upload_path; - let remote_store = &upload_request.remote_store; - self.upload(parquet_key, parquet_path, remote_store).await?; - - if sst_info.index_metadata.file_size > 0 { - let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin); - let puffin_path = &upload_request.index_upload_path; - self.upload(puffin_key, puffin_path, remote_store).await?; + if sst_info.is_empty() { + return Ok(sst_info); } - Ok(Some(sst_info)) + let remote_store = &upload_request.remote_store; + for sst in &sst_info { + let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet); + let parquet_path = upload_request + .dest_path_provider + .build_sst_file_path(sst.file_id); + self.upload(parquet_key, &parquet_path, remote_store) + .await?; + + if sst.index_metadata.file_size > 0 { + let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin); + let puffin_path = &upload_request + .dest_path_provider + .build_index_file_path(sst.file_id); + self.upload(puffin_key, puffin_path, remote_store).await?; + } + } + + Ok(sst_info) } /// Removes a file from the cache by `index_key`. @@ -319,10 +327,8 @@ impl WriteCache { /// Request to write and upload a SST. pub struct SstUploadRequest { - /// Path to upload the file. - pub upload_path: String, - /// Path to upload the index file. - pub index_upload_path: String, + /// Destination path provider of which SST files in write cache should be uploaded to. + pub dest_path_provider: RegionFilePathFactory, /// Remote object store to upload. pub remote_store: ObjectStore, } @@ -336,11 +342,9 @@ mod tests { use crate::cache::test_util::new_fs_store; use crate::cache::{CacheManager, CacheStrategy}; use crate::region::options::IndexOptions; - use crate::sst::file::FileId; - use crate::sst::location::{index_file_path, sst_file_path}; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::test_util::sst_util::{ - assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle, + assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id, sst_region_metadata, }; use crate::test_util::TestEnv; @@ -351,9 +355,9 @@ mod tests { // and now just use local file system to mock. 
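(With `write_sst` and `write_and_upload_sst` now returning an `SstInfoArray` — a `SmallVec` holding zero, one, or several SSTs — callers iterate instead of matching an `Option`. A reduced sketch of that calling convention, with a made-up `SstInfo` shape:)

```rust
use smallvec::{smallvec, SmallVec};

/// Illustrative stand-in for the real SstInfo; only the fields used below.
struct SstInfo {
    file_id: u64,
    num_rows: usize,
}

type SstInfoArray = SmallVec<[SstInfo; 2]>;

fn write_sst(rows: usize) -> SstInfoArray {
    if rows == 0 {
        // Nothing written: an empty array replaces the old `None`.
        return smallvec![];
    }
    smallvec![SstInfo { file_id: 1, num_rows: rows }]
}

fn main() {
    let ssts = write_sst(200);
    // Callers now loop over every produced SST (register its FileMeta, upload
    // its parquet and puffin files, cache its metadata) rather than unwrapping
    // a single optional value.
    for sst in &ssts {
        println!("sst {} with {} rows", sst.file_id, sst.num_rows);
    }
}
```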
let mut env = TestEnv::new(); let mock_store = env.init_object_store_manager(); - let file_id = FileId::random(); - let upload_path = sst_file_path("test", file_id); - let index_upload_path = index_file_path("test", file_id); + let path_provider = RegionFilePathFactory { + region_dir: "test".to_string(), + }; let local_dir = create_temp_dir(""); let local_store = new_fs_store(local_dir.path().to_str().unwrap()); @@ -373,7 +377,6 @@ mod tests { let write_request = SstWriteRequest { op_type: OperationType::Flush, - file_id, metadata, source, storage: None, @@ -386,8 +389,7 @@ mod tests { }; let upload_request = SstUploadRequest { - upload_path: upload_path.clone(), - index_upload_path: index_upload_path.clone(), + dest_path_provider: path_provider.clone(), remote_store: mock_store.clone(), }; @@ -397,18 +399,22 @@ mod tests { }; // Write to cache and upload sst to mock remote store - write_cache + let sst_info = write_cache .write_and_upload_sst(write_request, upload_request, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); //todo(hl): we assume it only creates one file. + + let file_id = sst_info.file_id; + let sst_upload_path = path_provider.build_sst_file_path(file_id); + let index_upload_path = path_provider.build_index_file_path(file_id); // Check write cache contains the key let key = IndexKey::new(region_id, file_id, FileType::Parquet); assert!(write_cache.file_cache.contains_key(&key)); // Check file data - let remote_data = mock_store.read(&upload_path).await.unwrap(); + let remote_data = mock_store.read(&sst_upload_path).await.unwrap(); let cache_data = local_store .read(&write_cache.file_cache.cache_file_path(key)) .await @@ -436,6 +442,7 @@ mod tests { #[tokio::test] async fn test_read_metadata_from_write_cache() { + common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new(); let data_home = env.data_home().display().to_string(); let mock_store = env.init_object_store_manager(); @@ -456,8 +463,7 @@ mod tests { // Create source let metadata = Arc::new(sst_region_metadata()); - let handle = sst_file_handle(0, 1000); - let file_id = handle.file_id(); + let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), new_batch_by_range(&["b", "f"], 0, 40), @@ -467,7 +473,6 @@ mod tests { // Write to local cache and upload sst to mock remote store let write_request = SstWriteRequest { op_type: OperationType::Flush, - file_id, metadata, source, storage: None, @@ -482,11 +487,10 @@ mod tests { row_group_size: 512, ..Default::default() }; - let upload_path = sst_file_path(&data_home, file_id); - let index_upload_path = index_file_path(&data_home, file_id); let upload_request = SstUploadRequest { - upload_path: upload_path.clone(), - index_upload_path: index_upload_path.clone(), + dest_path_provider: RegionFilePathFactory { + region_dir: data_home.clone(), + }, remote_store: mock_store.clone(), }; @@ -494,10 +498,11 @@ mod tests { .write_and_upload_sst(write_request, upload_request, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); let write_parquet_metadata = sst_info.file_metadata.unwrap(); // Read metadata from write cache + let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000); let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone()) .cache(CacheStrategy::EnableAll(cache_manager.clone())); let reader = builder.build().await.unwrap(); diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 14edc748eb..057d9ca720 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ 
-68,7 +68,7 @@ use crate::schedule::remote_job_scheduler::{ CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef, }; use crate::schedule::scheduler::SchedulerRef; -use crate::sst::file::{FileHandle, FileId, FileMeta, Level}; +use crate::sst::file::{FileHandle, FileMeta, Level}; use crate::sst::version::LevelMeta; use crate::worker::WorkerListener; @@ -596,7 +596,6 @@ impl CompactionStatus { #[derive(Debug, Clone)] pub struct CompactionOutput { - pub output_file_id: FileId, /// Compaction output file level. pub output_level: Level, /// Compaction input files. @@ -610,7 +609,6 @@ pub struct CompactionOutput { /// SerializedCompactionOutput is a serialized version of [CompactionOutput] by replacing [FileHandle] with [FileMeta]. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SerializedCompactionOutput { - output_file_id: FileId, output_level: Level, inputs: Vec, filter_deleted: bool, diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index ae3aeea45b..6bda8c578f 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -20,6 +20,7 @@ use api::v1::region::compact_request; use common_meta::key::SchemaMetadataManagerRef; use common_telemetry::{info, warn}; use common_time::TimeToLive; +use itertools::Itertools; use object_store::manager::ObjectStoreManagerRef; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -278,19 +279,6 @@ impl Compactor for DefaultCompactor { for output in picker_output.outputs.drain(..) { compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone())); - - info!( - "Compaction region {} output [{}]-> {}", - compaction_region.region_id, - output - .inputs - .iter() - .map(|f| f.file_id().to_string()) - .collect::>() - .join(","), - output.output_file_id - ); - let write_opts = WriteOptions { write_buffer_size: compaction_region.engine_config.sst_write_buffer_size, ..Default::default() @@ -299,7 +287,6 @@ impl Compactor for DefaultCompactor { let region_metadata = compaction_region.region_metadata.clone(); let sst_layer = compaction_region.access_layer.clone(); let region_id = compaction_region.region_id; - let file_id = output.output_file_id; let cache_manager = compaction_region.cache_manager.clone(); let storage = compaction_region.region_options.storage.clone(); let index_options = compaction_region @@ -320,6 +307,11 @@ impl Compactor for DefaultCompactor { .max() .flatten(); futs.push(async move { + let input_file_names = output + .inputs + .iter() + .map(|f| f.file_id().to_string()) + .join(","); let reader = CompactionSstReaderBuilder { metadata: region_metadata.clone(), sst_layer: sst_layer.clone(), @@ -332,11 +324,10 @@ impl Compactor for DefaultCompactor { } .build_sst_reader() .await?; - let file_meta_opt = sst_layer + let output_files = sst_layer .write_sst( SstWriteRequest { op_type: OperationType::Compact, - file_id, metadata: region_metadata, source: Source::Reader(reader), cache_manager, @@ -350,9 +341,10 @@ impl Compactor for DefaultCompactor { &write_opts, ) .await? 
+ .into_iter() .map(|sst_info| FileMeta { region_id, - file_id, + file_id: sst_info.file_id, time_range: sst_info.time_range, level: output.output_level, file_size: sst_info.file_size, @@ -361,8 +353,15 @@ impl Compactor for DefaultCompactor { num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, sequence: max_sequence, - }); - Ok(file_meta_opt) + }) + .collect::>(); + let output_file_names = + output_files.iter().map(|f| f.file_id.to_string()).join(","); + info!( + "Region {} compaction inputs: [{}], outputs: [{}]", + region_id, input_file_names, output_file_names + ); + Ok(output_files) }); } let mut output_files = Vec::with_capacity(futs.len()); @@ -377,7 +376,7 @@ impl Compactor for DefaultCompactor { .await .context(JoinSnafu)? .into_iter() - .collect::>>()?; + .collect::>>>()?; output_files.extend(metas.into_iter().flatten()); } diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs index 9397c2bf64..431973c3b6 100644 --- a/src/mito2/src/compaction/picker.rs +++ b/src/mito2/src/compaction/picker.rs @@ -61,7 +61,6 @@ impl From<&PickerOutput> for SerializedPickerOutput { .outputs .iter() .map(|output| SerializedCompactionOutput { - output_file_id: output.output_file_id, output_level: output.output_level, inputs: output.inputs.iter().map(|s| s.meta_ref().clone()).collect(), filter_deleted: output.filter_deleted, @@ -91,7 +90,6 @@ impl PickerOutput { .outputs .into_iter() .map(|output| CompactionOutput { - output_file_id: output.output_file_id, output_level: output.output_level, inputs: output .inputs @@ -167,14 +165,12 @@ mod tests { let picker_output = PickerOutput { outputs: vec![ CompactionOutput { - output_file_id: FileId::random(), output_level: 0, inputs: inputs_file_handle.clone(), filter_deleted: false, output_time_range: None, }, CompactionOutput { - output_file_id: FileId::random(), output_level: 0, inputs: inputs_file_handle.clone(), filter_deleted: false, @@ -205,7 +201,6 @@ mod tests { .iter() .zip(picker_output_from_serialized.outputs.iter()) .for_each(|(expected, actual)| { - assert_eq!(expected.output_file_id, actual.output_file_id); assert_eq!(expected.output_level, actual.output_level); expected .inputs diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 8efaa6c65f..a4e8913eef 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -26,7 +26,7 @@ use crate::compaction::compactor::CompactionRegion; use crate::compaction::picker::{Picker, PickerOutput}; use crate::compaction::run::{find_sorted_runs, reduce_runs, Item}; use crate::compaction::{get_expired_ssts, CompactionOutput}; -use crate::sst::file::{overlaps, FileHandle, FileId, Level}; +use crate::sst::file::{overlaps, FileHandle, Level}; use crate::sst::version::LevelMeta; const LEVEL_COMPACTED: Level = 1; @@ -134,7 +134,6 @@ impl TwcsPicker { for input in split_inputs { debug_assert!(input.len() > 1); output.push(CompactionOutput { - output_file_id: FileId::random(), output_level: LEVEL_COMPACTED, // always compact to l1 inputs: input, filter_deleted, @@ -373,7 +372,7 @@ mod tests { use super::*; use crate::compaction::test_util::{new_file_handle, new_file_handles}; - use crate::sst::file::{FileMeta, Level}; + use crate::sst::file::{FileId, FileMeta, Level}; use crate::test_util::NoopFilePurger; #[test] diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs index 10bdb47297..f7ad4af893 100644 --- a/src/mito2/src/compaction/window.rs +++ 
b/src/mito2/src/compaction/window.rs @@ -26,7 +26,7 @@ use crate::compaction::buckets::infer_time_bucket; use crate::compaction::compactor::{CompactionRegion, CompactionVersion}; use crate::compaction::picker::{Picker, PickerOutput}; use crate::compaction::{get_expired_ssts, CompactionOutput}; -use crate::sst::file::{FileHandle, FileId}; +use crate::sst::file::FileHandle; /// Compaction picker that splits the time range of all involved files to windows, and merges /// the data segments intersects with those windows of files together so that the output files @@ -132,7 +132,6 @@ fn build_output(windows: BTreeMap)>) -> Vec, - sequence: UInt64VectorBuilder, - op_type: UInt8VectorBuilder, + timestamp: Vec, + timestamp_type: ConcreteDataType, + sequence: Vec, + op_type: Vec, fields: Vec>>, field_types: Vec, } impl ValueBuilder { fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self { - let timestamp = region_metadata + let timestamp_type = region_metadata .time_index_column() .column_schema .data_type - .create_mutable_vector(capacity); - let sequence = UInt64VectorBuilder::with_capacity(capacity); - let op_type = UInt8VectorBuilder::with_capacity(capacity); + .clone(); + let sequence = Vec::with_capacity(capacity); + let op_type = Vec::with_capacity(capacity); let field_types = region_metadata .field_columns() @@ -715,7 +718,8 @@ impl ValueBuilder { let fields = (0..field_types.len()).map(|_| None).collect(); Self { - timestamp, + timestamp: Vec::with_capacity(capacity), + timestamp_type, sequence, op_type, fields, @@ -727,9 +731,10 @@ impl ValueBuilder { /// We don't need primary keys since they've already be encoded. fn push(&mut self, ts: ValueRef, sequence: u64, op_type: u8, fields: Vec) { debug_assert_eq!(fields.len(), self.fields.len()); - self.timestamp.push_value_ref(ts); - self.sequence.push_value_ref(ValueRef::UInt64(sequence)); - self.op_type.push_value_ref(ValueRef::UInt8(op_type)); + self.timestamp + .push(ts.as_timestamp().unwrap().unwrap().value()); + self.sequence.push(sequence); + self.op_type.push(op_type); let num_rows = self.timestamp.len(); for (idx, field_value) in fields.into_iter().enumerate() { if !field_value.is_null() || self.fields[idx].is_some() { @@ -844,9 +849,23 @@ impl From for Values { } }) .collect::>(); - let sequence = Arc::new(value.sequence.finish()); - let op_type = Arc::new(value.op_type.finish()); - let timestamp = value.timestamp.to_vector(); + let sequence = Arc::new(UInt64Vector::from_vec(value.sequence)); + let op_type = Arc::new(UInt8Vector::from_vec(value.op_type)); + let timestamp: VectorRef = match value.timestamp_type { + ConcreteDataType::Timestamp(TimestampType::Second(_)) => { + Arc::new(TimestampSecondVector::from_vec(value.timestamp)) + } + ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => { + Arc::new(TimestampMillisecondVector::from_vec(value.timestamp)) + } + ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => { + Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp)) + } + ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => { + Arc::new(TimestampNanosecondVector::from_vec(value.timestamp)) + } + _ => unreachable!(), + }; if cfg!(debug_assertions) { debug_assert_eq!(timestamp.len(), sequence.len()); @@ -1167,6 +1186,7 @@ mod tests { rows, }), write_hint: None, + bulk: Vec::new(), }; KeyValues::new(schema.as_ref(), mutation).unwrap() } diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 1071a2ffb2..1132808236 100644 --- a/src/mito2/src/region/opener.rs 
+++ b/src/mito2/src/region/opener.rs @@ -544,10 +544,12 @@ where .as_ref() .map(|rows| rows.rows.len()) .unwrap_or(0); + // TODO(yingwen): We need to support schema change as bulk may have different schema. region_write_ctx.push_mutation( mutation.op_type, mutation.rows, mutation.write_hint, + mutation.bulk, OptionOutputTx::none(), ); } diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index 0047822b4d..2aa1e48880 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -136,6 +136,7 @@ impl RegionWriteCtx { op_type: i32, rows: Option, write_hint: Option, + bulk: Vec, tx: OptionOutputTx, ) { let num_rows = rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0); @@ -144,6 +145,7 @@ impl RegionWriteCtx { sequence: self.next_sequence, rows, write_hint, + bulk, }); let notify = WriteNotify::new(tx, num_rows); diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 311b2d0191..da1f6c86a3 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -105,7 +105,6 @@ pub struct Indexer { file_id: FileId, file_path: String, region_id: RegionId, - puffin_manager: Option, inverted_indexer: Option, last_mem_inverted_index: usize, @@ -168,11 +167,15 @@ impl Indexer { } } -pub(crate) struct IndexerBuilder<'a> { +#[async_trait::async_trait] +pub trait IndexerBuilder { + /// Builds indexer of given file id to [index_file_path]. + async fn build(&self, file_id: FileId, index_file_path: String) -> Indexer; +} + +pub(crate) struct IndexerBuilderImpl { pub(crate) op_type: OperationType, - pub(crate) file_id: FileId, - pub(crate) file_path: String, - pub(crate) metadata: &'a RegionMetadataRef, + pub(crate) metadata: RegionMetadataRef, pub(crate) row_group_size: usize, pub(crate) puffin_manager: SstPuffinManager, pub(crate) intermediate_manager: IntermediateManager, @@ -182,20 +185,20 @@ pub(crate) struct IndexerBuilder<'a> { pub(crate) bloom_filter_index_config: BloomFilterConfig, } -impl IndexerBuilder<'_> { +#[async_trait::async_trait] +impl IndexerBuilder for IndexerBuilderImpl { /// Sanity check for arguments and create a new [Indexer] if arguments are valid. 
- pub(crate) async fn build(self) -> Indexer { + async fn build(&self, file_id: FileId, index_file_path: String) -> Indexer { let mut indexer = Indexer { - file_id: self.file_id, - file_path: self.file_path.clone(), + file_id, + file_path: index_file_path, region_id: self.metadata.region_id, - ..Default::default() }; - indexer.inverted_indexer = self.build_inverted_indexer(); - indexer.fulltext_indexer = self.build_fulltext_indexer().await; - indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(); + indexer.inverted_indexer = self.build_inverted_indexer(file_id); + indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await; + indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id); if indexer.inverted_indexer.is_none() && indexer.fulltext_indexer.is_none() && indexer.bloom_filter_indexer.is_none() @@ -204,11 +207,13 @@ impl IndexerBuilder<'_> { return Indexer::default(); } - indexer.puffin_manager = Some(self.puffin_manager); + indexer.puffin_manager = Some(self.puffin_manager.clone()); indexer } +} - fn build_inverted_indexer(&self) -> Option { +impl IndexerBuilderImpl { + fn build_inverted_indexer(&self, file_id: FileId) -> Option { let create = match self.op_type { OperationType::Flush => self.inverted_index_config.create_on_flush.auto(), OperationType::Compact => self.inverted_index_config.create_on_compaction.auto(), @@ -217,7 +222,7 @@ impl IndexerBuilder<'_> { if !create { debug!( "Skip creating inverted index due to config, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; } @@ -228,7 +233,7 @@ impl IndexerBuilder<'_> { if indexed_column_ids.is_empty() { debug!( "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; } @@ -238,7 +243,7 @@ impl IndexerBuilder<'_> { else { warn!( "Segment row count is 0, skip creating index, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; }; @@ -246,7 +251,7 @@ impl IndexerBuilder<'_> { let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else { warn!( "Row group size is 0, skip creating index, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; }; @@ -257,8 +262,8 @@ impl IndexerBuilder<'_> { } let indexer = InvertedIndexer::new( - self.file_id, - self.metadata, + file_id, + &self.metadata, self.intermediate_manager.clone(), self.inverted_index_config.mem_threshold_on_create(), segment_row_count, @@ -268,7 +273,7 @@ impl IndexerBuilder<'_> { Some(indexer) } - async fn build_fulltext_indexer(&self) -> Option { + async fn build_fulltext_indexer(&self, file_id: FileId) -> Option { let create = match self.op_type { OperationType::Flush => self.fulltext_index_config.create_on_flush.auto(), OperationType::Compact => self.fulltext_index_config.create_on_compaction.auto(), @@ -277,7 +282,7 @@ impl IndexerBuilder<'_> { if !create { debug!( "Skip creating full-text index due to config, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; } @@ -285,9 +290,9 @@ impl IndexerBuilder<'_> { let mem_limit = self.fulltext_index_config.mem_threshold_on_create(); let creator = FulltextIndexer::new( &self.metadata.region_id, - &self.file_id, + &file_id, &self.intermediate_manager, - self.metadata, + &self.metadata, 
self.fulltext_index_config.compress, mem_limit, ) @@ -298,7 +303,7 @@ impl IndexerBuilder<'_> { if creator.is_none() { debug!( "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); } return creator; @@ -309,19 +314,19 @@ impl IndexerBuilder<'_> { if cfg!(any(test, feature = "test")) { panic!( "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}", - self.metadata.region_id, self.file_id, err + self.metadata.region_id, file_id, err ); } else { warn!( err; "Failed to create full-text indexer, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); } None } - fn build_bloom_filter_indexer(&self) -> Option { + fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option { let create = match self.op_type { OperationType::Flush => self.bloom_filter_index_config.create_on_flush.auto(), OperationType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(), @@ -330,15 +335,15 @@ impl IndexerBuilder<'_> { if !create { debug!( "Skip creating bloom filter due to config, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; } let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create(); let indexer = BloomFilterIndexer::new( - self.file_id, - self.metadata, + file_id, + &self.metadata, self.intermediate_manager.clone(), mem_limit, ); @@ -348,7 +353,7 @@ impl IndexerBuilder<'_> { if indexer.is_none() { debug!( "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); } return indexer; @@ -359,12 +364,12 @@ impl IndexerBuilder<'_> { if cfg!(any(test, feature = "test")) { panic!( "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}", - self.metadata.region_id, self.file_id, err + self.metadata.region_id, file_id, err ); } else { warn!( err; "Failed to create bloom filter, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); } @@ -490,11 +495,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata, row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager, @@ -503,7 +506,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -522,11 +525,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager.clone(), @@ -538,18 +539,16 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_none()); 
assert!(indexer.fulltext_indexer.is_some()); assert!(indexer.bloom_filter_indexer.is_some()); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Compact, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager.clone(), @@ -561,18 +560,16 @@ mod tests { }, bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); assert!(indexer.fulltext_indexer.is_none()); assert!(indexer.bloom_filter_indexer.is_some()); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Compact, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata, row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager, @@ -584,7 +581,7 @@ mod tests { ..Default::default() }, } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -603,11 +600,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager.clone(), @@ -616,7 +611,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_none()); @@ -628,11 +623,9 @@ mod tests { with_fulltext: false, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager.clone(), @@ -641,7 +634,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -653,11 +646,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: false, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager, @@ -666,7 +657,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -685,11 +676,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata, row_group_size: 0, puffin_manager: 
factory.build(mock_object_store()), intermediate_manager: intm_manager, @@ -698,7 +687,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_none()); diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 05dafb0edf..12d16b7cda 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use common_base::readable_size::ReadableSize; use parquet::file::metadata::ParquetMetaData; -use crate::sst::file::FileTimeRange; +use crate::sst::file::{FileId, FileTimeRange}; use crate::sst::index::IndexOutput; use crate::sst::DEFAULT_WRITE_BUFFER_SIZE; @@ -62,6 +62,8 @@ impl Default for WriteOptions { /// Parquet SST info returned by the writer. pub struct SstInfo { + /// SST file id. + pub file_id: FileId, /// Time range of the SST. The timestamps have the same time unit as the /// data in the SST. pub time_range: FileTimeRange, @@ -95,12 +97,13 @@ mod tests { use tokio_util::compat::FuturesAsyncWriteCompatExt; use super::*; + use crate::access_layer::FilePathProvider; use crate::cache::{CacheManager, CacheStrategy, PageKey}; - use crate::sst::index::Indexer; + use crate::sst::index::{Indexer, IndexerBuilder}; use crate::sst::parquet::format::WriteFormat; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::sst::parquet::writer::ParquetWriter; - use crate::sst::DEFAULT_WRITE_CONCURRENCY; + use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY}; use crate::test_util::sst_util::{ assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range, new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata, @@ -109,12 +112,38 @@ mod tests { const FILE_DIR: &str = "/"; + #[derive(Clone)] + struct FixedPathProvider { + file_id: FileId, + } + + impl FilePathProvider for FixedPathProvider { + fn build_index_file_path(&self, _file_id: FileId) -> String { + location::index_file_path(FILE_DIR, self.file_id) + } + + fn build_sst_file_path(&self, _file_id: FileId) -> String { + location::sst_file_path(FILE_DIR, self.file_id) + } + } + + struct NoopIndexBuilder; + + #[async_trait::async_trait] + impl IndexerBuilder for NoopIndexBuilder { + async fn build(&self, _file_id: FileId, _path: String) -> Indexer { + Indexer::default() + } + } + #[tokio::test] async fn test_write_read() { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); + let file_path = FixedPathProvider { + file_id: handle.file_id(), + }; let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -126,18 +155,20 @@ mod tests { row_group_size: 50, ..Default::default() }; + let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), + metadata.clone(), + NoopIndexBuilder, file_path, - metadata, - Indexer::default(), - ); + ) + .await; let info = writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); assert_eq!(200, info.num_rows); assert!(info.file_size > 0); assert_eq!( @@ -168,7 +199,6 @@ mod tests { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = 
new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -183,16 +213,19 @@ mod tests { // Prepare data. let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); // Enable page cache. let cache = CacheStrategy::EnableAll(Arc::new( @@ -236,7 +269,6 @@ mod tests { let mut env = crate::test_util::TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -252,16 +284,19 @@ mod tests { // sst info contains the parquet metadata, which is converted from FileMetaData let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; let sst_info = writer .write_all(source, None, &write_opts) .await .unwrap() - .expect("write_all should return sst info"); + .remove(0); let writer_metadata = sst_info.file_metadata.unwrap(); // read the sst file metadata @@ -277,7 +312,6 @@ mod tests { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -292,15 +326,18 @@ mod tests { // Prepare data. let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); // Predicate let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr { @@ -330,7 +367,6 @@ mod tests { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "z"], 0, 0), @@ -345,15 +381,18 @@ mod tests { // Prepare data. let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store); let mut reader = builder.build().await.unwrap(); @@ -365,7 +404,6 @@ mod tests { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -380,16 +418,19 @@ mod tests { // Prepare data. 
let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); // Predicate let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr { diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 01baf2df95..8d0fd38e28 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -28,6 +28,7 @@ use parquet::basic::{Compression, Encoding, ZstdLevel}; use parquet::file::metadata::KeyValue; use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; use parquet::schema::types::ColumnPath; +use smallvec::smallvec; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::storage::consts::SEQUENCE_COLUMN_NAME; @@ -35,40 +36,48 @@ use store_api::storage::SequenceNumber; use tokio::io::AsyncWrite; use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt}; +use crate::access_layer::{FilePathProvider, SstInfoArray}; use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu}; use crate::read::{Batch, Source}; -use crate::sst::index::Indexer; +use crate::sst::file::FileId; +use crate::sst::index::{Indexer, IndexerBuilder}; use crate::sst::parquet::format::WriteFormat; use crate::sst::parquet::helper::parse_parquet_metadata; use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY}; use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY}; /// Parquet SST writer. -pub struct ParquetWriter { +pub struct ParquetWriter { + /// Path provider that creates SST and index file paths according to file id. + path_provider: P, writer: Option>>, + /// Current active file id. + current_file: FileId, writer_factory: F, /// Region metadata of the source and the target SST. metadata: RegionMetadataRef, - indexer: Indexer, + /// Indexer build that can create indexer for multiple files. + indexer_builder: I, + /// Current active indexer. + current_indexer: Option, bytes_written: Arc, } pub trait WriterFactory { type Writer: AsyncWrite + Send + Unpin; - fn create(&mut self) -> impl Future>; + fn create(&mut self, file_path: &str) -> impl Future>; } pub struct ObjectStoreWriterFactory { - path: String, object_store: ObjectStore, } impl WriterFactory for ObjectStoreWriterFactory { type Writer = Compat; - async fn create(&mut self) -> Result { + async fn create(&mut self, file_path: &str) -> Result { self.object_store - .writer_with(&self.path) + .writer_with(file_path) .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) .concurrent(DEFAULT_WRITE_CONCURRENCY) .await @@ -77,36 +86,73 @@ impl WriterFactory for ObjectStoreWriterFactory { } } -impl ParquetWriter { - pub fn new_with_object_store( +impl ParquetWriter +where + P: FilePathProvider, + I: IndexerBuilder, +{ + pub async fn new_with_object_store( object_store: ObjectStore, - path: String, metadata: RegionMetadataRef, - indexer: Indexer, - ) -> ParquetWriter { + indexer_builder: I, + path_provider: P, + ) -> ParquetWriter { ParquetWriter::new( - ObjectStoreWriterFactory { path, object_store }, + ObjectStoreWriterFactory { object_store }, metadata, - indexer, + indexer_builder, + path_provider, ) + .await } } -impl ParquetWriter +impl ParquetWriter where F: WriterFactory, + I: IndexerBuilder, + P: FilePathProvider, { /// Creates a new parquet SST writer. 
- pub fn new(factory: F, metadata: RegionMetadataRef, indexer: Indexer) -> ParquetWriter { + pub async fn new( + factory: F, + metadata: RegionMetadataRef, + indexer_builder: I, + path_provider: P, + ) -> ParquetWriter { + let init_file = FileId::random(); + let index_file_path = path_provider.build_index_file_path(init_file); + let indexer = indexer_builder.build(init_file, index_file_path).await; + ParquetWriter { + path_provider, writer: None, + current_file: init_file, writer_factory: factory, metadata, - indexer, + indexer_builder, + current_indexer: Some(indexer), bytes_written: Arc::new(AtomicUsize::new(0)), } } + async fn get_or_create_indexer(&mut self) -> &mut Indexer { + match self.current_indexer { + None => { + self.current_file = FileId::random(); + let index_file_path = self.path_provider.build_index_file_path(self.current_file); + let indexer = self + .indexer_builder + .build(self.current_file, index_file_path) + .await; + self.current_indexer = Some(indexer); + // safety: self.current_indexer already set above. + self.current_indexer.as_mut().unwrap() + } + Some(ref mut indexer) => indexer, + } + } + /// Iterates source and writes all rows to Parquet file. /// /// Returns the [SstInfo] if the SST is written. @@ -115,7 +161,7 @@ where mut source: Source, override_sequence: Option, // override the `sequence` field from `Source` opts: &WriteOptions, - ) -> Result> { + ) -> Result { let write_format = WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence); let mut stats = SourceStats::default(); @@ -128,24 +174,24 @@ where match res { Ok(mut batch) => { stats.update(&batch); - self.indexer.update(&mut batch).await; + self.get_or_create_indexer().await.update(&mut batch).await; } Err(e) => { - self.indexer.abort().await; + self.get_or_create_indexer().await.abort().await; return Err(e); } } } - let index_output = self.indexer.finish().await; + let index_output = self.get_or_create_indexer().await.finish().await; if stats.num_rows == 0 { - return Ok(None); + return Ok(smallvec![]); } let Some(mut arrow_writer) = self.writer.take() else { // No batch actually written. - return Ok(None); + return Ok(smallvec![]); }; arrow_writer.flush().await.context(WriteParquetSnafu)?; @@ -159,15 +205,18 @@ where // convert FileMetaData to ParquetMetaData let parquet_metadata = parse_parquet_metadata(file_meta)?; + let file_id = self.current_file; + // object_store.write will make sure all bytes are written or an error is raised. - Ok(Some(SstInfo { + Ok(smallvec![SstInfo { + file_id, time_range, file_size, num_rows: stats.num_rows, num_row_groups: parquet_metadata.num_row_groups() as u64, file_metadata: Some(Arc::new(parquet_metadata)), index_metadata: index_output, - })) + }]) } /// Customizes per-column config according to schema and maybe column cardinality. 
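Editor's note: the writer refactor above swaps the single pre-computed path/`Indexer` pair for a `FilePathProvider` plus an `IndexerBuilder`, letting the writer allocate a fresh `FileId` (and matching indexer) for every SST it emits and report each one in the returned `SstInfoArray`. Below is a minimal sketch of how a caller might drive the new API; `RegionPathProvider`, `write_ssts`, and `region_dir` are illustrative names that are not part of this patch.

```rust
use object_store::ObjectStore;
use store_api::metadata::RegionMetadataRef;

use crate::access_layer::{FilePathProvider, SstInfoArray};
use crate::error::Result;
use crate::read::Source;
use crate::sst::file::FileId;
use crate::sst::index::IndexerBuilder;
use crate::sst::location;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::WriteOptions;

/// Illustrative provider that derives both SST and index paths from the
/// file id the writer allocates (the test fixture above pins a fixed id instead).
#[derive(Clone)]
struct RegionPathProvider {
    region_dir: String,
}

impl FilePathProvider for RegionPathProvider {
    fn build_index_file_path(&self, file_id: FileId) -> String {
        location::index_file_path(&self.region_dir, file_id)
    }

    fn build_sst_file_path(&self, file_id: FileId) -> String {
        location::sst_file_path(&self.region_dir, file_id)
    }
}

/// Writes `source` and returns one `SstInfo` per emitted file; each entry
/// carries the `file_id` chosen through the path provider, so callers no
/// longer track output paths themselves. Sketch only, not part of the patch.
async fn write_ssts<I: IndexerBuilder>(
    object_store: ObjectStore,
    metadata: RegionMetadataRef,
    indexer_builder: I,
    region_dir: String,
    source: Source,
) -> Result<SstInfoArray> {
    let mut writer = ParquetWriter::new_with_object_store(
        object_store,
        metadata,
        indexer_builder,
        RegionPathProvider { region_dir },
    )
    .await;
    writer
        .write_all(source, None, &WriteOptions::default())
        .await
}
```

Because `write_all` now returns an array instead of `Option<SstInfo>`, callers iterate the results and look up each output via `info.file_id` rather than assuming a single, pre-known path.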
@@ -229,8 +278,9 @@ where let props_builder = Self::customize_column_config(props_builder, &self.metadata); let writer_props = props_builder.build(); + let sst_file_path = self.path_provider.build_sst_file_path(self.current_file); let writer = SizeAwareWriter::new( - self.writer_factory.create().await?, + self.writer_factory.create(&sst_file_path).await?, self.bytes_written.clone(), ); let arrow_writer = diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 4cb4469dc0..671d6b0e6c 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -290,6 +290,7 @@ pub(crate) fn build_key_values_with_ts_seq_values( rows, }), write_hint: None, + bulk: Vec::new(), }; KeyValues::new(metadata.as_ref(), mutation).unwrap() } diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 8bef6d205b..1002fc4d79 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -104,13 +104,13 @@ pub fn new_source(batches: &[Batch]) -> Source { Source::Reader(Box::new(reader)) } -/// Creates a new [FileHandle] for a SST. -pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { +/// Creates a SST file handle with provided file id +pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64) -> FileHandle { let file_purger = new_noop_file_purger(); FileHandle::new( FileMeta { region_id: REGION_ID, - file_id: FileId::random(), + file_id, time_range: ( Timestamp::new_millisecond(start_ms), Timestamp::new_millisecond(end_ms), @@ -127,6 +127,11 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { ) } +/// Creates a new [FileHandle] for a SST. +pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { + sst_file_handle_with_file_id(FileId::random(), start_ms, end_ms) +} + pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch { assert!(end >= start); let pk = new_primary_key(tags); diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index 0d6d0e62c0..7c719d7d65 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -166,6 +166,7 @@ pub(crate) fn write_rows_to_version( sequence: start_ts as u64, // The sequence may be incorrect, but it's fine in test. 
rows: Some(rows), write_hint: None, + bulk: Vec::new(), }; let key_values = KeyValues::new(&version.metadata, mutation).unwrap(); version.memtables.mutable.write(&key_values).unwrap(); diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index a23de504f3..8752fc115b 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -288,6 +288,7 @@ mod tests { sequence, rows: Some(Rows { schema, rows }), write_hint: None, + bulk: Vec::new(), } } diff --git a/src/mito2/src/wal/entry_distributor.rs b/src/mito2/src/wal/entry_distributor.rs index 3c0891ecf0..6dd30f8639 100644 --- a/src/mito2/src/wal/entry_distributor.rs +++ b/src/mito2/src/wal/entry_distributor.rs @@ -176,7 +176,7 @@ pub const DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE: usize = 2048; /// | /// // may deadlock | /// distributor.distribute().await; | -/// | +/// | /// | /// receivers[0].read().await | /// ``` @@ -280,6 +280,7 @@ mod tests { sequence: 1u64, rows: None, write_hint: None, + bulk: Vec::new(), }], } .encode_to_vec(), @@ -294,6 +295,7 @@ mod tests { sequence: 2u64, rows: None, write_hint: None, + bulk: Vec::new(), }], } .encode_to_vec(), @@ -308,6 +310,7 @@ mod tests { sequence: 3u64, rows: None, write_hint: None, + bulk: Vec::new(), }], } .encode_to_vec(), @@ -352,6 +355,7 @@ mod tests { sequence: 1u64, rows: None, write_hint: None, + bulk: Vec::new(), }], } )] @@ -372,6 +376,7 @@ mod tests { sequence: 2u64, rows: None, write_hint: None, + bulk: Vec::new(), }], } )] @@ -388,6 +393,7 @@ mod tests { sequence: 1u64, rows: None, write_hint: None, + bulk: Vec::new(), }], }; let region2 = RegionId::new(1, 2); @@ -397,6 +403,7 @@ mod tests { sequence: 3u64, rows: None, write_hint: None, + bulk: Vec::new(), }], }; let region3 = RegionId::new(1, 3); @@ -406,6 +413,7 @@ mod tests { sequence: 3u64, rows: None, write_hint: None, + bulk: Vec::new(), }], }; let provider = Provider::kafka_provider("my_topic".to_string()); @@ -484,6 +492,7 @@ mod tests { sequence: 1u64, rows: None, write_hint: None, + bulk: Vec::new(), }], }; let region2 = RegionId::new(1, 2); @@ -561,6 +570,7 @@ mod tests { sequence: 1u64, rows: None, write_hint: None, + bulk: Vec::new(), }], } .encode_to_vec(), @@ -575,6 +585,7 @@ mod tests { sequence: 2u64, rows: None, write_hint: None, + bulk: Vec::new(), }], } .encode_to_vec(), @@ -589,6 +600,7 @@ mod tests { sequence: 3u64, rows: None, write_hint: None, + bulk: Vec::new(), }], } .encode_to_vec(), @@ -603,6 +615,7 @@ mod tests { sequence: 4u64, rows: None, write_hint: None, + bulk: Vec::new(), }], } .encode_to_vec(), @@ -638,6 +651,7 @@ mod tests { sequence: 4u64, rows: None, write_hint: None, + bulk: Vec::new(), }], } )] diff --git a/src/mito2/src/wal/entry_reader.rs b/src/mito2/src/wal/entry_reader.rs index bbcd425189..f4f517841f 100644 --- a/src/mito2/src/wal/entry_reader.rs +++ b/src/mito2/src/wal/entry_reader.rs @@ -116,6 +116,7 @@ mod tests { sequence: 1u64, rows: None, write_hint: None, + bulk: Vec::new(), }], }; let encoded_entry = wal_entry.encode_to_vec(); diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 5f9fdd698c..aa84be93fc 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -246,10 +246,12 @@ impl RegionWorkerLoop { } // Collect requests by region. + // TODO(yingwen): Encode into bulk. 
region_ctx.push_mutation( sender_req.request.op_type as i32, Some(sender_req.request.rows), sender_req.request.hint, + Vec::new(), sender_req.sender, ); } diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs index fbdaa7dec3..b5fc923c00 100644 --- a/src/promql/src/extension_plan/series_divide.rs +++ b/src/promql/src/extension_plan/series_divide.rs @@ -258,7 +258,10 @@ impl Stream for SeriesDivideStream { let timer = std::time::Instant::now(); loop { if !self.buffer.is_empty() { - let cut_at = self.find_first_diff_row(); + let cut_at = match self.find_first_diff_row() { + Ok(cut_at) => cut_at, + Err(e) => return Poll::Ready(Some(Err(e))), + }; if let Some((batch_index, row_index)) = cut_at { // slice out the first time series and return it. let half_batch_of_first_series = @@ -322,10 +325,10 @@ impl SeriesDivideStream { /// Return the position to cut buffer. /// None implies the current buffer only contains one time series. - fn find_first_diff_row(&mut self) -> Option<(usize, usize)> { + fn find_first_diff_row(&mut self) -> DataFusionResult> { // fast path: no tag columns means all data belongs to the same series. if self.tag_indices.is_empty() { - return None; + return Ok(None); } let mut resumed_batch_index = self.inspect_start; @@ -341,18 +344,26 @@ impl SeriesDivideStream { for index in &self.tag_indices { let current_array = batch.column(*index); let last_array = last_batch.column(*index); - let current_value = current_array + let current_string_array = current_array .as_any() .downcast_ref::() - .unwrap() - .value(0); - let last_value = last_array + .ok_or_else(|| { + datafusion::error::DataFusionError::Internal( + "Failed to downcast tag column to StringArray".to_string(), + ) + })?; + let last_string_array = last_array .as_any() .downcast_ref::() - .unwrap() - .value(last_row); + .ok_or_else(|| { + datafusion::error::DataFusionError::Internal( + "Failed to downcast tag column to StringArray".to_string(), + ) + })?; + let current_value = current_string_array.value(0); + let last_value = last_string_array.value(last_row); if current_value != last_value { - return Some((resumed_batch_index, 0)); + return Ok(Some((resumed_batch_index, 0))); } } } @@ -360,7 +371,15 @@ impl SeriesDivideStream { // check column by column for index in &self.tag_indices { let array = batch.column(*index); - let string_array = array.as_any().downcast_ref::().unwrap(); + let string_array = + array + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Internal( + "Failed to downcast tag column to StringArray".to_string(), + ) + })?; // the first row number that not equal to the next row. let mut same_until = 0; while same_until < num_rows - 1 { @@ -376,12 +395,12 @@ impl SeriesDivideStream { // all rows are the same, inspect next batch resumed_batch_index += 1; } else { - return Some((resumed_batch_index, result_index)); + return Ok(Some((resumed_batch_index, result_index))); } } self.inspect_start = resumed_batch_index; - None + Ok(None) } } diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager.rs b/src/puffin/src/puffin_manager/fs_puffin_manager.rs index 52190f92fb..c03a86aaf6 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager.rs @@ -27,6 +27,7 @@ use crate::puffin_manager::stager::Stager; use crate::puffin_manager::PuffinManager; /// `FsPuffinManager` is a `PuffinManager` that provides readers and writers for puffin data in filesystem. 
+#[derive(Clone)] pub struct FsPuffinManager { /// The stager. stager: S, diff --git a/src/query/src/promql/error.rs b/src/query/src/promql/error.rs index f204cdbd7b..532f598b77 100644 --- a/src/query/src/promql/error.rs +++ b/src/query/src/promql/error.rs @@ -128,6 +128,18 @@ pub enum Error { location: Location, }, + #[snafu(display( + "The end time must be greater than start time, start: {:?}, end: {:?}", + start, + end + ))] + InvalidTimeRange { + start: i64, + end: i64, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Cannot find column {col}"))] ColumnNotFound { col: String, @@ -193,6 +205,7 @@ impl ErrorExt for Error { | MultipleVector { .. } | ExpectRangeSelector { .. } | ZeroRangeSelector { .. } + | InvalidTimeRange { .. } | ColumnNotFound { .. } | FunctionInvalidArgument { .. } | UnsupportedVectorMatch { .. } diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 69b4aef6d5..49c6051454 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -63,15 +63,19 @@ use promql_parser::parser::{ VectorMatchCardinality, VectorSelector, }; use snafu::{ensure, OptionExt, ResultExt}; +use store_api::metric_engine_consts::{ + DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, +}; use table::table::adapter::DfTableProviderAdapter; use crate::promql::error::{ CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu, - ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, MultiFieldsNotSupportedSnafu, - MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu, - Result, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, - UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu, UnsupportedMatcherOpSnafu, - UnsupportedVectorMatchSnafu, ValueNotFoundSnafu, ZeroRangeSelectorSnafu, + ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidTimeRangeSnafu, + MultiFieldsNotSupportedSnafu, MultipleMetricMatchersSnafu, MultipleVectorSnafu, + NoMetricMatcherSnafu, PromqlPlanNodeSnafu, Result, TableNameNotFoundSnafu, + TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, + UnsupportedExprSnafu, UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, + ValueNotFoundSnafu, ZeroRangeSelectorSnafu, }; /// `time()` function in PromQL. 
@@ -982,6 +986,9 @@ impl PromPlanner { fn build_time_index_filter(&self, offset_duration: i64) -> Result> { let start = self.ctx.start; let end = self.ctx.end; + if end < start { + return InvalidTimeRangeSnafu { start, end }.fail(); + } let lookback_delta = self.ctx.lookback_delta; let range = self.ctx.range.unwrap_or_default(); let interval = self.ctx.interval; @@ -1146,6 +1153,10 @@ impl PromPlanner { .table_info() .meta .row_key_column_names() + .filter(|col| { + // remove metric engine's internal columns + col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME + }) .cloned() .collect(); self.ctx.tag_columns = tags; diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 235195fa02..096c3fd75f 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -27,6 +27,7 @@ use common_macro::stack_trace_debug; use common_telemetry::{error, warn}; use datatypes::prelude::ConcreteDataType; use headers::ContentType; +use http::header::InvalidHeaderValue; use query::parser::PromQuery; use serde_json::json; use snafu::{Location, Snafu}; @@ -345,6 +346,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid http header value"))] + InvalidHeaderValue { + #[snafu(source)] + error: InvalidHeaderValue, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Error accessing catalog"))] Catalog { source: catalog::error::Error, @@ -678,7 +687,7 @@ impl ErrorExt for Error { #[cfg(feature = "mem-prof")] DumpProfileData { source, .. } => source.status_code(), - InvalidUtf8Value { .. } => StatusCode::InvalidArguments, + InvalidUtf8Value { .. } | InvalidHeaderValue { .. } => StatusCode::InvalidArguments, ParsePromQL { source, .. } => source.status_code(), Other { source, .. } => source.status_code(), diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index e446e1a455..ea522efdc2 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -36,14 +36,14 @@ use datatypes::schema::SchemaRef; use datatypes::value::transform_value_ref_to_json_value; use event::{LogState, LogValidatorRef}; use futures::FutureExt; -use http::Method; +use http::{HeaderValue, Method}; use serde::{Deserialize, Serialize}; use serde_json::Value; use snafu::{ensure, ResultExt}; use tokio::sync::oneshot::{self, Sender}; use tokio::sync::Mutex; use tower::ServiceBuilder; -use tower_http::cors::{Any, CorsLayer}; +use tower_http::cors::{AllowOrigin, Any, CorsLayer}; use tower_http::decompression::RequestDecompressionLayer; use tower_http::trace::TraceLayer; @@ -52,7 +52,8 @@ use self::result::table_result::TableResponse; use crate::configurator::ConfiguratorRef; use crate::elasticsearch; use crate::error::{ - AddressBindSnafu, AlreadyStartedSnafu, Error, InternalIoSnafu, Result, ToJsonSnafu, + AddressBindSnafu, AlreadyStartedSnafu, Error, InternalIoSnafu, InvalidHeaderValueSnafu, Result, + ToJsonSnafu, }; use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2}; use crate::http::prometheus::{ @@ -140,6 +141,10 @@ pub struct HttpOptions { pub body_limit: ReadableSize, pub is_strict_mode: bool, + + pub cors_allowed_origins: Vec, + + pub enable_cors: bool, } impl Default for HttpOptions { @@ -150,6 +155,8 @@ impl Default for HttpOptions { disable_dashboard: false, body_limit: DEFAULT_BODY_LIMIT, is_strict_mode: false, + cors_allowed_origins: Vec::new(), + enable_cors: true, } } } @@ -715,7 +722,7 @@ impl HttpServer { /// Attaches middlewares and debug routes to the router. 
/// Callers should call this method after [HttpServer::make_app()]. - pub fn build(&self, router: Router) -> Router { + pub fn build(&self, router: Router) -> Result { let timeout_layer = if self.options.timeout != Duration::default() { Some(ServiceBuilder::new().layer(DynamicTimeoutLayer::new(self.options.timeout))) } else { @@ -731,26 +738,45 @@ impl HttpServer { info!("HTTP server body limit is disabled"); None }; + let cors_layer = if self.options.enable_cors { + Some( + CorsLayer::new() + .allow_methods([ + Method::GET, + Method::POST, + Method::PUT, + Method::DELETE, + Method::HEAD, + ]) + .allow_origin(if self.options.cors_allowed_origins.is_empty() { + AllowOrigin::from(Any) + } else { + AllowOrigin::from( + self.options + .cors_allowed_origins + .iter() + .map(|s| { + HeaderValue::from_str(s.as_str()) + .context(InvalidHeaderValueSnafu) + }) + .collect::>>()?, + ) + }) + .allow_headers(Any), + ) + } else { + info!("HTTP server cross-origin is disabled"); + None + }; - router + Ok(router // middlewares .layer( ServiceBuilder::new() // disable on failure tracing. because printing out isn't very helpful, // and we have impl IntoResponse for Error. It will print out more detailed error messages .layer(TraceLayer::new_for_http().on_failure(())) - .layer( - CorsLayer::new() - .allow_methods([ - Method::GET, - Method::POST, - Method::PUT, - Method::DELETE, - Method::HEAD, - ]) - .allow_origin(Any) - .allow_headers(Any), - ) + .option_layer(cors_layer) .option_layer(timeout_layer) .option_layer(body_limit_layer) // auth layer @@ -772,7 +798,7 @@ impl HttpServer { .route("/cpu", routing::post(pprof::pprof_handler)) .route("/mem", routing::post(mem_prof::mem_prof_handler)), ), - ) + )) } fn route_metrics(metrics_handler: MetricsHandler) -> Router { @@ -1032,7 +1058,7 @@ impl Server for HttpServer { if let Some(configurator) = self.plugins.get::() { app = configurator.config_http(app); } - let app = self.build(app); + let app = self.build(app)?; let listener = tokio::net::TcpListener::bind(listening) .await .context(AddressBindSnafu { addr: listening })? 
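Editor's note: `build` now returns `Result<Router>` so that malformed entries in `cors_allowed_origins` surface as `InvalidHeaderValue` errors instead of panicking while the CORS layer is assembled; an empty list keeps the previous allow-any behaviour. A small sketch of opting into a restricted origin list, assuming it sits inside the `servers` crate; the function name and origin value are illustrative.

```rust
use crate::http::HttpOptions;

/// HTTP options with CORS enabled but restricted to explicit origins.
/// Any origin that fails `HeaderValue::from_str` makes `HttpServer::build`
/// return an `InvalidHeaderValue` error rather than panic.
fn restricted_cors_options() -> HttpOptions {
    HttpOptions {
        enable_cors: true,
        cors_allowed_origins: vec!["https://grafana.example.com".to_string()],
        ..Default::default()
    }
}
```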
@@ -1177,17 +1203,123 @@ mod test { } fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { + make_test_app_custom(tx, HttpOptions::default()) + } + + fn make_test_app_custom(tx: mpsc::Sender<(String, Vec)>, options: HttpOptions) -> Router { let instance = Arc::new(DummyInstance { _tx: tx }); let sql_instance = ServerSqlQueryHandlerAdapter::arc(instance.clone()); - let server = HttpServerBuilder::new(HttpOptions::default()) + let server = HttpServerBuilder::new(options) .with_sql_handler(sql_instance) .build(); - server.build(server.make_app()).route( + server.build(server.make_app()).unwrap().route( "/test/timeout", get(forever.layer(ServiceBuilder::new().layer(timeout()))), ) } + #[tokio::test] + pub async fn test_cors() { + // cors is on by default + let (tx, _rx) = mpsc::channel(100); + let app = make_test_app(tx); + let client = TestClient::new(app).await; + + let res = client.get("/health").send().await; + + assert_eq!(res.status(), StatusCode::OK); + assert_eq!( + res.headers() + .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN) + .expect("expect cors header origin"), + "*" + ); + + let res = client + .options("/health") + .header("Access-Control-Request-Headers", "x-greptime-auth") + .header("Access-Control-Request-Method", "DELETE") + .header("Origin", "https://example.com") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + assert_eq!( + res.headers() + .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN) + .expect("expect cors header origin"), + "*" + ); + assert_eq!( + res.headers() + .get(http::header::ACCESS_CONTROL_ALLOW_HEADERS) + .expect("expect cors header headers"), + "*" + ); + assert_eq!( + res.headers() + .get(http::header::ACCESS_CONTROL_ALLOW_METHODS) + .expect("expect cors header methods"), + "GET,POST,PUT,DELETE,HEAD" + ); + } + + #[tokio::test] + pub async fn test_cors_custom_origins() { + // cors is on by default + let (tx, _rx) = mpsc::channel(100); + let origin = "https://example.com"; + + let options = HttpOptions { + cors_allowed_origins: vec![origin.to_string()], + ..Default::default() + }; + + let app = make_test_app_custom(tx, options); + let client = TestClient::new(app).await; + + let res = client.get("/health").header("Origin", origin).send().await; + + assert_eq!(res.status(), StatusCode::OK); + assert_eq!( + res.headers() + .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN) + .expect("expect cors header origin"), + origin + ); + + let res = client + .get("/health") + .header("Origin", "https://notallowed.com") + .send() + .await; + + assert_eq!(res.status(), StatusCode::OK); + assert!(!res + .headers() + .contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)); + } + + #[tokio::test] + pub async fn test_cors_disabled() { + // cors is on by default + let (tx, _rx) = mpsc::channel(100); + + let options = HttpOptions { + enable_cors: false, + ..Default::default() + }; + + let app = make_test_app_custom(tx, options); + let client = TestClient::new(app).await; + + let res = client.get("/health").send().await; + + assert_eq!(res.status(), StatusCode::OK); + assert!(!res + .headers() + .contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)); + } + #[test] fn test_http_options_default() { let default = HttpOptions::default(); diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index adbf49010b..d6f205ee9e 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -45,7 +45,7 @@ use serde_json::Value; use session::context::{QueryContext, QueryContextRef}; use snafu::{Location, 
OptionExt, ResultExt}; use store_api::metric_engine_consts::{ - DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, + DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, PHYSICAL_TABLE_METADATA_KEY, }; pub use super::result::prometheus_resp::PrometheusJsonResponse; @@ -941,16 +941,30 @@ pub async fn label_values_query( .start_timer(); if label_name == METRIC_NAME_LABEL { - let mut table_names = match handler - .catalog_manager() - .table_names(&catalog, &schema, Some(&query_ctx)) - .await - { - Ok(table_names) => table_names, - Err(e) => { - return PrometheusJsonResponse::error(e.status_code(), e.output_msg()); + let catalog_manager = handler.catalog_manager(); + let mut tables_stream = catalog_manager.tables(&catalog, &schema, Some(&query_ctx)); + let mut table_names = Vec::new(); + while let Some(table) = tables_stream.next().await { + // filter out physical tables + match table { + Ok(table) => { + if table + .table_info() + .meta + .options + .extra_options + .contains_key(PHYSICAL_TABLE_METADATA_KEY) + { + continue; + } + + table_names.push(table.table_info().name.clone()); + } + Err(e) => { + return PrometheusJsonResponse::error(e.status_code(), e.output_msg()); + } } - }; + } table_names.sort_unstable(); return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names)); } else if label_name == FIELD_NAME_LABEL { diff --git a/src/servers/src/http/result/prometheus_resp.rs b/src/servers/src/http/result/prometheus_resp.rs index d4ab0480da..8aa28011a2 100644 --- a/src/servers/src/http/result/prometheus_resp.rs +++ b/src/servers/src/http/result/prometheus_resp.rs @@ -187,6 +187,8 @@ impl PrometheusJsonResponse { let mut tag_column_indices = Vec::new(); let mut first_field_column_index = None; + let mut num_label_columns = 0; + for (i, column) in batches.schema().column_schemas().iter().enumerate() { match column.data_type { ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_)) => { @@ -211,6 +213,7 @@ impl PrometheusJsonResponse { } ConcreteDataType::String(_) => { tag_column_indices.push(i); + num_label_columns += 1; } _ => {} } @@ -223,9 +226,10 @@ impl PrometheusJsonResponse { reason: "no value column found".to_string(), })?; - let metric_name = (METRIC_NAME.to_string(), metric_name); - let mut buffer = BTreeMap::, Vec<(f64, String)>>::new(); + let metric_name = (METRIC_NAME, metric_name.as_str()); + let mut buffer = BTreeMap::, Vec<(f64, String)>>::new(); + let schema = batches.schema(); for batch in batches.iter() { // prepare things... 
let tag_columns = tag_column_indices @@ -240,7 +244,7 @@ impl PrometheusJsonResponse { .collect::>(); let tag_names = tag_column_indices .iter() - .map(|c| batches.schema().column_name_by_index(*c).to_string()) + .map(|c| schema.column_name_by_index(*c)) .collect::>(); let timestamp_column = batch .column(timestamp_column_index) @@ -260,11 +264,12 @@ impl PrometheusJsonResponse { for row_index in 0..batch.num_rows() { // retrieve tags // TODO(ruihang): push table name `__metric__` - let mut tags = vec![metric_name.clone()]; + let mut tags = Vec::with_capacity(num_label_columns + 1); + tags.push(metric_name); for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) { // TODO(ruihang): add test for NULL tag if let Some(tag_value) = tag_column.get_data(row_index) { - tags.push((tag_name.to_string(), tag_value.to_string())); + tags.push((tag_name, tag_value)); } } @@ -292,7 +297,10 @@ impl PrometheusJsonResponse { // accumulate data into result buffer.into_iter().for_each(|(tags, mut values)| { - let metric = tags.into_iter().collect(); + let metric = tags + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect::>(); match result { PromQueryResult::Vector(ref mut v) => { v.push(PromSeriesVector { diff --git a/src/servers/src/postgres/fixtures.rs b/src/servers/src/postgres/fixtures.rs index 34761d5d35..4774c4ece0 100644 --- a/src/servers/src/postgres/fixtures.rs +++ b/src/servers/src/postgres/fixtures.rs @@ -115,11 +115,7 @@ pub(crate) fn process<'a>(query: &str, query_ctx: QueryContextRef) -> Option = - Lazy::new(|| Regex::new("(?i)(LIMIT\\s+\\d+)::bigint").unwrap()); pub(crate) fn rewrite_sql(query: &str) -> Cow<'_, str> { - //TODO(sunng87): remove this when we upgraded datafusion to 43 or newer - let query = LIMIT_CAST_PATTERN.replace_all(query, "$1"); // DBeaver tricky replacement for datafusion not support sql // TODO: add more here query @@ -218,11 +214,6 @@ mod test { #[test] fn test_rewrite() { - let sql = "SELECT * FROM number LIMIT 1::bigint"; - let sql2 = "SELECT * FROM number limit 1::BIGINT"; - - assert_eq!("SELECT * FROM number LIMIT 1", rewrite_sql(sql)); - assert_eq!("SELECT * FROM number limit 1", rewrite_sql(sql2)); assert_eq!( "SELECT db.oid as _oid,db.* FROM pg_catalog.pg_database db", rewrite_sql("SELECT db.oid,db.* FROM pg_catalog.pg_database db") diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index b4f823da86..1a251763ae 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -121,7 +121,7 @@ fn make_test_app(tx: Arc>, db_name: Option<&str>) .with_user_provider(Arc::new(user_provider)) .with_influxdb_handler(instance) .build(); - server.build(server.make_app()) + server.build(server.make_app()).unwrap() } #[tokio::test] diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index efbfc2a310..358af19dc8 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -112,7 +112,7 @@ fn make_test_app(tx: mpsc::Sender) -> Router { .with_sql_handler(instance.clone()) .with_opentsdb_handler(instance) .build(); - server.build(server.make_app()) + server.build(server.make_app()).unwrap() } #[tokio::test] diff --git a/src/servers/tests/http/prom_store_test.rs b/src/servers/tests/http/prom_store_test.rs index f5ca54d22d..77a06db079 100644 --- a/src/servers/tests/http/prom_store_test.rs +++ b/src/servers/tests/http/prom_store_test.rs @@ -141,7 +141,7 @@ fn make_test_app(tx: 
mpsc::Sender<(String, Vec)>) -> Router { .with_sql_handler(instance.clone()) .with_prom_handler(instance, true, is_strict_mode) .build(); - server.build(server.make_app()) + server.build(server.make_app()).unwrap() } #[tokio::test] diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index a326681e0f..626e2d96ac 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -397,7 +397,10 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router .with_metrics_handler(MetricsHandler) .with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap()) .build(); - (http_server.build(http_server.make_app()), instance.guard) + ( + http_server.build(http_server.make_app()).unwrap(), + instance.guard, + ) } pub async fn setup_test_http_app_with_frontend( @@ -436,7 +439,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider( let http_server = http_server.build(); - let app = http_server.build(http_server.make_app()); + let app = http_server.build(http_server.make_app()).unwrap(); (app, instance.guard) } @@ -481,7 +484,7 @@ pub async fn setup_test_prom_app_with_frontend( .with_prometheus_handler(frontend_ref) .with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap()) .build(); - let app = http_server.build(http_server.make_app()); + let app = http_server.build(http_server.make_app()).unwrap(); (app, instance.guard) } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index e4dc1efd50..e4a75aa341 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -704,6 +704,18 @@ pub async fn test_prom_http_api(store_type: StorageType) { assert!(prom_resp.error_type.is_none()); // query `__name__` without match[] + // create a physical table and a logical table + let res = client + .get("/v1/sql?sql=create table physical_table (`ts` timestamp time index, message string) with ('physical_metric_table' = 'true');") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await); + let res = client + .get("/v1/sql?sql=create table logic_table (`ts` timestamp time index, message string) with ('on_physical_table' = 'physical_table');") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await); + // query `__name__` let res = client .get("/v1/prometheus/api/v1/label/__name__/values") .send() @@ -713,6 +725,15 @@ pub async fn test_prom_http_api(store_type: StorageType) { assert_eq!(prom_resp.status, "success"); assert!(prom_resp.error.is_none()); assert!(prom_resp.error_type.is_none()); + assert_eq!( + prom_resp.data, + PrometheusResponse::Labels(vec![ + "demo".to_string(), + "demo_metrics".to_string(), + "logic_table".to_string(), + "numbers".to_string() + ]) + ); // buildinfo let res = client @@ -881,6 +902,8 @@ addr = "127.0.0.1:4000" timeout = "30s" body_limit = "64MiB" is_strict_mode = false +cors_allowed_origins = [] +enable_cors = true [grpc] addr = "127.0.0.1:4001"
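Editor's note: the `__name__` label handler now walks the table stream and skips metric-engine physical tables, which is why the integration test above expects only `demo`, `demo_metrics`, `logic_table`, and `numbers`. A sketch of that filter predicate in isolation; `is_physical_table` is an illustrative name and the `table::TableRef` import path is assumed.

```rust
use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use table::TableRef;

/// True for metric-engine physical tables, which should not be reported as
/// PromQL metric names (their logical tables are reported instead).
fn is_physical_table(table: &TableRef) -> bool {
    table
        .table_info()
        .meta
        .options
        .extra_options
        .contains_key(PHYSICAL_TABLE_METADATA_KEY)
}
```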