Using Scala to Read Really, Really Large Files – Part 1: Setup


This post provides background information on how the test data is gathered and implementation details for the curious.

To cut down the noise when looking at each implementation, and to reduce the chance of introducing unneeded variance that might skew the measurements, we've extracted the common elements so that the differences are narrowed, as much as possible, to the way the files are read.

The only exception is the Java implementation, which is a very lightly modified version of this implementation, taken from the git repo of the original article.

Measurement Considerations

Collecting the data is handled by a bash script, which runs 10 rounds of 10 trials per implementation, in random order. This was run twice locally, on two different days, and once on an EC2 instance, for a total of 1800 data points. Traditionally, benchmarking is done by warming up a JVM and running multiple trials only after the JIT optimizations have stabilized. However, large-file processing is more commonly performed as an offline process, so a cold JVM gives a more realistic benchmark for our common use case.

Each implementation extends a common trait, to facilitate looping over them while running these tests. The consume method is intentionally synchronous and does not provide a way to abstract setup and tear-down. This is to ensure that our timing data includes the cost of the entire lifecycle.

trait FileReader {
  def description: String

  def consume(path: Path): Result
}

Screen IO can also be finicky and inconsistent in how long a particular call takes, so to insulate our measurements from the cost of printing results, we'll return a data structure with fields for the various bits of information we care about and print the results outside the timed section. Scala makes creating these data structures very easy:

final case class Result(
  lineCount: Int,
  specialNames: List[(Int, FullName)],
  donationsByMonth: Map[DateBucket, Int],
  mostCommonFirstName: FirstName,
  mostCommonFirstNameCount: Int
)
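The harness itself is a bash script, but the shape of a single timed trial can be sketched in-process. Everything here (the `Timing` object and its method) is illustrative, not code from the repo:

```scala
// Illustrative only: a helper that times one unit of work, returning the
// result alongside the elapsed nanoseconds so that printing can happen
// later, outside the measured window.
object Timing {
  def time[A](work: => A): (A, Long) = {
    val start   = System.nanoTime()
    val result  = work
    val elapsed = System.nanoTime() - start
    (result, elapsed)
  }
}
```

In this sketch, each trial would wrap a call like `reader.consume(path)`, and the returned `Result` would only be printed after the elapsed time has been captured.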

Choosing the Right Operation

fold aggregates a collection into a single result, so it's a good fit for this challenge. As all of the frameworks provide this operation, it also gives us a convenient way to ensure a fair comparison.

A Summary of Folding

We’ll be using a left-associative fold, which has this general shape:

def foldLeft[C[_], E, R](collection: C[E], zero: R)(merge: (R,E) => R): R

The exact signature will depend on the library, but they all require the same parts.
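In the standard library this shape shows up as `foldLeft` on any collection. A toy example (purely illustrative) that counts lines and characters in a single pass shows all three parts: the collection, the zero value, and the merge function:

```scala
// Toy fold: accumulate a (lineCount, charCount) pair over some lines.
val lines = List("alpha", "beta", "gamma")

val (lineCount, charCount) =
  lines.foldLeft((0, 0)) { case ((count, chars), line) =>
    // merge: combine the accumulator so far with the current element
    (count + 1, chars + line.length)
  }
// lineCount == 3, charCount == 14
```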

Collecting the Needed Parts

Because of the way the requirements are set up, the Result class we defined in the Introduction isn’t the best data structure for accumulating intermediate values.

These are the parts we’ll need:

  • A data structure for accumulating intermediate values
  • A function to merge a line into the intermediate data structure
  • A function to convert the intermediate data structure into Result

Accumulating using LineMetricsAccumulator

Defining this intermediate data is pretty straightforward: we start with a copy of Result and incrementally modify the fields as the conversion and merge functions are written.

Because of the linear nature of blog posts, these are presented in order, but the implementation work happened concurrently. The result of this process is, unsurprisingly, still very close to Result:

final class LineMetricsAccumulator(
  lineCount: Int,
  specialNames: List[(Int, FullName)],
  donationsByMonth: Map[DateBucket, Int],
  firstNameFrequency: Map[FirstName, Int]
)

The requirements specify a few particular lines that need recording. For simplicity, we'll stash them in the companion object.

This data structure also needs to be able to represent the base state as a zero value, which we’ll also keep in the companion object.

object LineMetricsAccumulator {
  def empty: LineMetricsAccumulator = new LineMetricsAccumulator(-1, List.empty, Map.empty, Map.empty)
  val LineNumbersOfSpecialNames = Set(0, 432, 43243)
}

Merging a String into LineMetricsAccumulator

LineMetricsAccumulator is also a good place to store the logic for processing each line, because the shape of foldLeft allows us to take advantage of some of Scala’s syntactic sugar to simplify things. We’ll call this method addLine.

LineMetricsAccumulator is immutable, so addLine returns a new instance calculated from the current line and previous value.

  def addLine(line: String): LineMetricsAccumulator = line match {
    case Record(fullName, firstNameOpt, dateBucket) =>
      val lineNumber = lineCount + 1
      new LineMetricsAccumulator(
        lineCount = lineNumber,
        specialNames =
          if (LineMetricsAccumulator.LineNumbersOfSpecialNames.contains(lineNumber)) (lineNumber, fullName) :: specialNames
          else specialNames,
        donationsByMonth = donationsByMonth.updated(
          dateBucket,
          donationsByMonth.getOrElse(dateBucket, 0) + 1
        ),
        firstNameFrequency = firstNameOpt.fold(firstNameFrequency) { firstName =>
          firstNameFrequency.updated(
            firstName,
            firstNameFrequency.getOrElse(firstName, 0) + 1
          )
        }
      )
  }

Parsing the line is handled by pattern matching, using a custom extractor. These are an underused Scala gem, and provide a handy way to parse a line when we don’t care about errors.

object Record {
  def unapply(line: String): Option[(FullName, Option[FirstName], DateBucket)] = {
    val grabber = line.split('|').lift
    for {
      fullName <- grabber(7).map(FullName)
      rawDate  <- grabber(4)
    } yield (fullName, FirstName(fullName), DateBucket(rawDate))
  }
}
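The `lift` call is doing real work here: it converts the Array's partial, exception-throwing `apply` into a total `Int => Option[String]`, which is what lets the for-comprehension bail out cleanly on malformed lines instead of throwing. A standalone illustration:

```scala
// `lift` turns indexing into a total function: in-bounds indexes yield
// Some, out-of-bounds indexes yield None rather than throwing.
val fields  = "Smith, John|20180601".split('|')
val grabber = fields.lift

val present = grabber(0) // Some("Smith, John")
val missing = grabber(7) // None: index 7 is out of range, but no exception
```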

We’re using semantic types to help keep the field order clear. An additional bonus is that the extraction of the first name and relevant date parts can be offloaded to factories in the companion objects.

FirstName is more tolerant of bad data, as the requirements allow ignoring non-conforming values. Sorting by first name will be needed later, and unfortunately Scala doesn't fall back to the Ordering of the wrapped value when the implicit is missing, so we'll need an explicit delegation for Ordering.

final case class FirstName(value: String) extends AnyVal
object FirstName {

  /**
    * Extract the first name from a full name.
    *
    * @param fullName the full name, which is expected to be in the format `family, given` or `family, given middle`
    * @return [[scala.None]] if the format isn't what we expect, or [[scala.Some]] if we can pull a
    *         [[livongo.large_files_blog_post.common.FirstName]] out of the input.
    */
  def apply(fullName: FullName): Option[FirstName] = {
    for {
      firstAndMiddleNames <- fullName.value.split(',').tail.headOption.map(_.trim)
      if firstAndMiddleNames.nonEmpty
      firstName <- firstAndMiddleNames.split(' ').headOption
    } yield FirstName(firstName)
  }

  implicit val ordering: Ordering[FirstName] = Ordering.by(_.value)
}
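The extraction logic can be exercised on its own. This standalone copy of the body of `apply` (using plain Strings rather than the semantic types, for brevity) shows the behavior on both well-formed and non-conforming input:

```scala
// Standalone sketch of the extraction in FirstName.apply: everything
// after the first comma is trimmed, then split on spaces, and the first
// token (if any) is the first name.
def firstNameOf(fullName: String): Option[String] =
  for {
    firstAndMiddle <- fullName.split(',').tail.headOption.map(_.trim)
    if firstAndMiddle.nonEmpty
    first <- firstAndMiddle.split(' ').headOption
  } yield first

// firstNameOf("SMITH, JOHN A") == Some("JOHN")
// firstNameOf("SMITH") == None (no comma, so there's nothing to extract)
```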

DateBucket is considerably more strict. Because this is a challenge, if the extracted values don’t contain valid data, the assumption is that the program is more likely wrong than the data, so throwing an exception is reasonable in this case. Providing an Ordering and overriding toString make printing the results easier, and aren’t relevant to the timed portion of the code.

final case class DateBucket(year: Int, month: Int) {
  override def toString: String = "%04d-%02d".format(year, month)
}
object DateBucket {

  /**
    * Companion object constructors are a common pattern in Scala, and a good place
    * for really common initialization code.
    *
    * @param raw a string, with the expected format 'YYYYMM' (longer is OK, shorter will break things)
    */
  def apply(raw: String): DateBucket = {
    val year  = raw.take(4).toInt
    val month = raw.slice(4, 6).toInt
    DateBucket(year, month)
  }

  implicit val ordering: Ordering[DateBucket] = Ordering.by(db => db.year -> db.month)
}
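To make the parsing concrete, here's the same take/slice logic applied standalone (DateBucket is copied from above so the snippet stands alone):

```scala
// Copied from the post so this snippet is self-contained.
final case class DateBucket(year: Int, month: Int) {
  override def toString: String = "%04d-%02d".format(year, month)
}

// The same parsing as DateBucket.apply: the first four characters are
// the year, the next two the month; anything past position six is
// ignored, and anything shorter throws.
val raw    = "20180615" // YYYYMMDD: the trailing day is simply ignored
val bucket = DateBucket(raw.take(4).toInt, raw.slice(4, 6).toInt)
// bucket == DateBucket(2018, 6); bucket.toString == "2018-06"
```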

Getting Results

Finalizing LineMetricsAccumulator into Result is straightforward: most of the values won't need to change. The most common first name does need to be calculated, and thanks to the Ordering defined on FirstName, this is nearly trivial.

  def asResult: Result = {
    val (mostCommonFirstName, mostCommonFirstNameCount) =
      // This orders first by frequency, then alphabetically to break any ties.
      firstNameFrequency.maxBy(_.swap)
    Result(
      lineCount,
      specialNames,
      donationsByMonth,
      mostCommonFirstName,
      mostCommonFirstNameCount
    )
  }
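The `maxBy(_.swap)` trick works because swapping each `(name, count)` pair to `(count, name)` makes the tuple Ordering compare frequency first and name second, so among equal counts the greatest name under the Ordering wins. A small illustration with plain Strings:

```scala
// Swapping the pairs makes the count the primary sort key and the name
// the tie-breaker; maxBy still returns the original (name, count) pair.
val freq = Map("ann" -> 2, "bob" -> 5, "cal" -> 5)

val (winner, count) = freq.maxBy(_.swap)
// winner == "cal" (ties on 5 broken by the name ordering), count == 5
```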

Conclusion

With this completed implementation of LineMetricsAccumulator, we’re ready to start evaluating libraries.

/**
  * Holds the values needed to accumulate the results so far while processing the lines in the file.
  *
  * @param lineCount          number of lines before this one
  * @param specialNames       recorded names, and the indexes on which they were found - see
  *                           [[livongo.large_files_blog_post.common.LineMetricsAccumulator#LineNumbersOfSpecialNames]]
  * @param donationsByMonth   the number of donations in a given month
  * @param firstNameFrequency the number of times each first name appears
  */
final class LineMetricsAccumulator(
    lineCount:          Int,
    specialNames:       List[(Int, FullName)],
    donationsByMonth:   Map[DateBucket, Int],
    firstNameFrequency: Map[FirstName, Int]
) {

  /**
    * Progresses and incorporates the results of an additional line.
    *
    * This logic is common across the various ways of reading the file, so it's abstracted out here.
    *
    * @param line the raw line
    */
  def addLine(line: String): LineMetricsAccumulator = line match {
    case Record(fullName, firstNameOpt, dateBucket) =>
      val lineNumber = lineCount + 1
      new LineMetricsAccumulator(
        lineCount = lineNumber,
        specialNames =
          if (LineMetricsAccumulator.LineNumbersOfSpecialNames.contains(lineNumber))
            (lineNumber, fullName) :: specialNames
          else specialNames,
        donationsByMonth = donationsByMonth.updated(
          dateBucket,
          donationsByMonth.getOrElse(dateBucket, 0) + 1
        ),
        firstNameFrequency = firstNameOpt.fold(firstNameFrequency) { firstName =>
          firstNameFrequency.updated(
            firstName,
            firstNameFrequency.getOrElse(firstName, 0) + 1
          )
        }
      )
  }

  /**
    * Convert to [[livongo.large_files_blog_post.common.Result]] by running the only aggregation (most common first name)
    */
  def asResult: Result = {
    val (mostCommonFirstName, mostCommonFirstNameCount) =
      // This orders first by frequency, then alphabetically to break any ties.
      firstNameFrequency.maxBy(_.swap)
    Result(
      lineCount,
      specialNames,
      donationsByMonth,
      mostCommonFirstName,
      mostCommonFirstNameCount
    )
  }
}

object LineMetricsAccumulator {
  def empty: LineMetricsAccumulator = new LineMetricsAccumulator(-1, List.empty, Map.empty, Map.empty)
  val LineNumbersOfSpecialNames = Set(0, 432, 43243)
}

Up next: the standard approach