Author: 董福强 | Source: Internet | 2023-08-20 15:43
Compared with sqoop1, the biggest advantage of sqoop2 is that it can be driven through an API, so third-party users can wire data transfers into their own logic. This post records using sqoop2 for two syncs: importing MySQL data into HDFS, and exporting it from HDFS back to MySQL.
Related software
- sqoop 1.99.7
- hadoop 2.6.0
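The code below drives a running sqoop2 server through its Java client. It assumes the sqoop-client artifact (org.apache.sqoop:sqoop-client:1.99.7, matching the server version) is on the classpath, and that the MySQL JDBC driver is installed on the sqoop server itself, since in sqoop2 the JDBC driver is loaded server-side.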
Related code
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.*;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;

import java.util.Arrays;
import java.util.UUID;
public class Main {

    static SqoopClient client;
    public static MLink createMysqlLink() {
        MLink link = client.createLink("generic-jdbc-connector");
        // Generate a random name; overly long names are rejected with an error
        link.setName("jdbc-link" + UUID.randomUUID().toString().substring(0, 10));
        link.setCreationUser("xigua");
        MLinkConfig linkConfig = link.getConnectorLinkConfig();
        linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://10.9.51.13:3008/data_platform_model");
        linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
        linkConfig.getStringInput("linkConfig.username").setValue("hive");
        linkConfig.getStringInput("linkConfig.password").setValue("hive");
        // identifierEnclose must be set explicitly here: it defaults to a
        // double quote, which also causes an error against MySQL
        linkConfig.getStringInput("dialect.identifierEnclose").setValue("`");
        Status status = client.saveLink(link);
        if (status.canProceed()) {
            System.out.println("Created Link with Link Name : " + link.getName());
            return link;
        } else {
            System.out.println("Something went wrong creating the link");
            return null;
        }
    }
    public static MLink createHdfsLink() {
        MLink link = client.createLink("hdfs-connector");
        link.setName("hdfs-link" + UUID.randomUUID().toString().substring(0, 10));
        link.setCreationUser("xigua");
        MLinkConfig linkConfig = link.getConnectorLinkConfig();
        linkConfig.getStringInput("linkConfig.uri").setValue("hdfs://qabb-qa-hdp-hbase0:8020");
        linkConfig.getStringInput("linkConfig.confDir").setValue("/data/users/huser/hadoop/etc/hadoop/");
        Status status = client.saveLink(link);
        if (status.canProceed()) {
            System.out.println("Created Link with Link Name : " + link.getName());
            return link;
        } else {
            System.out.println("Something went wrong creating the link");
            return null;
        }
    }
    public static String createMysql2HdfsJob(MLink fromLink, MLink toLink) {
        MJob job = client.createJob(fromLink.getName(), toLink.getName());
        job.setName("xigua-job" + UUID.randomUUID());
        job.setCreationUser("xigua");
        // FROM side: which MySQL schema, table and columns to read
        MFromConfig fromJobConfig = job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("data_platform_model");
        fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("data_import");
        fromJobConfig.getListInput("fromJobConfig.columnList").setValue(Arrays.asList("id", "owner", "alertUserList", "cron"));
        // TO side: where and in what format to write the files on HDFS
        MToConfig toJobConfig = job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/tmp/sqoop-job/" + UUID.randomUUID());
        toJobConfig.getEnumInput("toJobConfig.outputFormat").setValue("TEXT_FILE");
        toJobConfig.getEnumInput("toJobConfig.compression").setValue("NONE");
        toJobConfig.getBooleanInput("toJobConfig.overrideNullValue").setValue(true);
        MDriverConfig driverConfig = job.getDriverConfig();
        driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1);
        Status status = client.saveJob(job);
        if (status.canProceed()) {
            System.out.println("Created Job with Job Name: " + job.getName());
            return job.getName();
        } else {
            System.out.println("Something went wrong creating the job");
            return null;
        }
    }
    public static String createHdfs2MysqlJob(MLink fromLink, MLink toLink) {
        MJob job = client.createJob(fromLink.getName(), toLink.getName());
        job.setName("xigua-job" + UUID.randomUUID());
        job.setCreationUser("xigua");
        // FROM side: the HDFS directory holding the files to export
        MFromConfig fromJobConfig = job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.inputDirectory").setValue("/tmp/sqoop-job/4/");
        // TO side: the target MySQL table and column list
        MToConfig toJobConfig = job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.tableName").setValue("sqoop_jobs");
        toJobConfig.getListInput("toJobConfig.columnList").setValue(Arrays.asList("id", "name", "alerts", "cron"));
        MDriverConfig driverConfig = job.getDriverConfig();
        driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1);
        Status status = client.saveJob(job);
        if (status.canProceed()) {
            System.out.println("Created Job with Job Name: " + job.getName());
            return job.getName();
        } else {
            System.out.println("Something went wrong creating the job");
            return null;
        }
    }
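    // Not part of the original post: links and jobs persist in the sqoop
    // server's repository, so repeated runs accumulate entries. A hypothetical
    // cleanup helper, assuming the deleteJob/deleteLink calls of the 1.99.7
    // client API:
    static void cleanup(String jobName, MLink... links) {
        client.deleteJob(jobName); // delete the job before the links it references
        for (MLink link : links) {
            client.deleteLink(link.getName());
        }
    }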
    static void startJob(String jobName) {
        // Start the job; startJob returns as soon as the submission is accepted
        MSubmission submission = client.startJob(jobName);
        System.out.println("Job Submission Status : " + submission.getStatus());
        if (submission.getStatus().isRunning() && submission.getProgress() != -1) {
            System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));
        }
        System.out.println("Hadoop job id :" + submission.getExternalJobId());
        System.out.println("Job link : " + submission.getExternalLink());
        Counters counters = submission.getCounters();
        if (counters != null) {
            System.out.println("Counters:");
            for (CounterGroup group : counters) {
                System.out.print("\t");
                System.out.println(group.getName());
                for (Counter counter : group) {
                    System.out.print("\t\t");
                    System.out.print(counter.getName());
                    System.out.print(": ");
                    System.out.println(counter.getValue());
                }
            }
        }
    }
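    // Not part of the original post: since client.startJob(jobName) is
    // asynchronous, startJob above only sees the initial status. A minimal
    // polling sketch, assuming the getJobStatus call of the 1.99.7 client API:
    static void waitForJob(String jobName) throws InterruptedException {
        MSubmission submission = client.getJobStatus(jobName);
        while (submission.getStatus().isRunning()) {
            Thread.sleep(5000); // poll the server every five seconds
            submission = client.getJobStatus(jobName);
            System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));
        }
        System.out.println("Final status : " + submission.getStatus());
    }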
    public static void main(String[] args) {
        // Note the trailing slash after "sqoop". Without it the client fails
        // with the rather misleading error below:
        // Exception: org.apache.sqoop.common.SqoopException
        //   Message: CLIENT_0004:Unable to find valid Kerberos ticket cache (kinit)
        String url = "http://10.9.57.158:12000/sqoop/";
        client = new SqoopClient(url);
        System.out.println(client);
        MLink mysqlLink = createMysqlLink();
        MLink hdfsLink = createHdfsLink();
        // First import the MySQL data into HDFS
        startJob(createMysql2HdfsJob(mysqlLink, hdfsLink));
        // Then export the data from HDFS back to MySQL
        startJob(createHdfs2MysqlJob(hdfsLink, mysqlLink));
    }
}
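With the helpers sketched above (additions, not part of the original code), each startJob call in main could be followed by waitForJob to make the two transfers run strictly one after the other, and cleanup could run at the end so the server repository does not fill up with the randomly named links and jobs.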