
Commit a00f2aa

[DE-210] Feature/ignore null fields (#28)
* ignoreNullFields
* mapping options
* Spark 2.4
* test keep null in arrays
1 parent f85831c commit a00f2aa

File tree: 19 files changed, +266 −62 lines

arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala

Lines changed: 25 additions & 2 deletions
@@ -232,6 +232,12 @@ object ArangoDBConf {
     .intConf
     .createWithDefault(DEFAULT_MAX_RETRY_DELAY)
 
+  val IGNORE_NULL_FIELDS = "ignoreNullFields"
+  val ignoreNullFieldsConf: ConfigEntry[Boolean] = ConfigBuilder(IGNORE_NULL_FIELDS)
+    .doc("whether to ignore null fields during serialization (only supported in Spark 3.x)")
+    .booleanConf
+    .createWithDefault(false)
+
   private[sql] val confEntries: Map[String, ConfigEntry[_]] = CaseInsensitiveMap(Map(
     // driver config
     USER -> userConf,
@@ -272,7 +278,8 @@ object ArangoDBConf {
     KEEP_NULL -> keepNullConf,
     MAX_ATTEMPTS -> maxAttemptsConf,
     MIN_RETRY_DELAY -> minRetryDelayConf,
-    MAX_RETRY_DELAY -> maxRetryDelayConf
+    MAX_RETRY_DELAY -> maxRetryDelayConf,
+    IGNORE_NULL_FIELDS -> ignoreNullFieldsConf
   ))
 
   /**
@@ -317,6 +324,8 @@ object ArangoDBConf {
     CaseInsensitiveMap(Map(configs.map { cfg => cfg.key -> cfg }: _*))
   }
 
+  def apply(): ArangoDBConf = new ArangoDBConf(Map.empty)
+
   def apply(options: Map[String, String]): ArangoDBConf = new ArangoDBConf(options)
 
   def apply(options: util.Map[String, String]): ArangoDBConf = ArangoDBConf(options.asScala.toMap)
@@ -335,6 +344,7 @@ class ArangoDBConf(opts: Map[String, String]) extends Serializable with Logging
   lazy val driverOptions: ArangoDBDriverConf = new ArangoDBDriverConf(settings)
   lazy val readOptions: ArangoDBReadConf = new ArangoDBReadConf(settings)
   lazy val writeOptions: ArangoDBWriteConf = new ArangoDBWriteConf(settings)
+  lazy val mappingOptions: ArangoDBMappingConf = new ArangoDBMappingConf(settings)
 
   def updated(kv: (String, String)): ArangoDBConf = new ArangoDBConf(settings + kv)
 
@@ -559,7 +569,7 @@ class ArangoDBWriteConf(opts: Map[String, String]) extends ArangoDBConf(opts) {
 
   val maxRetryDelay: Int = getConf(maxRetryDelayConf)
 
-  override def toString =
+  override def toString: String =
     s"""ArangoDBWriteConf(
        |\t db=$db
        |\t collection=$collection
@@ -576,3 +586,16 @@ class ArangoDBWriteConf(opts: Map[String, String]) extends ArangoDBConf(opts) {
        |\t maxRetryDelay=$maxRetryDelay
        |)""".stripMargin
 }
+
+
+class ArangoDBMappingConf(opts: Map[String, String]) extends ArangoDBConf(opts) {
+
+  import ArangoDBConf._
+
+  val ignoreNullFields: Boolean = getConf(ignoreNullFieldsConf)
+
+  override def toString: String =
+    s"""ArangoDBMappingConf(
+       |\t ignoreNullFields=$ignoreNullFields
+       |)""".stripMargin
+}
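
The new entry rides on the connector's existing ConfigEntry machinery, so it can be set programmatically or as a plain string option, with keys matched case-insensitively. A minimal sketch using only names introduced or shown in this diff:

    import org.apache.spark.sql.arangodb.commons.ArangoDBConf

    // Build a conf from string options (keys are case-insensitive):
    val conf = ArangoDBConf(Map(ArangoDBConf.IGNORE_NULL_FIELDS -> "true"))

    // The new mappingOptions view exposes the typed value (defaults to false):
    val ignore: Boolean = conf.mappingOptions.ignoreNullFields  // true

End users would pass the same key as a regular DataFrame read/write option, e.g. .option("ignoreNullFields", "true"); as the .doc string above notes, it is only honored on Spark 3.x.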

arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/mapping/ArangoGeneratorProvider.scala

Lines changed: 2 additions & 2 deletions
@@ -1,13 +1,13 @@
 package org.apache.spark.sql.arangodb.commons.mapping
 
-import org.apache.spark.sql.arangodb.commons.ContentType
+import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ContentType}
 import org.apache.spark.sql.types.StructType
 
 import java.io.OutputStream
 import java.util.ServiceLoader
 
 trait ArangoGeneratorProvider {
-  def of(contentType: ContentType, schema: StructType, outputStream: OutputStream): ArangoGenerator
+  def of(contentType: ContentType, schema: StructType, outputStream: OutputStream, conf: ArangoDBConf): ArangoGenerator
 }
 
 object ArangoGeneratorProvider {

arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/mapping/ArangoParserProvider.scala

Lines changed: 2 additions & 2 deletions
@@ -1,12 +1,12 @@
 package org.apache.spark.sql.arangodb.commons.mapping
 
-import org.apache.spark.sql.arangodb.commons.ContentType
+import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ContentType}
 import org.apache.spark.sql.types.DataType
 
 import java.util.ServiceLoader
 
 trait ArangoParserProvider {
-  def of(contentType: ContentType, schema: DataType): ArangoParser
+  def of(contentType: ContentType, schema: DataType, conf: ArangoDBConf): ArangoParser
 }
 
 object ArangoParserProvider {
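
Both SPI traits now take the full ArangoDBConf as a trailing parameter, so implementations can read mapping options at construction time. A sketch of the updated call shape, mirroring the call sites changed later in this commit (the schema below is a placeholder):

    import java.io.ByteArrayOutputStream
    import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ContentType}
    import org.apache.spark.sql.arangodb.commons.mapping.{ArangoGeneratorProvider, ArangoParserProvider}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val conf = ArangoDBConf(Map("ignoreNullFields" -> "true"))
    val schema = StructType(Seq(StructField("name", StringType, nullable = true)))

    // Parser lookup (reading) and generator lookup (writing) both receive the conf:
    val parser = ArangoParserProvider().of(ContentType.JSON, schema, conf)
    val generator = ArangoGeneratorProvider().of(ContentType.JSON, schema, new ByteArrayOutputStream(), conf)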

arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/ArangoGeneratorImpl.scala

Lines changed: 8 additions & 8 deletions
@@ -2,7 +2,7 @@ package org.apache.spark.sql.arangodb.datasource.mapping
 
 import com.arangodb.jackson.dataformat.velocypack.VPackFactory
 import com.fasterxml.jackson.core.JsonFactory
-import org.apache.spark.sql.arangodb.commons.ContentType
+import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ContentType}
 import org.apache.spark.sql.arangodb.datasource.mapping.json.{JSONOptions, JacksonGenerator}
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.arangodb.commons.mapping.{ArangoGenerator, ArangoGeneratorProvider}
@@ -19,23 +19,23 @@ abstract sealed class ArangoGeneratorImpl(
     options) with ArangoGenerator
 
 class ArangoGeneratorProviderImpl extends ArangoGeneratorProvider {
-  override def of(contentType: ContentType, schema: StructType, outputStream: OutputStream): ArangoGeneratorImpl = contentType match {
-    case ContentType.JSON => new JsonArangoGenerator(schema, outputStream)
-    case ContentType.VPACK => new VPackArangoGenerator(schema, outputStream)
+  override def of(contentType: ContentType, schema: StructType, outputStream: OutputStream, conf: ArangoDBConf): ArangoGeneratorImpl = contentType match {
+    case ContentType.JSON => new JsonArangoGenerator(schema, outputStream, conf)
+    case ContentType.VPACK => new VPackArangoGenerator(schema, outputStream, conf)
     case _ => throw new IllegalArgumentException
   }
 }
 
-class JsonArangoGenerator(schema: StructType, outputStream: OutputStream)
+class JsonArangoGenerator(schema: StructType, outputStream: OutputStream, conf: ArangoDBConf)
   extends ArangoGeneratorImpl(
     schema,
     outputStream,
-    createOptions(new JsonFactory())
+    createOptions(new JsonFactory(), conf)
   )
 
-class VPackArangoGenerator(schema: StructType, outputStream: OutputStream)
+class VPackArangoGenerator(schema: StructType, outputStream: OutputStream, conf: ArangoDBConf)
   extends ArangoGeneratorImpl(
     schema,
     outputStream,
-    createOptions(new VPackFactory())
+    createOptions(new VPackFactory(), conf)
   )

arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/ArangoParserImpl.scala

Lines changed: 8 additions & 9 deletions
@@ -3,7 +3,7 @@ package org.apache.spark.sql.arangodb.datasource.mapping
 import com.arangodb.jackson.dataformat.velocypack.VPackFactory
 import com.arangodb.velocypack.{VPackParser, VPackSlice}
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
-import org.apache.spark.sql.arangodb.commons.ContentType
+import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ContentType}
 import org.apache.spark.sql.arangodb.commons.mapping.{ArangoParser, ArangoParserProvider}
 import org.apache.spark.sql.arangodb.datasource.mapping.json.{JSONOptions, JacksonParser}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -23,24 +23,23 @@ abstract sealed class ArangoParserImpl(
 }
 
 class ArangoParserProviderImpl extends ArangoParserProvider {
-  override def of(contentType: ContentType, schema: DataType): ArangoParserImpl = contentType match {
-    case ContentType.JSON => new JsonArangoParser(schema)
-    case ContentType.VPACK => new VPackArangoParser(schema)
+  override def of(contentType: ContentType, schema: DataType, conf: ArangoDBConf): ArangoParserImpl = contentType match {
+    case ContentType.JSON => new JsonArangoParser(schema, conf)
+    case ContentType.VPACK => new VPackArangoParser(schema, conf)
     case _ => throw new IllegalArgumentException
   }
 }
 
-class JsonArangoParser(schema: DataType)
+class JsonArangoParser(schema: DataType, conf: ArangoDBConf)
   extends ArangoParserImpl(
     schema,
-    createOptions(new JsonFactory()
-      .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true)),
+    createOptions(new JsonFactory().configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true), conf),
    (bytes: Array[Byte]) => UTF8String.fromBytes(bytes)
  )
 
-class VPackArangoParser(schema: DataType)
+class VPackArangoParser(schema: DataType, conf: ArangoDBConf)
   extends ArangoParserImpl(
     schema,
-    createOptions(new VPackFactory()),
+    createOptions(new VPackFactory(), conf),
     (bytes: Array[Byte]) => UTF8String.fromString(new VPackParser.Builder().build().toJson(new VPackSlice(bytes), true))
   )

arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/package.scala

Lines changed: 2 additions & 1 deletion
@@ -1,10 +1,11 @@
 package org.apache.spark.sql.arangodb.datasource
 
 import com.fasterxml.jackson.core.JsonFactory
+import org.apache.spark.sql.arangodb.commons.ArangoDBConf
 import org.apache.spark.sql.arangodb.datasource.mapping.json.JSONOptions
 
 package object mapping {
-  private[mapping] def createOptions(jsonFactory: JsonFactory) = new JSONOptions(
+  private[mapping] def createOptions(jsonFactory: JsonFactory, conf: ArangoDBConf) = new JSONOptions(
     Map.empty[String, String],
     "UTC"
   ) {
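
Note that in this Spark 2.4 module createOptions only accepts the new conf parameter; nothing shown here consumes it yet, which matches the option's doc string stating that ignoreNullFields is only supported on Spark 3.x. Threading it through presumably just keeps the 2.4 and 3.x factory signatures aligned.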

arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoCollectionPartitionReader.scala

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@ class ArangoCollectionPartitionReader(
   // override endpoints with partition endpoint
   private val options = opts.updated(ArangoDBConf.ENDPOINTS, inputPartition.endpoint)
   private val actualSchema = StructType(ctx.requiredSchema.filterNot(_.name == options.readOptions.columnNameOfCorruptRecord))
-  private val parser = ArangoParserProvider().of(options.driverOptions.contentType, actualSchema)
+  private val parser = ArangoParserProvider().of(options.driverOptions.contentType, actualSchema, options)
   private val safeParser = new FailureSafeParser[Array[Byte]](
     parser.parse(_).toSeq,
     options.readOptions.parseMode,

arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoQueryReader.scala

Lines changed: 1 addition & 1 deletion
@@ -18,7 +18,7 @@ class ArangoQueryReader(schema: StructType, options: ArangoDBConf) extends Input
   with Logging {
 
   private val actualSchema = StructType(schema.filterNot(_.name == options.readOptions.columnNameOfCorruptRecord))
-  private val parser = ArangoParserProvider().of(options.driverOptions.contentType, actualSchema)
+  private val parser = ArangoParserProvider().of(options.driverOptions.contentType, actualSchema, options)
   private val safeParser = new FailureSafeParser[Array[Byte]](
     parser.parse(_).toSeq,
     options.readOptions.parseMode,

arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataSourceWriter.scala

Lines changed: 1 addition & 0 deletions
@@ -92,6 +92,7 @@ class ArangoDataSourceWriter(writeUUID: String, schema: StructType, mode: SaveMo
 
     logInfo(s"Using save mode: $mode")
     logInfo(s"Using write configuration: \n${updatedOptions.writeOptions}")
+    logInfo(s"Using mapping configuration: \n${updatedOptions.mappingOptions}")
     logInfo(s"Writing schema: \n${schema.treeString}")
     logInfo(s"Can retry: $canRetry")
 
arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala

Lines changed: 1 addition & 1 deletion
@@ -66,7 +66,7 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I
   private def initBatch(): Unit = {
     batchCount = 0
     outVPack = new ByteArrayOutputStream()
-    vpackGenerator = ArangoGeneratorProvider().of(options.driverOptions.contentType, schema, outVPack)
+    vpackGenerator = ArangoGeneratorProvider().of(options.driverOptions.contentType, schema, outVPack, options)
     vpackGenerator.writeStartArray()
   }
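
End to end, the feature surfaces as one more write option. A hedged usage sketch: ignoreNullFields is the key added by this commit, while the datasource name "com.arangodb.spark" and the connection options are assumptions for illustration:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // "age" is null for Bob; with ignoreNullFields=true the field would be
    // omitted from the stored document instead of being written as null.
    val df = Seq(("Alice", Option(30)), ("Bob", Option.empty[Int])).toDF("name", "age")

    df.write
      .format("com.arangodb.spark")            // assumed datasource name
      .option("endpoints", "localhost:8529")   // assumed connection option
      .option("table", "users")                // assumed target collection option
      .option("ignoreNullFields", "true")      // added by this commit (Spark 3.x only)
      .save()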
