The Perl programming language and concurrent processing (Part 2)

In today’s example, I show you how to set up a very basic bi-directional IPC (Inter-Process Communication) pathway between a parent process and two or more of its child processes.


#!/usr/bin/perl
#
# Bi-directional IPC example between a parent and two or more forked child processes
# 
# Authored by: Colin Faber <cfaber@fpsn.net>
#

### LOAD MODULES
# Keep it clean!
use strict;
use warnings;

# Import the flock constants
use Fcntl ':flock';

# IO::Handle required for autoflush()
use IO::Handle;

# IO::Select needed to prevent blocking on handle reads
use IO::Select;

# Easy fork management
use Proc::Fork::Control;


### HANDLE SIGNALS
# Trap ctrl+c nicely
$SIG{INT} = sub {
	# SIGINT (ctrl+c) handler, shared by the parent and every forked child.
	# Each side prints its own farewell; the parent additionally asks the
	# fork manager to kill its children. Both then leave via cfork_exit().
	if(!cfork_is_child()){
		print "Bye!\n";
		cfork_kill_children();
	} else {
		print "Child $$ exiting\n";
	}

	cfork_exit();
};

$SIG{TERM} = sub {
	# SIGTERM handler: only a child announces itself; parent and child
	# alike then finish through cfork_exit().
	print "\nChild $$ exiting!\n" if cfork_is_child();

	cfork_exit();
};

### SETUP CONFIGURATION
# Setup our parent and child readers
# Fork manager: allow up to 5 forks, then switch to nonblocking mode so
# that forks return instantly on success.
cfork_init(5);
cfork_nonblocking(1);

# Per-PID table of the pipe handles created for each job
my %list;

# IO::Select sets holding the read ends of the pipes:
#   $sel_pr - readers the children poll for messages from the parent
#   $sel_cr - readers the parent polls for messages from the children
my $sel_pr = IO::Select->new();
my $sel_cr = IO::Select->new();

### START WORK
# Spawn 5 jobs
for my $job (0 .. 4){

	# Create two one-way pipes for this job:
	#   $cw -> $pr : the parent writes to $cw, the child reads from $pr
	#   $pw -> $cr : the child writes to $pw, the parent reads from $cr
	# pipe() is checked so a failure (e.g. running out of descriptors) is
	# reported here instead of surfacing later as an undefined handle.
	pipe(my $pr, my $cw) or die "pipe (parent to child) failed for job $job: $!";
	pipe(my $cr, my $pw) or die "pipe (child to parent) failed for job $job: $!";

	# Autoflush the writers so each line is sent as soon as it is printed
	$cw->autoflush(1);
	$pw->autoflush(1);

	# Register the readers in the shared IO::Select sets; the parent loop
	# polls $sel_cr and removes entries from both sets when a child dies.
	$sel_pr->add($pr);
	$sel_cr->add($cr);

	# Fork; everything inside the callback runs in the child
	my $pid = cfork(sub {
		my $i = 0;

		# The child only cares about its own reader, so poll a private
		# set rather than the inherited $sel_pr (which also contains the
		# readers of previously spawned jobs).
		my $own_reader = IO::Select->new($pr);

		# Loop endlessly
		while(1){

			# Is a message from the parent waiting?
			if(my @ready = $own_reader->can_read(0.001)){

				# Nibble one line off it; readline returns undef at
				# EOF, in which case there is nothing to report.
				my $line = <$pr>;

				if(defined $line){
					chomp $line;

					# Lock the console output to prevent clobbering.
					# NOTE(review): flock() locks belong to the open
					# file description, which forked processes share,
					# so this may not serialise parent and children
					# against each other - confirm.
					flock(STDOUT, LOCK_EX);

					# Print out to console
					print "Child Pid $$ just read this: '$line'\n";

					# Unlock the console output
					flock(STDOUT, LOCK_UN);
				}
			}

			## Write data back to the parent

			# Lock the handle to the parent, send one line, unlock
			flock($pw, LOCK_EX);
			print {$pw} "$i Job $job Child Pid [$$] is sending this " . localtime() . "\n";
			flock($pw, LOCK_UN);

			# For fun, after (X) iterations, stop the child job 2
			if($job == 2 && $i > 5){
				print "Stopping $$\n";
				cfork_exit();
			}

			$i++;

			# Sleep for a second.
			cfork_sleep(1);
		}
	});

	## Back in the parent

	# Record the PID and list of handles associated with it,
	# two readers, two writers
	$list{ $pid } = {
		'pr' => $pr,
		'cr' => $cr,
		'pw' => $pw,
		'cw' => $cw
	};
}

# disable async
cfork_nonblocking(0);

print "Reading\n";

my $transno;

# Parent loop endlessly.
while(1){

	my $success;

	## Stage 1
	# Check the list of running children
	for(keys %list){

		# Is the PID still running?
		if(!kill(0, $_)){

			# PID is dead, so remove it's handles from the IO::Select queues
			$sel_cr->remove( $list{ $_ }->{cr} );
			$sel_pr->remove( $list{ $_ }->{pr} );

			# Remove this child from the handles list
			delete $list{ $_ };
		}
	}

	## Stage 2
	# Print out data from the children by scanning the available handles list
	for my $ready ($sel_cr->can_read(0.001)){
		my ($cr, $cw);

		# Scan the list of running children
		for my $pid (keys %list){

			# Identify the handle that's ready to communication
			if($list{ $pid }->{cr} == $ready){
				$transno++;

				$cr = $list{ $pid }->{cr};
				$cw = $list{ $pid }->{cw};

				# Lock the output, send data to the child, unlock the output
				flock($cw, &LOCK_EX);
				print $cw "$transno Parent Pid $$ is sending this " . localtime() . "\n";
				flock($cw, &LOCK_UN);

				# Read the data from the child
				chomp(my $line = <$cr>);

				flock(STDOUT, &LOCK_EX);
				print "Received: '$line'\n";
				flock(STDOUT, &LOCK_UN);

				# Set the success flag and end the search within the %list loop
				$success = 1;
				last;
			}
		}
	}

	# only sleep if the success flag is false, otherwise try again for another read right away.
	sleep 1 if !$success;
}

# Wait for all children to exit gracefully
cfork_wait(1);

cfork_exit();

Using the above code you can see the processes communicating with each other in the following output:

$ perl bidirectional-ipc.pl 
Reading
Received: '0 Job 0 Child Pid [122913] is sending this Fri Jan  1 09:47:32 2021'
Received: '0 Job 1 Child Pid [122914] is sending this Fri Jan  1 09:47:32 2021'
Received: '0 Job 4 Child Pid [122917] is sending this Fri Jan  1 09:47:32 2021'
Received: '0 Job 2 Child Pid [122915] is sending this Fri Jan  1 09:47:32 2021'
Received: '0 Job 3 Child Pid [122916] is sending this Fri Jan  1 09:47:32 2021'
Child Pid 122913 just read this: '1 Parent Pid 122912 is sending this Fri Jan  1 09:47:32 2021'
Child Pid 122914 just read this: '2 Parent Pid 122912 is sending this Fri Jan  1 09:47:32 2021'
Child Pid 122915 just read this: '4 Parent Pid 122912 is sending this Fri Jan  1 09:47:32 2021'
Child Pid 122916 just read this: '5 Parent Pid 122912 is sending this Fri Jan  1 09:47:32 2021'
Child Pid 122917 just read this: '3 Parent Pid 122912 is sending this Fri Jan  1 09:47:32 2021'
Received: '1 Job 0 Child Pid [122913] is sending this Fri Jan  1 09:47:33 2021'
Received: '1 Job 1 Child Pid [122914] is sending this Fri Jan  1 09:47:33 2021'
Received: '1 Job 2 Child Pid [122915] is sending this Fri Jan  1 09:47:33 2021'
Received: '1 Job 3 Child Pid [122916] is sending this Fri Jan  1 09:47:33 2021'
Received: '1 Job 4 Child Pid [122917] is sending this Fri Jan  1 09:47:33 2021'
Child Pid 122913 just read this: '6 Parent Pid 122912 is sending this Fri Jan  1 09:47:33 2021'
Child Pid 122914 just read this: '7 Parent Pid 122912 is sending this Fri Jan  1 09:47:33 2021'
Child Pid 122915 just read this: '8 Parent Pid 122912 is sending this Fri Jan  1 09:47:33 2021'
Child Pid 122916 just read this: '9 Parent Pid 122912 is sending this Fri Jan  1 09:47:33 2021'
Child Pid 122917 just read this: '10 Parent Pid 122912 is sending this Fri Jan  1 09:47:33 2021'
Received: '2 Job 0 Child Pid [122913] is sending this Fri Jan  1 09:47:34 2021'
Received: '2 Job 1 Child Pid [122914] is sending this Fri Jan  1 09:47:34 2021'
Received: '2 Job 2 Child Pid [122915] is sending this Fri Jan  1 09:47:34 2021'
Received: '2 Job 3 Child Pid [122916] is sending this Fri Jan  1 09:47:34 2021'
Received: '2 Job 4 Child Pid [122917] is sending this Fri Jan  1 09:47:34 2021'
Child Pid 122913 just read this: '11 Parent Pid 122912 is sending this Fri Jan  1 09:47:34 2021'
Child Pid 122916 just read this: '14 Parent Pid 122912 is sending this Fri Jan  1 09:47:34 2021'
Child Pid 122914 just read this: '12 Parent Pid 122912 is sending this Fri Jan  1 09:47:34 2021'
Child Pid 122915 just read this: '13 Parent Pid 122912 is sending this Fri Jan  1 09:47:34 2021'
Child Pid 122917 just read this: '15 Parent Pid 122912 is sending this Fri Jan  1 09:47:34 2021'
Received: '3 Job 0 Child Pid [122913] is sending this Fri Jan  1 09:47:35 2021'
Received: '3 Job 1 Child Pid [122914] is sending this Fri Jan  1 09:47:35 2021'
Received: '3 Job 2 Child Pid [122915] is sending this Fri Jan  1 09:47:35 2021'
Received: '3 Job 3 Child Pid [122916] is sending this Fri Jan  1 09:47:35 2021'
Received: '3 Job 4 Child Pid [122917] is sending this Fri Jan  1 09:47:35 2021'
^CBye!
Child 122917 exiting
Child 122916 exiting
Child 122915 exiting
Child 122913 exiting
Child 122914 exiting

Leave a Reply

Your email address will not be published. Required fields are marked *