Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-11349][Sort] Integrate opentelemetry for sort-connectors-v1.15 #11351

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sort.iceberg.source.reader;

import org.apache.inlong.sort.base.util.OpenTelemetryLogger;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
Expand All @@ -25,6 +27,7 @@
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SplitRequestEvent;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.logging.log4j.Level;

import java.util.Collection;
import java.util.Collections;
Expand All @@ -40,6 +43,7 @@ public class IcebergSourceReader<T>
SingleThreadMultiplexSourceReaderBase<RecordAndPosition<T>, T, IcebergSourceSplit, IcebergSourceSplit> {

private final InlongIcebergSourceReaderMetrics<T> metrics;
private final OpenTelemetryLogger openTelemetryLogger;
public IcebergSourceReader(
InlongIcebergSourceReaderMetrics<T> metrics,
ReaderFunction<T> readerFunction,
Expand All @@ -50,17 +54,28 @@ public IcebergSourceReader(
context.getConfiguration(),
context);
this.metrics = metrics;
this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
.setLogLevel(Level.ERROR)
.setServiceName(this.getClass().getSimpleName())
.setLocalHostIp(this.context.getLocalHostName()).build();
}

@Override
public void start() {
this.openTelemetryLogger.install();
// We request a split only if we did not get splits during the checkpoint restore.
// Otherwise, reader restarts will keep requesting more and more splits.
if (getNumberOfCurrentlyAssignedSplits() == 0) {
requestSplit(Collections.emptyList());
}
}

@Override
public void close() throws Exception {
super.close();
openTelemetryLogger.uninstall();
}

@Override
protected void onSplitFinished(Map<String, IcebergSourceSplit> finishedSplitIds) {
requestSplit(Lists.newArrayList(finishedSplitIds.keySet()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.sort.kafka.source.reader;

import org.apache.inlong.sort.base.util.OpenTelemetryLogger;
import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema;

import org.apache.flink.annotation.Internal;
Expand All @@ -37,6 +38,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.logging.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -66,6 +68,7 @@ public class KafkaSourceReader<T>
private final KafkaSourceReaderMetrics kafkaSourceReaderMetrics;
private final boolean commitOffsetsOnCheckpoint;
private final KafkaDeserializationSchema<RowData> metricSchema;
private final OpenTelemetryLogger openTelemetryLogger;

public KafkaSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue,
Expand All @@ -87,6 +90,22 @@ public KafkaSourceReader(
"Offset commit on checkpoint is disabled. "
+ "Consuming offset will not be reported back to Kafka cluster.");
}
this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
.setLogLevel(Level.ERROR)
.setServiceName(this.getClass().getSimpleName())
.setLocalHostIp(this.context.getLocalHostName()).build();
}

@Override
public void start() {
this.openTelemetryLogger.install();
super.start();
}

@Override
public void close() throws Exception {
super.close();
openTelemetryLogger.uninstall(); // 关闭日志上报功能
qy-liuhuo marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.sort.mysql.source.reader;

import org.apache.inlong.sort.base.util.OpenTelemetryLogger;
import org.apache.inlong.sort.mysql.RowDataDebeziumDeserializeSchema;

import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
Expand Down Expand Up @@ -55,6 +56,7 @@
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.logging.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -88,6 +90,7 @@ public class MySqlSourceReader<T>
private final MySqlSourceReaderContext mySqlSourceReaderContext;
private MySqlBinlogSplit suspendedBinlogSplit;
private final DebeziumDeserializationSchema<T> metricSchema;
private final OpenTelemetryLogger openTelemetryLogger;

public MySqlSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementQueue,
Expand All @@ -109,15 +112,26 @@ public MySqlSourceReader(
this.mySqlSourceReaderContext = context;
this.suspendedBinlogSplit = null;
this.metricSchema = metricSchema;
this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
.setLogLevel(Level.ERROR)
.setServiceName(this.getClass().getSimpleName())
.setLocalHostIp(this.context.getLocalHostName()).build();
}

@Override
public void start() {
openTelemetryLogger.install();
if (getNumberOfCurrentlyAssignedSplits() == 0) {
context.sendSplitRequest();
}
}

@Override
public void close() throws Exception {
super.close();
openTelemetryLogger.uninstall();
}

@Override
protected MySqlSplitState initializedState(MySqlSplit split) {
if (split.isSnapshotSplit()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sort.pulsar.source.reader;

import org.apache.inlong.sort.base.util.OpenTelemetryLogger;

import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
Expand Down Expand Up @@ -44,6 +46,7 @@ abstract class PulsarSourceReaderBase<OUT>
protected final SourceConfiguration sourceConfiguration;
protected final PulsarClient pulsarClient;
protected final PulsarAdmin pulsarAdmin;
private final OpenTelemetryLogger openTelemetryLogger;

protected PulsarSourceReaderBase(
FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> elementsQueue,
Expand All @@ -62,6 +65,10 @@ protected PulsarSourceReaderBase(
this.sourceConfiguration = sourceConfiguration;
this.pulsarClient = pulsarClient;
this.pulsarAdmin = pulsarAdmin;
this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
.setLogLevel(org.apache.logging.log4j.Level.ERROR)
.setServiceName(this.getClass().getSimpleName())
.setLocalHostIp(this.context.getLocalHostName()).build();
}

@Override
Expand All @@ -75,6 +82,12 @@ protected PulsarPartitionSplit toSplitType(
return splitState.toPulsarPartitionSplit();
}

@Override
public void start() {
this.openTelemetryLogger.install();
super.start();
}

@Override
public void close() throws Exception {
// Close the all the consumers first.
Expand All @@ -83,5 +96,6 @@ public void close() throws Exception {
// Close shared pulsar resources.
pulsarClient.shutdown();
pulsarAdmin.close();
openTelemetryLogger.uninstall();
}
}
Loading