Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-06 05:12:54 +00:00.

Compare commits: revert-749...main (6 commits)

- 522ca99cd6
- 2d756b24c8
- 527a1c03f3
- 7e243632c7
- 3556eb4476
- 9343da7fe8
@@ -70,19 +70,23 @@ runs:
           --wait \
           --wait-for-jobs
     - name: Wait for GreptimeDB
-      shell: bash
-      run: |
-        while true; do
-          PHASE=$(kubectl -n my-greptimedb get gtc my-greptimedb -o jsonpath='{.status.clusterPhase}')
-          if [ "$PHASE" == "Running" ]; then
-            echo "Cluster is ready"
-            break
-          else
-            echo "Cluster is not ready yet: Current phase: $PHASE"
-            kubectl get pods -n my-greptimedb
-            sleep 5 # wait for 5 seconds before check again.
-          fi
-        done
+      uses: nick-fields/retry@v3
+      with:
+        timeout_minutes: 3
+        max_attempts: 1
+        shell: bash
+        command: |
+          while true; do
+            PHASE=$(kubectl -n my-greptimedb get gtc my-greptimedb -o jsonpath='{.status.clusterPhase}')
+            if [ "$PHASE" == "Running" ]; then
+              echo "Cluster is ready"
+              break
+            else
+              echo "Cluster is not ready yet: Current phase: $PHASE"
+              kubectl get pods -n my-greptimedb
+              sleep 5 # wait for 5 seconds before check again.
+            fi
+          done
     - name: Print GreptimeDB info
       if: always()
       shell: bash
Cargo.lock (generated), 2 changed lines
@@ -4062,6 +4062,7 @@ dependencies = [
  "mito2",
  "num_cpus",
  "object-store",
+ "partition",
  "prometheus",
  "prost 0.13.5",
  "query",
@@ -9474,6 +9475,7 @@ dependencies = [
  "ahash 0.8.12",
  "api",
  "arrow",
+ "arrow-schema",
  "async-trait",
  "catalog",
  "chrono",
@@ -895,7 +895,7 @@ pub fn is_column_type_value_eq(
         .unwrap_or(false)
 }
 
-fn encode_json_value(value: JsonValue) -> v1::JsonValue {
+pub fn encode_json_value(value: JsonValue) -> v1::JsonValue {
     fn helper(json: JsonVariant) -> v1::JsonValue {
         let value = match json {
             JsonVariant::Null => None,
@@ -17,8 +17,8 @@ use std::collections::HashMap;
 use arrow_schema::extension::{EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY};
 use datatypes::schema::{
     COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer,
-    FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, SkippingIndexOptions,
-    SkippingIndexType,
+    FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, Metadata, SKIPPING_INDEX_KEY,
+    SkippingIndexOptions, SkippingIndexType,
 };
 use greptime_proto::v1::{
     Analyzer, FulltextBackend as PbFulltextBackend, SkippingIndexType as PbSkippingIndexType,
@@ -36,6 +36,14 @@ const INVERTED_INDEX_GRPC_KEY: &str = "inverted_index";
 /// Key used to store skip index options in gRPC column options.
 const SKIPPING_INDEX_GRPC_KEY: &str = "skipping_index";
 
+const COLUMN_OPTION_MAPPINGS: [(&str, &str); 5] = [
+    (FULLTEXT_GRPC_KEY, FULLTEXT_KEY),
+    (INVERTED_INDEX_GRPC_KEY, INVERTED_INDEX_KEY),
+    (SKIPPING_INDEX_GRPC_KEY, SKIPPING_INDEX_KEY),
+    (EXTENSION_TYPE_NAME_KEY, EXTENSION_TYPE_NAME_KEY),
+    (EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_METADATA_KEY),
+];
+
 /// Tries to construct a `ColumnSchema` from the given `ColumnDef`.
 pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
     let data_type = ColumnDataTypeWrapper::try_new(
@@ -131,6 +139,21 @@ pub fn try_as_column_def(column_schema: &ColumnSchema, is_primary_key: bool) ->
     })
 }
 
+/// Collect the [ColumnOptions] into the [Metadata] that can be used in, for example, [ColumnSchema].
+pub fn collect_column_options(column_options: Option<&ColumnOptions>) -> Metadata {
+    let Some(ColumnOptions { options }) = column_options else {
+        return Metadata::default();
+    };
+
+    let mut metadata = Metadata::with_capacity(options.len());
+    for (x, y) in COLUMN_OPTION_MAPPINGS {
+        if let Some(v) = options.get(x) {
+            metadata.insert(y.to_string(), v.clone());
+        }
+    }
+    metadata
+}
+
 /// Constructs a `ColumnOptions` from the given `ColumnSchema`.
 pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option<ColumnOptions> {
     let mut options = ColumnOptions::default();
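The new collect_column_options helper translates gRPC column option keys into schema metadata keys via COLUMN_OPTION_MAPPINGS. A minimal usage sketch follows; the import path for ColumnOptions, the option value, and Metadata behaving like a string map are assumptions, not part of the diff.

    use api::v1::ColumnOptions; // assumed path for the gRPC-generated type

    fn column_metadata_sketch() {
        let mut options = ColumnOptions::default();
        // Illustrative value only; the real option payload format is not shown here.
        options
            .options
            .insert(INVERTED_INDEX_GRPC_KEY.to_string(), "true".to_string());

        // The gRPC key is mapped to the schema-level INVERTED_INDEX_KEY; keys that
        // are not listed in COLUMN_OPTION_MAPPINGS are dropped.
        let metadata = collect_column_options(Some(&options));
        assert_eq!(metadata.get(INVERTED_INDEX_KEY), Some(&"true".to_string()));
    }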
@@ -399,8 +399,8 @@ impl InformationSchemaColumnsBuilder {
             self.is_nullables.push(Some("No"));
         }
         self.column_types.push(Some(&data_type));
-        self.column_comments
-            .push(column_schema.column_comment().map(|x| x.as_ref()));
+        let column_comment = column_schema.column_comment().map(|x| x.as_ref());
+        self.column_comments.push(column_comment);
     }
 
     fn finish(&mut self) -> Result<RecordBatch> {
@@ -92,7 +92,7 @@ impl StoreConfig {
     pub fn tls_config(&self) -> Option<TlsOption> {
         if self.backend_tls_mode != TlsMode::Disable {
             Some(TlsOption {
-                mode: self.backend_tls_mode.clone(),
+                mode: self.backend_tls_mode,
                 cert_path: self.backend_tls_cert_path.clone(),
                 key_path: self.backend_tls_key_path.clone(),
                 ca_cert_path: self.backend_tls_ca_cert_path.clone(),
@@ -236,7 +236,7 @@ impl StartCommand {
         };
 
         let tls_opts = TlsOption::new(
-            self.tls_mode.clone(),
+            self.tls_mode,
             self.tls_cert_path.clone(),
             self.tls_key_path.clone(),
             self.tls_watch,
@@ -261,7 +261,7 @@ impl StartCommand {
         };
 
         let tls_opts = TlsOption::new(
-            self.tls_mode.clone(),
+            self.tls_mode,
             self.tls_cert_path.clone(),
             self.tls_key_path.clone(),
             self.tls_watch,
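Dropping the .clone() here (and on backend_tls_mode above) only compiles if TlsMode is a Copy type. A minimal sketch of that pattern; the variant names are illustrative assumptions, the point is passing a small Copy enum by value.

    // Sketch only: assumes TlsMode is a small fieldless enum, which is what makes
    // the `.clone()` calls above removable.
    #[derive(Clone, Copy, PartialEq, Eq, Default)]
    enum TlsMode {
        #[default]
        Disable,
        Prefer,  // illustrative variant
        Require, // illustrative variant
    }

    struct Options {
        tls_mode: TlsMode,
    }

    fn use_mode(opts: &Options) -> TlsMode {
        // A Copy type is duplicated implicitly; no `.clone()` needed.
        opts.tls_mode
    }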
@@ -530,6 +530,49 @@ impl Display for EnterStagingRegion {
     }
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct RemapManifest {
+    pub region_id: RegionId,
+    /// Regions to remap manifests from.
+    pub input_regions: Vec<RegionId>,
+    /// For each old region, which new regions should receive its files
+    pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
+    /// New partition expressions for the new regions.
+    pub new_partition_exprs: HashMap<RegionId, String>,
+}
+
+impl Display for RemapManifest {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "RemapManifest(region_id={}, input_regions={:?}, region_mapping={:?}, new_partition_exprs={:?})",
+            self.region_id, self.input_regions, self.region_mapping, self.new_partition_exprs
+        )
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct ApplyStagingManifest {
+    /// The region ID to apply the staging manifest to.
+    pub region_id: RegionId,
+    /// The partition expression of the staging region.
+    pub partition_expr: String,
+    /// The region that stores the staging manifests in its staging blob storage.
+    pub central_region_id: RegionId,
+    /// The relative path to the staging manifest within the central region's staging blob storage.
+    pub manifest_path: String,
+}
+
+impl Display for ApplyStagingManifest {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "ApplyStagingManifest(region_id={}, partition_expr={}, central_region_id={}, manifest_path={})",
+            self.region_id, self.partition_expr, self.central_region_id, self.manifest_path
+        )
+    }
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
 pub enum Instruction {
     /// Opens regions.
@@ -559,6 +602,10 @@ pub enum Instruction {
     Suspend,
     /// Makes regions enter staging state.
     EnterStagingRegions(Vec<EnterStagingRegion>),
+    /// Remaps manifests for a region.
+    RemapManifest(RemapManifest),
+    /// Applies staging manifests for a region.
+    ApplyStagingManifests(Vec<ApplyStagingManifest>),
 }
 
 impl Instruction {
@@ -737,6 +784,48 @@ impl EnterStagingRegionsReply {
     }
 }
 
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+pub struct RemapManifestReply {
+    /// Returns false if the region does not exist.
+    pub exists: bool,
+    /// A map from region IDs to their corresponding remapped manifest paths.
+    pub manifest_paths: HashMap<RegionId, String>,
+    /// Return error if any during the operation.
+    pub error: Option<String>,
+}
+
+impl Display for RemapManifestReply {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "RemapManifestReply(manifest_paths={:?}, error={:?})",
+            self.manifest_paths, self.error
+        )
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+pub struct ApplyStagingManifestsReply {
+    pub replies: Vec<ApplyStagingManifestReply>,
+}
+
+impl ApplyStagingManifestsReply {
+    pub fn new(replies: Vec<ApplyStagingManifestReply>) -> Self {
+        Self { replies }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+pub struct ApplyStagingManifestReply {
+    pub region_id: RegionId,
+    /// Returns true if the region is ready to serve reads and writes.
+    pub ready: bool,
+    /// Indicates whether the region exists.
+    pub exists: bool,
+    /// Return error if any during the operation.
+    pub error: Option<String>,
+}
+
 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
 #[serde(tag = "type", rename_all = "snake_case")]
 pub enum InstructionReply {
@@ -758,6 +847,8 @@ pub enum InstructionReply {
     GetFileRefs(GetFileRefsReply),
     GcRegions(GcRegionsReply),
     EnterStagingRegions(EnterStagingRegionsReply),
+    RemapManifest(RemapManifestReply),
+    ApplyStagingManifests(ApplyStagingManifestsReply),
 }
 
 impl Display for InstructionReply {
@@ -781,6 +872,12 @@ impl Display for InstructionReply {
                 reply.replies
             )
             }
+            Self::RemapManifest(reply) => write!(f, "InstructionReply::RemapManifest({})", reply),
+            Self::ApplyStagingManifests(reply) => write!(
+                f,
+                "InstructionReply::ApplyStagingManifests({:?})",
+                reply.replies
+            ),
         }
     }
 }
@@ -828,6 +925,20 @@ impl InstructionReply {
             _ => panic!("Expected EnterStagingRegion reply"),
         }
     }
+
+    pub fn expect_remap_manifest_reply(self) -> RemapManifestReply {
+        match self {
+            Self::RemapManifest(reply) => reply,
+            _ => panic!("Expected RemapManifest reply"),
+        }
+    }
+
+    pub fn expect_apply_staging_manifests_reply(self) -> Vec<ApplyStagingManifestReply> {
+        match self {
+            Self::ApplyStagingManifests(reply) => reply.replies,
+            _ => panic!("Expected ApplyStagingManifest reply"),
+        }
+    }
 }
 
 #[cfg(test)]
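A sketch of how the new instruction and reply types added above could be used together by a caller. The RegionId import path and the by-value use of RegionId follow their usage elsewhere in this diff; the helper functions themselves are hypothetical and only illustrate the shapes.

    use std::collections::HashMap;

    use store_api::storage::RegionId; // assumed import, matching the new handler files

    // Hypothetical helper for illustration; not part of the diff.
    fn make_remap_instruction(target: RegionId, sources: Vec<RegionId>) -> Instruction {
        // Toy mapping: every source region's files are redirected to the single target.
        let region_mapping: HashMap<RegionId, Vec<RegionId>> = sources
            .iter()
            .map(|source| (*source, vec![target]))
            .collect();

        Instruction::RemapManifest(RemapManifest {
            region_id: target,
            input_regions: sources,
            region_mapping,
            new_partition_exprs: HashMap::new(),
        })
    }

    // The accessor added above panics on a mismatched variant, so it is only
    // suitable where the reply kind is already known.
    fn read_reply(reply: InstructionReply) -> HashMap<RegionId, String> {
        reply.expect_remap_manifest_reply().manifest_paths
    }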
@@ -77,4 +77,5 @@ common-query.workspace = true
 common-test-util.workspace = true
 datafusion-common.workspace = true
 mito2 = { workspace = true, features = ["test"] }
+partition.workspace = true
 session.workspace = true
@@ -201,6 +201,7 @@ pub enum Error {
     ShutdownServer {
         #[snafu(implicit)]
         location: Location,
+        #[snafu(source)]
         source: servers::error::Error,
     },
 
@@ -208,6 +209,7 @@ pub enum Error {
     ShutdownInstance {
         #[snafu(implicit)]
         location: Location,
+        #[snafu(source)]
         source: BoxedError,
     },
 
@@ -22,6 +22,7 @@ use common_telemetry::error;
 use snafu::OptionExt;
 use store_api::storage::GcReport;
 
+mod apply_staging_manifest;
 mod close_region;
 mod downgrade_region;
 mod enter_staging;
@@ -29,8 +30,10 @@ mod file_ref;
 mod flush_region;
 mod gc_worker;
 mod open_region;
+mod remap_manifest;
 mod upgrade_region;
 
+use crate::heartbeat::handler::apply_staging_manifest::ApplyStagingManifestsHandler;
 use crate::heartbeat::handler::close_region::CloseRegionsHandler;
 use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
 use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler;
@@ -38,6 +41,7 @@ use crate::heartbeat::handler::file_ref::GetFileRefsHandler;
 use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
 use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
 use crate::heartbeat::handler::open_region::OpenRegionsHandler;
+use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
 use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
 use crate::heartbeat::task_tracker::TaskTracker;
 use crate::region_server::RegionServer;
@@ -128,6 +132,10 @@ impl RegionHeartbeatResponseHandler {
             Instruction::EnterStagingRegions(_) => {
                 Ok(Some(Box::new(EnterStagingRegionsHandler.into())))
             }
+            Instruction::RemapManifest(_) => Ok(Some(Box::new(RemapManifestHandler.into()))),
+            Instruction::ApplyStagingManifests(_) => {
+                Ok(Some(Box::new(ApplyStagingManifestsHandler.into())))
+            }
         }
     }
 }
@@ -142,6 +150,8 @@ pub enum InstructionHandlers {
     GetFileRefs(GetFileRefsHandler),
     GcRegions(GcRegionsHandler),
     EnterStagingRegions(EnterStagingRegionsHandler),
+    RemapManifest(RemapManifestHandler),
+    ApplyStagingManifests(ApplyStagingManifestsHandler),
 }
 
 macro_rules! impl_from_handler {
@@ -164,7 +174,9 @@ impl_from_handler!(
     UpgradeRegionsHandler => UpgradeRegions,
     GetFileRefsHandler => GetFileRefs,
     GcRegionsHandler => GcRegions,
-    EnterStagingRegionsHandler => EnterStagingRegions
+    EnterStagingRegionsHandler => EnterStagingRegions,
+    RemapManifestHandler => RemapManifest,
+    ApplyStagingManifestsHandler => ApplyStagingManifests
 );
 
 macro_rules! dispatch_instr {
@@ -209,7 +221,9 @@ dispatch_instr!(
     UpgradeRegions => UpgradeRegions,
     GetFileRefs => GetFileRefs,
     GcRegions => GcRegions,
-    EnterStagingRegions => EnterStagingRegions
+    EnterStagingRegions => EnterStagingRegions,
+    RemapManifest => RemapManifest,
+    ApplyStagingManifests => ApplyStagingManifests,
 );
 
 #[async_trait]
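Registering a new instruction in this module touches the handler-selection match, the InstructionHandlers enum, and both macros above. The handler side follows the InstructionHandler trait as used by the two new files below; this is a hypothetical no-op handler sketched from those signatures, not code from the diff.

    use common_meta::instruction::InstructionReply;

    // Hypothetical handler, shaped after RemapManifestHandler below.
    pub struct NoopHandler;

    #[async_trait::async_trait]
    impl InstructionHandler for NoopHandler {
        // The concrete payload type carried by the matching Instruction variant.
        type Instruction = ();

        async fn handle(
            &self,
            _ctx: &HandlerContext,
            _request: Self::Instruction,
        ) -> Option<InstructionReply> {
            // Returning None presumably attaches no reply to the heartbeat response.
            None
        }
    }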
src/datanode/src/heartbeat/handler/apply_staging_manifest.rs (new file, 287 lines)
@@ -0,0 +1,287 @@
|
// Copyright 2023 Greptime Team
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
use common_meta::instruction::{
|
||||||
|
ApplyStagingManifest, ApplyStagingManifestReply, ApplyStagingManifestsReply, InstructionReply,
|
||||||
|
};
|
||||||
|
use common_telemetry::{error, warn};
|
||||||
|
use futures::future::join_all;
|
||||||
|
use store_api::region_request::{ApplyStagingManifestRequest, RegionRequest};
|
||||||
|
|
||||||
|
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
|
||||||
|
|
||||||
|
pub struct ApplyStagingManifestsHandler;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl InstructionHandler for ApplyStagingManifestsHandler {
|
||||||
|
type Instruction = Vec<ApplyStagingManifest>;
|
||||||
|
async fn handle(
|
||||||
|
&self,
|
||||||
|
ctx: &HandlerContext,
|
||||||
|
requests: Self::Instruction,
|
||||||
|
) -> Option<InstructionReply> {
|
||||||
|
let results = join_all(
|
||||||
|
requests
|
||||||
|
.into_iter()
|
||||||
|
.map(|request| Self::handle_apply_staging_manifest(ctx, request)),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
Some(InstructionReply::ApplyStagingManifests(
|
||||||
|
ApplyStagingManifestsReply::new(results),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ApplyStagingManifestsHandler {
|
||||||
|
async fn handle_apply_staging_manifest(
|
||||||
|
ctx: &HandlerContext,
|
||||||
|
request: ApplyStagingManifest,
|
||||||
|
) -> ApplyStagingManifestReply {
|
||||||
|
let Some(leader) = ctx.region_server.is_region_leader(request.region_id) else {
|
||||||
|
warn!("Region: {} is not found", request.region_id);
|
||||||
|
return ApplyStagingManifestReply {
|
||||||
|
region_id: request.region_id,
|
||||||
|
exists: false,
|
||||||
|
ready: false,
|
||||||
|
error: None,
|
||||||
|
};
|
||||||
|
};
|
||||||
|
if !leader {
|
||||||
|
warn!("Region: {} is not leader", request.region_id);
|
||||||
|
return ApplyStagingManifestReply {
|
||||||
|
region_id: request.region_id,
|
||||||
|
exists: true,
|
||||||
|
ready: false,
|
||||||
|
error: Some("Region is not leader".into()),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
match ctx
|
||||||
|
.region_server
|
||||||
|
.handle_request(
|
||||||
|
request.region_id,
|
||||||
|
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
|
||||||
|
partition_expr: request.partition_expr,
|
||||||
|
central_region_id: request.central_region_id,
|
||||||
|
manifest_path: request.manifest_path,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(_) => ApplyStagingManifestReply {
|
||||||
|
region_id: request.region_id,
|
||||||
|
exists: true,
|
||||||
|
ready: true,
|
||||||
|
error: None,
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
error!(err; "Failed to apply staging manifest");
|
||||||
|
ApplyStagingManifestReply {
|
||||||
|
region_id: request.region_id,
|
||||||
|
exists: true,
|
||||||
|
ready: false,
|
||||||
|
error: Some(format!("{err:?}")),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use common_meta::instruction::RemapManifest;
|
||||||
|
use datatypes::value::Value;
|
||||||
|
use mito2::config::MitoConfig;
|
||||||
|
use mito2::engine::MITO_ENGINE_NAME;
|
||||||
|
use mito2::test_util::{CreateRequestBuilder, TestEnv};
|
||||||
|
use partition::expr::{PartitionExpr, col};
|
||||||
|
use store_api::path_utils::table_dir;
|
||||||
|
use store_api::region_engine::RegionRole;
|
||||||
|
use store_api::region_request::EnterStagingRequest;
|
||||||
|
use store_api::storage::RegionId;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
|
||||||
|
use crate::region_server::RegionServer;
|
||||||
|
use crate::tests::{MockRegionEngine, mock_region_server};
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_region_not_exist() {
|
||||||
|
let mut mock_region_server = mock_region_server();
|
||||||
|
let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
|
||||||
|
mock_region_server.register_engine(mock_engine);
|
||||||
|
let handler_context = HandlerContext::new_for_test(mock_region_server);
|
||||||
|
let region_id = RegionId::new(1024, 1);
|
||||||
|
let reply = ApplyStagingManifestsHandler
|
||||||
|
.handle(
|
||||||
|
&handler_context,
|
||||||
|
vec![ApplyStagingManifest {
|
||||||
|
region_id,
|
||||||
|
partition_expr: "".to_string(),
|
||||||
|
central_region_id: RegionId::new(1024, 9999), // use a dummy value
|
||||||
|
manifest_path: "".to_string(),
|
||||||
|
}],
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let replies = reply.expect_apply_staging_manifests_reply();
|
||||||
|
let reply = &replies[0];
|
||||||
|
assert!(!reply.exists);
|
||||||
|
assert!(!reply.ready);
|
||||||
|
assert!(reply.error.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_region_not_leader() {
|
||||||
|
let mock_region_server = mock_region_server();
|
||||||
|
let region_id = RegionId::new(1024, 1);
|
||||||
|
let (mock_engine, _) =
|
||||||
|
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
|
||||||
|
region_engine.mock_role = Some(Some(RegionRole::Follower));
|
||||||
|
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
|
||||||
|
});
|
||||||
|
mock_region_server.register_test_region(region_id, mock_engine);
|
||||||
|
let handler_context = HandlerContext::new_for_test(mock_region_server);
|
||||||
|
let region_id = RegionId::new(1024, 1);
|
||||||
|
let reply = ApplyStagingManifestsHandler
|
||||||
|
.handle(
|
||||||
|
&handler_context,
|
||||||
|
vec![ApplyStagingManifest {
|
||||||
|
region_id,
|
||||||
|
partition_expr: "".to_string(),
|
||||||
|
central_region_id: RegionId::new(1024, 2),
|
||||||
|
manifest_path: "".to_string(),
|
||||||
|
}],
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let replies = reply.expect_apply_staging_manifests_reply();
|
||||||
|
let reply = &replies[0];
|
||||||
|
assert!(reply.exists);
|
||||||
|
assert!(!reply.ready);
|
||||||
|
assert!(reply.error.is_some());
|
||||||
|
}
|
||||||
|
|
||||||
|
fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
|
||||||
|
col(col_name)
|
||||||
|
.gt_eq(Value::Int64(start))
|
||||||
|
.and(col(col_name).lt(Value::Int64(end)))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn prepare_region(region_server: &RegionServer) {
|
||||||
|
let region_specs = [
|
||||||
|
(RegionId::new(1024, 1), range_expr("x", 0, 49)),
|
||||||
|
(RegionId::new(1024, 2), range_expr("x", 49, 100)),
|
||||||
|
];
|
||||||
|
|
||||||
|
for (region_id, partition_expr) in region_specs {
|
||||||
|
let builder = CreateRequestBuilder::new();
|
||||||
|
let mut create_req = builder.build();
|
||||||
|
create_req.table_dir = table_dir("test", 1024);
|
||||||
|
region_server
|
||||||
|
.handle_request(region_id, RegionRequest::Create(create_req))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
region_server
|
||||||
|
.handle_request(
|
||||||
|
region_id,
|
||||||
|
RegionRequest::EnterStaging(EnterStagingRequest {
|
||||||
|
partition_expr: partition_expr.as_json_str().unwrap(),
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_apply_staging_manifest() {
|
||||||
|
common_telemetry::init_default_ut_logging();
|
||||||
|
let mut region_server = mock_region_server();
|
||||||
|
let region_id = RegionId::new(1024, 1);
|
||||||
|
let mut engine_env = TestEnv::new().await;
|
||||||
|
let engine = engine_env.create_engine(MitoConfig::default()).await;
|
||||||
|
region_server.register_engine(Arc::new(engine.clone()));
|
||||||
|
prepare_region(®ion_server).await;
|
||||||
|
|
||||||
|
let handler_context = HandlerContext::new_for_test(region_server);
|
||||||
|
let region_id2 = RegionId::new(1024, 2);
|
||||||
|
let reply = RemapManifestHandler
|
||||||
|
.handle(
|
||||||
|
&handler_context,
|
||||||
|
RemapManifest {
|
||||||
|
region_id,
|
||||||
|
input_regions: vec![region_id, region_id2],
|
||||||
|
region_mapping: HashMap::from([
|
||||||
|
// [0,49) <- [0, 50)
|
||||||
|
(region_id, vec![region_id]),
|
||||||
|
// [49, 100) <- [0, 50), [50,100)
|
||||||
|
(region_id2, vec![region_id, region_id2]),
|
||||||
|
]),
|
||||||
|
new_partition_exprs: HashMap::from([
|
||||||
|
(region_id, range_expr("x", 0, 49).as_json_str().unwrap()),
|
||||||
|
(region_id2, range_expr("x", 49, 100).as_json_str().unwrap()),
|
||||||
|
]),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let reply = reply.expect_remap_manifest_reply();
|
||||||
|
assert!(reply.exists);
|
||||||
|
assert!(reply.error.is_none(), "{}", reply.error.unwrap());
|
||||||
|
assert_eq!(reply.manifest_paths.len(), 2);
|
||||||
|
let manifest_path_1 = reply.manifest_paths[®ion_id].clone();
|
||||||
|
let manifest_path_2 = reply.manifest_paths[®ion_id2].clone();
|
||||||
|
|
||||||
|
let reply = ApplyStagingManifestsHandler
|
||||||
|
.handle(
|
||||||
|
&handler_context,
|
||||||
|
vec![ApplyStagingManifest {
|
||||||
|
region_id,
|
||||||
|
partition_expr: range_expr("x", 0, 49).as_json_str().unwrap(),
|
||||||
|
central_region_id: region_id,
|
||||||
|
manifest_path: manifest_path_1,
|
||||||
|
}],
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let replies = reply.expect_apply_staging_manifests_reply();
|
||||||
|
let reply = &replies[0];
|
||||||
|
assert!(reply.exists);
|
||||||
|
assert!(reply.ready);
|
||||||
|
assert!(reply.error.is_none());
|
||||||
|
|
||||||
|
// partition expr mismatch
|
||||||
|
let reply = ApplyStagingManifestsHandler
|
||||||
|
.handle(
|
||||||
|
&handler_context,
|
||||||
|
vec![ApplyStagingManifest {
|
||||||
|
region_id: region_id2,
|
||||||
|
partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(),
|
||||||
|
central_region_id: region_id,
|
||||||
|
manifest_path: manifest_path_2,
|
||||||
|
}],
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let replies = reply.expect_apply_staging_manifests_reply();
|
||||||
|
let reply = &replies[0];
|
||||||
|
assert!(reply.exists);
|
||||||
|
assert!(!reply.ready);
|
||||||
|
assert!(reply.error.is_some());
|
||||||
|
}
|
||||||
|
}
|
||||||
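The reply carries an (exists, ready, error) triplet rather than a single status. A small sketch of how a coordinator might fold one reply into a coarse outcome; the outcome wording is illustrative, only the field semantics come from the handler above.

    fn interpret(reply: &ApplyStagingManifestReply) -> &'static str {
        match (reply.exists, reply.ready, reply.error.as_deref()) {
            (false, _, _) => "region not found on this datanode",
            (true, true, None) => "staging manifest applied; region is serving",
            (true, false, Some(_)) => "region found but apply failed or region is not leader",
            _ => "unexpected combination",
        }
    }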
src/datanode/src/heartbeat/handler/remap_manifest.rs (new file, 246 lines)
@@ -0,0 +1,246 @@
|
// Copyright 2023 Greptime Team
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
use common_meta::instruction::{InstructionReply, RemapManifest, RemapManifestReply};
|
||||||
|
use common_telemetry::warn;
|
||||||
|
use store_api::region_engine::RemapManifestsRequest;
|
||||||
|
|
||||||
|
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
|
||||||
|
|
||||||
|
pub struct RemapManifestHandler;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl InstructionHandler for RemapManifestHandler {
|
||||||
|
type Instruction = RemapManifest;
|
||||||
|
async fn handle(
|
||||||
|
&self,
|
||||||
|
ctx: &HandlerContext,
|
||||||
|
request: Self::Instruction,
|
||||||
|
) -> Option<InstructionReply> {
|
||||||
|
let RemapManifest {
|
||||||
|
region_id,
|
||||||
|
input_regions,
|
||||||
|
region_mapping,
|
||||||
|
new_partition_exprs,
|
||||||
|
} = request;
|
||||||
|
let Some(leader) = ctx.region_server.is_region_leader(region_id) else {
|
||||||
|
warn!("Region: {} is not found", region_id);
|
||||||
|
return Some(InstructionReply::RemapManifest(RemapManifestReply {
|
||||||
|
exists: false,
|
||||||
|
manifest_paths: Default::default(),
|
||||||
|
error: None,
|
||||||
|
}));
|
||||||
|
};
|
||||||
|
|
||||||
|
if !leader {
|
||||||
|
warn!("Region: {} is not leader", region_id);
|
||||||
|
return Some(InstructionReply::RemapManifest(RemapManifestReply {
|
||||||
|
exists: true,
|
||||||
|
manifest_paths: Default::default(),
|
||||||
|
error: Some("Region is not leader".into()),
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
let reply = match ctx
|
||||||
|
.region_server
|
||||||
|
.remap_manifests(RemapManifestsRequest {
|
||||||
|
region_id,
|
||||||
|
input_regions,
|
||||||
|
region_mapping,
|
||||||
|
new_partition_exprs,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(result) => InstructionReply::RemapManifest(RemapManifestReply {
|
||||||
|
exists: true,
|
||||||
|
manifest_paths: result.manifest_paths,
|
||||||
|
error: None,
|
||||||
|
}),
|
||||||
|
Err(e) => InstructionReply::RemapManifest(RemapManifestReply {
|
||||||
|
exists: true,
|
||||||
|
manifest_paths: Default::default(),
|
||||||
|
error: Some(format!("{e:?}")),
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
|
||||||
|
Some(reply)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use common_meta::instruction::RemapManifest;
|
||||||
|
use datatypes::value::Value;
|
||||||
|
use mito2::config::MitoConfig;
|
||||||
|
use mito2::engine::MITO_ENGINE_NAME;
|
||||||
|
use mito2::test_util::{CreateRequestBuilder, TestEnv};
|
||||||
|
use partition::expr::{PartitionExpr, col};
|
||||||
|
use store_api::path_utils::table_dir;
|
||||||
|
use store_api::region_engine::RegionRole;
|
||||||
|
use store_api::region_request::{EnterStagingRequest, RegionRequest};
|
||||||
|
use store_api::storage::RegionId;
|
||||||
|
|
||||||
|
use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
|
||||||
|
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
|
||||||
|
use crate::region_server::RegionServer;
|
||||||
|
use crate::tests::{MockRegionEngine, mock_region_server};
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_region_not_exist() {
|
||||||
|
let mut mock_region_server = mock_region_server();
|
||||||
|
let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
|
||||||
|
mock_region_server.register_engine(mock_engine);
|
||||||
|
let handler_context = HandlerContext::new_for_test(mock_region_server);
|
||||||
|
let region_id = RegionId::new(1024, 1);
|
||||||
|
let reply = RemapManifestHandler
|
||||||
|
.handle(
|
||||||
|
&handler_context,
|
||||||
|
RemapManifest {
|
||||||
|
region_id,
|
||||||
|
input_regions: vec![],
|
||||||
|
region_mapping: HashMap::new(),
|
||||||
|
new_partition_exprs: HashMap::new(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let reply = &reply.expect_remap_manifest_reply();
|
||||||
|
assert!(!reply.exists);
|
||||||
|
assert!(reply.error.is_none());
|
||||||
|
assert!(reply.manifest_paths.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_region_not_leader() {
|
||||||
|
let mock_region_server = mock_region_server();
|
||||||
|
let region_id = RegionId::new(1024, 1);
|
||||||
|
let (mock_engine, _) =
|
||||||
|
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
|
||||||
|
region_engine.mock_role = Some(Some(RegionRole::Follower));
|
||||||
|
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
|
||||||
|
});
|
||||||
|
mock_region_server.register_test_region(region_id, mock_engine);
|
||||||
|
let handler_context = HandlerContext::new_for_test(mock_region_server);
|
||||||
|
let reply = RemapManifestHandler
|
||||||
|
.handle(
|
||||||
|
&handler_context,
|
||||||
|
RemapManifest {
|
||||||
|
region_id,
|
||||||
|
input_regions: vec![],
|
||||||
|
region_mapping: HashMap::new(),
|
||||||
|
new_partition_exprs: HashMap::new(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let reply = reply.expect_remap_manifest_reply();
|
||||||
|
assert!(reply.exists);
|
||||||
|
assert!(reply.error.is_some());
|
||||||
|
}
|
||||||
|
|
||||||
|
fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
|
||||||
|
col(col_name)
|
||||||
|
.gt_eq(Value::Int64(start))
|
||||||
|
.and(col(col_name).lt(Value::Int64(end)))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn prepare_region(region_server: &RegionServer) {
|
||||||
|
let region_specs = [
|
||||||
|
(RegionId::new(1024, 1), range_expr("x", 0, 50)),
|
||||||
|
(RegionId::new(1024, 2), range_expr("x", 50, 100)),
|
||||||
|
];
|
||||||
|
|
||||||
|
for (region_id, partition_expr) in region_specs {
|
||||||
|
let builder = CreateRequestBuilder::new();
|
||||||
|
let mut create_req = builder.build();
|
||||||
|
create_req.table_dir = table_dir("test", 1024);
|
||||||
|
region_server
|
||||||
|
.handle_request(region_id, RegionRequest::Create(create_req))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
region_server
|
||||||
|
.handle_request(
|
||||||
|
region_id,
|
||||||
|
RegionRequest::EnterStaging(EnterStagingRequest {
|
||||||
|
partition_expr: partition_expr.as_json_str().unwrap(),
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_remap_manifest() {
|
||||||
|
common_telemetry::init_default_ut_logging();
|
||||||
|
let mut region_server = mock_region_server();
|
||||||
|
let region_id = RegionId::new(1024, 1);
|
||||||
|
let mut engine_env = TestEnv::new().await;
|
||||||
|
let engine = engine_env.create_engine(MitoConfig::default()).await;
|
||||||
|
region_server.register_engine(Arc::new(engine.clone()));
|
||||||
|
prepare_region(®ion_server).await;
|
||||||
|
|
||||||
|
let handler_context = HandlerContext::new_for_test(region_server);
|
||||||
|
let region_id2 = RegionId::new(1024, 2);
|
||||||
|
let reply = RemapManifestHandler
|
||||||
|
.handle(
|
||||||
|
&handler_context,
|
||||||
|
RemapManifest {
|
||||||
|
region_id,
|
||||||
|
input_regions: vec![region_id, region_id2],
|
||||||
|
region_mapping: HashMap::from([
|
||||||
|
(region_id, vec![region_id]),
|
||||||
|
(region_id2, vec![region_id]),
|
||||||
|
]),
|
||||||
|
new_partition_exprs: HashMap::from([(
|
||||||
|
region_id,
|
||||||
|
range_expr("x", 0, 100).as_json_str().unwrap(),
|
||||||
|
)]),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let reply = reply.expect_remap_manifest_reply();
|
||||||
|
assert!(reply.exists);
|
||||||
|
assert!(reply.error.is_none(), "{}", reply.error.unwrap());
|
||||||
|
assert_eq!(reply.manifest_paths.len(), 1);
|
||||||
|
|
||||||
|
// Remap failed
|
||||||
|
let reply = RemapManifestHandler
|
||||||
|
.handle(
|
||||||
|
&handler_context,
|
||||||
|
RemapManifest {
|
||||||
|
region_id,
|
||||||
|
input_regions: vec![region_id],
|
||||||
|
region_mapping: HashMap::from([
|
||||||
|
(region_id, vec![region_id]),
|
||||||
|
(region_id2, vec![region_id]),
|
||||||
|
]),
|
||||||
|
new_partition_exprs: HashMap::from([(
|
||||||
|
region_id,
|
||||||
|
range_expr("x", 0, 100).as_json_str().unwrap(),
|
||||||
|
)]),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let reply = reply.expect_remap_manifest_reply();
|
||||||
|
assert!(reply.exists);
|
||||||
|
assert!(reply.error.is_some());
|
||||||
|
assert!(reply.manifest_paths.is_empty());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -65,8 +65,9 @@ use store_api::metric_engine_consts::{
     FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
 };
 use store_api::region_engine::{
-    RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
-    SettableRegionRoleState, SyncRegionFromRequest,
+    RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, RemapManifestsRequest,
+    RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
+    SyncRegionFromRequest,
 };
 use store_api::region_request::{
     AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
@@ -604,6 +605,25 @@ impl RegionServer {
         .await
     }
 
+    /// Remaps manifests from old regions to new regions.
+    pub async fn remap_manifests(
+        &self,
+        request: RemapManifestsRequest,
+    ) -> Result<RemapManifestsResponse> {
+        let region_id = request.region_id;
+        let engine_with_status = self
+            .inner
+            .region_map
+            .get(&region_id)
+            .with_context(|| RegionNotFoundSnafu { region_id })?;
+
+        engine_with_status
+            .engine()
+            .remap_manifests(request)
+            .await
+            .with_context(|_| HandleRegionRequestSnafu { region_id })
+    }
+
     fn is_suspended(&self) -> bool {
         self.suspend.load(Ordering::Relaxed)
     }
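A usage sketch for the new RegionServer::remap_manifests entry point. The request field names mirror the RemapManifestsRequest construction in the remap_manifest handler above; the surrounding function and logging are illustrative.

    use std::collections::HashMap;

    use store_api::region_engine::RemapManifestsRequest;
    use store_api::storage::RegionId;

    async fn remap_example(server: &RegionServer, old: RegionId, new: RegionId) {
        let request = RemapManifestsRequest {
            region_id: old,
            input_regions: vec![old],
            region_mapping: HashMap::from([(old, vec![new])]),
            new_partition_exprs: HashMap::new(),
        };
        match server.remap_manifests(request).await {
            // `manifest_paths` maps each new region to the staged manifest it should apply.
            Ok(response) => println!("{:?}", response.manifest_paths),
            Err(err) => eprintln!("remap failed: {err:?}"),
        }
    }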
@@ -816,7 +816,7 @@ mod tests {
         let result = encode_by_struct(&json_struct, json);
         assert_eq!(
             result.unwrap_err().to_string(),
-            "Cannot cast value bar to Number(I64)"
+            r#"Cannot cast value bar to "<Number>""#
         );
 
         let json = json!({
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use std::collections::BTreeMap;
-use std::fmt::{Display, Formatter};
+use std::fmt::{Debug, Display, Formatter};
 use std::str::FromStr;
 use std::sync::Arc;
 
@@ -134,24 +134,24 @@ impl From<&ConcreteDataType> for JsonNativeType {
 impl Display for JsonNativeType {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         match self {
-            JsonNativeType::Null => write!(f, "Null"),
-            JsonNativeType::Bool => write!(f, "Bool"),
-            JsonNativeType::Number(t) => {
-                write!(f, "Number({t:?})")
+            JsonNativeType::Null => write!(f, r#""<Null>""#),
+            JsonNativeType::Bool => write!(f, r#""<Bool>""#),
+            JsonNativeType::Number(_) => {
+                write!(f, r#""<Number>""#)
             }
-            JsonNativeType::String => write!(f, "String"),
+            JsonNativeType::String => write!(f, r#""<String>""#),
             JsonNativeType::Array(item_type) => {
-                write!(f, "Array[{}]", item_type)
+                write!(f, "[{}]", item_type)
             }
             JsonNativeType::Object(object) => {
                 write!(
                     f,
-                    "Object{{{}}}",
+                    "{{{}}}",
                     object
                         .iter()
-                        .map(|(k, v)| format!(r#""{k}": {v}"#))
+                        .map(|(k, v)| format!(r#""{k}":{v}"#))
                         .collect::<Vec<_>>()
-                        .join(", ")
+                        .join(",")
                 )
             }
         }
@@ -183,7 +183,11 @@ impl JsonType {
         }
     }
 
-    pub(crate) fn native_type(&self) -> &JsonNativeType {
+    pub fn is_native_type(&self) -> bool {
+        matches!(self.format, JsonFormat::Native(_))
+    }
+
+    pub fn native_type(&self) -> &JsonNativeType {
         match &self.format {
             JsonFormat::Jsonb => &JsonNativeType::String,
             JsonFormat::Native(x) => x.as_ref(),
@@ -650,15 +654,16 @@ mod tests {
|
|||||||
"list": [1, 2, 3],
|
"list": [1, 2, 3],
|
||||||
"object": {"a": 1}
|
"object": {"a": 1}
|
||||||
}"#;
|
}"#;
|
||||||
let expected = r#"Json<Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}>"#;
|
let expected =
|
||||||
|
r#"Json<{"hello":"<String>","list":["<Number>"],"object":{"a":"<Number>"}}>"#;
|
||||||
test(json, json_type, Ok(expected))?;
|
test(json, json_type, Ok(expected))?;
|
||||||
|
|
||||||
// cannot merge with other non-object json values:
|
// cannot merge with other non-object json values:
|
||||||
let jsons = [r#""s""#, "1", "[1]"];
|
let jsons = [r#""s""#, "1", "[1]"];
|
||||||
let expects = [
|
let expects = [
|
||||||
r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: String"#,
|
r#"Failed to merge JSON datatype: datatypes have conflict, this: {"hello":"<String>","list":["<Number>"],"object":{"a":"<Number>"}}, that: "<String>""#,
|
||||||
r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: Number(I64)"#,
|
r#"Failed to merge JSON datatype: datatypes have conflict, this: {"hello":"<String>","list":["<Number>"],"object":{"a":"<Number>"}}, that: "<Number>""#,
|
||||||
r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: Array[Number(I64)]"#,
|
r#"Failed to merge JSON datatype: datatypes have conflict, this: {"hello":"<String>","list":["<Number>"],"object":{"a":"<Number>"}}, that: ["<Number>"]"#,
|
||||||
];
|
];
|
||||||
for (json, expect) in jsons.into_iter().zip(expects.into_iter()) {
|
for (json, expect) in jsons.into_iter().zip(expects.into_iter()) {
|
||||||
test(json, json_type, Err(expect))?;
|
test(json, json_type, Err(expect))?;
|
||||||
@@ -670,7 +675,7 @@ mod tests {
|
|||||||
"float": 0.123,
|
"float": 0.123,
|
||||||
"no": 42
|
"no": 42
|
||||||
}"#;
|
}"#;
|
||||||
let expected = r#"Failed to merge JSON datatype: datatypes have conflict, this: String, that: Number(I64)"#;
|
let expected = r#"Failed to merge JSON datatype: datatypes have conflict, this: "<String>", that: "<Number>""#;
|
||||||
test(json, json_type, Err(expected))?;
|
test(json, json_type, Err(expected))?;
|
||||||
|
|
||||||
// can merge with another json object:
|
// can merge with another json object:
|
||||||
@@ -679,7 +684,7 @@ mod tests {
|
|||||||
"float": 0.123,
|
"float": 0.123,
|
||||||
"int": 42
|
"int": 42
|
||||||
}"#;
|
}"#;
|
||||||
let expected = r#"Json<Object{"float": Number(F64), "hello": String, "int": Number(I64), "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}>"#;
|
let expected = r#"Json<{"float":"<Number>","hello":"<String>","int":"<Number>","list":["<Number>"],"object":{"a":"<Number>"}}>"#;
|
||||||
test(json, json_type, Ok(expected))?;
|
test(json, json_type, Ok(expected))?;
|
||||||
|
|
||||||
// can merge with some complex nested json object:
|
// can merge with some complex nested json object:
|
||||||
@@ -689,7 +694,7 @@ mod tests {
|
|||||||
"float": 0.456,
|
"float": 0.456,
|
||||||
"int": 0
|
"int": 0
|
||||||
}"#;
|
}"#;
|
||||||
let expected = r#"Json<Object{"float": Number(F64), "hello": String, "int": Number(I64), "list": Array[Number(I64)], "object": Object{"a": Number(I64), "foo": String, "l": Array[String], "o": Object{"key": String}}}>"#;
|
let expected = r#"Json<{"float":"<Number>","hello":"<String>","int":"<Number>","list":["<Number>"],"object":{"a":"<Number>","foo":"<String>","l":["<String>"],"o":{"key":"<String>"}}}>"#;
|
||||||
test(json, json_type, Ok(expected))?;
|
test(json, json_type, Ok(expected))?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -321,10 +321,10 @@ mod tests {
|
|||||||
Ok(()),
|
Ok(()),
|
||||||
Ok(()),
|
Ok(()),
|
||||||
Err(
|
Err(
|
||||||
"Failed to merge JSON datatype: datatypes have conflict, this: Number(I64), that: String",
|
r#"Failed to merge JSON datatype: datatypes have conflict, this: "<Number>", that: "<String>""#,
|
||||||
),
|
),
|
||||||
Err(
|
Err(
|
||||||
"Failed to merge JSON datatype: datatypes have conflict, this: Number(I64), that: Array[Bool]",
|
r#"Failed to merge JSON datatype: datatypes have conflict, this: "<Number>", that: ["<Bool>"]"#,
|
||||||
),
|
),
|
||||||
];
|
];
|
||||||
let mut builder = JsonVectorBuilder::new(JsonNativeType::Null, 1);
|
let mut builder = JsonVectorBuilder::new(JsonNativeType::Null, 1);
|
||||||
@@ -396,12 +396,12 @@ mod tests {
|
|||||||
// test children builders:
|
// test children builders:
|
||||||
assert_eq!(builder.builders.len(), 6);
|
assert_eq!(builder.builders.len(), 6);
|
||||||
let expect_types = [
|
let expect_types = [
|
||||||
r#"Json<Object{"list": Array[Number(I64)], "s": String}>"#,
|
r#"Json<{"list":["<Number>"],"s":"<String>"}>"#,
|
||||||
r#"Json<Object{"float": Number(F64), "s": String}>"#,
|
r#"Json<{"float":"<Number>","s":"<String>"}>"#,
|
||||||
r#"Json<Object{"float": Number(F64), "int": Number(I64)}>"#,
|
r#"Json<{"float":"<Number>","int":"<Number>"}>"#,
|
||||||
r#"Json<Object{"int": Number(I64), "object": Object{"hello": String, "timestamp": Number(I64)}}>"#,
|
r#"Json<{"int":"<Number>","object":{"hello":"<String>","timestamp":"<Number>"}}>"#,
|
||||||
r#"Json<Object{"nested": Object{"a": Object{"b": Object{"b": Object{"a": String}}}}, "object": Object{"timestamp": Number(I64)}}>"#,
|
r#"Json<{"nested":{"a":{"b":{"b":{"a":"<String>"}}}},"object":{"timestamp":"<Number>"}}>"#,
|
||||||
r#"Json<Object{"nested": Object{"a": Object{"b": Object{"a": Object{"b": String}}}}, "object": Object{"timestamp": Number(I64)}}>"#,
|
r#"Json<{"nested":{"a":{"b":{"a":{"b":"<String>"}}}},"object":{"timestamp":"<Number>"}}>"#,
|
||||||
];
|
];
|
||||||
let expect_vectors = [
|
let expect_vectors = [
|
||||||
r#"
|
r#"
|
||||||
@@ -456,7 +456,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// test final merged json type:
|
// test final merged json type:
|
||||||
let expected = r#"Json<Object{"float": Number(F64), "int": Number(I64), "list": Array[Number(I64)], "nested": Object{"a": Object{"b": Object{"a": Object{"b": String}, "b": Object{"a": String}}}}, "object": Object{"hello": String, "timestamp": Number(I64)}, "s": String}>"#;
|
let expected = r#"Json<{"float":"<Number>","int":"<Number>","list":["<Number>"],"nested":{"a":{"b":{"a":{"b":"<String>"},"b":{"a":"<String>"}}}},"object":{"hello":"<String>","timestamp":"<Number>"},"s":"<String>"}>"#;
|
||||||
assert_eq!(builder.data_type().to_string(), expected);
|
assert_eq!(builder.data_type().to_string(), expected);
|
||||||
|
|
||||||
// test final produced vector:
|
// test final produced vector:
|
||||||
|
|||||||
@@ -13,18 +13,41 @@
 // limitations under the License.
 
 use std::any::Any;
+use std::collections::HashMap;
 
 use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
 use serde::{Deserialize, Serialize};
+use store_api::storage::RegionId;
 
 use crate::error::Result;
 use crate::procedure::repartition::collect::{Collect, ProcedureMeta};
 use crate::procedure::repartition::group::RepartitionGroupProcedure;
+use crate::procedure::repartition::plan::RegionDescriptor;
 use crate::procedure::repartition::{self, Context, State};
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Dispatch;
 
+#[allow(dead_code)]
+fn build_region_mapping(
+    source_regions: &[RegionDescriptor],
+    target_regions: &[RegionDescriptor],
+    transition_map: &[Vec<usize>],
+) -> HashMap<RegionId, Vec<RegionId>> {
+    transition_map
+        .iter()
+        .enumerate()
+        .map(|(source_idx, indices)| {
+            let source_region = source_regions[source_idx].region_id;
+            let target_regions = indices
+                .iter()
+                .map(|&target_idx| target_regions[target_idx].region_id)
+                .collect::<Vec<_>>();
+            (source_region, target_regions)
+        })
+        .collect::<HashMap<RegionId, _>>()
+}
+
 #[async_trait::async_trait]
 #[typetag::serde]
 impl State for Dispatch {
@@ -37,11 +60,19 @@ impl State for Dispatch {
         let mut procedures = Vec::with_capacity(ctx.persistent_ctx.plans.len());
         let mut procedure_metas = Vec::with_capacity(ctx.persistent_ctx.plans.len());
         for (plan_index, plan) in ctx.persistent_ctx.plans.iter().enumerate() {
+            let region_mapping = build_region_mapping(
+                &plan.source_regions,
+                &plan.target_regions,
+                &plan.transition_map,
+            );
             let persistent_ctx = repartition::group::PersistentContext::new(
                 plan.group_id,
                 table_id,
+                ctx.persistent_ctx.catalog_name.clone(),
+                ctx.persistent_ctx.schema_name.clone(),
                 plan.source_regions.clone(),
                 plan.target_regions.clone(),
+                region_mapping,
             );
 
             let group_procedure = RepartitionGroupProcedure::new(persistent_ctx, ctx);
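build_region_mapping turns the per-plan transition_map of indices into a RegionId-to-RegionIds map. Below is a self-contained sketch of the same index arithmetic on plain RegionIds; it is a hypothetical helper, since RegionDescriptor construction is not shown in this diff.

    use std::collections::HashMap;

    use store_api::storage::RegionId;

    fn mapping_from_indices(
        sources: &[RegionId],
        targets: &[RegionId],
        transition_map: &[Vec<usize>],
    ) -> HashMap<RegionId, Vec<RegionId>> {
        transition_map
            .iter()
            .enumerate()
            .map(|(i, indices)| (sources[i], indices.iter().map(|&j| targets[j]).collect()))
            .collect()
    }

    // Example: sources [A, B], targets [X, Y], transition_map [[0], [0, 1]]
    // yields { A: [X], B: [X, Y] }, i.e. region B's files go to both new regions.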
@@ -12,27 +12,34 @@
// See the License for the specific language governing permissions and
// limitations under the License.

+pub(crate) mod apply_staging_manifest;
pub(crate) mod enter_staging_region;
+pub(crate) mod remap_manifest;
+pub(crate) mod repartition_end;
pub(crate) mod repartition_start;
pub(crate) mod update_metadata;
pub(crate) mod utils;

use std::any::Any;
+use std::collections::HashMap;
use std::fmt::Debug;
use std::time::Duration;

use common_error::ext::BoxedError;
-use common_meta::DatanodeId;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::instruction::CacheIdent;
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue, RegionInfo};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
+use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
+use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
+use common_procedure::error::ToJsonSnafu;
use common_procedure::{
-    Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
-    UserMetadata,
+    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
+    Result as ProcedureResult, Status, StringKey, UserMetadata,
};
+use common_telemetry::error;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
@@ -71,6 +78,12 @@ impl RepartitionGroupProcedure {
    }
}

+#[derive(Debug, Serialize)]
+pub struct RepartitionGroupData<'a> {
+    persistent_ctx: &'a PersistentContext,
+    state: &'a dyn State,
+}
+
#[async_trait::async_trait]
impl Procedure for RepartitionGroupProcedure {
    fn type_name(&self) -> &str {
@@ -78,27 +91,48 @@ impl Procedure for RepartitionGroupProcedure {
    }

    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
-        todo!()
-    }
-
-    async fn rollback(&mut self, _: &ProcedureContext) -> ProcedureResult<()> {
-        todo!()
+        let state = &mut self.state;
+
+        match state.next(&mut self.context, _ctx).await {
+            Ok((next, status)) => {
+                *state = next;
+                Ok(status)
+            }
+            Err(e) => {
+                if e.is_retryable() {
+                    Err(ProcedureError::retry_later(e))
+                } else {
+                    error!(
+                        e;
+                        "Repartition group procedure failed, group id: {}, table id: {}",
+                        self.context.persistent_ctx.group_id,
+                        self.context.persistent_ctx.table_id,
+                    );
+                    Err(ProcedureError::external(e))
+                }
+            }
+        }
    }

    fn rollback_supported(&self) -> bool {
-        true
+        false
    }

    fn dump(&self) -> ProcedureResult<String> {
-        todo!()
+        let data = RepartitionGroupData {
+            persistent_ctx: &self.context.persistent_ctx,
+            state: self.state.as_ref(),
+        };
+        serde_json::to_string(&data).context(ToJsonSnafu)
    }

    fn lock_key(&self) -> LockKey {
-        todo!()
+        LockKey::new(self.context.persistent_ctx.lock_key())
    }

    fn user_metadata(&self) -> Option<UserMetadata> {
-        todo!()
+        // TODO(weny): support user metadata.
+        None
    }
}

@@ -123,8 +157,8 @@ pub struct GroupPrepareResult {
    pub target_routes: Vec<RegionRoute>,
    /// The primary source region id (first source region), used for retrieving region options.
    pub central_region: RegionId,
-    /// The datanode id where the primary source region is located.
-    pub central_region_datanode_id: DatanodeId,
+    /// The peer where the primary source region is located.
+    pub central_region_datanode: Peer,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@@ -132,30 +166,59 @@ pub struct PersistentContext {
    pub group_id: GroupId,
    /// The table id of the repartition group.
    pub table_id: TableId,
+    /// The catalog name of the repartition group.
+    pub catalog_name: String,
+    /// The schema name of the repartition group.
+    pub schema_name: String,
    /// The source regions of the repartition group.
    pub sources: Vec<RegionDescriptor>,
    /// The target regions of the repartition group.
    pub targets: Vec<RegionDescriptor>,
+    /// For each `source region`, the corresponding
+    /// `target regions` that overlap with it.
+    pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// The result of group prepare.
    /// The value will be set in [RepartitionStart](crate::procedure::repartition::group::repartition_start::RepartitionStart) state.
    pub group_prepare_result: Option<GroupPrepareResult>,
+    /// The staging manifest paths of the repartition group.
+    /// The value will be set in [RemapManifest](crate::procedure::repartition::group::remap_manifest::RemapManifest) state.
+    pub staging_manifest_paths: HashMap<RegionId, String>,
}

impl PersistentContext {
    pub fn new(
        group_id: GroupId,
        table_id: TableId,
+        catalog_name: String,
+        schema_name: String,
        sources: Vec<RegionDescriptor>,
        targets: Vec<RegionDescriptor>,
+        region_mapping: HashMap<RegionId, Vec<RegionId>>,
    ) -> Self {
        Self {
            group_id,
            table_id,
+            catalog_name,
+            schema_name,
            sources,
            targets,
+            region_mapping,
            group_prepare_result: None,
+            staging_manifest_paths: HashMap::new(),
        }
    }

+    pub fn lock_key(&self) -> Vec<StringKey> {
+        let mut lock_keys = Vec::with_capacity(2 + self.sources.len());
+        lock_keys.extend([
+            CatalogLock::Read(&self.catalog_name).into(),
+            SchemaLock::read(&self.catalog_name, &self.schema_name).into(),
+        ]);
+        for source in &self.sources {
+            lock_keys.push(RegionLock::Write(source.region_id).into());
+        }
+        lock_keys
+    }
}

impl Context {
@@ -253,7 +316,7 @@ impl Context {
        // Safety: prepare result is set in [RepartitionStart] state.
        let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap();
        let central_region_datanode_table_value = self
-            .get_datanode_table_value(table_id, prepare_result.central_region_datanode_id)
+            .get_datanode_table_value(table_id, prepare_result.central_region_datanode.id)
            .await?;
        let RegionInfo {
            region_options,
@@ -0,0 +1,333 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+use std::collections::HashMap;
+use std::time::{Duration, Instant};
+
+use api::v1::meta::MailboxMessage;
+use common_meta::instruction::{
+    ApplyStagingManifestReply, ApplyStagingManifestsReply, Instruction, InstructionReply,
+};
+use common_meta::peer::Peer;
+use common_meta::rpc::router::RegionRoute;
+use common_procedure::{Context as ProcedureContext, Status};
+use common_telemetry::info;
+use futures::future::join_all;
+use serde::{Deserialize, Serialize};
+use snafu::{OptionExt, ResultExt, ensure};
+use store_api::storage::RegionId;
+
+use crate::error::{self, Error, Result};
+use crate::handler::HeartbeatMailbox;
+use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
+use crate::procedure::repartition::group::utils::{
+    HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
+};
+use crate::procedure::repartition::group::{Context, State};
+use crate::procedure::repartition::plan::RegionDescriptor;
+use crate::service::mailbox::{Channel, MailboxRef};
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ApplyStagingManifest;
+
+#[async_trait::async_trait]
+#[typetag::serde]
+impl State for ApplyStagingManifest {
+    async fn next(
+        &mut self,
+        ctx: &mut Context,
+        _procedure_ctx: &ProcedureContext,
+    ) -> Result<(Box<dyn State>, Status)> {
+        self.apply_staging_manifests(ctx).await?;
+
+        Ok((
+            Box::new(UpdateMetadata::ApplyStaging),
+            Status::executing(true),
+        ))
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+impl ApplyStagingManifest {
+    fn build_apply_staging_manifest_instructions(
+        staging_manifest_paths: &HashMap<RegionId, String>,
+        target_routes: &[RegionRoute],
+        targets: &[RegionDescriptor],
+        central_region_id: RegionId,
+    ) -> Result<HashMap<Peer, Vec<common_meta::instruction::ApplyStagingManifest>>> {
+        let target_partition_expr_by_region = targets
+            .iter()
+            .map(|target| {
+                Ok((
+                    target.region_id,
+                    target
+                        .partition_expr
+                        .as_json_str()
+                        .context(error::SerializePartitionExprSnafu)?,
+                ))
+            })
+            .collect::<Result<HashMap<_, _>>>()?;
+        // Safety: `leader_peer` is set for all region routes, checked in `repartition_start`.
+        let target_region_routes_by_peer = group_region_routes_by_peer(target_routes);
+        let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len());
+
+        for (peer, region_ids) in target_region_routes_by_peer {
+            let apply_staging_manifests = region_ids
+                .into_iter()
+                .map(|region_id| common_meta::instruction::ApplyStagingManifest {
+                    region_id,
+                    partition_expr: target_partition_expr_by_region[&region_id].clone(),
+                    central_region_id,
+                    manifest_path: staging_manifest_paths[&region_id].clone(),
+                })
+                .collect();
+            instructions.insert(peer.clone(), apply_staging_manifests);
+        }
+
+        Ok(instructions)
+    }
+
+    #[allow(dead_code)]
+    async fn apply_staging_manifests(&self, ctx: &mut Context) -> Result<()> {
+        let table_id = ctx.persistent_ctx.table_id;
+        let group_id = ctx.persistent_ctx.group_id;
+        let staging_manifest_paths = &ctx.persistent_ctx.staging_manifest_paths;
+        // Safety: the group prepare result is set in the RepartitionStart state.
+        let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
+        let targets = &ctx.persistent_ctx.targets;
+        let target_routes = &prepare_result.target_routes;
+        let central_region_id = prepare_result.central_region;
+        let instructions = Self::build_apply_staging_manifest_instructions(
+            staging_manifest_paths,
+            target_routes,
+            targets,
+            central_region_id,
+        )?;
+        let operation_timeout =
+            ctx.next_operation_timeout()
+                .context(error::ExceededDeadlineSnafu {
+                    operation: "Apply staging manifests",
+                })?;
+
+        let (peers, tasks): (Vec<_>, Vec<_>) = instructions
+            .iter()
+            .map(|(peer, apply_staging_manifests)| {
+                (
+                    peer,
+                    Self::apply_staging_manifest(
+                        &ctx.mailbox,
+                        &ctx.server_addr,
+                        peer,
+                        apply_staging_manifests,
+                        operation_timeout,
+                    ),
+                )
+            })
+            .unzip();
+        info!(
+            "Sent apply staging manifests instructions to peers: {:?} for repartition table {}, group id {}",
+            peers, table_id, group_id
+        );
+
+        let format_err_msg = |idx: usize, error: &Error| {
+            let peer = peers[idx];
+            format!(
+                "Failed to apply staging manifests on datanode {:?}, error: {:?}",
+                peer, error
+            )
+        };
+        // Waits for all tasks to complete.
+        let results = join_all(tasks).await;
+        let result = handle_multiple_results(&results);
+        match result {
+            HandleMultipleResult::AllSuccessful => Ok(()),
+            HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu {
+                reason: format!(
+                    "All retryable errors during applying staging manifests for repartition table {}, group id {}: {:?}",
+                    table_id, group_id,
+                    retryable_errors
+                        .iter()
+                        .map(|(idx, error)| format_err_msg(*idx, error))
+                        .collect::<Vec<_>>()
+                        .join(",")
+                ),
+            }
+            .fail(),
+            HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu {
+                violated: format!(
+                    "All non retryable errors during applying staging manifests for repartition table {}, group id {}: {:?}",
+                    table_id, group_id,
+                    non_retryable_errors
+                        .iter()
+                        .map(|(idx, error)| format_err_msg(*idx, error))
+                        .collect::<Vec<_>>()
+                        .join(",")
+                ),
+            }
+            .fail(),
+            HandleMultipleResult::PartialRetryable {
+                retryable_errors,
+                non_retryable_errors,
+            } => error::UnexpectedSnafu {
+                violated: format!(
+                    "Partial retryable errors during applying staging manifests for repartition table {}, group id {}: {:?}, non retryable errors: {:?}",
+                    table_id, group_id,
+                    retryable_errors
+                        .iter()
+                        .map(|(idx, error)| format_err_msg(*idx, error))
+                        .collect::<Vec<_>>()
+                        .join(","),
+                    non_retryable_errors
+                        .iter()
+                        .map(|(idx, error)| format_err_msg(*idx, error))
+                        .collect::<Vec<_>>()
+                        .join(","),
+                ),
+            }
+            .fail(),
+        }
+    }
+
+    async fn apply_staging_manifest(
+        mailbox: &MailboxRef,
+        server_addr: &str,
+        peer: &Peer,
+        apply_staging_manifests: &[common_meta::instruction::ApplyStagingManifest],
+        timeout: Duration,
+    ) -> Result<()> {
+        let ch = Channel::Datanode(peer.id);
+        let instruction = Instruction::ApplyStagingManifests(apply_staging_manifests.to_vec());
+        let message = MailboxMessage::json_message(
+            &format!(
+                "Apply staging manifests for regions: {:?}",
+                apply_staging_manifests
+                    .iter()
+                    .map(|r| r.region_id)
+                    .collect::<Vec<_>>()
+            ),
+            &format!("Metasrv@{}", server_addr),
+            &format!("Datanode-{}@{}", peer.id, peer.addr),
+            common_time::util::current_time_millis(),
+            &instruction,
+        )
+        .with_context(|_| error::SerializeToJsonSnafu {
+            input: instruction.to_string(),
+        })?;
+
+        let now = Instant::now();
+        let receiver = mailbox.send(&ch, message, timeout).await;
+
+        let receiver = match receiver {
+            Ok(receiver) => receiver,
+            Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
+                reason: format!(
+                    "Pusher not found for apply staging manifests on datanode {:?}, elapsed: {:?}",
+                    peer,
+                    now.elapsed()
+                ),
+            }
+            .fail()?,
+            Err(err) => {
+                return Err(err);
+            }
+        };
+
+        match receiver.await {
+            Ok(msg) => {
+                let reply = HeartbeatMailbox::json_reply(&msg)?;
+                info!(
+                    "Received apply staging manifests reply: {:?}, elapsed: {:?}",
+                    reply,
+                    now.elapsed()
+                );
+                let InstructionReply::ApplyStagingManifests(ApplyStagingManifestsReply { replies }) =
+                    reply
+                else {
+                    return error::UnexpectedInstructionReplySnafu {
+                        mailbox_message: msg.to_string(),
+                        reason: "expect apply staging manifests reply",
+                    }
+                    .fail();
+                };
+                for reply in replies {
+                    Self::handle_apply_staging_manifest_reply(&reply, &now, peer)?;
+                }
+
+                Ok(())
+            }
+            Err(error::Error::MailboxTimeout { .. }) => {
+                let reason = format!(
+                    "Mailbox received timeout for apply staging manifests on datanode {:?}, elapsed: {:?}",
+                    peer,
+                    now.elapsed()
+                );
+                error::RetryLaterSnafu { reason }.fail()
+            }
+            Err(err) => Err(err),
+        }
+    }
+
+    fn handle_apply_staging_manifest_reply(
+        ApplyStagingManifestReply {
+            region_id,
+            ready,
+            exists,
+            error,
+        }: &ApplyStagingManifestReply,
+        now: &Instant,
+        peer: &Peer,
+    ) -> Result<()> {
+        ensure!(
+            exists,
+            error::UnexpectedSnafu {
+                violated: format!(
+                    "Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
+                    region_id,
+                    peer,
+                    now.elapsed()
+                )
+            }
+        );
+
+        if error.is_some() {
+            return error::RetryLaterSnafu {
+                reason: format!(
+                    "Failed to apply staging manifest on datanode {:?}, error: {:?}, elapsed: {:?}",
+                    peer,
+                    error,
+                    now.elapsed()
+                ),
+            }
+            .fail();
+        }
+
+        ensure!(
+            ready,
+            error::RetryLaterSnafu {
+                reason: format!(
+                    "Region {} is still applying staging manifest on datanode {:?}, elapsed: {:?}",
+                    region_id,
+                    peer,
+                    now.elapsed()
+                ),
+            }
+        );
+
+        Ok(())
+    }
+}
@@ -29,6 +29,7 @@ use snafu::{OptionExt, ResultExt, ensure};

use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
+use crate::procedure::repartition::group::remap_manifest::RemapManifest;
use crate::procedure::repartition::group::utils::{
    HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
};
@@ -49,7 +50,7 @@ impl State for EnterStagingRegion {
    ) -> Result<(Box<dyn State>, Status)> {
        self.enter_staging_regions(ctx).await?;

-        Ok(Self::next_state())
+        Ok((Box::new(RemapManifest), Status::executing(true)))
    }

    fn as_any(&self) -> &dyn Any {
@@ -58,16 +59,10 @@ impl State for EnterStagingRegion {
}

impl EnterStagingRegion {
-    #[allow(dead_code)]
-    fn next_state() -> (Box<dyn State>, Status) {
-        // TODO(weny): change it later.
-        (Box::new(EnterStagingRegion), Status::executing(true))
-    }
-
    fn build_enter_staging_instructions(
        prepare_result: &GroupPrepareResult,
        targets: &[RegionDescriptor],
-    ) -> Result<HashMap<Peer, Instruction>> {
+    ) -> Result<HashMap<Peer, Vec<common_meta::instruction::EnterStagingRegion>>> {
        let target_partition_expr_by_region = targets
            .iter()
            .map(|target| {
@@ -93,10 +88,7 @@ impl EnterStagingRegion {
                    partition_expr: target_partition_expr_by_region[&region_id].clone(),
                })
                .collect();
-            instructions.insert(
-                peer.clone(),
-                Instruction::EnterStagingRegions(enter_staging_regions),
-            );
+            instructions.insert(peer.clone(), enter_staging_regions);
        }

        Ok(instructions)
@@ -117,14 +109,14 @@ impl EnterStagingRegion {
            })?;
        let (peers, tasks): (Vec<_>, Vec<_>) = instructions
            .iter()
-            .map(|(peer, instruction)| {
+            .map(|(peer, enter_staging_regions)| {
                (
                    peer,
                    Self::enter_staging_region(
                        &ctx.mailbox,
                        &ctx.server_addr,
                        peer,
-                        instruction,
+                        enter_staging_regions,
                        operation_timeout,
                    ),
                )
@@ -208,12 +200,19 @@ impl EnterStagingRegion {
        mailbox: &MailboxRef,
        server_addr: &str,
        peer: &Peer,
-        instruction: &Instruction,
+        enter_staging_regions: &[common_meta::instruction::EnterStagingRegion],
        timeout: Duration,
    ) -> Result<()> {
        let ch = Channel::Datanode(peer.id);
+        let instruction = Instruction::EnterStagingRegions(enter_staging_regions.to_vec());
        let message = MailboxMessage::json_message(
-            &format!("Enter staging regions: {:?}", instruction),
+            &format!(
+                "Enter staging regions: {:?}",
+                enter_staging_regions
+                    .iter()
+                    .map(|r| r.region_id)
+                    .collect::<Vec<_>>()
+            ),
            &format!("Metasrv@{}", server_addr),
            &format!("Datanode-{}@{}", peer.id, peer.addr),
            common_time::util::current_time_millis(),
@@ -328,7 +327,6 @@ mod tests {
    use std::assert_matches::assert_matches;
    use std::time::Duration;

-    use common_meta::instruction::Instruction;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
@@ -376,7 +374,7 @@ mod tests {
                },
            ],
            central_region: RegionId::new(table_id, 1),
-            central_region_datanode_id: 1,
+            central_region_datanode: Peer::empty(1),
        };
        let targets = test_targets();
        let instructions =
@@ -384,12 +382,7 @@
            .unwrap();

        assert_eq!(instructions.len(), 2);
-        let instruction_1 = instructions
-            .get(&Peer::empty(1))
-            .unwrap()
-            .clone()
-            .into_enter_staging_regions()
-            .unwrap();
+        let instruction_1 = instructions.get(&Peer::empty(1)).unwrap().clone();
        assert_eq!(
            instruction_1,
            vec![common_meta::instruction::EnterStagingRegion {
@@ -397,12 +390,7 @@
            partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
            }]
        );
-        let instruction_2 = instructions
-            .get(&Peer::empty(2))
-            .unwrap()
-            .clone()
-            .into_enter_staging_regions()
-            .unwrap();
+        let instruction_2 = instructions.get(&Peer::empty(2)).unwrap().clone();
        assert_eq!(
            instruction_2,
            vec![common_meta::instruction::EnterStagingRegion {
@@ -417,18 +405,17 @@
        let env = TestingEnv::new();
        let server_addr = "localhost";
        let peer = Peer::empty(1);
-        let instruction =
-            Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion {
-                region_id: RegionId::new(1024, 1),
-                partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
-            }]);
+        let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion {
+            region_id: RegionId::new(1024, 1),
+            partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
+        }];
        let timeout = Duration::from_secs(10);

        let err = EnterStagingRegion::enter_staging_region(
            env.mailbox_ctx.mailbox(),
            server_addr,
            &peer,
-            &instruction,
+            &enter_staging_regions,
            timeout,
        )
        .await
@@ -447,11 +434,10 @@
            .await;
        let server_addr = "localhost";
        let peer = Peer::empty(1);
-        let instruction =
-            Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion {
-                region_id: RegionId::new(1024, 1),
-                partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
-            }]);
+        let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion {
+            region_id: RegionId::new(1024, 1),
+            partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
+        }];
        let timeout = Duration::from_secs(10);

        // Sends a timeout error.
@@ -463,7 +449,7 @@
            env.mailbox_ctx.mailbox(),
            server_addr,
            &peer,
-            &instruction,
+            &enter_staging_regions,
            timeout,
        )
        .await
@@ -479,11 +465,10 @@

        let server_addr = "localhost";
        let peer = Peer::empty(1);
-        let instruction =
-            Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion {
-                region_id: RegionId::new(1024, 1),
-                partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
-            }]);
+        let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion {
+            region_id: RegionId::new(1024, 1),
+            partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
+        }];
        let timeout = Duration::from_secs(10);

        env.mailbox_ctx
@@ -498,7 +483,7 @@
            env.mailbox_ctx.mailbox(),
            server_addr,
            &peer,
-            &instruction,
+            &enter_staging_regions,
            timeout,
        )
        .await
@@ -516,11 +501,10 @@
            .await;
        let server_addr = "localhost";
        let peer = Peer::empty(1);
-        let instruction =
-            Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion {
-                region_id: RegionId::new(1024, 1),
-                partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
-            }]);
+        let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion {
+            region_id: RegionId::new(1024, 1),
+            partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
+        }];
        let timeout = Duration::from_secs(10);

        // Sends a failed reply.
@@ -538,7 +522,7 @@
            env.mailbox_ctx.mailbox(),
            server_addr,
            &peer,
-            &instruction,
+            &enter_staging_regions,
            timeout,
        )
        .await
@@ -565,7 +549,7 @@
            env.mailbox_ctx.mailbox(),
            server_addr,
            &peer,
-            &instruction,
+            &enter_staging_regions,
            timeout,
        )
        .await
@@ -596,7 +580,7 @@
                },
            ],
            central_region: RegionId::new(table_id, 1),
-            central_region_datanode_id: 1,
+            central_region_datanode: Peer::empty(1),
        }
    }
}
src/meta-srv/src/procedure/repartition/group/remap_manifest.rs (new file, 222 lines)
@@ -0,0 +1,222 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+use std::collections::HashMap;
+use std::time::{Duration, Instant};
+
+use api::v1::meta::MailboxMessage;
+use common_meta::instruction::{Instruction, InstructionReply, RemapManifestReply};
+use common_meta::peer::Peer;
+use common_procedure::{Context as ProcedureContext, Status};
+use common_telemetry::{info, warn};
+use serde::{Deserialize, Serialize};
+use snafu::{OptionExt, ResultExt, ensure};
+use store_api::storage::RegionId;
+
+use crate::error::{self, Result};
+use crate::handler::HeartbeatMailbox;
+use crate::procedure::repartition::group::apply_staging_manifest::ApplyStagingManifest;
+use crate::procedure::repartition::group::{Context, State};
+use crate::procedure::repartition::plan::RegionDescriptor;
+use crate::service::mailbox::{Channel, MailboxRef};
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(crate) struct RemapManifest;
+
+#[async_trait::async_trait]
+#[typetag::serde]
+impl State for RemapManifest {
+    async fn next(
+        &mut self,
+        ctx: &mut Context,
+        _procedure_ctx: &ProcedureContext,
+    ) -> Result<(Box<dyn State>, Status)> {
+        let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
+        let remap = Self::build_remap_manifest_instructions(
+            &ctx.persistent_ctx.sources,
+            &ctx.persistent_ctx.targets,
+            &ctx.persistent_ctx.region_mapping,
+            prepare_result.central_region,
+        )?;
+        let operation_timeout =
+            ctx.next_operation_timeout()
+                .context(error::ExceededDeadlineSnafu {
+                    operation: "Remap manifests",
+                })?;
+        let manifest_paths = Self::remap_manifests(
+            &ctx.mailbox,
+            &ctx.server_addr,
+            &prepare_result.central_region_datanode,
+            &remap,
+            operation_timeout,
+        )
+        .await?;
+        let table_id = ctx.persistent_ctx.table_id;
+        let group_id = ctx.persistent_ctx.group_id;
+
+        if manifest_paths.len() != ctx.persistent_ctx.targets.len() {
+            warn!(
+                "Mismatch in manifest paths count: expected {}, got {}. This occurred during remapping manifests for group {} and table {}.",
+                ctx.persistent_ctx.targets.len(),
+                manifest_paths.len(),
+                group_id,
+                table_id
+            );
+        }
+
+        ctx.persistent_ctx.staging_manifest_paths = manifest_paths;
+
+        Ok((Box::new(ApplyStagingManifest), Status::executing(true)))
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+impl RemapManifest {
+    fn build_remap_manifest_instructions(
+        source_regions: &[RegionDescriptor],
+        target_regions: &[RegionDescriptor],
+        region_mapping: &HashMap<RegionId, Vec<RegionId>>,
+        central_region_id: RegionId,
+    ) -> Result<common_meta::instruction::RemapManifest> {
+        let new_partition_exprs = target_regions
+            .iter()
+            .map(|r| {
+                Ok((
+                    r.region_id,
+                    r.partition_expr
+                        .as_json_str()
+                        .context(error::SerializePartitionExprSnafu)?,
+                ))
+            })
+            .collect::<Result<HashMap<RegionId, String>>>()?;
+
+        Ok(common_meta::instruction::RemapManifest {
+            region_id: central_region_id,
+            input_regions: source_regions.iter().map(|r| r.region_id).collect(),
+            region_mapping: region_mapping.clone(),
+            new_partition_exprs,
+        })
+    }
+
+    async fn remap_manifests(
+        mailbox: &MailboxRef,
+        server_addr: &str,
+        peer: &Peer,
+        remap: &common_meta::instruction::RemapManifest,
+        timeout: Duration,
+    ) -> Result<HashMap<RegionId, String>> {
+        let ch = Channel::Datanode(peer.id);
+        let instruction = Instruction::RemapManifest(remap.clone());
+        let message = MailboxMessage::json_message(
+            &format!(
+                "Remap manifests, central region: {}, input regions: {:?}",
+                remap.region_id, remap.input_regions
+            ),
+            &format!("Metasrv@{}", server_addr),
+            &format!("Datanode-{}@{}", peer.id, peer.addr),
+            common_time::util::current_time_millis(),
+            &instruction,
+        )
+        .with_context(|_| error::SerializeToJsonSnafu {
+            input: instruction.to_string(),
+        })?;
+        let now = Instant::now();
+        let receiver = mailbox.send(&ch, message, timeout).await;
+
+        let receiver = match receiver {
+            Ok(receiver) => receiver,
+            Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
+                reason: format!(
+                    "Pusher not found for remap manifests on datanode {:?}, elapsed: {:?}",
+                    peer,
+                    now.elapsed()
+                ),
+            }
+            .fail()?,
+            Err(err) => {
+                return Err(err);
+            }
+        };
+
+        match receiver.await {
+            Ok(msg) => {
+                let reply = HeartbeatMailbox::json_reply(&msg)?;
+                info!(
+                    "Received remap manifest reply: {:?}, elapsed: {:?}",
+                    reply,
+                    now.elapsed()
+                );
+                let InstructionReply::RemapManifest(reply) = reply else {
+                    return error::UnexpectedInstructionReplySnafu {
+                        mailbox_message: msg.to_string(),
+                        reason: "expect remap manifest reply",
+                    }
+                    .fail();
+                };
+
+                Self::handle_remap_manifest_reply(remap.region_id, reply, &now, peer)
+            }
+            Err(error::Error::MailboxTimeout { .. }) => {
+                let reason = format!(
+                    "Mailbox received timeout for remap manifests on datanode {:?}, elapsed: {:?}",
+                    peer,
+                    now.elapsed()
+                );
+                error::RetryLaterSnafu { reason }.fail()
+            }
+            Err(err) => Err(err),
+        }
+    }
+
+    fn handle_remap_manifest_reply(
+        region_id: RegionId,
+        RemapManifestReply {
+            exists,
+            manifest_paths,
+            error,
+        }: RemapManifestReply,
+        now: &Instant,
+        peer: &Peer,
+    ) -> Result<HashMap<RegionId, String>> {
+        ensure!(
+            exists,
+            error::UnexpectedSnafu {
+                violated: format!(
+                    "Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
+                    region_id,
+                    peer,
+                    now.elapsed()
+                )
+            }
+        );
+
+        if error.is_some() {
+            return error::RetryLaterSnafu {
+                reason: format!(
+                    "Failed to remap manifest on datanode {:?}, error: {:?}, elapsed: {:?}",
+                    peer,
+                    error,
+                    now.elapsed()
+                ),
+            }
+            .fail();
+        }
+
+        Ok(manifest_paths)
+    }
+}
@@ -0,0 +1,40 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+
+use common_procedure::{Context as ProcedureContext, Status};
+use serde::{Deserialize, Serialize};
+
+use crate::error::Result;
+use crate::procedure::repartition::group::{Context, State};
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RepartitionEnd;
+
+#[async_trait::async_trait]
+#[typetag::serde]
+impl State for RepartitionEnd {
+    async fn next(
+        &mut self,
+        _ctx: &mut Context,
+        _procedure_ctx: &ProcedureContext,
+    ) -> Result<(Box<dyn State>, Status)> {
+        Ok((Box::new(RepartitionEnd), Status::done()))
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
@@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};

use crate::error::{self, Result};
+use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::{
    Context, GroupId, GroupPrepareResult, State, region_routes,
};
@@ -109,7 +110,7 @@ impl RepartitionStart {
            );
        }
        let central_region = sources[0].region_id;
-        let central_region_datanode_id = source_region_routes[0]
+        let central_region_datanode = source_region_routes[0]
            .leader_peer
            .as_ref()
            .context(error::UnexpectedSnafu {
@@ -118,20 +119,22 @@ impl RepartitionStart {
                    central_region
                ),
            })?
-            .id;
+            .clone();

        Ok(GroupPrepareResult {
            source_routes: source_region_routes,
            target_routes: target_region_routes,
            central_region,
-            central_region_datanode_id,
+            central_region_datanode,
        })
    }

    #[allow(dead_code)]
    fn next_state() -> (Box<dyn State>, Status) {
-        // TODO(weny): change it later.
-        (Box::new(RepartitionStart), Status::executing(true))
+        (
+            Box::new(UpdateMetadata::ApplyStaging),
+            Status::executing(true),
+        )
    }
}
@@ -17,12 +17,14 @@ pub(crate) mod rollback_staging_region;

use std::any::Any;

+use common_meta::lock_key::TableLock;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::warn;
use serde::{Deserialize, Serialize};

use crate::error::Result;
-use crate::procedure::repartition::group::repartition_start::RepartitionStart;
+use crate::procedure::repartition::group::enter_staging_region::EnterStagingRegion;
+use crate::procedure::repartition::group::repartition_end::RepartitionEnd;
use crate::procedure::repartition::group::{Context, State};

#[derive(Debug, Serialize, Deserialize)]
@@ -33,22 +35,16 @@ pub enum UpdateMetadata {
    RollbackStaging,
}

-impl UpdateMetadata {
-    #[allow(dead_code)]
-    fn next_state() -> (Box<dyn State>, Status) {
-        // TODO(weny): change it later.
-        (Box::new(RepartitionStart), Status::executing(true))
-    }
-}
-
#[async_trait::async_trait]
#[typetag::serde]
impl State for UpdateMetadata {
    async fn next(
        &mut self,
        ctx: &mut Context,
-        _procedure_ctx: &ProcedureContext,
+        procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
+        let table_lock = TableLock::Write(ctx.persistent_ctx.table_id).into();
+        let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
        match self {
            UpdateMetadata::ApplyStaging => {
                // TODO(weny): If all metadata have already been updated, skip applying staging regions.
@@ -59,7 +55,7 @@ impl State for UpdateMetadata {
                        "Failed to broadcast the invalidate table cache message during the apply staging regions, error: {err:?}"
                    );
                };
-                Ok(Self::next_state())
+                Ok((Box::new(EnterStagingRegion), Status::executing(false)))
            }
            UpdateMetadata::RollbackStaging => {
                self.rollback_staging_regions(ctx).await?;
@@ -69,7 +65,7 @@ impl State for UpdateMetadata {
                        "Failed to broadcast the invalidate table cache message during the rollback staging regions, error: {err:?}"
                    );
                };
-                Ok(Self::next_state())
+                Ok((Box::new(RepartitionEnd), Status::executing(false)))
            }
        }
    }
@@ -41,6 +41,9 @@ pub struct AllocationPlanEntry {
    pub regions_to_allocate: usize,
    /// The number of regions that need to be deallocated (source count - target count, if positive).
    pub regions_to_deallocate: usize,
+    /// For each `source_regions[k]`, the corresponding vector contains global
+    /// `target_partition_exprs` that overlap with it.
+    pub transition_map: Vec<Vec<usize>>,
}

/// A plan entry for the dispatch phase after region allocation,
@@ -57,6 +60,9 @@ pub struct RepartitionPlanEntry {
    pub allocated_region_ids: Vec<RegionId>,
    /// The region ids of the regions that are pending deallocation.
    pub pending_deallocate_region_ids: Vec<RegionId>,
+    /// For each `source_regions[k]`, the corresponding vector contains global
+    /// `target_regions` that overlap with it.
+    pub transition_map: Vec<Vec<usize>>,
}

impl RepartitionPlanEntry {
@@ -71,6 +77,7 @@ impl RepartitionPlanEntry {
            target_partition_exprs,
            regions_to_allocate,
            regions_to_deallocate,
+            transition_map,
        }: &AllocationPlanEntry,
    ) -> Self {
        debug_assert!(*regions_to_allocate == 0 && *regions_to_deallocate == 0);
@@ -89,6 +96,7 @@ impl RepartitionPlanEntry {
            target_regions,
            allocated_region_ids: vec![],
            pending_deallocate_region_ids: vec![],
+            transition_map: transition_map.clone(),
        }
    }
}
@@ -129,6 +129,7 @@ impl RepartitionStart {
                    target_partition_exprs,
                    regions_to_allocate,
                    regions_to_deallocate,
+                    transition_map: subtask.transition_map,
                }
            })
            .collect::<Vec<_>>()
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

+use std::collections::HashMap;
use std::sync::Arc;

use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
@@ -87,9 +88,13 @@ pub fn new_persistent_context(
) -> PersistentContext {
    PersistentContext {
        group_id: Uuid::new_v4(),
+        catalog_name: "test_catalog".to_string(),
+        schema_name: "test_schema".to_string(),
        table_id,
        sources,
        targets,
+        region_mapping: HashMap::new(),
        group_prepare_result: None,
+        staging_manifest_paths: HashMap::new(),
    }
}
@@ -13,6 +13,7 @@
// limitations under the License.

use common_meta::kv_backend::etcd::create_etcd_tls_options;
+use common_telemetry::warn;
use etcd_client::{Client, ConnectOptions};
use servers::tls::{TlsMode, TlsOption};
use snafu::ResultExt;
@@ -38,9 +39,12 @@ pub async fn create_etcd_client_with_tls(
        client_options.keep_alive_interval,
        client_options.keep_alive_timeout,
    );

+    let all_endpoints_use_https = etcd_endpoints.iter().all(|e| e.starts_with("https"));
    if let Some(tls_config) = tls_config
-        && let Some(tls_options) = create_etcd_tls_options(&convert_tls_option(tls_config))
-            .context(BuildTlsOptionsSnafu)?
+        && let Some(tls_options) =
+            create_etcd_tls_options(&convert_tls_option(all_endpoints_use_https, tls_config))
+                .context(BuildTlsOptionsSnafu)?
    {
        connect_options = connect_options.with_tls(tls_options);
    }
@@ -50,9 +54,22 @@ pub async fn create_etcd_client_with_tls(
        .context(error::ConnectEtcdSnafu)
}

-fn convert_tls_option(tls_option: &TlsOption) -> common_meta::kv_backend::etcd::TlsOption {
+fn convert_tls_option(
+    all_endpoints_use_https: bool,
+    tls_option: &TlsOption,
+) -> common_meta::kv_backend::etcd::TlsOption {
    let mode = match tls_option.mode {
        TlsMode::Disable => common_meta::kv_backend::etcd::TlsMode::Disable,
+        TlsMode::Prefer => {
+            if all_endpoints_use_https {
+                common_meta::kv_backend::etcd::TlsMode::Require
+            } else {
+                warn!(
+                    "All endpoints use HTTP, TLS prefer mode is not supported, using disable mode"
+                );
+                common_meta::kv_backend::etcd::TlsMode::Disable
+            }
+        }
        _ => common_meta::kv_backend::etcd::TlsMode::Require,
    };
    common_meta::kv_backend::etcd::TlsOption {
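One detail worth noting in the new `Prefer` handling above: the fallback is driven entirely by the endpoint-scheme check computed earlier in the function. A tiny sketch with made-up endpoint values (illustrative only, not from the change) showing when the downgrade to `Disable` triggers:

fn main() {
    // Hypothetical endpoint list; mirrors the `all_endpoints_use_https` check above.
    let etcd_endpoints = ["https://etcd-0:2379", "http://etcd-1:2379"];
    let all_endpoints_use_https = etcd_endpoints.iter().all(|e| e.starts_with("https"));
    // With a mixed http/https list, TLS `Prefer` is downgraded to `Disable`.
    assert!(!all_endpoints_use_https);
}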
@@ -13,6 +13,7 @@ workspace = true
ahash.workspace = true
api.workspace = true
arrow.workspace = true
+arrow-schema.workspace = true
async-trait.workspace = true
catalog.workspace = true
chrono.workspace = true
@@ -609,10 +609,14 @@ pub enum Error {
|
|||||||
},
|
},
|
||||||
|
|
||||||
#[snafu(display(
|
#[snafu(display(
|
||||||
"Multiple pipelines with different schemas found, but none under current schema. Please replicate one of them or delete until only one schema left. schemas: {}",
|
"Multiple pipelines with different schemas found, but none under current schema. Please replicate one of them or delete until only one schema left. name: {}, current_schema: {}, schemas: {}",
|
||||||
schemas
|
name,
|
||||||
|
current_schema,
|
||||||
|
schemas,
|
||||||
))]
|
))]
|
||||||
MultiPipelineWithDiffSchema {
|
MultiPipelineWithDiffSchema {
|
||||||
|
name: String,
|
||||||
|
current_schema: String,
|
||||||
schemas: String,
|
schemas: String,
|
||||||
#[snafu(implicit)]
|
#[snafu(implicit)]
|
||||||
location: Location,
|
location: Location,
|
||||||
@@ -800,6 +804,20 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(transparent)]
+    GreptimeProto {
+        source: api::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(transparent)]
+    Datatypes {
+        source: datatypes::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
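Note: the two new `#[snafu(transparent)]` variants let `?` convert `api` and `datatypes` errors into the pipeline error without a context selector, delegating `Display` and `source()` to the wrapped error. A hedged, stand-alone sketch of the same pattern using a std error as the source:

```rust
use snafu::Snafu;

#[derive(Debug, Snafu)]
pub enum Error {
    // Display and `source()` are delegated to the wrapped error, and `?`
    // converts an `std::io::Error` into this enum without a context selector.
    #[snafu(transparent)]
    Io { source: std::io::Error },
}

fn read_config() -> Result<String, Error> {
    // `?` works directly thanks to the transparent variant.
    let text = std::fs::read_to_string("/tmp/does-not-exist.toml")?;
    Ok(text)
}

fn main() {
    println!("{}", read_config().unwrap_err());
}
```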
@@ -920,6 +938,9 @@ impl ErrorExt for Error {
             FloatIsNan { .. }
             | InvalidEpochForResolution { .. }
             | UnsupportedTypeInPipeline { .. } => StatusCode::InvalidArguments,
+
+            GreptimeProto { source, .. } => source.status_code(),
+            Datatypes { source, .. } => source.status_code(),
         }
     }
 

@@ -19,13 +19,17 @@ use std::collections::{BTreeMap, HashSet};
 use std::sync::Arc;
 
 use ahash::{HashMap, HashMapExt};
-use api::helper::proto_value_type;
-use api::v1::column_data_type_extension::TypeExt;
+use api::helper::{ColumnDataTypeWrapper, encode_json_value};
+use api::v1::column_def::{collect_column_options, options_from_column_schema};
 use api::v1::value::ValueData;
-use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
+use api::v1::{ColumnDataType, SemanticType};
+use arrow_schema::extension::ExtensionType;
 use coerce::{coerce_columns, coerce_value};
 use common_query::prelude::{greptime_timestamp, greptime_value};
 use common_telemetry::warn;
+use datatypes::data_type::ConcreteDataType;
+use datatypes::extension::json::JsonExtensionType;
+use datatypes::value::Value;
 use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
 use itertools::Itertools;
 use jsonb::Number;
@@ -33,12 +37,15 @@ use once_cell::sync::OnceCell;
 use serde_json as serde_json_crate;
 use session::context::Channel;
 use snafu::OptionExt;
+use table::Table;
 use vrl::prelude::{Bytes, VrlValueConvert};
+use vrl::value::value::StdError;
 use vrl::value::{KeyString, Value as VrlValue};
 
 use crate::error::{
-    ArrayElementMustBeObjectSnafu, IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu,
-    Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
+    ArrayElementMustBeObjectSnafu, CoerceIncompatibleTypesSnafu,
+    IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
+    TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
     TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
 };
 use crate::etl::PipelineDocVersion;
@@ -272,15 +279,75 @@ impl GreptimeTransformer {
     }
 }
 
+#[derive(Clone)]
+pub struct ColumnMetadata {
+    column_schema: datatypes::schema::ColumnSchema,
+    semantic_type: SemanticType,
+}
+
+impl From<ColumnSchema> for ColumnMetadata {
+    fn from(value: ColumnSchema) -> Self {
+        let datatype = value.datatype();
+        let semantic_type = value.semantic_type();
+        let ColumnSchema {
+            column_name,
+            datatype: _,
+            semantic_type: _,
+            datatype_extension,
+            options,
+        } = value;
+
+        let column_schema = datatypes::schema::ColumnSchema::new(
+            column_name,
+            ColumnDataTypeWrapper::new(datatype, datatype_extension).into(),
+            semantic_type != SemanticType::Timestamp,
+        );
+
+        let metadata = collect_column_options(options.as_ref());
+        let column_schema = column_schema.with_metadata(metadata);
+
+        Self {
+            column_schema,
+            semantic_type,
+        }
+    }
+}
+
+impl TryFrom<ColumnMetadata> for ColumnSchema {
+    type Error = api::error::Error;
+
+    fn try_from(value: ColumnMetadata) -> std::result::Result<Self, Self::Error> {
+        let ColumnMetadata {
+            column_schema,
+            semantic_type,
+        } = value;
+
+        let options = options_from_column_schema(&column_schema);
+
+        let (datatype, datatype_extension) =
+            ColumnDataTypeWrapper::try_from(column_schema.data_type).map(|x| x.into_parts())?;
+
+        Ok(ColumnSchema {
+            column_name: column_schema.name,
+            datatype: datatype as _,
+            semantic_type: semantic_type as _,
+            datatype_extension,
+            options,
+        })
+    }
+}
+
 /// This is used to record the current state schema information and a sequential cache of field names.
 /// As you traverse the user input JSON, this will change.
 /// It will record a superset of all user input schemas.
-#[derive(Debug, Default)]
+#[derive(Default)]
 pub struct SchemaInfo {
     /// schema info
-    pub schema: Vec<ColumnSchema>,
+    pub schema: Vec<ColumnMetadata>,
     /// index of the column name
     pub index: HashMap<String, usize>,
+    /// The pipeline's corresponding table (if already created). Useful to retrieve column schemas.
+    table: Option<Arc<Table>>,
 }
 
 impl SchemaInfo {
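Note: `ColumnMetadata` pairs the rich in-memory column schema with its semantic role and only converts to the wire-level `ColumnSchema` at the edges; the time index is the one column that stays non-nullable. A self-contained sketch of that shape (all types below are illustrative stand-ins, not the GreptimeDB ones):

```rust
#[derive(Clone, Debug, PartialEq)]
enum SemanticType {
    Tag,
    Field,
    Timestamp,
}

// In-memory view: schema plus semantic role.
#[derive(Clone, Debug)]
struct ColumnMetadata {
    name: String,
    data_type: String, // stand-in for a concrete data type
    semantic_type: SemanticType,
}

// Wire-level view (stand-in for the proto ColumnSchema).
#[derive(Debug)]
struct WireColumnSchema {
    column_name: String,
    datatype: String,
    semantic_type: SemanticType,
    nullable: bool,
}

impl From<ColumnMetadata> for WireColumnSchema {
    fn from(value: ColumnMetadata) -> Self {
        // Mirror of the rule in the diff: only the time index is non-nullable.
        let nullable = value.semantic_type != SemanticType::Timestamp;
        WireColumnSchema {
            column_name: value.name,
            datatype: value.data_type,
            semantic_type: value.semantic_type,
            nullable,
        }
    }
}

fn main() {
    let col = ColumnMetadata {
        name: "host".to_string(),
        data_type: "string".to_string(),
        semantic_type: SemanticType::Tag,
    };
    let wire: WireColumnSchema = col.into();
    println!("{wire:?}");
}
```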
@@ -288,6 +355,7 @@ impl SchemaInfo {
         Self {
             schema: Vec::with_capacity(capacity),
             index: HashMap::with_capacity(capacity),
+            table: None,
         }
     }
 
@@ -297,46 +365,88 @@ impl SchemaInfo {
             index.insert(schema.column_name.clone(), i);
         }
         Self {
-            schema: schema_list,
+            schema: schema_list.into_iter().map(Into::into).collect(),
             index,
+            table: None,
         }
     }
 
+    pub fn set_table(&mut self, table: Option<Arc<Table>>) {
+        self.table = table;
+    }
+
+    fn find_column_schema_in_table(&self, column_name: &str) -> Option<ColumnMetadata> {
+        if let Some(table) = &self.table
+            && let Some(i) = table.schema_ref().column_index_by_name(column_name)
+        {
+            let column_schema = table.schema_ref().column_schemas()[i].clone();
+
+            let semantic_type = if column_schema.is_time_index() {
+                SemanticType::Timestamp
+            } else if table.table_info().meta.primary_key_indices.contains(&i) {
+                SemanticType::Tag
+            } else {
+                SemanticType::Field
+            };
+
+            Some(ColumnMetadata {
+                column_schema,
+                semantic_type,
+            })
+        } else {
+            None
+        }
+    }
+
+    pub fn column_schemas(&self) -> api::error::Result<Vec<ColumnSchema>> {
+        self.schema
+            .iter()
+            .map(|x| x.clone().try_into())
+            .collect::<api::error::Result<Vec<_>>>()
+    }
 }
 
 fn resolve_schema(
     index: Option<usize>,
-    value_data: ValueData,
-    column_schema: ColumnSchema,
-    row: &mut Vec<GreptimeValue>,
+    pipeline_context: &PipelineContext,
+    column: &str,
+    value_type: &ConcreteDataType,
     schema_info: &mut SchemaInfo,
 ) -> Result<()> {
     if let Some(index) = index {
-        let api_value = GreptimeValue {
-            value_data: Some(value_data),
-        };
-        // Safety unwrap is fine here because api_value is always valid
-        let value_column_data_type = proto_value_type(&api_value).unwrap();
-        // Safety unwrap is fine here because index is always valid
-        let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
-        if value_column_data_type != schema_column_data_type {
-            IdentifyPipelineColumnTypeMismatchSnafu {
-                column: column_schema.column_name,
-                expected: schema_column_data_type.as_str_name(),
-                actual: value_column_data_type.as_str_name(),
+        let column_type = &mut schema_info.schema[index].column_schema.data_type;
+        match (column_type, value_type) {
+            (column_type, value_type) if column_type == value_type => Ok(()),
+            (ConcreteDataType::Json(column_type), ConcreteDataType::Json(value_type))
+                if column_type.is_include(value_type) =>
+            {
+                Ok(())
             }
-            .fail()
-        } else {
-            row[index] = api_value;
-            Ok(())
+            (column_type, value_type) => IdentifyPipelineColumnTypeMismatchSnafu {
+                column,
+                expected: column_type.to_string(),
+                actual: value_type.to_string(),
+            }
+            .fail(),
         }
     } else {
-        let key = column_schema.column_name.clone();
+        let column_schema = schema_info
+            .find_column_schema_in_table(column)
+            .unwrap_or_else(|| {
+                let semantic_type = decide_semantic(pipeline_context, column);
+                let column_schema = datatypes::schema::ColumnSchema::new(
+                    column,
+                    value_type.clone(),
+                    semantic_type != SemanticType::Timestamp,
+                );
+                ColumnMetadata {
+                    column_schema,
+                    semantic_type,
+                }
+            });
+        let key = column.to_string();
         schema_info.schema.push(column_schema);
         schema_info.index.insert(key, schema_info.schema.len() - 1);
-        let api_value = GreptimeValue {
-            value_data: Some(value_data),
-        };
-        row.push(api_value);
         Ok(())
     }
 }
@@ -481,11 +591,11 @@ pub(crate) fn values_to_row(
     Ok(Row { values: row })
 }
 
-fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
+fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> SemanticType {
     if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
-        SemanticType::Tag as i32
+        SemanticType::Tag
     } else {
-        SemanticType::Field as i32
+        SemanticType::Field
     }
 }
 
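Note: `decide_semantic` now returns the enum directly instead of an `i32`. A stand-alone sketch of the rule it implements; the channel enum and the value-column name below are assumptions mirroring the code above, not the real constants:

```rust
#[derive(PartialEq)]
enum Channel {
    Prometheus,
    Log,
}

#[derive(Debug, PartialEq)]
enum SemanticType {
    Tag,
    Field,
}

// Hypothetical name for the Prometheus sample-value column.
const GREPTIME_VALUE: &str = "greptime_value";

fn decide_semantic(channel: &Channel, column_name: &str) -> SemanticType {
    // Prometheus labels become tags; the sample value stays a field.
    if *channel == Channel::Prometheus && column_name != GREPTIME_VALUE {
        SemanticType::Tag
    } else {
        SemanticType::Field
    }
}

fn main() {
    assert_eq!(decide_semantic(&Channel::Prometheus, "instance"), SemanticType::Tag);
    assert_eq!(decide_semantic(&Channel::Log, "message"), SemanticType::Field);
    assert_eq!(decide_semantic(&Channel::Prometheus, GREPTIME_VALUE), SemanticType::Field);
}
```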
@@ -497,55 +607,56 @@ fn resolve_value(
     p_ctx: &PipelineContext,
 ) -> Result<()> {
     let index = schema_info.index.get(&column_name).copied();
-    let mut resolve_simple_type =
-        |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
-            let semantic_type = decide_semantic(p_ctx, &column_name);
-            resolve_schema(
-                index,
-                value_data,
-                ColumnSchema {
-                    column_name,
-                    datatype: data_type as i32,
-                    semantic_type,
-                    datatype_extension: None,
-                    options: None,
-                },
-                row,
-                schema_info,
-            )
-        };
 
-    match value {
-        VrlValue::Null => {}
+    let value_data = match value {
+        VrlValue::Null => None,
+
         VrlValue::Integer(v) => {
             // safe unwrap after type matched
-            resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
+            resolve_schema(
+                index,
+                p_ctx,
+                &column_name,
+                &ConcreteDataType::int64_datatype(),
+                schema_info,
+            )?;
+            Some(ValueData::I64Value(v))
         }
 
         VrlValue::Float(v) => {
             // safe unwrap after type matched
-            resolve_simple_type(
-                ValueData::F64Value(v.into()),
-                column_name,
-                ColumnDataType::Float64,
+            resolve_schema(
+                index,
+                p_ctx,
+                &column_name,
+                &ConcreteDataType::float64_datatype(),
+                schema_info,
             )?;
+            Some(ValueData::F64Value(v.into()))
         }
 
         VrlValue::Boolean(v) => {
-            resolve_simple_type(
-                ValueData::BoolValue(v),
-                column_name,
-                ColumnDataType::Boolean,
+            resolve_schema(
+                index,
+                p_ctx,
+                &column_name,
+                &ConcreteDataType::boolean_datatype(),
+                schema_info,
             )?;
+            Some(ValueData::BoolValue(v))
        }
 
         VrlValue::Bytes(v) => {
-            resolve_simple_type(
-                ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())),
-                column_name,
-                ColumnDataType::String,
+            resolve_schema(
+                index,
+                p_ctx,
+                &column_name,
+                &ConcreteDataType::string_datatype(),
+                schema_info,
             )?;
+            Some(ValueData::StringValue(String::from_utf8_lossy_owned(
+                v.to_vec(),
+            )))
         }
 
         VrlValue::Regex(v) => {
@@ -553,42 +664,83 @@ fn resolve_value(
                 "Persisting regex value in the table, this should not happen, column_name: {}",
                 column_name
             );
-            resolve_simple_type(
-                ValueData::StringValue(v.to_string()),
-                column_name,
-                ColumnDataType::String,
+            resolve_schema(
+                index,
+                p_ctx,
+                &column_name,
+                &ConcreteDataType::string_datatype(),
+                schema_info,
             )?;
+            Some(ValueData::StringValue(v.to_string()))
         }
 
         VrlValue::Timestamp(ts) => {
             let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
                 input: ts.to_rfc3339(),
             })?;
-            resolve_simple_type(
-                ValueData::TimestampNanosecondValue(ns),
-                column_name,
-                ColumnDataType::TimestampNanosecond,
+            resolve_schema(
+                index,
+                p_ctx,
+                &column_name,
+                &ConcreteDataType::timestamp_nanosecond_datatype(),
+                schema_info,
             )?;
+            Some(ValueData::TimestampNanosecondValue(ns))
         }
 
         VrlValue::Array(_) | VrlValue::Object(_) => {
-            let data = vrl_value_to_jsonb_value(&value);
-            resolve_schema(
-                index,
-                ValueData::BinaryValue(data.to_vec()),
-                ColumnSchema {
-                    column_name,
-                    datatype: ColumnDataType::Binary as i32,
-                    semantic_type: SemanticType::Field as i32,
-                    datatype_extension: Some(ColumnDataTypeExtension {
-                        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
-                    }),
-                    options: None,
-                },
-                row,
-                schema_info,
-            )?;
+            let is_json_native_type = schema_info
+                .find_column_schema_in_table(&column_name)
+                .is_some_and(|x| {
+                    if let ConcreteDataType::Json(column_type) = &x.column_schema.data_type {
+                        column_type.is_native_type()
+                    } else {
+                        false
+                    }
+                });
+
+            let value = if is_json_native_type {
+                let json_extension_type: Option<JsonExtensionType> =
+                    if let Some(x) = schema_info.find_column_schema_in_table(&column_name) {
+                        x.column_schema.extension_type()?
+                    } else {
+                        None
+                    };
+                let settings = json_extension_type
+                    .and_then(|x| x.metadata().json_structure_settings.clone())
+                    .unwrap_or_default();
+                let value: serde_json::Value = value.try_into().map_err(|e: StdError| {
+                    CoerceIncompatibleTypesSnafu { msg: e.to_string() }.build()
+                })?;
+                let value = settings.encode(value)?;
+
+                resolve_schema(index, p_ctx, &column_name, &value.data_type(), schema_info)?;
+
+                let Value::Json(value) = value else {
+                    unreachable!()
+                };
+                ValueData::JsonValue(encode_json_value(*value))
+            } else {
+                resolve_schema(
+                    index,
+                    p_ctx,
+                    &column_name,
+                    &ConcreteDataType::binary_datatype(),
+                    schema_info,
+                )?;
+
+                let value = vrl_value_to_jsonb_value(&value);
+                ValueData::BinaryValue(value.to_vec())
+            };
+            Some(value)
         }
+    };
+
+    let value = GreptimeValue { value_data };
+    if let Some(index) = index {
+        row[index] = value;
+    } else {
+        row.push(value);
     }
     Ok(())
 }
@@ -626,20 +778,24 @@ fn identity_pipeline_inner(
     let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
 
     // set time index column schema first
-    schema_info.schema.push(ColumnSchema {
-        column_name: custom_ts
+    let column_schema = datatypes::schema::ColumnSchema::new(
+        custom_ts
             .map(|ts| ts.get_column_name().to_string())
             .unwrap_or_else(|| greptime_timestamp().to_string()),
-        datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
-            if pipeline_ctx.channel == Channel::Prometheus {
-                ColumnDataType::TimestampMillisecond
-            } else {
-                ColumnDataType::TimestampNanosecond
-            }
-        }) as i32,
-        semantic_type: SemanticType::Timestamp as i32,
-        datatype_extension: None,
-        options: None,
+        custom_ts
+            .map(|c| ConcreteDataType::from(ColumnDataTypeWrapper::new(c.get_datatype(), None)))
+            .unwrap_or_else(|| {
+                if pipeline_ctx.channel == Channel::Prometheus {
+                    ConcreteDataType::timestamp_millisecond_datatype()
+                } else {
+                    ConcreteDataType::timestamp_nanosecond_datatype()
+                }
+            }),
+        false,
+    );
+    schema_info.schema.push(ColumnMetadata {
+        column_schema,
+        semantic_type: SemanticType::Timestamp,
     });
 
     let mut opt_map = HashMap::new();
@@ -697,28 +853,29 @@ pub fn identity_pipeline(
         input.push(result);
     }
 
-    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
+    identity_pipeline_inner(input, pipeline_ctx).and_then(|(mut schema, opt_map)| {
         if let Some(table) = table {
             let table_info = table.table_info();
             for tag_name in table_info.meta.row_key_column_names() {
                 if let Some(index) = schema.index.get(tag_name) {
-                    schema.schema[*index].semantic_type = SemanticType::Tag as i32;
+                    schema.schema[*index].semantic_type = SemanticType::Tag;
                 }
             }
         }
 
-        opt_map
+        let column_schemas = schema.column_schemas()?;
+        Ok(opt_map
             .into_iter()
             .map(|(opt, rows)| {
                 (
                     opt,
                     Rows {
-                        schema: schema.schema.clone(),
+                        schema: column_schemas.clone(),
                         rows,
                     },
                 )
             })
-            .collect::<HashMap<ContextOpt, Rows>>()
+            .collect::<HashMap<ContextOpt, Rows>>())
     })
 }
 
@@ -850,7 +1007,7 @@ mod tests {
         assert!(rows.is_err());
         assert_eq!(
            rows.err().unwrap().to_string(),
-            "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
+            "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: String".to_string(),
         );
     }
     {
@@ -879,7 +1036,7 @@ mod tests {
         assert!(rows.is_err());
         assert_eq!(
             rows.err().unwrap().to_string(),
-            "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
+            "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: Int64".to_string(),
         );
     }
     {
@@ -942,7 +1099,7 @@ mod tests {
             .map(|(mut schema, mut rows)| {
                 for name in tag_column_names {
                     if let Some(index) = schema.index.get(&name) {
-                        schema.schema[*index].semantic_type = SemanticType::Tag as i32;
+                        schema.schema[*index].semantic_type = SemanticType::Tag;
                    }
                 }
 
@@ -950,7 +1107,7 @@ mod tests {
                 let rows = rows.remove(&ContextOpt::default()).unwrap();
 
                 Rows {
-                    schema: schema.schema,
+                    schema: schema.column_schemas().unwrap(),
                     rows,
                 }
             });

@@ -15,6 +15,7 @@
 use std::sync::Arc;
 use std::time::Duration;
 
+use common_telemetry::debug;
 use datatypes::timestamp::TimestampNanosecond;
 use moka::sync::Cache;
 
@@ -33,10 +34,18 @@ const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);
 /// to encapsulate inner cache. Only public methods are exposed.
 pub(crate) struct PipelineCache {
     pipelines: Cache<String, Arc<Pipeline>>,
-    original_pipelines: Cache<String, (String, TimestampNanosecond)>,
+    original_pipelines: Cache<String, PipelineContent>,
     /// If the pipeline table is invalid, we can use this cache to prevent failures when writing logs through the pipeline
     /// The failover cache never expires, but it will be updated when the pipelines cache is updated.
-    failover_cache: Cache<String, (String, TimestampNanosecond)>,
+    failover_cache: Cache<String, PipelineContent>,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct PipelineContent {
+    pub name: String,
+    pub content: String,
+    pub version: TimestampNanosecond,
+    pub schema: String,
 }
 
 impl PipelineCache {
@@ -45,12 +54,17 @@ impl PipelineCache {
             pipelines: Cache::builder()
                 .max_capacity(PIPELINES_CACHE_SIZE)
                 .time_to_live(PIPELINES_CACHE_TTL)
+                .name("pipelines")
                 .build(),
             original_pipelines: Cache::builder()
                 .max_capacity(PIPELINES_CACHE_SIZE)
                 .time_to_live(PIPELINES_CACHE_TTL)
+                .name("original_pipelines")
+                .build(),
+            failover_cache: Cache::builder()
+                .max_capacity(PIPELINES_CACHE_SIZE)
+                .name("failover_cache")
                 .build(),
-            failover_cache: Cache::builder().max_capacity(PIPELINES_CACHE_SIZE).build(),
         }
     }
 
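Note: pipeline sources are now cached as a `PipelineContent` value rather than a bare `(content, version)` tuple, keyed per schema/name/version. A small sketch of the same idea using the `moka` cache seen above; the key layout and field types here are illustrative only:

```rust
use std::time::Duration;

use moka::sync::Cache;

#[derive(Clone, Debug, PartialEq, Eq)]
struct PipelineContent {
    name: String,
    content: String,
    version: i64, // stand-in for a nanosecond timestamp version
    schema: String,
}

// Hypothetical key layout: "<schema>/<name>/<version>".
fn cache_key(schema: &str, name: &str, version: i64) -> String {
    format!("{schema}/{name}/{version}")
}

fn main() {
    let original_pipelines: Cache<String, PipelineContent> = Cache::builder()
        .max_capacity(10_000)
        .time_to_live(Duration::from_secs(10))
        .name("original_pipelines")
        .build();

    let p = PipelineContent {
        name: "nginx_access".to_string(),
        content: "processors: []".to_string(),
        version: 1,
        schema: "public".to_string(),
    };
    original_pipelines.insert(cache_key(&p.schema, &p.name, p.version), p.clone());

    // Later lookups return the whole record, including which schema it came from.
    let hit = original_pipelines.get(&cache_key("public", "nginx_access", 1));
    assert_eq!(hit, Some(p));
}
```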
@@ -72,19 +86,15 @@ impl PipelineCache {
         );
     }
 
-    pub(crate) fn insert_pipeline_str_cache(
-        &self,
-        schema: &str,
-        name: &str,
-        version: PipelineVersion,
-        pipeline: (String, TimestampNanosecond),
-        with_latest: bool,
-    ) {
+    pub(crate) fn insert_pipeline_str_cache(&self, pipeline: &PipelineContent, with_latest: bool) {
+        let schema = pipeline.schema.as_str();
+        let name = pipeline.name.as_str();
+        let version = pipeline.version;
         insert_cache_generic(
             &self.original_pipelines,
             schema,
             name,
-            version,
+            Some(version),
             pipeline.clone(),
             with_latest,
         );
@@ -92,8 +102,8 @@ impl PipelineCache {
             &self.failover_cache,
             schema,
             name,
-            version,
-            pipeline,
+            Some(version),
+            pipeline.clone(),
             with_latest,
         );
     }
@@ -112,7 +122,7 @@ impl PipelineCache {
         schema: &str,
         name: &str,
         version: PipelineVersion,
-    ) -> Result<Option<(String, TimestampNanosecond)>> {
+    ) -> Result<Option<PipelineContent>> {
         get_cache_generic(&self.failover_cache, schema, name, version)
     }
 
@@ -121,7 +131,7 @@ impl PipelineCache {
         schema: &str,
         name: &str,
         version: PipelineVersion,
-    ) -> Result<Option<(String, TimestampNanosecond)>> {
+    ) -> Result<Option<PipelineContent>> {
         get_cache_generic(&self.original_pipelines, schema, name, version)
     }
 
@@ -174,13 +184,13 @@ fn get_cache_generic<T: Clone + Send + Sync + 'static>(
     version: PipelineVersion,
 ) -> Result<Option<T>> {
     // lets try empty schema first
-    let k = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
-    if let Some(value) = cache.get(&k) {
+    let emp_key = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
+    if let Some(value) = cache.get(&emp_key) {
         return Ok(Some(value));
     }
     // use input schema
-    let k = generate_pipeline_cache_key(schema, name, version);
-    if let Some(value) = cache.get(&k) {
+    let schema_k = generate_pipeline_cache_key(schema, name, version);
+    if let Some(value) = cache.get(&schema_k) {
         return Ok(Some(value));
     }
 
@@ -193,14 +203,28 @@ fn get_cache_generic<T: Clone + Send + Sync + 'static>(
 
     match ks.len() {
         0 => Ok(None),
-        1 => Ok(Some(ks.remove(0).1)),
-        _ => MultiPipelineWithDiffSchemaSnafu {
-            schemas: ks
-                .iter()
-                .filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
-                .collect::<Vec<_>>()
-                .join(","),
+        1 => {
+            let (_, value) = ks.remove(0);
+            Ok(Some(value))
+        }
+        _ => {
+            debug!(
+                "caches keys: {:?}, emp key: {:?}, schema key: {:?}, suffix key: {:?}",
+                cache.iter().map(|e| e.0).collect::<Vec<_>>(),
+                emp_key,
+                schema_k,
+                suffix_key
+            );
+            MultiPipelineWithDiffSchemaSnafu {
+                name: name.to_string(),
+                current_schema: schema.to_string(),
+                schemas: ks
+                    .iter()
+                    .filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
+                    .collect::<Vec<_>>()
+                    .join(","),
+            }
+            .fail()?
         }
-        .fail()?,
     }
 }

@@ -220,6 +220,7 @@ impl PipelineOperator {
                 .observe(timer.elapsed().as_secs_f64())
             })
             .await
+            .map(|p| (p.content, p.version))
     }
 
     /// Insert a pipeline into the pipeline table.

@@ -46,7 +46,7 @@ use crate::error::{
     MultiPipelineWithDiffSchemaSnafu, PipelineNotFoundSnafu, RecordBatchLenNotMatchSnafu, Result,
 };
 use crate::etl::{Content, Pipeline, parse};
-use crate::manager::pipeline_cache::PipelineCache;
+use crate::manager::pipeline_cache::{PipelineCache, PipelineContent};
 use crate::manager::{PipelineInfo, PipelineVersion};
 use crate::metrics::METRIC_PIPELINE_TABLE_FIND_COUNT;
 use crate::util::prepare_dataframe_conditions;
@@ -260,17 +260,22 @@ impl PipelineTable {
         &self,
         schema: &str,
         name: &str,
-        version: PipelineVersion,
+        input_version: PipelineVersion,
     ) -> Result<Arc<Pipeline>> {
-        if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, version)? {
+        if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, input_version)? {
             return Ok(pipeline);
         }
 
-        let pipeline = self.get_pipeline_str(schema, name, version).await?;
-        let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);
+        let pipeline_content = self.get_pipeline_str(schema, name, input_version).await?;
+        let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline_content.content)?);
 
-        self.cache
-            .insert_pipeline_cache(schema, name, version, compiled_pipeline.clone(), false);
+        self.cache.insert_pipeline_cache(
+            &pipeline_content.schema,
+            name,
+            Some(pipeline_content.version),
+            compiled_pipeline.clone(),
+            input_version.is_none(),
+        );
         Ok(compiled_pipeline)
     }
 
@@ -280,14 +285,17 @@ impl PipelineTable {
         &self,
         schema: &str,
         name: &str,
-        version: PipelineVersion,
-    ) -> Result<(String, TimestampNanosecond)> {
-        if let Some(pipeline) = self.cache.get_pipeline_str_cache(schema, name, version)? {
+        input_version: PipelineVersion,
+    ) -> Result<PipelineContent> {
+        if let Some(pipeline) = self
+            .cache
+            .get_pipeline_str_cache(schema, name, input_version)?
+        {
             return Ok(pipeline);
         }
 
         let mut pipeline_vec;
-        match self.find_pipeline(name, version).await {
+        match self.find_pipeline(name, input_version).await {
             Ok(p) => {
                 METRIC_PIPELINE_TABLE_FIND_COUNT
                     .with_label_values(&["true"])
@@ -304,8 +312,11 @@ impl PipelineTable {
                     .inc();
                 return self
                     .cache
-                    .get_failover_cache(schema, name, version)?
-                    .ok_or(PipelineNotFoundSnafu { name, version }.build());
+                    .get_failover_cache(schema, name, input_version)?
+                    .context(PipelineNotFoundSnafu {
+                        name,
+                        version: input_version,
+                    });
             }
             _ => {
                 // if other error, we should return it
@@ -316,42 +327,40 @@ impl PipelineTable {
         };
         ensure!(
             !pipeline_vec.is_empty(),
-            PipelineNotFoundSnafu { name, version }
+            PipelineNotFoundSnafu {
+                name,
+                version: input_version
+            }
         );
 
         // if the result is exact one, use it
         if pipeline_vec.len() == 1 {
-            let (pipeline_content, found_schema, version) = pipeline_vec.remove(0);
-            let p = (pipeline_content, version);
-            self.cache.insert_pipeline_str_cache(
-                &found_schema,
-                name,
-                Some(version),
-                p.clone(),
-                false,
-            );
-            return Ok(p);
+            let pipeline_content = pipeline_vec.remove(0);
+            self.cache
+                .insert_pipeline_str_cache(&pipeline_content, input_version.is_none());
+            return Ok(pipeline_content);
         }
 
         // check if there's empty schema pipeline
         // if there isn't, check current schema
         let pipeline = pipeline_vec
             .iter()
-            .find(|v| v.1 == EMPTY_SCHEMA_NAME)
-            .or_else(|| pipeline_vec.iter().find(|v| v.1 == schema));
+            .position(|v| v.schema == EMPTY_SCHEMA_NAME)
+            .or_else(|| pipeline_vec.iter().position(|v| v.schema == schema))
+            .map(|idx| pipeline_vec.remove(idx));
 
         // multiple pipeline with no empty or current schema
         // throw an error
-        let (pipeline_content, found_schema, version) =
-            pipeline.context(MultiPipelineWithDiffSchemaSnafu {
-                schemas: pipeline_vec.iter().map(|v| v.1.clone()).join(","),
-            })?;
+        let pipeline_content = pipeline.with_context(|| MultiPipelineWithDiffSchemaSnafu {
+            name: name.to_string(),
+            current_schema: schema.to_string(),
+            schemas: pipeline_vec.iter().map(|v| v.schema.clone()).join(","),
+        })?;
 
-        let v = *version;
-        let p = (pipeline_content.clone(), v);
         self.cache
-            .insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), false);
-        Ok(p)
+            .insert_pipeline_str_cache(&pipeline_content, input_version.is_none());
+        Ok(pipeline_content)
     }
 
     /// Insert a pipeline into the pipeline table and compile it.
@@ -378,13 +387,15 @@ impl PipelineTable {
             true,
         );
 
-        self.cache.insert_pipeline_str_cache(
-            EMPTY_SCHEMA_NAME,
-            name,
-            Some(TimestampNanosecond(version)),
-            (pipeline.to_owned(), TimestampNanosecond(version)),
-            true,
-        );
+        let pipeline_content = PipelineContent {
+            name: name.to_string(),
+            content: pipeline.to_string(),
+            version: TimestampNanosecond(version),
+            schema: EMPTY_SCHEMA_NAME.to_string(),
+        };
+
+        self.cache
+            .insert_pipeline_str_cache(&pipeline_content, true);
     }
 
     Ok((version, compiled_pipeline))
@@ -466,7 +477,7 @@ impl PipelineTable {
         &self,
         name: &str,
         version: PipelineVersion,
-    ) -> Result<Vec<(String, String, TimestampNanosecond)>> {
+    ) -> Result<Vec<PipelineContent>> {
         // 1. prepare dataframe
         let dataframe = self
             .query_engine
@@ -566,11 +577,12 @@ impl PipelineTable {
 
             let len = pipeline_content.len();
             for i in 0..len {
-                re.push((
-                    pipeline_content.value(i).to_string(),
-                    pipeline_schema.value(i).to_string(),
-                    TimestampNanosecond::new(pipeline_created_at.value(i)),
-                ));
+                re.push(PipelineContent {
+                    name: name.to_string(),
+                    content: pipeline_content.value(i).to_string(),
+                    version: TimestampNanosecond::new(pipeline_created_at.value(i)),
+                    schema: pipeline_schema.value(i).to_string(),
+                });
             }
         }
 

@@ -61,7 +61,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
     }
 
     Rows {
-        schema: schema_info.schema.clone(),
+        schema: schema_info.column_schemas().unwrap(),
         rows,
     }
 }

@@ -52,7 +52,7 @@ transform:
 
     // check schema
     assert_eq!(output.schema[0].column_name, "commit");
-    let type_id: i32 = ColumnDataType::Binary.into();
+    let type_id: i32 = ColumnDataType::Json.into();
     assert_eq!(output.schema[0].datatype, type_id);
 
     // check value
@@ -91,7 +91,7 @@ transform:
 
     // check schema
     assert_eq!(output.schema[0].column_name, "commit_json");
-    let type_id: i32 = ColumnDataType::Binary.into();
+    let type_id: i32 = ColumnDataType::Json.into();
     assert_eq!(output.schema[0].datatype, type_id);
 
     // check value
@@ -160,7 +160,7 @@ transform:
 
     // check schema
     assert_eq!(output.schema[0].column_name, "commit");
-    let type_id: i32 = ColumnDataType::Binary.into();
+    let type_id: i32 = ColumnDataType::Json.into();
     assert_eq!(output.schema[0].datatype, type_id);
 
     // check value

@@ -281,18 +281,18 @@ struct PlanRewriter {
 /// 2: Sort: t.pk1+t.pk2
 /// 3. Projection: t.number, t.pk1, t.pk2
 /// ```
-/// `Sort` will make a column requirement for `t.pk1` at level 2.
+/// `Sort` will make a column requirement for `t.pk1+t.pk2` at level 2.
 /// Which making `Projection` at level 1 need to add a ref to `t.pk1` as well.
 /// So that the expanded plan will be
 /// ```ignore
 /// Projection: t.number
-///   MergeSort: t.pk1
+///   MergeSort: t.pk1+t.pk2
 ///     MergeScan: remote_input=
 /// Projection: t.number, "t.pk1+t.pk2" <--- the original `Projection` at level 1 get added with `t.pk1+t.pk2`
 ///   Sort: t.pk1+t.pk2
 ///     Projection: t.number, t.pk1, t.pk2
 /// ```
-/// Making `MergeSort` can have `t.pk1` as input.
+/// Making `MergeSort` can have `t.pk1+t.pk2` as input.
 /// Meanwhile `Projection` at level 3 doesn't need to add any new column because 3 > 2
 /// and col requirements at level 2 is not applicable for level 3.
 ///
@@ -392,10 +392,11 @@ impl PlanRewriter {
                     && ext_b.node.name() == MergeSortLogicalPlan::name()
                 {
                     // revert last `ConditionalCommutative` result for Sort plan in this case.
-                    // `update_column_requirements` left unchanged because Sort won't generate
-                    // new columns or remove existing columns.
+                    // also need to remove any column requirements made by the Sort Plan
+                    // as it may refer to columns later no longer exist(rightfully) like by aggregate or projection
                     self.stage.pop();
                     self.expand_on_next_part_cond_trans_commutative = false;
+                    self.column_requirements.clear();
                 }
             }
             Commutativity::PartialCommutative => {
@@ -680,6 +681,10 @@ struct EnforceDistRequirementRewriter {
 
 impl EnforceDistRequirementRewriter {
     fn new(column_requirements: Vec<(HashSet<Column>, usize)>, cur_level: usize) -> Self {
+        debug!(
+            "Create EnforceDistRequirementRewriter with column_requirements: {:?} at cur_level: {}",
+            column_requirements, cur_level
+        );
         Self {
             column_requirements,
             cur_level,
@@ -733,7 +738,7 @@ impl EnforceDistRequirementRewriter {
             .filter(|a| !a.is_empty())
         else {
             return Err(datafusion_common::DataFusionError::Internal(format!(
-                "EnforceDistRequirementRewriter: no alias found for required column {original_col} in child plan {child} from original plan {original}",
+                "EnforceDistRequirementRewriter: no alias found for required column {original_col} at level {level} in current node's child plan: \n{child} from original plan: \n{original}",
            )));
         };
 

@@ -777,6 +777,67 @@ fn expand_step_aggr_proj() {
     assert_eq!(expected, result.to_string());
 }
 
+/// Make sure that `SeriesDivide` special handling correctly clean up column requirements from it's previous sort
+#[test]
+fn expand_complex_col_req_sort_pql() {
+    // use logging for better debugging
+    init_default_ut_logging();
+    let test_table = TestTable::table_with_name(0, "t".to_string());
+    let table_source = Arc::new(DefaultTableSource::new(Arc::new(
+        DfTableProviderAdapter::new(test_table),
+    )));
+
+    let plan = LogicalPlanBuilder::scan_with_filters("t", table_source.clone(), None, vec![])
+        .unwrap()
+        .sort(vec![
+            col("pk1").sort(true, false),
+            col("pk2").sort(true, false),
+            col("pk3").sort(true, false), // make some col req here
+        ])
+        .unwrap()
+        .build()
+        .unwrap();
+    let plan = SeriesDivide::new(
+        vec!["pk1".to_string(), "pk2".to_string(), "pk3".to_string()],
+        "ts".to_string(),
+        plan,
+    );
+    let plan = LogicalPlan::Extension(datafusion_expr::Extension {
+        node: Arc::new(plan),
+    });
+
+    let plan = LogicalPlanBuilder::from(plan)
+        .aggregate(vec![col("pk1"), col("pk2")], vec![min(col("number"))])
+        .unwrap()
+        .sort(vec![
+            col("pk1").sort(true, false),
+            col("pk2").sort(true, false),
+        ])
+        .unwrap()
+        .project(vec![col("pk1"), col("pk2")])
+        .unwrap()
+        .build()
+        .unwrap();
+
+    let config = ConfigOptions::default();
+    let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
+
+    let expected = [
+        "Projection: t.pk1, t.pk2",
+        "  MergeSort: t.pk1 ASC NULLS LAST, t.pk2 ASC NULLS LAST",
+        "    MergeScan [is_placeholder=false, remote_input=[",
+        "Projection: t.pk1, t.pk2",
+        "  Sort: t.pk1 ASC NULLS LAST, t.pk2 ASC NULLS LAST",
+        "    Aggregate: groupBy=[[t.pk1, t.pk2]], aggr=[[min(t.number)]]",
+        r#"      PromSeriesDivide: tags=["pk1", "pk2", "pk3"]"#,
+        "        Sort: t.pk1 ASC NULLS LAST, t.pk2 ASC NULLS LAST, t.pk3 ASC NULLS LAST",
+        "          TableScan: t",
+        "]]",
+    ]
+    .join("\n");
+    assert_eq!(expected, result.to_string());
+}
+
 /// should only expand `Sort`, notice `Sort` before `Aggregate` usually can and
 /// will be optimized out, and dist planner shouldn't handle that case, but
 /// for now, still handle that be expanding the `Sort` node

@@ -144,7 +144,7 @@ impl Categorizer {
                 }
             }
             // all group by expressions are partition columns can push down, unless
-            // another push down(including `Limit` or `Sort`) is already in progress(which will then prvent next cond commutative node from being push down).
+            // another push down(including `Limit` or `Sort`) is already in progress(which will then prevent next cond commutative node from being push down).
             // TODO(discord9): This is a temporary solution(that works), a better description of
             // commutativity is needed under this situation.
             Commutativity::ConditionalCommutative(None)

@@ -234,7 +234,7 @@ impl QueryEngineState {
         rules.retain(|rule| rule.name() != name);
     }
 
-    /// Optimize the logical plan by the extension anayzer rules.
+    /// Optimize the logical plan by the extension analyzer rules.
     pub fn optimize_by_extension_rules(
         &self,
         plan: DfLogicalPlan,

@@ -664,6 +664,13 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(transparent)]
+    GreptimeProto {
+        source: api::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -794,6 +801,8 @@ impl ErrorExt for Error {
             Suspended { .. } => StatusCode::Suspended,
 
             MemoryLimitExceeded { .. } => StatusCode::RateLimited,
+
+            GreptimeProto { source, .. } => source.status_code(),
         }
     }
 

@@ -23,6 +23,7 @@ pub mod memory_limit;
 pub mod prom_query_gateway;
 pub mod region_server;
 
+use std::any::Any;
 use std::net::SocketAddr;
 use std::time::Duration;
 
@@ -399,4 +400,8 @@ impl Server for GrpcServer {
     fn bind_addr(&self) -> Option<SocketAddr> {
         self.bind_addr
     }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
 }

@@ -1285,6 +1285,10 @@ impl Server for HttpServer {
     fn bind_addr(&self) -> Option<SocketAddr> {
         self.bind_addr
     }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
 }
 
 #[cfg(test)]

@@ -31,7 +31,7 @@ use axum_extra::TypedHeader;
 use common_catalog::consts::default_engine;
 use common_error::ext::{BoxedError, ErrorExt};
 use common_query::{Output, OutputData};
-use common_telemetry::{debug, error, warn};
+use common_telemetry::{error, warn};
 use headers::ContentType;
 use lazy_static::lazy_static;
 use mime_guess::mime;
@@ -738,11 +738,6 @@ pub async fn log_ingester(
 
     let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
 
-    debug!(
-        "receiving logs: {:?}",
-        serde_json::to_string(&value).unwrap()
-    );
-
     query_ctx.set_channel(Channel::Log);
     let query_ctx = Arc::new(query_ctx);
 

@@ -152,7 +152,7 @@ pub async fn loki_ingest(
         rows.push(row);
     }
 
-    let schemas = schema_info.schema;
+    let schemas = schema_info.column_schemas()?;
     // fill Null for missing values
     for row in rows.iter_mut() {
         row.resize(schemas.len(), GreptimeValue::default());
@@ -746,13 +746,16 @@ fn process_labels(
     } else {
         // not exist
         // add schema and append to values
-        schemas.push(ColumnSchema {
-            column_name: k.clone(),
-            datatype: ColumnDataType::String.into(),
-            semantic_type: SemanticType::Tag.into(),
-            datatype_extension: None,
-            options: None,
-        });
+        schemas.push(
+            ColumnSchema {
+                column_name: k.clone(),
+                datatype: ColumnDataType::String.into(),
+                semantic_type: SemanticType::Tag.into(),
+                datatype_extension: None,
+                options: None,
+            }
+            .into(),
+        );
         column_indexer.insert(k, schemas.len() - 1);
 
         row.push(GreptimeValue {

@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::any::Any;
 use std::future::Future;
 use std::net::SocketAddr;
 use std::sync::Arc;
@@ -265,4 +266,8 @@ impl Server for MysqlServer {
     fn bind_addr(&self) -> Option<SocketAddr> {
         self.bind_addr
     }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
 }

@@ -381,6 +381,7 @@ fn extract_field_from_attr_and_combine_schema(
 
     if let Some(index) = select_schema.index.get(key) {
         let column_schema = &select_schema.schema[*index];
+        let column_schema: ColumnSchema = column_schema.clone().try_into()?;
         // datatype of the same column name should be the same
         ensure!(
             column_schema.datatype == schema.datatype,
@@ -393,7 +394,7 @@ fn extract_field_from_attr_and_combine_schema(
         );
         extracted_values[*index] = value;
     } else {
-        select_schema.schema.push(schema);
+        select_schema.schema.push(schema.into());
         select_schema
             .index
             .insert(key.clone(), select_schema.schema.len() - 1);
@@ -480,7 +481,7 @@ fn parse_export_logs_service_request_to_rows(
     let mut parse_ctx = ParseContext::new(select_info);
     let mut rows = parse_resource(&mut parse_ctx, request.resource_logs)?;
 
-    schemas.extend(parse_ctx.select_schema.schema);
+    schemas.extend(parse_ctx.select_schema.column_schemas()?);
 
     rows.iter_mut().for_each(|row| {
         row.values.resize(schemas.len(), GreptimeValue::default());

@@ -135,12 +135,18 @@ async fn run_custom_pipeline(
             let mut schema_info = SchemaInfo::default();
             schema_info
                 .schema
-                .push(time_index_column_schema(ts_name, timeunit));
+                .push(time_index_column_schema(ts_name, timeunit).into());
 
             schema_info
         }
     };
 
+    let table = handler
+        .get_table(&table_name, query_ctx)
+        .await
+        .context(CatalogSnafu)?;
+    schema_info.set_table(table);
+
     for pipeline_map in pipeline_maps {
         let result = pipeline
             .exec_mut(pipeline_map, pipeline_ctx, &mut schema_info)
@@ -194,7 +200,7 @@ async fn run_custom_pipeline(
             RowInsertRequest {
                 rows: Some(Rows {
                     rows,
-                    schema: schema_info.schema.clone(),
+                    schema: schema_info.column_schemas()?,
                 }),
                 table_name: table_name.clone(),
             },
|||||||
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::any::Any;
 use std::future::Future;
 use std::net::SocketAddr;
 use std::sync::Arc;
@@ -144,4 +145,8 @@ impl Server for PostgresServer {
     fn bind_addr(&self) -> Option<SocketAddr> {
         self.bind_addr
     }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
 }

@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::any::Any;
 use std::collections::HashMap;
 use std::fmt::{Debug, Formatter};
 use std::net::SocketAddr;
@@ -147,6 +148,8 @@ pub trait Server: Send + Sync {
     fn bind_addr(&self) -> Option<SocketAddr> {
         None
     }
+
+    fn as_any(&self) -> &dyn Any;
 }
 
 struct AcceptTask {

@@ -29,7 +29,7 @@ use strum::EnumString;
 use crate::error::{InternalIoSnafu, Result};
 
 /// TlsMode is used for Mysql and Postgres server start up.
-#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq, EnumString)]
+#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, EnumString)]
 #[serde(rename_all = "snake_case")]
 pub enum TlsMode {
     #[default]

@@ -1,14 +1,14 @@
-+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+
-| data | ts |
-+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+
-| {_raw: {"commit":{"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah. LIght shines in a corner of WTF...."},"rev":"3lbhtytnn2k2f","rkey":"3lbhtyteurk2y"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:yj3sjq3blzpynh27cumnp5ks, kind: commit, time_us: 1732206349000167} | 1970-01-01T00:00:00.001 |
-| {_raw: {"commit":{"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"rev":"3lbhuvzds6d2a","rkey":"3lbhuvzdked2a"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:3i4xf2v4wcnyktgv6satke64, kind: commit, time_us: 1732206349000644} | 1970-01-01T00:00:00.002 |
-| {_raw: {"commit":{"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"rev":"3lbhuvze3gi2u","rkey":"3lbhuvzdtmi2u"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:gccfnqqizz4urhchsaie6jft, kind: commit, time_us: 1732206349001108} | 1970-01-01T00:00:00.003 |
-| {_raw: {"commit":{"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"rev":"3lbhueija5p22","rkey":"3lbhueiizcx22"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:msxqf3twq7abtdw7dbfskphk, kind: commit, time_us: 1732206349001372} | 1970-01-01T00:00:00.004 |
-| {_raw: {"commit":{"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadn’t heard this one yet^^"},"rev":"3lbhtytohxc2o","rkey":"3lbhtytjqzk2q"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:l5o3qjrmfztir54cpwlv2eme, kind: commit, time_us: 1732206349001905} | 1970-01-01T00:00:00.005 |
-| {_raw: {"commit":{"cid":"bafyreiaa2vsdr4ckwjg4jq47zfd7mewidywfz3qh3dmglcd6ozi4xwdega","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:15:21.495Z","subject":"did:plc:amsdn2tbjxo3xrwqneqhh4cm"},"rev":"3lbhudfo3yi2w","rkey":"3lbhudfnw4y2w"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:jkaaf5j2yb2pvpx3ualm3vbh, kind: commit, time_us: 1732206349002758} | 1970-01-01T00:00:00.006 |
-| {_raw: {"commit":{"cid":"bafyreihaatlpar3abtx6ck3kde2ksic6zzflk4ppduhf6dxurytqrv33ni","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:18:39.913Z","subject":"did:plc:gf3vum7insztt5rxrpxdz2id"},"rev":"3lbhujcp4ix2n","rkey":"3lbhujcoxmp2n"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:tdwz2h4id5dxezvohftsmffu, kind: commit, time_us: 1732206349003106} | 1970-01-01T00:00:00.007 |
-| {_raw: {"commit":{"cid":"bafyreid5ycocp5zq2g7fcx2xxzxrbafuh7b5qhtwuwiomzo6vqila2cbpu","record":{"$type":"app.bsky.feed.repost","createdAt":"2024-11-21T16:23:36.714Z","subject":{"cid":"bafyreieaacfiobnuqvjhhsndyi5s3fd6krbzdduxsyrzfv43kczpcmkl6y","uri":"at://did:plc:o5q6dynpme4ndolc3heztasm/app.bsky.feed.post/3lbfli3qsoc2o"}},"rev":"3lbhus5vior2t","rkey":"3lbhus5vbtz2t"}}, commit.collection: app.bsky.feed.repost, commit.operation: create, did: did:plc:cdsd346mwow7aj3tgfkwsct3, kind: commit, time_us: 1732206349003461} | 1970-01-01T00:00:00.008 |
-| {_raw: {"commit":{"cid":"bafyreibugobcike72y4zxvdyz2oopyt6ywwqfielcwojkb27p7s6rlomgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:25:44.376Z","langs":["en"],"reply":{"parent":{"cid":"bafyreiaev27cfcxxvn2pdhrwwquzwgclujnulzbcfnn4p4fwgb6migjhw4","uri":"at://did:plc:zec6cslvgc3hhdatrhk6pq5p/app.bsky.feed.post/3lbhujvds4c2b"},"root":{"cid":"bafyreif7qjxhvecwnhlynijj6pf47jwvtkahsz3zh2kaipwu2bw2dxwaqq","uri":"at://did:plc:s4bwqchfzm6gjqfeb6mexgbu/app.bsky.feed.post/3lbhug53kkk2m"}},"text":"\n⌜ Blinking. She hadn't realized she spoke out loud. ⌟\n\n‘ It was nothing like that — . I was only thinking . . . ’\n\n⌜ Trailing off, her mind occupied. ⌟\n"},"rev":"3lbhuvzeccx2w","rkey":"3lbhuvxf4qs2m"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:s4bwqchfzm6gjqfeb6mexgbu, kind: commit, time_us: 1732206349003907} | 1970-01-01T00:00:00.009 |
-| {_raw: {"commit":{"cid":"bafyreidjk2svg2fdjiiwohmfmvp3hdxhpb33ycnixzbkyib5m6cocindxq","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.167Z","subject":{"cid":"bafyreiaumopip75nzx2xjbugtwemdppsyx54bd2odf6q45f3o7xkocgari","uri":"at://did:plc:ig2jv6gqup4t7gdq2pmanknw/app.bsky.feed.post/3lbhuvtlaec2c"}},"rev":"3lbhuvzedg52j","rkey":"3lbhuvzdyof2j"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:hbc74dlsxhq53kp5oxges6d7, kind: commit, time_us: 1732206349004769} | 1970-01-01T00:00:00.010 |
-+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+
++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
+| data | time_us |
++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
+| {_raw: {"commit":{"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah. LIght shines in a corner of WTF...."},"rev":"3lbhtytnn2k2f","rkey":"3lbhtyteurk2y"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:yj3sjq3blzpynh27cumnp5ks, kind: commit, time_us: 1732206349000167} | 2024-11-21T16:25:49.000167 |
+| {_raw: {"commit":{"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"rev":"3lbhuvzds6d2a","rkey":"3lbhuvzdked2a"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:3i4xf2v4wcnyktgv6satke64, kind: commit, time_us: 1732206349000644} | 2024-11-21T16:25:49.000644 |
+| {_raw: {"commit":{"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"rev":"3lbhuvze3gi2u","rkey":"3lbhuvzdtmi2u"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:gccfnqqizz4urhchsaie6jft, kind: commit, time_us: 1732206349001108} | 2024-11-21T16:25:49.001108 |
+| {_raw: {"commit":{"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"rev":"3lbhueija5p22","rkey":"3lbhueiizcx22"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:msxqf3twq7abtdw7dbfskphk, kind: commit, time_us: 1732206349001372} | 2024-11-21T16:25:49.001372 |
+| {_raw: {"commit":{"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadn’t heard this one yet^^"},"rev":"3lbhtytohxc2o","rkey":"3lbhtytjqzk2q"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:l5o3qjrmfztir54cpwlv2eme, kind: commit, time_us: 1732206349001905} | 2024-11-21T16:25:49.001905 |
+| {_raw: {"commit":{"cid":"bafyreiaa2vsdr4ckwjg4jq47zfd7mewidywfz3qh3dmglcd6ozi4xwdega","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:15:21.495Z","subject":"did:plc:amsdn2tbjxo3xrwqneqhh4cm"},"rev":"3lbhudfo3yi2w","rkey":"3lbhudfnw4y2w"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:jkaaf5j2yb2pvpx3ualm3vbh, kind: commit, time_us: 1732206349002758} | 2024-11-21T16:25:49.002758 |
+| {_raw: {"commit":{"cid":"bafyreihaatlpar3abtx6ck3kde2ksic6zzflk4ppduhf6dxurytqrv33ni","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:18:39.913Z","subject":"did:plc:gf3vum7insztt5rxrpxdz2id"},"rev":"3lbhujcp4ix2n","rkey":"3lbhujcoxmp2n"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:tdwz2h4id5dxezvohftsmffu, kind: commit, time_us: 1732206349003106} | 2024-11-21T16:25:49.003106 |
+| {_raw: {"commit":{"cid":"bafyreid5ycocp5zq2g7fcx2xxzxrbafuh7b5qhtwuwiomzo6vqila2cbpu","record":{"$type":"app.bsky.feed.repost","createdAt":"2024-11-21T16:23:36.714Z","subject":{"cid":"bafyreieaacfiobnuqvjhhsndyi5s3fd6krbzdduxsyrzfv43kczpcmkl6y","uri":"at://did:plc:o5q6dynpme4ndolc3heztasm/app.bsky.feed.post/3lbfli3qsoc2o"}},"rev":"3lbhus5vior2t","rkey":"3lbhus5vbtz2t"}}, commit.collection: app.bsky.feed.repost, commit.operation: create, did: did:plc:cdsd346mwow7aj3tgfkwsct3, kind: commit, time_us: 1732206349003461} | 2024-11-21T16:25:49.003461 |
+| {_raw: {"commit":{"cid":"bafyreibugobcike72y4zxvdyz2oopyt6ywwqfielcwojkb27p7s6rlomgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:25:44.376Z","langs":["en"],"reply":{"parent":{"cid":"bafyreiaev27cfcxxvn2pdhrwwquzwgclujnulzbcfnn4p4fwgb6migjhw4","uri":"at://did:plc:zec6cslvgc3hhdatrhk6pq5p/app.bsky.feed.post/3lbhujvds4c2b"},"root":{"cid":"bafyreif7qjxhvecwnhlynijj6pf47jwvtkahsz3zh2kaipwu2bw2dxwaqq","uri":"at://did:plc:s4bwqchfzm6gjqfeb6mexgbu/app.bsky.feed.post/3lbhug53kkk2m"}},"text":"\n⌜ Blinking. She hadn't realized she spoke out loud. ⌟\n\n‘ It was nothing like that — . I was only thinking . . . ’\n\n⌜ Trailing off, her mind occupied. ⌟\n"},"rev":"3lbhuvzeccx2w","rkey":"3lbhuvxf4qs2m"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:s4bwqchfzm6gjqfeb6mexgbu, kind: commit, time_us: 1732206349003907} | 2024-11-21T16:25:49.003907 |
+| {_raw: {"commit":{"cid":"bafyreidjk2svg2fdjiiwohmfmvp3hdxhpb33ycnixzbkyib5m6cocindxq","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.167Z","subject":{"cid":"bafyreiaumopip75nzx2xjbugtwemdppsyx54bd2odf6q45f3o7xkocgari","uri":"at://did:plc:ig2jv6gqup4t7gdq2pmanknw/app.bsky.feed.post/3lbhuvtlaec2c"}},"rev":"3lbhuvzedg52j","rkey":"3lbhuvzdyof2j"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:hbc74dlsxhq53kp5oxges6d7, kind: commit, time_us: 1732206349004769} | 2024-11-21T16:25:49.004769 |
++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+

@@ -48,9 +48,9 @@ use flow::{FlownodeBuilder, FrontendClient, GrpcQueryHandlerWithBoxedError};
 use frontend::frontend::Frontend;
 use frontend::instance::builder::FrontendBuilder;
 use frontend::instance::{Instance, StandaloneDatanodeManager};
+use frontend::server::Services;
 use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
 use servers::grpc::GrpcOptions;
-use servers::server::ServerHandlers;
 use snafu::ResultExt;
 use standalone::options::StandaloneOptions;
 
@@ -249,7 +249,7 @@ impl GreptimeDbStandaloneBuilder {
             procedure_executor.clone(),
             Arc::new(ProcessManager::new(server_addr, None)),
         )
-        .with_plugin(plugins)
+        .with_plugin(plugins.clone())
        .try_build()
        .await
        .unwrap();
@@ -282,14 +282,15 @@ impl GreptimeDbStandaloneBuilder {
 
         test_util::prepare_another_catalog_and_schema(&instance).await;
 
-        let mut frontend = Frontend {
+        let servers = Services::new(opts.clone(), instance.clone(), plugins)
+            .build()
+            .unwrap();
+        let frontend = Frontend {
             instance,
-            servers: ServerHandlers::default(),
+            servers,
             heartbeat_task: None,
         };
 
-        frontend.start().await.unwrap();
-
         GreptimeDbStandalone {
             frontend: Arc::new(frontend),
             opts,

@@ -18,14 +18,114 @@ use std::{fs, io};
 
 use common_test_util::find_workspace_path;
 use frontend::instance::Instance;
+use http::StatusCode;
+use servers::http::test_helpers::TestClient;
+use servers::http::{HTTP_SERVER, HttpServer};
+use servers::server::ServerHandlers;
 use tests_integration::standalone::GreptimeDbStandaloneBuilder;
 use tests_integration::test_util::execute_sql_and_expect;
 
-#[tokio::test]
-async fn test_load_jsonbench_data() {
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn test_load_jsonbench_data_by_pipeline() -> io::Result<()> {
     common_telemetry::init_default_ut_logging();
 
-    let instance = GreptimeDbStandaloneBuilder::new("test_load_jsonbench_data")
+    let instance = GreptimeDbStandaloneBuilder::new("test_load_jsonbench_data_by_pipeline")
+        .build()
+        .await;
+    let frontend = instance.fe_instance();
+
+    let ServerHandlers::Init(handlers) = instance.frontend.server_handlers() else {
+        unreachable!()
+    };
+    let router = {
+        let handlers = handlers.lock().unwrap();
+        let server = handlers
+            .get(HTTP_SERVER)
+            .and_then(|x| x.0.as_any().downcast_ref::<HttpServer>())
+            .unwrap();
+        server.build(server.make_app()).unwrap()
+    };
+    let client = TestClient::new(router).await;
+
+    create_table(frontend).await;
+
+    desc_table(frontend).await;
+
+    create_pipeline(&client).await;
+
+    insert_data_by_pipeline(&client).await?;
+
+    query_data(frontend).await
+}
+
+async fn insert_data_by_pipeline(client: &TestClient) -> io::Result<()> {
+    let file = fs::read(find_workspace_path(
+        "tests-integration/resources/jsonbench-head-10.ndjson",
+    ))?;
+
+    let response = client
+        .post("/v1/ingest?table=bluesky&pipeline_name=jsonbench")
+        .header("Content-Type", "text/plain")
+        .body(file)
+        .send()
+        .await;
+    assert_eq!(response.status(), StatusCode::OK);
+
+    let response = response.text().await;
+    // Note that this pattern also matches the inserted rows: "10".
+    let pattern = r#"{"output":[{"affectedrows":10}]"#;
+    assert!(response.starts_with(pattern));
+    Ok(())
+}
+
+async fn create_pipeline(client: &TestClient) {
+    let pipeline = r#"
+version: 2
+
+processors:
+  - json_parse:
+      fields:
+        - message, data
+      ignore_missing: true
+  - simple_extract:
+      fields:
+        - data, time_us
+      key: "time_us"
+      ignore_missing: false
+  - epoch:
+      fields:
+        - time_us
+      resolution: microsecond
+  - select:
+      fields:
+        - time_us
+        - data
+
+transform:
+  - fields:
+      - time_us
+    type: epoch, us
+    index: timestamp
+"#;
+
+    let response = client
+        .post("/v1/pipelines/jsonbench")
+        .header("Content-Type", "application/x-yaml")
+        .body(pipeline)
+        .send()
+        .await;
+    assert_eq!(response.status(), StatusCode::OK);
+
+    let response = response.text().await;
+    let pattern = r#"{"pipelines":[{"name":"jsonbench""#;
+    assert!(response.starts_with(pattern));
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn test_load_jsonbench_data_by_sql() -> io::Result<()> {
+    common_telemetry::init_default_ut_logging();
+
+    let instance = GreptimeDbStandaloneBuilder::new("test_load_jsonbench_data_by_sql")
         .build()
         .await;
     let frontend = instance.fe_instance();
@@ -34,9 +134,9 @@ async fn test_load_jsonbench_data() {
 
     desc_table(frontend).await;
 
-    insert_data(frontend).await.unwrap();
+    insert_data_by_sql(frontend).await?;
 
-    query_data(frontend).await.unwrap();
+    query_data(frontend).await
 }
 
 async fn query_data(frontend: &Arc<Instance>) -> io::Result<()> {
@@ -46,22 +146,21 @@ async fn query_data(frontend: &Arc<Instance>) -> io::Result<()> {
 | count(*) |
 +----------+
 | 10 |
-+----------+
-"#;
++----------+"#;
     execute_sql_and_expect(frontend, sql, expected).await;
 
-    let sql = "SELECT * FROM bluesky ORDER BY ts";
+    let sql = "SELECT * FROM bluesky ORDER BY time_us";
     let expected = fs::read_to_string(find_workspace_path(
         "tests-integration/resources/jsonbench-select-all.txt",
     ))?;
     execute_sql_and_expect(frontend, sql, &expected).await;
 
     // query 1:
-    let sql = "\
-SELECT \
-json_get_string(data, '$.commit.collection') AS event, count() AS count \
-FROM bluesky \
-GROUP BY event \
+    let sql = "
+SELECT
+json_get_string(data, '$.commit.collection') AS event, count() AS count
+FROM bluesky
+GROUP BY event
 ORDER BY count DESC, event ASC";
     let expected = r#"
 +-----------------------+-------+
@@ -75,16 +174,16 @@ ORDER BY count DESC, event ASC";
     execute_sql_and_expect(frontend, sql, expected).await;
 
     // query 2:
-    let sql = "\
-SELECT \
-json_get_string(data, '$.commit.collection') AS event, \
-count() AS count, \
-count(DISTINCT json_get_string(data, '$.did')) AS users \
-FROM bluesky \
-WHERE \
-(json_get_string(data, '$.kind') = 'commit') AND \
-(json_get_string(data, '$.commit.operation') = 'create') \
-GROUP BY event \
+    let sql = "
+SELECT
+json_get_string(data, '$.commit.collection') AS event,
+count() AS count,
+count(DISTINCT json_get_string(data, '$.did')) AS users
+FROM bluesky
+WHERE
+(json_get_string(data, '$.kind') = 'commit') AND
+(json_get_string(data, '$.commit.operation') = 'create')
+GROUP BY event
 ORDER BY count DESC, event ASC";
     let expected = r#"
 +-----------------------+-------+-------+
@@ -98,18 +197,18 @@ ORDER BY count DESC, event ASC";
     execute_sql_and_expect(frontend, sql, expected).await;
 
     // query 3:
-    let sql = "\
-SELECT \
-json_get_string(data, '$.commit.collection') AS event, \
-date_part('hour', to_timestamp_micros(json_get_int(data, '$.time_us'))) as hour_of_day, \
-count() AS count \
-FROM bluesky \
-WHERE \
-(json_get_string(data, '$.kind') = 'commit') AND \
-(json_get_string(data, '$.commit.operation') = 'create') AND \
-json_get_string(data, '$.commit.collection') IN \
-('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like') \
-GROUP BY event, hour_of_day \
+    let sql = "
+SELECT
+json_get_string(data, '$.commit.collection') AS event,
+date_part('hour', to_timestamp_micros(json_get_int(data, '$.time_us'))) as hour_of_day,
+count() AS count
+FROM bluesky
+WHERE
+(json_get_string(data, '$.kind') = 'commit') AND
+(json_get_string(data, '$.commit.operation') = 'create') AND
+json_get_string(data, '$.commit.collection') IN
+('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like')
+GROUP BY event, hour_of_day
 ORDER BY hour_of_day, event";
     let expected = r#"
 +----------------------+-------------+-------+
@@ -122,7 +221,7 @@ ORDER BY hour_of_day, event";
     execute_sql_and_expect(frontend, sql, expected).await;
 
     // query 4:
-    let sql = "\
+    let sql = "
 SELECT
 json_get_string(data, '$.did') as user_id,
 min(to_timestamp_micros(json_get_int(data, '$.time_us'))) AS first_post_ts
@@ -174,19 +273,23 @@ LIMIT 3";
     Ok(())
 }
 
-async fn insert_data(frontend: &Arc<Instance>) -> io::Result<()> {
+async fn insert_data_by_sql(frontend: &Arc<Instance>) -> io::Result<()> {
     let file = fs::File::open(find_workspace_path(
         "tests-integration/resources/jsonbench-head-10.ndjson",
     ))?;
     let reader = io::BufReader::new(file);
-    for (i, line) in reader.lines().enumerate() {
+    for line in reader.lines() {
         let line = line?;
         if line.is_empty() {
             continue;
        }
+
+        let json: serde_json::Value = serde_json::from_str(&line)?;
+        let time_us = json.pointer("/time_us").and_then(|x| x.as_u64()).unwrap();
+
         let sql = format!(
-            "INSERT INTO bluesky (ts, data) VALUES ({}, '{}')",
-            i + 1,
+            "INSERT INTO bluesky (time_us, data) VALUES ({}, '{}')",
+            time_us,
             line.replace("'", "''"), // standard method to escape the single quote
         );
         execute_sql_and_expect(frontend, &sql, "Affected Rows: 1").await;
@@ -197,12 +300,12 @@ async fn insert_data(frontend: &Arc<Instance>) -> io::Result<()> {
 async fn desc_table(frontend: &Arc<Instance>) {
     let sql = "DESC TABLE bluesky";
     let expected = r#"
-+--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
++---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
 | Column | Type | Key | Null | Default | Semantic Type |
-+--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
-| data | Json<Object{"_raw": String, "commit.collection": String, "commit.operation": String, "did": String, "kind": String, "time_us": Number(I64)}> | | YES | | FIELD |
-| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
-+--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+"#;
++---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
+| data | Json<{"_raw":"<String>","commit.collection":"<String>","commit.operation":"<String>","did":"<String>","kind":"<String>","time_us":"<Number>"}> | | YES | | FIELD |
+| time_us | TimestampMicrosecond | PRI | NO | | TIMESTAMP |
++---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+"#;
     execute_sql_and_expect(frontend, sql, expected).await;
 }
 
@@ -219,7 +322,7 @@ CREATE TABLE bluesky (
 time_us Bigint
 >,
 ),
-ts Timestamp TIME INDEX,
+time_us TimestampMicrosecond TIME INDEX,
 )
 "#;
     execute_sql_and_expect(frontend, sql, "Affected Rows: 0").await;

@@ -32,6 +32,17 @@ SHOW CREATE TABLE comment_table_test;
 | | ) |
 +--------------------+---------------------------------------------------+
 
+SELECT table_comment
+FROM information_schema.tables
+WHERE table_schema = 'public'
+AND table_name = 'comment_table_test';
+
++-------------------------+
+| table_comment |
++-------------------------+
+| table level description |
++-------------------------+
+
 -- Remove table comment
 COMMENT ON TABLE comment_table_test IS NULL;
 
@@ -54,6 +65,17 @@ SHOW CREATE TABLE comment_table_test;
 | | |
 +--------------------+---------------------------------------------------+
 
+SELECT table_comment
+FROM information_schema.tables
+WHERE table_schema = 'public'
+AND table_name = 'comment_table_test';
+
++---------------+
+| table_comment |
++---------------+
+| |
++---------------+
+
 DROP TABLE comment_table_test;
 
 Affected Rows: 0
@@ -90,6 +112,18 @@ SHOW CREATE TABLE comment_column_test;
 | | |
 +---------------------+---------------------------------------------------------+
 
+SELECT column_comment
+FROM information_schema.columns
+WHERE table_schema = 'public'
+AND table_name = 'comment_column_test'
+AND column_name = 'val';
+
++--------------------------+
+| column_comment |
++--------------------------+
+| value column description |
++--------------------------+
+
 -- Remove column comment
 COMMENT ON COLUMN comment_column_test.val IS NULL;
 
@@ -112,6 +146,18 @@ SHOW CREATE TABLE comment_column_test;
 | | |
 +---------------------+----------------------------------------------------+
 
+SELECT column_comment
+FROM information_schema.columns
+WHERE table_schema = 'public'
+AND table_name = 'comment_column_test'
+AND column_name = 'val';
+
++----------------+
+| column_comment |
++----------------+
+| |
++----------------+
+
 DROP TABLE comment_column_test;
 
 Affected Rows: 0
@@ -155,6 +201,16 @@ SHOW CREATE FLOW flow_comment_test;
 | | AS SELECT desc_str, ts FROM flow_source_comment_test |
 +-------------------+------------------------------------------------------+
 
+SELECT comment
+FROM information_schema.flows
+WHERE flow_name = 'flow_comment_test';
+
++------------------------+
+| comment |
++------------------------+
+| flow level description |
++------------------------+
+
 -- Remove flow comment
 COMMENT ON FLOW flow_comment_test IS NULL;
 
@@ -170,6 +226,16 @@ SHOW CREATE FLOW flow_comment_test;
 | | AS SELECT desc_str, ts FROM flow_source_comment_test |
 +-------------------+------------------------------------------------------+
 
+SELECT comment
+FROM information_schema.flows
+WHERE flow_name = 'flow_comment_test';
+
++---------+
+| comment |
++---------+
+| |
++---------+
+
 DROP FLOW flow_comment_test;
 
 Affected Rows: 0

|||||||
@@ -9,10 +9,18 @@ CREATE TABLE comment_table_test (
|
|||||||
-- Add table comment
|
-- Add table comment
|
||||||
COMMENT ON TABLE comment_table_test IS 'table level description';
|
COMMENT ON TABLE comment_table_test IS 'table level description';
|
||||||
SHOW CREATE TABLE comment_table_test;
|
SHOW CREATE TABLE comment_table_test;
|
||||||
|
SELECT table_comment
|
||||||
|
FROM information_schema.tables
|
||||||
|
WHERE table_schema = 'public'
|
||||||
|
AND table_name = 'comment_table_test';
|
||||||
|
|
||||||
-- Remove table comment
|
-- Remove table comment
|
||||||
COMMENT ON TABLE comment_table_test IS NULL;
|
COMMENT ON TABLE comment_table_test IS NULL;
|
||||||
SHOW CREATE TABLE comment_table_test;
|
SHOW CREATE TABLE comment_table_test;
|
||||||
|
SELECT table_comment
|
||||||
|
FROM information_schema.tables
|
||||||
|
WHERE table_schema = 'public'
|
||||||
|
AND table_name = 'comment_table_test';
|
||||||
|
|
||||||
DROP TABLE comment_table_test;
|
DROP TABLE comment_table_test;
|
||||||
|
|
||||||
@@ -27,10 +35,20 @@ CREATE TABLE comment_column_test (
|
|||||||
-- Add column comment
|
-- Add column comment
|
||||||
COMMENT ON COLUMN comment_column_test.val IS 'value column description';
|
COMMENT ON COLUMN comment_column_test.val IS 'value column description';
|
||||||
SHOW CREATE TABLE comment_column_test;
|
SHOW CREATE TABLE comment_column_test;
|
||||||
|
SELECT column_comment
|
||||||
|
FROM information_schema.columns
|
||||||
|
WHERE table_schema = 'public'
|
||||||
|
AND table_name = 'comment_column_test'
|
||||||
|
AND column_name = 'val';
|
||||||
|
|
||||||
-- Remove column comment
|
-- Remove column comment
|
||||||
COMMENT ON COLUMN comment_column_test.val IS NULL;
|
COMMENT ON COLUMN comment_column_test.val IS NULL;
|
||||||
SHOW CREATE TABLE comment_column_test;
|
SHOW CREATE TABLE comment_column_test;
|
||||||
|
SELECT column_comment
|
||||||
|
FROM information_schema.columns
|
||||||
|
WHERE table_schema = 'public'
|
||||||
|
AND table_name = 'comment_column_test'
|
||||||
|
AND column_name = 'val';
|
||||||
|
|
||||||
DROP TABLE comment_column_test;
|
DROP TABLE comment_column_test;
|
||||||
|
|
||||||
@@ -54,12 +72,17 @@ SELECT desc_str, ts FROM flow_source_comment_test;
|
|||||||
-- Add flow comment
|
-- Add flow comment
|
||||||
COMMENT ON FLOW flow_comment_test IS 'flow level description';
|
COMMENT ON FLOW flow_comment_test IS 'flow level description';
|
||||||
SHOW CREATE FLOW flow_comment_test;
|
SHOW CREATE FLOW flow_comment_test;
|
||||||
|
SELECT comment
|
||||||
|
FROM information_schema.flows
|
||||||
|
WHERE flow_name = 'flow_comment_test';
|
||||||
|
|
||||||
-- Remove flow comment
|
-- Remove flow comment
|
||||||
COMMENT ON FLOW flow_comment_test IS NULL;
|
COMMENT ON FLOW flow_comment_test IS NULL;
|
||||||
SHOW CREATE FLOW flow_comment_test;
|
SHOW CREATE FLOW flow_comment_test;
|
||||||
|
SELECT comment
|
||||||
|
FROM information_schema.flows
|
||||||
|
WHERE flow_name = 'flow_comment_test';
|
||||||
|
|
||||||
DROP FLOW flow_comment_test;
|
DROP FLOW flow_comment_test;
|
||||||
DROP TABLE flow_source_comment_test;
|
DROP TABLE flow_source_comment_test;
|
||||||
DROP TABLE flow_sink_comment_test;
|
DROP TABLE flow_sink_comment_test;
|
||||||
|
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ DESC TABLE t;
 | Column | Type | Key | Null | Default | Semantic Type |
 +--------+----------------------+-----+------+---------+---------------+
 | ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
-| j | Json<Null> | | YES | | FIELD |
+| j | Json<"<Null>"> | | YES | | FIELD |
 +--------+----------------------+-----+------+---------+---------------+
 
 INSERT INTO t VALUES
@@ -24,12 +24,12 @@ Affected Rows: 3
 
 DESC TABLE t;
 
-+--------+-----------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
++--------+---------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
 | Column | Type | Key | Null | Default | Semantic Type |
-+--------+-----------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
++--------+---------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
 | ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
-| j | Json<Object{"int": Number(I64), "list": Array[Number(F64)], "nested": Object{"a": Object{"x": String}, "b": Object{"y": Number(I64)}}}> | | YES | | FIELD |
-+--------+-----------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
+| j | Json<{"int":"<Number>","list":["<Number>"],"nested":{"a":{"x":"<String>"},"b":{"y":"<Number>"}}}> | | YES | | FIELD |
++--------+---------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
 
 INSERT INTO t VALUES
 (1762128004000, '{"int": 4, "bool": true, "nested": {"a": {"y": 1}}}'),
@@ -39,12 +39,12 @@ Affected Rows: 2
 
 DESC TABLE t;
 
-+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
++--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
 | Column | Type | Key | Null | Default | Semantic Type |
-+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
++--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
 | ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
-| j | Json<Object{"bool": Bool, "int": Number(I64), "list": Array[Number(F64)], "nested": Object{"a": Object{"x": String, "y": Number(I64)}, "b": Object{"x": String, "y": Number(I64)}}}> | | YES | | FIELD |
-+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
+| j | Json<{"bool":"<Bool>","int":"<Number>","list":["<Number>"],"nested":{"a":{"x":"<String>","y":"<Number>"},"b":{"x":"<String>","y":"<Number>"}}}> | | YES | | FIELD |
++--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
 
 INSERT INTO t VALUES (1762128006000, '{"int": 6, "list": [-6.0], "bool": true, "nested": {"a": {"x": "ax", "y": 66}, "b": {"y": -66, "x": "bx"}}}');
 
@@ -52,12 +52,12 @@ Affected Rows: 1
 
 DESC TABLE t;
 
-+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
++--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
 | Column | Type | Key | Null | Default | Semantic Type |
-+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
++--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
 | ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
-| j | Json<Object{"bool": Bool, "int": Number(I64), "list": Array[Number(F64)], "nested": Object{"a": Object{"x": String, "y": Number(I64)}, "b": Object{"x": String, "y": Number(I64)}}}> | | YES | | FIELD |
-+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
+| j | Json<{"bool":"<Bool>","int":"<Number>","list":["<Number>"],"nested":{"a":{"x":"<String>","y":"<Number>"},"b":{"x":"<String>","y":"<Number>"}}}> | | YES | | FIELD |
++--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
 
 INSERT INTO t VALUES (1762128011000, '{}');