Spark源码分析(6)

Spark 源码分析(6)

Posted by UNMOVE on April 25, 2018

UnsafeBuffer的分配和释放

调研在spark中使用UnsafeBuffer分配/释放的内存使用TaskMemoryManager进行记录。

TaskMemoryManager

TaskMemoryManager持有的对象说明(方案相关):

  1. memoryManager(实际是UnifiedMemoryManager):用于实际的申请内存检查及内存申请。

  2. consumers:申请过内存的申请者集合。用于在内存不够时将部分对象申请的内存spill到磁盘。

  3. pageTable:页面表。记录申请的页面。

任务在申请内存时都会调用到TaskMemoryManager的allocatePage方法,该方法接受两个参数:申请的内存大小和申请者。申请者需要是继承并实现了MemoryConsumer相关接口的类。方法内部调用acquireExecutionMemory方法进行内存分配记录,调用UnifiedMemoryManager的相关接口进行内存申请,申请到的page会加入到pageTable中进行维护。

  public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
    assert(consumer != null);
    assert(consumer.getMode() == tungstenMemoryMode);
    if (size > MAXIMUM_PAGE_SIZE_BYTES) {
      throw new IllegalArgumentException(
        "Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes");
    }

    long acquired = acquireExecutionMemory(size, consumer);
    if (acquired <= 0) {
      return null;
    }

    final int pageNumber;
    synchronized (this) {
      pageNumber = allocatedPages.nextClearBit(0);
      if (pageNumber >= PAGE_TABLE_SIZE) {
        releaseExecutionMemory(acquired, consumer);
        throw new IllegalStateException(
          "Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
      }
      allocatedPages.set(pageNumber);
    }
    MemoryBlock page = null;
    try {
      page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
    } catch (OutOfMemoryError e) {
      logger.warn("Failed to allocate a page ({} bytes), try again.", acquired);
      // there is no enough memory actually, it means the actual free memory is smaller than
      // MemoryManager thought, we should keep the acquired memory.
      synchronized (this) {
        acquiredButNotUsed += acquired;
        allocatedPages.clear(pageNumber);
      }
      // this could trigger spilling to free some pages.
      return allocatePage(size, consumer);
    }
    page.pageNumber = pageNumber;
    pageTable[pageNumber] = page;
    if (logger.isTraceEnabled()) {
      logger.trace("Allocate page number {} ({} bytes)", pageNumber, acquired);
    }
    return page;
  }

acquireExecutionMemory用于记录并返回该申请者实际申请到的内存。该方法使用UnifiedMemoryManager的acquireExecutionMemory方法计算申请者实际可申请的内存,并尽可能地spill其他申请者的内存空间到磁盘以满足该申请者的申请。

  public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
    assert(required >= 0);
    assert(consumer != null);
    MemoryMode mode = consumer.getMode();
    synchronized (this) {
      long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);

      // 中间省略的代码为spill其余申请者空间到磁盘的代码

      consumers.add(consumer);
      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
      return got;
    }
  }

UnifiedMemoryManager管理内存

UnifiedMemoryManager中维护了两个内存池(只考虑off-heap):offHeapStorageMemoryPool和offHeapExecutionMemoryPool

UnifiedMemoryManager中封装了内存申请器(只考虑off-heap):tungstenMemoryAllocator

  @GuardedBy("this")
  protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)

  @GuardedBy("this")
  protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)

  private[memory] final val tungstenMemoryAllocator: MemoryAllocator = {
  tungstenMemoryMode match {
    case MemoryMode.ON_HEAP => MemoryAllocator.HEAP
    case MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE
  }
  }

注意:上述关于UnifiedMemoryManager的变量都继承自MemoryManager

UnifiedMemoryManager通过ExecutionMemoryPool的acquireMemory方法申请内存空间。ExecutionMemoryPool中维护了一个hash map,用于记录任务所申请的主存空间。

  @GuardedBy(“lock”)
  private val memoryForTask = new mutable.HashMapLong, Long

ExecutionMemoryPool的acuireMemory方法对所有内存申请进行检查,返回其可申请的内存大小。假设ExecutionMemoryPool基准空间为x,当前空间为a,StorageMemoryPool基准空间为y,当前空间为(x + y - a),StorageMeomoryPool使用空间为z,正在执行的任务数量为n,则每个任务所能申请的最大主存空间为((x + y) - Min(y, z)) / n。同时该方法还保证了每个任务能申请到的最小主存空间a / 2n,即当任务进行内存申请时,若ExecutionMemoryPool无法满足其要求,且该任务当前使用的内存空间小于a / 2n时,该任务会被挂起,直到其它任务执行结束,ExecutionMemoryPool可以为其分配最小主存空间时再进行分配。

注意:spark只保证任务能申请到最小主存空间,不保证任务能申请到其需要的空间,当任务实际申请到的空间小于其需要空间时转移到上层进行处理

  private[memory] def acquireMemory(
    numBytes: Long,
    taskAttemptId: Long,
    maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
    computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
    assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

    if (!memoryForTask.contains(taskAttemptId)) {
      memoryForTask(taskAttemptId) = 0L
      // This will later cause waiting tasks to wake up and check numTasks again
      lock.notifyAll()
    }

    while (true) {
      val numActiveTasks = memoryForTask.keys.size
      val curMem = memoryForTask(taskAttemptId)
      maybeGrowPool(numBytes - memoryFree)

      val maxPoolSize = computeMaxPoolSize()
      val maxMemoryPerTask = maxPoolSize / numActiveTasks
      val minMemoryPerTask = poolSize / (2 * numActiveTasks)

  
      val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
      val toGrant = math.min(maxToGrant, memoryFree)

      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
        lock.wait()
      } else {
        memoryForTask(taskAttemptId) += toGrant
        return toGrant
      }
    }
    0L  // Never reached
  }

BlockManager在off-heap模式下cache时的内存使用

BlockManager中的doPutIterator方法控制block写入主存,在memoryMode=OFF_HEAP时,调用putIteratorAsBytes方法进行实际的写block操作,在该方法中block被序列化成字节数组,并通过UnifiedMemoryManager中的StorageMemoryPool进行内存空间管理。

改进方案

TaskMemoryManager提供的allocatePage和acquireExecutionMemory方法会使得我们申请的MemoryBlock被添加到TaskMemoryManager的维护列表(pageTable),同时还会在内存不够时将我们申请的空间spill到磁盘(尽管我们并不支持)。为了避免上述问题,同时我们又想让TaskMemoryManager记录我们所申请过的内存空间,我们可以针对UnsafeBuffer的需求在TaskMemoryManager中实现专用的allocateMemory方法和freeMemory方法,仅操作TaskMemoryManager中的memoryManager对象进行内存申请/释放记录。

注意事项:以往我们使用了tungsten管理器,但是未指定offHeap模式,所以一直是在堆内申请空间,但是我们使用UNSAFE申请的空间还是在堆外,意味着我们和spark的空间管理是不统一的。现在想要统一使用UnifiedMemoryManager同时管理spark和我们的内存,就需要开启spark.memory.offHeap.enabled,若未开启该参数则需禁用UnsafeUnfixedDecaMap

注意事项:on-heap模式下不能使用UnsafeUnfixedDecaMap,因为UnifiedMemoryManager不能同时管理堆外和堆内内存。