Tech Blog

The Benefits of Dynamic Pipeline Generation in Airflow

As data engineers on a team focused on delivering key customer metrics, we on the FLYR Cloud team challenge ourselves to build flexible ways of managing our pipelines.

Google Cloud is our platform of choice, so raw data is loaded into BigQuery before being transformed into our company’s internal unified schema. Our transformation logic is implemented as SQL and executed on top of Google BigQuery. Whenever we generate new data, downstream systems are notified via a PubSub message.

Our pipelines are orchestrated by Apache Airflow, and – in the case of pipelines managed by our team – Directed Acyclic Graphs (DAGs) themselves do not contain any logic except to organize the steps of execution. This means that our meat and potatoes are in SQL transformations.

Balancing Skill Sets

Our team is not homogeneous in terms of skills. In addition to the data engineers, we also have data analysts. The commonality among these team members is strong SQL skills.

Based on previous experience, such a team structure can lead to difficulties in sprint planning and in delivering predictable increments when one skill set is overloaded and another is internally blocked.

The obvious line of prevention is to leverage our common SQL strengths and allow all team members to focus on writing SQL, rather than having to set up pipelines that are similar in structure over and over again.

However, we encountered a problem when tackling Python code writing, which can be very repetitive when adding or extending pipelines. This is even more true for entry-level programmers, especially for ones with no Python background.

An Airflow Directed Acyclic Graph (DAG) Is Actually Python Code

If you study Airflow DAGs beyond a basic tutorial, you will see that Airflow doesn’t care how a DAG is constructed, as long as the result is a valid DAG and an object of the DAG class.

For Airflow,

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG('my_dag', start_date=datetime(2022, 1, 1)) as dag:
    # task_id must be unique within a DAG
    task_0 = PythonOperator(
        task_id='python_0',
        python_callable=lambda: print('hi 0!')
    )
    task_1 = PythonOperator(
        task_id='python_1',
        python_callable=lambda: print('hi 1!')
    )
    task_2 = PythonOperator(
        task_id='python_2',
        python_callable=lambda: print('hi 2!')
    )
    task_3 = PythonOperator(
        task_id='python_3',
        python_callable=lambda: print('hi 3!')
    )
    task_4 = PythonOperator(
        task_id='python_4',
        python_callable=lambda: print('hi 4!')
    )
    task_5 = PythonOperator(
        task_id='python_5',
        python_callable=lambda: print('hi 5!')
    )
    task_0 >> task_1 >> task_2 >> task_3 >> task_4 >> task_5

and

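from datetime import datetime

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator

with DAG('my_dag', start_date=datetime(2022, 1, 1)) as dag:
    tasks = list()
    for i in range(6):
        tasks.append(
            PythonOperator(
                task_id=f'python_{i}',
                # bind i as a default argument so each task prints its own number
                python_callable=lambda i=i: print(f'hi {i}!')
            )
        )
    # chain() links its positional arguments in order, so the list is unpacked
    chain(*tasks)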

are identical from the DAG structure perspective.
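
One detail matters when callables are created in a loop, as in the second listing: Python closures look up the loop variable when they are called, not when they are defined. The sketch below shows the effect without Airflow. The list names `late` and `bound` are illustrative only.

```python
late = []
bound = []
for i in range(3):
    # Late binding: `i` is looked up when the lambda is called,
    # which happens after the loop has finished.
    late.append(lambda: f'hi {i}!')
    # Binding the current value as a default argument freezes it per callable.
    bound.append(lambda i=i: f'hi {i}!')

late_results = [f() for f in late]
bound_results = [f() for f in bound]

print(late_results)   # ['hi 2!', 'hi 2!', 'hi 2!']
print(bound_results)  # ['hi 0!', 'hi 1!', 'hi 2!']
```

Since Airflow calls `python_callable` only when the task runs, long after the DAG file's loop has finished, binding the value at definition time is the safer habit.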

The biggest takeaway so far has been that an Airflow DAG file (*.py) doesn’t have to (and should not) be used as a purely declarative pipeline definition. You can, and should, use all the boon and glory of Python when working with Airflow, keeping in mind the guiding principles outlined in PEP 20. Because of Python’s flexibility, we can make our DAGs dynamic and control their structure or behavior at runtime or through configuration. This approach, which could be considered metaprogramming, has many benefits, such as a smaller code footprint, reduced development time, and greater flexibility to handle new situations without modifying the base code. It also helps reduce risk.

Generating a Pipeline

According to Murphy’s Law, “Anything that can go wrong will go wrong.”

Our main motivation for generating pipelines is to reduce the risk of mistakes, which tend to grow with a team’s size and increase rapidly under tight delivery timelines.

Since SQL is our team’s common language, we decided to define the pipeline structure with the file structure of SQL scripts.

A file structure like the one below

│
├── pipeline
│   ├── query1.sql
│   ├── query2.sql
│   ├── query3.sql
│   ├── query4.sql
│   └── multi_stage
│       ├── 01_first_multistage_query.sql
│       ├── 02_second_multistage_query.sql
│       └── 09_last_multistage_query.sql
│

should be interpreted as

[ query1,
  query2,
  query3,
  query4,
 [01_first_multistage_query >> 02_second_multistage_query >> 09_last_multistage_query]
]

Airflow generated graph

So we expect that each *.sql file is represented as a task, and each directory organizes tasks into sequences (based on alphabetical order).

Now let’s jump into some code. The first thing to do is to represent the file structure as a Python structure.

import os
from typing import List


def get_table_name_from_file_name(fn: str) -> str:
    return fn.split('.')[0]


# exclude files and directories that should not be treated as pipeline steps
def exclusion_rules(metric_source: str) -> bool:
    return metric_source.startswith('__')


def iterate_through_files_to_create_pipelines(path: str) -> List:
    pipelines = list()
    for (dirpath, dirnames, filenames) in os.walk(path):
        for fn in filenames:
            if exclusion_rules(get_table_name_from_file_name(fn)):
                continue
            pipelines.append(QueryPipeline(os.path.join(dirpath, fn)))

        for dn in dirnames:
            if exclusion_rules(dn):
                continue
            pipelines.append(SequencePipeline(os.path.join(dirpath, dn)))
        # only the top level is inspected here; SequencePipeline reads its own directory
        break
    return pipelines

The returned list contains QueryPipeline and SequencePipeline objects, which represent a single query or a multi-stage query sequence, respectively. We’ll explain more about these in the following section.

Now we need to build a DAG from this structure, which mirrors the file structure and is, in effect, the DAG structure.
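
As a quick way to sanity-check the mapping from files to pipelines, here is a standalone sketch that builds a small tree in a temporary directory and lists what a top-level walk picks up. It uses plain strings and lists instead of QueryPipeline and SequencePipeline, and sorts names for a deterministic result. The function name `describe_pipelines` and the file names are assumptions made for illustration.

```python
import os
import tempfile


def describe_pipelines(path):
    """Top-level *.sql names, followed by one sorted list of step names per sub-directory."""
    dirpath, dirnames, filenames = next(os.walk(path))  # top level only
    single = sorted(fn.split('.')[0] for fn in filenames if not fn.startswith('__'))
    sequences = [
        sorted(fn.split('.')[0] for fn in os.listdir(os.path.join(dirpath, dn)))
        for dn in sorted(dirnames)
        if not dn.startswith('__')
    ]
    return single + sequences


with tempfile.TemporaryDirectory() as root:
    # two single-query pipelines and one file excluded by the '__' rule
    for name in ('query1.sql', 'query2.sql', '__ignored.sql'):
        open(os.path.join(root, name), 'w').close()
    # one multi-stage pipeline, created out of order on purpose
    os.mkdir(os.path.join(root, 'multi_stage'))
    for name in ('02_second.sql', '01_first.sql'):
        open(os.path.join(root, 'multi_stage', name), 'w').close()
    structure = describe_pipelines(root)

print(structure)  # ['query1', 'query2', ['01_first', '02_second']]
```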

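from datetime import datetime

from airflow import DAG
from airflow.configuration import conf
from airflow.models.baseoperator import chain
from airflow.utils.task_group import TaskGroup


def generate_dag(dags_folder=conf.get("core", "dags_folder")):
    pipelines = iterate_through_files_to_create_pipelines(path=dags_folder)
    schedule_interval = '22 22 * * *'

    with DAG(
        dag_id="pipeline",
        schedule_interval=schedule_interval,
        default_args={"start_date": datetime(2021, 12, 1)},
        is_paused_upon_creation=True,
    ) as dag:
        groups = list()
        for pipeline in pipelines:
            with TaskGroup(group_id=pipeline.metric_table_name, prefix_group_id=False) as transform_group:
                pipeline.build_pipeline()
                groups.append(transform_group)

    # chain() receives a single list here, so it adds no dependencies between
    # the groups; they stay independent, as in the structure shown earlier
    chain(groups)
    return dag

# Airflow picks up DAG objects it finds at module level
p = generate_dag()

The section above is where the first part of the magic happens: we are building the pipeline definition while the *.py code is executed.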

Let’s take a look at the Pipeline class first. This is the base class for QueryPipeline and SequencePipeline.

It also introduces something new into the pipeline: each pipeline segment (first-level file or directory), after being executed, should publish a notification to the common notification bus (PubSub). For the sake of simplicity, it is represented by DummyOperator and could be considered as a POST-transformation task(s).

import os

from airflow.operators.dummy import DummyOperator


class Pipeline:
    PROJECT_ID = os.getenv('GCP_PROJECT_ID')
    DATASET = os.getenv('DATASET')

    @property
    def metric_table(self):
        return {
            'projectId': self.PROJECT_ID,
            'datasetId': self.DATASET,
            'tableId': self.output_table_name,
        }

    def _delivery_notification(self):
        # More about it later
        return DummyOperator(
            task_id=f"{self.output_table_name}_notification")

A few assumptions and simplifications in QueryPipeline:

Each *.sql file is actually a template with well-known placeholders (BigQuery project and dataset), and its name points to the output table. Thanks to that, generating such a segment of a pipeline is as simple as

class QueryPipeline(Pipeline):

    def __init__(self, sql_query_template_file: str, table_name: str = None):
        self.filename = sql_query_template_file
        self.table_name = table_name or get_table_name_from_file_name(os.path.basename(self.filename))
        # the Pipeline base class and generate_dag refer to the table under these names
        self.output_table_name = self.table_name
        self.metric_table_name = self.table_name
        with open(self.filename, 'r') as sql_query_template:
            self.template = sql_query_template.read()

    @property
    def __query_string(self):
        return self.template.format(project_id=self.PROJECT_ID,
                                    dataset=self.DATASET)

    def build_pipeline(self):
        bq = self.get_transform()
        post_transformation = self._delivery_notification()
        bq >> post_transformation

    def get_transform(self):
        return QueryBigqueryOperator(
            task_id=f"{self.table_name}.sql",
            bigquery_query_string_or_callable=self.__query_string,
            bigquery_project_config_or_callable=self.metric_table
        )

A quick explanation about the code above:

The __query_string property is responsible for filling in all placeholders inside the query. For the sake of simplicity, we are only using two variables.

The build_pipeline method is responsible for building a whole pipeline (a stream in the DAG) that delivers data. It is also worth mentioning that, in our case, delivery always requires at least two steps (tasks): a transformation task and a “post_transformation” task, simplified here to sending the delivery notification over PubSub.

The get_transform method is responsible for building and configuring the transformation task, which is always BigQuery SQL code.
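
To illustrate the placeholder filling that __query_string performs, here is a minimal sketch using `str.format` on a made-up template. The table name, column names, project, and dataset values are assumptions for the example, not taken from our pipelines.

```python
# A hypothetical SQL template using the two placeholders from the article.
SQL_TEMPLATE = """
SELECT booking_id, fare
FROM `{project_id}.{dataset}.bookings`
WHERE fare > 0
"""


def render(template: str, project_id: str, dataset: str) -> str:
    """Fill the placeholders the same way QueryPipeline does."""
    return template.format(project_id=project_id, dataset=dataset)


query = render(SQL_TEMPLATE, project_id='my-project', dataset='metrics')
print(query)

# str.format treats every brace as a placeholder, so literal braces
# in the SQL text have to be doubled.
escaped = render("SELECT '{{}}' AS empty_json FROM `{project_id}.{dataset}.t`", 'p', 'd')
print(escaped)
```

One consequence of this design is that any SQL containing literal curly braces (JSON literals, for instance) needs those braces doubled in the template file.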

SequencePipeline is no more than a series of QueryPipeline transformations.

from typing import List


def get_table_name_from_numbered_file_name(fn: str) -> str:
    # '01_first_multistage_query.sql' -> 'first_multistage_query'
    return fn.split('.')[0].split('_', 1)[1]


class SequencePipeline(Pipeline):
    def __init__(self, dir_path):
        self.steps: List[QueryPipeline] = list()
        self.metric_table_name: str = os.path.split(dir_path)[1]
        # the Pipeline base class refers to the table as output_table_name
        self.output_table_name: str = self.metric_table_name
        for (dirpath, dirnames, filenames) in os.walk(dir_path):
            for fn in sorted(filenames):
                self.steps.append(QueryPipeline(os.path.join(dirpath, fn),
                                                table_name=get_table_name_from_numbered_file_name(fn)))

    def build_pipeline(self) -> None:
        tasks = [step.get_transform() for step in self.steps]
        tasks.append(self._delivery_notification())
        prev = None
        for task in tasks:
            if prev:
                prev >> task
            prev = task

The extra logic in SequencePipeline is because we are building a pipeline containing get_transform tasks from QueryPipelines built from individual, alphabetically-ordered SQL files in a folder, plus one post_transformation task.
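
The ordering and naming rules can be checked in isolation. The sketch below reuses the helper from the listing above on a few hypothetical file names and shows why the numeric prefixes should be zero-padded: `sorted` compares strings, not numbers.

```python
def get_table_name_from_numbered_file_name(fn: str) -> str:
    # drop the extension, then drop everything up to the first underscore
    return fn.split('.')[0].split('_', 1)[1]


# hypothetical file names, listed out of order on purpose
files = ['09_last_step.sql', '01_first_step.sql', '02_second_step.sql']
ordered = sorted(files)
tables = [get_table_name_from_numbered_file_name(fn) for fn in ordered]

print(ordered)  # ['01_first_step.sql', '02_second_step.sql', '09_last_step.sql']
print(tables)   # ['first_step', 'second_step', 'last_step']

# Without zero padding, string ordering puts step 10 before step 9.
unpadded = sorted(['10_b.sql', '9_a.sql'])
print(unpadded)  # ['10_b.sql', '9_a.sql']
```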

Setting Up Sensors

Once we are able to generate a pipeline, we need to embed it into the global data pipeline by setting dependencies. That ensures the pipeline starts only when all the data sources it depends on are fresh.

We don’t want any code or configuration other than SQL to be written in order to add or modify a DAG. That way, we avoid the mistake of missing a required dependency or setting a wrong one. We achieve this by parsing our SQL code in search of references to tables in specific projects and datasets (this is enough for our purpose).

The simplified naming convention is that table {customer_project_id}.{dataset}.{table_name} is delivered by DAG {customer_code}.{dataset}.{table_name}, and both customer_code and customer_project_id are part of the deployment configuration. Any reference to table project.dataset.table in SQL code should result in a sensor that waits for the same-day execution of the customer.dataset.table DAG.

The automation is implemented by a simple SQL query template parser in the QueryPipeline class

import re


class QueryPipeline(Pipeline):
    # for the sake of simplicity, we assume that a single source dataset is used
    cdm_dag_rgx = re.compile(r'`{project_id}\.({dataset})\.(.*?)`', flags=re.MULTILINE)
    ...
    ...
    ...

    def get_metric_dependencies(self):
        # each match is a (dataset placeholder, table name) pair
        tables = [t[1] for t in self.cdm_dag_rgx.findall(self.template)]
        # self.customer is the customer code from the deployment configuration
        dependencies = [(self.customer, 'dataset', table) for table in tables]
        return dependencies

and in SequencePipeline

class SequencePipeline(Pipeline):
    ...
    ...
    ...
    
    def get_metric_dependencies(self) -> list:
        return list(set(sum([step.get_metric_dependencies() for step in self.steps], [])))

The get_metric_dependencies method allows us to identify every source table a pipeline depends on and, by the rule of thumb described above, the DAGs that deliver those tables.
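
To make the parsing step concrete, here is a standalone sketch that extracts table references from a made-up template. The pattern follows the same idea as the one in QueryPipeline but escapes the braces explicitly. The function `find_dependencies`, the customer code `acme`, and the dataset name `cdm` are assumptions for illustration.

```python
import re

# Matches `{project_id}.{dataset}.<table>` inside backticks and captures <table>.
TABLE_REF = re.compile(r'`\{project_id\}\.\{dataset\}\.(.*?)`')


def find_dependencies(template: str, customer: str, dataset: str):
    """Return unique (customer, dataset, table) tuples in order of first appearance."""
    seen = []
    for table in TABLE_REF.findall(template):
        dep = (customer, dataset, table)
        if dep not in seen:
            seen.append(dep)
    return seen


template = """
SELECT b.booking_id, f.origin
FROM `{project_id}.{dataset}.bookings` AS b
JOIN `{project_id}.{dataset}.flights` AS f USING (flight_id)
LEFT JOIN `{project_id}.{dataset}.bookings` AS prev USING (booking_id)
"""

deps = find_dependencies(template, customer='acme', dataset='cdm')
print(deps)  # [('acme', 'cdm', 'bookings'), ('acme', 'cdm', 'flights')]
```

Note that this only finds references written with the template placeholders; a table referenced with a hard-coded project or dataset would be missed, which is acceptable under the single-source-dataset assumption stated above.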

Now setting sensors is easy!

For each metric and each dependency, create an ExternalTaskSensor and associate it with the corresponding dependent metric task group.

# this fragment runs inside the `with DAG(...)` block of generate_dag
sensors = dict()
for pipeline in pipelines:
    for table in pipeline.get_metric_dependencies():
        table_name = table[2]
        # one sensor per source table, shared by every group that needs it
        if table_name not in sensors:
            sensors[table_name] = FlyrExternalTaskSensor(
                task_id=f"{'_'.join(table)}_sensor",
                external_dag_id=get_cdm_dag_name(table_name),
                external_task_id='send_cdm_table_notification',
                execution_date_fn=get_cdm_execution(table_name),
                **sensor_config)

for pipeline in pipelines:
    with TaskGroup(group_id=pipeline.metric_table_name, prefix_group_id=False) as transform_group:
        pipeline.build_pipeline()

        for table in pipeline.get_metric_dependencies():
            table_name = table[2]
            sensors[table_name] >> transform_group

        groups.append(transform_group)
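
The key point of the wiring above is that a sensor is created once per source table and then shared by every task group that depends on it. The following sketch shows that bookkeeping with plain dictionaries and tuples instead of Airflow objects. The function `plan_sensors` and the pipeline and table names are hypothetical.

```python
def plan_sensors(dependencies_by_pipeline):
    """Return one sensor id per table and the (sensor, pipeline) edges to set."""
    sensors = {}
    edges = []
    for pipeline_name, tables in dependencies_by_pipeline.items():
        for table in tables:
            # create the sensor only the first time a table is seen
            sensors.setdefault(table, f'{table}_sensor')
            edges.append((sensors[table], pipeline_name))
    return sensors, edges


sensors, edges = plan_sensors({
    'query1': ['bookings', 'flights'],
    'multi_stage': ['bookings'],
})

print(sensors)  # {'bookings': 'bookings_sensor', 'flights': 'flights_sensor'}
print(edges)
```

Here `bookings_sensor` appears in two edges but exists only once, which mirrors how a single sensor task fans out to several task groups in the DAG.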

Conclusion

Airflow allows us to leverage Python, so a *.py file can be more than a static, declarative DAG definition. The example above shows that you can build or extend an Airflow pipeline without writing a single line of Python code. That brings us a few steps closer to “Ship Day One” and also allows for more skillset diversity within the team.

However, to quote Isaac Asimov, “it’s a poor atom blaster that won’t point both ways.” In other words, there are bad consequences as well as good ones. With the high flexibility of metaprogramming, it is possible to introduce code that is hard to debug and hard to read, and that behaves contrary to expectation. When this technique is used poorly, it can cause severe issues. A naming convention and a well-structured and defined DAG topology are a couple of convenient and easy ways to help limit problems.
