azkaban

>i # Azkaban概论 ## 为什么需要工作流调度系统 1、一个完整的数据分析系统通常都是由大量任务单元组成。 Shell脚本程序,Java 程序,MapReduce 程序、Hive 脚本等 2、各任务单元之间存在时间先后及前后依赖关系 3、为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行 ## 常见的工作流调度系统 1、简单的任务调度:直接使用Linux的Crontab来定义;。 2、复杂的任务调度:开发调度平台或使用现成的开源调度系统,比如 - Ooize:CDH平台下的一个工作流程调度器,可以借助CDH平台下的工作组件HUE,使用就比较友好,因为HUE提供了可视化的界面,脱离HUE后需要编写大量的xml文件,比较繁琐 - Azkaban:简单易用的工作流程调度器。配置工作流程通过yaml文件进行配置,定时规则在页面上进行配置 - Airflow:Python开发的工作流程调度器,配置流程规则需要使用Python脚本 >w Crontab可以实现简单的任务调度,它不能直接解决工作流程的依赖关系的 > 复杂的调度工具主要分两步:描述工作流程、配置定时 > 区别在于描述工作流程或配置定时规则 ## Azkaban与Ooize对比 总体来说,Ooize 相比Azkaban是一个重量级的任务调度系统,功能全面,但配置使用也更复杂,作为老牌的调度器,使用最多。如果可以不在意某些功能的缺失,轻量级调度器Azkaban是很不错的候选对象。 ## 基本架构 ![image.png](https://cos.easydoc.net/52087651/files/l6eqiev1.png) Azkaban Web Server:项目管理、用户管理、权限管理、任务的定时和触发、提供工作界面 Azkaban Executor Server:负责具体任务的执行 MySQL:工作流程的配置、定时规则、任务的执行状态的存储 ## 模式 - 单机模式:适合测试使用,Web Server、Executor Server是在一台服务器上 - 集群模式 :适合生产环境,Web Server、Executor Server独立部署,是两个独立的进程,集群可以部署多个独立的Executor Server能起到一定的负载均衡和容灾的作用 >i # 安装 ## 安装包 >s azkaban-db-3.84.4.tar.gz:azkaban在使用的时候会用到Mysql数据库,所以需要提前建好需要的数据库和表,这里面是所有的建表语句 azkaban-exec-server-3.84.4.tar.gz:Executor Server安装包 azkaban-web-server-3.84.4.tar.gz:Web Server安装包 ## 解压 ```shell tar -xvf azkaban-web-server-3.84.4.tar.gz -C /opt/azkaban tar -xvf azkaban-exec-server-3.84.4.tar.gz -C /opt/azkaban tar -xvf azkaban-db-3.84.4.tar.gz -C /opt/azkaban ``` ## 改名可选 ```shell mv azkaban-web-server-3.84.4 azkaban-web mv azkaban-exec-server-3.84.4 azkaban-exec ``` ## azkaban部署 ### 数据库初始化 > 所需库、表、用户的创建 - **正常安装MySQL** - **启动MySQL** `mysql -uroot -p` - 登录MySQL创建Azkaban数据库 `mysql> create database azkaban` - 创建Azkaban用户并赋予权限 - 设置密码有效4位以上 `mysql> set global validate_password_length=4;` - 设置密码策略最低级别 `mysql> set global validate_password_policy=0;` - 创建Azkaban用户,任何主机都可以访问Azkaban,密码自己定 `mysql> CREATE USER 'azkaban'@'%' IDENTIFIED BY 'liulike';` - 赋予Azkaban用户增删改查权限 `mysql> GRANT SELECT,INSERT,UPDATE,DELETE ON azkaban.* to 'azkaban'@'%' WITH GRANT OPTION;` - 创建Azkaban表,完成后退出MySQL `mysql> use azkaban;` `mysql> source /opt/azkaban/azkaban-db-3.84.4/create-all-sq1-3.84.4.sq1` `mysql> quit;` - 更改MySQL包大小;防止Azkaban连接MySQL阻塞 `vi /etc/my.cnf` 在[mysqld]下面加一行`max_allowed_packet=1024M` [mysqld] `max_allowed_packet=1024M` - 重启MySQL `systemctl restart mysq1de` ### Executor Server的部署与配置 Azkaban Executor Server 处理工作流和作业的实际执行。 - 编辑 azkaban.properties ```shell vi /opt/azkaban-web/conf/azkaban.properties ``` 修改如下标红的属性 <pre> #... <span style="color:red">default.timezone.id=Asia/Shanghai</span> #... <span style="color:red">azkaban.webserver.url=http://hadoop-01:8081</span> <span style="color:red">executor.port=12321</span> #... database.type=mysql mysql.port=3306 <span style="color:red">mysql.host=hadoop-01</span> mysql.database=azkaban mysql.user=azkaban <span style="color:red">mysql.password=liulike</span> mysql.numconnections=100 </pre> - 同步 azkaban-exec 到所有节点 [root@hadoop-01 azkaban]$ acp -r /opt/azkaban-exec - <span style="color:red">必须进入到/opt/azkaban-exec 路径</span>,分别在三台机器上,启动 executor server 注意:如果在/opt/azkaban-exec 目录下出现 executor.port 文件,说明 启动成功 ```shell [root@hadoop-01 azkaban-exec]$ bin/start-exec.sh [root@hadoop-02 azkaban-exec]$ bin/start-exec.sh [root@hadoop-03 azkaban-exec]$ bin/start-exec.sh ``` - 下面激活 executor,需要 ```shell [root@hadoop-01 azkaban-exec]$ curl -G "hadoop-01:12321/executor?action=activate" && echo [root@hadoop-01 azkaban-exec]$ curl -G "hadoop-01:12321/executor?action=activate" && echo [root@hadoop-01 azkaban-exec]$ curl -G "hadoop-01:12321/executor?action=activate" && echo ``` 如果三台机器都出现如下提示,则表示激活成功 ```shell {"status":"success"} ``` ### Web Server的部署与配置 Azkaban Web Server 处理项目管理,身份验证,计划和执行触发。 - 编辑 azkaban.properties ```shell [root@hadoop-01 azkaban]$ vi /opt/azkaban-web/conf/azkaban.properties ``` 修改如下属性 <pre> ... <span style="color:red">default.timezone.id=Asia/Shanghai</span> ... database.type=mysql mysql.port=3306 <span style="color:red">mysql.host=hadoop-01</span> mysql.database=azkaban mysql.user=azkaban <span style="color:red">mysql.password=liulike</span> mysql.numconnections=100 ... <span style="color:red">azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus</span> </pre> >s 说明: #StaticRemainingFlowSize:正在排队的任务数; #CpuStatus:CPU 占用情况 #MinimumFreeMemory:内存占用情况。测试环境,必须将 MinimumFreeMemory 删除或注释掉,否则它会认为集群资源不够,不执行。 - 修改 azkaban-users.xml 文件,添加 root 用户 ```shell [root@hadoop-01 azkaban-web]$ vi /opt/azkaban-web/conf/azkaban-users.xml ``` ```shell <azkaban-users> <user groups="azkaban" password="azkaban" roles="admin" username="azkaban"/> <user password="metrics" roles="metrics" username="metrics"/> <user password="liulike" roles="admin" username="liulike"/> <role name="admin" permissions="ADMIN"/> <role name="metrics" permissions="METRICS"/> </azkaban-users> ``` - 必须进入到 hadoop-01 的/opt/azkaban-web 路径,启动 web server [root@hadoop-01 azkaban-web]$ bin/start-web.sh - 访问 http://hadoop-01:8081,并用 liulike 用户登陆 >i # 案例实操 ## HelloWorld 案例 - 在 windows 环境,新建 azkaban.project 文件,编辑内容如下 `azkaban-flow-version: 2.0` 注意:该文件作用,是采用新的 Flow-API 方式解析 flow 文件。 - 新建 basic.flow 文件,内容如下: ```shell nodes: - name: jobA type: command config: command: echo "Hello World" ``` >s 参数说明 (1)Name:job 名称 (2)Type:job 类型。command 表示你要执行作业的方式为命令 (3)Config:job 配置 >w flow文件写法与yaml文件的格式相同: ● 大小写敏感 ● 使用缩进表示层级关系 ● 不支持Tab键制表符缩进,只使用空格缩进 ● 缩进的空格数目不重要,只要相同层级的元素左侧对齐即可,通常开头缩进两个空格: ● 字符后缩进一个空格,如冒号,逗号,短横杆(-)等 ● 字符串可以不用引号标注 ● “#"表示注释 YAML 支持的数据结构有三种: ● 对象:键值对的集合,又称为映射(mapping)/ 哈希(hashes) / 字典(dictionary) ● 数组:一组按次序排列的值,又称为序列(sequence) / 列表(list) ● 纯量(scalars):单个的、不可再分的值 - 将 azkaban.project、basic.flow 文件压缩到一个 zip 文件,文件名称必须是英文。 - 在 WebServer 新建项目:http://hadoop-01:8081/index ![image.png](https://cos.easydoc.net/52087651/files/l6fwza85.png) - 给项目名称命名和添加项目描述 ![image.png](https://cos.easydoc.net/52087651/files/l6fx0hgi.png) - first.zip 文件上传、选择上传的文件 ![image.png](https://cos.easydoc.net/52087651/files/l6fx1mvp.png) - 执行任务流 ![image.png](https://cos.easydoc.net/52087651/files/l6fxd1up.png) ![image.png](https://cos.easydoc.net/52087651/files/l6fxffhv.png) ![image.png](https://cos.easydoc.net/52087651/files/l6fxgiky.png) - 在日志中,查看运行结果 ## 作业依赖案例 需求:JobA 和 JobB 执行完了,才能执行 JobC 具体步骤: - 修改 basic.flow 为如下内容 ```shell nodes: - name: jobC type: command # jobC 依赖 JobA 和 JobB dependsOn: - jobA - jobB config: command: echo "I’m JobC" - name: jobA type: command config: command: echo "I’m JobA" - name: jobB type: command config: command: echo "I’m JobB" ``` >s 参数说明: dependsOn:作业依赖,后面案例中演示 - 将修改后的 basic.flow 和 azkaban.project 压缩成 second.zip 文件 - 重复 `HelloWorld 案例` 后续步骤。 ![image.png](https://cos.easydoc.net/52087651/files/l6k6ajmf.png) ![image.png](https://cos.easydoc.net/52087651/files/l6k69at2.png) ![image.png](https://cos.easydoc.net/52087651/files/l6k6d17v.png) ## 自动失败重试案例 需求:如果执行任务失败,需要重试 3 次,重试的时间间隔 10000ms 具体步骤: - 编译配置流 ```shell nodes: # 需求:如果执行任务失败,需要重试 3 次,重试的时间间隔 1s - name: JobA type: command config: command: sh /not_exists.sh retries: 3 retry.backoff: 1000 ``` >s 参数说明: retries:重试次数 retry.backoff:重试的时间间隔 - 将修改后的 basic.flow 和 azkaban.project 压缩成 three.zip 文件 - 重复 `HelloWorld 案例` 后续步骤。 - 执行并观察到一次失败+三次重试 ![image.png](https://cos.easydoc.net/52087651/files/l6k6gt90.png) - 也可以点击上图中的 Log,在任务日志中看到,总共执行了 4 次。 ![image.png](https://cos.easydoc.net/52087651/files/l6k6i3nm.png) - 也可以在 Flow 全局配置中添加任务失败重试配置,此时重试配置会应用到所有 Job。 案例如下: ```shell config: # 此时配置会在全局执行 retries: 3 retry.backoff: 10000 nodes: # 需求:如果执行任务失败,需要重试 3 次,重试的时间间隔 1s - name: JobA type: command ``` ## 手动失败重试案例 需求:JobA——>JobB(依赖于 A)——>JobC——>JobD——>JobE——>JobF。生产环境,任何 Job 都有可能挂掉,可以根据需求执行想要执行的 Job。 具体步骤: 1)编译配置流 ```yaml nodes: - name: JobA type: command config: command: echo "This is JobA." - name: JobB type: command dependsOn: - JobA config: command: echo "This is JobB." - name: JobC type: command dependsOn: - JobB config: command: echo "This is JobC." - name: JobD type: command dependsOn: - JobC config: command: echo "This is JobD." - name: JobE type: command dependsOn: - JobD config: command: echo "This is JobE." - name: JobF type: command dependsOn: - JobE config: command: echo "This is JobF. ``` 2)将修改后的 basic.flow 和 azkaban.project 压缩成 five.zip 文件 3)重复 HelloWorld案例 后续步骤。 Enable 和 Disable 下面都分别有如下参数: Parents:该作业的上一个任务 Ancestors:该作业前的所有任务 Children:该作业后的一个任务 Descendents:该作业后的所有任务 Enable All:所有的任务 4)可以根据需求选择性执行对应的任务。 ![image.png](https://cos.easydoc.net/52087651/files/l6k6p0yr.png) ![image.png](https://cos.easydoc.net/52087651/files/l6k6pfu3.png) 关闭先前已执行的 ![image.png](https://cos.easydoc.net/52087651/files/l6k6rrvr.png) ![image.png](https://cos.easydoc.net/52087651/files/l6k6t24i.png) ![image.png](https://cos.easydoc.net/52087651/files/l6k6tayo.png) # Azkaban 进阶 ## JavaProcess 作业类型案例 JavaProcess 类型可以运行一个自定义主类方法,type 类型为 javaprocess,可用的配置为: Xms:最小堆 Xmx:最大堆 classpath:类路径 java.class:要运行的 Java 对象,其中必须包含 Main 方法 main.args:main 方法的参数 案例: 1)新建一个 azkaban 的 maven 工程 2)创建包名:com.root 3)创建 AzTest 类 ```java package com.liulike; public class AzTest { public static void main(String[] args) { System.out.println("This is for testing!"); } } ``` 4)打包成 jar 包 azkaban_java.jar 5)新建 testJava.flow,内容如下 ```yaml nodes: - name: test_java type: javaprocess config: Xms: 96M Xmx: 200M java.class: com.liulike.AzTest ``` 6)将 Jar 包、flow 文件和 project 文件打包成 javatest.zip 7)创建项目——>上传 javatest.zip ——>执行作业——>观察结果 ![image.png](https://cos.easydoc.net/52087651/files/l6k6uu6v.png) ## 条件工作流案例 条件工作流功能允许用户自定义执行条件来决定是否运行某些Job。条件可以由当前Job 的父 Job 输出的运行时参数构成,也可以使用预定义宏。在这些条件下,用户可以在确定 Job 执行逻辑时获得更大的灵活性,例如,只要父 Job 之一成功,就可以运行当前 Job。 ### 运行时参数案例 1)基本原理 (1)父 Job 将参数写入 JOB_OUTPUT_PROP_FILE 环境变量所指向的文件 (2)子 Job 使用 ${jobName:param}来获取父 Job 输出的参数并定义执行条件 2)支持的条件运算符: (1)== 等于 (2)!= 不等于 (3)> 大于 (4)>= 大于等于 (5)< 小于 (6)<= 小于等于 (7)&& 与 (8)|| 或 (9)! 非 3)案例: 需求: JobA 执行一个 shell 脚本。 JobB 执行一个 shell 脚本,但 JobB 不需要每天都执行,而只需要每个周一执行。 (1)新建 JobA.sh ```shell #!/bin/bash echo "do JobA" wk=`date +%w` echo "{\"wk\":$wk}" > $JOB_OUTPUT_PROP_FILE ``` (2)新建 JobB.sh ```shell #!/bin/bash echo "do JobB" ``` (3)新建 condition.flow ```yaml nodes: - name: JobA type: command config: command: sh JobA.sh - name: JobB type: command dependsOn: - JobA config: command: sh JobB.sh condition: ${JobA:wk} == 1 ``` (4)将 JobA.sh、JobB.sh、condition.flow 和 azkaban.project 打包成 condition.zip (5)创建 condition 项目——>上传 condition.zip 文件——>执行作业——>观察结果 (6)按照我们设定的条件,JobB 会根据当日日期决定是否执行。 ![image.png](https://cos.easydoc.net/52087651/files/l6k6ylhk.png) 如果正好是星期一: ![image.png](https://cos.easydoc.net/52087651/files/l6k72rjs.png) ![image.png](https://cos.easydoc.net/52087651/files/l6k726va.png) 如果不是星期一: ![image.png](https://cos.easydoc.net/52087651/files/l6k70u1b.png) ![image.png](https://cos.easydoc.net/52087651/files/l6k713x8.png) ### 预定义宏案例 Azkaban 中预置了几个特殊的判断条件,称为预定义宏。 预定义宏会根据所有父 Job 的完成情况进行判断,再决定是否执行。可用的预定义宏如下: (1)all_success: 表示父 Job 全部成功才执行(默认) (2)all_done:表示父 Job 全部完成才执行 (3)all_failed:表示父 Job 全部失败才执行 (4)one_success:表示父 Job 至少一个成功才执行 (5)one_failed:表示父 Job 至少一个失败才执行 1)案例 需求: JobA 执行一个 shell 脚本 JobB 执行一个 shell 脚本 JobC 执行一个 shell 脚本,要求 JobA、JobB 中有一个成功即可执行 (1)新建 JobA.sh ```shell #!/bin/bash echo "do JobA" ``` (2)新建 JobC.sh ```shell #!/bin/bash echo "do JobC" ``` (3)新建 macro.flow ```yaml nodes: - name: JobA type: command config: command: sh JobA.sh - name: JobB type: command config: command: sh JobB.sh - name: JobC type: command dependsOn: - JobA - JobB config: command: sh JobC.sh condition: one_success ``` (4)JobA.sh、JobC.sh、macro.flow、azkaban.project 文件,打包成 macro.zip。 >d 注意:没有 JobB.sh (5)创建 macro 项目——>上传 macro.zip 文件——>执行作业——>观察结果 ![image.png](https://cos.easydoc.net/52087651/files/l6k740n3.png) ![image.png](https://cos.easydoc.net/52087651/files/l6k74eee.png) ![image.png](https://cos.easydoc.net/52087651/files/l6k74ut6.png) ## 定时执行案例 需求:JobA 每间隔 1 分钟执行一次; 具体步骤: 1)Azkaban 可以定时执行工作流。在执行工作流时候,选择左下角 Schedule ![image.png](https://cos.easydoc.net/52087651/files/l6k77v3v.png) 2)右上角注意时区是上海,然后在左面填写具体执行事件,填写的方法和 crontab 配置定时任务规则一致。 ![image.png](https://cos.easydoc.net/52087651/files/l6k77ga9.png) 3)观察结果 ![image.png](https://cos.easydoc.net/52087651/files/l6k78i9u.png) ![image.png](https://cos.easydoc.net/52087651/files/l6k7cjcy.png) 4)删除定时调度 点击 remove Schedule 即可删除当前任务的调度规则。 ![image.png](https://cos.easydoc.net/52087651/files/l6k7cywl.png) ## 邮件报警案例 ### 注册邮箱 1)申请注册一个 126 邮箱 ![image.png](https://cos.easydoc.net/52087651/files/l6k89dz8.png) 2)点击设置——>POP3/SMTP/IMAP ![image.png](https://cos.easydoc.net/52087651/files/l6k8alg7.png) 3)开启 SMTP 服务 ![image.png](https://cos.easydoc.net/52087651/files/l6k41vtt.png) 4)一定要记住授权码 ![image.png](https://cos.easydoc.net/52087651/files/l6k40mr6.png) ### 默认邮件报警案例 Azkaban 默认支持通过邮件对失败的任务进行报警,配置方法如下: 1 ) 在azkaban-web节点hadoop-01上,编辑 /opt/azkaban-web/conf/azkaban.properties,修改如下内容 ```shell [root@hadoop-01 azkaban-web]$ vi /opt/azkaban-web/conf/azkaban.properties ``` 添加如下内容: #这里设置邮件发送服务器,需要 申请邮箱,切开通 stmp 服务,以下只是例子 ```shell mail.sender=root@126.com mail.host=smtp.126.com mail.user=root@126.com mail.password=用邮箱的授权码 ``` 2)保存并重启 web-server ```shell [root@hadoop-01 azkaban-web]$ bin/shutdown-web.sh [root@hadoop-01 azkaban-web]$ bin/start-web.sh ``` 3)编辑 basic.flow ```yaml nodes: - name: jobA type: command config: command: echo "This is an email test." ``` 4)将 azkaban.project 和 basic.flow 压缩成 email.zip 5)创建工程——>上传文件——>执行作业——>查看结果 ![image.png](https://cos.easydoc.net/52087651/files/l6k7jw56.png) ## 电话报警案例 ### 第三方告警平台集成 有时任务执行失败后邮件报警接收不及时,因此可能需要其他报警方式,比如电话报警。 如有类似需求,可与第三方告警平台进行集成,例如睿象云。 1)进入睿象云官网注册账号并登录 官网地址:[https://www.aiops.com/](https://www.aiops.com/) ![image.png](https://cos.easydoc.net/52087651/files/l6k8v5w6.png) 2)集成告警平台,使用 Email 集成 ![image.png](https://cos.easydoc.net/52087651/files/l6k8y9z3.png) 3)获取邮箱地址,后边需将报警信息发送至该邮箱 ![image.png](38) 4)配置分派策略 ![image.png](https://cos.easydoc.net/52087651/files/l6k94pjz.png) 5)配置通知策略 ![image.png](https://cos.easydoc.net/52087651/files/l6k97mtv.png) ### 测试 执行上一个邮件通知的案例,将通知对象改为刚刚集成第三方平台时获取的邮箱。 ## Azkaban 多 Executor 模式注意事项 Azkaban 多 Executor 模式是指,在集群中多个节点部署 Executor。在这种模式下, Azkaban web Server 会根据策略,选取其中一个 Executor 去执行任务。 为确保所选的 Executor 能够准确的执行任务,我们须在以下两种方案任选其一,推荐使 用方案二。 方案一:指定特定的 Executor(hadoop-01)去执行任务。 1)在 MySQL 中 azkaban 数据库 executors 表中,查询 hadoop-01 上的 Executor 的 id。 2)在执行工作流程时加入 useExecutor 属性,如下 方案二:在 Executor 所在所有节点部署任务所需脚本和应用。 # 参考资料 ## Azkaban 完整配置 见官网文档:https://azkaban.readthedocs.io/en/latest/configuration.html ## 可能遇见的错误 ```shell 08-08-2022 10:38:29 CST jobA INFO - Cannot request memory (Xms 0 kb, Xmx 0 kb) from system for job jobA, sleep for 60 secs and retry, attempt 1 of 720 ``` 解决: 在azkaban的根目录下的plugins/jobtypes/目录下的commonprivate.properties文件内末尾加上`memCheck.enabled=false`重启即可