Author: 嘉兴布奇乐乐园 | Source: Internet | 2023-01-29 09:30
I have set up Airflow with a PostgreSQL database, and I am creating multiple DAGs:
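```python
# Airflow 1.x import paths (Airflow 2 uses airflow.operators.bash / airflow.operators.python)
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# getSiteListforProcessing, getCurrentTimestamp and updateJobStatusLog are
# the asker's own helpers; their definitions are not shown in the question.


def subdag(parent_dag_name, child_dag_name, currentDate, batchId, category,
           subCategory, yearMonth, utilityType, REALTIME_HOME, args):
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@once",
    )

    # get site list to run bs reports
    site_list = getSiteListforProcessing(category, subCategory, utilityType, yearMonth)
    print(site_list)

    def update_status(siteId, **kwargs):
        createdDate = getCurrentTimestamp()
        print('N', siteId, batchId, yearMonth, utilityType, 'N')
        # Record status 'P' for this site in the job status log
        updateJobStatusLog('N', siteId, batchId, yearMonth, utilityType, 'P')

    def error_status(siteId, **kwargs):
        createdDate = getCurrentTimestamp()
        print('N', siteId, batchId, yearMonth, utilityType, 'N')

    # Jinja template rendered by BashOperator; values come from `params`
    BS_template = """
    echo "{{ params.date }}"
    java -cp xx.jar com.xGenerator {{params.siteId}} {{params.utilityType}} {{params.date}}
    """

    # One report task plus one status-update task per site
    for index, siteid in enumerate(site_list):
        t1 = BashOperator(
            task_id='%s-task-%s' % (child_dag_name, index + 1),
            bash_command=BS_template,
            params={'date': currentDate, 'realtime_home': REALTIME_HOME,
                    'siteId': siteid, 'utilityType': utilityType},
            default_args=args,
            dag=dag_subdag)
        t2 = PythonOperator(
            task_id='%s-updatetask-%s' % (child_dag_name, index + 1),
            dag=dag_subdag,
            python_callable=update_status,
            op_kwargs={'siteId': siteid})
        # The status update runs only after the report task succeeds
        t2.set_upstream(t1)

    return dag_subdag
```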
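To check exactly which task IDs and commands the per-site loop produces, here is a minimal sketch that mirrors it with plain dictionaries, so it runs without Airflow installed. `build_task_specs` and `render_command` are hypothetical names introduced only for illustration, and `render_command` is a plain `str.format` stand-in that approximates how Jinja would fill in the `java` line of `BS_template`; it is not what `BashOperator` actually calls.

```python
from typing import Dict, List


def build_task_specs(child_dag_name: str, site_list: List[str],
                     current_date: str, utility_type: str,
                     realtime_home: str) -> List[Dict[str, object]]:
    """Mirror the subdag() loop: one report task and one update task per site."""
    specs = []
    for index, site_id in enumerate(site_list):
        specs.append({
            # Same task_id patterns as the BashOperator / PythonOperator pair
            "bash_task_id": "%s-task-%s" % (child_dag_name, index + 1),
            "update_task_id": "%s-updatetask-%s" % (child_dag_name, index + 1),
            # Same keys the template reads through {{ params.* }}
            "params": {"date": current_date, "realtime_home": realtime_home,
                       "siteId": site_id, "utilityType": utility_type},
        })
    return specs


def render_command(params: Dict[str, object]) -> str:
    """Plain-Python stand-in for Jinja rendering of the java line in BS_template."""
    # Extra keys such as realtime_home are ignored by str.format
    return "java -cp xx.jar com.xGenerator {siteId} {utilityType} {date}".format(**params)
```

With two hypothetical sites, `build_task_specs("child", ["site-A", "site-B"], ...)` yields the pairs `child-task-1`/`child-updatetask-1` and `child-task-2`/`child-updatetask-2`, which confirms the task IDs are unique per site within the subDAG.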
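The subDAG creates the dynamic tasks, but one of them always fails, and the error is logged as: "Cannot use more than 1 thread when using sqlite. Setting max_threads to 1". For example, if 4 tasks are created, 3 of them run; if 2 tasks are created, 1 runs.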
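The log line indicates that an SQLite check fired even though the metadata database is meant to be PostgreSQL, which suggests that the process emitting it resolved a different connection string. The following is a minimal sketch for checking that, assuming an `airflow.cfg`-style INI file in which `sql_alchemy_conn` sits under `[core]` (Airflow 1.x) or `[database]` (Airflow 2.3 and later). `metadata_db_url` and `effective_max_threads` are hypothetical helpers; the second one only models the single-thread fallback that the message describes and is not Airflow's source code.

```python
import configparser
import logging

log = logging.getLogger(__name__)


def metadata_db_url(cfg_text: str) -> str:
    """Return sql_alchemy_conn from airflow.cfg-style INI text.

    Looks in [database] first (Airflow 2.3+), then [core] (Airflow 1.x).
    """
    # interpolation=None so '%' characters in passwords are taken literally
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    for section in ("database", "core"):
        if parser.has_option(section, "sql_alchemy_conn"):
            return parser.get(section, "sql_alchemy_conn")
    raise KeyError("sql_alchemy_conn not found in [database] or [core]")


def effective_max_threads(db_url: str, requested: int) -> int:
    """Model of the fallback described by the error message (hypothetical)."""
    if db_url.startswith("sqlite") and requested > 1:
        log.error("Cannot use more than 1 thread when using sqlite. "
                  "Setting max_threads to 1")
        return 1
    return requested
```

Run it against the `airflow.cfg` under `$AIRFLOW_HOME` of every machine or process that executes these tasks. Environment variables such as `AIRFLOW__CORE__SQL_ALCHEMY_CONN` override the file, and this sketch does not account for them.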