diff --git a/src/pipeline/src/manager/pipeline_operator.rs b/src/pipeline/src/manager/pipeline_operator.rs index 77ef8ade23..6c4256db69 100644 --- a/src/pipeline/src/manager/pipeline_operator.rs +++ b/src/pipeline/src/manager/pipeline_operator.rs @@ -20,6 +20,7 @@ use api::v1::CreateTableExpr; use catalog::{CatalogManagerRef, RegisterSystemTableRequest}; use common_catalog::consts::{DEFAULT_PRIVATE_SCHEMA_NAME, default_engine}; use common_telemetry::info; +use common_time::FOREVER; use datatypes::timestamp::TimestampNanosecond; use futures::FutureExt; use operator::insert::InserterRef; @@ -28,6 +29,7 @@ use query::QueryEngineRef; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; use table::TableRef; +use table::requests::TTL_KEY; use crate::Pipeline; use crate::error::{CatalogSnafu, CreateTableSnafu, PipelineTableNotFoundSnafu, Result}; @@ -59,6 +61,9 @@ impl PipelineOperator { fn create_table_request(&self, catalog: &str) -> RegisterSystemTableRequest { let (time_index, primary_keys, column_defs) = PipelineTable::build_pipeline_schema(); + let mut table_options = HashMap::new(); + table_options.insert(TTL_KEY.to_string(), FOREVER.to_string()); + let create_table_expr = CreateTableExpr { catalog_name: catalog.to_string(), schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(), @@ -68,7 +73,7 @@ impl PipelineOperator { time_index, primary_keys, create_if_not_exists: true, - table_options: Default::default(), + table_options, table_id: None, // Should and will be assigned by Meta. engine: default_engine().to_string(), };