From 1c0c4b689939700fb96c9868cb037e8009f9ab91 Mon Sep 17 00:00:00 2001 From: psainics Date: Fri, 2 May 2025 08:33:23 +0530 Subject: [PATCH] Skip field validation for compatiblity --- .../plugin/db/source/AbstractDBSource.java | 24 ++++++++++------- .../db/source/AbstractDBSourceTest.java | 12 ++++++--- .../mariadb/MariadbFieldsValidator.java | 25 +++++++++++++++++ .../io/cdap/plugin/mariadb/MariadbSink.java | 7 +++++ .../io/cdap/plugin/mariadb/MariadbSource.java | 27 +++++++++++++++++++ 5 files changed, 83 insertions(+), 12 deletions(-) create mode 100644 mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbFieldsValidator.java diff --git a/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java b/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java index 3fd490ada..54d1e2ab6 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java @@ -529,7 +529,7 @@ public void validateSchema(Schema actualSchema, FailureCollector collector) { } @VisibleForTesting - static void validateSchema(Schema actualSchema, Schema configSchema, FailureCollector collector) { + void validateSchema(Schema actualSchema, Schema configSchema, FailureCollector collector) { if (configSchema == null) { collector.addFailure("Schema should not be null or empty.", null) .withConfigProperty(SCHEMA); @@ -550,14 +550,20 @@ static void validateSchema(Schema actualSchema, Schema configSchema, FailureColl Schema expectedFieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema(); - if (actualFieldSchema.getType() != expectedFieldSchema.getType() || - actualFieldSchema.getLogicalType() != expectedFieldSchema.getLogicalType()) { - collector.addFailure( - String.format("Schema field '%s' has type '%s but found '%s'.", - field.getName(), expectedFieldSchema.getDisplayName(), - actualFieldSchema.getDisplayName()), null) - .withOutputSchemaField(field.getName()); - } + validateField(collector, field, actualFieldSchema, expectedFieldSchema); + } + } + + protected void validateField(FailureCollector collector, Schema.Field field, Schema actualFieldSchema, + Schema expectedFieldSchema) { + if (actualFieldSchema.getType() != expectedFieldSchema.getType() || + actualFieldSchema.getLogicalType() != expectedFieldSchema.getLogicalType()) { + collector.addFailure( + String.format("Schema field '%s' is expected to have type '%s but found '%s'.", field.getName(), + expectedFieldSchema.getDisplayName(), actualFieldSchema.getDisplayName()), + String.format("Change the data type of field %s to %s.", field.getName(), + actualFieldSchema.getDisplayName())) + .withOutputSchemaField(field.getName()); } } diff --git a/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java b/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java index 3dc7a2d1c..a8be38b46 100644 --- a/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java +++ b/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java @@ -43,11 +43,17 @@ public class AbstractDBSourceTest { Schema.Field.of("double_column", Schema.nullableOf(Schema.of(Schema.Type.DOUBLE))), Schema.Field.of("boolean_column", Schema.nullableOf(Schema.of(Schema.Type.BOOLEAN))) ); + private static final AbstractDBSource.DBSourceConfig TEST_CONFIG = new AbstractDBSource.DBSourceConfig() { + @Override + public String getConnectionString() { + return ""; + } + }; @Test public void testValidateSourceSchemaCorrectSchema() { MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); - AbstractDBSource.DBSourceConfig.validateSchema(SCHEMA, SCHEMA, collector); + TEST_CONFIG.validateSchema(SCHEMA, SCHEMA, collector); Assert.assertEquals(0, collector.getValidationFailures().size()); } @@ -65,7 +71,7 @@ public void testValidateSourceSchemaMismatchFields() { ); MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); - AbstractDBSource.DBSourceConfig.validateSchema(actualSchema, SCHEMA, collector); + TEST_CONFIG.validateSchema(actualSchema, SCHEMA, collector); assertPropertyValidationFailed(collector, "boolean_column"); } @@ -84,7 +90,7 @@ public void testValidateSourceSchemaInvalidFieldType() { ); MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); - AbstractDBSource.DBSourceConfig.validateSchema(actualSchema, SCHEMA, collector); + TEST_CONFIG.validateSchema(actualSchema, SCHEMA, collector); assertPropertyValidationFailed(collector, "boolean_column"); } diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbFieldsValidator.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbFieldsValidator.java new file mode 100644 index 000000000..71ccb0d06 --- /dev/null +++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbFieldsValidator.java @@ -0,0 +1,25 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.mariadb; + +import io.cdap.plugin.mysql.MysqlFieldsValidator; + +/** + * Field validator for maraidb + */ +public class MariadbFieldsValidator extends MysqlFieldsValidator { +} diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java index 589da2953..52a73344a 100644 --- a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java +++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java @@ -25,6 +25,8 @@ import io.cdap.plugin.db.SchemaReader; import io.cdap.plugin.db.config.DBSpecificSinkConfig; import io.cdap.plugin.db.sink.AbstractDBSink; +import io.cdap.plugin.db.sink.FieldsValidator; +import io.cdap.plugin.mysql.MysqlFieldsValidator; import io.cdap.plugin.util.DBUtils; import java.util.Map; @@ -70,6 +72,11 @@ protected String getExternalDocumentationLink() { return DBUtils.MARIADB_SUPPORTED_DOC_URL; } + @Override + protected FieldsValidator getFieldsValidator() { + return new MariadbFieldsValidator(); + } + /** * MariaDB Sink Config. */ diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java index 098145ab9..28204100c 100644 --- a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java +++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java @@ -19,6 +19,8 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; import io.cdap.plugin.common.Asset; @@ -172,5 +174,30 @@ public Map getConnectionArguments() { arguments.putIfAbsent(MARIADB_TINYINT1_IS_BIT, "false"); return arguments; } + + @Override + protected void validateField(FailureCollector collector, + Schema.Field field, + Schema actualFieldSchema, + Schema expectedFieldSchema) { + // Backward compatibility changes to support MySQL YEAR to Date type conversion + if (Schema.LogicalType.DATE.equals(expectedFieldSchema.getLogicalType()) + && Schema.Type.INT.equals(actualFieldSchema.getType())) { + return; + } + + // Backward compatibility change to support MySQL MEDIUMINT UNSIGNED to Long type conversion + if (Schema.Type.LONG.equals(expectedFieldSchema.getType()) + && Schema.Type.INT.equals(actualFieldSchema.getType())) { + return; + } + + // Backward compatibility change to support MySQL TINYINT(1) to Bool type conversion + if (Schema.Type.BOOLEAN.equals(expectedFieldSchema.getType()) + && Schema.Type.INT.equals(actualFieldSchema.getType())) { + return; + } + super.validateField(collector, field, actualFieldSchema, expectedFieldSchema); + } } }