Recently, my colleagues and I have been working on a large, high-load service that uses an XGBoost machine learning model and Dask as the tool for distributed data processing and forecast generation. Here I would like to share what we learned about getting the most out of Dask for data preparation and ML model fitting.
What is Dask?
Dask is a library for distributed processing of large amounts of data. The basic concept behind it is to divide large arrays into small parts (partitions).
This is how Dask DataFrames are stored and processed: a table is split into small data frames (think of them as pandas DataFrames), so there is no need to hold the entire table in RAM. The entire source table may be too large to load into memory, but individual partitions are not. In addition, this layout makes it possible to use multiple processor cores efficiently by parallelizing computations across partitions.
At the same time, the size of these partitions (chunks) is determined by the developer. Thus, the same dataframe can be divided into partitions in different ways, for example as in ‘Split 1’ or ‘Split 2’ (Figure 1).
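As a rough illustration of two such splits, the sketch below computes how many partitions a table falls into for a chosen number of rows per partition, and shows one way to build a Dask DataFrame from a pandas one. The helper names and example sizes are hypothetical. `dd.from_pandas` with `chunksize` is the Dask call; it is imported lazily, so the arithmetic part runs even without Dask installed.

```python
import math


def partition_count(n_rows: int, rows_per_partition: int) -> int:
    """Number of partitions when each holds at most rows_per_partition rows."""
    if n_rows <= 0 or rows_per_partition <= 0:
        raise ValueError("both sizes must be positive")
    return math.ceil(n_rows / rows_per_partition)


def to_dask_dataframe(pandas_df, rows_per_partition: int):
    """Split a pandas DataFrame into a Dask DataFrame (hypothetical helper)."""
    import dask.dataframe as dd  # lazy import: only needed when actually splitting

    return dd.from_pandas(pandas_df, chunksize=rows_per_partition)


# The same 1,000,000-row table split in two different ways (example sizes).
split_1 = partition_count(1_000_000, 100_000)  # many small partitions
split_2 = partition_count(1_000_000, 300_000)  # a few large partitions
```

Dask may produce slightly different partition boundaries than this arithmetic suggests, because it splits along the index.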
Choosing the partition size is crucial, because a poor choice can slow data processing down. The optimal partition size depends on the size of the overall dataset and on the resources of the server (or laptop): the number of CPUs and the available RAM.
Disclaimer: Further on, for convenience, we will measure the size of the dataset by the number of rows. All tables will consist of 4 columns (3 features + 1 target). When implementing the algorithm in the system, we built all dependencies not on the number of rows in the tables, but on the total number of elements (rows x columns).
Dask can be used to compute simple statistics and aggregations, but it can also be used to train machine learning models, such as XGBoost, on large amounts of data. Since the service we were developing might require us to train a model on 2–10 million records using only 8–16 GB of RAM (in the case of small virtual machines), we decided to conduct experiments.
Even when calculating simple statistics, the partition size is very important, because a poor choice can significantly slow the computation down in two cases:
- Partitions are too large, so processing each of them in RAM takes too much time and memory
- Partitions are too small, so Dask has to load partitions into RAM too often, and more time is spent on synchronization and uploading/downloading than on the calculations themselves
Thus, with the same computing resources, a non-optimal partition size can significantly degrade the performance of the program (Figure 2). Figure 2 shows the time to fit the XGBoost model on Dask dataframes with different partition sizes. The average execution time over 5 runs is shown.
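Averaging over several runs can be sketched as below. This is a minimal timing helper, not the code used in our experiments: the function name is hypothetical, and the workload is a stand-in for the actual model fit.

```python
import statistics
import time


def average_runtime(task, n_runs=5):
    """Run task() n_runs times; return (mean, standard deviation) in seconds."""
    if n_runs < 2:
        raise ValueError("need at least two runs to estimate the spread")
    durations = []
    for _ in range(n_runs):
        started = time.perf_counter()
        task()
        durations.append(time.perf_counter() - started)
    return statistics.mean(durations), statistics.stdev(durations)


# Stand-in workload; in the experiments this would be the model fit.
mean_seconds, std_seconds = average_runtime(lambda: sum(range(100_000)), n_runs=5)
```

Reporting the standard deviation next to the mean makes it easier to tell whether two partition sizes really differ or the gap is just noise.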
This post discusses an algorithm for finding the optimal partition size for Dask DataFrames. All tables shown in this post are used to fit the Dask XGBoost machine learning model. We will also share some tips that you may find helpful.
The official Dask documentation has pages with tips on how to handle Dask objects (dataframes, arrays, etc.) correctly, such as Dask DataFrames Best Practices.
In particular, that page gives the following advice:
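For readers who have not used XGBoost with Dask, fitting a model on partitioned data looks roughly like the sketch below. It uses the `xgboost.dask` interface (`DaskDMatrix` and `train`). The function name, the parameter values and the number of boosting rounds are assumptions for illustration, not the settings of our service.

```python
def fit_xgboost_on_dask(client, features, target, num_boost_round=100):
    """Fit XGBoost on Dask collections and return the trained booster.

    client   : dask.distributed.Client connected to the cluster
    features : Dask DataFrame or Array with the feature columns
    target   : Dask Series or Array with the target column
    """
    from xgboost import dask as dxgb  # lazy import: needs xgboost and dask

    # DaskDMatrix keeps the data partitioned across the workers.
    dtrain = dxgb.DaskDMatrix(client, features, target)
    # Assumed settings; replace them with the ones that fit your task.
    params = {"objective": "reg:squarederror", "tree_method": "hist"}
    result = dxgb.train(client, params, dtrain, num_boost_round=num_boost_round)
    return result["booster"]
```

The partition size enters here only through `features` and `target`: the same call is made for every split, which is what makes the fit time comparable across partition sizes.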
You should aim for partitions that have around 100MB of data each.
However, this advice is rough: it does not take into account the computing resources of the server, the size of the source dataset, or the specifics of the problem being solved.
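To get a feeling for what 100MB means for tables like ours, the back-of-the-envelope sketch below converts a byte budget into a row count. It assumes 8-byte (float64) values in every column and ignores the index and any other overhead, so treat the result as an order of magnitude only. The function name is hypothetical.

```python
def rows_for_target_bytes(target_bytes: int, n_columns: int, bytes_per_value: int = 8) -> int:
    """Rows that fit into target_bytes, assuming fixed-width numeric columns."""
    return target_bytes // (n_columns * bytes_per_value)


# 4 columns (3 features + 1 target), float64 values assumed.
rows_in_100mb = rows_for_target_bytes(100 * 10**6, n_columns=4)

# Size in bytes of the largest partition considered in this post (1,000,000 rows).
largest_partition_bytes = 1_000_000 * 4 * 8
```

Under these assumptions, 100MB corresponds to roughly 3.1 million rows of a 4-column table, while a partition of 1,000,000 rows takes about 32MB.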
As mentioned above, it is assumed that the optimal partition size depends on the following three conditions:
- Size of full dataset;
- CPU resources (number of processes) which XGBoost and Dask can use;
- Available Random-access memory (RAM).
Thus, during the experiments we varied the amount of computing resources as well as the size of the source dataset. The cases considered:
- Partition size, thousands of rows: [5, 10, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000] (13 cases)
- Size of full dataset, thousands of rows: [100, 200, 300, 400, 500, 600, 1000, 2000, 3000, 4000] (10 cases)
- CPU resources (workers): [2, 3, 4] (3 cases)
- Available Random-access memory (RAM) per worker: [1GB, 2GB, 4GB] (3 cases)
Note: A worker in Dask is a process on a computer (or server) that uses the computing resources allocated to it and runs in isolation from, and in parallel with, other workers.
Thus, 1170 (13 x 10 x 3 x 3) cases were analyzed. To obtain more robust estimates of execution time, each case was launched 5 times. The metrics (execution time) were then averaged.
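The grid of cases can be enumerated with a few lines of Python. The sketch below only reproduces the counting and shows how one case might be translated into arguments for `dask.distributed.LocalCluster`. The helper name and the choice of one thread per worker are assumptions, not a description of our actual harness.

```python
from itertools import product

PARTITION_ROWS_K = [5, 10, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000]
DATASET_ROWS_K = [100, 200, 300, 400, 500, 600, 1000, 2000, 3000, 4000]
WORKERS = [2, 3, 4]
RAM_GB_PER_WORKER = [1, 2, 4]
N_REPEATS = 5

# Every combination of partition size, dataset size, workers and RAM.
cases = list(product(PARTITION_ROWS_K, DATASET_ROWS_K, WORKERS, RAM_GB_PER_WORKER))


def cluster_kwargs(workers: int, ram_gb: int) -> dict:
    """Keyword arguments for LocalCluster for one case (hypothetical helper)."""
    return {
        "n_workers": workers,
        "threads_per_worker": 1,  # assumption: one thread per worker process
        "memory_limit": f"{ram_gb}GB",  # limit applies to each worker
    }


total_launches = len(cases) * N_REPEATS
```

With 5 repetitions per case this gives 5850 launches in total, which is why it pays off to automate the whole loop.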
As part of the investigation, we also wanted to find the dataset size limits beyond which the virtual machine cannot handle the load, so that we could scale the service more successfully.
First, consider the general visualizations from the experiments. We performed runs with different numbers of CPU cores and amounts of RAM, as well as varying the size of the original dataset and the size of the partitions. After completing the experiments, we prepared a table showing only the optimal solutions (partition sizes). Optimal partition sizes are those at which the execution time with given conditions (RAM, CPU and source dataset size) was minimal. The correlation matrices of the collected metrics are shown in Figure 3.
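Selecting the optimal solutions from the raw measurements can be sketched as follows. The record layout and the function name are hypothetical, and the timings in the example are made up for illustration; they are not results from our experiments.

```python
def best_partition_sizes(records):
    """Map (dataset_rows, workers, ram_gb) to the partition size with the lowest time.

    Each record is a dict; fit_seconds is None for runs that failed.
    """
    best = {}
    for record in records:
        if record["fit_seconds"] is None:
            continue  # skip failed runs (for example, not enough memory)
        key = (record["dataset_rows"], record["workers"], record["ram_gb"])
        if key not in best or record["fit_seconds"] < best[key]["fit_seconds"]:
            best[key] = record
    return {key: record["partition_rows"] for key, record in best.items()}


# Made-up timings for illustration only, not results from the experiments.
records = [
    {"dataset_rows": 1_000_000, "workers": 2, "ram_gb": 2, "partition_rows": 50_000, "fit_seconds": 40.0},
    {"dataset_rows": 1_000_000, "workers": 2, "ram_gb": 2, "partition_rows": 200_000, "fit_seconds": 25.0},
    {"dataset_rows": 1_000_000, "workers": 2, "ram_gb": 2, "partition_rows": 900_000, "fit_seconds": None},
]
optimal = best_partition_sizes(records)
```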
The correlation matrices show that, as expected, the size of the source dataset had the biggest influence on the execution time. The number of workers and the amount of RAM also have a significant effect on the fit time. Chunk size has a relatively weak effect. However, this could be because the dependence between execution time and partition size is nonlinear, as the curve in Figure 2 confirms: a linear correlation coefficient can stay close to zero even when the dependence is strong. Also, Figure 4 confirms that the measurements were made correctly, because the results are consistent with our expectations.
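The point about nonlinearity is easy to check on a toy example. The sketch below computes the Pearson correlation for a synthetic U-shaped curve, where the "fit time" is fully determined by the "partition size" and yet the linear correlation is zero. The numbers are synthetic and only illustrate the effect.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    spread_x = math.sqrt(sum((x - mean_x) ** 2 for x in xs))
    spread_y = math.sqrt(sum((y - mean_y) ** 2 for y in ys))
    return cov / (spread_x * spread_y)


# Synthetic U-shaped dependence: slow for tiny and for huge partitions.
partition_size = [1, 2, 3, 4, 5, 6, 7]
fit_time = [(p - 4) ** 2 + 10 for p in partition_size]

r = pearson(partition_size, fit_time)
```

Here `r` is zero because the falling and rising halves of the curve cancel out, which is why a weak correlation alone should not be read as "partition size does not matter".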
Let’s look at the animation with 3D graphs (Animation 1).
In the animation, the optimal cases (for each combination of number of processes and RAM per worker) are circled in red. That is, they mark the partition size at which the execution time of the algorithm was minimal for a given dataset size, number of cores and amount of RAM. The graphs also show piecewise constant optimal surfaces in gray (NB: the surface is global for all cases).
Some frames of the animation contain no experimental data (no dots) (Figure 4). This means that the allocated computational resources were insufficient to run the model.
The picture shows that, for this dataset size, larger partitions should be used when the number of cores is small. Note that this dependence does not hold in all cases.
Based on the results of launches with insufficient computational resources, the following visualization was prepared (Figure 5).
Also, based on the statistics collected on failed runs, we reached a conclusion (tip): if the amount of memory is limited, it is more reliable to use a small partition size.
Based on this research, we formulated several tips for configuring a system built on the Dask XGBoost model more effectively. Note that the study was aimed at running Dask more efficiently on relatively small servers (without hundreds of gigabytes of RAM or tens of CPUs).
The experiments revealed the optimal surfaces, which we model using Gaussian processes. Based on this model, the optimal partition sizes are selected automatically (Animation 2).
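To give an idea of how a Gaussian process can interpolate optimal partition sizes between measured points, here is a minimal one-dimensional sketch with NumPy. It is not the model from our service: the RBF kernel, its length scale, the noise level, the function names and the data points are all assumptions, and the data is synthetic.

```python
import numpy as np


def rbf_kernel(a, b, length_scale=1.0):
    """Squared-exponential kernel between two 1-D arrays of inputs."""
    diff = a[:, None] - b[None, :]
    return np.exp(-0.5 * (diff / length_scale) ** 2)


def gp_predict(x_train, y_train, x_new, length_scale=1.0, noise=1e-6):
    """Posterior mean of GP regression with an RBF kernel and a constant mean."""
    y_mean = y_train.mean()
    k_train = rbf_kernel(x_train, x_train, length_scale) + noise * np.eye(len(x_train))
    weights = np.linalg.solve(k_train, y_train - y_mean)
    return rbf_kernel(x_new, x_train, length_scale) @ weights + y_mean


# SYNTHETIC placeholder values, not measurements from the experiments.
dataset_rows_mln = np.array([1.0, 2.0, 3.0, 4.0, 5.0])            # dataset size, millions of rows
best_partition_k = np.array([500.0, 400.0, 300.0, 200.0, 100.0])  # optimum, thousands of rows

# Predict the optimum for an unseen size (2.5M rows) and for a measured one (3M rows).
predicted = gp_predict(dataset_rows_mln, best_partition_k, np.array([2.5, 3.0]))
```

A real model would take the number of workers and RAM per worker as additional inputs and would tune the kernel parameters instead of fixing them.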
As can be seen from the animation, on average, the optimal partition size decreases when the number of rows in the source dataset increases.
Conclusion (& tips)
I hope you enjoyed reading about which partition size proved to be optimal for training the XGBoost model.
I realize that this article turned out very “technical”. Therefore, for those who managed to read it to the end, here are some tips:
- If you are measuring execution time, always run the calculations several times and average the results, since runtimes are stochastic;
- If you are in doubt about which partition size to choose, err on the side of smaller partitions (otherwise the algorithm may not just run for a long time, but crash with an error);
- To initialize a cluster locally in Dask, the cluster = LocalCluster() and Client(cluster) commands are used. We strongly recommend initializing such a cluster only once in the code, using the Singleton pattern. You can see how it can be implemented in Python here. Otherwise, you will initialize a new cluster on every launch;
- On average, the optimal partition size decreases as the number of rows in the source dataset increases.
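One possible way to get the Singleton behaviour from the tip about LocalCluster is a module-level cached client, sketched below. This is an illustration rather than the implementation referred to above: the function names are hypothetical, and the `factory` argument exists only so the caching logic can be tried out without starting a real cluster.

```python
from typing import Any, Callable, Optional

_client: Optional[Any] = None  # the single shared client for this process


def _start_local_client() -> Any:
    """Start a local Dask cluster and return a client connected to it."""
    from dask.distributed import Client, LocalCluster  # lazy import

    cluster = LocalCluster()
    return Client(cluster)


def get_client(factory: Callable[[], Any] = _start_local_client) -> Any:
    """Return the shared client, creating it on the first call only."""
    global _client
    if _client is None:
        _client = factory()
    return _client
```

Every part of the code then calls `get_client()` instead of creating its own `LocalCluster`, so repeated calls reuse the same workers.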