Kafka Connect: Handle java.util.Date in RecordConverter.convertLong() #15345

Open

gurmeetsaran-ant wants to merge 1 commit into apache:main from gurmeetsaran-ant:fix/kafka-connect-date-to-long-conversion

Conversation

@gurmeetsaran-ant

The Confluent Avro converter deserializes timestamp-millis logical type fields as java.util.Date, but convertLong() only handles Number and String inputs, so it throws IllegalArgumentException for Date values.

Add an instanceof Date branch that calls Date.getTime() to extract epoch millis, consistent with the other conversion methods in RecordConverter.

Closes #15344
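A minimal sketch of the proposed fix; this is a simplified stand-in, not the actual RecordConverter code (the class name and the set of branches shown are illustrative assumptions):

```java
import java.util.Date;

public class ConvertLongSketch {

  // Sketch of convertLong() with the added instanceof Date branch.
  @SuppressWarnings("JavaUtilDate")
  static long convertLong(Object value) {
    if (value instanceof Number) {
      return ((Number) value).longValue();
    } else if (value instanceof String) {
      return Long.parseLong((String) value);
    } else if (value instanceof Date) {
      // New branch: Confluent Avro deserializes timestamp-millis as java.util.Date
      return ((Date) value).getTime();
    }
    throw new IllegalArgumentException(
        "Cannot convert to long: " + value.getClass().getName());
  }

  public static void main(String[] args) {
    System.out.println(convertLong(new Date(1700000000000L))); // epoch millis
  }
}
```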

@kumarpritam863
Contributor

@gurmeetsaran-ant If the Avro logical type is timestamp-millis, it maps to the Iceberg TimestampType, and in that case convertLong() will not be called. Instead,

    protected Temporal convertTimestampValue(Object value, TimestampType type) {

will be called, which handles java.util.Date.
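To illustrate the path the reviewer describes, here is a minimal sketch of a timestamp-conversion method that handles java.util.Date; it is not the actual Iceberg code (the real method takes an Iceberg TimestampType parameter, while this stand-in assumes "with zone"):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.temporal.Temporal;
import java.util.Date;

public class TimestampSketch {

  // Simplified stand-in for RecordConverter.convertTimestampValue().
  @SuppressWarnings("JavaUtilDate")
  static Temporal convertTimestampValue(Object value) {
    if (value instanceof Date) {
      // java.util.Date is already handled on this path
      return Instant.ofEpochMilli(((Date) value).getTime()).atOffset(ZoneOffset.UTC);
    } else if (value instanceof Number) {
      return Instant.ofEpochMilli(((Number) value).longValue()).atOffset(ZoneOffset.UTC);
    }
    throw new IllegalArgumentException("Cannot convert to timestamp: " + value);
  }

  public static void main(String[] args) {
    System.out.println(convertTimestampValue(new Date(0L)));
  }
}
```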

@gurmeetsaran-ant
Author

> @gurmeetsaran-ant If the Avro logical type is timestamp-millis, it maps to the Iceberg TimestampType, and in that case convertLong() will not be called. Instead,
>
>     protected Temporal convertTimestampValue(Object value, TimestampType type) {
>
> will be called, which handles java.util.Date.

Thanks for the review @kumarpritam863. You're correct that when the Kafka Connect schema has the Timestamp logical type, toIcebergType() maps it to TimestampType and convertTimestampValue() is called, which already handles java.util.Date.

However, in my setup the sink uses value.converter.schemas.enable=false with the Confluent Avro converter, which means record.valueSchema() is null on the sink side:

    if (sample.valueSchema() == null) {
      Type type = SchemaUtils.inferIcebergType(sample.value(), config);
      if (type == null) {

With valueSchema() == null, the Iceberg sink takes the schemaless code path and autoCreateTable() uses inferIcebergType(). The toIcebergType() -> TimestampType -> convertTimestampValue() path is never reached in this configuration. Instead, the type inference and value conversion depend on the Java runtime types in the Map, and convertLong() ends up receiving a java.util.Date.
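As a rough illustration of that schemaless path (this is not the actual SchemaUtils code; the method name and the set of branches are simplified assumptions), inference keys off the Java runtime type of each value in the deserialized Map:

```java
import java.util.Date;
import java.util.Map;

public class InferenceSketch {

  // Hypothetical stand-in for schemaless type inference: with no Connect
  // schema available, only the runtime class of the value can be inspected.
  static String inferType(Object value) {
    if (value instanceof Date) {
      return "timestamptz"; // would map to TimestampType.withZone()
    } else if (value instanceof Long || value instanceof Integer) {
      return "long";
    } else if (value instanceof String) {
      return "string";
    }
    return null;
  }

  public static void main(String[] args) {
    Map<String, Object> record = Map.of("__ts_ms", new Date(), "id", 42L);
    record.forEach((k, v) -> System.out.println(k + " -> " + inferType(v)));
  }
}
```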

Relevant sink config:

  value.converter=io.confluent.connect.avro.AvroConverter
  value.converter.schemas.enable=false
  iceberg.tables.auto-create-enabled=true
  iceberg.tables.evolve-schema-enabled=true

Avro schema in Schema Registry for the field:

{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}

Stack trace:

java.lang.IllegalArgumentException: Cannot convert to long: java.util.Date
    at o.a.i.connect.data.RecordConverter.convertLong(RecordConverter.java:327)
    at o.a.i.connect.data.RecordConverter.convertValue(RecordConverter.java:247)
    at o.a.i.connect.data.RecordConverter.convertStructValue(RecordConverter.java:227)
    at o.a.i.connect.data.RecordConverter.convert(RecordConverter.java:111)

The fix also makes convertLong() consistent with convertDateValue(), convertTimeValue(), convertOffsetDateTime(), and convertLocalDateTime(), which all already handle java.util.Date using the same @SuppressWarnings("JavaUtilDate") pattern.
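For comparison, a minimal sketch of the pattern those sibling converters use (simplified stand-in, not the actual convertDateValue(), which also accepts Number and String inputs):

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Date;

public class DateValueSketch {

  // Simplified stand-in showing the @SuppressWarnings("JavaUtilDate")
  // pattern used by the other conversion methods.
  @SuppressWarnings("JavaUtilDate")
  static LocalDate convertDateValue(Object value) {
    if (value instanceof Date) {
      return ((Date) value).toInstant().atOffset(ZoneOffset.UTC).toLocalDate();
    }
    throw new IllegalArgumentException("Cannot convert to date: " + value);
  }

  public static void main(String[] args) {
    System.out.println(convertDateValue(new Date(0L)));
  }
}
```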

@kumarpritam863
Contributor

So, two things here:

  1. Even if schemas are disabled, the type is inferred from the value. If the value were a java.util.Date, this branch should have returned TimestampType:

         } else if (value instanceof java.util.Date || value instanceof OffsetDateTime) {
           return TimestampType.withZone();

     And if the value were not a java.util.Date, you would not have received the error.

  2. > The fix also makes convertLong() consistent with convertDateValue(), convertTimeValue(), convertOffsetDateTime(), and convertLocalDateTime(), which all already handle java.util.Date using the same

     The other methods have Date handling because their names suggest it; for convertLong() it does not make sense to check for Date. If it is receiving a Date, the root cause may be something else.

@gurmeetsaran-ant
Author

Thanks for the review. I investigated further and you're right - with the current configuration, toIcebergType() correctly maps the org.apache.kafka.connect.data.Timestamp schema to TimestampType, so the value goes through convertTimestampValue() and never reaches convertLong().

I added debug logging and confirmed:

  • hasValueSchema=true, valueClass=Struct
  • __ts_ms schema: type=INT64, schemaName=org.apache.kafka.connect.data.Timestamp
  • toIcebergType result: __ts_ms: optional timestamptz

I was unable to reproduce the original crash with the current setup. The original failure likely occurred under different conditions (e.g., a pre-existing table with a LONG column for __ts_ms, or something specific to my configuration at the time of the error).

That said, convertLong() is a general-purpose conversion method that can receive any Object. Adding Date support to convertLong() makes it consistent and prevents a confusing IllegalArgumentException if this edge case is ever hit.

Happy to close this if you feel it's not worth the change or keep it as a defensive improvement.



Development

Successfully merging this pull request may close these issues.

Kafka Connect: RecordConverter.convertLong() fails with java.util.Date from Confluent Avro converter
