[SOL-239] Custom aggregate functions in EXAPowerlytics for large data sets Created: 17.02.2015 Updated: 24.07.2020 Resolved: 24.07.2020
|Reporter:||Captain EXASOL||Assignee:||Captain EXASOL|
Note: This solution is no longer maintained. For the latest information, please visit our Knowledge Base:
This solution demonstrates some basic principles for parallel programming with EXAPowerlytics.
The task for this tutorial is to create UDF replacements for the internal functions AVG and STDDEV in the R language using UDFs that run inside the data base. Clearly, we don't expect stellar performance here, especially when compared to EXASolution's built in versions. On the other hand, the provided solutions provide a blueprint for custom aggregate functions for customers.
The primary goals for our versions of AVG and STDDEV are that they produce correct results on tables of arbitrary size without exceeding the per-script resource limits, and that they exploit the parallelism of the cluster.
For programming in EXAPowerlytics, there are a number of basic principles to keep in mind:
1. Scripts run in the standard implementations of their respective languages. Consequently, UDFs written in languages with slower standard implementations will be slower than UDFs written in languages with faster ones.
2. Resources for UDF scripts are always limited in order to prevent a UDF from freezing the machine. So, whenever the size of the input data is unbounded, a UDF must avoid storing all input data in temporary data structures, as this quickly exhausts the resources available to the running script.
3. For scripts of type SCALAR-RETURNS or type SCALAR-EMITS, parallelism is automatically managed by EXASolution, if the script is invoked on a sufficiently large number of input rows.
4. For scripts of type SET-RETURNS or type SET-EMITS, parallelism is coupled to the GROUP BY clause of the SQL query which contains the script. Basically, it is guaranteed that each GROUP BY group is processed by a single invocation of the run() method of a script instance. This makes it possible to iterate over the entire contents of a group and therefore enables users to create all kinds of aggregate functions. On the downside, one has to keep in mind that if the groups defined by the GROUP BY clause are too large, the script will not be able to store all data in temporary data structures. Hence, while it is possible to iterate over all the data via successive calls to ctx.next(), it is not possible to store all of it. In addition, if the data to be processed is very large and the number of groups is very small, the number of parallel script instances is also very small, yielding suboptimal performance.
As a rule of thumb: with SET-RETURNS and SET-EMITS scripts, one should aim for medium-sized groups (maybe 10,000 to 100,000 entries).
Table n9 is a little helper for generating some test data.
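The original listings for n9 and big are not preserved in this copy. Since big ends up with 9^9 = 387,420,489 rows, one plausible reconstruction (table layout and column names are assumptions) is a nine-way cross join of a nine-row helper table:

```sql
-- hypothetical sketch: the original listing is not preserved
CREATE TABLE n9 (n INT);
INSERT INTO n9 VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9);

-- nine-way cross join: 9^9 = 387,420,489 rows
CREATE TABLE big AS
  SELECT a.n + b.n + c.n + d.n + e.n + f.n + g.n + h.n + i.n AS x
  FROM n9 a, n9 b, n9 c, n9 d, n9 e, n9 f, n9 g, n9 h, n9 i;
```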
Table big contains nearly 390 million rows (9^9 = 387,420,489).
In our first attempt, we try to strictly follow the EXASolution manual, which shows how to read all input data of a group at once.
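The script itself is missing from this copy; a sketch along the lines described could look as follows (the signature and the ctx calls follow the EXASOL R UDF interface, but details such as column names are assumptions):

```sql
-- hedged sketch of r_stats: reads the WHOLE group into memory at once
CREATE R SET SCRIPT r_stats(x DOUBLE)
EMITS (avg DOUBLE, stddev DOUBLE) AS
run <- function(ctx) {
    ctx$next_row(NA)                  # NA: fetch ALL remaining rows at once
    ctx$emit(mean(ctx$x), sd(ctx$x))  # sd() is R's corrected sample stddev
}
/
```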
For small tables, this function is good enough.
For larger tables, it fails with VM error: ...
Clearly, the problem is that inside r_stats(), we tried to read all the data at once which exceeded the resources available to the script instance.
For functions like AVG and STDDEV, it is possible to avoid the problem above by following the MAP-REDUCE approach. Here, the computation is split into two functions: a map() function that computes partial aggregates on chunks of the data in parallel, and a reduce() function that combines the partial results into the final value.
Then in SQL, a query of the form
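The concrete query is not preserved here; schematically (map_fn, reduce_fn, t, and g are placeholders), such a query nests the map() call inside the reduce() call:

```sql
-- schematic only: the map() runs per GROUP BY group, the reduce() combines
SELECT reduce_fn(a, b)
FROM (SELECT map_fn(x)
      FROM t
      GROUP BY g);
```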
is used to create the MAP-REDUCE processes. In EXAPowerlytics, map() functions correspond to the UDF types SET-RETURNS and SET-EMITS. These functions are computed independently for each GROUP BY group; hence, by using an appropriate GROUP BY clause, it is possible to control the parallelism of query execution (see principle no. 4 above).
First, let's focus on computing AVG in MAP-REDUCE style. The map() function is called many times in parallel. By later using an appropriate GROUP BY clause, we will limit the size of the input data, so in this function we can simply read all data at once.
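A sketch of such a map() step follows; the script name avg_map and the output column names are assumptions. It emits a partial sum and row count per group, from which the overall average can later be reassembled:

```sql
-- hedged sketch: per-group partial sum and count
CREATE R SET SCRIPT avg_map(x DOUBLE)
EMITS (s DOUBLE, c DOUBLE) AS
run <- function(ctx) {
    ctx$next_row(NA)                     # group is small by construction
    ctx$emit(sum(ctx$x), length(ctx$x))  # partial sum, partial count
}
/
```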
Our version of the reduce() function is called only once (other approaches are possible as well). This means that there is no enforced limit on the size of its input, so we must loop over the data in small pieces; it is very important to avoid reading all the data at once.
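A sketch of such a reduce() step (script name avg_reduce and the chunked-fetch idiom are assumptions; the exact return semantics of ctx$next_row may differ from this sketch):

```sql
-- hedged sketch: combine partial sums/counts, fetching the input in chunks
CREATE R SET SCRIPT avg_reduce(s DOUBLE, c DOUBLE)
RETURNS DOUBLE AS
run <- function(ctx) {
    total <- 0
    cnt   <- 0
    repeat {
        if (!ctx$next_row(1000)) break   # at most 1000 rows per fetch
        total <- total + sum(ctx$s)
        cnt   <- cnt   + sum(ctx$c)
    }
    total / cnt                          # overall average
}
/
```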
With map() and reduce() functions in place, we now have to create a SQL query which combines both and also has a proper GROUP BY clause, in order to achieve good parallelism and not to exceed the resource limits, especially in the map() functions. By the rule of thumb above, we aim for groups of about 10,000 to 100,000 elements. So for our queries on the table BIG, we aim for a GROUP BY clause which produces around 387,420,489 / 10,000 (= 38,742.0489) different groups of nearly identical size. This is achieved by using a dynamic GROUP BY clause like
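The original query is not preserved; one hypothetical way to form such groups is to take the row number modulo the desired group count, e.g.:

```sql
-- hypothetical: mod(rownum, 38742) yields 38742 groups of roughly 10000 rows
SELECT avg_reduce(s, c)
FROM (SELECT avg_map(x)
      FROM big
      GROUP BY mod(rownum, 38742));
```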
Counting the distinct group keys shows the number of groups created, and aggregating the row counts per key shows the statistics of the group sizes.
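The original check statements are missing; hypothetical versions (assuming the mod(rownum, 38742) group key from above) might read:

```sql
-- number of groups created
SELECT COUNT(DISTINCT mod(rownum, 38742)) FROM big;

-- statistics of the group sizes
SELECT MIN(c), AVG(c), MAX(c)
FROM (SELECT COUNT(*) AS c
      FROM big
      GROUP BY mod(rownum, 38742));
```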
On a (small and old) test cluster, this took around 16 seconds. As expected, this is really slow compared to the 0.6 seconds of the internal AVG function.
On the other hand, this is interpreted and slow generic R vs. extremely optimized C++; hence, a performance difference of two orders of magnitude is not really a big surprise.
Now, after computing the AVG function, let's tackle STDDEV, again in MAP-REDUCE style. We implement the same definition of STDDEV that EXASolution implements, namely the corrected sample standard deviation: STDDEV(x) = sqrt( sum((x_i - m)^2) / (n - 1) ), where m is the average and n the number of rows.
Again, we start with the map() function. Later, via GROUP BY, we will guarantee that the input size is not too large, so we can program it rather carelessly:
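A sketch of such a map() step (the script name std_map, the column names, and passing the mean m as an extra input column are assumptions):

```sql
-- hedged sketch: partial sum of squared deviations plus partial count,
-- with the precomputed mean m supplied as an extra input column
CREATE R SET SCRIPT std_map(x DOUBLE, m DOUBLE)
EMITS (s DOUBLE, c DOUBLE) AS
run <- function(ctx) {
    ctx$next_row(NA)                       # group is small by construction
    ctx$emit(sum((ctx$x - ctx$m[1])^2),    # m is constant within the group
             length(ctx$x))
}
/
```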
Again, the reduce() function is more complicated, as its input size is not limited.
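A sketch of such a reduce() step (script name std_reduce and the chunked-fetch idiom are assumptions). It only sums up the partial results, leaving the division by (n - 1) and the square root to plain SQL:

```sql
-- hedged sketch: combine partial results in chunks; emit total sum and count
CREATE R SET SCRIPT std_reduce(s DOUBLE, c DOUBLE)
EMITS (s DOUBLE, c DOUBLE) AS
run <- function(ctx) {
    ts <- 0
    tc <- 0
    repeat {
        if (!ctx$next_row(1000)) break   # at most 1000 rows per fetch
        ts <- ts + sum(ctx$s)
        tc <- tc + sum(ctx$c)
    }
    ctx$emit(ts, tc)
}
/
```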
When calling the above map() and reduce() functions, we have to provide some value for the average. This can be achieved by joining a scalar value to the BIG table:
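That query is missing from this copy; a hypothetical version, with the placeholder average (select 5 as m) mentioned below, might read:

```sql
-- hypothetical: (SELECT 5 AS m) stands in for the true average of big.x;
-- the division by (n - 1) and the square root happen in plain SQL
SELECT SQRT(s / (c - 1)) AS stddev
FROM (SELECT std_reduce(s, c)
      FROM (SELECT std_map(x, m)
            FROM big, (SELECT 5 AS m)
            GROUP BY mod(rownum, 38742)));
```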
Actually, our map() and reduce() functions only compute the sum inside the formula above; the remainder of the formula for STDDEV is evaluated in standard SQL.
Clearly, providing the average manually in the query is not nice. In EXAPowerlytics it is possible to combine several MAP-REDUCE computations; hence we can simply replace (select 5 as m) by the MAP-REDUCE computation of AVG from above.
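The combined query is not preserved; a hypothetical version (reusing the assumed script names and group key from the sketches above) nests the AVG map-reduce inside the STDDEV map-reduce:

```sql
-- hypothetical: the AVG map-reduce supplies the mean m to the STDDEV map-reduce
SELECT SQRT(s / (c - 1)) AS stddev
FROM (SELECT std_reduce(s, c)
      FROM (SELECT std_map(x, m)
            FROM big,
                 (SELECT avg_reduce(s, c) AS m
                  FROM (SELECT avg_map(x)
                        FROM big
                        GROUP BY mod(rownum, 38742)))
            GROUP BY mod(rownum, 38742)));
```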
In this solution we described basic principles for programming in EXAPowerlytics. With some care, it is straightforward to create powerful distributed and parallel in-database computations using just a few lines of R. Regarding performance, these computations cannot compete with the highly optimized built-in functions of EXASolution, but they are well within the range of what can be expected from a parallel programming environment for languages like R, Python, or Java, at a scale these languages are not inherently designed for, and with the ease of simple scripting.
|Category 1:||UDFs and In-Database Analytics|