Skip to content

Commit

Permalink
Implement rows query event from mysql (#112)
Browse files Browse the repository at this point in the history
  • Loading branch information
DZunke authored Mar 13, 2024
1 parent c2c2ea7 commit 6150f0e
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:

- name: Start mysql service
run: |
echo -e "\n[mysqld]\nserver-id=1\nbinlog_format=row\nlog_bin=/var/log/mysql/mysql-bin.log" | sudo tee -a /etc/mysql/my.cnf
echo -e "\n[mysqld]\nserver-id=1\nbinlog_format=row\nlog_bin=/var/log/mysql/mysql-bin.log\nbinlog_rows_query_log_events=ON" | sudo tee -a /etc/mysql/my.cnf
sudo /etc/init.d/mysql start
mysql_tzinfo_to_sql /usr/share/zoneinfo | mysql -u root mysql -proot
Expand Down
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ services:
'--log_bin=binlog',
'--max_binlog_size=8M',
'--binlog_format=row',
'--server-id=1'
'--server-id=1',
'--binlog_rows_query_log_events=ON'
]
environment:
- MYSQL_ROOT_PASSWORD=root
Expand Down
1 change: 1 addition & 0 deletions src/MySQLReplication/Definitions/ConstEventsNames.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ enum ConstEventsNames: string
case TABLE_MAP = 'tableMap';
case WRITE = 'write';
case FORMAT_DESCRIPTION = 'format description';
case ROWS_QUERY = 'rows_query';
}
40 changes: 40 additions & 0 deletions src/MySQLReplication/Event/DTO/RowsQueryDTO.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

declare(strict_types=1);

namespace MySQLReplication\Event\DTO;

use MySQLReplication\Definitions\ConstEventsNames;
use MySQLReplication\Event\EventInfo;

class RowsQueryDTO extends EventDTO
{
private ConstEventsNames $type = ConstEventsNames::ROWS_QUERY;

public function __construct(
EventInfo $eventInfo,
public readonly string $query,
) {
parent::__construct($eventInfo);
}

public function __toString(): string
{
return PHP_EOL .
'=== Event ' . $this->getType() . ' === ' . PHP_EOL .
'Date: ' . $this->eventInfo->getDateTime() . PHP_EOL .
'Log position: ' . $this->eventInfo->pos . PHP_EOL .
'Event size: ' . $this->eventInfo->size . PHP_EOL .
'Query: ' . $this->query . PHP_EOL;
}

public function getType(): string
{
return $this->type->value;
}

public function jsonSerialize(): array
{
return get_object_vars($this);
}
}
5 changes: 5 additions & 0 deletions src/MySQLReplication/Event/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ private function makeEvent(BinaryDataReader $binaryDataReader): ?EventDTO
);
}

// The Rows Query Log Event will be triggered with enabled MySQL Config `binlog_rows_query_log_events`
if ($eventInfo->type === ConstEventType::ROWS_QUERY_LOG_EVENT->value) {
return (new RowsQueryEvent($eventInfo, $binaryDataReader, $this->binLogServerInfo))->makeRowsQueryDTO();
}

if ($eventInfo->type === ConstEventType::FORMAT_DESCRIPTION_EVENT->value) {
return new FormatDescriptionEventDTO($eventInfo);
}
Expand Down
7 changes: 7 additions & 0 deletions src/MySQLReplication/Event/EventSubscribers.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use MySQLReplication\Event\DTO\MariaDbGtidLogDTO;
use MySQLReplication\Event\DTO\QueryDTO;
use MySQLReplication\Event\DTO\RotateDTO;
use MySQLReplication\Event\DTO\RowsQueryDTO;
use MySQLReplication\Event\DTO\TableMapDTO;
use MySQLReplication\Event\DTO\UpdateRowsDTO;
use MySQLReplication\Event\DTO\WriteRowsDTO;
Expand All @@ -35,6 +36,7 @@ public static function getSubscribedEvents(): array
ConstEventsNames::MARIADB_GTID->value => 'onMariaDbGtid',
ConstEventsNames::FORMAT_DESCRIPTION->value => 'onFormatDescription',
ConstEventsNames::HEARTBEAT->value => 'onHeartbeat',
ConstEventsNames::ROWS_QUERY->value => 'onRowsQuery',
];
}

Expand Down Expand Up @@ -93,6 +95,11 @@ public function onHeartbeat(HeartbeatDTO $event): void
$this->allEvents($event);
}

public function onRowsQuery(RowsQueryDTO $event): void
{
$this->allEvents($event);
}

protected function allEvents(EventDTO $event): void
{
}
Expand Down
25 changes: 25 additions & 0 deletions src/MySQLReplication/Event/RowsQueryEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace MySQLReplication\Event;

use MySQLReplication\Event\DTO\RowsQueryDTO;

/**
* The Rows_query event is within the binary log when the MySQL Option `binlog_rows_query_log_events`
* is enabled.
*
* @see https://dev.mysql.com/doc/dev/mysql-server/latest/classRows__query__log__event.html
*/
class RowsQueryEvent extends EventCommon
{
public function makeRowsQueryDTO(): RowsQueryDTO
{
// $this->binaryDataReader->advance(1);
return new RowsQueryDTO(
$this->eventInfo,
$this->binaryDataReader->read($this->binaryDataReader->readInt8()),
);
}
}
10 changes: 9 additions & 1 deletion tests/Integration/BaseCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ protected function setUp(): void
->withHost('0.0.0.0')
->withPassword('root')
->withPort(3306)
->withEventsIgnore([ConstEventType::GTID_LOG_EVENT->value]);
->withEventsIgnore($this->getIgnoredEvents());

$this->connect();

Expand Down Expand Up @@ -83,6 +83,14 @@ public function connect(): void
$this->connection->executeStatement('SET SESSION sql_mode = \'\';');
}

protected function getIgnoredEvents(): array
{
return [
ConstEventType::GTID_LOG_EVENT->value, // Generally in here
ConstEventType::ROWS_QUERY_LOG_EVENT->value, // Just debugging, there is a special test for it
];
}

protected function getEvent(): EventDTO
{
if ($this->mySQLReplicationFactory === null) {
Expand Down
36 changes: 36 additions & 0 deletions tests/Integration/RowsQueryTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);

namespace MySQLReplication\Tests\Integration;

use MySQLReplication\Definitions\ConstEventType;
use MySQLReplication\Event\DTO\QueryDTO;
use MySQLReplication\Event\DTO\RowsQueryDTO;

final class RowsQueryTest extends BaseCase
{
public function testThatTheEditingQueryIsReadFromBinLog(): void
{
$this->connection->executeStatement(
'CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))'
);

$insertQuery = 'INSERT INTO test (data) VALUES(\'Hello\') /* Foo:Bar; */';
$this->connection->executeStatement($insertQuery);

// The Create Table Query ... irrelevant content for this test
self::assertInstanceOf(QueryDTO::class, $this->getEvent());
// The BEGIN Query ... irrelevant content for this test
self::assertInstanceOf(QueryDTO::class, $this->getEvent());

$rowsQueryEvent = $this->getEvent();
self::assertInstanceOf(RowsQueryDTO::class, $rowsQueryEvent);
self::assertSame($insertQuery, $rowsQueryEvent->query);
}

protected function getIgnoredEvents(): array
{
return [ConstEventType::GTID_LOG_EVENT->value];
}
}

0 comments on commit 6150f0e

Please sign in to comment.