Compare commits

..

5 Commits

Author SHA1 Message Date
Weny Xu
2ae20daa62 feat: add sync region instruction for repartition procedure (#7562)
* feat: add sync region instruction for repartition procedure

This commit introduces a new sync region instruction and integrates it
into the repartition procedure flow, specifically for metric engine tables.

Changes:
- Add SyncRegion instruction type and SyncRegionsReply in instruction.rs
- Implement SyncRegionHandler in datanode to handle sync region requests
- Add SyncRegion state in repartition procedure to sync newly allocated regions
- Integrate sync region step after enter_staging_region for metric engine tables
- Add sync_region flag and allocated_region_ids to PersistentContext
- Make SyncRegionFromRequest serializable for instruction transmission
- Add test utilities and mock support for sync region operations

The sync region step is conditionally executed based on the table engine type,
ensuring that newly allocated regions in metric engine tables are properly
synced from their source regions before proceeding with manifest remapping.
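
Taken together, the pieces above suggest roughly the following shape for the new step. This is a hedged sketch rather than the actual procedure code; the helper name and the `is_metric_engine` / `request_for` inputs are placeholders, while the instruction and payload types mirror the ones added in this PR.

```rust
use common_meta::instruction::{Instruction, SyncRegion};
use store_api::region_engine::SyncRegionFromRequest;
use store_api::storage::RegionId;

/// Sketch: assemble the batched SyncRegions instruction for the regions newly
/// allocated by the repartition procedure. Only metric engine tables take
/// this step; other engines skip it.
fn build_sync_regions_instruction(
    is_metric_engine: bool,
    allocated_region_ids: &[RegionId],
    // How each new region should be synced (e.g. from its source region);
    // the concrete request constructor is left to the caller.
    request_for: impl Fn(RegionId) -> SyncRegionFromRequest,
) -> Option<Instruction> {
    if !is_metric_engine {
        return None;
    }
    let sync_regions = allocated_region_ids
        .iter()
        .map(|&region_id| SyncRegion {
            region_id,
            request: request_for(region_id),
        })
        .collect();
    Some(Instruction::SyncRegions(sync_regions))
}
```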

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add logs

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(repartition): improve staging region handling and support metric engine repartition

- Reorder sync region flow: move SyncRegion from EnterStagingRegion to RepartitionStart to sync before applying staging
- Add ExitStaging metadata update state to properly clear staging leader info after repartition completes
- Update build_template_from_raw_table_info to optionally skip metric engine internal columns when creating region requests
- Fix region state transition: set_dropping now expects a specific state (Staging or Writable) for proper validation (see the sketch after this list)
- Adjust region drop and copy handlers to handle staging regions correctly
- Add comprehensive test cases for metric engine SPLIT/MERGE partition operations on physical tables with logical tables
- Improve logging for table route updates, region drops, and repartition operations
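
For the `set_dropping` bullet, a hypothetical sketch of what "expects a specific state" means; the real mito region state machine has more states and its own error types, so only the checked-transition idea carries over.

```rust
/// Hypothetical illustration: the transition to Dropping only succeeds when
/// the region is currently in the state the caller expects (e.g. Staging or
/// Writable); any mismatch is reported instead of being silently overwritten.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RegionState {
    Writable,
    Staging,
    Dropping,
}

fn set_dropping(current: &mut RegionState, expect: RegionState) -> Result<(), String> {
    if *current == expect {
        *current = RegionState::Dropping;
        Ok(())
    } else {
        Err(format!("expected {expect:?}, found {current:?}"))
    }
}
```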

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: removes code duplication

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: update result

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: refine comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: add error strategy support for flush region and flush pending deallocate regions

- **Add `ErrorStrategy` enum** in `procedure/utils.rs`:
  - Supports `Ignore` and `Retry` strategies for error handling
  - Refactor `flush_region` to accept `error_strategy` parameter
  - Extract `handle_flush_region_reply` helper function for better code organization

- **Add pending deallocate region support**:
  - Add `pending_deallocate_region_ids` field to `PersistentContext`
  - Implement `flush_pending_deallocate_regions` in `EnterStagingRegion` state
  - Flush pending deallocate regions before entering staging regions to ensure data consistency

- **Update error handling**:
  - `flush_leader_region`: Use `ErrorStrategy::Ignore` to skip unreachable datanodes
  - `sync_region`: Use `ErrorStrategy::Retry` for critical operations
  - `enter_staging_region`: Use `ErrorStrategy::Retry` when flushing pending deallocate regions

This change improves the robustness of the repartition procedure by:
1. Providing flexible error handling strategies for flush operations (sketched below)
2. Ensuring pending deallocate regions are properly flushed before repartitioning
3. Preventing data inconsistency during region migration
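
A minimal sketch of the strategy described above (item 1); only the enum name and its variants come from the commit, and the real `flush_region` in `procedure/utils.rs` carries procedure context (mailbox, peers, timeouts) omitted here.

```rust
/// Error-handling strategies for flush operations, as introduced in
/// procedure/utils.rs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorStrategy {
    /// Best effort: log the failure (e.g. an unreachable datanode) and move on.
    Ignore,
    /// Critical path: surface the error so the procedure framework retries.
    Retry,
}

// Illustrative handler showing how a flush helper might branch on the strategy.
fn handle_flush_failure(strategy: ErrorStrategy, err: String) -> Result<(), String> {
    match strategy {
        ErrorStrategy::Ignore => {
            eprintln!("ignoring flush failure: {err}");
            Ok(())
        }
        ErrorStrategy::Retry => Err(err),
    }
}
```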

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: compile

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2026-01-15 04:52:57 +00:00
LFC
e64c31e59a chore: upgrade DataFusion family (#7558)
* chore: upgrade DataFusion family

Signed-off-by: luofucong <luofc@foxmail.com>

* use main proto

Signed-off-by: luofucong <luofc@foxmail.com>

* fix ci

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
2026-01-14 14:02:31 +00:00
Ruihang Xia
a5cb0116a2 perf: avoid boundary checks on accessing array items (#7570)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2026-01-14 12:56:39 +00:00
Ruihang Xia
170f94fc08 feat: enable pruning for manipulate plans (#7565)
* feat: enable pruning for manipulate plans

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* apply to other plans and add sqlness case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix scalar manipulate and histogram fold for missing some columns

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* don't drop every column

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unrelated part

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2026-01-14 08:32:51 +00:00
Yingwen
1c9aa59317 style: remove unused imports (#7567)
* style: remove unused imports

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: import only in test

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2026-01-14 07:59:40 +00:00
153 changed files with 4496 additions and 1869 deletions

Cargo.lock (generated): 1555 changed lines. File diff suppressed because it is too large.

View File

@@ -100,13 +100,13 @@ rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
# See for more detaiils: https://github.com/rust-lang/cargo/issues/11329
ahash = { version = "0.8", features = ["compile-time-rng"] }
aquamarine = "0.6"
arrow = { version = "56.2", features = ["prettyprint"] }
arrow-array = { version = "56.2", default-features = false, features = ["chrono-tz"] }
arrow-buffer = "56.2"
arrow-cast = "56.2"
arrow-flight = "56.2"
arrow-ipc = { version = "56.2", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "56.2", features = ["serde"] }
arrow = { version = "57.0", features = ["prettyprint"] }
arrow-array = { version = "57.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = "57.0"
arrow-cast = "57.0"
arrow-flight = "57.0"
arrow-ipc = { version = "57.0", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "57.0", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
# Remember to update axum-extra, axum-macros when updating axum
@@ -120,38 +120,39 @@ bitflags = "2.4.1"
bytemuck = "1.12"
bytes = { version = "1.7", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"] }
chrono-tz = { version = "0.10.1", features = ["case-insensitive"] }
chrono-tz = { version = "0.10", features = ["case-insensitive"] }
clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
const_format = "0.2"
crossbeam-utils = "0.8"
dashmap = "6.1"
datafusion = "50"
datafusion-common = "50"
datafusion-expr = "50"
datafusion-functions = "50"
datafusion-functions-aggregate-common = "50"
datafusion-optimizer = "50"
datafusion-orc = "0.5"
datafusion-pg-catalog = "0.12.3"
datafusion-physical-expr = "50"
datafusion-physical-plan = "50"
datafusion-sql = "50"
datafusion-substrait = "50"
datafusion = "51.0"
datafusion-common = "51.0"
datafusion-datasource = "51.0"
datafusion-expr = "51.0"
datafusion-functions = "51.0"
datafusion-functions-aggregate-common = "51.0"
datafusion-optimizer = "51.0"
datafusion-orc = { git = "https://github.com/GreptimeTeam/datafusion-orc.git", rev = "35f2e04bf81f2ab7b6f86c0450d6a77b7098d43e" }
datafusion-pg-catalog = "0.13"
datafusion-physical-expr = "51.0"
datafusion-physical-plan = "51.0"
datafusion-sql = "51.0"
datafusion-substrait = "51.0"
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
derive_more = { version = "2.1", features = ["full"] }
dotenv = "0.15"
either = "1.15"
etcd-client = { version = "0.16.1", features = [
etcd-client = { version = "0.17", features = [
"tls",
"tls-roots",
] }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "58aeee49267fb1eafa6f9123f9d0c47dd0f62722" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1353b0ada9e17890c7ba0e402ba29b2b57816ff1" }
hex = "0.4"
http = "1"
humantime = "2.1"
@@ -162,7 +163,7 @@ itertools = "0.14"
jsonb = { git = "https://github.com/databendlabs/jsonb.git", rev = "8c8d2fc294a39f3ff08909d60f718639cfba3875", default-features = false }
lazy_static = "1.4"
local-ip-address = "0.6"
loki-proto = { git = "https://github.com/GreptimeTeam/loki-proto.git", rev = "3b7cd33234358b18ece977bf689dc6fb760f29ab" }
loki-proto = { git = "https://github.com/GreptimeTeam/loki-proto.git", rev = "f69c8924c4babe516373e26a4118be82d976629c" }
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "5618e779cf2bb4755b499c630fba4c35e91898cb" }
mockall = "0.13"
moka = "0.12"
@@ -172,7 +173,7 @@ notify = "8.0"
num_cpus = "1.16"
object_store_opendal = "0.54"
once_cell = "1.18"
opentelemetry-proto = { version = "0.30", features = [
opentelemetry-proto = { version = "0.31", features = [
"gen-tonic",
"metrics",
"trace",
@@ -180,18 +181,18 @@ opentelemetry-proto = { version = "0.30", features = [
"logs",
] }
ordered-float = { version = "4.3", features = ["serde"] }
otel-arrow-rust = { git = "https://github.com/GreptimeTeam/otel-arrow", rev = "2d64b7c0fa95642028a8205b36fe9ea0b023ec59", features = [
otel-arrow-rust = { git = "https://github.com/GreptimeTeam/otel-arrow", rev = "5da284414e9b14f678344b51e5292229e4b5f8d2", features = [
"server",
] }
parking_lot = "0.12"
parquet = { version = "56.2", default-features = false, features = ["arrow", "async", "object_store"] }
parquet = { version = "57.0", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
pin-project = "1.0"
pretty_assertions = "1.4.0"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.7.1", features = ["ser"] }
prost = { version = "0.13", features = ["no-recursion-limit"] }
prost-types = "0.13"
prost = { version = "0.14", features = ["no-recursion-limit"] }
prost-types = "0.14"
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.9"
ratelimit = "0.10"
@@ -223,7 +224,7 @@ simd-json = "0.15"
similar-asserts = "1.6.0"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
sqlparser = { version = "0.58.0", default-features = false, features = ["std", "visitor", "serde"] }
sqlparser = { version = "0.59.0", default-features = false, features = ["std", "visitor", "serde"] }
sqlx = { version = "0.8", default-features = false, features = ["any", "macros", "json", "runtime-tokio-rustls"] }
strum = { version = "0.27", features = ["derive"] }
sysinfo = "0.33"
@@ -234,7 +235,7 @@ tokio-rustls = { version = "0.26.2", default-features = false }
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.8.8"
tonic = { version = "0.13", features = ["tls-ring", "gzip", "zstd"] }
tonic = { version = "0.14", features = ["tls-ring", "gzip", "zstd"] }
tower = "0.5"
tower-http = "0.6"
tracing = "0.1"
@@ -322,19 +323,20 @@ git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "5618e779cf2bb4755b499c630fba4c35e91898cb"
[patch.crates-io]
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "a0ce2bc6eb3e804532932f39833c32432f5c9a39" } # branch = "v0.58.x"
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-pg-catalog = { git = "https://github.com/GreptimeTeam/datafusion-postgres.git", rev = "74ac8e2806be6de91ff192b97f64735392539d16" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "d7d95a44889e099e32d78e9bad9bc00598faef28" } # on branch v0.59.x
[profile.release]
debug = 1

View File

@@ -96,21 +96,6 @@ curl -X POST "localhost:4000/debug/prof/mem?output=proto" > greptime.pprof
You can periodically dump profiling data and compare them to find the delta memory usage.
### Symbolicate external dump files
If you have jemalloc heap dump files generated externally (e.g., via `MALLOC_CONF="prof:true,prof_prefix:jeprof.out,lg_prof_interval:26"` or `prof_gdump`), you can upload them to a running GreptimeDB instance for symbolication and flamegraph generation:
```bash
# Upload a jemalloc heap dump file and get a symbolicated flamegraph
curl -X POST --data-binary @/path/to/jeprof.out.12345.0.i0.heap \
localhost:4000/debug/prof/mem/symbol > flamegraph.svg
```
This is useful when:
- You collected heap dumps from a production environment
- You want to symbolicate dumps on a machine with debug symbols
- You need to analyze dumps generated by jemalloc's automatic dump mechanisms
## Analyze profiling data with flamegraph
To create flamegraph according to dumped profiling data:

View File

@@ -35,6 +35,7 @@ use mito2::sst::parquet::reader::ParquetReaderBuilder;
use mito2::sst::parquet::{PARQUET_METADATA_KEY, WriteOptions};
use mito2::worker::write_cache_from_config;
use object_store::ObjectStore;
use parquet::file::metadata::{FooterTail, KeyValue};
use regex::Regex;
use snafu::OptionExt;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
@@ -463,7 +464,6 @@ fn extract_region_metadata(
file_path: &str,
meta: &parquet::file::metadata::ParquetMetaData,
) -> error::Result<RegionMetadataRef> {
use parquet::format::KeyValue;
let kvs: Option<&Vec<KeyValue>> = meta.file_metadata().key_value_metadata();
let Some(kvs) = kvs else {
return Err(error::IllegalConfigSnafu {
@@ -608,7 +608,7 @@ async fn load_parquet_metadata(
let buffer_len = buffer.len();
let mut footer = [0; 8];
footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?;
let footer = FooterTail::try_new(&footer)?;
let metadata_len = footer.metadata_length() as u64;
if actual_size - (FOOTER_SIZE as u64) < metadata_len {
return Err("invalid footer/metadata length".into());
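
The hunk above follows a parquet 57 API move: footer decoding now goes through `FooterTail`. A small isolated sketch of the same computation, assuming the 8-byte tail has already been read from storage:

```rust
use parquet::errors::ParquetError;
use parquet::file::metadata::FooterTail;

/// Size of the parquet footer tail: 4-byte metadata length plus 4-byte magic.
const FOOTER_SIZE: usize = 8;

/// Decode the metadata length recorded in the footer tail, as the updated
/// load_parquet_metadata does before checking it against the file size.
fn metadata_len_from_tail(tail: &[u8; FOOTER_SIZE]) -> Result<u64, ParquetError> {
    let footer = FooterTail::try_new(tail)?;
    Ok(footer.metadata_length() as u64)
}
```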

View File

@@ -27,13 +27,14 @@ common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
datafusion.workspace = true
datafusion-datasource.workspace = true
datafusion-orc.workspace = true
datatypes.workspace = true
futures.workspace = true
lazy_static.workspace = true
object-store.workspace = true
object_store_opendal.workspace = true
orc-rust = { version = "0.6.3", default-features = false, features = ["async"] }
orc-rust = { version = "0.7", default-features = false, features = ["async"] }
parquet.workspace = true
paste.workspace = true
regex.workspace = true

View File

@@ -14,7 +14,7 @@
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion::parquet::format::FileMetaData;
use parquet::file::metadata::ParquetMetaData;
use crate::error::Result;
@@ -24,5 +24,5 @@ pub trait DfRecordBatchEncoder {
#[async_trait]
pub trait ArrowWriterCloser {
async fn close(mut self) -> Result<FileMetaData>;
async fn close(mut self) -> Result<ParquetMetaData>;
}

View File

@@ -40,7 +40,6 @@ use datafusion::datasource::physical_plan::{
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datatypes::arrow::datatypes::SchemaRef;
use futures::{StreamExt, TryStreamExt};
use object_store::ObjectStore;
use object_store_opendal::OpendalStore;
@@ -303,24 +302,20 @@ where
pub async fn file_to_stream(
store: &ObjectStore,
filename: &str,
file_schema: SchemaRef,
file_source: Arc<dyn FileSource>,
projection: Option<Vec<usize>>,
compression_type: CompressionType,
) -> Result<DfSendableRecordBatchStream> {
let df_compression: DfCompressionType = compression_type.into();
let config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
file_source.clone(),
)
.with_file_group(FileGroup::new(vec![PartitionedFile::new(
filename.to_string(),
0,
)]))
.with_projection(projection)
.with_file_compression_type(df_compression)
.build();
let config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source.clone())
.with_file_group(FileGroup::new(vec![PartitionedFile::new(
filename.to_string(),
0,
)]))
.with_projection_indices(projection)
.with_file_compression_type(df_compression)
.build();
let store = Arc::new(OpendalStore::new(store.clone()));
let file_opener = file_source

View File

@@ -440,14 +440,11 @@ mod tests {
.await
.unwrap(),
);
let csv_source = CsvSource::new(true, b',', b'"')
.with_schema(schema.clone())
.with_batch_size(8192);
let csv_source = CsvSource::new(schema).with_batch_size(8192);
let stream = file_to_stream(
&store,
compressed_file_path_str,
schema.clone(),
csv_source.clone(),
None,
compression_type,

View File

@@ -347,14 +347,11 @@ mod tests {
.await
.unwrap(),
);
let json_source = JsonSource::new()
.with_schema(schema.clone())
.with_batch_size(8192);
let json_source = JsonSource::new(schema).with_batch_size(8192);
let stream = file_to_stream(
&store,
compressed_file_path_str,
schema.clone(),
json_source.clone(),
None,
compression_type,

View File

@@ -18,15 +18,15 @@ use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion::datasource::physical_plan::{FileMeta, ParquetFileReaderFactory};
use datafusion::datasource::physical_plan::ParquetFileReaderFactory;
use datafusion::error::Result as DatafusionResult;
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use datafusion::parquet::arrow::{ArrowWriter, parquet_to_arrow_schema};
use datafusion::parquet::errors::{ParquetError, Result as ParquetResult};
use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::format::FileMetaData;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_datasource::PartitionedFile;
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use futures::future::BoxFuture;
@@ -100,11 +100,11 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
fn create_reader(
&self,
_partition_index: usize,
file_meta: FileMeta,
partitioned_file: PartitionedFile,
_metadata_size_hint: Option<usize>,
_metrics: &ExecutionPlanMetricsSet,
) -> DatafusionResult<Box<dyn AsyncFileReader + Send>> {
let path = file_meta.location().to_string();
let path = partitioned_file.path().to_string();
let object_store = self.object_store.clone();
Ok(Box::new(LazyParquetFileReader::new(object_store, path)))
@@ -180,7 +180,7 @@ impl DfRecordBatchEncoder for ArrowWriter<SharedBuffer> {
#[async_trait]
impl ArrowWriterCloser for ArrowWriter<SharedBuffer> {
async fn close(self) -> Result<FileMetaData> {
async fn close(self) -> Result<ParquetMetaData> {
self.close().context(error::EncodeRecordBatchSnafu)
}
}
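
With parquet 57, `ArrowWriter::close` hands back the parsed `ParquetMetaData` instead of the thrift `FileMetaData`, which is what this impl now forwards. A small sketch of what a caller can do with the richer return value (writer construction omitted):

```rust
use std::io::Write;

use parquet::arrow::ArrowWriter;
use parquet::errors::ParquetError;
use parquet::file::metadata::ParquetMetaData;

/// Finish writing and report the row and row-group counts recorded in the
/// ParquetMetaData returned by close().
fn finish_and_summarize<W: Write + Send>(
    writer: ArrowWriter<W>,
) -> Result<(i64, usize), ParquetError> {
    let metadata: ParquetMetaData = writer.close()?;
    let rows = metadata.file_metadata().num_rows();
    let row_groups = metadata.num_row_groups();
    Ok((rows, row_groups))
}
```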

View File

@@ -67,14 +67,14 @@ impl Test<'_> {
async fn test_json_opener() {
let store = test_store("/");
let schema = test_basic_schema();
let file_source = Arc::new(JsonSource::new()).with_batch_size(test_util::TEST_BATCH_SIZE);
let file_source = Arc::new(JsonSource::new(schema)).with_batch_size(test_util::TEST_BATCH_SIZE);
let path = &find_workspace_path("/src/common/datasource/tests/json/basic.json")
.display()
.to_string();
let tests = [
Test {
config: scan_config(schema.clone(), None, path, file_source.clone()),
config: scan_config(None, path, file_source.clone()),
file_source: file_source.clone(),
expected: vec![
"+-----+-------+",
@@ -87,7 +87,7 @@ async fn test_json_opener() {
],
},
Test {
config: scan_config(schema, Some(1), path, file_source.clone()),
config: scan_config(Some(1), path, file_source.clone()),
file_source,
expected: vec![
"+-----+------+",
@@ -112,13 +112,11 @@ async fn test_csv_opener() {
.display()
.to_string();
let file_source = CsvSource::new(true, b',', b'"')
.with_batch_size(test_util::TEST_BATCH_SIZE)
.with_schema(schema.clone());
let file_source = CsvSource::new(schema).with_batch_size(test_util::TEST_BATCH_SIZE);
let tests = [
Test {
config: scan_config(schema.clone(), None, path, file_source.clone()),
config: scan_config(None, path, file_source.clone()),
file_source: file_source.clone(),
expected: vec![
"+-----+-------+---------------------+----------+------------+",
@@ -131,7 +129,7 @@ async fn test_csv_opener() {
],
},
Test {
config: scan_config(schema, Some(1), path, file_source.clone()),
config: scan_config(Some(1), path, file_source.clone()),
file_source,
expected: vec![
"+-----+------+---------------------+----------+------------+",
@@ -158,10 +156,10 @@ async fn test_parquet_exec() {
.display()
.to_string();
let parquet_source = ParquetSource::default()
let parquet_source = ParquetSource::new(schema)
.with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(store)));
let config = scan_config(schema, None, path, Arc::new(parquet_source));
let config = scan_config(None, path, Arc::new(parquet_source));
let exec = DataSourceExec::from_data_source(config);
let ctx = SessionContext::new();
@@ -197,11 +195,11 @@ async fn test_orc_opener() {
let store = test_store("/");
let schema = Arc::new(OrcFormat.infer_schema(&store, path).await.unwrap());
let file_source = Arc::new(OrcSource::default());
let file_source = Arc::new(OrcSource::new(schema.into()));
let tests = [
Test {
config: scan_config(schema.clone(), None, path, file_source.clone()),
config: scan_config(None, path, file_source.clone()),
file_source: file_source.clone(),
expected: vec![
"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
@@ -216,7 +214,7 @@ async fn test_orc_opener() {
],
},
Test {
config: scan_config(schema.clone(), Some(1), path, file_source.clone()),
config: scan_config(Some(1), path, file_source.clone()),
file_source,
expected: vec![
"+----------+-----+------+------------+---+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+-------------------------+-------------+",

View File

@@ -80,7 +80,6 @@ pub fn csv_basic_schema() -> SchemaRef {
}
pub(crate) fn scan_config(
file_schema: SchemaRef,
limit: Option<usize>,
filename: &str,
file_source: Arc<dyn FileSource>,
@@ -89,7 +88,7 @@ pub(crate) fn scan_config(
let filename = &filename.replace('\\', "/");
let file_group = FileGroup::new(vec![PartitionedFile::new(filename.clone(), 4096)]);
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_schema, file_source)
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
.with_file_group(file_group)
.with_limit(limit)
.build()
@@ -109,7 +108,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
let size = store.read(origin_path).await.unwrap().len();
let config = scan_config(schema, None, origin_path, Arc::new(JsonSource::new()));
let config = scan_config(None, origin_path, Arc::new(JsonSource::new(schema)));
let stream = FileStream::new(
&config,
0,
@@ -151,10 +150,8 @@ pub async fn setup_stream_to_csv_test(
let schema = csv_basic_schema();
let csv_source = CsvSource::new(true, b',', b'"')
.with_schema(schema.clone())
.with_batch_size(TEST_BATCH_SIZE);
let config = scan_config(schema, None, origin_path, csv_source.clone());
let csv_source = CsvSource::new(schema).with_batch_size(TEST_BATCH_SIZE);
let config = scan_config(None, origin_path, csv_source.clone());
let size = store.read(origin_path).await.unwrap().len();
let csv_opener = csv_source.create_file_opener(

View File

@@ -104,7 +104,8 @@ mod tests {
assert!(matches!(f.signature(),
datafusion_expr::Signature {
type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
volatility: datafusion_expr::Volatility::Immutable
volatility: datafusion_expr::Volatility::Immutable,
..
} if valid_types == &ConcreteDataType::numerics().into_iter().map(|dt| { use datatypes::data_type::DataType; dt.as_arrow_type() }).collect::<Vec<_>>()));
}
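
These one-line test changes recur throughout the PR: the `Signature` struct evidently gained a field (or became non-exhaustive) in DataFusion 51, and a struct pattern must either name every field or end with `..`. A stand-in example of the rest pattern; the struct here is illustrative, not DataFusion's:

```rust
// Stand-in struct: imagine the third field was just added upstream.
struct Sig {
    arity: usize,
    immutable: bool,
    newly_added: u8,
}

fn accepts_one_immutable_arg(s: &Sig) -> bool {
    // `..` ignores `newly_added` and anything added later, so the pattern
    // keeps compiling as the struct grows.
    matches!(s, Sig { arity: 1, immutable: true, .. })
}
```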

View File

@@ -331,7 +331,8 @@ mod tests {
assert!(matches!(f.signature(),
datafusion_expr::Signature {
type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
volatility: datafusion_expr::Volatility::Immutable
volatility: datafusion_expr::Volatility::Immutable,
..
} if valid_types == &vec![ArrowDataType::Utf8]));
}

View File

@@ -145,7 +145,8 @@ mod tests {
assert!(matches!(f.signature(),
datafusion_expr::Signature {
type_signature: datafusion_expr::TypeSignature::OneOf(sigs),
volatility: datafusion_expr::Volatility::Immutable
volatility: datafusion_expr::Volatility::Immutable,
..
} if sigs.len() == 2));
}

View File

@@ -341,6 +341,7 @@ impl AggregateUDFImpl for StateWrapper {
name: acc_args.name,
is_distinct: acc_args.is_distinct,
exprs: acc_args.exprs,
expr_fields: acc_args.expr_fields,
};
self.inner.accumulator(acc_args)?
};

View File

@@ -650,7 +650,7 @@ async fn test_last_value_order_by_udaf() {
DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
true
), // ordering field is added to state fields too
Field::new("is_set", DataType::Boolean, true)
Field::new("last_value[last_value_is_set]", DataType::Boolean, true)
]
.into()
),
@@ -735,7 +735,7 @@ async fn test_last_value_order_by_udaf() {
DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
true,
),
Field::new("is_set", DataType::Boolean, true),
Field::new("last_value[last_value_is_set]", DataType::Boolean, true),
]
.into(),
vec![

View File

@@ -453,8 +453,8 @@ impl Accumulator for CountHashAccumulator {
);
};
let hash_array = inner_array.as_any().downcast_ref::<UInt64Array>().unwrap();
for i in 0..hash_array.len() {
self.values.insert(hash_array.value(i));
for &hash in hash_array.values().iter().take(hash_array.len()) {
self.values.insert(hash);
}
}
Ok(())
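
This hunk and the similar ones below replace per-index `value(i)` calls, each of which bounds-checks, with iteration over the primitive array's contiguous `values()` buffer. The idea in isolation:

```rust
use std::collections::HashSet;

use arrow::array::UInt64Array;

/// Collect every element of a primitive array without per-element bounds
/// checks: values() exposes the contiguous buffer, and iterating it avoids
/// the check that value(i) performs on every call.
fn collect_hashes(hash_array: &UInt64Array, out: &mut HashSet<u64>) {
    for &hash in hash_array.values().iter() {
        out.insert(hash);
    }
}
```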

View File

@@ -152,9 +152,9 @@ impl DfAccumulator for JsonEncodePathAccumulator {
let lng_array = lng_array.as_primitive::<Float64Type>();
let mut coords = Vec::with_capacity(len);
for i in 0..len {
let lng = lng_array.value(i);
let lat = lat_array.value(i);
let lng_values = lng_array.values();
let lat_values = lat_array.values();
for (&lng, &lat) in lng_values.iter().zip(lat_values.iter()).take(len) {
coords.push(vec![lng, lat]);
}

View File

@@ -122,7 +122,8 @@ mod tests {
matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
volatility: Volatility::Immutable,
..
} if sigs.len() == 15),
"{:?}",
f.signature()

View File

@@ -193,7 +193,8 @@ mod tests {
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
volatility: Volatility::Immutable,
..
} if sigs.len() == 6));
}

View File

@@ -120,7 +120,8 @@ mod tests {
matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
volatility: Volatility::Immutable,
..
} if sigs.len() == 15),
"{:?}",
f.signature()

View File

@@ -25,7 +25,6 @@ use datafusion_common::arrow::array::{
};
use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::type_coercion::aggregates::STRINGS;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
use datatypes::arrow_array::{int_array_value_at_index, string_array_value_at_index};
use datatypes::json::JsonStructureSettings;
@@ -519,7 +518,7 @@ impl Default for JsonGetObject {
DataType::LargeBinary,
DataType::BinaryView,
],
STRINGS.to_vec(),
vec![DataType::UInt8, DataType::LargeUtf8, DataType::Utf8View],
),
}
}

View File

@@ -99,7 +99,8 @@ mod tests {
assert!(matches!(rate.signature(),
Signature {
type_signature: TypeSignature::Uniform(2, valid_types),
volatility: Volatility::Immutable
volatility: Volatility::Immutable,
..
} if valid_types == NUMERICS
));
let values = vec![1.0, 3.0, 6.0];

View File

@@ -208,9 +208,9 @@ fn decode_dictionary(
let mut rows = Vec::with_capacity(number_rows);
let keys = dict.keys();
for i in 0..number_rows {
let dict_index = keys.value(i) as usize;
rows.push(decoded_values[dict_index].clone());
let dict_indices = keys.values();
for &dict_index in dict_indices[..number_rows].iter() {
rows.push(decoded_values[dict_index as usize].clone());
}
Ok(rows)

View File

@@ -19,8 +19,10 @@ use datafusion_common::DataFusionError;
use datafusion_common::arrow::array::{Array, AsArray, StringViewBuilder};
use datafusion_common::arrow::compute;
use datafusion_common::arrow::datatypes::DataType;
use datafusion_expr::type_coercion::aggregates::BINARYS;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use datafusion_common::types::logical_binary;
use datafusion_expr::{
Coercion, ColumnarValue, ScalarFunctionArgs, Signature, TypeSignatureClass, Volatility,
};
use datatypes::types::vector_type_value_to_string;
use crate::function::{Function, extract_args};
@@ -35,11 +37,10 @@ pub struct VectorToStringFunction {
impl Default for VectorToStringFunction {
fn default() -> Self {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
TypeSignature::Uniform(1, BINARYS.to_vec()),
],
signature: Signature::coercible(
vec![Coercion::new_exact(TypeSignatureClass::Native(
logical_binary(),
))],
Volatility::Immutable,
),
}
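
The same migration repeats across the vector functions below: the removed `STRINGS`/`BINARYS` constant lists give way to coercible signatures over logical types. Condensed into one helper for reference, mirroring the two-way form used by `ElemSumFunction` and friends:

```rust
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{Coercion, Signature, TypeSignature, TypeSignatureClass, Volatility};

/// One argument, coercible to either a logical binary or a logical string,
/// which is the shape the vector UDFs in this PR now declare.
fn binary_or_string_arg() -> Signature {
    Signature::one_of(
        vec![
            TypeSignature::Coercible(vec![Coercion::new_exact(
                TypeSignatureClass::Native(logical_binary()),
            )]),
            TypeSignature::Coercible(vec![Coercion::new_exact(
                TypeSignatureClass::Native(logical_string()),
            )]),
        ],
        Volatility::Immutable,
    )
}
```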

View File

@@ -15,10 +15,10 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignature, TypeSignatureClass};
use datafusion_common::ScalarValue;
use datafusion_expr::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{ScalarFunctionArgs, Signature, Volatility};
use nalgebra::DVectorView;
use crate::function::Function;
@@ -36,9 +36,12 @@ impl Default for ElemAvgFunction {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_binary()),
)]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_string()),
)]),
],
Volatility::Immutable,
),

View File

@@ -15,10 +15,10 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr_common::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignature, TypeSignatureClass};
use datafusion_common::ScalarValue;
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{ScalarFunctionArgs, Signature, Volatility};
use nalgebra::DVectorView;
use crate::function::Function;
@@ -49,9 +49,12 @@ impl Default for ElemProductFunction {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_binary()),
)]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_string()),
)]),
],
Volatility::Immutable,
),

View File

@@ -15,9 +15,9 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignatureClass};
use datafusion_common::ScalarValue;
use datafusion_expr::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use nalgebra::DVectorView;
@@ -36,9 +36,12 @@ impl Default for ElemSumFunction {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_binary()),
)]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_string()),
)]),
],
Volatility::Immutable,
),

View File

@@ -15,9 +15,9 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr_common::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignatureClass};
use datafusion_common::ScalarValue;
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use crate::function::Function;
@@ -49,8 +49,12 @@ impl Default for VectorDimFunction {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_binary()),
)]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_string()),
)]),
],
Volatility::Immutable,
),

View File

@@ -15,9 +15,9 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr_common::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignatureClass};
use datafusion_common::ScalarValue;
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use nalgebra::DVectorView;
@@ -52,9 +52,12 @@ impl Default for VectorNormFunction {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_binary()),
)]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_string()),
)]),
],
Volatility::Immutable,
),

View File

@@ -106,7 +106,8 @@ mod tests {
assert!(matches!(f.signature(),
datafusion_expr::Signature {
type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
volatility: datafusion_expr::Volatility::Immutable
volatility: datafusion_expr::Volatility::Immutable,
..
} if valid_types == &vec![ArrowDataType::Utf8]));
}

View File

@@ -103,10 +103,11 @@ impl FlightEncoder {
FlightMessage::RecordBatch(record_batch) => {
let (encoded_dictionaries, encoded_batch) = self
.data_gen
.encoded_batch(
.encode(
&record_batch,
&mut self.dictionary_tracker,
&self.write_options,
&mut Default::default(),
)
.expect("DictionaryTracker configured above to not fail on replacement");

View File

@@ -133,23 +133,3 @@ pub fn is_gdump_active() -> Result<bool> {
// safety: PROF_GDUMP, if present, is a boolean value.
unsafe { Ok(tikv_jemalloc_ctl::raw::read::<bool>(PROF_GDUMP).context(error::ReadGdumpSnafu)?) }
}
/// Symbolicate a jeheap format dump file and return a flamegraph.
///
/// This function takes the raw content of a jemalloc heap dump file,
/// parses it using `parse_jeheap`, and generates a flamegraph SVG.
///
/// The symbolication uses the current process's memory mappings.
pub fn symbolicate_jeheap(dump_content: &[u8]) -> Result<Vec<u8>> {
let profile = BufReader::new(dump_content);
let stack_profile = parse_jeheap(profile, MAPPINGS.as_deref()).context(ParseJeHeapSnafu)?;
let mut opts = FlamegraphOptions::default();
opts.title = "symbolicated_heap".to_string();
opts.count_name = "bytes".to_string();
let flamegraph = stack_profile
.to_flamegraph(&mut opts)
.context(FlamegraphSnafu)?;
Ok(flamegraph)
}

View File

@@ -19,7 +19,7 @@ mod jemalloc;
#[cfg(not(windows))]
pub use jemalloc::{
activate_heap_profile, deactivate_heap_profile, dump_flamegraph, dump_pprof, dump_profile,
is_gdump_active, is_heap_profile_active, set_gdump_active, symbolicate_jeheap,
is_gdump_active, is_heap_profile_active, set_gdump_active,
};
#[cfg(windows)]
@@ -61,8 +61,3 @@ pub fn is_gdump_active() -> error::Result<bool> {
pub fn set_gdump_active(_: bool) -> error::Result<()> {
error::ProfilingNotSupportedSnafu.fail()
}
#[cfg(windows)]
pub fn symbolicate_jeheap(_dump_content: &[u8]) -> error::Result<Vec<u8>> {
error::ProfilingNotSupportedSnafu.fail()
}

View File

@@ -102,6 +102,6 @@ pub fn create_region_request_builder_from_raw_table_info(
raw_table_info: &RawTableInfo,
physical_table_id: TableId,
) -> Result<CreateRequestBuilder> {
let template = build_template_from_raw_table_info(raw_table_info)?;
let template = build_template_from_raw_table_info(raw_table_info, false)?;
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
}

View File

@@ -20,7 +20,9 @@ use api::v1::region::{CreateRequest, RegionColumnDef};
use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
use common_telemetry::warn;
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, is_metric_engine_internal_column,
};
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::{RawTableInfo, TableId};
@@ -30,34 +32,45 @@ use crate::wal_provider::prepare_wal_options;
/// Constructs a [CreateRequest] based on the provided [RawTableInfo].
///
/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions.
pub fn build_template_from_raw_table_info(raw_table_info: &RawTableInfo) -> Result<CreateRequest> {
pub fn build_template_from_raw_table_info(
raw_table_info: &RawTableInfo,
skip_internal_columns: bool,
) -> Result<CreateRequest> {
let primary_key_indices = &raw_table_info.meta.primary_key_indices;
let column_defs = raw_table_info
let filtered = raw_table_info
.meta
.schema
.column_schemas
.iter()
.enumerate()
.filter(|(_, c)| !skip_internal_columns || !is_metric_engine_internal_column(&c.name))
.map(|(i, c)| {
let is_primary_key = primary_key_indices.contains(&i);
let column_def = try_as_column_def(c, is_primary_key)
.context(error::ConvertColumnDefSnafu { column: &c.name })?;
Ok(RegionColumnDef {
column_def: Some(column_def),
// The column id will be overridden by the metric engine.
// So we just use the index as the column id.
column_id: i as u32,
})
Ok((
is_primary_key.then_some(i),
RegionColumnDef {
column_def: Some(column_def),
// The column id will be overridden by the metric engine.
// So we just use the index as the column id.
column_id: i as u32,
},
))
})
.collect::<Result<Vec<_>>>()?;
.collect::<Result<Vec<(Option<usize>, RegionColumnDef)>>>()?;
let (new_primary_key_indices, column_defs): (Vec<_>, Vec<_>) = filtered.into_iter().unzip();
let options = HashMap::from(&raw_table_info.meta.options);
let template = CreateRequest {
region_id: 0,
engine: raw_table_info.meta.engine.clone(),
column_defs,
primary_key: primary_key_indices.iter().map(|i| *i as u32).collect(),
primary_key: new_primary_key_indices
.iter()
.flatten()
.map(|i| *i as u32)
.collect(),
path: String::new(),
options,
partition: None,
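
A hedged usage sketch for the new flag: a repartition path that rebuilds placeholder regions for a metric-engine physical table can drop the engine's internal columns, while the existing call sites above keep passing `false`. The helper name below is hypothetical.

```rust
use table::metadata::{RawTableInfo, TableId};

/// Hypothetical counterpart to create_region_request_builder_from_raw_table_info
/// that skips the metric engine's internal columns in the template.
fn create_builder_skipping_internal_columns(
    raw_table_info: &RawTableInfo,
    physical_table_id: TableId,
) -> Result<CreateRequestBuilder> {
    let template = build_template_from_raw_table_info(raw_table_info, true)?;
    Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
}
```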

View File

@@ -17,6 +17,7 @@ use std::fmt::{Display, Formatter};
use std::time::Duration;
use serde::{Deserialize, Deserializer, Serialize};
use store_api::region_engine::SyncRegionFromRequest;
use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
use strum::Display;
use table::metadata::TableId;
@@ -530,6 +531,25 @@ impl Display for EnterStagingRegion {
}
}
/// Instruction payload for syncing a region from a manifest or another region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SyncRegion {
/// Region id to sync.
pub region_id: RegionId,
/// Request to sync the region.
pub request: SyncRegionFromRequest,
}
impl Display for SyncRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"SyncRegion(region_id={}, request={:?})",
self.region_id, self.request
)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RemapManifest {
pub region_id: RegionId,
@@ -602,8 +622,11 @@ pub enum Instruction {
Suspend,
/// Makes regions enter staging state.
EnterStagingRegions(Vec<EnterStagingRegion>),
/// Syncs regions.
SyncRegions(Vec<SyncRegion>),
/// Remaps manifests for a region.
RemapManifest(RemapManifest),
/// Applies staging manifests for a region.
ApplyStagingManifests(Vec<ApplyStagingManifest>),
}
@@ -669,6 +692,13 @@ impl Instruction {
_ => None,
}
}
pub fn into_sync_regions(self) -> Option<Vec<SyncRegion>> {
match self {
Self::SyncRegions(sync_regions) => Some(sync_regions),
_ => None,
}
}
}
/// The reply of [UpgradeRegion].
@@ -784,6 +814,31 @@ impl EnterStagingRegionsReply {
}
}
/// Reply for a single region sync request.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SyncRegionReply {
/// Region id of the synced region.
pub region_id: RegionId,
/// Returns true if the region is successfully synced and ready.
pub ready: bool,
/// Indicates whether the region exists.
pub exists: bool,
/// Return error message if any during the operation.
pub error: Option<String>,
}
/// Reply for a batch of region sync requests.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SyncRegionsReply {
pub replies: Vec<SyncRegionReply>,
}
impl SyncRegionsReply {
pub fn new(replies: Vec<SyncRegionReply>) -> Self {
Self { replies }
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct RemapManifestReply {
/// Returns false if the region does not exist.
@@ -847,6 +902,7 @@ pub enum InstructionReply {
GetFileRefs(GetFileRefsReply),
GcRegions(GcRegionsReply),
EnterStagingRegions(EnterStagingRegionsReply),
SyncRegions(SyncRegionsReply),
RemapManifest(RemapManifestReply),
ApplyStagingManifests(ApplyStagingManifestsReply),
}
@@ -872,6 +928,9 @@ impl Display for InstructionReply {
reply.replies
)
}
Self::SyncRegions(reply) => {
write!(f, "InstructionReply::SyncRegions({:?})", reply.replies)
}
Self::RemapManifest(reply) => write!(f, "InstructionReply::RemapManifest({})", reply),
Self::ApplyStagingManifests(reply) => write!(
f,
@@ -926,6 +985,13 @@ impl InstructionReply {
}
}
pub fn expect_sync_regions_reply(self) -> Vec<SyncRegionReply> {
match self {
Self::SyncRegions(reply) => reply.replies,
_ => panic!("Expected SyncRegion reply"),
}
}
pub fn expect_remap_manifest_reply(self) -> RemapManifestReply {
match self {
Self::RemapManifest(reply) => reply,

View File

@@ -150,7 +150,7 @@ fn create_region_request_from_raw_table_info(
raw_table_info: &RawTableInfo,
physical_table_id: TableId,
) -> Result<CreateRequestBuilder> {
let template = build_template_from_raw_table_info(raw_table_info)?;
let template = build_template_from_raw_table_info(raw_table_info, false)?;
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
}

View File

@@ -7,7 +7,6 @@ license.workspace = true
[dependencies]
common-error.workspace = true
common-macro.workspace = true
prost.workspace = true
snafu.workspace = true
tokio.workspace = true

View File

@@ -31,6 +31,7 @@ mod flush_region;
mod gc_worker;
mod open_region;
mod remap_manifest;
mod sync_region;
mod upgrade_region;
use crate::heartbeat::handler::apply_staging_manifest::ApplyStagingManifestsHandler;
@@ -42,6 +43,7 @@ use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
use crate::heartbeat::handler::open_region::OpenRegionsHandler;
use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
use crate::heartbeat::handler::sync_region::SyncRegionHandler;
use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
use crate::heartbeat::task_tracker::TaskTracker;
use crate::region_server::RegionServer;
@@ -132,6 +134,7 @@ impl RegionHeartbeatResponseHandler {
Instruction::EnterStagingRegions(_) => {
Ok(Some(Box::new(EnterStagingRegionsHandler.into())))
}
Instruction::SyncRegions(_) => Ok(Some(Box::new(SyncRegionHandler.into()))),
Instruction::RemapManifest(_) => Ok(Some(Box::new(RemapManifestHandler.into()))),
Instruction::ApplyStagingManifests(_) => {
Ok(Some(Box::new(ApplyStagingManifestsHandler.into())))
@@ -150,6 +153,7 @@ pub enum InstructionHandlers {
GetFileRefs(GetFileRefsHandler),
GcRegions(GcRegionsHandler),
EnterStagingRegions(EnterStagingRegionsHandler),
SyncRegions(SyncRegionHandler),
RemapManifest(RemapManifestHandler),
ApplyStagingManifests(ApplyStagingManifestsHandler),
}
@@ -175,6 +179,7 @@ impl_from_handler!(
GetFileRefsHandler => GetFileRefs,
GcRegionsHandler => GcRegions,
EnterStagingRegionsHandler => EnterStagingRegions,
SyncRegionHandler => SyncRegions,
RemapManifestHandler => RemapManifest,
ApplyStagingManifestsHandler => ApplyStagingManifests
);
@@ -222,6 +227,7 @@ dispatch_instr!(
GetFileRefs => GetFileRefs,
GcRegions => GcRegions,
EnterStagingRegions => EnterStagingRegions,
SyncRegions => SyncRegions,
RemapManifest => RemapManifest,
ApplyStagingManifests => ApplyStagingManifests,
);

View File

@@ -48,19 +48,32 @@ impl ApplyStagingManifestsHandler {
ctx: &HandlerContext,
request: ApplyStagingManifest,
) -> ApplyStagingManifestReply {
let Some(leader) = ctx.region_server.is_region_leader(request.region_id) else {
warn!("Region: {} is not found", request.region_id);
let ApplyStagingManifest {
region_id,
ref partition_expr,
central_region_id,
ref manifest_path,
} = request;
common_telemetry::info!(
"Datanode received apply staging manifest request, region_id: {}, central_region_id: {}, partition_expr: {}, manifest_path: {}",
region_id,
central_region_id,
partition_expr,
manifest_path
);
let Some(leader) = ctx.region_server.is_region_leader(region_id) else {
warn!("Region: {} is not found", region_id);
return ApplyStagingManifestReply {
region_id: request.region_id,
region_id,
exists: false,
ready: false,
error: None,
};
};
if !leader {
warn!("Region: {} is not leader", request.region_id);
warn!("Region: {} is not leader", region_id);
return ApplyStagingManifestReply {
region_id: request.region_id,
region_id,
exists: true,
ready: false,
error: Some("Region is not leader".into()),
@@ -70,25 +83,25 @@ impl ApplyStagingManifestsHandler {
match ctx
.region_server
.handle_request(
request.region_id,
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: request.partition_expr,
central_region_id: request.central_region_id,
manifest_path: request.manifest_path,
partition_expr: partition_expr.clone(),
central_region_id,
manifest_path: manifest_path.clone(),
}),
)
.await
{
Ok(_) => ApplyStagingManifestReply {
region_id: request.region_id,
region_id,
exists: true,
ready: true,
error: None,
},
Err(err) => {
error!(err; "Failed to apply staging manifest");
error!(err; "Failed to apply staging manifest, region_id: {}", region_id);
ApplyStagingManifestReply {
region_id: request.region_id,
region_id,
exists: true,
ready: false,
error: Some(format!("{err:?}")),

View File

@@ -51,6 +51,11 @@ impl EnterStagingRegionsHandler {
partition_expr,
}: EnterStagingRegion,
) -> EnterStagingRegionReply {
common_telemetry::info!(
"Datanode received enter staging region: {}, partition_expr: {}",
region_id,
partition_expr
);
let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
warn!("Region: {} is not found", region_id);
return EnterStagingRegionReply {
@@ -85,7 +90,7 @@ impl EnterStagingRegionsHandler {
error: None,
},
Err(err) => {
error!(err; "Failed to enter staging region");
error!(err; "Failed to enter staging region, region_id: {}", region_id);
EnterStagingRegionReply {
region_id,
ready: false,

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use common_meta::instruction::{InstructionReply, RemapManifest, RemapManifestReply};
use common_telemetry::warn;
use common_telemetry::{error, info, warn};
use store_api::region_engine::RemapManifestsRequest;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
@@ -34,6 +34,12 @@ impl InstructionHandler for RemapManifestHandler {
region_mapping,
new_partition_exprs,
} = request;
info!(
"Datanode received remap manifest request, region_id: {}, input_regions: {}, target_regions: {}",
region_id,
input_regions.len(),
new_partition_exprs.len()
);
let Some(leader) = ctx.region_server.is_region_leader(region_id) else {
warn!("Region: {} is not found", region_id);
return Some(InstructionReply::RemapManifest(RemapManifestReply {
@@ -67,11 +73,18 @@ impl InstructionHandler for RemapManifestHandler {
manifest_paths: result.manifest_paths,
error: None,
}),
Err(e) => InstructionReply::RemapManifest(RemapManifestReply {
exists: true,
manifest_paths: Default::default(),
error: Some(format!("{e:?}")),
}),
Err(e) => {
error!(
e;
"Remap manifests failed on datanode, region_id: {}",
region_id
);
InstructionReply::RemapManifest(RemapManifestReply {
exists: true,
manifest_paths: Default::default(),
error: Some(format!("{e:?}")),
})
}
};
Some(reply)

View File

@@ -0,0 +1,192 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{InstructionReply, SyncRegion, SyncRegionReply, SyncRegionsReply};
use common_telemetry::{error, info, warn};
use futures::future::join_all;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
/// Handler for [SyncRegion] instruction.
/// It syncs the region from a manifest or another region.
#[derive(Debug, Clone, Copy, Default)]
pub struct SyncRegionHandler;
#[async_trait::async_trait]
impl InstructionHandler for SyncRegionHandler {
type Instruction = Vec<SyncRegion>;
/// Handles a batch of [SyncRegion] instructions.
async fn handle(
&self,
ctx: &HandlerContext,
regions: Self::Instruction,
) -> Option<InstructionReply> {
let futures = regions
.into_iter()
.map(|sync_region| Self::handle_sync_region(ctx, sync_region));
let results = join_all(futures).await;
Some(InstructionReply::SyncRegions(SyncRegionsReply::new(
results,
)))
}
}
impl SyncRegionHandler {
/// Handles a single [SyncRegion] instruction.
async fn handle_sync_region(
ctx: &HandlerContext,
SyncRegion { region_id, request }: SyncRegion,
) -> SyncRegionReply {
let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
warn!("Region: {} is not found", region_id);
return SyncRegionReply {
region_id,
ready: false,
exists: false,
error: None,
};
};
if !writable {
warn!("Region: {} is not writable", region_id);
return SyncRegionReply {
region_id,
ready: false,
exists: true,
error: Some("Region is not writable".into()),
};
}
match ctx.region_server.sync_region(region_id, request).await {
Ok(_) => {
info!("Successfully synced region: {}", region_id);
SyncRegionReply {
region_id,
ready: true,
exists: true,
error: None,
}
}
Err(e) => {
error!(e; "Failed to sync region: {}", region_id);
SyncRegionReply {
region_id,
ready: false,
exists: true,
error: Some(format!("{:?}", e)),
}
}
}
}
}
#[cfg(test)]
mod tests {
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{RegionRole, SyncRegionFromRequest};
use store_api::storage::RegionId;
use crate::heartbeat::handler::sync_region::SyncRegionHandler;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
use crate::tests::{MockRegionEngine, mock_region_server};
#[tokio::test]
async fn test_handle_sync_region_not_found() {
let mut mock_region_server = mock_region_server();
let (mock_engine, _) = MockRegionEngine::new(METRIC_ENGINE_NAME);
mock_region_server.register_engine(mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let handler = SyncRegionHandler;
let region_id = RegionId::new(1024, 1);
let sync_region = common_meta::instruction::SyncRegion {
region_id,
request: SyncRegionFromRequest::from_manifest(Default::default()),
};
let reply = handler
.handle(&handler_context, vec![sync_region])
.await
.unwrap()
.expect_sync_regions_reply();
assert_eq!(reply.len(), 1);
assert_eq!(reply[0].region_id, region_id);
assert!(!reply[0].exists);
assert!(!reply[0].ready);
}
#[tokio::test]
async fn test_handle_sync_region_not_writable() {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(METRIC_ENGINE_NAME, |r| {
r.mock_role = Some(Some(RegionRole::Follower));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let handler = SyncRegionHandler;
let sync_region = common_meta::instruction::SyncRegion {
region_id,
request: SyncRegionFromRequest::from_manifest(Default::default()),
};
let reply = handler
.handle(&handler_context, vec![sync_region])
.await
.unwrap()
.expect_sync_regions_reply();
assert_eq!(reply.len(), 1);
assert_eq!(reply[0].region_id, region_id);
assert!(reply[0].exists);
assert!(!reply[0].ready);
assert!(reply[0].error.is_some());
}
#[tokio::test]
async fn test_handle_sync_region_success() {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(METRIC_ENGINE_NAME, |r| {
r.mock_role = Some(Some(RegionRole::Leader));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let handler = SyncRegionHandler;
let sync_region = common_meta::instruction::SyncRegion {
region_id,
request: SyncRegionFromRequest::from_manifest(Default::default()),
};
let reply = handler
.handle(&handler_context, vec![sync_region])
.await
.unwrap()
.expect_sync_regions_reply();
assert_eq!(reply.len(), 1);
assert_eq!(reply[0].region_id, region_id);
assert!(reply[0].exists);
assert!(reply[0].ready);
assert!(reply[0].error.is_none());
}
}
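For reference, the reply contract these tests exercise can be summarized standalone. The sketch below is inferred from the fields the handler fills in; `RegionId` is stubbed as `u64` purely so the sketch compiles on its own, and the real types live in store_api and common_meta.
// Illustrative sketch only, not part of the diff above.
type RegionId = u64; // stand-in for store_api::storage::RegionId
#[derive(Debug)]
struct SyncRegionReply {
    region_id: RegionId,
    ready: bool,          // true only when the sync succeeded
    exists: bool,         // false when the region is not found on the datanode
    error: Option<String>,
}
fn main() {
    // The three outcomes the handler reports, mirroring the tests above.
    let not_found = SyncRegionReply { region_id: 1, ready: false, exists: false, error: None };
    let not_writable = SyncRegionReply {
        region_id: 1,
        ready: false,
        exists: true,
        error: Some("Region is not writable".into()),
    };
    let synced = SyncRegionReply { region_id: 1, ready: true, exists: true, error: None };
    println!("{not_found:?}\n{not_writable:?}\n{synced:?}");
}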

View File

@@ -115,12 +115,17 @@ pub type MockSetReadonlyGracefullyHandler =
pub type MockGetMetadataHandler =
Box<dyn Fn(RegionId) -> Result<RegionMetadataRef, Error> + Send + Sync>;
pub type MockSyncRegionHandler = Box<
dyn Fn(RegionId, SyncRegionFromRequest) -> Result<SyncRegionFromResponse, Error> + Send + Sync,
>;
pub struct MockRegionEngine {
sender: Sender<(RegionId, RegionRequest)>,
pub(crate) handle_request_delay: Option<Duration>,
pub(crate) handle_request_mock_fn: Option<MockRequestHandler>,
pub(crate) handle_set_readonly_gracefully_mock_fn: Option<MockSetReadonlyGracefullyHandler>,
pub(crate) handle_get_metadata_mock_fn: Option<MockGetMetadataHandler>,
pub(crate) handle_sync_region_mock_fn: Option<MockSyncRegionHandler>,
pub(crate) mock_role: Option<Option<RegionRole>>,
engine: String,
}
@@ -136,6 +141,7 @@ impl MockRegionEngine {
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
handle_sync_region_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
}),
@@ -156,6 +162,7 @@ impl MockRegionEngine {
handle_request_mock_fn: Some(mock_fn),
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
handle_sync_region_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
}),
@@ -176,6 +183,7 @@ impl MockRegionEngine {
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: Some(mock_fn),
handle_sync_region_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
}),
@@ -197,6 +205,7 @@ impl MockRegionEngine {
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
handle_sync_region_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
};
@@ -286,10 +295,14 @@ impl RegionEngine for MockRegionEngine {
async fn sync_region(
&self,
_region_id: RegionId,
_request: SyncRegionFromRequest,
region_id: RegionId,
request: SyncRegionFromRequest,
) -> Result<SyncRegionFromResponse, BoxedError> {
unimplemented!()
if let Some(mock_fn) = &self.handle_sync_region_mock_fn {
return mock_fn(region_id, request).map_err(BoxedError::new);
};
Ok(SyncRegionFromResponse::Mito { synced: true })
}
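// Illustrative sketch, not part of the diff above: a test could override the
// default reply by installing a custom handler through `with_custom_apply_fn`,
// using the `handle_sync_region_mock_fn` field added in this change.
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(METRIC_ENGINE_NAME, |r| {
    r.mock_role = Some(Some(RegionRole::Leader));
    r.handle_sync_region_mock_fn = Some(Box::new(|_region_id, _request| {
        Ok(SyncRegionFromResponse::Mito { synced: false })
    }));
});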
async fn remap_manifests(

View File

@@ -1145,10 +1145,11 @@ impl TryFrom<ScalarValue> for Value {
ScalarValue::List(array) => {
// this is for item type
let datatype = ConcreteDataType::try_from(&array.value_type())?;
let items = ScalarValue::convert_array_to_scalar_vec(array.as_ref())
.context(ConvertArrowArrayToScalarsSnafu)?
let scalar_values = ScalarValue::convert_array_to_scalar_vec(array.as_ref())
.context(ConvertArrowArrayToScalarsSnafu)?;
let items = scalar_values
.into_iter()
.flatten()
.flat_map(|v| v.unwrap_or_else(|| vec![ScalarValue::Null]))
.map(|x| x.try_into())
.collect::<Result<Vec<Value>>>()?;
Value::List(ListValue::new(items, Arc::new(datatype)))
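// Illustrative sketch, not part of the diff above: the Option-aware flattening
// turns a `None` outer element into a single placeholder item instead of
// silently dropping it, assuming `convert_array_to_scalar_vec` now yields
// `Vec<Option<Vec<ScalarValue>>>`.
fn flatten_sketch() {
    let nested: Vec<Option<Vec<i32>>> = vec![Some(vec![1, 2]), None, Some(vec![3])];
    let flat: Vec<i32> = nested
        .into_iter()
        .flat_map(|v| v.unwrap_or_else(|| vec![0])) // 0 stands in for ScalarValue::Null
        .collect();
    assert_eq!(flat, vec![1, 2, 0, 3]);
}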
@@ -2997,6 +2998,7 @@ pub(crate) mod tests {
.unwrap()
.into_iter()
.flatten()
.flatten()
.collect::<Vec<_>>();
assert_eq!(
vs,

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::any::Any;
use std::borrow::Borrow;
use std::sync::Arc;
use arrow::array::{Array, ArrayBuilder, ArrayIter, ArrayRef, BooleanArray, BooleanBuilder};
@@ -69,8 +68,8 @@ impl From<Vec<Option<bool>>> for BooleanVector {
}
}
impl<Ptr: Borrow<Option<bool>>> FromIterator<Ptr> for BooleanVector {
fn from_iter<I: IntoIterator<Item = Ptr>>(iter: I) -> Self {
impl FromIterator<Option<bool>> for BooleanVector {
fn from_iter<T: IntoIterator<Item = Option<bool>>>(iter: T) -> Self {
BooleanVector {
array: BooleanArray::from_iter(iter),
}
@@ -303,7 +302,7 @@ mod tests {
#[test]
fn test_boolean_vector_from_iter() {
let input = vec![Some(false), Some(true), Some(false), Some(true)];
let vec = input.iter().collect::<BooleanVector>();
let vec = input.iter().cloned().collect::<BooleanVector>();
assert_eq!(4, vec.len());
for (i, v) in input.into_iter().enumerate() {
assert_eq!(v, vec.get_data(i), "Failed at {i}")

View File

@@ -83,8 +83,6 @@ impl Decimal128Vector {
/// For example:
/// value = 12345, precision = 3, return error.
pub fn with_precision_and_scale(self, precision: u8, scale: i8) -> Result<Self> {
// validate if precision is too small
self.validate_decimal_precision(precision)?;
let array = self
.array
.with_precision_and_scale(precision, scale)
@@ -124,7 +122,7 @@ impl Decimal128Vector {
}
/// Validate decimal precision, if precision is invalid, return error.
fn validate_decimal_precision(&self, precision: u8) -> Result<()> {
pub fn validate_decimal_precision(&self, precision: u8) -> Result<()> {
self.array
.validate_decimal_precision(precision)
.context(ValueExceedsPrecisionSnafu { precision })
@@ -564,7 +562,9 @@ pub mod tests {
let decimal_vector = decimal_builder.finish();
assert_eq!(decimal_vector.precision(), 38);
assert_eq!(decimal_vector.scale(), 10);
let result = decimal_vector.with_precision_and_scale(3, 2);
let result = decimal_vector
.with_precision_and_scale(3, 2)
.and_then(|x| x.validate_decimal_precision(3));
assert_eq!(
"Value exceeds the precision 3 bound",
result.unwrap_err().to_string()

View File

@@ -170,11 +170,12 @@ impl<K: ArrowDictionaryKeyType> Serializable for DictionaryVector<K> {
// the value it refers to in the dictionary
let mut result = Vec::with_capacity(self.len());
for i in 0..self.len() {
let keys = self.array.keys();
let key_values = &keys.values()[..self.len()];
for (i, &key) in key_values.iter().enumerate() {
if self.is_null(i) {
result.push(JsonValue::Null);
} else {
let key = self.array.keys().value(i);
let value = self.item_vector.get(key.as_usize());
let json_value = serde_json::to_value(value).context(error::SerializeSnafu)?;
result.push(json_value);
@@ -247,16 +248,9 @@ impl<K: ArrowDictionaryKeyType> VectorOp for DictionaryVector<K> {
let mut replicated_keys = PrimitiveBuilder::new();
let mut previous_offset = 0;
for (i, &offset) in offsets.iter().enumerate() {
let key = if i < self.len() {
if keys.is_valid(i) {
Some(keys.value(i))
} else {
None
}
} else {
None
};
let mut key_iter = keys.iter().chain(std::iter::repeat(None));
for &offset in offsets {
let key = key_iter.next().unwrap();
// repeat this key (offset - previous_offset) times
let repeat_count = offset - previous_offset;
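// Illustrative sketch, not part of the diff above: the padding idiom chains an
// infinite tail of `None` onto the key iterator so it can never run out before
// the offsets do, which is what keeps the `unwrap` panic-free.
fn padding_sketch() {
    let keys = vec![Some(1u32), None, Some(3)];
    let offsets = vec![2usize, 5, 7, 9];
    let mut key_iter = keys.into_iter().chain(std::iter::repeat(None));
    for offset in offsets {
        let key = key_iter.next().unwrap(); // never panics: the iterator is infinite
        println!("offset {offset} -> key {key:?}");
    }
}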

View File

@@ -170,10 +170,11 @@ impl Helper {
ScalarValue::List(array) => {
let item_type = Arc::new(ConcreteDataType::try_from(&array.value_type())?);
let mut builder = ListVectorBuilder::with_type_capacity(item_type.clone(), 1);
let values = ScalarValue::convert_array_to_scalar_vec(array.as_ref())
.context(ConvertArrowArrayToScalarsSnafu)?
let scalar_values = ScalarValue::convert_array_to_scalar_vec(array.as_ref())
.context(ConvertArrowArrayToScalarsSnafu)?;
let values = scalar_values
.into_iter()
.flatten()
.flat_map(|v| v.unwrap_or_else(|| vec![ScalarValue::Null]))
.map(ScalarValue::try_into)
.collect::<Result<Vec<Value>>>()?;
builder.push(Some(ListValueRef::Ref {

View File

@@ -18,6 +18,7 @@ use common_datasource::file_format::Format;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::parquet::DefaultParquetFileReaderFactory;
use datafusion::common::ToDFSchema;
use datafusion::config::CsvOptions;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
@@ -34,7 +35,6 @@ use datafusion::prelude::SessionContext;
use datafusion_expr::expr::Expr;
use datafusion_expr::utils::conjunction;
use datafusion_orc::OrcSource;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use snafu::ResultExt;
@@ -45,7 +45,6 @@ const DEFAULT_BATCH_SIZE: usize = 8192;
fn build_record_batch_stream(
scan_plan_config: &ScanPlanConfig,
file_schema: Arc<ArrowSchema>,
limit: Option<usize>,
file_source: Arc<dyn FileSource>,
) -> Result<DfSendableRecordBatchStream> {
@@ -55,15 +54,12 @@ fn build_record_batch_stream(
.map(|filename| PartitionedFile::new(filename.clone(), 0))
.collect::<Vec<_>>();
let config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
file_source.clone(),
)
.with_projection(scan_plan_config.projection.cloned())
.with_limit(limit)
.with_file_group(FileGroup::new(files))
.build();
let config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source.clone())
.with_projection_indices(scan_plan_config.projection.cloned())
.with_limit(limit)
.with_file_group(FileGroup::new(files))
.build();
let store = Arc::new(object_store_opendal::OpendalStore::new(
scan_plan_config.store.clone(),
@@ -89,11 +85,14 @@ fn new_csv_stream(
// push down limit only if there is no filter
let limit = config.filters.is_empty().then_some(config.limit).flatten();
let csv_source = CsvSource::new(format.has_header, format.delimiter, b'"')
.with_schema(file_schema.clone())
let options = CsvOptions::default()
.with_has_header(format.has_header)
.with_delimiter(format.delimiter);
let csv_source = CsvSource::new(file_schema)
.with_csv_options(options)
.with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, file_schema, limit, csv_source)
build_record_batch_stream(config, limit, csv_source)
}
fn new_json_stream(config: &ScanPlanConfig) -> Result<DfSendableRecordBatchStream> {
@@ -102,8 +101,8 @@ fn new_json_stream(config: &ScanPlanConfig) -> Result<DfSendableRecordBatchStrea
// push down limit only if there is no filter
let limit = config.filters.is_empty().then_some(config.limit).flatten();
let file_source = JsonSource::new().with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, file_schema, limit, file_source)
let file_source = JsonSource::new(file_schema).with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, limit, file_source)
}
fn new_parquet_stream_with_exec_plan(
@@ -126,9 +125,10 @@ fn new_parquet_stream_with_exec_plan(
.collect::<Vec<_>>(),
);
let mut parquet_source = ParquetSource::default().with_parquet_file_reader_factory(Arc::new(
DefaultParquetFileReaderFactory::new(store.clone()),
));
let mut parquet_source = ParquetSource::new(file_schema.clone())
.with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(
store.clone(),
)));
// build predicate filter
let filters = filters.to_vec();
@@ -143,15 +143,12 @@ fn new_parquet_stream_with_exec_plan(
parquet_source = parquet_source.with_predicate(filters);
};
let file_scan_config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
Arc::new(parquet_source),
)
.with_file_group(file_group)
.with_projection(projection.cloned())
.with_limit(*limit)
.build();
let file_scan_config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), Arc::new(parquet_source))
.with_file_group(file_group)
.with_projection_indices(projection.cloned())
.with_limit(*limit)
.build();
// TODO(ruihang): get this from upper layer
let task_ctx = SessionContext::default().task_ctx();
@@ -170,8 +167,8 @@ fn new_orc_stream(config: &ScanPlanConfig) -> Result<DfSendableRecordBatchStream
// push down limit only if there is no filter
let limit = config.filters.is_empty().then_some(config.limit).flatten();
let file_source = OrcSource::default().with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, file_schema, limit, file_source)
let file_source = OrcSource::new(file_schema.into()).with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, limit, file_source)
}
#[derive(Debug, Clone)]

View File

@@ -14,7 +14,7 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::time::Instant;
use common_meta::datanode::RegionManifestInfo;
use common_meta::peer::Peer;
@@ -22,9 +22,7 @@ use common_telemetry::init_default_ut_logging;
use store_api::region_engine::RegionRole;
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use crate::gc::mock::{
MockSchedulerCtx, TEST_REGION_SIZE_200MB, mock_region_stat, new_empty_report_with,
};
use crate::gc::mock::{MockSchedulerCtx, TEST_REGION_SIZE_200MB, mock_region_stat};
use crate::gc::{GcScheduler, GcSchedulerOptions};
// Integration Flow Tests
@@ -135,6 +133,10 @@ async fn test_full_gc_workflow() {
#[cfg(target_os = "linux")]
#[tokio::test]
async fn test_tracker_cleanup() {
use std::time::Duration;
use crate::gc::mock::new_empty_report_with;
init_default_ut_logging();
let table_id = 1;

View File

@@ -14,19 +14,15 @@
use std::any::Any;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use snafu::OptionExt;
use tokio::time::Instant;
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::error::{self, Result};
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
use crate::procedure::utils;
/// Flushes the leader region before downgrading it.
///
@@ -61,15 +57,6 @@ impl State for PreFlushRegion {
}
impl PreFlushRegion {
/// Builds flush leader region instruction.
fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction {
let pc = &ctx.persistent_ctx;
Instruction::FlushRegions(FlushRegions::sync_batch(
pc.region_ids.clone(),
FlushErrorStrategy::TryAll,
))
}
/// Tries to flush a leader region.
///
/// Ignore:
@@ -89,109 +76,18 @@ impl PreFlushRegion {
.context(error::ExceededDeadlineSnafu {
operation: "Flush leader region",
})?;
let flush_instruction = self.build_flush_leader_region_instruction(ctx);
let region_ids = &ctx.persistent_ctx.region_ids;
let leader = &ctx.persistent_ctx.from_peer;
let msg = MailboxMessage::json_message(
&format!("Flush leader region: {:?}", region_ids),
&format!("Metasrv@{}", ctx.server_addr()),
&format!("Datanode-{}@{}", leader.id, leader.addr),
common_time::util::current_time_millis(),
&flush_instruction,
utils::flush_region(
&ctx.mailbox,
&ctx.server_addr,
region_ids,
leader,
operation_timeout,
utils::ErrorStrategy::Ignore,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: flush_instruction.to_string(),
})?;
let ch = Channel::Datanode(leader.id);
let now = Instant::now();
let result = ctx.mailbox.send(&ch, msg, operation_timeout).await;
match result {
Ok(receiver) => match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received flush leader region reply: {:?}, region: {:?}, elapsed: {:?}",
reply,
region_ids,
now.elapsed()
);
let reply_result = match reply {
InstructionReply::FlushRegions(flush_reply) => {
if flush_reply.results.len() != region_ids.len() {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: format!(
"expect {} region flush result, but got {}",
region_ids.len(),
flush_reply.results.len()
),
}
.fail();
}
match flush_reply.overall_success {
true => (true, None),
false => (
false,
Some(
flush_reply
.results
.iter()
.filter_map(|(region_id, result)| match result {
Ok(_) => None,
Err(e) => Some(format!("{}: {}", region_id, e)),
})
.collect::<Vec<String>>()
.join("; "),
),
),
}
}
_ => {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect flush region reply",
}
.fail();
}
};
let (result, error) = reply_result;
if let Some(error) = error {
warn!(
"Failed to flush leader regions {:?} on datanode {:?}, error: {}. Skip flush operation.",
region_ids, leader, &error
);
} else if result {
info!(
"The flush leader regions {:?} on datanode {:?} is successful, elapsed: {:?}",
region_ids,
leader,
now.elapsed()
);
}
Ok(())
}
Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
operation: "Flush leader regions",
}
.fail(),
Err(err) => Err(err),
},
Err(Error::PusherNotFound { .. }) => {
warn!(
"Failed to flush leader regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
region_ids, leader
);
Ok(())
}
Err(err) => Err(err),
}
.await
}
}
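// Illustrative sketch, not part of the diff above: the rough shape of the
// helpers invoked here, inferred from their call sites in this change. The real
// definitions live in procedure/utils.rs; the exact signature is an assumption
// and imports are elided.
pub enum ErrorStrategy {
    /// Log and skip failures, e.g. when a datanode is unreachable.
    Ignore,
    /// Surface failures as retryable errors.
    Retry,
}
pub async fn flush_region(
    mailbox: &MailboxRef,
    server_addr: &str,
    region_ids: &[RegionId],
    peer: &Peer,
    timeout: Duration,
    error_strategy: ErrorStrategy,
) -> Result<()> {
    // Body elided in this sketch.
    unimplemented!()
}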
@@ -202,11 +98,13 @@ mod tests {
use store_api::storage::RegionId;
use super::*;
use crate::error::Error;
use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
use crate::procedure::test_util::{
new_close_region_reply, new_flush_region_reply_for_region, send_mock_reply,
};
use crate::service::mailbox::Channel;
fn new_persistent_context() -> PersistentContext {
test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))

View File

@@ -47,7 +47,7 @@ use common_procedure::{
BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
ProcedureManagerRef, Result as ProcedureResult, Status, StringKey, UserMetadata,
};
use common_telemetry::error;
use common_telemetry::{error, info};
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -232,7 +232,10 @@ impl Context {
&new_region_routes,
table_id,
)?;
info!(
"Updating table route for table: {}, new region routes: {:?}",
table_id, new_region_routes
);
self.table_metadata_manager
.update_table_route(
table_id,
@@ -262,6 +265,13 @@ impl Context {
.await;
Ok(())
}
/// Returns the next operation timeout.
///
/// If the next operation timeout is not set, it will return `None`.
pub fn next_operation_timeout(&self) -> Option<std::time::Duration> {
Some(std::time::Duration::from_secs(10))
}
}
#[async_trait::async_trait]
@@ -335,6 +345,13 @@ impl Procedure for RepartitionProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
let state_name = state.name();
// Log state transition
common_telemetry::info!(
"Repartition procedure executing state: {}, table_id: {}",
state_name,
self.context.persistent_ctx.table_id
);
match state.next(&mut self.context, _ctx).await {
Ok((next, status)) => {
*state = next;

View File

@@ -65,6 +65,12 @@ impl State for AllocateRegion {
&mut next_region_number,
&self.plan_entries,
);
let plan_count = repartition_plan_entries.len();
let to_allocate = Self::count_regions_to_allocate(&repartition_plan_entries);
info!(
"Repartition allocate regions start, table_id: {}, groups: {}, regions_to_allocate: {}",
table_id, plan_count, to_allocate
);
// If no region to allocate, directly dispatch the plan.
if Self::count_regions_to_allocate(&repartition_plan_entries) == 0 {
@@ -99,6 +105,20 @@ impl State for AllocateRegion {
.await
.context(error::AllocateWalOptionsSnafu { table_id })?;
let new_region_count = new_allocated_region_routes.len();
let new_regions_brief: Vec<_> = new_allocated_region_routes
.iter()
.map(|route| {
let region_id = route.region.id;
let peer = route.leader_peer.as_ref().map(|p| p.id).unwrap_or_default();
format!("region_id: {}, peer: {}", region_id, peer)
})
.collect();
info!(
"Allocated regions for repartition, table_id: {}, new_region_count: {}, new_regions: {:?}",
table_id, new_region_count, new_regions_brief
);
let _operating_guards = Self::register_operating_regions(
&ctx.memory_region_keeper,
&new_allocated_region_routes,
@@ -137,7 +157,6 @@ impl AllocateRegion {
Self { plan_entries }
}
#[allow(dead_code)]
fn register_operating_regions(
memory_region_keeper: &MemoryRegionKeeperRef,
region_routes: &[RegionRoute],
@@ -155,7 +174,6 @@ impl AllocateRegion {
Ok(operating_guards)
}
#[allow(dead_code)]
fn generate_region_routes(
region_routes: &[RegionRoute],
new_allocated_region_ids: &[RegionRoute],
@@ -177,7 +195,6 @@ impl AllocateRegion {
///
/// This method takes the allocation plan entries and converts them to repartition plan entries,
/// updating `next_region_number` for each newly allocated region.
#[allow(dead_code)]
fn convert_to_repartition_plans(
table_id: TableId,
next_region_number: &mut RegionNumber,
@@ -196,7 +213,6 @@ impl AllocateRegion {
}
/// Collects all regions that need to be allocated from the repartition plan entries.
#[allow(dead_code)]
fn collect_allocate_regions(
repartition_plan_entries: &[RepartitionPlanEntry],
) -> Vec<&RegionDescriptor> {
@@ -207,7 +223,6 @@ impl AllocateRegion {
}
/// Prepares region allocation data: region numbers and their partition expressions.
#[allow(dead_code)]
fn prepare_region_allocation_data(
allocate_regions: &[&RegionDescriptor],
) -> Result<Vec<(RegionNumber, String)>> {
@@ -225,7 +240,6 @@ impl AllocateRegion {
}
/// Calculates the total number of regions that need to be allocated.
#[allow(dead_code)]
fn count_regions_to_allocate(repartition_plan_entries: &[RepartitionPlanEntry]) -> usize {
repartition_plan_entries
.iter()
@@ -234,12 +248,10 @@ impl AllocateRegion {
}
/// Gets the next region number from the physical table route.
#[allow(dead_code)]
fn get_next_region_number(max_region_number: RegionNumber) -> RegionNumber {
max_region_number + 1
}
#[allow(dead_code)]
async fn allocate_regions(
node_manager: &NodeManagerRef,
raw_table_info: &RawTableInfo,
@@ -252,12 +264,14 @@ impl AllocateRegion {
&raw_table_info.name,
);
let table_id = raw_table_info.ident.table_id;
let request = build_template_from_raw_table_info(raw_table_info)
let request = build_template_from_raw_table_info(raw_table_info, true)
.context(error::BuildCreateRequestSnafu { table_id })?;
let builder = CreateRequestBuilder::new(request, None);
let region_count = region_routes.len();
let wal_region_count = wal_options.len();
info!(
"Allocating regions for table: {}, region_routes: {:?}, wal_options: {:?}",
table_id, region_routes, wal_options
"Allocating regions on datanodes, table_id: {}, region_count: {}, wal_regions: {}",
table_id, region_count, wal_region_count
);
let executor = CreateTableExecutor::new(table_ref.into(), false, builder);
executor

View File

@@ -15,7 +15,7 @@
use std::any::Any;
use common_procedure::{Context as ProcedureContext, ProcedureId, Status, watcher};
use common_telemetry::error;
use common_telemetry::{error, info};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
@@ -64,9 +64,10 @@ impl Collect {
impl State for Collect {
async fn next(
&mut self,
_ctx: &mut Context,
ctx: &mut Context,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_id = ctx.persistent_ctx.table_id;
for procedure_meta in self.inflight_procedures.iter() {
let procedure_id = procedure_meta.procedure_id;
let group_id = procedure_meta.group_id;
@@ -93,7 +94,16 @@ impl State for Collect {
}
}
if !self.failed_procedures.is_empty() || !self.unknown_procedures.is_empty() {
let inflight = self.inflight_procedures.len();
let succeeded = self.succeeded_procedures.len();
let failed = self.failed_procedures.len();
let unknown = self.unknown_procedures.len();
info!(
"Collected repartition group results for table_id: {}, inflight: {}, succeeded: {}, failed: {}, unknown: {}",
table_id, inflight, succeeded, failed, unknown
);
if failed > 0 || unknown > 0 {
// TODO(weny): retry the failed or unknown procedures.
}

View File

@@ -62,9 +62,10 @@ impl State for DeallocateRegion {
.flat_map(|p| p.pending_deallocate_region_ids.iter())
.cloned()
.collect::<HashSet<_>>();
let dealloc_count = pending_deallocate_region_ids.len();
info!(
"Deallocating regions: {:?} for table: {} during repartition procedure",
pending_deallocate_region_ids, table_id
"Deallocating regions for repartition, table_id: {}, count: {}, regions: {:?}",
table_id, dealloc_count, pending_deallocate_region_ids
);
let table_lock = TableLock::Write(table_id).into();
@@ -111,7 +112,6 @@ impl State for DeallocateRegion {
}
impl DeallocateRegion {
#[allow(dead_code)]
async fn deallocate_regions(
node_manager: &NodeManagerRef,
leader_region_registry: &LeaderRegionRegistryRef,
@@ -136,7 +136,6 @@ impl DeallocateRegion {
Ok(())
}
#[allow(dead_code)]
fn filter_deallocatable_region_routes(
table_id: TableId,
region_routes: &[RegionRoute],
@@ -161,7 +160,6 @@ impl DeallocateRegion {
.collect::<Vec<_>>()
}
#[allow(dead_code)]
fn generate_region_routes(
region_routes: &[RegionRoute],
pending_deallocate_region_ids: &HashSet<RegionId>,

View File

@@ -16,7 +16,9 @@ use std::any::Any;
use std::collections::HashMap;
use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::storage::RegionId;
use crate::error::Result;
@@ -28,7 +30,6 @@ use crate::procedure::repartition::{self, Context, State};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Dispatch;
#[allow(dead_code)]
fn build_region_mapping(
source_regions: &[RegionDescriptor],
target_regions: &[RegionDescriptor],
@@ -57,8 +58,12 @@ impl State for Dispatch {
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_id = ctx.persistent_ctx.table_id;
let mut procedures = Vec::with_capacity(ctx.persistent_ctx.plans.len());
let mut procedure_metas = Vec::with_capacity(ctx.persistent_ctx.plans.len());
let table_info_value = ctx.get_table_info_value().await?;
let table_engine = table_info_value.table_info.meta.engine;
let sync_region = table_engine == METRIC_ENGINE_NAME;
let plan_count = ctx.persistent_ctx.plans.len();
let mut procedures = Vec::with_capacity(plan_count);
let mut procedure_metas = Vec::with_capacity(plan_count);
for (plan_index, plan) in ctx.persistent_ctx.plans.iter().enumerate() {
let region_mapping = build_region_mapping(
&plan.source_regions,
@@ -73,6 +78,9 @@ impl State for Dispatch {
plan.source_regions.clone(),
plan.target_regions.clone(),
region_mapping,
sync_region,
plan.allocated_region_ids.clone(),
plan.pending_deallocate_region_ids.clone(),
);
let group_procedure = RepartitionGroupProcedure::new(persistent_ctx, ctx);
@@ -85,6 +93,14 @@ impl State for Dispatch {
procedures.push(procedure);
}
let group_ids: Vec<_> = procedure_metas.iter().map(|m| m.group_id).collect();
info!(
"Dispatch repartition groups for table_id: {}, group_count: {}, group_ids: {:?}",
table_id,
group_ids.len(),
group_ids
);
Ok((
Box::new(Collect::new(procedure_metas)),
Status::suspended(procedures, true),

View File

@@ -17,6 +17,7 @@ pub(crate) mod enter_staging_region;
pub(crate) mod remap_manifest;
pub(crate) mod repartition_end;
pub(crate) mod repartition_start;
pub(crate) mod sync_region;
pub(crate) mod update_metadata;
pub(crate) mod utils;
@@ -40,7 +41,7 @@ use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status, StringKey, UserMetadata,
};
use common_telemetry::error;
use common_telemetry::{error, info};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
@@ -55,7 +56,6 @@ use crate::service::mailbox::MailboxRef;
pub type GroupId = Uuid;
#[allow(dead_code)]
pub struct RepartitionGroupProcedure {
state: Box<dyn State>,
context: Context,
@@ -113,6 +113,14 @@ impl Procedure for RepartitionGroupProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
let state_name = state.name();
// Log state transition
common_telemetry::info!(
"Repartition group procedure executing state: {}, group id: {}, table id: {}",
state_name,
self.context.persistent_ctx.group_id,
self.context.persistent_ctx.table_id
);
match state.next(&mut self.context, _ctx).await {
Ok((next, status)) => {
@@ -221,9 +229,16 @@ pub struct PersistentContext {
/// The staging manifest paths of the repartition group.
/// The value will be set in [RemapManifest](crate::procedure::repartition::group::remap_manifest::RemapManifest) state.
pub staging_manifest_paths: HashMap<RegionId, String>,
/// Whether sync region is needed for this group.
pub sync_region: bool,
/// The region ids of the newly allocated regions.
pub allocated_region_ids: Vec<RegionId>,
/// The region ids of the regions that are pending deallocation.
pub pending_deallocate_region_ids: Vec<RegionId>,
}
impl PersistentContext {
#[allow(clippy::too_many_arguments)]
pub fn new(
group_id: GroupId,
table_id: TableId,
@@ -232,6 +247,9 @@ impl PersistentContext {
sources: Vec<RegionDescriptor>,
targets: Vec<RegionDescriptor>,
region_mapping: HashMap<RegionId, Vec<RegionId>>,
sync_region: bool,
allocated_region_ids: Vec<RegionId>,
pending_deallocate_region_ids: Vec<RegionId>,
) -> Self {
Self {
group_id,
@@ -243,6 +261,9 @@ impl PersistentContext {
region_mapping,
group_prepare_result: None,
staging_manifest_paths: HashMap::new(),
sync_region,
allocated_region_ids,
pending_deallocate_region_ids,
}
}
@@ -334,6 +355,7 @@ impl Context {
new_region_routes: Vec<RegionRoute>,
) -> Result<()> {
let table_id = self.persistent_ctx.table_id;
let group_id = self.persistent_ctx.group_id;
// Safety: prepare result is set in [RepartitionStart] state.
let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap();
let central_region_datanode_table_value = self
@@ -345,6 +367,10 @@ impl Context {
..
} = &central_region_datanode_table_value.region_info;
info!(
"Updating table route for table: {}, group_id: {}, new region routes: {:?}",
table_id, group_id, new_region_routes
);
self.table_metadata_manager
.update_table_route(
table_id,

View File

@@ -31,7 +31,7 @@ use store_api::storage::RegionId;
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::repartition::group::repartition_end::RepartitionEnd;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::utils::{
HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
};
@@ -52,7 +52,10 @@ impl State for ApplyStagingManifest {
) -> Result<(Box<dyn State>, Status)> {
self.apply_staging_manifests(ctx).await?;
Ok((Box::new(RepartitionEnd), Status::executing(true)))
Ok((
Box::new(UpdateMetadata::ExitStaging),
Status::executing(true),
))
}
fn as_any(&self) -> &dyn Any {
@@ -125,7 +128,6 @@ impl ApplyStagingManifest {
})
}
#[allow(dead_code)]
async fn apply_staging_manifests(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
@@ -150,6 +152,7 @@ impl ApplyStagingManifest {
operation: "Apply staging manifests",
})?;
let instruction_region_count: usize = instructions.values().map(|v| v.len()).sum();
let (peers, tasks): (Vec<_>, Vec<_>) = instructions
.iter()
.map(|(peer, apply_staging_manifests)| {
@@ -166,8 +169,11 @@ impl ApplyStagingManifest {
})
.unzip();
info!(
"Sent apply staging manifests instructions to peers: {:?} for repartition table {}, group id {}",
peers, table_id, group_id
"Sent apply staging manifests instructions, table_id: {}, group_id: {}, peers: {}, regions: {}",
table_id,
group_id,
peers.len(),
instruction_region_count
);
let format_err_msg = |idx: usize, error: &Error| {
@@ -292,11 +298,7 @@ impl ApplyStagingManifest {
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received apply staging manifests reply: {:?}, elapsed: {:?}",
reply,
now.elapsed()
);
let elapsed = now.elapsed();
let InstructionReply::ApplyStagingManifests(ApplyStagingManifestsReply { replies }) =
reply
else {
@@ -306,9 +308,23 @@ impl ApplyStagingManifest {
}
.fail();
};
let total = replies.len();
let (mut ready, mut not_ready, mut with_error) = (0, 0, 0);
let region_ids = replies.iter().map(|r| r.region_id).collect::<Vec<_>>();
for reply in replies {
if reply.error.is_some() {
with_error += 1;
} else if reply.ready {
ready += 1;
} else {
not_ready += 1;
}
Self::handle_apply_staging_manifest_reply(&reply, &now, peer)?;
}
info!(
"Received apply staging manifests reply, peer: {:?}, total_regions: {}, regions:{:?}, ready: {}, not_ready: {}, with_error: {}, elapsed: {:?}",
peer, total, region_ids, ready, not_ready, with_error, elapsed
);
Ok(())
}

View File

@@ -23,7 +23,7 @@ use common_meta::instruction::{
use common_meta::peer::Peer;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use futures::future::join_all;
use futures::future::{join_all, try_join_all};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
@@ -35,6 +35,7 @@ use crate::procedure::repartition::group::utils::{
};
use crate::procedure::repartition::group::{Context, GroupPrepareResult, State};
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::utils::{self, ErrorStrategy};
use crate::service::mailbox::{Channel, MailboxRef};
#[derive(Debug, Serialize, Deserialize)]
@@ -48,6 +49,7 @@ impl State for EnterStagingRegion {
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
self.flush_pending_deallocate_regions(ctx).await?;
self.enter_staging_regions(ctx).await?;
Ok((Box::new(RemapManifest), Status::executing(true)))
@@ -94,7 +96,6 @@ impl EnterStagingRegion {
Ok(instructions)
}
#[allow(dead_code)]
async fn enter_staging_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
@@ -102,6 +103,8 @@ impl EnterStagingRegion {
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
let targets = &ctx.persistent_ctx.targets;
let instructions = Self::build_enter_staging_instructions(prepare_result, targets)?;
let target_region_count = targets.len();
let peer_count = instructions.len();
let operation_timeout =
ctx.next_operation_timeout()
.context(error::ExceededDeadlineSnafu {
@@ -123,8 +126,8 @@ impl EnterStagingRegion {
})
.unzip();
info!(
"Sent enter staging regions instructions to peers: {:?} for repartition table {}, group id {}",
peers, table_id, group_id
"Sent enter staging regions instructions, table_id: {}, group_id: {}, peers: {}, target_regions: {}",
table_id, group_id, peer_count, target_region_count
);
let format_err_msg = |idx: usize, error: &Error| {
@@ -242,11 +245,7 @@ impl EnterStagingRegion {
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received enter staging regions reply: {:?}, elapsed: {:?}",
reply,
now.elapsed()
);
let elapsed = now.elapsed();
let InstructionReply::EnterStagingRegions(EnterStagingRegionsReply { replies }) =
reply
else {
@@ -256,9 +255,22 @@ impl EnterStagingRegion {
}
.fail();
};
let total = replies.len();
let (mut ready, mut not_ready, mut with_error) = (0, 0, 0);
for reply in replies {
if reply.error.is_some() {
with_error += 1;
} else if reply.ready {
ready += 1;
} else {
not_ready += 1;
}
Self::handle_enter_staging_region_reply(&reply, &now, peer)?;
}
info!(
"Received enter staging regions reply, peer: {:?}, total_regions: {}, ready: {}, not_ready: {}, with_error: {}, elapsed: {:?}",
peer, total, ready, not_ready, with_error, elapsed
);
Ok(())
}
@@ -320,6 +332,61 @@ impl EnterStagingRegion {
Ok(())
}
async fn flush_pending_deallocate_regions(&self, ctx: &mut Context) -> Result<()> {
let pending_deallocate_region_ids = &ctx.persistent_ctx.pending_deallocate_region_ids;
if pending_deallocate_region_ids.is_empty() {
return Ok(());
}
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
let operation_timeout =
ctx.next_operation_timeout()
.context(error::ExceededDeadlineSnafu {
operation: "Flush pending deallocate regions",
})?;
let result = &ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
let source_routes = result
.source_routes
.iter()
.filter(|route| pending_deallocate_region_ids.contains(&route.region.id))
.cloned()
.collect::<Vec<_>>();
let peer_region_ids_map = group_region_routes_by_peer(&source_routes);
info!(
"Flushing pending deallocate regions, table_id: {}, group_id: {}, peer_region_ids_map: {:?}",
table_id, group_id, peer_region_ids_map
);
let now = Instant::now();
let tasks = peer_region_ids_map
.iter()
.map(|(peer, region_ids)| {
utils::flush_region(
&ctx.mailbox,
&ctx.server_addr,
region_ids,
peer,
operation_timeout,
ErrorStrategy::Retry,
)
})
.collect::<Vec<_>>();
try_join_all(tasks).await?;
info!(
"Flushed pending deallocate regions: {:?}, table_id: {}, group_id: {}, elapsed: {:?}",
source_routes
.iter()
.map(|route| route.region.id)
.collect::<Vec<_>>(),
table_id,
group_id,
now.elapsed()
);
Ok(())
}
}
#[cfg(test)]

View File

@@ -65,6 +65,13 @@ impl State for RemapManifest {
.await?;
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
let manifest_count = manifest_paths.len();
let input_region_count = ctx.persistent_ctx.sources.len();
let target_region_count = ctx.persistent_ctx.targets.len();
info!(
"Remap manifests finished for repartition, table_id: {}, group_id: {}, input_regions: {}, target_regions: {}, manifest_paths: {}",
table_id, group_id, input_region_count, target_region_count, manifest_count
);
if manifest_paths.len() != ctx.persistent_ctx.targets.len() {
warn!(
@@ -156,11 +163,7 @@ impl RemapManifest {
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received remap manifest reply: {:?}, elapsed: {:?}",
reply,
now.elapsed()
);
let elapsed = now.elapsed();
let InstructionReply::RemapManifest(reply) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
@@ -168,6 +171,11 @@ impl RemapManifest {
}
.fail();
};
let manifest_count = reply.manifest_paths.len();
info!(
"Received remap manifest reply for central_region: {}, manifest_paths: {}, elapsed: {:?}",
remap.region_id, manifest_count, elapsed
);
Self::handle_remap_manifest_reply(remap.region_id, reply, &now, peer)
}

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
@@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use crate::error::{self, Result};
use crate::procedure::repartition::group::sync_region::SyncRegion;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::{
Context, GroupId, GroupPrepareResult, State, region_routes,
@@ -56,7 +57,6 @@ impl RepartitionStart {
/// Ensures that both source and target regions are present in the region routes.
///
/// Both source and target regions must be present in the region routes (target regions should be allocated before repartitioning).
#[allow(dead_code)]
fn ensure_route_present(
group_id: GroupId,
region_routes: &[RegionRoute],
@@ -172,6 +172,28 @@ impl State for RepartitionStart {
ctx.persistent_ctx.targets.len()
);
if ctx.persistent_ctx.sync_region {
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
let allocated_region_ids: HashSet<_> = ctx
.persistent_ctx
.allocated_region_ids
.iter()
.copied()
.collect();
let region_routes: Vec<_> = prepare_result
.target_routes
.iter()
.filter(|route| allocated_region_ids.contains(&route.region.id))
.cloned()
.collect();
if !region_routes.is_empty() {
return Ok((
Box::new(SyncRegion { region_routes }),
Status::executing(true),
));
}
}
Ok((
Box::new(UpdateMetadata::ApplyStaging),
Status::executing(true),

View File

@@ -0,0 +1,445 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use std::time::{Duration, Instant};
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{Instruction, InstructionReply, SyncRegionReply, SyncRegionsReply};
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::region_engine::SyncRegionFromRequest;
use store_api::storage::RegionId;
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::utils::{
HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
};
use crate::procedure::repartition::group::{Context, State};
use crate::procedure::utils::ErrorStrategy;
use crate::service::mailbox::{Channel, MailboxRef};
const DEFAULT_SYNC_REGION_PARALLELISM: usize = 3;
/// The state of syncing regions for a repartition group.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncRegion {
pub region_routes: Vec<RegionRoute>,
}
#[async_trait::async_trait]
#[typetag::serde]
impl State for SyncRegion {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
Self::flush_central_region(ctx).await?;
self.sync_regions(ctx).await?;
Ok((
Box::new(UpdateMetadata::ApplyStaging),
Status::executing(true),
))
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl SyncRegion {
async fn flush_central_region(ctx: &mut Context) -> Result<()> {
let operation_timeout =
ctx.next_operation_timeout()
.context(error::ExceededDeadlineSnafu {
operation: "Flush central region",
})?;
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
crate::procedure::utils::flush_region(
&ctx.mailbox,
&ctx.server_addr,
&[prepare_result.central_region],
&prepare_result.central_region_datanode,
operation_timeout,
ErrorStrategy::Retry,
)
.await
}
/// Builds instructions to sync regions on datanodes.
fn build_sync_region_instructions(
central_region: RegionId,
region_routes: &[RegionRoute],
) -> HashMap<Peer, Vec<common_meta::instruction::SyncRegion>> {
let target_region_routes_by_peer = group_region_routes_by_peer(region_routes);
let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len());
for (peer, region_ids) in target_region_routes_by_peer {
let sync_regions = region_ids
.into_iter()
.map(|region_id| {
let request = SyncRegionFromRequest::FromRegion {
source_region_id: central_region,
parallelism: DEFAULT_SYNC_REGION_PARALLELISM,
};
common_meta::instruction::SyncRegion { region_id, request }
})
.collect();
instructions.insert((*peer).clone(), sync_regions);
}
instructions
}
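// Illustrative sketch, not part of the diff above: `group_region_routes_by_peer`
// is assumed to have roughly this shape (region ids keyed by leader peer); the
// real helper lives in group::utils and may differ.
fn group_region_routes_by_peer_sketch(routes: &[RegionRoute]) -> HashMap<&Peer, Vec<RegionId>> {
    let mut map: HashMap<&Peer, Vec<RegionId>> = HashMap::new();
    for route in routes {
        if let Some(peer) = route.leader_peer.as_ref() {
            map.entry(peer).or_default().push(route.region.id);
        }
    }
    map
}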
/// Syncs regions on datanodes.
async fn sync_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
let instructions = Self::build_sync_region_instructions(
prepare_result.central_region,
&self.region_routes,
);
let operation_timeout =
ctx.next_operation_timeout()
.context(error::ExceededDeadlineSnafu {
operation: "Sync regions",
})?;
let (peers, tasks): (Vec<_>, Vec<_>) = instructions
.iter()
.map(|(peer, sync_regions)| {
(
peer,
Self::sync_region(
&ctx.mailbox,
&ctx.server_addr,
peer,
sync_regions,
operation_timeout,
),
)
})
.unzip();
info!(
"Sent sync regions instructions to peers: {:?} for repartition table {}",
peers, table_id
);
let format_err_msg = |idx: usize, error: &Error| {
let peer = peers[idx];
format!(
"Failed to sync regions on datanode {:?}, error: {:?}",
peer, error
)
};
let results = join_all(tasks).await;
let result = handle_multiple_results(&results);
match result {
HandleMultipleResult::AllSuccessful => Ok(()),
HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu {
reason: format!(
"All retryable errors during syncing regions for repartition table {}: {:?}",
table_id,
retryable_errors
.iter()
.map(|(idx, error)| format_err_msg(*idx, error))
.collect::<Vec<_>>()
.join(",")
),
}
.fail(),
HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu {
violated: format!(
"All non retryable errors during syncing regions for repartition table {}: {:?}",
table_id,
non_retryable_errors
.iter()
.map(|(idx, error)| format_err_msg(*idx, error))
.collect::<Vec<_>>()
.join(",")
),
}
.fail(),
HandleMultipleResult::PartialRetryable {
retryable_errors,
non_retryable_errors,
} => error::UnexpectedSnafu {
violated: format!(
"Partial retryable errors during syncing regions for repartition table {}: {:?}, non retryable errors: {:?}",
table_id,
retryable_errors
.iter()
.map(|(idx, error)| format_err_msg(*idx, error))
.collect::<Vec<_>>()
.join(","),
non_retryable_errors
.iter()
.map(|(idx, error)| format_err_msg(*idx, error))
.collect::<Vec<_>>()
.join(","),
),
}
.fail(),
}
}
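// Illustrative sketch, not part of the diff above: the classification enum
// matched here, inferred from these match arms; the payload types are an
// assumption and the real definition lives in group::utils.
pub enum HandleMultipleResult {
    AllSuccessful,
    AllRetryable(Vec<(usize, Error)>),
    AllNonRetryable(Vec<(usize, Error)>),
    PartialRetryable {
        retryable_errors: Vec<(usize, Error)>,
        non_retryable_errors: Vec<(usize, Error)>,
    },
}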
/// Syncs regions on a datanode.
async fn sync_region(
mailbox: &MailboxRef,
server_addr: &str,
peer: &Peer,
sync_regions: &[common_meta::instruction::SyncRegion],
timeout: Duration,
) -> Result<()> {
let ch = Channel::Datanode(peer.id);
let instruction = Instruction::SyncRegions(sync_regions.to_vec());
let message = MailboxMessage::json_message(
&format!(
"Sync regions: {:?}",
sync_regions.iter().map(|r| r.region_id).collect::<Vec<_>>()
),
&format!("Metasrv@{}", server_addr),
&format!("Datanode-{}@{}", peer.id, peer.addr),
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: instruction.to_string(),
})?;
let now = std::time::Instant::now();
let receiver = mailbox.send(&ch, message, timeout).await;
let receiver = match receiver {
Ok(receiver) => receiver,
Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
reason: format!(
"Pusher not found for sync regions on datanode {:?}, elapsed: {:?}",
peer,
now.elapsed()
),
}
.fail()?,
Err(err) => {
return Err(err);
}
};
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received sync regions reply: {:?}, elapsed: {:?}",
reply,
now.elapsed()
);
let InstructionReply::SyncRegions(SyncRegionsReply { replies }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect sync regions reply",
}
.fail();
};
for reply in replies {
Self::handle_sync_region_reply(&reply, &now, peer)?;
}
Ok(())
}
Err(error::Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for sync regions on datanode {:?}, elapsed: {:?}",
peer,
now.elapsed()
);
error::RetryLaterSnafu { reason }.fail()
}
Err(err) => Err(err),
}
}
fn handle_sync_region_reply(
SyncRegionReply {
region_id,
ready,
exists,
error,
}: &SyncRegionReply,
now: &Instant,
peer: &Peer,
) -> Result<()> {
ensure!(
exists,
error::UnexpectedSnafu {
violated: format!(
"Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
region_id,
peer,
now.elapsed()
)
}
);
if let Some(error) = error {
return error::RetryLaterSnafu {
reason: format!(
"Failed to sync region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
region_id,
peer,
error,
now.elapsed()
),
}
.fail();
}
ensure!(
ready,
error::RetryLaterSnafu {
reason: format!(
"Region {} failed to sync on datanode {:?}, elapsed: {:?}",
region_id,
peer,
now.elapsed()
),
}
);
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use store_api::region_engine::SyncRegionFromRequest;
use store_api::storage::RegionId;
use crate::error::Error;
use crate::procedure::repartition::group::GroupPrepareResult;
use crate::procedure::repartition::group::sync_region::SyncRegion;
use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};
use crate::procedure::test_util::{new_sync_region_reply, send_mock_reply};
use crate::service::mailbox::Channel;
#[test]
fn test_build_sync_region_instructions() {
let table_id = 1024;
let central_region = RegionId::new(table_id, 1);
let region_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(table_id, 3),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let instructions =
SyncRegion::build_sync_region_instructions(central_region, &region_routes);
assert_eq!(instructions.len(), 1);
let peer_instructions = instructions.get(&Peer::empty(1)).unwrap();
assert_eq!(peer_instructions.len(), 1);
assert_eq!(peer_instructions[0].region_id, RegionId::new(table_id, 3));
let SyncRegionFromRequest::FromRegion {
source_region_id, ..
} = &peer_instructions[0].request
else {
panic!("expect from region request");
};
assert_eq!(*source_region_id, central_region);
}
fn test_prepare_result(table_id: u32) -> GroupPrepareResult {
GroupPrepareResult {
source_routes: vec![],
target_routes: vec![],
central_region: RegionId::new(table_id, 1),
central_region_datanode: Peer::empty(1),
}
}
#[tokio::test]
async fn test_sync_regions_all_successful() {
let mut env = TestingEnv::new();
let table_id = 1024;
let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
persistent_context.group_prepare_result = Some(test_prepare_result(table_id));
let (tx, rx) = tokio::sync::mpsc::channel(1);
env.mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
.await;
send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| {
Ok(new_sync_region_reply(
id,
RegionId::new(1024, 3),
true,
true,
None,
))
});
let mut ctx = env.create_context(persistent_context);
let region_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(table_id, 3),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let sync_region = SyncRegion { region_routes };
sync_region.sync_regions(&mut ctx).await.unwrap();
}
#[tokio::test]
async fn test_sync_regions_retryable() {
let env = TestingEnv::new();
let table_id = 1024;
let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
persistent_context.group_prepare_result = Some(test_prepare_result(table_id));
let mut ctx = env.create_context(persistent_context);
let region_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(table_id, 3),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let sync_region = SyncRegion { region_routes };
let err = sync_region.sync_regions(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::RetryLater { .. });
}
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
pub(crate) mod apply_staging_region;
pub(crate) mod exit_staging_region;
pub(crate) mod rollback_staging_region;
use std::any::Any;
@@ -28,11 +29,14 @@ use crate::procedure::repartition::group::repartition_end::RepartitionEnd;
use crate::procedure::repartition::group::{Context, State};
#[derive(Debug, Serialize, Deserialize)]
#[allow(clippy::enum_variant_names)]
pub enum UpdateMetadata {
/// Applies the new partition expressions for staging regions.
ApplyStaging,
/// Rolls back the new partition expressions for staging regions.
RollbackStaging,
/// Exits the staging regions.
ExitStaging,
}
#[async_trait::async_trait]
@@ -62,7 +66,18 @@ impl State for UpdateMetadata {
if let Err(err) = ctx.invalidate_table_cache().await {
warn!(
"Failed to broadcast the invalidate table cache message during the rollback staging regions, error: {err:?}"
err;
"Failed to broadcast the invalidate table cache message during the rollback staging regions"
);
};
Ok((Box::new(RepartitionEnd), Status::executing(false)))
}
UpdateMetadata::ExitStaging => {
self.exit_staging_regions(ctx).await?;
if let Err(err) = ctx.invalidate_table_cache().await {
warn!(
err;
"Failed to broadcast the invalidate table cache message during the exit staging regions"
);
};
Ok((Box::new(RepartitionEnd), Status::executing(false)))

View File

@@ -16,7 +16,7 @@ use std::collections::HashMap;
use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_telemetry::error;
use common_telemetry::{error, info};
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
@@ -77,7 +77,6 @@ impl UpdateMetadata {
/// - Source region not found.
/// - Failed to update the table route.
/// - Central region datanode table value not found.
#[allow(dead_code)]
pub(crate) async fn apply_staging_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
@@ -90,6 +89,13 @@ impl UpdateMetadata {
region_routes,
)?;
let source_count = ctx.persistent_ctx.sources.len();
let target_count = ctx.persistent_ctx.targets.len();
info!(
"Apply staging regions for repartition, table_id: {}, group_id: {}, sources: {}, targets: {}",
table_id, group_id, source_count, target_count
);
if let Err(err) = ctx
.update_table_route(&current_table_route_value, new_region_routes)
.await

View File

@@ -0,0 +1,104 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_telemetry::{error, info};
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::{Context, GroupId, region_routes};
use crate::procedure::repartition::plan::RegionDescriptor;
impl UpdateMetadata {
fn exit_staging_region_routes(
group_id: GroupId,
sources: &[RegionDescriptor],
targets: &[RegionDescriptor],
current_region_routes: &[RegionRoute],
) -> Result<Vec<RegionRoute>> {
let mut region_routes = current_region_routes.to_vec();
let mut region_routes_map = region_routes
.iter_mut()
.map(|route| (route.region.id, route))
.collect::<HashMap<_, _>>();
for target in targets {
let region_route = region_routes_map.get_mut(&target.region_id).context(
error::RepartitionTargetRegionMissingSnafu {
group_id,
region_id: target.region_id,
},
)?;
region_route.clear_leader_staging();
}
for source in sources {
let region_route = region_routes_map.get_mut(&source.region_id).context(
error::RepartitionSourceRegionMissingSnafu {
group_id,
region_id: source.region_id,
},
)?;
region_route.clear_leader_staging();
}
Ok(region_routes)
}
/// Exits the staging regions.
///
/// Abort:
/// - Table route is not physical.
/// - Target region not found.
/// - Source region not found.
/// - Failed to update the table route.
/// - Central region datanode table value not found.
pub(crate) async fn exit_staging_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
let current_table_route_value = ctx.get_table_route_value().await?;
let region_routes = region_routes(table_id, current_table_route_value.get_inner_ref())?;
let new_region_routes = Self::exit_staging_region_routes(
group_id,
&ctx.persistent_ctx.sources,
&ctx.persistent_ctx.targets,
region_routes,
)?;
let source_count = ctx.persistent_ctx.sources.len();
let target_count = ctx.persistent_ctx.targets.len();
info!(
"Exit staging regions for repartition, table_id: {}, group_id: {}, sources: {}, targets: {}",
table_id, group_id, source_count, target_count
);
if let Err(err) = ctx
.update_table_route(&current_table_route_value, new_region_routes)
.await
{
error!(err; "Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}");
return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}"
),
});
};
Ok(())
}
}

View File
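A minimal self-contained sketch of the pattern used by `exit_staging_region_routes` above: index the current routes by region id, then clear the leader-staging flag for every target and source. Toy types stand in for the crate's `RegionRoute`/`RegionDescriptor`; this is illustrative only, not the crate's API.

// Sketch: clear a staging flag on a set of routes, failing on a missing region id.
use std::collections::HashMap;

#[derive(Debug)]
struct Route {
    region_id: u64,
    leader_staging: bool,
}

fn clear_staging(routes: &mut [Route], sources: &[u64], targets: &[u64]) -> Result<(), u64> {
    // Borrow each route mutably through a by-id map so flags can be flipped in place.
    let mut by_id: HashMap<u64, &mut Route> = routes
        .iter_mut()
        .map(|route| (route.region_id, route))
        .collect();
    for region_id in targets.iter().chain(sources) {
        // A missing source or target aborts the whole update, mirroring the Snafu errors above.
        let route = by_id.get_mut(region_id).ok_or(*region_id)?;
        route.leader_staging = false;
    }
    Ok(())
}

fn main() {
    let mut routes = vec![
        Route { region_id: 1, leader_staging: true },
        Route { region_id: 2, leader_staging: true },
    ];
    clear_staging(&mut routes, &[1], &[2]).unwrap();
    assert!(routes.iter().all(|route| !route.leader_staging));
}

Holding mutable references in the map avoids a second pass over the route list when writing the cleared flags back.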

@@ -16,7 +16,7 @@ use std::collections::HashMap;
use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_telemetry::error;
use common_telemetry::{error, info};
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
@@ -29,7 +29,6 @@ impl UpdateMetadata {
/// Abort:
/// - Source region not found.
/// - Target region not found.
#[allow(dead_code)]
fn rollback_staging_region_routes(
group_id: GroupId,
source_routes: &[RegionRoute],
@@ -74,7 +73,6 @@ impl UpdateMetadata {
/// - Target region not found.
/// - Failed to update the table route.
/// - Central region datanode table value not found.
#[allow(dead_code)]
pub(crate) async fn rollback_staging_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
@@ -89,6 +87,13 @@ impl UpdateMetadata {
region_routes,
)?;
let source_count = prepare_result.source_routes.len();
let target_count = prepare_result.target_routes.len();
info!(
"Rollback staging regions for repartition, table_id: {}, group_id: {}, sources: {}, targets: {}",
table_id, group_id, source_count, target_count
);
if let Err(err) = ctx
.update_table_route(&current_table_route_value, new_region_routes)
.await

View File

@@ -16,6 +16,7 @@ use std::any::Any;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::debug;
use partition::expr::PartitionExpr;
use partition::subtask::{self, RepartitionSubtask};
use serde::{Deserialize, Serialize};
@@ -69,6 +70,17 @@ impl State for RepartitionStart {
);
let plans = Self::build_plan(&table_route, &self.from_exprs, &self.to_exprs)?;
let plan_count = plans.len();
let total_source_regions: usize = plans.iter().map(|p| p.source_regions.len()).sum();
let total_target_regions: usize =
plans.iter().map(|p| p.target_partition_exprs.len()).sum();
common_telemetry::info!(
"Repartition start, table_id: {}, plans: {}, total_source_regions: {}, total_target_regions: {}",
table_id,
plan_count,
total_source_regions,
total_target_regions
);
if plans.is_empty() {
return Ok((Box::new(RepartitionEnd), Status::done()));
@@ -86,7 +98,6 @@ impl State for RepartitionStart {
}
impl RepartitionStart {
#[allow(dead_code)]
fn build_plan(
physical_route: &PhysicalTableRouteValue,
from_exprs: &[PartitionExpr],
@@ -106,7 +117,6 @@ impl RepartitionStart {
))
}
#[allow(dead_code)]
fn build_plan_entries(
subtasks: Vec<RepartitionSubtask>,
source_index: &[RegionDescriptor],
@@ -159,8 +169,9 @@ impl RepartitionStart {
.find_map(|(region_id, existing_expr)| {
(existing_expr == &expr_json).then_some(*region_id)
})
.with_context(|| error::RepartitionSourceExprMismatchSnafu {
expr: expr_json,
.with_context(|| error::RepartitionSourceExprMismatchSnafu { expr: &expr_json })
.inspect_err(|_| {
debug!("Failed to find matching region for partition expression: {}, existing regions: {:?}", expr_json, existing_regions);
})?;
Ok(RegionDescriptor {

View File

@@ -96,5 +96,8 @@ pub fn new_persistent_context(
region_mapping: HashMap::new(),
group_prepare_result: None,
staging_manifest_paths: HashMap::new(),
sync_region: false,
allocated_region_ids: vec![],
pending_deallocate_region_ids: vec![],
}
}

View File

@@ -18,7 +18,8 @@ use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage};
use common_meta::instruction::{
DowngradeRegionReply, DowngradeRegionsReply, EnterStagingRegionReply, EnterStagingRegionsReply,
FlushRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply, UpgradeRegionsReply,
FlushRegionReply, InstructionReply, SimpleReply, SyncRegionReply, SyncRegionsReply,
UpgradeRegionReply, UpgradeRegionsReply,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_route::TableRouteValue;
@@ -253,6 +254,34 @@ pub fn new_enter_staging_region_reply(
}
}
/// Generates an [InstructionReply::SyncRegions] reply.
pub fn new_sync_region_reply(
id: u64,
region_id: RegionId,
ready: bool,
exists: bool,
error: Option<String>,
) -> MailboxMessage {
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::SyncRegions(SyncRegionsReply::new(vec![
SyncRegionReply {
region_id,
ready,
exists,
error,
},
])))
.unwrap(),
)),
}
}
/// Mock the test data for WAL pruning.
pub async fn new_wal_prune_metadata(
table_metadata_manager: TableMetadataManagerRef,

View File
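`new_sync_region_reply` above carries the reply as a JSON string inside the mailbox payload. A minimal sketch of that serialize/parse round trip, assuming only `serde` (with the derive feature) and `serde_json`; the struct below is a toy stand-in for `SyncRegionReply`, not the crate's type.

use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct SyncReply {
    region_id: u64,
    ready: bool,
    exists: bool,
    error: Option<String>,
}

fn main() {
    let reply = SyncReply { region_id: 42, ready: true, exists: true, error: None };
    // The mailbox message stores the reply as a JSON string payload...
    let payload = serde_json::to_string(&reply).unwrap();
    // ...and the receiving side parses it back before inspecting the result.
    let parsed: SyncReply = serde_json::from_str(&payload).unwrap();
    assert_eq!(parsed, reply);
}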

@@ -12,6 +12,185 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply};
use common_meta::peer::Peer;
use common_telemetry::{info, warn};
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::time::Instant;
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::{Channel, MailboxRef};
pub(crate) enum ErrorStrategy {
Ignore,
Retry,
}
fn handle_flush_region_reply(
reply: &InstructionReply,
region_ids: &[RegionId],
msg: &MailboxMessage,
) -> Result<(bool, Option<String>)> {
let result = match reply {
InstructionReply::FlushRegions(flush_reply) => {
if flush_reply.results.len() != region_ids.len() {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: format!(
"expect {} region flush result, but got {}",
region_ids.len(),
flush_reply.results.len()
),
}
.fail();
}
match flush_reply.overall_success {
true => (true, None),
false => (
false,
Some(
flush_reply
.results
.iter()
.filter_map(|(region_id, result)| match result {
Ok(_) => None,
Err(e) => Some(format!("{}: {:?}", region_id, e)),
})
.collect::<Vec<String>>()
.join("; "),
),
),
}
}
_ => {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect flush region reply",
}
.fail();
}
};
Ok(result)
}
/// Flushes the regions on the datanode.
///
/// Retry or Ignore:
/// - [PusherNotFound](error::Error::PusherNotFound): the datanode is unreachable.
/// - Failed to flush regions on the datanode.
///
/// Abort:
/// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
/// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
/// - [ExceededDeadline](error::Error::ExceededDeadline).
/// - Invalid JSON.
pub(crate) async fn flush_region(
mailbox: &MailboxRef,
server_addr: &str,
region_ids: &[RegionId],
datanode: &Peer,
timeout: Duration,
error_strategy: ErrorStrategy,
) -> Result<()> {
let flush_instruction = Instruction::FlushRegions(FlushRegions::sync_batch(
region_ids.to_vec(),
FlushErrorStrategy::TryAll,
));
let msg = MailboxMessage::json_message(
&format!("Flush regions: {:?}", region_ids),
&format!("Metasrv@{}", server_addr),
&format!("Datanode-{}@{}", datanode.id, datanode.addr),
common_time::util::current_time_millis(),
&flush_instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: flush_instruction.to_string(),
})?;
let ch = Channel::Datanode(datanode.id);
let now = Instant::now();
let receiver = mailbox.send(&ch, msg, timeout).await;
let receiver = match receiver {
Ok(receiver) => receiver,
Err(error::Error::PusherNotFound { .. }) => match error_strategy {
ErrorStrategy::Ignore => {
warn!(
"Failed to flush regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
region_ids, datanode
);
return Ok(());
}
ErrorStrategy::Retry => error::RetryLaterSnafu {
reason: format!(
"Pusher not found for flush regions on datanode {:?}, elapsed: {:?}",
datanode,
now.elapsed()
),
}
.fail()?,
},
Err(err) => {
return Err(err);
}
};
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received flush region reply: {:?}, regions: {:?}, elapsed: {:?}",
reply,
region_ids,
now.elapsed()
);
let (result, error) = handle_flush_region_reply(&reply, region_ids, &msg)?;
if let Some(error) = error {
match error_strategy {
ErrorStrategy::Ignore => {
warn!(
"Failed to flush regions {:?}, the datanode({}) error is ignored: {}",
region_ids, datanode, error
);
}
ErrorStrategy::Retry => {
return error::RetryLaterSnafu {
reason: format!(
"Failed to flush regions {:?}, the datanode({}) error is retried: {}",
region_ids,
datanode,
error,
),
}
.fail()?;
}
}
} else if result {
info!(
"The flush regions {:?} on datanode {:?} is successful, elapsed: {:?}",
region_ids,
datanode,
now.elapsed()
);
}
Ok(())
}
Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
operation: "Flush regions",
}
.fail(),
Err(err) => Err(err),
}
}
#[cfg(any(test, feature = "mock"))]
pub mod mock {
use std::io::Error;

View File
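The `ErrorStrategy` handling in `flush_region` above reduces to three outcomes: success, warn-and-continue for `Ignore`, or a retry-later error for `Retry`. A minimal stand-alone sketch of that decision, with toy types in place of the crate's error machinery (illustrative only):

#[derive(Debug)]
enum ErrorStrategy {
    Ignore,
    Retry,
}

#[derive(Debug)]
enum FlushOutcome {
    Done,
    Skipped(String),
    RetryLater(String),
}

fn resolve_flush_error(error: Option<String>, strategy: ErrorStrategy) -> FlushOutcome {
    match (error, strategy) {
        // Success path: nothing to resolve.
        (None, _) => FlushOutcome::Done,
        // Best-effort flush: log the failure and keep going.
        (Some(reason), ErrorStrategy::Ignore) => FlushOutcome::Skipped(reason),
        // Flush that must succeed before the next step: surface a retryable error.
        (Some(reason), ErrorStrategy::Retry) => FlushOutcome::RetryLater(reason),
    }
}

fn main() {
    println!("{:?}", resolve_flush_error(None, ErrorStrategy::Retry));
    println!(
        "{:?}",
        resolve_flush_error(Some("region busy".into()), ErrorStrategy::Ignore)
    );
}

Which path a caller picks depends on whether the flush is best-effort or a prerequisite for the step that follows it.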

@@ -14,7 +14,7 @@
//! Drop a metric region
use common_telemetry::info;
use common_telemetry::{debug, info};
use snafu::ResultExt;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionDropRequest, RegionRequest};
@@ -46,6 +46,15 @@ impl MetricEngineInner {
.physical_region_states()
.get(&data_region_id)
{
debug!(
"Physical region {} is busy, there are still some logical regions: {:?}",
data_region_id,
state
.logical_regions()
.iter()
.map(|id| id.to_string())
.collect::<Vec<_>>()
);
(true, !state.logical_regions().is_empty())
} else {
// the second argument is not used, just pass in a dummy value

View File

@@ -35,7 +35,7 @@ use index::result_cache::IndexResultCache;
use moka::notification::RemovalCause;
use moka::sync::Cache;
use object_store::ObjectStore;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
@@ -85,13 +85,13 @@ impl CacheStrategy {
&self,
file_id: RegionFileId,
metrics: &mut MetadataCacheMetrics,
page_index_policy: PageIndexPolicy,
) -> Option<Arc<ParquetMetaData>> {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.get_parquet_meta_data(file_id, metrics).await
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager.get_parquet_meta_data(file_id, metrics).await
CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
cache_manager
.get_parquet_meta_data(file_id, metrics, page_index_policy)
.await
}
CacheStrategy::Disabled => {
metrics.cache_miss += 1;
@@ -340,6 +340,7 @@ impl CacheManager {
&self,
file_id: RegionFileId,
metrics: &mut MetadataCacheMetrics,
page_index_policy: PageIndexPolicy,
) -> Option<Arc<ParquetMetaData>> {
// Try to get metadata from sst meta cache
if let Some(metadata) = self.get_parquet_meta_data_from_mem_cache(file_id) {
@@ -352,7 +353,7 @@ impl CacheManager {
if let Some(write_cache) = &self.write_cache
&& let Some(metadata) = write_cache
.file_cache()
.get_parquet_meta_data(key, metrics)
.get_parquet_meta_data(key, metrics, page_index_policy)
.await
{
metrics.file_cache_hit += 1;
@@ -893,7 +894,7 @@ mod tests {
cache.put_parquet_meta_data(file_id, metadata);
assert!(
cache
.get_parquet_meta_data(file_id, &mut metrics)
.get_parquet_meta_data(file_id, &mut metrics, Default::default())
.await
.is_none()
);
@@ -923,7 +924,7 @@ mod tests {
let file_id = RegionFileId::new(region_id, FileId::random());
assert!(
cache
.get_parquet_meta_data(file_id, &mut metrics)
.get_parquet_meta_data(file_id, &mut metrics, Default::default())
.await
.is_none()
);
@@ -931,14 +932,14 @@ mod tests {
cache.put_parquet_meta_data(file_id, metadata);
assert!(
cache
.get_parquet_meta_data(file_id, &mut metrics)
.get_parquet_meta_data(file_id, &mut metrics, Default::default())
.await
.is_some()
);
cache.remove_parquet_meta_data(file_id);
assert!(
cache
.get_parquet_meta_data(file_id, &mut metrics)
.get_parquet_meta_data(file_id, &mut metrics, Default::default())
.await
.is_none()
);

View File

@@ -16,11 +16,13 @@
use std::mem;
use parquet::basic::ColumnOrder;
use parquet::file::metadata::{
FileMetaData, ParquetColumnIndex, ParquetMetaData, ParquetOffsetIndex, RowGroupMetaData,
FileMetaData, KeyValue, ParquetColumnIndex, ParquetMetaData, ParquetOffsetIndex,
RowGroupMetaData,
};
use parquet::file::page_index::index::Index;
use parquet::format::{ColumnOrder, KeyValue, PageLocation};
use parquet::file::page_index::column_index::ColumnIndexMetaData as Index;
use parquet::file::page_index::offset_index::PageLocation;
use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
/// Returns estimated size of [ParquetMetaData].

View File

@@ -28,7 +28,7 @@ use moka::notification::RemovalCause;
use moka::policy::EvictionPolicy;
use object_store::util::join_path;
use object_store::{ErrorKind, ObjectStore, Reader};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
use snafu::ResultExt;
use store_api::storage::{FileId, RegionId};
use tokio::sync::mpsc::{Sender, UnboundedReceiver};
@@ -571,6 +571,7 @@ impl FileCache {
&self,
key: IndexKey,
cache_metrics: &mut MetadataCacheMetrics,
page_index_policy: PageIndexPolicy,
) -> Option<ParquetMetaData> {
// Check if file cache contains the key
if let Some(index_value) = self.inner.parquet_index.get(&key).await {
@@ -578,7 +579,8 @@ impl FileCache {
let local_store = self.local_store();
let file_path = self.inner.cache_file_path(key);
let file_size = index_value.file_size as u64;
let metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
let mut metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
metadata_loader.with_page_index_policy(page_index_policy);
match metadata_loader.load(cache_metrics).await {
Ok(metadata) => {

View File

@@ -24,6 +24,7 @@ use object_store::services::Fs;
use parquet::arrow::ArrowWriter;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::statistics::Statistics;
/// Returns a parquet meta data.
pub(crate) fn parquet_meta() -> Arc<ParquetMetaData> {
@@ -49,3 +50,60 @@ pub(crate) fn new_fs_store(path: &str) -> ObjectStore {
let builder = Fs::default();
ObjectStore::new(builder.root(path)).unwrap().finish()
}
pub(crate) fn assert_parquet_metadata_equal(x: Arc<ParquetMetaData>, y: Arc<ParquetMetaData>) {
// Normalize the statistics in parquet metadata because the flag "min_max_backwards_compatible"
// is not persisted across parquet metadata writer and reader.
fn normalize_statistics(metadata: ParquetMetaData) -> ParquetMetaData {
let unset_min_max_backwards_compatible_flag = |stats: Statistics| -> Statistics {
match stats {
Statistics::Boolean(stats) => {
Statistics::Boolean(stats.with_backwards_compatible_min_max(false))
}
Statistics::Int32(stats) => {
Statistics::Int32(stats.with_backwards_compatible_min_max(false))
}
Statistics::Int64(stats) => {
Statistics::Int64(stats.with_backwards_compatible_min_max(false))
}
Statistics::Int96(stats) => {
Statistics::Int96(stats.with_backwards_compatible_min_max(false))
}
Statistics::Float(stats) => {
Statistics::Float(stats.with_backwards_compatible_min_max(false))
}
Statistics::Double(stats) => {
Statistics::Double(stats.with_backwards_compatible_min_max(false))
}
Statistics::ByteArray(stats) => {
Statistics::ByteArray(stats.with_backwards_compatible_min_max(false))
}
Statistics::FixedLenByteArray(stats) => {
Statistics::FixedLenByteArray(stats.with_backwards_compatible_min_max(false))
}
}
};
let mut metadata_builder = metadata.into_builder();
for rg in metadata_builder.take_row_groups() {
let mut rg_builder = rg.into_builder();
for col in rg_builder.take_columns() {
let stats = col
.statistics()
.cloned()
.map(unset_min_max_backwards_compatible_flag);
let mut col_builder = col.into_builder().clear_statistics();
if let Some(stats) = stats {
col_builder = col_builder.set_statistics(stats);
}
rg_builder = rg_builder.add_column_metadata(col_builder.build().unwrap());
}
metadata_builder = metadata_builder.add_row_group(rg_builder.build().unwrap());
}
metadata_builder.build()
}
let x = normalize_statistics(Arc::unwrap_or_clone(x));
let y = normalize_statistics(Arc::unwrap_or_clone(y));
assert_eq!(x, y);
}

View File

@@ -470,11 +470,12 @@ impl UploadTracker {
mod tests {
use common_test_util::temp_dir::create_temp_dir;
use object_store::ATOMIC_WRITE_DIR;
use parquet::file::metadata::PageIndexPolicy;
use store_api::region_request::PathType;
use super::*;
use crate::access_layer::OperationType;
use crate::cache::test_util::new_fs_store;
use crate::cache::test_util::{assert_parquet_metadata_equal, new_fs_store};
use crate::cache::{CacheManager, CacheStrategy};
use crate::error::InvalidBatchSnafu;
use crate::read::Source;
@@ -482,8 +483,7 @@ mod tests {
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::test_util::TestEnv;
use crate::test_util::sst_util::{
assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
sst_region_metadata,
new_batch_by_range, new_source, sst_file_handle_with_file_id, sst_region_metadata,
};
#[tokio::test]
@@ -652,11 +652,12 @@ mod tests {
handle.clone(),
mock_store.clone(),
)
.cache(CacheStrategy::EnableAll(cache_manager.clone()));
.cache(CacheStrategy::EnableAll(cache_manager.clone()))
.page_index_policy(PageIndexPolicy::Optional);
let reader = builder.build().await.unwrap();
// Check parquet metadata
assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
assert_parquet_metadata_equal(write_parquet_metadata, reader.parquet_metadata());
}
#[tokio::test]

View File

@@ -601,14 +601,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid file metadata"))]
ConvertMetaData {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: parquet::errors::ParquetError,
},
#[snafu(display("Column not found, column: {column}"))]
ColumnNotFound {
column: String,
@@ -1284,7 +1276,6 @@ impl ErrorExt for Error {
| Join { .. }
| WorkerStopped { .. }
| Recv { .. }
| ConvertMetaData { .. }
| DecodeWal { .. }
| ComputeArrow { .. }
| BiErrors { .. }

View File

@@ -71,7 +71,6 @@ use crate::sst::index::IndexOutput;
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
use crate::sst::{SeriesEstimator, to_sst_arrow_schema};
@@ -1197,7 +1196,7 @@ impl BulkPartEncoder {
metrics.num_rows += total_rows;
let buf = Bytes::from(buf);
let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
let parquet_metadata = Arc::new(file_metadata);
let num_series = series_estimator.finish();
Ok(Some(EncodedBulkPart {
@@ -1232,7 +1231,7 @@ impl BulkPartEncoder {
};
let buf = Bytes::from(buf);
let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
let parquet_metadata = Arc::new(file_metadata);
Ok(Some(EncodedBulkPart {
data: buf,

View File

@@ -314,11 +314,8 @@ impl MitoRegion {
/// Sets the dropping state.
/// You should call this method in the worker loop.
pub(crate) fn set_dropping(&self) -> Result<()> {
self.compare_exchange_state(
RegionLeaderState::Writable,
RegionRoleState::Leader(RegionLeaderState::Dropping),
)
pub(crate) fn set_dropping(&self, expect: RegionLeaderState) -> Result<()> {
self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Dropping))
}
/// Sets the truncating state.

View File
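`set_dropping` above now takes the expected leader state instead of hard-coding `Writable`. A minimal sketch of that compare-and-set idea with toy enums (illustrative only; the real call site, which chooses between `Staging` and `Writable`, appears in the drop-handler diff further below):

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LeaderState {
    Writable,
    Staging,
    Dropping,
}

fn set_dropping(current: &mut LeaderState, expect: LeaderState) -> Result<(), String> {
    // Only transition when the region is in the state the caller expects.
    if *current != expect {
        return Err(format!("expected {:?}, found {:?}", expect, current));
    }
    *current = LeaderState::Dropping;
    Ok(())
}

fn main() {
    let mut state = LeaderState::Staging;
    // A staging region must be validated against Staging, not Writable.
    let expect = if state == LeaderState::Staging {
        LeaderState::Staging
    } else {
        LeaderState::Writable
    };
    set_dropping(&mut state, expect).unwrap();
    assert_eq!(state, LeaderState::Dropping);
}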

@@ -1174,9 +1174,8 @@ pub(crate) fn decode_primary_keys_with_counts(
let mut result: Vec<(CompositeValues, usize)> = Vec::new();
let mut prev_key: Option<u32> = None;
for i in 0..keys.len() {
let current_key = keys.value(i);
let pk_indices = keys.values();
for &current_key in pk_indices.iter().take(keys.len()) {
// Checks if current key is the same as previous key
if let Some(prev) = prev_key
&& prev == current_key

View File

@@ -115,7 +115,7 @@ mod tests {
use object_store::ObjectStore;
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::metadata::{KeyValue, PageIndexPolicy};
use parquet::file::properties::WriterProperties;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
@@ -126,6 +126,7 @@ mod tests {
use super::*;
use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
use crate::cache::test_util::assert_parquet_metadata_equal;
use crate::cache::{CacheManager, CacheStrategy, PageKey};
use crate::config::IndexConfig;
use crate::read::{BatchBuilder, BatchReader, FlatSource};
@@ -143,9 +144,9 @@ mod tests {
DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
};
use crate::test_util::sst_util::{
assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
new_batch_with_binary, new_batch_with_custom_sequence, new_primary_key, new_source,
new_sparse_primary_key, sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
build_test_binary_test_region_metadata, new_batch_by_range, new_batch_with_binary,
new_batch_with_custom_sequence, new_primary_key, new_source, new_sparse_primary_key,
sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
sst_region_metadata_with_encoding,
};
use crate::test_util::{TestEnv, check_reader_result};
@@ -377,11 +378,12 @@ mod tests {
PathType::Bare,
handle.clone(),
object_store,
);
)
.page_index_policy(PageIndexPolicy::Optional);
let reader = builder.build().await.unwrap();
let reader_metadata = reader.parquet_metadata();
assert_parquet_metadata_eq(writer_metadata, reader_metadata)
assert_parquet_metadata_equal(writer_metadata, reader_metadata);
}
#[tokio::test]

View File

@@ -563,9 +563,8 @@ pub(crate) fn decode_primary_keys(
// The parquet reader may read the whole dictionary page into the dictionary values, so
// we may decode many primary keys not in this batch if we decode the values array directly.
for i in 0..keys.len() {
let current_key = keys.value(i);
let pk_indices = keys.values();
for &current_key in pk_indices.iter().take(keys.len()) {
// Check if current key is the same as previous key
if let Some(prev) = prev_key
&& prev == current_key

View File

@@ -13,82 +13,11 @@
// limitations under the License.
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use bytes::Bytes;
use common_telemetry::trace;
use object_store::ObjectStore;
use parquet::basic::ColumnOrder;
use parquet::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
use parquet::format;
use parquet::schema::types::{SchemaDescriptor, from_thrift};
use snafu::ResultExt;
use crate::error;
use crate::error::Result;
// Refer to https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L74-L90
/// Convert [format::FileMetaData] to [ParquetMetaData]
pub fn parse_parquet_metadata(t_file_metadata: format::FileMetaData) -> Result<ParquetMetaData> {
let schema = from_thrift(&t_file_metadata.schema).context(error::ConvertMetaDataSnafu)?;
let schema_desc_ptr = Arc::new(SchemaDescriptor::new(schema));
let mut row_groups = Vec::with_capacity(t_file_metadata.row_groups.len());
for rg in t_file_metadata.row_groups {
row_groups.push(
RowGroupMetaData::from_thrift(schema_desc_ptr.clone(), rg)
.context(error::ConvertMetaDataSnafu)?,
);
}
let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_desc_ptr);
let file_metadata = FileMetaData::new(
t_file_metadata.version,
t_file_metadata.num_rows,
t_file_metadata.created_by,
t_file_metadata.key_value_metadata,
schema_desc_ptr,
column_orders,
);
// There may be a problem owing to lacking of column_index and offset_index,
// if we open page index in the future.
Ok(ParquetMetaData::new(file_metadata, row_groups))
}
// Port from https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L106-L137
/// Parses column orders from Thrift definition.
/// If no column orders are defined, returns `None`.
fn parse_column_orders(
t_column_orders: Option<Vec<format::ColumnOrder>>,
schema_descr: &SchemaDescriptor,
) -> Option<Vec<ColumnOrder>> {
match t_column_orders {
Some(orders) => {
// Should always be the case
assert_eq!(
orders.len(),
schema_descr.num_columns(),
"Column order length mismatch"
);
let mut res = Vec::with_capacity(schema_descr.num_columns());
for (i, column) in schema_descr.columns().iter().enumerate() {
match orders[i] {
format::ColumnOrder::TYPEORDER(_) => {
let sort_order = ColumnOrder::get_sort_order(
column.logical_type(),
column.converted_type(),
column.physical_type(),
);
res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
}
}
}
Some(res)
}
None => None,
}
}
const FETCH_PARALLELISM: usize = 8;
pub(crate) const MERGE_GAP: usize = 512 * 1024;

View File

@@ -21,7 +21,7 @@ use futures::future::BoxFuture;
use object_store::ObjectStore;
use parquet::arrow::async_reader::MetadataFetch;
use parquet::errors::{ParquetError, Result as ParquetResult};
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
use snafu::{IntoError as _, ResultExt};
use crate::error::{self, Result};
@@ -37,6 +37,7 @@ pub(crate) struct MetadataLoader<'a> {
file_path: &'a str,
// The size of parquet file
file_size: u64,
page_index_policy: PageIndexPolicy,
}
impl<'a> MetadataLoader<'a> {
@@ -50,9 +51,14 @@ impl<'a> MetadataLoader<'a> {
object_store,
file_path,
file_size,
page_index_policy: Default::default(),
}
}
pub(crate) fn with_page_index_policy(&mut self, page_index_policy: PageIndexPolicy) {
self.page_index_policy = page_index_policy;
}
/// Get the size of parquet file. If file_size is 0, stat the object store to get the size.
async fn get_file_size(&self) -> Result<u64> {
let file_size = match self.file_size {
@@ -70,8 +76,9 @@ impl<'a> MetadataLoader<'a> {
pub async fn load(&self, cache_metrics: &mut MetadataCacheMetrics) -> Result<ParquetMetaData> {
let path = self.file_path;
let file_size = self.get_file_size().await?;
let reader =
ParquetMetaDataReader::new().with_prefetch_hint(Some(DEFAULT_PREFETCH_SIZE as usize));
let reader = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(DEFAULT_PREFETCH_SIZE as usize))
.with_page_index_policy(self.page_index_policy);
let num_reads = AtomicUsize::new(0);
let bytes_read = AtomicU64::new(0);

View File

@@ -33,8 +33,7 @@ use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use parquet::file::metadata::{KeyValue, PageIndexPolicy, ParquetMetaData};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
@@ -142,6 +141,7 @@ pub struct ParquetReaderBuilder {
pre_filter_mode: PreFilterMode,
/// Whether to decode primary key values eagerly when reading primary key format SSTs.
decode_primary_key_values: bool,
page_index_policy: PageIndexPolicy,
}
impl ParquetReaderBuilder {
@@ -172,6 +172,7 @@ impl ParquetReaderBuilder {
compaction: false,
pre_filter_mode: PreFilterMode::All,
decode_primary_key_values: false,
page_index_policy: Default::default(),
}
}
@@ -276,6 +277,12 @@ impl ParquetReaderBuilder {
self
}
#[must_use]
pub fn page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
self.page_index_policy = page_index_policy;
self
}
/// Builds a [ParquetReader].
///
/// This needs to perform IO operation.
@@ -314,7 +321,12 @@ impl ParquetReaderBuilder {
// Loads parquet metadata of the file.
let (parquet_meta, cache_miss) = self
.read_parquet_metadata(&file_path, file_size, &mut metrics.metadata_cache_metrics)
.read_parquet_metadata(
&file_path,
file_size,
&mut metrics.metadata_cache_metrics,
self.page_index_policy,
)
.await?;
// Decodes region metadata.
let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
@@ -479,6 +491,7 @@ impl ParquetReaderBuilder {
file_path: &str,
file_size: u64,
cache_metrics: &mut MetadataCacheMetrics,
page_index_policy: PageIndexPolicy,
) -> Result<(Arc<ParquetMetaData>, bool)> {
let start = Instant::now();
let _t = READ_STAGE_ELAPSED
@@ -489,7 +502,7 @@ impl ParquetReaderBuilder {
// Tries to get from cache with metrics tracking.
if let Some(metadata) = self
.cache_strategy
.get_parquet_meta_data(file_id, cache_metrics)
.get_parquet_meta_data(file_id, cache_metrics, page_index_policy)
.await
{
cache_metrics.metadata_load_cost += start.elapsed();
@@ -497,7 +510,9 @@ impl ParquetReaderBuilder {
}
// Cache miss, load metadata directly.
let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
let mut metadata_loader =
MetadataLoader::new(self.object_store.clone(), file_path, file_size);
metadata_loader.with_page_index_policy(page_index_policy);
let metadata = metadata_loader.load(cache_metrics).await?;
let metadata = Arc::new(metadata);

View File

@@ -55,7 +55,6 @@ use crate::sst::file::RegionFileId;
use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions};
use crate::sst::{
DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator,
@@ -205,14 +204,12 @@ where
}
current_writer.flush().await.context(WriteParquetSnafu)?;
let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
let parquet_metadata = current_writer.close().await.context(WriteParquetSnafu)?;
let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
// Safety: num rows > 0 so we must have min/max.
let time_range = stats.time_range.unwrap();
// convert FileMetaData to ParquetMetaData
let parquet_metadata = parse_parquet_metadata(file_meta)?;
let max_row_group_uncompressed_size: u64 = parquet_metadata
.row_groups()
.iter()

View File

@@ -23,7 +23,6 @@ use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
use datatypes::value::ValueRef;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use parquet::file::metadata::ParquetMetaData;
use store_api::metadata::{
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
@@ -277,30 +276,6 @@ pub fn new_batch_with_binary(tags: &[&str], start: usize, end: usize) -> Batch {
builder.build().unwrap()
}
/// ParquetMetaData doesn't implement `PartialEq` trait, check internal fields manually
pub fn assert_parquet_metadata_eq(a: Arc<ParquetMetaData>, b: Arc<ParquetMetaData>) {
macro_rules! assert_metadata {
( $a:expr, $b:expr, $($method:ident,)+ ) => {
$(
assert_eq!($a.$method(), $b.$method());
)+
}
}
assert_metadata!(
a.file_metadata(),
b.file_metadata(),
version,
num_rows,
created_by,
key_value_metadata,
schema_descr,
column_orders,
);
assert_metadata!(a, b, row_groups, column_index, offset_index,);
}
/// Creates a new region metadata for testing SSTs with binary datatype.
///
/// Schema: tag_0(string), field_0(binary), ts

View File

@@ -31,7 +31,7 @@ impl<S> RegionWorkerLoop<S> {
let region_id = request.region_id;
let source_region_id = request.source_region_id;
let sender = request.sender;
let region = match self.regions.writable_region(region_id) {
let region = match self.regions.writable_non_staging_region(region_id) {
Ok(region) => region,
Err(e) => {
let _ = sender.send(Err(e));

View File

@@ -42,12 +42,18 @@ where
&mut self,
region_id: RegionId,
) -> Result<AffectedRows> {
let region = self.regions.writable_non_staging_region(region_id)?;
let region = self.regions.writable_region(region_id)?;
info!("Try to drop region: {}, worker: {}", region_id, self.id);
let is_staging = region.is_staging();
let expect_state = if is_staging {
RegionLeaderState::Staging
} else {
RegionLeaderState::Writable
};
// Marks the region as dropping.
region.set_dropping()?;
region.set_dropping(expect_state)?;
// Writes dropping marker
// We rarely drop a region so we still operate in the worker loop.
let region_dir = region.access_layer.build_region_dir(region_id);

View File

@@ -21,6 +21,7 @@ use std::num::NonZeroU64;
use std::sync::Arc;
use common_telemetry::{info, warn};
use parquet::file::metadata::PageIndexPolicy;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
@@ -523,7 +524,11 @@ async fn edit_region(
let mut cache_metrics = Default::default();
let _ = write_cache
.file_cache()
.get_parquet_meta_data(index_key, &mut cache_metrics)
.get_parquet_meta_data(
index_key,
&mut cache_metrics,
PageIndexPolicy::Optional,
)
.await;
listener.on_file_cache_filled(index_key.file_id);

View File

@@ -33,6 +33,7 @@ use common_telemetry::{debug, tracing};
use datafusion::datasource::physical_plan::{CsvSource, FileSource, JsonSource};
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
use datafusion_common::config::CsvOptions;
use datafusion_expr::Expr;
use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Schema, SchemaRef};
@@ -214,13 +215,15 @@ impl StatementExecutor {
.context(error::ProjectSchemaSnafu)?,
);
let csv_source = CsvSource::new(format.has_header, format.delimiter, b'"')
.with_schema(schema.clone())
let options = CsvOptions::default()
.with_has_header(format.has_header)
.with_delimiter(format.delimiter);
let csv_source = CsvSource::new(schema.clone())
.with_csv_options(options)
.with_batch_size(DEFAULT_BATCH_SIZE);
let stream = file_to_stream(
object_store,
path,
schema.clone(),
csv_source,
Some(projection),
format.compression_type,
@@ -247,13 +250,11 @@ impl StatementExecutor {
.context(error::ProjectSchemaSnafu)?,
);
let json_source = JsonSource::new()
.with_schema(schema.clone())
.with_batch_size(DEFAULT_BATCH_SIZE);
let json_source =
JsonSource::new(schema.clone()).with_batch_size(DEFAULT_BATCH_SIZE);
let stream = file_to_stream(
object_store,
path,
schema.clone(),
json_source,
Some(projection),
format.compression_type,

View File

@@ -34,7 +34,7 @@ use datafusion::physical_plan::{
RecordBatchStream, SendableRecordBatchStream,
};
use datafusion_common::DFSchema;
use datafusion_expr::EmptyRelation;
use datafusion_expr::{EmptyRelation, col};
use datatypes::arrow;
use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
use datatypes::arrow::datatypes::{DataType, Field, SchemaRef, TimeUnit};
@@ -107,7 +107,21 @@ impl UserDefinedLogicalNodeCore for Absent {
}
fn expressions(&self) -> Vec<Expr> {
vec![]
if self.unfix.is_some() {
return vec![];
}
vec![col(&self.time_index_column)]
}
fn necessary_children_exprs(&self, _output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
if self.unfix.is_some() {
return None;
}
let input_schema = self.input.schema();
let time_index_idx = input_schema.index_of_column_by_name(None, &self.time_index_column)?;
Some(vec![vec![time_index_idx]])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {

View File

@@ -40,6 +40,7 @@ use datafusion::physical_plan::{
Partitioning, PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream,
};
use datafusion::prelude::{Column, Expr};
use datafusion_expr::col;
use datatypes::prelude::{ConcreteDataType, DataType as GtDataType};
use datatypes::value::{OrderedF64, Value, ValueRef};
use datatypes::vectors::{Helper, MutableVector, VectorRef};
@@ -88,7 +89,45 @@ impl UserDefinedLogicalNodeCore for HistogramFold {
}
fn expressions(&self) -> Vec<Expr> {
vec![]
let mut exprs = vec![
col(&self.le_column),
col(&self.ts_column),
col(&self.field_column),
];
exprs.extend(self.input.schema().fields().iter().filter_map(|f| {
let name = f.name();
if name != &self.le_column && name != &self.ts_column && name != &self.field_column {
Some(col(name))
} else {
None
}
}));
exprs
}
fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
let input_schema = self.input.schema();
let le_column_index = input_schema.index_of_column_by_name(None, &self.le_column)?;
if output_columns.is_empty() {
let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
return Some(vec![indices]);
}
let mut necessary_indices = output_columns
.iter()
.map(|&output_column| {
if output_column < le_column_index {
output_column
} else {
output_column + 1
}
})
.collect::<Vec<_>>();
necessary_indices.push(le_column_index);
necessary_indices.sort_unstable();
necessary_indices.dedup();
Some(vec![necessary_indices])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -998,11 +1037,26 @@ mod test {
use datafusion::common::ToDFSchema;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::logical_expr::EmptyRelation;
use datafusion::prelude::SessionContext;
use datatypes::arrow_array::StringArray;
use futures::FutureExt;
use super::*;
fn project_batch(batch: &RecordBatch, indices: &[usize]) -> RecordBatch {
let fields = indices
.iter()
.map(|&idx| batch.schema().field(idx).clone())
.collect::<Vec<_>>();
let columns = indices
.iter()
.map(|&idx| batch.column(idx).clone())
.collect::<Vec<_>>();
let schema = Arc::new(Schema::new(fields));
RecordBatch::try_new(schema, columns).unwrap()
}
fn prepare_test_data() -> DataSourceExec {
let schema = Arc::new(Schema::new(vec![
Field::new("host", DataType::Utf8, true),
@@ -1190,6 +1244,100 @@ mod test {
assert_eq!(result_literal, expected);
}
#[tokio::test]
async fn pruning_should_keep_le_column_for_exec() {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
Field::new("le", DataType::Utf8, true),
Field::new("val", DataType::Float64, true),
]));
let df_schema = schema.clone().to_dfschema_ref().unwrap();
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema,
});
let plan = HistogramFold::new(
"le".to_string(),
"val".to_string(),
"ts".to_string(),
0.5,
input,
)
.unwrap();
let output_columns = [0usize, 1usize];
let required = plan.necessary_children_exprs(&output_columns).unwrap();
let required = &required[0];
assert_eq!(required.as_slice(), &[0, 1, 2]);
let input_batch = RecordBatch::try_new(
schema,
vec![
Arc::new(TimestampMillisecondArray::from(vec![0, 0])),
Arc::new(StringArray::from(vec!["0.1", "+Inf"])),
Arc::new(Float64Array::from(vec![1.0, 2.0])),
],
)
.unwrap();
let projected = project_batch(&input_batch, required);
let projected_schema = projected.schema();
let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![projected]], projected_schema, None).unwrap(),
)));
let fold_exec = plan.to_execution_plan(memory_exec);
let session_context = SessionContext::default();
let output_batches =
datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
.await
.unwrap();
assert_eq!(output_batches.len(), 1);
let output_batch = &output_batches[0];
assert_eq!(output_batch.num_rows(), 1);
let ts = output_batch
.column(0)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
assert_eq!(ts.values(), &[0i64]);
let values = output_batch
.column(1)
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();
assert!((values.value(0) - 0.1).abs() < 1e-12);
// Simulate the pre-fix pruning behavior: omit the `le` column from the child input.
let le_index = 1usize;
let broken_required = output_columns
.iter()
.map(|&output_column| {
if output_column < le_index {
output_column
} else {
output_column + 1
}
})
.collect::<Vec<_>>();
let broken = project_batch(&input_batch, &broken_required);
let broken_schema = broken.schema();
let broken_exec = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![broken]], broken_schema, None).unwrap(),
)));
let broken_fold_exec = plan.to_execution_plan(broken_exec);
let session_context = SessionContext::default();
let broken_result = std::panic::AssertUnwindSafe(async {
datafusion::physical_plan::collect(broken_fold_exec, session_context.task_ctx()).await
})
.catch_unwind()
.await;
assert!(broken_result.is_err());
}
#[test]
fn confirm_schema() {
let input_schema = Schema::new(vec![

View File
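`HistogramFold::necessary_children_exprs` above maps the parent's output-column indices (taken from the folded output schema, which no longer contains the `le` column) back to input-column indices, and always adds `le` itself. A minimal sketch of that mapping with a toy helper (not the plan node's API):

// Sketch: output indices at or after the removed `le` column shift by one,
// and `le` must always be read from the child.
fn required_input_columns(output_columns: &[usize], le_column_index: usize) -> Vec<usize> {
    let mut required: Vec<usize> = output_columns
        .iter()
        .map(|&column| if column < le_column_index { column } else { column + 1 })
        .collect();
    required.push(le_column_index);
    required.sort_unstable();
    required.dedup();
    required
}

fn main() {
    // Input schema: [ts, le, val]; output schema (le folded away): [ts, val].
    // A parent asking for output columns [0, 1] must still read [0, 1, 2].
    assert_eq!(required_input_columns(&[0, 1], 1), vec![0, 1, 2]);
}

This is the same expectation the `pruning_should_keep_le_column_for_exec` test above asserts against the real plan node.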

@@ -33,6 +33,7 @@ use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use datafusion_expr::col;
use datatypes::arrow::compute;
use datatypes::arrow::error::Result as ArrowResult;
use futures::{Stream, StreamExt, ready};
@@ -84,7 +85,37 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
}
fn expressions(&self) -> Vec<Expr> {
vec![]
if self.unfix.is_some() {
return vec![];
}
let mut exprs = vec![col(&self.time_index_column)];
if let Some(field) = &self.field_column {
exprs.push(col(field));
}
exprs
}
fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
if self.unfix.is_some() {
return None;
}
let input_schema = self.input.schema();
if output_columns.is_empty() {
let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
return Some(vec![indices]);
}
let mut required = output_columns.to_vec();
required.push(input_schema.index_of_column_by_name(None, &self.time_index_column)?);
if let Some(field) = &self.field_column {
required.push(input_schema.index_of_column_by_name(None, field)?);
}
required.sort_unstable();
required.dedup();
Some(vec![required])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@@ -440,8 +471,6 @@ impl InstantManipulateStream {
// refer to Go version: https://github.com/prometheus/prometheus/blob/e934d0f01158a1d55fa0ebb035346b195fcc1260/promql/engine.go#L1571
// and the function `vectorSelectorSingle`
pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
let mut take_indices = vec![];
let ts_column = input
.column(self.time_index)
.as_any()
@@ -473,6 +502,8 @@ impl InstantManipulateStream {
let aligned_start = self.start + (max_start - self.start) / self.interval * self.interval;
let aligned_end = self.end - (self.end - min_end) / self.interval * self.interval;
let mut take_indices = vec![];
let mut cursor = 0;
let aligned_ts_iter = (aligned_start..=aligned_end).step_by(self.interval as usize);
@@ -570,6 +601,8 @@ impl InstantManipulateStream {
#[cfg(test)]
mod test {
use datafusion::common::ToDFSchema;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
use datafusion::prelude::SessionContext;
use super::*;
@@ -611,6 +644,30 @@ mod test {
assert_eq!(result_literal, expected);
}
#[test]
fn pruning_should_keep_time_and_field_columns_for_exec() {
let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema,
});
let plan = InstantManipulate::new(
0,
0,
0,
0,
TIME_INDEX_COLUMN.to_string(),
Some("value".to_string()),
input,
);
// Simulate a parent projection requesting only the `path` column.
let output_columns = [2usize];
let required = plan.necessary_children_exprs(&output_columns).unwrap();
let required = &required[0];
assert_eq!(required.as_slice(), &[0, 1, 2]);
}
#[tokio::test]
async fn lookback_10s_interval_30s() {
let expected = String::from(

View File
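`InstantManipulate::necessary_children_exprs` above keeps whatever the parent asked for and appends the columns the node itself needs (the time index and, if set, the field column), then sorts and dedups. A minimal sketch of that pattern with plain index slices instead of schema lookups (illustrative only):

fn required_input_columns(output_columns: &[usize], always_needed: &[usize]) -> Vec<usize> {
    // Keep the parent's columns, add the node's own, and drop duplicates.
    let mut required = output_columns.to_vec();
    required.extend_from_slice(always_needed);
    required.sort_unstable();
    required.dedup();
    required
}

fn main() {
    // Input schema: [timestamp, value, path]; the parent only asks for `path`
    // (index 2), but the node still needs the time index (0) and field (1).
    assert_eq!(required_input_columns(&[2], &[0, 1]), vec![0, 1, 2]);
}

The `pruning_should_keep_time_and_field_columns_for_exec` test above checks the same result against the real plan node.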

@@ -31,6 +31,7 @@ use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
SendableRecordBatchStream,
};
use datafusion_expr::col;
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;
@@ -83,7 +84,38 @@ impl UserDefinedLogicalNodeCore for SeriesNormalize {
}
fn expressions(&self) -> Vec<datafusion::logical_expr::Expr> {
vec![]
if self.unfix.is_some() {
return vec![];
}
self.tag_columns
.iter()
.map(col)
.chain(std::iter::once(col(&self.time_index_column_name)))
.collect()
}
fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
if self.unfix.is_some() {
return None;
}
let input_schema = self.input.schema();
if output_columns.is_empty() {
let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
return Some(vec![indices]);
}
let mut required = Vec::with_capacity(output_columns.len() + 1 + self.tag_columns.len());
required.extend_from_slice(output_columns);
required.push(input_schema.index_of_column_by_name(None, &self.time_index_column_name)?);
for tag in &self.tag_columns {
required.push(input_schema.index_of_column_by_name(None, tag)?);
}
required.sort_unstable();
required.dedup();
Some(vec![required])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@@ -429,8 +461,10 @@ mod test {
use datafusion::arrow::datatypes::{
ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
};
use datafusion::common::ToDFSchema;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
use datafusion::prelude::SessionContext;
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow_array::StringArray;
@@ -461,6 +495,23 @@ mod test {
))
}
#[test]
fn pruning_should_keep_time_and_tag_columns_for_exec() {
let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema,
});
let plan =
SeriesNormalize::new(0, TIME_INDEX_COLUMN, true, vec!["path".to_string()], input);
// Simulate a parent projection requesting only the `value` column.
let output_columns = [1usize];
let required = plan.necessary_children_exprs(&output_columns).unwrap();
let required = &required[0];
assert_eq!(required.as_slice(), &[0, 1, 2]);
}
#[tokio::test]
async fn test_sort_record_batch() {
let memory_exec = Arc::new(prepare_test_data());

View File

@@ -18,7 +18,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use common_telemetry::debug;
use common_telemetry::{debug, warn};
use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray};
use datafusion::arrow::compute;
use datafusion::arrow::datatypes::{Field, SchemaRef};
@@ -38,6 +38,7 @@ use datafusion::physical_plan::{
SendableRecordBatchStream, Statistics,
};
use datafusion::sql::TableReference;
use datafusion_expr::col;
use futures::{Stream, StreamExt, ready};
use greptime_proto::substrait_extension as pb;
use prost::Message;
@@ -288,7 +289,53 @@ impl UserDefinedLogicalNodeCore for RangeManipulate {
}
fn expressions(&self) -> Vec<Expr> {
vec![]
if self.unfix.is_some() {
return vec![];
}
let mut exprs = Vec::with_capacity(1 + self.field_columns.len());
exprs.push(col(&self.time_index));
exprs.extend(self.field_columns.iter().map(col));
exprs
}
fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
if self.unfix.is_some() {
return None;
}
let input_schema = self.input.schema();
let input_len = input_schema.fields().len();
let time_index_idx = input_schema.index_of_column_by_name(None, &self.time_index)?;
if output_columns.is_empty() {
let indices = (0..input_len).collect::<Vec<_>>();
return Some(vec![indices]);
}
let mut required = Vec::with_capacity(output_columns.len() + 1 + self.field_columns.len());
required.push(time_index_idx);
for value_column in &self.field_columns {
required.push(input_schema.index_of_column_by_name(None, value_column)?);
}
for &idx in output_columns {
if idx < input_len {
required.push(idx);
} else if idx == input_len {
// Derived timestamp range column.
required.push(time_index_idx);
} else {
warn!(
"Output column index {} is out of bounds for input schema with length {}",
idx, input_len
);
return None;
}
}
required.sort_unstable();
required.dedup();
Some(vec![required])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@@ -734,16 +781,31 @@ mod test {
use datafusion::common::ToDFSchema;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
use datafusion::physical_expr::Partitioning;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::prelude::SessionContext;
use datatypes::arrow::array::TimestampMillisecondArray;
use futures::FutureExt;
use super::*;
const TIME_INDEX_COLUMN: &str = "timestamp";
fn project_batch(batch: &RecordBatch, indices: &[usize]) -> RecordBatch {
let fields = indices
.iter()
.map(|&idx| batch.schema().field(idx).clone())
.collect::<Vec<_>>();
let columns = indices
.iter()
.map(|&idx| batch.column(idx).clone())
.collect::<Vec<_>>();
let schema = Arc::new(Schema::new(fields));
RecordBatch::try_new(schema, columns).unwrap()
}
fn prepare_test_data() -> DataSourceExec {
let schema = Arc::new(Schema::new(vec![
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
@@ -844,10 +906,96 @@ mod test {
assert_eq!(result_literal, expected);
}
#[tokio::test]
async fn pruning_should_keep_time_and_value_columns_for_exec() {
let schema = Arc::new(Schema::new(vec![
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
Field::new("value_1", DataType::Float64, true),
Field::new("value_2", DataType::Float64, true),
Field::new("path", DataType::Utf8, true),
]));
let df_schema = schema.clone().to_dfschema_ref().unwrap();
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema,
});
let plan = RangeManipulate::new(
0,
310_000,
30_000,
90_000,
TIME_INDEX_COLUMN.to_string(),
vec!["value_1".to_string(), "value_2".to_string()],
input,
)
.unwrap();
// Simulate a parent projection requesting only the `path` column.
let output_columns = [3usize];
let required = plan.necessary_children_exprs(&output_columns).unwrap();
let required = &required[0];
assert_eq!(required.as_slice(), &[0, 1, 2, 3]);
let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
0, 30_000, 60_000, 90_000, 120_000, // every 30s
180_000, 240_000, // every 60s
241_000, 271_000, 291_000, // others
])) as _;
let field_column: ArrayRef = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
let input_batch = RecordBatch::try_new(
schema,
vec![
timestamp_column,
field_column.clone(),
field_column,
path_column,
],
)
.unwrap();
let projected = project_batch(&input_batch, required);
let projected_schema = projected.schema();
let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![projected]], projected_schema, None).unwrap(),
)));
let range_exec = plan.to_execution_plan(memory_exec);
let session_context = SessionContext::default();
let output_batches =
datafusion::physical_plan::collect(range_exec, session_context.task_ctx())
.await
.unwrap();
assert_eq!(output_batches.len(), 1);
let output_batch = &output_batches[0];
let path = output_batch
.column(3)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert!(path.iter().all(|v| v == Some("foo")));
// Simulate the pre-fix pruning behavior: omit the timestamp/value columns from the child.
let broken_required = [3usize];
let broken = project_batch(&input_batch, &broken_required);
let broken_schema = broken.schema();
let broken_exec = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![broken]], broken_schema, None).unwrap(),
)));
let broken_range_exec = plan.to_execution_plan(broken_exec);
let session_context = SessionContext::default();
let broken_result = std::panic::AssertUnwindSafe(async {
datafusion::physical_plan::collect(broken_range_exec, session_context.task_ctx()).await
})
.catch_unwind()
.await;
assert!(broken_result.is_err());
}
#[tokio::test]
async fn interval_30s_range_90s() {
let expected = String::from(
"PrimitiveArray<Timestamp(Millisecond, None)>\n[\n \
"PrimitiveArray<Timestamp(ms)>\n[\n \
1970-01-01T00:00:00,\n \
1970-01-01T00:00:30,\n \
1970-01-01T00:01:00,\n \
@@ -867,7 +1015,7 @@ mod test {
ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
}\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]\n\
RangeArray { \
base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \
base array: PrimitiveArray<Timestamp(ms)>\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \
ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
}",
);
@@ -880,7 +1028,7 @@ mod test {
#[tokio::test]
async fn small_empty_range() {
let expected = String::from(
"PrimitiveArray<Timestamp(Millisecond, None)>\n[\n \
"PrimitiveArray<Timestamp(ms)>\n[\n \
1970-01-01T00:00:00.001,\n \
1970-01-01T00:00:03.001,\n \
1970-01-01T00:00:06.001,\n \
@@ -893,7 +1041,7 @@ mod test {
ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
}\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]\n\
RangeArray { \
base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \
base array: PrimitiveArray<Timestamp(ms)>\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \
ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
}",
);

View File

@@ -31,6 +31,7 @@ use datafusion::physical_plan::{
};
use datafusion::prelude::Expr;
use datafusion::sql::TableReference;
use datafusion_expr::col;
use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
use datatypes::arrow::compute::{CastOptions, cast_with_options, concat_batches};
use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
@@ -266,7 +267,36 @@ impl UserDefinedLogicalNodeCore for ScalarCalculate {
}
fn expressions(&self) -> Vec<Expr> {
vec![]
if self.unfix.is_some() {
return vec![];
}
self.tag_columns
.iter()
.map(col)
.chain(std::iter::once(col(&self.time_index)))
.chain(std::iter::once(col(&self.field_column)))
.collect()
}
fn necessary_children_exprs(&self, _output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
if self.unfix.is_some() {
return None;
}
let input_schema = self.input.schema();
let time_index_idx = input_schema.index_of_column_by_name(None, &self.time_index)?;
let field_column_idx = input_schema.index_of_column_by_name(None, &self.field_column)?;
let mut required = Vec::with_capacity(2 + self.tag_columns.len());
required.extend([time_index_idx, field_column_idx]);
for tag in &self.tag_columns {
required.push(input_schema.index_of_column_by_name(None, tag)?);
}
required.sort_unstable();
required.dedup();
Some(vec![required])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@@ -275,15 +305,9 @@ impl UserDefinedLogicalNodeCore for ScalarCalculate {
fn with_exprs_and_inputs(
&self,
exprs: Vec<Expr>,
_exprs: Vec<Expr>,
inputs: Vec<LogicalPlan>,
) -> DataFusionResult<Self> {
if !exprs.is_empty() {
return Err(DataFusionError::Internal(
"ScalarCalculate should not have any expressions".to_string(),
));
}
let input: LogicalPlan = inputs.into_iter().next().unwrap();
let input_schema = input.schema();
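Because expressions() can now be non-empty, DataFusion may hand a rewritten expression list back when it reconstructs the node; the updated signature deliberately ignores it (_exprs), since ScalarCalculate resolves its tag, time-index, and field columns by name, and the old guard rejecting non-empty expressions is dropped. A rough round-trip sketch, illustration only; the import path for ScalarCalculate is an assumption:

use datafusion::common::Result as DataFusionResult;
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore};
use promql::extension_plan::ScalarCalculate; // hypothetical path, for illustration

// Rebuild the node from whatever expressions it reported plus a new input; the
// expression list is accepted but ignored by the node itself.
fn rebuild(node: &ScalarCalculate, new_input: LogicalPlan) -> DataFusionResult<ScalarCalculate> {
    node.with_exprs_and_inputs(node.expressions(), vec![new_input])
}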
@@ -624,6 +648,109 @@ mod test {
use super::*;
fn project_batch(batch: &RecordBatch, indices: &[usize]) -> RecordBatch {
let fields = indices
.iter()
.map(|&idx| batch.schema().field(idx).clone())
.collect::<Vec<_>>();
let columns = indices
.iter()
.map(|&idx| batch.column(idx).clone())
.collect::<Vec<_>>();
let schema = Arc::new(Schema::new(fields));
RecordBatch::try_new(schema, columns).unwrap()
}
#[test]
fn necessary_children_exprs_preserve_tag_columns() {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
Field::new("tag1", DataType::Utf8, true),
Field::new("tag2", DataType::Utf8, true),
Field::new("val", DataType::Float64, true),
Field::new("extra", DataType::Utf8, true),
]));
let schema = Arc::new(DFSchema::try_from(schema).unwrap());
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema,
});
let tag_columns = vec!["tag1".to_string(), "tag2".to_string()];
let plan = ScalarCalculate::new(0, 1, 1, input, "ts", &tag_columns, "val", None).unwrap();
let required = plan.necessary_children_exprs(&[0, 1]).unwrap();
assert_eq!(required, vec![vec![0, 1, 2, 3]]);
}
#[tokio::test]
async fn pruning_should_keep_tag_columns_for_exec() {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
Field::new("tag1", DataType::Utf8, true),
Field::new("tag2", DataType::Utf8, true),
Field::new("val", DataType::Float64, true),
Field::new("extra", DataType::Utf8, true),
]));
let df_schema = Arc::new(DFSchema::try_from(schema.clone()).unwrap());
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema,
});
let tag_columns = vec!["tag1".to_string(), "tag2".to_string()];
let plan =
ScalarCalculate::new(0, 15_000, 5000, input, "ts", &tag_columns, "val", None).unwrap();
let required = plan.necessary_children_exprs(&[0, 1]).unwrap();
let required = &required[0];
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(TimestampMillisecondArray::from(vec![
0, 5_000, 10_000, 15_000,
])),
Arc::new(StringArray::from(vec!["foo", "foo", "foo", "foo"])),
Arc::new(StringArray::from(vec!["bar", "bar", "bar", "bar"])),
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
Arc::new(StringArray::from(vec!["x", "x", "x", "x"])),
],
)
.unwrap();
let projected_batch = project_batch(&batch, required);
let projected_schema = projected_batch.schema();
let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![projected_batch]], projected_schema, None).unwrap(),
)));
let scalar_exec = plan.to_execution_plan(memory_exec).unwrap();
let session_context = SessionContext::default();
let result = datafusion::physical_plan::collect(scalar_exec, session_context.task_ctx())
.await
.unwrap();
assert_eq!(result.len(), 1);
let batch = &result[0];
assert_eq!(batch.num_columns(), 2);
assert_eq!(batch.num_rows(), 4);
assert_eq!(batch.schema().field(0).name(), "ts");
assert_eq!(batch.schema().field(1).name(), "scalar(val)");
let ts = batch
.column(0)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
assert_eq!(ts.values(), &[0i64, 5_000, 10_000, 15_000]);
let values = batch
.column(1)
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();
assert_eq!(values.values(), &[1.0f64, 2.0, 3.0, 4.0]);
}
fn prepare_test_data(series: Vec<RecordBatch>) -> DataSourceExec {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),

Some files were not shown because too many files have changed in this diff.