ParslFest 2025¶
Accelerating QMCPy Notebook Tests with Parsl¶
Joshua Herman, Brandon Sharp, and Sou-Cheng Choi, QMCPy Developers
Aug 28 -- 29, 2025
Updated: Dec 3, 2025
Requirements:
- testbook :
pip install testbook==0.4.2 - Parsl:
pip install parsl==2025.7.28
In [1]:
Copied!
try:
import parsl as pl
except ModuleNotFoundError:
!pip install -q parsl
try:
import parsl as pl
except ModuleNotFoundError:
!pip install -q parsl
In [2]:
Copied!
import os
import parsl as pl
from util import setup_environment, find_repo_root, run_make_command, parse_total_time
# Configuration flags
force_compute = True
is_debug = False
workers = 2
output_dir = setup_environment()
import os
import parsl as pl
from util import setup_environment, find_repo_root, run_make_command, parse_total_time
# Configuration flags
force_compute = True
is_debug = False
workers = 2
output_dir = setup_environment()
2. Parsl¶
- Install and Configure Parsl
- Run the tests in parallel with Parsl
2.1 Configure Parsl¶
In [3]:
Copied!
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor
# Prefer explicit PARSL_MAX_WORKERS from environment when provided by the caller
_env_workers = os.environ.get('PARSL_MAX_WORKERS')
if _env_workers:
try:
max_workers = int(_env_workers)
except ValueError:
max_workers = None
else:
max_workers = None
if not max_workers: # Default fallback based on CPU count (at least 1, cap to cpu_count-1)
max_workers = min(workers, max(workers, os.cpu_count()-1))
# Use ThreadPoolExecutor (works reliably on macOS and Linux)
config = Config(executors=[ThreadPoolExecutor(max_threads=max_workers, label="local_threads")])
# Ensure clean state: clear any existing Parsl config from previous runs
pl.clear()
# Now load the config
pl.load(config)
print(f"Parsl loaded with {max_workers} workers (PARSL_MAX_WORKERS env={os.environ.get('PARSL_MAX_WORKERS')})")
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor
# Prefer explicit PARSL_MAX_WORKERS from environment when provided by the caller
_env_workers = os.environ.get('PARSL_MAX_WORKERS')
if _env_workers:
try:
max_workers = int(_env_workers)
except ValueError:
max_workers = None
else:
max_workers = None
if not max_workers: # Default fallback based on CPU count (at least 1, cap to cpu_count-1)
max_workers = min(workers, max(workers, os.cpu_count()-1))
# Use ThreadPoolExecutor (works reliably on macOS and Linux)
config = Config(executors=[ThreadPoolExecutor(max_threads=max_workers, label="local_threads")])
# Ensure clean state: clear any existing Parsl config from previous runs
pl.clear()
# Now load the config
pl.load(config)
print(f"Parsl loaded with {max_workers} workers (PARSL_MAX_WORKERS env={os.environ.get('PARSL_MAX_WORKERS')})")
Out[3]:
<parsl.dataflow.dflow.DataFlowKernel at 0x10b9a8cb0>
Parsl loaded with 2 workers (PARSL_MAX_WORKERS env=None)
2.2 Create a Parsl Test Runner¶
In [4]:
Copied!
import parsl_test_runner
import inspect
# See only functions
print("Functions:")
functions = inspect.getmembers(parsl_test_runner, inspect.isfunction)
for name, func in functions:
print(f"- {name}")
print("\n" + "="*50)
# Get help on specific function
print("Help for execute_parallel_tests:")
help(parsl_test_runner.execute_parallel_tests)
import parsl_test_runner
import inspect
# See only functions
print("Functions:")
functions = inspect.getmembers(parsl_test_runner, inspect.isfunction)
for name, func in functions:
print(f"- {name}")
print("\n" + "="*50)
# Get help on specific function
print("Help for execute_parallel_tests:")
help(parsl_test_runner.execute_parallel_tests)
Functions:
- bash_app
- execute_parallel_tests
- generate_summary_report
- main
==================================================
Help for execute_parallel_tests:
Help on function execute_parallel_tests in module parsl_test_runner:
execute_parallel_tests()
Execute all testbook tests in parallel using Parsl
In [5]:
Copied!
# Verify Parsl configuration
print(f"Max workers configured: {max_workers}")
print(f"Active Parsl DFK: {pl.dfk()}")
print(f"Executors: {[executor.label for executor in pl.dfk().executors.values()]}")
if hasattr(config, 'executors'):
for executor in config.executors:
if hasattr(executor, 'max_workers_per_node'):
print(f"Executor '{executor.label}' max_workers_per_node: {executor.max_workers_per_node}")
# Verify Parsl configuration
print(f"Max workers configured: {max_workers}")
print(f"Active Parsl DFK: {pl.dfk()}")
print(f"Executors: {[executor.label for executor in pl.dfk().executors.values()]}")
if hasattr(config, 'executors'):
for executor in config.executors:
if hasattr(executor, 'max_workers_per_node'):
print(f"Executor '{executor.label}' max_workers_per_node: {executor.max_workers_per_node}")
Max workers configured: 2 Active Parsl DFK: <parsl.dataflow.dflow.DataFlowKernel object at 0x10b9a8cb0> Executors: ['local_threads', '_parsl_internal']
2.3 Run the Notebooks in Parallel with Parsl¶
In [6]:
Copied!
import uuid
execution_id = str(uuid.uuid4())[:8]
print(f"=== EXECUTION ID: {execution_id} ===")
print(f"Starting parallel test execution with {max_workers} workers...")
par_fname = os.path.join(output_dir, f"parallel_times_{max_workers}.csv")
par_output = os.path.join(output_dir, f"parallel_output_{max_workers}.txt")
if (not os.path.exists(par_fname)) or force_compute:
env = os.environ.copy()
env['PARSL_MAX_WORKERS'] = str(max_workers)
run_make_command("booktests_parallel_no_docker", par_output, is_debug=is_debug, env=env)
parallel_time = parse_total_time(par_output, r"Total test time: ([\d\.]+)s")
print(f"\n=== RESULTS FOR EXECUTION {execution_id} ===")
print(f"Parallel time: {parallel_time:.2f} seconds")
with open(par_fname, "w") as f:
_ = f.write(f"workers,time\n{max_workers},{parallel_time:.2f}\n")
print(f"=== END EXECUTION {execution_id} ===")
import uuid
execution_id = str(uuid.uuid4())[:8]
print(f"=== EXECUTION ID: {execution_id} ===")
print(f"Starting parallel test execution with {max_workers} workers...")
par_fname = os.path.join(output_dir, f"parallel_times_{max_workers}.csv")
par_output = os.path.join(output_dir, f"parallel_output_{max_workers}.txt")
if (not os.path.exists(par_fname)) or force_compute:
env = os.environ.copy()
env['PARSL_MAX_WORKERS'] = str(max_workers)
run_make_command("booktests_parallel_no_docker", par_output, is_debug=is_debug, env=env)
parallel_time = parse_total_time(par_output, r"Total test time: ([\d\.]+)s")
print(f"\n=== RESULTS FOR EXECUTION {execution_id} ===")
print(f"Parallel time: {parallel_time:.2f} seconds")
with open(par_fname, "w") as f:
_ = f.write(f"workers,time\n{max_workers},{parallel_time:.2f}\n")
print(f"=== END EXECUTION {execution_id} ===")
=== EXECUTION ID: 0a17146f === Starting parallel test execution with 2 workers...
Out[6]:
True
=== RESULTS FOR EXECUTION 0a17146f === Parallel time: 322.83 seconds === END EXECUTION 0a17146f ===
In [7]:
Copied!
!date
!ls -ltr output
!date
!ls -ltr output
Thu Dec 4 15:04:12 CST 2025 total 368 -rw-r--r--@ 1 terrya staff 6267 Dec 4 14:56 sequential_output.csv -rw-r--r--@ 1 terrya staff 7 Dec 4 14:56 sequential_time.csv -rw-r--r--@ 1 terrya staff 6380 Dec 4 14:56 01_sequential_output.ipynb -rw-r--r--@ 1 terrya staff 152935 Dec 4 15:00 parallel_times_speedup.png -rw-r--r--@ 1 terrya staff 4501 Dec 4 15:04 parallel_output_2.txt -rw-r--r--@ 1 terrya staff 22 Dec 4 15:04 parallel_times_2.csv
In [8]:
Copied!
import platform
if platform.system().lower() == 'linux':
!uname -a
!nproc --all
!awk '/MemTotal/ {printf "%.2f GB\n", $2/1024/1024}' /proc/meminfo
import platform
if platform.system().lower() == 'linux':
!uname -a
!nproc --all
!awk '/MemTotal/ {printf "%.2f GB\n", $2/1024/1024}' /proc/meminfo