
Commit e925908

geenen124 and rashtao authored
Add PySpark Integration Tests (#33)
* Add Initial MacOS ArangoDB Starter Script
* test(macos): Create Network When Starting DB
* (test): Add Initial PyTest Setup
* (test): Port Read Tests To Python
* (test): Python BadRecords & CompositeFilter Tests
* (test): Add Deserialization Cast & WIP EqualTo Python
* (test): Add equalTo & IN Tests Python
* (test): Add Python Test CLI Args & Initial Github Actions
* (test): Remove Commented Matrix Python Tests
* (test): Simplify Python Test Matrix
* Update test_python.yml
* Update test_python.yml
* Update test_python.yml
* (test): Add Python Test Requirements File
* (test): Move Python Tests To Separate Dir
* (test): Add More Python Tests & Fix isin Timestamps
* (test): Expand Python Test Matrix
* (test): Python - Remove non-null Requirement For Null Cast
* (test): Add Python SaveMode Tests
* (test): Update Python Test Matrix To Include Cluster & 3.2
  Also changed how PySpark 2.4 is included, to prevent exhaustive exclude cases
* (test): Disable Python Schema Inference Test For Cluster
  This test is not valid in cluster mode (since ordering is not guaranteed)
* (test): Disable PySpark 2.4 Tests For Now
* Update test_python.yml
* ignore CI for changes to demo/**
* merged CI workflows

Co-authored-by: Michele Rastelli <michele@arangodb.com>
1 parent c7ac8ff commit e925908

File tree

17 files changed: +1568 −0 lines changed


.github/workflows/test.yml

Lines changed: 49 additions & 0 deletions
@@ -93,6 +93,55 @@ jobs:
           name: logs.tgz
           path: ./logs.tgz
 
+  test-python:
+    runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        python-version: [3.9]
+        scala-version: [2.12]
+        spark-version: [3.1, 3.2]
+        topology: [single, cluster]
+        java-version: [8, 11]
+        docker-img: ["docker.io/arangodb/arangodb:3.9.1"]
+
+    steps:
+      - uses: actions/checkout@v2
+        with:
+          fetch-depth: 0
+      - uses: actions/setup-java@v2
+        with:
+          java-version: ${{matrix.java-version}}
+          distribution: 'adopt'
+      - uses: actions/setup-python@v4
+        with:
+          python-version: ${{matrix.python-version}}
+      - name: Start Database
+        run: ./docker/start_db.sh
+        env:
+          STARTER_MODE: ${{matrix.topology}}
+          DOCKER_IMAGE: ${{matrix.docker-img}}
+      - name: Cache local Maven repository
+        uses: actions/cache@v2
+        with:
+          path: ~/.m2/repository
+          key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
+          restore-keys: ${{ runner.os }}-maven-
+      - name: Maven Info
+        run: mvn -version
+      - name: Install python dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install pyspark~=${{matrix.spark-version}}.0
+          pip install -r python-integration-tests/test-requirements.txt
+      - name: Build Spark Datasource Artifact
+        run: |
+          mvn -e --no-transfer-progress -DskipTests -Pscala-${{matrix.scala-version}} -Pspark-${{matrix.spark-version}} package
+          find . -name "arangodb-spark-datasource-${{matrix.spark-version}}_*-jar-with-dependencies.jar" -exec cp {} ./arangodb-spark-datasource-under-test.jar \;
+      - name: Run tests for PySpark ${{matrix.spark-version}} on Python ${{matrix.python-version}}
+        run: |
+          pytest python-integration-tests/integration --adb-datasource-jar ./arangodb-spark-datasource-under-test.jar --adb-hostname 172.28.0.1
+
   testSsl:
     timeout-minutes: 5
     runs-on: ubuntu-latest
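The job hands the freshly built jar to the test suite through the --adb-datasource-jar flag. A minimal sketch of how a suite can attach such a jar to a local SparkSession follows; the builder wiring and function name below are assumptions for illustration, not the repository's actual test_basespark fixture:

# Hedged sketch: attach the datasource jar to a local PySpark session.
# "build_spark_session" is a hypothetical helper name.
from pyspark.sql import SparkSession

def build_spark_session(datasource_jar_loc: str) -> SparkSession:
    # spark.jars puts the ArangoDB connector classes on the classpath of
    # the local driver and executors, so .format(...) can resolve it.
    return (SparkSession.builder
            .master("local[*]")
            .config("spark.jars", datasource_jar_loc)
            .getOrCreate())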

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -8,3 +8,4 @@
 **/docker/jwtHeader
 **/docker/jwtSecret
 **/.flattened-pom.xml
+__pycache__/

docker/start_db_macos.sh

Lines changed: 120 additions & 0 deletions
@@ -0,0 +1,120 @@
+#!/bin/bash
+
+# Configuration environment variables:
+# STARTER_MODE: (single|cluster|activefailover), default single
+# DOCKER_IMAGE: ArangoDB docker image, default docker.io/arangodb/arangodb:latest
+# SSL: (true|false), default false
+# DATABASE_EXTENDED_NAMES: (true|false), default false
+# ARANGO_LICENSE_KEY: only required for ArangoDB Enterprise
+
+# EXAMPLE:
+# STARTER_MODE=cluster SSL=true ./start_db.sh
+
+STARTER_MODE=${STARTER_MODE:=single}
+DOCKER_IMAGE=${DOCKER_IMAGE:=docker.io/arangodb/arangodb:latest}
+SSL=${SSL:=false}
+DATABASE_EXTENDED_NAMES=${DATABASE_EXTENDED_NAMES:=false}
+
+STARTER_DOCKER_IMAGE=docker.io/arangodb/arangodb-starter:latest
+GW=172.28.0.1
+LOCALGW=localhost
+docker network create arangodb --subnet 172.28.0.0/16
+
+# exit when any command fails
+set -e
+
+docker pull $STARTER_DOCKER_IMAGE
+docker pull $DOCKER_IMAGE
+
+LOCATION=$(pwd)/$(dirname "$0")
+
+echo "Averysecretword" > "$LOCATION"/jwtSecret
+docker run --rm -v "$LOCATION"/jwtSecret:/jwtSecret "$STARTER_DOCKER_IMAGE" auth header --auth.jwt-secret /jwtSecret > "$LOCATION"/jwtHeader
+AUTHORIZATION_HEADER=$(cat "$LOCATION"/jwtHeader)
+
+STARTER_ARGS=
+SCHEME=http
+ARANGOSH_SCHEME=http+tcp
+COORDINATORS=("$LOCALGW:8529" "$LOCALGW:8539" "$LOCALGW:8549")
+COORDINATORSINTERNAL=("$GW:8529" "$GW:8539" "$GW:8549")
+
+if [ "$STARTER_MODE" == "single" ]; then
+  COORDINATORS=("$LOCALGW:8529")
+  COORDINATORSINTERNAL=("$GW:8529")
+fi
+
+if [ "$SSL" == "true" ]; then
+  STARTER_ARGS="$STARTER_ARGS --ssl.keyfile=server.pem"
+  SCHEME=https
+  ARANGOSH_SCHEME=http+ssl
+fi
+
+if [ "$DATABASE_EXTENDED_NAMES" == "true" ]; then
+  STARTER_ARGS="${STARTER_ARGS} --all.database.extended-names-databases=true"
+fi
+
+if [ "$USE_MOUNTED_DATA" == "true" ]; then
+  STARTER_ARGS="${STARTER_ARGS} --starter.data-dir=/data"
+  MOUNT_DATA="-v $LOCATION/data:/data"
+fi
+
+docker run -d \
+  --name=adb \
+  -p 8528:8528 \
+  -v "$LOCATION"/server.pem:/server.pem \
+  -v "$LOCATION"/jwtSecret:/jwtSecret \
+  $MOUNT_DATA \
+  -v /var/run/docker.sock:/var/run/docker.sock \
+  -e ARANGO_LICENSE_KEY="$ARANGO_LICENSE_KEY" \
+  $STARTER_DOCKER_IMAGE \
+  $STARTER_ARGS \
+  --docker.container=adb \
+  --auth.jwt-secret=/jwtSecret \
+  --starter.address="${GW}" \
+  --docker.image="${DOCKER_IMAGE}" \
+  --starter.local --starter.mode=${STARTER_MODE} --all.log.level=debug --all.log.output=+ --log.verbose
+
+
+wait_server() {
+  # shellcheck disable=SC2091
+  until $(curl --output /dev/null --insecure --fail --silent --head -i -H "$AUTHORIZATION_HEADER" "$SCHEME://$1/_api/version"); do
+    printf '.'
+    sleep 1
+  done
+}
+
+echo "Waiting..."
+
+for a in ${COORDINATORS[*]} ; do
+  wait_server "$a"
+done
+
+set +e
+ITER=0
+for a in ${COORDINATORS[*]} ; do
+  echo ""
+  echo "Setting username and password..."
+  docker run --rm ${DOCKER_IMAGE} arangosh --server.endpoint="$ARANGOSH_SCHEME://${COORDINATORSINTERNAL[ITER]}" --server.authentication=false --javascript.execute-string='require("org/arangodb/users").update("root", "test")'
+  ITER=$(expr $ITER + 1)
+done
+set -e
+
+for a in ${COORDINATORS[*]} ; do
+  echo ""
+  echo "Requesting endpoint version..."
+  curl -u root:test --insecure --fail "$SCHEME://$a/_api/version"
+done
+
+echo ""
+echo ""
+echo "Done, your deployment is reachable at: "
+for a in ${COORDINATORS[*]} ; do
+  echo "$SCHEME://$a"
+  echo ""
+done
+
+if [ "$STARTER_MODE" == "activefailover" ]; then
+  LEADER=$("$LOCATION"/find_active_endpoint.sh)
+  echo "Leader: $SCHEME://$LEADER"
+  echo ""
+fi

python-integration-tests/integration/__init__.py

Whitespace-only changes.
Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+import pytest
+from integration.test_basespark import arangodb_client, database_conn, spark, endpoints, single_endpoint, adb_hostname
+
+
+def pytest_addoption(parser):
+    parser.addoption("--adb-datasource-jar", action="store", dest="datasource_jar_loc", required=True)
+    parser.addoption("--adb-hostname", action="store", dest="adb_hostname", default="localhost")
Lines changed: 137 additions & 0 deletions
@@ -0,0 +1,137 @@
+from typing import Any, Dict, List
+
+import arango.database
+import pytest
+from py4j.protocol import Py4JJavaError
+from pyspark.sql import SparkSession
+from pyspark.sql.types import StructType, StringType, StructField, IntegerType, DoubleType, BooleanType
+
+from integration import test_basespark
+from integration.test_basespark import options, arango_datasource_name
+from integration.utils import combine_dicts
+
+
+content_types = ["vpack", "json"]
+COLLECTION_NAME = "deserializationCast"
+
+
+def do_test_bad_record(db: arango.database.StandardDatabase, spark: SparkSession, schema: StructType, data: List[Dict[str, Any]], json_data: List[str], opts: Dict[str, str]):
+    df_from_json = spark.read.schema(schema).options(**opts).json(spark.sparkContext.parallelize(json_data))
+    df_from_json.show()
+
+    table_df = test_basespark.create_df(db, spark, COLLECTION_NAME, data, schema, opts)
+    assert table_df.collect() == df_from_json.collect()
+
+    query_df = test_basespark.create_query_df(spark, f"RETURN {json_data[0]}", schema, opts)
+    assert query_df.collect() == df_from_json.collect()
+
+
+def check_bad_record(db: arango.database.StandardDatabase, spark: SparkSession, schema: StructType, data: List[Dict[str, Any]], json_data: List[str], content_type: str):
+    # Permissive
+    do_test_bad_record(db, spark, schema, data, json_data, {"contentType": content_type})
+
+    # Permissive with column name of corrupt record
+    do_test_bad_record(
+        db,
+        spark,
+        schema.add(StructField("corruptRecord", StringType())),
+        data,
+        json_data,
+        {
+            "contentType": content_type,
+            "columnNameOfCorruptRecord": "corruptRecord"
+        }
+    )
+
+    # Dropmalformed
+    do_test_bad_record(db, spark, schema, data, json_data,
+                       {
+                           "contentType": content_type,
+                           "mode": "DROPMALFORMED"
+                       })
+
+    # Failfast
+    df = test_basespark.create_df(db, spark, COLLECTION_NAME, data, schema,
+                                  {
+                                      "contentType": content_type,
+                                      "mode": "FAILFAST"
+                                  })
+    with pytest.raises(Py4JJavaError) as e:
+        df.collect()
+
+    e.match("SparkException")
+    e.match("Malformed record")
+    if not spark.version.startswith("2.4"):
+        e.match("BadRecordException")
+
+
+@pytest.mark.parametrize("content_type", content_types)
+def test_string_as_integer(database_conn: arango.database.StandardDatabase, spark: SparkSession, content_type: str):
+    check_bad_record(
+        database_conn,
+        spark,
+        StructType([StructField("a", IntegerType())]),
+        [{"a": "1"}],
+        ['{"a":"1"}'],
+        content_type
+    )
+
+
+@pytest.mark.parametrize("content_type", content_types)
+def test_boolean_as_integer(database_conn: arango.database.StandardDatabase, spark: SparkSession, content_type: str):
+    check_bad_record(
+        database_conn,
+        spark,
+        StructType([StructField("a", IntegerType())]),
+        [{"a": True}],
+        ['{"a":true}'],
+        content_type
+    )
+
+
+@pytest.mark.parametrize("content_type", content_types)
+def test_string_as_double(database_conn: arango.database.StandardDatabase, spark: SparkSession, content_type: str):
+    check_bad_record(
+        database_conn,
+        spark,
+        StructType([StructField("a", DoubleType())]),
+        [{"a": "1"}],
+        ['{"a":"1"}'],
+        content_type
+    )
+
+
+@pytest.mark.parametrize("content_type", content_types)
+def test_boolean_as_double(database_conn: arango.database.StandardDatabase, spark: SparkSession, content_type: str):
+    check_bad_record(
+        database_conn,
+        spark,
+        StructType([StructField("a", DoubleType())]),
+        [{"a": True}],
+        ['{"a":true}'],
+        content_type
+    )
+
+
+@pytest.mark.parametrize("content_type", content_types)
+def test_string_as_boolean(database_conn: arango.database.StandardDatabase, spark: SparkSession, content_type: str):
+    check_bad_record(
+        database_conn,
+        spark,
+        StructType([StructField("a", BooleanType())]),
+        [{"a": "true"}],
+        ['{"a":"true"}'],
+        content_type
+    )
+
+
+@pytest.mark.parametrize("content_type", content_types)
+def test_number_as_boolean(database_conn: arango.database.StandardDatabase, spark: SparkSession, content_type: str):
+    check_bad_record(
+        database_conn,
+        spark,
+        StructType([StructField("a", BooleanType())]),
+        [{"a": 1}],
+        ['{"a":1}'],
+        content_type
+    )
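Each check relies on test_basespark.create_df, which this commit does not include. Assuming it recreates the collection via python-arango and reads the documents back through the datasource under test, it could look roughly like the following; the format name and the "table" option are assumptions for illustration, not the repository's confirmed helper:

# Hypothetical sketch of test_basespark.create_df; the real helper lives in
# integration/test_basespark.py, which is not shown in this diff.
import arango.database
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

def create_df(db: arango.database.StandardDatabase, spark: SparkSession,
              collection_name: str, data, schema: StructType, opts):
    # Recreate the collection and insert the raw documents via python-arango.
    if db.has_collection(collection_name):
        db.delete_collection(collection_name)
    db.create_collection(collection_name).insert_many(data)
    # Read the documents back through the ArangoDB Spark datasource so the
    # deserialization path under test is exercised.
    return (spark.read
            .format("com.arangodb.spark")
            .options(**opts)
            .option("table", collection_name)
            .schema(schema)
            .load())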
