package com.zhongxin.mr;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
/**
* Created by DingYS on 2017/12/7.
* 用户回款计划统计(详情)
*/
public class UserPlanAmount {
public static class StatisticsMap extends Mapper {
private Text outKey = new Text();
private Text outValue = new Text();
private Pattern pattern = Pattern.compile(",");
//statistics文件处理
public void map(LongWritable key, Text value, Context context) throws IOException,InterruptedException{
String strs[] = pattern.split(String.valueOf(value));
String bidNo = strs[2];
String userId = strs[3];
String totalOnInvestedShare= strs[8];
String addShare = strs[17];
String addyield = strs[16];
String outv = bidNo + pattern +"statstics" + pattern + userId + pattern + totalOnInvestedShare + pattern + addShare + pattern + addyield;
outKey.set(bidNo);
outValue.set(outv);
context.write(outKey,outValue);
}
}
public static class PlanMap extends Mapper {
private Text outKey = new Text();
private Text outValue = new Text();
private Pattern pattern = Pattern.compile(",");
// plan统计表(该文件在sqoop导入时就进行了数据计算及合并)
public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
String strs[] = pattern.split(String.valueOf(value));
String bidNo = strs[0];
String interestTime = strs[1];
String status = strs[2];
String planStatus = strs[3];
String yield = strs[4];
String endDate = strs[6];
String cycle = strs[7];
String financedAmount = strs[8];
String interestType = strs[9];
String penaltyAmount = strs[10];
String days = strs[11];
if("INIT".equals(status)){
String ouv = bidNo + pattern + "plan" + pattern + interestTime + pattern + planStatus + pattern + yield + pattern +
cycle + pattern + financedAmount + pattern + interestType + pattern + penaltyAmount + pattern + days + pattern + endDate;
outKey.set(bidNo);
outValue.set(ouv);
context.write(outKey,outValue);
}
}
}
public static class Reduce extends Reducer{
private Text outValue = new Text();
private Pattern pattern = Pattern.compile(",");
public void reduce(Text key,Iterable values,Context context) throws IOException,InterruptedException{
Map> planMap = new HashMap>();
List statisticsLst = new ArrayList();
for(Text value : values){
String strs[] = pattern.split(String.valueOf(value));
String pbidNo = strs[0];
if("plan".equals(strs[1])){
if(planMap.containsKey(pbidNo)){
planMap.get(pbidNo).add(String.valueOf(value));
}else{
List planLst = new ArrayList();
planLst.add(String.valueOf(value));
planMap.put(pbidNo,planLst);
}
}else{
statisticsLst.add(String.valueOf(value));
}
}
for(String value : statisticsLst){
String strs[] = pattern.split(String.valueOf(value));
String bidNo = strs[0];
String userId = strs[2];
String totalOnInvestedShare= strs[3];
String addShare = strs[4];
String addyield = strs[5];
if(null == planMap.get(bidNo) || 0 >= planMap.get(bidNo).size()){
continue;
}
String planBid = planMap.get(bidNo).get(0);
if(StringUtils.isBlank(planBid)){
continue;
}
String interestType = pattern.split(planBid)[7];
if("A1".equals(interestType)){
// 到期还本付息
for(String v : planMap.get(bidNo)){
String strp[] = pattern.split(v);
String interestTime = strp[2];
String yield = strp[4];
String cycle = strp[5];
BigDecimal interest = new BigDecimal(totalOnInvestedShare).multiply(new BigDecimal(yield)).divide(new BigDecimal(100),4);
BigDecimal addInterest = new BigDecimal(0);
if(StringUtils.isNotBlank(addShare) && StringUtils.isNotBlank(addyield)){
addInterest = new BigDecimal(addShare).multiply(new BigDecimal(addyield)).divide(new BigDecimal(100),4);
}
BigDecimal totalInterest = interest.add(addInterest).multiply(new BigDecimal(cycle)).divide(new BigDecimal(365),2);
String outv = userId + pattern + bidNo + pattern + interestTime + pattern + totalInterest + 0.00 + 0.00;
outValue.set(outv);
context.write(key,outValue);
}
}else{
// 按月付息,按季付息
for(String v : planMap.get(bidNo)){
String strp[] = pattern.split(v);
String interestTime = strp[2];
String yield = strp[4];
String days = strp[9];
String endDate = strp[10];
String penaltyTotalAmount = strp[8];
String financeAmount = strp[6];
BigDecimal interest = new BigDecimal(totalOnInvestedShare).multiply(new BigDecimal(yield)).divide(new BigDecimal(100),4);
BigDecimal addInterest = new BigDecimal(0);
if("null".equals(addShare) && "null".equals(addyield)){
addInterest = new BigDecimal(addShare).multiply(new BigDecimal(addyield)).divide(new BigDecimal(100),4);
}
BigDecimal totalInterest = interest.add(addInterest).multiply(new BigDecimal(days)).divide(new BigDecimal(365),2);
String planSttus = strp[3];
BigDecimal penalty = new BigDecimal(0);
BigDecimal capital = new BigDecimal(0);
if("ADVANCE".equals(planSttus)){
// 提前还款
penalty = new BigDecimal(penaltyTotalAmount).divide(new BigDecimal(financeAmount),2).multiply(new BigDecimal(totalOnInvestedShare));
totalInterest = totalInterest.add(penalty);
capital = new BigDecimal(totalOnInvestedShare);
}
/**
* 最后一次派息capital记成totalOnInvestedShare
*/
if(interestTime.equals(endDate)){
capital = new BigDecimal(totalOnInvestedShare);
}
String outv = userId + pattern + bidNo + pattern + interestTime + pattern + totalInterest +pattern + capital + pattern + penalty;
outValue.set(outv);
context.write(key,outValue);
}
}
}
}
}
public static void main(String[] args) throws Exception{
Configuration cOnfig= new Configuration();
Job job = Job.getInstance(config);
job.setJobName("userPlanAmount");
job.setJarByClass(UserPlanAmount.class);
MultipleInputs.addInputPath(job,new Path(args[0]), TextInputFormat.class,StatisticsMap.class);
MultipleInputs.addInputPath(job,new Path(args[1]),TextInputFormat.class,PlanMap.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(args[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}