String par = args[i].trim();
if (par.startsWith("-")) {
String key = par.substring(1).trim();
i ++ ;
String value = null;
if (args.length>i) {
value = args[i].trim();
if (value.startsWith("\"") || value.startsWith("\'")) {
value = value.substring(1,value.length() - 1).trim();
}
}
map.put(key, value);
i ++ ;
}else {
i ++ ;
}
}
}
/**
 * Returns the options collected while parsing the command-line arguments.
 * Each key is an argument that started with {@code -} (leading dash removed);
 * its value is the argument that followed it, or {@code null} when there was none.
 *
 * @return the internal option map itself, not a copy
 */
public Map getMap() {
    return this.map;
}
}
2、拿到sql语句
/**
 * Reads a SQL script and flattens it into a single line.
 *
 * Every line is trimmed; blank lines and lines starting with {@code #} or
 * {@code --} are dropped. Each remaining line is appended followed by one
 * space, so a non-empty result always ends with a trailing space.
 *
 * @param file the script file to read (decoded with the platform default charset)
 * @return the concatenated statement text, or an empty string if nothing remains
 * @throws Exception if the file cannot be opened or read
 */
public static String getSql(File file) throws Exception{
    BufferedReader reader = new BufferedReader(new FileReader(file));
    try {
        StringBuilder sql = new StringBuilder();
        String line;
        while ((line = reader.readLine()) != null) {
            String trimmed = line.trim();
            if (trimmed.length() == 0 || trimmed.startsWith("#") || trimmed.startsWith("--")) {
                continue;
            }
            sql.append(trimmed).append(' ');
        }
        return sql.toString();
    } finally {
        // Close on every path so a read failure does not leak the file handle.
        reader.close();
    }
}
3、将分析的参数带入sql中
/** Opening delimiter of a placeholder, e.g. <code>{$date}</code>. */
public static final String BEGIN="{$" ;
/** Closing delimiter of a placeholder. */
public static final String END="}" ;

/**
 * Replaces every <code>{$key}</code> placeholder in {@code sql} with the value
 * stored under {@code key} in {@code map}.
 *
 * The key is trimmed before the lookup, so <code>{$ date }</code> and
 * <code>{$date}</code> are equivalent. Substituted values are inserted verbatim
 * and are not scanned again, so a value that itself contains <code>{$</code>
 * cannot cause endless re-expansion.
 *
 * @param sql the template text; must not be {@code null}
 * @param map placeholder values; may be {@code null} if the template has no placeholders
 * @return the text with all placeholders substituted
 * @throws RuntimeException if a placeholder has no closing <code>}</code>, or if
 *         its key is missing from {@code map} or mapped to {@code null}
 */
public static String parse(String sql, Map map)
{
    StringBuilder result = new StringBuilder(sql.length());
    int cursor = 0; // first character of sql not yet copied into result
    int begin = sql.indexOf(BEGIN);
    while (begin != -1)
    {
        int keyStart = begin + BEGIN.length();
        int end = sql.indexOf(END, keyStart);
        if (end == -1) {
            // Fail with a clear message instead of an index error from substring().
            throw new RuntimeException("Invalid Expression..... missing '" + END
                    + "' for placeholder starting at index " + begin);
        }
        String key = sql.substring(keyStart, end).trim();
        Object value = (map == null) ? null : map.get(key);
        if (value == null) {
            throw new RuntimeException("Invalid Expression..... no value for key '" + key + "'");
        }
        result.append(sql, cursor, begin).append(value);
        cursor = end + END.length();
        // Resume after the placeholder so inserted text is never re-parsed.
        begin = sql.indexOf(BEGIN, cursor);
    }
    result.append(sql, cursor, sql.length());
    return result.toString();
}
HiveF文件(执行jar文件,拿到拼接好的sql语句;hive -e可以在shell中执行sql,并且此文件配置到linux的环境变量下)
hiveF ./rpt_sale_daily.hql -date $date
执行hive作业得到结果
hive2mysql
上面得到hive的结果,对配置文件进行分析删除存在的表,然后把结果从hive抽取到mysql中
hive2mysql ./aa.property -date $date
hive2mysql代码(用于拼接执行删除对应日期存在的表,和查询hive分析出来的数据插入到mysql中)
package com.cloudy.tools;
import java.io.FileInputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

import org.apache.hadoop.metrics.spi.Util;

import com.ibeifeng.hiveF.ParseArgs;
import com.ibeifeng.hiveF.Utils;
public class Hive2Mysql {
public Hive2Mysql(String propertyName) throws Exception {
init(propertyName);
}
Properties prop = new Properties();
public void init(String propertyName) throws Exception {
InputStream stream = new FileInputStream(propertyName);
prop.load(stream);
}
public static void main(String[] args) {
try {
//if(args.length <1)
//{
//System.out.println("pls set propertyName!");
//System.exit(1);
//}
args &#61; new String[3];
args[0]&#61;"d:/aa.property";
args[1]&#61;"-date";
args[2]&#61;"2015-01-01";
String propertyName &#61; args[0];
ParseArgs parse &#61; new ParseArgs(args);
Hive2Mysql h2m &#61; new Hive2Mysql(propertyName);
System.out.println(h2m.prop.get("Hive_sql"));
System.out.println(h2m.prop.get("Mysql_table"));
String hive_sql &#61; h2m.prop.get("Hive_sql").toString();
hive_sql &#61; Utils.parse(hive_sql, parse.getMap());
//System.out.println("hive_sql" &#43; hive_sql);
String mysql_table &#61; h2m.prop.get("Mysql_table").toString();
String mysql_columns &#61; h2m.prop.get("mysql_columns").toString();
String mysql_delete &#61; h2m.prop.get("mysql_delete").toString();
mysql_delete &#61; Utils.parse(mysql_delete, parse.getMap());
// insert into mysql_table(pv,uv.huodong) values(123,234,"huodong");
String mysql_sql&#61;"insert into " &#43; mysql_table &#43; " (" &#43; mysql_columns&#43;") values(";
System.out.println("mysqldelete:" &#43; mysql_delete);
Connection mysqlCon &#61; MyConnection.getMySqlInstance();
Connection myHiveCon &#61; MyConnection.getHiveInstance();
//进行hive查询
Statement stHive &#61; myHiveCon.createStatement();
Statement stMsql &#61; mysqlCon.createStatement();
stMsql.execute(mysql_delete);
ResultSet rsHive &#61; stHive.executeQuery(hive_sql);
int len &#61; hive_sql.split("from")[0].split("select")[1].trim().split(",").length;
System.out.println(len);
String value &#61; "";
while(rsHive.next())
{
for(int i &#61;1;i<&#61;len;i&#43;&#43;)
{
value &#43;&#61; "&#39;" &#43; rsHive.getString(i) &#43; "&#39;,";
}
value &#61; value.substring(0,value.length()-1);
mysql_sql &#61; mysql_sql &#43; value &#43; ")";
//System.out.println(value);
System.out.println(mysql_sql);
stMsql.execute(mysql_sql);
value &#61;"";
mysql_sql&#61;"insert into " &#43; mysql_table &#43; " (" &#43; mysql_columns&#43;") values(";
}
//insert into mysql_table() values(123,234,"huodong")
} catch (Exception e) {
e.printStackTrace();
}
}
}