support spark 3.1.x #35

Open
wants to merge 1 commit into base: master
2 changes: 2 additions & 0 deletions .gitignore
@@ -2,3 +2,5 @@ target/
.idea/
spark-binlog.iml
release.sh
*.iml
*/*.iml
9 changes: 5 additions & 4 deletions README.md
@@ -23,16 +23,16 @@ MySQL Binlog:

```
groupId: tech.mlsql
artifactId: mysql-binlog_2.11
version: 1.0.4
artifactId: mysql-binlog_2.12
version: 1.0.5
```

HBase WAL:

```
groupId: tech.mlsql
artifactId: hbase-wal_2.11
version: 1.0.4
artifactId: hbase-wal_2.12
version: 1.0.5
```
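
For reference, the coordinates above map to an ordinary Maven dependency, roughly as sketched below (shown for the MySQL binlog module; the HBase WAL module follows the same pattern with artifactId hbase-wal_2.12):

```
<dependency>
    <groupId>tech.mlsql</groupId>
    <artifactId>mysql-binlog_2.12</artifactId>
    <version>1.0.5</version>
</dependency>
```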

## Limitation
@@ -355,6 +355,7 @@ object Main{

```

If the above code fails when you run it, check whether the MySQL Connector/J dependency (mysql-connector-java) has been added to your project.
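
A minimal way to pull that driver in, assuming a Maven build (the version below is only an example; use whichever Connector/J release matches your MySQL server):

```
<!-- MySQL JDBC driver; version is an example, pick the one matching your server -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>
```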



4 changes: 2 additions & 2 deletions binlog-common/pom.xml
@@ -3,9 +3,9 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spark-binlog_2.11</artifactId>
<artifactId>spark-binlog_2.12</artifactId>
<groupId>tech.mlsql</groupId>
<version>1.0.4</version>
<version>1.0.5</version>
</parent>
<modelVersion>4.0.0</modelVersion>

@@ -1,6 +1,6 @@
package org.apache.spark.streaming;

import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
import org.apache.spark.sql.execution.streaming.Offset;

import java.io.Serializable;

@@ -1,7 +1,8 @@
package tech.mlsql.binlog.common

import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
import org.apache.spark.sql.connector.read.streaming.Offset
import org.apache.spark.sql.execution.streaming.SerializedOffset
import org.apache.spark.sql.execution.streaming.{Offset => OffsetV2}
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

4 changes: 2 additions & 2 deletions hbase-wal/pom.xml
@@ -3,9 +3,9 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spark-binlog_2.11</artifactId>
<artifactId>spark-binlog_2.12</artifactId>
<groupId>tech.mlsql</groupId>
<version>1.0.4</version>
<version>1.0.5</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<properties>
@@ -133,7 +133,7 @@ case class MLSQLHBaseWAlSource(hostAndPort: ReportHostAndPort, spark: SparkSessi
if (content(0) == 'v') {
val indexOfNewLine = content.indexOf("\n")
if (indexOfNewLine > 0) {
val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
val version = validateVersion(content.substring(0, indexOfNewLine), VERSION)
CommonSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
} else {
throw new IllegalStateException(
@@ -4,17 +4,16 @@ import java.io.EOFException
import java.nio.charset.Charset
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.{Delete, Put}
import org.apache.hadoop.hbase.wal.{WAL, WALEdit, WALFactory}
import org.apache.hadoop.hbase.{Cell, CellUtil}
import org.apache.spark.sql.execution.streaming.LongOffset
import org.apache.spark.sql.sources.v2.reader.streaming.Offset
import org.apache.spark.sql.execution.streaming.Offset
import org.apache.spark.streaming.RawEvent
import org.spark_project.guava.cache.{CacheBuilder, CacheLoader, LoadingCache}
import tech.mlsql.binlog.common.HDFSContext
import tech.mlsql.common.utils.cache.{CacheBuilder, CacheLoader, LoadingCache}
import tech.mlsql.common.utils.log.Logging

import scala.collection.JavaConverters._
@@ -4,10 +4,9 @@ import java.io.{DataInputStream, DataOutputStream}
import java.util
import java.util.concurrent.atomic.AtomicReference
import java.util.regex.Pattern

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkEnv
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset}
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, SerializedOffset}
import org.apache.spark.sql.mlsql.sources.hbase.wal.io.{DeleteWriter, PutWriter}
import org.apache.spark.streaming.RawEvent
import tech.mlsql.binlog.common.OriginalSourceServerInExecutor
@@ -83,6 +82,17 @@ class HBaseWALSocketServerInExecutor[T](taskContextRef: AtomicReference[T], chec
jsonList
}

/**
* Convert a generic Offset to a LongOffset when possible.
* Note: since Spark 3.1 the LongOffset companion object no longer provides a convert method, so this helper is added here to keep the code consistent.
* @return the converted LongOffset, or None if the offset type is not supported
*/
def convert(offset: Offset): Option[LongOffset] = offset match {
case lo: LongOffset => Some(lo)
case so: SerializedOffset => Some(LongOffset(so))
case _ => None
}

override def process(dIn: DataInputStream, dOut: DataOutputStream): Unit = {
client.readRequest(dIn) match {
case _: NooopsRequest =>
@@ -93,7 +103,7 @@ class HBaseWALSocketServerInExecutor[T](taskContextRef: AtomicReference[T], chec
flushAheadLog
}
val offsets = committedOffsets.asScala.
map(f => (f._1, (LongOffset.convert(f._2).get.offset+1).toString))
map(f => (f._1, (convert(f._2).get.offset+1).toString))
client.sendResponse(dOut, OffsetResponse(offsets.toMap))
case RequestData(name, startOffset, endOffset) =>
try {
@@ -180,7 +190,7 @@ class HBaseWALSocketServerInExecutor[T](taskContextRef: AtomicReference[T], chec
require(a != null || b != null, "two offsets should not be null at the same time ")
if (a == null) true
else if (b == null) false
else LongOffset.convert(a).get.offset < LongOffset.convert(b).get.offset
else convert(a).get.offset < convert(b).get.offset
}
}

@@ -0,0 +1 @@
org.apache.spark.sql.mlsql.sources.hbase.MLSQLHBaseWALDataSource
9 changes: 7 additions & 2 deletions mysql-binlog/pom.xml
@@ -3,9 +3,9 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spark-binlog_2.11</artifactId>
<artifactId>spark-binlog_2.12</artifactId>
<groupId>tech.mlsql</groupId>
<version>1.0.4</version>
<version>1.0.5</version>
</parent>
<modelVersion>4.0.0</modelVersion>

@@ -27,6 +27,11 @@
<artifactId>mysql-binlog-connector-java</artifactId>
<version>${binlog.version}</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.9</version>
</dependency>
</dependencies>

</project>
@@ -9,6 +9,7 @@ import java.util.{Locale, UUID}

import com.github.shyiko.mysql.binlog.network.ServerException
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -23,7 +24,6 @@ import org.apache.spark.{SparkEnv, TaskContext}
import tech.mlsql.common.utils.hdfs.HDFSOperator
import tech.mlsql.common.utils.path.PathFun


/**
* This Datasource is used to consume MySQL binlog. Not support MariaDB yet because the connector we are using is
* lack of the ability.
@@ -109,24 +109,31 @@ class MLSQLBinLogDataSource extends StreamSourceProvider with DataSourceRegister
val checkPointDir = metadataPath.stripSuffix("/").split("/").
dropRight(2).mkString("/")

def getOffsetFromCk = {
val offsetPath = PathFun(checkPointDir).
add("offsets").
toPath
def getOffsetFromCk: Option[Long] = {
// [bugfix] "checkpointLocation:offsets" is not a valid DFS file name, because File.pathSeparator is ':' or ';'
// val offsetPath = PathFun(checkPointDir).add("offsets").toPath
val offsetPath = new Path(checkPointDir.stripSuffix(Path.SEPARATOR), "offsets").toString
logInfo(s"offsetPath acquired from checkpoint: ${offsetPath}")

val lastFile = HDFSOperator.listFiles(offsetPath)
val files = HDFSOperator.listFiles(offsetPath)
if (files.isEmpty) {
logInfo(s"OffsetPath: ${offsetPath} checkpoint not found!")
return None
}
val lastFile = files
.filterNot(f => f.getPath.getName.endsWith(".tmp.crc") || f.getPath.getName.endsWith(".tmp"))
.map { fileName =>
(fileName.getPath.getName.split("/").last.toInt, fileName.getPath)
}
.sortBy(f => f._1).last._2

.map { fileName => (fileName.getPath.getName.split("/").last.toInt, fileName.getPath) }
.sortBy(f => f._1)
.last._2
val content = HDFSOperator.readFile(lastFile.toString)
content.split("\n").last.toLong
Some(content.split("\n").last.toLong)
}

val offsetFromCk = try {
Option(LongOffset(getOffsetFromCk))
getOffsetFromCk match {
case Some(checkpoint) => Option(LongOffset(checkpoint))
case _ => None
}
} catch {
case e: Exception =>
logError(e.getMessage, e)
@@ -181,31 +188,32 @@ class MLSQLBinLogDataSource extends StreamSourceProvider with DataSourceRegister
ex.printStackTrace()
}

def sendStopBinlogServerRequest = {
// send signal to stop server
val socket2 = new Socket(executorBinlogServer.host, executorBinlogServer.port)
val dout2 = new DataOutputStream(socket2.getOutputStream)
BinLogSocketServerCommand.sendRequest(dout2,
ShutdownBinlogServer())
socket2.close()
}

/**
* Register a callback that shuts down the binlog server when the Spark task fails.
*/
TaskContext.get().addTaskFailureListener(new TaskFailureListener {
override def onTaskFailure(context: TaskContext, error: Throwable): Unit = {
taskContextRef.set(null)
sendStopBinlogServerRequest

val socket = new Socket(executorBinlogServer.host, executorBinlogServer.port)
val out = new DataOutputStream(socket.getOutputStream)
BinLogSocketServerCommand.sendRequest(out, ShutdownBinlogServer())
socket.close()
}
})

/**
* Register a callback that shuts down the binlog server when the Spark task completes.
*/
TaskContext.get().addTaskCompletionListener(new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = {
taskContextRef.set(null)
sendStopBinlogServerRequest
val socket = new Socket(executorBinlogServer.host, executorBinlogServer.port)
val out = new DataOutputStream(socket.getOutputStream)
BinLogSocketServerCommand.sendRequest(out, ShutdownBinlogServer())
socket.close()
}
})


val socket = new Socket(tempSocketServerHost, tempSocketServerPort)
val dout = new DataOutputStream(socket.getOutputStream)
BinLogSocketServerCommand.sendRequest(dout,
@@ -252,7 +260,7 @@ class MLSQLBinLogDataSource extends StreamSourceProvider with DataSourceRegister
MLSQLBinLogSource(executorBinlogServer, sqlContext.sparkSession, metadataPath, finalStartingOffsets, parameters ++ Map("binlogServerId" -> binlogServerId))
}

override def shortName(): String = "mysql-binglog"
override def shortName(): String = "mysql-binlog"
}

/**
@@ -317,7 +325,7 @@ case class MLSQLBinLogSource(executorBinlogServer: ExecutorBinlogServer,
if (content(0) == 'v') {
val indexOfNewLine = content.indexOf("\n")
if (indexOfNewLine > 0) {
val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
val version = validateVersion(content.substring(0, indexOfNewLine), VERSION)
LongOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
} else {
throw new IllegalStateException(
@@ -348,6 +356,17 @@ case class MLSQLBinLogSource(executorBinlogServer: ExecutorBinlogServer,
LongOffset(response.currentOffset)
}

/**
* Convert a generic Offset to a LongOffset when possible.
* Note: since Spark 3.1 the LongOffset companion object no longer provides a convert method, so this helper is added here to keep the code consistent.
* @return the converted LongOffset, or None if the offset type is not supported
*/
def convert(offset: Offset): Option[LongOffset] = offset match {
case lo: LongOffset => Some(lo)
case so: SerializedOffset => Some(LongOffset(so))
case _ => None
}

override def getOffset: Option[Offset] = {
synchronized {
if (initialized.compareAndSet(false, true)) {
@@ -368,7 +387,7 @@ case class MLSQLBinLogSource(executorBinlogServer: ExecutorBinlogServer,

initialPartitionOffsets

val untilPartitionOffsets = LongOffset.convert(end)
val untilPartitionOffsets = convert(end)

// On recovery, getBatch will get called before getOffset
if (currentPartitionOffsets.isEmpty) {
@@ -383,10 +402,8 @@ case class MLSQLBinLogSource(executorBinlogServer: ExecutorBinlogServer,
// once we have changed checkpoint path, then we can start from provided starting offset.
// In normal case, we will recover the start from checkpoint offset directory
val fromPartitionOffsets = start match {
case Some(prevBatchEndOffset) =>
LongOffset.convert(prevBatchEndOffset)
case None =>
Some(initialPartitionOffsets)
case Some(prevBatchEndOffset) => convert(prevBatchEndOffset)
case None => Some(initialPartitionOffsets)
}

val executorBinlogServerCopy = executorBinlogServer.copy()
@@ -180,7 +180,7 @@ class BinLogSocketServerInExecutor[T](taskContextRef: AtomicReference[T],

val eventDeserializer = new EventDeserializer()
eventDeserializer.setCompatibilityMode(
//EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
//EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG_MICRO,
EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
//EventDeserializer.CompatibilityMode.INVALID_DATE_AND_TIME_AS_MIN_VALUE
)