Parallel execution in Bash example

Below is an example I vibe coded up with Grok which provides parallel execution of tasks in Bash 4.3 and later. It leverages built-in Bash job control together with temporary files for output capture. It seems to work well and is written in such a way that it can be easily incorporated into existing Bash scripts.

#!/bin/bash
# Below is a set of bash functions for managing parallel execution in scripts.
#
# These functions allow running multiple commands concurrently while
# controlling the maximum number of simultaneous jobs. This uses
# background jobs (&) and 'wait -n', which requires Bash 4.3+;
# pe_init checks the running Bash version and refuses to start on
# anything older.
# Outputs (stdout and stderr separately) from each job are captured to temporary files.
#
# A mechanism is provided to retrieve the captured output for each job after completion.

# Global variables for tracking jobs
# All globals are prefixed with pe_

# Initialize parallel environment with a max concurrency limit.
# Usage: pe_init <max_concurrent>
# Note: Set pe_tmp_dir before calling pe_init to change the temporary directory (default: /tmp)
# Set pe_prefix_with_job=true to prefix outputs with job number when printing
pe_init() {
  # 'wait -n' (used by pe_run/pe_execute) requires Bash 4.3 or newer.
  if [[ ${BASH_VERSINFO[0]} -lt 4 || (${BASH_VERSINFO[0]} -eq 4 && ${BASH_VERSINFO[1]} -le 2) ]]; then
    echo "Error: This script requires Bash 4.3 or higher." >&2
    exit 1
  fi

  if [ "$#" -ne 1 ]; then
    echo "Error: pe_init requires exactly one argument (max_concurrent)." >&2
    return 1
  fi

  # Reject empty, non-numeric, or zero limits up front; a bad value would
  # make pe_run's throttle comparison fail later in confusing ways.
  case "$1" in
    '' | *[!0-9]* | 0)
      echo "Error: max_concurrent must be a positive integer." >&2
      return 1
      ;;
  esac

  pe_max_concurrent="$1"
  pe_job_counter=1   # next job_id to hand out
  pe_last_job_id=    # set by pe_run after each launch

  unset pe_job_out_files
  unset pe_job_err_files
  unset pe_job_pids

  # BUG FIX: plain 'declare -A' inside a function creates LOCAL arrays
  # that vanish when pe_init returns, so the tracking maps never existed
  # globally.  '-g' makes them global associative arrays as intended.
  declare -gA pe_job_out_files   # job_id -> stdout temp file
  declare -gA pe_job_err_files   # job_id -> stderr temp file
  declare -gA pe_job_pids        # job_id -> background PID

  pe_tmp_dir="${pe_tmp_dir:-/tmp}"

  # Enable job control (kept from the original design); each background
  # job then runs in its own process group.
  set -m
}

# Run a command in parallel, respecting the max_concurrent limit.
# If the limit is reached, wait for one job to finish before starting.
# Captures stdout and stderr to separate temp files.
# Sets the global pe_last_job_id to the job_id after starting.
# Usage: pe_run <command> [args...]
# After call: job_id=$pe_last_job_id
pe_run() {
  if [ "$#" -lt 1 ]; then
    echo "Error: pe_run requires at least one argument (command)." >&2
    return 1
  fi

  # Throttle: while the pool is full, reap one finished job before
  # launching another.
  while [ "$(jobs -pr | wc -l)" -ge "$pe_max_concurrent" ]; do
    # BUG FIX: the old 'wait -n || wait' treated a finished job's
    # non-zero EXIT STATUS as "wait -n unsupported" and fell into a
    # blocking wait for ALL jobs, collapsing concurrency whenever any
    # job failed.  pe_init already guarantees Bash 4.3+, so 'wait -n'
    # is always available; just ignore the reaped job's status here.
    wait -n || true
  done

  local job_id tmp_out tmp_err
  job_id=$pe_job_counter
  pe_job_counter=$((pe_job_counter + 1))

  # Separate temp files capture the job's stdout and stderr; fail fast
  # (and do not leak the first file) if mktemp cannot create them.
  tmp_out=$(mktemp "$pe_tmp_dir/pe_out.XXXXXX") || return 1
  tmp_err=$(mktemp "$pe_tmp_dir/pe_err.XXXXXX") || { rm -f "$tmp_out"; return 1; }

  "$@" > "$tmp_out" 2> "$tmp_err" &

  pe_job_pids[$job_id]=$!
  pe_job_out_files[$job_id]=$tmp_out
  pe_job_err_files[$job_id]=$tmp_err
  pe_last_job_id=$job_id
}

# Wait for all parallel jobs to complete.
# Usage: pe_wait
pe_wait() {
  # Barrier: block until every outstanding background job has exited.
  builtin wait
}

# Retrieve the captured output (stdout to stdout, stderr to stderr) for a specific job.
# Waits for the job only if it's still running.
# Cleans up the temp files after retrieval.
# If pe_prefix_with_job=true, prefixes each line with [job <id>] for stdout and [job <id> err] for stderr.
# Usage: pe_get_output <job_id>
pe_get_output() {
  if [ "$#" -ne 1 ]; then
    echo "Error: pe_get_output requires exactly one argument (job_id)." >&2
    return 1
  fi

  # Locals prevent clobbering callers' variables (pe_process_completed
  # iterates its own 'job_id', which this function used to overwrite).
  local job_id out_file err_file
  job_id="$1"
  if [ -z "${pe_job_pids[$job_id]}" ]; then
    echo "Error: No such job_id $job_id." >&2
    return 1
  fi

  # Block until the job exits.  If the shell already reaped it, wait
  # returns 127; that is harmless and deliberately ignored.
  wait "${pe_job_pids[$job_id]}" 2>/dev/null

  out_file="${pe_job_out_files[$job_id]}"
  err_file="${pe_job_err_files[$job_id]}"

  # Replay captured stdout to stdout and stderr to stderr, optionally
  # tagging each line with the job id.
  if [ "${pe_prefix_with_job:-false}" = true ]; then
    sed "s/^/[job $job_id] /" "$out_file"
    sed "s/^/[job $job_id err] /" "$err_file" >&2
  else
    cat "$out_file"
    cat "$err_file" >&2
  fi

  rm -f "$out_file" "$err_file"

  # Quote the subscripts so 'unset' cannot glob-expand them as patterns.
  unset "pe_job_out_files[$job_id]"
  unset "pe_job_err_files[$job_id]"
  unset "pe_job_pids[$job_id]"
}

# Helper function to start as many tasks as possible from the queue.
# Assumes global pe_tasks array and pe_task_index.
# Usage: pe_start_tasks
pe_start_tasks() {
  # Launch queued tasks (global pe_tasks array, cursor pe_task_index)
  # until either the concurrency limit is reached or the queue is empty.
  # Removed the dead global assignment 'job_id=$pe_last_job_id', which
  # only fed a commented-out debug echo and leaked into callers' scope.
  while [ "$(jobs -pr | wc -l)" -lt "$pe_max_concurrent" ] \
      && [ "$pe_task_index" -lt "${#pe_tasks[@]}" ]; do
    # Each task is a shell snippet; run it in its own bash instance.
    pe_run bash -c "${pe_tasks[$pe_task_index]}"
    pe_task_index=$((pe_task_index + 1))
  done
}

# Helper function to process completed jobs.
# Usage: pe_process_completed
pe_process_completed() {
  # Scan tracked jobs, reap any that have finished, print their captured
  # output via pe_get_output, and drop their bookkeeping entries.
  # All working variables are local so nothing leaks into callers.

  # Platform quirk: macOS ps spells the state column 'stat'; Linux
  # procps accepts 'state'.
  local ps_o="state="
  if [ "$(uname -s)" = "Darwin" ]; then
    ps_o="stat="
  fi

  local completed=()
  local job_id pid state

  for job_id in "${!pe_job_pids[@]}"; do
    pid="${pe_job_pids[$job_id]}"

    if kill -0 "$pid" 2>/dev/null; then
      # Process still exists; a 'Z' state means it exited but has not
      # been reaped yet.
      state=$(ps -o "$ps_o" -p "$pid" 2>/dev/null | tr -d '[:space:]')

      if [[ "$state" == Z* ]]; then
        wait "$pid" 2>/dev/null   # reap the zombie
        completed+=("$job_id")
      fi
    else
      # kill -0 failed: the PID is gone, so the job finished and was
      # already reaped by the shell.
      completed+=("$job_id")
    fi
  done

  local out_file err_file
  for job_id in "${completed[@]}"; do
    out_file="${pe_job_out_files[$job_id]}"
    err_file="${pe_job_err_files[$job_id]}"

    if [ -s "$out_file" ] || [ -s "$err_file" ]; then
      # pe_get_output prints both streams and removes all bookkeeping.
      pe_get_output "$job_id"
    else
      # Nothing was captured: clean up the files and entries ourselves.
      rm -f "$out_file" "$err_file"

      # Quoted subscripts keep 'unset' from glob-expanding them.
      unset "pe_job_out_files[$job_id]"
      unset "pe_job_err_files[$job_id]"
      unset "pe_job_pids[$job_id]"
    fi
  done
}

# Helper function to execute all tasks in parallel.
# Assumes global pe_tasks array and pe_init called.
# Usage: pe_execute
pe_execute() {
  # Run every task in the global pe_tasks array in parallel, honoring
  # pe_max_concurrent.  Requires pe_init to have been called first.
  pe_task_index=0

  while [ "$pe_task_index" -lt "${#pe_tasks[@]}" ] || [ "${#pe_job_pids[@]}" -gt 0 ]; do
    # Top up the pool before blocking.
    pe_start_tasks

    if [ "${#pe_job_pids[@]}" -gt 0 ]; then
      # BUG FIX: the old code looped on 'wait -n' until it returned 127
      # (no children left), draining EVERY running job before starting
      # any new task -- which serialized the run into batches and
      # defeated the concurrency limit.  Waiting for a single job
      # (127 here just means everything was already reaped) lets the
      # next iteration refill the pool immediately.
      wait -n 2>/dev/null

      # Reap/report whatever finished; this also clears stale entries
      # so the outer loop terminates once the queue is empty.
      pe_process_completed
    fi
  done
}

Using this code is simple: save it as pe_execute.bash and then source it from your own script. Here’s an example:

#!/bin/bash
# Example usage:
# This script runs 10 commands in parallel, with a max of 3 concurrent jobs.
# Each command echoes a message and sleeps.
# The example caller is simplified using helper functions: pe_start_tasks, pe_process_completed, and pe_execute.

# Pull in the parallel-execution library.
. pe_execute.bash

# Cap the pool at three simultaneous jobs.
pe_init 3

# Build the work queue: each entry is a shell snippet that pe_execute
# will run via 'bash -c'.  Job 5 also writes to stderr to demonstrate
# separate stream capture.
pe_tasks=()
for ((n = 1; n <= 10; n++)); do
  pe_tasks+=("echo 'Hello from job $n'; sleep $n; if [ $n -eq 5 ]; then echo 'Error in job 5' >&2; fi")
done

# Drain the queue in parallel and print each job's output as it finishes.
pe_execute

Leave a Reply

Your email address will not be published. Required fields are marked *