
Commit d89a082

GoutyEthan, jeremy-degroot, jcmejias1, mas-chen, and sivavenkatgogineni authored and committed on Sep 29, 2023
[FLINK-29398] Provide rack ID to KafkaSource to take advantage of Rack Awareness
This closes #53. This closes #20.

Co-authored-by: Jeremy DeGroot <jeremy.degroot@gmail.com>
Co-authored-by: jcmejias1 <jcmejias1@gmail.com>
Co-authored-by: Mason Chen <mas.chen@berkeley.edu>
Co-authored-by: Ethan Gouty <ethan.gouty@imperva.com>
Co-authored-by: Siva Venkat Gogineni <gogineni.sivavenkat@gmail.com>
1 parent 4c03d60 commit d89a082

File tree

6 files changed: +168 -7 lines changed
 

‎docs/content/docs/connectors/datastream/kafka.md

+19
@@ -465,6 +465,25 @@
 For detailed explanations of security configurations, please refer to
 <a href="https://kafka.apache.org/documentation/#security">the "Security" section in Apache Kafka documentation</a>.
 
+## Kafka Rack Awareness
+
+Kafka rack awareness allows Flink to select and control, based on a rack ID, the cloud region and availability zone that Kafka consumers read from. This feature reduces network cost and latency, since it allows consumers to connect to the closest Kafka brokers, possibly colocated in the same cloud region and availability zone.
+A client's rack is indicated with the `client.rack` config, and should correspond to a broker's `broker.rack` config.
+
+https://kafka.apache.org/documentation/#consumerconfigs_client.rack
+
+### RackId
+
+`setRackIdSupplier()` is the builder method that determines the consumer's rack. If provided, the supplier runs when the consumer is set up on the TaskManager, and the consumer's `client.rack` configuration is set to the value it returns.
+
+One way to implement this is to read an environment variable on the TaskManager, for instance:
+
+```
+.setRackIdSupplier(() -> System.getenv("TM_NODE_AZ"))
+```
+
+"TM_NODE_AZ" is the name of the environment variable in the TaskManager container that holds the zone to use.
+
 ### Behind the Scene
 {{< hint info >}}
 If you are interested in how Kafka source works under the design of new data source API, you may
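For orientation, here is a minimal end-to-end sketch of the builder usage documented above. The bootstrap servers, topic name, and the `TM_NODE_AZ` variable are illustrative assumptions, not part of this commit:

```java
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;

import org.apache.kafka.common.serialization.StringDeserializer;

// A minimal sketch, assuming an "input-topic" topic and a TM_NODE_AZ
// environment variable on each TaskManager node; both names are hypothetical.
KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("broker-1:9092")
                .setTopics("input-topic")
                .setDeserializer(
                        KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .setStartingOffsets(OffsetsInitializer.earliest())
                // Runs on the TaskManager when the consumer is created, so each
                // subtask picks up the zone of the node it is scheduled on.
                .setRackIdSupplier(() -> System.getenv("TM_NODE_AZ"))
                .build();
```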

‎flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java

+15 -2

@@ -49,13 +49,15 @@
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.util.UserCodeClassLoader;
+import org.apache.flink.util.function.SerializableSupplier;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -98,20 +100,24 @@ public class KafkaSource<OUT>
     private final KafkaRecordDeserializationSchema<OUT> deserializationSchema;
     // The configurations.
     private final Properties props;
+    // Client rackId callback
+    private final SerializableSupplier<String> rackIdSupplier;
 
     KafkaSource(
             KafkaSubscriber subscriber,
             OffsetsInitializer startingOffsetsInitializer,
             @Nullable OffsetsInitializer stoppingOffsetsInitializer,
             Boundedness boundedness,
             KafkaRecordDeserializationSchema<OUT> deserializationSchema,
-            Properties props) {
+            Properties props,
+            SerializableSupplier<String> rackIdSupplier) {
         this.subscriber = subscriber;
         this.startingOffsetsInitializer = startingOffsetsInitializer;
         this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
         this.boundedness = boundedness;
         this.deserializationSchema = deserializationSchema;
         this.props = props;
+        this.rackIdSupplier = rackIdSupplier;
     }
 
     /**
@@ -157,7 +163,14 @@ public UserCodeClassLoader getUserCodeClassLoader() {
                 new KafkaSourceReaderMetrics(readerContext.metricGroup());
 
         Supplier<KafkaPartitionSplitReader> splitReaderSupplier =
-                () -> new KafkaPartitionSplitReader(props, readerContext, kafkaSourceReaderMetrics);
+                () ->
+                        new KafkaPartitionSplitReader(
+                                props,
+                                readerContext,
+                                kafkaSourceReaderMetrics,
+                                Optional.ofNullable(rackIdSupplier)
+                                        .map(Supplier::get)
+                                        .orElse(null));
         KafkaRecordEmitter<OUT> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);
 
         return new KafkaSourceReader<>(
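Note the laziness in the wiring above: the `SerializableSupplier` ships with the source and is only invoked when the split reader is created on the TaskManager. A small standalone sketch of that resolution pattern, reusing the hypothetical `TM_NODE_AZ` variable from the docs:

```java
import org.apache.flink.util.function.SerializableSupplier;

import java.util.Optional;
import java.util.function.Supplier;

// Serialized with the source; not evaluated on the client that builds the job.
SerializableSupplier<String> rackIdSupplier = () -> System.getenv("TM_NODE_AZ");

// Evaluated where the reader is created; a null supplier (or a null result)
// passes null through, leaving client.rack unset downstream.
String rackId =
        Optional.ofNullable(rackIdSupplier)
                .map(Supplier::get)
                .orElse(null);
```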

‎flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java

+18 -1

@@ -26,6 +26,7 @@
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator;
 import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.util.function.SerializableSupplier;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
@@ -80,6 +81,7 @@
  *             .setTopics(Arrays.asList(TOPIC1, TOPIC2))
  *             .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
  *             .setUnbounded(OffsetsInitializer.latest())
+ *             .setRackIdSupplier(() -> MY_RACK_ID)
  *             .build();
  * }</pre>
  *
@@ -100,6 +102,8 @@ public class KafkaSourceBuilder<OUT> {
     private KafkaRecordDeserializationSchema<OUT> deserializationSchema;
     // The configurations.
     protected Properties props;
+    // Client rackId supplier
+    private SerializableSupplier<String> rackIdSupplier;
 
     KafkaSourceBuilder() {
         this.subscriber = null;
@@ -108,6 +112,7 @@ public class KafkaSourceBuilder<OUT> {
         this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
         this.deserializationSchema = null;
         this.props = new Properties();
+        this.rackIdSupplier = null;
     }
 
     /**
@@ -355,6 +360,17 @@ public KafkaSourceBuilder<OUT> setClientIdPrefix(String prefix) {
         return setProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), prefix);
     }
 
+    /**
+     * Sets the client rack ID supplier to be passed down to the KafkaPartitionSplitReader.
+     *
+     * @param rackIdCallback callback that provides the Kafka consumer's client.rack value
+     * @return this KafkaSourceBuilder
+     */
+    public KafkaSourceBuilder<OUT> setRackIdSupplier(SerializableSupplier<String> rackIdCallback) {
+        this.rackIdSupplier = rackIdCallback;
+        return this;
+    }
+
     /**
      * Set an arbitrary property for the KafkaSource and KafkaConsumer. The valid keys can be found
      * in {@link ConsumerConfig} and {@link KafkaSourceOptions}.
@@ -422,7 +438,8 @@ public KafkaSource<OUT> build() {
                 stoppingOffsetsInitializer,
                 boundedness,
                 deserializationSchema,
-                props);
+                props,
+                rackIdSupplier);
     }
 
     // ------------- private helpers --------------
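A design note on the builder method: since the split reader ignores null and empty rack IDs (see `setConsumerClientRack` in the next file), a supplier may return an environment lookup directly; a missing variable then leaves `client.rack` unset rather than failing. A hedged sketch, with a hypothetical variable name:

```java
// Safe even when TM_NODE_AZ is absent: a null return is filtered out before
// the consumer config is built, so client.rack is simply not set.
builder.setRackIdSupplier(() -> System.getenv("TM_NODE_AZ"));
```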

‎flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java

+23
@@ -80,11 +80,20 @@ public KafkaPartitionSplitReader(
             Properties props,
             SourceReaderContext context,
             KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
+        this(props, context, kafkaSourceReaderMetrics, null);
+    }
+
+    public KafkaPartitionSplitReader(
+            Properties props,
+            SourceReaderContext context,
+            KafkaSourceReaderMetrics kafkaSourceReaderMetrics,
+            String rackIdSupplier) {
         this.subtaskId = context.getIndexOfSubtask();
         this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
         Properties consumerProps = new Properties();
         consumerProps.putAll(props);
         consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, createConsumerClientId(props));
+        setConsumerClientRack(consumerProps, rackIdSupplier);
         this.consumer = new KafkaConsumer<>(consumerProps);
         this.stoppingOffsets = new HashMap<>();
         this.groupId = consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
@@ -256,6 +265,20 @@ KafkaConsumer<byte[], byte[]> consumer() {
 
     // --------------- private helper method ----------------------
 
+    /**
+     * Validates the rack ID against null and empty values and, if one is present, sets it as the
+     * client.rack consumer config.
+     *
+     * @param consumerProps the consumer properties to modify
+     * @param rackId the rack ID to set; may be null or empty
+     */
+    @VisibleForTesting
+    void setConsumerClientRack(Properties consumerProps, String rackId) {
+        if (rackId != null && !rackId.isEmpty()) {
+            consumerProps.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, rackId);
+        }
+    }
+
     private void parseStartingOffsets(
             KafkaPartitionSplit split,
             List<TopicPartition> partitionsStartingFromEarliest,

‎flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java

+42 -1

@@ -48,6 +48,7 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.EmptySource;
+import org.junit.jupiter.params.provider.NullAndEmptySource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
@@ -319,6 +320,38 @@ public void testUsingCommittedOffsetsWithEarliestOrLatestOffsetResetStrategy(
         assertThat(reader.consumer().position(partition)).isEqualTo(expectedOffset);
     }
 
+    @Test
+    public void testConsumerClientRackSupplier() {
+        String rackId = "use1-az1";
+        Properties properties = new Properties();
+        KafkaPartitionSplitReader reader =
+                createReader(
+                        properties,
+                        UnregisteredMetricsGroup.createSourceReaderMetricGroup(),
+                        rackId);
+
+        // Here we call the helper function directly, because the KafkaPartitionSplitReader
+        // doesn't allow us to examine the final ConsumerConfig object
+        reader.setConsumerClientRack(properties, rackId);
+        assertThat(properties.get(ConsumerConfig.CLIENT_RACK_CONFIG)).isEqualTo(rackId);
+    }
+
+    @ParameterizedTest
+    @NullAndEmptySource
+    public void testSetConsumerClientRackIgnoresNullAndEmpty(String rackId) {
+        Properties properties = new Properties();
+        KafkaPartitionSplitReader reader =
+                createReader(
+                        properties,
+                        UnregisteredMetricsGroup.createSourceReaderMetricGroup(),
+                        rackId);
+
+        // Here we call the helper function directly, because the KafkaPartitionSplitReader
+        // doesn't allow us to examine the final ConsumerConfig object
+        reader.setConsumerClientRack(properties, rackId);
+        assertThat(properties.containsKey(ConsumerConfig.CLIENT_RACK_CONFIG)).isFalse();
+    }
+
     // ------------------
 
     private void assignSplitsAndFetchUntilFinish(KafkaPartitionSplitReader reader, int readerId)
@@ -383,6 +416,13 @@ private KafkaPartitionSplitReader createReader() {
 
     private KafkaPartitionSplitReader createReader(
             Properties additionalProperties, SourceReaderMetricGroup sourceReaderMetricGroup) {
+        return createReader(additionalProperties, sourceReaderMetricGroup, null);
+    }
+
+    private KafkaPartitionSplitReader createReader(
+            Properties additionalProperties,
+            SourceReaderMetricGroup sourceReaderMetricGroup,
+            String rackId) {
         Properties props = new Properties();
         props.putAll(KafkaSourceTestEnv.getConsumerProperties(ByteArrayDeserializer.class));
         props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
@@ -394,7 +434,8 @@ private KafkaPartitionSplitReader createReader(
         return new KafkaPartitionSplitReader(
                 props,
                 new TestingReaderContext(new Configuration(), sourceReaderMetricGroup),
-                kafkaSourceReaderMetrics);
+                kafkaSourceReaderMetrics,
+                rackId);
     }
 
     private Map<String, KafkaPartitionSplit> assignSplits(

‎flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java

+51 -3

@@ -40,6 +40,7 @@
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.testutils.MetricListener;
 import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
+import org.apache.flink.util.function.SerializableSupplier;
 
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
@@ -53,6 +54,7 @@
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -79,6 +81,8 @@
 import static org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv.NUM_PARTITIONS;
 import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 
 /** Unit tests for {@link KafkaSourceReader}. */
 public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSplit> {
@@ -271,7 +275,8 @@ void testDisableOffsetCommit() throws Exception {
                         Boundedness.CONTINUOUS_UNBOUNDED,
                         new TestingReaderContext(),
                         (ignore) -> {},
-                        properties)) {
+                        properties,
+                        null)) {
             reader.addSplits(
                     getSplits(numSplits, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED));
             ValidatingSourceOutput output = new ValidatingSourceOutput();
@@ -479,6 +484,45 @@ public void testSupportsPausingOrResumingSplits() throws Exception {
         }
     }
 
+    @Test
+    public void testThatReaderDoesNotCallRackIdSupplierOnInit() throws Exception {
+        SerializableSupplier<String> rackIdSupplier = Mockito.mock(SerializableSupplier.class);
+
+        try (KafkaSourceReader<Integer> reader =
+                (KafkaSourceReader<Integer>)
+                        createReader(
+                                Boundedness.CONTINUOUS_UNBOUNDED,
+                                new TestingReaderContext(),
+                                (ignore) -> {},
+                                new Properties(),
+                                rackIdSupplier)) {
+            // Do nothing here
+        }
+
+        verify(rackIdSupplier, never()).get();
+    }
+
+    @Test
+    public void testThatReaderDoesCallRackIdSupplierOnSplitAssignment() throws Exception {
+        SerializableSupplier<String> rackIdSupplier = Mockito.mock(SerializableSupplier.class);
+        Mockito.when(rackIdSupplier.get()).thenReturn("use1-az1");
+
+        try (KafkaSourceReader<Integer> reader =
+                (KafkaSourceReader<Integer>)
+                        createReader(
+                                Boundedness.CONTINUOUS_UNBOUNDED,
+                                new TestingReaderContext(),
+                                (ignore) -> {},
+                                new Properties(),
+                                rackIdSupplier)) {
+            reader.addSplits(
+                    Collections.singletonList(
+                            new KafkaPartitionSplit(new TopicPartition(TOPIC, 1), 1L)));
+        }
+
+        verify(rackIdSupplier).get();
+    }
+
     // ------------------------------------------
 
     @Override
@@ -535,14 +579,15 @@ private SourceReader<Integer, KafkaPartitionSplit> createReader(
             throws Exception {
         Properties properties = new Properties();
         properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        return createReader(boundedness, context, splitFinishedHook, properties);
+        return createReader(boundedness, context, splitFinishedHook, properties, null);
    }
 
     private SourceReader<Integer, KafkaPartitionSplit> createReader(
             Boundedness boundedness,
             SourceReaderContext context,
             Consumer<Collection<String>> splitFinishedHook,
-            Properties props)
+            Properties props,
+            SerializableSupplier<String> rackIdSupplier)
             throws Exception {
         KafkaSourceBuilder<Integer> builder =
                 KafkaSource.<Integer>builder()
@@ -559,6 +604,9 @@ private SourceReader<Integer, KafkaPartitionSplit> createReader(
         if (boundedness == Boundedness.BOUNDED) {
             builder.setBounded(OffsetsInitializer.latest());
         }
+        if (rackIdSupplier != null) {
+            builder.setRackIdSupplier(rackIdSupplier);
+        }
 
         return KafkaSourceTestUtils.createReaderWithFinishedSplitHook(
                 builder.build(), context, splitFinishedHook);
