Data Engineering: Fast Spatial Joins Across ~2 Billion Rows on a Single Old GPU | by Daniel Voyce | May, 2023


Challenges in Dask Distributed Implementation

To use cuSpatial’s spatial join function “point_in_polygon”, the latitude and longitude points must be stored as an interleaved array:

# interleaved
[lon, lat, lon, lat, lon, lat, ...]

# instead of
[[lon, lat], [lon, lat], [lon, lat], ...]

This specific arrangement is likely due to the efficiency of GPU stream processing for such data structures. The main challenge was handling a dataframe larger than the GPU memory and transforming it into the required interleaved array format.
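For a cuDF frame that already fits on the GPU, the interleave itself is a one-liner. The sketch below uses assumed lon/lat column names and cuDF’s interleave_columns to produce the flat layout shown above:

import cudf

df = cudf.DataFrame({"lon": [0.1, 0.2, 0.3], "lat": [51.5, 51.6, 51.7]})

# [[lon, lat], [lon, lat], ...]  ->  [lon, lat, lon, lat, ...]
interleaved = df[["lon", "lat"]].interleave_columns()

print(interleaved.to_pandas().tolist())
# [0.1, 51.5, 0.2, 51.6, 0.3, 51.7]

The hard part, as described next, is doing this when the points do not fit on the GPU in the first place.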

Initially, I attempted to use dask_cudf to partition the points data I was reading, hoping that it would be sufficient to execute the point_in_polygon function. However, I soon realised that the need to interleave the points made it impossible unless the points (10GB) could fit into the GPU memory (6GB).

In hindsight the solution was obvious: use map_partitions to chunk through the data in the dask_cudf frame. The interleaving had to be done inside the function passed to map_partitions, rather than before it, so that only one partition was interleaved at a time instead of the whole dataframe.

Even this wasn’t as simple as it sounds, because Dask doesn’t like other functions nested inside the function passed to map_partitions (it causes problems with Pickle serialisation), which is what ultimately led to the “wrapped_spatial_join” function.
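A minimal sketch of that pattern is below. The column names, file path and polygons variable are assumptions for illustration, and the cuSpatial calls reflect the GeoSeries-based API (GeoSeries.from_points_xy and point_in_polygon), so treat it as a sketch rather than the exact production code. The important points are that the interleave happens inside the partition-level function, and that the function is defined at module level so it can be pickled cleanly.

import cuspatial
import dask_cudf

# Polygons are small enough to prepare once on the GPU
# (polygons_gdf is a hypothetical GeoPandas GeoDataFrame).
polygons = cuspatial.from_geopandas(polygons_gdf.geometry)

def wrapped_spatial_join(partition):
    # Interleave lon/lat inside the partition-level function so that only
    # one partition's points ever needs to be materialised on the GPU.
    interleaved = partition[["lon", "lat"]].interleave_columns()
    points = cuspatial.GeoSeries.from_points_xy(interleaved)

    # point_in_polygon returns one boolean column per polygon
    # (cuSpatial caps the polygon count per call, so big polygon
    # sets may need batching).
    result = cuspatial.point_in_polygon(points, polygons)

    # Collapse to a single flag: is the point inside any polygon?
    partition = partition.copy()
    partition["in_polygon"] = result.values.any(axis=1)
    return partition

ddf = dask_cudf.read_parquet("points.parquet")  # hypothetical path
joined = ddf.map_partitions(wrapped_spatial_join)

In practice you would probably also pass a meta= hint to map_partitions so Dask doesn’t have to probe the function with an empty frame, but the shape of the solution is the same.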

Data size and type challenges

Another issue encountered was the size limitation of the dataset for cuDF, which is restricted by the maximum value of a signed 32-bit integer (2,147,483,647). My dataset consisted of approximately 2.3 billion records, exceeding this limit.

This meant it was impossible to process the entire dataset at once, so every operation had to be completed per partition. One example was the final count of points inside and outside the polygons: a straightforward result.value_counts() could not be used, and a separate count had to be performed for each partition before aggregating the results.
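A sketch of that partition-wise counting, assuming the joined frame from the earlier sketch carries the hypothetical in_polygon flag:

def count_flags(partition):
    # Each partition's counts are tiny and comfortably fit on the GPU.
    return partition["in_polygon"].value_counts()

counts = joined.map_partitions(count_flags).compute()

# The concatenated result has one True/False row per partition;
# the per-partition counts are small enough to aggregate on the host.
totals = counts.to_pandas().groupby(level=0).sum()
print(totals)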

This consideration becomes particularly important for larger or wider datasets that require complex calculations across them, where working on the full dataset may not be feasible. This was really the main focus of the experiment: now that this works on a small GPU, you can be confident that the same processes can be applied to larger GPUs and much larger datasets!

In terms of performance, Parquet took longer to convert but was more efficient once the conversion was complete. Depending on the workflow, this may be a critical factor to consider. Generally, data only needs to be converted once but may be subjected to multiple analyses, making Parquet the more suitable option.

On the other hand, ORC conversion was faster but less efficient during calculations. For more ad-hoc computations, ORC may be preferable, especially when used with a tool like TrinoDB that has a highly efficient ORC engine.

Both formats exhibited strong performance on limited hardware, and with some basic optimisation that performance could likely be improved further. Even so, it is valuable to see the baseline performance of each format.


