我有一个使用Spring Boot的框架,其中包含一个控制器 RestController
类,
@RequestMapping("/details")
@RestController
public class DataController {
private KafkaStreams kafkaStreams;
public DataController(KafkaStreams kafkaStreams) {
this.kafkaStreams = kafkaStreams;
}
@Autowired
DataService dataService;
@RequestMapping(value = "getAllDetails", method = RequestMethod.GET)
public boolean getAllDetails(KafkaStreams kafkaStreams) {
return ktableService.getAllDetails(kafkaStreams);
}
}
在我的服务实现类中,我正在使用这个
kafkaStreams
对象来查找我的不同服务的详细信息。现在我将这个框架作为依赖项在我的另一个应用程序中使用,在那里我有一个运行器类。
import org.apache.kafka.streams.KafkaStreams;
@Component
public class PipelineRunner {
private final StreamsBuilder streamsBuilder;
private final KafkaProperties kafkaProperties;
private final SerdesExt serdesExt;
@Autowired
public PipelineRunner(StreamsBuilder streamsBuilder, KafkaProperties kafkaProperties, SerdesExt serdesExt) {
this.streamsBuilder = streamsBuilder;
this.kafkaProperties = kafkaProperties;
this.serdesExt = serdesExt;
}
@PostConstruct
public void run() {
ReflectData.AllowNull.get().addStringable(Utf8.class);
ReflectData.get().addStringable(Utf8.class);
DataProcessor processor = new DataProcessor(streamsBuilder, kafkaProperties,
serdesExt);
start();
}
private void start() {
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),
kafkaProperties.getKafkaStreamsProperties(serdesExt));
System.out.println("----Its is started----");
DataController controller = new DataController(kafkaStreams);
kafkaStreams.start();
}
}
在这个类中,我正在尝试创建
DataController
对象。所以当我尝试运行应用程序类时,
@SpringBootApplication(scanBasePackages = { "framework package" })
@EnableConfigurationProperties(KafkaProperties.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
我遇到了这个错误:
***************************
APPLICATION FAILED TO START
***************************
Description:
Parameter 0 of constructor in frameworkpackage.controllers.DataController required a bean of type 'org.apache.kafka.streams.KafkaStreams' that could not be found.
Action:
Consider defining a bean of type 'org.apache.kafka.streams.KafkaStreams' in your configuration.
我是新手,对Spring Boot不太熟悉,可能会做错什么。如果需要更多信息,我可以提供。
更新
我的pom文件,
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
</parent>
<properties>
<confluent.version>4.1.0</confluent.version>
<kafka.version>1.1.0</kafka.version>
<lombok.version>1.18.0</lombok.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-avro</artifactId>
<version>2.8.5</version>
</dependency>
</dependencies>
spring boot starter
依赖?更新问题时附上pom.xml
或build.gradle
会更有见地。 - Branislav Lazicprivate KafkaStreams kafkaStreams;
应该在构造函数中自动装配或注入(根据你的日志)。 - MickaelKafkaStreams
,你必须在某个地方声明这个 bean。我认为应该是你的kafkaStreams = new KafkaStreams(streamsBuilder.build(), kafkaProperties.getKafkaStreamsProperties(serdesExt));
或类似的代码。 - Mickael