Developing Scientific Software. Part 2: Practical Aspects with Python | by Carlos Costa, Ph.D. | Jul, 2023

Part 2: Practical Aspects with Python

In this article we will follow the tenets of TDD for developing Scientific Software as laid out in the first installment of this series to develop an edge detection filter known as the Sobel filter.

In the first article, we talked about how important — and tricky — it can be to develop reliable testing methods for software which the complex problems often found in scientific software. We also saw how to overcome those issues by following a development cycle inspired by TDD, but adapted for scientific computing. I reproduce a shortened version of these instructions below.

Implementation cycle

1. Gather requirements
2. Sketch the design
3. Implement initial tests
5. Build an oracle library
6. Revisit tests
7. Implement profiling

1. Optimize
2. Reimplement

New method cycle

1. Implement new method
2. Validate against previous curated oracles

In this article, we will follow the above instructions to develop a function which applies the Sobel filter. The Sobel filter is a commonly used computer vision tool to detect or enhance edges in images. Keep reading to see some examples!

Starting with the first step, we gather some requirements. We will follow the standard formulation of the Sobel filter described in this article. Simply put, the Sobel operator consists of convolving image A with the following two 3 × 3 kernels, squaring each pixel in the resulting images, summing them and taking the point-by-point square root. If Ax and Ay are the images resulting from the convolutions, the result of the Sobel filter S is √(Ax² + Ay²).

We want this function to take any 2D array and generate another 2D array. We might want it to operate on any two axes of an ndarray. We might even want it to work on more (or less) than two axes. We might have specifications for how to handle the edges of the array.

For now let’s remember to keep it simple, and start with a 2D implementation. But before we do that, let’s sketch the design.

We start with a simple design, leveraging Python’s annotations. I highly recommend annotating as much as possible, and using mypy as a linter.

`from typing import Tuplefrom numpy.core.multiarray import normalize_axis_indexfrom numpy.typing import NDArraydef sobel(arr: NDArray, axes: Tuple[int, int] = (-2, -1)) -> NDArray:"""Compute the Sobel filter of an imageParameters----------arr : NDArrayInput imageaxes : Tuple[int, int], optionalAxes over which to compute the filter, by default (-2, -1)Returns-------NDArrayOutput"""# Only accepts 2D arraysif arr.ndim != 2:raise NotImplementedError# Ensure that the axis[0] is the first axis, and axis[1] is the second# axis. The obscure `normalize_axis_index` converts negative indices to# indices between 0 and arr.ndim - 1.if any(normalize_axis_index(ax, arr.ndim) != ifor i, ax in zip(range(2), axes)):raise NotImplementedErrorpass`

This function doesn’t do much. But it is documented, annotated, and its current limitations are baked into it. Now that we have a design, we immediately pivot to tests.

First, we notice that empty images (all zeroes) have no edges. So they must output zeroes as well. In fact, any constant image should also return zeros. Let’s bake that into out first tests. We will also see how we can use monkey testing to test many arrays.

`# test_zero_constant.pyimport numpy as npimport pytest# Test multiple dtypes at once@pytest.mark.parametrize("dtype",["float16", "float32", "float64", "float128"],)def test_zero(dtype):# Set random seedseed = int(np.random.rand() * (2**32 - 1))np.random.seed(seed)# Create a 2D array of random shape and fill with zerosnx, ny = np.random.randint(3, 100, size=(2,))arr = np.zeros((nx, ny), dtype=dtype)# Apply sobel functionarr_sob = sobel(arr)# `assert_array_equal` should fail most of the times.# It will only work when `arr_sob` is identically zero,# which is usually not the case.# DO NOT USE!# np.testing.assert_array_equal(#     arr_sob, 0.0, err_msg=f"{seed=} {nx=}, {ny=}"# )# `assert_almost_equal` can fail when used with high decimals.# It also relies on float64 checking, which might fail for# float128 types.# DO NOT USE!# np.testing.assert_almost_equal(#     arr_sob,#     np.zeros_like(arr),#     err_msg=f"{seed=} {nx=}, {ny=}",#     decimal=4,# )# `assert_allclose` with custom tolerance is my preferred method# The 10 is arbitrary and depends on the problem. If a method# which you know to be correct does not pass, increase to 100, etc.# If the tolerance needed to make the tests pass is too high, make# sure the method is actually correct.tol = 10 * np.finfo(arr.dtype).epserr_msg = f"{seed=} {nx=}, {ny=} {tol=}"  # Log seeds and other infonp.testing.assert_allclose(arr_sob,np.zeros_like(arr),err_msg=err_msg,atol=tol,  # rtol is useless for desired=zeros)@pytest.mark.parametrize("dtype", ["float16", "float32", "float64", "float128"])def test_constant(dtype):seed = int(np.random.rand() * (2**32 - 1))np.random.seed(seed)nx, ny = np.random.randint(3, 100, size=(2,))constant = np.random.randn(1).item()arr = np.full((nx, ny), fill_value=constant, dtype=dtype)arr_sob = sobel(arr)tol = 10 * np.finfo(arr.dtype).epserr_msg = f"{seed=} {nx=}, {ny=} {tol=}"np.testing.assert_allclose(arr_sob,np.zeros_like(arr),err_msg=err_msg,atol=tol,  # rtol is useless for desired=zeros)`

This code snippet can be run from the command-line with

`\$ pytest -qq -s -x -vv --durations=0 test_zero_constant.py`

Of course our tests will currently fail, but they are enough for now. Let’s implement a first version.

`from typing import Tupleimport numpy as npfrom numpy.core.multiarray import normalize_axis_indexfrom numpy.typing import NDArraydef sobel(arr: NDArray, axes: Tuple[int, int] = (-2, -1)) -> NDArray:if arr.ndim != 2:raise NotImplementedErrorif any(normalize_axis_index(ax, arr.ndim) != ifor i, ax in zip(range(2), axes)):raise NotImplementedError# Define our filter kernels. Notice they inherit the input type, so# that a float32 input never has to be cast to float64 for computation.# But can you see where using another dtype for Gx and Gy might make# sense for some input dtypes?Gx = np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]],dtype=arr.dtype,)Gy = np.array([[-1, -2, -1], [0, 0, 0], [1, 2, 1]],dtype=arr.dtype,)# Create the output array and fill with zeroess = np.zeros_like(arr)for ix in range(1, arr.shape[0] - 1):for iy in range(1, arr.shape[1] - 1):# Pointwise multiplication followed by sum, aka convolutions1 = (Gx * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()s2 = (Gy * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()s[ix, iy] = np.hypot(s1, s2)  # np.sqrt(s1**2 + s2**2)return s`

With this new function, all our tests should pass, and we should get an output like this:

`\$ pytest -qq -s -x -vv --durations=0 test_zero_constant.py........======================================== slowest durations =========================================0.09s call     t_049988eae7f94139a7067f142bf2852f.py::test_constant[float16]0.08s call     t_049988eae7f94139a7067f142bf2852f.py::test_zero[float64]0.06s call     t_049988eae7f94139a7067f142bf2852f.py::test_constant[float128]0.04s call     t_049988eae7f94139a7067f142bf2852f.py::test_zero[float128]0.04s call     t_049988eae7f94139a7067f142bf2852f.py::test_constant[float64]0.02s call     t_049988eae7f94139a7067f142bf2852f.py::test_constant[float32]0.01s call     t_049988eae7f94139a7067f142bf2852f.py::test_zero[float16](17 durations < 0.005s hidden.  Use -vv to show these durations.)8 passed in 0.35s`

We have so far:

1. Gathered requirements of our problem.
2. Sketched an initial design.
3. Implemented some tests.
4. Implemented the alpha version, which passes these tests.

These tests provide verification for future versions, as well as a very rudimentary oracle library. But while unit tests are great at capturing minute deviations from expected results, they are not great at differentiating wrong results from quantitatively different — but still correct — results. Suppose tomorrow we want to implement another Sobel-type operator, which has a longer kernel. We don’t expect the results to match exactly to our current version, but we do expect the outputs of both functions to be qualitatively very similar.

In addition, it is excellent practice to try many different inputs to our functions to get a sense of how they behave for different inputs. This ensures that we scientifically validate the results.

Scikit-image has an excellent library of images, which we can use to create oracles.

`# !pip installscikit-image poochfrom typing import Dict, Callableimport numpy as npimport skimage.databwimages: Dict[str, np.ndarray] = {}for attrname in skimage.data.__all__:attr = getattr(skimage.data, attrname)# Data are obtained via function callsif isinstance(attr, Callable):try:# Download the datadata = attr()# Ensure it is a 2D arrayif isinstance(data, np.ndarray) and data.ndim == 2:# Convert from various int types to float32 to better# assess precisionbwimages[attrname] = data.astype(np.float32)except:continue# Apply sobel to imagesbwimages_sobel = {k: sobel(v) for k, v in bwimages.items()}`

Once we have the data, we can plot it.

`import matplotlib.pyplot as pltfrom mpl_toolkits.axes_grid1 import make_axes_locatabledef create_colorbar(im, ax):divider = make_axes_locatable(ax)cax = divider.append_axes("right", size="5%", pad=0.1)cb = ax.get_figure().colorbar(im, cax=cax, orientation="vertical")return cax, cbfor name, data in bwimages.items():fig, axs = plt.subplots(1, 2, figsize=(10, 4), sharex=True, sharey=True)im = axs[0].imshow(data, aspect="equal", cmap="gray")create_colorbar(im, axs[0])axs[0].set(title=name)im = axs[1].imshow(bwimages_sobel[name], aspect="equal", cmap="gray")create_colorbar(im, axs[1])axs[1].set(title=f"{name} sobel")`

These look really good! I would recommend storing these, both as arrays and as figures which I can quickly check against for a new version. I highly recommend HD5F for array storage. You can also set up a Sphinx Gallery to directly generate the figures them when updating documentation, that way you have a reproducible figure build that you can use to check against previous versions.

After the results have been validated, you can store them on disk and use them as part of your unit testing. Something like this:

`oracle_library = [(k, v, bwimages_sobel[k]) for k, v in bwimages.items()]# save_to_disk(oracle_library, ...)`
`# test_oracle.pyimport numpy as npimport pytestfrom numpy.typing import NDArray# oracle_library = read_from_disk(...)@pytest.mark.parametrize("name,input,output", oracle_library)def test_oracles(name: str, input: NDArray, output: NDArray):output_new = sobel(input)tol = 10 * np.finfo(input.dtype).epsmean_avg_error = np.abs(output_new - output).mean()np.testing.assert_allclose(output_new,output,err_msg=f"{name=} {tol=} {mean_avg_error=}",atol=tol,rtol=tol,)`

Computing the Sobel filter for these datasets took a while! So the next step is to profile the code. I recommend creating a `benchmark_xyz.py` file for each test, and re-run them regularly to probe for performance regressions. This can even be part of your unit testing, but we won’t go so far in this example. Another idea is to use the oracles above for benchmarking.

There are many ways of timing code execution. To get the system-wide, “real” elapsed time, use `perf_counter_ns` from the built-in `time` module to measure time in nanoseconds. In a Jupyter notebook, the built-in `%%timeit` cell magic times a certain cell execution. The decorator below is inspired by this cell magic and can be used to time any function.

`import timefrom functools import wrapsfrom typing import Callable, Optionaldef sizeof_fmt(num, suffix="s"):for unit in ["n", "μ", "m"]:if abs(num) < 1000:return f"{num:3.1f} {unit}{suffix}"num /= 1000return f"{num:.1f}{suffix}"def timeit(func_or_number: Optional[Callable] = None,number: int = 10,) -> Callable:"""Apply to a function to time its executions.Parameters----------func_or_number : Optional[Callable], optionalFunction to be decorated or `number` argument (see below), bydefault Nonenumber : int, optionalNumber of times the function will run to obtain statistics, bydefault 10Returns-------CallableWhen fed a function, returns the decorated function. Otherwisereturns a decorator.Examples--------.. code-block:: python@timeitdef my_fun():pass@timeit(100)def my_fun():pass@timeit(number=3)def my_fun():pass"""if isinstance(func_or_number, Callable):func = func_or_numbernumber = numberelif isinstance(func_or_number, int):func = Nonenumber = func_or_numberelse:func = Nonenumber = numberdef decorator(f):@wraps(f)def wrapper(*args, **kwargs):runs_ns = np.empty((number,))# Run first and measure store the resultstart_time = time.perf_counter_ns()result = f(*args, **kwargs)runs_ns[0] = time.perf_counter_ns() - start_timefor i in range(1, number):start_time = time.perf_counter_ns()f(*args, **kwargs)  # Without storage, fasterruns_ns[i] = time.perf_counter_ns() - start_timetime_msg = f"{sizeof_fmt(runs_ns.mean())} ± "time_msg += f"{sizeof_fmt(runs_ns.std())}"print(f"{time_msg} per run (mean ± std. dev. of {number} runs)")return resultreturn wrapperif func is not None:return decorator(func)return decorator`

Putting our function to the test:

`arr_test = np.random.randn(500, 500)sobel_timed = timeit(sobel)sobel_timed(arr_test);# 3.9s ± 848.9 ms per run (mean ± std. dev. of 10 runs)`

Not too fast 🙁

When investigating slowness or performance regressions, it is also possible to track the runtime of each line. The `line_profiler` library is an excellent resource for this. It can be used via Jupyter cell magic, or using the API. Here is an API example:

`from line_profiler import LineProfilerlp = LineProfiler()lp_wrapper = lp(sobel)lp_wrapper(arr_test)lp.print_stats(output_unit=1)  # 1 for seconds, 1e-3 for milliseconds, etc.`

Here is an example output:

`Timer unit: 1 sTotal time: 4.27197 sFile: /tmp/ipykernel_521529/1313985340.pyFunction: sobel at line 8Line #      Hits         Time  Per Hit   % Time  Line Contents==============================================================8                                           def sobel(arr: NDArray, axes: Tuple[int, int] = (-2, -1)) -> NDArray:9                                               # Only accepts 2D arrays10         1          0.0      0.0      0.0      if arr.ndim != 2:11                                                   raise NotImplementedError12                                           13                                               # Ensure that the axis[0] is the first axis, and axis[1] is the second14                                               # axis. The obscure `normalize_axis_index` converts negative indices to15                                               # indices between 0 and arr.ndim - 1.16         1          0.0      0.0      0.0      if any(17                                                   normalize_axis_index(ax, arr.ndim) != i18         1          0.0      0.0      0.0          for i, ax in zip(range(2), axes)19                                               ):20                                                   raise NotImplementedError21                                           22         1          0.0      0.0      0.0      Gx = np.array(23         1          0.0      0.0      0.0          [[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]],24         1          0.0      0.0      0.0          dtype=arr.dtype,25                                               )26         1          0.0      0.0      0.0      Gy = np.array(27         1          0.0      0.0      0.0          [[-1, -2, -1], [0, 0, 0], [1, 2, 1]],28         1          0.0      0.0      0.0          dtype=arr.dtype,29                                               )30         1          0.0      0.0      0.0      s = np.zeros_like(arr)31       498          0.0      0.0      0.0      for ix in range(1, arr.shape[0] - 1):32    248004          0.1      0.0      2.2          for iy in range(1, arr.shape[1] - 1):33    248004          1.8      0.0     41.5              s1 = (Gx * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()34    248004          1.7      0.0     39.5              s2 = (Gy * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()35    248004          0.7      0.0     16.8              s[ix, iy] = np.hypot(s1, s2)36         1          0.0      0.0      0.0      return s`

Lot’s of time is spend inside the innermost loop. NumPy prefers vectorized math, as it can then rely on compiled code. Since we are using explicit for loops, it makes sense that this function is very slow.

In addition, it is important to be smart about memory allocations inside of loops. NumPy is somewhat smart about allocating small amounts of memory inside loops, so the lines defining `s1` or `s2` might not be allocating new memory. But they also could be, or there could be some other memory allocation that is happening under the hood that we are not aware of. I therefore recommend also profiling memory. I like using Memray for that, but there are others like Fil and Sciagraph. I would also avoid memory_profiler which (very unfortunately!) is no longer maintained. Also Memray is much more powerful. Here is an example of Memray in action:

`\$ # Create sobel.bin which holds the profiling information\$ memray run -fo sobel.bin --trace-python-allocators sobel_run.pyWriting profile results into sobel.binMemray WARNING: Correcting symbol for aligned_alloc from 0x7fc5c984d8f0 to 0x7fc5ca4a5ce0[memray] Successfully generated profile results.You can now generate reports from the stored allocation records.Some example commands to generate reports:python3 -m memray flamegraph sobel.bin`
`\$ # Generate flame graph\$ memray flamegraph -fo sobel_flamegraph.html --temporary-allocations sobel.bin⠙ Calculating high watermark... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸  99% 0:00:0100:01⠏ Processing allocation records... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸  98% 0:00:0100:01Wrote sobel_flamegraph.html`
`\$ # Show memory tree\$ memray tree --temporary-allocations sobel.bin⠧ Calculating high watermark... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸ 100% 0:00:0100:01⠧ Processing allocation records... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸ 100% 0:00:0100:01Allocation metadata-------------------Command line arguments: 'memray run -fo sobel.bin --trace-python-allocators sobel_run.py'Peak memory size: 11.719MBNumber of allocations: 15332714Biggest 10 allocations:-----------------------📂 123.755MB (100.00 %) <ROOT>  └── [[3 frames hidden in 2 file(s)]]└── 📂 123.755MB (100.00 %) _run_code  /usr/lib/python3.10/runpy.py:86├── 📂 122.988MB (99.38 %) <module>  sobel_run.py:40│   ├── 📄 51.087MB (41.28 %) sobel  sobel_run.py:35│   ├── [[1 frames hidden in 1 file(s)]]│   │   └── 📄 18.922MB (15.29 %) _sum  │   │       lib/python3.10/site-packages/numpy/core/_methods.py:49│   └── [[1 frames hidden in 1 file(s)]]│       └── 📄 18.921MB (15.29 %) _sum  │           lib/python3.10/site-packages/numpy/core/_methods.py:49...`

Now that we have a working alpha and some profiling functions, we will leverage the SciPy library to obtain a much faster version.

`from typing import Tupleimport numpy as npfrom numpy.core.multiarray import normalize_axis_indexfrom numpy.typing import NDArrayfrom scipy.signal import convolve2ddef sobel_conv2d(arr: NDArray, axes: Tuple[int, int] = (-2, -1)) -> NDArray:if arr.ndim != 2:raise NotImplementedErrorif any(normalize_axis_index(ax, arr.ndim) != ifor i, ax in zip(range(2), axes)):raise NotImplementedError# Create the kernels as a single, complex array. Allows us to use# np.abs instead of np.hypot to calculate the magnitude.G = np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]],dtype=arr.dtype,)G = G + 1j * np.array([[-1, -2, -1], [0, 0, 0], [1, 2, 1]],dtype=arr.dtype,)s = convolve2d(arr, G, mode="same")np.absolute(s, out=s)  # In-place absreturn s.real`
`sobel_timed = timeit(sobel_conv2d)sobel_timed(arr_test)# 14.3 ms ± 1.71 ms per loop (mean ± std. dev. of 10 runs)`

Much better! But is it right?

The images look very similar, but if you notice the color scale, they are not the same. Running the tests flags a small mean average error. Thankfully, we are now well-equipped at detecting quantitative and qualitative differences.

After investigating this bug, we attribute it to the different boundary conditions. Looking into `convolve2d` documentation tells us that the input array is padded with zeroes before applying the kernel. In the alpha, we padded the output!

Which one is right? Arguably the SciPy implementation makes more sense. In this case we should adopt the new version as the oracle, and if required, fix the alpha version to match it. This is common in scientific software development: new information of how to do things better changes the oracles and the tests.

In this case, the fix is easy, pad the array with zeros prior to processing it.

`def sobel_v2(arr: NDArray, axes: Tuple[int, int] = (-2, -1)) -> NDArray:# ...arr = np.pad(arr, (1,))  # After padding, it is shaped (nx + 2, ny + 2)s = np.zeros_like(arr)for ix in range(1, arr.shape[0] - 1):for iy in range(1, arr.shape[1] - 1):s1 = (Gx * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()s2 = (Gy * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()s[ix - 1, iy - 1] = np.hypot(s1, s2)  # Adjust indicesreturn s`

Once we corrected out function, we can update the oracles and tests which rely on them.

We saw how to put in practice some of the software development ideas explored in this article. We also saw some tools which you can use to develop high-quality, high-performance code.

I suggest you try some of these ideas on your own projects. In particular, practice profiling code and improving it. The Sobel filter function we coded is very inefficient, I suggest trying to improve it.

For example, try CPU parallelization with a JIT compiler such as Numba, porting the inner loop into Cython, or implementing a CUDA GPU function with Numba or CuPy. Feel free to check out my series on coding CUDA kernels with Numba.