When the binlog server is shut down, the streaming cannot fail fast #19

Open
zhengqiangtan opened this issue Dec 24, 2019 · 4 comments
Labels: bug (Something isn't working)

Comments

@zhengqiangtan commented Dec 24, 2019

Question:

When I changed a MySQL column type from datetime to timestamp, the binlog server shut down, but the streaming program kept running and no data could be inserted into the Delta table. Why?


The error log is as follows:

19/12/24 10:53:30 ERROR MicroBatchExecution: Query [id = b11c2596-8106-40b9-a8d3-e0117ffcf23e, runId = b469978d-ecff-4518-b62b-c545215742eb] terminated with error
org.apache.spark.sql.AnalysisException: Failed to merge fields 'activated_at' and 'activated_at'. Failed to merge incompatible data types TimestampType and DateType;;
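
For reference, the merge failure can be reproduced outside the binlog pipeline with plain Spark + Delta. A hypothetical minimal sketch (the table path and the `spark` session are assumptions, not taken from the report):

```scala
// Hypothetical repro of the merge error, independent of the binlog source.
import org.apache.spark.sql.functions.{current_date, current_timestamp}

val path = "/tmp/merge-repro" // assumed scratch path

// Create a Delta table whose column is TimestampType...
spark.range(1).select(current_timestamp().as("activated_at"))
  .write.format("delta").save(path)

// ...then append the same column as DateType with schema merging on:
spark.range(1).select(current_date().as("activated_at"))
  .write.format("delta").mode("append")
  .option("mergeSchema", "true")
  .save(path)
// => AnalysisException: Failed to merge fields 'activated_at' and 'activated_at'.
//    Failed to merge incompatible data types TimestampType and DateType
```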

@allwefantasy added the bug label on Dec 24, 2019
@zhengqiangtan (Author) commented:

The detailed error log is shown below:

19/12/24 11:23:43 ERROR MicroBatchExecution: Query [id = b11c2596-8106-40b9-a8d3-e0117ffcf23e, runId = cea35ba0-a5bc-4c68-94d8-6223cb2cf2ed] terminated with error
org.apache.spark.sql.AnalysisException: Failed to merge fields 'activated_at' and 'activated_at'. Failed to merge incompatible data types TimestampType and DateType;;
at org.apache.spark.sql.delta.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:666)
at org.apache.spark.sql.delta.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:655)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.delta.schema.SchemaUtils$.org$apache$spark$sql$delta$schema$SchemaUtils$$merge$1(SchemaUtils.scala:655)
at org.apache.spark.sql.delta.schema.SchemaUtils$.mergeSchemas(SchemaUtils.scala:731)
at org.apache.spark.sql.delta.schema.ImplicitMetadataOperation$class.updateMetadata(ImplicitMetadataOperation.scala:60)
at org.apache.spark.sql.delta.commands.UpsertTableInDelta.updateMetadata(UpsertTableInDelta.scala:17)
at org.apache.spark.sql.delta.commands.UpsertTableInDelta$$anonfun$_run$4.apply(UpsertTableInDelta.scala:58)
at org.apache.spark.sql.delta.commands.UpsertTableInDelta$$anonfun$_run$4.apply(UpsertTableInDelta.scala:55)
at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:388)
at org.apache.spark.sql.delta.commands.UpsertTableInDelta._run(UpsertTableInDelta.scala:55)
at org.apache.spark.sql.delta.commands.UpsertTableInDelta$$anonfun$run$1.apply$mcV$sp(UpsertTableInDelta.scala:31)
at tech.mlsql.common.DeltaJob$.runWithTry(DeltaJob.scala:17)
at org.apache.spark.sql.delta.commands.UpsertTableInDelta.run(UpsertTableInDelta.scala:30)
at org.apache.spark.sql.delta.commands.BinlogSyncToDelta$$anonfun$saveToSink$1$1.apply(BinlogSyncToDelta.scala:121)
at org.apache.spark.sql.delta.commands.BinlogSyncToDelta$$anonfun$saveToSink$1$1.apply(BinlogSyncToDelta.scala:82)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.delta.commands.BinlogSyncToDelta$.saveToSink$1(BinlogSyncToDelta.scala:82)
at org.apache.spark.sql.delta.commands.BinlogSyncToDelta$.run(BinlogSyncToDelta.scala:132)
at org.apache.spark.sql.delta.sources.MLSQLDeltaSink.addBatch(MLSQLDeltaSink.scala:31)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
19/12/24 11:23:43 INFO DAGScheduler: Asked to cancel job group 18b844a4-64b3-473e-8c99-c7361931e7db
19/12/24 11:23:43 ERROR SocketServerInExecutor: The server ServerSocket[addr=/192.168.46.175,localport=60840] is closing the socket Socket[addr=/192.168.46.175,port=60844,localport=60840] connection
java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer$class.readRequest(servers.scala:141)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor.readRequest(BinLogSocketServerInExecutor.scala:29)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor.handleConnection(BinLogSocketServerInExecutor.scala:403)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anonfun$2.apply(servers.scala:128)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anonfun$2.apply(servers.scala:127)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anon$4$$anon$5.run(servers.scala:101)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
19/12/24 11:23:43 INFO TaskSchedulerImpl: Cancelling stage 3
19/12/24 11:23:43 INFO TaskSchedulerImpl: Killing all running tasks in stage 3: Stage cancelled
19/12/24 11:23:43 INFO Executor: Executor is trying to kill task 0.0 in stage 3.0 (TID 51), reason: Stage cancelled
19/12/24 11:23:43 INFO SocketServerInExecutor: Received connection fromSocket[addr=/192.168.46.175,port=60899,localport=60840]
19/12/24 11:23:43 ERROR SocketServerInExecutor: The server ServerSocket[addr=/192.168.46.175,localport=60840] is closing the socket Socket[addr=/192.168.46.175,port=60899,localport=60840] connection
java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer$class.readRequest(servers.scala:141)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor.readRequest(BinLogSocketServerInExecutor.scala:29)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor.handleConnection(BinLogSocketServerInExecutor.scala:403)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anonfun$2.apply(servers.scala:128)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anonfun$2.apply(servers.scala:127)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anon$4$$anon$5.run(servers.scala:101)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
19/12/24 11:23:43 INFO TaskSchedulerImpl: Stage 3 was cancelled
19/12/24 11:23:43 INFO Executor: Executor interrupted and killed task 0.0 in stage 3.0 (TID 51), reason: Stage cancelled
19/12/24 11:23:43 INFO BinLogSocketServerInExecutor: Shutdown ServerSocket[addr=/192.168.46.175,localport=60840]. This may caused by the task is killed.
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Failed to merge fields 'activated_at' and 'activated_at'. Failed to merge incompatible data types TimestampType and DateType;;
=== Streaming Query ===
Identifier: [id = b11c2596-8106-40b9-a8d3-e0117ffcf23e, runId = cea35ba0-a5bc-4c68-94d8-6223cb2cf2ed]
Current Committed Offsets: {MLSQLBinLogSource(ExecutorBinlogServer(192.168.46.175,60840),org.apache.spark.sql.SparkSession@35f57010,file:///tmp/checkpoint/sources/0,Some(15000000123421051),Map(binglognameprefix -> mysql-bin, tablenamepattern -> users, databasenamepattern -> bi, binlogindex -> 1500, username -> xxx, host -> xxx, port -> 3306, binlogserverid -> 18b844a4-64b3-473e-8c99-c7361931e7db, binlogfileoffset -> 48739574, password -> xxx)): 15000000123421051}
Current Available Offsets: {MLSQLBinLogSource(ExecutorBinlogServer(192.168.46.175,60840),org.apache.spark.sql.SparkSession@35f57010,file:///tmp/checkpoint/sources/0,Some(15000000123421051),Map(binglognameprefix -> mysql-bin, tablenamepattern -> users, databasenamepattern -> bi, binlogindex -> 1500, username -> xxx, host -> xx, port -> 3306, binlogserverid -> 18b844a4-64b3-473e-8c99-c7361931e7db, binlogfileoffset -> 48739574, password -> xxxx)): 15000000138427733}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
MLSQLBinLogSource(ExecutorBinlogServer(192.168.46.175,60840),org.apache.spark.sql.SparkSession@35f57010,file:///tmp/checkpoint/sources/0,Some(15000000123421051),Map(binglognameprefix -> mysql-bin, tablenamepattern -> users, databasenamepattern -> bi, binlogindex -> 1500, username -> xx, host -> xxx, port -> 3306, binlogserverid -> 18b844a4-64b3-473e-8c99-c7361931e7db, binlogfileoffset -> 48739574, password -> xxx))
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.sql.AnalysisException: Failed to merge fields 'activated_at' and 'activated_at'. Failed to merge incompatible data types TimestampType and DateType;;
at org.apache.spark.sql.delta.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:666)
at org.apache.spark.sql.delta.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:655)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.delta.schema.SchemaUtils$.org$apache$spark$sql$delta$schema$SchemaUtils$$merge$1(SchemaUtils.scala:655)
at org.apache.spark.sql.delta.schema.SchemaUtils$.mergeSchemas(SchemaUtils.scala:731)
at org.apache.spark.sql.delta.schema.ImplicitMetadataOperation$class.updateMetadata(ImplicitMetadataOperation.scala:60)
at org.apache.spark.sql.delta.commands.UpsertTableInDelta.updateMetadata(UpsertTableInDelta.scala:17)
at org.apache.spark.sql.delta.commands.UpsertTableInDelta$$anonfun$_run$4.apply(UpsertTableInDelta.scala:58)
at org.apache.spark.sql.delta.commands.UpsertTableInDelta$$anonfun$_run$4.apply(UpsertTableInDelta.scala:55)
at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:388)
at org.apache.spark.sql.delta.commands.UpsertTableInDelta._run(UpsertTableInDelta.scala:55)
at org.apache.spark.sql.delta.commands.UpsertTableInDelta$$anonfun$run$1.apply$mcV$sp(UpsertTableInDelta.scala:31)
at tech.mlsql.common.DeltaJob$.runWithTry(DeltaJob.scala:17)
at org.apache.spark.sql.delta.commands.UpsertTableInDelta.run(UpsertTableInDelta.scala:30)
at org.apache.spark.sql.delta.commands.BinlogSyncToDelta$$anonfun$saveToSink$1$1.apply(BinlogSyncToDelta.scala:121)
at org.apache.spark.sql.delta.commands.BinlogSyncToDelta$$anonfun$saveToSink$1$1.apply(BinlogSyncToDelta.scala:82)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.delta.commands.BinlogSyncToDelta$.saveToSink$1(BinlogSyncToDelta.scala:82)
at org.apache.spark.sql.delta.commands.BinlogSyncToDelta$.run(BinlogSyncToDelta.scala:132)
at org.apache.spark.sql.delta.sources.MLSQLDeltaSink.addBatch(MLSQLDeltaSink.scala:31)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
... 1 more
19/12/24 11:23:43 ERROR SocketServerInExecutor: The server ServerSocket[addr=/192.168.46.175,localport=60840] is closing the socket Socket[addr=/192.168.46.175,port=60848,localport=60840] connection
java.net.SocketException: Socket closed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.net.SocketInputStream.read(SocketInputStream.java:224)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer$class.readRequest(servers.scala:141)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor.readRequest(BinLogSocketServerInExecutor.scala:29)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor.handleConnection(BinLogSocketServerInExecutor.scala:403)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anonfun$2.apply(servers.scala:128)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anonfun$2.apply(servers.scala:127)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anon$4$$anon$5.run(servers.scala:101)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
19/12/24 11:23:43 INFO DAGScheduler: ResultStage 3 (start at MysqlBinLogRead.scala:44) failed in 22.634 s due to Job 1 cancelled part of cancelled job group 18b844a4-64b3-473e-8c99-c7361931e7db
19/12/24 11:23:43 INFO FileBasedWriteAheadLog_BinLogSocketServerInExecutor: Stopped write ahead log manager
19/12/24 11:23:43 INFO DAGScheduler: Job 1 failed: start at MysqlBinLogRead.scala:44, took 22.655395 s
Exception in thread "launch-binlog-socket-server-in-spark-job" org.apache.spark.SparkException: Job 1 cancelled part of cancelled job group 18b844a4-64b3-473e-8c99-c7361931e7db
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1824)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply$mcVI$sp(DAGScheduler.scala:906)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply(DAGScheduler.scala:906)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply(DAGScheduler.scala:906)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at org.apache.spark.scheduler.DAGScheduler.handleJobGroupCancelled(DAGScheduler.scala:906)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2079)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource.org$apache$spark$sql$mlsql$sources$MLSQLBinLogDataSource$$launchBinlogServer$1(MLSQLBinLogDataSource.scala:203)
at org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource$$anon$4.run(MLSQLBinLogDataSource.scala:210)
19/12/24 11:23:43 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 51, localhost, executor driver): TaskKilled (Stage cancelled)
19/12/24 11:23:43 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool

@zhengqiangtan (Author) commented:

If a schema change causes the binlog server to fail, it fails again on restart: consumption resumes from the previous offset, so the same incompatible data is replayed.
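
One possible way out of that replay loop (a sketch only, not a tested fix for this repo) is to rewrite the Delta table so its stored column type matches what the stream now delivers, using Delta's `overwriteSchema` option. The path, column name, and target type below are assumptions:

```scala
// Hypothetical recovery: rewrite the table with the new column type so the
// replayed binlog events merge cleanly. Adjust path/column/type to your case.
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.TimestampType

val tablePath = "/data/delta/bi/users" // assumed Delta table path
spark.read.format("delta").load(tablePath)
  .withColumn("activated_at", col("activated_at").cast(TimestampType))
  .write.format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true") // allow the incompatible type change
  .save(tablePath)
```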

@allwefantasy (Owner) commented:

This issue is caused by Delta. By default, Delta validates the schema of the input data and does not do schema merging. The resulting exception makes the task close its connection to the binlog service, and we do not handle that exception. I think the best way is to quit the streaming application; another is to make the binlog service handle the exception itself.
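
A minimal sketch of the first option (failing fast from the application side), assuming exiting the JVM is acceptable for the deployment; this is not code from this repo:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Kill the whole application when any streaming query dies with an error,
// instead of leaving the executor-side binlog server orphaned.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    if (event.exception.isDefined) sys.exit(1)
})
```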

@zhengqiangtan (Author) commented:

> This issue is caused by Delta. By default, Delta validates the schema of the input data and does not do schema merging. The resulting exception makes the task close its connection to the binlog service, and we do not handle that exception. I think the best way is to quit the streaming application; another is to make the binlog service handle the exception itself.

I think it's better to just throw an exception and exit than to leave the program in a half-dead state.
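
If the driver already blocks on `awaitTermination`, that behavior can be sketched even more simply (`query` is an assumed handle to the running StreamingQuery):

```scala
import org.apache.spark.sql.streaming.StreamingQueryException

try {
  query.awaitTermination()
} catch {
  case e: StreamingQueryException =>
    // Surface the failure and exit non-zero so the scheduler restarts/alerts.
    e.printStackTrace()
    sys.exit(1)
}
```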
