热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

推荐引擎数据导入模块的实现

毕设做到后半部分,需要实现将用户在一段时间(比如1天)内产生的新数据导入HDFS的功能,这样数据仓库中的数据才能和数据库中的数据同步在新建了一个PyDev项目后,需要如下操作(拣最

毕设做到后半部分,需要实现将用户在一段时间(比如1天)内产生的新数据导入HDFS的功能,这样数据仓库中的数据才能和数据库中的数据同步

在新建了一个PyDev项目后,需要如下操作(拣最主要的写):

 

模块的环境变量:

# -*- coding:UTF-8 -*-
#
!/usr/bin/python # FileName:pro_env.py

#*************************************************** # 项目的路径 PROJECT_DIR = "/usr/local/EclipseProjects/MyBI" # 项目配置文件的路径 PROJECT_CONF_DIR = PROJECT_DIR + "/conf/" # 项目第三方库的路径 PROJECT_LIB_DIR = PROJECT_DIR + "/lib" # 项目临时文件的路径 PROJECT_TMP_DIR = PROJECT_DIR + "/temp" #*************************************************** # Hadoop的安装路径 HADOOP_HOME = "/usr/local/hadoop/" # Hadoop的命令路径 HADOOP_PATH = HADOOP_HOME + "bin/" # HIVE的安装路径 HIVE_HOME = "/opt/hive-0.9.0/" # HIVE的命令路径 HIVE_PATH = HIVE_HOME + "bin/" # Sqoop的安装路径 SQOOP_HOME = "/opt/Sqoop/" # Sqoop的命令路径 SQOOP_PATH = SQOOP_HOME + "bin/" #*************************************************** # Java的安装路径 Java_HOME = "/usr/lib/jvm/jdk1.7.0_75"

配置文件:

导入模块的配置文件主要的目的是告诉Sqoop,导入哪些表,怎么导入,我暂时需要一张表,新建一个XML文件Import.xml,type="add"表示增量导入

xml version="1.0" encoding="UTF-8"?>
<root>
    <task type="add">
        <table>ModifyRecordstable>
    task>
root>

需要对每张表进行更细一步的配置,新建ModifyRecords.xml

xml version="1.0" encoding="UTF-8"?>
<root>
    <sqoop-shell type="import">
        <param key="connect">jdbc:mysql://localhost:3306/Recommendparam>
        <param key="username">${username}param>
        <param key="password">${password}param>
        <param key="target-dir">/user/hadoop/Recommend/$dtparam>
        <param key="query">‘select userID,movieID,pref from Recommend.ModifyRecords where modifyDate$flag"\$CONDITIONS" and $CONDITIONS‘param>
        <param key="m">1param>
        <param key="fields-terminated-by">‘,‘param>
    sqoop-shell>
root>

剩下的工作就是解析配置文件:

# -*- coding:UTF-8 -*-
#!/usr/bin/python
# FileName:import.py
from com.utls.pro_env import PROJECT_CONF_DIR
import time
from com.utls.sqoop import SqoopUtil
import xml.etree.ElementTree as ET

# 其中dt为昨天的日期,将由调度模块传入
def resolve_conf(dt):
    
    # 获得配置文件名
    conf_file = PROJECT_CONF_DIR + "Import.xml"
    
    # 解析配置文件
    xml_tree = ET.parse(conf_file)
    # 获得task元素
    tasks = xml_tree.findall(./task)
    
    for task in tasks:
        # 获得导入类型,增量导入或者全量导入
        import_type = task.attrib["type"]
        
        # 获得表名集合
        tables = task.findall(./table)
        
        # 用来保存待执行的Sqoop命令的集合
        cmds = []

        # 迭代表名集合,解析表配置文件
        for i in range(len(tables)):
            # 表名
            table_name = tables[i].text
            # 表配置文件名
            table_conf_file = PROJECT_CONF_DIR + table_name + ".xml"
            
            # 解析表配置文件
            xmlTree = ET.parse(table_conf_file)
            
            # 获取sqoop-shell节点
            sqoopNodes = xmlTree.findall("./sqoop-shell")
            
            # 获取sqoop-shell节点
            sqoop_cmd_type = sqoopNodes[0].attrib["type"]
            # 获取
            praNodes = sqoopNodes[0].findall("./param")
            
            # 用来保存param信息的字典
            cmap = {}
            
            for i in range(len(praNodes)):
                # 获得key属性的值
                key = praNodes[i].attrib["key"]
                # 获得param标签中间的值
                value = praNodes[i].text
                # 保存到字典中
                cmap[key] = value
                
            # 首先组装成sqoop命令头
            command = "sqoop " + sqoop_cmd_type
                
            # 如果为全量导入
            if(import_type == "all"):
                # query的查询条件为
                import_cOndition= dt
                flag = "<"
            # 如果为增量导入
            elif (import_type == "add"):
                # query的查询条件为=dt
                import_cOndition= dt
                flag = "="
            else:
                raise Exception
                
            # #迭代字典将param的信息拼装成字符串
            for key in cmap.keys():
                    
                value = cmap[key]
                    
                # 如果不是键值对形式的命令选项
                if(value == None or value == "" or value == " "):
                    value = ""
                    
                # 将query的CONDITIONS替换为查询条件
                if(key == "query"):
                    value = value.replace("\$CONDITIONS", import_condition)
                    value = value.replace("$flag", flag)
                        
                # 将导入分区替换为传入的时间
                if(key == "target-dir"):
                    value = value.replace("$dt", dt)
                    
                # 拼装为命令
                if key == "fields-terminated-by":
                    command += " --" + key + " " + value
                else:
                    command += " --" + key + " " + value + "\\" + "\n"
                
            # 将命令加入至待执行的命令集合
            cmds.append(command)
        
    return cmds

# Python模块的入口:main函数
if __name__ == __main__:
    
    # 调度模块将昨天的时间传入
    dt = time.strftime("%Y-%m-%d", time.localtime(time.time()))
    # 解析配置文件,获得sqoop命令集合
    cmds = resolve_conf(dt)
    
    # 迭代集合,执行命令
    for i in range(len(cmds)):
        cmd = cmds[i]
        
        # 执行导入过程
        SqoopUtil.execute_shell(cmd)

拼装出来的命令如下:

sqoop import --username xxxx --target-dir /user/hadoop/Recommend/2015-04-26 --m 1 --connect jdbc:mysql://localhost:3306/Recommend --query select userID,movieID,pref from Recommend.ModifyRecords where modifyDate="2015-04-26" and $CONDITIONS --password xxxx --fields-terminated-by ,

最后新建一个模块(不过当然写在import.py的main函数之前...),编写一个类,为该类编写一个函数,目的是用Python调用Sqoop命令:

#!/usr/bin/python
# FileName sqoop.py
# -*- coding:UTF-8 -*-
import os
class SqoopUtil(object):
    ‘‘‘
    sqoop operation
    ‘‘‘
    def __init__(self):
        pass
    
    @staticmethod
    def execute_shell(shell):
        print shell
        os.system(shell)
        

推荐引擎数据导入模块的实现


推荐阅读
  • 国内BI工具迎战国际巨头Tableau,稳步崛起
    尽管商业智能(BI)工具在中国的普及程度尚不及国际市场,但近年来,随着本土企业的持续创新和市场推广,国内主流BI工具正逐渐崭露头角。面对国际品牌如Tableau的强大竞争,国内BI工具通过不断优化产品和技术,赢得了越来越多用户的认可。 ... [详细]
  • Linux 系统启动故障排除指南:MBR 和 GRUB 问题
    本文详细介绍了 Linux 系统启动过程中常见的 MBR 扇区和 GRUB 引导程序故障及其解决方案,涵盖从备份、模拟故障到恢复的具体步骤。 ... [详细]
  • Vue 2 中解决页面刷新和按钮跳转导致导航栏样式失效的问题
    本文介绍了如何通过配置路由的 meta 字段,确保 Vue 2 项目中的导航栏在页面刷新或内部按钮跳转时,始终保持正确的 active 样式。具体实现方法包括设置路由的 meta 属性,并在 HTML 模板中动态绑定类名。 ... [详细]
  • 本文探讨了如何通过最小生成树(MST)来计算严格次小生成树。在处理过程中,需特别注意所有边权重相等的情况,以避免错误。我们首先构建最小生成树,然后枚举每条非树边,检查其是否能形成更优的次小生成树。 ... [详细]
  • QUIC协议:快速UDP互联网连接
    QUIC(Quick UDP Internet Connections)是谷歌开发的一种旨在提高网络性能和安全性的传输层协议。它基于UDP,并结合了TLS级别的安全性,提供了更高效、更可靠的互联网通信方式。 ... [详细]
  • 深入理解OAuth认证机制
    本文介绍了OAuth认证协议的核心概念及其工作原理。OAuth是一种开放标准,旨在为第三方应用提供安全的用户资源访问授权,同时确保用户的账户信息(如用户名和密码)不会暴露给第三方。 ... [详细]
  • 2023 ARM嵌入式系统全国技术巡讲旨在分享ARM公司在半导体知识产权(IP)领域的最新进展。作为全球领先的IP提供商,ARM在嵌入式处理器市场占据主导地位,其产品广泛应用于90%以上的嵌入式设备中。此次巡讲将邀请来自ARM、飞思卡尔以及华清远见教育集团的行业专家,共同探讨当前嵌入式系统的前沿技术和应用。 ... [详细]
  • 深入理解 Oracle 存储函数:计算员工年收入
    本文介绍如何使用 Oracle 存储函数查询特定员工的年收入。我们将详细解释存储函数的创建过程,并提供完整的代码示例。 ... [详细]
  • 本文总结了2018年的关键成就,包括职业变动、购车、考取驾照等重要事件,并分享了读书、工作、家庭和朋友方面的感悟。同时,展望2019年,制定了健康、软实力提升和技术学习的具体目标。 ... [详细]
  • 在计算机技术的学习道路上,51CTO学院以其专业性和专注度给我留下了深刻印象。从2012年接触计算机到2014年开始系统学习网络技术和安全领域,51CTO学院始终是我信赖的学习平台。 ... [详细]
  • CSS 布局:液态三栏混合宽度布局
    本文介绍了如何使用 CSS 实现液态的三栏布局,其中各栏具有不同的宽度设置。通过调整容器和内容区域的属性,可以实现灵活且响应式的网页设计。 ... [详细]
  • 本文介绍了如何使用jQuery根据元素的类型(如复选框)和标签名(如段落)来获取DOM对象。这有助于更高效地操作网页中的特定元素。 ... [详细]
  • 本文详细分析了JSP(JavaServer Pages)技术的主要优点和缺点,帮助开发者更好地理解其适用场景及潜在挑战。JSP作为一种服务器端技术,广泛应用于Web开发中。 ... [详细]
  • 在 Windows 10 中,F1 至 F12 键默认设置为快捷功能键。本文将介绍几种有效方法来禁用这些快捷键,并恢复其标准功能键的作用。请注意,部分笔记本电脑的快捷键可能无法完全关闭。 ... [详细]
  • 本文详细介绍如何使用Python进行配置文件的读写操作,涵盖常见的配置文件格式(如INI、JSON、TOML和YAML),并提供具体的代码示例。 ... [详细]
author-avatar
ik82jht
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有