Kafka Connect: Handle java.util.Date in RecordConverter.convertLong() #15345
gurmeetsaran-ant wants to merge 1 commit into apache:main
Conversation
The Confluent Avro converter deserializes timestamp-millis logical type fields as java.util.Date, but convertLong() only handled Number and String, throwing IllegalArgumentException. This change adds an instanceof Date branch that calls Date.getTime() to extract epoch millis, consistent with the other conversion methods in RecordConverter. Closes apache#15344
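As a rough illustration of the proposed change, here is a minimal, self-contained sketch of a convertLong()-style method with the added Date branch. The class and method names are stand-ins, not the actual RecordConverter code; only the instanceof Date branch mirrors the fix described above.

```java
import java.util.Date;

public class ConvertLongSketch {

  // Hypothetical stand-in for RecordConverter.convertLong().
  @SuppressWarnings("JavaUtilDate")
  static long convertLong(Object value) {
    if (value instanceof Number) {
      return ((Number) value).longValue();
    } else if (value instanceof String) {
      return Long.parseLong(((String) value).trim());
    } else if (value instanceof Date) {
      // New branch: extract epoch millis, matching the pattern used by the
      // other conversion methods that already handle java.util.Date.
      return ((Date) value).getTime();
    }
    throw new IllegalArgumentException(
        "Cannot convert to long: " + value.getClass().getName());
  }

  public static void main(String[] args) {
    System.out.println(convertLong(42));
    System.out.println(convertLong("7"));
    System.out.println(convertLong(new Date(1700000000000L)));
  }
}
```

Without the Date branch, the last call would fall through to the IllegalArgumentException, which is the failure mode the PR description reports.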
@gurmeetsaran-ant If the logical type in Avro is timestamp-millis, then it maps to the Iceberg TimestampType, and in that case convertLong() will not be called; rather, convertTimestampValue() will be called, which handles java.util.Date.
Thanks for the review @kumarpritam863. You're correct that when the Kafka Connect schema has the Timestamp logical type, toIcebergType() maps it to TimestampType and convertTimestampValue() is called, which already handles java.util.Date.

However, in my setup the sink uses value.converter.schemas.enable=false with the Confluent Avro converter, which means record.valueSchema() is null on the sink side. With valueSchema() == null, the Iceberg sink takes the schemaless code path and autoCreateTable() uses inferIcebergType(). The toIcebergType() -> TimestampType -> convertTimestampValue() path is never reached in this configuration. Instead, type inference and value conversion depend on the Java runtime types in the Map, and convertLong() ends up receiving a java.util.Date.

Relevant sink config:

Avro schema in Schema Registry for the field:
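To make the schemaless scenario concrete, here is a small, self-contained sketch. The inferType() helper is hypothetical and greatly simplified relative to the sink's real inferIcebergType(); the point is only that with schemas disabled, the sink sees a plain Map and must infer types from the Java runtime classes, where the Avro converter surfaces timestamp-millis as java.util.Date.

```java
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemalessPathSketch {

  // Hypothetical, simplified stand-in for runtime-type inference on the
  // schemaless path; the real logic lives in the sink's inferIcebergType().
  static String inferType(Object value) {
    if (value instanceof Long || value instanceof Integer) {
      return "LONG";
    }
    if (value instanceof Date) {
      // Avro timestamp-millis is deserialized as java.util.Date at runtime.
      return "TIMESTAMP";
    }
    if (value instanceof String) {
      return "STRING";
    }
    return "UNKNOWN";
  }

  public static void main(String[] args) {
    // With value.converter.schemas.enable=false, record.valueSchema() is null,
    // so only the runtime types in the deserialized Map are available.
    Map<String, Object> value = new LinkedHashMap<>();
    value.put("id", 1L);
    value.put("__ts_ms", new Date(1700000000000L));
    value.forEach((k, v) -> System.out.println(k + " -> " + inferType(v)));
  }
}
```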
Stack trace:

The fix also makes convertLong() consistent with convertDateValue(), convertTimeValue(), convertOffsetDateTime(), and convertLocalDateTime(), which all already handle java.util.Date using the same @SuppressWarnings("JavaUtilDate") pattern.
So, two things here:
The other methods have handling for Date because their names suggest so; in the case of convertLong() it does not make sense to check for Date. If it is receiving a Date, the root cause may be something else.
Thanks for the review. I investigated further and you're right - with the current configuration, toIcebergType() correctly maps the org.apache.kafka.connect.data.Timestamp schema to TimestampType, so the value goes through convertTimestampValue() and never reaches convertLong(). I added debug logging and confirmed:
I was unable to reproduce the original crash with the current setup. The original failure likely occurred under different conditions (e.g., a pre-existing table with a LONG column for __ts_ms, or something in my configuration at the time of the error). That said, convertLong() is a general-purpose conversion method that can receive any Object. Adding Date support to convertLong() makes it consistent and prevents a confusing IllegalArgumentException if this edge case is ever hit. Happy to close this if you feel it's not worth the change, or keep it as a defensive improvement.
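The pre-existing-table hypothesis above can be sketched as a dispatch problem: if the target column type is LONG (rather than TIMESTAMP), the value is routed to the long-conversion path regardless of its runtime class. The class, enum, and convert() method below are illustrative stand-ins, not the sink's actual dispatch code.

```java
import java.util.Date;

public class DispatchSketch {

  enum ColumnType { LONG, TIMESTAMP }

  // Illustrative dispatch: if a pre-existing table declared the column as
  // LONG, the long path receives whatever runtime type the converter produced.
  @SuppressWarnings("JavaUtilDate")
  static Object convert(Object value, ColumnType target) {
    switch (target) {
      case LONG:
        if (value instanceof Number) {
          return ((Number) value).longValue();
        }
        if (value instanceof Date) {
          return ((Date) value).getTime(); // defensive branch from this PR
        }
        throw new IllegalArgumentException("Cannot convert to long: " + value);
      case TIMESTAMP:
        if (value instanceof Date) {
          return ((Date) value).toInstant();
        }
        throw new IllegalArgumentException("Cannot convert to timestamp: " + value);
      default:
        throw new IllegalStateException("Unhandled type: " + target);
    }
  }

  public static void main(String[] args) {
    Date ts = new Date(1700000000000L);
    System.out.println(convert(ts, ColumnType.LONG));
    System.out.println(convert(ts, ColumnType.TIMESTAMP));
  }
}
```

Without the defensive branch, the LONG case would throw for a java.util.Date even though the value is trivially convertible to epoch millis.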