I have no idea why my streaming program failed after running for a while.
Yesterday, a few hours after I started the program and it had been running steadily, I received an email alert that it had failed. Fortunately, I had set up automatic recovery on failure. The detailed error information follows.
2019-12-25 17:35:03 INFO ContextCleaner:54 - Cleaned accumulator 103188
2019-12-25 17:35:03 INFO ContextCleaner:54 - Cleaned accumulator 101683
2019-12-25 17:40:00 ERROR MicroBatchExecution:91 - Query [id = 0665d063-69a7-494b-9e48-3dc5dab9fcdc, runId = a6b0095c-dd63-4652-9718-3dbe9b53dbf4] terminated with error
java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer$class.readResponse(servers.scala:163)
at org.apache.spark.sql.mlsql.sources.MLSQLBinLogSource.readResponse(MLSQLBinLogDataSource.scala:239)
at org.apache.spark.sql.mlsql.sources.MLSQLBinLogSource.getLatestOffset(MLSQLBinLogDataSource.scala:320)
at org.apache.spark.sql.mlsql.sources.MLSQLBinLogSource.getOffset(MLSQLBinLogDataSource.scala:330)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
2019-12-25 17:40:00 INFO DAGScheduler:54 - Asked to cancel job group 499fdc52-d16e-4cd3-88ba-964fea21cf0d
2019-12-25 17:40:00 INFO YarnScheduler:54 - Cancelling stage 3
2019-12-25 17:40:00 INFO YarnScheduler:54 - Killing all running tasks in stage 3: Stage cancelled
2019-12-25 17:40:00 INFO YarnScheduler:54 - Stage 3 was cancelled
2019-12-25 17:40:00 INFO DAGScheduler:54 - ResultStage 3 (start at NativeMethodAccessorImpl.java:0) failed in 86674.120 s due to Job 1 cancelled part of cancelled job group 499fdc52-d16e-4cd3-88ba-964fea21cf0d
2019-12-25 17:40:00 INFO DAGScheduler:54 - Job 1 failed: start at NativeMethodAccessorImpl.java:0, took 86674.125588 s
Exception in thread "launch-binlog-socket-server-in-spark-job" org.apache.spark.SparkException: Job 1 cancelled part of cancelled job group 499fdc52-d16e-4cd3-88ba-964fea21cf0d
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1822)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply$mcVI$sp(DAGScheduler.scala:906)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply(DAGScheduler.scala:906)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply(DAGScheduler.scala:906)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at org.apache.spark.scheduler.DAGScheduler.handleJobGroupCancelled(DAGScheduler.scala:906)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2077)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource.org$apache$spark$sql$mlsql$sources$MLSQLBinLogDataSource$$launchBinlogServer$1(MLSQLBinLogDataSource.scala:203)
at org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource$$anon$4.run(MLSQLBinLogDataSource.scala:210)
Traceback (most recent call last):
File "/data/sync/stream2hdfs.py", line 38, in
query.awaitTermination()
File "/opt/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 103, in awaitTermination
File "/opt/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in call
File "/opt/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 75, in deco
pyspark.sql.utils.StreamingQueryException: u'null\n=== Streaming Query ===\nIdentifier: [id = 0665d063-69a7-494b-9e48-3dc5dab9fcdc,
runId = a6b0095c-dd63-4652-9718-3dbe9b53dbf4]\nCurrent Committed Offsets: {MLSQLBinLogSource(ExecutorBinlogServer(zmbd-pm26,37103),
org.apache.spark.sql.SparkSession@65f394ba,hdfs://nameservice1/user/zmbigdata/checkpoint-binlog/sources/0,Some(97250000386023094),Map(binglognameprefix -> mysql-bin, tablenamepattern -> xxx, databasenamepattern -> xxx, binlogindex -> 9724,
username -> xxx, host -> xxx, port -> 3306, binlogserverid -> 499fdc52-d16e-4cd3-88ba-964fea21cf0d, binlogfileoffset -> 88952842, password -> xxx)): 97790000376934283}\n
Current Available Offsets: {MLSQLBinLogSource(ExecutorBinlogServer(zmbd-pm26,37103),org.apache.spark.sql.SparkSession@65f394ba,hdfs://nameservice1/user/zmbigdata/checkpoint-binlog/sources/0,Some(97250000386023094),Map(binglognameprefix -> mysql-bin,
tablenamepattern -> users, databasenamepattern -> xx, binlogindex -> 9724, username -> xx, host -> 172.xx.xx.xx, port -> 3306, binlogserverid -> 499fdc52-d16e-4cd3-88ba-964fea21cf0d, binlogfileoffset -> 88952842, password -> xxx)): 97790000376934283}
\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nMLSQLBinLogSource(ExecutorBinlogServer(zmbd-pm26,37103),org.apache.spark.sql.SparkSession@65f394ba,
hdfs://nameservice1/user/zmbigdata/checkpoint-binlog/sources/0,Some(97250000386023094),Map(binglognameprefix -> mysql-bin,
tablenamepattern -> users, databasenamepattern -> xx, binlogindex -> 9724, username -> xx, host -> xx, port -> 3306,
binlogserverid -> 499fdc52-d16e-4cd3-88ba-964fea21cf0d, binlogfileoffset -> 88952842, password -> xx))'
2019-12-25 17:40:00 INFO SparkContext:54 - Invoking stop() from shutdown hook
2019-12-25 17:40:00 INFO AbstractConnector:318 - Stopped Spark@9a4dd2b{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2019-12-25 17:40:00 INFO SparkUI:54 - Stopped Spark web UI at http://zmbd-vpc-wk04:4040
2019-12-25 17:40:00 INFO YarnClientSchedulerBackend:54 - Interrupting monitor thread
2019-12-25 17:40:00 INFO YarnClientSchedulerBackend:54 - Shutting down all executors
2019-12-25 17:40:00 INFO YarnSchedulerBackend$YarnDriverEndpoint:54 - Asking each executor to shut down
2019-12-25 17:40:00 INFO SchedulerExtensionServices:54 - Stopping SchedulerExtensionServices
(serviceOption=None,
services=List(),
started=false)
2019-12-25 17:40:00 INFO YarnClientSchedulerBackend:54 - Stopped
2019-12-25 17:40:00 ERROR TransportRequestHandler:277 - Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:160)
at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:140)
at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:655)
at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:275)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:105)
at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
2019-12-25 17:40:00 INFO MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2019-12-25 17:40:00 INFO MemoryStore:54 - MemoryStore cleared
2019-12-25 17:40:00 INFO BlockManager:54 - BlockManager stopped
2019-12-25 17:40:00 INFO BlockManagerMaster:54 - BlockManagerMaster stopped
2019-12-25 17:40:00 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2019-12-25 17:40:00 INFO SparkContext:54 - Successfully stopped SparkContext
2019-12-25 17:40:00 INFO ShutdownHookManager:54 - Shutdown hook called
2019-12-25 17:40:00 INFO ShutdownHookManager:54 - Deleting directory /dfs/data1/sparkdata/spark-8d3cdd89-85d5-4ef4-8fc8-754a1a9add98
2019-12-25 17:40:00 INFO ShutdownHookManager:54 - Deleting directory /dfs/data1/sparkdata/spark-8d3cdd89-85d5-4ef4-8fc8-754a1a9add98/pyspark-63924722-b7ee-43fc-ae3e-75f1d2df5bd4
2019-12-25 17:40:00 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-9d33e5d5-df54-4a64-9518-52553eb4ec06
@allwefantasy Could you give me some advice on how to avoid such problems? Thanks~
The cause is network jitter. On every trigger interval, the streaming application requests the latest offset from the binlog server, and the two communicate over a socket. When the network is unstable, the connection can break, and the streaming application can no longer read from it. That is the java.io.EOFException thrown from readResponse in the stack trace above.
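To make the failure mode concrete, here is a minimal Python sketch of what happens when a client tries to read an integer from a socket whose peer has gone away. It is only an analogy for the `DataInputStream.readInt` call in the trace, not the library's actual protocol code: the 4-byte big-endian framing and the names `read_exact`, `read_int`, and `demo` are assumptions made for illustration.

```python
import socket
import struct


def read_exact(sock, n):
    """Read exactly n bytes, or raise EOFError if the peer closes early."""
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            # recv() returning b"" means the other side closed the connection.
            raise EOFError("connection closed after %d of %d bytes" % (len(buf), n))
        buf.extend(chunk)
    return bytes(buf)


def read_int(sock):
    """Read one 4-byte big-endian signed integer (assumed framing)."""
    return struct.unpack(">i", read_exact(sock, 4))[0]


def demo():
    # A connected in-process socket pair stands in for client and server.
    client, server = socket.socketpair()
    server.sendall(struct.pack(">i", 42))
    first = read_int(client)  # succeeds while the connection is healthy
    server.close()            # simulate the connection being dropped
    try:
        read_int(client)
        outcome = "no error"
    except EOFError:
        outcome = "EOFError"
    finally:
        client.close()
    return first, outcome
```

The first read succeeds, and the second fails with an end-of-file error as soon as the peer is gone, which matches the shape of the exception in the log.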
Make sure the application is restarted automatically when it fails. We will also try to add a retry mechanism to the socket communication in the future. Thanks for your feedback.
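Until such a retry exists in the library, the general idea can be sketched in a few lines of Python. This is a hypothetical helper, not part of the binlog data source or of Spark: `call_with_retry` and its parameters are names chosen here for illustration, and it assumes the wrapped call is safe to repeat.

```python
import time


def call_with_retry(fn, attempts=3, base_delay=0.5,
                    retry_on=(EOFError, ConnectionError), sleep=time.sleep):
    """Call fn(), retrying on transient connection errors with exponential backoff.

    Re-raises the last error once `attempts` calls have failed.
    """
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            sleep(base_delay * 2 ** (attempt - 1))
```

The same pattern applies one level up: a supervising script can wrap the code that starts the query and calls `query.awaitTermination()`, so that a failed query is restarted from its checkpoint instead of ending the process. Whether a restart is safe depends on the sink and checkpoint configuration, so treat this as a starting point only.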