Patch/abhi tablename : Adding import query type property and modify tableName property for redshift and postgres plugin#607
Conversation
amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java
Outdated
Show resolved
Hide resolved
| /** | ||
| * Override: Fetches schema fields for a specific table using database metadata. | ||
| */ | ||
| @Override |
There was a problem hiding this comment.
It is there in parent class, you can use that one, instead you can call getSchema method of this class inside
There was a problem hiding this comment.
no update on this, also javadoc not updated
| public void configurePipeline(PipelineConfigurer pipelineConfigurer) { | ||
| FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); | ||
| if (!sourceConfig.containsMacro("tableName") && !sourceConfig.containsMacro("importQuery")) { | ||
| if ((sourceConfig.getTableName() == null || sourceConfig.getTableName().isEmpty()) |
There was a problem hiding this comment.
Use Strings method for null empty check
amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java
Outdated
Show resolved
Hide resolved
|
|
||
| public DatabaseMetaData getDatabaseMetadata(Connection connection) throws SQLException { | ||
| return (DatabaseMetaData) connection.getMetaData().getColumns(null, | ||
| null, redshiftSourceConfig.getTableName(), null); |
There was a problem hiding this comment.
schema is not handled here, if customer is passing "schema.tablename", it will not work
There was a problem hiding this comment.
for now this method is not used so no need to handle we can remove this method .
There was a problem hiding this comment.
if not getting used then remove it
| "name": "port" | ||
| } | ||
| ] | ||
| }, |
There was a problem hiding this comment.
no need to add filters as it is hidden
There was a problem hiding this comment.
keeping it so that if further we have to make changes for this we can just changed the hidden property.
but if not required , will removed..
There was a problem hiding this comment.
remove for now, will add later if required
database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java
Outdated
Show resolved
Hide resolved
database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java
Outdated
Show resolved
Hide resolved
database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java
Outdated
Show resolved
Hide resolved
| if (!Strings.isNullOrEmpty(importQuery)) { | ||
| return loadSchemaFromDB(connection, importQuery); | ||
| } else { | ||
| String query = String.format("SELECT * FROM %s LIMIT 1", tableName); |
There was a problem hiding this comment.
we have fetch schema from database metadata in case of tableName, this should not behave like this
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
not removed properly, this class has no changes it should not be there in commit files
postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java
Outdated
Show resolved
Hide resolved
| public void configurePipeline(PipelineConfigurer pipelineConfigurer) { | ||
| FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); | ||
| if (!sourceConfig.containsMacro("tableName") && !sourceConfig.containsMacro("importQuery")) { | ||
| if ((Strings.isNullOrEmpty(sourceConfig.getTableName())) |
There was a problem hiding this comment.
merge these two conditions
database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java
Show resolved
Hide resolved
database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java
Show resolved
Hide resolved
| if (!containsMacro(IMPORT_QUERY) && Strings.isNullOrEmpty(importQuery)) { | ||
| collector.addFailure("Import Query is empty.", "Specify the Import Query.") | ||
| .withConfigProperty(IMPORT_QUERY); | ||
| if (!containsMacro(TABLE_NAME) && !containsMacro(IMPORT_QUERY)) { |
There was a problem hiding this comment.
merge if conditions
| "Import Query %s must contain the string '$CONDITIONS'. if Number of Splits is not set to 1.", importQuery), | ||
| "Include '$CONDITIONS' in the Import Query") | ||
| .withConfigProperty(IMPORT_QUERY); | ||
| if (!Strings.isNullOrEmpty(importQuery)) { |
There was a problem hiding this comment.
merge if conditions
database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java
Outdated
Show resolved
Hide resolved
| { | ||
| "name": "ImportQuery", | ||
| "condition": { | ||
| "expression": "importQueryType == 'importQuery'" |
There was a problem hiding this comment.
this is not done as per redshift
c91b373 to
bf6a780
Compare
| if ("timestamp".equalsIgnoreCase(typeName)) { | ||
| return Schema.of(Schema.LogicalType.DATETIME); | ||
| } | ||
| return DBUtils.getSchema(typeName, columnType, precision, scale, columnName, true, true); |
There was a problem hiding this comment.
last two values shouldn't be true by default, get it from metadata
amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java
Outdated
Show resolved
Hide resolved
| // Accept either ConnectException or SunCertPathBuilderException | ||
| String message = e.getMessage(); | ||
| assertTrue( | ||
| "Expected either ConnectException or SunCertPathBuilderException, but got: " + message, |
There was a problem hiding this comment.
when this exception will be thrown
| */ | ||
| public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName , | ||
| boolean isSigned, boolean handleAsDecimal) { | ||
| return null; |
There was a problem hiding this comment.
return DBUtils method from here
database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java
Show resolved
Hide resolved
| return String.format("%s.%s.%s", "source", ConnectionConfig.JDBC_PLUGIN_TYPE, sourceConfig.getJdbcPluginName()); | ||
| } | ||
|
|
||
| private Schema.Type mapSqlTypeToSchemaType(int sqlType) { |
There was a problem hiding this comment.
why this method is added?
| @Override | ||
| public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName, | ||
| boolean isSigned, boolean handleAsDecimal) { | ||
| if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) { |
There was a problem hiding this comment.
this code is getting repeated from public Schema getSchema(ResultSetMetaData metadata, int index) method, extract that common code, do the same in postgres
database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java
Show resolved
Hide resolved
e627cb7 to
467a8cd
Compare
| The '$CONDITIONS' string will be replaced by 'splitBy' field limits specified by the bounding query. | ||
| The '$CONDITIONS' string is not required if numSplits is set to one. | ||
|
|
||
| **ImportQueryType** - determines how data is extracted—either by using a Table Name or a custom Import Query. |
There was a problem hiding this comment.
Add spaces in names, It should be the label name, refer snowflake PR for this
| return; | ||
| } | ||
| if (!sourceConfig.containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) { | ||
| boolean isImportQuerySelected = ImportQueryType.IMPORT_QUERY.getValue().equals(sourceConfig.importQueryType); |
There was a problem hiding this comment.
This should not be a direct comparison as in case of null it should return import query. Use its getter method. No need for two variables, for one you can use negation.
7675916 to
27bbd79
Compare
|
|
||
| @Override | ||
| public void prepareRun(BatchSourceContext context) throws Exception { | ||
| FailureCollector collector = context.getFailureCollector(); |
There was a problem hiding this comment.
move this common logic of configurePipeline and prepareRun to its parent class as a separate method and call the same from postgres also
| if (!containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) { | ||
| ImportQueryType importQueryType = ImportQueryType.fromString(getImportQueryType()); | ||
|
|
||
| boolean isImportQuerySelected = importQueryType == ImportQueryType.IMPORT_QUERY; |
There was a problem hiding this comment.
These conditions are already tested in redshift and postgres, here add just a single condition if both values are null or empty or macro. keep just the one on line 166
| actualField.getSchema().getNonNullable() : actualField.getSchema(); | ||
| Schema expectedFieldSchema = field.getSchema().isNullable() ? | ||
| field.getSchema().getNonNullable() : field.getSchema(); | ||
| field.getSchema().getNonNullable() : field.getSchema(); |
There was a problem hiding this comment.
revert unintended changes from whole PR
| if (STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(typeName) || STRING_MAPPED_POSTGRES_TYPES.contains(columnType)) { | ||
| return Schema.of(Schema.Type.STRING); | ||
| } | ||
| if (typeName.equalsIgnoreCase("INT")) { |
There was a problem hiding this comment.
This was not there in postgres from where you added this and numeric changes
| if (Types.NUMERIC == columnType) { | ||
| int precision = metadata.getPrecision(index); | ||
| if (Types.NUMERIC == columnType || | ||
| "numeric".equalsIgnoreCase(typeName) || |
There was a problem hiding this comment.
from where this is added?
| "jdbc:postgresql://localhost:5432/null and arguments: " + | ||
| "{user=username}. Error: ConnectException: Connection refused " + | ||
| "(Connection refused)."); | ||
| "jdbc:postgresql://localhost:5432/null and arguments: " + |
There was a problem hiding this comment.
remove unintended changes
| "default": "importQuery", | ||
| "options": [ | ||
| { | ||
| "id": "importQuery", |
There was a problem hiding this comment.
This might be the reason e2e test cases are failing. Change name of id to nativeQuery and namedTable in all widget jsons and change name in enum also.
Text field with name tableName can be there as it is.
b83def1 to
5719a06
Compare
…t in redshift and Postgres plugin.
5719a06 to
e2a9c67
Compare
| if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) { | ||
| return Schema.of(Schema.Type.STRING); | ||
| } | ||
| if ("INT".equalsIgnoreCase(typeName)) { |
| if ("INT".equalsIgnoreCase(typeName)) { | ||
| return Schema.of(Schema.Type.INT); | ||
| } | ||
| if ("BIGINT".equalsIgnoreCase(typeName)) { |
| columnName, typeName)); | ||
| return Schema.of(Schema.Type.STRING); | ||
| } | ||
| if ("timestamp".equalsIgnoreCase(typeName)) { |
| " converting into STRING type to avoid any precision loss.", | ||
| columnName, typeName)); | ||
| return Schema.of(Schema.Type.STRING); | ||
| } |
There was a problem hiding this comment.
why is this not similar to what we are doing for other datatypes?
| String columnName = columns.getString("COLUMN_NAME"); | ||
| String typeName = columns.getString("TYPE_NAME"); | ||
| int columnType = columns.getInt("DATA_TYPE"); | ||
| int precision = columns.getInt("COLUMN_SIZE"); | ||
| int scale = columns.getInt("DECIMAL_DIGITS"); | ||
| int nullable = columns.getInt("NULLABLE"); |
| } | ||
|
|
||
|
|
||
| @Override |
There was a problem hiding this comment.
This code can be moved to a parent class as mentioned in above comment for RedShift
Adding import query type property and modify tableName property for redshift and postgres plugin