Clojure on Hadoop: A New Hope

Factual’s U.S. Places dataset is built from tens of billions of signals. Our raw data is stored in HDFS and processed using Hadoop.

We’re big fans of the core Hadoop stack; however, there is a dark side to using Hadoop. The traditional approach to building and running Hadoop jobs can be cumbersome. As our Director of Engineering once said, “there’s no such thing as an ad-hoc Hadoop job written in Java”.

Factual is a Clojure-friendly shop, and the Clojure community led us to Cascalog. We were intrigued by its strength as an agile query language and data processing framework. It was easy to get started, which is a testament to Cascalog’s creator, Nathan Marz.

We were able to leverage Cascalog’s high-level features such as built-in joins and aggregators to abstract away the complexity of commonly performed queries and QA operations.

This article aims to illustrate Cascalog basics and core strengths. We’ll focus on how easy it is to run useful queries against data stored in different text formats such as CSV, JSON, and even raw text.

Cascalog and Clojure

With Cascalog, you can customize your query logic, including text parsing, using the full power of Clojure. So any time your Cascalog query needs to manipulate data or text in some non-trivial fashion, you have a full programming language at your disposal. There isn’t an artificial barrier between your programming language and your query language.

You’ll see this demonstrated in our examples. If you are unfamiliar with Clojure, just know that anything that begins with “defn” is a pure Clojure function and anything that begins with “?<-” is a Cascalog query.

Cascalog also leverages Clojure’s approach to REPL-based development. This means it’s easy to experiment rapidly with various queries. It’s not necessary to set up special directories or manually define tables.

Working with CSV Data

Say we have some city data in CSV format, in a data-source called cities:

"New York, NY"
"Chicago, IL"
"Los Angeles, CA"
"San Francisco, CA"
"Seattle, WA"

To make sense of this data, let’s write a simple Clojure function that parses each line into a city and state field:

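;; assumes csv aliases the clojure-csv library (clojure-csv.core)
(defn cities-parser [line]
  ;; parse the line as one CSV row, then trim each field
  (map #(.trim ^String %) (first (csv/parse-csv line))))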

The name of the function is cities-parser and line is its input parameter. We use a CSV library to split the line on commas, then trim the whitespace from each value.
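If you want to try a parser like this at a plain REPL before adding a CSV dependency, here is a rough, dependency-free sketch. It swaps the CSV library for clojure.string/split. Unlike a real CSV parser, it won’t cope with quoted fields that contain commas. The name simple-cities-parser is hypothetical and used only for illustration.

```clojure
(require '[clojure.string :as str])

;; Hypothetical dependency-free variant of cities-parser.
;; Splits on commas and trims each field; unlike a real CSV
;; parser it does NOT handle quoted fields containing commas.
(defn simple-cities-parser [line]
  (map str/trim (str/split line #",")))

(simple-cities-parser "San Francisco, CA")
;; => ("San Francisco" "CA")
```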

Next, let’s run a Cascalog query that makes use of cities-parser:

1  (?<- 
2    (stdout)
3    [?city ?state] 
4    (cities ?line) 
5    (cities-parser ?line :> ?city ?state))

Admittedly, it may look a little strange. Here’s what’s going on:

  • Line 1: ?<- indicates that this is a Cascalog query that will be run immediately
  • Line 2: output the results to standard out
  • Line 3: These are the output variables, which are bound in lines 4 and 5. Cascalog variables start with ?, !, or !! (each has a different meaning, which is outside the scope of this article)
  • Line 4: cities is the name of our data-source. In Cascalog terminology it is a generator, and each entry in the data-source will be referred to by the variable name ?line.
  • Line 5: The :> separates a function’s inputs from its outputs. cities-parser is called with the variable ?line, and its results are bound to the variables ?city and ?state.

When you run the query, you should get the following results:

Chicago         IL
Los Angeles     CA
New York        NY
San Francisco   CA
Seattle         WA

It’s that simple! With our cities-parser function we can now refer to the city and state fields individually. For example, if we only want the cities in California, we can write the following query:

(?<- (stdout) [?city ?state]
  (cities ?line)
  (cities-parser ?line :> ?city ?state)
  (= ?state "CA"))

The last line is a filter predicate: only records whose state equals “CA” pass through.
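For intuition, the two city queries behave much like an ordinary for comprehension over an in-memory collection:

- The generator (cities ?line) plays the role of the for binding.
- The :> outputs become destructured names.
- The (= ?state "CA") filter becomes a :when clause.

This is only an analogy, since Cascalog compiles queries into Hadoop jobs. The cities-lines vector and the split-based parse-city function are hypothetical stand-ins for the real data-source and CSV parser.

```clojure
(require '[clojure.string :as str])

;; Hypothetical in-memory stand-in for the cities data-source.
(def cities-lines
  ["New York, NY" "Chicago, IL" "Los Angeles, CA"
   "San Francisco, CA" "Seattle, WA"])

;; Hypothetical simplified parser: split on commas, trim each field.
(defn parse-city [line]
  (map str/trim (str/split line #",")))

;; Roughly the first query: every line becomes a [city state] tuple.
(def city-state
  (for [line cities-lines                       ; like (cities ?line)
        :let [[city state] (parse-city line)]]  ; like :> ?city ?state
    [city state]))

;; Roughly the California query: :when acts as the filter predicate.
(def california
  (for [line cities-lines
        :let [[city state] (parse-city line)]
        :when (= state "CA")]                   ; like (= ?state "CA")
    [city state]))

(count city-state) ;; => 5
california         ;; => (["Los Angeles" "CA"] ["San Francisco" "CA"])
```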

Working with JSON Data

Working with JSON data is just as simple as working with CSV data. Let’s say we have some data on famous buildings, in a data-source called buildings:

{"name":"Chrysler Building","city":"New York"}
{"name":"Empire State Building","city":"New York"}
{"name":"John Hancock Center","city":"Chicago"}
{"name":"Walt Disney Concert Hall","city":"Los Angeles"}
{"name":"Transamerica Pyramid","city":"San Francisco"}
{"name":"Space Needle","city":"Seattle"}

Just like we did for CSV, we’ll define a function for parsing JSON:

;; assumes json aliases a JSON library such as Cheshire (cheshire.core)
(defn buildings-parser [line]
  (map (json/parse-string line) ["name" "city"]))

Here we use a JSON library to parse each line into a hash map. Then we get the values for the keys name and city, in that order.
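The map call in buildings-parser works because a Clojure hash map is itself a function of its keys. The sketch below uses a literal map as a stand-in for what the JSON library would return for one record, so it runs without any JSON dependency. It assumes string keys, which is what Cheshire’s parse-string returns unless told otherwise.

```clojure
;; Hypothetical stand-in for (json/parse-string line) on one record,
;; assuming the JSON library returns a map with string keys.
(def parsed {"name" "Space Needle" "city" "Seattle"})

;; Calling a map with a key returns that key's value, so mapping the
;; map over a vector of keys extracts the fields in the given order.
(map parsed ["name" "city"])
;; => ("Space Needle" "Seattle")

;; A key that is absent yields nil rather than throwing.
(map parsed ["name" "height"])
;; => ("Space Needle" nil)
```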

Next, let’s write a Cascalog query that uses our buildings-parser function to see what happens:

(?<- (stdout) [?name ?city]
  (buildings ?line)
  (buildings-parser ?line :> ?name ?city))

The structure of the query looks very similar to our earlier CSV query. The main differences are the parsing function, data-source, and variable names. Running the query should return the following results:

John Hancock Center        Chicago
Walt Disney Concert Hall   Los Angeles
Chrysler Building          New York
Empire State Building      New York
Transamerica Pyramid       San Francisco
Space Needle               Seattle

Joining on Text Data

Our data on famous buildings only has the name and city. Wouldn’t it be great if we could tie in our cities table so that we also have information about what state the building is located in?

A powerful feature of Cascalog is the ability to do joins across datasets. We can use our data on cities to help make our buildings dataset more comprehensive.

(?<- (stdout) [?name ?city ?state]
  (buildings ?building_line)
  (buildings-parser ?building_line :> ?name ?city)
  (cities ?city_line)
  (cities-parser ?city_line :> ?city ?state))

In this query, buildings and cities are the data-sources. Joins are implicit in Cascalog: the variable ?city is an output of both buildings-parser and cities-parser. Cascalog interprets this as a join, automatically joining records from buildings and cities where the ?city values are equal.
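Cascalog’s implicit join on a shared variable behaves like an inner (natural) join. Clojure’s clojure.set/join does the same thing for in-memory sets of maps, joining on whichever keys the two relations share, so it makes a handy mental model. The records below are hand-written stand-ins for the parsed buildings and cities data, not Cascalog output.

```clojure
(require '[clojure.set :as set])

;; Hypothetical stand-ins for buildings-parser output.
(def buildings
  #{{:name "Chrysler Building" :city "New York"}
    {:name "Empire State Building" :city "New York"}
    {:name "John Hancock Center" :city "Chicago"}
    {:name "Walt Disney Concert Hall" :city "Los Angeles"}
    {:name "Transamerica Pyramid" :city "San Francisco"}
    {:name "Space Needle" :city "Seattle"}})

;; Hypothetical stand-ins for cities-parser output.
(def cities
  #{{:city "New York" :state "NY"} {:city "Chicago" :state "IL"}
    {:city "Los Angeles" :state "CA"} {:city "San Francisco" :state "CA"}
    {:city "Seattle" :state "WA"}})

;; Natural join on the shared key :city, much like reusing ?city
;; as an output of both parser predicates.
(def buildings-with-state (set/join buildings cities))

(doseq [b (sort-by :name buildings-with-state)]
  (println (:name b) (:city b) (:state b)))
```

As with an inner join, a building whose city has no match in cities would simply be absent from the result.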

John Hancock Center         Chicago          IL
Walt Disney Concert Hall    Los Angeles      CA
Chrysler Building           New York         NY
Empire State Building       New York         NY
Transamerica Pyramid        San Francisco    CA
Space Needle                Seattle          WA

In fewer than 10 lines of actual code, we were able to run a join across CSV and JSON data.

Getting More Advanced: Raw Text and Aggregation

Being able to run Cascalog queries on JSON and CSV data is great, but sometimes our data is less structured. As you may have noticed in the last two examples, the key to working with text in Cascalog is defining a parsing function that makes sense of the data.

For example, say we have building and city information in sentence form, in a data-source called buildings-text:

"The Chrysler Building is located in New York"
"The Empire State Building is located in New York"
"The John Hancock Center is located in Chicago"
"The Walt Disney Concert Hall is located in Los Angeles"
"The Transamerica Pyramid is located in San Francisco"
"The Space Needle is located in Seattle"

Let’s write a query to count the number of buildings per state.

First let’s write a function that uses a regular expression to parse out the city from the sentences:

(defn buildings-city-parser [line]
  ;; re-find returns [whole-match city]; keep just the city as the single output
  [(second (re-find #"The .* is located in (.*)" line))])

#"..." is a regular-expression literal in Clojure. The buildings-city-parser function captures the building’s city and returns it.
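It helps to know what Clojure’s regex functions return here. When the pattern has one capturing group, re-find returns a vector of the whole match followed by the group, and re-seq returns a sequence of such vectors. Both return nil when nothing matches. The REPL sketch below shows that shape. city-from-sentence is a hypothetical helper, not part of the original code.

```clojure
(def pattern #"The .* is located in (.*)")

;; With one capturing group, re-find returns [whole-match group-1].
(re-find pattern "The Space Needle is located in Seattle")
;; => ["The Space Needle is located in Seattle" "Seattle"]

;; Hypothetical helper: pull out just the captured city (nil on no match).
(defn city-from-sentence [line]
  (second (re-find pattern line)))

(city-from-sentence "The Chrysler Building is located in New York")
;; => "New York"
(city-from-sentence "Not a building sentence")
;; => nil
```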

Since we already have our city and state data, we’ll reuse the cities query, with a couple of changes:

(def cities-subquery
  (<- [?city ?state]
    (cities ?line)
    (cities-parser ?line :> ?city ?state)))

Here we use <- instead of ?<-. The difference between the two forms is that <- defines a query, whereas ?<- defines a query and runs it. Since we will be nesting this query, we don’t want to run it right away. Also, notice that we didn’t specify where to output the results. That’s because the output of this subquery will be the input to another query.
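If the distinction between defining and running a query feels abstract, a lazy sequence is a loose analogy. Defining it describes the work without doing any, and the work happens only when something consumes it. This sketch is plain Clojure, not Cascalog. The names parse-calls, city-lines, and lazy-cities are hypothetical and exist only to make the deferral visible.

```clojure
(require '[clojure.string :as str])

;; Counts how many lines have actually been parsed.
(def parse-calls (atom 0))

(def city-lines ["New York, NY" "Chicago, IL" "Seattle, WA"])

;; Like a query defined with <-: this describes the parsing work
;; but, being lazy, performs none of it yet.
(def lazy-cities
  (map (fn [line]
         (swap! parse-calls inc)
         (str/split line #", "))
       city-lines))

@parse-calls          ;; => 0, nothing parsed yet
(doall lazy-cities)   ;; consuming it is like running with ?<-
@parse-calls          ;; => 3
```

The analogy is loose: a realized lazy sequence caches its results, so consuming it again does not repeat the work.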

Now, let’s write the Cascalog query that counts the number of buildings per state:

;; c is assumed to alias cascalog.ops, which provides the count aggregator
(?<- (stdout) [?state ?count]
  (buildings-text ?line)
  (buildings-city-parser ?line :> ?city)
  (cities-subquery ?city ?state)
  (c/count ?count))

In the above query, buildings-text is the data-source. For each line of text, the city is parsed out. We’re also using cities-subquery as a data-source. We join on the ?city variable and call the count aggregator. Because ?state is the only non-aggregated output variable, Cascalog groups the joined records by ?state, and count emits the number of records in each group.
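Stripped of Hadoop, the aggregation boils down to three steps:

1. Parse each sentence.
2. Look up the city’s state (the join).
3. Group by state and count.

In plain Clojure, frequencies does the group-and-count step in one call. This in-memory sketch uses a hypothetical city->state map as a stand-in for cities-subquery. It is an analogy for what the query computes, not how Cascalog executes it.

```clojure
(def sentences
  ["The Chrysler Building is located in New York"
   "The Empire State Building is located in New York"
   "The John Hancock Center is located in Chicago"
   "The Walt Disney Concert Hall is located in Los Angeles"
   "The Transamerica Pyramid is located in San Francisco"
   "The Space Needle is located in Seattle"])

;; Hypothetical stand-in for cities-subquery: city -> state lookup.
(def city->state
  {"New York" "NY" "Chicago" "IL" "Los Angeles" "CA"
   "San Francisco" "CA" "Seattle" "WA"})

(defn sentence->city [line]
  (second (re-find #"The .* is located in (.*)" line)))

(def buildings-per-state
  (->> sentences
       ;; parse + inner join: keep drops sentences with no known state
       (keep (fn [s] (city->state (sentence->city s))))
       ;; group by state and count each group
       frequencies))

(= buildings-per-state {"CA" 2 "NY" 2 "WA" 1 "IL" 1})
;; => true
```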

Running the query should return the following results:

CA 2
NY 2
WA 1
IL 1

Cascalog is a powerful tool that allows users to work with large datasets on top of Hadoop without having to think about mappers and reducers. Its abstraction layer lets you think in terms of transformations, and those transformations are written in pure Clojure.

Cascalog is flexible enough to handle different data formats: all you need to do is provide functions that parse the data. We aren’t constrained by the limitations of SQL syntax.

Because Cascalog is written in Clojure, which runs on the JVM, it gives you access to Hadoop features such as custom file output formats and complex data types.

In summary, we’ve been very happy with our adoption of Cascalog and are looking forward to using it for ever more ambitious projects at Factual.
