Spark 3 source code intensive reading: Dispatcher

Introduction to the Dispatcher (ChatGPT)

In Apache Spark, the Dispatcher is a component of the Spark Master process that receives and processes requests from client applications, such as submitting or killing applications. It is the main entry point for client requests: when a client application needs to interact with the Spark Master, it sends its request to the Dispatcher, which routes the request to the appropriate Spark Master component for processing.

The main functions of the Dispatcher are:
1. Receive and process requests from client applications: clients interact with the Spark cluster by sending requests to the Spark Master, such as submitting applications, querying cluster status, or killing applications. The Dispatcher receives these requests and hands them to the corresponding Spark Master components for processing.
2. Maintain a request queue: when the Dispatcher receives a request, it places it in a queue ordered by request type and priority. This ensures that requests are handled in a timely manner and avoids contention among concurrent requests.
3. Route requests to the corresponding Spark Master components: the Dispatcher routes each request to the appropriate component according to its type and priority. For example, a request to submit an application is sent to the application manager, while a request to kill an application is sent to the application manager or the task scheduler.

In general, the Dispatcher is a key component of the Spark Master process: it coordinates and processes requests from client applications so that the Spark cluster can serve them efficiently.
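
To make the queue-and-route idea described above concrete, here is a toy sketch in Scala. It is purely illustrative and is not Spark code; the request types and the ToyDispatcher object are made up.

// Toy illustration only, not Spark code: the core idea is a queue of incoming
// requests plus a routing step that hands each one to the component
// responsible for it.
import java.util.concurrent.LinkedBlockingQueue

sealed trait Request
case class SubmitApplication(name: String) extends Request
case class KillApplication(appId: String) extends Request

object ToyDispatcher {
  private val queue = new LinkedBlockingQueue[Request]()

  // Clients post requests; they are buffered until a loop thread picks them up.
  def post(request: Request): Unit = queue.put(request)

  // A loop thread takes requests in arrival order and routes them.
  def loop(): Unit = while (true) {
    queue.take() match {
      case SubmitApplication(name) => println(s"route to application manager: submit $name")
      case KillApplication(appId)  => println(s"route to application manager: kill $appId")
    }
  }
}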

Dispatcher initialization code

The Dispatcher is created when NettyRpcEnv is instantiated:

private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

In the Dispatcher class, there are a few fields worth paying attention to.

// Registered endpoints, keyed by endpoint name.
private val endpoints: ConcurrentMap[String, EndpointData] = new ConcurrentHashMap[String, EndpointData]

// Reverse lookup from an endpoint instance to its RpcEndpointRef.
private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]

// Track the receivers whose inboxes may contain messages.
private val receivers = new LinkedBlockingQueue[EndpointData]
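
These three structures are populated when an endpoint is registered: NettyRpcEnv.setupEndpoint delegates to the Dispatcher's registerRpcEndpoint, which in this version of the code looks roughly like the sketch below (stopped-state checks trimmed; exact details vary between Spark versions).

// Simplified sketch of Dispatcher.registerRpcEndpoint.
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    // Creating the EndpointData also creates its Inbox, which enqueues an OnStart message.
    if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    val data = endpoints.get(name)
    endpointRefs.put(data.endpoint, data.ref)
    receivers.offer(data) // so a MessageLoop will process the pending OnStart
  }
  endpointRef
}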

/** Message loop used for dispatching messages. */
private class MessageLoop extends Runnable {
  override def run(): Unit = {
    try {
      while (true) {
        try {
          val data = receivers.take()
          if (data == PoisonPill) {
            // Put PoisonPill back so that other MessageLoops can see it.
            receivers.offer(PoisonPill)
            return
          }
          data.inbox.process(Dispatcher.this)
        } catch {
          case NonFatal(e) => logError(e.getMessage, e)
        }
      }
    } catch {
      case ie: InterruptedException => // exit
    }
  }
}
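
Note that MessageLoop itself is a Runnable rather than a thread of its own: the Dispatcher creates a fixed-size daemon thread pool and submits one MessageLoop per thread. A simplified sketch of that field follows (the default thread count and config key may differ between Spark versions).

// Simplified sketch of the dispatcher's thread pool; one MessageLoop runs on
// each "dispatcher-event-loop" thread.
private val threadpool: ThreadPoolExecutor = {
  val availableCores =
    if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
  val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
    math.max(2, availableCores))
  val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
  for (i <- 0 until numThreads) {
    pool.execute(new MessageLoop)
  }
  pool
}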

private class EndpointData(
    val name: String,
    val endpoint: RpcEndpoint,
    val ref: NettyRpcEndpointRef) {
  val inbox = new Inbox(ref, endpoint)
}

From the code, you can see that receivers is a blocking queue used to hand work to the message loops: each element is an EndpointData, which carries an Inbox (the inbox holds that endpoint's message queue and is seeded with an OnStart message when it is created). Each MessageLoop keeps taking EndpointData from receivers and processing the corresponding inbox.
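
The OnStart message mentioned above is added by the Inbox itself when it is constructed, so a freshly registered endpoint always has something to process. Slightly simplified, the relevant part of Inbox looks like this:

// Inside the Inbox class (slightly simplified): the internal message list is
// seeded with OnStart, so onStart() is the first thing called for a new endpoint.
protected val messages = new java.util.LinkedList[InboxMessage]()

// OnStart should be the first message to process.
inbox.synchronized {
  messages.add(OnStart)
}
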
How is a message actually handled? The simplified process() code below shows the relevant part: when the message is OnStart, it calls the endpoint's onStart() method.

/**
 * Process stored messages (simplified: only the OnStart case is shown and the
 * concurrency bookkeeping of the real code is omitted).
 */
def process(dispatcher: Dispatcher): Unit = {
  var message: InboxMessage = null
  inbox.synchronized {
    message = messages.poll()
  }
  while (message != null) {
    safelyCall(endpoint) {
      message match {
        case OnStart =>
          endpoint.onStart()
        case _ => // other message types (RpcMessage, OneWayMessage, OnStop, ...) omitted
      }
    }
    inbox.synchronized {
      message = messages.poll()
    }
  }
}

So far, we have seen that the Dispatcher keeps fetching EndpointData from the receivers queue and lets each Inbox invoke the appropriate method on its endpoint (only onStart() is covered here; readers interested in the other message types can follow the same path in the source code).
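
To tie the pieces together, here is a minimal, hypothetical endpoint (the name EchoEndpoint is made up for illustration; the rpc package is private[spark], so code like this only compiles inside Spark's own source tree). Registering it goes through the Dispatcher, its Inbox is seeded with OnStart, a MessageLoop picks it up, and onStart() is invoked.

import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}

// Hypothetical endpoint, showing which callback each kind of Inbox message drives.
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def onStart(): Unit = println("EchoEndpoint started")   // driven by OnStart
  override def receive: PartialFunction[Any, Unit] = {
    case msg => println(s"received: $msg")                         // driven by one-way messages
  }
}

// Registration and use (sketch):
//   val ref = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
//   ref.send("hello")   // lands in the endpoint's Inbox and reaches receive()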
