作者:丁木China | 来源:互联网 | 2024-09-25 12:49
我正在尝试使用环境变量将 DB 参数传递给 BashOperator,但我找不到任何文档/示例如何使用 Jinja 模板中的连接。
所以我正在寻找类似于变量的东西
echo {{ var.value. }}
回答
对于气流 >= 2.2.0:
假设您有 conn id,test_conn
您可以通过以下方式直接使用宏:
{{ conn.test_conn }}
所以你得到任何连接属性,如:
{{ conn.test_conn.host }}
, {{ conn.test_conn.login }}
,{{ conn.test_conn.password }}
等等。
对于气流 <2.2.0:
没有现成的宏,但是您可以创建自定义宏来解决这个问题。
连接示例:
创建宏:
def get_host(conn_id):
cOnnection= BaseHook.get_connection(conn_id)
return connection.host
def get_schema(conn_id):
cOnnection= BaseHook.get_connection(conn_id)
return connection.schema
def get_login(conn_id):
cOnnection= BaseHook.get_connection(conn_id)
return connection.login
在 DAG 中使用它们:
def print_function(**context):
print(f"host={context['host']} schema={context['schema']} login={context['login']}")
user_macros = {
'get_host': get_host,
'get_schema': get_schema,
'get_login': get_login,
}
with DAG(
dag_id='connection',
default_args=default_args,
schedule_interval=None,
user_defined_macros=user_macros,
) as dag:
# Example how to use as function
python_op = PythonOperator(
task_id='python_task',
provide_cOntext=True,
python_callable=print_function,
op_kwargs={
'host': get_host("test_conn"),
'schema': get_schema("test_conn"),
'login': get_login("test_conn"),
}
)
# Example how to use as Jinja string
bash_op = BashOperator(
task_id='bash_task',
bash_command='echo {{ get_host("test_conn") }} {{ get_schema("test_conn") }} {{ get_login("test_conn") }} ',
)
渲染PythonOperator
示例:
渲染BashOperator
示例:
一般说明:此代码的作用是创建一个自定义函数func()
以供使用,user_defined_macros
从而提供使用它的能力,就像 Airflow 本身定义的这个宏一样。您可以通过以下方式访问模板:{{ func() }}
如示例中所示,该函数允许接受参数。
请注意,您可以为连接对象中的所有字段创建此类函数。
请谨慎使用它,将密码作为文本传递可能不是一个好主意。