# Author: 霦槟哊禮kc | Source: Internet | 2023-10-10 14:55
# Excerpt preview (truncated by the scraper): import configparser; import os; import pyhdfs; from queue import Queue; from threading import Thread; cat...
import configparser
import os
import pyhdfs
from queue import Queue
from threading import Thread

# Number of consumer threads started by the script entry point.
WORKER_COUNT = 10

# Work queue of HDFS category directories. A ``None`` item is a stop marker
# telling one consumer thread to exit.
category_queue = Queue()

# Both are assigned in the ``__main__`` section once the configuration has
# been loaded; ``get_file`` and ``read_file`` read them as module globals.
hdfs_client = None
hdfs_data_path = None


def _hdfs_join(parent, child):
    """Join two HDFS path components with '/' regardless of the local OS."""
    return parent.rstrip('/') + '/' + child


def load_conf():
    """Load ``base.conf`` and return its first section as a dict.

    Returns:
        dict mapping option name to its string value for the first section
        found in the file.

    Raises:
        TypeError: if the file is missing or contains no sections.
    """
    print("加载配置文件")
    cf = configparser.ConfigParser()
    print('---')
    cf.read("base.conf", encoding="UTF-8")
    print('=====')
    secs = cf.sections()
    if not secs:
        raise TypeError("配置文件无效")
    print("配置文件已加载")
    print("解析配置文件")
    first_section = secs[0]
    conf = {ops: cf.get(first_section, ops) for ops in cf.options(first_section)}
    print("配置文件解析成功")
    return conf


def get_file():
    """Queue every sub-directory of ``hdfs_data_path`` onto ``category_queue``."""
    for category_dir in hdfs_client.listdir(hdfs_data_path):
        category_path = _hdfs_join(hdfs_data_path, category_dir)
        # get_file_status returns a status object; the entry kind is its
        # ``type`` attribute, so comparing the object itself never matches.
        if hdfs_client.get_file_status(category_path).type == 'DIRECTORY':
            category_queue.put(category_path)


def read_file():
    """Consume category directories and print the line total of their '.out' files.

    Runs until a ``None`` stop marker is taken from ``category_queue``.
    """
    while True:
        category_dir = category_queue.get()
        if category_dir is None:
            break
        num = 0
        for date_file in hdfs_client.listdir(category_dir):
            print(date_file)
            if '.out' in date_file:
                # listdir yields bare names, so rebuild the full path.
                f = hdfs_client.open(_hdfs_join(category_dir, date_file))
                try:
                    # Count lazily instead of materialising every line.
                    line_num = sum(1 for _ in f)
                finally:
                    f.close()
                num += line_num
                print(category_dir, num)
        print('类别:{},数据量:{}'.format(category_dir, num))


if __name__ == '__main__':
    print("启动程序")
    base_dict = load_conf()
    hdfs_ip = base_dict["hdfs_ip"]
    hdfs_port = int(base_dict["hdfs_port"])
    hdfs_user = base_dict["hdfs_user"]
    hdfs_data_path = base_dict["hdfs_data_path"]
    # Read from the configuration but not used anywhere else in this script.
    sleep_time = int(base_dict["sleep_time"])

    # pyhdfs takes the NameNode address as a single "host:port" string and
    # the user as the ``user_name`` keyword.
    hdfs_client = pyhdfs.HdfsClient(
        hosts="{}:{}".format(hdfs_ip, hdfs_port),
        user_name=hdfs_user,
    )

    producer = Thread(target=get_file)
    workers = [Thread(target=read_file) for _ in range(WORKER_COUNT)]

    # Start everything first so the consumers run in parallel with the
    # producer, then join.
    producer.start()
    for worker in workers:
        worker.start()

    producer.join()
    # One stop marker per consumer so every thread terminates.
    for _ in workers:
        category_queue.put(None)
    for worker in workers:
        worker.join()