热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

工作流管理平台Airflow

Airflow1.引言Airflow是Airbnb开源的一个用Python写就的工作流管理平台(workflowmanagementplatform)。在前一篇文章中,介绍了如何用

Airflow

1. 引言

Airflow是Airbnb开源的一个用Python写就的工作流管理平台(workflow management platform)。在前一篇文章中,介绍了如何用Crontab管理数据流,但是缺点也是显而易见。针对于Crontab的缺点,灵活可扩展的Airflow具有以下特点:

  • 工作流依赖关系的可视化;
  • 日志追踪;
  • (Python脚本)易于扩展

对比Java系的Oozie,Airflow奉行“Configuration as code”哲学,对于描述工作流、判断触发条件等全部采用Python,使得你编写工作流就像在写脚本一样;能debug工作流(test backfill命令),更好地判别是否有错误;能更快捷地在线上做功能扩展。Airflow充分利用Python的灵巧轻便,相比之下Oozie则显得笨重厚拙太多(其实我没在黑Java~~)。《What makes Airflow great?》介绍了更多关于Airflow的优良特性;其他有关于安装、介绍的文档在这里、还有这里。

下表给出Airflow(基于1.7版本)与Oozie(基于4.0版本)对比情况:

功能AirflowOozie
工作流描述 Python xml
数据触发 Sensor datasets, input-events
工作流节点 operator action
完整工作流 DAG workflow
定期调度 DAG schedule_interval coordinator frequency
     
任务依赖 >><<
内置函数、变量 template macros EL function, EL constants

之前我曾提及Oozie没有能力表达复杂的DAG,是因为Oozie只能指定下流依赖(downstream)而不能指定上流依赖(upstream)。与之相比,Airflow就能表示复杂的DAG。Airflow没有像Oozie一样区分workflow与coordinator,而是把触发条件、工作流节点都看作一个operator,operator组成一个DAG。

2. 实战

下面将给出如何用Airflow完成data pipeline任务。

首先简要地介绍下背景:定时(每周)检查Hive表的partition的任务是否有生成,若有则触发Hive任务写Elasticsearch;然后等Hive任务完后,执行Python脚本查询Elasticsearch发送报表。但是,Airflow对Python3支持有问题(依赖包为Python2编写);因此不得不自己写HivePartitionSensor

# -*- coding: utf-8 -*-
# @Time    : 2016/11/29
# @Author  : rain
from airflow.operators import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from impala.dbapi import connect
import logging


class HivePartitionSensor(BaseSensorOperator):
    """
    Waits for a partition to show up in Hive.

    :param host, port: the host and port of hiveserver2
    :param table: The name of the table to wait for, supports the dot notation (my_database.my_table)
    :type table: string
    :param partition: The partition clause to wait for. This is passed as
        is to the metastore Thrift client,and apparently supports SQL like
        notation as in ``ds=‘2016-12-01‘``.
    :type partition: string
    """
    template_fields = (‘table‘, ‘partition‘,)
    ui_color = ‘#2b2d42‘

    @apply_defaults
    def __init__(
            self,
            conn_host, conn_port,
            table, partition="ds=‘{{ ds }}‘",
            poke_interval=60 * 3,
            *args, **kwargs):
        super(HivePartitionSensor, self).__init__(
            poke_interval=poke_interval, *args, **kwargs)
        if not partition:
            partition = "ds=‘{{ ds }}‘"
        self.table = table
        self.partition = partition
        self.conn_host = conn_host
        self.conn_port = conn_port
        self.conn = connect(host=self.conn_host, port=self.conn_port, auth_mechanism=‘PLAIN‘)

    def poke(self, context):
        logging.info(
            ‘Poking for table {self.table}, ‘
            ‘partition {self.partition}‘.format(**locals()))
        cursor = self.conn.cursor()
        cursor.execute("show partitions {}".format(self.table))
        partitions = cursor.fetchall()
        partitions = [i[0] for i in partitions]
        if self.partition in partitions:
            return True
        else:
            return False

Python3连接Hive server2的采用的是impyla模块,HivePartitionSensor用于判断Hive表的partition是否存在。写自定义的operator,有点像写Hive、Pig的UDF;写好的operator需要放在目录~/airflow/dags,以便于DAG调用。那么,完整的工作流DAG如下:

# tag cover analysis, based on Airflow v1.7.1.3
from airflow.operators import BashOperator
from operatorUD.HivePartitionSensor import HivePartitionSensor
from airflow.models import DAG

from datetime import datetime, timedelta
from impala.dbapi import connect

conn = connect(host=‘192.168.72.18‘, port=10000, auth_mechanism=‘PLAIN‘)


def latest_hive_partition(table):
    cursor = conn.cursor()
    cursor.execute("show partitions {}".format(table))
    partitions = cursor.fetchall()
    partitions = [i[0] for i in partitions]
    return partitions[-1].split("=")[1]


log_partition_value = """{{ macros.ds_add(ds, -2)}}"""
tag_partition_value = latest_hive_partition(‘tag.dmp‘)

args = {
    ‘owner‘: ‘jyzheng‘,
    ‘depends_on_past‘: False,
    ‘start_date‘: datetime.strptime(‘2016-12-06‘, ‘%Y-%m-%d‘)
}

# execute every Tuesday
dag = DAG(
    dag_id=‘tag_cover‘, default_args=args,
    schedule_interval=‘@weekly‘,
    dagrun_timeout=timedelta(minutes=10))

ad_sensor = HivePartitionSensor(
    task_id=‘ad_sensor‘,
    conn_host=‘192.168.72.18‘,
    conn_port=10000,
    table=‘ad.ad_log‘,
    partition="day_time={}".format(log_partition_value),
    dag=dag
)

ad_hive_task = BashOperator(
    task_id=‘ad_hive_task‘,
    bash_command=‘hive -f /path/to/cron/cover/ad_tag.hql --hivevar LOG_PARTITION={} ‘
                 ‘--hivevar TAG_PARTITION={}‘.format(log_partition_value, tag_partition_value),
    dag=dag
)

ad2_hive_task = BashOperator(
    task_id=‘ad2_hive_task‘,
    bash_command=‘hive -f /path/to/cron/cover/ad2_tag.hql --hivevar LOG_PARTITION={} ‘
                 ‘--hivevar TAG_PARTITION={}‘.format(log_partition_value, tag_partition_value),
    dag=dag
)

report_task = BashOperator(
    task_id=‘report_task‘,
    bash_command=‘sleep 5m; python3 /path/to/cron/report/tag_cover.py {}‘.format(log_partition_value),
    dag=dag
)

ad_sensor >> ad_hive_task >> report_task
ad_sensor >> ad2_hive_task >> report_task



工作流管理平台Airflow


推荐阅读
  • 在1995年,Simon Plouffe 发现了一种特殊的求和方法来表示某些常数。两年后,Bailey 和 Borwein 在他们的论文中发表了这一发现,这种方法被命名为 Bailey-Borwein-Plouffe (BBP) 公式。该问题要求计算圆周率 π 的第 n 个十六进制数字。 ... [详细]
  • 本文介绍了SIP(Session Initiation Protocol,会话发起协议)的基本概念、功能、消息格式及其实现机制。SIP是一种在IP网络上用于建立、管理和终止多媒体通信会话的应用层协议。 ... [详细]
  • 我的读书清单(持续更新)201705311.《一千零一夜》2006(四五年级)2.《中华上下五千年》2008(初一)3.《鲁滨孙漂流记》2008(初二)4.《钢铁是怎样炼成的》20 ... [详细]
  • 如何在PHP中安装Xdebug扩展
    本文介绍了如何从PECL下载并编译安装Xdebug扩展,以及如何配置PHP和PHPStorm以启用调试功能。 ... [详细]
  • 本文探讨了在一个物理隔离的环境中构建数据交换平台所面临的挑战,包括但不限于数据加密、传输监控及确保文件交换的安全性和可靠性。同时,作者结合自身项目经验,分享了项目规划、实施过程中的关键决策及其背后的思考。 ... [详细]
  • importjava.io.*;importjava.util.*;publicclass五子棋游戏{staticintm1;staticintn1;staticfinalintS ... [详细]
  • 本文探讨了程序员这一职业的本质,认为他们是专注于问题解决的专业人士。文章深入分析了他们的日常工作状态、个人品质以及面对挑战时的态度,强调了编程不仅是一项技术活动,更是个人成长和精神修炼的过程。 ... [详细]
  • 二维码的实现与应用
    本文介绍了二维码的基本概念、分类及其优缺点,并详细描述了如何使用Java编程语言结合第三方库(如ZXing和qrcode.jar)来实现二维码的生成与解析。 ... [详细]
  • 在日常生活中,支付宝已成为不可或缺的支付工具之一。本文将详细介绍如何通过支付宝实现免费提现,帮助用户更好地管理个人财务,避免不必要的手续费支出。 ... [详细]
  • 本文介绍了如何通过C#语言调用动态链接库(DLL)中的函数来实现IC卡的基本操作,包括初始化设备、设置密码模式、获取设备状态等,并详细展示了将TextBox中的数据写入IC卡的具体实现方法。 ... [详细]
  • 本文详细介绍了C++中的构造函数,包括其定义、特点以及如何通过构造函数进行对象的初始化。此外,还探讨了转换构造函数的概念及其在不同情境下的应用,以及如何避免不必要的隐式类型转换。 ... [详细]
  • 数据类型--char一、char1.1char占用2个字节char取值范围:【0~65535】char采用unicode编码方式char类型的字面量用单引号括起来char可以存储一 ... [详细]
  • 探索AI智能机器人自动盈利系统的构建
    用户可通过支付198元押金及30元设备维护费租赁AI智能机器人,推荐他人加入可获得相应佣金。随着推荐人数的增加,用户将逐步解锁更高版本,享受更多收益。 ... [详细]
  • 解决Visual Studio Code中PHP Intelephense误报问题
    PHP作为一种高度灵活的编程语言,其代码结构可能导致Intelephense插件在某些情况下报告不必要的错误或警告。自1.3.3版本起,Intelephense引入了多个配置选项,允许用户根据具体的工作环境和编程风格调整这些诊断信息的显示。 ... [详细]
  • 心理学经典:《思考致富》
    《思考致富》是由美国著名成功学大师拿破仑·希尔撰写的一部重要著作,该书基于希尔长达20年的深入研究和访谈,探讨了个人成功的核心要素。书中不仅揭示了成功的关键,还提供了一系列实用的方法和策略。 ... [详细]
author-avatar
会长大的幸福7007
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有