diff --git a/README.md b/README.md index 6f75fb1..ffc634a 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ composer require sweikenb/pcntl You can just create an instance of `\Sweikenb\Library\Pcntl\ProcessManager` and create a process-fork by calling `runProcess()`. -The manager will handle the rest and makes sure all process will be terminated properly. It will also make shure that +The manager will handle the rest and makes sure all process will be terminated properly. It will also make sure that the major system signals will be propagated to the child processes as well. In case you want to define your own set of signals you want to propagate to the children, you can add an array with the signals as second argument to the constructor. @@ -58,6 +58,70 @@ The following events are thrown: | process.manager.child.exit | A child has exited. | | process.manager.child.send.kill | A kill signal was sent to a child process. | +## Inter Process Communication + +You can send data between the parent and child process using messages. + +The data gets send by sockets and can be anything that can be encoded using `serialize()`: + +```php +runProcess(function(ChildProcessInterface $childProcess, ParentProcessInterface $parentProcess){ + $message = $parentProcess->getNextMessage(true); + if ($message) { + // Process message here ... + fwrite( + STDOUT, + fprintf('Got a message from the parent process: %s - %s', $message->getTopic(), $message->getPayload()) + ); + } + $parentProcess->sendMessage(new MessageModel('some_response', 'hello parent')); +}); + +$child->sendMessage(new MessageModel('some_topic', 'hello child')); + +// wait and cleanup +sleep(3); +$child->kill(); +``` + +## Process Pool & Worker Processes + +You can also distribute workload across multiple worker to work in parallel. The actual work must be placed inside a +class that is invokable _(`__invoke`)_ and must not have a constructor. + +```php +sendMessage($messageFactory->create('some_topic', ExampleWorkerService::class)); +} + +// wait and cleanup +sleep(5); +$pool->killAll(); +``` + ## Example ```php diff --git a/composer.json b/composer.json index b7aaf61..73437ba 100644 --- a/composer.json +++ b/composer.json @@ -12,7 +12,8 @@ "require": { "php": "^8.1", "ext-pcntl": "*", - "ext-posix": "*" + "ext-posix": "*", + "ext-sockets": "*" }, "suggest": { "symfony/event-dispatcher": "If you want to add events, you can use the symfony event dispatcher." diff --git a/example/000_simple.php b/example/000_simple.php index 1916b85..44c925c 100644 --- a/example/000_simple.php +++ b/example/000_simple.php @@ -22,25 +22,25 @@ $childA = $pm->runProcess( function (ChildProcessInterface $childProcess, ParentProcessInterface $parentProcess) { - sleep(mt_rand(1, 10)); + sleep(mt_rand(1, 3)); echo "Hallo from child A\n"; } ); $childB = $pm->runProcess( function (ChildProcessInterface $childProcess, ParentProcessInterface $parentProcess) { - sleep(mt_rand(1, 10)); + sleep(mt_rand(1, 3)); echo "Hallo from child B\n"; } ); $childC = $pm->runProcess( function (ChildProcessInterface $childProcess, ParentProcessInterface $parentProcess) { - sleep(mt_rand(1, 10)); + sleep(mt_rand(1, 3)); echo "Hallo from child C\n"; } ); $childD = $pm->runProcess( function (ChildProcessInterface $childProcess, ParentProcessInterface $parentProcess) { - sleep(mt_rand(1, 10)); + sleep(mt_rand(1, 3)); echo "Hallo from child D\n"; } ); diff --git a/example/100_simple_ipc.php b/example/100_simple_ipc.php new file mode 100644 index 0000000..59ac553 --- /dev/null +++ b/example/100_simple_ipc.php @@ -0,0 +1,75 @@ + $workers */ + +for ($i = 0; $i < $numWorker; $i++) { + $workers[$i] = $pm->runProcess( + function (ChildProcessInterface $process, ParentProcessInterface $parentProcess) use ($i, $factory) { + fwrite( + STDOUT, + sprintf("> Worker #%d: started and ready to process messages\n", ($i + 1)) + ); + $count = 0; + while ($message = $parentProcess->getNextMessage()) { + $count++; + fwrite( + STDOUT, + sprintf( + ">> Worker #%d: received a message: '%s' '%s' (no. msg.: %d)\n", + ($i + 1), + $message->getTopic(), + $message->getPayload(), + $count + ) + ); + $parentProcess->sendMessage( + $factory->create( + sprintf('Answer from #%d', $process->getId()), + sprintf("msg %d", $count) + ) + ); + } + } + ); +} + +for ($i = 0; $i < $numWorker * $numMessages; $i++) { + $workerId = $i % $numWorker; + $message = $factory->create('some message for ' . ($workerId + 1), 'some payload for ' . ($workerId + 1)); + $workers[$workerId]->sendMessage($message); +} + +foreach ($workers as $i => $worker) { + $count = 0; + while ($count < $numMessages) { + $count++; + $message = $worker->getNextMessage(); + fwrite( + STDOUT, + sprintf( + ">> Worker #%d answered with message: '%s' '%s'\n", + $worker->getId(), + $message->getTopic(), + $message->getPayload() + ) + ); + } + + // stop the worker process + fwrite(STDOUT, sprintf('# Stopping worker (%d)', $worker->getId())); + $worker->kill(); +} diff --git a/example/110_process_pool.php b/example/110_process_pool.php new file mode 100644 index 0000000..5e0752d --- /dev/null +++ b/example/110_process_pool.php @@ -0,0 +1,26 @@ +sendMessage( + $factory->create('hello_world', ExampleWorkerService::class) + ); +} + +// Give the workers some time to work. +// Usually you would send messages in some kid of event/endless-loop and/or with some custom unblock logic. +sleep(3); + +// Work done, kill all workers! +// HINT: if you skipp this kill, the main process and its worker will run infinitely +$pool->killAll(); diff --git a/example/ExampleWorkerService.php b/example/ExampleWorkerService.php new file mode 100644 index 0000000..b606ee7 --- /dev/null +++ b/example/ExampleWorkerService.php @@ -0,0 +1,18 @@ +getId() + ) + ); + } +} diff --git a/src/Api/ChildProcessInterface.php b/src/Api/ChildProcessInterface.php index f886ff7..ccc68f7 100644 --- a/src/Api/ChildProcessInterface.php +++ b/src/Api/ChildProcessInterface.php @@ -4,6 +4,5 @@ interface ChildProcessInterface extends ProcessInterface { - // The library provides different classes for parent and child to make programming more expressive. - // Basically you could create just one model which implements all interfaces and achieve the same results. -} \ No newline at end of file + public function kill(): void; +} diff --git a/src/Api/Ipc/MessageInterface.php b/src/Api/Ipc/MessageInterface.php new file mode 100644 index 0000000..850dbae --- /dev/null +++ b/src/Api/Ipc/MessageInterface.php @@ -0,0 +1,22 @@ +processId; } -} \ No newline at end of file +} diff --git a/src/Factory/MessageFactory.php b/src/Factory/MessageFactory.php new file mode 100644 index 0000000..acc0240 --- /dev/null +++ b/src/Factory/MessageFactory.php @@ -0,0 +1,14 @@ +ipcSocket) { + @socket_shutdown($this->ipcSocket); + @socket_close($this->ipcSocket); + $this->ipcSocket = null; + } + } public function getId(): int { return $this->id; } -} \ No newline at end of file + + public function setIpcSocket(?Socket $socket): self + { + $this->ipcSocket = $socket; + return $this; + } + + public function getIpcSocket(): ?Socket + { + return $this->ipcSocket; + } + + public function sendMessage(MessageInterface $message): bool + { + // send message and capture the next response-message + $socket = $this->getIpcSocket(); + if ($socket) { + $buffer = serialize($message); + $buffer = sprintf("%s#%s", strlen($buffer), $buffer); + if (socket_write($socket, $buffer, strlen($buffer)) === false) { + throw new ProcessException(socket_strerror(socket_last_error($socket))); + } + return true; + } + return false; + } + + public function getNextMessage(bool $wait = true): ?MessageInterface + { + $socket = $this->getIpcSocket(); + if (!$socket) { + return null; + } + + $length = ''; + $buffer = null; + while (true) { + if ($buffer === null) { + $char = socket_read($socket, 1); + if ($char === false) { + throw new ProcessException(socket_strerror(socket_last_error($socket))); + } + if ($char === '') { + if ($wait) { + // unblock the system + usleep(1000); + continue; + } + return null; + } + if ($char === '#') { + $length = intval($length); + $buffer = ''; + } else { + if (!is_numeric($char)) { + throw new ProcessException('Unexpected char, can not read message.'); + } + $length .= $char; + } + continue; + } + $buffer = socket_read($socket, $length); + $message = @unserialize($buffer); + if ($message === false) { + throw new ProcessException('Can not unserialize message.'); + } + if ($message instanceof MessageInterface) { + return $message; + } + throw new ProcessException('Invalid message received.'); + } + } +} diff --git a/src/Model/ChildProcessModel.php b/src/Model/ChildProcessModel.php index a7c0a88..b98082f 100644 --- a/src/Model/ChildProcessModel.php +++ b/src/Model/ChildProcessModel.php @@ -6,6 +6,8 @@ class ChildProcessModel extends AbstractProcessModel implements ChildProcessInterface { - // The library provides different classes for parent and child to make programming more expressive. - // Basically you could create just one model which implements all interfaces and achieve the same results. -} \ No newline at end of file + public function kill(): void + { + @posix_kill($this->getId(), SIGKILL); + } +} diff --git a/src/Model/Ipc/MessageModel.php b/src/Model/Ipc/MessageModel.php new file mode 100644 index 0000000..ad3ae52 --- /dev/null +++ b/src/Model/Ipc/MessageModel.php @@ -0,0 +1,48 @@ +topic; + } + + public function getPayload(): mixed + { + return $this->payload; + } + + public function serialize(): ?string + { + return serialize($this->__serialize()); + } + + public function unserialize(string $data): void + { + $this->__unserialize(unserialize($data)); + } + + public function __serialize(): array + { + return [ + 'topic' => $this->topic, + 'payload' => $this->payload, + ]; + } + + public function __unserialize(array $data): void + { + $this->topic = $data['topic'] ?? ''; + $this->payload = $data['payload'] ?? ''; + } +} diff --git a/src/Model/Ipc/WorkerMessageModel.php b/src/Model/Ipc/WorkerMessageModel.php new file mode 100644 index 0000000..cb79749 --- /dev/null +++ b/src/Model/Ipc/WorkerMessageModel.php @@ -0,0 +1,39 @@ +getPayload(); + call_user_func( + $processPool->getInvocationBuilder(), + $classname, + $childProcess, + $parentProcess + ); + } catch (Exception $e) { + fwrite( + STDERR, + sprintf( + "WORKER(%d) ERROR: %s (%s:%s)\n", + $childProcess->getId(), + $e->getMessage(), + $e->getFile(), + $e->getLine() + ) + ); + } + } +} diff --git a/src/Model/ParentProcessModel.php b/src/Model/ParentProcessModel.php index c203a47..179883b 100644 --- a/src/Model/ParentProcessModel.php +++ b/src/Model/ParentProcessModel.php @@ -8,4 +8,4 @@ class ParentProcessModel extends AbstractProcessModel implements ParentProcessIn { // The library provides different classes for parent and child to make programming more expressive. // Basically you could create just one model which implements all interfaces and achieve the same results. -} \ No newline at end of file +} diff --git a/src/ProcessManager.php b/src/ProcessManager.php index 3067ce8..d6f58e8 100644 --- a/src/ProcessManager.php +++ b/src/ProcessManager.php @@ -28,6 +28,10 @@ class ProcessManager implements ProcessManagerInterface private ?EventDispatcherInterface $eventDispatcher = null; private ProcessFactoryInterface $processFactory; private ParentProcessInterface $mainProcess; + + /** + * @var array + */ private array $childProcesses = []; private bool $isChildProcess = false; @@ -37,13 +41,10 @@ public function __construct( ?ProcessFactoryInterface $processFactory = null ) { // make sure we have a proper factory, if non is provided, use the one that comes with the library - if (!$processFactory) { - $processFactory = new ProcessFactory(); - } - $this->processFactory = $processFactory; + $this->processFactory = $processFactory ?? new ProcessFactory(); // create an instance for the current (parent) process - $this->mainProcess = $this->processFactory->createParentProcess(posix_getpid()); + $this->mainProcess = $this->processFactory->createParentProcess(posix_getpid(), null); // any special signals that should be handled? $propagateSignals = empty($propagateSignals) @@ -68,8 +69,7 @@ function (int $dispatchSignal) { function () use ($autoWait) { if ($autoWait) { $this->wait(); - } - else { + } else { if (!empty($this->childProcesses)) { foreach ($this->childProcesses as $childProcess) { echo sprintf( @@ -114,6 +114,12 @@ public function runProcess(callable $callback): ChildProcessInterface throw new ProcessException('Multi-level process-nesting not supported.'); } + // create IPC sockets + $ipc = []; + if (socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $ipc) === false) { + throw new ProcessException(socket_strerror(socket_last_error())); + } + // fork now $pid = pcntl_fork(); @@ -125,7 +131,8 @@ public function runProcess(callable $callback): ChildProcessInterface // we are the parent if ($pid > 0) { - $childProcess = $this->processFactory->createChildProcess($pid); + @socket_close($ipc[0]); + $childProcess = $this->processFactory->createChildProcess($pid, $ipc[1]); $this->childProcesses[$pid] = $childProcess; $this->dispatchEvent(self::EVENT_CHILD_CREATED, $pid); @@ -134,15 +141,15 @@ public function runProcess(callable $callback): ChildProcessInterface // we are the child try { + @socket_close($ipc[1]); $this->childProcesses = []; $this->isChildProcess = true; call_user_func( $callback, - $this->processFactory->createChildProcess(posix_getpid()), - $this->getMainProcess() + $this->processFactory->createChildProcess(posix_getpid(), null), + $this->getMainProcess()->setIpcSocket($ipc[0]) ); - } - finally { + } finally { exit(0); } } @@ -154,6 +161,8 @@ public function wait(?callable $callback = null): ProcessManagerInterface $pid = pcntl_wait($status); if (isset($this->childProcesses[$pid])) { unset($this->childProcesses[$pid]); + + // dispatch event and trigger callback if present $this->dispatchEvent(self::EVENT_CHILD_EXIT, $pid); if (null !== $callback) { call_user_func($callback, $status, $pid); @@ -164,4 +173,4 @@ public function wait(?callable $callback = null): ProcessManagerInterface return $this; } -} \ No newline at end of file +} diff --git a/src/ProcessPool.php b/src/ProcessPool.php new file mode 100644 index 0000000..f6fd851 --- /dev/null +++ b/src/ProcessPool.php @@ -0,0 +1,174 @@ + + */ + protected array $pool = []; + + public function __construct( + int $numWorkers, + ?callable $invocationBuilder = null, + ?WorkerSelectionStrategyInterface $workerSelectionStrategy = null, + ?ProcessManagerInterface $processManager = null + ) { + // normalize arguments and ensure we have proper instances (or create a default fallback) + $this->numWorkers = max(1, $numWorkers); + $this->invocationBuilder = $invocationBuilder ?? function (...$args): mixed { + return $this->defaultInvoker(...$args); + }; + $this->workerSelectionStrategy = $workerSelectionStrategy ?? new RoundRobinWorkerSelectionStrategy(); + $this->processManager = $processManager ?? new ProcessManager(); + + // star the worker process + for ($workerNo = 0; $workerNo < $this->numWorkers; $workerNo++) { + $this->startWorker(); + } + } + + public function __destruct() + { + $this->killAll(); + } + + private function startWorker(): ChildProcessInterface + { + // create process + $process = $this->processManager->runProcess(function (...$args) { + $this->handleMessage(...$args); + }); + + // register process + $this->pool[$process->getId()] = $process; + + // reconfigure selection-strategy + $this->workerSelectionStrategy->configure(array_keys($this->pool)); + + // return created process + return $process; + } + + private function handleMessage(ChildProcessInterface $childProcess, ParentProcessInterface $parentProcess) + { + try { + while ($message = $parentProcess->getNextMessage()) { + if ($message instanceof WorkerMessageInterface) { + $message->execute($this, $childProcess, $parentProcess); + } + } + } catch (Exception $e) { + fwrite( + STDERR, + sprintf( + "Worker %d encountered an error: %s (%s:%d)\n", + $childProcess->getId(), + $e->getMessage(), + $e->getFile(), + $e->getLine() + ) + ); + } + } + + /** + * @throws ProcessException + * @throws ReflectionException + */ + private function defaultInvoker( + string $class, + ChildProcessInterface $childProcess, + ParentProcessInterface $parentProcess + ): mixed { + // class available? + if (!class_exists($class)) { + throw new ProcessException('Workload class not found!'); + } + + // check if the worker requires non-optional constructor params + $ref = new ReflectionClass($class); + $constructor = $ref->getConstructor(); + if ($constructor !== null) { + foreach ($constructor->getParameters() as $parameter) { + if (!$parameter->isDefaultValueAvailable()) { + throw new ProcessException( + 'The provided worker requires constructor arguments but ' . + 'no custom invocation callback is configured.' + ); + } + } + } + + // get the worker instance + $worker = $ref->newInstance(); + if (!is_callable($worker)) { + throw new ProcessException('Provided class is not invokable!'); + } + + // invoke the worker now + return $worker($childProcess, $parentProcess); + } + + private function getNextWorker(): ChildProcessInterface + { + // get next pid to distribute load to + $pid = $this->workerSelectionStrategy->getNextWorkerPid(); + + // initially known process? + $worker = $this->pool[$pid] ?? null; + /* @var ChildProcessInterface|null $worker */ + if ($worker === null) { + throw new ProcessException('Worker selection failed due to unknown PID.'); + } + + // ensure the process is still running + if (!file_exists(sprintf("/proc/%d", $pid))) { + unset($this->pool[$pid]); + + // start a drop-in worker + $worker = $this->startWorker(); + } + + return $worker; + } + + public function getInvocationBuilder(): callable + { + return $this->invocationBuilder; + } + + public function sendMessage(WorkerMessageInterface $workerMessage): bool + { + return $this->getNextWorker()->sendMessage($workerMessage); + } + + public function killAll(): void + { + foreach ($this->pool as $pid => $worker) { + $worker->kill(); + unset($this->pool[$pid], $worker); + } + } +} diff --git a/src/Strategies/RandomWorkerSelectionStrategy.php b/src/Strategies/RandomWorkerSelectionStrategy.php new file mode 100644 index 0000000..aa09c9f --- /dev/null +++ b/src/Strategies/RandomWorkerSelectionStrategy.php @@ -0,0 +1,31 @@ +pids = array_map('intval', $processIds); + $this->max = count($this->pids) - 1; + } + + public function getNextWorkerPid(): int + { + $nextIndex = 0; + if ($this->max !== 0) { + $nextIndex = mt_rand(0, $this->max); + } + return $this->pids[$nextIndex]; + } +} diff --git a/src/Strategies/RoundRobinWorkerSelectionStrategy.php b/src/Strategies/RoundRobinWorkerSelectionStrategy.php new file mode 100644 index 0000000..daa02f9 --- /dev/null +++ b/src/Strategies/RoundRobinWorkerSelectionStrategy.php @@ -0,0 +1,41 @@ +pids = array_map('intval', $processIds); + $this->lastPid = -1; + } + + public function getNextWorkerPid(): int + { + if (count($this->pids) === 0) { + throw new ProcessException( + 'Can not provide next worker pid as the strategy is not properly configured yet' + ); + } + + // move to the next + $this->lastPid++; + if (!isset($this->pids[$this->lastPid])) { + // rewind + $this->lastPid = 0; + } + + // return pid to use + return $this->pids[$this->lastPid]; + } +}