From 0dad963cd89be409c729830dea4128a021129090 Mon Sep 17 00:00:00 2001 From: Jordi Boggiano Date: Fri, 5 Jun 2020 08:33:44 +0200 Subject: [PATCH] Add executeAsync to ProcessExecutor and allow Loop class to wait on it in addition to HttpDownloader --- src/Composer/Factory.php | 2 +- src/Composer/Util/HttpDownloader.php | 71 +++--- src/Composer/Util/Loop.php | 35 ++- src/Composer/Util/ProcessExecutor.php | 204 ++++++++++++++++++ .../Test/Downloader/FileDownloaderTest.php | 6 +- .../Test/Downloader/XzDownloaderTest.php | 2 +- .../Test/Downloader/ZipDownloaderTest.php | 2 +- .../Repository/ComposerRepositoryTest.php | 10 +- 8 files changed, 293 insertions(+), 39 deletions(-) diff --git a/src/Composer/Factory.php b/src/Composer/Factory.php index 04da08e31..5bc41ee6b 100644 --- a/src/Composer/Factory.php +++ b/src/Composer/Factory.php @@ -336,7 +336,7 @@ class Factory $httpDownloader = self::createHttpDownloader($io, $config); $process = new ProcessExecutor($io); - $loop = new Loop($httpDownloader); + $loop = new Loop($httpDownloader, $process); $composer->setLoop($loop); // initialize event dispatcher diff --git a/src/Composer/Util/HttpDownloader.php b/src/Composer/Util/HttpDownloader.php index 2fa8fa716..41ced41e3 100644 --- a/src/Composer/Util/HttpDownloader.php +++ b/src/Composer/Util/HttpDownloader.php @@ -44,6 +44,7 @@ class HttpDownloader private $rfs; private $idGen = 0; private $disabled; + private $allowAsync = false; /** * @param IOInterface $io The IO instance @@ -139,6 +140,10 @@ class HttpDownloader 'origin' => Url::getOrigin($this->config, $request['url']), ); + if (!$sync && !$this->allowAsync) { + throw new \LogicException('You must use the HttpDownloader instance which is part of a Composer\Loop instance to be able to run async http requests'); + } + // capture username/password from URL if there is one if (preg_match('{^https?://([^:/]+):([^@/]+)@([^/]+)}i', $request['url'], $match)) { $this->io->setAuthentication($job['origin'], rawurldecode($match[1]), rawurldecode($match[2])); @@ -189,7 +194,6 @@ class HttpDownloader // TODO 3.0 this should be done directly on $this when PHP 5.3 is dropped $downloader->markJobDone(); - $downloader->scheduleNextJob(); return $response; }, function ($e) use (&$job, $downloader) { @@ -197,7 +201,6 @@ class HttpDownloader $job['exception'] = $e; $downloader->markJobDone(); - $downloader->scheduleNextJob(); throw $e; }); @@ -251,13 +254,7 @@ class HttpDownloader public function markJobDone() { $this->runningJobs--; - } - /** - * @private - */ - public function scheduleNextJob() - { foreach ($this->jobs as $job) { if ($job['status'] === self::STATUS_QUEUED) { $this->startJob($job['id']); @@ -268,34 +265,50 @@ class HttpDownloader } } - public function wait($index = null, $progress = false) + public function wait($index = null) { while (true) { - if ($this->curl) { - $this->curl->tick(); + if (!$this->hasActiveJob($index)) { + return; } - if (null !== $index) { - if ($this->jobs[$index]['status'] === self::STATUS_COMPLETED || $this->jobs[$index]['status'] === self::STATUS_FAILED) { - return; - } - } else { - $done = true; - foreach ($this->jobs as $job) { - if (!in_array($job['status'], array(self::STATUS_COMPLETED, self::STATUS_FAILED), true)) { - $done = false; - break; - } elseif (!$job['sync']) { - unset($this->jobs[$job['id']]); - } - } - if ($done) { - return; - } + usleep(1000); + } + } + + /** + * @internal + */ + public function enableAsync() + { + $this->allowAsync = true; + } + + /** + * @internal + */ + public function hasActiveJob($index = null) + { + if ($this->curl) { + $this->curl->tick(); + } + + if (null !== $index) { + if ($this->jobs[$index]['status'] === self::STATUS_COMPLETED || $this->jobs[$index]['status'] === self::STATUS_FAILED) { + return false; } + return true; + } - usleep(1000); + foreach ($this->jobs as $job) { + if (!in_array($job['status'], array(self::STATUS_COMPLETED, self::STATUS_FAILED), true)) { + return true; + } elseif (!$job['sync']) { + unset($this->jobs[$job['id']]); + } } + + return false; } private function getResponse($index) diff --git a/src/Composer/Util/Loop.php b/src/Composer/Util/Loop.php index dfaa2ac53..b0061ba2d 100644 --- a/src/Composer/Util/Loop.php +++ b/src/Composer/Util/Loop.php @@ -21,10 +21,19 @@ use React\Promise\Promise; class Loop { private $httpDownloader; + private $processExecutor; + private $currentPromises; - public function __construct(HttpDownloader $httpDownloader) + public function __construct(HttpDownloader $httpDownloader = null, ProcessExecutor $processExecutor = null) { $this->httpDownloader = $httpDownloader; + if ($this->httpDownloader) { + $this->httpDownloader->enableAsync(); + } + $this->processExecutor = $processExecutor; + if ($this->processExecutor) { + $this->processExecutor->enableAsync(); + } } public function wait(array $promises) @@ -39,8 +48,30 @@ class Loop } ); - $this->httpDownloader->wait(); + $this->currentPromises = $promises; + + while (true) { + $hasActiveJob = false; + + if ($this->httpDownloader) { + if ($this->httpDownloader->hasActiveJob()) { + $hasActiveJob = true; + } + } + if ($this->processExecutor) { + if ($this->processExecutor->hasActiveJob()) { + $hasActiveJob = true; + } + } + + if (!$hasActiveJob) { + break; + } + + usleep(5000); + } + $this->currentPromises = null; if ($uncaught) { throw $uncaught; } diff --git a/src/Composer/Util/ProcessExecutor.php b/src/Composer/Util/ProcessExecutor.php index a30a04d15..b443e541d 100644 --- a/src/Composer/Util/ProcessExecutor.php +++ b/src/Composer/Util/ProcessExecutor.php @@ -16,18 +16,32 @@ use Composer\IO\IOInterface; use Symfony\Component\Process\Process; use Symfony\Component\Process\ProcessUtils; use Symfony\Component\Process\Exception\RuntimeException; +use React\Promise\Promise; /** * @author Robert Schönthal + * @author Jordi Boggiano */ class ProcessExecutor { + const STATUS_QUEUED = 1; + const STATUS_STARTED = 2; + const STATUS_COMPLETED = 3; + const STATUS_FAILED = 4; + const STATUS_ABORTED = 5; + protected static $timeout = 300; protected $captureOutput; protected $errorOutput; protected $io; + private $jobs = array(); + private $runningJobs = 0; + private $maxJobs = 10; + private $idGen = 0; + private $allowAsync = false; + public function __construct(IOInterface $io = null) { $this->io = $io; @@ -112,6 +126,196 @@ class ProcessExecutor return $process->getExitCode(); } + /** + * starts a process on the commandline in async mode + * + * @param string $command the command to execute + * @param mixed $output the output will be written into this var if passed by ref + * if a callable is passed it will be used as output handler + * @param string $cwd the working directory + * @return int statuscode + */ + public function executeAsync($command, $cwd = null) + { + if (!$this->allowAsync) { + throw new \LogicException('You must use the ProcessExecutor instance which is part of a Composer\Loop instance to be able to run async processes'); + } + + $job = array( + 'id' => $this->idGen++, + 'status' => self::STATUS_QUEUED, + 'command' => $command, + 'cwd' => $cwd, + ); + + $resolver = function ($resolve, $reject) use (&$job) { + $job['status'] = ProcessExecutor::STATUS_QUEUED; + $job['resolve'] = $resolve; + $job['reject'] = $reject; + }; + + $self = $this; + $io = $this->io; + + $canceler = function () use (&$job) { + if ($job['status'] === self::STATUS_QUEUED) { + $job['status'] = self::STATUS_ABORTED; + } + if ($job['status'] !== self::STATUS_STARTED) { + return; + } + $job['status'] = self::STATUS_ABORTED; + try { + if (defined('SIGINT')) { + $job['process']->signal(SIGINT); + } + } catch (\Exception $e) { + // signal can throw in various conditions, but we don't care if it fails + } + $job['process']->stop(1); + }; + + $promise = new Promise($resolver, $canceler); + $promise = $promise->then(function () use (&$job, $self) { + if ($job['process']->isSuccessful()) { + $job['status'] = ProcessExecutor::STATUS_COMPLETED; + } else { + $job['status'] = ProcessExecutor::STATUS_FAILED; + } + + // TODO 3.0 this should be done directly on $this when PHP 5.3 is dropped + $self->markJobDone(); + + return $job['process']; + }, function () use (&$job, $self) { + $job['status'] = ProcessExecutor::STATUS_FAILED; + + $self->markJobDone(); + + return \React\Promise\reject($job['process']); + }); + $this->jobs[$job['id']] =& $job; + + if ($this->runningJobs < $this->maxJobs) { + $this->startJob($job['id']); + } + + return $promise; + } + + private function startJob($id) + { + $job =& $this->jobs[$id]; + if ($job['status'] !== self::STATUS_QUEUED) { + return; + } + + // start job + $job['status'] = self::STATUS_STARTED; + $this->runningJobs++; + + $command = $job['command']; + $cwd = $job['cwd']; + + if ($this->io && $this->io->isDebug()) { + $safeCommand = preg_replace_callback('{://(?P[^:/\s]+):(?P[^@\s/]+)@}i', function ($m) { + if (preg_match('{^[a-f0-9]{12,}$}', $m['user'])) { + return '://***:***@'; + } + + return '://'.$m['user'].':***@'; + }, $command); + $safeCommand = preg_replace("{--password (.*[^\\\\]\') }", '--password \'***\' ', $safeCommand); + $this->io->writeError('Executing async command ('.($cwd ?: 'CWD').'): '.$safeCommand); + } + + // make sure that null translate to the proper directory in case the dir is a symlink + // and we call a git command, because msysgit does not handle symlinks properly + if (null === $cwd && Platform::isWindows() && false !== strpos($command, 'git') && getcwd()) { + $cwd = realpath(getcwd()); + } + + // TODO in v3, commands should be passed in as arrays of cmd + args + if (method_exists('Symfony\Component\Process\Process', 'fromShellCommandline')) { + $process = Process::fromShellCommandline($command, $cwd, null, null, static::getTimeout()); + } else { + $process = new Process($command, $cwd, null, null, static::getTimeout()); + } + + $job['process'] = $process; + + $process->start(); + } + + public function wait($index = null) + { + while (true) { + if (!$this->hasActiveJob($index)) { + return; + } + + usleep(1000); + } + } + + /** + * @internal + */ + public function enableAsync() + { + $this->allowAsync = true; + } + + /** + * @internal + */ + public function hasActiveJob($index = null) + { + // tick + foreach ($this->jobs as &$job) { + if ($job['status'] === self::STATUS_STARTED) { + if (!$job['process']->isRunning()) { + call_user_func($job['resolve'], $job['process']); + } + } + } + + if (null !== $index) { + if ($this->jobs[$index]['status'] === self::STATUS_COMPLETED || $this->jobs[$index]['status'] === self::STATUS_FAILED || $this->jobs[$index]['status'] === self::STATUS_ABORTED) { + return false; + } + + return true; + } + + foreach ($this->jobs as $job) { + if (!in_array($job['status'], array(self::STATUS_COMPLETED, self::STATUS_FAILED, self::STATUS_ABORTED), true)) { + return true; + } else { + unset($this->jobs[$job['id']]); + } + } + + return false; + } + + /** + * @private + */ + public function markJobDone() + { + $this->runningJobs--; + + foreach ($this->jobs as $job) { + if ($job['status'] === self::STATUS_QUEUED) { + $this->startJob($job['id']); + if ($this->runningJobs >= $this->maxJobs) { + return; + } + } + } + } + public function splitLines($output) { $output = trim($output); diff --git a/tests/Composer/Test/Downloader/FileDownloaderTest.php b/tests/Composer/Test/Downloader/FileDownloaderTest.php index c86ffa2f7..ba8f95db9 100644 --- a/tests/Composer/Test/Downloader/FileDownloaderTest.php +++ b/tests/Composer/Test/Downloader/FileDownloaderTest.php @@ -139,8 +139,8 @@ class FileDownloaderTest extends TestCase ->will($this->returnValue($path.'/vendor')); try { - $promise = $downloader->download($packageMock, $path); $loop = new Loop($this->httpDownloader); + $promise = $downloader->download($packageMock, $path); $loop->wait(array($promise)); $this->fail('Download was expected to throw'); @@ -225,8 +225,8 @@ class FileDownloaderTest extends TestCase touch($dlFile); try { - $promise = $downloader->download($packageMock, $path); $loop = new Loop($this->httpDownloader); + $promise = $downloader->download($packageMock, $path); $loop->wait(array($promise)); $this->fail('Download was expected to throw'); @@ -296,8 +296,8 @@ class FileDownloaderTest extends TestCase mkdir(dirname($dlFile), 0777, true); touch($dlFile); - $promise = $downloader->download($newPackage, $path, $oldPackage); $loop = new Loop($this->httpDownloader); + $promise = $downloader->download($newPackage, $path, $oldPackage); $loop->wait(array($promise)); $downloader->update($oldPackage, $newPackage, $path); diff --git a/tests/Composer/Test/Downloader/XzDownloaderTest.php b/tests/Composer/Test/Downloader/XzDownloaderTest.php index f770b0d35..6996d67f6 100644 --- a/tests/Composer/Test/Downloader/XzDownloaderTest.php +++ b/tests/Composer/Test/Downloader/XzDownloaderTest.php @@ -70,8 +70,8 @@ class XzDownloaderTest extends TestCase $downloader = new XzDownloader($io, $config, $httpDownloader = new HttpDownloader($io, $this->getMockBuilder('Composer\Config')->getMock()), null, null, null); try { - $promise = $downloader->download($packageMock, $this->testDir.'/install-path'); $loop = new Loop($httpDownloader); + $promise = $downloader->download($packageMock, $this->testDir.'/install-path'); $loop->wait(array($promise)); $downloader->install($packageMock, $this->testDir.'/install-path'); diff --git a/tests/Composer/Test/Downloader/ZipDownloaderTest.php b/tests/Composer/Test/Downloader/ZipDownloaderTest.php index 4436c6ad7..764af8feb 100644 --- a/tests/Composer/Test/Downloader/ZipDownloaderTest.php +++ b/tests/Composer/Test/Downloader/ZipDownloaderTest.php @@ -92,8 +92,8 @@ class ZipDownloaderTest extends TestCase $this->setPrivateProperty('hasSystemUnzip', false); try { - $promise = $downloader->download($this->package, $path = sys_get_temp_dir().'/composer-zip-test'); $loop = new Loop($this->httpDownloader); + $promise = $downloader->download($this->package, $path = sys_get_temp_dir().'/composer-zip-test'); $loop->wait(array($promise)); $downloader->install($this->package, $path); diff --git a/tests/Composer/Test/Repository/ComposerRepositoryTest.php b/tests/Composer/Test/Repository/ComposerRepositoryTest.php index 4fcbbb431..01e3be4ce 100644 --- a/tests/Composer/Test/Repository/ComposerRepositoryTest.php +++ b/tests/Composer/Test/Repository/ComposerRepositoryTest.php @@ -189,16 +189,19 @@ class ComposerRepositoryTest extends TestCase ->getMock(); $httpDownloader->expects($this->at(0)) + ->method('enableAsync'); + + $httpDownloader->expects($this->at(1)) ->method('get') ->with($url = 'http://example.org/packages.json') ->willReturn(new \Composer\Util\Http\Response(array('url' => $url), 200, array(), json_encode(array('search' => '/search.json?q=%query%&type=%type%')))); - $httpDownloader->expects($this->at(1)) + $httpDownloader->expects($this->at(2)) ->method('get') ->with($url = 'http://example.org/search.json?q=foo&type=composer-plugin') ->willReturn(new \Composer\Util\Http\Response(array('url' => $url), 200, array(), json_encode($result))); - $httpDownloader->expects($this->at(2)) + $httpDownloader->expects($this->at(3)) ->method('get') ->with($url = 'http://example.org/search.json?q=foo&type=library') ->willReturn(new \Composer\Util\Http\Response(array('url' => $url), 200, array(), json_encode(array()))); @@ -291,6 +294,9 @@ class ComposerRepositoryTest extends TestCase ->getMock(); $httpDownloader->expects($this->at(0)) + ->method('enableAsync'); + + $httpDownloader->expects($this->at(1)) ->method('get') ->with($url = 'http://example.org/packages.json') ->willReturn(new \Composer\Util\Http\Response(array('url' => $url), 200, array(), json_encode(array(