mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-23 16:30:39 +00:00
chore: merge branch 'main' into mysql-kvbackend-split
This commit is contained in:
@@ -14,7 +14,7 @@ workspace = true
|
||||
api.workspace = true
|
||||
arrow-flight.workspace = true
|
||||
async-stream.workspace = true
|
||||
async-trait = "0.1"
|
||||
async-trait.workspace = true
|
||||
auth.workspace = true
|
||||
axum.workspace = true
|
||||
cache.workspace = true
|
||||
|
||||
@@ -308,11 +308,10 @@ impl GreptimeDbClusterBuilder {
|
||||
expected_datanodes: usize,
|
||||
) {
|
||||
for _ in 0..10 {
|
||||
let alive_datanodes =
|
||||
meta_srv::lease::alive_datanodes(1000, meta_peer_client, u64::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.len();
|
||||
let alive_datanodes = meta_srv::lease::alive_datanodes(meta_peer_client, u64::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.len();
|
||||
if alive_datanodes == expected_datanodes {
|
||||
return;
|
||||
}
|
||||
@@ -322,10 +321,9 @@ impl GreptimeDbClusterBuilder {
|
||||
}
|
||||
|
||||
async fn create_datanode(&self, opts: DatanodeOptions, metasrv: MockInfo) -> Datanode {
|
||||
let mut meta_client =
|
||||
MetaClientBuilder::datanode_default_options(1000, opts.node_id.unwrap())
|
||||
.channel_manager(metasrv.channel_manager)
|
||||
.build();
|
||||
let mut meta_client = MetaClientBuilder::datanode_default_options(opts.node_id.unwrap())
|
||||
.channel_manager(metasrv.channel_manager)
|
||||
.build();
|
||||
meta_client.start(&[&metasrv.server_addr]).await.unwrap();
|
||||
let meta_client = Arc::new(meta_client);
|
||||
|
||||
@@ -357,7 +355,7 @@ impl GreptimeDbClusterBuilder {
|
||||
metasrv: MockInfo,
|
||||
datanode_clients: Arc<NodeClients>,
|
||||
) -> Arc<FeInstance> {
|
||||
let mut meta_client = MetaClientBuilder::frontend_default_options(1000)
|
||||
let mut meta_client = MetaClientBuilder::frontend_default_options()
|
||||
.channel_manager(metasrv.channel_manager)
|
||||
.enable_access_cluster_info()
|
||||
.build();
|
||||
|
||||
@@ -23,7 +23,7 @@ use common_error::status_code::StatusCode as ErrorCode;
|
||||
use flate2::write::GzEncoder;
|
||||
use flate2::Compression;
|
||||
use log_query::{Context, Limit, LogQuery, TimeFilter};
|
||||
use loki_proto::logproto::{EntryAdapter, PushRequest, StreamAdapter};
|
||||
use loki_proto::logproto::{EntryAdapter, LabelPairAdapter, PushRequest, StreamAdapter};
|
||||
use loki_proto::prost_types::Timestamp;
|
||||
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
|
||||
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
|
||||
@@ -96,7 +96,8 @@ macro_rules! http_tests {
|
||||
test_pipeline_dispatcher,
|
||||
|
||||
test_otlp_metrics,
|
||||
test_otlp_traces,
|
||||
test_otlp_traces_v0,
|
||||
test_otlp_traces_v1,
|
||||
test_otlp_logs,
|
||||
test_loki_pb_logs,
|
||||
test_loki_json_logs,
|
||||
@@ -413,6 +414,18 @@ pub async fn test_sql_api(store_type: StorageType) {
|
||||
let body = serde_json::from_str::<ErrorResponse>(&res.text().await).unwrap();
|
||||
assert_eq!(body.code(), ErrorCode::DatabaseNotFound as u32);
|
||||
|
||||
// test analyze format
|
||||
let res = client
|
||||
.get("/v1/sql?sql=explain analyze format json select cpu, ts from demo limit 1")
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
let body = serde_json::from_str::<GreptimedbV1Response>(&res.text().await).unwrap();
|
||||
let output = body.output();
|
||||
assert_eq!(output.len(), 1);
|
||||
// this is something only json format can show
|
||||
assert!(format!("{:?}", output[0]).contains("\\\"param\\\""));
|
||||
|
||||
// test parse method
|
||||
let res = client.get("/v1/sql/parse?sql=desc table t").send().await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
@@ -1007,6 +1020,7 @@ min_compaction_interval = "0s"
|
||||
[region_engine.mito.index]
|
||||
aux_path = ""
|
||||
staging_size = "2GiB"
|
||||
staging_ttl = "7days"
|
||||
write_buffer_size = "8MiB"
|
||||
content_cache_page_size = "64KiB"
|
||||
|
||||
@@ -1224,10 +1238,14 @@ transform:
|
||||
- id2
|
||||
type: int32
|
||||
- fields:
|
||||
- type
|
||||
- log
|
||||
- logger
|
||||
type: string
|
||||
- field: type
|
||||
type: string
|
||||
index: skipping
|
||||
- field: log
|
||||
type: string
|
||||
index: fulltext
|
||||
- field: time
|
||||
type: time
|
||||
index: timestamp
|
||||
@@ -1301,11 +1319,22 @@ transform:
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
let encoded: String = url::form_urlencoded::byte_serialize(version_str.as_bytes()).collect();
|
||||
// 3. check schema
|
||||
|
||||
// 3. remove pipeline
|
||||
let expected_schema = "[[\"logs1\",\"CREATE TABLE IF NOT EXISTS \\\"logs1\\\" (\\n \\\"id1\\\" INT NULL,\\n \\\"id2\\\" INT NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"type\\\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\\n \\\"log\\\" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', case_sensitive = 'false'),\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
|
||||
validate_data(
|
||||
"pipeline_schema",
|
||||
&client,
|
||||
"show create table logs1",
|
||||
expected_schema,
|
||||
)
|
||||
.await;
|
||||
|
||||
// 4. remove pipeline
|
||||
let encoded_ver_str: String =
|
||||
url::form_urlencoded::byte_serialize(version_str.as_bytes()).collect();
|
||||
let res = client
|
||||
.delete(format!("/v1/pipelines/test?version={}", encoded).as_str())
|
||||
.delete(format!("/v1/pipelines/test?version={}", encoded_ver_str).as_str())
|
||||
.send()
|
||||
.await;
|
||||
|
||||
@@ -1321,7 +1350,7 @@ transform:
|
||||
format!(r#"[{{"name":"test","version":"{}"}}]"#, version_str).as_str()
|
||||
);
|
||||
|
||||
// 4. write data failed
|
||||
// 5. write data failed
|
||||
let res = client
|
||||
.post("/v1/ingest?db=public&table=logs1&pipeline_name=test")
|
||||
.header("Content-Type", "application/json")
|
||||
@@ -2025,7 +2054,7 @@ pub async fn test_otlp_metrics(store_type: StorageType) {
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_otlp_traces(store_type: StorageType) {
|
||||
pub async fn test_otlp_traces_v0(store_type: StorageType) {
|
||||
// init
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_traces").await;
|
||||
@@ -2097,6 +2126,99 @@ pub async fn test_otlp_traces(store_type: StorageType) {
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_otlp_traces_v1(store_type: StorageType) {
|
||||
// init
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_traces").await;
|
||||
const TRACE_V1: &str = "greptime_trace_v1";
|
||||
|
||||
let content = r#"
|
||||
{"resourceSpans":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"telemetrygen"}}],"droppedAttributesCount":0},"scopeSpans":[{"scope":{"name":"telemetrygen","version":"","attributes":[],"droppedAttributesCount":0},"spans":[{"traceId":"c05d7a4ec8e1f231f02ed6e8da8655b4","spanId":"9630f2916e2f7909","traceState":"","parentSpanId":"d24f921c75f68e23","flags":256,"name":"okey-dokey-0","kind":2,"startTimeUnixNano":"1736480942444376000","endTimeUnixNano":"1736480942444499000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-client"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"c05d7a4ec8e1f231f02ed6e8da8655b4","spanId":"d24f921c75f68e23","traceState":"","parentSpanId":"","flags":256,"name":"lets-go","kind":3,"startTimeUnixNano":"1736480942444376000","endTimeUnixNano":"1736480942444499000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-server"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"cc9e0991a2e63d274984bd44ee669203","spanId":"8f847259b0f6e1ab","traceState":"","parentSpanId":"eba7be77e3558179","flags":256,"name":"okey-dokey-0","kind":2,"startTimeUnixNano":"1736480942444589000","endTimeUnixNano":"1736480942444712000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-client"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"cc9e0991a2e63d274984bd44ee669203","spanId":"eba7be77e3558179","traceState":"","parentSpanId":"","flags":256,"name":"lets-go","kind":3,"startTimeUnixNano":"1736480942444589000","endTimeUnixNano":"1736480942444712000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-server"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"}]}
|
||||
"#;
|
||||
|
||||
let req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap();
|
||||
let body = req.encode_to_vec();
|
||||
|
||||
// handshake
|
||||
let client = TestClient::new(app).await;
|
||||
|
||||
// write traces data
|
||||
let res = send_req(
|
||||
&client,
|
||||
vec![
|
||||
(
|
||||
HeaderName::from_static("content-type"),
|
||||
HeaderValue::from_static("application/x-protobuf"),
|
||||
),
|
||||
(
|
||||
HeaderName::from_static("x-greptime-log-pipeline-name"),
|
||||
HeaderValue::from_static(TRACE_V1),
|
||||
),
|
||||
(
|
||||
HeaderName::from_static("x-greptime-trace-table-name"),
|
||||
HeaderValue::from_static("mytable"),
|
||||
),
|
||||
],
|
||||
"/v1/otlp/v1/traces",
|
||||
body.clone(),
|
||||
false,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
|
||||
// select traces data
|
||||
let expected = r#"[[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","d24f921c75f68e23","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","eba7be77e3558179","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]]]"#;
|
||||
validate_data("otlp_traces", &client, "select * from mytable;", expected).await;
|
||||
|
||||
let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL,\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"trace_id\", \"span_id\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'A',\n trace_id >= 'A' AND trace_id < 'B' OR trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'B' AND trace_id < 'C' OR trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'C' AND trace_id < 'D' OR trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'D' AND trace_id < 'E' OR trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'E' AND trace_id < 'F' OR trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'F' AND trace_id < 'a' OR trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n append_mode = 'true'\n)"]]"#;
|
||||
validate_data(
|
||||
"otlp_traces",
|
||||
&client,
|
||||
"show create table mytable;",
|
||||
expected_ddl,
|
||||
)
|
||||
.await;
|
||||
|
||||
// drop table
|
||||
let res = client.get("/v1/sql?sql=drop table mytable;").send().await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
// write traces data with gzip
|
||||
let res = send_req(
|
||||
&client,
|
||||
vec![
|
||||
(
|
||||
HeaderName::from_static("content-type"),
|
||||
HeaderValue::from_static("application/x-protobuf"),
|
||||
),
|
||||
(
|
||||
HeaderName::from_static("x-greptime-log-pipeline-name"),
|
||||
HeaderValue::from_static(TRACE_V1),
|
||||
),
|
||||
(
|
||||
HeaderName::from_static("x-greptime-trace-table-name"),
|
||||
HeaderValue::from_static("mytable"),
|
||||
),
|
||||
],
|
||||
"/v1/otlp/v1/traces",
|
||||
body.clone(),
|
||||
true,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
|
||||
// select traces data again
|
||||
validate_data(
|
||||
"otlp_traces_with_gzip",
|
||||
&client,
|
||||
"select * from mytable;",
|
||||
expected,
|
||||
)
|
||||
.await;
|
||||
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_otlp_logs(store_type: StorageType) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_logs").await;
|
||||
@@ -2225,12 +2347,30 @@ pub async fn test_loki_pb_logs(store_type: StorageType) {
|
||||
EntryAdapter {
|
||||
timestamp: Some(Timestamp::from_str("2024-11-07T10:53:50").unwrap()),
|
||||
line: "this is a log message".to_string(),
|
||||
structured_metadata: vec![],
|
||||
structured_metadata: vec![
|
||||
LabelPairAdapter {
|
||||
name: "key1".to_string(),
|
||||
value: "value1".to_string(),
|
||||
},
|
||||
LabelPairAdapter {
|
||||
name: "key2".to_string(),
|
||||
value: "value2".to_string(),
|
||||
},
|
||||
],
|
||||
parsed: vec![],
|
||||
},
|
||||
EntryAdapter {
|
||||
timestamp: Some(Timestamp::from_str("2024-11-07T10:53:50").unwrap()),
|
||||
line: "this is a log message".to_string(),
|
||||
timestamp: Some(Timestamp::from_str("2024-11-07T10:53:51").unwrap()),
|
||||
line: "this is a log message 2".to_string(),
|
||||
structured_metadata: vec![LabelPairAdapter {
|
||||
name: "key3".to_string(),
|
||||
value: "value3".to_string(),
|
||||
}],
|
||||
parsed: vec![],
|
||||
},
|
||||
EntryAdapter {
|
||||
timestamp: Some(Timestamp::from_str("2024-11-07T10:53:52").unwrap()),
|
||||
line: "this is a log message 2".to_string(),
|
||||
structured_metadata: vec![],
|
||||
parsed: vec![],
|
||||
},
|
||||
@@ -2270,7 +2410,7 @@ pub async fn test_loki_pb_logs(store_type: StorageType) {
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
|
||||
// test schema
|
||||
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n \\\"wadaxi\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\", \\\"wadaxi\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
|
||||
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n \\\"wadaxi\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\", \\\"wadaxi\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
|
||||
validate_data(
|
||||
"loki_pb_schema",
|
||||
&client,
|
||||
@@ -2280,7 +2420,7 @@ pub async fn test_loki_pb_logs(store_type: StorageType) {
|
||||
.await;
|
||||
|
||||
// test content
|
||||
let expected = r#"[[1730976830000000000,"this is a log message","test","integration","do anything"],[1730976830000000000,"this is a log message","test","integration","do anything"]]"#;
|
||||
let expected = "[[1730976830000000000,\"this is a log message\",{\"key1\":\"value1\",\"key2\":\"value2\"},\"test\",\"integration\",\"do anything\"],[1730976831000000000,\"this is a log message 2\",{\"key3\":\"value3\"},\"test\",\"integration\",\"do anything\"],[1730976832000000000,\"this is a log message 2\",{},\"test\",\"integration\",\"do anything\"]]";
|
||||
validate_data(
|
||||
"loki_pb_content",
|
||||
&client,
|
||||
@@ -2308,8 +2448,9 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
|
||||
"sender": "integration"
|
||||
},
|
||||
"values": [
|
||||
[ "1735901380059465984", "this is line one" ],
|
||||
[ "1735901398478897920", "this is line two" ]
|
||||
[ "1735901380059465984", "this is line one", {"key1":"value1","key2":"value2"}],
|
||||
[ "1735901398478897920", "this is line two", {"key3":"value3"}],
|
||||
[ "1735901398478897921", "this is line two updated"]
|
||||
]
|
||||
}
|
||||
]
|
||||
@@ -2339,7 +2480,7 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
|
||||
// test schema
|
||||
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"sender\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"sender\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
|
||||
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"sender\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"sender\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
|
||||
validate_data(
|
||||
"loki_json_schema",
|
||||
&client,
|
||||
@@ -2349,7 +2490,7 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
|
||||
.await;
|
||||
|
||||
// test content
|
||||
let expected = "[[1735901380059465984,\"this is line one\",\"integration\",\"test\"],[1735901398478897920,\"this is line two\",\"integration\",\"test\"]]";
|
||||
let expected = "[[1735901380059465984,\"this is line one\",{\"key1\":\"value1\",\"key2\":\"value2\"},\"integration\",\"test\"],[1735901398478897920,\"this is line two\",{\"key3\":\"value3\"},\"integration\",\"test\"],[1735901398478897921,\"this is line two updated\",{},\"integration\",\"test\"]]";
|
||||
validate_data(
|
||||
"loki_json_content",
|
||||
&client,
|
||||
|
||||
@@ -35,7 +35,7 @@ use futures::future::BoxFuture;
|
||||
use meta_srv::error::Result as MetaResult;
|
||||
use meta_srv::metasrv::SelectorContext;
|
||||
use meta_srv::procedure::region_migration::RegionMigrationProcedureTask;
|
||||
use meta_srv::selector::{Namespace, Selector, SelectorOptions};
|
||||
use meta_srv::selector::{Selector, SelectorOptions};
|
||||
use servers::query_handler::sql::SqlQueryHandler;
|
||||
use session::context::{QueryContext, QueryContextRef};
|
||||
use store_api::storage::RegionId;
|
||||
@@ -169,7 +169,6 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<Strin
|
||||
// Trigger region migration.
|
||||
let procedure = region_migration_manager
|
||||
.submit_procedure(RegionMigrationProcedureTask::new(
|
||||
0,
|
||||
region_id,
|
||||
peer_factory(from_peer_id),
|
||||
peer_factory(to_peer_id),
|
||||
@@ -216,7 +215,6 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<Strin
|
||||
// Triggers again.
|
||||
let procedure = region_migration_manager
|
||||
.submit_procedure(RegionMigrationProcedureTask::new(
|
||||
0,
|
||||
region_id,
|
||||
peer_factory(from_peer_id),
|
||||
peer_factory(to_peer_id),
|
||||
@@ -473,7 +471,6 @@ pub async fn test_region_migration_by_sql(store_type: StorageType, endpoints: Ve
|
||||
// Triggers again.
|
||||
let procedure = region_migration_manager
|
||||
.submit_procedure(RegionMigrationProcedureTask::new(
|
||||
0,
|
||||
region_id,
|
||||
peer_factory(from_peer_id),
|
||||
peer_factory(to_peer_id),
|
||||
@@ -578,7 +575,6 @@ pub async fn test_region_migration_multiple_regions(
|
||||
// Trigger region migration.
|
||||
let procedure = region_migration_manager
|
||||
.submit_procedure(RegionMigrationProcedureTask::new(
|
||||
0,
|
||||
region_id,
|
||||
peer_factory(from_peer_id),
|
||||
peer_factory(to_peer_id),
|
||||
@@ -625,7 +621,6 @@ pub async fn test_region_migration_multiple_regions(
|
||||
// Triggers again.
|
||||
let procedure = region_migration_manager
|
||||
.submit_procedure(RegionMigrationProcedureTask::new(
|
||||
0,
|
||||
region_id,
|
||||
peer_factory(from_peer_id),
|
||||
peer_factory(to_peer_id),
|
||||
@@ -715,7 +710,6 @@ pub async fn test_region_migration_all_regions(store_type: StorageType, endpoint
|
||||
// Trigger region migration.
|
||||
let procedure = region_migration_manager
|
||||
.submit_procedure(RegionMigrationProcedureTask::new(
|
||||
0,
|
||||
region_id,
|
||||
peer_factory(from_peer_id),
|
||||
peer_factory(to_peer_id),
|
||||
@@ -763,7 +757,6 @@ pub async fn test_region_migration_all_regions(store_type: StorageType, endpoint
|
||||
// Triggers again.
|
||||
let procedure = region_migration_manager
|
||||
.submit_procedure(RegionMigrationProcedureTask::new(
|
||||
0,
|
||||
region_id,
|
||||
peer_factory(from_peer_id),
|
||||
peer_factory(to_peer_id),
|
||||
@@ -842,7 +835,6 @@ pub async fn test_region_migration_incorrect_from_peer(
|
||||
// Trigger region migration.
|
||||
let err = region_migration_manager
|
||||
.submit_procedure(RegionMigrationProcedureTask::new(
|
||||
0,
|
||||
region_id,
|
||||
peer_factory(5),
|
||||
peer_factory(1),
|
||||
@@ -925,7 +917,6 @@ pub async fn test_region_migration_incorrect_region_id(
|
||||
// Trigger region migration.
|
||||
let err = region_migration_manager
|
||||
.submit_procedure(RegionMigrationProcedureTask::new(
|
||||
0,
|
||||
region_id,
|
||||
peer_factory(2),
|
||||
peer_factory(1),
|
||||
@@ -957,7 +948,6 @@ impl Selector for ConstNodeSelector {
|
||||
|
||||
async fn select(
|
||||
&self,
|
||||
_ns: Namespace,
|
||||
_ctx: &Self::Context,
|
||||
_opts: SelectorOptions,
|
||||
) -> MetaResult<Self::Output> {
|
||||
|
||||
Reference in New Issue
Block a user