热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

推荐引擎数据导入模块的实现

毕设做到后半部分,需要实现将用户在一段时间(比如1天)内产生的新数据导入HDFS的功能,这样数据仓库中的数据才能和数据库中的数据同步在新建了一个PyDev项目后,需要如下操作(拣最

毕设做到后半部分,需要实现将用户在一段时间(比如1天)内产生的新数据导入HDFS的功能,这样数据仓库中的数据才能和数据库中的数据同步

在新建了一个PyDev项目后,需要如下操作(拣最主要的写):

 

模块的环境变量:

# -*- coding:UTF-8 -*-
#
!/usr/bin/python # FileName:pro_env.py

#*************************************************** # 项目的路径 PROJECT_DIR = "/usr/local/EclipseProjects/MyBI" # 项目配置文件的路径 PROJECT_CONF_DIR = PROJECT_DIR + "/conf/" # 项目第三方库的路径 PROJECT_LIB_DIR = PROJECT_DIR + "/lib" # 项目临时文件的路径 PROJECT_TMP_DIR = PROJECT_DIR + "/temp" #*************************************************** # Hadoop的安装路径 HADOOP_HOME = "/usr/local/hadoop/" # Hadoop的命令路径 HADOOP_PATH = HADOOP_HOME + "bin/" # HIVE的安装路径 HIVE_HOME = "/opt/hive-0.9.0/" # HIVE的命令路径 HIVE_PATH = HIVE_HOME + "bin/" # Sqoop的安装路径 SQOOP_HOME = "/opt/Sqoop/" # Sqoop的命令路径 SQOOP_PATH = SQOOP_HOME + "bin/" #*************************************************** # Java的安装路径 Java_HOME = "/usr/lib/jvm/jdk1.7.0_75"

配置文件:

导入模块的配置文件主要的目的是告诉Sqoop,导入哪些表,怎么导入,我暂时需要一张表,新建一个XML文件Import.xml,type="add"表示增量导入

xml version="1.0" encoding="UTF-8"?>
<root>
    <task type="add">
        <table>ModifyRecordstable>
    task>
root>

需要对每张表进行更细一步的配置,新建ModifyRecords.xml

xml version="1.0" encoding="UTF-8"?>
<root>
    <sqoop-shell type="import">
        <param key="connect">jdbc:mysql://localhost:3306/Recommendparam>
        <param key="username">${username}param>
        <param key="password">${password}param>
        <param key="target-dir">/user/hadoop/Recommend/$dtparam>
        <param key="query">‘select userID,movieID,pref from Recommend.ModifyRecords where modifyDate$flag"\$CONDITIONS" and $CONDITIONS‘param>
        <param key="m">1param>
        <param key="fields-terminated-by">‘,‘param>
    sqoop-shell>
root>

剩下的工作就是解析配置文件:

# -*- coding:UTF-8 -*-
#!/usr/bin/python
# FileName:import.py
from com.utls.pro_env import PROJECT_CONF_DIR
import time
from com.utls.sqoop import SqoopUtil
import xml.etree.ElementTree as ET

# 其中dt为昨天的日期,将由调度模块传入
def resolve_conf(dt):
    
    # 获得配置文件名
    conf_file = PROJECT_CONF_DIR + "Import.xml"
    
    # 解析配置文件
    xml_tree = ET.parse(conf_file)
    # 获得task元素
    tasks = xml_tree.findall(./task)
    
    for task in tasks:
        # 获得导入类型,增量导入或者全量导入
        import_type = task.attrib["type"]
        
        # 获得表名集合
        tables = task.findall(./table)
        
        # 用来保存待执行的Sqoop命令的集合
        cmds = []

        # 迭代表名集合,解析表配置文件
        for i in range(len(tables)):
            # 表名
            table_name = tables[i].text
            # 表配置文件名
            table_conf_file = PROJECT_CONF_DIR + table_name + ".xml"
            
            # 解析表配置文件
            xmlTree = ET.parse(table_conf_file)
            
            # 获取sqoop-shell节点
            sqoopNodes = xmlTree.findall("./sqoop-shell")
            
            # 获取sqoop-shell节点
            sqoop_cmd_type = sqoopNodes[0].attrib["type"]
            # 获取
            praNodes = sqoopNodes[0].findall("./param")
            
            # 用来保存param信息的字典
            cmap = {}
            
            for i in range(len(praNodes)):
                # 获得key属性的值
                key = praNodes[i].attrib["key"]
                # 获得param标签中间的值
                value = praNodes[i].text
                # 保存到字典中
                cmap[key] = value
                
            # 首先组装成sqoop命令头
            command = "sqoop " + sqoop_cmd_type
                
            # 如果为全量导入
            if(import_type == "all"):
                # query的查询条件为
                import_cOndition= dt
                flag = "<"
            # 如果为增量导入
            elif (import_type == "add"):
                # query的查询条件为=dt
                import_cOndition= dt
                flag = "="
            else:
                raise Exception
                
            # #迭代字典将param的信息拼装成字符串
            for key in cmap.keys():
                    
                value = cmap[key]
                    
                # 如果不是键值对形式的命令选项
                if(value == None or value == "" or value == " "):
                    value = ""
                    
                # 将query的CONDITIONS替换为查询条件
                if(key == "query"):
                    value = value.replace("\$CONDITIONS", import_condition)
                    value = value.replace("$flag", flag)
                        
                # 将导入分区替换为传入的时间
                if(key == "target-dir"):
                    value = value.replace("$dt", dt)
                    
                # 拼装为命令
                if key == "fields-terminated-by":
                    command += " --" + key + " " + value
                else:
                    command += " --" + key + " " + value + "\\" + "\n"
                
            # 将命令加入至待执行的命令集合
            cmds.append(command)
        
    return cmds

# Python模块的入口:main函数
if __name__ == __main__:
    
    # 调度模块将昨天的时间传入
    dt = time.strftime("%Y-%m-%d", time.localtime(time.time()))
    # 解析配置文件,获得sqoop命令集合
    cmds = resolve_conf(dt)
    
    # 迭代集合,执行命令
    for i in range(len(cmds)):
        cmd = cmds[i]
        
        # 执行导入过程
        SqoopUtil.execute_shell(cmd)

拼装出来的命令如下:

sqoop import --username xxxx --target-dir /user/hadoop/Recommend/2015-04-26 --m 1 --connect jdbc:mysql://localhost:3306/Recommend --query select userID,movieID,pref from Recommend.ModifyRecords where modifyDate="2015-04-26" and $CONDITIONS --password xxxx --fields-terminated-by ,

最后新建一个模块(不过当然写在import.py的main函数之前...),编写一个类,为该类编写一个函数,目的是用Python调用Sqoop命令:

#!/usr/bin/python
# FileName sqoop.py
# -*- coding:UTF-8 -*-
import os
class SqoopUtil(object):
    ‘‘‘
    sqoop operation
    ‘‘‘
    def __init__(self):
        pass
    
    @staticmethod
    def execute_shell(shell):
        print shell
        os.system(shell)
        

推荐引擎数据导入模块的实现


推荐阅读
  • 利用树莓派畅享落网电台音乐体验
    最近重新拾起了闲置已久的树莓派,这台小巧的开发板已经沉寂了半年多。上个月闲暇时间较多,我决定将其重新启用。恰逢落网电台进行了改版,回忆起之前在树莓派论坛上看到有人用它来播放豆瓣音乐,便萌生了同样的想法。通过一番调试,终于实现了在树莓派上流畅播放落网电台音乐的功能,带来了全新的音乐享受体验。 ... [详细]
  • 本指南详细介绍了如何在CentOS 6.6 64位系统上以root用户身份部署Tomcat 8服务器。系统环境为CentOS 6.6 64位,采用源码安装方式。所需软件为apache-tomcat-8.0.23.tar.gz,建议将软件下载至/root/opt目录。具体下载地址请参见官方资源。本指南涵盖了从环境准备到服务启动的完整步骤,适用于需要在该系统环境下搭建高性能Web应用服务器的技术人员。 ... [详细]
  • 该问题可能由守护进程配置不当引起,例如未识别的JVM选项或内存分配不足。建议检查并调整JVM参数,确保为对象堆预留足够的内存空间(至少1572864KB)。此外,还可以优化应用程序的内存使用,减少不必要的内存消耗。 ... [详细]
  • ### 摘要`mkdir` 命令用于在指定位置创建新的目录。其基本格式为 `mkdir [选项] 目录名称`。通过该命令,用户可以在文件系统中创建一个或多个以指定名称命名的文件夹。执行此操作的用户需要具备相应的权限。此外,`mkdir` 还支持多种选项,如 `-p` 用于递归创建多级目录,确保路径中的所有层级都存在。掌握这些基本用法和选项,有助于提高在 Linux 系统中的文件管理效率。 ... [详细]
  • 在探讨Hibernate框架的高级特性时,缓存机制和懒加载策略是提升数据操作效率的关键要素。缓存策略能够显著减少数据库访问次数,从而提高应用性能,特别是在处理频繁访问的数据时。Hibernate提供了多层次的缓存支持,包括一级缓存和二级缓存,以满足不同场景下的需求。懒加载策略则通过按需加载关联对象,进一步优化了资源利用和响应时间。本文将深入分析这些机制的实现原理及其最佳实践。 ... [详细]
  • PHP自学必备:从零开始的准备工作与工具选择 ... [详细]
  • 初探性能优化:入门指南与实践技巧
    在编程领域,常有“尚未精通编码便急于优化”的声音。为了从性能优化的角度提升代码质量,本文将带领读者初步探索性能优化的基本概念与实践技巧。即使程序看似运行良好,数据处理效率仍有待提高,通过系统学习性能优化,能够帮助开发者编写更加高效、稳定的代码。文章不仅介绍了性能优化的基础知识,还提供了实用的调优方法和工具,帮助读者在实际项目中应用这些技术。 ... [详细]
  • 本文详细介绍了如何安全地手动卸载Exchange Server 2003,以确保系统的稳定性和数据的完整性。根据微软官方支持文档(https://support.microsoft.com/kb833396/zh-cn),在进行卸载操作前,需要特别注意备份重要数据,并遵循一系列严格的步骤,以避免对现有网络环境造成不利影响。此外,文章还提供了详细的故障排除指南,帮助管理员在遇到问题时能够迅速解决,确保整个卸载过程顺利进行。 ... [详细]
  • 在 POJ1651 的乘法谜题挑战中,如果选手按相反顺序选择卡片,即先选 50,再选 20,最后选 1,则最终得分会有所不同。题目要求输入的第一行包含... 改写后的摘要:在 POJ1651 的乘法谜题挑战中,如果选手按照逆序选取卡片,例如依次选择 50、20 和 1,最终的得分将发生变化。题目首先要求输入的第一行包括... ... [详细]
  • 资源管理器的基础架构包括三个核心组件:1)资源池,用于将CPU和内存等资源分配给不同的容器;2)负载组,负责承载任务并将其分配到相应的资源池;3)分类函数,用于将不同的会话映射到合适的负载组。该系统提供了两种主要的资源管理策略。 ... [详细]
  • AngularJS 进阶指南:第三部分深入解析
    在本文中,我们将深入探讨 AngularJS 的指令模型,特别是 `ng-model` 指令。`ng-model` 指令用于将 HTML 元素与应用程序数据进行双向绑定,支持多种数据类型验证,如数字、电子邮件地址和必填项检查。此外,我们还将介绍如何利用该指令优化表单验证和数据处理流程,提升开发效率和用户体验。 ... [详细]
  • 本文深入解析了Java面向对象编程的核心概念及其应用,重点探讨了面向对象的三大特性:封装、继承和多态。封装确保了数据的安全性和代码的可维护性;继承支持代码的重用和扩展;多态则增强了程序的灵活性和可扩展性。通过具体示例,文章详细阐述了这些特性在实际开发中的应用和优势。 ... [详细]
  • `chkconfig` 命令主要用于管理和查询系统服务在不同运行级别中的启动状态。该命令不仅能够更新服务的启动配置,还能检查特定服务的当前状态。通过 `chkconfig`,管理员可以轻松地控制服务在系统启动时的行为,确保关键服务正常运行,同时禁用不必要的服务以提高系统性能和安全性。本文将详细介绍 `chkconfig` 的各项参数及其使用方法,帮助读者更好地理解和应用这一强大的系统管理工具。 ... [详细]
  • 本文深入解析了HTML框架集(FRAMESET)的使用方法及其应用场景。首先介绍了几个关键概念,如如何通过FRAMESET标签将主视图划分为多个独立的区域,每个区域可以加载不同的HTML文件。此外,还详细探讨了FRAMESET在实际开发中的优缺点,并提供了具体的实例代码,帮助开发者更好地理解和应用这一技术。 ... [详细]
  • 2012年9月12日优酷土豆校园招聘笔试题目解析与备考指南
    2012年9月12日,优酷土豆校园招聘笔试题目解析与备考指南。在选择题部分,有一道题目涉及中国人的血型分布情况,具体为A型30%、B型20%、O型40%、AB型10%。若需确保在随机选取的样本中,至少有一人为B型血的概率不低于90%,则需要选取的最少人数是多少?该问题不仅考察了概率统计的基本知识,还要求考生具备一定的逻辑推理能力。 ... [详细]
author-avatar
ik82jht
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有