我正在通过kafka快速入门:

http://kafka.apache.org/07/quickstart.html

和基本的消费者群体示例:

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

我已经编写了Consumer和ConsumerThreadPool,如上所示:

import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerIterator;

public class Consumer implements Runnable {

    private KafkaStream m_stream;
    private Integer m_threadNumber;

    public Consumer(KafkaStream a_stream,Integer a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[],byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));

        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

其他几个方面:我使用spring来管理我的zookeeper:

import javax.inject.Named;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan("com.truecar.inventory.worker.core")
public class AppConfig {

    @Bean
    @Named("consumerConfig")
    private static ConsumerConfig createConsumerConfig() {
        String zookeeperAddress = "127.0.0.1:2181";
        String groupId = "inventory";
        Properties props = new Properties();
        props.put("zookeeper.connect",zookeeperAddress);
        props.put("group.id",groupId);
        props.put("zookeeper.session.timeout.ms","400");
        props.put("zookeeper.sync.time.ms","200");
        props.put("auto.commit.interval.ms","1000");
        return new ConsumerConfig(props);
    }
}

我正在使用Maven和OneJar Maven插件进行编译.但是,我编译然后运行生成的一个jar我得到以下错误:

Aug 26,2013 6:15:41 PM org.springframework.context.annotation.ClasspathScanningCandidateComponentProvider registerDefaultFilters
INFO: JSR-330 'javax.inject.Named' annotation found and supported for component scanning
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: scala/ScalaObject
at java.lang.classLoader.defineClass1(Native Method)
at java.lang.classLoader.defineClass(ClassLoader.java:792)
at com.simontuffs.onejar.JarClassLoader.defineClass(JarClassLoader.java:803)
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:710)
at java.lang.classLoader.loadClass(ClassLoader.java:424)
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
at java.lang.classLoader.loadClass(ClassLoader.java:357)
at java.lang.class.getDeclaredMethods0(Native Method)
at java.lang.class.privateGetDeclaredMethods(Class.java:2521)
at java.lang.class.getDeclaredMethods(Class.java:1845)
at org.springframework.core.type.StandardAnnotationMetadata.getAnnotatedMethods(StandardAnnotationMetadata.java:180)
at org.springframework.context.annotation.ConfigurationClassparser.doProcessConfigurationClass(ConfigurationClassparser.java:222)
at org.springframework.context.annotation.ConfigurationClassparser.processConfigurationClass(ConfigurationClassparser.java:165)
at org.springframework.context.annotation.ConfigurationClassparser.parse(ConfigurationClassparser.java:140)
at org.springframework.context.annotation.ConfigurationClasspostProcessor.processConfigBeanDeFinitions(ConfigurationClasspostProcessor.java:282)
at org.springframework.context.annotation.ConfigurationClasspostProcessor.postProcessBeanDeFinitionRegistry(ConfigurationClasspostProcessor.java:223)
at org.springframework.context.support.AbstractApplicationContext.invokebeanfactoryPostProcessors(AbstractApplicationContext.java:630)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:461)
at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73)
at com.truecar.inventory.worker.core.consumer.ConsumerThreadPool.<clinit>(ConsumerThreadPool.java:31)
at com.truecar.inventory.worker.core.application.Starter.main(Starter.java:20)
... 6 more
Caused by: java.lang.classNotFoundException: scala.ScalaObject
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:713)
at java.lang.classLoader.loadClass(ClassLoader.java:424)
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
at java.lang.classLoader.loadClass(ClassLoader.java:357)
... 27 more

现在,我对Kafka知之甚少,对Scala一无所知.我该如何解决?接下来我该怎么办?这是一个已知的问题?我需要其他依赖项吗?这是我的pom.xml中的kafka版本:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.0-beta1</version>
</dependency>

更新:我联系了Kafka dev邮件列表,他们让我知道了scala依赖项的一些特定版本要求.但是,还有一个未记录的log4j依赖项,这会导致另一个运行时,而不是编译时异常.

Exception in thread "main" java.lang.reflect.InvocationTargetException
Caused by: java.lang.NoSuchMethodError: ch.qos.logback.classic.Logger.filterandLog(Ljava/lang/String;Lorg/slf4j/Marker;Lch/qos/logback/classic/Level;Ljava/lang/String;[Ljava/lang/Object;Ljava/lang/Throwable;)V
at org.apache.log4j.Category.log(Category.java:333)
at org.apache.commons.logging.impl.Log4JLogger.debug(Log4JLogger.java:177)

另一个更新:

我发现了正确的log4j依赖:

<dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>

但现在我遇到了一个更加神秘的运行时异常:

Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: org/I0Itec/zkclient/IZkStateListener
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)

在这一点上,我得到了WTF的那种感觉.所以我添加了另一个依赖:

<dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.3</version>
    </dependency>

但是这暴露了另一个运行时异常:

Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:146)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:113)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)

我希望能够让这个婴儿的例子正常运行,但也许这是使用beta产品的代价?也许我应该切换到Apache Active MQ.但这听起来不那么有趣.我错过了什么吗?

解决方法

问题是 kafka beta was built in a way that pom generated with a jar isn’t valid and maven could not recognize it and parse properly,从而获取传递依赖.我们已经设法通过在我们的pom定义中从该pom(scala,zk等)中获取所有依赖项来缓解此问题.我们正在等待kafka的下一个beta版本,其中将解决问题.

完整依赖列表如下.请注意,您必须相应地更改scala版本依赖关系到kafka工件的后缀.

<dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.15</version>
            <exclusions>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>net.sf.jopt-simple</groupId>
            <artifactId>jopt-simple</artifactId>
            <version>3.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.3</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-annotation</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.easymock</groupId>
            <artifactId>easymock</artifactId>
            <version>3.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest</artifactId>
            <version>1.2</version>
            <scope>test</scope>
        </dependency>

至于

Maybe I should switch to Apache Active MQ. But that sounds less fun.
Am I missing something?

那么,你不要忘记这是测试版吗?确实发生了一些不好的事情,但目前我们正在毫不费力地运行kafka 0.7.

java – Kafka快速入门:我需要哪些依赖项?的更多相关文章

  1. Swift 学习一

    国外开发者最近发现,WWDC2014上苹果发布的新语言Swift,和古老的Scala语言在语法上存在众多的相似之处。Swift语言从语法上来看,几乎是Scala的一个分支,在以下功能上几乎是等同的:类型继承、闭包、元组、协议、扩展、泛型等。不过Swift的运行环境和Scala的区别还是很大,这个概念才是Swift最重要的。但Swift最终编译到机器代码,使用引用计数机制,与Objective-C无缝整合。所以Swift和Scala在代码表象上的相似,应该并不太影响两种语言本质机理上的重大不一致。

  2. Android上的Scala:java.lang.NoSuchMethodError:java.lang.String.isEmpty

    解决方法使用JRE/JDK1.5,它在String上没有isEmpty方法.这将避免Scala使用1.6的isEmpty而不是自己的情况.如果您也有Java库,请务必选择与1.5兼容的库.

  3. 如何将Android App用作Kafka的“制作客户”?

    使用Android应用作为ApacheKafka的“制作客户端”是否可行/是否有意义?

  4. android – 如何管理来自调用Play2!-Scala REST服务的本地移动应用程序发送的用户请求的身份验证/授权

    我一直在挖掘Play2!这在纯粹的Web透视图中是有意义的:请求是从浏览器发送的,服务器可以在客户端上存储cookie.现在,如果我有一个本地的移动应用程序,而且我只是打一个Play2!Scala应用程序支持的REST服务.在这种情况下,我没有使用浏览器,所以服务器不能在客户端应用程序上存储cookie.我还可以使用像t2v’sPlay20-auth这样的模块进行授权/认证吗?处理这种事情的最佳做法是什么?

  5. java发送kafka事务消息的实现方法

    本文主要介绍了java发送kafka事务消息的实现方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

  6. Java Kafka实现延迟队列的示例代码

    kafka作为一个使用广泛的消息队列,很多人都不会陌生。本文将利用Kafka实现延迟队列,文中的示例代码讲解详细,感兴趣的小伙伴可以尝试一下

  7. 使用jmx exporter采集kafka指标示例详解

    这篇文章主要为大家介绍了使用jmx exporter采集kafka指标示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  8. KOA+egg.js集成kafka消息队列的示例

    这篇文章主要介绍了KOA+egg.js集成kafka消息队列的示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

  9. Kafka消费客户端协调器GroupCoordinator详解

    这篇文章主要为大家介绍了Kafka消费客户端协调器GroupCoordinator使用示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. Springboot微服务项目整合Kafka实现文章上下架功能

    这篇文章主要介绍了Springboot微服务项目整合Kafka实现文章上下架功能,包括Kafka消息发送快速入门及相关功能引入,本文通过示例代码给大家介绍的非常详细,需要的朋友可以参考下

随机推荐

  1. java – 合并多个相同的Kafka Streams主题

    )添加自定义存储,以使重复过滤容错.

  2. java – 我在哪里可以找到kafka的maven存储库?

    我想尝试kafka0.8.但是我在哪里可以找到kafkamaven存储库.我应该添加哪些额外的存储库URL?我找到了一些博客但它不起作用.我正在寻找合适的maven依赖.或者我应该从git中检出它并部署在我们的内部神器中?

  3. java – 为kafka主题配置ACL

    我有一个不安全的kafka实例,有2个代理,一切运行正常,直到我决定为主题配置ACL,在ACL配置后,我的消费者停止从Kafka轮询数据,并且在获取具有相关ID的元数据时我不断收到警告错误,我的代理属性看起来像下面:-我的客户端配置如下所示:–我用下面的命令来配置ACL在我启动消费者后完成所有上述配置后,它停止接收消息.有人可以指出我在误解的地方.提前致谢.解决方法我们成功使用ACL,但没有使用P

  4. java – kafka如何平衡分区加载?

    我在kafka遇到了负载均衡的问题.所以,我创建了一个包含10个分区的主题并创建了2个消费者.这10个分区被划分并分配给这些消费者,并且工作正常.有时第一个消费者工作,有时第二.但是,在某一时刻,我们可能面临一种情况,例如第二个消费者收到消息并且需要时间来处理此消息.那么,我的问题是kafka将如何决定将消息存储在哪个分区中?

  5. java – Kafka Connect实现错误

    我在这里运行教程:http://kafka.apache.org/documentation.html#introduction当我进入“步骤7:使用KafkaConnect导入/导出数据”并尝试启动两个连接器时,我收到以下错误:这是教程的一部分:Next,we’llstarttwoconnectorsrunninginstandalonemode,whichmeanstheyruninasing

  6. java – Storm-Kafka多个出口,如何共享负载?

    提前感谢你的时间.更新响应答案:现在在卡夫卡使用多分区(即5)以下是使用的代码:builder.setspout;在每个分区上用800MB数据进行淹水测试,花费22秒完成阅读.再次使用parallelism_hint=1的代码即builder.setspout;现在需要更多23秒!可以看到,这里的经纪人可以使用hosts.add添加,而且在新的KafkaConfig.StaticHosts代码片段中将partion号指定为4.如何提及卡夫卡喷嘴的并行提示?我是新来的风暴和java!!!!

  7. java Kafka生产者错误

    我做了kafkajava生产者.但控制台说错误.kafka服务器在aws上.和制作人在我的Mac上.然而kara服务器是可以访问的.当我从制作人发送消息时,kafka服务器显示“已接受的连接…”

  8. java – Kafka – 使用高级消费者的延迟队列实现

    如果是,我将消费两次相同的消息>问题等待很长时间没有提交偏移量,例如延迟时间是24小时,将从迭代器获取下一个,睡眠24小时,进程和提交(ZK会话超时?)>ZK会话如何在没有提交新的偏移量的情况下保持活动?

  9. java – 如何以健壮的方式处理kafka发布失败

    我正在使用Kafka,我们有一个用例来构建一个容错系统,甚至连一条消息都不会错过.所以这就是问题所在:如果由于任何原因(ZooKeeperdown,Kafkabroker等)向Kafka发布失败,我们如何能够有效地处理这些消息并在事情再次恢复后重播它们.正如我所说的那样,即使单个消息失败也无法承受.另一个用例是我们还需要在任何给定时间点知道有多少消息由于任何原因而无法发布到Kafka,例如计数器功

  10. java – Kafka快速入门:我需要哪些依赖项?

    接下来我该怎么办?我需要其他依赖项吗?解决方法问题是kafkabetawasbuiltinawaythatpomgeneratedwithajarisn’tvalidandmavencouldnotrecognizeitandparseproperly,从而获取传递依赖.我们已经设法通过在我们的pom定义中从该pom中获取所有依赖项来缓解此问题.我们正在等待kafka的下一个beta版本,其中将解决问题.完整依赖列表如下.请注意,您必须相应地更改scala版本依赖关系到kafka工件的后缀.至于Maybe

返回
顶部