Compare commits

...

3 Commits

Author SHA1 Message Date
evenyag
00d759e828 chore: bump version to v0.15.1
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-04 22:53:46 +08:00
Lei, HUANG
0042ea6462 fix: filter empty batch in bulk insert api (#6459)
* fix/filter-empty-batch-in-bulk-insert-api:
 **Add Early Return for Empty Record Batches in `bulk_insert.rs`**

 - Implemented an early return in the `Inserter` implementation to handle cases where `record_batch.num_rows()` is zero, improving efficiency by avoiding unnecessary processing.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/filter-empty-batch-in-bulk-insert-api:
 **Improve Bulk Insert Handling**

 - **`handle_bulk_insert.rs`**: Added a check to handle cases where the batch has zero rows, immediately returning and sending a success response with zero rows processed.
 - **`bulk_insert.rs`**: Enhanced logic to skip processing for masks that select none, optimizing the bulk insert operation by avoiding unnecessary iterations.

 These changes improve the efficiency and robustness of the bulk insert process by handling edge cases more effectively.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/filter-empty-batch-in-bulk-insert-api:
 ### Refactor and Error Handling Enhancements

 - **Refactored Timestamp Handling**: Introduced `timestamp_array_to_primitive` function in `timestamp.rs` to streamline conversion of timestamp arrays to primitive arrays, reducing redundancy in `handle_bulk_insert.rs` and `bulk_insert.rs`.
 - **Error Handling**: Added `InconsistentTimestampLength` error in `error.rs` to handle mismatched timestamp column lengths in bulk insert operations.
 - **Bulk Insert Logic**: Updated `handle_bulk_insert.rs` to utilize the new timestamp conversion function and added checks for timestamp length consistency.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/filter-empty-batch-in-bulk-insert-api:
 **Refactor `bulk_insert.rs` to streamline imports**

 - Simplified import statements by removing unused timestamp-related arrays and data types from the `arrow` crate in `bulk_insert.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-04 22:53:46 +08:00
Zhenchi
d06450715f fix: add backward compatibility for SkippingIndexOptions deserialization (#6458)
* fix: add backward compatibility for `SkippingIndexOptions` deserialization

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-04 22:53:46 +08:00
7 changed files with 223 additions and 163 deletions

152
Cargo.lock generated
View File

@@ -211,7 +211,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"common-base",
"common-decimal",
@@ -944,7 +944,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"async-trait",
@@ -1586,7 +1586,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"catalog",
"common-error",
@@ -1610,7 +1610,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"arrow 54.2.1",
@@ -1948,7 +1948,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "cli"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"async-stream",
"async-trait",
@@ -1993,7 +1993,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.15.0",
"substrait 0.15.1",
"table",
"tempfile",
"tokio",
@@ -2002,7 +2002,7 @@ dependencies = [
[[package]]
name = "client"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"arc-swap",
@@ -2032,7 +2032,7 @@ dependencies = [
"rand 0.9.0",
"serde_json",
"snafu 0.8.5",
"substrait 0.15.0",
"substrait 0.15.1",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -2073,7 +2073,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"async-trait",
"auth",
@@ -2134,7 +2134,7 @@ dependencies = [
"snafu 0.8.5",
"stat",
"store-api",
"substrait 0.15.0",
"substrait 0.15.1",
"table",
"temp-env",
"tempfile",
@@ -2181,7 +2181,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"anymap2",
"async-trait",
@@ -2203,11 +2203,11 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.15.0"
version = "0.15.1"
[[package]]
name = "common-config"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"common-base",
"common-error",
@@ -2232,7 +2232,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"arrow 54.2.1",
"arrow-schema 54.3.1",
@@ -2269,7 +2269,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"bigdecimal 0.4.8",
"common-error",
@@ -2282,7 +2282,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"common-macro",
"http 1.1.0",
@@ -2293,7 +2293,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"async-trait",
"common-error",
@@ -2309,7 +2309,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"ahash 0.8.11",
"api",
@@ -2362,7 +2362,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"async-trait",
"common-runtime",
@@ -2379,7 +2379,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"arrow-flight",
@@ -2411,7 +2411,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"common-base",
@@ -2430,7 +2430,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"arc-swap",
"common-query",
@@ -2444,7 +2444,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"anyhow",
"common-error",
@@ -2460,7 +2460,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"anymap2",
"api",
@@ -2525,7 +2525,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2534,11 +2534,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.15.0"
version = "0.15.1"
[[package]]
name = "common-pprof"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"common-error",
"common-macro",
@@ -2550,7 +2550,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"async-stream",
"async-trait",
@@ -2577,7 +2577,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"async-trait",
"common-procedure",
@@ -2586,7 +2586,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"async-trait",
@@ -2612,7 +2612,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"arc-swap",
"common-error",
@@ -2632,7 +2632,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2662,14 +2662,14 @@ dependencies = [
[[package]]
name = "common-session"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"strum 0.27.1",
]
[[package]]
name = "common-telemetry"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"backtrace",
"common-error",
@@ -2696,7 +2696,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"client",
"common-grpc",
@@ -2709,7 +2709,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"arrow 54.2.1",
"chrono",
@@ -2727,7 +2727,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"build-data",
"const_format",
@@ -2737,7 +2737,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"common-base",
"common-error",
@@ -2760,7 +2760,7 @@ dependencies = [
[[package]]
name = "common-workload"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"common-telemetry",
@@ -3716,7 +3716,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"arrow-flight",
@@ -3769,7 +3769,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.15.0",
"substrait 0.15.1",
"table",
"tokio",
"toml 0.8.19",
@@ -3778,7 +3778,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -4438,7 +4438,7 @@ checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
[[package]]
name = "file-engine"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"async-trait",
@@ -4575,7 +4575,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"arrow 54.2.1",
@@ -4640,7 +4640,7 @@ dependencies = [
"sql",
"store-api",
"strum 0.27.1",
"substrait 0.15.0",
"substrait 0.15.1",
"table",
"tokio",
"tonic 0.12.3",
@@ -4695,7 +4695,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"arc-swap",
@@ -4755,7 +4755,7 @@ dependencies = [
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"store-api",
"strfmt",
"substrait 0.15.0",
"substrait 0.15.1",
"table",
"tokio",
"tokio-util",
@@ -5916,7 +5916,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6801,7 +6801,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "log-query"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"chrono",
"common-error",
@@ -6813,7 +6813,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"async-stream",
"async-trait",
@@ -7111,7 +7111,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"async-trait",
@@ -7139,7 +7139,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"async-trait",
@@ -7230,7 +7230,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"aquamarine",
@@ -7320,7 +7320,7 @@ dependencies = [
[[package]]
name = "mito-codec"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"bytes",
@@ -7343,7 +7343,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"aquamarine",
@@ -8093,7 +8093,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"anyhow",
"bytes",
@@ -8407,7 +8407,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8462,7 +8462,7 @@ dependencies = [
"sql",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"store-api",
"substrait 0.15.0",
"substrait 0.15.1",
"table",
"tokio",
"tokio-util",
@@ -8729,7 +8729,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"async-trait",
@@ -9017,7 +9017,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9160,7 +9160,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"auth",
"clap 4.5.19",
@@ -9473,7 +9473,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -9755,7 +9755,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9797,7 +9797,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9863,7 +9863,7 @@ dependencies = [
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"statrs",
"store-api",
"substrait 0.15.0",
"substrait 0.15.1",
"table",
"tokio",
"tokio-stream",
@@ -11149,7 +11149,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"ahash 0.8.11",
"api",
@@ -11270,7 +11270,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"arc-swap",
@@ -11609,7 +11609,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"chrono",
@@ -11664,7 +11664,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -11964,7 +11964,7 @@ dependencies = [
[[package]]
name = "stat"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"nix 0.30.1",
]
@@ -11990,7 +11990,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"aquamarine",
@@ -12151,7 +12151,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"async-trait",
"bytes",
@@ -12331,7 +12331,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"async-trait",
@@ -12592,7 +12592,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"arbitrary",
"async-trait",
@@ -12636,7 +12636,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.15.0"
version = "0.15.1"
dependencies = [
"api",
"arrow-flight",
@@ -12703,7 +12703,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.15.0",
"substrait 0.15.1",
"table",
"tempfile",
"time",

View File

@@ -71,7 +71,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.15.0"
version = "0.15.1"
edition = "2021"
license = "Apache-2.0"

View File

@@ -527,7 +527,7 @@ pub struct FulltextOptions {
#[serde(default = "fulltext_options_default_granularity")]
pub granularity: u32,
/// The false positive rate of the fulltext index (for bloom backend only)
#[serde(default = "fulltext_options_default_false_positive_rate_in_10000")]
#[serde(default = "index_options_default_false_positive_rate_in_10000")]
pub false_positive_rate_in_10000: u32,
}
@@ -535,7 +535,7 @@ fn fulltext_options_default_granularity() -> u32 {
DEFAULT_GRANULARITY
}
fn fulltext_options_default_false_positive_rate_in_10000() -> u32 {
fn index_options_default_false_positive_rate_in_10000() -> u32 {
(DEFAULT_FALSE_POSITIVE_RATE * 10000.0) as u32
}
@@ -773,6 +773,7 @@ pub struct SkippingIndexOptions {
/// The granularity of the skip index.
pub granularity: u32,
/// The false positive rate of the skip index (in ten-thousandths, e.g., 100 = 1%).
#[serde(default = "index_options_default_false_positive_rate_in_10000")]
pub false_positive_rate_in_10000: u32,
/// The type of the skip index.
#[serde(default)]
@@ -1179,4 +1180,59 @@ mod tests {
assert!(column_schema.default_constraint.is_none());
assert!(column_schema.metadata.is_empty());
}
#[test]
fn test_skipping_index_options_deserialization() {
let original_options = "{\"granularity\":1024,\"false-positive-rate-in-10000\":10,\"index-type\":\"BloomFilter\"}";
let options = serde_json::from_str::<SkippingIndexOptions>(original_options).unwrap();
assert_eq!(1024, options.granularity);
assert_eq!(SkippingIndexType::BloomFilter, options.index_type);
assert_eq!(0.001, options.false_positive_rate());
let options_str = serde_json::to_string(&options).unwrap();
assert_eq!(options_str, original_options);
}
#[test]
fn test_skipping_index_options_deserialization_v0_14_to_v0_15() {
let options = "{\"granularity\":10240,\"index-type\":\"BloomFilter\"}";
let options = serde_json::from_str::<SkippingIndexOptions>(options).unwrap();
assert_eq!(10240, options.granularity);
assert_eq!(SkippingIndexType::BloomFilter, options.index_type);
assert_eq!(DEFAULT_FALSE_POSITIVE_RATE, options.false_positive_rate());
let options_str = serde_json::to_string(&options).unwrap();
assert_eq!(options_str, "{\"granularity\":10240,\"false-positive-rate-in-10000\":100,\"index-type\":\"BloomFilter\"}");
}
#[test]
fn test_fulltext_options_deserialization() {
let original_options = "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\",\"granularity\":1024,\"false-positive-rate-in-10000\":10}";
let options = serde_json::from_str::<FulltextOptions>(original_options).unwrap();
assert!(!options.case_sensitive);
assert!(options.enable);
assert_eq!(FulltextBackend::Bloom, options.backend);
assert_eq!(FulltextAnalyzer::default(), options.analyzer);
assert_eq!(1024, options.granularity);
assert_eq!(0.001, options.false_positive_rate());
let options_str = serde_json::to_string(&options).unwrap();
assert_eq!(options_str, original_options);
}
#[test]
fn test_fulltext_options_deserialization_v0_14_to_v0_15() {
// 0.14 to 0.15
let options = "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\"}";
let options = serde_json::from_str::<FulltextOptions>(options).unwrap();
assert!(!options.case_sensitive);
assert!(options.enable);
assert_eq!(FulltextBackend::Bloom, options.backend);
assert_eq!(FulltextAnalyzer::default(), options.analyzer);
assert_eq!(DEFAULT_GRANULARITY, options.granularity);
assert_eq!(DEFAULT_FALSE_POSITIVE_RATE, options.false_positive_rate());
let options_str = serde_json::to_string(&options).unwrap();
assert_eq!(options_str, "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\",\"granularity\":10240,\"false-positive-rate-in-10000\":100}");
}
}

View File

@@ -12,6 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow_array::{
ArrayRef, PrimitiveArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray,
};
use arrow_schema::DataType;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use paste::paste;
@@ -138,6 +143,41 @@ define_timestamp_with_unit!(Millisecond);
define_timestamp_with_unit!(Microsecond);
define_timestamp_with_unit!(Nanosecond);
pub fn timestamp_array_to_primitive(
ts_array: &ArrayRef,
) -> Option<(
PrimitiveArray<arrow_array::types::Int64Type>,
arrow::datatypes::TimeUnit,
)> {
let DataType::Timestamp(unit, _) = ts_array.data_type() else {
return None;
};
let ts_primitive = match unit {
arrow_schema::TimeUnit::Second => ts_array
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.reinterpret_cast::<arrow_array::types::Int64Type>(),
arrow_schema::TimeUnit::Millisecond => ts_array
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.reinterpret_cast::<arrow_array::types::Int64Type>(),
arrow_schema::TimeUnit::Microsecond => ts_array
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.reinterpret_cast::<arrow_array::types::Int64Type>(),
arrow_schema::TimeUnit::Nanosecond => ts_array
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.reinterpret_cast::<arrow_array::types::Int64Type>(),
};
Some((ts_primitive, *unit))
}
#[cfg(test)]
mod tests {
use common_time::timezone::set_default_timezone;

View File

@@ -1020,6 +1020,18 @@ pub enum Error {
location: Location,
source: mito_codec::error::Error,
},
#[snafu(display(
"Inconsistent timestamp column length, expect: {}, actual: {}",
expected,
actual
))]
InconsistentTimestampLength {
expected: usize,
actual: usize,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1175,6 +1187,8 @@ impl ErrorExt for Error {
ConvertBulkWalEntry { source, .. } => source.status_code(),
Encode { source, .. } | Decode { source, .. } => source.status_code(),
InconsistentTimestampLength { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -15,15 +15,11 @@
//! Handles bulk insert requests.
use datatypes::arrow;
use datatypes::arrow::array::{
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
use datatypes::arrow::datatypes::{DataType, TimeUnit};
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_request::RegionBulkInsertsRequest;
use crate::error::InconsistentTimestampLengthSnafu;
use crate::memtable::bulk::part::BulkPart;
use crate::request::{OptionOutputTx, SenderBulkRequest};
use crate::worker::RegionWorkerLoop;
@@ -41,6 +37,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.with_label_values(&["process_bulk_req"])
.start_timer();
let batch = request.payload;
if batch.num_rows() == 0 {
sender.send(Ok(0));
return;
}
let Some((ts_index, ts)) = batch
.schema()
@@ -60,55 +60,23 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
};
let DataType::Timestamp(unit, _) = ts.data_type() else {
// safety: ts data type must be a timestamp type.
unreachable!()
};
if batch.num_rows() != ts.len() {
sender.send(
InconsistentTimestampLengthSnafu {
expected: batch.num_rows(),
actual: ts.len(),
}
.fail(),
);
return;
}
let (min_ts, max_ts) = match unit {
TimeUnit::Second => {
let ts = ts.as_any().downcast_ref::<TimestampSecondArray>().unwrap();
(
//safety: ts array must contain at least one row so this won't return None.
arrow::compute::min(ts).unwrap(),
arrow::compute::max(ts).unwrap(),
)
}
// safety: ts data type must be a timestamp type.
let (ts_primitive, _) = datatypes::timestamp::timestamp_array_to_primitive(ts).unwrap();
TimeUnit::Millisecond => {
let ts = ts
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
(
//safety: ts array must contain at least one row so this won't return None.
arrow::compute::min(ts).unwrap(),
arrow::compute::max(ts).unwrap(),
)
}
TimeUnit::Microsecond => {
let ts = ts
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap();
(
//safety: ts array must contain at least one row so this won't return None.
arrow::compute::min(ts).unwrap(),
arrow::compute::max(ts).unwrap(),
)
}
TimeUnit::Nanosecond => {
let ts = ts
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap();
(
//safety: ts array must contain at least one row so this won't return None.
arrow::compute::min(ts).unwrap(),
arrow::compute::max(ts).unwrap(),
)
}
};
// safety: we've checked ts.len() == batch.num_rows() and batch is not empty
let min_ts = arrow::compute::min(&ts_primitive).unwrap();
let max_ts = arrow::compute::max(&ts_primitive).unwrap();
let part = BulkPart {
batch,

View File

@@ -20,11 +20,7 @@ use api::v1::region::{
bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
};
use api::v1::ArrowIpc;
use arrow::array::{
Array, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
use arrow::datatypes::{DataType, Int64Type, TimeUnit};
use arrow::array::Array;
use arrow::record_batch::RecordBatch;
use common_base::AffectedRows;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
@@ -62,6 +58,10 @@ impl Inserter {
};
decode_timer.observe_duration();
if record_batch.num_rows() == 0 {
return Ok(0);
}
// notify flownode to update dirty timestamps if flow is configured.
self.maybe_update_flow_dirty_window(table_info, record_batch.clone());
@@ -155,6 +155,9 @@ impl Inserter {
let mut raw_data_bytes = None;
for (peer, masks) in mask_per_datanode {
for (region_id, mask) in masks {
if mask.select_none() {
continue;
}
let rb = record_batch.clone();
let schema_bytes = schema_bytes.clone();
let node_manager = self.node_manager.clone();
@@ -304,32 +307,11 @@ fn extract_timestamps(rb: &RecordBatch, timestamp_index_name: &str) -> error::Re
if rb.num_rows() == 0 {
return Ok(vec![]);
}
let primitive = match ts_col.data_type() {
DataType::Timestamp(unit, _) => match unit {
TimeUnit::Second => ts_col
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
TimeUnit::Millisecond => ts_col
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
TimeUnit::Microsecond => ts_col
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
TimeUnit::Nanosecond => ts_col
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
},
t => {
return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail();
}
};
let (primitive, _) =
datatypes::timestamp::timestamp_array_to_primitive(ts_col).with_context(|| {
error::InvalidTimeIndexTypeSnafu {
ty: ts_col.data_type().clone(),
}
})?;
Ok(primitive.iter().flatten().collect())
}