Skip to content

Commit

Permalink
[OSPP2024]feat: support aliyun OSS Sink Connector (#540)
Browse files Browse the repository at this point in the history
feat: support rocketmq-connect-oss sink
  • Loading branch information
limbo-24 authored Oct 30, 2024
1 parent 3bfdef8 commit 0da281f
Show file tree
Hide file tree
Showing 6 changed files with 800 additions and 0 deletions.
48 changes: 48 additions & 0 deletions connectors/aliyun/rocketmq-connect-oss/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# rocketmq-connect-oss
* **rocketmq-connect-oss** 说明
```
Be responsible for consuming messages from producer and writing data to oss.
```

## rocketmq-connect-oss 打包
```
mvn clean install -Dmaven.test.skip=true
```

## rocketmq-connect-oss 启动

* **rocketmq-connect-oss** 启动

```
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-oss-sink-connector-name}
?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.oss.sink.OssSinkConnector","connect-topicname":"${connect-topicname}","accessKeyId":"${accessKeyId}","accessKeySecret":"${accessKeySecret}","accountEndpoint":"${accountEndpoint}","bucketName":"${bucketName}","fileUrlPrefix":"${fileUrlPrefix}","objectName":"${objectName}","region":"${region}","partitionMethod":"${partitionMethod}"}
```

例子
```
http://localhost:8081/connectors/ossConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
"connector-class":"org.apache.rocketmq.connect.oss.sink.OssSinkConnector","connect-topicname":"oss-topic","accountEndpoint":"xxxx","accessKeyId":"xxxx","accessKeySecret":"xxxx",
"bucketName":"xxxx","objectName":"xxxx","region":"xxxx","partitionMethod":"xxxx","fileUrlPrefix":"xxxx"}
```

>**注:** `rocketmq-oss-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
## rocketmq-connect-oss 停止

```
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-oss-connector-name}/stop
```

## rocketmq-connect-oss 参数说明
* **oss-sink-connector 参数说明**

| KEY | TYPE | Must be filled | Description | Example
|-----------------|--------|----------------|-------------------------------|----------------------------|
| accountEndpoint | String | YES | OSS endpoint | oss-cn-beijing.aliyuncs.com |
| accessKeyId | String | YES | 阿里云授信账号的AK | xxxx |
| accessKeySecret | String | YES | 阿里云授信账号的SK | xxx |
| bucketName | String | YES | OSS bucketName | test_bucket |
| objectName | String | YES | 上传目的object名字 | test.txt |
| region | String | YES | OSS region | cn-beijing |
| partitionMethod | String | YES | 分区模式,Normal表示不分区,Time表示按时间分区 | Time |
| fileUrlPrefix | String | YES | 到object的URL前缀 | file1/ |
218 changes: 218 additions & 0 deletions connectors/aliyun/rocketmq-connect-oss/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-connect-oss</artifactId>
<version>1.0.0</version>

<name>connect-oss</name>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<slf4j.version>1.7.36</slf4j.version>
<logback.version>1.2.11</logback.version>
<junit.version>4.13.2</junit.version>
<fastjson.version>1.2.83</fastjson.version>
<assertj.version>3.22.0</assertj.version>
<mockito.version>4.5.1</mockito.version>
<openmessaging-connector.version>0.1.4</openmessaging-connector.version>
<commons-lang3.version>3.12.0</commons-lang3.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<aliyun-sdk-oss.version>3.17.2</aliyun-sdk-oss.version>
<aliyun-java-sdk-sts.version>3.1.0</aliyun-java-sdk-sts.version>
<aliyun-java-sdk-core.version>4.6.0</aliyun-java-sdk-core.version>
<gson.version>2.9.0</gson.version>
</properties>

<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>versions-maven-plugin</artifactId>
<version>2.3</version>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>clirr-maven-plugin</artifactId>
<version>2.7</version>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<excludeTransitive>false</excludeTransitive>
<stripVersion>true</stripVersion>
</configuration>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<compilerVersion>${maven.compiler.source}</compilerVersion>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<argLine>-Xms512m -Xmx1024m</argLine>
<forkMode>always</forkMode>
<includes>
<include>**/*Test.java</include>
</includes>
</configuration>
</plugin>
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.6</version>
<configuration>
<locales>en_US</locales>
<outputEncoding>UTF-8</outputEncoding>
<inputEncoding>UTF-8</inputEncoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>3.0.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.4</version>
<configuration>
<charset>UTF-8</charset>
<locale>en_US</locale>
<excludePackageNames>io.openmessaging.internal</excludePackageNames>
</configuration>
<executions>
<execution>
<id>aggregate</id>
<goals>
<goal>aggregate</goal>
</goals>
<phase>site</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<version>3.0.4</version>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.rocketmq.connect.oss.sink.OssSinkConnector</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-connector</artifactId>
<version>${openmessaging-connector.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>${aliyun-sdk-oss.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>${aliyun-java-sdk-core.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-sts</artifactId>
<version>${aliyun-java-sdk-sts.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.apache.rocketmq.connect.oss.sink;

import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.Task;
import io.openmessaging.connector.api.component.task.sink.SinkConnector;
import io.openmessaging.internal.DefaultKeyValue;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.connect.oss.sink.constant.OssConstant;

import java.util.ArrayList;
import java.util.List;

public class OssSinkConnector extends SinkConnector {

private String accessKeyId;

private String accessKeySecret;

private String accountEndpoint;

private String bucketName;

private String fileUrlPrefix;

private String objectName;

private String region;

private String partitionMethod;

@Override
public List<KeyValue> taskConfigs(int maxTasks) {
List<KeyValue> keyValueList = new ArrayList<>();
for (int i = 0; i < maxTasks; ++i) {
KeyValue keyValue = new DefaultKeyValue();
keyValue.put(OssConstant.ACCESS_KEY_ID, accessKeyId);
keyValue.put(OssConstant.ACCESS_KEY_SECRET, accessKeySecret);
keyValue.put(OssConstant.ACCOUNT_ENDPOINT, accountEndpoint);
keyValue.put(OssConstant.BUCKET_NAME, bucketName);
keyValue.put(OssConstant.FILE_URL_PREFIX, fileUrlPrefix);
keyValue.put(OssConstant.OBJECT_NAME, objectName);
keyValue.put(OssConstant.REGION, region);
keyValue.put(OssConstant.PARTITION_METHOD, partitionMethod);
keyValueList.add(keyValue);
}
return keyValueList;
}

@Override
public Class<? extends Task> taskClass() {
return OssSinkTask.class;
}

@Override
public void validate(KeyValue config) {
if (StringUtils.isBlank(config.getString(OssConstant.ACCESS_KEY_ID))
|| StringUtils.isBlank(config.getString(OssConstant.ACCESS_KEY_SECRET))
|| StringUtils.isBlank(config.getString(OssConstant.ACCOUNT_ENDPOINT))
|| StringUtils.isBlank(config.getString(OssConstant.BUCKET_NAME))
|| StringUtils.isBlank(config.getString(OssConstant.OBJECT_NAME))
|| StringUtils.isBlank(config.getString(OssConstant.REGION))
|| StringUtils.isBlank(config.getString(OssConstant.PARTITION_METHOD))) {
throw new RuntimeException("Oss required parameter is null !");
}
}

@Override
public void start(KeyValue config) {
accessKeyId = config.getString(OssConstant.ACCESS_KEY_ID);
accessKeySecret = config.getString(OssConstant.ACCESS_KEY_SECRET);
accountEndpoint = config.getString(OssConstant.ACCOUNT_ENDPOINT);
bucketName = config.getString(OssConstant.BUCKET_NAME);
fileUrlPrefix = config.getString(OssConstant.FILE_URL_PREFIX);
objectName = config.getString(OssConstant.OBJECT_NAME);
region = config.getString(OssConstant.REGION);
partitionMethod = config.getString(OssConstant.PARTITION_METHOD);
}

@Override
public void stop() {

}
}
Loading

0 comments on commit 0da281f

Please sign in to comment.