UnsafeBuffer的分配和释放
调研在spark中使用UnsafeBuffer分配/释放的内存使用TaskMemoryManager进行记录。
TaskMemoryManager
TaskMemoryManager持有的对象说明(方案相关):
-
memoryManager(实际是UnifiedMemoryManager):用于实际的申请内存检查及内存申请。
-
consumers:申请过内存的申请者集合。用于在内存不够时将部分对象申请的内存spill到磁盘。
-
pageTable:页面表。记录申请的页面。
任务在申请内存时都会调用到TaskMemoryManager的allocatePage方法,该方法接受两个参数:申请的内存大小和申请者。申请者需要是继承并实现了MemoryConsumer相关接口的类。方法内部调用acquireExecutionMemory方法进行内存分配记录,调用UnifiedMemoryManager的相关接口进行内存申请,申请到的page会加入到pageTable中进行维护。
public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
assert(consumer != null);
assert(consumer.getMode() == tungstenMemoryMode);
if (size > MAXIMUM_PAGE_SIZE_BYTES) {
throw new IllegalArgumentException(
"Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes");
}
long acquired = acquireExecutionMemory(size, consumer);
if (acquired <= 0) {
return null;
}
final int pageNumber;
synchronized (this) {
pageNumber = allocatedPages.nextClearBit(0);
if (pageNumber >= PAGE_TABLE_SIZE) {
releaseExecutionMemory(acquired, consumer);
throw new IllegalStateException(
"Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
}
allocatedPages.set(pageNumber);
}
MemoryBlock page = null;
try {
page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
} catch (OutOfMemoryError e) {
logger.warn("Failed to allocate a page ({} bytes), try again.", acquired);
// there is no enough memory actually, it means the actual free memory is smaller than
// MemoryManager thought, we should keep the acquired memory.
synchronized (this) {
acquiredButNotUsed += acquired;
allocatedPages.clear(pageNumber);
}
// this could trigger spilling to free some pages.
return allocatePage(size, consumer);
}
page.pageNumber = pageNumber;
pageTable[pageNumber] = page;
if (logger.isTraceEnabled()) {
logger.trace("Allocate page number {} ({} bytes)", pageNumber, acquired);
}
return page;
}
acquireExecutionMemory用于记录并返回该申请者实际申请到的内存。该方法使用UnifiedMemoryManager的acquireExecutionMemory方法计算申请者实际可申请的内存,并尽可能地spill其他申请者的内存空间到磁盘以满足该申请者的申请。
public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
assert(required >= 0);
assert(consumer != null);
MemoryMode mode = consumer.getMode();
synchronized (this) {
long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);
// 中间省略的代码为spill其余申请者空间到磁盘的代码
consumers.add(consumer);
logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
return got;
}
}
UnifiedMemoryManager管理内存
UnifiedMemoryManager中维护了两个内存池(只考虑off-heap):offHeapStorageMemoryPool和offHeapExecutionMemoryPool
UnifiedMemoryManager中封装了内存申请器(只考虑off-heap):tungstenMemoryAllocator
@GuardedBy("this")
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
private[memory] final val tungstenMemoryAllocator: MemoryAllocator = {
tungstenMemoryMode match {
case MemoryMode.ON_HEAP => MemoryAllocator.HEAP
case MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE
}
}
注意:上述关于UnifiedMemoryManager的变量都继承自MemoryManager
UnifiedMemoryManager通过ExecutionMemoryPool的acquireMemory方法申请内存空间。ExecutionMemoryPool中维护了一个hash map,用于记录任务所申请的主存空间。
@GuardedBy(“lock”)
private val memoryForTask = new mutable.HashMapLong, Long
ExecutionMemoryPool的acuireMemory方法对所有内存申请进行检查,返回其可申请的内存大小。假设ExecutionMemoryPool基准空间为x,当前空间为a,StorageMemoryPool基准空间为y,当前空间为(x + y - a),StorageMeomoryPool使用空间为z,正在执行的任务数量为n,则每个任务所能申请的最大主存空间为((x + y) - Min(y, z)) / n。同时该方法还保证了每个任务能申请到的最小主存空间a / 2n,即当任务进行内存申请时,若ExecutionMemoryPool无法满足其要求,且该任务当前使用的内存空间小于a / 2n时,该任务会被挂起,直到其它任务执行结束,ExecutionMemoryPool可以为其分配最小主存空间时再进行分配。
注意:spark只保证任务能申请到最小主存空间,不保证任务能申请到其需要的空间,当任务实际申请到的空间小于其需要空间时转移到上层进行处理
private[memory] def acquireMemory(
numBytes: Long,
taskAttemptId: Long,
maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")
if (!memoryForTask.contains(taskAttemptId)) {
memoryForTask(taskAttemptId) = 0L
// This will later cause waiting tasks to wake up and check numTasks again
lock.notifyAll()
}
while (true) {
val numActiveTasks = memoryForTask.keys.size
val curMem = memoryForTask(taskAttemptId)
maybeGrowPool(numBytes - memoryFree)
val maxPoolSize = computeMaxPoolSize()
val maxMemoryPerTask = maxPoolSize / numActiveTasks
val minMemoryPerTask = poolSize / (2 * numActiveTasks)
val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
val toGrant = math.min(maxToGrant, memoryFree)
if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
lock.wait()
} else {
memoryForTask(taskAttemptId) += toGrant
return toGrant
}
}
0L // Never reached
}
BlockManager在off-heap模式下cache时的内存使用
BlockManager中的doPutIterator方法控制block写入主存,在memoryMode=OFF_HEAP时,调用putIteratorAsBytes方法进行实际的写block操作,在该方法中block被序列化成字节数组,并通过UnifiedMemoryManager中的StorageMemoryPool进行内存空间管理。
改进方案
TaskMemoryManager提供的allocatePage和acquireExecutionMemory方法会使得我们申请的MemoryBlock被添加到TaskMemoryManager的维护列表(pageTable),同时还会在内存不够时将我们申请的空间spill到磁盘(尽管我们并不支持)。为了避免上述问题,同时我们又想让TaskMemoryManager记录我们所申请过的内存空间,我们可以针对UnsafeBuffer的需求在TaskMemoryManager中实现专用的allocateMemory方法和freeMemory方法,仅操作TaskMemoryManager中的memoryManager对象进行内存申请/释放记录。
注意事项:以往我们使用了tungsten管理器,但是未指定offHeap模式,所以一直是在堆内申请空间,但是我们使用UNSAFE申请的空间还是在堆外,意味着我们和spark的空间管理是不统一的。现在想要统一使用UnifiedMemoryManager同时管理spark和我们的内存,就需要开启spark.memory.offHeap.enabled,若未开启该参数则需禁用UnsafeUnfixedDecaMap
注意事项:on-heap模式下不能使用UnsafeUnfixedDecaMap,因为UnifiedMemoryManager不能同时管理堆外和堆内内存。