diff --git a/i18n/en/docusaurus-plugin-content-docs/current/10-connect/03RocketMQ Connect Quick Start.md b/i18n/en/docusaurus-plugin-content-docs/current/10-connect/03RocketMQ Connect Quick Start.md index b6a0c363fb..735bf608fe 100644 --- a/i18n/en/docusaurus-plugin-content-docs/current/10-connect/03RocketMQ Connect Quick Start.md +++ b/i18n/en/docusaurus-plugin-content-docs/current/10-connect/03RocketMQ Connect Quick Start.md @@ -2,163 +2,242 @@ # Quick Start -In standalone mode, [rocketmq-connect-sample] serves as a demo. +This tutorial will start a RocketMQ Connector example project "rocketmq-connect-sample" in standalone mode to help you understand the working principle of connectors. +The example project provides a source connector that reads data from source files and sends it to the RocketMQ cluster. +It also provides a sink connector that reads messages from the RocketMQ cluster and writes them to destination files. -The main purpose of rocketmq-connect-sample is to read data from a source file and send it to a RocketMQ cluster, and then read messages from the Topic and write them to a target file. - -## 1. Prepare +## 1. Preparation: Start RocketMQ 1. Linux/Unix/Mac 2. 64bit JDK 1.8+; 3. Maven 3.2.x+; -4. Start [RocketMQ](https://rocketmq.apache.org/docs/quick-start/); -5. Create test Topic +4. Start RocketMQ. Either [RocketMQ 4.x](https://rocketmq.apache.org/docs/4.x/) or + [RocketMQ 5.x](https://rocketmq.apache.org/docs/quickStart/01quickstart/) can be used; +5. Test RocketMQ message sending and receiving using the tool. -> sh ${ROCKETMQ_HOME}/bin/mqadmin updateTopic -t fileTopic -n localhost:9876 -c DefaultCluster -r 8 -w 8 +Here, use the environment variable NAMESRV_ADDR to inform the tool client of the NameServer address of RocketMQ as localhost:9876.
-**tips** : ${ROCKETMQ_HOME} locational instructions +```shell +#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7 +$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4 ->bin-release.zip version:/rocketmq-all-4.9.4-bin-release -> ->source-release.zip version:/rocketmq-all-4.9.4-source-release/distribution +$ export NAMESRV_ADDR=localhost:9876 +$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer + SendResult [sendStatus=SEND_OK, msgId= ... +$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer + ConsumeMessageThread_%d Receive New Messages: [MessageExt... +``` -## 2. Build Connect +**Note**: RocketMQ has the feature of automatically creating Topic and Group. When sending or subscribing to messages, +if the corresponding Topic or Group does not exist, RocketMQ will automatically create them. Therefore, +there is no need to create Topic and Group in advance. -``` +## 2. Build Connector Runtime +```shell git clone https://github.com/apache/rocketmq-connect.git cd rocketmq-connect -mvn -Prelease-connect -DskipTests clean install -U +export RMQ_CONNECT_HOME=`pwd` +mvn -Prelease-connect -Dmaven.test.skip=true clean install -U ``` -## 3. Run Worker +**Note**: The project already includes the code for rocketmq-connect-sample by default, +so there is no need to build the rocketmq-connect-sample plugin separately. +## 3. Run Connector Worker in Standalone Mode + +### Modify Configuration +Modify the `connect-standalone.conf` file to configure the RocketMQ connection +address and other information. Please refer to [9. Configuration File Instructions](#9-configuration-file-instructions) for details. 
``` -cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT +cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT -sh bin/connect-standalone.sh -c conf/connect-standalone.conf & +vim conf/connect-standalone.conf +``` + +In standalone mode, RocketMQ Connect persists the synchronization checkpoint information +to the local file directory storePathRootDir. +>storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot + +If you want to reset the synchronization checkpoint, you need to delete the persisted +checkpoint file. + +```shell +rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/* ``` -**tips**: The JVM Parameters Configuration can be adjusted in /bin/runconnect.sh as needed. +### Start Connector Worker in Standalone Mode + +```shell +sh bin/connect-standalone.sh -c conf/connect-standalone.conf & +``` + +**tips**: You can modify `docker/connect/bin/runconnect.sh` to adjust JVM startup +parameters as needed. >JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m" -runtime start successful: +To view the startup log file: ->The standalone worker boot success. +```shell +tail -100f ~/logs/rocketmqconnect/connect_runtime.log +``` -View the startup log files. +If the runtime starts successfully, you will see the following print in the log file: ->tail -100f ~/logs/rocketmqconnect/connect_runtime.log +>The standalone worker boot success. -`ctrl + c` exit log +To exit the log tracking mode of `tail -f` command, you can press the `Ctrl + C` key combination. -## 4. Start source connector +## 4. Start Source Connector -Create a test file named test-source-file.txt in the current directory. 
+### Create Source File and Write Test Data -``` +```shell +mkdir -p /Users/YourUsername/rocketmqconnect/ +cd /Users/YourUsername/rocketmqconnect/ touch test-source-file.txt echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt +``` +**Note**: There should be no empty lines (the demo program will throw an error if it +encounters empty lines). The source connector will continuously read the source file +and convert each line of data into a message body to be sent to RocketMQ for consumption +by the sink connector. + +### Start Source Connector + +```shell +curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{ + "connector.class": "org.apache.rocketmq.connect.file.FileSourceConnector", + "filename": "/Users/YourUsername/rocketmqconnect/test-source-file.txt", + "connect.topicname": "fileTopic" +}' +``` + +If the curl request returns status 200, it indicates successful creation. Example response: +>{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"/Users/YourUsername/rocketmqconnect/test-source-file.txt","connect.topicname":"fileTopic"}} -curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"test-source-file.txt","connect.topicname":"fileTopic"}' +View the log file: +```shell +tail -100f ~/logs/rocketmqconnect/connect_runtime.log ``` -If you see the following log message, it means the file source connector has started successfully. +If you see the following log, it means the file source connector has started successfully: +>Start connector fileSourceConnector and set target state STARTED successed!! ->tail -100f ~/logs/rocketmqconnect/connect_runtime.log -> ->2019-07-16 11:18:39 INFO pool-7-thread-1 - **Source task start**, config:{"properties":{"source-record-... 
-#### source connector configuration instructions +#### Source Connector Configuration Instructions | key | nullable | default | description | | ----------------- | -------- | ------- | ------------------------------------------------------------ | | connector.class | false | | The class name (including the package name) that implements the Connector interface | -| filename | false | | source file name | +| filename | false | | The name of the source file (recommended to use absolute path) | | connect.topicname | false | | Topic required for synchronizing file data | ## 5. Start sink connector +```shell +curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{ + "connector.class": "org.apache.rocketmq.connect.file.FileSinkConnector", + "filename": "/Users/YourUsername/rocketmqconnect/test-sink-file.txt", + "connect.topicnames": "fileTopic" +}' ``` -curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"test-sink-file.txt","connect.topicnames":"fileTopic"}' -cat test-sink-file.txt +If the curl request returns status 200, it indicates successful creation. Example response: +>{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"/Users/YourUsername/rocketmqconnect/test-sink-file.txt","connect.topicnames":"fileTopic"}} + +View the log file: +```shell +tail -100f ~/logs/rocketmqconnect/connect_runtime.log ``` +If you see the following log, it means the file sink connector has started successfully: +> Start connector fileSinkConnector and set target state STARTED successed!! 
-> tail -100f ~/logs/rocketmqconnect/connect_runtime.log +Check if the sink connector has written data to the destination file: +```shell +cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt +``` -If you see the following log message, it means the file sink connector has started successfully. +If the test-sink-file.txt file is generated and its content is the same as the +test-source-file.txt, it means the entire process is running correctly. -> 2019-07-16 11:24:58 INFO pool-7-thread-2 - **Sink task start**, config:{"properties":{"source-record-... +Continue writing test data to the source file test-source-file.txt: +```shell +cd /Users/YourUsername/rocketmqconnect/ -If test-sink-file.txt is generated and its content is the same as source-file.txt, it means that the entire process is running normally. +echo "Say Hi to\r\nRMQ Connector\r\nAgain" >> test-source-file.txt -The file contents may be in a different order, which is normal because the order of messages received from different queues in RocketMQ may also be inconsistent. +# Wait a few seconds, check if rocketmq-connect replicate data to sink file succeed +sleep 10 +cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt +``` + +**Note**: The order of file contents may vary because the `rocketmq-connect-sample` uses `normal message` when +sending and receiving messages to/from a RocketMQ topic. This is different from `ordered message`, and consuming +`normal messages` does not guarantee the order. #### sink connector configuration instructions | key | nullable | default | description | | ------------------ | -------- | ------- | ------------------------------------------------------------ | | connector.class | false | | The class name (including the package name) that implements the Connector interface | -| filename | false | | The sink pulls data and saves it to a file. | -| connect.topicnames | false | | The topics of the data messages that the sink needs to process. 
| +| filename | false | | The sink pulls data and saves it to a file (recommended to use absolute path) | +| connect.topicnames | false | | The topics of the data messages that the sink needs to process | -``` -Tips:The configuration file instructions for the sample rocketmq-connect-sample are for reference only, different source/sink connectors have different configurations, please refer to the specific source/sink connector. -``` + +**Tips**: The configuration file instructions for the sample rocketmq-connect-sample are for reference only. Different source/sink connectors have different configurations; please refer to the documentation of the specific source/sink connector. ## 6. Stop connector +The RESTful command format for stopping connectors is +`http://(your worker ip):(port)/connectors/(connector name)/stop` +To stop the two connectors in the demo, you can use the following commands: ```shell -#GET request -http://(your worker ip):(port)/connectors/(connector name)/stop - -#Stopping the two connectors in the demo -curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop -curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop - +curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop +curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop ``` -Seeing the following log message indicates that the connector has been successfully stopped. +If the curl request returns a status of 200, the connectors were stopped successfully.
+Example response: +>{"status":200,"body":"Connector [fileSinkConnector] deleted successfully"} ->**Source task stop**, config:{"properties":{"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter","filename":"/home/zhoubo/IdeaProjects/my-new3-rocketmq-externals/rocketmq-connect/rocketmq-connect-runtime/source-file.txt","task-class":"org.apache.rocketmq.connect.file.FileSourceTask","topic":"fileTopic","connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector","update-timestamp":"1564765189322"}} - -## 7. Stopping the Worker process +If you see the following log message, it means the file sink connector has been +successfully shut down: +```shell +tail -100f ~/logs/rocketmqconnect/connect_default.log ``` +> Completed shutdown for connectorName:fileSinkConnector + +## 7. Stop the Worker process + +```shell +cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT sh bin/connectshutdown.sh ``` ## 8. Log directory ->${user.home}/logs/rocketmqconnect - -## 9. Configuration file - -The default directory for persistent configuration files is /tmp/storeRoot. +You can use the following commands to view the log directory: -| key | description | -| -------------------- | --------------------------------------------------------- | -| connectorConfig.json | Connector configuration persistence files | -| position.json | Source connect data processing progress persistence files | -| taskConfig.json | Task configuration persistence files | -| offset.json | Sink connect data consumption progress persistence files | -| connectorStatus.json | Connector status persistence files | -| taskStatus.json | Task status persistence files | +```shell +ls $HOME/logs/rocketmqconnect +ls ~/logs/rocketmqconnect +``` -## 10. Configuration Instructions +## 9. Configuration File Instructions Modify the RESTful port, storeRoot path, Nameserver address, and other information based on your usage. 
-The file location is in the conf/connect-standalone.conf under the work startup directory. +Here is an example of a configuration file: ```shell #current cluster node uniquely identifies @@ -168,16 +247,29 @@ workerId=DEFAULT_WORKER_1 httpPort=8082 # Local file dir for config store -storePathRootDir=/home/connect/storeRoot +storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot #You need to modify it to your own rocketmq nameserver endpoint. # RocketMQ namesrvAddr namesrvAddr=127.0.0.1:9876 -#This is used for loading Connector plugins, similar to how JVM loads jar packages or classes at startup. This directory is used for placing Connector-related implementation plugins and supports both files and directories. -# Source or sink connector jar file dir -pluginPaths=rocketmq-connect-sample/target/rocketmq-connect-sample-0.0.1-SNAPSHOT.jar +# Plugin path for loading Source/Sink Connectors +# The rocketmq-connect project already includes the rocketmq-connect-sample module by default, so no configuration is needed here. +pluginPaths= +``` + +Explanation of storePathRootDir configuration: + +In standalone mode, RocketMQ Connect persists the synchronization checkpoint information +to the local file directory specified by storePathRootDir. The persistent files include: + + +| key | description | +| -------------------- | --------------------------------------------------------- | +| connectorConfig.json | Connector configuration persistence files | +| position.json | Source connect data processing progress persistence files | +| taskConfig.json | Task configuration persistence files | +| offset.json | Sink connect data consumption progress persistence files | +| connectorStatus.json | Connector status persistence files | +| taskStatus.json | Task status persistence files | -# Addition : put the Connector-related implementation plugins in the specified folder. 
-# pluginPaths=/usr/local/connector-plugins/* -``` \ No newline at end of file diff --git a/i18n/en/docusaurus-plugin-content-docs/current/10-connect/07RocketMQ Connect In Action4.md b/i18n/en/docusaurus-plugin-content-docs/current/10-connect/07RocketMQ Connect In Action4.md index 3f0dd01471..23d647d6e8 100644 --- a/i18n/en/docusaurus-plugin-content-docs/current/10-connect/07RocketMQ Connect In Action4.md +++ b/i18n/en/docusaurus-plugin-content-docs/current/10-connect/07RocketMQ Connect In Action4.md @@ -1,6 +1,6 @@ # RocketMQ Connect in Action 4 -SFTP Server(file data) -> RocketMQ Connect +SFTP Server (File Data) -> RocketMQ Connect -> SFTP Server (File) ## Preparation @@ -9,55 +9,70 @@ SFTP Server(file data) -> RocketMQ Connect 1. Linux/Unix/Mac 2. 64bit JDK 1.8+; 3. Maven 3.2.x+; -4. Start [RocketMQ](https://rocketmq.apache.org/docs/quick-start/); +4. Start RocketMQ. Either [RocketMQ 4.x](https://rocketmq.apache.org/docs/4.x/) or + [RocketMQ 5.x](https://rocketmq.apache.org/docs/quickStart/01quickstart/) 5.x version can be used; +5. Test RocketMQ message sending and receiving using the tool. +Here, use the environment variable NAMESRV_ADDR to inform the tool client of the NameServer address of RocketMQ as localhost:9876. +```shell +#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7 +$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4 -**Tips** : ${ROCKETMQ_HOME} locational instructions +$ export NAMESRV_ADDR=localhost:9876 +$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer + SendResult [sendStatus=SEND_OK, msgId= ... ->bin-release.zip version:/rocketmq-all-4.9.4-bin-release -> ->source-release.zip version:/rocketmq-all-4.9.4-source-release/distribution +$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer + ConsumeMessageThread_%d Receive New Messages: [MessageExt... +``` +**Note**: RocketMQ has the feature of automatically creating Topic and Group. 
When sending or subscribing to messages, +if the corresponding Topic or Group does not exist, RocketMQ will automatically create them. Therefore, +there is no need to create Topic and Group in advance. -### Start Connect +### Build Connector Runtime +```shell +git clone https://github.com/apache/rocketmq-connect.git -#### **Compiling Connector Plugin** +cd rocketmq-connect -RocketMQ Connector SFTP +export RMQ_CONNECT_HOME=`pwd` -``` -$ cd rocketmq-connect/connectors/rocketmq-connect-sftp/ -$ mvn clean package -Dmaven.test.skip=true +mvn -Prelease-connect -Dmaven.test.skip=true clean install -U ``` -Move the compiled RocketMQ Connector SFTP package into the Runtime loading directory. The command is as follows: +### Build SFTP Connector Plugin ``` -mkdir -p /usr/local/connector-plugins -cp target/rocketmq-connect-sftp-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins -``` - -#### Start Connect Runtime +cd $RMQ_CONNECT_HOME/connectors/rocketmq-connect-sftp/ +mvn clean package -Dmaven.test.skip=true ``` -cd rocketmq-connect -mvn -Prelease-connect -DskipTests clean install -U +Put the compiled jar of the SFTP RocketMQ Connector into the Plugin directory for runtime loading. +``` +mkdir -p /Users/YourUsername/rocketmqconnect/connector-plugins +cp target/rocketmq-connect-sftp-0.0.1-SNAPSHOT-jar-with-dependencies.jar /Users/YourUsername/rocketmqconnect/connector-plugins ``` -Modify the configuration `connect-standalone.conf`, the main configuration is as follows +### Run Connector Worker in Standalone Mode + +Modify the `connect-standalone.conf` file to configure the RocketMQ connection +address and other information. 
``` -$ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT -$ vim conf/connect-standalone.conf +cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT + +vim conf/connect-standalone.conf ``` +Example configuration information is as follows: ``` workerId=standalone-worker -storePathRootDir=/tmp/storeRoot +storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot ## Http port for user to access REST API httpPort=8082 @@ -67,84 +82,164 @@ namesrvAddr=localhost:9876 # RocketMQ acl aclEnable=false -accessKey=rocketmq -secretKey=12345678 +#accessKey=rocketmq +#secretKey=12345678 -autoCreateGroupEnable=false clusterName="DefaultCluster" -# Core configuration, configure the plugin directory that was previously compiled here. -# Source or sink connector jar file dir,The default value is rocketmq-connect-sample -pluginPaths=/usr/local/connector-plugins +# Plugin path for loading Source/Sink Connectors +pluginPaths=/Users/YourUsername/rocketmqconnect/connector-plugins ``` +In standalone mode, RocketMQ Connect persistently stores the synchronization checkpoint information +in the local file directory specified by storePathRootDir. + +>storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot +If you want to reset the synchronization checkpoint, you need to delete the persisted checkpoint information files. +```shell +rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/* ``` -cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT +To start Connector Worker in standalone mode: +``` sh bin/connect-standalone.sh -c conf/connect-standalone.conf & - ``` ### Set up an SFTP server -Use the built-in SFTP server on MAC OS. +SFTP (SSH File Transfer Protocol) is a file transfer protocol used for secure file transfers between computers. +SFTP is built on top of the SSH (Secure Shell) protocol and utilizes encryption and authentication. 
-[Allow remote computers to access your Mac](https://support.apple.com/zh-cn/guide/mac-help/mchlp1066/mac) +We will use the built-in SFTP service in macOS (by enabling "Remote Login" access). +For detailed instructions, please refer to the +[Allow a remote computer to access your Mac](https://support.apple.com/guide/mac-help/allow-a-remote-computer-to-access-your-mac-mchlp1066/mac) document. + +### Create Source Test File +Create a test file named `source.txt` and write some test data to it: + +```shell +mkdir -p /Users/YourUsername/rocketmqconnect/sftp-test/ -### Test data +cd /Users/YourUsername/rocketmqconnect/sftp-test/ -Log in to the SFTP server and place a file called source.txt with specific contents in the user directory, for example: /path/to/. +touch source.txt + +echo 'John Doe|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00 +Jane Smith|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00 +Bob Johnson|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00' >> source.txt +``` + +Log in to the SFTP service to verify that you can access it normally. Enter the following command, then enter your +password: +```shell +# sftp -P port YourUsername@hostname +sftp -P 22 YourUsername@127.0.0.1 +``` +**Note**: Since this is the SFTP service provided by your local macOS, the address is `127.0.0.1` and the port is the default 22. -```text -zhangsan|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00 -lisi|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00 -zhaowu|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00 +```shell +sftp> cd /Users/YourUsername/rocketmqconnect/sftp-test/ +sftp> ls source.txt +sftp> bye ``` ## Start Connector -### Start SFTP source connector +### Start SFTP Source Connector -Synchronize the SFTP file: source.txt -Purpose: by logging into the SFTP server, parsing the file and encapsulating it into a generic ConnectRecord object, sending it to the RocketMQ Topic.
+Run the following command to start the SFTP source connector. This connector will connect to the +SFTP service to read from the `source.txt` file. For each line of text in the file, the connector +will parse and package the contents into a generic ConnectRecord object, which will then be sent +to a RocketMQ topic for consumption by sink connectors. ```shell curl -X POST --location "http://localhost:8082/connectors/SftpSourceConnector" --http1.1 \ -H "Host: localhost:8082" \ -H "Content-Type: application/json" \ - -d "{ - \"connector.class\": \"org.apache.rocketmq.connect.http.sink.SftpSourceConnector\", - \"host\": \"127.0.0.1\", - \"port\": 22, - \"username\": \"wencheng\", - \"password\": \"1617\", - \"filePath\": \"/Users/wencheng/Documents/source.txt\", - \"connect.topicname\": \"sftpTopic\", - \"fieldSeparator\": \"|\", - \"fieldSchema\": \"username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit\" - }" + -d '{ + "connector.class": "org.apache.rocketmq.connect.http.sink.SftpSourceConnector", + "host": "127.0.0.1", + "port": 22, + "username": "YourUsername", + "password": "yourPassword", + "filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/source.txt", + "connect.topicname": "sftpTopic", + "fieldSeparator": "|", + "fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit" + }' +``` + +If the curl request returns status: 200, it indicates that the connector was successfully +created. An example response would look like this: +```json +{"status":200,"body":{"connector.class":"... +``` + +To confirm that the file source connector has started successfully, run the following command: +```shell +tail -100f ~/logs/rocketmqconnect/connect_runtime.log ``` -After running the above commands, the file data on the SFTP server will be organized into data in the specified format, and written to MQ. Afterwards, it can be consumed by the sink connector or other business systems. 
+>Start connector SftpSourceConnector and set target state STARTED successed!! -### Start SFTP sink connector +### Start SFTP Sink Connector -Purpose: by consuming the data in the Topic, use the SFTP protocol to write to the target file. +Run the following command to start the SFTP sink connector. This connector will subscribe to the RocketMQ topic +to consume messages and convert each one into a single line of text, which will then be written to the destination +file `sink.txt` using the SFTP protocol: ```shell curl -X POST --location "http://localhost:8082/connectors/SftpSinkConnector" --http1.1 \ -H "Host: localhost:8082" \ -H "Content-Type: application/json" \ - -d "{ - \"connector.class\": \"org.apache.rocketmq.connect.http.sink.SftpSinkConnector\", - \"host\": \"127.0.0.1\", - \"port\": 22, - \"username\": \"wencheng\", - \"password\": \"1617\", - \"filePath\": \"/Users/wencheng/Documents/sink.txt\", - \"connect.topicnames\": \"sftpTopic\", - \"fieldSeparator\": \"|\", - \"fieldSchema\": \"username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit\" - }" + -d '{ + "connector.class": "org.apache.rocketmq.connect.http.sink.SftpSinkConnector", + "host": "127.0.0.1", + "port": 22, + "username": "YourUsername", + "password": "yourPassword", + "filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/sink.txt", + "connect.topicnames": "sftpTopic", + "fieldSeparator": "|", + "fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit" + }' ``` + +If the curl request returns status: 200, it indicates that the connector was successfully +created. An example response would look like this: +```json +{"status":200,"body":{"connector.class":"... +``` + +Check the logs to confirm successful startup of the SFTP sink connector: +```shell +tail -100f ~/logs/rocketmqconnect/connect_runtime.log +``` + +>Start connector SftpSinkConnector and set target state STARTED successed!! 
+ +Confirm that the data has been written to the destination file by running the following command: +```shell +cat /Users/YourUsername/rocketmqconnect/sftp-test/sink.txt +``` + +If the `sink.txt` file has been generated and its contents match those of the `source.txt` file, the entire process is working correctly. + +Write more test data to the `source.txt` file to continue testing: +```shell +cd /Users/YourUsername/rocketmqconnect/sftp-test/ + +echo 'John Doe|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00 +Jane Smith|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00 +Bob Johnson|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00' >> source.txt + +# Wait a few seconds to give the connector time to replicate data to the sink file. +sleep 10 + +cat /Users/YourUsername/rocketmqconnect/sftp-test/sink.txt +``` + +**Note**: The order of file contents may vary because the `rocketmq-connect-sftp` uses `normal message` when +sending and receiving messages to/from a RocketMQ topic. This is different from `ordered message`, and consuming +`normal messages` does not guarantee the order. \ No newline at end of file diff --git a/i18n/en/docusaurus-plugin-content-docs/current/10-connect/08RocketMQ Connect In Action5-ES.md b/i18n/en/docusaurus-plugin-content-docs/current/10-connect/08RocketMQ Connect In Action5-ES.md index 269fc8cd02..5bec501329 100644 --- a/i18n/en/docusaurus-plugin-content-docs/current/10-connect/08RocketMQ Connect In Action5-ES.md +++ b/i18n/en/docusaurus-plugin-content-docs/current/10-connect/08RocketMQ Connect In Action5-ES.md @@ -1,6 +1,6 @@ # RocketMQ Connect in Action 5 -Elsticsearch Source - >RocketMQ Connect -> Elasticsearch Sink +Elasticsearch Source -> RocketMQ Connect -> Elasticsearch Sink ## preparatory work @@ -8,54 +8,77 @@ Elsticsearch Source - >RocketMQ Connect -> Elasticsearch Sink 1. Linux/Unix/Mac 2. 64bit JDK 1.8+; -3. Maven 3.2.x or later; -4. 
Start [RocketMQ](https://rocketmq.apache.org/docs/quick-start/); +3. Maven 3.2.x+; +4. Start RocketMQ. Either [RocketMQ 4.x](https://rocketmq.apache.org/docs/4.x/) or + [RocketMQ 5.x](https://rocketmq.apache.org/docs/quickStart/01quickstart/) can be used; +5. Test RocketMQ message sending and receiving using the tool. +Here, use the environment variable NAMESRV_ADDR to inform the tool client of the NameServer address of RocketMQ as localhost:9876. +```shell +#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7 +$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4 -**tips** : ${ROCKETMQ_HOME} Position Description +$ export NAMESRV_ADDR=localhost:9876 +$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer + SendResult [sendStatus=SEND_OK, msgId= ... ->bin-release.zip version:/rocketmq-all-4.9.4-bin-release -> ->source-release.zip versioon:/rocketmq-all-4.9.4-source-release/distribution +$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer + ConsumeMessageThread_%d Receive New Messages: [MessageExt... +``` +**Note**: RocketMQ has the feature of automatically creating Topic and Group. When sending or subscribing to messages, +if the corresponding Topic or Group does not exist, RocketMQ will automatically create them. Therefore, +there is no need to create Topic and Group in advance. -### Start Connect +### Building the Connector Runtime -#### Connector plugin compilation +Clone the repository and build the RocketMQ Connect project: -Elasticsearch RocketMQ Connector -``` -$ cd rocketmq-connect/connectors/rocketmq-connect-elasticsearch/ -$ mvn clean package -Dmaven.test.skip=true -``` +```shell +git clone https://github.com/apache/rocketmq-connect.git -Move the compiled Elasticsearch RocketMQ Connector package into the Runtime load directory.
The command is as follows: -``` -mkdir -p /usr/local/connector-plugins -cp rocketmq-connect-elasticsearch/target/rocketmq-connect-elasticsearch-1.0.0-jar-with-dependencies.jar /usr/local/connector-plugins +cd rocketmq-connect + +export RMQ_CONNECT_HOME=`pwd` + +mvn -Prelease-connect -Dmaven.test.skip=true clean install -U ``` +### Build Elasticsearch Connector Plugin +Build the Elasticsearch RocketMQ Connector plugin: -#### Start Connect Runtime +```shell +cd $RMQ_CONNECT_HOME/connectors/rocketmq-connect-elasticsearch/ +mvn clean package -Dmaven.test.skip=true ``` -cd rocketmq-connect -mvn -Prelease-connect -DskipTests clean install -U +Copy the compiled Elasticsearch RocketMQ Connector plugin JAR file into the plugin directory used by the runtime: -``` +```shell +mkdir -p /Users/YourUsername/rocketmqconnect/connector-plugins -Update `connect-standalone.conf` ,Key configurations are as follows: +cp target/rocketmq-connect-elasticsearch-1.0.0-jar-with-dependencies.jar /Users/YourUsername/rocketmqconnect/connector-plugins ``` -$ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT -$ vim conf/connect-standalone.conf + +### Run Connector Worker in Standalone Mode + +Modify the `connect-standalone.conf` file to configure the RocketMQ connection +address and other information. 
+ +```shell +cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT + +vim conf/connect-standalone.conf ``` +Example configuration information is as follows: ``` workerId=standalone-worker -storePathRootDir=/tmp/storeRoot +storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot ## Http port for user to access REST API httpPort=8082 @@ -65,53 +88,217 @@ namesrvAddr=localhost:9876 # RocketMQ acl aclEnable=false -accessKey=rocketmq -secretKey=12345678 +#accessKey=rocketmq +#secretKey=12345678 -autoCreateGroupEnable=false clusterName="DefaultCluster" -# Core configuration where the plugin directory where you compiled the elasticsearch package is located -# Source or sink connector jar file dir,The default value is rocketmq-connect-sample -pluginPaths=/usr/local/connector-plugins +# Plugin path for loading Source/Sink Connectors +pluginPaths=/Users/YourUsername/rocketmqconnect/connector-plugins ``` +In standalone mode, RocketMQ Connect persistently stores the synchronization checkpoint information +in the local file directory specified by storePathRootDir. + +>storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot +If you want to reset the synchronization checkpoint, delete the persistence files: +```shell +rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/* ``` -cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT +To start Connector Worker in standalone mode: +``` sh bin/connect-standalone.sh -c conf/connect-standalone.conf & +``` + +### Set Up Elasticsearch Services + +Elasticsearch is an open-source search and analytics engine. 
+ +We'll use two separate Docker instances of Elasticsearch to serve as our source and destination databases: ``` +docker pull docker.elastic.co/elasticsearch/elasticsearch:7.15.1 -### Elasticsearch Image +docker run --name es1 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" \ +-v /Users/YourUsername/rocketmqconnect/es/es1_data:/usr/share/elasticsearch/data \ +-d docker.elastic.co/elasticsearch/elasticsearch:7.15.1 -Use docker to build the Elasticsearch database +docker run --name es2 -p 9201:9200 -p 9301:9300 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" \ +-v /Users/YourUsername/rocketmqconnect/es/es2_data:/usr/share/elasticsearch/data \ +-d docker.elastic.co/elasticsearch/elasticsearch:7.15.1 ``` -# starting a elasticsearch instance -docker run --name my-elasticsearch -p 9200:9200 -p 9300:9300 -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" -d 74c2e0ec249c + +Explanation of Docker commands: + +- `--name es2`: Specifies a name for the container, e.g., `es2`. +- `-p 9201:9200 -p 9301:9300`: Maps ports 9200 and 9300 on the Elasticsearch container to host ports 9201 and 9301 so that the Elasticsearch service can be accessed via the host. +- `-e discovery.type=single-node`: configures Elasticsearch to work on a single node without discovering other nodes in a cluster, suitable for single-server deployment. +- `-v /Users/YourUsername/rocketmqconnect/es/es2_data:/usr/share/elasticsearch/data`: Mounts a directory on the host to `/usr/share/elasticsearch/data` within the container for persistent storage of Elasticsearch data. + +This runs a custom-configured instance of Elasticsearch with persistent data storage on a container accessible through port 9200 on the host machine, making it useful for development or testing environments on a local machine. 
+ +View the Elasticsearch logs: + ``` -### Kibana Image + +docker logs -f es1 + +docker logs -f es2 +``` + +Verify that Elasticsearch has started successfully: -Use docker to build the Kibana environment ``` -docker run --name my-kibana -e ELASTICSEARCH_URL=http://192.168.0.101:9200 -p 5601:5601 -d 5dca66b41943 +# Check Elasticsearch instance 1 +curl -XGET http://localhost:9200 + +# Check Elasticsearch instance 2 +curl -XGET http://localhost:9201 ``` +A successful connection and correct operation will result in JSON responses containing information +about Elasticsearch and its version number. -### test data +### Set Up Kibana Services +Kibana is an open-source data visualization tool that allows users to interactively explore +and understand data stored within Elasticsearch clusters. It offers rich features such as charts, graphs, and dashboards. -Create test data with kibana Dev Tools: reference [console-ibana](https://www.elastic.co/guide/en/kibana/8.5/console-kibana.html#console-kibana); +For convenience, we'll set up two separate instances of Kibana in Docker and link them to +our previously established Elasticsearch containers using the following commands: +``` +docker pull docker.elastic.co/kibana/kibana:7.15.1 -Source Index:connect_es +docker run --name kibana1 --link es1:elasticsearch -p 5601:5601 -d docker.elastic.co/kibana/kibana:7.15.1 -## Start Connector +docker run --name kibana2 --link es2:elasticsearch -p 5602:5601 -d docker.elastic.co/kibana/kibana:7.15.1 +``` -### Start Elasticsearch source connector +Explanation of Docker Commands: +- `--name kibana2`: Assigns a name to the new container, e.g., kibana2 +- `--link es2:elasticsearch`: Links the container to another named Elasticsearch instance (in this case, 'es2'). This enables communication between Kibana and Elasticsearch. +- `-p 5602:5601`: Maps Kibana's default port (5601) in the container to port 5602 on the host machine to make it accessible through the browser.
+- `-d`: Runs the Docker container in detached mode. -Synchronizing source index data: connect_es -effect:Send a RocketMQ Topic by parsing Elasticsearch document data and wrapping it into a generic ConnectRecord object +Once the containers have launched, you can monitor their log output: + +``` +docker logs -f kibana1 + +docker logs -f kibana2 +``` + + +To access the Kibana console pages, visit the following addresses in your browser: +- kibana1: http://localhost:5601 +- kibana2: http://localhost:5602 + +If they load correctly, it indicates successful startup of the respective Kibana instances. + +### Write Test Data to the Source Elasticsearch +Kibana's Dev Tools let you interact directly with Elasticsearch from within Kibana. +You can execute various queries and operations, then analyze and understand the returned data. +Refer to the documentation [console-kibana](https://www.elastic.co/guide/en/kibana/8.9/console-kibana.html). + +#### Bulk Write Test Data +Access the Kibana1 console through the browser, find Dev Tools from the left menu, +and enter the following commands on the page to write test data: + +``` +POST /_bulk +{ "index" : { "_index" : "connect_es" } } +{ "id": "1", "field1": "value1", "field2": "value2" } +{ "index" : { "_index" : "connect_es" } } +{ "id": "2", "field1": "value3", "field2": "value4" } +``` + +**Note**: +- connect_es: The index name for the data. +- id/field1/field2: These are field names, and 1, value1, value2 represent the values for the fields. + +**Note**: There is a limitation in `rocketmq-connect-elasticsearch`, which requires a field in the data that +can be used for >= comparison operations (string or number). This field will be used to record the +synchronization checkpoint. In the above example, the `id` field is a globally unique, incrementing field.
+ +#### Query Data +To query data within an index, use the following command: +``` +GET /connect_es/_search +{ + "size": 100 +} +``` + +If there is no data available, the response will be: +``` +{ + "error" : { + ... + "type" : "index_not_found_exception", + "reason" : "no such index [connect_es]", + "resource.type" : "index_or_alias", + "resource.id" : "connect_es", + "index_uuid" : "_na_", + "index" : "connect_es" + }, + "status" : 404 +} +``` + +If there is data available, the response will be: +``` +{ + ... + "hits" : { + "total" : { + "value" : 2, + "relation" : "eq" + }, + "max_score" : 1.0, + "hits" : [ + { + "_index" : "connect_es", + "_type" : "_doc", + "_id" : "_dx49osBb46Z9cN4hYCg", + "_score" : 1.0, + "_source" : { + "id" : "1", + "field1" : "value1", + "field2" : "value2" + } + }, + { + "_index" : "connect_es", + "_type" : "_doc", + "_id" : "_tx49osBb46Z9cN4hYCg", + "_score" : 1.0, + "_source" : { + "id" : "2", + "field1" : "value3", + "field2" : "value4" + } + } + ] + } +} + +``` + +#### Delete Data + +If you need to delete data within an index due to repeated testing or other reasons, you can use the following command: + +``` +DELETE /connect_es +``` + +## Start Connector + +### Start Elasticsearch Source Connector +Run the following command to start the ES source connector. The connector will connect to Elasticsearch +and read document data from the connect_es index. It will parse the Elasticsearch document data and +package it into a generic ConnectRecord object, which will be sent to a RocketMQ topic for consumption by the Sink Connector. 
``` curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/elasticsearchSourceConnector -d '{ @@ -131,26 +318,60 @@ curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connector }' ``` -### Start Elasticsearch sink connector +**Note**: The startup command specifies that the source ES should synchronize the connect_es index, +and the incrementing field in the index is id. Data will be fetched starting from id=1. + +If the curl request returns status:200, it indicates a successful creation, and the sample response will be: +>{"status":200,"body":{"connector.class":"... + +If you see the following log, it indicates that the Elasticsearch source connector has started successfully. +```shell +tail -100f ~/logs/rocketmqconnect/connect_runtime.log +``` + +>Start connector elasticsearchSourceConnector and set target state STARTED successed!! -effect:Data is written to the target index by consuming the Topic +### Start Elasticsearch Sink Connector +Run the following command to start the ES sink connector. The connector will subscribe to data from +the RocketMQ topic and consume it. It will convert each message into document data and write it to the destination ES.
``` -curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/ElasticsearchSinkConnector -d '{ +curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/elasticsearchSinkConnector -d '{ "connector.class":"org.apache.rocketmq.connect.elasticsearch.connector.ElasticsearchSinkConnector", "elasticsearchHost":"localhost", - "elasticsearchPort":9202, + "elasticsearchPort":9201, "max.tasks":2, "connect.topicnames":"ConnectEsTopic", "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" }' +``` + +**Note**: The startup command specifies the address and port of the destination ES, which corresponds to +the previously started ES2 in Docker. + +If the curl request returns status:200, it indicates a successful creation, and the sample response will be: +>{"status":200,"body":{"connector.class":"... + +If you see the following log, it indicates that the Elasticsearch sink connector has started successfully: +```shell +tail -100f ~/logs/rocketmqconnect/connect_runtime.log ``` -note:Local testing requires you to start the Elasticsearch process on two different ports +>Start connector elasticsearchSinkConnector and set target state STARTED successed!! + +To check if the sink connector has written data to the destination ES index: + +1. Access the Kibana2 console address in the browser: http://localhost:5602 +2. In the Kibana2 Dev Tools page, query the data within the index. If it matches the data in the source ES1, it means the connector is running properly.
+ +``` +GET /connect_es/_search +{ + "size": 100 +} +``` -After the two Connector tasks are successfully created Whether the Elasticsearch specified by accessing sink contains data -New data added to the source index can be synchronized to the target index diff --git a/i18n/en/docusaurus-plugin-content-docs/version-5.0/10-connect/03RocketMQ Connect Quick Start.md b/i18n/en/docusaurus-plugin-content-docs/version-5.0/10-connect/03RocketMQ Connect Quick Start.md index 8136302a43..518bf59721 100644 --- a/i18n/en/docusaurus-plugin-content-docs/version-5.0/10-connect/03RocketMQ Connect Quick Start.md +++ b/i18n/en/docusaurus-plugin-content-docs/version-5.0/10-connect/03RocketMQ Connect Quick Start.md @@ -2,163 +2,242 @@ # Quick Start -In standalone mode, [rocketmq-connect-sample] serves as a demo. +This tutorial will start a RocketMQ Connector example project "rocketmq-connect-sample" in standalone mode to help you understand the working principle of connectors. +The example project provides a source connector that reads data from source files and sends it to the RocketMQ cluster. +It also provides a sink connector that reads messages from the RocketMQ cluster and writes them to destination files. -The main purpose of rocketmq-connect-sample is to read data from a source file and send it to a RocketMQ cluster, and then read messages from the Topic and write them to a target file. - -## 1. Prepare +## 1. Preparation: Start RocketMQ 1. Linux/Unix/Mac 2. 64bit JDK 1.8+; 3. Maven 3.2.x+; -4. Start [RocketMQ](https://rocketmq.apache.org/docs/quick-start/); -5. Create test Topic +4. Start RocketMQ. Either [RocketMQ 4.x](https://rocketmq.apache.org/docs/4.x/) or + [RocketMQ 5.x](https://rocketmq.apache.org/docs/quickStart/01quickstart/) 5.x version can be used; +5. Test RocketMQ message sending and receiving using the tool. 
-> sh ${ROCKETMQ_HOME}/bin/mqadmin updateTopic -t fileTopic -n localhost:9876 -c DefaultCluster -r 8 -w 8 +Here, use the environment variable NAMESRV_ADDR to inform the tool client of the NameServer address of RocketMQ as localhost:9876. -**tips** : ${ROCKETMQ_HOME} locational instructions +```shell +#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7 +$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4 ->bin-release.zip version:/rocketmq-all-4.9.4-bin-release -> ->source-release.zip version:/rocketmq-all-4.9.4-source-release/distribution +$ export NAMESRV_ADDR=localhost:9876 +$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer + SendResult [sendStatus=SEND_OK, msgId= ... +$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer + ConsumeMessageThread_%d Receive New Messages: [MessageExt... +``` -## 2. Build Connect +**Note**: RocketMQ has the feature of automatically creating Topic and Group. When sending or subscribing to messages, +if the corresponding Topic or Group does not exist, RocketMQ will automatically create them. Therefore, +there is no need to create Topic and Group in advance. -``` +## 2. Build Connector Runtime +```shell git clone https://github.com/apache/rocketmq-connect.git cd rocketmq-connect -mvn -Prelease-connect -DskipTests clean install -U +export RMQ_CONNECT_HOME=`pwd` +mvn -Prelease-connect -Dmaven.test.skip=true clean install -U ``` -## 3. Run Worker +**Note**: The project already includes the code for rocketmq-connect-sample by default, +so there is no need to build the rocketmq-connect-sample plugin separately. +## 3. Run Connector Worker in Standalone Mode + +### Modify Configuration +Modify the `connect-standalone.conf` file to configure the RocketMQ connection +address and other information. Please refer to [9. Configuration File Instructions](#9-configuration-file-instructions) for details. 
``` -cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT +cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT -sh bin/connect-standalone.sh -c conf/connect-standalone.conf & +vim conf/connect-standalone.conf +``` + +In standalone mode, RocketMQ Connect persists the synchronization checkpoint information +to the local file directory storePathRootDir. +>storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot + +If you want to reset the synchronization checkpoint, you need to delete the persisted +checkpoint file. + +```shell +rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/* ``` -**tips**: The JVM Parameters Configuration can be adjusted in /bin/runconnect.sh as needed. +### Start Connector Worker in Standalone Mode + +```shell +sh bin/connect-standalone.sh -c conf/connect-standalone.conf & +``` + +**tips**: You can modify `docker/connect/bin/runconnect.sh` to adjust JVM startup +parameters as needed. >JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m" -runtime start successful: +To view the startup log file: ->The standalone worker boot success. +```shell +tail -100f ~/logs/rocketmqconnect/connect_runtime.log +``` -View the startup log files. +If the runtime starts successfully, you will see the following print in the log file: ->tail -100f ~/logs/rocketmqconnect/connect_runtime.log +>The standalone worker boot success. -`ctrl + c` exit log +To exit the log tracking mode of `tail -f` command, you can press the `Ctrl + C` key combination. -## 4. Start source connector +## 4. Start Source Connector -Create a test file named test-source-file.txt in the current directory. 
+### Create Source File and Write Test Data -``` +```shell +mkdir -p /Users/YourUsername/rocketmqconnect/ +cd /Users/YourUsername/rocketmqconnect/ touch test-source-file.txt echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt +``` +**Note**: There should be no empty lines (the demo program will throw an error if it +encounters empty lines). The source connector will continuously read the source file +and convert each line of data into a message body to be sent to RocketMQ for consumption +by the sink connector. + +### Start Source Connector + +```shell +curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{ + "connector.class": "org.apache.rocketmq.connect.file.FileSourceConnector", + "filename": "/Users/YourUsername/rocketmqconnect/test-source-file.txt", + "connect.topicname": "fileTopic" +}' +``` + +If the curl request returns status 200, it indicates successful creation. Example response: +>{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"/Users/YourUsername/rocketmqconnect/test-source-file.txt","connect.topicname":"fileTopic"}} -curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"test-source-file.txt","connect.topicname":"fileTopic"}' +View the log file: +```shell +tail -100f ~/logs/rocketmqconnect/connect_runtime.log ``` -If you see the following log message, it means the file source connector has started successfully. +If you see the following log, it means the file source connector has started successfully: +>Start connector fileSourceConnector and set target state STARTED successed!! ->tail -100f ~/logs/rocketmqconnect/connect_runtime.log -> ->2019-07-16 11:18:39 INFO pool-7-thread-1 - **Source task start**, config:{"properties":{"source-record-... 
-#### source connector configuration instructions +#### Source Connector Configuration Instructions | key | nullable | default | description | | ----------------- | -------- | ------- | ------------------------------------------------------------ | | connector.class | false | | The class name (including the package name) that implements the Connector interface | -| filename | false | | source file name | +| filename | false | | The name of the source file (recommended to use absolute path) | | connect.topicname | false | | Topic required for synchronizing file data | ## 5. Start sink connector +```shell +curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{ + "connector.class": "org.apache.rocketmq.connect.file.FileSinkConnector", + "filename": "/Users/YourUsername/rocketmqconnect/test-sink-file.txt", + "connect.topicnames": "fileTopic" +}' ``` -curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"test-sink-file.txt","connect.topicnames":"fileTopic"}' -cat test-sink-file.txt +If the curl request returns status 200, it indicates successful creation. Example response: +>{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"/Users/YourUsername/rocketmqconnect/test-sink-file.txt","connect.topicnames":"fileTopic"}} + +View the log file: +```shell +tail -100f ~/logs/rocketmqconnect/connect_runtime.log ``` +If you see the following log, it means the file sink connector has started successfully: +> Start connector fileSinkConnector and set target state STARTED successed!! 
-> tail -100f ~/logs/rocketmqconnect/connect_runtime.log +Check if the sink connector has written data to the destination file: +```shell +cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt +``` -If you see the following log message, it means the file sink connector has started successfully. +If the test-sink-file.txt file is generated and its content is the same as the +test-source-file.txt, it means the entire process is running correctly. -> 2019-07-16 11:24:58 INFO pool-7-thread-2 - **Sink task start**, config:{"properties":{"source-record-... +Continue writing test data to the source file test-source-file.txt: +```shell +cd /Users/YourUsername/rocketmqconnect/ -If test-sink-file.txt is generated and its content is the same as source-file.txt, it means that the entire process is running normally. +echo "Say Hi to\r\nRMQ Connector\r\nAgain" >> test-source-file.txt -The file contents may be in a different order, which is normal because the order of messages received from different queues in RocketMQ may also be inconsistent. +# Wait a few seconds, check if rocketmq-connect replicate data to sink file succeed +sleep 10 +cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt +``` + +**Note**: The order of file contents may vary because the `rocketmq-connect-sample` uses `normal message` when +sending and receiving messages to/from a RocketMQ topic. This is different from `ordered message`, and consuming +`normal messages` does not guarantee the order. #### sink connector configuration instructions | key | nullable | default | description | | ------------------ | -------- | ------- | ------------------------------------------------------------ | | connector.class | false | | The class name (including the package name) that implements the Connector interface | -| filename | false | | The sink pulls data and saves it to a file. | -| connect.topicnames | false | | The topics of the data messages that the sink needs to process. 
| +| filename | false | | The sink pulls data and saves it to a file(recommended to use absolute path) | +| connect.topicnames | false | | The topics of the data messages that the sink needs to process | -``` -Tips:The configuration file instructions for the sample rocketmq-connect-sample are for reference only, different source/sink connectors have different configurations, please refer to the specific source/sink connector. -``` + +**Tips**:The configuration file instructions for the sample rocketmq-connect-sample are for reference only, different source/sink connectors have different configurations, please refer to the specific source/sink connector. ## 6. Stop connector +The RESTful command format for stopping connectors is +`http://(your worker ip):(port)/connectors/(connector name)/stop` +To stop the two connectors in the demo, you can use the following commands: ```shell -#GET request -http://(your worker ip):(port)/connectors/(connector name)/stop - -#Stopping the two connectors in the demo -curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop -curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop - +curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop +curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop ``` -Seeing the following log message indicates that the connector has been successfully stopped. +If the curl request returns a status of 200, it indicates successful stopping of the connectors. 
+Example response: +>{"status":200,"body":"Connector [fileSinkConnector] deleted successfully"} ->**Source task stop**, config:{"properties":{"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter","filename":"/home/zhoubo/IdeaProjects/my-new3-rocketmq-externals/rocketmq-connect/rocketmq-connect-runtime/source-file.txt","task-class":"org.apache.rocketmq.connect.file.FileSourceTask","topic":"fileTopic","connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector","update-timestamp":"1564765189322"}} - -## 7. Stopping the Worker process +If you see the following log message, it means the file sink connector has been +successfully shut down: +```shell +tail -100f ~/logs/rocketmqconnect/connect_default.log ``` +> Completed shutdown for connectorName:fileSinkConnector + +## 7. Stop the Worker process + +```shell +cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT sh bin/connectshutdown.sh ``` ## 8. Log directory ->${user.home}/logs/rocketmqconnect - -## 9. Configuration file - -The default directory for persistent configuration files is /tmp/storeRoot. +You can use the following commands to view the log directory: -| key | description | -| -------------------- | --------------------------------------------------------- | -| connectorConfig.json | Connector configuration persistence files | -| position.json | Source connect data processing progress persistence files | -| taskConfig.json | Task configuration persistence files | -| offset.json | Sink connect data consumption progress persistence files | -| connectorStatus.json | Connector status persistence files | -| taskStatus.json | Task status persistence files | +```shell +ls $HOME/logs/rocketmqconnect +ls ~/logs/rocketmqconnect +``` -## 10. Configuration Instructions +## 9. Configuration File Instructions Modify the RESTful port, storeRoot path, Nameserver address, and other information based on your usage. 
-The file location is in the conf/connect-standalone.conf under the work startup directory. +Here is an example of a configuration file: ```shell #current cluster node uniquely identifies @@ -168,16 +247,29 @@ workerId=DEFAULT_WORKER_1 httpPort=8082 # Local file dir for config store -storePathRootDir=/home/connect/storeRoot +storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot #You need to modify it to your own rocketmq nameserver endpoint. # RocketMQ namesrvAddr namesrvAddr=127.0.0.1:9876 -#This is used for loading Connector plugins, similar to how JVM loads jar packages or classes at startup. This directory is used for placing Connector-related implementation plugins and supports both files and directories. -# Source or sink connector jar file dir -pluginPaths=rocketmq-connect-sample/target/rocketmq-connect-sample-0.0.1-SNAPSHOT.jar +# Plugin path for loading Source/Sink Connectors +# The rocketmq-connect project already includes the rocketmq-connect-sample module by default, so no configuration is needed here. +pluginPaths= +``` + +Explanation of storePathRootDir configuration: + +In standalone mode, RocketMQ Connect persists the synchronization checkpoint information +to the local file directory specified by storePathRootDir. The persistent files include: + + +| key | description | +| -------------------- | --------------------------------------------------------- | +| connectorConfig.json | Connector configuration persistence files | +| position.json | Source connect data processing progress persistence files | +| taskConfig.json | Task configuration persistence files | +| offset.json | Sink connect data consumption progress persistence files | +| connectorStatus.json | Connector status persistence files | +| taskStatus.json | Task status persistence files | -# Addition : put the Connector-related implementation plugins in the specified folder. 
-# pluginPaths=/usr/local/connector-plugins/* -``` \ No newline at end of file diff --git a/i18n/en/docusaurus-plugin-content-docs/version-5.0/10-connect/07RocketMQ Connect In Action4.md b/i18n/en/docusaurus-plugin-content-docs/version-5.0/10-connect/07RocketMQ Connect In Action4.md index 3f0dd01471..f7e29ecd3e 100644 --- a/i18n/en/docusaurus-plugin-content-docs/version-5.0/10-connect/07RocketMQ Connect In Action4.md +++ b/i18n/en/docusaurus-plugin-content-docs/version-5.0/10-connect/07RocketMQ Connect In Action4.md @@ -1,6 +1,6 @@ # RocketMQ Connect in Action 4 -SFTP Server(file data) -> RocketMQ Connect +SFTP Server (File Data) -> RocketMQ Connect -> SFTP Server (File) ## Preparation @@ -9,55 +9,70 @@ SFTP Server(file data) -> RocketMQ Connect 1. Linux/Unix/Mac 2. 64bit JDK 1.8+; 3. Maven 3.2.x+; -4. Start [RocketMQ](https://rocketmq.apache.org/docs/quick-start/); +4. Start RocketMQ. Either [RocketMQ 4.x](https://rocketmq.apache.org/docs/4.x/) or + [RocketMQ 5.x](https://rocketmq.apache.org/docs/quickStart/01quickstart/) 5.x version can be used; +5. Test RocketMQ message sending and receiving using the tool. +Here, use the environment variable NAMESRV_ADDR to inform the tool client of the NameServer address of RocketMQ as localhost:9876. +```shell +#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7 +$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4 -**Tips** : ${ROCKETMQ_HOME} locational instructions +$ export NAMESRV_ADDR=localhost:9876 +$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer + SendResult [sendStatus=SEND_OK, msgId= ... ->bin-release.zip version:/rocketmq-all-4.9.4-bin-release -> ->source-release.zip version:/rocketmq-all-4.9.4-source-release/distribution +$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer + ConsumeMessageThread_%d Receive New Messages: [MessageExt... +``` +**Note**: RocketMQ has the feature of automatically creating Topic and Group. 
When sending or subscribing to messages, +if the corresponding Topic or Group does not exist, RocketMQ will automatically create them. Therefore, +there is no need to create Topic and Group in advance. -### Start Connect +### Build Connector Runtime +```shell +git clone https://github.com/apache/rocketmq-connect.git -#### **Compiling Connector Plugin** +cd rocketmq-connect -RocketMQ Connector SFTP +export RMQ_CONNECT_HOME=`pwd` -``` -$ cd rocketmq-connect/connectors/rocketmq-connect-sftp/ -$ mvn clean package -Dmaven.test.skip=true +mvn -Prelease-connect -Dmaven.test.skip=true clean install -U ``` -Move the compiled RocketMQ Connector SFTP package into the Runtime loading directory. The command is as follows: +### Build SFTP Connector Plugin ``` -mkdir -p /usr/local/connector-plugins -cp target/rocketmq-connect-sftp-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins -``` - -#### Start Connect Runtime +cd $RMQ_CONNECT_HOME/connectors/rocketmq-connect-sftp/ +mvn clean package -Dmaven.test.skip=true ``` -cd rocketmq-connect -mvn -Prelease-connect -DskipTests clean install -U +Put the compiled jar of the SFTP RocketMQ Connector into the Plugin directory for runtime loading. +``` +mkdir -p /Users/YourUsername/rocketmqconnect/connector-plugins +cp target/rocketmq-connect-sftp-0.0.1-SNAPSHOT-jar-with-dependencies.jar /Users/YourUsername/rocketmqconnect/connector-plugins ``` -Modify the configuration `connect-standalone.conf`, the main configuration is as follows +### Run Connector Worker in Standalone Mode + +Modify the `connect-standalone.conf` file to configure the RocketMQ connection +address and other information. 
``` -$ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT -$ vim conf/connect-standalone.conf +cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT + +vim conf/connect-standalone.conf ``` +Example configuration information is as follows: ``` workerId=standalone-worker -storePathRootDir=/tmp/storeRoot +storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot ## Http port for user to access REST API httpPort=8082 @@ -67,84 +82,164 @@ namesrvAddr=localhost:9876 # RocketMQ acl aclEnable=false -accessKey=rocketmq -secretKey=12345678 +#accessKey=rocketmq +#secretKey=12345678 -autoCreateGroupEnable=false clusterName="DefaultCluster" -# Core configuration, configure the plugin directory that was previously compiled here. -# Source or sink connector jar file dir,The default value is rocketmq-connect-sample -pluginPaths=/usr/local/connector-plugins +# Plugin path for loading Source/Sink Connectors +pluginPaths=/Users/YourUsername/rocketmqconnect/connector-plugins ``` +In standalone mode, RocketMQ Connect persistently stores the synchronization checkpoint information +in the local file directory specified by storePathRootDir. + +>storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot +If you want to reset the synchronization checkpoint, you need to delete the persisted checkpoint information files. +```shell +rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/* ``` -cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT +To start Connector Worker in standalone mode: +``` sh bin/connect-standalone.sh -c conf/connect-standalone.conf & - ``` ### Set up an SFTP server -Use the built-in SFTP server on MAC OS. +SFTP (SSH File Transfer Protocol) is a file transfer protocol used for secure file transfers between computers. +SFTP is built on top of the SSH (Secure Shell) protocol and utilizes encryption and authentication. 
-[Allow remote computers to access your Mac](https://support.apple.com/zh-cn/guide/mac-help/mchlp1066/mac) +We will use the built-in SFTP service in macOS (by enabling "Remote Login" access). +For detailed instructions, please refer to the +[Allow a remote computer to access your Mac](https://support.apple.com/guide/mac-help/allow-a-remote-computer-to-access-your-mac-mchlp1066/mac) document. + +### Create Source Test File +Create a test file named `source.txt` and write some test data to it: + +```shell +mkdir -p /Users/YourUsername/rocketmqconnect/sftp-test/ -### Test data +cd /Users/YourUsername/rocketmqconnect/sftp-test/ -Log in to the SFTP server and place a file called source.txt with specific contents in the user directory, for example: /path/to/. +touch source.txt + +echo 'John Doe|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00 +Jane Smith|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00 +Bob Johnson|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00' >> source.txt +``` + +Log in to the SFTP service to verify that you can access it normally. Enter the following command, then enter your +password: +```shell +# sftp -P port YourUsername@hostname +sftp -P 22 YourUsername@127.0.0.1 +``` +**Note**: Since this is the SFTP service provided by your local macOS, the address is `127.0.0.1` and the port is the default 22. -```text -zhangsan|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00 -lisi|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00 -zhaowu|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00 +```shell +sftp> cd /Users/YourUsername/rocketmqconnect/sftp-test/ +sftp> ls source.txt +sftp> bye ``` ## Start Connector -### Start SFTP source connector +### Start SFTP Source Connector -Synchronize the SFTP file: source.txt -Purpose: by logging into the SFTP server, parsing the file and encapsulating it into a generic ConnectRecord object, sending it to the RocketMQ Topic.
+Run the following command to start the SFTP source connector. This connector will connect to the +SFTP service to read from the `source.txt` file. For each line of text in the file, the connector +will parse and package the contents into a generic ConnectRecord object, which will then be sent +to a RocketMQ topic for consumption by sink connectors. ```shell curl -X POST --location "http://localhost:8082/connectors/SftpSourceConnector" --http1.1 \ -H "Host: localhost:8082" \ -H "Content-Type: application/json" \ - -d "{ - \"connector.class\": \"org.apache.rocketmq.connect.http.sink.SftpSourceConnector\", - \"host\": \"127.0.0.1\", - \"port\": 22, - \"username\": \"wencheng\", - \"password\": \"1617\", - \"filePath\": \"/Users/wencheng/Documents/source.txt\", - \"connect.topicname\": \"sftpTopic\", - \"fieldSeparator\": \"|\", - \"fieldSchema\": \"username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit\" - }" + -d '{ + "connector.class": "org.apache.rocketmq.connect.http.sink.SftpSourceConnector", + "host": "127.0.0.1", + "port": 22, + "username": "YourUsername", + "password": "yourPassword", + "filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/source.txt", + "connect.topicname": "sftpTopic", + "fieldSeparator": "|", + "fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit" + }' +``` + +If the curl request returns status: 200, it indicates that the connector was successfully +created. An example response would look like this: +```json +{"status":200,"body":{"connector.class":"... +``` + +To confirm that the SFTP source connector has started successfully, run the following command: +```shell +tail -100f ~/logs/rocketmqconnect/connect_runtime.log ``` -After running the above commands, the file data on the SFTP server will be organized into data in the specified format, and written to MQ. Afterwards, it can be consumed by the sink connector or other business systems.
+>Start connector SftpSourceConnector and set target state STARTED successed!! -### Start SFTP sink connector +### Start SFTP Sink Connector -Purpose: by consuming the data in the Topic, use the SFTP protocol to write to the target file. +Run the following command to start the SFTP sink connector. This connector will subscribe to the RocketMQ topic +to consume messages and convert each one into a single line of text, which will then be written to the destination +file `sink.txt` using the SFTP protocol: ```shell curl -X POST --location "http://localhost:8082/connectors/SftpSinkConnector" --http1.1 \ -H "Host: localhost:8082" \ -H "Content-Type: application/json" \ - -d "{ - \"connector.class\": \"org.apache.rocketmq.connect.http.sink.SftpSinkConnector\", - \"host\": \"127.0.0.1\", - \"port\": 22, - \"username\": \"wencheng\", - \"password\": \"1617\", - \"filePath\": \"/Users/wencheng/Documents/sink.txt\", - \"connect.topicnames\": \"sftpTopic\", - \"fieldSeparator\": \"|\", - \"fieldSchema\": \"username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit\" - }" + -d '{ + "connector.class": "org.apache.rocketmq.connect.http.sink.SftpSinkConnector", + "host": "127.0.0.1", + "port": 22, + "username": "YourUsername", + "password": "yourPassword", + "filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/sink.txt", + "connect.topicnames": "sftpTopic", + "fieldSeparator": "|", + "fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit" + }' ``` + +If the curl request returns status: 200, it indicates that the connector was successfully +created. An example response would look like this: +```json +{"status":200,"body":{"connector.class":"... +``` + +Check the logs to confirm successful startup of the SFTP sink connector: +```shell +tail -100f ~/logs/rocketmqconnect/connect_runtime.log +``` + +>Start connector SftpSinkConnector and set target state STARTED successed!! 
+ +Confirm that the data has been written to the destination file by running the following command: +```shell +cat /Users/YourUsername/rocketmqconnect/sftp-test/sink.txt +``` + +If the `sink.txt` file has been generated and its contents match those of the `source.txt` file, the entire process is working correctly. + +Write more test data to the `source.txt` file to continue testing: +```shell +cd /Users/YourUsername/rocketmqconnect/sftp-test/ + +echo 'John Doe|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00 +Jane Smith|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00 +Bob Johnson|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00' >> source.txt + +# Wait a few seconds to give the connector time to replicate data to the sink file. +sleep 10 + +cat /Users/YourUsername/rocketmqconnect/sftp-test/sink.txt +``` + +**Note**: The order of file contents may vary because the `rocketmq-connect-sftp` uses `normal message` when +sending and receiving messages to/from a RocketMQ topic. This is different from `ordered message`, and consuming +`normal messages` does not guarantee the order. \ No newline at end of file diff --git a/i18n/en/docusaurus-plugin-content-docs/version-5.0/10-connect/08RocketMQ Connect In Action5-ES.md b/i18n/en/docusaurus-plugin-content-docs/version-5.0/10-connect/08RocketMQ Connect In Action5-ES.md index 269fc8cd02..d91129dee4 100644 --- a/i18n/en/docusaurus-plugin-content-docs/version-5.0/10-connect/08RocketMQ Connect In Action5-ES.md +++ b/i18n/en/docusaurus-plugin-content-docs/version-5.0/10-connect/08RocketMQ Connect In Action5-ES.md @@ -1,6 +1,6 @@ # RocketMQ Connect in Action 5 -Elsticsearch Source - >RocketMQ Connect -> Elasticsearch Sink +Elasticsearch Source -> RocketMQ Connect -> Elasticsearch Sink ## preparatory work @@ -8,54 +8,77 @@ Elsticsearch Source - >RocketMQ Connect -> Elasticsearch Sink 1. Linux/Unix/Mac 2. 64bit JDK 1.8+; -3. Maven 3.2.x or later; -4. 
Start [RocketMQ](https://rocketmq.apache.org/docs/quick-start/); +3. Maven 3.2.x+; +4. Start RocketMQ. Either [RocketMQ 4.x](https://rocketmq.apache.org/docs/4.x/) or + [RocketMQ 5.x](https://rocketmq.apache.org/docs/quickStart/01quickstart/) can be used; +5. Test RocketMQ message sending and receiving using the tool. +Here, use the environment variable NAMESRV_ADDR to inform the tool client of the NameServer address of RocketMQ as localhost:9876. +```shell +#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7 +$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4 -**tips** : ${ROCKETMQ_HOME} Position Description +$ export NAMESRV_ADDR=localhost:9876 +$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer + SendResult [sendStatus=SEND_OK, msgId= ... ->bin-release.zip version:/rocketmq-all-4.9.4-bin-release -> ->source-release.zip versioon:/rocketmq-all-4.9.4-source-release/distribution +$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer + ConsumeMessageThread_%d Receive New Messages: [MessageExt... +``` +**Note**: RocketMQ has the feature of automatically creating Topic and Group. When sending or subscribing to messages, +if the corresponding Topic or Group does not exist, RocketMQ will automatically create them. Therefore, +there is no need to create Topic and Group in advance. -### Start Connect +### Building the Connector Runtime -#### Connector plugin compilation +Clone the repository and build the RocketMQ Connect project: -Elasticsearch RocketMQ Connector -``` -$ cd rocketmq-connect/connectors/rocketmq-connect-elasticsearch/ -$ mvn clean package -Dmaven.test.skip=true -``` +```shell +git clone https://github.com/apache/rocketmq-connect.git -Move the compiled Elasticsearch RocketMQ Connector package into the Runtime load directory.
The command is as follows: -``` -mkdir -p /usr/local/connector-plugins -cp rocketmq-connect-elasticsearch/target/rocketmq-connect-elasticsearch-1.0.0-jar-with-dependencies.jar /usr/local/connector-plugins +cd rocketmq-connect + +export RMQ_CONNECT_HOME=`pwd` + +mvn -Prelease-connect -Dmaven.test.skip=true clean install -U ``` +### Build Elasticsearch Connector Plugin +Build the Elasticsearch RocketMQ Connector plugin: -#### Start Connect Runtime +```shell +cd $RMQ_CONNECT_HOME/connectors/rocketmq-connect-elasticsearch/ +mvn clean package -Dmaven.test.skip=true ``` -cd rocketmq-connect -mvn -Prelease-connect -DskipTests clean install -U +Copy the compiled Elasticsearch RocketMQ Connector plugin JAR file into the plugin directory used by the runtime: -``` +```shell +mkdir -p /Users/YourUsername/rocketmqconnect/connector-plugins -Update `connect-standalone.conf` ,Key configurations are as follows: +cp target/rocketmq-connect-elasticsearch-1.0.0-jar-with-dependencies.jar /Users/YourUsername/rocketmqconnect/connector-plugins ``` -$ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT -$ vim conf/connect-standalone.conf + +### Run Connector Worker in Standalone Mode + +Modify the `connect-standalone.conf` file to configure the RocketMQ connection +address and other information. 
+ +```shell +cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT + +vim conf/connect-standalone.conf ``` +Example configuration information is as follows: ``` workerId=standalone-worker -storePathRootDir=/tmp/storeRoot +storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot ## Http port for user to access REST API httpPort=8082 @@ -65,53 +88,217 @@ namesrvAddr=localhost:9876 # RocketMQ acl aclEnable=false -accessKey=rocketmq -secretKey=12345678 +#accessKey=rocketmq +#secretKey=12345678 -autoCreateGroupEnable=false clusterName="DefaultCluster" -# Core configuration where the plugin directory where you compiled the elasticsearch package is located -# Source or sink connector jar file dir,The default value is rocketmq-connect-sample -pluginPaths=/usr/local/connector-plugins +# Plugin path for loading Source/Sink Connectors +pluginPaths=/Users/YourUsername/rocketmqconnect/connector-plugins ``` +In standalone mode, RocketMQ Connect persistently stores the synchronization checkpoint information +in the local file directory specified by storePathRootDir. + +>storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot +If you want to reset the synchronization checkpoint, delete the persistence files: +```shell +rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/* ``` -cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT +To start Connector Worker in standalone mode: +``` sh bin/connect-standalone.sh -c conf/connect-standalone.conf & +``` + +### Set Up Elasticsearch Services + +Elasticsearch is an open-source search and analytics engine. 
+ +We'll use two separate Docker instances of Elasticsearch to serve as our source and destination databases: ``` +docker pull docker.elastic.co/elasticsearch/elasticsearch:7.15.1 -### Elasticsearch Image +docker run --name es1 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" \ +-v /Users/YourUsername/rocketmqconnect/es/es1_data:/usr/share/elasticsearch/data \ +-d docker.elastic.co/elasticsearch/elasticsearch:7.15.1 -Use docker to build the Elasticsearch database +docker run --name es2 -p 9201:9200 -p 9301:9300 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" \ +-v /Users/YourUsername/rocketmqconnect/es/es2_data:/usr/share/elasticsearch/data \ +-d docker.elastic.co/elasticsearch/elasticsearch:7.15.1 ``` -# starting a elasticsearch instance -docker run --name my-elasticsearch -p 9200:9200 -p 9300:9300 -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" -d 74c2e0ec249c + +Explanation of Docker commands: + +- `--name es2`: Specifies a name for the container, e.g., `es2`. +- `-p 9201:9200 -p 9301:9300`: Maps ports 9200 and 9300 on the Elasticsearch container to host ports 9201 and 9301 so that the Elasticsearch service can be accessed via the host. +- `-e discovery.type=single-node`: Configures Elasticsearch to work on a single node without discovering other nodes in a cluster, suitable for single-server deployment. +- `-v /Users/YourUsername/rocketmqconnect/es/es2_data:/usr/share/elasticsearch/data`: Mounts a directory on the host to `/usr/share/elasticsearch/data` within the container for persistent storage of Elasticsearch data. + +Each command runs a single-node Elasticsearch instance with persistent data storage in a container. `es1` is accessible through port 9200 on the host machine and `es2` through port 9201, which is useful for development or testing environments on a local machine.
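The two containers can take some time before they accept requests. As a minimal sketch (the `wait_for_http` helper below is hypothetical and not part of RocketMQ Connect or Elasticsearch; it only assumes that `curl` is installed), a small polling function can block until an instance answers on its mapped host port:

```shell
# Hypothetical helper, for illustration only.
# Polls a URL until it answers successfully or the attempts run out.
# $1: URL, $2: maximum attempts (default 30), $3: seconds between attempts (default 2)
wait_for_http() {
  url="$1"
  attempts="${2:-30}"
  delay="${3:-2}"
  i=1
  while [ "$i" -le "$attempts" ]; do
    # -s hides progress output, -f makes curl return non-zero on HTTP errors
    if curl -sf -o /dev/null "$url"; then
      echo "ready: $url"
      return 0
    fi
    sleep "$delay"
    i=$((i + 1))
  done
  echo "timed out: $url" >&2
  return 1
}
```

For example, `wait_for_http http://localhost:9200 && wait_for_http http://localhost:9201` would return once both `es1` and `es2` respond, assuming the port mappings shown above.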
+ +View the Elasticsearch logs: + ``` -### Kibana Image + +docker logs -f es1 + +docker logs -f es2 +``` + +Verify that Elasticsearch has started successfully: -Use docker to build the Kibana environment ``` -docker run --name my-kibana -e ELASTICSEARCH_URL=http://192.168.0.101:9200 -p 5601:5601 -d 5dca66b41943 +# Check Elasticsearch instance 1 +curl -XGET http://localhost:9200 + +# Check Elasticsearch instance 2 +curl -XGET http://localhost:9201 ``` +A successful connection and correct operation will result in JSON responses containing information +about Elasticsearch and its version number. -### test data +### Set Up Kibana Services +Kibana is an open-source data visualization tool that allows users to interactively explore +and understand data stored within Elasticsearch clusters. It offers rich features such as charts, graphs, and dashboards. -Create test data with kibana Dev Tools: reference [console-ibana](https://www.elastic.co/guide/en/kibana/8.5/console-kibana.html#console-kibana); +For convenience, we'll set up two separate instances of Kibana in Docker and link them to +our previously established Elasticsearch containers using the following commands: +``` +docker pull docker.elastic.co/kibana/kibana:7.15.1 -Source Index:connect_es +docker run --name kibana1 --link es1:elasticsearch -p 5601:5601 -d docker.elastic.co/kibana/kibana:7.15.1 -## Start Connector +docker run --name kibana2 --link es2:elasticsearch -p 5602:5601 -d docker.elastic.co/kibana/kibana:7.15.1 +``` -### Start Elasticsearch source connector +Explanation of Docker Commands: +- `--name kibana2`: Assigns a name to the new container, e.g., kibana2 +- `--link es2:elasticsearch`: Links the container to another named Elasticsearch instance (in this case, 'es2'). This enables communication between Kibana and Elasticsearch. +- `-p 5602:5601`: Maps Kibana's default port (5601) in the container to port 5602 on the host machine to make it accessible through the browser.
+- `-d`: Runs the Docker container in detached mode. -Synchronizing source index data: connect_es -effect:Send a RocketMQ Topic by parsing Elasticsearch document data and wrapping it into a generic ConnectRecord object +Once the containers have launched, you can monitor their log output: + +``` +docker logs -f kibana1 + +docker logs -f kibana2 +``` + + +To access the Kibana console pages, visit the following addresses in your browser: +- kibana1: http://localhost:5601 +- kibana2: http://localhost:5602 + +If they load correctly, it indicates successful startup of the respective Kibana instances. + +### Write Test Data to the Source Elasticsearch +Kibana's Dev Tools let you interact with Elasticsearch directly from Kibana. +You can execute various queries and operations, and analyze the returned data. +Refer to the documentation [console-kibana](https://www.elastic.co/guide/en/kibana/8.9/console-kibana.html). + +#### Bulk Write Test Data +Access the Kibana1 console through the browser, find Dev Tools from the left menu, +and enter the following commands on the page to write test data: + +``` +POST /_bulk +{ "index" : { "_index" : "connect_es" } } +{ "id": "1", "field1": "value1", "field2": "value2" } +{ "index" : { "_index" : "connect_es" } } +{ "id": "2", "field1": "value3", "field2": "value4" } +``` + +**Note**: +- connect_es: The index name for the data. +- id/field1/field2: These are field names, and 1, value1, value2 represent the values for the fields. + +**Note**: There is a limitation in `rocketmq-connect-elasticsearch`, which requires a field in the data that +can be used for >= comparison operations (string or number). This field will be used to record the +synchronization checkpoint. In the above example, the `id` field is a globally unique, incrementing numerical field.
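To illustrate what a `>=` comparison on a checkpoint field means, here is a minimal sketch. It is not the connector's implementation: `records_from_checkpoint` is a hypothetical helper, and the tab-separated `id<TAB>document` input format is an assumption made only for this example. It keeps the records whose `id` is greater than or equal to a given checkpoint value:

```shell
# Hypothetical helper, for illustration only.
# Reads "id<TAB>document" lines from stdin and prints those whose
# numeric id is >= the checkpoint passed as $1.
records_from_checkpoint() {
  awk -F '\t' -v checkpoint="$1" '($1 + 0) >= (checkpoint + 0)'
}

# Three sample records; with a checkpoint of 2, the records with id 2 and 3 are selected.
printf '1\t{"field1":"value1"}\n2\t{"field1":"value3"}\n3\t{"field1":"value5"}\n' |
  records_from_checkpoint 2
```

Because the comparison in this sketch is inclusive, the record at the checkpoint itself is selected again, so a field with unique, increasing values keeps the selection unambiguous.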
+ +#### Query Data +To query data within an index, use the following command: +``` +GET /connect_es/_search +{ + "size": 100 +} +``` + +If there is no data available, the response will be: +``` +{ + "error" : { + ... + "type" : "index_not_found_exception", + "reason" : "no such index [connect_es]", + "resource.type" : "index_or_alias", + "resource.id" : "connect_es", + "index_uuid" : "_na_", + "index" : "connect_es" + }, + "status" : 404 +} +``` + +If there is data available, the response will be: +``` +{ + ... + "hits" : { + "total" : { + "value" : 2, + "relation" : "eq" + }, + "max_score" : 1.0, + "hits" : [ + { + "_index" : "connect_es", + "_type" : "_doc", + "_id" : "_dx49osBb46Z9cN4hYCg", + "_score" : 1.0, + "_source" : { + "id" : "1", + "field1" : "value1", + "field2" : "value2" + } + }, + { + "_index" : "connect_es", + "_type" : "_doc", + "_id" : "_tx49osBb46Z9cN4hYCg", + "_score" : 1.0, + "_source" : { + "id" : "2", + "field1" : "value3", + "field2" : "value4" + } + } + ] + } +} + +``` + +#### Delete Data + +If you need to delete data within an index due to repeated testing or other reasons, you can use the following command: + +``` +DELETE /connect_es +``` + +## Start Connector + +### Start Elasticsearch Source Connector +Run the following command to start the ES source connector. The connector will connect to Elasticsearch +and read document data from the connect_es index. It will parse the Elasticsearch document data and +package it into a generic ConnectRecord object, which will be sent to a RocketMQ topic for consumption by the Sink Connector. 
``` curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/elasticsearchSourceConnector -d '{ @@ -131,26 +318,60 @@ curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connector }' ``` -### Start Elasticsearch sink connector +**Note**: The startup command specifies that the source ES should synchronize the connect_es index, +and the incrementing field in the index is id. Data will be fetched starting from id=1. + +If the curl request returns status:200, it indicates a successful creation, and the sample response will be: +>{"status":200,"body":{"connector.class":"... + +If you see the following log entry, the Elasticsearch source connector has started successfully: +```shell +tail -100f ~/logs/rocketmqconnect/connect_runtime.log +``` + +>Start connector elasticsearchSourceConnector and set target state STARTED successed!! -effect:Data is written to the target index by consuming the Topic +### Start Elasticsearch Sink Connector +Run the following command to start the ES sink connector. The connector will subscribe to data from +the RocketMQ topic and consume it. It will convert each message into document data and write it to the destination ES.
``` -curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/ElasticsearchSinkConnector -d '{ +curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/elasticsearchSinkConnector -d '{ "connector.class":"org.apache.rocketmq.connect.elasticsearch.connector.ElasticsearchSinkConnector", "elasticsearchHost":"localhost", - "elasticsearchPort":9202, + "elasticsearchPort":9201, "max.tasks":2, "connect.topicnames":"ConnectEsTopic", "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" }' +``` + +**Note**: The startup command specifies the address and port of the destination ES, which corresponds to +the previously started ES2 in Docker. + +If the curl request returns status:200, it indicates a successful creation, and the sample response will be: +>{"status":200,"body":{"connector.class":"... + +If you see the following log entry, the Elasticsearch sink connector has started successfully: +```shell +tail -100f ~/logs/rocketmqconnect/connect_runtime.log ``` -note:Local testing requires you to start the Elasticsearch process on two different ports +>Start connector elasticsearchSinkConnector and set target state STARTED successed!! + +To check if the sink connector has written data to the destination ES index: + +1. Access the Kibana2 console address in the browser: http://localhost:5602 +2. In the Kibana2 Dev Tools page, query the data within the index. If it matches the data in the source ES1, it means the connector is running properly. + +``` +GET /connect_es/_search +{ + "size": 100 +} +``` -After the two Connector tasks are successfully created Whether the Elasticsearch specified by accessing sink contains data -New data added to the source index can be synchronized to the target index