Airflow Task Groups for TM1 Bedrock Pipelines

The TM1_bedrock_py.airflow_executor.async_executor module exposes a collection of Apache Airflow task-group factories. Each factory wraps one of the synchronous Bedrock workflows and turns it into a DAG-friendly building block that Airflow can schedule in parallel. This page documents every task group, its parameters, and the shared orchestration pattern.

Overview

Every task group follows the same four-stage workflow:

  1. Prepare metadata – shared mapping dataframes are built, target-cube metadata is cached, and SQL/CSV resources are enumerated.

  2. Clear the target once – cubes or tables are wiped before fan-out so each slice writes into a clean space.

  3. Generate slice parameters – the same slicing helpers used in TM1 / TM1 Parallel Processing Manual expand the param_set_mdx_list into a workload plan.

  4. Delegate parallel execution to Airflow – each slice becomes an individual task managed by Airflow’s scheduler, which provides pooling, retries, backfills, and timeouts without relying on Python threads.
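Stage 3 can be pictured with a short, standalone sketch. It assumes the workload plan is the Cartesian product of the elements returned by each set MDX query; the function name build_slice_plan and the element values are hypothetical and are not part of the library.

```python
from itertools import product


def build_slice_plan(element_lists):
    """Expand per-dimension element lists into one parameter dict per slice.

    element_lists maps a dimension name to the elements its set MDX returned.
    Assumption: slices are the Cartesian product of those element lists.
    """
    dimensions = list(element_lists)
    combos = product(*(element_lists[dim] for dim in dimensions))
    return [dict(zip(dimensions, combo)) for combo in combos]


# Hypothetical results of evaluating two set MDX queries against TM1.
elements = {
    "Period": ["2024-01", "2024-02"],
    "Version": ["Actual", "Budget"],
}

plan = build_slice_plan(elements)
for slice_params in plan:
    print(slice_params)
```

Under this assumption, two periods and two versions yield four slices, and each dictionary in the plan would drive one Airflow task.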

Task group flow

High-level flow example: [metadata prep, target clear, slicing] → Airflow fan-out.

All task groups share the bedrock_params dictionary. Only the keys referenced below are mandatory; every other Bedrock argument can be passed through the dictionary and is forwarded as-is to the synchronous helper.

Task Groups

TM1 ↔ TM1 Slice Loader

tm1_dynamic_executor_task_group(tm1_connection, bedrock_params, dry_run=False, logging_level='INFO')

Moves data between TM1 cubes using data_copy_intercube with Airflow-level parallelisation.

Key bedrock_params

  1. data_mdx_template (required, string)

    • This is the workload template for the main MDX query that each worker will execute. It must contain placeholders that match the dimension names from param_set_mdx_list, prefixed with a $.

    • Example: SELECT ... FROM [Sales] WHERE ([Period].[$Period], [Version].[$Version])

  2. param_set_mdx_list (required, list[string])

    • This is the slicing definition. It is a list of MDX set queries that define the parameters for parallelization. Each query should return a set of elements from a single dimension.

    • Example: ["{[Period].[All Months]}", "{[Version].[Actual,Budget]}"]

  3. target_cube_name (required, string)

    • The name of the target cube to clear before writing.

  4. target_clear_set_mdx_list (required, list[string])

    • A list of set MDX queries that select the slices of the target cube to clear before writing.

    • Example: ["{[Period].[All Months]}", "{[Version].[ForeCast]}"]

  5. Optional mapping_steps, shared_mapping, use_mixed_datatypes
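The relationship between data_mdx_template and the slice parameters can be illustrated with Python's string.Template, which uses the same $name placeholder syntax. This is only a sketch: whether the library itself uses string.Template is an assumption, and render_slice_mdx is a hypothetical helper.

```python
from string import Template

# Template taken from the example above; the "..." is left elided on purpose.
data_mdx_template = (
    "SELECT ... FROM [Sales] "
    "WHERE ([Period].[$Period], [Version].[$Version])"
)


def render_slice_mdx(template, slice_params):
    """Fill every $Dimension placeholder with the element for this slice.

    substitute() raises KeyError when a placeholder has no matching
    parameter, which surfaces template/slicing mismatches early.
    """
    return Template(template).substitute(slice_params)


mdx = render_slice_mdx(
    data_mdx_template, {"Period": "2024-01", "Version": "Actual"}
)
print(mdx)
```

A template that references a dimension missing from param_set_mdx_list fails loudly in this sketch instead of producing a query with an unresolved placeholder.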

SQL ↔ TM1 Loaders

sql_to_tm1_dynamic_executor_task_group(tm1_connection, sql_connection, bedrock_params, dry_run=False, logging_level='INFO')

Streams relational data into TM1. The SQL connection is resolved through BaseHook and each slice executes its own parametrised SQL template.

Key bedrock_params

  1. sql_query_template (required, string)

    • This is the workload template for the main SQL query that each worker will execute. It must contain placeholders that match the dimension names from param_set_mdx_list, prefixed with a $.

    • Example: SELECT ... FROM sales s WHERE s.period = '$Period'

  2. param_set_mdx_list (required, list[string])

    • This is the slicing definition. It is a list of MDX set queries that define the parameters for parallelization. Each query should return a set of elements from a single dimension.

    • Example: ["{[Period].[All Months]}", "{[Version].[Actual,Budget]}"]

  3. target_cube_name (required, string)

    • The name of the target cube to clear before writing.

  4. target_clear_set_mdx_list (required, list[string])

    • A list of set MDX queries that select the slices of the target cube to clear before writing.

    • Example: ["{[Period].[All Months]}", "{[Version].[Actual,Budget]}"]

  5. Optional use_mixed_datatypes, mapping payloads

tm1_to_sql_dynamic_executor_task_group(tm1_connection, sql_connection, bedrock_params, dry_run=False, logging_level='INFO')

Reads TM1 slices and writes them to a SQL table. Before fan-out, the target table is cleared via clear_sql_table_task.

Key bedrock_params

  1. data_mdx_template (required, string)

    • This is the workload template for the main MDX query that each worker will execute. It must contain placeholders that match the dimension names from param_set_mdx_list, prefixed with a $.

    • Example: SELECT ... FROM [Sales] WHERE ([Period].[$Period], [Version].[$Version])

  2. param_set_mdx_list (required, list[string])

    • This is the slicing definition. It is a list of MDX set queries that define the parameters for parallelization. Each query should return a set of elements from a single dimension.

    • Example: ["{[Period].[All Months]}", "{[Version].[Actual,Budget]}"]

  3. target_table_name (required, string)

    • The name of the SQL table to write to.

  4. sql_delete_statement (optional, string)

    • A custom SQL DELETE statement used to clear the target table. If omitted, the table named by target_table_name is truncated.

  5. Optional related_dimensions, decimal, sql_schema, mapping payloads
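The fallback between sql_delete_statement and target_table_name can be sketched as follows. The helper resolve_clear_statement is hypothetical, and both the exact TRUNCATE syntax and the use of sql_schema to qualify the table name are assumptions rather than documented behaviour of clear_sql_table_task.

```python
def resolve_clear_statement(bedrock_params):
    """Pick the SQL used to clear the target table before fan-out.

    A custom sql_delete_statement wins; otherwise fall back to truncating
    the table named by target_table_name (assumed behaviour).
    """
    custom = bedrock_params.get("sql_delete_statement")
    if custom:
        return custom

    table = bedrock_params["target_table_name"]
    schema = bedrock_params.get("sql_schema")  # assumption: used as a qualifier
    qualified = f"{schema}.{table}" if schema else table
    return f"TRUNCATE TABLE {qualified}"


print(resolve_clear_statement({"target_table_name": "fact_sales"}))
print(
    resolve_clear_statement(
        {
            "target_table_name": "fact_sales",
            "sql_delete_statement": "DELETE FROM fact_sales WHERE version = 'Actual'",
        }
    )
)
```

A custom delete statement is useful when only part of the table should be cleared, since truncation removes every row.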

CSV ↔ TM1 Loaders

csv_to_tm1_dynamic_executor_task_group(tm1_connection, bedrock_params, dry_run=False, logging_level='INFO')

Discovers *.csv files below source_directory and processes each file as one Airflow task. Target cube clears and metadata caching mirror the TM1 loaders.

Key bedrock_params

  1. target_cube_name (required, string)

    • The name of the destination cube in TM1.

  2. source_directory (required, string)

    • The full path to the directory containing the source CSV files to be loaded.

  3. target_clear_set_mdx_list (required, list[string])

    • A list of set MDX queries that select the slices of the target cube to clear before writing.

    • Example: ["{[Period].[All Months]}", "{[Version].[Actual,Budget]}"]

  4. Optional use_mixed_datatypes, decimal, delimiter, mapping payloads
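File discovery of this kind can be sketched with pathlib. The helper discover_csv_files is hypothetical, and reading "below source_directory" as a recursive search is an assumption; a non-recursive variant would use glob instead of rglob.

```python
import tempfile
from pathlib import Path


def discover_csv_files(source_directory):
    """Return a sorted list of CSV file paths under source_directory.

    Assumption: subdirectories are searched too (rglob). Sorting keeps the
    file-to-task mapping stable between DAG parses.
    """
    root = Path(source_directory)
    return sorted(str(p) for p in root.rglob("*.csv") if p.is_file())


# Small demo on a throwaway directory tree.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "sub").mkdir()
    (root / "a.csv").write_text("x\n")
    (root / "notes.txt").write_text("ignored\n")
    (root / "sub" / "c.csv").write_text("y\n")
    found = discover_csv_files(root)
    found_names = [Path(p).name for p in found]

print(found_names)
```

Each returned path would then become the input of one Airflow task.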

tm1_to_csv_dynamic_executor_task_group(tm1_connection, bedrock_params, dry_run=False, logging_level='INFO')

Exports TM1 slices to CSV files: param_set_mdx_list defines the slices and target_csv_output_dir is the directory the files are written to.

Key bedrock_params

  1. data_mdx_template (required, string)

    • This is the workload template for the main MDX query that each worker will execute. It must contain placeholders that match the dimension names from param_set_mdx_list, prefixed with a $.

    • Example: SELECT ... FROM [Sales] WHERE ([Period].[$Period], [Version].[$Version])

  2. param_set_mdx_list (required, list[string])

    • This is the slicing definition. It is a list of MDX set queries that define the parameters for parallelization. Each query should return a set of elements from a single dimension.

    • Example: ["{[Period].[All Months]}", "{[Version].[Actual,Budget]}"]

  3. target_csv_output_dir (required, string)

    • The full path to the directory where the exported CSV files are written.

  4. Optional decimal, delimiter, mapping payloads
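Because every task group takes its required settings from bedrock_params, a missing key is easy to overlook. The check below is a minimal sketch, not part of the library: REQUIRED_KEYS simply restates the keys marked as required in the sections above, and missing_keys is a hypothetical helper.

```python
# Required keys per task group, restated from the parameter lists above.
REQUIRED_KEYS = {
    "tm1_dynamic_executor_task_group": {
        "data_mdx_template", "param_set_mdx_list",
        "target_cube_name", "target_clear_set_mdx_list",
    },
    "sql_to_tm1_dynamic_executor_task_group": {
        "sql_query_template", "param_set_mdx_list",
        "target_cube_name", "target_clear_set_mdx_list",
    },
    "tm1_to_sql_dynamic_executor_task_group": {
        "data_mdx_template", "param_set_mdx_list", "target_table_name",
    },
    "csv_to_tm1_dynamic_executor_task_group": {
        "target_cube_name", "source_directory", "target_clear_set_mdx_list",
    },
    "tm1_to_csv_dynamic_executor_task_group": {
        "data_mdx_template", "param_set_mdx_list", "target_csv_output_dir",
    },
}


def missing_keys(task_group, bedrock_params):
    """Return the required keys absent from bedrock_params, sorted by name."""
    return sorted(REQUIRED_KEYS[task_group] - set(bedrock_params))


params = {"target_cube_name": "Sales", "source_directory": "/data/in"}
print(missing_keys("csv_to_tm1_dynamic_executor_task_group", params))
```

Running such a check at DAG-definition time reports a configuration gap before any task is scheduled.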

Multi-cube Helper

copy_cube_data_on_elements(tm1_connection, cube_names, unified_bedrock_params, logging_level='INFO')

Convenience wrapper that expands unified_bedrock_params per cube and spawns individual tm1_dynamic_executor_task_group instances (via TaskGroup.override()) with unique group_id values.
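How the wrapper derives its group_id values is not documented here. As an illustration of the uniqueness requirement only, the sketch below builds one identifier per cube name. The copy prefix, the numeric suffix scheme, and both function names are hypothetical; the character filter reflects Airflow's rule that a group_id may contain only alphanumeric characters, dashes, and underscores.

```python
import re


def group_id_for_cube(cube_name, prefix="copy"):
    """Turn a cube name into a string that is safe to use as a group_id."""
    safe = re.sub(r"[^A-Za-z0-9_-]+", "_", cube_name).strip("_")
    return f"{prefix}_{safe}"


def unique_group_ids(cube_names):
    """Map each cube name to a distinct group_id, suffixing on collisions."""
    ids = {}
    seen = set()
    for name in cube_names:
        base = candidate = group_id_for_cube(name)
        counter = 1
        while candidate in seen:
            counter += 1
            candidate = f"{base}_{counter}"
        seen.add(candidate)
        ids[name] = candidate
    return ids


print(unique_group_ids(["Sales Plan", "Sales-Plan", "Sales_Plan"]))
```

Cube names that differ only in characters Airflow rejects would otherwise collapse to the same identifier, which is why the sketch adds a suffix on collision.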

Example DAG Snippet

from airflow import DAG
from datetime import datetime
from TM1_bedrock_py.airflow_executor.async_executor import (
    sql_to_tm1_dynamic_executor_task_group,
)

with DAG(
    dag_id="sql_to_tm1_sales",
    start_date=datetime(2024, 1, 1),
    schedule="0 2 * * *",
    catchup=False,
) as dag:
    sql_to_tm1_dynamic_executor_task_group(
        tm1_connection="tm1_prod",
        sql_connection="warehouse",
        logging_level="INFO",
        bedrock_params={
            "target_cube_name": "Sales",
            "param_set_mdx_list": [
                "{[Period].[Period].Children}",
                "{[Version].[Version].[Actual], [Version].[Version].[Budget]}",
            ],
            "sql_query_template": "SELECT * FROM fact_sales WHERE period = '$Period'",
            "target_clear_set_mdx_list": ["{[Period].[$Period]}"],
            "mapping_steps": [...],
            "shared_mapping": {...},
        },
    )

Additional Notes

Warning

Tested Databases

For the 1.1.1 release, this functionality has been explicitly tested against MS SQL Server and PostgreSQL. Behavior with other database backends is yet to be verified.