Commit d0e5d6e

[DE-675] read from smart graph edge collections (#47)
* updated test docker images
* deps upd
* 3.10: read from smart edge collection
* fixed tests for Scala 2.13
* CI: added enterprise tests
* 3.11: read from smart edge collection
* fixed compilation for Scala 2.12
1 parent: 13afe99

5 files changed, +131 -22 lines changed
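In practice, the change lets the Spark connector read a smart graph edge collection like any other collection. A minimal usage sketch, not part of the commit, assuming a running ArangoDB Enterprise cluster and the option keys from the connector documentation ("table", "endpoints", "password"); all names, credentials, and endpoints below are illustrative:

import org.apache.spark.sql.SparkSession

object ReadSmartEdgesExample extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("read-smart-edges")
    .getOrCreate()

  // Read a smart graph edge collection; with this commit the connector
  // resolves its shards via the hidden backing collections.
  val df = spark.read
    .format("com.arangodb.spark")
    .option("database", "_system")         // illustrative database
    .option("table", "smartEdgeCol")       // illustrative smart edge collection
    .option("endpoints", "localhost:8529") // illustrative endpoint
    .option("password", "test")
    .load()

  df.select("name", "_from", "_to").show()
  spark.stop()
}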

.github/workflows/test.yml
Lines changed: 18 additions & 11 deletions

@@ -47,9 +47,10 @@ jobs:
           - 8
           - 11
         docker-img:
-          - docker.io/arangodb/arangodb:3.9.10
-          - docker.io/arangodb/arangodb:3.10.6
-          - docker.io/arangodb/arangodb:3.11.0
+          - docker.io/arangodb/arangodb:3.10.10
+          - docker.io/arangodb/enterprise:3.10.10
+          - docker.io/arangodb/arangodb:3.11.3
+          - docker.io/arangodb/enterprise:3.11.3
         exclude:
           - scala-version: 2.11
             spark-version: 3.1
@@ -65,12 +66,18 @@
             spark-version: 2.4
           - scala-version: 2.13
             spark-version: 3.1
-          - docker-img: docker.io/arangodb/arangodb:3.9.10
+          - docker-img: docker.io/arangodb/arangodb:3.10.10
             java-version: 8
-          - docker-img: docker.io/arangodb/arangodb:3.10.6
+          - docker-img: docker.io/arangodb/enterprise:3.10.10
             java-version: 8
-          - docker-img: docker.io/arangodb/arangodb:3.11.0
+          - docker-img: docker.io/arangodb/enterprise:3.10.10
+            topology: single
+          - docker-img: docker.io/arangodb/arangodb:3.11.3
             java-version: 11
+          - docker-img: docker.io/arangodb/enterprise:3.11.3
+            java-version: 8
+          - docker-img: docker.io/arangodb/enterprise:3.11.3
+            topology: single
 
     steps:
       - uses: actions/checkout@v2
@@ -111,7 +118,7 @@ jobs:
         java-version:
          - 8
         docker-img:
-          - docker.io/arangodb/arangodb:3.11.0
+          - docker.io/arangodb/arangodb:3.11.3
         exclude:
           - scala-version: 2.11
             spark-version: 3.1
@@ -157,7 +164,7 @@ jobs:
         spark-version: [3.1, 3.2, 3.3, 3.4]
         topology: [single, cluster]
         java-version: [8, 11]
-        docker-img: ["docker.io/arangodb/arangodb:3.11.0"]
+        docker-img: ["docker.io/arangodb/arangodb:3.11.3"]
         exclude:
           - topology: cluster
             java-version: 8
@@ -218,7 +225,7 @@ jobs:
           - 8
           - 11
         docker-img:
-          - docker.io/arangodb/arangodb:3.11.0
+          - docker.io/arangodb/arangodb:3.11.3
         exclude:
           - scala-version: 2.11
             spark-version: 3.1
@@ -344,7 +351,7 @@ jobs:
        run: ./docker/start_db.sh
         env:
           STARTER_MODE: cluster
-          DOCKER_IMAGE: docker.io/arangodb/arangodb:3.11.0
+          DOCKER_IMAGE: docker.io/arangodb/arangodb:3.11.3
       - name: Info
         run: mvn -version
       - name: Install
@@ -368,7 +375,7 @@ jobs:
         java-version:
           - 11
         docker-img:
-          - docker.io/arangodb/arangodb:3.11.0
+          - docker.io/arangodb/arangodb:3.11.3
 
     steps:
       - uses: actions/checkout@v2

arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala
Lines changed: 24 additions & 5 deletions

@@ -206,13 +206,22 @@ object ArangoClient extends Logging {
     val client = ArangoClient(options)
     val adb = client.arangoDB
     try {
-      val res = adb.execute(new Request.Builder[Void]()
+      val colName = options.readOptions.collection.get
+      val props = adb.execute(new Request.Builder[Void]()
         .db(options.readOptions.db)
         .method(Request.Method.GET)
-        .path(s"/_api/collection/${options.readOptions.collection.get}/shards")
+        .path(s"/_api/collection/$colName/properties")
         .build(),
-        classOf[RawBytes])
-      val shardIds: Array[String] = adb.getSerde.deserialize(res.getBody.get, "/shards", classOf[Array[String]])
+        classOf[java.util.Map[String, Any]]).getBody
+
+      val shardIds: Array[String] =
+        if (props.get("isSmart") == true && props.get("type") == 3) {
+          // Smart Edge collection (BTS-1595, BTS-1596)
+          requestShards(adb, options.readOptions.db, s"_local_$colName") ++
+            requestShards(adb, options.readOptions.db, s"_from_$colName")
+        } else {
+          requestShards(adb, options.readOptions.db, colName)
+        }
       client.shutdown()
       shardIds
     } catch {
@@ -221,13 +230,23 @@ object ArangoClient extends Logging {
         // single server < 3.8 returns Response: 500, Error: 4 - internal error
         // single server >= 3.8 returns Response: 501, Error: 9 - shards API is only available in a cluster
         if (INTERNAL_ERROR_CODE.equals(e.getErrorNum) || SHARDS_API_UNAVAILABLE_CODE.equals(e.getErrorNum)) {
-          Array("")
+          Array(null)
         } else {
           throw e
         }
     }
   }
 
+  private def requestShards(adb: ArangoDB, db: String, col: String): Array[String] = {
+    val res = adb.execute(new Request.Builder[Void]()
+      .db(db)
+      .method(Request.Method.GET)
+      .path(s"/_api/collection/$col/shards")
+      .build(),
+      classOf[RawBytes])
+    adb.getSerde.deserialize(res.getBody.get, "/shards", classOf[Array[String]])
+  }
+
   def acquireHostList(options: ArangoDBConf): Iterable[String] = {
     logDebug("acquiring host list")
     val client = ArangoClient(options)
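The key change above: before listing shards, the shard lookup now fetches the collection's properties; a collection with isSmart == true and type == 3 (type 3 denotes an edge collection) is a smart edge collection, backed by hidden collections named _local_<name> and _from_<name>, and shards are requested for those instead. A standalone sketch of the detection step, not part of the commit, assuming arangodb-java-driver 7.x and illustrative connection values:

import com.arangodb.{ArangoDB, Request}

object SmartEdgeShardTargets extends App {
  // Illustrative connection; adjust host and credentials as needed.
  val adb: ArangoDB = new ArangoDB.Builder()
    .host("localhost", 8529)
    .user("root")
    .build()

  val db = "_system"        // illustrative database name
  val col = "smartEdgeCol"  // illustrative collection name

  // GET /_api/collection/<col>/properties, as in the patched shard lookup
  val props = adb.execute(
    new Request.Builder[Void]()
      .db(db)
      .method(Request.Method.GET)
      .path(s"/_api/collection/$col/properties")
      .build(),
    classOf[java.util.Map[String, Any]]).getBody

  // Smart edge data lives in hidden backing collections, so their shards
  // must be read instead of the logical collection's.
  val targets: Seq[String] =
    if (props.get("isSmart") == true && props.get("type") == 3)
      Seq(s"_local_$col", s"_from_$col")
    else
      Seq(col)

  println(s"collections whose shards will be read: ${targets.mkString(", ")}")
  adb.shutdown()
}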

integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/BaseSparkTest.scala
Lines changed: 8 additions & 4 deletions

@@ -1,6 +1,6 @@
 package org.apache.spark.sql.arangodb.datasource
 
-import com.arangodb.entity.ServerRole
+import com.arangodb.entity.{License, ServerRole}
 import com.arangodb.model.CollectionCreateOptions
 import com.arangodb.serde.jackson.JacksonSerde
 import com.arangodb.spark.DefaultSource
@@ -33,7 +33,9 @@ class BaseSparkTest {
 
   def isSingle: Boolean = BaseSparkTest.isSingle
 
-  def isCluster: Boolean = !BaseSparkTest.isSingle
+  def isCluster: Boolean = BaseSparkTest.isCluster
+
+  def isEnterprise: Boolean = BaseSparkTest.isEnterprise
 }
 
 object BaseSparkTest {
@@ -78,8 +80,10 @@ object BaseSparkTest {
       .serde(serde)
       .build()
   }
-  private val db: ArangoDatabase = arangoDB.db(database)
-  private val isSingle: Boolean = arangoDB.getRole == ServerRole.SINGLE
+  val db: ArangoDatabase = arangoDB.db(database)
+  val isSingle: Boolean = arangoDB.getRole == ServerRole.SINGLE
+  val isCluster: Boolean = !isSingle
+  val isEnterprise: Boolean = arangoDB.getVersion.getLicense == License.ENTERPRISE
   private val options = Map(
     "database" -> database,
     "user" -> user,
integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadSmartEdgeCollectionTest.scala
Lines changed: 79 additions & 0 deletions (new file)

@@ -0,0 +1,79 @@
+package org.apache.spark.sql.arangodb.datasource
+
+import com.arangodb.entity.EdgeDefinition
+import com.arangodb.model.GraphCreateOptions
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.arangodb.commons.ArangoDBConf
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.Assumptions.assumeTrue
+import org.junit.jupiter.api.BeforeAll
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.MethodSource
+
+import java.util
+import scala.collection.JavaConverters.asJavaIterableConverter
+import scala.collection.immutable
+
+class ReadSmartEdgeCollectionTest extends BaseSparkTest {
+
+  @ParameterizedTest
+  @MethodSource(Array("provideProtocolAndContentType"))
+  def readSmartEdgeCollection(protocol: String, contentType: String): Unit = {
+    val df: DataFrame = spark.read
+      .format(BaseSparkTest.arangoDatasource)
+      .options(options + (
+        ArangoDBConf.COLLECTION -> ReadSmartEdgeCollectionTest.name,
+        ArangoDBConf.PROTOCOL -> protocol,
+        ArangoDBConf.CONTENT_TYPE -> contentType
+      ))
+      .load()
+
+
+    import spark.implicits._
+    val read = df
+      .as[Edge]
+      .collect()
+
+    assertThat(read.map(_.name)).containsAll(ReadSmartEdgeCollectionTest.data.map(d => d("name")).asJava)
+  }
+
+}
+
+object ReadSmartEdgeCollectionTest {
+  val name = "smartEdgeCol"
+  val from = s"from-$name"
+  val to = s"from-$name"
+
+  val data: immutable.Seq[Map[String, String]] = (1 to 10)
+    .map(x => Map(
+      "name" -> s"name-$x",
+      "_from" -> s"$from/a:$x",
+      "_to" -> s"$to/b:$x"
+    ))
+
+  @BeforeAll
+  def init(): Unit = {
+    assumeTrue(BaseSparkTest.isCluster && BaseSparkTest.isEnterprise)
+
+    if (BaseSparkTest.db.graph(name).exists()) {
+      BaseSparkTest.db.graph(name).drop(true)
+    }
+
+    val ed = new EdgeDefinition()
+      .collection(name)
+      .from(from)
+      .to(to)
+    val opts = new GraphCreateOptions()
+      .numberOfShards(2)
+      .isSmart(true)
+      .smartGraphAttribute("name")
+    BaseSparkTest.db.createGraph(name, List(ed).asJava.asInstanceOf[util.Collection[EdgeDefinition]], opts)
+    BaseSparkTest.db.collection(name).insertDocuments(data.asJava.asInstanceOf[util.Collection[Any]])
+  }
+}
+
+case class Edge(
+  name: String,
+  _from: String,
+  _to: String
+)
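Note that init() gates the test with assumeTrue, so it is reported as skipped rather than failed on deployments that cannot host smart graphs (single servers or the Community edition). A minimal sketch of the same gating pattern for other enterprise-only tests; the class and method names here are hypothetical:

import org.junit.jupiter.api.Assumptions.assumeTrue
import org.junit.jupiter.api.Test

class EnterpriseOnlyExampleTest extends BaseSparkTest {
  @Test
  def enterpriseClusterOnlyFeature(): Unit = {
    // Aborts the test as skipped unless running against an Enterprise cluster.
    assumeTrue(BaseSparkTest.isCluster && BaseSparkTest.isEnterprise)
    // ... exercise enterprise-only behavior here ...
  }
}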

pom.xml
Lines changed: 2 additions & 2 deletions

@@ -111,7 +111,7 @@
         <profile>
             <id>spark-3.3</id>
             <properties>
-                <spark.version>3.3.2</spark.version>
+                <spark.version>3.3.3</spark.version>
                 <spark.compat.version>3.3</spark.compat.version>
                 <jackson.vpack.variant/>
                 <jackson.vpack.version>4.1.0</jackson.vpack.version>
@@ -120,7 +120,7 @@
         <profile>
             <id>spark-3.4</id>
             <properties>
-                <spark.version>3.4.0</spark.version>
+                <spark.version>3.4.1</spark.version>
                 <spark.compat.version>3.4</spark.compat.version>
                 <jackson.vpack.variant/>
                 <jackson.vpack.version>4.1.0</jackson.vpack.version>
