Skip to content

Commit

Permalink
Introduce basic_sql_backend_connector for sql.DB connectors (#1188)
Browse files Browse the repository at this point in the history
A lot of golang drivers for different SQL libraries expose a standard
`sql.DB` interface. Our backend connector code for ClickHouse, Postgres,
MySQL can be unified around this interface.

This reduces the code duplication by moving the common code to a new
`basic_sql_backend_connector` struct.
  • Loading branch information
avelanarius authored Jan 16, 2025
1 parent 6e6652a commit b37c655
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 194 deletions.
83 changes: 83 additions & 0 deletions quesma/backend_connectors/basic_sql_backend_connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

package backend_connectors

import (
"context"
"database/sql"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
)

type BasicSqlBackendConnector struct {
connection *sql.DB
}

type SqlRows struct {
rows *sql.Rows
}

func (p *SqlRows) Next() bool {
return p.rows.Next()
}

func (p *SqlRows) Scan(dest ...interface{}) error {
return p.rows.Scan(dest...)
}

func (p *SqlRows) Close() error {
return p.rows.Close()
}

func (p *SqlRows) Err() error {
return p.rows.Err()
}

func (p *BasicSqlBackendConnector) Open() error {
conn, err := initDBConnection()
if err != nil {
return err
}
p.connection = conn
return nil
}

func (p *BasicSqlBackendConnector) Close() error {
if p.connection == nil {
return nil
}
return p.connection.Close()
}

func (p *BasicSqlBackendConnector) Ping() error {
return p.connection.Ping()
}

func (p *BasicSqlBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) {
rows, err := p.connection.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
return &SqlRows{rows: rows}, nil
}

func (p *BasicSqlBackendConnector) QueryRow(ctx context.Context, query string, args ...interface{}) quesma_api.Row {
return p.connection.QueryRowContext(ctx, query, args...)
}

func (p *BasicSqlBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
if len(args) == 0 {
_, err := p.connection.ExecContext(ctx, query)
return err
}
_, err := p.connection.ExecContext(ctx, query, args...)
return err
}

func (p *BasicSqlBackendConnector) Stats() quesma_api.DBStats {
stats := p.connection.Stats()
return quesma_api.DBStats{
MaxOpenConnections: stats.MaxOpenConnections,
OpenConnections: stats.OpenConnections,
}
}
81 changes: 6 additions & 75 deletions quesma/backend_connectors/clickhouse_backend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,15 @@
package backend_connectors

import (
"context"
"database/sql"
"github.com/ClickHouse/clickhouse-go/v2"

quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
)

type ClickHouseBackendConnector struct {
Endpoint string
connection *sql.DB
}

type ClickHouseRows struct {
rows *sql.Rows
}
type ClickHouseRow struct {
row *sql.Row
}

func (p *ClickHouseRow) Scan(dest ...interface{}) error {
if p.row == nil {
return sql.ErrNoRows
}
return p.row.Scan(dest...)
}

func (p *ClickHouseRows) Next() bool {
return p.rows.Next()
}

func (p *ClickHouseRows) Scan(dest ...interface{}) error {
return p.rows.Scan(dest...)
}

func (p *ClickHouseRows) Close() error {
return p.rows.Close()
}

func (p *ClickHouseRows) Err() error {
return p.rows.Err()
BasicSqlBackendConnector
Endpoint string
}

func (p *ClickHouseBackendConnector) GetId() quesma_api.BackendConnectorType {
Expand All @@ -59,46 +28,6 @@ func (p *ClickHouseBackendConnector) Open() error {
return nil
}

func (p *ClickHouseBackendConnector) Close() error {
if p.connection == nil {
return nil
}
return p.connection.Close()
}

func (p *ClickHouseBackendConnector) Ping() error {
return p.connection.Ping()
}

func (p *ClickHouseBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) {
rows, err := p.connection.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
return &ClickHouseRows{rows: rows}, nil
}

func (p *ClickHouseBackendConnector) QueryRow(ctx context.Context, query string, args ...interface{}) quesma_api.Row {
return p.connection.QueryRowContext(ctx, query, args...)
}

func (p *ClickHouseBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
if len(args) == 0 {
_, err := p.connection.ExecContext(ctx, query)
return err
}
_, err := p.connection.ExecContext(ctx, query, args...)
return err
}

func (p *ClickHouseBackendConnector) Stats() quesma_api.DBStats {
stats := p.connection.Stats()
return quesma_api.DBStats{
MaxOpenConnections: stats.MaxOpenConnections,
OpenConnections: stats.OpenConnections,
}
}

// func initDBConnection(c *config.QuesmaConfiguration, tlsConfig *tls.Config) *sql.DB {
func initDBConnection() (*sql.DB, error) {
options := clickhouse.Options{Addr: []string{"localhost:9000"}}
Expand All @@ -124,8 +53,10 @@ func NewClickHouseBackendConnector(endpoint string) *ClickHouseBackendConnector
// so that it is can be used in pre-v2 code. Should be removed when moving forwards.
func NewClickHouseBackendConnectorWithConnection(endpoint string, conn *sql.DB) *ClickHouseBackendConnector {
return &ClickHouseBackendConnector{
Endpoint: endpoint,
connection: conn,
BasicSqlBackendConnector: BasicSqlBackendConnector{
connection: conn,
},
Endpoint: endpoint,
}
}

Expand Down
65 changes: 2 additions & 63 deletions quesma/backend_connectors/mysql_backend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,14 @@
package backend_connectors

import (
"context"
"database/sql"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
_ "github.com/go-sql-driver/mysql"
)

type MySqlRows struct {
rows *sql.Rows
}

func (p *MySqlRows) Next() bool {
return p.rows.Next()
}

func (p *MySqlRows) Scan(dest ...interface{}) error {
return p.rows.Scan(dest...)
}

func (p *MySqlRows) Close() error {
return p.rows.Close()
}

func (p *MySqlRows) Err() error {
return p.rows.Err()
}

type MySqlBackendConnector struct {
Endpoint string
connection *sql.DB
BasicSqlBackendConnector
Endpoint string
}

func (p *MySqlBackendConnector) InstanceName() string {
Expand All @@ -55,43 +34,3 @@ func (p *MySqlBackendConnector) Open() error {
p.connection = conn
return nil
}

func (p *MySqlBackendConnector) Close() error {
if p.connection == nil {
return nil
}
return p.connection.Close()
}

func (p *MySqlBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) {
rows, err := p.connection.QueryContext(context.Background(), query, args...)
if err != nil {
return nil, err
}
return &MySqlRows{rows: rows}, nil
}

func (p *MySqlBackendConnector) QueryRow(ctx context.Context, query string, args ...interface{}) quesma_api.Row {
return p.connection.QueryRowContext(ctx, query, args...)
}

func (p *MySqlBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
if len(args) == 0 {
_, err := p.connection.ExecContext(context.Background(), query)
return err
}
_, err := p.connection.ExecContext(context.Background(), query, args...)
return err
}

func (p *MySqlBackendConnector) Stats() quesma_api.DBStats {
stats := p.connection.Stats()
return quesma_api.DBStats{
MaxOpenConnections: stats.MaxOpenConnections,
OpenConnections: stats.OpenConnections,
}
}

func (p *MySqlBackendConnector) Ping() error {
return nil
}
70 changes: 14 additions & 56 deletions quesma/backend_connectors/postgres_backend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,78 +4,36 @@
package backend_connectors

import (
"context"
"database/sql"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
"github.com/jackc/pgx/v4"

_ "github.com/jackc/pgx/v5/stdlib"
)

type PostgresBackendConnector struct {
Endpoint string
connection *pgx.Conn
BasicSqlBackendConnector
Endpoint string
}

func (p *PostgresBackendConnector) InstanceName() string {
return "postgresql"
}

func (p *PostgresBackendConnector) GetId() quesma_api.BackendConnectorType {
return quesma_api.PgSQLBackend
}

func (p *PostgresBackendConnector) Open() error {
conn, err := pgx.Connect(context.Background(), p.Endpoint)
// Note: pgx library also has its own custom interface (pgx.Connect), which is not compatible
// with the standard sql.DB interface, but has more features and is more efficient.
conn, err := sql.Open("pgx", p.Endpoint)
if err != nil {
return err
}
p.connection = conn
return nil
}

func (p *PostgresBackendConnector) Close() error {
if p.connection == nil {
return nil
}
return p.connection.Close(context.Background())
}

func (p *PostgresBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) {
pgRows, err := p.connection.Query(ctx, query, args...)
err = conn.Ping()
if err != nil {
return nil, err
}
return &PgRows{rows: pgRows}, nil
}

func (p *PostgresBackendConnector) QueryRow(ctx context.Context, query string, args ...interface{}) quesma_api.Row {
return p.connection.QueryRow(ctx, query, args...)
}

func (p *PostgresBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
if len(args) == 0 {
_, err := p.connection.Exec(ctx, query)
return err
}
_, err := p.connection.Exec(ctx, query, args...)
return err
}

func (p *PostgresBackendConnector) Stats() quesma_api.DBStats {
return quesma_api.DBStats{}
}

type PgRows struct {
rows pgx.Rows
}

func (p *PgRows) Next() bool {
return p.rows.Next()
}

func (p *PgRows) Scan(dest ...interface{}) error {
return p.rows.Scan(dest...)
}

func (p *PgRows) Close() error {
p.rows.Close()
p.connection = conn
return nil
}

func (p *PgRows) Err() error {
return p.rows.Err()
}
2 changes: 2 additions & 0 deletions quesma/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ require (
github.com/jackc/pgproto3/v2 v2.3.3 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgtype v1.14.4 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/kr/text v0.2.0 // indirect
Expand All @@ -57,6 +58,7 @@ require (
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/text v0.21.0 // indirect
)

Expand Down
Loading

0 comments on commit b37c655

Please sign in to comment.