diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 59c36d8ab14..35eb1ddfbbc 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,32 +1,6 @@ - - + - - - - + \ No newline at end of file diff --git a/plugin-mapping.properties b/plugin-mapping.properties index d72508717b9..c781f7edc18 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -105,4 +105,6 @@ seatunnel.source.Persistiq = connector-http-persistiq seatunnel.sink.SelectDBCloud = connector-selectdb-cloud seatunnel.sink.Hbase = connector-hbase seatunnel.source.StarRocks = connector-starrocks +seatunnel.source.AuthSource = connector-http-auth + diff --git a/seatunnel-connectors-v2/connector-http/connector-http-plus/pom.xml b/seatunnel-connectors-v2/connector-http/connector-http-plus/pom.xml new file mode 100644 index 00000000000..d56f95cc3fe --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-plus/pom.xml @@ -0,0 +1,32 @@ + + + 4.0.0 + + org.apache.seatunnel + seatunnel-connectors-v2 + 2.3.2-SNAPSHOT + ../../pom.xml + + + connector-http-plus + SeaTunnel : Connectors V2 : Http : Plus + + + 8 + 8 + UTF-8 + + + + org.apache.seatunnel + connector-http-base + ${project.version} + + + org.apache.commons + commons-lang3 + + + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/AbstractRequestHandler.java b/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/AbstractRequestHandler.java new file mode 100644 index 00000000000..67148945f94 --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/AbstractRequestHandler.java @@ -0,0 +1,17 @@ +package org.apache.seatunnel.plus.source; + +import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +public abstract class AbstractRequestHandler { + + public void before(Config pluginConfig){ + + } + + public void after(Config pluginConfig){ + + } + + abstract public void request(Config pluginConfig, HttpParameter httpParameter); +} diff --git a/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/HttpPlusSource.java b/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/HttpPlusSource.java new file mode 100644 index 00000000000..fe8ab8f86ed --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/HttpPlusSource.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.plus.source; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.constants.JobMode; + + +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig; +import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException; +import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSource; +import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader; +import org.apache.seatunnel.connectors.seatunnel.http.source.SimpleTextDeserializationSchema; +import org.apache.seatunnel.format.json.JsonDeserializationSchema; +import org.apache.seatunnel.plus.source.config.HttpPlusSourceParameter; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import java.util.Locale; + +import com.google.auto.service.AutoService; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@AutoService(SeaTunnelSource.class) +public class HttpPlusSource extends HttpSource { + + private Config pluginConfig; + private HttpPlusSourceParameter plusSourceParameter=new HttpPlusSourceParameter(); + + @Override + public String getPluginName() { + return "Plus"; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + //如果配置包名那么校验内容 + buildSchemaWithConfig(pluginConfig); + this.pluginConfig=pluginConfig; + } + + @Override + public AbstractSingleSplitReader createReader(SingleSplitReaderContext readerContext) throws Exception { + return new HttpPlusSourceReader(plusSourceParameter, + readerContext, + this.deserializationSchema, + jsonField, + contentField,pluginConfig); + } + + @Override + public Boundedness getBoundedness() { + if (JobMode.BATCH.equals(jobContext.getJobMode())) { + return Boundedness.BOUNDED; + } + throw new UnsupportedOperationException( + "http-plus source connector not support unbounded operation"); + } + + +} diff --git a/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/HttpPlusSourceFactory.java b/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/HttpPlusSourceFactory.java new file mode 100644 index 00000000000..6d385e99cd8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/HttpPlusSourceFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *//* + + +package org.apache.seatunnel.auth.source; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class AuthSourceFactory extends HttpSourceFactory { + @Override + public String factoryIdentifier() { + return "Gitlab"; + } + + @Override + public OptionRule optionRule() { + return getHttpBuilder().required(GitlabSourceConfig.ACCESS_TOKEN).build(); + } +} +*/ diff --git a/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/HttpPlusSourceReader.java b/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/HttpPlusSourceReader.java new file mode 100644 index 00000000000..9aeb4db2863 --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/HttpPlusSourceReader.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.plus.source; + +import org.apache.seatunnel.api.serialization.DeserializationSchema; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider; +import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse; +import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter; +import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField; +import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException; +import org.apache.seatunnel.connectors.seatunnel.http.source.DeserializationCollector; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import com.google.common.base.Strings; +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.Option; +import com.jayway.jsonpath.ReadContext; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class HttpPlusSourceReader extends AbstractSingleSplitReader { + protected final SingleSplitReaderContext context; + protected final HttpParameter httpParameter; + protected HttpClientProvider httpClient; + private final DeserializationCollector deserializationCollector; + private static final Option[] DEFAULT_OPTIONS = { + Option.SUPPRESS_EXCEPTIONS, Option.ALWAYS_RETURN_LIST, Option.DEFAULT_PATH_LEAF_TO_NULL + }; + private JsonPath[] jsonPaths; + private final JsonField jsonField; + private final String contentJson; + private Config pluginConfig; + private final Configuration jsonConfiguration = + Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS); + + public HttpPlusSourceReader( + HttpParameter httpParameter, + SingleSplitReaderContext context, + DeserializationSchema deserializationSchema, + JsonField jsonField, + String contentJson, + Config pluginConfig) { + this.context = context; + this.httpParameter = httpParameter; + this.deserializationCollector = new DeserializationCollector(deserializationSchema); + this.jsonField = jsonField; + this.contentJson = contentJson; + this.pluginConfig=pluginConfig; + } + + @Override + public void open() { + httpClient = new HttpClientProvider(httpParameter); + } + + @Override + public void close() throws IOException { + if (Objects.nonNull(httpClient)) { + httpClient.close(); + } + } + + @Override + public void pollNext(Collector output) throws Exception { + try { + new QimenRequestHandler().request(pluginConfig,httpParameter); + HttpResponse response = + httpClient.execute( + this.httpParameter.getUrl(), + this.httpParameter.getMethod().getMethod(), + this.httpParameter.getHeaders(), + this.httpParameter.getParams(), + this.httpParameter.getBody()); + if (HttpResponse.STATUS_OK == response.getCode()) { + String content = response.getContent(); + if (!Strings.isNullOrEmpty(content)) { + if (contentJson != null) { + content = JsonUtils.stringToJsonNode(getPartOfJson(content)).toString(); + } + if (jsonField != null) { + this.initJsonPath(jsonField); + content = + JsonUtils.toJsonNode(parseToMap(decodeJSON(content), jsonField)) + .toString(); + } + deserializationCollector.collect(content.getBytes(), output); + } + return; + } + log.error( + "http client execute exception, http response status code:[{}], content:[{}]", + response.getCode(), + response.getContent()); + } catch (Exception e) { + log.error(e.getMessage(), e); + } finally { + if (Boundedness.BOUNDED.equals(context.getBoundedness())) { + // signal to the source that we have reached the end of the data. + log.info("Closed the bounded http source"); + context.signalNoMoreElement(); + } else { + if (httpParameter.getPollIntervalMillis() > 0) { + Thread.sleep(httpParameter.getPollIntervalMillis()); + } + } + } + } + + private List> parseToMap(List> datas, JsonField jsonField) { + List> decodeDatas = new ArrayList<>(datas.size()); + String[] keys = jsonField.getFields().keySet().toArray(new String[] {}); + + for (List data : datas) { + Map decodeData = new HashMap<>(jsonField.getFields().size()); + final int[] index = {0}; + data.forEach( + field -> { + decodeData.put(keys[index[0]], field); + index[0]++; + }); + decodeDatas.add(decodeData); + } + + return decodeDatas; + } + + private List> decodeJSON(String data) { + ReadContext jsonReadContext = JsonPath.using(jsonConfiguration).parse(data); + List> results = new ArrayList<>(jsonPaths.length); + for (JsonPath path : jsonPaths) { + List result = jsonReadContext.read(path); + results.add(result); + } + for (int i = 1; i < results.size(); i++) { + List result0 = results.get(0); + List result = results.get(i); + if (result0.size() != result.size()) { + throw new HttpConnectorException( + HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT, + String.format( + "[%s](%d) and [%s](%d) the number of parsing records is inconsistent.", + jsonPaths[0].getPath(), + result0.size(), + jsonPaths[i].getPath(), + result.size())); + } + } + + return dataFlip(results); + } + + private String getPartOfJson(String data) { + ReadContext jsonReadContext = JsonPath.using(jsonConfiguration).parse(data); + return JsonUtils.toJsonString(jsonReadContext.read(JsonPath.compile(contentJson))); + } + + private List> dataFlip(List> results) { + + List> datas = new ArrayList<>(); + for (int i = 0; i < results.size(); i++) { + List result = results.get(i); + if (i == 0) { + for (Object o : result) { + String val = o == null ? null : o.toString(); + List row = new ArrayList<>(jsonPaths.length); + row.add(val); + datas.add(row); + } + } else { + for (int j = 0; j < result.size(); j++) { + Object o = result.get(j); + String val = o == null ? null : o.toString(); + List row = datas.get(j); + row.add(val); + } + } + } + return datas; + } + + private void initJsonPath(JsonField jsonField) { + jsonPaths = new JsonPath[jsonField.getFields().size()]; + for (int index = 0; index < jsonField.getFields().keySet().size(); index++) { + jsonPaths[index] = + JsonPath.compile( + jsonField.getFields().values().toArray(new String[] {})[index]); + } + } +} diff --git a/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/QimenRequestHandler.java b/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/QimenRequestHandler.java new file mode 100644 index 00000000000..a6413944883 --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/QimenRequestHandler.java @@ -0,0 +1,52 @@ +package org.apache.seatunnel.plus.source; + +import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; + +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.security.MessageDigest; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Date; +import java.util.Map; + +import static org.apache.seatunnel.plus.source.ApiTest.CHARSET_UTF8; +import static org.apache.seatunnel.plus.source.ApiTest.buildQuery; +import static org.apache.seatunnel.plus.source.ApiTest.signTopRequest; + +public class QimenRequestHandler extends AbstractRequestHandler{ + private static final String SIGN_METHOD_HMAC="md5"; + @Override + public void request(Config pluginConfig, HttpParameter httpParameter) { + try { + httpParameter.getParams().put("method", "jushuitan.order.list.query"); + httpParameter.getParams().put("app_key", "34289603"); + //params.put("session", sessionKey); + DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + httpParameter.getParams().put("timestamp", df.format(new Date())); + httpParameter.getParams().put("format", "json"); + httpParameter.getParams().put("v", "2.0"); + httpParameter.getParams().put("sign_method", "hmac"); + httpParameter.getParams().put("customer_id","11276573"); + httpParameter.getParams().put("target_app_key","23060081"); + httpParameter.getParams().put("start_time","2023-10-01"); + httpParameter.getParams().put("end_time","2023-10-07"); + httpParameter.getParams().put("page_size","100"); + // 业务参数 + //params.put("fields", "num_iid,title,nick,price,num"); + //params.put("num_iid", "123456789"); + // 签名参数 + httpParameter.getParams().put("sign", signTopRequest(httpParameter.getParams(), "015e246d33f2479453d0cb9f588df501", SIGN_METHOD_HMAC)); + } catch (IOException e) { + throw new RuntimeException(e); + } + + } + + +} diff --git a/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/config/HttpPlusSourceConfig.java b/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/config/HttpPlusSourceConfig.java new file mode 100644 index 00000000000..4c59bf0e45c --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/config/HttpPlusSourceConfig.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.plus.source.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig; + +public class HttpPlusSourceConfig extends HttpConfig { + + + public static final Option ACCESS_TOKEN = + Options.key("access_token") + .stringType() + .noDefaultValue() + .withDescription("Github access_token"); +} diff --git a/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/config/HttpPlusSourceParameter.java b/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/config/HttpPlusSourceParameter.java new file mode 100644 index 00000000000..9dff0266e01 --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-plus/src/main/java/org/apache/seatunnel/plus/source/config/HttpPlusSourceParameter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.plus.source.config; + +import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import java.util.HashMap; +import java.util.Optional; +import java.util.function.Predicate; +import java.util.stream.Stream; + +public class HttpPlusSourceParameter extends HttpParameter { + +// @Override +// public void buildWithConfig(Config pluginConfig) { +// super.buildWithConfig(pluginConfig); +// headers = Optional.ofNullable(getHeaders()).orElse(new HashMap<>()); +// setHeaders(headers); +// } + + +} diff --git a/seatunnel-connectors-v2/connector-http/pom.xml b/seatunnel-connectors-v2/connector-http/pom.xml index 170d37f5c28..79c2c15a4f4 100644 --- a/seatunnel-connectors-v2/connector-http/pom.xml +++ b/seatunnel-connectors-v2/connector-http/pom.xml @@ -41,5 +41,6 @@ connector-http-github connector-http-notion connector-http-persistiq + connector-http-plus diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml b/seatunnel-examples/seatunnel-engine-examples/pom.xml index 08054d9a077..9135f38330f 100644 --- a/seatunnel-examples/seatunnel-engine-examples/pom.xml +++ b/seatunnel-examples/seatunnel-engine-examples/pom.xml @@ -61,5 +61,10 @@ connector-console ${project.version} + + org.apache.seatunnel + connector-http-plus + 2.3.2-SNAPSHOT + diff --git a/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineExample.java b/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineExample.java index 2a7c25e0830..124b819c3b6 100644 --- a/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineExample.java +++ b/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineExample.java @@ -31,7 +31,7 @@ public class SeaTunnelEngineExample { public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException { - String configurePath = args.length > 0 ? args[0] : "/examples/fake_to_console.conf"; + String configurePath = args.length > 0 ? args[0] : "/examples/http_plus.conf"; String configFile = getTestConfigFile(configurePath); ClientCommandArgs clientCommandArgs = new ClientCommandArgs(); clientCommandArgs.setConfigFile(configFile); diff --git a/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf index 5e50c5d87ed..7d241a22f18 100644 --- a/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf +++ b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf @@ -20,7 +20,7 @@ env { # You can set engine configuration here - execution.parallelism = 1 + execution.parallelism = 2 job.mode = "BATCH" checkpoint.interval = 5000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" @@ -30,7 +30,7 @@ source { # This is a example source plugin **only for test and demonstrate the feature source plugin** FakeSource { result_table_name = "fake" - parallelism = 1 + #parallelism = 1 schema = { fields { name = "string" @@ -41,7 +41,7 @@ source { FakeSource { result_table_name = "fake" - parallelism = 1 + parallelism = 2 schema = { fields { name = "string" diff --git a/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/http_plus.conf b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/http_plus.conf new file mode 100644 index 00000000000..9e4994ca1e3 --- /dev/null +++ b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/http_plus.conf @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "BATCH" + checkpoint.interval = 5000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Plus { + result_table_name = "http" + url = "http://a1q40taq0j.api.taobao.com/router/qm" + method = "GET" + format = "json" + json_field = { + name = "$.data[*].name" + age = "$.data[*].age" + } + pageing={ + total_page_size=2 + page_field=page_index + #when don't know the total_page_size use batch_size if read size