Skip to content

Commit

Permalink
Improve message in case of gcs response failure
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Sep 13, 2024
1 parent eb3ae26 commit 20bcdd5
Showing 1 changed file with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package org.apache.pekko.stream.connectors.google
import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import pekko.http.scaladsl.model.HttpMethods.{ POST, PUT }
import pekko.http.scaladsl.model.StatusCodes.{ Created, OK, PermanentRedirect }
import pekko.http.scaladsl.model._
Expand Down Expand Up @@ -93,10 +94,16 @@ private[connectors] object ResumableUpload {
import implicits._

implicit val um: FromResponseUnmarshaller[Uri] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.discardEntityBytes().future.map { _ =>
response.header[Location].fold(throw InvalidResponseException(ErrorInfo("No Location header")))(_.uri)
}
Unmarshaller.withMaterializer { _ => implicit mat => (response: HttpResponse) =>
if (response.status.isSuccess())
response.discardEntityBytes().future.map { _ =>
response.header[Location].fold(throw InvalidResponseException(ErrorInfo("No Location header")))(_.uri)
}(ExecutionContexts.parasitic)
else
Unmarshal(response.entity).to[String].flatMap { errorString =>
Future.failed(InvalidResponseException(
ErrorInfo(s"Resumable upload failed with status: ${response.status}", s"body: $errorString")))
}(ExecutionContexts.parasitic)
}.withDefaultRetry

GoogleHttp(mat.system).singleAuthenticatedRequest[Uri](request)
Expand Down

0 comments on commit 20bcdd5

Please sign in to comment.