Setting up Airflow 2.0

Hello, welcome back.

In this part of the tutorial, we will discuss how to run Airflow locally.

There are two ways to install Airflow locally:

  1. pip (see the sketch after this list): https://airflow.apache.org/docs/apache-airflow/stable/start/local.html
  2. Docker
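
For reference, the pip route (method 1) looks roughly like the sketch below. It follows the pinned-constraints approach from the linked docs; the Airflow and Python versions shown (2.0.1, 3.8) are examples, not requirements.

pip install "apache-airflow==2.0.1" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.0.1/constraints-3.8.txt"
airflow db init                  # creates the default SQLite metadata database
airflow webserver --port 8080    # start the UI; run "airflow scheduler" in another terminal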

We'll install the latest Airflow version via Docker.

Airflow ships with SQLite as its default database (not recommended for production, since it cannot be parallelized) and the SequentialExecutor (which runs only one task at a time).
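
If you ever want to confirm which executor and metadata database an installation is actually using, the Airflow 2.x CLI can print the effective configuration; a quick check might look like this.

airflow config get-value core executor          # e.g. SequentialExecutor by default
airflow config get-value core sql_alchemy_conn  # e.g. a sqlite:/// URI by default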

For the Docker route, the prerequisites are Docker (https://docs.docker.com/engine/install/) and Docker Compose (https://docs.docker.com/compose/install/).
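
Once both are installed, a quick sanity check from a terminal:

docker --version
docker-compose --version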

Let's Get Started

We'll create a directory structure like this:

.
├── dags/
├── scripts/
│   └── airflow-entrypoint.sh
├── .env
├── docker-compose.yaml
└── Dockerfile

dags is the most important folder; every DAG definition that you place under the dags directory is picked up by the scheduler.

scripts: contains a file called airflow-entrypoint.sh in which we place the commands that we want to execute when the Airflow container starts.

.env is the file that we'll use to supply environment variables.

docker-compose.yaml starts up the multiple containers (the webserver, the scheduler, and the metadata database) based on their dependencies.

Dockerfile is the file where we specify the base image to pull and the libraries to install.
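
One way to create this layout from scratch is sketched below; the folder names match the ones described above, and airflow-logs is the log directory that Docker Compose will mount later.

mkdir -p dags scripts airflow-logs
touch Dockerfile docker-compose.yaml .env scripts/airflow-entrypoint.sh
chmod +x scripts/airflow-entrypoint.sh   # the entrypoint script must be executable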

Dockerfile

FROM apache/airflow
USER root

ARG AIRFLOW_HOME=/opt/airflow
ADD dags /opt/airflow/dags

RUN pip install --upgrade pip

USER airflow
RUN pip install --trusted-host pypi.org --trusted-host files.pythonhosted.org boto3

apache/airflow is our base image, and the default Airflow home folder is /opt/airflow.

Then we add the dags directory from our machine to /opt/airflow/dags inside the image, since we will be running Airflow through Docker.

We can also install whatever Python libraries we need; here we install boto3.
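
If you want to be sure the image builds before wiring it into Docker Compose, you can build and probe it on its own; the tag airflow-local below is just an example name.

docker build -t airflow-local .
docker run --rm --entrypoint airflow airflow-local version   # print the Airflow version baked into the image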

.env

AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgres+psycopg2://airflow:airflow@postgres:5432/airflow
AIRFLOW__CORE__FERNET_KEY=81HqDtbqAywKSOumSha3BhWNOdQ26slT6K0YaZeZyPs=
AIRFLOW_CONN_METADATA_DB=postgres+psycopg2://airflow:airflow@postgres:5432/airflow
AIRFLOW_VAR__METADATA_DB_SCHEMA=airflow
AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC=5
AIRFLOW__CORE__EXECUTOR=LocalExecutor

The executor that we'll use is the LocalExecutor, so that we can see how multiple tasks run in parallel.

The scheduler heartbeat interval is 5 seconds. The Fernet key is used for encryption, and we have defined our metadata connection; to connect to the metadata database, Airflow uses a library called SQLAlchemy.
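
If you would rather generate your own Fernet key than reuse the one above, one common way is a one-liner against the cryptography package (a dependency of Airflow itself); paste the output into AIRFLOW__CORE__FERNET_KEY.

python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"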

scripts/airflow-entrypoint.sh

#!/usr/bin/env bash
airflow db reset -y
airflow db init
airflow db upgrade
airflow users create -r Admin -u admin -e jyotisachdeva57@gmail.com -f jyoti -l sachdeva -p admin
airflow scheduler &
airflow webserver

These are the commands that will be executed when the Airflow container starts.

The database is initialized, and to log in to the webserver, we need to create our first user.

Then, we start the scheduler in the background and finally the webserver.

docker-compose.yaml

version: "2.1"
services:
  postgres:
    image: postgres:12
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5434:5432"
  scheduler:
    build:
      context: .
      dockerfile: Dockerfile
    restart: on-failure
    command: scheduler
    entrypoint: ./scripts/airflow-entrypoint.sh
    depends_on:
      - postgres
    env_file:
      - .env
    ports:
      - "8794:8793"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./airflow-logs:/opt/airflow/logs
      - ./scripts:/opt/airflow/scripts
    healthcheck:
      test: ["CMD-SHELL", "[ -f /opt/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3
  webserver:
    build:
      context: .
      dockerfile: Dockerfile
    hostname: webserver
    restart: always
    depends_on:
      - postgres
    command: webserver
    env_file:
      - .env
    volumes:
      - ./dags:/opt/airflow/dags
      - ./scripts:/opt/airflow/scripts
      - ./airflow-logs:/opt/airflow/logs
    ports:
      - "8088:8080"
    entrypoint: ./scripts/airflow-entrypoint.sh
    healthcheck:
      test: ["CMD-SHELL", "[ -f /opt/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 32

The first service is Postgres, which we are using as our metadata database.

We are mapping port 5432 of the Postgres container to port 5434 on our local machine.
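
That mapping also lets you inspect the metadata database from the host, assuming a psql client is installed locally (the credentials are the ones set in the compose file):

psql -h localhost -p 5434 -U airflow -d airflow   # password: airflow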

The second service is the scheduler.

We build its image from the Dockerfile, and for this service to start, the metadata database should be up and running (hence the depends_on on postgres).

An airflow-logs folder is created on our local machine, where all the logs are stored.

Finally, we have the webserver. It takes all environment variables from the .env file and executes the commands from airflow-entrypoint.sh when the container starts.
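
Once the stack is up, standard Docker Compose commands are enough to verify that both services started and that the entrypoint ran cleanly:

docker-compose ps                  # both scheduler and webserver should show as Up
docker-compose logs -f webserver   # watch db init, user creation, and webserver startup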

Now, what's inside the dags folder is not in the scope of this blog. We'll discuss that later.

I'm just pasting an example, so don't worry about what's in the file. We'll discuss it in detail in the next blog.

import codecs
import logging
from datetime import timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils import dates

logging.basicConfig(format="%(name)s-%(levelname)s-%(asctime)s-%(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def create_dag(dag_id):
    default_args = {
        "owner": "jyoti",
        "description": "DAG to explain airflow concepts",
        "depends_on_past": False,
        "start_date": dates.days_ago(1),
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
        "provide_context": True,
    }
    new_dag = DAG(
        dag_id,
        default_args=default_args,
        schedule_interval=timedelta(minutes=5),
    )

    def task_1(**kwargs):
        logger.info('=====Executing Task 1=============')
        return kwargs['message']

    def task_2(**kwargs):
        logger.info('=====Executing Task 2=============')
        task_instance = kwargs['ti']
        result = task_instance.xcom_pull(key=None, task_ids='Task_1')
        logger.info('Extracted the value from task 1')
        logger.info(result)

    with new_dag:
        task1 = PythonOperator(task_id='Task_1',
                               python_callable=task_1,
                               op_kwargs={'message': 'hello airflow'},
                               provide_context=True)
        task2 = PythonOperator(task_id='Task_2',
                               python_callable=task_2,
                               op_kwargs=None,
                               provide_context=True)
        task2.set_upstream(task1)
        return new_dag


dag_id = "test"
globals()[dag_id] = create_dag(dag_id)
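
As a quick teaser, once the stack from the next step is running you can exercise a single task from this DAG without waiting for the scheduler. The sketch below assumes the file sits in the dags folder; "test" is the dag_id defined above, Task_1 the task_id, and the date is arbitrary.

docker-compose exec webserver airflow tasks test test Task_1 2021-01-01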

Now, let's get the latest Airflow version running.

docker-compose -f docker-compose.yaml up --build


Airflow is up and running!

The Airflow webserver's default port is 8080, and we are mapping the container's port 8080 to port 8088 on our machine.

Go to http://localhost:8088
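
If you prefer the command line, the webserver also exposes a health endpoint on the same port:

curl http://localhost:8088/health   # reports metadatabase and scheduler status as JSON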


Airflow is up. Now we can log in using the user that we created in airflow-entrypoint.sh.

The username is admin, and the password is admin.


I hope you have enjoyed reading the blog!

Thanks 🙂

