Commit 3495ea9

Add PySpark Demo (#34)
1 parent e925908 commit 3495ea9

8 files changed: +307 -0 lines changed

demo/README.md

Lines changed: 28 additions & 0 deletions

@@ -9,6 +9,8 @@ This demo is composed of 3 parts:
 - `ReadWriteDemo`: reads the ArangoDB collections created above as Spark Dataframes, applies projections and filtering,
   writes to a new ArangoDB collection
 
+Demos are available in both Scala and Python (using PySpark), as outlined below.
+
 ## Requirements
 
 This demo requires:
@@ -17,6 +19,9 @@ This demo requires:
 - `maven`
 - `docker`
 
+For the Python demo, you will also need:
+- `python`
+
 ## Prepare the environment
 
 Set environment variables:
@@ -79,3 +84,26 @@ docker run -it --rm \
   --packages="com.arangodb:arangodb-spark-datasource-3.2_2.12:$ARANGO_SPARK_VERSION" \
   --class Demo /demo/target/demo-$ARANGO_SPARK_VERSION.jar
 ```
+
+## Python (PySpark) Demo
+
+This demo requires the same environment setup as outlined above.
+Additionally, the Python requirements need to be installed as follows:
+```shell
+pip install -r ./python-demo/requirements.txt
+```
+
+To run the PySpark demo, run:
+```shell
+python ./python-demo/demo.py \
+  --endpoints=172.28.0.1:8529,172.28.0.1:8539,172.28.0.1:8549
+```
+
+To run it against an Oasis deployment, run:
+```shell
+python ./python-demo/demo.py \
+  --password=<root-password> \
+  --endpoints=<endpoint> \
+  --ssl-enabled=true \
+  --ssl-cert-value=<base64-encoded-cert>
+```
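
For orientation, each of the CLI flags above maps onto an ArangoDB Spark DataSource option inside `demo.py` (shown below). A minimal sketch of the resulting options dictionary, with placeholder values in place of real credentials:

```python
# Sketch only: the base options that demo.py builds from the CLI flags
# (see create_base_arangodb_datasource_opts below); values are placeholders.
base_opts = {
    "password": "<root-password>",              # --password
    "endpoints": "<endpoint>",                  # --endpoints (comma-separated host:port list)
    "ssl.enabled": "true",                      # --ssl-enabled
    "ssl.cert.value": "<base64-encoded-cert>",  # --ssl-cert-value
}
```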

demo/python-demo/demo.py

Lines changed: 56 additions & 0 deletions

import os
import pathlib
from argparse import ArgumentParser
from typing import Dict

from pyspark.sql import SparkSession

from read_write_demo import read_write_demo
from read_demo import read_demo
from write_demo import write_demo


def create_spark_session() -> SparkSession:
    # Here we can initialize the spark session, and in doing so,
    # include the ArangoDB Spark DataSource package
    arango_spark_version = os.environ["ARANGO_SPARK_VERSION"]

    spark = SparkSession.builder \
        .appName("ArangoDBPySparkDataTypesExample") \
        .master("local[*]") \
        .config("spark.jars.packages", f"com.arangodb:arangodb-spark-datasource-3.2_2.12:{arango_spark_version}") \
        .getOrCreate()

    return spark


def create_base_arangodb_datasource_opts(password: str, endpoints: str, ssl_enabled: str, ssl_cert_value: str) -> Dict[str, str]:
    return {
        "password": password,
        "endpoints": endpoints,
        "ssl.enabled": ssl_enabled,
        "ssl.cert.value": ssl_cert_value,
    }


def main():
    parser = ArgumentParser()
    parser.add_argument("--import-path", default=None)
    parser.add_argument("--password", default="test")
    parser.add_argument("--endpoints", default="localhost:8529")
    parser.add_argument("--ssl-enabled", default="false")
    parser.add_argument("--ssl-cert-value", default="")
    args = parser.parse_args()

    if args.import_path is None:
        args.import_path = pathlib.Path(__file__).resolve().parent.parent / "docker" / "import"

    spark = create_spark_session()
    base_opts = create_base_arangodb_datasource_opts(args.password, args.endpoints, args.ssl_enabled, args.ssl_cert_value)
    write_demo(spark, base_opts, args.import_path)
    read_demo(spark, base_opts)
    read_write_demo(spark, base_opts)


if __name__ == "__main__":
    main()

demo/python-demo/read_demo.py

Lines changed: 63 additions & 0 deletions

from typing import Dict

import pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

from schemas import movie_schema
from utils import combine_dicts


def read_demo(spark: SparkSession, base_opts: Dict[str, str]):
    movies_df = read_collection(spark, "movies", base_opts, movie_schema)

    print("Read table: history movies or documentaries about 'World War' released from 2000-01-01")
    # We can get to what we want in 2 different ways:
    # First, the PySpark dataframe way...
    movies_df \
        .select("title", "releaseDate", "genre", "description") \
        .filter("genre IN ('History', 'Documentary') AND description LIKE '%World War%' AND releaseDate > '2000'") \
        .show()

    # Second, in the Pandas on Spark way...
    movies_pd_df = movies_df.to_pandas_on_spark()
    subset = movies_pd_df[["title", "releaseDate", "genre", "description"]]
    recent_ww_movies = subset[subset["genre"].isin(["History", "Documentary"])
                              & (subset["releaseDate"] >= '2000')
                              & subset["description"].str.contains("World War")]
    print(recent_ww_movies)

    print("Read query: actors of movies directed by Clint Eastwood with related movie title and interpreted role")
    read_aql_query(
        spark,
        """WITH movies, persons
        FOR v, e, p IN 2 ANY "persons/1062" OUTBOUND directed, INBOUND actedIn
        RETURN {movie: p.vertices[1].title, name: v.name, role: p.edges[1].name}
        """,
        base_opts,
        StructType([
            StructField("movie", StringType()),
            StructField("name", StringType()),
            StructField("role", StringType())
        ])
    ).show(20, 200)


def read_collection(spark: SparkSession, collection_name: str, base_opts: Dict[str, str], schema: StructType) -> pyspark.sql.DataFrame:
    arangodb_datasource_options = combine_dicts([base_opts, {"table": collection_name}])

    return spark.read \
        .format("com.arangodb.spark") \
        .options(**arangodb_datasource_options) \
        .schema(schema) \
        .load()


def read_aql_query(spark: SparkSession, query: str, base_opts: Dict[str, str], schema: StructType) -> pyspark.sql.DataFrame:
    arangodb_datasource_options = combine_dicts([base_opts, {"query": query}])

    return spark.read \
        .format("com.arangodb.spark") \
        .options(**arangodb_datasource_options) \
        .schema(schema) \
        .load()
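
As a side note, the string-based projection and filter in `read_demo` above could equally be written with typed column expressions. A minimal sketch, assuming a `movies_df` DataFrame read with `movie_schema`:

```python
# Sketch only: the same "history/documentary World War movies since 2000"
# selection as above, using pyspark.sql column expressions instead of a SQL string.
from pyspark.sql import DataFrame, functions as F


def recent_ww_movies(movies_df: DataFrame) -> DataFrame:
    return (
        movies_df
        .select("title", "releaseDate", "genre", "description")
        .filter(
            F.col("genre").isin("History", "Documentary")
            & F.col("description").contains("World War")
            & (F.col("releaseDate") >= "2000-01-01")
        )
    )
```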

demo/python-demo/read_write_demo.py

Lines changed: 20 additions & 0 deletions

from typing import Dict

from pyspark.sql import SparkSession

import read_demo
import write_demo
from schemas import movie_schema


def read_write_demo(spark: SparkSession, opts: Dict[str, str]):
    print("-----------------------")
    print("--- READ-WRITE DEMO ---")
    print("-----------------------")

    print("Reading 'movies' collection and writing 'actionMovies' collection...")
    action_movies_df = read_demo.read_collection(spark, "movies", opts, movie_schema)\
        .select("_key", "title", "releaseDate", "runtime", "description")\
        .filter("genre = 'Action'")
    write_demo.save_df(action_movies_df.to_pandas_on_spark(), "actionMovies", opts)
    print("You can now view the actionMovies collection in ArangoDB!")

demo/python-demo/requirements.txt

Lines changed: 1 addition & 0 deletions

pyspark[pandas_on_spark]==3.2.1

demo/python-demo/schemas.py

Lines changed: 50 additions & 0 deletions

from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DateType, IntegerType

movie_schema: StructType = StructType([
    StructField("_id", StringType(), nullable=False),
    StructField("_key", StringType(), nullable=False),
    StructField("description", StringType()),
    StructField("genre", StringType()),
    StructField("homepage", StringType()),
    StructField("imageUrl", StringType()),
    StructField("imdbId", StringType()),
    StructField("language", StringType()),
    StructField("lastModified", TimestampType()),
    StructField("releaseDate", DateType()),
    StructField("runtime", IntegerType()),
    StructField("studio", StringType()),
    StructField("tagline", StringType()),
    StructField("title", StringType()),
    StructField("trailer", StringType())
])
person_schema: StructType = StructType([
    StructField("_id", StringType(), nullable=False),
    StructField("_key", StringType(), nullable=False),
    StructField("biography", StringType()),
    StructField("birthday", DateType()),
    StructField("birthplace", StringType()),
    StructField("lastModified", TimestampType()),
    StructField("name", StringType()),
    StructField("profileImageUrl", StringType())
])
edges_schema: StructType = StructType([
    StructField("_key", StringType(), nullable=False),
    StructField("_from", StringType(), nullable=False),
    StructField("_to", StringType(), nullable=False),
    StructField("$label", StringType()),
    StructField("name", StringType()),
    StructField("type", StringType()),
])
acts_in_schema: StructType = StructType([
    StructField("_id", StringType(), nullable=False),
    StructField("_key", StringType(), nullable=False),
    StructField("_from", StringType(), nullable=False),
    StructField("_to", StringType(), nullable=False),
    StructField("name", StringType())
])
directed_schema: StructType = StructType([
    StructField("_id", StringType(), nullable=False),
    StructField("_key", StringType(), nullable=False),
    StructField("_from", StringType(), nullable=False),
    StructField("_to", StringType(), nullable=False)
])
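
Note that `write_demo.py` (below) selects columns with slices like `movie_schema.fieldNames()[1:]`, which simply drops the leading `_id` field. A quick illustration, using a small stand-in schema rather than the ones above:

```python
from pyspark.sql.types import StructType, StructField, StringType

# Illustration only: fieldNames() returns the field names in declaration order,
# so slicing off the first entry removes "_id" (the pattern used in write_demo.py).
example_schema = StructType([
    StructField("_id", StringType(), nullable=False),
    StructField("_key", StringType(), nullable=False),
    StructField("title", StringType()),
])

print(example_schema.fieldNames())      # ['_id', '_key', 'title']
print(example_schema.fieldNames()[1:])  # ['_key', 'title']
```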

demo/python-demo/utils.py

Lines changed: 5 additions & 0 deletions

def combine_dicts(list_of_dicts):
    whole_dict = {}
    for d in list_of_dicts:
        whole_dict.update(d)
    return whole_dict
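
For reference, a minimal usage sketch of `combine_dicts`: the read and write helpers merge the shared connection options with per-call options such as `table` or `query`, with later dictionaries overriding earlier ones.

```python
from utils import combine_dicts

# Later dicts win on key collisions, since dict.update is applied in list order.
base_opts = {"password": "test", "endpoints": "localhost:8529"}
read_opts = combine_dicts([base_opts, {"table": "movies"}])
print(read_opts)  # {'password': 'test', 'endpoints': 'localhost:8529', 'table': 'movies'}
```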

demo/python-demo/write_demo.py

Lines changed: 84 additions & 0 deletions

import datetime
import pathlib
from typing import Dict

from pyspark import pandas as ps
from pyspark.sql import SparkSession, functions as f
from pyspark.sql.types import StructType

from utils import combine_dicts
from schemas import person_schema, movie_schema, directed_schema, acts_in_schema


def save_df(ps_df, table_name: str, options: Dict[str, str], table_type: str = None) -> None:
    if not table_type:
        table_type = "document"

    all_opts = combine_dicts([options, {
        "table.shards": "9",
        "confirmTruncate": "true",
        "overwriteMode": "replace",
        "table": table_name,
        "table.type": table_type
    }])

    ps_df.to_spark()\
        .write\
        .mode("overwrite")\
        .format("com.arangodb.spark")\
        .options(**all_opts)\
        .save()


def write_demo(spark: SparkSession, save_opts: Dict[str, str], import_path_str: str):
    import_path = pathlib.Path(import_path_str)

    print("Read Nodes from JSONL using Pandas on Spark API")
    nodes_pd_df = ps.read_json(str(import_path / "nodes.jsonl"))
    nodes_pd_df = nodes_pd_df[nodes_pd_df["_key"].notnull()]
    nodes_pd_df["releaseDate"] = ps.to_datetime(nodes_pd_df["releaseDate"], unit="ms")
    nodes_pd_df["birthday"] = ps.to_datetime(nodes_pd_df["birthday"], unit="ms")

    def convert_to_timestamp(to_modify, column):
        tz_aware_datetime = datetime.datetime.utcfromtimestamp(
            int(to_modify[column])/1000
        ).replace(tzinfo=datetime.timezone.utc).astimezone(tz=None)
        tz_naive = tz_aware_datetime.replace(tzinfo=None)
        to_modify[column] = tz_naive
        return to_modify

    nodes_pd_df = nodes_pd_df.apply(convert_to_timestamp, axis=1, args=("lastModified",))

    nodes_df = nodes_pd_df.to_spark()
    nodes_pd_df = nodes_df\
        .withColumn("releaseDate", f.to_date(nodes_df["releaseDate"])) \
        .withColumn("birthday", f.to_date(nodes_df["birthday"])) \
        .to_pandas_on_spark()

    print("Read Edges from JSONL using PySpark API")
    edges_df = spark.read.json(str(import_path / "edges.jsonl"))
    # apply the schema to change nullability of _key, _from, and _to columns in schema
    edges_pd_df = edges_df.to_pandas_on_spark()
    edges_pd_df["_from"] = "persons/" + edges_pd_df["_from"]
    edges_pd_df["_to"] = "movies/" + edges_pd_df["_to"]

    print("Create the collection dfs")
    persons_df = nodes_pd_df[nodes_pd_df["type"] == "Person"][person_schema.fieldNames()[1:]]
    movies_df = nodes_pd_df[nodes_pd_df["type"] == "Movie"][movie_schema.fieldNames()[1:]]
    directed_df = edges_pd_df[edges_pd_df["$label"] == "DIRECTED"][directed_schema.fieldNames()[1:]]
    acted_in_df = edges_pd_df[edges_pd_df["$label"] == "ACTS_IN"][acts_in_schema.fieldNames()[1:]]

    # _from and _to need to be set with nullable=False in the schema in order for it to work
    directed_df = spark.createDataFrame(directed_df.to_spark().rdd, StructType(
        directed_schema.fields[1:])).to_pandas_on_spark()
    acted_in_df = spark.createDataFrame(acted_in_df.to_spark().rdd, StructType(
        acts_in_schema.fields[1:])).to_pandas_on_spark()

    print("writing the persons collection")
    save_df(persons_df, "persons", save_opts)
    print("writing the movies collection")
    save_df(movies_df, "movies", save_opts)
    print("writing the 'directed' edge collection")
    save_df(directed_df, "directed", save_opts, "edge")
    print("writing the 'actedIn' collection")
    save_df(acted_in_df, "actedIn", save_opts, "edge")
