热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

分享一个client模式spark程序

spark提交任务有client和cluster两种模式主要区别:是否将driver程序放在远程worker机器上执行。cluster模式由master挑选一个worker机器放置

spark提交任务有client和cluster两种模式

主要区别:是否将driver程序放在远程worker机器上执行。cluster模式由master挑选一个worker机器放置driver进程。

client模式,也叫交互模式,任务提交后客户端一直保持连接,并即时获得运行的信息。

cluster模式,也叫非交互模式,任务提交后由后台运行,关闭客户端不影响任务的执行,运行信息需要通过日志文件去查看

PS 参数解释

./bin/spark-submit \
--class <main-class> \
--master \
--deploy-mode \
--conf = \
... # other options
\
[application-arguments]

--deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client)

对于RDD创建临时表的影响

因为创建的临时表在driver内存中,所以client模式创建的临时表会占用本地内存,而cluster模式看起将临时表存储在了spark集群上

以下例子在客户端运行,从odps加载数据到spark,再创建了临时表用于后续查询,因为没有打成jar在提交到集群,只能采用client模式

pom配置
...
<java.version>1.8</java.version>
<spark.version>2.2.1</spark.version>
<scala.version>2.11</scala.version>
...

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
<version>2.7.8</version>
</dependency>
spring配置
<bean id="sparkConf" class="org.apache.spark.SparkConf">
<property name="AppName" value="${spark.appName}"/>
<property name="Master" value="${spark.master}"/>
</bean>
<bean id="mySparkSession" class="com.***.***.service.impl.MySparkSession" init-method="init">
<property name="sparkConf" ref="sparkConf"/>
<property name="env" value="${env}"/>
</bean>
MySparkSession. java
...
logger.info("env:{}",env);
if("product".equalsIgnoreCase(env)){
logger.warn("env is product without spark",env);
return;
}
if(sparkConf == null){
throw new IllegalArgumentException("error sparkConf cannot be null");
}
if(!isLocalhost()) {
sparkConf.set("spark.executor.cores", "12");
sparkConf.set("spark.cores.max", "12");
sparkConf.set("spark.executor.memory", "1g");// 由 Spark 应用程序启动时的 –executor-memory 或 spark.executor.memory 参数配置
sparkConf.set("spark.default.parallelism", "32");//3*4*3 //从16调到100,到500 远程任务数,spark 分配任务延迟(scheduler delay)从几十秒下降到几秒
sparkConf.set("spark.default.parallelism", "108");//3*4*3 //从16调到100,到500 远程任务数,spark 分配任务延迟(scheduler delay)从几十秒下降到几秒
}else{
sparkConf.set("spark.executor.cores", "6");
sparkConf.set("spark.executor.memory", "1g");
sparkConf.set("spark.default.parallelism", "54");
}
sparkConf.set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps");
sparkConf.set("spark.logConf", "true");//当SparkContext启动时,将有效的SparkConf记录为INFO。默认:false
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");//当SparkContext启动时,将有效的SparkConf记录为INFO。默认:false
// sparkConf.set("spark.deploy.mode","cluster"); 这种方式设置无效
sparkContext =new SparkContext(sparkConf);
sparkSession = SparkSession.builder().sparkContext(sparkContext).getOrCreate();
properties配置
spark.appName = xx-application
spark.master = spark://127.0.0.1:7077
从odps加载数据
public void loadTables(boolean reload,boolean isTest) {
long startTime = 0;
List<User> users = null;
for(TableMapInfo entry : tableInfos){
String odpsTableName = entry.getOdpsTableName();
String sparkTableName = entry.getSparkTableName();
Class theClass = entry.getBeanClass();
boolean hasPartition = entry.isHasPartition();
String partition = null;
if(hasPartition){
partition = DateUtils.format(DateUtils.addDays(new Date(),-1),DateUtils.PATTERN_YEAR2DAY);
partition = entry.getPartitionPrefix()+"="+partition;
}
loadTableByPage(odpsTableName,sparkTableName,theClass,partition,isTest?11000:Integer.MAX_VALUE);
}
}
private <T> T loadTableByPage(String odpsTableName,String sparkTableName,Class<T> theClass,String partition,int limit) {
long startTime;
long pageSize = 10000;
long pageNo = 1;
long startNo = 0;
try {
// 如何分批加载到spark避免缓存过大导致oom,用aDataset.unionAll
TableTunnel tableTunnel = new TableTunnel(aliyunOdpsClient.getOdps());
PartitionSpec partitionSpec = StringUtils.isBlank(partition) ? null : new PartitionSpec(partition);
TableTunnel.DownloadSession downloadSession = partitionSpec != null ? tableTunnel.createDownloadSession(aliyunOdpsClient.getAliyunOdpsProject(), odpsTableName, partitionSpec)
: tableTunnel.createDownloadSession(aliyunOdpsClient.getAliyunOdpsProject(), odpsTableName);
long recordCount = downloadSession.getRecordCount();
recordCount = Math.min(recordCount,limit);
Dataset<Row> userAll = null;
long totalPages = recordCount%pageSize==0?recordCount/pageSize:recordCount/pageSize +1;
List<T> beans = null;
for(long i=0;i<totalPages;++i){
startTime = System.currentTimeMillis();
startNo = i*pageSize;
beans = OdpsReaderUtil.readTableByPage(downloadSession,odpsTableName, partition, theClass,startNo,pageSize,recordCount);
if(i==0){
userAll = spark.session().createDataFrame(beans, theClass);
}else{
userAll = userAll.unionAll(spark.session().createDataFrame(beans, theClass));
}
beans = null;
logger.warn("{} {} recordTotal:{} page:{}/{} cost:{} ms ", odpsTableName, partition, recordCount, i+1,totalPages,(System.currentTimeMillis() - startTime));
}
startTime = System.currentTimeMillis();
userAll.createOrReplaceGlobalTempView(sparkTableName);
userAll.persist();
userAll = null;
System.gc();
Thread.sleep(10*1000);
logger.info("create table cost:{} ms", (System.currentTimeMillis() - startTime));
return null;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
查询临时表
@Override
public QueryResult<Patient> queryPatientsByRule(UserGroupExecRequest request) {
QueryResult<Patient> queryResult = new QueryResult<>();
Encoder<Patient> encoder = Encoders.bean(Patient.class);
Encoder<Total> encoderTotal = Encoders.bean(Total.class);
Map<String,Object> params = new HashMap<>();
params.put("limit",500000000);
params.put("sex",request.getSex());
params.put("beginAge",request.getBeginAge());
params.put("endAge",request.getEndAge());
params.put("isInHospital",request.getIsInHospital());
params.put("startTime",parseDate(request.getStartTime()) );
params.put("endTime",parseDate(request.getEndTime()) );
String sql = sqlMapBuilder.getSql(TABLE_PATIENT,"selectPatients",params);
String sqlCount = sqlMapBuilder.getSql(TABLE_PATIENT,"countPatients",params);
Dataset<Patient> patientDataset = spark.session().sql(sql).as(encoder);
queryResult.setList(patientDataset.collectAsList());
return queryResult;
}
sqlmap:
<sparkSqlMap namespace="patient">
<sql id="selectPatients">
[CDATA[
select *
from global_temp.patient a
where 1=1
#if(${sex} && ${sex}!='')
and a.sex = '${sex}'
#end
#if(${beginAge} && ${beginAge}>=0)
and a.age >= '${beginAge}'
#end
#if(${endAge} && ${endAge}>=0)
and a.age <= '${endAge}'
#end
#if(${isInHospital})
and a.is_hospital = '${isInHospital}'
#end
#if(${startTime})
and a.bind_time >= ${startTime}
#end
#if(${endTime})
and a.bind_time <= ${endTime}
#end
limit ${limit}
]]>

PS.spark应用概念

https://blog.csdn.net/zhujianlin1990/article/details/79977560

不足
driver程序本地应用中跑,创建的临时表占用大量本地jvm内存,如果driver分配到的内存少存在oom风险



推荐阅读
  • CentOS 6.5安装VMware Tools及共享文件夹显示问题解决方法
    本文介绍了在CentOS 6.5上安装VMware Tools及解决共享文件夹显示问题的方法。包括清空CD/DVD使用的ISO镜像文件、创建挂载目录、改变光驱设备的读写权限等步骤。最后给出了拷贝解压VMware Tools的操作。 ... [详细]
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 高质量SQL书写的30条建议
    本文提供了30条关于优化SQL的建议,包括避免使用select *,使用具体字段,以及使用limit 1等。这些建议是基于实际开发经验总结出来的,旨在帮助读者优化SQL查询。 ... [详细]
  • centos安装Mysql的方法及步骤详解
    本文介绍了centos安装Mysql的两种方式:rpm方式和绿色方式安装,详细介绍了安装所需的软件包以及安装过程中的注意事项,包括检查是否安装成功的方法。通过本文,读者可以了解到在centos系统上如何正确安装Mysql。 ... [详细]
  • TableAPI报一下异常:FieldtypesofqueryresultandregisteredTableSink
    报错信息如下:Exceptioninthread“main”org.apache.flink.table.api.ValidationException:Fieldtypesofq ... [详细]
  • 安装mysqlclient失败解决办法
    本文介绍了在MAC系统中,使用django使用mysql数据库报错的解决办法。通过源码安装mysqlclient或将mysql_config添加到系统环境变量中,可以解决安装mysqlclient失败的问题。同时,还介绍了查看mysql安装路径和使配置文件生效的方法。 ... [详细]
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • 这个问题困扰了我两天,卸载Dr.COM客户端(我们学校上网要装这个客户端登陆服务器,以后只能在网页里输入用户名和密码了),问题解决了。问题的现象:在实验室机台式机上安装openfire和sp ... [详细]
  • 7.4 基本输入源
    一、文件流1.在spark-shell中创建文件流进入spark-shell创建文件流。另外打开一个终端窗口,启动进入spark-shell上面在spark-shell中执行的程序 ... [详细]
  • 《Spark核心技术与高级应用》——1.2节Spark的重要扩展
    本节书摘来自华章社区《Spark核心技术与高级应用》一书中的第1章,第1.2节Spark的重要扩展,作者于俊向海代其锋马海平,更多章节内容可以访问云栖社区“华章社区”公众号查看1. ... [详细]
  • Spark Streaming和Kafka整合之路(最新版本)
    2019独角兽企业重金招聘Python工程师标准最近完成了SparkStreaming和Kafka的整合工作,耗时虽然不长,但是当中还是遇到了不少 ... [详细]
  • spark的任务已经执行完成:scalavallinesc.textFile(hdfs:vm122:9000dblp.rdf)line:org.apache ... [详细]
  • 基于,docker,快速,部署,多,需求,spark ... [详细]
author-avatar
天才愤青2_735
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有