Modern Route Optimization with Python | by Mauro Di Pietro | Jun, 2023


Setup

First of all, I need to import the following libraries:

## for data
import pandas as pd #1.1.5
import numpy as np #1.21.0

## for plotting
import matplotlib.pyplot as plt #3.3.2
import seaborn as sns #0.11.1
import folium #0.14.0
from folium import plugins
import plotly.express as px #5.1.0

## for simple routing
import osmnx as ox #1.2.2
import networkx as nx #3.0

## for advanced routing
from ortools.constraint_solver import pywrapcp #9.6
from ortools.constraint_solver import routing_enums_pb2

Then, I shall read the dataset (please note that for geospatial data Latitude=Y axis and Longitude=X axis):

city = "Hong Kong"

dtf = pd.read_csv('data_stores.csv')
dtf = dtf[dtf["City"]==city][
["City","Street Address","Latitude","Longitude"]
].reset_index(drop=True)
dtf = dtf.reset_index().rename(
columns={"index":"id", "Latitude":"y", "Longitude":"x"})

print("tot:", len(dtf))
dtf.head(3)

Image by author

Among these locations, I will pick one to be “depot” (the base), and calculate what’s the best way to serve all the other locations.

# pinpoint your starting location
i = 0
dtf["base"] = dtf["id"].apply(lambda x: 1 if x==i else 0)
start = dtf[dtf["base"]==1][["y","x"]].values[0]

print("start =", start)
dtf.head(3)

Image by author

Let’s plot the starting location along with the other data points:

plt.scatter(y=dtf["y"], x=dtf["x"], color="black")
plt.scatter(y=start[0], x=start[1], color="red")
plt.show()
Image by author

To make this more realistic, I’m going to display the data points as locations on a map. You can do that with Folium, a powerful library that uses HTML to create different types of interactive maps.

# setup
data = dtf.copy()
color = "base" #color based on this column
lst_colors = ["black","red"]
popup = "id" #popup based on this column

# base map
map_ = folium.Map(location=start, tiles="cartodbpositron", zoom_start=11)

# add colors
lst_elements = sorted(list(data[color].unique()))
data["color"] = data[color].apply(lambda x:
lst_colors[lst_elements.index(x)])

# add popup
data.apply(lambda row:
folium.CircleMarker(
location=[row["y"],row["x"]], popup=row[popup],
color=row["color"], fill=True, radius=5).add_to(map_),
axis=1)

# add full-screen button
plugins.Fullscreen(position="topright", title="Expand",
title_cancel="Exit", force_separate_button=True).add_to(map_)

# show
map_

Image by author

Basically, we need to find the most convenient way for the red point (depot) to serve all the other locations (customers).

# add lines
for i in range(len(dtf)):
points = [start, dtf[["y","x"]].iloc[i].tolist()]
folium.PolyLine(points, tooltip="Coast", color="red",
weight=0.5, opacity=0.5).add_to(map_)

map_

Image by author

A quick tip, if you want the option to change the map style, add the following code:

layers = ["cartodbpositron", "openstreetmap", "Stamen Terrain", 
"Stamen Water Color", "Stamen Toner", "cartodbdark_matter"]
for tile in layers:
folium.TileLayer(tile).add_to(map_)
folium.LayerControl(position='bottomright').add_to(map_)
map_
Image by author

Shortest Path

The most common approach for this kind of usecase is to consider the road network as a graph and find the shortest path between nodes.

We already have all the nodes (the location points in our dataset), but we’re missing the links (the streets connecting the points). Therefore, we need to get street map data with OSMnx, a super useful library that queries Open Street Map and converts the response into NetworkX graphs, which is the standard Python library to work with graph objects.

# create network graph
G = ox.graph_from_point(start, dist=10000,
network_type="drive") #'drive', 'bike', 'walk'
G = ox.add_edge_speeds(G)
G = ox.add_edge_travel_times(G)

# plot
fig, ax = ox.plot_graph(G, bgcolor="black", node_size=5,
node_color="white", figsize=(16,8))

Image by author

The graph object contains nodes and links extracted from the map. All those little points are nodes. If you want to see just the links, you can set node_size=0 :

Image by author

The nodes have this form…

image by author

… you can put them in a “geo-dataframe” like this…

# geo-dataframe (nodes)
print("nodes:", len(G.nodes()))
ox.graph_to_gdfs(G, nodes=True, edges=False).reset_index().head(3)
Image by author

… and the same goes for the links.

# geo-dataframe (links)
print("links:", len(G.edges()))
ox.graph_to_gdfs(G, nodes=False, edges=True).reset_index().head(3)
Image by author

Now that we have the graph, let’s understand how to move inside the network from node to node. We already have a starting point, so let’s pick a random destination… for example the closest node:

Image by author
end = dtf[dtf["id"]==68][["y","x"]].values[0]
print("locations: from", start, "--> to", end)

We have 2 locations, but in order to use the graph, we must get the equivalent nodes.

start_node = ox.distance.nearest_nodes(G, start[1], start[0])
end_node = ox.distance.nearest_nodes(G, end[1], end[0])
print("nodes: from", start_node, "--> to", end_node)

Hence, we can find the shortest path between the two nodes with Dijkstra’s algorithm. Basically, it calculates the overall route by finding the shortest path from one “neighborhood” of nodes to another, step by step.

Image by author

In Python, we can apply the algorithm directly with NetworkX. One can specify the attribute weight of the optimization, for instance we could prioritize the distance or the travel time. The shortest weighted path between 2 nodes is the one that minimizes the weight.

# calculate shortest path
path_lenght = nx.shortest_path(G, source=start_node, target=end_node,
method='dijkstra', weight='lenght')
print(path_lenght)

# plot on the graph
fig, ax = ox.plot_graph_route(G, path_lenght, route_color="red",
route_linewidth=5, node_size=1,
bgcolor='black', node_color="white",
figsize=(16,8))

Image by author

If we optimize for time instead:

# calculate shortest path
path_time = nx.shortest_path(G, source=start_node, target=end_node,
method='dijkstra', weight='travel_time')
print(path_time)

# plot on the graph
fig, ax = ox.plot_graph_route(G, path_time, route_color="blue",
route_linewidth=5, node_size=1,
bgcolor='black', node_color="white",
figsize=(16,8))

Image by author

We can compare the paths on the graph…

# plot on the graph
fig, ax = ox.plot_graph_routes(G, routes=[path_lenght, path_time],
route_colors=["red","blue"],
route_linewidth=5, node_size=1,
bgcolor='black', node_color="white",
figsize=(16,8))
Image by author

… or even better on the map using the combo OSMnx Folium:

# plot on the map
ox.plot_route_folium(G, route=path_lenght, route_map=map_,
color="red", weight=1)
ox.plot_route_folium(G, route=path_time, route_map=map_,
color="blue", weight=1)
map_
Image by author

Finally, we can select a path and simulate the driver that goes from node to node. We shall use Plotly, the famous graphing library for interactive plots, and Mapbox, a provider of custom online maps for famous websites (i.e. Lonely Planet and Financial Times). First, we must prepare the dataframe with route information, then create a Plotly animation.

lst_start, lst_end = [],[]
start_x, start_y = [],[]
end_x, end_y = [],[]
lst_length, lst_time = [],[]

for a,b in zip(route_time[:-1], route_time[1:]):
lst_start.append(a)
lst_end.append(b)
lst_length.append(round(G.edges[(a,b,0)]['length']))
lst_time.append(round(G.edges[(a,b,0)]['travel_time']))
start_x.append(G.nodes[a]['x'])
start_y.append(G.nodes[a]['y'])
end_x.append(G.nodes[b]['x'])
end_y.append(G.nodes[b]['y'])

df = pd.DataFrame(list(zip(lst_start, lst_end,
start_x, start_y, end_x, end_y,
lst_length, lst_time)),
columns=["start","end","start_x","start_y",
"end_x","end_y","length","travel_time"]
).reset_index().rename(columns={"index":"id"})

df.head()

Image by author
## create start/end df 
df_start = df[df["start"] == start_node]
df_end = df[df["end"] == end_node]

## create basic map
fig = px.scatter_mapbox(data_frame=df, lon="start_x", lat="start_y",
zoom=15, width=1000, height=800,
animation_frame="id",
mapbox_style="carto-positron")
## add driver
fig.data[0].marker = {"size":12}
## add start point
fig.add_trace(px.scatter_mapbox(data_frame=df_start,
lon="start_x", lat="start_y").data[0])
fig.data[1].marker = {"size":15, "color":"red"}
## add end point
fig.add_trace(px.scatter_mapbox(data_frame=df_end,
lon="start_x", lat="start_y").data[0])
fig.data[2].marker = {"size":15, "color":"green"}
## add route
fig.add_trace(px.line_mapbox(data_frame=df,
lon="start_x", lat="start_y").data[0])
fig

Image by author

Preprocessing

That was just a warm-up to learn how to find the path from one node to another. We still have to calculate a route to visit all the locations. These problems typically follow a set formula: generate the distance matrix of shortest-path costs between all locations, construct an initial solution, and improve it by a sequence of iterations.

## get the node for each location
dtf["node"] = dtf[["y","x"]].apply(lambda x:
ox.distance.nearest_nodes(G, x[1], x[0]),
axis=1)
dtf = dtf.drop_duplicates("node", keep='first')
dtf.head()
Image by author

Therefore, before applying any model, the first thing to do is calculate the distance matrix between all the locations in our dataset. We can do that by finding the distance of the shortest path between each node we are interested in.

## distance length function
def f(a,b):
try:
d = nx.shortest_path_length(G, source=a, target=b,
method='dijkstra',
weight='travel_time')
except:
d = np.nan
return d

## apply the function
distance_matrix = np.asarray([[f(a,b) for b in dtf["node"].tolist()]
for a in dtf["node"].tolist()])
distance_matrix = pd.DataFrame(distance_matrix,
columns=dtf["node"].values,
index=dtf["node"].values)
distance_matrix.head()

Image by author

It’s essential to check if there are NaNs, 0s, and Inf values:

heatmap = distance_matrix.copy()
for col in heatmap.columns:
heatmap[col] = heatmap[col].apply(lambda x:
0.3 if pd.isnull(x) else #nan -> purple
(0.7 if np.isinf(x) else #inf -> orange
(0 if x!=0 else 1) )) # 0 -> white

fig, ax = plt.subplots(figsize=(10,5))
sns.heatmap(heatmap, vmin=0, vmax=1, cbar=False, ax=ax)
plt.show()

Image by author

We have the 0s in the right place (the diagonal), and there is no Inf, although we see some NaNs… so we gotta deal with them. This step is quite delicate, as the distance matrix impacts any routing model one can use. Usually, I replace the missing values with the average distance of the row.

# fillna with row average
distance_matrix = distance_matrix.T.fillna(distance_matrix.mean(axis=1)).T

# fillna with overall average
distance_matrix = distance_matrix.fillna(distance_matrix.mean().mean())

Image by author

We shall now start working on the Routing Optimization models.

Traveling Salesman Problem

If we consider just the distance that a single driver has to go, the optimal route is the collection of the shortest paths from node to node, so basically it’s the shortest route.

## Business parameters
drivers = 1

lst_nodes = dtf["node"].tolist()
print("start:", start_node, "| tot locations to visit:",
len(lst_nodes)-1, "| drivers:", drivers)

The most advanced Python library is OR-Tools, developed by Google for solving linear programming and related optimization problems. That’s such a powerful tool because it makes use of many techniques, one is Conflict-Driven Clause Learning which is similar to a Reinforcement Learning algorithm. To put it in simple terms, it learns from conflicts during the search for a satisfying solution and tries to avoid repeating the same conflict.

First of all, you must define the index manager that keeps track of the nodes indexing, and the routing model which is the main OR-Tools object.

manager = pywrapcp.RoutingIndexManager(len(lst_nodes), 
drivers,
lst_nodes.index(start_node))
model = pywrapcp.RoutingModel(manager)

Then, we need to add the cost function for each step which will be minimized. In our case, it’s the distance…

def get_distance(from_index, to_index):
return distance_matrix.iloc[from_index,to_index]

distance = model.RegisterTransitCallback(get_distance)
model.SetArcCostEvaluatorOfAllVehicles(distance)

… and specify the strategy.

parameters = pywrapcp.DefaultRoutingSearchParameters()
parameters.first_solution_strategy = (
routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
)

Finally, solve the problem and print the solution:

solution = model.SolveWithParameters(parameters)

index = model.Start(0)
print('Route for driver:')
route_idx, route_distance = [], 0
while not model.IsEnd(index):
route_idx.append( manager.IndexToNode(index) )
previous_index = index
index = solution.Value( model.NextVar(index) )
### update distance
try:
route_distance += get_distance(previous_index, index)
except:
route_distance += model.GetArcCostForVehicle(
from_index=previous_index,
to_index=index,
vehicle=0)

print(route_idx)
print(f'Total distance: {round(route_distance/1000,2)} km')
print(f'Nodes visited: {len(route_idx)}')

Let’s transform the route from a sequence of indexes to a sequence of nodes:

print("Route for driver (nodes):")
lst_route = [lst_nodes[i] for i in route_idx]
print(lst_route)

Thanks to that, we can plot the route on the map:

# Get path between nodes
def get_path_between_nodes(lst_route):
lst_paths = []
for i in range(len(lst_route)):
try:
a, b = lst_nodes[i], lst_nodes[i+1]
except:
break
try:
path = nx.shortest_path(G, source=a, target=b,
method='dijkstra',
weight='travel_time')
if len(path) > 1:
lst_paths.append(path)
except:
continue
return lst_paths

lst_paths = get_path_between_nodes(lst_route)

# Add paths on the map
for path in lst_paths:
ox.plot_route_folium(G, route=path, route_map=map_,
color="blue", weight=1)
map_

Image by author

Vehicle Routing Problem

Sadly, the real world isn’t that simple as companies have more business constraints. Consequently, there are many variants of the Vehicle Routing Problem:

  • Capacitated Vehicle Routing Problem: vehicles have a limited carrying capacity for the goods that must be delivered.
  • Vehicle Routing Problem with Time Windows: the delivery locations have time windows within the deliveries that must be made.
  • Vehicle Routing Problem with Pickup and Delivery: goods need to be moved from certain pickup locations to other delivery locations.
  • Vehicle Routing Problem with Profits: it is not mandatory for vehicles to visit all nodes, the aim is to maximize the sum of collected profits.

For this usecase, I’m going to introduce limitations for the drivers in both carrying capacity and coverable distance.

## Business parameters
drivers = 3
driver_capacities = [20,20,20]
demands = [0] + [1]*(len(lst_nodes)-1)
max_distance = 1000

Just like we did before, we need to create the manager, the model, and add the distance function:

## model
manager = pywrapcp.RoutingIndexManager(len(lst_nodes),
drivers,
lst_nodes.index(start_node))
model = pywrapcp.RoutingModel(manager)

## add distance (cost)
def get_distance(from_index, to_index):
return distance_matrix.iloc[from_index,to_index]

distance = model.RegisterTransitCallback(get_distance)
model.SetArcCostEvaluatorOfAllVehicles(distance)

However, this time we must include the new business constraints:

## add capacity (costraint)
def get_demand(from_index):
return demands[from_index]

demand = model.RegisterUnaryTransitCallback(get_demand)
model.AddDimensionWithVehicleCapacity(demand, slack_max=0,
vehicle_capacities=driver_capacities,
fix_start_cumul_to_zero=True,
name='Capacity')

## add limited distance (costraint)
name = 'Distance'
model.AddDimension(distance, slack_max=0, capacity=max_distance,
fix_start_cumul_to_zero=True, name=name)
distance_dimension = model.GetDimensionOrDie(name)
distance_dimension.SetGlobalSpanCostCoefficient(100)

## set strategy to minimize cost
parameters = pywrapcp.DefaultRoutingSearchParameters()
parameters.first_solution_strategy = (
routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
)
solution = model.SolveWithParameters(parameters)

Finally, solve the problem and print the solution:

solution = model.SolveWithParameters(parameters)

dic_routes_idx, total_distance, total_load = {}, 0, 0
for driver in range(drivers):
print(f'Route for driver {driver}:')
index = model.Start(driver)
route_idx, route_distance, route_load = [], 0, 0
while not model.IsEnd(index):
node_index = manager.IndexToNode(index)
route_idx.append( manager.IndexToNode(index) )
previous_index = index
index = solution.Value( model.NextVar(index) )
### update distance
try:
route_distance += get_distance(previous_index, index)
except:
route_distance += model.GetArcCostForVehicle(
from_index=previous_index,
to_index=index,
vehicle=driver)
### update load
route_load += demands[node_index]

route_idx.append( manager.IndexToNode(index) )
print(route_idx)
dic_routes_idx[driver] = route_idx
print(f'distance: {round(route_distance/1000,2)} km')
print(f'load: {round(route_load,2)}', "n")
total_distance += route_distance
total_load += route_load

print(f'Total distance: {round(total_distance/1000,2)} km')
print(f'Total load: {total_load}')

# Convert from idx to nodes
dic_route = {}
for k,v in dic_routes_idx.items():
print(f"Route for driver {k} (nodes):")
dic_route[k] = [lst_nodes[i] for i in v]
print(dic_route[k], "n")

Let’s visualize the routes on the map:

# Get path between nodes
dic_paths = {k:get_path_between_nodes(v) for k,v in dic_route.items()}

# Add paths on the map
lst_colors = ["red","green","blue"]
for k,v in dic_paths.items():
for path in v:
ox.plot_route_folium(G, route=path, route_map=map_,
color=lst_colors[k], weight=1)
map_

Image by author

At last, to end this article in style, let’s run the simulation to see our drivers getting to work. First, get the appropriate dataframe…

def df_animation_multiple_path(G, lst_paths, parallel=True):
df = pd.DataFrame()
for path in lst_paths:
lst_start, lst_end = [],[]
start_x, start_y = [],[]
end_x, end_y = [],[]
lst_length, lst_time = [],[]

for a,b in zip(path[:-1], path[1:]):
lst_start.append(a)
lst_end.append(b)
lst_length.append(round(G.edges[(a,b,0)]['length']))
lst_time.append(round(G.edges[(a,b,0)]['travel_time']))
start_x.append(G.nodes[a]['x'])
start_y.append(G.nodes[a]['y'])
end_x.append(G.nodes[b]['x'])
end_y.append(G.nodes[b]['y'])

tmp = pd.DataFrame(list(zip(lst_start, lst_end,
start_x, start_y,
end_x, end_y,
lst_length, lst_time)),
columns=["start","end","start_x","start_y",
"end_x","end_y","length","travel_time"]
)
df = pd.concat([df,tmp], ignore_index=(not parallel))

df = df.reset_index().rename(columns={"index":"id"})
return df

… second, plot the animation.

df = pd.DataFrame()
for driver,lst_paths in dic_paths.items():
tmp = df_animation_multiple_path(G, lst_paths, parallel=False)
df = pd.concat([df,tmp], axis=0)

first_node, last_node = lst_paths[0][0], lst_paths[-1][-1]
plot_animation(df, first_node, last_node)

Image by author

Conclusion

This article has been a tutorial to demonstrate how to perform Route Optimization with Python. First of all, we learn how to visualize the geo-dataset on a map and build the network graph. Then, I showed how to approach both Traveling Salesman Problem by finding the shortest route for a driver, and Vehicle Routing Problem by finding the cheapest route for many drivers.

On top of that, now you know how to produce interactive plots and cool animations using geospatial data.

I hope you enjoyed it! Feel free to contact me for questions and feedback or just to share your interesting projects.

👉 Let’s Connect 👈



Source link

Leave a Comment