Skip to content

Commit

Permalink
[VL] Gluten-it: Improve test report table format for parameterized test (#6052)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer authored Jun 12, 2024
1 parent 31902ae commit 1c505df
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class Parameterized implements Callable<Integer> {

@Override
public Integer call() throws Exception {
final Map<String, Map<String, List<Map.Entry<String, String>>>> parsed = new HashMap<>();
final Map<String, Map<String, List<Map.Entry<String, String>>>> parsed = new LinkedHashMap<>();

final Seq<scala.collection.immutable.Set<DimKv>> excludedCombinations = JavaConverters.asScalaBufferConverter(Arrays.stream(excludedDims).map(d -> {
final Matcher m = excludedDimsPattern.matcher(d);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ package org.apache.gluten.integration.action

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.gluten.integration.action.Actions.QuerySelector
import org.apache.gluten.integration.action.TableRender.Field
import org.apache.gluten.integration.action.TableRender.RowParser.FieldAppender.RowAppender
import org.apache.gluten.integration.stat.RamStat
import org.apache.gluten.integration.{QueryRunner, Suite, TableCreator}
import org.apache.spark.sql.ConfUtils.ConfImplicits._
import org.apache.spark.sql.SparkSessionSwitcher

import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
import scala.collection.mutable.ListBuffer

Expand All @@ -39,6 +41,8 @@ class Parameterized(
metrics: Array[String])
extends Action {

validateDims(configDimensions)

private def validateDims(configDimensions: Seq[Dim]): Unit = {
if (configDimensions
.map(dim => {
Expand All @@ -57,32 +61,33 @@ class Parameterized(
}

private val coordinates: mutable.LinkedHashMap[Coordinate, Seq[(String, String)]] = {
validateDims(configDimensions)
val dimCount = configDimensions.size
val coordinateMap = mutable.LinkedHashMap[Coordinate, Seq[(String, String)]]()
val nextId: AtomicInteger = new AtomicInteger(1);

def fillCoordinates(
dimOffset: Int,
intermediateCoordinates: Map[String, String],
intermediateCoordinate: Map[String, String],
intermediateConf: Seq[(String, String)]): Unit = {
if (dimOffset == dimCount) {
// we got one coordinate
excludedCombinations.foreach { ec: Set[DimKv] =>
if (ec.forall { kv =>
intermediateCoordinates.contains(kv.k) && intermediateCoordinates(kv.k) == kv.v
intermediateCoordinate.contains(kv.k) && intermediateCoordinate(kv.k) == kv.v
}) {
println(s"Coordinate ${Coordinate(intermediateCoordinates)} excluded by $ec.")
println(s"Coordinate ${intermediateCoordinate} excluded by $ec.")
return
}
}
coordinateMap(Coordinate(intermediateCoordinates)) = intermediateConf
coordinateMap(Coordinate(nextId.getAndIncrement(), intermediateCoordinate)) =
intermediateConf
return
}
val dim = configDimensions(dimOffset)
dim.dimValues.foreach { dimValue =>
fillCoordinates(
dimOffset + 1,
intermediateCoordinates + (dim.name -> dimValue.name),
intermediateCoordinate + (dim.name -> dimValue.name),
intermediateConf ++ dimValue.conf)
}
}
Expand All @@ -95,7 +100,6 @@ class Parameterized(
override def execute(suite: Suite): Boolean = {
val runner: QueryRunner =
new QueryRunner(suite.queryResource(), suite.dataWritePath(scale, genPartitionedData))
val allQueries = suite.allQueryIds()

val sessionSwitcher = suite.sessionSwitcher
val testConf = suite.getTestConf()
Expand All @@ -116,36 +120,40 @@ class Parameterized(

val runQueryIds = queries.select(suite)

// warm up
(0 until warmupIterations).foreach { _ =>
runQueryIds.foreach { queryId =>
Parameterized.warmUp(suite.tableCreator(), queryId, suite.desc(), sessionSwitcher, runner)
}
}

val results = coordinates.flatMap { entry =>
val coordinate = entry._1
val coordinateResults = (0 until iterations).flatMap { iteration =>
println(s"Running tests (iteration $iteration) with coordinate $coordinate...")
runQueryIds.map { queryId =>
Parameterized.runQuery(
runner,
suite.tableCreator(),
sessionSwitcher,
val results = (0 until iterations).flatMap { iteration =>
runQueryIds.map { queryId =>
val queryResult =
TestResultLine(
queryId,
coordinate,
suite.desc(),
explain,
metrics)
}
}.toList
coordinateResults
coordinates.map { entry =>
val coordinate = entry._1
println(s"Running tests (iteration $iteration) with coordinate $coordinate...")
// warm up
(0 until warmupIterations).foreach { _ =>
Parameterized.warmUp(
runner,
suite.tableCreator(),
sessionSwitcher,
queryId,
suite.desc())
}
// run
Parameterized.runQuery(
runner,
suite.tableCreator(),
sessionSwitcher,
queryId,
coordinate,
suite.desc(),
explain,
metrics)
}.toList)
queryResult
}
}

val dimNames = configDimensions.map(dim => dim.name)

val passedCount = results.count(l => l.succeed)
val count = results.count(_ => true)
val succeededCount = results.count(l => l.succeeded())
val totalCount = results.count(_ => true)

// RAM stats
println("Performing GC to collect RAM statistics... ")
Expand All @@ -160,22 +168,37 @@ class Parameterized(
println("")
println("Test report: ")
println("")
printf("Summary: %d out of %d queries passed. \n", passedCount, count)
printf(
"Summary: %d out of %d queries successfully run on all config combinations. \n",
succeededCount,
totalCount)
println("")
TestResultLines(dimNames, metrics, results.filter(_.succeed)).print()
println("Configurations:")
coordinates.foreach { coord =>
println(s"${coord._1.id}. ${coord._1}")
}
println("")
val succeeded = results.filter(_.succeeded())
TestResultLines(
coordinates.size,
configDimensions,
metrics,
succeeded ++ TestResultLine.aggregate("all", succeeded))
.print()
println("")

if (passedCount == count) {
if (succeededCount == totalCount) {
println("No failed queries. ")
println("")
} else {
println("Failed queries: ")
println("")
TestResultLines(dimNames, metrics, results.filter(!_.succeed)).print()
TestResultLines(coordinates.size, configDimensions, metrics, results.filter(!_.succeeded()))
.print()
println("")
}

if (passedCount != count) {
if (succeededCount != totalCount) {
return false
}
true
Expand All @@ -185,56 +208,84 @@ class Parameterized(
// One dimension key/value pair; used to describe excluded test combinations.
case class DimKv(k: String, v: String)
// A configuration dimension: its name plus every value it can take.
case class Dim(name: String, dimValues: Seq[DimValue])
// A single value of a dimension, with the conf key/value entries it implies.
case class DimValue(name: String, conf: Seq[(String, String)])
case class Coordinate(coordinate: Map[String, String]) // [dim, dim value]

case class TestResultLine(
queryId: String,
succeed: Boolean,
coordinate: Coordinate,
rowCount: Option[Long],
planningTimeMillis: Option[Long],
executionTimeMillis: Option[Long],
metrics: Map[String, Long],
errorMessage: Option[String])
// coordinate: [dim, dim value]
// Identifies one point in the cartesian product of configuration dimensions.
// `id` is a 1-based sequence number assigned in creation order (see the
// AtomicInteger in `Parameterized.coordinates`); `coordinate` maps
// dimension name -> dimension value name.
case class Coordinate(id: Int, coordinate: Map[String, String]) {
  // Renders only the dim/value pairs; the numeric id is printed separately
  // in the "Configurations:" listing of the test report.
  override def toString: String = coordinate.mkString(", ")
}

// Result of running one query across every coordinate (config combination).
// One report-table row corresponds to one instance of this class.
case class TestResultLine(queryId: String, coordinates: Seq[TestResultLine.Coord]) {
  // The line counts as succeeded only when the query passed on ALL coordinates.
  def succeeded(): Boolean = {
    coordinates.forall(_.succeeded)
  }
}

object TestResultLine {
class Parser(dimNames: Seq[String], metricNames: Seq[String])
extends TableRender.RowParser[TestResultLine] {
  // Per-coordinate outcome of one query run: pass/fail plus timings and metrics.
  // The Option fields are None when the run failed (see the failure branch of
  // Parameterized.runQuery, which passes None/Map.empty); errorMessage is
  // populated on failure.
  case class Coord(
      coordinate: Coordinate,
      succeeded: Boolean,
      rowCount: Option[Long],
      planningTimeMillis: Option[Long],
      executionTimeMillis: Option[Long],
      metrics: Map[String, Long],
      errorMessage: Option[String])

class Parser(metricNames: Seq[String]) extends TableRender.RowParser[TestResultLine] {
override def parse(rowAppender: RowAppender, line: TestResultLine): Unit = {
val inc = rowAppender.incremental()
inc.next().write(line.queryId)
inc.next().write(line.succeed)
dimNames.foreach { dimName =>
val coordinate = line.coordinate.coordinate
if (!coordinate.contains(dimName)) {
throw new IllegalStateException("Dimension name not found" + dimName)
}
inc.next().write(coordinate(dimName))
}
metricNames.foreach { metricName =>
val metrics = line.metrics
inc.next().write(metrics.getOrElse(metricName, "N/A"))
}
inc.next().write(line.rowCount.getOrElse("N/A"))
inc.next().write(line.planningTimeMillis.getOrElse("N/A"))
inc.next().write(line.executionTimeMillis.getOrElse("N/A"))
val coords = line.coordinates
coords.foreach(coord => inc.next().write(coord.succeeded))
coords.foreach(coord => inc.next().write(coord.rowCount))
metricNames.foreach(metricName =>
coords.foreach(coord => inc.next().write(coord.metrics(metricName))))
coords.foreach(coord => inc.next().write(coord.planningTimeMillis))
coords.foreach(coord => inc.next().write(coord.executionTimeMillis))
}
}

  /**
   * Folds multiple per-query result lines into a single summary line named
   * `name` (the caller uses "all"), combining them coordinate-by-coordinate.
   * Returns Nil when aggregation is not meaningful: zero input lines, or a
   * single line (which would only duplicate itself in the report).
   */
  def aggregate(name: String, lines: Iterable[TestResultLine]): Iterable[TestResultLine] = {
    // Nothing to aggregate.
    if (lines.isEmpty) {
      return Nil
    }

    // A lone line aggregates to itself; skip to avoid a redundant report row.
    if (lines.size == 1) {
      return Nil
    }

    List(lines.reduce { (left, right) =>
      // All lines are expected to share an identical coordinate layout;
      // zip pairs them positionally and the assert guards that invariant.
      TestResultLine(name, left.coordinates.zip(right.coordinates).map {
        case (leftCoord, rightCoord) =>
          assert(leftCoord.coordinate == rightCoord.coordinate)
          Coord(
            leftCoord.coordinate,
            // Aggregated success requires success on both sides.
            leftCoord.succeeded && rightCoord.succeeded,
            // NOTE(review): `onBothProvided` and `sumUp` are project extension
            // ops not visible in this view — presumably they add the two sides
            // when both are defined (None/missing otherwise); TODO confirm
            // against their definitions.
            (leftCoord.rowCount, rightCoord.rowCount).onBothProvided(_ + _),
            (leftCoord.planningTimeMillis, rightCoord.planningTimeMillis).onBothProvided(_ + _),
            (leftCoord.executionTimeMillis, rightCoord.executionTimeMillis).onBothProvided(_ + _),
            (leftCoord.metrics, rightCoord.metrics).sumUp,
            // Concatenate any error messages from both sides.
            (leftCoord.errorMessage ++ rightCoord.errorMessage).reduceOption(_ + ", " + _))
      })
    })
  }
}

case class TestResultLines(
dimNames: Seq[String],
coordCount: Int,
configDimensions: Seq[Dim],
metricNames: Seq[String],
lines: Iterable[TestResultLine]) {
def print(): Unit = {
val fields = ListBuffer[String]("Query ID", "Succeeded")
dimNames.foreach(dimName => fields.append(dimName))
metricNames.foreach(metricName => fields.append(metricName))
fields.append("Row Count")
fields.append("Planning Time (Millis)")
fields.append("Query Time (Millis)")
val render = TableRender.plain[TestResultLine](fields: _*)(
new TestResultLine.Parser(dimNames, metricNames))
val fields = ListBuffer[Field](Field.Leaf("Query ID"))
val coordFields = (1 to coordCount).map(id => Field.Leaf(id.toString))

fields.append(Field.Branch("Succeeded", coordFields))
fields.append(Field.Branch("Row Count", coordFields))
metricNames.foreach(metricName => fields.append(Field.Branch(metricName, coordFields)))
fields.append(Field.Branch("Planning Time (Millis)", coordFields))
fields.append(Field.Branch("Query Time (Millis)", coordFields))

val render =
TableRender.create[TestResultLine](fields: _*)(new TestResultLine.Parser(metricNames))

lines.foreach { line =>
render.appendRow(line)
Expand All @@ -253,10 +304,10 @@ object Parameterized {
coordinate: Coordinate,
desc: String,
explain: Boolean,
metrics: Array[String]) = {
metrics: Array[String]): TestResultLine.Coord = {
println(s"Running query: $id...")
try {
val testDesc = "Gluten Spark %s %s %s".format(desc, id, coordinate)
val testDesc = "Gluten Spark %s [%s] %s".format(desc, id, coordinate)
sessionSwitcher.useSession(coordinate.toString, testDesc)
runner.createTables(creator, sessionSwitcher.spark())
val result =
Expand All @@ -265,10 +316,9 @@ object Parameterized {
println(
s"Successfully ran query $id. " +
s"Returned row count: ${resultRows.length}")
TestResultLine(
id,
succeed = true,
TestResultLine.Coord(
coordinate,
succeeded = true,
Some(resultRows.length),
Some(result.planningTimeMillis),
Some(result.executionTimeMillis),
Expand All @@ -280,16 +330,16 @@ object Parameterized {
println(
s"Error running query $id. " +
s" Error: ${error.get}")
TestResultLine(id, succeed = false, coordinate, None, None, None, Map.empty, error)
TestResultLine.Coord(coordinate, succeeded = false, None, None, None, Map.empty, error)
}
}

private[integration] def warmUp(
private def warmUp(
runner: QueryRunner,
creator: TableCreator,
id: String,
desc: String,
sessionSwitcher: SparkSessionSwitcher,
runner: QueryRunner): Unit = {
id: String,
desc: String): Unit = {
println(s"Warming up: Running query: $id...")
try {
val testDesc = "Gluten Spark %s %s warm up".format(desc, id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ object Queries {
val inc = rowAppender.incremental()
inc.next().write(line.queryId)
inc.next().write(line.testPassed)
inc.next().write(line.rowCount.getOrElse("N/A"))
inc.next().write(line.planningTimeMillis.getOrElse("N/A"))
inc.next().write(line.executionTimeMillis.getOrElse("N/A"))
inc.next().write(line.rowCount)
inc.next().write(line.planningTimeMillis)
inc.next().write(line.executionTimeMillis)
}
}
}
Expand Down
Loading

0 comments on commit 1c505df

Please sign in to comment.