I wanted a distributed system where the client submitting the job has the code and logic, but the workers don’t. They just execute the workload they get. I had used Dask in the LocalCluster mode, where everything runs on the same computer but in different processes/threads. It works well, and I have used it a bunch of times. But this time, I wanted many heterogeneous computing systems to come together and execute the work aka Dask Distributed.
Introduction
I knew Dask supports this through a scheduler and worker model. But I had not tried this. So I did, and it works pretty well. But you need to take care of some additional stuff.
- The client, scheduler, and workers should all use the same version of Python and the same versions of the Python packages. So it’s better to pin both the packages and the Python version. I have used Podman/Docker to run the scheduler and the workers, so I have tied down the image version and hence the versions of Python, Dask, Distributed, and their dependencies. I have used identical versions for my client. See the script header in the client – distributed_dask.py. (A small version-check sketch follows after this list.)
- The workers won’t have your additional packages. You can either build a new worker image with the required packages, or install them dynamically using the PipInstall plugin. This plugin takes a list of Python packages and installs them on all workers as they start. I have pinned these packages too. It’s an easy way to ship packages to workers.
- I have created a small plugin here called EnvPlugin. It runs on all the workers. Essentially, it copies all the environment variables from the worker.env file into the worker environment (the client loads the same file into its own environment too). That way, it’s easy to ship configuration as well.
- cloudpickle is used to serialize the code and ship it from the client to the workers.
- Some of the other pinned packages, like lz4 and msgpack, are used by dask and distributed themselves.
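To catch the mismatched-versions error early, you can ask Dask to compare versions across the client, scheduler, and workers before submitting any work. Here is a minimal sketch (my addition, not part of the original script), assuming the scheduler from this post is reachable at tcp://127.0.0.1:8786:
from dask.distributed import Client

client = Client("tcp://127.0.0.1:8786")

# get_versions() reports Python and package versions for the client, the
# scheduler, and every worker; check=True asks Dask to flag any mismatch
# up front instead of failing later mid-job.
versions = client.get_versions(check=True)
print(versions["client"]["packages"])

client.close()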
Client Code
This is the code you run from your machine. It connects to the scheduler and sends the code and the jobs so the work gets done.
# /// script
# requires-python = "==3.10.12"
# dependencies = [
# "dask==2024.8.2",
# "distributed==2024.8.2",
# "toolz==0.12.0",
# "lz4==4.3.3",
# "tornado==6.4.1",
# "cloudpickle==3.0.0",
# "msgpack==1.0.8",
# "python-dotenv==1.1.1",
# "requests==2.32.5"
# ]
# ///
import os
import json
import logging
import requests
import csv
from dotenv import dotenv_values
from dask import bag as db
from dask.distributed import (
    Client,
    LocalCluster,
    WorkerPlugin,
    PipInstall,
)
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
# ---- EnvPlugin plugin -------------------------------------------------
class EnvPlugin(WorkerPlugin):
    def __init__(self, env):
        self.env = env

    def setup(self, worker):
        logging.warning("EnvPlugin loaded with keys: %s", list(self.env.keys()))
        os.environ.update(self.env)
URL = "https://httpbin.org/post"
HEADERS = {"accept": "application/json"}
# -- Actual thing that runs on workers -----------------------------------------
def post_row(row: dict) -> dict:
    """Send one row as JSON payload"""
    API_KEY_FOR_HTTP_BIN = os.environ.get("API_KEY_FOR_HTTP_BIN")
    HEADERS["x-api-key"] = API_KEY_FOR_HTTP_BIN
    print(API_KEY_FOR_HTTP_BIN)
    r = requests.post(URL, json=row, headers=HEADERS, timeout=20)
    r.raise_for_status()
    return r.json()
def main(csv_path: str):
    with open(csv_path, newline="", encoding="utf-8") as f:
        records = list(csv.DictReader(f))
    env_vars = dotenv_values("worker.env")
    print("==========env_vars for workers=============")
    print(env_vars)
    os.environ.update(env_vars)
    scheduler_address = os.environ.get("DASK_SCHEDULER_ADDRESS")
    logging.info(f"Connecting to remote scheduler at {scheduler_address}")
    client = Client(scheduler_address)
    # Setup env on the workers
    client.register_plugin(
        EnvPlugin(env_vars), name="env"
    )
    # Install packages on the workers
    plugin = PipInstall(
        packages=[
            "python-dotenv==1.1.1",
            "requests==2.32.5",
        ],
        pip_options=["--upgrade"],
    )
    client.register_plugin(plugin)
    b = db.from_sequence(records, partition_size=10)
    # Send to workers
    results = b.map(post_row).compute()
    client.close()
    # Print gathered results
    print(results)
    logging.info(f"Completed {len(results)} requests")

if __name__ == "__main__":
    import sys

    if len(sys.argv) != 2:
        print("uv run distributed_dask.py data.csv")
        raise SystemExit(1)
    main(sys.argv[1])
Mismatched versions error
Setup and Run
I am using Podman/Docker to run the scheduler and workers. It makes things easy and keeps the environment clean. I have used the Docker images provided by the project, but you might want to build and harden your own, especially for the workers, since they will run whatever the client sends. So be careful if you are running a worker.
Create a network and then run the scheduler in that network (you could also run it on the host network). The scheduler starts listening for workers to join on port 8786 and exposes a dashboard on port 8787, which you can access at http://localhost:8787.
podman network create dask
# start the scheduler
podman run --network dask \
-p 8787:8787 -p 8786:8786 \
--name scheduler \
ghcr.io/dask/dask:2024.8.2 dask-scheduler
Now you can create as many workers as you want and make them join the scheduler. Here I have two.
# start the worker 1
podman run -d \
--name dask-worker-1 \
--network dask \
ghcr.io/dask/dask:2024.8.2 \
dask-worker scheduler:8786 \
--nworkers 1 \
--nthreads 2 \
--worker-port 9000:9009
# start the worker 2
podman run -d \
--name dask-worker-2 \
--network dask \
ghcr.io/dask/dask:2024.8.2 \
dask-worker scheduler:8786 \
--nworkers 1 \
--nthreads 2 \
--worker-port 9000:9009
Workers use two ports. They connect to port 8786 on the scheduler to join the cluster, and they listen on another port so the scheduler can talk back to them. That port is usually auto-picked, but you can specify a range with --worker-port and an available port from that range is assigned. Give some extra ports: if there are 8 workers, give a range of 9 or more. This also makes it easy to recognize the open ports. You can check the scheduler logs to see whether the workers have joined, or check the web dashboard. You can also do the same check from the client, as shown below.
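A small sketch of that client-side check (my addition, assuming the same scheduler address used in the rest of this post):
from dask.distributed import Client

client = Client("tcp://127.0.0.1:8786")

# scheduler_info() returns a snapshot of the cluster; the "workers" key maps
# each connected worker's address to its details.
info = client.scheduler_info()
print(f"{len(info['workers'])} workers connected")
for address in info["workers"]:
    print(" ", address)

client.close()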
Now just submit the jobs by calling our client script.
export DASK_SCHEDULER_ADDRESS=tcp://127.0.0.1:8786
uv run distributed_dask.py data.csv
Dask Dashboard while jobs are running
That’s it. Now the client should be able to connect to the scheduler, send the code and setup instructions to the workers, submit the jobs, collect all the responses, and print them.
Usage
The example I have shown here seems trivial, but I used this logic to build a CPU- and network-intensive pipeline to parse Bihar SIR data for The Hindu (WN-35/2025) in a really short time. I could use the combined power of all my laptops to get the job done in time for the story. I felt like I was running a supercomputer at home.
Also note what I have not shown in the example client code; it shouldn’t be difficult to add:
- Error handling. I have not done any here; any error will break the run.
- Retry mechanism. (A small sketch of both follows after this list.)
- There is no real or persistent message queue here. So, at some point, if the scheduler goes down, you will lose work.
- Hardening the workers. Anyway, as a worker, you shouldn’t join a scheduler you don’t trust, as they can send any kind of code to execute on the worker.
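For the first two points, here is a rough sketch of one way to do it (my own addition, dropping into main() in the client script above): wrap post_row so a failed request becomes an error record instead of crashing the whole bag, and pass retries to compute(), which the distributed scheduler uses to re-run failed tasks.
import requests

def safe_post_row(row: dict) -> dict:
    """Wrap post_row so one bad row does not break the whole run."""
    try:
        return {"ok": True, "result": post_row(row)}
    except requests.RequestException as exc:
        # Keep the failure as data so it can be inspected after compute()
        return {"ok": False, "row": row, "error": str(exc)}

# retries=2 asks the scheduler to re-run a failed task up to two more times
results = db.from_sequence(records, partition_size=10).map(safe_post_row).compute(retries=2)
failed = [r for r in results if not r["ok"]]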