Skip to content

Commit

Permalink
Add Pipeline controller and Kafka JDBC driver (#73)
Browse files Browse the repository at this point in the history
* Add Pipeline controller and Kafka JDBC driver

* PR review fixes h/t @jogrogan
  • Loading branch information
ryannedolan authored Dec 6, 2024
1 parent e12144b commit 159a0e9
Show file tree
Hide file tree
Showing 36 changed files with 893 additions and 101 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ jobs:
run: make deploy
- name: Deploy Samples
run: make deploy-samples
- name: Wait for Readiness
run: kubectl wait kafka/one --for=condition=Ready --timeout=10m -n kafka
- name: Run Integration Tests
run: make integration-tests
- name: Capture Cluster State
Expand Down
11 changes: 7 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

install:
./gradlew installDist
./gradlew compileJava installDist

build:
./gradlew build
Expand All @@ -9,8 +9,11 @@ build:

bounce: build undeploy deploy deploy-samples deploy-config deploy-demo

# Integration tests expect K8s and Kafka to be running
integration-tests:
echo "\nNOTHING TO DO FOR NOW"
kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & echo $$! > port-forward.pid
./gradlew intTest || kill `cat port-forward.pid`
kill `cat port-forward.pid`

clean:
./gradlew clean
Expand All @@ -26,7 +29,7 @@ undeploy:
kubectl delete -f ./deploy || echo "skipping"
kubectl delete configmap hoptimator-configmap || echo "skipping"

quickstart: build deploy-dev-environment deploy
quickstart: build deploy

deploy-dev-environment:
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
Expand Down Expand Up @@ -54,4 +57,4 @@ release:
test -n "$(VERSION)" # MISSING ARG: $$VERSION
./gradlew publish

.PHONY: build clean quickstart deploy-dev-environment deploy deploy-samples deploy-config integration-tests bounce generate-models release
.PHONY: build clean quickstart deploy-dev-environment deploy deploy-samples deploy-demo deploy-config integration-tests bounce generate-models release
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Hoptimator requires a Kubernetes cluster. To connect from outside a Kubernetes c
```
$ make install # build and install SQL CLI
$ make deploy deploy-demo # install CRDs and K8s objects
$ kubectl port-forward -n kafka svc/one-kafka-external-0 9092 &
$ ./hoptimator
> !intro
```
Expand Down
15 changes: 11 additions & 4 deletions deploy/dev/kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,18 @@ spec:
version: 3.8.0
replicas: 1
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
- name: external
port: 9092
type: nodeport
tls: false
configuration:
brokers:
- broker: 0
advertisedHost: localhost
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
Expand All @@ -50,3 +54,6 @@ spec:
replicas: 3
storage:
type: ephemeral
entityOperator:
topicOperator: {}

71 changes: 71 additions & 0 deletions deploy/samples/kafkadb.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Database resource exposing a local Kafka cluster as the KAFKA schema,
# reachable through the Kafka JDBC driver at localhost:9092.
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: Database
metadata:
  name: kafka-database
spec:
  schema: KAFKA
  url: jdbc:kafka://bootstrap.servers=localhost:9092
  dialect: Calcite

---

# TableTemplate applied to tables in kafka-database: each table materializes
# as a Strimzi KafkaTopic (named after the table) plus connector config.
# NOTE(review): {{name}}/{{table}} are template placeholders substituted by the
# controller; do not add comments inside the literal blocks below — they are
# part of the rendered payload.
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: TableTemplate
metadata:
  name: kafka-template
spec:
  databases:
  - kafka-database
  yaml: |
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: {{name}}
      namespace: kafka
      labels:
        strimzi.io/cluster: one
    spec:
      topicName: {{table}}
      partitions: 1
      replicas: 1
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
  connector: |
    connector = kafka
    topic = {{table}}
    properties.bootstrap.servers = localhost:9092
---

# Pre-existing topic fixture (assumes Strimzi cluster "one" in namespace
# "kafka" — see deploy/dev/kafka.yaml); presumably used by integration tests.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: existing-topic-1
  namespace: kafka
  labels:
    strimzi.io/cluster: one
spec:
  partitions: 1
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824

---

# Second pre-existing topic fixture, identical settings to existing-topic-1.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: existing-topic-2
  namespace: kafka
  labels:
    strimzi.io/cluster: one
spec:
  partitions: 1
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824


1 change: 1 addition & 0 deletions hoptimator-cli/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies {
implementation project(':hoptimator-avro')
implementation project(':hoptimator-demodb')
implementation project(':hoptimator-jdbc')
implementation project(':hoptimator-kafka')
implementation project(':hoptimator-k8s')
implementation project(':hoptimator-util')
implementation libs.calcite.core
Expand Down
12 changes: 6 additions & 6 deletions hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public Collection<CommandHandler> getCommandHandlers(SqlLine sqlline) {
list.addAll(super.getCommandHandlers(sqlline));
list.add(new IntroCommandHandler(sqlline));
list.add(new PipelineCommandHandler(sqlline));
list.add(new YamlCommandHandler(sqlline));
list.add(new SpecifyCommandHandler(sqlline));
return list;
}

Expand Down Expand Up @@ -112,17 +112,17 @@ public boolean echoToFile() {
}


private static final class YamlCommandHandler implements CommandHandler {
private static final class SpecifyCommandHandler implements CommandHandler {

private final SqlLine sqlline;

private YamlCommandHandler(SqlLine sqlline) {
private SpecifyCommandHandler(SqlLine sqlline) {
this.sqlline = sqlline;
}

@Override
public String getName() {
return "yaml";
return "specify";
}

@Override
Expand All @@ -137,7 +137,7 @@ public String getHelpText() {

@Override
public String matches(String line) {
if (startsWith(line, "!yaml") || startsWith(line, "yaml")) {
if (startsWith(line, "!spec") || startsWith(line, "spec")) {
return line;
} else {
return null;
Expand All @@ -147,7 +147,7 @@ public String matches(String line) {
@Override
public void execute(String line, DispatchCallback dispatchCallback) {
if (!(sqlline.getConnection() instanceof CalciteConnection)) {
sqlline.error("This connection doesn't support `!yaml`.");
sqlline.error("This connection doesn't support `!specify`.");
dispatchCallback.setToFailure();
return;
}
Expand Down
9 changes: 8 additions & 1 deletion hoptimator-jdbc/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
plugins {
id 'java'
id 'java-test-fixtures'
}

dependencies {
Expand All @@ -8,11 +9,17 @@ dependencies {
implementation libs.calcite.core
implementation libs.calcite.server
implementation libs.slf4j.api
testImplementation libs.quidem

testFixturesImplementation libs.quidem
testFixturesImplementation libs.calcite.core
testFixturesImplementation project(':hoptimator-api')
testFixturesImplementation project(':hoptimator-util')

testRuntimeOnly project(':hoptimator-demodb')
testFixturesImplementation(platform('org.junit:junit-bom:5.11.3'))
testImplementation(platform('org.junit:junit-bom:5.11.3'))
testImplementation 'org.junit.jupiter:junit-jupiter'
testFixturesImplementation 'org.junit.jupiter:junit-jupiter'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package com.linkedin.hoptimator.jdbc;

import com.linkedin.hoptimator.jdbc.HoptimatorDriver;
import com.linkedin.hoptimator.util.ConnectionService;
import com.linkedin.hoptimator.util.DeploymentService;
import com.linkedin.hoptimator.util.Sink;
import com.linkedin.hoptimator.util.planner.PipelineRel;

import net.hydromatic.quidem.AbstractCommand;
import net.hydromatic.quidem.Command;
import net.hydromatic.quidem.CommandHandler;
import net.hydromatic.quidem.Quidem;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.jdbc.CalciteConnection;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Reader;
import java.io.Writer;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.sql.Connection;
import java.sql.DriverManager;

/**
 * Base class for Quidem-driven script tests.
 *
 * <p>Subclasses point {@link #run(String)} at a {@code .id} script on the
 * classpath. The script is executed against a {@code jdbc:hoptimator://}
 * connection and the freshly generated output must match the checked-in
 * script exactly — i.e. the script must be up to date.
 */
public abstract class QuidemTestBase {

  /** Runs the Quidem script found at the named classpath resource. */
  protected void run(String resourceName) throws IOException, URISyntaxException {
    run(Thread.currentThread().getContextClassLoader().getResource(resourceName).toURI());
  }

  /**
   * Executes the Quidem script at {@code resource} and asserts that the
   * generated output is line-for-line identical to the input script.
   *
   * @param resource file URI of the script to run
   * @throws IOException if the script cannot be read or output written
   */
  protected void run(URI resource) throws IOException {
    File in = new File(resource);
    File out = File.createTempFile(in.getName(), ".out");
    out.deleteOnExit();  // don't leave temp files behind when assertions fail
    // Read and write explicitly as UTF-8: the bare FileReader/PrintWriter
    // constructors use the platform default charset (pre-Java 18), which
    // would disagree with the UTF-8 comparison reads below.
    try (Reader r = Files.newBufferedReader(in.toPath(), StandardCharsets.UTF_8);
        Writer w = Files.newBufferedWriter(out.toPath(), StandardCharsets.UTF_8)) {
      Quidem.Config config = Quidem.configBuilder()
          .withReader(r)
          .withWriter(w)
          .withConnectionFactory((x, y) -> DriverManager.getConnection("jdbc:hoptimator://" + x))
          .withCommandHandler(new CustomCommandHandler())
          .build();
      new Quidem(config).execute();
    }
    List<String> input = Files.readAllLines(in.toPath(), StandardCharsets.UTF_8);
    List<String> output = Files.readAllLines(out.toPath(), StandardCharsets.UTF_8);
    Assertions.assertFalse(input.isEmpty(), "input script is empty");
    Assertions.assertFalse(output.isEmpty(), "script output is empty");
    for (String line : output) {
      System.out.println(line);  // echo generated output for easier CI debugging
    }
    Assertions.assertIterableEquals(input, output);
  }

  /**
   * Adds a {@code !spec} command to Quidem: plans the previous SQL command as
   * a pipeline and echoes the resulting specs as a YAML multi-doc string.
   */
  private static class CustomCommandHandler implements CommandHandler {
    @Override
    public Command parseCommand(List<String> lines, List<String> content, final String line) {
      // Snapshot the command lines now; they are re-echoed after execution so
      // the generated script still contains the command itself.
      final List<String> copy = new ArrayList<>(lines);
      if (line.startsWith("spec")) {
        return new AbstractCommand() {
          @Override
          public void execute(Context context, boolean execute) throws Exception {
            if (execute) {
              if (!(context.connection() instanceof CalciteConnection)) {
                throw new IllegalArgumentException("This connection doesn't support `!specify`.");
              }
              String sql = context.previousSqlCommand().sql;
              CalciteConnection conn = (CalciteConnection) context.connection();
              RelNode rel = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root.rel;
              String specs = DeploymentService.plan(rel).pipeline().specify().stream()
                  .collect(Collectors.joining("\n---\n"));
              context.echo(Collections.singletonList(specs));
            } else {
              // Not executing: replay any previously recorded output verbatim.
              context.echo(content);
            }
            context.echo(copy);
          }
        };
      }

      // Not our command; let Quidem's default handlers take it.
      return null;
    }
  }
}
Loading

0 comments on commit 159a0e9

Please sign in to comment.