start zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
start kafka server:
bin/kafka-server-start.sh config/server.properties
run the demo:
a. For unlimited sync-producer-consumer run, `run bin/java-producer-consumer-demo.sh sync` b. For unlimited async-producer-consumer run, `run bin/java-producer-consumer-demo.sh`adf
Let's take a look the source code java-producer-consumer-demo.sh,.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash | |
# Licensed to the Apache Software Foundation (ASF) under one or more | |
# contributor license agreements. See the NOTICE file distributed with | |
# this work for additional information regarding copyright ownership. | |
# The ASF licenses this file to You under the Apache License, Version 2.0 | |
# (the "License"); you may not use this file except in compliance with | |
# the License. You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
base_dir=$(dirname $0)/../.. | |
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then | |
export KAFKA_HEAP_OPTS="-Xmx512M" | |
fi | |
exec $base_dir/bin/kafka-run-class.sh kafka.examples.KafkaConsumerProducerDemo $@ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class KafkaConsumerProducerDemo { | |
public static void main(String[] args) { | |
boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync"); | |
Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync); | |
producerThread.start(); | |
Consumer consumerThread = new Consumer(KafkaProperties.TOPIC); | |
consumerThread.start(); | |
} | |
} |
the time to dig into code step by step.
Firstly, take a look at source code of Producer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Producer extends Thread { | |
private final KafkaProducer<Integer, String> producer; | |
private final String topic; | |
private final Boolean isAsync; | |
public Producer(String topic, Boolean isAsync) { | |
Properties props = new Properties(); | |
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); | |
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); | |
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); | |
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); | |
producer = new KafkaProducer<>(props); | |
this.topic = topic; | |
this.isAsync = isAsync; | |
} | |
public void run() { | |
int messageNo = 1; | |
while (true) { | |
String messageStr = "Message_" + messageNo; | |
long startTime = System.currentTimeMillis(); | |
if (isAsync) { // Send asynchronously | |
producer.send(new ProducerRecord<>(topic, | |
messageNo, | |
messageStr), new DemoCallBack(startTime, messageNo, messageStr)); | |
} else { // Send synchronously | |
try { | |
producer.send(new ProducerRecord<>(topic, | |
messageNo, | |
messageStr)).get(); | |
System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")"); | |
} catch (InterruptedException | ExecutionException e) { | |
e.printStackTrace(); | |
} | |
} | |
++messageNo; | |
} | |
} | |
} | |
class DemoCallBack implements Callback { | |
private final long startTime; | |
private final int key; | |
private final String message; | |
public DemoCallBack(long startTime, int key, String message) { | |
this.startTime = startTime; | |
this.key = key; | |
this.message = message; | |
} | |
/** | |
* A callback method the user can implement to provide asynchronous handling of request completion. This method will | |
* be called when the record sent to the server has been acknowledged. When exception is not null in the callback, | |
* metadata will contain the special -1 value for all fields except for topicPartition, which will be valid. | |
* | |
* @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error | |
* occurred. | |
* @param exception The exception thrown during processing of this record. Null if no error occurred. | |
*/ | |
public void onCompletion(RecordMetadata metadata, Exception exception) { | |
long elapsedTime = System.currentTimeMillis() - startTime; | |
if (metadata != null) { | |
System.out.println( | |
"message(" + key + ", " + message + ") sent to partition(" + metadata.partition() + | |
"), " + | |
"offset(" + metadata.offset() + ") in " + elapsedTime + " ms"); | |
} else { | |
exception.printStackTrace(); | |
} | |
} | |
} |
on line 53, KafkaProducer send the message with callback : ProducerRecord
Per each message, the topic, messgeNo and messager body are defined.
DemoCallBack provide asynchronous handling of request completion.
here, DemoCallBack just print the message when completing send the message.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Override | |
public void doWork() { | |
consumer.subscribe(Collections.singletonList(this.topic)); | |
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1)); | |
for (ConsumerRecord<Integer, String> record : records) { | |
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); | |
} | |
} |
all producerRecord and Consume inherited from Bean.
Comments
Post a Comment