How to use Scala XML with Apache Flink?

I am trying to use the Scala XML library in Flink to parse XML, and I cannot make it work. Note that I need both the parsed XML and the raw (string) version in my code, within the same processing function.

I have already tried different solutions; they always work in IntelliJ but not when I run them on a Flink cluster. There they fail with variations of java.lang.LinkageError: com/sun/org/apache/xerces/internal/jaxp/SAXParserImpl$JAXPSAXParser. Whatever I try, I still get an error similar to this one.

This is an example of what my Flink Job looks like:

object StreamingJob {
  import org.apache.flink.streaming.api.scala._

  val l = List(
    """<ciao>ciao</ciao>""",
  )

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // set up kafka section excluded
    env.setParallelism(10)

    val stream = env.fromCollection(l)

    stream
      .uid("process")
      .map(new Processor)
      .print

    env.execute("Flink-TEST")
  }
}

This is an example of my processing function:

import javax.xml.parsers.{SAXParser, SAXParserFactory}
import org.apache.flink.api.common.functions.MapFunction
import scala.xml.{Elem, XML}
import scala.xml.factory.XMLLoader

class Processor extends MapFunction[String, String] {
  override def map(translatedMessage: String): String = {
    val xml = Processor.xmlLoader.loadString(translatedMessage)
    xml.toString
  }
}
object Processor {
  val factory: SAXParserFactory = SAXParserFactory.newInstance
  val SAXParser: SAXParser = factory.newSAXParser
  val xmlLoader: XMLLoader[Elem] = XML.withSAXParser(SAXParser)
}

Finally, this is my pom.xml, which uses the maven-shade plugin to build the jar I pass to Flink:

        <!-- other sections of the pom are excluded -->
        <properties>
            <flink.version>1.7.0</flink.version>
            <scala.binary.version>2.12</scala.binary.version>
            <scala.version>2.12.8</scala.version>
        </properties>
        <!-- other sections of the pom are excluded -->
    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Scala Library, provided by Flink as well. -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
            <version>2.9.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-yaml</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api-scala_2.12</artifactId>
            <version>11.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang.modules</groupId>
            <artifactId>scala-xml_2.12</artifactId>
            <version>1.1.1</version>
        </dependency>
    </dependencies>
        <!-- other sections of the pom are excluded -->
<build>
        <plugins>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.mycompany.myproj.artifactId.default.StreamingJob</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>1.7</version>
                <executions>
                    <!-- Add src/main/scala to eclipse build path -->
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                    <!-- Add src/test/scala to eclipse build path -->
                    <execution>
                        <id>add-test-source</id>
                        <phase>generate-test-sources</phase>
                        <goals>
                            <goal>add-test-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/test/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
        <!-- other sections of the pom are excluded -->

I believe the issue is somehow related to which SAXParser implementation Flink uses at runtime. I also tried the @transient annotation to stop Flink from serializing the fields, but without success.

However, I am quite confused about what exactly is happening. Does anybody know what went wrong and how to prevent the error?
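
One way to check the suspicion about the SAXParser implementation is to log which factory and parser classes the JVM actually picks, and which class loader defined them, both in the IDE and on the cluster. The following is only a diagnostic sketch using the standard JDK API; the object name ParserDiagnostics and the output format are hypothetical and not part of the original job.

```scala
import javax.xml.parsers.SAXParserFactory

// Hypothetical helper, not part of the original job.
object ParserDiagnostics {
  // Class loader that defined the class; getClassLoader is null for the bootstrap loader.
  private def loaderOf(c: Class[_]): String =
    Option(c.getClassLoader).map(_.toString).getOrElse("bootstrap")

  // Describes the SAX factory and parser visible from the calling thread.
  def describe(): String = {
    val factory = SAXParserFactory.newInstance()
    val parser = factory.newSAXParser()
    val context = Option(Thread.currentThread().getContextClassLoader)
      .map(_.toString)
      .getOrElse("none")
    s"factory=${factory.getClass.getName} [${loaderOf(factory.getClass)}], " +
      s"parser=${parser.getClass.getName} [${loaderOf(parser.getClass)}], " +
      s"contextClassLoader=$context"
  }

  def main(args: Array[String]): Unit = println(describe())
}
```

Calling ParserDiagnostics.describe() from inside the processing function and logging the result on the cluster, then comparing it with the IDE output, may show whether the two environments resolve different implementations or class loaders.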

Declan answered 31/1, 2019 at 14:27 Comment(4)
Your problem might be related to this. - Ruisdael
Seems like it; however, I don't see a valid solution for my use case. - Declan
Is there a reason you exclude org.apache.flink:force-shading? Shading usually helps against this kind of runtime error. - Zennas
Because I already run under Flink, so I don't need it in the jar. - Declan

After a while I figured out what was wrong.

The Scala XML docs say:

In Scala 2.11 and later, add the following to your build.sbt file's libraryDependencies:

"org.scala-lang.modules" %% "scala-xml" % "1.1.1"

Which in Maven translates into:

<dependency>
    <groupId>org.scala-lang.modules</groupId>
    <artifactId>scala-xml_2.12</artifactId>
    <version>1.1.1</version>
</dependency>

It appears that this dependency is not needed: even though Flink 1.7.2 appears to use Scala 2.12.8, it still ships Scala XML inside its distribution (and hence on the classpath). I believe this can cause problems with which class actually gets loaded, and whether it is loaded correctly. However, removing it might not be the solution to the linkage error.
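
To see which copy of a class wins, you can ask the JVM where it loaded the class from. This is a minimal sketch, assuming the code runs inside the job so that the job's class loader is used; the object name ClassOrigin and the returned messages are hypothetical.

```scala
// Hypothetical helper, not part of the original job.
object ClassOrigin {
  // Returns the jar or directory a class was loaded from, if the JVM exposes it.
  def locate(className: String): String =
    try {
      val cls = Class.forName(className)
      val location = for {
        domain <- Option(cls.getProtectionDomain)
        source <- Option(domain.getCodeSource)
        url    <- Option(source.getLocation)
      } yield url.toString
      location.getOrElse("no code source (typically a JDK class)")
    } catch {
      case _: ClassNotFoundException => "not on the classpath"
    }

  def main(args: Array[String]): Unit = {
    // scala.xml.XML$ is the compiled class name of the scala.xml.XML object.
    println(locate("scala.xml.XML$"))
  }
}
```

If the reported location is the shaded job jar on the cluster, the bundled copy of scala-xml is the one being used; if it is a jar from the Flink distribution, the bundled dependency is probably redundant.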

The actual solution to the linkage error is to use Flink's own RichMapFunction[InputT, OutputT]:

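import javax.xml.parsers.{SAXParser, SAXParserFactory}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import scala.xml.{Elem, XML}
import scala.xml.factory.XMLLoader

class Processor extends RichMapFunction[String, String] {
  // Left uninitialised here; populated in open() on the task manager.
  var factory: SAXParserFactory = _
  var saxParser: SAXParser = _
  var xmlLoader: XMLLoader[Elem] = _

  // Called by Flink before the first call to map.
  override def open(parameters: Configuration): Unit = {
    factory = SAXParserFactory.newInstance
    saxParser = factory.newSAXParser
    xmlLoader = XML.withSAXParser(saxParser)
  }

  override def map(translatedMessage: String): String = {
    val xml = xmlLoader.loadString(translatedMessage)
    xml.toString
  }
}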

As the JavaDoc for open() says:

Initialization method for the function.

It is called before the actual working methods (like map or join) and thus suitable for one time setup work. For functions that are part of an iteration, this method will be invoked at the beginning of each iteration superstep.

Unfortunately, var is needed in this case: the initialisation of the values has to happen inside open(), under Flink's control, which prevents the linkage error at runtime.
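
The lifecycle behind this can be illustrated without Flink. The sketch below is only an approximation under stated assumptions: LazyParserHolder is a hypothetical stand-in for a rich function, and roundTrip imitates shipping the function object to a worker using plain Java serialization. It shows that the parser field is still empty when the object is shipped and only exists once open() has run on the receiving side.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import javax.xml.parsers.{SAXParser, SAXParserFactory}

// Hypothetical stand-in for a rich function: the parser is null while the
// object is serialized and is created only when open() is called.
class LazyParserHolder extends Serializable {
  private var parser: SAXParser = _

  def open(): Unit = parser = SAXParserFactory.newInstance().newSAXParser()

  def isOpen: Boolean = parser != null
}

object LifecycleSketch {
  // Java-serialization round trip, imitating how a function object is shipped to a worker.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val shipped = roundTrip(new LazyParserHolder)
    println(shipped.isOpen) // false: nothing was created before shipping
    shipped.open()
    println(shipped.isOpen) // true: the parser now exists on the receiving side
  }
}
```

This does not reproduce the linkage error itself; it only shows why state created in open() is built on the worker after the function has been shipped, rather than wherever the class or its companion object happened to be initialised first.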

Some notes:

  • I realised this might happen only with DataStream[T] and not with DataSet[T].
  • The job needs a parallelism greater than 1 so that multiple task managers load the same class; reproducing this in an IDE might be tricky, as explained here.
  • Given the cause of this issue, it seems that companion objects may not be ideal for use with Flink.
  • This last point might be a nice note for Flink's "Scala API extensions" page, which also explains that Flink does not usually support anonymous pattern-matching functions to deconstruct tuples unless the Flink Scala API extension is used: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/scala_api_extensions.html
Declan answered 3/4, 2019 at 2:5 Comment(0)
