Defeat the Titans with salt !!!

Last time, we discussed how you could use Brickhouse’s sketch set implementation to scalably handle counting uniques.
Even with sketch sets, however, there are times when skew or unbalanced datasets can wreak havoc with your jobs. Even when Hive uses map-side aggregation to reduce output, there are cases where a few “titans” have orders of magnitude more data than most individuals.

Consider, for example, use-cases in social media (perhaps in social networks similar to Twitter), where a few very famous celebrities are associated with the bulk of the data, and most users don’t produce that much. Imagine you needed to count all the unique retweeters for all Twitter users. Certain big stars, like Lady Gaga or Justin Bieber, can have many millions of retweeters. Most people, however, don’t have many at all. Using sketch sets, you could write a query like the following:
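
A minimal sketch, assuming a `retweets` table with `user_id` and `retweeter_id` columns (the table and column names are illustrative):

```sql
-- Estimate unique retweeters per user with a sketch set
SELECT user_id,
       estimated_reach(sketch_set(retweeter_id)) AS unique_retweeters
  FROM retweets
 GROUP BY user_id;
```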

This works fine for many data-sets, but with Twitter, the output from the few titans causes problems. Even though we don’t need to re-sort, as we would with count distinct, the job is still unbalanced, because one or two reducers will get this huge titanic output. These few reducers will be running long after the others have completed.

We can solve this problem, however, by taking advantage of the fact that we can merge sketches together, using a technique known as “salting” the query. We add a random number we call a “salt” to our grouping, and group the query by user_id and this new random salt. This breaks up the work for the titans into different groupings, distributing the data across different reducers. Next we need to merge all those salted sketches to get the actual value. To do this, we use the union_sketch UDAF in Brickhouse, which aggregates sketch_sets together:
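
A sketch of the two-stage query, under the same assumed table layout as above (the salt range of 16 is arbitrary):

```sql
-- Stage 1: build partial sketches per (user_id, salt)
-- Stage 2: union the partial sketches per user_id
SELECT user_id,
       estimated_reach(union_sketch(partial_sketch)) AS unique_retweeters
  FROM (
    SELECT user_id, salt,
           sketch_set(retweeter_id) AS partial_sketch
      FROM (
        SELECT user_id, retweeter_id,
               cast(floor(rand() * 16) AS int) AS salt
          FROM retweets
      ) r
     GROUP BY user_id, salt
  ) salted
 GROUP BY user_id;
```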

Since a sketch set is the set of 5000 items with the lowest MD5 hash values, they can be easily merged. Just merge all the values, and take the lowest 5000 hashes of the result. This can be used as an estimate of the size of the union of the two underlying sketches. ( There is also a combine_sketch UDF in Brickhouse, which will combine sketches in a non-aggregate manner.)

This technique isn’t just useful for sketch sets. It can be applied to almost any situation where you are doing an aggregation across an unbalanced dataset. We could have done something similar if we were just doing simple counts, as sketched below. You just need to be able to save intermediate results in a format which can then be merged together in a second pass.
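
For example, a plain count per user can be salted the same way (again using the illustrative `retweets` table): count within each (user_id, salt) bucket, then sum the partial counts.

```sql
-- Salted counting: partial counts per (user_id, salt), merged with SUM
SELECT user_id,
       SUM(partial_count) AS retweet_count
  FROM (
    SELECT user_id, salt,
           COUNT(*) AS partial_count
      FROM (
        SELECT user_id,
               cast(floor(rand() * 16) AS int) AS salt
          FROM retweets
      ) r
     GROUP BY user_id, salt
  ) salted
 GROUP BY user_id;
```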


When you absolutely have to do a count distinct

Previously, I discussed how horrible it was to attempt to perform a count distinct in Hive; how it would cause you to sort the universe, and then wait until the end of time for a single reducer to complete. The standard solution is to avoid doing an exact count, and to use some probabilistic data structure, like KMV sketches or HyperLogLog, to produce a count estimate.

Sometimes, however, you really do need an exact count. For example, when doing some data QA on your pipelines, you want to make sure that you haven’t accidentally dropped any records, and that some faulty logic hasn’t somehow introduced extra records. In this case, you want to make sure that certain exact counts match between the data inputs and outputs.

How can you avoid the evils of count distinct ??? This installment’s guest blogger, Prantik Bhattaccharyya, discusses how you can use Brickhouse’s group_count UDF, along with a prudent distribute, to save the universe.

Read about it on his newly pressed blog


Hive and JSON made simple

It seems that JSON has become the lingua franca of the Web 2.0 world. It’s simple, extensible, easily parsed by browsers, easily understood by humans, and so on. It’s no surprise, then, that a lot of our Big Data ETL tasks end up extracting JSON from some external system, aggregating and transforming it, and then outputting JSON to be consumed by various other systems.

Hive already has some builtin mechanisms to deal with JSON, but honestly, I think they are somewhat awkward. The `get_json_object` UDF allows you to pull out specific fields from a JSON string, but requires you to specify the field with an XPath-like expression, which can become hairy, and the output is always a string. There is also the `json_tuple` UDTF, but this is similar to performing multiple `get_json_object` calls at once. These UDF’s assume that you know what fields exist in the JSON ahead of time, and don’t handle structures like arrays and maps very well.

There are several JSON SerDe’s which attempt to simplify dealing with JSON, and which can sometimes help, but often they are not what we want. They generally require you to define a table whose rows each consist of a single JSON string. Often that’s not the case: we have JSON alongside other columns (perhaps an ID or timestamp), or multiple JSON columns to deal with. They also often require that the JSON schema be known in advance, and can be awkward with recursive structures.

There also isn’t an easy way to generate JSON from Hive. The Hive code quickly becomes an ugly mess of `concat` functions, and with nested structures, it is often not clear if all the curly braces and brackets are aligned. It’s frustrating to waste hours of cluster-time (and realtime !!) producing a large data-set which is ‘not-quite’ valid JSON.

To make life easier, Brickhouse provides several UDF’s for parsing and generating JSON to and from Hive structures. Once parsed into a Hive type, data can be manipulated as one normally would ( with all the other UDFs in Brickhouse !!) and then output back into JSON.

json_split and json_map

Let’s start out with the simple, specific cases. Often we’ll have a string containing a JSON array, or a JSON map, and we simply want to interpret it as a Hive list or map. That’s what `json_split` and `json_map` do. (It’s called `json_split` because it’s similar to the split method that turns a string into an array, but we’re dealing with JSON rather than a straight delimiter.)

Without any extra arguments, they assume that the values you want are strings. However, any valid (primitive) Hive type is possible. Pass the type you would like the values interpreted as in the second argument, and the returned value will be of that type (for example ‘bigint’, or ‘string,double’ for a map’s key and value types).
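
A minimal sketch of both UDFs (the JSON literals are illustrative; `some_table` stands in for any table with at least one row, or drop the FROM on a recent Hive):

```sql
-- json_split turns a JSON array into a Hive array; json_map turns a JSON map into a Hive map
SELECT json_split('["a","b","c"]')                    AS str_array,  -- array<string>
       json_split('[1,2,3]', 'bigint')                AS int_array,  -- array<bigint>
       json_map('{"a":1.5,"b":2.5}', 'string,double') AS dbl_map     -- map<string,double>
  FROM some_table LIMIT 1;
```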

from_json

More often than not, however, we have something a little more complex. With `from_json` we are able to parse any arbitrary JSON schema. To accomplish this, we need to pass a description of the schema as an argument.

There are two ways to do this. One way is to pass in as the second argument an object of the type you would like returned, as a template object. This is usually done using the `map()` or `array()` UDF’s to create an empty object whose type can be interpreted.

This was a little awkward to explain to people, however, so we introduced a string argument where you can just pass in the Hive type string for the returned type (i.e. ‘array<map<string,double>>’).
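
Both styles, as a sketch (the JSON literals and names are illustrative):

```sql
-- Style 1: pass a template object whose type is the desired return type
SELECT from_json('[{"a":1.5},{"b":2.5}]', array(map('key', 0.0)))          AS with_template,
-- Style 2: pass the Hive type string directly
       from_json('[{"a":1.5},{"b":2.5}]', 'array<map<string,double>>')     AS with_type_string
  FROM some_table LIMIT 1;
```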

JSON maps and named_struct‘s

In JSON, one can have maps where the values are of multiple types: one value in the map could be a string, and another could be an array. To support this, sometimes we don’t want to interpret a JSON map as a Hive map, but rather as a ‘named_struct’. Using the named_struct in this way allows us to map any arbitrary JSON schema to a Hive type.
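
For example, a heterogeneous JSON map can be parsed with a struct type string (a sketch; the field names are illustrative, and I’m assuming the struct type string is accepted, per the named_struct support described above):

```sql
-- A JSON map whose values have different types, parsed as a named_struct
SELECT from_json('{"name":"bob","scores":[1.5,2.5]}',
                 'struct<name:string,scores:array<double>>') AS parsed_struct
  FROM some_table LIMIT 1;
```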

to_json

For the round trip, we need to convert back into valid JSON. To do this, we simply pass in the object we want to print out. The type of the object passed in is interpreted at query parse time to decide the format of the output.

Again, using named_structs is useful for generating JSON maps of heterogeneous value types. The Hive UDF `named_struct` is useful for generating arbitrary named_structs.
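
The round trip might look something like this sketch:

```sql
-- Generate JSON from a named_struct; the result should be a heterogeneous JSON map
SELECT to_json(named_struct('name', 'bob', 'scores', array(1.5, 2.5))) AS json_out
  FROM some_table LIMIT 1;
-- expected output (roughly): {"name":"bob","scores":[1.5,2.5]}
```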

Converting to and from CamelCase and snake_case

There can be some more frustrations, however. Consumers and producers of JSON are often of a Javascript ilk, and use a particular naming style called ‘CamelCase’ (where adding upper-case letters to your string adds humps to the camel). Choosing between CamelCase and the alternative, underscore or ‘snake’ case (not sure where the snakes come from), is a religious debate which won’t be resolved in any of our lifetimes.

This wouldn’t be a problem, except for the fact that Hive types (and HiveQL) are case-insensitive. There is no difference to the Hive parser between ‘value’, ‘VALUE’, ‘Value’, ‘VaLuE’ and ‘vAlUe’. So we can never directly represent a CamelCase JSON schema as a Hive type.

As a cheap workaround, ‘to_json‘ and ‘from_json‘ take an extra boolean argument, which is a flag to convert to and from CamelCase. (A bit of a hack, perhaps, but let me know if you can think of something more elegant.) This allows us to output in a format that those Javascript developers will love, and have it cleanly map onto our Hive types.
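
Roughly like this (a sketch; the position of the boolean flag is my assumption, so check the Brickhouse docs if it misbehaves):

```sql
-- Convert snake_case struct fields into camelCase keys in the generated JSON
SELECT to_json(named_struct('user_id', 123, 'screen_name', 'bob'), true) AS json_out
  FROM some_table LIMIT 1;
-- expected output (roughly): {"userId":123,"screenName":"bob"}
```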

I hope that helps clear some confusion about how to use the JSON UDF’s in Brickhouse. In our organization, dealing with JSON in Hive was possible, but extremely ugly and very brittle in practice. With the JSON UDF’s, I believe it to be just a little better.


Brickhouse version 0.6.0 released !!!

We are proud to announce the 0.6.0 release of Brickhouse. It is available for download via the sonatype maven repositories, or as a pre-built bundle on the Downloads page.

This release is mostly a bug-fix release, with some fixes to JSON and HBase UDF’s, and the ability to specify sketch set size for the KMV sketch set UDFs. We plan on making regular releases every few months, as needed. Please file a ticket on the github issues page if you find a bug, or would like to make a feature request.


Using sketch_set for reach estimation

A common problem I’ve seen in MapReduce for advertising analytics is calculating the number of unique values in a large data set. Usually the unique value represents a viewer, or cookie, or user. From a business standpoint, it matters a lot whether there is one visitor making a million impressions on your site, or a million visitors each making one impression. This is where the term “reach” comes from: how many viewers does your website/application/tv show actually “reach” ???

For the sake of the examples, let’s imagine that we have a table called “weblogs”, with a column called “cookie”, for which we’re trying to find the number of uniques, or the “reach”.

The Traditional Approach

So, how would you do this normally ??? Someone from a straight SQL background would probably try something like the following.
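
Probably something like this (using the weblogs table and cookie column described above):

```sql
-- The traditional exact approach
SELECT COUNT(DISTINCT cookie) AS reach
  FROM weblogs;
```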

This may do the job, but let’s look at what it actually does.

[Figure: a single reducer handling the entire sorted output]

It sorts the entire table by the key “cookie”, and then outputs to one reducer. In order to count uniques, it scans records in sorted order, and whenever the current value changes, it increments its count. Given a large dataset, however, this becomes very inefficient: first, because you essentially have to re-sort all your data, and second, because the job cannot utilize more than one reducer.

Enter the SketchSet

So how can Brickhouse help us out here? Brickhouse provides a simple reach estimator, called a sketch set. It is similar to other min-hash approaches: by saving the K lowest hashes, we can estimate the size of the dataset. Here is the same query without the distinct.
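
A minimal sketch of the rewritten query:

```sql
-- Estimate reach without a count distinct
SELECT estimated_reach(sketch_set(cookie)) AS reach
  FROM weblogs;
```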

The method sketch_set is a UDAF which returns a representation of the data-set in the form of an array<string> (which caps out at 5000 values; more on that later). The UDF estimated_reach then evaluates the representation, and returns an approximation which is close to the actual number of uniques. In this case there is an aggregation, but you don’t have to sort your entire data-set.

Others can probably explain this better than I can, but essentially we are keeping track of the 5000 values which have the lowest MD5 hashes. This is generally known as a KMV (K minimum values) sketch. If we have fewer than 5000 values, we just keep an array containing all of the values. If we have more than 5000 values, and we want to add a new one, we check its MD5 hash. If that hash is lower than the highest hash currently in the sketch, we add the new value to the array, and remove the value with the previously highest hash. In the array, we store the actual values, sorted in the order of their MD5 hashes.

(I chose 5000 as the default size because it had the right trade-off of accuracy to space for my needs. Accuracy is generally more than 99% and rarely less than 97%. Also, for low-reach sets with fewer than 5000 elements, we have an exact number, because we have the elements themselves. Also, spoiler alert: Brickhouse now contains experimental HyperLogLog UDF’s, thanks to our friends.)

How does it work ???

It’s pretty difficult to convince people that this approach will work, but I like to think of it the following way. A strong hash function, like MD5, will distribute hashes evenly across the whole range of numbers, from -MAX_LONG to +MAX_LONG for a Java 64-bit hash, and will have very few hash collisions, where two values produce the same hash. As you add more and more values to your sketch, the value of the 5000th lowest hash gets smaller and smaller, and the space of occupied hashes becomes denser and denser. If you had a set of 2*MAX_LONG unique elements, and your hash function were perfect, then the 5000th lowest hash would be -MAX_LONG + 5000.
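
Turning that density argument into a number gives the textbook KMV estimator (Brickhouse’s implementation may differ in detail): if h_K is the K-th lowest 64-bit hash seen, then reach ≈ (K - 1) * 2^64 / (h_K + 2^63). The denominator is just how far h_K sits above -MAX_LONG, so the denser the low hashes, the larger the estimate.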

Pre-aggregate your sketches

Another thing I like about sketch sets is that they can be easily combined. Imagine that you wanted to create a daily report which calculated your website’s reach for the past 90 days. Instead of scanning 90 days of logs every day, we can just create a daily sketch, and then merge them over a rolling time window. The UDAF union_sketch takes pre-generated sketch sets (from the sketch_set UDAF or the convert_to_sketch UDF) and returns a sketch set equivalent to the union of all those sets. Here is how you could estimate your 90-day reach:
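
A sketch of the rolling merge, assuming weblogs and a daily_sketches table are both partitioned by a date string ds (all names and dates are illustrative):

```sql
-- Build the daily sketch once per day...
INSERT OVERWRITE TABLE daily_sketches PARTITION (ds = '2013-06-01')
SELECT sketch_set(cookie) AS daily_sketch
  FROM weblogs
 WHERE ds = '2013-06-01';

-- ...then merge the last 90 daily sketches into a single reach estimate
SELECT estimated_reach(union_sketch(daily_sketch)) AS reach_90_day
  FROM daily_sketches
 WHERE ds >= '2013-03-03';   -- roughly the previous 90 days of partitions
```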

Take Away

If you get anything out of this posting, and out of Brickhouse, it is that you can easily avoid doing a count distinct to count unique values, and hopefully save countless machine cycles, and countless hours waiting for results. There are data structures which do similar things, but I like sketch sets because they are simple to implement, and for low reach, you have the actual values. It caps the total amount of memory used, so that your jobs are sized roughly the same no matter how many uniques are actually counted. Since it is a list of 5000 strings, however, it might not be the best choice where one is overly concerned with the amount of storage or sort space. There are also some more fun tricks that you can do with them, which I will discuss in upcoming posts.


Squash the Long Tail with Brickhouse’s HBase UDFs

The problem we often face with Data Science and Big Data is that we can express solutions in simple and elegant terms which work well on toy or limited datasets, but can never scale to work in the real world. The real challenge with Big Data is often what I call “getting the turkey to fly.” You’ve got some map-reduce jobs or Hive queries which work fine on a limited set of data, but just fall over when data sizes hit certain boundaries.

Classical Bayesian Inference

Let’s go through a hypothetical example (based on a real-world problem) where Brickhouse helps you to “get the turkey to fly”. Consider the problem of topic assignment. You have a large number of text-based documents, and you would like to classify each document to a topic, based upon the content of the text. A well-known technique for document classification is Bayesian inference. Say you’ve examined a set of ground-truth documents which have been manually curated to specific labels (i.e. topics), and come up with a set of weights useful for assigning features (i.e. words) to certain labels, probably using some form of tf-idf. Perhaps you’ve constructed this model in a Hive table similar to the following:
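
Perhaps something like this sketch (the table and column names here are assumptions, used throughout the rest of the example):

```sql
-- One weight per (label, feature) pair
CREATE TABLE label_feature_weights (
  label   STRING,   -- the topic
  feature STRING,   -- the word
  weight  DOUBLE    -- e.g. a tf-idf derived weight
);
```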

Now say you want to classify documents (i.e. web pages, or social media content) that you process, possibly on a daily basis. Perhaps you’ve done some parsing on the documents to reduce them to a bag of words, possibly represented by a table similar to the following:
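
Again a sketch, with assumed names:

```sql
-- One row per document, reduced to a bag of words
CREATE TABLE doc_features (
  doc_id   STRING,
  features MAP<STRING,INT>   -- word -> number of occurrences in the document
);
```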

To classify these documents according to Bayesian inference, one basically sums the log of the feature weights associated with each label, and then takes the argmax. One can express this elegantly with the following Hive query:
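
A sketch of that query, against the assumed tables above:

```sql
-- Naive Bayes scoring: explode each document's words, join against the model,
-- sum log-weights per label, then keep the top label per document
SELECT doc_id,
       collect_max(label, score, 1) AS best_label
  FROM (
    SELECT x.doc_id,
           w.label,
           SUM(log(w.weight) * x.cnt) AS score
      FROM (
        SELECT d.doc_id, f.feature, f.cnt
          FROM doc_features d
          LATERAL VIEW explode(d.features) f AS feature, cnt
      ) x
      JOIN label_feature_weights w
        ON (x.feature = w.feature)
     GROUP BY x.doc_id, w.label
  ) scored
 GROUP BY doc_id;
```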

We simply explode our document features and join with our model matrix, and sum the log of the products. Note that we can implement arg_max by using Brickhouse’s collect_max UDF, and passing an argument of 1.

This might be fine for a relatively small set of documents, given a large enough cluster, but consider what happens when the number of documents grows large. You’ve got to explode all your maps, and then sort them by feature. Eventually, you are just going to run out of sort space, and you’ll spend all your time writing and reading spilled records. Beyond a certain point, the turkey just won’t fly.

Distributed Cache to the rescue

Fortunately, Brickhouse has a few tools to help out the situation. First of all, we can use the distributed_map UDF to avoid the join, and to avoid the enormous re-sort of values. This UDF allows you to take a “key-value” table which has been pushed to Hadoop’s distributed cache, using Hive’s “add file” command, and access it as an in-memory map. This is similar in spirit to Hive’s map-side joins, but gives you more control, because you can explicitly decide when something gets pushed to the distributed cache.

To avoid our join, we can store our label weights per feature, and place them into the distributed cache, similar to the following:
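
A hedged sketch of that step (the local directory path is illustrative, and its basename matches the lookup used later; collect here is Brickhouse’s map-building collect UDAF):

```sql
-- Write one line per feature: feature ^A {"label1": w1, "label2": w2, ...}
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/feature_label_weights'
SELECT feature,
       to_json(collect(label, weight))
  FROM label_feature_weights
 GROUP BY feature;

-- Push the resulting text partfiles to every node via the distributed cache
ADD FILE /tmp/feature_label_weights;
```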

This is a little complicated, so let’s go over it step by step. First we need to create a local file with our feature label weights; this is the insert overwrite local directory step. We use collect to generate a map of labels and weights for a given feature (i.e. all the topics a word might be associated with; the word “angel” might be associated with religion, startups, or baseball). The to_json converts the map into JSON. The distributed_map assumes that the file is stored in one or more text partfiles (not sequence files or RCFiles), uncompressed, with the key and value separated by the standard Hive ‘^A’ delimiter, and rows separated by the standard newline ‘\n’. Note the add file command, which sends the table to each node in the cluster by adding it to the distributed cache.

The next query actually does the classification. The distributed_map UDF accesses the table we created in the first step, doing a lookup by feature from the file ‘feature_label_weights’, and interpreting the value as a string type. The JSON we created in the first step is converted back into a map of weights with the from_json method. Note that we passed in a template object ( map(“label”,0.0) ) to tell the UDF the type of object we wanted the JSON to be parsed into. We multiply this “vector” by the scalar of how many times the word appears in the document. ( Some of the “vector” operations aren’t currently in Brickhouse, but should make it in the near future. Consider the type map<string,double> as a “vector”, where each string key is a dimension in some highly dimensional space, and the double is the size of the vector in that dimension.) Assume that there is a UDAF to aggregate vector addition (vector plus vector), called union_vector_sum, which we use to sum all the label weights for the document. Assume that there is also an arg_max UDF, which returns the key in the vector map having the largest value. The label with the strongest weight is the one most likely to be associated with the document.
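
Putting that together, the classification query might look roughly like this sketch (distributed_map’s exact argument order, and the vector helpers vector_scalar_mult, union_vector_sum, and arg_max, are assumptions here):

```sql
SELECT doc_id,
       arg_max(union_vector_sum(scaled_weights)) AS best_label
  FROM (
    SELECT x.doc_id,
           -- look up the per-feature label weights from the distributed cache,
           -- parse the JSON into a map, and scale by the word count
           vector_scalar_mult(
             from_json(distributed_map(x.feature, 'feature_label_weights', 'string'),
                       map('label', 0.0)),
             x.cnt) AS scaled_weights
      FROM (
        SELECT d.doc_id, f.feature, f.cnt
          FROM doc_features d
          LATERAL VIEW explode(d.features) f AS feature, cnt
      ) x
  ) s
 GROUP BY doc_id;
```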

Note that we did explode our document features, but we don’t have a join anywhere, so there is no need to sort. Since doc_id is unique, much of the aggregation could potentially be done on the map-side, reducing data sent to reducers as well.


Enter the long tail

This approach actually works well as long as the distributed map stays relatively small, and can fit into memory. Depending on your settings, you can usually find an extra 200-300 megabytes to use for data structures like this. This approach will probably work for thousands of features (possibly tens of thousands) and hundreds of labels, but eventually things will start getting cramped, and you will run into OutOfMemory errors. That might be fine for some applications, but what if you want to deal with the long tail? What if you wanted to handle potentially millions of words or phrases, and classify documents into potentially thousands of topics? Then we need to find a different way to get the turkey to fly.

Brickhouse’s HBase UDF’s

The solution I suggest is to replace our distributed map with an external key-value store. Brickhouse provides some convenient UDFs to access HBase, so we will use those. Other key-value stores, like Redis or memcached, could also be used, but often people already have an HBase cluster hanging around. Before we go any further, make sure the HBase servers you use are not outward-facing.

Load the feature_label_weights into HBase

As a first step, we need to create a table in HBase where we can store our label weights. This should be a relatively small and simple table, as far as HBase tables go, with only one column family, called ‘c’. To keep the table well balanced, we’ll predefine the splits ahead of time, so that hexadecimal keys will split evenly across the region servers.
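
Something like this from the HBase shell (a sketch; the 15 split points give 16 regions, one per leading hex character):

```
create 'feature_label_weights', 'c',
  SPLITS => ['1','2','3','4','5','6','7','8','9','a','b','c','d','e','f']
```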

Now we need to load our data. Hive already has an hbase-handler package, with a custom table format, but this is often impractical for all but the smallest of tables, because it does single Puts and Gets, which can be inefficient. Brickhouse provides an hbase_batch_put UDF, which allows you to batch multiple Puts together. Combined with some salting, we can load our HBase table in a fairly scalable manner.
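
A hedged sketch of the load (the config-map keys passed to hbase_batch_put are placeholders, additional connection properties such as the ZooKeeper quorum may be required, and md5() is assumed to be available as a UDF; check the Brickhouse documentation for the exact names):

```sql
-- Batch-put one row per feature, salted by the first hex character of the feature's MD5
SELECT hbase_batch_put(map('table_name', 'feature_label_weights',
                           'family',     'c'),
                       feature, label_weight_json)
  FROM (
    SELECT feature,
           to_json(collect(label, weight)) AS label_weight_json,
           substr(md5(feature), 1, 1)      AS salt   -- one hex char, lining up with the region splits
      FROM label_feature_weights
     GROUP BY feature
  ) f
 GROUP BY salt;
```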

Note the salting. We generate an MD5 hash for the feature name, which should spread values evenly, take it modulo 16, and convert it to hexadecimal so that it maps onto our region servers with the predefined splits. We then group and distribute by this salt, so that the Puts will be batched together according to the region where they will be inserted, and the batch Puts will be done by different reducers, spread evenly across the cluster. This can be visualized something like the following:

[Figure: HBase batch Put, grouped by salt and spread across reducers]

Do the lookup with hbase_cached_get

This should get our label weights into HBase. Now we need to be able to access them while scoring our documents. There is an hbase_get, and even an hbase_batch_get UDF in Brickhouse, but to avoid DOS’ing our servers, we introduced the hbase_cached_get UDF.
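
Roughly like the following sketch (again, the config-map keys and exact signature are assumptions; the third argument is the template object described below):

```sql
SELECT doc_id,
       arg_max(union_vector_sum(scaled_weights)) AS best_label
  FROM (
    SELECT x.doc_id,
           -- fetch (and cache) the per-feature label weights from HBase,
           -- parsed directly into a map via the template object, then scale by word count
           vector_scalar_mult(
             hbase_cached_get(map('table_name', 'feature_label_weights',
                                  'family',     'c'),
                              x.feature,
                              map('label', 0.0)),
             x.cnt) AS scaled_weights
      FROM (
        SELECT d.doc_id, f.feature, f.cnt
          FROM doc_features d
          LATERAL VIEW explode(d.features) f AS feature, cnt
      ) x
  ) s
 GROUP BY doc_id;
```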

It’s very similar to before, but instead of using distributed_map, we use the HBase UDF. Also, we pass in a template object, so that the JSON from the table will automatically be converted into the map we need.

When this job starts up, it will generate a lot of traffic hitting your HBase servers, but this will rapidly settle down, and HBase will be called only occasionally. Assuming that your dataset follows some bell-curve distribution, there will be a relatively small set of words which most people use. These will be loaded first, and then cached in memory, so they don’t need to be re-fetched. If our documents have some obscure words, then they will be looked up; but since they are obscure, this happens relatively infrequently. If enough words have been read that available memory fills up, the internal cache will be flushed, and items will be removed on an LRU basis.

Conclusion

The point I want to make is that solving real-world problems at real-world volumes can be difficult, and even if you think you have a solution, you can hit a brick wall where it just doesn’t scale. Hopefully Brickhouse contains the tools to help conquer these problems, and others will utilize what we have learnt while developing it. Here we discussed several groups of Brickhouse extensions: to_json and from_json, which make it easy to decode very large JSON structures; distributed_map, which allows you to access the distributed cache and avoid very large sorts; various vector operations, which allow you to perform basic vector algebra on arrays and maps; and the HBase UDFs, which allow you to write to and read from HBase, and to use HBase as a persistence layer beneath a distributed cache. That’s a lot to discuss in a single blog post.


Exploding multiple arrays at the same time with numeric_range

Hive allows you to emit all the elements of an array into multiple rows using the explode UDTF, but there is no easy way to explode multiple arrays at the same time.

Say you have a table my_table which contains two array columns, both of the same size. (Say you had an ordered list of multiple values, possibly of different types.) Then you need to explode the arrays, and have each row contain the corresponding values from the two arrays.

This can be easily solved with Brickhouse’s numeric_range UDTF. It emits integers according to a specified numeric range, and takes 1, 2, or 3 arguments. With 1 argument, it emits integers from 0 to n-1. With 2, it emits from the first argument’s value to the second argument’s value minus 1. With 3, it uses the third argument as an increment value.
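
A minimal sketch of exploding two parallel arrays together (my_table, arr1, and arr2 are illustrative names):

```sql
-- Emit one row per index, pulling the value at that index from each array
SELECT array_index(arr1, n) AS val1,
       array_index(arr2, n) AS val2
  FROM my_table
  LATERAL VIEW numeric_range(size(arr1)) r AS n;
```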

The array_index UDF simply returns an array’s value at the i’th index. It is needed because currently Hive’s bracket [ ] operators support only constant values. ( See https://issues.apache.org/jira/browse/HIVE-1955 ). Brickhouse also contains a map_index UDF to return the value of a map for a particular key.
