分享好友 最新动态首页 最新动态分类 切换频道
史上最轻便好用的kafka ui界面可视化图形界面工具
2024-12-27 12:17
package com.jq.kafkaui.util;

史上最轻便好用的kafka ui界面可视化图形界面工具

import com.alibaba.fastjson.JSONObject; import com.jq.kafkaui.domain.Topic; import com.jq.kafkaui.dto.ResponseDto; import com.jq.kafkaui.dto.SourceInfo; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.types.Field; import org.springframework.util.StringUtils; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @Slf4j public class KafkaUtil { public static AdminClient createAdminClientByProperties(SourceInfo sourceInfo) { Properties prop = getCommonProperties(sourceInfo); prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, sourceInfo.getBroker()); prop.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000"); prop.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "2000"); return AdminClient.create(prop); } private static Properties getCommonProperties(SourceInfo sourceInfo) { Properties prop = new Properties(); String userName = sourceInfo.getUserName(); String password = sourceInfo.getPassword(); if (!StringUtils.isEmpty(userName) && !StringUtils.isEmpty(password)) { prop.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + userName + " password=" + password + ";"); prop.put("security.protocol", "SASL_PLAINTEXT"); prop.put("sasl.mechanism", "PLAIN"); } return prop; } public static ResponseDto listTopicsWithOptions(SourceInfo sourceInfo, String keyword) { AdminClient adminClient = null; try { // 创建AdminClient客户端对象 adminClient = createAdminClientByProperties(sourceInfo); ListTopicsOptions options = new ListTopicsOptions(); // 列出内部的Topic options.listInternal(true); // 列出所有的topic ListTopicsResult result = adminClient.listTopics(options); Collection<TopicListing> topicListings = result.listings().get(); List<Topic> collect = topicListings.stream().map(t -> { Topic topic = new Topic(); topic.setName(t.name()); topic.setInternal(t.isInternal()); return topic; }).sorted(Comparator.comparing(t -> t.getName())).collect(Collectors.toList()); if (keyword != null) { collect = collect.stream().filter(t -> t.getName().contains(keyword)).collect(Collectors.toList()); } ResponseDto success = ResponseDto.success(collect); return success; } catch (Exception e) { log.error(e.getMessage(), e); return ResponseDto.fail(e.getMessage()); } finally { adminClient.close(); } } public static void createTopic(SourceInfo sourceInfo, String topic, Integer partition, Integer replica) throws Exception { AdminClient adminClient = null; try { adminClient = createAdminClientByProperties(sourceInfo); List<NewTopic> topicList = new ArrayList(); NewTopic newTopic = new NewTopic(topic, partition, replica.shortValue()); topicList.add(newTopic); CreateTopicsResult result = adminClient.createTopics(topicList); result.all().get(); result.values().forEach((name, future) -> System.out.println("topicName:" + name)); } catch (Exception e) { } finally { adminClient.close(); } } public static Producer<String, String> getProducer(SourceInfo sourceInfo) { Properties props = getCommonProperties(sourceInfo); props.put("bootstrap.servers", sourceInfo.getBroker()); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); return producer; } public static KafkaConsumer<String, String> getConsumer(SourceInfo sourceInfo, String topic, String group, String offset) { Properties props = getCommonProperties(sourceInfo); props.setProperty("bootstrap.servers", sourceInfo.getBroker()); props.setProperty("group.id", group); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("auto.offset.reset", offset); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singleton(topic)); return consumer; } public static KafkaConsumer<String, String> getConsumer(SourceInfo sourceInfo, Collection<String> topics, String group, String offset) { Properties props = getCommonProperties(sourceInfo); props.setProperty("bootstrap.servers", sourceInfo.getBroker()); props.setProperty("group.id", group); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("auto.offset.reset", offset); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(topics); return consumer; } public static void main(String[] args) throws Exception { } public static void deleteTopic(SourceInfo sourceInfo, String name) { AdminClient adminClient = createAdminClientByProperties(sourceInfo); List<String> list = new ArrayList<>(); list.add(name); adminClient.deleteTopics(list); adminClient.close(); } public static JSONObject node2Json(Node node) { JSONObject leaderNode = new JSONObject(); leaderNode.put("id", node.id()); leaderNode.put("host", node.host()); leaderNode.put("port", node.port()); leaderNode.put("rack", node.rack());
最新文章
【新农人】安丘市农业考察团莅临广东新农...
 从1926年南洋归来的徐闻人倪国良在愚公楼村栽下第一棵菠萝,到种植范围由愚公楼扩大到全县,乃至跨出县域,广东省湛江市徐闻县的菠萝种植已经有了近一个世纪的历史。  在近百年发展的基础上,徐闻县近几年通过国家现代农业产业园创建,
给一个网站让做优化自然排名做到百度首页前三要怎么做
在开始进行网站优化之前,首先要明确我们的目标:将网站的自然排名提升到百度首页前三。这需要我们对网站的关键词、内容、结构、外部链接等多个方面进行全面优化。关键词研究与策略制定1. 选择合适的关键词:根据网站的主题和定位,选择具
融水网络推广优化,企业腾飞的新动力引擎
融水网络推广优化,成为企业发展的新引擎,通过精准策略提升品牌知名度,拓宽市场渠道,助力企业高效腾飞,实现业绩飞跃。随着互联网的飞速发展,网络营销已经成为企业品牌推广、产品销售的重要手段,在竞争激烈的今天,如何进行有效的融水
淘宝店铺为什么要补流量?
现在很多朋友在运营淘宝店铺的时候,一上来就会选择提升销量,有的会选择提升流量,有的会选择提升排名,有的会选择提升关键词,提升关键词里有大学问,今天跟各位分享一下淘宝运营如何提升词?淘宝运营如何提升词当我们新品上架以后,从前
粉笔垂域大模型落地面试场景 AI考官1:1模拟真实考场
  2025年度国考笔试已经落幕,广大考生即将投入到紧张的面试备考复习中。公考行业龙头粉笔(02469.HK)宣布,基于公司自研垂域大模型,推出精品面试AI点评产品,于12月13日正式上线,用户可以以1元/次的价格限时进行体验。  据了解,精
置顶【商家券API】常见问题官方精选热门
Q1:商家券接口文档参数字段”适用商品范围goods_name”是在哪里展示的?A1:在商家券详情里的优惠说明展示,具体展示规则如下:换购券:“商家券批次名称stock_name”和“适用商品范围goods_name”拼接满减券:适用商品范围goods_name折扣
新奥精准资料免费大全,可持续执行探索_免费版46.676
随着数字时代的到来,数据的获取和分析成为了企业决策的重要依据。新奥精准资料免费大全,免费版46.676,作为一套全面的数据分析工具,为用户提供了强大的数据支持。本文将详细介绍这一工具的特点、功能以及如何可持续地执行探索。新奥精准
自我提升的4个好方法
月5停止无意义的抱怨。要明白,无论当下的处境多么艰难,都只是你自己造成的,与别人无关,抱怨只会雪上加霜,并不能带来任何有用的改变。与其怨天尤人,不如停下吐槽的嘴巴,踏踏实实地去做一些能改变生活的事。如果你觉得自己一无是处,
营销推广岗岗位职责
营销推广岗岗位职责15篇  在我们平凡的日常里,接触到岗位职责的地方越来越多,制定岗位职责能够有效的地防止因为职位分配不合理而导致部门之间或是员工之间出现工作推脱、责任推卸等现象发生。一般岗位职责是怎么制定的呢?下面是小编收
百度AI的2020
世界的2020,是充满不确定性的变局之年;中国的2020,是团结一心、共克时艰、于变局中开新局的希望之年;百度 AI 的2020,是坚定信念,拥抱变化,践行“科技为更好”的实干之年。 回望2020年&#x
相关文章
推荐文章
发表评论
0评