
Commit 5ea55d7

previous attempts exceptions in ArangoDBDataWriterException (#42)
1 parent: 1962277

8 files changed: +64 -7 lines

ChangeLog.md

Lines changed: 2 additions & 0 deletions

@@ -5,6 +5,8 @@ to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
 
+- added previous attempts exceptions in `ArangoDBDataWriterException`
+
 ## [1.4.2] - 2023-03-16
 
 - added debug header `x-arango-spark-request-id`
arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/exceptions/ArangoDBDataWriterException.scala

Lines changed: 22 additions & 2 deletions

@@ -1,4 +1,24 @@
 package org.apache.spark.sql.arangodb.commons.exceptions
 
-class ArangoDBDataWriterException(val exception: Exception, val attempts: Int = 1)
-  extends RuntimeException(s"Failed $attempts times, most recent failure: $exception", exception)
+/**
+ * Exception thrown after all write attempts have failed.
+ * It contains the exceptions thrown at each attempt.
+ *
+ * @param exceptions array of exceptions thrown at each attempt
+ */
+class ArangoDBDataWriterException(val exceptions: Array[Exception])
+  extends RuntimeException(s"Failed ${exceptions.length} times: ${ArangoDBDataWriterException.toMessage(exceptions)}") {
+  val attempts: Int = exceptions.length
+
+  override def getCause: Throwable = exceptions(0)
+}
+
+private object ArangoDBDataWriterException {
+
+  // creates the aggregated exception message, one entry per attempt
+  private def toMessage(exceptions: Array[Exception]): String = exceptions
+    .zipWithIndex
+    .map(it => s"""Attempt #${it._2 + 1}: ${it._1}""")
+    .mkString("[\n\t", ",\n\t", "\n]")
+
+}
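For context, a minimal sketch (not part of this commit) of what the new class surfaces; the demo object and the two sample exceptions are invented for illustration:

import org.apache.spark.sql.arangodb.commons.exceptions.ArangoDBDataWriterException

object DataWriterExceptionDemo extends App {
  // hypothetical failures, one per write attempt, in chronological order
  val perAttempt: Array[Exception] = Array(
    new java.net.ConnectException("Connection refused: host1:8529"),
    new java.util.concurrent.TimeoutException("request timed out")
  )
  val e = new ArangoDBDataWriterException(perAttempt)

  println(e.attempts) // 2
  println(e.getCause) // first attempt's failure: java.net.ConnectException: ...
  println(e.getMessage)
  // Failed 2 times: [
  //     Attempt #1: java.net.ConnectException: Connection refused: host1:8529,
  //     Attempt #2: java.util.concurrent.TimeoutException: request timed out
  // ]
}

Callers that previously read the single `exception` field can use `getCause` (the first attempt) or iterate `exceptions` for the full history.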

arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala

Lines changed: 4 additions & 1 deletion

@@ -21,6 +21,7 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I
   extends DataWriter[InternalRow] with Logging {
 
   private var failures = 0
+  private var exceptions: List[Exception] = List()
   private var requestCount = 0L
   private var endpointIdx = partitionId
   private val endpoints = Stream.continually(options.driverOptions.endpoints).flatten
@@ -90,10 +91,12 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I
       client.saveDocuments(payload)
       logDebug(s"Received response #$requestCount for partition $partitionId")
       failures = 0
+      exceptions = List()
     } catch {
       case e: Exception =>
         client.shutdown()
         failures += 1
+        exceptions = e :: exceptions
         endpointIdx += 1
         if ((canRetry || isConnectionException(e)) && failures < options.writeOptions.maxAttempts) {
           val delay = computeDelay()
@@ -102,7 +105,7 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I
           client = createClient()
           saveDocuments(payload)
         } else {
-          throw new ArangoDBDataWriterException(e, failures)
+          throw new ArangoDBDataWriterException(exceptions.reverse.toArray)
         }
     }
   }
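The retry flow above, reduced to its essence: each failure is prepended to an immutable list (an O(1) operation), and only once retries are exhausted is the list reversed back into chronological order and wrapped. A simplified, self-contained sketch; `saveWithRetries` and `save` are stand-ins for the writer's recursive `saveDocuments` call, and the real code also gates retries on `canRetry`/`isConnectionException` and rotates endpoints:

import org.apache.spark.sql.arangodb.commons.exceptions.ArangoDBDataWriterException

def saveWithRetries(maxAttempts: Int)(save: () => Unit): Unit = {
  // recursion depth is bounded by maxAttempts
  def attempt(failures: Int, errors: List[Exception]): Unit =
    try save()
    catch {
      case e: Exception =>
        val collected = e :: errors // newest first while accumulating
        if (failures + 1 < maxAttempts) attempt(failures + 1, collected)
        else {
          // reverse restores chronological order, so exceptions(0),
          // i.e. getCause, is the first attempt's failure
          throw new ArangoDBDataWriterException(collected.reverse.toArray)
        }
    }
  attempt(0, Nil)
}

The mutable `failures`/`exceptions` fields in the actual writer implement the same accounting across the recursive `saveDocuments` calls.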

arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala

Lines changed: 4 additions & 1 deletion

@@ -21,6 +21,7 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I
   extends DataWriter[InternalRow] with Logging {
 
   private var failures = 0
+  private var exceptions: List[Exception] = List()
   private var requestCount = 0L
   private var endpointIdx = partitionId
   private val endpoints = Stream.continually(options.driverOptions.endpoints).flatten
@@ -91,10 +92,12 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I
       client.saveDocuments(payload)
       logDebug(s"Received response #$requestCount for partition $partitionId")
       failures = 0
+      exceptions = List()
     } catch {
       case e: Exception =>
         client.shutdown()
         failures += 1
+        exceptions = e :: exceptions
         endpointIdx += 1
         if ((canRetry || isConnectionException(e)) && failures < options.writeOptions.maxAttempts) {
           val delay = computeDelay()
@@ -103,7 +106,7 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I
           client = createClient()
           saveDocuments(payload)
         } else {
-          throw new ArangoDBDataWriterException(e, failures)
+          throw new ArangoDBDataWriterException(exceptions.reverse.toArray)
         }
     }
  }

arangodb-spark-datasource-3.2/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala

Lines changed: 4 additions & 1 deletion

@@ -21,6 +21,7 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I
   extends DataWriter[InternalRow] with Logging {
 
   private var failures = 0
+  private var exceptions: List[Exception] = List()
   private var requestCount = 0L
   private var endpointIdx = partitionId
   private val endpoints = Stream.continually(options.driverOptions.endpoints).flatten
@@ -91,10 +92,12 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I
       client.saveDocuments(payload)
       logDebug(s"Received response #$requestCount for partition $partitionId")
       failures = 0
+      exceptions = List()
     } catch {
       case e: Exception =>
         client.shutdown()
         failures += 1
+        exceptions = e :: exceptions
         endpointIdx += 1
         if ((canRetry || isConnectionException(e)) && failures < options.writeOptions.maxAttempts) {
           val delay = computeDelay()
@@ -103,7 +106,7 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I
           client = createClient()
           saveDocuments(payload)
         } else {
-          throw new ArangoDBDataWriterException(e, failures)
+          throw new ArangoDBDataWriterException(exceptions.reverse.toArray)
        }
     }
  }

bin/clean.sh

Lines changed: 7 additions & 0 deletions

@@ -0,0 +1,7 @@
+#!/bin/bash
+
+mvn clean -Pspark-2.4 -Pscala-2.11
+mvn clean -Pspark-2.4 -Pscala-2.12
+mvn clean -Pspark-3.1 -Pscala-2.12
+mvn clean -Pspark-3.2 -Pscala-2.12
+mvn clean -Pspark-3.2 -Pscala-2.13

bin/test.sh

Lines changed: 19 additions & 0 deletions

@@ -0,0 +1,19 @@
+#!/bin/bash
+
+# exit when any command fails
+set -e
+
+mvn clean -Pspark-2.4 -Pscala-2.11
+mvn test -Pspark-2.4 -Pscala-2.11
+
+mvn clean -Pspark-2.4 -Pscala-2.12
+mvn test -Pspark-2.4 -Pscala-2.12
+
+mvn clean -Pspark-3.1 -Pscala-2.12
+mvn test -Pspark-3.1 -Pscala-2.12
+
+mvn clean -Pspark-3.2 -Pscala-2.12
+mvn test -Pspark-3.2 -Pscala-2.12
+
+mvn clean -Pspark-3.2 -Pscala-2.13
+mvn test -Pspark-3.2 -Pscala-2.13

integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/write/AbortTest.scala

Lines changed: 2 additions & 2 deletions

@@ -87,7 +87,7 @@ class AbortTest extends BaseSparkTest {
   assertThat(thrown).isInstanceOf(classOf[SparkException])
   assertThat(thrown.getCause.getCause).isInstanceOf(classOf[ArangoDBDataWriterException])
   assertThat(thrown.getCause.getCause.asInstanceOf[ArangoDBDataWriterException].attempts).isEqualTo(1)
-  assertThat(thrown.getCause.getCause.getMessage).contains("Failed 1 times, most recent failure:")
+  assertThat(thrown.getCause.getCause.getMessage).contains("Failed 1 times:")
 
   val validInserted = db.query(
     s"""FOR d IN $collectionName FILTER d.name == "valid" RETURN d""",
@@ -121,7 +121,7 @@ class AbortTest extends BaseSparkTest {
   assertThat(thrown).isInstanceOf(classOf[SparkException])
   assertThat(thrown.getCause.getCause).isInstanceOf(classOf[ArangoDBDataWriterException])
   assertThat(thrown.getCause.getCause.asInstanceOf[ArangoDBDataWriterException].attempts).isEqualTo(10)
-  assertThat(thrown.getCause.getCause.getMessage).contains("Failed 10 times, most recent failure:")
+  assertThat(thrown.getCause.getCause.getMessage).contains("Failed 10 times:")
 
   val rootEx = thrown.getCause.getCause.getCause
   assertThat(rootEx).isInstanceOf(classOf[ArangoDBMultiException])
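A hypothetical follow-up assertion (not part of this commit) showing how the new API lets a test inspect every attempt's failure individually, under the same `thrown` setup as above:

val writerEx = thrown.getCause.getCause.asInstanceOf[ArangoDBDataWriterException]
assertThat(writerEx.exceptions).hasSize(writerEx.attempts)
writerEx.exceptions.zipWithIndex.foreach { case (e, i) =>
  // exceptions are in chronological order: index 0 is the first attempt
  println(s"attempt ${i + 1}: ${e.getClass.getSimpleName}: ${e.getMessage}")
}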
