Airflow documentation study notes (1): basic Operators

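1. Operators and tasks

In short, an Operator is the abstract class (the template) of a task: a task is what you get when you instantiate an Operator in a DAG.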
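A plain-Python toy model of this class/instance relationship, not Airflow's API: `ToyOperator` and `EchoOperator` are hypothetical classes, and `context` is just a dict here (in Airflow, `ds` is the execution date formatted as YYYY-MM-DD). The class plays the role of the Operator; each instance with its own `task_id` plays the role of a task.

```python
class ToyOperator:
    """Hypothetical stand-in for BaseOperator: a template for tasks."""

    def __init__(self, task_id):
        self.task_id = task_id  # unique id of the task created from this operator

    def execute(self, context):
        # Concrete operators override this; the base class does no work itself.
        raise NotImplementedError


class EchoOperator(ToyOperator):
    """Hypothetical operator that returns a message built from the context."""

    def __init__(self, task_id, message):
        super().__init__(task_id)
        self.message = message

    def execute(self, context):
        return f"{self.task_id}: {self.message} on {context['ds']}"


# One operator class, two tasks (two instances with different task_ids).
hello = EchoOperator(task_id="say_hello", message="hello")
bye = EchoOperator(task_id="say_bye", message="bye")
print(hello.execute({"ds": "2019-01-01"}))  # say_hello: hello on 2019-01-01
```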

2. BaseOperator

The base class from which all functional Operators derive

2.1 Parameters:

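task_id (string): unique identifier of the task
owner (string): owner of the task; using the unix username is recommended
retries (int): number of retries to perform before failing the task
retry_delay (timedelta): delay between retries
start_date (datetime): start date of the task; this is the execution_date of the first task instance
end_date (datetime): if specified, the scheduler will not schedule the task beyond this date
depends_on_past (bool): when set to True, a task instance runs only if the previous scheduled instance of the same task succeeded
wait_for_downstream (bool): when set to True, a task instance also waits for the tasks immediately downstream of the previous instance to succeed; wherever this is set, depends_on_past is forced to True
queue (str): the queue to target when running the task; used by executors that support queues, such as the CeleryExecutor
dag (DAG): the DAG the task is attached to
pool (str): the slot pool this task runs in; pools limit the concurrency of tasks
execution_timeout (datetime.timedelta): maximum time allowed for the execution of this task instance; if it is exceeded, the task fails
sla (datetime.timedelta): time by which the job is expected to succeed
on_failure_callback (callable): function called when a task instance of this task fails
on_retry_callback (callable): like on_failure_callback, but called when a retry occurs
on_success_callback (callable): like on_failure_callback, but called when the task succeeds
trigger_rule (str): the rule by which the task's dependencies are evaluated to decide whether it gets triggered
·options: {all_success | all_failed | all_done | one_success | one_failed | dummy}; the default is all_success
run_as_user (str): unix username to impersonate while running the task
executor_config (dict): additional task-level configuration parameters interpreted by a specific executor
task_concurrency (int): when set, limits the number of concurrent runs of this task across execution_dates
resources (dict): a map of resource parameter names (the argument names of the Resources constructor) to their values
priority_weight (int): priority weight of this task relative to other tasks
weight_rule (str): weighting method used to compute the effective total priority weight of the task (downstream, upstream or absolute)
max_retry_delay (timedelta): maximum delay interval between retries
retry_exponential_backoff (bool): allow progressively longer waits between retries by applying exponential backoff to the retry delay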
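To see how the trigger_rule options differ, here is a simplified model that decides whether a task may run from a snapshot of its upstream states. It is a sketch under assumptions, not Airflow's implementation: states are plain strings, `should_trigger` is a hypothetical name, and it ignores which state Airflow assigns (for example skipped or upstream_failed) when a rule can no longer be satisfied.

```python
# Simplified model of the trigger rules (not Airflow's code).
# Upstream states are plain strings; "running" means the parent has not finished.
FINISHED = {"success", "failed", "upstream_failed", "skipped"}
FAILED = {"failed", "upstream_failed"}


def should_trigger(trigger_rule, upstream_states):
    """Return True if a task with `trigger_rule` may run given its upstream states."""
    states = list(upstream_states)
    if trigger_rule == "all_success":
        return all(s == "success" for s in states)
    if trigger_rule == "all_failed":
        return all(s in FAILED for s in states)
    if trigger_rule == "all_done":
        # Every parent has finished, whatever the outcome.
        return all(s in FINISHED for s in states)
    if trigger_rule == "one_success":
        # Fires as soon as at least one parent succeeded.
        return any(s == "success" for s in states)
    if trigger_rule == "one_failed":
        return any(s in FAILED for s in states)
    if trigger_rule == "dummy":
        # Dependencies are just for show: trigger at will.
        return True
    raise ValueError(f"unknown trigger_rule: {trigger_rule!r}")


print(should_trigger("all_success", ["success", "skipped"]))  # False
```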

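The retry parameters (retries, retry_delay, retry_exponential_backoff, max_retry_delay) interact as follows in this simplified sketch: the delay doubles with each retry when backoff is enabled and is capped by max_retry_delay. `retry_delay_for` is a hypothetical helper; as far as I know Airflow's own calculation also adds a deterministic, hash-based jitter, which is omitted here.

```python
from datetime import timedelta


def retry_delay_for(try_number, retry_delay, retry_exponential_backoff=False,
                    max_retry_delay=None):
    """Delay before retry number `try_number` (1 = first retry).

    Simplified model: with backoff enabled the delay doubles on every retry;
    max_retry_delay, when given, caps the result.
    """
    delay = retry_delay
    if retry_exponential_backoff:
        delay = retry_delay * (2 ** (try_number - 1))
    if max_retry_delay is not None:
        delay = min(delay, max_retry_delay)
    return delay
```

For example, `[int(retry_delay_for(n, timedelta(minutes=5), True, timedelta(minutes=30)).total_seconds() // 60) for n in range(1, 6)]` gives `[5, 10, 20, 30, 30]`: the fourth and fifth retries hit the 30-minute cap.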
2.2 Methods

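set_downstream(task_or_task_list)
Set a task, or a list of tasks, to be directly downstream of the current task
set_upstream(task_or_task_list)
Set a task, or a list of tasks, to be directly upstream of the current task

clear(start_date=None, end_date=None, upstream=False, downstream=False, session=None)
Clear the state of the task instances associated with this task, based on the given parameters
execute(context)
The main method to override when creating an operator
get_direct_relative_ids(upstream=False)
Get the ids of the direct relatives of the current task, upstream or downstream
get_direct_relatives(upstream=False)
Get the direct relatives of the current task, upstream or downstream
get_flat_relative_ids(upstream=False, found_descendants=None)
Get a flat list of the ids of all relatives, upstream or downstream
get_flat_relatives(upstream=False)
Get a flat list of all relatives, upstream or downstream
get_task_instances(session, start_date=None, end_date=None)
Get the task instances of this task for a specific date range
has_dag()
Return True if the operator has been assigned to a DAG
on_kill()
Override this method to clean up subprocesses when a task instance gets killed
run(start_date=None, end_date=None, ignore_first_depends_on_past=False, ignore_ti_state=False, mark_success=False)
Run a set of task instances for a date range
post_execute(context, result=None)
Triggered right after self.execute() is called; it receives the context and the result returned by the operator
pre_execute(context)
Triggered right before self.execute() is called
prepare_template()
Hook triggered after the templated fields have been replaced by their content
render_template(attr, content, context)
Render a template either from a file or directly from a field, and return the rendered result
render_template_from_field(attr, content, context, jinja_env)
Render a template from a field
xcom_pull(context, task_ids=None, dag_id=None, key='return_value', include_prior_dates=None)
Pull XCom values, optionally filtered by task_ids, dag_id and key
xcom_push(context, key, value, execution_date=None)
Push a value to XCom under the given key so that other tasks can pull it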
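The relationship methods above (set_downstream, set_upstream, get_direct_relative_ids, get_flat_relative_ids) can be modelled with a small graph class. `ToyTask` is hypothetical and only keeps task ids and edges; real operators also track the DAG they belong to. The "flat" variant is a breadth-first walk over all transitive relatives in one direction.

```python
from collections import deque


class ToyTask:
    """Hypothetical task node modelling upstream/downstream relationships."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    @staticmethod
    def _as_list(task_or_task_list):
        return task_or_task_list if isinstance(task_or_task_list, list) else [task_or_task_list]

    def set_downstream(self, task_or_task_list):
        for task in self._as_list(task_or_task_list):
            self.downstream.add(task)  # edge self -> task, recorded on both ends
            task.upstream.add(self)

    def set_upstream(self, task_or_task_list):
        for task in self._as_list(task_or_task_list):
            task.set_downstream(self)

    def get_direct_relative_ids(self, upstream=False):
        relatives = self.upstream if upstream else self.downstream
        return {task.task_id for task in relatives}

    def get_flat_relative_ids(self, upstream=False):
        """Ids of all transitive relatives in one direction (breadth-first)."""
        seen, queue = set(), deque([self])
        while queue:
            node = queue.popleft()
            for task in (node.upstream if upstream else node.downstream):
                if task.task_id not in seen:
                    seen.add(task.task_id)
                    queue.append(task)
        return seen
```

With `a.set_downstream([b, c])` and `d.set_upstream(c)`, the direct downstream ids of `a` are `{"b", "c"}` while its flat downstream ids also include `"d"`.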

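xcom_push and xcom_pull exchange small values between tasks. A hypothetical in-memory store illustrates the lookup: values are keyed by (dag_id, task_id, key), and pulling without a key reads `return_value`, the key under which an operator's execute() return value is stored. The real XCom table lives in Airflow's metadata database and is also scoped by execution date, which this sketch ignores; `ToyXComStore` is not an Airflow class.

```python
class ToyXComStore:
    """Hypothetical in-memory XCom table keyed by (dag_id, task_id, key)."""

    def __init__(self):
        self._rows = {}

    def push(self, dag_id, task_id, key, value):
        self._rows[(dag_id, task_id, key)] = value

    def pull(self, dag_id, task_ids, key="return_value"):
        # A single task_id returns one value; a list returns a tuple in the same order.
        if isinstance(task_ids, str):
            return self._rows.get((dag_id, task_ids, key))
        return tuple(self._rows.get((dag_id, t, key)) for t in task_ids)
```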
2.3 Properties

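dag: returns the DAG if one has been set, otherwise raises an error
deps: returns the list of dependencies for the operator (task-specific dependency rules that subclasses can extend)
downstream_list: list of the directly downstream tasks
schedule_interval: the schedule interval; the DAG's schedule interval always takes precedence over the task's own, so that all tasks in a DAG line up
upstream_list: list of the directly upstream tasks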

3. BaseSensorOperator

Based on airflow.models.BaseOperator, airflow.models.SkipMixin

3.1 Parameters:

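soft_fail (bool): set to True to mark the task as SKIPPED instead of failed when it fails
poke_interval (int): time in seconds that the job should wait between each try
timeout (int): time in seconds before the task times out and fails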

3.2 Methods:

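execute(context)
Call poke() every poke_interval seconds until it returns True or the timeout is exceeded
poke(context)
Override this in sensors deriving from this class; return True once the condition is met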
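The interplay of poke, poke_interval, timeout and soft_fail can be summarized with a simplified loop. `run_sensor` and `SensorTimeout` are hypothetical names; `clock` and `sleep` are injectable so the loop can be tested without actually waiting, and returning "skipped" stands in for Airflow marking the task as skipped.

```python
import time


class SensorTimeout(Exception):
    """Hypothetical stand-in for the error raised when a sensor times out."""


def run_sensor(poke, poke_interval, timeout, soft_fail=False,
               clock=time.monotonic, sleep=time.sleep):
    """Call poke() until it returns True (simplified sensor execute loop).

    Returns "success", or "skipped" when the timeout is hit with soft_fail=True.
    """
    started = clock()
    while not poke():
        if clock() - started > timeout:
            if soft_fail:
                return "skipped"
            raise SensorTimeout("sensor timed out")
        sleep(poke_interval)  # wait before the next poke
    return "success"
```

With a fake clock that advances by poke_interval on each sleep, a poke that never succeeds and `timeout=25, poke_interval=10` is tried four times (at t = 0, 10, 20, 30) before the timeout check fires.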

4. Core Operators

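4.1 airflow.sensors.base_sensor_operator.BaseSensorOperator(poke_interval=60, timeout=604800, soft_fail=False, *args, **kwargs)

Based on airflow.models.BaseOperator, airflow.models.SkipMixin

Its parameters and methods are described in section 3; by default it pokes every 60 seconds and times out after 604800 seconds (7 days).

4.2 airflow.operators.bash_operator.BashOperator(bash_command, xcom_push=False, env=None, output_encoding='utf-8', *args, **kwargs)

Based on airflow.models.BaseOperator

4.2.1 Parameters:

bash_command (string): the bash command (or commands) to execute
xcom_push (bool): if True, the last line written to stdout is also pushed to an XCom when the bash command completes
env (dict): if not None, a mapping that defines the environment variables of the new process; it is used instead of inheriting the current process environment

4.2.2 Methods:

execute(context)
Execute the bash command in a temporary directory, which is cleaned up afterwards
on_kill()
Same as in BaseOperator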
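A minimal sketch of the BashOperator behaviour described above, using only the standard library (`run_bash` is a hypothetical helper, not Airflow's code): the command runs with bash in a fresh temporary directory that is deleted afterwards, a non-zero exit code raises, env=None inherits the environment while a dict replaces it, and with xcom_push=True the last stdout line is returned as the value Airflow would push to XCom. It assumes bash is installed.

```python
import shutil
import subprocess
import tempfile


def run_bash(bash_command, xcom_push=False, env=None, output_encoding="utf-8"):
    """Simplified model of BashOperator.execute (not Airflow's code)."""
    bash = shutil.which("bash") or "/bin/bash"
    # The temporary directory (and anything the command wrote there) is removed on exit.
    with tempfile.TemporaryDirectory(prefix="airflowtmp") as tmp_dir:
        proc = subprocess.run(
            [bash, "-c", bash_command],
            cwd=tmp_dir,
            env=env,  # None inherits the current environment; a dict replaces it
            stdout=subprocess.PIPE,
        )
    lines = proc.stdout.decode(output_encoding).splitlines()
    if proc.returncode != 0:
        raise RuntimeError(f"Bash command failed with exit code {proc.returncode}")
    if xcom_push:
        # Only the last line of stdout is kept.
        return lines[-1].rstrip() if lines else ""
    return None
```

For example, `run_bash("echo first; echo last", xcom_push=True)` returns `"last"`.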

4.3 airflow.operators.python_operator.BranchPythonOperator(python_callable, op_args=None, op_kwargs=None, provide_context=False, templates_dict=None, templates_exts=None, *args, **kwargs)

Based on airflow.operators.python_operator.PythonOperator, airflow.models.SkipMixin

4.4 airflow.operators.check_operator.CheckOperator(sql, conn_id=None, *args, **kwargs)

Based on airflow.models.BaseOperator

CheckOperator expects a SQL query that returns a single row. Each value in that first row is evaluated with Python bool casting; if any of the values casts to False, the check fails and errors out.

4.4.1 Notes:

1) Python bool casting treats the following as False:

 False

 0

 Empty string ("")

 Empty list ([])

 Empty dictionary or set ({})

2) This is an abstract class: subclasses must define get_db_hook, the hook that fetches a single record from an external source.

4.4.2 Parameters:

sql (string): the SQL to be executed
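A sketch of the check logic, with sqlite3 standing in for the database hook (an assumption: in Airflow the concrete subclass supplies get_db_hook). `run_check` and `CheckFailed` are hypothetical names. The query's first row must exist and every value in it must be truthy under Python bool casting.

```python
import sqlite3


class CheckFailed(Exception):
    """Hypothetical error type for a failed check."""


def run_check(conn, sql):
    """Simplified CheckOperator logic: every value of the first row must be truthy."""
    record = conn.execute(sql).fetchone()  # sqlite3 stands in for the hook's single-record fetch
    if not record:
        raise CheckFailed("The query returned no row")
    if not all(bool(value) for value in record):
        raise CheckFailed(f"Check failed.\nQuery:\n{sql}\nResults:\n{record!r}")
    return True
```

A query such as `SELECT COUNT(*), MIN(amount) FROM orders` therefore passes only if the table is non-empty and the smallest amount is non-zero.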

4.5 airflow.operators.email_operator.EmailOperator(to, subject, html_content, files=None, cc=None, bcc=None, mime_subtype='mixed', mime_charset='us_ascii', *args, **kwargs)

Based on airflow.models.BaseOperator

Function: sends an email.

4.5.1 Parameters:

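to (list or string (comma or semicolon delimited)): list of email addresses to send the email to
subject (string): subject line of the email
html_content (string): content of the email; HTML markup is allowed
files (list): file names to attach to the email
cc (list or string (comma or semicolon delimited)): list of recipients to add in the CC field
bcc (list or string (comma or semicolon delimited)): list of recipients to add in the BCC field
mime_subtype (string): MIME sub content type
mime_charset (string): character set parameter added to the Content-Type header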
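A stdlib-only sketch of how these parameters map onto a MIME message: `to`, `cc` and `bcc` accept a list or a comma/semicolon-delimited string, the body is an HTML part encoded with mime_charset inside a multipart/&lt;mime_subtype&gt; container, and bcc addresses are only added to the envelope recipients, never to a header. `build_email` and `to_address_list` are hypothetical helpers; attachments (`files`) and the actual SMTP send are omitted.

```python
import re
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


def to_address_list(addresses):
    """Accept a list or a comma/semicolon-delimited string of addresses."""
    if isinstance(addresses, str):
        return [a.strip() for a in re.split(r"[,;]", addresses) if a.strip()]
    return list(addresses)


def build_email(to, subject, html_content, cc=None, bcc=None,
                mime_subtype="mixed", mime_charset="us_ascii"):
    """Return (message, recipients); a simplified sketch, nothing is sent."""
    to = to_address_list(to)
    cc = to_address_list(cc or [])
    bcc = to_address_list(bcc or [])
    msg = MIMEMultipart(mime_subtype)
    msg["Subject"] = subject
    msg["To"] = ", ".join(to)
    if cc:
        msg["CC"] = ", ".join(cc)
    # The HTML body becomes a text/html part; bcc only affects the envelope.
    msg.attach(MIMEText(html_content, "html", mime_charset))
    return msg, to + cc + bcc
```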

4.6 airflow.operators.mysql_operator.MySqlOperator(sql, mysql_conn_id='mysql_default', parameters=None, autocommit=False, database=None, *args, **kwargs)

Based on airflow.models.BaseOperator

Function: executes SQL code in a specific MySQL database

4.6.1 Parameters:

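mysql_conn_id (string): reference to a specific MySQL connection
sql: the SQL code to execute; a str, a list of str (SQL statements), or a reference to a template file ending in '.sql'
database (string): name of the database, overriding the one defined in the connection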
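The sql parameter accepts a single statement or a list of statements. This sketch shows that execution pattern with sqlite3 standing in for MySQL (an assumption; `run_sql` is a hypothetical helper, and the autocommit flag is not modelled): each statement runs in order, `parameters` is passed to every statement when given, and the connection is committed at the end.

```python
import sqlite3


def run_sql(conn, sql, parameters=None):
    """Run one statement or a list of statements, then commit (simplified)."""
    statements = [sql] if isinstance(sql, str) else list(sql)
    for statement in statements:
        if parameters is not None:
            conn.execute(statement, parameters)  # bind parameters only when provided
        else:
            conn.execute(statement)
    conn.commit()
```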

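4.7 airflow.operators.presto_check_operator.PrestoValueCheckOperator(sql, pass_value, tolerance=None, presto_conn_id='presto_default', *args, **kwargs)

Based on: airflow.operators.check_operator.ValueCheckOperator

Function: performs a simple value check using SQL code run against Presto.

4.7.1 Parameters of airflow.operators.python_operator.PythonOperator(python_callable, op_args=None, op_kwargs=None, provide_context=False, templates_dict=None, templates_exts=None, *args, **kwargs), which is based on airflow.models.BaseOperator and is the parent of BranchPythonOperator (4.3) and ShortCircuitOperator (4.9):

python_callable (python callable): a reference to the callable object
op_kwargs (dict): a dictionary of keyword arguments that will get unpacked in the call to the function
op_args (list): a list of positional arguments that will get unpacked when calling the callable
provide_context (bool): if set to True, Airflow passes a set of keyword arguments that can be used in the function. This set of kwargs corresponds exactly to what you can use in your Jinja templates; for this to work, you need to define **kwargs in the function header
templates_dict (dict of str): a dictionary whose values are templates that the Airflow engine renders sometime between __init__ and execute; they are made available in the callable's context after the templates have been applied
templates_exts (list(str)): a list of file extensions to resolve while processing templated fields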
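python_callable, op_args, op_kwargs, provide_context and templates_dict are PythonOperator's parameters. This simplified sketch, with hypothetical names `call_python` and `greet`, shows how they end up in the call: op_args are unpacked positionally, op_kwargs as keywords, and with provide_context=True the context dict (plus templates_dict) is merged into the keyword arguments, which is why the callable needs **kwargs. templates_dict is assumed to be already rendered.

```python
def call_python(python_callable, op_args=None, op_kwargs=None,
                provide_context=False, templates_dict=None, context=None):
    """Simplified model of how PythonOperator invokes python_callable."""
    args = list(op_args or [])
    kwargs = dict(op_kwargs or {})
    if provide_context:
        # Context entries (the names usable in Jinja templates) become keyword
        # arguments, together with templates_dict; op_kwargs win on conflicts.
        merged = dict(context or {})
        merged.update(kwargs)
        merged["templates_dict"] = templates_dict
        kwargs = merged
    # In Airflow the return value is what gets pushed to XCom as "return_value".
    return python_callable(*args, **kwargs)


def greet(name, punctuation="!", **kwargs):
    # **kwargs receives the context when provide_context=True
    return f"Hello {name}{punctuation} ds={kwargs.get('ds')}"
```

For example, `call_python(greet, op_args=["Bob"], provide_context=True, context={"ds": "2019-01-01"})` returns `"Hello Bob! ds=2019-01-01"`.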

4.8 airflow.operators.python_operator.PythonVirtualenvOperator(python_callable, requirements=None, python_version=None, use_dill=False, system_site_packages=True, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, *args, **kwargs)

4.9 airflow.operators.python_operator.ShortCircuitOperator(python_callable, op_args=None, op_kwargs=None, provide_context=False, templates_dict=None, templates_exts=None, *args, **kwargs)

Based on: airflow.operators.python_operator.PythonOperator, airflow.models.SkipMixin

Function: allows a workflow to continue only if a condition is met. Otherwise, the workflow "short-circuits" and the downstream tasks are skipped.
It derives from PythonOperator; the condition is the result of python_callable.
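The skipping behaviour can be modelled as a walk over everything downstream of the short-circuit task. `short_circuit` is a hypothetical helper and `downstream` a plain adjacency dict; my understanding is that Airflow likewise skips all transitive downstream tasks, not only the direct ones, when the callable returns a falsy value.

```python
from collections import deque


def short_circuit(python_callable, task_id, downstream):
    """Return the set of task_ids skipped when `task_id` short-circuits.

    `downstream` is a hypothetical adjacency dict: task_id -> direct downstream ids.
    """
    if python_callable():
        return set()  # truthy condition: the workflow continues, nothing is skipped
    skipped, queue = set(), deque(downstream.get(task_id, []))
    while queue:
        current = queue.popleft()
        if current not in skipped:
            skipped.add(current)
            queue.extend(downstream.get(current, []))
    return skipped
```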

4.10 airflow.sensors.http_sensor.HttpSensor(endpoint, http_conn_id='http_default', method='GET', request_params=None, headers=None, response_check=None, extra_options=None, *args, **kwargs)

Based on: airflow.sensors.base_sensor_operator.BaseSensorOperator

4.10.1 Parameters:

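http_conn_id (string): the connection to run the sensor against
method (string): the HTTP request method to use
endpoint (string): the relative part of the full URL
request_params (a dictionary of string key/value pairs): parameters to be added to the GET URL
headers (a dictionary of string key/value pairs): HTTP headers to be added to the GET request
response_check (a lambda or defined function): a check against the response object of the request; returns True for "pass" and False otherwise
extra_options (a dictionary of options, where the key is a string and the value depends on the option being modified): extra options for the "requests" library, see the "requests" documentation (for example to modify timeout or SSL settings)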
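Since endpoint is only the relative part of the URL, the request URL is built from the connection's base URL plus the endpoint plus request_params. A sketch of that join and of response_check, with hypothetical helpers `build_url` and `http_poke`; the joining rule (insert a "/" when neither side has one) is my assumption about the hook's behaviour, and a real response would be a `requests.Response` rather than the fake object used in the tests.

```python
from urllib.parse import urlencode


def build_url(base_url, endpoint, request_params=None):
    """Join the connection's base URL with the relative endpoint (simplified)."""
    if base_url and not base_url.endswith("/") and endpoint and not endpoint.startswith("/"):
        url = base_url + "/" + endpoint
    else:
        url = (base_url or "") + (endpoint or "")
    if request_params:
        url += "?" + urlencode(request_params)  # GET parameters go in the query string
    return url


def http_poke(response, response_check=None):
    """Succeed when response_check(response) is truthy, or when no check is given."""
    return bool(response_check(response)) if response_check else True
```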

4.11 airflow.sensors.sql_sensor.SqlSensor(conn_id, sql, *args, **kwargs)

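Based on: airflow.sensors.base_sensor_operator.BaseSensorOperator

Function: runs a SQL statement until a criterion is met. It keeps trying while the SQL returns no row, or while the first cell is one of (0, '0', '').

4.11.1 Parameters: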

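conn_id (string): the connection to run the sensor against
sql: the SQL to run; to pass, it must return at least one cell whose value is non-zero and not an empty string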
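The success criterion can be sketched with sqlite3 standing in for the connection (an assumption; `sql_sensor_poke` is a hypothetical name): poke returns False while the query returns no rows or its first cell is 0, '0' or '', and True otherwise.

```python
import sqlite3


def sql_sensor_poke(conn, sql):
    """Simplified SqlSensor check: False while no row, or first cell is 0, '0' or ''."""
    records = conn.execute(sql).fetchall()
    if not records:
        return False
    # str(0) == "0", so 0 and '0' are treated alike.
    return str(records[0][0]) not in ("0", "")
```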

4.12 airflow.sensors.time_sensor.TimeSensor(target_time, *args, **kwargs)

Based on: airflow.sensors.base_sensor_operator.BaseSensorOperator

Function: waits until the specified time of the day

4.12.1 Parameters:

target_time (datetime.time): time of day after which the job succeeds
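A sketch of the poke check, `time_sensor_poke` (hypothetical): it succeeds once the current time of day is strictly past target_time. Passing `now` explicitly keeps it testable; comparing against UTC wall-clock time by default is an assumption about how the sensor reads the clock.

```python
from datetime import datetime, time, timezone


def time_sensor_poke(target_time, now=None):
    """Return True once the time of day of `now` is past target_time."""
    if now is None:
        now = datetime.now(timezone.utc)  # assumption: the sensor compares in UTC
    return now.time() > target_time
```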

Other operators

4.13 airflow.sensors.time_delta_sensor.TimeDeltaSensor(delta, *args, **kwargs)
4.14 airflow.sensors.web_hdfs_sensor.WebHdfsSensor(filepath, webhdfs_conn_id='webhdfs_default', *args, **kwargs)
4.15 airflow.operators.latest_only_operator.LatestOnlyOperator(task_id, owner='Airflow', email=None, email_on_retry=True, email_on_failure=True, retries=0, retry_delay=datetime.timedelta(0, 300), retry_exponential_backoff=False, max_retry_delay=None, start_date=None, end_date=None, schedule_interval=None, depends_on_past=False, wait_for_downstream=False, dag=None, params=None, default_args=None, adhoc=False, priority_weight=1, weight_rule='downstream', queue='default', pool=None, sla=None, execution_timeout=None, on_failure_callback=None, on_success_callback=None, on_retry_callback=None, trigger_rule='all_success', resources=None, run_as_user=None, task_concurrency=None, executor_config=None, inlets=None, outlets=None, *args, **kwargs)
4.16 airflow.operators.mssql_operator.MsSqlOperator(sql, mssql_conn_id='mssql_default', parameters=None, autocommit=False, database=None, *args, **kwargs)
4.17 airflow.operators.mssql_to_hive.MsSqlToHiveTransfer(sql, hive_table, create=True, recreate=False, partition=None, delimiter='\x01', mssql_conn_id='mssql_default', hive_cli_conn_id='hive_cli_default', tblproperties=None, *args, **kwargs)
4.18 airflow.operators.dummy_operator.DummyOperator(*args, **kwargs)
4.19 airflow.operators.mysql_to_hive.MySqlToHiveTransfer(sql, hive_table, create=True, recreate=False, partition=None, delimiter='\x01', mysql_conn_id='mysql_default', hive_cli_conn_id='hive_cli_default', tblproperties=None, *args, **kwargs)
4.20 airflow.operators.oracle_operator.OracleOperator(sql, oracle_conn_id='oracle_default', parameters=None, autocommit=False, *args, **kwargs)
4.21 airflow.operators.pig_operator.PigOperator(pig, pig_cli_conn_id='pig_cli_default', pigparams_jinja_translate=False, *args, **kwargs)
4.22 airflow.operators.postgres_operator.PostgresOperator(sql, postgres_conn_id='postgres_default', autocommit=False, parameters=None, database=None, *args, **kwargs)
4.23 airflow.operators.presto_check_operator.PrestoCheckOperator(sql, presto_conn_id='presto_default', *args, **kwargs)
4.24 airflow.operators.presto_check_operator.PrestoIntervalCheckOperator(table, metrics_thresholds, date_filter_column='ds', days_back=-7, presto_conn_id='presto_default', *args, **kwargs)
4.25 airflow.operators.presto_to_mysql.PrestoToMySqlTransfer(sql, mysql_table, presto_conn_id='presto_default', mysql_conn_id='mysql_default', mysql_preoperator=None, *args, **kwargs)
4.26 airflow.operators.druid_check_operator.DruidCheckOperator(sql, druid_broker_conn_id='druid_broker_default', *args, **kwargs)
4.27 airflow.operators.generic_transfer.GenericTransfer(sql, destination_table, source_conn_id, destination_conn_id, preoperator=None, *args, **kwargs)
4.28 airflow.operators.hive_to_druid.HiveToDruidTransfer(sql, druid_datasource, ts_dim, metric_spec=None, hive_cli_conn_id='hive_cli_default', druid_ingest_conn_id='druid_ingest_default', metastore_conn_id='metastore_default', hadoop_dependency_coordinates=None, intervals=None, num_shards=-1, target_partition_size=-1, query_granularity='NONE', segment_granularity='DAY', hive_tblproperties=None, *args, **kwargs)
4.29 airflow.operators.s3_file_transform_operator.S3FileTransformOperator(source_s3_key, dest_s3_key, transform_script=None, select_expression=None, source_aws_conn_id='aws_default', dest_aws_conn_id='aws_default', replace=False, *args, **kwargs)
4.30 airflow.operators.s3_to_hive_operator.S3ToHiveTransfer(s3_key, field_dict, hive_table, delimiter=', ', create=True, recreate=False, partition=None, headers=False, check_headers=False, wildcard_match=False, aws_conn_id='aws_default', hive_cli_conn_id='hive_cli_default', input_compressed=False, tblproperties=None, select_expression=None, *args, **kwargs)
4.31 airflow.operators.s3_to_redshift_operator.S3ToRedshiftTransfer(schema, table, s3_bucket, s3_key, redshift_conn_id='redshift_default', aws_conn_id='aws_default', copy_options=(), autocommit=False, parameters=None, *args, **kwargs)
4.32 airflow.operators.hive_to_mysql.HiveToMySqlTransfer(sql, mysql_table, hiveserver2_conn_id='hiveserver2_default', mysql_conn_id='mysql_default', mysql_preoperator=None, mysql_postoperator=None, bulk_load=False, *args, **kwargs)
4.33 airflow.operators.hive_to_samba_operator.Hive2SambaOperator(hql, destination_filepath, samba_conn_id='samba_default', hiveserver2_conn_id='hiveserver2_default', *args, **kwargs)
4.34 airflow.sensors.metastore_partition_sensor.MetastorePartitionSensor(table, partition_name, schema='default', mysql_conn_id='metastore_mysql', *args, **kwargs)
4.35 airflow.sensors.named_hive_partition_sensor.NamedHivePartitionSensor(partition_names, metastore_conn_id='metastore_default', poke_interval=180, hook=None, *args, **kwargs)
4.36 airflow.sensors.s3_key_sensor.S3KeySensor(bucket_key, bucket_name=None, wildcard_match=False, aws_conn_id='aws_default', *args, **kwargs)
4.37 airflow.sensors.s3_prefix_sensor.S3PrefixSensor(bucket_name, prefix, delimiter='/', aws_conn_id='aws_default', *args, **kwargs)
4.38 airflow.operators.hive_operator.HiveOperator(hql, hive_cli_conn_id='hive_cli_default', schema='default', hiveconfs=None, hiveconf_jinja_translate=False, script_begin_tag=None, run_as_owner=False, mapred_queue=None, mapred_queue_priority=None, mapred_job_name=None, *args, **kwargs)
4.39 airflow.operators.hive_stats_operator.HiveStatsCollectionOperator(table, partition, extra_exprs=None, col_blacklist=None, assignment_func=None, metastore_conn_id='metastore_default', presto_conn_id='presto_default', mysql_conn_id='airflow_db', *args, **kwargs)
4.40 airflow.operators.check_operator.IntervalCheckOperator(table, metrics_thresholds, date_filter_column='ds', days_back=-7, conn_id=None, *args, **kwargs)
4.41 airflow.operators.jdbc_operator.JdbcOperator(sql, jdbc_conn_id='jdbc_default', autocommit=False, parameters=None, *args, **kwargs)