Apache Airflow on AWS ECS


An overview of what AWS ECS is, how to run Apache Airflow and tasks on it for eased infrastructure maintenance, and what we’ve encountered so that you have an easier time getting up and running.

Our data team recently made the transition in workflow systems from Jenkins to Apache Airflow. Airflow was a completely new system to us, but it’s the current industry standard for the sort of data-centric workflow jobs we were looking to run. While it hasn’t been a perfect fit, we have gotten a lot of benefits from it: jobs are defined in code, we have the history of each job, every change goes through our normal pull request process, and everyone on the team is able to read and write jobs.

Since our team is data focused, we wanted our Airflow setup to be as easy to maintain as possible, especially around infrastructure, so we’d have minimal distractions and high resiliency. This led us to using AWS ECS not only to run Airflow but for our bigger tasks that are already containerized. Not familiar with ECS? Or how to run Airflow or its tasks on it? Don’t worry, we weren’t either.


This is what we’ve learned.

  1. What is ECS?
  2. ECS with Terraform
  3. Setting up Airflow infrastructure on ECS
  4. Setting up Airflow tasks on ECS
  5. Learnings from Airflow and ECS

tl;dr Running Airflow’s infrastructure on ECS is super easy, but the ECS operator needs hecka help getting set up.

What is ECS?

ECS is AWS’s Elastic Container Service, designed to let you run containers without worrying about servers. You do this by creating a cluster for your system, defining task definitions for the tasks you want to run, and possibly grouping your tasks into services. You can also choose whether you want your containers fully managed (ECS on Fargate), kind of managed but kind of not (ECS on EC2), or whether you’d rather use Kubernetes (EKS) instead.

If all of that made sense to you, congratulations: you can skip to the next section! If you need a bit more of a breakdown, here’s what you need to know:

A container is like a snapshot of a tiny computer that you can run anywhere. If, for example, you have some Docker containers to run, you can run them on my Mac, my father’s Mac, my mother’s Windows, my sister’s Linux, a server in a public cloud, a server in a private data center with an undisclosed location for security reasons — anywhere there’s a computer that supports the container’s setup, you can run that container, and you’ll get the same result from it every time.

The main advantage containers give us is that they make it easy to ensure that something running locally on an engineer’s machine, and running perfectly fine in a staging environment QA is testing in, will also run in a production environment where users are making requests, since it’s the same container being run the same way everywhere. No dependency goes missing in the middle, no permission is accidentally changed locally to hide a problem: it’s the same thing running the same way.

You might be saying, that sounds great! I’mma launch containers everywhere! And that is the attitude ECS is meant to capture and bring to fruition, because your other option for running containers is a bit more hands-on: after all, a container needs to execute on a system, and that means hardware. That means a server somewhere that you define and set up with an operating system of its own, and you might have to install the container’s runtime on it (for example, installing Docker), and you have to keep it all up to date, plus you want to secure it, and you have to have a way to get containers started on that server, and you probably want a way to monitor whether a container goes down with a way to restart it, and there are security patches…. This is starting to be a long list, and while it’s been a pretty standard list for a while, we deserve better systems.

We’re a data team, after all, and none of that is really about working with our data.

That’s where ECS comes in. Instead of running servers (EC2) where you do all the above things to get your containers running, you can use ECS with Fargate and worry only about the containers: here’s the CPU and memory each needs, here’s the start command, here’s a health check, give it these environment values, and I want two of these running at all times. Boom: your containers are running, they restart as needed, things are kept up to date for you, and there’s monitoring built in.

There’s a lot more nuance that goes into picking if you want to run ECS on Fargate or EC2 or Kubernetes, but if you’re still reading this, you probably want Fargate: with Fargate, you only need to worry about your containers, and the rest is taken care of for you.


Now that we’re all caught up…

ECS with Terraform

As discussed in a previous post, we’re huge fans of using Terraform for our team’s infrastructure, especially modules to keep systems in line across environments; I’ll let you read that post if you’d like some background knowledge on how we use Terraform modules and have them interact with Consul for sharing values. The short story though is, as with the above, it keeps the setup easy for our data team so we can continue to focus on our data.

For our workflow system, as with our data pipeline, we started by setting up a new Terraform module that contains

  1. input values like configurations for the infrastructure or pass-through values (we’ll discuss those later)
  2. a single output value within Terraform, the workflow security group, so it can be allowed access to other systems like the data warehouse
  3. Consul output values like where to find the workflow system or pass-through values (again, more on those later)
  4. our metadata store for Airflow; we’re using a Postgres RDS instance
  5. a broker store for Celery; we’re using a Redis ElastiCache instance
  6. secrets management, to go back and forth with Parameter Store which integrates nicely with ECS
  7. our ECS cluster to house our workflow system, including its security group and IAM role
  8. our main ECS tasks, for the basic Airflow infrastructure (discussed in the next section)

That might seem like a big list, but remember, this is for a fully functional, production-ready Airflow setup: you can start much simpler with just the cluster and its tasks, and add on as you go.

To start our ECS setup, we first needed a cluster with a capacity provider, i.e., the management style we want:

resource "aws_ecs_cluster" "airflow-cluster" {
  name = "airflow-test"
  capacity_providers = ["FARGATE"]
}

Our cluster also needed a role, which you can define through Terraform or create manually through the AWS console and then connect in Terraform, so it can have permissions to do things like talk to Redshift:

data "aws_iam_role" "airflow-role" {
  name = "test.workflow"
}

(If you didn’t catch it, that’s a data block instead of a resource block, so it’s fetching what already exists and making it usable within Terraform. This is especially helpful if you have existing infrastructure that hasn’t been fully ported over yet but want to set up new infrastructure in Terraform.)

The other big thing our cluster needed, regardless of its tasks, is a security group to control who can talk to it and to grant it permission to talk to others, since we want to tightly control who can access our data:

resource "aws_security_group" "airflow-sg" {
  name = "Airflow"
  description = "Airflow test security group"
  vpc_id = var.vpc_id

  ingress {
    from_port = 0
    to_port = 0
    protocol = "-1"
    
    self = true
    security_groups = var.security_groups_access_workflow
  }

  egress {
    from_port = 0
    to_port = 0
    protocol = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

You can see we take in the VPC and security groups from whoever invokes the module, and then expose this Airflow security group for other systems to allow access to.

Beautiful, now we have an ECS cluster. We can look at it, we can marvel at it, we can say, “Oh yes, I don’t worry about servers anymore, for I have transcended and run my containers directly.” But of course, just because we have a cluster doesn’t mean it does anything: you need to define task definitions for actual work to be done, and possibly services too if you’d like. (Tasks, as we’ll see later, can be run outside services.) However, this does give us the framework to set up the rest of our Airflow infrastructure within.


Setting up Airflow infrastructure on ECS

The two main pieces of Airflow infrastructure we needed were dubbed “the controller” and “the scheduler.” (Later additions to our setup like Celery workers, nicknamed “stalks,” followed the same setup pattern, so I won’t include them here.) Now, you might understand immediately what the scheduler is doing: it’s in charge of running the Airflow scheduler (airflow scheduler). That leaves the controller as a new addition to the Airflow vocabulary.

We use the controller to run the UI (airflow webserver), make sure the database is all set up (airflow initdb), set up our root users (airflow create_user …), and create pools to throttle access to certain resources (airflow pool --import throttling_pools.json). Since it’s in charge of controlling all these pieces, we dubbed it the controller, and when more work is needed, it’s where we add that work.

(Sidenote: as a team, we prefer controller/worker language across our systems, with the controller name coming from Kafka, where the lead broker is dubbed the controller, since leader refers to a different part of the system and is an easily overloaded term. It works well for nearly all systems we’ve applied it to and might work well for yours as well.)

Despite these differences between what the controller and scheduler do, they actually have almost identical setups within ECS and use a lot of the same inputs, so I’ll show the scheduler to start with since it has fewer pieces.


The first thing our scheduler needed was a task definition:

resource "aws_ecs_task_definition" "scheduler-definition" {
  family = "scheduler-test"
  container_definitions = jsonencode(
    [
      {
        "name" = "scheduler",
        "image" = format("%s/%s", var.docker_address, var.controller_container),
        "portMappings" = [{"containerPort" = var.controller_port}],
        "command" = ["sh","start_scheduler.sh"],
        "environment" = [
          { "name" = "ENVIRONMENT", "value" = var.environment },
          { "name" = "LOG_LEVEL", "value" = var.log_level },
          { "name" = "CONSUL_ADDRESS", "value" = var.consul_address },
          { "name" = "DOCKER_ADDRESS", "value" = var.docker_address },

          { "name" = "AIRFLOW__CORE__SQL_ALCHEMY_SCHEMA", "value" = var.database_schema },
          { "name" = "AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION", "value" = "False" }
        ],
        "secrets" = [
          { "name" = "AIRFLOW__CORE__SQL_ALCHEMY_CONN", "valueFrom" = aws_ssm_parameter.metadata_connection.arn },
          { "name" = "AWS_ACCESS_KEY_ID", "valueFrom" = data.aws_ssm_parameter.aws_access_key_id.arn },
          { "name" = "AWS_SECRET_ACCESS_KEY", "valueFrom" = data.aws_ssm_parameter.aws_secret_access_key.arn }
        ],
        "logConfiguration" = {
          "logDriver" = "awslogs",
          "options" = {
            "awslogs-create-group" = "true",
            "awslogs-region" = var.region,
            "awslogs-group" = "/ecs/airflow/test/scheduler",
            "awslogs-stream-prefix" = "ecs"
          }
        },
        "healthCheck" = {
          "command" = var.scheduler_health_command_list,
          "startPeriod" = var.controller_grace_period,
          "interval" = var.controller_healtch_check_interval,
          "retries" = var.controller_healtch_check_retries,
          "timeout" = var.controller_healtch_check_timeout
        }
      }
    ]
  )

  requires_compatibilities = ["FARGATE"]
  network_mode = "awsvpc"

  execution_role_arn = data.aws_iam_role.airflow-role.arn

  cpu = var.scheduler_cpus * 1024.0
  memory = var.scheduler_memory * 1024.0
}

The majority of this is the container definition: mainly what container to run, what environment values and secret values to set up, how to log and perform health checks, and what command to run. Then we link that up with Fargate and the role we created earlier, specify the CPU and memory we want, and we have something that can run the scheduler.

(Note that right now, this is not tied to our cluster: task definitions are cluster agnostic, should you have a task you want to run in multiple clusters.)
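The secrets block above references `aws_ssm_parameter.metadata_connection`; as a sketch of what declaring such a parameter might look like in the module (the path and input variable here are illustrative, not our exact setup):

```hcl
# Hypothetical sketch: a SecureString parameter holding the Airflow
# metadata database connection string. ECS pulls this from Parameter
# Store at task start and injects it as the secret environment value.
resource "aws_ssm_parameter" "metadata_connection" {
  name  = "/airflow/test/metadata_connection"
  type  = "SecureString"
  value = var.metadata_connection_string
}
```

Because the task definition references the parameter by ARN, rotating the secret only requires updating the parameter value, not re-registering the task.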

Now, since we always want a scheduler running, we created a service around this task definition to ensure it’s able to do its job:

resource "aws_ecs_service" "scheduler-service" {
  name = "scheduler"
  cluster = aws_ecs_cluster.airflow-cluster.arn
  launch_type = "FARGATE"
  platform_version = "LATEST"
  task_definition = aws_ecs_task_definition.scheduler-definition.arn
  desired_count = 1

  network_configuration {
    subnets = var.subnets
    security_groups = [aws_security_group.airflow-sg.id]
  }

  enable_ecs_managed_tags = true
  propagate_tags = "TASK_DEFINITION"
}

This service wraps our task definition, pulling it into the cluster, which will always make sure one task based on it is running. This takes care of running our Airflow scheduler, nothing else needed, boom we’re golden.


The controller has an almost identical task definition and service setup, sharing nearly all values. What we added to it, though, was a nice DNS record that can be accessed while on our VPN, plus the option to run multiple web servers behind a load balancer if we wanted:

resource "aws_ecs_service" "controller-service" {
  name = "controller"
  cluster = aws_ecs_cluster.airflow-cluster.arn
  launch_type = "FARGATE"
  platform_version = "LATEST"
  task_definition = aws_ecs_task_definition.controller-definition.arn
  desired_count = 1

  load_balancer {
    target_group_arn = aws_lb_target_group.controller-target.arn
    container_name = local.controller_definition_name
    container_port = var.controller_port
  }
  health_check_grace_period_seconds = var.controller_grace_period

  network_configuration {
    subnets = var.subnets
    security_groups = [aws_security_group.airflow-sg.id]
  }

  enable_ecs_managed_tags = true
  propagate_tags = "TASK_DEFINITION"
}

resource "aws_route53_record" "controller-dns" {
  zone_id = var.dns_zone
  name = var.controller_address
  type = "A"

  alias {
    name = aws_lb.controller-lb.dns_name
    zone_id = aws_lb.controller-lb.zone_id
    evaluate_target_health = false
  }
}

resource "aws_lb" "controller-lb" {
  name = "controller-test"
  subnets = var.subnets
  load_balancer_type = "application"
  internal = true
  security_groups = [aws_security_group.airflow-sg.id]
}

resource "aws_lb_target_group" "controller-target" {
  name = "controller-test"
  port = var.controller_port
  protocol = local.controller_protocol
  vpc_id = var.vpc_id
  target_type = "ip"

  health_check {
    path = var.controller_health_endpoint
    matcher = "200"
    interval = var.controller_grace_period
  }
}

resource "aws_lb_listener" "controller-listener" {
  load_balancer_arn = aws_lb.controller-lb.arn
  port = var.controller_port
  protocol = local.controller_protocol

  default_action {
    target_group_arn = aws_lb_target_group.controller-target.arn
    type = "forward"
  }
}

resource "aws_lb_listener_rule" "controller-listener-rule" {
  listener_arn = aws_lb_listener.controller-listener.arn

  action {
    type = "forward"
    target_group_arn = aws_lb_target_group.controller-target.arn
  }

  condition {
    field = "host-header"
    values = [aws_route53_record.controller-dns.name]
  }
}

(If you’ve never connected a DNS record, load balancer, and auto scaling group in EC2 before, the above might look like a lot of work, but it’s a pretty standard, if verbose, setup.)

And with that, we now have Airflow up and running: the database can be set up and configured as desired, the scheduler will run, the controller will prep the system if needed before starting the web server, and we’re good to roll this out for testing. Of course you might choose to pass in your secrets a different way, or add way more Airflow configurations, but it should be simple no matter what.


You might have noticed that I did sneak in a few extra environment variables in those Airflow task definitions: the environment, the log level, the Consul address, and the Docker address. We found that having those always available helped our jobs to run (for example, we know every job can always check the environment it’s in) and allowed us to build custom utilities, especially around running Airflow tasks on ECS.
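Those four variables are simple to consume from inside a task. Here’s a minimal sketch; the helper function and defaults are hypothetical, but the variable names match the task definitions above:

```python
import os

def runtime_context():
    """Gather the environment values every one of our task definitions
    injects, falling back to safe defaults when running outside ECS.
    (Illustrative helper, not our production code.)"""
    return {
        "environment": os.environ.get("ENVIRONMENT", "test"),
        "log_level": os.environ.get("LOG_LEVEL", "INFO"),
        "consul_address": os.environ.get("CONSUL_ADDRESS", ""),
        "docker_address": os.environ.get("DOCKER_ADDRESS", ""),
    }
```

A job can then branch on `runtime_context()["environment"]` without caring whether it’s running locally, in staging, or in production.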

Setting up Airflow tasks on ECS

Airflow has an ECS operator that seems great to start with: run this little bit of code and you’re done! But… not quite.


Unfortunately, Airflow’s ECS operator assumes you already have your task definitions set up and waiting to be run. If you do, then go ahead and use the operator to run tasks within your Airflow cluster; you’re ready to move on. If, however, you need to define those dynamically with your jobs, like we did, then it’s time for some Python.

Remember how I said before that we had pass-through values in our Terraform module? This is where those come in. Terraform is where we know things like the address of our Docker Registry, or how to connect to our data pipeline and data warehouse. By having Terraform pass those values into Consul, we can write Python to pull them down and make use of them, same as with our data pipeline setup. See: logic to the madness!

We have utility functions in our workflow setup for all of our most common actions: creating a DAG, creating a local (Python) operator, creating a remote (ECS) operator, getting values from Consul, posting to Slack, etc. These grew out of Airflow’s provided functionality either not covering everything we needed or requiring additional setup that we wanted to keep standard and sensible for our team, like the remote operator setup. Here, let me even show you our publicly exposed function’s call signature and documentation for making a remote operator to run something in ECS:

def make_remote_operator(
    dag,
    task_id,
    task_group,
    task_name,
    task_container,
    task_start_command = None,
    task_memory = CpuValues.M.default_memory(),
    task_cpus = CpuValues.M,
    healthcheck_command = None,
    healthcheck_waittime = None,
    healthcheck_interval = None,
    healthcheck_timeout = None,
    healthcheck_retries = None,
    environment_values = None,
    secret_values = None,
    local_values = None,
    jit_values = None,
    throttling_group = None
):
    '''
    Create a remote operator. Currently this is an ECS operator. Read more at https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task_definition_parameters.html .
    
    :param dag: DAG this operator will be part of.
    :param task_id: Id of this task.
    :param task_group: Enum value of WorkGroup this task is a type of, for example WorkGroup.ETL for an ETL task.
    :param task_name: Name of task.
    :param task_container: Container to run for task.
    :param task_start_command: Start command to use, if different from default.
    :param task_memory: How much memory (in MiB) to have; must be valid for the CPU setting. Use the CpuValues enum to verify valid values.
    :param task_cpus: How much CPU to have, as a CpuValues enum.
    :param healthcheck_command: Command to run to perform a health check, if any.
    :param healthcheck_waittime: How long to wait before performing a health check, in seconds. Must be from 0 to 300. Disabled by default.
    :param healthcheck_interval: How long to wait between performing a health check, in seconds. Must be from 5 to 300. Default is 30 seconds.
    :param healthcheck_timeout: How long to wait before failing a health check that didn't respond, in seconds. Must be from 2 to 60. Default is 5 seconds.
    :param healthcheck_retries: How many times to retry the health check before failing. Must be from 1 to 10. Default is 3.
    :param environment_values: List of environment value keys to fetch and populate from Consul.
    :param secret_values: List of secret environment value keys to fetch and populate from Consul.
    :param local_values: Map of key/values to populate.
    :param jit_values: Map of key/values to populate that you cannot know until just before the job runs (just in time).
    :param throttling_group: Enum value of ThrottlingGroup this task is in, if any, for example ThrottlingGroup.WAREHOUSE for the warehouse group.
    :return: ECS operator.
    '''


Long list, isn’t it? This is the actual information needed to create a task definition and run it in ECS; you can see why we wrapped it instead of asking everyone on the team to go their own way and hope for the best.

To help you create a utility that makes sense for your use case, let me walk you through the code as it executes. This actually brought us even deeper into how ECS runs than we needed to go for the controller and scheduler, while also giving the data team a way to easily run their containers remotely without needing engineering help.


The first thing we do to create an ECS operator is fetch all the values in a specific Consul directory, waiting there for a remote task to use. Your values might be different from ours, but what we pop in there is:

  • the ARN for the cluster to run in
  • the execution role to use
  • the launch type (FARGATE)
  • the network configurations
  • the AWS region name

You can see that most of these were also used in setting up the controller and scheduler; that’s why passing them from Terraform to Consul is so important to being able to use them here. If you decide to move from Fargate to EC2, just update Terraform: it’ll tell Consul, and the Python that pulls these values down will automatically update your task definitions.
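As a sketch of that first step, here’s roughly how pulling a Consul directory down can look. The prefix and helper names are hypothetical; Consul’s KV HTTP API really does return a JSON list of entries whose values are base64 encoded:

```python
import base64
import json
from urllib.request import urlopen

def load_remote_settings(consul_address, prefix="workflow/remote-task/"):
    """Fetch every key under `prefix` from Consul's KV HTTP API and
    decode it into a flat settings dict. (Hypothetical prefix/helper.)"""
    url = f"http://{consul_address}/v1/kv/{prefix}?recurse=true"
    entries = json.load(urlopen(url))
    return decode_kv_entries(entries, prefix)

def decode_kv_entries(entries, prefix):
    """Consul returns values base64-encoded; strip the directory prefix
    from each key and decode each value to text."""
    settings = {}
    for entry in entries:
        key = entry["Key"][len(prefix):]
        if key:  # skip the directory entry itself, which has no value
            settings[key] = base64.b64decode(entry["Value"]).decode("utf-8")
    return settings
```

The decoded dict then holds things like the cluster ARN and launch type, ready to drop into a task definition.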

Now, with those basics out of the way, we need to generate our task definition, which is what the majority of those parameters are for. Essentially, we want to generate the full JSON task definition that the ECS API uses, including settings like:

  • the CPU and memory; we use an enum for them since ECS can be a bit picky
  • the container and, if applicable, the command to start it with; this is where having the Docker Registry address comes in handy
  • any configurations around health checks for the container while it runs
  • environment and secret values, which we can pull from Consul if they’re present or supply from the job definition directly
  • logging so we can follow what the container is doing

This is the part of the code that requires the deepest understanding of ECS but, once implemented, should help new people find quickly what they need to supply and what it does without having to go to the AWS documentation.
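To make that concrete, here’s a trimmed-down sketch of assembling the container half of a definition in the JSON shape the ECS API expects; the helper name and defaults are illustrative, not our exact production values (compare it to the scheduler’s Terraform definition earlier):

```python
def build_container_definition(
    name, image, command=None, environment=None, secrets=None,
    log_group="/ecs/airflow/test/tasks", region="us-east-1",
):
    """Assemble the container portion of an ECS task definition.
    `environment` maps names to plain values; `secrets` maps names to
    Parameter Store ARNs, matching the ECS containerDefinitions schema."""
    definition = {
        "name": name,
        "image": image,
        "environment": [
            {"name": k, "value": v} for k, v in (environment or {}).items()
        ],
        "secrets": [
            {"name": k, "valueFrom": arn} for k, arn in (secrets or {}).items()
        ],
        "logConfiguration": {
            "logDriver": "awslogs",
            "options": {
                "awslogs-region": region,
                "awslogs-group": log_group,
                "awslogs-stream-prefix": "ecs",
            },
        },
    }
    if command:  # only override the image's default entrypoint if asked
        definition["command"] = command
    return definition
```

A health check block would slot in the same way, built from the `healthcheck_*` parameters of the operator function above.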

Alright, at this point we have a task definition, but we haven’t registered it yet. AWS’s boto3 library provides a way to interact with ECS; what we do is check for a current version of the task and pull it down. If there’s no task definition or, and this is the crucial part, if the task definition we just generated is different from what ECS currently knows about, we register a new version; otherwise we let it be. (We found we needed deepdiff to do this comparison effectively, as there’s no upsert method, so our system… yeah, it registered hundreds of thousands of task definitions for no reason. 🤦🏻‍♀️) So, let’s say you have a data scientist on your team who changes a job so that a container’s start command and a few of the input values are different: this will detect that and push up an updated task definition without the data scientist even having to know what a task definition is.
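A simplified sketch of that register-if-changed logic; the function name is ours for illustration, and a real version would compare with deepdiff and catch boto3’s ClientError rather than a bare Exception:

```python
def register_if_changed(ecs_client, family, container_definitions, **task_kwargs):
    """Register a new task definition revision only when the generated
    definition differs from the latest registered one. `ecs_client` is
    a boto3 ECS client (or anything with the same describe/register
    calls). The equality check here is naive; the ECS API reorders and
    retypes fields, which is why we use deepdiff in practice."""
    try:
        current = ecs_client.describe_task_definition(taskDefinition=family)
        existing = current["taskDefinition"]["containerDefinitions"]
    except Exception:  # no revision registered yet
        existing = None

    if existing == container_definitions:
        return family  # unchanged: reuse the latest revision

    response = ecs_client.register_task_definition(
        family=family,
        containerDefinitions=container_definitions,
        **task_kwargs,
    )
    return response["taskDefinition"]["taskDefinitionArn"]
```

Without that comparison, every DAG parse would happily register another identical revision, which is exactly the mistake we made at first.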


It’s now and only now that we can finally invoke the ECS operator Airflow provides, putting any last-minute overrides like just-in-time values (maybe input from the user triggering the job) into the operator and adding it to any pools as needed (for example, only letting so many systems interact with the warehouse at once). From there, Airflow handles telling ECS to spin up the task and execute it, watching it until it’s done, and reporting back.


That’s all a lot of work, and you might be asking yourself, what’s the point? Well, for our team, the point is that we already have a lot of containers and setup around them: a Docker Compose file, for example, that tells us how to run every task. By running these containers both locally and in the workflow system, we know we’ll get consistent results, and we can extend our utilities to do things like read in the Docker Compose file (it’s just YAML, after all) to help us generate our remote operator (what’s the container name, or startup command, or values needed?) with a bit more ease. For heavy-duty, long-running operations that might require a lot of CPU or memory, this gives us flexibility. (Currently Fargate doesn’t support GPUs, but that’s on the roadmap, and other ECS capacity providers do support them, which might be of particular interest to data teams.)
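For example, here’s a sketch of mapping a parsed Compose service onto our remote operator’s inputs. The dict below is the shape `yaml.safe_load` would give you for a hypothetical service entry, and the mapping itself is illustrative:

```python
def operator_kwargs_from_compose(compose, service_name):
    """Map a docker-compose service entry onto the inputs our remote
    operator needs: container image, start command, and which
    environment keys to resolve. Adapt the mapping to your own layout."""
    service = compose["services"][service_name]
    return {
        "task_container": service["image"],
        "task_start_command": service.get("command"),
        # environment keys get resolved against Consul at DAG-build time
        "environment_values": sorted(service.get("environment", {})),
    }
```

With a helper like this, the Compose file stays the single source of truth for how a container runs, locally and in Airflow.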

Learnings from Airflow and ECS

Picking up ECS was quite the challenge at times, and I’m not sure we would have made it on our own without writeups from the community about how they managed it, plus AWS sending some lovely folks to visit us several months ago and answer our questions. Tasks and services, for example, were very confusing at the start, and we couldn’t get our heads around what we were supposed to be doing with them.

We’d discovered ECS at the start of our journey as something people talked about running their Airflow setups on, but we hadn’t found a lot of detail on how to actually do that, especially for working with ECS operators in Airflow. It’s been our goal from the start not only to get Airflow running and learn how to work with ECS for other systems the data team might have, but also to share what we learned and did so others have an easier time getting their systems set up and running. If people want to know more than I’ve already written, we can post follow-ups; just let us know!

Our Airflow setup actually arrived just in time for Jenkins going down hard. We were able to spin up our production Airflow with less than a day’s notice and start using it. Sure, it still has some quirks the team needs to get used to, as Airflow is a hecka quirky system, but for the moment it’s taken over better than anyone could have anticipated. The rest of the team is quickly getting accustomed to how ECS works, where the logs are, how to monitor it — everything we’d hoped for.


Ideally, in the future we’ll be able to move other systems the team owns from EC2 to ECS, which will ease what the team needs to know to maintain those services while also easing the burden on other teams that help us keep them up and running. After all, when we do our job well, no one even notices we’re here doing it, moving and processing terabytes of data like it’s no big deal, and that’s how we like it.

Also, Airflow contributors? How about building out that ECS operator, pretty please?


Source: GameChanger