SparkListener as a SparkDriver health checker

org.apache.spark.scheduler.SparkListener

Interface for listening to events from the Spark scheduler. Note that this is an internal interface which might change in different Spark releases. Java clients should extend JavaSparkListener.

— Spark ApiDocs

The problem

SparkListener can be a source of interesting events, such as: application start/end, job start/end, executor add/remove. Once we run the SparkContext remotely, a connection is established between the SparkDriver and the Spark Masters.
Usually the built-in functionality is enough to handle common failures like broken network connections, crashed workers or failed masters.
But some unexpected failures require more attention: killing the application (from inside the Spark console), or a very long network outage (longer than spark.akka.heartbeat.interval) that gets the application killed without any notification to the driver. The last case is the most dangerous because the driver is still running, so the next job execution simply hangs waiting for communication with the master.

The solution

The idea is to add our own SparkListener implementation that tracks the following activities: application start, application end, executor add, executor remove.

Application start

Start monitoring the driver's health.

Application end

Reset the executor counter. Force cleaning of the SparkContext and force its reinitialization on the next request/job.

Executor add

Increment the number of active executors.

Executor remove

Decrement the number of active executors. If the count drops to 0 or below, force cleaning of the SparkContext and force its reinitialization on the next request/job.

HealthCheckListener

Implement a class HealthCheckListener that implements the SparkListener interface. Add properties for catching the application startup (and the first job startup) and for managing the executor counter.

Important
Once the application is killed from inside the Spark Console, the HealthCheckListener.onApplicationEnd() method is called.

import org.apache.spark.scheduler.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class HealthCheckListener implements SparkListener {
  // Set once the application (or its first job) has started.
  private final AtomicBoolean appStarted = new AtomicBoolean(false);
  // Number of currently active executors.
  private final AtomicInteger executorCounter = new AtomicInteger(0);

  public Boolean isAppStarted() { return appStarted.get(); }
  public Integer getExecutorCounter() { return executorCounter.get(); }

  @Override
  public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
    appStarted.set(true);
  }
  @Override
  public void onJobStart(SparkListenerJobStart jobStart) {
    // The first job start also marks the application as started.
    appStarted.set(true);
  }
  @Override
  public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
    executorCounter.incrementAndGet();
  }
  @Override
  public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
    executorCounter.decrementAndGet();
  }
  @Override
  public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
    // Also called when the application is killed from inside the Spark Console.
    executorCounter.set(0);
  }

  // ... remaining SparkListener callbacks implemented as no-ops (elided)
}

Add the listener to the Spark ListenerBus. If your code is Java, just call:

javaSparkContext.sc().addSparkListener(healthCheckListener);
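
If it helps to see the whole wiring in one place, here is a minimal, hypothetical holder sketch that builds the JavaSparkContext and registers the listener; the class name, master URL and app name are placeholders, not part of the original code:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkDriverHolder {

  private HealthCheckListener healthCheckListener;
  private JavaSparkContext javaSparkContext;

  // Builds a fresh JavaSparkContext and attaches the health-check listener.
  public JavaSparkContext createContext() {
    SparkConf sparkConf = new SparkConf()
        .setMaster("spark://master:7077")   // placeholder master URL
        .setAppName("health-checked-app");  // placeholder application name
    javaSparkContext = new JavaSparkContext(sparkConf);

    healthCheckListener = new HealthCheckListener();
    // Register the listener on the underlying SparkContext's ListenerBus.
    javaSparkContext.sc().addSparkListener(healthCheckListener);
    return javaSparkContext;
  }
}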

Implement the rules that check whether the driver is still in good condition and able to handle the next requests/jobs.

if (healthCheckListener.isAppStarted() && healthCheckListener.getExecutorCounter() <= 0) {
  throw new IOException("NO EXECUTORS FOR THE JOB!");
}

When the check above fails, stop the SparkContext:

javaSparkContext.stop();

The next request/job should recreate SparkContext.
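
Putting the pieces together, the per-request logic could take a shape like the sketch below; the getOrCreateContext() method and the holder class it lives in (continuing the hypothetical SparkDriverHolder above) are assumptions for illustration, not the original implementation:

// Hypothetical method on the holder class sketched above.
// Checks driver health before handing out the context and forces
// recreation on the next call after a failure is detected.
public synchronized JavaSparkContext getOrCreateContext() throws java.io.IOException {
  if (javaSparkContext == null) {
    // First request, or the previous context was stopped: (re)create it.
    return createContext();
  }
  if (healthCheckListener.isAppStarted() && healthCheckListener.getExecutorCounter() <= 0) {
    // The application started but no executors are left: the driver is unhealthy.
    javaSparkContext.stop();   // force cleaning of the current SparkContext
    javaSparkContext = null;   // force reinitialization on the next request/job
    throw new java.io.IOException("NO EXECUTORS FOR THE JOB!");
  }
  return javaSparkContext;
}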

Additional (important) Spark Network Settings

SparkConf Property: allowMultipleContexts

Spark 1.5.1 allows only one instance of SparkContext per JVM. In our case we need the ability to recreate the SparkContext (to stop the existing one and create a new one). To achieve this, set the allowMultipleContexts property on the SparkConf instance:

SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.driver.allowMultipleContexts", "true");

SparkConf Property: askTimeout

The other very important setting is askTimeout. Its default value is 120s. It is the duration an RPC ask operation waits before timing out.

When the application is killed silently, that is without an application-end notification to the SparkDriver, askTimeout controls the time after which the executors are gone. When they are no longer available, we have to recreate the driver.

This can happen when the network connection between the SparkDriver and the Spark Master is broken for longer than the spark.akka.heartbeat.interval parameter (its default value is 1000s).

In my case I set askTimeout to 10s, which is a reasonable value.

SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.rpc.askTimeout", "10");
