Spark: Unit Testing HDFS interactions

Testing of Spark jobs that use Hadoop’s FileSystem API require minimal DFS implementation.

Hadoop’s tests include simplified, powerful and able to run locally implementation of the MiniDFSCluster.

In order to use it, it is necessary to do serveral steps:

Configuration of gradle dependencies

repositories {
  mavenCentral()
  mavenLocal()
  jcenter()
}

dependencies {
  // Apache Spark core - includes hadoop dependencies
  compile "org.apache.spark:spark-core_2.10:1.5.1"

  // junit for unit testing
  testCompile 'junit:junit:4.12'

  // MINICluster, that containts MiniDFSCluster implementation
  testCompile "org.apache.hadoop:hadoop-minicluster:2.7.1"
}

configurations.all {
  resolutionStrategy {
    // force certain versions of dependencies (including transitive)
    //  *append new forced containers:
    force(
      // hadoop we are working with is in the latest version
      "org.apache.hadoop:hadoop-client:2.7.1"
    )
  }
}

Testing on Windows platform

Unit testing on MS Windows requires native libraries to be installed in the system.

$ git clone https://github.com/sardetushar/hadooponwindows.git

Add the following variables to your shell (either bash (for git-bash for example) or windows)

export HADOOP_HOME=$PATH_TO_/hadooponwindows-master
export PATH=$PATH:$HADOOP_HOME/bin

The instruction/tutorial for Windows platform could be found in the following guide.

Setup unit testing infrastructure

In your unit test class define the setup() method with @Before annotation. This method will be invoked before every unit test. Inside it instantiate root cluster folder, cluster configuration, cluster itself and Hadoop’s FileSystem.

Define tearDown() method with @After annotation. This method will be invoked after every test. Inside it destroy cluster as well as spark context.

In order to see logging statements from you test cases or classes under testing add log4j settings on the class level - the setupTests() method.

public class HdfsTests {
  private Configuration conf;
  private FileSystem fs;
  private MiniDFSCluster cluster;
  private JavaSparkContext sparkContext;

  @BeforeClass
  public static void setupTests() throws Exception {
    // Force logging level for a job class
    LogManager.getLogger(YourJobClass.class).setLevel(Level.DEBUG);
  }

  @Before
  public void setup() throws Exception{
    File testDataPath = new File(getClass().getResource("/minicluster").getFile());
    System.clearProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA);
    conf = new HdfsConfiguration();
    File testDataCluster1 = new File(testDataPath, "cluster1");
    String c1PathStr = testDataCluster1.getAbsolutePath();
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, c1PathStr);
    cluster = new MiniDFSCluster.Builder(conf).build();
    fs = FileSystem.get(conf);

    SparkConf conf = new SparkConf()
      .setAppName("LOG_ANALYZER")
      .setMaster("local")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.io.compression.codec", "lz4");
    sparkContext = new JavaSparkContext(conf);
  }

  @After
  public void tearDown() {
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
    if (sparkContext != null) {
      sparkContext.stop();
      sparkContext = null;
    }
  }
}

Additional helper methods

  • copyFromLocalDir() - copy all files from local directory to HDFS path.

  private void copyFromLocalDir(FileSystem fs, File localDir, Path toPath) throws Exception {
    File[] localDirFiles = localDir.listFiles();
    Path[] localDirPaths = Arrays.stream(localDirFiles).map(f -> new Path(f.getAbsolutePath())).toArray(size -> new Path[size]);
    fs.copyFromLocalFile(false, false, localDirPaths, toPath);
  }
  • writeHDFSContent() - write content in form strings array to the file in HDFS.

  private void writeHDFSContent(FileSystem fs, Path path, String fileName, String[] content) throws IOException {
    Path filePath = new Path(path, fileName);
    FSDataOutputStream out = fs.create(filePath);
    for (String c : content) {
      out.writeBytes(c);
    }
    out.close();
  }
  • readHDFSContent() - read content of the HDFS file.

  private String[] readHDFSContent(FileSystem fs, Path path, String fileName) throws IOException {
    Path filePath = new Path(path, fileName);
    assertTrue("File [" + filePath.toString() + "] has to exist", fs.exists(filePath));
    FileStatus fileStatus = fs.getFileStatus(filePath);
    assertTrue("File [" + filePath.toString() + "] cannot be a directory", !fileStatus.isDirectory());
    BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath)));
    List<String> lines = Lists.newArrayList();
    for (String line = reader.readLine(); line != null; line = reader.readLine()) {
      lines.add(line);
    }
    return lines.toArray(new String[0]);
  }
  • checkHDFSFile() - check if file exists in HDFS

  private void checkHDFSFile(FileSystem fs, Path path, String fileName) throws IOException {
    Path filePath = new Path(path, fileName);
    assertTrue("File [" + filePath.toString() + "] should exist", fs.exists(filePath));
    FileStatus fileStatus = fs.getFileStatus(filePath);
    assertTrue("File [" + filePath.toString() + "] is not a directory", !fileStatus.isDirectory());
  }

results matching ""

    No results matching ""