Squash the Long Tail with Brickhouse’s HBase UDFs

A common problem we face with Data Science and Big Data is that we can express solutions to our problems in simple and elegant terms, and they work well with toy or limited datasets, but they never scale to work in the real world. The real challenge with Big Data is often what I call “getting the turkey to fly.” You’ve got some map-reduce jobs or Hive queries which work fine on a limited set of data, but just fall over when data sizes hit certain boundaries.

Classical Bayesian Inference

Let’s go through a hypothetical example (based on a real-world problem) where Brickhouse helps you to “get the turkey to fly”. Consider the problem of topic assignment. You have a large number of text-based documents, and you would like to classify each document to a topic, based upon the content of its text. A well-known technique for document classification is Bayesian inference. Say you’ve examined a set of ground-truth documents which have been manually curated to specific labels (i.e. topics), and come up with a set of weights useful for assigning features (i.e. words) to certain labels, probably using some form of tf-idf. Perhaps you’ve constructed this model in a Hive table similar to the following:
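
Something along these lines; the table and column names here are only a sketch, not the exact schema from the original post:

CREATE TABLE feature_label_weights (
  feature  STRING,   -- a word or phrase
  label    STRING,   -- a topic
  weight   DOUBLE    -- e.g. a tf-idf based weight for this feature under this label
);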

Now say you want to classify documents (i.e. web pages, or social media content) that you process, possibly on a daily basis. Perhaps you’ve done some parsing on the documents to reduce them to a bag of words, possibly represented by a table similar to the following:
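
For example, a sketch with illustrative names, keeping each document as a map from word to count:

CREATE TABLE docs (
  doc_id    STRING,
  features  MAP<STRING, DOUBLE>   -- word -> number of occurrences in the document
);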

To classify these documents according to Bayesian inference, one basically sums the log of the feature weights associated with each label, and then takes the argmax. One can express this elegantly with the following Hive query:
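
A sketch of such a query, assuming the illustrative docs and feature_label_weights tables above (whether you sum count times log-weight, as here, or the log of the product depends on how your weights were built):

SELECT doc_id,
       -- collect_max with an argument of 1 keeps only the top-scoring label, i.e. the argmax
       collect_max(label, label_score, 1) AS best_label
FROM (
  SELECT df.doc_id,
         w.label,
         SUM(df.cnt * LN(w.weight)) AS label_score
  FROM (
    SELECT doc_id, feature, cnt
    FROM docs
    LATERAL VIEW explode(features) f AS feature, cnt
  ) df
  JOIN feature_label_weights w
    ON (df.feature = w.feature)
  GROUP BY df.doc_id, w.label
) scored
GROUP BY doc_id;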

We simply explode our document features, join with our model matrix, and sum the logs of the products. Note that we can implement arg_max by using Brickhouse’s collect_max UDF and passing an argument of 1.

This might be fine for a relatively small set of documents, given a large enough cluster, but consider what happens when the number of documents grows large. You’ve got to explode all your maps, and then sort them by feature. Eventually, you are just going to run out of sort space, and you’ll spend all your time writing and reading spilled records. Beyond a certain point, the turkey just won’t fly.

Distributed Cache to the rescue

Fortunately, Brickhouse has a few tools to help out in this situation. First of all, we can use the distributed_map UDF to avoid the join, and to avoid the enormous re-sort of values. This UDF allows you to take a “key-value” table which has been pushed to Hadoop’s distributed cache, using Hive’s “add file” command, and access it as an in-memory map. This is similar in spirit to Hive’s map-side joins, but gives you more control, because you can explicitly decide when something gets pushed to the distributed cache.

To avoid our join, we can store our label weights per feature, and place them into the distributed cache, similar to the following:
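
A sketch of this step; the local path is illustrative, and the output uses Hive’s default ‘^A’ text delimiter, which is what distributed_map expects:

-- Step 1: write one row per feature to a local directory of partfiles:
--   the feature, then a JSON map of label -> weight
INSERT OVERWRITE LOCAL DIRECTORY './feature_label_weights'
SELECT feature,
       to_json(collect(label, weight))
FROM feature_label_weights
GROUP BY feature;

-- Step 2: push the generated partfiles into the distributed cache, so every node can read them
ADD FILE ./feature_label_weights;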

This is a little complicated, so let’s go over it step by step. First we need to create a local file with our feature label weights; this is the insert overwrite local directory step. We use collect to generate a map of labels and weights for a given feature (i.e. all the topics a word might be associated with; the word “angel” might be associated with religion, startups, or baseball). The to_json UDF converts that map into JSON. The distributed_map UDF assumes that the file is stored in one or more text partfiles (not sequence files or RCFiles), uncompressed, with key and value separated by the standard Hive ‘^A’ delimiter, and rows separated by the standard newline ‘\n’. Note the add file command, which ships the table to each node in the cluster by adding it to the distributed cache.

The next query, sketched below, actually does the classification. The distributed_map UDF looks up each feature in the table we created in the first step, from the file ‘feature_label_weights’, interpreting the value as a string type. The JSON we created in the first step is converted back into a map of weights with the from_json method. Note that we pass in a template object ( map(“label”, 0.0) ) to tell the UDF the type of object we want the JSON to be parsed into. We then multiply this “vector” by a scalar: the number of times the word appears in the document. (Some of these “vector” operations aren’t currently in Brickhouse, but should make it in the near future. Think of the map type as a “vector”, where each string key is a dimension in some highly dimensional space, and the double is the size of the vector in that dimension.) Assume that there is a UDAF to aggregate vector addition (vector plus vector), called union_vector_sum, which we use to sum all the label weights for the document, and an arg_max UDF, which returns the key in the vector map having the largest value. The label with the strongest weight is the one most likely to be associated with the document.
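
Here is a sketch of that classification query, using the illustrative docs table from earlier; vector_mult, union_vector_sum and arg_max are the assumed vector operations described above, and the exact distributed_map signature may differ between Brickhouse releases:

SELECT doc_id,
       -- sum the per-word label "vectors" for each document, then take the strongest label
       arg_max(union_vector_sum(label_vec)) AS best_label
FROM (
  SELECT d.doc_id,
         -- look up the JSON label weights for this word, parse them into a map<string,double>
         -- using the template object, and scale by how often the word appears
         vector_mult(
           from_json(distributed_map(f.feature, 'feature_label_weights'),
                     map('label', 0.0)),
           f.cnt) AS label_vec
  FROM docs d
  LATERAL VIEW explode(d.features) f AS feature, cnt
) doc_vectors
GROUP BY doc_id;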

Note that we did explode our document features, but we don’t have a join anywhere, so there is no need to sort. Since doc_id is unique, much of the aggregation could potentially be done on the map-side, reducing data sent to reducers as well.


Enter the long tail

This approach works well as long as the distributed map stays relatively small and fits into memory. Depending on your settings, you can usually find an extra 200-300 megabytes to use for data structures like this. This approach will probably work for thousands of features (possibly tens of thousands) and hundreds of labels, but eventually things will start getting cramped, and you will run into OutOfMemory errors. That might be fine for some applications, but what if you want to deal with the long tail? What if you wanted to handle potentially millions of words or phrases, and classify documents into potentially thousands of topics? Then we need to find a different way to get the turkey to fly.

Brickhouse’s HBase UDFs

The solution I suggest is to replace our distributed map with an external key-value store. Brickhouse provides some convenient UDFs to access HBase, so we will use that. Other key-value stores, like Redis or memcached, could also be used, but people often have an HBase cluster hanging around already. Before we go any further, make sure the HBase servers you use are not outward-facing.

Load the feature_label_weights into HBase

As a first step, we need to create a table in HBase where we can store our label weights. This is a relatively small and simple table, as far as HBase tables go, so it needs only one column family, called ‘c’. To keep the table well balanced, we’ll predefine the splits ahead of time, so that hexadecimal keys split evenly across the region servers.
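
For example, from the HBase shell (a sketch; the split points here simply carve the key space on the 16 hex digits):

create 'feature_label_weights', 'c',
  SPLITS => ['1','2','3','4','5','6','7','8','9','a','b','c','d','e','f']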

Now we need to load our data. Hive already has an hbase-handler package, with a custom storage handler, but this is often impractical for all but the smallest of tables, because it does single Puts and Gets, which can be inefficient. Brickhouse provides an hbase_batch_put UDF, which allows you to batch multiple Puts together. Combined with some salting, we can load our HBase table in a fairly scalable manner.
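
A sketch of such a load is below. The config map mirrors the style shown in the comments at the end of this post, the zookeeper hosts are placeholders, and Hive’s built-in hash/pmod/hex is used as a stand-in for the MD5 salting described in the next paragraph; the salt is also prefixed onto the row key so rows land in the region their salt points at, and grouping by the salt both batches the Puts for a region together and spreads the batches across reducers.

FROM (
  -- compute a one-hex-digit salt per feature, alongside the JSON map of label -> weight
  SELECT feature,
         label_weights_json,
         lower(hex(pmod(hash(feature), 16))) AS salt
  FROM (
    SELECT feature,
           to_json(collect(label, weight)) AS label_weights_json
    FROM feature_label_weights
    GROUP BY feature
  ) fw
) salted
SELECT hbase_batch_put(
         map('table_name', 'feature_label_weights',
             'family', 'c',
             'qualifier', 'q',
             'hbase.zookeeper.quorum', 'zk1,zk2,zk3'),
         -- salt-prefixed row key, so keys line up with the pre-split hex regions
         concat(salt, feature),
         label_weights_json) AS put_result
GROUP BY salt;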

Note the salting. We generate an MD5 hash of the feature name, which should spread values evenly, take it modulo 16, and convert it to hexadecimal, so it maps onto our region servers with the predefined splits. We then group and distribute by this salt, so that the Puts are batched together according to the region where they will be inserted, and so the batch Puts are performed by different reducers and spread evenly across the cluster. This can be visualized something like the following:

[Figure: HBase Batch Put]

Do the lookup with hbase_cached_get

This should get our label weights into HBase. Now we need to be able to access them while scoring our documents. There is an hbase_get, and even an hbase_batch_get UDF in Brickhouse, but to avoid DoS’ing our servers, we introduced the hbase_cached_get UDF.
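
A sketch of the scoring query with hbase_cached_get; the zookeeper hosts are placeholders, and the row key is salted the same way as in the load sketch above:

SELECT doc_id,
       arg_max(union_vector_sum(label_vec)) AS best_label
FROM (
  SELECT d.doc_id,
         vector_mult(
           hbase_cached_get(
             map('table_name', 'feature_label_weights',
                 'family', 'c',
                 'qualifier', 'q',
                 'hbase.zookeeper.quorum', 'zk1,zk2,zk3'),
             -- same salt-prefixed key used when loading the table
             concat(lower(hex(pmod(hash(f.feature), 16))), f.feature),
             -- template object: parse the stored JSON as a map<string,double>
             map('label', 0.0)),
           f.cnt) AS label_vec
  FROM docs d
  LATERAL VIEW explode(d.features) f AS feature, cnt
) doc_vectors
GROUP BY doc_id;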

This is very similar to before, but instead of using distributed_map, we use the HBase UDF. Also, we pass in a template object, so that the JSON from the table is automatically converted into the map we need.

When this job starts up, it will generate a lot of traffic hitting your HBase servers. But this rapidly settles down, and HBase gets called only occasionally. Assuming that your dataset follows some bell-curve distribution, there will be a relatively small set of words which most people use. These are loaded first and cached in memory, so they don’t need to be re-fetched. If our documents have some obscure words, those will be looked up too; but since they are obscure, this happens relatively infrequently. If enough words have been read that available memory fills up, the internal cache is flushed, and items are evicted on an LRU basis.

Conclusion

The point I want to make is that solving real-world problems at real-world volumes can be difficult, and even if you think you have a solution, you can hit a brick wall where it just doesn’t scale. Hopefully Brickhouse contains the tools to help conquer these problems, and others will make use of what we have learnt while developing Brickhouse. Here we discussed several groups of Brickhouse extensions: to_json and from_json, which make it easy to encode and decode very large JSON structures; distributed_map, which allows you to access the distributed cache and avoid very large sorts; various vector operations, which allow you to perform basic vector algebra on arrays and maps; and the HBase UDFs, which allow you to read from and write to HBase, and to use HBase as a persistence layer beneath a distributed cache. That’s a lot to discuss in a single blog post.


4 Responses to Squash the Long Tail with Brickhouse’s HBase UDFs

  1. solste7en says:

    great write-up! very clear and easy to follow

  2. jyo says:

    Hi, brickhouse is cool, facing an issue while using hbase_cached_get
    select
      hb_key,
      vector_mult(
        hbase_cached_get( map( 'table_name', 'feature_label_weights',
                               'family', 'c',
                               'qualifier', 'q',
                               'hbase.zookeeper.quorum', 'zk1,zk2,zk3' ),
                          hb_key, map( "label", 0.0 ) )
      ) as label_scores
    from
      hb_bayes_load_view
    limit 100;

    FAILED: ClassCastException org.apache.hadoop.hive.serde2.objectinspector.StandardMapObjectInspector cannot be cast to org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector

    Any idea?

    • jeromebanks says:

      I’ll double check. Some of the UDFs mentioned might not reflect what is in the current public release of Brickhouse.
      If you find anything else, please file an issue at Brickhouse’s github page.

    • jeromebanks says:

      Just double checked. This is because the current vector_mult only handles arrays, not the “multi-dimensional maps” we describe in the blog. Issue #45 is open to handle this, but it involves open-sourcing some more of our internal code.

      We’ll make a new release in the near future (next week sometime) to fix this.

      thanks for pointing this out, and thanks for using Brickhouse !!!
