Multiprocessing (mp) with wandb sweeps and the count parameter: how do I run sweeps with mp?

I am trying to run hyperparameter sweeps using Weights and Biases (W&B) and would like to leverage multiprocessing to parallelize my experiments as much as possible. I want to ensure that each set of hyperparameters is evaluated only once and that I can run multiple experiments simultaneously without repeating hyperparameters.

Here is a simplified version of my training and evaluation script:

import random
import numpy as np
import wandb
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Tuple, Dict

def train_one_epoch(epoch: int, lr: float, bs: int) -> Tuple[float, float]:
    """
    Simulate training for one epoch.
    """
    acc = 0.25 + ((epoch / 30) + (random.random() / 10))
    loss = 0.2 + (1 - ((epoch - 1) / 10 + random.random() / 5))
    return acc, loss

def evaluate_one_epoch(epoch: int) -> Tuple[float, float]:
    """
    Simulate evaluation for one epoch.
    """
    acc = 0.1 + ((epoch / 20) + (random.random() / 10))
    loss = 0.25 + (1 - ((epoch - 1) / 10 + random.random() / 6))
    return acc, loss

config: Dict[str, float] = {"lr": 0.0001, "bs": 16, "epochs": 5}

def run_epoch(epoch: int, lr: float, bs: int) -> Tuple[int, float, float, float, float]:
    """
    Run training and evaluation for one epoch.
    """
    train_acc, train_loss = train_one_epoch(epoch, lr, bs)
    val_acc, val_loss = evaluate_one_epoch(epoch)
    return epoch, train_acc, train_loss, val_acc, val_loss

def main() -> None:
    """
    Main function to run the training and evaluation in parallel using multiprocessing.
    """
    lr = config["lr"]
    bs = config["bs"]
    epochs = config["epochs"]

    # Initialize Weights and Biases
    wandb.init(project="my_project", config=config)

    with ProcessPoolExecutor() as executor:
        # range(1, epochs + 1) covers all `epochs` epochs; np.arange(1, epochs) stopped one short
        futures = [executor.submit(run_epoch, epoch, lr, bs) for epoch in range(1, epochs + 1)]
        for future in as_completed(futures):
            epoch, train_acc, train_loss, val_acc, val_loss = future.result()
            wandb.log({
                "epoch": epoch,
                "train_acc": train_acc,
                "train_loss": train_loss,
                "val_acc": val_acc,
                "val_loss": val_loss
            })
            print(f"epoch: {epoch}")
            print(f"training accuracy: {train_acc}, training loss: {train_loss}")
            print(f"validation accuracy: {val_acc}, validation loss: {val_loss}")

if __name__ == "__main__":
    main()

Questions:

  1. Role of counts in W&B multiprocessing: How do I properly configure the counts parameter (if it exists) or any other relevant parameter in W&B to maximize parallelization without repeating hyperparameters?
  2. Ensuring unique hyperparameters: How can I ensure that each process picks a unique set of hyperparameters when running the sweep? Should I handle this in my Python code or is there a feature in W&B that manages this?
  3. Parallelization best practices: Are there best practices for implementing parallelization with W&B to get the most efficient use of resources? For example, is there a preferred way to structure the ProcessPoolExecutor or any other method to maximize parallel runs?

I am interested in both

  1. parallelizability with only CPU multi core option
  2. parallelizability with multiple GPU option

Any insights, code examples, or references to the W&B documentation would be greatly appreciated!


Extensions and comments on how to do this from the CLI/Bash are also welcome!


Hello, this is a reply from our support bot designed to assist you with your Weights & Biases related queries. To reach a human please reply to this message.

To address your questions about running hyperparameter sweeps with Weights and Biases (W&B) using multiprocessing, let’s break down your queries:

  1. Role of counts in W&B multiprocessing: The count parameter specifies the maximum number of runs that a single sweep agent will execute; it is a per-agent limit, not a total for the whole sweep. When several agents run in parallel, the W&B sweep server hands each one its next set of hyperparameters, so you don’t need to configure count to get parallelism. If you want a fixed total number of runs, divide that total across the count values of your agents. If count is not specified, a grid sweep ends once every combination has been tried, while a random sweep runs until it is manually stopped.

  2. Ensuring unique hyperparameters: W&B’s sweep server decides which set of hyperparameters each agent (in your case, each process) runs next. This is managed by W&B’s backend, so you don’t need to handle it in your Python code. For a grid sweep, when an agent requests a new set of hyperparameters, the sweep server assigns a combination that hasn’t been used yet. Random and Bayesian sweeps sample from the search space instead, so an exact repeat is possible when the space is small and discrete.

  3. Parallelization best practices: For maximizing efficiency in parallel runs, especially with CPU and GPU resources, consider the following:

    • CPU Multi-core Option: Utilize ProcessPoolExecutor as you’ve done, which is a good approach. Ensure that the number of processes does not exceed the number of available CPU cores to avoid context switching overhead.
    • GPU Option: When running on multiple GPUs, you can specify the GPU device in your training script using environment variables or library-specific settings (e.g., CUDA_VISIBLE_DEVICES for PyTorch or TensorFlow). Each process should be assigned a unique GPU. This can be managed outside your Python script, for example in a bash script that sets CUDA_VISIBLE_DEVICES and starts one wandb agent per GPU. Note that --count limits how many runs each agent executes; the number of parallel runs is set by how many agents you start.

For both CPU and GPU parallelization, W&B’s sweep functionality automatically manages the distribution of hyperparameter sets, so your main focus should be on efficiently utilizing your computational resources.
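The GPU advice above can be sketched in Python as one agent process per GPU. This is a minimal sketch, not an official W&B recipe: gpu_for_worker, run_agent_on_gpu and launch_gpu_agents are hypothetical helper names, and train_fn is assumed to be a top-level function (so it can be pickled) that calls wandb.init() and reads wandb.config.

```python
import os
from multiprocessing import Process
from typing import Callable, List


def gpu_for_worker(worker_index: int, num_gpus: int) -> int:
    """Assign GPUs round-robin: worker 0 -> GPU 0, worker 1 -> GPU 1, ..."""
    if num_gpus <= 0:
        raise ValueError("num_gpus must be positive")
    return worker_index % num_gpus


def run_agent_on_gpu(sweep_id: str, gpu_index: int, train_fn: Callable[[], None]) -> None:
    """Pin this process to a single GPU, then run a sweep agent inside it."""
    # Set this before any CUDA-using library is initialized in the process.
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_index)
    import wandb  # imported here so the helpers can be imported without wandb
    wandb.agent(sweep_id, function=train_fn)


def launch_gpu_agents(sweep_id: str, num_gpus: int, train_fn: Callable[[], None]) -> None:
    """Start one agent process per GPU and wait for all of them (hypothetical helper)."""
    processes: List[Process] = []
    for worker_index in range(num_gpus):
        gpu_index = gpu_for_worker(worker_index, num_gpus)
        p = Process(target=run_agent_on_gpu, args=(sweep_id, gpu_index, train_fn))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
```

Because each process only sees the GPU named in its own CUDA_VISIBLE_DEVICES, the training code can keep addressing "device 0" without knowing which physical GPU it was given.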

CLI/Bash Option for Sweeps:
To manage sweeps from the CLI, you can initialize and run agents using the W&B CLI commands. For example, to start a sweep, you would use:

wandb sweep sweep.yaml

And to run agents in parallel, you can start multiple instances of the following command in separate terminal sessions or scripts, specifying the sweep ID:

wandb agent your_sweep_id --count 10

The --count parameter controls how many runs each agent will execute, which is useful for bounding the total work when several agents run in parallel.
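Since --count applies to each agent separately, a fixed total budget has to be divided among the agents by hand. A small sketch of that arithmetic follows; split_run_budget is a hypothetical helper, not part of wandb.

```python
from typing import List


def split_run_budget(total_runs: int, num_agents: int) -> List[int]:
    """Split a total run budget into per-agent counts that differ by at most one."""
    if total_runs < 0:
        raise ValueError("total_runs must be non-negative")
    if num_agents <= 0:
        raise ValueError("num_agents must be positive")
    base, extra = divmod(total_runs, num_agents)
    # The first `extra` agents take one additional run each.
    return [base + (1 if i < extra else 0) for i in range(num_agents)]


counts = split_run_budget(10, 4)
print(counts)  # [3, 3, 2, 2]
```

Each value would then be passed as the --count of one agent. An agent given a count of 0 should simply not be started.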

References:

  • For a comprehensive guide on setting up and running sweeps, including parallelization strategies, refer to the W&B Sweeps documentation.
  • For specific examples of CLI commands and managing sweeps, see the W&B CLI documentation.

These resources should help you efficiently implement parallelization in your W&B sweeps, leveraging both CPU and GPU resources.

To reach a human please reply to this message.

-WandBot :robot:

how do I reach out to a human? I want a human official verified answer please :slight_smile:

@system contact a human for help pls

When working with Weights and Biases (W&B/wandb) for hyperparameter (hp) optimization, you can use sweeps to systematically explore different combinations of hyperparameters to find the best performing set.

What are Sweeps?

Sweeps in W&B allow you to define a set of hyperparameters to search over. When you create a sweep in the CLI or in Python, the search space (e.g., a grid) is registered with the W&B server under your account. Afaik this usually means the server knows all the possible hyperparameter combinations; later, when you run an agent, it fetches one combination, tries it, and logs the result to wandb.

Understanding count and Agents

  • Agents: These are the workers that run the trials/hp attempts/runs – basically try each hp. Each agent pulls a set of hyperparameters (hps) from the W&B/wandb server, runs the (usually) training script (but in our example a search for optimal chinchilla scaling laws) with these hps, logs the results, and repeats.
  • Count: This is the number of trials/runs the agent will run. If you set count to 100, the agent will run 100 trials, each with a different combination of hyperparameters.

Afaik, if you use grid search and count is higher than the total number of combinations, the sweep stops after running all combinations. If count is lower, it runs only the specified number of trials.

Afaik, as long as an agent is running and the sweep has hps to try, the agent will keep fetching them from the sweep server under your wandb account. You can see the running agents (and kill or pause them) in your wandb account.

I think the crux is that an agent continually fetches hps from the sweep on the wandb website until the sweep is finished (or you kill the agent). You can run multiple agents until the sweep’s hps are exhausted.

I will provide an example without multiprocessing then make it multiprocessing (mp):

1 Example without Multiprocessing

import numpy as np
import scipy.optimize as opt
import scipy.special
import wandb

# Define the synthetic scaling law function
def scaling_law(c, e, a, b, alpha, beta):
    return np.exp(e) + np.exp(a) * c[:, 0] ** (-alpha) + np.exp(b) * c[:, 1] ** (-beta)

# Generate synthetic data
np.random.seed(0)
C = np.array([[7e9, 2e12], [13e9, 2e12], [34e9, 2e12], [70e9, 2e12]])  # [m, Din] = [m, 2]
e_true, a_true, b_true = np.log(1.8172), np.log(482.01), np.log(2085.43)
alpha_true, beta_true = 0.3478, 0.3658
L_target = scaling_law(C, e_true, a_true, b_true, alpha_true, beta_true).reshape(-1, 1)  # [m, K]
L_target = np.repeat(L_target, 1, axis=1)  # [m , K]

# Define the cost function using the Huber loss
def aggregate_huber_loss(theta_sl, c, l_target, delta=1e-3):
    e, a, b, alpha, beta = theta_sl
    E, A, B = np.exp(e), np.exp(a), np.exp(b)
    l_pred = E + A * c[:, 0] ** (-alpha) + B * c[:, 1] ** (-beta)
    log_l_target = np.log(l_target)
    x1 = a - alpha * np.log(c[:, 0]).reshape(-1, 1)
    x2 = b - beta * np.log(c[:, 1]).reshape(-1, 1)
    x3 = e * np.ones((c.shape[0], 1))
    lse = scipy.special.logsumexp([x1, x2, x3], axis=0)
    h = scipy.special.huber(delta, lse - log_l_target)
    return h.sum()

# Training function to run each trial
def train():
    wandb.init()
    config = wandb.config

    initial_params = [config.e, config.a, config.b, config.alpha, config.beta]

    # Perform the optimization
    result = opt.minimize(aggregate_huber_loss, initial_params, args=(C, L_target), method='BFGS')
    optimized_params = result.x

    e_opt, a_opt, b_opt, alpha_opt, beta_opt = optimized_params
    loss = aggregate_huber_loss(optimized_params, C, L_target)

    wandb.log({
        "e": e_opt,
        "a": a_opt,
        "b": b_opt,
        "alpha": alpha_opt,
        "beta": beta_opt,
        "loss": loss
    })

# Sweep configuration for grid search
sweep_config = {
    "method": "grid",
    "metric": {
        "name": "loss",
        "goal": "minimize"
    },
    "parameters": {
        "e": {
            "values": [-1, 0, 1]
        },
        "a": {
            "values": [0, 5, 10]
        },
        "b": {
            "values": [0, 5, 10]
        },
        "alpha": {
            "values": [0, 1, 2]
        },
        "beta": {
            "values": [0, 1, 2]
        }
    }
}

# Initialize the sweep
sweep_id = wandb.sweep(sweep_config, project="scaling-law-optimization")

# Print the sweep ID. wandb.sweep() should already print the sweep URL itself;
# wandb.run is still None here (no wandb.init() yet), so it cannot be used to build one.
print(f"Sweep ID: {sweep_id}")

# Run the sweep
# wandb.agent(sweep_id, function=train, count=10)  # tries only 10 of the G^5 = 3^5 = 243 grid points
wandb.agent(sweep_id, function=train)  # no count: tries all G^5 hps! Sweeps them all!

My understanding is that as long as agents are running, they keep fetching hps from the sweep server until the sweep (on your wandb site/account) is exhausted. Some sweeps, like random and Bayesian, can afaik run forever! So count is important there (or killing the agents manually).
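To see when a grid sweep like the one above would be exhausted, the number of grid points can be computed from the "parameters" block. This is a rough sketch using only the standard library; grid_size and grid_combinations are hypothetical helpers, and the enumeration order is not necessarily the order in which W&B hands out combinations.

```python
import math
from itertools import product
from typing import Dict, Iterator, List

# Same search space as the "parameters" block of sweep_config above.
parameters: Dict[str, Dict[str, List[int]]] = {
    "e": {"values": [-1, 0, 1]},
    "a": {"values": [0, 5, 10]},
    "b": {"values": [0, 5, 10]},
    "alpha": {"values": [0, 1, 2]},
    "beta": {"values": [0, 1, 2]},
}


def grid_size(params: Dict[str, Dict[str, List[int]]]) -> int:
    """Number of grid points: the product of the number of values per parameter."""
    return math.prod(len(spec["values"]) for spec in params.values())


def grid_combinations(params: Dict[str, Dict[str, List[int]]]) -> Iterator[Dict[str, int]]:
    """Yield every combination as a dict, e.g. {"e": -1, "a": 0, ...}."""
    names = list(params)
    for values in product(*(params[name]["values"] for name in names)):
        yield dict(zip(names, values))


print(grid_size(parameters))  # 243, i.e. G^5 with G = 3
```

With 243 grid points, a single agent with count=10 covers only a small part of the grid, while agents started without count keep going until all 243 have been tried.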

2 Example with Multiprocessing

The main idea, I think, is realizing that once you create a sweep (in Python or in the CLI), the process that fetches hps/trials to try is the agent. So my suggestion is to parallelize over agents, e.g.,

import numpy as np
import scipy.optimize as opt
import scipy.special
import wandb
from multiprocessing import Process, cpu_count

# Define the synthetic scaling law function
def scaling_law(c, e, a, b, alpha, beta):
    return np.exp(e) + np.exp(a) * c[:, 0] ** (-alpha) + np.exp(b) * c[:, 1] ** (-beta)

# Generate synthetic data
np.random.seed(0)
C = np.array([[7e9, 2e12], [13e9, 2e12], [34e9, 2e12], [70e9, 2e12]])  # [m, Din] = [m, 2]
e_true, a_true, b_true = np.log(1.8172), np.log(482.01), np.log(2085.43)
alpha_true, beta_true = 0.3478, 0.3658
L_target = scaling_law(C, e_true, a_true, b_true, alpha_true, beta_true).reshape(-1, 1)  # [m, K]
L_target = np.repeat(L_target, 1, axis=1)  # [m , K]

# Define the cost function using the Huber loss
def aggregate_huber_loss(theta_sl, c, l_target, delta=1e-3):
    e, a, b, alpha, beta = theta_sl
    E, A, B = np.exp(e), np.exp(a), np.exp(b)
    l_pred = E + A * c[:, 0] ** (-alpha) + B * c[:, 1] ** (-beta)
    log_l_target = np.log(l_target)
    x1 = a - alpha * np.log(c[:, 0]).reshape(-1, 1)
    x2 = b - beta * np.log(c[:, 1]).reshape(-1, 1)
    x3 = e * np.ones((c.shape[0], 1))
    lse = scipy.special.logsumexp([x1, x2, x3], axis=0)
    h = scipy.special.huber(delta, lse - log_l_target)
    return h.sum()

# Training function to run each trial
def train():
    wandb.init()
    config = wandb.config

    initial_params = [config.e, config.a, config.b, config.alpha, config.beta]

    # Perform the optimization
    result = opt.minimize(aggregate_huber_loss, initial_params, args=(C, L_target), method='BFGS')
    optimized_params = result.x

    e_opt, a_opt, b_opt, alpha_opt, beta_opt = optimized_params
    loss = aggregate_huber_loss(optimized_params, C, L_target)

    wandb.log({
        "e": e_opt,
        "a": a_opt,
        "b": b_opt,
        "alpha": alpha_opt,
        "beta": beta_opt,
        "loss": loss
    })

# Sweep configuration for grid search
sweep_config = {
    "method": "grid",
    "metric": {
        "name": "loss",
        "goal": "minimize"
    },
    "parameters": {
        "e": {
            "values": [-1, 0, 1]
        },
        "a": {
            "values": [0, 5, 10]
        },
        "b": {
            "values": [0, 5, 10]
        },
        "alpha": {
            "values": [0, 1, 2]
        },
        "beta": {
            "values": [0, 1, 2]
        }
    }
}

# Function to run one agent; each agent fetches hps until the sweep is done
def run_agent(sweep_id, project):
    # wandb.agent(sweep_id, function=train, project=project, count=10)  # at most 10 runs for this agent
    wandb.agent(sweep_id, function=train, project=project)  # keeps fetching hps until all G^5 in the sweep are done

# Number of agents to run in parallel
num_agents = min(cpu_count(), 72)  # Adjust this number based on your system

if __name__ == "__main__":
    project = "scaling-law-optimization"

    # Initialize the sweep once, in the parent process only. At module level this
    # call would be repeated by every child under the "spawn" start method
    # (the default on macOS and Windows), creating one sweep per process.
    sweep_id = wandb.sweep(sweep_config, project=project)

    # wandb.sweep() should already print the sweep URL; wandb.run is None here
    print(f"Sweep ID: {sweep_id}")

    processes = []
    for _ in range(num_agents):
        p = Process(target=run_agent, args=(sweep_id, project))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()
    print('Done!\a')

In CLI Bash

1 Without multiprocessing

Create a sweep from the config .yaml file and then pass that sweep ID to the agents you create (e.g., with a slurm array or tmux sessions).

In detail:

1: Create a YAML file for the sweep (e.g., sweep_config.yaml)

program: ~/github_repo_proj_folder/scaling_laws.py
method: grid
metric:
  name: loss
  goal: minimize
parameters:
  e:
    values: [-1, 0, 1]
  a:
    values: [0, 5, 10]
  b:
    values: [0, 5, 10]
  alpha:
    values: [0, 1, 2]
  beta:
    values: [0, 1, 2]

2: Initialize the Sweep in cli terminal

wandb sweep sweep_config.yaml

This command will output a sweep ID in the format entity/project/sweep_ID. Note this sweep ID for the next steps.

3: Run a Single Agent

Running an agent will continually fetch hps from your wandb server sweep run until it’s done:

wandb agent <sweep_id>

replacing <sweep_id> with your actual sweep ID, e.g., from the output of the previous command.
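When the agent is started from the CLI, it launches the script named under program in the YAML file. By default the chosen hyperparameters are passed as --name=value command-line arguments, so the script needs a way to accept them. A minimal sketch of that part of scaling_laws.py, assuming the five parameters from the YAML above (parse_hyperparameters is a hypothetical helper):

```python
import argparse
from typing import List, Optional


def parse_hyperparameters(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the sweep hyperparameters passed as --name=value arguments."""
    # allow_abbrev=False so that --a is never treated as a prefix of --alpha.
    parser = argparse.ArgumentParser(allow_abbrev=False)
    for name in ("e", "a", "b", "alpha", "beta"):
        parser.add_argument(f"--{name}", type=float, required=True)
    return parser.parse_args(argv)


# Example: the kind of argument list an agent might pass for one grid point.
args = parse_hyperparameters(["--e=-1", "--a=0", "--b=5", "--alpha=1", "--beta=2"])
print(vars(args))
```

In the real script one would call parse_hyperparameters() with no arguments and then something like wandb.init(config=vars(args)). As far as I know, a script started by an agent can also skip argument parsing and read the values from wandb.config after wandb.init(), as the train() function above does. Such a script should not call wandb.sweep() or wandb.agent() itself.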

2 With multiprocessing parallelization in the cli

One way, once the wandb sweep is initialized, is to run each agent in its own tmux session, as a background job with &, in a slurm array, or even with nohup:

#!/bin/bash

# start sweep
## wandb sweep sweep_config.yaml

# Number of agents to run
NUM_AGENTS=4
SWEEP_ID="<sweep_id>"  # Replace with your actual sweep ID (keep the quotes)

# Run agents in parallel
for i in $(seq 1 "$NUM_AGENTS"); do
  nohup wandb agent "$SWEEP_ID" > "agent_$i.log" 2>&1 &
done

# Wait for all agents to finish (optional)
wait
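The same launcher can be written in Python with subprocess, which may be handier when the agents are started from an existing Python driver. This is only a sketch: agent_command and launch_agents are hypothetical helpers, the wandb CLI is assumed to be on PATH, and the sweep ID is whatever wandb sweep printed.

```python
import subprocess
from contextlib import ExitStack
from typing import List, Optional


def agent_command(sweep_id: str, count: Optional[int] = None) -> List[str]:
    """Build the `wandb agent` command line, optionally with a per-agent --count."""
    cmd = ["wandb", "agent"]
    if count is not None:
        cmd += ["--count", str(count)]
    cmd.append(sweep_id)
    return cmd


def launch_agents(sweep_id: str, num_agents: int, count: Optional[int] = None) -> List[int]:
    """Start num_agents agents in parallel, one log file each; return their exit codes."""
    with ExitStack() as stack:
        procs = []
        for i in range(1, num_agents + 1):
            log = stack.enter_context(open(f"agent_{i}.log", "w"))
            procs.append(
                subprocess.Popen(agent_command(sweep_id, count), stdout=log, stderr=subprocess.STDOUT)
            )
        # Equivalent of the trailing `wait` in the bash script.
        return [p.wait() for p in procs]


print(agent_command("entity/project/sweep_ID", count=10))
```

Calling launch_agents("entity/project/sweep_ID", 4) would mirror the bash loop with NUM_AGENTS=4.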

ref: multithreading - How to implement multiprocessing with Weights and Biases wandb sweeps for maximum parallelization, especially how the count var work in this setting? - Stack Overflow


Hi @brando thank you for reaching out, and I apologize for the ticket not being rerouted to our support team initially. We’ve identified the issue within our wandbot automation, and moving forward, tickets will be correctly assigned to the support team upon the first response.

I’m glad to hear you found a solution to parallelize the agents using the multiprocessing package. Thank you for sharing your example in our Forum! This is marked as resolved on our end now as well, but feel free to reply here if you have any follow-up questions.