Larger cluster enabling ML random forest to work with full data #239
Comments
I think this was done as part of pull #237. Needs checks to confirm it works.
Follow-on issue: configurable deployments #240
@NigelHambly If you are happy with the performance, then this is done.
No dice with the current cluster (see below). I'm trying to train an RF of 5000 decision trees on of order 200,000 rows of data with 16 features. It works fine with 20,000 rows and 100 trees (same features but 10% sampling). Will try again keeping the full row count but with 500 trees, and not just for the sake of it: it could be that such a large number of trees is unjustifiable in terms of the sensitivity/specificity of the results. @stvoutsin: how much memory and how much scratch disk space does each executor have? IIUC the latter is by default /tmp, which may not be very big ...

Exception trace:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-...> in <module>
      6 # instantiate a trained RF classifier, seeded for repeatability at this stage:
      7 rf = RandomForestClassifier(featureSubsetStrategy = 'sqrt', featuresCol = 'features', labelCol = 'label', numTrees = 5000, impurity = 'gini', seed=42)
----> 8 model = rf.fit(training_df)
      9
     10 # benchmarks: featureSubsetStrategy = "sqrt"

/opt/spark/python/lib/pyspark.zip/pyspark/ml/base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/opt/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py in _fit(self, dataset)
    293
    294     def _fit(self, dataset):
--> 295         java_model = self._fit_java(dataset)
    296         model = self._create_model(java_model)
    297         return self._copyValues(model)

/opt/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py in _fit_java(self, dataset)
    290         """
    291         self._transfer_params_to_java()
--> 292         return self._java_obj.fit(dataset._jdf)
    293
    294     def _fit(self, dataset):

/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o1681.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5720 in stage 121.0 failed 4 times, most recent failure: Lost task 5720.3 in stage 121.0 (TID 487751, worker03, executor 4): java.io.IOException: No space left on device
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
    at org.apache.spark.storage.CountingWritableChannel.write(DiskStore.scala:332)
    at java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
    at java.nio.channels.Channels.writeFully(Channels.java:101)
    at java.nio.channels.Channels.access$000(Channels.java:61)
    at java.nio.channels.Channels$1.write(Channels.java:174)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1915)
    at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
    at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:174)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$7.apply(BlockManager.scala:1174)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$7.apply(BlockManager.scala:1172)
    at org.apache.spark.storage.DiskStore.put(DiskStore.scala:69)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1172)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:743)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:742)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
    at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:742)
    at org.apache.spark.ml.tree.impl.RandomForest$.findBestSplits(RandomForest.scala:567)
    at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:201)
    at org.apache.spark.ml.classification.RandomForestClassifier$$anonfun$train$1.apply(RandomForestClassifier.scala:142)
    at org.apache.spark.ml.classification.RandomForestClassifier$$anonfun$train$1.apply(RandomForestClassifier.scala:120)
    at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:185)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:185)
    at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:120)
    at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:46)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:82)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: No space left on device
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
    at org.apache.spark.storage.CountingWritableChannel.write(DiskStore.scala:332)
    at java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
    at java.nio.channels.Channels.writeFully(Channels.java:101)
    at java.nio.channels.Channels.access$000(Channels.java:61)
    at java.nio.channels.Channels$1.write(Channels.java:174)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1915)
    at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
    at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:174)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$7.apply(BlockManager.scala:1174)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$7.apply(BlockManager.scala:1172)
    at org.apache.spark.storage.DiskStore.put(DiskStore.scala:69)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1172)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
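On the scratch-space question: Spark spills shuffle and cache blocks to the directory named by spark.local.dir, which does default to /tmp, matching the suspicion above. A minimal sketch of pointing it at a larger volume and sizing executor memory; the path and sizes below are placeholders, not this cluster's actual settings:

```python
from pyspark import SparkConf
from pyspark.sql import SparkSession

# spark.local.dir is where shuffle and spill files are written; the default
# is /tmp, which is what fills up with "No space left on device".
# Note: on a standalone cluster, SPARK_LOCAL_DIRS exported on the workers
# takes precedence over this application-level setting.
conf = (
    SparkConf()
    .set("spark.local.dir", "/mnt/scratch/spark")  # placeholder path
    .set("spark.executor.memory", "8g")            # placeholder size
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Inspect what a running session actually resolved to:
sc = spark.sparkContext
print(sc.getConf().get("spark.local.dir", "/tmp"))
print(sc.getConf().get("spark.executor.memory", "1g"))
```

The same effect can be had cluster-wide in standalone mode by exporting SPARK_LOCAL_DIRS in spark-env.sh on each worker, which is probably where the Ansible deployment would set it.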
Interesting: a 500-tree RF trains on the ~200,000-row data set with no problem (it takes just over an hour), and the accuracy of the classification seems OK: a misclassification rate of 0.4%.
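For reference, a misclassification rate like that can be computed with Spark's built-in evaluator; a minimal sketch, assuming a trained model and a held-out test_df (both names illustrative, not taken from the actual notebook):

```python
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the held-out set with the trained model (test_df is illustrative).
predictions = model.transform(test_df)

# Accuracy over the predictions; the misclassification rate is its complement.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print("misclassification rate: {:.3%}".format(1.0 - accuracy))
```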
I think we'll close this one for now: decent results are obtainable with the current cluster size and a sensible number of trees in the RF. It could well be that decision trees involving thousands of nodes are in any case inadvisable, in that they over-fit the data. We'll only be able to address these kinds of configuration questions when we have more users attempting a greater variety of real-world workflows.
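If the over-fitting point is revisited later: the tree-size controls live on RandomForestClassifier itself. A sketch with illustrative, untuned values (not the settings used in this work):

```python
from pyspark.ml.classification import RandomForestClassifier

# Limiting tree depth and leaf size caps the node count per tree, which
# guards against over-fit and also reduces the memory/disk pressure seen
# in the failure above. Values here are illustrative starting points.
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=500,
    maxDepth=10,             # default is 5; much deeper trees risk over-fit
    minInstancesPerNode=10,  # stop splitting very small leaves
    subsamplingRate=0.8,     # train each tree on 80% of the rows
    featureSubsetStrategy="sqrt",
    seed=42,
)
```

Capping maxDepth bounds each binary tree at 2^maxDepth leaves, so it directly addresses the "trees with thousands of nodes" concern.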
Update the Ansible deployment to create a larger cluster, enabling Nigel to work on the ML random forest with the full data set.