Skip to main content

 Run first Kafka application -- Kafka inside (2)

start zookeeper:


bin/zookeeper-server-start.sh config/zookeeper.properties


start kafka server:


bin/kafka-server-start.sh config/server.properties

run the demo:

a. For unlimited sync-producer-consumer run, `run bin/java-producer-consumer-demo.sh sync`
b. For unlimited async-producer-consumer run, `run bin/java-producer-consumer-demo.sh`adf

Let's take a look the source code java-producer-consumer-demo.sh,.
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
base_dir=$(dirname $0)/../..
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $base_dir/bin/kafka-run-class.sh kafka.examples.KafkaConsumerProducerDemo $@
the script is mainly use kafka.examples.KafkaConsumerProducerDemo as the demo class.

public class KafkaConsumerProducerDemo {
public static void main(String[] args) {
boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync);
producerThread.start();
Consumer consumerThread = new Consumer(KafkaProperties.TOPIC);
consumerThread.start();
}
}
the code created 2 topics(queues) for produce and consumer.

the time to dig into code step by step.


Firstly, take a look at source code of Producer
public class Producer extends Thread {
private final KafkaProducer<Integer, String> producer;
private final String topic;
private final Boolean isAsync;
public Producer(String topic, Boolean isAsync) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producer = new KafkaProducer<>(props);
this.topic = topic;
this.isAsync = isAsync;
}
public void run() {
int messageNo = 1;
while (true) {
String messageStr = "Message_" + messageNo;
long startTime = System.currentTimeMillis();
if (isAsync) { // Send asynchronously
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
} else { // Send synchronously
try {
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr)).get();
System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
++messageNo;
}
}
}
class DemoCallBack implements Callback {
private final long startTime;
private final int key;
private final String message;
public DemoCallBack(long startTime, int key, String message) {
this.startTime = startTime;
this.key = key;
this.message = message;
}
/**
* A callback method the user can implement to provide asynchronous handling of request completion. This method will
* be called when the record sent to the server has been acknowledged. When exception is not null in the callback,
* metadata will contain the special -1 value for all fields except for topicPartition, which will be valid.
*
* @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
* occurred.
* @param exception The exception thrown during processing of this record. Null if no error occurred.
*/
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
System.out.println(
"message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
"), " +
"offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
} else {
exception.printStackTrace();
}
}
}
view raw Producer.java hosted with ❤ by GitHub
Producer created messages and send to kafka server.
on line 53, KafkaProducer send the message with callback : ProducerRecord
Per each message, the topic, messgeNo and messager body are defined.

DemoCallBack provide asynchronous handling of request completion.
here, DemoCallBack just print the message when completing send the message.


@Override
public void doWork() {
consumer.subscribe(Collections.singletonList(this.topic));
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}
}
view raw Consumer.java hosted with ❤ by GitHub
The Consumer class will poll the data every one second and print the message.
all producerRecord and Consume inherited from Bean.

Comments

Popular posts from this blog

How to fix "ValueError when trying to compile python module with VC Express"

When I tried to compile the python, I always get compile issue as following: ------------ ... File "C:\Python26\lib\ distutils\msvc9compiler.py ", line 358, in initialize vc_env = query_vcvarsall(VERSION, plat_spec) File "C:\Python26\lib\ distutils\msvc9compiler.py ", line 274, in query_vcvarsall raise ValueError(str(list(result.keys()))) ValueError: [u'path'] --------------------- Python community discussed a lot but no solution: http://bugs.python.org/issue7511 The root cause is because the latest visual studio change the *.bat file a lot especially on 64bit env. The python 2.7 didn't update the path accordingly. Based on the assumption above, the following solution worked for me. To install Visual Studio 2008 Express Edition with all required components: 1. Install Microsoft Visual Studio 2008 Express Edition. The main Visual Studio 2008 Express installer is available from (the C++ installer name is vcsetup.exe): https://ww...

How to convert the ResultSet to Stream

Java 8 provided the Stream family and easy operation of it. The way of pipeline usage made the code clear and smart. However, ResultSet is still go with very legacy way to process. Per actual ResultSet usage, it is really helpful if converted as Stream. Here is the simple usage of above: StreamUtils.uncheckedConsumer is required to convert the the SQLException to runtimeException to make the Lamda clear.

How to run odoo(openerp8) in IDE from source on windows

1. install python 2.7 (per openerp8's official doc, python 27 is required.) 2. download get-pip.py from https://bootstrap.pypa.io/get-pip.py , execute the command: python get-pip.py 3. get source of openerp8 from https://github.com/odoo/odoo.git 4. execute the command: pip install -r D:\source_code\odoo\openerp8/requirements.txt . (requirements.txt contains all dependencies. ) The pip will install the python module automatically. However, the real world always bring us the issues because our C++ compile environment is not setup correctly.  we will get the link error when pip try to install psycopg2 (driver to access postgresql db.). Go to  http://www.stickpeople.com/projects/python/win-psycopg/  and choose the compiled binary file directly. For Python-ldap, go to  http://www.lfd.uci.edu/~gohlke/pythonlibs/ 5. Finally, go to http://sourceforge.net/projects/pywin32/files/pywin32 and choose correct version for python-win32service. 6. If you are family with...