import org.apache.spark.scheduler.*;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * SparkListener implementation that tracks the driver's health:
 * whether the application has started and how many executors are alive.
 */
public class HealthCheckListener implements SparkListener {

    private final AtomicBoolean appStarted = new AtomicBoolean(false);
    private final AtomicInteger executorCounter = new AtomicInteger(0);

    public boolean isAppStarted() { return appStarted.get(); }

    public int getExecutorCounter() { return executorCounter.get(); }

    @Override
    public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
        // Application started - begin monitoring the driver's health.
        appStarted.set(true);
    }

    @Override
    public void onJobStart(SparkListenerJobStart jobStart) {
        // The first job also marks the application as started.
        appStarted.set(true);
    }

    @Override
    public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
        // One more active executor.
        executorCounter.incrementAndGet();
    }

    @Override
    public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
        // One less active executor.
        executorCounter.decrementAndGet();
    }

    @Override
    public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
        // Application ended (e.g. killed from the Spark console) - reset the counter.
        executorCounter.set(0);
    }

    // ... remaining SparkListener callbacks (no-ops) omitted
}
SparkListener as a SparkDriver health checker
org.apache.spark.scheduler.SparkListener
Interface for listening to events from the Spark scheduler. Note that this is an internal interface which might change in different Spark releases. Java clients should extend JavaSparkListener.
The problem
SparkListener can be a source of interesting events, such as application start/end, job start/end, and executor add/remove. Once we run the SparkContext remotely, a connection is established between the SparkDriver and the Spark masters.
Usually the built-in functionality is enough to handle common failures such as broken network connections, worker crashes, or master failures.
But there are some unexpected failures that require more attention: killing the application (from inside the Spark console), or a very long network outage (longer than spark.akka.heartbeat.interval) that kills the application without any notification to the driver. The latter is the most dangerous, because the driver keeps running and the next job execution simply hangs, waiting for communication with the master.
The solution
The idea is to add our own SparkListener implementation that tracks the following activities: application start, application end, executor add, executor remove.
- Application start: start monitoring the driver's health.
- Application end: reset the executor counter; force cleanup of the SparkContext and force its reinitialization on the next request/job.
- Executor add: increment the number of active executors.
- Executor remove: decrement the number of active executors; if the number drops to 0 or below, force cleanup of the SparkContext and, again, force its reinitialization on the next request/job.
HealthCheckListener
Implement a class HealthCheckListener as an implementation of the SparkListener interface (see the listing at the top of this post). Add properties that capture the application startup and the first job startup, plus a counter for managing the number of executors.
Important: once the application is killed from inside the Spark console, the HealthCheckListener.onApplicationEnd() method is called.
Add the listener to the Spark ListenerBus. If your code is Java, just call:
javaSparkContext.sc().addSparkListener(healthCheckListener);
Implement the rules for checking whether the driver is still in good condition and able to handle the next requests/jobs.
if (healthCheckListener.isAppStarted() && healthCheckListener.getExecutorCounter() <= 0) {
    throw new IOException("NO EXECUTORS FOR THE JOB!");
}
When the check above fails, stop the SparkContext:
javaSparkContext.stop();
The next request/job should then recreate the SparkContext.
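As a minimal sketch of how that recreation could be wired up, assuming the HealthCheckListener shown above (the SparkContextManager class and its method names are my own illustration, not part of the original setup):

import java.io.IOException;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * Hypothetical holder that stops an unhealthy SparkContext and lets the
 * next request recreate it.
 */
public class SparkContextManager {

    private JavaSparkContext javaSparkContext;
    private HealthCheckListener healthCheckListener;

    /** Returns a context for the next job, recreating it if the previous one was stopped. */
    public synchronized JavaSparkContext getOrCreate(SparkConf sparkConf) throws IOException {
        if (javaSparkContext == null) {
            createContext(sparkConf);
            return javaSparkContext;
        }
        if (healthCheckListener.isAppStarted() && healthCheckListener.getExecutorCounter() <= 0) {
            // The driver lost all executors (e.g. the app was killed silently):
            // stop the context now so the next request builds a fresh one.
            javaSparkContext.stop();
            javaSparkContext = null;
            throw new IOException("NO EXECUTORS FOR THE JOB!");
        }
        return javaSparkContext;
    }

    private void createContext(SparkConf sparkConf) {
        javaSparkContext = new JavaSparkContext(sparkConf);
        healthCheckListener = new HealthCheckListener();
        javaSparkContext.sc().addSparkListener(healthCheckListener);
    }
}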
Additional (important) Spark Network Settings
SparkConf Property: allowMultipleContexts
Spark 1.5.1 allows only one instance of SparkContext per JVM. In our case we need the ability to recreate the SparkContext (to stop the existing one and create a new one). To achieve this, just set the allowMultipleContexts property on the SparkConf instance:
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.driver.allowMultipleContexts", "true");
SparkConf Property: askTimeout
The other very important setting is askTimeout. Its default value is 120s; it defines how long an RPC ask operation waits before timing out.
When the application is killed silently, i.e. without an application-end notification to the SparkDriver, askTimeout controls the time after which the executors are considered gone. Once they are no longer available, we have to recreate the driver.
This can happen when the network connection between the SparkDriver and the Spark master is broken for longer than the spark.akka.heartbeat.interval parameter (its default value is 1000s).
In my case I set askTimeout to 10s, which is a reasonable value.
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.rpc.askTimeout", "10");