Spark UDAF could be an option!

Calculate average on sparse arrays

This clustering has various applications, ranging from improving our prediction models to creating custom user segments (predefined lists of people sharing some known properties). For some of them, we need to normalize the resulting probabilities by their average at different levels. In our example, we calculate the average probabilities by country.

The input is the dataset described above, but we store sparse, Short-encoded arrays in S3 to reduce storage costs.

Sparse arrays

Since our arrays are very sparse, we only want to store the significant values (those above some threshold), using two arrays: one for the indices and one for the values.

For example, considering a threshold of 0.1 and supposing that the missing values in the above table are zeros, the sparse version would be:

Short encoding:
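The exact encoding is not reproduced in this post; the idea can be sketched as follows, assuming probabilities are simply scaled to the Short range (the scaling factor and helper names are ours, not the original code):

```scala
// Sketch of the sparse + Short encoding; the scaling factor and helper names are assumptions.
case class SparseShortArray(indices: Array[Int], values: Array[Short])

def encode(dense: Array[Double], threshold: Double = 0.1): SparseShortArray = {
  // Keep only the significant probabilities, as two parallel arrays:
  // the cluster indices and the Short-encoded values.
  val kept = dense.zipWithIndex.filter { case (p, _) => p > threshold }
  SparseShortArray(
    indices = kept.map(_._2),
    values  = kept.map { case (p, _) => (p * Short.MaxValue).toShort } // assumed scaling
  )
}

def decode(value: Short): Double = value.toDouble / Short.MaxValue // assumed inverse scaling
```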

After this processing, our input dataset looks like this:

We would like to decode these Short values back to probabilities, make the sum of all probabilities in each array equal to 1, and then compute an average by array index (aka cluster). Here is a step-by-step illustration of what we want to achieve:

Step 1 — Probabilities are decoded back to Double.

Step 2 — Probabilities are normalized, the sum of probas should equal 1 for each user.

Step 3 — We calculate, by country, the average membership of people to each cluster.

The first intuition was to create one UDF that decodes and normalizes the data, and a second one to densify the arrays. We could have had a single UDF doing all three operations, but having elementary UDFs is a best practice; check the “Avoid UDFs or UDAFs that perform more than one thing” paragraph in Part I for more details.

However, grouping several functions in the same UDF sometimes helps avoid intermediate data structures and conversion operations between Spark’s internal data representation and the JVM. There is a trade-off to make between performance and code complexity.
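Here is a minimal sketch of what these two UDFs could look like; the Short decoding formula and the column types are assumptions, while nbClusters = 100 comes from the cluster count mentioned later in the post:

```scala
import org.apache.spark.sql.functions.udf

val nbClusters = 100

// Steps 1 & 2: decode the Short values back to Double and normalize them so they sum to 1
val decodeAndNormalizeUdf = udf { (values: Seq[Short]) =>
  val decoded = values.map(_.toDouble / Short.MaxValue) // assumed scaling
  val total   = decoded.sum
  if (total == 0.0) decoded else decoded.map(_ / total)
}

// Densify: rebuild the full nbClusters-long array from the sparse representation
val densifyUdf = udf { (indices: Seq[Int], probas: Seq[Double]) =>
  val dense = Array.fill(nbClusters)(0.0)
  indices.zip(probas).foreach { case (i, p) => dense(i) = p }
  dense
}
```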

Then we use Spark’s avg aggregation function to calculate the average by country for each cluster.
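The wiring of this first implementation could look like the following sketch, assuming input columns named country, indices and values:

```scala
import org.apache.spark.sql.functions.{avg, col}

val densified = df
  .withColumn("probas", decodeAndNormalizeUdf(col("values")))
  .withColumn("dense", densifyUdf(col("indices"), col("probas")))

// One avg per array index (cluster), computed by country
val avgExprs = (0 until nbClusters).map(i => avg(col("dense")(i)).as(s"cluster_$i"))

val result = densified
  .groupBy("country")
  .agg(avgExprs.head, avgExprs.tail: _*)
```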

This first implementation runs in 28 min on an EMR cluster of 50 r3.xlarge nodes (AWS instances).

Dataset size: 700GB in compressed parquet/snappy (a lot more when uncompressed, at least by a factor of 10) and 1.5 billion rows.

Let’s have a look at the execution plan, step by step:

WholeStageCodegen: Since version 2.0.0, Spark can optimize some physical operations by collapsing them into a single Java function that it generates on the fly. In our case, this includes the parquet file read operation and the execution of our two UDFs. This is explicit in the execution plan in the SQL tab of the Spark UI:

Exchange: this is the shuffle triggered by the aggregations (avg).

HashAggregate: we can see a first aggregation operation before the shuffle and another one after it. This is how some aggregations are implemented in Spark: the data is pre-aggregated within each input partition before shuffling (also called partial aggregation or map-side aggregation), and it is then further aggregated after the shuffle to obtain the global result.

In this first implementation, the densifying operation instantiates a 100-element array for each row, which is time- and memory-consuming. Converting all these arrays back from Java objects to the InternalRow data structure (Spark SQL’s optimized representation) when leaving densifyUdf is also a heavy operation.

Following our first results, one way to identify optimization opportunities could have been to profile the JVM. In our case, we decided to directly try a UDAF to calculate the average on the arrays.
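The UDAF below is only a sketch of this approach, not the production code: the class and field names are ours, it consumes the already decoded and normalized sparse arrays, and it accumulates them into a dense buffer made of an array of doubles and a count.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class SparseArrayAverage(nbClusters: Int) extends UserDefinedAggregateFunction {

  // Input: the sparse arrays produced by decodeAndNormalizeUdf
  override def inputSchema: StructType = StructType(Seq(
    StructField("indices", ArrayType(IntegerType)),
    StructField("probas", ArrayType(DoubleType))
  ))

  // Buffer: a dense array of partial sums plus a row count
  override def bufferSchema: StructType = StructType(Seq(
    StructField("sums", ArrayType(DoubleType)),
    StructField("count", LongType)
  ))

  override def dataType: DataType = ArrayType(DoubleType)
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Array.fill(nbClusters)(0.0)
    buffer(1) = 0L
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val indices = input.getSeq[Int](0)
    val probas  = input.getSeq[Double](1)
    val sums    = buffer.getSeq[Double](0).toArray
    indices.zip(probas).foreach { case (i, p) => sums(i) += p }
    buffer(0) = sums // the whole array is written back into the buffer on every row
    buffer(1) = buffer.getLong(1) + 1L
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val sums1 = buffer1.getSeq[Double](0).toArray
    val sums2 = buffer2.getSeq[Double](0)
    var i = 0
    while (i < nbClusters) { sums1(i) += sums2(i); i += 1 }
    buffer1(0) = sums1
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  override def evaluate(buffer: Row): Any = {
    val count = buffer.getLong(1)
    val sums  = buffer.getSeq[Double](0)
    if (count == 0L) sums else sums.map(_ / count)
  }
}
```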

Then we use it in our job, so we don’t need the densifyUdf anymore.
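The wiring could become something like this, reusing the assumed column names from above:

```scala
val averagePerCluster = new SparseArrayAverage(nbClusters)

val result = df
  .withColumn("probas", decodeAndNormalizeUdf(col("values")))
  .groupBy("country")
  .agg(averagePerCluster(col("indices"), col("probas")).as("avg_probas"))
```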

The new job runs in 11 min on an EMR cluster of 50 r3.xlarge nodes. \o/

Dataset size: 700GB in compressed parquet/snappy and 1.5 billion rows.

The UDAF implementation is 2.5X faster than the first implementation.

So we tried to eliminate the decodeAndNormalizeUdf and move its logic into the UDAF itself, to see if it changed anything.

The inputSchema and the update methods of the UDAF are reworked as follows (the other methods remain the same).
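The sketch below reuses the same assumed Short scaling and column names as before:

```scala
// The raw columns are now fed directly to the UDAF
override def inputSchema: StructType = StructType(Seq(
  StructField("indices", ArrayType(IntegerType)),
  StructField("values", ArrayType(ShortType))
))

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  val indices = input.getSeq[Int](0)
  val values  = input.getSeq[Short](1)
  val decoded = values.map(_.toDouble / Short.MaxValue) // assumed scaling
  val total   = decoded.sum
  val sums    = buffer.getSeq[Double](0).toArray
  indices.zip(decoded).foreach { case (i, p) =>
    sums(i) += (if (total == 0.0) p else p / total) // decode + normalize inline
  }
  buffer(0) = sums
  buffer(1) = buffer.getLong(1) + 1L
}
```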

The execution of the above took 10 min on the same cluster, so we can conclude it’s almost 10% faster.

An important difference we noticed in the execution plan is the peak memory per task: it is higher when the UDF is applied separately from the UDAF.

Here, we need to make a choice: keeping the UDF is more readable and granular, but a little bit less efficient. We finally chose to put the decode function in the UDAF.

Even if the UDAF solution is much faster, it uses sort-based aggregation (cf. the execution plan), which usually performs worse than hash-based aggregation since it involves extra sorting that is useless in our case.

Why does Spark use it? Let’s dig a bit further.

Reading Spark’s code, we understand that all fields of our bufferSchema should have mutable data types for hash aggregation to be chosen. But wait! I thought we were actually using mutable types! As a reminder, our buffer schema uses an array of doubles and a long.
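This is how it looked in the sketch above (the field names are ours):

```scala
override def bufferSchema: StructType = StructType(Seq(
  StructField("sums", ArrayType(DoubleType)), // the array of doubles: partial sums per cluster
  StructField("count", LongType)              // the number of aggregated rows
))
```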

It turns out that hash aggregation was not selected because we are using an ArrayType in the aggregation buffer: from Spark’s point of view, ArrayType is not a mutable type.

To make sure Spark is able to perform a Hash aggregation we slightly edited the UDAF to use 100 (nbClusters) Double columns instead of an array.
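A sketch of what the flattened buffer and the corresponding update could look like, with the same assumed names as before (merge and evaluate are adapted in the same way and omitted here):

```scala
// One Double column per cluster plus the count: every buffer field now has a
// type that Spark can mutate in place, so hash aggregation becomes possible.
override def bufferSchema: StructType = StructType(
  (0 until nbClusters).map(i => StructField(s"sum_$i", DoubleType)) :+
    StructField("count", LongType)
)

override def initialize(buffer: MutableAggregationBuffer): Unit = {
  (0 until nbClusters).foreach(i => buffer(i) = 0.0)
  buffer(nbClusters) = 0L
}

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  val indices = input.getSeq[Int](0)
  val values  = input.getSeq[Short](1)
  val decoded = values.map(_.toDouble / Short.MaxValue) // assumed scaling
  val total   = decoded.sum
  indices.zip(decoded).foreach { case (i, p) =>
    buffer(i) = buffer.getDouble(i) + (if (total == 0.0) p else p / total)
  }
  buffer(nbClusters) = buffer.getLong(nbClusters) + 1L
}
```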

At the end of the class we added some methods to the MutableAggregationBuffer via an implicit conversion to make the code a little bit less verbose and more readable.
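The exact helpers are not shown in the post; a plausible sketch of that kind of implicit enrichment could be:

```scala
// Hypothetical convenience methods, declared at the end of the UDAF class
implicit class RichBuffer(buffer: MutableAggregationBuffer) {
  def addTo(i: Int, value: Double): Unit = buffer(i) = buffer.getDouble(i) + value
  def incrementCount(i: Int): Unit       = buffer(i) = buffer.getLong(i) + 1L
}

// update can then read, for example:
//   indices.zip(decoded).foreach { case (i, p) => buffer.addTo(i, p / total) }
//   buffer.incrementCount(nbClusters)
```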

Hurray! We are now using a hash-based aggregation again:

Avoiding the array also avoids allocating a new array and copying all the data into the aggregation buffer on each row (i.e., on each update call).
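In the array-based sketch above, that allocation and copy corresponds to the write-back at the end of each update call:

```scala
buffer(0) = sums // writes the whole nbClusters-long array back into the buffer for every row
```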

A thread dump of the previous implementation (with array) via the Spark UI showed this:

The execution of this new version only takes 9.2 min.

Yes! We gained another 8%.
