热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

推荐引擎数据导入模块的实现

毕设做到后半部分,需要实现将用户在一段时间(比如1天)内产生的新数据导入HDFS的功能,这样数据仓库中的数据才能和数据库中的数据同步在新建了一个PyDev项目后,需要如下操作(拣最

毕设做到后半部分,需要实现将用户在一段时间(比如1天)内产生的新数据导入HDFS的功能,这样数据仓库中的数据才能和数据库中的数据同步

在新建了一个PyDev项目后,需要如下操作(拣最主要的写):

 

模块的环境变量:

# -*- coding:UTF-8 -*-
#
!/usr/bin/python # FileName:pro_env.py

#*************************************************** # 项目的路径 PROJECT_DIR = "/usr/local/EclipseProjects/MyBI" # 项目配置文件的路径 PROJECT_CONF_DIR = PROJECT_DIR + "/conf/" # 项目第三方库的路径 PROJECT_LIB_DIR = PROJECT_DIR + "/lib" # 项目临时文件的路径 PROJECT_TMP_DIR = PROJECT_DIR + "/temp" #*************************************************** # Hadoop的安装路径 HADOOP_HOME = "/usr/local/hadoop/" # Hadoop的命令路径 HADOOP_PATH = HADOOP_HOME + "bin/" # HIVE的安装路径 HIVE_HOME = "/opt/hive-0.9.0/" # HIVE的命令路径 HIVE_PATH = HIVE_HOME + "bin/" # Sqoop的安装路径 SQOOP_HOME = "/opt/Sqoop/" # Sqoop的命令路径 SQOOP_PATH = SQOOP_HOME + "bin/" #*************************************************** # Java的安装路径 Java_HOME = "/usr/lib/jvm/jdk1.7.0_75"

配置文件:

导入模块的配置文件主要的目的是告诉Sqoop,导入哪些表,怎么导入,我暂时需要一张表,新建一个XML文件Import.xml,type="add"表示增量导入

xml version="1.0" encoding="UTF-8"?>
<root>
    <task type="add">
        <table>ModifyRecordstable>
    task>
root>

需要对每张表进行更细一步的配置,新建ModifyRecords.xml

xml version="1.0" encoding="UTF-8"?>
<root>
    <sqoop-shell type="import">
        <param key="connect">jdbc:mysql://localhost:3306/Recommendparam>
        <param key="username">${username}param>
        <param key="password">${password}param>
        <param key="target-dir">/user/hadoop/Recommend/$dtparam>
        <param key="query">‘select userID,movieID,pref from Recommend.ModifyRecords where modifyDate$flag"\$CONDITIONS" and $CONDITIONS‘param>
        <param key="m">1param>
        <param key="fields-terminated-by">‘,‘param>
    sqoop-shell>
root>

剩下的工作就是解析配置文件:

# -*- coding:UTF-8 -*-
#!/usr/bin/python
# FileName:import.py
from com.utls.pro_env import PROJECT_CONF_DIR
import time
from com.utls.sqoop import SqoopUtil
import xml.etree.ElementTree as ET

# 其中dt为昨天的日期,将由调度模块传入
def resolve_conf(dt):
    
    # 获得配置文件名
    conf_file = PROJECT_CONF_DIR + "Import.xml"
    
    # 解析配置文件
    xml_tree = ET.parse(conf_file)
    # 获得task元素
    tasks = xml_tree.findall(./task)
    
    for task in tasks:
        # 获得导入类型,增量导入或者全量导入
        import_type = task.attrib["type"]
        
        # 获得表名集合
        tables = task.findall(./table)
        
        # 用来保存待执行的Sqoop命令的集合
        cmds = []

        # 迭代表名集合,解析表配置文件
        for i in range(len(tables)):
            # 表名
            table_name = tables[i].text
            # 表配置文件名
            table_conf_file = PROJECT_CONF_DIR + table_name + ".xml"
            
            # 解析表配置文件
            xmlTree = ET.parse(table_conf_file)
            
            # 获取sqoop-shell节点
            sqoopNodes = xmlTree.findall("./sqoop-shell")
            
            # 获取sqoop-shell节点
            sqoop_cmd_type = sqoopNodes[0].attrib["type"]
            # 获取
            praNodes = sqoopNodes[0].findall("./param")
            
            # 用来保存param信息的字典
            cmap = {}
            
            for i in range(len(praNodes)):
                # 获得key属性的值
                key = praNodes[i].attrib["key"]
                # 获得param标签中间的值
                value = praNodes[i].text
                # 保存到字典中
                cmap[key] = value
                
            # 首先组装成sqoop命令头
            command = "sqoop " + sqoop_cmd_type
                
            # 如果为全量导入
            if(import_type == "all"):
                # query的查询条件为
                import_cOndition= dt
                flag = "<"
            # 如果为增量导入
            elif (import_type == "add"):
                # query的查询条件为=dt
                import_cOndition= dt
                flag = "="
            else:
                raise Exception
                
            # #迭代字典将param的信息拼装成字符串
            for key in cmap.keys():
                    
                value = cmap[key]
                    
                # 如果不是键值对形式的命令选项
                if(value == None or value == "" or value == " "):
                    value = ""
                    
                # 将query的CONDITIONS替换为查询条件
                if(key == "query"):
                    value = value.replace("\$CONDITIONS", import_condition)
                    value = value.replace("$flag", flag)
                        
                # 将导入分区替换为传入的时间
                if(key == "target-dir"):
                    value = value.replace("$dt", dt)
                    
                # 拼装为命令
                if key == "fields-terminated-by":
                    command += " --" + key + " " + value
                else:
                    command += " --" + key + " " + value + "\\" + "\n"
                
            # 将命令加入至待执行的命令集合
            cmds.append(command)
        
    return cmds

# Python模块的入口:main函数
if __name__ == __main__:
    
    # 调度模块将昨天的时间传入
    dt = time.strftime("%Y-%m-%d", time.localtime(time.time()))
    # 解析配置文件,获得sqoop命令集合
    cmds = resolve_conf(dt)
    
    # 迭代集合,执行命令
    for i in range(len(cmds)):
        cmd = cmds[i]
        
        # 执行导入过程
        SqoopUtil.execute_shell(cmd)

拼装出来的命令如下:

sqoop import --username xxxx --target-dir /user/hadoop/Recommend/2015-04-26 --m 1 --connect jdbc:mysql://localhost:3306/Recommend --query select userID,movieID,pref from Recommend.ModifyRecords where modifyDate="2015-04-26" and $CONDITIONS --password xxxx --fields-terminated-by ,

最后新建一个模块(不过当然写在import.py的main函数之前...),编写一个类,为该类编写一个函数,目的是用Python调用Sqoop命令:

#!/usr/bin/python
# FileName sqoop.py
# -*- coding:UTF-8 -*-
import os
class SqoopUtil(object):
    ‘‘‘
    sqoop operation
    ‘‘‘
    def __init__(self):
        pass
    
    @staticmethod
    def execute_shell(shell):
        print shell
        os.system(shell)
        

推荐引擎数据导入模块的实现


推荐阅读
  • PHP 5.2.5 安装与配置指南
    本文详细介绍了 PHP 5.2.5 的安装和配置步骤,帮助开发者解决常见的环境配置问题,特别是上传图片时遇到的错误。通过本教程,您可以顺利搭建并优化 PHP 运行环境。 ... [详细]
  • 国内BI工具迎战国际巨头Tableau,稳步崛起
    尽管商业智能(BI)工具在中国的普及程度尚不及国际市场,但近年来,随着本土企业的持续创新和市场推广,国内主流BI工具正逐渐崭露头角。面对国际品牌如Tableau的强大竞争,国内BI工具通过不断优化产品和技术,赢得了越来越多用户的认可。 ... [详细]
  • QUIC协议:快速UDP互联网连接
    QUIC(Quick UDP Internet Connections)是谷歌开发的一种旨在提高网络性能和安全性的传输层协议。它基于UDP,并结合了TLS级别的安全性,提供了更高效、更可靠的互联网通信方式。 ... [详细]
  • 深入理解 Oracle 存储函数:计算员工年收入
    本文介绍如何使用 Oracle 存储函数查询特定员工的年收入。我们将详细解释存储函数的创建过程,并提供完整的代码示例。 ... [详细]
  • 本文总结了2018年的关键成就,包括职业变动、购车、考取驾照等重要事件,并分享了读书、工作、家庭和朋友方面的感悟。同时,展望2019年,制定了健康、软实力提升和技术学习的具体目标。 ... [详细]
  • 在计算机技术的学习道路上,51CTO学院以其专业性和专注度给我留下了深刻印象。从2012年接触计算机到2014年开始系统学习网络技术和安全领域,51CTO学院始终是我信赖的学习平台。 ... [详细]
  • CSS 布局:液态三栏混合宽度布局
    本文介绍了如何使用 CSS 实现液态的三栏布局,其中各栏具有不同的宽度设置。通过调整容器和内容区域的属性,可以实现灵活且响应式的网页设计。 ... [详细]
  • Linux 系统启动故障排除指南:MBR 和 GRUB 问题
    本文详细介绍了 Linux 系统启动过程中常见的 MBR 扇区和 GRUB 引导程序故障及其解决方案,涵盖从备份、模拟故障到恢复的具体步骤。 ... [详细]
  • 本文介绍了如何使用jQuery根据元素的类型(如复选框)和标签名(如段落)来获取DOM对象。这有助于更高效地操作网页中的特定元素。 ... [详细]
  • 本文介绍如何在 Xcode 中使用快捷键和菜单命令对多行代码进行缩进,包括右缩进和左缩进的具体操作方法。 ... [详细]
  • 本文介绍了一款用于自动化部署 Linux 服务的 Bash 脚本。该脚本不仅涵盖了基本的文件复制和目录创建,还处理了系统服务的配置和启动,确保在多种 Linux 发行版上都能顺利运行。 ... [详细]
  • 在Linux系统中配置并启动ActiveMQ
    本文详细介绍了如何在Linux环境中安装和配置ActiveMQ,包括端口开放及防火墙设置。通过本文,您可以掌握完整的ActiveMQ部署流程,确保其在网络环境中正常运行。 ... [详细]
  • 如何在WPS Office for Mac中调整Word文档的文字排列方向
    本文将详细介绍如何使用最新版WPS Office for Mac调整Word文档中的文字排列方向。通过这些步骤,用户可以轻松更改文本的水平或垂直排列方式,以满足不同的排版需求。 ... [详细]
  • 理解存储器的层次结构有助于程序员优化程序性能,通过合理安排数据在不同层级的存储位置,提升CPU的数据访问速度。本文详细探讨了静态随机访问存储器(SRAM)和动态随机访问存储器(DRAM)的工作原理及其应用场景,并介绍了存储器模块中的数据存取过程及局部性原理。 ... [详细]
  • 几何画板展示电场线与等势面的交互关系
    几何画板是一款功能强大的物理教学软件,具备丰富的绘图和度量工具。它不仅能够模拟物理实验过程,还能通过定量分析揭示物理现象背后的规律,尤其适用于难以在实际实验中展示的内容。本文将介绍如何使用几何画板演示电场线与等势面之间的关系。 ... [详细]
author-avatar
ik82jht
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有