[ISSUE #608] Fix multi erros in Rmq connect experience docs (#609)
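* docs: Improve the RocketMQ Connect quick start document by adding detailed commands, configuration notes, and more, so that readers can follow it step by step.

* docs: Improve the RocketMQ Connect quick start document; add an explanation of the storePathRootDir setting

* docs: Improve the RocketMQ Connect quick start document; add steps for testing with the RMQ tools

* docs: Improve the RocketMQ Connect quick start, Practice 4, and Practice 5 documents; fix incorrect docker commands and other content; add detailed steps

* docs: Improve the 5.x versions of the RocketMQ Connect quick start, Practice 4, and Practice 5 documents; fix incorrect docker commands and other content; add detailed steps

* docs: Improve the 5.x versions of the RocketMQ Connect quick start, Practice 4, and Practice 5 documents; fix incorrect docker commands and other content; add detailed steps

* docs: Improve the 5.x versions of the RocketMQ Connect quick start, Practice 4, and Practice 5 English documents; fix incorrect docker commands and other content; add detailed steps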

---------

Co-authored-by: yuanzhi <[email protected]>
patrickYang6625 and yuanzhi authored Nov 23, 2023
1 parent 7a4a861 commit 1387005
Showing 12 changed files with 2,304 additions and 808 deletions.
231 changes: 149 additions & 82 deletions docs/10-connect/03RocketMQ Connect Quick Start.md
@@ -4,159 +4,217 @@

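# Quick Start

In standalone mode, [rocketmq-connect-sample] serves as the demo
This tutorial starts the sample RocketMQ Connector project rocketmq-connect-sample in standalone mode to help you understand how connectors work.
The sample project provides a source connector, which reads data from a source file and sends it to the RocketMQ cluster.
It also provides a sink connector, which reads messages from the RocketMQ cluster and writes them to a destination file.

The main job of rocketmq-connect-sample is to read data from a source file and send it to the RocketMQ cluster, then read the messages from the Topic and write them to a target file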

## 1. Preparation
## 1. Preparation: Start RocketMQ

1. Linux/Unix/Mac
2. 64bit JDK 1.8+;
3. Maven 3.2.x or later;
4. Start [RocketMQ](https://rocketmq.apache.org/docs/quick-start/);
5. Create a test Topic
> sh ${ROCKETMQ_HOME}/bin/mqadmin updateTopic -t fileTopic -n localhost:9876 -c DefaultCluster -r 8 -w 8
4. Start RocketMQ. Either [RocketMQ 4.x](https://rocketmq.apache.org/docs/4.x/) or
[RocketMQ 5.x](https://rocketmq.apache.org/docs/quickStart/01quickstart/) can be used;
5. Use the tools to verify that RocketMQ can send and receive messages. For details, see the [RocketMQ 4.x](https://rocketmq.apache.org/docs/4.x/) or
[RocketMQ 5.x](https://rocketmq.apache.org/docs/quickStart/01quickstart/) documentation.

Here the environment variable NAMESRV_ADDR tells the tool client that the RocketMQ NameServer address is localhost:9876

```shell
#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4

**tips**: where ${ROCKETMQ_HOME} is located
$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

>bin-release.zip version: /rocketmq-all-4.9.4-bin-release
>
>source-release.zip version: /rocketmq-all-4.9.4-source-release/distribution
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
```
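**Note**: RocketMQ can create Topics and Groups automatically. If the Topic or Group does not exist when a message is sent or subscribed to, RocketMQ creates it, so there is no need to create Topics and Groups in advance.
## 2. Build Connect
## 2. Build the Connector Runtime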
```
```shell
git clone https://github.com/apache/rocketmq-connect.git

cd rocketmq-connect

mvn -Prelease-connect -DskipTests clean install -U
export RMQ_CONNECT_HOME=`pwd`

mvn -Prelease-connect -Dmaven.test.skip=true clean install -U
```
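## 3. Run the Worker
**Note**: This project already includes the rocketmq-connect-sample code by default, so there is no need to build the rocketmq-connect-sample plugin separately.
## 3. Run the Connector Worker in Standalone Mode
### Modify the Configuration
`connect-standalone.conf` holds the RocketMQ connection address and other settings, which need to be adjusted for your environment. For details, see [9. Configuration File Description](#9配置文件说明).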
```
cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
vim conf/connect-standalone.conf
```
In standalone mode, RocketMQ Connect persists the sync offset information to the local directory storePathRootDir
>storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
To reset the sync offsets, delete the persisted offset files
```shell
rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*
```
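Because `rm -rf` on an empty or mistyped path can be destructive, the reset can be wrapped in a small guard. The following is only a sketch under stated assumptions: `STORE_ROOT` is a hypothetical variable name (not something RocketMQ Connect defines), and it defaults to a throwaway temporary directory so the snippet runs as-is. In real use it would be set to the storePathRootDir value from conf/connect-standalone.conf.

```shell
# Sketch only: STORE_ROOT is a hypothetical variable name.
# It defaults to a temporary directory so this example is safe to run anywhere.
STORE_ROOT="${STORE_ROOT:-$(mktemp -d)}"

# Simulate two persisted files (demo only; a real store root already has them).
touch "$STORE_ROOT/position.json" "$STORE_ROOT/offset.json"

# ${STORE_ROOT:?} makes the shell abort if the variable is empty or unset,
# so the command can never expand to "rm -rf /*".
rm -rf "${STORE_ROOT:?}"/*

# Count what is left in the directory.
REMAINING=$(ls -A "$STORE_ROOT" | wc -l | tr -d ' ')
echo "files remaining in store root: $REMAINING"
```

The `:?` expansion is a standard POSIX shell feature; the rest is plain `rm` as in the command above.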
**tips**: You can edit /bin/runconnect.sh to adjust the JVM parameters as needed
>JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"
### Start the Connector Worker in Standalone Mode
The runtime started successfully:
```shell
sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
```
>The standalone worker boot success.
**tips**: You can edit docker/connect/bin/runconnect.sh to adjust the JVM startup parameters as needed
>JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"
View the startup log file:
```shell
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
```
>tail -100f ~/logs/rocketmqconnect/connect_runtime.log
If the runtime started successfully, the log file contains the following line:
>The standalone worker boot success.
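Instead of watching the log by eye, a script can poll for the success line. The sketch below is an illustration only: `wait_for_log` is a hypothetical helper (not part of RocketMQ Connect), and the demo runs against a temporary file that stands in for the real log so the snippet is runnable without a worker.

```shell
# Sketch only: wait_for_log is a hypothetical helper, not a RocketMQ Connect tool.
# Usage: wait_for_log <log file> <fixed string to look for> [timeout in seconds]
wait_for_log() {
  log_file=$1
  pattern=$2
  timeout=${3:-30}
  elapsed=0
  while [ "$elapsed" -lt "$timeout" ]; do
    # -F treats the pattern as a fixed string, -q suppresses output.
    if [ -f "$log_file" ] && grep -qF "$pattern" "$log_file"; then
      return 0
    fi
    sleep 1
    elapsed=$((elapsed + 1))
  done
  return 1
}

# Demo: a temporary file stands in for connect_runtime.log.
DEMO_LOG=$(mktemp)
echo "The standalone worker boot success." >> "$DEMO_LOG"

if wait_for_log "$DEMO_LOG" "The standalone worker boot success." 5; then
  BOOT_STATUS=ok
else
  BOOT_STATUS=timeout
fi
echo "worker boot: $BOOT_STATUS"
```

Against a real worker the call would look like `wait_for_log ~/logs/rocketmqconnect/connect_runtime.log "The standalone worker boot success." 60`. Note that a success line left over from an earlier run would also match, so this check is most reliable on a fresh log file.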
Press ctrl + c to exit the log
To leave the follow mode of the tail -f command, press Ctrl + C.
## 4. Start the source connector
Create the test file test-source-file.txt in the current directory
```
### Create the Source File and Write Test Data
```shell
mkdir -p /Users/YourUsername/rocketmqconnect/
cd /Users/YourUsername/rocketmqconnect/
touch test-source-file.txt
echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt
```
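**Note**: The file must not contain blank lines (the demo program reports an error when it encounters one). The source connector keeps reading the source file; each line it reads is converted into a message body and sent to RocketMQ for the sink connector to consume.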
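Whether `echo` turns `\r\n` into real line breaks depends on the shell: bash's builtin `echo` prints the backslashes literally unless `-e` is given, while some other shells interpret them. As a hedged alternative, the sketch below writes the test lines with `printf`, which behaves the same in every POSIX shell, and then checks the file for blank lines. `SRC_DIR` is a hypothetical variable that defaults to a temporary directory so the snippet is runnable; in this walkthrough it would be /Users/YourUsername/rocketmqconnect.

```shell
# Sketch only: SRC_DIR is a hypothetical variable, defaulting to a temp directory.
SRC_DIR="${SRC_DIR:-$(mktemp -d)}"
SRC_FILE="$SRC_DIR/test-source-file.txt"

# printf '%s\n' writes each argument on its own line, in any POSIX shell.
printf '%s\n' "Hello" "RocketMQ" "Connect" >> "$SRC_FILE"

# Count empty or whitespace-only lines. grep -c prints 0 when nothing matches
# but exits non-zero, hence the "|| true".
BLANK_LINES=$(grep -c '^[[:space:]]*$' "$SRC_FILE" || true)
LINE_COUNT=$(wc -l < "$SRC_FILE" | tr -d ' ')
echo "lines: $LINE_COUNT, blank lines: $BLANK_LINES"
```

If the blank-line count is not 0, the offending lines can be located with `grep -n '^[[:space:]]*$'` on the same file before the source connector is started.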
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"test-source-file.txt","connect.topicname":"fileTopic"}'
### Start the Source Connector
```shell
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSourceConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-source-file.txt",
"connect.topicname": "fileTopic"
}'
```
A curl response with status:200 means the connector was created successfully. Sample response:
>{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"/Users/YourUsername/rocketmqconnect/test-source-file.txt","connect.topicname":"fileTopic"}}
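In a script, the status field can be checked instead of read by eye. The sketch below is illustrative only: it parses the sample response shown above with `sed`, assuming the response keeps the compact `"status":200` form with no whitespace around the colon. The variable names are hypothetical; in real use `RESPONSE` would hold the output of the curl command (for example via `curl -s`).

```shell
# Sample response copied from this document; in real use this would be
# the captured output of the curl call.
RESPONSE='{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"/Users/YourUsername/rocketmqconnect/test-source-file.txt","connect.topicname":"fileTopic"}}'

# Extract the digits that follow "status": (assumes the compact format above).
STATUS=$(printf '%s\n' "$RESPONSE" | sed -n 's/.*"status":\([0-9][0-9]*\).*/\1/p')

if [ "$STATUS" = "200" ]; then
  CREATE_RESULT="created"
else
  CREATE_RESULT="failed"
fi
echo "connector create: $CREATE_RESULT (status=$STATUS)"
```

A dedicated JSON tool would be more robust than `sed` if one is available on the machine; the text match is used here only to avoid extra dependencies.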
The following log output indicates that the file source connector started successfully
```shell
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
```
>tail -100f ~/logs/rocketmqconnect/connect_runtime.log
>
>2019-07-16 11:18:39 INFO pool-7-thread-1 - **Source task start**, config:{"properties":{"source-record-...
>Start connector fileSourceConnector and set target state STARTED successed!!
#### Source Connector Configuration
| key | nullable | default | description |
|-------------------| -------- | ---------------------|--------------------------|
| connector.class | false | | Name of the class (including the package name) that implements the Connector interface |
| filename | false | | Name of the data source file |
| connect.topicname | false | | Topic needed to sync the file data |
| filename | false | | Name of the source-side data file (an absolute path is recommended) |
| connect.topicname | false | | RocketMQ topic used to sync the file data |
## 5. Start the sink connector
```shell
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSinkConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-sink-file.txt",
"connect.topicnames": "fileTopic"
}'
```
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"test-sink-file.txt","connect.topicnames":"fileTopic"}'
cat test-sink-file.txt
A curl response with status:200 means the connector was created successfully. Sample response:
>{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"/Users/YourUsername/rocketmqconnect/test-sink-file.txt","connect.topicnames":"fileTopic"}}
The following log output indicates that the file sink connector started successfully
```shell
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
```
> Start connector fileSinkConnector and set target state STARTED successed!!
Check whether the sink connector has written the data to the destination file:
```shell
cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt
```
> tail -100f ~/logs/rocketmqconnect/connect_runtime.log
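If the file test-sink-file.txt has been created and its content matches source-file.txt, the whole pipeline is working.
The following log output indicates that the file sink connector started successfully
Continue writing test data to the source file test-source-file.txt: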
```shell
cd /Users/YourUsername/rocketmqconnect/
> 2019-07-16 11:24:58 INFO pool-7-thread-2 - **Sink task start**, config:{"properties":{"source-record-...
echo "Say Hi to\r\nRMQ Connector\r\nAgain" >> test-source-file.txt
# Wait a few seconds, then check whether rocketmq-connect has replicated the data to the sink file
sleep 10
cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt
```
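**Note**: The order of the file contents may differ. This is because `rocketmq-connect-sample` sends and receives normal messages (as opposed to ordered messages) on the RocketMQ Topic, and ordering is not guaranteed when normal messages are consumed.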
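Since line order is not guaranteed, a plain `diff` of the two files can report differences even when the sync worked. One way to compare them while ignoring order is to sort both sides first. The sketch below is an illustration under assumptions: it uses two small stand-in files in a temporary directory (`WORK_DIR` is a hypothetical name) and assumes the sink file holds the same lines as the source file, only possibly reordered.

```shell
# Sketch only: stand-in files in a temporary directory replace the real
# test-source-file.txt and test-sink-file.txt.
WORK_DIR=$(mktemp -d)
printf '%s\n' "Hello" "RocketMQ" "Connect" > "$WORK_DIR/test-source-file.txt"
printf '%s\n' "RocketMQ" "Connect" "Hello" > "$WORK_DIR/test-sink-file.txt"

# Sort both files so that only the set of lines matters, not their order.
sort "$WORK_DIR/test-source-file.txt" > "$WORK_DIR/source.sorted"
sort "$WORK_DIR/test-sink-file.txt" > "$WORK_DIR/sink.sorted"

# cmp -s is silent and exits 0 only when the two files are byte-identical.
if cmp -s "$WORK_DIR/source.sorted" "$WORK_DIR/sink.sorted"; then
  SYNC_RESULT="match"
else
  SYNC_RESULT="differ"
fi
echo "source vs sink (ignoring order): $SYNC_RESULT"
```

Duplicate lines are preserved by `sort`, so a line that was delivered twice or dropped still shows up as a difference.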
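If test-sink-file.txt has been created and its content matches source-file.txt, the whole pipeline is working.
The order of the file contents may differ. This is mainly because RocketMQ sends messages to different queues, and the order in which messages are received from different queues may also differ; this is normal.
#### Sink Connector Configuration
| key | nullable | default | description |
|--------------------| -------- | ------- | -------------------------------------------------------------------------------------- |
| connector.class | false | | Name of the class (including the package name) that implements the Connector interface |
| filename | false | | File in which the data pulled by the sink is saved |
| connect.topicnames | false | | Topics whose messages the sink needs to process |
| key | nullable | default | description |
|--------------------| -------- | ------- |----------------------------------------|
| connector.class | false | | Name of the class (including the package name) that implements the Connector interface |
| filename | false | | Name of the destination file in which the sink saves the data it consumes from RocketMQ (an absolute path is recommended) |
| connect.topicnames | false | | Topics whose messages the sink needs to process |
```
Note: The source/sink configuration described here uses rocketmq-connect-sample as the demo. Configuration differs between source/sink connectors, so refer to the specific source/sink connector
```
**Note**: The source/sink configuration described here uses rocketmq-connect-sample as the demo. Configuration differs between source/sink connectors, so refer to the specific source/sink connector
## 6. Stop the connector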

```shell
GET request
http://(your worker ip):(port)/connectors/(connector name)/stop
RESTful command format: `http://(your worker ip):(port)/connectors/(connector name)/stop`
Stop the two connectors in the demo
curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop
curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop
```shell
curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop
curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop
```
The following log output indicates that the connector stopped successfully
>**Source task stop**, config:{"properties":{"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter","filename":"/home/zhoubo/IdeaProjects/my-new3-rocketmq-externals/rocketmq-connect/rocketmq-connect-runtime/source-file.txt","task-class":"org.apache.rocketmq.connect.file.FileSourceTask","topic":"fileTopic","connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector","update-timestamp":"1564765189322"}}
A curl response with status:200 means the connector was stopped successfully. Sample response:
>{"status":200,"body":"Connector [fileSinkConnector] deleted successfully"}
The following log output indicates that the file sink connector stopped successfully
```shell
tail -100f ~/logs/rocketmqconnect/connect_default.log
```
> Completed shutdown for connectorName:fileSinkConnector
## 7. Stop the Worker process
```
```shell
cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
sh bin/connectshutdown.sh
```
## 8. Log directory
View the log directory (the two commands below are equivalent)
```shell
ls $HOME/logs/rocketmqconnect
ls ~/logs/rocketmqconnect
```
>${user.home}/logs/rocketmqconnect
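## 9. Configuration files

Default directory for the persisted configuration files: /tmp/storeRoot

| key | description |
|----------------------|---------------------------|
| connectorConfig.json | Persisted connector configuration |
| position.json | Persisted data-processing progress of the source connect |
| taskConfig.json | Persisted task configuration |
| offset.json | Persisted data-consumption progress of the sink connect |
| connectorStatus.json | Persisted connector status |
| taskStatus.json | Persisted task status |

## 10. Configuration description
## 9. Configuration File Description
You can change the [RESTful](https://restfulapi.cn/) port, the storeRoot path, the Nameserver address, and other settings as needed
The connect-standalone.conf configuration file sets the [RESTful](https://restfulapi.cn/) port, the storeRoot path, the Nameserver address, and other settings; adjust them as needed.
File location: conf/connect-standalone.conf under the worker startup directory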
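Scripts that call the REST interface can read these settings from the file rather than hard-coding them. The sketch below is only an illustration: `get_conf` is a hypothetical helper (not shipped with RocketMQ Connect), and it runs against a temporary stand-in file whose values are copied from this document, so it works without a real installation.

```shell
# Sketch only: a temporary stand-in for conf/connect-standalone.conf,
# with values copied from this document.
CONF_FILE=$(mktemp)
cat > "$CONF_FILE" <<'EOF'
workerId=DEFAULT_WORKER_1
httpPort=8082
# Local file dir for config store
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
namesrvAddr=127.0.0.1:9876
EOF

# get_conf is a hypothetical helper: print the value of a key=value entry,
# skipping comment lines. Usage: get_conf <key> <file>
get_conf() {
  grep -v '^[[:space:]]*#' "$2" | grep "^$1=" | head -n 1 | cut -d= -f2-
}

HTTP_PORT=$(get_conf httpPort "$CONF_FILE")
STORE_ROOT=$(get_conf storePathRootDir "$CONF_FILE")
CONNECT_URL="http://127.0.0.1:${HTTP_PORT}/connectors"

echo "REST endpoint: $CONNECT_URL"
echo "store root:    $STORE_ROOT"
```

`cut -d= -f2-` keeps everything after the first `=`, so values that themselves contain `=` are not truncated.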
Sample configuration file:
```shell
#current cluster node uniquely identifies
@@ -166,17 +224,26 @@ workerId=DEFAULT_WORKER_1
httpPort=8082
# Local file dir for config store
storePathRootDir=/home/connect/storeRoot
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
# Change this to your own RocketMQ nameserver endpoint
# RocketMQ namesrvAddr
namesrvAddr=127.0.0.1:9876
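# Used to load Connector plugins, similar to how the JVM loads jar files or classes at startup. This directory holds the Connector implementation plugins; both files and directories are supported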
# Source or sink connector jar file dir
pluginPaths=rocketmq-connect-sample/target/rocketmq-connect-sample-0.0.1-SNAPSHOT.jar
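# Plugin location, used by the Worker to load Source/Sink Connector plugins
# The rocketmq-connect project already includes the rocketmq-connect-sample module by default, so nothing needs to be configured here.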
pluginPaths=
```
Notes on the storePathRootDir setting:
# Additional note: store the Connector implementation plugins in a specified folder
# pluginPaths=/usr/local/connector-plugins/*
```
In standalone mode, RocketMQ Connect persists the sync offset information to the local directory storePathRootDir. The persisted files include:
| key | description |
|----------------------|---------------------------|
| connectorConfig.json | Persisted connector configuration |
| position.json | Persisted data-processing progress of the source connect |
| taskConfig.json | Persisted task configuration |
| offset.json | Persisted data-consumption progress of the sink connect |
| connectorStatus.json | Persisted connector status |
| taskStatus.json | Persisted task status |