Compare commits

...

39 Commits

Author SHA1 Message Date
evenyag
d5760a7348 chore: remove unused codes 2025-03-17 15:20:42 +08:00
discord9
bc9614e22c feat: file parallel 2025-03-10 21:00:40 +08:00
discord9
7dd9e98ff6 docs: chore 2025-03-10 16:12:28 +08:00
evenyag
fb6b7f7801 fix: use label value to add map 2025-03-10 15:17:59 +08:00
evenyag
87d7c316df fix: use label value as table name 2025-03-10 14:42:19 +08:00
evenyag
c80a73bc20 feat: use pb in parquet 2025-03-10 14:40:29 +08:00
discord9
dd9d13e7df fix: cli arg 2025-03-10 14:18:35 +08:00
evenyag
79d249f5fa feat: fix panic in TimeSeriesParquetReader 2025-03-10 14:13:37 +08:00
evenyag
63bc544514 refactor: use constant 2025-03-10 14:02:27 +08:00
evenyag
30c29539a3 feat: special handle metric engine path 2025-03-10 13:58:46 +08:00
evenyag
359da62d9e feat: use parquet 2025-03-10 13:36:49 +08:00
evenyag
c9f4b36360 fix: use flushed_sequence as we can't set sequence in ingester 2025-03-10 13:36:49 +08:00
discord9
85c346b16a chore: progress bar 2025-03-10 11:53:33 +08:00
discord9
738c23beb0 feat: time unit 2025-03-10 11:25:23 +08:00
evenyag
8aadd1e59a feat: parquet remote write reader 2025-03-09 23:42:08 +08:00
discord9
cbd58291da chore: more logs 2025-03-09 23:29:58 +08:00
evenyag
e522e8959b chore: add more logs 2025-03-09 21:19:55 +08:00
evenyag
7183a93e5a feat: sanitize mito config 2025-03-09 21:05:21 +08:00
evenyag
8c538622e2 feat: add logs 2025-03-09 20:52:02 +08:00
evenyag
142dacb2c8 chore: update fs object build 2025-03-09 20:52:02 +08:00
discord9
371afc458f chore: init logging 2025-03-09 20:44:53 +08:00
discord9
0751cd74c0 feat: all in one cfg 2025-03-09 20:36:10 +08:00
discord9
ec34e8739a fix: is file 2025-03-09 19:55:12 +08:00
evenyag
b650743785 feat: implement converter convert 2025-03-09 19:53:36 +08:00
discord9
80a8b2e1bd feat: debug output file option 2025-03-09 17:23:14 +08:00
discord9
ec8a15cadd feat: ingester(WIP) 2025-03-09 16:57:26 +08:00
evenyag
f929d751a5 feat: update api 2025-03-09 16:39:35 +08:00
evenyag
fad3621a7a feat: define converter api 2025-03-09 16:05:52 +08:00
evenyag
87723effc7 feat: declare converter 2025-03-09 15:33:49 +08:00
evenyag
62a333ad09 feat: import datanode 2025-03-09 15:32:02 +08:00
evenyag
6ad186a13e feat: series to batch 2025-03-09 15:09:13 +08:00
discord9
77dee84a75 fix: parquet also sort by pk 2025-03-09 14:47:34 +08:00
evenyag
a57e263e5a feat: sort time series 2025-03-08 22:20:13 +08:00
discord9
8796ddaf31 chore: remove unwrap 2025-03-08 20:32:11 +08:00
discord9
7fa3fbdfef feat: parquet reader 2025-03-08 20:27:44 +08:00
jeremyhi
457d2a620c feat: add get table api 2025-03-08 19:53:15 +08:00
evenyag
9f14edbb28 feat: implement sst writer 2025-03-08 17:22:03 +08:00
evenyag
cb3fad0c2d chore: add deps 2025-03-08 16:17:49 +08:00
evenyag
2d1e7c2441 feat: init the converter crate 2025-03-08 14:15:35 +08:00
30 changed files with 2159 additions and 56 deletions

100
Cargo.lock generated
View File

@@ -1594,7 +1594,7 @@ dependencies = [
"bitflags 1.3.2",
"strsim 0.8.0",
"textwrap 0.11.0",
"unicode-width",
"unicode-width 0.1.14",
"vec_map",
]
@@ -1876,7 +1876,7 @@ checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7"
dependencies = [
"strum 0.26.3",
"strum_macros 0.26.4",
"unicode-width",
"unicode-width 0.1.14",
]
[[package]]
@@ -2469,6 +2469,7 @@ dependencies = [
"encode_unicode",
"lazy_static",
"libc",
"unicode-width 0.1.14",
"windows-sys 0.52.0",
]
@@ -4645,7 +4646,7 @@ version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14dbbfd5c71d70241ecf9e6f13737f7b5ce823821063188d7e46c41d371eebd5"
dependencies = [
"unicode-width",
"unicode-width 0.1.14",
]
[[package]]
@@ -5599,6 +5600,19 @@ dependencies = [
"serde",
]
[[package]]
name = "indicatif"
version = "0.17.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "183b3088984b400f4cfac3620d5e076c84da5364016b4f49473de574b2586235"
dependencies = [
"console",
"number_prefix",
"portable-atomic",
"unicode-width 0.2.0",
"web-time 1.1.0",
]
[[package]]
name = "inferno"
version = "0.11.21"
@@ -5628,6 +5642,25 @@ dependencies = [
"snafu 0.7.5",
]
[[package]]
name = "ingester"
version = "0.13.0"
dependencies = [
"clap 4.5.19",
"common-telemetry",
"common-time",
"datanode",
"meta-client",
"mito2",
"object-store",
"reqwest",
"serde",
"serde_json",
"sst-convert",
"tokio",
"toml 0.8.19",
]
[[package]]
name = "inotify"
version = "0.9.6"
@@ -7517,6 +7550,12 @@ dependencies = [
"libc",
]
[[package]]
name = "number_prefix"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]]
name = "objc"
version = "0.2.7"
@@ -7973,7 +8012,7 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2ad9b889f1b12e0b9ee24db044b5129150d5eada288edc800f789928dc8c0e3"
dependencies = [
"unicode-width",
"unicode-width 0.1.14",
]
[[package]]
@@ -8069,6 +8108,19 @@ dependencies = [
"zstd-sys",
]
[[package]]
name = "parquet_opendal"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4140ae96f37c170f8d684a544711fabdac1d94adcbd97e8b033329bd37f40446"
dependencies = [
"async-trait",
"bytes",
"futures",
"opendal",
"parquet",
]
[[package]]
name = "parse-zoneinfo"
version = "0.3.1"
@@ -10056,7 +10108,7 @@ dependencies = [
"radix_trie",
"scopeguard",
"unicode-segmentation",
"unicode-width",
"unicode-width 0.1.14",
"utf8parse",
"winapi",
]
@@ -11203,6 +11255,36 @@ dependencies = [
"url",
]
[[package]]
name = "sst-convert"
version = "0.13.0"
dependencies = [
"api",
"arrow-array",
"async-trait",
"catalog",
"common-error",
"common-macro",
"common-meta",
"common-recordbatch",
"common-telemetry",
"datanode",
"datatypes",
"futures",
"futures-util",
"indicatif",
"meta-client",
"metric-engine",
"mito2",
"object-store",
"parquet",
"parquet_opendal",
"prost 0.13.3",
"snafu 0.8.5",
"store-api",
"table",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
@@ -11935,7 +12017,7 @@ version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
dependencies = [
"unicode-width",
"unicode-width 0.1.14",
]
[[package]]
@@ -13038,6 +13120,12 @@ version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af"
[[package]]
name = "unicode-width"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd"
[[package]]
name = "unicode-xid"
version = "0.2.6"

View File

@@ -41,6 +41,7 @@ members = [
"src/flow",
"src/frontend",
"src/index",
"src/ingester",
"src/log-query",
"src/log-store",
"src/meta-client",
@@ -58,6 +59,7 @@ members = [
"src/servers",
"src/session",
"src/sql",
"src/sst-convert",
"src/store-api",
"src/table",
"tests-fuzz",
@@ -271,6 +273,7 @@ query = { path = "src/query" }
servers = { path = "src/servers" }
session = { path = "src/session" }
sql = { path = "src/sql" }
sst-convert = { path = "src/sst-convert" }
store-api = { path = "src/store-api" }
substrait = { path = "src/common/substrait" }
table = { path = "src/table" }

76
chore.md Normal file
View File

@@ -0,0 +1,76 @@
# log
## first create table
```bash
mysql --host=127.0.0.1 --port=19195 --database=public;
```
```sql
CREATE DATABASE IF NOT EXISTS `cluster1`;
USE `cluster1`;
CREATE TABLE IF NOT EXISTS `app1` (
`greptime_timestamp` TimestampNanosecond NOT NULL TIME INDEX,
`app` STRING NULL INVERTED INDEX,
`cluster` STRING NULL INVERTED INDEX,
`message` STRING NULL,
`region` STRING NULL,
`cloud-provider` STRING NULL,
`environment` STRING NULL,
`product` STRING NULL,
`sub-product` STRING NULL,
`service` STRING NULL
) WITH (
append_mode = 'true',
'compaction.type' = 'twcs',
'compaction.twcs.max_output_file_size' = '500MB',
'compaction.twcs.max_active_window_files' = '16',
'compaction.twcs.max_active_window_runs' = '4',
'compaction.twcs.max_inactive_window_files' = '4',
'compaction.twcs.max_inactive_window_runs' = '2'
);
select count(*) from app1;
SELECT * FROM app1 ORDER BY greptime_timestamp DESC LIMIT 10\G
```
## then ingest
```bash
RUST_LOG="debug" cargo run --bin=ingester -- --input-dir="/home/discord9/greptimedb/parquet_store_bk/" --parquet-dir="parquet_store/" --cfg="ingester.toml" --db-http-addr="http://127.0.0.1:4000/v1/sst/ingest_json"
```
# metrics
```bash
mysql --host=127.0.0.1 --port=19195 --database=public < public.greptime_physical_table-create-tables.sql
```
## then ingest
```bash
RUST_LOG="debug"
cargo run --bin=ingester -- --input-dir="/home/discord9/greptimedb/parquet_store_bk/" --remote-write-dir="metrics_parquet/" --cfg="ingester.toml" --db-http-addr="http://127.0.0.1:4000/v1/sst/ingest_json"
# perf it
cargo build --release ---bin=ingester
samply record target/release/ingester --input-dir="/home/discord9/greptimedb/parquet_store_bk/" --remote-write-dir="metrics_parquet/" --cfg="ingester.toml" --db-http-addr="http://127.0.0.1:4000/v1/sst/ingest_json"
```
## check data
```sql
select count(*) from greptime_physical_table;
+----------+
| count(*) |
+----------+
| 36200 |
+----------+
1 row in set (0.06 sec)
select count(*) from storage_operation_errors_total;
+----------+
| count(*) |
+----------+
| 10 |
+----------+
1 row in set (0.03 sec)
```
# with oss
The steps are the same; the only difference is the storage configuration in `ingester.toml`.
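As a rough sketch, the `[storage]` section might look like the following when pointing at OSS; the bucket, root, endpoint, and credentials below are placeholders, and the exact field names should be double-checked against the datanode storage configuration:
```toml
[storage]
## data_home is still used to sanitize local mito paths (see `SstConverterBuilder::build`).
data_home = "/tmp/greptimedb-cluster/datanode0"
type = "Oss"
bucket = "my-bucket"                 # placeholder
root = "greptimedb-data"             # placeholder
access_key_id = "<access key id>"    # placeholder
access_key_secret = "<secret>"       # placeholder
endpoint = "https://oss-cn-hangzhou.aliyuncs.com"  # placeholder
```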

35
ingester.toml Normal file
View File

@@ -0,0 +1,35 @@
## The metasrv client options.
[meta_client]
## The addresses of the metasrv.
metasrv_addrs = ["127.0.0.1:3002", "127.0.0.1:3003"]
## Operation timeout.
timeout = "3s"
## Heartbeat timeout.
heartbeat_timeout = "500ms"
## DDL timeout.
ddl_timeout = "10s"
## Connect server timeout.
connect_timeout = "1s"
## `TCP_NODELAY` option for accepted connections.
tcp_nodelay = true
## The configuration about the cache of the metadata.
metadata_cache_max_capacity = 100000
## TTL of the metadata cache.
metadata_cache_ttl = "10m"
## TTI of the metadata cache.
metadata_cache_tti = "5m"
## The data storage options.
[storage]
## The working home directory.
data_home = "/tmp/greptimedb-cluster/datanode0"
type = "File"
[mito]

View File

@@ -25,6 +25,6 @@ pub mod heartbeat;
pub mod metrics;
pub mod region_server;
pub mod service;
mod store;
pub mod store;
#[cfg(any(test, feature = "testing"))]
pub mod tests;

View File

@@ -15,7 +15,7 @@
//! object storage utilities
mod azblob;
mod fs;
pub mod fs;
mod gcs;
mod oss;
mod s3;

View File

@@ -24,7 +24,8 @@ use crate::config::FileConfig;
use crate::error::{self, Result};
use crate::store;
pub(crate) async fn new_fs_object_store(
/// A helper function to create a file system object store.
pub async fn new_fs_object_store(
data_home: &str,
_file_config: &FileConfig,
) -> Result<ObjectStore> {

23
src/ingester/Cargo.toml Normal file
View File

@@ -0,0 +1,23 @@
[package]
name = "ingester"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
clap.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datanode.workspace = true
meta-client.workspace = true
mito2.workspace = true
object-store.workspace = true
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true
sst-convert.workspace = true
tokio.workspace = true
toml.workspace = true
[lints]
workspace = true

294
src/ingester/src/main.rs Normal file
View File

@@ -0,0 +1,294 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use clap::Parser;
use common_telemetry::info;
use common_time::timestamp::TimeUnit;
use datanode::config::StorageConfig;
use meta_client::MetaClientOptions;
use mito2::config::MitoConfig;
use mito2::sst::file::IndexType;
use mito2::sst::parquet::SstInfo;
use serde::{Deserialize, Serialize};
use sst_convert::converter::{InputFile, InputFileType, SstConverterBuilder};
use tokio::sync::oneshot;
#[derive(Parser, Debug)]
#[command(version, about = "Greptime Ingester", long_about = None)]
struct Args {
/// Input directory
#[arg(short, long)]
input_dir: String,
/// Directory of input parquet files, relative to input_dir
#[arg(short, long)]
parquet_dir: Option<String>,
/// Directory of input remote write files (parquet files of protobuf-encoded timeseries), relative to input_dir
#[arg(short, long)]
remote_write_dir: Option<String>,
/// Config file
#[arg(short, long)]
cfg: String,
/// DB HTTP address
#[arg(short, long)]
db_http_addr: String,
/// Output path for the converted SST files.
/// If it is not None, the converted SST files will be written to the specified path
/// in the `input_store`.
/// This is for debugging purposes.
#[arg(short, long)]
sst_output_path: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct IngesterConfig {
meta_client: MetaClientOptions,
storage: StorageConfig,
mito: MitoConfig,
}
pub const APP_NAME: &str = "greptime-ingester";
#[tokio::main]
async fn main() {
let _guard = common_telemetry::init_global_logging(
APP_NAME,
&Default::default(),
&Default::default(),
None,
);
let args = Args::parse();
let cfg_file = std::fs::read_to_string(&args.cfg).expect("Failed to read config file");
let cfg: IngesterConfig = toml::from_str(&cfg_file).expect("Failed to parse config");
let sst_builder = {
let mut builder = SstConverterBuilder::new_fs(args.input_dir)
.with_meta_options(cfg.meta_client)
.with_storage_config(cfg.storage)
.with_config(cfg.mito);
if let Some(output_path) = args.sst_output_path {
builder = builder.with_output_path(output_path);
}
builder
};
let sst_converter = sst_builder
.clone()
.build()
.await
.expect("Failed to build sst converter");
let input_store = sst_converter.input_store.clone();
if let Some(parquet_dir) = args.parquet_dir {
// Use opendal to list parquet files in the given input object store
let all_parquets = input_store
.list(&parquet_dir)
.await
.expect("Failed to list parquet files");
info!("Listed all files in parquet directory: {:?}", all_parquets);
let all_parquets = all_parquets
.iter()
.filter(|parquet| parquet.name().ends_with(".parquet") && parquet.metadata().is_file())
.collect::<Vec<_>>();
let input_files = all_parquets
.iter()
.map(|parquet| {
let full_table_name = parquet.name().split("-").next().unwrap();
let (catalog_name, schema_name, table_name) = extract_name(full_table_name);
info!(
"catalog: {}, schema: {}, table: {}",
catalog_name, schema_name, table_name
);
InputFile {
catalog: catalog_name.to_string(),
schema: schema_name.to_string(),
table: table_name.to_string(),
path: parquet.path().to_string(),
file_type: InputFileType::Parquet,
}
})
.collect::<Vec<_>>();
convert_and_send(&input_files, sst_builder.clone(), &args.db_http_addr).await;
}
if let Some(remote_write_dir) = args.remote_write_dir {
// Use opendal to list remote write parquet files in the given input object store
let all_parquets = input_store
.list(&remote_write_dir)
.await
.expect("Failed to list parquet files");
let all_parquets = all_parquets
.iter()
.filter(|parquet| parquet.name().ends_with(".parquet") && parquet.metadata().is_file())
.collect::<Vec<_>>();
let input_files = all_parquets
.iter()
.map(|parquet| {
let full_table_name = parquet.name().split("-").next().unwrap();
let (catalog_name, schema_name, table_name) = extract_name(full_table_name);
info!(
"catalog: {}, schema: {}, table: {}",
catalog_name, schema_name, table_name
);
InputFile {
catalog: catalog_name.to_string(),
schema: schema_name.to_string(),
table: table_name.to_string(),
path: parquet.path().to_string(),
file_type: InputFileType::RemoteWrite,
}
})
.collect::<Vec<_>>();
convert_and_send(&input_files, sst_builder.clone(), &args.db_http_addr).await;
}
}
async fn convert_and_send(
input_files: &[InputFile],
sst_builder: SstConverterBuilder,
db_http_addr: &str,
) {
let table_names = input_files
.iter()
.map(|f| (f.schema.clone(), f.table.clone()))
.collect::<Vec<_>>();
let mut rxs = Vec::new();
// Spawn a task for each input file
info!("Spawning tasks for {} input files", input_files.len());
for input_file in input_files.iter() {
let (tx, rx) = oneshot::channel();
let sst_builder = sst_builder.clone();
let input_file = (*input_file).clone();
tokio::task::spawn(async move {
let mut sst_converter = sst_builder
.build()
.await
.expect("Failed to build sst converter");
let sst_info = sst_converter
.convert_one(&input_file)
.await
.expect("Failed to convert parquet files");
tx.send(sst_info).unwrap();
});
rxs.push(rx);
}
let mut sst_infos = Vec::new();
for rx in rxs {
sst_infos.push(rx.await.unwrap());
}
info!("Converted {} input files", sst_infos.len());
let ingest_reqs = table_names
.iter()
.zip(sst_infos.iter())
.flat_map(|(schema_name, sst_info)| {
sst_info
.ssts
.iter()
.map(|sst| to_ingest_sst_req(&schema_name.0, &schema_name.1, sst))
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
// send ingest requests to DB
send_ingest_requests(db_http_addr, ingest_reqs)
.await
.unwrap();
}
fn extract_name(full_table_name: &str) -> (String, String, String) {
let mut names = full_table_name.split('.').rev();
let table_name = names.next().unwrap();
let schema_name = names.next().unwrap_or("public");
let catalog_name = names.next().unwrap_or("greptime");
(
catalog_name.to_string(),
schema_name.to_string(),
table_name.to_string(),
)
}
async fn send_ingest_requests(
addr: &str,
reqs: Vec<ClientIngestSstRequest>,
) -> Result<(), Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
for req in reqs {
info!("ingesting sst: {req:?}");
let req = client.post(addr).json(&req);
let resp = req.send().await?;
info!("ingest response: {resp:?}");
}
Ok(())
}
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ClientIngestSstRequest {
schema: Option<String>,
table: String,
pub(crate) file_id: String,
pub(crate) min_ts: i64,
pub(crate) max_ts: i64,
pub(crate) file_size: u64,
pub(crate) rows: u32,
pub(crate) row_groups: u32,
/// Available indexes of the file.
pub available_indexes: Vec<IndexType>,
/// Size of the index file.
pub index_file_size: u64,
pub time_unit: u32,
}
fn to_ingest_sst_req(
schema_name: &str,
table_name: &str,
sst_info: &SstInfo,
) -> ClientIngestSstRequest {
let index_file_size = sst_info.index_metadata.file_size;
let available_indexes = sst_info.index_metadata.build_available_indexes();
ClientIngestSstRequest {
schema: Some(schema_name.to_string()),
table: table_name.to_string(),
file_id: sst_info.file_id.to_string(),
min_ts: sst_info.time_range.0.value(),
max_ts: sst_info.time_range.1.value(),
file_size: sst_info.file_size,
rows: sst_info.num_rows as _,
row_groups: sst_info.num_row_groups as _,
available_indexes: available_indexes.to_vec(),
index_file_size,
time_unit: match sst_info.time_range.0.unit() {
TimeUnit::Second => 0,
TimeUnit::Millisecond => 3,
TimeUnit::Microsecond => 6,
TimeUnit::Nanosecond => 9,
},
}
}
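To make the file-name convention concrete: the ingester takes the part of each input file name before the first `-` as the full table name and parses it with `extract_name`, falling back to the `greptime` catalog and `public` schema. A small sketch of the expected results (the file and table names are hypothetical):
```rust
// "cluster1.app1-0000.parquet" (hypothetical name) yields the table `app1` in schema `cluster1`.
assert_eq!(
    extract_name("cluster1.app1"),
    (
        "greptime".to_string(),
        "cluster1".to_string(),
        "app1".to_string()
    )
);
// A bare table name falls back to the default catalog and schema.
assert_eq!(
    extract_name("app1"),
    (
        "greptime".to_string(),
        "public".to_string(),
        "app1".to_string()
    )
);
```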

View File

@@ -59,7 +59,7 @@ pub mod engine;
pub mod error;
mod metadata_region;
mod metrics;
mod row_modifier;
pub mod row_modifier;
#[cfg(test)]
mod test_util;
mod utils;

View File

@@ -40,7 +40,7 @@ const TSID_HASH_SEED: u32 = 846793005;
///
/// - For [`PrimaryKeyEncoding::Dense`] encoding,
/// it adds two columns(`__table_id`, `__tsid`) to the row.
pub struct RowModifier {
pub(crate) struct RowModifier {
codec: SparsePrimaryKeyCodec,
}
@@ -52,7 +52,7 @@ impl RowModifier {
}
/// Modify rows with the given primary key encoding.
pub fn modify_rows(
pub(crate) fn modify_rows(
&self,
iter: RowsIter,
table_id: TableId,
@@ -145,16 +145,14 @@ impl RowModifier {
/// Fills internal columns of a row with table name and a hash of tag values.
fn fill_internal_columns(&self, table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
let mut hasher = mur3::Hasher128::with_seed(TSID_HASH_SEED);
let mut hasher = TsidGenerator::default();
for (name, value) in iter.primary_keys_with_name() {
// The type is checked before. So only null is ignored.
if let Some(ValueData::StringValue(string)) = &value.value_data {
name.hash(&mut hasher);
string.hash(&mut hasher);
hasher.write_label(name, string);
}
}
// TSID is 64 bits, simply truncate the 128 bits hash
let (hash, _) = hasher.finish128();
let hash = hasher.finish();
(
ValueData::U32Value(table_id).into(),
@@ -163,6 +161,34 @@ impl RowModifier {
}
}
/// Tsid generator.
pub struct TsidGenerator {
hasher: mur3::Hasher128,
}
impl Default for TsidGenerator {
fn default() -> Self {
Self {
hasher: mur3::Hasher128::with_seed(TSID_HASH_SEED),
}
}
}
impl TsidGenerator {
/// Writes a label pair to the generator.
pub fn write_label(&mut self, name: &str, value: &str) {
name.hash(&mut self.hasher);
value.hash(&mut self.hasher);
}
/// Generates a new TSID.
pub fn finish(&mut self) -> u64 {
// TSID is 64 bits, simply truncate the 128 bits hash
let (hash, _) = self.hasher.finish128();
hash
}
}
/// Index of a value.
#[derive(Debug, Clone, Copy)]
struct ValueIndex {

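For orientation, a minimal sketch of how the now-public `TsidGenerator` is driven; the label names and values are made up, and note that the resulting hash depends on the order in which the label pairs are written:
```rust
use metric_engine::row_modifier::TsidGenerator;

// Computes a TSID for a hypothetical series by hashing its label pairs
// and truncating the 128-bit murmur3 hash to 64 bits.
fn example_tsid() -> u64 {
    let mut generator = TsidGenerator::default();
    generator.write_label("__name__", "storage_operation_errors_total");
    generator.write_label("cluster", "cluster1");
    generator.finish()
}
```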
View File

@@ -121,7 +121,7 @@ impl AccessLayer {
/// Writes a SST with specific `file_id` and `metadata` to the layer.
///
/// Returns the info of the SST. If no data written, returns None.
pub(crate) async fn write_sst(
pub async fn write_sst(
&self,
request: SstWriteRequest,
write_opts: &WriteOptions,
@@ -191,26 +191,26 @@ impl AccessLayer {
/// `OperationType` represents the origin of the `SstWriteRequest`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum OperationType {
pub enum OperationType {
Flush,
Compact,
}
/// Contents to build a SST.
pub(crate) struct SstWriteRequest {
pub(crate) op_type: OperationType,
pub(crate) metadata: RegionMetadataRef,
pub(crate) source: Source,
pub(crate) cache_manager: CacheManagerRef,
pub struct SstWriteRequest {
pub op_type: OperationType,
pub metadata: RegionMetadataRef,
pub source: Source,
pub cache_manager: CacheManagerRef,
#[allow(dead_code)]
pub(crate) storage: Option<String>,
pub(crate) max_sequence: Option<SequenceNumber>,
pub storage: Option<String>,
pub max_sequence: Option<SequenceNumber>,
/// Configs for index
pub(crate) index_options: IndexOptions,
pub(crate) inverted_index_config: InvertedIndexConfig,
pub(crate) fulltext_index_config: FulltextIndexConfig,
pub(crate) bloom_filter_index_config: BloomFilterConfig,
pub index_options: IndexOptions,
pub inverted_index_config: InvertedIndexConfig,
pub fulltext_index_config: FulltextIndexConfig,
pub bloom_filter_index_config: BloomFilterConfig,
}
pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {

View File

@@ -46,6 +46,7 @@ const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;
pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
/// Configuration for [MitoEngine](crate::engine::MitoEngine).
/// Before using the config, make sure to call `MitoConfig::validate()` to check if the config is valid.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct MitoConfig {

View File

@@ -42,6 +42,13 @@ use crate::worker::WorkerId;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("External error, context: {}", context))]
External {
source: BoxedError,
context: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to encode sparse primary key, reason: {}", reason))]
EncodeSparsePrimaryKey {
reason: String,
@@ -1085,7 +1092,7 @@ impl ErrorExt for Error {
| PuffinPurgeStager { source, .. } => source.status_code(),
CleanDir { .. } => StatusCode::Unexpected,
InvalidConfig { .. } => StatusCode::InvalidArguments,
StaleLogEntry { .. } => StatusCode::Unexpected,
StaleLogEntry { .. } | External { .. } => StatusCode::Unexpected,
FilterRecordBatch { source, .. } => source.status_code(),

View File

@@ -23,8 +23,8 @@
#[cfg_attr(feature = "test", allow(unused))]
pub mod test_util;
mod access_layer;
mod cache;
pub mod access_layer;
pub mod cache;
pub mod compaction;
pub mod config;
pub mod engine;

View File

@@ -14,7 +14,7 @@
//! Mito region.
pub(crate) mod opener;
pub mod opener;
pub mod options;
pub(crate) mod version;

View File

@@ -15,7 +15,7 @@
//! Region opener.
use std::collections::HashMap;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::Arc;
use common_telemetry::{debug, error, info, warn};
@@ -27,7 +27,9 @@ use object_store::util::{join_dir, normalize_dir};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::metadata::{
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::storage::{ColumnId, RegionId};
@@ -38,6 +40,7 @@ use crate::error::{
EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::time_partition::TimePartitions;
@@ -203,11 +206,16 @@ impl RegionOpener {
}
// Safety: must be set before calling this method.
let options = self.options.take().unwrap();
let object_store = self.object_store(&options.storage)?.clone();
let object_store = get_object_store(&options.storage, &self.object_store_manager)?.clone();
let provider = self.provider(&options.wal_options);
let metadata = Arc::new(metadata);
// Create a manifest manager for this region and writes regions to the manifest file.
let region_manifest_options = self.manifest_options(config, &options)?;
let region_manifest_options = Self::manifest_options(
config,
&options,
&self.region_dir,
&self.object_store_manager,
)?;
let manifest_manager = RegionManifestManager::new(
metadata.clone(),
region_manifest_options,
@@ -312,7 +320,12 @@ impl RegionOpener {
) -> Result<Option<MitoRegion>> {
let region_options = self.options.as_ref().unwrap().clone();
let region_manifest_options = self.manifest_options(config, &region_options)?;
let region_manifest_options = Self::manifest_options(
config,
&region_options,
&self.region_dir,
&self.object_store_manager,
)?;
let Some(manifest_manager) = RegionManifestManager::open(
region_manifest_options,
self.stats.total_manifest_size.clone(),
@@ -332,7 +345,7 @@ impl RegionOpener {
.take()
.unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
let on_region_opened = wal.on_region_opened();
let object_store = self.object_store(&region_options.storage)?.clone();
let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;
debug!("Open region {} with options: {:?}", region_id, self.options);
@@ -422,13 +435,14 @@ impl RegionOpener {
/// Returns a new manifest options.
fn manifest_options(
&self,
config: &MitoConfig,
options: &RegionOptions,
region_dir: &str,
object_store_manager: &ObjectStoreManagerRef,
) -> Result<RegionManifestOptions> {
let object_store = self.object_store(&options.storage)?.clone();
let object_store = get_object_store(&options.storage, object_store_manager)?;
Ok(RegionManifestOptions {
manifest_dir: new_manifest_dir(&self.region_dir),
manifest_dir: new_manifest_dir(region_dir),
object_store,
// We don't allow users to set the compression algorithm as we use it as a file suffix.
// Currently, the manifest storage doesn't have good support for changing compression algorithms.
@@ -436,20 +450,72 @@ impl RegionOpener {
checkpoint_distance: config.manifest_checkpoint_distance,
})
}
}
/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
fn object_store(&self, name: &Option<String>) -> Result<&object_store::ObjectStore> {
if let Some(name) = name {
Ok(self
.object_store_manager
.find(name)
.context(ObjectStoreNotFoundSnafu {
object_store: name.to_string(),
})?)
} else {
Ok(self.object_store_manager.default_object_store())
/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
pub fn get_object_store(
name: &Option<String>,
object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
if let Some(name) = name {
Ok(object_store_manager
.find(name)
.context(ObjectStoreNotFoundSnafu {
object_store: name.to_string(),
})?
.clone())
} else {
Ok(object_store_manager.default_object_store().clone())
}
}
/// A loader for loading metadata from a region dir.
pub struct RegionMetadataLoader {
config: Arc<MitoConfig>,
object_store_manager: ObjectStoreManagerRef,
}
impl RegionMetadataLoader {
/// Creates a new `RegionMetadataLoader`.
pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
Self {
config,
object_store_manager,
}
}
/// Loads the metadata of the region from the region dir.
pub async fn load(
&self,
region_dir: &str,
region_options: &RegionOptions,
) -> Result<Option<RegionMetadataRef>> {
let manifest = self.load_manifest(region_dir, region_options).await?;
Ok(manifest.map(|m| m.metadata.clone()))
}
/// Loads the manifest of the region from the region dir.
pub async fn load_manifest(
&self,
region_dir: &str,
region_options: &RegionOptions,
) -> Result<Option<Arc<RegionManifest>>> {
let region_manifest_options = RegionOpener::manifest_options(
&self.config,
region_options,
region_dir,
&self.object_store_manager,
)?;
let Some(manifest_manager) =
RegionManifestManager::open(region_manifest_options, Arc::new(AtomicU64::new(0)))
.await?
else {
return Ok(None);
};
let manifest = manifest_manager.manifest();
Ok(Some(manifest))
}
}
/// Checks whether the recovered region has the same schema as region to create.

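As a rough usage sketch of the new `RegionMetadataLoader` (the object store manager and region options are assumed to be built by the caller, as in `sst-convert`):
```rust
use std::sync::Arc;

use mito2::config::MitoConfig;
use mito2::region::opener::RegionMetadataLoader;
use mito2::region::options::RegionOptions;
use object_store::manager::ObjectStoreManagerRef;

// Loads the region metadata recorded in the region dir's manifest, if any.
async fn load_region_metadata(
    object_store_manager: ObjectStoreManagerRef,
    region_dir: &str,
    region_options: &RegionOptions,
) -> mito2::error::Result<()> {
    let loader =
        RegionMetadataLoader::new(Arc::new(MitoConfig::default()), object_store_manager);
    if let Some(metadata) = loader.load(region_dir, region_options).await? {
        println!(
            "region {:?} has {} columns",
            metadata.region_id,
            metadata.column_metadatas.len()
        );
    }
    Ok(())
}
```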
View File

@@ -33,6 +33,8 @@ use crate::row_converter::dense::SortField;
use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
/// A codec for sparse key of metrics.
/// It requires that the input primary key columns are sorted by column name in lexicographical order.
/// It encodes the column id of the physical region.
#[derive(Clone, Debug)]
pub struct SparsePrimaryKeyCodec {
inner: Arc<SparsePrimaryKeyCodecInner>,

View File

@@ -16,9 +16,9 @@ pub(crate) mod bloom_filter;
mod codec;
pub(crate) mod fulltext_index;
mod indexer;
pub(crate) mod intermediate;
pub mod intermediate;
pub(crate) mod inverted_index;
pub(crate) mod puffin_manager;
pub mod puffin_manager;
mod statistics;
pub(crate) mod store;

View File

@@ -49,6 +49,11 @@ impl IntermediateManager {
/// Create a new `IntermediateManager` with the given root path.
/// It will clean up all garbage intermediate files from previous runs.
pub async fn init_fs(aux_path: impl AsRef<str>) -> Result<Self> {
common_telemetry::info!(
"Initializing intermediate manager, aux_path: {}",
aux_path.as_ref()
);
let store = new_fs_cache_store(&normalize_dir(aux_path.as_ref())).await?;
let store = InstrumentedStore::new(store);

View File

@@ -61,6 +61,7 @@ impl Default for WriteOptions {
}
/// Parquet SST info returned by the writer.
#[derive(Debug)]
pub struct SstInfo {
/// SST file id.
pub file_id: FileId,

View File

@@ -0,0 +1,34 @@
[package]
name = "sst-convert"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
api.workspace = true
arrow-array.workspace = true
async-trait.workspace = true
catalog.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-recordbatch.workspace = true
common-telemetry.workspace = true
datanode.workspace = true
datatypes.workspace = true
futures.workspace = true
futures-util.workspace = true
indicatif = "0.17.0"
meta-client.workspace = true
metric-engine.workspace = true
mito2.workspace = true
object-store.workspace = true
parquet.workspace = true
parquet_opendal = "0.3.0"
prost.workspace = true
snafu.workspace = true
store-api.workspace = true
table.workspace = true
[lints]
workspace = true

View File

@@ -0,0 +1,232 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! SST converter.
use std::sync::Arc;
use common_telemetry::info;
use datanode::config::StorageConfig;
use datanode::datanode::DatanodeBuilder;
use meta_client::MetaClientOptions;
use mito2::access_layer::SstInfoArray;
use mito2::config::MitoConfig;
use mito2::read::Source;
use mito2::sst::parquet::WriteOptions;
use object_store::manager::ObjectStoreManagerRef;
use object_store::services::Fs;
use object_store::ObjectStore;
use snafu::ResultExt;
use crate::error::{DatanodeSnafu, MitoSnafu, ObjectStoreSnafu, Result};
use crate::reader::InputReaderBuilder;
use crate::table::TableMetadataHelper;
use crate::writer::RegionWriterBuilder;
/// Input file type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InputFileType {
/// File type is Parquet.
Parquet,
/// File type is remote write JSON.
RemoteWrite,
}
/// Description of a file to convert.
#[derive(Debug, Clone)]
pub struct InputFile {
/// Catalog of the table.
pub catalog: String,
/// Schema of the table.
pub schema: String,
/// Table to write.
/// For metric engine, it needs to be the physical table name.
pub table: String,
/// Path to the file.
pub path: String,
/// Type of the input file.
pub file_type: InputFileType,
}
/// Description of converted files for an input file.
/// An input file can be converted to multiple output files.
#[derive(Debug)]
pub struct OutputSst {
/// Meta of output SST files.
pub ssts: SstInfoArray,
}
/// SST converter takes a list of source files and converts them to SST files.
pub struct SstConverter {
/// Object store for input files.
pub input_store: ObjectStore,
/// Output path for the converted SST files.
/// If it is not None, the converted SST files will be written to the specified path
/// in the `input_store`.
/// This is for debugging purposes.
output_path: Option<String>,
reader_builder: InputReaderBuilder,
writer_builder: RegionWriterBuilder,
write_opts: WriteOptions,
}
impl SstConverter {
/// Converts a list of input to a list of outputs.
pub async fn convert(&mut self, input: &[InputFile]) -> Result<Vec<OutputSst>> {
common_telemetry::info!("Converting input {} files", input.len());
let mut outputs = Vec::with_capacity(input.len());
let bar = indicatif::ProgressBar::new(input.len() as u64);
for file in input {
let output = self.convert_one(file).await?;
outputs.push(output);
bar.inc(1);
}
bar.finish();
common_telemetry::info!("Converted {} files", outputs.len());
Ok(outputs)
}
/// Converts one input.
pub async fn convert_one(&mut self, input: &InputFile) -> Result<OutputSst> {
common_telemetry::info!(
"Converting input file, input_path: {}, output_path: {}",
input.path,
self.output_path.as_deref().unwrap_or(&input.path)
);
let reader_info = self.reader_builder.read_input(input).await?;
let source = Source::Reader(reader_info.reader);
let output_dir = self
.output_path
.as_deref()
.unwrap_or(&reader_info.region_dir);
let writer = self
.writer_builder
.build(reader_info.metadata, output_dir, reader_info.region_options)
.await
.context(MitoSnafu)?;
let ssts = writer
.write_sst(source, &self.write_opts)
.await
.context(MitoSnafu)?;
common_telemetry::info!("Converted input file, input_path: {}", input.path);
Ok(OutputSst { ssts })
}
}
/// Builder to build a SST converter.
#[derive(Clone)]
pub struct SstConverterBuilder {
input_path: String,
meta_options: MetaClientOptions,
storage_config: StorageConfig,
output_path: Option<String>,
config: MitoConfig,
}
impl SstConverterBuilder {
/// Creates a new builder with a file system path as input.
pub fn new_fs(input_path: String) -> Self {
Self {
input_path,
meta_options: MetaClientOptions::default(),
storage_config: StorageConfig::default(),
output_path: None,
config: MitoConfig::default(),
}
}
/// Attaches the meta client options.
pub fn with_meta_options(mut self, meta_options: MetaClientOptions) -> Self {
self.meta_options = meta_options;
self
}
/// Attaches the storage config.
pub fn with_storage_config(mut self, storage_config: StorageConfig) -> Self {
self.storage_config = storage_config;
self
}
/// Sets the output path for the converted SST files.
/// This is for debugging purposes.
pub fn with_output_path(mut self, output_path: String) -> Self {
self.output_path = Some(output_path);
self
}
/// Sets the config for the converted SST files.
pub fn with_config(mut self, config: MitoConfig) -> Self {
self.config = config;
self
}
/// Builds a SST converter.
pub async fn build(mut self) -> Result<SstConverter> {
self.config
.sanitize(&self.storage_config.data_home)
.context(MitoSnafu)?;
common_telemetry::info!(
"Building SST converter, input_path: {}, storage_config: {:?}, config: {:?}",
self.input_path,
self.storage_config,
self.config
);
let input_store = new_input_store(&self.input_path).await?;
let output_store_manager = new_object_store_manager(&self.storage_config).await?;
let table_helper = TableMetadataHelper::new(&self.meta_options).await?;
let config = Arc::new(self.config);
let reader_builder = InputReaderBuilder::new(
input_store.clone(),
table_helper,
output_store_manager.clone(),
config.clone(),
);
let writer_builder = RegionWriterBuilder::new(config, output_store_manager)
.await
.context(MitoSnafu)?;
Ok(SstConverter {
input_store,
output_path: self.output_path,
reader_builder,
writer_builder,
write_opts: WriteOptions::default(),
})
}
}
/// A helper function to create the object store manager.
pub async fn new_object_store_manager(config: &StorageConfig) -> Result<ObjectStoreManagerRef> {
DatanodeBuilder::build_object_store_manager(config)
.await
.context(DatanodeSnafu)
}
/// Creates an input store from a path.
pub async fn new_input_store(path: &str) -> Result<ObjectStore> {
let builder = Fs::default().root(path);
info!("Creating input store, path: {}", path);
let object_store = ObjectStore::new(builder)
.context(ObjectStoreSnafu)?
.finish();
info!("Created input store: {:?}", object_store);
Ok(object_store)
}

View File

@@ -0,0 +1,111 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Errors for SST conversion.
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Object store error"))]
ObjectStore {
#[snafu(source)]
error: object_store::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Missing __name__ label"))]
MissingMetricName {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Table not found: {}", table_name))]
MissingTable {
table_name: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Column not found: {}", column_name))]
MissingColumn {
column_name: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Mito error"))]
Mito {
source: mito2::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Datanode error"))]
Datanode {
source: datanode::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Meta error"))]
Meta {
source: common_meta::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Meta client error"))]
MetaClient {
source: meta_client::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Decode error"))]
Decode {
#[snafu(source)]
error: prost::DecodeError,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ObjectStore { .. } => StatusCode::StorageUnavailable,
Error::MissingMetricName { .. } => StatusCode::InvalidArguments,
Error::MissingTable { .. } => StatusCode::TableNotFound,
Error::MissingColumn { .. } => StatusCode::TableColumnNotFound,
Error::Mito { source, .. } => source.status_code(),
Error::Datanode { source, .. } => source.status_code(),
Error::Meta { source, .. } => source.status_code(),
Error::MetaClient { source, .. } => source.status_code(),
Error::Decode { .. } => StatusCode::InvalidArguments,
}
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}

View File

@@ -0,0 +1,21 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod converter;
pub mod error;
pub mod reader;
mod table;
pub mod writer;
pub use reader::parquet::{OpenDALParquetReader, RawParquetReader};

View File

@@ -0,0 +1,183 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Reader to read input data in different formats.
use std::collections::HashMap;
use std::sync::Arc;
use mito2::config::MitoConfig;
use mito2::read::BoxedBatchReader;
use mito2::region::opener::RegionMetadataLoader;
use mito2::region::options::RegionOptions;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::join_dir;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_REGION_SUBDIR;
use store_api::path_utils::region_dir;
use store_api::storage::{RegionId, SequenceNumber};
use table::metadata::TableId;
use crate::converter::{InputFile, InputFileType};
use crate::error::{MissingTableSnafu, MitoSnafu, Result};
use crate::reader::remote_write::RemoteWriteReader;
use crate::table::TableMetadataHelper;
use crate::OpenDALParquetReader;
pub(crate) mod parquet;
pub mod remote_write;
/// Reader and context.
pub struct ReaderInfo {
pub reader: BoxedBatchReader,
pub region_dir: String,
pub region_options: RegionOptions,
pub metadata: RegionMetadataRef,
}
/// Builder to build readers to read input files.
pub(crate) struct InputReaderBuilder {
input_store: ObjectStore,
table_helper: TableMetadataHelper,
region_loader: RegionMetadataLoader,
/// Cached region infos for tables.
region_infos: HashMap<String, RegionInfo>,
}
impl InputReaderBuilder {
pub(crate) fn new(
input_store: ObjectStore,
table_helper: TableMetadataHelper,
object_store_manager: ObjectStoreManagerRef,
config: Arc<MitoConfig>,
) -> Self {
let region_loader = RegionMetadataLoader::new(config, object_store_manager);
Self {
input_store,
table_helper,
region_loader,
region_infos: HashMap::new(),
}
}
/// Builds a reader to read the input file.
pub async fn read_input(&mut self, input: &InputFile) -> Result<ReaderInfo> {
match input.file_type {
InputFileType::Parquet => self.read_parquet(input).await,
InputFileType::RemoteWrite => self.read_remote_write(input).await,
}
}
/// Builds a reader to read the parquet file.
pub async fn read_parquet(&mut self, input: &InputFile) -> Result<ReaderInfo> {
let region_info = self.get_region_info(input).await?;
let reader = OpenDALParquetReader::new(
self.input_store.clone(),
&input.path,
region_info.metadata.clone(),
Some(region_info.flushed_sequence),
)
.await?;
Ok(ReaderInfo {
reader: Box::new(reader),
region_dir: region_info.region_dir,
region_options: region_info.region_options,
metadata: region_info.metadata,
})
}
/// Builds a reader to read the remote write file.
pub async fn read_remote_write(&mut self, input: &InputFile) -> Result<ReaderInfo> {
let region_info = self.get_region_info(input).await?;
// TODO(yingwen): Should update the sequence.
let reader = RemoteWriteReader::open(
self.input_store.clone(),
&input.path,
&input.catalog,
&input.schema,
region_info.metadata.clone(),
self.table_helper.clone(),
)
.await?
.with_sequence(region_info.flushed_sequence);
Ok(ReaderInfo {
reader: Box::new(reader),
region_dir: region_info.region_dir,
region_options: region_info.region_options,
metadata: region_info.metadata,
})
}
async fn get_region_info(&mut self, input: &InputFile) -> Result<RegionInfo> {
let cache_key = cache_key(&input.catalog, &input.schema, &input.table);
if let Some(region_info) = self.region_infos.get(&cache_key) {
return Ok(region_info.clone());
}
let table_info = self
.table_helper
.get_table(&input.catalog, &input.schema, &input.table)
.await?
.context(MissingTableSnafu {
table_name: &input.table,
})?;
let region_id = to_region_id(table_info.table_info.ident.table_id);
let opts = table_info.table_info.to_region_options();
// TODO(yingwen): We ignore WAL options now. We should `prepare_wal_options()` in the future.
let region_options = RegionOptions::try_from(&opts).context(MitoSnafu)?;
let mut region_dir = region_dir(&table_info.region_storage_path(), region_id);
if input.file_type == InputFileType::RemoteWrite {
// metric engine has two internal regions.
region_dir = join_dir(&region_dir, DATA_REGION_SUBDIR);
}
let manifest = self
.region_loader
.load_manifest(&region_dir, &region_options)
.await
.context(MitoSnafu)?
.context(MissingTableSnafu {
table_name: &table_info.table_info.name,
})?;
let region_info = RegionInfo {
metadata: manifest.metadata.clone(),
flushed_sequence: manifest.flushed_sequence,
region_dir,
region_options,
};
self.region_infos.insert(cache_key, region_info.clone());
Ok(region_info)
}
}
fn to_region_id(table_id: TableId) -> RegionId {
RegionId::new(table_id, 0)
}
fn cache_key(catalog: &str, schema: &str, table: &str) -> String {
format!("{}/{}/{}", catalog, schema, table)
}
#[derive(Clone)]
struct RegionInfo {
metadata: RegionMetadataRef,
flushed_sequence: SequenceNumber,
region_dir: String,
region_options: RegionOptions,
}

View File

@@ -0,0 +1,318 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Parquet file format support.
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::pin::Pin;
use std::sync::Arc;
use api::v1::OpType;
use common_error::ext::BoxedError;
use common_error::snafu::{OptionExt, ResultExt};
use common_recordbatch::error::UnsupportedOperationSnafu;
use common_recordbatch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::Schema;
use datatypes::value::Value;
use datatypes::vectors::{MutableVector, UInt64VectorBuilder, UInt8VectorBuilder};
use futures_util::StreamExt;
use mito2::error::ReadParquetSnafu;
use mito2::read::{Batch, BatchColumn, BatchReader};
use mito2::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SparsePrimaryKeyCodec};
use object_store::ObjectStore;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream};
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet_opendal::AsyncReader;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use crate::error::{MitoSnafu, ObjectStoreSnafu, Result};
pub struct OpenDALParquetReader {
inner: RawParquetReader<AsyncReader>,
}
impl OpenDALParquetReader {
pub async fn new(
operator: ObjectStore,
path: &str,
metadata: RegionMetadataRef,
override_sequence: Option<SequenceNumber>,
) -> Result<Self> {
let reader = operator.reader_with(path).await.context(ObjectStoreSnafu)?;
let content_len = operator
.stat(path)
.await
.context(ObjectStoreSnafu)?
.content_length();
let reader = AsyncReader::new(reader, content_len).with_prefetch_footer_size(512 * 1024);
let stream = ParquetRecordBatchStreamBuilder::new(reader)
.await
.context(ReadParquetSnafu { path })
.context(MitoSnafu)?
.build()
.context(ReadParquetSnafu { path })
.context(MitoSnafu)?;
Ok(Self {
inner: RawParquetReader::new(stream, metadata, override_sequence, path),
})
}
}
#[async_trait::async_trait]
impl BatchReader for OpenDALParquetReader {
async fn next_batch(&mut self) -> mito2::error::Result<Option<Batch>> {
self.inner.next_batch().await
}
}
pub struct RawParquetReader<T> {
metadata: RegionMetadataRef,
override_sequence: Option<SequenceNumber>,
output_batch_queue: VecDeque<Batch>,
stream: Pin<Box<ParquetRecordBatchStream<T>>>,
path: String,
}
impl<T: AsyncFileReader + Unpin + Send + 'static> RawParquetReader<T> {
pub fn new(
stream: ParquetRecordBatchStream<T>,
metadata: RegionMetadataRef,
override_sequence: Option<SequenceNumber>,
path: &str,
) -> Self {
Self {
stream: Box::pin(stream),
metadata,
override_sequence,
output_batch_queue: VecDeque::new(),
path: path.to_string(),
}
}
pub async fn next_batch_inner(&mut self) -> mito2::error::Result<Option<Batch>> {
if let Some(batch) = self.output_batch_queue.pop_front() {
return Ok(Some(batch));
}
let Some(next_input_rb) = self.stream.next().await.transpose().with_context(|_| {
mito2::error::ReadParquetSnafu {
path: self.path.clone(),
}
})?
else {
return Ok(None);
};
let schema = Arc::new(
Schema::try_from(next_input_rb.schema())
.map_err(BoxedError::new)
.with_context(|_| mito2::error::ExternalSnafu {
context: format!(
"Failed to convert Schema from DfSchema: {:?}",
next_input_rb.schema()
),
})?,
);
let rb = RecordBatch::try_from_df_record_batch(schema, next_input_rb)
.map_err(BoxedError::new)
.with_context(|_| mito2::error::ExternalSnafu {
context: "Failed to convert RecordBatch from DfRecordBatch".to_string(),
})?;
let new_batches = extract_to_batches(&rb, &self.metadata, self.override_sequence)
.map_err(BoxedError::new)
.with_context(|_| mito2::error::ExternalSnafu {
context: format!("Failed to extract batches from RecordBatch: {:?}", rb),
})?;
self.output_batch_queue.extend(new_batches);
Ok(self.output_batch_queue.pop_front())
}
}
#[async_trait::async_trait]
impl<T: AsyncFileReader + Unpin + Send + 'static> BatchReader for RawParquetReader<T> {
async fn next_batch(&mut self) -> mito2::error::Result<Option<Batch>> {
self.next_batch_inner().await
}
}
pub fn extract_to_batches(
rb: &RecordBatch,
metadata: &RegionMetadataRef,
override_sequence: Option<SequenceNumber>,
) -> Result<Vec<Batch>, BoxedError> {
let pk_codec: Box<dyn PrimaryKeyCodec> = match metadata.primary_key_encoding {
PrimaryKeyEncoding::Dense => Box::new(DensePrimaryKeyCodec::new(metadata)),
PrimaryKeyEncoding::Sparse => Box::new(SparsePrimaryKeyCodec::new(metadata)),
};
let pk_ids = metadata.primary_key.clone();
let pk_names: Vec<_> = pk_ids
.iter()
.map(|id| {
metadata
.column_by_id(*id)
.expect("Can't find column by id")
.column_schema
.name
.clone()
})
.collect();
let pk_pos_in_rb: Vec<_> = pk_names
.into_iter()
.map(|name| {
rb.schema
.column_index_by_name(&name)
.context(UnsupportedOperationSnafu {
reason: format!("Can't find column {} in rb={:?}", name, rb),
})
.map_err(BoxedError::new)
})
.collect::<Result<_, _>>()?;
let mut pk_to_batches: HashMap<Vec<u8>, SSTBatchBuilder> = HashMap::new();
let mut buffer = Vec::new();
for row in rb.rows() {
let pk_values: Vec<_> = pk_ids
.iter()
.zip(pk_pos_in_rb.iter())
.map(|(id, pos)| (*id, row[*pos].clone()))
.collect();
pk_codec
.encode_values(&pk_values, &mut buffer)
.map_err(BoxedError::new)?;
let cur_pk = &buffer;
let builder = if let Some(builder) = pk_to_batches.get_mut(cur_pk) {
builder
} else {
let builder =
SSTBatchBuilder::new(rb, metadata, override_sequence).map_err(BoxedError::new)?;
pk_to_batches.insert(cur_pk.clone(), builder);
pk_to_batches.get_mut(cur_pk).expect("Just inserted")
};
builder.push_row(&row).map_err(BoxedError::new)?;
}
// sort batches by primary key
let mut batches = BTreeMap::new();
for (pk, builder) in pk_to_batches {
batches.insert(pk.clone(), builder.finish(pk).map_err(BoxedError::new)?);
}
let batches = batches.into_values().collect();
Ok(batches)
}
struct SSTBatchBuilder {
/// For extracting field columns from a record batch's rows.
field_column_pos: Vec<usize>,
field_ids: Vec<ColumnId>,
field_builders: Vec<Box<dyn MutableVector>>,
timestamp_pos: usize,
timestamp_builder: Box<dyn MutableVector>,
/// override sequence number
override_sequence: Option<SequenceNumber>,
sequence_builder: UInt64VectorBuilder,
op_type_builder: UInt8VectorBuilder,
cur_seq: SequenceNumber,
}
impl SSTBatchBuilder {
fn finish(mut self, pk: Vec<u8>) -> Result<Batch, BoxedError> {
let fields: Vec<_> = self
.field_ids
.iter()
.zip(self.field_builders)
.map(|(id, mut b)| BatchColumn {
column_id: *id,
data: b.to_vector(),
})
.collect();
Batch::new(
pk,
self.timestamp_builder.to_vector(),
Arc::new(self.sequence_builder.finish()),
Arc::new(self.op_type_builder.finish()),
fields,
)
.map_err(BoxedError::new)
}
fn push_row(&mut self, row: &[Value]) -> Result<(), BoxedError> {
for (field_pos, field_builder) in self
.field_column_pos
.iter()
.zip(self.field_builders.iter_mut())
{
field_builder.push_value_ref(row[*field_pos].as_value_ref());
}
self.timestamp_builder
.push_value_ref(row[self.timestamp_pos].as_value_ref());
self.sequence_builder
.push(Some(self.override_sequence.unwrap_or(self.cur_seq)));
self.op_type_builder.push(Some(OpType::Put as u8));
self.cur_seq += 1;
Ok(())
}
fn new(
rb: &RecordBatch,
metadata: &RegionMetadataRef,
override_sequence: Option<SequenceNumber>,
) -> Result<Self, BoxedError> {
let timeindex_name = &metadata.time_index_column().column_schema.name;
Ok(Self {
field_ids: metadata.field_columns().map(|c| c.column_id).collect(),
field_column_pos: metadata
.field_columns()
.map(|c| &c.column_schema.name)
.map(|name| {
rb.schema
.column_index_by_name(name)
.context(UnsupportedOperationSnafu {
reason: format!("Can't find column {} in rb={:?}", name, rb),
})
.map_err(BoxedError::new)
})
.collect::<Result<_, _>>()?,
field_builders: metadata
.field_columns()
.map(|c| c.column_schema.data_type.create_mutable_vector(512))
.collect(),
timestamp_pos: rb
.schema
.column_index_by_name(timeindex_name)
.context(UnsupportedOperationSnafu {
reason: format!("{} in rb={:?}", timeindex_name, rb),
})
.map_err(BoxedError::new)?,
timestamp_builder: metadata
.time_index_column()
.column_schema
.data_type
.create_mutable_vector(512),
override_sequence,
sequence_builder: UInt64VectorBuilder::with_capacity(512),
op_type_builder: UInt8VectorBuilder::with_capacity(512),
cur_seq: override_sequence.unwrap_or_default(),
})
}
}

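A minimal sketch of driving `OpenDALParquetReader` on its own, assuming the object store, file path, and region metadata are obtained the same way the converter obtains them:
```rust
use mito2::read::BatchReader;
use object_store::ObjectStore;
use sst_convert::OpenDALParquetReader;
use store_api::metadata::RegionMetadataRef;

// Iterates over every mito `Batch` decoded from a single input parquet file.
async fn dump_batches(
    store: ObjectStore,
    path: &str,
    metadata: RegionMetadataRef,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut reader = OpenDALParquetReader::new(store, path, metadata, None).await?;
    while let Some(batch) = reader.next_batch().await? {
        println!("batch with {} rows", batch.num_rows());
    }
    Ok(())
}
```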
View File

@@ -0,0 +1,369 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Prometheus remote write support.
//!
//! Prometheus remote write data stored in parquet format:
//! - Each row contains the protobuf binary representation of a single timeseries.
//! - Each series occurs only once in the file.
//! - Each timeseries must have the `__name__` label.
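//!
//! Below is a hedged sketch of producing such a file with `prost` and `parquet`'s
//! arrow writer. The column name and output path are illustrative assumptions; the
//! reader in this module only looks at the first column of each record batch.
//!
//! ```ignore
//! use std::sync::Arc;
//! use api::prom_store::remote::{Label, Sample, TimeSeries};
//! use arrow_array::{ArrayRef, BinaryArray, RecordBatch};
//! use parquet::arrow::ArrowWriter;
//! use prost::Message;
//!
//! // One protobuf-encoded series per row, carrying the mandatory `__name__` label.
//! let series = TimeSeries {
//!     labels: vec![
//!         Label { name: "__name__".to_string(), value: "cpu_usage".to_string() },
//!         Label { name: "host".to_string(), value: "h1".to_string() },
//!     ],
//!     samples: vec![Sample { value: 0.5, timestamp: 1_700_000_000_000 }],
//!     ..Default::default()
//! };
//! let encoded = vec![series.encode_to_vec()];
//! let column: ArrayRef = Arc::new(BinaryArray::from_iter_values(encoded.iter()));
//! let batch = RecordBatch::try_from_iter(vec![("series", column)]).unwrap();
//!
//! let file = std::fs::File::create("/tmp/remote_write.parquet").unwrap();
//! let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap();
//! writer.write(&batch).unwrap();
//! writer.close().unwrap();
//! ```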
use std::collections::HashMap;
use std::sync::Arc;
use api::prom_store::remote::{Label, TimeSeries};
use api::v1::OpType;
use arrow_array::{
Array, BinaryArray, Float64Array, TimestampMillisecondArray, UInt64Array, UInt8Array,
};
use datatypes::value::ValueRef;
use futures::StreamExt;
use metric_engine::row_modifier::TsidGenerator;
use mito2::error::ReadParquetSnafu;
use mito2::read::{Batch, BatchBuilder, BatchReader};
use mito2::row_converter::SparsePrimaryKeyCodec;
use object_store::ObjectStore;
use parquet::arrow::async_reader::ParquetRecordBatchStream;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet_opendal::AsyncReader;
use prost::Message;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{ColumnId, SequenceNumber};
use table::metadata::TableId;
use crate::error::{
DecodeSnafu, MissingColumnSnafu, MissingMetricNameSnafu, MissingTableSnafu, MitoSnafu,
ObjectStoreSnafu, Result,
};
use crate::table::TableMetadataHelper;
const METRIC_NAME_LABEL: &str = "__name__";
const GREPTIME_VALUE: &str = "greptime_value";
/// A reader that reads remote write file, sorts and outputs timeseries in the primary key order.
pub struct RemoteWriteReader {
/// Timeseries sorted by primary key in reverse order,
/// so series can be popped from the back in key order.
series: Vec<(Vec<u8>, TimeSeries)>,
/// Converter for converting timeseries to batches.
converter: SeriesConverter,
}
impl RemoteWriteReader {
/// Creates a new [`RemoteWriteReader`] from an object store.
/// It reads and sorts timeseries.
pub async fn open(
operator: ObjectStore,
path: &str,
catalog: &str,
schema: &str,
metadata: RegionMetadataRef,
table_helper: TableMetadataHelper,
) -> Result<Self> {
let codec = SparsePrimaryKeyCodec::new(&metadata);
let value_id = metadata
.column_by_name(GREPTIME_VALUE)
.context(MissingColumnSnafu {
column_name: GREPTIME_VALUE,
})?
.column_id;
let encoder = PrimaryKeyEncoder {
catalog: catalog.to_string(),
schema: schema.to_string(),
metadata,
table_helper,
table_ids: HashMap::new(),
codec,
};
let converter = SeriesConverter {
value_id,
override_sequence: None,
};
let mut sorter = TimeSeriesSorter::new(encoder);
let mut reader = TimeSeriesParquetReader::new(operator, path).await?;
while let Some(series) = reader.next_series().await? {
sorter.push(series).await?;
}
let mut series = sorter.sort();
series.reverse();
Ok(Self { series, converter })
}
/// Sets the sequence number to use for output batches.
/// If unset, the sequence number defaults to 0.
pub fn with_sequence(mut self, sequence: SequenceNumber) -> Self {
self.converter.override_sequence = Some(sequence);
self
}
fn next_series(&mut self) -> Option<(Vec<u8>, TimeSeries)> {
self.series.pop()
}
}
#[async_trait::async_trait]
impl BatchReader for RemoteWriteReader {
async fn next_batch(&mut self) -> mito2::error::Result<Option<Batch>> {
let Some((pk, series)) = self.next_series() else {
return Ok(None);
};
self.converter.convert(pk, series).map(Some)
}
}
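// A hedged usage sketch of the reader above. The path, catalog and schema literals
// are illustrative assumptions, and `object_store`, `metadata` and `table_helper`
// are expected to be prepared by the caller; only `open`, `with_sequence` and
// `next_batch` come from this module.
async fn read_all_batches(
    object_store: ObjectStore,
    metadata: RegionMetadataRef,
    table_helper: TableMetadataHelper,
) -> Result<Vec<Batch>> {
    let mut reader = RemoteWriteReader::open(
        object_store,
        "data/remote_write.parquet", // hypothetical input path
        "greptime",                  // hypothetical catalog
        "public",                    // hypothetical schema
        metadata,
        table_helper,
    )
    .await?
    // Stamp every row with sequence 1 instead of the default 0.
    .with_sequence(1);

    let mut batches = Vec::new();
    while let Some(batch) = reader.next_batch().await.context(MitoSnafu)? {
        batches.push(batch);
    }
    Ok(batches)
}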
struct SeriesConverter {
/// Column id for the value field.
value_id: ColumnId,
/// Sequence number of the batch.
override_sequence: Option<SequenceNumber>,
}
impl SeriesConverter {
/// Builds a batch from a primary key and a time series.
fn convert(&self, primary_key: Vec<u8>, series: TimeSeries) -> mito2::error::Result<Batch> {
let num_rows = series.samples.len();
let op_types = vec![OpType::Put as u8; num_rows];
// TODO(yingwen): Should we use 0 or 1 as default?
let sequences = vec![self.override_sequence.unwrap_or(0); num_rows];
let mut timestamps = Vec::with_capacity(num_rows);
let mut values = Vec::with_capacity(num_rows);
for sample in series.samples {
timestamps.push(sample.timestamp);
values.push(sample.value);
}
let mut builder = BatchBuilder::new(primary_key);
builder
.timestamps_array(Arc::new(TimestampMillisecondArray::from(timestamps)))?
.sequences_array(Arc::new(UInt64Array::from(sequences)))?
.op_types_array(Arc::new(UInt8Array::from(op_types)))?
.push_field_array(self.value_id, Arc::new(Float64Array::from(values)))?;
builder.build()
}
}
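// A minimal sketch of converting one decoded series with the converter above;
// `value_column_id`, the primary key bytes and the series are caller-supplied
// assumptions for illustration.
fn convert_one_series(
    value_column_id: ColumnId,
    pk: Vec<u8>,
    series: TimeSeries,
) -> mito2::error::Result<Batch> {
    let converter = SeriesConverter {
        value_id: value_column_id,
        // With no override every row gets sequence 0 (see the TODO above).
        override_sequence: None,
    };
    converter.convert(pk, series)
}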
/// Prometheus remote write parquet reader.
pub struct TimeSeriesParquetReader {
path: String,
stream: ParquetRecordBatchStream<AsyncReader>,
/// Is the stream EOF.
eof: bool,
/// Current binary array.
array: Option<BinaryArray>,
/// Current row in the record batch.
/// Only valid when `array` is `Some`.
row_index: usize,
}
impl TimeSeriesParquetReader {
/// Creates a new instance of `TimeSeriesParquetReader`.
pub async fn new(object_store: ObjectStore, path: &str) -> Result<Self> {
let reader = object_store
.reader_with(path)
.await
.context(ObjectStoreSnafu)?;
let content_len = object_store
.stat(path)
.await
.context(ObjectStoreSnafu)?
.content_length();
let reader = AsyncReader::new(reader, content_len).with_prefetch_footer_size(512 * 1024);
let stream = ParquetRecordBatchStreamBuilder::new(reader)
.await
.context(ReadParquetSnafu { path })
.context(MitoSnafu)?
.build()
.context(ReadParquetSnafu { path })
.context(MitoSnafu)?;
Ok(Self {
path: path.to_string(),
stream,
eof: false,
array: None,
row_index: 0,
})
}
/// Reads the next timeseries from the reader.
pub async fn next_series(&mut self) -> Result<Option<TimeSeries>> {
if self.eof {
return Ok(None);
}
if let Some(array) = &self.array {
if self.row_index >= array.len() {
self.row_index = 0;
self.array = None;
}
}
if self.array.is_none() {
let Some(record_batch) = self
.stream
.next()
.await
.transpose()
.context(ReadParquetSnafu { path: &self.path })
.context(MitoSnafu)?
else {
self.eof = true;
return Ok(None);
};
let array = record_batch
.column(0)
.as_any()
.downcast_ref::<BinaryArray>()
.context(MissingColumnSnafu {
column_name: "remote write json column",
})?
.clone();
assert!(!array.is_empty());
self.array = Some(array);
}
let array = self.array.as_ref().unwrap();
let value = array.value(self.row_index);
let time_series = TimeSeries::decode(value).context(DecodeSnafu)?;
self.row_index += 1;
Ok(Some(time_series))
}
}
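// A hedged sketch of scanning a remote write parquet file directly with the reader
// above; the path literal is an assumption. Each decoded `TimeSeries` is the protobuf
// payload stored in the first (binary) column of the file.
async fn count_series(object_store: ObjectStore) -> Result<usize> {
    let mut reader =
        TimeSeriesParquetReader::new(object_store, "data/remote_write.parquet").await?;
    let mut count = 0;
    while let Some(series) = reader.next_series().await? {
        // The format described in the module docs requires the `__name__` label.
        debug_assert!(series.labels.iter().any(|l| l.name == METRIC_NAME_LABEL));
        count += 1;
    }
    Ok(count)
}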
/// Encoder to encode labels into primary key.
struct PrimaryKeyEncoder {
/// Catalog name.
catalog: String,
/// Schema name.
schema: String,
/// The metadata of the physical region.
metadata: RegionMetadataRef,
/// Helper to get table metadata.
table_helper: TableMetadataHelper,
/// Cached table name to table id.
table_ids: HashMap<String, TableId>,
/// Primary key encoder.
codec: SparsePrimaryKeyCodec,
}
impl PrimaryKeyEncoder {
/// Encodes the primary key for the given labels.
/// It'll sort the labels by name before encoding.
async fn encode_primary_key(
&mut self,
labels: &mut Vec<Label>,
key_buf: &mut Vec<u8>,
) -> Result<()> {
if !labels.is_sorted_by(|left, right| left.name <= right.name) {
labels.sort_unstable_by(|left, right| left.name.cmp(&right.name));
}
// Gets the table id from the label.
let name_label = labels
.iter()
.find(|label| label.name == METRIC_NAME_LABEL)
.context(MissingMetricNameSnafu)?;
let table_id = match self.table_ids.get(&name_label.value) {
Some(id) => *id,
None => {
let table_info = self
.table_helper
.get_table(&self.catalog, &self.schema, &name_label.value)
.await?
.context(MissingTableSnafu {
table_name: &name_label.value,
})?;
let id = table_info.table_info.ident.table_id;
self.table_ids.insert(name_label.value.clone(), id);
id
}
};
// Computes the tsid for the given labels.
let mut generator = TsidGenerator::default();
for label in &*labels {
if label.name != METRIC_NAME_LABEL {
generator.write_label(&label.name, &label.value);
}
}
let tsid = generator.finish();
key_buf.clear();
let internal_columns = [
(ReservedColumnId::table_id(), ValueRef::UInt32(table_id)),
(ReservedColumnId::tsid(), ValueRef::UInt64(tsid)),
];
self.codec
.encode_to_vec(internal_columns.into_iter(), key_buf)
.context(MitoSnafu)?;
let label_iter = labels.iter().filter_map(|label| {
if label.name == METRIC_NAME_LABEL {
return None;
}
let column_id = self.metadata.column_by_name(&label.name)?.column_id;
Some((column_id, ValueRef::String(&label.value)))
});
self.codec
.encode_to_vec(label_iter, key_buf)
.context(MitoSnafu)?;
Ok(())
}
}
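// Sketch of the tsid step above in isolation: the metric name is excluded, the
// remaining (already sorted) labels are hashed, and the result fills the reserved
// tsid column of the primary key next to the table id. The helper name is an
// illustrative assumption.
fn tsid_for_sorted_labels(labels: &[Label]) -> u64 {
    let mut generator = TsidGenerator::default();
    for label in labels {
        if label.name != METRIC_NAME_LABEL {
            generator.write_label(&label.name, &label.value);
        }
    }
    generator.finish()
}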
/// Prometheus timeseries sorter.
/// Sorts timeseries by the primary key.
struct TimeSeriesSorter {
/// Timeseries to sort.
series: Vec<(Vec<u8>, TimeSeries)>,
/// Key encoder.
encoder: PrimaryKeyEncoder,
}
impl TimeSeriesSorter {
/// Creates a new sorter.
fn new(encoder: PrimaryKeyEncoder) -> Self {
Self {
series: Vec::new(),
encoder,
}
}
/// Pushes a timeseries to the sorter.
async fn push(&mut self, mut series: TimeSeries) -> Result<()> {
let mut key_buf = Vec::new();
self.encoder
.encode_primary_key(&mut series.labels, &mut key_buf)
.await?;
self.series.push((key_buf, series));
Ok(())
}
/// Sorts the buffered timeseries.
/// Returns the timeseries with their encoded primary keys, ordered by key.
fn sort(&mut self) -> Vec<(Vec<u8>, TimeSeries)> {
self.series
.sort_unstable_by(|left, right| left.0.cmp(&right.0));
std::mem::take(&mut self.series)
}
}
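// A hedged end-to-end sketch tying this module together: decode series from parquet,
// key them with a `PrimaryKeyEncoder` built as in `RemoteWriteReader::open`, and
// drain them in primary-key order. The function name is an illustrative assumption.
async fn sort_remote_write_file(
    operator: ObjectStore,
    path: &str,
    encoder: PrimaryKeyEncoder,
) -> Result<Vec<(Vec<u8>, TimeSeries)>> {
    let mut reader = TimeSeriesParquetReader::new(operator, path).await?;
    let mut sorter = TimeSeriesSorter::new(encoder);
    while let Some(series) = reader.next_series().await? {
        sorter.push(series).await?;
    }
    Ok(sorter.sort())
}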

View File

@@ -0,0 +1,78 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities to get table metadata.
use std::sync::Arc;
use catalog::kvbackend::MetaKvBackend;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use meta_client::{MetaClientOptions, MetaClientType};
use snafu::ResultExt;
use crate::error::{MetaClientSnafu, MetaSnafu, Result};
#[derive(Clone)]
pub struct TableMetadataHelper {
table_metadata_manager: TableMetadataManagerRef,
}
impl TableMetadataHelper {
pub async fn new(meta_options: &MetaClientOptions) -> Result<Self> {
let backend = build_kv_backend(meta_options).await?;
let table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(backend)));
Ok(Self {
table_metadata_manager,
})
}
/// Gets the table info by catalog, schema and table name.
pub async fn get_table(
&self,
catalog: &str,
schema: &str,
table: &str,
) -> Result<Option<TableInfoValue>> {
let table_name = TableNameKey::new(catalog, schema, table);
let Some(table_id) = self
.table_metadata_manager
.table_name_manager()
.get(table_name)
.await
.context(MetaSnafu)?
.map(|v| v.table_id())
else {
return Ok(None);
};
let value = self
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(MetaSnafu)?
.map(|v| v.into_inner());
Ok(value)
}
}
async fn build_kv_backend(meta_options: &MetaClientOptions) -> Result<MetaKvBackend> {
let meta_client = meta_client::create_meta_client(MetaClientType::Frontend, meta_options)
.await
.context(MetaClientSnafu)?;
Ok(MetaKvBackend::new(meta_client))
}
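// A hedged sketch of resolving a metric's table id through the helper above; the
// catalog, schema and table names are illustrative assumptions, and `TableId` is a
// `u32` alias in this codebase.
async fn lookup_table_id(meta_options: &MetaClientOptions) -> Result<Option<u32>> {
    let helper = TableMetadataHelper::new(meta_options).await?;
    let table_info = helper.get_table("greptime", "public", "cpu_usage").await?;
    Ok(table_info.map(|info| info.table_info.ident.table_id))
}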

View File

@@ -0,0 +1,129 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities for writing SST files.
use std::sync::Arc;
use mito2::access_layer::{
AccessLayer, AccessLayerRef, OperationType, SstInfoArray, SstWriteRequest,
};
use mito2::cache::CacheManager;
use mito2::config::MitoConfig;
use mito2::error::Result;
use mito2::read::Source;
use mito2::region::options::RegionOptions;
use mito2::sst::index::intermediate::IntermediateManager;
use mito2::sst::index::puffin_manager::PuffinManagerFactory;
use mito2::sst::parquet::WriteOptions;
use object_store::manager::ObjectStoreManagerRef;
use store_api::metadata::RegionMetadataRef;
/// A writer that can create multiple SST files for a region.
pub struct RegionWriter {
/// Mito engine config.
config: Arc<MitoConfig>,
/// Metadata of the region.
metadata: RegionMetadataRef,
/// Options of the region.
region_options: RegionOptions,
/// SST access layer.
access_layer: AccessLayerRef,
}
impl RegionWriter {
/// Writes data from a source to SST files.
pub async fn write_sst(
&self,
source: Source,
write_opts: &WriteOptions,
) -> Result<SstInfoArray> {
let request = SstWriteRequest {
op_type: OperationType::Flush,
metadata: self.metadata.clone(),
source,
cache_manager: Arc::new(CacheManager::default()),
storage: None,
max_sequence: None,
index_options: self.region_options.index_options.clone(),
inverted_index_config: self.config.inverted_index.clone(),
fulltext_index_config: self.config.fulltext_index.clone(),
bloom_filter_index_config: self.config.bloom_filter_index.clone(),
};
self.access_layer.write_sst(request, write_opts).await
}
}
/// Builder that creates a [`RegionWriter`] for each region.
pub struct RegionWriterBuilder {
/// Mito engine config.
config: Arc<MitoConfig>,
/// Object stores.
object_store_manager: ObjectStoreManagerRef,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
}
impl RegionWriterBuilder {
/// Creates a new [`RegionWriterBuilder`].
pub async fn new(
config: Arc<MitoConfig>,
object_store_manager: ObjectStoreManagerRef,
) -> Result<Self> {
let puffin_manager_factory = PuffinManagerFactory::new(
&config.index.aux_path,
config.index.staging_size.as_bytes(),
Some(config.index.write_buffer_size.as_bytes() as _),
config.index.staging_ttl,
)
.await?;
let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
.await?
.with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
Ok(Self {
config,
object_store_manager,
puffin_manager_factory,
intermediate_manager,
})
}
/// Builds a [`RegionWriter`] for the given region directory.
pub async fn build(
&self,
metadata: RegionMetadataRef,
region_dir: &str,
region_options: RegionOptions,
) -> Result<RegionWriter> {
let object_store = mito2::region::opener::get_object_store(
&region_options.storage,
&self.object_store_manager,
)?;
let access_layer = Arc::new(AccessLayer::new(
region_dir,
object_store,
self.puffin_manager_factory.clone(),
self.intermediate_manager.clone(),
));
Ok(RegionWriter {
config: self.config.clone(),
metadata,
region_options,
access_layer,
})
}
}
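// A hedged sketch of the write path above: build one writer per region, then stream
// a `Source` into SST files. The region directory literal is an illustrative
// assumption; config, metadata, options, stores and the source come from the caller.
async fn write_region_ssts(
    config: Arc<MitoConfig>,
    object_store_manager: ObjectStoreManagerRef,
    metadata: RegionMetadataRef,
    region_options: RegionOptions,
    source: Source,
    write_opts: &WriteOptions,
) -> Result<SstInfoArray> {
    let builder = RegionWriterBuilder::new(config, object_store_manager).await?;
    let writer = builder
        .build(metadata, "data/region_dir/", region_options)
        .await?;
    writer.write_sst(source, write_opts).await
}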