先安装zookeeper
docker run -d --restart=always --name zookeeper --volume /etc/localtime:/etc/localtime -p 2181:2181 -p 2888:2888 -p 3888:3888 -t zookeeper
再安装kafka
docker run -d --name kafka --publish 9092:9092 \
--link zookeeper \
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.1.22 \
--env KAFKA_ADVERTISED_PORT=9092 --restart=always \
--volume /etc/localtime:/etc/localtime \
wurstmeister/kafka
192.168.1.22为访问地址
下面为application.yml关于kafka配置
kafka:
producer:
retries: 1
servers: 192.168.1.22:9092
linger: 1
batch:
size: 4096
buffer:
memory: 40960
consumer:
auto:
offset:
reset: latest
commit:
interval: 100
servers: 192.168.1.22:9092
session:
timeout: 20000
enable:
auto:
commit: true
concurrency: 10
group:
id: ${spring.application.name}-group
java消费者配置类
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.consumer.servers}")
private String servers;
@Value("${kafka.consumer.enable.auto.commit}")
private boolean enableAutoCommit;
@Value("${kafka.consumer.session.timeout}")
private String sessionTimeout;
@Value("${kafka.consumer.auto.commit.interval}")
private String autoCommitInterval;
@Value("${kafka.consumer.group.id}")
private String groupId;
@Value("${kafka.consumer.auto.offset.reset}")
private String autoOffsetReset;
@Value("${kafka.consumer.concurrency}")
private int concurrency;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>(8);
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return propsMap;
}
}
java生产者配置类
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.producer.servers}")
private String servers;
@Value("${kafka.producer.retries}")
private int retries;
@Value("${kafka.producer.batch.size}")
private int batchSize;
@Value("${kafka.producer.linger}")
private int linger;
@Value("${kafka.producer.buffer.memory}")
private int bufferMemory;
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
}
发送kafka消息类
@Component
@Slf4j
public class KafkaMessageProducer {
@Resource
KafkaTemplate kafkaTemplate;
/**
* 发送消息
*
* @param message
*/
public void sendMessage(ProducerMessage message) {
log.info("kafka 请求参数:[{}]", JsonUtils.toJSONString(message));
try {
if (StringUtils.isNotBlank(message.getKey())) {
this.kafkaTemplate.send(message.getTopic(), message.getKey(), message.getBody());
} else {
this.kafkaTemplate.send(message.getTopic(), message.getBody());
}
} catch (Exception e) {
log.error("发送kafka异常:[{}]", message, e.getMessage(), e);
}
log.info("kafka response:[{}]", message);
}
}
接受kafka消息
@KafkaListener(topics = {Constants.CREATE_ORDER_DELIVERY_TOPIC})
public void createReceivingAddress(String message){
log.info("CREATE_ORDER_DELIVERY_TOPIC kafak message:{[]}",message);
ReceivingAddress receivingAddress = JsonUtils.parseObject(message,ReceivingAddress.class);
if(receivingAddress!=null){
ReceivingAddress receivingAddressOld = receivingAddressService.findOneByEntity(receivingAddress);
if(receivingAddressOld==null){
receivingAddress.setStatus(DeliveryStatus.TO_BE_PAID);
receivingAddressService.create(receivingAddress);
return;
}else{
log.error("已存在订单:{[]}",JsonUtils.toJSONString(receivingAddressOld));
}
}else{
log.error("CREATE_ORDER_DELIVERY_TOPIC kafka 消息异常");
}
}
评论