Why did my stream fail after running for a while? #20

Open · zhengqiangtan opened this issue Dec 26, 2019 · 2 comments
Labels: enhancement (New feature or request)

@zhengqiangtan

I have no idea why my streaming program failed after running for a while.

Yesterday, a few hours after the program had been running steadily, I received an email alert that it had failed. Fortunately, I had set up automatic recovery on failure. The detailed error information follows.

2019-12-25 17:35:03 INFO ContextCleaner:54 - Cleaned accumulator 103188
2019-12-25 17:35:03 INFO ContextCleaner:54 - Cleaned accumulator 101683
2019-12-25 17:40:00 ERROR MicroBatchExecution:91 - Query [id = 0665d063-69a7-494b-9e48-3dc5dab9fcdc, runId = a6b0095c-dd63-4652-9718-3dbe9b53dbf4] terminated with error
java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer$class.readResponse(servers.scala:163)
at org.apache.spark.sql.mlsql.sources.MLSQLBinLogSource.readResponse(MLSQLBinLogDataSource.scala:239)
at org.apache.spark.sql.mlsql.sources.MLSQLBinLogSource.getLatestOffset(MLSQLBinLogDataSource.scala:320)
at org.apache.spark.sql.mlsql.sources.MLSQLBinLogSource.getOffset(MLSQLBinLogDataSource.scala:330)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
2019-12-25 17:40:00 INFO DAGScheduler:54 - Asked to cancel job group 499fdc52-d16e-4cd3-88ba-964fea21cf0d
2019-12-25 17:40:00 INFO YarnScheduler:54 - Cancelling stage 3
2019-12-25 17:40:00 INFO YarnScheduler:54 - Killing all running tasks in stage 3: Stage cancelled
2019-12-25 17:40:00 INFO YarnScheduler:54 - Stage 3 was cancelled
2019-12-25 17:40:00 INFO DAGScheduler:54 - ResultStage 3 (start at NativeMethodAccessorImpl.java:0) failed in 86674.120 s due to Job 1 cancelled part of cancelled job group 499fdc52-d16e-4cd3-88ba-964fea21cf0d
2019-12-25 17:40:00 INFO DAGScheduler:54 - Job 1 failed: start at NativeMethodAccessorImpl.java:0, took 86674.125588 s
Exception in thread "launch-binlog-socket-server-in-spark-job" org.apache.spark.SparkException: Job 1 cancelled part of cancelled job group 499fdc52-d16e-4cd3-88ba-964fea21cf0d
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1822)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply$mcVI$sp(DAGScheduler.scala:906)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply(DAGScheduler.scala:906)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply(DAGScheduler.scala:906)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at org.apache.spark.scheduler.DAGScheduler.handleJobGroupCancelled(DAGScheduler.scala:906)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2077)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource.org$apache$spark$sql$mlsql$sources$MLSQLBinLogDataSource$$launchBinlogServer$1(MLSQLBinLogDataSource.scala:203)
at org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource$$anon$4.run(MLSQLBinLogDataSource.scala:210)
Traceback (most recent call last):
File "/data/sync/stream2hdfs.py", line 38, in
query.awaitTermination()
File "/opt/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 103, in awaitTermination
File "/opt/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in call
File "/opt/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 75, in deco
pyspark.sql.utils.StreamingQueryException: u'null\n=== Streaming Query ===\nIdentifier: [id = 0665d063-69a7-494b-9e48-3dc5dab9fcdc,
runId = a6b0095c-dd63-4652-9718-3dbe9b53dbf4]\nCurrent Committed Offsets: {MLSQLBinLogSource(ExecutorBinlogServer(zmbd-pm26,37103),
org.apache.spark.sql.SparkSession@65f394ba,hdfs://nameservice1/user/zmbigdata/checkpoint-binlog/sources/0,Some(97250000386023094),Map(binglognameprefix -> mysql-bin, tablenamepattern -> xxx, databasenamepattern -> xxx, binlogindex -> 9724,
username -> xxx, host -> xxx, port -> 3306, binlogserverid -> 499fdc52-d16e-4cd3-88ba-964fea21cf0d, binlogfileoffset -> 88952842, password -> xxx)): 97790000376934283}\n
Current Available Offsets: {MLSQLBinLogSource(ExecutorBinlogServer(zmbd-pm26,37103),org.apache.spark.sql.SparkSession@65f394ba,hdfs://nameservice1/user/zmbigdata/checkpoint-binlog/sources/0,Some(97250000386023094),Map(binglognameprefix -> mysql-bin,
tablenamepattern -> users, databasenamepattern -> xx, binlogindex -> 9724, username -> xx, host -> 172.xx.xx.xx, port -> 3306, binlogserverid -> 499fdc52-d16e-4cd3-88ba-964fea21cf0d, binlogfileoffset -> 88952842, password -> xxx)): 97790000376934283}
\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nMLSQLBinLogSource(ExecutorBinlogServer(zmbd-pm26,37103),org.apache.spark.sql.SparkSession@65f394ba,
hdfs://nameservice1/user/zmbigdata/checkpoint-binlog/sources/0,Some(97250000386023094),Map(binglognameprefix -> mysql-bin,
tablenamepattern -> users, databasenamepattern -> xx, binlogindex -> 9724, username -> xx, host -> xx, port -> 3306,
binlogserverid -> 499fdc52-d16e-4cd3-88ba-964fea21cf0d, binlogfileoffset -> 88952842, password -> xx))'
2019-12-25 17:40:00 INFO SparkContext:54 - Invoking stop() from shutdown hook
2019-12-25 17:40:00 INFO AbstractConnector:318 - Stopped Spark@9a4dd2b{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2019-12-25 17:40:00 INFO SparkUI:54 - Stopped Spark web UI at http://zmbd-vpc-wk04:4040
2019-12-25 17:40:00 INFO YarnClientSchedulerBackend:54 - Interrupting monitor thread
2019-12-25 17:40:00 INFO YarnClientSchedulerBackend:54 - Shutting down all executors
2019-12-25 17:40:00 INFO YarnSchedulerBackend$YarnDriverEndpoint:54 - Asking each executor to shut down
2019-12-25 17:40:00 INFO SchedulerExtensionServices:54 - Stopping SchedulerExtensionServices
(serviceOption=None,
services=List(),
started=false)
2019-12-25 17:40:00 INFO YarnClientSchedulerBackend:54 - Stopped
2019-12-25 17:40:00 ERROR TransportRequestHandler:277 - Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:160)
at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:140)
at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:655)
at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:275)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:105)
at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
2019-12-25 17:40:00 INFO MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2019-12-25 17:40:00 INFO MemoryStore:54 - MemoryStore cleared
2019-12-25 17:40:00 INFO BlockManager:54 - BlockManager stopped
2019-12-25 17:40:00 INFO BlockManagerMaster:54 - BlockManagerMaster stopped
2019-12-25 17:40:00 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2019-12-25 17:40:00 INFO SparkContext:54 - Successfully stopped SparkContext
2019-12-25 17:40:00 INFO ShutdownHookManager:54 - Shutdown hook called
2019-12-25 17:40:00 INFO ShutdownHookManager:54 - Deleting directory /dfs/data1/sparkdata/spark-8d3cdd89-85d5-4ef4-8fc8-754a1a9add98
2019-12-25 17:40:00 INFO ShutdownHookManager:54 - Deleting directory /dfs/data1/sparkdata/spark-8d3cdd89-85d5-4ef4-8fc8-754a1a9add98/pyspark-63924722-b7ee-43fc-ae3e-75f1d2df5bd4
2019-12-25 17:40:00 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-9d33e5d5-df54-4a64-9518-52553eb4ec06

@allwefantasy Can you give me some advice on how to avoid such problems? Thanks~

@allwefantasy (Owner) commented Dec 26, 2019

Network jitter. On every trigger interval the streaming application requests the latest offset from the binlog server, and the two communicate over a socket. When the network is unstable, that connection can break, and the streaming application is then unable to read the response from it, which surfaces as the EOFException above.
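At the top of that stack, readResponse does a blocking DataInputStream.readInt on the server socket to pick up a length-prefixed reply; when the peer vanishes mid-exchange, the read hits end-of-stream and throws the java.io.EOFException you see. A rough Python analogue of that read path, just to illustrate the failure mode (the 4-byte framing here is an assumption for illustration, not the library's exact wire protocol):

```python
import socket
import struct

def recv_exactly(sock: socket.socket, n: int) -> bytes:
    """Read exactly n bytes; raise EOFError if the peer closes first."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            # The server went away mid-message -- the Python analogue of
            # the java.io.EOFException in the stack trace above.
            raise EOFError("connection closed before full message arrived")
        buf += chunk
    return buf

def read_response(sock: socket.socket) -> bytes:
    # 4-byte big-endian length header, like DataInputStream.readInt,
    # followed by the message body.
    (length,) = struct.unpack(">i", recv_exactly(sock, 4))
    return recv_exactly(sock, length)
```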

For now, make sure the application is restarted automatically when it fails. We will also try to add a retry mechanism to the socket communication in the future. Thanks for your feedback.
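Until such a retry lands, one way to act on the "restart automatically" advice is a small supervision loop in the driver. A minimal PySpark sketch, assuming a hypothetical build_query() that recreates the reader and sink of the original stream2hdfs.py job; the format class, option values, and output path below are placeholders loosely echoing the log above, so substitute the ones from your own job:

```python
import time

from pyspark.sql import SparkSession
from pyspark.sql.utils import StreamingQueryException

spark = SparkSession.builder.appName("binlog-sync").getOrCreate()

CHECKPOINT = "hdfs://nameservice1/user/zmbigdata/checkpoint-binlog"

def build_query():
    """Rebuild the reader and sink from scratch; the checkpoint lets the
    new query resume from the last committed binlog offset."""
    df = (spark.readStream
          .format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource")
          .option("host", "172.xx.xx.xx")   # placeholders: reuse the
          .option("port", "3306")           # options from your own job
          .option("username", "xxx")
          .option("password", "xxx")
          .load())
    return (df.writeStream
            .format("parquet")
            .option("checkpointLocation", CHECKPOINT)
            .option("path", "hdfs://nameservice1/user/zmbigdata/binlog-out")
            .start())

while True:
    query = build_query()
    try:
        query.awaitTermination()
        break  # the query stopped cleanly, so exit the supervisor loop
    except StreamingQueryException:
        # Transient failures such as the broken binlog-server socket land
        # here; back off briefly, then start a fresh query from the checkpoint.
        time.sleep(30)
```

Because each restarted query reuses the same checkpoint directory, it resumes from the last committed binlog offset rather than reprocessing from the beginning.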

allwefantasy added the enhancement (New feature or request) label on Dec 26, 2019
@zhengqiangtan (Author) commented Dec 26, 2019

We look forward to a retry mechanism being added to the socket communication, because it is very necessary. Thanks for the answer!
