Skip to content

Commit

Permalink
enhanced monitor, process commands
Browse files Browse the repository at this point in the history
  • Loading branch information
haristku committed Oct 9, 2024
1 parent 253df77 commit c08817b
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 35 deletions.
171 changes: 150 additions & 21 deletions Commands/Monitor.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ class Monitor extends ConsoleCommand
protected function configure()
{
$this->setName('queuedtracking:monitor');
$this->setDescription('Shows and updates the current state of the queue every 2 seconds.');
$this->setDescription("Shows and updates the current state of the queue every 2 seconds.\n Key ,=first page, .=last page, 0-9=move to page section, arrow LEFT=prev page, RIGHT=next page, UP=next 10 pages, DOWN=prev 10 pages, q=quit");
$this->addRequiredValueOption('iterations', null, 'If set, will limit the number of monitoring iterations done.');
$this->addRequiredValueOption('perpage', 'p', 'Number of queue worker displayed per page.', 16);
}

/**
Expand All @@ -36,6 +37,11 @@ protected function doExecute(): int
$systemCheck->checkRedisIsInstalled();
}

if (!$this->isPcntlFunctionAvailable()) {
$output->write(str_repeat("\r\n", 100));
$output->write("\e[".(100)."A");
}

$iterations = $this->getIterationsFromArg();
if ($iterations !== null) {
$output->writeln("<info>Only running " . $iterations . " iterations.</info>");
Expand All @@ -58,34 +64,130 @@ protected function doExecute(): int
$output->writeln('The command <comment>./console queuedtracking:process</comment> has to be executed to process request sets within queue');
}

$output->writeln(sprintf('Up to %d workers will be used', $manager->getNumberOfAvailableQueues()));
$output->writeln(sprintf('Processor will start once there are at least %s request sets in the queue',
$output->writeln(sprintf('Up to <info>%d</> workers will be used', $manager->getNumberOfAvailableQueues()));
$output->writeln(sprintf('Processor will start once there are at least <info>%s</> request sets in the queue',
$manager->getNumberOfRequestsToProcessAtSameTime()));
$iterationCount = 0;


$qCurrentPage = 1;
$qCount = count($queues);
$qPerPAge = min(max($this->getPerPageFromArg(), 1), $qCount);
$qPageCount = ceil($qCount / $qPerPAge);

$signalTrap = function() use ($output) {
$output->writeln("\e[u\e[?25h");
die;
};
if ($this->isPcntlFunctionAvailable())
{
pcntl_signal(SIGINT, $signalTrap);
pcntl_signal(SIGTERM, $signalTrap);
}

readline_callback_handler_install('', function() {});
stream_set_blocking (STDIN, false);

$output->writeln(str_repeat("-", 30));
$output->writeln("<fg=black;bg=white;options=bold>".str_pad(" Q INDEX", 10).str_pad(" | REQUEST SETS", 20)."</>");
$output->writeln(str_repeat("-", 30));
$output->write("\e[?25l");

$lastStatsTimer = microtime(true) - 2;
$lastSumInQueue = false;
$diffSumInQueue = 0;
$keyPressed = "";
while (1) {
$memory = $backend->getMemoryStats(); // I know this will only work with redis currently as it is not defined in backend interface etc. needs to be refactored once we add another backend
if ($this->isPcntlFunctionAvailable()) {
pcntl_signal_dispatch();
}

if (microtime(true) - $lastStatsTimer >= 2 || $keyPressed != "")
{
$qCurrentPage = min(max($qCurrentPage, 1), $qPageCount);
$memory = $backend->getMemoryStats(); // I know this will only work with redis currently as it is not defined in backend interface etc. needs to be refactored once we add another backend

$sumInQueue = 0;
foreach ($queues as $sumQ) {
$sumInQueue += $sumQ->getNumberOfRequestSetsInQueue();
}

if ($lastSumInQueue !== false) {
$diffSumInQueue = $lastSumInQueue - $sumInQueue;
$diffRps = round($diffSumInQueue / (microtime(true) - $lastStatsTimer), 2);
$diffSumInQueue = $diffSumInQueue < 0 ? "<fg=red;options=bold>".abs($diffRps)."</>" : "<fg=green;options=bold>{$diffRps}</>";
}

$numInQueue = 0;
for ($idxPage = 0; $idxPage < $qPerPAge; $idxPage++) {
$idx = ($qCurrentPage - 1) * $qPerPAge + $idxPage;
if (isset($queues[$idx])) {
$q = $queues[$idx]->getNumberOfRequestSetsInQueue();
$numInQueue += (int)$q;
$output->writeln(str_pad($idx, 10, " ", STR_PAD_LEFT)." | ".str_pad(number_format($q), 16, " ", STR_PAD_LEFT));
} else {
$output->writeln(str_pad("", 10)." | ".str_pad("", 16));
}
}

$output->writeln(str_repeat("-", 30));
$output->writeln("<fg=black;bg=white;options=bold>".str_pad(" ".($qCount)." Q", 10)." | ".str_pad(number_format($sumInQueue)." R", 16)."</>");
$output->writeln(str_repeat("-", 30));
$output->writeln(sprintf(
"Q [%s-%s] | <info>page %s/%s</> | <comment>press (0-9.,q) or arrow(L,R,U,D)</> | diff/sec %s \n".
"%s used memory (%s peak). <info>%d</> workers active.".str_repeat(" ", 15),
($idx - $qPerPAge + 1),
$idx, $qCurrentPage, $qPageCount, $diffSumInQueue,
$memory['used_memory_human'] ?? 'Unknown',
$memory['used_memory_peak_human'] ?? 'Unknown',
$lock->getNumberOfAcquiredLocks()
));
$output->write("\e[s");
$output->write("\e[0G");
$output->write("\e[".($qPerPAge + 5)."A");

if (!is_null($iterations)) {
$iterationCount += 1;
if ($iterationCount >= $iterations) {
break;
}
}

$numInQueue = array();
foreach ($queues as $queue) {
$numInQueue[] = $queue->getNumberOfRequestSetsInQueue();
$lastSumInQueue = $sumInQueue;
$lastStatsTimer = microtime(true);
}

$message = sprintf('%s (%s) request sets left in queue. %s used memory (%s peak). %d workers active. ',
array_sum($numInQueue),
implode('+', $numInQueue),
$memory['used_memory_human'] ?? 'Unknown',
$memory['used_memory_peak_human'] ?? 'unknown',
$lock->getNumberOfAcquiredLocks());
$output->write("\x0D");
$output->write($message);
if (!is_null($iterations)) {
$iterationCount += 1;
if ($iterationCount >= $iterations) {
break;
$keyStroke = stream_get_contents(STDIN, 3);
$keyPressed = strlen($keyStroke) == 3 ? $keyStroke[2] : (strlen($keyStroke) > 0 ? $keyStroke[0] : "");
if ($keyPressed != "" and in_array($keyPressed, array(".", ",", "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "q"))) {
switch ($keyPressed) {
case "0": case "1": case "2": case "3": case "4":
case "5": case "6": case "7": case "8": case "9":
$keyPressed = $keyPressed != "0" ? $keyPressed : "10";
$qCurrentPage = floor(($qCurrentPage - 0.1) / 10) * 10 + (int)$keyPressed; break;
case "C":
$qCurrentPage++;
break;
case "D":
$qCurrentPage--;
break;
case "A":
$qCurrentPage += 10;
break;
case "B":
$qCurrentPage -= 10;
break;
case ",":
$qCurrentPage = 1;
break;
case ".":
$qCurrentPage = $qPageCount;
break;
case "q":
$signalTrap();
}
}
sleep(2);

usleep(5000);
}

return self::SUCCESS;
Expand All @@ -112,4 +214,31 @@ private function getIterationsFromArg()
return $iterations;
}

/**
* Loads the `perpage` argument from the commands arguments.
*
* @return int|null
*/
private function getPerPageFromArg()
{
$perPage = $this->getInput()->getOption('perpage');
if (!is_numeric($perPage)) {
throw new \Exception('perpage needs to be numeric');
} else {
$perPage = (int)$perPage;
if ($perPage <= 0) {
throw new \Exception('perpage needs to be a non-zero positive number');
}
}
return $perPage;
}

private function isPcntlFunctionAvailable()
{
if (extension_loaded('pcntl') && function_exists('pcntl_signal') && function_exists('pcntl_signal_dispatch')) {
return true;
}

return false;
}
}
86 changes: 72 additions & 14 deletions Commands/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ protected function configure()
$this->setName('queuedtracking:process');
$this->addRequiredValueOption('queue-id', null, 'If set, will only work on that specific queue. For example "0" or "1" (if there are multiple queues). Not recommended when only one worker is in use. If for example 4 workers are in use, you may want to use 0, 1, 2, or 3.');
$this->addRequiredValueOption('force-num-requests-process-at-once', null, 'If defined, it overwrites the setting of how many requests will be picked out of the queue and processed at once. Must be a number which is >= 1. By default, the configured value from the settings will be used. This can be useful for example if you want to process every single request within the queue. If otherwise a batch size of say 100 is configured, then there may be otherwise 99 requests left in the queue. It can be also useful for testing purposes.');
$this->addRequiredValueOption('cycle', 'c', 'The proccess will automatically loop for "n" cycle time(s), set "0" to infinite.', 1);
$this->addRequiredValueOption('sleep', 's', 'Take a nap for "n" second(s) before recycle, minimum is 1 second.', 1);
$this->addRequiredValueOption('delay', 'd', 'Delay before finished', 0);
$this->setDescription('Processes all queued tracking requests in case there are enough requests in the queue and in case they are not already in process by another script. To keep track of the queue use the <comment>--verbose</comment> option or execute the <comment>queuedtracking:monitor</comment> command.');
}

Expand Down Expand Up @@ -76,29 +79,84 @@ protected function doExecute(): int
throw new \Exception('Number of requests to process must be a number and at least 1');
}

$output->writeln("<info>Starting to process request sets, this can take a while</info>");

register_shutdown_function(function () use ($queueManager) {
$queueManager->unlock();
});

$startTime = microtime(true);
$processor = new Processor($queueManager);
$processor->setNumberOfMaxBatchesToProcess(500);
$tracker = $processor->process();

$neededTime = (microtime(true) - $startTime);
$numRequestsTracked = $tracker->getCountOfLoggedRequests();
$requestsPerSecond = $this->getNumberOfRequestsPerSecond($numRequestsTracked, $neededTime);

Piwik::postEvent('Tracker.end');
$numberOfProcessCycle = $input->getOption('cycle');
if (!is_numeric($numberOfProcessCycle)) {
throw new \Exception('"cycle" needs to be numeric');
}
$numberOfProcessCycle = (int)$numberOfProcessCycle;
$infiniteCycle = $numberOfProcessCycle == 0;

$delayedBeforeFinish = (int)$input->getOption('delay');

$napster = max(1, $input->getOption('sleep'));
if (!is_numeric($napster)) {
throw new \Exception('"nap" needs to be numeric');
}
$napster = (int)$napster;

$lastTimeGotMoreThanZeroTrackedReq = microtime(true);
$originalNumberOfRequestsToProcessAtSameTime = $queueManager->getNumberOfRequestsToProcessAtSameTime();

while ($numberOfProcessCycle > 0 || $infiniteCycle) {
$wipingOutQueue = false;
if (microtime(true) - $lastTimeGotMoreThanZeroTrackedReq > 10) {
$queueManager->setNumberOfRequestsToProcessAtSameTime(1);
$wipingOutQueue = true;
$lastTimeGotMoreThanZeroTrackedReq = microtime(true);
}

if ($wipingOutQueue) {
$output->writeln("<fg=red;bg=white;options=bold> TRYING TO WIPE OUT THE QUEUE </>");
}
$output->writeln("<info>Starting to process request sets, this can take a while</info>");

$startTime = microtime(true);
$processor = new Processor($queueManager);
$processor->setNumberOfMaxBatchesToProcess(500);
$tracker = $processor->process();

$neededTime = (microtime(true) - $startTime);
$numRequestsTracked = $tracker->getCountOfLoggedRequests();
$requestsPerSecond = $this->getNumberOfRequestsPerSecond($numRequestsTracked, $neededTime);

$this->writeSuccessMessage(
array(sprintf('This worker finished queue processing with %sreq/s (%s requests in %02.2f seconds)', $requestsPerSecond, $numRequestsTracked, $neededTime))
);
Piwik::postEvent('Tracker.end');

if ($numRequestsTracked > 0) {
$lastTimeGotMoreThanZeroTrackedReq = microtime(true);
}

if (!$infiniteCycle) {
$numberOfProcessCycle--;
}
if ($numberOfProcessCycle > 0 || $infiniteCycle) {
$cTogo = $infiniteCycle ? "infinite" : $numberOfProcessCycle;
$output->writeln("===========================================================================");
$output->writeln("<comment>Taking a nap for {$napster} second(s) before re-run the process. <info>({$cTogo})</info> cyle(s) to go.</comment>");
$output->writeln("===========================================================================");
sleep($napster);
}

if ($wipingOutQueue) {
$queueManager->setNumberOfRequestsToProcessAtSameTime($originalNumberOfRequestsToProcessAtSameTime);
}
}

// Piwik::postEvent('Tracker.end');
$trackerEnvironment->destroy();

$this->writeSuccessMessage(
array(sprintf('This worker finished queue processing with %sreq/s (%s requests in %02.2f seconds)', $requestsPerSecond, $numRequestsTracked, $neededTime))
);

if ($delayedBeforeFinish > 0) {
sleep($delayedBeforeFinish);
}
return self::SUCCESS;
}

Expand Down
23 changes: 23 additions & 0 deletions docs/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ requests using the [Piwik console](http://developer.piwik.org/guides/piwik-on-th
* Disable the setting "Process during tracking request" in the Piwik UI under "Settings => Plugin Settings"
* Setup a cronjob that executes the command `./console queuedtracking:process` for instance every minute
* That's it
* Or, if you have __"non WINDOWS OS"__ you can use the [Supervisor](http://supervisord.org/) as a cron alternative.

The `queuedtracking:process` command will make sure to process all queued tracking requests whenever possible and the
command will exit as soon as there are not enough requests queued anymore. That's why you should setup a cronjob to start
Expand All @@ -43,6 +44,28 @@ Example crontab entry that starts the processor every minute:

`* * * * * cd /piwik && ./console queuedtracking:process >/dev/null 2>&1`

Example Supervisor entry that will start 16 processors/workers with 10 loop cycle times and auto restart:

```ini
[program:matomo]
directory=/path/to/your/matomo
command=/path/to/your/php /path/to/your/matomo/console queuedtracking:process --queue-id=%(process_num)s -c 10 -s 2 -d 5
process_name=queuedtracking-%(process_num)s

#change the number according to how many worker(s) you have
numprocs=16

numprocs_start=0
stopsignal=TERM
autostart=true
autorestart=true
stopwaitsecs=120
#priority=1000
stdout_logfile=/dev/null
stdout_logfile_maxbytes=0
redirect_stderr=true
```

__Can I keep track of the state of the queue?__

Yes, you can. Just execute the command `./console queuedtracking:monitor`. This will show the current state of the queue. To exit this command you can for example press `CTRL + C` key at the same time.
Expand Down

0 comments on commit c08817b

Please sign in to comment.