Is MySQL 5.6.x not supported? I can connect locally, but not from the program. #32

Open
liangrui1988 opened this issue Dec 8, 2020 · 2 comments



liangrui1988 commented Dec 8, 2020

```
show master status;
mysql-bin-changelog.139198    5292
```

`telnet 10.40.0.109 3306` also gets through from my local machine.
What could be the cause?

```
20/12/08 15:51:13 WARN BinaryLogClient: Failed to establish connection in 1665ms. Forcing disconnect.
Exception in thread "connect mysql(10.40.0.109, 3306) " java.io.EOFException
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:190)
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:46)
at com.github.shyiko.mysql.binlog.network.protocol.PacketChannel.read(PacketChannel.java:59)
at com.github.shyiko.mysql.binlog.BinaryLogClient.confirmSupportOfChecksum(BinaryLogClient.java:882)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:533)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$_connectMySQL(BinLogSocketServerInExecutor.scala:261)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor$$anon$1.run(BinLogSocketServerInExecutor.scala:289)
```


liangrui1988 commented Dec 8, 2020

```scala
val db = "delta_db"
val table = "dlta_tab"

// host, prot (port), user and passwd are defined elsewhere in the job
val df = spark.readStream.
  format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource").
  option("host", host).
  option("port", prot).
  option("userName", user).
  option("password", passwd).
  option("bingLogNamePrefix", "mysql-bin-changelog").
  option("binlogIndex", "139198").
  option("binlogFileOffset", "5292").
  option("databaseNamePattern", db).
  option("tableNamePattern", table).
  load()
```
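
For reference, a minimal sketch of how the stream would then be started once `load()` succeeds; the sink format, checkpoint location and output path below are placeholder assumptions, not taken from the actual job:

```scala
// Hypothetical sink: write the binlog change stream to a Delta path.
// checkpointLocation and the output path are placeholder values.
val query = df.writeStream.
  format("delta").
  option("checkpointLocation", "/tmp/datahouse/delta/_ckpt_dlta_tab").
  outputMode("append").
  start("/tmp/datahouse/delta/stream_dlta_tab")

query.awaitTermination()
```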

Batch queries work fine:

```scala
val mysqlConf = Map(
  "url" -> url,
  "driver" -> "com.mysql.jdbc.Driver",
  "user" -> user,
  "password" -> passwd,
  "dbtable" -> "(SELECT * FROM dlta_tab limit 10) as dlta_tab_tmp"
)

import org.apache.spark.sql.functions.col

var df = spark.read.format("jdbc").options(mysqlConf).load()
df = df.repartitionByRange(2, col("id"))
df.show()
df.write.
  format("delta").
  mode("overwrite").
  save("/tmp/datahouse/delta/batch_dlta_tab")
spark.close()
```

liangrui1988 commented

[screenshot]
Because we connect to MySQL through a proxy, the connection is quite slow. With a raw connection it works once I set a longer connection timeout. Can that parameter be set here (as a data source option), or does it require changing the source code? We have run into quite a few problems, and it has been hard to use.
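
A minimal sketch of what setting the connection timeout on the raw client might look like; the 15000 ms value and the event listener body are placeholders, not the exact code from the screenshot:

```scala
import com.github.shyiko.mysql.binlog.BinaryLogClient

// Raw-client test at the same binlog position as above.
// user and passwd are the same credentials used in the streaming job.
val client = new BinaryLogClient("10.40.0.109", 3306, user, passwd)
client.setBinlogFilename("mysql-bin-changelog.139198")
client.setBinlogPosition(5292L)
// Give the handshake more time when going through the slow proxy
// (the value is a placeholder).
client.setConnectTimeout(15000L)
client.registerEventListener(event => println(event))
client.connect() // blocks; throws if the handshake still fails
```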
