设置完TaskManager内存之后，相当于向YARN申请了这么大内存的container；Flink内部的内存大部分由Flink框架自行管理，因此在启动container之前就会预先计算各个内存块的大小。
内存块划分
// 默认值0.25f
final float memoryCutoffRatio = config.getFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);
// 最少预留大小默认600MB
final int minCutoff = config.getInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);
// 先减去一块内存预留给jvm
long cutoff = (long) (containerMemoryMB * memoryCutoffRatio);
if (cutoff
final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(javaMemorySizeMB, config);
// use the cut-off memory for off-heap (that was its intention)
// 计算得到堆内内存大小后,用总内存减去堆内大小得到堆外内存的大小
final long offHeapSize = javaMemorySizeMB == heapSizeMB ? -1L : containerMemoryMB - heapSizeMB;
// (3) obtain the additional environment variables from the configuration
final HashMap
final String prefix = ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
for (String key : config.keySet()) {
    if (key.startsWith(prefix) && key.length() > prefix.length()) {
        // remove prefix
        String envVarKey = key.substring(prefix.length());
        envVars.put(envVarKey, config.getString(key, null));
    }
}
计算堆内内存大小：TaskManagerServices.calculateHeapSizeMB
// 默认线上是开启堆外内存的,为了数据交换的过程只使用堆外内存,gc友好
if (useOffHeap) {
    // subtract the Java memory used for network buffers
    final long networkBufMB = calculateNetworkBufferMemory(totalJavaMemorySize, config) >> 20; // bytes to megabytes
    final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;
    long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
    if (offHeapSize <= 0) {
        // calculate off-heap section via fraction
        // 将减去networkBuffer之后剩余的内存乘以堆外系数（默认是0.7），得到其他的堆外内存
        double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
        offHeapSize = (long) (fraction * remainingJavaMemorySizeMB);
    }
    TaskManagerServicesConfiguration.checkConfigParameter(offHeapSize
}
计算networkBuffer（堆外）内存大小：calculateNetworkBufferMemory
Preconditions.checkArgument(totalJavaMemorySize > 0);
int segmentSize = config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
final long networkBufBytes;
// 涉及新老两个版本的参数，以前版本是直接设置networkbuffer的个数，但是比较难估计，新的版本是直接设置内存块大小
if (TaskManagerServicesConfiguration.hasNewNetworkBufConf(config)) {
    // new configuration based on fractions of available memory with selectable min and max
    float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
    long networkBufMin = config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN);
    long networkBufMax = config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX);
    TaskManagerServicesConfiguration.checkNetworkBufferConfig(segmentSize, networkBufFraction, networkBufMin, networkBufMax);
    // 通过networkFraction计算network的内存大小，这个fraction默认值是0.1，同时最小设置默认是64M
    networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin,(long) (networkBufFraction * totalJavaMemorySize)));
    TaskManagerServicesConfiguration.checkConfigParameter(networkBufBytes
} else {
    // use old (deprecated) network buffers parameter
    int numNetworkBuffers = config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
    networkBufBytes = (long) numNetworkBuffers * (long) segmentSize;
    TaskManagerServicesConfiguration.checkNetworkConfigOld(numNetworkBuffers);
    TaskManagerServicesConfiguration.checkConfigParameter(networkBufBytes
}
return networkBufBytes;
小结
- JVM预留内存：总内存的25%（cutoff比例默认值0.25），最小预留600M
- 剩下的内存的10%作为networkBuffer的内存，最小64M
- 剩下内存30%设为堆内内存，总内存减去堆内内存设为directMemory，用于netty和rocksDB和networkBuffer以及JVM自身内存
举例
假设启动时设置TaskManager内存大小为1024MB：
- 1024MB - max(1024MB * 0.25, 600MB) = 1024MB - 600MB = 424MB（减去cutoff后剩余）
- 424MB - max(424MB * 0.1, 64MB) = 424MB - 64MB = 360MB（减去networkBuffer后剩余）
- 360MB * (1 - 0.7) = 108MB（onHeap）
- 1024MB - 108MB = 916MB（maxDirectMemory）
最终启动命令：
yarn 46218 46212 1 Jan08 ? 00:17:50
/home/yarn/java-current/bin/java
-Xms109m -Xmx109m -XX:MaxDirectMemorySize=915m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=2 -XX:GCLogFileSize=512M
-Xloggc:/data1/hadoopdata/nodemanager/logdir/application_1545981373722_0172/container_e194_1545981373722_0172_01_000005/taskmanager_gc.log
-XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=75
-XX:+UseCMSInitiatingOccupancyOnly
-XX:+AlwaysPreTouch -server
-XX:+HeapDumpOnOutOfMemoryError
-Dlog.file=/data1/hadoopdata/nodemanager/logdir/application_1545981373722_0172/container_e194_1545981373722_0172_01_000005/taskmanager.log
-Dlogback.configurationFile=file:./logback.xml
-Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskManager
--configDir .