1. 使用 clickhouse_driver 的 Client 类
import logging
from apscheduler.schedulers.blocking import BlockingScheduler
from clickhouse_driver import Client

logger = logging.getLogger(__name__)  # module-level logger


class CkClient:
    """Shared ClickHouse helper: opens a connection and runs ad-hoc SQL.

    The constructor defaults are the connection values this class always used,
    so ``CkClient()`` behaves as before; pass keyword arguments to target a
    different server or database. Use it as a context manager (``with
    CkClient() as ck: ...``) or call :meth:`close` so the connection is not
    leaked.
    """

    def __init__(self, host="127.0.0.1", port=40009, user="default",
                 password="", db="warehouse_main_ck"):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.db = db
        self.client = Client(user=self.user, password=self.password,
                             host=self.host, port=self.port, database=self.db)
        # Result of the most recent executeSql() call (its last statement).
        self.resultlist = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()
        return False  # never suppress exceptions raised inside the with-block

    def close(self):
        """Disconnect the underlying clickhouse_driver Client."""
        self.client.disconnect()

    def spliteSql(self, sql):
        """Split ';'-separated SQL into individual, non-empty statements.

        A final statement without a trailing ';' is kept, and blank segments
        (e.g. whitespace after the last ';') are skipped.
        NOTE: this is a plain text split, so a ';' inside a string literal
        would break a statement apart.

        :param sql: one or more SQL statements joined with ';'.
        :return: list of stripped statements, in order.
        """
        return [stmt.strip() for stmt in sql.split(';') if stmt.strip()]

    def executeSql(self, sql: str) -> list:
        """Execute one or more ';'-separated statements in order.

        :param sql: SQL text; may contain several statements.
        :return: the result of the LAST statement (``[]`` when *sql* contains
                 no statement). Results of earlier statements are discarded.
        """
        statements = self.spliteSql(sql)
        logger.info("开始执行sql语句")
        self.resultlist = []  # don't return a stale result from a prior call
        for stmt in statements:
            self.resultlist = self.client.execute(stmt)
        return self.resultlist


def dojob():
    """Rebuild dim_customer_statictis from dws_buyer_tag and print the result."""
    # NOTE: ALTER ... DELETE is an asynchronous mutation in ClickHouse; per the
    # ClickHouse mutation docs it only affects data inserted before it was
    # submitted, so the INSERT below is not deleted. TRUNCATE TABLE would be a
    # synchronous alternative — consider it.
    sql = """
        alter table dim_customer_statictis delete where 1=1;
        insert into dim_customer_statictis
        with (select sum(buyer_cnt)
              from (select count(distinct buyer_id) as buyer_cnt
                    from dws_buyer_tag
                    where province != ''
                    group by province)
             ) as total_cnt,
             (select sum(order_total_amt)
              from dws_buyer_tag
              where province != '') as total_amt
        select province, buyer_cnt, buyer_percent, order_per_price, sales_percent
        from (select province,
                     total_cnt,
                     total_amt,
                     count(distinct buyer_id) as buyer_cnt,
                     round(buyer_cnt / total_cnt, 6) as buyer_percent,
                     sum(order_total_amt) as order_total_amt,
                     round(avg(order_per_price), 2) as order_per_price,
                     round(order_total_amt / total_amt, 6) as sales_percent
              from dws_buyer_tag
              where province != ''
              group by province
              order by buyer_cnt desc);
    """
    # Context manager guarantees the connection is closed after every run.
    with CkClient() as ck:
        result = ck.executeSql(sql)
    print("res:", result)


def data_main():
    """Schedule dojob every day at 09:49 and block running the scheduler."""
    scheduler = BlockingScheduler()
    # cron trigger: daily at 09:49. To restrict to weekdays use
    # day_of_week='mon-fri' — named days avoid ambiguity about numeric
    # weekday numbering (APScheduler 3 counts 0 = Monday, so '1-5' is Tue-Sat).
    scheduler.add_job(dojob, 'cron', hour=9, minute=49)
    scheduler.start()


if __name__ == "__main__":
    data_main()
2. 使用 clickhouse_driver 的 connect 函数（DB-API 接口）
import logging
import traceback
# Project-defined module; it may hold several ClickHouse connection configs, e.g.
# ck_configs = {"data": {"host": ..., "port": ..., "user": ..., ...},
#               "data1": {"host": ..., "port": ..., "user": ..., ...}}
from config.ckConfig import ck_configs
from clickhouse_driver import connect


class ClickhouseManger:
    """Thin wrapper around a clickhouse_driver DB-API connection.

    Each query helper logs the SQL, runs it on a fresh cursor and always closes
    that cursor. Query errors are logged with a traceback and swallowed: the
    caller receives an empty result instead of an exception.
    """

    def __init__(self, conn_conf):
        """Create the connection.

        :param conn_conf: mapping with keys "host", "port", "pwd", "user", "db".
        """
        self.host = conn_conf["host"]
        self.port = conn_conf["port"]
        self.pwd = conn_conf["pwd"]
        self.user = conn_conf["user"]
        self.db = conn_conf["db"]
        self.conn = connect(user=self.user, password=self.pwd,
                            host=self.host, port=self.port, database=self.db)

    def _get_cursor(self):
        """Return a new cursor on the shared connection."""
        return self.conn.cursor()

    @staticmethod
    def _column_names(cursor):
        """Column names of the last query executed on *cursor* ([] if none)."""
        return [item[0] for item in (cursor.columns_with_types or [])]

    def fetchone(self, sql):
        """Run *sql* and return its first row as a ``{column: value}`` dict.

        :param sql: SQL statement.
        :return: the first row as a dict, or ``{}`` when there is no row or the
                 query failed (the failure is logged, not raised).
        """
        logging.info(msg=f"------clickhouse SQL: {sql}")
        cursor = self._get_cursor()
        result = {}
        try:
            cursor.execute(sql)
            columns = self._column_names(cursor)
            data = cursor.fetchone()
            if columns and data:
                result = dict(zip(columns, data))
        except Exception:
            # Best-effort API: log and fall through to the empty result.
            # (Exception, not BaseException: Ctrl-C / SystemExit must propagate.)
            logging.error("------clickhouse SQL ERROR:{}".format(traceback.format_exc()))
        finally:
            cursor.close()
        return result

    def fetchmany(self, sql):
        """Run *sql* and return ALL rows as a list of ``{column: value}`` dicts.

        :param sql: SQL statement.
        :return: list of row dicts; ``[]`` when there are no rows or the query
                 failed (the failure is logged, not raised).
        """
        logging.info(msg=f"------clickhouse SQL: {sql}")
        cursor = self._get_cursor()
        result = []
        try:
            cursor.execute(sql)
            columns = self._column_names(cursor)
            rows = cursor.fetchall()
            if columns and rows:
                result = [dict(zip(columns, row)) for row in rows]
        except Exception:
            logging.error("------clickhouse SQL ERROR:{}".format(traceback.format_exc()))
        finally:
            cursor.close()
        return result

    def execute(self, sql):
        """Execute a statement whose result set is not needed (DDL/DML).

        :param sql: SQL statement.
        :return: always ``[]`` (kept for backward compatibility); failures are
                 logged, not raised.
        """
        logging.info(msg=f"------clickhouse SQL: {sql}")
        cursor = self._get_cursor()
        try:
            cursor.execute(sql)
        except Exception:
            logging.error("------clickhouse SQL ERROR:{}".format(traceback.format_exc()))
        finally:
            cursor.close()
        return []

    def close(self):
        """Close the underlying DB-API connection."""
        self.conn.close()


def multiClickhouseConn(ck_configs):
    """Create one ClickhouseManger per named connection config.

    :param ck_configs: mapping of connection name -> config mapping (keys as
                       expected by ClickhouseManger.__init__).
    :return: dict mapping each name to its ClickhouseManger.
    """
    conn_dict = {}
    for name, conf_inf in ck_configs.items():
        conn_dict[name] = ClickhouseManger(conf_inf)
        # Never print or log the password: mask it in the copy that gets shown.
        # NOTE(review): "successful" only means the connection object was
        # created; the driver may not contact the server until first use — verify.
        shown_conf = dict(conf_inf, pwd="******")
        print("Connection (clickhouse: %s) : %s ==== successful " % (name, shown_conf))
        logging.info("Connection (clickhouse %s) : %s ==== successful ", name, shown_conf)
    return conn_dict
if __name__ == "__main__":
    # Open every configured connection; this script uses the one named "data".
    managers = multiClickhouseConn(ck_configs=ck_configs)
    sql = """
        insert into dim_customer_statictis
        with (select sum(buyer_cnt)
              from (select count(distinct buyer_id) as buyer_cnt
                    from dws_buyer_tag
                    where province != ''
                    group by province)
             ) as total_cnt,
             (select sum(order_total_amt)
              from dws_buyer_tag
              where province != '') as total_amt
        select province, buyer_cnt, buyer_percent, order_per_price, sales_percent
        from (select province,
                     total_cnt,
                     total_amt,
                     count(distinct buyer_id) as buyer_cnt,
                     round(buyer_cnt / total_cnt, 6) as buyer_percent,
                     sum(order_total_amt) as order_total_amt,
                     round(avg(order_per_price), 2) as order_per_price,
                     round(order_total_amt / total_amt, 6) as sales_percent
              from dws_buyer_tag
              where province != ''
              group by province
              order by buyer_cnt desc)
    """
    try:
        # ClickhouseManger.execute() logs failures and returns [] either way.
        print(managers['data'].execute(sql))
    finally:
        for manager in managers.values():
            manager.conn.close()