Issue Description

When running a Spring Boot application with Apache Spark in cluster mode, a java.lang.ClassCastException related to java.lang.invoke.SerializedLambda occurs. This issue is not reproducible when running the application without Spring Boot in the cluster environment.

Steps to Reproduce

@SpringBootApplication(exclude = {org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration.class})
public class SampleSparkJob{

    public static void main(String[] args) {
        SpringApplication.run(DataIngestionServiceApplication.class, args);


        SparkSession spark = SparkSession.builder()
                .appName("SampleSparkJob")
                .master("local[*]")
                .getOrCreate();

        Dataset<String> dummyData = spark.createDataset(Arrays.asList("Abhi", "Andrii", "Rick", "Duc"), Encoders.STRING());

        Dataset<TestData> transformedData = dummyData.flatMap(new TestDataFlatMap(), Encoders.bean(TestData.class));

        transformedData.show();

        transformedData.write().mode("append").parquet("outputpath");

        spark.stop();
    }

}
class TestDataFlatMap implements FlatMapFunction<String, TestData>, Serializable {
    @Override
    public Iterator<TestData> call(String name) {
        return Arrays.asList(new TestData(name)).iterator();
    }
}

@Data
@AllArgsConstructor
public class TestData implements Serializable {
    private String name;
}

Running this Spark application with spark submit command in cluster mode with Spring Boot results in the mentioned ClassCastException.

Environment

  • Java Version: 11
  • Spring Boot Version: 2.7.10
  • Spark Version: 3.2.1

Additional Information

The issue seems to be related to Spring Boot auto-configuration or the dependencies included with Spring Boot.

Comment From: philwebb

It's not clear that this is a Spring Boot issue since we don't provide any integration with Spark. Could you please provide the full stack trace for the exception so we can see if there's anything obvious.

Comment From: punditavi

It might not be a direct issue but the issue might be related to how Spring Boot interacts with Spark, especially during the serialization process. Spring Boot might introduce certain classes or configurations that conflict with Spark's serialization requirements.

As when I run it as a simple java main class without spring boot dependency it works fine.

**Stack trace:**
2023-11-07 11:30:32.432+01 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (10.248.66.38 executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
    at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
    at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
    at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Comment From: philwebb

I can't see anything in the stack trace that indicates this is a Spring Boot issue that we can help you with. I'm afraid nobody on the team has much knowledge of Spark. I did find this stackoverflow.com question where someone has a very similar exception. The answer seems to suggest that jars might be missing.

I'm afraid you'll need to get help from the Spark community instead of us. I would suggest putting together a minimal reproducer and seeing if someone on stackoverflow.com can help you.

If you are able to find a specific Spring Boot bug then we can reopen this issue.