diff --git a/pom.xml b/pom.xml index dc8eafd64..07cc7283f 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ org.mule.connectors mule-db-connector mule-extension - 2.0.0-SNAPSHOT + 2.0.3-SNAPSHOT Database Connector A Mule extension that provides functionality for connecting to Databases through the JDBC standard @@ -62,6 +62,8 @@ 5.3.2 5.3.2 1.3.7 + 1.0.0-SNAPSHOT + 1.0.0-SNAPSHOT @@ -174,6 +176,16 @@ ${xmlunit.version} test + + org.mule.sdk + mule-sdk-compatibility-api + ${muleSdkCompatibilityApiVersion} + + + org.mule.sdk + mule-sdk-api + ${muleSdkApiVersion} + diff --git a/src/main/java/org/mule/extension/db/internal/domain/connection/datasource/DbDataSourceReferenceConnectionProvider.java b/src/main/java/org/mule/extension/db/internal/domain/connection/datasource/DbDataSourceReferenceConnectionProvider.java index e0c66e3f6..6c24ac0e2 100644 --- a/src/main/java/org/mule/extension/db/internal/domain/connection/datasource/DbDataSourceReferenceConnectionProvider.java +++ b/src/main/java/org/mule/extension/db/internal/domain/connection/datasource/DbDataSourceReferenceConnectionProvider.java @@ -12,6 +12,7 @@ import javax.sql.DataSource; +import org.mule.db.commons.internal.domain.connection.DataSourceConfigDbConnectionTracingMetadata; import org.mule.db.commons.internal.domain.connection.DbConnection; import org.mule.db.commons.internal.domain.connection.datasource.DataSourceReferenceConnectionProvider; import org.mule.db.commons.internal.domain.type.ResolvedDbType; @@ -43,7 +44,8 @@ public class DbDataSourceReferenceConnectionProvider extends DataSourceReference protected DbConnection createDbConnection(Connection connection) throws Exception { if (isOracle(connection)) { return new OracleDbConnection(connection, super.resolveCustomTypes(), resolvedDbTypesCache, - super.getCacheQueryTemplateSize()); + super.getCacheQueryTemplateSize(), + new DataSourceConfigDbConnectionTracingMetadata(getDataSourceConfig().get())); } else { return super.createDbConnection(connection); } diff --git a/src/main/java/org/mule/extension/db/internal/domain/connection/derby/DerbyConnection.java b/src/main/java/org/mule/extension/db/internal/domain/connection/derby/DerbyConnection.java index 0ff903e62..e67ff51f4 100644 --- a/src/main/java/org/mule/extension/db/internal/domain/connection/derby/DerbyConnection.java +++ b/src/main/java/org/mule/extension/db/internal/domain/connection/derby/DerbyConnection.java @@ -8,6 +8,7 @@ import org.mule.db.commons.internal.domain.connection.DefaultDbConnection; import org.mule.db.commons.internal.domain.type.DbType; +import org.mule.db.commons.internal.domain.connection.DbConnectionTracingMetadata; import java.sql.Connection; import java.util.List; @@ -19,8 +20,9 @@ */ public class DerbyConnection extends DefaultDbConnection { - DerbyConnection(Connection connection, List dbTypes, long cacheQueryTemplateSize) { - super(connection, dbTypes, cacheQueryTemplateSize); + DerbyConnection(Connection connection, List dbTypes, long cacheQueryTemplateSize, + DbConnectionTracingMetadata dbConnectionTracingMetadata) { + super(connection, dbTypes, cacheQueryTemplateSize, dbConnectionTracingMetadata); } // We are disabling content streaming for Derby because of a incompatibility between the connector logic and the diff --git a/src/main/java/org/mule/extension/db/internal/domain/connection/derby/DerbyConnectionProvider.java b/src/main/java/org/mule/extension/db/internal/domain/connection/derby/DerbyConnectionProvider.java index 6b0373094..b44b63e7f 100644 --- a/src/main/java/org/mule/extension/db/internal/domain/connection/derby/DerbyConnectionProvider.java +++ b/src/main/java/org/mule/extension/db/internal/domain/connection/derby/DerbyConnectionProvider.java @@ -25,6 +25,7 @@ import org.mule.db.commons.api.exception.connection.DbError; import org.mule.db.commons.internal.domain.connection.DataSourceConfig; +import org.mule.db.commons.internal.domain.connection.DataSourceConfigDbConnectionTracingMetadata; import org.mule.db.commons.internal.domain.connection.DbConnection; import org.mule.db.commons.internal.domain.connection.DbConnectionProvider; import org.mule.runtime.extension.api.annotation.Alias; @@ -64,7 +65,8 @@ public java.util.Optional getDataSourceConfig() { @Override protected DbConnection createDbConnection(Connection connection) throws Exception { - return new DerbyConnection(connection, resolveCustomTypes(), super.getCacheQueryTemplateSize()); + return new DerbyConnection(connection, resolveCustomTypes(), super.getCacheQueryTemplateSize(), + new DataSourceConfigDbConnectionTracingMetadata(getDataSourceConfig().get())); } @Override diff --git a/src/main/java/org/mule/extension/db/internal/domain/connection/generic/DbGenericConnectionProvider.java b/src/main/java/org/mule/extension/db/internal/domain/connection/generic/DbGenericConnectionProvider.java index 306215754..1cfc4dfa8 100644 --- a/src/main/java/org/mule/extension/db/internal/domain/connection/generic/DbGenericConnectionProvider.java +++ b/src/main/java/org/mule/extension/db/internal/domain/connection/generic/DbGenericConnectionProvider.java @@ -10,6 +10,7 @@ import static org.mule.runtime.api.meta.ExternalLibraryType.JAR; import static org.mule.db.commons.internal.domain.connection.DbConnectionProvider.DRIVER_FILE_NAME_PATTERN; +import org.mule.db.commons.internal.domain.connection.DataSourceConfigDbConnectionTracingMetadata; import org.mule.db.commons.internal.domain.connection.DbConnection; import org.mule.db.commons.internal.domain.connection.generic.GenericConnectionProvider; import org.mule.db.commons.internal.domain.type.ResolvedDbType; @@ -39,7 +40,8 @@ public class DbGenericConnectionProvider extends GenericConnectionProvider { @Override protected DbConnection createDbConnection(Connection connection) throws Exception { if (isOracle(connection)) { - return new OracleDbConnection(connection, resolveCustomTypes(), resolvedDbTypesCache, super.getCacheQueryTemplateSize()); + return new OracleDbConnection(connection, resolveCustomTypes(), resolvedDbTypesCache, super.getCacheQueryTemplateSize(), + new DataSourceConfigDbConnectionTracingMetadata(getDataSourceConfig().get())); } else { return super.createDbConnection(connection); } diff --git a/src/main/java/org/mule/extension/db/internal/domain/connection/mysql/MySqlConnectionProvider.java b/src/main/java/org/mule/extension/db/internal/domain/connection/mysql/MySqlConnectionProvider.java index 319feaabc..0b70ddd48 100644 --- a/src/main/java/org/mule/extension/db/internal/domain/connection/mysql/MySqlConnectionProvider.java +++ b/src/main/java/org/mule/extension/db/internal/domain/connection/mysql/MySqlConnectionProvider.java @@ -23,6 +23,7 @@ import org.mule.db.commons.api.exception.connection.DbError; import org.mule.db.commons.internal.domain.connection.DataSourceConfig; import org.mule.db.commons.internal.domain.connection.DbConnectionProvider; +import org.mule.db.commons.internal.domain.connection.DbConnectionTracingMetadata; import org.mule.runtime.extension.api.annotation.Alias; import org.mule.runtime.extension.api.annotation.ExternalLib; import org.mule.runtime.extension.api.annotation.param.ParameterGroup; @@ -72,4 +73,8 @@ public java.util.Optional getDbVendorErrorType(SQLException e) { return empty(); } + @Override + protected DbConnectionTracingMetadata getDbConnectionTracingMetadataFrom() { + return new MySqlConnectionTracingMetadata(mySqlParameters); + } } diff --git a/src/main/java/org/mule/extension/db/internal/domain/connection/mysql/MySqlConnectionTracingMetadata.java b/src/main/java/org/mule/extension/db/internal/domain/connection/mysql/MySqlConnectionTracingMetadata.java new file mode 100644 index 000000000..53f86f705 --- /dev/null +++ b/src/main/java/org/mule/extension/db/internal/domain/connection/mysql/MySqlConnectionTracingMetadata.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.extension.db.internal.domain.connection.mysql; + +import org.mule.db.commons.internal.domain.connection.DbConnectionTracingMetadata; + +import static org.mule.db.commons.internal.domain.connection.ConnectionTracingMetadataUtils.getPeerNameFrom; +import static org.mule.db.commons.internal.domain.connection.ConnectionTracingMetadataUtils.getPeerTransportFrom; + +import static java.util.Optional.ofNullable; + +import java.util.Optional; + +public class MySqlConnectionTracingMetadata implements DbConnectionTracingMetadata { + + public static final String MYSQL = "mysql"; + private final MySqlConnectionParameters mySqlParameter; + + public MySqlConnectionTracingMetadata(MySqlConnectionParameters mySqlParameters) { + this.mySqlParameter = mySqlParameters; + } + + @Override + public String getDbSystem() { + return MYSQL; + } + + @Override + public String getConnectionString() { + return mySqlParameter.getUrl(); + } + + @Override + public String getUser() { + return mySqlParameter.getUser(); + } + + @Override + public Optional getPeerName() { + return ofNullable(getPeerNameFrom(mySqlParameter.getUrl())); + } + + @Override + public Optional getPeerTransport() { + return ofNullable(getPeerTransportFrom(mySqlParameter.getUrl())); + } +} diff --git a/src/main/java/org/mule/extension/db/internal/domain/connection/oracle/OracleDbConnection.java b/src/main/java/org/mule/extension/db/internal/domain/connection/oracle/OracleDbConnection.java index a7ba6e61f..fea38ecce 100644 --- a/src/main/java/org/mule/extension/db/internal/domain/connection/oracle/OracleDbConnection.java +++ b/src/main/java/org/mule/extension/db/internal/domain/connection/oracle/OracleDbConnection.java @@ -26,6 +26,7 @@ import org.mule.extension.db.internal.domain.connection.oracle.types.OracleSQLXMLType; import org.mule.extension.db.internal.domain.connection.oracle.types.OracleSYSXMLType; import org.mule.extension.db.internal.domain.connection.oracle.types.OracleXMLType; +import org.mule.db.commons.internal.domain.connection.DbConnectionTracingMetadata; import java.sql.Array; import java.sql.Connection; @@ -79,8 +80,9 @@ public class OracleDbConnection extends DefaultDbConnection { public OracleDbConnection(Connection jdbcConnection, List customDataTypes, - Map> resolvedDbTypesCache, long cacheQueryTemplateSize) { - super(jdbcConnection, customDataTypes, cacheQueryTemplateSize); + Map> resolvedDbTypesCache, long cacheQueryTemplateSize, + DbConnectionTracingMetadata dbConnectionTracingMetadata) { + super(jdbcConnection, customDataTypes, cacheQueryTemplateSize, dbConnectionTracingMetadata); this.resolvedDbTypesCache = resolvedDbTypesCache; } diff --git a/src/main/java/org/mule/extension/db/internal/domain/connection/oracle/OracleDbConnectionProvider.java b/src/main/java/org/mule/extension/db/internal/domain/connection/oracle/OracleDbConnectionProvider.java index 09ea28576..baa1dc6ad 100644 --- a/src/main/java/org/mule/extension/db/internal/domain/connection/oracle/OracleDbConnectionProvider.java +++ b/src/main/java/org/mule/extension/db/internal/domain/connection/oracle/OracleDbConnectionProvider.java @@ -7,10 +7,6 @@ package org.mule.extension.db.internal.domain.connection.oracle; -import static java.util.Optional.empty; -import static java.util.Optional.of; -import static java.util.Optional.ofNullable; - import static org.mule.db.commons.api.exception.connection.DbError.CANNOT_REACH; import static org.mule.db.commons.api.exception.connection.DbError.INVALID_CREDENTIALS; import static org.mule.db.commons.api.exception.connection.DbError.INVALID_DATABASE; @@ -21,6 +17,10 @@ import static org.mule.runtime.extension.api.annotation.param.ParameterGroup.CONNECTION; import static org.mule.extension.db.internal.util.MigrationUtils.mapDataSourceConfig; +import static java.util.Optional.empty; +import static java.util.Optional.of; +import static java.util.Optional.ofNullable; + import javax.sql.DataSource; import java.sql.Connection; import java.sql.SQLException; @@ -30,6 +30,7 @@ import org.mule.db.commons.api.exception.connection.DbError; import org.mule.db.commons.internal.domain.connection.DataSourceConfig; +import org.mule.db.commons.internal.domain.connection.DataSourceConfigDbConnectionTracingMetadata; import org.mule.db.commons.internal.domain.connection.DbConnection; import org.mule.db.commons.internal.domain.connection.DbConnectionProvider; import org.mule.db.commons.internal.domain.type.ResolvedDbType; @@ -79,7 +80,8 @@ public java.util.Optional getDataSourceConfig() { @Override protected DbConnection createDbConnection(Connection connection) throws Exception { return new OracleDbConnection(connection, super.resolveCustomTypes(), resolvedDbTypesCache, - super.getCacheQueryTemplateSize()); + super.getCacheQueryTemplateSize(), + new DataSourceConfigDbConnectionTracingMetadata(getDataSourceConfig().get())); } @Override diff --git a/src/main/java/org/mule/extension/db/internal/domain/connection/sqlserver/SqlServerConnection.java b/src/main/java/org/mule/extension/db/internal/domain/connection/sqlserver/SqlServerConnection.java index bb501c0d0..c43f59d9b 100644 --- a/src/main/java/org/mule/extension/db/internal/domain/connection/sqlserver/SqlServerConnection.java +++ b/src/main/java/org/mule/extension/db/internal/domain/connection/sqlserver/SqlServerConnection.java @@ -11,6 +11,7 @@ import org.mule.extension.db.internal.domain.connection.sqlserver.types.SqlServerBinaryDbType; import org.mule.extension.db.internal.domain.connection.sqlserver.types.SqlServerVarBinaryDbType; import org.mule.db.commons.internal.domain.type.DbType; +import org.mule.db.commons.internal.domain.connection.DbConnectionTracingMetadata; import java.sql.Connection; import java.util.ArrayList; @@ -23,8 +24,9 @@ */ public class SqlServerConnection extends DefaultDbConnection { - SqlServerConnection(Connection jdbcConnection, List customDataTypes, long cacheQueryTemplateSize) { - super(jdbcConnection, customDataTypes, cacheQueryTemplateSize); + SqlServerConnection(Connection jdbcConnection, List customDataTypes, long cacheQueryTemplateSize, + DbConnectionTracingMetadata dbConnectionTracingMetadata) { + super(jdbcConnection, customDataTypes, cacheQueryTemplateSize, dbConnectionTracingMetadata); } /** diff --git a/src/main/java/org/mule/extension/db/internal/domain/connection/sqlserver/SqlServerConnectionProvider.java b/src/main/java/org/mule/extension/db/internal/domain/connection/sqlserver/SqlServerConnectionProvider.java index a3b384554..522be45f7 100644 --- a/src/main/java/org/mule/extension/db/internal/domain/connection/sqlserver/SqlServerConnectionProvider.java +++ b/src/main/java/org/mule/extension/db/internal/domain/connection/sqlserver/SqlServerConnectionProvider.java @@ -23,8 +23,10 @@ import javax.sql.DataSource; import java.sql.Connection; import java.sql.SQLException; + import org.mule.db.commons.api.exception.connection.DbError; import org.mule.db.commons.internal.domain.connection.DataSourceConfig; +import org.mule.db.commons.internal.domain.connection.DataSourceConfigDbConnectionTracingMetadata; import org.mule.db.commons.internal.domain.connection.DbConnection; import org.mule.db.commons.internal.domain.connection.DbConnectionProvider; import org.mule.runtime.extension.api.annotation.Alias; @@ -56,7 +58,8 @@ public class SqlServerConnectionProvider extends DbConnectionProvider { @Override protected DbConnection createDbConnection(Connection connection) throws Exception { - return new SqlServerConnection(connection, super.resolveCustomTypes(), super.getCacheQueryTemplateSize()); + return new SqlServerConnection(connection, super.resolveCustomTypes(), super.getCacheQueryTemplateSize(), + new DataSourceConfigDbConnectionTracingMetadata(getDataSourceConfig().get())); } @Override diff --git a/src/main/java/org/mule/extension/db/internal/operation/DbBulkOperations.java b/src/main/java/org/mule/extension/db/internal/operation/DbBulkOperations.java index 4a0c36c0a..a7296cc13 100644 --- a/src/main/java/org/mule/extension/db/internal/operation/DbBulkOperations.java +++ b/src/main/java/org/mule/extension/db/internal/operation/DbBulkOperations.java @@ -8,6 +8,7 @@ import org.mule.db.commons.AbstractDbConnector; import org.mule.db.commons.internal.domain.connection.DbConnection; +import org.mule.db.commons.internal.domain.connection.DbConnectionTracingMetadata; import org.mule.db.commons.internal.domain.metadata.DbInputMetadataResolver; import org.mule.db.commons.internal.operation.BulkOperations; import org.mule.db.commons.internal.operation.OperationErrorTypeProvider; @@ -25,13 +26,20 @@ import org.mule.extension.db.api.param.BulkQueryDefinition; import org.mule.extension.db.api.param.BulkScript; import org.mule.extension.db.api.param.QuerySettings; +import org.mule.sdk.api.runtime.parameter.CorrelationInfo; +import org.mule.sdk.compatibility.api.utils.ForwardCompatibilityHelper; +import javax.inject.Inject; import java.sql.SQLException; import java.util.List; import java.util.Map; +import static org.mule.db.commons.internal.domain.query.QueryType.DELETE; +import static org.mule.db.commons.internal.domain.query.QueryType.INSERT; +import static org.mule.db.commons.internal.domain.query.QueryType.UPDATE; import static org.mule.db.commons.internal.operation.BaseDbOperations.QUERY_GROUP; import static org.mule.db.commons.internal.operation.BaseDbOperations.QUERY_SETTINGS; +import static org.mule.extension.db.internal.operation.tracing.TracingUtils.setAttributesForDbClientOperation; import static org.mule.extension.db.internal.util.MigrationUtils.mapBulkQueryDefinition; import static org.mule.extension.db.internal.util.MigrationUtils.mapBulkScript; import static org.mule.extension.db.internal.util.MigrationUtils.mapQuerySettings; @@ -47,6 +55,9 @@ public class DbBulkOperations implements Initialisable { private BulkOperations bulkOperations; + @Inject + private java.util.Optional forwardCompatibilityHelper; + @Override public void initialise() throws InitialisationException { this.bulkOperations = new BulkOperations.Builder().build(); @@ -70,8 +81,16 @@ public int[] bulkInsert(@DisplayName("Input Parameters") @Content @Placement( @ParameterGroup(name = QUERY_GROUP) BulkQueryDefinition query, @Config AbstractDbConnector connector, @Connection DbConnection connection, + CorrelationInfo correlationInfo, StreamingHelper streamingHelper) throws SQLException { + forwardCompatibilityHelper.ifPresent(fwh -> { + DbConnectionTracingMetadata dbConnectionTracingMetadata = connection.getDbConnectionTracingMetadata(); + setAttributesForDbClientOperation(dbConnectionTracingMetadata, + dbConnectionTracingMetadata.getDbSystem(), + fwh.getDistributedTraceContextManager(correlationInfo), + query.getSql()); + }); return bulkOperations.bulkInsert(bulkInputParameters, mapBulkQueryDefinition(query), connector, connection, streamingHelper); } @@ -94,8 +113,16 @@ public int[] bulkUpdate(@DisplayName("Input Parameters") @Content @Placement( @ParameterGroup(name = QUERY_GROUP) BulkQueryDefinition query, @Config AbstractDbConnector connector, @Connection DbConnection connection, + CorrelationInfo correlationInfo, StreamingHelper streamingHelper) throws SQLException { + forwardCompatibilityHelper.ifPresent(fwh -> { + DbConnectionTracingMetadata dbConnectionTracingMetadata = connection.getDbConnectionTracingMetadata(); + setAttributesForDbClientOperation(dbConnectionTracingMetadata, + UPDATE.name() + " " + dbConnectionTracingMetadata.getDbSystem(), + fwh.getDistributedTraceContextManager(correlationInfo), + query.getSql()); + }); return bulkOperations.bulkUpdate(bulkInputParameters, mapBulkQueryDefinition(query), connector, connection, streamingHelper); } @@ -117,8 +144,16 @@ public int[] bulkDelete(@DisplayName("Input Parameters") @Content @Placement( @ParameterGroup(name = QUERY_GROUP) BulkQueryDefinition query, @Config AbstractDbConnector connector, @Connection DbConnection connection, + CorrelationInfo correlationInfo, StreamingHelper streamingHelper) throws SQLException { + forwardCompatibilityHelper.ifPresent(fwh -> { + DbConnectionTracingMetadata dbConnectionTracingMetadata = connection.getDbConnectionTracingMetadata(); + setAttributesForDbClientOperation(dbConnectionTracingMetadata, + DELETE.name() + " " + dbConnectionTracingMetadata.getDbSystem(), + fwh.getDistributedTraceContextManager(correlationInfo), + query.getSql()); + }); return bulkOperations.bulkDelete(bulkInputParameters, mapBulkQueryDefinition(query), connector, connection, streamingHelper); } @@ -135,8 +170,16 @@ public int[] bulkDelete(@DisplayName("Input Parameters") @Content @Placement( */ public int[] executeScript(@ParameterGroup(name = QUERY_GROUP) BulkScript script, @ParameterGroup(name = QUERY_SETTINGS) QuerySettings settings, - @Connection DbConnection connection) + @Connection DbConnection connection, + CorrelationInfo correlationInfo) throws SQLException { + forwardCompatibilityHelper.ifPresent(fwh -> { + DbConnectionTracingMetadata dbConnectionTracingMetadata = connection.getDbConnectionTracingMetadata(); + setAttributesForDbClientOperation(dbConnectionTracingMetadata, + dbConnectionTracingMetadata.getDbSystem(), + fwh.getDistributedTraceContextManager(correlationInfo), + "