Merge remote-tracking branch 'GreptimeTeam/poc-write-path' into poc-write-path

This commit is contained in:
Lei, HUANG
2025-02-06 06:39:07 +00:00
55 changed files with 903 additions and 380 deletions

2
Cargo.lock generated
View File

@@ -4723,7 +4723,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=683e9d10ae7f3dfb8aaabd89082fc600c17e3795#683e9d10ae7f3dfb8aaabd89082fc600c17e3795"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1915576b113a494f5352fd61f211d899b7f87aab#1915576b113a494f5352fd61f211d899b7f87aab"
dependencies = [
"prost 0.13.3",
"serde",

View File

@@ -127,7 +127,8 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "683e9d10ae7f3dfb8aaabd89082fc600c17e3795" }
# branch: poc-write-path
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1915576b113a494f5352fd61f211d899b7f87aab" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -13,7 +13,7 @@
<a href="https://greptime.com/product/cloud">GreptimeCloud</a> |
<a href="https://docs.greptime.com/">User Guide</a> |
<a href="https://greptimedb.rs/">API Docs</a> |
<a href="https://github.com/GreptimeTeam/greptimedb/issues/3412">Roadmap 2024</a>
<a href="https://github.com/GreptimeTeam/greptimedb/issues/5446">Roadmap 2025</a>
</h4>
<a href="https://github.com/GreptimeTeam/greptimedb/releases/latest">

View File

@@ -26,6 +26,8 @@
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, enabled by default.<br/>This allows browsers to access HTTP APIs without CORS restrictions. |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
@@ -216,6 +218,8 @@
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, enabled by default.<br/>This allows browsers to access HTTP APIs without CORS restrictions. |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.hostname` | String | `127.0.0.1:4001` | The hostname advertised to the metasrv,<br/>and used for connections from outside the host |

View File

@@ -31,6 +31,12 @@ timeout = "30s"
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
body_limit = "64MB"
## HTTP CORS support, enabled by default.
## This allows browsers to access HTTP APIs without CORS restrictions.
enable_cors = true
## Customize allowed origins for HTTP CORS.
## @toml2docs:none-default
cors_allowed_origins = ["https://example.com"]
## The gRPC server options.
[grpc]

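Taken together, `enable_cors` and `cors_allowed_origins` amount to an allow-list check on the request's `Origin` header. A hypothetical sketch of that check (illustrative only, not the server's actual implementation; it assumes an empty allow-list with CORS enabled means any origin is accepted):

fn origin_allowed(enable_cors: bool, allowed_origins: &[String], origin: &str) -> bool {
    if !enable_cors {
        return false;
    }
    // With CORS enabled and no explicit allow-list, any origin is accepted.
    allowed_origins.is_empty() || allowed_origins.iter().any(|allowed| allowed == origin)
}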
View File

@@ -39,6 +39,12 @@ timeout = "30s"
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
body_limit = "64MB"
## HTTP CORS support, enabled by default.
## This allows browsers to access HTTP APIs without CORS restrictions.
enable_cors = true
## Customize allowed origins for HTTP CORS.
## @toml2docs:none-default
cors_allowed_origins = ["https://example.com"]
## The gRPC server options.
[grpc]

View File

@@ -39,14 +39,16 @@ services:
container_name: metasrv
ports:
- 3002:3002
- 3000:3000
command:
- metasrv
- start
- --bind-addr=0.0.0.0:3002
- --server-addr=metasrv:3002
- --store-addrs=etcd0:2379
- --http-addr=0.0.0.0:3000
healthcheck:
test: [ "CMD", "curl", "-f", "http://metasrv:3002/health" ]
test: [ "CMD", "curl", "-f", "http://metasrv:3000/health" ]
interval: 5s
timeout: 3s
retries: 5
@@ -73,10 +75,10 @@ services:
volumes:
- /tmp/greptimedb-cluster-docker-compose/datanode0:/tmp/greptimedb
healthcheck:
test: [ "CMD", "curl", "-f", "http://datanode0:5000/health" ]
test: [ "CMD", "curl", "-fv", "http://datanode0:5000/health" ]
interval: 5s
timeout: 3s
retries: 5
retries: 10
depends_on:
metasrv:
condition: service_healthy
@@ -115,6 +117,7 @@ services:
container_name: flownode0
ports:
- 4004:4004
- 4005:4005
command:
- flownode
- start
@@ -122,9 +125,15 @@ services:
- --metasrv-addrs=metasrv:3002
- --rpc-addr=0.0.0.0:4004
- --rpc-hostname=flownode0:4004
- --http-addr=0.0.0.0:4005
depends_on:
frontend0:
condition: service_healthy
healthcheck:
test: [ "CMD", "curl", "-f", "http://flownode0:4005/health" ]
interval: 5s
timeout: 3s
retries: 5
networks:
- greptimedb

View File

@@ -18,7 +18,11 @@
libgit2
libz
];
lib = nixpkgs.lib;
rustToolchain = fenix.packages.${system}.fromToolchainName {
name = (lib.importTOML ./rust-toolchain.toml).toolchain.channel;
sha256 = "sha256-f/CVA1EC61EWbh0SjaRNhLL0Ypx2ObupbzigZp8NmL4=";
};
in
{
devShells.default = pkgs.mkShell {
@@ -30,14 +34,20 @@
protobuf
gnumake
mold
(fenix.packages.${system}.fromToolchainFile {
dir = ./.;
sha256 = "sha256-f/CVA1EC61EWbh0SjaRNhLL0Ypx2ObupbzigZp8NmL4=";
})
(rustToolchain.withComponents [
"cargo"
"clippy"
"rust-src"
"rustc"
"rustfmt"
"rust-analyzer"
"llvm-tools"
])
cargo-nextest
cargo-llvm-cov
taplo
curl
gnuplot ## for cargo bench
];
LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath buildInputs;

View File

@@ -1,3 +1,2 @@
[toolchain]
channel = "nightly-2024-12-25"
components = ["rust-analyzer", "llvm-tools"]

View File

@@ -17,6 +17,7 @@ use std::time::Duration;
use base64::engine::general_purpose;
use base64::Engine;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use humantime::format_duration;
use serde_json::Value;
use servers::http::header::constants::GREPTIME_DB_HEADER_TIMEOUT;
@@ -24,7 +25,9 @@ use servers::http::result::greptime_result_v1::GreptimedbV1Response;
use servers::http::GreptimeQueryOutput;
use snafu::ResultExt;
use crate::error::{HttpQuerySqlSnafu, Result, SerdeJsonSnafu};
use crate::error::{
BuildClientSnafu, HttpQuerySqlSnafu, ParseProxyOptsSnafu, Result, SerdeJsonSnafu,
};
#[derive(Debug, Clone)]
pub struct DatabaseClient {
@@ -32,6 +35,23 @@ pub struct DatabaseClient {
catalog: String,
auth_header: Option<String>,
timeout: Duration,
proxy: Option<reqwest::Proxy>,
}
pub fn parse_proxy_opts(
proxy: Option<String>,
no_proxy: bool,
) -> std::result::Result<Option<reqwest::Proxy>, BoxedError> {
if no_proxy {
return Ok(None);
}
proxy
.map(|proxy| {
reqwest::Proxy::all(proxy)
.context(ParseProxyOptsSnafu)
.map_err(BoxedError::new)
})
.transpose()
}
impl DatabaseClient {
@@ -40,6 +60,7 @@ impl DatabaseClient {
catalog: String,
auth_basic: Option<String>,
timeout: Duration,
proxy: Option<reqwest::Proxy>,
) -> Self {
let auth_header = if let Some(basic) = auth_basic {
let encoded = general_purpose::STANDARD.encode(basic);
@@ -48,11 +69,18 @@ impl DatabaseClient {
None
};
if let Some(ref proxy) = proxy {
common_telemetry::info!("Using proxy: {:?}", proxy);
} else {
common_telemetry::info!("Using system proxy(if any)");
}
Self {
addr,
catalog,
auth_header,
timeout,
proxy,
}
}
@@ -67,7 +95,13 @@ impl DatabaseClient {
("db", format!("{}-{}", self.catalog, schema)),
("sql", sql.to_string()),
];
let mut request = reqwest::Client::new()
let client = self
.proxy
.clone()
.map(|proxy| reqwest::Client::builder().proxy(proxy).build())
.unwrap_or_else(|| Ok(reqwest::Client::new()))
.context(BuildClientSnafu)?;
let mut request = client
.post(&url)
.form(&params)
.header("Content-Type", "application/x-www-form-urlencoded");

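The proxy plumbing added in this file follows a simple precedence: `--no_proxy` skips configuring any explicit proxy, an explicit proxy URL overrides the system proxy, and otherwise the client is built with reqwest's defaults (which still honor system proxy settings). A minimal standalone sketch of the same idea using plain reqwest types, not the crate's actual code:

use reqwest::{Client, Proxy};

fn resolve_proxy(proxy: Option<String>, no_proxy: bool) -> Result<Option<Proxy>, reqwest::Error> {
    if no_proxy {
        return Ok(None);
    }
    // `Proxy::all` routes both HTTP and HTTPS traffic through the given proxy.
    proxy.map(Proxy::all).transpose()
}

fn build_client(proxy: Option<Proxy>) -> Result<Client, reqwest::Error> {
    match proxy {
        Some(p) => Client::builder().proxy(p).build(),
        // No explicit proxy: the default client still honors system proxy settings.
        None => Ok(Client::new()),
    }
}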
View File

@@ -86,6 +86,22 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to parse proxy options: {}", error))]
ParseProxyOpts {
#[snafu(source)]
error: reqwest::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to build reqwest client: {}", error))]
BuildClient {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: reqwest::Error,
},
#[snafu(display("Invalid REPL command: {reason}"))]
InvalidReplCommand { reason: String },
@@ -278,7 +294,8 @@ impl ErrorExt for Error {
| Error::InitTimezone { .. }
| Error::ConnectEtcd { .. }
| Error::CreateDir { .. }
| Error::EmptyResult { .. } => StatusCode::InvalidArguments,
| Error::EmptyResult { .. }
| Error::ParseProxyOpts { .. } => StatusCode::InvalidArguments,
Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
@@ -298,7 +315,8 @@ impl ErrorExt for Error {
Error::SerdeJson { .. }
| Error::FileIo { .. }
| Error::SpawnThread { .. }
| Error::InitTlsProvider { .. } => StatusCode::Unexpected,
| Error::InitTlsProvider { .. }
| Error::BuildClient { .. } => StatusCode::Unexpected,
Error::Other { source, .. } => source.status_code(),

View File

@@ -28,7 +28,7 @@ use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use crate::database::DatabaseClient;
use crate::database::{parse_proxy_opts, DatabaseClient};
use crate::error::{EmptyResultSnafu, Error, FileIoSnafu, Result, SchemaNotFoundSnafu};
use crate::{database, Tool};
@@ -91,19 +91,30 @@ pub struct ExportCommand {
/// The default behavior will disable server-side default timeout (i.e. `0s`).
#[clap(long, value_parser = humantime::parse_duration)]
timeout: Option<Duration>,
/// The proxy server address to connect to. If set, it overrides the system proxy.
///
/// The default behavior will use the system proxy if neither `proxy` nor `no_proxy` is set.
#[clap(long)]
proxy: Option<String>,
/// Disable the proxy server. If set, no proxy will be used.
#[clap(long)]
no_proxy: bool,
}
impl ExportCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
let (catalog, schema) =
database::split_database(&self.database).map_err(BoxedError::new)?;
let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
let database_client = DatabaseClient::new(
self.addr.clone(),
catalog.clone(),
self.auth_basic.clone(),
// Treats `None` as `0s` to disable server-side default timeout.
self.timeout.unwrap_or_default(),
proxy,
);
Ok(Box::new(Export {

View File

@@ -25,7 +25,7 @@ use snafu::{OptionExt, ResultExt};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use crate::database::DatabaseClient;
use crate::database::{parse_proxy_opts, DatabaseClient};
use crate::error::{Error, FileIoSnafu, Result, SchemaNotFoundSnafu};
use crate::{database, Tool};
@@ -76,18 +76,30 @@ pub struct ImportCommand {
/// The default behavior will disable server-side default timeout (i.e. `0s`).
#[clap(long, value_parser = humantime::parse_duration)]
timeout: Option<Duration>,
/// The proxy server address to connect to. If set, it overrides the system proxy.
///
/// The default behavior will use the system proxy if neither `proxy` nor `no_proxy` is set.
#[clap(long)]
proxy: Option<String>,
/// Disable the proxy server. If set, no proxy will be used.
#[clap(long, default_value = "false")]
no_proxy: bool,
}
impl ImportCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
let (catalog, schema) =
database::split_database(&self.database).map_err(BoxedError::new)?;
let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
let database_client = DatabaseClient::new(
self.addr.clone(),
catalog.clone(),
self.auth_basic.clone(),
// Treats `None` as `0s` to disable server-side default timeout.
self.timeout.unwrap_or_default(),
proxy,
);
Ok(Box::new(Import {

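Both the export and import commands gain the same pair of flags; a standalone sketch of how they are declared with clap's derive API (a reduced, hypothetical struct, not the full command):

use clap::Parser;

#[derive(Parser)]
struct ProxyArgs {
    /// The proxy server address to connect to; overrides the system proxy.
    #[clap(long)]
    proxy: Option<String>,
    /// Do not use any explicit proxy.
    #[clap(long)]
    no_proxy: bool,
}

fn main() {
    let args = ProxyArgs::parse();
    println!("proxy={:?}, no_proxy={}", args.proxy, args.no_proxy);
}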
View File

@@ -34,6 +34,7 @@ use metric_engine::config::EngineConfig as MetricEngineConfig;
use mito2::config::MitoConfig;
use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
#[allow(deprecated)]
#[test]
@@ -144,6 +145,10 @@ fn test_load_frontend_example_config() {
..Default::default()
},
grpc: GrpcOptions::default().with_hostname("127.0.0.1:4001"),
http: HttpOptions {
cors_allowed_origins: vec!["https://example.com".to_string()],
..Default::default()
},
..Default::default()
},
..Default::default()
@@ -234,6 +239,10 @@ fn test_load_standalone_example_config() {
remote_write: Some(Default::default()),
..Default::default()
},
http: HttpOptions {
cors_allowed_origins: vec!["https://example.com".to_string()],
..Default::default()
},
..Default::default()
},
..Default::default()

View File

@@ -80,10 +80,12 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
}
}
pub fn from_vec(array: Vec<T::Native>) -> Self {
Self {
array: PrimitiveArray::from_iter_values(array),
}
pub fn from_vec(vector: Vec<T::Native>) -> Self {
let mutable_buffer = arrow::buffer::MutableBuffer::from(vector);
let mut primitive_builder =
PrimitiveBuilder::<T::ArrowPrimitive>::new_from_buffer(mutable_buffer, None);
let array = primitive_builder.finish();
Self { array }
}
pub fn from_iter_values<I: IntoIterator<Item = T::Native>>(iter: I) -> Self {

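The rewritten `from_vec` above hands the `Vec`'s allocation to Arrow instead of copying values one by one: the vector becomes a `MutableBuffer`, which a `PrimitiveBuilder` then finishes into an array. A standalone sketch of the same construction for a concrete `i64` array (assuming the arrow crate used by the repository; illustrative, not the code above):

use arrow::array::{Int64Array, PrimitiveBuilder};
use arrow::buffer::MutableBuffer;
use arrow::datatypes::Int64Type;

fn int64_from_vec(values: Vec<i64>) -> Int64Array {
    // Reuse the Vec's allocation as an Arrow buffer instead of copying each value.
    let buffer = MutableBuffer::from(values);
    let mut builder = PrimitiveBuilder::<Int64Type>::new_from_buffer(buffer, None);
    builder.finish()
}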
View File

@@ -276,6 +276,7 @@ impl CpuDataGenerator {
rows,
}),
write_hint: None,
bulk: Vec::new(),
};
KeyValues::new(&self.metadata, mutation).unwrap()

View File

@@ -17,10 +17,12 @@ use std::sync::Arc;
use object_store::services::Fs;
use object_store::util::{join_dir, with_instrument_layers};
use object_store::ObjectStore;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;
use store_api::storage::{RegionId, SequenceNumber};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::write_cache::SstUploadRequest;
use crate::cache::CacheManagerRef;
use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
@@ -30,13 +32,15 @@ use crate::region::options::IndexOptions;
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::index::IndexerBuilder;
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::location;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
pub type AccessLayerRef = Arc<AccessLayer>;
/// SST write results.
pub type SstInfoArray = SmallVec<[SstInfo; 2]>;
/// A layer to access SST files under the same directory.
pub struct AccessLayer {
@@ -121,11 +125,8 @@ impl AccessLayer {
&self,
request: SstWriteRequest,
write_opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
let file_path = location::sst_file_path(&self.region_dir, request.file_id);
let index_file_path = location::index_file_path(&self.region_dir, request.file_id);
) -> Result<SstInfoArray> {
let region_id = request.metadata.region_id;
let file_id = request.file_id;
let cache_manager = request.cache_manager.clone();
let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
@@ -134,8 +135,9 @@ impl AccessLayer {
.write_and_upload_sst(
request,
SstUploadRequest {
upload_path: file_path,
index_upload_path: index_file_path,
dest_path_provider: RegionFilePathFactory {
region_dir: self.region_dir.clone(),
},
remote_store: self.object_store.clone(),
},
write_opts,
@@ -144,11 +146,9 @@ impl AccessLayer {
} else {
// Write cache is disabled.
let store = self.object_store.clone();
let indexer = IndexerBuilder {
let indexer_builder = IndexerBuilderImpl {
op_type: request.op_type,
file_id,
file_path: index_file_path,
metadata: &request.metadata,
metadata: request.metadata.clone(),
row_group_size: write_opts.row_group_size,
puffin_manager: self.puffin_manager_factory.build(store),
intermediate_manager: self.intermediate_manager.clone(),
@@ -156,24 +156,31 @@ impl AccessLayer {
inverted_index_config: request.inverted_index_config,
fulltext_index_config: request.fulltext_index_config,
bloom_filter_index_config: request.bloom_filter_index_config,
}
.build()
.await;
};
let mut writer = ParquetWriter::new_with_object_store(
self.object_store.clone(),
file_path,
request.metadata,
indexer,
);
indexer_builder,
RegionFilePathFactory {
region_dir: self.region_dir.clone(),
},
)
.await;
writer
.write_all(request.source, request.max_sequence, write_opts)
.await?
};
// Put parquet metadata to cache manager.
if let Some(sst_info) = &sst_info {
if let Some(parquet_metadata) = &sst_info.file_metadata {
cache_manager.put_parquet_meta_data(region_id, file_id, parquet_metadata.clone())
if !sst_info.is_empty() {
for sst in &sst_info {
if let Some(parquet_metadata) = &sst.file_metadata {
cache_manager.put_parquet_meta_data(
region_id,
sst.file_id,
parquet_metadata.clone(),
)
}
}
}
@@ -191,7 +198,6 @@ pub(crate) enum OperationType {
/// Contents to build a SST.
pub(crate) struct SstWriteRequest {
pub(crate) op_type: OperationType,
pub(crate) file_id: FileId,
pub(crate) metadata: RegionMetadataRef,
pub(crate) source: Source,
pub(crate) cache_manager: CacheManagerRef,
@@ -229,3 +235,47 @@ async fn clean_dir(dir: &str) -> Result<()> {
Ok(())
}
/// Path provider for SST file and index file.
pub trait FilePathProvider: Send + Sync {
/// Creates the index file path for the given file id.
fn build_index_file_path(&self, file_id: FileId) -> String;
/// Creates the SST file path for the given file id.
fn build_sst_file_path(&self, file_id: FileId) -> String;
}
/// Path provider that builds paths in local write cache.
#[derive(Clone)]
pub(crate) struct WriteCachePathProvider {
pub(crate) region_id: RegionId,
pub(crate) file_cache: FileCacheRef,
}
impl FilePathProvider for WriteCachePathProvider {
fn build_index_file_path(&self, file_id: FileId) -> String {
let puffin_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
self.file_cache.cache_file_path(puffin_key)
}
fn build_sst_file_path(&self, file_id: FileId) -> String {
let parquet_file_key = IndexKey::new(self.region_id, file_id, FileType::Parquet);
self.file_cache.cache_file_path(parquet_file_key)
}
}
/// Path provider that builds paths in region storage path.
#[derive(Clone, Debug)]
pub(crate) struct RegionFilePathFactory {
pub(crate) region_dir: String,
}
impl FilePathProvider for RegionFilePathFactory {
fn build_index_file_path(&self, file_id: FileId) -> String {
location::index_file_path(&self.region_dir, file_id)
}
fn build_sst_file_path(&self, file_id: FileId) -> String {
location::sst_file_path(&self.region_dir, file_id)
}
}

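The new `FilePathProvider` trait decouples the parquet writer from concrete file locations, so the same writer can target either the region directory (`RegionFilePathFactory`) or the local write cache (`WriteCachePathProvider`). A standalone sketch of the pattern with simplified types, where `FileId` is reduced to a string and the directory layout is purely illustrative:

trait FilePathProvider: Send + Sync {
    fn build_index_file_path(&self, file_id: &str) -> String;
    fn build_sst_file_path(&self, file_id: &str) -> String;
}

/// Builds paths under a region directory, analogous to `RegionFilePathFactory`.
struct RegionPaths {
    region_dir: String,
}

impl FilePathProvider for RegionPaths {
    fn build_index_file_path(&self, file_id: &str) -> String {
        // Hypothetical layout; the real paths come from the crate's `sst::location` helpers.
        format!("{}/index/{}.puffin", self.region_dir, file_id)
    }

    fn build_sst_file_path(&self, file_id: &str) -> String {
        format!("{}/{}.parquet", self.region_dir, file_id)
    }
}

fn main() {
    let paths = RegionPaths { region_dir: "data/region-0".to_string() };
    assert!(paths.build_sst_file_path("f1").ends_with("f1.parquet"));
}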
View File

@@ -23,7 +23,10 @@ use futures::AsyncWriteExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use crate::access_layer::{new_fs_cache_store, SstWriteRequest};
use crate::access_layer::{
new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
WriteCachePathProvider,
};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::error::{self, Result};
use crate::metrics::{
@@ -32,9 +35,9 @@ use crate::metrics::{
};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::index::IndexerBuilder;
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::parquet::WriteOptions;
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
/// A cache for uploading files to remote object stores.
@@ -103,22 +106,21 @@ impl WriteCache {
write_request: SstWriteRequest,
upload_request: SstUploadRequest,
write_opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
) -> Result<SstInfoArray> {
let timer = FLUSH_ELAPSED
.with_label_values(&["write_sst"])
.start_timer();
let region_id = write_request.metadata.region_id;
let file_id = write_request.file_id;
let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
let store = self.file_cache.local_store();
let indexer = IndexerBuilder {
let path_provider = WriteCachePathProvider {
file_cache: self.file_cache.clone(),
region_id,
};
let indexer = IndexerBuilderImpl {
op_type: write_request.op_type,
file_id,
file_path: self.file_cache.cache_file_path(puffin_key),
metadata: &write_request.metadata,
metadata: write_request.metadata.clone(),
row_group_size: write_opts.row_group_size,
puffin_manager: self.puffin_manager_factory.build(store),
intermediate_manager: self.intermediate_manager.clone(),
@@ -126,17 +128,16 @@ impl WriteCache {
inverted_index_config: write_request.inverted_index_config,
fulltext_index_config: write_request.fulltext_index_config,
bloom_filter_index_config: write_request.bloom_filter_index_config,
}
.build()
.await;
};
// Write to FileCache.
let mut writer = ParquetWriter::new_with_object_store(
self.file_cache.local_store(),
self.file_cache.cache_file_path(parquet_key),
write_request.metadata,
indexer,
);
path_provider,
)
.await;
let sst_info = writer
.write_all(write_request.source, write_request.max_sequence, write_opts)
@@ -145,22 +146,29 @@ impl WriteCache {
timer.stop_and_record();
// Upload sst file to remote object store.
let Some(sst_info) = sst_info else {
// No data need to upload.
return Ok(None);
};
let parquet_path = &upload_request.upload_path;
let remote_store = &upload_request.remote_store;
self.upload(parquet_key, parquet_path, remote_store).await?;
if sst_info.index_metadata.file_size > 0 {
let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
let puffin_path = &upload_request.index_upload_path;
self.upload(puffin_key, puffin_path, remote_store).await?;
if sst_info.is_empty() {
return Ok(sst_info);
}
Ok(Some(sst_info))
let remote_store = &upload_request.remote_store;
for sst in &sst_info {
let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
let parquet_path = upload_request
.dest_path_provider
.build_sst_file_path(sst.file_id);
self.upload(parquet_key, &parquet_path, remote_store)
.await?;
if sst.index_metadata.file_size > 0 {
let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
let puffin_path = &upload_request
.dest_path_provider
.build_index_file_path(sst.file_id);
self.upload(puffin_key, puffin_path, remote_store).await?;
}
}
Ok(sst_info)
}
/// Removes a file from the cache by `index_key`.
@@ -319,10 +327,8 @@ impl WriteCache {
/// Request to write and upload a SST.
pub struct SstUploadRequest {
/// Path to upload the file.
pub upload_path: String,
/// Path to upload the index file.
pub index_upload_path: String,
/// Destination path provider deciding where SST files in the write cache should be uploaded.
pub dest_path_provider: RegionFilePathFactory,
/// Remote object store to upload.
pub remote_store: ObjectStore,
}
@@ -336,11 +342,9 @@ mod tests {
use crate::cache::test_util::new_fs_store;
use crate::cache::{CacheManager, CacheStrategy};
use crate::region::options::IndexOptions;
use crate::sst::file::FileId;
use crate::sst::location::{index_file_path, sst_file_path};
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::test_util::sst_util::{
assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle,
assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
sst_region_metadata,
};
use crate::test_util::TestEnv;
@@ -351,9 +355,9 @@ mod tests {
// and now just use local file system to mock.
let mut env = TestEnv::new();
let mock_store = env.init_object_store_manager();
let file_id = FileId::random();
let upload_path = sst_file_path("test", file_id);
let index_upload_path = index_file_path("test", file_id);
let path_provider = RegionFilePathFactory {
region_dir: "test".to_string(),
};
let local_dir = create_temp_dir("");
let local_store = new_fs_store(local_dir.path().to_str().unwrap());
@@ -373,7 +377,6 @@ mod tests {
let write_request = SstWriteRequest {
op_type: OperationType::Flush,
file_id,
metadata,
source,
storage: None,
@@ -386,8 +389,7 @@ mod tests {
};
let upload_request = SstUploadRequest {
upload_path: upload_path.clone(),
index_upload_path: index_upload_path.clone(),
dest_path_provider: path_provider.clone(),
remote_store: mock_store.clone(),
};
@@ -397,18 +399,22 @@ mod tests {
};
// Write to cache and upload sst to mock remote store
write_cache
let sst_info = write_cache
.write_and_upload_sst(write_request, upload_request, &write_opts)
.await
.unwrap()
.unwrap();
.remove(0); // TODO(hl): we assume it only creates one file.
let file_id = sst_info.file_id;
let sst_upload_path = path_provider.build_sst_file_path(file_id);
let index_upload_path = path_provider.build_index_file_path(file_id);
// Check write cache contains the key
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
assert!(write_cache.file_cache.contains_key(&key));
// Check file data
let remote_data = mock_store.read(&upload_path).await.unwrap();
let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
let cache_data = local_store
.read(&write_cache.file_cache.cache_file_path(key))
.await
@@ -436,6 +442,7 @@ mod tests {
#[tokio::test]
async fn test_read_metadata_from_write_cache() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let data_home = env.data_home().display().to_string();
let mock_store = env.init_object_store_manager();
@@ -456,8 +463,7 @@ mod tests {
// Create source
let metadata = Arc::new(sst_region_metadata());
let handle = sst_file_handle(0, 1000);
let file_id = handle.file_id();
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
@@ -467,7 +473,6 @@ mod tests {
// Write to local cache and upload sst to mock remote store
let write_request = SstWriteRequest {
op_type: OperationType::Flush,
file_id,
metadata,
source,
storage: None,
@@ -482,11 +487,10 @@ mod tests {
row_group_size: 512,
..Default::default()
};
let upload_path = sst_file_path(&data_home, file_id);
let index_upload_path = index_file_path(&data_home, file_id);
let upload_request = SstUploadRequest {
upload_path: upload_path.clone(),
index_upload_path: index_upload_path.clone(),
dest_path_provider: RegionFilePathFactory {
region_dir: data_home.clone(),
},
remote_store: mock_store.clone(),
};
@@ -494,10 +498,11 @@ mod tests {
.write_and_upload_sst(write_request, upload_request, &write_opts)
.await
.unwrap()
.unwrap();
.remove(0);
let write_parquet_metadata = sst_info.file_metadata.unwrap();
// Read metadata from write cache
let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone())
.cache(CacheStrategy::EnableAll(cache_manager.clone()));
let reader = builder.build().await.unwrap();

View File

@@ -68,7 +68,7 @@ use crate::schedule::remote_job_scheduler::{
CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;
@@ -596,7 +596,6 @@ impl CompactionStatus {
#[derive(Debug, Clone)]
pub struct CompactionOutput {
pub output_file_id: FileId,
/// Compaction output file level.
pub output_level: Level,
/// Compaction input files.
@@ -610,7 +609,6 @@ pub struct CompactionOutput {
/// SerializedCompactionOutput is a serialized version of [CompactionOutput] by replacing [FileHandle] with [FileMeta].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
output_file_id: FileId,
output_level: Level,
inputs: Vec<FileMeta>,
filter_deleted: bool,

View File

@@ -20,6 +20,7 @@ use api::v1::region::compact_request;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{info, warn};
use common_time::TimeToLive;
use itertools::Itertools;
use object_store::manager::ObjectStoreManagerRef;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -278,19 +279,6 @@ impl Compactor for DefaultCompactor {
for output in picker_output.outputs.drain(..) {
compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
info!(
"Compaction region {} output [{}]-> {}",
compaction_region.region_id,
output
.inputs
.iter()
.map(|f| f.file_id().to_string())
.collect::<Vec<_>>()
.join(","),
output.output_file_id
);
let write_opts = WriteOptions {
write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
..Default::default()
@@ -299,7 +287,6 @@ impl Compactor for DefaultCompactor {
let region_metadata = compaction_region.region_metadata.clone();
let sst_layer = compaction_region.access_layer.clone();
let region_id = compaction_region.region_id;
let file_id = output.output_file_id;
let cache_manager = compaction_region.cache_manager.clone();
let storage = compaction_region.region_options.storage.clone();
let index_options = compaction_region
@@ -320,6 +307,11 @@ impl Compactor for DefaultCompactor {
.max()
.flatten();
futs.push(async move {
let input_file_names = output
.inputs
.iter()
.map(|f| f.file_id().to_string())
.join(",");
let reader = CompactionSstReaderBuilder {
metadata: region_metadata.clone(),
sst_layer: sst_layer.clone(),
@@ -332,11 +324,10 @@ impl Compactor for DefaultCompactor {
}
.build_sst_reader()
.await?;
let file_meta_opt = sst_layer
let output_files = sst_layer
.write_sst(
SstWriteRequest {
op_type: OperationType::Compact,
file_id,
metadata: region_metadata,
source: Source::Reader(reader),
cache_manager,
@@ -350,9 +341,10 @@ impl Compactor for DefaultCompactor {
&write_opts,
)
.await?
.into_iter()
.map(|sst_info| FileMeta {
region_id,
file_id,
file_id: sst_info.file_id,
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
@@ -361,8 +353,15 @@ impl Compactor for DefaultCompactor {
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: max_sequence,
});
Ok(file_meta_opt)
})
.collect::<Vec<_>>();
let output_file_names =
output_files.iter().map(|f| f.file_id.to_string()).join(",");
info!(
"Region {} compaction inputs: [{}], outputs: [{}]",
region_id, input_file_names, output_file_names
);
Ok(output_files)
});
}
let mut output_files = Vec::with_capacity(futs.len());
@@ -377,7 +376,7 @@ impl Compactor for DefaultCompactor {
.await
.context(JoinSnafu)?
.into_iter()
.collect::<Result<Vec<_>>>()?;
.collect::<Result<Vec<Vec<_>>>>()?;
output_files.extend(metas.into_iter().flatten());
}

View File

@@ -61,7 +61,6 @@ impl From<&PickerOutput> for SerializedPickerOutput {
.outputs
.iter()
.map(|output| SerializedCompactionOutput {
output_file_id: output.output_file_id,
output_level: output.output_level,
inputs: output.inputs.iter().map(|s| s.meta_ref().clone()).collect(),
filter_deleted: output.filter_deleted,
@@ -91,7 +90,6 @@ impl PickerOutput {
.outputs
.into_iter()
.map(|output| CompactionOutput {
output_file_id: output.output_file_id,
output_level: output.output_level,
inputs: output
.inputs
@@ -167,14 +165,12 @@ mod tests {
let picker_output = PickerOutput {
outputs: vec![
CompactionOutput {
output_file_id: FileId::random(),
output_level: 0,
inputs: inputs_file_handle.clone(),
filter_deleted: false,
output_time_range: None,
},
CompactionOutput {
output_file_id: FileId::random(),
output_level: 0,
inputs: inputs_file_handle.clone(),
filter_deleted: false,
@@ -205,7 +201,6 @@ mod tests {
.iter()
.zip(picker_output_from_serialized.outputs.iter())
.for_each(|(expected, actual)| {
assert_eq!(expected.output_file_id, actual.output_file_id);
assert_eq!(expected.output_level, actual.output_level);
expected
.inputs

View File

@@ -26,7 +26,7 @@ use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::{Picker, PickerOutput};
use crate::compaction::run::{find_sorted_runs, reduce_runs, Item};
use crate::compaction::{get_expired_ssts, CompactionOutput};
use crate::sst::file::{overlaps, FileHandle, FileId, Level};
use crate::sst::file::{overlaps, FileHandle, Level};
use crate::sst::version::LevelMeta;
const LEVEL_COMPACTED: Level = 1;
@@ -134,7 +134,6 @@ impl TwcsPicker {
for input in split_inputs {
debug_assert!(input.len() > 1);
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: LEVEL_COMPACTED, // always compact to l1
inputs: input,
filter_deleted,
@@ -373,7 +372,7 @@ mod tests {
use super::*;
use crate::compaction::test_util::{new_file_handle, new_file_handles};
use crate::sst::file::{FileMeta, Level};
use crate::sst::file::{FileId, FileMeta, Level};
use crate::test_util::NoopFilePurger;
#[test]

View File

@@ -26,7 +26,7 @@ use crate::compaction::buckets::infer_time_bucket;
use crate::compaction::compactor::{CompactionRegion, CompactionVersion};
use crate::compaction::picker::{Picker, PickerOutput};
use crate::compaction::{get_expired_ssts, CompactionOutput};
use crate::sst::file::{FileHandle, FileId};
use crate::sst::file::FileHandle;
/// Compaction picker that splits the time range of all involved files into windows, and merges
/// the data segments that intersect those windows so that the output files
@@ -132,7 +132,6 @@ fn build_output(windows: BTreeMap<i64, (i64, Vec<FileHandle>)>) -> Vec<Compactio
);
let output = CompactionOutput {
output_file_id: FileId::random(),
output_level: 1,
inputs: files,
filter_deleted: false,

View File

@@ -45,7 +45,7 @@ use crate::request::{
SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::{FileId, FileMeta};
use crate::sst::file::FileMeta;
use crate::sst::parquet::WriteOptions;
use crate::worker::WorkerListener;
@@ -347,14 +347,12 @@ impl RegionFlushTask {
}
let max_sequence = mem.stats().max_sequence();
let file_id = FileId::random();
let iter = mem.iter(None, None, None)?;
let source = Source::Iter(iter);
// Flush to level 0.
let write_request = SstWriteRequest {
op_type: OperationType::Flush,
file_id,
metadata: version.metadata.clone(),
source,
cache_manager: self.cache_manager.clone(),
@@ -365,29 +363,31 @@ impl RegionFlushTask {
fulltext_index_config: self.engine_config.fulltext_index.clone(),
bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
};
let Some(sst_info) = self
let ssts_written = self
.access_layer
.write_sst(write_request, &write_opts)
.await?
else {
.await?;
if ssts_written.is_empty() {
// No data written.
continue;
};
}
flushed_bytes += sst_info.file_size;
let file_meta = FileMeta {
region_id: self.region_id,
file_id,
time_range: sst_info.time_range,
level: 0,
file_size: sst_info.file_size,
available_indexes: sst_info.index_metadata.build_available_indexes(),
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: NonZeroU64::new(max_sequence),
};
file_metas.push(file_meta);
file_metas.extend(ssts_written.into_iter().map(|sst_info| {
flushed_bytes += sst_info.file_size;
FileMeta {
region_id: self.region_id,
file_id: sst_info.file_id,
time_range: sst_info.time_range,
level: 0,
file_size: sst_info.file_size,
available_indexes: sst_info.index_metadata.build_available_indexes(),
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: NonZeroU64::new(max_sequence),
}
}));
}
if !file_metas.is_empty() {

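The flush path now turns every `SstInfo` returned by the writer into a `FileMeta`, accumulating `flushed_bytes` along the way. A standalone sketch of that accumulate-while-mapping pattern with simplified stand-in structs (not the crate's real types):

struct SstInfo {
    file_size: u64,
    num_rows: usize,
}

struct FileMeta {
    file_size: u64,
    num_rows: u64,
}

fn collect_metas(ssts_written: Vec<SstInfo>) -> (Vec<FileMeta>, u64) {
    let mut flushed_bytes = 0;
    let mut file_metas = Vec::new();
    // The closure builds each metadata entry and updates the running byte count in one pass.
    file_metas.extend(ssts_written.into_iter().map(|sst| {
        flushed_bytes += sst.file_size;
        FileMeta {
            file_size: sst.file_size,
            num_rows: sst.num_rows as u64,
        }
    }));
    (file_metas, flushed_bytes)
}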
View File

@@ -62,7 +62,7 @@ impl BulkMemtable {
alloc_tracker: AllocTracker::new(write_buffer_manager),
max_timestamp: AtomicI64::new(i64::MIN),
min_timestamp: AtomicI64::new(i64::MAX),
max_sequence: Default::default,
max_sequence: Default::default(),
num_rows: Default::default(),
}
}

View File

@@ -55,7 +55,7 @@ use crate::sst::to_sst_arrow_schema;
#[derive(Debug)]
pub struct BulkPart {
data: Bytes,
pub(crate) data: Bytes,
metadata: BulkPartMeta,
}

View File

@@ -394,6 +394,7 @@ mod tests {
sequence: START_SEQ,
rows: Some(rows),
write_hint: None,
bulk: Vec::new(),
}
}
@@ -432,6 +433,7 @@ mod tests {
sequence: 100,
rows: None,
write_hint: None,
bulk: Vec::new(),
};
let kvs = KeyValues::new(&meta, mutation);
assert!(kvs.is_none());

View File

@@ -731,6 +731,7 @@ mod tests {
rows,
}),
write_hint: None,
bulk: Vec::new(),
};
KeyValues::new(metadata.as_ref(), mutation).unwrap()
}

View File

@@ -26,10 +26,12 @@ use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef};
use datatypes::prelude::{MutableVector, Vector, VectorRef};
use datatypes::types::TimestampType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
Helper, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder,
Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
TimestampSecondVector, UInt64Vector, UInt8Vector,
};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
@@ -691,22 +693,23 @@ impl Series {
/// `ValueBuilder` holds all the vector builders for field columns.
struct ValueBuilder {
timestamp: Box<dyn MutableVector>,
sequence: UInt64VectorBuilder,
op_type: UInt8VectorBuilder,
timestamp: Vec<i64>,
timestamp_type: ConcreteDataType,
sequence: Vec<u64>,
op_type: Vec<u8>,
fields: Vec<Option<Box<dyn MutableVector>>>,
field_types: Vec<ConcreteDataType>,
}
impl ValueBuilder {
fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
let timestamp = region_metadata
let timestamp_type = region_metadata
.time_index_column()
.column_schema
.data_type
.create_mutable_vector(capacity);
let sequence = UInt64VectorBuilder::with_capacity(capacity);
let op_type = UInt8VectorBuilder::with_capacity(capacity);
.clone();
let sequence = Vec::with_capacity(capacity);
let op_type = Vec::with_capacity(capacity);
let field_types = region_metadata
.field_columns()
@@ -715,7 +718,8 @@ impl ValueBuilder {
let fields = (0..field_types.len()).map(|_| None).collect();
Self {
timestamp,
timestamp: Vec::with_capacity(capacity),
timestamp_type,
sequence,
op_type,
fields,
@@ -727,9 +731,10 @@ impl ValueBuilder {
/// We don't need primary keys since they've already been encoded.
fn push(&mut self, ts: ValueRef, sequence: u64, op_type: u8, fields: Vec<ValueRef>) {
debug_assert_eq!(fields.len(), self.fields.len());
self.timestamp.push_value_ref(ts);
self.sequence.push_value_ref(ValueRef::UInt64(sequence));
self.op_type.push_value_ref(ValueRef::UInt8(op_type));
self.timestamp
.push(ts.as_timestamp().unwrap().unwrap().value());
self.sequence.push(sequence);
self.op_type.push(op_type);
let num_rows = self.timestamp.len();
for (idx, field_value) in fields.into_iter().enumerate() {
if !field_value.is_null() || self.fields[idx].is_some() {
@@ -844,9 +849,23 @@ impl From<ValueBuilder> for Values {
}
})
.collect::<Vec<_>>();
let sequence = Arc::new(value.sequence.finish());
let op_type = Arc::new(value.op_type.finish());
let timestamp = value.timestamp.to_vector();
let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
let timestamp: VectorRef = match value.timestamp_type {
ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
Arc::new(TimestampSecondVector::from_vec(value.timestamp))
}
ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
}
ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
}
ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
}
_ => unreachable!(),
};
if cfg!(debug_assertions) {
debug_assert_eq!(timestamp.len(), sequence.len());
@@ -1167,6 +1186,7 @@ mod tests {
rows,
}),
write_hint: None,
bulk: Vec::new(),
};
KeyValues::new(schema.as_ref(), mutation).unwrap()
}

View File

@@ -544,10 +544,12 @@ where
.as_ref()
.map(|rows| rows.rows.len())
.unwrap_or(0);
// TODO(yingwen): We need to support schema change as bulk may have different schema.
region_write_ctx.push_mutation(
mutation.op_type,
mutation.rows,
mutation.write_hint,
mutation.bulk,
OptionOutputTx::none(),
);
}

View File

@@ -136,6 +136,7 @@ impl RegionWriteCtx {
op_type: i32,
rows: Option<Rows>,
write_hint: Option<WriteHint>,
bulk: Vec<u8>,
tx: OptionOutputTx,
) {
let num_rows = rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0);
@@ -144,6 +145,7 @@ impl RegionWriteCtx {
sequence: self.next_sequence,
rows,
write_hint,
bulk,
});
let notify = WriteNotify::new(tx, num_rows);

View File

@@ -105,7 +105,6 @@ pub struct Indexer {
file_id: FileId,
file_path: String,
region_id: RegionId,
puffin_manager: Option<SstPuffinManager>,
inverted_indexer: Option<InvertedIndexer>,
last_mem_inverted_index: usize,
@@ -168,11 +167,15 @@ impl Indexer {
}
}
pub(crate) struct IndexerBuilder<'a> {
#[async_trait::async_trait]
pub trait IndexerBuilder {
/// Builds an indexer for the given file id, writing to [index_file_path].
async fn build(&self, file_id: FileId, index_file_path: String) -> Indexer;
}
pub(crate) struct IndexerBuilderImpl {
pub(crate) op_type: OperationType,
pub(crate) file_id: FileId,
pub(crate) file_path: String,
pub(crate) metadata: &'a RegionMetadataRef,
pub(crate) metadata: RegionMetadataRef,
pub(crate) row_group_size: usize,
pub(crate) puffin_manager: SstPuffinManager,
pub(crate) intermediate_manager: IntermediateManager,
@@ -182,20 +185,20 @@ pub(crate) struct IndexerBuilder<'a> {
pub(crate) bloom_filter_index_config: BloomFilterConfig,
}
impl IndexerBuilder<'_> {
#[async_trait::async_trait]
impl IndexerBuilder for IndexerBuilderImpl {
/// Sanity check for arguments and create a new [Indexer] if arguments are valid.
pub(crate) async fn build(self) -> Indexer {
async fn build(&self, file_id: FileId, index_file_path: String) -> Indexer {
let mut indexer = Indexer {
file_id: self.file_id,
file_path: self.file_path.clone(),
file_id,
file_path: index_file_path,
region_id: self.metadata.region_id,
..Default::default()
};
indexer.inverted_indexer = self.build_inverted_indexer();
indexer.fulltext_indexer = self.build_fulltext_indexer().await;
indexer.bloom_filter_indexer = self.build_bloom_filter_indexer();
indexer.inverted_indexer = self.build_inverted_indexer(file_id);
indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
if indexer.inverted_indexer.is_none()
&& indexer.fulltext_indexer.is_none()
&& indexer.bloom_filter_indexer.is_none()
@@ -204,11 +207,13 @@ impl IndexerBuilder<'_> {
return Indexer::default();
}
indexer.puffin_manager = Some(self.puffin_manager);
indexer.puffin_manager = Some(self.puffin_manager.clone());
indexer
}
}
fn build_inverted_indexer(&self) -> Option<InvertedIndexer> {
impl IndexerBuilderImpl {
fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
let create = match self.op_type {
OperationType::Flush => self.inverted_index_config.create_on_flush.auto(),
OperationType::Compact => self.inverted_index_config.create_on_compaction.auto(),
@@ -217,7 +222,7 @@ impl IndexerBuilder<'_> {
if !create {
debug!(
"Skip creating inverted index due to config, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
return None;
}
@@ -228,7 +233,7 @@ impl IndexerBuilder<'_> {
if indexed_column_ids.is_empty() {
debug!(
"No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
return None;
}
@@ -238,7 +243,7 @@ impl IndexerBuilder<'_> {
else {
warn!(
"Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
return None;
};
@@ -246,7 +251,7 @@ impl IndexerBuilder<'_> {
let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
warn!(
"Row group size is 0, skip creating index, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
return None;
};
@@ -257,8 +262,8 @@ impl IndexerBuilder<'_> {
}
let indexer = InvertedIndexer::new(
self.file_id,
self.metadata,
file_id,
&self.metadata,
self.intermediate_manager.clone(),
self.inverted_index_config.mem_threshold_on_create(),
segment_row_count,
@@ -268,7 +273,7 @@ impl IndexerBuilder<'_> {
Some(indexer)
}
async fn build_fulltext_indexer(&self) -> Option<FulltextIndexer> {
async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
let create = match self.op_type {
OperationType::Flush => self.fulltext_index_config.create_on_flush.auto(),
OperationType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
@@ -277,7 +282,7 @@ impl IndexerBuilder<'_> {
if !create {
debug!(
"Skip creating full-text index due to config, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
return None;
}
@@ -285,9 +290,9 @@ impl IndexerBuilder<'_> {
let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
let creator = FulltextIndexer::new(
&self.metadata.region_id,
&self.file_id,
&file_id,
&self.intermediate_manager,
self.metadata,
&self.metadata,
self.fulltext_index_config.compress,
mem_limit,
)
@@ -298,7 +303,7 @@ impl IndexerBuilder<'_> {
if creator.is_none() {
debug!(
"Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
}
return creator;
@@ -309,19 +314,19 @@ impl IndexerBuilder<'_> {
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
self.metadata.region_id, self.file_id, err
self.metadata.region_id, file_id, err
);
} else {
warn!(
err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
}
None
}
fn build_bloom_filter_indexer(&self) -> Option<BloomFilterIndexer> {
fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
let create = match self.op_type {
OperationType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
OperationType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
@@ -330,15 +335,15 @@ impl IndexerBuilder<'_> {
if !create {
debug!(
"Skip creating bloom filter due to config, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
return None;
}
let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
let indexer = BloomFilterIndexer::new(
self.file_id,
self.metadata,
file_id,
&self.metadata,
self.intermediate_manager.clone(),
mem_limit,
);
@@ -348,7 +353,7 @@ impl IndexerBuilder<'_> {
if indexer.is_none() {
debug!(
"Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
}
return indexer;
@@ -359,12 +364,12 @@ impl IndexerBuilder<'_> {
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
self.metadata.region_id, self.file_id, err
self.metadata.region_id, file_id, err
);
} else {
warn!(
err; "Failed to create bloom filter, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
}
@@ -490,11 +495,9 @@ mod tests {
with_fulltext: true,
with_skipping_bloom: true,
});
let indexer = IndexerBuilder {
let indexer = IndexerBuilderImpl {
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager,
@@ -503,7 +506,7 @@ mod tests {
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.build(FileId::random(), "test".to_string())
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -522,11 +525,9 @@ mod tests {
with_fulltext: true,
with_skipping_bloom: true,
});
let indexer = IndexerBuilder {
let indexer = IndexerBuilderImpl {
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager.clone(),
@@ -538,18 +539,16 @@ mod tests {
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.build(FileId::random(), "test".to_string())
.await;
assert!(indexer.inverted_indexer.is_none());
assert!(indexer.fulltext_indexer.is_some());
assert!(indexer.bloom_filter_indexer.is_some());
let indexer = IndexerBuilder {
let indexer = IndexerBuilderImpl {
op_type: OperationType::Compact,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager.clone(),
@@ -561,18 +560,16 @@ mod tests {
},
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.build(FileId::random(), "test".to_string())
.await;
assert!(indexer.inverted_indexer.is_some());
assert!(indexer.fulltext_indexer.is_none());
assert!(indexer.bloom_filter_indexer.is_some());
let indexer = IndexerBuilder {
let indexer = IndexerBuilderImpl {
op_type: OperationType::Compact,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager,
@@ -584,7 +581,7 @@ mod tests {
..Default::default()
},
}
.build()
.build(FileId::random(), "test".to_string())
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -603,11 +600,9 @@ mod tests {
with_fulltext: true,
with_skipping_bloom: true,
});
let indexer = IndexerBuilder {
let indexer = IndexerBuilderImpl {
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager.clone(),
@@ -616,7 +611,7 @@ mod tests {
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.build(FileId::random(), "test".to_string())
.await;
assert!(indexer.inverted_indexer.is_none());
@@ -628,11 +623,9 @@ mod tests {
with_fulltext: false,
with_skipping_bloom: true,
});
let indexer = IndexerBuilder {
let indexer = IndexerBuilderImpl {
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager.clone(),
@@ -641,7 +634,7 @@ mod tests {
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.build(FileId::random(), "test".to_string())
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -653,11 +646,9 @@ mod tests {
with_fulltext: true,
with_skipping_bloom: false,
});
let indexer = IndexerBuilder {
let indexer = IndexerBuilderImpl {
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager,
@@ -666,7 +657,7 @@ mod tests {
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.build(FileId::random(), "test".to_string())
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -685,11 +676,9 @@ mod tests {
with_fulltext: true,
with_skipping_bloom: true,
});
let indexer = IndexerBuilder {
let indexer = IndexerBuilderImpl {
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
metadata,
row_group_size: 0,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager,
@@ -698,7 +687,7 @@ mod tests {
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.build(FileId::random(), "test".to_string())
.await;
assert!(indexer.inverted_indexer.is_none());

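`IndexerBuilder` is now a trait rather than a concrete struct, so writers can be handed any indexer factory, including a no-op one in tests (see `NoopIndexBuilder` in the parquet writer tests below). A standalone sketch of that trait shape using the `async-trait` crate, with `Indexer` reduced to a placeholder:

use async_trait::async_trait;

/// Placeholder for mito2's real `Indexer`.
#[derive(Default)]
struct Indexer;

#[async_trait]
trait IndexerBuilder {
    /// Builds an indexer for the given file id, writing index data to `index_file_path`.
    async fn build(&self, file_id: &str, index_file_path: String) -> Indexer;
}

struct NoopIndexerBuilder;

#[async_trait]
impl IndexerBuilder for NoopIndexerBuilder {
    async fn build(&self, _file_id: &str, _index_file_path: String) -> Indexer {
        Indexer::default()
    }
}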
View File

@@ -19,7 +19,7 @@ use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;
use crate::sst::file::FileTimeRange;
use crate::sst::file::{FileId, FileTimeRange};
use crate::sst::index::IndexOutput;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
@@ -62,6 +62,8 @@ impl Default for WriteOptions {
/// Parquet SST info returned by the writer.
pub struct SstInfo {
/// SST file id.
pub file_id: FileId,
/// Time range of the SST. The timestamps have the same time unit as the
/// data in the SST.
pub time_range: FileTimeRange,
@@ -95,12 +97,13 @@ mod tests {
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use super::*;
use crate::access_layer::FilePathProvider;
use crate::cache::{CacheManager, CacheStrategy, PageKey};
use crate::sst::index::Indexer;
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::DEFAULT_WRITE_CONCURRENCY;
use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
use crate::test_util::sst_util::{
assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
@@ -109,12 +112,38 @@ mod tests {
const FILE_DIR: &str = "/";
#[derive(Clone)]
struct FixedPathProvider {
file_id: FileId,
}
impl FilePathProvider for FixedPathProvider {
fn build_index_file_path(&self, _file_id: FileId) -> String {
location::index_file_path(FILE_DIR, self.file_id)
}
fn build_sst_file_path(&self, _file_id: FileId) -> String {
location::sst_file_path(FILE_DIR, self.file_id)
}
}
struct NoopIndexBuilder;
#[async_trait::async_trait]
impl IndexerBuilder for NoopIndexBuilder {
async fn build(&self, _file_id: FileId, _path: String) -> Indexer {
Indexer::default()
}
}
#[tokio::test]
async fn test_write_read() {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);
let file_path = FixedPathProvider {
file_id: handle.file_id(),
};
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
@@ -126,18 +155,20 @@ mod tests {
row_group_size: 50,
..Default::default()
};
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
NoopIndexBuilder,
file_path,
metadata,
Indexer::default(),
);
)
.await;
let info = writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.unwrap();
.remove(0);
assert_eq!(200, info.num_rows);
assert!(info.file_size > 0);
assert_eq!(
@@ -168,7 +199,6 @@ mod tests {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
@@ -183,16 +213,19 @@ mod tests {
// Prepare data.
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
file_path,
metadata.clone(),
Indexer::default(),
);
NoopIndexBuilder,
FixedPathProvider {
file_id: handle.file_id(),
},
)
.await;
writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.unwrap();
.remove(0);
// Enable page cache.
let cache = CacheStrategy::EnableAll(Arc::new(
@@ -236,7 +269,6 @@ mod tests {
let mut env = crate::test_util::TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
@@ -252,16 +284,19 @@ mod tests {
// sst info contains the parquet metadata, which is converted from FileMetaData
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
file_path,
metadata.clone(),
Indexer::default(),
);
NoopIndexBuilder,
FixedPathProvider {
file_id: handle.file_id(),
},
)
.await;
let sst_info = writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.expect("write_all should return sst info");
.remove(0);
let writer_metadata = sst_info.file_metadata.unwrap();
// read the sst file metadata
@@ -277,7 +312,6 @@ mod tests {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
@@ -292,15 +326,18 @@ mod tests {
// Prepare data.
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
file_path,
metadata.clone(),
Indexer::default(),
);
NoopIndexBuilder,
FixedPathProvider {
file_id: handle.file_id(),
},
)
.await;
writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.unwrap();
.remove(0);
// Predicate
let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
@@ -330,7 +367,6 @@ mod tests {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "z"], 0, 0),
@@ -345,15 +381,18 @@ mod tests {
// Prepare data.
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
file_path,
metadata.clone(),
Indexer::default(),
);
NoopIndexBuilder,
FixedPathProvider {
file_id: handle.file_id(),
},
)
.await;
writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.unwrap();
.remove(0);
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
let mut reader = builder.build().await.unwrap();
@@ -365,7 +404,6 @@ mod tests {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
@@ -380,16 +418,19 @@ mod tests {
// Prepare data.
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
file_path,
metadata.clone(),
Indexer::default(),
);
NoopIndexBuilder,
FixedPathProvider {
file_id: handle.file_id(),
},
)
.await;
writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.unwrap();
.remove(0);
// Predicate
let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {

View File

@@ -28,6 +28,7 @@ use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
@@ -35,40 +36,48 @@ use store_api::storage::SequenceNumber;
use tokio::io::AsyncWrite;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
use crate::access_layer::{FilePathProvider, SstInfoArray};
use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu};
use crate::read::{Batch, Source};
use crate::sst::index::Indexer;
use crate::sst::file::FileId;
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
/// Parquet SST writer.
pub struct ParquetWriter<F: WriterFactory> {
pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
/// Path provider that creates SST and index file paths according to file id.
path_provider: P,
writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
/// Current active file id.
current_file: FileId,
writer_factory: F,
/// Region metadata of the source and the target SST.
metadata: RegionMetadataRef,
indexer: Indexer,
/// Indexer builder that can create indexers for multiple files.
indexer_builder: I,
/// Current active indexer.
current_indexer: Option<Indexer>,
bytes_written: Arc<AtomicUsize>,
}
pub trait WriterFactory {
type Writer: AsyncWrite + Send + Unpin;
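    /// Creates a writer for the given file path. The path is now passed per
    /// call so a single factory can produce writers for multiple SST files.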
fn create(&mut self) -> impl Future<Output = Result<Self::Writer>>;
fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}
pub struct ObjectStoreWriterFactory {
path: String,
object_store: ObjectStore,
}
impl WriterFactory for ObjectStoreWriterFactory {
type Writer = Compat<FuturesAsyncWriter>;
async fn create(&mut self) -> Result<Self::Writer> {
async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
self.object_store
.writer_with(&self.path)
.writer_with(file_path)
.chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
.concurrent(DEFAULT_WRITE_CONCURRENCY)
.await
@@ -77,36 +86,73 @@ impl WriterFactory for ObjectStoreWriterFactory {
}
}
impl ParquetWriter<ObjectStoreWriterFactory> {
pub fn new_with_object_store(
impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>
where
P: FilePathProvider,
I: IndexerBuilder,
{
pub async fn new_with_object_store(
object_store: ObjectStore,
path: String,
metadata: RegionMetadataRef,
indexer: Indexer,
) -> ParquetWriter<ObjectStoreWriterFactory> {
indexer_builder: I,
path_provider: P,
) -> ParquetWriter<ObjectStoreWriterFactory, I, P> {
ParquetWriter::new(
ObjectStoreWriterFactory { path, object_store },
ObjectStoreWriterFactory { object_store },
metadata,
indexer,
indexer_builder,
path_provider,
)
.await
}
}
impl<F> ParquetWriter<F>
impl<F, I, P> ParquetWriter<F, I, P>
where
F: WriterFactory,
I: IndexerBuilder,
P: FilePathProvider,
{
/// Creates a new parquet SST writer.
pub fn new(factory: F, metadata: RegionMetadataRef, indexer: Indexer) -> ParquetWriter<F> {
pub async fn new(
factory: F,
metadata: RegionMetadataRef,
indexer_builder: I,
path_provider: P,
) -> ParquetWriter<F, I, P> {
let init_file = FileId::random();
let index_file_path = path_provider.build_index_file_path(init_file);
let indexer = indexer_builder.build(init_file, index_file_path).await;
ParquetWriter {
path_provider,
writer: None,
current_file: init_file,
writer_factory: factory,
metadata,
indexer,
indexer_builder,
current_indexer: Some(indexer),
bytes_written: Arc::new(AtomicUsize::new(0)),
}
}
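    /// Returns the active indexer, building a new one (with a fresh file id and
    /// index file path) when none is currently active.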
async fn get_or_create_indexer(&mut self) -> &mut Indexer {
match self.current_indexer {
None => {
self.current_file = FileId::random();
let index_file_path = self.path_provider.build_index_file_path(self.current_file);
let indexer = self
.indexer_builder
.build(self.current_file, index_file_path)
.await;
self.current_indexer = Some(indexer);
// safety: self.current_indexer already set above.
self.current_indexer.as_mut().unwrap()
}
Some(ref mut indexer) => indexer,
}
}
/// Iterates source and writes all rows to Parquet file.
///
/// Returns the [SstInfo]s of the written SSTs; the returned array is empty if nothing is written.
@@ -115,7 +161,7 @@ where
mut source: Source,
override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
) -> Result<SstInfoArray> {
let write_format =
WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence);
let mut stats = SourceStats::default();
@@ -128,24 +174,24 @@ where
match res {
Ok(mut batch) => {
stats.update(&batch);
self.indexer.update(&mut batch).await;
self.get_or_create_indexer().await.update(&mut batch).await;
}
Err(e) => {
self.indexer.abort().await;
self.get_or_create_indexer().await.abort().await;
return Err(e);
}
}
}
let index_output = self.indexer.finish().await;
let index_output = self.get_or_create_indexer().await.finish().await;
if stats.num_rows == 0 {
return Ok(None);
return Ok(smallvec![]);
}
let Some(mut arrow_writer) = self.writer.take() else {
// No batch actually written.
return Ok(None);
return Ok(smallvec![]);
};
arrow_writer.flush().await.context(WriteParquetSnafu)?;
@@ -159,15 +205,18 @@ where
// convert FileMetaData to ParquetMetaData
let parquet_metadata = parse_parquet_metadata(file_meta)?;
let file_id = self.current_file;
// object_store.write will make sure all bytes are written or an error is raised.
Ok(Some(SstInfo {
Ok(smallvec![SstInfo {
file_id,
time_range,
file_size,
num_rows: stats.num_rows,
num_row_groups: parquet_metadata.num_row_groups() as u64,
file_metadata: Some(Arc::new(parquet_metadata)),
index_metadata: index_output,
}))
}])
}
/// Customizes per-column config according to schema and maybe column cardinality.
@@ -229,8 +278,9 @@ where
let props_builder = Self::customize_column_config(props_builder, &self.metadata);
let writer_props = props_builder.build();
let sst_file_path = self.path_provider.build_sst_file_path(self.current_file);
let writer = SizeAwareWriter::new(
self.writer_factory.create().await?,
self.writer_factory.create(&sst_file_path).await?,
self.bytes_written.clone(),
);
let arrow_writer =
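Taken together, the hunks above change both the writer's construction and its return type: callers now supply an IndexerBuilder plus a FilePathProvider instead of a fixed path and a single Indexer, and write_all returns an SstInfoArray rather than Option<SstInfo>. A minimal sketch of the new call shape, reusing the NoopIndexBuilder and FixedPathProvider test doubles from the test module earlier in this diff (the surrounding object_store, metadata, source and write_opts setup is assumed, not shown here):
let mut writer = ParquetWriter::new_with_object_store(
    object_store.clone(),
    metadata.clone(),
    NoopIndexBuilder,
    FixedPathProvider {
        file_id: FileId::random(),
    },
)
.await;
// `write_all` now yields zero or more SSTs instead of an `Option<SstInfo>`.
for sst in writer.write_all(source, None, &write_opts).await? {
    println!("wrote {} rows into file {:?}", sst.num_rows, sst.file_id);
}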

View File

@@ -290,6 +290,7 @@ pub(crate) fn build_key_values_with_ts_seq_values(
rows,
}),
write_hint: None,
bulk: Vec::new(),
};
KeyValues::new(metadata.as_ref(), mutation).unwrap()
}

View File

@@ -104,13 +104,13 @@ pub fn new_source(batches: &[Batch]) -> Source {
Source::Reader(Box::new(reader))
}
/// Creates a new [FileHandle] for a SST.
pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle {
/// Creates an SST file handle with the provided file id.
pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64) -> FileHandle {
let file_purger = new_noop_file_purger();
FileHandle::new(
FileMeta {
region_id: REGION_ID,
file_id: FileId::random(),
file_id,
time_range: (
Timestamp::new_millisecond(start_ms),
Timestamp::new_millisecond(end_ms),
@@ -127,6 +127,11 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle {
)
}
/// Creates a new [FileHandle] for a SST.
pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle {
sst_file_handle_with_file_id(FileId::random(), start_ms, end_ms)
}
pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
assert!(end >= start);
let pk = new_primary_key(tags);

View File

@@ -166,6 +166,7 @@ pub(crate) fn write_rows_to_version(
sequence: start_ts as u64, // The sequence may be incorrect, but it's fine in test.
rows: Some(rows),
write_hint: None,
bulk: Vec::new(),
};
let key_values = KeyValues::new(&version.metadata, mutation).unwrap();
version.memtables.mutable.write(&key_values).unwrap();

View File

@@ -288,6 +288,7 @@ mod tests {
sequence,
rows: Some(Rows { schema, rows }),
write_hint: None,
bulk: Vec::new(),
}
}

View File

@@ -176,7 +176,7 @@ pub const DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE: usize = 2048;
/// |
/// // may deadlock |
/// distributor.distribute().await; |
/// |
/// |
/// |
/// receivers[0].read().await |
/// ```
@@ -280,6 +280,7 @@ mod tests {
sequence: 1u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
.encode_to_vec(),
@@ -294,6 +295,7 @@ mod tests {
sequence: 2u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
.encode_to_vec(),
@@ -308,6 +310,7 @@ mod tests {
sequence: 3u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
.encode_to_vec(),
@@ -352,6 +355,7 @@ mod tests {
sequence: 1u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
)]
@@ -372,6 +376,7 @@ mod tests {
sequence: 2u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
)]
@@ -388,6 +393,7 @@ mod tests {
sequence: 1u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
};
let region2 = RegionId::new(1, 2);
@@ -397,6 +403,7 @@ mod tests {
sequence: 3u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
};
let region3 = RegionId::new(1, 3);
@@ -406,6 +413,7 @@ mod tests {
sequence: 3u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
};
let provider = Provider::kafka_provider("my_topic".to_string());
@@ -484,6 +492,7 @@ mod tests {
sequence: 1u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
};
let region2 = RegionId::new(1, 2);
@@ -561,6 +570,7 @@ mod tests {
sequence: 1u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
.encode_to_vec(),
@@ -575,6 +585,7 @@ mod tests {
sequence: 2u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
.encode_to_vec(),
@@ -589,6 +600,7 @@ mod tests {
sequence: 3u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
.encode_to_vec(),
@@ -603,6 +615,7 @@ mod tests {
sequence: 4u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
.encode_to_vec(),
@@ -638,6 +651,7 @@ mod tests {
sequence: 4u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
)]

View File

@@ -116,6 +116,7 @@ mod tests {
sequence: 1u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
};
let encoded_entry = wal_entry.encode_to_vec();

View File

@@ -246,10 +246,12 @@ impl<S> RegionWorkerLoop<S> {
}
// Collect requests by region.
// TODO(yingwen): Encode into bulk.
region_ctx.push_mutation(
sender_req.request.op_type as i32,
Some(sender_req.request.rows),
sender_req.request.hint,
Vec::new(),
sender_req.sender,
);
}

View File

@@ -258,7 +258,10 @@ impl Stream for SeriesDivideStream {
let timer = std::time::Instant::now();
loop {
if !self.buffer.is_empty() {
let cut_at = self.find_first_diff_row();
let cut_at = match self.find_first_diff_row() {
Ok(cut_at) => cut_at,
Err(e) => return Poll::Ready(Some(Err(e))),
};
if let Some((batch_index, row_index)) = cut_at {
// slice out the first time series and return it.
let half_batch_of_first_series =
@@ -322,10 +325,10 @@ impl SeriesDivideStream {
/// Returns the position at which to cut the buffer.
/// `Ok(None)` implies the current buffer only contains one time series.
fn find_first_diff_row(&mut self) -> Option<(usize, usize)> {
fn find_first_diff_row(&mut self) -> DataFusionResult<Option<(usize, usize)>> {
// fast path: no tag columns means all data belongs to the same series.
if self.tag_indices.is_empty() {
return None;
return Ok(None);
}
let mut resumed_batch_index = self.inspect_start;
@@ -341,18 +344,26 @@ impl SeriesDivideStream {
for index in &self.tag_indices {
let current_array = batch.column(*index);
let last_array = last_batch.column(*index);
let current_value = current_array
let current_string_array = current_array
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.value(0);
let last_value = last_array
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal(
"Failed to downcast tag column to StringArray".to_string(),
)
})?;
let last_string_array = last_array
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.value(last_row);
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal(
"Failed to downcast tag column to StringArray".to_string(),
)
})?;
let current_value = current_string_array.value(0);
let last_value = last_string_array.value(last_row);
if current_value != last_value {
return Some((resumed_batch_index, 0));
return Ok(Some((resumed_batch_index, 0)));
}
}
}
@@ -360,7 +371,15 @@ impl SeriesDivideStream {
// check column by column
for index in &self.tag_indices {
let array = batch.column(*index);
let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
let string_array =
array
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal(
"Failed to downcast tag column to StringArray".to_string(),
)
})?;
// the first row number that not equal to the next row.
let mut same_until = 0;
while same_until < num_rows - 1 {
@@ -376,12 +395,12 @@ impl SeriesDivideStream {
// all rows are the same, inspect next batch
resumed_batch_index += 1;
} else {
return Some((resumed_batch_index, result_index));
return Ok(Some((resumed_batch_index, result_index)));
}
}
self.inspect_start = resumed_batch_index;
None
Ok(None)
}
}
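The downcast-then-Internal-error pattern above is repeated for each tag column access; a small helper along these lines (purely illustrative, not part of the patch, assuming the module's existing Array, StringArray and DataFusionResult imports) could centralize it:
// Hypothetical helper: downcasts a tag column and maps a type mismatch to
// a DataFusionError::Internal instead of panicking via unwrap().
fn tag_string_array(array: &dyn Array) -> DataFusionResult<&StringArray> {
    array.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
        datafusion::error::DataFusionError::Internal(
            "Failed to downcast tag column to StringArray".to_string(),
        )
    })
}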

View File

@@ -27,6 +27,7 @@ use crate::puffin_manager::stager::Stager;
use crate::puffin_manager::PuffinManager;
/// `FsPuffinManager` is a `PuffinManager` that provides readers and writers for puffin data in filesystem.
#[derive(Clone)]
pub struct FsPuffinManager<S, F> {
/// The stager.
stager: S,

View File

@@ -128,6 +128,18 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"The end time must be greater than start time, start: {:?}, end: {:?}",
start,
end
))]
InvalidTimeRange {
start: i64,
end: i64,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Cannot find column {col}"))]
ColumnNotFound {
col: String,
@@ -193,6 +205,7 @@ impl ErrorExt for Error {
| MultipleVector { .. }
| ExpectRangeSelector { .. }
| ZeroRangeSelector { .. }
| InvalidTimeRange { .. }
| ColumnNotFound { .. }
| FunctionInvalidArgument { .. }
| UnsupportedVectorMatch { .. }

View File

@@ -63,15 +63,19 @@ use promql_parser::parser::{
VectorMatchCardinality, VectorSelector,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use table::table::adapter::DfTableProviderAdapter;
use crate::promql::error::{
CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, MultiFieldsNotSupportedSnafu,
MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu,
Result, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu,
UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu, UnsupportedMatcherOpSnafu,
UnsupportedVectorMatchSnafu, ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidTimeRangeSnafu,
MultiFieldsNotSupportedSnafu, MultipleMetricMatchersSnafu, MultipleVectorSnafu,
NoMetricMatcherSnafu, PromqlPlanNodeSnafu, Result, TableNameNotFoundSnafu,
TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu,
UnsupportedExprSnafu, UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu,
ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
};
/// `time()` function in PromQL.
@@ -982,6 +986,9 @@ impl PromPlanner {
fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
let start = self.ctx.start;
let end = self.ctx.end;
if end < start {
return InvalidTimeRangeSnafu { start, end }.fail();
}
let lookback_delta = self.ctx.lookback_delta;
let range = self.ctx.range.unwrap_or_default();
let interval = self.ctx.interval;
@@ -1146,6 +1153,10 @@ impl PromPlanner {
.table_info()
.meta
.row_key_column_names()
.filter(|col| {
// remove metric engine's internal columns
col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
})
.cloned()
.collect();
self.ctx.tag_columns = tags;

View File

@@ -27,6 +27,7 @@ use common_macro::stack_trace_debug;
use common_telemetry::{error, warn};
use datatypes::prelude::ConcreteDataType;
use headers::ContentType;
use http::header::InvalidHeaderValue;
use query::parser::PromQuery;
use serde_json::json;
use snafu::{Location, Snafu};
@@ -345,6 +346,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid http header value"))]
InvalidHeaderValue {
#[snafu(source)]
error: InvalidHeaderValue,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Error accessing catalog"))]
Catalog {
source: catalog::error::Error,
@@ -678,7 +687,7 @@ impl ErrorExt for Error {
#[cfg(feature = "mem-prof")]
DumpProfileData { source, .. } => source.status_code(),
InvalidUtf8Value { .. } => StatusCode::InvalidArguments,
InvalidUtf8Value { .. } | InvalidHeaderValue { .. } => StatusCode::InvalidArguments,
ParsePromQL { source, .. } => source.status_code(),
Other { source, .. } => source.status_code(),

View File

@@ -36,14 +36,14 @@ use datatypes::schema::SchemaRef;
use datatypes::value::transform_value_ref_to_json_value;
use event::{LogState, LogValidatorRef};
use futures::FutureExt;
use http::Method;
use http::{HeaderValue, Method};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use snafu::{ensure, ResultExt};
use tokio::sync::oneshot::{self, Sender};
use tokio::sync::Mutex;
use tower::ServiceBuilder;
use tower_http::cors::{Any, CorsLayer};
use tower_http::cors::{AllowOrigin, Any, CorsLayer};
use tower_http::decompression::RequestDecompressionLayer;
use tower_http::trace::TraceLayer;
@@ -52,7 +52,8 @@ use self::result::table_result::TableResponse;
use crate::configurator::ConfiguratorRef;
use crate::elasticsearch;
use crate::error::{
AddressBindSnafu, AlreadyStartedSnafu, Error, InternalIoSnafu, Result, ToJsonSnafu,
AddressBindSnafu, AlreadyStartedSnafu, Error, InternalIoSnafu, InvalidHeaderValueSnafu, Result,
ToJsonSnafu,
};
use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
use crate::http::prometheus::{
@@ -140,6 +141,10 @@ pub struct HttpOptions {
pub body_limit: ReadableSize,
pub is_strict_mode: bool,
pub cors_allowed_origins: Vec<String>,
pub enable_cors: bool,
}
impl Default for HttpOptions {
@@ -150,6 +155,8 @@ impl Default for HttpOptions {
disable_dashboard: false,
body_limit: DEFAULT_BODY_LIMIT,
is_strict_mode: false,
cors_allowed_origins: Vec::new(),
enable_cors: true,
}
}
}
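These two fields back the `enable_cors` / `cors_allowed_origins` settings exercised by the config test at the end of this diff; a caller embedding the server could, for example, pin CORS to a single origin (illustrative values only):
// Illustrative only: restrict CORS to one origin instead of the default `Any`.
let options = HttpOptions {
    enable_cors: true,
    cors_allowed_origins: vec!["https://example.com".to_string()],
    ..Default::default()
};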
@@ -715,7 +722,7 @@ impl HttpServer {
/// Attaches middlewares and debug routes to the router.
/// Callers should call this method after [HttpServer::make_app()].
pub fn build(&self, router: Router) -> Router {
pub fn build(&self, router: Router) -> Result<Router> {
let timeout_layer = if self.options.timeout != Duration::default() {
Some(ServiceBuilder::new().layer(DynamicTimeoutLayer::new(self.options.timeout)))
} else {
@@ -731,26 +738,45 @@ impl HttpServer {
info!("HTTP server body limit is disabled");
None
};
let cors_layer = if self.options.enable_cors {
Some(
CorsLayer::new()
.allow_methods([
Method::GET,
Method::POST,
Method::PUT,
Method::DELETE,
Method::HEAD,
])
.allow_origin(if self.options.cors_allowed_origins.is_empty() {
AllowOrigin::from(Any)
} else {
AllowOrigin::from(
self.options
.cors_allowed_origins
.iter()
.map(|s| {
HeaderValue::from_str(s.as_str())
.context(InvalidHeaderValueSnafu)
})
.collect::<Result<Vec<HeaderValue>>>()?,
)
})
.allow_headers(Any),
)
} else {
info!("HTTP server cross-origin is disabled");
None
};
router
Ok(router
// middlewares
.layer(
ServiceBuilder::new()
// Disable on-failure tracing because printing it out isn't very helpful;
// our `impl IntoResponse for Error` already prints more detailed error messages.
.layer(TraceLayer::new_for_http().on_failure(()))
.layer(
CorsLayer::new()
.allow_methods([
Method::GET,
Method::POST,
Method::PUT,
Method::DELETE,
Method::HEAD,
])
.allow_origin(Any)
.allow_headers(Any),
)
.option_layer(cors_layer)
.option_layer(timeout_layer)
.option_layer(body_limit_layer)
// auth layer
@@ -772,7 +798,7 @@ impl HttpServer {
.route("/cpu", routing::post(pprof::pprof_handler))
.route("/mem", routing::post(mem_prof::mem_prof_handler)),
),
)
))
}
fn route_metrics<S>(metrics_handler: MetricsHandler) -> Router<S> {
@@ -1032,7 +1058,7 @@ impl Server for HttpServer {
if let Some(configurator) = self.plugins.get::<ConfiguratorRef>() {
app = configurator.config_http(app);
}
let app = self.build(app);
let app = self.build(app)?;
let listener = tokio::net::TcpListener::bind(listening)
.await
.context(AddressBindSnafu { addr: listening })?
@@ -1177,17 +1203,123 @@ mod test {
}
fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
make_test_app_custom(tx, HttpOptions::default())
}
fn make_test_app_custom(tx: mpsc::Sender<(String, Vec<u8>)>, options: HttpOptions) -> Router {
let instance = Arc::new(DummyInstance { _tx: tx });
let sql_instance = ServerSqlQueryHandlerAdapter::arc(instance.clone());
let server = HttpServerBuilder::new(HttpOptions::default())
let server = HttpServerBuilder::new(options)
.with_sql_handler(sql_instance)
.build();
server.build(server.make_app()).route(
server.build(server.make_app()).unwrap().route(
"/test/timeout",
get(forever.layer(ServiceBuilder::new().layer(timeout()))),
)
}
#[tokio::test]
pub async fn test_cors() {
// cors is on by default
let (tx, _rx) = mpsc::channel(100);
let app = make_test_app(tx);
let client = TestClient::new(app).await;
let res = client.get("/health").send().await;
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers()
.get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
.expect("expect cors header origin"),
"*"
);
let res = client
.options("/health")
.header("Access-Control-Request-Headers", "x-greptime-auth")
.header("Access-Control-Request-Method", "DELETE")
.header("Origin", "https://example.com")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers()
.get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
.expect("expect cors header origin"),
"*"
);
assert_eq!(
res.headers()
.get(http::header::ACCESS_CONTROL_ALLOW_HEADERS)
.expect("expect cors header headers"),
"*"
);
assert_eq!(
res.headers()
.get(http::header::ACCESS_CONTROL_ALLOW_METHODS)
.expect("expect cors header methods"),
"GET,POST,PUT,DELETE,HEAD"
);
}
#[tokio::test]
pub async fn test_cors_custom_origins() {
// cors is on by default
let (tx, _rx) = mpsc::channel(100);
let origin = "https://example.com";
let options = HttpOptions {
cors_allowed_origins: vec![origin.to_string()],
..Default::default()
};
let app = make_test_app_custom(tx, options);
let client = TestClient::new(app).await;
let res = client.get("/health").header("Origin", origin).send().await;
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers()
.get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
.expect("expect cors header origin"),
origin
);
let res = client
.get("/health")
.header("Origin", "https://notallowed.com")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
assert!(!res
.headers()
.contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN));
}
#[tokio::test]
pub async fn test_cors_disabled() {
// cors is on by default; disable it explicitly for this test
let (tx, _rx) = mpsc::channel(100);
let options = HttpOptions {
enable_cors: false,
..Default::default()
};
let app = make_test_app_custom(tx, options);
let client = TestClient::new(app).await;
let res = client.get("/health").send().await;
assert_eq!(res.status(), StatusCode::OK);
assert!(!res
.headers()
.contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN));
}
#[test]
fn test_http_options_default() {
let default = HttpOptions::default();

View File

@@ -45,7 +45,7 @@ use serde_json::Value;
use session::context::{QueryContext, QueryContextRef};
use snafu::{Location, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, PHYSICAL_TABLE_METADATA_KEY,
};
pub use super::result::prometheus_resp::PrometheusJsonResponse;
@@ -941,16 +941,30 @@ pub async fn label_values_query(
.start_timer();
if label_name == METRIC_NAME_LABEL {
let mut table_names = match handler
.catalog_manager()
.table_names(&catalog, &schema, Some(&query_ctx))
.await
{
Ok(table_names) => table_names,
Err(e) => {
return PrometheusJsonResponse::error(e.status_code(), e.output_msg());
let catalog_manager = handler.catalog_manager();
let mut tables_stream = catalog_manager.tables(&catalog, &schema, Some(&query_ctx));
let mut table_names = Vec::new();
while let Some(table) = tables_stream.next().await {
// filter out physical tables
match table {
Ok(table) => {
if table
.table_info()
.meta
.options
.extra_options
.contains_key(PHYSICAL_TABLE_METADATA_KEY)
{
continue;
}
table_names.push(table.table_info().name.clone());
}
Err(e) => {
return PrometheusJsonResponse::error(e.status_code(), e.output_msg());
}
}
};
}
table_names.sort_unstable();
return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
} else if label_name == FIELD_NAME_LABEL {

View File

@@ -187,6 +187,8 @@ impl PrometheusJsonResponse {
let mut tag_column_indices = Vec::new();
let mut first_field_column_index = None;
let mut num_label_columns = 0;
for (i, column) in batches.schema().column_schemas().iter().enumerate() {
match column.data_type {
ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_)) => {
@@ -211,6 +213,7 @@ impl PrometheusJsonResponse {
}
ConcreteDataType::String(_) => {
tag_column_indices.push(i);
num_label_columns += 1;
}
_ => {}
}
@@ -223,9 +226,10 @@ impl PrometheusJsonResponse {
reason: "no value column found".to_string(),
})?;
let metric_name = (METRIC_NAME.to_string(), metric_name);
let mut buffer = BTreeMap::<Vec<(String, String)>, Vec<(f64, String)>>::new();
let metric_name = (METRIC_NAME, metric_name.as_str());
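        // Keys hold borrowed &str pairs, so no per-row Strings are allocated;
        // they are converted to owned values once when the buffer is drained below.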
let mut buffer = BTreeMap::<Vec<(&str, &str)>, Vec<(f64, String)>>::new();
let schema = batches.schema();
for batch in batches.iter() {
// prepare things...
let tag_columns = tag_column_indices
@@ -240,7 +244,7 @@ impl PrometheusJsonResponse {
.collect::<Vec<_>>();
let tag_names = tag_column_indices
.iter()
.map(|c| batches.schema().column_name_by_index(*c).to_string())
.map(|c| schema.column_name_by_index(*c))
.collect::<Vec<_>>();
let timestamp_column = batch
.column(timestamp_column_index)
@@ -260,11 +264,12 @@ impl PrometheusJsonResponse {
for row_index in 0..batch.num_rows() {
// retrieve tags
// TODO(ruihang): push table name `__metric__`
let mut tags = vec![metric_name.clone()];
let mut tags = Vec::with_capacity(num_label_columns + 1);
tags.push(metric_name);
for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
// TODO(ruihang): add test for NULL tag
if let Some(tag_value) = tag_column.get_data(row_index) {
tags.push((tag_name.to_string(), tag_value.to_string()));
tags.push((tag_name, tag_value));
}
}
@@ -292,7 +297,10 @@ impl PrometheusJsonResponse {
// accumulate data into result
buffer.into_iter().for_each(|(tags, mut values)| {
let metric = tags.into_iter().collect();
let metric = tags
.into_iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect::<HashMap<_, _>>();
match result {
PromQueryResult::Vector(ref mut v) => {
v.push(PromSeriesVector {

View File

@@ -115,11 +115,7 @@ pub(crate) fn process<'a>(query: &str, query_ctx: QueryContextRef) -> Option<Vec
}
}
static LIMIT_CAST_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new("(?i)(LIMIT\\s+\\d+)::bigint").unwrap());
pub(crate) fn rewrite_sql(query: &str) -> Cow<'_, str> {
//TODO(sunng87): remove this when we upgraded datafusion to 43 or newer
let query = LIMIT_CAST_PATTERN.replace_all(query, "$1");
// DBeaver-specific replacements for SQL that DataFusion doesn't support
// TODO: add more here
query
@@ -218,11 +214,6 @@ mod test {
#[test]
fn test_rewrite() {
let sql = "SELECT * FROM number LIMIT 1::bigint";
let sql2 = "SELECT * FROM number limit 1::BIGINT";
assert_eq!("SELECT * FROM number LIMIT 1", rewrite_sql(sql));
assert_eq!("SELECT * FROM number limit 1", rewrite_sql(sql2));
assert_eq!(
"SELECT db.oid as _oid,db.* FROM pg_catalog.pg_database db",
rewrite_sql("SELECT db.oid,db.* FROM pg_catalog.pg_database db")

View File

@@ -121,7 +121,7 @@ fn make_test_app(tx: Arc<mpsc::Sender<(String, String)>>, db_name: Option<&str>)
.with_user_provider(Arc::new(user_provider))
.with_influxdb_handler(instance)
.build();
server.build(server.make_app())
server.build(server.make_app()).unwrap()
}
#[tokio::test]

View File

@@ -112,7 +112,7 @@ fn make_test_app(tx: mpsc::Sender<String>) -> Router {
.with_sql_handler(instance.clone())
.with_opentsdb_handler(instance)
.build();
server.build(server.make_app())
server.build(server.make_app()).unwrap()
}
#[tokio::test]

View File

@@ -141,7 +141,7 @@ fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
.with_sql_handler(instance.clone())
.with_prom_handler(instance, true, is_strict_mode)
.build();
server.build(server.make_app())
server.build(server.make_app()).unwrap()
}
#[tokio::test]

View File

@@ -397,7 +397,10 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
.build();
(http_server.build(http_server.make_app()), instance.guard)
(
http_server.build(http_server.make_app()).unwrap(),
instance.guard,
)
}
pub async fn setup_test_http_app_with_frontend(
@@ -436,7 +439,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
let http_server = http_server.build();
let app = http_server.build(http_server.make_app());
let app = http_server.build(http_server.make_app()).unwrap();
(app, instance.guard)
}
@@ -481,7 +484,7 @@ pub async fn setup_test_prom_app_with_frontend(
.with_prometheus_handler(frontend_ref)
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
.build();
let app = http_server.build(http_server.make_app());
let app = http_server.build(http_server.make_app()).unwrap();
(app, instance.guard)
}

View File

@@ -704,6 +704,18 @@ pub async fn test_prom_http_api(store_type: StorageType) {
assert!(prom_resp.error_type.is_none());
// query `__name__` without match[]
// create a physical table and a logical table
let res = client
.get("/v1/sql?sql=create table physical_table (`ts` timestamp time index, message string) with ('physical_metric_table' = 'true');")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await);
let res = client
.get("/v1/sql?sql=create table logic_table (`ts` timestamp time index, message string) with ('on_physical_table' = 'physical_table');")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await);
// query `__name__`
let res = client
.get("/v1/prometheus/api/v1/label/__name__/values")
.send()
@@ -713,6 +725,15 @@ pub async fn test_prom_http_api(store_type: StorageType) {
assert_eq!(prom_resp.status, "success");
assert!(prom_resp.error.is_none());
assert!(prom_resp.error_type.is_none());
assert_eq!(
prom_resp.data,
PrometheusResponse::Labels(vec![
"demo".to_string(),
"demo_metrics".to_string(),
"logic_table".to_string(),
"numbers".to_string()
])
);
// buildinfo
let res = client
@@ -881,6 +902,8 @@ addr = "127.0.0.1:4000"
timeout = "30s"
body_limit = "64MiB"
is_strict_mode = false
cors_allowed_origins = []
enable_cors = true
[grpc]
addr = "127.0.0.1:4001"