From 874f5e4673c3d38fcdd9a5bea5fdd6d878445460 Mon Sep 17 00:00:00 2001
From: AirToSupply
Date: Tue, 23 Nov 2021 16:39:28 +0800
Subject: [PATCH] support spark 3.1.x

---
 .gitignore                                     |  2 +
 README.md                                      |  9 +-
 binlog-common/pom.xml                          |  4 +-
 .../org/apache/spark/streaming/RawEvent.java   |  2 +-
 .../binlog/common/CommonSourceOffset.scala     |  5 +-
 hbase-wal/pom.xml                              |  4 +-
 .../hbase/MLSQLHBaseWALDataSource.scala        |  2 +-
 .../sources/hbase/wal/HBaseWALClient.scala     |  5 +-
 .../wal/HBaseWALSocketServerInExecutor.scala   | 18 +++-
 ...pache.spark.sql.sources.DataSourceRegister  |  1 +
 mysql-binlog/pom.xml                           |  9 +-
 .../mlsql/sources/MLSQLBinLogDataSource.scala  | 83 +++++++++++--------
 .../binlog/BinLogSocketServerInExecutor.scala  |  2 +-
 .../sources/mysql/binlog/MySQLCDCUtils.java    | 16 +++-
 .../sources/mysql/binlog/RawBinlogEvent.java   |  2 +-
 ...pache.spark.sql.sources.DataSourceRegister  |  1 +
 .../mlsql/test/binlogserver/BinlogSuite.scala  | 11 ++-
 pom.xml                                        | 25 ++++--
 18 files changed, 128 insertions(+), 73 deletions(-)
 create mode 100644 hbase-wal/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 create mode 100644 mysql-binlog/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

diff --git a/.gitignore b/.gitignore
index 22e7547..5184dc6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,3 +2,5 @@ target/
 .idea/
 spark-binlog.iml
 release.sh
+*.iml
+*/*.iml
diff --git a/README.md b/README.md
index 6559846..2ed26b4 100644
--- a/README.md
+++ b/README.md
@@ -23,16 +23,16 @@ MySQL Binlog:
 
 ```
 groupId: tech.mlsql
-artifactId: mysql-binlog_2.11
-version: 1.0.4
+artifactId: mysql-binlog_2.12
+version: 1.0.5
 ```
 
 HBase WAL:
 
 ```
 groupId: tech.mlsql
-artifactId: hbase-wal_2.11
-version: 1.0.4
+artifactId: hbase-wal_2.12
+version: 1.0.5
 ```
 
 ## Limitation
@@ -355,6 +355,7 @@ object Main{
   }
 }
 ```
 
+Note: if the example above fails to run, check whether the MySQL JDBC driver dependency (mysql-connector-java) has been added to your project.
 
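For reference: the README note added above mentions the MySQL JDBC driver. In the coordinate style the README already uses, that dependency looks like the block below; the version is an example only, use whichever release matches your MySQL server:

```
groupId: mysql
artifactId: mysql-connector-java
version: 8.0.x (example only)
```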
diff --git a/binlog-common/pom.xml b/binlog-common/pom.xml
index 5f88220..3477d2c 100644
--- a/binlog-common/pom.xml
+++ b/binlog-common/pom.xml
@@ -3,9 +3,9 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>spark-binlog_2.11</artifactId>
+        <artifactId>spark-binlog_2.12</artifactId>
         <groupId>tech.mlsql</groupId>
-        <version>1.0.4</version>
+        <version>1.0.5</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
diff --git a/binlog-common/src/main/java/org/apache/spark/streaming/RawEvent.java b/binlog-common/src/main/java/org/apache/spark/streaming/RawEvent.java
index fe18d03..f3b6ce1 100644
--- a/binlog-common/src/main/java/org/apache/spark/streaming/RawEvent.java
+++ b/binlog-common/src/main/java/org/apache/spark/streaming/RawEvent.java
@@ -1,6 +1,6 @@
 package org.apache.spark.streaming;
 
-import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.apache.spark.sql.execution.streaming.Offset;
 
 import java.io.Serializable;
 
diff --git a/binlog-common/src/main/java/tech/mlsql/binlog/common/CommonSourceOffset.scala b/binlog-common/src/main/java/tech/mlsql/binlog/common/CommonSourceOffset.scala
index 41edd7a..1ab3d29 100644
--- a/binlog-common/src/main/java/tech/mlsql/binlog/common/CommonSourceOffset.scala
+++ b/binlog-common/src/main/java/tech/mlsql/binlog/common/CommonSourceOffset.scala
@@ -1,7 +1,8 @@
 package tech.mlsql.binlog.common
 
-import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
-import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
+import org.apache.spark.sql.connector.read.streaming.Offset
+import org.apache.spark.sql.execution.streaming.SerializedOffset
+import org.apache.spark.sql.execution.streaming.{Offset => OffsetV2}
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
diff --git a/hbase-wal/pom.xml b/hbase-wal/pom.xml
index 327df9a..37f5f43 100644
--- a/hbase-wal/pom.xml
+++ b/hbase-wal/pom.xml
@@ -3,9 +3,9 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>spark-binlog_2.11</artifactId>
+        <artifactId>spark-binlog_2.12</artifactId>
         <groupId>tech.mlsql</groupId>
-        <version>1.0.4</version>
+        <version>1.0.5</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
diff --git a/hbase-wal/src/main/java/org/apache/spark/sql/mlsql/sources/hbase/MLSQLHBaseWALDataSource.scala b/hbase-wal/src/main/java/org/apache/spark/sql/mlsql/sources/hbase/MLSQLHBaseWALDataSource.scala
index 8f78739..8868b0f 100644
--- a/hbase-wal/src/main/java/org/apache/spark/sql/mlsql/sources/hbase/MLSQLHBaseWALDataSource.scala
+++ b/hbase-wal/src/main/java/org/apache/spark/sql/mlsql/sources/hbase/MLSQLHBaseWALDataSource.scala
@@ -133,7 +133,7 @@ case class MLSQLHBaseWAlSource(hostAndPort: ReportHostAndPort, spark: SparkSessi
     if (content(0) == 'v') {
       val indexOfNewLine = content.indexOf("\n")
       if (indexOfNewLine > 0) {
-        val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
+        val version = validateVersion(content.substring(0, indexOfNewLine), VERSION)
         CommonSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
       } else {
         throw new IllegalStateException(
diff --git a/hbase-wal/src/main/java/org/apache/spark/sql/mlsql/sources/hbase/wal/HBaseWALClient.scala b/hbase-wal/src/main/java/org/apache/spark/sql/mlsql/sources/hbase/wal/HBaseWALClient.scala
index 9325d19..757bb24 100644
--- a/hbase-wal/src/main/java/org/apache/spark/sql/mlsql/sources/hbase/wal/HBaseWALClient.scala
+++ b/hbase-wal/src/main/java/org/apache/spark/sql/mlsql/sources/hbase/wal/HBaseWALClient.scala
@@ -4,17 +4,16 @@ import java.io.EOFException
 import java.nio.charset.Charset
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hbase.client.{Delete, Put}
 import org.apache.hadoop.hbase.wal.{WAL, WALEdit, WALFactory}
 import org.apache.hadoop.hbase.{Cell, CellUtil}
 import org.apache.spark.sql.execution.streaming.LongOffset
-import org.apache.spark.sql.sources.v2.reader.streaming.Offset
+import org.apache.spark.sql.execution.streaming.Offset
 import org.apache.spark.streaming.RawEvent
-import org.spark_project.guava.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import tech.mlsql.binlog.common.HDFSContext
+import tech.mlsql.common.utils.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import tech.mlsql.common.utils.log.Logging
 
 import scala.collection.JavaConverters._
diff --git a/hbase-wal/src/main/java/org/apache/spark/sql/mlsql/sources/hbase/wal/HBaseWALSocketServerInExecutor.scala b/hbase-wal/src/main/java/org/apache/spark/sql/mlsql/sources/hbase/wal/HBaseWALSocketServerInExecutor.scala
index ffe3102..8a4d513 100644
--- a/hbase-wal/src/main/java/org/apache/spark/sql/mlsql/sources/hbase/wal/HBaseWALSocketServerInExecutor.scala
+++ b/hbase-wal/src/main/java/org/apache/spark/sql/mlsql/sources/hbase/wal/HBaseWALSocketServerInExecutor.scala
@@ -4,10 +4,9 @@ import java.io.{DataInputStream, DataOutputStream}
 import java.util
 import java.util.concurrent.atomic.AtomicReference
 import java.util.regex.Pattern
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkEnv
-import org.apache.spark.sql.execution.streaming.{LongOffset, Offset}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, SerializedOffset}
 import org.apache.spark.sql.mlsql.sources.hbase.wal.io.{DeleteWriter, PutWriter}
 import org.apache.spark.streaming.RawEvent
 import tech.mlsql.binlog.common.OriginalSourceServerInExecutor
@@ -83,6 +82,17 @@ class HBaseWALSocketServerInExecutor[T](taskContextRef: AtomicReference[T], chec
     jsonList
   }
 
+  /**
+   * Convert a generic Offset to a LongOffset when possible.
+   * Note: since Spark 3.1 the LongOffset object no longer provides a convert method, so this helper is added for code consistency.
+   * @return the converted LongOffset
+   */
+  def convert(offset: Offset): Option[LongOffset] = offset match {
+    case lo: LongOffset => Some(lo)
+    case so: SerializedOffset => Some(LongOffset(so))
+    case _ => None
+  }
+
   override def process(dIn: DataInputStream, dOut: DataOutputStream): Unit = {
     client.readRequest(dIn) match {
       case _: NooopsRequest =>
@@ -93,7 +103,7 @@ class HBaseWALSocketServerInExecutor[T](taskContextRef: AtomicReference[T], chec
           flushAheadLog
         }
         val offsets = committedOffsets.asScala.
-          map(f => (f._1, (LongOffset.convert(f._2).get.offset+1).toString))
+          map(f => (f._1, (convert(f._2).get.offset+1).toString))
         client.sendResponse(dOut, OffsetResponse(offsets.toMap))
       case RequestData(name, startOffset, endOffset) =>
         try {
@@ -180,7 +190,7 @@ class HBaseWALSocketServerInExecutor[T](taskContextRef: AtomicReference[T], chec
     require(a != null || b != null, "two offsets should not be null at the same time ")
     if (a == null) true
     else if (b == null) false
-    else LongOffset.convert(a).get.offset < LongOffset.convert(b).get.offset
+    else convert(a).get.offset < convert(b).get.offset
   }
 }
 
diff --git a/hbase-wal/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/hbase-wal/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..657d629
--- /dev/null
+++ b/hbase-wal/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
+org.apache.spark.sql.mlsql.sources.hbase.MLSQLHBaseWALDataSource
\ No newline at end of file
diff --git a/mysql-binlog/pom.xml b/mysql-binlog/pom.xml
index 96dad67..ba40641 100644
--- a/mysql-binlog/pom.xml
+++ b/mysql-binlog/pom.xml
@@ -3,9 +3,9 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>spark-binlog_2.11</artifactId>
+        <artifactId>spark-binlog_2.12</artifactId>
         <groupId>tech.mlsql</groupId>
-        <version>1.0.4</version>
+        <version>1.0.5</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
@@ -27,6 +27,11 @@
             <artifactId>mysql-binlog-connector-java</artifactId>
             <version>${binlog.version}</version>
         </dependency>
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+            <version>2.9.9</version>
+        </dependency>
     </dependencies>
 
diff --git a/mysql-binlog/src/main/java/org/apache/spark/sql/mlsql/sources/MLSQLBinLogDataSource.scala b/mysql-binlog/src/main/java/org/apache/spark/sql/mlsql/sources/MLSQLBinLogDataSource.scala
index 64ab2c9..ade69c3 100644
--- a/mysql-binlog/src/main/java/org/apache/spark/sql/mlsql/sources/MLSQLBinLogDataSource.scala
+++ b/mysql-binlog/src/main/java/org/apache/spark/sql/mlsql/sources/MLSQLBinLogDataSource.scala
@@ -9,6 +9,7 @@ import java.util.{Locale, UUID}
 
 import com.github.shyiko.mysql.binlog.network.ServerException
 import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -23,7 +24,6 @@ import org.apache.spark.{SparkEnv, TaskContext}
 import tech.mlsql.common.utils.hdfs.HDFSOperator
 import tech.mlsql.common.utils.path.PathFun
 
-
 /**
  * This Datasource is used to consume MySQL binlog. Not support MariaDB yet because the connector we are using is
  * lack of the ability.
@@ -109,24 +109,31 @@ class MLSQLBinLogDataSource extends StreamSourceProvider with DataSourceRegister
     val checkPointDir = metadataPath.stripSuffix("/").split("/").
       dropRight(2).mkString("/")
 
-    def getOffsetFromCk = {
-      val offsetPath = PathFun(checkPointDir).
-        add("offsets").
-        toPath
+    def getOffsetFromCk: Option[Long] = {
+      // [bugfix] "checkpointLocation:offsets" is not a valid DFS file name; the cause is that File.pathSeparator is ':' or ';'.
+      // val offsetPath = PathFun(checkPointDir).add("offsets").toPath
+      val offsetPath = new Path(checkPointDir.stripSuffix(Path.SEPARATOR), "offsets").toString
+      logInfo(s"acquired offsetPath from checkpoint: ${offsetPath}")
 
-      val lastFile = HDFSOperator.listFiles(offsetPath)
+      val files = HDFSOperator.listFiles(offsetPath)
+      if (files.isEmpty) {
+        logInfo(s"no checkpoint found at offsetPath: ${offsetPath}")
+        return None
+      }
+      val lastFile = files
         .filterNot(f => f.getPath.getName.endsWith(".tmp.crc") || f.getPath.getName.endsWith(".tmp"))
-        .map { fileName =>
-          (fileName.getPath.getName.split("/").last.toInt, fileName.getPath)
-        }
-        .sortBy(f => f._1).last._2
-
+        .map { fileName => (fileName.getPath.getName.split("/").last.toInt, fileName.getPath) }
+        .sortBy(f => f._1)
+        .last._2
       val content = HDFSOperator.readFile(lastFile.toString)
-      content.split("\n").last.toLong
+      Some(content.split("\n").last.toLong)
     }
 
     val offsetFromCk = try {
-      Option(LongOffset(getOffsetFromCk))
+      getOffsetFromCk match {
+        case Some(checkpoint) => Option(LongOffset(checkpoint))
+        case _ => None
+      }
     } catch {
       case e: Exception =>
         logError(e.getMessage, e)
@@ -181,31 +188,32 @@ class MLSQLBinLogDataSource extends StreamSourceProvider with DataSourceRegister
             ex.printStackTrace()
         }
 
-        def sendStopBinlogServerRequest = {
-          // send signal to stop server
-          val socket2 = new Socket(executorBinlogServer.host, executorBinlogServer.port)
-          val dout2 = new DataOutputStream(socket2.getOutputStream)
-          BinLogSocketServerCommand.sendRequest(dout2,
-            ShutdownBinlogServer())
-          socket2.close()
-        }
-
+        /**
+         * Register a callback that shuts down the binlog server when the Spark task fails.
+         */
        TaskContext.get().addTaskFailureListener(new TaskFailureListener {
          override def onTaskFailure(context: TaskContext, error: Throwable): Unit = {
            taskContextRef.set(null)
-           sendStopBinlogServerRequest
-
+           val socket = new Socket(executorBinlogServer.host, executorBinlogServer.port)
+           val out = new DataOutputStream(socket.getOutputStream)
+           BinLogSocketServerCommand.sendRequest(out, ShutdownBinlogServer())
+           socket.close()
          }
        })
 
+        /**
+         * Register a callback that shuts down the binlog server when the Spark task completes.
+         */
        TaskContext.get().addTaskCompletionListener(new TaskCompletionListener {
          override def onTaskCompletion(context: TaskContext): Unit = {
            taskContextRef.set(null)
-           sendStopBinlogServerRequest
+           val socket = new Socket(executorBinlogServer.host, executorBinlogServer.port)
+           val out = new DataOutputStream(socket.getOutputStream)
+           BinLogSocketServerCommand.sendRequest(out, ShutdownBinlogServer())
+           socket.close()
          }
        })
-
       val socket = new Socket(tempSocketServerHost, tempSocketServerPort)
       val dout = new DataOutputStream(socket.getOutputStream)
       BinLogSocketServerCommand.sendRequest(dout,
@@ -252,7 +260,7 @@ class MLSQLBinLogDataSource extends StreamSourceProvider with DataSourceRegister
     MLSQLBinLogSource(executorBinlogServer, sqlContext.sparkSession,
       metadataPath, finalStartingOffsets, parameters ++ Map("binlogServerId" -> binlogServerId))
   }
 
-  override def shortName(): String = "mysql-binglog"
+  override def shortName(): String = "mysql-binlog"
 
 }
 
 /**
@@ -317,7 +325,7 @@ case class MLSQLBinLogSource(executorBinlogServer: ExecutorBinlogServer,
     if (content(0) == 'v') {
       val indexOfNewLine = content.indexOf("\n")
       if (indexOfNewLine > 0) {
-        val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
+        val version = validateVersion(content.substring(0, indexOfNewLine), VERSION)
         LongOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
       } else {
         throw new IllegalStateException(
@@ -348,6 +356,17 @@ case class MLSQLBinLogSource(executorBinlogServer: ExecutorBinlogServer,
     LongOffset(response.currentOffset)
   }
 
+  /**
+   * Convert a generic Offset to a LongOffset when possible.
+   * Note: since Spark 3.1 the LongOffset object no longer provides a convert method, so this helper is added for code consistency.
+   * @return the converted LongOffset
+   */
+  def convert(offset: Offset): Option[LongOffset] = offset match {
+    case lo: LongOffset => Some(lo)
+    case so: SerializedOffset => Some(LongOffset(so))
+    case _ => None
+  }
+
   override def getOffset: Option[Offset] = {
     synchronized {
       if (initialized.compareAndSet(false, true)) {
@@ -368,7 +387,7 @@ case class MLSQLBinLogSource(executorBinlogServer: ExecutorBinlogServer,
 
     initialPartitionOffsets
 
-    val untilPartitionOffsets = LongOffset.convert(end)
+    val untilPartitionOffsets = convert(end)
 
     // On recovery, getBatch will get called before getOffset
     if (currentPartitionOffsets.isEmpty) {
@@ -383,10 +402,8 @@ case class MLSQLBinLogSource(executorBinlogServer: ExecutorBinlogServer,
     // once we have changed checkpoint path, then we can start from provided starting offset.
     // In normal case, we will recover the start from checkpoint offset directory
     val fromPartitionOffsets = start match {
-      case Some(prevBatchEndOffset) =>
-        LongOffset.convert(prevBatchEndOffset)
-      case None =>
-        Some(initialPartitionOffsets)
+      case Some(prevBatchEndOffset) => convert(prevBatchEndOffset)
+      case None => Some(initialPartitionOffsets)
     }
 
     val executorBinlogServerCopy = executorBinlogServer.copy()
diff --git a/mysql-binlog/src/main/java/org/apache/spark/sql/mlsql/sources/mysql/binlog/BinLogSocketServerInExecutor.scala b/mysql-binlog/src/main/java/org/apache/spark/sql/mlsql/sources/mysql/binlog/BinLogSocketServerInExecutor.scala
index 745a8fd..e3cfe02 100644
--- a/mysql-binlog/src/main/java/org/apache/spark/sql/mlsql/sources/mysql/binlog/BinLogSocketServerInExecutor.scala
+++ b/mysql-binlog/src/main/java/org/apache/spark/sql/mlsql/sources/mysql/binlog/BinLogSocketServerInExecutor.scala
@@ -180,7 +180,7 @@ class BinLogSocketServerInExecutor[T](taskContextRef: AtomicReference[T],
 
     val eventDeserializer = new EventDeserializer()
     eventDeserializer.setCompatibilityMode(
-      //EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
+      //EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG_MICRO,
       EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
       //EventDeserializer.CompatibilityMode.INVALID_DATE_AND_TIME_AS_MIN_VALUE
     )
diff --git a/mysql-binlog/src/main/java/org/apache/spark/sql/mlsql/sources/mysql/binlog/MySQLCDCUtils.java b/mysql-binlog/src/main/java/org/apache/spark/sql/mlsql/sources/mysql/binlog/MySQLCDCUtils.java
index a185de8..d4c52f3 100644
--- a/mysql-binlog/src/main/java/org/apache/spark/sql/mlsql/sources/mysql/binlog/MySQLCDCUtils.java
+++ b/mysql-binlog/src/main/java/org/apache/spark/sql/mlsql/sources/mysql/binlog/MySQLCDCUtils.java
@@ -9,6 +9,7 @@
 import com.fasterxml.jackson.databind.JsonSerializer;
 import com.fasterxml.jackson.databind.SerializerProvider;
 import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.spark.sql.catalyst.json.JSONOptions;
 import org.apache.spark.sql.mlsql.sources.mysql.binlog.io.SchemaTool;
 
 import java.io.IOException;
@@ -17,6 +18,7 @@
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Locale;
 import java.util.TimeZone;
 
 /**
@@ -56,7 +58,9 @@ public static Object getWritableObject(SchemaTool schemaTool, Serializable value
 
         if (schemaTool.isTimestamp() && value instanceof java.sql.Timestamp) {
             java.sql.Timestamp item = (java.sql.Timestamp) value;
-            FastDateFormat timestampFormat = JsonOptions.options(schemaTool.timeZone()).timestampFormat();
+            JSONOptions options = JsonOptions.options(schemaTool.timeZone());
+            FastDateFormat timestampFormat = FastDateFormat.getInstance(options.timestampFormat(),
+                    TimeZone.getTimeZone(options.zoneId()), options.locale());
             return timestampFormat.format(item);
         }
 
@@ -64,19 +68,23 @@ public static Object getWritableObject(SchemaTool schemaTool, Serializable value
         if (schemaTool.isTimestamp() && value instanceof java.util.Date) {
             org.joda.time.DateTime item = new org.joda.time.DateTime((java.util.Date) value);
             int offset = TimeZone.getTimeZone(ZoneId.of(schemaTool.timeZone())).getRawOffset();
-            FastDateFormat timestampFormat = JsonOptions.options(schemaTool.timeZone()).timestampFormat();
+            JSONOptions options = JsonOptions.options(schemaTool.timeZone());
+            FastDateFormat timestampFormat = FastDateFormat.getInstance(options.timestampFormat(),
+                    TimeZone.getTimeZone(options.zoneId()), options.locale());
             return timestampFormat.format(item.minusMillis(offset).toDate());
         }
 
         if (schemaTool.isDate() && value instanceof java.sql.Date) {
             java.sql.Date item = (java.sql.Date) value;
-            FastDateFormat dateFormat = JsonOptions.options(schemaTool.timeZone()).dateFormat();
+            JSONOptions options = JsonOptions.options(schemaTool.timeZone());
+            FastDateFormat dateFormat = FastDateFormat.getInstance(options.dateFormat(), options.locale());
             return dateFormat.format(item);
         }
 
         if (schemaTool.isDate() && value instanceof java.util.Date) {
             java.util.Date item = (java.util.Date) value;
-            FastDateFormat dateFormat = JsonOptions.options(schemaTool.timeZone()).dateFormat();
+            JSONOptions options = JsonOptions.options(schemaTool.timeZone());
+            FastDateFormat dateFormat = FastDateFormat.getInstance(options.dateFormat(), options.locale());
             return dateFormat.format(item);
         }
diff --git a/mysql-binlog/src/main/java/org/apache/spark/sql/mlsql/sources/mysql/binlog/RawBinlogEvent.java b/mysql-binlog/src/main/java/org/apache/spark/sql/mlsql/sources/mysql/binlog/RawBinlogEvent.java
index f561afd..de0408a 100644
--- a/mysql-binlog/src/main/java/org/apache/spark/sql/mlsql/sources/mysql/binlog/RawBinlogEvent.java
+++ b/mysql-binlog/src/main/java/org/apache/spark/sql/mlsql/sources/mysql/binlog/RawBinlogEvent.java
@@ -3,7 +3,7 @@
 import com.github.shyiko.mysql.binlog.event.Event;
 import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
 import org.apache.spark.sql.execution.streaming.LongOffset;
-import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.apache.spark.sql.execution.streaming.Offset;
 import org.apache.spark.streaming.RawEvent;
 
 public class RawBinlogEvent implements RawEvent {
diff --git a/mysql-binlog/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/mysql-binlog/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..a0cad21
--- /dev/null
+++ b/mysql-binlog/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
+org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource
\ No newline at end of file
diff --git a/mysql-binlog/src/test/java/tech/mlsql/test/binlogserver/BinlogSuite.scala b/mysql-binlog/src/test/java/tech/mlsql/test/binlogserver/BinlogSuite.scala
index 8b04128..f95cc60 100644
--- a/mysql-binlog/src/test/java/tech/mlsql/test/binlogserver/BinlogSuite.scala
+++ b/mysql-binlog/src/test/java/tech/mlsql/test/binlogserver/BinlogSuite.scala
@@ -3,10 +3,10 @@ package tech.mlsql.test.binlogserver
 import java.io.File
 import java.sql.{ResultSet, SQLException, Statement}
 import java.util.TimeZone
-
 import net.sf.json.JSONObject
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource
 import org.apache.spark.sql.mlsql.sources.mysql.binlog._
@@ -14,6 +14,7 @@ import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
 import org.apache.spark.sql.{DataSetHelper, SaveMode}
+import org.scalatest.{Args, ConfigMap, Filter, Status, Suite, TestData}
 import org.scalatest.time.SpanSugar._
 import tech.mlsql.common.utils.lang.sc.ScalaReflect
 
@@ -99,13 +100,11 @@ trait BaseBinlogTest extends StreamTest {
       .option("driver", s"com.mysql.jdbc.Driver")
       .option("dbtable", s"script_file").load().write.format(delta).mode(SaveMode.Overwrite).save(path)
   }
-
 }
-
-class BinlogSuite extends BaseBinlogTest with BinLogSocketServerSerDer {
+trait BinlogSuite extends BaseBinlogTest with BinLogSocketServerSerDer {
 
   def deserializeSchema(json: String): StructType = {
-    Try(DataType.fromJson(json)).getOrElse(LegacyTypeStringParser.parse(json)) match {
+    Try(DataType.fromJson(json)).getOrElse(LegacyTypeStringParser.parseString(json)) match {
       case t: StructType => t
       case _ => throw new RuntimeException(s"Failed parsing StructType: $json")
     }
@@ -114,7 +113,7 @@ class BinlogSuite extends BaseBinlogTest with BinLogSocketServerSerDer {
   object TriggerData {
     def apply(source: Source, f: () => Unit) = {
       new TriggerData(source) {
-        override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
+        override def addData(query: Option[StreamExecution]): (SparkDataStream, Offset) = {
           f()
           (source, source.getOffset.get)
         }
diff --git a/pom.xml b/pom.xml
index 3960ef6..155a664 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,9 +5,9 @@
     <modelVersion>4.0.0</modelVersion>
 
    <groupId>tech.mlsql</groupId>
-    <artifactId>spark-binlog_2.11</artifactId>
+    <artifactId>spark-binlog_2.12</artifactId>
    <packaging>pom</packaging>
-    <version>1.0.4</version>
+    <version>1.0.5</version>
    <modules>
        <module>binlog-common</module>
        <module>mysql-binlog</module>
@@ -50,7 +50,7 @@
        UTF-8
        2.11.8
-        2.11
+        2.12
        2.11.0-M3
        2.4.3
@@ -69,17 +69,22 @@
            <version>${common-utils.version}</version>
        </dependency>
-
        <dependency>
            <groupId>org.scalactic</groupId>
            <artifactId>scalactic_${scala.binary.version}</artifactId>
-            <version>3.0.0</version>
+            <version>3.1.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_${scala.binary.version}</artifactId>
-            <version>3.0.0</version>
+            <version>3.1.0</version>
            <scope>test</scope>
        </dependency>
+        <dependency>
+            <groupId>com.vladsch.flexmark</groupId>
+            <artifactId>flexmark-all</artifactId>
+            <version>0.35.10</version>
+            <scope>test</scope>
+        </dependency>
@@ -181,7 +186,13 @@
            3.0
-
+
+            spark-3.1.x
+
+                3.1.1
+                3.1
+
+
        disable-java8-doclint
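With the two META-INF/services registration files added above and the shortName typo fixed ("mysql-binglog" becoming "mysql-binlog"), the source can be addressed by its short name. The sketch below is not part of this patch: the option names follow the project's README examples (host, port, userName, password, databaseNamePattern, tableNamePattern), and every connection value is a placeholder to replace with your own.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: consume MySQL binlog events through the short name
// registered by this patch. All connection values below are placeholders.
object BinlogQuickStart {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("spark-binlog-quickstart")
      .getOrCreate()

    val df = spark.readStream
      .format("mysql-binlog") // short name fixed in this patch
      .option("host", "127.0.0.1")
      .option("port", "3306")
      .option("userName", "root")
      .option("password", "change-me")
      .option("databaseNamePattern", "mlsql_console")
      .option("tableNamePattern", "script_file")
      .load()

    df.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/spark-binlog-checkpoint")
      .start()
      .awaitTermination()
  }
}
```

To build against Spark 3.1 with the profile introduced in pom.xml, activate it explicitly, for example `mvn -Pspark-3.1.x clean package`.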