Preface

This article looks at the managed memory of the Flink TaskManager.

TaskManagerOptions

flink-core-1.7.2-sources.jar!/org/apache/flink/configuration/TaskManagerOptions.java

@PublicEvolving
public class TaskManagerOptions {
	//......

	/**
	 * JVM heap size for the TaskManagers with memory size.
	 */
	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
	public static final ConfigOption<String> TASK_MANAGER_HEAP_MEMORY =
			key("taskmanager.heap.size")
			.defaultValue("1024m")
			.withDescription("JVM heap size for the TaskManagers, which are the parallel workers of" +
					" the system. On YARN setups, this value is automatically configured to the size of the TaskManager's" +
					" YARN container, minus a certain tolerance value.");

	/**
	 * Amount of memory to be allocated by the task manager's memory manager. If not
	 * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
	 */
	public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
			key("taskmanager.memory.size")
			.defaultValue("0")
			.withDescription("Amount of memory to be allocated by the task manager's memory manager." +
				" If not set, a relative fraction will be allocated.");

	/**
	 * Fraction of free memory allocated by the memory manager if {@link #MANAGED_MEMORY_SIZE} is
	 * not set.
	 */
	public static final ConfigOption<Float> MANAGED_MEMORY_FRACTION =
			key("taskmanager.memory.fraction")
			.defaultValue(0.7f)
			.withDescription("The relative amount of memory (after subtracting the amount of memory used by network" +
				" buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results." +
				" For example, a value of `0.8` means that a task manager reserves 80% of its memory" +
				" for internal data buffers, leaving 20% of free memory for the task manager's heap for objects" +
				" created by user-defined functions. This parameter is only evaluated, if " + MANAGED_MEMORY_SIZE.key() +
				" is not set.");

	/**
	 * Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager
	 * as well as the network buffers.
	 **/
	public static final ConfigOption<Boolean> MEMORY_OFF_HEAP =
			key("taskmanager.memory.off-heap")
			.defaultValue(false)
			.withDescription("Memory allocation method (JVM heap or off-heap), used for managed memory of the" +
				" TaskManager as well as the network buffers.");

	/**
	 * Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.
	 */
	public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
			key("taskmanager.memory.preallocate")
			.defaultValue(false)
			.withDescription("Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.");

	//......
}
  • taskmanager.memory.size sets the amount of memory managed by the TaskManager's memory manager (used mainly for sorting, hash tables, and caching of intermediate results); it defaults to 0. taskmanager.heap.size sets the TaskManager's total memory, covering both heap and off-heap.
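
Both size options are strings ("1024m", "0"), and later in the article the source converts them with MemorySize.parse(..., MEGA_BYTES).getMebiBytes(). The sketch below is only a simplified stand-in for that conversion, not Flink's MemorySize implementation: the class name SizeStringSketch and the reduced unit set (m, mb, g, gb) are assumptions made for illustration, and a bare number is assumed to mean megabytes.

```java
/** Simplified stand-in for size-string parsing; NOT Flink's MemorySize class. */
final class SizeStringSketch {

    /** Converts values such as "1024m", "2g" or "512" into megabytes. */
    static long toMegabytes(String text) {
        String s = text.trim().toLowerCase(java.util.Locale.ROOT);

        // Split the leading digits from the optional unit suffix.
        int digits = 0;
        while (digits < s.length() && Character.isDigit(s.charAt(digits))) {
            digits++;
        }
        if (digits == 0) {
            throw new IllegalArgumentException("No number in size: " + text);
        }
        long value = Long.parseLong(s.substring(0, digits));
        String unit = s.substring(digits).trim();

        switch (unit) {
            case "":   // assumption: a bare number is read as megabytes
            case "m":
            case "mb":
                return value;
            case "g":
            case "gb":
                return value * 1024;
            default:
                throw new IllegalArgumentException("Unsupported unit in size: " + text);
        }
    }

    public static void main(String[] args) {
        System.out.println(toMegabytes("1024m")); // default of taskmanager.heap.size
        System.out.println(toMegabytes("0"));     // default of taskmanager.memory.size
    }
}
```

With these rules the default "0" stays 0, which is the value the later code treats as "not set".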

TaskManagerServices.calculateHeapSizeMB

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

public class TaskManagerServices {
	//......

	/**
	 * Calculates the amount of heap memory to use (to set via <tt>-Xmx</tt> and <tt>-Xms</tt>)
	 * based on the total memory to use and the given configuration parameters.
	 *
	 * @param totalJavaMemorySizeMB
	 * 		overall available memory to use (heap and off-heap)
	 * @param config
	 * 		configuration object
	 *
	 * @return heap memory to use (in megabytes)
	 */
	public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
		Preconditions.checkArgument(totalJavaMemorySizeMB > 0);

		// subtract the Java memory used for network buffers (always off-heap)
		final long networkBufMB =
			calculateNetworkBufferMemory(
				totalJavaMemorySizeMB << 20, // megabytes to bytes
				config) >> 20; // bytes to megabytes
		final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;

		// split the available Java memory between heap and off-heap

		final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);

		final long heapSizeMB;
		if (useOffHeap) {

			long offHeapSize;
			String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
			if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
				try {
					offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
				} catch (IllegalArgumentException e) {
					throw new IllegalConfigurationException(
						"Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
				}
			} else {
				offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
			}

			if (offHeapSize <= 0) {
				// calculate off-heap section via fraction
				double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
				offHeapSize = (long) (fraction * remainingJavaMemorySizeMB);
			}

			TaskManagerServicesConfiguration
				.checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize,
					TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
					"Managed memory size too large for " + networkBufMB +
						" MB network buffer memory and a total of " + totalJavaMemorySizeMB +
						" MB JVM memory");

			heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
		} else {
			heapSizeMB = remainingJavaMemorySizeMB;
		}

		return heapSizeMB;
	}

	//......
}
  • If taskmanager.memory.size is less than or equal to 0, managed memory is sized by taskmanager.memory.fraction, which defaults to 0.7.
  • If taskmanager.memory.off-heap is enabled and no explicit size is set, taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB) gives offHeapSize, the off-heap memory managed by the TaskManager's memory manager.
  • If taskmanager.memory.off-heap is enabled, the TaskManager's -Xmx value is taskmanager.heap.size - networkBufMB - offHeapSize; otherwise it is taskmanager.heap.size - networkBufMB.
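
The arithmetic in these three points can be tried out with a minimal, dependency-free sketch. It mirrors the structure of calculateHeapSizeMB but is not the Flink method: the class name HeapSplitSketch is hypothetical, the configuration lookups are replaced by plain parameters, and networkBufMB is passed in rather than computed (the 64 MB used below is an assumed value).

```java
/** Illustrative re-statement of the heap/off-heap split; not the Flink implementation. */
final class HeapSplitSketch {

    /**
     * @param totalMB       value of taskmanager.heap.size in MB (heap and off-heap together)
     * @param networkBufMB  memory reserved for network buffers in MB (assumed input)
     * @param managedSizeMB value of taskmanager.memory.size in MB, 0 meaning "not set"
     * @param fraction      value of taskmanager.memory.fraction
     * @param useOffHeap    value of taskmanager.memory.off-heap
     * @return the heap size in MB that would be passed as -Xmx/-Xms
     */
    static long heapSizeMB(long totalMB, long networkBufMB, long managedSizeMB,
                           float fraction, boolean useOffHeap) {
        if (totalMB <= 0) {
            throw new IllegalArgumentException("Total memory must be positive");
        }
        long remainingMB = totalMB - networkBufMB;
        if (!useOffHeap) {
            // Managed memory lives on the heap, so the heap keeps everything that is left.
            return remainingMB;
        }

        long offHeapMB = managedSizeMB;
        if (offHeapMB <= 0) {
            // No explicit size: derive the off-heap part from the fraction.
            double f = fraction; // the source also widens the float to a double here
            offHeapMB = (long) (f * remainingMB);
        }
        if (offHeapMB >= remainingMB) {
            throw new IllegalArgumentException("Managed memory size too large: " + offHeapMB + " MB");
        }
        return remainingMB - offHeapMB;
    }

    public static void main(String[] args) {
        // 1024 MB total, 64 MB network buffers (assumed), fraction 0.5, off-heap enabled.
        System.out.println(heapSizeMB(1024, 64, 0, 0.5f, true)); // 480
    }
}
```

Because the product is truncated to a long, fractions that are not exactly representable as a float (such as 0.7) can land one megabyte below the value computed by hand.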

TaskManagerServices.createMemoryManager

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

public class TaskManagerServices {
	//......

	/**
	 * Creates a {@link MemoryManager} from the given {@link TaskManagerServicesConfiguration}.
	 *
	 * @param taskManagerServicesConfiguration to create the memory manager from
	 * @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory
	 * @param maxJvmHeapMemory the maximum JVM heap size
	 * @return Memory manager
	 * @throws Exception
	 */
	private static MemoryManager createMemoryManager(
			TaskManagerServicesConfiguration taskManagerServicesConfiguration,
			long freeHeapMemoryWithDefrag,
			long maxJvmHeapMemory) throws Exception {
		// computing the amount of memory to use depends on how much memory is available
		// it strictly needs to happen AFTER the network stack has been initialized

		// check if a value has been configured
		long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory();

		MemoryType memType = taskManagerServicesConfiguration.getMemoryType();

		final long memorySize;

		boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory();

		if (configuredMemory > 0) {
			if (preAllocateMemory) {
				LOG.info("Using {} MB for managed memory." , configuredMemory);
			} else {
				LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory);
			}
			memorySize = configuredMemory << 20; // megabytes to bytes
		} else {
			// similar to #calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig)
			float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();

			if (memType == MemoryType.HEAP) {
				// network buffers allocated off-heap -> use memoryFraction of the available heap:
				long relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction);
				if (preAllocateMemory) {
					LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
						memoryFraction , relativeMemSize >> 20);
				} else {
					LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
						"memory will be allocated lazily." , memoryFraction , relativeMemSize >> 20);
				}
				memorySize = relativeMemSize;
			} else if (memType == MemoryType.OFF_HEAP) {
				// The maximum heap memory has been adjusted according to the fraction (see
				// calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)), i.e.
				// maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction)
				// directMemorySize = jvmTotalNoNet * memoryFraction
				long directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction);
				if (preAllocateMemory) {
					LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
						memoryFraction, directMemorySize >> 20);
				} else {
					LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
						" memory will be allocated lazily.", memoryFraction, directMemorySize >> 20);
				}
				memorySize = directMemorySize;
			} else {
				throw new RuntimeException("No supported memory type detected.");
			}
		}

		// now start the memory manager
		final MemoryManager memoryManager;
		try {
			memoryManager = new MemoryManager(
				memorySize,
				taskManagerServicesConfiguration.getNumberOfSlots(),
				taskManagerServicesConfiguration.getNetworkConfig().networkBufferSize(),
				memType,
				preAllocateMemory);
		} catch (OutOfMemoryError e) {
			if (memType == MemoryType.HEAP) {
				throw new Exception("OutOfMemory error (" + e.getMessage() +
					") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
			} else if (memType == MemoryType.OFF_HEAP) {
				throw new Exception("OutOfMemory error (" + e.getMessage() +
					") while allocating the TaskManager off-heap memory (" + memorySize +
					" bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
			} else {
				throw e;
			}
		}
		return memoryManager;
	}

	//......
}
  • TaskManagerServices provides a private static method, createMemoryManager, that creates a MemoryManager from the configuration. When no explicit size is configured, it recomputes memorySize according to the MemoryType and passes the result to the MemoryManager constructor.
  • When memType is MemoryType.HEAP, memorySize is relativeMemSize, where relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction).
  • When memType is MemoryType.OFF_HEAP, memorySize is directMemorySize. Since directMemorySize = jvmTotalNoNet * memoryFraction and maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction), it follows that directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction).
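
The three cases can be condensed into one small function. This is a sketch under stated assumptions, not Flink code: ManagedMemorySizeSketch and its MemType enum are hypothetical stand-ins for TaskManagerServices and MemoryType, and the values that Flink reads from TaskManagerServicesConfiguration are passed as plain parameters.

```java
/** Illustrative re-statement of how memorySize is chosen; not the Flink implementation. */
final class ManagedMemorySizeSketch {

    /** Hypothetical stand-in for Flink's MemoryType. */
    enum MemType { HEAP, OFF_HEAP }

    /** Returns the managed memory size in bytes. */
    static long managedMemoryBytes(long configuredMB, MemType memType, float fraction,
                                   long freeHeapBytes, long maxJvmHeapBytes) {
        if (configuredMB > 0) {
            // An explicit taskmanager.memory.size wins: megabytes to bytes.
            return configuredMB << 20;
        }
        if (memType == MemType.HEAP) {
            // Take a fraction of the heap that is currently free.
            return (long) (freeHeapBytes * fraction);
        }
        // OFF_HEAP: the heap was already shrunk to jvmTotalNoNet * (1 - fraction),
        // so dividing by (1 - fraction) recovers jvmTotalNoNet before applying the fraction.
        return (long) (maxJvmHeapBytes / (1.0 - fraction) * fraction);
    }

    public static void main(String[] args) {
        long maxHeap = 240L << 20; // 240 MB heap left after the off-heap split
        System.out.println(managedMemoryBytes(0, MemType.OFF_HEAP, 0.75f, 0, maxHeap) >> 20); // 720
    }
}
```

For instance, if jvmTotalNoNet is 960 MB and the fraction is 0.75, maxJvmHeap is 240 MB, and the OFF_HEAP branch recovers 240 / 0.25 * 0.75 = 720 MB of managed memory.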

TaskManagerServicesConfiguration

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java

public class TaskManagerServicesConfiguration {
	//......

	/**
	 * Utility method to extract TaskManager config parameters from the configuration and to
	 * sanity check them.
	 *
	 * @param configuration The configuration.
	 * @param remoteAddress identifying the IP address under which the TaskManager will be accessible
	 * @param localCommunication True, to skip initializing the network stack.
	 *                                      Use only in cases where only one task manager runs.
	 * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc.
	 */
	public static TaskManagerServicesConfiguration fromConfiguration(
			Configuration configuration,
			InetAddress remoteAddress,
			boolean localCommunication) throws Exception {

		// we need this because many configs have been written with a "1" entry
		int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
		if (slots == -1) {
			slots = 1;
		}

		final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration);
		String[] localStateRootDir = ConfigurationUtils.parseLocalStateDirectories(configuration);

		if (localStateRootDir.length == 0) {
			// default to temp dirs.
			localStateRootDir = tmpDirs;
		}

		boolean localRecoveryMode = configuration.getBoolean(
			CheckpointingOptions.LOCAL_RECOVERY.key(),
			CheckpointingOptions.LOCAL_RECOVERY.defaultValue());

		final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration(
			configuration,
			localCommunication,
			remoteAddress,
			slots);

		final QueryableStateConfiguration queryableStateConfig =
				parseQueryableStateConfiguration(configuration);

		// extract memory settings
		long configuredMemory;
		String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
		if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
			try {
				configuredMemory = MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
			} catch (IllegalArgumentException e) {
				throw new IllegalConfigurationException(
					"Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
			}
		} else {
			configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
		}

		checkConfigParameter(
			configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) ||
				configuredMemory > 0, configuredMemory,
			TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
			"MemoryManager needs at least one MB of memory. " +
				"If you leave this config parameter empty, the system automatically " +
				"pick a fraction of the available memory.");

		// check whether we use heap or off-heap memory
		final MemoryType memType;
		if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
			memType = MemoryType.OFF_HEAP;
		} else {
			memType = MemoryType.HEAP;
		}

		boolean preAllocateMemory = configuration.getBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE);

		float memoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
		checkConfigParameter(memoryFraction > 0.0f && memoryFraction < 1.0f, memoryFraction,
			TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
			"MemoryManager fraction of the free memory must be between 0.0 and 1.0");

		long timerServiceShutdownTimeout = AkkaUtils.getTimeout(configuration).toMillis();

		return new TaskManagerServicesConfiguration(
			remoteAddress,
			tmpDirs,
			localStateRootDir,
			localRecoveryMode,
			networkConfig,
			queryableStateConfig,
			slots,
			configuredMemory,
			memType,
			preAllocateMemory,
			memoryFraction,
			timerServiceShutdownTimeout,
			ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
	}

	//......
}
  • TaskManagerServicesConfiguration provides a static method, fromConfiguration, that creates a TaskManagerServicesConfiguration from a Configuration. memType is MemoryType.OFF_HEAP if taskmanager.memory.off-heap is true, and MemoryType.HEAP otherwise.
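
Besides choosing the memory type, fromConfiguration also sanity-checks the memory settings. The sketch below restates those checks in plain Java so they can be tried in isolation. The class MemoryConfigChecksSketch and its MemType enum are hypothetical, and IllegalArgumentException is used as a stand-in for whatever checkConfigParameter throws in Flink.

```java
/** Illustrative re-statement of the memory checks in fromConfiguration; not the Flink implementation. */
final class MemoryConfigChecksSketch {

    /** Hypothetical stand-in for Flink's MemoryType. */
    enum MemType { HEAP, OFF_HEAP }

    /** taskmanager.memory.size must either be left at its default or be at least 1 MB. */
    static void checkManagedSize(boolean leftAtDefault, long configuredMB) {
        if (!(leftAtDefault || configuredMB > 0)) {
            throw new IllegalArgumentException(
                "MemoryManager needs at least one MB of memory, got " + configuredMB);
        }
    }

    /** taskmanager.memory.fraction must lie strictly between 0.0 and 1.0. */
    static void checkFraction(float fraction) {
        if (!(fraction > 0.0f && fraction < 1.0f)) {
            throw new IllegalArgumentException(
                "MemoryManager fraction must be between 0.0 and 1.0, got " + fraction);
        }
    }

    /** taskmanager.memory.off-heap selects the memory type. */
    static MemType memType(boolean offHeap) {
        return offHeap ? MemType.OFF_HEAP : MemType.HEAP;
    }

    public static void main(String[] args) {
        checkManagedSize(true, 0);          // default "0" is accepted
        checkFraction(0.7f);                // default fraction is accepted
        System.out.println(memType(false)); // HEAP
    }
}
```

Both bounds of the fraction check are exclusive, so 0.0 and 1.0 are rejected.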

Summary

  • TaskManager managed memory is either on-heap or off-heap. taskmanager.memory.size sets the amount of memory managed by the TaskManager's memory manager (used mainly for sorting, hash tables, and caching of intermediate results) and defaults to 0; taskmanager.heap.size sets the TaskManager's total memory, covering both heap and off-heap. If taskmanager.memory.size is less than or equal to 0, managed memory is sized by taskmanager.memory.fraction, which defaults to 0.7. If taskmanager.memory.off-heap is enabled, taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB) gives offHeapSize, the off-heap memory managed by the memory manager, and the TaskManager's -Xmx value is taskmanager.heap.size - networkBufMB - offHeapSize.
  • TaskManagerServices provides a private static method, createMemoryManager, that creates a MemoryManager from the configuration. When no explicit size is configured, it recomputes memorySize according to the MemoryType and passes the result to the MemoryManager constructor. When memType is MemoryType.HEAP, memorySize is relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction). When memType is MemoryType.OFF_HEAP, memorySize is directMemorySize: since directMemorySize = jvmTotalNoNet * memoryFraction and maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction), it follows that directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction).
  • TaskManagerServicesConfiguration provides a static method, fromConfiguration, that creates a TaskManagerServicesConfiguration from a Configuration. memType is MemoryType.OFF_HEAP if taskmanager.memory.off-heap is true, and MemoryType.HEAP otherwise.

Doc

  • taskmanager-memory-size