热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

两个map一个reduce(两个输入文件)

两个map,一个map读取一个hdfs文件,map完之后进入一个reduce进行逻辑处理。packagecom.zhongxin.mr;importorg.apache.common

两个map,一个map读取一个hdfs文件,map完之后进入一个reduce进行逻辑处理。

package com.zhongxin.mr;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * Created by DingYS on 2017/12/7.
 * 用户回款计划统计(详情)
 */
public class UserPlanAmount {

    public static class StatisticsMap extends Mapper {
        private Text outKey = new Text();
        private Text outValue = new Text();
        private Pattern pattern = Pattern.compile(",");

        //statistics文件处理
        public void map(LongWritable key, Text value, Context context) throws IOException,InterruptedException{
            String strs[] = pattern.split(String.valueOf(value));
            String bidNo = strs[2];
            String userId = strs[3];
            String totalOnInvestedShare= strs[8];
            String addShare = strs[17];
            String addyield = strs[16];
            String outv = bidNo + pattern +"statstics" + pattern + userId + pattern + totalOnInvestedShare + pattern + addShare + pattern + addyield;
            outKey.set(bidNo);
            outValue.set(outv);
            context.write(outKey,outValue);
        }
    }


    public static class PlanMap extends Mapper {
        private Text outKey = new Text();
        private Text outValue = new Text();
        private Pattern pattern = Pattern.compile(",");

        // plan统计表(该文件在sqoop导入时就进行了数据计算及合并)
        public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
            String strs[] = pattern.split(String.valueOf(value));
            String bidNo = strs[0];
            String interestTime = strs[1];
            String status = strs[2];
            String planStatus = strs[3];
            String yield = strs[4];
            String endDate = strs[6];
            String cycle = strs[7];
            String financedAmount = strs[8];
            String interestType = strs[9];
            String penaltyAmount = strs[10];
            String days = strs[11];
            if("INIT".equals(status)){
                String ouv = bidNo + pattern + "plan" + pattern + interestTime + pattern + planStatus + pattern + yield + pattern +
                        cycle + pattern + financedAmount + pattern + interestType + pattern + penaltyAmount + pattern + days + pattern + endDate;
                outKey.set(bidNo);
                outValue.set(ouv);
                context.write(outKey,outValue);
            }
        }

    }

    public static class Reduce extends Reducer{

        private Text outValue = new Text();
        private Pattern pattern = Pattern.compile(",");

        public void reduce(Text key,Iterable values,Context context) throws IOException,InterruptedException{

            Map> planMap = new HashMap>();

            List statisticsLst = new ArrayList();
            for(Text value : values){
                String strs[] = pattern.split(String.valueOf(value));
                String pbidNo = strs[0];
                if("plan".equals(strs[1])){
                    if(planMap.containsKey(pbidNo)){
                        planMap.get(pbidNo).add(String.valueOf(value));
                    }else{
                        List planLst = new ArrayList();
                        planLst.add(String.valueOf(value));
                        planMap.put(pbidNo,planLst);
                    }
                }else{
                    statisticsLst.add(String.valueOf(value));
                }
            }

            for(String value : statisticsLst){
                String strs[] = pattern.split(String.valueOf(value));
                String bidNo = strs[0];
                String userId = strs[2];
                String totalOnInvestedShare= strs[3];
                String addShare = strs[4];
                String addyield = strs[5];
                if(null == planMap.get(bidNo) || 0 >= planMap.get(bidNo).size()){
                    continue;
                }
                String planBid = planMap.get(bidNo).get(0);
                if(StringUtils.isBlank(planBid)){
                    continue;
                }
                String interestType = pattern.split(planBid)[7];
                if("A1".equals(interestType)){
                    // 到期还本付息
                    for(String v : planMap.get(bidNo)){
                        String strp[] = pattern.split(v);
                        String interestTime = strp[2];
                        String yield = strp[4];
                        String cycle = strp[5];
                        BigDecimal interest = new BigDecimal(totalOnInvestedShare).multiply(new BigDecimal(yield)).divide(new BigDecimal(100),4);
                        BigDecimal addInterest = new BigDecimal(0);
                        if(StringUtils.isNotBlank(addShare) && StringUtils.isNotBlank(addyield)){
                            addInterest = new BigDecimal(addShare).multiply(new BigDecimal(addyield)).divide(new BigDecimal(100),4);
                        }
                        BigDecimal totalInterest = interest.add(addInterest).multiply(new BigDecimal(cycle)).divide(new BigDecimal(365),2);
                        String outv =  userId + pattern + bidNo + pattern + interestTime + pattern + totalInterest + 0.00 + 0.00;
                        outValue.set(outv);
                        context.write(key,outValue);
                    }
                }else{
                    // 按月付息,按季付息
                    for(String v : planMap.get(bidNo)){
                        String strp[] = pattern.split(v);
                        String interestTime = strp[2];
                        String yield = strp[4];
                        String days = strp[9];
                        String endDate = strp[10];
                        String penaltyTotalAmount = strp[8];
                        String financeAmount = strp[6];
                        BigDecimal interest = new BigDecimal(totalOnInvestedShare).multiply(new BigDecimal(yield)).divide(new BigDecimal(100),4);
                        BigDecimal addInterest = new BigDecimal(0);
                        if("null".equals(addShare) && "null".equals(addyield)){
                            addInterest = new BigDecimal(addShare).multiply(new BigDecimal(addyield)).divide(new BigDecimal(100),4);
                        }
                        BigDecimal totalInterest = interest.add(addInterest).multiply(new BigDecimal(days)).divide(new BigDecimal(365),2);
                        String planSttus = strp[3];
                        BigDecimal penalty = new BigDecimal(0);
                        BigDecimal capital = new BigDecimal(0);
                        if("ADVANCE".equals(planSttus)){
                            // 提前还款
                            penalty = new BigDecimal(penaltyTotalAmount).divide(new BigDecimal(financeAmount),2).multiply(new BigDecimal(totalOnInvestedShare));
                            totalInterest = totalInterest.add(penalty);
                            capital = new BigDecimal(totalOnInvestedShare);
                        }
                        /**
                         * 最后一次派息capital记成totalOnInvestedShare
                         */
                        if(interestTime.equals(endDate)){
                            capital = new BigDecimal(totalOnInvestedShare);
                        }
                        String outv =  userId + pattern + bidNo + pattern + interestTime + pattern + totalInterest  +pattern + capital + pattern + penalty;
                        outValue.set(outv);
                        context.write(key,outValue);
                    }
                }
            }

        }

    }

    public static void main(String[] args) throws Exception{
        Configuration cOnfig= new Configuration();
        Job job = Job.getInstance(config);
        job.setJobName("userPlanAmount");
        job.setJarByClass(UserPlanAmount.class);
        MultipleInputs.addInputPath(job,new Path(args[0]), TextInputFormat.class,StatisticsMap.class);
        MultipleInputs.addInputPath(job,new Path(args[1]),TextInputFormat.class,PlanMap.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

  


推荐阅读
  • 使用Maven JAR插件将单个或多个文件及其依赖项合并为一个可引用的JAR包
    本文介绍了如何利用Maven中的maven-assembly-plugin插件将单个或多个Java文件及其依赖项打包成一个可引用的JAR文件。首先,需要创建一个新的Maven项目,并将待打包的Java文件复制到该项目中。通过配置maven-assembly-plugin,可以实现将所有文件及其依赖项合并为一个独立的JAR包,方便在其他项目中引用和使用。此外,该方法还支持自定义装配描述符,以满足不同场景下的需求。 ... [详细]
  • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
    在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
  • 在对WordPress Duplicator插件0.4.4版本的安全评估中,发现其存在跨站脚本(XSS)攻击漏洞。此漏洞可能被利用进行恶意操作,建议用户及时更新至最新版本以确保系统安全。测试方法仅限于安全研究和教学目的,使用时需自行承担风险。漏洞编号:HTB23162。 ... [详细]
  • Python 程序转换为 EXE 文件:详细解析 .py 脚本打包成独立可执行文件的方法与技巧
    在开发了几个简单的爬虫 Python 程序后,我决定将其封装成独立的可执行文件以便于分发和使用。为了实现这一目标,首先需要解决的是如何将 Python 脚本转换为 EXE 文件。在这个过程中,我选择了 Qt 作为 GUI 框架,因为之前对此并不熟悉,希望通过这个项目进一步学习和掌握 Qt 的基本用法。本文将详细介绍从 .py 脚本到 EXE 文件的整个过程,包括所需工具、具体步骤以及常见问题的解决方案。 ... [详细]
  • 在处理 XML 数据时,如果需要解析 `` 标签的内容,可以采用 Pull 解析方法。Pull 解析是一种高效的 XML 解析方式,适用于流式数据处理。具体实现中,可以通过 Java 的 `XmlPullParser` 或其他类似的库来逐步读取和解析 XML 文档中的 `` 元素。这样不仅能够提高解析效率,还能减少内存占用。本文将详细介绍如何使用 Pull 解析方法来提取 `` 标签的内容,并提供一个示例代码,帮助开发者快速解决问题。 ... [详细]
  • 在 Vue 应用开发中,页面状态管理和跨页面数据传递是常见需求。本文将详细介绍 Vue Router 提供的两种有效方式,帮助开发者高效地实现页面间的数据交互与状态同步,同时分享一些最佳实践和注意事项。 ... [详细]
  • 本文介绍了一种自定义的Android圆形进度条视图,支持在进度条上显示数字,并在圆心位置展示文字内容。通过自定义绘图和组件组合的方式实现,详细展示了自定义View的开发流程和关键技术点。示例代码和效果展示将在文章末尾提供。 ... [详细]
  • 如何使用 `org.apache.poi.openxml4j.opc.PackagePart` 类中的 `loadRelationships()` 方法及其代码示例详解 ... [详细]
  • 本文介绍了如何利用Struts1框架构建一个简易的四则运算计算器。通过采用DispatchAction来处理不同类型的计算请求,并使用动态Form来优化开发流程,确保代码的简洁性和可维护性。同时,系统提供了用户友好的错误提示,以增强用户体验。 ... [详细]
  • 深入剖析Java中SimpleDateFormat在多线程环境下的潜在风险与解决方案
    深入剖析Java中SimpleDateFormat在多线程环境下的潜在风险与解决方案 ... [详细]
  • 使用 ListView 浏览安卓系统中的回收站文件 ... [详细]
  • 在Android平台中,播放音频的采样率通常固定为44.1kHz,而录音的采样率则固定为8kHz。为了确保音频设备的正常工作,底层驱动必须预先设定这些固定的采样率。当上层应用提供的采样率与这些预设值不匹配时,需要通过重采样(resample)技术来调整采样率,以保证音频数据的正确处理和传输。本文将详细探讨FFMpeg在音频处理中的基础理论及重采样技术的应用。 ... [详细]
  • 基于 Vue 和 Element UI 实现的简洁登录界面设计
    本文介绍了一种利用 Vue.js 和 Element UI 框架构建的简洁登录界面设计。该设计不仅注重用户体验,还确保了界面的美观性和易用性。通过合理的布局和组件配置,实现了高效、响应式的登录功能,适用于多种前端应用场景。 ... [详细]
  • 本文深入解析了 jQuery 中用于扩展功能的三个关键方法:`$.extend()`、`$.fn` 和 `$.fn.extend()`。其中,`$.extend()` 用于扩展 jQuery 对象本身,而 `$.fn.extend()` 则用于扩展 jQuery 的原型对象,使自定义方法能够作为 jQuery 实例的方法使用。通过这些方法,开发者可以轻松地创建和集成自定义插件,增强 jQuery 的功能。文章详细介绍了每个方法的用法、参数及实际应用场景,帮助读者更好地理解和运用这些强大的工具。 ... [详细]
  • 本文探讨了使用JavaScript在不同页面间传递参数的技术方法。具体而言,从a.html页面跳转至b.html时,如何携带参数并使b.html替代当前页面显示,而非新开窗口。文中详细介绍了实现这一功能的代码及注释,帮助开发者更好地理解和应用该技术。 ... [详细]
author-avatar
mobiledu2502890777
这个家伙很懒,什么也没留下!
Tags | 热门标签
RankList | 热门文章
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有